##// END OF EJS Templates
use per-timeout callback, rather than audit for timeouts
MinRK -
Show More
@@ -1,844 +1,846 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6
6
7 Authors:
7 Authors:
8
8
9 * Min RK
9 * Min RK
10 """
10 """
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2010-2011 The IPython Development Team
12 # Copyright (C) 2010-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #----------------------------------------------------------------------
18 #----------------------------------------------------------------------
19 # Imports
19 # Imports
20 #----------------------------------------------------------------------
20 #----------------------------------------------------------------------
21
21
22 import heapq
22 import heapq
23 import logging
23 import logging
24 import sys
24 import sys
25 import time
25 import time
26
26
27 from datetime import datetime, timedelta
27 from datetime import datetime, timedelta
28 from random import randint, random
28 from random import randint, random
29 from types import FunctionType
29 from types import FunctionType
30
30
31 try:
31 try:
32 import numpy
32 import numpy
33 except ImportError:
33 except ImportError:
34 numpy = None
34 numpy = None
35
35
36 import zmq
36 import zmq
37 from zmq.eventloop import ioloop, zmqstream
37 from zmq.eventloop import ioloop, zmqstream
38
38
39 # local imports
39 # local imports
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.config.application import Application
41 from IPython.config.application import Application
42 from IPython.config.loader import Config
42 from IPython.config.loader import Config
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44 from IPython.utils.py3compat import cast_bytes
44 from IPython.utils.py3compat import cast_bytes
45
45
46 from IPython.parallel import error, util
46 from IPython.parallel import error, util
47 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.factory import SessionFactory
48 from IPython.parallel.util import connect_logger, local_logger
48 from IPython.parallel.util import connect_logger, local_logger
49
49
50 from .dependency import Dependency
50 from .dependency import Dependency
51
51
52 @decorator
52 @decorator
53 def logged(f,self,*args,**kwargs):
53 def logged(f,self,*args,**kwargs):
54 # print ("#--------------------")
54 # print ("#--------------------")
55 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
55 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
56 # print ("#--")
56 # print ("#--")
57 return f(self,*args, **kwargs)
57 return f(self,*args, **kwargs)
58
58
59 #----------------------------------------------------------------------
59 #----------------------------------------------------------------------
60 # Chooser functions
60 # Chooser functions
61 #----------------------------------------------------------------------
61 #----------------------------------------------------------------------
62
62
63 def plainrandom(loads):
63 def plainrandom(loads):
64 """Plain random pick."""
64 """Plain random pick."""
65 n = len(loads)
65 n = len(loads)
66 return randint(0,n-1)
66 return randint(0,n-1)
67
67
68 def lru(loads):
68 def lru(loads):
69 """Always pick the front of the line.
69 """Always pick the front of the line.
70
70
71 The content of `loads` is ignored.
71 The content of `loads` is ignored.
72
72
73 Assumes LRU ordering of loads, with oldest first.
73 Assumes LRU ordering of loads, with oldest first.
74 """
74 """
75 return 0
75 return 0
76
76
77 def twobin(loads):
77 def twobin(loads):
78 """Pick two at random, use the LRU of the two.
78 """Pick two at random, use the LRU of the two.
79
79
80 The content of loads is ignored.
80 The content of loads is ignored.
81
81
82 Assumes LRU ordering of loads, with oldest first.
82 Assumes LRU ordering of loads, with oldest first.
83 """
83 """
84 n = len(loads)
84 n = len(loads)
85 a = randint(0,n-1)
85 a = randint(0,n-1)
86 b = randint(0,n-1)
86 b = randint(0,n-1)
87 return min(a,b)
87 return min(a,b)
88
88
89 def weighted(loads):
89 def weighted(loads):
90 """Pick two at random using inverse load as weight.
90 """Pick two at random using inverse load as weight.
91
91
92 Return the less loaded of the two.
92 Return the less loaded of the two.
93 """
93 """
94 # weight 0 a million times more than 1:
94 # weight 0 a million times more than 1:
95 weights = 1./(1e-6+numpy.array(loads))
95 weights = 1./(1e-6+numpy.array(loads))
96 sums = weights.cumsum()
96 sums = weights.cumsum()
97 t = sums[-1]
97 t = sums[-1]
98 x = random()*t
98 x = random()*t
99 y = random()*t
99 y = random()*t
100 idx = 0
100 idx = 0
101 idy = 0
101 idy = 0
102 while sums[idx] < x:
102 while sums[idx] < x:
103 idx += 1
103 idx += 1
104 while sums[idy] < y:
104 while sums[idy] < y:
105 idy += 1
105 idy += 1
106 if weights[idy] > weights[idx]:
106 if weights[idy] > weights[idx]:
107 return idy
107 return idy
108 else:
108 else:
109 return idx
109 return idx
110
110
111 def leastload(loads):
111 def leastload(loads):
112 """Always choose the lowest load.
112 """Always choose the lowest load.
113
113
114 If the lowest load occurs more than once, the first
114 If the lowest load occurs more than once, the first
115 occurrence will be used. If loads has LRU ordering, this means
115 occurrence will be used. If loads has LRU ordering, this means
116 the LRU of those with the lowest load is chosen.
116 the LRU of those with the lowest load is chosen.
117 """
117 """
118 return loads.index(min(loads))
118 return loads.index(min(loads))
119
119
120 #---------------------------------------------------------------------
120 #---------------------------------------------------------------------
121 # Classes
121 # Classes
122 #---------------------------------------------------------------------
122 #---------------------------------------------------------------------
123
123
124
124
125 # store empty default dependency:
125 # store empty default dependency:
126 MET = Dependency([])
126 MET = Dependency([])
127
127
128
128
129 class Job(object):
129 class Job(object):
130 """Simple container for a job"""
130 """Simple container for a job"""
131 def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
131 def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
132 targets, after, follow, timeout):
132 targets, after, follow, timeout):
133 self.msg_id = msg_id
133 self.msg_id = msg_id
134 self.raw_msg = raw_msg
134 self.raw_msg = raw_msg
135 self.idents = idents
135 self.idents = idents
136 self.msg = msg
136 self.msg = msg
137 self.header = header
137 self.header = header
138 self.metadata = metadata
138 self.metadata = metadata
139 self.targets = targets
139 self.targets = targets
140 self.after = after
140 self.after = after
141 self.follow = follow
141 self.follow = follow
142 self.timeout = timeout
142 self.timeout = timeout
143 self.removed = False # used for lazy-delete in heap-sorted queue
143 self.removed = False # used for lazy-delete in heap-sorted queue
144
144
145 self.timestamp = time.time()
145 self.timestamp = time.time()
146 self.blacklist = set()
146 self.blacklist = set()
147
147
148 def __lt__(self, other):
148 def __lt__(self, other):
149 return self.timestamp < other.timestamp
149 return self.timestamp < other.timestamp
150
150
151 def __cmp__(self, other):
151 def __cmp__(self, other):
152 return cmp(self.timestamp, other.timestamp)
152 return cmp(self.timestamp, other.timestamp)
153
153
154 @property
154 @property
155 def dependents(self):
155 def dependents(self):
156 return self.follow.union(self.after)
156 return self.follow.union(self.after)
157
157
158 class TaskScheduler(SessionFactory):
158 class TaskScheduler(SessionFactory):
159 """Python TaskScheduler object.
159 """Python TaskScheduler object.
160
160
161 This is the simplest object that supports msg_id based
161 This is the simplest object that supports msg_id based
162 DAG dependencies. *Only* task msg_ids are checked, not
162 DAG dependencies. *Only* task msg_ids are checked, not
163 msg_ids of jobs submitted via the MUX queue.
163 msg_ids of jobs submitted via the MUX queue.
164
164
165 """
165 """
166
166
167 hwm = Integer(1, config=True,
167 hwm = Integer(1, config=True,
168 help="""specify the High Water Mark (HWM) for the downstream
168 help="""specify the High Water Mark (HWM) for the downstream
169 socket in the Task scheduler. This is the maximum number
169 socket in the Task scheduler. This is the maximum number
170 of allowed outstanding tasks on each engine.
170 of allowed outstanding tasks on each engine.
171
171
172 The default (1) means that only one task can be outstanding on each
172 The default (1) means that only one task can be outstanding on each
173 engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
173 engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
174 engines continue to be assigned tasks while they are working,
174 engines continue to be assigned tasks while they are working,
175 effectively hiding network latency behind computation, but can result
175 effectively hiding network latency behind computation, but can result
176 in an imbalance of work when submitting many heterogeneous tasks all at
176 in an imbalance of work when submitting many heterogeneous tasks all at
177 once. Any positive value greater than one is a compromise between the
177 once. Any positive value greater than one is a compromise between the
178 two.
178 two.
179
179
180 """
180 """
181 )
181 )
182 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
182 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
183 'leastload', config=True, allow_none=False,
183 'leastload', config=True, allow_none=False,
184 help="""select the task scheduler scheme [default: Python LRU]
184 help="""select the task scheduler scheme [default: Python LRU]
185 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
185 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
186 )
186 )
187 def _scheme_name_changed(self, old, new):
187 def _scheme_name_changed(self, old, new):
188 self.log.debug("Using scheme %r"%new)
188 self.log.debug("Using scheme %r"%new)
189 self.scheme = globals()[new]
189 self.scheme = globals()[new]
190
190
191 # input arguments:
191 # input arguments:
192 scheme = Instance(FunctionType) # function for determining the destination
192 scheme = Instance(FunctionType) # function for determining the destination
193 def _scheme_default(self):
193 def _scheme_default(self):
194 return leastload
194 return leastload
195 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
195 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
196 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
196 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
197 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
197 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
198 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
198 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
199 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
199 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
200
200
201 # internals:
201 # internals:
202 queue = List() # heap-sorted list of Jobs
202 queue = List() # heap-sorted list of Jobs
203 queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
203 queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
204 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
204 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
205 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
205 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
206 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
206 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
207 pending = Dict() # dict by engine_uuid of submitted tasks
207 pending = Dict() # dict by engine_uuid of submitted tasks
208 completed = Dict() # dict by engine_uuid of completed tasks
208 completed = Dict() # dict by engine_uuid of completed tasks
209 failed = Dict() # dict by engine_uuid of failed tasks
209 failed = Dict() # dict by engine_uuid of failed tasks
210 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
210 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
211 clients = Dict() # dict by msg_id for who submitted the task
211 clients = Dict() # dict by msg_id for who submitted the task
212 targets = List() # list of target IDENTs
212 targets = List() # list of target IDENTs
213 loads = List() # list of engine loads
213 loads = List() # list of engine loads
214 # full = Set() # set of IDENTs that have HWM outstanding tasks
214 # full = Set() # set of IDENTs that have HWM outstanding tasks
215 all_completed = Set() # set of all completed tasks
215 all_completed = Set() # set of all completed tasks
216 all_failed = Set() # set of all failed tasks
216 all_failed = Set() # set of all failed tasks
217 all_done = Set() # set of all finished tasks=union(completed,failed)
217 all_done = Set() # set of all finished tasks=union(completed,failed)
218 all_ids = Set() # set of all submitted task IDs
218 all_ids = Set() # set of all submitted task IDs
219
219
220 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
221
222 ident = CBytes() # ZMQ identity. This should just be self.session.session
220 ident = CBytes() # ZMQ identity. This should just be self.session.session
223 # but ensure Bytes
221 # but ensure Bytes
224 def _ident_default(self):
222 def _ident_default(self):
225 return self.session.bsession
223 return self.session.bsession
226
224
227 def start(self):
225 def start(self):
228 self.query_stream.on_recv(self.dispatch_query_reply)
226 self.query_stream.on_recv(self.dispatch_query_reply)
229 self.session.send(self.query_stream, "connection_request", {})
227 self.session.send(self.query_stream, "connection_request", {})
230
228
231 self.engine_stream.on_recv(self.dispatch_result, copy=False)
229 self.engine_stream.on_recv(self.dispatch_result, copy=False)
232 self.client_stream.on_recv(self.dispatch_submission, copy=False)
230 self.client_stream.on_recv(self.dispatch_submission, copy=False)
233
231
234 self._notification_handlers = dict(
232 self._notification_handlers = dict(
235 registration_notification = self._register_engine,
233 registration_notification = self._register_engine,
236 unregistration_notification = self._unregister_engine
234 unregistration_notification = self._unregister_engine
237 )
235 )
238 self.notifier_stream.on_recv(self.dispatch_notification)
236 self.notifier_stream.on_recv(self.dispatch_notification)
239 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
237 self.log.info("Scheduler started [%s]" % self.scheme_name)
240 self.auditor.start()
241 self.log.info("Scheduler started [%s]"%self.scheme_name)
242
238
243 def resume_receiving(self):
239 def resume_receiving(self):
244 """Resume accepting jobs."""
240 """Resume accepting jobs."""
245 self.client_stream.on_recv(self.dispatch_submission, copy=False)
241 self.client_stream.on_recv(self.dispatch_submission, copy=False)
246
242
247 def stop_receiving(self):
243 def stop_receiving(self):
248 """Stop accepting jobs while there are no engines.
244 """Stop accepting jobs while there are no engines.
249 Leave them in the ZMQ queue."""
245 Leave them in the ZMQ queue."""
250 self.client_stream.on_recv(None)
246 self.client_stream.on_recv(None)
251
247
252 #-----------------------------------------------------------------------
248 #-----------------------------------------------------------------------
253 # [Un]Registration Handling
249 # [Un]Registration Handling
254 #-----------------------------------------------------------------------
250 #-----------------------------------------------------------------------
255
251
256
252
257 def dispatch_query_reply(self, msg):
253 def dispatch_query_reply(self, msg):
258 """handle reply to our initial connection request"""
254 """handle reply to our initial connection request"""
259 try:
255 try:
260 idents,msg = self.session.feed_identities(msg)
256 idents,msg = self.session.feed_identities(msg)
261 except ValueError:
257 except ValueError:
262 self.log.warn("task::Invalid Message: %r",msg)
258 self.log.warn("task::Invalid Message: %r",msg)
263 return
259 return
264 try:
260 try:
265 msg = self.session.unserialize(msg)
261 msg = self.session.unserialize(msg)
266 except ValueError:
262 except ValueError:
267 self.log.warn("task::Unauthorized message from: %r"%idents)
263 self.log.warn("task::Unauthorized message from: %r"%idents)
268 return
264 return
269
265
270 content = msg['content']
266 content = msg['content']
271 for uuid in content.get('engines', {}).values():
267 for uuid in content.get('engines', {}).values():
272 self._register_engine(cast_bytes(uuid))
268 self._register_engine(cast_bytes(uuid))
273
269
274
270
275 @util.log_errors
271 @util.log_errors
276 def dispatch_notification(self, msg):
272 def dispatch_notification(self, msg):
277 """dispatch register/unregister events."""
273 """dispatch register/unregister events."""
278 try:
274 try:
279 idents,msg = self.session.feed_identities(msg)
275 idents,msg = self.session.feed_identities(msg)
280 except ValueError:
276 except ValueError:
281 self.log.warn("task::Invalid Message: %r",msg)
277 self.log.warn("task::Invalid Message: %r",msg)
282 return
278 return
283 try:
279 try:
284 msg = self.session.unserialize(msg)
280 msg = self.session.unserialize(msg)
285 except ValueError:
281 except ValueError:
286 self.log.warn("task::Unauthorized message from: %r"%idents)
282 self.log.warn("task::Unauthorized message from: %r"%idents)
287 return
283 return
288
284
289 msg_type = msg['header']['msg_type']
285 msg_type = msg['header']['msg_type']
290
286
291 handler = self._notification_handlers.get(msg_type, None)
287 handler = self._notification_handlers.get(msg_type, None)
292 if handler is None:
288 if handler is None:
293 self.log.error("Unhandled message type: %r"%msg_type)
289 self.log.error("Unhandled message type: %r"%msg_type)
294 else:
290 else:
295 try:
291 try:
296 handler(cast_bytes(msg['content']['uuid']))
292 handler(cast_bytes(msg['content']['uuid']))
297 except Exception:
293 except Exception:
298 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
294 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
299
295
300 def _register_engine(self, uid):
296 def _register_engine(self, uid):
301 """New engine with ident `uid` became available."""
297 """New engine with ident `uid` became available."""
302 # head of the line:
298 # head of the line:
303 self.targets.insert(0,uid)
299 self.targets.insert(0,uid)
304 self.loads.insert(0,0)
300 self.loads.insert(0,0)
305
301
306 # initialize sets
302 # initialize sets
307 self.completed[uid] = set()
303 self.completed[uid] = set()
308 self.failed[uid] = set()
304 self.failed[uid] = set()
309 self.pending[uid] = {}
305 self.pending[uid] = {}
310
306
311 # rescan the graph:
307 # rescan the graph:
312 self.update_graph(None)
308 self.update_graph(None)
313
309
314 def _unregister_engine(self, uid):
310 def _unregister_engine(self, uid):
315 """Existing engine with ident `uid` became unavailable."""
311 """Existing engine with ident `uid` became unavailable."""
316 if len(self.targets) == 1:
312 if len(self.targets) == 1:
317 # this was our only engine
313 # this was our only engine
318 pass
314 pass
319
315
320 # handle any potentially finished tasks:
316 # handle any potentially finished tasks:
321 self.engine_stream.flush()
317 self.engine_stream.flush()
322
318
323 # don't pop destinations, because they might be used later
319 # don't pop destinations, because they might be used later
324 # map(self.destinations.pop, self.completed.pop(uid))
320 # map(self.destinations.pop, self.completed.pop(uid))
325 # map(self.destinations.pop, self.failed.pop(uid))
321 # map(self.destinations.pop, self.failed.pop(uid))
326
322
327 # prevent this engine from receiving work
323 # prevent this engine from receiving work
328 idx = self.targets.index(uid)
324 idx = self.targets.index(uid)
329 self.targets.pop(idx)
325 self.targets.pop(idx)
330 self.loads.pop(idx)
326 self.loads.pop(idx)
331
327
332 # wait 5 seconds before cleaning up pending jobs, since the results might
328 # wait 5 seconds before cleaning up pending jobs, since the results might
333 # still be incoming
329 # still be incoming
334 if self.pending[uid]:
330 if self.pending[uid]:
335 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
331 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
336 dc.start()
332 dc.start()
337 else:
333 else:
338 self.completed.pop(uid)
334 self.completed.pop(uid)
339 self.failed.pop(uid)
335 self.failed.pop(uid)
340
336
341
337
342 def handle_stranded_tasks(self, engine):
338 def handle_stranded_tasks(self, engine):
343 """Deal with jobs resident in an engine that died."""
339 """Deal with jobs resident in an engine that died."""
344 lost = self.pending[engine]
340 lost = self.pending[engine]
345 for msg_id in lost.keys():
341 for msg_id in lost.keys():
346 if msg_id not in self.pending[engine]:
342 if msg_id not in self.pending[engine]:
347 # prevent double-handling of messages
343 # prevent double-handling of messages
348 continue
344 continue
349
345
350 raw_msg = lost[msg_id].raw_msg
346 raw_msg = lost[msg_id].raw_msg
351 idents,msg = self.session.feed_identities(raw_msg, copy=False)
347 idents,msg = self.session.feed_identities(raw_msg, copy=False)
352 parent = self.session.unpack(msg[1].bytes)
348 parent = self.session.unpack(msg[1].bytes)
353 idents = [engine, idents[0]]
349 idents = [engine, idents[0]]
354
350
355 # build fake error reply
351 # build fake error reply
356 try:
352 try:
357 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
353 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
358 except:
354 except:
359 content = error.wrap_exception()
355 content = error.wrap_exception()
360 # build fake metadata
356 # build fake metadata
361 md = dict(
357 md = dict(
362 status=u'error',
358 status=u'error',
363 engine=engine,
359 engine=engine,
364 date=datetime.now(),
360 date=datetime.now(),
365 )
361 )
366 msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
362 msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
367 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
363 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
368 # and dispatch it
364 # and dispatch it
369 self.dispatch_result(raw_reply)
365 self.dispatch_result(raw_reply)
370
366
371 # finally scrub completed/failed lists
367 # finally scrub completed/failed lists
372 self.completed.pop(engine)
368 self.completed.pop(engine)
373 self.failed.pop(engine)
369 self.failed.pop(engine)
374
370
375
371
376 #-----------------------------------------------------------------------
372 #-----------------------------------------------------------------------
377 # Job Submission
373 # Job Submission
378 #-----------------------------------------------------------------------
374 #-----------------------------------------------------------------------
379
375
380
376
381 @util.log_errors
377 @util.log_errors
382 def dispatch_submission(self, raw_msg):
378 def dispatch_submission(self, raw_msg):
383 """Dispatch job submission to appropriate handlers."""
379 """Dispatch job submission to appropriate handlers."""
384 # ensure targets up to date:
380 # ensure targets up to date:
385 self.notifier_stream.flush()
381 self.notifier_stream.flush()
386 try:
382 try:
387 idents, msg = self.session.feed_identities(raw_msg, copy=False)
383 idents, msg = self.session.feed_identities(raw_msg, copy=False)
388 msg = self.session.unserialize(msg, content=False, copy=False)
384 msg = self.session.unserialize(msg, content=False, copy=False)
389 except Exception:
385 except Exception:
390 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
386 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
391 return
387 return
392
388
393
389
394 # send to monitor
390 # send to monitor
395 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
391 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
396
392
397 header = msg['header']
393 header = msg['header']
398 md = msg['metadata']
394 md = msg['metadata']
399 msg_id = header['msg_id']
395 msg_id = header['msg_id']
400 self.all_ids.add(msg_id)
396 self.all_ids.add(msg_id)
401
397
402 # get targets as a set of bytes objects
398 # get targets as a set of bytes objects
403 # from a list of unicode objects
399 # from a list of unicode objects
404 targets = md.get('targets', [])
400 targets = md.get('targets', [])
405 targets = map(cast_bytes, targets)
401 targets = map(cast_bytes, targets)
406 targets = set(targets)
402 targets = set(targets)
407
403
408 retries = md.get('retries', 0)
404 retries = md.get('retries', 0)
409 self.retries[msg_id] = retries
405 self.retries[msg_id] = retries
410
406
411 # time dependencies
407 # time dependencies
412 after = md.get('after', None)
408 after = md.get('after', None)
413 if after:
409 if after:
414 after = Dependency(after)
410 after = Dependency(after)
415 if after.all:
411 if after.all:
416 if after.success:
412 if after.success:
417 after = Dependency(after.difference(self.all_completed),
413 after = Dependency(after.difference(self.all_completed),
418 success=after.success,
414 success=after.success,
419 failure=after.failure,
415 failure=after.failure,
420 all=after.all,
416 all=after.all,
421 )
417 )
422 if after.failure:
418 if after.failure:
423 after = Dependency(after.difference(self.all_failed),
419 after = Dependency(after.difference(self.all_failed),
424 success=after.success,
420 success=after.success,
425 failure=after.failure,
421 failure=after.failure,
426 all=after.all,
422 all=after.all,
427 )
423 )
428 if after.check(self.all_completed, self.all_failed):
424 if after.check(self.all_completed, self.all_failed):
429 # recast as empty set, if `after` already met,
425 # recast as empty set, if `after` already met,
430 # to prevent unnecessary set comparisons
426 # to prevent unnecessary set comparisons
431 after = MET
427 after = MET
432 else:
428 else:
433 after = MET
429 after = MET
434
430
435 # location dependencies
431 # location dependencies
436 follow = Dependency(md.get('follow', []))
432 follow = Dependency(md.get('follow', []))
437
433
438 # turn timeouts into datetime objects:
434 # turn timeouts into datetime objects:
439 timeout = md.get('timeout', None)
435 timeout = md.get('timeout', None)
440 if timeout:
436 if timeout:
441 # cast to float, because jsonlib returns floats as decimal.Decimal,
437 timeout = time.time() + float(timeout)
442 # which timedelta does not accept
443 timeout = datetime.now() + timedelta(0,float(timeout),0)
444
438
445 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
439 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
446 header=header, targets=targets, after=after, follow=follow,
440 header=header, targets=targets, after=after, follow=follow,
447 timeout=timeout, metadata=md,
441 timeout=timeout, metadata=md,
448 )
442 )
449
443 if timeout:
444 # schedule timeout callback
445 self.loop.add_timeout(timeout, lambda : self.job_timeout(job))
446
450 # validate and reduce dependencies:
447 # validate and reduce dependencies:
451 for dep in after,follow:
448 for dep in after,follow:
452 if not dep: # empty dependency
449 if not dep: # empty dependency
453 continue
450 continue
454 # check valid:
451 # check valid:
455 if msg_id in dep or dep.difference(self.all_ids):
452 if msg_id in dep or dep.difference(self.all_ids):
456 self.queue_map[msg_id] = job
453 self.queue_map[msg_id] = job
457 return self.fail_unreachable(msg_id, error.InvalidDependency)
454 return self.fail_unreachable(msg_id, error.InvalidDependency)
458 # check if unreachable:
455 # check if unreachable:
459 if dep.unreachable(self.all_completed, self.all_failed):
456 if dep.unreachable(self.all_completed, self.all_failed):
460 self.queue_map[msg_id] = job
457 self.queue_map[msg_id] = job
461 return self.fail_unreachable(msg_id)
458 return self.fail_unreachable(msg_id)
462
459
463 if after.check(self.all_completed, self.all_failed):
460 if after.check(self.all_completed, self.all_failed):
464 # time deps already met, try to run
461 # time deps already met, try to run
465 if not self.maybe_run(job):
462 if not self.maybe_run(job):
466 # can't run yet
463 # can't run yet
467 if msg_id not in self.all_failed:
464 if msg_id not in self.all_failed:
468 # could have failed as unreachable
465 # could have failed as unreachable
469 self.save_unmet(job)
466 self.save_unmet(job)
470 else:
467 else:
471 self.save_unmet(job)
468 self.save_unmet(job)
472
469
473 def audit_timeouts(self):
470 def job_timeout(self, job):
474 """Audit all waiting tasks for expired timeouts."""
471 """callback for a job's timeout.
475 now = datetime.now()
472
476 for msg_id in self.queue_map.keys():
473 The job may or may not have been run at this point.
477 # must recheck, in case one failure cascaded to another:
474 """
478 if msg_id in self.queue_map:
475 if job.timeout >= (time.time() + 1):
479 job = self.queue_map[msg_id]
476 self.log.warn("task %s timeout fired prematurely: %s > %s",
480 if job.timeout and job.timeout < now:
477 job.msg_id, job.timeout, now
481 self.fail_unreachable(msg_id, error.TaskTimeout)
478 )
479 if job.msg_id in self.queue_map:
480 # still waiting, but ran out of time
481 self.log.info("task %r timed out", job.msg_id)
482 self.fail_unreachable(job.msg_id, error.TaskTimeout)
482
483
def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
    """Fail a task that can no longer run.

    Sends an apply_reply carrying *why* (ImpossibleDependency by
    default) back to the client, notifies the Hub monitor, and records
    the task as done/failed before propagating through the graph.
    """
    if msg_id not in self.queue_map:
        self.log.error("task %r already failed!", msg_id)
        return
    job = self.queue_map.pop(msg_id)

    # lazy-delete: the heap entry stays put, but is marked dead
    job.removed = True
    for dependent_id in job.dependents:
        if dependent_id in self.graph:
            self.graph[dependent_id].remove(msg_id)

    # raise/catch so wrap_exception can capture a real traceback
    try:
        raise why()
    except:
        content = error.wrap_exception()
    self.log.debug("task %r failing as unreachable with: %s", msg_id, content['ename'])

    self.all_done.add(msg_id)
    self.all_failed.add(msg_id)

    reply = self.session.send(
        self.client_stream, 'apply_reply', content,
        parent=job.header, ident=job.idents,
    )
    self.session.send(self.mon_stream, reply, ident=[b'outtask'] + job.idents)

    self.update_graph(msg_id, success=False)
