##// END OF EJS Templates
skip ugly %2i formatting...
MinRK -
Show More
@@ -1,651 +1,651 b''
1 """AsyncResult objects for the client
1 """AsyncResult objects for the client
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from datetime import datetime
20 from datetime import datetime
21
21
22 from zmq import MessageTracker
22 from zmq import MessageTracker
23
23
24 from IPython.core.display import clear_output, display
24 from IPython.core.display import clear_output, display
25 from IPython.external.decorator import decorator
25 from IPython.external.decorator import decorator
26 from IPython.parallel import error
26 from IPython.parallel import error
27
27
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29 # Functions
29 # Functions
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31
31
32 def _total_seconds(td):
32 def _total_seconds(td):
33 """timedelta.total_seconds was added in 2.7"""
33 """timedelta.total_seconds was added in 2.7"""
34 try:
34 try:
35 # Python >= 2.7
35 # Python >= 2.7
36 return td.total_seconds()
36 return td.total_seconds()
37 except AttributeError:
37 except AttributeError:
38 # Python 2.6
38 # Python 2.6
39 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
39 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Classes
42 # Classes
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
# Shared empty MessageTracker: its `done` is always True, so it serves as the
# default tracker for results whose sends completed immediately.
finished_tracker = MessageTracker()
47
47
@decorator
def check_ready(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method."""
    # sync local state with the Hub before testing readiness
    self.wait(0)
    if self._ready:
        return f(self, *args, **kwargs)
    raise error.TimeoutError("result not ready")
55
55
56 class AsyncResult(object):
56 class AsyncResult(object):
57 """Class for representing results of non-blocking calls.
57 """Class for representing results of non-blocking calls.
58
58
59 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
59 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
60 """
60 """
61
61
62 msg_ids = None
62 msg_ids = None
63 _targets = None
63 _targets = None
64 _tracker = None
64 _tracker = None
65 _single_result = False
65 _single_result = False
66
66
67 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
67 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
68 if isinstance(msg_ids, basestring):
68 if isinstance(msg_ids, basestring):
69 # always a list
69 # always a list
70 msg_ids = [msg_ids]
70 msg_ids = [msg_ids]
71 if tracker is None:
71 if tracker is None:
72 # default to always done
72 # default to always done
73 tracker = finished_tracker
73 tracker = finished_tracker
74 self._client = client
74 self._client = client
75 self.msg_ids = msg_ids
75 self.msg_ids = msg_ids
76 self._fname=fname
76 self._fname=fname
77 self._targets = targets
77 self._targets = targets
78 self._tracker = tracker
78 self._tracker = tracker
79 self._ready = False
79 self._ready = False
80 self._success = None
80 self._success = None
81 self._metadata = None
81 self._metadata = None
82 if len(msg_ids) == 1:
82 if len(msg_ids) == 1:
83 self._single_result = not isinstance(targets, (list, tuple))
83 self._single_result = not isinstance(targets, (list, tuple))
84 else:
84 else:
85 self._single_result = False
85 self._single_result = False
86
86
87 def __repr__(self):
87 def __repr__(self):
88 if self._ready:
88 if self._ready:
89 return "<%s: finished>"%(self.__class__.__name__)
89 return "<%s: finished>"%(self.__class__.__name__)
90 else:
90 else:
91 return "<%s: %s>"%(self.__class__.__name__,self._fname)
91 return "<%s: %s>"%(self.__class__.__name__,self._fname)
92
92
93
93
94 def _reconstruct_result(self, res):
94 def _reconstruct_result(self, res):
95 """Reconstruct our result from actual result list (always a list)
95 """Reconstruct our result from actual result list (always a list)
96
96
97 Override me in subclasses for turning a list of results
97 Override me in subclasses for turning a list of results
98 into the expected form.
98 into the expected form.
99 """
99 """
100 if self._single_result:
100 if self._single_result:
101 return res[0]
101 return res[0]
102 else:
102 else:
103 return res
103 return res
104
104
105 def get(self, timeout=-1):
105 def get(self, timeout=-1):
106 """Return the result when it arrives.
106 """Return the result when it arrives.
107
107
108 If `timeout` is not ``None`` and the result does not arrive within
108 If `timeout` is not ``None`` and the result does not arrive within
109 `timeout` seconds then ``TimeoutError`` is raised. If the
109 `timeout` seconds then ``TimeoutError`` is raised. If the
110 remote call raised an exception then that exception will be reraised
110 remote call raised an exception then that exception will be reraised
111 by get() inside a `RemoteError`.
111 by get() inside a `RemoteError`.
112 """
112 """
113 if not self.ready():
113 if not self.ready():
114 self.wait(timeout)
114 self.wait(timeout)
115
115
116 if self._ready:
116 if self._ready:
117 if self._success:
117 if self._success:
118 return self._result
118 return self._result
119 else:
119 else:
120 raise self._exception
120 raise self._exception
121 else:
121 else:
122 raise error.TimeoutError("Result not ready.")
122 raise error.TimeoutError("Result not ready.")
123
123
124 def ready(self):
124 def ready(self):
125 """Return whether the call has completed."""
125 """Return whether the call has completed."""
126 if not self._ready:
126 if not self._ready:
127 self.wait(0)
127 self.wait(0)
128 return self._ready
128 return self._ready
129
129
130 def wait(self, timeout=-1):
130 def wait(self, timeout=-1):
131 """Wait until the result is available or until `timeout` seconds pass.
131 """Wait until the result is available or until `timeout` seconds pass.
132
132
133 This method always returns None.
133 This method always returns None.
134 """
134 """
135 if self._ready:
135 if self._ready:
136 return
136 return
137 self._ready = self._client.wait(self.msg_ids, timeout)
137 self._ready = self._client.wait(self.msg_ids, timeout)
138 if self._ready:
138 if self._ready:
139 try:
139 try:
140 results = map(self._client.results.get, self.msg_ids)
140 results = map(self._client.results.get, self.msg_ids)
141 self._result = results
141 self._result = results
142 if self._single_result:
142 if self._single_result:
143 r = results[0]
143 r = results[0]
144 if isinstance(r, Exception):
144 if isinstance(r, Exception):
145 raise r
145 raise r
146 else:
146 else:
147 results = error.collect_exceptions(results, self._fname)
147 results = error.collect_exceptions(results, self._fname)
148 self._result = self._reconstruct_result(results)
148 self._result = self._reconstruct_result(results)
149 except Exception, e:
149 except Exception, e:
150 self._exception = e
150 self._exception = e
151 self._success = False
151 self._success = False
152 else:
152 else:
153 self._success = True
153 self._success = True
154 finally:
154 finally:
155 self._metadata = map(self._client.metadata.get, self.msg_ids)
155 self._metadata = map(self._client.metadata.get, self.msg_ids)
156
156
157
157
158 def successful(self):
158 def successful(self):
159 """Return whether the call completed without raising an exception.
159 """Return whether the call completed without raising an exception.
160
160
161 Will raise ``AssertionError`` if the result is not ready.
161 Will raise ``AssertionError`` if the result is not ready.
162 """
162 """
163 assert self.ready()
163 assert self.ready()
164 return self._success
164 return self._success
165
165
166 #----------------------------------------------------------------
166 #----------------------------------------------------------------
167 # Extra methods not in mp.pool.AsyncResult
167 # Extra methods not in mp.pool.AsyncResult
168 #----------------------------------------------------------------
168 #----------------------------------------------------------------
169
169
170 def get_dict(self, timeout=-1):
170 def get_dict(self, timeout=-1):
171 """Get the results as a dict, keyed by engine_id.
171 """Get the results as a dict, keyed by engine_id.
172
172
173 timeout behavior is described in `get()`.
173 timeout behavior is described in `get()`.
174 """
174 """
175
175
176 results = self.get(timeout)
176 results = self.get(timeout)
177 engine_ids = [ md['engine_id'] for md in self._metadata ]
177 engine_ids = [ md['engine_id'] for md in self._metadata ]
178 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
178 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
179 maxcount = bycount.count(bycount[-1])
179 maxcount = bycount.count(bycount[-1])
180 if maxcount > 1:
180 if maxcount > 1:
181 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
181 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
182 maxcount, bycount[-1]))
182 maxcount, bycount[-1]))
183
183
184 return dict(zip(engine_ids,results))
184 return dict(zip(engine_ids,results))
185
185
186 @property
186 @property
187 def result(self):
187 def result(self):
188 """result property wrapper for `get(timeout=0)`."""
188 """result property wrapper for `get(timeout=0)`."""
189 return self.get()
189 return self.get()
190
190
191 # abbreviated alias:
191 # abbreviated alias:
192 r = result
192 r = result
193
193
194 @property
194 @property
195 @check_ready
195 @check_ready
196 def metadata(self):
196 def metadata(self):
197 """property for accessing execution metadata."""
197 """property for accessing execution metadata."""
198 if self._single_result:
198 if self._single_result:
199 return self._metadata[0]
199 return self._metadata[0]
200 else:
200 else:
201 return self._metadata
201 return self._metadata
202
202
203 @property
203 @property
204 def result_dict(self):
204 def result_dict(self):
205 """result property as a dict."""
205 """result property as a dict."""
206 return self.get_dict()
206 return self.get_dict()
207
207
208 def __dict__(self):
208 def __dict__(self):
209 return self.get_dict(0)
209 return self.get_dict(0)
210
210
211 def abort(self):
211 def abort(self):
212 """abort my tasks."""
212 """abort my tasks."""
213 assert not self.ready(), "Can't abort, I am already done!"
213 assert not self.ready(), "Can't abort, I am already done!"
214 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
214 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
215
215
216 @property
216 @property
217 def sent(self):
217 def sent(self):
218 """check whether my messages have been sent."""
218 """check whether my messages have been sent."""
219 return self._tracker.done
219 return self._tracker.done
220
220
221 def wait_for_send(self, timeout=-1):
221 def wait_for_send(self, timeout=-1):
222 """wait for pyzmq send to complete.
222 """wait for pyzmq send to complete.
223
223
224 This is necessary when sending arrays that you intend to edit in-place.
224 This is necessary when sending arrays that you intend to edit in-place.
225 `timeout` is in seconds, and will raise TimeoutError if it is reached
225 `timeout` is in seconds, and will raise TimeoutError if it is reached
226 before the send completes.
226 before the send completes.
227 """
227 """
228 return self._tracker.wait(timeout)
228 return self._tracker.wait(timeout)
229
229
230 #-------------------------------------
230 #-------------------------------------
231 # dict-access
231 # dict-access
232 #-------------------------------------
232 #-------------------------------------
233
233
234 @check_ready
234 @check_ready
235 def __getitem__(self, key):
235 def __getitem__(self, key):
236 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
236 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
237 """
237 """
238 if isinstance(key, int):
238 if isinstance(key, int):
239 return error.collect_exceptions([self._result[key]], self._fname)[0]
239 return error.collect_exceptions([self._result[key]], self._fname)[0]
240 elif isinstance(key, slice):
240 elif isinstance(key, slice):
241 return error.collect_exceptions(self._result[key], self._fname)
241 return error.collect_exceptions(self._result[key], self._fname)
242 elif isinstance(key, basestring):
242 elif isinstance(key, basestring):
243 values = [ md[key] for md in self._metadata ]
243 values = [ md[key] for md in self._metadata ]
244 if self._single_result:
244 if self._single_result:
245 return values[0]
245 return values[0]
246 else:
246 else:
247 return values
247 return values
248 else:
248 else:
249 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
249 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
250
250
251 def __getattr__(self, key):
251 def __getattr__(self, key):
252 """getattr maps to getitem for convenient attr access to metadata."""
252 """getattr maps to getitem for convenient attr access to metadata."""
253 try:
253 try:
254 return self.__getitem__(key)
254 return self.__getitem__(key)
255 except (error.TimeoutError, KeyError):
255 except (error.TimeoutError, KeyError):
256 raise AttributeError("%r object has no attribute %r"%(
256 raise AttributeError("%r object has no attribute %r"%(
257 self.__class__.__name__, key))
257 self.__class__.__name__, key))
258
258
259 # asynchronous iterator:
259 # asynchronous iterator:
260 def __iter__(self):
260 def __iter__(self):
261 if self._single_result:
261 if self._single_result:
262 raise TypeError("AsyncResults with a single result are not iterable.")
262 raise TypeError("AsyncResults with a single result are not iterable.")
263 try:
263 try:
264 rlist = self.get(0)
264 rlist = self.get(0)
265 except error.TimeoutError:
265 except error.TimeoutError:
266 # wait for each result individually
266 # wait for each result individually
267 for msg_id in self.msg_ids:
267 for msg_id in self.msg_ids:
268 ar = AsyncResult(self._client, msg_id, self._fname)
268 ar = AsyncResult(self._client, msg_id, self._fname)
269 yield ar.get()
269 yield ar.get()
270 else:
270 else:
271 # already done
271 # already done
272 for r in rlist:
272 for r in rlist:
273 yield r
273 yield r
274
274
275 def __len__(self):
275 def __len__(self):
276 return len(self.msg_ids)
276 return len(self.msg_ids)
277
277
278 #-------------------------------------
278 #-------------------------------------
279 # Sugar methods and attributes
279 # Sugar methods and attributes
280 #-------------------------------------
280 #-------------------------------------
281
281
282 def timedelta(self, start, end, start_key=min, end_key=max):
282 def timedelta(self, start, end, start_key=min, end_key=max):
283 """compute the difference between two sets of timestamps
283 """compute the difference between two sets of timestamps
284
284
285 The default behavior is to use the earliest of the first
285 The default behavior is to use the earliest of the first
286 and the latest of the second list, but this can be changed
286 and the latest of the second list, but this can be changed
287 by passing a different
287 by passing a different
288
288
289 Parameters
289 Parameters
290 ----------
290 ----------
291
291
292 start : one or more datetime objects (e.g. ar.submitted)
292 start : one or more datetime objects (e.g. ar.submitted)
293 end : one or more datetime objects (e.g. ar.received)
293 end : one or more datetime objects (e.g. ar.received)
294 start_key : callable
294 start_key : callable
295 Function to call on `start` to extract the relevant
295 Function to call on `start` to extract the relevant
296 entry [defalt: min]
296 entry [defalt: min]
297 end_key : callable
297 end_key : callable
298 Function to call on `end` to extract the relevant
298 Function to call on `end` to extract the relevant
299 entry [default: max]
299 entry [default: max]
300
300
301 Returns
301 Returns
302 -------
302 -------
303
303
304 dt : float
304 dt : float
305 The time elapsed (in seconds) between the two selected timestamps.
305 The time elapsed (in seconds) between the two selected timestamps.
306 """
306 """
307 if not isinstance(start, datetime):
307 if not isinstance(start, datetime):
308 # handle single_result AsyncResults, where ar.stamp is single object,
308 # handle single_result AsyncResults, where ar.stamp is single object,
309 # not a list
309 # not a list
310 start = start_key(start)
310 start = start_key(start)
311 if not isinstance(end, datetime):
311 if not isinstance(end, datetime):
312 # handle single_result AsyncResults, where ar.stamp is single object,
312 # handle single_result AsyncResults, where ar.stamp is single object,
313 # not a list
313 # not a list
314 end = end_key(end)
314 end = end_key(end)
315 return _total_seconds(end - start)
315 return _total_seconds(end - start)
316
316
317 @property
317 @property
318 def progress(self):
318 def progress(self):
319 """the number of tasks which have been completed at this point.
319 """the number of tasks which have been completed at this point.
320
320
321 Fractional progress would be given by 1.0 * ar.progress / len(ar)
321 Fractional progress would be given by 1.0 * ar.progress / len(ar)
322 """
322 """
323 self.wait(0)
323 self.wait(0)
324 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
324 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
325
325
326 @property
326 @property
327 def elapsed(self):
327 def elapsed(self):
328 """elapsed time since initial submission"""
328 """elapsed time since initial submission"""
329 if self.ready():
329 if self.ready():
330 return self.wall_time
330 return self.wall_time
331
331
332 now = submitted = datetime.now()
332 now = submitted = datetime.now()
333 for msg_id in self.msg_ids:
333 for msg_id in self.msg_ids:
334 if msg_id in self._client.metadata:
334 if msg_id in self._client.metadata:
335 stamp = self._client.metadata[msg_id]['submitted']
335 stamp = self._client.metadata[msg_id]['submitted']
336 if stamp and stamp < submitted:
336 if stamp and stamp < submitted:
337 submitted = stamp
337 submitted = stamp
338 return _total_seconds(now-submitted)
338 return _total_seconds(now-submitted)
339
339
340 @property
340 @property
341 @check_ready
341 @check_ready
342 def serial_time(self):
342 def serial_time(self):
343 """serial computation time of a parallel calculation
343 """serial computation time of a parallel calculation
344
344
345 Computed as the sum of (completed-started) of each task
345 Computed as the sum of (completed-started) of each task
346 """
346 """
347 t = 0
347 t = 0
348 for md in self._metadata:
348 for md in self._metadata:
349 t += _total_seconds(md['completed'] - md['started'])
349 t += _total_seconds(md['completed'] - md['started'])
350 return t
350 return t
351
351
352 @property
352 @property
353 @check_ready
353 @check_ready
354 def wall_time(self):
354 def wall_time(self):
355 """actual computation time of a parallel calculation
355 """actual computation time of a parallel calculation
356
356
357 Computed as the time between the latest `received` stamp
357 Computed as the time between the latest `received` stamp
358 and the earliest `submitted`.
358 and the earliest `submitted`.
359
359
360 Only reliable if Client was spinning/waiting when the task finished, because
360 Only reliable if Client was spinning/waiting when the task finished, because
361 the `received` timestamp is created when a result is pulled off of the zmq queue,
361 the `received` timestamp is created when a result is pulled off of the zmq queue,
362 which happens as a result of `client.spin()`.
362 which happens as a result of `client.spin()`.
363
363
364 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
364 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
365
365
366 """
366 """
367 return self.timedelta(self.submitted, self.received)
367 return self.timedelta(self.submitted, self.received)
368
368
369 def wait_interactive(self, interval=1., timeout=None):
369 def wait_interactive(self, interval=1., timeout=None):
370 """interactive wait, printing progress at regular intervals"""
370 """interactive wait, printing progress at regular intervals"""
371 N = len(self)
371 N = len(self)
372 tic = time.time()
372 tic = time.time()
373 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
373 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
374 self.wait(interval)
374 self.wait(interval)
375 clear_output()
375 clear_output()
376 print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed),
376 print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed),
377 sys.stdout.flush()
377 sys.stdout.flush()
378 print
378 print
379 print "done"
379 print "done"
380
380
381 def _republish_displaypub(self, content, eid):
381 def _republish_displaypub(self, content, eid):
382 """republish individual displaypub content dicts"""
382 """republish individual displaypub content dicts"""
383 try:
383 try:
384 ip = get_ipython()
384 ip = get_ipython()
385 except NameError:
385 except NameError:
386 # displaypub is meaningless outside IPython
386 # displaypub is meaningless outside IPython
387 return
387 return
388 md = content['metadata'] or {}
388 md = content['metadata'] or {}
389 md['engine'] = eid
389 md['engine'] = eid
390 ip.display_pub.publish(content['source'], content['data'], md)
390 ip.display_pub.publish(content['source'], content['data'], md)
391
391
392
392
393 def _display_single_result(self):
393 def _display_single_result(self):
394
394
395 print self.stdout
395 print self.stdout
396 print >> sys.stderr, self.stderr
396 print >> sys.stderr, self.stderr
397
397
398 try:
398 try:
399 get_ipython()
399 get_ipython()
400 except NameError:
400 except NameError:
401 # displaypub is meaningless outside IPython
401 # displaypub is meaningless outside IPython
402 return
402 return
403
403
404 for output in self.outputs:
404 for output in self.outputs:
405 self._republish_displaypub(output, self.engine_id)
405 self._republish_displaypub(output, self.engine_id)
406
406
407 if self.pyout is not None:
407 if self.pyout is not None:
408 display(self.get())
408 display(self.get())
409
409
410 @check_ready
410 @check_ready
411 def display_outputs(self, groupby="type"):
411 def display_outputs(self, groupby="type"):
412 """republish the outputs of the computation
412 """republish the outputs of the computation
413
413
414 Parameters
414 Parameters
415 ----------
415 ----------
416
416
417 groupby : str [default: type]
417 groupby : str [default: type]
418 if 'type':
418 if 'type':
419 Group outputs by type (show all stdout, then all stderr, etc.):
419 Group outputs by type (show all stdout, then all stderr, etc.):
420
420
421 [stdout:1] foo
421 [stdout:1] foo
422 [stdout:2] foo
422 [stdout:2] foo
423 [stderr:1] bar
423 [stderr:1] bar
424 [stderr:2] bar
424 [stderr:2] bar
425 if 'engine':
425 if 'engine':
426 Display outputs for each engine before moving on to the next:
426 Display outputs for each engine before moving on to the next:
427
427
428 [stdout:1] foo
428 [stdout:1] foo
429 [stderr:1] bar
429 [stderr:1] bar
430 [stdout:2] foo
430 [stdout:2] foo
431 [stderr:2] bar
431 [stderr:2] bar
432
432
433 if 'order':
433 if 'order':
434 Like 'type', but further collate individual displaypub
434 Like 'type', but further collate individual displaypub
435 outputs. This is meant for cases of each command producing
435 outputs. This is meant for cases of each command producing
436 several plots, and you would like to see all of the first
436 several plots, and you would like to see all of the first
437 plots together, then all of the second plots, and so on.
437 plots together, then all of the second plots, and so on.
438 """
438 """
439 # flush iopub, just in case
439 # flush iopub, just in case
440 self._client._flush_iopub(self._client._iopub_socket)
440 self._client._flush_iopub(self._client._iopub_socket)
441 if self._single_result:
441 if self._single_result:
442 self._display_single_result()
442 self._display_single_result()
443 return
443 return
444
444
445 stdouts = [s.rstrip() for s in self.stdout]
445 stdouts = [s.rstrip() for s in self.stdout]
446 stderrs = [s.rstrip() for s in self.stderr]
446 stderrs = [s.rstrip() for s in self.stderr]
447 pyouts = [p for p in self.pyout]
447 pyouts = [p for p in self.pyout]
448 output_lists = self.outputs
448 output_lists = self.outputs
449 results = self.get()
449 results = self.get()
450
450
451 targets = self.engine_id
451 targets = self.engine_id
452
452
453 if groupby == "engine":
453 if groupby == "engine":
454 for eid,stdout,stderr,outputs,r,pyout in zip(
454 for eid,stdout,stderr,outputs,r,pyout in zip(
455 targets, stdouts, stderrs, output_lists, results, pyouts
455 targets, stdouts, stderrs, output_lists, results, pyouts
456 ):
456 ):
457 if stdout:
457 if stdout:
458 print '[stdout:%2i]' % eid, stdout
458 print '[stdout:%i]' % eid, stdout
459 if stderr:
459 if stderr:
460 print >> sys.stderr, '[stderr:%2i]' % eid, stderr
460 print >> sys.stderr, '[stderr:%i]' % eid, stderr
461
461
462 try:
462 try:
463 get_ipython()
463 get_ipython()
464 except NameError:
464 except NameError:
465 # displaypub is meaningless outside IPython
465 # displaypub is meaningless outside IPython
466 return
466 return
467
467
468 for output in outputs:
468 for output in outputs:
469 self._republish_displaypub(output, eid)
469 self._republish_displaypub(output, eid)
470
470
471 if pyout is not None:
471 if pyout is not None:
472 display(r)
472 display(r)
473
473
474 elif groupby in ('type', 'order'):
474 elif groupby in ('type', 'order'):
475 # republish stdout:
475 # republish stdout:
476 if any(stdouts):
476 if any(stdouts):
477 for eid,stdout in zip(targets, stdouts):
477 for eid,stdout in zip(targets, stdouts):
478 print '[stdout:%2i]' % eid, stdout
478 print '[stdout:%i]' % eid, stdout
479
479
480 # republish stderr:
480 # republish stderr:
481 if any(stderrs):
481 if any(stderrs):
482 for eid,stderr in zip(targets, stderrs):
482 for eid,stderr in zip(targets, stderrs):
483 print >> sys.stderr, '[stderr:%2i]' % eid, stderr
483 print >> sys.stderr, '[stderr:%i]' % eid, stderr
484
484
485 try:
485 try:
486 get_ipython()
486 get_ipython()
487 except NameError:
487 except NameError:
488 # displaypub is meaningless outside IPython
488 # displaypub is meaningless outside IPython
489 return
489 return
490
490
491 if groupby == 'order':
491 if groupby == 'order':
492 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
492 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
493 N = max(len(outputs) for outputs in output_lists)
493 N = max(len(outputs) for outputs in output_lists)
494 for i in range(N):
494 for i in range(N):
495 for eid in targets:
495 for eid in targets:
496 outputs = output_dict[eid]
496 outputs = output_dict[eid]
497 if len(outputs) >= N:
497 if len(outputs) >= N:
498 self._republish_displaypub(outputs[i], eid)
498 self._republish_displaypub(outputs[i], eid)
499 else:
499 else:
500 # republish displaypub output
500 # republish displaypub output
501 for eid,outputs in zip(targets, output_lists):
501 for eid,outputs in zip(targets, output_lists):
502 for output in outputs:
502 for output in outputs:
503 self._republish_displaypub(output, eid)
503 self._republish_displaypub(output, eid)
504
504
505 # finally, add pyout:
505 # finally, add pyout:
506 for eid,r,pyout in zip(targets, results, pyouts):
506 for eid,r,pyout in zip(targets, results, pyouts):
507 if pyout is not None:
507 if pyout is not None:
508 display(r)
508 display(r)
509
509
510 else:
510 else:
511 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
511 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
512
512
513
513
514
514
515
515
class AsyncMapResult(AsyncResult):
    """Class for representing results of non-blocking gathers.

    This will properly reconstruct the gather.

    This class is iterable at any time, and will wait on results as they come.

    If ordered=False, then the first results to arrive will come first, otherwise
    results will be yielded in the order they were submitted.

    """

    def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
        AsyncResult.__init__(self, client, msg_ids, fname=fname)
        self._mapObject = mapObject
        # a map result is always a collection, never a single value
        self._single_result = False
        self.ordered = ordered

    def _reconstruct_result(self, res):
        """Perform the gather on the actual results."""
        return self._mapObject.joinPartitions(res)

    @staticmethod
    def _yield_items(rlist):
        """Yield the items of one partial result, tolerating flattened results.

        A partial result is normally a list, but with `flatten=True` a map
        can return a bare (non-iterable) value; in that case yield it whole.
        """
        try:
            for r in rlist:
                yield r
        except TypeError:
            # flattened, not a list
            # this could get broken by flattened data that returns iterables
            # but most calls to map do not expose the `flatten` argument
            yield rlist

    # asynchronous iterator:
    def __iter__(self):
        it = self._ordered_iter if self.ordered else self._unordered_iter
        for r in it():
            yield r

    # asynchronous ordered iterator:
    def _ordered_iter(self):
        """iterator for results *as they arrive*, preserving submission order."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # not all done yet: wait for each result individually,
            # in submission order
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                for r in self._yield_items(ar.get()):
                    yield r
        else:
            # already done
            for r in rlist:
                yield r

    # asynchronous unordered iterator:
    def _unordered_iter(self):
        """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            pending = set(self.msg_ids)
            while pending:
                try:
                    self._client.wait(pending, 1e-3)
                except error.TimeoutError:
                    # ignore timeout error, because that only means
                    # *some* jobs are outstanding
                    pass
                # update ready set with those no longer outstanding:
                ready = pending.difference(self._client.outstanding)
                # update pending to exclude those that are finished
                pending = pending.difference(ready)
                while ready:
                    msg_id = ready.pop()
                    ar = AsyncResult(self._client, msg_id, self._fname)
                    for r in self._yield_items(ar.get()):
                        yield r
        else:
            # already done
            for r in rlist:
                yield r
601
601
602
602
603
603
class AsyncHubResult(AsyncResult):
    """Class to wrap pending results that must be requested from the Hub.

    Note that waiting/polling on these objects requires polling the Hub
    over the network, so use `AsyncHubResult.wait()` sparingly.
    """

    def wait(self, timeout=-1):
        """Wait for the result to complete.

        Parameters
        ----------
        timeout : int/float
            maximum number of seconds to wait; a negative value waits forever.
        """
        start = time.time()
        if self._ready:
            return
        # first, wait for anything still outstanding locally
        local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
        local_ready = self._client.wait(local_ids, timeout)
        if local_ready:
            # anything not in local results must be fetched from the Hub
            remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
            if not remote_ids:
                self._ready = True
            else:
                rdict = self._client.result_status(remote_ids, status_only=False)
                pending = rdict['pending']
                # poll the Hub at 0.1s intervals until done or timeout expires
                while pending and (timeout < 0 or time.time() < start+timeout):
                    rdict = self._client.result_status(remote_ids, status_only=False)
                    pending = rdict['pending']
                    if pending:
                        time.sleep(0.1)
                if not pending:
                    self._ready = True
        if self._ready:
            try:
                results = map(self._client.results.get, self.msg_ids)
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                    self._result = self._reconstruct_result(results)
            except Exception as e:
                # record the failure instead of propagating, matching
                # AsyncResult semantics: get() will re-raise it
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                self._metadata = map(self._client.metadata.get, self.msg_ids)
650
650
# names exported by `from IPython.parallel.client.asyncresult import *`
__all__ = [
    'AsyncResult',
    'AsyncMapResult',
    'AsyncHubResult',
]
@@ -1,1655 +1,1655 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35
35
36 from IPython.utils.coloransi import TermColors
36 from IPython.utils.coloransi import TermColors
37 from IPython.utils.jsonutil import rekey
37 from IPython.utils.jsonutil import rekey
38 from IPython.utils.localinterfaces import LOCAL_IPS
38 from IPython.utils.localinterfaces import LOCAL_IPS
39 from IPython.utils.path import get_ipython_dir
39 from IPython.utils.path import get_ipython_dir
40 from IPython.utils.py3compat import cast_bytes
40 from IPython.utils.py3compat import cast_bytes
41 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
41 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
42 Dict, List, Bool, Set, Any)
42 Dict, List, Bool, Set, Any)
43 from IPython.external.decorator import decorator
43 from IPython.external.decorator import decorator
44 from IPython.external.ssh import tunnel
44 from IPython.external.ssh import tunnel
45
45
46 from IPython.parallel import Reference
46 from IPython.parallel import Reference
47 from IPython.parallel import error
47 from IPython.parallel import error
48 from IPython.parallel import util
48 from IPython.parallel import util
49
49
50 from IPython.zmq.session import Session, Message
50 from IPython.zmq.session import Session, Message
51
51
52 from .asyncresult import AsyncResult, AsyncHubResult
52 from .asyncresult import AsyncResult, AsyncHubResult
53 from IPython.core.profiledir import ProfileDir, ProfileDirError
53 from IPython.core.profiledir import ProfileDir, ProfileDirError
54 from .view import DirectView, LoadBalancedView
54 from .view import DirectView, LoadBalancedView
55
55
if sys.version_info[0] >= 3:
    # Python 3 removed xrange; alias it to the builtin range so the
    # couple of isinstance(..., xrange) checks below keep working.
    xrange = range
60
60
61 #--------------------------------------------------------------------------
61 #--------------------------------------------------------------------------
62 # Decorators for Client methods
62 # Decorators for Client methods
63 #--------------------------------------------------------------------------
63 #--------------------------------------------------------------------------
64
64
@decorator
def spin_first(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method.

    Decorator for Client methods: flushes incoming results and
    registration-state changes so the wrapped method sees up-to-date
    client state before it runs.
    """
    self.spin()
    return f(self, *args, **kwargs)
70
70
71
71
72 #--------------------------------------------------------------------------
72 #--------------------------------------------------------------------------
73 # Classes
73 # Classes
74 #--------------------------------------------------------------------------
74 #--------------------------------------------------------------------------
75
75
76
76
class ExecuteReply(object):
    """Wrapper for finished Execute results.

    Provides dict-style (``reply['key']``) and attribute (``reply.key``)
    access to the reply metadata, and implements the IPython ``_repr_*_``
    display hooks so a reply renders its ``pyout`` output directly.
    """
    def __init__(self, msg_id, content, metadata):
        self.msg_id = msg_id
        self._content = content
        self.execution_count = content['execution_count']
        self.metadata = metadata

    def _pyout_data(self):
        """Return the pyout mime-bundle dict (empty dict if there was no pyout)."""
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data']

    def __getitem__(self, key):
        return self.metadata[key]

    def __getattr__(self, key):
        # only called when normal attribute lookup fails;
        # fall back to metadata keys
        if key not in self.metadata:
            raise AttributeError(key)
        return self.metadata[key]

    def __repr__(self):
        text_out = self._pyout_data().get('text/plain', '')
        if len(text_out) > 32:
            # truncate long output for a compact repr
            text_out = text_out[:29] + '...'

        return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)

    def _repr_pretty_(self, p, cycle):
        text_out = self._pyout_data().get('text/plain', '')

        if not text_out:
            return

        # colorize only when running inside an IPython shell with colors on
        try:
            ip = get_ipython()
        except NameError:
            colors = "NoColor"
        else:
            colors = ip.colors

        if colors == "NoColor":
            out = normal = ""
        else:
            out = TermColors.Red
            normal = TermColors.Normal

        p.text(
            u'[%i] ' % self.metadata['engine_id'] +
            out + u'Out[%i]: ' % self.execution_count +
            normal + text_out
        )

    def _repr_html_(self):
        return self._pyout_data().get("text/html")

    def _repr_latex_(self):
        return self._pyout_data().get("text/latex")

    def _repr_json_(self):
        return self._pyout_data().get("application/json")

    def _repr_javascript_(self):
        return self._pyout_data().get("application/javascript")

    def _repr_png_(self):
        return self._pyout_data().get("image/png")

    def _repr_jpeg_(self):
        return self._pyout_data().get("image/jpeg")

    def _repr_svg_(self):
        return self._pyout_data().get("image/svg+xml")
