##// END OF EJS Templates
use finite wait_for_outputs
MinRK -
Show More
@@ -1,686 +1,686 b''
1 """AsyncResult objects for the client
1 """AsyncResult objects for the client
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import print_function
18 from __future__ import print_function
19
19
20 import sys
20 import sys
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23
23
24 from zmq import MessageTracker
24 from zmq import MessageTracker
25
25
26 from IPython.core.display import clear_output, display, display_pretty
26 from IPython.core.display import clear_output, display, display_pretty
27 from IPython.external.decorator import decorator
27 from IPython.external.decorator import decorator
28 from IPython.parallel import error
28 from IPython.parallel import error
29
29
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31 # Functions
31 # Functions
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33
33
def _total_seconds(td):
    """Return the duration of timedelta `td` in seconds.

    Uses `td.total_seconds()` when it exists (added in Python 2.7) and
    otherwise computes the value from days, seconds and microseconds.
    """
    try:
        # Python >= 2.7
        return td.total_seconds()
    except AttributeError:
        # Python 2.6: fold days into seconds, then add the microseconds
        whole_seconds = td.seconds + td.days * 24 * 3600
        return 1e-6 * (td.microseconds + whole_seconds * 10**6)
42
42
def _raw_text(s):
    """Pass `s` to `display_pretty` with ``raw=True``."""
    text = s
    display_pretty(text, raw=True)
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Classes
47 # Classes
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 # global empty tracker that's always done:
50 # global empty tracker that's always done:
51 finished_tracker = MessageTracker()
51 finished_tracker = MessageTracker()
52
52
@decorator
def check_ready(f, self, *args, **kwargs):
    """Call `self.wait(0)` to sync state, then run `f` only if the result is ready.

    Raises `error.TimeoutError` when `self._ready` is still false after the
    zero-timeout wait.
    """
    self.wait(0)
    if self._ready:
        return f(self, *args, **kwargs)
    raise error.TimeoutError("result not ready")
60
60
61 class AsyncResult(object):
61 class AsyncResult(object):
62 """Class for representing results of non-blocking calls.
62 """Class for representing results of non-blocking calls.
63
63
64 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
64 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
65 """
65 """
66
66
67 msg_ids = None
67 msg_ids = None
68 _targets = None
68 _targets = None
69 _tracker = None
69 _tracker = None
70 _single_result = False
70 _single_result = False
71
71
def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
    """Record the client, message ids and initial state of a pending call.

    A single string `msg_ids` is wrapped in a one-element list.  When
    `tracker` is None the module-level `finished_tracker` is used.
    """
    if isinstance(msg_ids, basestring):
        # always a list
        msg_ids = [msg_ids]
    self._client = client
    self.msg_ids = msg_ids
    self._fname = fname
    self._targets = targets
    # default to the tracker that is always done
    self._tracker = finished_tracker if tracker is None else tracker
    self._ready = False
    self._success = None
    self._metadata = None
    # exactly one message is a single result unless targets was a list/tuple
    self._single_result = (
        len(msg_ids) == 1 and not isinstance(targets, (list, tuple))
    )
91
91
def __repr__(self):
    """Show the class name plus 'finished' once ready, else the function name."""
    cls_name = self.__class__.__name__
    if not self._ready:
        return "<%s: %s>" % (cls_name, self._fname)
    return "<%s: finished>" % (cls_name)
97
97
98
98
def _reconstruct_result(self, res):
    """Turn the raw result list `res` into the value handed to the caller.

    Returns the first element for a single result, otherwise `res` itself.
    Override me in subclasses for turning a list of results into the
    expected form.
    """
    return res[0] if self._single_result else res
109
109
def get(self, timeout=-1):
    """Return the result when it arrives.

    Waits with `timeout` if the result is not ready yet.  Raises
    `error.TimeoutError` if it is still not ready afterwards, and re-raises
    the stored exception if the call did not succeed.
    """
    if not self.ready():
        self.wait(timeout)

    # guard clauses: not ready -> timeout, failed -> stored exception
    if not self._ready:
        raise error.TimeoutError("Result not ready.")
    if not self._success:
        raise self._exception
    return self._result
