##// END OF EJS Templates
don't perform costly 'difference_update' on dependencies...
MinRK -
Show More
@@ -1,692 +1,697 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6
6
7 Authors:
7 Authors:
8
8
9 * Min RK
9 * Min RK
10 """
10 """
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2010-2011 The IPython Development Team
12 # Copyright (C) 2010-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #----------------------------------------------------------------------
18 #----------------------------------------------------------------------
19 # Imports
19 # Imports
20 #----------------------------------------------------------------------
20 #----------------------------------------------------------------------
21
21
22 from __future__ import print_function
22 from __future__ import print_function
23
23
24 import logging
24 import logging
25 import sys
25 import sys
26
26
27 from datetime import datetime, timedelta
27 from datetime import datetime, timedelta
28 from random import randint, random
28 from random import randint, random
29 from types import FunctionType
29 from types import FunctionType
30
30
31 try:
31 try:
32 import numpy
32 import numpy
33 except ImportError:
33 except ImportError:
34 numpy = None
34 numpy = None
35
35
36 import zmq
36 import zmq
37 from zmq.eventloop import ioloop, zmqstream
37 from zmq.eventloop import ioloop, zmqstream
38
38
39 # local imports
39 # local imports
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.config.application import Application
41 from IPython.config.application import Application
42 from IPython.config.loader import Config
42 from IPython.config.loader import Config
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum
44
44
45 from IPython.parallel import error
45 from IPython.parallel import error
46 from IPython.parallel.factory import SessionFactory
46 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.util import connect_logger, local_logger
47 from IPython.parallel.util import connect_logger, local_logger
48
48
49 from .dependency import Dependency
49 from .dependency import Dependency
50
50
@decorator
def logged(f, self, *args, **kwargs):
    """Decorator: debug-log every call to the wrapped scheduler method.

    Emits the wrapped function's name and its positional/keyword arguments
    on ``self.log`` before delegating to it unchanged.
    """
    # f.__name__ is equivalent to the Python-2-only f.func_name, and portable
    self.log.debug("scheduler::%s(*%s,**%s)", f.__name__, args, kwargs)
    return f(self, *args, **kwargs)
57
57
58 #----------------------------------------------------------------------
58 #----------------------------------------------------------------------
59 # Chooser functions
59 # Chooser functions
60 #----------------------------------------------------------------------
60 #----------------------------------------------------------------------
61
61
def plainrandom(loads):
    """Plain random pick."""
    # any engine index is equally likely; loads content is irrelevant
    return randint(0, len(loads) - 1)
66
66
def lru(loads):
    """Always pick the front of the line.

    The content of `loads` is ignored.

    Assumes LRU ordering of loads, with oldest first.
    """
    # index 0 is the least-recently-used engine by construction
    return 0
75
75
def twobin(loads):
    """Pick two at random, use the LRU of the two.

    The content of loads is ignored.

    Assumes LRU ordering of loads, with oldest first.
    """
    limit = len(loads) - 1
    # two independent uniform draws; the smaller index is the older engine
    return min(randint(0, limit), randint(0, limit))
87
87
def weighted(loads):
    """Pick two at random using inverse load as weight.

    Return the less loaded of the two.
    """
    # weight 0 a million times more than 1:
    weights = 1./(1e-6+numpy.array(loads))
    sums = weights.cumsum()
    total = sums[-1]

    def draw():
        # sample one index with probability proportional to its weight
        target = random() * total
        i = 0
        while sums[i] < target:
            i += 1
        return i

    first = draw()
    second = draw()
    # prefer whichever draw carries the larger weight (i.e. the lower load);
    # ties go to the first draw, matching the original comparison
    return second if weights[second] > weights[first] else first
109
109
def leastload(loads):
    """Always choose the lowest load.

    If the lowest load occurs more than once, the first
    occurance will be used. If loads has LRU ordering, this means
    the LRU of those with the lowest load is chosen.
    """
    lightest = min(loads)
    # list.index returns the first (oldest) position holding the minimum
    return loads.index(lightest)
118
118
#---------------------------------------------------------------------
# Classes
#---------------------------------------------------------------------

