##// END OF EJS Templates
check whether all engines are at HWM in a few places...
MinRK -
Show More
@@ -1,797 +1,813 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6
6
7 Authors:
7 Authors:
8
8
9 * Min RK
9 * Min RK
10 """
10 """
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2010-2011 The IPython Development Team
12 # Copyright (C) 2010-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #----------------------------------------------------------------------
18 #----------------------------------------------------------------------
19 # Imports
19 # Imports
20 #----------------------------------------------------------------------
20 #----------------------------------------------------------------------
21
21
22 from __future__ import print_function
22 from __future__ import print_function
23
23
24 import logging
24 import logging
25 import sys
25 import sys
26 import time
26 import time
27
27
28 from datetime import datetime, timedelta
28 from datetime import datetime, timedelta
29 from random import randint, random
29 from random import randint, random
30 from types import FunctionType
30 from types import FunctionType
31
31
32 try:
32 try:
33 import numpy
33 import numpy
34 except ImportError:
34 except ImportError:
35 numpy = None
35 numpy = None
36
36
37 import zmq
37 import zmq
38 from zmq.eventloop import ioloop, zmqstream
38 from zmq.eventloop import ioloop, zmqstream
39
39
40 # local imports
40 # local imports
41 from IPython.external.decorator import decorator
41 from IPython.external.decorator import decorator
42 from IPython.config.application import Application
42 from IPython.config.application import Application
43 from IPython.config.loader import Config
43 from IPython.config.loader import Config
44 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
45 from IPython.utils.py3compat import cast_bytes
45 from IPython.utils.py3compat import cast_bytes
46
46
47 from IPython.parallel import error, util
47 from IPython.parallel import error, util
48 from IPython.parallel.factory import SessionFactory
48 from IPython.parallel.factory import SessionFactory
49 from IPython.parallel.util import connect_logger, local_logger
49 from IPython.parallel.util import connect_logger, local_logger
50
50
51 from .dependency import Dependency
51 from .dependency import Dependency
52
52
@decorator
def logged(f, self, *args, **kwargs):
    """Decorator: log scheduler method calls (name, args, kwargs) at DEBUG level.

    Uses ``f.__name__`` rather than the Python-2-only ``f.func_name`` so the
    decorator also works under Python 3.
    """
    self.log.debug("scheduler::%s(*%s,**%s)", f.__name__, args, kwargs)
    return f(self, *args, **kwargs)
59
59
60 #----------------------------------------------------------------------
60 #----------------------------------------------------------------------
61 # Chooser functions
61 # Chooser functions
62 #----------------------------------------------------------------------
62 #----------------------------------------------------------------------
63
63
def plainrandom(loads):
    """Choose an engine index uniformly at random.

    The load values themselves are ignored; only the number of engines matters.
    """
    return randint(0, len(loads) - 1)
68
68
def lru(loads):
    """Always pick the front of the line.

    The content of `loads` is ignored.

    Assumes LRU ordering of loads, with oldest first.
    """
    return 0
77
77
def twobin(loads):
    """Sample two engines uniformly; keep the one nearer the front (the LRU).

    Load values are ignored; assumes LRU ordering of loads, oldest first.
    """
    last = len(loads) - 1
    first_pick = randint(0, last)
    second_pick = randint(0, last)
    return min(first_pick, second_pick)
89
89
90 def weighted(loads):
90 def weighted(loads):
91 """Pick two at random using inverse load as weight.
91 """Pick two at random using inverse load as weight.
92
92
93 Return the less loaded of the two.
93 Return the less loaded of the two.
94 """
94 """
95 # weight 0 a million times more than 1:
95 # weight 0 a million times more than 1:
96 weights = 1./(1e-6+numpy.array(loads))
96 weights = 1./(1e-6+numpy.array(loads))
97 sums = weights.cumsum()
97 sums = weights.cumsum()
98 t = sums[-1]
98 t = sums[-1]
99 x = random()*t
99 x = random()*t
100 y = random()*t
100 y = random()*t
101 idx = 0
101 idx = 0
102 idy = 0
102 idy = 0
103 while sums[idx] < x:
103 while sums[idx] < x:
104 idx += 1
104 idx += 1
105 while sums[idy] < y:
105 while sums[idy] < y:
106 idy += 1
106 idy += 1
107 if weights[idy] > weights[idx]:
107 if weights[idy] > weights[idx]:
108 return idy
108 return idy
109 else:
109 else:
110 return idx
110 return idx
111
111
def leastload(loads):
    """Always choose the lowest load.

    Ties resolve to the first (lowest-index) occurrence, so with
    LRU-ordered loads the LRU engine among the least loaded wins.
    """
    return min(range(len(loads)), key=loads.__getitem__)
