skip ugly %2i formatting...
MinRK
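
The change below drops the width-2 format: '%2i' pads single-digit engine ids with a leading space in output prefixes, while '%i' does not. A quick illustration:

    eid = 1
    print '[stdout:%2i]' % eid    # -> [stdout: 1]  (padded)
    print '[stdout:%i]' % eid     # -> [stdout:1]
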
@@ -1,651 +1,651 @@
1 1 """AsyncResult objects for the client
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import sys
19 19 import time
20 20 from datetime import datetime
21 21
22 22 from zmq import MessageTracker
23 23
24 24 from IPython.core.display import clear_output, display
25 25 from IPython.external.decorator import decorator
26 26 from IPython.parallel import error
27 27
28 28 #-----------------------------------------------------------------------------
29 29 # Functions
30 30 #-----------------------------------------------------------------------------
31 31
32 32 def _total_seconds(td):
33 33 """timedelta.total_seconds was added in 2.7"""
34 34 try:
35 35 # Python >= 2.7
36 36 return td.total_seconds()
37 37 except AttributeError:
38 38 # Python 2.6
39 39 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
40 40
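
# Illustrative sketch (not part of the file): both branches of _total_seconds
# compute the same value.
from datetime import timedelta
td = timedelta(days=1, seconds=1, microseconds=500)
print _total_seconds(td)   # -> 86401.0005
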
41 41 #-----------------------------------------------------------------------------
42 42 # Classes
43 43 #-----------------------------------------------------------------------------
44 44
45 45 # global empty tracker that's always done:
46 46 finished_tracker = MessageTracker()
47 47
48 48 @decorator
49 49 def check_ready(f, self, *args, **kwargs):
50 50 """Call spin() to sync state prior to calling the method."""
51 51 self.wait(0)
52 52 if not self._ready:
53 53 raise error.TimeoutError("result not ready")
54 54 return f(self, *args, **kwargs)
55 55
56 56 class AsyncResult(object):
57 57 """Class for representing results of non-blocking calls.
58 58
59 59 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
60 60 """
61 61
62 62 msg_ids = None
63 63 _targets = None
64 64 _tracker = None
65 65 _single_result = False
66 66
67 67 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
68 68 if isinstance(msg_ids, basestring):
69 69 # always a list
70 70 msg_ids = [msg_ids]
71 71 if tracker is None:
72 72 # default to always done
73 73 tracker = finished_tracker
74 74 self._client = client
75 75 self.msg_ids = msg_ids
76 76 self._fname=fname
77 77 self._targets = targets
78 78 self._tracker = tracker
79 79 self._ready = False
80 80 self._success = None
81 81 self._metadata = None
82 82 if len(msg_ids) == 1:
83 83 self._single_result = not isinstance(targets, (list, tuple))
84 84 else:
85 85 self._single_result = False
86 86
87 87 def __repr__(self):
88 88 if self._ready:
89 89 return "<%s: finished>"%(self.__class__.__name__)
90 90 else:
91 91 return "<%s: %s>"%(self.__class__.__name__,self._fname)
92 92
93 93
94 94 def _reconstruct_result(self, res):
95 95 """Reconstruct our result from actual result list (always a list)
96 96
97 97 Override me in subclasses for turning a list of results
98 98 into the expected form.
99 99 """
100 100 if self._single_result:
101 101 return res[0]
102 102 else:
103 103 return res
104 104
105 105 def get(self, timeout=-1):
106 106 """Return the result when it arrives.
107 107
108 108 If `timeout` is not ``None`` and the result does not arrive within
109 109 `timeout` seconds then ``TimeoutError`` is raised. If the
110 110 remote call raised an exception then that exception will be reraised
111 111 by get() inside a `RemoteError`.
112 112 """
113 113 if not self.ready():
114 114 self.wait(timeout)
115 115
116 116 if self._ready:
117 117 if self._success:
118 118 return self._result
119 119 else:
120 120 raise self._exception
121 121 else:
122 122 raise error.TimeoutError("Result not ready.")
123 123
124 124 def ready(self):
125 125 """Return whether the call has completed."""
126 126 if not self._ready:
127 127 self.wait(0)
128 128 return self._ready
129 129
130 130 def wait(self, timeout=-1):
131 131 """Wait until the result is available or until `timeout` seconds pass.
132 132
133 133 This method always returns None.
134 134 """
135 135 if self._ready:
136 136 return
137 137 self._ready = self._client.wait(self.msg_ids, timeout)
138 138 if self._ready:
139 139 try:
140 140 results = map(self._client.results.get, self.msg_ids)
141 141 self._result = results
142 142 if self._single_result:
143 143 r = results[0]
144 144 if isinstance(r, Exception):
145 145 raise r
146 146 else:
147 147 results = error.collect_exceptions(results, self._fname)
148 148 self._result = self._reconstruct_result(results)
149 149 except Exception, e:
150 150 self._exception = e
151 151 self._success = False
152 152 else:
153 153 self._success = True
154 154 finally:
155 155 self._metadata = map(self._client.metadata.get, self.msg_ids)
156 156
157 157
158 158 def successful(self):
159 159 """Return whether the call completed without raising an exception.
160 160
161 161 Will raise ``AssertionError`` if the result is not ready.
162 162 """
163 163 assert self.ready()
164 164 return self._success
165 165
166 166 #----------------------------------------------------------------
167 167 # Extra methods not in mp.pool.AsyncResult
168 168 #----------------------------------------------------------------
169 169
170 170 def get_dict(self, timeout=-1):
171 171 """Get the results as a dict, keyed by engine_id.
172 172
173 173 timeout behavior is described in `get()`.
174 174 """
175 175
176 176 results = self.get(timeout)
177 177 engine_ids = [ md['engine_id'] for md in self._metadata ]
178 178 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
179 179 maxcount = bycount.count(bycount[-1])
180 180 if maxcount > 1:
181 181 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
182 182 maxcount, bycount[-1]))
183 183
184 184 return dict(zip(engine_ids,results))
185 185
186 186 @property
187 187 def result(self):
188 188 """result property wrapper for `get(timeout=0)`."""
189 189 return self.get()
190 190
191 191 # abbreviated alias:
192 192 r = result
193 193
194 194 @property
195 195 @check_ready
196 196 def metadata(self):
197 197 """property for accessing execution metadata."""
198 198 if self._single_result:
199 199 return self._metadata[0]
200 200 else:
201 201 return self._metadata
202 202
203 203 @property
204 204 def result_dict(self):
205 205 """result property as a dict."""
206 206 return self.get_dict()
207 207
208 208 def __dict__(self):
209 209 return self.get_dict(0)
210 210
211 211 def abort(self):
212 212 """abort my tasks."""
213 213 assert not self.ready(), "Can't abort, I am already done!"
214 214 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
215 215
216 216 @property
217 217 def sent(self):
218 218 """check whether my messages have been sent."""
219 219 return self._tracker.done
220 220
221 221 def wait_for_send(self, timeout=-1):
222 222 """wait for pyzmq send to complete.
223 223
224 224 This is necessary when sending arrays that you intend to edit in-place.
225 225 `timeout` is in seconds, and will raise TimeoutError if it is reached
226 226 before the send completes.
227 227 """
228 228 return self._tracker.wait(timeout)
229 229
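
# A minimal sketch of a tracked send (assumes a running ipcluster and numpy;
# `rc`, `view`, and `A` are illustrative names, not part of this module):
import numpy
from IPython.parallel import Client
rc = Client()
view = rc[:]
view.track = True                    # enable pyzmq message tracking on sends
A = numpy.zeros(1000000)
ar = view.apply_async(numpy.sum, A)  # A may be sent without copying
ar.wait_for_send()                   # only now is it safe to edit A in-place
A += 1
print ar.get()
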
230 230 #-------------------------------------
231 231 # dict-access
232 232 #-------------------------------------
233 233
234 234 @check_ready
235 235 def __getitem__(self, key):
236 236 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
237 237 """
238 238 if isinstance(key, int):
239 239 return error.collect_exceptions([self._result[key]], self._fname)[0]
240 240 elif isinstance(key, slice):
241 241 return error.collect_exceptions(self._result[key], self._fname)
242 242 elif isinstance(key, basestring):
243 243 values = [ md[key] for md in self._metadata ]
244 244 if self._single_result:
245 245 return values[0]
246 246 else:
247 247 return values
248 248 else:
249 249 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
250 250
251 251 def __getattr__(self, key):
252 252 """getattr maps to getitem for convenient attr access to metadata."""
253 253 try:
254 254 return self.__getitem__(key)
255 255 except (error.TimeoutError, KeyError):
256 256 raise AttributeError("%r object has no attribute %r"%(
257 257 self.__class__.__name__, key))
258 258
259 259 # asynchronous iterator:
260 260 def __iter__(self):
261 261 if self._single_result:
262 262 raise TypeError("AsyncResults with a single result are not iterable.")
263 263 try:
264 264 rlist = self.get(0)
265 265 except error.TimeoutError:
266 266 # wait for each result individually
267 267 for msg_id in self.msg_ids:
268 268 ar = AsyncResult(self._client, msg_id, self._fname)
269 269 yield ar.get()
270 270 else:
271 271 # already done
272 272 for r in rlist:
273 273 yield r
274 274
275 275 def __len__(self):
276 276 return len(self.msg_ids)
277 277
278 278 #-------------------------------------
279 279 # Sugar methods and attributes
280 280 #-------------------------------------
281 281
282 282 def timedelta(self, start, end, start_key=min, end_key=max):
283 283 """compute the difference between two sets of timestamps
284 284
285 285 The default behavior is to use the earliest of the first
286 286 and the latest of the second list, but this can be changed
287 287 by passing a different start_key or end_key.
288 288
289 289 Parameters
290 290 ----------
291 291
292 292 start : one or more datetime objects (e.g. ar.submitted)
293 293 end : one or more datetime objects (e.g. ar.received)
294 294 start_key : callable
295 295 Function to call on `start` to extract the relevant
296 296 entry [default: min]
297 297 end_key : callable
298 298 Function to call on `end` to extract the relevant
299 299 entry [default: max]
300 300
301 301 Returns
302 302 -------
303 303
304 304 dt : float
305 305 The time elapsed (in seconds) between the two selected timestamps.
306 306 """
307 307 if not isinstance(start, datetime):
308 308 # handle single_result AsyncResults, where ar.stamp is single object,
309 309 # not a list
310 310 start = start_key(start)
311 311 if not isinstance(end, datetime):
312 312 # handle single_result AsyncResults, where ar.stamp is single object,
313 313 # not a list
314 314 end = end_key(end)
315 315 return _total_seconds(end - start)
316 316
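
# Sketch of comparing other timestamp pairs (assumes a running ipcluster;
# names are illustrative):
import time
from IPython.parallel import Client
rc = Client()
ar = rc[:].apply_async(time.sleep, 1)
ar.get()
print ar.timedelta(ar.submitted, ar.started)    # queue delay
print ar.timedelta(ar.started, ar.completed)    # pure compute time
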
317 317 @property
318 318 def progress(self):
319 319 """the number of tasks which have been completed at this point.
320 320
321 321 Fractional progress would be given by 1.0 * ar.progress / len(ar)
322 322 """
323 323 self.wait(0)
324 324 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
325 325
326 326 @property
327 327 def elapsed(self):
328 328 """elapsed time since initial submission"""
329 329 if self.ready():
330 330 return self.wall_time
331 331
332 332 now = submitted = datetime.now()
333 333 for msg_id in self.msg_ids:
334 334 if msg_id in self._client.metadata:
335 335 stamp = self._client.metadata[msg_id]['submitted']
336 336 if stamp and stamp < submitted:
337 337 submitted = stamp
338 338 return _total_seconds(now-submitted)
339 339
340 340 @property
341 341 @check_ready
342 342 def serial_time(self):
343 343 """serial computation time of a parallel calculation
344 344
345 345 Computed as the sum of (completed-started) of each task
346 346 """
347 347 t = 0
348 348 for md in self._metadata:
349 349 t += _total_seconds(md['completed'] - md['started'])
350 350 return t
351 351
352 352 @property
353 353 @check_ready
354 354 def wall_time(self):
355 355 """actual computation time of a parallel calculation
356 356
357 357 Computed as the time between the latest `received` stamp
358 358 and the earliest `submitted`.
359 359
360 360 Only reliable if Client was spinning/waiting when the task finished, because
361 361 the `received` timestamp is created when a result is pulled off of the zmq queue,
362 362 which happens as a result of `client.spin()`.
363 363
364 364 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
365 365
366 366 """
367 367 return self.timedelta(self.submitted, self.received)
368 368
369 369 def wait_interactive(self, interval=1., timeout=None):
370 370 """interactive wait, printing progress at regular intervals"""
371 371 N = len(self)
372 372 tic = time.time()
373 373 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
374 374 self.wait(interval)
375 375 clear_output()
376 376 print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed),
377 377 sys.stdout.flush()
378 378 print
379 379 print "done"
380 380
381 381 def _republish_displaypub(self, content, eid):
382 382 """republish individual displaypub content dicts"""
383 383 try:
384 384 ip = get_ipython()
385 385 except NameError:
386 386 # displaypub is meaningless outside IPython
387 387 return
388 388 md = content['metadata'] or {}
389 389 md['engine'] = eid
390 390 ip.display_pub.publish(content['source'], content['data'], md)
391 391
392 392
393 393 def _display_single_result(self):
394 394
395 395 print self.stdout
396 396 print >> sys.stderr, self.stderr
397 397
398 398 try:
399 399 get_ipython()
400 400 except NameError:
401 401 # displaypub is meaningless outside IPython
402 402 return
403 403
404 404 for output in self.outputs:
405 405 self._republish_displaypub(output, self.engine_id)
406 406
407 407 if self.pyout is not None:
408 408 display(self.get())
409 409
410 410 @check_ready
411 411 def display_outputs(self, groupby="type"):
412 412 """republish the outputs of the computation
413 413
414 414 Parameters
415 415 ----------
416 416
417 417 groupby : str [default: type]
418 418 if 'type':
419 419 Group outputs by type (show all stdout, then all stderr, etc.):
420 420
421 421 [stdout:1] foo
422 422 [stdout:2] foo
423 423 [stderr:1] bar
424 424 [stderr:2] bar
425 425 if 'engine':
426 426 Display outputs for each engine before moving on to the next:
427 427
428 428 [stdout:1] foo
429 429 [stderr:1] bar
430 430 [stdout:2] foo
431 431 [stderr:2] bar
432 432
433 433 if 'order':
434 434 Like 'type', but further collate individual displaypub
435 435 outputs. This is meant for cases where each command produces
436 436 several plots, and you would like to see all of the first
437 437 plots together, then all of the second plots, and so on.
438 438 """
439 439 # flush iopub, just in case
440 440 self._client._flush_iopub(self._client._iopub_socket)
441 441 if self._single_result:
442 442 self._display_single_result()
443 443 return
444 444
445 445 stdouts = [s.rstrip() for s in self.stdout]
446 446 stderrs = [s.rstrip() for s in self.stderr]
447 447 pyouts = [p for p in self.pyout]
448 448 output_lists = self.outputs
449 449 results = self.get()
450 450
451 451 targets = self.engine_id
452 452
453 453 if groupby == "engine":
454 454 for eid,stdout,stderr,outputs,r,pyout in zip(
455 455 targets, stdouts, stderrs, output_lists, results, pyouts
456 456 ):
457 457 if stdout:
458 print '[stdout:%2i]' % eid, stdout
458 print '[stdout:%i]' % eid, stdout
459 459 if stderr:
460 print >> sys.stderr, '[stderr:%2i]' % eid, stderr
460 print >> sys.stderr, '[stderr:%i]' % eid, stderr
461 461
462 462 try:
463 463 get_ipython()
464 464 except NameError:
465 465 # displaypub is meaningless outside IPython
466 466 return
467 467
468 468 for output in outputs:
469 469 self._republish_displaypub(output, eid)
470 470
471 471 if pyout is not None:
472 472 display(r)
473 473
474 474 elif groupby in ('type', 'order'):
475 475 # republish stdout:
476 476 if any(stdouts):
477 477 for eid,stdout in zip(targets, stdouts):
478 print '[stdout:%2i]' % eid, stdout
478 print '[stdout:%i]' % eid, stdout
479 479
480 480 # republish stderr:
481 481 if any(stderrs):
482 482 for eid,stderr in zip(targets, stderrs):
483 print >> sys.stderr, '[stderr:%2i]' % eid, stderr
483 print >> sys.stderr, '[stderr:%i]' % eid, stderr
484 484
485 485 try:
486 486 get_ipython()
487 487 except NameError:
488 488 # displaypub is meaningless outside IPython
489 489 return
490 490
491 491 if groupby == 'order':
492 492 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
493 493 N = max(len(outputs) for outputs in output_lists)
494 494 for i in range(N):
495 495 for eid in targets:
496 496 outputs = output_dict[eid]
497 497 if len(outputs) > i:
498 498 self._republish_displaypub(outputs[i], eid)
499 499 else:
500 500 # republish displaypub output
501 501 for eid,outputs in zip(targets, output_lists):
502 502 for output in outputs:
503 503 self._republish_displaypub(output, eid)
504 504
505 505 # finally, add pyout:
506 506 for eid,r,pyout in zip(targets, results, pyouts):
507 507 if pyout is not None:
508 508 display(r)
509 509
510 510 else:
511 511 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
512 512
513 513
514 514
515 515
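
# Sketch of replaying remote output (assumes a running ipcluster, ideally
# from an IPython session so displaypub republishing works):
from IPython.parallel import Client
rc = Client()
ar = rc[:].execute("import os\nprint os.getpid()", block=False)
ar.wait()
ar.display_outputs(groupby='engine')   # stdout/stderr interleaved per engine
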
516 516 class AsyncMapResult(AsyncResult):
517 517 """Class for representing results of non-blocking gathers.
518 518
519 519 This will properly reconstruct the gather.
520 520
521 521 This class is iterable at any time, and will wait on results as they come.
522 522
523 523 If ordered=False, then the first results to arrive will come first, otherwise
524 524 results will be yielded in the order they were submitted.
525 525
526 526 """
527 527
528 528 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
529 529 AsyncResult.__init__(self, client, msg_ids, fname=fname)
530 530 self._mapObject = mapObject
531 531 self._single_result = False
532 532 self.ordered = ordered
533 533
534 534 def _reconstruct_result(self, res):
535 535 """Perform the gather on the actual results."""
536 536 return self._mapObject.joinPartitions(res)
537 537
538 538 # asynchronous iterator:
539 539 def __iter__(self):
540 540 it = self._ordered_iter if self.ordered else self._unordered_iter
541 541 for r in it():
542 542 yield r
543 543
544 544 # asynchronous ordered iterator:
545 545 def _ordered_iter(self):
546 546 """iterator for results *as they arrive*, preserving submission order."""
547 547 try:
548 548 rlist = self.get(0)
549 549 except error.TimeoutError:
550 550 # wait for each result individually
551 551 for msg_id in self.msg_ids:
552 552 ar = AsyncResult(self._client, msg_id, self._fname)
553 553 rlist = ar.get()
554 554 try:
555 555 for r in rlist:
556 556 yield r
557 557 except TypeError:
558 558 # flattened, not a list
559 559 # this could get broken by flattened data that returns iterables
560 560 # but most calls to map do not expose the `flatten` argument
561 561 yield rlist
562 562 else:
563 563 # already done
564 564 for r in rlist:
565 565 yield r
566 566
567 567 # asynchronous unordered iterator:
568 568 def _unordered_iter(self):
569 569 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
570 570 try:
571 571 rlist = self.get(0)
572 572 except error.TimeoutError:
573 573 pending = set(self.msg_ids)
574 574 while pending:
575 575 try:
576 576 self._client.wait(pending, 1e-3)
577 577 except error.TimeoutError:
578 578 # ignore timeout error, because that only means
579 579 # *some* jobs are outstanding
580 580 pass
581 581 # update ready set with those no longer outstanding:
582 582 ready = pending.difference(self._client.outstanding)
583 583 # update pending to exclude those that are finished
584 584 pending = pending.difference(ready)
585 585 while ready:
586 586 msg_id = ready.pop()
587 587 ar = AsyncResult(self._client, msg_id, self._fname)
588 588 rlist = ar.get()
589 589 try:
590 590 for r in rlist:
591 591 yield r
592 592 except TypeError:
593 593 # flattened, not a list
594 594 # this could get broken by flattened data that returns iterables
595 595 # but most calls to map do not expose the `flatten` argument
596 596 yield rlist
597 597 else:
598 598 # already done
599 599 for r in rlist:
600 600 yield r
601 601
602 602
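
# Sketch of unordered iteration (assumes a running ipcluster with support for
# the `ordered` flag shown above):
import time
from IPython.parallel import Client
rc = Client()
amr = rc.load_balanced_view().map_async(time.sleep, [3, 1, 2], ordered=False)
for r in amr:
    print r    # results are yielded as tasks finish, not in submission order
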
603 603
604 604 class AsyncHubResult(AsyncResult):
605 605 """Class to wrap pending results that must be requested from the Hub.
606 606
607 607 Note that waiting/polling on these objects requires polling the Hub over the network,
608 608 so use `AsyncHubResult.wait()` sparingly.
609 609 """
610 610
611 611 def wait(self, timeout=-1):
612 612 """wait for result to complete."""
613 613 start = time.time()
614 614 if self._ready:
615 615 return
616 616 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
617 617 local_ready = self._client.wait(local_ids, timeout)
618 618 if local_ready:
619 619 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
620 620 if not remote_ids:
621 621 self._ready = True
622 622 else:
623 623 rdict = self._client.result_status(remote_ids, status_only=False)
624 624 pending = rdict['pending']
625 625 while pending and (timeout < 0 or time.time() < start+timeout):
626 626 rdict = self._client.result_status(remote_ids, status_only=False)
627 627 pending = rdict['pending']
628 628 if pending:
629 629 time.sleep(0.1)
630 630 if not pending:
631 631 self._ready = True
632 632 if self._ready:
633 633 try:
634 634 results = map(self._client.results.get, self.msg_ids)
635 635 self._result = results
636 636 if self._single_result:
637 637 r = results[0]
638 638 if isinstance(r, Exception):
639 639 raise r
640 640 else:
641 641 results = error.collect_exceptions(results, self._fname)
642 642 self._result = self._reconstruct_result(results)
643 643 except Exception, e:
644 644 self._exception = e
645 645 self._success = False
646 646 else:
647 647 self._success = True
648 648 finally:
649 649 self._metadata = map(self._client.metadata.get, self.msg_ids)
650 650
651 651 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
\ No newline at end of file
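
# A minimal end-to-end sketch of the AsyncResult API above (assumes a running
# ipcluster; not part of the diff):
from IPython.parallel import Client
rc = Client()
ar = rc[:].apply_async(pow, 2, 10)   # AsyncResult spanning all engines
ar.wait_interactive()                # "n/N tasks finished after ... s" / "done"
print ar.get_dict()                  # {engine_id: 1024, ...}
print ar.elapsed, ar.wall_time, ar.serial_time
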
@@ -1,1655 +1,1655 @@
1 1 """A semi-synchronous Client for the ZMQ cluster
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import json
20 20 import sys
21 21 from threading import Thread, Event
22 22 import time
23 23 import warnings
24 24 from datetime import datetime
25 25 from getpass import getpass
26 26 from pprint import pprint
27 27
28 28 pjoin = os.path.join
29 29
30 30 import zmq
31 31 # from zmq.eventloop import ioloop, zmqstream
32 32
33 33 from IPython.config.configurable import MultipleInstanceError
34 34 from IPython.core.application import BaseIPythonApplication
35 35
36 36 from IPython.utils.coloransi import TermColors
37 37 from IPython.utils.jsonutil import rekey
38 38 from IPython.utils.localinterfaces import LOCAL_IPS
39 39 from IPython.utils.path import get_ipython_dir
40 40 from IPython.utils.py3compat import cast_bytes
41 41 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
42 42 Dict, List, Bool, Set, Any)
43 43 from IPython.external.decorator import decorator
44 44 from IPython.external.ssh import tunnel
45 45
46 46 from IPython.parallel import Reference
47 47 from IPython.parallel import error
48 48 from IPython.parallel import util
49 49
50 50 from IPython.zmq.session import Session, Message
51 51
52 52 from .asyncresult import AsyncResult, AsyncHubResult
53 53 from IPython.core.profiledir import ProfileDir, ProfileDirError
54 54 from .view import DirectView, LoadBalancedView
55 55
56 56 if sys.version_info[0] >= 3:
57 57 # xrange is used in a couple 'isinstance' tests in py2
58 58 # should be just 'range' in 3k
59 59 xrange = range
60 60
61 61 #--------------------------------------------------------------------------
62 62 # Decorators for Client methods
63 63 #--------------------------------------------------------------------------
64 64
65 65 @decorator
66 66 def spin_first(f, self, *args, **kwargs):
67 67 """Call spin() to sync state prior to calling the method."""
68 68 self.spin()
69 69 return f(self, *args, **kwargs)
70 70
71 71
72 72 #--------------------------------------------------------------------------
73 73 # Classes
74 74 #--------------------------------------------------------------------------
75 75
76 76
77 77 class ExecuteReply(object):
78 78 """wrapper for finished Execute results"""
79 79 def __init__(self, msg_id, content, metadata):
80 80 self.msg_id = msg_id
81 81 self._content = content
82 82 self.execution_count = content['execution_count']
83 83 self.metadata = metadata
84 84
85 85 def __getitem__(self, key):
86 86 return self.metadata[key]
87 87
88 88 def __getattr__(self, key):
89 89 if key not in self.metadata:
90 90 raise AttributeError(key)
91 91 return self.metadata[key]
92 92
93 93 def __repr__(self):
94 94 pyout = self.metadata['pyout'] or {'data':{}}
95 95 text_out = pyout['data'].get('text/plain', '')
96 96 if len(text_out) > 32:
97 97 text_out = text_out[:29] + '...'
98 98
99 99 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
100 100
101 101 def _repr_pretty_(self, p, cycle):
102 102 pyout = self.metadata['pyout'] or {'data':{}}
103 103 text_out = pyout['data'].get('text/plain', '')
104 104
105 105 if not text_out:
106 106 return
107 107
108 108 try:
109 109 ip = get_ipython()
110 110 except NameError:
111 111 colors = "NoColor"
112 112 else:
113 113 colors = ip.colors
114 114
115 115 if colors == "NoColor":
116 116 out = normal = ""
117 117 else:
118 118 out = TermColors.Red
119 119 normal = TermColors.Normal
120 120
121 121 p.text(
122 u'[%2i] ' % self.metadata['engine_id'] +
123 out + u'Out[%2i]: ' % self.execution_count +
122 u'[%i] ' % self.metadata['engine_id'] +
123 out + u'Out[%i]: ' % self.execution_count +
124 124 normal + text_out
125 125 )
126 126
127 127 def _repr_html_(self):
128 128 pyout = self.metadata['pyout'] or {'data':{}}
129 129 return pyout['data'].get("text/html")
130 130
131 131 def _repr_latex_(self):
132 132 pyout = self.metadata['pyout'] or {'data':{}}
133 133 return pyout['data'].get("text/latex")
134 134
135 135 def _repr_json_(self):
136 136 pyout = self.metadata['pyout'] or {'data':{}}
137 137 return pyout['data'].get("application/json")
138 138
139 139 def _repr_javascript_(self):
140 140 pyout = self.metadata['pyout'] or {'data':{}}
141 141 return pyout['data'].get("application/javascript")
142 142
143 143 def _repr_png_(self):
144 144 pyout = self.metadata['pyout'] or {'data':{}}
145 145 return pyout['data'].get("image/png")
146 146
147 147 def _repr_jpeg_(self):
148 148 pyout = self.metadata['pyout'] or {'data':{}}
149 149 return pyout['data'].get("image/jpeg")
150 150
151 151 def _repr_svg_(self):
152 152 pyout = self.metadata['pyout'] or {'data':{}}
153 153 return pyout['data'].get("image/svg+xml")
154 154
155 155
156 156 class Metadata(dict):
157 157 """Subclass of dict for initializing metadata values.
158 158
159 159 Attribute access works on keys.
160 160
161 161 These objects have a strict set of keys - errors will raise if you try
162 162 to add new keys.
163 163 """
164 164 def __init__(self, *args, **kwargs):
165 165 dict.__init__(self)
166 166 md = {'msg_id' : None,
167 167 'submitted' : None,
168 168 'started' : None,
169 169 'completed' : None,
170 170 'received' : None,
171 171 'engine_uuid' : None,
172 172 'engine_id' : None,
173 173 'follow' : None,
174 174 'after' : None,
175 175 'status' : None,
176 176
177 177 'pyin' : None,
178 178 'pyout' : None,
179 179 'pyerr' : None,
180 180 'stdout' : '',
181 181 'stderr' : '',
182 182 'outputs' : [],
183 183 }
184 184 self.update(md)
185 185 self.update(dict(*args, **kwargs))
186 186
187 187 def __getattr__(self, key):
188 188 """getattr aliased to getitem"""
189 189 if key in self.iterkeys():
190 190 return self[key]
191 191 else:
192 192 raise AttributeError(key)
193 193
194 194 def __setattr__(self, key, value):
195 195 """setattr aliased to setitem, with strict"""
196 196 if key in self.iterkeys():
197 197 self[key] = value
198 198 else:
199 199 raise AttributeError(key)
200 200
201 201 def __setitem__(self, key, value):
202 202 """strict static key enforcement"""
203 203 if key in self.iterkeys():
204 204 dict.__setitem__(self, key, value)
205 205 else:
206 206 raise KeyError(key)
207 207
208 208
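
# Sketch of the strict-key behavior described above (illustrative):
md = Metadata()
md.engine_id = 0           # attribute access maps onto the fixed key set
md['stdout'] += 'hello\n'  # existing keys behave like a normal dict
try:
    md['typo_key'] = 1     # unknown keys raise instead of being added
except KeyError:
    pass
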
209 209 class Client(HasTraits):
210 210 """A semi-synchronous client to the IPython ZMQ cluster
211 211
212 212 Parameters
213 213 ----------
214 214
215 215 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
216 216 Connection information for the Hub's registration. If a json connector
217 217 file is given, then likely no further configuration is necessary.
218 218 [Default: use profile]
219 219 profile : bytes
220 220 The name of the Cluster profile to be used to find connector information.
221 221 If run from an IPython application, the default profile will be the same
222 222 as the running application, otherwise it will be 'default'.
223 223 context : zmq.Context
224 224 Pass an existing zmq.Context instance, otherwise the client will create its own.
225 225 debug : bool
226 226 flag for lots of message printing for debug purposes
227 227 timeout : int/float
228 228 time (in seconds) to wait for connection replies from the Hub
229 229 [Default: 10]
230 230
231 231 #-------------- session related args ----------------
232 232
233 233 config : Config object
234 234 If specified, this will be relayed to the Session for configuration
235 235 username : str
236 236 set username for the session object
237 237 packer : str (import_string) or callable
238 238 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
239 239 function to serialize messages. Must support same input as
240 240 JSON, and output must be bytes.
241 241 You can pass a callable directly as `pack`
242 242 unpacker : str (import_string) or callable
243 243 The inverse of packer. Only necessary if packer is specified as *not* one
244 244 of 'json' or 'pickle'.
245 245
246 246 #-------------- ssh related args ----------------
247 247 # These are args for configuring the ssh tunnel to be used
248 248 # credentials are used to forward connections over ssh to the Controller
249 249 # Note that the ip given in `addr` needs to be relative to sshserver
250 250 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
251 251 # and set sshserver as the same machine the Controller is on. However,
252 252 # the only requirement is that sshserver is able to see the Controller
253 253 # (i.e. is within the same trusted network).
254 254
255 255 sshserver : str
256 256 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
257 257 If keyfile or password is specified, and this is not, it will default to
258 258 the ip given in addr.
259 259 sshkey : str; path to ssh private key file
260 260 This specifies a key to be used in ssh login, default None.
261 261 Regular default ssh keys will be used without specifying this argument.
262 262 password : str
263 263 Your ssh password to sshserver. Note that if this is left None,
264 264 you will be prompted for it if passwordless key based login is unavailable.
265 265 paramiko : bool
266 266 flag for whether to use paramiko instead of shell ssh for tunneling.
267 267 [default: True on win32, False else]
268 268
269 269 ------- exec authentication args -------
270 270 If even localhost is untrusted, you can have some protection against
271 271 unauthorized execution by signing messages with HMAC digests.
272 272 Messages are still sent as cleartext, so if someone can snoop your
273 273 loopback traffic this will not protect your privacy, but will prevent
274 274 unauthorized execution.
275 275
276 276 exec_key : str
277 277 an authentication key or file containing a key
278 278 default: None
279 279
280 280
281 281 Attributes
282 282 ----------
283 283
284 284 ids : list of int engine IDs
285 285 requesting the ids attribute always synchronizes
286 286 the registration state. To request ids without synchronization,
287 287 use the semi-private _ids attribute.
288 288
289 289 history : list of msg_ids
290 290 a list of msg_ids, keeping track of all the execution
291 291 messages you have submitted in order.
292 292
293 293 outstanding : set of msg_ids
294 294 a set of msg_ids that have been submitted, but whose
295 295 results have not yet been received.
296 296
297 297 results : dict
298 298 a dict of all our results, keyed by msg_id
299 299
300 300 block : bool
301 301 determines default behavior when block not specified
302 302 in execution methods
303 303
304 304 Methods
305 305 -------
306 306
307 307 spin
308 308 flushes incoming results and registration state changes
309 309 control methods spin, and requesting `ids` also ensures up to date
310 310
311 311 wait
312 312 wait on one or more msg_ids
313 313
314 314 execution methods
315 315 apply
316 316 legacy: execute, run
317 317
318 318 data movement
319 319 push, pull, scatter, gather
320 320
321 321 query methods
322 322 queue_status, get_result, purge, result_status
323 323
324 324 control methods
325 325 abort, shutdown
326 326
327 327 """
328 328
329 329
330 330 block = Bool(False)
331 331 outstanding = Set()
332 332 results = Instance('collections.defaultdict', (dict,))
333 333 metadata = Instance('collections.defaultdict', (Metadata,))
334 334 history = List()
335 335 debug = Bool(False)
336 336 _spin_thread = Any()
337 337 _stop_spinning = Any()
338 338
339 339 profile=Unicode()
340 340 def _profile_default(self):
341 341 if BaseIPythonApplication.initialized():
342 342 # an IPython app *might* be running, try to get its profile
343 343 try:
344 344 return BaseIPythonApplication.instance().profile
345 345 except (AttributeError, MultipleInstanceError):
346 346 # could be a *different* subclass of config.Application,
347 347 # which would raise one of these two errors.
348 348 return u'default'
349 349 else:
350 350 return u'default'
351 351
352 352
353 353 _outstanding_dict = Instance('collections.defaultdict', (set,))
354 354 _ids = List()
355 355 _connected=Bool(False)
356 356 _ssh=Bool(False)
357 357 _context = Instance('zmq.Context')
358 358 _config = Dict()
359 359 _engines=Instance(util.ReverseDict, (), {})
360 360 # _hub_socket=Instance('zmq.Socket')
361 361 _query_socket=Instance('zmq.Socket')
362 362 _control_socket=Instance('zmq.Socket')
363 363 _iopub_socket=Instance('zmq.Socket')
364 364 _notification_socket=Instance('zmq.Socket')
365 365 _mux_socket=Instance('zmq.Socket')
366 366 _task_socket=Instance('zmq.Socket')
367 367 _task_scheme=Unicode()
368 368 _closed = False
369 369 _ignored_control_replies=Integer(0)
370 370 _ignored_hub_replies=Integer(0)
371 371
372 372 def __new__(self, *args, **kw):
373 373 # don't raise on positional args
374 374 return HasTraits.__new__(self, **kw)
375 375
376 376 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
377 377 context=None, debug=False, exec_key=None,
378 378 sshserver=None, sshkey=None, password=None, paramiko=None,
379 379 timeout=10, **extra_args
380 380 ):
381 381 if profile:
382 382 super(Client, self).__init__(debug=debug, profile=profile)
383 383 else:
384 384 super(Client, self).__init__(debug=debug)
385 385 if context is None:
386 386 context = zmq.Context.instance()
387 387 self._context = context
388 388 self._stop_spinning = Event()
389 389
390 390 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
391 391 if self._cd is not None:
392 392 if url_or_file is None:
393 393 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
394 394 if url_or_file is None:
395 395 raise ValueError(
396 396 "I can't find enough information to connect to a hub!"
397 397 " Please specify at least one of url_or_file or profile."
398 398 )
399 399
400 400 if not util.is_url(url_or_file):
401 401 # it's not a url, try for a file
402 402 if not os.path.exists(url_or_file):
403 403 if self._cd:
404 404 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
405 405 if not os.path.exists(url_or_file):
406 406 raise IOError("Connection file not found: %r" % url_or_file)
407 407 with open(url_or_file) as f:
408 408 cfg = json.loads(f.read())
409 409 else:
410 410 cfg = {'url':url_or_file}
411 411
412 412 # sync defaults from args, json:
413 413 if sshserver:
414 414 cfg['ssh'] = sshserver
415 415 if exec_key:
416 416 cfg['exec_key'] = exec_key
417 417 exec_key = cfg['exec_key']
418 418 location = cfg.setdefault('location', None)
419 419 cfg['url'] = util.disambiguate_url(cfg['url'], location)
420 420 url = cfg['url']
421 421 proto,addr,port = util.split_url(url)
422 422 if location is not None and addr == '127.0.0.1':
423 423 # location specified, and connection is expected to be local
424 424 if location not in LOCAL_IPS and not sshserver:
425 425 # load ssh from JSON *only* if the controller is not on
426 426 # this machine
427 427 sshserver=cfg['ssh']
428 428 if location not in LOCAL_IPS and not sshserver:
429 429 # warn if no ssh specified, but SSH is probably needed
430 430 # This is only a warning, because the most likely cause
431 431 # is a local Controller on a laptop whose IP is dynamic
432 432 warnings.warn("""
433 433 Controller appears to be listening on localhost, but not on this machine.
434 434 If this is true, you should specify Client(...,sshserver='you@%s')
435 435 or instruct your controller to listen on an external IP."""%location,
436 436 RuntimeWarning)
437 437 elif not sshserver:
438 438 # otherwise sync with cfg
439 439 sshserver = cfg['ssh']
440 440
441 441 self._config = cfg
442 442
443 443 self._ssh = bool(sshserver or sshkey or password)
444 444 if self._ssh and sshserver is None:
445 445 # default to ssh via localhost
446 446 sshserver = url.split('://')[1].split(':')[0]
447 447 if self._ssh and password is None:
448 448 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
449 449 password=False
450 450 else:
451 451 password = getpass("SSH Password for %s: "%sshserver)
452 452 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
453 453
454 454 # configure and construct the session
455 455 if exec_key is not None:
456 456 if os.path.isfile(exec_key):
457 457 extra_args['keyfile'] = exec_key
458 458 else:
459 459 exec_key = cast_bytes(exec_key)
460 460 extra_args['key'] = exec_key
461 461 self.session = Session(**extra_args)
462 462
463 463 self._query_socket = self._context.socket(zmq.DEALER)
464 464 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
465 465 if self._ssh:
466 466 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
467 467 else:
468 468 self._query_socket.connect(url)
469 469
470 470 self.session.debug = self.debug
471 471
472 472 self._notification_handlers = {'registration_notification' : self._register_engine,
473 473 'unregistration_notification' : self._unregister_engine,
474 474 'shutdown_notification' : lambda msg: self.close(),
475 475 }
476 476 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
477 477 'apply_reply' : self._handle_apply_reply}
478 478 self._connect(sshserver, ssh_kwargs, timeout)
479 479
480 480 def __del__(self):
481 481 """cleanup sockets, but _not_ context."""
482 482 self.close()
483 483
484 484 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
485 485 if ipython_dir is None:
486 486 ipython_dir = get_ipython_dir()
487 487 if profile_dir is not None:
488 488 try:
489 489 self._cd = ProfileDir.find_profile_dir(profile_dir)
490 490 return
491 491 except ProfileDirError:
492 492 pass
493 493 elif profile is not None:
494 494 try:
495 495 self._cd = ProfileDir.find_profile_dir_by_name(
496 496 ipython_dir, profile)
497 497 return
498 498 except ProfileDirError:
499 499 pass
500 500 self._cd = None
501 501
502 502 def _update_engines(self, engines):
503 503 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
504 504 for k,v in engines.iteritems():
505 505 eid = int(k)
506 506 self._engines[eid] = v
507 507 self._ids.append(eid)
508 508 self._ids = sorted(self._ids)
509 509 if sorted(self._engines.keys()) != range(len(self._engines)) and \
510 510 self._task_scheme == 'pure' and self._task_socket:
511 511 self._stop_scheduling_tasks()
512 512
513 513 def _stop_scheduling_tasks(self):
514 514 """Stop scheduling tasks because an engine has been unregistered
515 515 from a pure ZMQ scheduler.
516 516 """
517 517 self._task_socket.close()
518 518 self._task_socket = None
519 519 msg = "An engine has been unregistered, and we are using pure " +\
520 520 "ZMQ task scheduling. Task farming will be disabled."
521 521 if self.outstanding:
522 522 msg += " If you were running tasks when this happened, " +\
523 523 "some `outstanding` msg_ids may never resolve."
524 524 warnings.warn(msg, RuntimeWarning)
525 525
526 526 def _build_targets(self, targets):
527 527 """Turn valid target IDs or 'all' into two lists:
528 528 (int_ids, uuids).
529 529 """
530 530 if not self._ids:
531 531 # flush notification socket if no engines yet, just in case
532 532 if not self.ids:
533 533 raise error.NoEnginesRegistered("Can't build targets without any engines")
534 534
535 535 if targets is None:
536 536 targets = self._ids
537 537 elif isinstance(targets, basestring):
538 538 if targets.lower() == 'all':
539 539 targets = self._ids
540 540 else:
541 541 raise TypeError("%r not valid str target, must be 'all'"%(targets))
542 542 elif isinstance(targets, int):
543 543 if targets < 0:
544 544 targets = self.ids[targets]
545 545 if targets not in self._ids:
546 546 raise IndexError("No such engine: %i"%targets)
547 547 targets = [targets]
548 548
549 549 if isinstance(targets, slice):
550 550 indices = range(len(self._ids))[targets]
551 551 ids = self.ids
552 552 targets = [ ids[i] for i in indices ]
553 553
554 554 if not isinstance(targets, (tuple, list, xrange)):
555 555 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
556 556
557 557 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
558 558
559 559 def _connect(self, sshserver, ssh_kwargs, timeout):
560 560 """setup all our socket connections to the cluster. This is called from
561 561 __init__."""
562 562
563 563 # Maybe allow reconnecting?
564 564 if self._connected:
565 565 return
566 566 self._connected=True
567 567
568 568 def connect_socket(s, url):
569 569 url = util.disambiguate_url(url, self._config['location'])
570 570 if self._ssh:
571 571 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
572 572 else:
573 573 return s.connect(url)
574 574
575 575 self.session.send(self._query_socket, 'connection_request')
576 576 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
577 577 poller = zmq.Poller()
578 578 poller.register(self._query_socket, zmq.POLLIN)
579 579 # poll expects milliseconds, timeout is seconds
580 580 evts = poller.poll(timeout*1000)
581 581 if not evts:
582 582 raise error.TimeoutError("Hub connection request timed out")
583 583 idents,msg = self.session.recv(self._query_socket,mode=0)
584 584 if self.debug:
585 585 pprint(msg)
586 586 msg = Message(msg)
587 587 content = msg.content
588 588 self._config['registration'] = dict(content)
589 589 if content.status == 'ok':
590 590 ident = self.session.bsession
591 591 if content.mux:
592 592 self._mux_socket = self._context.socket(zmq.DEALER)
593 593 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
594 594 connect_socket(self._mux_socket, content.mux)
595 595 if content.task:
596 596 self._task_scheme, task_addr = content.task
597 597 self._task_socket = self._context.socket(zmq.DEALER)
598 598 self._task_socket.setsockopt(zmq.IDENTITY, ident)
599 599 connect_socket(self._task_socket, task_addr)
600 600 if content.notification:
601 601 self._notification_socket = self._context.socket(zmq.SUB)
602 602 connect_socket(self._notification_socket, content.notification)
603 603 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
604 604 # if content.query:
605 605 # self._query_socket = self._context.socket(zmq.DEALER)
606 606 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
607 607 # connect_socket(self._query_socket, content.query)
608 608 if content.control:
609 609 self._control_socket = self._context.socket(zmq.DEALER)
610 610 self._control_socket.setsockopt(zmq.IDENTITY, ident)
611 611 connect_socket(self._control_socket, content.control)
612 612 if content.iopub:
613 613 self._iopub_socket = self._context.socket(zmq.SUB)
614 614 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
615 615 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
616 616 connect_socket(self._iopub_socket, content.iopub)
617 617 self._update_engines(dict(content.engines))
618 618 else:
619 619 self._connected = False
620 620 raise Exception("Failed to connect!")
621 621
622 622 #--------------------------------------------------------------------------
623 623 # handlers and callbacks for incoming messages
624 624 #--------------------------------------------------------------------------
625 625
626 626 def _unwrap_exception(self, content):
627 627 """unwrap exception, and remap engine_id to int."""
628 628 e = error.unwrap_exception(content)
629 629 # print e.traceback
630 630 if e.engine_info:
631 631 e_uuid = e.engine_info['engine_uuid']
632 632 eid = self._engines[e_uuid]
633 633 e.engine_info['engine_id'] = eid
634 634 return e
635 635
636 636 def _extract_metadata(self, header, parent, content):
637 637 md = {'msg_id' : parent['msg_id'],
638 638 'received' : datetime.now(),
639 639 'engine_uuid' : header.get('engine', None),
640 640 'follow' : parent.get('follow', []),
641 641 'after' : parent.get('after', []),
642 642 'status' : content['status'],
643 643 }
644 644
645 645 if md['engine_uuid'] is not None:
646 646 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
647 647
648 648 if 'date' in parent:
649 649 md['submitted'] = parent['date']
650 650 if 'started' in header:
651 651 md['started'] = header['started']
652 652 if 'date' in header:
653 653 md['completed'] = header['date']
654 654 return md
655 655
656 656 def _register_engine(self, msg):
657 657 """Register a new engine, and update our connection info."""
658 658 content = msg['content']
659 659 eid = content['id']
660 660 d = {eid : content['queue']}
661 661 self._update_engines(d)
662 662
663 663 def _unregister_engine(self, msg):
664 664 """Unregister an engine that has died."""
665 665 content = msg['content']
666 666 eid = int(content['id'])
667 667 if eid in self._ids:
668 668 self._ids.remove(eid)
669 669 uuid = self._engines.pop(eid)
670 670
671 671 self._handle_stranded_msgs(eid, uuid)
672 672
673 673 if self._task_socket and self._task_scheme == 'pure':
674 674 self._stop_scheduling_tasks()
675 675
676 676 def _handle_stranded_msgs(self, eid, uuid):
677 677 """Handle messages known to be on an engine when the engine unregisters.
678 678
679 679 It is possible that this will fire prematurely - that is, an engine will
680 680 go down after completing a result, and the client will be notified
681 681 of the unregistration and later receive the successful result.
682 682 """
683 683
684 684 outstanding = self._outstanding_dict[uuid]
685 685
686 686 for msg_id in list(outstanding):
687 687 if msg_id in self.results:
688 688 # we already have this result
689 689 continue
690 690 try:
691 691 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
692 692 except:
693 693 content = error.wrap_exception()
694 694 # build a fake message:
695 695 parent = {}
696 696 header = {}
697 697 parent['msg_id'] = msg_id
698 698 header['engine'] = uuid
699 699 header['date'] = datetime.now()
700 700 msg = dict(parent_header=parent, header=header, content=content)
701 701 self._handle_apply_reply(msg)
702 702
703 703 def _handle_execute_reply(self, msg):
704 704 """Save the reply to an execute_request into our results.
705 705
706 706 execute messages are never actually used. apply is used instead.
707 707 """
708 708
709 709 parent = msg['parent_header']
710 710 msg_id = parent['msg_id']
711 711 if msg_id not in self.outstanding:
712 712 if msg_id in self.history:
713 713 print ("got stale result: %s"%msg_id)
714 714 else:
715 715 print ("got unknown result: %s"%msg_id)
716 716 else:
717 717 self.outstanding.remove(msg_id)
718 718
719 719 content = msg['content']
720 720 header = msg['header']
721 721
722 722 # construct metadata:
723 723 md = self.metadata[msg_id]
724 724 md.update(self._extract_metadata(header, parent, content))
725 725 # is this redundant?
726 726 self.metadata[msg_id] = md
727 727
728 728 e_outstanding = self._outstanding_dict[md['engine_uuid']]
729 729 if msg_id in e_outstanding:
730 730 e_outstanding.remove(msg_id)
731 731
732 732 # construct result:
733 733 if content['status'] == 'ok':
734 734 self.results[msg_id] = ExecuteReply(msg_id, content, md)
735 735 elif content['status'] == 'aborted':
736 736 self.results[msg_id] = error.TaskAborted(msg_id)
737 737 elif content['status'] == 'resubmitted':
738 738 # TODO: handle resubmission
739 739 pass
740 740 else:
741 741 self.results[msg_id] = self._unwrap_exception(content)
742 742
743 743 def _handle_apply_reply(self, msg):
744 744 """Save the reply to an apply_request into our results."""
745 745 parent = msg['parent_header']
746 746 msg_id = parent['msg_id']
747 747 if msg_id not in self.outstanding:
748 748 if msg_id in self.history:
749 749 print ("got stale result: %s"%msg_id)
750 750 print self.results[msg_id]
751 751 print msg
752 752 else:
753 753 print ("got unknown result: %s"%msg_id)
754 754 else:
755 755 self.outstanding.remove(msg_id)
756 756 content = msg['content']
757 757 header = msg['header']
758 758
759 759 # construct metadata:
760 760 md = self.metadata[msg_id]
761 761 md.update(self._extract_metadata(header, parent, content))
762 762 # is this redundant?
763 763 self.metadata[msg_id] = md
764 764
765 765 e_outstanding = self._outstanding_dict[md['engine_uuid']]
766 766 if msg_id in e_outstanding:
767 767 e_outstanding.remove(msg_id)
768 768
769 769 # construct result:
770 770 if content['status'] == 'ok':
771 771 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
772 772 elif content['status'] == 'aborted':
773 773 self.results[msg_id] = error.TaskAborted(msg_id)
774 774 elif content['status'] == 'resubmitted':
775 775 # TODO: handle resubmission
776 776 pass
777 777 else:
778 778 self.results[msg_id] = self._unwrap_exception(content)
779 779
780 780 def _flush_notifications(self):
781 781 """Flush notifications of engine registrations waiting
782 782 in ZMQ queue."""
783 783 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
784 784 while msg is not None:
785 785 if self.debug:
786 786 pprint(msg)
787 787 msg_type = msg['header']['msg_type']
788 788 handler = self._notification_handlers.get(msg_type, None)
789 789 if handler is None:
790 790 raise Exception("Unhandled message type: %s"%msg.msg_type)
791 791 else:
792 792 handler(msg)
793 793 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
794 794
795 795 def _flush_results(self, sock):
796 796 """Flush task or queue results waiting in ZMQ queue."""
797 797 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
798 798 while msg is not None:
799 799 if self.debug:
800 800 pprint(msg)
801 801 msg_type = msg['header']['msg_type']
802 802 handler = self._queue_handlers.get(msg_type, None)
803 803 if handler is None:
804 804 raise Exception("Unhandled message type: %s"%msg.msg_type)
805 805 else:
806 806 handler(msg)
807 807 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
808 808
809 809 def _flush_control(self, sock):
810 810 """Flush replies from the control channel waiting
811 811 in the ZMQ queue.
812 812
813 813 Currently: ignore them."""
814 814 if self._ignored_control_replies <= 0:
815 815 return
816 816 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
817 817 while msg is not None:
818 818 self._ignored_control_replies -= 1
819 819 if self.debug:
820 820 pprint(msg)
821 821 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
822 822
823 823 def _flush_ignored_control(self):
824 824 """flush ignored control replies"""
825 825 while self._ignored_control_replies > 0:
826 826 self.session.recv(self._control_socket)
827 827 self._ignored_control_replies -= 1
828 828
829 829 def _flush_ignored_hub_replies(self):
830 830 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
831 831 while msg is not None:
832 832 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
833 833
834 834 def _flush_iopub(self, sock):
835 835 """Flush replies from the iopub channel waiting
836 836 in the ZMQ queue.
837 837 """
838 838 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
839 839 while msg is not None:
840 840 if self.debug:
841 841 pprint(msg)
842 842 parent = msg['parent_header']
843 843 # ignore IOPub messages with no parent.
844 844 # Caused by print statements or warnings from before the first execution.
845 845 if not parent:
846 846 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK); continue # fetch the next message, or this loop never advances
847 847 msg_id = parent['msg_id']
848 848 content = msg['content']
849 849 header = msg['header']
850 850 msg_type = msg['header']['msg_type']
851 851
852 852 # init metadata:
853 853 md = self.metadata[msg_id]
854 854
855 855 if msg_type == 'stream':
856 856 name = content['name']
857 857 s = md[name] or ''
858 858 md[name] = s + content['data']
859 859 elif msg_type == 'pyerr':
860 860 md.update({'pyerr' : self._unwrap_exception(content)})
861 861 elif msg_type == 'pyin':
862 862 md.update({'pyin' : content['code']})
863 863 elif msg_type == 'display_data':
864 864 md['outputs'].append(content)
865 865 elif msg_type == 'pyout':
866 866 md['pyout'] = content
867 867 else:
868 868 # unhandled msg_type (status, etc.)
869 869 pass
870 870
871 871 # redundant?
872 872 self.metadata[msg_id] = md
873 873
874 874 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
875 875
876 876 #--------------------------------------------------------------------------
877 877 # len, getitem
878 878 #--------------------------------------------------------------------------
879 879
880 880 def __len__(self):
881 881 """len(client) returns # of engines."""
882 882 return len(self.ids)
883 883
884 884 def __getitem__(self, key):
885 885 """index access returns DirectView multiplexer objects
886 886
887 887 Must be int, slice, or list/tuple/xrange of ints"""
888 888 if not isinstance(key, (int, slice, tuple, list, xrange)):
889 889 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
890 890 else:
891 891 return self.direct_view(key)
892 892
893 893 #--------------------------------------------------------------------------
894 894 # Begin public methods
895 895 #--------------------------------------------------------------------------
896 896
897 897 @property
898 898 def ids(self):
899 899 """Always up-to-date ids property."""
900 900 self._flush_notifications()
901 901 # always copy:
902 902 return list(self._ids)
903 903
904 904 def close(self):
905 905 if self._closed:
906 906 return
907 907 self.stop_spin_thread()
908 908 snames = filter(lambda n: n.endswith('socket'), dir(self))
909 909 for socket in map(lambda name: getattr(self, name), snames):
910 910 if isinstance(socket, zmq.Socket) and not socket.closed:
911 911 socket.close()
912 912 self._closed = True
913 913
914 914 def _spin_every(self, interval=1):
915 915 """target func for use in spin_thread"""
916 916 while True:
917 917 if self._stop_spinning.is_set():
918 918 return
919 919 time.sleep(interval)
920 920 self.spin()
921 921
922 922 def spin_thread(self, interval=1):
923 923 """call Client.spin() in a background thread on some regular interval
924 924
925 925 This helps ensure that messages don't pile up too much in the zmq queue
926 926 while you are working on other things, or just leaving an idle terminal.
927 927
928 928 It also helps limit the potential lag of the `received` timestamp
929 929 on AsyncResult objects, used for timings.
930 930
931 931 Parameters
932 932 ----------
933 933
934 934 interval : float, optional
935 935 The interval on which to spin the client in the background thread
936 936 (simply passed to time.sleep).
937 937
938 938 Notes
939 939 -----
940 940
941 941 For precision timing, you may want to use this method to put a bound
942 942 on the jitter (in seconds) in `received` timestamps used
943 943 in AsyncResult.wall_time.
944 944
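Examples
--------
A minimal sketch (assumes a connected ``rc = Client()``)::

    rc.spin_thread(interval=0.01)
    ar = rc[:].apply_async(lambda : None)
    ar.get()
    print ar.wall_time  # received-submitted; jitter now bounded by ~interval
    rc.stop_spin_thread()
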
945 945 """
946 946 if self._spin_thread is not None:
947 947 self.stop_spin_thread()
948 948 self._stop_spinning.clear()
949 949 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
950 950 self._spin_thread.daemon = True
951 951 self._spin_thread.start()
952 952
953 953 def stop_spin_thread(self):
954 954 """stop background spin_thread, if any"""
955 955 if self._spin_thread is not None:
956 956 self._stop_spinning.set()
957 957 self._spin_thread.join()
958 958 self._spin_thread = None
959 959
960 960 def spin(self):
961 961 """Flush any registration notifications and execution results
962 962 waiting in the ZMQ queue.
963 963 """
964 964 if self._notification_socket:
965 965 self._flush_notifications()
966 966 if self._iopub_socket:
967 967 self._flush_iopub(self._iopub_socket)
968 968 if self._mux_socket:
969 969 self._flush_results(self._mux_socket)
970 970 if self._task_socket:
971 971 self._flush_results(self._task_socket)
972 972 if self._control_socket:
973 973 self._flush_control(self._control_socket)
974 974 if self._query_socket:
975 975 self._flush_ignored_hub_replies()
976 976
977 977 def wait(self, jobs=None, timeout=-1):
978 978 """waits on one or more `jobs`, for up to `timeout` seconds.
979 979
980 980 Parameters
981 981 ----------
982 982
983 983 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
984 984 ints are indices to self.history
985 985 strs are msg_ids
986 986 default: wait on all outstanding messages
987 987 timeout : float
988 988 a time in seconds, after which to give up.
989 989 default is -1, which means no timeout
990 990
991 991 Returns
992 992 -------
993 993
994 994 True : when all msg_ids are done
995 995 False : timeout reached, some msg_ids still outstanding
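
Examples
--------
A minimal sketch (assumes a connected ``rc = Client()``)::

    ar = rc[:].apply_async(lambda : 42)
    finished = rc.wait([ar], timeout=5)  # True if done within 5s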
996 996 """
997 997 tic = time.time()
998 998 if jobs is None:
999 999 theids = self.outstanding
1000 1000 else:
1001 1001 if isinstance(jobs, (int, basestring, AsyncResult)):
1002 1002 jobs = [jobs]
1003 1003 theids = set()
1004 1004 for job in jobs:
1005 1005 if isinstance(job, int):
1006 1006 # index access
1007 1007 job = self.history[job]
1008 1008 elif isinstance(job, AsyncResult):
1009 1009 theids.update(job.msg_ids)
1010 1010 continue
1011 1011 theids.add(job)
1012 1012 if not theids.intersection(self.outstanding):
1013 1013 return True
1014 1014 self.spin()
1015 1015 while theids.intersection(self.outstanding):
1016 1016 if timeout >= 0 and ( time.time()-tic ) > timeout:
1017 1017 break
1018 1018 time.sleep(1e-3)
1019 1019 self.spin()
1020 1020 return len(theids.intersection(self.outstanding)) == 0
1021 1021
1022 1022 #--------------------------------------------------------------------------
1023 1023 # Control methods
1024 1024 #--------------------------------------------------------------------------
1025 1025
1026 1026 @spin_first
1027 1027 def clear(self, targets=None, block=None):
1028 1028 """Clear the namespace in target(s)."""
1029 1029 block = self.block if block is None else block
1030 1030 targets = self._build_targets(targets)[0]
1031 1031 for t in targets:
1032 1032 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1033 1033 error = False
1034 1034 if block:
1035 1035 self._flush_ignored_control()
1036 1036 for i in range(len(targets)):
1037 1037 idents,msg = self.session.recv(self._control_socket,0)
1038 1038 if self.debug:
1039 1039 pprint(msg)
1040 1040 if msg['content']['status'] != 'ok':
1041 1041 error = self._unwrap_exception(msg['content'])
1042 1042 else:
1043 1043 self._ignored_control_replies += len(targets)
1044 1044 if error:
1045 1045 raise error
1046 1046
1047 1047
1048 1048 @spin_first
1049 1049 def abort(self, jobs=None, targets=None, block=None):
1050 1050 """Abort specific jobs from the execution queues of target(s).
1051 1051
1052 1052 This is a mechanism to prevent jobs that have already been submitted
1053 1053 from executing.
1054 1054
1055 1055 Parameters
1056 1056 ----------
1057 1057
1058 1058 jobs : msg_id, list of msg_ids, or AsyncResult
1059 1059 The jobs to be aborted
1060 1060
1061 1061 If unspecified/None: abort all outstanding jobs.
1062 1062
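Examples
--------
A minimal sketch (``lview`` is an assumed LoadBalancedView of this client)::

    ar = lview.apply_async(lambda : 'work')
    rc.abort(ar)  # prevent it from running, if it is still queued
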
1063 1063 """
1064 1064 block = self.block if block is None else block
1065 1065 jobs = jobs if jobs is not None else list(self.outstanding)
1066 1066 targets = self._build_targets(targets)[0]
1067 1067
1068 1068 msg_ids = []
1069 1069 if isinstance(jobs, (basestring,AsyncResult)):
1070 1070 jobs = [jobs]
1071 1071 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1072 1072 if bad_ids:
1073 1073 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1074 1074 for j in jobs:
1075 1075 if isinstance(j, AsyncResult):
1076 1076 msg_ids.extend(j.msg_ids)
1077 1077 else:
1078 1078 msg_ids.append(j)
1079 1079 content = dict(msg_ids=msg_ids)
1080 1080 for t in targets:
1081 1081 self.session.send(self._control_socket, 'abort_request',
1082 1082 content=content, ident=t)
1083 1083 error = False
1084 1084 if block:
1085 1085 self._flush_ignored_control()
1086 1086 for i in range(len(targets)):
1087 1087 idents,msg = self.session.recv(self._control_socket,0)
1088 1088 if self.debug:
1089 1089 pprint(msg)
1090 1090 if msg['content']['status'] != 'ok':
1091 1091 error = self._unwrap_exception(msg['content'])
1092 1092 else:
1093 1093 self._ignored_control_replies += len(targets)
1094 1094 if error:
1095 1095 raise error
1096 1096
1097 1097 @spin_first
1098 1098 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1099 1099 """Terminates one or more engine processes, optionally including the hub."""
1100 1100 block = self.block if block is None else block
1101 1101 if hub:
1102 1102 targets = 'all'
1103 1103 targets = self._build_targets(targets)[0]
1104 1104 for t in targets:
1105 1105 self.session.send(self._control_socket, 'shutdown_request',
1106 1106 content={'restart':restart},ident=t)
1107 1107 error = False
1108 1108 if block or hub:
1109 1109 self._flush_ignored_control()
1110 1110 for i in range(len(targets)):
1111 1111 idents,msg = self.session.recv(self._control_socket, 0)
1112 1112 if self.debug:
1113 1113 pprint(msg)
1114 1114 if msg['content']['status'] != 'ok':
1115 1115 error = self._unwrap_exception(msg['content'])
1116 1116 else:
1117 1117 self._ignored_control_replies += len(targets)
1118 1118
1119 1119 if hub:
1120 1120 time.sleep(0.25)
1121 1121 self.session.send(self._query_socket, 'shutdown_request')
1122 1122 idents,msg = self.session.recv(self._query_socket, 0)
1123 1123 if self.debug:
1124 1124 pprint(msg)
1125 1125 if msg['content']['status'] != 'ok':
1126 1126 error = self._unwrap_exception(msg['content'])
1127 1127
1128 1128 if error:
1129 1129 raise error
1130 1130
1131 1131 #--------------------------------------------------------------------------
1132 1132 # Execution related methods
1133 1133 #--------------------------------------------------------------------------
1134 1134
1135 1135 def _maybe_raise(self, result):
1136 1136 """wrapper for maybe raising an exception if apply failed."""
1137 1137 if isinstance(result, error.RemoteError):
1138 1138 raise result
1139 1139
1140 1140 return result
1141 1141
1142 1142 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1143 1143 ident=None):
1144 1144 """construct and send an apply message via a socket.
1145 1145
1146 1146 This is the principal method with which all engine execution is performed by views.
1147 1147 """
1148 1148
1149 1149 if self._closed:
1150 1150 raise RuntimeError("Client cannot be used after its sockets have been closed")
1151 1151
1152 1152 # defaults:
1153 1153 args = args if args is not None else []
1154 1154 kwargs = kwargs if kwargs is not None else {}
1155 1155 subheader = subheader if subheader is not None else {}
1156 1156
1157 1157 # validate arguments
1158 1158 if not callable(f) and not isinstance(f, Reference):
1159 1159 raise TypeError("f must be callable, not %s"%type(f))
1160 1160 if not isinstance(args, (tuple, list)):
1161 1161 raise TypeError("args must be tuple or list, not %s"%type(args))
1162 1162 if not isinstance(kwargs, dict):
1163 1163 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1164 1164 if not isinstance(subheader, dict):
1165 1165 raise TypeError("subheader must be dict, not %s"%type(subheader))
1166 1166
1167 1167 bufs = util.pack_apply_message(f,args,kwargs)
1168 1168
1169 1169 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1170 1170 subheader=subheader, track=track)
1171 1171
1172 1172 msg_id = msg['header']['msg_id']
1173 1173 self.outstanding.add(msg_id)
1174 1174 if ident:
1175 1175 # possibly routed to a specific engine
1176 1176 if isinstance(ident, list):
1177 1177 ident = ident[-1]
1178 1178 if ident in self._engines.values():
1179 1179 # save for later, in case of engine death
1180 1180 self._outstanding_dict[ident].add(msg_id)
1181 1181 self.history.append(msg_id)
1182 1182 self.metadata[msg_id]['submitted'] = datetime.now()
1183 1183
1184 1184 return msg
1185 1185
1186 1186 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1187 1187 """construct and send an execute request via a socket.
1188 1188
1189 1189 """
1190 1190
1191 1191 if self._closed:
1192 1192 raise RuntimeError("Client cannot be used after its sockets have been closed")
1193 1193
1194 1194 # defaults:
1195 1195 subheader = subheader if subheader is not None else {}
1196 1196
1197 1197 # validate arguments
1198 1198 if not isinstance(code, basestring):
1199 1199 raise TypeError("code must be text, not %s" % type(code))
1200 1200 if not isinstance(subheader, dict):
1201 1201 raise TypeError("subheader must be dict, not %s" % type(subheader))
1202 1202
1203 1203 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1204 1204
1205 1205
1206 1206 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1207 1207 subheader=subheader)
1208 1208
1209 1209 msg_id = msg['header']['msg_id']
1210 1210 self.outstanding.add(msg_id)
1211 1211 if ident:
1212 1212 # possibly routed to a specific engine
1213 1213 if isinstance(ident, list):
1214 1214 ident = ident[-1]
1215 1215 if ident in self._engines.values():
1216 1216 # save for later, in case of engine death
1217 1217 self._outstanding_dict[ident].add(msg_id)
1218 1218 self.history.append(msg_id)
1219 1219 self.metadata[msg_id]['submitted'] = datetime.now()
1220 1220
1221 1221 return msg
1222 1222
1223 1223 #--------------------------------------------------------------------------
1224 1224 # construct a View object
1225 1225 #--------------------------------------------------------------------------
1226 1226
1227 1227 def load_balanced_view(self, targets=None):
1228 1228 """construct a DirectView object.
1229 1229
1230 1230 If no arguments are specified, create a LoadBalancedView
1231 1231 using all engines.
1232 1232
1233 1233 Parameters
1234 1234 ----------
1235 1235
1236 1236 targets : list, slice, int, etc. [default: use all engines]
1237 1237 The subset of engines across which to load-balance
1238 1238 """
1239 1239 if targets == 'all':
1240 1240 targets = None
1241 1241 if targets is not None:
1242 1242 targets = self._build_targets(targets)[1]
1243 1243 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1244 1244
1245 1245 def direct_view(self, targets='all'):
1246 1246 """construct a DirectView object.
1247 1247
1248 1248 If no targets are specified, create a DirectView using all engines.
1249 1249
1250 1250 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1251 1251 evaluate the target engines at each execution, whereas rc[:] will connect to
1252 1252 all *current* engines, and that list will not change.
1253 1253
1254 1254 That is, 'all' will always use all engines, whereas rc[:] will not use
1255 1255 engines added after the DirectView is constructed.
1256 1256
1257 1257 Parameters
1258 1258 ----------
1259 1259
1260 1260 targets : list, slice, int, etc. [default: use all engines]
1261 1261 The engines to use for the View
1262 1262 """
1263 1263 single = isinstance(targets, int)
1264 1264 # allow 'all' to be lazily evaluated at each execution
1265 1265 if targets != 'all':
1266 1266 targets = self._build_targets(targets)[1]
1267 1267 if single:
1268 1268 targets = targets[0]
1269 1269 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1270 1270
1271 1271 #--------------------------------------------------------------------------
1272 1272 # Query methods
1273 1273 #--------------------------------------------------------------------------
1274 1274
1275 1275 @spin_first
1276 1276 def get_result(self, indices_or_msg_ids=None, block=None):
1277 1277 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1278 1278
1279 1279 If the client already has the results, no request to the Hub will be made.
1280 1280
1281 1281 This is a convenient way to construct AsyncResult objects, which are wrappers
1282 1282 that include metadata about execution, and allow for awaiting results that
1283 1283 were not submitted by this Client.
1284 1284
1285 1285 It can also be a convenient way to retrieve the metadata associated with
1286 1286 blocking execution, since it always retrieves the metadata along with the results.
1287 1287
1288 1288 Examples
1289 1289 --------
1290 1290 ::
1291 1291
1292 1292 In [10]: ar = rc.get_result(-1) # wrap the most recent task (illustrative)
1293 1293
1294 1294 Parameters
1295 1295 ----------
1296 1296
1297 1297 indices_or_msg_ids : integer history index, str msg_id, or list of either
1298 1298 The indices or msg_ids of the results to be retrieved
1299 1299
1300 1300 block : bool
1301 1301 Whether to wait for the result to be done
1302 1302
1303 1303 Returns
1304 1304 -------
1305 1305
1306 1306 AsyncResult
1307 1307 A single AsyncResult object will always be returned.
1308 1308
1309 1309 AsyncHubResult
1310 1310 A subclass of AsyncResult that retrieves results from the Hub
1311 1311
1312 1312 """
1313 1313 block = self.block if block is None else block
1314 1314 if indices_or_msg_ids is None:
1315 1315 indices_or_msg_ids = -1
1316 1316
1317 1317 if not isinstance(indices_or_msg_ids, (list,tuple)):
1318 1318 indices_or_msg_ids = [indices_or_msg_ids]
1319 1319
1320 1320 theids = []
1321 1321 for id in indices_or_msg_ids:
1322 1322 if isinstance(id, int):
1323 1323 id = self.history[id]
1324 1324 if not isinstance(id, basestring):
1325 1325 raise TypeError("indices must be str or int, not %r"%id)
1326 1326 theids.append(id)
1327 1327
1328 1328 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1329 1329 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1330 1330
1331 1331 if remote_ids:
1332 1332 ar = AsyncHubResult(self, msg_ids=theids)
1333 1333 else:
1334 1334 ar = AsyncResult(self, msg_ids=theids)
1335 1335
1336 1336 if block:
1337 1337 ar.wait()
1338 1338
1339 1339 return ar
1340 1340
1341 1341 @spin_first
1342 1342 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1343 1343 """Resubmit one or more tasks.
1344 1344
1345 1345 In-flight tasks may not be resubmitted.
1346 1346
1347 1347 Parameters
1348 1348 ----------
1349 1349
1350 1350 indices_or_msg_ids : integer history index, str msg_id, or list of either
1351 1351 The indices or msg_ids of the tasks to be resubmitted
1352 1352
1353 1353 block : bool
1354 1354 Whether to wait for the result to be done
1355 1355
1356 1356 Returns
1357 1357 -------
1358 1358
1359 1359 AsyncHubResult
1360 1360 A subclass of AsyncResult that retrieves results from the Hub
1361 1361
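Examples
--------
A minimal sketch, resubmitting the most recent finished task
(assumes a connected ``rc = Client()``)::

    ar = rc.resubmit()  # default: resubmit history[-1]
    ar.get()
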
1362 1362 """
1363 1363 block = self.block if block is None else block
1364 1364 if indices_or_msg_ids is None:
1365 1365 indices_or_msg_ids = -1
1366 1366
1367 1367 if not isinstance(indices_or_msg_ids, (list,tuple)):
1368 1368 indices_or_msg_ids = [indices_or_msg_ids]
1369 1369
1370 1370 theids = []
1371 1371 for id in indices_or_msg_ids:
1372 1372 if isinstance(id, int):
1373 1373 id = self.history[id]
1374 1374 if not isinstance(id, basestring):
1375 1375 raise TypeError("indices must be str or int, not %r"%id)
1376 1376 theids.append(id)
1377 1377
1378 1378 content = dict(msg_ids = theids)
1379 1379
1380 1380 self.session.send(self._query_socket, 'resubmit_request', content)
1381 1381
1382 1382 zmq.select([self._query_socket], [], [])
1383 1383 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1384 1384 if self.debug:
1385 1385 pprint(msg)
1386 1386 content = msg['content']
1387 1387 if content['status'] != 'ok':
1388 1388 raise self._unwrap_exception(content)
1389 1389 mapping = content['resubmitted']
1390 1390 new_ids = [ mapping[msg_id] for msg_id in theids ]
1391 1391
1392 1392 ar = AsyncHubResult(self, msg_ids=new_ids)
1393 1393
1394 1394 if block:
1395 1395 ar.wait()
1396 1396
1397 1397 return ar
1398 1398
1399 1399 @spin_first
1400 1400 def result_status(self, msg_ids, status_only=True):
1401 1401 """Check on the status of the result(s) of the apply request with `msg_ids`.
1402 1402
1403 1403 If status_only is False, then the actual results will be retrieved, else
1404 1404 only the status of the results will be checked.
1405 1405
1406 1406 Parameters
1407 1407 ----------
1408 1408
1409 1409 msg_ids : list of msg_ids
1410 1410 ints are passed as indices to self.history,
1411 1411 for convenience.
1412 1412 status_only : bool (default: True)
1413 1413 if False:
1414 1414 Retrieve the actual results of completed tasks.
1415 1415
1416 1416 Returns
1417 1417 -------
1418 1418
1419 1419 results : dict
1420 1420 There will always be the keys 'pending' and 'completed', which will
1421 1421 be lists of msg_ids that are incomplete or complete. If `status_only`
1422 1422 is False, then completed results will be keyed by their `msg_id`.
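
Examples
--------
A minimal sketch (``msg_id`` is an assumed msg_id string)::

    status = rc.result_status([msg_id])
    print status['pending'], status['completed']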
1423 1423 """
1424 1424 if not isinstance(msg_ids, (list,tuple)):
1425 1425 msg_ids = [msg_ids]
1426 1426
1427 1427 theids = []
1428 1428 for msg_id in msg_ids:
1429 1429 if isinstance(msg_id, int):
1430 1430 msg_id = self.history[msg_id]
1431 1431 if not isinstance(msg_id, basestring):
1432 1432 raise TypeError("msg_ids must be str, not %r"%msg_id)
1433 1433 theids.append(msg_id)
1434 1434
1435 1435 completed = []
1436 1436 local_results = {}
1437 1437
1438 1438 # comment this block out to temporarily disable local shortcut:
1439 1439 for msg_id in list(theids): # iterate over a copy, since we remove from theids
1440 1440 if msg_id in self.results:
1441 1441 completed.append(msg_id)
1442 1442 local_results[msg_id] = self.results[msg_id]
1443 1443 theids.remove(msg_id)
1444 1444
1445 1445 if theids: # some not locally cached
1446 1446 content = dict(msg_ids=theids, status_only=status_only)
1447 1447 msg = self.session.send(self._query_socket, "result_request", content=content)
1448 1448 zmq.select([self._query_socket], [], [])
1449 1449 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1450 1450 if self.debug:
1451 1451 pprint(msg)
1452 1452 content = msg['content']
1453 1453 if content['status'] != 'ok':
1454 1454 raise self._unwrap_exception(content)
1455 1455 buffers = msg['buffers']
1456 1456 else:
1457 1457 content = dict(completed=[],pending=[])
1458 1458
1459 1459 content['completed'].extend(completed)
1460 1460
1461 1461 if status_only:
1462 1462 return content
1463 1463
1464 1464 failures = []
1465 1465 # load cached results into result:
1466 1466 content.update(local_results)
1467 1467
1468 1468 # update cache with results:
1469 1469 for msg_id in sorted(theids):
1470 1470 if msg_id in content['completed']:
1471 1471 rec = content[msg_id]
1472 1472 parent = rec['header']
1473 1473 header = rec['result_header']
1474 1474 rcontent = rec['result_content']
1475 1475 iodict = rec['io']
1476 1476 if isinstance(rcontent, str):
1477 1477 rcontent = self.session.unpack(rcontent)
1478 1478
1479 1479 md = self.metadata[msg_id]
1480 1480 md.update(self._extract_metadata(header, parent, rcontent))
1481 1481 if rec.get('received'):
1482 1482 md['received'] = rec['received']
1483 1483 md.update(iodict)
1484 1484
1485 1485 if rcontent['status'] == 'ok':
1486 1486 res,buffers = util.unserialize_object(buffers)
1487 1487 else:
1488 1488 print rcontent
1489 1489 res = self._unwrap_exception(rcontent)
1490 1490 failures.append(res)
1491 1491
1492 1492 self.results[msg_id] = res
1493 1493 content[msg_id] = res
1494 1494
1495 1495 if len(theids) == 1 and failures:
1496 1496 raise failures[0]
1497 1497
1498 1498 error.collect_exceptions(failures, "result_status")
1499 1499 return content
1500 1500
1501 1501 @spin_first
1502 1502 def queue_status(self, targets='all', verbose=False):
1503 1503 """Fetch the status of engine queues.
1504 1504
1505 1505 Parameters
1506 1506 ----------
1507 1507
1508 1508 targets : int/str/list of ints/strs
1509 1509 the engines whose states are to be queried.
1510 1510 default : all
1511 1511 verbose : bool
1512 1512 Whether to return lengths only, or lists of ids for each element
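
Examples
--------
A minimal sketch (assumes a connected ``rc = Client()``)::

    rc.queue_status()           # dict keyed by engine id, plus an 'unassigned' count
    rc.queue_status(targets=0)  # status dict for engine 0 only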
1513 1513 """
1514 1514 if targets == 'all':
1515 1515 # allow 'all' to be evaluated on the engine
1516 1516 engine_ids = None
1517 1517 else:
1518 1518 engine_ids = self._build_targets(targets)[1]
1519 1519 content = dict(targets=engine_ids, verbose=verbose)
1520 1520 self.session.send(self._query_socket, "queue_request", content=content)
1521 1521 idents,msg = self.session.recv(self._query_socket, 0)
1522 1522 if self.debug:
1523 1523 pprint(msg)
1524 1524 content = msg['content']
1525 1525 status = content.pop('status')
1526 1526 if status != 'ok':
1527 1527 raise self._unwrap_exception(content)
1528 1528 content = rekey(content)
1529 1529 if isinstance(targets, int):
1530 1530 return content[targets]
1531 1531 else:
1532 1532 return content
1533 1533
1534 1534 @spin_first
1535 1535 def purge_results(self, jobs=[], targets=[]):
1536 1536 """Tell the Hub to forget results.
1537 1537
1538 1538 Individual results can be purged by msg_id, or the entire
1539 1539 history of specific targets can be purged.
1540 1540
1541 1541 Use `purge_results('all')` to scrub everything from the Hub's db.
1542 1542
1543 1543 Parameters
1544 1544 ----------
1545 1545
1546 1546 jobs : str or list of str or AsyncResult objects
1547 1547 the msg_ids whose results should be forgotten.
1548 1548 targets : int/str/list of ints/strs
1549 1549 The targets, by int_id, whose entire history is to be purged.
1550 1550
1551 1551 default : None
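
Examples
--------
A minimal sketch (``ar`` is an assumed AsyncResult from this client)::

    rc.purge_results(jobs=ar)    # forget one result
    rc.purge_results(targets=0)  # forget engine 0's entire history
    rc.purge_results('all')      # scrub everything from the Hub's db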
1552 1552 """
1553 1553 if not targets and not jobs:
1554 1554 raise ValueError("Must specify at least one of `targets` and `jobs`")
1555 1555 if targets:
1556 1556 targets = self._build_targets(targets)[1]
1557 1557
1558 1558 # construct msg_ids from jobs
1559 1559 if jobs == 'all':
1560 1560 msg_ids = jobs
1561 1561 else:
1562 1562 msg_ids = []
1563 1563 if isinstance(jobs, (basestring,AsyncResult)):
1564 1564 jobs = [jobs]
1565 1565 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1566 1566 if bad_ids:
1567 1567 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1568 1568 for j in jobs:
1569 1569 if isinstance(j, AsyncResult):
1570 1570 msg_ids.extend(j.msg_ids)
1571 1571 else:
1572 1572 msg_ids.append(j)
1573 1573
1574 1574 content = dict(engine_ids=targets, msg_ids=msg_ids)
1575 1575 self.session.send(self._query_socket, "purge_request", content=content)
1576 1576 idents, msg = self.session.recv(self._query_socket, 0)
1577 1577 if self.debug:
1578 1578 pprint(msg)
1579 1579 content = msg['content']
1580 1580 if content['status'] != 'ok':
1581 1581 raise self._unwrap_exception(content)
1582 1582
1583 1583 @spin_first
1584 1584 def hub_history(self):
1585 1585 """Get the Hub's history
1586 1586
1587 1587 Just like the Client, the Hub has a history, which is a list of msg_ids.
1588 1588 This will contain the history of all clients, and, depending on configuration,
1589 1589 may contain history across multiple cluster sessions.
1590 1590
1591 1591 Any msg_id returned here is a valid argument to `get_result`.
1592 1592
1593 1593 Returns
1594 1594 -------
1595 1595
1596 1596 msg_ids : list of strs
1597 1597 list of all msg_ids, ordered by task submission time.
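
Examples
--------
A minimal sketch, fetching the most recent task the Hub knows about
(assumes a connected ``rc = Client()``)::

    msg_ids = rc.hub_history()
    ar = rc.get_result(msg_ids[-1])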
1598 1598 """
1599 1599
1600 1600 self.session.send(self._query_socket, "history_request", content={})
1601 1601 idents, msg = self.session.recv(self._query_socket, 0)
1602 1602
1603 1603 if self.debug:
1604 1604 pprint(msg)
1605 1605 content = msg['content']
1606 1606 if content['status'] != 'ok':
1607 1607 raise self._unwrap_exception(content)
1608 1608 else:
1609 1609 return content['history']
1610 1610
1611 1611 @spin_first
1612 1612 def db_query(self, query, keys=None):
1613 1613 """Query the Hub's TaskRecord database
1614 1614
1615 1615 This will return a list of task record dicts that match `query`
1616 1616
1617 1617 Parameters
1618 1618 ----------
1619 1619
1620 1620 query : mongodb query dict
1621 1621 The search dict. See mongodb query docs for details.
1622 1622 keys : list of strs [optional]
1623 1623 The subset of keys to be returned. The default is to fetch everything but buffers.
1624 1624 'msg_id' will *always* be included.
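
Examples
--------
A minimal sketch, fetching completion times of finished tasks
(assumes a connected ``rc = Client()``)::

    records = rc.db_query({'completed' : {'$ne' : None}},
                          keys=['msg_id', 'completed'])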
1625 1625 """
1626 1626 if isinstance(keys, basestring):
1627 1627 keys = [keys]
1628 1628 content = dict(query=query, keys=keys)
1629 1629 self.session.send(self._query_socket, "db_request", content=content)
1630 1630 idents, msg = self.session.recv(self._query_socket, 0)
1631 1631 if self.debug:
1632 1632 pprint(msg)
1633 1633 content = msg['content']
1634 1634 if content['status'] != 'ok':
1635 1635 raise self._unwrap_exception(content)
1636 1636
1637 1637 records = content['records']
1638 1638
1639 1639 buffer_lens = content['buffer_lens']
1640 1640 result_buffer_lens = content['result_buffer_lens']
1641 1641 buffers = msg['buffers']
1642 1642 has_bufs = buffer_lens is not None
1643 1643 has_rbufs = result_buffer_lens is not None
1644 1644 for i,rec in enumerate(records):
1645 1645 # relink buffers
1646 1646 if has_bufs:
1647 1647 blen = buffer_lens[i]
1648 1648 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1649 1649 if has_rbufs:
1650 1650 blen = result_buffer_lens[i]
1651 1651 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1652 1652
1653 1653 return records
1654 1654
1655 1655 __all__ = [ 'Client' ]
@@ -1,942 +1,942 b''
1 1 .. _parallel_multiengine:
2 2
3 3 ==========================
4 4 IPython's Direct interface
5 5 ==========================
6 6
7 7 The direct, or multiengine, interface represents one possible way of working with a set of
8 8 IPython engines. The basic idea behind the multiengine interface is that the
9 9 capabilities of each engine are directly and explicitly exposed to the user.
10 10 Thus, in the multiengine interface, each engine is given an id that is used to
11 11 identify the engine and give it work to do. This interface is very intuitive,
12 12 is designed with interactive usage in mind, and is the best place for
13 13 new users of IPython's parallel features to begin.
14 14
15 15 Starting the IPython controller and engines
16 16 ===========================================
17 17
18 18 To follow along with this tutorial, you will need to start the IPython
19 19 controller and four IPython engines. The simplest way of doing this is to use
20 20 the :command:`ipcluster` command::
21 21
22 22 $ ipcluster start -n 4
23 23
24 24 For more detailed information about starting the controller and engines, see
25 25 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
26 26
27 27 Creating a ``DirectView`` instance
28 28 ==================================
29 29
30 30 The first step is to import the :mod:`IPython.parallel`
31 31 module and then create a :class:`.Client` instance:
32 32
33 33 .. sourcecode:: ipython
34 34
35 35 In [1]: from IPython.parallel import Client
36 36
37 37 In [2]: rc = Client()
38 38
39 39 This form assumes that the default connection information (stored in
40 40 :file:`ipcontroller-client.json` found in :file:`IPYTHONDIR/profile_default/security`) is
41 41 accurate. If the controller was started on a remote machine, you must copy that connection
42 42 file to the client machine, or enter its contents as arguments to the Client constructor:
43 43
44 44 .. sourcecode:: ipython
45 45
46 46 # If you have copied the json connector file from the controller:
47 47 In [2]: rc = Client('/path/to/ipcontroller-client.json')
48 48 # or to connect with a specific profile you have set up:
49 49 In [3]: rc = Client(profile='mpi')
50 50
51 51
52 52 To make sure there are engines connected to the controller, users can get a list
53 53 of engine ids:
54 54
55 55 .. sourcecode:: ipython
56 56
57 57 In [3]: rc.ids
58 58 Out[3]: [0, 1, 2, 3]
59 59
60 60 Here we see that there are four engines ready to do work for us.
61 61
62 62 For direct execution, we will make use of a :class:`DirectView` object, which can be
63 63 constructed via list-access to the client:
64 64
65 65 .. sourcecode:: ipython
66 66
67 67 In [4]: dview = rc[:] # use all engines
68 68
69 69 .. seealso::
70 70
71 71 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
72 72
73 73
74 74 Quick and easy parallelism
75 75 ==========================
76 76
77 77 In many cases, you simply want to apply a Python function to a sequence of
78 78 objects, but *in parallel*. The client interface provides a simple way
79 79 of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
80 80
81 81 Parallel map
82 82 ------------
83 83
84 84 Python's builtin :func:`map` function allows a function to be applied to a
85 85 sequence element-by-element. This type of code is typically trivial to
86 86 parallelize. In fact, since IPython's interface is all about functions anyway,
87 87 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
88 88 DirectView's :meth:`map` method:
89 89
90 90 .. sourcecode:: ipython
91 91
92 92 In [62]: serial_result = map(lambda x:x**10, range(32))
93 93
94 94 In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32))
95 95
96 96 In [67]: serial_result==parallel_result
97 97 Out[67]: True
98 98
99 99
100 100 .. note::
101 101
102 102 The :class:`DirectView`'s version of :meth:`map` does
103 103 not do dynamic load balancing. For a load balanced version, use a
104 104 :class:`LoadBalancedView`.
105 105
106 106 .. seealso::
107 107
108 108 :meth:`map` is implemented via :class:`ParallelFunction`.
109 109
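As a quick sketch of the load-balanced counterpart (assuming the same ``rc``
client as above; the In/Out numbers are illustrative):

.. sourcecode:: ipython

    In [64]: lview = rc.load_balanced_view()

    In [65]: balanced_result = lview.map_sync(lambda x: x**10, range(32))

    In [66]: balanced_result == serial_result
    Out[66]: True
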
110 110 Remote function decorators
111 111 --------------------------
112 112
113 113 Remote functions are just like normal functions, but when they are called,
114 114 they execute on one or more engines, rather than locally. IPython provides
115 115 two decorators:
116 116
117 117 .. sourcecode:: ipython
118 118
119 119 In [10]: @dview.remote(block=True)
120 120 ....: def getpid():
121 121 ....: import os
122 122 ....: return os.getpid()
123 123 ....:
124 124
125 125 In [11]: getpid()
126 126 Out[11]: [12345, 12346, 12347, 12348]
127 127
128 128 The ``@parallel`` decorator creates parallel functions that break up element-wise
129 129 operations and distribute them, reconstructing the result.
130 130
131 131 .. sourcecode:: ipython
132 132
133 133 In [12]: import numpy as np
134 134
135 135 In [13]: A = np.random.random((64,48))
136 136
137 137 In [14]: @dview.parallel(block=True)
138 138 ....: def pmul(A,B):
139 139 ....: return A*B
140 140
141 141 In [15]: C_local = A*A
142 142
143 143 In [16]: C_remote = pmul(A,A)
144 144
145 145 In [17]: (C_local == C_remote).all()
146 146 Out[17]: True
147 147
148 148 Calling a ``@parallel`` function *does not* correspond to map. It is used for splitting
149 149 element-wise operations that operate on a sequence or array. For ``map`` behavior,
150 150 parallel functions do have a map method.
151 151
152 152 ==================== ============================ =============================
153 153 call pfunc(seq) pfunc.map(seq)
154 154 ==================== ============================ =============================
155 155 # of tasks # of engines (1 per engine) # of engines (1 per engine)
156 156 # of remote calls # of engines (1 per engine) ``len(seq)``
157 157 argument to remote ``seq[i:j]`` (sub-sequence) ``seq[i]`` (single element)
158 158 ==================== ============================ =============================
159 159
160 160 A quick example to illustrate the difference in arguments for the two modes:
161 161
162 162 .. sourcecode:: ipython
163 163
164 164 In [16]: @dview.parallel(block=True)
165 165 ....: def echo(x):
166 166 ....: return str(x)
167 167 ....:
168 168
169 169 In [17]: echo(range(5))
170 170 Out[17]: ['[0, 1]', '[2]', '[3]', '[4]']
171 171
172 172 In [18]: echo.map(range(5))
173 173 Out[18]: ['0', '1', '2', '3', '4']
174 174
175 175
176 176 .. seealso::
177 177
178 178 See the :func:`~.remotefunction.parallel` and :func:`~.remotefunction.remote`
179 179 decorators for options.
180 180
181 181 Calling Python functions
182 182 ========================
183 183
184 184 The most basic type of operation that can be performed on the engines is to
185 185 execute Python code or call Python functions. Executing Python code can be
186 186 done in blocking or non-blocking mode (non-blocking is default) using the
187 187 :meth:`.View.execute` method, and calling functions can be done via the
188 188 :meth:`.View.apply` method.
189 189
190 190 apply
191 191 -----
192 192
193 193 The main method for doing remote execution (in fact, all methods that
194 194 communicate with the engines are built on top of it) is :meth:`View.apply`.
195 195
196 196 We strive to provide the cleanest interface we can, so `apply` has the following
197 197 signature:
198 198
199 199 .. sourcecode:: python
200 200
201 201 view.apply(f, *args, **kwargs)
202 202
203 203 There are various ways to call functions with IPython, and these flags are set as
204 204 attributes of the View. The ``DirectView`` has just two of these flags:
205 205
206 206 dv.block : bool
207 207 whether to wait for the result, or return an :class:`AsyncResult` object
208 208 immediately
209 209 dv.track : bool
210 210 whether to instruct pyzmq to track when zeromq is done sending the message.
211 211 This is primarily useful for non-copying sends of numpy arrays that you plan to
212 212 edit in-place. You need to know when it becomes safe to edit the buffer
213 213 without corrupting the message; see the sketch below.
214 214 dv.targets : int, list of ints
215 215 which targets this view is associated with.
216 216
217 217
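As a hedged sketch of the ``track`` flag (the array and the In/Out numbers
here are illustrative; assumes numpy and a connected client):

.. sourcecode:: ipython

    In [5]: import numpy

    In [6]: A = numpy.zeros(1000000)

    In [7]: dv = rc[:]

    In [8]: dv.track = True

    In [9]: ar = dv.apply_async(numpy.linalg.norm, A)

    In [10]: ar.sent             # has zeromq finished sending the buffers?
    Out[10]: True

    In [11]: ar.wait_for_send()  # block until it is safe to edit A in-place
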
218 218 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
219 219
220 220 .. sourcecode:: ipython
221 221
222 222 In [4]: view = rc[1:3]
223 223 Out[4]: <DirectView [1, 2]>
224 224
225 225 In [5]: view.apply<tab>
226 226 view.apply view.apply_async view.apply_sync
227 227
228 228 For convenience, you can set block temporarily for a single call with the extra sync/async methods.
229 229
230 230 Blocking execution
231 231 ------------------
232 232
233 233 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
234 234 these examples) submits the command to the controller, which places the
235 235 command in the engines' queues for execution. The :meth:`apply` call then
236 236 blocks until the engines are done executing the command:
237 237
238 238 .. sourcecode:: ipython
239 239
240 240 In [2]: dview = rc[:] # A DirectView of all engines
241 241 In [3]: dview.block=True
242 242 In [4]: dview['a'] = 5
243 243
244 244 In [5]: dview['b'] = 10
245 245
246 246 In [6]: dview.apply(lambda x: a+b+x, 27)
247 247 Out[6]: [42, 42, 42, 42]
248 248
249 249 You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync`
250 250 method:
251 251
.. sourcecode:: ipython

252 252 In [7]: dview.block=False
253 253
254 254 In [8]: dview.apply_sync(lambda x: a+b+x, 27)
255 255 Out[8]: [42, 42, 42, 42]
256 256
257 257 Python commands can be executed as strings on specific engines by using a View's ``execute``
258 258 method:
259 259
260 260 .. sourcecode:: ipython
261 261
262 262 In [6]: rc[::2].execute('c=a+b')
263 263
264 264 In [7]: rc[1::2].execute('c=a-b')
265 265
266 266 In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
267 267 Out[8]: [15, -5, 15, -5]
268 268
269 269
270 270 Non-blocking execution
271 271 ----------------------
272 272
273 273 In non-blocking mode, :meth:`apply` submits the command to be executed and
274 274 then returns an :class:`AsyncResult` object immediately. The
275 275 :class:`AsyncResult` object gives you a way of getting a result at a later
276 276 time through its :meth:`get` method.
277 277
278 278 .. seealso::
279 279
280 280 Docs on the :ref:`AsyncResult <parallel_asyncresult>` object.
281 281
282 282 This allows you to quickly submit long running commands without blocking your
283 283 local Python/IPython session:
284 284
285 285 .. sourcecode:: ipython
286 286
287 287 # define our function
288 288 In [6]: def wait(t):
289 289 ....: import time
290 290 ....: tic = time.time()
291 291 ....: time.sleep(t)
292 292 ....: return time.time()-tic
293 293
294 294 # In non-blocking mode
295 295 In [7]: ar = dview.apply_async(wait, 2)
296 296
297 297 # Now block for the result
298 298 In [8]: ar.get()
299 299 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
300 300
301 301 # Again in non-blocking mode
302 302 In [9]: ar = dview.apply_async(wait, 10)
303 303
304 304 # Poll to see if the result is ready
305 305 In [10]: ar.ready()
306 306 Out[10]: False
307 307
308 308 # ask for the result, but wait a maximum of 1 second:
309 309 In [45]: ar.get(1)
310 310 ---------------------------------------------------------------------------
311 311 TimeoutError Traceback (most recent call last)
312 312 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
313 313 ----> 1 ar.get(1)
314 314
315 315 /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
316 316 62 raise self._exception
317 317 63 else:
318 318 ---> 64 raise error.TimeoutError("Result not ready.")
319 319 65
320 320 66 def ready(self):
321 321
322 322 TimeoutError: Result not ready.
323 323
324 324 .. Note::
325 325
326 326 Note the import inside the function. This is a common model, to ensure
327 327 that the appropriate modules are imported where the task is run. You can
328 328 also manually import modules into the engine(s) namespace(s) via
329 329 :meth:`view.execute('import numpy')`.
330 330
331 331 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
332 332 are done. For this, there is the :meth:`wait` method. This method takes a
333 333 collection of :class:`AsyncResult` objects (or `msg_ids`, or indices into the client's history),
334 334 and blocks until all of the associated results are ready:
335 335
336 336 .. sourcecode:: ipython
337 337
338 338 In [72]: dview.block=False
339 339
340 340 # A trivial list of AsyncResults objects
341 341 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
342 342
343 343 # Wait until all of them are done
344 344 In [74]: dview.wait(pr_list)
345 345
346 346 # Then, their results are ready using get() or the `.r` attribute
347 347 In [75]: pr_list[0].get()
348 348 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
349 349
350 350
351 351
352 352 The ``block`` and ``targets`` keyword arguments and attributes
353 353 --------------------------------------------------------------
354 354
355 355 Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
356 356 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
357 357 blocking mode and which engines the command is applied to. The :class:`View` class also has
358 358 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
359 359 arguments are not provided. Thus the following logic is used for :attr:`block` and :attr:`targets`:
360 360
361 361 * If no keyword argument is provided, the instance attributes are used.
362 362 * Keyword arguments, if provided, override the instance attributes for
363 363 the duration of a single call.
364 364
365 365 The following examples demonstrate how to use the instance attributes:
366 366
367 367 .. sourcecode:: ipython
368 368
369 369 In [16]: dview.targets = [0,2]
370 370
371 371 In [17]: dview.block = False
372 372
373 373 In [18]: ar = dview.apply(lambda : 10)
374 374
375 375 In [19]: ar.get()
376 376 Out[19]: [10, 10]
377 377
378 378 In [16]: dview.targets = dview.client.ids # all engines (4)
379 379
380 380 In [21]: dview.block = True
381 381
382 382 In [22]: dview.apply(lambda : 42)
383 383 Out[22]: [42, 42, 42, 42]
384 384
385 385 The :attr:`block` and :attr:`targets` instance attributes of the
386 386 :class:`.DirectView` also determine the behavior of the parallel magic commands.
387 387
388 388 Parallel magic commands
389 389 -----------------------
390 390
391 391 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
392 392 that make it a bit more pleasant to execute Python commands on the engines interactively.
393 393 These are simply shortcuts to the :meth:`.DirectView.execute`
394 394 and :meth:`.AsyncResult.display_outputs` methods, respectively.
395 395 The ``%px`` magic executes a single Python command on the engines
396 396 specified by the :attr:`targets` attribute of the :class:`DirectView` instance:
397 397
398 398 .. sourcecode:: ipython
399 399
400 400 # Create a DirectView for all targets
401 401 In [22]: dv = rc[:]
402 402
403 403 # Make this DirectView active for parallel magic commands
404 404 In [23]: dv.activate()
405 405
406 406 In [24]: dv.block=True
407 407
408 408 # import numpy here and everywhere
409 409 In [25]: with dv.sync_imports():
410 410 ....: import numpy
411 411 importing numpy on engine(s)
412 412
413 413 In [27]: %px a = numpy.random.rand(2,2)
414 414 Parallel execution on engines: [0, 1, 2, 3]
415 415
416 416 In [28]: %px numpy.linalg.eigvals(a)
417 417 Parallel execution on engines: [0, 1, 2, 3]
418 [ 0] Out[68]: array([ 0.77120707, -0.19448286])
419 [ 1] Out[68]: array([ 1.10815921, 0.05110369])
420 [ 2] Out[68]: array([ 0.74625527, -0.37475081])
421 [ 3] Out[68]: array([ 0.72931905, 0.07159743])
418 [0] Out[68]: array([ 0.77120707, -0.19448286])
419 [1] Out[68]: array([ 1.10815921, 0.05110369])
420 [2] Out[68]: array([ 0.74625527, -0.37475081])
421 [3] Out[68]: array([ 0.72931905, 0.07159743])
422 422
423 423 In [29]: %px print 'hi'
424 424 Parallel execution on engine(s): [0, 1, 2, 3]
425 [stdout: 0] hi
426 [stdout: 1] hi
427 [stdout: 2] hi
428 [stdout: 3] hi
425 [stdout:0] hi
426 [stdout:1] hi
427 [stdout:2] hi
428 [stdout:3] hi
429 429
430 430
431 431 Since engines are IPython as well, you can even run magics remotely:
432 432
433 433 .. sourcecode:: ipython
434 434
435 435 In [28]: %px %pylab inline
436 436 Parallel execution on engine(s): [0, 1, 2, 3]
437 [stdout: 0]
437 [stdout:0]
438 438 Welcome to pylab, a matplotlib-based Python environment...
439 439 For more information, type 'help(pylab)'.
440 [stdout: 1]
440 [stdout:1]
441 441 Welcome to pylab, a matplotlib-based Python environment...
442 442 For more information, type 'help(pylab)'.
443 [stdout: 2]
443 [stdout:2]
444 444 Welcome to pylab, a matplotlib-based Python environment...
445 445 For more information, type 'help(pylab)'.
446 [stdout: 3]
446 [stdout:3]
447 447 Welcome to pylab, a matplotlib-based Python environment...
448 448 For more information, type 'help(pylab)'.
449 449
450 450 And once in pylab mode with the inline backend,
451 451 you can make plots and they will be displayed in your frontend
452 452 if it supports inline figures (e.g. notebook or qtconsole):
453 453
454 454 .. sourcecode:: ipython
455 455
456 456 In [40]: %px plot(rand(100))
457 457 Parallel execution on engine(s): [0, 1, 2, 3]
458 458 <plot0>
459 459 <plot1>
460 460 <plot2>
461 461 <plot3>
462 [ 0] Out[79]: [<matplotlib.lines.Line2D at 0x10a6286d0>]
463 [ 1] Out[79]: [<matplotlib.lines.Line2D at 0x10b9476d0>]
464 [ 2] Out[79]: [<matplotlib.lines.Line2D at 0x110652750>]
465 [ 3] Out[79]: [<matplotlib.lines.Line2D at 0x10c6566d0>]
462 [0] Out[79]: [<matplotlib.lines.Line2D at 0x10a6286d0>]
463 [1] Out[79]: [<matplotlib.lines.Line2D at 0x10b9476d0>]
464 [2] Out[79]: [<matplotlib.lines.Line2D at 0x110652750>]
465 [3] Out[79]: [<matplotlib.lines.Line2D at 0x10c6566d0>]
466 466
467 467
468 468 ``%%px`` Cell Magic
469 469 *******************
470 470
471 471 ``%%px`` can also be used as a Cell Magic, which accepts ``--[no]block`` flags,
472 472 and a ``--group-outputs`` argument, which adjusts how the outputs of multiple
473 473 engines are presented.
474 474
475 475 .. seealso::
476 476
477 477 :meth:`.AsyncResult.display_outputs` for the grouping options.
478 478
479 479 .. sourcecode:: ipython
480 480
481 481 In [50]: %%px --block --group-outputs=engine
482 482 ....: import numpy as np
483 483 ....: A = np.random.random((2,2))
484 484 ....: ev = numpy.linalg.eigvals(A)
485 485 ....: print ev
486 486 ....: ev.max()
487 487 ....:
488 488 Parallel execution on engine(s): [0, 1, 2, 3]
489 [stdout: 0] [ 0.60640442 0.95919621]
490 [ 0] Out[73]: 0.9591962130899806
491 [stdout: 1] [ 0.38501813 1.29430871]
492 [ 1] Out[73]: 1.2943087091452372
493 [stdout: 2] [-0.85925141 0.9387692 ]
494 [ 2] Out[73]: 0.93876920456230284
495 [stdout: 3] [ 0.37998269 1.24218246]
496 [ 3] Out[73]: 1.2421824618493817
489 [stdout:0] [ 0.60640442 0.95919621]
490 [0] Out[73]: 0.9591962130899806
491 [stdout:1] [ 0.38501813 1.29430871]
492 [1] Out[73]: 1.2943087091452372
493 [stdout:2] [-0.85925141 0.9387692 ]
494 [2] Out[73]: 0.93876920456230284
495 [stdout:3] [ 0.37998269 1.24218246]
496 [3] Out[73]: 1.2421824618493817
497 497
498 498 ``%result`` Magic
499 499 *****************
500 500
501 501 If you are using ``%px`` in non-blocking mode, you won't get output.
502 502 You can use ``%result`` to display the outputs of the latest command,
503 503 just as is done when ``%px`` is blocking:
504 504
505 505 .. sourcecode:: ipython
506 506
507 507 In [39]: dv.block = False
508 508
509 509 In [40]: %px print 'hi'
510 510 Async parallel execution on engine(s): [0, 1, 2, 3]
511 511
512 512 In [41]: %result
513 [stdout: 0] hi
514 [stdout: 1] hi
515 [stdout: 2] hi
516 [stdout: 3] hi
513 [stdout:0] hi
514 [stdout:1] hi
515 [stdout:2] hi
516 [stdout:3] hi
517 517
518 518 ``%result`` simply calls :meth:`.AsyncResult.display_outputs` on the most recent request.
519 519 You can pass integers as indices if you want a result other than the latest,
520 520 e.g. ``%result -2``, or ``%result 0`` for the first.
521 521
522 522
523 523 ``%autopx``
524 524 ***********
525 525
526 526 The ``%autopx`` magic switches to a mode where everything you type is executed
527 527 on the engines until you do ``%autopx`` again.
528 528
529 529 .. sourcecode:: ipython
530 530
531 531 In [30]: dv.block=True
532 532
533 533 In [31]: %autopx
534 534 %autopx enabled
535 535
536 536 In [32]: max_evals = []
537 537
538 538 In [33]: for i in range(100):
539 539 ....: a = numpy.random.rand(10,10)
540 540 ....: a = a+a.transpose()
541 541 ....: evals = numpy.linalg.eigvals(a)
542 542 ....: max_evals.append(evals[0].real)
543 543 ....:
544 544
545 545 In [34]: print "Average max eigenvalue is: %f" % (sum(max_evals)/len(max_evals))
546 [stdout: 0] Average max eigenvalue is: 10.193101
547 [stdout: 1] Average max eigenvalue is: 10.064508
548 [stdout: 2] Average max eigenvalue is: 10.055724
549 [stdout: 3] Average max eigenvalue is: 10.086876
546 [stdout:0] Average max eigenvalue is: 10.193101
547 [stdout:1] Average max eigenvalue is: 10.064508
548 [stdout:2] Average max eigenvalue is: 10.055724
549 [stdout:3] Average max eigenvalue is: 10.086876
550 550
551 551 In [35]: %autopx
552 552 Auto Parallel Disabled
553 553
554 554
555 555 Moving Python objects around
556 556 ============================
557 557
558 558 In addition to calling functions and executing code on engines, you can
559 559 transfer Python objects to and from your IPython session and the engines. In
560 560 IPython, these operations are called :meth:`push` (sending an object to the
561 561 engines) and :meth:`pull` (getting an object from the engines).
562 562
563 563 Basic push and pull
564 564 -------------------
565 565
566 566 Here are some examples of how you use :meth:`push` and :meth:`pull`:
567 567
568 568 .. sourcecode:: ipython
569 569
570 570 In [38]: dview.push(dict(a=1.03234,b=3453))
571 571 Out[38]: [None,None,None,None]
572 572
573 573 In [39]: dview.pull('a')
574 574 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
575 575
576 576 In [40]: dview.pull('b', targets=0)
577 577 Out[40]: 3453
578 578
579 579 In [41]: dview.pull(('a','b'))
580 580 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
581 581
582 582 In [43]: dview.push(dict(c='speed'))
583 583 Out[43]: [None,None,None,None]
584 584
585 585 In non-blocking mode :meth:`push` and :meth:`pull` also return
586 586 :class:`AsyncResult` objects:
587 587
588 588 .. sourcecode:: ipython
589 589
590 590 In [48]: ar = dview.pull('a', block=False)
591 591
592 592 In [49]: ar.get()
593 593 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
594 594
595 595
596 596 Dictionary interface
597 597 --------------------
598 598
599 599 Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
600 600 dictionary-style access by key and methods such as :meth:`get` and
601 601 :meth:`update` for convenience. This makes the remote namespaces of the engines
602 602 appear as a local dictionary. Underneath, these methods call :meth:`apply`:
603 603
604 604 .. sourcecode:: ipython
605 605
606 606 In [51]: dview['a']=['foo','bar']
607 607
608 608 In [52]: dview['a']
609 609 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
610 610
611 611 Scatter and gather
612 612 ------------------
613 613
614 614 Sometimes it is useful to partition a sequence and push the partitions to
615 615 different engines. In MPI language, this is known as scatter/gather, and we
616 616 follow that terminology. However, it is important to remember that in
617 617 IPython's :class:`Client` class, :meth:`scatter` is from the
618 618 interactive IPython session to the engines and :meth:`gather` is from the
619 619 engines back to the interactive IPython session. For scatter/gather operations
620 620 between engines, MPI, pyzmq, or some other direct interconnect should be used.
621 621
622 622 .. sourcecode:: ipython
623 623
624 624 In [58]: dview.scatter('a',range(16))
625 625 Out[58]: [None,None,None,None]
626 626
627 627 In [59]: dview['a']
628 628 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
629 629
630 630 In [60]: dview.gather('a')
631 631 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
632 632
633 633 Other things to look at
634 634 =======================
635 635
636 636 How to do parallel list comprehensions
637 637 --------------------------------------
638 638
639 639 In many cases list comprehensions are nicer than using the map function. While
640 640 we don't have fully parallel list comprehensions, it is simple to get the
641 641 basic effect using :meth:`scatter` and :meth:`gather`:
642 642
643 643 .. sourcecode:: ipython
644 644
645 645 In [66]: dview.scatter('x',range(64))
646 646
647 647 In [67]: %px y = [i**10 for i in x]
648 648 Parallel execution on engines: [0, 1, 2, 3]
649 649 Out[67]:
650 650
651 651 In [68]: y = dview.gather('y')
652 652
653 653 In [69]: print y
654 654 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
655 655
656 656 Remote imports
657 657 --------------
658 658
659 659 Sometimes you will want to import packages both in your interactive session
660 660 and on your remote engines. This can be done with the context manager
661 661 created by a DirectView's :meth:`sync_imports` method:
662 662
663 663 .. sourcecode:: ipython
664 664
665 665 In [69]: with dview.sync_imports():
666 666 ....: import numpy
667 667 importing numpy on engine(s)
668 668
669 669 Any imports made inside the block will also be performed on the view's engines.
670 670 sync_imports also takes a `local` boolean flag that defaults to True, which specifies
671 671 whether the local imports should also be performed. However, support for `local=False`
672 672 has not been implemented, so only packages that can be imported locally will work
673 673 this way.
674 674
675 675 You can also specify imports via the ``@require`` decorator. This is a decorator
676 676 designed for use in Dependencies, but can be used to handle remote imports as well.
677 677 Modules or module names passed to ``@require`` will be imported before the decorated
678 678 function is called. If they cannot be imported, the decorated function will never
679 679 execute, and will fail with an UnmetDependencyError.
680 680
681 681 .. sourcecode:: ipython
682 682
683 683 In [69]: from IPython.parallel import require
684 684
685 685 In [70]: @require('re')
686 686 ....: def findall(pat, x):
687 687 ....: # re is guaranteed to be available
688 688 ....: return re.findall(pat, x)
689 689
690 690 # you can also pass modules themselves that you already have locally:
691 691 In [71]: @require(time)
692 692 ....: def wait(t):
693 693 ....: time.sleep(t)
694 694 ....: return t
695 695
696 696 .. _parallel_exceptions:
697 697
698 698 Parallel exceptions
699 699 -------------------
700 700
701 701 In the multiengine interface, parallel commands can raise Python exceptions,
702 702 just like serial commands. But, it is a little subtle, because a single
703 703 parallel command can actually raise multiple exceptions (one for each engine
704 704 the command was run on). To express this idea, we have a
705 705 :exc:`CompositeError` exception class that will be raised in most cases. The
706 706 :exc:`CompositeError` class is a special type of exception that wraps one or
707 707 more other types of exceptions. Here is how it works:
708 708
709 709 .. sourcecode:: ipython
710 710
711 711 In [76]: dview.block=True
712 712
713 713 In [77]: dview.execute('1/0')
714 714 ---------------------------------------------------------------------------
715 715 CompositeError Traceback (most recent call last)
716 716 /home/user/<ipython-input-10-5d56b303a66c> in <module>()
717 717 ----> 1 dview.execute('1/0')
718 718
719 719 /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
720 720 591 default: self.block
721 721 592 """
722 722 --> 593 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
723 723 594
724 724 595 def run(self, filename, targets=None, block=None):
725 725
726 726 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
727 727
728 728 /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
729 729 55 def sync_results(f, self, *args, **kwargs):
730 730 56 """sync relevant results from self.client to our results attribute."""
731 731 ---> 57 ret = f(self, *args, **kwargs)
732 732 58 delta = self.outstanding.difference(self.client.outstanding)
733 733 59 completed = self.outstanding.intersection(delta)
734 734
735 735 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
736 736
737 737 /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
738 738 44 n_previous = len(self.client.history)
739 739 45 try:
740 740 ---> 46 ret = f(self, *args, **kwargs)
741 741 47 finally:
742 742 48 nmsgs = len(self.client.history) - n_previous
743 743
744 744 /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
745 745 529 if block:
746 746 530 try:
747 747 --> 531 return ar.get()
748 748 532 except KeyboardInterrupt:
749 749 533 pass
750 750
751 751 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
752 752 101 return self._result
753 753 102 else:
754 754 --> 103 raise self._exception
755 755 104 else:
756 756 105 raise error.TimeoutError("Result not ready.")
757 757
758 758 CompositeError: one or more exceptions from call to method: _execute
759 759 [0:apply]: ZeroDivisionError: integer division or modulo by zero
760 760 [1:apply]: ZeroDivisionError: integer division or modulo by zero
761 761 [2:apply]: ZeroDivisionError: integer division or modulo by zero
762 762 [3:apply]: ZeroDivisionError: integer division or modulo by zero
763 763
764 764 Notice how the error message printed when :exc:`CompositeError` is raised has
765 765 information about the individual exceptions that were raised on each engine.
766 766 If you want, you can even raise one of these original exceptions:
767 767
768 768 .. sourcecode:: ipython
769 769
770 770 In [80]: try:
771 771    ....:     dview.execute('1/0')
772 772    ....: except parallel.error.CompositeError as e:
773 773    ....:     e.raise_exception()
774 774    ....:
775 775    ....:
776 776 ---------------------------------------------------------------------------
777 777 RemoteError Traceback (most recent call last)
778 778 /home/user/<ipython-input-17-8597e7e39858> in <module>()
779 779 2 dview.execute('1/0')
780 780 3 except parallel.error.CompositeError as e:
781 781 ----> 4 e.raise_exception()
782 782
783 783 /path/to/site-packages/IPython/parallel/error.pyc in raise_exception(self, excid)
784 784 266 raise IndexError("an exception with index %i does not exist"%excid)
785 785 267 else:
786 786 --> 268 raise RemoteError(en, ev, etb, ei)
787 787 269
788 788 270
789 789
790 790 RemoteError: ZeroDivisionError(integer division or modulo by zero)
791 791 Traceback (most recent call last):
792 792 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
793 793 exec code in working,working
794 794 File "<string>", line 1, in <module>
795 795 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
796 796 exec code in globals()
797 797 File "<string>", line 1, in <module>
798 798 ZeroDivisionError: integer division or modulo by zero
799 799
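You do not have to re-raise to get at the per-engine details: the
:exc:`CompositeError` instance itself carries them. A minimal sketch, assuming
the ``elist`` tuple layout matches the ``raise_exception`` call shown in the
traceback above:

.. sourcecode:: python

    from IPython.parallel import error

    try:
        dview.execute('1/0')
    except error.CompositeError as e:
        # one (name, value, traceback, engine_info) tuple per failed engine
        for en, ev, etb, ei in e.elist:
            print en, ev, ei
        e.print_tracebacks()    # full remote tracebacks, as in %debug below
        # or re-raise a single engine's exception by index:
        # e.raise_exception(0)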
800 800 If you are working in IPython, you can simply type ``%debug`` after one of
801 801 these :exc:`CompositeError` exceptions is raised, and inspect the exception
802 802 instance:
803 803
804 804 .. sourcecode:: ipython
805 805
806 806 In [81]: dview.execute('1/0')
807 807 ---------------------------------------------------------------------------
808 808 CompositeError Traceback (most recent call last)
809 809 /home/user/<ipython-input-10-5d56b303a66c> in <module>()
810 810 ----> 1 dview.execute('1/0')
811 811
812 812 /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
813 813 591 default: self.block
814 814 592 """
815 815 --> 593 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
816 816 594
817 817 595 def run(self, filename, targets=None, block=None):
818 818
819 819 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
820 820
821 821 /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
822 822 55 def sync_results(f, self, *args, **kwargs):
823 823 56 """sync relevant results from self.client to our results attribute."""
824 824 ---> 57 ret = f(self, *args, **kwargs)
825 825 58 delta = self.outstanding.difference(self.client.outstanding)
826 826 59 completed = self.outstanding.intersection(delta)
827 827
828 828 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
829 829
830 830 /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
831 831 44 n_previous = len(self.client.history)
832 832 45 try:
833 833 ---> 46 ret = f(self, *args, **kwargs)
834 834 47 finally:
835 835 48 nmsgs = len(self.client.history) - n_previous
836 836
837 837 /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
838 838 529 if block:
839 839 530 try:
840 840 --> 531 return ar.get()
841 841 532 except KeyboardInterrupt:
842 842 533 pass
843 843
844 844 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
845 845 101 return self._result
846 846 102 else:
847 847 --> 103 raise self._exception
848 848 104 else:
849 849 105 raise error.TimeoutError("Result not ready.")
850 850
851 851 CompositeError: one or more exceptions from call to method: _execute
852 852 [0:apply]: ZeroDivisionError: integer division or modulo by zero
853 853 [1:apply]: ZeroDivisionError: integer division or modulo by zero
854 854 [2:apply]: ZeroDivisionError: integer division or modulo by zero
855 855 [3:apply]: ZeroDivisionError: integer division or modulo by zero
856 856
857 857 In [82]: %debug
858 858 > /path/to/site-packages/IPython/parallel/client/asyncresult.py(103)get()
859 859 102 else:
860 860 --> 103 raise self._exception
861 861 104 else:
862 862
863 863 # With the debugger running, self._exception is the exception instance; after
864 864 # binding it to e (ipdb> e = self._exception), tab completion shows its methods.
865 865 ipdb> e.<tab>
866 866 e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
867 867 e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
868 868 e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
869 869 e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
870 870 e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
871 871 ipdb> e.print_tracebacks()
872 872 [0:apply]:
873 873 Traceback (most recent call last):
874 874 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
875 875 exec code in working,working
876 876 File "<string>", line 1, in <module>
877 877 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
878 878 exec code in globals()
879 879 File "<string>", line 1, in <module>
880 880 ZeroDivisionError: integer division or modulo by zero
881 881
882 882
883 883 [1:apply]:
884 884 Traceback (most recent call last):
885 885 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
886 886 exec code in working,working
887 887 File "<string>", line 1, in <module>
888 888 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
889 889 exec code in globals()
890 890 File "<string>", line 1, in <module>
891 891 ZeroDivisionError: integer division or modulo by zero
892 892
893 893
894 894 [2:apply]:
895 895 Traceback (most recent call last):
896 896 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
897 897 exec code in working,working
898 898 File "<string>", line 1, in <module>
899 899 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
900 900 exec code in globals()
901 901 File "<string>", line 1, in <module>
902 902 ZeroDivisionError: integer division or modulo by zero
903 903
904 904
905 905 [3:apply]:
906 906 Traceback (most recent call last):
907 907 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
908 908 exec code in working,working
909 909 File "<string>", line 1, in <module>
910 910 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
911 911 exec code in globals()
912 912 File "<string>", line 1, in <module>
913 913 ZeroDivisionError: integer division or modulo by zero
914 914
915 915
916 916 All of the same error handling also works in non-blocking mode:
917 917
918 918 .. sourcecode:: ipython
919 919
920 920 In [83]: dview.block=False
921 921
922 922 In [84]: ar = dview.execute('1/0')
923 923
924 924 In [85]: ar.get()
925 925 ---------------------------------------------------------------------------
926 926 CompositeError Traceback (most recent call last)
927 927 /home/user/<ipython-input-21-8531eb3d26fb> in <module>()
928 928 ----> 1 ar.get()
929 929
930 930 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
931 931 101 return self._result
932 932 102 else:
933 933 --> 103 raise self._exception
934 934 104 else:
935 935 105 raise error.TimeoutError("Result not ready.")
936 936
937 937 CompositeError: one or more exceptions from call to method: _execute
938 938 [0:apply]: ZeroDivisionError: integer division or modulo by zero
939 939 [1:apply]: ZeroDivisionError: integer division or modulo by zero
940 940 [2:apply]: ZeroDivisionError: integer division or modulo by zero
941 941 [3:apply]: ZeroDivisionError: integer division or modulo by zero
942 942
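Because the remote exception is only delivered when you collect the result,
you can poll the :class:`AsyncResult` and handle failures whenever it suits
you. A minimal sketch, continuing the non-blocking session above:

.. sourcecode:: python

    import time

    from IPython.parallel import error

    ar = dview.execute('1/0')
    # ... do other work while the engines run ...
    while not ar.ready():
        time.sleep(0.1)
    try:
        ar.get()
    except error.CompositeError as e:
        e.print_tracebacks()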