don't perform costly 'difference_update' on dependencies...
MinRK
@@ -1,692 +1,697 @@
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6
7 7 Authors:
8 8
9 9 * Min RK
10 10 """
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2010-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #----------------------------------------------------------------------
19 19 # Imports
20 20 #----------------------------------------------------------------------
21 21
22 22 from __future__ import print_function
23 23
24 24 import logging
25 25 import sys
26 26
27 27 from datetime import datetime, timedelta
28 28 from random import randint, random
29 29 from types import FunctionType
30 30
31 31 try:
32 32 import numpy
33 33 except ImportError:
34 34 numpy = None
35 35
36 36 import zmq
37 37 from zmq.eventloop import ioloop, zmqstream
38 38
39 39 # local imports
40 40 from IPython.external.decorator import decorator
41 41 from IPython.config.application import Application
42 42 from IPython.config.loader import Config
43 43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum
44 44
45 45 from IPython.parallel import error
46 46 from IPython.parallel.factory import SessionFactory
47 47 from IPython.parallel.util import connect_logger, local_logger
48 48
49 49 from .dependency import Dependency
50 50
51 51 @decorator
52 52 def logged(f,self,*args,**kwargs):
53 53 # print ("#--------------------")
54 54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
55 55 # print ("#--")
56 56 return f(self,*args, **kwargs)
57 57
58 58 #----------------------------------------------------------------------
59 59 # Chooser functions
60 60 #----------------------------------------------------------------------
61 61
62 62 def plainrandom(loads):
63 63 """Plain random pick."""
64 64 n = len(loads)
65 65 return randint(0,n-1)
66 66
67 67 def lru(loads):
68 68 """Always pick the front of the line.
69 69
70 70 The content of `loads` is ignored.
71 71
72 72 Assumes LRU ordering of loads, with oldest first.
73 73 """
74 74 return 0
75 75
76 76 def twobin(loads):
77 77 """Pick two at random, use the LRU of the two.
78 78
79 79 The content of loads is ignored.
80 80
81 81 Assumes LRU ordering of loads, with oldest first.
82 82 """
83 83 n = len(loads)
84 84 a = randint(0,n-1)
85 85 b = randint(0,n-1)
86 86 return min(a,b)
87 87
88 88 def weighted(loads):
89 89 """Pick two at random using inverse load as weight.
90 90
91 91 Return the less loaded of the two.
92 92 """
93 93 # weight a load of 0 a million times more than a load of 1:
94 94 weights = 1./(1e-6+numpy.array(loads))
95 95 sums = weights.cumsum()
96 96 t = sums[-1]
97 97 x = random()*t
98 98 y = random()*t
99 99 idx = 0
100 100 idy = 0
101 101 while sums[idx] < x:
102 102 idx += 1
103 103 while sums[idy] < y:
104 104 idy += 1
105 105 if weights[idy] > weights[idx]:
106 106 return idy
107 107 else:
108 108 return idx
109 109
110 110 def leastload(loads):
111 111 """Always choose the lowest load.
112 112
113 113 If the lowest load occurs more than once, the first
114 114 occurrence will be used. If loads has LRU ordering, this means
115 115 the LRU of those with the lowest load is chosen.
116 116 """
117 117 return loads.index(min(loads))
118 118
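For illustration only, here is how the chooser schemes above behave on a small load list (these calls assume the functions defined in this section; 'weighted' additionally requires numpy):

    loads = [3, 0, 1, 2]   # one entry per engine, LRU-ordered (oldest first)
    leastload(loads)       # -> 1, the index of the minimum load
    lru(loads)             # -> 0, always the head of the line
    twobin(loads)          # min of two random indices, so older engines are favored
    weighted(loads)        # inverse-load weighted random pick (needs numpy)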
119 119 #---------------------------------------------------------------------
120 120 # Classes
121 121 #---------------------------------------------------------------------
122 122 # store empty default dependency:
123 123 MET = Dependency([])
124 124
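As a hedged sanity check of the sentinel above (Dependency subclasses set in IPython.parallel, and an empty dependency counts as already satisfied):

    assert MET.check(set(), set())   # an empty dependency is trivially met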
125 125 class TaskScheduler(SessionFactory):
126 126 """Python TaskScheduler object.
127 127
128 128 This is the simplest object that supports msg_id based
129 129 DAG dependencies. *Only* task msg_ids are checked, not
130 130 msg_ids of jobs submitted via the MUX queue.
131 131
132 132 """
133 133
134 134 hwm = Int(0, config=True, shortname='hwm',
135 135 help="""specify the High Water Mark (HWM) for the downstream
136 136 socket in the Task scheduler. This is the maximum number
137 137 of allowed outstanding tasks on each engine."""
138 138 )
139 139 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
140 140 'leastload', config=True, shortname='scheme', allow_none=False,
141 141 help="""select the task scheduler scheme [default: Python LRU]
142 142 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
143 143 )
144 144 def _scheme_name_changed(self, old, new):
145 145 self.log.debug("Using scheme %r"%new)
146 146 self.scheme = globals()[new]
147 147
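These traits are read from the configuration; a minimal hedged sketch of setting them via a Config object (the attribute names match the traits above, but the surrounding machinery normally supplies this):

    from IPython.config.loader import Config
    c = Config()
    c.TaskScheduler.hwm = 1                   # at most 1 outstanding task per engine
    c.TaskScheduler.scheme_name = 'weighted'  # instead of the default 'leastload'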
148 148 # input arguments:
149 149 scheme = Instance(FunctionType) # function for determining the destination
150 150 def _scheme_default(self):
151 151 return leastload
152 152 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
153 153 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
154 154 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
155 155 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
156 156
157 157 # internals:
158 158 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
159 159 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
160 160 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
161 161 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
162 162 pending = Dict() # dict by engine_uuid of submitted tasks
163 163 completed = Dict() # dict by engine_uuid of completed tasks
164 164 failed = Dict() # dict by engine_uuid of failed tasks
165 165 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
166 166 clients = Dict() # dict by msg_id for who submitted the task
167 167 targets = List() # list of target IDENTs
168 168 loads = List() # list of engine loads
169 169 # full = Set() # set of IDENTs that have HWM outstanding tasks
170 170 all_completed = Set() # set of all completed tasks
171 171 all_failed = Set() # set of all failed tasks
172 172 all_done = Set() # set of all finished tasks=union(completed,failed)
173 173 all_ids = Set() # set of all submitted task IDs
174 174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
175 175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
176 176
177 177
178 178 def start(self):
179 179 self.engine_stream.on_recv(self.dispatch_result, copy=False)
180 180 self._notification_handlers = dict(
181 181 registration_notification = self._register_engine,
182 182 unregistration_notification = self._unregister_engine
183 183 )
184 184 self.notifier_stream.on_recv(self.dispatch_notification)
185 185 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2 seconds
186 186 self.auditor.start()
187 187 self.log.info("Scheduler started [%s]"%self.scheme_name)
188 188
189 189 def resume_receiving(self):
190 190 """Resume accepting jobs."""
191 191 self.client_stream.on_recv(self.dispatch_submission, copy=False)
192 192
193 193 def stop_receiving(self):
194 194 """Stop accepting jobs while there are no engines.
195 195 Leave them in the ZMQ queue."""
196 196 self.client_stream.on_recv(None)
197 197
198 198 #-----------------------------------------------------------------------
199 199 # [Un]Registration Handling
200 200 #-----------------------------------------------------------------------
201 201
202 202 def dispatch_notification(self, msg):
203 203 """dispatch register/unregister events."""
204 204 try:
205 205 idents,msg = self.session.feed_identities(msg)
206 206 except ValueError:
207 207 self.log.warn("task::Invalid Message: %r"%msg)
208 208 return
209 209 try:
210 210 msg = self.session.unpack_message(msg)
211 211 except ValueError:
212 212 self.log.warn("task::Unauthorized message from: %r"%idents)
213 213 return
214 214
215 215 msg_type = msg['msg_type']
216 216
217 217 handler = self._notification_handlers.get(msg_type, None)
218 218 if handler is None:
219 219 self.log.error("Unhandled message type: %r"%msg_type)
220 220 else:
221 221 try:
222 222 handler(str(msg['content']['queue']))
223 223 except KeyError:
224 224 self.log.error("task::Invalid notification msg: %r"%msg)
225 225
226 @logged
227 226 def _register_engine(self, uid):
228 227 """New engine with ident `uid` became available."""
229 228 # head of the line:
230 229 self.targets.insert(0,uid)
231 230 self.loads.insert(0,0)
232 231 # initialize sets
233 232 self.completed[uid] = set()
234 233 self.failed[uid] = set()
235 234 self.pending[uid] = {}
236 235 if len(self.targets) == 1:
237 236 self.resume_receiving()
238 237 # rescan the graph:
239 238 self.update_graph(None)
240 239
241 240 def _unregister_engine(self, uid):
242 241 """Existing engine with ident `uid` became unavailable."""
243 242 if len(self.targets) == 1:
244 243 # this was our only engine
245 244 self.stop_receiving()
246 245
247 246 # handle any potentially finished tasks:
248 247 self.engine_stream.flush()
249 248
250 249 # don't pop destinations, because they might be used later
251 250 # map(self.destinations.pop, self.completed.pop(uid))
252 251 # map(self.destinations.pop, self.failed.pop(uid))
253 252
254 253 # prevent this engine from receiving work
255 254 idx = self.targets.index(uid)
256 255 self.targets.pop(idx)
257 256 self.loads.pop(idx)
258 257
259 258 # wait 5 seconds before cleaning up pending jobs, since the results might
260 259 # still be incoming
261 260 if self.pending[uid]:
262 261 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
263 262 dc.start()
264 263 else:
265 264 self.completed.pop(uid)
266 265 self.failed.pop(uid)
267 266
268 267
269 268 def handle_stranded_tasks(self, engine):
270 269 """Deal with jobs resident in an engine that died."""
271 270 lost = self.pending[engine]
272 271 for msg_id in lost.keys():
273 272 if msg_id not in self.pending[engine]:
274 273 # prevent double-handling of messages
275 274 continue
276 275
277 276 raw_msg = lost[msg_id][0]
278 277 idents,msg = self.session.feed_identities(raw_msg, copy=False)
279 278 parent = self.session.unpack(msg[1].bytes)
280 279 idents = [engine, idents[0]]
281 280
282 281 # build fake error reply
283 282 try:
284 283 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
285 284 except:
286 285 content = error.wrap_exception()
287 286 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
288 287 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
289 288 # and dispatch it
290 289 self.dispatch_result(raw_reply)
291 290
292 291 # finally scrub completed/failed lists
293 292 self.completed.pop(engine)
294 293 self.failed.pop(engine)
295 294
296 295
297 296 #-----------------------------------------------------------------------
298 297 # Job Submission
299 298 #-----------------------------------------------------------------------
300 299 def dispatch_submission(self, raw_msg):
301 300 """Dispatch job submission to appropriate handlers."""
302 301 # ensure targets up to date:
303 302 self.notifier_stream.flush()
304 303 try:
305 304 idents, msg = self.session.feed_identities(raw_msg, copy=False)
306 305 msg = self.session.unpack_message(msg, content=False, copy=False)
307 306 except Exception:
308 307 self.log.error("task::Invalid task msg: %r"%raw_msg, exc_info=True)
309 308 return
310 309
311 310
312 311 # send to monitor
313 312 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
314 313
315 314 header = msg['header']
316 315 msg_id = header['msg_id']
317 316 self.all_ids.add(msg_id)
318 317
319 318 # targets
320 319 targets = set(header.get('targets', []))
321 320 retries = header.get('retries', 0)
322 321 self.retries[msg_id] = retries
323 322
324 323 # time dependencies
325 after = Dependency(header.get('after', []))
324 after = header.get('after', None)
325 if after:
326 after = Dependency(after)
326 327 if after.all:
327 328 if after.success:
328 after.difference_update(self.all_completed)
329 after = after.difference(self.all_completed)
329 330 if after.failure:
330 after.difference_update(self.all_failed)
331 after = after.difference(self.all_failed)
331 332 if after.check(self.all_completed, self.all_failed):
332 333 # recast as empty set, if `after` already met,
333 334 # to prevent unnecessary set comparisons
334 335 after = MET
336 else:
337 after = MET
335 338
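Why the switch from difference_update to difference above matters (an illustrative sketch based on CPython set semantics): s.difference(t) iterates over s and probes t, whereas s.difference_update(t) iterates over t; since all_completed and all_failed grow without bound, the old in-place form made every submission cost O(total finished tasks):

    small = set(['msg_a', 'msg_b'])                  # a typical 'after' dependency
    big = set('msg_%i' % i for i in range(100000))   # stand-in for all_completed
    pruned = small.difference(big)                   # cost scales with len(small)
    small.difference_update(big)                     # cost scales with len(big)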
336 339 # location dependencies
337 340 follow = Dependency(header.get('follow', []))
338 341
339 342 # turn timeouts into datetime objects:
340 343 timeout = header.get('timeout', None)
341 344 if timeout:
342 345 timeout = datetime.now() + timedelta(0,timeout,0)
343 346
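For reference, the timeout header is a relative number of seconds, and timedelta's positional arguments here are (days, seconds, microseconds):

    deadline = datetime.now() + timedelta(0, 30, 0)   # a timeout of 30 seconds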
344 347 args = [raw_msg, targets, after, follow, timeout]
345 348
346 349 # validate and reduce dependencies:
347 350 for dep in after,follow:
351 if not dep: # empty dependency
352 continue
348 353 # check valid:
349 354 if msg_id in dep or dep.difference(self.all_ids):
350 355 self.depending[msg_id] = args
351 356 return self.fail_unreachable(msg_id, error.InvalidDependency)
352 357 # check if unreachable:
353 358 if dep.unreachable(self.all_completed, self.all_failed):
354 359 self.depending[msg_id] = args
355 360 return self.fail_unreachable(msg_id)
356 361
357 362 if after.check(self.all_completed, self.all_failed):
358 363 # time deps already met, try to run
359 364 if not self.maybe_run(msg_id, *args):
360 365 # can't run yet
361 366 if msg_id not in self.all_failed:
362 367 # could have failed as unreachable
363 368 self.save_unmet(msg_id, *args)
364 369 else:
365 370 self.save_unmet(msg_id, *args)
366 371
367 372 def audit_timeouts(self):
368 373 """Audit all waiting tasks for expired timeouts."""
369 374 now = datetime.now()
370 375 for msg_id in self.depending.keys():
371 376 # must recheck, in case one failure cascaded to another:
372 377 if msg_id in self.depending:
373 378 raw,targets,after,follow,timeout = self.depending[msg_id]
374 379 if timeout and timeout < now:
375 380 self.fail_unreachable(msg_id, error.TaskTimeout)
376 381
377 382 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
378 383 """a task has become unreachable, send a reply with an ImpossibleDependency
379 384 error."""
380 385 if msg_id not in self.depending:
381 386 self.log.error("msg %r already failed!", msg_id)
382 387 return
383 388 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
384 389 for mid in follow.union(after):
385 390 if mid in self.graph:
386 391 self.graph[mid].remove(msg_id)
387 392
388 393 # FIXME: unpacking a message I've already unpacked, but didn't save:
389 394 idents,msg = self.session.feed_identities(raw_msg, copy=False)
390 395 header = self.session.unpack(msg[1].bytes)
391 396
392 397 try:
393 398 raise why()
394 399 except:
395 400 content = error.wrap_exception()
396 401
397 402 self.all_done.add(msg_id)
398 403 self.all_failed.add(msg_id)
399 404
400 405 msg = self.session.send(self.client_stream, 'apply_reply', content,
401 406 parent=header, ident=idents)
402 407 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
403 408
404 409 self.update_graph(msg_id, success=False)
405 410
406 411 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
407 412 """check location dependencies, and run if they are met."""
408 413 blacklist = self.blacklist.setdefault(msg_id, set())
409 414 if follow or targets or blacklist or self.hwm:
410 415 # we need a can_run filter
411 416 def can_run(idx):
412 417 # check hwm
413 418 if self.hwm and self.loads[idx] == self.hwm:
414 419 return False
415 420 target = self.targets[idx]
416 421 # check blacklist
417 422 if target in blacklist:
418 423 return False
419 424 # check targets
420 425 if targets and target not in targets:
421 426 return False
422 427 # check follow
423 428 return follow.check(self.completed[target], self.failed[target])
424 429
425 430 indices = filter(can_run, range(len(self.targets)))
426 431
427 432 if not indices:
428 433 # couldn't run
429 434 if follow.all:
430 435 # check follow for impossibility
431 436 dests = set()
432 437 relevant = set()
433 438 if follow.success:
434 439 relevant = self.all_completed
435 440 if follow.failure:
436 441 relevant = relevant.union(self.all_failed)
437 442 for m in follow.intersection(relevant):
438 443 dests.add(self.destinations[m])
439 444 if len(dests) > 1:
440 445 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
441 446 self.fail_unreachable(msg_id)
442 447 return False
443 448 if targets:
444 449 # check blacklist+targets for impossibility
445 450 targets.difference_update(blacklist)
446 451 if not targets or not targets.intersection(self.targets):
447 452 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
448 453 self.fail_unreachable(msg_id)
449 454 return False
450 455 return False
451 456 else:
452 457 indices = None
453 458
454 459 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
455 460 return True
456 461
457 462 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
458 463 """Save a message for later submission when its dependencies are met."""
459 464 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
460 465 # track the ids in follow or after, but not those already finished
461 466 for dep_id in after.union(follow).difference(self.all_done):
462 467 if dep_id not in self.graph:
463 468 self.graph[dep_id] = set()
464 469 self.graph[dep_id].add(msg_id)
465 470
466 471 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
467 472 """Submit a task to any of a subset of our targets."""
468 473 if indices:
469 474 loads = [self.loads[i] for i in indices]
470 475 else:
471 476 loads = self.loads
472 477 idx = self.scheme(loads)
473 478 if indices:
474 479 idx = indices[idx]
475 480 target = self.targets[idx]
476 481 # print (target, map(str, msg[:3]))
477 482 # send job to the engine
478 483 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
479 484 self.engine_stream.send_multipart(raw_msg, copy=False)
480 485 # update load
481 486 self.add_job(idx)
482 487 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
483 488 # notify Hub
484 489 content = dict(msg_id=msg_id, engine_id=target)
485 490 self.session.send(self.mon_stream, 'task_destination', content=content,
486 491 ident=['tracktask',self.session.session])
487 492
488 493
489 494 #-----------------------------------------------------------------------
490 495 # Result Handling
491 496 #-----------------------------------------------------------------------
492 497 def dispatch_result(self, raw_msg):
493 498 """dispatch method for result replies"""
494 499 try:
495 500 idents,msg = self.session.feed_identities(raw_msg, copy=False)
496 501 msg = self.session.unpack_message(msg, content=False, copy=False)
497 502 engine = idents[0]
498 503 try:
499 504 idx = self.targets.index(engine)
500 505 except ValueError:
501 506 pass # skip load-update for dead engines
502 507 else:
503 508 self.finish_job(idx)
504 509 except Exception:
505 510 self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
506 511 return
507 512
508 513 header = msg['header']
509 514 parent = msg['parent_header']
510 515 if header.get('dependencies_met', True):
511 516 success = (header['status'] == 'ok')
512 517 msg_id = parent['msg_id']
513 518 retries = self.retries[msg_id]
514 519 if not success and retries > 0:
515 520 # failed
516 521 self.retries[msg_id] = retries - 1
517 522 self.handle_unmet_dependency(idents, parent)
518 523 else:
519 524 del self.retries[msg_id]
520 525 # relay to client and update graph
521 526 self.handle_result(idents, parent, raw_msg, success)
522 527 # send to Hub monitor
523 528 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
524 529 else:
525 530 self.handle_unmet_dependency(idents, parent)
526 531
527 532 def handle_result(self, idents, parent, raw_msg, success=True):
528 533 """handle a real task result, either success or failure"""
529 534 # first, relay result to client
530 535 engine = idents[0]
531 536 client = idents[1]
532 537 # swap_ids for XREP-XREP mirror
533 538 raw_msg[:2] = [client,engine]
534 539 # print (map(str, raw_msg[:4]))
535 540 self.client_stream.send_multipart(raw_msg, copy=False)
536 541 # now, update our data structures
537 542 msg_id = parent['msg_id']
538 543 self.blacklist.pop(msg_id, None)
539 544 self.pending[engine].pop(msg_id)
540 545 if success:
541 546 self.completed[engine].add(msg_id)
542 547 self.all_completed.add(msg_id)
543 548 else:
544 549 self.failed[engine].add(msg_id)
545 550 self.all_failed.add(msg_id)
546 551 self.all_done.add(msg_id)
547 552 self.destinations[msg_id] = engine
548 553
549 554 self.update_graph(msg_id, success)
550 555
551 556 def handle_unmet_dependency(self, idents, parent):
552 557 """handle an unmet dependency"""
553 558 engine = idents[0]
554 559 msg_id = parent['msg_id']
555 560
556 561 if msg_id not in self.blacklist:
557 562 self.blacklist[msg_id] = set()
558 563 self.blacklist[msg_id].add(engine)
559 564
560 565 args = self.pending[engine].pop(msg_id)
561 566 raw,targets,after,follow,timeout = args
562 567
563 568 if self.blacklist[msg_id] == targets:
564 569 self.depending[msg_id] = args
565 570 self.fail_unreachable(msg_id)
566 571 elif not self.maybe_run(msg_id, *args):
567 572 # resubmit failed
568 573 if msg_id not in self.all_failed:
569 574 # put it back in our dependency tree
570 575 self.save_unmet(msg_id, *args)
571 576
572 577 if self.hwm:
573 578 try:
574 579 idx = self.targets.index(engine)
575 580 except ValueError:
576 581 pass # skip load-update for dead engines
577 582 else:
578 583 if self.loads[idx] == self.hwm-1:
579 584 self.update_graph(None)
580 585
581 586
582 587
583 588 def update_graph(self, dep_id=None, success=True):
584 589 """dep_id just finished. Update our dependency
585 590 graph and submit any jobs that just became runnable.
586 591
587 592 Called with dep_id=None to update entire graph for hwm, but without finishing
588 593 a task.
589 594 """
590 595 # print ("\n\n***********")
591 596 # pprint (dep_id)
592 597 # pprint (self.graph)
593 598 # pprint (self.depending)
594 599 # pprint (self.all_completed)
595 600 # pprint (self.all_failed)
596 601 # print ("\n\n***********\n\n")
597 602 # update any jobs that depended on the dependency
598 603 jobs = self.graph.pop(dep_id, [])
599 604
600 605 # recheck *all* jobs if
601 606 # a) we have HWM and an engine just became no longer full,
602 607 # or b) dep_id was given as None
603 608 if dep_id is None or (self.hwm and any(load == self.hwm - 1 for load in self.loads)):
604 609 jobs = self.depending.keys()
605 610
606 611 for msg_id in jobs:
607 612 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
608 613
609 614 if after.unreachable(self.all_completed, self.all_failed)\
610 615 or follow.unreachable(self.all_completed, self.all_failed):
611 616 self.fail_unreachable(msg_id)
612 617
613 618 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
614 619 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
615 620
616 621 self.depending.pop(msg_id)
617 622 for mid in follow.union(after):
618 623 if mid in self.graph:
619 624 self.graph[mid].remove(msg_id)
620 625
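A toy sketch of the graph bookkeeping this method relies on (msg_ids are hypothetical):

    graph = {'a': set(['b', 'c'])}   # 'b' and 'c' both depend on 'a'
    jobs = graph.pop('a', [])        # 'a' just finished: recheck 'b' and 'c'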
621 626 #----------------------------------------------------------------------
622 627 # methods to be overridden by subclasses
623 628 #----------------------------------------------------------------------
624 629
625 630 def add_job(self, idx):
626 631 """Called after self.targets[idx] just got the job with header.
627 632 Override with subclasses. The default ordering is simple LRU.
628 633 The default loads are the number of outstanding jobs."""
629 634 self.loads[idx] += 1
630 635 for lis in (self.targets, self.loads):
631 636 lis.append(lis.pop(idx))
632 637
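A worked example of the pop/append rotation in add_job above, which keeps both lists LRU-ordered (engine idents are hypothetical):

    targets, loads = ['e0', 'e1', 'e2'], [0, 1, 0]
    idx = 0
    loads[idx] += 1
    for lis in (targets, loads):
        lis.append(lis.pop(idx))
    # targets == ['e1', 'e2', 'e0'], loads == [1, 0, 1]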
633 638
634 639 def finish_job(self, idx):
635 640 """Called after self.targets[idx] just finished a job.
636 641 Override with subclasses."""
637 642 self.loads[idx] -= 1
638 643
639 644
640 645
641 646 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
642 647 logname='root', log_url=None, loglevel=logging.DEBUG,
643 648 identity=b'task', in_thread=False):
644 649
645 650 ZMQStream = zmqstream.ZMQStream
646 651
647 652 if config:
648 653 # unwrap dict back into Config
649 654 config = Config(config)
650 655
651 656 if in_thread:
652 657 # use instance() to get the same Context/Loop as our parent
653 658 ctx = zmq.Context.instance()
654 659 loop = ioloop.IOLoop.instance()
655 660 else:
656 661 # in a process, don't use instance()
657 662 # for safety with multiprocessing
658 663 ctx = zmq.Context()
659 664 loop = ioloop.IOLoop()
660 665 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
661 666 ins.setsockopt(zmq.IDENTITY, identity)
662 667 ins.bind(in_addr)
663 668
664 669 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
665 670 outs.setsockopt(zmq.IDENTITY, identity)
666 671 outs.bind(out_addr)
667 672 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
668 673 mons.connect(mon_addr)
669 674 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
670 675 nots.setsockopt(zmq.SUBSCRIBE, b'')
671 676 nots.connect(not_addr)
672 677
673 678 # setup logging.
674 679 if in_thread:
675 680 log = Application.instance().log
676 681 else:
677 682 if log_url:
678 683 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
679 684 else:
680 685 log = local_logger(logname, loglevel)
681 686
682 687 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
683 688 mon_stream=mons, notifier_stream=nots,
684 689 loop=loop, log=log,
685 690 config=config)
686 691 scheduler.start()
687 692 if not in_thread:
688 693 try:
689 694 loop.start()
690 695 except KeyboardInterrupt:
691 696 print ("interrupted, exiting...", file=sys.__stderr__)
692 697
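A hedged usage sketch of launch_scheduler (the addresses are hypothetical; in IPython.parallel the controller normally wires these up, and the call blocks in loop.start() unless in_thread=True):

    launch_scheduler('tcp://127.0.0.1:5555',  # in_addr: client-facing XREP
                     'tcp://127.0.0.1:5556',  # out_addr: engine-facing XREP
                     'tcp://127.0.0.1:5557',  # mon_addr: monitor PUB
                     'tcp://127.0.0.1:5558',  # not_addr: notification SUB
                     logname='task-scheduler')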