##// END OF EJS Templates
add missing '>> sys.stderr' for one display_outputs case
MinRK -
Show More
@@ -1,651 +1,651
1 1 """AsyncResult objects for the client
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import sys
19 19 import time
20 20 from datetime import datetime
21 21
22 22 from zmq import MessageTracker
23 23
24 24 from IPython.core.display import clear_output, display
25 25 from IPython.external.decorator import decorator
26 26 from IPython.parallel import error
27 27
28 28 #-----------------------------------------------------------------------------
29 29 # Functions
30 30 #-----------------------------------------------------------------------------
31 31
32 32 def _total_seconds(td):
33 33 """timedelta.total_seconds was added in 2.7"""
34 34 try:
35 35 # Python >= 2.7
36 36 return td.total_seconds()
37 37 except AttributeError:
38 38 # Python 2.6
39 39 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Classes
43 43 #-----------------------------------------------------------------------------
44 44
# global empty tracker that's always done:
# AsyncResult.__init__ falls back to this shared instance when the caller
# passes no tracker of its own.
finished_tracker = MessageTracker()
47 47
@decorator
def check_ready(f, self, *args, **kwargs):
    """Run the wrapped method only once the result is ready.

    Does a non-blocking ``self.wait(0)`` to refresh ``self._ready`` first;
    if the result is still not ready, ``error.TimeoutError`` is raised
    instead of calling `f`.
    """
    self.wait(0)
    if self._ready:
        return f(self, *args, **kwargs)
    raise error.TimeoutError("result not ready")
55 55
56 56 class AsyncResult(object):
57 57 """Class for representing results of non-blocking calls.
58 58
59 59 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
60 60 """
61 61
62 62 msg_ids = None
63 63 _targets = None
64 64 _tracker = None
65 65 _single_result = False
66 66
67 67 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
68 68 if isinstance(msg_ids, basestring):
69 69 # always a list
70 70 msg_ids = [msg_ids]
71 71 if tracker is None:
72 72 # default to always done
73 73 tracker = finished_tracker
74 74 self._client = client
75 75 self.msg_ids = msg_ids
76 76 self._fname=fname
77 77 self._targets = targets
78 78 self._tracker = tracker
79 79 self._ready = False
80 80 self._success = None
81 81 self._metadata = None
82 82 if len(msg_ids) == 1:
83 83 self._single_result = not isinstance(targets, (list, tuple))
84 84 else:
85 85 self._single_result = False
86 86
87 87 def __repr__(self):
88 88 if self._ready:
89 89 return "<%s: finished>"%(self.__class__.__name__)
90 90 else:
91 91 return "<%s: %s>"%(self.__class__.__name__,self._fname)
92 92
93 93
94 94 def _reconstruct_result(self, res):
95 95 """Reconstruct our result from actual result list (always a list)
96 96
97 97 Override me in subclasses for turning a list of results
98 98 into the expected form.
99 99 """
100 100 if self._single_result:
101 101 return res[0]
102 102 else:
103 103 return res
104 104
105 105 def get(self, timeout=-1):
106 106 """Return the result when it arrives.
107 107
108 108 If `timeout` is not ``None`` and the result does not arrive within
109 109 `timeout` seconds then ``TimeoutError`` is raised. If the
110 110 remote call raised an exception then that exception will be reraised
111 111 by get() inside a `RemoteError`.
112 112 """
113 113 if not self.ready():
114 114 self.wait(timeout)
115 115
116 116 if self._ready:
117 117 if self._success:
118 118 return self._result
119 119 else:
120 120 raise self._exception
121 121 else:
122 122 raise error.TimeoutError("Result not ready.")
123 123
124 124 def ready(self):
125 125 """Return whether the call has completed."""
126 126 if not self._ready:
127 127 self.wait(0)
128 128 return self._ready
129 129
130 130 def wait(self, timeout=-1):
131 131 """Wait until the result is available or until `timeout` seconds pass.
132 132
133 133 This method always returns None.
134 134 """
135 135 if self._ready:
136 136 return
137 137 self._ready = self._client.wait(self.msg_ids, timeout)
138 138 if self._ready:
139 139 try:
140 140 results = map(self._client.results.get, self.msg_ids)
141 141 self._result = results
142 142 if self._single_result:
143 143 r = results[0]
144 144 if isinstance(r, Exception):
145 145 raise r
146 146 else:
147 147 results = error.collect_exceptions(results, self._fname)
148 148 self._result = self._reconstruct_result(results)
149 149 except Exception, e:
150 150 self._exception = e
151 151 self._success = False
152 152 else:
153 153 self._success = True
154 154 finally:
155 155 self._metadata = map(self._client.metadata.get, self.msg_ids)
156 156
157 157
158 158 def successful(self):
159 159 """Return whether the call completed without raising an exception.
160 160
161 161 Will raise ``AssertionError`` if the result is not ready.
162 162 """
163 163 assert self.ready()
164 164 return self._success
165 165
166 166 #----------------------------------------------------------------
167 167 # Extra methods not in mp.pool.AsyncResult
168 168 #----------------------------------------------------------------
169 169
170 170 def get_dict(self, timeout=-1):
171 171 """Get the results as a dict, keyed by engine_id.
172 172
173 173 timeout behavior is described in `get()`.
174 174 """
175 175
176 176 results = self.get(timeout)
177 177 engine_ids = [ md['engine_id'] for md in self._metadata ]
178 178 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
179 179 maxcount = bycount.count(bycount[-1])
180 180 if maxcount > 1:
181 181 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
182 182 maxcount, bycount[-1]))
183 183
184 184 return dict(zip(engine_ids,results))
185 185
186 186 @property
187 187 def result(self):
188 188 """result property wrapper for `get(timeout=0)`."""
189 189 return self.get()
190 190
191 191 # abbreviated alias:
192 192 r = result
193 193
194 194 @property
195 195 @check_ready
196 196 def metadata(self):
197 197 """property for accessing execution metadata."""
198 198 if self._single_result:
199 199 return self._metadata[0]
200 200 else:
201 201 return self._metadata
202 202
203 203 @property
204 204 def result_dict(self):
205 205 """result property as a dict."""
206 206 return self.get_dict()
207 207
208 208 def __dict__(self):
209 209 return self.get_dict(0)
210 210
211 211 def abort(self):
212 212 """abort my tasks."""
213 213 assert not self.ready(), "Can't abort, I am already done!"
214 214 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
215 215
216 216 @property
217 217 def sent(self):
218 218 """check whether my messages have been sent."""
219 219 return self._tracker.done
220 220
221 221 def wait_for_send(self, timeout=-1):
222 222 """wait for pyzmq send to complete.
223 223
224 224 This is necessary when sending arrays that you intend to edit in-place.
225 225 `timeout` is in seconds, and will raise TimeoutError if it is reached
226 226 before the send completes.
227 227 """
228 228 return self._tracker.wait(timeout)
229 229
230 230 #-------------------------------------
231 231 # dict-access
232 232 #-------------------------------------
233 233
234 234 @check_ready
235 235 def __getitem__(self, key):
236 236 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
237 237 """
238 238 if isinstance(key, int):
239 239 return error.collect_exceptions([self._result[key]], self._fname)[0]
240 240 elif isinstance(key, slice):
241 241 return error.collect_exceptions(self._result[key], self._fname)
242 242 elif isinstance(key, basestring):
243 243 values = [ md[key] for md in self._metadata ]
244 244 if self._single_result:
245 245 return values[0]
246 246 else:
247 247 return values
248 248 else:
249 249 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
250 250
251 251 def __getattr__(self, key):
252 252 """getattr maps to getitem for convenient attr access to metadata."""
253 253 try:
254 254 return self.__getitem__(key)
255 255 except (error.TimeoutError, KeyError):
256 256 raise AttributeError("%r object has no attribute %r"%(
257 257 self.__class__.__name__, key))
258 258
259 259 # asynchronous iterator:
260 260 def __iter__(self):
261 261 if self._single_result:
262 262 raise TypeError("AsyncResults with a single result are not iterable.")
263 263 try:
264 264 rlist = self.get(0)
265 265 except error.TimeoutError:
266 266 # wait for each result individually
267 267 for msg_id in self.msg_ids:
268 268 ar = AsyncResult(self._client, msg_id, self._fname)
269 269 yield ar.get()
270 270 else:
271 271 # already done
272 272 for r in rlist:
273 273 yield r
274 274
275 275 def __len__(self):
276 276 return len(self.msg_ids)
277 277
278 278 #-------------------------------------
279 279 # Sugar methods and attributes
280 280 #-------------------------------------
281 281
282 282 def timedelta(self, start, end, start_key=min, end_key=max):
283 283 """compute the difference between two sets of timestamps
284 284
285 285 The default behavior is to use the earliest of the first
286 286 and the latest of the second list, but this can be changed
287 287 by passing a different
288 288
289 289 Parameters
290 290 ----------
291 291
292 292 start : one or more datetime objects (e.g. ar.submitted)
293 293 end : one or more datetime objects (e.g. ar.received)
294 294 start_key : callable
295 295 Function to call on `start` to extract the relevant
296 296 entry [defalt: min]
297 297 end_key : callable
298 298 Function to call on `end` to extract the relevant
299 299 entry [default: max]
300 300
301 301 Returns
302 302 -------
303 303
304 304 dt : float
305 305 The time elapsed (in seconds) between the two selected timestamps.
306 306 """
307 307 if not isinstance(start, datetime):
308 308 # handle single_result AsyncResults, where ar.stamp is single object,
309 309 # not a list
310 310 start = start_key(start)
311 311 if not isinstance(end, datetime):
312 312 # handle single_result AsyncResults, where ar.stamp is single object,
313 313 # not a list
314 314 end = end_key(end)
315 315 return _total_seconds(end - start)
316 316
317 317 @property
318 318 def progress(self):
319 319 """the number of tasks which have been completed at this point.
320 320
321 321 Fractional progress would be given by 1.0 * ar.progress / len(ar)
322 322 """
323 323 self.wait(0)
324 324 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
325 325
326 326 @property
327 327 def elapsed(self):
328 328 """elapsed time since initial submission"""
329 329 if self.ready():
330 330 return self.wall_time
331 331
332 332 now = submitted = datetime.now()
333 333 for msg_id in self.msg_ids:
334 334 if msg_id in self._client.metadata:
335 335 stamp = self._client.metadata[msg_id]['submitted']
336 336 if stamp and stamp < submitted:
337 337 submitted = stamp
338 338 return _total_seconds(now-submitted)
339 339
340 340 @property
341 341 @check_ready
342 342 def serial_time(self):
343 343 """serial computation time of a parallel calculation
344 344
345 345 Computed as the sum of (completed-started) of each task
346 346 """
347 347 t = 0
348 348 for md in self._metadata:
349 349 t += _total_seconds(md['completed'] - md['started'])
350 350 return t
351 351
352 352 @property
353 353 @check_ready
354 354 def wall_time(self):
355 355 """actual computation time of a parallel calculation
356 356
357 357 Computed as the time between the latest `received` stamp
358 358 and the earliest `submitted`.
359 359
360 360 Only reliable if Client was spinning/waiting when the task finished, because
361 361 the `received` timestamp is created when a result is pulled off of the zmq queue,
362 362 which happens as a result of `client.spin()`.
363 363
364 364 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
365 365
366 366 """
367 367 return self.timedelta(self.submitted, self.received)
368 368
369 369 def wait_interactive(self, interval=1., timeout=None):
370 370 """interactive wait, printing progress at regular intervals"""
371 371 N = len(self)
372 372 tic = time.time()
373 373 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
374 374 self.wait(interval)
375 375 clear_output()
376 376 print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed),
377 377 sys.stdout.flush()
378 378 print
379 379 print "done"
380 380
381 381 def _republish_displaypub(self, content, eid):
382 382 """republish individual displaypub content dicts"""
383 383 try:
384 384 ip = get_ipython()
385 385 except NameError:
386 386 # displaypub is meaningless outside IPython
387 387 return
388 388 md = content['metadata'] or {}
389 389 md['engine'] = eid
390 390 ip.display_pub.publish(content['source'], content['data'], md)
391 391
392 392
393 393 def _display_single_result(self):
394 394
395 395 print self.stdout
396 396 print >> sys.stderr, self.stderr
397 397
398 398 try:
399 399 get_ipython()
400 400 except NameError:
401 401 # displaypub is meaningless outside IPython
402 402 return
403 403
404 404 for output in self.outputs:
405 405 self._republish_displaypub(output, self.engine_id)
406 406
407 407 if self.pyout is not None:
408 408 display(self.get())
409 409
410 410 @check_ready
411 411 def display_outputs(self, groupby="type"):
412 412 """republish the outputs of the computation
413 413
414 414 Parameters
415 415 ----------
416 416
417 417 groupby : str [default: type]
418 418 if 'type':
419 419 Group outputs by type (show all stdout, then all stderr, etc.):
420 420
421 421 [stdout:1] foo
422 422 [stdout:2] foo
423 423 [stderr:1] bar
424 424 [stderr:2] bar
425 425 if 'engine':
426 426 Display outputs for each engine before moving on to the next:
427 427
428 428 [stdout:1] foo
429 429 [stderr:1] bar
430 430 [stdout:2] foo
431 431 [stderr:2] bar
432 432
433 433 if 'order':
434 434 Like 'type', but further collate individual displaypub
435 435 outputs. This is meant for cases of each command producing
436 436 several plots, and you would like to see all of the first
437 437 plots together, then all of the second plots, and so on.
438 438 """
439 439 # flush iopub, just in case
440 440 self._client._flush_iopub(self._client._iopub_socket)
441 441 if self._single_result:
442 442 self._display_single_result()
443 443 return
444 444
445 445 stdouts = [s.rstrip() for s in self.stdout]
446 446 stderrs = [s.rstrip() for s in self.stderr]
447 447 pyouts = [p for p in self.pyout]
448 448 output_lists = self.outputs
449 449 results = self.get()
450 450
451 451 targets = self.engine_id
452 452
453 453 if groupby == "engine":
454 454 for eid,stdout,stderr,outputs,r,pyout in zip(
455 455 targets, stdouts, stderrs, output_lists, results, pyouts
456 456 ):
457 457 if stdout:
458 458 print '[stdout:%2i]' % eid, stdout
459 459 if stderr:
460 print '[stderr:%2i]' % eid, stderr
460 print >> sys.stderr, '[stderr:%2i]' % eid, stderr
461 461
462 462 try:
463 463 get_ipython()
464 464 except NameError:
465 465 # displaypub is meaningless outside IPython
466 466 return
467 467
468 468 for output in outputs:
469 469 self._republish_displaypub(output, eid)
470 470
471 471 if pyout is not None:
472 472 display(r)
473 473
474 474 elif groupby in ('type', 'order'):
475 475 # republish stdout:
476 476 if any(stdouts):
477 477 for eid,stdout in zip(targets, stdouts):
478 478 print '[stdout:%2i]' % eid, stdout
479 479
480 480 # republish stderr:
481 481 if any(stderrs):
482 482 for eid,stderr in zip(targets, stderrs):
483 483 print >> sys.stderr, '[stderr:%2i]' % eid, stderr
484 484
485 485 try:
486 486 get_ipython()
487 487 except NameError:
488 488 # displaypub is meaningless outside IPython
489 489 return
490 490
491 491 if groupby == 'order':
492 492 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
493 493 N = max(len(outputs) for outputs in output_lists)
494 494 for i in range(N):
495 495 for eid in targets:
496 496 outputs = output_dict[eid]
497 497 if len(outputs) >= N:
498 498 self._republish_displaypub(outputs[i], eid)
499 499 else:
500 500 # republish displaypub output
501 501 for eid,outputs in zip(targets, output_lists):
502 502 for output in outputs:
503 503 self._republish_displaypub(output, eid)
504 504
505 505 # finally, add pyout:
506 506 for eid,r,pyout in zip(targets, results, pyouts):
507 507 if pyout is not None:
508 508 display(r)
509 509
510 510 else:
511 511 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
512 512
513 513
514 514
515 515
class AsyncMapResult(AsyncResult):
    """AsyncResult flavour for the results of non-blocking gathers (maps).

    The partial results are joined back together through the map object's
    ``joinPartitions``.

    Instances can be iterated at any time; iteration waits on results as
    they come in.

    With ordered=True (the default) results are yielded in submission
    order; with ordered=False whichever results arrive first are yielded
    first.

    """

    def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
        AsyncResult.__init__(self, client, msg_ids, fname=fname)
        # a map result is never a single unwrapped value
        self._single_result = False
        self._mapObject = mapObject
        self.ordered = ordered

    def _reconstruct_result(self, res):
        """Join the list of partial results into the gathered result."""
        return self._mapObject.joinPartitions(res)

    # asynchronous iterator:
    def __iter__(self):
        if self.ordered:
            stream = self._ordered_iter()
        else:
            stream = self._unordered_iter()
        for item in stream:
            yield item

    # asynchronous ordered iterator:
    def _ordered_iter(self):
        """Yield results *as they arrive*, keeping the order of submission."""
        try:
            gathered = self.get(0)
        except error.TimeoutError:
            # not everything is in yet: block on each message in turn
            for msg_id in self.msg_ids:
                single = AsyncResult(self._client, msg_id, self._fname)
                chunk = single.get()
                try:
                    for item in chunk:
                        yield item
                except TypeError:
                    # flattened, not a list
                    # (flattened data that is itself iterable would be
                    # mis-handled here, but most calls to map do not
                    # expose the `flatten` argument)
                    yield chunk
        else:
            # everything had already finished
            for item in gathered:
                yield item

    # asynchronous unordered iterator:
    def _unordered_iter(self):
        """Yield results *as they arrive*, first come first served, ignoring submission order."""
        try:
            gathered = self.get(0)
        except error.TimeoutError:
            pending = set(self.msg_ids)
            while pending:
                try:
                    self._client.wait(pending, 1e-3)
                except error.TimeoutError:
                    # a timeout here only means that *some* jobs are
                    # still outstanding, so it is ignored
                    pass
                # whatever is no longer outstanding is ready to be read
                ready = pending.difference(self._client.outstanding)
                # and no longer needs to be waited for
                pending.difference_update(ready)
                while ready:
                    msg_id = ready.pop()
                    single = AsyncResult(self._client, msg_id, self._fname)
                    chunk = single.get()
                    try:
                        for item in chunk:
                            yield item
                    except TypeError:
                        # flattened, not a list
                        # (flattened data that is itself iterable would be
                        # mis-handled here, but most calls to map do not
                        # expose the `flatten` argument)
                        yield chunk
        else:
            # everything had already finished
            for item in gathered:
                yield item
