##// END OF EJS Templates
match return shape in AsyncResult to sync results
MinRK -
Show More
@@ -21,9 +21,10 b' class AsyncResult(object):'
21
21
22 Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
22 Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
23 """
23 """
24 def __init__(self, client, msg_ids):
24 def __init__(self, client, msg_ids, targets=None):
25 self._client = client
25 self._client = client
26 self.msg_ids = msg_ids
26 self.msg_ids = msg_ids
27 self._targets=targets
27 self._ready = False
28 self._ready = False
28 self._success = None
29 self._success = None
29
30
@@ -41,6 +42,8 b' class AsyncResult(object):'
41 """
42 """
42 if len(res) == 1:
43 if len(res) == 1:
43 return res[0]
44 return res[0]
45 elif self.targets is not None:
46 return dict(zip(self._targets, res))
44 else:
47 else:
45 return res
48 return res
46
49
@@ -632,7 +632,7 b' class Client(object):'
632 whether or not to wait until done
632 whether or not to wait until done
633
633
634 """
634 """
635 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
635 result = self.apply(_execute, (code,), targets=None, block=block, bound=False)
636 return result
636 return result
637
637
638 def _maybe_raise(self, result):
638 def _maybe_raise(self, result):
@@ -721,6 +721,18 b' class Client(object):'
721 if not isinstance(kwargs, dict):
721 if not isinstance(kwargs, dict):
722 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
722 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
723
723
724 if isinstance(after, Dependency):
725 after = after.as_dict()
726 elif isinstance(after, AsyncResult):
727 after=after.msg_ids
728 elif after is None:
729 after = []
730 if isinstance(follow, Dependency):
731 follow = follow.as_dict()
732 elif isinstance(follow, AsyncResult):
733 follow=follow.msg_ids
734 elif follow is None:
735 follow = []
724 options = dict(bound=bound, block=block, after=after, follow=follow)
736 options = dict(bound=bound, block=block, after=after, follow=follow)
725
737
726 if targets is None:
738 if targets is None:
@@ -732,18 +744,11 b' class Client(object):'
732 after=None, follow=None):
744 after=None, follow=None):
733 """The underlying method for applying functions in a load balanced
745 """The underlying method for applying functions in a load balanced
734 manner, via the task queue."""
746 manner, via the task queue."""
735 if isinstance(after, Dependency):
736 after = after.as_dict()
737 elif after is None:
738 after = []
739 if isinstance(follow, Dependency):
740 follow = follow.as_dict()
741 elif follow is None:
742 follow = []
743 subheader = dict(after=after, follow=follow)
744
747
748 subheader = dict(after=after, follow=follow)
745 bufs = ss.pack_apply_message(f,args,kwargs)
749 bufs = ss.pack_apply_message(f,args,kwargs)
746 content = dict(bound=bound)
750 content = dict(bound=bound)
751
747 msg = self.session.send(self._task_socket, "apply_request",
752 msg = self.session.send(self._task_socket, "apply_request",
748 content=content, buffers=bufs, subheader=subheader)
753 content=content, buffers=bufs, subheader=subheader)
749 msg_id = msg['msg_id']
754 msg_id = msg['msg_id']
@@ -761,17 +766,11 b' class Client(object):'
761 via the MUX queue."""
766 via the MUX queue."""
762
767
763 queues,targets = self._build_targets(targets)
768 queues,targets = self._build_targets(targets)
764 bufs = ss.pack_apply_message(f,args,kwargs)
769
765 if isinstance(after, Dependency):
766 after = after.as_dict()
767 elif after is None:
768 after = []
769 if isinstance(follow, Dependency):
770 follow = follow.as_dict()
771 elif follow is None:
772 follow = []
773 subheader = dict(after=after, follow=follow)
770 subheader = dict(after=after, follow=follow)
774 content = dict(bound=bound)
771 content = dict(bound=bound)
772 bufs = ss.pack_apply_message(f,args,kwargs)
773
775 msg_ids = []
774 msg_ids = []
776 for queue in queues:
775 for queue in queues:
777 msg = self.session.send(self._mux_socket, "apply_request",
776 msg = self.session.send(self._mux_socket, "apply_request",
@@ -783,7 +782,7 b' class Client(object):'
783 if block:
782 if block:
784 self.barrier(msg_ids)
783 self.barrier(msg_ids)
785 else:
784 else:
786 return AsyncResult(self, msg_ids)
785 return AsyncResult(self, msg_ids, targets=targets)
787 if len(msg_ids) == 1:
786 if len(msg_ids) == 1:
788 return self._maybe_raise(self.results[msg_ids[0]])
787 return self._maybe_raise(self.results[msg_ids[0]])
789 else:
788 else:
@@ -850,7 +849,7 b' class Client(object):'
850 else:
849 else:
851 r = self.push({key: partition}, targets=engineid, block=False)
850 r = self.push({key: partition}, targets=engineid, block=False)
852 msg_ids.extend(r.msg_ids)
851 msg_ids.extend(r.msg_ids)
853 r = AsyncResult(self, msg_ids)
852 r = AsyncResult(self, msg_ids,targets)
854 if block:
853 if block:
855 return r.get()
854 return r.get()
856 else:
855 else:
@@ -263,21 +263,19 b' class DirectView(View):'
263 Partition a Python sequence and send the partitions to a set of engines.
263 Partition a Python sequence and send the partitions to a set of engines.
264 """
264 """
265 block = block if block is not None else self.block
265 block = block if block is not None else self.block
266 if targets is None:
266 targets = targets if targets is not None else self.targets
267 targets = self.targets
268
267
269 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
268 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
270 targets=targets, block=block)
269 targets=targets, block=block)
271
270
272 @sync_results
271 @sync_results
273 @save_ids
272 @save_ids
274 def gather(self, key, dist='b', targets=None, block=True):
273 def gather(self, key, dist='b', targets=None, block=None):
275 """
274 """
276 Gather a partitioned sequence on a set of engines as a single local seq.
275 Gather a partitioned sequence on a set of engines as a single local seq.
277 """
276 """
278 block = block if block is not None else self.block
277 block = block if block is not None else self.block
279 if targets is None:
278 targets = targets if targets is not None else self.targets
280 targets = self.targets
281
279
282 return self.client.gather(key, dist=dist, targets=targets, block=block)
280 return self.client.gather(key, dist=dist, targets=targets, block=block)
283
281
General Comments 0
You need to be logged in to leave comments. Login now