##// END OF EJS Templates
aesthetics pass on AsyncResult.display_outputs...
MinRK -
Show More
@@ -1,652 +1,670 b''
1 """AsyncResult objects for the client
1 """AsyncResult objects for the client
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import print_function
19
18 import sys
20 import sys
19 import time
21 import time
20 from datetime import datetime
22 from datetime import datetime
21
23
22 from zmq import MessageTracker
24 from zmq import MessageTracker
23
25
24 from IPython.core.display import clear_output, display
26 from IPython.core.display import clear_output, display, display_pretty
25 from IPython.external.decorator import decorator
27 from IPython.external.decorator import decorator
26 from IPython.parallel import error
28 from IPython.parallel import error
27
29
28 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
29 # Functions
31 # Functions
30 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
31
33
32 def _total_seconds(td):
34 def _total_seconds(td):
33 """timedelta.total_seconds was added in 2.7"""
35 """timedelta.total_seconds was added in 2.7"""
34 try:
36 try:
35 # Python >= 2.7
37 # Python >= 2.7
36 return td.total_seconds()
38 return td.total_seconds()
37 except AttributeError:
39 except AttributeError:
38 # Python 2.6
40 # Python 2.6
39 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
41 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
40
42
43 def _raw_text(s):
44 display_pretty(s, raw=True)
45
41 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
42 # Classes
47 # Classes
43 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
44
49
45 # global empty tracker that's always done:
50 # global empty tracker that's always done:
46 finished_tracker = MessageTracker()
51 finished_tracker = MessageTracker()
47
52
48 @decorator
53 @decorator
49 def check_ready(f, self, *args, **kwargs):
54 def check_ready(f, self, *args, **kwargs):
50 """Call spin() to sync state prior to calling the method."""
55 """Call spin() to sync state prior to calling the method."""
51 self.wait(0)
56 self.wait(0)
52 if not self._ready:
57 if not self._ready:
53 raise error.TimeoutError("result not ready")
58 raise error.TimeoutError("result not ready")
54 return f(self, *args, **kwargs)
59 return f(self, *args, **kwargs)
55
60
56 class AsyncResult(object):
61 class AsyncResult(object):
57 """Class for representing results of non-blocking calls.
62 """Class for representing results of non-blocking calls.
58
63
59 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
64 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
60 """
65 """
61
66
62 msg_ids = None
67 msg_ids = None
63 _targets = None
68 _targets = None
64 _tracker = None
69 _tracker = None
65 _single_result = False
70 _single_result = False
66
71
67 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
72 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
68 if isinstance(msg_ids, basestring):
73 if isinstance(msg_ids, basestring):
69 # always a list
74 # always a list
70 msg_ids = [msg_ids]
75 msg_ids = [msg_ids]
71 if tracker is None:
76 if tracker is None:
72 # default to always done
77 # default to always done
73 tracker = finished_tracker
78 tracker = finished_tracker
74 self._client = client
79 self._client = client
75 self.msg_ids = msg_ids
80 self.msg_ids = msg_ids
76 self._fname=fname
81 self._fname=fname
77 self._targets = targets
82 self._targets = targets
78 self._tracker = tracker
83 self._tracker = tracker
79 self._ready = False
84 self._ready = False
80 self._success = None
85 self._success = None
81 self._metadata = None
86 self._metadata = None
82 if len(msg_ids) == 1:
87 if len(msg_ids) == 1:
83 self._single_result = not isinstance(targets, (list, tuple))
88 self._single_result = not isinstance(targets, (list, tuple))
84 else:
89 else:
85 self._single_result = False
90 self._single_result = False
86
91
87 def __repr__(self):
92 def __repr__(self):
88 if self._ready:
93 if self._ready:
89 return "<%s: finished>"%(self.__class__.__name__)
94 return "<%s: finished>"%(self.__class__.__name__)
90 else:
95 else:
91 return "<%s: %s>"%(self.__class__.__name__,self._fname)
96 return "<%s: %s>"%(self.__class__.__name__,self._fname)
92
97
93
98
94 def _reconstruct_result(self, res):
99 def _reconstruct_result(self, res):
95 """Reconstruct our result from actual result list (always a list)
100 """Reconstruct our result from actual result list (always a list)
96
101
97 Override me in subclasses for turning a list of results
102 Override me in subclasses for turning a list of results
98 into the expected form.
103 into the expected form.
99 """
104 """
100 if self._single_result:
105 if self._single_result:
101 return res[0]
106 return res[0]
102 else:
107 else:
103 return res
108 return res
104
109
105 def get(self, timeout=-1):
110 def get(self, timeout=-1):
106 """Return the result when it arrives.
111 """Return the result when it arrives.
107
112
108 If `timeout` is not ``None`` and the result does not arrive within
113 If `timeout` is not ``None`` and the result does not arrive within
109 `timeout` seconds then ``TimeoutError`` is raised. If the
114 `timeout` seconds then ``TimeoutError`` is raised. If the
110 remote call raised an exception then that exception will be reraised
115 remote call raised an exception then that exception will be reraised
111 by get() inside a `RemoteError`.
116 by get() inside a `RemoteError`.
112 """
117 """
113 if not self.ready():
118 if not self.ready():
114 self.wait(timeout)
119 self.wait(timeout)
115
120
116 if self._ready:
121 if self._ready:
117 if self._success:
122 if self._success:
118 return self._result
123 return self._result
119 else:
124 else:
120 raise self._exception
125 raise self._exception
121 else:
126 else:
122 raise error.TimeoutError("Result not ready.")
127 raise error.TimeoutError("Result not ready.")
123
128
124 def ready(self):
129 def ready(self):
125 """Return whether the call has completed."""
130 """Return whether the call has completed."""
126 if not self._ready:
131 if not self._ready:
127 self.wait(0)
132 self.wait(0)
128 return self._ready
133 return self._ready
129
134
130 def wait(self, timeout=-1):
135 def wait(self, timeout=-1):
131 """Wait until the result is available or until `timeout` seconds pass.
136 """Wait until the result is available or until `timeout` seconds pass.
132
137
133 This method always returns None.
138 This method always returns None.
134 """
139 """
135 if self._ready:
140 if self._ready:
136 return
141 return
137 self._ready = self._client.wait(self.msg_ids, timeout)
142 self._ready = self._client.wait(self.msg_ids, timeout)
138 if self._ready:
143 if self._ready:
139 try:
144 try:
140 results = map(self._client.results.get, self.msg_ids)
145 results = map(self._client.results.get, self.msg_ids)
141 self._result = results
146 self._result = results
142 if self._single_result:
147 if self._single_result:
143 r = results[0]
148 r = results[0]
144 if isinstance(r, Exception):
149 if isinstance(r, Exception):
145 raise r
150 raise r
146 else:
151 else:
147 results = error.collect_exceptions(results, self._fname)
152 results = error.collect_exceptions(results, self._fname)
148 self._result = self._reconstruct_result(results)
153 self._result = self._reconstruct_result(results)
149 except Exception, e:
154 except Exception, e:
150 self._exception = e
155 self._exception = e
151 self._success = False
156 self._success = False
152 else:
157 else:
153 self._success = True
158 self._success = True
154 finally:
159 finally:
155 self._metadata = map(self._client.metadata.get, self.msg_ids)
160 self._metadata = map(self._client.metadata.get, self.msg_ids)
156
161
157
162
158 def successful(self):
163 def successful(self):
159 """Return whether the call completed without raising an exception.
164 """Return whether the call completed without raising an exception.
160
165
161 Will raise ``AssertionError`` if the result is not ready.
166 Will raise ``AssertionError`` if the result is not ready.
162 """
167 """
163 assert self.ready()
168 assert self.ready()
164 return self._success
169 return self._success
165
170
166 #----------------------------------------------------------------
171 #----------------------------------------------------------------
167 # Extra methods not in mp.pool.AsyncResult
172 # Extra methods not in mp.pool.AsyncResult
168 #----------------------------------------------------------------
173 #----------------------------------------------------------------
169
174
170 def get_dict(self, timeout=-1):
175 def get_dict(self, timeout=-1):
171 """Get the results as a dict, keyed by engine_id.
176 """Get the results as a dict, keyed by engine_id.
172
177
173 timeout behavior is described in `get()`.
178 timeout behavior is described in `get()`.
174 """
179 """
175
180
176 results = self.get(timeout)
181 results = self.get(timeout)
177 engine_ids = [ md['engine_id'] for md in self._metadata ]
182 engine_ids = [ md['engine_id'] for md in self._metadata ]
178 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
183 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
179 maxcount = bycount.count(bycount[-1])
184 maxcount = bycount.count(bycount[-1])
180 if maxcount > 1:
185 if maxcount > 1:
181 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
186 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
182 maxcount, bycount[-1]))
187 maxcount, bycount[-1]))
183
188
184 return dict(zip(engine_ids,results))
189 return dict(zip(engine_ids,results))
185
190
186 @property
191 @property
187 def result(self):
192 def result(self):
188 """result property wrapper for `get(timeout=0)`."""
193 """result property wrapper for `get(timeout=0)`."""
189 return self.get()
194 return self.get()
190
195
191 # abbreviated alias:
196 # abbreviated alias:
192 r = result
197 r = result
193
198
194 @property
199 @property
195 @check_ready
200 @check_ready
196 def metadata(self):
201 def metadata(self):
197 """property for accessing execution metadata."""
202 """property for accessing execution metadata."""
198 if self._single_result:
203 if self._single_result:
199 return self._metadata[0]
204 return self._metadata[0]
200 else:
205 else:
201 return self._metadata
206 return self._metadata
202
207
203 @property
208 @property
204 def result_dict(self):
209 def result_dict(self):
205 """result property as a dict."""
210 """result property as a dict."""
206 return self.get_dict()
211 return self.get_dict()
207
212
208 def __dict__(self):
213 def __dict__(self):
209 return self.get_dict(0)
214 return self.get_dict(0)
210
215
211 def abort(self):
216 def abort(self):
212 """abort my tasks."""
217 """abort my tasks."""
213 assert not self.ready(), "Can't abort, I am already done!"
218 assert not self.ready(), "Can't abort, I am already done!"
214 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
219 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
215
220
216 @property
221 @property
217 def sent(self):
222 def sent(self):
218 """check whether my messages have been sent."""
223 """check whether my messages have been sent."""
219 return self._tracker.done
224 return self._tracker.done
220
225
221 def wait_for_send(self, timeout=-1):
226 def wait_for_send(self, timeout=-1):
222 """wait for pyzmq send to complete.
227 """wait for pyzmq send to complete.
223
228
224 This is necessary when sending arrays that you intend to edit in-place.
229 This is necessary when sending arrays that you intend to edit in-place.
225 `timeout` is in seconds, and will raise TimeoutError if it is reached
230 `timeout` is in seconds, and will raise TimeoutError if it is reached
226 before the send completes.
231 before the send completes.
227 """
232 """
228 return self._tracker.wait(timeout)
233 return self._tracker.wait(timeout)
229
234
230 #-------------------------------------
235 #-------------------------------------
231 # dict-access
236 # dict-access
232 #-------------------------------------
237 #-------------------------------------
233
238
234 @check_ready
239 @check_ready
235 def __getitem__(self, key):
240 def __getitem__(self, key):
236 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
241 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
237 """
242 """
238 if isinstance(key, int):
243 if isinstance(key, int):
239 return error.collect_exceptions([self._result[key]], self._fname)[0]
244 return error.collect_exceptions([self._result[key]], self._fname)[0]
240 elif isinstance(key, slice):
245 elif isinstance(key, slice):
241 return error.collect_exceptions(self._result[key], self._fname)
246 return error.collect_exceptions(self._result[key], self._fname)
242 elif isinstance(key, basestring):
247 elif isinstance(key, basestring):
243 values = [ md[key] for md in self._metadata ]
248 values = [ md[key] for md in self._metadata ]
244 if self._single_result:
249 if self._single_result:
245 return values[0]
250 return values[0]
246 else:
251 else:
247 return values
252 return values
248 else:
253 else:
249 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
254 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
250
255
251 def __getattr__(self, key):
256 def __getattr__(self, key):
252 """getattr maps to getitem for convenient attr access to metadata."""
257 """getattr maps to getitem for convenient attr access to metadata."""
253 try:
258 try:
254 return self.__getitem__(key)
259 return self.__getitem__(key)
255 except (error.TimeoutError, KeyError):
260 except (error.TimeoutError, KeyError):
256 raise AttributeError("%r object has no attribute %r"%(
261 raise AttributeError("%r object has no attribute %r"%(
257 self.__class__.__name__, key))
262 self.__class__.__name__, key))
258
263
259 # asynchronous iterator:
264 # asynchronous iterator:
260 def __iter__(self):
265 def __iter__(self):
261 if self._single_result:
266 if self._single_result:
262 raise TypeError("AsyncResults with a single result are not iterable.")
267 raise TypeError("AsyncResults with a single result are not iterable.")
263 try:
268 try:
264 rlist = self.get(0)
269 rlist = self.get(0)
265 except error.TimeoutError:
270 except error.TimeoutError:
266 # wait for each result individually
271 # wait for each result individually
267 for msg_id in self.msg_ids:
272 for msg_id in self.msg_ids:
268 ar = AsyncResult(self._client, msg_id, self._fname)
273 ar = AsyncResult(self._client, msg_id, self._fname)
269 yield ar.get()
274 yield ar.get()
270 else:
275 else:
271 # already done
276 # already done
272 for r in rlist:
277 for r in rlist:
273 yield r
278 yield r
274
279
275 def __len__(self):
280 def __len__(self):
276 return len(self.msg_ids)
281 return len(self.msg_ids)
277
282
278 #-------------------------------------
283 #-------------------------------------
279 # Sugar methods and attributes
284 # Sugar methods and attributes
280 #-------------------------------------
285 #-------------------------------------
281
286
282 def timedelta(self, start, end, start_key=min, end_key=max):
287 def timedelta(self, start, end, start_key=min, end_key=max):
283 """compute the difference between two sets of timestamps
288 """compute the difference between two sets of timestamps
284
289
285 The default behavior is to use the earliest of the first
290 The default behavior is to use the earliest of the first
286 and the latest of the second list, but this can be changed
291 and the latest of the second list, but this can be changed
287 by passing a different
292 by passing a different
288
293
289 Parameters
294 Parameters
290 ----------
295 ----------
291
296
292 start : one or more datetime objects (e.g. ar.submitted)
297 start : one or more datetime objects (e.g. ar.submitted)
293 end : one or more datetime objects (e.g. ar.received)
298 end : one or more datetime objects (e.g. ar.received)
294 start_key : callable
299 start_key : callable
295 Function to call on `start` to extract the relevant
300 Function to call on `start` to extract the relevant
296 entry [defalt: min]
301 entry [defalt: min]
297 end_key : callable
302 end_key : callable
298 Function to call on `end` to extract the relevant
303 Function to call on `end` to extract the relevant
299 entry [default: max]
304 entry [default: max]
300
305
301 Returns
306 Returns
302 -------
307 -------
303
308
304 dt : float
309 dt : float
305 The time elapsed (in seconds) between the two selected timestamps.
310 The time elapsed (in seconds) between the two selected timestamps.
306 """
311 """
307 if not isinstance(start, datetime):
312 if not isinstance(start, datetime):
308 # handle single_result AsyncResults, where ar.stamp is single object,
313 # handle single_result AsyncResults, where ar.stamp is single object,
309 # not a list
314 # not a list
310 start = start_key(start)
315 start = start_key(start)
311 if not isinstance(end, datetime):
316 if not isinstance(end, datetime):
312 # handle single_result AsyncResults, where ar.stamp is single object,
317 # handle single_result AsyncResults, where ar.stamp is single object,
313 # not a list
318 # not a list
314 end = end_key(end)
319 end = end_key(end)
315 return _total_seconds(end - start)
320 return _total_seconds(end - start)
316
321
317 @property
322 @property
318 def progress(self):
323 def progress(self):
319 """the number of tasks which have been completed at this point.
324 """the number of tasks which have been completed at this point.
320
325
321 Fractional progress would be given by 1.0 * ar.progress / len(ar)
326 Fractional progress would be given by 1.0 * ar.progress / len(ar)
322 """
327 """
323 self.wait(0)
328 self.wait(0)
324 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
329 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
325
330
326 @property
331 @property
327 def elapsed(self):
332 def elapsed(self):
328 """elapsed time since initial submission"""
333 """elapsed time since initial submission"""
329 if self.ready():
334 if self.ready():
330 return self.wall_time
335 return self.wall_time
331
336
332 now = submitted = datetime.now()
337 now = submitted = datetime.now()
333 for msg_id in self.msg_ids:
338 for msg_id in self.msg_ids:
334 if msg_id in self._client.metadata:
339 if msg_id in self._client.metadata:
335 stamp = self._client.metadata[msg_id]['submitted']
340 stamp = self._client.metadata[msg_id]['submitted']
336 if stamp and stamp < submitted:
341 if stamp and stamp < submitted:
337 submitted = stamp
342 submitted = stamp
338 return _total_seconds(now-submitted)
343 return _total_seconds(now-submitted)
339
344
340 @property
345 @property
341 @check_ready
346 @check_ready
342 def serial_time(self):
347 def serial_time(self):
343 """serial computation time of a parallel calculation
348 """serial computation time of a parallel calculation
344
349
345 Computed as the sum of (completed-started) of each task
350 Computed as the sum of (completed-started) of each task
346 """
351 """
347 t = 0
352 t = 0
348 for md in self._metadata:
353 for md in self._metadata:
349 t += _total_seconds(md['completed'] - md['started'])
354 t += _total_seconds(md['completed'] - md['started'])
350 return t
355 return t
351
356
352 @property
357 @property
353 @check_ready
358 @check_ready
354 def wall_time(self):
359 def wall_time(self):
355 """actual computation time of a parallel calculation
360 """actual computation time of a parallel calculation
356
361
357 Computed as the time between the latest `received` stamp
362 Computed as the time between the latest `received` stamp
358 and the earliest `submitted`.
363 and the earliest `submitted`.
359
364
360 Only reliable if Client was spinning/waiting when the task finished, because
365 Only reliable if Client was spinning/waiting when the task finished, because
361 the `received` timestamp is created when a result is pulled off of the zmq queue,
366 the `received` timestamp is created when a result is pulled off of the zmq queue,
362 which happens as a result of `client.spin()`.
367 which happens as a result of `client.spin()`.
363
368
364 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
369 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
365
370
366 """
371 """
367 return self.timedelta(self.submitted, self.received)
372 return self.timedelta(self.submitted, self.received)
368
373
369 def wait_interactive(self, interval=1., timeout=None):
374 def wait_interactive(self, interval=1., timeout=None):
370 """interactive wait, printing progress at regular intervals"""
375 """interactive wait, printing progress at regular intervals"""
371 N = len(self)
376 N = len(self)
372 tic = time.time()
377 tic = time.time()
373 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
378 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
374 self.wait(interval)
379 self.wait(interval)
375 clear_output()
380 clear_output()
376 print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed),
381 print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
377 sys.stdout.flush()
382 sys.stdout.flush()
378 print
383 print()
379 print "done"
384 print("done")
380
385
381 def _republish_displaypub(self, content, eid):
386 def _republish_displaypub(self, content, eid):
382 """republish individual displaypub content dicts"""
387 """republish individual displaypub content dicts"""
383 try:
388 try:
384 ip = get_ipython()
389 ip = get_ipython()
385 except NameError:
390 except NameError:
386 # displaypub is meaningless outside IPython
391 # displaypub is meaningless outside IPython
387 return
392 return
388 md = content['metadata'] or {}
393 md = content['metadata'] or {}
389 md['engine'] = eid
394 md['engine'] = eid
390 ip.display_pub.publish(content['source'], content['data'], md)
395 ip.display_pub.publish(content['source'], content['data'], md)
396
397 def _display_stream(self, text, prefix='', file=None):
398 if not text:
399 # nothing to display
400 return
401 if file is None:
402 file = sys.stdout
403 end = '' if text.endswith('\n') else '\n'
391
404
405 multiline = text.count('\n') > int(text.endswith('\n'))
406 if prefix and multiline and not text.startswith('\n'):
407 prefix = prefix + '\n'
408 print("%s%s" % (prefix, text), file=file, end=end)
392
409
410
393 def _display_single_result(self):
411 def _display_single_result(self):
394 if self.stdout:
412 self._display_stream(self.stdout)
395 print self.stdout
413 self._display_stream(self.stderr, file=sys.stderr)
396 if self.stderr:
397 print >> sys.stderr, self.stderr
398
414
399 try:
415 try:
400 get_ipython()
416 get_ipython()
401 except NameError:
417 except NameError:
402 # displaypub is meaningless outside IPython
418 # displaypub is meaningless outside IPython
403 return
419 return
404
420
405 for output in self.outputs:
421 for output in self.outputs:
406 self._republish_displaypub(output, self.engine_id)
422 self._republish_displaypub(output, self.engine_id)
407
423
408 if self.pyout is not None:
424 if self.pyout is not None:
409 display(self.get())
425 display(self.get())
410
426
411 @check_ready
427 @check_ready
412 def display_outputs(self, groupby="type"):
428 def display_outputs(self, groupby="type"):
413 """republish the outputs of the computation
429 """republish the outputs of the computation
414
430
415 Parameters
431 Parameters
416 ----------
432 ----------
417
433
418 groupby : str [default: type]
434 groupby : str [default: type]
419 if 'type':
435 if 'type':
420 Group outputs by type (show all stdout, then all stderr, etc.):
436 Group outputs by type (show all stdout, then all stderr, etc.):
421
437
422 [stdout:1] foo
438 [stdout:1] foo
423 [stdout:2] foo
439 [stdout:2] foo
424 [stderr:1] bar
440 [stderr:1] bar
425 [stderr:2] bar
441 [stderr:2] bar
426 if 'engine':
442 if 'engine':
427 Display outputs for each engine before moving on to the next:
443 Display outputs for each engine before moving on to the next:
428
444
429 [stdout:1] foo
445 [stdout:1] foo
430 [stderr:1] bar
446 [stderr:1] bar
431 [stdout:2] foo
447 [stdout:2] foo
432 [stderr:2] bar
448 [stderr:2] bar
433
449
434 if 'order':
450 if 'order':
435 Like 'type', but further collate individual displaypub
451 Like 'type', but further collate individual displaypub
436 outputs. This is meant for cases of each command producing
452 outputs. This is meant for cases of each command producing
437 several plots, and you would like to see all of the first
453 several plots, and you would like to see all of the first
438 plots together, then all of the second plots, and so on.
454 plots together, then all of the second plots, and so on.
439 """
455 """
440 # flush iopub, just in case
456 # flush iopub, just in case
441 self._client._flush_iopub(self._client._iopub_socket)
457 self._client._flush_iopub(self._client._iopub_socket)
442 if self._single_result:
458 if self._single_result:
443 self._display_single_result()
459 self._display_single_result()
444 return
460 return
445
461
446 stdouts = [s.rstrip() for s in self.stdout]
462 stdouts = self.stdout
447 stderrs = [s.rstrip() for s in self.stderr]
463 stderrs = self.stderr
448 pyouts = [p for p in self.pyout]
464 pyouts = self.pyout
449 output_lists = self.outputs
465 output_lists = self.outputs
450 results = self.get()
466 results = self.get()
451
467
452 targets = self.engine_id
468 targets = self.engine_id
453
469
454 if groupby == "engine":
470 if groupby == "engine":
455 for eid,stdout,stderr,outputs,r,pyout in zip(
471 for eid,stdout,stderr,outputs,r,pyout in zip(
456 targets, stdouts, stderrs, output_lists, results, pyouts
472 targets, stdouts, stderrs, output_lists, results, pyouts
457 ):
473 ):
458 if stdout:
474 self._display_stream(stdout, '[stdout:%i] ' % eid)
459 print '[stdout:%i]' % eid, stdout
475 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
460 if stderr:
461 print >> sys.stderr, '[stderr:%i]' % eid, stderr
462
476
463 try:
477 try:
464 get_ipython()
478 get_ipython()
465 except NameError:
479 except NameError:
466 # displaypub is meaningless outside IPython
480 # displaypub is meaningless outside IPython
467 return
481 return
468
482
483 if outputs or pyout is not None:
484 _raw_text('[output:%i]' % eid)
485
469 for output in outputs:
486 for output in outputs:
470 self._republish_displaypub(output, eid)
487 self._republish_displaypub(output, eid)
471
488
472 if pyout is not None:
489 if pyout is not None:
473 display(r)
490 display(r)
474
491
475 elif groupby in ('type', 'order'):
492 elif groupby in ('type', 'order'):
476 # republish stdout:
493 # republish stdout:
477 if any(stdouts):
494 for eid,stdout in zip(targets, stdouts):
478 for eid,stdout in zip(targets, stdouts):
495 self._display_stream(stdout, '[stdout:%i] ' % eid)
479 print '[stdout:%i]' % eid, stdout
480
496
481 # republish stderr:
497 # republish stderr:
482 if any(stderrs):
498 for eid,stderr in zip(targets, stderrs):
483 for eid,stderr in zip(targets, stderrs):
499 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
484 print >> sys.stderr, '[stderr:%i]' % eid, stderr
485
500
486 try:
501 try:
487 get_ipython()
502 get_ipython()
488 except NameError:
503 except NameError:
489 # displaypub is meaningless outside IPython
504 # displaypub is meaningless outside IPython
490 return
505 return
491
506
492 if groupby == 'order':
507 if groupby == 'order':
493 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
508 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
494 N = max(len(outputs) for outputs in output_lists)
509 N = max(len(outputs) for outputs in output_lists)
495 for i in range(N):
510 for i in range(N):
496 for eid in targets:
511 for eid in targets:
497 outputs = output_dict[eid]
512 outputs = output_dict[eid]
498 if len(outputs) >= N:
513 if len(outputs) >= N:
514 _raw_text('[output:%i]' % eid)
499 self._republish_displaypub(outputs[i], eid)
515 self._republish_displaypub(outputs[i], eid)
500 else:
516 else:
501 # republish displaypub output
517 # republish displaypub output
502 for eid,outputs in zip(targets, output_lists):
518 for eid,outputs in zip(targets, output_lists):
519 if outputs:
520 _raw_text('[output:%i]' % eid)
503 for output in outputs:
521 for output in outputs:
504 self._republish_displaypub(output, eid)
522 self._republish_displaypub(output, eid)
505
523
506 # finally, add pyout:
524 # finally, add pyout:
507 for eid,r,pyout in zip(targets, results, pyouts):
525 for eid,r,pyout in zip(targets, results, pyouts):
508 if pyout is not None:
526 if pyout is not None:
509 display(r)
527 display(r)
510
528
511 else:
529 else:
512 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
530 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
513
531
514
532
515
533
516
534
517 class AsyncMapResult(AsyncResult):
535 class AsyncMapResult(AsyncResult):
518 """Class for representing results of non-blocking gathers.
536 """Class for representing results of non-blocking gathers.
519
537
520 This will properly reconstruct the gather.
538 This will properly reconstruct the gather.
521
539
522 This class is iterable at any time, and will wait on results as they come.
540 This class is iterable at any time, and will wait on results as they come.
523
541
524 If ordered=False, then the first results to arrive will come first, otherwise
542 If ordered=False, then the first results to arrive will come first, otherwise
525 results will be yielded in the order they were submitted.
543 results will be yielded in the order they were submitted.
526
544
527 """
545 """
528
546
529 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
547 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
530 AsyncResult.__init__(self, client, msg_ids, fname=fname)
548 AsyncResult.__init__(self, client, msg_ids, fname=fname)
531 self._mapObject = mapObject
549 self._mapObject = mapObject
532 self._single_result = False
550 self._single_result = False
533 self.ordered = ordered
551 self.ordered = ordered
534
552
535 def _reconstruct_result(self, res):
553 def _reconstruct_result(self, res):
536 """Perform the gather on the actual results."""
554 """Perform the gather on the actual results."""
537 return self._mapObject.joinPartitions(res)
555 return self._mapObject.joinPartitions(res)
538
556
539 # asynchronous iterator:
557 # asynchronous iterator:
540 def __iter__(self):
558 def __iter__(self):
541 it = self._ordered_iter if self.ordered else self._unordered_iter
559 it = self._ordered_iter if self.ordered else self._unordered_iter
542 for r in it():
560 for r in it():
543 yield r
561 yield r
544
562
545 # asynchronous ordered iterator:
563 # asynchronous ordered iterator:
546 def _ordered_iter(self):
564 def _ordered_iter(self):
547 """iterator for results *as they arrive*, preserving submission order."""
565 """iterator for results *as they arrive*, preserving submission order."""
548 try:
566 try:
549 rlist = self.get(0)
567 rlist = self.get(0)
550 except error.TimeoutError:
568 except error.TimeoutError:
551 # wait for each result individually
569 # wait for each result individually
552 for msg_id in self.msg_ids:
570 for msg_id in self.msg_ids:
553 ar = AsyncResult(self._client, msg_id, self._fname)
571 ar = AsyncResult(self._client, msg_id, self._fname)
554 rlist = ar.get()
572 rlist = ar.get()
555 try:
573 try:
556 for r in rlist:
574 for r in rlist:
557 yield r
575 yield r
558 except TypeError:
576 except TypeError:
559 # flattened, not a list
577 # flattened, not a list
560 # this could get broken by flattened data that returns iterables
578 # this could get broken by flattened data that returns iterables
561 # but most calls to map do not expose the `flatten` argument
579 # but most calls to map do not expose the `flatten` argument
562 yield rlist
580 yield rlist
563 else:
581 else:
564 # already done
582 # already done
565 for r in rlist:
583 for r in rlist:
566 yield r
584 yield r
567
585
568 # asynchronous unordered iterator:
586 # asynchronous unordered iterator:
569 def _unordered_iter(self):
587 def _unordered_iter(self):
570 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
588 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
571 try:
589 try:
572 rlist = self.get(0)
590 rlist = self.get(0)
573 except error.TimeoutError:
591 except error.TimeoutError:
574 pending = set(self.msg_ids)
592 pending = set(self.msg_ids)
575 while pending:
593 while pending:
576 try:
594 try:
577 self._client.wait(pending, 1e-3)
595 self._client.wait(pending, 1e-3)
578 except error.TimeoutError:
596 except error.TimeoutError:
579 # ignore timeout error, because that only means
597 # ignore timeout error, because that only means
580 # *some* jobs are outstanding
598 # *some* jobs are outstanding
581 pass
599 pass
582 # update ready set with those no longer outstanding:
600 # update ready set with those no longer outstanding:
583 ready = pending.difference(self._client.outstanding)
601 ready = pending.difference(self._client.outstanding)
584 # update pending to exclude those that are finished
602 # update pending to exclude those that are finished
585 pending = pending.difference(ready)
603 pending = pending.difference(ready)
586 while ready:
604 while ready:
587 msg_id = ready.pop()
605 msg_id = ready.pop()
588 ar = AsyncResult(self._client, msg_id, self._fname)
606 ar = AsyncResult(self._client, msg_id, self._fname)
589 rlist = ar.get()
607 rlist = ar.get()
590 try:
608 try:
591 for r in rlist:
609 for r in rlist:
592 yield r
610 yield r
593 except TypeError:
611 except TypeError:
594 # flattened, not a list
612 # flattened, not a list
595 # this could get broken by flattened data that returns iterables
613 # this could get broken by flattened data that returns iterables
596 # but most calls to map do not expose the `flatten` argument
614 # but most calls to map do not expose the `flatten` argument
597 yield rlist
615 yield rlist
598 else:
616 else:
599 # already done
617 # already done
600 for r in rlist:
618 for r in rlist:
601 yield r
619 yield r
602
620
603
621
604
622
605 class AsyncHubResult(AsyncResult):
623 class AsyncHubResult(AsyncResult):
606 """Class to wrap pending results that must be requested from the Hub.
624 """Class to wrap pending results that must be requested from the Hub.
607
625
608 Note that waiting/polling on these objects requires polling the Hubover the network,
626 Note that waiting/polling on these objects requires polling the Hubover the network,
609 so use `AsyncHubResult.wait()` sparingly.
627 so use `AsyncHubResult.wait()` sparingly.
610 """
628 """
611
629
612 def wait(self, timeout=-1):
630 def wait(self, timeout=-1):
613 """wait for result to complete."""
631 """wait for result to complete."""
614 start = time.time()
632 start = time.time()
615 if self._ready:
633 if self._ready:
616 return
634 return
617 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
635 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
618 local_ready = self._client.wait(local_ids, timeout)
636 local_ready = self._client.wait(local_ids, timeout)
619 if local_ready:
637 if local_ready:
620 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
638 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
621 if not remote_ids:
639 if not remote_ids:
622 self._ready = True
640 self._ready = True
623 else:
641 else:
624 rdict = self._client.result_status(remote_ids, status_only=False)
642 rdict = self._client.result_status(remote_ids, status_only=False)
625 pending = rdict['pending']
643 pending = rdict['pending']
626 while pending and (timeout < 0 or time.time() < start+timeout):
644 while pending and (timeout < 0 or time.time() < start+timeout):
627 rdict = self._client.result_status(remote_ids, status_only=False)
645 rdict = self._client.result_status(remote_ids, status_only=False)
628 pending = rdict['pending']
646 pending = rdict['pending']
629 if pending:
647 if pending:
630 time.sleep(0.1)
648 time.sleep(0.1)
631 if not pending:
649 if not pending:
632 self._ready = True
650 self._ready = True
633 if self._ready:
651 if self._ready:
634 try:
652 try:
635 results = map(self._client.results.get, self.msg_ids)
653 results = map(self._client.results.get, self.msg_ids)
636 self._result = results
654 self._result = results
637 if self._single_result:
655 if self._single_result:
638 r = results[0]
656 r = results[0]
639 if isinstance(r, Exception):
657 if isinstance(r, Exception):
640 raise r
658 raise r
641 else:
659 else:
642 results = error.collect_exceptions(results, self._fname)
660 results = error.collect_exceptions(results, self._fname)
643 self._result = self._reconstruct_result(results)
661 self._result = self._reconstruct_result(results)
644 except Exception, e:
662 except Exception, e:
645 self._exception = e
663 self._exception = e
646 self._success = False
664 self._success = False
647 else:
665 else:
648 self._success = True
666 self._success = True
649 finally:
667 finally:
650 self._metadata = map(self._client.metadata.get, self.msg_ids)
668 self._metadata = map(self._client.metadata.get, self.msg_ids)
651
669
652 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] No newline at end of file
670 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
@@ -1,1655 +1,1659 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35
35
36 from IPython.utils.coloransi import TermColors
36 from IPython.utils.coloransi import TermColors
37 from IPython.utils.jsonutil import rekey
37 from IPython.utils.jsonutil import rekey
38 from IPython.utils.localinterfaces import LOCAL_IPS
38 from IPython.utils.localinterfaces import LOCAL_IPS
39 from IPython.utils.path import get_ipython_dir
39 from IPython.utils.path import get_ipython_dir
40 from IPython.utils.py3compat import cast_bytes
40 from IPython.utils.py3compat import cast_bytes
41 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
41 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
42 Dict, List, Bool, Set, Any)
42 Dict, List, Bool, Set, Any)
43 from IPython.external.decorator import decorator
43 from IPython.external.decorator import decorator
44 from IPython.external.ssh import tunnel
44 from IPython.external.ssh import tunnel
45
45
46 from IPython.parallel import Reference
46 from IPython.parallel import Reference
47 from IPython.parallel import error
47 from IPython.parallel import error
48 from IPython.parallel import util
48 from IPython.parallel import util
49
49
50 from IPython.zmq.session import Session, Message
50 from IPython.zmq.session import Session, Message
51
51
52 from .asyncresult import AsyncResult, AsyncHubResult
52 from .asyncresult import AsyncResult, AsyncHubResult
53 from IPython.core.profiledir import ProfileDir, ProfileDirError
53 from IPython.core.profiledir import ProfileDir, ProfileDirError
54 from .view import DirectView, LoadBalancedView
54 from .view import DirectView, LoadBalancedView
55
55
56 if sys.version_info[0] >= 3:
56 if sys.version_info[0] >= 3:
57 # xrange is used in a couple 'isinstance' tests in py2
57 # xrange is used in a couple 'isinstance' tests in py2
58 # should be just 'range' in 3k
58 # should be just 'range' in 3k
59 xrange = range
59 xrange = range
60
60
61 #--------------------------------------------------------------------------
61 #--------------------------------------------------------------------------
62 # Decorators for Client methods
62 # Decorators for Client methods
63 #--------------------------------------------------------------------------
63 #--------------------------------------------------------------------------
64
64
65 @decorator
65 @decorator
66 def spin_first(f, self, *args, **kwargs):
66 def spin_first(f, self, *args, **kwargs):
67 """Call spin() to sync state prior to calling the method."""
67 """Call spin() to sync state prior to calling the method."""
68 self.spin()
68 self.spin()
69 return f(self, *args, **kwargs)
69 return f(self, *args, **kwargs)
70
70
71
71
72 #--------------------------------------------------------------------------
72 #--------------------------------------------------------------------------
73 # Classes
73 # Classes
74 #--------------------------------------------------------------------------
74 #--------------------------------------------------------------------------
75
75
76
76
77 class ExecuteReply(object):
77 class ExecuteReply(object):
78 """wrapper for finished Execute results"""
78 """wrapper for finished Execute results"""
79 def __init__(self, msg_id, content, metadata):
79 def __init__(self, msg_id, content, metadata):
80 self.msg_id = msg_id
80 self.msg_id = msg_id
81 self._content = content
81 self._content = content
82 self.execution_count = content['execution_count']
82 self.execution_count = content['execution_count']
83 self.metadata = metadata
83 self.metadata = metadata
84
84
85 def __getitem__(self, key):
85 def __getitem__(self, key):
86 return self.metadata[key]
86 return self.metadata[key]
87
87
88 def __getattr__(self, key):
88 def __getattr__(self, key):
89 if key not in self.metadata:
89 if key not in self.metadata:
90 raise AttributeError(key)
90 raise AttributeError(key)
91 return self.metadata[key]
91 return self.metadata[key]
92
92
93 def __repr__(self):
93 def __repr__(self):
94 pyout = self.metadata['pyout'] or {'data':{}}
94 pyout = self.metadata['pyout'] or {'data':{}}
95 text_out = pyout['data'].get('text/plain', '')
95 text_out = pyout['data'].get('text/plain', '')
96 if len(text_out) > 32:
96 if len(text_out) > 32:
97 text_out = text_out[:29] + '...'
97 text_out = text_out[:29] + '...'
98
98
99 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
99 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
100
100
101 def _repr_pretty_(self, p, cycle):
101 def _repr_pretty_(self, p, cycle):
102 pyout = self.metadata['pyout'] or {'data':{}}
102 pyout = self.metadata['pyout'] or {'data':{}}
103 text_out = pyout['data'].get('text/plain', '')
103 text_out = pyout['data'].get('text/plain', '')
104
104
105 if not text_out:
105 if not text_out:
106 return
106 return
107
107
108 try:
108 try:
109 ip = get_ipython()
109 ip = get_ipython()
110 except NameError:
110 except NameError:
111 colors = "NoColor"
111 colors = "NoColor"
112 else:
112 else:
113 colors = ip.colors
113 colors = ip.colors
114
114
115 if colors == "NoColor":
115 if colors == "NoColor":
116 out = normal = ""
116 out = normal = ""
117 else:
117 else:
118 out = TermColors.Red
118 out = TermColors.Red
119 normal = TermColors.Normal
119 normal = TermColors.Normal
120
120
121 if '\n' in text_out and not text_out.startswith('\n'):
122 # add newline for multiline reprs
123 text_out = '\n' + text_out
124
121 p.text(
125 p.text(
122 u'[%i] ' % self.metadata['engine_id'] +
126 out + u'Out[%i:%i]: ' % (
123 out + u'Out[%i]: ' % self.execution_count +
127 self.metadata['engine_id'], self.execution_count
124 normal + text_out
128 ) + normal + text_out
125 )
129 )
126
130
127 def _repr_html_(self):
131 def _repr_html_(self):
128 pyout = self.metadata['pyout'] or {'data':{}}
132 pyout = self.metadata['pyout'] or {'data':{}}
129 return pyout['data'].get("text/html")
133 return pyout['data'].get("text/html")
130
134
131 def _repr_latex_(self):
135 def _repr_latex_(self):
132 pyout = self.metadata['pyout'] or {'data':{}}
136 pyout = self.metadata['pyout'] or {'data':{}}
133 return pyout['data'].get("text/latex")
137 return pyout['data'].get("text/latex")
134
138
135 def _repr_json_(self):
139 def _repr_json_(self):
136 pyout = self.metadata['pyout'] or {'data':{}}
140 pyout = self.metadata['pyout'] or {'data':{}}
137 return pyout['data'].get("application/json")
141 return pyout['data'].get("application/json")
138
142
139 def _repr_javascript_(self):
143 def _repr_javascript_(self):
140 pyout = self.metadata['pyout'] or {'data':{}}
144 pyout = self.metadata['pyout'] or {'data':{}}
141 return pyout['data'].get("application/javascript")
145 return pyout['data'].get("application/javascript")
142
146
143 def _repr_png_(self):
147 def _repr_png_(self):
144 pyout = self.metadata['pyout'] or {'data':{}}
148 pyout = self.metadata['pyout'] or {'data':{}}
145 return pyout['data'].get("image/png")
149 return pyout['data'].get("image/png")
146
150
147 def _repr_jpeg_(self):
151 def _repr_jpeg_(self):
148 pyout = self.metadata['pyout'] or {'data':{}}
152 pyout = self.metadata['pyout'] or {'data':{}}
149 return pyout['data'].get("image/jpeg")
153 return pyout['data'].get("image/jpeg")
150
154
151 def _repr_svg_(self):
155 def _repr_svg_(self):
152 pyout = self.metadata['pyout'] or {'data':{}}
156 pyout = self.metadata['pyout'] or {'data':{}}
153 return pyout['data'].get("image/svg+xml")
157 return pyout['data'].get("image/svg+xml")
154
158
155
159
156 class Metadata(dict):
160 class Metadata(dict):
157 """Subclass of dict for initializing metadata values.
161 """Subclass of dict for initializing metadata values.
158
162
159 Attribute access works on keys.
163 Attribute access works on keys.
160
164
161 These objects have a strict set of keys - errors will raise if you try
165 These objects have a strict set of keys - errors will raise if you try
162 to add new keys.
166 to add new keys.
163 """
167 """
164 def __init__(self, *args, **kwargs):
168 def __init__(self, *args, **kwargs):
165 dict.__init__(self)
169 dict.__init__(self)
166 md = {'msg_id' : None,
170 md = {'msg_id' : None,
167 'submitted' : None,
171 'submitted' : None,
168 'started' : None,
172 'started' : None,
169 'completed' : None,
173 'completed' : None,
170 'received' : None,
174 'received' : None,
171 'engine_uuid' : None,
175 'engine_uuid' : None,
172 'engine_id' : None,
176 'engine_id' : None,
173 'follow' : None,
177 'follow' : None,
174 'after' : None,
178 'after' : None,
175 'status' : None,
179 'status' : None,
176
180
177 'pyin' : None,
181 'pyin' : None,
178 'pyout' : None,
182 'pyout' : None,
179 'pyerr' : None,
183 'pyerr' : None,
180 'stdout' : '',
184 'stdout' : '',
181 'stderr' : '',
185 'stderr' : '',
182 'outputs' : [],
186 'outputs' : [],
183 }
187 }
184 self.update(md)
188 self.update(md)
185 self.update(dict(*args, **kwargs))
189 self.update(dict(*args, **kwargs))
186
190
187 def __getattr__(self, key):
191 def __getattr__(self, key):
188 """getattr aliased to getitem"""
192 """getattr aliased to getitem"""
189 if key in self.iterkeys():
193 if key in self.iterkeys():
190 return self[key]
194 return self[key]
191 else:
195 else:
192 raise AttributeError(key)
196 raise AttributeError(key)
193
197
194 def __setattr__(self, key, value):
198 def __setattr__(self, key, value):
195 """setattr aliased to setitem, with strict"""
199 """setattr aliased to setitem, with strict"""
196 if key in self.iterkeys():
200 if key in self.iterkeys():
197 self[key] = value
201 self[key] = value
198 else:
202 else:
199 raise AttributeError(key)
203 raise AttributeError(key)
200
204
201 def __setitem__(self, key, value):
205 def __setitem__(self, key, value):
202 """strict static key enforcement"""
206 """strict static key enforcement"""
203 if key in self.iterkeys():
207 if key in self.iterkeys():
204 dict.__setitem__(self, key, value)
208 dict.__setitem__(self, key, value)
205 else:
209 else:
206 raise KeyError(key)
210 raise KeyError(key)
207
211
208
212
209 class Client(HasTraits):
213 class Client(HasTraits):
210 """A semi-synchronous client to the IPython ZMQ cluster
214 """A semi-synchronous client to the IPython ZMQ cluster
211
215
212 Parameters
216 Parameters
213 ----------
217 ----------
214
218
215 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
219 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
216 Connection information for the Hub's registration. If a json connector
220 Connection information for the Hub's registration. If a json connector
217 file is given, then likely no further configuration is necessary.
221 file is given, then likely no further configuration is necessary.
218 [Default: use profile]
222 [Default: use profile]
219 profile : bytes
223 profile : bytes
220 The name of the Cluster profile to be used to find connector information.
224 The name of the Cluster profile to be used to find connector information.
221 If run from an IPython application, the default profile will be the same
225 If run from an IPython application, the default profile will be the same
222 as the running application, otherwise it will be 'default'.
226 as the running application, otherwise it will be 'default'.
223 context : zmq.Context
227 context : zmq.Context
224 Pass an existing zmq.Context instance, otherwise the client will create its own.
228 Pass an existing zmq.Context instance, otherwise the client will create its own.
225 debug : bool
229 debug : bool
226 flag for lots of message printing for debug purposes
230 flag for lots of message printing for debug purposes
227 timeout : int/float
231 timeout : int/float
228 time (in seconds) to wait for connection replies from the Hub
232 time (in seconds) to wait for connection replies from the Hub
229 [Default: 10]
233 [Default: 10]
230
234
231 #-------------- session related args ----------------
235 #-------------- session related args ----------------
232
236
233 config : Config object
237 config : Config object
234 If specified, this will be relayed to the Session for configuration
238 If specified, this will be relayed to the Session for configuration
235 username : str
239 username : str
236 set username for the session object
240 set username for the session object
237 packer : str (import_string) or callable
241 packer : str (import_string) or callable
238 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
242 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
239 function to serialize messages. Must support same input as
243 function to serialize messages. Must support same input as
240 JSON, and output must be bytes.
244 JSON, and output must be bytes.
241 You can pass a callable directly as `pack`
245 You can pass a callable directly as `pack`
242 unpacker : str (import_string) or callable
246 unpacker : str (import_string) or callable
243 The inverse of packer. Only necessary if packer is specified as *not* one
247 The inverse of packer. Only necessary if packer is specified as *not* one
244 of 'json' or 'pickle'.
248 of 'json' or 'pickle'.
245
249
246 #-------------- ssh related args ----------------
250 #-------------- ssh related args ----------------
247 # These are args for configuring the ssh tunnel to be used
251 # These are args for configuring the ssh tunnel to be used
248 # credentials are used to forward connections over ssh to the Controller
252 # credentials are used to forward connections over ssh to the Controller
249 # Note that the ip given in `addr` needs to be relative to sshserver
253 # Note that the ip given in `addr` needs to be relative to sshserver
250 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
254 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
251 # and set sshserver as the same machine the Controller is on. However,
255 # and set sshserver as the same machine the Controller is on. However,
252 # the only requirement is that sshserver is able to see the Controller
256 # the only requirement is that sshserver is able to see the Controller
253 # (i.e. is within the same trusted network).
257 # (i.e. is within the same trusted network).
254
258
255 sshserver : str
259 sshserver : str
256 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
260 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
257 If keyfile or password is specified, and this is not, it will default to
261 If keyfile or password is specified, and this is not, it will default to
258 the ip given in addr.
262 the ip given in addr.
259 sshkey : str; path to ssh private key file
263 sshkey : str; path to ssh private key file
260 This specifies a key to be used in ssh login, default None.
264 This specifies a key to be used in ssh login, default None.
261 Regular default ssh keys will be used without specifying this argument.
265 Regular default ssh keys will be used without specifying this argument.
262 password : str
266 password : str
263 Your ssh password to sshserver. Note that if this is left None,
267 Your ssh password to sshserver. Note that if this is left None,
264 you will be prompted for it if passwordless key based login is unavailable.
268 you will be prompted for it if passwordless key based login is unavailable.
265 paramiko : bool
269 paramiko : bool
266 flag for whether to use paramiko instead of shell ssh for tunneling.
270 flag for whether to use paramiko instead of shell ssh for tunneling.
267 [default: True on win32, False else]
271 [default: True on win32, False else]
268
272
269 ------- exec authentication args -------
273 ------- exec authentication args -------
270 If even localhost is untrusted, you can have some protection against
274 If even localhost is untrusted, you can have some protection against
271 unauthorized execution by signing messages with HMAC digests.
275 unauthorized execution by signing messages with HMAC digests.
272 Messages are still sent as cleartext, so if someone can snoop your
276 Messages are still sent as cleartext, so if someone can snoop your
273 loopback traffic this will not protect your privacy, but will prevent
277 loopback traffic this will not protect your privacy, but will prevent
274 unauthorized execution.
278 unauthorized execution.
275
279
276 exec_key : str
280 exec_key : str
277 an authentication key or file containing a key
281 an authentication key or file containing a key
278 default: None
282 default: None
279
283
280
284
281 Attributes
285 Attributes
282 ----------
286 ----------
283
287
284 ids : list of int engine IDs
288 ids : list of int engine IDs
285 requesting the ids attribute always synchronizes
289 requesting the ids attribute always synchronizes
286 the registration state. To request ids without synchronization,
290 the registration state. To request ids without synchronization,
287 use semi-private _ids attributes.
291 use semi-private _ids attributes.
288
292
289 history : list of msg_ids
293 history : list of msg_ids
290 a list of msg_ids, keeping track of all the execution
294 a list of msg_ids, keeping track of all the execution
291 messages you have submitted in order.
295 messages you have submitted in order.
292
296
293 outstanding : set of msg_ids
297 outstanding : set of msg_ids
294 a set of msg_ids that have been submitted, but whose
298 a set of msg_ids that have been submitted, but whose
295 results have not yet been received.
299 results have not yet been received.
296
300
297 results : dict
301 results : dict
298 a dict of all our results, keyed by msg_id
302 a dict of all our results, keyed by msg_id
299
303
300 block : bool
304 block : bool
301 determines default behavior when block not specified
305 determines default behavior when block not specified
302 in execution methods
306 in execution methods
303
307
304 Methods
308 Methods
305 -------
309 -------
306
310
307 spin
311 spin
308 flushes incoming results and registration state changes
312 flushes incoming results and registration state changes
309 control methods spin, and requesting `ids` also ensures up to date
313 control methods spin, and requesting `ids` also ensures up to date
310
314
311 wait
315 wait
312 wait on one or more msg_ids
316 wait on one or more msg_ids
313
317
314 execution methods
318 execution methods
315 apply
319 apply
316 legacy: execute, run
320 legacy: execute, run
317
321
318 data movement
322 data movement
319 push, pull, scatter, gather
323 push, pull, scatter, gather
320
324
321 query methods
325 query methods
322 queue_status, get_result, purge, result_status
326 queue_status, get_result, purge, result_status
323
327
324 control methods
328 control methods
325 abort, shutdown
329 abort, shutdown
326
330
327 """
331 """
328
332
329
333
330 block = Bool(False)
334 block = Bool(False)
331 outstanding = Set()
335 outstanding = Set()
332 results = Instance('collections.defaultdict', (dict,))
336 results = Instance('collections.defaultdict', (dict,))
333 metadata = Instance('collections.defaultdict', (Metadata,))
337 metadata = Instance('collections.defaultdict', (Metadata,))
334 history = List()
338 history = List()
335 debug = Bool(False)
339 debug = Bool(False)
336 _spin_thread = Any()
340 _spin_thread = Any()
337 _stop_spinning = Any()
341 _stop_spinning = Any()
338
342
339 profile=Unicode()
343 profile=Unicode()
340 def _profile_default(self):
344 def _profile_default(self):
341 if BaseIPythonApplication.initialized():
345 if BaseIPythonApplication.initialized():
342 # an IPython app *might* be running, try to get its profile
346 # an IPython app *might* be running, try to get its profile
343 try:
347 try:
344 return BaseIPythonApplication.instance().profile
348 return BaseIPythonApplication.instance().profile
345 except (AttributeError, MultipleInstanceError):
349 except (AttributeError, MultipleInstanceError):
346 # could be a *different* subclass of config.Application,
350 # could be a *different* subclass of config.Application,
347 # which would raise one of these two errors.
351 # which would raise one of these two errors.
348 return u'default'
352 return u'default'
349 else:
353 else:
350 return u'default'
354 return u'default'
351
355
352
356
353 _outstanding_dict = Instance('collections.defaultdict', (set,))
357 _outstanding_dict = Instance('collections.defaultdict', (set,))
354 _ids = List()
358 _ids = List()
355 _connected=Bool(False)
359 _connected=Bool(False)
356 _ssh=Bool(False)
360 _ssh=Bool(False)
357 _context = Instance('zmq.Context')
361 _context = Instance('zmq.Context')
358 _config = Dict()
362 _config = Dict()
359 _engines=Instance(util.ReverseDict, (), {})
363 _engines=Instance(util.ReverseDict, (), {})
360 # _hub_socket=Instance('zmq.Socket')
364 # _hub_socket=Instance('zmq.Socket')
361 _query_socket=Instance('zmq.Socket')
365 _query_socket=Instance('zmq.Socket')
362 _control_socket=Instance('zmq.Socket')
366 _control_socket=Instance('zmq.Socket')
363 _iopub_socket=Instance('zmq.Socket')
367 _iopub_socket=Instance('zmq.Socket')
364 _notification_socket=Instance('zmq.Socket')
368 _notification_socket=Instance('zmq.Socket')
365 _mux_socket=Instance('zmq.Socket')
369 _mux_socket=Instance('zmq.Socket')
366 _task_socket=Instance('zmq.Socket')
370 _task_socket=Instance('zmq.Socket')
367 _task_scheme=Unicode()
371 _task_scheme=Unicode()
368 _closed = False
372 _closed = False
369 _ignored_control_replies=Integer(0)
373 _ignored_control_replies=Integer(0)
370 _ignored_hub_replies=Integer(0)
374 _ignored_hub_replies=Integer(0)
371
375
372 def __new__(self, *args, **kw):
376 def __new__(self, *args, **kw):
373 # don't raise on positional args
377 # don't raise on positional args
374 return HasTraits.__new__(self, **kw)
378 return HasTraits.__new__(self, **kw)
375
379
376 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
380 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
377 context=None, debug=False, exec_key=None,
381 context=None, debug=False, exec_key=None,
378 sshserver=None, sshkey=None, password=None, paramiko=None,
382 sshserver=None, sshkey=None, password=None, paramiko=None,
379 timeout=10, **extra_args
383 timeout=10, **extra_args
380 ):
384 ):
381 if profile:
385 if profile:
382 super(Client, self).__init__(debug=debug, profile=profile)
386 super(Client, self).__init__(debug=debug, profile=profile)
383 else:
387 else:
384 super(Client, self).__init__(debug=debug)
388 super(Client, self).__init__(debug=debug)
385 if context is None:
389 if context is None:
386 context = zmq.Context.instance()
390 context = zmq.Context.instance()
387 self._context = context
391 self._context = context
388 self._stop_spinning = Event()
392 self._stop_spinning = Event()
389
393
390 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
394 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
391 if self._cd is not None:
395 if self._cd is not None:
392 if url_or_file is None:
396 if url_or_file is None:
393 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
397 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
394 if url_or_file is None:
398 if url_or_file is None:
395 raise ValueError(
399 raise ValueError(
396 "I can't find enough information to connect to a hub!"
400 "I can't find enough information to connect to a hub!"
397 " Please specify at least one of url_or_file or profile."
401 " Please specify at least one of url_or_file or profile."
398 )
402 )
399
403
400 if not util.is_url(url_or_file):
404 if not util.is_url(url_or_file):
401 # it's not a url, try for a file
405 # it's not a url, try for a file
402 if not os.path.exists(url_or_file):
406 if not os.path.exists(url_or_file):
403 if self._cd:
407 if self._cd:
404 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
408 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
405 if not os.path.exists(url_or_file):
409 if not os.path.exists(url_or_file):
406 raise IOError("Connection file not found: %r" % url_or_file)
410 raise IOError("Connection file not found: %r" % url_or_file)
407 with open(url_or_file) as f:
411 with open(url_or_file) as f:
408 cfg = json.loads(f.read())
412 cfg = json.loads(f.read())
409 else:
413 else:
410 cfg = {'url':url_or_file}
414 cfg = {'url':url_or_file}
411
415
412 # sync defaults from args, json:
416 # sync defaults from args, json:
413 if sshserver:
417 if sshserver:
414 cfg['ssh'] = sshserver
418 cfg['ssh'] = sshserver
415 if exec_key:
419 if exec_key:
416 cfg['exec_key'] = exec_key
420 cfg['exec_key'] = exec_key
417 exec_key = cfg['exec_key']
421 exec_key = cfg['exec_key']
418 location = cfg.setdefault('location', None)
422 location = cfg.setdefault('location', None)
419 cfg['url'] = util.disambiguate_url(cfg['url'], location)
423 cfg['url'] = util.disambiguate_url(cfg['url'], location)
420 url = cfg['url']
424 url = cfg['url']
421 proto,addr,port = util.split_url(url)
425 proto,addr,port = util.split_url(url)
422 if location is not None and addr == '127.0.0.1':
426 if location is not None and addr == '127.0.0.1':
423 # location specified, and connection is expected to be local
427 # location specified, and connection is expected to be local
424 if location not in LOCAL_IPS and not sshserver:
428 if location not in LOCAL_IPS and not sshserver:
425 # load ssh from JSON *only* if the controller is not on
429 # load ssh from JSON *only* if the controller is not on
426 # this machine
430 # this machine
427 sshserver=cfg['ssh']
431 sshserver=cfg['ssh']
428 if location not in LOCAL_IPS and not sshserver:
432 if location not in LOCAL_IPS and not sshserver:
429 # warn if no ssh specified, but SSH is probably needed
433 # warn if no ssh specified, but SSH is probably needed
430 # This is only a warning, because the most likely cause
434 # This is only a warning, because the most likely cause
431 # is a local Controller on a laptop whose IP is dynamic
435 # is a local Controller on a laptop whose IP is dynamic
432 warnings.warn("""
436 warnings.warn("""
433 Controller appears to be listening on localhost, but not on this machine.
437 Controller appears to be listening on localhost, but not on this machine.
434 If this is true, you should specify Client(...,sshserver='you@%s')
438 If this is true, you should specify Client(...,sshserver='you@%s')
435 or instruct your controller to listen on an external IP."""%location,
439 or instruct your controller to listen on an external IP."""%location,
436 RuntimeWarning)
440 RuntimeWarning)
437 elif not sshserver:
441 elif not sshserver:
438 # otherwise sync with cfg
442 # otherwise sync with cfg
439 sshserver = cfg['ssh']
443 sshserver = cfg['ssh']
440
444
441 self._config = cfg
445 self._config = cfg
442
446
443 self._ssh = bool(sshserver or sshkey or password)
447 self._ssh = bool(sshserver or sshkey or password)
444 if self._ssh and sshserver is None:
448 if self._ssh and sshserver is None:
445 # default to ssh via localhost
449 # default to ssh via localhost
446 sshserver = url.split('://')[1].split(':')[0]
450 sshserver = url.split('://')[1].split(':')[0]
447 if self._ssh and password is None:
451 if self._ssh and password is None:
448 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
452 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
449 password=False
453 password=False
450 else:
454 else:
451 password = getpass("SSH Password for %s: "%sshserver)
455 password = getpass("SSH Password for %s: "%sshserver)
452 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
456 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
453
457
454 # configure and construct the session
458 # configure and construct the session
455 if exec_key is not None:
459 if exec_key is not None:
456 if os.path.isfile(exec_key):
460 if os.path.isfile(exec_key):
457 extra_args['keyfile'] = exec_key
461 extra_args['keyfile'] = exec_key
458 else:
462 else:
459 exec_key = cast_bytes(exec_key)
463 exec_key = cast_bytes(exec_key)
460 extra_args['key'] = exec_key
464 extra_args['key'] = exec_key
461 self.session = Session(**extra_args)
465 self.session = Session(**extra_args)
462
466
463 self._query_socket = self._context.socket(zmq.DEALER)
467 self._query_socket = self._context.socket(zmq.DEALER)
464 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
468 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
465 if self._ssh:
469 if self._ssh:
466 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
470 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
467 else:
471 else:
468 self._query_socket.connect(url)
472 self._query_socket.connect(url)
469
473
470 self.session.debug = self.debug
474 self.session.debug = self.debug
471
475
472 self._notification_handlers = {'registration_notification' : self._register_engine,
476 self._notification_handlers = {'registration_notification' : self._register_engine,
473 'unregistration_notification' : self._unregister_engine,
477 'unregistration_notification' : self._unregister_engine,
474 'shutdown_notification' : lambda msg: self.close(),
478 'shutdown_notification' : lambda msg: self.close(),
475 }
479 }
476 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
480 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
477 'apply_reply' : self._handle_apply_reply}
481 'apply_reply' : self._handle_apply_reply}
478 self._connect(sshserver, ssh_kwargs, timeout)
482 self._connect(sshserver, ssh_kwargs, timeout)
479
483
480 def __del__(self):
484 def __del__(self):
481 """cleanup sockets, but _not_ context."""
485 """cleanup sockets, but _not_ context."""
482 self.close()
486 self.close()
483
487
484 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
488 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
485 if ipython_dir is None:
489 if ipython_dir is None:
486 ipython_dir = get_ipython_dir()
490 ipython_dir = get_ipython_dir()
487 if profile_dir is not None:
491 if profile_dir is not None:
488 try:
492 try:
489 self._cd = ProfileDir.find_profile_dir(profile_dir)
493 self._cd = ProfileDir.find_profile_dir(profile_dir)
490 return
494 return
491 except ProfileDirError:
495 except ProfileDirError:
492 pass
496 pass
493 elif profile is not None:
497 elif profile is not None:
494 try:
498 try:
495 self._cd = ProfileDir.find_profile_dir_by_name(
499 self._cd = ProfileDir.find_profile_dir_by_name(
496 ipython_dir, profile)
500 ipython_dir, profile)
497 return
501 return
498 except ProfileDirError:
502 except ProfileDirError:
499 pass
503 pass
500 self._cd = None
504 self._cd = None
501
505
502 def _update_engines(self, engines):
506 def _update_engines(self, engines):
503 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
507 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
504 for k,v in engines.iteritems():
508 for k,v in engines.iteritems():
505 eid = int(k)
509 eid = int(k)
506 self._engines[eid] = v
510 self._engines[eid] = v
507 self._ids.append(eid)
511 self._ids.append(eid)
508 self._ids = sorted(self._ids)
512 self._ids = sorted(self._ids)
509 if sorted(self._engines.keys()) != range(len(self._engines)) and \
513 if sorted(self._engines.keys()) != range(len(self._engines)) and \
510 self._task_scheme == 'pure' and self._task_socket:
514 self._task_scheme == 'pure' and self._task_socket:
511 self._stop_scheduling_tasks()
515 self._stop_scheduling_tasks()
512
516
513 def _stop_scheduling_tasks(self):
517 def _stop_scheduling_tasks(self):
514 """Stop scheduling tasks because an engine has been unregistered
518 """Stop scheduling tasks because an engine has been unregistered
515 from a pure ZMQ scheduler.
519 from a pure ZMQ scheduler.
516 """
520 """
517 self._task_socket.close()
521 self._task_socket.close()
518 self._task_socket = None
522 self._task_socket = None
519 msg = "An engine has been unregistered, and we are using pure " +\
523 msg = "An engine has been unregistered, and we are using pure " +\
520 "ZMQ task scheduling. Task farming will be disabled."
524 "ZMQ task scheduling. Task farming will be disabled."
521 if self.outstanding:
525 if self.outstanding:
522 msg += " If you were running tasks when this happened, " +\
526 msg += " If you were running tasks when this happened, " +\
523 "some `outstanding` msg_ids may never resolve."
527 "some `outstanding` msg_ids may never resolve."
524 warnings.warn(msg, RuntimeWarning)
528 warnings.warn(msg, RuntimeWarning)
525
529
526 def _build_targets(self, targets):
530 def _build_targets(self, targets):
527 """Turn valid target IDs or 'all' into two lists:
531 """Turn valid target IDs or 'all' into two lists:
528 (int_ids, uuids).
532 (int_ids, uuids).
529 """
533 """
530 if not self._ids:
534 if not self._ids:
531 # flush notification socket if no engines yet, just in case
535 # flush notification socket if no engines yet, just in case
532 if not self.ids:
536 if not self.ids:
533 raise error.NoEnginesRegistered("Can't build targets without any engines")
537 raise error.NoEnginesRegistered("Can't build targets without any engines")
534
538
535 if targets is None:
539 if targets is None:
536 targets = self._ids
540 targets = self._ids
537 elif isinstance(targets, basestring):
541 elif isinstance(targets, basestring):
538 if targets.lower() == 'all':
542 if targets.lower() == 'all':
539 targets = self._ids
543 targets = self._ids
540 else:
544 else:
541 raise TypeError("%r not valid str target, must be 'all'"%(targets))
545 raise TypeError("%r not valid str target, must be 'all'"%(targets))
542 elif isinstance(targets, int):
546 elif isinstance(targets, int):
543 if targets < 0:
547 if targets < 0:
544 targets = self.ids[targets]
548 targets = self.ids[targets]
545 if targets not in self._ids:
549 if targets not in self._ids:
546 raise IndexError("No such engine: %i"%targets)
550 raise IndexError("No such engine: %i"%targets)
547 targets = [targets]
551 targets = [targets]
548
552
549 if isinstance(targets, slice):
553 if isinstance(targets, slice):
550 indices = range(len(self._ids))[targets]
554 indices = range(len(self._ids))[targets]
551 ids = self.ids
555 ids = self.ids
552 targets = [ ids[i] for i in indices ]
556 targets = [ ids[i] for i in indices ]
553
557
554 if not isinstance(targets, (tuple, list, xrange)):
558 if not isinstance(targets, (tuple, list, xrange)):
555 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
559 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
556
560
557 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
561 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
558
562
559 def _connect(self, sshserver, ssh_kwargs, timeout):
563 def _connect(self, sshserver, ssh_kwargs, timeout):
560 """setup all our socket connections to the cluster. This is called from
564 """setup all our socket connections to the cluster. This is called from
561 __init__."""
565 __init__."""
562
566
563 # Maybe allow reconnecting?
567 # Maybe allow reconnecting?
564 if self._connected:
568 if self._connected:
565 return
569 return
566 self._connected=True
570 self._connected=True
567
571
568 def connect_socket(s, url):
572 def connect_socket(s, url):
569 url = util.disambiguate_url(url, self._config['location'])
573 url = util.disambiguate_url(url, self._config['location'])
570 if self._ssh:
574 if self._ssh:
571 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
575 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
572 else:
576 else:
573 return s.connect(url)
577 return s.connect(url)
574
578
575 self.session.send(self._query_socket, 'connection_request')
579 self.session.send(self._query_socket, 'connection_request')
576 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
580 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
577 poller = zmq.Poller()
581 poller = zmq.Poller()
578 poller.register(self._query_socket, zmq.POLLIN)
582 poller.register(self._query_socket, zmq.POLLIN)
579 # poll expects milliseconds, timeout is seconds
583 # poll expects milliseconds, timeout is seconds
580 evts = poller.poll(timeout*1000)
584 evts = poller.poll(timeout*1000)
581 if not evts:
585 if not evts:
582 raise error.TimeoutError("Hub connection request timed out")
586 raise error.TimeoutError("Hub connection request timed out")
583 idents,msg = self.session.recv(self._query_socket,mode=0)
587 idents,msg = self.session.recv(self._query_socket,mode=0)
584 if self.debug:
588 if self.debug:
585 pprint(msg)
589 pprint(msg)
586 msg = Message(msg)
590 msg = Message(msg)
587 content = msg.content
591 content = msg.content
588 self._config['registration'] = dict(content)
592 self._config['registration'] = dict(content)
589 if content.status == 'ok':
593 if content.status == 'ok':
590 ident = self.session.bsession
594 ident = self.session.bsession
591 if content.mux:
595 if content.mux:
592 self._mux_socket = self._context.socket(zmq.DEALER)
596 self._mux_socket = self._context.socket(zmq.DEALER)
593 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
597 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
594 connect_socket(self._mux_socket, content.mux)
598 connect_socket(self._mux_socket, content.mux)
595 if content.task:
599 if content.task:
596 self._task_scheme, task_addr = content.task
600 self._task_scheme, task_addr = content.task
597 self._task_socket = self._context.socket(zmq.DEALER)
601 self._task_socket = self._context.socket(zmq.DEALER)
598 self._task_socket.setsockopt(zmq.IDENTITY, ident)
602 self._task_socket.setsockopt(zmq.IDENTITY, ident)
599 connect_socket(self._task_socket, task_addr)
603 connect_socket(self._task_socket, task_addr)
600 if content.notification:
604 if content.notification:
601 self._notification_socket = self._context.socket(zmq.SUB)
605 self._notification_socket = self._context.socket(zmq.SUB)
602 connect_socket(self._notification_socket, content.notification)
606 connect_socket(self._notification_socket, content.notification)
603 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
607 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
604 # if content.query:
608 # if content.query:
605 # self._query_socket = self._context.socket(zmq.DEALER)
609 # self._query_socket = self._context.socket(zmq.DEALER)
606 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
610 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
607 # connect_socket(self._query_socket, content.query)
611 # connect_socket(self._query_socket, content.query)
608 if content.control:
612 if content.control:
609 self._control_socket = self._context.socket(zmq.DEALER)
613 self._control_socket = self._context.socket(zmq.DEALER)
610 self._control_socket.setsockopt(zmq.IDENTITY, ident)
614 self._control_socket.setsockopt(zmq.IDENTITY, ident)
611 connect_socket(self._control_socket, content.control)
615 connect_socket(self._control_socket, content.control)
612 if content.iopub:
616 if content.iopub:
613 self._iopub_socket = self._context.socket(zmq.SUB)
617 self._iopub_socket = self._context.socket(zmq.SUB)
614 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
618 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
615 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
619 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
616 connect_socket(self._iopub_socket, content.iopub)
620 connect_socket(self._iopub_socket, content.iopub)
617 self._update_engines(dict(content.engines))
621 self._update_engines(dict(content.engines))
618 else:
622 else:
619 self._connected = False
623 self._connected = False
620 raise Exception("Failed to connect!")
624 raise Exception("Failed to connect!")
621
625
622 #--------------------------------------------------------------------------
626 #--------------------------------------------------------------------------
623 # handlers and callbacks for incoming messages
627 # handlers and callbacks for incoming messages
624 #--------------------------------------------------------------------------
628 #--------------------------------------------------------------------------
625
629
626 def _unwrap_exception(self, content):
630 def _unwrap_exception(self, content):
627 """unwrap exception, and remap engine_id to int."""
631 """unwrap exception, and remap engine_id to int."""
628 e = error.unwrap_exception(content)
632 e = error.unwrap_exception(content)
629 # print e.traceback
633 # print e.traceback
630 if e.engine_info:
634 if e.engine_info:
631 e_uuid = e.engine_info['engine_uuid']
635 e_uuid = e.engine_info['engine_uuid']
632 eid = self._engines[e_uuid]
636 eid = self._engines[e_uuid]
633 e.engine_info['engine_id'] = eid
637 e.engine_info['engine_id'] = eid
634 return e
638 return e
635
639
636 def _extract_metadata(self, header, parent, content):
640 def _extract_metadata(self, header, parent, content):
637 md = {'msg_id' : parent['msg_id'],
641 md = {'msg_id' : parent['msg_id'],
638 'received' : datetime.now(),
642 'received' : datetime.now(),
639 'engine_uuid' : header.get('engine', None),
643 'engine_uuid' : header.get('engine', None),
640 'follow' : parent.get('follow', []),
644 'follow' : parent.get('follow', []),
641 'after' : parent.get('after', []),
645 'after' : parent.get('after', []),
642 'status' : content['status'],
646 'status' : content['status'],
643 }
647 }
644
648
645 if md['engine_uuid'] is not None:
649 if md['engine_uuid'] is not None:
646 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
650 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
647
651
648 if 'date' in parent:
652 if 'date' in parent:
649 md['submitted'] = parent['date']
653 md['submitted'] = parent['date']
650 if 'started' in header:
654 if 'started' in header:
651 md['started'] = header['started']
655 md['started'] = header['started']
652 if 'date' in header:
656 if 'date' in header:
653 md['completed'] = header['date']
657 md['completed'] = header['date']
654 return md
658 return md
655
659
656 def _register_engine(self, msg):
660 def _register_engine(self, msg):
657 """Register a new engine, and update our connection info."""
661 """Register a new engine, and update our connection info."""
658 content = msg['content']
662 content = msg['content']
659 eid = content['id']
663 eid = content['id']
660 d = {eid : content['queue']}
664 d = {eid : content['queue']}
661 self._update_engines(d)
665 self._update_engines(d)
662
666
663 def _unregister_engine(self, msg):
667 def _unregister_engine(self, msg):
664 """Unregister an engine that has died."""
668 """Unregister an engine that has died."""
665 content = msg['content']
669 content = msg['content']
666 eid = int(content['id'])
670 eid = int(content['id'])
667 if eid in self._ids:
671 if eid in self._ids:
668 self._ids.remove(eid)
672 self._ids.remove(eid)
669 uuid = self._engines.pop(eid)
673 uuid = self._engines.pop(eid)
670
674
671 self._handle_stranded_msgs(eid, uuid)
675 self._handle_stranded_msgs(eid, uuid)
672
676
673 if self._task_socket and self._task_scheme == 'pure':
677 if self._task_socket and self._task_scheme == 'pure':
674 self._stop_scheduling_tasks()
678 self._stop_scheduling_tasks()
675
679
676 def _handle_stranded_msgs(self, eid, uuid):
680 def _handle_stranded_msgs(self, eid, uuid):
677 """Handle messages known to be on an engine when the engine unregisters.
681 """Handle messages known to be on an engine when the engine unregisters.
678
682
679 It is possible that this will fire prematurely - that is, an engine will
683 It is possible that this will fire prematurely - that is, an engine will
680 go down after completing a result, and the client will be notified
684 go down after completing a result, and the client will be notified
681 of the unregistration and later receive the successful result.
685 of the unregistration and later receive the successful result.
682 """
686 """
683
687
684 outstanding = self._outstanding_dict[uuid]
688 outstanding = self._outstanding_dict[uuid]
685
689
686 for msg_id in list(outstanding):
690 for msg_id in list(outstanding):
687 if msg_id in self.results:
691 if msg_id in self.results:
688 # we already
692 # we already
689 continue
693 continue
690 try:
694 try:
691 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
695 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
692 except:
696 except:
693 content = error.wrap_exception()
697 content = error.wrap_exception()
694 # build a fake message:
698 # build a fake message:
695 parent = {}
699 parent = {}
696 header = {}
700 header = {}
697 parent['msg_id'] = msg_id
701 parent['msg_id'] = msg_id
698 header['engine'] = uuid
702 header['engine'] = uuid
699 header['date'] = datetime.now()
703 header['date'] = datetime.now()
700 msg = dict(parent_header=parent, header=header, content=content)
704 msg = dict(parent_header=parent, header=header, content=content)
701 self._handle_apply_reply(msg)
705 self._handle_apply_reply(msg)
702
706
703 def _handle_execute_reply(self, msg):
707 def _handle_execute_reply(self, msg):
704 """Save the reply to an execute_request into our results.
708 """Save the reply to an execute_request into our results.
705
709
706 execute messages are never actually used. apply is used instead.
710 execute messages are never actually used. apply is used instead.
707 """
711 """
708
712
709 parent = msg['parent_header']
713 parent = msg['parent_header']
710 msg_id = parent['msg_id']
714 msg_id = parent['msg_id']
711 if msg_id not in self.outstanding:
715 if msg_id not in self.outstanding:
712 if msg_id in self.history:
716 if msg_id in self.history:
713 print ("got stale result: %s"%msg_id)
717 print ("got stale result: %s"%msg_id)
714 else:
718 else:
715 print ("got unknown result: %s"%msg_id)
719 print ("got unknown result: %s"%msg_id)
716 else:
720 else:
717 self.outstanding.remove(msg_id)
721 self.outstanding.remove(msg_id)
718
722
719 content = msg['content']
723 content = msg['content']
720 header = msg['header']
724 header = msg['header']
721
725
722 # construct metadata:
726 # construct metadata:
723 md = self.metadata[msg_id]
727 md = self.metadata[msg_id]
724 md.update(self._extract_metadata(header, parent, content))
728 md.update(self._extract_metadata(header, parent, content))
725 # is this redundant?
729 # is this redundant?
726 self.metadata[msg_id] = md
730 self.metadata[msg_id] = md
727
731
728 e_outstanding = self._outstanding_dict[md['engine_uuid']]
732 e_outstanding = self._outstanding_dict[md['engine_uuid']]
729 if msg_id in e_outstanding:
733 if msg_id in e_outstanding:
730 e_outstanding.remove(msg_id)
734 e_outstanding.remove(msg_id)
731
735
732 # construct result:
736 # construct result:
733 if content['status'] == 'ok':
737 if content['status'] == 'ok':
734 self.results[msg_id] = ExecuteReply(msg_id, content, md)
738 self.results[msg_id] = ExecuteReply(msg_id, content, md)
735 elif content['status'] == 'aborted':
739 elif content['status'] == 'aborted':
736 self.results[msg_id] = error.TaskAborted(msg_id)
740 self.results[msg_id] = error.TaskAborted(msg_id)
737 elif content['status'] == 'resubmitted':
741 elif content['status'] == 'resubmitted':
738 # TODO: handle resubmission
742 # TODO: handle resubmission
739 pass
743 pass
740 else:
744 else:
741 self.results[msg_id] = self._unwrap_exception(content)
745 self.results[msg_id] = self._unwrap_exception(content)
742
746
743 def _handle_apply_reply(self, msg):
747 def _handle_apply_reply(self, msg):
744 """Save the reply to an apply_request into our results."""
748 """Save the reply to an apply_request into our results."""
745 parent = msg['parent_header']
749 parent = msg['parent_header']
746 msg_id = parent['msg_id']
750 msg_id = parent['msg_id']
747 if msg_id not in self.outstanding:
751 if msg_id not in self.outstanding:
748 if msg_id in self.history:
752 if msg_id in self.history:
749 print ("got stale result: %s"%msg_id)
753 print ("got stale result: %s"%msg_id)
750 print self.results[msg_id]
754 print self.results[msg_id]
751 print msg
755 print msg
752 else:
756 else:
753 print ("got unknown result: %s"%msg_id)
757 print ("got unknown result: %s"%msg_id)
754 else:
758 else:
755 self.outstanding.remove(msg_id)
759 self.outstanding.remove(msg_id)
756 content = msg['content']
760 content = msg['content']
757 header = msg['header']
761 header = msg['header']
758
762
759 # construct metadata:
763 # construct metadata:
760 md = self.metadata[msg_id]
764 md = self.metadata[msg_id]
761 md.update(self._extract_metadata(header, parent, content))
765 md.update(self._extract_metadata(header, parent, content))
762 # is this redundant?
766 # is this redundant?
763 self.metadata[msg_id] = md
767 self.metadata[msg_id] = md
764
768
765 e_outstanding = self._outstanding_dict[md['engine_uuid']]
769 e_outstanding = self._outstanding_dict[md['engine_uuid']]
766 if msg_id in e_outstanding:
770 if msg_id in e_outstanding:
767 e_outstanding.remove(msg_id)
771 e_outstanding.remove(msg_id)
768
772
769 # construct result:
773 # construct result:
770 if content['status'] == 'ok':
774 if content['status'] == 'ok':
771 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
775 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
772 elif content['status'] == 'aborted':
776 elif content['status'] == 'aborted':
773 self.results[msg_id] = error.TaskAborted(msg_id)
777 self.results[msg_id] = error.TaskAborted(msg_id)
774 elif content['status'] == 'resubmitted':
778 elif content['status'] == 'resubmitted':
775 # TODO: handle resubmission
779 # TODO: handle resubmission
776 pass
780 pass
777 else:
781 else:
778 self.results[msg_id] = self._unwrap_exception(content)
782 self.results[msg_id] = self._unwrap_exception(content)
779
783
780 def _flush_notifications(self):
784 def _flush_notifications(self):
781 """Flush notifications of engine registrations waiting
785 """Flush notifications of engine registrations waiting
782 in ZMQ queue."""
786 in ZMQ queue."""
783 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
787 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
784 while msg is not None:
788 while msg is not None:
785 if self.debug:
789 if self.debug:
786 pprint(msg)
790 pprint(msg)
787 msg_type = msg['header']['msg_type']
791 msg_type = msg['header']['msg_type']
788 handler = self._notification_handlers.get(msg_type, None)
792 handler = self._notification_handlers.get(msg_type, None)
789 if handler is None:
793 if handler is None:
790 raise Exception("Unhandled message type: %s"%msg.msg_type)
794 raise Exception("Unhandled message type: %s"%msg.msg_type)
791 else:
795 else:
792 handler(msg)
796 handler(msg)
793 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
797 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
794
798
795 def _flush_results(self, sock):
799 def _flush_results(self, sock):
796 """Flush task or queue results waiting in ZMQ queue."""
800 """Flush task or queue results waiting in ZMQ queue."""
797 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
801 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
798 while msg is not None:
802 while msg is not None:
799 if self.debug:
803 if self.debug:
800 pprint(msg)
804 pprint(msg)
801 msg_type = msg['header']['msg_type']
805 msg_type = msg['header']['msg_type']
802 handler = self._queue_handlers.get(msg_type, None)
806 handler = self._queue_handlers.get(msg_type, None)
803 if handler is None:
807 if handler is None:
804 raise Exception("Unhandled message type: %s"%msg.msg_type)
808 raise Exception("Unhandled message type: %s"%msg.msg_type)
805 else:
809 else:
806 handler(msg)
810 handler(msg)
807 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
811 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
808
812
809 def _flush_control(self, sock):
813 def _flush_control(self, sock):
810 """Flush replies from the control channel waiting
814 """Flush replies from the control channel waiting
811 in the ZMQ queue.
815 in the ZMQ queue.
812
816
813 Currently: ignore them."""
817 Currently: ignore them."""
814 if self._ignored_control_replies <= 0:
818 if self._ignored_control_replies <= 0:
815 return
819 return
816 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
820 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
817 while msg is not None:
821 while msg is not None:
818 self._ignored_control_replies -= 1
822 self._ignored_control_replies -= 1
819 if self.debug:
823 if self.debug:
820 pprint(msg)
824 pprint(msg)
821 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
825 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
822
826
823 def _flush_ignored_control(self):
827 def _flush_ignored_control(self):
824 """flush ignored control replies"""
828 """flush ignored control replies"""
825 while self._ignored_control_replies > 0:
829 while self._ignored_control_replies > 0:
826 self.session.recv(self._control_socket)
830 self.session.recv(self._control_socket)
827 self._ignored_control_replies -= 1
831 self._ignored_control_replies -= 1
828
832
829 def _flush_ignored_hub_replies(self):
833 def _flush_ignored_hub_replies(self):
830 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
834 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
831 while msg is not None:
835 while msg is not None:
832 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
836 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
833
837
834 def _flush_iopub(self, sock):
838 def _flush_iopub(self, sock):
835 """Flush replies from the iopub channel waiting
839 """Flush replies from the iopub channel waiting
836 in the ZMQ queue.
840 in the ZMQ queue.
837 """
841 """
838 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
842 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
839 while msg is not None:
843 while msg is not None:
840 if self.debug:
844 if self.debug:
841 pprint(msg)
845 pprint(msg)
842 parent = msg['parent_header']
846 parent = msg['parent_header']
843 # ignore IOPub messages with no parent.
847 # ignore IOPub messages with no parent.
844 # Caused by print statements or warnings from before the first execution.
848 # Caused by print statements or warnings from before the first execution.
845 if not parent:
849 if not parent:
846 continue
850 continue
847 msg_id = parent['msg_id']
851 msg_id = parent['msg_id']
848 content = msg['content']
852 content = msg['content']
849 header = msg['header']
853 header = msg['header']
850 msg_type = msg['header']['msg_type']
854 msg_type = msg['header']['msg_type']
851
855
852 # init metadata:
856 # init metadata:
853 md = self.metadata[msg_id]
857 md = self.metadata[msg_id]
854
858
855 if msg_type == 'stream':
859 if msg_type == 'stream':
856 name = content['name']
860 name = content['name']
857 s = md[name] or ''
861 s = md[name] or ''
858 md[name] = s + content['data']
862 md[name] = s + content['data']
859 elif msg_type == 'pyerr':
863 elif msg_type == 'pyerr':
860 md.update({'pyerr' : self._unwrap_exception(content)})
864 md.update({'pyerr' : self._unwrap_exception(content)})
861 elif msg_type == 'pyin':
865 elif msg_type == 'pyin':
862 md.update({'pyin' : content['code']})
866 md.update({'pyin' : content['code']})
863 elif msg_type == 'display_data':
867 elif msg_type == 'display_data':
864 md['outputs'].append(content)
868 md['outputs'].append(content)
865 elif msg_type == 'pyout':
869 elif msg_type == 'pyout':
866 md['pyout'] = content
870 md['pyout'] = content
867 else:
871 else:
868 # unhandled msg_type (status, etc.)
872 # unhandled msg_type (status, etc.)
869 pass
873 pass
870
874
871 # reduntant?
875 # reduntant?
872 self.metadata[msg_id] = md
876 self.metadata[msg_id] = md
873
877
874 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
878 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
875
879
876 #--------------------------------------------------------------------------
880 #--------------------------------------------------------------------------
877 # len, getitem
881 # len, getitem
878 #--------------------------------------------------------------------------
882 #--------------------------------------------------------------------------
879
883
880 def __len__(self):
884 def __len__(self):
881 """len(client) returns # of engines."""
885 """len(client) returns # of engines."""
882 return len(self.ids)
886 return len(self.ids)
883
887
884 def __getitem__(self, key):
888 def __getitem__(self, key):
885 """index access returns DirectView multiplexer objects
889 """index access returns DirectView multiplexer objects
886
890
887 Must be int, slice, or list/tuple/xrange of ints"""
891 Must be int, slice, or list/tuple/xrange of ints"""
888 if not isinstance(key, (int, slice, tuple, list, xrange)):
892 if not isinstance(key, (int, slice, tuple, list, xrange)):
889 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
893 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
890 else:
894 else:
891 return self.direct_view(key)
895 return self.direct_view(key)
892
896
893 #--------------------------------------------------------------------------
897 #--------------------------------------------------------------------------
894 # Begin public methods
898 # Begin public methods
895 #--------------------------------------------------------------------------
899 #--------------------------------------------------------------------------
896
900
897 @property
901 @property
898 def ids(self):
902 def ids(self):
899 """Always up-to-date ids property."""
903 """Always up-to-date ids property."""
900 self._flush_notifications()
904 self._flush_notifications()
901 # always copy:
905 # always copy:
902 return list(self._ids)
906 return list(self._ids)
903
907
904 def close(self):
908 def close(self):
905 if self._closed:
909 if self._closed:
906 return
910 return
907 self.stop_spin_thread()
911 self.stop_spin_thread()
908 snames = filter(lambda n: n.endswith('socket'), dir(self))
912 snames = filter(lambda n: n.endswith('socket'), dir(self))
909 for socket in map(lambda name: getattr(self, name), snames):
913 for socket in map(lambda name: getattr(self, name), snames):
910 if isinstance(socket, zmq.Socket) and not socket.closed:
914 if isinstance(socket, zmq.Socket) and not socket.closed:
911 socket.close()
915 socket.close()
912 self._closed = True
916 self._closed = True
913
917
914 def _spin_every(self, interval=1):
918 def _spin_every(self, interval=1):
915 """target func for use in spin_thread"""
919 """target func for use in spin_thread"""
916 while True:
920 while True:
917 if self._stop_spinning.is_set():
921 if self._stop_spinning.is_set():
918 return
922 return
919 time.sleep(interval)
923 time.sleep(interval)
920 self.spin()
924 self.spin()
921
925
922 def spin_thread(self, interval=1):
926 def spin_thread(self, interval=1):
923 """call Client.spin() in a background thread on some regular interval
927 """call Client.spin() in a background thread on some regular interval
924
928
925 This helps ensure that messages don't pile up too much in the zmq queue
929 This helps ensure that messages don't pile up too much in the zmq queue
926 while you are working on other things, or just leaving an idle terminal.
930 while you are working on other things, or just leaving an idle terminal.
927
931
928 It also helps limit potential padding of the `received` timestamp
932 It also helps limit potential padding of the `received` timestamp
929 on AsyncResult objects, used for timings.
933 on AsyncResult objects, used for timings.
930
934
931 Parameters
935 Parameters
932 ----------
936 ----------
933
937
934 interval : float, optional
938 interval : float, optional
935 The interval on which to spin the client in the background thread
939 The interval on which to spin the client in the background thread
936 (simply passed to time.sleep).
940 (simply passed to time.sleep).
937
941
938 Notes
942 Notes
939 -----
943 -----
940
944
941 For precision timing, you may want to use this method to put a bound
945 For precision timing, you may want to use this method to put a bound
942 on the jitter (in seconds) in `received` timestamps used
946 on the jitter (in seconds) in `received` timestamps used
943 in AsyncResult.wall_time.
947 in AsyncResult.wall_time.
944
948
945 """
949 """
946 if self._spin_thread is not None:
950 if self._spin_thread is not None:
947 self.stop_spin_thread()
951 self.stop_spin_thread()
948 self._stop_spinning.clear()
952 self._stop_spinning.clear()
949 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
953 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
950 self._spin_thread.daemon = True
954 self._spin_thread.daemon = True
951 self._spin_thread.start()
955 self._spin_thread.start()
952
956
953 def stop_spin_thread(self):
957 def stop_spin_thread(self):
954 """stop background spin_thread, if any"""
958 """stop background spin_thread, if any"""
955 if self._spin_thread is not None:
959 if self._spin_thread is not None:
956 self._stop_spinning.set()
960 self._stop_spinning.set()
957 self._spin_thread.join()
961 self._spin_thread.join()
958 self._spin_thread = None
962 self._spin_thread = None
959
963
960 def spin(self):
964 def spin(self):
961 """Flush any registration notifications and execution results
965 """Flush any registration notifications and execution results
962 waiting in the ZMQ queue.
966 waiting in the ZMQ queue.
963 """
967 """
964 if self._notification_socket:
968 if self._notification_socket:
965 self._flush_notifications()
969 self._flush_notifications()
966 if self._iopub_socket:
970 if self._iopub_socket:
967 self._flush_iopub(self._iopub_socket)
971 self._flush_iopub(self._iopub_socket)
968 if self._mux_socket:
972 if self._mux_socket:
969 self._flush_results(self._mux_socket)
973 self._flush_results(self._mux_socket)
970 if self._task_socket:
974 if self._task_socket:
971 self._flush_results(self._task_socket)
975 self._flush_results(self._task_socket)
972 if self._control_socket:
976 if self._control_socket:
973 self._flush_control(self._control_socket)
977 self._flush_control(self._control_socket)
974 if self._query_socket:
978 if self._query_socket:
975 self._flush_ignored_hub_replies()
979 self._flush_ignored_hub_replies()
976
980
977 def wait(self, jobs=None, timeout=-1):
981 def wait(self, jobs=None, timeout=-1):
978 """waits on one or more `jobs`, for up to `timeout` seconds.
982 """waits on one or more `jobs`, for up to `timeout` seconds.
979
983
980 Parameters
984 Parameters
981 ----------
985 ----------
982
986
983 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
987 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
984 ints are indices to self.history
988 ints are indices to self.history
985 strs are msg_ids
989 strs are msg_ids
986 default: wait on all outstanding messages
990 default: wait on all outstanding messages
987 timeout : float
991 timeout : float
988 a time in seconds, after which to give up.
992 a time in seconds, after which to give up.
989 default is -1, which means no timeout
993 default is -1, which means no timeout
990
994
991 Returns
995 Returns
992 -------
996 -------
993
997
994 True : when all msg_ids are done
998 True : when all msg_ids are done
995 False : timeout reached, some msg_ids still outstanding
999 False : timeout reached, some msg_ids still outstanding
996 """
1000 """
997 tic = time.time()
1001 tic = time.time()
998 if jobs is None:
1002 if jobs is None:
999 theids = self.outstanding
1003 theids = self.outstanding
1000 else:
1004 else:
1001 if isinstance(jobs, (int, basestring, AsyncResult)):
1005 if isinstance(jobs, (int, basestring, AsyncResult)):
1002 jobs = [jobs]
1006 jobs = [jobs]
1003 theids = set()
1007 theids = set()
1004 for job in jobs:
1008 for job in jobs:
1005 if isinstance(job, int):
1009 if isinstance(job, int):
1006 # index access
1010 # index access
1007 job = self.history[job]
1011 job = self.history[job]
1008 elif isinstance(job, AsyncResult):
1012 elif isinstance(job, AsyncResult):
1009 map(theids.add, job.msg_ids)
1013 map(theids.add, job.msg_ids)
1010 continue
1014 continue
1011 theids.add(job)
1015 theids.add(job)
1012 if not theids.intersection(self.outstanding):
1016 if not theids.intersection(self.outstanding):
1013 return True
1017 return True
1014 self.spin()
1018 self.spin()
1015 while theids.intersection(self.outstanding):
1019 while theids.intersection(self.outstanding):
1016 if timeout >= 0 and ( time.time()-tic ) > timeout:
1020 if timeout >= 0 and ( time.time()-tic ) > timeout:
1017 break
1021 break
1018 time.sleep(1e-3)
1022 time.sleep(1e-3)
1019 self.spin()
1023 self.spin()
1020 return len(theids.intersection(self.outstanding)) == 0
1024 return len(theids.intersection(self.outstanding)) == 0
1021
1025
1022 #--------------------------------------------------------------------------
1026 #--------------------------------------------------------------------------
1023 # Control methods
1027 # Control methods
1024 #--------------------------------------------------------------------------
1028 #--------------------------------------------------------------------------
1025
1029
1026 @spin_first
1030 @spin_first
1027 def clear(self, targets=None, block=None):
1031 def clear(self, targets=None, block=None):
1028 """Clear the namespace in target(s)."""
1032 """Clear the namespace in target(s)."""
1029 block = self.block if block is None else block
1033 block = self.block if block is None else block
1030 targets = self._build_targets(targets)[0]
1034 targets = self._build_targets(targets)[0]
1031 for t in targets:
1035 for t in targets:
1032 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1036 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1033 error = False
1037 error = False
1034 if block:
1038 if block:
1035 self._flush_ignored_control()
1039 self._flush_ignored_control()
1036 for i in range(len(targets)):
1040 for i in range(len(targets)):
1037 idents,msg = self.session.recv(self._control_socket,0)
1041 idents,msg = self.session.recv(self._control_socket,0)
1038 if self.debug:
1042 if self.debug:
1039 pprint(msg)
1043 pprint(msg)
1040 if msg['content']['status'] != 'ok':
1044 if msg['content']['status'] != 'ok':
1041 error = self._unwrap_exception(msg['content'])
1045 error = self._unwrap_exception(msg['content'])
1042 else:
1046 else:
1043 self._ignored_control_replies += len(targets)
1047 self._ignored_control_replies += len(targets)
1044 if error:
1048 if error:
1045 raise error
1049 raise error
1046
1050
1047
1051
1048 @spin_first
1052 @spin_first
1049 def abort(self, jobs=None, targets=None, block=None):
1053 def abort(self, jobs=None, targets=None, block=None):
1050 """Abort specific jobs from the execution queues of target(s).
1054 """Abort specific jobs from the execution queues of target(s).
1051
1055
1052 This is a mechanism to prevent jobs that have already been submitted
1056 This is a mechanism to prevent jobs that have already been submitted
1053 from executing.
1057 from executing.
1054
1058
1055 Parameters
1059 Parameters
1056 ----------
1060 ----------
1057
1061
1058 jobs : msg_id, list of msg_ids, or AsyncResult
1062 jobs : msg_id, list of msg_ids, or AsyncResult
1059 The jobs to be aborted
1063 The jobs to be aborted
1060
1064
1061 If unspecified/None: abort all outstanding jobs.
1065 If unspecified/None: abort all outstanding jobs.
1062
1066
1063 """
1067 """
1064 block = self.block if block is None else block
1068 block = self.block if block is None else block
1065 jobs = jobs if jobs is not None else list(self.outstanding)
1069 jobs = jobs if jobs is not None else list(self.outstanding)
1066 targets = self._build_targets(targets)[0]
1070 targets = self._build_targets(targets)[0]
1067
1071
1068 msg_ids = []
1072 msg_ids = []
1069 if isinstance(jobs, (basestring,AsyncResult)):
1073 if isinstance(jobs, (basestring,AsyncResult)):
1070 jobs = [jobs]
1074 jobs = [jobs]
1071 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1075 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1072 if bad_ids:
1076 if bad_ids:
1073 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1077 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1074 for j in jobs:
1078 for j in jobs:
1075 if isinstance(j, AsyncResult):
1079 if isinstance(j, AsyncResult):
1076 msg_ids.extend(j.msg_ids)
1080 msg_ids.extend(j.msg_ids)
1077 else:
1081 else:
1078 msg_ids.append(j)
1082 msg_ids.append(j)
1079 content = dict(msg_ids=msg_ids)
1083 content = dict(msg_ids=msg_ids)
1080 for t in targets:
1084 for t in targets:
1081 self.session.send(self._control_socket, 'abort_request',
1085 self.session.send(self._control_socket, 'abort_request',
1082 content=content, ident=t)
1086 content=content, ident=t)
1083 error = False
1087 error = False
1084 if block:
1088 if block:
1085 self._flush_ignored_control()
1089 self._flush_ignored_control()
1086 for i in range(len(targets)):
1090 for i in range(len(targets)):
1087 idents,msg = self.session.recv(self._control_socket,0)
1091 idents,msg = self.session.recv(self._control_socket,0)
1088 if self.debug:
1092 if self.debug:
1089 pprint(msg)
1093 pprint(msg)
1090 if msg['content']['status'] != 'ok':
1094 if msg['content']['status'] != 'ok':
1091 error = self._unwrap_exception(msg['content'])
1095 error = self._unwrap_exception(msg['content'])
1092 else:
1096 else:
1093 self._ignored_control_replies += len(targets)
1097 self._ignored_control_replies += len(targets)
1094 if error:
1098 if error:
1095 raise error
1099 raise error
1096
1100
1097 @spin_first
1101 @spin_first
1098 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1102 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1099 """Terminates one or more engine processes, optionally including the hub."""
1103 """Terminates one or more engine processes, optionally including the hub."""
1100 block = self.block if block is None else block
1104 block = self.block if block is None else block
1101 if hub:
1105 if hub:
1102 targets = 'all'
1106 targets = 'all'
1103 targets = self._build_targets(targets)[0]
1107 targets = self._build_targets(targets)[0]
1104 for t in targets:
1108 for t in targets:
1105 self.session.send(self._control_socket, 'shutdown_request',
1109 self.session.send(self._control_socket, 'shutdown_request',
1106 content={'restart':restart},ident=t)
1110 content={'restart':restart},ident=t)
1107 error = False
1111 error = False
1108 if block or hub:
1112 if block or hub:
1109 self._flush_ignored_control()
1113 self._flush_ignored_control()
1110 for i in range(len(targets)):
1114 for i in range(len(targets)):
1111 idents,msg = self.session.recv(self._control_socket, 0)
1115 idents,msg = self.session.recv(self._control_socket, 0)
1112 if self.debug:
1116 if self.debug:
1113 pprint(msg)
1117 pprint(msg)
1114 if msg['content']['status'] != 'ok':
1118 if msg['content']['status'] != 'ok':
1115 error = self._unwrap_exception(msg['content'])
1119 error = self._unwrap_exception(msg['content'])
1116 else:
1120 else:
1117 self._ignored_control_replies += len(targets)
1121 self._ignored_control_replies += len(targets)
1118
1122
1119 if hub:
1123 if hub:
1120 time.sleep(0.25)
1124 time.sleep(0.25)
1121 self.session.send(self._query_socket, 'shutdown_request')
1125 self.session.send(self._query_socket, 'shutdown_request')
1122 idents,msg = self.session.recv(self._query_socket, 0)
1126 idents,msg = self.session.recv(self._query_socket, 0)
1123 if self.debug:
1127 if self.debug:
1124 pprint(msg)
1128 pprint(msg)
1125 if msg['content']['status'] != 'ok':
1129 if msg['content']['status'] != 'ok':
1126 error = self._unwrap_exception(msg['content'])
1130 error = self._unwrap_exception(msg['content'])
1127
1131
1128 if error:
1132 if error:
1129 raise error
1133 raise error
1130
1134
1131 #--------------------------------------------------------------------------
1135 #--------------------------------------------------------------------------
1132 # Execution related methods
1136 # Execution related methods
1133 #--------------------------------------------------------------------------
1137 #--------------------------------------------------------------------------
1134
1138
1135 def _maybe_raise(self, result):
1139 def _maybe_raise(self, result):
1136 """wrapper for maybe raising an exception if apply failed."""
1140 """wrapper for maybe raising an exception if apply failed."""
1137 if isinstance(result, error.RemoteError):
1141 if isinstance(result, error.RemoteError):
1138 raise result
1142 raise result
1139
1143
1140 return result
1144 return result
1141
1145
1142 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1146 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1143 ident=None):
1147 ident=None):
1144 """construct and send an apply message via a socket.
1148 """construct and send an apply message via a socket.
1145
1149
1146 This is the principal method with which all engine execution is performed by views.
1150 This is the principal method with which all engine execution is performed by views.
1147 """
1151 """
1148
1152
1149 if self._closed:
1153 if self._closed:
1150 raise RuntimeError("Client cannot be used after its sockets have been closed")
1154 raise RuntimeError("Client cannot be used after its sockets have been closed")
1151
1155
1152 # defaults:
1156 # defaults:
1153 args = args if args is not None else []
1157 args = args if args is not None else []
1154 kwargs = kwargs if kwargs is not None else {}
1158 kwargs = kwargs if kwargs is not None else {}
1155 subheader = subheader if subheader is not None else {}
1159 subheader = subheader if subheader is not None else {}
1156
1160
1157 # validate arguments
1161 # validate arguments
1158 if not callable(f) and not isinstance(f, Reference):
1162 if not callable(f) and not isinstance(f, Reference):
1159 raise TypeError("f must be callable, not %s"%type(f))
1163 raise TypeError("f must be callable, not %s"%type(f))
1160 if not isinstance(args, (tuple, list)):
1164 if not isinstance(args, (tuple, list)):
1161 raise TypeError("args must be tuple or list, not %s"%type(args))
1165 raise TypeError("args must be tuple or list, not %s"%type(args))
1162 if not isinstance(kwargs, dict):
1166 if not isinstance(kwargs, dict):
1163 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1167 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1164 if not isinstance(subheader, dict):
1168 if not isinstance(subheader, dict):
1165 raise TypeError("subheader must be dict, not %s"%type(subheader))
1169 raise TypeError("subheader must be dict, not %s"%type(subheader))
1166
1170
1167 bufs = util.pack_apply_message(f,args,kwargs)
1171 bufs = util.pack_apply_message(f,args,kwargs)
1168
1172
1169 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1173 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1170 subheader=subheader, track=track)
1174 subheader=subheader, track=track)
1171
1175
1172 msg_id = msg['header']['msg_id']
1176 msg_id = msg['header']['msg_id']
1173 self.outstanding.add(msg_id)
1177 self.outstanding.add(msg_id)
1174 if ident:
1178 if ident:
1175 # possibly routed to a specific engine
1179 # possibly routed to a specific engine
1176 if isinstance(ident, list):
1180 if isinstance(ident, list):
1177 ident = ident[-1]
1181 ident = ident[-1]
1178 if ident in self._engines.values():
1182 if ident in self._engines.values():
1179 # save for later, in case of engine death
1183 # save for later, in case of engine death
1180 self._outstanding_dict[ident].add(msg_id)
1184 self._outstanding_dict[ident].add(msg_id)
1181 self.history.append(msg_id)
1185 self.history.append(msg_id)
1182 self.metadata[msg_id]['submitted'] = datetime.now()
1186 self.metadata[msg_id]['submitted'] = datetime.now()
1183
1187
1184 return msg
1188 return msg
1185
1189
1186 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1190 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1187 """construct and send an execute request via a socket.
1191 """construct and send an execute request via a socket.
1188
1192
1189 """
1193 """
1190
1194
1191 if self._closed:
1195 if self._closed:
1192 raise RuntimeError("Client cannot be used after its sockets have been closed")
1196 raise RuntimeError("Client cannot be used after its sockets have been closed")
1193
1197
1194 # defaults:
1198 # defaults:
1195 subheader = subheader if subheader is not None else {}
1199 subheader = subheader if subheader is not None else {}
1196
1200
1197 # validate arguments
1201 # validate arguments
1198 if not isinstance(code, basestring):
1202 if not isinstance(code, basestring):
1199 raise TypeError("code must be text, not %s" % type(code))
1203 raise TypeError("code must be text, not %s" % type(code))
1200 if not isinstance(subheader, dict):
1204 if not isinstance(subheader, dict):
1201 raise TypeError("subheader must be dict, not %s" % type(subheader))
1205 raise TypeError("subheader must be dict, not %s" % type(subheader))
1202
1206
1203 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1207 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1204
1208
1205
1209
1206 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1210 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1207 subheader=subheader)
1211 subheader=subheader)
1208
1212
1209 msg_id = msg['header']['msg_id']
1213 msg_id = msg['header']['msg_id']
1210 self.outstanding.add(msg_id)
1214 self.outstanding.add(msg_id)
1211 if ident:
1215 if ident:
1212 # possibly routed to a specific engine
1216 # possibly routed to a specific engine
1213 if isinstance(ident, list):
1217 if isinstance(ident, list):
1214 ident = ident[-1]
1218 ident = ident[-1]
1215 if ident in self._engines.values():
1219 if ident in self._engines.values():
1216 # save for later, in case of engine death
1220 # save for later, in case of engine death
1217 self._outstanding_dict[ident].add(msg_id)
1221 self._outstanding_dict[ident].add(msg_id)
1218 self.history.append(msg_id)
1222 self.history.append(msg_id)
1219 self.metadata[msg_id]['submitted'] = datetime.now()
1223 self.metadata[msg_id]['submitted'] = datetime.now()
1220
1224
1221 return msg
1225 return msg
1222
1226
1223 #--------------------------------------------------------------------------
1227 #--------------------------------------------------------------------------
1224 # construct a View object
1228 # construct a View object
1225 #--------------------------------------------------------------------------
1229 #--------------------------------------------------------------------------
1226
1230
1227 def load_balanced_view(self, targets=None):
1231 def load_balanced_view(self, targets=None):
1228 """construct a DirectView object.
1232 """construct a DirectView object.
1229
1233
1230 If no arguments are specified, create a LoadBalancedView
1234 If no arguments are specified, create a LoadBalancedView
1231 using all engines.
1235 using all engines.
1232
1236
1233 Parameters
1237 Parameters
1234 ----------
1238 ----------
1235
1239
1236 targets: list,slice,int,etc. [default: use all engines]
1240 targets: list,slice,int,etc. [default: use all engines]
1237 The subset of engines across which to load-balance
1241 The subset of engines across which to load-balance
1238 """
1242 """
1239 if targets == 'all':
1243 if targets == 'all':
1240 targets = None
1244 targets = None
1241 if targets is not None:
1245 if targets is not None:
1242 targets = self._build_targets(targets)[1]
1246 targets = self._build_targets(targets)[1]
1243 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1247 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1244
1248
1245 def direct_view(self, targets='all'):
1249 def direct_view(self, targets='all'):
1246 """construct a DirectView object.
1250 """construct a DirectView object.
1247
1251
1248 If no targets are specified, create a DirectView using all engines.
1252 If no targets are specified, create a DirectView using all engines.
1249
1253
1250 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1254 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1251 evaluate the target engines at each execution, whereas rc[:] will connect to
1255 evaluate the target engines at each execution, whereas rc[:] will connect to
1252 all *current* engines, and that list will not change.
1256 all *current* engines, and that list will not change.
1253
1257
1254 That is, 'all' will always use all engines, whereas rc[:] will not use
1258 That is, 'all' will always use all engines, whereas rc[:] will not use
1255 engines added after the DirectView is constructed.
1259 engines added after the DirectView is constructed.
1256
1260
1257 Parameters
1261 Parameters
1258 ----------
1262 ----------
1259
1263
1260 targets: list,slice,int,etc. [default: use all engines]
1264 targets: list,slice,int,etc. [default: use all engines]
1261 The engines to use for the View
1265 The engines to use for the View
1262 """
1266 """
1263 single = isinstance(targets, int)
1267 single = isinstance(targets, int)
1264 # allow 'all' to be lazily evaluated at each execution
1268 # allow 'all' to be lazily evaluated at each execution
1265 if targets != 'all':
1269 if targets != 'all':
1266 targets = self._build_targets(targets)[1]
1270 targets = self._build_targets(targets)[1]
1267 if single:
1271 if single:
1268 targets = targets[0]
1272 targets = targets[0]
1269 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1273 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1270
1274
1271 #--------------------------------------------------------------------------
1275 #--------------------------------------------------------------------------
1272 # Query methods
1276 # Query methods
1273 #--------------------------------------------------------------------------
1277 #--------------------------------------------------------------------------
1274
1278
1275 @spin_first
1279 @spin_first
1276 def get_result(self, indices_or_msg_ids=None, block=None):
1280 def get_result(self, indices_or_msg_ids=None, block=None):
1277 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1281 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1278
1282
1279 If the client already has the results, no request to the Hub will be made.
1283 If the client already has the results, no request to the Hub will be made.
1280
1284
1281 This is a convenient way to construct AsyncResult objects, which are wrappers
1285 This is a convenient way to construct AsyncResult objects, which are wrappers
1282 that include metadata about execution, and allow for awaiting results that
1286 that include metadata about execution, and allow for awaiting results that
1283 were not submitted by this Client.
1287 were not submitted by this Client.
1284
1288
1285 It can also be a convenient way to retrieve the metadata associated with
1289 It can also be a convenient way to retrieve the metadata associated with
1286 blocking execution, since it always retrieves
1290 blocking execution, since it always retrieves
1287
1291
1288 Examples
1292 Examples
1289 --------
1293 --------
1290 ::
1294 ::
1291
1295
1292 In [10]: r = client.apply()
1296 In [10]: r = client.apply()
1293
1297
1294 Parameters
1298 Parameters
1295 ----------
1299 ----------
1296
1300
1297 indices_or_msg_ids : integer history index, str msg_id, or list of either
1301 indices_or_msg_ids : integer history index, str msg_id, or list of either
1298 The indices or msg_ids of indices to be retrieved
1302 The indices or msg_ids of indices to be retrieved
1299
1303
1300 block : bool
1304 block : bool
1301 Whether to wait for the result to be done
1305 Whether to wait for the result to be done
1302
1306
1303 Returns
1307 Returns
1304 -------
1308 -------
1305
1309
1306 AsyncResult
1310 AsyncResult
1307 A single AsyncResult object will always be returned.
1311 A single AsyncResult object will always be returned.
1308
1312
1309 AsyncHubResult
1313 AsyncHubResult
1310 A subclass of AsyncResult that retrieves results from the Hub
1314 A subclass of AsyncResult that retrieves results from the Hub
1311
1315
1312 """
1316 """
1313 block = self.block if block is None else block
1317 block = self.block if block is None else block
1314 if indices_or_msg_ids is None:
1318 if indices_or_msg_ids is None:
1315 indices_or_msg_ids = -1
1319 indices_or_msg_ids = -1
1316
1320
1317 if not isinstance(indices_or_msg_ids, (list,tuple)):
1321 if not isinstance(indices_or_msg_ids, (list,tuple)):
1318 indices_or_msg_ids = [indices_or_msg_ids]
1322 indices_or_msg_ids = [indices_or_msg_ids]
1319
1323
1320 theids = []
1324 theids = []
1321 for id in indices_or_msg_ids:
1325 for id in indices_or_msg_ids:
1322 if isinstance(id, int):
1326 if isinstance(id, int):
1323 id = self.history[id]
1327 id = self.history[id]
1324 if not isinstance(id, basestring):
1328 if not isinstance(id, basestring):
1325 raise TypeError("indices must be str or int, not %r"%id)
1329 raise TypeError("indices must be str or int, not %r"%id)
1326 theids.append(id)
1330 theids.append(id)
1327
1331
1328 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1332 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1329 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1333 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1330
1334
1331 if remote_ids:
1335 if remote_ids:
1332 ar = AsyncHubResult(self, msg_ids=theids)
1336 ar = AsyncHubResult(self, msg_ids=theids)
1333 else:
1337 else:
1334 ar = AsyncResult(self, msg_ids=theids)
1338 ar = AsyncResult(self, msg_ids=theids)
1335
1339
1336 if block:
1340 if block:
1337 ar.wait()
1341 ar.wait()
1338
1342
1339 return ar
1343 return ar
1340
1344
1341 @spin_first
1345 @spin_first
1342 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1346 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1343 """Resubmit one or more tasks.
1347 """Resubmit one or more tasks.
1344
1348
1345 in-flight tasks may not be resubmitted.
1349 in-flight tasks may not be resubmitted.
1346
1350
1347 Parameters
1351 Parameters
1348 ----------
1352 ----------
1349
1353
1350 indices_or_msg_ids : integer history index, str msg_id, or list of either
1354 indices_or_msg_ids : integer history index, str msg_id, or list of either
1351 The indices or msg_ids of indices to be retrieved
1355 The indices or msg_ids of indices to be retrieved
1352
1356
1353 block : bool
1357 block : bool
1354 Whether to wait for the result to be done
1358 Whether to wait for the result to be done
1355
1359
1356 Returns
1360 Returns
1357 -------
1361 -------
1358
1362
1359 AsyncHubResult
1363 AsyncHubResult
1360 A subclass of AsyncResult that retrieves results from the Hub
1364 A subclass of AsyncResult that retrieves results from the Hub
1361
1365
1362 """
1366 """
1363 block = self.block if block is None else block
1367 block = self.block if block is None else block
1364 if indices_or_msg_ids is None:
1368 if indices_or_msg_ids is None:
1365 indices_or_msg_ids = -1
1369 indices_or_msg_ids = -1
1366
1370
1367 if not isinstance(indices_or_msg_ids, (list,tuple)):
1371 if not isinstance(indices_or_msg_ids, (list,tuple)):
1368 indices_or_msg_ids = [indices_or_msg_ids]
1372 indices_or_msg_ids = [indices_or_msg_ids]
1369
1373
1370 theids = []
1374 theids = []
1371 for id in indices_or_msg_ids:
1375 for id in indices_or_msg_ids:
1372 if isinstance(id, int):
1376 if isinstance(id, int):
1373 id = self.history[id]
1377 id = self.history[id]
1374 if not isinstance(id, basestring):
1378 if not isinstance(id, basestring):
1375 raise TypeError("indices must be str or int, not %r"%id)
1379 raise TypeError("indices must be str or int, not %r"%id)
1376 theids.append(id)
1380 theids.append(id)
1377
1381
1378 content = dict(msg_ids = theids)
1382 content = dict(msg_ids = theids)
1379
1383
1380 self.session.send(self._query_socket, 'resubmit_request', content)
1384 self.session.send(self._query_socket, 'resubmit_request', content)
1381
1385
1382 zmq.select([self._query_socket], [], [])
1386 zmq.select([self._query_socket], [], [])
1383 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1387 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1384 if self.debug:
1388 if self.debug:
1385 pprint(msg)
1389 pprint(msg)
1386 content = msg['content']
1390 content = msg['content']
1387 if content['status'] != 'ok':
1391 if content['status'] != 'ok':
1388 raise self._unwrap_exception(content)
1392 raise self._unwrap_exception(content)
1389 mapping = content['resubmitted']
1393 mapping = content['resubmitted']
1390 new_ids = [ mapping[msg_id] for msg_id in theids ]
1394 new_ids = [ mapping[msg_id] for msg_id in theids ]
1391
1395
1392 ar = AsyncHubResult(self, msg_ids=new_ids)
1396 ar = AsyncHubResult(self, msg_ids=new_ids)
1393
1397
1394 if block:
1398 if block:
1395 ar.wait()
1399 ar.wait()
1396
1400
1397 return ar
1401 return ar
1398
1402
1399 @spin_first
1403 @spin_first
1400 def result_status(self, msg_ids, status_only=True):
1404 def result_status(self, msg_ids, status_only=True):
1401 """Check on the status of the result(s) of the apply request with `msg_ids`.
1405 """Check on the status of the result(s) of the apply request with `msg_ids`.
1402
1406
1403 If status_only is False, then the actual results will be retrieved, else
1407 If status_only is False, then the actual results will be retrieved, else
1404 only the status of the results will be checked.
1408 only the status of the results will be checked.
1405
1409
1406 Parameters
1410 Parameters
1407 ----------
1411 ----------
1408
1412
1409 msg_ids : list of msg_ids
1413 msg_ids : list of msg_ids
1410 if int:
1414 if int:
1411 Passed as index to self.history for convenience.
1415 Passed as index to self.history for convenience.
1412 status_only : bool (default: True)
1416 status_only : bool (default: True)
1413 if False:
1417 if False:
1414 Retrieve the actual results of completed tasks.
1418 Retrieve the actual results of completed tasks.
1415
1419
1416 Returns
1420 Returns
1417 -------
1421 -------
1418
1422
1419 results : dict
1423 results : dict
1420 There will always be the keys 'pending' and 'completed', which will
1424 There will always be the keys 'pending' and 'completed', which will
1421 be lists of msg_ids that are incomplete or complete. If `status_only`
1425 be lists of msg_ids that are incomplete or complete. If `status_only`
1422 is False, then completed results will be keyed by their `msg_id`.
1426 is False, then completed results will be keyed by their `msg_id`.
1423 """
1427 """
1424 if not isinstance(msg_ids, (list,tuple)):
1428 if not isinstance(msg_ids, (list,tuple)):
1425 msg_ids = [msg_ids]
1429 msg_ids = [msg_ids]
1426
1430
1427 theids = []
1431 theids = []
1428 for msg_id in msg_ids:
1432 for msg_id in msg_ids:
1429 if isinstance(msg_id, int):
1433 if isinstance(msg_id, int):
1430 msg_id = self.history[msg_id]
1434 msg_id = self.history[msg_id]
1431 if not isinstance(msg_id, basestring):
1435 if not isinstance(msg_id, basestring):
1432 raise TypeError("msg_ids must be str, not %r"%msg_id)
1436 raise TypeError("msg_ids must be str, not %r"%msg_id)
1433 theids.append(msg_id)
1437 theids.append(msg_id)
1434
1438
1435 completed = []
1439 completed = []
1436 local_results = {}
1440 local_results = {}
1437
1441
1438 # comment this block out to temporarily disable local shortcut:
1442 # comment this block out to temporarily disable local shortcut:
1439 for msg_id in theids:
1443 for msg_id in theids:
1440 if msg_id in self.results:
1444 if msg_id in self.results:
1441 completed.append(msg_id)
1445 completed.append(msg_id)
1442 local_results[msg_id] = self.results[msg_id]
1446 local_results[msg_id] = self.results[msg_id]
1443 theids.remove(msg_id)
1447 theids.remove(msg_id)
1444
1448
1445 if theids: # some not locally cached
1449 if theids: # some not locally cached
1446 content = dict(msg_ids=theids, status_only=status_only)
1450 content = dict(msg_ids=theids, status_only=status_only)
1447 msg = self.session.send(self._query_socket, "result_request", content=content)
1451 msg = self.session.send(self._query_socket, "result_request", content=content)
1448 zmq.select([self._query_socket], [], [])
1452 zmq.select([self._query_socket], [], [])
1449 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1453 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1450 if self.debug:
1454 if self.debug:
1451 pprint(msg)
1455 pprint(msg)
1452 content = msg['content']
1456 content = msg['content']
1453 if content['status'] != 'ok':
1457 if content['status'] != 'ok':
1454 raise self._unwrap_exception(content)
1458 raise self._unwrap_exception(content)
1455 buffers = msg['buffers']
1459 buffers = msg['buffers']
1456 else:
1460 else:
1457 content = dict(completed=[],pending=[])
1461 content = dict(completed=[],pending=[])
1458
1462
1459 content['completed'].extend(completed)
1463 content['completed'].extend(completed)
1460
1464
1461 if status_only:
1465 if status_only:
1462 return content
1466 return content
1463
1467
1464 failures = []
1468 failures = []
1465 # load cached results into result:
1469 # load cached results into result:
1466 content.update(local_results)
1470 content.update(local_results)
1467
1471
1468 # update cache with results:
1472 # update cache with results:
1469 for msg_id in sorted(theids):
1473 for msg_id in sorted(theids):
1470 if msg_id in content['completed']:
1474 if msg_id in content['completed']:
1471 rec = content[msg_id]
1475 rec = content[msg_id]
1472 parent = rec['header']
1476 parent = rec['header']
1473 header = rec['result_header']
1477 header = rec['result_header']
1474 rcontent = rec['result_content']
1478 rcontent = rec['result_content']
1475 iodict = rec['io']
1479 iodict = rec['io']
1476 if isinstance(rcontent, str):
1480 if isinstance(rcontent, str):
1477 rcontent = self.session.unpack(rcontent)
1481 rcontent = self.session.unpack(rcontent)
1478
1482
1479 md = self.metadata[msg_id]
1483 md = self.metadata[msg_id]
1480 md.update(self._extract_metadata(header, parent, rcontent))
1484 md.update(self._extract_metadata(header, parent, rcontent))
1481 if rec.get('received'):
1485 if rec.get('received'):
1482 md['received'] = rec['received']
1486 md['received'] = rec['received']
1483 md.update(iodict)
1487 md.update(iodict)
1484
1488
1485 if rcontent['status'] == 'ok':
1489 if rcontent['status'] == 'ok':
1486 res,buffers = util.unserialize_object(buffers)
1490 res,buffers = util.unserialize_object(buffers)
1487 else:
1491 else:
1488 print rcontent
1492 print rcontent
1489 res = self._unwrap_exception(rcontent)
1493 res = self._unwrap_exception(rcontent)
1490 failures.append(res)
1494 failures.append(res)
1491
1495
1492 self.results[msg_id] = res
1496 self.results[msg_id] = res
1493 content[msg_id] = res
1497 content[msg_id] = res
1494
1498
1495 if len(theids) == 1 and failures:
1499 if len(theids) == 1 and failures:
1496 raise failures[0]
1500 raise failures[0]
1497
1501
1498 error.collect_exceptions(failures, "result_status")
1502 error.collect_exceptions(failures, "result_status")
1499 return content
1503 return content
1500
1504
1501 @spin_first
1505 @spin_first
1502 def queue_status(self, targets='all', verbose=False):
1506 def queue_status(self, targets='all', verbose=False):
1503 """Fetch the status of engine queues.
1507 """Fetch the status of engine queues.
1504
1508
1505 Parameters
1509 Parameters
1506 ----------
1510 ----------
1507
1511
1508 targets : int/str/list of ints/strs
1512 targets : int/str/list of ints/strs
1509 the engines whose states are to be queried.
1513 the engines whose states are to be queried.
1510 default : all
1514 default : all
1511 verbose : bool
1515 verbose : bool
1512 Whether to return lengths only, or lists of ids for each element
1516 Whether to return lengths only, or lists of ids for each element
1513 """
1517 """
1514 if targets == 'all':
1518 if targets == 'all':
1515 # allow 'all' to be evaluated on the engine
1519 # allow 'all' to be evaluated on the engine
1516 engine_ids = None
1520 engine_ids = None
1517 else:
1521 else:
1518 engine_ids = self._build_targets(targets)[1]
1522 engine_ids = self._build_targets(targets)[1]
1519 content = dict(targets=engine_ids, verbose=verbose)
1523 content = dict(targets=engine_ids, verbose=verbose)
1520 self.session.send(self._query_socket, "queue_request", content=content)
1524 self.session.send(self._query_socket, "queue_request", content=content)
1521 idents,msg = self.session.recv(self._query_socket, 0)
1525 idents,msg = self.session.recv(self._query_socket, 0)
1522 if self.debug:
1526 if self.debug:
1523 pprint(msg)
1527 pprint(msg)
1524 content = msg['content']
1528 content = msg['content']
1525 status = content.pop('status')
1529 status = content.pop('status')
1526 if status != 'ok':
1530 if status != 'ok':
1527 raise self._unwrap_exception(content)
1531 raise self._unwrap_exception(content)
1528 content = rekey(content)
1532 content = rekey(content)
1529 if isinstance(targets, int):
1533 if isinstance(targets, int):
1530 return content[targets]
1534 return content[targets]
1531 else:
1535 else:
1532 return content
1536 return content
1533
1537
1534 @spin_first
1538 @spin_first
1535 def purge_results(self, jobs=[], targets=[]):
1539 def purge_results(self, jobs=[], targets=[]):
1536 """Tell the Hub to forget results.
1540 """Tell the Hub to forget results.
1537
1541
1538 Individual results can be purged by msg_id, or the entire
1542 Individual results can be purged by msg_id, or the entire
1539 history of specific targets can be purged.
1543 history of specific targets can be purged.
1540
1544
1541 Use `purge_results('all')` to scrub everything from the Hub's db.
1545 Use `purge_results('all')` to scrub everything from the Hub's db.
1542
1546
1543 Parameters
1547 Parameters
1544 ----------
1548 ----------
1545
1549
1546 jobs : str or list of str or AsyncResult objects
1550 jobs : str or list of str or AsyncResult objects
1547 the msg_ids whose results should be forgotten.
1551 the msg_ids whose results should be forgotten.
1548 targets : int/str/list of ints/strs
1552 targets : int/str/list of ints/strs
1549 The targets, by int_id, whose entire history is to be purged.
1553 The targets, by int_id, whose entire history is to be purged.
1550
1554
1551 default : None
1555 default : None
1552 """
1556 """
1553 if not targets and not jobs:
1557 if not targets and not jobs:
1554 raise ValueError("Must specify at least one of `targets` and `jobs`")
1558 raise ValueError("Must specify at least one of `targets` and `jobs`")
1555 if targets:
1559 if targets:
1556 targets = self._build_targets(targets)[1]
1560 targets = self._build_targets(targets)[1]
1557
1561
1558 # construct msg_ids from jobs
1562 # construct msg_ids from jobs
1559 if jobs == 'all':
1563 if jobs == 'all':
1560 msg_ids = jobs
1564 msg_ids = jobs
1561 else:
1565 else:
1562 msg_ids = []
1566 msg_ids = []
1563 if isinstance(jobs, (basestring,AsyncResult)):
1567 if isinstance(jobs, (basestring,AsyncResult)):
1564 jobs = [jobs]
1568 jobs = [jobs]
1565 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1569 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1566 if bad_ids:
1570 if bad_ids:
1567 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1571 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1568 for j in jobs:
1572 for j in jobs:
1569 if isinstance(j, AsyncResult):
1573 if isinstance(j, AsyncResult):
1570 msg_ids.extend(j.msg_ids)
1574 msg_ids.extend(j.msg_ids)
1571 else:
1575 else:
1572 msg_ids.append(j)
1576 msg_ids.append(j)
1573
1577
1574 content = dict(engine_ids=targets, msg_ids=msg_ids)
1578 content = dict(engine_ids=targets, msg_ids=msg_ids)
1575 self.session.send(self._query_socket, "purge_request", content=content)
1579 self.session.send(self._query_socket, "purge_request", content=content)
1576 idents, msg = self.session.recv(self._query_socket, 0)
1580 idents, msg = self.session.recv(self._query_socket, 0)
1577 if self.debug:
1581 if self.debug:
1578 pprint(msg)
1582 pprint(msg)
1579 content = msg['content']
1583 content = msg['content']
1580 if content['status'] != 'ok':
1584 if content['status'] != 'ok':
1581 raise self._unwrap_exception(content)
1585 raise self._unwrap_exception(content)
1582
1586
1583 @spin_first
1587 @spin_first
1584 def hub_history(self):
1588 def hub_history(self):
1585 """Get the Hub's history
1589 """Get the Hub's history
1586
1590
1587 Just like the Client, the Hub has a history, which is a list of msg_ids.
1591 Just like the Client, the Hub has a history, which is a list of msg_ids.
1588 This will contain the history of all clients, and, depending on configuration,
1592 This will contain the history of all clients, and, depending on configuration,
1589 may contain history across multiple cluster sessions.
1593 may contain history across multiple cluster sessions.
1590
1594
1591 Any msg_id returned here is a valid argument to `get_result`.
1595 Any msg_id returned here is a valid argument to `get_result`.
1592
1596
1593 Returns
1597 Returns
1594 -------
1598 -------
1595
1599
1596 msg_ids : list of strs
1600 msg_ids : list of strs
1597 list of all msg_ids, ordered by task submission time.
1601 list of all msg_ids, ordered by task submission time.
1598 """
1602 """
1599
1603
1600 self.session.send(self._query_socket, "history_request", content={})
1604 self.session.send(self._query_socket, "history_request", content={})
1601 idents, msg = self.session.recv(self._query_socket, 0)
1605 idents, msg = self.session.recv(self._query_socket, 0)
1602
1606
1603 if self.debug:
1607 if self.debug:
1604 pprint(msg)
1608 pprint(msg)
1605 content = msg['content']
1609 content = msg['content']
1606 if content['status'] != 'ok':
1610 if content['status'] != 'ok':
1607 raise self._unwrap_exception(content)
1611 raise self._unwrap_exception(content)
1608 else:
1612 else:
1609 return content['history']
1613 return content['history']
1610
1614
1611 @spin_first
1615 @spin_first
1612 def db_query(self, query, keys=None):
1616 def db_query(self, query, keys=None):
1613 """Query the Hub's TaskRecord database
1617 """Query the Hub's TaskRecord database
1614
1618
1615 This will return a list of task record dicts that match `query`
1619 This will return a list of task record dicts that match `query`
1616
1620
1617 Parameters
1621 Parameters
1618 ----------
1622 ----------
1619
1623
1620 query : mongodb query dict
1624 query : mongodb query dict
1621 The search dict. See mongodb query docs for details.
1625 The search dict. See mongodb query docs for details.
1622 keys : list of strs [optional]
1626 keys : list of strs [optional]
1623 The subset of keys to be returned. The default is to fetch everything but buffers.
1627 The subset of keys to be returned. The default is to fetch everything but buffers.
1624 'msg_id' will *always* be included.
1628 'msg_id' will *always* be included.
1625 """
1629 """
1626 if isinstance(keys, basestring):
1630 if isinstance(keys, basestring):
1627 keys = [keys]
1631 keys = [keys]
1628 content = dict(query=query, keys=keys)
1632 content = dict(query=query, keys=keys)
1629 self.session.send(self._query_socket, "db_request", content=content)
1633 self.session.send(self._query_socket, "db_request", content=content)
1630 idents, msg = self.session.recv(self._query_socket, 0)
1634 idents, msg = self.session.recv(self._query_socket, 0)
1631 if self.debug:
1635 if self.debug:
1632 pprint(msg)
1636 pprint(msg)
1633 content = msg['content']
1637 content = msg['content']
1634 if content['status'] != 'ok':
1638 if content['status'] != 'ok':
1635 raise self._unwrap_exception(content)
1639 raise self._unwrap_exception(content)
1636
1640
1637 records = content['records']
1641 records = content['records']
1638
1642
1639 buffer_lens = content['buffer_lens']
1643 buffer_lens = content['buffer_lens']
1640 result_buffer_lens = content['result_buffer_lens']
1644 result_buffer_lens = content['result_buffer_lens']
1641 buffers = msg['buffers']
1645 buffers = msg['buffers']
1642 has_bufs = buffer_lens is not None
1646 has_bufs = buffer_lens is not None
1643 has_rbufs = result_buffer_lens is not None
1647 has_rbufs = result_buffer_lens is not None
1644 for i,rec in enumerate(records):
1648 for i,rec in enumerate(records):
1645 # relink buffers
1649 # relink buffers
1646 if has_bufs:
1650 if has_bufs:
1647 blen = buffer_lens[i]
1651 blen = buffer_lens[i]
1648 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1652 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1649 if has_rbufs:
1653 if has_rbufs:
1650 blen = result_buffer_lens[i]
1654 blen = result_buffer_lens[i]
1651 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1655 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1652
1656
1653 return records
1657 return records
1654
1658
1655 __all__ = [ 'Client' ]
1659 __all__ = [ 'Client' ]
@@ -1,264 +1,266 b''
1 """Tests for asyncresult.py
1 """Tests for asyncresult.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import time
19 import time
20
20
21 from IPython.parallel.error import TimeoutError
21 from IPython.parallel.error import TimeoutError
22
22
23 from IPython.parallel import error, Client
23 from IPython.parallel import error, Client
24 from IPython.parallel.tests import add_engines
24 from IPython.parallel.tests import add_engines
25 from .clienttest import ClusterTestCase, capture_output
25 from .clienttest import ClusterTestCase, capture_output
26
26
27 def setup():
27 def setup():
28 add_engines(2, total=True)
28 add_engines(2, total=True)
29
29
30 def wait(n):
30 def wait(n):
31 import time
31 import time
32 time.sleep(n)
32 time.sleep(n)
33 return n
33 return n
34
34
35 class AsyncResultTest(ClusterTestCase):
35 class AsyncResultTest(ClusterTestCase):
36
36
37 def test_single_result_view(self):
37 def test_single_result_view(self):
38 """various one-target views get the right value for single_result"""
38 """various one-target views get the right value for single_result"""
39 eid = self.client.ids[-1]
39 eid = self.client.ids[-1]
40 ar = self.client[eid].apply_async(lambda : 42)
40 ar = self.client[eid].apply_async(lambda : 42)
41 self.assertEquals(ar.get(), 42)
41 self.assertEquals(ar.get(), 42)
42 ar = self.client[[eid]].apply_async(lambda : 42)
42 ar = self.client[[eid]].apply_async(lambda : 42)
43 self.assertEquals(ar.get(), [42])
43 self.assertEquals(ar.get(), [42])
44 ar = self.client[-1:].apply_async(lambda : 42)
44 ar = self.client[-1:].apply_async(lambda : 42)
45 self.assertEquals(ar.get(), [42])
45 self.assertEquals(ar.get(), [42])
46
46
47 def test_get_after_done(self):
47 def test_get_after_done(self):
48 ar = self.client[-1].apply_async(lambda : 42)
48 ar = self.client[-1].apply_async(lambda : 42)
49 ar.wait()
49 ar.wait()
50 self.assertTrue(ar.ready())
50 self.assertTrue(ar.ready())
51 self.assertEquals(ar.get(), 42)
51 self.assertEquals(ar.get(), 42)
52 self.assertEquals(ar.get(), 42)
52 self.assertEquals(ar.get(), 42)
53
53
54 def test_get_before_done(self):
54 def test_get_before_done(self):
55 ar = self.client[-1].apply_async(wait, 0.1)
55 ar = self.client[-1].apply_async(wait, 0.1)
56 self.assertRaises(TimeoutError, ar.get, 0)
56 self.assertRaises(TimeoutError, ar.get, 0)
57 ar.wait(0)
57 ar.wait(0)
58 self.assertFalse(ar.ready())
58 self.assertFalse(ar.ready())
59 self.assertEquals(ar.get(), 0.1)
59 self.assertEquals(ar.get(), 0.1)
60
60
61 def test_get_after_error(self):
61 def test_get_after_error(self):
62 ar = self.client[-1].apply_async(lambda : 1/0)
62 ar = self.client[-1].apply_async(lambda : 1/0)
63 ar.wait(10)
63 ar.wait(10)
64 self.assertRaisesRemote(ZeroDivisionError, ar.get)
64 self.assertRaisesRemote(ZeroDivisionError, ar.get)
65 self.assertRaisesRemote(ZeroDivisionError, ar.get)
65 self.assertRaisesRemote(ZeroDivisionError, ar.get)
66 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
66 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
67
67
68 def test_get_dict(self):
68 def test_get_dict(self):
69 n = len(self.client)
69 n = len(self.client)
70 ar = self.client[:].apply_async(lambda : 5)
70 ar = self.client[:].apply_async(lambda : 5)
71 self.assertEquals(ar.get(), [5]*n)
71 self.assertEquals(ar.get(), [5]*n)
72 d = ar.get_dict()
72 d = ar.get_dict()
73 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
73 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
74 for eid,r in d.iteritems():
74 for eid,r in d.iteritems():
75 self.assertEquals(r, 5)
75 self.assertEquals(r, 5)
76
76
77 def test_list_amr(self):
77 def test_list_amr(self):
78 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
78 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
79 rlist = list(ar)
79 rlist = list(ar)
80
80
81 def test_getattr(self):
81 def test_getattr(self):
82 ar = self.client[:].apply_async(wait, 0.5)
82 ar = self.client[:].apply_async(wait, 0.5)
83 self.assertRaises(AttributeError, lambda : ar._foo)
83 self.assertRaises(AttributeError, lambda : ar._foo)
84 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
84 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
85 self.assertRaises(AttributeError, lambda : ar.foo)
85 self.assertRaises(AttributeError, lambda : ar.foo)
86 self.assertRaises(AttributeError, lambda : ar.engine_id)
86 self.assertRaises(AttributeError, lambda : ar.engine_id)
87 self.assertFalse(hasattr(ar, '__length_hint__'))
87 self.assertFalse(hasattr(ar, '__length_hint__'))
88 self.assertFalse(hasattr(ar, 'foo'))
88 self.assertFalse(hasattr(ar, 'foo'))
89 self.assertFalse(hasattr(ar, 'engine_id'))
89 self.assertFalse(hasattr(ar, 'engine_id'))
90 ar.get(5)
90 ar.get(5)
91 self.assertRaises(AttributeError, lambda : ar._foo)
91 self.assertRaises(AttributeError, lambda : ar._foo)
92 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
92 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
93 self.assertRaises(AttributeError, lambda : ar.foo)
93 self.assertRaises(AttributeError, lambda : ar.foo)
94 self.assertTrue(isinstance(ar.engine_id, list))
94 self.assertTrue(isinstance(ar.engine_id, list))
95 self.assertEquals(ar.engine_id, ar['engine_id'])
95 self.assertEquals(ar.engine_id, ar['engine_id'])
96 self.assertFalse(hasattr(ar, '__length_hint__'))
96 self.assertFalse(hasattr(ar, '__length_hint__'))
97 self.assertFalse(hasattr(ar, 'foo'))
97 self.assertFalse(hasattr(ar, 'foo'))
98 self.assertTrue(hasattr(ar, 'engine_id'))
98 self.assertTrue(hasattr(ar, 'engine_id'))
99
99
100 def test_getitem(self):
100 def test_getitem(self):
101 ar = self.client[:].apply_async(wait, 0.5)
101 ar = self.client[:].apply_async(wait, 0.5)
102 self.assertRaises(TimeoutError, lambda : ar['foo'])
102 self.assertRaises(TimeoutError, lambda : ar['foo'])
103 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
103 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
104 ar.get(5)
104 ar.get(5)
105 self.assertRaises(KeyError, lambda : ar['foo'])
105 self.assertRaises(KeyError, lambda : ar['foo'])
106 self.assertTrue(isinstance(ar['engine_id'], list))
106 self.assertTrue(isinstance(ar['engine_id'], list))
107 self.assertEquals(ar.engine_id, ar['engine_id'])
107 self.assertEquals(ar.engine_id, ar['engine_id'])
108
108
109 def test_single_result(self):
109 def test_single_result(self):
110 ar = self.client[-1].apply_async(wait, 0.5)
110 ar = self.client[-1].apply_async(wait, 0.5)
111 self.assertRaises(TimeoutError, lambda : ar['foo'])
111 self.assertRaises(TimeoutError, lambda : ar['foo'])
112 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
112 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
113 self.assertTrue(ar.get(5) == 0.5)
113 self.assertTrue(ar.get(5) == 0.5)
114 self.assertTrue(isinstance(ar['engine_id'], int))
114 self.assertTrue(isinstance(ar['engine_id'], int))
115 self.assertTrue(isinstance(ar.engine_id, int))
115 self.assertTrue(isinstance(ar.engine_id, int))
116 self.assertEquals(ar.engine_id, ar['engine_id'])
116 self.assertEquals(ar.engine_id, ar['engine_id'])
117
117
118 def test_abort(self):
118 def test_abort(self):
119 e = self.client[-1]
119 e = self.client[-1]
120 ar = e.execute('import time; time.sleep(1)', block=False)
120 ar = e.execute('import time; time.sleep(1)', block=False)
121 ar2 = e.apply_async(lambda : 2)
121 ar2 = e.apply_async(lambda : 2)
122 ar2.abort()
122 ar2.abort()
123 self.assertRaises(error.TaskAborted, ar2.get)
123 self.assertRaises(error.TaskAborted, ar2.get)
124 ar.get()
124 ar.get()
125
125
126 def test_len(self):
126 def test_len(self):
127 v = self.client.load_balanced_view()
127 v = self.client.load_balanced_view()
128 ar = v.map_async(lambda x: x, range(10))
128 ar = v.map_async(lambda x: x, range(10))
129 self.assertEquals(len(ar), 10)
129 self.assertEquals(len(ar), 10)
130 ar = v.apply_async(lambda x: x, range(10))
130 ar = v.apply_async(lambda x: x, range(10))
131 self.assertEquals(len(ar), 1)
131 self.assertEquals(len(ar), 1)
132 ar = self.client[:].apply_async(lambda x: x, range(10))
132 ar = self.client[:].apply_async(lambda x: x, range(10))
133 self.assertEquals(len(ar), len(self.client.ids))
133 self.assertEquals(len(ar), len(self.client.ids))
134
134
135 def test_wall_time_single(self):
135 def test_wall_time_single(self):
136 v = self.client.load_balanced_view()
136 v = self.client.load_balanced_view()
137 ar = v.apply_async(time.sleep, 0.25)
137 ar = v.apply_async(time.sleep, 0.25)
138 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
138 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
139 ar.get(2)
139 ar.get(2)
140 self.assertTrue(ar.wall_time < 1.)
140 self.assertTrue(ar.wall_time < 1.)
141 self.assertTrue(ar.wall_time > 0.2)
141 self.assertTrue(ar.wall_time > 0.2)
142
142
143 def test_wall_time_multi(self):
143 def test_wall_time_multi(self):
144 self.minimum_engines(4)
144 self.minimum_engines(4)
145 v = self.client[:]
145 v = self.client[:]
146 ar = v.apply_async(time.sleep, 0.25)
146 ar = v.apply_async(time.sleep, 0.25)
147 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
147 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
148 ar.get(2)
148 ar.get(2)
149 self.assertTrue(ar.wall_time < 1.)
149 self.assertTrue(ar.wall_time < 1.)
150 self.assertTrue(ar.wall_time > 0.2)
150 self.assertTrue(ar.wall_time > 0.2)
151
151
152 def test_serial_time_single(self):
152 def test_serial_time_single(self):
153 v = self.client.load_balanced_view()
153 v = self.client.load_balanced_view()
154 ar = v.apply_async(time.sleep, 0.25)
154 ar = v.apply_async(time.sleep, 0.25)
155 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
155 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
156 ar.get(2)
156 ar.get(2)
157 self.assertTrue(ar.serial_time < 1.)
157 self.assertTrue(ar.serial_time < 1.)
158 self.assertTrue(ar.serial_time > 0.2)
158 self.assertTrue(ar.serial_time > 0.2)
159
159
160 def test_serial_time_multi(self):
160 def test_serial_time_multi(self):
161 self.minimum_engines(4)
161 self.minimum_engines(4)
162 v = self.client[:]
162 v = self.client[:]
163 ar = v.apply_async(time.sleep, 0.25)
163 ar = v.apply_async(time.sleep, 0.25)
164 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
164 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
165 ar.get(2)
165 ar.get(2)
166 self.assertTrue(ar.serial_time < 2.)
166 self.assertTrue(ar.serial_time < 2.)
167 self.assertTrue(ar.serial_time > 0.8)
167 self.assertTrue(ar.serial_time > 0.8)
168
168
169 def test_elapsed_single(self):
169 def test_elapsed_single(self):
170 v = self.client.load_balanced_view()
170 v = self.client.load_balanced_view()
171 ar = v.apply_async(time.sleep, 0.25)
171 ar = v.apply_async(time.sleep, 0.25)
172 while not ar.ready():
172 while not ar.ready():
173 time.sleep(0.01)
173 time.sleep(0.01)
174 self.assertTrue(ar.elapsed < 1)
174 self.assertTrue(ar.elapsed < 1)
175 self.assertTrue(ar.elapsed < 1)
175 self.assertTrue(ar.elapsed < 1)
176 ar.get(2)
176 ar.get(2)
177
177
178 def test_elapsed_multi(self):
178 def test_elapsed_multi(self):
179 v = self.client[:]
179 v = self.client[:]
180 ar = v.apply_async(time.sleep, 0.25)
180 ar = v.apply_async(time.sleep, 0.25)
181 while not ar.ready():
181 while not ar.ready():
182 time.sleep(0.01)
182 time.sleep(0.01)
183 self.assertTrue(ar.elapsed < 1)
183 self.assertTrue(ar.elapsed < 1)
184 self.assertTrue(ar.elapsed < 1)
184 self.assertTrue(ar.elapsed < 1)
185 ar.get(2)
185 ar.get(2)
186
186
187 def test_hubresult_timestamps(self):
187 def test_hubresult_timestamps(self):
188 self.minimum_engines(4)
188 self.minimum_engines(4)
189 v = self.client[:]
189 v = self.client[:]
190 ar = v.apply_async(time.sleep, 0.25)
190 ar = v.apply_async(time.sleep, 0.25)
191 ar.get(2)
191 ar.get(2)
192 rc2 = Client(profile='iptest')
192 rc2 = Client(profile='iptest')
193 # must have try/finally to close second Client, otherwise
193 # must have try/finally to close second Client, otherwise
194 # will have dangling sockets causing problems
194 # will have dangling sockets causing problems
195 try:
195 try:
196 time.sleep(0.25)
196 time.sleep(0.25)
197 hr = rc2.get_result(ar.msg_ids)
197 hr = rc2.get_result(ar.msg_ids)
198 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
198 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
199 hr.get(1)
199 hr.get(1)
200 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
200 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
201 self.assertEquals(hr.serial_time, ar.serial_time)
201 self.assertEquals(hr.serial_time, ar.serial_time)
202 finally:
202 finally:
203 rc2.close()
203 rc2.close()
204
204
205 def test_display_empty_streams_single(self):
205 def test_display_empty_streams_single(self):
206 """empty stdout/err are not displayed (single result)"""
206 """empty stdout/err are not displayed (single result)"""
207 self.minimum_engines(1)
207 self.minimum_engines(1)
208
208
209 v = self.client[-1]
209 v = self.client[-1]
210 ar = v.execute("print (5555)")
210 ar = v.execute("print (5555)")
211 ar.get(5)
211 ar.get(5)
212 with capture_output() as io:
212 with capture_output() as io:
213 ar.display_outputs()
213 ar.display_outputs()
214 self.assertEquals(io.stderr, '')
214 self.assertEquals(io.stderr, '')
215 self.assertTrue('5555' in io.stdout)
215 self.assertEquals('5555\n', io.stdout)
216
216
217 ar = v.execute("a=5")
217 ar = v.execute("a=5")
218 ar.get(5)
218 ar.get(5)
219 with capture_output() as io:
219 with capture_output() as io:
220 ar.display_outputs()
220 ar.display_outputs()
221 self.assertEquals(io.stderr, '')
221 self.assertEquals(io.stderr, '')
222 self.assertEquals(io.stdout, '')
222 self.assertEquals(io.stdout, '')
223
223
224 def test_display_empty_streams_type(self):
224 def test_display_empty_streams_type(self):
225 """empty stdout/err are not displayed (groupby type)"""
225 """empty stdout/err are not displayed (groupby type)"""
226 self.minimum_engines(1)
226 self.minimum_engines(1)
227
227
228 v = self.client[:]
228 v = self.client[:]
229 ar = v.execute("print (5555)")
229 ar = v.execute("print (5555)")
230 ar.get(5)
230 ar.get(5)
231 with capture_output() as io:
231 with capture_output() as io:
232 ar.display_outputs()
232 ar.display_outputs()
233 self.assertEquals(io.stderr, '')
233 self.assertEquals(io.stderr, '')
234 self.assertEquals(io.stdout.count('5555'), len(v), io.stdout)
234 self.assertEquals(io.stdout.count('5555'), len(v), io.stdout)
235 self.assertFalse('\n\n' in io.stdout, io.stdout)
235 self.assertEquals(io.stdout.count('[stdout:'), len(v), io.stdout)
236 self.assertEquals(io.stdout.count('[stdout:'), len(v), io.stdout)
236
237
237 ar = v.execute("a=5")
238 ar = v.execute("a=5")
238 ar.get(5)
239 ar.get(5)
239 with capture_output() as io:
240 with capture_output() as io:
240 ar.display_outputs()
241 ar.display_outputs()
241 self.assertEquals(io.stderr, '')
242 self.assertEquals(io.stderr, '')
242 self.assertEquals(io.stdout, '')
243 self.assertEquals(io.stdout, '')
243
244
244 def test_display_empty_streams_engine(self):
245 def test_display_empty_streams_engine(self):
245 """empty stdout/err are not displayed (groupby engine)"""
246 """empty stdout/err are not displayed (groupby engine)"""
246 self.minimum_engines(1)
247 self.minimum_engines(1)
247
248
248 v = self.client[:]
249 v = self.client[:]
249 ar = v.execute("print (5555)")
250 ar = v.execute("print (5555)")
250 ar.get(5)
251 ar.get(5)
251 with capture_output() as io:
252 with capture_output() as io:
252 ar.display_outputs('engine')
253 ar.display_outputs('engine')
253 self.assertEquals(io.stderr, '')
254 self.assertEquals(io.stderr, '')
254 self.assertEquals(io.stdout.count('5555'), len(v), io.stdout)
255 self.assertEquals(io.stdout.count('5555'), len(v), io.stdout)
256 self.assertFalse('\n\n' in io.stdout, io.stdout)
255 self.assertEquals(io.stdout.count('[stdout:'), len(v), io.stdout)
257 self.assertEquals(io.stdout.count('[stdout:'), len(v), io.stdout)
256
258
257 ar = v.execute("a=5")
259 ar = v.execute("a=5")
258 ar.get(5)
260 ar.get(5)
259 with capture_output() as io:
261 with capture_output() as io:
260 ar.display_outputs('engine')
262 ar.display_outputs('engine')
261 self.assertEquals(io.stderr, '')
263 self.assertEquals(io.stderr, '')
262 self.assertEquals(io.stdout, '')
264 self.assertEquals(io.stdout, '')
263
265
264
266
@@ -1,342 +1,339 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """Test Parallel magics
2 """Test Parallel magics
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import re
19 import sys
20 import sys
20 import time
21 import time
21
22
22 import zmq
23 import zmq
23 from nose import SkipTest
24 from nose import SkipTest
24
25
25 from IPython.testing import decorators as dec
26 from IPython.testing import decorators as dec
26 from IPython.testing.ipunittest import ParametricTestCase
27 from IPython.testing.ipunittest import ParametricTestCase
27
28
28 from IPython import parallel as pmod
29 from IPython import parallel as pmod
29 from IPython.parallel import error
30 from IPython.parallel import error
30 from IPython.parallel import AsyncResult
31 from IPython.parallel import AsyncResult
31 from IPython.parallel.util import interactive
32 from IPython.parallel.util import interactive
32
33
33 from IPython.parallel.tests import add_engines
34 from IPython.parallel.tests import add_engines
34
35
35 from .clienttest import ClusterTestCase, capture_output, generate_output
36 from .clienttest import ClusterTestCase, capture_output, generate_output
36
37
37 def setup():
38 def setup():
38 add_engines(3, total=True)
39 add_engines(3, total=True)
39
40
40 class TestParallelMagics(ClusterTestCase, ParametricTestCase):
41 class TestParallelMagics(ClusterTestCase, ParametricTestCase):
41
42
42 def test_px_blocking(self):
43 def test_px_blocking(self):
43 ip = get_ipython()
44 ip = get_ipython()
44 v = self.client[-1:]
45 v = self.client[-1:]
45 v.activate()
46 v.activate()
46 v.block=True
47 v.block=True
47
48
48 ip.magic('px a=5')
49 ip.magic('px a=5')
49 self.assertEquals(v['a'], [5])
50 self.assertEquals(v['a'], [5])
50 ip.magic('px a=10')
51 ip.magic('px a=10')
51 self.assertEquals(v['a'], [10])
52 self.assertEquals(v['a'], [10])
52 # just 'print a' works ~99% of the time, but this ensures that
53 # just 'print a' works ~99% of the time, but this ensures that
53 # the stdout message has arrived when the result is finished:
54 # the stdout message has arrived when the result is finished:
54 with capture_output() as io:
55 with capture_output() as io:
55 ip.magic(
56 ip.magic(
56 'px import sys,time;print(a);sys.stdout.flush();time.sleep(0.2)'
57 'px import sys,time;print(a);sys.stdout.flush();time.sleep(0.2)'
57 )
58 )
58 out = io.stdout
59 out = io.stdout
59 self.assertTrue('[stdout:' in out, out)
60 self.assertTrue('[stdout:' in out, out)
61 self.assertFalse('\n\n' in out)
60 self.assertTrue(out.rstrip().endswith('10'))
62 self.assertTrue(out.rstrip().endswith('10'))
61 self.assertRaisesRemote(ZeroDivisionError, ip.magic, 'px 1/0')
63 self.assertRaisesRemote(ZeroDivisionError, ip.magic, 'px 1/0')
62
64
65 def _check_generated_stderr(self, stderr, n):
66 expected = [
67 r'\[stderr:\d+\]',
68 '^stderr$',
69 '^stderr2$',
70 ] * n
71
72 self.assertFalse('\n\n' in stderr, stderr)
73 lines = stderr.splitlines()
74 self.assertEquals(len(lines), len(expected), stderr)
75 for line,expect in zip(lines, expected):
76 if isinstance(expect, str):
77 expect = [expect]
78 for ex in expect:
79 self.assertTrue(re.search(ex, line) is not None, "Expected %r in %r" % (ex, line))
80
63 def test_cellpx_block_args(self):
81 def test_cellpx_block_args(self):
64 """%%px --[no]block flags work"""
82 """%%px --[no]block flags work"""
65 ip = get_ipython()
83 ip = get_ipython()
66 v = self.client[-1:]
84 v = self.client[-1:]
67 v.activate()
85 v.activate()
68 v.block=False
86 v.block=False
69
87
70 for block in (True, False):
88 for block in (True, False):
71 v.block = block
89 v.block = block
72
90
73 with capture_output() as io:
91 with capture_output() as io:
74 ip.run_cell_magic("px", "", "1")
92 ip.run_cell_magic("px", "", "1")
75 if block:
93 if block:
76 self.assertTrue(io.stdout.startswith("Parallel"), io.stdout)
94 self.assertTrue(io.stdout.startswith("Parallel"), io.stdout)
77 else:
95 else:
78 self.assertTrue(io.stdout.startswith("Async"), io.stdout)
96 self.assertTrue(io.stdout.startswith("Async"), io.stdout)
79
97
80 with capture_output() as io:
98 with capture_output() as io:
81 ip.run_cell_magic("px", "--block", "1")
99 ip.run_cell_magic("px", "--block", "1")
82 self.assertTrue(io.stdout.startswith("Parallel"), io.stdout)
100 self.assertTrue(io.stdout.startswith("Parallel"), io.stdout)
83
101
84 with capture_output() as io:
102 with capture_output() as io:
85 ip.run_cell_magic("px", "--noblock", "1")
103 ip.run_cell_magic("px", "--noblock", "1")
86 self.assertTrue(io.stdout.startswith("Async"), io.stdout)
104 self.assertTrue(io.stdout.startswith("Async"), io.stdout)
87
105
88 def test_cellpx_groupby_engine(self):
106 def test_cellpx_groupby_engine(self):
89 """%%px --group-outputs=engine"""
107 """%%px --group-outputs=engine"""
90 ip = get_ipython()
108 ip = get_ipython()
91 v = self.client[:]
109 v = self.client[:]
92 v.block = True
110 v.block = True
93 v.activate()
111 v.activate()
94
112
95 v['generate_output'] = generate_output
113 v['generate_output'] = generate_output
96
114
97 with capture_output() as io:
115 with capture_output() as io:
98 ip.run_cell_magic('px', '--group-outputs=engine', 'generate_output()')
116 ip.run_cell_magic('px', '--group-outputs=engine', 'generate_output()')
99
117
100 lines = io.stdout.strip().splitlines()[1:]
118 self.assertFalse('\n\n' in io.stdout)
119 lines = io.stdout.splitlines()[1:]
101 expected = [
120 expected = [
102 ('[stdout:', '] stdout'),
121 r'\[stdout:\d+\]',
122 'stdout',
103 'stdout2',
123 'stdout2',
104 'IPython.core.display.HTML',
124 r'\[output:\d+\]',
105 'IPython.core.display.Math',
125 r'IPython\.core\.display\.HTML',
106 ('] Out[', 'IPython.core.display.Math')
126 r'IPython\.core\.display\.Math',
127 r'Out\[\d+:\d+\]:.*IPython\.core\.display\.Math',
107 ] * len(v)
128 ] * len(v)
108
129
109 self.assertEquals(len(lines), len(expected), io.stdout)
130 self.assertEquals(len(lines), len(expected), io.stdout)
110 for line,expect in zip(lines, expected):
131 for line,expect in zip(lines, expected):
111 if isinstance(expect, str):
132 if isinstance(expect, str):
112 expect = [expect]
133 expect = [expect]
113 for ex in expect:
134 for ex in expect:
114 self.assertTrue(ex in line, "Expected %r in %r" % (ex, line))
135 self.assertTrue(re.search(ex, line) is not None, "Expected %r in %r" % (ex, line))
115
116 expected = [
117 ('[stderr:', '] stderr'),
118 'stderr2',
119 ] * len(v)
120
136
121 lines = io.stderr.strip().splitlines()
137 self._check_generated_stderr(io.stderr, len(v))
122 self.assertEquals(len(lines), len(expected), io.stderr)
123 for line,expect in zip(lines, expected):
124 if isinstance(expect, str):
125 expect = [expect]
126 for ex in expect:
127 self.assertTrue(ex in line, "Expected %r in %r" % (ex, line))
128
138
129
139
130 def test_cellpx_groupby_order(self):
140 def test_cellpx_groupby_order(self):
131 """%%px --group-outputs=order"""
141 """%%px --group-outputs=order"""
132 ip = get_ipython()
142 ip = get_ipython()
133 v = self.client[:]
143 v = self.client[:]
134 v.block = True
144 v.block = True
135 v.activate()
145 v.activate()
136
146
137 v['generate_output'] = generate_output
147 v['generate_output'] = generate_output
138
148
139 with capture_output() as io:
149 with capture_output() as io:
140 ip.run_cell_magic('px', '--group-outputs=order', 'generate_output()')
150 ip.run_cell_magic('px', '--group-outputs=order', 'generate_output()')
141
151
142 lines = io.stdout.strip().splitlines()[1:]
152 self.assertFalse('\n\n' in io.stdout)
153 lines = io.stdout.splitlines()[1:]
143 expected = []
154 expected = []
144 expected.extend([
155 expected.extend([
145 ('[stdout:', '] stdout'),
156 r'\[stdout:\d+\]',
157 'stdout',
146 'stdout2',
158 'stdout2',
147 ] * len(v))
159 ] * len(v))
148 expected.extend([
160 expected.extend([
161 r'\[output:\d+\]',
149 'IPython.core.display.HTML',
162 'IPython.core.display.HTML',
150 ] * len(v))
163 ] * len(v))
151 expected.extend([
164 expected.extend([
165 r'\[output:\d+\]',
152 'IPython.core.display.Math',
166 'IPython.core.display.Math',
153 ] * len(v))
167 ] * len(v))
154 expected.extend([
168 expected.extend([
155 ('] Out[', 'IPython.core.display.Math')
169 r'Out\[\d+:\d+\]:.*IPython\.core\.display\.Math'
156 ] * len(v))
170 ] * len(v))
157
171
158 self.assertEquals(len(lines), len(expected), io.stdout)
172 self.assertEquals(len(lines), len(expected), io.stdout)
159 for line,expect in zip(lines, expected):
173 for line,expect in zip(lines, expected):
160 if isinstance(expect, str):
174 if isinstance(expect, str):
161 expect = [expect]
175 expect = [expect]
162 for ex in expect:
176 for ex in expect:
163 self.assertTrue(ex in line, "Expected %r in %r" % (ex, line))
177 self.assertTrue(re.search(ex, line) is not None, "Expected %r in %r" % (ex, line))
164
178
165 expected = [
179 self._check_generated_stderr(io.stderr, len(v))
166 ('[stderr:', '] stderr'),
167 'stderr2',
168 ] * len(v)
169
170 lines = io.stderr.strip().splitlines()
171 self.assertEquals(len(lines), len(expected), io.stderr)
172 for line,expect in zip(lines, expected):
173 if isinstance(expect, str):
174 expect = [expect]
175 for ex in expect:
176 self.assertTrue(ex in line, "Expected %r in %r" % (ex, line))
177
180
178 def test_cellpx_groupby_atype(self):
181 def test_cellpx_groupby_type(self):
179 """%%px --group-outputs=type"""
182 """%%px --group-outputs=type"""
180 ip = get_ipython()
183 ip = get_ipython()
181 v = self.client[:]
184 v = self.client[:]
182 v.block = True
185 v.block = True
183 v.activate()
186 v.activate()
184
187
185 v['generate_output'] = generate_output
188 v['generate_output'] = generate_output
186
189
187 with capture_output() as io:
190 with capture_output() as io:
188 ip.run_cell_magic('px', '--group-outputs=type', 'generate_output()')
191 ip.run_cell_magic('px', '--group-outputs=type', 'generate_output()')
189
192
190 lines = io.stdout.strip().splitlines()[1:]
193 self.assertFalse('\n\n' in io.stdout)
194 lines = io.stdout.splitlines()[1:]
191
195
192 expected = []
196 expected = []
193 expected.extend([
197 expected.extend([
194 ('[stdout:', '] stdout'),
198 r'\[stdout:\d+\]',
199 'stdout',
195 'stdout2',
200 'stdout2',
196 ] * len(v))
201 ] * len(v))
197 expected.extend([
202 expected.extend([
198 'IPython.core.display.HTML',
203 r'\[output:\d+\]',
199 'IPython.core.display.Math',
204 r'IPython\.core\.display\.HTML',
205 r'IPython\.core\.display\.Math',
200 ] * len(v))
206 ] * len(v))
201 expected.extend([
207 expected.extend([
202 ('] Out[', 'IPython.core.display.Math')
208 (r'Out\[\d+:\d+\]', r'IPython\.core\.display\.Math')
203 ] * len(v))
209 ] * len(v))
204
210
205 self.assertEquals(len(lines), len(expected), io.stdout)
211 self.assertEquals(len(lines), len(expected), io.stdout)
206 for line,expect in zip(lines, expected):
212 for line,expect in zip(lines, expected):
207 if isinstance(expect, str):
213 if isinstance(expect, str):
208 expect = [expect]
214 expect = [expect]
209 for ex in expect:
215 for ex in expect:
210 self.assertTrue(ex in line, "Expected %r in %r" % (ex, line))
216 self.assertTrue(re.search(ex, line) is not None, "Expected %r in %r" % (ex, line))
211
217
212 expected = [
218 self._check_generated_stderr(io.stderr, len(v))
213 ('[stderr:', '] stderr'),
214 'stderr2',
215 ] * len(v)
216
217 lines = io.stderr.strip().splitlines()
218 self.assertEquals(len(lines), len(expected), io.stderr)
219 for line,expect in zip(lines, expected):
220 if isinstance(expect, str):
221 expect = [expect]
222 for ex in expect:
223 self.assertTrue(ex in line, "Expected %r in %r" % (ex, line))
224
219
225
220
226 def test_px_nonblocking(self):
221 def test_px_nonblocking(self):
227 ip = get_ipython()
222 ip = get_ipython()
228 v = self.client[-1:]
223 v = self.client[-1:]
229 v.activate()
224 v.activate()
230 v.block=False
225 v.block=False
231
226
232 ip.magic('px a=5')
227 ip.magic('px a=5')
233 self.assertEquals(v['a'], [5])
228 self.assertEquals(v['a'], [5])
234 ip.magic('px a=10')
229 ip.magic('px a=10')
235 self.assertEquals(v['a'], [10])
230 self.assertEquals(v['a'], [10])
236 with capture_output() as io:
231 with capture_output() as io:
237 ar = ip.magic('px print (a)')
232 ar = ip.magic('px print (a)')
238 self.assertTrue(isinstance(ar, AsyncResult))
233 self.assertTrue(isinstance(ar, AsyncResult))
239 self.assertTrue('Async' in io.stdout)
234 self.assertTrue('Async' in io.stdout)
240 self.assertFalse('[stdout:' in io.stdout)
235 self.assertFalse('[stdout:' in io.stdout)
236 self.assertFalse('\n\n' in io.stdout)
237
241 ar = ip.magic('px 1/0')
238 ar = ip.magic('px 1/0')
242 self.assertRaisesRemote(ZeroDivisionError, ar.get)
239 self.assertRaisesRemote(ZeroDivisionError, ar.get)
243
240
244 def test_autopx_blocking(self):
241 def test_autopx_blocking(self):
245 ip = get_ipython()
242 ip = get_ipython()
246 v = self.client[-1]
243 v = self.client[-1]
247 v.activate()
244 v.activate()
248 v.block=True
245 v.block=True
249
246
250 with capture_output() as io:
247 with capture_output() as io:
251 ip.magic('autopx')
248 ip.magic('autopx')
252 ip.run_cell('\n'.join(('a=5','b=12345','c=0')))
249 ip.run_cell('\n'.join(('a=5','b=12345','c=0')))
253 ip.run_cell('b*=2')
250 ip.run_cell('b*=2')
254 ip.run_cell('print (b)')
251 ip.run_cell('print (b)')
255 ip.run_cell('b')
252 ip.run_cell('b')
256 ip.run_cell("b/c")
253 ip.run_cell("b/c")
257 ip.magic('autopx')
254 ip.magic('autopx')
258
255
259 output = io.stdout.strip()
256 output = io.stdout
260
257
261 self.assertTrue(output.startswith('%autopx enabled'), output)
258 self.assertTrue(output.startswith('%autopx enabled'), output)
262 self.assertTrue(output.endswith('%autopx disabled'), output)
259 self.assertTrue(output.rstrip().endswith('%autopx disabled'), output)
263 self.assertTrue('RemoteError: ZeroDivisionError' in output, output)
260 self.assertTrue('RemoteError: ZeroDivisionError' in output, output)
264 self.assertTrue('] Out[' in output, output)
261 self.assertTrue('\nOut[' in output, output)
265 self.assertTrue(': 24690' in output, output)
262 self.assertTrue(': 24690' in output, output)
266 ar = v.get_result(-1)
263 ar = v.get_result(-1)
267 self.assertEquals(v['a'], 5)
264 self.assertEquals(v['a'], 5)
268 self.assertEquals(v['b'], 24690)
265 self.assertEquals(v['b'], 24690)
269 self.assertRaisesRemote(ZeroDivisionError, ar.get)
266 self.assertRaisesRemote(ZeroDivisionError, ar.get)
270
267
271 def test_autopx_nonblocking(self):
268 def test_autopx_nonblocking(self):
272 ip = get_ipython()
269 ip = get_ipython()
273 v = self.client[-1]
270 v = self.client[-1]
274 v.activate()
271 v.activate()
275 v.block=False
272 v.block=False
276
273
277 with capture_output() as io:
274 with capture_output() as io:
278 ip.magic('autopx')
275 ip.magic('autopx')
279 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
276 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
280 ip.run_cell('print (b)')
277 ip.run_cell('print (b)')
281 ip.run_cell('import time; time.sleep(0.1)')
278 ip.run_cell('import time; time.sleep(0.1)')
282 ip.run_cell("b/c")
279 ip.run_cell("b/c")
283 ip.run_cell('b*=2')
280 ip.run_cell('b*=2')
284 ip.magic('autopx')
281 ip.magic('autopx')
285
282
286 output = io.stdout.strip()
283 output = io.stdout.rstrip()
287
284
288 self.assertTrue(output.startswith('%autopx enabled'))
285 self.assertTrue(output.startswith('%autopx enabled'))
289 self.assertTrue(output.endswith('%autopx disabled'))
286 self.assertTrue(output.endswith('%autopx disabled'))
290 self.assertFalse('ZeroDivisionError' in output)
287 self.assertFalse('ZeroDivisionError' in output)
291 ar = v.get_result(-2)
288 ar = v.get_result(-2)
292 self.assertRaisesRemote(ZeroDivisionError, ar.get)
289 self.assertRaisesRemote(ZeroDivisionError, ar.get)
293 # prevent TaskAborted on pulls, due to ZeroDivisionError
290 # prevent TaskAborted on pulls, due to ZeroDivisionError
294 time.sleep(0.5)
291 time.sleep(0.5)
295 self.assertEquals(v['a'], 5)
292 self.assertEquals(v['a'], 5)
296 # b*=2 will not fire, due to abort
293 # b*=2 will not fire, due to abort
297 self.assertEquals(v['b'], 10)
294 self.assertEquals(v['b'], 10)
298
295
299 def test_result(self):
296 def test_result(self):
300 ip = get_ipython()
297 ip = get_ipython()
301 v = self.client[-1]
298 v = self.client[-1]
302 v.activate()
299 v.activate()
303 data = dict(a=111,b=222)
300 data = dict(a=111,b=222)
304 v.push(data, block=True)
301 v.push(data, block=True)
305
302
306 ip.magic('px a')
303 ip.magic('px a')
307 ip.magic('px b')
304 ip.magic('px b')
308 for idx, name in [
305 for idx, name in [
309 ('', 'b'),
306 ('', 'b'),
310 ('-1', 'b'),
307 ('-1', 'b'),
311 ('2', 'b'),
308 ('2', 'b'),
312 ('1', 'a'),
309 ('1', 'a'),
313 ('-2', 'a'),
310 ('-2', 'a'),
314 ]:
311 ]:
315 with capture_output() as io:
312 with capture_output() as io:
316 ip.magic('result ' + idx)
313 ip.magic('result ' + idx)
317 output = io.stdout.strip()
314 output = io.stdout
318 msg = "expected %s output to include %s, but got: %s" % \
315 msg = "expected %s output to include %s, but got: %s" % \
319 ('%result '+idx, str(data[name]), output)
316 ('%result '+idx, str(data[name]), output)
320 self.assertTrue(str(data[name]) in output, msg)
317 self.assertTrue(str(data[name]) in output, msg)
321
318
322 @dec.skipif_not_matplotlib
319 @dec.skipif_not_matplotlib
323 def test_px_pylab(self):
320 def test_px_pylab(self):
324 """%pylab works on engines"""
321 """%pylab works on engines"""
325 ip = get_ipython()
322 ip = get_ipython()
326 v = self.client[-1]
323 v = self.client[-1]
327 v.block = True
324 v.block = True
328 v.activate()
325 v.activate()
329
326
330 with capture_output() as io:
327 with capture_output() as io:
331 ip.magic("px %pylab inline")
328 ip.magic("px %pylab inline")
332
329
333 self.assertTrue("Welcome to pylab" in io.stdout, io.stdout)
330 self.assertTrue("Welcome to pylab" in io.stdout, io.stdout)
334 self.assertTrue("backend_inline" in io.stdout, io.stdout)
331 self.assertTrue("backend_inline" in io.stdout, io.stdout)
335
332
336 with capture_output() as io:
333 with capture_output() as io:
337 ip.magic("px plot(rand(100))")
334 ip.magic("px plot(rand(100))")
338
335
339 self.assertTrue('] Out[' in io.stdout, io.stdout)
336 self.assertTrue('Out[' in io.stdout, io.stdout)
340 self.assertTrue('matplotlib.lines' in io.stdout, io.stdout)
337 self.assertTrue('matplotlib.lines' in io.stdout, io.stdout)
341
338
342
339
@@ -1,228 +1,313 b''
1 {
1 {
2 "metadata": {
2 "metadata": {
3 "name": "Parallel Magics"
3 "name": "Parallel Magics"
4 },
4 },
5 "nbformat": 3,
5 "nbformat": 3,
6 "worksheets": [
6 "worksheets": [
7 {
7 {
8 "cells": [
8 "cells": [
9 {
9 {
10 "cell_type": "heading",
10 "cell_type": "heading",
11 "level": 1,
11 "level": 1,
12 "source": [
12 "source": [
13 "Using Parallel Magics"
13 "Using Parallel Magics"
14 ]
14 ]
15 },
15 },
16 {
16 {
17 "cell_type": "markdown",
17 "cell_type": "markdown",
18 "source": [
18 "source": [
19 "IPython has a few magics for working with your engines.",
19 "IPython has a few magics for working with your engines.",
20 "",
20 "",
21 "This assumes you have started an IPython cluster, either with the notebook interface,",
21 "This assumes you have started an IPython cluster, either with the notebook interface,",
22 "or the `ipcluster/controller/engine` commands."
22 "or the `ipcluster/controller/engine` commands."
23 ]
23 ]
24 },
24 },
25 {
25 {
26 "cell_type": "code",
26 "cell_type": "code",
27 "collapsed": false,
27 "collapsed": false,
28 "input": [
28 "input": [
29 "from IPython import parallel",
29 "from IPython import parallel",
30 "rc = parallel.Client()",
30 "rc = parallel.Client()",
31 "dv = rc[:]",
31 "dv = rc[:]",
32 "dv.block = True",
32 "dv.block = True",
33 "dv"
33 "dv"
34 ],
34 ],
35 "language": "python",
35 "language": "python",
36 "outputs": []
36 "outputs": []
37 },
37 },
38 {
38 {
39 "cell_type": "markdown",
39 "cell_type": "markdown",
40 "source": [
40 "source": [
41 "The parallel magics come from the `parallelmagics` IPython extension.",
41 "The parallel magics come from the `parallelmagics` IPython extension.",
42 "The magics are set to work with a particular View object,",
42 "The magics are set to work with a particular View object,",
43 "so to activate them, you call the `activate()` method on a particular view:"
43 "so to activate them, you call the `activate()` method on a particular view:"
44 ]
44 ]
45 },
45 },
46 {
46 {
47 "cell_type": "code",
47 "cell_type": "code",
48 "collapsed": true,
48 "collapsed": true,
49 "input": [
49 "input": [
50 "dv.activate()"
50 "dv.activate()"
51 ],
51 ],
52 "language": "python",
52 "language": "python",
53 "outputs": []
53 "outputs": []
54 },
54 },
55 {
55 {
56 "cell_type": "markdown",
56 "cell_type": "markdown",
57 "source": [
57 "source": [
58 "Now we can execute code remotely with `%px`:"
58 "Now we can execute code remotely with `%px`:"
59 ]
59 ]
60 },
60 },
61 {
61 {
62 "cell_type": "code",
62 "cell_type": "code",
63 "collapsed": false,
63 "collapsed": false,
64 "input": [
64 "input": [
65 "%px a=5"
65 "%px a=5"
66 ],
66 ],
67 "language": "python",
67 "language": "python",
68 "outputs": []
68 "outputs": []
69 },
69 },
70 {
70 {
71 "cell_type": "code",
71 "cell_type": "code",
72 "collapsed": false,
72 "collapsed": false,
73 "input": [
73 "input": [
74 "%px print a"
74 "%px print a"
75 ],
75 ],
76 "language": "python",
76 "language": "python",
77 "outputs": []
77 "outputs": []
78 },
78 },
79 {
79 {
80 "cell_type": "code",
80 "cell_type": "code",
81 "collapsed": false,
81 "collapsed": false,
82 "input": [
82 "input": [
83 "%px a"
83 "%px a"
84 ],
84 ],
85 "language": "python",
85 "language": "python",
86 "outputs": []
86 "outputs": []
87 },
87 },
88 {
88 {
89 "cell_type": "code",
90 "collapsed": false,
91 "input": [
92 "%px print >> sys.stderr, \"ERROR\""
93 ],
94 "language": "python",
95 "outputs": []
96 },
97 {
89 "cell_type": "markdown",
98 "cell_type": "markdown",
90 "source": [
99 "source": [
91 "You don't have to wait for results:"
100 "You don't have to wait for results:"
92 ]
101 ]
93 },
102 },
94 {
103 {
95 "cell_type": "code",
104 "cell_type": "code",
96 "collapsed": true,
105 "collapsed": true,
97 "input": [
106 "input": [
98 "dv.block = False"
107 "dv.block = False"
99 ],
108 ],
100 "language": "python",
109 "language": "python",
101 "outputs": []
110 "outputs": []
102 },
111 },
103 {
112 {
104 "cell_type": "code",
113 "cell_type": "code",
105 "collapsed": false,
114 "collapsed": false,
106 "input": [
115 "input": [
107 "%px import time",
116 "%px import time",
108 "%px time.sleep(5)",
117 "%px time.sleep(5)",
109 "%px time.time()"
118 "%px time.time()"
110 ],
119 ],
111 "language": "python",
120 "language": "python",
112 "outputs": []
121 "outputs": []
113 },
122 },
114 {
123 {
115 "cell_type": "markdown",
124 "cell_type": "markdown",
116 "source": [
125 "source": [
117 "But you will notice that this didn't output the result of the last command.",
126 "But you will notice that this didn't output the result of the last command.",
118 "For this, we have `%result`, which displays the output of the latest request:"
127 "For this, we have `%result`, which displays the output of the latest request:"
119 ]
128 ]
120 },
129 },
121 {
130 {
122 "cell_type": "code",
131 "cell_type": "code",
123 "collapsed": false,
132 "collapsed": false,
124 "input": [
133 "input": [
125 "%result"
134 "%result"
126 ],
135 ],
127 "language": "python",
136 "language": "python",
128 "outputs": []
137 "outputs": []
129 },
138 },
130 {
139 {
131 "cell_type": "markdown",
140 "cell_type": "markdown",
132 "source": [
141 "source": [
133 "Remember, an IPython engine is IPython, so you can do magics remotely as well!"
142 "Remember, an IPython engine is IPython, so you can do magics remotely as well!"
134 ]
143 ]
135 },
144 },
136 {
145 {
137 "cell_type": "code",
146 "cell_type": "code",
138 "collapsed": false,
147 "collapsed": false,
139 "input": [
148 "input": [
140 "dv.block = True",
149 "dv.block = True",
141 "%px %pylab inline"
150 "%px %pylab inline"
142 ],
151 ],
143 "language": "python",
152 "language": "python",
144 "outputs": []
153 "outputs": []
145 },
154 },
146 {
155 {
147 "cell_type": "markdown",
156 "cell_type": "markdown",
148 "source": [
157 "source": [
149 "`%%px` can also be used as a cell magic, for submitting whole blocks.",
158 "`%%px` can also be used as a cell magic, for submitting whole blocks.",
150 "This one acceps `--block` and `--noblock` flags to specify",
159 "This one acceps `--block` and `--noblock` flags to specify",
151 "the blocking behavior, though the default is unchanged.",
160 "the blocking behavior, though the default is unchanged.",
152 ""
161 ""
153 ]
162 ]
154 },
163 },
155 {
164 {
156 "cell_type": "code",
165 "cell_type": "code",
157 "collapsed": true,
166 "collapsed": true,
158 "input": [
167 "input": [
159 "dv.scatter('id', dv.targets, flatten=True)",
168 "dv.scatter('id', dv.targets, flatten=True)",
160 "dv['stride'] = len(dv)"
169 "dv['stride'] = len(dv)"
161 ],
170 ],
162 "language": "python",
171 "language": "python",
163 "outputs": []
172 "outputs": []
164 },
173 },
165 {
174 {
166 "cell_type": "code",
175 "cell_type": "code",
167 "collapsed": false,
176 "collapsed": false,
168 "input": [
177 "input": [
169 "%%px --noblock",
178 "%%px --noblock",
170 "x = linspace(0,pi,1000)",
179 "x = linspace(0,pi,1000)",
171 "for n in range(id,12, stride):",
180 "for n in range(id,12, stride):",
172 " print n",
181 " print n",
173 " plt.plot(x,sin(n*x))",
182 " plt.plot(x,sin(n*x))",
174 "plt.title(\"Plot %i\" % id)"
183 "plt.title(\"Plot %i\" % id)"
175 ],
184 ],
176 "language": "python",
185 "language": "python",
177 "outputs": []
186 "outputs": []
178 },
187 },
179 {
188 {
180 "cell_type": "code",
189 "cell_type": "code",
181 "collapsed": false,
190 "collapsed": false,
182 "input": [
191 "input": [
183 "%result"
192 "%result"
184 ],
193 ],
185 "language": "python",
194 "language": "python",
186 "outputs": []
195 "outputs": []
187 },
196 },
188 {
197 {
189 "cell_type": "markdown",
198 "cell_type": "markdown",
190 "source": [
199 "source": [
191 "It also lets you choose some amount of the grouping of the outputs with `--group-outputs`:",
200 "It also lets you choose some amount of the grouping of the outputs with `--group-outputs`:",
192 "",
201 "",
193 "The choices are:",
202 "The choices are:",
194 "",
203 "",
195 "* `engine` - all of an engine's output is collected together",
204 "* `engine` - all of an engine's output is collected together",
196 "* `type` - where stdout of each engine is grouped, etc. (the default)",
205 "* `type` - where stdout of each engine is grouped, etc. (the default)",
197 "* `order` - same as `type`, but individual displaypub outputs are interleaved.",
206 "* `order` - same as `type`, but individual displaypub outputs are interleaved.",
198 " That is, it will output the first plot from each engine, then the second from each,",
207 " That is, it will output the first plot from each engine, then the second from each,",
199 " etc."
208 " etc."
200 ]
209 ]
201 },
210 },
202 {
211 {
203 "cell_type": "code",
212 "cell_type": "code",
204 "collapsed": false,
213 "collapsed": false,
205 "input": [
214 "input": [
206 "%%px --group-outputs=engine",
215 "%%px --group-outputs=engine",
207 "x = linspace(0,pi,1000)",
216 "x = linspace(0,pi,1000)",
208 "for n in range(id,12, stride):",
217 "for n in range(id,12, stride):",
209 " print n",
218 " print n",
219 " plt.figure()",
210 " plt.plot(x,sin(n*x))",
220 " plt.plot(x,sin(n*x))",
211 "plt.title(\"Plot %i\" % id)"
221 "plt.title(\"Plot %i\" % id)"
212 ],
222 ],
213 "language": "python",
223 "language": "python",
214 "outputs": []
224 "outputs": []
215 },
225 },
216 {
226 {
227 "cell_type": "markdown",
228 "source": [
229 "When you specify 'order', then individual display outputs (e.g. plots) will be interleaved:"
230 ]
231 },
232 {
233 "cell_type": "code",
234 "collapsed": false,
235 "input": [
236 "%%px --group-outputs=order",
237 "x = linspace(0,pi,1000)",
238 "for n in range(id,12, stride):",
239 " print n",
240 " plt.figure()",
241 " plt.plot(x,sin(n*x))",
242 "plt.title(\"Plot %i\" % id)"
243 ],
244 "language": "python",
245 "outputs": []
246 },
247 {
248 "cell_type": "heading",
249 "level": 2,
250 "source": [
251 "Single-engine views"
252 ]
253 },
254 {
255 "cell_type": "markdown",
256 "source": [
257 "When a DirectView has a single target, the output is a bit simpler (no prefixes on stdout/err, etc.):"
258 ]
259 },
260 {
261 "cell_type": "code",
262 "collapsed": true,
263 "input": [
264 "def generate_output():",
265 " \"\"\"function for testing output",
266 " ",
267 " publishes two outputs of each type, and returns something",
268 " \"\"\"",
269 " ",
270 " import sys,os",
271 " from IPython.core.display import display, HTML, Math",
272 " ",
273 " print \"stdout\"",
274 " print >> sys.stderr, \"stderr\"",
275 " ",
276 " display(HTML(\"<b>HTML</b>\"))",
277 " ",
278 " print \"stdout2\"",
279 " print >> sys.stderr, \"stderr2\"",
280 " ",
281 " display(Math(r\"\\alpha=\\beta\"))",
282 " ",
283 " return os.getpid()",
284 "",
285 "dv['generate_output'] = generate_output"
286 ],
287 "language": "python",
288 "outputs": []
289 },
290 {
217 "cell_type": "code",
291 "cell_type": "code",
218 "collapsed": true,
292 "collapsed": true,
219 "input": [
293 "input": [
220 ""
294 "e0 = rc[-1]",
295 "e0.block = True",
296 "e0.activate()"
297 ],
298 "language": "python",
299 "outputs": []
300 },
301 {
302 "cell_type": "code",
303 "collapsed": false,
304 "input": [
305 "%px generate_output()"
221 ],
306 ],
222 "language": "python",
307 "language": "python",
223 "outputs": []
308 "outputs": []
224 }
309 }
225 ]
310 ]
226 }
311 }
227 ]
312 ]
228 } No newline at end of file
313 }
General Comments 0
You need to be logged in to leave comments. Login now