601 601
602 602
603 603
class AsyncHubResult(AsyncResult):
    """AsyncResult for pending results that have to be requested from the Hub.

    Waiting or polling on these objects means polling the Hub over the
    network, so use `AsyncHubResult.wait()` sparingly.
    """

    def wait(self, timeout=-1):
        """Wait for the result to complete, asking the Hub for non-local results.

        Locally outstanding messages are waited on through the client first;
        ids missing from the client's results are then polled via
        `client.result_status` every 0.1 s until nothing is pending or, for
        a non-negative `timeout`, until `timeout` seconds have passed since
        this call started.
        """
        if self._ready:
            return
        began = time.time()
        client = self._client

        local_ids = [msg_id for msg_id in self.msg_ids if msg_id in client.outstanding]
        if client.wait(local_ids, timeout):
            remote_ids = [msg_id for msg_id in self.msg_ids if msg_id not in client.results]
            if remote_ids:
                pending = client.result_status(remote_ids, status_only=False)['pending']
                while pending and (timeout < 0 or time.time() < began + timeout):
                    pending = client.result_status(remote_ids, status_only=False)['pending']
                    if pending:
                        time.sleep(0.1)
                if not pending:
                    self._ready = True
            else:
                # everything is already available locally
                self._ready = True

        if not self._ready:
            return

        try:
            fetch_result = client.results.get
            results = [fetch_result(msg_id) for msg_id in self.msg_ids]
            self._result = results
            if self._single_result:
                first = results[0]
                if isinstance(first, Exception):
                    raise first
            else:
                results = error.collect_exceptions(results, self._fname)
            self._result = self._reconstruct_result(results)
        except Exception as exc:
            self._exception = exc
            self._success = False
        else:
            self._success = True
        finally:
            # metadata is recorded whether or not the call succeeded
            fetch_metadata = client.metadata.get
            self._metadata = [fetch_metadata(msg_id) for msg_id in self.msg_ids]
650 650
# public names exported by `from <module> import *`
__all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
General Comments 0
You need to be logged in to leave comments. Login now