##// END OF EJS Templates
AsyncResult.display_outputs should only print stdout/err if non-empty (single engine)
MinRK -
Show More
@@ -1,651 +1,652 b''
1 """AsyncResult objects for the client
1 """AsyncResult objects for the client
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from datetime import datetime
20 from datetime import datetime
21
21
22 from zmq import MessageTracker
22 from zmq import MessageTracker
23
23
24 from IPython.core.display import clear_output, display
24 from IPython.core.display import clear_output, display
25 from IPython.external.decorator import decorator
25 from IPython.external.decorator import decorator
26 from IPython.parallel import error
26 from IPython.parallel import error
27
27
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29 # Functions
29 # Functions
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31
31
32 def _total_seconds(td):
32 def _total_seconds(td):
33 """timedelta.total_seconds was added in 2.7"""
33 """timedelta.total_seconds was added in 2.7"""
34 try:
34 try:
35 # Python >= 2.7
35 # Python >= 2.7
36 return td.total_seconds()
36 return td.total_seconds()
37 except AttributeError:
37 except AttributeError:
38 # Python 2.6
38 # Python 2.6
39 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
39 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Classes
42 # Classes
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45 # global empty tracker that's always done:
45 # global empty tracker that's always done:
46 finished_tracker = MessageTracker()
46 finished_tracker = MessageTracker()
47
47
48 @decorator
48 @decorator
49 def check_ready(f, self, *args, **kwargs):
49 def check_ready(f, self, *args, **kwargs):
50 """Call spin() to sync state prior to calling the method."""
50 """Call spin() to sync state prior to calling the method."""
51 self.wait(0)
51 self.wait(0)
52 if not self._ready:
52 if not self._ready:
53 raise error.TimeoutError("result not ready")
53 raise error.TimeoutError("result not ready")
54 return f(self, *args, **kwargs)
54 return f(self, *args, **kwargs)
55
55
56 class AsyncResult(object):
56 class AsyncResult(object):
57 """Class for representing results of non-blocking calls.
57 """Class for representing results of non-blocking calls.
58
58
59 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
59 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
60 """
60 """
61
61
62 msg_ids = None
62 msg_ids = None
63 _targets = None
63 _targets = None
64 _tracker = None
64 _tracker = None
65 _single_result = False
65 _single_result = False
66
66
67 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
67 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
68 if isinstance(msg_ids, basestring):
68 if isinstance(msg_ids, basestring):
69 # always a list
69 # always a list
70 msg_ids = [msg_ids]
70 msg_ids = [msg_ids]
71 if tracker is None:
71 if tracker is None:
72 # default to always done
72 # default to always done
73 tracker = finished_tracker
73 tracker = finished_tracker
74 self._client = client
74 self._client = client
75 self.msg_ids = msg_ids
75 self.msg_ids = msg_ids
76 self._fname=fname
76 self._fname=fname
77 self._targets = targets
77 self._targets = targets
78 self._tracker = tracker
78 self._tracker = tracker
79 self._ready = False
79 self._ready = False
80 self._success = None
80 self._success = None
81 self._metadata = None
81 self._metadata = None
82 if len(msg_ids) == 1:
82 if len(msg_ids) == 1:
83 self._single_result = not isinstance(targets, (list, tuple))
83 self._single_result = not isinstance(targets, (list, tuple))
84 else:
84 else:
85 self._single_result = False
85 self._single_result = False
86
86
87 def __repr__(self):
87 def __repr__(self):
88 if self._ready:
88 if self._ready:
89 return "<%s: finished>"%(self.__class__.__name__)
89 return "<%s: finished>"%(self.__class__.__name__)
90 else:
90 else:
91 return "<%s: %s>"%(self.__class__.__name__,self._fname)
91 return "<%s: %s>"%(self.__class__.__name__,self._fname)
92
92
93
93
94 def _reconstruct_result(self, res):
94 def _reconstruct_result(self, res):
95 """Reconstruct our result from actual result list (always a list)
95 """Reconstruct our result from actual result list (always a list)
96
96
97 Override me in subclasses for turning a list of results
97 Override me in subclasses for turning a list of results
98 into the expected form.
98 into the expected form.
99 """
99 """
100 if self._single_result:
100 if self._single_result:
101 return res[0]
101 return res[0]
102 else:
102 else:
103 return res
103 return res
104
104
105 def get(self, timeout=-1):
105 def get(self, timeout=-1):
106 """Return the result when it arrives.
106 """Return the result when it arrives.
107
107
108 If `timeout` is not ``None`` and the result does not arrive within
108 If `timeout` is not ``None`` and the result does not arrive within
109 `timeout` seconds then ``TimeoutError`` is raised. If the
109 `timeout` seconds then ``TimeoutError`` is raised. If the
110 remote call raised an exception then that exception will be reraised
110 remote call raised an exception then that exception will be reraised
111 by get() inside a `RemoteError`.
111 by get() inside a `RemoteError`.
112 """
112 """
113 if not self.ready():
113 if not self.ready():
114 self.wait(timeout)
114 self.wait(timeout)
115
115
116 if self._ready:
116 if self._ready:
117 if self._success:
117 if self._success:
118 return self._result
118 return self._result
119 else:
119 else:
120 raise self._exception
120 raise self._exception
121 else:
121 else:
122 raise error.TimeoutError("Result not ready.")
122 raise error.TimeoutError("Result not ready.")
123
123
124 def ready(self):
124 def ready(self):
125 """Return whether the call has completed."""
125 """Return whether the call has completed."""
126 if not self._ready:
126 if not self._ready:
127 self.wait(0)
127 self.wait(0)
128 return self._ready
128 return self._ready
129
129
130 def wait(self, timeout=-1):
130 def wait(self, timeout=-1):
131 """Wait until the result is available or until `timeout` seconds pass.
131 """Wait until the result is available or until `timeout` seconds pass.
132
132
133 This method always returns None.
133 This method always returns None.
134 """
134 """
135 if self._ready:
135 if self._ready:
136 return
136 return
137 self._ready = self._client.wait(self.msg_ids, timeout)
137 self._ready = self._client.wait(self.msg_ids, timeout)
138 if self._ready:
138 if self._ready:
139 try:
139 try:
140 results = map(self._client.results.get, self.msg_ids)
140 results = map(self._client.results.get, self.msg_ids)
141 self._result = results
141 self._result = results
142 if self._single_result:
142 if self._single_result:
143 r = results[0]
143 r = results[0]
144 if isinstance(r, Exception):
144 if isinstance(r, Exception):
145 raise r
145 raise r
146 else:
146 else:
147 results = error.collect_exceptions(results, self._fname)
147 results = error.collect_exceptions(results, self._fname)
148 self._result = self._reconstruct_result(results)
148 self._result = self._reconstruct_result(results)
149 except Exception, e:
149 except Exception, e:
150 self._exception = e
150 self._exception = e
151 self._success = False
151 self._success = False
152 else:
152 else:
153 self._success = True
153 self._success = True
154 finally:
154 finally:
155 self._metadata = map(self._client.metadata.get, self.msg_ids)
155 self._metadata = map(self._client.metadata.get, self.msg_ids)
156
156
157
157
158 def successful(self):
158 def successful(self):
159 """Return whether the call completed without raising an exception.
159 """Return whether the call completed without raising an exception.
160
160
161 Will raise ``AssertionError`` if the result is not ready.
161 Will raise ``AssertionError`` if the result is not ready.
162 """
162 """
163 assert self.ready()
163 assert self.ready()
164 return self._success
164 return self._success
165
165
166 #----------------------------------------------------------------
166 #----------------------------------------------------------------
167 # Extra methods not in mp.pool.AsyncResult
167 # Extra methods not in mp.pool.AsyncResult
168 #----------------------------------------------------------------
168 #----------------------------------------------------------------
169
169
170 def get_dict(self, timeout=-1):
170 def get_dict(self, timeout=-1):
171 """Get the results as a dict, keyed by engine_id.
171 """Get the results as a dict, keyed by engine_id.
172
172
173 timeout behavior is described in `get()`.
173 timeout behavior is described in `get()`.
174 """
174 """
175
175
176 results = self.get(timeout)
176 results = self.get(timeout)
177 engine_ids = [ md['engine_id'] for md in self._metadata ]
177 engine_ids = [ md['engine_id'] for md in self._metadata ]
178 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
178 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
179 maxcount = bycount.count(bycount[-1])
179 maxcount = bycount.count(bycount[-1])
180 if maxcount > 1:
180 if maxcount > 1:
181 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
181 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
182 maxcount, bycount[-1]))
182 maxcount, bycount[-1]))
183
183
184 return dict(zip(engine_ids,results))
184 return dict(zip(engine_ids,results))
185
185
186 @property
186 @property
187 def result(self):
187 def result(self):
188 """result property wrapper for `get(timeout=0)`."""
188 """result property wrapper for `get(timeout=0)`."""
189 return self.get()
189 return self.get()
190
190
191 # abbreviated alias:
191 # abbreviated alias:
192 r = result
192 r = result
193
193
194 @property
194 @property
195 @check_ready
195 @check_ready
196 def metadata(self):
196 def metadata(self):
197 """property for accessing execution metadata."""
197 """property for accessing execution metadata."""
198 if self._single_result:
198 if self._single_result:
199 return self._metadata[0]
199 return self._metadata[0]
200 else:
200 else:
201 return self._metadata
201 return self._metadata
202
202
203 @property
203 @property
204 def result_dict(self):
204 def result_dict(self):
205 """result property as a dict."""
205 """result property as a dict."""
206 return self.get_dict()
206 return self.get_dict()
207
207
208 def __dict__(self):
208 def __dict__(self):
209 return self.get_dict(0)
209 return self.get_dict(0)
210
210
211 def abort(self):
211 def abort(self):
212 """abort my tasks."""
212 """abort my tasks."""
213 assert not self.ready(), "Can't abort, I am already done!"
213 assert not self.ready(), "Can't abort, I am already done!"
214 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
214 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
215
215
216 @property
216 @property
217 def sent(self):
217 def sent(self):
218 """check whether my messages have been sent."""
218 """check whether my messages have been sent."""
219 return self._tracker.done
219 return self._tracker.done
220
220
221 def wait_for_send(self, timeout=-1):
221 def wait_for_send(self, timeout=-1):
222 """wait for pyzmq send to complete.
222 """wait for pyzmq send to complete.
223
223
224 This is necessary when sending arrays that you intend to edit in-place.
224 This is necessary when sending arrays that you intend to edit in-place.
225 `timeout` is in seconds, and will raise TimeoutError if it is reached
225 `timeout` is in seconds, and will raise TimeoutError if it is reached
226 before the send completes.
226 before the send completes.
227 """
227 """
228 return self._tracker.wait(timeout)
228 return self._tracker.wait(timeout)
229
229
230 #-------------------------------------
230 #-------------------------------------
231 # dict-access
231 # dict-access
232 #-------------------------------------
232 #-------------------------------------
233
233
234 @check_ready
234 @check_ready
235 def __getitem__(self, key):
235 def __getitem__(self, key):
236 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
236 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
237 """
237 """
238 if isinstance(key, int):
238 if isinstance(key, int):
239 return error.collect_exceptions([self._result[key]], self._fname)[0]
239 return error.collect_exceptions([self._result[key]], self._fname)[0]
240 elif isinstance(key, slice):
240 elif isinstance(key, slice):
241 return error.collect_exceptions(self._result[key], self._fname)
241 return error.collect_exceptions(self._result[key], self._fname)
242 elif isinstance(key, basestring):
242 elif isinstance(key, basestring):
243 values = [ md[key] for md in self._metadata ]
243 values = [ md[key] for md in self._metadata ]
244 if self._single_result:
244 if self._single_result:
245 return values[0]
245 return values[0]
246 else:
246 else:
247 return values
247 return values
248 else:
248 else:
249 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
249 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
250
250
251 def __getattr__(self, key):
251 def __getattr__(self, key):
252 """getattr maps to getitem for convenient attr access to metadata."""
252 """getattr maps to getitem for convenient attr access to metadata."""
253 try:
253 try:
254 return self.__getitem__(key)
254 return self.__getitem__(key)
255 except (error.TimeoutError, KeyError):
255 except (error.TimeoutError, KeyError):
256 raise AttributeError("%r object has no attribute %r"%(
256 raise AttributeError("%r object has no attribute %r"%(
257 self.__class__.__name__, key))
257 self.__class__.__name__, key))
258
258
259 # asynchronous iterator:
259 # asynchronous iterator:
260 def __iter__(self):
260 def __iter__(self):
261 if self._single_result:
261 if self._single_result:
262 raise TypeError("AsyncResults with a single result are not iterable.")
262 raise TypeError("AsyncResults with a single result are not iterable.")
263 try:
263 try:
264 rlist = self.get(0)
264 rlist = self.get(0)
265 except error.TimeoutError:
265 except error.TimeoutError:
266 # wait for each result individually
266 # wait for each result individually
267 for msg_id in self.msg_ids:
267 for msg_id in self.msg_ids:
268 ar = AsyncResult(self._client, msg_id, self._fname)
268 ar = AsyncResult(self._client, msg_id, self._fname)
269 yield ar.get()
269 yield ar.get()
270 else:
270 else:
271 # already done
271 # already done
272 for r in rlist:
272 for r in rlist:
273 yield r
273 yield r
274
274
275 def __len__(self):
275 def __len__(self):
276 return len(self.msg_ids)
276 return len(self.msg_ids)
277
277
278 #-------------------------------------
278 #-------------------------------------
279 # Sugar methods and attributes
279 # Sugar methods and attributes
280 #-------------------------------------
280 #-------------------------------------
281
281
282 def timedelta(self, start, end, start_key=min, end_key=max):
282 def timedelta(self, start, end, start_key=min, end_key=max):
283 """compute the difference between two sets of timestamps
283 """compute the difference between two sets of timestamps
284
284
285 The default behavior is to use the earliest of the first
285 The default behavior is to use the earliest of the first
286 and the latest of the second list, but this can be changed
286 and the latest of the second list, but this can be changed
287 by passing a different
287 by passing a different
288
288
289 Parameters
289 Parameters
290 ----------
290 ----------
291
291
292 start : one or more datetime objects (e.g. ar.submitted)
292 start : one or more datetime objects (e.g. ar.submitted)
293 end : one or more datetime objects (e.g. ar.received)
293 end : one or more datetime objects (e.g. ar.received)
294 start_key : callable
294 start_key : callable
295 Function to call on `start` to extract the relevant
295 Function to call on `start` to extract the relevant
296 entry [defalt: min]
296 entry [defalt: min]
297 end_key : callable
297 end_key : callable
298 Function to call on `end` to extract the relevant
298 Function to call on `end` to extract the relevant
299 entry [default: max]
299 entry [default: max]
300
300
301 Returns
301 Returns
302 -------
302 -------
303
303
304 dt : float
304 dt : float
305 The time elapsed (in seconds) between the two selected timestamps.
305 The time elapsed (in seconds) between the two selected timestamps.
306 """
306 """
307 if not isinstance(start, datetime):
307 if not isinstance(start, datetime):
308 # handle single_result AsyncResults, where ar.stamp is single object,
308 # handle single_result AsyncResults, where ar.stamp is single object,
309 # not a list
309 # not a list
310 start = start_key(start)
310 start = start_key(start)
311 if not isinstance(end, datetime):
311 if not isinstance(end, datetime):
312 # handle single_result AsyncResults, where ar.stamp is single object,
312 # handle single_result AsyncResults, where ar.stamp is single object,
313 # not a list
313 # not a list
314 end = end_key(end)
314 end = end_key(end)
315 return _total_seconds(end - start)
315 return _total_seconds(end - start)
316
316
317 @property
317 @property
318 def progress(self):
318 def progress(self):
319 """the number of tasks which have been completed at this point.
319 """the number of tasks which have been completed at this point.
320
320
321 Fractional progress would be given by 1.0 * ar.progress / len(ar)
321 Fractional progress would be given by 1.0 * ar.progress / len(ar)
322 """
322 """
323 self.wait(0)
323 self.wait(0)
324 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
324 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
325
325
326 @property
326 @property
327 def elapsed(self):
327 def elapsed(self):
328 """elapsed time since initial submission"""
328 """elapsed time since initial submission"""
329 if self.ready():
329 if self.ready():
330 return self.wall_time
330 return self.wall_time
331
331
332 now = submitted = datetime.now()
332 now = submitted = datetime.now()
333 for msg_id in self.msg_ids:
333 for msg_id in self.msg_ids:
334 if msg_id in self._client.metadata:
334 if msg_id in self._client.metadata:
335 stamp = self._client.metadata[msg_id]['submitted']
335 stamp = self._client.metadata[msg_id]['submitted']
336 if stamp and stamp < submitted:
336 if stamp and stamp < submitted:
337 submitted = stamp
337 submitted = stamp
338 return _total_seconds(now-submitted)
338 return _total_seconds(now-submitted)
339
339
340 @property
340 @property
341 @check_ready
341 @check_ready
342 def serial_time(self):
342 def serial_time(self):
343 """serial computation time of a parallel calculation
343 """serial computation time of a parallel calculation
344
344
345 Computed as the sum of (completed-started) of each task
345 Computed as the sum of (completed-started) of each task
346 """
346 """
347 t = 0
347 t = 0
348 for md in self._metadata:
348 for md in self._metadata:
349 t += _total_seconds(md['completed'] - md['started'])
349 t += _total_seconds(md['completed'] - md['started'])
350 return t
350 return t
351
351
352 @property
352 @property
353 @check_ready
353 @check_ready
354 def wall_time(self):
354 def wall_time(self):
355 """actual computation time of a parallel calculation
355 """actual computation time of a parallel calculation
356
356
357 Computed as the time between the latest `received` stamp
357 Computed as the time between the latest `received` stamp
358 and the earliest `submitted`.
358 and the earliest `submitted`.
359
359
360 Only reliable if Client was spinning/waiting when the task finished, because
360 Only reliable if Client was spinning/waiting when the task finished, because
361 the `received` timestamp is created when a result is pulled off of the zmq queue,
361 the `received` timestamp is created when a result is pulled off of the zmq queue,
362 which happens as a result of `client.spin()`.
362 which happens as a result of `client.spin()`.
363
363
364 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
364 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
365
365
366 """
366 """
367 return self.timedelta(self.submitted, self.received)
367 return self.timedelta(self.submitted, self.received)
368
368
369 def wait_interactive(self, interval=1., timeout=None):
369 def wait_interactive(self, interval=1., timeout=None):
370 """interactive wait, printing progress at regular intervals"""
370 """interactive wait, printing progress at regular intervals"""
371 N = len(self)
371 N = len(self)
372 tic = time.time()
372 tic = time.time()
373 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
373 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
374 self.wait(interval)
374 self.wait(interval)
375 clear_output()
375 clear_output()
376 print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed),
376 print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed),
377 sys.stdout.flush()
377 sys.stdout.flush()
378 print
378 print
379 print "done"
379 print "done"
380
380
381 def _republish_displaypub(self, content, eid):
381 def _republish_displaypub(self, content, eid):
382 """republish individual displaypub content dicts"""
382 """republish individual displaypub content dicts"""
383 try:
383 try:
384 ip = get_ipython()
384 ip = get_ipython()
385 except NameError:
385 except NameError:
386 # displaypub is meaningless outside IPython
386 # displaypub is meaningless outside IPython
387 return
387 return
388 md = content['metadata'] or {}
388 md = content['metadata'] or {}
389 md['engine'] = eid
389 md['engine'] = eid
390 ip.display_pub.publish(content['source'], content['data'], md)
390 ip.display_pub.publish(content['source'], content['data'], md)
391
391
392
392
393 def _display_single_result(self):
393 def _display_single_result(self):
394
394 if self.stdout:
395 print self.stdout
395 print self.stdout
396 print >> sys.stderr, self.stderr
396 if self.stderr:
397 print >> sys.stderr, self.stderr
397
398
398 try:
399 try:
399 get_ipython()
400 get_ipython()
400 except NameError:
401 except NameError:
401 # displaypub is meaningless outside IPython
402 # displaypub is meaningless outside IPython
402 return
403 return
403
404
404 for output in self.outputs:
405 for output in self.outputs:
405 self._republish_displaypub(output, self.engine_id)
406 self._republish_displaypub(output, self.engine_id)
406
407
407 if self.pyout is not None:
408 if self.pyout is not None:
408 display(self.get())
409 display(self.get())
409
410
410 @check_ready
411 @check_ready
411 def display_outputs(self, groupby="type"):
412 def display_outputs(self, groupby="type"):
412 """republish the outputs of the computation
413 """republish the outputs of the computation
413
414
414 Parameters
415 Parameters
415 ----------
416 ----------
416
417
417 groupby : str [default: type]
418 groupby : str [default: type]
418 if 'type':
419 if 'type':
419 Group outputs by type (show all stdout, then all stderr, etc.):
420 Group outputs by type (show all stdout, then all stderr, etc.):
420
421
421 [stdout:1] foo
422 [stdout:1] foo
422 [stdout:2] foo
423 [stdout:2] foo
423 [stderr:1] bar
424 [stderr:1] bar
424 [stderr:2] bar
425 [stderr:2] bar
425 if 'engine':
426 if 'engine':
426 Display outputs for each engine before moving on to the next:
427 Display outputs for each engine before moving on to the next:
427
428
428 [stdout:1] foo
429 [stdout:1] foo
429 [stderr:1] bar
430 [stderr:1] bar
430 [stdout:2] foo
431 [stdout:2] foo
431 [stderr:2] bar
432 [stderr:2] bar
432
433
433 if 'order':
434 if 'order':
434 Like 'type', but further collate individual displaypub
435 Like 'type', but further collate individual displaypub
435 outputs. This is meant for cases of each command producing
436 outputs. This is meant for cases of each command producing
436 several plots, and you would like to see all of the first
437 several plots, and you would like to see all of the first
437 plots together, then all of the second plots, and so on.
438 plots together, then all of the second plots, and so on.
438 """
439 """
439 # flush iopub, just in case
440 # flush iopub, just in case
440 self._client._flush_iopub(self._client._iopub_socket)
441 self._client._flush_iopub(self._client._iopub_socket)
441 if self._single_result:
442 if self._single_result:
442 self._display_single_result()
443 self._display_single_result()
443 return
444 return
444
445
445 stdouts = [s.rstrip() for s in self.stdout]
446 stdouts = [s.rstrip() for s in self.stdout]
446 stderrs = [s.rstrip() for s in self.stderr]
447 stderrs = [s.rstrip() for s in self.stderr]
447 pyouts = [p for p in self.pyout]
448 pyouts = [p for p in self.pyout]
448 output_lists = self.outputs
449 output_lists = self.outputs
449 results = self.get()
450 results = self.get()
450
451
451 targets = self.engine_id
452 targets = self.engine_id
452
453
453 if groupby == "engine":
454 if groupby == "engine":
454 for eid,stdout,stderr,outputs,r,pyout in zip(
455 for eid,stdout,stderr,outputs,r,pyout in zip(
455 targets, stdouts, stderrs, output_lists, results, pyouts
456 targets, stdouts, stderrs, output_lists, results, pyouts
456 ):
457 ):
457 if stdout:
458 if stdout:
458 print '[stdout:%i]' % eid, stdout
459 print '[stdout:%i]' % eid, stdout
459 if stderr:
460 if stderr:
460 print >> sys.stderr, '[stderr:%i]' % eid, stderr
461 print >> sys.stderr, '[stderr:%i]' % eid, stderr
461
462
462 try:
463 try:
463 get_ipython()
464 get_ipython()
464 except NameError:
465 except NameError:
465 # displaypub is meaningless outside IPython
466 # displaypub is meaningless outside IPython
466 return
467 return
467
468
468 for output in outputs:
469 for output in outputs:
469 self._republish_displaypub(output, eid)
470 self._republish_displaypub(output, eid)
470
471
471 if pyout is not None:
472 if pyout is not None:
472 display(r)
473 display(r)
473
474
474 elif groupby in ('type', 'order'):
475 elif groupby in ('type', 'order'):
475 # republish stdout:
476 # republish stdout:
476 if any(stdouts):
477 if any(stdouts):
477 for eid,stdout in zip(targets, stdouts):
478 for eid,stdout in zip(targets, stdouts):
478 print '[stdout:%i]' % eid, stdout
479 print '[stdout:%i]' % eid, stdout
479
480
480 # republish stderr:
481 # republish stderr:
481 if any(stderrs):
482 if any(stderrs):
482 for eid,stderr in zip(targets, stderrs):
483 for eid,stderr in zip(targets, stderrs):
483 print >> sys.stderr, '[stderr:%i]' % eid, stderr
484 print >> sys.stderr, '[stderr:%i]' % eid, stderr
484
485
485 try:
486 try:
486 get_ipython()
487 get_ipython()
487 except NameError:
488 except NameError:
488 # displaypub is meaningless outside IPython
489 # displaypub is meaningless outside IPython
489 return
490 return
490
491
491 if groupby == 'order':
492 if groupby == 'order':
492 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
493 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
493 N = max(len(outputs) for outputs in output_lists)
494 N = max(len(outputs) for outputs in output_lists)
494 for i in range(N):
495 for i in range(N):
495 for eid in targets:
496 for eid in targets:
496 outputs = output_dict[eid]
497 outputs = output_dict[eid]
497 if len(outputs) >= N:
498 if len(outputs) >= N:
498 self._republish_displaypub(outputs[i], eid)
499 self._republish_displaypub(outputs[i], eid)
499 else:
500 else:
500 # republish displaypub output
501 # republish displaypub output
501 for eid,outputs in zip(targets, output_lists):
502 for eid,outputs in zip(targets, output_lists):
502 for output in outputs:
503 for output in outputs:
503 self._republish_displaypub(output, eid)
504 self._republish_displaypub(output, eid)
504
505
505 # finally, add pyout:
506 # finally, add pyout:
506 for eid,r,pyout in zip(targets, results, pyouts):
507 for eid,r,pyout in zip(targets, results, pyouts):
507 if pyout is not None:
508 if pyout is not None:
508 display(r)
509 display(r)
509
510
510 else:
511 else:
511 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
512 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
512
513
513
514
514
515
515
516
516 class AsyncMapResult(AsyncResult):
517 class AsyncMapResult(AsyncResult):
517 """Class for representing results of non-blocking gathers.
518 """Class for representing results of non-blocking gathers.
518
519
519 This will properly reconstruct the gather.
520 This will properly reconstruct the gather.
520
521
521 This class is iterable at any time, and will wait on results as they come.
522 This class is iterable at any time, and will wait on results as they come.
522
523
523 If ordered=False, then the first results to arrive will come first, otherwise
524 If ordered=False, then the first results to arrive will come first, otherwise
524 results will be yielded in the order they were submitted.
525 results will be yielded in the order they were submitted.
525
526
526 """
527 """
527
528
528 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
529 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
529 AsyncResult.__init__(self, client, msg_ids, fname=fname)
530 AsyncResult.__init__(self, client, msg_ids, fname=fname)
530 self._mapObject = mapObject
531 self._mapObject = mapObject
531 self._single_result = False
532 self._single_result = False
532 self.ordered = ordered
533 self.ordered = ordered
533
534
534 def _reconstruct_result(self, res):
535 def _reconstruct_result(self, res):
535 """Perform the gather on the actual results."""
536 """Perform the gather on the actual results."""
536 return self._mapObject.joinPartitions(res)
537 return self._mapObject.joinPartitions(res)
537
538
538 # asynchronous iterator:
539 # asynchronous iterator:
539 def __iter__(self):
540 def __iter__(self):
540 it = self._ordered_iter if self.ordered else self._unordered_iter
541 it = self._ordered_iter if self.ordered else self._unordered_iter
541 for r in it():
542 for r in it():
542 yield r
543 yield r
543
544
544 # asynchronous ordered iterator:
545 # asynchronous ordered iterator:
545 def _ordered_iter(self):
546 def _ordered_iter(self):
546 """iterator for results *as they arrive*, preserving submission order."""
547 """iterator for results *as they arrive*, preserving submission order."""
547 try:
548 try:
548 rlist = self.get(0)
549 rlist = self.get(0)
549 except error.TimeoutError:
550 except error.TimeoutError:
550 # wait for each result individually
551 # wait for each result individually
551 for msg_id in self.msg_ids:
552 for msg_id in self.msg_ids:
552 ar = AsyncResult(self._client, msg_id, self._fname)
553 ar = AsyncResult(self._client, msg_id, self._fname)
553 rlist = ar.get()
554 rlist = ar.get()
554 try:
555 try:
555 for r in rlist:
556 for r in rlist:
556 yield r
557 yield r
557 except TypeError:
558 except TypeError:
558 # flattened, not a list
559 # flattened, not a list
559 # this could get broken by flattened data that returns iterables
560 # this could get broken by flattened data that returns iterables
560 # but most calls to map do not expose the `flatten` argument
561 # but most calls to map do not expose the `flatten` argument
561 yield rlist
562 yield rlist
562 else:
563 else:
563 # already done
564 # already done
564 for r in rlist:
565 for r in rlist:
565 yield r
566 yield r
566
567
567 # asynchronous unordered iterator:
568 # asynchronous unordered iterator:
568 def _unordered_iter(self):
569 def _unordered_iter(self):
569 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
570 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
570 try:
571 try:
571 rlist = self.get(0)
572 rlist = self.get(0)
572 except error.TimeoutError:
573 except error.TimeoutError:
573 pending = set(self.msg_ids)
574 pending = set(self.msg_ids)
574 while pending:
575 while pending:
575 try:
576 try:
576 self._client.wait(pending, 1e-3)
577 self._client.wait(pending, 1e-3)
577 except error.TimeoutError:
578 except error.TimeoutError:
578 # ignore timeout error, because that only means
579 # ignore timeout error, because that only means
579 # *some* jobs are outstanding
580 # *some* jobs are outstanding
580 pass
581 pass
581 # update ready set with those no longer outstanding:
582 # update ready set with those no longer outstanding:
582 ready = pending.difference(self._client.outstanding)
583 ready = pending.difference(self._client.outstanding)
583 # update pending to exclude those that are finished
584 # update pending to exclude those that are finished
584 pending = pending.difference(ready)
585 pending = pending.difference(ready)
585 while ready:
586 while ready:
586 msg_id = ready.pop()
587 msg_id = ready.pop()
587 ar = AsyncResult(self._client, msg_id, self._fname)
588 ar = AsyncResult(self._client, msg_id, self._fname)
588 rlist = ar.get()
589 rlist = ar.get()
589 try:
590 try:
590 for r in rlist:
591 for r in rlist:
591 yield r
592 yield r
592 except TypeError:
593 except TypeError:
593 # flattened, not a list
594 # flattened, not a list
594 # this could get broken by flattened data that returns iterables
595 # this could get broken by flattened data that returns iterables
595 # but most calls to map do not expose the `flatten` argument
596 # but most calls to map do not expose the `flatten` argument
596 yield rlist
597 yield rlist
597 else:
598 else:
598 # already done
599 # already done
599 for r in rlist:
600 for r in rlist:
600 yield r
601 yield r
601
602
602
603
603
604
604 class AsyncHubResult(AsyncResult):
605 class AsyncHubResult(AsyncResult):
605 """Class to wrap pending results that must be requested from the Hub.
606 """Class to wrap pending results that must be requested from the Hub.
606
607
607 Note that waiting/polling on these objects requires polling the Hubover the network,
608 Note that waiting/polling on these objects requires polling the Hubover the network,
608 so use `AsyncHubResult.wait()` sparingly.
609 so use `AsyncHubResult.wait()` sparingly.
609 """
610 """
610
611
611 def wait(self, timeout=-1):
612 def wait(self, timeout=-1):
612 """wait for result to complete."""
613 """wait for result to complete."""
613 start = time.time()
614 start = time.time()
614 if self._ready:
615 if self._ready:
615 return
616 return
616 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
617 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
617 local_ready = self._client.wait(local_ids, timeout)
618 local_ready = self._client.wait(local_ids, timeout)
618 if local_ready:
619 if local_ready:
619 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
620 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
620 if not remote_ids:
621 if not remote_ids:
621 self._ready = True
622 self._ready = True
622 else:
623 else:
623 rdict = self._client.result_status(remote_ids, status_only=False)
624 rdict = self._client.result_status(remote_ids, status_only=False)
624 pending = rdict['pending']
625 pending = rdict['pending']
625 while pending and (timeout < 0 or time.time() < start+timeout):
626 while pending and (timeout < 0 or time.time() < start+timeout):
626 rdict = self._client.result_status(remote_ids, status_only=False)
627 rdict = self._client.result_status(remote_ids, status_only=False)
627 pending = rdict['pending']
628 pending = rdict['pending']
628 if pending:
629 if pending:
629 time.sleep(0.1)
630 time.sleep(0.1)
630 if not pending:
631 if not pending:
631 self._ready = True
632 self._ready = True
632 if self._ready:
633 if self._ready:
633 try:
634 try:
634 results = map(self._client.results.get, self.msg_ids)
635 results = map(self._client.results.get, self.msg_ids)
635 self._result = results
636 self._result = results
636 if self._single_result:
637 if self._single_result:
637 r = results[0]
638 r = results[0]
638 if isinstance(r, Exception):
639 if isinstance(r, Exception):
639 raise r
640 raise r
640 else:
641 else:
641 results = error.collect_exceptions(results, self._fname)
642 results = error.collect_exceptions(results, self._fname)
642 self._result = self._reconstruct_result(results)
643 self._result = self._reconstruct_result(results)
643 except Exception, e:
644 except Exception, e:
644 self._exception = e
645 self._exception = e
645 self._success = False
646 self._success = False
646 else:
647 else:
647 self._success = True
648 self._success = True
648 finally:
649 finally:
649 self._metadata = map(self._client.metadata.get, self.msg_ids)
650 self._metadata = map(self._client.metadata.get, self.msg_ids)
650
651
651 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] No newline at end of file
652 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
General Comments 0
You need to be logged in to leave comments. Login now