154
154
155
155
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # the full set of allowed keys, with their defaults
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
              'outputs' : [],
            }
        self.update(md)
        # NOTE(review): dict.update bypasses __setitem__, so initializer
        # args are not key-checked here — callers are trusted to pass
        # valid metadata keys.
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # membership test on the dict itself is O(1); the previous
        # `key in self.iterkeys()` scanned an iterator linearly
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
207
207
208
208
209 class Client(HasTraits):
209 class Client(HasTraits):
210 """A semi-synchronous client to the IPython ZMQ cluster
210 """A semi-synchronous client to the IPython ZMQ cluster
211
211
212 Parameters
212 Parameters
213 ----------
213 ----------
214
214
215 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
215 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
216 Connection information for the Hub's registration. If a json connector
216 Connection information for the Hub's registration. If a json connector
217 file is given, then likely no further configuration is necessary.
217 file is given, then likely no further configuration is necessary.
218 [Default: use profile]
218 [Default: use profile]
219 profile : bytes
219 profile : bytes
220 The name of the Cluster profile to be used to find connector information.
220 The name of the Cluster profile to be used to find connector information.
221 If run from an IPython application, the default profile will be the same
221 If run from an IPython application, the default profile will be the same
222 as the running application, otherwise it will be 'default'.
222 as the running application, otherwise it will be 'default'.
223 context : zmq.Context
223 context : zmq.Context
224 Pass an existing zmq.Context instance, otherwise the client will create its own.
224 Pass an existing zmq.Context instance, otherwise the client will create its own.
225 debug : bool
225 debug : bool
226 flag for lots of message printing for debug purposes
226 flag for lots of message printing for debug purposes
227 timeout : int/float
227 timeout : int/float
228 time (in seconds) to wait for connection replies from the Hub
228 time (in seconds) to wait for connection replies from the Hub
229 [Default: 10]
229 [Default: 10]
230
230
231 #-------------- session related args ----------------
231 #-------------- session related args ----------------
232
232
233 config : Config object
233 config : Config object
234 If specified, this will be relayed to the Session for configuration
234 If specified, this will be relayed to the Session for configuration
235 username : str
235 username : str
236 set username for the session object
236 set username for the session object
237 packer : str (import_string) or callable
237 packer : str (import_string) or callable
238 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
238 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
239 function to serialize messages. Must support same input as
239 function to serialize messages. Must support same input as
240 JSON, and output must be bytes.
240 JSON, and output must be bytes.
241 You can pass a callable directly as `pack`
241 You can pass a callable directly as `pack`
242 unpacker : str (import_string) or callable
242 unpacker : str (import_string) or callable
243 The inverse of packer. Only necessary if packer is specified as *not* one
243 The inverse of packer. Only necessary if packer is specified as *not* one
244 of 'json' or 'pickle'.
244 of 'json' or 'pickle'.
245
245
246 #-------------- ssh related args ----------------
246 #-------------- ssh related args ----------------
247 # These are args for configuring the ssh tunnel to be used
247 # These are args for configuring the ssh tunnel to be used
248 # credentials are used to forward connections over ssh to the Controller
248 # credentials are used to forward connections over ssh to the Controller
249 # Note that the ip given in `addr` needs to be relative to sshserver
249 # Note that the ip given in `addr` needs to be relative to sshserver
250 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
250 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
251 # and set sshserver as the same machine the Controller is on. However,
251 # and set sshserver as the same machine the Controller is on. However,
252 # the only requirement is that sshserver is able to see the Controller
252 # the only requirement is that sshserver is able to see the Controller
253 # (i.e. is within the same trusted network).
253 # (i.e. is within the same trusted network).
254
254
255 sshserver : str
255 sshserver : str
256 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
256 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
257 If keyfile or password is specified, and this is not, it will default to
257 If keyfile or password is specified, and this is not, it will default to
258 the ip given in addr.
258 the ip given in addr.
259 sshkey : str; path to ssh private key file
259 sshkey : str; path to ssh private key file
260 This specifies a key to be used in ssh login, default None.
260 This specifies a key to be used in ssh login, default None.
261 Regular default ssh keys will be used without specifying this argument.
261 Regular default ssh keys will be used without specifying this argument.
262 password : str
262 password : str
263 Your ssh password to sshserver. Note that if this is left None,
263 Your ssh password to sshserver. Note that if this is left None,
264 you will be prompted for it if passwordless key based login is unavailable.
264 you will be prompted for it if passwordless key based login is unavailable.
265 paramiko : bool
265 paramiko : bool
266 flag for whether to use paramiko instead of shell ssh for tunneling.
266 flag for whether to use paramiko instead of shell ssh for tunneling.
267 [default: True on win32, False else]
267 [default: True on win32, False else]
268
268
269 ------- exec authentication args -------
269 ------- exec authentication args -------
270 If even localhost is untrusted, you can have some protection against
270 If even localhost is untrusted, you can have some protection against
271 unauthorized execution by signing messages with HMAC digests.
271 unauthorized execution by signing messages with HMAC digests.
272 Messages are still sent as cleartext, so if someone can snoop your
272 Messages are still sent as cleartext, so if someone can snoop your
273 loopback traffic this will not protect your privacy, but will prevent
273 loopback traffic this will not protect your privacy, but will prevent
274 unauthorized execution.
274 unauthorized execution.
275
275
276 exec_key : str
276 exec_key : str
277 an authentication key or file containing a key
277 an authentication key or file containing a key
278 default: None
278 default: None
279
279
280
280
281 Attributes
281 Attributes
282 ----------
282 ----------
283
283
284 ids : list of int engine IDs
284 ids : list of int engine IDs
285 requesting the ids attribute always synchronizes
285 requesting the ids attribute always synchronizes
286 the registration state. To request ids without synchronization,
286 the registration state. To request ids without synchronization,
287 use semi-private _ids attributes.
287 use semi-private _ids attributes.
288
288
289 history : list of msg_ids
289 history : list of msg_ids
290 a list of msg_ids, keeping track of all the execution
290 a list of msg_ids, keeping track of all the execution
291 messages you have submitted in order.
291 messages you have submitted in order.
292
292
293 outstanding : set of msg_ids
293 outstanding : set of msg_ids
294 a set of msg_ids that have been submitted, but whose
294 a set of msg_ids that have been submitted, but whose
295 results have not yet been received.
295 results have not yet been received.
296
296
297 results : dict
297 results : dict
298 a dict of all our results, keyed by msg_id
298 a dict of all our results, keyed by msg_id
299
299
300 block : bool
300 block : bool
301 determines default behavior when block not specified
301 determines default behavior when block not specified
302 in execution methods
302 in execution methods
303
303
304 Methods
304 Methods
305 -------
305 -------
306
306
307 spin
307 spin
308 flushes incoming results and registration state changes
308 flushes incoming results and registration state changes
309 control methods spin, and requesting `ids` also ensures up to date
309 control methods spin, and requesting `ids` also ensures up to date
310
310
311 wait
311 wait
312 wait on one or more msg_ids
312 wait on one or more msg_ids
313
313
314 execution methods
314 execution methods
315 apply
315 apply
316 legacy: execute, run
316 legacy: execute, run
317
317
318 data movement
318 data movement
319 push, pull, scatter, gather
319 push, pull, scatter, gather
320
320
321 query methods
321 query methods
322 queue_status, get_result, purge, result_status
322 queue_status, get_result, purge, result_status
323
323
324 control methods
324 control methods
325 abort, shutdown
325 abort, shutdown
326
326
327 """
327 """
328
328
329
329
    # -- public state ------------------------------------------------
    block = Bool(False)        # default blocking behavior for execution methods
    outstanding = Set()        # msg_ids submitted but whose results haven't arrived
    results = Instance('collections.defaultdict', (dict,))       # msg_id -> result
    metadata = Instance('collections.defaultdict', (Metadata,))  # msg_id -> Metadata
    history = List()           # all execution msg_ids, in submission order
    debug = Bool(False)        # print messages as they pass, for debugging
    # -- background spin-thread handles (set up elsewhere) -------------
    _spin_thread = Any()
    _stop_spinning = Any()

    # name of the cluster profile to load connection info from
    profile=Unicode()
