Show More
@@ -0,0 +1,112 b'' | |||
|
1 | """AsyncResult objects for the client""" | |
|
2 | #----------------------------------------------------------------------------- | |
|
3 | # Copyright (C) 2010 The IPython Development Team | |
|
4 | # | |
|
5 | # Distributed under the terms of the BSD License. The full license is in | |
|
6 | # the file COPYING, distributed as part of this software. | |
|
7 | #----------------------------------------------------------------------------- | |
|
8 | ||
|
9 | #----------------------------------------------------------------------------- | |
|
10 | # Imports | |
|
11 | #----------------------------------------------------------------------------- | |
|
12 | ||
|
13 | import error | |
|
14 | ||
|
15 | #----------------------------------------------------------------------------- | |
|
16 | # Classes | |
|
17 | #----------------------------------------------------------------------------- | |
|
18 | ||
|
19 | class AsyncResult(object): | |
|
20 | """Class for representing results of non-blocking calls. | |
|
21 | ||
|
22 | Provides the same interface as :py:class:`multiprocessing.AsyncResult`. | |
|
23 | """ | |
|
24 | def __init__(self, client, msg_ids): | |
|
25 | self._client = client | |
|
26 | self._msg_ids = msg_ids | |
|
27 | self._ready = False | |
|
28 | self._success = None | |
|
29 | ||
|
30 | def __repr__(self): | |
|
31 | if self._ready: | |
|
32 | return "<%s: finished>"%(self.__class__.__name__) | |
|
33 | else: | |
|
34 | return "<%s: %r>"%(self.__class__.__name__,self._msg_ids) | |
|
35 | ||
|
36 | ||
|
37 | def _reconstruct_result(self, res): | |
|
38 | """ | |
|
39 | Override me in subclasses for turning a list of results | |
|
40 | into the expected form. | |
|
41 | """ | |
|
42 | if len(res) == 1: | |
|
43 | return res[0] | |
|
44 | else: | |
|
45 | return res | |
|
46 | ||
|
47 | def get(self, timeout=-1): | |
|
48 | """Return the result when it arrives. | |
|
49 | ||
|
50 | If `timeout` is not ``None`` and the result does not arrive within | |
|
51 | `timeout` seconds then ``TimeoutError`` is raised. If the | |
|
52 | remote call raised an exception then that exception will be reraised | |
|
53 | by get(). | |
|
54 | """ | |
|
55 | if not self.ready(): | |
|
56 | self.wait(timeout) | |
|
57 | ||
|
58 | if self._ready: | |
|
59 | if self._success: | |
|
60 | return self._result | |
|
61 | else: | |
|
62 | raise self._exception | |
|
63 | else: | |
|
64 | raise error.TimeoutError("Result not ready.") | |
|
65 | ||
|
66 | def ready(self): | |
|
67 | """Return whether the call has completed.""" | |
|
68 | if not self._ready: | |
|
69 | self.wait(0) | |
|
70 | return self._ready | |
|
71 | ||
|
72 | def wait(self, timeout=-1): | |
|
73 | """Wait until the result is available or until `timeout` seconds pass. | |
|
74 | """ | |
|
75 | if self._ready: | |
|
76 | return | |
|
77 | self._ready = self._client.barrier(self._msg_ids, timeout) | |
|
78 | if self._ready: | |
|
79 | try: | |
|
80 | results = map(self._client.results.get, self._msg_ids) | |
|
81 | results = error.collect_exceptions(results, 'get') | |
|
82 | self._result = self._reconstruct_result(results) | |
|
83 | except Exception, e: | |
|
84 | self._exception = e | |
|
85 | self._success = False | |
|
86 | else: | |
|
87 | self._success = True | |
|
88 | ||
|
89 | ||
|
90 | def successful(self): | |
|
91 | """Return whether the call completed without raising an exception. | |
|
92 | ||
|
93 | Will raise ``AssertionError`` if the result is not ready. | |
|
94 | """ | |
|
95 | assert self._ready | |
|
96 | return self._success | |
|
97 | ||
|
98 | class AsyncMapResult(AsyncResult): | |
|
99 | """Class for representing results of non-blocking gathers. | |
|
100 | ||
|
101 | This will properly reconstruct the gather. | |
|
102 | """ | |
|
103 | ||
|
104 | def __init__(self, client, msg_ids, mapObject): | |
|
105 | self._mapObject = mapObject | |
|
106 | AsyncResult.__init__(self, client, msg_ids) | |
|
107 | ||
|
108 | def _reconstruct_result(self, res): | |
|
109 | """Perform the gather on the actual results.""" | |
|
110 | return self._mapObject.joinPartitions(res) | |
|
111 | ||
|
112 |
@@ -29,7 +29,7 b' from view import DirectView, LoadBalancedView' | |||
|
29 | 29 | from dependency import Dependency, depend, require |
|
30 | 30 | import error |
|
31 | 31 | import map as Map |
|
32 |
from |
|
|
32 | from asyncresult import AsyncResult, AsyncMapResult | |
|
33 | 33 | from remotefunction import remote,parallel,ParallelFunction,RemoteFunction |
|
34 | 34 | |
|
35 | 35 | #-------------------------------------------------------------------------- |
@@ -746,7 +746,7 b' class Client(object):' | |||
|
746 | 746 | self.barrier(msg_id) |
|
747 | 747 | return self._maybe_raise(self.results[msg_id]) |
|
748 | 748 | else: |
|
749 |
return |
|
|
749 | return AsyncResult(self, [msg_id]) | |
|
750 | 750 | |
|
751 | 751 | def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None, |
|
752 | 752 | after=None, follow=None): |
@@ -776,7 +776,7 b' class Client(object):' | |||
|
776 | 776 | if block: |
|
777 | 777 | self.barrier(msg_ids) |
|
778 | 778 | else: |
|
779 |
return |
|
|
779 | return AsyncResult(self, msg_ids) | |
|
780 | 780 | if len(msg_ids) == 1: |
|
781 | 781 | return self._maybe_raise(self.results[msg_ids[0]]) |
|
782 | 782 | else: |
@@ -785,12 +785,24 b' class Client(object):' | |||
|
785 | 785 | result[target] = self.results[mid] |
|
786 | 786 | return error.collect_exceptions(result, f.__name__) |
|
787 | 787 | |
|
788 | #-------------------------------------------------------------------------- | |
|
789 | # Map and decorators | |
|
790 | #-------------------------------------------------------------------------- | |
|
791 | ||
|
788 | 792 | def map(self, f, *sequences): |
|
789 | 793 | """Parallel version of builtin `map`, using all our engines.""" |
|
790 | 794 | pf = ParallelFunction(self, f, block=self.block, |
|
791 | 795 | bound=True, targets='all') |
|
792 | 796 | return pf.map(*sequences) |
|
793 | 797 | |
|
798 | def parallel(self, bound=True, targets='all', block=True): | |
|
799 | """Decorator for making a ParallelFunction""" | |
|
800 | return parallel(self, bound=bound, targets=targets, block=block) | |
|
801 | ||
|
802 | def remote(self, bound=True, targets='all', block=True): | |
|
803 | """Decorator for making a RemoteFunction""" | |
|
804 | return remote(self, bound=bound, targets=targets, block=block) | |
|
805 | ||
|
794 | 806 | #-------------------------------------------------------------------------- |
|
795 | 807 | # Data movement |
|
796 | 808 | #-------------------------------------------------------------------------- |
@@ -831,7 +843,7 b' class Client(object):' | |||
|
831 | 843 | else: |
|
832 | 844 | mid = self.push({key: partition}, targets=engineid, block=False) |
|
833 | 845 | msg_ids.append(mid) |
|
834 |
r = |
|
|
846 | r = AsyncResult(self, msg_ids) | |
|
835 | 847 | if block: |
|
836 | 848 | r.wait() |
|
837 | 849 | return |
@@ -850,7 +862,7 b' class Client(object):' | |||
|
850 | 862 | for index, engineid in enumerate(targets): |
|
851 | 863 | msg_ids.append(self.pull(key, targets=engineid,block=False)) |
|
852 | 864 | |
|
853 |
r = |
|
|
865 | r = AsyncMapResult(self, msg_ids, mapObject) | |
|
854 | 866 | if block: |
|
855 | 867 | r.wait() |
|
856 | 868 | return r.result |
@@ -1002,6 +1014,6 b" __all__ = [ 'Client'," | |||
|
1002 | 1014 | 'ParallelFunction', |
|
1003 | 1015 | 'DirectView', |
|
1004 | 1016 | 'LoadBalancedView', |
|
1005 |
' |
|
|
1006 |
' |
|
|
1017 | 'AsyncResult', | |
|
1018 | 'AsyncMapResult' | |
|
1007 | 1019 | ] |
@@ -145,6 +145,9 b' class SecurityError(KernelError):' | |||
|
145 | 145 | class FileTimeoutError(KernelError): |
|
146 | 146 | pass |
|
147 | 147 | |
|
148 | class TimeoutError(KernelError): | |
|
149 | pass | |
|
150 | ||
|
148 | 151 | class RemoteError(KernelError): |
|
149 | 152 | """Error raised elsewhere""" |
|
150 | 153 | ename=None |
@@ -9,7 +9,7 b' import time' | |||
|
9 | 9 | import uuid |
|
10 | 10 | |
|
11 | 11 | import zmq |
|
12 | from zmq.devices import ProcessDevice | |
|
12 | from zmq.devices import ProcessDevice,ThreadDevice | |
|
13 | 13 | from zmq.eventloop import ioloop, zmqstream |
|
14 | 14 | |
|
15 | 15 | #internal |
@@ -27,7 +27,7 b' class Heart(object):' | |||
|
27 | 27 | device=None |
|
28 | 28 | id=None |
|
29 | 29 | def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None): |
|
30 |
self.device = |
|
|
30 | self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type) | |
|
31 | 31 | self.device.daemon=True |
|
32 | 32 | self.device.connect_in(in_addr) |
|
33 | 33 | self.device.connect_out(out_addr) |
@@ -11,7 +11,7 b'' | |||
|
11 | 11 | #----------------------------------------------------------------------------- |
|
12 | 12 | |
|
13 | 13 | import map as Map |
|
14 |
from |
|
|
14 | from asyncresult import AsyncMapResult | |
|
15 | 15 | |
|
16 | 16 | #----------------------------------------------------------------------------- |
|
17 | 17 | # Decorators |
@@ -126,10 +126,10 b' class ParallelFunction(RemoteFunction):' | |||
|
126 | 126 | f=self.func |
|
127 | 127 | mid = self.client.apply(f, args=args, block=False, |
|
128 | 128 | bound=self.bound, |
|
129 | targets=engineid).msg_ids[0] | |
|
129 | targets=engineid)._msg_ids[0] | |
|
130 | 130 | msg_ids.append(mid) |
|
131 | 131 | |
|
132 |
r = |
|
|
132 | r = AsyncMapResult(self.client, msg_ids, self.mapObject) | |
|
133 | 133 | if self.block: |
|
134 | 134 | r.wait() |
|
135 | 135 | return r.result |
|
1 | NO CONTENT: file was removed |
General Comments 0
You need to be logged in to leave comments.
Login now