128
128
def ready(self):
    """Return whether the call has completed, polling once with `wait(0)` if needed."""
    already_done = self._ready
    if not already_done:
        self.wait(0)
    return self._ready
134
134
def wait(self, timeout=-1):
    """Wait until the result is available or until `timeout` seconds pass.

    `timeout` is passed straight to `self._client.wait`.  Once that reports
    the messages as done, the results and metadata are collected from the
    client, any exception is stored (for `get()` to re-raise) instead of
    being raised here, and outputs are awaited for at most 10 seconds.

    This method always returns None.
    """
    if self._ready:
        return
    self._ready = self._client.wait(self.msg_ids, timeout)
    if not self._ready:
        return
    try:
        # Build real lists: the results are indexed below, which would fail
        # on a lazy map object.
        fetch_result = self._client.results.get
        results = [fetch_result(msg_id) for msg_id in self.msg_ids]
        self._result = results
        if self._single_result:
            r = results[0]
            if isinstance(r, Exception):
                raise r
        else:
            results = error.collect_exceptions(results, self._fname)
        self._result = self._reconstruct_result(results)
    except Exception as e:
        # 'as' form is valid on Python 2.6+ and later versions alike
        self._exception = e
        self._success = False
    else:
        self._success = True
    finally:
        fetch_metadata = self._client.metadata.get
        self._metadata = [fetch_metadata(msg_id) for msg_id in self.msg_ids]
        # finite wait, so missing output messages cannot block forever
        self._wait_for_outputs(10)
162
162
163
163
164
164
def successful(self):
    """Return whether the call completed without raising an exception.

    Will raise ``AssertionError`` if the result is not ready.
    """
    is_done = self.ready()
    assert is_done
    return self._success
172
172
173 #----------------------------------------------------------------
173 #----------------------------------------------------------------
174 # Extra methods not in mp.pool.AsyncResult
174 # Extra methods not in mp.pool.AsyncResult
175 #----------------------------------------------------------------
175 #----------------------------------------------------------------
176
176
def get_dict(self, timeout=-1):
    """Get the results as a dict, keyed by engine_id.

    `timeout` is passed to `get()`.  A single result is wrapped in a list
    so it can be paired with its engine id, and an empty result set gives
    an empty dict.

    Raises ValueError if more than one job ran on the same engine, since
    the results could not all be kept under one key.
    """
    results = self.get(timeout)
    if self._single_result:
        # get() returned the bare value; zip() below needs a sequence
        results = [results]
    engine_ids = [md['engine_id'] for md in self._metadata]
    if not engine_ids:
        # nothing ran, nothing to key
        return {}

    # count jobs per engine in one pass
    counts = {}
    for eid in engine_ids:
        counts[eid] = counts.get(eid, 0) + 1
    maxcount = max(counts.values())
    if maxcount > 1:
        # report the busiest engine (the last one listed when several tie)
        busiest = [eid for eid in engine_ids if counts[eid] == maxcount][-1]
        raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
            maxcount, busiest))

    return dict(zip(engine_ids, results))
192
192
@property
def result(self):
    """result property: returns `get()` called with its default timeout (-1, not 0)."""
    value = self.get()
    return value

# abbreviated alias:
r = result
200
200
@property
@check_ready
def metadata(self):
    """property for accessing execution metadata.

    Gives the first metadata entry for a single result, else the whole list.
    """
    md_list = self._metadata
    return md_list[0] if self._single_result else md_list
209
209
@property
def result_dict(self):
    """result property as a dict, i.e. `get_dict()` with its default timeout."""
    by_engine = self.get_dict()
    return by_engine
214
214
def __dict__(self):
    """Return `get_dict(0)`: the per-engine result dict, using a timeout of 0."""
    # NOTE(review): defining a method named __dict__ shadows the usual
    # instance-dict attribute -- confirm this is intended.
    by_engine = self.get_dict(0)
    return by_engine
217
217
def abort(self):
    """abort my tasks.

    Asserts that the result is not ready yet, then returns the client's
    blocking `abort` of my message ids on my targets.
    """
    still_pending = not self.ready()
    assert still_pending, "Can't abort, I am already done!"
    client = self._client
    return client.abort(self.msg_ids, targets=self._targets, block=True)
