##// END OF EJS Templates
fix logic for infinite wait cutoff when waiting for outputs after receiving result
MinRK -
Show More
@@ -1,707 +1,708 b''
1 """AsyncResult objects for the client
1 """AsyncResult objects for the client
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import print_function
18 from __future__ import print_function
19
19
20 import sys
20 import sys
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23
23
24 from zmq import MessageTracker
24 from zmq import MessageTracker
25
25
26 from IPython.core.display import clear_output, display, display_pretty
26 from IPython.core.display import clear_output, display, display_pretty
27 from IPython.external.decorator import decorator
27 from IPython.external.decorator import decorator
28 from IPython.parallel import error
28 from IPython.parallel import error
29
29
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31 # Functions
31 # Functions
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33
33
def _total_seconds(td):
    """Return the duration of the timedelta `td` in seconds, as a float.

    ``timedelta.total_seconds`` was added in Python 2.7; on objects that
    lack it the value is computed from the ``days``, ``seconds`` and
    ``microseconds`` attributes instead.
    """
    try:
        # Python >= 2.7
        return td.total_seconds()
    except AttributeError:
        # Python 2.6: do the arithmetic in integer microseconds first,
        # then scale to seconds once
        return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
42
42
def _raw_text(s):
    """Display `s` through ``display_pretty`` with ``raw=True``."""
    display_pretty(s, raw=True)
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Classes
47 # Classes
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
# global empty tracker that's always done; used as the default tracker
# for AsyncResults created without one
finished_tracker = MessageTracker()
52
52
@decorator
def check_ready(f, self, *args, **kwargs):
    """Refresh the result state with a non-blocking wait, then call `f`.

    ``self.wait(0)`` is called first so that ``self._ready`` is current.
    If the result is still not ready, ``error.TimeoutError`` is raised
    and `f` is never called; otherwise the return value of
    ``f(self, *args, **kwargs)`` is returned.
    """
    self.wait(0)
    if not self._ready:
        raise error.TimeoutError("result not ready")
    return f(self, *args, **kwargs)
