##// END OF EJS Templates
add sugar methods/properties to AsyncResult...
MinRK -
Show More
@@ -1,396 +1,469 b''
1 """AsyncResult objects for the client
1 """AsyncResult objects for the client
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import sys
18 import time
19 import time
20 from datetime import datetime
19
21
20 from zmq import MessageTracker
22 from zmq import MessageTracker
21
23
24 from IPython.core.display import clear_output
22 from IPython.external.decorator import decorator
25 from IPython.external.decorator import decorator
23 from IPython.parallel import error
26 from IPython.parallel import error
24
27
28
25 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
26 # Classes
30 # Classes
27 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
28
32
29 # global empty tracker that's always done:
33 # global empty tracker that's always done:
30 finished_tracker = MessageTracker()
34 finished_tracker = MessageTracker()
31
35
32 @decorator
36 @decorator
33 def check_ready(f, self, *args, **kwargs):
37 def check_ready(f, self, *args, **kwargs):
34 """Call spin() to sync state prior to calling the method."""
38 """Call spin() to sync state prior to calling the method."""
35 self.wait(0)
39 self.wait(0)
36 if not self._ready:
40 if not self._ready:
37 raise error.TimeoutError("result not ready")
41 raise error.TimeoutError("result not ready")
38 return f(self, *args, **kwargs)
42 return f(self, *args, **kwargs)
39
43
40 class AsyncResult(object):
44 class AsyncResult(object):
41 """Class for representing results of non-blocking calls.
45 """Class for representing results of non-blocking calls.
42
46
43 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
47 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
44 """
48 """
45
49
46 msg_ids = None
50 msg_ids = None
47 _targets = None
51 _targets = None
48 _tracker = None
52 _tracker = None
49 _single_result = False
53 _single_result = False
50
54
51 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
55 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
52 if isinstance(msg_ids, basestring):
56 if isinstance(msg_ids, basestring):
53 # always a list
57 # always a list
54 msg_ids = [msg_ids]
58 msg_ids = [msg_ids]
55 if tracker is None:
59 if tracker is None:
56 # default to always done
60 # default to always done
57 tracker = finished_tracker
61 tracker = finished_tracker
58 self._client = client
62 self._client = client
59 self.msg_ids = msg_ids
63 self.msg_ids = msg_ids
60 self._fname=fname
64 self._fname=fname
61 self._targets = targets
65 self._targets = targets
62 self._tracker = tracker
66 self._tracker = tracker
63 self._ready = False
67 self._ready = False
64 self._success = None
68 self._success = None
65 self._metadata = None
69 self._metadata = None
66 if len(msg_ids) == 1:
70 if len(msg_ids) == 1:
67 self._single_result = not isinstance(targets, (list, tuple))
71 self._single_result = not isinstance(targets, (list, tuple))
68 else:
72 else:
69 self._single_result = False
73 self._single_result = False
70
74
71 def __repr__(self):
75 def __repr__(self):
72 if self._ready:
76 if self._ready:
73 return "<%s: finished>"%(self.__class__.__name__)
77 return "<%s: finished>"%(self.__class__.__name__)
74 else:
78 else:
75 return "<%s: %s>"%(self.__class__.__name__,self._fname)
79 return "<%s: %s>"%(self.__class__.__name__,self._fname)
76
80
77
81
78 def _reconstruct_result(self, res):
82 def _reconstruct_result(self, res):
79 """Reconstruct our result from actual result list (always a list)
83 """Reconstruct our result from actual result list (always a list)
80
84
81 Override me in subclasses for turning a list of results
85 Override me in subclasses for turning a list of results
82 into the expected form.
86 into the expected form.
83 """
87 """
84 if self._single_result:
88 if self._single_result:
85 return res[0]
89 return res[0]
86 else:
90 else:
87 return res
91 return res
88
92
89 def get(self, timeout=-1):
93 def get(self, timeout=-1):
90 """Return the result when it arrives.
94 """Return the result when it arrives.
91
95
92 If `timeout` is not ``None`` and the result does not arrive within
96 If `timeout` is not ``None`` and the result does not arrive within
93 `timeout` seconds then ``TimeoutError`` is raised. If the
97 `timeout` seconds then ``TimeoutError`` is raised. If the
94 remote call raised an exception then that exception will be reraised
98 remote call raised an exception then that exception will be reraised
95 by get() inside a `RemoteError`.
99 by get() inside a `RemoteError`.
96 """
100 """
97 if not self.ready():
101 if not self.ready():
98 self.wait(timeout)
102 self.wait(timeout)
99
103
100 if self._ready:
104 if self._ready:
101 if self._success:
105 if self._success:
102 return self._result
106 return self._result
103 else:
107 else:
104 raise self._exception
108 raise self._exception
105 else:
109 else:
106 raise error.TimeoutError("Result not ready.")
110 raise error.TimeoutError("Result not ready.")
107
111
108 def ready(self):
112 def ready(self):
109 """Return whether the call has completed."""
113 """Return whether the call has completed."""
110 if not self._ready:
114 if not self._ready:
111 self.wait(0)
115 self.wait(0)
112 return self._ready
116 return self._ready
113
117
114 def wait(self, timeout=-1):
118 def wait(self, timeout=-1):
115 """Wait until the result is available or until `timeout` seconds pass.
119 """Wait until the result is available or until `timeout` seconds pass.
116
120
117 This method always returns None.
121 This method always returns None.
118 """
122 """
119 if self._ready:
123 if self._ready:
120 return
124 return
121 self._ready = self._client.wait(self.msg_ids, timeout)
125 self._ready = self._client.wait(self.msg_ids, timeout)
122 if self._ready:
126 if self._ready:
123 try:
127 try:
124 results = map(self._client.results.get, self.msg_ids)
128 results = map(self._client.results.get, self.msg_ids)
125 self._result = results
129 self._result = results
126 if self._single_result:
130 if self._single_result:
127 r = results[0]
131 r = results[0]
128 if isinstance(r, Exception):
132 if isinstance(r, Exception):
129 raise r
133 raise r
130 else:
134 else:
131 results = error.collect_exceptions(results, self._fname)
135 results = error.collect_exceptions(results, self._fname)
132 self._result = self._reconstruct_result(results)
136 self._result = self._reconstruct_result(results)
133 except Exception, e:
137 except Exception, e:
134 self._exception = e
138 self._exception = e
135 self._success = False
139 self._success = False
136 else:
140 else:
137 self._success = True
141 self._success = True
138 finally:
142 finally:
139 self._metadata = map(self._client.metadata.get, self.msg_ids)
143 self._metadata = map(self._client.metadata.get, self.msg_ids)
140
144
141
145
142 def successful(self):
146 def successful(self):
143 """Return whether the call completed without raising an exception.
147 """Return whether the call completed without raising an exception.
144
148
145 Will raise ``AssertionError`` if the result is not ready.
149 Will raise ``AssertionError`` if the result is not ready.
146 """
150 """
147 assert self.ready()
151 assert self.ready()
148 return self._success
152 return self._success
149
153
150 #----------------------------------------------------------------
154 #----------------------------------------------------------------
151 # Extra methods not in mp.pool.AsyncResult
155 # Extra methods not in mp.pool.AsyncResult
152 #----------------------------------------------------------------
156 #----------------------------------------------------------------
153
157
154 def get_dict(self, timeout=-1):
158 def get_dict(self, timeout=-1):
155 """Get the results as a dict, keyed by engine_id.
159 """Get the results as a dict, keyed by engine_id.
156
160
157 timeout behavior is described in `get()`.
161 timeout behavior is described in `get()`.
158 """
162 """
159
163
160 results = self.get(timeout)
164 results = self.get(timeout)
161 engine_ids = [ md['engine_id'] for md in self._metadata ]
165 engine_ids = [ md['engine_id'] for md in self._metadata ]
162 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
166 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
163 maxcount = bycount.count(bycount[-1])
167 maxcount = bycount.count(bycount[-1])
164 if maxcount > 1:
168 if maxcount > 1:
165 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
169 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
166 maxcount, bycount[-1]))
170 maxcount, bycount[-1]))
167
171
168 return dict(zip(engine_ids,results))
172 return dict(zip(engine_ids,results))
169
173
170 @property
174 @property
171 def result(self):
175 def result(self):
172 """result property wrapper for `get(timeout=0)`."""
176 """result property wrapper for `get(timeout=0)`."""
173 return self.get()
177 return self.get()
174
178
175 # abbreviated alias:
179 # abbreviated alias:
176 r = result
180 r = result
177
181
178 @property
182 @property
179 @check_ready
183 @check_ready
180 def metadata(self):
184 def metadata(self):
181 """property for accessing execution metadata."""
185 """property for accessing execution metadata."""
182 if self._single_result:
186 if self._single_result:
183 return self._metadata[0]
187 return self._metadata[0]
184 else:
188 else:
185 return self._metadata
189 return self._metadata
186
190
187 @property
191 @property
188 def result_dict(self):
192 def result_dict(self):
189 """result property as a dict."""
193 """result property as a dict."""
190 return self.get_dict()
194 return self.get_dict()
191
195
192 def __dict__(self):
196 def __dict__(self):
193 return self.get_dict(0)
197 return self.get_dict(0)
194
198
195 def abort(self):
199 def abort(self):
196 """abort my tasks."""
200 """abort my tasks."""
197 assert not self.ready(), "Can't abort, I am already done!"
201 assert not self.ready(), "Can't abort, I am already done!"
198 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
202 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
199
203
200 @property
204 @property
201 def sent(self):
205 def sent(self):
202 """check whether my messages have been sent."""
206 """check whether my messages have been sent."""
203 return self._tracker.done
207 return self._tracker.done
204
208
205 def wait_for_send(self, timeout=-1):
209 def wait_for_send(self, timeout=-1):
206 """wait for pyzmq send to complete.
210 """wait for pyzmq send to complete.
207
211
208 This is necessary when sending arrays that you intend to edit in-place.
212 This is necessary when sending arrays that you intend to edit in-place.
209 `timeout` is in seconds, and will raise TimeoutError if it is reached
213 `timeout` is in seconds, and will raise TimeoutError if it is reached
210 before the send completes.
214 before the send completes.
211 """
215 """
212 return self._tracker.wait(timeout)
216 return self._tracker.wait(timeout)
213
217
214 #-------------------------------------
218 #-------------------------------------
215 # dict-access
219 # dict-access
216 #-------------------------------------
220 #-------------------------------------
217
221
218 @check_ready
222 @check_ready
219 def __getitem__(self, key):
223 def __getitem__(self, key):
220 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
224 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
221 """
225 """
222 if isinstance(key, int):
226 if isinstance(key, int):
223 return error.collect_exceptions([self._result[key]], self._fname)[0]
227 return error.collect_exceptions([self._result[key]], self._fname)[0]
224 elif isinstance(key, slice):
228 elif isinstance(key, slice):
225 return error.collect_exceptions(self._result[key], self._fname)
229 return error.collect_exceptions(self._result[key], self._fname)
226 elif isinstance(key, basestring):
230 elif isinstance(key, basestring):
227 values = [ md[key] for md in self._metadata ]
231 values = [ md[key] for md in self._metadata ]
228 if self._single_result:
232 if self._single_result:
229 return values[0]
233 return values[0]
230 else:
234 else:
231 return values
235 return values
232 else:
236 else:
233 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
237 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
234
238
235 def __getattr__(self, key):
239 def __getattr__(self, key):
236 """getattr maps to getitem for convenient attr access to metadata."""
240 """getattr maps to getitem for convenient attr access to metadata."""
237 try:
241 try:
238 return self.__getitem__(key)
242 return self.__getitem__(key)
239 except (error.TimeoutError, KeyError):
243 except (error.TimeoutError, KeyError):
240 raise AttributeError("%r object has no attribute %r"%(
244 raise AttributeError("%r object has no attribute %r"%(
241 self.__class__.__name__, key))
245 self.__class__.__name__, key))
242
246
243 # asynchronous iterator:
247 # asynchronous iterator:
244 def __iter__(self):
248 def __iter__(self):
245 if self._single_result:
249 if self._single_result:
246 raise TypeError("AsyncResults with a single result are not iterable.")
250 raise TypeError("AsyncResults with a single result are not iterable.")
247 try:
251 try:
248 rlist = self.get(0)
252 rlist = self.get(0)
249 except error.TimeoutError:
253 except error.TimeoutError:
250 # wait for each result individually
254 # wait for each result individually
251 for msg_id in self.msg_ids:
255 for msg_id in self.msg_ids:
252 ar = AsyncResult(self._client, msg_id, self._fname)
256 ar = AsyncResult(self._client, msg_id, self._fname)
253 yield ar.get()
257 yield ar.get()
254 else:
258 else:
255 # already done
259 # already done
256 for r in rlist:
260 for r in rlist:
257 yield r
261 yield r
258
262
263 def __len__(self):
264 return len(self.msg_ids)
265
266 #-------------------------------------
267 # Sugar methods and attributes
268 #-------------------------------------
269
270 @property
271 def progress(self):
272 """the number of tasks which have been completed at this point.
273
274 Fractional progress would be given by 1.0 * ar.progress / len(ar)
275 """
276 self.wait(0)
277 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
278
279 @property
280 def elapsed(self):
281 """elapsed time since initial submission"""
282 if self.ready():
283 return self.wall_time
284
285 now = submitted = datetime.now()
286 for msg_id in self.msg_ids:
287 if msg_id in self._client.metadata:
288 stamp = self._client.metadata[msg_id]['submitted']
289 if stamp and stamp < submitted:
290 submitted = stamp
291 return (now-submitted).total_seconds()
292
293 @property
294 @check_ready
295 def serial_time(self):
296 """serial computation time of a parallel calculation
297
298 Computed as the sum of (completed-started) of each task
299 """
300 t = 0
301 for md in self._metadata:
302 t += (md['completed'] - md['started']).total_seconds()
303 return t
304
305 @property
306 @check_ready
307 def wall_time(self):
308 """actual computation time of a parallel calculation
309
310 Computed as the time between the latest `received` stamp
311 and the earliest `submitted`.
312
313 Only reliable if Client was spinning/waiting when the task finished, because
314 the `received` timestamp is created when a result is pulled off of the zmq queue,
315 which happens as a result of `client.spin()`.
316 """
317 received = max([ md['received'] for md in self._metadata ])
318 submitted = min([ md['submitted'] for md in self._metadata ])
319 return (received - submitted).total_seconds()
320
321 def wait_interactive(self, interval=1., timeout=None):
322 """interactive wait, printing progress at regular intervals"""
323 N = len(self)
324 tic = time.time()
325 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
326 self.wait(interval)
327 clear_output()
328 print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed),
329 sys.stdout.flush()
330 print
331 print "done"
259
332
260
333
261 class AsyncMapResult(AsyncResult):
334 class AsyncMapResult(AsyncResult):
262 """Class for representing results of non-blocking gathers.
335 """Class for representing results of non-blocking gathers.
263
336
264 This will properly reconstruct the gather.
337 This will properly reconstruct the gather.
265
338
266 This class is iterable at any time, and will wait on results as they come.
339 This class is iterable at any time, and will wait on results as they come.
267
340
268 If ordered=False, then the first results to arrive will come first, otherwise
341 If ordered=False, then the first results to arrive will come first, otherwise
269 results will be yielded in the order they were submitted.
342 results will be yielded in the order they were submitted.
270
343
271 """
344 """
272
345
273 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
346 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
274 AsyncResult.__init__(self, client, msg_ids, fname=fname)
347 AsyncResult.__init__(self, client, msg_ids, fname=fname)
275 self._mapObject = mapObject
348 self._mapObject = mapObject
276 self._single_result = False
349 self._single_result = False
277 self.ordered = ordered
350 self.ordered = ordered
278
351
279 def _reconstruct_result(self, res):
352 def _reconstruct_result(self, res):
280 """Perform the gather on the actual results."""
353 """Perform the gather on the actual results."""
281 return self._mapObject.joinPartitions(res)
354 return self._mapObject.joinPartitions(res)
282
355
283 # asynchronous iterator:
356 # asynchronous iterator:
284 def __iter__(self):
357 def __iter__(self):
285 it = self._ordered_iter if self.ordered else self._unordered_iter
358 it = self._ordered_iter if self.ordered else self._unordered_iter
286 for r in it():
359 for r in it():
287 yield r
360 yield r
288
361
289 # asynchronous ordered iterator:
362 # asynchronous ordered iterator:
290 def _ordered_iter(self):
363 def _ordered_iter(self):
291 """iterator for results *as they arrive*, preserving submission order."""
364 """iterator for results *as they arrive*, preserving submission order."""
292 try:
365 try:
293 rlist = self.get(0)
366 rlist = self.get(0)
294 except error.TimeoutError:
367 except error.TimeoutError:
295 # wait for each result individually
368 # wait for each result individually
296 for msg_id in self.msg_ids:
369 for msg_id in self.msg_ids:
297 ar = AsyncResult(self._client, msg_id, self._fname)
370 ar = AsyncResult(self._client, msg_id, self._fname)
298 rlist = ar.get()
371 rlist = ar.get()
299 try:
372 try:
300 for r in rlist:
373 for r in rlist:
301 yield r
374 yield r
302 except TypeError:
375 except TypeError:
303 # flattened, not a list
376 # flattened, not a list
304 # this could get broken by flattened data that returns iterables
377 # this could get broken by flattened data that returns iterables
305 # but most calls to map do not expose the `flatten` argument
378 # but most calls to map do not expose the `flatten` argument
306 yield rlist
379 yield rlist
307 else:
380 else:
308 # already done
381 # already done
309 for r in rlist:
382 for r in rlist:
310 yield r
383 yield r
311
384
312 # asynchronous unordered iterator:
385 # asynchronous unordered iterator:
313 def _unordered_iter(self):
386 def _unordered_iter(self):
314 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
387 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
315 try:
388 try:
316 rlist = self.get(0)
389 rlist = self.get(0)
317 except error.TimeoutError:
390 except error.TimeoutError:
318 pending = set(self.msg_ids)
391 pending = set(self.msg_ids)
319 while pending:
392 while pending:
320 try:
393 try:
321 self._client.wait(pending, 1e-3)
394 self._client.wait(pending, 1e-3)
322 except error.TimeoutError:
395 except error.TimeoutError:
323 # ignore timeout error, because that only means
396 # ignore timeout error, because that only means
324 # *some* jobs are outstanding
397 # *some* jobs are outstanding
325 pass
398 pass
326 # update ready set with those no longer outstanding:
399 # update ready set with those no longer outstanding:
327 ready = pending.difference(self._client.outstanding)
400 ready = pending.difference(self._client.outstanding)
328 # update pending to exclude those that are finished
401 # update pending to exclude those that are finished
329 pending = pending.difference(ready)
402 pending = pending.difference(ready)
330 while ready:
403 while ready:
331 msg_id = ready.pop()
404 msg_id = ready.pop()
332 ar = AsyncResult(self._client, msg_id, self._fname)
405 ar = AsyncResult(self._client, msg_id, self._fname)
333 rlist = ar.get()
406 rlist = ar.get()
334 try:
407 try:
335 for r in rlist:
408 for r in rlist:
336 yield r
409 yield r
337 except TypeError:
410 except TypeError:
338 # flattened, not a list
411 # flattened, not a list
339 # this could get broken by flattened data that returns iterables
412 # this could get broken by flattened data that returns iterables
340 # but most calls to map do not expose the `flatten` argument
413 # but most calls to map do not expose the `flatten` argument
341 yield rlist
414 yield rlist
342 else:
415 else:
343 # already done
416 # already done
344 for r in rlist:
417 for r in rlist:
345 yield r
418 yield r
346
419
347
420
348
421
349 class AsyncHubResult(AsyncResult):
422 class AsyncHubResult(AsyncResult):
350 """Class to wrap pending results that must be requested from the Hub.
423 """Class to wrap pending results that must be requested from the Hub.
351
424
352 Note that waiting/polling on these objects requires polling the Hubover the network,
425 Note that waiting/polling on these objects requires polling the Hubover the network,
353 so use `AsyncHubResult.wait()` sparingly.
426 so use `AsyncHubResult.wait()` sparingly.
354 """
427 """
355
428
356 def wait(self, timeout=-1):
429 def wait(self, timeout=-1):
357 """wait for result to complete."""
430 """wait for result to complete."""
358 start = time.time()
431 start = time.time()
359 if self._ready:
432 if self._ready:
360 return
433 return
361 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
434 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
362 local_ready = self._client.wait(local_ids, timeout)
435 local_ready = self._client.wait(local_ids, timeout)
363 if local_ready:
436 if local_ready:
364 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
437 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
365 if not remote_ids:
438 if not remote_ids:
366 self._ready = True
439 self._ready = True
367 else:
440 else:
368 rdict = self._client.result_status(remote_ids, status_only=False)
441 rdict = self._client.result_status(remote_ids, status_only=False)
369 pending = rdict['pending']
442 pending = rdict['pending']
370 while pending and (timeout < 0 or time.time() < start+timeout):
443 while pending and (timeout < 0 or time.time() < start+timeout):
371 rdict = self._client.result_status(remote_ids, status_only=False)
444 rdict = self._client.result_status(remote_ids, status_only=False)
372 pending = rdict['pending']
445 pending = rdict['pending']
373 if pending:
446 if pending:
374 time.sleep(0.1)
447 time.sleep(0.1)
375 if not pending:
448 if not pending:
376 self._ready = True
449 self._ready = True
377 if self._ready:
450 if self._ready:
378 try:
451 try:
379 results = map(self._client.results.get, self.msg_ids)
452 results = map(self._client.results.get, self.msg_ids)
380 self._result = results
453 self._result = results
381 if self._single_result:
454 if self._single_result:
382 r = results[0]
455 r = results[0]
383 if isinstance(r, Exception):
456 if isinstance(r, Exception):
384 raise r
457 raise r
385 else:
458 else:
386 results = error.collect_exceptions(results, self._fname)
459 results = error.collect_exceptions(results, self._fname)
387 self._result = self._reconstruct_result(results)
460 self._result = self._reconstruct_result(results)
388 except Exception, e:
461 except Exception, e:
389 self._exception = e
462 self._exception = e
390 self._success = False
463 self._success = False
391 else:
464 else:
392 self._success = True
465 self._success = True
393 finally:
466 finally:
394 self._metadata = map(self._client.metadata.get, self.msg_ids)
467 self._metadata = map(self._client.metadata.get, self.msg_ids)
395
468
396 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] No newline at end of file
469 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
General Comments 0
You need to be logged in to leave comments. Login now