Show More
@@ -15,13 +15,17 b' Authors:' | |||||
15 | # Imports |
|
15 | # Imports | |
16 | #----------------------------------------------------------------------------- |
|
16 | #----------------------------------------------------------------------------- | |
17 |
|
17 | |||
|
18 | import sys | |||
18 | import time |
|
19 | import time | |
|
20 | from datetime import datetime | |||
19 |
|
21 | |||
20 | from zmq import MessageTracker |
|
22 | from zmq import MessageTracker | |
21 |
|
23 | |||
|
24 | from IPython.core.display import clear_output | |||
22 | from IPython.external.decorator import decorator |
|
25 | from IPython.external.decorator import decorator | |
23 | from IPython.parallel import error |
|
26 | from IPython.parallel import error | |
24 |
|
27 | |||
|
28 | ||||
25 | #----------------------------------------------------------------------------- |
|
29 | #----------------------------------------------------------------------------- | |
26 | # Classes |
|
30 | # Classes | |
27 | #----------------------------------------------------------------------------- |
|
31 | #----------------------------------------------------------------------------- | |
@@ -255,7 +259,76 b' class AsyncResult(object):' | |||||
255 | # already done |
|
259 | # already done | |
256 | for r in rlist: |
|
260 | for r in rlist: | |
257 | yield r |
|
261 | yield r | |
258 |
|
262 | |||
|
263 | def __len__(self): | |||
|
264 | return len(self.msg_ids) | |||
|
265 | ||||
|
266 | #------------------------------------- | |||
|
267 | # Sugar methods and attributes | |||
|
268 | #------------------------------------- | |||
|
269 | ||||
|
270 | @property | |||
|
271 | def progress(self): | |||
|
272 | """the number of tasks which have been completed at this point. | |||
|
273 | ||||
|
274 | Fractional progress would be given by 1.0 * ar.progress / len(ar) | |||
|
275 | """ | |||
|
276 | self.wait(0) | |||
|
277 | return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding)) | |||
|
278 | ||||
|
279 | @property | |||
|
280 | def elapsed(self): | |||
|
281 | """elapsed time since initial submission""" | |||
|
282 | if self.ready(): | |||
|
283 | return self.wall_time | |||
|
284 | ||||
|
285 | now = submitted = datetime.now() | |||
|
286 | for msg_id in self.msg_ids: | |||
|
287 | if msg_id in self._client.metadata: | |||
|
288 | stamp = self._client.metadata[msg_id]['submitted'] | |||
|
289 | if stamp and stamp < submitted: | |||
|
290 | submitted = stamp | |||
|
291 | return (now-submitted).total_seconds() | |||
|
292 | ||||
|
293 | @property | |||
|
294 | @check_ready | |||
|
295 | def serial_time(self): | |||
|
296 | """serial computation time of a parallel calculation | |||
|
297 | ||||
|
298 | Computed as the sum of (completed-started) of each task | |||
|
299 | """ | |||
|
300 | t = 0 | |||
|
301 | for md in self._metadata: | |||
|
302 | t += (md['completed'] - md['started']).total_seconds() | |||
|
303 | return t | |||
|
304 | ||||
|
305 | @property | |||
|
306 | @check_ready | |||
|
307 | def wall_time(self): | |||
|
308 | """actual computation time of a parallel calculation | |||
|
309 | ||||
|
310 | Computed as the time between the latest `received` stamp | |||
|
311 | and the earliest `submitted`. | |||
|
312 | ||||
|
313 | Only reliable if Client was spinning/waiting when the task finished, because | |||
|
314 | the `received` timestamp is created when a result is pulled off of the zmq queue, | |||
|
315 | which happens as a result of `client.spin()`. | |||
|
316 | """ | |||
|
317 | received = max([ md['received'] for md in self._metadata ]) | |||
|
318 | submitted = min([ md['submitted'] for md in self._metadata ]) | |||
|
319 | return (received - submitted).total_seconds() | |||
|
320 | ||||
|
321 | def wait_interactive(self, interval=1., timeout=None): | |||
|
322 | """interactive wait, printing progress at regular intervals""" | |||
|
323 | N = len(self) | |||
|
324 | tic = time.time() | |||
|
325 | while not self.ready() and (timeout is None or time.time() - tic <= timeout): | |||
|
326 | self.wait(interval) | |||
|
327 | clear_output() | |||
|
328 | print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), | |||
|
329 | sys.stdout.flush() | |||
|
330 | ||||
|
331 | print "done" | |||
259 |
|
332 | |||
260 |
|
333 | |||
261 | class AsyncMapResult(AsyncResult): |
|
334 | class AsyncMapResult(AsyncResult): |
General Comments 0
You need to be logged in to leave comments.
Login now