120
120
121 #---------------------------------------------------------------------
121 #---------------------------------------------------------------------
122 # Classes
122 # Classes
123 #---------------------------------------------------------------------
123 #---------------------------------------------------------------------
124
124
125
125
# store empty default dependency:
# MET is the shared "already satisfied" Dependency, reused for every task
# that has no (remaining) time dependencies, to avoid redundant set checks.
MET = Dependency([])
128
128
129
129
class Job(object):
    """Lightweight record for one submitted task.

    Carries the raw wire message plus its parsed routing and dependency
    information, the submission timestamp, and the set of engines that
    already failed this job (its blacklist).
    """
    def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
            targets, after, follow, timeout):
        # stash every constructor argument as a same-named attribute
        for attr_name, attr_value in (
            ('msg_id', msg_id), ('raw_msg', raw_msg), ('idents', idents),
            ('msg', msg), ('header', header), ('metadata', metadata),
            ('targets', targets), ('after', after), ('follow', follow),
            ('timeout', timeout),
        ):
            setattr(self, attr_name, attr_value)

        self.timestamp = time.time()  # submission time
        self.blacklist = set()        # engines that must not get this job

    @property
    def dependents(self):
        """All msg_ids this job depends on (location and time deps combined)."""
        return self.follow.union(self.after)
