##// END OF EJS Templates
fix logic for infinite wait cutoff when waiting for outputs after receiving result
MinRK -
Show More
@@ -1,707 +1,708 b''
1 1 """AsyncResult objects for the client
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import print_function
19 19
20 20 import sys
21 21 import time
22 22 from datetime import datetime
23 23
24 24 from zmq import MessageTracker
25 25
26 26 from IPython.core.display import clear_output, display, display_pretty
27 27 from IPython.external.decorator import decorator
28 28 from IPython.parallel import error
29 29
30 30 #-----------------------------------------------------------------------------
31 31 # Functions
32 32 #-----------------------------------------------------------------------------
33 33
34 34 def _total_seconds(td):
35 35 """timedelta.total_seconds was added in 2.7"""
36 36 try:
37 37 # Python >= 2.7
38 38 return td.total_seconds()
39 39 except AttributeError:
40 40 # Python 2.6
41 41 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
42 42
def _raw_text(s):
    """Display string `s` as raw (unformatted) text via IPython's pretty display."""
    display_pretty(s, raw=True)
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Classes
48 48 #-----------------------------------------------------------------------------
49 49
# global empty tracker that's always done:
# (used as the default `tracker` for AsyncResults with nothing left to send)
finished_tracker = MessageTracker()
52 52
@decorator
def check_ready(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method."""
    # zero-timeout wait syncs self._ready with the client without blocking
    self.wait(0)
    if not self._ready:
        raise error.TimeoutError("result not ready")
    return f(self, *args, **kwargs)
60 60
class AsyncResult(object):
    """Class for representing results of non-blocking calls.

    Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
    """

    # list of msg_ids whose results this object collects
    msg_ids = None
    # engine targets the request was addressed to (if known)
    _targets = None
    # zmq MessageTracker for the outgoing send
    _tracker = None
    # True when this wraps exactly one result (scalar-style access)
    _single_result = False

    def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
        # normalize a bare msg_id into a one-element list
        if isinstance(msg_ids, basestring):
            # always a list
            msg_ids = [msg_ids]
        if tracker is None:
            # default to always done
            tracker = finished_tracker
        self._client = client
        self.msg_ids = msg_ids
        self._fname=fname
        self._targets = targets
        self._tracker = tracker
        self._ready = False
        self._outputs_ready = False
        self._success = None
        self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
        # one msg_id with a non-collection target means a single (scalar) result
        if len(msg_ids) == 1:
            self._single_result = not isinstance(targets, (list, tuple))
        else:
            self._single_result = False
92 92
93 93 def __repr__(self):
94 94 if self._ready:
95 95 return "<%s: finished>"%(self.__class__.__name__)
96 96 else:
97 97 return "<%s: %s>"%(self.__class__.__name__,self._fname)
98 98
99 99
100 100 def _reconstruct_result(self, res):
101 101 """Reconstruct our result from actual result list (always a list)
102 102
103 103 Override me in subclasses for turning a list of results
104 104 into the expected form.
105 105 """
106 106 if self._single_result:
107 107 return res[0]
108 108 else:
109 109 return res
110 110
111 111 def get(self, timeout=-1):
112 112 """Return the result when it arrives.
113 113
114 114 If `timeout` is not ``None`` and the result does not arrive within
115 115 `timeout` seconds then ``TimeoutError`` is raised. If the
116 116 remote call raised an exception then that exception will be reraised
117 117 by get() inside a `RemoteError`.
118 118 """
119 119 if not self.ready():
120 120 self.wait(timeout)
121 121
122 122 if self._ready:
123 123 if self._success:
124 124 return self._result
125 125 else:
126 126 raise self._exception
127 127 else:
128 128 raise error.TimeoutError("Result not ready.")
129 129
    def _check_ready(self):
        """Raise TimeoutError unless the result has fully arrived."""
        if not self.ready():
            raise error.TimeoutError("Result not ready.")
133 133
134 134 def ready(self):
135 135 """Return whether the call has completed."""
136 136 if not self._ready:
137 137 self.wait(0)
138 138 elif not self._outputs_ready:
139 139 self._wait_for_outputs(0)
140 140
141 141 return self._ready
142 142
143 143 def wait(self, timeout=-1):
144 144 """Wait until the result is available or until `timeout` seconds pass.
145 145
146 146 This method always returns None.
147 147 """
148 148 if self._ready:
149 149 self._wait_for_outputs(timeout)
150 150 return
151 151 self._ready = self._client.wait(self.msg_ids, timeout)
152 152 if self._ready:
153 153 try:
154 154 results = map(self._client.results.get, self.msg_ids)
155 155 self._result = results
156 156 if self._single_result:
157 157 r = results[0]
158 158 if isinstance(r, Exception):
159 159 raise r
160 160 else:
161 161 results = error.collect_exceptions(results, self._fname)
162 162 self._result = self._reconstruct_result(results)
163 163 except Exception as e:
164 164 self._exception = e
165 165 self._success = False
166 166 else:
167 167 self._success = True
168 168 finally:
169 if timeout is not None and timeout < 0:
169 if timeout is None or timeout < 0:
170 # cutoff infinite wait at 10s
170 171 timeout = 10
171 172 self._wait_for_outputs(timeout)
172 173
173 174
    def successful(self):
        """Return whether the call completed without raising an exception.

        Will raise ``AssertionError`` if the result is not ready.
        """
        # mirrors multiprocessing.pool.AsyncResult.successful, which likewise
        # asserts readiness rather than raising TimeoutError
        assert self.ready()
        return self._success
181 182
182 183 #----------------------------------------------------------------
183 184 # Extra methods not in mp.pool.AsyncResult
184 185 #----------------------------------------------------------------
185 186
186 187 def get_dict(self, timeout=-1):
187 188 """Get the results as a dict, keyed by engine_id.
188 189
189 190 timeout behavior is described in `get()`.
190 191 """
191 192
192 193 results = self.get(timeout)
193 194 engine_ids = [ md['engine_id'] for md in self._metadata ]
194 195 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
195 196 maxcount = bycount.count(bycount[-1])
196 197 if maxcount > 1:
197 198 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
198 199 maxcount, bycount[-1]))
199 200
200 201 return dict(zip(engine_ids,results))
201 202
    @property
    def result(self):
        """result property wrapper for `get(timeout=-1)`."""
        return self.get()

    # abbreviated alias:
    r = result

    @property
    def metadata(self):
        """property for accessing execution metadata."""
        # single results unwrap the one metadata dict
        if self._single_result:
            return self._metadata[0]
        else:
            return self._metadata

    @property
    def result_dict(self):
        """result property as a dict."""
        return self.get_dict()

    def __dict__(self):
        # NOTE(review): a *method* named __dict__ shadows the usual instance
        # __dict__ attribute; kept as-is for backward compatibility.
        return self.get_dict(0)
225 226
    def abort(self):
        """abort my tasks."""
        # aborting only makes sense while tasks are still outstanding
        assert not self.ready(), "Can't abort, I am already done!"
        return self._client.abort(self.msg_ids, targets=self._targets, block=True)

    @property
    def sent(self):
        """check whether my messages have been sent."""
        # delegates to the zmq MessageTracker attached at construction
        return self._tracker.done

    def wait_for_send(self, timeout=-1):
        """wait for pyzmq send to complete.

        This is necessary when sending arrays that you intend to edit in-place.
        `timeout` is in seconds, and will raise TimeoutError if it is reached
        before the send completes.
        """
        return self._tracker.wait(timeout)
244 245
245 246 #-------------------------------------
246 247 # dict-access
247 248 #-------------------------------------
248 249
249 250 def __getitem__(self, key):
250 251 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
251 252 """
252 253 if isinstance(key, int):
253 254 self._check_ready()
254 255 return error.collect_exceptions([self._result[key]], self._fname)[0]
255 256 elif isinstance(key, slice):
256 257 self._check_ready()
257 258 return error.collect_exceptions(self._result[key], self._fname)
258 259 elif isinstance(key, basestring):
259 260 # metadata proxy *does not* require that results are done
260 261 self.wait(0)
261 262 values = [ md[key] for md in self._metadata ]
262 263 if self._single_result:
263 264 return values[0]
264 265 else:
265 266 return values
266 267 else:
267 268 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
268 269
    def __getattr__(self, key):
        """getattr maps to getitem for convenient attr access to metadata."""
        try:
            return self.__getitem__(key)
        except (error.TimeoutError, KeyError):
            # translate lookup failures to AttributeError so hasattr() and
            # friends behave normally
            raise AttributeError("%r object has no attribute %r"%(
                self.__class__.__name__, key))
276 277
    # asynchronous iterator:
    def __iter__(self):
        """Iterate over results, waiting for each as needed."""
        if self._single_result:
            raise TypeError("AsyncResults with a single result are not iterable.")
        try:
            # non-blocking probe: raises TimeoutError if not everything is in
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                yield ar.get()
        else:
            # already done
            for r in rlist:
                yield r

    def __len__(self):
        # one msg_id per task
        return len(self.msg_ids)
295 296
296 297 #-------------------------------------
297 298 # Sugar methods and attributes
298 299 #-------------------------------------
299 300
300 301 def timedelta(self, start, end, start_key=min, end_key=max):
301 302 """compute the difference between two sets of timestamps
302 303
303 304 The default behavior is to use the earliest of the first
304 305 and the latest of the second list, but this can be changed
305 306 by passing a different
306 307
307 308 Parameters
308 309 ----------
309 310
310 311 start : one or more datetime objects (e.g. ar.submitted)
311 312 end : one or more datetime objects (e.g. ar.received)
312 313 start_key : callable
313 314 Function to call on `start` to extract the relevant
314 315 entry [defalt: min]
315 316 end_key : callable
316 317 Function to call on `end` to extract the relevant
317 318 entry [default: max]
318 319
319 320 Returns
320 321 -------
321 322
322 323 dt : float
323 324 The time elapsed (in seconds) between the two selected timestamps.
324 325 """
325 326 if not isinstance(start, datetime):
326 327 # handle single_result AsyncResults, where ar.stamp is single object,
327 328 # not a list
328 329 start = start_key(start)
329 330 if not isinstance(end, datetime):
330 331 # handle single_result AsyncResults, where ar.stamp is single object,
331 332 # not a list
332 333 end = end_key(end)
333 334 return _total_seconds(end - start)
334 335
    @property
    def progress(self):
        """the number of tasks which have been completed at this point.

        Fractional progress would be given by 1.0 * ar.progress / len(ar)
        """
        self.wait(0)
        # completed = total minus those the client still considers outstanding
        return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))

    @property
    def elapsed(self):
        """elapsed time since initial submission"""
        if self.ready():
            # all done: report the definitive wall_time instead
            return self.wall_time

        # find the earliest submission timestamp among our messages
        now = submitted = datetime.now()
        for msg_id in self.msg_ids:
            if msg_id in self._client.metadata:
                stamp = self._client.metadata[msg_id]['submitted']
                if stamp and stamp < submitted:
                    submitted = stamp
        return _total_seconds(now-submitted)
