##// END OF EJS Templates
add missing '>> sys.stderr' for one display_outputs case
MinRK -
Show More
@@ -1,651 +1,651
1 """AsyncResult objects for the client
1 """AsyncResult objects for the client
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from datetime import datetime
20 from datetime import datetime
21
21
22 from zmq import MessageTracker
22 from zmq import MessageTracker
23
23
24 from IPython.core.display import clear_output, display
24 from IPython.core.display import clear_output, display
25 from IPython.external.decorator import decorator
25 from IPython.external.decorator import decorator
26 from IPython.parallel import error
26 from IPython.parallel import error
27
27
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29 # Functions
29 # Functions
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31
31
32 def _total_seconds(td):
32 def _total_seconds(td):
33 """timedelta.total_seconds was added in 2.7"""
33 """timedelta.total_seconds was added in 2.7"""
34 try:
34 try:
35 # Python >= 2.7
35 # Python >= 2.7
36 return td.total_seconds()
36 return td.total_seconds()
37 except AttributeError:
37 except AttributeError:
38 # Python 2.6
38 # Python 2.6
39 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
39 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Classes
42 # Classes
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
# global empty tracker that's always done:
# (used as the default `tracker` in AsyncResult.__init__, so `sent` is
# immediately True for results that carry no pending zmq sends)
finished_tracker = MessageTracker()
47
47
@decorator
def check_ready(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method."""
    # non-blocking sync with the Hub before checking readiness
    self.wait(0)
    if self._ready:
        return f(self, *args, **kwargs)
    raise error.TimeoutError("result not ready")
55
55
56 class AsyncResult(object):
56 class AsyncResult(object):
57 """Class for representing results of non-blocking calls.
57 """Class for representing results of non-blocking calls.
58
58
59 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
59 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
60 """
60 """
61
61
62 msg_ids = None
62 msg_ids = None
63 _targets = None
63 _targets = None
64 _tracker = None
64 _tracker = None
65 _single_result = False
65 _single_result = False
66
66
67 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
67 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
68 if isinstance(msg_ids, basestring):
68 if isinstance(msg_ids, basestring):
69 # always a list
69 # always a list
70 msg_ids = [msg_ids]
70 msg_ids = [msg_ids]
71 if tracker is None:
71 if tracker is None:
72 # default to always done
72 # default to always done
73 tracker = finished_tracker
73 tracker = finished_tracker
74 self._client = client
74 self._client = client
75 self.msg_ids = msg_ids
75 self.msg_ids = msg_ids
76 self._fname=fname
76 self._fname=fname
77 self._targets = targets
77 self._targets = targets
78 self._tracker = tracker
78 self._tracker = tracker
79 self._ready = False
79 self._ready = False
80 self._success = None
80 self._success = None
81 self._metadata = None
81 self._metadata = None
82 if len(msg_ids) == 1:
82 if len(msg_ids) == 1:
83 self._single_result = not isinstance(targets, (list, tuple))
83 self._single_result = not isinstance(targets, (list, tuple))
84 else:
84 else:
85 self._single_result = False
85 self._single_result = False
86
86
87 def __repr__(self):
87 def __repr__(self):
88 if self._ready:
88 if self._ready:
89 return "<%s: finished>"%(self.__class__.__name__)
89 return "<%s: finished>"%(self.__class__.__name__)
90 else:
90 else:
91 return "<%s: %s>"%(self.__class__.__name__,self._fname)
91 return "<%s: %s>"%(self.__class__.__name__,self._fname)
92
92
93
93
94 def _reconstruct_result(self, res):
94 def _reconstruct_result(self, res):
95 """Reconstruct our result from actual result list (always a list)
95 """Reconstruct our result from actual result list (always a list)
96
96
97 Override me in subclasses for turning a list of results
97 Override me in subclasses for turning a list of results
98 into the expected form.
98 into the expected form.
99 """
99 """
100 if self._single_result:
100 if self._single_result:
101 return res[0]
101 return res[0]
102 else:
102 else:
103 return res
103 return res
104
104
105 def get(self, timeout=-1):
105 def get(self, timeout=-1):
106 """Return the result when it arrives.
106 """Return the result when it arrives.
107
107
108 If `timeout` is not ``None`` and the result does not arrive within
108 If `timeout` is not ``None`` and the result does not arrive within
109 `timeout` seconds then ``TimeoutError`` is raised. If the
109 `timeout` seconds then ``TimeoutError`` is raised. If the
110 remote call raised an exception then that exception will be reraised
110 remote call raised an exception then that exception will be reraised
111 by get() inside a `RemoteError`.
111 by get() inside a `RemoteError`.
112 """
112 """
113 if not self.ready():
113 if not self.ready():
114 self.wait(timeout)
114 self.wait(timeout)
115
115
116 if self._ready:
116 if self._ready:
117 if self._success:
117 if self._success:
118 return self._result
118 return self._result
119 else:
119 else:
120 raise self._exception
120 raise self._exception
121 else:
121 else:
122 raise error.TimeoutError("Result not ready.")
122 raise error.TimeoutError("Result not ready.")
123
123
124 def ready(self):
124 def ready(self):
125 """Return whether the call has completed."""
125 """Return whether the call has completed."""
126 if not self._ready:
126 if not self._ready:
127 self.wait(0)
127 self.wait(0)
128 return self._ready
128 return self._ready
129
129
130 def wait(self, timeout=-1):
130 def wait(self, timeout=-1):
131 """Wait until the result is available or until `timeout` seconds pass.
131 """Wait until the result is available or until `timeout` seconds pass.
132
132
133 This method always returns None.
133 This method always returns None.
134 """
134 """
135 if self._ready:
135 if self._ready:
136 return
136 return
137 self._ready = self._client.wait(self.msg_ids, timeout)
137 self._ready = self._client.wait(self.msg_ids, timeout)
138 if self._ready:
138 if self._ready:
139 try:
139 try:
140 results = map(self._client.results.get, self.msg_ids)
140 results = map(self._client.results.get, self.msg_ids)
141 self._result = results
141 self._result = results
142 if self._single_result:
142 if self._single_result:
143 r = results[0]
143 r = results[0]
144 if isinstance(r, Exception):
144 if isinstance(r, Exception):
145 raise r
145 raise r
146 else:
146 else:
147 results = error.collect_exceptions(results, self._fname)
147 results = error.collect_exceptions(results, self._fname)
148 self._result = self._reconstruct_result(results)
148 self._result = self._reconstruct_result(results)
149 except Exception, e:
149 except Exception, e:
150 self._exception = e
150 self._exception = e
151 self._success = False
151 self._success = False
152 else:
152 else:
153 self._success = True
153 self._success = True
154 finally:
154 finally:
155 self._metadata = map(self._client.metadata.get, self.msg_ids)
155 self._metadata = map(self._client.metadata.get, self.msg_ids)
156
156
157
157
158 def successful(self):
158 def successful(self):
159 """Return whether the call completed without raising an exception.
159 """Return whether the call completed without raising an exception.
160
160
161 Will raise ``AssertionError`` if the result is not ready.
161 Will raise ``AssertionError`` if the result is not ready.
162 """
162 """
163 assert self.ready()
163 assert self.ready()
164 return self._success
164 return self._success
165
165
166 #----------------------------------------------------------------
166 #----------------------------------------------------------------
167 # Extra methods not in mp.pool.AsyncResult
167 # Extra methods not in mp.pool.AsyncResult
168 #----------------------------------------------------------------
168 #----------------------------------------------------------------
169
169
170 def get_dict(self, timeout=-1):
170 def get_dict(self, timeout=-1):
171 """Get the results as a dict, keyed by engine_id.
171 """Get the results as a dict, keyed by engine_id.
172
172
173 timeout behavior is described in `get()`.
173 timeout behavior is described in `get()`.
174 """
174 """
175
175
176 results = self.get(timeout)
176 results = self.get(timeout)
177 engine_ids = [ md['engine_id'] for md in self._metadata ]
177 engine_ids = [ md['engine_id'] for md in self._metadata ]
178 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
178 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
179 maxcount = bycount.count(bycount[-1])
179 maxcount = bycount.count(bycount[-1])
180 if maxcount > 1:
180 if maxcount > 1:
181 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
181 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
182 maxcount, bycount[-1]))
182 maxcount, bycount[-1]))
183
183
184 return dict(zip(engine_ids,results))
184 return dict(zip(engine_ids,results))
185
185
186 @property
186 @property
187 def result(self):
187 def result(self):
188 """result property wrapper for `get(timeout=0)`."""
188 """result property wrapper for `get(timeout=0)`."""
189 return self.get()
189 return self.get()
190
190
191 # abbreviated alias:
191 # abbreviated alias:
192 r = result
192 r = result
193
193
194 @property
194 @property
195 @check_ready
195 @check_ready
196 def metadata(self):
196 def metadata(self):
197 """property for accessing execution metadata."""
197 """property for accessing execution metadata."""
198 if self._single_result:
198 if self._single_result:
199 return self._metadata[0]
199 return self._metadata[0]
200 else:
200 else:
201 return self._metadata
201 return self._metadata
202
202
203 @property
203 @property
204 def result_dict(self):
204 def result_dict(self):
205 """result property as a dict."""
205 """result property as a dict."""
206 return self.get_dict()
206 return self.get_dict()
207
207
208 def __dict__(self):
208 def __dict__(self):
209 return self.get_dict(0)
209 return self.get_dict(0)
210
210
211 def abort(self):
211 def abort(self):
212 """abort my tasks."""
212 """abort my tasks."""
213 assert not self.ready(), "Can't abort, I am already done!"
213 assert not self.ready(), "Can't abort, I am already done!"
214 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
214 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
215
215
216 @property
216 @property
217 def sent(self):
217 def sent(self):
218 """check whether my messages have been sent."""
218 """check whether my messages have been sent."""
219 return self._tracker.done
219 return self._tracker.done
220
220
221 def wait_for_send(self, timeout=-1):
221 def wait_for_send(self, timeout=-1):
222 """wait for pyzmq send to complete.
222 """wait for pyzmq send to complete.
223
223
224 This is necessary when sending arrays that you intend to edit in-place.
224 This is necessary when sending arrays that you intend to edit in-place.
225 `timeout` is in seconds, and will raise TimeoutError if it is reached
225 `timeout` is in seconds, and will raise TimeoutError if it is reached
226 before the send completes.
226 before the send completes.
227 """
227 """
228 return self._tracker.wait(timeout)
228 return self._tracker.wait(timeout)
229
229
230 #-------------------------------------
230 #-------------------------------------
231 # dict-access
231 # dict-access
232 #-------------------------------------
232 #-------------------------------------
233
233
234 @check_ready
234 @check_ready
235 def __getitem__(self, key):
235 def __getitem__(self, key):
236 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
236 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
237 """
237 """
238 if isinstance(key, int):
238 if isinstance(key, int):
239 return error.collect_exceptions([self._result[key]], self._fname)[0]
239 return error.collect_exceptions([self._result[key]], self._fname)[0]
240 elif isinstance(key, slice):
240 elif isinstance(key, slice):
241 return error.collect_exceptions(self._result[key], self._fname)
241 return error.collect_exceptions(self._result[key], self._fname)
242 elif isinstance(key, basestring):
242 elif isinstance(key, basestring):
243 values = [ md[key] for md in self._metadata ]
243 values = [ md[key] for md in self._metadata ]
244 if self._single_result:
244 if self._single_result:
245 return values[0]
245 return values[0]
246 else:
246 else:
247 return values
247 return values
248 else:
248 else:
249 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
249 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
250
250
251 def __getattr__(self, key):
251 def __getattr__(self, key):
252 """getattr maps to getitem for convenient attr access to metadata."""
252 """getattr maps to getitem for convenient attr access to metadata."""
253 try:
253 try:
254 return self.__getitem__(key)
254 return self.__getitem__(key)
255 except (error.TimeoutError, KeyError):
255 except (error.TimeoutError, KeyError):
256 raise AttributeError("%r object has no attribute %r"%(
256 raise AttributeError("%r object has no attribute %r"%(
257 self.__class__.__name__, key))
257 self.__class__.__name__, key))
258
258
259 # asynchronous iterator:
259 # asynchronous iterator:
260 def __iter__(self):
260 def __iter__(self):
261 if self._single_result:
261 if self._single_result:
262 raise TypeError("AsyncResults with a single result are not iterable.")
262 raise TypeError("AsyncResults with a single result are not iterable.")
263 try:
263 try:
264 rlist = self.get(0)
264 rlist = self.get(0)
265 except error.TimeoutError:
265 except error.TimeoutError:
266 # wait for each result individually
266 # wait for each result individually
267 for msg_id in self.msg_ids:
267 for msg_id in self.msg_ids:
268 ar = AsyncResult(self._client, msg_id, self._fname)
268 ar = AsyncResult(self._client, msg_id, self._fname)
269 yield ar.get()
269 yield ar.get()
270 else:
270 else:
271 # already done
271 # already done
272 for r in rlist:
272 for r in rlist:
273 yield r
273 yield r
274
274
275 def __len__(self):
275 def __len__(self):
276 return len(self.msg_ids)
276 return len(self.msg_ids)
277
277
278 #-------------------------------------
278 #-------------------------------------
279 # Sugar methods and attributes
279 # Sugar methods and attributes
280 #-------------------------------------
280 #-------------------------------------
281
281
282 def timedelta(self, start, end, start_key=min, end_key=max):
282 def timedelta(self, start, end, start_key=min, end_key=max):
283 """compute the difference between two sets of timestamps
283 """compute the difference between two sets of timestamps
284
284
285 The default behavior is to use the earliest of the first
285 The default behavior is to use the earliest of the first
286 and the latest of the second list, but this can be changed
286 and the latest of the second list, but this can be changed
287 by passing a different
287 by passing a different
288
288
289 Parameters
289 Parameters
290 ----------
290 ----------
291
291
292 start : one or more datetime objects (e.g. ar.submitted)
292 start : one or more datetime objects (e.g. ar.submitted)
293 end : one or more datetime objects (e.g. ar.received)
293 end : one or more datetime objects (e.g. ar.received)
294 start_key : callable
294 start_key : callable
295 Function to call on `start` to extract the relevant
295 Function to call on `start` to extract the relevant
296 entry [defalt: min]
296 entry [defalt: min]
297 end_key : callable
297 end_key : callable
298 Function to call on `end` to extract the relevant
298 Function to call on `end` to extract the relevant
299 entry [default: max]
299 entry [default: max]
300
300
301 Returns
301 Returns
302 -------
302 -------
303
303
304 dt : float
304 dt : float
305 The time elapsed (in seconds) between the two selected timestamps.
305 The time elapsed (in seconds) between the two selected timestamps.
306 """
306 """
307 if not isinstance(start, datetime):
307 if not isinstance(start, datetime):
308 # handle single_result AsyncResults, where ar.stamp is single object,
308 # handle single_result AsyncResults, where ar.stamp is single object,
309 # not a list
309 # not a list
310 start = start_key(start)
310 start = start_key(start)
311 if not isinstance(end, datetime):
311 if not isinstance(end, datetime):
312 # handle single_result AsyncResults, where ar.stamp is single object,
312 # handle single_result AsyncResults, where ar.stamp is single object,
313 # not a list
313 # not a list
314 end = end_key(end)
314 end = end_key(end)
315 return _total_seconds(end - start)
315 return _total_seconds(end - start)
316
316
317 @property
317 @property
318 def progress(self):
318 def progress(self):
319 """the number of tasks which have been completed at this point.
319 """the number of tasks which have been completed at this point.
320
320
321 Fractional progress would be given by 1.0 * ar.progress / len(ar)
321 Fractional progress would be given by 1.0 * ar.progress / len(ar)
322 """
322 """
323 self.wait(0)
323 self.wait(0)
324 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
324 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
325
325
326 @property
326 @property
327 def elapsed(self):
327 def elapsed(self):
328 """elapsed time since initial submission"""
328 """elapsed time since initial submission"""
329 if self.ready():
329 if self.ready():
330 return self.wall_time
330 return self.wall_time
331
331
332 now = submitted = datetime.now()
332 now = submitted = datetime.now()
333 for msg_id in self.msg_ids:
333 for msg_id in self.msg_ids:
334 if msg_id in self._client.metadata:
334 if msg_id in self._client.metadata:
335 stamp = self._client.metadata[msg_id]['submitted']
335 stamp = self._client.metadata[msg_id]['submitted']
336 if stamp and stamp < submitted:
336 if stamp and stamp < submitted:
337 submitted = stamp
337 submitted = stamp
338 return _total_seconds(now-submitted)
338 return _total_seconds(now-submitted)
339
339
340 @property
340 @property
341 @check_ready
341 @check_ready
342 def serial_time(self):
342 def serial_time(self):
343 """serial computation time of a parallel calculation
343 """serial computation time of a parallel calculation
344
344
345 Computed as the sum of (completed-started) of each task
345 Computed as the sum of (completed-started) of each task
346 """
346 """
347 t = 0
347 t = 0
348 for md in self._metadata:
348 for md in self._metadata:
349 t += _total_seconds(md['completed'] - md['started'])
349 t += _total_seconds(md['completed'] - md['started'])
350 return t
350 return t
351
351
352 @property
352 @property
353 @check_ready
353 @check_ready
354 def wall_time(self):
354 def wall_time(self):
355 """actual computation time of a parallel calculation
355 """actual computation time of a parallel calculation
356
356
357 Computed as the time between the latest `received` stamp
357 Computed as the time between the latest `received` stamp
358 and the earliest `submitted`.
358 and the earliest `submitted`.
359
359
360 Only reliable if Client was spinning/waiting when the task finished, because
360 Only reliable if Client was spinning/waiting when the task finished, because
361 the `received` timestamp is created when a result is pulled off of the zmq queue,
361 the `received` timestamp is created when a result is pulled off of the zmq queue,
362 which happens as a result of `client.spin()`.
362 which happens as a result of `client.spin()`.
363
363
364 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
364 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
365
365
366 """
366 """
367 return self.timedelta(self.submitted, self.received)
367 return self.timedelta(self.submitted, self.received)
368
368
369 def wait_interactive(self, interval=1., timeout=None):
369 def wait_interactive(self, interval=1., timeout=None):
370 """interactive wait, printing progress at regular intervals"""
370 """interactive wait, printing progress at regular intervals"""
371 N = len(self)
371 N = len(self)
372 tic = time.time()
372 tic = time.time()
373 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
373 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
374 self.wait(interval)
374 self.wait(interval)
375 clear_output()
375 clear_output()
376 print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed),
376 print "%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed),
377 sys.stdout.flush()
377 sys.stdout.flush()
378 print
378 print
379 print "done"
379 print "done"
380
380
381 def _republish_displaypub(self, content, eid):
381 def _republish_displaypub(self, content, eid):
382 """republish individual displaypub content dicts"""
382 """republish individual displaypub content dicts"""
383 try:
383 try:
384 ip = get_ipython()
384 ip = get_ipython()
385 except NameError:
385 except NameError:
386 # displaypub is meaningless outside IPython
386 # displaypub is meaningless outside IPython
387 return
387 return
388 md = content['metadata'] or {}
388 md = content['metadata'] or {}
389 md['engine'] = eid
389 md['engine'] = eid
390 ip.display_pub.publish(content['source'], content['data'], md)
390 ip.display_pub.publish(content['source'], content['data'], md)
391
391
392
392
393 def _display_single_result(self):
393 def _display_single_result(self):
394
394
395 print self.stdout
395 print self.stdout
396 print >> sys.stderr, self.stderr
396 print >> sys.stderr, self.stderr
397
397
398 try:
398 try:
399 get_ipython()
399 get_ipython()
400 except NameError:
400 except NameError:
401 # displaypub is meaningless outside IPython
401 # displaypub is meaningless outside IPython
402 return
402 return
403
403
404 for output in self.outputs:
404 for output in self.outputs:
405 self._republish_displaypub(output, self.engine_id)
405 self._republish_displaypub(output, self.engine_id)
406
406
407 if self.pyout is not None:
407 if self.pyout is not None:
408 display(self.get())
408 display(self.get())
409
409
410 @check_ready
410 @check_ready
411 def display_outputs(self, groupby="type"):
411 def display_outputs(self, groupby="type"):
412 """republish the outputs of the computation
412 """republish the outputs of the computation
413
413
414 Parameters
414 Parameters
415 ----------
415 ----------
416
416
417 groupby : str [default: type]
417 groupby : str [default: type]
418 if 'type':
418 if 'type':
419 Group outputs by type (show all stdout, then all stderr, etc.):
419 Group outputs by type (show all stdout, then all stderr, etc.):
420
420
421 [stdout:1] foo
421 [stdout:1] foo
422 [stdout:2] foo
422 [stdout:2] foo
423 [stderr:1] bar
423 [stderr:1] bar
424 [stderr:2] bar
424 [stderr:2] bar
425 if 'engine':
425 if 'engine':
426 Display outputs for each engine before moving on to the next:
426 Display outputs for each engine before moving on to the next:
427
427
428 [stdout:1] foo
428 [stdout:1] foo
429 [stderr:1] bar
429 [stderr:1] bar
430 [stdout:2] foo
430 [stdout:2] foo
431 [stderr:2] bar
431 [stderr:2] bar
432
432
433 if 'order':
433 if 'order':
434 Like 'type', but further collate individual displaypub
434 Like 'type', but further collate individual displaypub
435 outputs. This is meant for cases of each command producing
435 outputs. This is meant for cases of each command producing
436 several plots, and you would like to see all of the first
436 several plots, and you would like to see all of the first
437 plots together, then all of the second plots, and so on.
437 plots together, then all of the second plots, and so on.
438 """
438 """
439 # flush iopub, just in case
439 # flush iopub, just in case
440 self._client._flush_iopub(self._client._iopub_socket)
440 self._client._flush_iopub(self._client._iopub_socket)
441 if self._single_result:
441 if self._single_result:
442 self._display_single_result()
442 self._display_single_result()
443 return
443 return
444
444
445 stdouts = [s.rstrip() for s in self.stdout]
445 stdouts = [s.rstrip() for s in self.stdout]
446 stderrs = [s.rstrip() for s in self.stderr]
446 stderrs = [s.rstrip() for s in self.stderr]
447 pyouts = [p for p in self.pyout]
447 pyouts = [p for p in self.pyout]
448 output_lists = self.outputs
448 output_lists = self.outputs
449 results = self.get()
449 results = self.get()
450
450
451 targets = self.engine_id
451 targets = self.engine_id
452
452
453 if groupby == "engine":
453 if groupby == "engine":
454 for eid,stdout,stderr,outputs,r,pyout in zip(
454 for eid,stdout,stderr,outputs,r,pyout in zip(
455 targets, stdouts, stderrs, output_lists, results, pyouts
455 targets, stdouts, stderrs, output_lists, results, pyouts
456 ):
456 ):
457 if stdout:
457 if stdout:
458 print '[stdout:%2i]' % eid, stdout
458 print '[stdout:%2i]' % eid, stdout
459 if stderr:
459 if stderr:
460 print '[stderr:%2i]' % eid, stderr
460 print >> sys.stderr, '[stderr:%2i]' % eid, stderr
461
461
462 try:
462 try:
463 get_ipython()
463 get_ipython()
464 except NameError:
464 except NameError:
465 # displaypub is meaningless outside IPython
465 # displaypub is meaningless outside IPython
466 return
466 return
467
467
468 for output in outputs:
468 for output in outputs:
469 self._republish_displaypub(output, eid)
469 self._republish_displaypub(output, eid)
470
470
471 if pyout is not None:
471 if pyout is not None:
472 display(r)
472 display(r)
473
473
474 elif groupby in ('type', 'order'):
474 elif groupby in ('type', 'order'):
475 # republish stdout:
475 # republish stdout:
476 if any(stdouts):
476 if any(stdouts):
477 for eid,stdout in zip(targets, stdouts):
477 for eid,stdout in zip(targets, stdouts):
478 print '[stdout:%2i]' % eid, stdout
478 print '[stdout:%2i]' % eid, stdout
479
479
480 # republish stderr:
480 # republish stderr:
481 if any(stderrs):
481 if any(stderrs):
482 for eid,stderr in zip(targets, stderrs):
482 for eid,stderr in zip(targets, stderrs):
483 print >> sys.stderr, '[stderr:%2i]' % eid, stderr
483 print >> sys.stderr, '[stderr:%2i]' % eid, stderr
484
484
485 try:
485 try:
486 get_ipython()
486 get_ipython()
487 except NameError:
487 except NameError:
488 # displaypub is meaningless outside IPython
488 # displaypub is meaningless outside IPython
489 return
489 return
490
490
491 if groupby == 'order':
491 if groupby == 'order':
492 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
492 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
493 N = max(len(outputs) for outputs in output_lists)
493 N = max(len(outputs) for outputs in output_lists)
494 for i in range(N):
494 for i in range(N):
495 for eid in targets:
495 for eid in targets:
496 outputs = output_dict[eid]
496 outputs = output_dict[eid]
497 if len(outputs) >= N:
497 if len(outputs) >= N:
498 self._republish_displaypub(outputs[i], eid)
498 self._republish_displaypub(outputs[i], eid)
499 else:
499 else:
500 # republish displaypub output
500 # republish displaypub output
501 for eid,outputs in zip(targets, output_lists):
501 for eid,outputs in zip(targets, output_lists):
502 for output in outputs:
502 for output in outputs:
503 self._republish_displaypub(output, eid)
503 self._republish_displaypub(output, eid)
504
504
505 # finally, add pyout:
505 # finally, add pyout:
506 for eid,r,pyout in zip(targets, results, pyouts):
506 for eid,r,pyout in zip(targets, results, pyouts):
507 if pyout is not None:
507 if pyout is not None:
508 display(r)
508 display(r)
509
509
510 else:
510 else:
511 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
511 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
512
512
513
513
514
514
515
515
516 class AsyncMapResult(AsyncResult):
516 class AsyncMapResult(AsyncResult):
517 """Class for representing results of non-blocking gathers.
517 """Class for representing results of non-blocking gathers.
518
518
519 This will properly reconstruct the gather.
519 This will properly reconstruct the gather.
520
520
521 This class is iterable at any time, and will wait on results as they come.
521 This class is iterable at any time, and will wait on results as they come.
522
522
523 If ordered=False, then the first results to arrive will come first, otherwise
523 If ordered=False, then the first results to arrive will come first, otherwise
524 results will be yielded in the order they were submitted.
524 results will be yielded in the order they were submitted.
525
525
526 """
526 """
527
527
528 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
528 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
529 AsyncResult.__init__(self, client, msg_ids, fname=fname)
529 AsyncResult.__init__(self, client, msg_ids, fname=fname)
530 self._mapObject = mapObject
530 self._mapObject = mapObject
531 self._single_result = False
531 self._single_result = False
532 self.ordered = ordered
532 self.ordered = ordered
533
533
534 def _reconstruct_result(self, res):
534 def _reconstruct_result(self, res):
535 """Perform the gather on the actual results."""
535 """Perform the gather on the actual results."""
536 return self._mapObject.joinPartitions(res)
536 return self._mapObject.joinPartitions(res)
537
537
538 # asynchronous iterator:
538 # asynchronous iterator:
539 def __iter__(self):
539 def __iter__(self):
540 it = self._ordered_iter if self.ordered else self._unordered_iter
540 it = self._ordered_iter if self.ordered else self._unordered_iter
541 for r in it():
541 for r in it():
542 yield r
542 yield r
543
543
    # asynchronous ordered iterator:
    def _ordered_iter(self):
        """iterator for results *as they arrive*, preserving submission order."""
        try:
            # non-blocking check: are all results already in?
            rlist = self.get(0)
        except error.TimeoutError:
            # not done yet: wait for each result individually,
            # in submission order
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                rlist = ar.get()
                try:
                    # each chunk is normally a list of results
                    for r in rlist:
                        yield r
                except TypeError:
                    # flattened, not a list
                    # this could get broken by flattened data that returns iterables
                    # but most calls to map do not expose the `flatten` argument
                    yield rlist
        else:
            # already done: just yield the collected results
            for r in rlist:
                yield r
566
566
567 # asynchronous unordered iterator:
567 # asynchronous unordered iterator:
568 def _unordered_iter(self):
568 def _unordered_iter(self):
569 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
569 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
570 try:
570 try:
571 rlist = self.get(0)
571 rlist = self.get(0)
572 except error.TimeoutError:
572 except error.TimeoutError:
573 pending = set(self.msg_ids)
573 pending = set(self.msg_ids)
574 while pending:
574 while pending:
575 try:
575 try:
576 self._client.wait(pending, 1e-3)
576 self._client.wait(pending, 1e-3)
577 except error.TimeoutError:
577 except error.TimeoutError:
578 # ignore timeout error, because that only means
578 # ignore timeout error, because that only means
579 # *some* jobs are outstanding
579 # *some* jobs are outstanding
580 pass
580 pass
581 # update ready set with those no longer outstanding:
581 # update ready set with those no longer outstanding:
582 ready = pending.difference(self._client.outstanding)
582 ready = pending.difference(self._client.outstanding)
583 # update pending to exclude those that are finished
583 # update pending to exclude those that are finished
584 pending = pending.difference(ready)
584 pending = pending.difference(ready)
585 while ready:
585 while ready:
586 msg_id = ready.pop()
586 msg_id = ready.pop()
587 ar = AsyncResult(self._client, msg_id, self._fname)
587 ar = AsyncResult(self._client, msg_id, self._fname)
588 rlist = ar.get()
588 rlist = ar.get()
589 try:
589 try:
590 for r in rlist:
590 for r in rlist:
591 yield r
591 yield r
592 except TypeError:
592 except TypeError:
593 # flattened, not a list
593 # flattened, not a list
594 # this could get broken by flattened data that returns iterables
594 # this could get broken by flattened data that returns iterables
595 # but most calls to map do not expose the `flatten` argument
595 # but most calls to map do not expose the `flatten` argument
596 yield rlist
596 yield rlist
597 else:
597 else:
598 # already done
598 # already done
599 for r in rlist:
599 for r in rlist:
600 yield r
600 yield r
601
601
602
602
603
603
class AsyncHubResult(AsyncResult):
    """Class to wrap pending results that must be requested from the Hub.

    Note that waiting/polling on these objects requires polling the Hub
    over the network, so use `AsyncHubResult.wait()` sparingly.
    """

    def wait(self, timeout=-1):
        """Wait for the result to complete, polling the Hub as needed.

        Parameters
        ----------
        timeout : float
            seconds to wait before giving up; a negative value means
            wait indefinitely.
        """
        start = time.time()
        if self._ready:
            return
        # jobs the local client is still tracking can be waited on directly,
        # without a round-trip to the Hub
        local_ids = [m for m in self.msg_ids if m in self._client.outstanding]
        local_ready = self._client.wait(local_ids, timeout)
        if local_ready:
            # anything not in the local results cache must be fetched from the Hub
            remote_ids = [m for m in self.msg_ids if m not in self._client.results]
            if not remote_ids:
                self._ready = True
            else:
                rdict = self._client.result_status(remote_ids, status_only=False)
                pending = rdict['pending']
                # poll the Hub until nothing is pending or the deadline passes
                while pending and (timeout < 0 or time.time() < start+timeout):
                    rdict = self._client.result_status(remote_ids, status_only=False)
                    pending = rdict['pending']
                    if pending:
                        time.sleep(0.1)
                if not pending:
                    self._ready = True
        if self._ready:
            try:
                # list comprehension (not bare map/filter) so results is a real
                # list on both Python 2 and 3 -- it is indexed below
                results = [self._client.results.get(m) for m in self.msg_ids]
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                    self._result = self._reconstruct_result(results)
            except Exception as e:
                # `except Exception as e` works on Python 2.6+ and 3.x,
                # unlike the 2.x-only `except Exception, e` form
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                self._metadata = [self._client.metadata.get(m) for m in self.msg_ids]
650
650
651 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] No newline at end of file
651 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
General Comments 0
You need to be logged in to leave comments. Login now