##// END OF EJS Templates
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
MinRK -
Show More
@@ -0,0 +1,112 b''
1 """AsyncResult objects for the client"""
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
8
9 #-----------------------------------------------------------------------------
10 # Imports
11 #-----------------------------------------------------------------------------
12
13 import error
14
15 #-----------------------------------------------------------------------------
16 # Classes
17 #-----------------------------------------------------------------------------
18
19 class AsyncResult(object):
20 """Class for representing results of non-blocking calls.
21
22 Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
23 """
24 def __init__(self, client, msg_ids):
25 self._client = client
26 self._msg_ids = msg_ids
27 self._ready = False
28 self._success = None
29
30 def __repr__(self):
31 if self._ready:
32 return "<%s: finished>"%(self.__class__.__name__)
33 else:
34 return "<%s: %r>"%(self.__class__.__name__,self._msg_ids)
35
36
37 def _reconstruct_result(self, res):
38 """
39 Override me in subclasses for turning a list of results
40 into the expected form.
41 """
42 if len(res) == 1:
43 return res[0]
44 else:
45 return res
46
47 def get(self, timeout=-1):
48 """Return the result when it arrives.
49
50 If `timeout` is not ``None`` and the result does not arrive within
51 `timeout` seconds then ``TimeoutError`` is raised. If the
52 remote call raised an exception then that exception will be reraised
53 by get().
54 """
55 if not self.ready():
56 self.wait(timeout)
57
58 if self._ready:
59 if self._success:
60 return self._result
61 else:
62 raise self._exception
63 else:
64 raise error.TimeoutError("Result not ready.")
65
66 def ready(self):
67 """Return whether the call has completed."""
68 if not self._ready:
69 self.wait(0)
70 return self._ready
71
72 def wait(self, timeout=-1):
73 """Wait until the result is available or until `timeout` seconds pass.
74 """
75 if self._ready:
76 return
77 self._ready = self._client.barrier(self._msg_ids, timeout)
78 if self._ready:
79 try:
80 results = map(self._client.results.get, self._msg_ids)
81 results = error.collect_exceptions(results, 'get')
82 self._result = self._reconstruct_result(results)
83 except Exception, e:
84 self._exception = e
85 self._success = False
86 else:
87 self._success = True
88
89
90 def successful(self):
91 """Return whether the call completed without raising an exception.
92
93 Will raise ``AssertionError`` if the result is not ready.
94 """
95 assert self._ready
96 return self._success
97
98 class AsyncMapResult(AsyncResult):
99 """Class for representing results of non-blocking gathers.
100
101 This will properly reconstruct the gather.
102 """
103
104 def __init__(self, client, msg_ids, mapObject):
105 self._mapObject = mapObject
106 AsyncResult.__init__(self, client, msg_ids)
107
108 def _reconstruct_result(self, res):
109 """Perform the gather on the actual results."""
110 return self._mapObject.joinPartitions(res)
111
112
@@ -29,7 +29,7 b' from view import DirectView, LoadBalancedView'
29 from dependency import Dependency, depend, require
29 from dependency import Dependency, depend, require
30 import error
30 import error
31 import map as Map
31 import map as Map
32 from pendingresult import PendingResult,PendingMapResult
32 from asyncresult import AsyncResult, AsyncMapResult
33 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
33 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
34
34
35 #--------------------------------------------------------------------------
35 #--------------------------------------------------------------------------
@@ -746,7 +746,7 b' class Client(object):'
746 self.barrier(msg_id)
746 self.barrier(msg_id)
747 return self._maybe_raise(self.results[msg_id])
747 return self._maybe_raise(self.results[msg_id])
748 else:
748 else:
749 return PendingResult(self, [msg_id])
749 return AsyncResult(self, [msg_id])
750
750
751 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
751 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
752 after=None, follow=None):
752 after=None, follow=None):
@@ -776,7 +776,7 b' class Client(object):'
776 if block:
776 if block:
777 self.barrier(msg_ids)
777 self.barrier(msg_ids)
778 else:
778 else:
779 return PendingResult(self, msg_ids)
779 return AsyncResult(self, msg_ids)
780 if len(msg_ids) == 1:
780 if len(msg_ids) == 1:
781 return self._maybe_raise(self.results[msg_ids[0]])
781 return self._maybe_raise(self.results[msg_ids[0]])
782 else:
782 else:
@@ -785,12 +785,24 b' class Client(object):'
785 result[target] = self.results[mid]
785 result[target] = self.results[mid]
786 return error.collect_exceptions(result, f.__name__)
786 return error.collect_exceptions(result, f.__name__)
787
787
788 #--------------------------------------------------------------------------
789 # Map and decorators
790 #--------------------------------------------------------------------------
791
788 def map(self, f, *sequences):
792 def map(self, f, *sequences):
789 """Parallel version of builtin `map`, using all our engines."""
793 """Parallel version of builtin `map`, using all our engines."""
790 pf = ParallelFunction(self, f, block=self.block,
794 pf = ParallelFunction(self, f, block=self.block,
791 bound=True, targets='all')
795 bound=True, targets='all')
792 return pf.map(*sequences)
796 return pf.map(*sequences)
793
797
798 def parallel(self, bound=True, targets='all', block=True):
799 """Decorator for making a ParallelFunction"""
800 return parallel(self, bound=bound, targets=targets, block=block)
801
802 def remote(self, bound=True, targets='all', block=True):
803 """Decorator for making a RemoteFunction"""
804 return remote(self, bound=bound, targets=targets, block=block)
805
794 #--------------------------------------------------------------------------
806 #--------------------------------------------------------------------------
795 # Data movement
807 # Data movement
796 #--------------------------------------------------------------------------
808 #--------------------------------------------------------------------------
@@ -831,7 +843,7 b' class Client(object):'
831 else:
843 else:
832 mid = self.push({key: partition}, targets=engineid, block=False)
844 mid = self.push({key: partition}, targets=engineid, block=False)
833 msg_ids.append(mid)
845 msg_ids.append(mid)
834 r = PendingResult(self, msg_ids)
846 r = AsyncResult(self, msg_ids)
835 if block:
847 if block:
836 r.wait()
848 r.wait()
837 return
849 return
@@ -850,7 +862,7 b' class Client(object):'
850 for index, engineid in enumerate(targets):
862 for index, engineid in enumerate(targets):
851 msg_ids.append(self.pull(key, targets=engineid,block=False))
863 msg_ids.append(self.pull(key, targets=engineid,block=False))
852
864
853 r = PendingMapResult(self, msg_ids, mapObject)
865 r = AsyncMapResult(self, msg_ids, mapObject)
854 if block:
866 if block:
855 r.wait()
867 r.wait()
856 return r.result
868 return r.result
@@ -1002,6 +1014,6 b" __all__ = [ 'Client',"
1002 'ParallelFunction',
1014 'ParallelFunction',
1003 'DirectView',
1015 'DirectView',
1004 'LoadBalancedView',
1016 'LoadBalancedView',
1005 'PendingResult',
1017 'AsyncResult',
1006 'PendingMapResult'
1018 'AsyncMapResult'
1007 ]
1019 ]
@@ -145,6 +145,9 b' class SecurityError(KernelError):'
145 class FileTimeoutError(KernelError):
145 class FileTimeoutError(KernelError):
146 pass
146 pass
147
147
148 class TimeoutError(KernelError):
149 pass
150
148 class RemoteError(KernelError):
151 class RemoteError(KernelError):
149 """Error raised elsewhere"""
152 """Error raised elsewhere"""
150 ename=None
153 ename=None
@@ -9,7 +9,7 b' import time'
9 import uuid
9 import uuid
10
10
11 import zmq
11 import zmq
12 from zmq.devices import ProcessDevice
12 from zmq.devices import ProcessDevice,ThreadDevice
13 from zmq.eventloop import ioloop, zmqstream
13 from zmq.eventloop import ioloop, zmqstream
14
14
15 #internal
15 #internal
@@ -27,7 +27,7 b' class Heart(object):'
27 device=None
27 device=None
28 id=None
28 id=None
29 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
29 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
30 self.device = ProcessDevice(zmq.FORWARDER, in_type, out_type)
30 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
31 self.device.daemon=True
31 self.device.daemon=True
32 self.device.connect_in(in_addr)
32 self.device.connect_in(in_addr)
33 self.device.connect_out(out_addr)
33 self.device.connect_out(out_addr)
@@ -11,7 +11,7 b''
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import map as Map
13 import map as Map
14 from pendingresult import PendingMapResult
14 from asyncresult import AsyncMapResult
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Decorators
17 # Decorators
@@ -126,10 +126,10 b' class ParallelFunction(RemoteFunction):'
126 f=self.func
126 f=self.func
127 mid = self.client.apply(f, args=args, block=False,
127 mid = self.client.apply(f, args=args, block=False,
128 bound=self.bound,
128 bound=self.bound,
129 targets=engineid).msg_ids[0]
129 targets=engineid)._msg_ids[0]
130 msg_ids.append(mid)
130 msg_ids.append(mid)
131
131
132 r = PendingMapResult(self.client, msg_ids, self.mapObject)
132 r = AsyncMapResult(self.client, msg_ids, self.mapObject)
133 if self.block:
133 if self.block:
134 r.wait()
134 r.wait()
135 return r.result
135 return r.result
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now