357 358
358 359 @property
359 360 @check_ready
360 361 def serial_time(self):
361 362 """serial computation time of a parallel calculation
362 363
363 364 Computed as the sum of (completed-started) of each task
364 365 """
365 366 t = 0
366 367 for md in self._metadata:
367 368 t += _total_seconds(md['completed'] - md['started'])
368 369 return t
369 370
    @property
    @check_ready
    def wall_time(self):
        """actual computation time of a parallel calculation

        Computed as the time between the latest `received` stamp
        and the earliest `submitted`.

        Only reliable if Client was spinning/waiting when the task finished, because
        the `received` timestamp is created when a result is pulled off of the zmq queue,
        which happens as a result of `client.spin()`.

        For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.

        """
        # timedelta's defaults (min of starts, max of ends) give wall-clock span
        return self.timedelta(self.submitted, self.received)
386 387
    def wait_interactive(self, interval=1., timeout=-1):
        """interactive wait, printing progress at regular intervals"""
        # normalize None to -1 (wait forever)
        if timeout is None:
            timeout = -1
        N = len(self)
        tic = time.time()
        # poll in `interval`-sized waits, redrawing the status line each pass
        while not self.ready() and (timeout < 0 or time.time() - tic <= timeout):
            self.wait(interval)
            clear_output()
            print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
            sys.stdout.flush()
        print()
        print("done")
