##// END OF EJS Templates
add rich AsyncResult behavior
MinRK -
Show More
@@ -10,12 +10,21 b''
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from IPython.external.decorator import decorator
13 import error
14 import error
14
15
15 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
16 # Classes
17 # Classes
17 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
18
19
20 @decorator
21 def check_ready(f, self, *args, **kwargs):
22 """Call spin() to sync state prior to calling the method."""
23 self.wait(0)
24 if not self._ready:
25 raise error.TimeoutError("result not ready")
26 return f(self, *args, **kwargs)
27
19 class AsyncResult(object):
28 class AsyncResult(object):
20 """Class for representing results of non-blocking calls.
29 """Class for representing results of non-blocking calls.
21
30
@@ -79,6 +88,7 b' class AsyncResult(object):'
79 if self._ready:
88 if self._ready:
80 try:
89 try:
81 results = map(self._client.results.get, self.msg_ids)
90 results = map(self._client.results.get, self.msg_ids)
91 self._result = results
82 results = error.collect_exceptions(results, self._fname)
92 results = error.collect_exceptions(results, self._fname)
83 self._result = self._reconstruct_result(results)
93 self._result = self._reconstruct_result(results)
84 except Exception, e:
94 except Exception, e:
@@ -86,6 +96,8 b' class AsyncResult(object):'
86 self._success = False
96 self._success = False
87 else:
97 else:
88 self._success = True
98 self._success = True
99 finally:
100 self._metadata = map(self._client.metadata.get, self.msg_ids)
89
101
90
102
91 def successful(self):
103 def successful(self):
@@ -95,7 +107,67 b' class AsyncResult(object):'
95 """
107 """
96 assert self._ready
108 assert self._ready
97 return self._success
109 return self._success
110
111 #----------------------------------------------------------------
112 # Extra methods not in mp.pool.AsyncResult
113 #----------------------------------------------------------------
114
115 def get_dict(self, timeout=-1):
116 """Get the results as a dict, keyed by engine_id."""
117 results = self.get(timeout)
118 engine_ids = [md['engine_id'] for md in self._metadata ]
119 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
120 maxcount = bycount.count(bycount[-1])
121 if maxcount > 1:
122 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
123 maxcount, bycount[-1]))
124
125 return dict(zip(engine_ids,results))
126
127 @property
128 @check_ready
129 def result(self):
130 """result property."""
131 return self._result
132
133 @property
134 @check_ready
135 def metadata(self):
136 """metadata property."""
137 return self._metadata
138
139 @property
140 @check_ready
141 def result_dict(self):
142 """result property as a dict."""
143 return self.get_dict(0)
144
145 #-------------------------------------
146 # dict-access
147 #-------------------------------------
148
149 @check_ready
150 def __getitem__(self, key):
151 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
152 """
153 if isinstance(key, int):
154 return error.collect_exceptions([self._result[key]], self._fname)[0]
155 elif isinstance(key, slice):
156 return error.collect_exceptions(self._result[key], self._fname)
157 elif isinstance(key, basestring):
158 return [ md[key] for md in self._metadata ]
159 else:
160 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
161
162 @check_ready
163 def __getattr__(self, key):
164 """getattr maps to getitem for convenient access to metadata."""
165 if key not in self._metadata[0].keys():
166 raise AttributeError("%r object has no attribute %r"%(
167 self.__class__.__name__, key))
168 return self.__getitem__(key)
98
169
170
99 class AsyncMapResult(AsyncResult):
171 class AsyncMapResult(AsyncResult):
100 """Class for representing results of non-blocking gathers.
172 """Class for representing results of non-blocking gathers.
101
173
@@ -111,3 +183,4 b' class AsyncMapResult(AsyncResult):'
111 return self._mapObject.joinPartitions(res)
183 return self._mapObject.joinPartitions(res)
112
184
113
185
186 __all__ = ['AsyncResult', 'AsyncMapResult'] No newline at end of file
@@ -250,7 +250,7 b' class CompositeError(KernelError):'
250 et,ev,tb = sys.exc_info()
250 et,ev,tb = sys.exc_info()
251
251
252
252
253 def collect_exceptions(rdict_or_list, method):
253 def collect_exceptions(rdict_or_list, method='unspecified'):
254 """check a result dict for errors, and raise CompositeError if any exist.
254 """check a result dict for errors, and raise CompositeError if any exist.
255 Passthrough otherwise."""
255 Passthrough otherwise."""
256 elist = []
256 elist = []
General Comments 0
You need to be logged in to leave comments. Login now