340 def _profile_default(self):
340 def _profile_default(self):
341 if BaseIPythonApplication.initialized():
341 if BaseIPythonApplication.initialized():
342 # an IPython app *might* be running, try to get its profile
342 # an IPython app *might* be running, try to get its profile
343 try:
343 try:
344 return BaseIPythonApplication.instance().profile
344 return BaseIPythonApplication.instance().profile
345 except (AttributeError, MultipleInstanceError):
345 except (AttributeError, MultipleInstanceError):
346 # could be a *different* subclass of config.Application,
346 # could be a *different* subclass of config.Application,
347 # which would raise one of these two errors.
347 # which would raise one of these two errors.
348 return u'default'
348 return u'default'
349 else:
349 else:
350 return u'default'
350 return u'default'
351
351
352
352
    # -- private state -------------------------------------------------
    _outstanding_dict = Instance('collections.defaultdict', (set,))  # per-engine sets of outstanding msg_ids
    _ids = List()              # engine ids, unsynchronized (see `ids` for the synced view)
    _connected=Bool(False)
    _ssh=Bool(False)           # whether connections are tunneled over ssh
    _context = Instance('zmq.Context')
    _config = Dict()
    _engines=Instance(util.ReverseDict, (), {})  # ReverseDict of engines; presumably id <-> uuid — verify against registration code
    # _hub_socket=Instance('zmq.Socket')
    # -- zmq sockets for the Hub's various channels ----------------------
    _query_socket=Instance('zmq.Socket')
    _control_socket=Instance('zmq.Socket')
    _iopub_socket=Instance('zmq.Socket')
    _notification_socket=Instance('zmq.Socket')
    _mux_socket=Instance('zmq.Socket')
    _task_socket=Instance('zmq.Socket')
    _task_scheme=Unicode()     # name of the task scheduler's routing scheme
    _closed = False
    # counts of replies discarded after forget/purge-style operations
    _ignored_control_replies=Integer(0)
    _ignored_hub_replies=Integer(0)
371
371
372 def __new__(self, *args, **kw):
372 def __new__(self, *args, **kw):
373 # don't raise on positional args
373 # don't raise on positional args
374 return HasTraits.__new__(self, **kw)
374 return HasTraits.__new__(self, **kw)
375
375
def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
        context=None, debug=False, exec_key=None,
        sshserver=None, sshkey=None, password=None, paramiko=None,
        timeout=10, **extra_args
        ):
    """Create a Client and connect it to the Hub.

    Connection info comes either from `url_or_file` (a registration URL, or
    the path of an ipcontroller-client.json connection file) or from the
    security dir of the named `profile`.  SSH tunneling is enabled when any
    of sshserver/sshkey/password is given, or when the connection file says
    the controller is on another machine.  `timeout` (seconds) bounds the
    wait for the Hub's reply to the connection request (see _connect).

    Raises
    ------
    ValueError : neither url_or_file nor a usable profile was supplied
    IOError    : the connection file could not be found
    """
    if profile:
        super(Client, self).__init__(debug=debug, profile=profile)
    else:
        super(Client, self).__init__(debug=debug)
    if context is None:
        # fall back to the process-global ZMQ context
        context = zmq.Context.instance()
    self._context = context
    self._stop_spinning = Event()

    self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
    if self._cd is not None:
        if url_or_file is None:
            # default to the client connection file in the profile's security dir
            url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
    if url_or_file is None:
        raise ValueError(
            "I can't find enough information to connect to a hub!"
            " Please specify at least one of url_or_file or profile."
        )

    if not util.is_url(url_or_file):
        # it's not a url, try for a file
        if not os.path.exists(url_or_file):
            if self._cd:
                # retry the path relative to the profile's security dir
                url_or_file = os.path.join(self._cd.security_dir, url_or_file)
            if not os.path.exists(url_or_file):
                raise IOError("Connection file not found: %r" % url_or_file)
        with open(url_or_file) as f:
            cfg = json.loads(f.read())
    else:
        cfg = {'url':url_or_file}

    # sync defaults from args, json:
    # explicit arguments win over values loaded from the JSON file
    if sshserver:
        cfg['ssh'] = sshserver
    if exec_key:
        cfg['exec_key'] = exec_key
    exec_key = cfg['exec_key']
    location = cfg.setdefault('location', None)
    cfg['url'] = util.disambiguate_url(cfg['url'], location)
    url = cfg['url']
    proto,addr,port = util.split_url(url)
    if location is not None and addr == '127.0.0.1':
        # location specified, and connection is expected to be local
        if location not in LOCAL_IPS and not sshserver:
            # load ssh from JSON *only* if the controller is not on
            # this machine
            sshserver=cfg['ssh']
        if location not in LOCAL_IPS and not sshserver:
            # warn if no ssh specified, but SSH is probably needed
            # This is only a warning, because the most likely cause
            # is a local Controller on a laptop whose IP is dynamic
            warnings.warn("""
            Controller appears to be listening on localhost, but not on this machine.
            If this is true, you should specify Client(...,sshserver='you@%s')
            or instruct your controller to listen on an external IP."""%location,
                RuntimeWarning)
    elif not sshserver:
        # otherwise sync with cfg
        sshserver = cfg['ssh']

    self._config = cfg

    self._ssh = bool(sshserver or sshkey or password)
    if self._ssh and sshserver is None:
        # default to ssh via localhost
        sshserver = url.split('://')[1].split(':')[0]
    if self._ssh and password is None:
        if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
            # key-based login works, no password prompt needed
            password=False
        else:
            password = getpass("SSH Password for %s: "%sshserver)
    ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)

    # configure and construct the session
    if exec_key is not None:
        if os.path.isfile(exec_key):
            # a path: let the Session load the key from the file
            extra_args['keyfile'] = exec_key
        else:
            # a literal key value
            exec_key = cast_bytes(exec_key)
            extra_args['key'] = exec_key
    self.session = Session(**extra_args)

    # query socket: our direct line to the Hub
    self._query_socket = self._context.socket(zmq.DEALER)
    self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
    if self._ssh:
        tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
    else:
        self._query_socket.connect(url)

    self.session.debug = self.debug

    # dispatch tables for incoming messages
    self._notification_handlers = {'registration_notification' : self._register_engine,
                                'unregistration_notification' : self._unregister_engine,
                                'shutdown_notification' : lambda msg: self.close(),
                                }
    self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                            'apply_reply' : self._handle_apply_reply}
    self._connect(sshserver, ssh_kwargs, timeout)
479
479
def __del__(self):
    """cleanup sockets, but _not_ context."""
    # the context may be the shared global instance (see __init__), so it is
    # deliberately left alone
    self.close()
483
483
def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
    """Locate the cluster's profile directory and store it as self._cd.

    Uses an explicit `profile_dir` path if given, otherwise looks up the
    named `profile` under `ipython_dir`.  If neither resolves, self._cd is
    set to None (the ProfileDirError is swallowed).
    """
    if ipython_dir is None:
        ipython_dir = get_ipython_dir()
    if profile_dir is not None:
        try:
            self._cd = ProfileDir.find_profile_dir(profile_dir)
            return
        except ProfileDirError:
            pass
    elif profile is not None:
        try:
            self._cd = ProfileDir.find_profile_dir_by_name(
                ipython_dir, profile)
            return
        except ProfileDirError:
            pass
    self._cd = None