60
60
class AsyncResult(object):
    """Class for representing results of non-blocking calls.

    Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
    """

    msg_ids = None
    _targets = None
    _tracker = None
    _single_result = False

    def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
        """Create a handle for the results of one or more messages.

        Parameters
        ----------

        client : the client object used to wait for and look up results
        msg_ids : str or list of str
            A single msg_id is wrapped in a list.
        fname : str
            Name shown in the repr and passed to ``error.collect_exceptions``.
        targets : stored as-is and passed to ``client.abort``
            If there is exactly one msg_id and `targets` is not a list or
            tuple, the result is treated as a single (non-list) result.
        tracker : tracker whose ``done``/``wait`` report send completion
            [default: the module-level `finished_tracker`]
        """
        if isinstance(msg_ids, basestring):
            # always a list
            msg_ids = [msg_ids]
        if tracker is None:
            # default to always done
            tracker = finished_tracker
        self._client = client
        self.msg_ids = msg_ids
        self._fname = fname
        self._targets = targets
        self._tracker = tracker
        self._ready = False
        self._outputs_ready = False
        self._success = None
        self._metadata = [self._client.metadata.get(msg_id) for msg_id in self.msg_ids]
        if len(msg_ids) == 1:
            self._single_result = not isinstance(targets, (list, tuple))
        else:
            self._single_result = False

    def __repr__(self):
        if self._ready:
            return "<%s: finished>" % (self.__class__.__name__)
        else:
            return "<%s: %s>" % (self.__class__.__name__, self._fname)

    def _reconstruct_result(self, res):
        """Reconstruct our result from actual result list (always a list)

        Override me in subclasses for turning a list of results
        into the expected form.
        """
        if self._single_result:
            return res[0]
        else:
            return res

    def get(self, timeout=-1):
        """Return the result when it arrives.

        If `timeout` is not ``None`` and the result does not arrive within
        `timeout` seconds then ``TimeoutError`` is raised. If the
        remote call raised an exception then that exception will be reraised
        by get() inside a `RemoteError`.
        """
        if not self.ready():
            self.wait(timeout)

        if self._ready:
            if self._success:
                return self._result
            else:
                raise self._exception
        else:
            raise error.TimeoutError("Result not ready.")

    def _check_ready(self):
        """Raise ``error.TimeoutError`` unless the result is ready."""
        if not self.ready():
            raise error.TimeoutError("Result not ready.")

    def ready(self):
        """Return whether the call has completed."""
        if not self._ready:
            self.wait(0)
        elif not self._outputs_ready:
            # result is in, but outputs may still be arriving: poll once
            self._wait_for_outputs(0)

        return self._ready

    def wait(self, timeout=-1):
        """Wait until the result is available or until `timeout` seconds pass.

        This method always returns None.
        """
        if self._ready:
            self._wait_for_outputs(timeout)
            return
        self._ready = self._client.wait(self.msg_ids, timeout)
        if self._ready:
            try:
                # a real list (not a lazy map object) because it is indexed below
                results = [self._client.results.get(msg_id) for msg_id in self.msg_ids]
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                self._result = self._reconstruct_result(results)
            except Exception as e:
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                if timeout is None or timeout < 0:
                    # cutoff infinite wait at 10s: the result itself has
                    # arrived, so never block forever on trailing outputs
                    timeout = 10
                self._wait_for_outputs(timeout)

    def successful(self):
        """Return whether the call completed without raising an exception.

        Will raise ``AssertionError`` if the result is not ready.
        """
        assert self.ready()
        return self._success

    #----------------------------------------------------------------
    # Extra methods not in mp.pool.AsyncResult
    #----------------------------------------------------------------

    def get_dict(self, timeout=-1):
        """Get the results as a dict, keyed by engine_id.

        timeout behavior is described in `get()`.

        Raises ``ValueError`` if more than one job ran on the same engine,
        since the dict could then hold only one of their results.
        """

        results = self.get(timeout)
        engine_ids = [md['engine_id'] for md in self._metadata]
        # sort ids by how often they occur, so the most frequent one is last
        bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
        maxcount = bycount.count(bycount[-1])
        if maxcount > 1:
            raise ValueError("Cannot build dict, %i jobs ran on engine #%i" % (
                    maxcount, bycount[-1]))

        return dict(zip(engine_ids, results))

    @property
    def result(self):
        """result property wrapper for `get(timeout=-1)`."""
        return self.get()

    # abbreviated alias:
    r = result

    @property
    def metadata(self):
        """property for accessing execution metadata."""
        if self._single_result:
            return self._metadata[0]
        else:
            return self._metadata

    @property
    def result_dict(self):
        """result property as a dict."""
        return self.get_dict()

    # NOTE(review): this defines a *method* named __dict__ on the class;
    # kept unchanged for backward compatibility -- confirm it is intended.
    def __dict__(self):
        return self.get_dict(0)

    def abort(self):
        """abort my tasks."""
        assert not self.ready(), "Can't abort, I am already done!"
        return self._client.abort(self.msg_ids, targets=self._targets, block=True)

    @property
    def sent(self):
        """check whether my messages have been sent."""
        return self._tracker.done

    def wait_for_send(self, timeout=-1):
        """wait for pyzmq send to complete.

        This is necessary when sending arrays that you intend to edit in-place.
        `timeout` is in seconds, and will raise TimeoutError if it is reached
        before the send completes.
        """
        return self._tracker.wait(timeout)

    #-------------------------------------
    # dict-access
    #-------------------------------------

    def __getitem__(self, key):
        """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
        """
        if isinstance(key, int):
            self._check_ready()
            return error.collect_exceptions([self._result[key]], self._fname)[0]
        elif isinstance(key, slice):
            self._check_ready()
            return error.collect_exceptions(self._result[key], self._fname)
        elif isinstance(key, basestring):
            # metadata proxy *does not* require that results are done
            self.wait(0)
            values = [md[key] for md in self._metadata]
            if self._single_result:
                return values[0]
            else:
                return values
        else:
            raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'" % type(key))

    def __getattr__(self, key):
        """getattr maps to getitem for convenient attr access to metadata."""
        try:
            return self.__getitem__(key)
        except (error.TimeoutError, KeyError):
            raise AttributeError("%r object has no attribute %r" % (
                    self.__class__.__name__, key))

    # asynchronous iterator:
    def __iter__(self):
        """Yield results one at a time, waiting for each if necessary."""
        if self._single_result:
            raise TypeError("AsyncResults with a single result are not iterable.")
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                yield ar.get()
        else:
            # already done
            for r in rlist:
                yield r

    def __len__(self):
        return len(self.msg_ids)

    #-------------------------------------
    # Sugar methods and attributes
    #-------------------------------------

    def timedelta(self, start, end, start_key=min, end_key=max):
        """compute the difference between two sets of timestamps

        The default behavior is to use the earliest of the first
        and the latest of the second list, but this can be changed
        by passing a different `start_key` or `end_key`.

        Parameters
        ----------

        start : one or more datetime objects (e.g. ar.submitted)
        end : one or more datetime objects (e.g. ar.received)
        start_key : callable
            Function to call on `start` to extract the relevant
            entry [default: min]
        end_key : callable
            Function to call on `end` to extract the relevant
            entry [default: max]

        Returns
        -------

        dt : float
            The time elapsed (in seconds) between the two selected timestamps.
        """
        if not isinstance(start, datetime):
            # handle single_result AsyncResults, where ar.stamp is single object,
            # not a list
            start = start_key(start)
        if not isinstance(end, datetime):
            # handle single_result AsyncResults, where ar.stamp is single object,
            # not a list
            end = end_key(end)
        return _total_seconds(end - start)

    @property
    def progress(self):
        """the number of tasks which have been completed at this point.

        Fractional progress would be given by 1.0 * ar.progress / len(ar)
        """
        self.wait(0)
        return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))

    @property
    def elapsed(self):
        """elapsed time since initial submission"""
        if self.ready():
            return self.wall_time

        # not done yet: measure from the earliest known 'submitted' stamp
        now = submitted = datetime.now()
        for msg_id in self.msg_ids:
            if msg_id in self._client.metadata:
                stamp = self._client.metadata[msg_id]['submitted']
                if stamp and stamp < submitted:
                    submitted = stamp
        return _total_seconds(now - submitted)

    @property
    @check_ready
    def serial_time(self):
        """serial computation time of a parallel calculation

        Computed as the sum of (completed-started) of each task
        """
        t = 0
        for md in self._metadata:
            t += _total_seconds(md['completed'] - md['started'])
        return t

    @property
    @check_ready
    def wall_time(self):
        """actual computation time of a parallel calculation

        Computed as the time between the latest `received` stamp
        and the earliest `submitted`.

        Only reliable if Client was spinning/waiting when the task finished, because
        the `received` timestamp is created when a result is pulled off of the zmq queue,
        which happens as a result of `client.spin()`.

        For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.

        """
        return self.timedelta(self.submitted, self.received)

    def wait_interactive(self, interval=1., timeout=-1):
        """interactive wait, printing progress at regular intervals"""
        if timeout is None:
            timeout = -1
        N = len(self)
        tic = time.time()
        while not self.ready() and (timeout < 0 or time.time() - tic <= timeout):
            self.wait(interval)
            clear_output()
            print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
            sys.stdout.flush()
        print()
        print("done")

    def _republish_displaypub(self, content, eid):
        """republish individual displaypub content dicts"""
        try:
            ip = get_ipython()
        except NameError:
            # displaypub is meaningless outside IPython
            return
        md = content['metadata'] or {}
        md['engine'] = eid
        ip.display_pub.publish(content['source'], content['data'], md)

    def _display_stream(self, text, prefix='', file=None):
        """Print `text` to `file` (default ``sys.stdout``), preceded by `prefix`.

        Nothing is printed for empty text. A trailing newline is added only
        if `text` does not already end with one, and a non-empty prefix is
        put on its own line when `text` spans several lines.
        """
        if not text:
            # nothing to display
            return
        if file is None:
            file = sys.stdout
        end = '' if text.endswith('\n') else '\n'

        # a single trailing newline does not make the text multi-line
        multiline = text.count('\n') > int(text.endswith('\n'))
        if prefix and multiline and not text.startswith('\n'):
            prefix = prefix + '\n'
        print("%s%s" % (prefix, text), file=file, end=end)

    def _display_single_result(self):
        """Republish stdout, stderr, displaypub outputs and pyout of a single result."""
        self._display_stream(self.stdout)
        self._display_stream(self.stderr, file=sys.stderr)

        try:
            get_ipython()
        except NameError:
            # displaypub is meaningless outside IPython
            return

        for output in self.outputs:
            self._republish_displaypub(output, self.engine_id)

        if self.pyout is not None:
            display(self.get())

    def _wait_for_outputs(self, timeout=-1):
        """wait for the 'status=idle' message that indicates we have all outputs

        Polls by flushing the client's iopub socket until every metadata
        entry has ``outputs_ready`` set, or until `timeout` seconds have
        passed. A negative or ``None`` timeout waits indefinitely.
        """
        if self._outputs_ready or not self._success:
            # don't wait on errors
            return

        # cast None to -1 for infinite timeout
        if timeout is None:
            timeout = -1

        tic = time.time()
        self._client._flush_iopub(self._client._iopub_socket)
        self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
        while not self._outputs_ready:
            time.sleep(0.01)
            self._client._flush_iopub(self._client._iopub_socket)
            self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
            if timeout >= 0 and time.time() > tic + timeout:
                break

    @check_ready
    def display_outputs(self, groupby="type"):
        """republish the outputs of the computation

        Parameters
        ----------

        groupby : str [default: type]
            if 'type':
                Group outputs by type (show all stdout, then all stderr, etc.):

                [stdout:1] foo
                [stdout:2] foo
                [stderr:1] bar
                [stderr:2] bar
            if 'engine':
                Display outputs for each engine before moving on to the next:

                [stdout:1] foo
                [stderr:1] bar
                [stdout:2] foo
                [stderr:2] bar

            if 'order':
                Like 'type', but further collate individual displaypub
                outputs.  This is meant for cases of each command producing
                several plots, and you would like to see all of the first
                plots together, then all of the second plots, and so on.

        Raises ``ValueError`` for any other `groupby` value.
        """
        if self._single_result:
            self._display_single_result()
            return

        stdouts = self.stdout
        stderrs = self.stderr
        pyouts = self.pyout
        output_lists = self.outputs
        results = self.get()

        targets = self.engine_id

        # displaypub is meaningless outside IPython; check once up front so
        # that plain stdout/stderr are still shown for *every* engine
        try:
            get_ipython()
        except NameError:
            in_ipython = False
        else:
            in_ipython = True

        if groupby == "engine":
            for eid, stdout, stderr, outputs, r, pyout in zip(
                    targets, stdouts, stderrs, output_lists, results, pyouts
                ):
                self._display_stream(stdout, '[stdout:%i] ' % eid)
                self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)

                if not in_ipython:
                    # skip rich output, but keep going with the next engine
                    continue

                if outputs or pyout is not None:
                    _raw_text('[output:%i]' % eid)

                for output in outputs:
                    self._republish_displaypub(output, eid)

                if pyout is not None:
                    display(r)

        elif groupby in ('type', 'order'):
            # republish stdout:
            for eid, stdout in zip(targets, stdouts):
                self._display_stream(stdout, '[stdout:%i] ' % eid)

            # republish stderr:
            for eid, stderr in zip(targets, stderrs):
                self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)

            if not in_ipython:
                return

            if groupby == 'order':
                output_dict = dict((eid, outputs) for eid, outputs in zip(targets, output_lists))
                # longest output list decides how many rounds there are;
                # the `or [0]` guards against max() of an empty sequence
                N = max([len(outputs) for outputs in output_lists] or [0])
                for i in range(N):
                    for eid in targets:
                        outputs = output_dict[eid]
                        # publish the i-th output of every engine that has one
                        if len(outputs) > i:
                            _raw_text('[output:%i]' % eid)
                            self._republish_displaypub(outputs[i], eid)
            else:
                # republish displaypub output
                for eid, outputs in zip(targets, output_lists):
                    if outputs:
                        _raw_text('[output:%i]' % eid)
                    for output in outputs:
                        self._republish_displaypub(output, eid)

            # finally, add pyout:
            for eid, r, pyout in zip(targets, results, pyouts):
                if pyout is not None:
                    display(r)

        else:
            raise ValueError("groupby must be one of 'type', 'engine', 'order', not %r" % groupby)