222
222
@property
def sent(self):
    """check whether my messages have been sent (the tracker's `done` flag)."""
    tracker = self._tracker
    return tracker.done
227
227
def wait_for_send(self, timeout=-1):
    """wait for pyzmq send to complete.

    This is necessary when sending arrays that you intend to edit in-place.
    `timeout` is in seconds, and will raise TimeoutError if it is reached
    before the send completes.  Returns whatever the tracker's `wait` returns.
    """
    tracker = self._tracker
    return tracker.wait(timeout)
236
236
237 #-------------------------------------
237 #-------------------------------------
238 # dict-access
238 # dict-access
239 #-------------------------------------
239 #-------------------------------------
240
240
@check_ready
def __getitem__(self, key):
    """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.

    Result values pass through `error.collect_exceptions` first.  Any other
    key type raises TypeError.
    """
    if isinstance(key, int):
        collected = error.collect_exceptions([self._result[key]], self._fname)
        return collected[0]
    if isinstance(key, slice):
        return error.collect_exceptions(self._result[key], self._fname)
    if isinstance(key, basestring):
        # metadata lookup: one value per message
        values = [md[key] for md in self._metadata]
        return values[0] if self._single_result else values
    raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
257
257
def __getattr__(self, key):
    """getattr maps to getitem for convenient attr access to metadata.

    A TimeoutError or KeyError from the lookup becomes AttributeError.
    """
    try:
        value = self.__getitem__(key)
    except (error.TimeoutError, KeyError):
        raise AttributeError("%r object has no attribute %r"%(
            self.__class__.__name__, key))
    return value
265
265
266 # asynchronous iterator:
266 # asynchronous iterator:
def __iter__(self):
    """Yield results one at a time, waiting on each if they are not all done.

    Raises TypeError for a single-result AsyncResult.
    """
    if self._single_result:
        raise TypeError("AsyncResults with a single result are not iterable.")
    try:
        finished = self.get(0)
    except error.TimeoutError:
        # not all done: wrap each message in its own AsyncResult and wait on it
        for one_id in self.msg_ids:
            pending = AsyncResult(self._client, one_id, self._fname)
            yield pending.get()
    else:
        # already done
        for item in finished:
            yield item
281
281
def __len__(self):
    """Return the number of message ids."""
    ids = self.msg_ids
    return len(ids)
284
284
285 #-------------------------------------
285 #-------------------------------------
286 # Sugar methods and attributes
286 # Sugar methods and attributes
287 #-------------------------------------
287 #-------------------------------------
288
288
def timedelta(self, start, end, start_key=min, end_key=max):
    """compute the difference between two sets of timestamps

    The default behavior is to use the earliest of the first
    and the latest of the second list, but this can be changed
    by passing different `start_key` / `end_key` callables.

    Parameters
    ----------

    start : one or more datetime objects (e.g. ar.submitted)
    end : one or more datetime objects (e.g. ar.received)
    start_key : callable
        Function to call on `start` to extract the relevant
        entry [default: min]
    end_key : callable
        Function to call on `end` to extract the relevant
        entry [default: max]

    Returns
    -------

    dt : float
        The time elapsed (in seconds) between the two selected timestamps.
    """
    # A bare datetime (single-result AsyncResults, where ar.stamp is a single
    # object, not a list) is used as-is; anything else is reduced by its key.
    first = start if isinstance(start, datetime) else start_key(start)
    last = end if isinstance(end, datetime) else end_key(end)
    return _total_seconds(last - first)
323
323
@property
def progress(self):
    """the number of tasks which have been completed at this point.

    Counted as my length minus those of my message ids that the client
    still lists as outstanding.

    Fractional progress would be given by 1.0 * ar.progress / len(ar)
    """
    self.wait(0)
    total = len(self)
    still_out = set(self.msg_ids).intersection(self._client.outstanding)
    return total - len(still_out)
332
332
@property
def elapsed(self):
    """elapsed time since initial submission

    Returns `wall_time` once ready; otherwise the seconds between now and
    the earliest 'submitted' stamp found in the client's metadata.
    """
    if self.ready():
        return self.wall_time

    now = earliest = datetime.now()
    for msg_id in self.msg_ids:
        if msg_id not in self._client.metadata:
            continue
        stamp = self._client.metadata[msg_id]['submitted']
        # skip empty stamps; keep the oldest one seen
        if stamp and stamp < earliest:
            earliest = stamp
    return _total_seconds(now-earliest)