400 401
    def _republish_displaypub(self, content, eid):
        """republish individual displaypub content dicts"""
        try:
            ip = get_ipython()
        except NameError:
            # displaypub is meaningless outside IPython
            return
        # tag the output metadata with the engine it came from
        md = content['metadata'] or {}
        md['engine'] = eid
        ip.display_pub.publish(content['source'], content['data'], md)
411 412
412 413 def _display_stream(self, text, prefix='', file=None):
413 414 if not text:
414 415 # nothing to display
415 416 return
416 417 if file is None:
417 418 file = sys.stdout
418 419 end = '' if text.endswith('\n') else '\n'
419 420
420 421 multiline = text.count('\n') > int(text.endswith('\n'))
421 422 if prefix and multiline and not text.startswith('\n'):
422 423 prefix = prefix + '\n'
423 424 print("%s%s" % (prefix, text), file=file, end=end)
424 425
425 426
    def _display_single_result(self):
        """Display stdout/stderr/display outputs/pyout for a single result."""
        self._display_stream(self.stdout)
        self._display_stream(self.stderr, file=sys.stderr)

        try:
            get_ipython()
        except NameError:
            # displaypub is meaningless outside IPython
            return

        for output in self.outputs:
            self._republish_displaypub(output, self.engine_id)

        if self.pyout is not None:
            display(self.get())
