##// END OF EJS Templates
add timeout for unmet dependencies in task scheduler
MinRK -
Show More
@@ -36,7 +36,7 b' class AsyncResult(object):'
36 self._fname=fname
36 self._fname=fname
37 self._ready = False
37 self._ready = False
38 self._success = None
38 self._success = None
39 self._flatten_result = len(msg_ids) == 1
39 self._single_result = len(msg_ids) == 1
40
40
41 def __repr__(self):
41 def __repr__(self):
42 if self._ready:
42 if self._ready:
@@ -50,7 +50,7 b' class AsyncResult(object):'
50 Override me in subclasses for turning a list of results
50 Override me in subclasses for turning a list of results
51 into the expected form.
51 into the expected form.
52 """
52 """
53 if self._flatten_result:
53 if self._single_result:
54 return res[0]
54 return res[0]
55 else:
55 else:
56 return res
56 return res
@@ -90,7 +90,12 b' class AsyncResult(object):'
90 try:
90 try:
91 results = map(self._client.results.get, self.msg_ids)
91 results = map(self._client.results.get, self.msg_ids)
92 self._result = results
92 self._result = results
93 results = error.collect_exceptions(results, self._fname)
93 if self._single_result:
94 r = results[0]
95 if isinstance(r, Exception):
96 raise r
97 else:
98 results = error.collect_exceptions(results, self._fname)
94 self._result = self._reconstruct_result(results)
99 self._result = self._reconstruct_result(results)
95 except Exception, e:
100 except Exception, e:
96 self._exception = e
101 self._exception = e
@@ -138,7 +143,7 b' class AsyncResult(object):'
138 @check_ready
143 @check_ready
139 def metadata(self):
144 def metadata(self):
140 """metadata property."""
145 """metadata property."""
141 if self._flatten_result:
146 if self._single_result:
142 return self._metadata[0]
147 return self._metadata[0]
143 else:
148 else:
144 return self._metadata
149 return self._metadata
@@ -165,7 +170,7 b' class AsyncResult(object):'
165 return error.collect_exceptions(self._result[key], self._fname)
170 return error.collect_exceptions(self._result[key], self._fname)
166 elif isinstance(key, basestring):
171 elif isinstance(key, basestring):
167 values = [ md[key] for md in self._metadata ]
172 values = [ md[key] for md in self._metadata ]
168 if self._flatten_result:
173 if self._single_result:
169 return values[0]
174 return values[0]
170 else:
175 else:
171 return values
176 return values
@@ -190,7 +195,7 b' class AsyncMapResult(AsyncResult):'
190 def __init__(self, client, msg_ids, mapObject, fname=''):
195 def __init__(self, client, msg_ids, mapObject, fname=''):
191 AsyncResult.__init__(self, client, msg_ids, fname=fname)
196 AsyncResult.__init__(self, client, msg_ids, fname=fname)
192 self._mapObject = mapObject
197 self._mapObject = mapObject
193 self._flatten_result = False
198 self._single_result = False
194
199
195 def _reconstruct_result(self, res):
200 def _reconstruct_result(self, res):
196 """Perform the gather on the actual results."""
201 """Perform the gather on the actual results."""
@@ -765,9 +765,26 b' class Client(object):'
765 raise result
765 raise result
766
766
767 return result
767 return result
768
768
769 def _build_dependency(self, dep):
770 """helper for building jsonable dependencies from various input forms"""
771 if isinstance(dep, Dependency):
772 return dep.as_dict()
773 elif isinstance(dep, AsyncResult):
774 return dep.msg_ids
775 elif dep is None:
776 return []
777 elif isinstance(dep, set):
778 return list(dep)
779 elif isinstance(dep, (list,dict)):
780 return dep
781 elif isinstance(dep, str):
782 return [dep]
783 else:
784 raise TypeError("Dependency may be: set,list,dict,Dependency or AsyncResult, not %r"%type(dep))
785
769 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
786 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
770 after=None, follow=None):
787 after=None, follow=None, timeout=None):
771 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
788 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
772
789
773 This is the central execution command for the client.
790 This is the central execution command for the client.
@@ -817,6 +834,10 b' class Client(object):'
817 This job will only be run on an engine where this dependency
834 This job will only be run on an engine where this dependency
818 is met.
835 is met.
819
836
837 timeout : float or None
838 Only for load-balanced execution (targets=None)
839 Specify an amount of time (in seconds) to wait for unmet dependencies; if the dependencies are still unmet after this time, the task fails with a DependencyTimeout.
840
820 Returns
841 Returns
821 -------
842 -------
822 if block is False:
843 if block is False:
@@ -844,33 +865,23 b' class Client(object):'
844 raise TypeError("args must be tuple or list, not %s"%type(args))
865 raise TypeError("args must be tuple or list, not %s"%type(args))
845 if not isinstance(kwargs, dict):
866 if not isinstance(kwargs, dict):
846 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
867 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
847
868
848 if isinstance(after, Dependency):
869 after = self._build_dependency(after)
849 after = after.as_dict()
870 follow = self._build_dependency(follow)
850 elif isinstance(after, AsyncResult):
871
851 after=after.msg_ids
872 options = dict(bound=bound, block=block)
852 elif after is None:
853 after = []
854 if isinstance(follow, Dependency):
855 # if len(follow) > 1 and follow.mode == 'all':
856 # warn("complex follow-dependencies are not rigorously tested for reachability", UserWarning)
857 follow = follow.as_dict()
858 elif isinstance(follow, AsyncResult):
859 follow=follow.msg_ids
860 elif follow is None:
861 follow = []
862 options = dict(bound=bound, block=block, after=after, follow=follow)
863
873
864 if targets is None:
874 if targets is None:
865 return self._apply_balanced(f, args, kwargs, **options)
875 return self._apply_balanced(f, args, kwargs, timeout=timeout,
876 after=after, follow=follow, **options)
866 else:
877 else:
867 return self._apply_direct(f, args, kwargs, targets=targets, **options)
878 return self._apply_direct(f, args, kwargs, targets=targets, **options)
868
879
869 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
880 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
870 after=None, follow=None):
881 after=None, follow=None, timeout=None):
871 """The underlying method for applying functions in a load balanced
882 """The underlying method for applying functions in a load balanced
872 manner, via the task queue."""
883 manner, via the task queue."""
873 subheader = dict(after=after, follow=follow)
884 subheader = dict(after=after, follow=follow, timeout=timeout)
874 bufs = ss.pack_apply_message(f,args,kwargs)
885 bufs = ss.pack_apply_message(f,args,kwargs)
875 content = dict(bound=bound)
886 content = dict(bound=bound)
876
887
@@ -885,8 +896,7 b' class Client(object):'
885 else:
896 else:
886 return ar
897 return ar
887
898
888 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
899 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
889 after=None, follow=None):
890 """The underlying method for applying functions to specific engines
900 """The underlying method for applying functions to specific engines
891 via the MUX queue."""
901 via the MUX queue."""
892
902
@@ -100,9 +100,9 b' class ControllerFactory(HubFactory):'
100 self.log.warn("task::using no Task scheduler")
100 self.log.warn("task::using no Task scheduler")
101
101
102 else:
102 else:
103 self.log.warn("task::using Python %s Task scheduler"%self.scheme)
103 self.log.info("task::using Python %s Task scheduler"%self.scheme)
104 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
104 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
105 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme))
105 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level))
106 q.daemon=True
106 q.daemon=True
107 children.append(q)
107 children.append(q)
108
108
@@ -55,7 +55,7 b' def require(*names):'
55 return depend(_require, *names)
55 return depend(_require, *names)
56
56
57 class Dependency(set):
57 class Dependency(set):
58 """An object for representing a set of dependencies.
58 """An object for representing a set of msg_id dependencies.
59
59
60 Subclassed from set()."""
60 Subclassed from set()."""
61
61
@@ -154,6 +154,9 b' class UnmetDependency(KernelError):'
154 class ImpossibleDependency(UnmetDependency):
154 class ImpossibleDependency(UnmetDependency):
155 pass
155 pass
156
156
157 class DependencyTimeout(UnmetDependency):
158 pass
159
157 class RemoteError(KernelError):
160 class RemoteError(KernelError):
158 """Error raised elsewhere"""
161 """Error raised elsewhere"""
159 ename=None
162 ename=None
@@ -12,9 +12,9 b' Python Scheduler exists.'
12 from __future__ import print_function
12 from __future__ import print_function
13 import sys
13 import sys
14 import logging
14 import logging
15 from random import randint,random
15 from random import randint, random
16 from types import FunctionType
16 from types import FunctionType
17
17 from datetime import datetime, timedelta
18 try:
18 try:
19 import numpy
19 import numpy
20 except ImportError:
20 except ImportError:
@@ -29,11 +29,11 b' from IPython.external.decorator import decorator'
29 from IPython.utils.traitlets import Instance, Dict, List, Set
29 from IPython.utils.traitlets import Instance, Dict, List, Set
30
30
31 import error
31 import error
32 from client import Client
32 # from client import Client
33 from dependency import Dependency
33 from dependency import Dependency
34 import streamsession as ss
34 import streamsession as ss
35 from entry_point import connect_logger, local_logger
35 from entry_point import connect_logger, local_logger
36 from factory import LoggingFactory
36 from factory import SessionFactory
37
37
38
38
39 @decorator
39 @decorator
@@ -110,7 +110,7 b' def leastload(loads):'
110 # store empty default dependency:
110 # store empty default dependency:
111 MET = Dependency([])
111 MET = Dependency([])
112
112
113 class TaskScheduler(LoggingFactory):
113 class TaskScheduler(SessionFactory):
114 """Python TaskScheduler object.
114 """Python TaskScheduler object.
115
115
116 This is the simplest object that supports msg_id based
116 This is the simplest object that supports msg_id based
@@ -125,7 +125,6 b' class TaskScheduler(LoggingFactory):'
125 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
125 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
126 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
126 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
127 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
127 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
128 io_loop = Instance(ioloop.IOLoop)
129
128
130 # internals:
129 # internals:
131 dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
130 dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
@@ -141,20 +140,18 b' class TaskScheduler(LoggingFactory):'
141 all_failed = Set() # set of all failed tasks
140 all_failed = Set() # set of all failed tasks
142 all_done = Set() # set of all finished tasks=union(completed,failed)
141 all_done = Set() # set of all finished tasks=union(completed,failed)
143 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
142 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
144 session = Instance(ss.StreamSession)
143 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
145
144
146
145
147 def __init__(self, **kwargs):
146 def start(self):
148 super(TaskScheduler, self).__init__(**kwargs)
149
150 self.session = ss.StreamSession(username="TaskScheduler")
151
152 self.engine_stream.on_recv(self.dispatch_result, copy=False)
147 self.engine_stream.on_recv(self.dispatch_result, copy=False)
153 self._notification_handlers = dict(
148 self._notification_handlers = dict(
154 registration_notification = self._register_engine,
149 registration_notification = self._register_engine,
155 unregistration_notification = self._unregister_engine
150 unregistration_notification = self._unregister_engine
156 )
151 )
157 self.notifier_stream.on_recv(self.dispatch_notification)
152 self.notifier_stream.on_recv(self.dispatch_notification)
153 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 1e3, self.loop) # 1 Hz
154 self.auditor.start()
158 self.log.info("Scheduler started...%r"%self)
155 self.log.info("Scheduler started...%r"%self)
159
156
160 def resume_receiving(self):
157 def resume_receiving(self):
@@ -261,37 +258,55 b' class TaskScheduler(LoggingFactory):'
261
258
262 # location dependencies
259 # location dependencies
263 follow = Dependency(header.get('follow', []))
260 follow = Dependency(header.get('follow', []))
264
265 # check if unreachable:
261 # check if unreachable:
266 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
262 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
267 self.depending[msg_id] = [raw_msg,MET,MET]
263 self.depending[msg_id] = [raw_msg,MET,MET,None]
268 return self.fail_unreachable(msg_id)
264 return self.fail_unreachable(msg_id)
269
265
266 # turn timeouts into datetime objects:
267 timeout = header.get('timeout', None)
268 if timeout:
269 timeout = datetime.now() + timedelta(0,timeout,0)
270
270 if after.check(self.all_completed, self.all_failed):
271 if after.check(self.all_completed, self.all_failed):
271 # time deps already met, try to run
272 # time deps already met, try to run
272 if not self.maybe_run(msg_id, raw_msg, follow):
273 if not self.maybe_run(msg_id, raw_msg, follow):
273 # can't run yet
274 # can't run yet
274 self.save_unmet(msg_id, raw_msg, after, follow)
275 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
275 else:
276 else:
276 self.save_unmet(msg_id, raw_msg, after, follow)
277 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
277
278
278 @logged
279 @logged
279 def fail_unreachable(self, msg_id):
280 def audit_timeouts(self):
281 """Audit all waiting tasks for expired timeouts."""
282 now = datetime.now()
283 for msg_id in self.depending.keys():
284 # must recheck, in case one failure cascaded to another:
285 if msg_id in self.depending:
286 raw,after,follow,timeout = self.depending[msg_id]
287 if timeout and timeout < now:
288 self.fail_unreachable(msg_id, timeout=True)
289
290 @logged
291 def fail_unreachable(self, msg_id, timeout=False):
280 """a message has become unreachable"""
292 """a message has become unreachable"""
281 if msg_id not in self.depending:
293 if msg_id not in self.depending:
282 self.log.error("msg %r already failed!"%msg_id)
294 self.log.error("msg %r already failed!"%msg_id)
283 return
295 return
284 raw_msg, after, follow = self.depending.pop(msg_id)
296 raw_msg, after, follow, timeout = self.depending.pop(msg_id)
285 for mid in follow.union(after):
297 for mid in follow.union(after):
286 if mid in self.dependencies:
298 if mid in self.dependencies:
287 self.dependencies[mid].remove(msg_id)
299 self.dependencies[mid].remove(msg_id)
288
300
301 # FIXME: unpacking a message I've already unpacked, but didn't save:
289 idents,msg = self.session.feed_identities(raw_msg, copy=False)
302 idents,msg = self.session.feed_identities(raw_msg, copy=False)
290 msg = self.session.unpack_message(msg, copy=False, content=False)
303 msg = self.session.unpack_message(msg, copy=False, content=False)
291 header = msg['header']
304 header = msg['header']
292
305
306 impossible = error.DependencyTimeout if timeout else error.ImpossibleDependency
307
293 try:
308 try:
294 raise error.ImpossibleDependency()
309 raise impossible()
295 except:
310 except:
296 content = ss.wrap_exception()
311 content = ss.wrap_exception()
297
312
@@ -334,9 +349,9 b' class TaskScheduler(LoggingFactory):'
334 return True
349 return True
335
350
336 @logged
351 @logged
337 def save_unmet(self, msg_id, raw_msg, after, follow):
352 def save_unmet(self, msg_id, raw_msg, after, follow, timeout):
338 """Save a message for later submission when its dependencies are met."""
353 """Save a message for later submission when its dependencies are met."""
339 self.depending[msg_id] = [raw_msg,after,follow]
354 self.depending[msg_id] = [raw_msg,after,follow,timeout]
340 # track the ids in follow or after, but not those already finished
355 # track the ids in follow or after, but not those already finished
341 for dep_id in after.union(follow).difference(self.all_done):
356 for dep_id in after.union(follow).difference(self.all_done):
342 if dep_id not in self.dependencies:
357 if dep_id not in self.dependencies:
@@ -413,10 +428,10 b' class TaskScheduler(LoggingFactory):'
413 if msg_id not in self.blacklist:
428 if msg_id not in self.blacklist:
414 self.blacklist[msg_id] = set()
429 self.blacklist[msg_id] = set()
415 self.blacklist[msg_id].add(engine)
430 self.blacklist[msg_id].add(engine)
416 raw_msg,follow = self.pending[engine].pop(msg_id)
431 raw_msg,follow,timeout = self.pending[engine].pop(msg_id)
417 if not self.maybe_run(msg_id, raw_msg, follow):
432 if not self.maybe_run(msg_id, raw_msg, follow):
418 # resubmit failed, put it back in our dependency tree
433 # resubmit failed, put it back in our dependency tree
419 self.save_unmet(msg_id, raw_msg, MET, follow)
434 self.save_unmet(msg_id, raw_msg, MET, follow, timeout)
420 pass
435 pass
421
436
422 @logged
437 @logged
@@ -435,7 +450,7 b' class TaskScheduler(LoggingFactory):'
435 jobs = self.dependencies.pop(dep_id)
450 jobs = self.dependencies.pop(dep_id)
436
451
437 for msg_id in jobs:
452 for msg_id in jobs:
438 raw_msg, after, follow = self.depending[msg_id]
453 raw_msg, after, follow, timeout = self.depending[msg_id]
439 # if dep_id in after:
454 # if dep_id in after:
440 # if after.mode == 'all' and (success or not after.success_only):
455 # if after.mode == 'all' and (success or not after.success_only):
441 # after.remove(dep_id)
456 # after.remove(dep_id)
@@ -497,9 +512,9 b" def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_a"
497 local_logger(logname, loglevel)
512 local_logger(logname, loglevel)
498
513
499 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
514 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
500 mon_stream=mons,notifier_stream=nots,
515 mon_stream=mons, notifier_stream=nots,
501 scheme=scheme,io_loop=loop, logname=logname)
516 scheme=scheme, loop=loop, logname=logname)
502
517 scheduler.start()
503 try:
518 try:
504 loop.start()
519 loop.start()
505 except KeyboardInterrupt:
520 except KeyboardInterrupt:
General Comments 0
You need to be logged in to leave comments. Login now