##// END OF EJS Templates
add sugar methods/properties to AsyncResult...
MinRK -
Show More
@@ -15,13 +15,17 b' Authors:'
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import sys
18 import time
19 import time
20 from datetime import datetime
19
21
20 from zmq import MessageTracker
22 from zmq import MessageTracker
21
23
24 from IPython.core.display import clear_output
22 from IPython.external.decorator import decorator
25 from IPython.external.decorator import decorator
23 from IPython.parallel import error
26 from IPython.parallel import error
24
27
28
25 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
26 # Classes
30 # Classes
27 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
@@ -255,7 +259,76 b' class AsyncResult(object):'
255 # already done
259 # already done
256 for r in rlist:
260 for r in rlist:
257 yield r
261 yield r
258
262
263 def __len__(self):
264 return len(self.msg_ids)
265
266 #-------------------------------------
267 # Sugar methods and attributes
268 #-------------------------------------
269
270 @property
271 def progress(self):
272 """the number of tasks which have been completed at this point.
273
274 Fractional progress would be given by 1.0 * ar.progress / len(ar)
275 """
276 self.wait(0)
277 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
278
279 @property
280 def elapsed(self):
281 """elapsed time since initial submission"""
282 if self.ready():
283 return self.wall_time
284
285 now = submitted = datetime.now()
286 for msg_id in self.msg_ids:
287 if msg_id in self._client.metadata:
288 stamp = self._client.metadata[msg_id]['submitted']
289 if stamp and stamp < submitted:
290 submitted = stamp
291 return (now-submitted).total_seconds()
292
293 @property
294 @check_ready
295 def serial_time(self):
296 """serial computation time of a parallel calculation
297
298 Computed as the sum of (completed-started) of each task
299 """
300 t = 0
301 for md in self._metadata:
302 t += (md['completed'] - md['started']).total_seconds()
303 return t
304
305 @property
306 @check_ready
307 def wall_time(self):
308 """actual computation time of a parallel calculation
309
310 Computed as the time between the latest `received` stamp
311 and the earliest `submitted`.
312
313 Only reliable if Client was spinning/waiting when the task finished, because
314 the `received` timestamp is created when a result is pulled off of the zmq queue,
315 which happens as a result of `client.spin()`.
316 """
317 received = max([ md['received'] for md in self._metadata ])
318 submitted = min([ md['submitted'] for md in self._metadata ])
319 return (received - submitted).total_seconds()
320
321 def wait_interactive(self, interval=1., timeout=None):
322 """interactive wait, printing progress at regular intervals"""
323 N = len(self)
324 tic = time.time()
325 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
326 self.wait(interval)
327 clear_output()
328 print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed),
329 sys.stdout.flush()
330 print
331 print "done"
259
332
260
333
261 class AsyncMapResult(AsyncResult):
334 class AsyncMapResult(AsyncResult):
General Comments 0
You need to be logged in to leave comments. Login now