# Shared sentinel for an already-satisfied (empty) dependency, so callers
# can skip redundant set comparisons.
MET = Dependency([])
124
124
125 class TaskScheduler(SessionFactory):
125 class TaskScheduler(SessionFactory):
126 """Python TaskScheduler object.
126 """Python TaskScheduler object.
127
127
128 This is the simplest object that supports msg_id based
128 This is the simplest object that supports msg_id based
129 DAG dependencies. *Only* task msg_ids are checked, not
129 DAG dependencies. *Only* task msg_ids are checked, not
130 msg_ids of jobs submitted via the MUX queue.
130 msg_ids of jobs submitted via the MUX queue.
131
131
132 """
132 """
133
133
134 hwm = Int(0, config=True, shortname='hwm',
134 hwm = Int(0, config=True, shortname='hwm',
135 help="""specify the High Water Mark (HWM) for the downstream
135 help="""specify the High Water Mark (HWM) for the downstream
136 socket in the Task scheduler. This is the maximum number
136 socket in the Task scheduler. This is the maximum number
137 of allowed outstanding tasks on each engine."""
137 of allowed outstanding tasks on each engine."""
138 )
138 )
139 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
139 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
140 'leastload', config=True, shortname='scheme', allow_none=False,
140 'leastload', config=True, shortname='scheme', allow_none=False,
141 help="""select the task scheduler scheme [default: Python LRU]
141 help="""select the task scheduler scheme [default: Python LRU]
142 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
142 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
143 )
143 )
144 def _scheme_name_changed(self, old, new):
144 def _scheme_name_changed(self, old, new):
145 self.log.debug("Using scheme %r"%new)
145 self.log.debug("Using scheme %r"%new)
146 self.scheme = globals()[new]
146 self.scheme = globals()[new]
147
147
148 # input arguments:
148 # input arguments:
149 scheme = Instance(FunctionType) # function for determining the destination
149 scheme = Instance(FunctionType) # function for determining the destination
150 def _scheme_default(self):
150 def _scheme_default(self):
151 return leastload
151 return leastload
152 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
152 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
153 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
153 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
154 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
154 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
155 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
155 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
156
156
157 # internals:
157 # internals:
158 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
158 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
159 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
159 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
160 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
160 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
161 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
161 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
162 pending = Dict() # dict by engine_uuid of submitted tasks
162 pending = Dict() # dict by engine_uuid of submitted tasks
163 completed = Dict() # dict by engine_uuid of completed tasks
163 completed = Dict() # dict by engine_uuid of completed tasks
164 failed = Dict() # dict by engine_uuid of failed tasks
164 failed = Dict() # dict by engine_uuid of failed tasks
165 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
165 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
166 clients = Dict() # dict by msg_id for who submitted the task
166 clients = Dict() # dict by msg_id for who submitted the task
167 targets = List() # list of target IDENTs
167 targets = List() # list of target IDENTs
168 loads = List() # list of engine loads
168 loads = List() # list of engine loads
169 # full = Set() # set of IDENTs that have HWM outstanding tasks
169 # full = Set() # set of IDENTs that have HWM outstanding tasks
170 all_completed = Set() # set of all completed tasks
170 all_completed = Set() # set of all completed tasks
171 all_failed = Set() # set of all failed tasks
171 all_failed = Set() # set of all failed tasks
172 all_done = Set() # set of all finished tasks=union(completed,failed)
172 all_done = Set() # set of all finished tasks=union(completed,failed)
173 all_ids = Set() # set of all submitted task IDs
173 all_ids = Set() # set of all submitted task IDs
174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
176
176
177
177
178 def start(self):
178 def start(self):
179 self.engine_stream.on_recv(self.dispatch_result, copy=False)
179 self.engine_stream.on_recv(self.dispatch_result, copy=False)
180 self._notification_handlers = dict(
180 self._notification_handlers = dict(
181 registration_notification = self._register_engine,
181 registration_notification = self._register_engine,
182 unregistration_notification = self._unregister_engine
182 unregistration_notification = self._unregister_engine
183 )
183 )
184 self.notifier_stream.on_recv(self.dispatch_notification)
184 self.notifier_stream.on_recv(self.dispatch_notification)
185 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
185 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
186 self.auditor.start()
186 self.auditor.start()
187 self.log.info("Scheduler started [%s]"%self.scheme_name)
187 self.log.info("Scheduler started [%s]"%self.scheme_name)
188
188
189 def resume_receiving(self):
189 def resume_receiving(self):
190 """Resume accepting jobs."""
190 """Resume accepting jobs."""
191 self.client_stream.on_recv(self.dispatch_submission, copy=False)
191 self.client_stream.on_recv(self.dispatch_submission, copy=False)
192
192
193 def stop_receiving(self):
193 def stop_receiving(self):
194 """Stop accepting jobs while there are no engines.
194 """Stop accepting jobs while there are no engines.
195 Leave them in the ZMQ queue."""
195 Leave them in the ZMQ queue."""
196 self.client_stream.on_recv(None)
196 self.client_stream.on_recv(None)
197
197
198 #-----------------------------------------------------------------------
198 #-----------------------------------------------------------------------
199 # [Un]Registration Handling
199 # [Un]Registration Handling
200 #-----------------------------------------------------------------------
200 #-----------------------------------------------------------------------
201
201
202 def dispatch_notification(self, msg):
202 def dispatch_notification(self, msg):
203 """dispatch register/unregister events."""
203 """dispatch register/unregister events."""
204 try:
204 try:
205 idents,msg = self.session.feed_identities(msg)
205 idents,msg = self.session.feed_identities(msg)
206 except ValueError:
206 except ValueError:
207 self.log.warn("task::Invalid Message: %r"%msg)
207 self.log.warn("task::Invalid Message: %r"%msg)
208 return
208 return
209 try:
209 try:
210 msg = self.session.unpack_message(msg)
210 msg = self.session.unpack_message(msg)
211 except ValueError:
211 except ValueError:
212 self.log.warn("task::Unauthorized message from: %r"%idents)
212 self.log.warn("task::Unauthorized message from: %r"%idents)
213 return
213 return
214
214
215 msg_type = msg['msg_type']
215 msg_type = msg['msg_type']
216
216
217 handler = self._notification_handlers.get(msg_type, None)
217 handler = self._notification_handlers.get(msg_type, None)
218 if handler is None:
218 if handler is None:
219 self.log.error("Unhandled message type: %r"%msg_type)
219 self.log.error("Unhandled message type: %r"%msg_type)
220 else:
220 else:
221 try:
221 try:
222 handler(str(msg['content']['queue']))
222 handler(str(msg['content']['queue']))
223 except KeyError:
223 except KeyError:
224 self.log.error("task::Invalid notification msg: %r"%msg)
224 self.log.error("task::Invalid notification msg: %r"%msg)
225
225
226 @logged
227 def _register_engine(self, uid):
226 def _register_engine(self, uid):
228 """New engine with ident `uid` became available."""
227 """New engine with ident `uid` became available."""
229 # head of the line:
228 # head of the line:
230 self.targets.insert(0,uid)
229 self.targets.insert(0,uid)
231 self.loads.insert(0,0)
230 self.loads.insert(0,0)
232 # initialize sets
231 # initialize sets
233 self.completed[uid] = set()
232 self.completed[uid] = set()
234 self.failed[uid] = set()
233 self.failed[uid] = set()
235 self.pending[uid] = {}
234 self.pending[uid] = {}
236 if len(self.targets) == 1:
235 if len(self.targets) == 1:
237 self.resume_receiving()
236 self.resume_receiving()
238 # rescan the graph:
237 # rescan the graph:
239 self.update_graph(None)
238 self.update_graph(None)
240
239
241 def _unregister_engine(self, uid):
240 def _unregister_engine(self, uid):
242 """Existing engine with ident `uid` became unavailable."""
241 """Existing engine with ident `uid` became unavailable."""
243 if len(self.targets) == 1:
242 if len(self.targets) == 1:
244 # this was our only engine
243 # this was our only engine
245 self.stop_receiving()
244 self.stop_receiving()
246
245
247 # handle any potentially finished tasks:
246 # handle any potentially finished tasks:
248 self.engine_stream.flush()
247 self.engine_stream.flush()
249
248
250 # don't pop destinations, because they might be used later
249 # don't pop destinations, because they might be used later
251 # map(self.destinations.pop, self.completed.pop(uid))
250 # map(self.destinations.pop, self.completed.pop(uid))
252 # map(self.destinations.pop, self.failed.pop(uid))
251 # map(self.destinations.pop, self.failed.pop(uid))
253
252
254 # prevent this engine from receiving work
253 # prevent this engine from receiving work
255 idx = self.targets.index(uid)
254 idx = self.targets.index(uid)
256 self.targets.pop(idx)
255 self.targets.pop(idx)
257 self.loads.pop(idx)
256 self.loads.pop(idx)
258
257
259 # wait 5 seconds before cleaning up pending jobs, since the results might
258 # wait 5 seconds before cleaning up pending jobs, since the results might
260 # still be incoming
259 # still be incoming
261 if self.pending[uid]:
260 if self.pending[uid]:
262 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
261 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
263 dc.start()
262 dc.start()
264 else:
263 else:
265 self.completed.pop(uid)
264 self.completed.pop(uid)
266 self.failed.pop(uid)
265 self.failed.pop(uid)
267
266
268
267
269 def handle_stranded_tasks(self, engine):
268 def handle_stranded_tasks(self, engine):
270 """Deal with jobs resident in an engine that died."""
269 """Deal with jobs resident in an engine that died."""
271 lost = self.pending[engine]
270 lost = self.pending[engine]
272 for msg_id in lost.keys():
271 for msg_id in lost.keys():
273 if msg_id not in self.pending[engine]:
272 if msg_id not in self.pending[engine]:
274 # prevent double-handling of messages
273 # prevent double-handling of messages
275 continue
274 continue
276
275
277 raw_msg = lost[msg_id][0]
276 raw_msg = lost[msg_id][0]
278 idents,msg = self.session.feed_identities(raw_msg, copy=False)
277 idents,msg = self.session.feed_identities(raw_msg, copy=False)
279 parent = self.session.unpack(msg[1].bytes)
278 parent = self.session.unpack(msg[1].bytes)
280 idents = [engine, idents[0]]
279 idents = [engine, idents[0]]
281
280
282 # build fake error reply
281 # build fake error reply
283 try:
282 try:
284 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
283 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
285 except:
284 except:
286 content = error.wrap_exception()
285 content = error.wrap_exception()
287 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
286 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
288 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
287 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
289 # and dispatch it
288 # and dispatch it
290 self.dispatch_result(raw_reply)
289 self.dispatch_result(raw_reply)
291
290
292 # finally scrub completed/failed lists
291 # finally scrub completed/failed lists
293 self.completed.pop(engine)
292 self.completed.pop(engine)
294 self.failed.pop(engine)
293 self.failed.pop(engine)
295
294
296
295
297 #-----------------------------------------------------------------------
296 #-----------------------------------------------------------------------
298 # Job Submission
297 # Job Submission
299 #-----------------------------------------------------------------------
298 #-----------------------------------------------------------------------
300 def dispatch_submission(self, raw_msg):
299 def dispatch_submission(self, raw_msg):
301 """Dispatch job submission to appropriate handlers."""
300 """Dispatch job submission to appropriate handlers."""
302 # ensure targets up to date:
301 # ensure targets up to date:
303 self.notifier_stream.flush()
302 self.notifier_stream.flush()
304 try:
303 try:
305 idents, msg = self.session.feed_identities(raw_msg, copy=False)
304 idents, msg = self.session.feed_identities(raw_msg, copy=False)
306 msg = self.session.unpack_message(msg, content=False, copy=False)
305 msg = self.session.unpack_message(msg, content=False, copy=False)
307 except Exception:
306 except Exception:
308 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
307 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
309 return
308 return
310
309
311
310
312 # send to monitor
311 # send to monitor
313 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
312 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
314
313
315 header = msg['header']
314 header = msg['header']
316 msg_id = header['msg_id']
315 msg_id = header['msg_id']
317 self.all_ids.add(msg_id)
316 self.all_ids.add(msg_id)
318
317
319 # targets
318 # targets
320 targets = set(header.get('targets', []))
319 targets = set(header.get('targets', []))
321 retries = header.get('retries', 0)
320 retries = header.get('retries', 0)
322 self.retries[msg_id] = retries
321 self.retries[msg_id] = retries
323
322
324 # time dependencies
323 # time dependencies
325 after = Dependency(header.get('after', []))
324 after = header.get('after', None)
326 if after.all:
325 if after:
327 if after.success:
326 after = Dependency(after)
328 after.difference_update(self.all_completed)
327 if after.all:
329 if after.failure:
328 if after.success:
330 after.difference_update(self.all_failed)
329 after = after.difference(self.all_completed)
331 if after.check(self.all_completed, self.all_failed):
330 if after.failure:
332 # recast as empty set, if `after` already met,
331 after = after.difference(self.all_failed)
333 # to prevent unnecessary set comparisons
332 if after.check(self.all_completed, self.all_failed):
333 # recast as empty set, if `after` already met,
334 # to prevent unnecessary set comparisons
335 after = MET
336 else:
334 after = MET
337 after = MET
335
338
336 # location dependencies
339 # location dependencies
337 follow = Dependency(header.get('follow', []))
340 follow = Dependency(header.get('follow', []))
338
341
339 # turn timeouts into datetime objects:
342 # turn timeouts into datetime objects:
340 timeout = header.get('timeout', None)
343 timeout = header.get('timeout', None)
341 if timeout:
344 if timeout:
342 timeout = datetime.now() + timedelta(0,timeout,0)
345 timeout = datetime.now() + timedelta(0,timeout,0)
343
346
344 args = [raw_msg, targets, after, follow, timeout]
347 args = [raw_msg, targets, after, follow, timeout]
345
348
346 # validate and reduce dependencies:
349 # validate and reduce dependencies:
347 for dep in after,follow:
350 for dep in after,follow:
351 if not dep: # empty dependency
352 continue
348 # check valid:
353 # check valid:
349 if msg_id in dep or dep.difference(self.all_ids):
354 if msg_id in dep or dep.difference(self.all_ids):
350 self.depending[msg_id] = args
355 self.depending[msg_id] = args
351 return self.fail_unreachable(msg_id, error.InvalidDependency)
356 return self.fail_unreachable(msg_id, error.InvalidDependency)
352 # check if unreachable:
357 # check if unreachable:
353 if dep.unreachable(self.all_completed, self.all_failed):
358 if dep.unreachable(self.all_completed, self.all_failed):
354 self.depending[msg_id] = args
359 self.depending[msg_id] = args
355 return self.fail_unreachable(msg_id)
360 return self.fail_unreachable(msg_id)
356
361
357 if after.check(self.all_completed, self.all_failed):
362 if after.check(self.all_completed, self.all_failed):
358 # time deps already met, try to run
363 # time deps already met, try to run
359 if not self.maybe_run(msg_id, *args):
364 if not self.maybe_run(msg_id, *args):
360 # can't run yet
365 # can't run yet
361 if msg_id not in self.all_failed:
366 if msg_id not in self.all_failed:
362 # could have failed as unreachable
367 # could have failed as unreachable
363 self.save_unmet(msg_id, *args)
368 self.save_unmet(msg_id, *args)
364 else:
369 else:
365 self.save_unmet(msg_id, *args)
370 self.save_unmet(msg_id, *args)
366
371
367 def audit_timeouts(self):
372 def audit_timeouts(self):
368 """Audit all waiting tasks for expired timeouts."""
373 """Audit all waiting tasks for expired timeouts."""
369 now = datetime.now()
374 now = datetime.now()
370 for msg_id in self.depending.keys():
375 for msg_id in self.depending.keys():
371 # must recheck, in case one failure cascaded to another:
376 # must recheck, in case one failure cascaded to another:
372 if msg_id in self.depending:
377 if msg_id in self.depending:
373 raw,after,targets,follow,timeout = self.depending[msg_id]
378 raw,after,targets,follow,timeout = self.depending[msg_id]
374 if timeout and timeout < now:
379 if timeout and timeout < now:
375 self.fail_unreachable(msg_id, error.TaskTimeout)
380 self.fail_unreachable(msg_id, error.TaskTimeout)
376
381
377 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
382 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
378 """a task has become unreachable, send a reply with an ImpossibleDependency
383 """a task has become unreachable, send a reply with an ImpossibleDependency
379 error."""
384 error."""
380 if msg_id not in self.depending:
385 if msg_id not in self.depending:
381 self.log.error("msg %r already failed!", msg_id)
386 self.log.error("msg %r already failed!", msg_id)
382 return
387 return
383 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
388 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
384 for mid in follow.union(after):
389 for mid in follow.union(after):
385 if mid in self.graph:
390 if mid in self.graph:
386 self.graph[mid].remove(msg_id)
391 self.graph[mid].remove(msg_id)
387
392
388 # FIXME: unpacking a message I've already unpacked, but didn't save:
393 # FIXME: unpacking a message I've already unpacked, but didn't save:
389 idents,msg = self.session.feed_identities(raw_msg, copy=False)
394 idents,msg = self.session.feed_identities(raw_msg, copy=False)
390 header = self.session.unpack(msg[1].bytes)
395 header = self.session.unpack(msg[1].bytes)
391
396
392 try:
397 try:
393 raise why()
398 raise why()
394 except:
399 except:
395 content = error.wrap_exception()
400 content = error.wrap_exception()
396
401
397 self.all_done.add(msg_id)
402 self.all_done.add(msg_id)
398 self.all_failed.add(msg_id)
403 self.all_failed.add(msg_id)
399
404
400 msg = self.session.send(self.client_stream, 'apply_reply', content,
405 msg = self.session.send(self.client_stream, 'apply_reply', content,
401 parent=header, ident=idents)
406 parent=header, ident=idents)
402 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
407 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
403
408
404 self.update_graph(msg_id, success=False)
409 self.update_graph(msg_id, success=False)
405
410
406 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
411 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
407 """check location dependencies, and run if they are met."""
412 """check location dependencies, and run if they are met."""
408 blacklist = self.blacklist.setdefault(msg_id, set())
413 blacklist = self.blacklist.setdefault(msg_id, set())
409 if follow or targets or blacklist or self.hwm:
414 if follow or targets or blacklist or self.hwm:
410 # we need a can_run filter
415 # we need a can_run filter
411 def can_run(idx):
416 def can_run(idx):
412 # check hwm
417 # check hwm
413 if self.hwm and self.loads[idx] == self.hwm:
418 if self.hwm and self.loads[idx] == self.hwm:
414 return False
419 return False
415 target = self.targets[idx]
420 target = self.targets[idx]
416 # check blacklist
421 # check blacklist
417 if target in blacklist:
422 if target in blacklist:
418 return False
423 return False
419 # check targets
424 # check targets
420 if targets and target not in targets:
425 if targets and target not in targets:
421 return False
426 return False
422 # check follow
427 # check follow
423 return follow.check(self.completed[target], self.failed[target])
428 return follow.check(self.completed[target], self.failed[target])
424
429
425 indices = filter(can_run, range(len(self.targets)))
430 indices = filter(can_run, range(len(self.targets)))
426
431
427 if not indices:
432 if not indices:
428 # couldn't run
433 # couldn't run
429 if follow.all:
434 if follow.all:
430 # check follow for impossibility
435 # check follow for impossibility
431 dests = set()
436 dests = set()
432 relevant = set()
437 relevant = set()
433 if follow.success:
438 if follow.success:
434 relevant = self.all_completed
439 relevant = self.all_completed
435 if follow.failure:
440 if follow.failure:
436 relevant = relevant.union(self.all_failed)
441 relevant = relevant.union(self.all_failed)
437 for m in follow.intersection(relevant):
442 for m in follow.intersection(relevant):
438 dests.add(self.destinations[m])
443 dests.add(self.destinations[m])
439 if len(dests) > 1:
444 if len(dests) > 1:
440 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
445 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
441 self.fail_unreachable(msg_id)
446 self.fail_unreachable(msg_id)
442 return False
447 return False
443 if targets:
448 if targets:
444 # check blacklist+targets for impossibility
449 # check blacklist+targets for impossibility
445 targets.difference_update(blacklist)
450 targets.difference_update(blacklist)
446 if not targets or not targets.intersection(self.targets):
451 if not targets or not targets.intersection(self.targets):
447 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
452 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
448 self.fail_unreachable(msg_id)
453 self.fail_unreachable(msg_id)
449 return False
454 return False
450 return False
455 return False
451 else:
456 else:
452 indices = None
457 indices = None
453
458
454 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
459 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
455 return True
460 return True
456
461
457 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
462 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
458 """Save a message for later submission when its dependencies are met."""
463 """Save a message for later submission when its dependencies are met."""
459 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
464 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
460 # track the ids in follow or after, but not those already finished
465 # track the ids in follow or after, but not those already finished
461 for dep_id in after.union(follow).difference(self.all_done):
466 for dep_id in after.union(follow).difference(self.all_done):
462 if dep_id not in self.graph:
467 if dep_id not in self.graph:
463 self.graph[dep_id] = set()
468 self.graph[dep_id] = set()
464 self.graph[dep_id].add(msg_id)
469 self.graph[dep_id].add(msg_id)
465
470
466 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
471 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
467 """Submit a task to any of a subset of our targets."""
472 """Submit a task to any of a subset of our targets."""
468 if indices:
473 if indices:
469 loads = [self.loads[i] for i in indices]
474 loads = [self.loads[i] for i in indices]
470 else:
475 else:
471 loads = self.loads
476 loads = self.loads
472 idx = self.scheme(loads)
477 idx = self.scheme(loads)
473 if indices:
478 if indices:
474 idx = indices[idx]
479 idx = indices[idx]
475 target = self.targets[idx]
480 target = self.targets[idx]
476 # print (target, map(str, msg[:3]))
481 # print (target, map(str, msg[:3]))
477 # send job to the engine
482 # send job to the engine
478 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
483 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
479 self.engine_stream.send_multipart(raw_msg, copy=False)
484 self.engine_stream.send_multipart(raw_msg, copy=False)
480 # update load
485 # update load
481 self.add_job(idx)
486 self.add_job(idx)
482 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
487 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
483 # notify Hub
488 # notify Hub
484 content = dict(msg_id=msg_id, engine_id=target)
489 content = dict(msg_id=msg_id, engine_id=target)
485 self.session.send(self.mon_stream, 'task_destination', content=content,
490 self.session.send(self.mon_stream, 'task_destination', content=content,
486 ident=['tracktask',self.session.session])
491 ident=['tracktask',self.session.session])
487
492
488
493
489 #-----------------------------------------------------------------------
494 #-----------------------------------------------------------------------
490 # Result Handling
495 # Result Handling
491 #-----------------------------------------------------------------------
496 #-----------------------------------------------------------------------
492 def dispatch_result(self, raw_msg):
497 def dispatch_result(self, raw_msg):
493 """dispatch method for result replies"""
498 """dispatch method for result replies"""
494 try:
499 try:
495 idents,msg = self.session.feed_identities(raw_msg, copy=False)
500 idents,msg = self.session.feed_identities(raw_msg, copy=False)
496 msg = self.session.unpack_message(msg, content=False, copy=False)
501 msg = self.session.unpack_message(msg, content=False, copy=False)
497 engine = idents[0]
502 engine = idents[0]
498 try:
503 try:
499 idx = self.targets.index(engine)
504 idx = self.targets.index(engine)
500 except ValueError:
505 except ValueError:
501 pass # skip load-update for dead engines
506 pass # skip load-update for dead engines
502 else:
507 else:
503 self.finish_job(idx)
508 self.finish_job(idx)
504 except Exception:
509 except Exception:
505 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
510 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
506 return
511 return
507
512
508 header = msg['header']
513 header = msg['header']
509 parent = msg['parent_header']
514 parent = msg['parent_header']
510 if header.get('dependencies_met', True):
515 if header.get('dependencies_met', True):
511 success = (header['status'] == 'ok')
516 success = (header['status'] == 'ok')
512 msg_id = parent['msg_id']
517 msg_id = parent['msg_id']
513 retries = self.retries[msg_id]
518 retries = self.retries[msg_id]
514 if not success and retries > 0:
519 if not success and retries > 0:
515 # failed
520 # failed
516 self.retries[msg_id] = retries - 1
521 self.retries[msg_id] = retries - 1
517 self.handle_unmet_dependency(idents, parent)
522 self.handle_unmet_dependency(idents, parent)
518 else:
523 else:
519 del self.retries[msg_id]
524 del self.retries[msg_id]
520 # relay to client and update graph
525 # relay to client and update graph
521 self.handle_result(idents, parent, raw_msg, success)
526 self.handle_result(idents, parent, raw_msg, success)
522 # send to Hub monitor
527 # send to Hub monitor
523 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
528 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
524 else:
529 else:
525 self.handle_unmet_dependency(idents, parent)
530 self.handle_unmet_dependency(idents, parent)
526
531
527 def handle_result(self, idents, parent, raw_msg, success=True):
532 def handle_result(self, idents, parent, raw_msg, success=True):
528 """handle a real task result, either success or failure"""
533 """handle a real task result, either success or failure"""
529 # first, relay result to client
534 # first, relay result to client
530 engine = idents[0]
535 engine = idents[0]
531 client = idents[1]
536 client = idents[1]
532 # swap_ids for XREP-XREP mirror
537 # swap_ids for XREP-XREP mirror
533 raw_msg[:2] = [client,engine]
538 raw_msg[:2] = [client,engine]
534 # print (map(str, raw_msg[:4]))
539 # print (map(str, raw_msg[:4]))
535 self.client_stream.send_multipart(raw_msg, copy=False)
540 self.client_stream.send_multipart(raw_msg, copy=False)
536 # now, update our data structures
541 # now, update our data structures
537 msg_id = parent['msg_id']
542 msg_id = parent['msg_id']
538 self.blacklist.pop(msg_id, None)
543 self.blacklist.pop(msg_id, None)
539 self.pending[engine].pop(msg_id)
544 self.pending[engine].pop(msg_id)
540 if success:
545 if success:
541 self.completed[engine].add(msg_id)
546 self.completed[engine].add(msg_id)
542 self.all_completed.add(msg_id)
547 self.all_completed.add(msg_id)
543 else:
548 else:
544 self.failed[engine].add(msg_id)
549 self.failed[engine].add(msg_id)
545 self.all_failed.add(msg_id)
550 self.all_failed.add(msg_id)
546 self.all_done.add(msg_id)
551 self.all_done.add(msg_id)
547 self.destinations[msg_id] = engine
552 self.destinations[msg_id] = engine
548
553
549 self.update_graph(msg_id, success)
554 self.update_graph(msg_id, success)
550
555
    def handle_unmet_dependency(self, idents, parent):
        """handle an unmet dependency

        The engine bounced the task back because its dependencies were not
        met there; blacklist that engine for this task and try to place it
        elsewhere, or fail it if no reachable target remains.
        """
        engine = idents[0]
        msg_id = parent['msg_id']

        # record that this engine rejected this task
        if msg_id not in self.blacklist:
            self.blacklist[msg_id] = set()
        self.blacklist[msg_id].add(engine)

        # the task is no longer pending on that engine
        args = self.pending[engine].pop(msg_id)
        raw,targets,after,follow,timeout = args

        if self.blacklist[msg_id] == targets:
            # every allowed target has rejected the task: it can never run
            self.depending[msg_id] = args
            self.fail_unreachable(msg_id)
        elif not self.maybe_run(msg_id, *args):
            # resubmit failed
            if msg_id not in self.all_failed:
                # put it back in our dependency tree
                self.save_unmet(msg_id, *args)

        if self.hwm:
            try:
                idx = self.targets.index(engine)
            except ValueError:
                pass # skip load-update for dead engines
            else:
                if self.loads[idx] == self.hwm-1:
                    # this engine just dropped below the high-water mark,
                    # so previously-blocked tasks may now be runnable
                    self.update_graph(None)