501
501
def _update_engines(self, engines):
    """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
    for engine_id, engine_uuid in engines.iteritems():
        eid = int(engine_id)
        self._engines[eid] = engine_uuid
        self._ids.append(eid)
    self._ids = sorted(self._ids)
    # the pure ZMQ scheduler requires engine ids to be the gap-free
    # range 0..n-1; if they are not, task farming must be shut down
    gap_free = sorted(self._engines.keys()) == range(len(self._engines))
    if not gap_free and self._task_scheme == 'pure' and self._task_socket:
        self._stop_scheduling_tasks()
512
512
513 def _stop_scheduling_tasks(self):
513 def _stop_scheduling_tasks(self):
514 """Stop scheduling tasks because an engine has been unregistered
514 """Stop scheduling tasks because an engine has been unregistered
515 from a pure ZMQ scheduler.
515 from a pure ZMQ scheduler.
516 """
516 """
517 self._task_socket.close()
517 self._task_socket.close()
518 self._task_socket = None
518 self._task_socket = None
519 msg = "An engine has been unregistered, and we are using pure " +\
519 msg = "An engine has been unregistered, and we are using pure " +\
520 "ZMQ task scheduling. Task farming will be disabled."
520 "ZMQ task scheduling. Task farming will be disabled."
521 if self.outstanding:
521 if self.outstanding:
522 msg += " If you were running tasks when this happened, " +\
522 msg += " If you were running tasks when this happened, " +\
523 "some `outstanding` msg_ids may never resolve."
523 "some `outstanding` msg_ids may never resolve."
524 warnings.warn(msg, RuntimeWarning)
524 warnings.warn(msg, RuntimeWarning)
525
525
def _build_targets(self, targets):
    """Turn valid target IDs or 'all' into two lists:
    (int_ids, uuids).

    `targets` may be None (all engines), the string 'all', a single int
    (negative values index from the end), a slice, or a
    list/tuple/xrange of ints.

    Raises NoEnginesRegistered when no engines are up, TypeError for an
    unsupported target spec, IndexError for an unknown engine id.
    """
    if not self._ids:
        # flush notification socket if no engines yet, just in case
        # NOTE(review): reading self.ids (vs self._ids) presumably performs
        # that flush — confirm against the `ids` property definition
        if not self.ids:
            raise error.NoEnginesRegistered("Can't build targets without any engines")

    if targets is None:
        targets = self._ids
    elif isinstance(targets, basestring):
        if targets.lower() == 'all':
            targets = self._ids
        else:
            raise TypeError("%r not valid str target, must be 'all'"%(targets))
    elif isinstance(targets, int):
        if targets < 0:
            # negative index counts from the end of the registered ids
            targets = self.ids[targets]
        if targets not in self._ids:
            raise IndexError("No such engine: %i"%targets)
        # normalize the single id to a list
        targets = [targets]

    if isinstance(targets, slice):
        indices = range(len(self._ids))[targets]
        ids = self.ids
        targets = [ ids[i] for i in indices ]

    if not isinstance(targets, (tuple, list, xrange)):
        raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))

    # engine uuids (as bytes, for use as ZMQ identities) plus the int ids
    return [cast_bytes(self._engines[t]) for t in targets], list(targets)
558
558
def _connect(self, sshserver, ssh_kwargs, timeout):
    """setup all our socket connections to the cluster. This is called from
    __init__.

    Sends a connection_request to the Hub over the query socket, waits up
    to `timeout` seconds for the reply, then creates and connects whichever
    of the mux/task/notification/control/iopub channels the Hub advertised.
    """

    # Maybe allow reconnecting?
    if self._connected:
        return
    self._connected=True

    def connect_socket(s, url):
        # resolve the URL relative to the controller's location, tunneling
        # over SSH when enabled
        url = util.disambiguate_url(url, self._config['location'])
        if self._ssh:
            return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
        else:
            return s.connect(url)

    self.session.send(self._query_socket, 'connection_request')
    # use Poller because zmq.select has wrong units in pyzmq 2.1.7
    poller = zmq.Poller()
    poller.register(self._query_socket, zmq.POLLIN)
    # poll expects milliseconds, timeout is seconds
    evts = poller.poll(timeout*1000)
    if not evts:
        raise error.TimeoutError("Hub connection request timed out")
    idents,msg = self.session.recv(self._query_socket,mode=0)
    if self.debug:
        pprint(msg)
    # wrap the reply dict for attribute access to its contents
    msg = Message(msg)
    content = msg.content
    self._config['registration'] = dict(content)
    if content.status == 'ok':
        # all our sockets share the session's identity
        ident = self.session.bsession
        if content.mux:
            # direct (multiplexer) channel
            self._mux_socket = self._context.socket(zmq.DEALER)
            self._mux_socket.setsockopt(zmq.IDENTITY, ident)
            connect_socket(self._mux_socket, content.mux)
        if content.task:
            # load-balanced task channel; the Hub also reports the scheme
            self._task_scheme, task_addr = content.task
            self._task_socket = self._context.socket(zmq.DEALER)
            self._task_socket.setsockopt(zmq.IDENTITY, ident)
            connect_socket(self._task_socket, task_addr)
        if content.notification:
            # engine (un)registration notifications; subscribe to everything
            self._notification_socket = self._context.socket(zmq.SUB)
            connect_socket(self._notification_socket, content.notification)
            self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
        # if content.query:
        #     self._query_socket = self._context.socket(zmq.DEALER)
        #     self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
        #     connect_socket(self._query_socket, content.query)
        if content.control:
            self._control_socket = self._context.socket(zmq.DEALER)
            self._control_socket.setsockopt(zmq.IDENTITY, ident)
            connect_socket(self._control_socket, content.control)
        if content.iopub:
            # engine output (stdout/stderr/display) channel
            self._iopub_socket = self._context.socket(zmq.SUB)
            self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
            self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
            connect_socket(self._iopub_socket, content.iopub)
        # seed our engine registry from the Hub's current engine list
        self._update_engines(dict(content.engines))
    else:
        self._connected = False
        raise Exception("Failed to connect!")
621
621
622 #--------------------------------------------------------------------------
622 #--------------------------------------------------------------------------
623 # handlers and callbacks for incoming messages
623 # handlers and callbacks for incoming messages
624 #--------------------------------------------------------------------------
624 #--------------------------------------------------------------------------
625
625
def _unwrap_exception(self, content):
    """unwrap exception, and remap engine_id to int."""
    exc = error.unwrap_exception(content)
    # print exc.traceback
    info = exc.engine_info
    if info:
        # translate the engine uuid into our integer engine id
        info['engine_id'] = self._engines[info['engine_uuid']]
    return exc
635
635
636 def _extract_metadata(self, header, parent, content):
636 def _extract_metadata(self, header, parent, content):
637 md = {'msg_id' : parent['msg_id'],
637 md = {'msg_id' : parent['msg_id'],
638 'received' : datetime.now(),
638 'received' : datetime.now(),
639 'engine_uuid' : header.get('engine', None),
639 'engine_uuid' : header.get('engine', None),
640 'follow' : parent.get('follow', []),
640 'follow' : parent.get('follow', []),
641 'after' : parent.get('after', []),
641 'after' : parent.get('after', []),
642 'status' : content['status'],
642 'status' : content['status'],
643 }
643 }
644
644
645 if md['engine_uuid'] is not None:
645 if md['engine_uuid'] is not None:
646 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
646 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
647
647
648 if 'date' in parent:
648 if 'date' in parent:
649 md['submitted'] = parent['date']
649 md['submitted'] = parent['date']
650 if 'started' in header:
650 if 'started' in header:
651 md['started'] = header['started']
651 md['started'] = header['started']
652 if 'date' in header:
652 if 'date' in header:
653 md['completed'] = header['date']
653 md['completed'] = header['date']
654 return md
654 return md
655
655
656 def _register_engine(self, msg):
656 def _register_engine(self, msg):
657 """Register a new engine, and update our connection info."""
657 """Register a new engine, and update our connection info."""
658 content = msg['content']
658 content = msg['content']
659 eid = content['id']
659 eid = content['id']
660 d = {eid : content['queue']}
660 d = {eid : content['queue']}
661 self._update_engines(d)
661 self._update_engines(d)
662
662
663 def _unregister_engine(self, msg):
663 def _unregister_engine(self, msg):
664 """Unregister an engine that has died."""
664 """Unregister an engine that has died."""
665 content = msg['content']
665 content = msg['content']
666 eid = int(content['id'])
666 eid = int(content['id'])
667 if eid in self._ids:
667 if eid in self._ids:
668 self._ids.remove(eid)
668 self._ids.remove(eid)
669 uuid = self._engines.pop(eid)
669 uuid = self._engines.pop(eid)
670
670
671 self._handle_stranded_msgs(eid, uuid)
671 self._handle_stranded_msgs(eid, uuid)
672
672
673 if self._task_socket and self._task_scheme == 'pure':
673 if self._task_socket and self._task_scheme == 'pure':
674 self._stop_scheduling_tasks()
674 self._stop_scheduling_tasks()
675
675
def _handle_stranded_msgs(self, eid, uuid):
    """Handle messages known to be on an engine when the engine unregisters.

    It is possible that this will fire prematurely - that is, an engine will
    go down after completing a result, and the client will be notified
    of the unregistration and later receive the successful result.
    """

    outstanding = self._outstanding_dict[uuid]

    for msg_id in list(outstanding):
        if msg_id in self.results:
            # we already have this result; nothing to fake up
            continue
        try:
            # raise/except so wrap_exception can capture a real traceback
            raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
        except:
            content = error.wrap_exception()
        # build a fake message:
        parent = {}
        header = {}
        parent['msg_id'] = msg_id
        header['engine'] = uuid
        header['date'] = datetime.now()
        msg = dict(parent_header=parent, header=header, content=content)
        # route it through the normal reply path so bookkeeping stays consistent
        self._handle_apply_reply(msg)
702
702
def _handle_execute_reply(self, msg):
    """Save the reply to an execute_request into our results.

    execute messages are never actually used. apply is used instead.
    """

    parent = msg['parent_header']
    msg_id = parent['msg_id']
    if msg_id not in self.outstanding:
        # a reply we were not waiting for: report it, but record it anyway
        if msg_id in self.history:
            print ("got stale result: %s"%msg_id)
        else:
            print ("got unknown result: %s"%msg_id)
    else:
        self.outstanding.remove(msg_id)

    content = msg['content']
    header = msg['header']

    # construct metadata:
    md = self.metadata[msg_id]
    md.update(self._extract_metadata(header, parent, content))
    # is this redundant?
    self.metadata[msg_id] = md

    # drop from the per-engine outstanding set, if present
    e_outstanding = self._outstanding_dict[md['engine_uuid']]
    if msg_id in e_outstanding:
        e_outstanding.remove(msg_id)

    # construct result:
    if content['status'] == 'ok':
        self.results[msg_id] = ExecuteReply(msg_id, content, md)
    elif content['status'] == 'aborted':
        self.results[msg_id] = error.TaskAborted(msg_id)
    elif content['status'] == 'resubmitted':
        # TODO: handle resubmission
        pass
    else:
        # error status: store the unwrapped remote exception
        self.results[msg_id] = self._unwrap_exception(content)
742
742
def _handle_apply_reply(self, msg):
    """Save the reply to an apply_request into our results."""
    parent = msg['parent_header']
    msg_id = parent['msg_id']
    if msg_id not in self.outstanding:
        # a reply we were not waiting for: report it, but record it anyway
        if msg_id in self.history:
            print ("got stale result: %s"%msg_id)
            print self.results[msg_id]
            print msg
        else:
            print ("got unknown result: %s"%msg_id)
    else:
        self.outstanding.remove(msg_id)
    content = msg['content']
    header = msg['header']

    # construct metadata:
    md = self.metadata[msg_id]
    md.update(self._extract_metadata(header, parent, content))
    # is this redundant?
    self.metadata[msg_id] = md

    # drop from the per-engine outstanding set, if present
    e_outstanding = self._outstanding_dict[md['engine_uuid']]
    if msg_id in e_outstanding:
        e_outstanding.remove(msg_id)

    # construct result:
    if content['status'] == 'ok':
        # deserialize the actual return value from the message buffers
        self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
    elif content['status'] == 'aborted':
        self.results[msg_id] = error.TaskAborted(msg_id)
    elif content['status'] == 'resubmitted':
        # TODO: handle resubmission
        pass
    else:
        # error status: store the unwrapped remote exception
        self.results[msg_id] = self._unwrap_exception(content)
779
779
def _flush_notifications(self):
    """Flush notifications of engine registrations waiting
    in ZMQ queue.

    Non-blocking: drains every pending message on the notification socket
    and dispatches each through self._notification_handlers.
    Raises Exception on a message type with no registered handler.
    """
    idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        msg_type = msg['header']['msg_type']
        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            # FIX: msg is a plain dict here (indexed as msg['header'] above),
            # so the old `msg.msg_type` attribute access raised AttributeError
            # and masked this intended error message.
            raise Exception("Unhandled message type: %s" % msg_type)
        else:
            handler(msg)
        idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
794
794
def _flush_results(self, sock):
    """Flush task or queue results waiting in ZMQ queue.

    Non-blocking: drains every pending reply on `sock` and dispatches each
    through self._queue_handlers.  Raises Exception on a message type with
    no registered handler.
    """
    idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        msg_type = msg['header']['msg_type']
        handler = self._queue_handlers.get(msg_type, None)
        if handler is None:
            # FIX: msg is a plain dict here (indexed as msg['header'] above),
            # so the old `msg.msg_type` attribute access raised AttributeError
            # and masked this intended error message.
            raise Exception("Unhandled message type: %s" % msg_type)
        else:
            handler(msg)
        idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
808
808
def _flush_control(self, sock):
    """Flush replies from the control channel waiting
    in the ZMQ queue.

    Currently: ignore them."""
    if self._ignored_control_replies <= 0:
        # nothing outstanding to discard
        return
    while True:
        idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        if msg is None:
            break
        self._ignored_control_replies -= 1
        if self.debug:
            pprint(msg)
822
822
823 def _flush_ignored_control(self):
823 def _flush_ignored_control(self):
824 """flush ignored control replies"""
824 """flush ignored control replies"""
825 while self._ignored_control_replies > 0:
825 while self._ignored_control_replies > 0:
826 self.session.recv(self._control_socket)
826 self.session.recv(self._control_socket)
827 self._ignored_control_replies -= 1
827 self._ignored_control_replies -= 1
828
828
def _flush_ignored_hub_replies(self):
    """Drain and discard any Hub replies we previously chose to ignore."""
    while True:
        ident, msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
        if msg is None:
            break
833
833
def _flush_iopub(self, sock):
    """Flush replies from the iopub channel waiting
    in the ZMQ queue.

    Non-blocking: drains every pending IOPub message on `sock` and folds
    its content into self.metadata[msg_id].
    """
    # FIX: recv at the top of the loop.  The old shape recv'd at the *end*
    # of the body, so the `continue` for parent-less messages skipped the
    # recv and spun forever on the same message.
    while True:
        idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        if msg is None:
            # queue drained
            break
        if self.debug:
            pprint(msg)
        parent = msg['parent_header']
        # ignore IOPub messages with no parent.
        # Caused by print statements or warnings from before the first execution.
        if not parent:
            continue
        msg_id = parent['msg_id']
        content = msg['content']
        header = msg['header']
        msg_type = msg['header']['msg_type']

        # init metadata:
        md = self.metadata[msg_id]

        if msg_type == 'stream':
            # append stream data to any already-captured output
            name = content['name']
            s = md[name] or ''
            md[name] = s + content['data']
        elif msg_type == 'pyerr':
            md.update({'pyerr' : self._unwrap_exception(content)})
        elif msg_type == 'pyin':
            md.update({'pyin' : content['code']})
        elif msg_type == 'display_data':
            md['outputs'].append(content)
        elif msg_type == 'pyout':
            md['pyout'] = content
        else:
            # unhandled msg_type (status, etc.)
            pass

        # reduntant?
        self.metadata[msg_id] = md
875
875
876 #--------------------------------------------------------------------------
876 #--------------------------------------------------------------------------
877 # len, getitem
877 # len, getitem
878 #--------------------------------------------------------------------------
878 #--------------------------------------------------------------------------
879
879
880 def __len__(self):
880 def __len__(self):
881 """len(client) returns # of engines."""
881 """len(client) returns # of engines."""
882 return len(self.ids)
882 return len(self.ids)
883
883
884 def __getitem__(self, key):
884 def __getitem__(self, key):
885 """index access returns DirectView multiplexer objects
885 """index access returns DirectView multiplexer objects
886
886
887 Must be int, slice, or list/tuple/xrange of ints"""
887 Must be int, slice, or list/tuple/xrange of ints"""
888 if not isinstance(key, (int, slice, tuple, list, xrange)):
888 if not isinstance(key, (int, slice, tuple, list, xrange)):
889 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
889 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
890 else:
890 else:
891 return self.direct_view(key)
891 return self.direct_view(key)
892
892
893 #--------------------------------------------------------------------------
893 #--------------------------------------------------------------------------
894 # Begin public methods
894 # Begin public methods
895 #--------------------------------------------------------------------------
895 #--------------------------------------------------------------------------
896
896
897 @property
897 @property
898 def ids(self):
898 def ids(self):
899 """Always up-to-date ids property."""
899 """Always up-to-date ids property."""
900 self._flush_notifications()
900 self._flush_notifications()
901 # always copy:
901 # always copy:
902 return list(self._ids)
902 return list(self._ids)
903
903
904 def close(self):
904 def close(self):
905 if self._closed:
905 if self._closed:
906 return
906 return
907 self.stop_spin_thread()
907 self.stop_spin_thread()
908 snames = filter(lambda n: n.endswith('socket'), dir(self))
908 snames = filter(lambda n: n.endswith('socket'), dir(self))
909 for socket in map(lambda name: getattr(self, name), snames):
909 for socket in map(lambda name: getattr(self, name), snames):
910 if isinstance(socket, zmq.Socket) and not socket.closed:
910 if isinstance(socket, zmq.Socket) and not socket.closed:
911 socket.close()
911 socket.close()
912 self._closed = True
912 self._closed = True
913
913
914 def _spin_every(self, interval=1):
914 def _spin_every(self, interval=1):
915 """target func for use in spin_thread"""
915 """target func for use in spin_thread"""
916 while True:
916 while True:
917 if self._stop_spinning.is_set():
917 if self._stop_spinning.is_set():
918 return
918 return
919 time.sleep(interval)
919 time.sleep(interval)
920 self.spin()
920 self.spin()
921
921
922 def spin_thread(self, interval=1):
922 def spin_thread(self, interval=1):
923 """call Client.spin() in a background thread on some regular interval
923 """call Client.spin() in a background thread on some regular interval
924
924
925 This helps ensure that messages don't pile up too much in the zmq queue
925 This helps ensure that messages don't pile up too much in the zmq queue
926 while you are working on other things, or just leaving an idle terminal.
926 while you are working on other things, or just leaving an idle terminal.
927
927
928 It also helps limit potential padding of the `received` timestamp
928 It also helps limit potential padding of the `received` timestamp
929 on AsyncResult objects, used for timings.
929 on AsyncResult objects, used for timings.
930
930
931 Parameters
931 Parameters
932 ----------
932 ----------
933
933
934 interval : float, optional
934 interval : float, optional
935 The interval on which to spin the client in the background thread
935 The interval on which to spin the client in the background thread
936 (simply passed to time.sleep).
936 (simply passed to time.sleep).
937
937
938 Notes
938 Notes
939 -----
939 -----
940
940
941 For precision timing, you may want to use this method to put a bound
941 For precision timing, you may want to use this method to put a bound
942 on the jitter (in seconds) in `received` timestamps used
942 on the jitter (in seconds) in `received` timestamps used
943 in AsyncResult.wall_time.
943 in AsyncResult.wall_time.
944
944
945 """
945 """
946 if self._spin_thread is not None:
946 if self._spin_thread is not None:
947 self.stop_spin_thread()
947 self.stop_spin_thread()
948 self._stop_spinning.clear()
948 self._stop_spinning.clear()
949 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
949 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
950 self._spin_thread.daemon = True
950 self._spin_thread.daemon = True
951 self._spin_thread.start()
951 self._spin_thread.start()
952
952
953 def stop_spin_thread(self):
953 def stop_spin_thread(self):
954 """stop background spin_thread, if any"""
954 """stop background spin_thread, if any"""
955 if self._spin_thread is not None:
955 if self._spin_thread is not None:
956 self._stop_spinning.set()
956 self._stop_spinning.set()
957 self._spin_thread.join()
957 self._spin_thread.join()
958 self._spin_thread = None
958 self._spin_thread = None
959
959
960 def spin(self):
960 def spin(self):
961 """Flush any registration notifications and execution results
961 """Flush any registration notifications and execution results
962 waiting in the ZMQ queue.
962 waiting in the ZMQ queue.
963 """
963 """
964 if self._notification_socket:
964 if self._notification_socket:
965 self._flush_notifications()
965 self._flush_notifications()
966 if self._iopub_socket:
966 if self._iopub_socket:
967 self._flush_iopub(self._iopub_socket)
967 self._flush_iopub(self._iopub_socket)
968 if self._mux_socket:
968 if self._mux_socket:
969 self._flush_results(self._mux_socket)
969 self._flush_results(self._mux_socket)
970 if self._task_socket:
970 if self._task_socket:
971 self._flush_results(self._task_socket)
971 self._flush_results(self._task_socket)
972 if self._control_socket:
972 if self._control_socket:
973 self._flush_control(self._control_socket)
973 self._flush_control(self._control_socket)
974 if self._query_socket:
974 if self._query_socket:
975 self._flush_ignored_hub_replies()
975 self._flush_ignored_hub_replies()
976
976
977 def wait(self, jobs=None, timeout=-1):
977 def wait(self, jobs=None, timeout=-1):
978 """waits on one or more `jobs`, for up to `timeout` seconds.
978 """waits on one or more `jobs`, for up to `timeout` seconds.
979
979
980 Parameters
980 Parameters
981 ----------
981 ----------
982
982
983 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
983 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
984 ints are indices to self.history
984 ints are indices to self.history
985 strs are msg_ids
985 strs are msg_ids
986 default: wait on all outstanding messages
986 default: wait on all outstanding messages
987 timeout : float
987 timeout : float
988 a time in seconds, after which to give up.
988 a time in seconds, after which to give up.
989 default is -1, which means no timeout
989 default is -1, which means no timeout
990
990
991 Returns
991 Returns
992 -------
992 -------
993
993
994 True : when all msg_ids are done
994 True : when all msg_ids are done
995 False : timeout reached, some msg_ids still outstanding
995 False : timeout reached, some msg_ids still outstanding
996 """
996 """
997 tic = time.time()
997 tic = time.time()
998 if jobs is None:
998 if jobs is None:
999 theids = self.outstanding
999 theids = self.outstanding
1000 else:
1000 else:
1001 if isinstance(jobs, (int, basestring, AsyncResult)):
1001 if isinstance(jobs, (int, basestring, AsyncResult)):
1002 jobs = [jobs]
1002 jobs = [jobs]
1003 theids = set()
1003 theids = set()
1004 for job in jobs:
1004 for job in jobs:
1005 if isinstance(job, int):
1005 if isinstance(job, int):
1006 # index access
1006 # index access
1007 job = self.history[job]
1007 job = self.history[job]
1008 elif isinstance(job, AsyncResult):
1008 elif isinstance(job, AsyncResult):
1009 map(theids.add, job.msg_ids)
1009 map(theids.add, job.msg_ids)
1010 continue
1010 continue
1011 theids.add(job)
1011 theids.add(job)
1012 if not theids.intersection(self.outstanding):
1012 if not theids.intersection(self.outstanding):
1013 return True
1013 return True
1014 self.spin()
1014 self.spin()
1015 while theids.intersection(self.outstanding):
1015 while theids.intersection(self.outstanding):
1016 if timeout >= 0 and ( time.time()-tic ) > timeout:
1016 if timeout >= 0 and ( time.time()-tic ) > timeout:
1017 break
1017 break
1018 time.sleep(1e-3)
1018 time.sleep(1e-3)
1019 self.spin()
1019 self.spin()
1020 return len(theids.intersection(self.outstanding)) == 0
1020 return len(theids.intersection(self.outstanding)) == 0
1021
1021
1022 #--------------------------------------------------------------------------
1022 #--------------------------------------------------------------------------
1023 # Control methods
1023 # Control methods
1024 #--------------------------------------------------------------------------
1024 #--------------------------------------------------------------------------
1025
1025
1026 @spin_first
1026 @spin_first
1027 def clear(self, targets=None, block=None):
1027 def clear(self, targets=None, block=None):
1028 """Clear the namespace in target(s)."""
1028 """Clear the namespace in target(s)."""
1029 block = self.block if block is None else block
1029 block = self.block if block is None else block
1030 targets = self._build_targets(targets)[0]
1030 targets = self._build_targets(targets)[0]
1031 for t in targets:
1031 for t in targets:
1032 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1032 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1033 error = False
1033 error = False
1034 if block:
1034 if block:
1035 self._flush_ignored_control()
1035 self._flush_ignored_control()
1036 for i in range(len(targets)):
1036 for i in range(len(targets)):
1037 idents,msg = self.session.recv(self._control_socket,0)
1037 idents,msg = self.session.recv(self._control_socket,0)
1038 if self.debug:
1038 if self.debug:
1039 pprint(msg)
1039 pprint(msg)
1040 if msg['content']['status'] != 'ok':
1040 if msg['content']['status'] != 'ok':
1041 error = self._unwrap_exception(msg['content'])
1041 error = self._unwrap_exception(msg['content'])
1042 else:
1042 else:
1043 self._ignored_control_replies += len(targets)
1043 self._ignored_control_replies += len(targets)
1044 if error:
1044 if error:
1045 raise error
1045 raise error
1046
1046
1047
1047
1048 @spin_first
1048 @spin_first
1049 def abort(self, jobs=None, targets=None, block=None):
1049 def abort(self, jobs=None, targets=None, block=None):
1050 """Abort specific jobs from the execution queues of target(s).
1050 """Abort specific jobs from the execution queues of target(s).
1051
1051
1052 This is a mechanism to prevent jobs that have already been submitted
1052 This is a mechanism to prevent jobs that have already been submitted
1053 from executing.
1053 from executing.
1054
1054
1055 Parameters
1055 Parameters
1056 ----------
1056 ----------
1057
1057
1058 jobs : msg_id, list of msg_ids, or AsyncResult
1058 jobs : msg_id, list of msg_ids, or AsyncResult
1059 The jobs to be aborted
1059 The jobs to be aborted
1060
1060
1061 If unspecified/None: abort all outstanding jobs.
1061 If unspecified/None: abort all outstanding jobs.
1062
1062
1063 """
1063 """
1064 block = self.block if block is None else block
1064 block = self.block if block is None else block
1065 jobs = jobs if jobs is not None else list(self.outstanding)
1065 jobs = jobs if jobs is not None else list(self.outstanding)
1066 targets = self._build_targets(targets)[0]
1066 targets = self._build_targets(targets)[0]
1067
1067
1068 msg_ids = []
1068 msg_ids = []
1069 if isinstance(jobs, (basestring,AsyncResult)):
1069 if isinstance(jobs, (basestring,AsyncResult)):
1070 jobs = [jobs]
1070 jobs = [jobs]
1071 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1071 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1072 if bad_ids:
1072 if bad_ids:
1073 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1073 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1074 for j in jobs:
1074 for j in jobs:
1075 if isinstance(j, AsyncResult):
1075 if isinstance(j, AsyncResult):
1076 msg_ids.extend(j.msg_ids)
1076 msg_ids.extend(j.msg_ids)
1077 else:
1077 else:
1078 msg_ids.append(j)
1078 msg_ids.append(j)
1079 content = dict(msg_ids=msg_ids)
1079 content = dict(msg_ids=msg_ids)
1080 for t in targets:
1080 for t in targets:
1081 self.session.send(self._control_socket, 'abort_request',
1081 self.session.send(self._control_socket, 'abort_request',
1082 content=content, ident=t)
1082 content=content, ident=t)
1083 error = False
1083 error = False
1084 if block:
1084 if block:
1085 self._flush_ignored_control()
1085 self._flush_ignored_control()
1086 for i in range(len(targets)):
1086 for i in range(len(targets)):
1087 idents,msg = self.session.recv(self._control_socket,0)
1087 idents,msg = self.session.recv(self._control_socket,0)
1088 if self.debug:
1088 if self.debug:
1089 pprint(msg)
1089 pprint(msg)
1090 if msg['content']['status'] != 'ok':
1090 if msg['content']['status'] != 'ok':
1091 error = self._unwrap_exception(msg['content'])
1091 error = self._unwrap_exception(msg['content'])
1092 else:
1092 else:
1093 self._ignored_control_replies += len(targets)
1093 self._ignored_control_replies += len(targets)
1094 if error:
1094 if error:
1095 raise error
1095 raise error
1096
1096
1097 @spin_first
1097 @spin_first
1098 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1098 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1099 """Terminates one or more engine processes, optionally including the hub."""
1099 """Terminates one or more engine processes, optionally including the hub."""
1100 block = self.block if block is None else block
1100 block = self.block if block is None else block
1101 if hub:
1101 if hub:
1102 targets = 'all'
1102 targets = 'all'
1103 targets = self._build_targets(targets)[0]
1103 targets = self._build_targets(targets)[0]
1104 for t in targets:
1104 for t in targets:
1105 self.session.send(self._control_socket, 'shutdown_request',
1105 self.session.send(self._control_socket, 'shutdown_request',
1106 content={'restart':restart},ident=t)
1106 content={'restart':restart},ident=t)
1107 error = False
1107 error = False
1108 if block or hub:
1108 if block or hub:
1109 self._flush_ignored_control()
1109 self._flush_ignored_control()
1110 for i in range(len(targets)):
1110 for i in range(len(targets)):
1111 idents,msg = self.session.recv(self._control_socket, 0)
1111 idents,msg = self.session.recv(self._control_socket, 0)
1112 if self.debug:
1112 if self.debug:
1113 pprint(msg)
1113 pprint(msg)
1114 if msg['content']['status'] != 'ok':
1114 if msg['content']['status'] != 'ok':
1115 error = self._unwrap_exception(msg['content'])
1115 error = self._unwrap_exception(msg['content'])
1116 else:
1116 else:
1117 self._ignored_control_replies += len(targets)
1117 self._ignored_control_replies += len(targets)
1118
1118
1119 if hub:
1119 if hub:
1120 time.sleep(0.25)
1120 time.sleep(0.25)
1121 self.session.send(self._query_socket, 'shutdown_request')
1121 self.session.send(self._query_socket, 'shutdown_request')
1122 idents,msg = self.session.recv(self._query_socket, 0)
1122 idents,msg = self.session.recv(self._query_socket, 0)
1123 if self.debug:
1123 if self.debug:
1124 pprint(msg)
1124 pprint(msg)
1125 if msg['content']['status'] != 'ok':
1125 if msg['content']['status'] != 'ok':
1126 error = self._unwrap_exception(msg['content'])
1126 error = self._unwrap_exception(msg['content'])
1127
1127
1128 if error:
1128 if error:
1129 raise error
1129 raise error
1130
1130
1131 #--------------------------------------------------------------------------
1131 #--------------------------------------------------------------------------
1132 # Execution related methods
1132 # Execution related methods
1133 #--------------------------------------------------------------------------
1133 #--------------------------------------------------------------------------
1134
1134
1135 def _maybe_raise(self, result):
1135 def _maybe_raise(self, result):
1136 """wrapper for maybe raising an exception if apply failed."""
1136 """wrapper for maybe raising an exception if apply failed."""
1137 if isinstance(result, error.RemoteError):
1137 if isinstance(result, error.RemoteError):
1138 raise result
1138 raise result
1139
1139
1140 return result
1140 return result
1141
1141
1142 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1142 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1143 ident=None):
1143 ident=None):
1144 """construct and send an apply message via a socket.
1144 """construct and send an apply message via a socket.
1145
1145
1146 This is the principal method with which all engine execution is performed by views.
1146 This is the principal method with which all engine execution is performed by views.
1147 """
1147 """
1148
1148
1149 if self._closed:
1149 if self._closed:
1150 raise RuntimeError("Client cannot be used after its sockets have been closed")
1150 raise RuntimeError("Client cannot be used after its sockets have been closed")
1151
1151
1152 # defaults:
1152 # defaults:
1153 args = args if args is not None else []
1153 args = args if args is not None else []
1154 kwargs = kwargs if kwargs is not None else {}
1154 kwargs = kwargs if kwargs is not None else {}
1155 subheader = subheader if subheader is not None else {}
1155 subheader = subheader if subheader is not None else {}
1156
1156
1157 # validate arguments
1157 # validate arguments
1158 if not callable(f) and not isinstance(f, Reference):
1158 if not callable(f) and not isinstance(f, Reference):
1159 raise TypeError("f must be callable, not %s"%type(f))
1159 raise TypeError("f must be callable, not %s"%type(f))
1160 if not isinstance(args, (tuple, list)):
1160 if not isinstance(args, (tuple, list)):
1161 raise TypeError("args must be tuple or list, not %s"%type(args))
1161 raise TypeError("args must be tuple or list, not %s"%type(args))
1162 if not isinstance(kwargs, dict):
1162 if not isinstance(kwargs, dict):
1163 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1163 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1164 if not isinstance(subheader, dict):
1164 if not isinstance(subheader, dict):
1165 raise TypeError("subheader must be dict, not %s"%type(subheader))
1165 raise TypeError("subheader must be dict, not %s"%type(subheader))
1166
1166
1167 bufs = util.pack_apply_message(f,args,kwargs)
1167 bufs = util.pack_apply_message(f,args,kwargs)
1168
1168
1169 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1169 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1170 subheader=subheader, track=track)
1170 subheader=subheader, track=track)
1171
1171
1172 msg_id = msg['header']['msg_id']
1172 msg_id = msg['header']['msg_id']
1173 self.outstanding.add(msg_id)
1173 self.outstanding.add(msg_id)
1174 if ident:
1174 if ident:
1175 # possibly routed to a specific engine
1175 # possibly routed to a specific engine
1176 if isinstance(ident, list):
1176 if isinstance(ident, list):
1177 ident = ident[-1]
1177 ident = ident[-1]
1178 if ident in self._engines.values():
1178 if ident in self._engines.values():
1179 # save for later, in case of engine death
1179 # save for later, in case of engine death
1180 self._outstanding_dict[ident].add(msg_id)
1180 self._outstanding_dict[ident].add(msg_id)
1181 self.history.append(msg_id)
1181 self.history.append(msg_id)
1182 self.metadata[msg_id]['submitted'] = datetime.now()
1182 self.metadata[msg_id]['submitted'] = datetime.now()
1183
1183
1184 return msg
1184 return msg
1185
1185
1186 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1186 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1187 """construct and send an execute request via a socket.
1187 """construct and send an execute request via a socket.
1188
1188
1189 """
1189 """
1190
1190
1191 if self._closed:
1191 if self._closed:
1192 raise RuntimeError("Client cannot be used after its sockets have been closed")
1192 raise RuntimeError("Client cannot be used after its sockets have been closed")
1193
1193
1194 # defaults:
1194 # defaults:
1195 subheader = subheader if subheader is not None else {}
1195 subheader = subheader if subheader is not None else {}
1196
1196
1197 # validate arguments
1197 # validate arguments
1198 if not isinstance(code, basestring):
1198 if not isinstance(code, basestring):
1199 raise TypeError("code must be text, not %s" % type(code))
1199 raise TypeError("code must be text, not %s" % type(code))
1200 if not isinstance(subheader, dict):
1200 if not isinstance(subheader, dict):
1201 raise TypeError("subheader must be dict, not %s" % type(subheader))
1201 raise TypeError("subheader must be dict, not %s" % type(subheader))
1202
1202
1203 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1203 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1204
1204
1205
1205
1206 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1206 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1207 subheader=subheader)
1207 subheader=subheader)
1208
1208
1209 msg_id = msg['header']['msg_id']
1209 msg_id = msg['header']['msg_id']
1210 self.outstanding.add(msg_id)
1210 self.outstanding.add(msg_id)
1211 if ident:
1211 if ident:
1212 # possibly routed to a specific engine
1212 # possibly routed to a specific engine
1213 if isinstance(ident, list):
1213 if isinstance(ident, list):
1214 ident = ident[-1]
1214 ident = ident[-1]
1215 if ident in self._engines.values():
1215 if ident in self._engines.values():
1216 # save for later, in case of engine death
1216 # save for later, in case of engine death
1217 self._outstanding_dict[ident].add(msg_id)
1217 self._outstanding_dict[ident].add(msg_id)
1218 self.history.append(msg_id)
1218 self.history.append(msg_id)
1219 self.metadata[msg_id]['submitted'] = datetime.now()
1219 self.metadata[msg_id]['submitted'] = datetime.now()
1220
1220
1221 return msg
1221 return msg
1222
1222
1223 #--------------------------------------------------------------------------
1223 #--------------------------------------------------------------------------
1224 # construct a View object
1224 # construct a View object
1225 #--------------------------------------------------------------------------
1225 #--------------------------------------------------------------------------
1226
1226
1227 def load_balanced_view(self, targets=None):
1227 def load_balanced_view(self, targets=None):
1228 """construct a DirectView object.
1228 """construct a DirectView object.
1229
1229
1230 If no arguments are specified, create a LoadBalancedView
1230 If no arguments are specified, create a LoadBalancedView
1231 using all engines.
1231 using all engines.
1232
1232
1233 Parameters
1233 Parameters
1234 ----------
1234 ----------
1235
1235
1236 targets: list,slice,int,etc. [default: use all engines]
1236 targets: list,slice,int,etc. [default: use all engines]
1237 The subset of engines across which to load-balance
1237 The subset of engines across which to load-balance
1238 """
1238 """
1239 if targets == 'all':
1239 if targets == 'all':
1240 targets = None
1240 targets = None
1241 if targets is not None:
1241 if targets is not None:
1242 targets = self._build_targets(targets)[1]
1242 targets = self._build_targets(targets)[1]
1243 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1243 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1244
1244
1245 def direct_view(self, targets='all'):
1245 def direct_view(self, targets='all'):
1246 """construct a DirectView object.
1246 """construct a DirectView object.
1247
1247
1248 If no targets are specified, create a DirectView using all engines.
1248 If no targets are specified, create a DirectView using all engines.
1249
1249
1250 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1250 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1251 evaluate the target engines at each execution, whereas rc[:] will connect to
1251 evaluate the target engines at each execution, whereas rc[:] will connect to
1252 all *current* engines, and that list will not change.
1252 all *current* engines, and that list will not change.
1253
1253
1254 That is, 'all' will always use all engines, whereas rc[:] will not use
1254 That is, 'all' will always use all engines, whereas rc[:] will not use
1255 engines added after the DirectView is constructed.
1255 engines added after the DirectView is constructed.
1256
1256
1257 Parameters
1257 Parameters
1258 ----------
1258 ----------
1259
1259
1260 targets: list,slice,int,etc. [default: use all engines]
1260 targets: list,slice,int,etc. [default: use all engines]
1261 The engines to use for the View
1261 The engines to use for the View
1262 """
1262 """
1263 single = isinstance(targets, int)
1263 single = isinstance(targets, int)
1264 # allow 'all' to be lazily evaluated at each execution
1264 # allow 'all' to be lazily evaluated at each execution
1265 if targets != 'all':
1265 if targets != 'all':
1266 targets = self._build_targets(targets)[1]
1266 targets = self._build_targets(targets)[1]
1267 if single:
1267 if single:
1268 targets = targets[0]
1268 targets = targets[0]
1269 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1269 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1270
1270
1271 #--------------------------------------------------------------------------
1271 #--------------------------------------------------------------------------
1272 # Query methods
1272 # Query methods
1273 #--------------------------------------------------------------------------
1273 #--------------------------------------------------------------------------
1274
1274
1275 @spin_first
1275 @spin_first
1276 def get_result(self, indices_or_msg_ids=None, block=None):
1276 def get_result(self, indices_or_msg_ids=None, block=None):
1277 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1277 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1278
1278
1279 If the client already has the results, no request to the Hub will be made.
1279 If the client already has the results, no request to the Hub will be made.
1280
1280
1281 This is a convenient way to construct AsyncResult objects, which are wrappers
1281 This is a convenient way to construct AsyncResult objects, which are wrappers
1282 that include metadata about execution, and allow for awaiting results that
1282 that include metadata about execution, and allow for awaiting results that
1283 were not submitted by this Client.
1283 were not submitted by this Client.
1284
1284
1285 It can also be a convenient way to retrieve the metadata associated with
1285 It can also be a convenient way to retrieve the metadata associated with
1286 blocking execution, since it always retrieves
1286 blocking execution, since it always retrieves
1287
1287
1288 Examples
1288 Examples
1289 --------
1289 --------
1290 ::
1290 ::
1291
1291
1292 In [10]: r = client.apply()
1292 In [10]: r = client.apply()
1293
1293
1294 Parameters
1294 Parameters
1295 ----------
1295 ----------
1296
1296
1297 indices_or_msg_ids : integer history index, str msg_id, or list of either
1297 indices_or_msg_ids : integer history index, str msg_id, or list of either
1298 The indices or msg_ids of indices to be retrieved
1298 The indices or msg_ids of indices to be retrieved
1299
1299
1300 block : bool
1300 block : bool
1301 Whether to wait for the result to be done
1301 Whether to wait for the result to be done
1302
1302
1303 Returns
1303 Returns
1304 -------
1304 -------
1305
1305
1306 AsyncResult
1306 AsyncResult
1307 A single AsyncResult object will always be returned.
1307 A single AsyncResult object will always be returned.
1308
1308
1309 AsyncHubResult
1309 AsyncHubResult
1310 A subclass of AsyncResult that retrieves results from the Hub
1310 A subclass of AsyncResult that retrieves results from the Hub
1311
1311
1312 """
1312 """
1313 block = self.block if block is None else block
1313 block = self.block if block is None else block
1314 if indices_or_msg_ids is None:
1314 if indices_or_msg_ids is None:
1315 indices_or_msg_ids = -1
1315 indices_or_msg_ids = -1
1316
1316
1317 if not isinstance(indices_or_msg_ids, (list,tuple)):
1317 if not isinstance(indices_or_msg_ids, (list,tuple)):
1318 indices_or_msg_ids = [indices_or_msg_ids]
1318 indices_or_msg_ids = [indices_or_msg_ids]
1319
1319
1320 theids = []
1320 theids = []
1321 for id in indices_or_msg_ids:
1321 for id in indices_or_msg_ids:
1322 if isinstance(id, int):
1322 if isinstance(id, int):
1323 id = self.history[id]
1323 id = self.history[id]
1324 if not isinstance(id, basestring):
1324 if not isinstance(id, basestring):
1325 raise TypeError("indices must be str or int, not %r"%id)
1325 raise TypeError("indices must be str or int, not %r"%id)
1326 theids.append(id)
1326 theids.append(id)
1327
1327
1328 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1328 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1329 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1329 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1330
1330
1331 if remote_ids:
1331 if remote_ids:
1332 ar = AsyncHubResult(self, msg_ids=theids)
1332 ar = AsyncHubResult(self, msg_ids=theids)
1333 else:
1333 else:
1334 ar = AsyncResult(self, msg_ids=theids)
1334 ar = AsyncResult(self, msg_ids=theids)
1335
1335
1336 if block:
1336 if block:
1337 ar.wait()
1337 ar.wait()
1338
1338
1339 return ar
1339 return ar
1340
1340
1341 @spin_first
1341 @spin_first
1342 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1342 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1343 """Resubmit one or more tasks.
1343 """Resubmit one or more tasks.
1344
1344
1345 in-flight tasks may not be resubmitted.
1345 in-flight tasks may not be resubmitted.
1346
1346
1347 Parameters
1347 Parameters
1348 ----------
1348 ----------
1349
1349
1350 indices_or_msg_ids : integer history index, str msg_id, or list of either
1350 indices_or_msg_ids : integer history index, str msg_id, or list of either
1351 The indices or msg_ids of indices to be retrieved
1351 The indices or msg_ids of indices to be retrieved
1352
1352
1353 block : bool
1353 block : bool
1354 Whether to wait for the result to be done
1354 Whether to wait for the result to be done
1355
1355
1356 Returns
1356 Returns
1357 -------
1357 -------
1358
1358
1359 AsyncHubResult
1359 AsyncHubResult
1360 A subclass of AsyncResult that retrieves results from the Hub
1360 A subclass of AsyncResult that retrieves results from the Hub
1361
1361
1362 """
1362 """
1363 block = self.block if block is None else block
1363 block = self.block if block is None else block
1364 if indices_or_msg_ids is None:
1364 if indices_or_msg_ids is None:
1365 indices_or_msg_ids = -1
1365 indices_or_msg_ids = -1
1366
1366
1367 if not isinstance(indices_or_msg_ids, (list,tuple)):
1367 if not isinstance(indices_or_msg_ids, (list,tuple)):
1368 indices_or_msg_ids = [indices_or_msg_ids]
1368 indices_or_msg_ids = [indices_or_msg_ids]
1369
1369
1370 theids = []
1370 theids = []
1371 for id in indices_or_msg_ids:
1371 for id in indices_or_msg_ids:
1372 if isinstance(id, int):
1372 if isinstance(id, int):
1373 id = self.history[id]
1373 id = self.history[id]
1374 if not isinstance(id, basestring):
1374 if not isinstance(id, basestring):
1375 raise TypeError("indices must be str or int, not %r"%id)
1375 raise TypeError("indices must be str or int, not %r"%id)
1376 theids.append(id)
1376 theids.append(id)
1377
1377
1378 content = dict(msg_ids = theids)
1378 content = dict(msg_ids = theids)
1379
1379
1380 self.session.send(self._query_socket, 'resubmit_request', content)
1380 self.session.send(self._query_socket, 'resubmit_request', content)
1381
1381
1382 zmq.select([self._query_socket], [], [])
1382 zmq.select([self._query_socket], [], [])
1383 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1383 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1384 if self.debug:
1384 if self.debug:
1385 pprint(msg)
1385 pprint(msg)
1386 content = msg['content']
1386 content = msg['content']
1387 if content['status'] != 'ok':
1387 if content['status'] != 'ok':
1388 raise self._unwrap_exception(content)
1388 raise self._unwrap_exception(content)
1389 mapping = content['resubmitted']
1389 mapping = content['resubmitted']
1390 new_ids = [ mapping[msg_id] for msg_id in theids ]
1390 new_ids = [ mapping[msg_id] for msg_id in theids ]
1391
1391
1392 ar = AsyncHubResult(self, msg_ids=new_ids)
1392 ar = AsyncHubResult(self, msg_ids=new_ids)
1393
1393
1394 if block:
1394 if block:
1395 ar.wait()
1395 ar.wait()
1396
1396
1397 return ar
1397 return ar
1398
1398
1399 @spin_first
1399 @spin_first
1400 def result_status(self, msg_ids, status_only=True):
1400 def result_status(self, msg_ids, status_only=True):
1401 """Check on the status of the result(s) of the apply request with `msg_ids`.
1401 """Check on the status of the result(s) of the apply request with `msg_ids`.
1402
1402
1403 If status_only is False, then the actual results will be retrieved, else
1403 If status_only is False, then the actual results will be retrieved, else
1404 only the status of the results will be checked.
1404 only the status of the results will be checked.
1405
1405
1406 Parameters
1406 Parameters
1407 ----------
1407 ----------
1408
1408
1409 msg_ids : list of msg_ids
1409 msg_ids : list of msg_ids
1410 if int:
1410 if int:
1411 Passed as index to self.history for convenience.
1411 Passed as index to self.history for convenience.
1412 status_only : bool (default: True)
1412 status_only : bool (default: True)
1413 if False:
1413 if False:
1414 Retrieve the actual results of completed tasks.
1414 Retrieve the actual results of completed tasks.
1415
1415
1416 Returns
1416 Returns
1417 -------
1417 -------
1418
1418
1419 results : dict
1419 results : dict
1420 There will always be the keys 'pending' and 'completed', which will
1420 There will always be the keys 'pending' and 'completed', which will
1421 be lists of msg_ids that are incomplete or complete. If `status_only`
1421 be lists of msg_ids that are incomplete or complete. If `status_only`
1422 is False, then completed results will be keyed by their `msg_id`.
1422 is False, then completed results will be keyed by their `msg_id`.
1423 """
1423 """
1424 if not isinstance(msg_ids, (list,tuple)):
1424 if not isinstance(msg_ids, (list,tuple)):
1425 msg_ids = [msg_ids]
1425 msg_ids = [msg_ids]
1426
1426
1427 theids = []
1427 theids = []
1428 for msg_id in msg_ids:
1428 for msg_id in msg_ids:
1429 if isinstance(msg_id, int):
1429 if isinstance(msg_id, int):
1430 msg_id = self.history[msg_id]
1430 msg_id = self.history[msg_id]
1431 if not isinstance(msg_id, basestring):
1431 if not isinstance(msg_id, basestring):
1432 raise TypeError("msg_ids must be str, not %r"%msg_id)
1432 raise TypeError("msg_ids must be str, not %r"%msg_id)
1433 theids.append(msg_id)
1433 theids.append(msg_id)
1434
1434
1435 completed = []
1435 completed = []
1436 local_results = {}
1436 local_results = {}
1437
1437
1438 # comment this block out to temporarily disable local shortcut:
1438 # comment this block out to temporarily disable local shortcut:
1439 for msg_id in theids:
1439 for msg_id in theids:
1440 if msg_id in self.results:
1440 if msg_id in self.results:
1441 completed.append(msg_id)
1441 completed.append(msg_id)
1442 local_results[msg_id] = self.results[msg_id]
1442 local_results[msg_id] = self.results[msg_id]
1443 theids.remove(msg_id)
1443 theids.remove(msg_id)
1444
1444
1445 if theids: # some not locally cached
1445 if theids: # some not locally cached
1446 content = dict(msg_ids=theids, status_only=status_only)
1446 content = dict(msg_ids=theids, status_only=status_only)
1447 msg = self.session.send(self._query_socket, "result_request", content=content)
1447 msg = self.session.send(self._query_socket, "result_request", content=content)
1448 zmq.select([self._query_socket], [], [])
1448 zmq.select([self._query_socket], [], [])
1449 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1449 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1450 if self.debug:
1450 if self.debug:
1451 pprint(msg)
1451 pprint(msg)
1452 content = msg['content']
1452 content = msg['content']
1453 if content['status'] != 'ok':
1453 if content['status'] != 'ok':
1454 raise self._unwrap_exception(content)
1454 raise self._unwrap_exception(content)
1455 buffers = msg['buffers']
1455 buffers = msg['buffers']
1456 else:
1456 else:
1457 content = dict(completed=[],pending=[])
1457 content = dict(completed=[],pending=[])
1458
1458
1459 content['completed'].extend(completed)
1459 content['completed'].extend(completed)
1460
1460
1461 if status_only:
1461 if status_only:
1462 return content
1462 return content
1463
1463
1464 failures = []
1464 failures = []
1465 # load cached results into result:
1465 # load cached results into result:
1466 content.update(local_results)
1466 content.update(local_results)
1467
1467
1468 # update cache with results:
1468 # update cache with results:
1469 for msg_id in sorted(theids):
1469 for msg_id in sorted(theids):
1470 if msg_id in content['completed']:
1470 if msg_id in content['completed']:
1471 rec = content[msg_id]
1471 rec = content[msg_id]
1472 parent = rec['header']
1472 parent = rec['header']
1473 header = rec['result_header']
1473 header = rec['result_header']
1474 rcontent = rec['result_content']
1474 rcontent = rec['result_content']
1475 iodict = rec['io']
1475 iodict = rec['io']
1476 if isinstance(rcontent, str):
1476 if isinstance(rcontent, str):
1477 rcontent = self.session.unpack(rcontent)
1477 rcontent = self.session.unpack(rcontent)
1478
1478
1479 md = self.metadata[msg_id]
1479 md = self.metadata[msg_id]
1480 md.update(self._extract_metadata(header, parent, rcontent))
1480 md.update(self._extract_metadata(header, parent, rcontent))
1481 if rec.get('received'):
1481 if rec.get('received'):
1482 md['received'] = rec['received']
1482 md['received'] = rec['received']
1483 md.update(iodict)
1483 md.update(iodict)
1484
1484
1485 if rcontent['status'] == 'ok':
1485 if rcontent['status'] == 'ok':
1486 res,buffers = util.unserialize_object(buffers)
1486 res,buffers = util.unserialize_object(buffers)
1487 else:
1487 else:
1488 print rcontent
1488 print rcontent
1489 res = self._unwrap_exception(rcontent)
1489 res = self._unwrap_exception(rcontent)
1490 failures.append(res)
1490 failures.append(res)
1491
1491
1492 self.results[msg_id] = res
1492 self.results[msg_id] = res
1493 content[msg_id] = res
1493 content[msg_id] = res
1494
1494
1495 if len(theids) == 1 and failures:
1495 if len(theids) == 1 and failures:
1496 raise failures[0]
1496 raise failures[0]
1497
1497
1498 error.collect_exceptions(failures, "result_status")
1498 error.collect_exceptions(failures, "result_status")
1499 return content
1499 return content
1500
1500
1501 @spin_first
1501 @spin_first
1502 def queue_status(self, targets='all', verbose=False):
1502 def queue_status(self, targets='all', verbose=False):
1503 """Fetch the status of engine queues.
1503 """Fetch the status of engine queues.
1504
1504
1505 Parameters
1505 Parameters
1506 ----------
1506 ----------
1507
1507
1508 targets : int/str/list of ints/strs
1508 targets : int/str/list of ints/strs
1509 the engines whose states are to be queried.
1509 the engines whose states are to be queried.
1510 default : all
1510 default : all
1511 verbose : bool
1511 verbose : bool
1512 Whether to return lengths only, or lists of ids for each element
1512 Whether to return lengths only, or lists of ids for each element
1513 """
1513 """
1514 if targets == 'all':
1514 if targets == 'all':
1515 # allow 'all' to be evaluated on the engine
1515 # allow 'all' to be evaluated on the engine
1516 engine_ids = None
1516 engine_ids = None
1517 else:
1517 else:
1518 engine_ids = self._build_targets(targets)[1]
1518 engine_ids = self._build_targets(targets)[1]
1519 content = dict(targets=engine_ids, verbose=verbose)
1519 content = dict(targets=engine_ids, verbose=verbose)
1520 self.session.send(self._query_socket, "queue_request", content=content)
1520 self.session.send(self._query_socket, "queue_request", content=content)
1521 idents,msg = self.session.recv(self._query_socket, 0)
1521 idents,msg = self.session.recv(self._query_socket, 0)
1522 if self.debug:
1522 if self.debug:
1523 pprint(msg)
1523 pprint(msg)
1524 content = msg['content']
1524 content = msg['content']
1525 status = content.pop('status')
1525 status = content.pop('status')
1526 if status != 'ok':
1526 if status != 'ok':
1527 raise self._unwrap_exception(content)
1527 raise self._unwrap_exception(content)
1528 content = rekey(content)
1528 content = rekey(content)
1529 if isinstance(targets, int):
1529 if isinstance(targets, int):
1530 return content[targets]
1530 return content[targets]
1531 else:
1531 else:
1532 return content
1532 return content
1533
1533
1534 @spin_first
1534 @spin_first
1535 def purge_results(self, jobs=[], targets=[]):
1535 def purge_results(self, jobs=[], targets=[]):
1536 """Tell the Hub to forget results.
1536 """Tell the Hub to forget results.
1537
1537
1538 Individual results can be purged by msg_id, or the entire
1538 Individual results can be purged by msg_id, or the entire
1539 history of specific targets can be purged.
1539 history of specific targets can be purged.
1540
1540
1541 Use `purge_results('all')` to scrub everything from the Hub's db.
1541 Use `purge_results('all')` to scrub everything from the Hub's db.
1542
1542
1543 Parameters
1543 Parameters
1544 ----------
1544 ----------
1545
1545
1546 jobs : str or list of str or AsyncResult objects
1546 jobs : str or list of str or AsyncResult objects
1547 the msg_ids whose results should be forgotten.
1547 the msg_ids whose results should be forgotten.
1548 targets : int/str/list of ints/strs
1548 targets : int/str/list of ints/strs
1549 The targets, by int_id, whose entire history is to be purged.
1549 The targets, by int_id, whose entire history is to be purged.
1550
1550
1551 default : None
1551 default : None
1552 """
1552 """
1553 if not targets and not jobs:
1553 if not targets and not jobs:
1554 raise ValueError("Must specify at least one of `targets` and `jobs`")
1554 raise ValueError("Must specify at least one of `targets` and `jobs`")
1555 if targets:
1555 if targets:
1556 targets = self._build_targets(targets)[1]
1556 targets = self._build_targets(targets)[1]
1557
1557
1558 # construct msg_ids from jobs
1558 # construct msg_ids from jobs
1559 if jobs == 'all':
1559 if jobs == 'all':
1560 msg_ids = jobs
1560 msg_ids = jobs
1561 else:
1561 else:
1562 msg_ids = []
1562 msg_ids = []
1563 if isinstance(jobs, (basestring,AsyncResult)):
1563 if isinstance(jobs, (basestring,AsyncResult)):
1564 jobs = [jobs]
1564 jobs = [jobs]
1565 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1565 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1566 if bad_ids:
1566 if bad_ids:
1567 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1567 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1568 for j in jobs:
1568 for j in jobs:
1569 if isinstance(j, AsyncResult):
1569 if isinstance(j, AsyncResult):
1570 msg_ids.extend(j.msg_ids)
1570 msg_ids.extend(j.msg_ids)
1571 else:
1571 else:
1572 msg_ids.append(j)
1572 msg_ids.append(j)
1573
1573
1574 content = dict(engine_ids=targets, msg_ids=msg_ids)
1574 content = dict(engine_ids=targets, msg_ids=msg_ids)
1575 self.session.send(self._query_socket, "purge_request", content=content)
1575 self.session.send(self._query_socket, "purge_request", content=content)
1576 idents, msg = self.session.recv(self._query_socket, 0)
1576 idents, msg = self.session.recv(self._query_socket, 0)
1577 if self.debug:
1577 if self.debug:
1578 pprint(msg)
1578 pprint(msg)
1579 content = msg['content']
1579 content = msg['content']
1580 if content['status'] != 'ok':
1580 if content['status'] != 'ok':
1581 raise self._unwrap_exception(content)
1581 raise self._unwrap_exception(content)
1582
1582
1583 @spin_first
1583 @spin_first
1584 def hub_history(self):
1584 def hub_history(self):
1585 """Get the Hub's history
1585 """Get the Hub's history
1586
1586
1587 Just like the Client, the Hub has a history, which is a list of msg_ids.
1587 Just like the Client, the Hub has a history, which is a list of msg_ids.
1588 This will contain the history of all clients, and, depending on configuration,
1588 This will contain the history of all clients, and, depending on configuration,
1589 may contain history across multiple cluster sessions.
1589 may contain history across multiple cluster sessions.
1590
1590
1591 Any msg_id returned here is a valid argument to `get_result`.
1591 Any msg_id returned here is a valid argument to `get_result`.
1592
1592
1593 Returns
1593 Returns
1594 -------
1594 -------
1595
1595
1596 msg_ids : list of strs
1596 msg_ids : list of strs
1597 list of all msg_ids, ordered by task submission time.
1597 list of all msg_ids, ordered by task submission time.
1598 """
1598 """
1599
1599
1600 self.session.send(self._query_socket, "history_request", content={})
1600 self.session.send(self._query_socket, "history_request", content={})
1601 idents, msg = self.session.recv(self._query_socket, 0)
1601 idents, msg = self.session.recv(self._query_socket, 0)
1602
1602
1603 if self.debug:
1603 if self.debug:
1604 pprint(msg)
1604 pprint(msg)
1605 content = msg['content']
1605 content = msg['content']
1606 if content['status'] != 'ok':
1606 if content['status'] != 'ok':
1607 raise self._unwrap_exception(content)
1607 raise self._unwrap_exception(content)
1608 else:
1608 else:
1609 return content['history']
1609 return content['history']
1610
1610
1611 @spin_first
1611 @spin_first
1612 def db_query(self, query, keys=None):
1612 def db_query(self, query, keys=None):
1613 """Query the Hub's TaskRecord database
1613 """Query the Hub's TaskRecord database
1614
1614
1615 This will return a list of task record dicts that match `query`
1615 This will return a list of task record dicts that match `query`
1616
1616
1617 Parameters
1617 Parameters
1618 ----------
1618 ----------
1619
1619
1620 query : mongodb query dict
1620 query : mongodb query dict
1621 The search dict. See mongodb query docs for details.
1621 The search dict. See mongodb query docs for details.
1622 keys : list of strs [optional]
1622 keys : list of strs [optional]
1623 The subset of keys to be returned. The default is to fetch everything but buffers.
1623 The subset of keys to be returned. The default is to fetch everything but buffers.
1624 'msg_id' will *always* be included.
1624 'msg_id' will *always* be included.
1625 """
1625 """
1626 if isinstance(keys, basestring):
1626 if isinstance(keys, basestring):
1627 keys = [keys]
1627 keys = [keys]
1628 content = dict(query=query, keys=keys)
1628 content = dict(query=query, keys=keys)
1629 self.session.send(self._query_socket, "db_request", content=content)
1629 self.session.send(self._query_socket, "db_request", content=content)
1630 idents, msg = self.session.recv(self._query_socket, 0)
1630 idents, msg = self.session.recv(self._query_socket, 0)
1631 if self.debug:
1631 if self.debug:
1632 pprint(msg)
1632 pprint(msg)
1633 content = msg['content']
1633 content = msg['content']
1634 if content['status'] != 'ok':
1634 if content['status'] != 'ok':
1635 raise self._unwrap_exception(content)
1635 raise self._unwrap_exception(content)
1636
1636
1637 records = content['records']
1637 records = content['records']
1638
1638
1639 buffer_lens = content['buffer_lens']
1639 buffer_lens = content['buffer_lens']
1640 result_buffer_lens = content['result_buffer_lens']
1640 result_buffer_lens = content['result_buffer_lens']
1641 buffers = msg['buffers']
1641 buffers = msg['buffers']
1642 has_bufs = buffer_lens is not None
1642 has_bufs = buffer_lens is not None
1643 has_rbufs = result_buffer_lens is not None
1643 has_rbufs = result_buffer_lens is not None
1644 for i,rec in enumerate(records):
1644 for i,rec in enumerate(records):
1645 # relink buffers
1645 # relink buffers
1646 if has_bufs:
1646 if has_bufs:
1647 blen = buffer_lens[i]
1647 blen = buffer_lens[i]
1648 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1648 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1649 if has_rbufs:
1649 if has_rbufs:
1650 blen = result_buffer_lens[i]
1650 blen = result_buffer_lens[i]
1651 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1651 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1652
1652
1653 return records
1653 return records
1654
1654
1655 __all__ = [ 'Client' ]
1655 __all__ = [ 'Client' ]
@@ -1,942 +1,942 b''
1 .. _parallel_multiengine:
1 .. _parallel_multiengine:
2
2
3 ==========================
3 ==========================
4 IPython's Direct interface
4 IPython's Direct interface
5 ==========================
5 ==========================
6
6
7 The direct, or multiengine, interface represents one possible way of working with a set of
7 The direct, or multiengine, interface represents one possible way of working with a set of
8 IPython engines. The basic idea behind the multiengine interface is that the
8 IPython engines. The basic idea behind the multiengine interface is that the
9 capabilities of each engine are directly and explicitly exposed to the user.
9 capabilities of each engine are directly and explicitly exposed to the user.
10 Thus, in the multiengine interface, each engine is given an id that is used to
10 Thus, in the multiengine interface, each engine is given an id that is used to
11 identify the engine and give it work to do. This interface is very intuitive
11 identify the engine and give it work to do. This interface is very intuitive
12 and is designed with interactive usage in mind, and is the best place for
12 and is designed with interactive usage in mind, and is the best place for
13 new users of IPython to begin.
13 new users of IPython to begin.
14
14
15 Starting the IPython controller and engines
15 Starting the IPython controller and engines
16 ===========================================
16 ===========================================
17
17
18 To follow along with this tutorial, you will need to start the IPython
18 To follow along with this tutorial, you will need to start the IPython
19 controller and four IPython engines. The simplest way of doing this is to use
19 controller and four IPython engines. The simplest way of doing this is to use
20 the :command:`ipcluster` command::
20 the :command:`ipcluster` command::
21
21
22 $ ipcluster start -n 4
22 $ ipcluster start -n 4
23
23
24 For more detailed information about starting the controller and engines, see
24 For more detailed information about starting the controller and engines, see
25 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
25 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
26
26
27 Creating a ``DirectView`` instance
27 Creating a ``DirectView`` instance
28 ==================================
28 ==================================
29
29
30 The first step is to import the IPython :mod:`IPython.parallel`
30 The first step is to import the IPython :mod:`IPython.parallel`
31 module and then create a :class:`.Client` instance:
31 module and then create a :class:`.Client` instance:
32
32
33 .. sourcecode:: ipython
33 .. sourcecode:: ipython
34
34
35 In [1]: from IPython.parallel import Client
35 In [1]: from IPython.parallel import Client
36
36
37 In [2]: rc = Client()
37 In [2]: rc = Client()
38
38
39 This form assumes that the default connection information (stored in
39 This form assumes that the default connection information (stored in
40 :file:`ipcontroller-client.json` found in :file:`IPYTHONDIR/profile_default/security`) is
40 :file:`ipcontroller-client.json` found in :file:`IPYTHONDIR/profile_default/security`) is
41 accurate. If the controller was started on a remote machine, you must copy that connection
41 accurate. If the controller was started on a remote machine, you must copy that connection
42 file to the client machine, or enter its contents as arguments to the Client constructor:
42 file to the client machine, or enter its contents as arguments to the Client constructor:
43
43
44 .. sourcecode:: ipython
44 .. sourcecode:: ipython
45
45
46 # If you have copied the json connector file from the controller:
46 # If you have copied the json connector file from the controller:
47 In [2]: rc = Client('/path/to/ipcontroller-client.json')
47 In [2]: rc = Client('/path/to/ipcontroller-client.json')
48 # or to connect with a specific profile you have set up:
48 # or to connect with a specific profile you have set up:
49 In [3]: rc = Client(profile='mpi')
49 In [3]: rc = Client(profile='mpi')
50
50
51
51
52 To make sure there are engines connected to the controller, users can get a list
52 To make sure there are engines connected to the controller, users can get a list
53 of engine ids:
53 of engine ids:
54
54
55 .. sourcecode:: ipython
55 .. sourcecode:: ipython
56
56
57 In [3]: rc.ids
57 In [3]: rc.ids
58 Out[3]: [0, 1, 2, 3]
58 Out[3]: [0, 1, 2, 3]
59
59
60 Here we see that there are four engines ready to do work for us.
60 Here we see that there are four engines ready to do work for us.
61
61
62 For direct execution, we will make use of a :class:`DirectView` object, which can be
62 For direct execution, we will make use of a :class:`DirectView` object, which can be
63 constructed via list-access to the client:
63 constructed via list-access to the client:
64
64
65 .. sourcecode:: ipython
65 .. sourcecode:: ipython
66
66
67 In [4]: dview = rc[:] # use all engines
67 In [4]: dview = rc[:] # use all engines
68
68
69 .. seealso::
69 .. seealso::
70
70
71 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
71 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
72
72
73
73
74 Quick and easy parallelism
74 Quick and easy parallelism
75 ==========================
75 ==========================
76
76
77 In many cases, you simply want to apply a Python function to a sequence of
77 In many cases, you simply want to apply a Python function to a sequence of
78 objects, but *in parallel*. The client interface provides a simple way
78 objects, but *in parallel*. The client interface provides a simple way
79 of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
79 of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
80
80
81 Parallel map
81 Parallel map
82 ------------
82 ------------
83
83
84 Python's builtin :func:`map` function allows a function to be applied to a
84 Python's builtin :func:`map` function allows a function to be applied to a
85 sequence element-by-element. This type of code is typically trivial to
85 sequence element-by-element. This type of code is typically trivial to
86 parallelize. In fact, since IPython's interface is all about functions anyway,
86 parallelize. In fact, since IPython's interface is all about functions anyway,
87 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
87 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
88 DirectView's :meth:`map` method:
88 DirectView's :meth:`map` method:
89
89
90 .. sourcecode:: ipython
90 .. sourcecode:: ipython
91
91
92 In [62]: serial_result = map(lambda x:x**10, range(32))
92 In [62]: serial_result = map(lambda x:x**10, range(32))
93
93
94 In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32))
94 In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32))
95
95
96 In [67]: serial_result==parallel_result
96 In [67]: serial_result==parallel_result
97 Out[67]: True
97 Out[67]: True
98
98
99
99
100 .. note::
100 .. note::
101
101
102 The :class:`DirectView`'s version of :meth:`map` does
102 The :class:`DirectView`'s version of :meth:`map` does
103 not do dynamic load balancing. For a load balanced version, use a
103 not do dynamic load balancing. For a load balanced version, use a
104 :class:`LoadBalancedView`.
104 :class:`LoadBalancedView`.
105
105
106 .. seealso::
106 .. seealso::
107
107
108 :meth:`map` is implemented via :class:`ParallelFunction`.
108 :meth:`map` is implemented via :class:`ParallelFunction`.
109
109
110 Remote function decorators
110 Remote function decorators
111 --------------------------
111 --------------------------
112
112
113 Remote functions are just like normal functions, but when they are called,
113 Remote functions are just like normal functions, but when they are called,
114 they execute on one or more engines, rather than locally. IPython provides
114 they execute on one or more engines, rather than locally. IPython provides
115 two decorators:
115 two decorators:
116
116
117 .. sourcecode:: ipython
117 .. sourcecode:: ipython
118
118
119 In [10]: @dview.remote(block=True)
119 In [10]: @dview.remote(block=True)
120 ....: def getpid():
120 ....: def getpid():
121 ....: import os
121 ....: import os
122 ....: return os.getpid()
122 ....: return os.getpid()
123 ....:
123 ....:
124
124
125 In [11]: getpid()
125 In [11]: getpid()
126 Out[11]: [12345, 12346, 12347, 12348]
126 Out[11]: [12345, 12346, 12347, 12348]
127
127
128 The ``@parallel`` decorator creates parallel functions that break up element-wise
128 The ``@parallel`` decorator creates parallel functions that break up element-wise
129 operations and distribute them, reconstructing the result.
129 operations and distribute them, reconstructing the result.
130
130
131 .. sourcecode:: ipython
131 .. sourcecode:: ipython
132
132
133 In [12]: import numpy as np
133 In [12]: import numpy as np
134
134
135 In [13]: A = np.random.random((64,48))
135 In [13]: A = np.random.random((64,48))
136
136
137 In [14]: @dview.parallel(block=True)
137 In [14]: @dview.parallel(block=True)
138 ....: def pmul(A,B):
138 ....: def pmul(A,B):
139 ....: return A*B
139 ....: return A*B
140
140
141 In [15]: C_local = A*A
141 In [15]: C_local = A*A
142
142
143 In [16]: C_remote = pmul(A,A)
143 In [16]: C_remote = pmul(A,A)
144
144
145 In [17]: (C_local == C_remote).all()
145 In [17]: (C_local == C_remote).all()
146 Out[17]: True
146 Out[17]: True
147
147
148 Calling a ``@parallel`` function *does not* correspond to map. It is used for splitting
148 Calling a ``@parallel`` function *does not* correspond to map. It is used for splitting
149 element-wise operations that operate on a sequence or array. For ``map`` behavior,
149 element-wise operations that operate on a sequence or array. For ``map`` behavior,
150 parallel functions do have a map method.
150 parallel functions do have a map method.
151
151
152 ==================== ============================ =============================
152 ==================== ============================ =============================
153 call pfunc(seq) pfunc.map(seq)
153 call pfunc(seq) pfunc.map(seq)
154 ==================== ============================ =============================
154 ==================== ============================ =============================
155 # of tasks # of engines (1 per engine) # of engines (1 per engine)
155 # of tasks # of engines (1 per engine) # of engines (1 per engine)
156 # of remote calls # of engines (1 per engine) ``len(seq)``
156 # of remote calls # of engines (1 per engine) ``len(seq)``
157 argument to remote ``seq[i:j]`` (sub-sequence) ``seq[i]`` (single element)
157 argument to remote ``seq[i:j]`` (sub-sequence) ``seq[i]`` (single element)
158 ==================== ============================ =============================
158 ==================== ============================ =============================
159
159
160 A quick example to illustrate the difference in arguments for the two modes:
160 A quick example to illustrate the difference in arguments for the two modes:
161
161
162 .. sourcecode:: ipython
162 .. sourcecode:: ipython
163
163
164 In [16]: @dview.parallel(block=True)
164 In [16]: @dview.parallel(block=True)
165 ....: def echo(x):
165 ....: def echo(x):
166 ....: return str(x)
166 ....: return str(x)
167 ....:
167 ....:
168
168
169 In [17]: echo(range(5))
169 In [17]: echo(range(5))
170 Out[17]: ['[0, 1]', '[2]', '[3]', '[4]']
170 Out[17]: ['[0, 1]', '[2]', '[3]', '[4]']
171
171
172 In [18]: echo.map(range(5))
172 In [18]: echo.map(range(5))
173 Out[18]: ['0', '1', '2', '3', '4']
173 Out[18]: ['0', '1', '2', '3', '4']
174
174
175
175
176 .. seealso::
176 .. seealso::
177
177
178 See the :func:`~.remotefunction.parallel` and :func:`~.remotefunction.remote`
178 See the :func:`~.remotefunction.parallel` and :func:`~.remotefunction.remote`
179 decorators for options.
179 decorators for options.
180
180
181 Calling Python functions
181 Calling Python functions
182 ========================
182 ========================
183
183
184 The most basic type of operation that can be performed on the engines is to
184 The most basic type of operation that can be performed on the engines is to
185 execute Python code or call Python functions. Executing Python code can be
185 execute Python code or call Python functions. Executing Python code can be
186 done in blocking or non-blocking mode (non-blocking is default) using the
186 done in blocking or non-blocking mode (non-blocking is default) using the
187 :meth:`.View.execute` method, and calling functions can be done via the
187 :meth:`.View.execute` method, and calling functions can be done via the
188 :meth:`.View.apply` method.
188 :meth:`.View.apply` method.
189
189
190 apply
190 apply
191 -----
191 -----
192
192
193 The main method for doing remote execution (in fact, all methods that
193 The main method for doing remote execution (in fact, all methods that
194 communicate with the engines are built on top of it), is :meth:`View.apply`.
194 communicate with the engines are built on top of it), is :meth:`View.apply`.
195
195
196 We strive to provide the cleanest interface we can, so `apply` has the following
196 We strive to provide the cleanest interface we can, so `apply` has the following
197 signature:
197 signature:
198
198
199 .. sourcecode:: python
199 .. sourcecode:: python
200
200
201 view.apply(f, *args, **kwargs)
201 view.apply(f, *args, **kwargs)
202
202
203 There are various ways to call functions with IPython, and these flags are set as
203 There are various ways to call functions with IPython, and these flags are set as
204 attributes of the View. The ``DirectView`` has just two of these flags:
204 attributes of the View. The ``DirectView`` has just two of these flags:
205
205
206 dv.block : bool
206 dv.block : bool
207 whether to wait for the result, or return an :class:`AsyncResult` object
207 whether to wait for the result, or return an :class:`AsyncResult` object
208 immediately
208 immediately
209 dv.track : bool
209 dv.track : bool
210 whether to instruct pyzmq to track when zeromq is done sending the message.
210 whether to instruct pyzmq to track when zeromq is done sending the message.
211 This is primarily useful for non-copying sends of numpy arrays that you plan to
211 This is primarily useful for non-copying sends of numpy arrays that you plan to
212 edit in-place. You need to know when it becomes safe to edit the buffer
212 edit in-place. You need to know when it becomes safe to edit the buffer
213 without corrupting the message.
213 without corrupting the message.
214 dv.targets : int, list of ints
214 dv.targets : int, list of ints
215 which targets this view is associated with.
215 which targets this view is associated with.
216
216
217
217
218 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
218 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
219
219
220 .. sourcecode:: ipython
220 .. sourcecode:: ipython
221
221
222 In [4]: view = rc[1:3]
222 In [4]: view = rc[1:3]
223 Out[4]: <DirectView [1, 2]>
223 Out[4]: <DirectView [1, 2]>
224
224
225 In [5]: view.apply<tab>
225 In [5]: view.apply<tab>
226 view.apply view.apply_async view.apply_sync
226 view.apply view.apply_async view.apply_sync
227
227
228 For convenience, you can set block temporarily for a single call with the extra sync/async methods.
228 For convenience, you can set block temporarily for a single call with the extra sync/async methods.
229
229
230 Blocking execution
230 Blocking execution
231 ------------------
231 ------------------
232
232
233 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
233 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
234 these examples) submits the command to the controller, which places the
234 these examples) submits the command to the controller, which places the
235 command in the engines' queues for execution. The :meth:`apply` call then
235 command in the engines' queues for execution. The :meth:`apply` call then
236 blocks until the engines are done executing the command:
236 blocks until the engines are done executing the command:
237
237
238 .. sourcecode:: ipython
238 .. sourcecode:: ipython
239
239
240 In [2]: dview = rc[:] # A DirectView of all engines
240 In [2]: dview = rc[:] # A DirectView of all engines
241 In [3]: dview.block=True
241 In [3]: dview.block=True
242 In [4]: dview['a'] = 5
242 In [4]: dview['a'] = 5
243
243
244 In [5]: dview['b'] = 10
244 In [5]: dview['b'] = 10
245
245
246 In [6]: dview.apply(lambda x: a+b+x, 27)
246 In [6]: dview.apply(lambda x: a+b+x, 27)
247 Out[6]: [42, 42, 42, 42]
247 Out[6]: [42, 42, 42, 42]
248
248
249 You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync`
249 You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync`
250 method:
250 method:
251
251
252 In [7]: dview.block=False
252 In [7]: dview.block=False
253
253
254 In [8]: dview.apply_sync(lambda x: a+b+x, 27)
254 In [8]: dview.apply_sync(lambda x: a+b+x, 27)
255 Out[8]: [42, 42, 42, 42]
255 Out[8]: [42, 42, 42, 42]
256
256
257 Python commands can be executed as strings on specific engines by using a View's ``execute``
257 Python commands can be executed as strings on specific engines by using a View's ``execute``
258 method:
258 method:
259
259
260 .. sourcecode:: ipython
260 .. sourcecode:: ipython
261
261
262 In [6]: rc[::2].execute('c=a+b')
262 In [6]: rc[::2].execute('c=a+b')
263
263
264 In [7]: rc[1::2].execute('c=a-b')
264 In [7]: rc[1::2].execute('c=a-b')
265
265
266 In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
266 In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
267 Out[8]: [15, -5, 15, -5]
267 Out[8]: [15, -5, 15, -5]
268
268
269
269
270 Non-blocking execution
270 Non-blocking execution
271 ----------------------
271 ----------------------
272
272
273 In non-blocking mode, :meth:`apply` submits the command to be executed and
273 In non-blocking mode, :meth:`apply` submits the command to be executed and
274 then returns a :class:`AsyncResult` object immediately. The
274 then returns a :class:`AsyncResult` object immediately. The
275 :class:`AsyncResult` object gives you a way of getting a result at a later
275 :class:`AsyncResult` object gives you a way of getting a result at a later
276 time through its :meth:`get` method.
276 time through its :meth:`get` method.
277
277
278 .. seealso::
278 .. seealso::
279
279
280 Docs on the :ref:`AsyncResult <parallel_asyncresult>` object.
280 Docs on the :ref:`AsyncResult <parallel_asyncresult>` object.
281
281
282 This allows you to quickly submit long running commands without blocking your
282 This allows you to quickly submit long running commands without blocking your
283 local Python/IPython session:
283 local Python/IPython session:
284
284
285 .. sourcecode:: ipython
285 .. sourcecode:: ipython
286
286
287 # define our function
287 # define our function
288 In [6]: def wait(t):
288 In [6]: def wait(t):
289 ....: import time
289 ....: import time
290 ....: tic = time.time()
290 ....: tic = time.time()
291 ....: time.sleep(t)
291 ....: time.sleep(t)
292 ....: return time.time()-tic
292 ....: return time.time()-tic
293
293
294 # In non-blocking mode
294 # In non-blocking mode
295 In [7]: ar = dview.apply_async(wait, 2)
295 In [7]: ar = dview.apply_async(wait, 2)
296
296
297 # Now block for the result
297 # Now block for the result
298 In [8]: ar.get()
298 In [8]: ar.get()
299 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
299 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
300
300
301 # Again in non-blocking mode
301 # Again in non-blocking mode
302 In [9]: ar = dview.apply_async(wait, 10)
302 In [9]: ar = dview.apply_async(wait, 10)
303
303
304 # Poll to see if the result is ready
304 # Poll to see if the result is ready
305 In [10]: ar.ready()
305 In [10]: ar.ready()
306 Out[10]: False
306 Out[10]: False
307
307
308 # ask for the result, but wait a maximum of 1 second:
308 # ask for the result, but wait a maximum of 1 second:
309 In [45]: ar.get(1)
309 In [45]: ar.get(1)
310 ---------------------------------------------------------------------------
310 ---------------------------------------------------------------------------
311 TimeoutError Traceback (most recent call last)
311 TimeoutError Traceback (most recent call last)
312 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
312 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
313 ----> 1 ar.get(1)
313 ----> 1 ar.get(1)
314
314
315 /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
315 /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
316 62 raise self._exception
316 62 raise self._exception
317 63 else:
317 63 else:
318 ---> 64 raise error.TimeoutError("Result not ready.")
318 ---> 64 raise error.TimeoutError("Result not ready.")
319 65
319 65
320 66 def ready(self):
320 66 def ready(self):
321
321
322 TimeoutError: Result not ready.
322 TimeoutError: Result not ready.
323
323
324 .. Note::
324 .. Note::
325
325
326 Note the import inside the function. This is a common model, to ensure
326 Note the import inside the function. This is a common model, to ensure
327 that the appropriate modules are imported where the task is run. You can
327 that the appropriate modules are imported where the task is run. You can
328 also manually import modules into the engine(s) namespace(s) via
328 also manually import modules into the engine(s) namespace(s) via
329 :meth:`view.execute('import numpy')`.
329 :meth:`view.execute('import numpy')`.
330
330
331 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
331 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
332 are done. For this, there is the method :meth:`wait`. This method takes a
332 are done. For this, there is the method :meth:`wait`. This method takes a
333 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
333 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
334 and blocks until all of the associated results are ready:
334 and blocks until all of the associated results are ready:
335
335
336 .. sourcecode:: ipython
336 .. sourcecode:: ipython
337
337
338 In [72]: dview.block=False
338 In [72]: dview.block=False
339
339
340 # A trivial list of AsyncResults objects
340 # A trivial list of AsyncResults objects
341 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
341 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
342
342
343 # Wait until all of them are done
343 # Wait until all of them are done
344 In [74]: dview.wait(pr_list)
344 In [74]: dview.wait(pr_list)
345
345
346 # Then, their results are ready using get() or the `.r` attribute
346 # Then, their results are ready using get() or the `.r` attribute
347 In [75]: pr_list[0].get()
347 In [75]: pr_list[0].get()
348 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
348 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
349
349
350
350
351
351
352 The ``block`` and ``targets`` keyword arguments and attributes
352 The ``block`` and ``targets`` keyword arguments and attributes
353 --------------------------------------------------------------
353 --------------------------------------------------------------
354
354
355 Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
355 Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
356 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
356 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
357 blocking mode and which engines the command is applied to. The :class:`View` class also has
357 blocking mode and which engines the command is applied to. The :class:`View` class also has
358 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
358 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
359 arguments are not provided. Thus the following logic is used for :attr:`block` and :attr:`targets`:
359 arguments are not provided. Thus the following logic is used for :attr:`block` and :attr:`targets`:
360
360
361 * If no keyword argument is provided, the instance attributes are used.
361 * If no keyword argument is provided, the instance attributes are used.
362 * Keyword arguments, if provided, override the instance attributes for
362 * Keyword arguments, if provided, override the instance attributes for
363 the duration of a single call.
363 the duration of a single call.
364
364
365 The following examples demonstrate how to use the instance attributes:
365 The following examples demonstrate how to use the instance attributes:
366
366
367 .. sourcecode:: ipython
367 .. sourcecode:: ipython
368
368
369 In [16]: dview.targets = [0,2]
369 In [16]: dview.targets = [0,2]
370
370
371 In [17]: dview.block = False
371 In [17]: dview.block = False
372
372
373 In [18]: ar = dview.apply(lambda : 10)
373 In [18]: ar = dview.apply(lambda : 10)
374
374
375 In [19]: ar.get()
375 In [19]: ar.get()
376 Out[19]: [10, 10]
376 Out[19]: [10, 10]
377
377
378 In [16]: dview.targets = v.client.ids # all engines (4)
378 In [16]: dview.targets = v.client.ids # all engines (4)
379
379
380 In [21]: dview.block = True
380 In [21]: dview.block = True
381
381
382 In [22]: dview.apply(lambda : 42)
382 In [22]: dview.apply(lambda : 42)
383 Out[22]: [42, 42, 42, 42]
383 Out[22]: [42, 42, 42, 42]
384
384
385 The :attr:`block` and :attr:`targets` instance attributes of the
385 The :attr:`block` and :attr:`targets` instance attributes of the
386 :class:`.DirectView` also determine the behavior of the parallel magic commands.
386 :class:`.DirectView` also determine the behavior of the parallel magic commands.
387
387
388 Parallel magic commands
388 Parallel magic commands
389 -----------------------
389 -----------------------
390
390
391 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
391 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
392 that make it a bit more pleasant to execute Python commands on the engines interactively.
392 that make it a bit more pleasant to execute Python commands on the engines interactively.
393 These are simply shortcuts to :meth:`.DirectView.execute`
393 These are simply shortcuts to :meth:`.DirectView.execute`
394 and :meth:`.AsyncResult.display_outputs` methods respectively.
394 and :meth:`.AsyncResult.display_outputs` methods respectively.
395 The ``%px`` magic executes a single Python command on the engines
395 The ``%px`` magic executes a single Python command on the engines
396 specified by the :attr:`targets` attribute of the :class:`DirectView` instance:
396 specified by the :attr:`targets` attribute of the :class:`DirectView` instance:
397
397
398 .. sourcecode:: ipython
398 .. sourcecode:: ipython
399
399
400 # Create a DirectView for all targets
400 # Create a DirectView for all targets
401 In [22]: dv = rc[:]
401 In [22]: dv = rc[:]
402
402
403 # Make this DirectView active for parallel magic commands
403 # Make this DirectView active for parallel magic commands
404 In [23]: dv.activate()
404 In [23]: dv.activate()
405
405
406 In [24]: dv.block=True
406 In [24]: dv.block=True
407
407
408 # import numpy here and everywhere
408 # import numpy here and everywhere
409 In [25]: with dv.sync_imports():
409 In [25]: with dv.sync_imports():
410 ....: import numpy
410 ....: import numpy
411 importing numpy on engine(s)
411 importing numpy on engine(s)
412
412
413 In [27]: %px a = numpy.random.rand(2,2)
413 In [27]: %px a = numpy.random.rand(2,2)
414 Parallel execution on engines: [0, 1, 2, 3]
414 Parallel execution on engines: [0, 1, 2, 3]
415
415
416 In [28]: %px numpy.linalg.eigvals(a)
416 In [28]: %px numpy.linalg.eigvals(a)
417 Parallel execution on engines: [0, 1, 2, 3]
417 Parallel execution on engines: [0, 1, 2, 3]
418 [ 0] Out[68]: array([ 0.77120707, -0.19448286])
418 [0] Out[68]: array([ 0.77120707, -0.19448286])
419 [ 1] Out[68]: array([ 1.10815921, 0.05110369])
419 [1] Out[68]: array([ 1.10815921, 0.05110369])
420 [ 2] Out[68]: array([ 0.74625527, -0.37475081])
420 [2] Out[68]: array([ 0.74625527, -0.37475081])
421 [ 3] Out[68]: array([ 0.72931905, 0.07159743])
421 [3] Out[68]: array([ 0.72931905, 0.07159743])
422
422
423 In [29]: %px print 'hi'
423 In [29]: %px print 'hi'
424 Parallel execution on engine(s): [0, 1, 2, 3]
424 Parallel execution on engine(s): [0, 1, 2, 3]
425 [stdout: 0] hi
425 [stdout:0] hi
426 [stdout: 1] hi
426 [stdout:1] hi
427 [stdout: 2] hi
427 [stdout:2] hi
428 [stdout: 3] hi
428 [stdout:3] hi
429
429
430
430
431 Since engines are IPython as well, you can even run magics remotely:
431 Since engines are IPython as well, you can even run magics remotely:
432
432
433 .. sourcecode:: ipython
433 .. sourcecode:: ipython
434
434
435 In [28]: %px %pylab inline
435 In [28]: %px %pylab inline
436 Parallel execution on engine(s): [0, 1, 2, 3]
436 Parallel execution on engine(s): [0, 1, 2, 3]
437 [stdout: 0]
437 [stdout:0]
438 Welcome to pylab, a matplotlib-based Python environment...
438 Welcome to pylab, a matplotlib-based Python environment...
439 For more information, type 'help(pylab)'.
439 For more information, type 'help(pylab)'.
440 [stdout: 1]
440 [stdout:1]
441 Welcome to pylab, a matplotlib-based Python environment...
441 Welcome to pylab, a matplotlib-based Python environment...
442 For more information, type 'help(pylab)'.
442 For more information, type 'help(pylab)'.
443 [stdout: 2]
443 [stdout:2]
444 Welcome to pylab, a matplotlib-based Python environment...
444 Welcome to pylab, a matplotlib-based Python environment...
445 For more information, type 'help(pylab)'.
445 For more information, type 'help(pylab)'.
446 [stdout: 3]
446 [stdout:3]
447 Welcome to pylab, a matplotlib-based Python environment...
447 Welcome to pylab, a matplotlib-based Python environment...
448 For more information, type 'help(pylab)'.
448 For more information, type 'help(pylab)'.
449
449
450 And once in pylab mode with the inline backend,
450 And once in pylab mode with the inline backend,
451 you can make plots and they will be displayed in your frontend
451 you can make plots and they will be displayed in your frontend
452 if it supports the inline figures (e.g. notebook or qtconsole):
452 if it supports the inline figures (e.g. notebook or qtconsole):
453
453
454 .. sourcecode:: ipython
454 .. sourcecode:: ipython
455
455
456 In [40]: %px plot(rand(100))
456 In [40]: %px plot(rand(100))
457 Parallel execution on engine(s): [0, 1, 2, 3]
457 Parallel execution on engine(s): [0, 1, 2, 3]
458 <plot0>
458 <plot0>
459 <plot1>
459 <plot1>
460 <plot2>
460 <plot2>
461 <plot3>
461 <plot3>
462 [ 0] Out[79]: [<matplotlib.lines.Line2D at 0x10a6286d0>]
462 [0] Out[79]: [<matplotlib.lines.Line2D at 0x10a6286d0>]
463 [ 1] Out[79]: [<matplotlib.lines.Line2D at 0x10b9476d0>]
463 [1] Out[79]: [<matplotlib.lines.Line2D at 0x10b9476d0>]
464 [ 2] Out[79]: [<matplotlib.lines.Line2D at 0x110652750>]
464 [2] Out[79]: [<matplotlib.lines.Line2D at 0x110652750>]
465 [ 3] Out[79]: [<matplotlib.lines.Line2D at 0x10c6566d0>]
465 [3] Out[79]: [<matplotlib.lines.Line2D at 0x10c6566d0>]
466
466
467
467
468 ``%%px`` Cell Magic
468 ``%%px`` Cell Magic
469 *******************
469 *******************
470
470
471 `%%px` can also be used as a Cell Magic, which accepts ``--[no]block`` flags,
471 `%%px` can also be used as a Cell Magic, which accepts ``--[no]block`` flags,
472 and a ``--group-outputs`` argument, which adjusts how the outputs of multiple
472 and a ``--group-outputs`` argument, which adjusts how the outputs of multiple
473 engines are presented.
473 engines are presented.
474
474
475 .. seealso::
475 .. seealso::
476
476
477 :meth:`.AsyncResult.display_outputs` for the grouping options.
477 :meth:`.AsyncResult.display_outputs` for the grouping options.
478
478
479 .. sourcecode:: ipython
479 .. sourcecode:: ipython
480
480
481 In [50]: %%px --block --group-outputs=engine
481 In [50]: %%px --block --group-outputs=engine
482 ....: import numpy as np
482 ....: import numpy as np
483 ....: A = np.random.random((2,2))
483 ....: A = np.random.random((2,2))
484 ....: ev = numpy.linalg.eigvals(A)
484 ....: ev = numpy.linalg.eigvals(A)
485 ....: print ev
485 ....: print ev
486 ....: ev.max()
486 ....: ev.max()
487 ....:
487 ....:
488 Parallel execution on engine(s): [0, 1, 2, 3]
488 Parallel execution on engine(s): [0, 1, 2, 3]
489 [stdout: 0] [ 0.60640442 0.95919621]
489 [stdout:0] [ 0.60640442 0.95919621]
490 [ 0] Out[73]: 0.9591962130899806
490 [0] Out[73]: 0.9591962130899806
491 [stdout: 1] [ 0.38501813 1.29430871]
491 [stdout:1] [ 0.38501813 1.29430871]
492 [ 1] Out[73]: 1.2943087091452372
492 [1] Out[73]: 1.2943087091452372
493 [stdout: 2] [-0.85925141 0.9387692 ]
493 [stdout:2] [-0.85925141 0.9387692 ]
494 [ 2] Out[73]: 0.93876920456230284
494 [2] Out[73]: 0.93876920456230284
495 [stdout: 3] [ 0.37998269 1.24218246]
495 [stdout:3] [ 0.37998269 1.24218246]
496 [ 3] Out[73]: 1.2421824618493817
496 [3] Out[73]: 1.2421824618493817
497
497
498 ``%result`` Magic
498 ``%result`` Magic
499 *****************
499 *****************
500
500
501 If you are using ``%px`` in non-blocking mode, you won't get output.
501 If you are using ``%px`` in non-blocking mode, you won't get output.
502 You can use ``%result`` to display the outputs of the latest command,
502 You can use ``%result`` to display the outputs of the latest command,
503 just as is done when ``%px`` is blocking:
503 just as is done when ``%px`` is blocking:
504
504
505 .. sourcecode:: ipython
505 .. sourcecode:: ipython
506
506
507 In [39]: dv.block = False
507 In [39]: dv.block = False
508
508
509 In [40]: %px print 'hi'
509 In [40]: %px print 'hi'
510 Async parallel execution on engine(s): [0, 1, 2, 3]
510 Async parallel execution on engine(s): [0, 1, 2, 3]
511
511
512 In [41]: %result
512 In [41]: %result
513 [stdout: 0] hi
513 [stdout:0] hi
514 [stdout: 1] hi
514 [stdout:1] hi
515 [stdout: 2] hi
515 [stdout:2] hi
516 [stdout: 3] hi
516 [stdout:3] hi
517
517
518 ``%result`` simply calls :meth:`.AsyncResult.display_outputs` on the most recent request.
518 ``%result`` simply calls :meth:`.AsyncResult.display_outputs` on the most recent request.
519 You can pass integers as indices if you want a result other than the latest,
519 You can pass integers as indices if you want a result other than the latest,
520 e.g. ``%result -2``, or ``%result 0`` for the first.
520 e.g. ``%result -2``, or ``%result 0`` for the first.
521
521
522
522
523 ``%autopx``
523 ``%autopx``
524 ***********
524 ***********
525
525
526 The ``%autopx`` magic switches to a mode where everything you type is executed
526 The ``%autopx`` magic switches to a mode where everything you type is executed
527 on the engines until you do ``%autopx`` again.
527 on the engines until you do ``%autopx`` again.
528
528
529 .. sourcecode:: ipython
529 .. sourcecode:: ipython
530
530
531 In [30]: dv.block=True
531 In [30]: dv.block=True
532
532
533 In [31]: %autopx
533 In [31]: %autopx
534 %autopx enabled
534 %autopx enabled
535
535
536 In [32]: max_evals = []
536 In [32]: max_evals = []
537
537
538 In [33]: for i in range(100):
538 In [33]: for i in range(100):
539 ....: a = numpy.random.rand(10,10)
539 ....: a = numpy.random.rand(10,10)
540 ....: a = a+a.transpose()
540 ....: a = a+a.transpose()
541 ....: evals = numpy.linalg.eigvals(a)
541 ....: evals = numpy.linalg.eigvals(a)
542 ....: max_evals.append(evals[0].real)
542 ....: max_evals.append(evals[0].real)
543 ....:
543 ....:
544
544
545 In [34]: print "Average max eigenvalue is: %f" % (sum(max_evals)/len(max_evals))
545 In [34]: print "Average max eigenvalue is: %f" % (sum(max_evals)/len(max_evals))
546 [stdout: 0] Average max eigenvalue is: 10.193101
546 [stdout:0] Average max eigenvalue is: 10.193101
547 [stdout: 1] Average max eigenvalue is: 10.064508
547 [stdout:1] Average max eigenvalue is: 10.064508
548 [stdout: 2] Average max eigenvalue is: 10.055724
548 [stdout:2] Average max eigenvalue is: 10.055724
549 [stdout: 3] Average max eigenvalue is: 10.086876
549 [stdout:3] Average max eigenvalue is: 10.086876
550
550
551 In [35]: %autopx
551 In [35]: %autopx
552 Auto Parallel Disabled
552 Auto Parallel Disabled
553
553
554
554
555 Moving Python objects around
555 Moving Python objects around
556 ============================
556 ============================
557
557
558 In addition to calling functions and executing code on engines, you can
558 In addition to calling functions and executing code on engines, you can
559 transfer Python objects to and from your IPython session and the engines. In
559 transfer Python objects to and from your IPython session and the engines. In
560 IPython, these operations are called :meth:`push` (sending an object to the
560 IPython, these operations are called :meth:`push` (sending an object to the
561 engines) and :meth:`pull` (getting an object from the engines).
561 engines) and :meth:`pull` (getting an object from the engines).
562
562
563 Basic push and pull
563 Basic push and pull
564 -------------------
564 -------------------
565
565
566 Here are some examples of how you use :meth:`push` and :meth:`pull`:
566 Here are some examples of how you use :meth:`push` and :meth:`pull`:
567
567
568 .. sourcecode:: ipython
568 .. sourcecode:: ipython
569
569
570 In [38]: dview.push(dict(a=1.03234,b=3453))
570 In [38]: dview.push(dict(a=1.03234,b=3453))
571 Out[38]: [None,None,None,None]
571 Out[38]: [None,None,None,None]
572
572
573 In [39]: dview.pull('a')
573 In [39]: dview.pull('a')
574 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
574 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
575
575
576 In [40]: dview.pull('b', targets=0)
576 In [40]: dview.pull('b', targets=0)
577 Out[40]: 3453
577 Out[40]: 3453
578
578
579 In [41]: dview.pull(('a','b'))
579 In [41]: dview.pull(('a','b'))
580 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
580 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
581
581
582 In [43]: dview.push(dict(c='speed'))
582 In [43]: dview.push(dict(c='speed'))
583 Out[43]: [None,None,None,None]
583 Out[43]: [None,None,None,None]
584
584
585 In non-blocking mode :meth:`push` and :meth:`pull` also return
585 In non-blocking mode :meth:`push` and :meth:`pull` also return
586 :class:`AsyncResult` objects:
586 :class:`AsyncResult` objects:
587
587
588 .. sourcecode:: ipython
588 .. sourcecode:: ipython
589
589
590 In [48]: ar = dview.pull('a', block=False)
590 In [48]: ar = dview.pull('a', block=False)
591
591
592 In [49]: ar.get()
592 In [49]: ar.get()
593 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
593 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
594
594
595
595
596 Dictionary interface
596 Dictionary interface
597 --------------------
597 --------------------
598
598
599 Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
599 Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
600 dictionary-style access by key and methods such as :meth:`get` and
600 dictionary-style access by key and methods such as :meth:`get` and
601 :meth:`update` for convenience. This makes the remote namespaces of the engines
601 :meth:`update` for convenience. This makes the remote namespaces of the engines
602 appear as a local dictionary. Underneath, these methods call :meth:`apply`:
602 appear as a local dictionary. Underneath, these methods call :meth:`apply`:
603
603
604 .. sourcecode:: ipython
604 .. sourcecode:: ipython
605
605
606 In [51]: dview['a']=['foo','bar']
606 In [51]: dview['a']=['foo','bar']
607
607
608 In [52]: dview['a']
608 In [52]: dview['a']
609 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
609 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
610
610
611 Scatter and gather
611 Scatter and gather
612 ------------------
612 ------------------
613
613
614 Sometimes it is useful to partition a sequence and push the partitions to
614 Sometimes it is useful to partition a sequence and push the partitions to
615 different engines. In MPI language, this is known as scatter/gather and we
615 different engines. In MPI language, this is known as scatter/gather and we
616 follow that terminology. However, it is important to remember that in
616 follow that terminology. However, it is important to remember that in
617 IPython's :class:`Client` class, :meth:`scatter` is from the
617 IPython's :class:`Client` class, :meth:`scatter` is from the
618 interactive IPython session to the engines and :meth:`gather` is from the
618 interactive IPython session to the engines and :meth:`gather` is from the
619 engines back to the interactive IPython session. For scatter/gather operations
619 engines back to the interactive IPython session. For scatter/gather operations
620 between engines, MPI, pyzmq, or some other direct interconnect should be used.
620 between engines, MPI, pyzmq, or some other direct interconnect should be used.
621
621
622 .. sourcecode:: ipython
622 .. sourcecode:: ipython
623
623
624 In [58]: dview.scatter('a',range(16))
624 In [58]: dview.scatter('a',range(16))
625 Out[58]: [None,None,None,None]
625 Out[58]: [None,None,None,None]
626
626
627 In [59]: dview['a']
627 In [59]: dview['a']
628 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
628 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
629
629
630 In [60]: dview.gather('a')
630 In [60]: dview.gather('a')
631 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
631 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
632
632
633 Other things to look at
633 Other things to look at
634 =======================
634 =======================
635
635
636 How to do parallel list comprehensions
636 How to do parallel list comprehensions
637 --------------------------------------
637 --------------------------------------
638
638
639 In many cases list comprehensions are nicer than using the map function. While
639 In many cases list comprehensions are nicer than using the map function. While
640 we don't have fully parallel list comprehensions, it is simple to get the
640 we don't have fully parallel list comprehensions, it is simple to get the
641 basic effect using :meth:`scatter` and :meth:`gather`:
641 basic effect using :meth:`scatter` and :meth:`gather`:
642
642
643 .. sourcecode:: ipython
643 .. sourcecode:: ipython
644
644
645 In [66]: dview.scatter('x',range(64))
645 In [66]: dview.scatter('x',range(64))
646
646
647 In [67]: %px y = [i**10 for i in x]
647 In [67]: %px y = [i**10 for i in x]
648 Parallel execution on engines: [0, 1, 2, 3]
648 Parallel execution on engines: [0, 1, 2, 3]
649 Out[67]:
649 Out[67]:
650
650
651 In [68]: y = dview.gather('y')
651 In [68]: y = dview.gather('y')
652
652
653 In [69]: print y
653 In [69]: print y
654 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
654 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
655
655
656 Remote imports
656 Remote imports
657 --------------
657 --------------
658
658
659 Sometimes you will want to import packages both in your interactive session
659 Sometimes you will want to import packages both in your interactive session
660 and on your remote engines. This can be done with the :class:`ContextManager`
660 and on your remote engines. This can be done with the :class:`ContextManager`
661 created by a DirectView's :meth:`sync_imports` method:
661 created by a DirectView's :meth:`sync_imports` method:
662
662
663 .. sourcecode:: ipython
663 .. sourcecode:: ipython
664
664
665 In [69]: with dview.sync_imports():
665 In [69]: with dview.sync_imports():
666 ....: import numpy
666 ....: import numpy
667 importing numpy on engine(s)
667 importing numpy on engine(s)
668
668
669 Any imports made inside the block will also be performed on the view's engines.
669 Any imports made inside the block will also be performed on the view's engines.
670 sync_imports also takes a `local` boolean flag that defaults to True, which specifies
670 sync_imports also takes a `local` boolean flag that defaults to True, which specifies
671 whether the local imports should also be performed. However, support for `local=False`
671 whether the local imports should also be performed. However, support for `local=False`
672 has not been implemented, so only packages that can be imported locally will work
672 has not been implemented, so only packages that can be imported locally will work
673 this way.
673 this way.
674
674
675 You can also specify imports via the ``@require`` decorator. This is a decorator
675 You can also specify imports via the ``@require`` decorator. This is a decorator
676 designed for use in Dependencies, but can be used to handle remote imports as well.
676 designed for use in Dependencies, but can be used to handle remote imports as well.
677 Modules or module names passed to ``@require`` will be imported before the decorated
677 Modules or module names passed to ``@require`` will be imported before the decorated
678 function is called. If they cannot be imported, the decorated function will never
678 function is called. If they cannot be imported, the decorated function will never
679 execute, and will fail with an UnmetDependencyError.
679 execute, and will fail with an UnmetDependencyError.
680
680
681 .. sourcecode:: ipython
681 .. sourcecode:: ipython
682
682
683 In [69]: from IPython.parallel import require
683 In [69]: from IPython.parallel import require
684
684
685 In [70]: @require('re')
685 In [70]: @require('re')
686 ....: def findall(pat, x):
686 ....: def findall(pat, x):
687 ....: # re is guaranteed to be available
687 ....: # re is guaranteed to be available
688 ....: return re.findall(pat, x)
688 ....: return re.findall(pat, x)
689
689
690 # you can also pass modules themselves, that you already have locally:
690 # you can also pass modules themselves, that you already have locally:
691 In [71]: @require(time)
691 In [71]: @require(time)
692 ....: def wait(t):
692 ....: def wait(t):
693 ....: time.sleep(t)
693 ....: time.sleep(t)
694 ....: return t
694 ....: return t
695
695
696 .. _parallel_exceptions:
696 .. _parallel_exceptions:
697
697
698 Parallel exceptions
698 Parallel exceptions
699 -------------------
699 -------------------
700
700
701 In the multiengine interface, parallel commands can raise Python exceptions,
701 In the multiengine interface, parallel commands can raise Python exceptions,
702 just like serial commands. But, it is a little subtle, because a single
702 just like serial commands. But, it is a little subtle, because a single
703 parallel command can actually raise multiple exceptions (one for each engine
703 parallel command can actually raise multiple exceptions (one for each engine
704 the command was run on). To express this idea, we have a
704 the command was run on). To express this idea, we have a
705 :exc:`CompositeError` exception class that will be raised in most cases. The
705 :exc:`CompositeError` exception class that will be raised in most cases. The
706 :exc:`CompositeError` class is a special type of exception that wraps one or
706 :exc:`CompositeError` class is a special type of exception that wraps one or
707 more other types of exceptions. Here is how it works:
707 more other types of exceptions. Here is how it works:
708
708
709 .. sourcecode:: ipython
709 .. sourcecode:: ipython
710
710
711 In [76]: dview.block=True
711 In [76]: dview.block=True
712
712
713 In [77]: dview.execute('1/0')
713 In [77]: dview.execute('1/0')
714 ---------------------------------------------------------------------------
714 ---------------------------------------------------------------------------
715 CompositeError Traceback (most recent call last)
715 CompositeError Traceback (most recent call last)
716 /home/user/<ipython-input-10-5d56b303a66c> in <module>()
716 /home/user/<ipython-input-10-5d56b303a66c> in <module>()
717 ----> 1 dview.execute('1/0')
717 ----> 1 dview.execute('1/0')
718
718
719 /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
719 /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
720 591 default: self.block
720 591 default: self.block
721 592 """
721 592 """
722 --> 593 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
722 --> 593 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
723 594
723 594
724 595 def run(self, filename, targets=None, block=None):
724 595 def run(self, filename, targets=None, block=None):
725
725
726 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
726 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
727
727
728 /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
728 /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
729 55 def sync_results(f, self, *args, **kwargs):
729 55 def sync_results(f, self, *args, **kwargs):
730 56 """sync relevant results from self.client to our results attribute."""
730 56 """sync relevant results from self.client to our results attribute."""
731 ---> 57 ret = f(self, *args, **kwargs)
731 ---> 57 ret = f(self, *args, **kwargs)
732 58 delta = self.outstanding.difference(self.client.outstanding)
732 58 delta = self.outstanding.difference(self.client.outstanding)
733 59 completed = self.outstanding.intersection(delta)
733 59 completed = self.outstanding.intersection(delta)
734
734
735 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
735 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
736
736
737 /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
737 /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
738 44 n_previous = len(self.client.history)
738 44 n_previous = len(self.client.history)
739 45 try:
739 45 try:
740 ---> 46 ret = f(self, *args, **kwargs)
740 ---> 46 ret = f(self, *args, **kwargs)
741 47 finally:
741 47 finally:
742 48 nmsgs = len(self.client.history) - n_previous
742 48 nmsgs = len(self.client.history) - n_previous
743
743
744 /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
744 /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
745 529 if block:
745 529 if block:
746 530 try:
746 530 try:
747 --> 531 return ar.get()
747 --> 531 return ar.get()
748 532 except KeyboardInterrupt:
748 532 except KeyboardInterrupt:
749 533 pass
749 533 pass
750
750
751 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
751 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
752 101 return self._result
752 101 return self._result
753 102 else:
753 102 else:
754 --> 103 raise self._exception
754 --> 103 raise self._exception
755 104 else:
755 104 else:
756 105 raise error.TimeoutError("Result not ready.")
756 105 raise error.TimeoutError("Result not ready.")
757
757
758 CompositeError: one or more exceptions from call to method: _execute
758 CompositeError: one or more exceptions from call to method: _execute
759 [0:apply]: ZeroDivisionError: integer division or modulo by zero
759 [0:apply]: ZeroDivisionError: integer division or modulo by zero
760 [1:apply]: ZeroDivisionError: integer division or modulo by zero
760 [1:apply]: ZeroDivisionError: integer division or modulo by zero
761 [2:apply]: ZeroDivisionError: integer division or modulo by zero
761 [2:apply]: ZeroDivisionError: integer division or modulo by zero
762 [3:apply]: ZeroDivisionError: integer division or modulo by zero
762 [3:apply]: ZeroDivisionError: integer division or modulo by zero
763
763
764 Notice how the error message printed when :exc:`CompositeError` is raised has
764 Notice how the error message printed when :exc:`CompositeError` is raised has
765 information about the individual exceptions that were raised on each engine.
765 information about the individual exceptions that were raised on each engine.
766 If you want, you can even raise one of these original exceptions:
766 If you want, you can even raise one of these original exceptions:
767
767
768 .. sourcecode:: ipython
768 .. sourcecode:: ipython
769
769
770 In [80]: try:
770 In [80]: try:
771 ....: dview.execute('1/0')
771 ....: dview.execute('1/0')
772 ....: except parallel.error.CompositeError, e:
772 ....: except parallel.error.CompositeError, e:
773 ....: e.raise_exception()
773 ....: e.raise_exception()
774 ....:
774 ....:
775 ....:
775 ....:
776 ---------------------------------------------------------------------------
776 ---------------------------------------------------------------------------
777 RemoteError Traceback (most recent call last)
777 RemoteError Traceback (most recent call last)
778 /home/user/<ipython-input-17-8597e7e39858> in <module>()
778 /home/user/<ipython-input-17-8597e7e39858> in <module>()
779 2 dview.execute('1/0')
779 2 dview.execute('1/0')
780 3 except CompositeError as e:
780 3 except CompositeError as e:
781 ----> 4 e.raise_exception()
781 ----> 4 e.raise_exception()
782
782
783 /path/to/site-packages/IPython/parallel/error.pyc in raise_exception(self, excid)
783 /path/to/site-packages/IPython/parallel/error.pyc in raise_exception(self, excid)
784 266 raise IndexError("an exception with index %i does not exist"%excid)
784 266 raise IndexError("an exception with index %i does not exist"%excid)
785 267 else:
785 267 else:
786 --> 268 raise RemoteError(en, ev, etb, ei)
786 --> 268 raise RemoteError(en, ev, etb, ei)
787 269
787 269
788 270
788 270
789
789
790 RemoteError: ZeroDivisionError(integer division or modulo by zero)
790 RemoteError: ZeroDivisionError(integer division or modulo by zero)
791 Traceback (most recent call last):
791 Traceback (most recent call last):
792 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
792 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
793 exec code in working,working
793 exec code in working,working
794 File "<string>", line 1, in <module>
794 File "<string>", line 1, in <module>
795 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
795 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
796 exec code in globals()
796 exec code in globals()
797 File "<string>", line 1, in <module>
797 File "<string>", line 1, in <module>
798 ZeroDivisionError: integer division or modulo by zero
798 ZeroDivisionError: integer division or modulo by zero
799
799
800 If you are working in IPython, you can simply type ``%debug`` after one of
800 If you are working in IPython, you can simply type ``%debug`` after one of
801 these :exc:`CompositeError` exceptions is raised, and inspect the exception
801 these :exc:`CompositeError` exceptions is raised, and inspect the exception
802 instance:
802 instance:
803
803
804 .. sourcecode:: ipython
804 .. sourcecode:: ipython
805
805
806 In [81]: dview.execute('1/0')
806 In [81]: dview.execute('1/0')
807 ---------------------------------------------------------------------------
807 ---------------------------------------------------------------------------
808 CompositeError Traceback (most recent call last)
808 CompositeError Traceback (most recent call last)
809 /home/user/<ipython-input-10-5d56b303a66c> in <module>()
809 /home/user/<ipython-input-10-5d56b303a66c> in <module>()
810 ----> 1 dview.execute('1/0')
810 ----> 1 dview.execute('1/0')
811
811
812 /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
812 /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
813 591 default: self.block
813 591 default: self.block
814 592 """
814 592 """
815 --> 593 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
815 --> 593 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
816 594
816 594
817 595 def run(self, filename, targets=None, block=None):
817 595 def run(self, filename, targets=None, block=None):
818
818
819 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
819 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
820
820
821 /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
821 /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
822 55 def sync_results(f, self, *args, **kwargs):
822 55 def sync_results(f, self, *args, **kwargs):
823 56 """sync relevant results from self.client to our results attribute."""
823 56 """sync relevant results from self.client to our results attribute."""
824 ---> 57 ret = f(self, *args, **kwargs)
824 ---> 57 ret = f(self, *args, **kwargs)
825 58 delta = self.outstanding.difference(self.client.outstanding)
825 58 delta = self.outstanding.difference(self.client.outstanding)
826 59 completed = self.outstanding.intersection(delta)
826 59 completed = self.outstanding.intersection(delta)
827
827
828 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
828 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
829
829
830 /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
830 /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
831 44 n_previous = len(self.client.history)
831 44 n_previous = len(self.client.history)
832 45 try:
832 45 try:
833 ---> 46 ret = f(self, *args, **kwargs)
833 ---> 46 ret = f(self, *args, **kwargs)
834 47 finally:
834 47 finally:
835 48 nmsgs = len(self.client.history) - n_previous
835 48 nmsgs = len(self.client.history) - n_previous
836
836
837 /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
837 /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
838 529 if block:
838 529 if block:
839 530 try:
839 530 try:
840 --> 531 return ar.get()
840 --> 531 return ar.get()
841 532 except KeyboardInterrupt:
841 532 except KeyboardInterrupt:
842 533 pass
842 533 pass
843
843
844 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
844 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
845 101 return self._result
845 101 return self._result
846 102 else:
846 102 else:
847 --> 103 raise self._exception
847 --> 103 raise self._exception
848 104 else:
848 104 else:
849 105 raise error.TimeoutError("Result not ready.")
849 105 raise error.TimeoutError("Result not ready.")
850
850
851 CompositeError: one or more exceptions from call to method: _execute
851 CompositeError: one or more exceptions from call to method: _execute
852 [0:apply]: ZeroDivisionError: integer division or modulo by zero
852 [0:apply]: ZeroDivisionError: integer division or modulo by zero
853 [1:apply]: ZeroDivisionError: integer division or modulo by zero
853 [1:apply]: ZeroDivisionError: integer division or modulo by zero
854 [2:apply]: ZeroDivisionError: integer division or modulo by zero
854 [2:apply]: ZeroDivisionError: integer division or modulo by zero
855 [3:apply]: ZeroDivisionError: integer division or modulo by zero
855 [3:apply]: ZeroDivisionError: integer division or modulo by zero
856
856
857 In [82]: %debug
857 In [82]: %debug
858 > /path/to/site-packages/IPython/parallel/client/asyncresult.py(103)get()
858 > /path/to/site-packages/IPython/parallel/client/asyncresult.py(103)get()
859 102 else:
859 102 else:
860 --> 103 raise self._exception
860 --> 103 raise self._exception
861 104 else:
861 104 else:
862
862
863 # With the debugger running, self._exception is the exception instance. We can tab complete
863 # With the debugger running, self._exception is the exception instance. We can tab complete
864 # on it and see the extra methods that are available.
864 # on it and see the extra methods that are available.
865 ipdb> self._exception.<tab>
865 ipdb> self._exception.<tab>
866 e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
866 e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
867 e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
867 e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
868 e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
868 e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
869 e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
869 e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
870 e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
870 e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
871 ipdb> self._exception.print_tracebacks()
871 ipdb> self._exception.print_tracebacks()
872 [0:apply]:
872 [0:apply]:
873 Traceback (most recent call last):
873 Traceback (most recent call last):
874 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
874 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
875 exec code in working,working
875 exec code in working,working
876 File "<string>", line 1, in <module>
876 File "<string>", line 1, in <module>
877 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
877 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
878 exec code in globals()
878 exec code in globals()
879 File "<string>", line 1, in <module>
879 File "<string>", line 1, in <module>
880 ZeroDivisionError: integer division or modulo by zero
880 ZeroDivisionError: integer division or modulo by zero
881
881
882
882
883 [1:apply]:
883 [1:apply]:
884 Traceback (most recent call last):
884 Traceback (most recent call last):
885 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
885 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
886 exec code in working,working
886 exec code in working,working
887 File "<string>", line 1, in <module>
887 File "<string>", line 1, in <module>
888 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
888 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
889 exec code in globals()
889 exec code in globals()
890 File "<string>", line 1, in <module>
890 File "<string>", line 1, in <module>
891 ZeroDivisionError: integer division or modulo by zero
891 ZeroDivisionError: integer division or modulo by zero
892
892
893
893
894 [2:apply]:
894 [2:apply]:
895 Traceback (most recent call last):
895 Traceback (most recent call last):
896 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
896 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
897 exec code in working,working
897 exec code in working,working
898 File "<string>", line 1, in <module>
898 File "<string>", line 1, in <module>
899 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
899 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
900 exec code in globals()
900 exec code in globals()
901 File "<string>", line 1, in <module>
901 File "<string>", line 1, in <module>
902 ZeroDivisionError: integer division or modulo by zero
902 ZeroDivisionError: integer division or modulo by zero
903
903
904
904
905 [3:apply]:
905 [3:apply]:
906 Traceback (most recent call last):
906 Traceback (most recent call last):
907 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
907 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
908 exec code in working,working
908 exec code in working,working
909 File "<string>", line 1, in <module>
909 File "<string>", line 1, in <module>
910 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
910 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
911 exec code in globals()
911 exec code in globals()
912 File "<string>", line 1, in <module>
912 File "<string>", line 1, in <module>
913 ZeroDivisionError: integer division or modulo by zero
913 ZeroDivisionError: integer division or modulo by zero
914
914
915
915
916 All of this same error handling magic even works in non-blocking mode:
916 All of this same error handling magic even works in non-blocking mode:
917
917
918 .. sourcecode:: ipython
918 .. sourcecode:: ipython
919
919
920 In [83]: dview.block=False
920 In [83]: dview.block=False
921
921
922 In [84]: ar = dview.execute('1/0')
922 In [84]: ar = dview.execute('1/0')
923
923
924 In [85]: ar.get()
924 In [85]: ar.get()
925 ---------------------------------------------------------------------------
925 ---------------------------------------------------------------------------
926 CompositeError Traceback (most recent call last)
926 CompositeError Traceback (most recent call last)
927 /home/user/<ipython-input-21-8531eb3d26fb> in <module>()
927 /home/user/<ipython-input-21-8531eb3d26fb> in <module>()
928 ----> 1 ar.get()
928 ----> 1 ar.get()
929
929
930 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
930 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
931 101 return self._result
931 101 return self._result
932 102 else:
932 102 else:
933 --> 103 raise self._exception
933 --> 103 raise self._exception
934 104 else:
934 104 else:
935 105 raise error.TimeoutError("Result not ready.")
935 105 raise error.TimeoutError("Result not ready.")
936
936
937 CompositeError: one or more exceptions from call to method: _execute
937 CompositeError: one or more exceptions from call to method: _execute
938 [0:apply]: ZeroDivisionError: integer division or modulo by zero
938 [0:apply]: ZeroDivisionError: integer division or modulo by zero
939 [1:apply]: ZeroDivisionError: integer division or modulo by zero
939 [1:apply]: ZeroDivisionError: integer division or modulo by zero
940 [2:apply]: ZeroDivisionError: integer division or modulo by zero
940 [2:apply]: ZeroDivisionError: integer division or modulo by zero
941 [3:apply]: ZeroDivisionError: integer division or modulo by zero
941 [3:apply]: ZeroDivisionError: integer division or modulo by zero
942
942
General Comments 0
You need to be logged in to leave comments. Login now