Show More
@@ -36,7 +36,7 b' class AsyncResult(object):' | |||||
36 | self._fname=fname |
|
36 | self._fname=fname | |
37 | self._ready = False |
|
37 | self._ready = False | |
38 | self._success = None |
|
38 | self._success = None | |
39 |
self._ |
|
39 | self._single_result = len(msg_ids) == 1 | |
40 |
|
40 | |||
41 | def __repr__(self): |
|
41 | def __repr__(self): | |
42 | if self._ready: |
|
42 | if self._ready: | |
@@ -50,7 +50,7 b' class AsyncResult(object):' | |||||
50 | Override me in subclasses for turning a list of results |
|
50 | Override me in subclasses for turning a list of results | |
51 | into the expected form. |
|
51 | into the expected form. | |
52 | """ |
|
52 | """ | |
53 |
if self._ |
|
53 | if self._single_result: | |
54 | return res[0] |
|
54 | return res[0] | |
55 | else: |
|
55 | else: | |
56 | return res |
|
56 | return res | |
@@ -90,6 +90,11 b' class AsyncResult(object):' | |||||
90 | try: |
|
90 | try: | |
91 | results = map(self._client.results.get, self.msg_ids) |
|
91 | results = map(self._client.results.get, self.msg_ids) | |
92 | self._result = results |
|
92 | self._result = results | |
|
93 | if self._single_result: | |||
|
94 | r = results[0] | |||
|
95 | if isinstance(r, Exception): | |||
|
96 | raise r | |||
|
97 | else: | |||
93 | results = error.collect_exceptions(results, self._fname) |
|
98 | results = error.collect_exceptions(results, self._fname) | |
94 | self._result = self._reconstruct_result(results) |
|
99 | self._result = self._reconstruct_result(results) | |
95 | except Exception, e: |
|
100 | except Exception, e: | |
@@ -138,7 +143,7 b' class AsyncResult(object):' | |||||
138 | @check_ready |
|
143 | @check_ready | |
139 | def metadata(self): |
|
144 | def metadata(self): | |
140 | """metadata property.""" |
|
145 | """metadata property.""" | |
141 |
if self._ |
|
146 | if self._single_result: | |
142 | return self._metadata[0] |
|
147 | return self._metadata[0] | |
143 | else: |
|
148 | else: | |
144 | return self._metadata |
|
149 | return self._metadata | |
@@ -165,7 +170,7 b' class AsyncResult(object):' | |||||
165 | return error.collect_exceptions(self._result[key], self._fname) |
|
170 | return error.collect_exceptions(self._result[key], self._fname) | |
166 | elif isinstance(key, basestring): |
|
171 | elif isinstance(key, basestring): | |
167 | values = [ md[key] for md in self._metadata ] |
|
172 | values = [ md[key] for md in self._metadata ] | |
168 |
if self._ |
|
173 | if self._single_result: | |
169 | return values[0] |
|
174 | return values[0] | |
170 | else: |
|
175 | else: | |
171 | return values |
|
176 | return values | |
@@ -190,7 +195,7 b' class AsyncMapResult(AsyncResult):' | |||||
190 | def __init__(self, client, msg_ids, mapObject, fname=''): |
|
195 | def __init__(self, client, msg_ids, mapObject, fname=''): | |
191 | AsyncResult.__init__(self, client, msg_ids, fname=fname) |
|
196 | AsyncResult.__init__(self, client, msg_ids, fname=fname) | |
192 | self._mapObject = mapObject |
|
197 | self._mapObject = mapObject | |
193 |
self._ |
|
198 | self._single_result = False | |
194 |
|
199 | |||
195 | def _reconstruct_result(self, res): |
|
200 | def _reconstruct_result(self, res): | |
196 | """Perform the gather on the actual results.""" |
|
201 | """Perform the gather on the actual results.""" |
@@ -766,8 +766,25 b' class Client(object):' | |||||
766 |
|
766 | |||
767 | return result |
|
767 | return result | |
768 |
|
768 | |||
|
769 | def _build_dependency(self, dep): | |||
|
770 | """helper for building jsonable dependencies from various input forms""" | |||
|
771 | if isinstance(dep, Dependency): | |||
|
772 | return dep.as_dict() | |||
|
773 | elif isinstance(dep, AsyncResult): | |||
|
774 | return dep.msg_ids | |||
|
775 | elif dep is None: | |||
|
776 | return [] | |||
|
777 | elif isinstance(dep, set): | |||
|
778 | return list(dep) | |||
|
779 | elif isinstance(dep, (list,dict)): | |||
|
780 | return dep | |||
|
781 | elif isinstance(dep, str): | |||
|
782 | return [dep] | |||
|
783 | else: | |||
|
784 | raise TypeError("Dependency may be: set,list,dict,Dependency or AsyncResult, not %r"%type(dep)) | |||
|
785 | ||||
769 | def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None, |
|
786 | def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None, | |
770 | after=None, follow=None): |
|
787 | after=None, follow=None, timeout=None): | |
771 | """Call `f(*args, **kwargs)` on a remote engine(s), returning the result. |
|
788 | """Call `f(*args, **kwargs)` on a remote engine(s), returning the result. | |
772 |
|
789 | |||
773 | This is the central execution command for the client. |
|
790 | This is the central execution command for the client. | |
@@ -817,6 +834,10 b' class Client(object):' | |||||
817 | This job will only be run on an engine where this dependency |
|
834 | This job will only be run on an engine where this dependency | |
818 | is met. |
|
835 | is met. | |
819 |
|
836 | |||
|
837 | timeout : float or None | |||
|
838 | Only for load-balanced execution (targets=None) | |||
|
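839 | Specify an amount of time (in seconds) for the scheduler to wait for dependencies to be met before the task fails with a DependencyTimeout. | |||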
|
840 | ||||
820 | Returns |
|
841 | Returns | |
821 | ------- |
|
842 | ------- | |
822 | if block is False: |
|
843 | if block is False: | |
@@ -845,32 +866,22 b' class Client(object):' | |||||
845 | if not isinstance(kwargs, dict): |
|
866 | if not isinstance(kwargs, dict): | |
846 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) |
|
867 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) | |
847 |
|
|
868 | ||
848 | if isinstance(after, Dependency): |
|
869 | after = self._build_dependency(after) | |
849 | after = after.as_dict() |
|
870 | follow = self._build_dependency(follow) | |
850 | elif isinstance(after, AsyncResult): |
|
871 | ||
851 | after=after.msg_ids |
|
872 | options = dict(bound=bound, block=block) | |
852 | elif after is None: |
|
|||
853 | after = [] |
|
|||
854 | if isinstance(follow, Dependency): |
|
|||
855 | # if len(follow) > 1 and follow.mode == 'all': |
|
|||
856 | # warn("complex follow-dependencies are not rigorously tested for reachability", UserWarning) |
|
|||
857 | follow = follow.as_dict() |
|
|||
858 | elif isinstance(follow, AsyncResult): |
|
|||
859 | follow=follow.msg_ids |
|
|||
860 | elif follow is None: |
|
|||
861 | follow = [] |
|
|||
862 | options = dict(bound=bound, block=block, after=after, follow=follow) |
|
|||
863 |
|
873 | |||
864 | if targets is None: |
|
874 | if targets is None: | |
865 |
return self._apply_balanced(f, args, kwargs, |
|
875 | return self._apply_balanced(f, args, kwargs, timeout=timeout, | |
|
876 | after=after, follow=follow, **options) | |||
866 | else: |
|
877 | else: | |
867 | return self._apply_direct(f, args, kwargs, targets=targets, **options) |
|
878 | return self._apply_direct(f, args, kwargs, targets=targets, **options) | |
868 |
|
879 | |||
869 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None, |
|
880 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None, | |
870 | after=None, follow=None): |
|
881 | after=None, follow=None, timeout=None): | |
871 | """The underlying method for applying functions in a load balanced |
|
882 | """The underlying method for applying functions in a load balanced | |
872 | manner, via the task queue.""" |
|
883 | manner, via the task queue.""" | |
873 | subheader = dict(after=after, follow=follow) |
|
884 | subheader = dict(after=after, follow=follow, timeout=timeout) | |
874 | bufs = ss.pack_apply_message(f,args,kwargs) |
|
885 | bufs = ss.pack_apply_message(f,args,kwargs) | |
875 | content = dict(bound=bound) |
|
886 | content = dict(bound=bound) | |
876 |
|
887 | |||
@@ -885,8 +896,7 b' class Client(object):' | |||||
885 | else: |
|
896 | else: | |
886 | return ar |
|
897 | return ar | |
887 |
|
898 | |||
888 |
def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None |
|
899 | def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None): | |
889 | after=None, follow=None): |
|
|||
890 | """The underlying method for applying functions to specific engines |

900 | """The underlying method for applying functions to specific engines | |
891 | via the MUX queue.""" |
|
901 | via the MUX queue.""" | |
892 |
|
902 |
@@ -100,9 +100,9 b' class ControllerFactory(HubFactory):' | |||||
100 | self.log.warn("task::using no Task scheduler") |
|
100 | self.log.warn("task::using no Task scheduler") | |
101 |
|
101 | |||
102 | else: |
|
102 | else: | |
103 |
self.log. |
|
103 | self.log.info("task::using Python %s Task scheduler"%self.scheme) | |
104 | sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification']) |
|
104 | sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification']) | |
105 | q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme)) |
|
105 | q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level)) | |
106 | q.daemon=True |
|
106 | q.daemon=True | |
107 | children.append(q) |
|
107 | children.append(q) | |
108 |
|
108 |
@@ -55,7 +55,7 b' def require(*names):' | |||||
55 | return depend(_require, *names) |
|
55 | return depend(_require, *names) | |
56 |
|
56 | |||
57 | class Dependency(set): |
|
57 | class Dependency(set): | |
58 | """An object for representing a set of dependencies. |
|
58 | """An object for representing a set of msg_id dependencies. | |
59 |
|
59 | |||
60 | Subclassed from set().""" |
|
60 | Subclassed from set().""" | |
61 |
|
61 |
@@ -154,6 +154,9 b' class UnmetDependency(KernelError):' | |||||
154 | class ImpossibleDependency(UnmetDependency): |
|
154 | class ImpossibleDependency(UnmetDependency): | |
155 | pass |
|
155 | pass | |
156 |
|
156 | |||
|
157 | class DependencyTimeout(UnmetDependency): | |||
|
158 | pass | |||
|
159 | ||||
157 | class RemoteError(KernelError): |
|
160 | class RemoteError(KernelError): | |
158 | """Error raised elsewhere""" |
|
161 | """Error raised elsewhere""" | |
159 | ename=None |
|
162 | ename=None |
@@ -14,7 +14,7 b' import sys' | |||||
14 | import logging |
|
14 | import logging | |
15 | from random import randint,random |
|
15 | from random import randint, random | |
16 | from types import FunctionType |
|
16 | from types import FunctionType | |
17 |
|
17 | from datetime import datetime, timedelta | ||
18 | try: |
|
18 | try: | |
19 | import numpy |
|
19 | import numpy | |
20 | except ImportError: |
|
20 | except ImportError: | |
@@ -29,11 +29,11 b' from IPython.external.decorator import decorator' | |||||
29 | from IPython.utils.traitlets import Instance, Dict, List, Set |
|
29 | from IPython.utils.traitlets import Instance, Dict, List, Set | |
30 |
|
30 | |||
31 | import error |
|
31 | import error | |
32 | from client import Client |
|
32 | # from client import Client | |
33 | from dependency import Dependency |
|
33 | from dependency import Dependency | |
34 | import streamsession as ss |
|
34 | import streamsession as ss | |
35 | from entry_point import connect_logger, local_logger |
|
35 | from entry_point import connect_logger, local_logger | |
36 |
from factory import |
|
36 | from factory import SessionFactory | |
37 |
|
37 | |||
38 |
|
38 | |||
39 | @decorator |
|
39 | @decorator | |
@@ -110,7 +110,7 b' def leastload(loads):' | |||||
110 | # store empty default dependency: |
|
110 | # store empty default dependency: | |
111 | MET = Dependency([]) |
|
111 | MET = Dependency([]) | |
112 |
|
112 | |||
113 |
class TaskScheduler( |
|
113 | class TaskScheduler(SessionFactory): | |
114 | """Python TaskScheduler object. |
|
114 | """Python TaskScheduler object. | |
115 |
|
115 | |||
116 | This is the simplest object that supports msg_id based |
|
116 | This is the simplest object that supports msg_id based | |
@@ -125,7 +125,6 b' class TaskScheduler(LoggingFactory):' | |||||
125 | engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream |
|
125 | engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream | |
126 | notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream |
|
126 | notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream | |
127 | mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream |
|
127 | mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream | |
128 | io_loop = Instance(ioloop.IOLoop) |
|
|||
129 |
|
128 | |||
130 | # internals: |
|
129 | # internals: | |
131 | dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ] |
|
130 | dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ] | |
@@ -141,20 +140,18 b' class TaskScheduler(LoggingFactory):' | |||||
141 | all_failed = Set() # set of all failed tasks |
|
140 | all_failed = Set() # set of all failed tasks | |
142 | all_done = Set() # set of all finished tasks=union(completed,failed) |
|
141 | all_done = Set() # set of all finished tasks=union(completed,failed) | |
143 | blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency |
|
142 | blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency | |
144 | session = Instance(ss.StreamSession) |
|
143 | auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback') | |
145 |
|
||||
146 |
|
||||
147 | def __init__(self, **kwargs): |
|
|||
148 | super(TaskScheduler, self).__init__(**kwargs) |
|
|||
149 |
|
|
144 | ||
150 | self.session = ss.StreamSession(username="TaskScheduler") |
|
|||
151 |
|
|
145 | ||
|
146 | def start(self): | |||
152 | self.engine_stream.on_recv(self.dispatch_result, copy=False) |
|
147 | self.engine_stream.on_recv(self.dispatch_result, copy=False) | |
153 | self._notification_handlers = dict( |
|
148 | self._notification_handlers = dict( | |
154 | registration_notification = self._register_engine, |
|
149 | registration_notification = self._register_engine, | |
155 | unregistration_notification = self._unregister_engine |
|
150 | unregistration_notification = self._unregister_engine | |
156 | ) |
|
151 | ) | |
157 | self.notifier_stream.on_recv(self.dispatch_notification) |
|
152 | self.notifier_stream.on_recv(self.dispatch_notification) | |
|
153 | self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 1e3, self.loop) # 1 Hz | |||
|
154 | self.auditor.start() | |||
158 | self.log.info("Scheduler started...%r"%self) |
|
155 | self.log.info("Scheduler started...%r"%self) | |
159 |
|
156 | |||
160 | def resume_receiving(self): |
|
157 | def resume_receiving(self): | |
@@ -261,37 +258,55 b' class TaskScheduler(LoggingFactory):' | |||||
261 |
|
258 | |||
262 | # location dependencies |
|
259 | # location dependencies | |
263 | follow = Dependency(header.get('follow', [])) |
|
260 | follow = Dependency(header.get('follow', [])) | |
264 |
|
||||
265 | # check if unreachable: |
|
261 | # check if unreachable: | |
266 | if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed): |
|
262 | if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed): | |
267 | self.depending[msg_id] = [raw_msg,MET,MET] |
|
263 | self.depending[msg_id] = [raw_msg,MET,MET,None] | |
268 | return self.fail_unreachable(msg_id) |
|
264 | return self.fail_unreachable(msg_id) | |
269 |
|
265 | |||
|
266 | # turn timeouts into datetime objects: | |||
|
267 | timeout = header.get('timeout', None) | |||
|
268 | if timeout: | |||
|
269 | timeout = datetime.now() + timedelta(0,timeout,0) | |||
|
270 | ||||
270 | if after.check(self.all_completed, self.all_failed): |
|
271 | if after.check(self.all_completed, self.all_failed): | |
271 | # time deps already met, try to run |
|
272 | # time deps already met, try to run | |
272 | if not self.maybe_run(msg_id, raw_msg, follow): |
|
273 | if not self.maybe_run(msg_id, raw_msg, follow): | |
273 | # can't run yet |
|
274 | # can't run yet | |
274 | self.save_unmet(msg_id, raw_msg, after, follow) |
|
275 | self.save_unmet(msg_id, raw_msg, after, follow, timeout) | |
275 | else: |
|
276 | else: | |
276 | self.save_unmet(msg_id, raw_msg, after, follow) |
|
277 | self.save_unmet(msg_id, raw_msg, after, follow, timeout) | |
277 |
|
278 | |||
278 | @logged |
|
279 | @logged | |
279 | def fail_unreachable(self, msg_id): |
|
280 | def audit_timeouts(self): | |
|
281 | """Audit all waiting tasks for expired timeouts.""" | |||
|
282 | now = datetime.now() | |||
|
283 | for msg_id in self.depending.keys(): | |||
|
284 | # must recheck, in case one failure cascaded to another: | |||
|
285 | if msg_id in self.depending: | |||
|
286 | raw,after,follow,timeout = self.depending[msg_id] | |||
|
287 | if timeout and timeout < now: | |||
|
288 | self.fail_unreachable(msg_id, timeout=True) | |||
|
289 | ||||
|
290 | @logged | |||
|
291 | def fail_unreachable(self, msg_id, timeout=False): | |||
280 | """a message has become unreachable""" |
|
292 | """a message has become unreachable""" | |
281 | if msg_id not in self.depending: |
|
293 | if msg_id not in self.depending: | |
282 | self.log.error("msg %r already failed!"%msg_id) |
|
294 | self.log.error("msg %r already failed!"%msg_id) | |
283 | return |
|
295 | return | |
284 | raw_msg, after, follow = self.depending.pop(msg_id) |
|
296 | raw_msg, after, follow, timeout = self.depending.pop(msg_id) | |
285 | for mid in follow.union(after): |
|
297 | for mid in follow.union(after): | |
286 | if mid in self.dependencies: |
|
298 | if mid in self.dependencies: | |
287 | self.dependencies[mid].remove(msg_id) |
|
299 | self.dependencies[mid].remove(msg_id) | |
288 |
|
300 | |||
|
301 | # FIXME: unpacking a message I've already unpacked, but didn't save: | |||
289 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
302 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | |
290 | msg = self.session.unpack_message(msg, copy=False, content=False) |
|
303 | msg = self.session.unpack_message(msg, copy=False, content=False) | |
291 | header = msg['header'] |
|
304 | header = msg['header'] | |
292 |
|
305 | |||
|
306 | impossible = error.DependencyTimeout if timeout else error.ImpossibleDependency | |||
|
307 | ||||
293 | try: |
|
308 | try: | |
294 |
raise |
|
309 | raise impossible() | |
295 | except: |
|
310 | except: | |
296 | content = ss.wrap_exception() |
|
311 | content = ss.wrap_exception() | |
297 |
|
312 | |||
@@ -334,9 +349,9 b' class TaskScheduler(LoggingFactory):' | |||||
334 | return True |
|
349 | return True | |
335 |
|
350 | |||
336 | @logged |
|
351 | @logged | |
337 | def save_unmet(self, msg_id, raw_msg, after, follow): |
|
352 | def save_unmet(self, msg_id, raw_msg, after, follow, timeout): | |
338 | """Save a message for later submission when its dependencies are met.""" |
|
353 | """Save a message for later submission when its dependencies are met.""" | |
339 | self.depending[msg_id] = [raw_msg,after,follow] |
|
354 | self.depending[msg_id] = [raw_msg,after,follow,timeout] | |
340 | # track the ids in follow or after, but not those already finished |
|
355 | # track the ids in follow or after, but not those already finished | |
341 | for dep_id in after.union(follow).difference(self.all_done): |
|
356 | for dep_id in after.union(follow).difference(self.all_done): | |
342 | if dep_id not in self.dependencies: |
|
357 | if dep_id not in self.dependencies: | |
@@ -413,10 +428,10 b' class TaskScheduler(LoggingFactory):' | |||||
413 | if msg_id not in self.blacklist: |
|
428 | if msg_id not in self.blacklist: | |
414 | self.blacklist[msg_id] = set() |
|
429 | self.blacklist[msg_id] = set() | |
415 | self.blacklist[msg_id].add(engine) |
|
430 | self.blacklist[msg_id].add(engine) | |
416 | raw_msg,follow = self.pending[engine].pop(msg_id) |
|
431 | raw_msg,follow,timeout = self.pending[engine].pop(msg_id) | |
417 | if not self.maybe_run(msg_id, raw_msg, follow): |
|
432 | if not self.maybe_run(msg_id, raw_msg, follow): | |
418 | # resubmit failed, put it back in our dependency tree |
|
433 | # resubmit failed, put it back in our dependency tree | |
419 | self.save_unmet(msg_id, raw_msg, MET, follow) |
|
434 | self.save_unmet(msg_id, raw_msg, MET, follow, timeout) | |
420 | pass |
|
435 | pass | |
421 |
|
436 | |||
422 | @logged |
|
437 | @logged | |
@@ -435,7 +450,7 b' class TaskScheduler(LoggingFactory):' | |||||
435 | jobs = self.dependencies.pop(dep_id) |
|
450 | jobs = self.dependencies.pop(dep_id) | |
436 |
|
451 | |||
437 | for msg_id in jobs: |
|
452 | for msg_id in jobs: | |
438 | raw_msg, after, follow = self.depending[msg_id] |
|
453 | raw_msg, after, follow, timeout = self.depending[msg_id] | |
439 | # if dep_id in after: |
|
454 | # if dep_id in after: | |
440 | # if after.mode == 'all' and (success or not after.success_only): |
|
455 | # if after.mode == 'all' and (success or not after.success_only): | |
441 | # after.remove(dep_id) |
|
456 | # after.remove(dep_id) | |
@@ -498,8 +513,8 b" def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_a" | |||||
498 |
|
513 | |||
499 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, |
|
514 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, | |
500 | mon_stream=mons,notifier_stream=nots, |
|
515 | mon_stream=mons, notifier_stream=nots, | |
501 |
scheme=scheme, |
|
516 | scheme=scheme, loop=loop, logname=logname) | |
502 |
|
517 | scheduler.start() | ||
503 | try: |
|
518 | try: | |
504 | loop.start() |
|
519 | loop.start() | |
505 | except KeyboardInterrupt: |
|
520 | except KeyboardInterrupt: |
General Comments 0
You need to be logged in to leave comments.
Login now