Show More
@@ -0,0 +1,112 b'' | |||||
|
1 | """AsyncResult objects for the client""" | |||
|
2 | #----------------------------------------------------------------------------- | |||
|
3 | # Copyright (C) 2010 The IPython Development Team | |||
|
4 | # | |||
|
5 | # Distributed under the terms of the BSD License. The full license is in | |||
|
6 | # the file COPYING, distributed as part of this software. | |||
|
7 | #----------------------------------------------------------------------------- | |||
|
8 | ||||
|
9 | #----------------------------------------------------------------------------- | |||
|
10 | # Imports | |||
|
11 | #----------------------------------------------------------------------------- | |||
|
12 | ||||
|
13 | import error | |||
|
14 | ||||
|
15 | #----------------------------------------------------------------------------- | |||
|
16 | # Classes | |||
|
17 | #----------------------------------------------------------------------------- | |||
|
18 | ||||
|
19 | class AsyncResult(object): | |||
|
20 | """Class for representing results of non-blocking calls. | |||
|
21 | ||||
|
22 | Provides the same interface as :py:class:`multiprocessing.AsyncResult`. | |||
|
23 | """ | |||
|
24 | def __init__(self, client, msg_ids): | |||
|
25 | self._client = client | |||
|
26 | self._msg_ids = msg_ids | |||
|
27 | self._ready = False | |||
|
28 | self._success = None | |||
|
29 | ||||
|
30 | def __repr__(self): | |||
|
31 | if self._ready: | |||
|
32 | return "<%s: finished>"%(self.__class__.__name__) | |||
|
33 | else: | |||
|
34 | return "<%s: %r>"%(self.__class__.__name__,self._msg_ids) | |||
|
35 | ||||
|
36 | ||||
|
37 | def _reconstruct_result(self, res): | |||
|
38 | """ | |||
|
39 | Override me in subclasses for turning a list of results | |||
|
40 | into the expected form. | |||
|
41 | """ | |||
|
42 | if len(res) == 1: | |||
|
43 | return res[0] | |||
|
44 | else: | |||
|
45 | return res | |||
|
46 | ||||
|
47 | def get(self, timeout=-1): | |||
|
48 | """Return the result when it arrives. | |||
|
49 | ||||
|
50 | If `timeout` is not ``None`` and the result does not arrive within | |||
|
51 | `timeout` seconds then ``TimeoutError`` is raised. If the | |||
|
52 | remote call raised an exception then that exception will be reraised | |||
|
53 | by get(). | |||
|
54 | """ | |||
|
55 | if not self.ready(): | |||
|
56 | self.wait(timeout) | |||
|
57 | ||||
|
58 | if self._ready: | |||
|
59 | if self._success: | |||
|
60 | return self._result | |||
|
61 | else: | |||
|
62 | raise self._exception | |||
|
63 | else: | |||
|
64 | raise error.TimeoutError("Result not ready.") | |||
|
65 | ||||
|
66 | def ready(self): | |||
|
67 | """Return whether the call has completed.""" | |||
|
68 | if not self._ready: | |||
|
69 | self.wait(0) | |||
|
70 | return self._ready | |||
|
71 | ||||
|
72 | def wait(self, timeout=-1): | |||
|
73 | """Wait until the result is available or until `timeout` seconds pass. | |||
|
74 | """ | |||
|
75 | if self._ready: | |||
|
76 | return | |||
|
77 | self._ready = self._client.barrier(self._msg_ids, timeout) | |||
|
78 | if self._ready: | |||
|
79 | try: | |||
|
80 | results = map(self._client.results.get, self._msg_ids) | |||
|
81 | results = error.collect_exceptions(results, 'get') | |||
|
82 | self._result = self._reconstruct_result(results) | |||
|
83 | except Exception, e: | |||
|
84 | self._exception = e | |||
|
85 | self._success = False | |||
|
86 | else: | |||
|
87 | self._success = True | |||
|
88 | ||||
|
89 | ||||
|
90 | def successful(self): | |||
|
91 | """Return whether the call completed without raising an exception. | |||
|
92 | ||||
|
93 | Will raise ``AssertionError`` if the result is not ready. | |||
|
94 | """ | |||
|
95 | assert self._ready | |||
|
96 | return self._success | |||
|
97 | ||||
|
98 | class AsyncMapResult(AsyncResult): | |||
|
99 | """Class for representing results of non-blocking gathers. | |||
|
100 | ||||
|
101 | This will properly reconstruct the gather. | |||
|
102 | """ | |||
|
103 | ||||
|
104 | def __init__(self, client, msg_ids, mapObject): | |||
|
105 | self._mapObject = mapObject | |||
|
106 | AsyncResult.__init__(self, client, msg_ids) | |||
|
107 | ||||
|
108 | def _reconstruct_result(self, res): | |||
|
109 | """Perform the gather on the actual results.""" | |||
|
110 | return self._mapObject.joinPartitions(res) | |||
|
111 | ||||
|
112 |
@@ -29,7 +29,7 b' from view import DirectView, LoadBalancedView' | |||||
29 | from dependency import Dependency, depend, require |
|
29 | from dependency import Dependency, depend, require | |
30 | import error |
|
30 | import error | |
31 | import map as Map |
|
31 | import map as Map | |
32 |
from |
|
32 | from asyncresult import AsyncResult, AsyncMapResult | |
33 | from remotefunction import remote,parallel,ParallelFunction,RemoteFunction |
|
33 | from remotefunction import remote,parallel,ParallelFunction,RemoteFunction | |
34 |
|
34 | |||
35 | #-------------------------------------------------------------------------- |
|
35 | #-------------------------------------------------------------------------- | |
@@ -746,7 +746,7 b' class Client(object):' | |||||
746 | self.barrier(msg_id) |
|
746 | self.barrier(msg_id) | |
747 | return self._maybe_raise(self.results[msg_id]) |
|
747 | return self._maybe_raise(self.results[msg_id]) | |
748 | else: |
|
748 | else: | |
749 |
return |
|
749 | return AsyncResult(self, [msg_id]) | |
750 |
|
750 | |||
751 | def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None, |
|
751 | def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None, | |
752 | after=None, follow=None): |
|
752 | after=None, follow=None): | |
@@ -776,7 +776,7 b' class Client(object):' | |||||
776 | if block: |
|
776 | if block: | |
777 | self.barrier(msg_ids) |
|
777 | self.barrier(msg_ids) | |
778 | else: |
|
778 | else: | |
779 |
return |
|
779 | return AsyncResult(self, msg_ids) | |
780 | if len(msg_ids) == 1: |
|
780 | if len(msg_ids) == 1: | |
781 | return self._maybe_raise(self.results[msg_ids[0]]) |
|
781 | return self._maybe_raise(self.results[msg_ids[0]]) | |
782 | else: |
|
782 | else: | |
@@ -785,12 +785,24 b' class Client(object):' | |||||
785 | result[target] = self.results[mid] |
|
785 | result[target] = self.results[mid] | |
786 | return error.collect_exceptions(result, f.__name__) |
|
786 | return error.collect_exceptions(result, f.__name__) | |
787 |
|
787 | |||
|
788 | #-------------------------------------------------------------------------- | |||
|
789 | # Map and decorators | |||
|
790 | #-------------------------------------------------------------------------- | |||
|
791 | ||||
788 | def map(self, f, *sequences): |
|
792 | def map(self, f, *sequences): | |
789 | """Parallel version of builtin `map`, using all our engines.""" |
|
793 | """Parallel version of builtin `map`, using all our engines.""" | |
790 | pf = ParallelFunction(self, f, block=self.block, |
|
794 | pf = ParallelFunction(self, f, block=self.block, | |
791 | bound=True, targets='all') |
|
795 | bound=True, targets='all') | |
792 | return pf.map(*sequences) |
|
796 | return pf.map(*sequences) | |
793 |
|
797 | |||
|
798 | def parallel(self, bound=True, targets='all', block=True): | |||
|
799 | """Decorator for making a ParallelFunction""" | |||
|
800 | return parallel(self, bound=bound, targets=targets, block=block) | |||
|
801 | ||||
|
802 | def remote(self, bound=True, targets='all', block=True): | |||
|
803 | """Decorator for making a RemoteFunction""" | |||
|
804 | return remote(self, bound=bound, targets=targets, block=block) | |||
|
805 | ||||
794 | #-------------------------------------------------------------------------- |
|
806 | #-------------------------------------------------------------------------- | |
795 | # Data movement |
|
807 | # Data movement | |
796 | #-------------------------------------------------------------------------- |
|
808 | #-------------------------------------------------------------------------- | |
@@ -831,7 +843,7 b' class Client(object):' | |||||
831 | else: |
|
843 | else: | |
832 | mid = self.push({key: partition}, targets=engineid, block=False) |
|
844 | mid = self.push({key: partition}, targets=engineid, block=False) | |
833 | msg_ids.append(mid) |
|
845 | msg_ids.append(mid) | |
834 |
r = |
|
846 | r = AsyncResult(self, msg_ids) | |
835 | if block: |
|
847 | if block: | |
836 | r.wait() |
|
848 | r.wait() | |
837 | return |
|
849 | return | |
@@ -850,7 +862,7 b' class Client(object):' | |||||
850 | for index, engineid in enumerate(targets): |
|
862 | for index, engineid in enumerate(targets): | |
851 | msg_ids.append(self.pull(key, targets=engineid,block=False)) |
|
863 | msg_ids.append(self.pull(key, targets=engineid,block=False)) | |
852 |
|
864 | |||
853 |
r = |
|
865 | r = AsyncMapResult(self, msg_ids, mapObject) | |
854 | if block: |
|
866 | if block: | |
855 | r.wait() |
|
867 | r.wait() | |
856 | return r.result |
|
868 | return r.result | |
@@ -1002,6 +1014,6 b" __all__ = [ 'Client'," | |||||
1002 | 'ParallelFunction', |
|
1014 | 'ParallelFunction', | |
1003 | 'DirectView', |
|
1015 | 'DirectView', | |
1004 | 'LoadBalancedView', |
|
1016 | 'LoadBalancedView', | |
1005 |
' |
|
1017 | 'AsyncResult', | |
1006 |
' |
|
1018 | 'AsyncMapResult' | |
1007 | ] |
|
1019 | ] |
@@ -145,6 +145,9 b' class SecurityError(KernelError):' | |||||
145 | class FileTimeoutError(KernelError): |
|
145 | class FileTimeoutError(KernelError): | |
146 | pass |
|
146 | pass | |
147 |
|
147 | |||
|
148 | class TimeoutError(KernelError): | |||
|
149 | pass | |||
|
150 | ||||
148 | class RemoteError(KernelError): |
|
151 | class RemoteError(KernelError): | |
149 | """Error raised elsewhere""" |
|
152 | """Error raised elsewhere""" | |
150 | ename=None |
|
153 | ename=None |
@@ -9,7 +9,7 b' import time' | |||||
9 | import uuid |
|
9 | import uuid | |
10 |
|
10 | |||
11 | import zmq |
|
11 | import zmq | |
12 | from zmq.devices import ProcessDevice |
|
12 | from zmq.devices import ProcessDevice,ThreadDevice | |
13 | from zmq.eventloop import ioloop, zmqstream |
|
13 | from zmq.eventloop import ioloop, zmqstream | |
14 |
|
14 | |||
15 | #internal |
|
15 | #internal | |
@@ -27,7 +27,7 b' class Heart(object):' | |||||
27 | device=None |
|
27 | device=None | |
28 | id=None |
|
28 | id=None | |
29 | def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None): |
|
29 | def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None): | |
30 |
self.device = |
|
30 | self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type) | |
31 | self.device.daemon=True |
|
31 | self.device.daemon=True | |
32 | self.device.connect_in(in_addr) |
|
32 | self.device.connect_in(in_addr) | |
33 | self.device.connect_out(out_addr) |
|
33 | self.device.connect_out(out_addr) |
@@ -11,7 +11,7 b'' | |||||
11 | #----------------------------------------------------------------------------- |
|
11 | #----------------------------------------------------------------------------- | |
12 |
|
12 | |||
13 | import map as Map |
|
13 | import map as Map | |
14 |
from |
|
14 | from asyncresult import AsyncMapResult | |
15 |
|
15 | |||
16 | #----------------------------------------------------------------------------- |
|
16 | #----------------------------------------------------------------------------- | |
17 | # Decorators |
|
17 | # Decorators | |
@@ -126,10 +126,10 b' class ParallelFunction(RemoteFunction):' | |||||
126 | f=self.func |
|
126 | f=self.func | |
127 | mid = self.client.apply(f, args=args, block=False, |
|
127 | mid = self.client.apply(f, args=args, block=False, | |
128 | bound=self.bound, |
|
128 | bound=self.bound, | |
129 | targets=engineid).msg_ids[0] |
|
129 | targets=engineid)._msg_ids[0] | |
130 | msg_ids.append(mid) |
|
130 | msg_ids.append(mid) | |
131 |
|
131 | |||
132 |
r = |
|
132 | r = AsyncMapResult(self.client, msg_ids, self.mapObject) | |
133 | if self.block: |
|
133 | if self.block: | |
134 | r.wait() |
|
134 | r.wait() | |
135 | return r.result |
|
135 | return r.result |
1 | NO CONTENT: file was removed |
|
NO CONTENT: file was removed |
General Comments 0
You need to be logged in to leave comments.
Login now