remove debug statements from Scheduler...
MinRK
@@ -1,703 +1,692 @@
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6
7 7 Authors:
8 8
9 9 * Min RK
10 10 """
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2010-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #----------------------------------------------------------------------
19 19 # Imports
20 20 #----------------------------------------------------------------------
21 21
22 22 from __future__ import print_function
23 23
24 24 import logging
25 25 import sys
26 26
27 27 from datetime import datetime, timedelta
28 28 from random import randint, random
29 29 from types import FunctionType
30 30
31 31 try:
32 32 import numpy
33 33 except ImportError:
34 34 numpy = None
35 35
36 36 import zmq
37 37 from zmq.eventloop import ioloop, zmqstream
38 38
39 39 # local imports
40 40 from IPython.external.decorator import decorator
41 41 from IPython.config.application import Application
42 42 from IPython.config.loader import Config
43 43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum
44 44
45 45 from IPython.parallel import error
46 46 from IPython.parallel.factory import SessionFactory
47 47 from IPython.parallel.util import connect_logger, local_logger
48 48
49 49 from .dependency import Dependency
50 50
51 51 @decorator
52 52 def logged(f,self,*args,**kwargs):
53 53 # print ("#--------------------")
54 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
55 55 # print ("#--")
56 56 return f(self,*args, **kwargs)
57 57
58 58 #----------------------------------------------------------------------
59 59 # Chooser functions
60 60 #----------------------------------------------------------------------
61 61
62 62 def plainrandom(loads):
63 63 """Plain random pick."""
64 64 n = len(loads)
65 65 return randint(0,n-1)
66 66
67 67 def lru(loads):
68 68 """Always pick the front of the line.
69 69
70 70 The content of `loads` is ignored.
71 71
72 72 Assumes LRU ordering of loads, with oldest first.
73 73 """
74 74 return 0
75 75
76 76 def twobin(loads):
77 77 """Pick two at random, use the LRU of the two.
78 78
79 79 The content of loads is ignored.
80 80
81 81 Assumes LRU ordering of loads, with oldest first.
82 82 """
83 83 n = len(loads)
84 84 a = randint(0,n-1)
85 85 b = randint(0,n-1)
86 86 return min(a,b)
87 87
88 88 def weighted(loads):
89 89 """Pick two at random using inverse load as weight.
90 90
91 91 Return the less loaded of the two.
92 92 """
93 93 # weight 0 a million times more than 1:
94 94 weights = 1./(1e-6+numpy.array(loads))
95 95 sums = weights.cumsum()
96 96 t = sums[-1]
97 97 x = random()*t
98 98 y = random()*t
99 99 idx = 0
100 100 idy = 0
101 101 while sums[idx] < x:
102 102 idx += 1
103 103 while sums[idy] < y:
104 104 idy += 1
105 105 if weights[idy] > weights[idx]:
106 106 return idy
107 107 else:
108 108 return idx
109 109
110 110 def leastload(loads):
111 111 """Always choose the lowest load.
112 112
113 113 If the lowest load occurs more than once, the first
114 114 occurrence will be used. If loads has LRU ordering, this means
115 115 the LRU of those with the lowest load is chosen.
116 116 """
117 117 return loads.index(min(loads))
118 118
119 119 #---------------------------------------------------------------------
120 120 # Classes
121 121 #---------------------------------------------------------------------
122 122 # store empty default dependency:
123 123 MET = Dependency([])
124 124
125 125 class TaskScheduler(SessionFactory):
126 126 """Python TaskScheduler object.
127 127
128 128 This is the simplest object that supports msg_id based
129 129 DAG dependencies. *Only* task msg_ids are checked, not
130 130 msg_ids of jobs submitted via the MUX queue.
131 131
132 132 """
133 133
134 134 hwm = Int(0, config=True, shortname='hwm',
135 135 help="""specify the High Water Mark (HWM) for the downstream
136 136 socket in the Task scheduler. This is the maximum number
137 137 of allowed outstanding tasks on each engine."""
138 138 )
139 139 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
140 140 'leastload', config=True, shortname='scheme', allow_none=False,
141 141 help="""select the task scheduler scheme [default: leastload]
142 142 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin', 'leastload'"""
143 143 )
144 144 def _scheme_name_changed(self, old, new):
145 145 self.log.debug("Using scheme %r"%new)
146 146 self.scheme = globals()[new]
147 147
148 148 # input arguments:
149 149 scheme = Instance(FunctionType) # function for determining the destination
150 150 def _scheme_default(self):
151 151 return leastload
152 152 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
153 153 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
154 154 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
155 155 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
156 156
157 157 # internals:
158 158 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
159 159 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
160 160 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
161 161 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
162 162 pending = Dict() # dict by engine_uuid of submitted tasks
163 163 completed = Dict() # dict by engine_uuid of completed tasks
164 164 failed = Dict() # dict by engine_uuid of failed tasks
165 165 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
166 166 clients = Dict() # dict by msg_id for who submitted the task
167 167 targets = List() # list of target IDENTs
168 168 loads = List() # list of engine loads
169 169 # full = Set() # set of IDENTs that have HWM outstanding tasks
170 170 all_completed = Set() # set of all completed tasks
171 171 all_failed = Set() # set of all failed tasks
172 172 all_done = Set() # set of all finished tasks=union(completed,failed)
173 173 all_ids = Set() # set of all submitted task IDs
174 174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
175 175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
176 176
177 177
178 178 def start(self):
179 179 self.engine_stream.on_recv(self.dispatch_result, copy=False)
180 180 self._notification_handlers = dict(
181 181 registration_notification = self._register_engine,
182 182 unregistration_notification = self._unregister_engine
183 183 )
184 184 self.notifier_stream.on_recv(self.dispatch_notification)
185 185 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2 seconds (0.5 Hz)
186 186 self.auditor.start()
187 187 self.log.info("Scheduler started [%s]"%self.scheme_name)
188 188
189 189 def resume_receiving(self):
190 190 """Resume accepting jobs."""
191 191 self.client_stream.on_recv(self.dispatch_submission, copy=False)
192 192
193 193 def stop_receiving(self):
194 194 """Stop accepting jobs while there are no engines.
195 195 Leave them in the ZMQ queue."""
196 196 self.client_stream.on_recv(None)
197 197
198 198 #-----------------------------------------------------------------------
199 199 # [Un]Registration Handling
200 200 #-----------------------------------------------------------------------
201 201
202 202 def dispatch_notification(self, msg):
203 203 """dispatch register/unregister events."""
204 204 try:
205 205 idents,msg = self.session.feed_identities(msg)
206 206 except ValueError:
207 207 self.log.warn("task::Invalid Message: %r"%msg)
208 208 return
209 209 try:
210 210 msg = self.session.unpack_message(msg)
211 211 except ValueError:
212 212 self.log.warn("task::Unauthorized message from: %r"%idents)
213 213 return
214 214
215 215 msg_type = msg['msg_type']
216 216
217 217 handler = self._notification_handlers.get(msg_type, None)
218 218 if handler is None:
219 219 self.log.error("Unhandled message type: %r"%msg_type)
220 220 else:
221 221 try:
222 222 handler(str(msg['content']['queue']))
223 223 except KeyError:
224 224 self.log.error("task::Invalid notification msg: %r"%msg)
225 225
226 226 @logged
227 227 def _register_engine(self, uid):
228 228 """New engine with ident `uid` became available."""
229 229 # head of the line:
230 230 self.targets.insert(0,uid)
231 231 self.loads.insert(0,0)
232 232 # initialize sets
233 233 self.completed[uid] = set()
234 234 self.failed[uid] = set()
235 235 self.pending[uid] = {}
236 236 if len(self.targets) == 1:
237 237 self.resume_receiving()
238 238 # rescan the graph:
239 239 self.update_graph(None)
240 240
241 241 def _unregister_engine(self, uid):
242 242 """Existing engine with ident `uid` became unavailable."""
243 243 if len(self.targets) == 1:
244 244 # this was our only engine
245 245 self.stop_receiving()
246 246
247 247 # handle any potentially finished tasks:
248 248 self.engine_stream.flush()
249 249
250 250 # don't pop destinations, because they might be used later
251 251 # map(self.destinations.pop, self.completed.pop(uid))
252 252 # map(self.destinations.pop, self.failed.pop(uid))
253 253
254 254 # prevent this engine from receiving work
255 255 idx = self.targets.index(uid)
256 256 self.targets.pop(idx)
257 257 self.loads.pop(idx)
258 258
259 259 # wait 5 seconds before cleaning up pending jobs, since the results might
260 260 # still be incoming
261 261 if self.pending[uid]:
262 262 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
263 263 dc.start()
264 264 else:
265 265 self.completed.pop(uid)
266 266 self.failed.pop(uid)
267 267
268 268
269 @logged
270 269 def handle_stranded_tasks(self, engine):
271 270 """Deal with jobs resident in an engine that died."""
272 271 lost = self.pending[engine]
273 272 for msg_id in lost.keys():
274 273 if msg_id not in self.pending[engine]:
275 274 # prevent double-handling of messages
276 275 continue
277 276
278 277 raw_msg = lost[msg_id][0]
279 278 idents,msg = self.session.feed_identities(raw_msg, copy=False)
280 279 parent = self.session.unpack(msg[1].bytes)
281 280 idents = [engine, idents[0]]
282 281
283 282 # build fake error reply
284 283 try:
285 284 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
286 285 except:
287 286 content = error.wrap_exception()
288 287 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
289 288 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
290 289 # and dispatch it
291 290 self.dispatch_result(raw_reply)
292 291
293 292 # finally scrub completed/failed lists
294 293 self.completed.pop(engine)
295 294 self.failed.pop(engine)
296 295
297 296
298 297 #-----------------------------------------------------------------------
299 298 # Job Submission
300 299 #-----------------------------------------------------------------------
301 @logged
302 300 def dispatch_submission(self, raw_msg):
303 301 """Dispatch job submission to appropriate handlers."""
304 302 # ensure targets up to date:
305 303 self.notifier_stream.flush()
306 304 try:
307 305 idents, msg = self.session.feed_identities(raw_msg, copy=False)
308 306 msg = self.session.unpack_message(msg, content=False, copy=False)
309 307 except Exception:
310 308 self.log.error("task::Invalid task msg: %r"%raw_msg, exc_info=True)
311 309 return
312 310
313 311
314 312 # send to monitor
315 313 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
316 314
317 315 header = msg['header']
318 316 msg_id = header['msg_id']
319 317 self.all_ids.add(msg_id)
320 318
321 319 # targets
322 320 targets = set(header.get('targets', []))
323 321 retries = header.get('retries', 0)
324 322 self.retries[msg_id] = retries
325 323
326 324 # time dependencies
327 325 after = Dependency(header.get('after', []))
328 326 if after.all:
329 327 if after.success:
330 328 after.difference_update(self.all_completed)
331 329 if after.failure:
332 330 after.difference_update(self.all_failed)
333 331 if after.check(self.all_completed, self.all_failed):
334 332 # recast as empty set, if `after` already met,
335 333 # to prevent unnecessary set comparisons
336 334 after = MET
337 335
338 336 # location dependencies
339 337 follow = Dependency(header.get('follow', []))
340 338
341 339 # turn timeouts into datetime objects:
342 340 timeout = header.get('timeout', None)
343 341 if timeout:
344 342 timeout = datetime.now() + timedelta(0,timeout,0)
345 343
346 344 args = [raw_msg, targets, after, follow, timeout]
347 345
348 346 # validate and reduce dependencies:
349 347 for dep in after,follow:
350 348 # check valid:
351 349 if msg_id in dep or dep.difference(self.all_ids):
352 350 self.depending[msg_id] = args
353 351 return self.fail_unreachable(msg_id, error.InvalidDependency)
354 352 # check if unreachable:
355 353 if dep.unreachable(self.all_completed, self.all_failed):
356 354 self.depending[msg_id] = args
357 355 return self.fail_unreachable(msg_id)
358 356
359 357 if after.check(self.all_completed, self.all_failed):
360 358 # time deps already met, try to run
361 359 if not self.maybe_run(msg_id, *args):
362 360 # can't run yet
363 361 if msg_id not in self.all_failed:
364 362 # could have failed as unreachable
365 363 self.save_unmet(msg_id, *args)
366 364 else:
367 365 self.save_unmet(msg_id, *args)
368 366
369 # @logged
370 367 def audit_timeouts(self):
371 368 """Audit all waiting tasks for expired timeouts."""
372 369 now = datetime.now()
373 370 for msg_id in self.depending.keys():
374 371 # must recheck, in case one failure cascaded to another:
375 372 if msg_id in self.depending:
376 373 raw,targets,after,follow,timeout = self.depending[msg_id]
377 374 if timeout and timeout < now:
378 375 self.fail_unreachable(msg_id, error.TaskTimeout)
379 376
380 @logged
381 377 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
382 378 """a task has become unreachable, send a reply with an ImpossibleDependency
383 379 error."""
384 380 if msg_id not in self.depending:
385 self.log.error("msg %r already failed!"%msg_id)
381 self.log.error("msg %r already failed!", msg_id)
386 382 return
387 383 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
388 384 for mid in follow.union(after):
389 385 if mid in self.graph:
390 386 self.graph[mid].remove(msg_id)
391 387
392 388 # FIXME: unpacking a message I've already unpacked, but didn't save:
393 389 idents,msg = self.session.feed_identities(raw_msg, copy=False)
394 390 header = self.session.unpack(msg[1].bytes)
395 391
396 392 try:
397 393 raise why()
398 394 except:
399 395 content = error.wrap_exception()
400 396
401 397 self.all_done.add(msg_id)
402 398 self.all_failed.add(msg_id)
403 399
404 400 msg = self.session.send(self.client_stream, 'apply_reply', content,
405 401 parent=header, ident=idents)
406 402 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
407 403
408 404 self.update_graph(msg_id, success=False)
409 405
410 @logged
411 406 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
412 407 """check location dependencies, and run if they are met."""
413 408 blacklist = self.blacklist.setdefault(msg_id, set())
414 409 if follow or targets or blacklist or self.hwm:
415 410 # we need a can_run filter
416 411 def can_run(idx):
417 412 # check hwm
418 413 if self.hwm and self.loads[idx] == self.hwm:
419 414 return False
420 415 target = self.targets[idx]
421 416 # check blacklist
422 417 if target in blacklist:
423 418 return False
424 419 # check targets
425 420 if targets and target not in targets:
426 421 return False
427 422 # check follow
428 423 return follow.check(self.completed[target], self.failed[target])
429 424
430 425 indices = filter(can_run, range(len(self.targets)))
431 426
432 427 if not indices:
433 428 # couldn't run
434 429 if follow.all:
435 430 # check follow for impossibility
436 431 dests = set()
437 432 relevant = set()
438 433 if follow.success:
439 434 relevant = self.all_completed
440 435 if follow.failure:
441 436 relevant = relevant.union(self.all_failed)
442 437 for m in follow.intersection(relevant):
443 438 dests.add(self.destinations[m])
444 439 if len(dests) > 1:
445 440 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
446 441 self.fail_unreachable(msg_id)
447 442 return False
448 443 if targets:
449 444 # check blacklist+targets for impossibility
450 445 targets.difference_update(blacklist)
451 446 if not targets or not targets.intersection(self.targets):
452 447 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
453 448 self.fail_unreachable(msg_id)
454 449 return False
455 450 return False
456 451 else:
457 452 indices = None
458 453
459 454 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
460 455 return True
461 456
462 @logged
463 457 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
464 458 """Save a message for later submission when its dependencies are met."""
465 459 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
466 460 # track the ids in follow or after, but not those already finished
467 461 for dep_id in after.union(follow).difference(self.all_done):
468 462 if dep_id not in self.graph:
469 463 self.graph[dep_id] = set()
470 464 self.graph[dep_id].add(msg_id)
471 465
472 @logged
473 466 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
474 467 """Submit a task to any of a subset of our targets."""
475 468 if indices:
476 469 loads = [self.loads[i] for i in indices]
477 470 else:
478 471 loads = self.loads
479 472 idx = self.scheme(loads)
480 473 if indices:
481 474 idx = indices[idx]
482 475 target = self.targets[idx]
483 476 # print (target, map(str, msg[:3]))
484 477 # send job to the engine
485 478 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
486 479 self.engine_stream.send_multipart(raw_msg, copy=False)
487 480 # update load
488 481 self.add_job(idx)
489 482 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
490 483 # notify Hub
491 484 content = dict(msg_id=msg_id, engine_id=target)
492 485 self.session.send(self.mon_stream, 'task_destination', content=content,
493 486 ident=['tracktask',self.session.session])
494 487
495 488
496 489 #-----------------------------------------------------------------------
497 490 # Result Handling
498 491 #-----------------------------------------------------------------------
499 @logged
500 492 def dispatch_result(self, raw_msg):
501 493 """dispatch method for result replies"""
502 494 try:
503 495 idents,msg = self.session.feed_identities(raw_msg, copy=False)
504 496 msg = self.session.unpack_message(msg, content=False, copy=False)
505 497 engine = idents[0]
506 498 try:
507 499 idx = self.targets.index(engine)
508 500 except ValueError:
509 501 pass # skip load-update for dead engines
510 502 else:
511 503 self.finish_job(idx)
512 504 except Exception:
513 self.log.error("task::Invalid result: %r"%raw_msg, exc_info=True)
505 self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
514 506 return
515 507
516 508 header = msg['header']
517 509 parent = msg['parent_header']
518 510 if header.get('dependencies_met', True):
519 511 success = (header['status'] == 'ok')
520 512 msg_id = parent['msg_id']
521 513 retries = self.retries[msg_id]
522 514 if not success and retries > 0:
523 515 # failed
524 516 self.retries[msg_id] = retries - 1
525 517 self.handle_unmet_dependency(idents, parent)
526 518 else:
527 519 del self.retries[msg_id]
528 520 # relay to client and update graph
529 521 self.handle_result(idents, parent, raw_msg, success)
530 522 # send to Hub monitor
531 523 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
532 524 else:
533 525 self.handle_unmet_dependency(idents, parent)
534 526
535 @logged
536 527 def handle_result(self, idents, parent, raw_msg, success=True):
537 528 """handle a real task result, either success or failure"""
538 529 # first, relay result to client
539 530 engine = idents[0]
540 531 client = idents[1]
541 532 # swap_ids for XREP-XREP mirror
542 533 raw_msg[:2] = [client,engine]
543 534 # print (map(str, raw_msg[:4]))
544 535 self.client_stream.send_multipart(raw_msg, copy=False)
545 536 # now, update our data structures
546 537 msg_id = parent['msg_id']
547 538 self.blacklist.pop(msg_id, None)
548 539 self.pending[engine].pop(msg_id)
549 540 if success:
550 541 self.completed[engine].add(msg_id)
551 542 self.all_completed.add(msg_id)
552 543 else:
553 544 self.failed[engine].add(msg_id)
554 545 self.all_failed.add(msg_id)
555 546 self.all_done.add(msg_id)
556 547 self.destinations[msg_id] = engine
557 548
558 549 self.update_graph(msg_id, success)
559 550
560 @logged
561 551 def handle_unmet_dependency(self, idents, parent):
562 552 """handle an unmet dependency"""
563 553 engine = idents[0]
564 554 msg_id = parent['msg_id']
565 555
566 556 if msg_id not in self.blacklist:
567 557 self.blacklist[msg_id] = set()
568 558 self.blacklist[msg_id].add(engine)
569 559
570 560 args = self.pending[engine].pop(msg_id)
571 561 raw,targets,after,follow,timeout = args
572 562
573 563 if self.blacklist[msg_id] == targets:
574 564 self.depending[msg_id] = args
575 565 self.fail_unreachable(msg_id)
576 566 elif not self.maybe_run(msg_id, *args):
577 567 # resubmit failed
578 568 if msg_id not in self.all_failed:
579 569 # put it back in our dependency tree
580 570 self.save_unmet(msg_id, *args)
581 571
582 572 if self.hwm:
583 573 try:
584 574 idx = self.targets.index(engine)
585 575 except ValueError:
586 576 pass # skip load-update for dead engines
587 577 else:
588 578 if self.loads[idx] == self.hwm-1:
589 579 self.update_graph(None)
590 580
591 581
592 582
593 @logged
594 583 def update_graph(self, dep_id=None, success=True):
595 584 """dep_id just finished. Update our dependency
596 585 graph and submit any jobs that just became runnable.
597 586
598 587 Called with dep_id=None to update entire graph for hwm, but without finishing
599 588 a task.
600 589 """
601 590 # print ("\n\n***********")
602 591 # pprint (dep_id)
603 592 # pprint (self.graph)
604 593 # pprint (self.depending)
605 594 # pprint (self.all_completed)
606 595 # pprint (self.all_failed)
607 596 # print ("\n\n***********\n\n")
608 597 # update any jobs that depended on the dependency
609 598 jobs = self.graph.pop(dep_id, [])
610 599
611 600 # recheck *all* jobs if
612 601 # a) we have HWM and an engine just become no longer full
613 602 # or b) dep_id was given as None
614 603 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
615 604 jobs = self.depending.keys()
616 605
617 606 for msg_id in jobs:
618 607 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
619 608
620 609 if after.unreachable(self.all_completed, self.all_failed)\
621 610 or follow.unreachable(self.all_completed, self.all_failed):
622 611 self.fail_unreachable(msg_id)
623 612
624 613 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
625 614 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
626 615
627 616 self.depending.pop(msg_id)
628 617 for mid in follow.union(after):
629 618 if mid in self.graph:
630 619 self.graph[mid].remove(msg_id)
631 620
632 621 #----------------------------------------------------------------------
633 622 # methods to be overridden by subclasses
634 623 #----------------------------------------------------------------------
635 624
636 625 def add_job(self, idx):
637 626 """Called after self.targets[idx] just got the job with header.
638 627 Override with subclasses. The default ordering is simple LRU.
639 628 The default loads are the number of outstanding jobs."""
640 629 self.loads[idx] += 1
641 630 for lis in (self.targets, self.loads):
642 631 lis.append(lis.pop(idx))
643 632
644 633
645 634 def finish_job(self, idx):
646 635 """Called after self.targets[idx] just finished a job.
647 636 Override with subclasses."""
648 637 self.loads[idx] -= 1
649 638
650 639
651 640
652 641 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
653 642 logname='root', log_url=None, loglevel=logging.DEBUG,
654 643 identity=b'task', in_thread=False):
655 644
656 645 ZMQStream = zmqstream.ZMQStream
657 646
658 647 if config:
659 648 # unwrap dict back into Config
660 649 config = Config(config)
661 650
662 651 if in_thread:
663 652 # use instance() to get the same Context/Loop as our parent
664 653 ctx = zmq.Context.instance()
665 654 loop = ioloop.IOLoop.instance()
666 655 else:
667 656 # in a process, don't use instance()
668 657 # for safety with multiprocessing
669 658 ctx = zmq.Context()
670 659 loop = ioloop.IOLoop()
671 660 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
672 661 ins.setsockopt(zmq.IDENTITY, identity)
673 662 ins.bind(in_addr)
674 663
675 664 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
676 665 outs.setsockopt(zmq.IDENTITY, identity)
677 666 outs.bind(out_addr)
678 667 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
679 668 mons.connect(mon_addr)
680 669 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
681 670 nots.setsockopt(zmq.SUBSCRIBE, b'')
682 671 nots.connect(not_addr)
683 672
684 673 # setup logging.
685 674 if in_thread:
686 675 log = Application.instance().log
687 676 else:
688 677 if log_url:
689 678 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
690 679 else:
691 680 log = local_logger(logname, loglevel)
692 681
693 682 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
694 683 mon_stream=mons, notifier_stream=nots,
695 684 loop=loop, log=log,
696 685 config=config)
697 686 scheduler.start()
698 687 if not in_thread:
699 688 try:
700 689 loop.start()
701 690 except KeyboardInterrupt:
702 691 print ("interrupted, exiting...", file=sys.__stderr__)
703 692
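
A note on the logging pattern this commit adopts: the remaining log calls now pass their arguments to the logger instead of pre-formatting the message with %. The stdlib logging module defers string interpolation until a handler actually emits the record, so debug calls filtered out by the log level cost almost nothing. A minimal standalone sketch of the two styles (the logger name and arguments below are illustrative, not taken from the scheduler):

import logging

logging.basicConfig(level=logging.INFO)  # DEBUG records will be filtered out
log = logging.getLogger("scheduler")

args, kwargs = (1, 2), {"targets": "all"}

# eager: the message string is built even though the DEBUG record is discarded
log.debug("scheduler::dispatch(*%s,**%s)" % (args, kwargs))

# lazy: interpolation is deferred until a handler actually emits the record
log.debug("scheduler::dispatch(*%s,**%s)", args, kwargs)

For the same reason, the diff drops the @logged decorator from the hot submission and result paths (dispatch_submission, dispatch_result, maybe_run, and related methods): even a deferred debug call adds per-invocation overhead to every task that flows through the scheduler.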