346
346
@property
@check_ready
def serial_time(self):
    """serial computation time of a parallel calculation

    Computed as the sum of (completed-started) of each task
    """
    return sum(
        _total_seconds(md['completed'] - md['started'])
        for md in self._metadata
    )
358
358
@property
@check_ready
def wall_time(self):
    """actual computation time of a parallel calculation

    Computed as the time between the latest `received` stamp
    and the earliest `submitted`.

    Only reliable if Client was spinning/waiting when the task finished, because
    the `received` timestamp is created when a result is pulled off of the zmq queue,
    which happens as a result of `client.spin()`.

    For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.

    """
    submitted_stamps = self.submitted
    received_stamps = self.received
    return self.timedelta(submitted_stamps, received_stamps)
375
375
def wait_interactive(self, interval=1., timeout=None):
    """interactive wait, printing progress at regular intervals

    Loops until ready, or until more than `timeout` seconds have passed
    when `timeout` is not None, waiting `interval` seconds per round.
    """
    total = len(self)
    started = time.time()
    while True:
        if self.ready():
            break
        if timeout is not None and not (time.time() - started <= timeout):
            break
        self.wait(interval)
        clear_output()
        print("%4i/%i tasks finished after %4i s" % (self.progress, total, self.elapsed), end="")
        sys.stdout.flush()
    print()
    print("done")
387
387
def _republish_displaypub(self, content, eid):
    """republish individual displaypub content dicts

    Does nothing when `get_ipython` is not defined.  Otherwise tags the
    content's metadata with the engine id and publishes it.
    """
    try:
        shell = get_ipython()
    except NameError:
        # displaypub is meaningless outside IPython
        return
    metadata = content['metadata'] or {}
    metadata['engine'] = eid
    shell.display_pub.publish(content['source'], content['data'], metadata)
398
398
def _display_stream(self, text, prefix='', file=None):
    """Print `text` to `file` (default sys.stdout), preceded by `prefix`.

    Empty text prints nothing.  A newline is appended only when the text
    does not already end with one.  Multi-line text gets the prefix on a
    line of its own, unless the text already starts with a newline.
    """
    if not text:
        # nothing to display
        return
    out = sys.stdout if file is None else file
    ends_with_newline = text.endswith('\n')
    terminator = '' if ends_with_newline else '\n'

    # a trailing newline alone does not make the text multi-line
    multiline = text.count('\n') > int(ends_with_newline)
    if prefix and multiline and not text.startswith('\n'):
        prefix += '\n'
    print("%s%s" % (prefix, text), file=out, end=terminator)
411
411
412
412
def _display_single_result(self):
    """Show my stdout and stderr, then (inside IPython only) my outputs and pyout."""
    self._display_stream(self.stdout)
    self._display_stream(self.stderr, file=sys.stderr)

    try:
        get_ipython()
    except NameError:
        # displaypub is meaningless outside IPython
        return

    for item in self.outputs:
        self._republish_displaypub(item, self.engine_id)

    if self.pyout is None:
        return
    display(self.get())
428
428
def _wait_for_outputs(self, timeout=-1):
    """Poll until every metadata entry has a true 'outputs_ready' value.

    Returns at once when the call did not succeed.  Each round sleeps 10ms
    and flushes the client's iopub socket.  A negative `timeout` means no
    time limit; otherwise polling stops once `timeout` seconds have passed.
    """
    if not self._success:
        # don't wait on errors
        return
    started = time.time()
    while True:
        if all(md['outputs_ready'] for md in self._metadata):
            break
        time.sleep(0.01)
        self._client._flush_iopub(self._client._iopub_socket)
        if timeout >= 0 and time.time() > started + timeout:
            break
