##// END OF EJS Templates
add rich AsyncResult behavior
MinRK -
Show More
@@ -10,12 +10,21 b''
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 from IPython.external.decorator import decorator
13 14 import error
14 15
15 16 #-----------------------------------------------------------------------------
16 17 # Classes
17 18 #-----------------------------------------------------------------------------
18 19
20 @decorator
21 def check_ready(f, self, *args, **kwargs):
22 """Call spin() to sync state prior to calling the method."""
23 self.wait(0)
24 if not self._ready:
25 raise error.TimeoutError("result not ready")
26 return f(self, *args, **kwargs)
27
19 28 class AsyncResult(object):
20 29 """Class for representing results of non-blocking calls.
21 30
@@ -79,6 +88,7 b' class AsyncResult(object):'
79 88 if self._ready:
80 89 try:
81 90 results = map(self._client.results.get, self.msg_ids)
91 self._result = results
82 92 results = error.collect_exceptions(results, self._fname)
83 93 self._result = self._reconstruct_result(results)
84 94 except Exception, e:
@@ -86,6 +96,8 b' class AsyncResult(object):'
86 96 self._success = False
87 97 else:
88 98 self._success = True
99 finally:
100 self._metadata = map(self._client.metadata.get, self.msg_ids)
89 101
90 102
91 103 def successful(self):
@@ -95,7 +107,67 b' class AsyncResult(object):'
95 107 """
96 108 assert self._ready
97 109 return self._success
110
111 #----------------------------------------------------------------
112 # Extra methods not in mp.pool.AsyncResult
113 #----------------------------------------------------------------
114
115 def get_dict(self, timeout=-1):
116 """Get the results as a dict, keyed by engine_id."""
117 results = self.get(timeout)
118 engine_ids = [md['engine_id'] for md in self._metadata ]
119 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
120 maxcount = bycount.count(bycount[-1])
121 if maxcount > 1:
122 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
123 maxcount, bycount[-1]))
124
125 return dict(zip(engine_ids,results))
126
127 @property
128 @check_ready
129 def result(self):
130 """result property."""
131 return self._result
132
133 @property
134 @check_ready
135 def metadata(self):
136 """metadata property."""
137 return self._metadata
138
139 @property
140 @check_ready
141 def result_dict(self):
142 """result property as a dict."""
143 return self.get_dict(0)
144
145 #-------------------------------------
146 # dict-access
147 #-------------------------------------
148
149 @check_ready
150 def __getitem__(self, key):
151 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
152 """
153 if isinstance(key, int):
154 return error.collect_exceptions([self._result[key]], self._fname)[0]
155 elif isinstance(key, slice):
156 return error.collect_exceptions(self._result[key], self._fname)
157 elif isinstance(key, basestring):
158 return [ md[key] for md in self._metadata ]
159 else:
160 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
161
162 @check_ready
163 def __getattr__(self, key):
164 """getattr maps to getitem for convenient access to metadata."""
165 if key not in self._metadata[0].keys():
166 raise AttributeError("%r object has no attribute %r"%(
167 self.__class__.__name__, key))
168 return self.__getitem__(key)
98 169
170
99 171 class AsyncMapResult(AsyncResult):
100 172 """Class for representing results of non-blocking gathers.
101 173
@@ -111,3 +183,4 b' class AsyncMapResult(AsyncResult):'
111 183 return self._mapObject.joinPartitions(res)
112 184
113 185
186 __all__ = ['AsyncResult', 'AsyncMapResult'] No newline at end of file
@@ -250,7 +250,7 b' class CompositeError(KernelError):'
250 250 et,ev,tb = sys.exc_info()
251 251
252 252
253 def collect_exceptions(rdict_or_list, method):
253 def collect_exceptions(rdict_or_list, method='unspecified'):
254 254 """check a result dict for errors, and raise CompositeError if any exist.
255 255 Passthrough otherwise."""
256 256 elist = []
General Comments 0
You need to be logged in to leave comments. Login now