565
566
566
567
567
568
568
569
class AsyncMapResult(AsyncResult):
    """Class for representing results of non-blocking gathers.

    This will properly reconstruct the gather.

    This class is iterable at any time, and will wait on results as they come.

    If ordered=False, then the first results to arrive will come first, otherwise
    results will be yielded in the order they were submitted.

    """

    def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
        """Store the map object used for the gather and the iteration order flag.

        `client`, `msg_ids` and `fname` are forwarded to `AsyncResult.__init__`.
        `mapObject` must provide `joinPartitions`, which is used to reassemble
        the partial results.  `ordered` selects which iterator `__iter__` uses.
        """
        AsyncResult.__init__(self, client, msg_ids, fname=fname)
        self._mapObject = mapObject
        # a map result is always treated as a collection of partitions
        self._single_result = False
        self.ordered = ordered

    def _reconstruct_result(self, res):
        """Perform the gather on the actual results."""
        return self._mapObject.joinPartitions(res)

    def _iter_partition(self, msg_id):
        """Wait for the result of a single message and yield its elements.

        If the result is not iterable (flattened, not a list), the result
        itself is yielded as the only element.
        """
        ar = AsyncResult(self._client, msg_id, self._fname)
        rlist = ar.get()
        try:
            # Check iterability up front, so that a TypeError raised while
            # the elements are being consumed is not mistaken for
            # "not a list" (which would yield `rlist` a second time).
            items = iter(rlist)
        except TypeError:
            # flattened, not a list
            # this could get broken by flattened data that returns iterables
            # but most calls to map do not expose the `flatten` argument
            items = iter((rlist,))
        for r in items:
            yield r

    # asynchronous iterator:
    def __iter__(self):
        """Iterate over results, ordered or unordered according to `self.ordered`."""
        it = self._ordered_iter if self.ordered else self._unordered_iter
        for r in it():
            yield r

    # asynchronous ordered iterator:
    def _ordered_iter(self):
        """iterator for results *as they arrive*, preserving submission order."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # not everything is done yet:
            # wait for each result individually, in submission order
            for msg_id in self.msg_ids:
                for r in self._iter_partition(msg_id):
                    yield r
        else:
            # already done
            for r in rlist:
                yield r

    # asynchronous unordered iterator:
    def _unordered_iter(self):
        """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            pending = set(self.msg_ids)
            while pending:
                try:
                    self._client.wait(pending, 1e-3)
                except error.TimeoutError:
                    # ignore timeout error, because that only means
                    # *some* jobs are outstanding
                    pass
                # update ready set with those no longer outstanding:
                ready = pending.difference(self._client.outstanding)
                # update pending to exclude those that are finished
                pending = pending.difference(ready)
                while ready:
                    msg_id = ready.pop()
                    for r in self._iter_partition(msg_id):
                        yield r
        else:
            # already done
            for r in rlist:
                yield r