509
511
def available_engines(self):
    """Return the indices of engines able to accept work.

    Without a high-water mark, every engine qualifies; otherwise only
    engines whose current load is strictly below the HWM are returned.
    """
    if not self.hwm:
        return range(len(self.targets))
    return [idx for idx in range(len(self.targets))
            if self.loads[idx] < self.hwm]
519
521
def maybe_run(self, job):
    """Check a job's location dependencies, and run it if they are met.

    Returns True if the job was submitted to an engine, False if it
    cannot run yet (it may also have been failed as unreachable).
    """
    msg_id = job.msg_id
    self.log.debug("Attempting to assign task %s", msg_id)
    available = self.available_engines()
    if not available:
        # no engines, definitely can't run
        return False

    if job.follow or job.targets or job.blacklist or self.hwm:
        # we need a can_run filter
        def can_run(idx):
            # check hwm
            if self.hwm and self.loads[idx] == self.hwm:
                return False
            target = self.targets[idx]
            # check blacklist
            if target in job.blacklist:
                return False
            # check targets
            if job.targets and target not in job.targets:
                return False
            # check follow
            return job.follow.check(self.completed[target], self.failed[target])

        # materialize as a list: `filter` is a lazy iterator on Python 3,
        # which would defeat the emptiness test below and break
        # `indices[idx]` in submit_task
        indices = [idx for idx in available if can_run(idx)]

        if not indices:
            # couldn't run
            if job.follow.all:
                # check follow for impossibility
                dests = set()
                relevant = set()
                if job.follow.success:
                    relevant = self.all_completed
                if job.follow.failure:
                    relevant = relevant.union(self.all_failed)
                for m in job.follow.intersection(relevant):
                    dests.add(self.destinations[m])
                if len(dests) > 1:
                    # follow dependencies span multiple engines: unreachable
                    self.queue_map[msg_id] = job
                    self.fail_unreachable(msg_id)
                    return False
            if job.targets:
                # check blacklist+targets for impossibility
                job.targets.difference_update(job.blacklist)
                if not job.targets or not job.targets.intersection(self.targets):
                    self.queue_map[msg_id] = job
                    self.fail_unreachable(msg_id)
                    return False
            return False
    else:
        # no restrictions at all: let submit_task pick among all engines
        indices = None

    self.submit_task(job, indices)
    return True