441
441
@check_ready
def display_outputs(self, groupby="type"):
    """republish the outputs of the computation

    stdout and stderr are always shown.  displaypub outputs and pyout are
    only shown when `get_ipython` is defined.

    Parameters
    ----------

    groupby : str [default: type]
        if 'type':
            Group outputs by type (show all stdout, then all stderr, etc.):

            [stdout:1] foo
            [stdout:2] foo
            [stderr:1] bar
            [stderr:2] bar
        if 'engine':
            Display outputs for each engine before moving on to the next:

            [stdout:1] foo
            [stderr:1] bar
            [stdout:2] foo
            [stderr:2] bar

        if 'order':
            Like 'type', but further collate individual displaypub
            outputs.  This is meant for cases of each command producing
            several plots, and you would like to see all of the first
            plots together, then all of the second plots, and so on.

    Raises
    ------

    ValueError
        If `groupby` is not one of 'type', 'engine' or 'order'.
    """
    if self._single_result:
        self._display_single_result()
        return

    stdouts = self.stdout
    stderrs = self.stderr
    pyouts = self.pyout
    output_lists = self.outputs
    results = self.get()

    targets = self.engine_id

    if groupby not in ('type', 'engine', 'order'):
        raise ValueError("groupby must be one of 'type', 'engine', 'order', not %r" % groupby)

    # displaypub is meaningless outside IPython; find out once
    try:
        get_ipython()
    except NameError:
        in_ipython = False
    else:
        in_ipython = True

    if groupby == "engine":
        for eid, stdout, stderr, outputs, r, pyout in zip(
                targets, stdouts, stderrs, output_lists, results, pyouts
            ):
            self._display_stream(stdout, '[stdout:%i] ' % eid)
            self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)

            if not in_ipython:
                # keep looping so the remaining engines' streams still show
                continue

            if outputs or pyout is not None:
                _raw_text('[output:%i]' % eid)

            for output in outputs:
                self._republish_displaypub(output, eid)

            if pyout is not None:
                display(r)
        return

    # groupby is 'type' or 'order'
    # republish stdout:
    for eid, stdout in zip(targets, stdouts):
        self._display_stream(stdout, '[stdout:%i] ' % eid)

    # republish stderr:
    for eid, stderr in zip(targets, stderrs):
        self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)

    if not in_ipython:
        return

    if groupby == 'order':
        output_dict = dict(zip(targets, output_lists))
        # 'or [0]' keeps max() from failing when there are no output lists
        N = max([len(outputs) for outputs in output_lists] or [0])
        for i in range(N):
            for eid in targets:
                outputs = output_dict[eid]
                # engines with fewer outputs still show the ones they have
                if len(outputs) > i:
                    _raw_text('[output:%i]' % eid)
                    self._republish_displaypub(outputs[i], eid)
    else:
        # republish displaypub output
        for eid, outputs in zip(targets, output_lists):
            if outputs:
                _raw_text('[output:%i]' % eid)
            for output in outputs:
                self._republish_displaypub(output, eid)

    # finally, add pyout:
    for eid, r, pyout in zip(targets, results, pyouts):
        if pyout is not None:
            display(r)
