##// END OF EJS Templates
AsyncResult.display_outputs should only print stdout/err if non-empty (single engine)
MinRK -
Show More
@@ -1,651 +1,652 b''
1 1 """AsyncResult objects for the client
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import sys
19 19 import time
20 20 from datetime import datetime
21 21
22 22 from zmq import MessageTracker
23 23
24 24 from IPython.core.display import clear_output, display
25 25 from IPython.external.decorator import decorator
26 26 from IPython.parallel import error
27 27
28 28 #-----------------------------------------------------------------------------
29 29 # Functions
30 30 #-----------------------------------------------------------------------------
31 31
32 32 def _total_seconds(td):
33 33 """timedelta.total_seconds was added in 2.7"""
34 34 try:
35 35 # Python >= 2.7
36 36 return td.total_seconds()
37 37 except AttributeError:
38 38 # Python 2.6
39 39 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Classes
43 43 #-----------------------------------------------------------------------------
44 44
45 45 # global empty tracker that's always done:
46 46 finished_tracker = MessageTracker()
47 47
48 48 @decorator
49 49 def check_ready(f, self, *args, **kwargs):
50 50 """Call spin() to sync state prior to calling the method."""
51 51 self.wait(0)
52 52 if not self._ready:
53 53 raise error.TimeoutError("result not ready")
54 54 return f(self, *args, **kwargs)
55 55
56 56 class AsyncResult(object):
57 57 """Class for representing results of non-blocking calls.
58 58
59 59 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
60 60 """
61 61
62 62 msg_ids = None
63 63 _targets = None
64 64 _tracker = None
65 65 _single_result = False
66 66
67 67 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
68 68 if isinstance(msg_ids, basestring):
69 69 # always a list
70 70 msg_ids = [msg_ids]
71 71 if tracker is None:
72 72 # default to always done
73 73 tracker = finished_tracker
74 74 self._client = client
75 75 self.msg_ids = msg_ids
76 76 self._fname=fname
77 77 self._targets = targets
78 78 self._tracker = tracker
79 79 self._ready = False
80 80 self._success = None
81 81 self._metadata = None
82 82 if len(msg_ids) == 1:
83 83 self._single_result = not isinstance(targets, (list, tuple))
84 84 else:
85 85 self._single_result = False
86 86
87 87 def __repr__(self):
88 88 if self._ready:
89 89 return "<%s: finished>"%(self.__class__.__name__)
90 90 else:
91 91 return "<%s: %s>"%(self.__class__.__name__,self._fname)
92 92
93 93
94 94 def _reconstruct_result(self, res):
95 95 """Reconstruct our result from actual result list (always a list)
96 96
97 97 Override me in subclasses for turning a list of results
98 98 into the expected form.
99 99 """
100 100 if self._single_result:
101 101 return res[0]
102 102 else:
103 103 return res
104 104
105 105 def get(self, timeout=-1):
106 106 """Return the result when it arrives.
107 107
108 108 If `timeout` is not ``None`` and the result does not arrive within
109 109 `timeout` seconds then ``TimeoutError`` is raised. If the
110 110 remote call raised an exception then that exception will be reraised
111 111 by get() inside a `RemoteError`.
112 112 """
113 113 if not self.ready():
114 114 self.wait(timeout)
115 115
116 116 if self._ready:
117 117 if self._success:
118 118 return self._result
119 119 else:
120 120 raise self._exception
121 121 else:
122 122 raise error.TimeoutError("Result not ready.")
123 123
124 124 def ready(self):
125 125 """Return whether the call has completed."""
126 126 if not self._ready:
127 127 self.wait(0)
128 128 return self._ready
129 129
130 130 def wait(self, timeout=-1):
131 131 """Wait until the result is available or until `timeout` seconds pass.
132 132
133 133 This method always returns None.
134 134 """
135 135 if self._ready:
136 136 return
137 137 self._ready = self._client.wait(self.msg_ids, timeout)
138 138 if self._ready:
139 139 try:
140 140 results = map(self._client.results.get, self.msg_ids)
141 141 self._result = results
142 142 if self._single_result:
143 143 r = results[0]
144 144 if isinstance(r, Exception):
145 145 raise r
146 146 else:
147 147 results = error.collect_exceptions(results, self._fname)
148 148 self._result = self._reconstruct_result(results)
149 149 except Exception, e:
150 150 self._exception = e
151 151 self._success = False
152 152 else:
153 153 self._success = True
154 154 finally:
155 155 self._metadata = map(self._client.metadata.get, self.msg_ids)
156 156
157 157
158 158 def successful(self):
159 159 """Return whether the call completed without raising an exception.
160 160
161 161 Will raise ``AssertionError`` if the result is not ready.
162 162 """
163 163 assert self.ready()
164 164 return self._success
165 165
166 166 #----------------------------------------------------------------
167 167 # Extra methods not in mp.pool.AsyncResult
168 168 #----------------------------------------------------------------
169 169
170 170 def get_dict(self, timeout=-1):
171 171 """Get the results as a dict, keyed by engine_id.
172 172
173 173 timeout behavior is described in `get()`.
174 174 """
175 175
176 176 results = self.get(timeout)
177 177 engine_ids = [ md['engine_id'] for md in self._metadata ]
178 178 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
179 179 maxcount = bycount.count(bycount[-1])
180 180 if maxcount > 1:
181 181 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
182 182 maxcount, bycount[-1]))
183 183
184 184 return dict(zip(engine_ids,results))
185 185
186 186 @property
187 187 def result(self):
188 188 """result property wrapper for `get(timeout=0)`."""
189 189 return self.get()
190 190
191 191 # abbreviated alias:
192 192 r = result
193 193
194 194 @property
195 195 @check_ready
196 196 def metadata(self):
197 197 """property for accessing execution metadata."""
198 198 if self._single_result:
199 199 return self._metadata[0]
200 200 else:
201 201 return self._metadata
202 202
203 203 @property
204 204 def result_dict(self):
205 205 """result property as a dict."""
206 206 return self.get_dict()
207 207
208 208 def __dict__(self):
209 209 return self.get_dict(0)
210 210
211 211 def abort(self):
212 212 """abort my tasks."""
213 213 assert not self.ready(), "Can't abort, I am already done!"
214 214 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
215 215
216 216 @property
217 217 def sent(self):
218 218 """check whether my messages have been sent."""
219 219 return self._tracker.done
220 220
221 221 def wait_for_send(self, timeout=-1):
222 222 """wait for pyzmq send to complete.
223 223
224 224 This is necessary when sending arrays that you intend to edit in-place.
225 225 `timeout` is in seconds, and will raise TimeoutError if it is reached
226 226 before the send completes.
227 227 """
228 228 return self._tracker.wait(timeout)
229 229
230 230 #-------------------------------------
231 231 # dict-access
232 232 #-------------------------------------
233 233
234 234 @check_ready
235 235 def __getitem__(self, key):
236 236 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
237 237 """
238 238 if isinstance(key, int):
239 239 return error.collect_exceptions([self._result[key]], self._fname)[0]
240 240 elif isinstance(key, slice):
241 241 return error.collect_exceptions(self._result[key], self._fname)
242 242 elif isinstance(key, basestring):
243 243 values = [ md[key] for md in self._metadata ]
244 244 if self._single_result:
245 245 return values[0]
246 246 else:
247 247 return values
248 248 else:
249 249 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
250 250
251 251 def __getattr__(self, key):
252 252 """getattr maps to getitem for convenient attr access to metadata."""
253 253 try:
254 254 return self.__getitem__(key)
255 255 except (error.TimeoutError, KeyError):
256 256 raise AttributeError("%r object has no attribute %r"%(
257 257 self.__class__.__name__, key))
258 258
259 259 # asynchronous iterator:
260 260 def __iter__(self):
261 261 if self._single_result:
262 262 raise TypeError("AsyncResults with a single result are not iterable.")
263 263 try:
264 264 rlist = self.get(0)
265 265 except error.TimeoutError:
266 266 # wait for each result individually
267 267 for msg_id in self.msg_ids:
268 268 ar = AsyncResult(self._client, msg_id, self._fname)
269 269 yield ar.get()
270 270 else:
271 271 # already done
272 272 for r in rlist:
273 273 yield r
274 274
275 275 def __len__(self):
276 276 return len(self.msg_ids)
277 277
278 278 #-------------------------------------
279 279 # Sugar methods and attributes
280 280 #-------------------------------------
281 281
282 282 def timedelta(self, start, end, start_key=min, end_key=max):
283 283 """compute the difference between two sets of timestamps
284 284
285 285 The default behavior is to use the earliest of the first
286 286 and the latest of the second list, but this can be changed
287 287 by passing a different
288 288
289 289 Parameters
290 290 ----------
291 291
292 292 start : one or more datetime objects (e.g. ar.submitted)
293 293 end : one or more datetime objects (e.g. ar.received)
294 294 start_key : callable
295 295 Function to call on `start` to extract the relevant
296 296 entry [defalt: min]
297 297 end_key : callable
298 298 Function to call on `end` to extract the relevant
299 299 entry [default: max]
300 300
301 301 Returns
302 302 -------
303 303
304 304 dt : float
305 305 The time elapsed (in seconds) between the two selected timestamps.
306 306 """
307 307 if not isinstance(start, datetime):
308 308 # handle single_result AsyncResults, where ar.stamp is single object,
309 309 # not a list
310 310 start = start_key(start)
311 311 if not isinstance(end, datetime):
312 312 # handle single_result AsyncResults, where ar.stamp is single object,
313 313 # not a list
314 314 end = end_key(end)
315 315 return _total_seconds(end - start)
316 316
317 317 @property
318 318 def progress(self):
319 319 """the number of tasks which have been completed at this point.
320 320
321 321 Fractional progress would be given by 1.0 * ar.progress / len(ar)
322 322 """
323 323 self.wait(0)
324 324 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
325 325
326 326 @property
327 327 def elapsed(self):
328 328 """elapsed time since initial submission"""
329 329 if self.ready():
330 330 return self.wall_time
331 331
332 332 now = submitted = datetime.now()
333 333 for msg_id in self.msg_ids:
334 334 if msg_id in self._client.metadata:
335 335 stamp = self._client.metadata[msg_id]['submitted']
336 336 if stamp and stamp < submitted:
337 337 submitted = stamp
338 338 return _total_seconds(now-submitted)
339 339
340 340 @property
341 341 @check_ready
342 342 def serial_time(self):
343 343 """serial computation time of a parallel calculation
344 344
345 345 Computed as the sum of (completed-started) of each task
346 346 """
347 347 t = 0
348 348 for md in self._metadata:
349 349 t += _total_seconds(md['completed'] - md['started'])
350 350 return t
351 351
352 352 @property
353 353 @check_ready
354 354 def wall_time(self):
355 355 """actual computation time of a parallel calculation
356 356
357 357 Computed as the time between the latest `received` stamp
358 358 and the earliest `submitted`.
359 359
360 360 Only reliable if Client was spinning/waiting when the task finished, because
361 361 the `received` timestamp is created when a result is pulled off of the zmq queue,
362 362 which happens as a result of `client.spin()`.
363 363
364 364 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
365 365
366 366 """
367 367 return self.timedelta(self.submitted, self.received)
368 368
369 369 def wait_interactive(self, interval=1., timeout=None):
370 370 """interactive wait, printing progress at regular intervals"""
371 371 N = len(self)
372 372 tic = time.time()
373 373 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
374 374 self.wait(interval)
375 375 clear_output()
376 376 print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed),
377 377 sys.stdout.flush()
378 378 print
379 379 print "done"
380 380
381 381 def _republish_displaypub(self, content, eid):
382 382 """republish individual displaypub content dicts"""
383 383 try:
384 384 ip = get_ipython()
385 385 except NameError:
386 386 # displaypub is meaningless outside IPython
387 387 return
388 388 md = content['metadata'] or {}
389 389 md['engine'] = eid
390 390 ip.display_pub.publish(content['source'], content['data'], md)
391 391
392 392
393 393 def _display_single_result(self):
394
395 print self.stdout
396 print >> sys.stderr, self.stderr
394 if self.stdout:
395 print self.stdout
396 if self.stderr:
397 print >> sys.stderr, self.stderr
397 398
398 399 try:
399 400 get_ipython()
400 401 except NameError:
401 402 # displaypub is meaningless outside IPython
402 403 return
403 404
404 405 for output in self.outputs:
405 406 self._republish_displaypub(output, self.engine_id)
406 407
407 408 if self.pyout is not None:
408 409 display(self.get())
409 410
410 411 @check_ready
411 412 def display_outputs(self, groupby="type"):
412 413 """republish the outputs of the computation
413 414
414 415 Parameters
415 416 ----------
416 417
417 418 groupby : str [default: type]
418 419 if 'type':
419 420 Group outputs by type (show all stdout, then all stderr, etc.):
420 421
421 422 [stdout:1] foo
422 423 [stdout:2] foo
423 424 [stderr:1] bar
424 425 [stderr:2] bar
425 426 if 'engine':
426 427 Display outputs for each engine before moving on to the next:
427 428
428 429 [stdout:1] foo
429 430 [stderr:1] bar
430 431 [stdout:2] foo
431 432 [stderr:2] bar
432 433
433 434 if 'order':
434 435 Like 'type', but further collate individual displaypub
435 436 outputs. This is meant for cases of each command producing
436 437 several plots, and you would like to see all of the first
437 438 plots together, then all of the second plots, and so on.
438 439 """
439 440 # flush iopub, just in case
440 441 self._client._flush_iopub(self._client._iopub_socket)
441 442 if self._single_result:
442 443 self._display_single_result()
443 444 return
444 445
445 446 stdouts = [s.rstrip() for s in self.stdout]
446 447 stderrs = [s.rstrip() for s in self.stderr]
447 448 pyouts = [p for p in self.pyout]
448 449 output_lists = self.outputs
449 450 results = self.get()
450 451
451 452 targets = self.engine_id
452 453
453 454 if groupby == "engine":
454 455 for eid,stdout,stderr,outputs,r,pyout in zip(
455 456 targets, stdouts, stderrs, output_lists, results, pyouts
456 457 ):
457 458 if stdout:
458 459 print '[stdout:%i]' % eid, stdout
459 460 if stderr:
460 461 print >> sys.stderr, '[stderr:%i]' % eid, stderr
461 462
462 463 try:
463 464 get_ipython()
464 465 except NameError:
465 466 # displaypub is meaningless outside IPython
466 467 return
467 468
468 469 for output in outputs:
469 470 self._republish_displaypub(output, eid)
470 471
471 472 if pyout is not None:
472 473 display(r)
473 474
474 475 elif groupby in ('type', 'order'):
475 476 # republish stdout:
476 477 if any(stdouts):
477 478 for eid,stdout in zip(targets, stdouts):
478 479 print '[stdout:%i]' % eid, stdout
479 480
480 481 # republish stderr:
481 482 if any(stderrs):
482 483 for eid,stderr in zip(targets, stderrs):
483 484 print >> sys.stderr, '[stderr:%i]' % eid, stderr
484 485
485 486 try:
486 487 get_ipython()
487 488 except NameError:
488 489 # displaypub is meaningless outside IPython
489 490 return
490 491
491 492 if groupby == 'order':
492 493 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
493 494 N = max(len(outputs) for outputs in output_lists)
494 495 for i in range(N):
495 496 for eid in targets:
496 497 outputs = output_dict[eid]
497 498 if len(outputs) >= N:
498 499 self._republish_displaypub(outputs[i], eid)
499 500 else:
500 501 # republish displaypub output
501 502 for eid,outputs in zip(targets, output_lists):
502 503 for output in outputs:
503 504 self._republish_displaypub(output, eid)
504 505
505 506 # finally, add pyout:
506 507 for eid,r,pyout in zip(targets, results, pyouts):
507 508 if pyout is not None:
508 509 display(r)
509 510
510 511 else:
511 512 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
512 513
513 514
514 515
515 516
516 517 class AsyncMapResult(AsyncResult):
517 518 """Class for representing results of non-blocking gathers.
518 519
519 520 This will properly reconstruct the gather.
520 521
521 522 This class is iterable at any time, and will wait on results as they come.
522 523
523 524 If ordered=False, then the first results to arrive will come first, otherwise
524 525 results will be yielded in the order they were submitted.
525 526
526 527 """
527 528
528 529 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
529 530 AsyncResult.__init__(self, client, msg_ids, fname=fname)
530 531 self._mapObject = mapObject
531 532 self._single_result = False
532 533 self.ordered = ordered
533 534
534 535 def _reconstruct_result(self, res):
535 536 """Perform the gather on the actual results."""
536 537 return self._mapObject.joinPartitions(res)
537 538
538 539 # asynchronous iterator:
539 540 def __iter__(self):
540 541 it = self._ordered_iter if self.ordered else self._unordered_iter
541 542 for r in it():
542 543 yield r
543 544
544 545 # asynchronous ordered iterator:
545 546 def _ordered_iter(self):
546 547 """iterator for results *as they arrive*, preserving submission order."""
547 548 try:
548 549 rlist = self.get(0)
549 550 except error.TimeoutError:
550 551 # wait for each result individually
551 552 for msg_id in self.msg_ids:
552 553 ar = AsyncResult(self._client, msg_id, self._fname)
553 554 rlist = ar.get()
554 555 try:
555 556 for r in rlist:
556 557 yield r
557 558 except TypeError:
558 559 # flattened, not a list
559 560 # this could get broken by flattened data that returns iterables
560 561 # but most calls to map do not expose the `flatten` argument
561 562 yield rlist
562 563 else:
563 564 # already done
564 565 for r in rlist:
565 566 yield r
566 567
567 568 # asynchronous unordered iterator:
568 569 def _unordered_iter(self):
569 570 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
570 571 try:
571 572 rlist = self.get(0)
572 573 except error.TimeoutError:
573 574 pending = set(self.msg_ids)
574 575 while pending:
575 576 try:
576 577 self._client.wait(pending, 1e-3)
577 578 except error.TimeoutError:
578 579 # ignore timeout error, because that only means
579 580 # *some* jobs are outstanding
580 581 pass
581 582 # update ready set with those no longer outstanding:
582 583 ready = pending.difference(self._client.outstanding)
583 584 # update pending to exclude those that are finished
584 585 pending = pending.difference(ready)
585 586 while ready:
586 587 msg_id = ready.pop()
587 588 ar = AsyncResult(self._client, msg_id, self._fname)
588 589 rlist = ar.get()
589 590 try:
590 591 for r in rlist:
591 592 yield r
592 593 except TypeError:
593 594 # flattened, not a list
594 595 # this could get broken by flattened data that returns iterables
595 596 # but most calls to map do not expose the `flatten` argument
596 597 yield rlist
597 598 else:
598 599 # already done
599 600 for r in rlist:
600 601 yield r
601 602
602 603
603 604
604 605 class AsyncHubResult(AsyncResult):
605 606 """Class to wrap pending results that must be requested from the Hub.
606 607
607 608 Note that waiting/polling on these objects requires polling the Hubover the network,
608 609 so use `AsyncHubResult.wait()` sparingly.
609 610 """
610 611
611 612 def wait(self, timeout=-1):
612 613 """wait for result to complete."""
613 614 start = time.time()
614 615 if self._ready:
615 616 return
616 617 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
617 618 local_ready = self._client.wait(local_ids, timeout)
618 619 if local_ready:
619 620 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
620 621 if not remote_ids:
621 622 self._ready = True
622 623 else:
623 624 rdict = self._client.result_status(remote_ids, status_only=False)
624 625 pending = rdict['pending']
625 626 while pending and (timeout < 0 or time.time() < start+timeout):
626 627 rdict = self._client.result_status(remote_ids, status_only=False)
627 628 pending = rdict['pending']
628 629 if pending:
629 630 time.sleep(0.1)
630 631 if not pending:
631 632 self._ready = True
632 633 if self._ready:
633 634 try:
634 635 results = map(self._client.results.get, self.msg_ids)
635 636 self._result = results
636 637 if self._single_result:
637 638 r = results[0]
638 639 if isinstance(r, Exception):
639 640 raise r
640 641 else:
641 642 results = error.collect_exceptions(results, self._fname)
642 643 self._result = self._reconstruct_result(results)
643 644 except Exception, e:
644 645 self._exception = e
645 646 self._success = False
646 647 else:
647 648 self._success = True
648 649 finally:
649 650 self._metadata = map(self._client.metadata.get, self.msg_ids)
650 651
651 652 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now