576
578
def save_unmet(self, job):
    """Queue a job whose dependencies are not yet satisfied.

    The job is indexed by msg_id, pushed onto the priority heap, and
    registered in the graph under each unfinished dependency.
    """
    msg_id = job.msg_id
    self.log.debug("Adding task %s to the queue", msg_id)
    self.queue_map[msg_id] = job
    heapq.heappush(self.queue, job)
    # track the ids in follow or after, skipping those already finished
    pending_deps = job.after.union(job.follow) - self.all_done
    for dep_id in pending_deps:
        self.graph.setdefault(dep_id, set()).add(msg_id)
588
590
def submit_task(self, job, indices=None):
    """Submit a task to one of our target engines.

    If *indices* is given, the scheme chooses only among those engines;
    otherwise any engine is eligible.
    """
    if indices:
        candidate_loads = [self.loads[i] for i in indices]
    else:
        candidate_loads = self.loads
    idx = self.scheme(candidate_loads)
    if indices:
        # map the choice back into the full target list
        idx = indices[idx]
    target = self.targets[idx]
    # route the raw message to the chosen engine
    self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
    self.engine_stream.send_multipart(job.raw_msg, copy=False)
    # update load
    self.add_job(idx)
    self.pending[target][job.msg_id] = job
    # notify Hub
    content = dict(msg_id=job.msg_id, engine_id=target.decode('ascii'))
    self.session.send(self.mon_stream, 'task_destination', content=content,
                    ident=[b'tracktask', self.ident])
