Show More
@@ -36,7 +36,7 b' class AsyncResult(object):' | |||
|
36 | 36 | self._fname=fname |
|
37 | 37 | self._ready = False |
|
38 | 38 | self._success = None |
|
39 |
self._ |
|
|
39 | self._single_result = len(msg_ids) == 1 | |
|
40 | 40 | |
|
41 | 41 | def __repr__(self): |
|
42 | 42 | if self._ready: |
@@ -50,7 +50,7 b' class AsyncResult(object):' | |||
|
50 | 50 | Override me in subclasses for turning a list of results |
|
51 | 51 | into the expected form. |
|
52 | 52 | """ |
|
53 |
if self._ |
|
|
53 | if self._single_result: | |
|
54 | 54 | return res[0] |
|
55 | 55 | else: |
|
56 | 56 | return res |
@@ -90,7 +90,12 b' class AsyncResult(object):' | |||
|
90 | 90 | try: |
|
91 | 91 | results = map(self._client.results.get, self.msg_ids) |
|
92 | 92 | self._result = results |
|
93 | results = error.collect_exceptions(results, self._fname) | |
|
93 | if self._single_result: | |
|
94 | r = results[0] | |
|
95 | if isinstance(r, Exception): | |
|
96 | raise r | |
|
97 | else: | |
|
98 | results = error.collect_exceptions(results, self._fname) | |
|
94 | 99 | self._result = self._reconstruct_result(results) |
|
95 | 100 | except Exception, e: |
|
96 | 101 | self._exception = e |
@@ -138,7 +143,7 b' class AsyncResult(object):' | |||
|
138 | 143 | @check_ready |
|
139 | 144 | def metadata(self): |
|
140 | 145 | """metadata property.""" |
|
141 |
if self._ |
|
|
146 | if self._single_result: | |
|
142 | 147 | return self._metadata[0] |
|
143 | 148 | else: |
|
144 | 149 | return self._metadata |
@@ -165,7 +170,7 b' class AsyncResult(object):' | |||
|
165 | 170 | return error.collect_exceptions(self._result[key], self._fname) |
|
166 | 171 | elif isinstance(key, basestring): |
|
167 | 172 | values = [ md[key] for md in self._metadata ] |
|
168 |
if self._ |
|
|
173 | if self._single_result: | |
|
169 | 174 | return values[0] |
|
170 | 175 | else: |
|
171 | 176 | return values |
@@ -190,7 +195,7 b' class AsyncMapResult(AsyncResult):' | |||
|
190 | 195 | def __init__(self, client, msg_ids, mapObject, fname=''): |
|
191 | 196 | AsyncResult.__init__(self, client, msg_ids, fname=fname) |
|
192 | 197 | self._mapObject = mapObject |
|
193 |
self._ |
|
|
198 | self._single_result = False | |
|
194 | 199 | |
|
195 | 200 | def _reconstruct_result(self, res): |
|
196 | 201 | """Perform the gather on the actual results.""" |
@@ -765,9 +765,26 b' class Client(object):' | |||
|
765 | 765 | raise result |
|
766 | 766 | |
|
767 | 767 | return result |
|
768 | ||
|
768 | ||
|
769 | def _build_dependency(self, dep): | |
|
770 | """helper for building jsonable dependencies from various input forms""" | |
|
771 | if isinstance(dep, Dependency): | |
|
772 | return dep.as_dict() | |
|
773 | elif isinstance(dep, AsyncResult): | |
|
774 | return dep.msg_ids | |
|
775 | elif dep is None: | |
|
776 | return [] | |
|
777 | elif isinstance(dep, set): | |
|
778 | return list(dep) | |
|
779 | elif isinstance(dep, (list,dict)): | |
|
780 | return dep | |
|
781 | elif isinstance(dep, str): | |
|
782 | return [dep] | |
|
783 | else: | |
|
784 | raise TypeError("Dependency may be: set,list,dict,Dependency or AsyncResult, not %r"%type(dep)) | |
|
785 | ||
|
769 | 786 | def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None, |
|
770 | after=None, follow=None): | |
|
787 | after=None, follow=None, timeout=None): | |
|
771 | 788 | """Call `f(*args, **kwargs)` on a remote engine(s), returning the result. |
|
772 | 789 | |
|
773 | 790 | This is the central execution command for the client. |
@@ -817,6 +834,10 b' class Client(object):' | |||
|
817 | 834 | This job will only be run on an engine where this dependency |
|
818 | 835 | is met. |
|
819 | 836 | |
|
837 | timeout : float or None | |
|
838 | Only for load-balanced execution (targets=None) | |
|
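839 | Specify an amount of time (in seconds) for the scheduler to wait for dependencies to be met before the task fails with a DependencyTimeout | |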
|
840 | ||
|
820 | 841 | Returns |
|
821 | 842 | ------- |
|
822 | 843 | if block is False: |
@@ -844,33 +865,23 b' class Client(object):' | |||
|
844 | 865 | raise TypeError("args must be tuple or list, not %s"%type(args)) |
|
845 | 866 | if not isinstance(kwargs, dict): |
|
846 | 867 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) |
|
847 |
|
|
|
848 | if isinstance(after, Dependency): | |
|
849 | after = after.as_dict() | |
|
850 | elif isinstance(after, AsyncResult): | |
|
851 | after=after.msg_ids | |
|
852 | elif after is None: | |
|
853 | after = [] | |
|
854 | if isinstance(follow, Dependency): | |
|
855 | # if len(follow) > 1 and follow.mode == 'all': | |
|
856 | # warn("complex follow-dependencies are not rigorously tested for reachability", UserWarning) | |
|
857 | follow = follow.as_dict() | |
|
858 | elif isinstance(follow, AsyncResult): | |
|
859 | follow=follow.msg_ids | |
|
860 | elif follow is None: | |
|
861 | follow = [] | |
|
862 | options = dict(bound=bound, block=block, after=after, follow=follow) | |
|
868 | ||
|
869 | after = self._build_dependency(after) | |
|
870 | follow = self._build_dependency(follow) | |
|
871 | ||
|
872 | options = dict(bound=bound, block=block) | |
|
863 | 873 | |
|
864 | 874 | if targets is None: |
|
865 |
return self._apply_balanced(f, args, kwargs, |
|
|
875 | return self._apply_balanced(f, args, kwargs, timeout=timeout, | |
|
876 | after=after, follow=follow, **options) | |
|
866 | 877 | else: |
|
867 | 878 | return self._apply_direct(f, args, kwargs, targets=targets, **options) |
|
868 | 879 | |
|
869 | 880 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None, |
|
870 | after=None, follow=None): | |
|
881 | after=None, follow=None, timeout=None): | |
|
871 | 882 | """The underlying method for applying functions in a load balanced |
|
872 | 883 | manner, via the task queue.""" |
|
873 | subheader = dict(after=after, follow=follow) | |
|
884 | subheader = dict(after=after, follow=follow, timeout=timeout) | |
|
874 | 885 | bufs = ss.pack_apply_message(f,args,kwargs) |
|
875 | 886 | content = dict(bound=bound) |
|
876 | 887 | |
@@ -885,8 +896,7 b' class Client(object):' | |||
|
885 | 896 | else: |
|
886 | 897 | return ar |
|
887 | 898 | |
|
888 |
def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None |
|
|
889 | after=None, follow=None): | |
|
899 | def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None): | |
|
890 | 900 | """The underlying method for applying functions to specific engines |
|
891 | 901 | via the MUX queue.""" |
|
892 | 902 |
@@ -100,9 +100,9 b' class ControllerFactory(HubFactory):' | |||
|
100 | 100 | self.log.warn("task::using no Task scheduler") |
|
101 | 101 | |
|
102 | 102 | else: |
|
103 |
self.log. |
|
|
103 | self.log.info("task::using Python %s Task scheduler"%self.scheme) | |
|
104 | 104 | sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification']) |
|
105 | q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme)) | |
|
105 | q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level)) | |
|
106 | 106 | q.daemon=True |
|
107 | 107 | children.append(q) |
|
108 | 108 |
@@ -55,7 +55,7 b' def require(*names):' | |||
|
55 | 55 | return depend(_require, *names) |
|
56 | 56 | |
|
57 | 57 | class Dependency(set): |
|
58 | """An object for representing a set of dependencies. | |
|
58 | """An object for representing a set of msg_id dependencies. | |
|
59 | 59 | |
|
60 | 60 | Subclassed from set().""" |
|
61 | 61 |
@@ -154,6 +154,9 b' class UnmetDependency(KernelError):' | |||
|
154 | 154 | class ImpossibleDependency(UnmetDependency): |
|
155 | 155 | pass |
|
156 | 156 | |
|
157 | class DependencyTimeout(UnmetDependency): | |
|
158 | pass | |
|
159 | ||
|
157 | 160 | class RemoteError(KernelError): |
|
158 | 161 | """Error raised elsewhere""" |
|
159 | 162 | ename=None |
@@ -12,9 +12,9 b' Python Scheduler exists.' | |||
|
12 | 12 | from __future__ import print_function |
|
13 | 13 | import sys |
|
14 | 14 | import logging |
|
15 | from random import randint,random | |
|
15 | from random import randint, random | |
|
16 | 16 | from types import FunctionType |
|
17 | ||
|
17 | from datetime import datetime, timedelta | |
|
18 | 18 | try: |
|
19 | 19 | import numpy |
|
20 | 20 | except ImportError: |
@@ -29,11 +29,11 b' from IPython.external.decorator import decorator' | |||
|
29 | 29 | from IPython.utils.traitlets import Instance, Dict, List, Set |
|
30 | 30 | |
|
31 | 31 | import error |
|
32 | from client import Client | |
|
32 | # from client import Client | |
|
33 | 33 | from dependency import Dependency |
|
34 | 34 | import streamsession as ss |
|
35 | 35 | from entry_point import connect_logger, local_logger |
|
36 |
from factory import |
|
|
36 | from factory import SessionFactory | |
|
37 | 37 | |
|
38 | 38 | |
|
39 | 39 | @decorator |
@@ -110,7 +110,7 b' def leastload(loads):' | |||
|
110 | 110 | # store empty default dependency: |
|
111 | 111 | MET = Dependency([]) |
|
112 | 112 | |
|
113 |
class TaskScheduler( |
|
|
113 | class TaskScheduler(SessionFactory): | |
|
114 | 114 | """Python TaskScheduler object. |
|
115 | 115 | |
|
116 | 116 | This is the simplest object that supports msg_id based |
@@ -125,7 +125,6 b' class TaskScheduler(LoggingFactory):' | |||
|
125 | 125 | engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream |
|
126 | 126 | notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream |
|
127 | 127 | mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream |
|
128 | io_loop = Instance(ioloop.IOLoop) | |
|
129 | 128 | |
|
130 | 129 | # internals: |
|
131 | 130 | dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ] |
@@ -141,20 +140,18 b' class TaskScheduler(LoggingFactory):' | |||
|
141 | 140 | all_failed = Set() # set of all failed tasks |
|
142 | 141 | all_done = Set() # set of all finished tasks=union(completed,failed) |
|
143 | 142 | blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency |
|
144 | session = Instance(ss.StreamSession) | |
|
143 | auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback') | |
|
145 | 144 | |
|
146 | 145 | |
|
147 |
def |
|
|
148 | super(TaskScheduler, self).__init__(**kwargs) | |
|
149 | ||
|
150 | self.session = ss.StreamSession(username="TaskScheduler") | |
|
151 | ||
|
146 | def start(self): | |
|
152 | 147 | self.engine_stream.on_recv(self.dispatch_result, copy=False) |
|
153 | 148 | self._notification_handlers = dict( |
|
154 | 149 | registration_notification = self._register_engine, |
|
155 | 150 | unregistration_notification = self._unregister_engine |
|
156 | 151 | ) |
|
157 | 152 | self.notifier_stream.on_recv(self.dispatch_notification) |
|
153 | self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 1e3, self.loop) # 1 Hz | |
|
154 | self.auditor.start() | |
|
158 | 155 | self.log.info("Scheduler started...%r"%self) |
|
159 | 156 | |
|
160 | 157 | def resume_receiving(self): |
@@ -261,37 +258,55 b' class TaskScheduler(LoggingFactory):' | |||
|
261 | 258 | |
|
262 | 259 | # location dependencies |
|
263 | 260 | follow = Dependency(header.get('follow', [])) |
|
264 | ||
|
265 | 261 | # check if unreachable: |
|
266 | 262 | if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed): |
|
267 | self.depending[msg_id] = [raw_msg,MET,MET] | |
|
263 | self.depending[msg_id] = [raw_msg,MET,MET,None] | |
|
268 | 264 | return self.fail_unreachable(msg_id) |
|
269 | 265 | |
|
266 | # turn timeouts into datetime objects: | |
|
267 | timeout = header.get('timeout', None) | |
|
268 | if timeout: | |
|
269 | timeout = datetime.now() + timedelta(0,timeout,0) | |
|
270 | ||
|
270 | 271 | if after.check(self.all_completed, self.all_failed): |
|
271 | 272 | # time deps already met, try to run |
|
272 | 273 | if not self.maybe_run(msg_id, raw_msg, follow): |
|
273 | 274 | # can't run yet |
|
274 | self.save_unmet(msg_id, raw_msg, after, follow) | |
|
275 | self.save_unmet(msg_id, raw_msg, after, follow, timeout) | |
|
275 | 276 | else: |
|
276 | self.save_unmet(msg_id, raw_msg, after, follow) | |
|
277 | self.save_unmet(msg_id, raw_msg, after, follow, timeout) | |
|
277 | 278 | |
|
278 | 279 | @logged |
|
279 | def fail_unreachable(self, msg_id): | |
|
280 | def audit_timeouts(self): | |
|
281 | """Audit all waiting tasks for expired timeouts.""" | |
|
282 | now = datetime.now() | |
|
283 | for msg_id in self.depending.keys(): | |
|
284 | # must recheck, in case one failure cascaded to another: | |
|
285 | if msg_id in self.depending: | |
|
286 | raw,after,follow,timeout = self.depending[msg_id] | |
|
287 | if timeout and timeout < now: | |
|
288 | self.fail_unreachable(msg_id, timeout=True) | |
|
289 | ||
|
290 | @logged | |
|
291 | def fail_unreachable(self, msg_id, timeout=False): | |
|
280 | 292 | """a message has become unreachable""" |
|
281 | 293 | if msg_id not in self.depending: |
|
282 | 294 | self.log.error("msg %r already failed!"%msg_id) |
|
283 | 295 | return |
|
284 | raw_msg, after, follow = self.depending.pop(msg_id) | |
|
296 | raw_msg, after, follow, timeout = self.depending.pop(msg_id) | |
|
285 | 297 | for mid in follow.union(after): |
|
286 | 298 | if mid in self.dependencies: |
|
287 | 299 | self.dependencies[mid].remove(msg_id) |
|
288 | 300 | |
|
301 | # FIXME: unpacking a message I've already unpacked, but didn't save: | |
|
289 | 302 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
290 | 303 | msg = self.session.unpack_message(msg, copy=False, content=False) |
|
291 | 304 | header = msg['header'] |
|
292 | 305 | |
|
306 | impossible = error.DependencyTimeout if timeout else error.ImpossibleDependency | |
|
307 | ||
|
293 | 308 | try: |
|
294 |
raise |
|
|
309 | raise impossible() | |
|
295 | 310 | except: |
|
296 | 311 | content = ss.wrap_exception() |
|
297 | 312 | |
@@ -334,9 +349,9 b' class TaskScheduler(LoggingFactory):' | |||
|
334 | 349 | return True |
|
335 | 350 | |
|
336 | 351 | @logged |
|
337 | def save_unmet(self, msg_id, raw_msg, after, follow): | |
|
352 | def save_unmet(self, msg_id, raw_msg, after, follow, timeout): | |
|
338 | 353 | """Save a message for later submission when its dependencies are met.""" |
|
339 | self.depending[msg_id] = [raw_msg,after,follow] | |
|
354 | self.depending[msg_id] = [raw_msg,after,follow,timeout] | |
|
340 | 355 | # track the ids in follow or after, but not those already finished |
|
341 | 356 | for dep_id in after.union(follow).difference(self.all_done): |
|
342 | 357 | if dep_id not in self.dependencies: |
@@ -413,10 +428,10 b' class TaskScheduler(LoggingFactory):' | |||
|
413 | 428 | if msg_id not in self.blacklist: |
|
414 | 429 | self.blacklist[msg_id] = set() |
|
415 | 430 | self.blacklist[msg_id].add(engine) |
|
416 | raw_msg,follow = self.pending[engine].pop(msg_id) | |
|
431 | raw_msg,follow,timeout = self.pending[engine].pop(msg_id) | |
|
417 | 432 | if not self.maybe_run(msg_id, raw_msg, follow): |
|
418 | 433 | # resubmit failed, put it back in our dependency tree |
|
419 | self.save_unmet(msg_id, raw_msg, MET, follow) | |
|
434 | self.save_unmet(msg_id, raw_msg, MET, follow, timeout) | |
|
420 | 435 | pass |
|
421 | 436 | |
|
422 | 437 | @logged |
@@ -435,7 +450,7 b' class TaskScheduler(LoggingFactory):' | |||
|
435 | 450 | jobs = self.dependencies.pop(dep_id) |
|
436 | 451 | |
|
437 | 452 | for msg_id in jobs: |
|
438 | raw_msg, after, follow = self.depending[msg_id] | |
|
453 | raw_msg, after, follow, timeout = self.depending[msg_id] | |
|
439 | 454 | # if dep_id in after: |
|
440 | 455 | # if after.mode == 'all' and (success or not after.success_only): |
|
441 | 456 | # after.remove(dep_id) |
@@ -497,9 +512,9 b" def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_a" | |||
|
497 | 512 | local_logger(logname, loglevel) |
|
498 | 513 | |
|
499 | 514 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, |
|
500 | mon_stream=mons,notifier_stream=nots, | |
|
501 |
scheme=scheme, |
|
|
502 | ||
|
515 | mon_stream=mons, notifier_stream=nots, | |
|
516 | scheme=scheme, loop=loop, logname=logname) | |
|
517 | scheduler.start() | |
|
503 | 518 | try: |
|
504 | 519 | loop.start() |
|
505 | 520 | except KeyboardInterrupt: |
General Comments 0
You need to be logged in to leave comments.
Login now