pass config obj to Scheduler as dict...
MinRK
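Note on the change itself: a traitlets Config is a dict subclass, so casting it to a plain dict before handing it to the child Process keeps the kwargs safely picklable, and launch_scheduler rebuilds a Config on the other side. A minimal round-trip sketch (the sub-config name is hypothetical):

    from IPython.config.loader import Config

    cfg = Config()
    cfg.TaskScheduler.some_option = 'value'  # hypothetical setting

    plain = dict(cfg)        # what ControllerFactory now passes to Process(...)
    rebuilt = Config(plain)  # what launch_scheduler does with it on arrival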
@@ -1,117 +1,119 @@
1 1 #!/usr/bin/env python
2 2 """The IPython Controller with 0MQ
3 3 This is a collection of one Hub and several Schedulers.
4 4 """
5 5 #-----------------------------------------------------------------------------
6 6 # Copyright (C) 2010 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is in
9 9 # the file COPYING, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15 from __future__ import print_function
16 16
17 17 from multiprocessing import Process
18 18
19 19 import zmq
20 20 from zmq.devices import ProcessMonitoredQueue
21 21 # internal:
22 22 from IPython.utils.importstring import import_item
23 23 from IPython.utils.traitlets import Int, CStr, Instance, List, Bool
24 24
25 25 from IPython.parallel.util import signal_children
26 26 from .hub import Hub, HubFactory
27 27 from .scheduler import launch_scheduler
28 28
29 29 #-----------------------------------------------------------------------------
30 30 # Configurable
31 31 #-----------------------------------------------------------------------------
32 32
33 33
34 34 class ControllerFactory(HubFactory):
35 35 """Configurable for setting up a Hub and Schedulers."""
36 36
37 37 usethreads = Bool(False, config=True)
38 38 # pure-zmq downstream HWM
39 39 hwm = Int(0, config=True)
40 40
41 41 # internal
42 42 children = List()
43 43 mq_class = CStr('zmq.devices.ProcessMonitoredQueue')
44 44
45 45 def _usethreads_changed(self, name, old, new):
46 46 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
47 47
48 48 def __init__(self, **kwargs):
49 49 super(ControllerFactory, self).__init__(**kwargs)
50 50 self.subconstructors.append(self.construct_schedulers)
51 51
52 52 def start(self):
53 53 super(ControllerFactory, self).start()
54 54 child_procs = []
55 55 for child in self.children:
56 56 child.start()
57 57 if isinstance(child, ProcessMonitoredQueue):
58 58 child_procs.append(child.launcher)
59 59 elif isinstance(child, Process):
60 60 child_procs.append(child)
61 61 if child_procs:
62 62 signal_children(child_procs)
63 63
64 64
65 65 def construct_schedulers(self):
66 66 children = self.children
67 67 mq = import_item(self.mq_class)
68 68
69 69 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
70 70 # IOPub relay (in a Process)
71 71 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
72 72 q.bind_in(self.client_info['iopub'])
73 73 q.bind_out(self.engine_info['iopub'])
74 74 q.setsockopt_out(zmq.SUBSCRIBE, '')
75 75 q.connect_mon(self.monitor_url)
76 76 q.daemon=True
77 77 children.append(q)
78 78
79 79 # Multiplexer Queue (in a Process)
80 80 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
81 81 q.bind_in(self.client_info['mux'])
82 82 q.setsockopt_in(zmq.IDENTITY, 'mux')
83 83 q.bind_out(self.engine_info['mux'])
84 84 q.connect_mon(self.monitor_url)
85 85 q.daemon=True
86 86 children.append(q)
87 87
88 88 # Control Queue (in a Process)
89 89 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
90 90 q.bind_in(self.client_info['control'])
91 91 q.setsockopt_in(zmq.IDENTITY, 'control')
92 92 q.bind_out(self.engine_info['control'])
93 93 q.connect_mon(self.monitor_url)
94 94 q.daemon=True
95 95 children.append(q)
96 96 # Task Queue (in a Process)
97 97 if self.scheme == 'pure':
98 98 self.log.warn("task::using pure XREQ Task scheduler")
99 99 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
100 100 q.setsockopt_out(zmq.HWM, self.hwm)
101 101 q.bind_in(self.client_info['task'][1])
102 102 q.setsockopt_in(zmq.IDENTITY, 'task')
103 103 q.bind_out(self.engine_info['task'])
104 104 q.connect_mon(self.monitor_url)
105 105 q.daemon=True
106 106 children.append(q)
107 107 elif self.scheme == 'none':
108 108 self.log.warn("task::using no Task scheduler")
109 109
110 110 else:
111 111 self.log.info("task::using Python %s Task scheduler"%self.scheme)
112 sargs = (self.client_info['task'][1], self.engine_info['task'], self.monitor_url, self.client_info['notification'])
113 kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level, config=self.config)
112 sargs = (self.client_info['task'][1], self.engine_info['task'],
113 self.monitor_url, self.client_info['notification'])
114 kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level,
115 config=dict(self.config))
114 116 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
115 117 q.daemon=True
116 118 children.append(q)
117 119
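For context, the relay pattern construct_schedulers builds three times above, reduced to a standalone sketch (addresses are illustrative; 'in'/'out' are the prefixes stamped on the monitored copies):

    import zmq
    from zmq.devices import ThreadMonitoredQueue

    # relay XREP<->XREP traffic while publishing a prefixed copy of every
    # message on a PUB monitor socket, as the MUX and control queues do
    q = ThreadMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
    q.bind_in('tcp://127.0.0.1:10101')      # client-facing side
    q.bind_out('tcp://127.0.0.1:10102')     # engine-facing side
    q.connect_mon('tcp://127.0.0.1:10103')  # Hub's monitor socket
    q.daemon = True
    q.start()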
@@ -1,592 +1,596 @@
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #----------------------------------------------------------------------
15 15 # Imports
16 16 #----------------------------------------------------------------------
17 17
18 18 from __future__ import print_function
19 19
20 20 import logging
21 21 import sys
22 22
23 23 from datetime import datetime, timedelta
24 24 from random import randint, random
25 25 from types import FunctionType
26 26
27 27 try:
28 28 import numpy
29 29 except ImportError:
30 30 numpy = None
31 31
32 32 import zmq
33 33 from zmq.eventloop import ioloop, zmqstream
34 34
35 35 # local imports
36 36 from IPython.external.decorator import decorator
37 from IPython.config.loader import Config
37 38 from IPython.utils.traitlets import Instance, Dict, List, Set
38 39
39 40 from IPython.parallel import error
40 41 from IPython.parallel.factory import SessionFactory
41 42 from IPython.parallel.util import connect_logger, local_logger
42 43
43 44 from .dependency import Dependency
44 45
45 46 @decorator
46 47 def logged(f,self,*args,**kwargs):
47 48 # print ("#--------------------")
48 49 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
49 50 # print ("#--")
50 51 return f(self,*args, **kwargs)
51 52
52 53 #----------------------------------------------------------------------
53 54 # Chooser functions
54 55 #----------------------------------------------------------------------
55 56
56 57 def plainrandom(loads):
57 58 """Plain random pick."""
58 59 n = len(loads)
59 60 return randint(0,n-1)
60 61
61 62 def lru(loads):
62 63 """Always pick the front of the line.
63 64
64 65 The content of `loads` is ignored.
65 66
66 67 Assumes LRU ordering of loads, with oldest first.
67 68 """
68 69 return 0
69 70
70 71 def twobin(loads):
71 72 """Pick two at random, use the LRU of the two.
72 73
73 74 The content of loads is ignored.
74 75
75 76 Assumes LRU ordering of loads, with oldest first.
76 77 """
77 78 n = len(loads)
78 79 a = randint(0,n-1)
79 80 b = randint(0,n-1)
80 81 return min(a,b)
81 82
82 83 def weighted(loads):
83 84 """Pick two at random using inverse load as weight.
84 85
85 86 Return the less loaded of the two.
86 87 """
87 88 # weight 0 a million times more than 1:
88 89 weights = 1./(1e-6+numpy.array(loads))
89 90 sums = weights.cumsum()
90 91 t = sums[-1]
91 92 x = random()*t
92 93 y = random()*t
93 94 idx = 0
94 95 idy = 0
95 96 while sums[idx] < x:
96 97 idx += 1
97 98 while sums[idy] < y:
98 99 idy += 1
99 100 if weights[idy] > weights[idx]:
100 101 return idy
101 102 else:
102 103 return idx
103 104
104 105 def leastload(loads):
105 106 """Always choose the lowest load.
106 107
107 108 If the lowest load occurs more than once, the first
108 109 occurrence will be used. If loads has LRU ordering, this means
109 110 the LRU of those with the lowest load is chosen.
110 111 """
111 112 return loads.index(min(loads))
112 113
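A quick illustration of how the choosers map a load list to an engine index (the list is assumed LRU-ordered, oldest first; values are made up):

    loads = [2, 0, 5, 1]

    leastload(loads)    # -> 1: index of the minimum load
    lru(loads)          # -> 0: always the head of the line
    plainrandom(loads)  # -> uniform over range(len(loads))
    twobin(loads)       # -> min of two random indices, i.e. the older pick
    # weighted() samples two indices with probability proportional to
    # 1/load and keeps the lighter of the two; it is the only chooser
    # that needs numpy.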
113 114 #---------------------------------------------------------------------
114 115 # Classes
115 116 #---------------------------------------------------------------------
116 117 # store empty default dependency:
117 118 MET = Dependency([])
118 119
119 120 class TaskScheduler(SessionFactory):
120 121 """Python TaskScheduler object.
121 122
122 123 This is the simplest object that supports msg_id based
123 124 DAG dependencies. *Only* task msg_ids are checked, not
124 125 msg_ids of jobs submitted via the MUX queue.
125 126
126 127 """
127 128
128 129 # input arguments:
129 130 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
130 131 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
131 132 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
132 133 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
133 134 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
134 135
135 136 # internals:
136 137 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
137 138 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
138 139 pending = Dict() # dict by engine_uuid of submitted tasks
139 140 completed = Dict() # dict by engine_uuid of completed tasks
140 141 failed = Dict() # dict by engine_uuid of failed tasks
141 142 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
142 143 clients = Dict() # dict by msg_id for who submitted the task
143 144 targets = List() # list of target IDENTs
144 145 loads = List() # list of engine loads
145 146 all_completed = Set() # set of all completed tasks
146 147 all_failed = Set() # set of all failed tasks
147 148 all_done = Set() # set of all finished tasks=union(completed,failed)
148 149 all_ids = Set() # set of all submitted task IDs
149 150 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
150 151 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
151 152
152 153
153 154 def start(self):
154 155 self.engine_stream.on_recv(self.dispatch_result, copy=False)
155 156 self._notification_handlers = dict(
156 157 registration_notification = self._register_engine,
157 158 unregistration_notification = self._unregister_engine
158 159 )
159 160 self.notifier_stream.on_recv(self.dispatch_notification)
160 161 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2s (0.5 Hz)
161 162 self.auditor.start()
162 163 self.log.info("Scheduler started...%r"%self)
163 164
164 165 def resume_receiving(self):
165 166 """Resume accepting jobs."""
166 167 self.client_stream.on_recv(self.dispatch_submission, copy=False)
167 168
168 169 def stop_receiving(self):
169 170 """Stop accepting jobs while there are no engines.
170 171 Leave them in the ZMQ queue."""
171 172 self.client_stream.on_recv(None)
172 173
173 174 #-----------------------------------------------------------------------
174 175 # [Un]Registration Handling
175 176 #-----------------------------------------------------------------------
176 177
177 178 def dispatch_notification(self, msg):
178 179 """dispatch register/unregister events."""
179 180 idents,msg = self.session.feed_identities(msg)
180 181 msg = self.session.unpack_message(msg)
181 182 msg_type = msg['msg_type']
182 183 handler = self._notification_handlers.get(msg_type, None)
183 184 if handler is None:
184 185 raise Exception("Unhandled message type: %s"%msg_type)
185 186 else:
186 187 try:
187 188 handler(str(msg['content']['queue']))
188 189 except KeyError:
189 190 self.log.error("task::Invalid notification msg: %s"%msg)
190 191
191 192 @logged
192 193 def _register_engine(self, uid):
193 194 """New engine with ident `uid` became available."""
194 195 # head of the line:
195 196 self.targets.insert(0,uid)
196 197 self.loads.insert(0,0)
197 198 # initialize sets
198 199 self.completed[uid] = set()
199 200 self.failed[uid] = set()
200 201 self.pending[uid] = {}
201 202 if len(self.targets) == 1:
202 203 self.resume_receiving()
203 204
204 205 def _unregister_engine(self, uid):
205 206 """Existing engine with ident `uid` became unavailable."""
206 207 if len(self.targets) == 1:
207 208 # this was our only engine
208 209 self.stop_receiving()
209 210
210 211 # handle any potentially finished tasks:
211 212 self.engine_stream.flush()
212 213
213 214 self.completed.pop(uid)
214 215 self.failed.pop(uid)
215 216 # don't pop destinations, because it might be used later
216 217 # map(self.destinations.pop, self.completed.pop(uid))
217 218 # map(self.destinations.pop, self.failed.pop(uid))
218 219
219 220 idx = self.targets.index(uid)
220 221 self.targets.pop(idx)
221 222 self.loads.pop(idx)
222 223
223 224 # wait 5 seconds before cleaning up pending jobs, since the results might
224 225 # still be incoming
225 226 if self.pending[uid]:
226 227 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
227 228 dc.start()
228 229
229 230 @logged
230 231 def handle_stranded_tasks(self, engine):
231 232 """Deal with jobs resident in an engine that died."""
232 233 lost = self.pending.pop(engine)
233 234
234 235 for msg_id, (raw_msg, targets, MET, follow, timeout) in lost.iteritems():
235 236 self.all_failed.add(msg_id)
236 237 self.all_done.add(msg_id)
237 238 idents,msg = self.session.feed_identities(raw_msg, copy=False)
238 239 msg = self.session.unpack_message(msg, copy=False, content=False)
239 240 parent = msg['header']
240 241 idents = [idents[0],engine]+idents[1:]
241 242 # print (idents)
242 243 try:
243 244 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
244 245 except:
245 246 content = error.wrap_exception()
246 247 msg = self.session.send(self.client_stream, 'apply_reply', content,
247 248 parent=parent, ident=idents)
248 249 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
249 250 self.update_graph(msg_id)
250 251
251 252
252 253 #-----------------------------------------------------------------------
253 254 # Job Submission
254 255 #-----------------------------------------------------------------------
255 256 @logged
256 257 def dispatch_submission(self, raw_msg):
257 258 """Dispatch job submission to appropriate handlers."""
258 259 # ensure targets up to date:
259 260 self.notifier_stream.flush()
260 261 try:
261 262 idents, msg = self.session.feed_identities(raw_msg, copy=False)
262 263 msg = self.session.unpack_message(msg, content=False, copy=False)
263 264 except:
264 265 self.log.error("task::Invalid task: %s"%raw_msg, exc_info=True)
265 266 return
266 267
267 268 # send to monitor
268 269 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
269 270
270 271 header = msg['header']
271 272 msg_id = header['msg_id']
272 273 self.all_ids.add(msg_id)
273 274
274 275 # targets
275 276 targets = set(header.get('targets', []))
276 277
277 278 # time dependencies
278 279 after = Dependency(header.get('after', []))
279 280 if after.all:
280 281 if after.success:
281 282 after.difference_update(self.all_completed)
282 283 if after.failure:
283 284 after.difference_update(self.all_failed)
284 285 if after.check(self.all_completed, self.all_failed):
285 286 # recast as empty set, if `after` already met,
286 287 # to prevent unnecessary set comparisons
287 288 after = MET
288 289
289 290 # location dependencies
290 291 follow = Dependency(header.get('follow', []))
291 292
292 293 # turn timeouts into datetime objects:
293 294 timeout = header.get('timeout', None)
294 295 if timeout:
295 296 timeout = datetime.now() + timedelta(0,timeout,0)
296 297
297 298 args = [raw_msg, targets, after, follow, timeout]
298 299
299 300 # validate and reduce dependencies:
300 301 for dep in after,follow:
301 302 # check valid:
302 303 if msg_id in dep or dep.difference(self.all_ids):
303 304 self.depending[msg_id] = args
304 305 return self.fail_unreachable(msg_id, error.InvalidDependency)
305 306 # check if unreachable:
306 307 if dep.unreachable(self.all_completed, self.all_failed):
307 308 self.depending[msg_id] = args
308 309 return self.fail_unreachable(msg_id)
309 310
310 311 if after.check(self.all_completed, self.all_failed):
311 312 # time deps already met, try to run
312 313 if not self.maybe_run(msg_id, *args):
313 314 # can't run yet
314 315 self.save_unmet(msg_id, *args)
315 316 else:
316 317 self.save_unmet(msg_id, *args)
317 318
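For reference, the task-header fields dispatch_submission consumes, as a hedged example (msg_ids and values are illustrative):

    header = {
        'msg_id' : 'task-3',
        'targets': [],                    # empty: any engine may run it
        'after'  : ['task-1', 'task-2'],  # time deps: wait for these to finish
        'follow' : ['task-1'],            # location dep: run where task-1 ran
        'timeout': 10.0,                  # seconds before fail_unreachable(timeout=True)
    }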
318 319 # @logged
319 320 def audit_timeouts(self):
320 321 """Audit all waiting tasks for expired timeouts."""
321 322 now = datetime.now()
322 323 for msg_id in self.depending.keys():
323 324 # must recheck, in case one failure cascaded to another:
324 325 if msg_id in self.depending:
324 325 raw, targets, after, follow, timeout = self.depending[msg_id]
326 327 if timeout and timeout < now:
327 328 self.fail_unreachable(msg_id, timeout=True)
328 329
329 330 @logged
330 331 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
331 332 """a task has become unreachable, send a reply with an ImpossibleDependency
332 333 error."""
333 334 if msg_id not in self.depending:
334 335 self.log.error("msg %r already failed!"%msg_id)
335 336 return
336 337 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
337 338 for mid in follow.union(after):
338 339 if mid in self.graph:
339 340 self.graph[mid].remove(msg_id)
340 341
341 342 # FIXME: unpacking a message I've already unpacked, but didn't save:
342 343 idents,msg = self.session.feed_identities(raw_msg, copy=False)
343 344 msg = self.session.unpack_message(msg, copy=False, content=False)
344 345 header = msg['header']
345 346
346 347 try:
347 348 raise why()
348 349 except:
349 350 content = error.wrap_exception()
350 351
351 352 self.all_done.add(msg_id)
352 353 self.all_failed.add(msg_id)
353 354
354 355 msg = self.session.send(self.client_stream, 'apply_reply', content,
355 356 parent=header, ident=idents)
356 357 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
357 358
358 359 self.update_graph(msg_id, success=False)
359 360
360 361 @logged
361 362 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
362 363 """check location dependencies, and run if they are met."""
363 364 blacklist = self.blacklist.setdefault(msg_id, set())
364 365 if follow or targets or blacklist:
365 366 # we need a can_run filter
366 367 def can_run(idx):
367 368 target = self.targets[idx]
368 369 # check targets
369 370 if targets and target not in targets:
370 371 return False
371 372 # check blacklist
372 373 if target in blacklist:
373 374 return False
374 375 # check follow
375 376 return follow.check(self.completed[target], self.failed[target])
376 377
377 378 indices = filter(can_run, range(len(self.targets)))
378 379 if not indices:
379 380 # couldn't run
380 381 if follow.all:
381 382 # check follow for impossibility
382 383 dests = set()
383 384 relevant = set()
384 385 if follow.success:
385 386 relevant = self.all_completed
386 387 if follow.failure:
387 388 relevant = relevant.union(self.all_failed)
388 389 for m in follow.intersection(relevant):
389 390 dests.add(self.destinations[m])
390 391 if len(dests) > 1:
391 392 self.fail_unreachable(msg_id)
392 393 return False
393 394 if targets:
394 395 # check blacklist+targets for impossibility
395 396 targets.difference_update(blacklist)
396 397 if not targets or not targets.intersection(self.targets):
397 398 self.fail_unreachable(msg_id)
398 399 return False
399 400 return False
400 401 else:
401 402 indices = None
402 403
403 404 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
404 405 return True
405 406
406 407 @logged
407 408 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
408 409 """Save a message for later submission when its dependencies are met."""
409 410 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
410 411 # track the ids in follow or after, but not those already finished
411 412 for dep_id in after.union(follow).difference(self.all_done):
412 413 if dep_id not in self.graph:
413 414 self.graph[dep_id] = set()
414 415 self.graph[dep_id].add(msg_id)
415 416
416 417 @logged
417 418 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
418 419 """Submit a task to any of a subset of our targets."""
419 420 if indices:
420 421 loads = [self.loads[i] for i in indices]
421 422 else:
422 423 loads = self.loads
423 424 idx = self.scheme(loads)
424 425 if indices:
425 426 idx = indices[idx]
426 427 target = self.targets[idx]
427 428 # print (target, map(str, msg[:3]))
428 429 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
429 430 self.engine_stream.send_multipart(raw_msg, copy=False)
430 431 self.add_job(idx)
431 432 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
432 433 content = dict(msg_id=msg_id, engine_id=target)
433 434 self.session.send(self.mon_stream, 'task_destination', content=content,
434 435 ident=['tracktask',self.session.session])
435 436
436 437 #-----------------------------------------------------------------------
437 438 # Result Handling
438 439 #-----------------------------------------------------------------------
439 440 @logged
440 441 def dispatch_result(self, raw_msg):
441 442 """dispatch method for result replies"""
442 443 try:
443 444 idents,msg = self.session.feed_identities(raw_msg, copy=False)
444 445 msg = self.session.unpack_message(msg, content=False, copy=False)
445 446 except:
446 447 self.log.error("task::Invalid result: %s"%raw_msg, exc_info=True)
447 448 return
448 449
449 450 header = msg['header']
450 451 if header.get('dependencies_met', True):
451 452 success = (header['status'] == 'ok')
452 453 self.handle_result(idents, msg['parent_header'], raw_msg, success)
453 454 # send to Hub monitor
454 455 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
455 456 else:
456 457 self.handle_unmet_dependency(idents, msg['parent_header'])
457 458
458 459 @logged
459 460 def handle_result(self, idents, parent, raw_msg, success=True):
460 461 """handle a real task result, either success or failure"""
461 462 # first, relay result to client
462 463 engine = idents[0]
463 464 client = idents[1]
464 465 # swap_ids for XREP-XREP mirror
465 466 raw_msg[:2] = [client,engine]
466 467 # print (map(str, raw_msg[:4]))
467 468 self.client_stream.send_multipart(raw_msg, copy=False)
468 469 # now, update our data structures
469 470 msg_id = parent['msg_id']
470 471 self.blacklist.pop(msg_id, None)
471 472 self.pending[engine].pop(msg_id)
472 473 if success:
473 474 self.completed[engine].add(msg_id)
474 475 self.all_completed.add(msg_id)
475 476 else:
476 477 self.failed[engine].add(msg_id)
477 478 self.all_failed.add(msg_id)
478 479 self.all_done.add(msg_id)
479 480 self.destinations[msg_id] = engine
480 481
481 482 self.update_graph(msg_id, success)
482 483
483 484 @logged
484 485 def handle_unmet_dependency(self, idents, parent):
485 486 """handle an unmet dependency"""
486 487 engine = idents[0]
487 488 msg_id = parent['msg_id']
488 489
489 490 if msg_id not in self.blacklist:
490 491 self.blacklist[msg_id] = set()
491 492 self.blacklist[msg_id].add(engine)
492 493
493 494 args = self.pending[engine].pop(msg_id)
494 495 raw,targets,after,follow,timeout = args
495 496
496 497 if self.blacklist[msg_id] == targets:
497 498 self.depending[msg_id] = args
498 499 return self.fail_unreachable(msg_id)
499 500
500 501 elif not self.maybe_run(msg_id, *args):
501 502 # resubmit failed, put it back in our dependency tree
502 503 self.save_unmet(msg_id, *args)
503 504
504 505
505 506 @logged
506 507 def update_graph(self, dep_id, success=True):
507 508 """dep_id just finished. Update our dependency
508 509 graph and submit any jobs that just became runnable."""
509 510 # print ("\n\n***********")
510 511 # pprint (dep_id)
511 512 # pprint (self.graph)
512 513 # pprint (self.depending)
513 514 # pprint (self.all_completed)
514 515 # pprint (self.all_failed)
515 516 # print ("\n\n***********\n\n")
516 517 if dep_id not in self.graph:
517 518 return
518 519 jobs = self.graph.pop(dep_id)
519 520
520 521 for msg_id in jobs:
521 522 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
522 523
523 524 if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed):
524 525 self.fail_unreachable(msg_id)
525 526
526 527 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
527 528 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
528 529
529 530 self.depending.pop(msg_id)
530 531 for mid in follow.union(after):
531 532 if mid in self.graph:
532 533 self.graph[mid].remove(msg_id)
533 534
534 535 #----------------------------------------------------------------------
535 536 # methods to be overridden by subclasses
536 537 #----------------------------------------------------------------------
537 538
538 539 def add_job(self, idx):
539 540 """Called after self.targets[idx] just got the job with header.
540 541 Override with subclasses. The default ordering is simple LRU.
541 542 The default loads are the number of outstanding jobs."""
542 543 self.loads[idx] += 1
543 544 for lis in (self.targets, self.loads):
544 545 lis.append(lis.pop(idx))
545 546
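Walking the default LRU bookkeeping through a toy state (values are illustrative):

    # before add_job(0): targets = ['a', 'b', 'c'], loads = [0, 1, 0]
    # loads[0] += 1, then both lists rotate index 0 to the back:
    # after:             targets = ['b', 'c', 'a'], loads = [1, 0, 1]
    # so the just-used engine drops to the back of the LRU order
    # that lru()/twobin() rely on.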
546 547
547 548 def finish_job(self, idx):
548 549 """Called after self.targets[idx] just finished a job.
549 550 Override with subclasses."""
550 551 self.loads[idx] -= 1
551 552
552 553
553 554
554 555 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
555 556 log_addr=None, loglevel=logging.DEBUG, scheme='lru',
556 557 identity=b'task'):
557 558 from zmq.eventloop import ioloop
558 559 from zmq.eventloop.zmqstream import ZMQStream
559 560
561 if config:
562 # unwrap dict back into Config
563 config = Config(config)
564
560 565 ctx = zmq.Context()
561 566 loop = ioloop.IOLoop()
562 print (in_addr, out_addr, mon_addr, not_addr)
563 567 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
564 568 ins.setsockopt(zmq.IDENTITY, identity)
565 569 ins.bind(in_addr)
566 570
567 571 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
568 572 outs.setsockopt(zmq.IDENTITY, identity)
569 573 outs.bind(out_addr)
570 574 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
571 575 mons.connect(mon_addr)
572 576 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
573 577 nots.setsockopt(zmq.SUBSCRIBE, '')
574 578 nots.connect(not_addr)
575 579
576 580 scheme = globals().get(scheme, None)
577 581 # setup logging
578 582 if log_addr:
579 583 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
580 584 else:
581 585 local_logger(logname, loglevel)
582 586
583 587 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
584 588 mon_stream=mons, notifier_stream=nots,
585 589 scheme=scheme, loop=loop, logname=logname,
586 590 config=config)
587 591 scheduler.start()
588 592 try:
589 593 loop.start()
590 594 except KeyboardInterrupt:
591 595 print ("interrupted, exiting...", file=sys.__stderr__)
592 596
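A minimal invocation sketch mirroring how ControllerFactory spawns this (addresses are illustrative, and `config` is assumed to be a Config or plain dict supplied by the caller; launch_scheduler blocks on loop.start(), hence the child Process):

    from multiprocessing import Process

    sargs = ('tcp://127.0.0.1:10201',   # in_addr: client-facing XREP
             'tcp://127.0.0.1:10202',   # out_addr: engine-facing XREP
             'tcp://127.0.0.1:10203',   # mon_addr: Hub monitor PUB
             'tcp://127.0.0.1:10204')   # not_addr: Hub notification SUB
    p = Process(target=launch_scheduler, args=sargs,
                kwargs=dict(scheme='leastload', config=dict(config)))
    p.daemon = True
    p.start()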