610
612
611
613
612 #-----------------------------------------------------------------------
614 #-----------------------------------------------------------------------
613 # Result Handling
615 # Result Handling
614 #-----------------------------------------------------------------------
616 #-----------------------------------------------------------------------
615
617
616
618
@util.log_errors
def dispatch_result(self, raw_msg):
    """Dispatch method for result replies coming back from engines.

    Finished results are relayed to the client and the Hub monitor;
    tasks whose dependencies were unmet are retried or re-queued.
    """
    try:
        idents, msg = self.session.feed_identities(raw_msg, copy=False)
        msg = self.session.unserialize(msg, content=False, copy=False)
        engine = idents[0]
        try:
            idx = self.targets.index(engine)
        except ValueError:
            pass  # skip load-update for dead engines
        else:
            self.finish_job(idx)
    except Exception:
        # fixed typo in the log message ("Invaid" -> "Invalid")
        self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
        return

    md = msg['metadata']
    parent = msg['parent_header']
    if md.get('dependencies_met', True):
        success = (md['status'] == 'ok')
        msg_id = parent['msg_id']
        retries = self.retries[msg_id]
        if not success and retries > 0:
            # failed, but retries remain: resubmit
            self.retries[msg_id] = retries - 1
            self.handle_unmet_dependency(idents, parent)
        else:
            del self.retries[msg_id]
            # relay to client and update graph
            self.handle_result(idents, parent, raw_msg, success)
            # send to Hub monitor
            self.mon_stream.send_multipart([b'outtask'] + raw_msg, copy=False)
    else:
        self.handle_unmet_dependency(idents, parent)
