##// END OF EJS Templates
match return shape in AsyncResult to sync results
MinRK -
Show More
@@ -21,9 +21,10 class AsyncResult(object):
21 21
22 22 Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
23 23 """
24 def __init__(self, client, msg_ids):
24 def __init__(self, client, msg_ids, targets=None):
25 25 self._client = client
26 26 self.msg_ids = msg_ids
27 self._targets=targets
27 28 self._ready = False
28 29 self._success = None
29 30
@@ -41,6 +42,8 class AsyncResult(object):
41 42 """
42 43 if len(res) == 1:
43 44 return res[0]
45 elif self.targets is not None:
46 return dict(zip(self._targets, res))
44 47 else:
45 48 return res
46 49
@@ -632,7 +632,7 class Client(object):
632 632 whether or not to wait until done
633 633
634 634 """
635 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
635 result = self.apply(_execute, (code,), targets=None, block=block, bound=False)
636 636 return result
637 637
638 638 def _maybe_raise(self, result):
@@ -721,6 +721,18 class Client(object):
721 721 if not isinstance(kwargs, dict):
722 722 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
723 723
724 if isinstance(after, Dependency):
725 after = after.as_dict()
726 elif isinstance(after, AsyncResult):
727 after=after.msg_ids
728 elif after is None:
729 after = []
730 if isinstance(follow, Dependency):
731 follow = follow.as_dict()
732 elif isinstance(follow, AsyncResult):
733 follow=follow.msg_ids
734 elif follow is None:
735 follow = []
724 736 options = dict(bound=bound, block=block, after=after, follow=follow)
725 737
726 738 if targets is None:
@@ -732,18 +744,11 class Client(object):
732 744 after=None, follow=None):
733 745 """The underlying method for applying functions in a load balanced
734 746 manner, via the task queue."""
735 if isinstance(after, Dependency):
736 after = after.as_dict()
737 elif after is None:
738 after = []
739 if isinstance(follow, Dependency):
740 follow = follow.as_dict()
741 elif follow is None:
742 follow = []
743 subheader = dict(after=after, follow=follow)
744 747
748 subheader = dict(after=after, follow=follow)
745 749 bufs = ss.pack_apply_message(f,args,kwargs)
746 750 content = dict(bound=bound)
751
747 752 msg = self.session.send(self._task_socket, "apply_request",
748 753 content=content, buffers=bufs, subheader=subheader)
749 754 msg_id = msg['msg_id']
@@ -761,17 +766,11 class Client(object):
761 766 via the MUX queue."""
762 767
763 768 queues,targets = self._build_targets(targets)
764 bufs = ss.pack_apply_message(f,args,kwargs)
765 if isinstance(after, Dependency):
766 after = after.as_dict()
767 elif after is None:
768 after = []
769 if isinstance(follow, Dependency):
770 follow = follow.as_dict()
771 elif follow is None:
772 follow = []
769
773 770 subheader = dict(after=after, follow=follow)
774 771 content = dict(bound=bound)
772 bufs = ss.pack_apply_message(f,args,kwargs)
773
775 774 msg_ids = []
776 775 for queue in queues:
777 776 msg = self.session.send(self._mux_socket, "apply_request",
@@ -783,7 +782,7 class Client(object):
783 782 if block:
784 783 self.barrier(msg_ids)
785 784 else:
786 return AsyncResult(self, msg_ids)
785 return AsyncResult(self, msg_ids, targets=targets)
787 786 if len(msg_ids) == 1:
788 787 return self._maybe_raise(self.results[msg_ids[0]])
789 788 else:
@@ -850,7 +849,7 class Client(object):
850 849 else:
851 850 r = self.push({key: partition}, targets=engineid, block=False)
852 851 msg_ids.extend(r.msg_ids)
853 r = AsyncResult(self, msg_ids)
852 r = AsyncResult(self, msg_ids,targets)
854 853 if block:
855 854 return r.get()
856 855 else:
@@ -263,21 +263,19 class DirectView(View):
263 263 Partition a Python sequence and send the partitions to a set of engines.
264 264 """
265 265 block = block if block is not None else self.block
266 if targets is None:
267 targets = self.targets
266 targets = targets if targets is not None else self.targets
268 267
269 268 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
270 269 targets=targets, block=block)
271 270
272 271 @sync_results
273 272 @save_ids
274 def gather(self, key, dist='b', targets=None, block=True):
273 def gather(self, key, dist='b', targets=None, block=None):
275 274 """
276 275 Gather a partitioned sequence on a set of engines as a single local seq.
277 276 """
278 277 block = block if block is not None else self.block
279 if targets is None:
280 targets = self.targets
278 targets = targets if targets is not None else self.targets
281 279
282 280 return self.client.gather(key, dist=dist, targets=targets, block=block)
283 281
General Comments 0
You need to be logged in to leave comments. Login now