580
585
581
586
582
587
583 def update_graph(self, dep_id=None, success=True):
588 def update_graph(self, dep_id=None, success=True):
584 """dep_id just finished. Update our dependency
589 """dep_id just finished. Update our dependency
585 graph and submit any jobs that just became runable.
590 graph and submit any jobs that just became runable.
586
591
587 Called with dep_id=None to update entire graph for hwm, but without finishing
592 Called with dep_id=None to update entire graph for hwm, but without finishing
588 a task.
593 a task.
589 """
594 """
590 # print ("\n\n***********")
595 # print ("\n\n***********")
591 # pprint (dep_id)
596 # pprint (dep_id)
592 # pprint (self.graph)
597 # pprint (self.graph)
593 # pprint (self.depending)
598 # pprint (self.depending)
594 # pprint (self.all_completed)
599 # pprint (self.all_completed)
595 # pprint (self.all_failed)
600 # pprint (self.all_failed)
596 # print ("\n\n***********\n\n")
601 # print ("\n\n***********\n\n")
597 # update any jobs that depended on the dependency
602 # update any jobs that depended on the dependency
598 jobs = self.graph.pop(dep_id, [])
603 jobs = self.graph.pop(dep_id, [])
599
604
600 # recheck *all* jobs if
605 # recheck *all* jobs if
601 # a) we have HWM and an engine just become no longer full
606 # a) we have HWM and an engine just become no longer full
602 # or b) dep_id was given as None
607 # or b) dep_id was given as None
603 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
608 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
604 jobs = self.depending.keys()
609 jobs = self.depending.keys()
605
610
606 for msg_id in jobs:
611 for msg_id in jobs:
607 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
612 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
608
613
609 if after.unreachable(self.all_completed, self.all_failed)\
614 if after.unreachable(self.all_completed, self.all_failed)\
610 or follow.unreachable(self.all_completed, self.all_failed):
615 or follow.unreachable(self.all_completed, self.all_failed):
611 self.fail_unreachable(msg_id)
616 self.fail_unreachable(msg_id)
612
617
613 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
618 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
614 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
619 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
615
620
616 self.depending.pop(msg_id)
621 self.depending.pop(msg_id)
617 for mid in follow.union(after):
622 for mid in follow.union(after):
618 if mid in self.graph:
623 if mid in self.graph:
619 self.graph[mid].remove(msg_id)
624 self.graph[mid].remove(msg_id)
620
625
621 #----------------------------------------------------------------------
626 #----------------------------------------------------------------------
622 # methods to be overridden by subclasses
627 # methods to be overridden by subclasses
623 #----------------------------------------------------------------------
628 #----------------------------------------------------------------------
624
629
625 def add_job(self, idx):
630 def add_job(self, idx):
626 """Called after self.targets[idx] just got the job with header.
631 """Called after self.targets[idx] just got the job with header.
627 Override with subclasses. The default ordering is simple LRU.
632 Override with subclasses. The default ordering is simple LRU.
628 The default loads are the number of outstanding jobs."""
633 The default loads are the number of outstanding jobs."""
629 self.loads[idx] += 1
634 self.loads[idx] += 1
630 for lis in (self.targets, self.loads):
635 for lis in (self.targets, self.loads):
631 lis.append(lis.pop(idx))
636 lis.append(lis.pop(idx))
632
637
633
638
634 def finish_job(self, idx):
639 def finish_job(self, idx):
635 """Called after self.targets[idx] just finished a job.
640 """Called after self.targets[idx] just finished a job.
636 Override with subclasses."""
641 Override with subclasses."""
637 self.loads[idx] -= 1
642 self.loads[idx] -= 1
638
643
639
644
640
645
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
                        logname='root', log_url=None, loglevel=logging.DEBUG,
                        identity=b'task', in_thread=False):
    """Create the ZMQ streams, logging, and IOLoop for a TaskScheduler and start it.

    Binds XREP sockets at in_addr (clients) and out_addr (engines), connects
    a PUB socket to mon_addr (Hub monitor) and a SUB socket to not_addr
    (notifications), then constructs and starts a TaskScheduler.  When
    in_thread is False, this call blocks in loop.start() until interrupted.
    """

    ZMQStream = zmqstream.ZMQStream

    if config:
        # unwrap dict back into Config
        config = Config(config)

    if in_thread:
        # use instance() to get the same Context/Loop as our parent
        ctx = zmq.Context.instance()
        loop = ioloop.IOLoop.instance()
    else:
        # in a process, don't use instance()
        # for safety with multiprocessing
        ctx = zmq.Context()
        loop = ioloop.IOLoop()
    # client-facing stream, identified so clients can route to us
    ins = ZMQStream(ctx.socket(zmq.XREP),loop)
    ins.setsockopt(zmq.IDENTITY, identity)
    ins.bind(in_addr)

    # engine-facing stream
    outs = ZMQStream(ctx.socket(zmq.XREP),loop)
    outs.setsockopt(zmq.IDENTITY, identity)
    outs.bind(out_addr)
    # PUB stream to the Hub monitor
    mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
    mons.connect(mon_addr)
    # SUB stream for notifications; subscribe to everything
    nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
    nots.setsockopt(zmq.SUBSCRIBE, b'')
    nots.connect(not_addr)

    # setup logging.
    if in_thread:
        log = Application.instance().log
    else:
        if log_url:
            log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
        else:
            log = local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                            mon_stream=mons, notifier_stream=nots,
                            loop=loop, log=log,
                            config=config)
    scheduler.start()
    if not in_thread:
        # block in the event loop until killed
        try:
            loop.start()
        except KeyboardInterrupt:
            print ("interrupted, exiting...", file=sys.__stderr__)
692
697
General Comments 0
You need to be logged in to leave comments. Login now