652
654
def handle_result(self, idents, parent, raw_msg, success=True):
    """Handle a real task result, either success or failure."""
    # first, relay result to client
    engine, client = idents[0], idents[1]
    # swap_ids for ROUTER-ROUTER mirror
    raw_msg[:2] = [client, engine]
    self.client_stream.send_multipart(raw_msg, copy=False)
    # now, update our data structures
    msg_id = parent['msg_id']
    self.pending[engine].pop(msg_id)
    bucket, all_bucket = (
        (self.completed, self.all_completed) if success
        else (self.failed, self.all_failed)
    )
    bucket[engine].add(msg_id)
    all_bucket.add(msg_id)
    self.all_done.add(msg_id)
    self.destinations[msg_id] = engine

    self.update_graph(msg_id, success)
675
677
def handle_unmet_dependency(self, idents, parent):
    """Handle a task an engine bounced because its dependencies were unmet."""
    engine = idents[0]
    msg_id = parent['msg_id']

    task = self.pending[engine].pop(msg_id)
    task.blacklist.add(engine)

    if task.blacklist == task.targets:
        # every allowed target has refused it: it can never run
        self.queue_map[msg_id] = task
        self.fail_unreachable(msg_id)
    elif not self.maybe_run(task):
        # resubmit failed
        if msg_id not in self.all_failed:
            # put it back in our dependency tree
            self.save_unmet(task)

    if self.hwm:
        try:
            idx = self.targets.index(engine)
        except ValueError:
            pass  # skip load-update for dead engines
        else:
            if self.loads[idx] == self.hwm - 1:
                # engine just dropped below the high-water mark:
                # recheck the whole queue
                self.update_graph(None)
