Show More
@@ -21,9 +21,10 class AsyncResult(object): | |||
|
21 | 21 | |
|
22 | 22 | Provides the same interface as :py:class:`multiprocessing.AsyncResult`. |
|
23 | 23 | """ |
|
24 | def __init__(self, client, msg_ids): | |
|
24 | def __init__(self, client, msg_ids, targets=None): | |
|
25 | 25 | self._client = client |
|
26 | 26 | self.msg_ids = msg_ids |
|
27 | self._targets=targets | |
|
27 | 28 | self._ready = False |
|
28 | 29 | self._success = None |
|
29 | 30 | |
@@ -41,6 +42,8 class AsyncResult(object): | |||
|
41 | 42 | """ |
|
42 | 43 | if len(res) == 1: |
|
43 | 44 | return res[0] |
|
45 | elif self.targets is not None: | |
|
46 | return dict(zip(self._targets, res)) | |
|
44 | 47 | else: |
|
45 | 48 | return res |
|
46 | 49 |
@@ -632,7 +632,7 class Client(object): | |||
|
632 | 632 | whether or not to wait until done |
|
633 | 633 | |
|
634 | 634 | """ |
|
635 | result = self.apply(execute, (code,), targets=None, block=block, bound=False) | |
|
635 | result = self.apply(_execute, (code,), targets=None, block=block, bound=False) | |
|
636 | 636 | return result |
|
637 | 637 | |
|
638 | 638 | def _maybe_raise(self, result): |
@@ -721,6 +721,18 class Client(object): | |||
|
721 | 721 | if not isinstance(kwargs, dict): |
|
722 | 722 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) |
|
723 | 723 | |
|
724 | if isinstance(after, Dependency): | |
|
725 | after = after.as_dict() | |
|
726 | elif isinstance(after, AsyncResult): | |
|
727 | after=after.msg_ids | |
|
728 | elif after is None: | |
|
729 | after = [] | |
|
730 | if isinstance(follow, Dependency): | |
|
731 | follow = follow.as_dict() | |
|
732 | elif isinstance(follow, AsyncResult): | |
|
733 | follow=follow.msg_ids | |
|
734 | elif follow is None: | |
|
735 | follow = [] | |
|
724 | 736 | options = dict(bound=bound, block=block, after=after, follow=follow) |
|
725 | 737 | |
|
726 | 738 | if targets is None: |
@@ -732,18 +744,11 class Client(object): | |||
|
732 | 744 | after=None, follow=None): |
|
733 | 745 | """The underlying method for applying functions in a load balanced |
|
734 | 746 | manner, via the task queue.""" |
|
735 | if isinstance(after, Dependency): | |
|
736 | after = after.as_dict() | |
|
737 | elif after is None: | |
|
738 | after = [] | |
|
739 | if isinstance(follow, Dependency): | |
|
740 | follow = follow.as_dict() | |
|
741 | elif follow is None: | |
|
742 | follow = [] | |
|
743 | subheader = dict(after=after, follow=follow) | |
|
744 | 747 | |
|
748 | subheader = dict(after=after, follow=follow) | |
|
745 | 749 | bufs = ss.pack_apply_message(f,args,kwargs) |
|
746 | 750 | content = dict(bound=bound) |
|
751 | ||
|
747 | 752 | msg = self.session.send(self._task_socket, "apply_request", |
|
748 | 753 | content=content, buffers=bufs, subheader=subheader) |
|
749 | 754 | msg_id = msg['msg_id'] |
@@ -761,17 +766,11 class Client(object): | |||
|
761 | 766 | via the MUX queue.""" |
|
762 | 767 | |
|
763 | 768 | queues,targets = self._build_targets(targets) |
|
764 | bufs = ss.pack_apply_message(f,args,kwargs) | |
|
765 | if isinstance(after, Dependency): | |
|
766 | after = after.as_dict() | |
|
767 | elif after is None: | |
|
768 | after = [] | |
|
769 | if isinstance(follow, Dependency): | |
|
770 | follow = follow.as_dict() | |
|
771 | elif follow is None: | |
|
772 | follow = [] | |
|
769 | ||
|
773 | 770 | subheader = dict(after=after, follow=follow) |
|
774 | 771 | content = dict(bound=bound) |
|
772 | bufs = ss.pack_apply_message(f,args,kwargs) | |
|
773 | ||
|
775 | 774 | msg_ids = [] |
|
776 | 775 | for queue in queues: |
|
777 | 776 | msg = self.session.send(self._mux_socket, "apply_request", |
@@ -783,7 +782,7 class Client(object): | |||
|
783 | 782 | if block: |
|
784 | 783 | self.barrier(msg_ids) |
|
785 | 784 | else: |
|
786 | return AsyncResult(self, msg_ids) | |
|
785 | return AsyncResult(self, msg_ids, targets=targets) | |
|
787 | 786 | if len(msg_ids) == 1: |
|
788 | 787 | return self._maybe_raise(self.results[msg_ids[0]]) |
|
789 | 788 | else: |
@@ -850,7 +849,7 class Client(object): | |||
|
850 | 849 | else: |
|
851 | 850 | r = self.push({key: partition}, targets=engineid, block=False) |
|
852 | 851 | msg_ids.extend(r.msg_ids) |
|
853 | r = AsyncResult(self, msg_ids) | |
|
852 | r = AsyncResult(self, msg_ids,targets) | |
|
854 | 853 | if block: |
|
855 | 854 | return r.get() |
|
856 | 855 | else: |
@@ -263,21 +263,19 class DirectView(View): | |||
|
263 | 263 | Partition a Python sequence and send the partitions to a set of engines. |
|
264 | 264 | """ |
|
265 | 265 | block = block if block is not None else self.block |
|
266 | if targets is None: | |
|
267 | targets = self.targets | |
|
266 | targets = targets if targets is not None else self.targets | |
|
268 | 267 | |
|
269 | 268 | return self.client.scatter(key, seq, dist=dist, flatten=flatten, |
|
270 | 269 | targets=targets, block=block) |
|
271 | 270 | |
|
272 | 271 | @sync_results |
|
273 | 272 | @save_ids |
|
274 |
def gather(self, key, dist='b', targets=None, block= |
|
|
273 | def gather(self, key, dist='b', targets=None, block=None): | |
|
275 | 274 | """ |
|
276 | 275 | Gather a partitioned sequence on a set of engines as a single local seq. |
|
277 | 276 | """ |
|
278 | 277 | block = block if block is not None else self.block |
|
279 | if targets is None: | |
|
280 | targets = self.targets | |
|
278 | targets = targets if targets is not None else self.targets | |
|
281 | 279 | |
|
282 | 280 | return self.client.gather(key, dist=dist, targets=targets, block=block) |
|
283 | 281 |
General Comments 0
You need to be logged in to leave comments.
Login now