Merge pull request #1369 from minrk/EngineError...
Min RK
r6098:0291d619 merge
@@ -1,726 +1,732 @@
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6
6
7 Authors:
7 Authors:
8
8
9 * Min RK
9 * Min RK
10 """
10 """
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2010-2011 The IPython Development Team
12 # Copyright (C) 2010-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #----------------------------------------------------------------------
18 #----------------------------------------------------------------------
19 # Imports
19 # Imports
20 #----------------------------------------------------------------------
20 #----------------------------------------------------------------------
21
21
22 from __future__ import print_function
22 from __future__ import print_function
23
23
24 import logging
24 import logging
25 import sys
25 import sys
26
26
27 from datetime import datetime, timedelta
27 from datetime import datetime, timedelta
28 from random import randint, random
28 from random import randint, random
29 from types import FunctionType
29 from types import FunctionType
30
30
31 try:
31 try:
32 import numpy
32 import numpy
33 except ImportError:
33 except ImportError:
34 numpy = None
34 numpy = None
35
35
36 import zmq
36 import zmq
37 from zmq.eventloop import ioloop, zmqstream
37 from zmq.eventloop import ioloop, zmqstream
38
38
39 # local imports
39 # local imports
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.config.application import Application
41 from IPython.config.application import Application
42 from IPython.config.loader import Config
42 from IPython.config.loader import Config
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44
44
45 from IPython.parallel import error
45 from IPython.parallel import error
46 from IPython.parallel.factory import SessionFactory
46 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.util import connect_logger, local_logger, asbytes
47 from IPython.parallel.util import connect_logger, local_logger, asbytes
48
48
49 from .dependency import Dependency
49 from .dependency import Dependency
50
50
51 @decorator
51 @decorator
52 def logged(f,self,*args,**kwargs):
52 def logged(f,self,*args,**kwargs):
53 # print ("#--------------------")
53 # print ("#--------------------")
54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
55 # print ("#--")
55 # print ("#--")
56 return f(self,*args, **kwargs)
56 return f(self,*args, **kwargs)
57
57
58 #----------------------------------------------------------------------
58 #----------------------------------------------------------------------
59 # Chooser functions
59 # Chooser functions
60 #----------------------------------------------------------------------
60 #----------------------------------------------------------------------
61
61
def plainrandom(loads):
    """Plain random pick."""
    n = len(loads)
    return randint(0,n-1)

def lru(loads):
    """Always pick the front of the line.

    The content of `loads` is ignored.

    Assumes LRU ordering of loads, with oldest first.
    """
    return 0

def twobin(loads):
    """Pick two at random, use the LRU of the two.

    The content of loads is ignored.

    Assumes LRU ordering of loads, with oldest first.
    """
    n = len(loads)
    a = randint(0,n-1)
    b = randint(0,n-1)
    return min(a,b)

def weighted(loads):
    """Pick two at random using inverse load as weight.

    Return the less loaded of the two.
    """
    # weight 0 a million times more than 1:
    weights = 1./(1e-6+numpy.array(loads))
    sums = weights.cumsum()
    t = sums[-1]
    x = random()*t
    y = random()*t
    idx = 0
    idy = 0
    while sums[idx] < x:
        idx += 1
    while sums[idy] < y:
        idy += 1
    if weights[idy] > weights[idx]:
        return idy
    else:
        return idx

def leastload(loads):
    """Always choose the lowest load.

    If the lowest load occurs more than once, the first
    occurrence will be used. If loads has LRU ordering, this means
    the LRU of those with the lowest load is chosen.
    """
    return loads.index(min(loads))

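# Illustrative sketch of the choosers (hypothetical loads; index 1 is idle):
#
#   loads = [2, 0, 1]
#   leastload(loads)    # -> 1: lowest load, ties broken toward the front
#   lru(loads)          # -> 0: always the front of the line
#   twobin(loads)       # -> min of two random indices, biased toward the front
#   weighted(loads)     # -> usually 1: inverse-load weighting (needs numpy)
#   plainrandom(loads)  # -> uniform over 0, 1, 2
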
#---------------------------------------------------------------------
# Classes
#---------------------------------------------------------------------
# store empty default dependency:
MET = Dependency([])

class TaskScheduler(SessionFactory):
    """Python TaskScheduler object.

    This is the simplest object that supports msg_id based
    DAG dependencies. *Only* task msg_ids are checked, not
    msg_ids of jobs submitted via the MUX queue.

    """

    hwm = Integer(1, config=True,
        help="""specify the High Water Mark (HWM) for the downstream
        socket in the Task scheduler. This is the maximum number
        of allowed outstanding tasks on each engine.

        The default (1) means that only one task can be outstanding on each
        engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
        engines continue to be assigned tasks while they are working,
        effectively hiding network latency behind computation, but can result
        in an imbalance of work when submitting many heterogeneous tasks all at
        once. Any positive value greater than one is a compromise between the
        two.

        """
    )
    scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
        'leastload', config=True, allow_none=False,
        help="""select the task scheduler scheme [default: leastload]
        Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin', 'leastload'"""
    )
    def _scheme_name_changed(self, old, new):
        self.log.debug("Using scheme %r"%new)
        self.scheme = globals()[new]
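
    # Illustrative configuration sketch (hypothetical values): both traits
    # are configurable, e.g. from a profile's ipcontroller_config.py:
    #
    #   c.TaskScheduler.hwm = 0                 # no limit on outstanding tasks
    #   c.TaskScheduler.scheme_name = 'twobin'  # randomized two-bin chooser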

    # input arguments:
    scheme = Instance(FunctionType) # function for determining the destination
    def _scheme_default(self):
        return leastload
    client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
    engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
    notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
    mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream

    # internals:
    graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
    retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
    # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
    depending = Dict() # dict by msg_id of (raw_msg, targets, after, follow, timeout)
    pending = Dict() # dict by engine_uuid of submitted tasks
    completed = Dict() # dict by engine_uuid of completed tasks
    failed = Dict() # dict by engine_uuid of failed tasks
    destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
    clients = Dict() # dict by msg_id for who submitted the task
    targets = List() # list of target IDENTs
    loads = List() # list of engine loads
    # full = Set() # set of IDENTs that have HWM outstanding tasks
    all_completed = Set() # set of all completed tasks
    all_failed = Set() # set of all failed tasks
    all_done = Set() # set of all finished tasks=union(completed,failed)
    all_ids = Set() # set of all submitted task IDs
    blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
    auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')

    ident = CBytes() # ZMQ identity. This should just be self.session.session
                     # but ensure Bytes
    def _ident_default(self):
        return self.session.bsession
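
    # Illustrative shapes of the bookkeeping above (hypothetical values):
    #
    #   pending   = {b'engine-uuid': {'task-1': (raw_msg, targets, MET, follow, timeout)}}
    #   depending = {'task-3': [raw_msg, targets, after, follow, timeout]}
    #   graph     = {'task-1': set(['task-3'])}   # task-3 waits on task-1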

    def start(self):
        self.engine_stream.on_recv(self.dispatch_result, copy=False)
        self._notification_handlers = dict(
            registration_notification = self._register_engine,
            unregistration_notification = self._unregister_engine
        )
        self.notifier_stream.on_recv(self.dispatch_notification)
        self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2s
        self.auditor.start()
        self.log.info("Scheduler started [%s]"%self.scheme_name)

    def resume_receiving(self):
        """Resume accepting jobs."""
        self.client_stream.on_recv(self.dispatch_submission, copy=False)

    def stop_receiving(self):
        """Stop accepting jobs while there are no engines.
        Leave them in the ZMQ queue."""
        self.client_stream.on_recv(None)

    #-----------------------------------------------------------------------
    # [Un]Registration Handling
    #-----------------------------------------------------------------------

    def dispatch_notification(self, msg):
        """dispatch register/unregister events."""
        try:
            idents,msg = self.session.feed_identities(msg)
        except ValueError:
            self.log.warn("task::Invalid Message: %r",msg)
            return
        try:
            msg = self.session.unserialize(msg)
        except ValueError:
            self.log.warn("task::Unauthorized message from: %r"%idents)
            return

        msg_type = msg['header']['msg_type']

        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            self.log.error("Unhandled message type: %r"%msg_type)
        else:
            try:
                handler(asbytes(msg['content']['queue']))
            except Exception:
                self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)

    def _register_engine(self, uid):
        """New engine with ident `uid` became available."""
        # head of the line:
        self.targets.insert(0,uid)
        self.loads.insert(0,0)

        # initialize sets
        self.completed[uid] = set()
        self.failed[uid] = set()
        self.pending[uid] = {}
        if len(self.targets) == 1:
            self.resume_receiving()
        # rescan the graph:
        self.update_graph(None)

    def _unregister_engine(self, uid):
        """Existing engine with ident `uid` became unavailable."""
        if len(self.targets) == 1:
            # this was our only engine
            self.stop_receiving()

        # handle any potentially finished tasks:
        self.engine_stream.flush()

        # don't pop destinations, because they might be used later
        # map(self.destinations.pop, self.completed.pop(uid))
        # map(self.destinations.pop, self.failed.pop(uid))

        # prevent this engine from receiving work
        idx = self.targets.index(uid)
        self.targets.pop(idx)
        self.loads.pop(idx)

        # wait 5 seconds before cleaning up pending jobs, since the results might
        # still be incoming
        if self.pending[uid]:
            dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
            dc.start()
        else:
            self.completed.pop(uid)
            self.failed.pop(uid)


    def handle_stranded_tasks(self, engine):
        """Deal with jobs resident in an engine that died."""
        lost = self.pending[engine]
        for msg_id in lost.keys():
            if msg_id not in self.pending[engine]:
                # prevent double-handling of messages
                continue

            raw_msg = lost[msg_id][0]
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
            parent = self.session.unpack(msg[1].bytes)
            idents = [engine, idents[0]]

            # build fake error reply
            try:
                raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
            except:
                content = error.wrap_exception()
-            msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
+            # build fake header
+            header = dict(
+                status='error',
+                engine=engine,
+                date=datetime.now(),
+            )
+            msg = self.session.msg('apply_reply', content, parent=parent, subheader=header)
            raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
            # and dispatch it
            self.dispatch_result(raw_reply)

        # finally scrub completed/failed lists
        self.completed.pop(engine)
        self.failed.pop(engine)

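    # Illustrative shape of the faked reply subheader built above
    # (hypothetical values):
    #
    #   {'status': 'error', 'engine': b'engine-uuid', 'date': datetime(...)}
    #
    # presumably carrying the engine ident and date so clients and the Hub
    # can see which engine died and when, not just a bare error status.
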
    #-----------------------------------------------------------------------
    # Job Submission
    #-----------------------------------------------------------------------
    def dispatch_submission(self, raw_msg):
        """Dispatch job submission to appropriate handlers."""
        # ensure targets up to date:
        self.notifier_stream.flush()
        try:
            idents, msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unserialize(msg, content=False, copy=False)
        except Exception:
            self.log.error("task::Invalid task msg: %r"%raw_msg, exc_info=True)
            return


        # send to monitor
        self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)

        header = msg['header']
        msg_id = header['msg_id']
        self.all_ids.add(msg_id)

        # get targets as a set of bytes objects
        # from a list of unicode objects
        targets = header.get('targets', [])
        targets = map(asbytes, targets)
        targets = set(targets)

        retries = header.get('retries', 0)
        self.retries[msg_id] = retries

        # time dependencies
        after = header.get('after', None)
        if after:
            after = Dependency(after)
            if after.all:
                if after.success:
                    after = Dependency(after.difference(self.all_completed),
                                success=after.success,
                                failure=after.failure,
                                all=after.all,
                    )
                if after.failure:
                    after = Dependency(after.difference(self.all_failed),
                                success=after.success,
                                failure=after.failure,
                                all=after.all,
                    )
            if after.check(self.all_completed, self.all_failed):
                # recast as empty set, if `after` already met,
                # to prevent unnecessary set comparisons
                after = MET
        else:
            after = MET

        # location dependencies
        follow = Dependency(header.get('follow', []))

        # turn timeouts into datetime objects:
        timeout = header.get('timeout', None)
        if timeout:
            # cast to float, because jsonlib returns floats as decimal.Decimal,
            # which timedelta does not accept
            timeout = datetime.now() + timedelta(0,float(timeout),0)

        args = [raw_msg, targets, after, follow, timeout]

        # validate and reduce dependencies:
        for dep in after,follow:
            if not dep: # empty dependency
                continue
            # check valid:
            if msg_id in dep or dep.difference(self.all_ids):
                self.depending[msg_id] = args
                return self.fail_unreachable(msg_id, error.InvalidDependency)
            # check if unreachable:
            if dep.unreachable(self.all_completed, self.all_failed):
                self.depending[msg_id] = args
                return self.fail_unreachable(msg_id)

        if after.check(self.all_completed, self.all_failed):
            # time deps already met, try to run
            if not self.maybe_run(msg_id, *args):
                # can't run yet
                if msg_id not in self.all_failed:
                    # could have failed as unreachable
                    self.save_unmet(msg_id, *args)
        else:
            self.save_unmet(msg_id, *args)

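    # Illustrative task header consumed by dispatch_submission (hypothetical
    # values; the real header is built by the client's Session):
    #
    #   {'msg_id': 'task-7', 'targets': ['engine-uuid'], 'retries': 1,
    #    'after': ['task-5'], 'follow': [], 'timeout': 30.0}
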
    def audit_timeouts(self):
        """Audit all waiting tasks for expired timeouts."""
        now = datetime.now()
        for msg_id in self.depending.keys():
            # must recheck, in case one failure cascaded to another:
            if msg_id in self.depending:
                raw,targets,after,follow,timeout = self.depending[msg_id]
                if timeout and timeout < now:
                    self.fail_unreachable(msg_id, error.TaskTimeout)

    def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
        """A task has become unreachable; send a reply with an
        ImpossibleDependency error."""
        if msg_id not in self.depending:
            self.log.error("msg %r already failed!", msg_id)
            return
        raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
        for mid in follow.union(after):
            if mid in self.graph:
                self.graph[mid].remove(msg_id)

        # FIXME: unpacking a message I've already unpacked, but didn't save:
        idents,msg = self.session.feed_identities(raw_msg, copy=False)
        header = self.session.unpack(msg[1].bytes)

        try:
            raise why()
        except:
            content = error.wrap_exception()

        self.all_done.add(msg_id)
        self.all_failed.add(msg_id)

        msg = self.session.send(self.client_stream, 'apply_reply', content,
                                parent=header, ident=idents)
        self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents)

        self.update_graph(msg_id, success=False)

    def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
        """check location dependencies, and run if they are met."""
        blacklist = self.blacklist.setdefault(msg_id, set())
        if follow or targets or blacklist or self.hwm:
            # we need a can_run filter
            def can_run(idx):
                # check hwm
                if self.hwm and self.loads[idx] == self.hwm:
                    return False
                target = self.targets[idx]
                # check blacklist
                if target in blacklist:
                    return False
                # check targets
                if targets and target not in targets:
                    return False
                # check follow
                return follow.check(self.completed[target], self.failed[target])

            indices = filter(can_run, range(len(self.targets)))

            if not indices:
                # couldn't run
                if follow.all:
                    # check follow for impossibility
                    dests = set()
                    relevant = set()
                    if follow.success:
                        relevant = self.all_completed
                    if follow.failure:
                        relevant = relevant.union(self.all_failed)
                    for m in follow.intersection(relevant):
                        dests.add(self.destinations[m])
                    if len(dests) > 1:
                        self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
                        self.fail_unreachable(msg_id)
                        return False
                if targets:
                    # check blacklist+targets for impossibility
                    targets.difference_update(blacklist)
                    if not targets or not targets.intersection(self.targets):
                        self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
                        self.fail_unreachable(msg_id)
                        return False
                return False
        else:
            indices = None

        self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
        return True

    def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
        """Save a message for later submission when its dependencies are met."""
        self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
        # track the ids in follow or after, but not those already finished
        for dep_id in after.union(follow).difference(self.all_done):
            if dep_id not in self.graph:
                self.graph[dep_id] = set()
            self.graph[dep_id].add(msg_id)

    def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
        """Submit a task to any of a subset of our targets."""
        if indices:
            loads = [self.loads[i] for i in indices]
        else:
            loads = self.loads
        idx = self.scheme(loads)
        if indices:
            idx = indices[idx]
        target = self.targets[idx]
        # print (target, map(str, msg[:3]))
        # send job to the engine
        self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
        self.engine_stream.send_multipart(raw_msg, copy=False)
        # update load
        self.add_job(idx)
        self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
        # notify Hub
        content = dict(msg_id=msg_id, engine_id=target.decode('ascii'))
        self.session.send(self.mon_stream, 'task_destination', content=content,
                        ident=[b'tracktask',self.ident])


    #-----------------------------------------------------------------------
    # Result Handling
    #-----------------------------------------------------------------------
    def dispatch_result(self, raw_msg):
        """dispatch method for result replies"""
        try:
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unserialize(msg, content=False, copy=False)
            engine = idents[0]
            try:
                idx = self.targets.index(engine)
            except ValueError:
                pass # skip load-update for dead engines
            else:
                self.finish_job(idx)
        except Exception:
            self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
            return

        header = msg['header']
        parent = msg['parent_header']
        if header.get('dependencies_met', True):
            success = (header['status'] == 'ok')
            msg_id = parent['msg_id']
            retries = self.retries[msg_id]
            if not success and retries > 0:
                # failed
                self.retries[msg_id] = retries - 1
                self.handle_unmet_dependency(idents, parent)
            else:
                del self.retries[msg_id]
                # relay to client and update graph
                self.handle_result(idents, parent, raw_msg, success)
                # send to Hub monitor
                self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
        else:
            self.handle_unmet_dependency(idents, parent)

    def handle_result(self, idents, parent, raw_msg, success=True):
        """handle a real task result, either success or failure"""
        # first, relay result to client
        engine = idents[0]
        client = idents[1]
        # swap_ids for XREP-XREP mirror
        raw_msg[:2] = [client,engine]
        # print (map(str, raw_msg[:4]))
        self.client_stream.send_multipart(raw_msg, copy=False)
        # now, update our data structures
        msg_id = parent['msg_id']
        self.blacklist.pop(msg_id, None)
        self.pending[engine].pop(msg_id)
        if success:
            self.completed[engine].add(msg_id)
            self.all_completed.add(msg_id)
        else:
            self.failed[engine].add(msg_id)
            self.all_failed.add(msg_id)
        self.all_done.add(msg_id)
        self.destinations[msg_id] = engine

        self.update_graph(msg_id, success)

    def handle_unmet_dependency(self, idents, parent):
        """handle an unmet dependency"""
        engine = idents[0]
        msg_id = parent['msg_id']

        if msg_id not in self.blacklist:
            self.blacklist[msg_id] = set()
        self.blacklist[msg_id].add(engine)

        args = self.pending[engine].pop(msg_id)
        raw,targets,after,follow,timeout = args

        if self.blacklist[msg_id] == targets:
            self.depending[msg_id] = args
            self.fail_unreachable(msg_id)
        elif not self.maybe_run(msg_id, *args):
            # resubmit failed
            if msg_id not in self.all_failed:
                # put it back in our dependency tree
                self.save_unmet(msg_id, *args)

        if self.hwm:
            try:
                idx = self.targets.index(engine)
            except ValueError:
                pass # skip load-update for dead engines
            else:
                if self.loads[idx] == self.hwm-1:
                    self.update_graph(None)


    def update_graph(self, dep_id=None, success=True):
        """dep_id just finished. Update our dependency
        graph and submit any jobs that just became runnable.

        Called with dep_id=None to update entire graph for hwm, but without finishing
        a task.
        """
        # print ("\n\n***********")
        # pprint (dep_id)
        # pprint (self.graph)
        # pprint (self.depending)
        # pprint (self.all_completed)
        # pprint (self.all_failed)
        # print ("\n\n***********\n\n")
        # update any jobs that depended on the dependency
        jobs = self.graph.pop(dep_id, [])

        # recheck *all* jobs if
        # a) we have HWM and an engine just became no longer full
        # or b) dep_id was given as None
        if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
            jobs = self.depending.keys()

        for msg_id in jobs:
            raw_msg, targets, after, follow, timeout = self.depending[msg_id]

            if after.unreachable(self.all_completed, self.all_failed)\
                    or follow.unreachable(self.all_completed, self.all_failed):
                self.fail_unreachable(msg_id)

            elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
                if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):

                    self.depending.pop(msg_id)
                    for mid in follow.union(after):
                        if mid in self.graph:
                            self.graph[mid].remove(msg_id)

    #----------------------------------------------------------------------
    # methods to be overridden by subclasses
    #----------------------------------------------------------------------

    def add_job(self, idx):
        """Called after self.targets[idx] just got the job with header.
        Override in subclasses. The default ordering is simple LRU.
        The default loads are the number of outstanding jobs."""
        self.loads[idx] += 1
        for lis in (self.targets, self.loads):
            lis.append(lis.pop(idx))
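
    # Illustrative rotation (hypothetical values): giving the job to index 0
    # moves that engine to the back of the line, keeping LRU order in front:
    #
    #   targets == [b'a', b'b', b'c'], loads == [0, 1, 0]
    #   add_job(0)  =>  targets == [b'b', b'c', b'a'], loads == [1, 0, 1]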


    def finish_job(self, idx):
        """Called after self.targets[idx] just finished a job.
        Override in subclasses."""
        self.loads[idx] -= 1


def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
                        logname='root', log_url=None, loglevel=logging.DEBUG,
                        identity=b'task', in_thread=False):

    ZMQStream = zmqstream.ZMQStream

    if config:
        # unwrap dict back into Config
        config = Config(config)

    if in_thread:
        # use instance() to get the same Context/Loop as our parent
        ctx = zmq.Context.instance()
        loop = ioloop.IOLoop.instance()
    else:
        # in a process, don't use instance()
        # for safety with multiprocessing
        ctx = zmq.Context()
        loop = ioloop.IOLoop()
    ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
    ins.setsockopt(zmq.IDENTITY, identity)
    ins.bind(in_addr)

    outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
    outs.setsockopt(zmq.IDENTITY, identity)
    outs.bind(out_addr)
    mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
    mons.connect(mon_addr)
    nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
    nots.setsockopt(zmq.SUBSCRIBE, b'')
    nots.connect(not_addr)

    # setup logging.
    if in_thread:
        log = Application.instance().log
    else:
        if log_url:
            log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
        else:
            log = local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                            mon_stream=mons, notifier_stream=nots,
                            loop=loop, log=log,
                            config=config)
    scheduler.start()
    if not in_thread:
        try:
            loop.start()
        except KeyboardInterrupt:
            scheduler.log.critical("Interrupted, exiting...")
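
# Illustrative invocation (hypothetical addresses; normally the controller
# supplies these from its connection info):
#
#   launch_scheduler('tcp://127.0.0.1:5555',  # client-facing ROUTER
#                    'tcp://127.0.0.1:5556',  # engine-facing ROUTER
#                    'tcp://127.0.0.1:5557',  # monitor PUB
#                    'tcp://127.0.0.1:5558',  # notification SUB
#                    identity=b'task')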