##// END OF EJS Templates
speed up AsyncResult._wait_for_outputs (don't waste 0.01 seconds)...
Anton Akhmerov -
Show More
@@ -1,708 +1,708 b''
1 """AsyncResult objects for the client
1 """AsyncResult objects for the client
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import print_function
18 from __future__ import print_function
19
19
20 import sys
20 import sys
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23
23
24 from zmq import MessageTracker
24 from zmq import MessageTracker
25
25
26 from IPython.core.display import clear_output, display, display_pretty
26 from IPython.core.display import clear_output, display, display_pretty
27 from IPython.external.decorator import decorator
27 from IPython.external.decorator import decorator
28 from IPython.parallel import error
28 from IPython.parallel import error
29
29
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31 # Functions
31 # Functions
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33
33
34 def _total_seconds(td):
34 def _total_seconds(td):
35 """timedelta.total_seconds was added in 2.7"""
35 """timedelta.total_seconds was added in 2.7"""
36 try:
36 try:
37 # Python >= 2.7
37 # Python >= 2.7
38 return td.total_seconds()
38 return td.total_seconds()
39 except AttributeError:
39 except AttributeError:
40 # Python 2.6
40 # Python 2.6
41 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
41 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
42
42
43 def _raw_text(s):
43 def _raw_text(s):
44 display_pretty(s, raw=True)
44 display_pretty(s, raw=True)
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Classes
47 # Classes
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 # global empty tracker that's always done:
50 # global empty tracker that's always done:
51 finished_tracker = MessageTracker()
51 finished_tracker = MessageTracker()
52
52
53 @decorator
53 @decorator
54 def check_ready(f, self, *args, **kwargs):
54 def check_ready(f, self, *args, **kwargs):
55 """Call spin() to sync state prior to calling the method."""
55 """Call spin() to sync state prior to calling the method."""
56 self.wait(0)
56 self.wait(0)
57 if not self._ready:
57 if not self._ready:
58 raise error.TimeoutError("result not ready")
58 raise error.TimeoutError("result not ready")
59 return f(self, *args, **kwargs)
59 return f(self, *args, **kwargs)
60
60
61 class AsyncResult(object):
61 class AsyncResult(object):
62 """Class for representing results of non-blocking calls.
62 """Class for representing results of non-blocking calls.
63
63
64 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
64 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
65 """
65 """
66
66
67 msg_ids = None
67 msg_ids = None
68 _targets = None
68 _targets = None
69 _tracker = None
69 _tracker = None
70 _single_result = False
70 _single_result = False
71
71
72 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
72 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
73 if isinstance(msg_ids, basestring):
73 if isinstance(msg_ids, basestring):
74 # always a list
74 # always a list
75 msg_ids = [msg_ids]
75 msg_ids = [msg_ids]
76 if tracker is None:
76 if tracker is None:
77 # default to always done
77 # default to always done
78 tracker = finished_tracker
78 tracker = finished_tracker
79 self._client = client
79 self._client = client
80 self.msg_ids = msg_ids
80 self.msg_ids = msg_ids
81 self._fname=fname
81 self._fname=fname
82 self._targets = targets
82 self._targets = targets
83 self._tracker = tracker
83 self._tracker = tracker
84 self._ready = False
84 self._ready = False
85 self._outputs_ready = False
85 self._outputs_ready = False
86 self._success = None
86 self._success = None
87 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
87 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
88 if len(msg_ids) == 1:
88 if len(msg_ids) == 1:
89 self._single_result = not isinstance(targets, (list, tuple))
89 self._single_result = not isinstance(targets, (list, tuple))
90 else:
90 else:
91 self._single_result = False
91 self._single_result = False
92
92
93 def __repr__(self):
93 def __repr__(self):
94 if self._ready:
94 if self._ready:
95 return "<%s: finished>"%(self.__class__.__name__)
95 return "<%s: finished>"%(self.__class__.__name__)
96 else:
96 else:
97 return "<%s: %s>"%(self.__class__.__name__,self._fname)
97 return "<%s: %s>"%(self.__class__.__name__,self._fname)
98
98
99
99
100 def _reconstruct_result(self, res):
100 def _reconstruct_result(self, res):
101 """Reconstruct our result from actual result list (always a list)
101 """Reconstruct our result from actual result list (always a list)
102
102
103 Override me in subclasses for turning a list of results
103 Override me in subclasses for turning a list of results
104 into the expected form.
104 into the expected form.
105 """
105 """
106 if self._single_result:
106 if self._single_result:
107 return res[0]
107 return res[0]
108 else:
108 else:
109 return res
109 return res
110
110
111 def get(self, timeout=-1):
111 def get(self, timeout=-1):
112 """Return the result when it arrives.
112 """Return the result when it arrives.
113
113
114 If `timeout` is not ``None`` and the result does not arrive within
114 If `timeout` is not ``None`` and the result does not arrive within
115 `timeout` seconds then ``TimeoutError`` is raised. If the
115 `timeout` seconds then ``TimeoutError`` is raised. If the
116 remote call raised an exception then that exception will be reraised
116 remote call raised an exception then that exception will be reraised
117 by get() inside a `RemoteError`.
117 by get() inside a `RemoteError`.
118 """
118 """
119 if not self.ready():
119 if not self.ready():
120 self.wait(timeout)
120 self.wait(timeout)
121
121
122 if self._ready:
122 if self._ready:
123 if self._success:
123 if self._success:
124 return self._result
124 return self._result
125 else:
125 else:
126 raise self._exception
126 raise self._exception
127 else:
127 else:
128 raise error.TimeoutError("Result not ready.")
128 raise error.TimeoutError("Result not ready.")
129
129
130 def _check_ready(self):
130 def _check_ready(self):
131 if not self.ready():
131 if not self.ready():
132 raise error.TimeoutError("Result not ready.")
132 raise error.TimeoutError("Result not ready.")
133
133
134 def ready(self):
134 def ready(self):
135 """Return whether the call has completed."""
135 """Return whether the call has completed."""
136 if not self._ready:
136 if not self._ready:
137 self.wait(0)
137 self.wait(0)
138 elif not self._outputs_ready:
138 elif not self._outputs_ready:
139 self._wait_for_outputs(0)
139 self._wait_for_outputs(0)
140
140
141 return self._ready
141 return self._ready
142
142
143 def wait(self, timeout=-1):
143 def wait(self, timeout=-1):
144 """Wait until the result is available or until `timeout` seconds pass.
144 """Wait until the result is available or until `timeout` seconds pass.
145
145
146 This method always returns None.
146 This method always returns None.
147 """
147 """
148 if self._ready:
148 if self._ready:
149 self._wait_for_outputs(timeout)
149 self._wait_for_outputs(timeout)
150 return
150 return
151 self._ready = self._client.wait(self.msg_ids, timeout)
151 self._ready = self._client.wait(self.msg_ids, timeout)
152 if self._ready:
152 if self._ready:
153 try:
153 try:
154 results = map(self._client.results.get, self.msg_ids)
154 results = map(self._client.results.get, self.msg_ids)
155 self._result = results
155 self._result = results
156 if self._single_result:
156 if self._single_result:
157 r = results[0]
157 r = results[0]
158 if isinstance(r, Exception):
158 if isinstance(r, Exception):
159 raise r
159 raise r
160 else:
160 else:
161 results = error.collect_exceptions(results, self._fname)
161 results = error.collect_exceptions(results, self._fname)
162 self._result = self._reconstruct_result(results)
162 self._result = self._reconstruct_result(results)
163 except Exception as e:
163 except Exception as e:
164 self._exception = e
164 self._exception = e
165 self._success = False
165 self._success = False
166 else:
166 else:
167 self._success = True
167 self._success = True
168 finally:
168 finally:
169 if timeout is None or timeout < 0:
169 if timeout is None or timeout < 0:
170 # cutoff infinite wait at 10s
170 # cutoff infinite wait at 10s
171 timeout = 10
171 timeout = 10
172 self._wait_for_outputs(timeout)
172 self._wait_for_outputs(timeout)
173
173
174
174
175 def successful(self):
175 def successful(self):
176 """Return whether the call completed without raising an exception.
176 """Return whether the call completed without raising an exception.
177
177
178 Will raise ``AssertionError`` if the result is not ready.
178 Will raise ``AssertionError`` if the result is not ready.
179 """
179 """
180 assert self.ready()
180 assert self.ready()
181 return self._success
181 return self._success
182
182
183 #----------------------------------------------------------------
183 #----------------------------------------------------------------
184 # Extra methods not in mp.pool.AsyncResult
184 # Extra methods not in mp.pool.AsyncResult
185 #----------------------------------------------------------------
185 #----------------------------------------------------------------
186
186
187 def get_dict(self, timeout=-1):
187 def get_dict(self, timeout=-1):
188 """Get the results as a dict, keyed by engine_id.
188 """Get the results as a dict, keyed by engine_id.
189
189
190 timeout behavior is described in `get()`.
190 timeout behavior is described in `get()`.
191 """
191 """
192
192
193 results = self.get(timeout)
193 results = self.get(timeout)
194 engine_ids = [ md['engine_id'] for md in self._metadata ]
194 engine_ids = [ md['engine_id'] for md in self._metadata ]
195 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
195 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
196 maxcount = bycount.count(bycount[-1])
196 maxcount = bycount.count(bycount[-1])
197 if maxcount > 1:
197 if maxcount > 1:
198 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
198 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
199 maxcount, bycount[-1]))
199 maxcount, bycount[-1]))
200
200
201 return dict(zip(engine_ids,results))
201 return dict(zip(engine_ids,results))
202
202
203 @property
203 @property
204 def result(self):
204 def result(self):
205 """result property wrapper for `get(timeout=-1)`."""
205 """result property wrapper for `get(timeout=-1)`."""
206 return self.get()
206 return self.get()
207
207
208 # abbreviated alias:
208 # abbreviated alias:
209 r = result
209 r = result
210
210
211 @property
211 @property
212 def metadata(self):
212 def metadata(self):
213 """property for accessing execution metadata."""
213 """property for accessing execution metadata."""
214 if self._single_result:
214 if self._single_result:
215 return self._metadata[0]
215 return self._metadata[0]
216 else:
216 else:
217 return self._metadata
217 return self._metadata
218
218
219 @property
219 @property
220 def result_dict(self):
220 def result_dict(self):
221 """result property as a dict."""
221 """result property as a dict."""
222 return self.get_dict()
222 return self.get_dict()
223
223
224 def __dict__(self):
224 def __dict__(self):
225 return self.get_dict(0)
225 return self.get_dict(0)
226
226
227 def abort(self):
227 def abort(self):
228 """abort my tasks."""
228 """abort my tasks."""
229 assert not self.ready(), "Can't abort, I am already done!"
229 assert not self.ready(), "Can't abort, I am already done!"
230 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
230 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
231
231
232 @property
232 @property
233 def sent(self):
233 def sent(self):
234 """check whether my messages have been sent."""
234 """check whether my messages have been sent."""
235 return self._tracker.done
235 return self._tracker.done
236
236
237 def wait_for_send(self, timeout=-1):
237 def wait_for_send(self, timeout=-1):
238 """wait for pyzmq send to complete.
238 """wait for pyzmq send to complete.
239
239
240 This is necessary when sending arrays that you intend to edit in-place.
240 This is necessary when sending arrays that you intend to edit in-place.
241 `timeout` is in seconds, and will raise TimeoutError if it is reached
241 `timeout` is in seconds, and will raise TimeoutError if it is reached
242 before the send completes.
242 before the send completes.
243 """
243 """
244 return self._tracker.wait(timeout)
244 return self._tracker.wait(timeout)
245
245
246 #-------------------------------------
246 #-------------------------------------
247 # dict-access
247 # dict-access
248 #-------------------------------------
248 #-------------------------------------
249
249
250 def __getitem__(self, key):
250 def __getitem__(self, key):
251 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
251 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
252 """
252 """
253 if isinstance(key, int):
253 if isinstance(key, int):
254 self._check_ready()
254 self._check_ready()
255 return error.collect_exceptions([self._result[key]], self._fname)[0]
255 return error.collect_exceptions([self._result[key]], self._fname)[0]
256 elif isinstance(key, slice):
256 elif isinstance(key, slice):
257 self._check_ready()
257 self._check_ready()
258 return error.collect_exceptions(self._result[key], self._fname)
258 return error.collect_exceptions(self._result[key], self._fname)
259 elif isinstance(key, basestring):
259 elif isinstance(key, basestring):
260 # metadata proxy *does not* require that results are done
260 # metadata proxy *does not* require that results are done
261 self.wait(0)
261 self.wait(0)
262 values = [ md[key] for md in self._metadata ]
262 values = [ md[key] for md in self._metadata ]
263 if self._single_result:
263 if self._single_result:
264 return values[0]
264 return values[0]
265 else:
265 else:
266 return values
266 return values
267 else:
267 else:
268 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
268 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
269
269
270 def __getattr__(self, key):
270 def __getattr__(self, key):
271 """getattr maps to getitem for convenient attr access to metadata."""
271 """getattr maps to getitem for convenient attr access to metadata."""
272 try:
272 try:
273 return self.__getitem__(key)
273 return self.__getitem__(key)
274 except (error.TimeoutError, KeyError):
274 except (error.TimeoutError, KeyError):
275 raise AttributeError("%r object has no attribute %r"%(
275 raise AttributeError("%r object has no attribute %r"%(
276 self.__class__.__name__, key))
276 self.__class__.__name__, key))
277
277
278 # asynchronous iterator:
278 # asynchronous iterator:
279 def __iter__(self):
279 def __iter__(self):
280 if self._single_result:
280 if self._single_result:
281 raise TypeError("AsyncResults with a single result are not iterable.")
281 raise TypeError("AsyncResults with a single result are not iterable.")
282 try:
282 try:
283 rlist = self.get(0)
283 rlist = self.get(0)
284 except error.TimeoutError:
284 except error.TimeoutError:
285 # wait for each result individually
285 # wait for each result individually
286 for msg_id in self.msg_ids:
286 for msg_id in self.msg_ids:
287 ar = AsyncResult(self._client, msg_id, self._fname)
287 ar = AsyncResult(self._client, msg_id, self._fname)
288 yield ar.get()
288 yield ar.get()
289 else:
289 else:
290 # already done
290 # already done
291 for r in rlist:
291 for r in rlist:
292 yield r
292 yield r
293
293
294 def __len__(self):
294 def __len__(self):
295 return len(self.msg_ids)
295 return len(self.msg_ids)
296
296
297 #-------------------------------------
297 #-------------------------------------
298 # Sugar methods and attributes
298 # Sugar methods and attributes
299 #-------------------------------------
299 #-------------------------------------
300
300
301 def timedelta(self, start, end, start_key=min, end_key=max):
301 def timedelta(self, start, end, start_key=min, end_key=max):
302 """compute the difference between two sets of timestamps
302 """compute the difference between two sets of timestamps
303
303
304 The default behavior is to use the earliest of the first
304 The default behavior is to use the earliest of the first
305 and the latest of the second list, but this can be changed
305 and the latest of the second list, but this can be changed
306 by passing a different
306 by passing a different
307
307
308 Parameters
308 Parameters
309 ----------
309 ----------
310
310
311 start : one or more datetime objects (e.g. ar.submitted)
311 start : one or more datetime objects (e.g. ar.submitted)
312 end : one or more datetime objects (e.g. ar.received)
312 end : one or more datetime objects (e.g. ar.received)
313 start_key : callable
313 start_key : callable
314 Function to call on `start` to extract the relevant
314 Function to call on `start` to extract the relevant
315 entry [defalt: min]
315 entry [defalt: min]
316 end_key : callable
316 end_key : callable
317 Function to call on `end` to extract the relevant
317 Function to call on `end` to extract the relevant
318 entry [default: max]
318 entry [default: max]
319
319
320 Returns
320 Returns
321 -------
321 -------
322
322
323 dt : float
323 dt : float
324 The time elapsed (in seconds) between the two selected timestamps.
324 The time elapsed (in seconds) between the two selected timestamps.
325 """
325 """
326 if not isinstance(start, datetime):
326 if not isinstance(start, datetime):
327 # handle single_result AsyncResults, where ar.stamp is single object,
327 # handle single_result AsyncResults, where ar.stamp is single object,
328 # not a list
328 # not a list
329 start = start_key(start)
329 start = start_key(start)
330 if not isinstance(end, datetime):
330 if not isinstance(end, datetime):
331 # handle single_result AsyncResults, where ar.stamp is single object,
331 # handle single_result AsyncResults, where ar.stamp is single object,
332 # not a list
332 # not a list
333 end = end_key(end)
333 end = end_key(end)
334 return _total_seconds(end - start)
334 return _total_seconds(end - start)
335
335
336 @property
336 @property
337 def progress(self):
337 def progress(self):
338 """the number of tasks which have been completed at this point.
338 """the number of tasks which have been completed at this point.
339
339
340 Fractional progress would be given by 1.0 * ar.progress / len(ar)
340 Fractional progress would be given by 1.0 * ar.progress / len(ar)
341 """
341 """
342 self.wait(0)
342 self.wait(0)
343 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
343 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
344
344
345 @property
345 @property
346 def elapsed(self):
346 def elapsed(self):
347 """elapsed time since initial submission"""
347 """elapsed time since initial submission"""
348 if self.ready():
348 if self.ready():
349 return self.wall_time
349 return self.wall_time
350
350
351 now = submitted = datetime.now()
351 now = submitted = datetime.now()
352 for msg_id in self.msg_ids:
352 for msg_id in self.msg_ids:
353 if msg_id in self._client.metadata:
353 if msg_id in self._client.metadata:
354 stamp = self._client.metadata[msg_id]['submitted']
354 stamp = self._client.metadata[msg_id]['submitted']
355 if stamp and stamp < submitted:
355 if stamp and stamp < submitted:
356 submitted = stamp
356 submitted = stamp
357 return _total_seconds(now-submitted)
357 return _total_seconds(now-submitted)
358
358
359 @property
359 @property
360 @check_ready
360 @check_ready
361 def serial_time(self):
361 def serial_time(self):
362 """serial computation time of a parallel calculation
362 """serial computation time of a parallel calculation
363
363
364 Computed as the sum of (completed-started) of each task
364 Computed as the sum of (completed-started) of each task
365 """
365 """
366 t = 0
366 t = 0
367 for md in self._metadata:
367 for md in self._metadata:
368 t += _total_seconds(md['completed'] - md['started'])
368 t += _total_seconds(md['completed'] - md['started'])
369 return t
369 return t
370
370
371 @property
371 @property
372 @check_ready
372 @check_ready
373 def wall_time(self):
373 def wall_time(self):
374 """actual computation time of a parallel calculation
374 """actual computation time of a parallel calculation
375
375
376 Computed as the time between the latest `received` stamp
376 Computed as the time between the latest `received` stamp
377 and the earliest `submitted`.
377 and the earliest `submitted`.
378
378
379 Only reliable if Client was spinning/waiting when the task finished, because
379 Only reliable if Client was spinning/waiting when the task finished, because
380 the `received` timestamp is created when a result is pulled off of the zmq queue,
380 the `received` timestamp is created when a result is pulled off of the zmq queue,
381 which happens as a result of `client.spin()`.
381 which happens as a result of `client.spin()`.
382
382
383 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
383 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
384
384
385 """
385 """
386 return self.timedelta(self.submitted, self.received)
386 return self.timedelta(self.submitted, self.received)
387
387
388 def wait_interactive(self, interval=1., timeout=-1):
388 def wait_interactive(self, interval=1., timeout=-1):
389 """interactive wait, printing progress at regular intervals"""
389 """interactive wait, printing progress at regular intervals"""
390 if timeout is None:
390 if timeout is None:
391 timeout = -1
391 timeout = -1
392 N = len(self)
392 N = len(self)
393 tic = time.time()
393 tic = time.time()
394 while not self.ready() and (timeout < 0 or time.time() - tic <= timeout):
394 while not self.ready() and (timeout < 0 or time.time() - tic <= timeout):
395 self.wait(interval)
395 self.wait(interval)
396 clear_output()
396 clear_output()
397 print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
397 print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
398 sys.stdout.flush()
398 sys.stdout.flush()
399 print()
399 print()
400 print("done")
400 print("done")
401
401
402 def _republish_displaypub(self, content, eid):
402 def _republish_displaypub(self, content, eid):
403 """republish individual displaypub content dicts"""
403 """republish individual displaypub content dicts"""
404 try:
404 try:
405 ip = get_ipython()
405 ip = get_ipython()
406 except NameError:
406 except NameError:
407 # displaypub is meaningless outside IPython
407 # displaypub is meaningless outside IPython
408 return
408 return
409 md = content['metadata'] or {}
409 md = content['metadata'] or {}
410 md['engine'] = eid
410 md['engine'] = eid
411 ip.display_pub.publish(content['source'], content['data'], md)
411 ip.display_pub.publish(content['source'], content['data'], md)
412
412
413 def _display_stream(self, text, prefix='', file=None):
413 def _display_stream(self, text, prefix='', file=None):
414 if not text:
414 if not text:
415 # nothing to display
415 # nothing to display
416 return
416 return
417 if file is None:
417 if file is None:
418 file = sys.stdout
418 file = sys.stdout
419 end = '' if text.endswith('\n') else '\n'
419 end = '' if text.endswith('\n') else '\n'
420
420
421 multiline = text.count('\n') > int(text.endswith('\n'))
421 multiline = text.count('\n') > int(text.endswith('\n'))
422 if prefix and multiline and not text.startswith('\n'):
422 if prefix and multiline and not text.startswith('\n'):
423 prefix = prefix + '\n'
423 prefix = prefix + '\n'
424 print("%s%s" % (prefix, text), file=file, end=end)
424 print("%s%s" % (prefix, text), file=file, end=end)
425
425
426
426
427 def _display_single_result(self):
427 def _display_single_result(self):
428 self._display_stream(self.stdout)
428 self._display_stream(self.stdout)
429 self._display_stream(self.stderr, file=sys.stderr)
429 self._display_stream(self.stderr, file=sys.stderr)
430
430
431 try:
431 try:
432 get_ipython()
432 get_ipython()
433 except NameError:
433 except NameError:
434 # displaypub is meaningless outside IPython
434 # displaypub is meaningless outside IPython
435 return
435 return
436
436
437 for output in self.outputs:
437 for output in self.outputs:
438 self._republish_displaypub(output, self.engine_id)
438 self._republish_displaypub(output, self.engine_id)
439
439
440 if self.pyout is not None:
440 if self.pyout is not None:
441 display(self.get())
441 display(self.get())
442
442
443 def _wait_for_outputs(self, timeout=-1):
443 def _wait_for_outputs(self, timeout=-1):
444 """wait for the 'status=idle' message that indicates we have all outputs
444 """wait for the 'status=idle' message that indicates we have all outputs
445 """
445 """
446 if self._outputs_ready or not self._success:
446 if self._outputs_ready or not self._success:
447 # don't wait on errors
447 # don't wait on errors
448 return
448 return
449
449
450 # cast None to -1 for infinite timeout
450 # cast None to -1 for infinite timeout
451 if timeout is None:
451 if timeout is None:
452 timeout = -1
452 timeout = -1
453
453
454 tic = time.time()
454 tic = time.time()
455 self._client._flush_iopub(self._client._iopub_socket)
455 while True:
456 self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
457 while not self._outputs_ready:
458 time.sleep(0.01)
459 self._client._flush_iopub(self._client._iopub_socket)
456 self._client._flush_iopub(self._client._iopub_socket)
460 self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
457 self._outputs_ready = all(md['outputs_ready']
461 if timeout >= 0 and time.time() > tic + timeout:
458 for md in self._metadata)
459 if self._outputs_ready or \
460 (timeout >= 0 and time.time() > tic + timeout):
462 break
461 break
462 time.sleep(0.01)
463
463
464 @check_ready
464 @check_ready
465 def display_outputs(self, groupby="type"):
465 def display_outputs(self, groupby="type"):
466 """republish the outputs of the computation
466 """republish the outputs of the computation
467
467
468 Parameters
468 Parameters
469 ----------
469 ----------
470
470
471 groupby : str [default: type]
471 groupby : str [default: type]
472 if 'type':
472 if 'type':
473 Group outputs by type (show all stdout, then all stderr, etc.):
473 Group outputs by type (show all stdout, then all stderr, etc.):
474
474
475 [stdout:1] foo
475 [stdout:1] foo
476 [stdout:2] foo
476 [stdout:2] foo
477 [stderr:1] bar
477 [stderr:1] bar
478 [stderr:2] bar
478 [stderr:2] bar
479 if 'engine':
479 if 'engine':
480 Display outputs for each engine before moving on to the next:
480 Display outputs for each engine before moving on to the next:
481
481
482 [stdout:1] foo
482 [stdout:1] foo
483 [stderr:1] bar
483 [stderr:1] bar
484 [stdout:2] foo
484 [stdout:2] foo
485 [stderr:2] bar
485 [stderr:2] bar
486
486
487 if 'order':
487 if 'order':
488 Like 'type', but further collate individual displaypub
488 Like 'type', but further collate individual displaypub
489 outputs. This is meant for cases of each command producing
489 outputs. This is meant for cases of each command producing
490 several plots, and you would like to see all of the first
490 several plots, and you would like to see all of the first
491 plots together, then all of the second plots, and so on.
491 plots together, then all of the second plots, and so on.
492 """
492 """
493 if self._single_result:
493 if self._single_result:
494 self._display_single_result()
494 self._display_single_result()
495 return
495 return
496
496
497 stdouts = self.stdout
497 stdouts = self.stdout
498 stderrs = self.stderr
498 stderrs = self.stderr
499 pyouts = self.pyout
499 pyouts = self.pyout
500 output_lists = self.outputs
500 output_lists = self.outputs
501 results = self.get()
501 results = self.get()
502
502
503 targets = self.engine_id
503 targets = self.engine_id
504
504
505 if groupby == "engine":
505 if groupby == "engine":
506 for eid,stdout,stderr,outputs,r,pyout in zip(
506 for eid,stdout,stderr,outputs,r,pyout in zip(
507 targets, stdouts, stderrs, output_lists, results, pyouts
507 targets, stdouts, stderrs, output_lists, results, pyouts
508 ):
508 ):
509 self._display_stream(stdout, '[stdout:%i] ' % eid)
509 self._display_stream(stdout, '[stdout:%i] ' % eid)
510 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
510 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
511
511
512 try:
512 try:
513 get_ipython()
513 get_ipython()
514 except NameError:
514 except NameError:
515 # displaypub is meaningless outside IPython
515 # displaypub is meaningless outside IPython
516 return
516 return
517
517
518 if outputs or pyout is not None:
518 if outputs or pyout is not None:
519 _raw_text('[output:%i]' % eid)
519 _raw_text('[output:%i]' % eid)
520
520
521 for output in outputs:
521 for output in outputs:
522 self._republish_displaypub(output, eid)
522 self._republish_displaypub(output, eid)
523
523
524 if pyout is not None:
524 if pyout is not None:
525 display(r)
525 display(r)
526
526
527 elif groupby in ('type', 'order'):
527 elif groupby in ('type', 'order'):
528 # republish stdout:
528 # republish stdout:
529 for eid,stdout in zip(targets, stdouts):
529 for eid,stdout in zip(targets, stdouts):
530 self._display_stream(stdout, '[stdout:%i] ' % eid)
530 self._display_stream(stdout, '[stdout:%i] ' % eid)
531
531
532 # republish stderr:
532 # republish stderr:
533 for eid,stderr in zip(targets, stderrs):
533 for eid,stderr in zip(targets, stderrs):
534 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
534 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
535
535
536 try:
536 try:
537 get_ipython()
537 get_ipython()
538 except NameError:
538 except NameError:
539 # displaypub is meaningless outside IPython
539 # displaypub is meaningless outside IPython
540 return
540 return
541
541
542 if groupby == 'order':
542 if groupby == 'order':
543 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
543 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
544 N = max(len(outputs) for outputs in output_lists)
544 N = max(len(outputs) for outputs in output_lists)
545 for i in range(N):
545 for i in range(N):
546 for eid in targets:
546 for eid in targets:
547 outputs = output_dict[eid]
547 outputs = output_dict[eid]
548 if len(outputs) >= N:
548 if len(outputs) >= N:
549 _raw_text('[output:%i]' % eid)
549 _raw_text('[output:%i]' % eid)
550 self._republish_displaypub(outputs[i], eid)
550 self._republish_displaypub(outputs[i], eid)
551 else:
551 else:
552 # republish displaypub output
552 # republish displaypub output
553 for eid,outputs in zip(targets, output_lists):
553 for eid,outputs in zip(targets, output_lists):
554 if outputs:
554 if outputs:
555 _raw_text('[output:%i]' % eid)
555 _raw_text('[output:%i]' % eid)
556 for output in outputs:
556 for output in outputs:
557 self._republish_displaypub(output, eid)
557 self._republish_displaypub(output, eid)
558
558
559 # finally, add pyout:
559 # finally, add pyout:
560 for eid,r,pyout in zip(targets, results, pyouts):
560 for eid,r,pyout in zip(targets, results, pyouts):
561 if pyout is not None:
561 if pyout is not None:
562 display(r)
562 display(r)
563
563
564 else:
564 else:
565 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
565 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
566
566
567
567
568
568
569
569
570 class AsyncMapResult(AsyncResult):
570 class AsyncMapResult(AsyncResult):
571 """Class for representing results of non-blocking gathers.
571 """Class for representing results of non-blocking gathers.
572
572
573 This will properly reconstruct the gather.
573 This will properly reconstruct the gather.
574
574
575 This class is iterable at any time, and will wait on results as they come.
575 This class is iterable at any time, and will wait on results as they come.
576
576
577 If ordered=False, then the first results to arrive will come first, otherwise
577 If ordered=False, then the first results to arrive will come first, otherwise
578 results will be yielded in the order they were submitted.
578 results will be yielded in the order they were submitted.
579
579
580 """
580 """
581
581
582 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
582 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
583 AsyncResult.__init__(self, client, msg_ids, fname=fname)
583 AsyncResult.__init__(self, client, msg_ids, fname=fname)
584 self._mapObject = mapObject
584 self._mapObject = mapObject
585 self._single_result = False
585 self._single_result = False
586 self.ordered = ordered
586 self.ordered = ordered
587
587
588 def _reconstruct_result(self, res):
588 def _reconstruct_result(self, res):
589 """Perform the gather on the actual results."""
589 """Perform the gather on the actual results."""
590 return self._mapObject.joinPartitions(res)
590 return self._mapObject.joinPartitions(res)
591
591
592 # asynchronous iterator:
592 # asynchronous iterator:
593 def __iter__(self):
593 def __iter__(self):
594 it = self._ordered_iter if self.ordered else self._unordered_iter
594 it = self._ordered_iter if self.ordered else self._unordered_iter
595 for r in it():
595 for r in it():
596 yield r
596 yield r
597
597
598 # asynchronous ordered iterator:
598 # asynchronous ordered iterator:
599 def _ordered_iter(self):
599 def _ordered_iter(self):
600 """iterator for results *as they arrive*, preserving submission order."""
600 """iterator for results *as they arrive*, preserving submission order."""
601 try:
601 try:
602 rlist = self.get(0)
602 rlist = self.get(0)
603 except error.TimeoutError:
603 except error.TimeoutError:
604 # wait for each result individually
604 # wait for each result individually
605 for msg_id in self.msg_ids:
605 for msg_id in self.msg_ids:
606 ar = AsyncResult(self._client, msg_id, self._fname)
606 ar = AsyncResult(self._client, msg_id, self._fname)
607 rlist = ar.get()
607 rlist = ar.get()
608 try:
608 try:
609 for r in rlist:
609 for r in rlist:
610 yield r
610 yield r
611 except TypeError:
611 except TypeError:
612 # flattened, not a list
612 # flattened, not a list
613 # this could get broken by flattened data that returns iterables
613 # this could get broken by flattened data that returns iterables
614 # but most calls to map do not expose the `flatten` argument
614 # but most calls to map do not expose the `flatten` argument
615 yield rlist
615 yield rlist
616 else:
616 else:
617 # already done
617 # already done
618 for r in rlist:
618 for r in rlist:
619 yield r
619 yield r
620
620
621 # asynchronous unordered iterator:
621 # asynchronous unordered iterator:
622 def _unordered_iter(self):
622 def _unordered_iter(self):
623 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
623 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
624 try:
624 try:
625 rlist = self.get(0)
625 rlist = self.get(0)
626 except error.TimeoutError:
626 except error.TimeoutError:
627 pending = set(self.msg_ids)
627 pending = set(self.msg_ids)
628 while pending:
628 while pending:
629 try:
629 try:
630 self._client.wait(pending, 1e-3)
630 self._client.wait(pending, 1e-3)
631 except error.TimeoutError:
631 except error.TimeoutError:
632 # ignore timeout error, because that only means
632 # ignore timeout error, because that only means
633 # *some* jobs are outstanding
633 # *some* jobs are outstanding
634 pass
634 pass
635 # update ready set with those no longer outstanding:
635 # update ready set with those no longer outstanding:
636 ready = pending.difference(self._client.outstanding)
636 ready = pending.difference(self._client.outstanding)
637 # update pending to exclude those that are finished
637 # update pending to exclude those that are finished
638 pending = pending.difference(ready)
638 pending = pending.difference(ready)
639 while ready:
639 while ready:
640 msg_id = ready.pop()
640 msg_id = ready.pop()
641 ar = AsyncResult(self._client, msg_id, self._fname)
641 ar = AsyncResult(self._client, msg_id, self._fname)
642 rlist = ar.get()
642 rlist = ar.get()
643 try:
643 try:
644 for r in rlist:
644 for r in rlist:
645 yield r
645 yield r
646 except TypeError:
646 except TypeError:
647 # flattened, not a list
647 # flattened, not a list
648 # this could get broken by flattened data that returns iterables
648 # this could get broken by flattened data that returns iterables
649 # but most calls to map do not expose the `flatten` argument
649 # but most calls to map do not expose the `flatten` argument
650 yield rlist
650 yield rlist
651 else:
651 else:
652 # already done
652 # already done
653 for r in rlist:
653 for r in rlist:
654 yield r
654 yield r
655
655
656
656
657 class AsyncHubResult(AsyncResult):
657 class AsyncHubResult(AsyncResult):
658 """Class to wrap pending results that must be requested from the Hub.
658 """Class to wrap pending results that must be requested from the Hub.
659
659
660 Note that waiting/polling on these objects requires polling the Hubover the network,
660 Note that waiting/polling on these objects requires polling the Hubover the network,
661 so use `AsyncHubResult.wait()` sparingly.
661 so use `AsyncHubResult.wait()` sparingly.
662 """
662 """
663
663
664 def _wait_for_outputs(self, timeout=-1):
664 def _wait_for_outputs(self, timeout=-1):
665 """no-op, because HubResults are never incomplete"""
665 """no-op, because HubResults are never incomplete"""
666 self._outputs_ready = True
666 self._outputs_ready = True
667
667
668 def wait(self, timeout=-1):
668 def wait(self, timeout=-1):
669 """wait for result to complete."""
669 """wait for result to complete."""
670 start = time.time()
670 start = time.time()
671 if self._ready:
671 if self._ready:
672 return
672 return
673 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
673 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
674 local_ready = self._client.wait(local_ids, timeout)
674 local_ready = self._client.wait(local_ids, timeout)
675 if local_ready:
675 if local_ready:
676 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
676 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
677 if not remote_ids:
677 if not remote_ids:
678 self._ready = True
678 self._ready = True
679 else:
679 else:
680 rdict = self._client.result_status(remote_ids, status_only=False)
680 rdict = self._client.result_status(remote_ids, status_only=False)
681 pending = rdict['pending']
681 pending = rdict['pending']
682 while pending and (timeout < 0 or time.time() < start+timeout):
682 while pending and (timeout < 0 or time.time() < start+timeout):
683 rdict = self._client.result_status(remote_ids, status_only=False)
683 rdict = self._client.result_status(remote_ids, status_only=False)
684 pending = rdict['pending']
684 pending = rdict['pending']
685 if pending:
685 if pending:
686 time.sleep(0.1)
686 time.sleep(0.1)
687 if not pending:
687 if not pending:
688 self._ready = True
688 self._ready = True
689 if self._ready:
689 if self._ready:
690 try:
690 try:
691 results = map(self._client.results.get, self.msg_ids)
691 results = map(self._client.results.get, self.msg_ids)
692 self._result = results
692 self._result = results
693 if self._single_result:
693 if self._single_result:
694 r = results[0]
694 r = results[0]
695 if isinstance(r, Exception):
695 if isinstance(r, Exception):
696 raise r
696 raise r
697 else:
697 else:
698 results = error.collect_exceptions(results, self._fname)
698 results = error.collect_exceptions(results, self._fname)
699 self._result = self._reconstruct_result(results)
699 self._result = self._reconstruct_result(results)
700 except Exception as e:
700 except Exception as e:
701 self._exception = e
701 self._exception = e
702 self._success = False
702 self._success = False
703 else:
703 else:
704 self._success = True
704 self._success = True
705 finally:
705 finally:
706 self._metadata = map(self._client.metadata.get, self.msg_ids)
706 self._metadata = map(self._client.metadata.get, self.msg_ids)
707
707
708 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
708 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
General Comments 0
You need to be logged in to leave comments. Login now