544
544
545
545
546
546
547
547
class AsyncMapResult(AsyncResult):
    """Class for representing results of non-blocking gathers.

    This will properly reconstruct the gather.

    This class is iterable at any time, and will wait on results as they come.

    If ordered=False, then the first results to arrive will come first, otherwise
    results will be yielded in the order they were submitted.

    """

    def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
        """Store the map object used for the gather and the iteration order.

        `client`, `msg_ids` and `fname` are handed to `AsyncResult.__init__`
        unchanged; `mapObject` must provide `joinPartitions`, which is used
        to reassemble the per-task results; `ordered` selects which iterator
        `__iter__` uses.
        """
        AsyncResult.__init__(self, client, msg_ids, fname=fname)
        self._mapObject = mapObject
        # never treat the result as a lone value
        # (presumably consulted by AsyncResult when collecting results)
        self._single_result = False
        self.ordered = ordered

    def _reconstruct_result(self, res):
        """Perform the gather on the actual results."""
        return self._mapObject.joinPartitions(res)

    def _iter_partition(self, msg_id):
        """Fetch the result of a single task and yield its elements.

        A fresh AsyncResult is built for `msg_id` and its `get()` is called
        with no timeout argument. If the value it returns turns out not to
        be iterable, the value itself is yielded as a single element.
        """
        ar = AsyncResult(self._client, msg_id, self._fname)
        rlist = ar.get()
        try:
            for r in rlist:
                yield r
        except TypeError:
            # flattened, not a list
            # this could get broken by flattened data that returns iterables
            # but most calls to map do not expose the `flatten` argument
            yield rlist

    # asynchronous iterator:
    def __iter__(self):
        """Iterate over results, ordered or first-come depending on `self.ordered`."""
        it = self._ordered_iter if self.ordered else self._unordered_iter
        for r in it():
            yield r

    # asynchronous ordered iterator:
    def _ordered_iter(self):
        """iterator for results *as they arrive*, preserving submission order."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # not everything is done yet; fall through and wait per task.
            # The waiting is done outside this handler so that errors raised
            # by individual tasks are not raised while the timeout is still
            # being handled.
            pass
        else:
            # already done
            for r in rlist:
                yield r
            return

        # wait for each result individually, in submission order
        for msg_id in self.msg_ids:
            for r in self._iter_partition(msg_id):
                yield r

    # asynchronous unordered iterator:
    def _unordered_iter(self):
        """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # not everything is done yet; fall through and poll below
            pass
        else:
            # already done
            for r in rlist:
                yield r
            return

        pending = set(self.msg_ids)
        while pending:
            try:
                self._client.wait(pending, 1e-3)
            except error.TimeoutError:
                # ignore timeout error, because that only means
                # *some* jobs are outstanding
                pass
            # update ready set with those no longer outstanding:
            ready = pending.difference(self._client.outstanding)
            # update pending to exclude those that are finished
            pending = pending.difference(ready)
            while ready:
                msg_id = ready.pop()
                for r in self._iter_partition(msg_id):
                    yield r
633
633
634
634
class AsyncHubResult(AsyncResult):
    """Class to wrap pending results that must be requested from the Hub.

    Note that waiting/polling on these objects requires polling the Hub over the network,
    so use `AsyncHubResult.wait()` sparingly.
    """

    def _wait_for_outputs(self, timeout=None):
        """no-op, because HubResults are never incomplete"""
        return

    def wait(self, timeout=-1):
        """wait for result to complete.

        First waits (via the client) on those msg_ids the client itself still
        lists as outstanding, then asks the Hub for the status of any msg_ids
        the client holds no result for, re-polling until none are pending or
        the timeout expires.

        Parameters
        ----------
        timeout : float
            Seconds to wait, measured from the start of this call. A negative
            value (the default) keeps polling the Hub until nothing is pending.

        Always returns None. On completion `_result`, `_success`,
        `_exception` and `_metadata` are set on the instance; an exception
        found in the results is stored in `_exception` rather than raised.
        """
        start = time.time()
        if self._ready:
            return
        # real lists (not lazy iterators) so the truth tests and indexing
        # below behave the same on Python 2 and Python 3
        local_ids = [msg_id for msg_id in self.msg_ids
                     if msg_id in self._client.outstanding]
        local_ready = self._client.wait(local_ids, timeout)
        if local_ready:
            remote_ids = [msg_id for msg_id in self.msg_ids
                          if msg_id not in self._client.results]
            if not remote_ids:
                self._ready = True
            else:
                rdict = self._client.result_status(remote_ids, status_only=False)
                pending = rdict['pending']
                while pending and (timeout < 0 or time.time() < start+timeout):
                    rdict = self._client.result_status(remote_ids, status_only=False)
                    pending = rdict['pending']
                    if pending:
                        # avoid hammering the Hub between polls
                        time.sleep(0.1)
                if not pending:
                    self._ready = True
        if self._ready:
            try:
                results = [self._client.results.get(msg_id) for msg_id in self.msg_ids]
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                self._result = self._reconstruct_result(results)
            except Exception as e:
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                # metadata is recorded whether or not the results held an error
                self._metadata = [self._client.metadata.get(msg_id) for msg_id in self.msg_ids]
685
685
# names exported by `from <this module> import *`
__all__ = [
    'AsyncResult',
    'AsyncMapResult',
    'AsyncHubResult',
]
General Comments 0
You need to be logged in to leave comments. Login now