##// END OF EJS Templates
Merge pull request #1608 from minrk/ar_sugar_2.6...
Fernando Perez -
r6505:5750e2dd merge
parent child Browse files
Show More
@@ -1,505 +1,517 b''
1 """AsyncResult objects for the client
1 """AsyncResult objects for the client
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from datetime import datetime
20 from datetime import datetime
21
21
22 from zmq import MessageTracker
22 from zmq import MessageTracker
23
23
24 from IPython.core.display import clear_output
24 from IPython.core.display import clear_output
25 from IPython.external.decorator import decorator
25 from IPython.external.decorator import decorator
26 from IPython.parallel import error
26 from IPython.parallel import error
27
27
28 #-----------------------------------------------------------------------------
29 # Functions
30 #-----------------------------------------------------------------------------
31
32 def _total_seconds(td):
33 """timedelta.total_seconds was added in 2.7"""
34 try:
35 # Python >= 2.7
36 return td.total_seconds()
37 except AttributeError:
38 # Python 2.6
39 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
28
40
29 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
30 # Classes
42 # Classes
31 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
32
44
33 # global empty tracker that's always done:
45 # global empty tracker that's always done:
34 finished_tracker = MessageTracker()
46 finished_tracker = MessageTracker()
35
47
36 @decorator
48 @decorator
37 def check_ready(f, self, *args, **kwargs):
49 def check_ready(f, self, *args, **kwargs):
38 """Call spin() to sync state prior to calling the method."""
50 """Call spin() to sync state prior to calling the method."""
39 self.wait(0)
51 self.wait(0)
40 if not self._ready:
52 if not self._ready:
41 raise error.TimeoutError("result not ready")
53 raise error.TimeoutError("result not ready")
42 return f(self, *args, **kwargs)
54 return f(self, *args, **kwargs)
43
55
44 class AsyncResult(object):
56 class AsyncResult(object):
45 """Class for representing results of non-blocking calls.
57 """Class for representing results of non-blocking calls.
46
58
47 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
59 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
48 """
60 """
49
61
50 msg_ids = None
62 msg_ids = None
51 _targets = None
63 _targets = None
52 _tracker = None
64 _tracker = None
53 _single_result = False
65 _single_result = False
54
66
55 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
67 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
56 if isinstance(msg_ids, basestring):
68 if isinstance(msg_ids, basestring):
57 # always a list
69 # always a list
58 msg_ids = [msg_ids]
70 msg_ids = [msg_ids]
59 if tracker is None:
71 if tracker is None:
60 # default to always done
72 # default to always done
61 tracker = finished_tracker
73 tracker = finished_tracker
62 self._client = client
74 self._client = client
63 self.msg_ids = msg_ids
75 self.msg_ids = msg_ids
64 self._fname=fname
76 self._fname=fname
65 self._targets = targets
77 self._targets = targets
66 self._tracker = tracker
78 self._tracker = tracker
67 self._ready = False
79 self._ready = False
68 self._success = None
80 self._success = None
69 self._metadata = None
81 self._metadata = None
70 if len(msg_ids) == 1:
82 if len(msg_ids) == 1:
71 self._single_result = not isinstance(targets, (list, tuple))
83 self._single_result = not isinstance(targets, (list, tuple))
72 else:
84 else:
73 self._single_result = False
85 self._single_result = False
74
86
75 def __repr__(self):
87 def __repr__(self):
76 if self._ready:
88 if self._ready:
77 return "<%s: finished>"%(self.__class__.__name__)
89 return "<%s: finished>"%(self.__class__.__name__)
78 else:
90 else:
79 return "<%s: %s>"%(self.__class__.__name__,self._fname)
91 return "<%s: %s>"%(self.__class__.__name__,self._fname)
80
92
81
93
82 def _reconstruct_result(self, res):
94 def _reconstruct_result(self, res):
83 """Reconstruct our result from actual result list (always a list)
95 """Reconstruct our result from actual result list (always a list)
84
96
85 Override me in subclasses for turning a list of results
97 Override me in subclasses for turning a list of results
86 into the expected form.
98 into the expected form.
87 """
99 """
88 if self._single_result:
100 if self._single_result:
89 return res[0]
101 return res[0]
90 else:
102 else:
91 return res
103 return res
92
104
93 def get(self, timeout=-1):
105 def get(self, timeout=-1):
94 """Return the result when it arrives.
106 """Return the result when it arrives.
95
107
96 If `timeout` is not ``None`` and the result does not arrive within
108 If `timeout` is not ``None`` and the result does not arrive within
97 `timeout` seconds then ``TimeoutError`` is raised. If the
109 `timeout` seconds then ``TimeoutError`` is raised. If the
98 remote call raised an exception then that exception will be reraised
110 remote call raised an exception then that exception will be reraised
99 by get() inside a `RemoteError`.
111 by get() inside a `RemoteError`.
100 """
112 """
101 if not self.ready():
113 if not self.ready():
102 self.wait(timeout)
114 self.wait(timeout)
103
115
104 if self._ready:
116 if self._ready:
105 if self._success:
117 if self._success:
106 return self._result
118 return self._result
107 else:
119 else:
108 raise self._exception
120 raise self._exception
109 else:
121 else:
110 raise error.TimeoutError("Result not ready.")
122 raise error.TimeoutError("Result not ready.")
111
123
112 def ready(self):
124 def ready(self):
113 """Return whether the call has completed."""
125 """Return whether the call has completed."""
114 if not self._ready:
126 if not self._ready:
115 self.wait(0)
127 self.wait(0)
116 return self._ready
128 return self._ready
117
129
118 def wait(self, timeout=-1):
130 def wait(self, timeout=-1):
119 """Wait until the result is available or until `timeout` seconds pass.
131 """Wait until the result is available or until `timeout` seconds pass.
120
132
121 This method always returns None.
133 This method always returns None.
122 """
134 """
123 if self._ready:
135 if self._ready:
124 return
136 return
125 self._ready = self._client.wait(self.msg_ids, timeout)
137 self._ready = self._client.wait(self.msg_ids, timeout)
126 if self._ready:
138 if self._ready:
127 try:
139 try:
128 results = map(self._client.results.get, self.msg_ids)
140 results = map(self._client.results.get, self.msg_ids)
129 self._result = results
141 self._result = results
130 if self._single_result:
142 if self._single_result:
131 r = results[0]
143 r = results[0]
132 if isinstance(r, Exception):
144 if isinstance(r, Exception):
133 raise r
145 raise r
134 else:
146 else:
135 results = error.collect_exceptions(results, self._fname)
147 results = error.collect_exceptions(results, self._fname)
136 self._result = self._reconstruct_result(results)
148 self._result = self._reconstruct_result(results)
137 except Exception, e:
149 except Exception, e:
138 self._exception = e
150 self._exception = e
139 self._success = False
151 self._success = False
140 else:
152 else:
141 self._success = True
153 self._success = True
142 finally:
154 finally:
143 self._metadata = map(self._client.metadata.get, self.msg_ids)
155 self._metadata = map(self._client.metadata.get, self.msg_ids)
144
156
145
157
146 def successful(self):
158 def successful(self):
147 """Return whether the call completed without raising an exception.
159 """Return whether the call completed without raising an exception.
148
160
149 Will raise ``AssertionError`` if the result is not ready.
161 Will raise ``AssertionError`` if the result is not ready.
150 """
162 """
151 assert self.ready()
163 assert self.ready()
152 return self._success
164 return self._success
153
165
154 #----------------------------------------------------------------
166 #----------------------------------------------------------------
155 # Extra methods not in mp.pool.AsyncResult
167 # Extra methods not in mp.pool.AsyncResult
156 #----------------------------------------------------------------
168 #----------------------------------------------------------------
157
169
158 def get_dict(self, timeout=-1):
170 def get_dict(self, timeout=-1):
159 """Get the results as a dict, keyed by engine_id.
171 """Get the results as a dict, keyed by engine_id.
160
172
161 timeout behavior is described in `get()`.
173 timeout behavior is described in `get()`.
162 """
174 """
163
175
164 results = self.get(timeout)
176 results = self.get(timeout)
165 engine_ids = [ md['engine_id'] for md in self._metadata ]
177 engine_ids = [ md['engine_id'] for md in self._metadata ]
166 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
178 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
167 maxcount = bycount.count(bycount[-1])
179 maxcount = bycount.count(bycount[-1])
168 if maxcount > 1:
180 if maxcount > 1:
169 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
181 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
170 maxcount, bycount[-1]))
182 maxcount, bycount[-1]))
171
183
172 return dict(zip(engine_ids,results))
184 return dict(zip(engine_ids,results))
173
185
174 @property
186 @property
175 def result(self):
187 def result(self):
176 """result property wrapper for `get(timeout=0)`."""
188 """result property wrapper for `get(timeout=0)`."""
177 return self.get()
189 return self.get()
178
190
179 # abbreviated alias:
191 # abbreviated alias:
180 r = result
192 r = result
181
193
182 @property
194 @property
183 @check_ready
195 @check_ready
184 def metadata(self):
196 def metadata(self):
185 """property for accessing execution metadata."""
197 """property for accessing execution metadata."""
186 if self._single_result:
198 if self._single_result:
187 return self._metadata[0]
199 return self._metadata[0]
188 else:
200 else:
189 return self._metadata
201 return self._metadata
190
202
191 @property
203 @property
192 def result_dict(self):
204 def result_dict(self):
193 """result property as a dict."""
205 """result property as a dict."""
194 return self.get_dict()
206 return self.get_dict()
195
207
196 def __dict__(self):
208 def __dict__(self):
197 return self.get_dict(0)
209 return self.get_dict(0)
198
210
199 def abort(self):
211 def abort(self):
200 """abort my tasks."""
212 """abort my tasks."""
201 assert not self.ready(), "Can't abort, I am already done!"
213 assert not self.ready(), "Can't abort, I am already done!"
202 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
214 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
203
215
204 @property
216 @property
205 def sent(self):
217 def sent(self):
206 """check whether my messages have been sent."""
218 """check whether my messages have been sent."""
207 return self._tracker.done
219 return self._tracker.done
208
220
209 def wait_for_send(self, timeout=-1):
221 def wait_for_send(self, timeout=-1):
210 """wait for pyzmq send to complete.
222 """wait for pyzmq send to complete.
211
223
212 This is necessary when sending arrays that you intend to edit in-place.
224 This is necessary when sending arrays that you intend to edit in-place.
213 `timeout` is in seconds, and will raise TimeoutError if it is reached
225 `timeout` is in seconds, and will raise TimeoutError if it is reached
214 before the send completes.
226 before the send completes.
215 """
227 """
216 return self._tracker.wait(timeout)
228 return self._tracker.wait(timeout)
217
229
218 #-------------------------------------
230 #-------------------------------------
219 # dict-access
231 # dict-access
220 #-------------------------------------
232 #-------------------------------------
221
233
222 @check_ready
234 @check_ready
223 def __getitem__(self, key):
235 def __getitem__(self, key):
224 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
236 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
225 """
237 """
226 if isinstance(key, int):
238 if isinstance(key, int):
227 return error.collect_exceptions([self._result[key]], self._fname)[0]
239 return error.collect_exceptions([self._result[key]], self._fname)[0]
228 elif isinstance(key, slice):
240 elif isinstance(key, slice):
229 return error.collect_exceptions(self._result[key], self._fname)
241 return error.collect_exceptions(self._result[key], self._fname)
230 elif isinstance(key, basestring):
242 elif isinstance(key, basestring):
231 values = [ md[key] for md in self._metadata ]
243 values = [ md[key] for md in self._metadata ]
232 if self._single_result:
244 if self._single_result:
233 return values[0]
245 return values[0]
234 else:
246 else:
235 return values
247 return values
236 else:
248 else:
237 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
249 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
238
250
239 def __getattr__(self, key):
251 def __getattr__(self, key):
240 """getattr maps to getitem for convenient attr access to metadata."""
252 """getattr maps to getitem for convenient attr access to metadata."""
241 try:
253 try:
242 return self.__getitem__(key)
254 return self.__getitem__(key)
243 except (error.TimeoutError, KeyError):
255 except (error.TimeoutError, KeyError):
244 raise AttributeError("%r object has no attribute %r"%(
256 raise AttributeError("%r object has no attribute %r"%(
245 self.__class__.__name__, key))
257 self.__class__.__name__, key))
246
258
247 # asynchronous iterator:
259 # asynchronous iterator:
248 def __iter__(self):
260 def __iter__(self):
249 if self._single_result:
261 if self._single_result:
250 raise TypeError("AsyncResults with a single result are not iterable.")
262 raise TypeError("AsyncResults with a single result are not iterable.")
251 try:
263 try:
252 rlist = self.get(0)
264 rlist = self.get(0)
253 except error.TimeoutError:
265 except error.TimeoutError:
254 # wait for each result individually
266 # wait for each result individually
255 for msg_id in self.msg_ids:
267 for msg_id in self.msg_ids:
256 ar = AsyncResult(self._client, msg_id, self._fname)
268 ar = AsyncResult(self._client, msg_id, self._fname)
257 yield ar.get()
269 yield ar.get()
258 else:
270 else:
259 # already done
271 # already done
260 for r in rlist:
272 for r in rlist:
261 yield r
273 yield r
262
274
263 def __len__(self):
275 def __len__(self):
264 return len(self.msg_ids)
276 return len(self.msg_ids)
265
277
266 #-------------------------------------
278 #-------------------------------------
267 # Sugar methods and attributes
279 # Sugar methods and attributes
268 #-------------------------------------
280 #-------------------------------------
269
281
270 def timedelta(self, start, end, start_key=min, end_key=max):
282 def timedelta(self, start, end, start_key=min, end_key=max):
271 """compute the difference between two sets of timestamps
283 """compute the difference between two sets of timestamps
272
284
273 The default behavior is to use the earliest of the first
285 The default behavior is to use the earliest of the first
274 and the latest of the second list, but this can be changed
286 and the latest of the second list, but this can be changed
275 by passing a different
287 by passing a different
276
288
277 Parameters
289 Parameters
278 ----------
290 ----------
279
291
280 start : one or more datetime objects (e.g. ar.submitted)
292 start : one or more datetime objects (e.g. ar.submitted)
281 end : one or more datetime objects (e.g. ar.received)
293 end : one or more datetime objects (e.g. ar.received)
282 start_key : callable
294 start_key : callable
283 Function to call on `start` to extract the relevant
295 Function to call on `start` to extract the relevant
284 entry [defalt: min]
296 entry [defalt: min]
285 end_key : callable
297 end_key : callable
286 Function to call on `end` to extract the relevant
298 Function to call on `end` to extract the relevant
287 entry [default: max]
299 entry [default: max]
288
300
289 Returns
301 Returns
290 -------
302 -------
291
303
292 dt : float
304 dt : float
293 The time elapsed (in seconds) between the two selected timestamps.
305 The time elapsed (in seconds) between the two selected timestamps.
294 """
306 """
295 if not isinstance(start, datetime):
307 if not isinstance(start, datetime):
296 # handle single_result AsyncResults, where ar.stamp is single object,
308 # handle single_result AsyncResults, where ar.stamp is single object,
297 # not a list
309 # not a list
298 start = start_key(start)
310 start = start_key(start)
299 if not isinstance(end, datetime):
311 if not isinstance(end, datetime):
300 # handle single_result AsyncResults, where ar.stamp is single object,
312 # handle single_result AsyncResults, where ar.stamp is single object,
301 # not a list
313 # not a list
302 end = end_key(end)
314 end = end_key(end)
303 return (end - start).total_seconds()
315 return _total_seconds(end - start)
304
316
305 @property
317 @property
306 def progress(self):
318 def progress(self):
307 """the number of tasks which have been completed at this point.
319 """the number of tasks which have been completed at this point.
308
320
309 Fractional progress would be given by 1.0 * ar.progress / len(ar)
321 Fractional progress would be given by 1.0 * ar.progress / len(ar)
310 """
322 """
311 self.wait(0)
323 self.wait(0)
312 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
324 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
313
325
314 @property
326 @property
315 def elapsed(self):
327 def elapsed(self):
316 """elapsed time since initial submission"""
328 """elapsed time since initial submission"""
317 if self.ready():
329 if self.ready():
318 return self.wall_time
330 return self.wall_time
319
331
320 now = submitted = datetime.now()
332 now = submitted = datetime.now()
321 for msg_id in self.msg_ids:
333 for msg_id in self.msg_ids:
322 if msg_id in self._client.metadata:
334 if msg_id in self._client.metadata:
323 stamp = self._client.metadata[msg_id]['submitted']
335 stamp = self._client.metadata[msg_id]['submitted']
324 if stamp and stamp < submitted:
336 if stamp and stamp < submitted:
325 submitted = stamp
337 submitted = stamp
326 return (now-submitted).total_seconds()
338 return _total_seconds(now-submitted)
327
339
328 @property
340 @property
329 @check_ready
341 @check_ready
330 def serial_time(self):
342 def serial_time(self):
331 """serial computation time of a parallel calculation
343 """serial computation time of a parallel calculation
332
344
333 Computed as the sum of (completed-started) of each task
345 Computed as the sum of (completed-started) of each task
334 """
346 """
335 t = 0
347 t = 0
336 for md in self._metadata:
348 for md in self._metadata:
337 t += (md['completed'] - md['started']).total_seconds()
349 t += _total_seconds(md['completed'] - md['started'])
338 return t
350 return t
339
351
340 @property
352 @property
341 @check_ready
353 @check_ready
342 def wall_time(self):
354 def wall_time(self):
343 """actual computation time of a parallel calculation
355 """actual computation time of a parallel calculation
344
356
345 Computed as the time between the latest `received` stamp
357 Computed as the time between the latest `received` stamp
346 and the earliest `submitted`.
358 and the earliest `submitted`.
347
359
348 Only reliable if Client was spinning/waiting when the task finished, because
360 Only reliable if Client was spinning/waiting when the task finished, because
349 the `received` timestamp is created when a result is pulled off of the zmq queue,
361 the `received` timestamp is created when a result is pulled off of the zmq queue,
350 which happens as a result of `client.spin()`.
362 which happens as a result of `client.spin()`.
351
363
352 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
364 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
353
365
354 """
366 """
355 return self.timedelta(self.submitted, self.received)
367 return self.timedelta(self.submitted, self.received)
356
368
357 def wait_interactive(self, interval=1., timeout=None):
369 def wait_interactive(self, interval=1., timeout=None):
358 """interactive wait, printing progress at regular intervals"""
370 """interactive wait, printing progress at regular intervals"""
359 N = len(self)
371 N = len(self)
360 tic = time.time()
372 tic = time.time()
361 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
373 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
362 self.wait(interval)
374 self.wait(interval)
363 clear_output()
375 clear_output()
364 print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed),
376 print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed),
365 sys.stdout.flush()
377 sys.stdout.flush()
366 print
378 print
367 print "done"
379 print "done"
368
380
369
381
370 class AsyncMapResult(AsyncResult):
382 class AsyncMapResult(AsyncResult):
371 """Class for representing results of non-blocking gathers.
383 """Class for representing results of non-blocking gathers.
372
384
373 This will properly reconstruct the gather.
385 This will properly reconstruct the gather.
374
386
375 This class is iterable at any time, and will wait on results as they come.
387 This class is iterable at any time, and will wait on results as they come.
376
388
377 If ordered=False, then the first results to arrive will come first, otherwise
389 If ordered=False, then the first results to arrive will come first, otherwise
378 results will be yielded in the order they were submitted.
390 results will be yielded in the order they were submitted.
379
391
380 """
392 """
381
393
382 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
394 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
383 AsyncResult.__init__(self, client, msg_ids, fname=fname)
395 AsyncResult.__init__(self, client, msg_ids, fname=fname)
384 self._mapObject = mapObject
396 self._mapObject = mapObject
385 self._single_result = False
397 self._single_result = False
386 self.ordered = ordered
398 self.ordered = ordered
387
399
388 def _reconstruct_result(self, res):
400 def _reconstruct_result(self, res):
389 """Perform the gather on the actual results."""
401 """Perform the gather on the actual results."""
390 return self._mapObject.joinPartitions(res)
402 return self._mapObject.joinPartitions(res)
391
403
392 # asynchronous iterator:
404 # asynchronous iterator:
393 def __iter__(self):
405 def __iter__(self):
394 it = self._ordered_iter if self.ordered else self._unordered_iter
406 it = self._ordered_iter if self.ordered else self._unordered_iter
395 for r in it():
407 for r in it():
396 yield r
408 yield r
397
409
398 # asynchronous ordered iterator:
410 # asynchronous ordered iterator:
399 def _ordered_iter(self):
411 def _ordered_iter(self):
400 """iterator for results *as they arrive*, preserving submission order."""
412 """iterator for results *as they arrive*, preserving submission order."""
401 try:
413 try:
402 rlist = self.get(0)
414 rlist = self.get(0)
403 except error.TimeoutError:
415 except error.TimeoutError:
404 # wait for each result individually
416 # wait for each result individually
405 for msg_id in self.msg_ids:
417 for msg_id in self.msg_ids:
406 ar = AsyncResult(self._client, msg_id, self._fname)
418 ar = AsyncResult(self._client, msg_id, self._fname)
407 rlist = ar.get()
419 rlist = ar.get()
408 try:
420 try:
409 for r in rlist:
421 for r in rlist:
410 yield r
422 yield r
411 except TypeError:
423 except TypeError:
412 # flattened, not a list
424 # flattened, not a list
413 # this could get broken by flattened data that returns iterables
425 # this could get broken by flattened data that returns iterables
414 # but most calls to map do not expose the `flatten` argument
426 # but most calls to map do not expose the `flatten` argument
415 yield rlist
427 yield rlist
416 else:
428 else:
417 # already done
429 # already done
418 for r in rlist:
430 for r in rlist:
419 yield r
431 yield r
420
432
421 # asynchronous unordered iterator:
433 # asynchronous unordered iterator:
422 def _unordered_iter(self):
434 def _unordered_iter(self):
423 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
435 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
424 try:
436 try:
425 rlist = self.get(0)
437 rlist = self.get(0)
426 except error.TimeoutError:
438 except error.TimeoutError:
427 pending = set(self.msg_ids)
439 pending = set(self.msg_ids)
428 while pending:
440 while pending:
429 try:
441 try:
430 self._client.wait(pending, 1e-3)
442 self._client.wait(pending, 1e-3)
431 except error.TimeoutError:
443 except error.TimeoutError:
432 # ignore timeout error, because that only means
444 # ignore timeout error, because that only means
433 # *some* jobs are outstanding
445 # *some* jobs are outstanding
434 pass
446 pass
435 # update ready set with those no longer outstanding:
447 # update ready set with those no longer outstanding:
436 ready = pending.difference(self._client.outstanding)
448 ready = pending.difference(self._client.outstanding)
437 # update pending to exclude those that are finished
449 # update pending to exclude those that are finished
438 pending = pending.difference(ready)
450 pending = pending.difference(ready)
439 while ready:
451 while ready:
440 msg_id = ready.pop()
452 msg_id = ready.pop()
441 ar = AsyncResult(self._client, msg_id, self._fname)
453 ar = AsyncResult(self._client, msg_id, self._fname)
442 rlist = ar.get()
454 rlist = ar.get()
443 try:
455 try:
444 for r in rlist:
456 for r in rlist:
445 yield r
457 yield r
446 except TypeError:
458 except TypeError:
447 # flattened, not a list
459 # flattened, not a list
448 # this could get broken by flattened data that returns iterables
460 # this could get broken by flattened data that returns iterables
449 # but most calls to map do not expose the `flatten` argument
461 # but most calls to map do not expose the `flatten` argument
450 yield rlist
462 yield rlist
451 else:
463 else:
452 # already done
464 # already done
453 for r in rlist:
465 for r in rlist:
454 yield r
466 yield r
455
467
456
468
457
469
458 class AsyncHubResult(AsyncResult):
470 class AsyncHubResult(AsyncResult):
459 """Class to wrap pending results that must be requested from the Hub.
471 """Class to wrap pending results that must be requested from the Hub.
460
472
461 Note that waiting/polling on these objects requires polling the Hubover the network,
473 Note that waiting/polling on these objects requires polling the Hubover the network,
462 so use `AsyncHubResult.wait()` sparingly.
474 so use `AsyncHubResult.wait()` sparingly.
463 """
475 """
464
476
465 def wait(self, timeout=-1):
477 def wait(self, timeout=-1):
466 """wait for result to complete."""
478 """wait for result to complete."""
467 start = time.time()
479 start = time.time()
468 if self._ready:
480 if self._ready:
469 return
481 return
470 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
482 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
471 local_ready = self._client.wait(local_ids, timeout)
483 local_ready = self._client.wait(local_ids, timeout)
472 if local_ready:
484 if local_ready:
473 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
485 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
474 if not remote_ids:
486 if not remote_ids:
475 self._ready = True
487 self._ready = True
476 else:
488 else:
477 rdict = self._client.result_status(remote_ids, status_only=False)
489 rdict = self._client.result_status(remote_ids, status_only=False)
478 pending = rdict['pending']
490 pending = rdict['pending']
479 while pending and (timeout < 0 or time.time() < start+timeout):
491 while pending and (timeout < 0 or time.time() < start+timeout):
480 rdict = self._client.result_status(remote_ids, status_only=False)
492 rdict = self._client.result_status(remote_ids, status_only=False)
481 pending = rdict['pending']
493 pending = rdict['pending']
482 if pending:
494 if pending:
483 time.sleep(0.1)
495 time.sleep(0.1)
484 if not pending:
496 if not pending:
485 self._ready = True
497 self._ready = True
486 if self._ready:
498 if self._ready:
487 try:
499 try:
488 results = map(self._client.results.get, self.msg_ids)
500 results = map(self._client.results.get, self.msg_ids)
489 self._result = results
501 self._result = results
490 if self._single_result:
502 if self._single_result:
491 r = results[0]
503 r = results[0]
492 if isinstance(r, Exception):
504 if isinstance(r, Exception):
493 raise r
505 raise r
494 else:
506 else:
495 results = error.collect_exceptions(results, self._fname)
507 results = error.collect_exceptions(results, self._fname)
496 self._result = self._reconstruct_result(results)
508 self._result = self._reconstruct_result(results)
497 except Exception, e:
509 except Exception, e:
498 self._exception = e
510 self._exception = e
499 self._success = False
511 self._success = False
500 else:
512 else:
501 self._success = True
513 self._success = True
502 finally:
514 finally:
503 self._metadata = map(self._client.metadata.get, self.msg_ids)
515 self._metadata = map(self._client.metadata.get, self.msg_ids)
504
516
505 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] No newline at end of file
517 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
General Comments 0
You need to be logged in to leave comments. Login now