##// END OF EJS Templates
add timeout for unmet dependencies in task scheduler
MinRK -
Show More
@@ -36,7 +36,7 b' class AsyncResult(object):'
36 36 self._fname=fname
37 37 self._ready = False
38 38 self._success = None
39 self._flatten_result = len(msg_ids) == 1
39 self._single_result = len(msg_ids) == 1
40 40
41 41 def __repr__(self):
42 42 if self._ready:
@@ -50,7 +50,7 b' class AsyncResult(object):'
50 50 Override me in subclasses for turning a list of results
51 51 into the expected form.
52 52 """
53 if self._flatten_result:
53 if self._single_result:
54 54 return res[0]
55 55 else:
56 56 return res
@@ -90,7 +90,12 b' class AsyncResult(object):'
90 90 try:
91 91 results = map(self._client.results.get, self.msg_ids)
92 92 self._result = results
93 results = error.collect_exceptions(results, self._fname)
93 if self._single_result:
94 r = results[0]
95 if isinstance(r, Exception):
96 raise r
97 else:
98 results = error.collect_exceptions(results, self._fname)
94 99 self._result = self._reconstruct_result(results)
95 100 except Exception, e:
96 101 self._exception = e
@@ -138,7 +143,7 b' class AsyncResult(object):'
138 143 @check_ready
139 144 def metadata(self):
140 145 """metadata property."""
141 if self._flatten_result:
146 if self._single_result:
142 147 return self._metadata[0]
143 148 else:
144 149 return self._metadata
@@ -165,7 +170,7 b' class AsyncResult(object):'
165 170 return error.collect_exceptions(self._result[key], self._fname)
166 171 elif isinstance(key, basestring):
167 172 values = [ md[key] for md in self._metadata ]
168 if self._flatten_result:
173 if self._single_result:
169 174 return values[0]
170 175 else:
171 176 return values
@@ -190,7 +195,7 b' class AsyncMapResult(AsyncResult):'
190 195 def __init__(self, client, msg_ids, mapObject, fname=''):
191 196 AsyncResult.__init__(self, client, msg_ids, fname=fname)
192 197 self._mapObject = mapObject
193 self._flatten_result = False
198 self._single_result = False
194 199
195 200 def _reconstruct_result(self, res):
196 201 """Perform the gather on the actual results."""
@@ -765,9 +765,26 b' class Client(object):'
765 765 raise result
766 766
767 767 return result
768
768
769 def _build_dependency(self, dep):
770 """helper for building jsonable dependencies from various input forms"""
771 if isinstance(dep, Dependency):
772 return dep.as_dict()
773 elif isinstance(dep, AsyncResult):
774 return dep.msg_ids
775 elif dep is None:
776 return []
777 elif isinstance(dep, set):
778 return list(dep)
779 elif isinstance(dep, (list,dict)):
780 return dep
781 elif isinstance(dep, str):
782 return [dep]
783 else:
784 raise TypeError("Dependency may be: set,list,dict,Dependency or AsyncResult, not %r"%type(dep))
785
769 786 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
770 after=None, follow=None):
787 after=None, follow=None, timeout=None):
771 788 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
772 789
773 790 This is the central execution command for the client.
@@ -817,6 +834,10 b' class Client(object):'
817 834 This job will only be run on an engine where this dependency
818 835 is met.
819 836
837 timeout : float or None
838 Only for load-balanced execution (targets=None)
839 Specify an amount of time (in seconds) to wait for dependencies to be met before the task fails with a DependencyTimeout.
840
820 841 Returns
821 842 -------
822 843 if block is False:
@@ -844,33 +865,23 b' class Client(object):'
844 865 raise TypeError("args must be tuple or list, not %s"%type(args))
845 866 if not isinstance(kwargs, dict):
846 867 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
847
848 if isinstance(after, Dependency):
849 after = after.as_dict()
850 elif isinstance(after, AsyncResult):
851 after=after.msg_ids
852 elif after is None:
853 after = []
854 if isinstance(follow, Dependency):
855 # if len(follow) > 1 and follow.mode == 'all':
856 # warn("complex follow-dependencies are not rigorously tested for reachability", UserWarning)
857 follow = follow.as_dict()
858 elif isinstance(follow, AsyncResult):
859 follow=follow.msg_ids
860 elif follow is None:
861 follow = []
862 options = dict(bound=bound, block=block, after=after, follow=follow)
868
869 after = self._build_dependency(after)
870 follow = self._build_dependency(follow)
871
872 options = dict(bound=bound, block=block)
863 873
864 874 if targets is None:
865 return self._apply_balanced(f, args, kwargs, **options)
875 return self._apply_balanced(f, args, kwargs, timeout=timeout,
876 after=after, follow=follow, **options)
866 877 else:
867 878 return self._apply_direct(f, args, kwargs, targets=targets, **options)
868 879
869 880 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
870 after=None, follow=None):
881 after=None, follow=None, timeout=None):
871 882 """The underlying method for applying functions in a load balanced
872 883 manner, via the task queue."""
873 subheader = dict(after=after, follow=follow)
884 subheader = dict(after=after, follow=follow, timeout=timeout)
874 885 bufs = ss.pack_apply_message(f,args,kwargs)
875 886 content = dict(bound=bound)
876 887
@@ -885,8 +896,7 b' class Client(object):'
885 896 else:
886 897 return ar
887 898
888 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
889 after=None, follow=None):
899 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
890 900 """Then underlying method for applying functions to specific engines
891 901 via the MUX queue."""
892 902
@@ -100,9 +100,9 b' class ControllerFactory(HubFactory):'
100 100 self.log.warn("task::using no Task scheduler")
101 101
102 102 else:
103 self.log.warn("task::using Python %s Task scheduler"%self.scheme)
103 self.log.info("task::using Python %s Task scheduler"%self.scheme)
104 104 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
105 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme))
105 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level))
106 106 q.daemon=True
107 107 children.append(q)
108 108
@@ -55,7 +55,7 b' def require(*names):'
55 55 return depend(_require, *names)
56 56
57 57 class Dependency(set):
58 """An object for representing a set of dependencies.
58 """An object for representing a set of msg_id dependencies.
59 59
60 60 Subclassed from set()."""
61 61
@@ -154,6 +154,9 b' class UnmetDependency(KernelError):'
154 154 class ImpossibleDependency(UnmetDependency):
155 155 pass
156 156
157 class DependencyTimeout(UnmetDependency):
158 pass
159
157 160 class RemoteError(KernelError):
158 161 """Error raised elsewhere"""
159 162 ename=None
@@ -12,9 +12,9 b' Python Scheduler exists.'
12 12 from __future__ import print_function
13 13 import sys
14 14 import logging
15 from random import randint,random
15 from random import randint, random
16 16 from types import FunctionType
17
17 from datetime import datetime, timedelta
18 18 try:
19 19 import numpy
20 20 except ImportError:
@@ -29,11 +29,11 b' from IPython.external.decorator import decorator'
29 29 from IPython.utils.traitlets import Instance, Dict, List, Set
30 30
31 31 import error
32 from client import Client
32 # from client import Client
33 33 from dependency import Dependency
34 34 import streamsession as ss
35 35 from entry_point import connect_logger, local_logger
36 from factory import LoggingFactory
36 from factory import SessionFactory
37 37
38 38
39 39 @decorator
@@ -110,7 +110,7 b' def leastload(loads):'
110 110 # store empty default dependency:
111 111 MET = Dependency([])
112 112
113 class TaskScheduler(LoggingFactory):
113 class TaskScheduler(SessionFactory):
114 114 """Python TaskScheduler object.
115 115
116 116 This is the simplest object that supports msg_id based
@@ -125,7 +125,6 b' class TaskScheduler(LoggingFactory):'
125 125 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
126 126 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
127 127 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
128 io_loop = Instance(ioloop.IOLoop)
129 128
130 129 # internals:
131 130 dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
@@ -141,20 +140,18 b' class TaskScheduler(LoggingFactory):'
141 140 all_failed = Set() # set of all failed tasks
142 141 all_done = Set() # set of all finished tasks=union(completed,failed)
143 142 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
144 session = Instance(ss.StreamSession)
143 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
145 144
146 145
147 def __init__(self, **kwargs):
148 super(TaskScheduler, self).__init__(**kwargs)
149
150 self.session = ss.StreamSession(username="TaskScheduler")
151
146 def start(self):
152 147 self.engine_stream.on_recv(self.dispatch_result, copy=False)
153 148 self._notification_handlers = dict(
154 149 registration_notification = self._register_engine,
155 150 unregistration_notification = self._unregister_engine
156 151 )
157 152 self.notifier_stream.on_recv(self.dispatch_notification)
153 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 1e3, self.loop) # 1 Hz
154 self.auditor.start()
158 155 self.log.info("Scheduler started...%r"%self)
159 156
160 157 def resume_receiving(self):
@@ -261,37 +258,55 b' class TaskScheduler(LoggingFactory):'
261 258
262 259 # location dependencies
263 260 follow = Dependency(header.get('follow', []))
264
265 261 # check if unreachable:
266 262 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
267 self.depending[msg_id] = [raw_msg,MET,MET]
263 self.depending[msg_id] = [raw_msg,MET,MET,None]
268 264 return self.fail_unreachable(msg_id)
269 265
266 # turn timeouts into datetime objects:
267 timeout = header.get('timeout', None)
268 if timeout:
269 timeout = datetime.now() + timedelta(0,timeout,0)
270
270 271 if after.check(self.all_completed, self.all_failed):
271 272 # time deps already met, try to run
272 273 if not self.maybe_run(msg_id, raw_msg, follow):
273 274 # can't run yet
274 self.save_unmet(msg_id, raw_msg, after, follow)
275 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
275 276 else:
276 self.save_unmet(msg_id, raw_msg, after, follow)
277 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
277 278
278 279 @logged
279 def fail_unreachable(self, msg_id):
280 def audit_timeouts(self):
281 """Audit all waiting tasks for expired timeouts."""
282 now = datetime.now()
283 for msg_id in self.depending.keys():
284 # must recheck, in case one failure cascaded to another:
285 if msg_id in self.depending:
286 raw,after,follow,timeout = self.depending[msg_id]
287 if timeout and timeout < now:
288 self.fail_unreachable(msg_id, timeout=True)
289
290 @logged
291 def fail_unreachable(self, msg_id, timeout=False):
280 292 """a message has become unreachable"""
281 293 if msg_id not in self.depending:
282 294 self.log.error("msg %r already failed!"%msg_id)
283 295 return
284 raw_msg, after, follow = self.depending.pop(msg_id)
296 raw_msg, after, follow, timeout = self.depending.pop(msg_id)
285 297 for mid in follow.union(after):
286 298 if mid in self.dependencies:
287 299 self.dependencies[mid].remove(msg_id)
288 300
301 # FIXME: unpacking a message I've already unpacked, but didn't save:
289 302 idents,msg = self.session.feed_identities(raw_msg, copy=False)
290 303 msg = self.session.unpack_message(msg, copy=False, content=False)
291 304 header = msg['header']
292 305
306 impossible = error.DependencyTimeout if timeout else error.ImpossibleDependency
307
293 308 try:
294 raise error.ImpossibleDependency()
309 raise impossible()
295 310 except:
296 311 content = ss.wrap_exception()
297 312
@@ -334,9 +349,9 b' class TaskScheduler(LoggingFactory):'
334 349 return True
335 350
336 351 @logged
337 def save_unmet(self, msg_id, raw_msg, after, follow):
352 def save_unmet(self, msg_id, raw_msg, after, follow, timeout):
338 353 """Save a message for later submission when its dependencies are met."""
339 self.depending[msg_id] = [raw_msg,after,follow]
354 self.depending[msg_id] = [raw_msg,after,follow,timeout]
340 355 # track the ids in follow or after, but not those already finished
341 356 for dep_id in after.union(follow).difference(self.all_done):
342 357 if dep_id not in self.dependencies:
@@ -413,10 +428,10 b' class TaskScheduler(LoggingFactory):'
413 428 if msg_id not in self.blacklist:
414 429 self.blacklist[msg_id] = set()
415 430 self.blacklist[msg_id].add(engine)
416 raw_msg,follow = self.pending[engine].pop(msg_id)
431 raw_msg,follow,timeout = self.pending[engine].pop(msg_id)
417 432 if not self.maybe_run(msg_id, raw_msg, follow):
418 433 # resubmit failed, put it back in our dependency tree
419 self.save_unmet(msg_id, raw_msg, MET, follow)
434 self.save_unmet(msg_id, raw_msg, MET, follow, timeout)
420 435 pass
421 436
422 437 @logged
@@ -435,7 +450,7 b' class TaskScheduler(LoggingFactory):'
435 450 jobs = self.dependencies.pop(dep_id)
436 451
437 452 for msg_id in jobs:
438 raw_msg, after, follow = self.depending[msg_id]
453 raw_msg, after, follow, timeout = self.depending[msg_id]
439 454 # if dep_id in after:
440 455 # if after.mode == 'all' and (success or not after.success_only):
441 456 # after.remove(dep_id)
@@ -497,9 +512,9 b" def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_a"
497 512 local_logger(logname, loglevel)
498 513
499 514 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
500 mon_stream=mons,notifier_stream=nots,
501 scheme=scheme,io_loop=loop, logname=logname)
502
515 mon_stream=mons, notifier_stream=nots,
516 scheme=scheme, loop=loop, logname=logname)
517 scheduler.start()
503 518 try:
504 519 loop.start()
505 520 except KeyboardInterrupt:
General Comments 0
You need to be logged in to leave comments. Login now