##// END OF EJS Templates
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
MinRK -
Show More
@@ -0,0 +1,112 b''
1 """AsyncResult objects for the client"""
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
8
9 #-----------------------------------------------------------------------------
10 # Imports
11 #-----------------------------------------------------------------------------
12
13 import error
14
15 #-----------------------------------------------------------------------------
16 # Classes
17 #-----------------------------------------------------------------------------
18
19 class AsyncResult(object):
20 """Class for representing results of non-blocking calls.
21
22 Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
23 """
24 def __init__(self, client, msg_ids):
25 self._client = client
26 self._msg_ids = msg_ids
27 self._ready = False
28 self._success = None
29
30 def __repr__(self):
31 if self._ready:
32 return "<%s: finished>"%(self.__class__.__name__)
33 else:
34 return "<%s: %r>"%(self.__class__.__name__,self._msg_ids)
35
36
37 def _reconstruct_result(self, res):
38 """
39 Override me in subclasses for turning a list of results
40 into the expected form.
41 """
42 if len(res) == 1:
43 return res[0]
44 else:
45 return res
46
47 def get(self, timeout=-1):
48 """Return the result when it arrives.
49
50 If `timeout` is not ``None`` and the result does not arrive within
51 `timeout` seconds then ``TimeoutError`` is raised. If the
52 remote call raised an exception then that exception will be reraised
53 by get().
54 """
55 if not self.ready():
56 self.wait(timeout)
57
58 if self._ready:
59 if self._success:
60 return self._result
61 else:
62 raise self._exception
63 else:
64 raise error.TimeoutError("Result not ready.")
65
66 def ready(self):
67 """Return whether the call has completed."""
68 if not self._ready:
69 self.wait(0)
70 return self._ready
71
72 def wait(self, timeout=-1):
73 """Wait until the result is available or until `timeout` seconds pass.
74 """
75 if self._ready:
76 return
77 self._ready = self._client.barrier(self._msg_ids, timeout)
78 if self._ready:
79 try:
80 results = map(self._client.results.get, self._msg_ids)
81 results = error.collect_exceptions(results, 'get')
82 self._result = self._reconstruct_result(results)
83 except Exception, e:
84 self._exception = e
85 self._success = False
86 else:
87 self._success = True
88
89
90 def successful(self):
91 """Return whether the call completed without raising an exception.
92
93 Will raise ``AssertionError`` if the result is not ready.
94 """
95 assert self._ready
96 return self._success
97
98 class AsyncMapResult(AsyncResult):
99 """Class for representing results of non-blocking gathers.
100
101 This will properly reconstruct the gather.
102 """
103
104 def __init__(self, client, msg_ids, mapObject):
105 self._mapObject = mapObject
106 AsyncResult.__init__(self, client, msg_ids)
107
108 def _reconstruct_result(self, res):
109 """Perform the gather on the actual results."""
110 return self._mapObject.joinPartitions(res)
111
112
@@ -29,7 +29,7 b' from view import DirectView, LoadBalancedView'
29 29 from dependency import Dependency, depend, require
30 30 import error
31 31 import map as Map
32 from pendingresult import PendingResult,PendingMapResult
32 from asyncresult import AsyncResult, AsyncMapResult
33 33 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
34 34
35 35 #--------------------------------------------------------------------------
@@ -746,7 +746,7 b' class Client(object):'
746 746 self.barrier(msg_id)
747 747 return self._maybe_raise(self.results[msg_id])
748 748 else:
749 return PendingResult(self, [msg_id])
749 return AsyncResult(self, [msg_id])
750 750
751 751 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
752 752 after=None, follow=None):
@@ -776,7 +776,7 b' class Client(object):'
776 776 if block:
777 777 self.barrier(msg_ids)
778 778 else:
779 return PendingResult(self, msg_ids)
779 return AsyncResult(self, msg_ids)
780 780 if len(msg_ids) == 1:
781 781 return self._maybe_raise(self.results[msg_ids[0]])
782 782 else:
@@ -785,12 +785,24 b' class Client(object):'
785 785 result[target] = self.results[mid]
786 786 return error.collect_exceptions(result, f.__name__)
787 787
788 #--------------------------------------------------------------------------
789 # Map and decorators
790 #--------------------------------------------------------------------------
791
788 792 def map(self, f, *sequences):
789 793 """Parallel version of builtin `map`, using all our engines."""
790 794 pf = ParallelFunction(self, f, block=self.block,
791 795 bound=True, targets='all')
792 796 return pf.map(*sequences)
793 797
798 def parallel(self, bound=True, targets='all', block=True):
799 """Decorator for making a ParallelFunction"""
800 return parallel(self, bound=bound, targets=targets, block=block)
801
802 def remote(self, bound=True, targets='all', block=True):
803 """Decorator for making a RemoteFunction"""
804 return remote(self, bound=bound, targets=targets, block=block)
805
794 806 #--------------------------------------------------------------------------
795 807 # Data movement
796 808 #--------------------------------------------------------------------------
@@ -831,7 +843,7 b' class Client(object):'
831 843 else:
832 844 mid = self.push({key: partition}, targets=engineid, block=False)
833 845 msg_ids.append(mid)
834 r = PendingResult(self, msg_ids)
846 r = AsyncResult(self, msg_ids)
835 847 if block:
836 848 r.wait()
837 849 return
@@ -850,7 +862,7 b' class Client(object):'
850 862 for index, engineid in enumerate(targets):
851 863 msg_ids.append(self.pull(key, targets=engineid,block=False))
852 864
853 r = PendingMapResult(self, msg_ids, mapObject)
865 r = AsyncMapResult(self, msg_ids, mapObject)
854 866 if block:
855 867 r.wait()
856 868 return r.result
@@ -1002,6 +1014,6 b" __all__ = [ 'Client',"
1002 1014 'ParallelFunction',
1003 1015 'DirectView',
1004 1016 'LoadBalancedView',
1005 'PendingResult',
1006 'PendingMapResult'
1017 'AsyncResult',
1018 'AsyncMapResult'
1007 1019 ]
@@ -145,6 +145,9 b' class SecurityError(KernelError):'
145 145 class FileTimeoutError(KernelError):
146 146 pass
147 147
148 class TimeoutError(KernelError):
149 pass
150
148 151 class RemoteError(KernelError):
149 152 """Error raised elsewhere"""
150 153 ename=None
@@ -9,7 +9,7 b' import time'
9 9 import uuid
10 10
11 11 import zmq
12 from zmq.devices import ProcessDevice
12 from zmq.devices import ProcessDevice,ThreadDevice
13 13 from zmq.eventloop import ioloop, zmqstream
14 14
15 15 #internal
@@ -27,7 +27,7 b' class Heart(object):'
27 27 device=None
28 28 id=None
29 29 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
30 self.device = ProcessDevice(zmq.FORWARDER, in_type, out_type)
30 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
31 31 self.device.daemon=True
32 32 self.device.connect_in(in_addr)
33 33 self.device.connect_out(out_addr)
@@ -11,7 +11,7 b''
11 11 #-----------------------------------------------------------------------------
12 12
13 13 import map as Map
14 from pendingresult import PendingMapResult
14 from asyncresult import AsyncMapResult
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Decorators
@@ -126,10 +126,10 b' class ParallelFunction(RemoteFunction):'
126 126 f=self.func
127 127 mid = self.client.apply(f, args=args, block=False,
128 128 bound=self.bound,
129 targets=engineid).msg_ids[0]
129 targets=engineid)._msg_ids[0]
130 130 msg_ids.append(mid)
131 131
132 r = PendingMapResult(self.client, msg_ids, self.mapObject)
132 r = AsyncMapResult(self.client, msg_ids, self.mapObject)
133 133 if self.block:
134 134 r.wait()
135 135 return r.result
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now