##// END OF EJS Templates
Merge pull request #4214 from minrk/ident-bytes...
Min RK -
r12634:195e4cf6 merge
parent child Browse files
Show More
@@ -1,852 +1,852
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6
6
7 Authors:
7 Authors:
8
8
9 * Min RK
9 * Min RK
10 """
10 """
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2010-2011 The IPython Development Team
12 # Copyright (C) 2010-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #----------------------------------------------------------------------
18 #----------------------------------------------------------------------
19 # Imports
19 # Imports
20 #----------------------------------------------------------------------
20 #----------------------------------------------------------------------
21
21
22 import logging
22 import logging
23 import sys
23 import sys
24 import time
24 import time
25
25
26 from collections import deque
26 from collections import deque
27 from datetime import datetime
27 from datetime import datetime
28 from random import randint, random
28 from random import randint, random
29 from types import FunctionType
29 from types import FunctionType
30
30
31 try:
31 try:
32 import numpy
32 import numpy
33 except ImportError:
33 except ImportError:
34 numpy = None
34 numpy = None
35
35
36 import zmq
36 import zmq
37 from zmq.eventloop import ioloop, zmqstream
37 from zmq.eventloop import ioloop, zmqstream
38
38
39 # local imports
39 # local imports
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.config.application import Application
41 from IPython.config.application import Application
42 from IPython.config.loader import Config
42 from IPython.config.loader import Config
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44 from IPython.utils.py3compat import cast_bytes
44 from IPython.utils.py3compat import cast_bytes
45
45
46 from IPython.parallel import error, util
46 from IPython.parallel import error, util
47 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.factory import SessionFactory
48 from IPython.parallel.util import connect_logger, local_logger
48 from IPython.parallel.util import connect_logger, local_logger
49
49
50 from .dependency import Dependency
50 from .dependency import Dependency
51
51
52 @decorator
52 @decorator
53 def logged(f,self,*args,**kwargs):
53 def logged(f,self,*args,**kwargs):
54 # print ("#--------------------")
54 # print ("#--------------------")
55 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
55 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
56 # print ("#--")
56 # print ("#--")
57 return f(self,*args, **kwargs)
57 return f(self,*args, **kwargs)
58
58
59 #----------------------------------------------------------------------
59 #----------------------------------------------------------------------
60 # Chooser functions
60 # Chooser functions
61 #----------------------------------------------------------------------
61 #----------------------------------------------------------------------
62
62
63 def plainrandom(loads):
63 def plainrandom(loads):
64 """Plain random pick."""
64 """Plain random pick."""
65 n = len(loads)
65 n = len(loads)
66 return randint(0,n-1)
66 return randint(0,n-1)
67
67
68 def lru(loads):
68 def lru(loads):
69 """Always pick the front of the line.
69 """Always pick the front of the line.
70
70
71 The content of `loads` is ignored.
71 The content of `loads` is ignored.
72
72
73 Assumes LRU ordering of loads, with oldest first.
73 Assumes LRU ordering of loads, with oldest first.
74 """
74 """
75 return 0
75 return 0
76
76
77 def twobin(loads):
77 def twobin(loads):
78 """Pick two at random, use the LRU of the two.
78 """Pick two at random, use the LRU of the two.
79
79
80 The content of loads is ignored.
80 The content of loads is ignored.
81
81
82 Assumes LRU ordering of loads, with oldest first.
82 Assumes LRU ordering of loads, with oldest first.
83 """
83 """
84 n = len(loads)
84 n = len(loads)
85 a = randint(0,n-1)
85 a = randint(0,n-1)
86 b = randint(0,n-1)
86 b = randint(0,n-1)
87 return min(a,b)
87 return min(a,b)
88
88
89 def weighted(loads):
89 def weighted(loads):
90 """Pick two at random using inverse load as weight.
90 """Pick two at random using inverse load as weight.
91
91
92 Return the less loaded of the two.
92 Return the less loaded of the two.
93 """
93 """
94 # weight 0 a million times more than 1:
94 # weight 0 a million times more than 1:
95 weights = 1./(1e-6+numpy.array(loads))
95 weights = 1./(1e-6+numpy.array(loads))
96 sums = weights.cumsum()
96 sums = weights.cumsum()
97 t = sums[-1]
97 t = sums[-1]
98 x = random()*t
98 x = random()*t
99 y = random()*t
99 y = random()*t
100 idx = 0
100 idx = 0
101 idy = 0
101 idy = 0
102 while sums[idx] < x:
102 while sums[idx] < x:
103 idx += 1
103 idx += 1
104 while sums[idy] < y:
104 while sums[idy] < y:
105 idy += 1
105 idy += 1
106 if weights[idy] > weights[idx]:
106 if weights[idy] > weights[idx]:
107 return idy
107 return idy
108 else:
108 else:
109 return idx
109 return idx
110
110
111 def leastload(loads):
111 def leastload(loads):
112 """Always choose the lowest load.
112 """Always choose the lowest load.
113
113
114 If the lowest load occurs more than once, the first
114 If the lowest load occurs more than once, the first
115 occurance will be used. If loads has LRU ordering, this means
115 occurance will be used. If loads has LRU ordering, this means
116 the LRU of those with the lowest load is chosen.
116 the LRU of those with the lowest load is chosen.
117 """
117 """
118 return loads.index(min(loads))
118 return loads.index(min(loads))
119
119
120 #---------------------------------------------------------------------
120 #---------------------------------------------------------------------
121 # Classes
121 # Classes
122 #---------------------------------------------------------------------
122 #---------------------------------------------------------------------
123
123
124
124
125 # store empty default dependency:
125 # store empty default dependency:
126 MET = Dependency([])
126 MET = Dependency([])
127
127
128
128
129 class Job(object):
129 class Job(object):
130 """Simple container for a job"""
130 """Simple container for a job"""
131 def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
131 def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
132 targets, after, follow, timeout):
132 targets, after, follow, timeout):
133 self.msg_id = msg_id
133 self.msg_id = msg_id
134 self.raw_msg = raw_msg
134 self.raw_msg = raw_msg
135 self.idents = idents
135 self.idents = idents
136 self.msg = msg
136 self.msg = msg
137 self.header = header
137 self.header = header
138 self.metadata = metadata
138 self.metadata = metadata
139 self.targets = targets
139 self.targets = targets
140 self.after = after
140 self.after = after
141 self.follow = follow
141 self.follow = follow
142 self.timeout = timeout
142 self.timeout = timeout
143 self.removed = False # used for lazy-delete from sorted queue
143 self.removed = False # used for lazy-delete from sorted queue
144
144
145 self.timestamp = time.time()
145 self.timestamp = time.time()
146 self.blacklist = set()
146 self.blacklist = set()
147
147
148 def __lt__(self, other):
148 def __lt__(self, other):
149 return self.timestamp < other.timestamp
149 return self.timestamp < other.timestamp
150
150
151 def __cmp__(self, other):
151 def __cmp__(self, other):
152 return cmp(self.timestamp, other.timestamp)
152 return cmp(self.timestamp, other.timestamp)
153
153
154 @property
154 @property
155 def dependents(self):
155 def dependents(self):
156 return self.follow.union(self.after)
156 return self.follow.union(self.after)
157
157
158 class TaskScheduler(SessionFactory):
158 class TaskScheduler(SessionFactory):
159 """Python TaskScheduler object.
159 """Python TaskScheduler object.
160
160
161 This is the simplest object that supports msg_id based
161 This is the simplest object that supports msg_id based
162 DAG dependencies. *Only* task msg_ids are checked, not
162 DAG dependencies. *Only* task msg_ids are checked, not
163 msg_ids of jobs submitted via the MUX queue.
163 msg_ids of jobs submitted via the MUX queue.
164
164
165 """
165 """
166
166
167 hwm = Integer(1, config=True,
167 hwm = Integer(1, config=True,
168 help="""specify the High Water Mark (HWM) for the downstream
168 help="""specify the High Water Mark (HWM) for the downstream
169 socket in the Task scheduler. This is the maximum number
169 socket in the Task scheduler. This is the maximum number
170 of allowed outstanding tasks on each engine.
170 of allowed outstanding tasks on each engine.
171
171
172 The default (1) means that only one task can be outstanding on each
172 The default (1) means that only one task can be outstanding on each
173 engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
173 engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
174 engines continue to be assigned tasks while they are working,
174 engines continue to be assigned tasks while they are working,
175 effectively hiding network latency behind computation, but can result
175 effectively hiding network latency behind computation, but can result
176 in an imbalance of work when submitting many heterogenous tasks all at
176 in an imbalance of work when submitting many heterogenous tasks all at
177 once. Any positive value greater than one is a compromise between the
177 once. Any positive value greater than one is a compromise between the
178 two.
178 two.
179
179
180 """
180 """
181 )
181 )
182 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
182 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
183 'leastload', config=True, allow_none=False,
183 'leastload', config=True, allow_none=False,
184 help="""select the task scheduler scheme [default: Python LRU]
184 help="""select the task scheduler scheme [default: Python LRU]
185 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
185 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
186 )
186 )
187 def _scheme_name_changed(self, old, new):
187 def _scheme_name_changed(self, old, new):
188 self.log.debug("Using scheme %r"%new)
188 self.log.debug("Using scheme %r"%new)
189 self.scheme = globals()[new]
189 self.scheme = globals()[new]
190
190
191 # input arguments:
191 # input arguments:
192 scheme = Instance(FunctionType) # function for determining the destination
192 scheme = Instance(FunctionType) # function for determining the destination
193 def _scheme_default(self):
193 def _scheme_default(self):
194 return leastload
194 return leastload
195 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
195 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
196 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
196 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
197 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
197 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
198 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
198 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
199 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
199 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
200
200
201 # internals:
201 # internals:
202 queue = Instance(deque) # sorted list of Jobs
202 queue = Instance(deque) # sorted list of Jobs
203 def _queue_default(self):
203 def _queue_default(self):
204 return deque()
204 return deque()
205 queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
205 queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
206 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
206 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
207 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
207 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
208 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
208 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
209 pending = Dict() # dict by engine_uuid of submitted tasks
209 pending = Dict() # dict by engine_uuid of submitted tasks
210 completed = Dict() # dict by engine_uuid of completed tasks
210 completed = Dict() # dict by engine_uuid of completed tasks
211 failed = Dict() # dict by engine_uuid of failed tasks
211 failed = Dict() # dict by engine_uuid of failed tasks
212 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
212 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
213 clients = Dict() # dict by msg_id for who submitted the task
213 clients = Dict() # dict by msg_id for who submitted the task
214 targets = List() # list of target IDENTs
214 targets = List() # list of target IDENTs
215 loads = List() # list of engine loads
215 loads = List() # list of engine loads
216 # full = Set() # set of IDENTs that have HWM outstanding tasks
216 # full = Set() # set of IDENTs that have HWM outstanding tasks
217 all_completed = Set() # set of all completed tasks
217 all_completed = Set() # set of all completed tasks
218 all_failed = Set() # set of all failed tasks
218 all_failed = Set() # set of all failed tasks
219 all_done = Set() # set of all finished tasks=union(completed,failed)
219 all_done = Set() # set of all finished tasks=union(completed,failed)
220 all_ids = Set() # set of all submitted task IDs
220 all_ids = Set() # set of all submitted task IDs
221
221
222 ident = CBytes() # ZMQ identity. This should just be self.session.session
222 ident = CBytes() # ZMQ identity. This should just be self.session.session
223 # but ensure Bytes
223 # but ensure Bytes
224 def _ident_default(self):
224 def _ident_default(self):
225 return self.session.bsession
225 return self.session.bsession
226
226
227 def start(self):
227 def start(self):
228 self.query_stream.on_recv(self.dispatch_query_reply)
228 self.query_stream.on_recv(self.dispatch_query_reply)
229 self.session.send(self.query_stream, "connection_request", {})
229 self.session.send(self.query_stream, "connection_request", {})
230
230
231 self.engine_stream.on_recv(self.dispatch_result, copy=False)
231 self.engine_stream.on_recv(self.dispatch_result, copy=False)
232 self.client_stream.on_recv(self.dispatch_submission, copy=False)
232 self.client_stream.on_recv(self.dispatch_submission, copy=False)
233
233
234 self._notification_handlers = dict(
234 self._notification_handlers = dict(
235 registration_notification = self._register_engine,
235 registration_notification = self._register_engine,
236 unregistration_notification = self._unregister_engine
236 unregistration_notification = self._unregister_engine
237 )
237 )
238 self.notifier_stream.on_recv(self.dispatch_notification)
238 self.notifier_stream.on_recv(self.dispatch_notification)
239 self.log.info("Scheduler started [%s]" % self.scheme_name)
239 self.log.info("Scheduler started [%s]" % self.scheme_name)
240
240
241 def resume_receiving(self):
241 def resume_receiving(self):
242 """Resume accepting jobs."""
242 """Resume accepting jobs."""
243 self.client_stream.on_recv(self.dispatch_submission, copy=False)
243 self.client_stream.on_recv(self.dispatch_submission, copy=False)
244
244
245 def stop_receiving(self):
245 def stop_receiving(self):
246 """Stop accepting jobs while there are no engines.
246 """Stop accepting jobs while there are no engines.
247 Leave them in the ZMQ queue."""
247 Leave them in the ZMQ queue."""
248 self.client_stream.on_recv(None)
248 self.client_stream.on_recv(None)
249
249
250 #-----------------------------------------------------------------------
250 #-----------------------------------------------------------------------
251 # [Un]Registration Handling
251 # [Un]Registration Handling
252 #-----------------------------------------------------------------------
252 #-----------------------------------------------------------------------
253
253
254
254
255 def dispatch_query_reply(self, msg):
255 def dispatch_query_reply(self, msg):
256 """handle reply to our initial connection request"""
256 """handle reply to our initial connection request"""
257 try:
257 try:
258 idents,msg = self.session.feed_identities(msg)
258 idents,msg = self.session.feed_identities(msg)
259 except ValueError:
259 except ValueError:
260 self.log.warn("task::Invalid Message: %r",msg)
260 self.log.warn("task::Invalid Message: %r",msg)
261 return
261 return
262 try:
262 try:
263 msg = self.session.unserialize(msg)
263 msg = self.session.unserialize(msg)
264 except ValueError:
264 except ValueError:
265 self.log.warn("task::Unauthorized message from: %r"%idents)
265 self.log.warn("task::Unauthorized message from: %r"%idents)
266 return
266 return
267
267
268 content = msg['content']
268 content = msg['content']
269 for uuid in content.get('engines', {}).values():
269 for uuid in content.get('engines', {}).values():
270 self._register_engine(cast_bytes(uuid))
270 self._register_engine(cast_bytes(uuid))
271
271
272
272
273 @util.log_errors
273 @util.log_errors
274 def dispatch_notification(self, msg):
274 def dispatch_notification(self, msg):
275 """dispatch register/unregister events."""
275 """dispatch register/unregister events."""
276 try:
276 try:
277 idents,msg = self.session.feed_identities(msg)
277 idents,msg = self.session.feed_identities(msg)
278 except ValueError:
278 except ValueError:
279 self.log.warn("task::Invalid Message: %r",msg)
279 self.log.warn("task::Invalid Message: %r",msg)
280 return
280 return
281 try:
281 try:
282 msg = self.session.unserialize(msg)
282 msg = self.session.unserialize(msg)
283 except ValueError:
283 except ValueError:
284 self.log.warn("task::Unauthorized message from: %r"%idents)
284 self.log.warn("task::Unauthorized message from: %r"%idents)
285 return
285 return
286
286
287 msg_type = msg['header']['msg_type']
287 msg_type = msg['header']['msg_type']
288
288
289 handler = self._notification_handlers.get(msg_type, None)
289 handler = self._notification_handlers.get(msg_type, None)
290 if handler is None:
290 if handler is None:
291 self.log.error("Unhandled message type: %r"%msg_type)
291 self.log.error("Unhandled message type: %r"%msg_type)
292 else:
292 else:
293 try:
293 try:
294 handler(cast_bytes(msg['content']['uuid']))
294 handler(cast_bytes(msg['content']['uuid']))
295 except Exception:
295 except Exception:
296 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
296 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
297
297
298 def _register_engine(self, uid):
298 def _register_engine(self, uid):
299 """New engine with ident `uid` became available."""
299 """New engine with ident `uid` became available."""
300 # head of the line:
300 # head of the line:
301 self.targets.insert(0,uid)
301 self.targets.insert(0,uid)
302 self.loads.insert(0,0)
302 self.loads.insert(0,0)
303
303
304 # initialize sets
304 # initialize sets
305 self.completed[uid] = set()
305 self.completed[uid] = set()
306 self.failed[uid] = set()
306 self.failed[uid] = set()
307 self.pending[uid] = {}
307 self.pending[uid] = {}
308
308
309 # rescan the graph:
309 # rescan the graph:
310 self.update_graph(None)
310 self.update_graph(None)
311
311
312 def _unregister_engine(self, uid):
312 def _unregister_engine(self, uid):
313 """Existing engine with ident `uid` became unavailable."""
313 """Existing engine with ident `uid` became unavailable."""
314 if len(self.targets) == 1:
314 if len(self.targets) == 1:
315 # this was our only engine
315 # this was our only engine
316 pass
316 pass
317
317
318 # handle any potentially finished tasks:
318 # handle any potentially finished tasks:
319 self.engine_stream.flush()
319 self.engine_stream.flush()
320
320
321 # don't pop destinations, because they might be used later
321 # don't pop destinations, because they might be used later
322 # map(self.destinations.pop, self.completed.pop(uid))
322 # map(self.destinations.pop, self.completed.pop(uid))
323 # map(self.destinations.pop, self.failed.pop(uid))
323 # map(self.destinations.pop, self.failed.pop(uid))
324
324
325 # prevent this engine from receiving work
325 # prevent this engine from receiving work
326 idx = self.targets.index(uid)
326 idx = self.targets.index(uid)
327 self.targets.pop(idx)
327 self.targets.pop(idx)
328 self.loads.pop(idx)
328 self.loads.pop(idx)
329
329
330 # wait 5 seconds before cleaning up pending jobs, since the results might
330 # wait 5 seconds before cleaning up pending jobs, since the results might
331 # still be incoming
331 # still be incoming
332 if self.pending[uid]:
332 if self.pending[uid]:
333 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
333 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
334 dc.start()
334 dc.start()
335 else:
335 else:
336 self.completed.pop(uid)
336 self.completed.pop(uid)
337 self.failed.pop(uid)
337 self.failed.pop(uid)
338
338
339
339
340 def handle_stranded_tasks(self, engine):
340 def handle_stranded_tasks(self, engine):
341 """Deal with jobs resident in an engine that died."""
341 """Deal with jobs resident in an engine that died."""
342 lost = self.pending[engine]
342 lost = self.pending[engine]
343 for msg_id in lost.keys():
343 for msg_id in lost.keys():
344 if msg_id not in self.pending[engine]:
344 if msg_id not in self.pending[engine]:
345 # prevent double-handling of messages
345 # prevent double-handling of messages
346 continue
346 continue
347
347
348 raw_msg = lost[msg_id].raw_msg
348 raw_msg = lost[msg_id].raw_msg
349 idents,msg = self.session.feed_identities(raw_msg, copy=False)
349 idents,msg = self.session.feed_identities(raw_msg, copy=False)
350 parent = self.session.unpack(msg[1].bytes)
350 parent = self.session.unpack(msg[1].bytes)
351 idents = [engine, idents[0]]
351 idents = [engine, idents[0]]
352
352
353 # build fake error reply
353 # build fake error reply
354 try:
354 try:
355 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
355 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
356 except:
356 except:
357 content = error.wrap_exception()
357 content = error.wrap_exception()
358 # build fake metadata
358 # build fake metadata
359 md = dict(
359 md = dict(
360 status=u'error',
360 status=u'error',
361 engine=engine,
361 engine=engine.decode('ascii'),
362 date=datetime.now(),
362 date=datetime.now(),
363 )
363 )
364 msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
364 msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
365 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
365 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
366 # and dispatch it
366 # and dispatch it
367 self.dispatch_result(raw_reply)
367 self.dispatch_result(raw_reply)
368
368
369 # finally scrub completed/failed lists
369 # finally scrub completed/failed lists
370 self.completed.pop(engine)
370 self.completed.pop(engine)
371 self.failed.pop(engine)
371 self.failed.pop(engine)
372
372
373
373
374 #-----------------------------------------------------------------------
374 #-----------------------------------------------------------------------
375 # Job Submission
375 # Job Submission
376 #-----------------------------------------------------------------------
376 #-----------------------------------------------------------------------
377
377
378
378
379 @util.log_errors
379 @util.log_errors
380 def dispatch_submission(self, raw_msg):
380 def dispatch_submission(self, raw_msg):
381 """Dispatch job submission to appropriate handlers."""
381 """Dispatch job submission to appropriate handlers."""
382 # ensure targets up to date:
382 # ensure targets up to date:
383 self.notifier_stream.flush()
383 self.notifier_stream.flush()
384 try:
384 try:
385 idents, msg = self.session.feed_identities(raw_msg, copy=False)
385 idents, msg = self.session.feed_identities(raw_msg, copy=False)
386 msg = self.session.unserialize(msg, content=False, copy=False)
386 msg = self.session.unserialize(msg, content=False, copy=False)
387 except Exception:
387 except Exception:
388 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
388 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
389 return
389 return
390
390
391
391
392 # send to monitor
392 # send to monitor
393 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
393 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
394
394
395 header = msg['header']
395 header = msg['header']
396 md = msg['metadata']
396 md = msg['metadata']
397 msg_id = header['msg_id']
397 msg_id = header['msg_id']
398 self.all_ids.add(msg_id)
398 self.all_ids.add(msg_id)
399
399
400 # get targets as a set of bytes objects
400 # get targets as a set of bytes objects
401 # from a list of unicode objects
401 # from a list of unicode objects
402 targets = md.get('targets', [])
402 targets = md.get('targets', [])
403 targets = map(cast_bytes, targets)
403 targets = map(cast_bytes, targets)
404 targets = set(targets)
404 targets = set(targets)
405
405
406 retries = md.get('retries', 0)
406 retries = md.get('retries', 0)
407 self.retries[msg_id] = retries
407 self.retries[msg_id] = retries
408
408
409 # time dependencies
409 # time dependencies
410 after = md.get('after', None)
410 after = md.get('after', None)
411 if after:
411 if after:
412 after = Dependency(after)
412 after = Dependency(after)
413 if after.all:
413 if after.all:
414 if after.success:
414 if after.success:
415 after = Dependency(after.difference(self.all_completed),
415 after = Dependency(after.difference(self.all_completed),
416 success=after.success,
416 success=after.success,
417 failure=after.failure,
417 failure=after.failure,
418 all=after.all,
418 all=after.all,
419 )
419 )
420 if after.failure:
420 if after.failure:
421 after = Dependency(after.difference(self.all_failed),
421 after = Dependency(after.difference(self.all_failed),
422 success=after.success,
422 success=after.success,
423 failure=after.failure,
423 failure=after.failure,
424 all=after.all,
424 all=after.all,
425 )
425 )
426 if after.check(self.all_completed, self.all_failed):
426 if after.check(self.all_completed, self.all_failed):
427 # recast as empty set, if `after` already met,
427 # recast as empty set, if `after` already met,
428 # to prevent unnecessary set comparisons
428 # to prevent unnecessary set comparisons
429 after = MET
429 after = MET
430 else:
430 else:
431 after = MET
431 after = MET
432
432
433 # location dependencies
433 # location dependencies
434 follow = Dependency(md.get('follow', []))
434 follow = Dependency(md.get('follow', []))
435
435
436 # turn timeouts into datetime objects:
436 # turn timeouts into datetime objects:
437 timeout = md.get('timeout', None)
437 timeout = md.get('timeout', None)
438 if timeout:
438 if timeout:
439 timeout = time.time() + float(timeout)
439 timeout = time.time() + float(timeout)
440
440
441 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
441 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
442 header=header, targets=targets, after=after, follow=follow,
442 header=header, targets=targets, after=after, follow=follow,
443 timeout=timeout, metadata=md,
443 timeout=timeout, metadata=md,
444 )
444 )
445 if timeout:
445 if timeout:
446 # schedule timeout callback
446 # schedule timeout callback
447 self.loop.add_timeout(timeout, lambda : self.job_timeout(job))
447 self.loop.add_timeout(timeout, lambda : self.job_timeout(job))
448
448
449 # validate and reduce dependencies:
449 # validate and reduce dependencies:
450 for dep in after,follow:
450 for dep in after,follow:
451 if not dep: # empty dependency
451 if not dep: # empty dependency
452 continue
452 continue
453 # check valid:
453 # check valid:
454 if msg_id in dep or dep.difference(self.all_ids):
454 if msg_id in dep or dep.difference(self.all_ids):
455 self.queue_map[msg_id] = job
455 self.queue_map[msg_id] = job
456 return self.fail_unreachable(msg_id, error.InvalidDependency)
456 return self.fail_unreachable(msg_id, error.InvalidDependency)
457 # check if unreachable:
457 # check if unreachable:
458 if dep.unreachable(self.all_completed, self.all_failed):
458 if dep.unreachable(self.all_completed, self.all_failed):
459 self.queue_map[msg_id] = job
459 self.queue_map[msg_id] = job
460 return self.fail_unreachable(msg_id)
460 return self.fail_unreachable(msg_id)
461
461
462 if after.check(self.all_completed, self.all_failed):
462 if after.check(self.all_completed, self.all_failed):
463 # time deps already met, try to run
463 # time deps already met, try to run
464 if not self.maybe_run(job):
464 if not self.maybe_run(job):
465 # can't run yet
465 # can't run yet
466 if msg_id not in self.all_failed:
466 if msg_id not in self.all_failed:
467 # could have failed as unreachable
467 # could have failed as unreachable
468 self.save_unmet(job)
468 self.save_unmet(job)
469 else:
469 else:
470 self.save_unmet(job)
470 self.save_unmet(job)
471
471
472 def job_timeout(self, job):
472 def job_timeout(self, job):
473 """callback for a job's timeout.
473 """callback for a job's timeout.
474
474
475 The job may or may not have been run at this point.
475 The job may or may not have been run at this point.
476 """
476 """
477 now = time.time()
477 now = time.time()
478 if job.timeout >= (now + 1):
478 if job.timeout >= (now + 1):
479 self.log.warn("task %s timeout fired prematurely: %s > %s",
479 self.log.warn("task %s timeout fired prematurely: %s > %s",
480 job.msg_id, job.timeout, now
480 job.msg_id, job.timeout, now
481 )
481 )
482 if job.msg_id in self.queue_map:
482 if job.msg_id in self.queue_map:
483 # still waiting, but ran out of time
483 # still waiting, but ran out of time
484 self.log.info("task %r timed out", job.msg_id)
484 self.log.info("task %r timed out", job.msg_id)
485 self.fail_unreachable(job.msg_id, error.TaskTimeout)
485 self.fail_unreachable(job.msg_id, error.TaskTimeout)
486
486
487 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
487 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
488 """a task has become unreachable, send a reply with an ImpossibleDependency
488 """a task has become unreachable, send a reply with an ImpossibleDependency
489 error."""
489 error."""
490 if msg_id not in self.queue_map:
490 if msg_id not in self.queue_map:
491 self.log.error("task %r already failed!", msg_id)
491 self.log.error("task %r already failed!", msg_id)
492 return
492 return
493 job = self.queue_map.pop(msg_id)
493 job = self.queue_map.pop(msg_id)
494 # lazy-delete from the queue
494 # lazy-delete from the queue
495 job.removed = True
495 job.removed = True
496 for mid in job.dependents:
496 for mid in job.dependents:
497 if mid in self.graph:
497 if mid in self.graph:
498 self.graph[mid].remove(msg_id)
498 self.graph[mid].remove(msg_id)
499
499
500 try:
500 try:
501 raise why()
501 raise why()
502 except:
502 except:
503 content = error.wrap_exception()
503 content = error.wrap_exception()
504 self.log.debug("task %r failing as unreachable with: %s", msg_id, content['ename'])
504 self.log.debug("task %r failing as unreachable with: %s", msg_id, content['ename'])
505
505
506 self.all_done.add(msg_id)
506 self.all_done.add(msg_id)
507 self.all_failed.add(msg_id)
507 self.all_failed.add(msg_id)
508
508
509 msg = self.session.send(self.client_stream, 'apply_reply', content,
509 msg = self.session.send(self.client_stream, 'apply_reply', content,
510 parent=job.header, ident=job.idents)
510 parent=job.header, ident=job.idents)
511 self.session.send(self.mon_stream, msg, ident=[b'outtask']+job.idents)
511 self.session.send(self.mon_stream, msg, ident=[b'outtask']+job.idents)
512
512
513 self.update_graph(msg_id, success=False)
513 self.update_graph(msg_id, success=False)
514
514
515 def available_engines(self):
515 def available_engines(self):
516 """return a list of available engine indices based on HWM"""
516 """return a list of available engine indices based on HWM"""
517 if not self.hwm:
517 if not self.hwm:
518 return range(len(self.targets))
518 return range(len(self.targets))
519 available = []
519 available = []
520 for idx in range(len(self.targets)):
520 for idx in range(len(self.targets)):
521 if self.loads[idx] < self.hwm:
521 if self.loads[idx] < self.hwm:
522 available.append(idx)
522 available.append(idx)
523 return available
523 return available
524
524
525 def maybe_run(self, job):
525 def maybe_run(self, job):
526 """check location dependencies, and run if they are met."""
526 """check location dependencies, and run if they are met."""
527 msg_id = job.msg_id
527 msg_id = job.msg_id
528 self.log.debug("Attempting to assign task %s", msg_id)
528 self.log.debug("Attempting to assign task %s", msg_id)
529 available = self.available_engines()
529 available = self.available_engines()
530 if not available:
530 if not available:
531 # no engines, definitely can't run
531 # no engines, definitely can't run
532 return False
532 return False
533
533
534 if job.follow or job.targets or job.blacklist or self.hwm:
534 if job.follow or job.targets or job.blacklist or self.hwm:
535 # we need a can_run filter
535 # we need a can_run filter
536 def can_run(idx):
536 def can_run(idx):
537 # check hwm
537 # check hwm
538 if self.hwm and self.loads[idx] == self.hwm:
538 if self.hwm and self.loads[idx] == self.hwm:
539 return False
539 return False
540 target = self.targets[idx]
540 target = self.targets[idx]
541 # check blacklist
541 # check blacklist
542 if target in job.blacklist:
542 if target in job.blacklist:
543 return False
543 return False
544 # check targets
544 # check targets
545 if job.targets and target not in job.targets:
545 if job.targets and target not in job.targets:
546 return False
546 return False
547 # check follow
547 # check follow
548 return job.follow.check(self.completed[target], self.failed[target])
548 return job.follow.check(self.completed[target], self.failed[target])
549
549
550 indices = filter(can_run, available)
550 indices = filter(can_run, available)
551
551
552 if not indices:
552 if not indices:
553 # couldn't run
553 # couldn't run
554 if job.follow.all:
554 if job.follow.all:
555 # check follow for impossibility
555 # check follow for impossibility
556 dests = set()
556 dests = set()
557 relevant = set()
557 relevant = set()
558 if job.follow.success:
558 if job.follow.success:
559 relevant = self.all_completed
559 relevant = self.all_completed
560 if job.follow.failure:
560 if job.follow.failure:
561 relevant = relevant.union(self.all_failed)
561 relevant = relevant.union(self.all_failed)
562 for m in job.follow.intersection(relevant):
562 for m in job.follow.intersection(relevant):
563 dests.add(self.destinations[m])
563 dests.add(self.destinations[m])
564 if len(dests) > 1:
564 if len(dests) > 1:
565 self.queue_map[msg_id] = job
565 self.queue_map[msg_id] = job
566 self.fail_unreachable(msg_id)
566 self.fail_unreachable(msg_id)
567 return False
567 return False
568 if job.targets:
568 if job.targets:
569 # check blacklist+targets for impossibility
569 # check blacklist+targets for impossibility
570 job.targets.difference_update(job.blacklist)
570 job.targets.difference_update(job.blacklist)
571 if not job.targets or not job.targets.intersection(self.targets):
571 if not job.targets or not job.targets.intersection(self.targets):
572 self.queue_map[msg_id] = job
572 self.queue_map[msg_id] = job
573 self.fail_unreachable(msg_id)
573 self.fail_unreachable(msg_id)
574 return False
574 return False
575 return False
575 return False
576 else:
576 else:
577 indices = None
577 indices = None
578
578
579 self.submit_task(job, indices)
579 self.submit_task(job, indices)
580 return True
580 return True
581
581
582 def save_unmet(self, job):
582 def save_unmet(self, job):
583 """Save a message for later submission when its dependencies are met."""
583 """Save a message for later submission when its dependencies are met."""
584 msg_id = job.msg_id
584 msg_id = job.msg_id
585 self.log.debug("Adding task %s to the queue", msg_id)
585 self.log.debug("Adding task %s to the queue", msg_id)
586 self.queue_map[msg_id] = job
586 self.queue_map[msg_id] = job
587 self.queue.append(job)
587 self.queue.append(job)
588 # track the ids in follow or after, but not those already finished
588 # track the ids in follow or after, but not those already finished
589 for dep_id in job.after.union(job.follow).difference(self.all_done):
589 for dep_id in job.after.union(job.follow).difference(self.all_done):
590 if dep_id not in self.graph:
590 if dep_id not in self.graph:
591 self.graph[dep_id] = set()
591 self.graph[dep_id] = set()
592 self.graph[dep_id].add(msg_id)
592 self.graph[dep_id].add(msg_id)
593
593
594 def submit_task(self, job, indices=None):
594 def submit_task(self, job, indices=None):
595 """Submit a task to any of a subset of our targets."""
595 """Submit a task to any of a subset of our targets."""
596 if indices:
596 if indices:
597 loads = [self.loads[i] for i in indices]
597 loads = [self.loads[i] for i in indices]
598 else:
598 else:
599 loads = self.loads
599 loads = self.loads
600 idx = self.scheme(loads)
600 idx = self.scheme(loads)
601 if indices:
601 if indices:
602 idx = indices[idx]
602 idx = indices[idx]
603 target = self.targets[idx]
603 target = self.targets[idx]
604 # print (target, map(str, msg[:3]))
604 # print (target, map(str, msg[:3]))
605 # send job to the engine
605 # send job to the engine
606 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
606 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
607 self.engine_stream.send_multipart(job.raw_msg, copy=False)
607 self.engine_stream.send_multipart(job.raw_msg, copy=False)
608 # update load
608 # update load
609 self.add_job(idx)
609 self.add_job(idx)
610 self.pending[target][job.msg_id] = job
610 self.pending[target][job.msg_id] = job
611 # notify Hub
611 # notify Hub
612 content = dict(msg_id=job.msg_id, engine_id=target.decode('ascii'))
612 content = dict(msg_id=job.msg_id, engine_id=target.decode('ascii'))
613 self.session.send(self.mon_stream, 'task_destination', content=content,
613 self.session.send(self.mon_stream, 'task_destination', content=content,
614 ident=[b'tracktask',self.ident])
614 ident=[b'tracktask',self.ident])
615
615
616
616
617 #-----------------------------------------------------------------------
617 #-----------------------------------------------------------------------
618 # Result Handling
618 # Result Handling
619 #-----------------------------------------------------------------------
619 #-----------------------------------------------------------------------
620
620
621
621
622 @util.log_errors
622 @util.log_errors
623 def dispatch_result(self, raw_msg):
623 def dispatch_result(self, raw_msg):
624 """dispatch method for result replies"""
624 """dispatch method for result replies"""
625 try:
625 try:
626 idents,msg = self.session.feed_identities(raw_msg, copy=False)
626 idents,msg = self.session.feed_identities(raw_msg, copy=False)
627 msg = self.session.unserialize(msg, content=False, copy=False)
627 msg = self.session.unserialize(msg, content=False, copy=False)
628 engine = idents[0]
628 engine = idents[0]
629 try:
629 try:
630 idx = self.targets.index(engine)
630 idx = self.targets.index(engine)
631 except ValueError:
631 except ValueError:
632 pass # skip load-update for dead engines
632 pass # skip load-update for dead engines
633 else:
633 else:
634 self.finish_job(idx)
634 self.finish_job(idx)
635 except Exception:
635 except Exception:
636 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
636 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
637 return
637 return
638
638
639 md = msg['metadata']
639 md = msg['metadata']
640 parent = msg['parent_header']
640 parent = msg['parent_header']
641 if md.get('dependencies_met', True):
641 if md.get('dependencies_met', True):
642 success = (md['status'] == 'ok')
642 success = (md['status'] == 'ok')
643 msg_id = parent['msg_id']
643 msg_id = parent['msg_id']
644 retries = self.retries[msg_id]
644 retries = self.retries[msg_id]
645 if not success and retries > 0:
645 if not success and retries > 0:
646 # failed
646 # failed
647 self.retries[msg_id] = retries - 1
647 self.retries[msg_id] = retries - 1
648 self.handle_unmet_dependency(idents, parent)
648 self.handle_unmet_dependency(idents, parent)
649 else:
649 else:
650 del self.retries[msg_id]
650 del self.retries[msg_id]
651 # relay to client and update graph
651 # relay to client and update graph
652 self.handle_result(idents, parent, raw_msg, success)
652 self.handle_result(idents, parent, raw_msg, success)
653 # send to Hub monitor
653 # send to Hub monitor
654 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
654 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
655 else:
655 else:
656 self.handle_unmet_dependency(idents, parent)
656 self.handle_unmet_dependency(idents, parent)
657
657
658 def handle_result(self, idents, parent, raw_msg, success=True):
658 def handle_result(self, idents, parent, raw_msg, success=True):
659 """handle a real task result, either success or failure"""
659 """handle a real task result, either success or failure"""
660 # first, relay result to client
660 # first, relay result to client
661 engine = idents[0]
661 engine = idents[0]
662 client = idents[1]
662 client = idents[1]
663 # swap_ids for ROUTER-ROUTER mirror
663 # swap_ids for ROUTER-ROUTER mirror
664 raw_msg[:2] = [client,engine]
664 raw_msg[:2] = [client,engine]
665 # print (map(str, raw_msg[:4]))
665 # print (map(str, raw_msg[:4]))
666 self.client_stream.send_multipart(raw_msg, copy=False)
666 self.client_stream.send_multipart(raw_msg, copy=False)
667 # now, update our data structures
667 # now, update our data structures
668 msg_id = parent['msg_id']
668 msg_id = parent['msg_id']
669 self.pending[engine].pop(msg_id)
669 self.pending[engine].pop(msg_id)
670 if success:
670 if success:
671 self.completed[engine].add(msg_id)
671 self.completed[engine].add(msg_id)
672 self.all_completed.add(msg_id)
672 self.all_completed.add(msg_id)
673 else:
673 else:
674 self.failed[engine].add(msg_id)
674 self.failed[engine].add(msg_id)
675 self.all_failed.add(msg_id)
675 self.all_failed.add(msg_id)
676 self.all_done.add(msg_id)
676 self.all_done.add(msg_id)
677 self.destinations[msg_id] = engine
677 self.destinations[msg_id] = engine
678
678
679 self.update_graph(msg_id, success)
679 self.update_graph(msg_id, success)
680
680
681 def handle_unmet_dependency(self, idents, parent):
681 def handle_unmet_dependency(self, idents, parent):
682 """handle an unmet dependency"""
682 """handle an unmet dependency"""
683 engine = idents[0]
683 engine = idents[0]
684 msg_id = parent['msg_id']
684 msg_id = parent['msg_id']
685
685
686 job = self.pending[engine].pop(msg_id)
686 job = self.pending[engine].pop(msg_id)
687 job.blacklist.add(engine)
687 job.blacklist.add(engine)
688
688
689 if job.blacklist == job.targets:
689 if job.blacklist == job.targets:
690 self.queue_map[msg_id] = job
690 self.queue_map[msg_id] = job
691 self.fail_unreachable(msg_id)
691 self.fail_unreachable(msg_id)
692 elif not self.maybe_run(job):
692 elif not self.maybe_run(job):
693 # resubmit failed
693 # resubmit failed
694 if msg_id not in self.all_failed:
694 if msg_id not in self.all_failed:
695 # put it back in our dependency tree
695 # put it back in our dependency tree
696 self.save_unmet(job)
696 self.save_unmet(job)
697
697
698 if self.hwm:
698 if self.hwm:
699 try:
699 try:
700 idx = self.targets.index(engine)
700 idx = self.targets.index(engine)
701 except ValueError:
701 except ValueError:
702 pass # skip load-update for dead engines
702 pass # skip load-update for dead engines
703 else:
703 else:
704 if self.loads[idx] == self.hwm-1:
704 if self.loads[idx] == self.hwm-1:
705 self.update_graph(None)
705 self.update_graph(None)
706
706
707 def update_graph(self, dep_id=None, success=True):
707 def update_graph(self, dep_id=None, success=True):
708 """dep_id just finished. Update our dependency
708 """dep_id just finished. Update our dependency
709 graph and submit any jobs that just became runnable.
709 graph and submit any jobs that just became runnable.
710
710
711 Called with dep_id=None to update entire graph for hwm, but without finishing a task.
711 Called with dep_id=None to update entire graph for hwm, but without finishing a task.
712 """
712 """
713 # print ("\n\n***********")
713 # print ("\n\n***********")
714 # pprint (dep_id)
714 # pprint (dep_id)
715 # pprint (self.graph)
715 # pprint (self.graph)
716 # pprint (self.queue_map)
716 # pprint (self.queue_map)
717 # pprint (self.all_completed)
717 # pprint (self.all_completed)
718 # pprint (self.all_failed)
718 # pprint (self.all_failed)
719 # print ("\n\n***********\n\n")
719 # print ("\n\n***********\n\n")
720 # update any jobs that depended on the dependency
720 # update any jobs that depended on the dependency
721 msg_ids = self.graph.pop(dep_id, [])
721 msg_ids = self.graph.pop(dep_id, [])
722
722
723 # recheck *all* jobs if
723 # recheck *all* jobs if
724 # a) we have HWM and an engine just become no longer full
724 # a) we have HWM and an engine just become no longer full
725 # or b) dep_id was given as None
725 # or b) dep_id was given as None
726
726
727 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
727 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
728 jobs = self.queue
728 jobs = self.queue
729 using_queue = True
729 using_queue = True
730 else:
730 else:
731 using_queue = False
731 using_queue = False
732 jobs = deque(sorted( self.queue_map[msg_id] for msg_id in msg_ids ))
732 jobs = deque(sorted( self.queue_map[msg_id] for msg_id in msg_ids ))
733
733
734 to_restore = []
734 to_restore = []
735 while jobs:
735 while jobs:
736 job = jobs.popleft()
736 job = jobs.popleft()
737 if job.removed:
737 if job.removed:
738 continue
738 continue
739 msg_id = job.msg_id
739 msg_id = job.msg_id
740
740
741 put_it_back = True
741 put_it_back = True
742
742
743 if job.after.unreachable(self.all_completed, self.all_failed)\
743 if job.after.unreachable(self.all_completed, self.all_failed)\
744 or job.follow.unreachable(self.all_completed, self.all_failed):
744 or job.follow.unreachable(self.all_completed, self.all_failed):
745 self.fail_unreachable(msg_id)
745 self.fail_unreachable(msg_id)
746 put_it_back = False
746 put_it_back = False
747
747
748 elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run
748 elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run
749 if self.maybe_run(job):
749 if self.maybe_run(job):
750 put_it_back = False
750 put_it_back = False
751 self.queue_map.pop(msg_id)
751 self.queue_map.pop(msg_id)
752 for mid in job.dependents:
752 for mid in job.dependents:
753 if mid in self.graph:
753 if mid in self.graph:
754 self.graph[mid].remove(msg_id)
754 self.graph[mid].remove(msg_id)
755
755
756 # abort the loop if we just filled up all of our engines.
756 # abort the loop if we just filled up all of our engines.
757 # avoids an O(N) operation in situation of full queue,
757 # avoids an O(N) operation in situation of full queue,
758 # where graph update is triggered as soon as an engine becomes
758 # where graph update is triggered as soon as an engine becomes
759 # non-full, and all tasks after the first are checked,
759 # non-full, and all tasks after the first are checked,
760 # even though they can't run.
760 # even though they can't run.
761 if not self.available_engines():
761 if not self.available_engines():
762 break
762 break
763
763
764 if using_queue and put_it_back:
764 if using_queue and put_it_back:
765 # popped a job from the queue but it neither ran nor failed,
765 # popped a job from the queue but it neither ran nor failed,
766 # so we need to put it back when we are done
766 # so we need to put it back when we are done
767 # make sure to_restore preserves the same ordering
767 # make sure to_restore preserves the same ordering
768 to_restore.append(job)
768 to_restore.append(job)
769
769
770 # put back any tasks we popped but didn't run
770 # put back any tasks we popped but didn't run
771 if using_queue:
771 if using_queue:
772 self.queue.extendleft(to_restore)
772 self.queue.extendleft(to_restore)
773
773
774 #----------------------------------------------------------------------
774 #----------------------------------------------------------------------
775 # methods to be overridden by subclasses
775 # methods to be overridden by subclasses
776 #----------------------------------------------------------------------
776 #----------------------------------------------------------------------
777
777
778 def add_job(self, idx):
778 def add_job(self, idx):
779 """Called after self.targets[idx] just got the job with header.
779 """Called after self.targets[idx] just got the job with header.
780 Override with subclasses. The default ordering is simple LRU.
780 Override with subclasses. The default ordering is simple LRU.
781 The default loads are the number of outstanding jobs."""
781 The default loads are the number of outstanding jobs."""
782 self.loads[idx] += 1
782 self.loads[idx] += 1
783 for lis in (self.targets, self.loads):
783 for lis in (self.targets, self.loads):
784 lis.append(lis.pop(idx))
784 lis.append(lis.pop(idx))
785
785
786
786
787 def finish_job(self, idx):
787 def finish_job(self, idx):
788 """Called after self.targets[idx] just finished a job.
788 """Called after self.targets[idx] just finished a job.
789 Override with subclasses."""
789 Override with subclasses."""
790 self.loads[idx] -= 1
790 self.loads[idx] -= 1
791
791
792
792
793
793
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None,
                        logname='root', log_url=None, loglevel=logging.DEBUG,
                        identity=b'task', in_thread=False):
    """Create and start a TaskScheduler wired to the given addresses.

    When in_thread is True, the parent's zmq Context and IOLoop instances
    are reused; otherwise fresh ones are created (safe with
    multiprocessing) and the loop is run until interrupted.
    """
    ZMQStream = zmqstream.ZMQStream

    if config:
        # unwrap dict back into Config
        config = Config(config)

    if in_thread:
        # use instance() to get the same Context/Loop as our parent
        ctx = zmq.Context.instance()
        loop = ioloop.IOLoop.instance()
    else:
        # in a process, don't use instance()
        # for safety with multiprocessing
        ctx = zmq.Context()
        loop = ioloop.IOLoop()

    # ROUTER facing clients
    ins = ZMQStream(ctx.socket(zmq.ROUTER), loop)
    util.set_hwm(ins, 0)
    ins.setsockopt(zmq.IDENTITY, identity + b'_in')
    ins.bind(in_addr)

    # ROUTER facing engines
    outs = ZMQStream(ctx.socket(zmq.ROUTER), loop)
    util.set_hwm(outs, 0)
    outs.setsockopt(zmq.IDENTITY, identity + b'_out')
    outs.bind(out_addr)

    # PUB to the Hub monitor
    mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
    util.set_hwm(mons, 0)
    mons.connect(mon_addr)

    # SUB for notifications from the Hub
    nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB), loop)
    nots.setsockopt(zmq.SUBSCRIBE, b'')
    nots.connect(not_addr)

    # DEALER for registration queries
    querys = ZMQStream(ctx.socket(zmq.DEALER), loop)
    querys.connect(reg_addr)

    # setup logging
    if in_thread:
        log = Application.instance().log
    elif log_url:
        log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
    else:
        log = local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                              mon_stream=mons, notifier_stream=nots,
                              query_stream=querys,
                              loop=loop, log=log,
                              config=config)
    scheduler.start()
    if not in_thread:
        try:
            loop.start()
        except KeyboardInterrupt:
            scheduler.log.critical("Interrupted, exiting...")
852
852
General Comments 0
You need to be logged in to leave comments. Login now