Show More
@@ -326,7 +326,7 class Client(object): | |||||
326 | else: |
|
326 | else: | |
327 | self._registration_socket.connect(url) |
|
327 | self._registration_socket.connect(url) | |
328 | self._engines = ReverseDict() |
|
328 | self._engines = ReverseDict() | |
329 |
self._ids = |
|
329 | self._ids = [] | |
330 | self.outstanding=set() |
|
330 | self.outstanding=set() | |
331 | self.results = {} |
|
331 | self.results = {} | |
332 | self.metadata = {} |
|
332 | self.metadata = {} | |
@@ -370,7 +370,8 class Client(object): | |||||
370 | for k,v in engines.iteritems(): |
|
370 | for k,v in engines.iteritems(): | |
371 | eid = int(k) |
|
371 | eid = int(k) | |
372 | self._engines[eid] = bytes(v) # force not unicode |
|
372 | self._engines[eid] = bytes(v) # force not unicode | |
373 |
self._ids.a |
|
373 | self._ids.append(eid) | |
|
374 | self._ids = sorted(self._ids) | |||
374 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ |
|
375 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ | |
375 | self._task_scheme == 'pure' and self._task_socket: |
|
376 | self._task_scheme == 'pure' and self._task_socket: | |
376 | self._stop_scheduling_tasks() |
|
377 | self._stop_scheduling_tasks() | |
@@ -470,7 +471,6 class Client(object): | |||||
470 | eid = content['id'] |
|
471 | eid = content['id'] | |
471 | d = {eid : content['queue']} |
|
472 | d = {eid : content['queue']} | |
472 | self._update_engines(d) |
|
473 | self._update_engines(d) | |
473 | self._ids.add(int(eid)) |
|
|||
474 |
|
474 | |||
475 | def _unregister_engine(self, msg): |
|
475 | def _unregister_engine(self, msg): | |
476 | """Unregister an engine that has died.""" |
|
476 | """Unregister an engine that has died.""" | |
@@ -664,9 +664,9 class Client(object): | |||||
664 | """property for convenient RemoteFunction generation. |
|
664 | """property for convenient RemoteFunction generation. | |
665 |
|
665 | |||
666 | >>> @client.remote |
|
666 | >>> @client.remote | |
667 |
... def |
|
667 | ... def getpid(): | |
668 | import os |
|
668 | import os | |
669 |
|
|
669 | return os.getpid() | |
670 | """ |
|
670 | """ | |
671 | return remote(self, block=self.block) |
|
671 | return remote(self, block=self.block) | |
672 |
|
672 | |||
@@ -867,6 +867,7 class Client(object): | |||||
867 | # pass to Dependency constructor |
|
867 | # pass to Dependency constructor | |
868 | return list(Dependency(dep)) |
|
868 | return list(Dependency(dep)) | |
869 |
|
869 | |||
|
870 | @defaultblock | |||
870 | def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None, |
|
871 | def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None, | |
871 | after=None, follow=None, timeout=None): |
|
872 | after=None, follow=None, timeout=None): | |
872 | """Call `f(*args, **kwargs)` on a remote engine(s), returning the result. |
|
873 | """Call `f(*args, **kwargs)` on a remote engine(s), returning the result. | |
@@ -903,24 +904,9 class Client(object): | |||||
903 | Run on each specified engine |
|
904 | Run on each specified engine | |
904 | if int: |
|
905 | if int: | |
905 | Run on single engine |
|
906 | Run on single engine | |
906 |
|
||||
907 | after : Dependency or collection of msg_ids |
|
|||
908 | Only for load-balanced execution (targets=None) |
|
|||
909 | Specify a list of msg_ids as a time-based dependency. |
|
|||
910 | This job will only be run *after* the dependencies |
|
|||
911 | have been met. |
|
|||
912 |
|
||||
913 | follow : Dependency or collection of msg_ids |
|
|||
914 | Only for load-balanced execution (targets=None) |
|
|||
915 | Specify a list of msg_ids as a location-based dependency. |
|
|||
916 | This job will only be run on an engine where this dependency |
|
|||
917 | is met. |
|
|||
918 |
|
907 | |||
919 | timeout : float/int or None |
|
908 | after,follow,timeout only used in `apply_balanced`. See that docstring | |
920 | Only for load-balanced execution (targets=None) |
|
909 | for details. | |
921 | Specify an amount of time (in seconds) for the scheduler to |
|
|||
922 | wait for dependencies to be met before failing with a |
|
|||
923 | DependencyTimeout. |
|
|||
924 |
|
910 | |||
925 | Returns |
|
911 | Returns | |
926 | ------- |
|
912 | ------- | |
@@ -947,25 +933,88 class Client(object): | |||||
947 | if not isinstance(kwargs, dict): |
|
933 | if not isinstance(kwargs, dict): | |
948 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) |
|
934 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) | |
949 |
|
935 | |||
950 | options = dict(bound=bound, block=block) |
|
936 | options = dict(bound=bound, block=block, targets=targets) | |
951 |
|
937 | |||
952 | if targets is None: |
|
938 | if targets is None: | |
953 | if self._task_socket: |
|
939 | return self.apply_balanced(f, args, kwargs, timeout=timeout, | |
954 | return self._apply_balanced(f, args, kwargs, timeout=timeout, |
|
940 | after=after, follow=follow, **options) | |
955 | after=after, follow=follow, **options) |
|
|||
956 | else: |
|
|||
957 | msg = "Task farming is disabled" |
|
|||
958 | if self._task_scheme == 'pure': |
|
|||
959 | msg += " because the pure ZMQ scheduler cannot handle" |
|
|||
960 | msg += " disappearing engines." |
|
|||
961 | raise RuntimeError(msg) |
|
|||
962 | else: |
|
941 | else: | |
963 | return self._apply_direct(f, args, kwargs, targets=targets, **options) |
|
942 | if follow or after or timeout: | |
|
943 | msg = "follow, after, and timeout args are only used for load-balanced" | |||
|
944 | msg += "execution." | |||
|
945 | raise ValueError(msg) | |||
|
946 | return self._apply_direct(f, args, kwargs, **options) | |||
964 |
|
947 | |||
965 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None, |
|
948 | @defaultblock | |
|
949 | def apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None, | |||
966 | after=None, follow=None, timeout=None): |
|
950 | after=None, follow=None, timeout=None): | |
967 | """The underlying method for applying functions in a load balanced |
|
951 | """call f(*args, **kwargs) remotely in a load-balanced manner. | |
968 | manner, via the task queue.""" |
|
952 | ||
|
953 | Parameters | |||
|
954 | ---------- | |||
|
955 | ||||
|
956 | f : function | |||
|
957 | The fuction to be called remotely | |||
|
958 | args : tuple/list | |||
|
959 | The positional arguments passed to `f` | |||
|
960 | kwargs : dict | |||
|
961 | The keyword arguments passed to `f` | |||
|
962 | bound : bool (default: True) | |||
|
963 | Whether to execute in the Engine(s) namespace, or in a clean | |||
|
964 | namespace not affecting the engine. | |||
|
965 | block : bool (default: self.block) | |||
|
966 | Whether to wait for the result, or return immediately. | |||
|
967 | False: | |||
|
968 | returns AsyncResult | |||
|
969 | True: | |||
|
970 | returns actual result(s) of f(*args, **kwargs) | |||
|
971 | if multiple targets: | |||
|
972 | list of results, matching `targets` | |||
|
973 | targets : int,list of ints, 'all', None | |||
|
974 | Specify the destination of the job. | |||
|
975 | if None: | |||
|
976 | Submit via Task queue for load-balancing. | |||
|
977 | if 'all': | |||
|
978 | Run on all active engines | |||
|
979 | if list: | |||
|
980 | Run on each specified engine | |||
|
981 | if int: | |||
|
982 | Run on single engine | |||
|
983 | ||||
|
984 | after : Dependency or collection of msg_ids | |||
|
985 | Only for load-balanced execution (targets=None) | |||
|
986 | Specify a list of msg_ids as a time-based dependency. | |||
|
987 | This job will only be run *after* the dependencies | |||
|
988 | have been met. | |||
|
989 | ||||
|
990 | follow : Dependency or collection of msg_ids | |||
|
991 | Only for load-balanced execution (targets=None) | |||
|
992 | Specify a list of msg_ids as a location-based dependency. | |||
|
993 | This job will only be run on an engine where this dependency | |||
|
994 | is met. | |||
|
995 | ||||
|
996 | timeout : float/int or None | |||
|
997 | Only for load-balanced execution (targets=None) | |||
|
998 | Specify an amount of time (in seconds) for the scheduler to | |||
|
999 | wait for dependencies to be met before failing with a | |||
|
1000 | DependencyTimeout. | |||
|
1001 | ||||
|
1002 | Returns | |||
|
1003 | ------- | |||
|
1004 | if block is False: | |||
|
1005 | return AsyncResult wrapping msg_id | |||
|
1006 | output of AsyncResult.get() is identical to that of `apply(...block=True)` | |||
|
1007 | else: | |||
|
1008 | wait for, and return actual result of `f(*args, **kwargs)` | |||
|
1009 | ||||
|
1010 | """ | |||
|
1011 | ||||
|
1012 | if self._task_socket is None: | |||
|
1013 | msg = "Task farming is disabled" | |||
|
1014 | if self._task_scheme == 'pure': | |||
|
1015 | msg += " because the pure ZMQ scheduler cannot handle" | |||
|
1016 | msg += " disappearing engines." | |||
|
1017 | raise RuntimeError(msg) | |||
969 |
|
1018 | |||
970 | if self._task_scheme == 'pure': |
|
1019 | if self._task_scheme == 'pure': | |
971 | # pure zmq scheme doesn't support dependencies |
|
1020 | # pure zmq scheme doesn't support dependencies | |
@@ -978,9 +1027,26 class Client(object): | |||||
978 | warnings.warn(msg, RuntimeWarning) |
|
1027 | warnings.warn(msg, RuntimeWarning) | |
979 |
|
1028 | |||
980 |
|
1029 | |||
|
1030 | # defaults: | |||
|
1031 | args = args if args is not None else [] | |||
|
1032 | kwargs = kwargs if kwargs is not None else {} | |||
|
1033 | ||||
|
1034 | if targets: | |||
|
1035 | idents,_ = self._build_targets(targets) | |||
|
1036 | else: | |||
|
1037 | idents = [] | |||
|
1038 | ||||
|
1039 | # enforce types of f,args,kwrags | |||
|
1040 | if not callable(f): | |||
|
1041 | raise TypeError("f must be callable, not %s"%type(f)) | |||
|
1042 | if not isinstance(args, (tuple, list)): | |||
|
1043 | raise TypeError("args must be tuple or list, not %s"%type(args)) | |||
|
1044 | if not isinstance(kwargs, dict): | |||
|
1045 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) | |||
|
1046 | ||||
981 | after = self._build_dependency(after) |
|
1047 | after = self._build_dependency(after) | |
982 | follow = self._build_dependency(follow) |
|
1048 | follow = self._build_dependency(follow) | |
983 | subheader = dict(after=after, follow=follow, timeout=timeout) |
|
1049 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents) | |
984 | bufs = ss.pack_apply_message(f,args,kwargs) |
|
1050 | bufs = ss.pack_apply_message(f,args,kwargs) | |
985 | content = dict(bound=bound) |
|
1051 | content = dict(bound=bound) | |
986 |
|
1052 | |||
@@ -991,31 +1057,40 class Client(object): | |||||
991 | self.history.append(msg_id) |
|
1057 | self.history.append(msg_id) | |
992 | ar = AsyncResult(self, [msg_id], fname=f.__name__) |
|
1058 | ar = AsyncResult(self, [msg_id], fname=f.__name__) | |
993 | if block: |
|
1059 | if block: | |
994 |
|
|
1060 | try: | |
|
1061 | return ar.get() | |||
|
1062 | except KeyboardInterrupt: | |||
|
1063 | return ar | |||
995 | else: |
|
1064 | else: | |
996 | return ar |
|
1065 | return ar | |
997 |
|
1066 | |||
998 | def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None): |
|
1067 | def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None): | |
999 | """Then underlying method for applying functions to specific engines |
|
1068 | """Then underlying method for applying functions to specific engines | |
1000 |
via the MUX queue. |
|
1069 | via the MUX queue. | |
|
1070 | ||||
|
1071 | Not to be called directly! | |||
|
1072 | """ | |||
1001 |
|
1073 | |||
1002 |
|
|
1074 | idents,targets = self._build_targets(targets) | |
1003 |
|
1075 | |||
1004 | subheader = {} |
|
1076 | subheader = {} | |
1005 | content = dict(bound=bound) |
|
1077 | content = dict(bound=bound) | |
1006 | bufs = ss.pack_apply_message(f,args,kwargs) |
|
1078 | bufs = ss.pack_apply_message(f,args,kwargs) | |
1007 |
|
1079 | |||
1008 | msg_ids = [] |
|
1080 | msg_ids = [] | |
1009 |
for |
|
1081 | for ident in idents: | |
1010 | msg = self.session.send(self._mux_socket, "apply_request", |
|
1082 | msg = self.session.send(self._mux_socket, "apply_request", | |
1011 |
content=content, buffers=bufs,ident= |
|
1083 | content=content, buffers=bufs, ident=ident, subheader=subheader) | |
1012 | msg_id = msg['msg_id'] |
|
1084 | msg_id = msg['msg_id'] | |
1013 | self.outstanding.add(msg_id) |
|
1085 | self.outstanding.add(msg_id) | |
1014 | self.history.append(msg_id) |
|
1086 | self.history.append(msg_id) | |
1015 | msg_ids.append(msg_id) |
|
1087 | msg_ids.append(msg_id) | |
1016 | ar = AsyncResult(self, msg_ids, fname=f.__name__) |
|
1088 | ar = AsyncResult(self, msg_ids, fname=f.__name__) | |
1017 | if block: |
|
1089 | if block: | |
1018 |
|
|
1090 | try: | |
|
1091 | return ar.get() | |||
|
1092 | except KeyboardInterrupt: | |||
|
1093 | return ar | |||
1019 | else: |
|
1094 | else: | |
1020 | return ar |
|
1095 | return ar | |
1021 |
|
1096 | |||
@@ -1037,6 +1112,15 class Client(object): | |||||
1037 | """Decorator for making a RemoteFunction.""" |
|
1112 | """Decorator for making a RemoteFunction.""" | |
1038 | return remote(self, bound=bound, targets=targets, block=block) |
|
1113 | return remote(self, bound=bound, targets=targets, block=block) | |
1039 |
|
1114 | |||
|
1115 | def view(self, targets=None, balanced=False): | |||
|
1116 | """Method for constructing View objects""" | |||
|
1117 | if not balanced: | |||
|
1118 | if not targets: | |||
|
1119 | targets = slice(None) | |||
|
1120 | return self[targets] | |||
|
1121 | else: | |||
|
1122 | return LoadBalancedView(self, targets) | |||
|
1123 | ||||
1040 | #-------------------------------------------------------------------------- |
|
1124 | #-------------------------------------------------------------------------- | |
1041 | # Data movement |
|
1125 | # Data movement | |
1042 | #-------------------------------------------------------------------------- |
|
1126 | #-------------------------------------------------------------------------- |
@@ -265,6 +265,9 class TaskScheduler(SessionFactory): | |||||
265 | msg_id = header['msg_id'] |
|
265 | msg_id = header['msg_id'] | |
266 | self.all_ids.add(msg_id) |
|
266 | self.all_ids.add(msg_id) | |
267 |
|
267 | |||
|
268 | # targets | |||
|
269 | targets = set(header.get('targets', [])) | |||
|
270 | ||||
268 | # time dependencies |
|
271 | # time dependencies | |
269 | after = Dependency(header.get('after', [])) |
|
272 | after = Dependency(header.get('after', [])) | |
270 | if after.all: |
|
273 | if after.all: | |
@@ -279,28 +282,31 class TaskScheduler(SessionFactory): | |||||
279 | # location dependencies |
|
282 | # location dependencies | |
280 | follow = Dependency(header.get('follow', [])) |
|
283 | follow = Dependency(header.get('follow', [])) | |
281 |
|
284 | |||
|
285 | # turn timeouts into datetime objects: | |||
|
286 | timeout = header.get('timeout', None) | |||
|
287 | if timeout: | |||
|
288 | timeout = datetime.now() + timedelta(0,timeout,0) | |||
|
289 | ||||
|
290 | args = [raw_msg, targets, after, follow, timeout] | |||
|
291 | ||||
|
292 | # validate and reduce dependencies: | |||
282 | for dep in after,follow: |
|
293 | for dep in after,follow: | |
283 | # check valid: |
|
294 | # check valid: | |
284 | if msg_id in dep or dep.difference(self.all_ids): |
|
295 | if msg_id in dep or dep.difference(self.all_ids): | |
285 |
self.depending[msg_id] = |
|
296 | self.depending[msg_id] = args | |
286 | return self.fail_unreachable(msg_id, error.InvalidDependency) |
|
297 | return self.fail_unreachable(msg_id, error.InvalidDependency) | |
287 | # check if unreachable: |
|
298 | # check if unreachable: | |
288 | if dep.unreachable(self.all_failed): |
|
299 | if dep.unreachable(self.all_failed): | |
289 |
self.depending[msg_id] = |
|
300 | self.depending[msg_id] = args | |
290 | return self.fail_unreachable(msg_id) |
|
301 | return self.fail_unreachable(msg_id) | |
291 |
|
302 | |||
292 | # turn timeouts into datetime objects: |
|
|||
293 | timeout = header.get('timeout', None) |
|
|||
294 | if timeout: |
|
|||
295 | timeout = datetime.now() + timedelta(0,timeout,0) |
|
|||
296 |
|
||||
297 | if after.check(self.all_completed, self.all_failed): |
|
303 | if after.check(self.all_completed, self.all_failed): | |
298 | # time deps already met, try to run |
|
304 | # time deps already met, try to run | |
299 |
if not self.maybe_run(msg_id, |
|
305 | if not self.maybe_run(msg_id, *args): | |
300 | # can't run yet |
|
306 | # can't run yet | |
301 |
self.save_unmet(msg_id, |
|
307 | self.save_unmet(msg_id, *args) | |
302 | else: |
|
308 | else: | |
303 |
self.save_unmet(msg_id, |
|
309 | self.save_unmet(msg_id, *args) | |
304 |
|
310 | |||
305 | # @logged |
|
311 | # @logged | |
306 | def audit_timeouts(self): |
|
312 | def audit_timeouts(self): | |
@@ -309,17 +315,18 class TaskScheduler(SessionFactory): | |||||
309 | for msg_id in self.depending.keys(): |
|
315 | for msg_id in self.depending.keys(): | |
310 | # must recheck, in case one failure cascaded to another: |
|
316 | # must recheck, in case one failure cascaded to another: | |
311 | if msg_id in self.depending: |
|
317 | if msg_id in self.depending: | |
312 | raw,after,follow,timeout = self.depending[msg_id] |
|
318 | raw,after,targets,follow,timeout = self.depending[msg_id] | |
313 | if timeout and timeout < now: |
|
319 | if timeout and timeout < now: | |
314 | self.fail_unreachable(msg_id, timeout=True) |
|
320 | self.fail_unreachable(msg_id, timeout=True) | |
315 |
|
321 | |||
316 | @logged |
|
322 | @logged | |
317 | def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): |
|
323 | def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): | |
318 | """a message has become unreachable""" |
|
324 | """a task has become unreachable, send a reply with an ImpossibleDependency | |
|
325 | error.""" | |||
319 | if msg_id not in self.depending: |
|
326 | if msg_id not in self.depending: | |
320 | self.log.error("msg %r already failed!"%msg_id) |
|
327 | self.log.error("msg %r already failed!"%msg_id) | |
321 | return |
|
328 | return | |
322 |
raw_msg, |
|
329 | raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id) | |
323 | for mid in follow.union(after): |
|
330 | for mid in follow.union(after): | |
324 | if mid in self.graph: |
|
331 | if mid in self.graph: | |
325 | self.graph[mid].remove(msg_id) |
|
332 | self.graph[mid].remove(msg_id) | |
@@ -344,37 +351,51 class TaskScheduler(SessionFactory): | |||||
344 | self.update_graph(msg_id, success=False) |
|
351 | self.update_graph(msg_id, success=False) | |
345 |
|
352 | |||
346 | @logged |
|
353 | @logged | |
347 |
def maybe_run(self, msg_id, raw_msg, follow |
|
354 | def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): | |
348 | """check location dependencies, and run if they are met.""" |
|
355 | """check location dependencies, and run if they are met.""" | |
349 |
|
356 | blacklist = self.blacklist.setdefault(msg_id, set()) | ||
350 | if follow: |
|
357 | if follow or targets or blacklist: | |
|
358 | # we need a can_run filter | |||
351 | def can_run(idx): |
|
359 | def can_run(idx): | |
352 | target = self.targets[idx] |
|
360 | target = self.targets[idx] | |
353 | return target not in self.blacklist.get(msg_id, []) and\ |
|
361 | # check targets | |
354 | follow.check(self.completed[target], self.failed[target]) |
|
362 | if targets and target not in targets: | |
|
363 | return False | |||
|
364 | # check blacklist | |||
|
365 | if target in blacklist: | |||
|
366 | return False | |||
|
367 | # check follow | |||
|
368 | return follow.check(self.completed[target], self.failed[target]) | |||
355 |
|
369 | |||
356 | indices = filter(can_run, range(len(self.targets))) |
|
370 | indices = filter(can_run, range(len(self.targets))) | |
357 | if not indices: |
|
371 | if not indices: | |
|
372 | # couldn't run | |||
358 | if follow.all: |
|
373 | if follow.all: | |
|
374 | # check follow for impossibility | |||
359 | dests = set() |
|
375 | dests = set() | |
360 | relevant = self.all_completed if follow.success_only else self.all_done |
|
376 | relevant = self.all_completed if follow.success_only else self.all_done | |
361 | for m in follow.intersection(relevant): |
|
377 | for m in follow.intersection(relevant): | |
362 | dests.add(self.destinations[m]) |
|
378 | dests.add(self.destinations[m]) | |
363 | if len(dests) > 1: |
|
379 | if len(dests) > 1: | |
364 | self.fail_unreachable(msg_id) |
|
380 | self.fail_unreachable(msg_id) | |
365 |
|
381 | return False | ||
366 |
|
382 | if targets: | ||
|
383 | # check blacklist+targets for impossibility | |||
|
384 | targets.difference_update(blacklist) | |||
|
385 | if not targets or not targets.intersection(self.targets): | |||
|
386 | self.fail_unreachable(msg_id) | |||
|
387 | return False | |||
367 | return False |
|
388 | return False | |
368 | else: |
|
389 | else: | |
369 | indices = None |
|
390 | indices = None | |
370 |
|
391 | |||
371 | self.submit_task(msg_id, raw_msg, follow, timeout, indices) |
|
392 | self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices) | |
372 | return True |
|
393 | return True | |
373 |
|
394 | |||
374 | @logged |
|
395 | @logged | |
375 | def save_unmet(self, msg_id, raw_msg, after, follow, timeout): |
|
396 | def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout): | |
376 | """Save a message for later submission when its dependencies are met.""" |
|
397 | """Save a message for later submission when its dependencies are met.""" | |
377 | self.depending[msg_id] = [raw_msg,after,follow,timeout] |
|
398 | self.depending[msg_id] = [raw_msg,targets,after,follow,timeout] | |
378 | # track the ids in follow or after, but not those already finished |
|
399 | # track the ids in follow or after, but not those already finished | |
379 | for dep_id in after.union(follow).difference(self.all_done): |
|
400 | for dep_id in after.union(follow).difference(self.all_done): | |
380 | if dep_id not in self.graph: |
|
401 | if dep_id not in self.graph: | |
@@ -382,7 +403,7 class TaskScheduler(SessionFactory): | |||||
382 | self.graph[dep_id].add(msg_id) |
|
403 | self.graph[dep_id].add(msg_id) | |
383 |
|
404 | |||
384 | @logged |
|
405 | @logged | |
385 | def submit_task(self, msg_id, raw_msg, follow, timeout, indices=None): |
|
406 | def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None): | |
386 | """Submit a task to any of a subset of our targets.""" |
|
407 | """Submit a task to any of a subset of our targets.""" | |
387 | if indices: |
|
408 | if indices: | |
388 | loads = [self.loads[i] for i in indices] |
|
409 | loads = [self.loads[i] for i in indices] | |
@@ -396,7 +417,7 class TaskScheduler(SessionFactory): | |||||
396 | self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) |
|
417 | self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) | |
397 | self.engine_stream.send_multipart(raw_msg, copy=False) |
|
418 | self.engine_stream.send_multipart(raw_msg, copy=False) | |
398 | self.add_job(idx) |
|
419 | self.add_job(idx) | |
399 | self.pending[target][msg_id] = (raw_msg, follow, timeout) |
|
420 | self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout) | |
400 | content = dict(msg_id=msg_id, engine_id=target) |
|
421 | content = dict(msg_id=msg_id, engine_id=target) | |
401 | self.session.send(self.mon_stream, 'task_destination', content=content, |
|
422 | self.session.send(self.mon_stream, 'task_destination', content=content, | |
402 | ident=['tracktask',self.session.session]) |
|
423 | ident=['tracktask',self.session.session]) | |
@@ -406,6 +427,7 class TaskScheduler(SessionFactory): | |||||
406 | #----------------------------------------------------------------------- |
|
427 | #----------------------------------------------------------------------- | |
407 | @logged |
|
428 | @logged | |
408 | def dispatch_result(self, raw_msg): |
|
429 | def dispatch_result(self, raw_msg): | |
|
430 | """dispatch method for result replies""" | |||
409 | try: |
|
431 | try: | |
410 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
432 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | |
411 | msg = self.session.unpack_message(msg, content=False, copy=False) |
|
433 | msg = self.session.unpack_message(msg, content=False, copy=False) | |
@@ -424,6 +446,7 class TaskScheduler(SessionFactory): | |||||
424 |
|
446 | |||
425 | @logged |
|
447 | @logged | |
426 | def handle_result(self, idents, parent, raw_msg, success=True): |
|
448 | def handle_result(self, idents, parent, raw_msg, success=True): | |
|
449 | """handle a real task result, either success or failure""" | |||
427 | # first, relay result to client |
|
450 | # first, relay result to client | |
428 | engine = idents[0] |
|
451 | engine = idents[0] | |
429 | client = idents[1] |
|
452 | client = idents[1] | |
@@ -448,21 +471,30 class TaskScheduler(SessionFactory): | |||||
448 |
|
471 | |||
449 | @logged |
|
472 | @logged | |
450 | def handle_unmet_dependency(self, idents, parent): |
|
473 | def handle_unmet_dependency(self, idents, parent): | |
|
474 | """handle an unmet dependency""" | |||
451 | engine = idents[0] |
|
475 | engine = idents[0] | |
452 | msg_id = parent['msg_id'] |
|
476 | msg_id = parent['msg_id'] | |
|
477 | ||||
453 | if msg_id not in self.blacklist: |
|
478 | if msg_id not in self.blacklist: | |
454 | self.blacklist[msg_id] = set() |
|
479 | self.blacklist[msg_id] = set() | |
455 | self.blacklist[msg_id].add(engine) |
|
480 | self.blacklist[msg_id].add(engine) | |
456 | raw_msg,follow,timeout = self.pending[engine].pop(msg_id) |
|
481 | ||
457 | if not self.maybe_run(msg_id, raw_msg, follow, timeout): |
|
482 | args = self.pending[engine].pop(msg_id) | |
|
483 | raw,targets,after,follow,timeout = args | |||
|
484 | ||||
|
485 | if self.blacklist[msg_id] == targets: | |||
|
486 | self.depending[msg_id] = args | |||
|
487 | return self.fail_unreachable(msg_id) | |||
|
488 | ||||
|
489 | elif not self.maybe_run(msg_id, *args): | |||
458 | # resubmit failed, put it back in our dependency tree |
|
490 | # resubmit failed, put it back in our dependency tree | |
459 |
self.save_unmet(msg_id, |
|
491 | self.save_unmet(msg_id, *args) | |
460 |
|
|
492 | ||
461 |
|
493 | |||
462 | @logged |
|
494 | @logged | |
463 | def update_graph(self, dep_id, success=True): |
|
495 | def update_graph(self, dep_id, success=True): | |
464 | """dep_id just finished. Update our dependency |
|
496 | """dep_id just finished. Update our dependency | |
465 |
|
|
497 | graph and submit any jobs that just became runable.""" | |
466 | # print ("\n\n***********") |
|
498 | # print ("\n\n***********") | |
467 | # pprint (dep_id) |
|
499 | # pprint (dep_id) | |
468 | # pprint (self.graph) |
|
500 | # pprint (self.graph) | |
@@ -475,7 +507,7 class TaskScheduler(SessionFactory): | |||||
475 | jobs = self.graph.pop(dep_id) |
|
507 | jobs = self.graph.pop(dep_id) | |
476 |
|
508 | |||
477 | for msg_id in jobs: |
|
509 | for msg_id in jobs: | |
478 | raw_msg, after, follow, timeout = self.depending[msg_id] |
|
510 | raw_msg, targets, after, follow, timeout = self.depending[msg_id] | |
479 | # if dep_id in after: |
|
511 | # if dep_id in after: | |
480 | # if after.all and (success or not after.success_only): |
|
512 | # if after.all and (success or not after.success_only): | |
481 | # after.remove(dep_id) |
|
513 | # after.remove(dep_id) | |
@@ -484,8 +516,7 class TaskScheduler(SessionFactory): | |||||
484 | self.fail_unreachable(msg_id) |
|
516 | self.fail_unreachable(msg_id) | |
485 |
|
517 | |||
486 | elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run |
|
518 | elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run | |
487 | self.depending[msg_id][1] = MET |
|
519 | if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout): | |
488 | if self.maybe_run(msg_id, raw_msg, follow, timeout): |
|
|||
489 |
|
520 | |||
490 | self.depending.pop(msg_id) |
|
521 | self.depending.pop(msg_id) | |
491 | for mid in follow.union(after): |
|
522 | for mid in follow.union(after): |
@@ -66,10 +66,15 class View(object): | |||||
66 |
|
66 | |||
67 | Don't use this class, use subclasses. |
|
67 | Don't use this class, use subclasses. | |
68 | """ |
|
68 | """ | |
69 | _targets = None |
|
|||
70 | block=None |
|
69 | block=None | |
71 | bound=None |
|
70 | bound=None | |
72 | history=None |
|
71 | history=None | |
|
72 | outstanding = set() | |||
|
73 | results = {} | |||
|
74 | ||||
|
75 | _targets = None | |||
|
76 | _apply_name = 'apply' | |||
|
77 | _default_names = ['targets', 'block'] | |||
73 |
|
78 | |||
74 | def __init__(self, client, targets=None): |
|
79 | def __init__(self, client, targets=None): | |
75 | self.client = client |
|
80 | self.client = client | |
@@ -80,6 +85,9 class View(object): | |||||
80 | self.history = [] |
|
85 | self.history = [] | |
81 | self.outstanding = set() |
|
86 | self.outstanding = set() | |
82 | self.results = {} |
|
87 | self.results = {} | |
|
88 | for name in self._default_names: | |||
|
89 | setattr(self, name, getattr(self, name, None)) | |||
|
90 | ||||
83 |
|
91 | |||
84 | def __repr__(self): |
|
92 | def __repr__(self): | |
85 | strtargets = str(self._targets) |
|
93 | strtargets = str(self._targets) | |
@@ -95,11 +103,23 class View(object): | |||||
95 | def targets(self, value): |
|
103 | def targets(self, value): | |
96 | self._targets = value |
|
104 | self._targets = value | |
97 | # raise AttributeError("Cannot set my targets argument after construction!") |
|
105 | # raise AttributeError("Cannot set my targets argument after construction!") | |
98 |
|
106 | |||
|
107 | def _defaults(self, *excludes): | |||
|
108 | """return dict of our default attributes, excluding names given.""" | |||
|
109 | d = {} | |||
|
110 | for name in self._default_names: | |||
|
111 | if name not in excludes: | |||
|
112 | d[name] = getattr(self, name) | |||
|
113 | return d | |||
|
114 | ||||
99 | @sync_results |
|
115 | @sync_results | |
100 | def spin(self): |
|
116 | def spin(self): | |
101 | """spin the client, and sync""" |
|
117 | """spin the client, and sync""" | |
102 | self.client.spin() |
|
118 | self.client.spin() | |
|
119 | ||||
|
120 | @property | |||
|
121 | def _apply(self): | |||
|
122 | return getattr(self.client, self._apply_name) | |||
103 |
|
123 | |||
104 | @sync_results |
|
124 | @sync_results | |
105 | @save_ids |
|
125 | @save_ids | |
@@ -113,7 +133,7 class View(object): | |||||
113 | else: |
|
133 | else: | |
114 | returns actual result of f(*args, **kwargs) |
|
134 | returns actual result of f(*args, **kwargs) | |
115 | """ |
|
135 | """ | |
116 |
return self. |
|
136 | return self._apply(f, args, kwargs, **self._defaults()) | |
117 |
|
137 | |||
118 | @save_ids |
|
138 | @save_ids | |
119 | def apply_async(self, f, *args, **kwargs): |
|
139 | def apply_async(self, f, *args, **kwargs): | |
@@ -123,7 +143,8 class View(object): | |||||
123 |
|
143 | |||
124 | returns msg_id |
|
144 | returns msg_id | |
125 | """ |
|
145 | """ | |
126 | return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False) |
|
146 | d = self._defaults('block', 'bound') | |
|
147 | return self._apply(f,args,kwargs, block=False, bound=False, **d) | |||
127 |
|
148 | |||
128 | @spin_after |
|
149 | @spin_after | |
129 | @save_ids |
|
150 | @save_ids | |
@@ -135,7 +156,8 class View(object): | |||||
135 |
|
156 | |||
136 | returns: actual result of f(*args, **kwargs) |
|
157 | returns: actual result of f(*args, **kwargs) | |
137 | """ |
|
158 | """ | |
138 | return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False) |
|
159 | d = self._defaults('block', 'bound') | |
|
160 | return self._apply(f,args,kwargs, block=True, bound=False, **d) | |||
139 |
|
161 | |||
140 | @sync_results |
|
162 | @sync_results | |
141 | @save_ids |
|
163 | @save_ids | |
@@ -150,7 +172,8 class View(object): | |||||
150 | This method has access to the targets' globals |
|
172 | This method has access to the targets' globals | |
151 |
|
173 | |||
152 | """ |
|
174 | """ | |
153 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True) |
|
175 | d = self._defaults('bound') | |
|
176 | return self._apply(f, args, kwargs, bound=True, **d) | |||
154 |
|
177 | |||
155 | @sync_results |
|
178 | @sync_results | |
156 | @save_ids |
|
179 | @save_ids | |
@@ -163,7 +186,8 class View(object): | |||||
163 | This method has access to the targets' globals |
|
186 | This method has access to the targets' globals | |
164 |
|
187 | |||
165 | """ |
|
188 | """ | |
166 | return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True) |
|
189 | d = self._defaults('block', 'bound') | |
|
190 | return self._apply(f, args, kwargs, block=False, bound=True, **d) | |||
167 |
|
191 | |||
168 | @spin_after |
|
192 | @spin_after | |
169 | @save_ids |
|
193 | @save_ids | |
@@ -175,7 +199,8 class View(object): | |||||
175 | This method has access to the targets' globals |
|
199 | This method has access to the targets' globals | |
176 |
|
200 | |||
177 | """ |
|
201 | """ | |
178 | return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True) |
|
202 | d = self._defaults('block', 'bound') | |
|
203 | return self._apply(f, args, kwargs, block=True, bound=True, **d) | |||
179 |
|
204 | |||
180 | @spin_after |
|
205 | @spin_after | |
181 | @save_ids |
|
206 | @save_ids | |
@@ -337,24 +362,22 class LoadBalancedView(View): | |||||
337 |
|
362 | |||
338 | Typically created via: |
|
363 | Typically created via: | |
339 |
|
364 | |||
340 |
>>> |
|
365 | >>> v = client[None] | |
341 |
<LoadBalancedView |
|
366 | <LoadBalancedView None> | |
342 |
|
367 | |||
343 | but can also be created with: |
|
368 | but can also be created with: | |
344 |
|
369 | |||
345 | >>> lbc = LoadBalancedView(client) |
|
370 | >>> v = client.view([1,3],balanced=True) | |
|
371 | ||||
|
372 | which would restrict loadbalancing to between engines 1 and 3. | |||
346 |
|
373 | |||
347 | TODO: allow subset of engines across which to balance. |
|
|||
348 | """ |
|
374 | """ | |
349 | def __repr__(self): |
|
|||
350 | return "<%s %s>"%(self.__class__.__name__, self.client._config['url']) |
|
|||
351 |
|
375 | |||
352 | @property |
|
376 | _apply_name = 'apply_balanced' | |
353 | def targets(self): |
|
377 | _default_names = ['targets', 'block', 'bound', 'follow', 'after', 'timeout'] | |
354 | return None |
|
378 | ||
355 |
|
379 | def __init__(self, client, targets=None): | ||
356 | @targets.setter |
|
380 | super(LoadBalancedView, self).__init__(client, targets) | |
357 | def targets(self, value): |
|
381 | self._ntargets = 1 | |
358 | raise AttributeError("Cannot set targets for LoadbalancedView!") |
|
382 | self._apply_name = 'apply_balanced' | |
359 |
|
383 | |||
360 | No newline at end of file |
|
General Comments 0
You need to be logged in to leave comments.
Login now