##// END OF EJS Templates
remove debug statements from Scheduler...
MinRK -
Show More
@@ -1,703 +1,692 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6
6
7 Authors:
7 Authors:
8
8
9 * Min RK
9 * Min RK
10 """
10 """
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2010-2011 The IPython Development Team
12 # Copyright (C) 2010-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #----------------------------------------------------------------------
18 #----------------------------------------------------------------------
19 # Imports
19 # Imports
20 #----------------------------------------------------------------------
20 #----------------------------------------------------------------------
21
21
22 from __future__ import print_function
22 from __future__ import print_function
23
23
24 import logging
24 import logging
25 import sys
25 import sys
26
26
27 from datetime import datetime, timedelta
27 from datetime import datetime, timedelta
28 from random import randint, random
28 from random import randint, random
29 from types import FunctionType
29 from types import FunctionType
30
30
31 try:
31 try:
32 import numpy
32 import numpy
33 except ImportError:
33 except ImportError:
34 numpy = None
34 numpy = None
35
35
36 import zmq
36 import zmq
37 from zmq.eventloop import ioloop, zmqstream
37 from zmq.eventloop import ioloop, zmqstream
38
38
39 # local imports
39 # local imports
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.config.application import Application
41 from IPython.config.application import Application
42 from IPython.config.loader import Config
42 from IPython.config.loader import Config
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum
44
44
45 from IPython.parallel import error
45 from IPython.parallel import error
46 from IPython.parallel.factory import SessionFactory
46 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.util import connect_logger, local_logger
47 from IPython.parallel.util import connect_logger, local_logger
48
48
49 from .dependency import Dependency
49 from .dependency import Dependency
50
50
51 @decorator
51 @decorator
52 def logged(f,self,*args,**kwargs):
52 def logged(f,self,*args,**kwargs):
53 # print ("#--------------------")
53 # print ("#--------------------")
54 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
55 # print ("#--")
55 # print ("#--")
56 return f(self,*args, **kwargs)
56 return f(self,*args, **kwargs)
57
57
58 #----------------------------------------------------------------------
58 #----------------------------------------------------------------------
59 # Chooser functions
59 # Chooser functions
60 #----------------------------------------------------------------------
60 #----------------------------------------------------------------------
61
61
62 def plainrandom(loads):
62 def plainrandom(loads):
63 """Plain random pick."""
63 """Plain random pick."""
64 n = len(loads)
64 n = len(loads)
65 return randint(0,n-1)
65 return randint(0,n-1)
66
66
67 def lru(loads):
67 def lru(loads):
68 """Always pick the front of the line.
68 """Always pick the front of the line.
69
69
70 The content of `loads` is ignored.
70 The content of `loads` is ignored.
71
71
72 Assumes LRU ordering of loads, with oldest first.
72 Assumes LRU ordering of loads, with oldest first.
73 """
73 """
74 return 0
74 return 0
75
75
76 def twobin(loads):
76 def twobin(loads):
77 """Pick two at random, use the LRU of the two.
77 """Pick two at random, use the LRU of the two.
78
78
79 The content of loads is ignored.
79 The content of loads is ignored.
80
80
81 Assumes LRU ordering of loads, with oldest first.
81 Assumes LRU ordering of loads, with oldest first.
82 """
82 """
83 n = len(loads)
83 n = len(loads)
84 a = randint(0,n-1)
84 a = randint(0,n-1)
85 b = randint(0,n-1)
85 b = randint(0,n-1)
86 return min(a,b)
86 return min(a,b)
87
87
88 def weighted(loads):
88 def weighted(loads):
89 """Pick two at random using inverse load as weight.
89 """Pick two at random using inverse load as weight.
90
90
91 Return the less loaded of the two.
91 Return the less loaded of the two.
92 """
92 """
93 # weight 0 a million times more than 1:
93 # weight 0 a million times more than 1:
94 weights = 1./(1e-6+numpy.array(loads))
94 weights = 1./(1e-6+numpy.array(loads))
95 sums = weights.cumsum()
95 sums = weights.cumsum()
96 t = sums[-1]
96 t = sums[-1]
97 x = random()*t
97 x = random()*t
98 y = random()*t
98 y = random()*t
99 idx = 0
99 idx = 0
100 idy = 0
100 idy = 0
101 while sums[idx] < x:
101 while sums[idx] < x:
102 idx += 1
102 idx += 1
103 while sums[idy] < y:
103 while sums[idy] < y:
104 idy += 1
104 idy += 1
105 if weights[idy] > weights[idx]:
105 if weights[idy] > weights[idx]:
106 return idy
106 return idy
107 else:
107 else:
108 return idx
108 return idx
109
109
110 def leastload(loads):
110 def leastload(loads):
111 """Always choose the lowest load.
111 """Always choose the lowest load.
112
112
113 If the lowest load occurs more than once, the first
113 If the lowest load occurs more than once, the first
114 occurance will be used. If loads has LRU ordering, this means
114 occurance will be used. If loads has LRU ordering, this means
115 the LRU of those with the lowest load is chosen.
115 the LRU of those with the lowest load is chosen.
116 """
116 """
117 return loads.index(min(loads))
117 return loads.index(min(loads))
118
118
119 #---------------------------------------------------------------------
119 #---------------------------------------------------------------------
120 # Classes
120 # Classes
121 #---------------------------------------------------------------------
121 #---------------------------------------------------------------------
122 # store empty default dependency:
122 # store empty default dependency:
123 MET = Dependency([])
123 MET = Dependency([])
124
124
125 class TaskScheduler(SessionFactory):
125 class TaskScheduler(SessionFactory):
126 """Python TaskScheduler object.
126 """Python TaskScheduler object.
127
127
128 This is the simplest object that supports msg_id based
128 This is the simplest object that supports msg_id based
129 DAG dependencies. *Only* task msg_ids are checked, not
129 DAG dependencies. *Only* task msg_ids are checked, not
130 msg_ids of jobs submitted via the MUX queue.
130 msg_ids of jobs submitted via the MUX queue.
131
131
132 """
132 """
133
133
134 hwm = Int(0, config=True, shortname='hwm',
134 hwm = Int(0, config=True, shortname='hwm',
135 help="""specify the High Water Mark (HWM) for the downstream
135 help="""specify the High Water Mark (HWM) for the downstream
136 socket in the Task scheduler. This is the maximum number
136 socket in the Task scheduler. This is the maximum number
137 of allowed outstanding tasks on each engine."""
137 of allowed outstanding tasks on each engine."""
138 )
138 )
139 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
139 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
140 'leastload', config=True, shortname='scheme', allow_none=False,
140 'leastload', config=True, shortname='scheme', allow_none=False,
141 help="""select the task scheduler scheme [default: Python LRU]
141 help="""select the task scheduler scheme [default: Python LRU]
142 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
142 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
143 )
143 )
144 def _scheme_name_changed(self, old, new):
144 def _scheme_name_changed(self, old, new):
145 self.log.debug("Using scheme %r"%new)
145 self.log.debug("Using scheme %r"%new)
146 self.scheme = globals()[new]
146 self.scheme = globals()[new]
147
147
148 # input arguments:
148 # input arguments:
149 scheme = Instance(FunctionType) # function for determining the destination
149 scheme = Instance(FunctionType) # function for determining the destination
150 def _scheme_default(self):
150 def _scheme_default(self):
151 return leastload
151 return leastload
152 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
152 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
153 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
153 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
154 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
154 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
155 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
155 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
156
156
157 # internals:
157 # internals:
158 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
158 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
159 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
159 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
160 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
160 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
161 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
161 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
162 pending = Dict() # dict by engine_uuid of submitted tasks
162 pending = Dict() # dict by engine_uuid of submitted tasks
163 completed = Dict() # dict by engine_uuid of completed tasks
163 completed = Dict() # dict by engine_uuid of completed tasks
164 failed = Dict() # dict by engine_uuid of failed tasks
164 failed = Dict() # dict by engine_uuid of failed tasks
165 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
165 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
166 clients = Dict() # dict by msg_id for who submitted the task
166 clients = Dict() # dict by msg_id for who submitted the task
167 targets = List() # list of target IDENTs
167 targets = List() # list of target IDENTs
168 loads = List() # list of engine loads
168 loads = List() # list of engine loads
169 # full = Set() # set of IDENTs that have HWM outstanding tasks
169 # full = Set() # set of IDENTs that have HWM outstanding tasks
170 all_completed = Set() # set of all completed tasks
170 all_completed = Set() # set of all completed tasks
171 all_failed = Set() # set of all failed tasks
171 all_failed = Set() # set of all failed tasks
172 all_done = Set() # set of all finished tasks=union(completed,failed)
172 all_done = Set() # set of all finished tasks=union(completed,failed)
173 all_ids = Set() # set of all submitted task IDs
173 all_ids = Set() # set of all submitted task IDs
174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
176
176
177
177
178 def start(self):
178 def start(self):
179 self.engine_stream.on_recv(self.dispatch_result, copy=False)
179 self.engine_stream.on_recv(self.dispatch_result, copy=False)
180 self._notification_handlers = dict(
180 self._notification_handlers = dict(
181 registration_notification = self._register_engine,
181 registration_notification = self._register_engine,
182 unregistration_notification = self._unregister_engine
182 unregistration_notification = self._unregister_engine
183 )
183 )
184 self.notifier_stream.on_recv(self.dispatch_notification)
184 self.notifier_stream.on_recv(self.dispatch_notification)
185 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
185 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
186 self.auditor.start()
186 self.auditor.start()
187 self.log.info("Scheduler started [%s]"%self.scheme_name)
187 self.log.info("Scheduler started [%s]"%self.scheme_name)
188
188
189 def resume_receiving(self):
189 def resume_receiving(self):
190 """Resume accepting jobs."""
190 """Resume accepting jobs."""
191 self.client_stream.on_recv(self.dispatch_submission, copy=False)
191 self.client_stream.on_recv(self.dispatch_submission, copy=False)
192
192
193 def stop_receiving(self):
193 def stop_receiving(self):
194 """Stop accepting jobs while there are no engines.
194 """Stop accepting jobs while there are no engines.
195 Leave them in the ZMQ queue."""
195 Leave them in the ZMQ queue."""
196 self.client_stream.on_recv(None)
196 self.client_stream.on_recv(None)
197
197
198 #-----------------------------------------------------------------------
198 #-----------------------------------------------------------------------
199 # [Un]Registration Handling
199 # [Un]Registration Handling
200 #-----------------------------------------------------------------------
200 #-----------------------------------------------------------------------
201
201
202 def dispatch_notification(self, msg):
202 def dispatch_notification(self, msg):
203 """dispatch register/unregister events."""
203 """dispatch register/unregister events."""
204 try:
204 try:
205 idents,msg = self.session.feed_identities(msg)
205 idents,msg = self.session.feed_identities(msg)
206 except ValueError:
206 except ValueError:
207 self.log.warn("task::Invalid Message: %r"%msg)
207 self.log.warn("task::Invalid Message: %r"%msg)
208 return
208 return
209 try:
209 try:
210 msg = self.session.unpack_message(msg)
210 msg = self.session.unpack_message(msg)
211 except ValueError:
211 except ValueError:
212 self.log.warn("task::Unauthorized message from: %r"%idents)
212 self.log.warn("task::Unauthorized message from: %r"%idents)
213 return
213 return
214
214
215 msg_type = msg['msg_type']
215 msg_type = msg['msg_type']
216
216
217 handler = self._notification_handlers.get(msg_type, None)
217 handler = self._notification_handlers.get(msg_type, None)
218 if handler is None:
218 if handler is None:
219 self.log.error("Unhandled message type: %r"%msg_type)
219 self.log.error("Unhandled message type: %r"%msg_type)
220 else:
220 else:
221 try:
221 try:
222 handler(str(msg['content']['queue']))
222 handler(str(msg['content']['queue']))
223 except KeyError:
223 except KeyError:
224 self.log.error("task::Invalid notification msg: %r"%msg)
224 self.log.error("task::Invalid notification msg: %r"%msg)
225
225
226 @logged
226 @logged
227 def _register_engine(self, uid):
227 def _register_engine(self, uid):
228 """New engine with ident `uid` became available."""
228 """New engine with ident `uid` became available."""
229 # head of the line:
229 # head of the line:
230 self.targets.insert(0,uid)
230 self.targets.insert(0,uid)
231 self.loads.insert(0,0)
231 self.loads.insert(0,0)
232 # initialize sets
232 # initialize sets
233 self.completed[uid] = set()
233 self.completed[uid] = set()
234 self.failed[uid] = set()
234 self.failed[uid] = set()
235 self.pending[uid] = {}
235 self.pending[uid] = {}
236 if len(self.targets) == 1:
236 if len(self.targets) == 1:
237 self.resume_receiving()
237 self.resume_receiving()
238 # rescan the graph:
238 # rescan the graph:
239 self.update_graph(None)
239 self.update_graph(None)
240
240
241 def _unregister_engine(self, uid):
241 def _unregister_engine(self, uid):
242 """Existing engine with ident `uid` became unavailable."""
242 """Existing engine with ident `uid` became unavailable."""
243 if len(self.targets) == 1:
243 if len(self.targets) == 1:
244 # this was our only engine
244 # this was our only engine
245 self.stop_receiving()
245 self.stop_receiving()
246
246
247 # handle any potentially finished tasks:
247 # handle any potentially finished tasks:
248 self.engine_stream.flush()
248 self.engine_stream.flush()
249
249
250 # don't pop destinations, because they might be used later
250 # don't pop destinations, because they might be used later
251 # map(self.destinations.pop, self.completed.pop(uid))
251 # map(self.destinations.pop, self.completed.pop(uid))
252 # map(self.destinations.pop, self.failed.pop(uid))
252 # map(self.destinations.pop, self.failed.pop(uid))
253
253
254 # prevent this engine from receiving work
254 # prevent this engine from receiving work
255 idx = self.targets.index(uid)
255 idx = self.targets.index(uid)
256 self.targets.pop(idx)
256 self.targets.pop(idx)
257 self.loads.pop(idx)
257 self.loads.pop(idx)
258
258
259 # wait 5 seconds before cleaning up pending jobs, since the results might
259 # wait 5 seconds before cleaning up pending jobs, since the results might
260 # still be incoming
260 # still be incoming
261 if self.pending[uid]:
261 if self.pending[uid]:
262 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
262 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
263 dc.start()
263 dc.start()
264 else:
264 else:
265 self.completed.pop(uid)
265 self.completed.pop(uid)
266 self.failed.pop(uid)
266 self.failed.pop(uid)
267
267
268
268
269 @logged
270 def handle_stranded_tasks(self, engine):
269 def handle_stranded_tasks(self, engine):
271 """Deal with jobs resident in an engine that died."""
270 """Deal with jobs resident in an engine that died."""
272 lost = self.pending[engine]
271 lost = self.pending[engine]
273 for msg_id in lost.keys():
272 for msg_id in lost.keys():
274 if msg_id not in self.pending[engine]:
273 if msg_id not in self.pending[engine]:
275 # prevent double-handling of messages
274 # prevent double-handling of messages
276 continue
275 continue
277
276
278 raw_msg = lost[msg_id][0]
277 raw_msg = lost[msg_id][0]
279 idents,msg = self.session.feed_identities(raw_msg, copy=False)
278 idents,msg = self.session.feed_identities(raw_msg, copy=False)
280 parent = self.session.unpack(msg[1].bytes)
279 parent = self.session.unpack(msg[1].bytes)
281 idents = [engine, idents[0]]
280 idents = [engine, idents[0]]
282
281
283 # build fake error reply
282 # build fake error reply
284 try:
283 try:
285 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
284 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
286 except:
285 except:
287 content = error.wrap_exception()
286 content = error.wrap_exception()
288 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
287 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
289 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
288 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
290 # and dispatch it
289 # and dispatch it
291 self.dispatch_result(raw_reply)
290 self.dispatch_result(raw_reply)
292
291
293 # finally scrub completed/failed lists
292 # finally scrub completed/failed lists
294 self.completed.pop(engine)
293 self.completed.pop(engine)
295 self.failed.pop(engine)
294 self.failed.pop(engine)
296
295
297
296
298 #-----------------------------------------------------------------------
297 #-----------------------------------------------------------------------
299 # Job Submission
298 # Job Submission
300 #-----------------------------------------------------------------------
299 #-----------------------------------------------------------------------
301 @logged
302 def dispatch_submission(self, raw_msg):
300 def dispatch_submission(self, raw_msg):
303 """Dispatch job submission to appropriate handlers."""
301 """Dispatch job submission to appropriate handlers."""
304 # ensure targets up to date:
302 # ensure targets up to date:
305 self.notifier_stream.flush()
303 self.notifier_stream.flush()
306 try:
304 try:
307 idents, msg = self.session.feed_identities(raw_msg, copy=False)
305 idents, msg = self.session.feed_identities(raw_msg, copy=False)
308 msg = self.session.unpack_message(msg, content=False, copy=False)
306 msg = self.session.unpack_message(msg, content=False, copy=False)
309 except Exception:
307 except Exception:
310 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
308 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
311 return
309 return
312
310
313
311
314 # send to monitor
312 # send to monitor
315 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
313 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
316
314
317 header = msg['header']
315 header = msg['header']
318 msg_id = header['msg_id']
316 msg_id = header['msg_id']
319 self.all_ids.add(msg_id)
317 self.all_ids.add(msg_id)
320
318
321 # targets
319 # targets
322 targets = set(header.get('targets', []))
320 targets = set(header.get('targets', []))
323 retries = header.get('retries', 0)
321 retries = header.get('retries', 0)
324 self.retries[msg_id] = retries
322 self.retries[msg_id] = retries
325
323
326 # time dependencies
324 # time dependencies
327 after = Dependency(header.get('after', []))
325 after = Dependency(header.get('after', []))
328 if after.all:
326 if after.all:
329 if after.success:
327 if after.success:
330 after.difference_update(self.all_completed)
328 after.difference_update(self.all_completed)
331 if after.failure:
329 if after.failure:
332 after.difference_update(self.all_failed)
330 after.difference_update(self.all_failed)
333 if after.check(self.all_completed, self.all_failed):
331 if after.check(self.all_completed, self.all_failed):
334 # recast as empty set, if `after` already met,
332 # recast as empty set, if `after` already met,
335 # to prevent unnecessary set comparisons
333 # to prevent unnecessary set comparisons
336 after = MET
334 after = MET
337
335
338 # location dependencies
336 # location dependencies
339 follow = Dependency(header.get('follow', []))
337 follow = Dependency(header.get('follow', []))
340
338
341 # turn timeouts into datetime objects:
339 # turn timeouts into datetime objects:
342 timeout = header.get('timeout', None)
340 timeout = header.get('timeout', None)
343 if timeout:
341 if timeout:
344 timeout = datetime.now() + timedelta(0,timeout,0)
342 timeout = datetime.now() + timedelta(0,timeout,0)
345
343
346 args = [raw_msg, targets, after, follow, timeout]
344 args = [raw_msg, targets, after, follow, timeout]
347
345
348 # validate and reduce dependencies:
346 # validate and reduce dependencies:
349 for dep in after,follow:
347 for dep in after,follow:
350 # check valid:
348 # check valid:
351 if msg_id in dep or dep.difference(self.all_ids):
349 if msg_id in dep or dep.difference(self.all_ids):
352 self.depending[msg_id] = args
350 self.depending[msg_id] = args
353 return self.fail_unreachable(msg_id, error.InvalidDependency)
351 return self.fail_unreachable(msg_id, error.InvalidDependency)
354 # check if unreachable:
352 # check if unreachable:
355 if dep.unreachable(self.all_completed, self.all_failed):
353 if dep.unreachable(self.all_completed, self.all_failed):
356 self.depending[msg_id] = args
354 self.depending[msg_id] = args
357 return self.fail_unreachable(msg_id)
355 return self.fail_unreachable(msg_id)
358
356
359 if after.check(self.all_completed, self.all_failed):
357 if after.check(self.all_completed, self.all_failed):
360 # time deps already met, try to run
358 # time deps already met, try to run
361 if not self.maybe_run(msg_id, *args):
359 if not self.maybe_run(msg_id, *args):
362 # can't run yet
360 # can't run yet
363 if msg_id not in self.all_failed:
361 if msg_id not in self.all_failed:
364 # could have failed as unreachable
362 # could have failed as unreachable
365 self.save_unmet(msg_id, *args)
363 self.save_unmet(msg_id, *args)
366 else:
364 else:
367 self.save_unmet(msg_id, *args)
365 self.save_unmet(msg_id, *args)
368
366
369 # @logged
370 def audit_timeouts(self):
367 def audit_timeouts(self):
371 """Audit all waiting tasks for expired timeouts."""
368 """Audit all waiting tasks for expired timeouts."""
372 now = datetime.now()
369 now = datetime.now()
373 for msg_id in self.depending.keys():
370 for msg_id in self.depending.keys():
374 # must recheck, in case one failure cascaded to another:
371 # must recheck, in case one failure cascaded to another:
375 if msg_id in self.depending:
372 if msg_id in self.depending:
376 raw,after,targets,follow,timeout = self.depending[msg_id]
373 raw,after,targets,follow,timeout = self.depending[msg_id]
377 if timeout and timeout < now:
374 if timeout and timeout < now:
378 self.fail_unreachable(msg_id, error.TaskTimeout)
375 self.fail_unreachable(msg_id, error.TaskTimeout)
379
376
380 @logged
381 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
377 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
382 """a task has become unreachable, send a reply with an ImpossibleDependency
378 """a task has become unreachable, send a reply with an ImpossibleDependency
383 error."""
379 error."""
384 if msg_id not in self.depending:
380 if msg_id not in self.depending:
385 self.log.error("msg %r already failed!"%msg_id)
381 self.log.error("msg %r already failed!", msg_id)
386 return
382 return
387 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
383 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
388 for mid in follow.union(after):
384 for mid in follow.union(after):
389 if mid in self.graph:
385 if mid in self.graph:
390 self.graph[mid].remove(msg_id)
386 self.graph[mid].remove(msg_id)
391
387
392 # FIXME: unpacking a message I've already unpacked, but didn't save:
388 # FIXME: unpacking a message I've already unpacked, but didn't save:
393 idents,msg = self.session.feed_identities(raw_msg, copy=False)
389 idents,msg = self.session.feed_identities(raw_msg, copy=False)
394 header = self.session.unpack(msg[1].bytes)
390 header = self.session.unpack(msg[1].bytes)
395
391
396 try:
392 try:
397 raise why()
393 raise why()
398 except:
394 except:
399 content = error.wrap_exception()
395 content = error.wrap_exception()
400
396
401 self.all_done.add(msg_id)
397 self.all_done.add(msg_id)
402 self.all_failed.add(msg_id)
398 self.all_failed.add(msg_id)
403
399
404 msg = self.session.send(self.client_stream, 'apply_reply', content,
400 msg = self.session.send(self.client_stream, 'apply_reply', content,
405 parent=header, ident=idents)
401 parent=header, ident=idents)
406 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
402 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
407
403
408 self.update_graph(msg_id, success=False)
404 self.update_graph(msg_id, success=False)
409
405
410 @logged
411 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
406 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
412 """check location dependencies, and run if they are met."""
407 """check location dependencies, and run if they are met."""
413 blacklist = self.blacklist.setdefault(msg_id, set())
408 blacklist = self.blacklist.setdefault(msg_id, set())
414 if follow or targets or blacklist or self.hwm:
409 if follow or targets or blacklist or self.hwm:
415 # we need a can_run filter
410 # we need a can_run filter
416 def can_run(idx):
411 def can_run(idx):
417 # check hwm
412 # check hwm
418 if self.hwm and self.loads[idx] == self.hwm:
413 if self.hwm and self.loads[idx] == self.hwm:
419 return False
414 return False
420 target = self.targets[idx]
415 target = self.targets[idx]
421 # check blacklist
416 # check blacklist
422 if target in blacklist:
417 if target in blacklist:
423 return False
418 return False
424 # check targets
419 # check targets
425 if targets and target not in targets:
420 if targets and target not in targets:
426 return False
421 return False
427 # check follow
422 # check follow
428 return follow.check(self.completed[target], self.failed[target])
423 return follow.check(self.completed[target], self.failed[target])
429
424
430 indices = filter(can_run, range(len(self.targets)))
425 indices = filter(can_run, range(len(self.targets)))
431
426
432 if not indices:
427 if not indices:
433 # couldn't run
428 # couldn't run
434 if follow.all:
429 if follow.all:
435 # check follow for impossibility
430 # check follow for impossibility
436 dests = set()
431 dests = set()
437 relevant = set()
432 relevant = set()
438 if follow.success:
433 if follow.success:
439 relevant = self.all_completed
434 relevant = self.all_completed
440 if follow.failure:
435 if follow.failure:
441 relevant = relevant.union(self.all_failed)
436 relevant = relevant.union(self.all_failed)
442 for m in follow.intersection(relevant):
437 for m in follow.intersection(relevant):
443 dests.add(self.destinations[m])
438 dests.add(self.destinations[m])
444 if len(dests) > 1:
439 if len(dests) > 1:
445 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
440 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
446 self.fail_unreachable(msg_id)
441 self.fail_unreachable(msg_id)
447 return False
442 return False
448 if targets:
443 if targets:
449 # check blacklist+targets for impossibility
444 # check blacklist+targets for impossibility
450 targets.difference_update(blacklist)
445 targets.difference_update(blacklist)
451 if not targets or not targets.intersection(self.targets):
446 if not targets or not targets.intersection(self.targets):
452 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
447 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
453 self.fail_unreachable(msg_id)
448 self.fail_unreachable(msg_id)
454 return False
449 return False
455 return False
450 return False
456 else:
451 else:
457 indices = None
452 indices = None
458
453
459 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
454 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
460 return True
455 return True
461
456
462 @logged
463 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
457 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
464 """Save a message for later submission when its dependencies are met."""
458 """Save a message for later submission when its dependencies are met."""
465 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
459 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
466 # track the ids in follow or after, but not those already finished
460 # track the ids in follow or after, but not those already finished
467 for dep_id in after.union(follow).difference(self.all_done):
461 for dep_id in after.union(follow).difference(self.all_done):
468 if dep_id not in self.graph:
462 if dep_id not in self.graph:
469 self.graph[dep_id] = set()
463 self.graph[dep_id] = set()
470 self.graph[dep_id].add(msg_id)
464 self.graph[dep_id].add(msg_id)
471
465
472 @logged
473 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
466 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
474 """Submit a task to any of a subset of our targets."""
467 """Submit a task to any of a subset of our targets."""
475 if indices:
468 if indices:
476 loads = [self.loads[i] for i in indices]
469 loads = [self.loads[i] for i in indices]
477 else:
470 else:
478 loads = self.loads
471 loads = self.loads
479 idx = self.scheme(loads)
472 idx = self.scheme(loads)
480 if indices:
473 if indices:
481 idx = indices[idx]
474 idx = indices[idx]
482 target = self.targets[idx]
475 target = self.targets[idx]
483 # print (target, map(str, msg[:3]))
476 # print (target, map(str, msg[:3]))
484 # send job to the engine
477 # send job to the engine
485 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
478 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
486 self.engine_stream.send_multipart(raw_msg, copy=False)
479 self.engine_stream.send_multipart(raw_msg, copy=False)
487 # update load
480 # update load
488 self.add_job(idx)
481 self.add_job(idx)
489 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
482 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
490 # notify Hub
483 # notify Hub
491 content = dict(msg_id=msg_id, engine_id=target)
484 content = dict(msg_id=msg_id, engine_id=target)
492 self.session.send(self.mon_stream, 'task_destination', content=content,
485 self.session.send(self.mon_stream, 'task_destination', content=content,
493 ident=['tracktask',self.session.session])
486 ident=['tracktask',self.session.session])
494
487
495
488
496 #-----------------------------------------------------------------------
489 #-----------------------------------------------------------------------
497 # Result Handling
490 # Result Handling
498 #-----------------------------------------------------------------------
491 #-----------------------------------------------------------------------
499 @logged
500 def dispatch_result(self, raw_msg):
492 def dispatch_result(self, raw_msg):
501 """dispatch method for result replies"""
493 """dispatch method for result replies"""
502 try:
494 try:
503 idents,msg = self.session.feed_identities(raw_msg, copy=False)
495 idents,msg = self.session.feed_identities(raw_msg, copy=False)
504 msg = self.session.unpack_message(msg, content=False, copy=False)
496 msg = self.session.unpack_message(msg, content=False, copy=False)
505 engine = idents[0]
497 engine = idents[0]
506 try:
498 try:
507 idx = self.targets.index(engine)
499 idx = self.targets.index(engine)
508 except ValueError:
500 except ValueError:
509 pass # skip load-update for dead engines
501 pass # skip load-update for dead engines
510 else:
502 else:
511 self.finish_job(idx)
503 self.finish_job(idx)
512 except Exception:
504 except Exception:
513 self.log.error("task::Invaid result: %r"%raw_msg, exc_info=True)
505 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
514 return
506 return
515
507
516 header = msg['header']
508 header = msg['header']
517 parent = msg['parent_header']
509 parent = msg['parent_header']
518 if header.get('dependencies_met', True):
510 if header.get('dependencies_met', True):
519 success = (header['status'] == 'ok')
511 success = (header['status'] == 'ok')
520 msg_id = parent['msg_id']
512 msg_id = parent['msg_id']
521 retries = self.retries[msg_id]
513 retries = self.retries[msg_id]
522 if not success and retries > 0:
514 if not success and retries > 0:
523 # failed
515 # failed
524 self.retries[msg_id] = retries - 1
516 self.retries[msg_id] = retries - 1
525 self.handle_unmet_dependency(idents, parent)
517 self.handle_unmet_dependency(idents, parent)
526 else:
518 else:
527 del self.retries[msg_id]
519 del self.retries[msg_id]
528 # relay to client and update graph
520 # relay to client and update graph
529 self.handle_result(idents, parent, raw_msg, success)
521 self.handle_result(idents, parent, raw_msg, success)
530 # send to Hub monitor
522 # send to Hub monitor
531 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
523 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
532 else:
524 else:
533 self.handle_unmet_dependency(idents, parent)
525 self.handle_unmet_dependency(idents, parent)
534
526
535 @logged
536 def handle_result(self, idents, parent, raw_msg, success=True):
527 def handle_result(self, idents, parent, raw_msg, success=True):
537 """handle a real task result, either success or failure"""
528 """handle a real task result, either success or failure"""
538 # first, relay result to client
529 # first, relay result to client
539 engine = idents[0]
530 engine = idents[0]
540 client = idents[1]
531 client = idents[1]
541 # swap_ids for XREP-XREP mirror
532 # swap_ids for XREP-XREP mirror
542 raw_msg[:2] = [client,engine]
533 raw_msg[:2] = [client,engine]
543 # print (map(str, raw_msg[:4]))
534 # print (map(str, raw_msg[:4]))
544 self.client_stream.send_multipart(raw_msg, copy=False)
535 self.client_stream.send_multipart(raw_msg, copy=False)
545 # now, update our data structures
536 # now, update our data structures
546 msg_id = parent['msg_id']
537 msg_id = parent['msg_id']
547 self.blacklist.pop(msg_id, None)
538 self.blacklist.pop(msg_id, None)
548 self.pending[engine].pop(msg_id)
539 self.pending[engine].pop(msg_id)
549 if success:
540 if success:
550 self.completed[engine].add(msg_id)
541 self.completed[engine].add(msg_id)
551 self.all_completed.add(msg_id)
542 self.all_completed.add(msg_id)
552 else:
543 else:
553 self.failed[engine].add(msg_id)
544 self.failed[engine].add(msg_id)
554 self.all_failed.add(msg_id)
545 self.all_failed.add(msg_id)
555 self.all_done.add(msg_id)
546 self.all_done.add(msg_id)
556 self.destinations[msg_id] = engine
547 self.destinations[msg_id] = engine
557
548
558 self.update_graph(msg_id, success)
549 self.update_graph(msg_id, success)
559
550
560 @logged
561 def handle_unmet_dependency(self, idents, parent):
551 def handle_unmet_dependency(self, idents, parent):
562 """handle an unmet dependency"""
552 """handle an unmet dependency"""
563 engine = idents[0]
553 engine = idents[0]
564 msg_id = parent['msg_id']
554 msg_id = parent['msg_id']
565
555
566 if msg_id not in self.blacklist:
556 if msg_id not in self.blacklist:
567 self.blacklist[msg_id] = set()
557 self.blacklist[msg_id] = set()
568 self.blacklist[msg_id].add(engine)
558 self.blacklist[msg_id].add(engine)
569
559
570 args = self.pending[engine].pop(msg_id)
560 args = self.pending[engine].pop(msg_id)
571 raw,targets,after,follow,timeout = args
561 raw,targets,after,follow,timeout = args
572
562
573 if self.blacklist[msg_id] == targets:
563 if self.blacklist[msg_id] == targets:
574 self.depending[msg_id] = args
564 self.depending[msg_id] = args
575 self.fail_unreachable(msg_id)
565 self.fail_unreachable(msg_id)
576 elif not self.maybe_run(msg_id, *args):
566 elif not self.maybe_run(msg_id, *args):
577 # resubmit failed
567 # resubmit failed
578 if msg_id not in self.all_failed:
568 if msg_id not in self.all_failed:
579 # put it back in our dependency tree
569 # put it back in our dependency tree
580 self.save_unmet(msg_id, *args)
570 self.save_unmet(msg_id, *args)
581
571
582 if self.hwm:
572 if self.hwm:
583 try:
573 try:
584 idx = self.targets.index(engine)
574 idx = self.targets.index(engine)
585 except ValueError:
575 except ValueError:
586 pass # skip load-update for dead engines
576 pass # skip load-update for dead engines
587 else:
577 else:
588 if self.loads[idx] == self.hwm-1:
578 if self.loads[idx] == self.hwm-1:
589 self.update_graph(None)
579 self.update_graph(None)
590
580
591
581
592
582
593 @logged
594 def update_graph(self, dep_id=None, success=True):
583 def update_graph(self, dep_id=None, success=True):
595 """dep_id just finished. Update our dependency
584 """dep_id just finished. Update our dependency
596 graph and submit any jobs that just became runable.
585 graph and submit any jobs that just became runable.
597
586
598 Called with dep_id=None to update entire graph for hwm, but without finishing
587 Called with dep_id=None to update entire graph for hwm, but without finishing
599 a task.
588 a task.
600 """
589 """
601 # print ("\n\n***********")
590 # print ("\n\n***********")
602 # pprint (dep_id)
591 # pprint (dep_id)
603 # pprint (self.graph)
592 # pprint (self.graph)
604 # pprint (self.depending)
593 # pprint (self.depending)
605 # pprint (self.all_completed)
594 # pprint (self.all_completed)
606 # pprint (self.all_failed)
595 # pprint (self.all_failed)
607 # print ("\n\n***********\n\n")
596 # print ("\n\n***********\n\n")
608 # update any jobs that depended on the dependency
597 # update any jobs that depended on the dependency
609 jobs = self.graph.pop(dep_id, [])
598 jobs = self.graph.pop(dep_id, [])
610
599
611 # recheck *all* jobs if
600 # recheck *all* jobs if
612 # a) we have HWM and an engine just become no longer full
601 # a) we have HWM and an engine just become no longer full
613 # or b) dep_id was given as None
602 # or b) dep_id was given as None
614 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
603 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
615 jobs = self.depending.keys()
604 jobs = self.depending.keys()
616
605
617 for msg_id in jobs:
606 for msg_id in jobs:
618 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
607 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
619
608
620 if after.unreachable(self.all_completed, self.all_failed)\
609 if after.unreachable(self.all_completed, self.all_failed)\
621 or follow.unreachable(self.all_completed, self.all_failed):
610 or follow.unreachable(self.all_completed, self.all_failed):
622 self.fail_unreachable(msg_id)
611 self.fail_unreachable(msg_id)
623
612
624 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
613 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
625 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
614 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
626
615
627 self.depending.pop(msg_id)
616 self.depending.pop(msg_id)
628 for mid in follow.union(after):
617 for mid in follow.union(after):
629 if mid in self.graph:
618 if mid in self.graph:
630 self.graph[mid].remove(msg_id)
619 self.graph[mid].remove(msg_id)
631
620
632 #----------------------------------------------------------------------
621 #----------------------------------------------------------------------
633 # methods to be overridden by subclasses
622 # methods to be overridden by subclasses
634 #----------------------------------------------------------------------
623 #----------------------------------------------------------------------
635
624
636 def add_job(self, idx):
625 def add_job(self, idx):
637 """Called after self.targets[idx] just got the job with header.
626 """Called after self.targets[idx] just got the job with header.
638 Override with subclasses. The default ordering is simple LRU.
627 Override with subclasses. The default ordering is simple LRU.
639 The default loads are the number of outstanding jobs."""
628 The default loads are the number of outstanding jobs."""
640 self.loads[idx] += 1
629 self.loads[idx] += 1
641 for lis in (self.targets, self.loads):
630 for lis in (self.targets, self.loads):
642 lis.append(lis.pop(idx))
631 lis.append(lis.pop(idx))
643
632
644
633
645 def finish_job(self, idx):
634 def finish_job(self, idx):
646 """Called after self.targets[idx] just finished a job.
635 """Called after self.targets[idx] just finished a job.
647 Override with subclasses."""
636 Override with subclasses."""
648 self.loads[idx] -= 1
637 self.loads[idx] -= 1
649
638
650
639
651
640
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
                        logname='root', log_url=None, loglevel=logging.DEBUG,
                        identity=b'task', in_thread=False):
    """Create a TaskScheduler wired to the given ZMQ endpoints and start it.

    ``in_addr``/``out_addr`` are the client- and engine-facing XREP sockets,
    ``mon_addr`` the Hub monitor (PUB), ``not_addr`` the notification feed
    (SUB).  With ``in_thread`` the parent process's Context/IOLoop are
    reused and the loop is NOT started here; otherwise a private Context
    and IOLoop are created and this call blocks in ``loop.start()``.
    """
    ZMQStream = zmqstream.ZMQStream

    if config:
        # unwrap dict back into Config
        config = Config(config)

    if in_thread:
        # use instance() to get the same Context/Loop as our parent
        ctx = zmq.Context.instance()
        loop = ioloop.IOLoop.instance()
    else:
        # in a process, don't use instance()
        # for safety with multiprocessing
        ctx = zmq.Context()
        loop = ioloop.IOLoop()

    # client-facing socket
    ins = ZMQStream(ctx.socket(zmq.XREP), loop)
    ins.setsockopt(zmq.IDENTITY, identity)
    ins.bind(in_addr)

    # engine-facing socket
    outs = ZMQStream(ctx.socket(zmq.XREP), loop)
    outs.setsockopt(zmq.IDENTITY, identity)
    outs.bind(out_addr)

    # Hub monitor (publish) and notification (subscribe-all) streams
    mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
    mons.connect(mon_addr)
    nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB), loop)
    nots.setsockopt(zmq.SUBSCRIBE, b'')
    nots.connect(not_addr)

    # setup logging.
    if in_thread:
        log = Application.instance().log
    elif log_url:
        log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
    else:
        log = local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                            mon_stream=mons, notifier_stream=nots,
                            loop=loop, log=log,
                            config=config)
    scheduler.start()
    if not in_thread:
        try:
            loop.start()
        except KeyboardInterrupt:
            print ("interrupted, exiting...", file=sys.__stderr__)
General Comments 0
You need to be logged in to leave comments. Login now