better flush iopub with AsyncResults...
MinRK
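This patch makes an AsyncResult track output completion separately from result completion: wait() and ready() now also flush the iopub channel and poll each task's 'outputs_ready' flag, so a result is not reported finished while its stdout/stderr/display messages are still in flight. A minimal usage sketch of the user-visible effect (my example, not part of the diff; assumes a running IPython.parallel cluster, e.g. started with `ipcluster start -n 2`):

from __future__ import print_function
from IPython.parallel import Client

rc = Client()
view = rc[:]

# Run something that prints on the engines.
ar = view.execute("print('hello from an engine')", block=False)

# With this patch, wait()/ready() also wait (bounded) for the iopub
# messages carrying stdout/stderr, so display_outputs() below does not
# race the output stream.
ar.wait(10)
ar.display_outputs()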
IPython/parallel/client/asyncresult.py
@@ -1,690 +1,707
1 1 """AsyncResult objects for the client
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import print_function
19 19
20 20 import sys
21 21 import time
22 22 from datetime import datetime
23 23
24 24 from zmq import MessageTracker
25 25
26 26 from IPython.core.display import clear_output, display, display_pretty
27 27 from IPython.external.decorator import decorator
28 28 from IPython.parallel import error
29 29
30 30 #-----------------------------------------------------------------------------
31 31 # Functions
32 32 #-----------------------------------------------------------------------------
33 33
34 34 def _total_seconds(td):
35 35 """timedelta.total_seconds was added in 2.7"""
36 36 try:
37 37 # Python >= 2.7
38 38 return td.total_seconds()
39 39 except AttributeError:
40 40 # Python 2.6
41 41 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
42 42
43 43 def _raw_text(s):
44 44 display_pretty(s, raw=True)
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Classes
48 48 #-----------------------------------------------------------------------------
49 49
50 50 # global empty tracker that's always done:
51 51 finished_tracker = MessageTracker()
52 52
53 53 @decorator
54 54 def check_ready(f, self, *args, **kwargs):
55 55 """Call spin() to sync state prior to calling the method."""
56 56 self.wait(0)
57 57 if not self._ready:
58 58 raise error.TimeoutError("result not ready")
59 59 return f(self, *args, **kwargs)
60 60
61 61 class AsyncResult(object):
62 62 """Class for representing results of non-blocking calls.
63 63
64 64 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
65 65 """
66 66
67 67 msg_ids = None
68 68 _targets = None
69 69 _tracker = None
70 70 _single_result = False
71 71
72 72 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
73 73 if isinstance(msg_ids, basestring):
74 74 # always a list
75 75 msg_ids = [msg_ids]
76 76 if tracker is None:
77 77 # default to always done
78 78 tracker = finished_tracker
79 79 self._client = client
80 80 self.msg_ids = msg_ids
81 81 self._fname=fname
82 82 self._targets = targets
83 83 self._tracker = tracker
84 84 self._ready = False
85 self._outputs_ready = False
85 86 self._success = None
86 87 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
87 88 if len(msg_ids) == 1:
88 89 self._single_result = not isinstance(targets, (list, tuple))
89 90 else:
90 91 self._single_result = False
91 92
92 93 def __repr__(self):
93 94 if self._ready:
94 95 return "<%s: finished>"%(self.__class__.__name__)
95 96 else:
96 97 return "<%s: %s>"%(self.__class__.__name__,self._fname)
97 98
98 99
99 100 def _reconstruct_result(self, res):
100 101 """Reconstruct our result from actual result list (always a list)
101 102
102 103 Override me in subclasses for turning a list of results
103 104 into the expected form.
104 105 """
105 106 if self._single_result:
106 107 return res[0]
107 108 else:
108 109 return res
109 110
110 111 def get(self, timeout=-1):
111 112 """Return the result when it arrives.
112 113
113 114 If `timeout` is not ``None`` and the result does not arrive within
114 115 `timeout` seconds then ``TimeoutError`` is raised. If the
115 116 remote call raised an exception then that exception will be reraised
116 117 by get() inside a `RemoteError`.
117 118 """
118 119 if not self.ready():
119 120 self.wait(timeout)
120 121
121 122 if self._ready:
122 123 if self._success:
123 124 return self._result
124 125 else:
125 126 raise self._exception
126 127 else:
127 128 raise error.TimeoutError("Result not ready.")
128 129
129 130 def _check_ready(self):
130 131 if not self.ready():
131 132 raise error.TimeoutError("Result not ready.")
132 133
133 134 def ready(self):
134 135 """Return whether the call has completed."""
135 136 if not self._ready:
136 137 self.wait(0)
138 elif not self._outputs_ready:
139 self._wait_for_outputs(0)
140
137 141 return self._ready
138 142
139 143 def wait(self, timeout=-1):
140 144 """Wait until the result is available or until `timeout` seconds pass.
141 145
142 146 This method always returns None.
143 147 """
144 148 if self._ready:
149 self._wait_for_outputs(timeout)
145 150 return
146 151 self._ready = self._client.wait(self.msg_ids, timeout)
147 152 if self._ready:
148 153 try:
149 154 results = map(self._client.results.get, self.msg_ids)
150 155 self._result = results
151 156 if self._single_result:
152 157 r = results[0]
153 158 if isinstance(r, Exception):
154 159 raise r
155 160 else:
156 161 results = error.collect_exceptions(results, self._fname)
157 162 self._result = self._reconstruct_result(results)
158 163 except Exception as e:
159 164 self._exception = e
160 165 self._success = False
161 166 else:
162 167 self._success = True
163 168 finally:
164
165 self._wait_for_outputs(10)
169 if timeout is not None and timeout < 0:
170 timeout = 10
171 self._wait_for_outputs(timeout)
166 172
167 173
168 174 def successful(self):
169 175 """Return whether the call completed without raising an exception.
170 176
171 177 Will raise ``AssertionError`` if the result is not ready.
172 178 """
173 179 assert self.ready()
174 180 return self._success
175 181
176 182 #----------------------------------------------------------------
177 183 # Extra methods not in mp.pool.AsyncResult
178 184 #----------------------------------------------------------------
179 185
180 186 def get_dict(self, timeout=-1):
181 187 """Get the results as a dict, keyed by engine_id.
182 188
183 189 timeout behavior is described in `get()`.
184 190 """
185 191
186 192 results = self.get(timeout)
187 193 engine_ids = [ md['engine_id'] for md in self._metadata ]
188 194 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
189 195 maxcount = bycount.count(bycount[-1])
190 196 if maxcount > 1:
191 197 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
192 198 maxcount, bycount[-1]))
193 199
194 200 return dict(zip(engine_ids,results))
195 201
196 202 @property
197 203 def result(self):
198 204 """result property wrapper for `get(timeout=-1)`."""
199 205 return self.get()
200 206
201 207 # abbreviated alias:
202 208 r = result
203 209
204 210 @property
205 211 def metadata(self):
206 212 """property for accessing execution metadata."""
207 213 if self._single_result:
208 214 return self._metadata[0]
209 215 else:
210 216 return self._metadata
211 217
212 218 @property
213 219 def result_dict(self):
214 220 """result property as a dict."""
215 221 return self.get_dict()
216 222
217 223 def __dict__(self):
218 224 return self.get_dict(0)
219 225
220 226 def abort(self):
221 227 """abort my tasks."""
222 228 assert not self.ready(), "Can't abort, I am already done!"
223 229 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
224 230
225 231 @property
226 232 def sent(self):
227 233 """check whether my messages have been sent."""
228 234 return self._tracker.done
229 235
230 236 def wait_for_send(self, timeout=-1):
231 237 """wait for pyzmq send to complete.
232 238
233 239 This is necessary when sending arrays that you intend to edit in-place.
234 240 `timeout` is in seconds, and will raise TimeoutError if it is reached
235 241 before the send completes.
236 242 """
237 243 return self._tracker.wait(timeout)
238 244
239 245 #-------------------------------------
240 246 # dict-access
241 247 #-------------------------------------
242 248
243 249 def __getitem__(self, key):
244 250 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
245 251 """
246 252 if isinstance(key, int):
247 253 self._check_ready()
248 254 return error.collect_exceptions([self._result[key]], self._fname)[0]
249 255 elif isinstance(key, slice):
250 256 self._check_ready()
251 257 return error.collect_exceptions(self._result[key], self._fname)
252 258 elif isinstance(key, basestring):
253 259 # metadata proxy *does not* require that results are done
260 self.wait(0)
254 261 values = [ md[key] for md in self._metadata ]
255 262 if self._single_result:
256 263 return values[0]
257 264 else:
258 265 return values
259 266 else:
260 267 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
261 268
262 269 def __getattr__(self, key):
263 270 """getattr maps to getitem for convenient attr access to metadata."""
264 271 try:
265 272 return self.__getitem__(key)
266 273 except (error.TimeoutError, KeyError):
267 274 raise AttributeError("%r object has no attribute %r"%(
268 275 self.__class__.__name__, key))
269 276
270 277 # asynchronous iterator:
271 278 def __iter__(self):
272 279 if self._single_result:
273 280 raise TypeError("AsyncResults with a single result are not iterable.")
274 281 try:
275 282 rlist = self.get(0)
276 283 except error.TimeoutError:
277 284 # wait for each result individually
278 285 for msg_id in self.msg_ids:
279 286 ar = AsyncResult(self._client, msg_id, self._fname)
280 287 yield ar.get()
281 288 else:
282 289 # already done
283 290 for r in rlist:
284 291 yield r
285 292
286 293 def __len__(self):
287 294 return len(self.msg_ids)
288 295
289 296 #-------------------------------------
290 297 # Sugar methods and attributes
291 298 #-------------------------------------
292 299
293 300 def timedelta(self, start, end, start_key=min, end_key=max):
294 301 """compute the difference between two sets of timestamps
295 302
296 303 The default behavior is to use the earliest of the first
297 304 and the latest of the second list, but this can be changed
298 305 by passing a different
299 306 by passing a different `start_key` or `end_key` function.
300 307 Parameters
301 308 ----------
302 309
303 310 start : one or more datetime objects (e.g. ar.submitted)
304 311 end : one or more datetime objects (e.g. ar.received)
305 312 start_key : callable
306 313 Function to call on `start` to extract the relevant
307 314 entry [defalt: min]
308 315 end_key : callable
309 316 Function to call on `end` to extract the relevant
310 317 entry [default: max]
311 318
312 319 Returns
313 320 -------
314 321
315 322 dt : float
316 323 The time elapsed (in seconds) between the two selected timestamps.
317 324 """
318 325 if not isinstance(start, datetime):
319 326 # handle single_result AsyncResults, where ar.stamp is single object,
320 327 # not a list
321 328 start = start_key(start)
322 329 if not isinstance(end, datetime):
323 330 # handle single_result AsyncResults, where ar.stamp is single object,
324 331 # not a list
325 332 end = end_key(end)
326 333 return _total_seconds(end - start)
327 334
328 335 @property
329 336 def progress(self):
330 337 """the number of tasks which have been completed at this point.
331 338
332 339 Fractional progress would be given by 1.0 * ar.progress / len(ar)
333 340 """
334 341 self.wait(0)
335 342 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
336 343
337 344 @property
338 345 def elapsed(self):
339 346 """elapsed time since initial submission"""
340 347 if self.ready():
341 348 return self.wall_time
342 349
343 350 now = submitted = datetime.now()
344 351 for msg_id in self.msg_ids:
345 352 if msg_id in self._client.metadata:
346 353 stamp = self._client.metadata[msg_id]['submitted']
347 354 if stamp and stamp < submitted:
348 355 submitted = stamp
349 356 return _total_seconds(now-submitted)
350 357
351 358 @property
352 359 @check_ready
353 360 def serial_time(self):
354 361 """serial computation time of a parallel calculation
355 362
356 363 Computed as the sum of (completed-started) of each task
357 364 """
358 365 t = 0
359 366 for md in self._metadata:
360 367 t += _total_seconds(md['completed'] - md['started'])
361 368 return t
362 369
363 370 @property
364 371 @check_ready
365 372 def wall_time(self):
366 373 """actual computation time of a parallel calculation
367 374
368 375 Computed as the time between the latest `received` stamp
369 376 and the earliest `submitted`.
370 377
371 378 Only reliable if Client was spinning/waiting when the task finished, because
372 379 the `received` timestamp is created when a result is pulled off of the zmq queue,
373 380 which happens as a result of `client.spin()`.
374 381
375 382 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
376 383
377 384 """
378 385 return self.timedelta(self.submitted, self.received)
379 386
380 def wait_interactive(self, interval=1., timeout=None):
387 def wait_interactive(self, interval=1., timeout=-1):
381 388 """interactive wait, printing progress at regular intervals"""
389 if timeout is None:
390 timeout = -1
382 391 N = len(self)
383 392 tic = time.time()
384 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
393 while not self.ready() and (timeout < 0 or time.time() - tic <= timeout):
385 394 self.wait(interval)
386 395 clear_output()
387 396 print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
388 397 sys.stdout.flush()
389 398 print()
390 399 print("done")
391 400
392 401 def _republish_displaypub(self, content, eid):
393 402 """republish individual displaypub content dicts"""
394 403 try:
395 404 ip = get_ipython()
396 405 except NameError:
397 406 # displaypub is meaningless outside IPython
398 407 return
399 408 md = content['metadata'] or {}
400 409 md['engine'] = eid
401 410 ip.display_pub.publish(content['source'], content['data'], md)
402 411
403 412 def _display_stream(self, text, prefix='', file=None):
404 413 if not text:
405 414 # nothing to display
406 415 return
407 416 if file is None:
408 417 file = sys.stdout
409 418 end = '' if text.endswith('\n') else '\n'
410 419
411 420 multiline = text.count('\n') > int(text.endswith('\n'))
412 421 if prefix and multiline and not text.startswith('\n'):
413 422 prefix = prefix + '\n'
414 423 print("%s%s" % (prefix, text), file=file, end=end)
415 424
416 425
417 426 def _display_single_result(self):
418 427 self._display_stream(self.stdout)
419 428 self._display_stream(self.stderr, file=sys.stderr)
420 429
421 430 try:
422 431 get_ipython()
423 432 except NameError:
424 433 # displaypub is meaningless outside IPython
425 434 return
426 435
427 436 for output in self.outputs:
428 437 self._republish_displaypub(output, self.engine_id)
429 438
430 439 if self.pyout is not None:
431 440 display(self.get())
432 441
433 442 def _wait_for_outputs(self, timeout=-1):
434 443 """wait for the 'status=idle' message that indicates we have all outputs
435 444 """
436 if not self._success:
445 if self._outputs_ready or not self._success:
437 446 # don't wait on errors
438 447 return
448
449 # cast None to -1 for infinite timeout
450 if timeout is None:
451 timeout = -1
452
439 453 tic = time.time()
440 while not all(md['outputs_ready'] for md in self._metadata):
454 self._client._flush_iopub(self._client._iopub_socket)
455 self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
456 while not self._outputs_ready:
441 457 time.sleep(0.01)
442 458 self._client._flush_iopub(self._client._iopub_socket)
459 self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
443 460 if timeout >= 0 and time.time() > tic + timeout:
444 461 break
445 462
446 463 @check_ready
447 464 def display_outputs(self, groupby="type"):
448 465 """republish the outputs of the computation
449 466
450 467 Parameters
451 468 ----------
452 469
453 470 groupby : str [default: type]
454 471 if 'type':
455 472 Group outputs by type (show all stdout, then all stderr, etc.):
456 473
457 474 [stdout:1] foo
458 475 [stdout:2] foo
459 476 [stderr:1] bar
460 477 [stderr:2] bar
461 478 if 'engine':
462 479 Display outputs for each engine before moving on to the next:
463 480
464 481 [stdout:1] foo
465 482 [stderr:1] bar
466 483 [stdout:2] foo
467 484 [stderr:2] bar
468 485
469 486 if 'order':
470 487 outputs. This is meant for the case where each command produces
471 488 several plots and you would like to see all of the first
472 489 several plots, and you would like to see all of the first
473 490 plots together, then all of the second plots, and so on.
474 491 """
475 492 if self._single_result:
476 493 self._display_single_result()
477 494 return
478 495
479 496 stdouts = self.stdout
480 497 stderrs = self.stderr
481 498 pyouts = self.pyout
482 499 output_lists = self.outputs
483 500 results = self.get()
484 501
485 502 targets = self.engine_id
486 503
487 504 if groupby == "engine":
488 505 for eid,stdout,stderr,outputs,r,pyout in zip(
489 506 targets, stdouts, stderrs, output_lists, results, pyouts
490 507 ):
491 508 self._display_stream(stdout, '[stdout:%i] ' % eid)
492 509 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
493 510
494 511 try:
495 512 get_ipython()
496 513 except NameError:
497 514 # displaypub is meaningless outside IPython
498 515 return
499 516
500 517 if outputs or pyout is not None:
501 518 _raw_text('[output:%i]' % eid)
502 519
503 520 for output in outputs:
504 521 self._republish_displaypub(output, eid)
505 522
506 523 if pyout is not None:
507 524 display(r)
508 525
509 526 elif groupby in ('type', 'order'):
510 527 # republish stdout:
511 528 for eid,stdout in zip(targets, stdouts):
512 529 self._display_stream(stdout, '[stdout:%i] ' % eid)
513 530
514 531 # republish stderr:
515 532 for eid,stderr in zip(targets, stderrs):
516 533 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
517 534
518 535 try:
519 536 get_ipython()
520 537 except NameError:
521 538 # displaypub is meaningless outside IPython
522 539 return
523 540
524 541 if groupby == 'order':
525 542 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
526 543 N = max(len(outputs) for outputs in output_lists)
527 544 for i in range(N):
528 545 for eid in targets:
529 546 outputs = output_dict[eid]
530 547 if len(outputs) > i:
531 548 _raw_text('[output:%i]' % eid)
532 549 self._republish_displaypub(outputs[i], eid)
533 550 else:
534 551 # republish displaypub output
535 552 for eid,outputs in zip(targets, output_lists):
536 553 if outputs:
537 554 _raw_text('[output:%i]' % eid)
538 555 for output in outputs:
539 556 self._republish_displaypub(output, eid)
540 557
541 558 # finally, add pyout:
542 559 for eid,r,pyout in zip(targets, results, pyouts):
543 560 if pyout is not None:
544 561 display(r)
545 562
546 563 else:
547 564 raise ValueError("groupby must be one of 'type', 'engine', 'order', not %r" % groupby)
548 565
549 566
550 567
551 568
552 569 class AsyncMapResult(AsyncResult):
553 570 """Class for representing results of non-blocking gathers.
554 571
555 572 This will properly reconstruct the gather.
556 573
557 574 This class is iterable at any time, and will wait on results as they come.
558 575
559 576 If ordered=False, then the first results to arrive will come first, otherwise
560 577 results will be yielded in the order they were submitted.
561 578
562 579 """
563 580
564 581 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
565 582 AsyncResult.__init__(self, client, msg_ids, fname=fname)
566 583 self._mapObject = mapObject
567 584 self._single_result = False
568 585 self.ordered = ordered
569 586
570 587 def _reconstruct_result(self, res):
571 588 """Perform the gather on the actual results."""
572 589 return self._mapObject.joinPartitions(res)
573 590
574 591 # asynchronous iterator:
575 592 def __iter__(self):
576 593 it = self._ordered_iter if self.ordered else self._unordered_iter
577 594 for r in it():
578 595 yield r
579 596
580 597 # asynchronous ordered iterator:
581 598 def _ordered_iter(self):
582 599 """iterator for results *as they arrive*, preserving submission order."""
583 600 try:
584 601 rlist = self.get(0)
585 602 except error.TimeoutError:
586 603 # wait for each result individually
587 604 for msg_id in self.msg_ids:
588 605 ar = AsyncResult(self._client, msg_id, self._fname)
589 606 rlist = ar.get()
590 607 try:
591 608 for r in rlist:
592 609 yield r
593 610 except TypeError:
594 611 # flattened, not a list
595 612 # this could get broken by flattened data that returns iterables
596 613 # but most calls to map do not expose the `flatten` argument
597 614 yield rlist
598 615 else:
599 616 # already done
600 617 for r in rlist:
601 618 yield r
602 619
603 620 # asynchronous unordered iterator:
604 621 def _unordered_iter(self):
605 622 """iterator for results *as they arrive*, on an FCFS (first-come, first-served) basis, ignoring submission order."""
606 623 try:
607 624 rlist = self.get(0)
608 625 except error.TimeoutError:
609 626 pending = set(self.msg_ids)
610 627 while pending:
611 628 try:
612 629 self._client.wait(pending, 1e-3)
613 630 except error.TimeoutError:
614 631 # ignore timeout error, because that only means
615 632 # *some* jobs are outstanding
616 633 pass
617 634 # update ready set with those no longer outstanding:
618 635 ready = pending.difference(self._client.outstanding)
619 636 # update pending to exclude those that are finished
620 637 pending = pending.difference(ready)
621 638 while ready:
622 639 msg_id = ready.pop()
623 640 ar = AsyncResult(self._client, msg_id, self._fname)
624 641 rlist = ar.get()
625 642 try:
626 643 for r in rlist:
627 644 yield r
628 645 except TypeError:
629 646 # flattened, not a list
630 647 # this could get broken by flattened data that returns iterables
631 648 # but most calls to map do not expose the `flatten` argument
632 649 yield rlist
633 650 else:
634 651 # already done
635 652 for r in rlist:
636 653 yield r
637 654
638 655
639 656 class AsyncHubResult(AsyncResult):
640 657 """Class to wrap pending results that must be requested from the Hub.
641 658
642 659 Note that waiting/polling on these objects requires polling the Hub over the network,
643 660 so use `AsyncHubResult.wait()` sparingly.
644 661 """
645 662
646 def _wait_for_outputs(self, timeout=None):
663 def _wait_for_outputs(self, timeout=-1):
647 664 """no-op, because HubResults are never incomplete"""
648 return
665 self._outputs_ready = True
649 666
650 667 def wait(self, timeout=-1):
651 668 """wait for result to complete."""
652 669 start = time.time()
653 670 if self._ready:
654 671 return
655 672 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
656 673 local_ready = self._client.wait(local_ids, timeout)
657 674 if local_ready:
658 675 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
659 676 if not remote_ids:
660 677 self._ready = True
661 678 else:
662 679 rdict = self._client.result_status(remote_ids, status_only=False)
663 680 pending = rdict['pending']
664 681 while pending and (timeout < 0 or time.time() < start+timeout):
665 682 rdict = self._client.result_status(remote_ids, status_only=False)
666 683 pending = rdict['pending']
667 684 if pending:
668 685 time.sleep(0.1)
669 686 if not pending:
670 687 self._ready = True
671 688 if self._ready:
672 689 try:
673 690 results = map(self._client.results.get, self.msg_ids)
674 691 self._result = results
675 692 if self._single_result:
676 693 r = results[0]
677 694 if isinstance(r, Exception):
678 695 raise r
679 696 else:
680 697 results = error.collect_exceptions(results, self._fname)
681 698 self._result = self._reconstruct_result(results)
682 699 except Exception as e:
683 700 self._exception = e
684 701 self._success = False
685 702 else:
686 703 self._success = True
687 704 finally:
688 705 self._metadata = map(self._client.metadata.get, self.msg_ids)
689 706
690 707 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
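For reference, the heart of the reworked _wait_for_outputs above is a flush-then-poll loop: flush the iopub socket once up front so an already-complete state is detected without sleeping, then sleep, flush, and re-check until the outputs arrive or the timeout lapses. A standalone sketch of that pattern (the `flush` and `outputs_ready` callables are placeholders, not IPython.parallel API):

import time

def poll_outputs(flush, outputs_ready, timeout=-1):
    """Poll until outputs_ready() returns True.

    `timeout` is in seconds; None or a negative value means wait
    forever, matching the convention used by AsyncResult above.
    """
    if timeout is None:
        timeout = -1              # normalize None to 'no timeout'
    tic = time.time()
    flush()                       # pick up messages already queued
    while not outputs_ready():
        time.sleep(0.01)          # avoid a busy-wait
        flush()
        if timeout >= 0 and time.time() > tic + timeout:
            break                 # give up; caller re-checks state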
IPython/parallel/tests/test_asyncresult.py
@@ -1,267 +1,294
1 1 """Tests for asyncresult.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import time
20 20
21 import nose.tools as nt
22
21 23 from IPython.utils.io import capture_output
22 24
23 25 from IPython.parallel.error import TimeoutError
24 26 from IPython.parallel import error, Client
25 27 from IPython.parallel.tests import add_engines
26 28 from .clienttest import ClusterTestCase
27 29
28 30 def setup():
29 31 add_engines(2, total=True)
30 32
31 33 def wait(n):
32 34 import time
33 35 time.sleep(n)
34 36 return n
35 37
36 38 class AsyncResultTest(ClusterTestCase):
37 39
38 40 def test_single_result_view(self):
39 41 """various one-target views get the right value for single_result"""
40 42 eid = self.client.ids[-1]
41 43 ar = self.client[eid].apply_async(lambda : 42)
42 44 self.assertEqual(ar.get(), 42)
43 45 ar = self.client[[eid]].apply_async(lambda : 42)
44 46 self.assertEqual(ar.get(), [42])
45 47 ar = self.client[-1:].apply_async(lambda : 42)
46 48 self.assertEqual(ar.get(), [42])
47 49
48 50 def test_get_after_done(self):
49 51 ar = self.client[-1].apply_async(lambda : 42)
50 52 ar.wait()
51 53 self.assertTrue(ar.ready())
52 54 self.assertEqual(ar.get(), 42)
53 55 self.assertEqual(ar.get(), 42)
54 56
55 57 def test_get_before_done(self):
56 58 ar = self.client[-1].apply_async(wait, 0.1)
57 59 self.assertRaises(TimeoutError, ar.get, 0)
58 60 ar.wait(0)
59 61 self.assertFalse(ar.ready())
60 62 self.assertEqual(ar.get(), 0.1)
61 63
62 64 def test_get_after_error(self):
63 65 ar = self.client[-1].apply_async(lambda : 1/0)
64 66 ar.wait(10)
65 67 self.assertRaisesRemote(ZeroDivisionError, ar.get)
66 68 self.assertRaisesRemote(ZeroDivisionError, ar.get)
67 69 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
68 70
69 71 def test_get_dict(self):
70 72 n = len(self.client)
71 73 ar = self.client[:].apply_async(lambda : 5)
72 74 self.assertEqual(ar.get(), [5]*n)
73 75 d = ar.get_dict()
74 76 self.assertEqual(sorted(d.keys()), sorted(self.client.ids))
75 77 for eid,r in d.iteritems():
76 78 self.assertEqual(r, 5)
77 79
78 80 def test_list_amr(self):
79 81 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
80 82 rlist = list(ar)
81 83
82 84 def test_getattr(self):
83 85 ar = self.client[:].apply_async(wait, 0.5)
84 86 self.assertEqual(ar.engine_id, [None] * len(ar))
85 87 self.assertRaises(AttributeError, lambda : ar._foo)
86 88 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
87 89 self.assertRaises(AttributeError, lambda : ar.foo)
88 90 self.assertFalse(hasattr(ar, '__length_hint__'))
89 91 self.assertFalse(hasattr(ar, 'foo'))
90 92 self.assertTrue(hasattr(ar, 'engine_id'))
91 93 ar.get(5)
92 94 self.assertRaises(AttributeError, lambda : ar._foo)
93 95 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
94 96 self.assertRaises(AttributeError, lambda : ar.foo)
95 97 self.assertTrue(isinstance(ar.engine_id, list))
96 98 self.assertEqual(ar.engine_id, ar['engine_id'])
97 99 self.assertFalse(hasattr(ar, '__length_hint__'))
98 100 self.assertFalse(hasattr(ar, 'foo'))
99 101 self.assertTrue(hasattr(ar, 'engine_id'))
100 102
101 103 def test_getitem(self):
102 104 ar = self.client[:].apply_async(wait, 0.5)
103 105 self.assertEqual(ar['engine_id'], [None] * len(ar))
104 106 self.assertRaises(KeyError, lambda : ar['foo'])
105 107 ar.get(5)
106 108 self.assertRaises(KeyError, lambda : ar['foo'])
107 109 self.assertTrue(isinstance(ar['engine_id'], list))
108 110 self.assertEqual(ar.engine_id, ar['engine_id'])
109 111
110 112 def test_single_result(self):
111 113 ar = self.client[-1].apply_async(wait, 0.5)
112 114 self.assertRaises(KeyError, lambda : ar['foo'])
113 115 self.assertEqual(ar['engine_id'], None)
114 116 self.assertTrue(ar.get(5) == 0.5)
115 117 self.assertTrue(isinstance(ar['engine_id'], int))
116 118 self.assertTrue(isinstance(ar.engine_id, int))
117 119 self.assertEqual(ar.engine_id, ar['engine_id'])
118 120
119 121 def test_abort(self):
120 122 e = self.client[-1]
121 123 ar = e.execute('import time; time.sleep(1)', block=False)
122 124 ar2 = e.apply_async(lambda : 2)
123 125 ar2.abort()
124 126 self.assertRaises(error.TaskAborted, ar2.get)
125 127 ar.get()
126 128
127 129 def test_len(self):
128 130 v = self.client.load_balanced_view()
129 131 ar = v.map_async(lambda x: x, range(10))
130 132 self.assertEqual(len(ar), 10)
131 133 ar = v.apply_async(lambda x: x, range(10))
132 134 self.assertEqual(len(ar), 1)
133 135 ar = self.client[:].apply_async(lambda x: x, range(10))
134 136 self.assertEqual(len(ar), len(self.client.ids))
135 137
136 138 def test_wall_time_single(self):
137 139 v = self.client.load_balanced_view()
138 140 ar = v.apply_async(time.sleep, 0.25)
139 141 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
140 142 ar.get(2)
141 143 self.assertTrue(ar.wall_time < 1.)
142 144 self.assertTrue(ar.wall_time > 0.2)
143 145
144 146 def test_wall_time_multi(self):
145 147 self.minimum_engines(4)
146 148 v = self.client[:]
147 149 ar = v.apply_async(time.sleep, 0.25)
148 150 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
149 151 ar.get(2)
150 152 self.assertTrue(ar.wall_time < 1.)
151 153 self.assertTrue(ar.wall_time > 0.2)
152 154
153 155 def test_serial_time_single(self):
154 156 v = self.client.load_balanced_view()
155 157 ar = v.apply_async(time.sleep, 0.25)
156 158 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
157 159 ar.get(2)
158 160 self.assertTrue(ar.serial_time < 1.)
159 161 self.assertTrue(ar.serial_time > 0.2)
160 162
161 163 def test_serial_time_multi(self):
162 164 self.minimum_engines(4)
163 165 v = self.client[:]
164 166 ar = v.apply_async(time.sleep, 0.25)
165 167 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
166 168 ar.get(2)
167 169 self.assertTrue(ar.serial_time < 2.)
168 170 self.assertTrue(ar.serial_time > 0.8)
169 171
170 172 def test_elapsed_single(self):
171 173 v = self.client.load_balanced_view()
172 174 ar = v.apply_async(time.sleep, 0.25)
173 175 while not ar.ready():
174 176 time.sleep(0.01)
175 177 self.assertTrue(ar.elapsed < 1)
176 178 self.assertTrue(ar.elapsed < 1)
177 179 ar.get(2)
178 180
179 181 def test_elapsed_multi(self):
180 182 v = self.client[:]
181 183 ar = v.apply_async(time.sleep, 0.25)
182 184 while not ar.ready():
183 185 time.sleep(0.01)
184 186 self.assertTrue(ar.elapsed < 1)
185 187 self.assertTrue(ar.elapsed < 1)
186 188 ar.get(2)
187 189
188 190 def test_hubresult_timestamps(self):
189 191 self.minimum_engines(4)
190 192 v = self.client[:]
191 193 ar = v.apply_async(time.sleep, 0.25)
192 194 ar.get(2)
193 195 rc2 = Client(profile='iptest')
194 196 # must have try/finally to close second Client, otherwise
195 197 # will have dangling sockets causing problems
196 198 try:
197 199 time.sleep(0.25)
198 200 hr = rc2.get_result(ar.msg_ids)
199 201 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
200 202 hr.get(1)
201 203 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
202 204 self.assertEqual(hr.serial_time, ar.serial_time)
203 205 finally:
204 206 rc2.close()
205 207
206 208 def test_display_empty_streams_single(self):
207 209 """empty stdout/err are not displayed (single result)"""
208 210 self.minimum_engines(1)
209 211
210 212 v = self.client[-1]
211 213 ar = v.execute("print (5555)")
212 214 ar.get(5)
213 215 with capture_output() as io:
214 216 ar.display_outputs()
215 217 self.assertEqual(io.stderr, '')
216 218 self.assertEqual('5555\n', io.stdout)
217 219
218 220 ar = v.execute("a=5")
219 221 ar.get(5)
220 222 with capture_output() as io:
221 223 ar.display_outputs()
222 224 self.assertEqual(io.stderr, '')
223 225 self.assertEqual(io.stdout, '')
224 226
225 227 def test_display_empty_streams_type(self):
226 228 """empty stdout/err are not displayed (groupby type)"""
227 229 self.minimum_engines(1)
228 230
229 231 v = self.client[:]
230 232 ar = v.execute("print (5555)")
231 233 ar.get(5)
232 234 with capture_output() as io:
233 235 ar.display_outputs()
234 236 self.assertEqual(io.stderr, '')
235 237 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
236 238 self.assertFalse('\n\n' in io.stdout, io.stdout)
237 239 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
238 240
239 241 ar = v.execute("a=5")
240 242 ar.get(5)
241 243 with capture_output() as io:
242 244 ar.display_outputs()
243 245 self.assertEqual(io.stderr, '')
244 246 self.assertEqual(io.stdout, '')
245 247
246 248 def test_display_empty_streams_engine(self):
247 249 """empty stdout/err are not displayed (groupby engine)"""
248 250 self.minimum_engines(1)
249 251
250 252 v = self.client[:]
251 253 ar = v.execute("print (5555)")
252 254 ar.get(5)
253 255 with capture_output() as io:
254 256 ar.display_outputs('engine')
255 257 self.assertEqual(io.stderr, '')
256 258 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
257 259 self.assertFalse('\n\n' in io.stdout, io.stdout)
258 260 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
259 261
260 262 ar = v.execute("a=5")
261 263 ar.get(5)
262 264 with capture_output() as io:
263 265 ar.display_outputs('engine')
264 266 self.assertEqual(io.stderr, '')
265 267 self.assertEqual(io.stdout, '')
268
269 def test_await_data(self):
270 """asking for ar.data flushes outputs"""
271 self.minimum_engines(1)
266 272
273 v = self.client[-1]
274 ar = v.execute('\n'.join([
275 "import time",
276 "from IPython.zmq.datapub import publish_data",
277 "for i in range(5):",
278 " publish_data(dict(i=i))",
279 " time.sleep(0.1)",
280 ]), block=False)
281 found = set()
282 tic = time.time()
283 # timeout after 10s
284 while time.time() <= tic + 10:
285 if ar.data:
286 found.add(ar.data['i'])
287 if ar.data['i'] == 4:
288 break
289 time.sleep(0.05)
290
291 ar.get(5)
292 nt.assert_in(4, found)
293 self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found)
267 294
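The new test_await_data exercises the same flushing machinery through metadata: each publish_data() call on the engine updates the task's `data` dict, and because attribute access goes through wait(0), reading ar.data flushes iopub and exposes the latest payload while the task is still running. A hedged client-side sketch of the pattern the test validates (again assumes a running cluster):

from __future__ import print_function
import time
from IPython.parallel import Client

rc = Client()
ar = rc[-1].execute('\n'.join([
    "import time",
    "from IPython.zmq.datapub import publish_data",
    "for i in range(5):",
    "    publish_data(dict(i=i))",
    "    time.sleep(0.1)",
]), block=False)

# Reading ar.data triggers wait(0), which flushes iopub, so the most
# recent publish_data() payload is visible mid-run.
while not ar.ready():
    if ar.data:
        print("latest:", ar.data)
    time.sleep(0.05)

ar.get(5)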