701
703
def update_graph(self, dep_id=None, success=True):
    """dep_id just finished. Update our dependency
    graph and submit any jobs that just became runnable.

    Called with dep_id=None to update entire graph for hwm, but without finishing a task.
    """
    # update any jobs that depended on the dependency
    msg_ids = self.graph.pop(dep_id, [])

    # recheck *all* jobs if
    # a) we have HWM and an engine just become no longer full
    # or b) dep_id was given as None

    if dep_id is None or self.hwm and any([load == self.hwm - 1 for load in self.loads]):
        jobs = self.queue
        using_queue = True
    else:
        using_queue = False
        # build a local heap of just the affected jobs.
        # NB: heapq.heapify works in place and returns None, so the list
        # must be built first, then heapified (the previous code assigned
        # heapify's None return value, silently skipping every job here).
        jobs = [self.queue_map[msg_id] for msg_id in msg_ids]
        heapq.heapify(jobs)

    to_restore = []
    while jobs:
        job = heapq.heappop(jobs)
        if job.removed:
            # lazily-deleted entry; skip it
            continue
        msg_id = job.msg_id

        put_it_back = True

        if job.after.unreachable(self.all_completed, self.all_failed)\
                or job.follow.unreachable(self.all_completed, self.all_failed):
            self.fail_unreachable(msg_id)
            put_it_back = False

        elif job.after.check(self.all_completed, self.all_failed):  # time deps met, maybe run
            if self.maybe_run(job):
                put_it_back = False
                self.queue_map.pop(msg_id)
                for mid in job.dependents:
                    if mid in self.graph:
                        self.graph[mid].remove(msg_id)

        # abort the loop if we just filled up all of our engines.
        # avoids an O(N) operation in situation of full queue,
        # where graph update is triggered as soon as an engine becomes
        # non-full, and all tasks after the first are checked,
        # even though they can't run.
        if not self.available_engines():
            break

        if using_queue and put_it_back:
            # popped a job from the queue but it neither ran nor failed,
            # so we need to put it back when we are done
            to_restore.append(job)

    # put back any tasks we popped but didn't run
    for job in to_restore:
        heapq.heappush(self.queue, job)