152
152
153 class TaskScheduler(SessionFactory):
153 class TaskScheduler(SessionFactory):
154 """Python TaskScheduler object.
154 """Python TaskScheduler object.
155
155
156 This is the simplest object that supports msg_id based
156 This is the simplest object that supports msg_id based
157 DAG dependencies. *Only* task msg_ids are checked, not
157 DAG dependencies. *Only* task msg_ids are checked, not
158 msg_ids of jobs submitted via the MUX queue.
158 msg_ids of jobs submitted via the MUX queue.
159
159
160 """
160 """
161
161
    # maximum outstanding tasks per engine (0 = unlimited)
    hwm = Integer(1, config=True,
        help="""specify the High Water Mark (HWM) for the downstream
        socket in the Task scheduler. This is the maximum number
        of allowed outstanding tasks on each engine.

        The default (1) means that only one task can be outstanding on each
        engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
        engines continue to be assigned tasks while they are working,
        effectively hiding network latency behind computation, but can result
        in an imbalance of work when submitting many heterogenous tasks all at
        once. Any positive value greater than one is a compromise between the
        two.

        """
    )
    # name of the chooser function; must match one of the module-level
    # chooser functions (resolved via globals() below)
    scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
        'leastload', config=True, allow_none=False,
        help="""select the task scheduler scheme [default: Python LRU]
        Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
    )
    def _scheme_name_changed(self, old, new):
        # swap in the matching module-level chooser when the name changes
        self.log.debug("Using scheme %r"%new)
        self.scheme = globals()[new]

    # input arguments:
    scheme = Instance(FunctionType) # function for determining the destination
    def _scheme_default(self):
        # default chooser: pick the least-loaded engine
        return leastload
    client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
    engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
    notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
    mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
    query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream

    # internals:
    graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
    retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
    # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
    depending = Dict() # dict by msg_id of Jobs
    pending = Dict() # dict by engine_uuid of submitted tasks
    completed = Dict() # dict by engine_uuid of completed tasks
    failed = Dict() # dict by engine_uuid of failed tasks
    destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
    clients = Dict() # dict by msg_id for who submitted the task
    targets = List() # list of target IDENTs
    loads = List() # list of engine loads
    # full = Set() # set of IDENTs that have HWM outstanding tasks
    all_completed = Set() # set of all completed tasks
    all_failed = Set() # set of all failed tasks
    all_done = Set() # set of all finished tasks=union(completed,failed)
    all_ids = Set() # set of all submitted task IDs

    # periodic timer driving audit_timeouts
    auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')

    ident = CBytes() # ZMQ identity. This should just be self.session.session
                     # but ensure Bytes
    def _ident_default(self):
        return self.session.bsession
220
220
    def start(self):
        """Wire up all streams and begin scheduling.

        Asks the Hub for engines registered before this scheduler started,
        installs the result/submission/notification handlers, and starts
        the periodic timeout auditor.
        """
        # reply is handled asynchronously in dispatch_query_reply
        self.query_stream.on_recv(self.dispatch_query_reply)
        self.session.send(self.query_stream, "connection_request", {})

        self.engine_stream.on_recv(self.dispatch_result, copy=False)
        self.client_stream.on_recv(self.dispatch_submission, copy=False)

        self._notification_handlers = dict(
            registration_notification = self._register_engine,
            unregistration_notification = self._unregister_engine
        )
        self.notifier_stream.on_recv(self.dispatch_notification)
        # audit waiting tasks for expired timeouts every 2 seconds
        # (original comment said "1 Hz", but 2e3 ms is every 2s)
        self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop)
        self.auditor.start()
        self.log.info("Scheduler started [%s]"%self.scheme_name)
236
236
    def resume_receiving(self):
        """Resume accepting jobs."""
        # re-install the submission handler removed by stop_receiving
        self.client_stream.on_recv(self.dispatch_submission, copy=False)
240
240
    def stop_receiving(self):
        """Stop accepting jobs while there are no engines.
        Leave them in the ZMQ queue."""
        # detaching the handler leaves submissions buffered in the socket
        self.client_stream.on_recv(None)
245
245
246 #-----------------------------------------------------------------------
246 #-----------------------------------------------------------------------
247 # [Un]Registration Handling
247 # [Un]Registration Handling
248 #-----------------------------------------------------------------------
248 #-----------------------------------------------------------------------
249
249
250
250
251 def dispatch_query_reply(self, msg):
251 def dispatch_query_reply(self, msg):
252 """handle reply to our initial connection request"""
252 """handle reply to our initial connection request"""
253 try:
253 try:
254 idents,msg = self.session.feed_identities(msg)
254 idents,msg = self.session.feed_identities(msg)
255 except ValueError:
255 except ValueError:
256 self.log.warn("task::Invalid Message: %r",msg)
256 self.log.warn("task::Invalid Message: %r",msg)
257 return
257 return
258 try:
258 try:
259 msg = self.session.unserialize(msg)
259 msg = self.session.unserialize(msg)
260 except ValueError:
260 except ValueError:
261 self.log.warn("task::Unauthorized message from: %r"%idents)
261 self.log.warn("task::Unauthorized message from: %r"%idents)
262 return
262 return
263
263
264 content = msg['content']
264 content = msg['content']
265 for uuid in content.get('engines', {}).values():
265 for uuid in content.get('engines', {}).values():
266 self._register_engine(cast_bytes(uuid))
266 self._register_engine(cast_bytes(uuid))
267
267
268
268
269 @util.log_errors
269 @util.log_errors
270 def dispatch_notification(self, msg):
270 def dispatch_notification(self, msg):
271 """dispatch register/unregister events."""
271 """dispatch register/unregister events."""
272 try:
272 try:
273 idents,msg = self.session.feed_identities(msg)
273 idents,msg = self.session.feed_identities(msg)
274 except ValueError:
274 except ValueError:
275 self.log.warn("task::Invalid Message: %r",msg)
275 self.log.warn("task::Invalid Message: %r",msg)
276 return
276 return
277 try:
277 try:
278 msg = self.session.unserialize(msg)
278 msg = self.session.unserialize(msg)
279 except ValueError:
279 except ValueError:
280 self.log.warn("task::Unauthorized message from: %r"%idents)
280 self.log.warn("task::Unauthorized message from: %r"%idents)
281 return
281 return
282
282
283 msg_type = msg['header']['msg_type']
283 msg_type = msg['header']['msg_type']
284
284
285 handler = self._notification_handlers.get(msg_type, None)
285 handler = self._notification_handlers.get(msg_type, None)
286 if handler is None:
286 if handler is None:
287 self.log.error("Unhandled message type: %r"%msg_type)
287 self.log.error("Unhandled message type: %r"%msg_type)
288 else:
288 else:
289 try:
289 try:
290 handler(cast_bytes(msg['content']['uuid']))
290 handler(cast_bytes(msg['content']['uuid']))
291 except Exception:
291 except Exception:
292 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
292 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
293
293
294 def _register_engine(self, uid):
294 def _register_engine(self, uid):
295 """New engine with ident `uid` became available."""
295 """New engine with ident `uid` became available."""
296 # head of the line:
296 # head of the line:
297 self.targets.insert(0,uid)
297 self.targets.insert(0,uid)
298 self.loads.insert(0,0)
298 self.loads.insert(0,0)
299
299
300 # initialize sets
300 # initialize sets
301 self.completed[uid] = set()
301 self.completed[uid] = set()
302 self.failed[uid] = set()
302 self.failed[uid] = set()
303 self.pending[uid] = {}
303 self.pending[uid] = {}
304
304
305 # rescan the graph:
305 # rescan the graph:
306 self.update_graph(None)
306 self.update_graph(None)
307
307
308 def _unregister_engine(self, uid):
308 def _unregister_engine(self, uid):
309 """Existing engine with ident `uid` became unavailable."""
309 """Existing engine with ident `uid` became unavailable."""
310 if len(self.targets) == 1:
310 if len(self.targets) == 1:
311 # this was our only engine
311 # this was our only engine
312 pass
312 pass
313
313
314 # handle any potentially finished tasks:
314 # handle any potentially finished tasks:
315 self.engine_stream.flush()
315 self.engine_stream.flush()
316
316
317 # don't pop destinations, because they might be used later
317 # don't pop destinations, because they might be used later
318 # map(self.destinations.pop, self.completed.pop(uid))
318 # map(self.destinations.pop, self.completed.pop(uid))
319 # map(self.destinations.pop, self.failed.pop(uid))
319 # map(self.destinations.pop, self.failed.pop(uid))
320
320
321 # prevent this engine from receiving work
321 # prevent this engine from receiving work
322 idx = self.targets.index(uid)
322 idx = self.targets.index(uid)
323 self.targets.pop(idx)
323 self.targets.pop(idx)
324 self.loads.pop(idx)
324 self.loads.pop(idx)
325
325
326 # wait 5 seconds before cleaning up pending jobs, since the results might
326 # wait 5 seconds before cleaning up pending jobs, since the results might
327 # still be incoming
327 # still be incoming
328 if self.pending[uid]:
328 if self.pending[uid]:
329 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
329 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
330 dc.start()
330 dc.start()
331 else:
331 else:
332 self.completed.pop(uid)
332 self.completed.pop(uid)
333 self.failed.pop(uid)
333 self.failed.pop(uid)
334
334
335
335
336 def handle_stranded_tasks(self, engine):
336 def handle_stranded_tasks(self, engine):
337 """Deal with jobs resident in an engine that died."""
337 """Deal with jobs resident in an engine that died."""
338 lost = self.pending[engine]
338 lost = self.pending[engine]
339 for msg_id in lost.keys():
339 for msg_id in lost.keys():
340 if msg_id not in self.pending[engine]:
340 if msg_id not in self.pending[engine]:
341 # prevent double-handling of messages
341 # prevent double-handling of messages
342 continue
342 continue
343
343
344 raw_msg = lost[msg_id].raw_msg
344 raw_msg = lost[msg_id].raw_msg
345 idents,msg = self.session.feed_identities(raw_msg, copy=False)
345 idents,msg = self.session.feed_identities(raw_msg, copy=False)
346 parent = self.session.unpack(msg[1].bytes)
346 parent = self.session.unpack(msg[1].bytes)
347 idents = [engine, idents[0]]
347 idents = [engine, idents[0]]
348
348
349 # build fake error reply
349 # build fake error reply
350 try:
350 try:
351 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
351 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
352 except:
352 except:
353 content = error.wrap_exception()
353 content = error.wrap_exception()
354 # build fake metadata
354 # build fake metadata
355 md = dict(
355 md = dict(
356 status=u'error',
356 status=u'error',
357 engine=engine,
357 engine=engine,
358 date=datetime.now(),
358 date=datetime.now(),
359 )
359 )
360 msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
360 msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
361 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
361 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
362 # and dispatch it
362 # and dispatch it
363 self.dispatch_result(raw_reply)
363 self.dispatch_result(raw_reply)
364
364
365 # finally scrub completed/failed lists
365 # finally scrub completed/failed lists
366 self.completed.pop(engine)
366 self.completed.pop(engine)
367 self.failed.pop(engine)
367 self.failed.pop(engine)
368
368
369
369
370 #-----------------------------------------------------------------------
370 #-----------------------------------------------------------------------
371 # Job Submission
371 # Job Submission
372 #-----------------------------------------------------------------------
372 #-----------------------------------------------------------------------
373
373
374
374
375 @util.log_errors
375 @util.log_errors
376 def dispatch_submission(self, raw_msg):
376 def dispatch_submission(self, raw_msg):
377 """Dispatch job submission to appropriate handlers."""
377 """Dispatch job submission to appropriate handlers."""
378 # ensure targets up to date:
378 # ensure targets up to date:
379 self.notifier_stream.flush()
379 self.notifier_stream.flush()
380 try:
380 try:
381 idents, msg = self.session.feed_identities(raw_msg, copy=False)
381 idents, msg = self.session.feed_identities(raw_msg, copy=False)
382 msg = self.session.unserialize(msg, content=False, copy=False)
382 msg = self.session.unserialize(msg, content=False, copy=False)
383 except Exception:
383 except Exception:
384 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
384 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
385 return
385 return
386
386
387
387
388 # send to monitor
388 # send to monitor
389 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
389 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
390
390
391 header = msg['header']
391 header = msg['header']
392 md = msg['metadata']
392 md = msg['metadata']
393 msg_id = header['msg_id']
393 msg_id = header['msg_id']
394 self.all_ids.add(msg_id)
394 self.all_ids.add(msg_id)
395
395
396 # get targets as a set of bytes objects
396 # get targets as a set of bytes objects
397 # from a list of unicode objects
397 # from a list of unicode objects
398 targets = md.get('targets', [])
398 targets = md.get('targets', [])
399 targets = map(cast_bytes, targets)
399 targets = map(cast_bytes, targets)
400 targets = set(targets)
400 targets = set(targets)
401
401
402 retries = md.get('retries', 0)
402 retries = md.get('retries', 0)
403 self.retries[msg_id] = retries
403 self.retries[msg_id] = retries
404
404
405 # time dependencies
405 # time dependencies
406 after = md.get('after', None)
406 after = md.get('after', None)
407 if after:
407 if after:
408 after = Dependency(after)
408 after = Dependency(after)
409 if after.all:
409 if after.all:
410 if after.success:
410 if after.success:
411 after = Dependency(after.difference(self.all_completed),
411 after = Dependency(after.difference(self.all_completed),
412 success=after.success,
412 success=after.success,
413 failure=after.failure,
413 failure=after.failure,
414 all=after.all,
414 all=after.all,
415 )
415 )
416 if after.failure:
416 if after.failure:
417 after = Dependency(after.difference(self.all_failed),
417 after = Dependency(after.difference(self.all_failed),
418 success=after.success,
418 success=after.success,
419 failure=after.failure,
419 failure=after.failure,
420 all=after.all,
420 all=after.all,
421 )
421 )
422 if after.check(self.all_completed, self.all_failed):
422 if after.check(self.all_completed, self.all_failed):
423 # recast as empty set, if `after` already met,
423 # recast as empty set, if `after` already met,
424 # to prevent unnecessary set comparisons
424 # to prevent unnecessary set comparisons
425 after = MET
425 after = MET
426 else:
426 else:
427 after = MET
427 after = MET
428
428
429 # location dependencies
429 # location dependencies
430 follow = Dependency(md.get('follow', []))
430 follow = Dependency(md.get('follow', []))
431
431
432 # turn timeouts into datetime objects:
432 # turn timeouts into datetime objects:
433 timeout = md.get('timeout', None)
433 timeout = md.get('timeout', None)
434 if timeout:
434 if timeout:
435 # cast to float, because jsonlib returns floats as decimal.Decimal,
435 # cast to float, because jsonlib returns floats as decimal.Decimal,
436 # which timedelta does not accept
436 # which timedelta does not accept
437 timeout = datetime.now() + timedelta(0,float(timeout),0)
437 timeout = datetime.now() + timedelta(0,float(timeout),0)
438
438
439 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
439 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
440 header=header, targets=targets, after=after, follow=follow,
440 header=header, targets=targets, after=after, follow=follow,
441 timeout=timeout, metadata=md,
441 timeout=timeout, metadata=md,
442 )
442 )
443
443
444 # validate and reduce dependencies:
444 # validate and reduce dependencies:
445 for dep in after,follow:
445 for dep in after,follow:
446 if not dep: # empty dependency
446 if not dep: # empty dependency
447 continue
447 continue
448 # check valid:
448 # check valid:
449 if msg_id in dep or dep.difference(self.all_ids):
449 if msg_id in dep or dep.difference(self.all_ids):
450 self.depending[msg_id] = job
450 self.depending[msg_id] = job
451 return self.fail_unreachable(msg_id, error.InvalidDependency)
451 return self.fail_unreachable(msg_id, error.InvalidDependency)
452 # check if unreachable:
452 # check if unreachable:
453 if dep.unreachable(self.all_completed, self.all_failed):
453 if dep.unreachable(self.all_completed, self.all_failed):
454 self.depending[msg_id] = job
454 self.depending[msg_id] = job
455 return self.fail_unreachable(msg_id)
455 return self.fail_unreachable(msg_id)
456
456
457 if after.check(self.all_completed, self.all_failed):
457 if after.check(self.all_completed, self.all_failed):
458 # time deps already met, try to run
458 # time deps already met, try to run
459 if not self.maybe_run(job):
459 if not self.maybe_run(job):
460 # can't run yet
460 # can't run yet
461 if msg_id not in self.all_failed:
461 if msg_id not in self.all_failed:
462 # could have failed as unreachable
462 # could have failed as unreachable
463 self.save_unmet(job)
463 self.save_unmet(job)
464 else:
464 else:
465 self.save_unmet(job)
465 self.save_unmet(job)
466
466
467 def audit_timeouts(self):
467 def audit_timeouts(self):
468 """Audit all waiting tasks for expired timeouts."""
468 """Audit all waiting tasks for expired timeouts."""
469 now = datetime.now()
469 now = datetime.now()
470 for msg_id in self.depending.keys():
470 for msg_id in self.depending.keys():
471 # must recheck, in case one failure cascaded to another:
471 # must recheck, in case one failure cascaded to another:
472 if msg_id in self.depending:
472 if msg_id in self.depending:
473 job = self.depending[msg_id]
473 job = self.depending[msg_id]
474 if job.timeout and job.timeout < now:
474 if job.timeout and job.timeout < now:
475 self.fail_unreachable(msg_id, error.TaskTimeout)
475 self.fail_unreachable(msg_id, error.TaskTimeout)
476
476
    def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
        """A task has become unreachable: reply to the client with an error.

        Removes the job from the dependency table, detaches it from the
        dependency graph, records it as failed, sends the error reply to
        the client (and monitor), and propagates the failure to dependents.

        Parameters
        ----------
        msg_id : str
            id of the doomed task
        why : Exception class
            the error to wrap in the reply (default: ImpossibleDependency)
        """
        if msg_id not in self.depending:
            self.log.error("msg %r already failed!", msg_id)
            return
        job = self.depending.pop(msg_id)
        # detach this job from everything it was waiting on
        for mid in job.dependents:
            if mid in self.graph:
                self.graph[mid].remove(msg_id)

        # raise/catch so wrap_exception captures a real traceback
        try:
            raise why()
        except:
            content = error.wrap_exception()

        self.all_done.add(msg_id)
        self.all_failed.add(msg_id)

        msg = self.session.send(self.client_stream, 'apply_reply', content,
                                parent=job.header, ident=job.idents)
        self.session.send(self.mon_stream, msg, ident=[b'outtask']+job.idents)

        # dependent jobs may now be unreachable too
        self.update_graph(msg_id, success=False)
501
501
502 def available_engines(self):
503 """return a list of available engine indices based on HWM"""
504 if not self.hwm:
505 return range(len(self.targets))
506 available = []
507 for idx in range(len(self.targets)):
508 if self.loads[idx] < self.hwm:
509 available.append(idx)
510 return available
511
    def maybe_run(self, job):
        """Check location dependencies, and run `job` if they are met.

        Returns True when the job was submitted, False when it cannot run
        yet (or has just been failed as unreachable).
        """
        msg_id = job.msg_id
        self.log.debug("Attempting to assign task %s", msg_id)
        available = self.available_engines()
        if not available:
            # every engine is at HWM (or there are none): definitely can't run
            return False

        if job.follow or job.targets or job.blacklist or self.hwm:
            # we need a can_run filter
            def can_run(idx):
                # check hwm
                if self.hwm and self.loads[idx] == self.hwm:
                    return False
                target = self.targets[idx]
                # check blacklist
                if target in job.blacklist:
                    return False
                # check targets
                if job.targets and target not in job.targets:
                    return False
                # check follow (location dependencies on this engine's history)
                return job.follow.check(self.completed[target], self.failed[target])

            # NOTE(review): relies on filter() returning a list (Python 2);
            # on Python 3 `not indices` on a filter object is always False
            indices = filter(can_run, available)

            if not indices:
                # couldn't run right now; check whether it can *ever* run
                if job.follow.all:
                    # check follow for impossibility
                    dests = set()
                    relevant = set()
                    if job.follow.success:
                        relevant = self.all_completed
                    if job.follow.failure:
                        relevant = relevant.union(self.all_failed)
                    for m in job.follow.intersection(relevant):
                        dests.add(self.destinations[m])
                    if len(dests) > 1:
                        # follow deps already ran on more than one engine:
                        # an all-follow can never be satisfied on a single one
                        self.depending[msg_id] = job
                        self.fail_unreachable(msg_id)
                        return False
                if job.targets:
                    # check blacklist+targets for impossibility
                    job.targets.difference_update(job.blacklist)
                    if not job.targets or not job.targets.intersection(self.targets):
                        # every allowed target is blacklisted or gone
                        self.depending[msg_id] = job
                        self.fail_unreachable(msg_id)
                        return False
                return False
        else:
            # no constraints: let submit_task pick from the full load list
            indices = None

        self.submit_task(job, indices)
        return True
557
568
558 def save_unmet(self, job):
569 def save_unmet(self, job):
559 """Save a message for later submission when its dependencies are met."""
570 """Save a message for later submission when its dependencies are met."""
560 msg_id = job.msg_id
571 msg_id = job.msg_id
561 self.depending[msg_id] = job
572 self.depending[msg_id] = job
562 # track the ids in follow or after, but not those already finished
573 # track the ids in follow or after, but not those already finished
563 for dep_id in job.after.union(job.follow).difference(self.all_done):
574 for dep_id in job.after.union(job.follow).difference(self.all_done):
564 if dep_id not in self.graph:
575 if dep_id not in self.graph:
565 self.graph[dep_id] = set()
576 self.graph[dep_id] = set()
566 self.graph[dep_id].add(msg_id)
577 self.graph[dep_id].add(msg_id)
567
578
    def submit_task(self, job, indices=None):
        """Submit a task to any of a subset of our targets.

        `indices` restricts the choice to those engine indices; when None,
        the scheme chooses among all engines.
        """
        if indices:
            # restrict the load list to the allowed engines
            loads = [self.loads[i] for i in indices]
        else:
            loads = self.loads
        # scheme (e.g. LRU / least-load) picks an index into `loads`
        idx = self.scheme(loads)
        if indices:
            # map back from position-in-subset to real engine index
            idx = indices[idx]
        target = self.targets[idx]
        # send job to the engine: identity frame first, then the raw message
        self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
        self.engine_stream.send_multipart(job.raw_msg, copy=False)
        # update load
        self.add_job(idx)
        self.pending[target][job.msg_id] = job
        # notify Hub
        content = dict(msg_id=job.msg_id, engine_id=target.decode('ascii'))
        self.session.send(self.mon_stream, 'task_destination', content=content,
                        ident=[b'tracktask',self.ident])
589
600
590
601
591 #-----------------------------------------------------------------------
602 #-----------------------------------------------------------------------
592 # Result Handling
603 # Result Handling
593 #-----------------------------------------------------------------------
604 #-----------------------------------------------------------------------
594
605
595
606
596 @util.log_errors
607 @util.log_errors
597 def dispatch_result(self, raw_msg):
608 def dispatch_result(self, raw_msg):
598 """dispatch method for result replies"""
609 """dispatch method for result replies"""
599 try:
610 try:
600 idents,msg = self.session.feed_identities(raw_msg, copy=False)
611 idents,msg = self.session.feed_identities(raw_msg, copy=False)
601 msg = self.session.unserialize(msg, content=False, copy=False)
612 msg = self.session.unserialize(msg, content=False, copy=False)
602 engine = idents[0]
613 engine = idents[0]
603 try:
614 try:
604 idx = self.targets.index(engine)
615 idx = self.targets.index(engine)
605 except ValueError:
616 except ValueError:
606 pass # skip load-update for dead engines
617 pass # skip load-update for dead engines
607 else:
618 else:
608 self.finish_job(idx)
619 self.finish_job(idx)
609 except Exception:
620 except Exception:
610 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
621 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
611 return
622 return
612
623
613 md = msg['metadata']
624 md = msg['metadata']
614 parent = msg['parent_header']
625 parent = msg['parent_header']
615 if md.get('dependencies_met', True):
626 if md.get('dependencies_met', True):
616 success = (md['status'] == 'ok')
627 success = (md['status'] == 'ok')
617 msg_id = parent['msg_id']
628 msg_id = parent['msg_id']
618 retries = self.retries[msg_id]
629 retries = self.retries[msg_id]
619 if not success and retries > 0:
630 if not success and retries > 0:
620 # failed
631 # failed
621 self.retries[msg_id] = retries - 1
632 self.retries[msg_id] = retries - 1
622 self.handle_unmet_dependency(idents, parent)
633 self.handle_unmet_dependency(idents, parent)
623 else:
634 else:
624 del self.retries[msg_id]
635 del self.retries[msg_id]
625 # relay to client and update graph
636 # relay to client and update graph
626 self.handle_result(idents, parent, raw_msg, success)
637 self.handle_result(idents, parent, raw_msg, success)
627 # send to Hub monitor
638 # send to Hub monitor
628 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
639 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
629 else:
640 else:
630 self.handle_unmet_dependency(idents, parent)
641 self.handle_unmet_dependency(idents, parent)
631
642
    def handle_result(self, idents, parent, raw_msg, success=True):
        """Handle a real task result, either success or failure.

        Relays the raw reply to the client first, then records the outcome
        in the bookkeeping sets and rechecks the dependency graph.
        """
        # first, relay result to client
        engine = idents[0]
        client = idents[1]
        # swap_ids for ROUTER-ROUTER mirror
        raw_msg[:2] = [client,engine]
        self.client_stream.send_multipart(raw_msg, copy=False)
        # now, update our data structures
        msg_id = parent['msg_id']
        self.pending[engine].pop(msg_id)
        if success:
            self.completed[engine].add(msg_id)
            self.all_completed.add(msg_id)
        else:
            self.failed[engine].add(msg_id)
            self.all_failed.add(msg_id)
        self.all_done.add(msg_id)
        # remember where it ran, for `follow` location dependencies
        self.destinations[msg_id] = engine

        self.update_graph(msg_id, success)
654
665
    def handle_unmet_dependency(self, idents, parent):
        """Handle an engine's report that a task's dependencies were unmet.

        Blacklists the reporting engine for this job and tries to resubmit
        elsewhere; fails the job as unreachable when every allowed target
        has been blacklisted.
        """
        engine = idents[0]
        msg_id = parent['msg_id']

        job = self.pending[engine].pop(msg_id)
        job.blacklist.add(engine)

        if job.blacklist == job.targets:
            # every allowed target has rejected the job
            self.depending[msg_id] = job
            self.fail_unreachable(msg_id)
        elif not self.maybe_run(job):
            # resubmit failed
            if msg_id not in self.all_failed:
                # put it back in our dependency tree
                self.save_unmet(job)

        if self.hwm:
            try:
                idx = self.targets.index(engine)
            except ValueError:
                pass # skip load-update for dead engines
            else:
                if self.loads[idx] == self.hwm-1:
                    # this engine just dropped below HWM: recheck all waiters
                    self.update_graph(None)
680
691
681
682
    def update_graph(self, dep_id=None, success=True):
        """dep_id just finished. Update our dependency
        graph and submit any jobs that just became runnable.

        Called with dep_id=None to update entire graph for hwm, but without finishing a task.
        """
        # update any jobs that depended on the dependency
        jobs = self.graph.pop(dep_id, [])

        # recheck *all* jobs if
        # a) we have HWM and an engine just become no longer full
        # or b) dep_id was given as None
        if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
            jobs = self.depending.keys()

        # sorted() snapshots `jobs`, so fail_unreachable/pop below are safe;
        # oldest submissions are checked first
        for msg_id in sorted(jobs, key=lambda msg_id: self.depending[msg_id].timestamp):
            job = self.depending[msg_id]

            if job.after.unreachable(self.all_completed, self.all_failed)\
                    or job.follow.unreachable(self.all_completed, self.all_failed):
                self.fail_unreachable(msg_id)

            elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run
                if self.maybe_run(job):

                    self.depending.pop(msg_id)
                    for mid in job.dependents:
                        if mid in self.graph:
                            self.graph[mid].remove(msg_id)

                    # abort the loop if we just filled up all of our engines.
                    # avoids an O(N) operation in situation of full queue,
                    # where graph update is triggered as soon as an engine becomes
                    # non-full, and all tasks after the first are checked,
                    # even though they can't run.
                    if not self.available_engines():
                        return
721
737
722 #----------------------------------------------------------------------
738 #----------------------------------------------------------------------
723 # methods to be overridden by subclasses
739 # methods to be overridden by subclasses
724 #----------------------------------------------------------------------
740 #----------------------------------------------------------------------
725
741
726 def add_job(self, idx):
742 def add_job(self, idx):
727 """Called after self.targets[idx] just got the job with header.
743 """Called after self.targets[idx] just got the job with header.
728 Override with subclasses. The default ordering is simple LRU.
744 Override with subclasses. The default ordering is simple LRU.
729 The default loads are the number of outstanding jobs."""
745 The default loads are the number of outstanding jobs."""
730 self.loads[idx] += 1
746 self.loads[idx] += 1
731 for lis in (self.targets, self.loads):
747 for lis in (self.targets, self.loads):
732 lis.append(lis.pop(idx))
748 lis.append(lis.pop(idx))
733
749
734
750
735 def finish_job(self, idx):
751 def finish_job(self, idx):
736 """Called after self.targets[idx] just finished a job.
752 """Called after self.targets[idx] just finished a job.
737 Override with subclasses."""
753 Override with subclasses."""
738 self.loads[idx] -= 1
754 self.loads[idx] -= 1
739
755
740
756
741
757
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None,
                        logname='root', log_url=None, loglevel=logging.DEBUG,
                        identity=b'task', in_thread=False):
    """Create and start a TaskScheduler bound/connected to the given addresses.

    in_addr/out_addr are bound ROUTER endpoints for clients and engines;
    mon_addr (PUB), not_addr (SUB) and reg_addr (DEALER) are connected.
    When in_thread is True, reuses the parent's zmq Context and IOLoop and
    returns without blocking; otherwise runs the loop until interrupted.
    """
    ZMQStream = zmqstream.ZMQStream

    if config:
        # unwrap dict back into Config
        config = Config(config)

    if in_thread:
        # use instance() to get the same Context/Loop as our parent
        ctx = zmq.Context.instance()
        loop = ioloop.IOLoop.instance()
    else:
        # in a process, don't use instance()
        # for safety with multiprocessing
        ctx = zmq.Context()
        loop = ioloop.IOLoop()
    # client-facing ROUTER
    ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
    ins.setsockopt(zmq.IDENTITY, identity + b'_in')
    ins.bind(in_addr)

    # engine-facing ROUTER
    outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
    outs.setsockopt(zmq.IDENTITY, identity + b'_out')
    outs.bind(out_addr)
    # monitor publisher (to the Hub)
    mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
    mons.connect(mon_addr)
    # notification subscriber (engine (un)registration), subscribe to all
    nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
    nots.setsockopt(zmq.SUBSCRIBE, b'')
    nots.connect(not_addr)

    # registration/query channel
    querys = ZMQStream(ctx.socket(zmq.DEALER),loop)
    querys.connect(reg_addr)

    # setup logging.
    if in_thread:
        log = Application.instance().log
    else:
        if log_url:
            log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
        else:
            log = local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                            mon_stream=mons, notifier_stream=nots,
                            query_stream=querys,
                            loop=loop, log=log,
                            config=config)
    scheduler.start()
    if not in_thread:
        try:
            loop.start()
        except KeyboardInterrupt:
            scheduler.log.critical("Interrupted, exiting...")
797
813
General Comments 0
You need to be logged in to leave comments. Login now