441 442
    def _wait_for_outputs(self, timeout=-1):
        """wait for the 'status=idle' message that indicates we have all outputs
        """
        if self._outputs_ready or not self._success:
            # don't wait on errors
            return

        # cast None to -1 for infinite timeout
        if timeout is None:
            timeout = -1

        tic = time.time()
        # poll the iopub channel until every task's metadata reports
        # outputs_ready, or the timeout expires; the loop is structured so
        # at least one flush happens even with timeout=0
        self._client._flush_iopub(self._client._iopub_socket)
        self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
        while not self._outputs_ready:
            time.sleep(0.01)
            self._client._flush_iopub(self._client._iopub_socket)
            self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
            if timeout >= 0 and time.time() > tic + timeout:
                break
462 463
463 464 @check_ready
464 465 def display_outputs(self, groupby="type"):
465 466 """republish the outputs of the computation
466 467
467 468 Parameters
468 469 ----------
469 470
470 471 groupby : str [default: type]
471 472 if 'type':
472 473 Group outputs by type (show all stdout, then all stderr, etc.):
473 474
474 475 [stdout:1] foo
475 476 [stdout:2] foo
476 477 [stderr:1] bar
477 478 [stderr:2] bar
478 479 if 'engine':
479 480 Display outputs for each engine before moving on to the next:
480 481
481 482 [stdout:1] foo
482 483 [stderr:1] bar
483 484 [stdout:2] foo
484 485 [stderr:2] bar
485 486
486 487 if 'order':
487 488 Like 'type', but further collate individual displaypub
488 489 outputs. This is meant for cases of each command producing
489 490 several plots, and you would like to see all of the first
490 491 plots together, then all of the second plots, and so on.
491 492 """
492 493 if self._single_result:
493 494 self._display_single_result()
494 495 return
495 496
496 497 stdouts = self.stdout
497 498 stderrs = self.stderr
498 499 pyouts = self.pyout
499 500 output_lists = self.outputs
500 501 results = self.get()
501 502
502 503 targets = self.engine_id
503 504
504 505 if groupby == "engine":
505 506 for eid,stdout,stderr,outputs,r,pyout in zip(
506 507 targets, stdouts, stderrs, output_lists, results, pyouts
507 508 ):
508 509 self._display_stream(stdout, '[stdout:%i] ' % eid)
509 510 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
510 511
511 512 try:
512 513 get_ipython()
513 514 except NameError:
514 515 # displaypub is meaningless outside IPython
515 516 return
516 517
517 518 if outputs or pyout is not None:
518 519 _raw_text('[output:%i]' % eid)
519 520
520 521 for output in outputs:
521 522 self._republish_displaypub(output, eid)
522 523
523 524 if pyout is not None:
524 525 display(r)
525 526
526 527 elif groupby in ('type', 'order'):
527 528 # republish stdout:
528 529 for eid,stdout in zip(targets, stdouts):
529 530 self._display_stream(stdout, '[stdout:%i] ' % eid)
530 531
531 532 # republish stderr:
532 533 for eid,stderr in zip(targets, stderrs):
533 534 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
534 535
535 536 try:
536 537 get_ipython()
537 538 except NameError:
538 539 # displaypub is meaningless outside IPython
539 540 return
540 541
541 542 if groupby == 'order':
542 543 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
543 544 N = max(len(outputs) for outputs in output_lists)
544 545 for i in range(N):
545 546 for eid in targets:
546 547 outputs = output_dict[eid]
547 548 if len(outputs) >= N:
548 549 _raw_text('[output:%i]' % eid)
549 550 self._republish_displaypub(outputs[i], eid)
550 551 else:
551 552 # republish displaypub output
552 553 for eid,outputs in zip(targets, output_lists):
553 554 if outputs:
554 555 _raw_text('[output:%i]' % eid)
555 556 for output in outputs:
556 557 self._republish_displaypub(output, eid)
557 558
558 559 # finally, add pyout:
559 560 for eid,r,pyout in zip(targets, results, pyouts):
560 561 if pyout is not None:
561 562 display(r)
562 563
563 564 else:
564 565 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
565 566
566 567
567 568
568 569
class AsyncMapResult(AsyncResult):
    """Class for representing results of non-blocking gathers.

    This will properly reconstruct the gather.

    This class is iterable at any time, and will wait on results as they come.

    If ordered=False, then the first results to arrive will come first, otherwise
    results will be yielded in the order they were submitted.

    """

    def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
        AsyncResult.__init__(self, client, msg_ids, fname=fname)
        self._mapObject = mapObject
        # map results are always collections, never a single scalar
        self._single_result = False
        self.ordered = ordered

    def _reconstruct_result(self, res):
        """Perform the gather on the actual results."""
        return self._mapObject.joinPartitions(res)

    # asynchronous iterator:
    def __iter__(self):
        # dispatch to the ordered or unordered iteration strategy
        it = self._ordered_iter if self.ordered else self._unordered_iter
        for r in it():
            yield r

    # asynchronous ordered iterator:
    def _ordered_iter(self):
        """iterator for results *as they arrive*, preserving submission order."""
        try:
            # non-blocking probe: raises TimeoutError unless everything is done
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                rlist = ar.get()
                try:
                    for r in rlist:
                        yield r
                except TypeError:
                    # flattened, not a list
                    # this could get broken by flattened data that returns iterables
                    # but most calls to map do not expose the `flatten` argument
                    yield rlist
        else:
            # already done
            for r in rlist:
                yield r

    # asynchronous unordered iterator:
    def _unordered_iter(self):
        """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
        try:
            # non-blocking probe: raises TimeoutError unless everything is done
            rlist = self.get(0)
        except error.TimeoutError:
            pending = set(self.msg_ids)
            while pending:
                try:
                    self._client.wait(pending, 1e-3)
                except error.TimeoutError:
                    # ignore timeout error, because that only means
                    # *some* jobs are outstanding
                    pass
                # update ready set with those no longer outstanding:
                ready = pending.difference(self._client.outstanding)
                # update pending to exclude those that are finished
                pending = pending.difference(ready)
                while ready:
                    msg_id = ready.pop()
                    ar = AsyncResult(self._client, msg_id, self._fname)
                    rlist = ar.get()
                    try:
                        for r in rlist:
                            yield r
                    except TypeError:
                        # flattened, not a list
                        # this could get broken by flattened data that returns iterables
                        # but most calls to map do not expose the `flatten` argument
                        yield rlist
        else:
            # already done
            for r in rlist:
                yield r
654 655
655 656
class AsyncHubResult(AsyncResult):
    """Class to wrap pending results that must be requested from the Hub.

    Note that waiting/polling on these objects requires polling the Hub over the network,
    so use `AsyncHubResult.wait()` sparingly.
    """

    def _wait_for_outputs(self, timeout=-1):
        """no-op, because HubResults are never incomplete"""
        self._outputs_ready = True

    def wait(self, timeout=-1):
        """wait for result to complete."""
        start = time.time()
        if self._ready:
            return
        # first, wait on everything the local client still considers outstanding
        local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
        local_ready = self._client.wait(local_ids, timeout)
        if local_ready:
            # anything not in local results must be requested from the Hub
            remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
            if not remote_ids:
                self._ready = True
            else:
                rdict = self._client.result_status(remote_ids, status_only=False)
                pending = rdict['pending']
                # NOTE(review): `timeout < 0` relies on Python 2 semantics when
                # timeout is None (None < 0 is True there) — verify before porting
                while pending and (timeout < 0 or time.time() < start+timeout):
                    rdict = self._client.result_status(remote_ids, status_only=False)
                    pending = rdict['pending']
                    if pending:
                        time.sleep(0.1)
                if not pending:
                    self._ready = True
        if self._ready:
            try:
                results = map(self._client.results.get, self.msg_ids)
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                    self._result = self._reconstruct_result(results)
            except Exception as e:
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                self._metadata = map(self._client.metadata.get, self.msg_ids)
706 707
# public API of this module
__all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
General Comments 0
You need to be logged in to leave comments. Login now