767
769
768
770
769 #----------------------------------------------------------------------
771 #----------------------------------------------------------------------
770 # methods to be overridden by subclasses
772 # methods to be overridden by subclasses
771 #----------------------------------------------------------------------
773 #----------------------------------------------------------------------
772
774
def add_job(self, idx):
    """Record that self.targets[idx] was just handed a job.

    Default policy is LRU: bump the engine's load and rotate it to the
    back of the target/load lists. The default loads are the number of
    outstanding jobs. Subclasses may override.
    """
    self.loads[idx] = self.loads[idx] + 1
    for seq in (self.targets, self.loads):
        entry = seq.pop(idx)
        seq.append(entry)
780
782
781
783
def finish_job(self, idx):
    """Record that self.targets[idx] just completed a job.

    Default bookkeeping simply decrements the engine's load;
    subclasses may override.
    """
    self.loads[idx] = self.loads[idx] - 1
786
788
787
789
788
790
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None,
                        logname='root', log_url=None, loglevel=logging.DEBUG,
                        identity=b'task', in_thread=False):
    """Create and start a TaskScheduler bound to the given endpoints.

    Parameters
    ----------
    in_addr, out_addr : str
        zmq URLs for the client-facing and engine-facing ROUTER sockets.
    mon_addr, not_addr, reg_addr : str
        zmq URLs for the Hub monitor (PUB), notification (SUB), and
        registration (DEALER) connections.
    config : dict, optional
        plain dict to be re-wrapped as a Config.
    logname, log_url, loglevel :
        logging configuration; ignored when in_thread, where the parent
        Application's logger is reused.
    identity : bytes
        base socket identity; '_in'/'_out' suffixes are appended.
    in_thread : bool
        when True, share the parent's zmq Context/IOLoop and do not
        start the loop; when False, create private ones and block in
        loop.start().
    """
    ZMQStream = zmqstream.ZMQStream

    # NOTE: do not reassign `loglevel` here -- a leftover
    # `loglevel = logging.DEBUG` debugging override was clobbering the
    # caller's argument.
    if config:
        # unwrap dict back into Config
        config = Config(config)

    if in_thread:
        # use instance() to get the same Context/Loop as our parent
        ctx = zmq.Context.instance()
        loop = ioloop.IOLoop.instance()
    else:
        # in a process, don't use instance()
        # for safety with multiprocessing
        ctx = zmq.Context()
        loop = ioloop.IOLoop()
    ins = ZMQStream(ctx.socket(zmq.ROUTER), loop)
    ins.setsockopt(zmq.IDENTITY, identity + b'_in')
    ins.bind(in_addr)

    outs = ZMQStream(ctx.socket(zmq.ROUTER), loop)
    outs.setsockopt(zmq.IDENTITY, identity + b'_out')
    outs.bind(out_addr)
    mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
    mons.connect(mon_addr)
    nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB), loop)
    nots.setsockopt(zmq.SUBSCRIBE, b'')
    nots.connect(not_addr)

    querys = ZMQStream(ctx.socket(zmq.DEALER), loop)
    querys.connect(reg_addr)

    # setup logging.
    if in_thread:
        log = Application.instance().log
    else:
        if log_url:
            log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
        else:
            log = local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                            mon_stream=mons, notifier_stream=nots,
                            query_stream=querys,
                            loop=loop, log=log,
                            config=config)
    scheduler.start()
    if not in_thread:
        try:
            loop.start()
        except KeyboardInterrupt:
            scheduler.log.critical("Interrupted, exiting...")
844
846
General Comments 0
You need to be logged in to leave comments. Login now