654
655
655
656
class AsyncHubResult(AsyncResult):
    """Class to wrap pending results that must be requested from the Hub.

    Note that waiting/polling on these objects requires polling the Hub over the network,
    so use `AsyncHubResult.wait()` sparingly.
    """

    def _wait_for_outputs(self, timeout=-1):
        """no-op, because HubResults are never incomplete"""
        self._outputs_ready = True

    def wait(self, timeout=-1):
        """Wait for the result to complete.

        First waits on the client for any of this object's messages that the
        client still lists as outstanding, then polls the Hub (via
        `client.result_status`) for messages whose results the client does not
        hold, until none are pending or the timeout expires.

        Parameters
        ----------
        timeout : number
            Passed to `client.wait` for the local messages, and used as the
            deadline (measured from the start of this call) for polling the
            Hub.  A negative value means the Hub is polled until nothing is
            pending.

        On completion, `_ready`, `_result`, `_success`, `_metadata` (and
        `_exception` on failure) are set.  Exceptions found in the results are
        stored in `_exception` rather than raised from this method.
        """
        start = time.time()
        if self._ready:
            return
        # Build real lists rather than relying on `filter` returning one:
        # a lazy filter object is always truthy, which would break the
        # `not remote_ids` test below.
        local_ids = [msg_id for msg_id in self.msg_ids
                     if msg_id in self._client.outstanding]
        local_ready = self._client.wait(local_ids, timeout)
        if local_ready:
            remote_ids = [msg_id for msg_id in self.msg_ids
                          if msg_id not in self._client.results]
            if not remote_ids:
                self._ready = True
            else:
                rdict = self._client.result_status(remote_ids, status_only=False)
                pending = rdict['pending']
                # Sleep *before* each re-query, so the Hub is not asked twice
                # back-to-back, and so that the last thing done before giving
                # up on timeout is a fresh status check rather than a sleep.
                while pending and (timeout < 0 or time.time() < start + timeout):
                    time.sleep(0.1)
                    rdict = self._client.result_status(remote_ids, status_only=False)
                    pending = rdict['pending']
                if not pending:
                    self._ready = True
        if self._ready:
            try:
                # materialize as a list: `results[0]` is indexed below
                results = list(map(self._client.results.get, self.msg_ids))
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                self._result = self._reconstruct_result(results)
            except Exception as e:
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                # metadata is recorded whether or not the result was a success
                self._metadata = list(map(self._client.metadata.get, self.msg_ids))
706
707
# Names exported by `from <this module> import *`.
__all__ = [
    'AsyncResult',
    'AsyncMapResult',
    'AsyncHubResult',
]
General Comments 0
You need to be logged in to leave comments. Login now