Show More
@@ -21,9 +21,10 b' class AsyncResult(object):' | |||||
21 |
|
21 | |||
22 | Provides the same interface as :py:class:`multiprocessing.AsyncResult`. |
|
22 | Provides the same interface as :py:class:`multiprocessing.AsyncResult`. | |
23 | """ |
|
23 | """ | |
24 | def __init__(self, client, msg_ids): |
|
24 | def __init__(self, client, msg_ids, targets=None): | |
25 | self._client = client |
|
25 | self._client = client | |
26 | self.msg_ids = msg_ids |
|
26 | self.msg_ids = msg_ids | |
|
27 | self._targets=targets | |||
27 | self._ready = False |
|
28 | self._ready = False | |
28 | self._success = None |
|
29 | self._success = None | |
29 |
|
30 | |||
@@ -41,6 +42,8 b' class AsyncResult(object):' | |||||
41 | """ |
|
42 | """ | |
42 | if len(res) == 1: |
|
43 | if len(res) == 1: | |
43 | return res[0] |
|
44 | return res[0] | |
|
45 | elif self.targets is not None: | |||
|
46 | return dict(zip(self._targets, res)) | |||
44 | else: |
|
47 | else: | |
45 | return res |
|
48 | return res | |
46 |
|
49 |
@@ -632,7 +632,7 b' class Client(object):' | |||||
632 | whether or not to wait until done |
|
632 | whether or not to wait until done | |
633 |
|
633 | |||
634 | """ |
|
634 | """ | |
635 | result = self.apply(execute, (code,), targets=None, block=block, bound=False) |
|
635 | result = self.apply(_execute, (code,), targets=None, block=block, bound=False) | |
636 | return result |
|
636 | return result | |
637 |
|
637 | |||
638 | def _maybe_raise(self, result): |
|
638 | def _maybe_raise(self, result): | |
@@ -721,6 +721,18 b' class Client(object):' | |||||
721 | if not isinstance(kwargs, dict): |
|
721 | if not isinstance(kwargs, dict): | |
722 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) |
|
722 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) | |
723 |
|
723 | |||
|
724 | if isinstance(after, Dependency): | |||
|
725 | after = after.as_dict() | |||
|
726 | elif isinstance(after, AsyncResult): | |||
|
727 | after=after.msg_ids | |||
|
728 | elif after is None: | |||
|
729 | after = [] | |||
|
730 | if isinstance(follow, Dependency): | |||
|
731 | follow = follow.as_dict() | |||
|
732 | elif isinstance(follow, AsyncResult): | |||
|
733 | follow=follow.msg_ids | |||
|
734 | elif follow is None: | |||
|
735 | follow = [] | |||
724 | options = dict(bound=bound, block=block, after=after, follow=follow) |
|
736 | options = dict(bound=bound, block=block, after=after, follow=follow) | |
725 |
|
737 | |||
726 | if targets is None: |
|
738 | if targets is None: | |
@@ -732,18 +744,11 b' class Client(object):' | |||||
732 | after=None, follow=None): |
|
744 | after=None, follow=None): | |
733 | """The underlying method for applying functions in a load balanced |
|
745 | """The underlying method for applying functions in a load balanced | |
734 | manner, via the task queue.""" |
|
746 | manner, via the task queue.""" | |
735 | if isinstance(after, Dependency): |
|
|||
736 | after = after.as_dict() |
|
|||
737 | elif after is None: |
|
|||
738 | after = [] |
|
|||
739 | if isinstance(follow, Dependency): |
|
|||
740 | follow = follow.as_dict() |
|
|||
741 | elif follow is None: |
|
|||
742 | follow = [] |
|
|||
743 | subheader = dict(after=after, follow=follow) |
|
|||
744 |
|
747 | |||
|
748 | subheader = dict(after=after, follow=follow) | |||
745 | bufs = ss.pack_apply_message(f,args,kwargs) |
|
749 | bufs = ss.pack_apply_message(f,args,kwargs) | |
746 | content = dict(bound=bound) |
|
750 | content = dict(bound=bound) | |
|
751 | ||||
747 | msg = self.session.send(self._task_socket, "apply_request", |
|
752 | msg = self.session.send(self._task_socket, "apply_request", | |
748 | content=content, buffers=bufs, subheader=subheader) |
|
753 | content=content, buffers=bufs, subheader=subheader) | |
749 | msg_id = msg['msg_id'] |
|
754 | msg_id = msg['msg_id'] | |
@@ -761,17 +766,11 b' class Client(object):' | |||||
761 | via the MUX queue.""" |
|
766 | via the MUX queue.""" | |
762 |
|
767 | |||
763 | queues,targets = self._build_targets(targets) |
|
768 | queues,targets = self._build_targets(targets) | |
764 | bufs = ss.pack_apply_message(f,args,kwargs) |
|
769 | ||
765 | if isinstance(after, Dependency): |
|
|||
766 | after = after.as_dict() |
|
|||
767 | elif after is None: |
|
|||
768 | after = [] |
|
|||
769 | if isinstance(follow, Dependency): |
|
|||
770 | follow = follow.as_dict() |
|
|||
771 | elif follow is None: |
|
|||
772 | follow = [] |
|
|||
773 | subheader = dict(after=after, follow=follow) |
|
770 | subheader = dict(after=after, follow=follow) | |
774 | content = dict(bound=bound) |
|
771 | content = dict(bound=bound) | |
|
772 | bufs = ss.pack_apply_message(f,args,kwargs) | |||
|
773 | ||||
775 | msg_ids = [] |
|
774 | msg_ids = [] | |
776 | for queue in queues: |
|
775 | for queue in queues: | |
777 | msg = self.session.send(self._mux_socket, "apply_request", |
|
776 | msg = self.session.send(self._mux_socket, "apply_request", | |
@@ -783,7 +782,7 b' class Client(object):' | |||||
783 | if block: |
|
782 | if block: | |
784 | self.barrier(msg_ids) |
|
783 | self.barrier(msg_ids) | |
785 | else: |
|
784 | else: | |
786 | return AsyncResult(self, msg_ids) |
|
785 | return AsyncResult(self, msg_ids, targets=targets) | |
787 | if len(msg_ids) == 1: |
|
786 | if len(msg_ids) == 1: | |
788 | return self._maybe_raise(self.results[msg_ids[0]]) |
|
787 | return self._maybe_raise(self.results[msg_ids[0]]) | |
789 | else: |
|
788 | else: | |
@@ -850,7 +849,7 b' class Client(object):' | |||||
850 | else: |
|
849 | else: | |
851 | r = self.push({key: partition}, targets=engineid, block=False) |
|
850 | r = self.push({key: partition}, targets=engineid, block=False) | |
852 | msg_ids.extend(r.msg_ids) |
|
851 | msg_ids.extend(r.msg_ids) | |
853 | r = AsyncResult(self, msg_ids) |
|
852 | r = AsyncResult(self, msg_ids,targets) | |
854 | if block: |
|
853 | if block: | |
855 | return r.get() |
|
854 | return r.get() | |
856 | else: |
|
855 | else: |
@@ -263,21 +263,19 b' class DirectView(View):' | |||||
263 | Partition a Python sequence and send the partitions to a set of engines. |
|
263 | Partition a Python sequence and send the partitions to a set of engines. | |
264 | """ |
|
264 | """ | |
265 | block = block if block is not None else self.block |
|
265 | block = block if block is not None else self.block | |
266 | if targets is None: |
|
266 | targets = targets if targets is not None else self.targets | |
267 | targets = self.targets |
|
|||
268 |
|
267 | |||
269 | return self.client.scatter(key, seq, dist=dist, flatten=flatten, |
|
268 | return self.client.scatter(key, seq, dist=dist, flatten=flatten, | |
270 | targets=targets, block=block) |
|
269 | targets=targets, block=block) | |
271 |
|
270 | |||
272 | @sync_results |
|
271 | @sync_results | |
273 | @save_ids |
|
272 | @save_ids | |
274 |
def gather(self, key, dist='b', targets=None, block= |
|
273 | def gather(self, key, dist='b', targets=None, block=None): | |
275 | """ |
|
274 | """ | |
276 | Gather a partitioned sequence on a set of engines as a single local seq. |
|
275 | Gather a partitioned sequence on a set of engines as a single local seq. | |
277 | """ |
|
276 | """ | |
278 | block = block if block is not None else self.block |
|
277 | block = block if block is not None else self.block | |
279 | if targets is None: |
|
278 | targets = targets if targets is not None else self.targets | |
280 | targets = self.targets |
|
|||
281 |
|
279 | |||
282 | return self.client.gather(key, dist=dist, targets=targets, block=block) |
|
280 | return self.client.gather(key, dist=dist, targets=targets, block=block) | |
283 |
|
281 |
General Comments 0
You need to be logged in to leave comments.
Login now