##// END OF EJS Templates
fix AsyncResult.get_dict for single result...
MinRK -
Show More
@@ -1,708 +1,713 b''
1 """AsyncResult objects for the client
1 """AsyncResult objects for the client
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import print_function
18 from __future__ import print_function
19
19
20 import sys
20 import sys
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23
23
24 from zmq import MessageTracker
24 from zmq import MessageTracker
25
25
26 from IPython.core.display import clear_output, display, display_pretty
26 from IPython.core.display import clear_output, display, display_pretty
27 from IPython.external.decorator import decorator
27 from IPython.external.decorator import decorator
28 from IPython.parallel import error
28 from IPython.parallel import error
29
29
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31 # Functions
31 # Functions
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33
33
34 def _total_seconds(td):
34 def _total_seconds(td):
35 """timedelta.total_seconds was added in 2.7"""
35 """timedelta.total_seconds was added in 2.7"""
36 try:
36 try:
37 # Python >= 2.7
37 # Python >= 2.7
38 return td.total_seconds()
38 return td.total_seconds()
39 except AttributeError:
39 except AttributeError:
40 # Python 2.6
40 # Python 2.6
41 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
41 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
42
42
43 def _raw_text(s):
43 def _raw_text(s):
44 display_pretty(s, raw=True)
44 display_pretty(s, raw=True)
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Classes
47 # Classes
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 # global empty tracker that's always done:
50 # global empty tracker that's always done:
51 finished_tracker = MessageTracker()
51 finished_tracker = MessageTracker()
52
52
53 @decorator
53 @decorator
54 def check_ready(f, self, *args, **kwargs):
54 def check_ready(f, self, *args, **kwargs):
55 """Call spin() to sync state prior to calling the method."""
55 """Call spin() to sync state prior to calling the method."""
56 self.wait(0)
56 self.wait(0)
57 if not self._ready:
57 if not self._ready:
58 raise error.TimeoutError("result not ready")
58 raise error.TimeoutError("result not ready")
59 return f(self, *args, **kwargs)
59 return f(self, *args, **kwargs)
60
60
61 class AsyncResult(object):
61 class AsyncResult(object):
62 """Class for representing results of non-blocking calls.
62 """Class for representing results of non-blocking calls.
63
63
64 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
64 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
65 """
65 """
66
66
67 msg_ids = None
67 msg_ids = None
68 _targets = None
68 _targets = None
69 _tracker = None
69 _tracker = None
70 _single_result = False
70 _single_result = False
71
71
72 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
72 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
73 if isinstance(msg_ids, basestring):
73 if isinstance(msg_ids, basestring):
74 # always a list
74 # always a list
75 msg_ids = [msg_ids]
75 msg_ids = [msg_ids]
76 if tracker is None:
76 if tracker is None:
77 # default to always done
77 # default to always done
78 tracker = finished_tracker
78 tracker = finished_tracker
79 self._client = client
79 self._client = client
80 self.msg_ids = msg_ids
80 self.msg_ids = msg_ids
81 self._fname=fname
81 self._fname=fname
82 self._targets = targets
82 self._targets = targets
83 self._tracker = tracker
83 self._tracker = tracker
84 self._ready = False
84 self._ready = False
85 self._outputs_ready = False
85 self._outputs_ready = False
86 self._success = None
86 self._success = None
87 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
87 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
88 if len(msg_ids) == 1:
88 if len(msg_ids) == 1:
89 self._single_result = not isinstance(targets, (list, tuple))
89 self._single_result = not isinstance(targets, (list, tuple))
90 else:
90 else:
91 self._single_result = False
91 self._single_result = False
92
92
93 def __repr__(self):
93 def __repr__(self):
94 if self._ready:
94 if self._ready:
95 return "<%s: finished>"%(self.__class__.__name__)
95 return "<%s: finished>"%(self.__class__.__name__)
96 else:
96 else:
97 return "<%s: %s>"%(self.__class__.__name__,self._fname)
97 return "<%s: %s>"%(self.__class__.__name__,self._fname)
98
98
99
99
100 def _reconstruct_result(self, res):
100 def _reconstruct_result(self, res):
101 """Reconstruct our result from actual result list (always a list)
101 """Reconstruct our result from actual result list (always a list)
102
102
103 Override me in subclasses for turning a list of results
103 Override me in subclasses for turning a list of results
104 into the expected form.
104 into the expected form.
105 """
105 """
106 if self._single_result:
106 if self._single_result:
107 return res[0]
107 return res[0]
108 else:
108 else:
109 return res
109 return res
110
110
111 def get(self, timeout=-1):
111 def get(self, timeout=-1):
112 """Return the result when it arrives.
112 """Return the result when it arrives.
113
113
114 If `timeout` is not ``None`` and the result does not arrive within
114 If `timeout` is not ``None`` and the result does not arrive within
115 `timeout` seconds then ``TimeoutError`` is raised. If the
115 `timeout` seconds then ``TimeoutError`` is raised. If the
116 remote call raised an exception then that exception will be reraised
116 remote call raised an exception then that exception will be reraised
117 by get() inside a `RemoteError`.
117 by get() inside a `RemoteError`.
118 """
118 """
119 if not self.ready():
119 if not self.ready():
120 self.wait(timeout)
120 self.wait(timeout)
121
121
122 if self._ready:
122 if self._ready:
123 if self._success:
123 if self._success:
124 return self._result
124 return self._result
125 else:
125 else:
126 raise self._exception
126 raise self._exception
127 else:
127 else:
128 raise error.TimeoutError("Result not ready.")
128 raise error.TimeoutError("Result not ready.")
129
129
130 def _check_ready(self):
130 def _check_ready(self):
131 if not self.ready():
131 if not self.ready():
132 raise error.TimeoutError("Result not ready.")
132 raise error.TimeoutError("Result not ready.")
133
133
134 def ready(self):
134 def ready(self):
135 """Return whether the call has completed."""
135 """Return whether the call has completed."""
136 if not self._ready:
136 if not self._ready:
137 self.wait(0)
137 self.wait(0)
138 elif not self._outputs_ready:
138 elif not self._outputs_ready:
139 self._wait_for_outputs(0)
139 self._wait_for_outputs(0)
140
140
141 return self._ready
141 return self._ready
142
142
143 def wait(self, timeout=-1):
143 def wait(self, timeout=-1):
144 """Wait until the result is available or until `timeout` seconds pass.
144 """Wait until the result is available or until `timeout` seconds pass.
145
145
146 This method always returns None.
146 This method always returns None.
147 """
147 """
148 if self._ready:
148 if self._ready:
149 self._wait_for_outputs(timeout)
149 self._wait_for_outputs(timeout)
150 return
150 return
151 self._ready = self._client.wait(self.msg_ids, timeout)
151 self._ready = self._client.wait(self.msg_ids, timeout)
152 if self._ready:
152 if self._ready:
153 try:
153 try:
154 results = map(self._client.results.get, self.msg_ids)
154 results = map(self._client.results.get, self.msg_ids)
155 self._result = results
155 self._result = results
156 if self._single_result:
156 if self._single_result:
157 r = results[0]
157 r = results[0]
158 if isinstance(r, Exception):
158 if isinstance(r, Exception):
159 raise r
159 raise r
160 else:
160 else:
161 results = error.collect_exceptions(results, self._fname)
161 results = error.collect_exceptions(results, self._fname)
162 self._result = self._reconstruct_result(results)
162 self._result = self._reconstruct_result(results)
163 except Exception as e:
163 except Exception as e:
164 self._exception = e
164 self._exception = e
165 self._success = False
165 self._success = False
166 else:
166 else:
167 self._success = True
167 self._success = True
168 finally:
168 finally:
169 if timeout is None or timeout < 0:
169 if timeout is None or timeout < 0:
170 # cutoff infinite wait at 10s
170 # cutoff infinite wait at 10s
171 timeout = 10
171 timeout = 10
172 self._wait_for_outputs(timeout)
172 self._wait_for_outputs(timeout)
173
173
174
174
175 def successful(self):
175 def successful(self):
176 """Return whether the call completed without raising an exception.
176 """Return whether the call completed without raising an exception.
177
177
178 Will raise ``AssertionError`` if the result is not ready.
178 Will raise ``AssertionError`` if the result is not ready.
179 """
179 """
180 assert self.ready()
180 assert self.ready()
181 return self._success
181 return self._success
182
182
183 #----------------------------------------------------------------
183 #----------------------------------------------------------------
184 # Extra methods not in mp.pool.AsyncResult
184 # Extra methods not in mp.pool.AsyncResult
185 #----------------------------------------------------------------
185 #----------------------------------------------------------------
186
186
187 def get_dict(self, timeout=-1):
187 def get_dict(self, timeout=-1):
188 """Get the results as a dict, keyed by engine_id.
188 """Get the results as a dict, keyed by engine_id.
189
189
190 timeout behavior is described in `get()`.
190 timeout behavior is described in `get()`.
191 """
191 """
192
192
193 results = self.get(timeout)
193 results = self.get(timeout)
194 if self._single_result:
195 results = [results]
194 engine_ids = [ md['engine_id'] for md in self._metadata ]
196 engine_ids = [ md['engine_id'] for md in self._metadata ]
195 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
197
196 maxcount = bycount.count(bycount[-1])
198 seen = set()
197 if maxcount > 1:
199 for engine_id in engine_ids:
200 if engine_id in seen:
198 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
201 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
199 maxcount, bycount[-1]))
202 engine_ids.count(engine_id), engine_id))
203 else:
204 seen.add(engine_id)
200
205
201 return dict(zip(engine_ids,results))
206 return dict(zip(engine_ids,results))
202
207
203 @property
208 @property
204 def result(self):
209 def result(self):
205 """result property wrapper for `get(timeout=-1)`."""
210 """result property wrapper for `get(timeout=-1)`."""
206 return self.get()
211 return self.get()
207
212
208 # abbreviated alias:
213 # abbreviated alias:
209 r = result
214 r = result
210
215
211 @property
216 @property
212 def metadata(self):
217 def metadata(self):
213 """property for accessing execution metadata."""
218 """property for accessing execution metadata."""
214 if self._single_result:
219 if self._single_result:
215 return self._metadata[0]
220 return self._metadata[0]
216 else:
221 else:
217 return self._metadata
222 return self._metadata
218
223
219 @property
224 @property
220 def result_dict(self):
225 def result_dict(self):
221 """result property as a dict."""
226 """result property as a dict."""
222 return self.get_dict()
227 return self.get_dict()
223
228
224 def __dict__(self):
229 def __dict__(self):
225 return self.get_dict(0)
230 return self.get_dict(0)
226
231
227 def abort(self):
232 def abort(self):
228 """abort my tasks."""
233 """abort my tasks."""
229 assert not self.ready(), "Can't abort, I am already done!"
234 assert not self.ready(), "Can't abort, I am already done!"
230 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
235 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
231
236
232 @property
237 @property
233 def sent(self):
238 def sent(self):
234 """check whether my messages have been sent."""
239 """check whether my messages have been sent."""
235 return self._tracker.done
240 return self._tracker.done
236
241
237 def wait_for_send(self, timeout=-1):
242 def wait_for_send(self, timeout=-1):
238 """wait for pyzmq send to complete.
243 """wait for pyzmq send to complete.
239
244
240 This is necessary when sending arrays that you intend to edit in-place.
245 This is necessary when sending arrays that you intend to edit in-place.
241 `timeout` is in seconds, and will raise TimeoutError if it is reached
246 `timeout` is in seconds, and will raise TimeoutError if it is reached
242 before the send completes.
247 before the send completes.
243 """
248 """
244 return self._tracker.wait(timeout)
249 return self._tracker.wait(timeout)
245
250
246 #-------------------------------------
251 #-------------------------------------
247 # dict-access
252 # dict-access
248 #-------------------------------------
253 #-------------------------------------
249
254
250 def __getitem__(self, key):
255 def __getitem__(self, key):
251 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
256 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
252 """
257 """
253 if isinstance(key, int):
258 if isinstance(key, int):
254 self._check_ready()
259 self._check_ready()
255 return error.collect_exceptions([self._result[key]], self._fname)[0]
260 return error.collect_exceptions([self._result[key]], self._fname)[0]
256 elif isinstance(key, slice):
261 elif isinstance(key, slice):
257 self._check_ready()
262 self._check_ready()
258 return error.collect_exceptions(self._result[key], self._fname)
263 return error.collect_exceptions(self._result[key], self._fname)
259 elif isinstance(key, basestring):
264 elif isinstance(key, basestring):
260 # metadata proxy *does not* require that results are done
265 # metadata proxy *does not* require that results are done
261 self.wait(0)
266 self.wait(0)
262 values = [ md[key] for md in self._metadata ]
267 values = [ md[key] for md in self._metadata ]
263 if self._single_result:
268 if self._single_result:
264 return values[0]
269 return values[0]
265 else:
270 else:
266 return values
271 return values
267 else:
272 else:
268 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
273 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
269
274
270 def __getattr__(self, key):
275 def __getattr__(self, key):
271 """getattr maps to getitem for convenient attr access to metadata."""
276 """getattr maps to getitem for convenient attr access to metadata."""
272 try:
277 try:
273 return self.__getitem__(key)
278 return self.__getitem__(key)
274 except (error.TimeoutError, KeyError):
279 except (error.TimeoutError, KeyError):
275 raise AttributeError("%r object has no attribute %r"%(
280 raise AttributeError("%r object has no attribute %r"%(
276 self.__class__.__name__, key))
281 self.__class__.__name__, key))
277
282
278 # asynchronous iterator:
283 # asynchronous iterator:
279 def __iter__(self):
284 def __iter__(self):
280 if self._single_result:
285 if self._single_result:
281 raise TypeError("AsyncResults with a single result are not iterable.")
286 raise TypeError("AsyncResults with a single result are not iterable.")
282 try:
287 try:
283 rlist = self.get(0)
288 rlist = self.get(0)
284 except error.TimeoutError:
289 except error.TimeoutError:
285 # wait for each result individually
290 # wait for each result individually
286 for msg_id in self.msg_ids:
291 for msg_id in self.msg_ids:
287 ar = AsyncResult(self._client, msg_id, self._fname)
292 ar = AsyncResult(self._client, msg_id, self._fname)
288 yield ar.get()
293 yield ar.get()
289 else:
294 else:
290 # already done
295 # already done
291 for r in rlist:
296 for r in rlist:
292 yield r
297 yield r
293
298
294 def __len__(self):
299 def __len__(self):
295 return len(self.msg_ids)
300 return len(self.msg_ids)
296
301
297 #-------------------------------------
302 #-------------------------------------
298 # Sugar methods and attributes
303 # Sugar methods and attributes
299 #-------------------------------------
304 #-------------------------------------
300
305
301 def timedelta(self, start, end, start_key=min, end_key=max):
306 def timedelta(self, start, end, start_key=min, end_key=max):
302 """compute the difference between two sets of timestamps
307 """compute the difference between two sets of timestamps
303
308
304 The default behavior is to use the earliest of the first
309 The default behavior is to use the earliest of the first
305 and the latest of the second list, but this can be changed
310 and the latest of the second list, but this can be changed
306 by passing a different
311 by passing a different
307
312
308 Parameters
313 Parameters
309 ----------
314 ----------
310
315
311 start : one or more datetime objects (e.g. ar.submitted)
316 start : one or more datetime objects (e.g. ar.submitted)
312 end : one or more datetime objects (e.g. ar.received)
317 end : one or more datetime objects (e.g. ar.received)
313 start_key : callable
318 start_key : callable
314 Function to call on `start` to extract the relevant
319 Function to call on `start` to extract the relevant
315 entry [defalt: min]
320 entry [defalt: min]
316 end_key : callable
321 end_key : callable
317 Function to call on `end` to extract the relevant
322 Function to call on `end` to extract the relevant
318 entry [default: max]
323 entry [default: max]
319
324
320 Returns
325 Returns
321 -------
326 -------
322
327
323 dt : float
328 dt : float
324 The time elapsed (in seconds) between the two selected timestamps.
329 The time elapsed (in seconds) between the two selected timestamps.
325 """
330 """
326 if not isinstance(start, datetime):
331 if not isinstance(start, datetime):
327 # handle single_result AsyncResults, where ar.stamp is single object,
332 # handle single_result AsyncResults, where ar.stamp is single object,
328 # not a list
333 # not a list
329 start = start_key(start)
334 start = start_key(start)
330 if not isinstance(end, datetime):
335 if not isinstance(end, datetime):
331 # handle single_result AsyncResults, where ar.stamp is single object,
336 # handle single_result AsyncResults, where ar.stamp is single object,
332 # not a list
337 # not a list
333 end = end_key(end)
338 end = end_key(end)
334 return _total_seconds(end - start)
339 return _total_seconds(end - start)
335
340
336 @property
341 @property
337 def progress(self):
342 def progress(self):
338 """the number of tasks which have been completed at this point.
343 """the number of tasks which have been completed at this point.
339
344
340 Fractional progress would be given by 1.0 * ar.progress / len(ar)
345 Fractional progress would be given by 1.0 * ar.progress / len(ar)
341 """
346 """
342 self.wait(0)
347 self.wait(0)
343 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
348 return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
344
349
345 @property
350 @property
346 def elapsed(self):
351 def elapsed(self):
347 """elapsed time since initial submission"""
352 """elapsed time since initial submission"""
348 if self.ready():
353 if self.ready():
349 return self.wall_time
354 return self.wall_time
350
355
351 now = submitted = datetime.now()
356 now = submitted = datetime.now()
352 for msg_id in self.msg_ids:
357 for msg_id in self.msg_ids:
353 if msg_id in self._client.metadata:
358 if msg_id in self._client.metadata:
354 stamp = self._client.metadata[msg_id]['submitted']
359 stamp = self._client.metadata[msg_id]['submitted']
355 if stamp and stamp < submitted:
360 if stamp and stamp < submitted:
356 submitted = stamp
361 submitted = stamp
357 return _total_seconds(now-submitted)
362 return _total_seconds(now-submitted)
358
363
359 @property
364 @property
360 @check_ready
365 @check_ready
361 def serial_time(self):
366 def serial_time(self):
362 """serial computation time of a parallel calculation
367 """serial computation time of a parallel calculation
363
368
364 Computed as the sum of (completed-started) of each task
369 Computed as the sum of (completed-started) of each task
365 """
370 """
366 t = 0
371 t = 0
367 for md in self._metadata:
372 for md in self._metadata:
368 t += _total_seconds(md['completed'] - md['started'])
373 t += _total_seconds(md['completed'] - md['started'])
369 return t
374 return t
370
375
371 @property
376 @property
372 @check_ready
377 @check_ready
373 def wall_time(self):
378 def wall_time(self):
374 """actual computation time of a parallel calculation
379 """actual computation time of a parallel calculation
375
380
376 Computed as the time between the latest `received` stamp
381 Computed as the time between the latest `received` stamp
377 and the earliest `submitted`.
382 and the earliest `submitted`.
378
383
379 Only reliable if Client was spinning/waiting when the task finished, because
384 Only reliable if Client was spinning/waiting when the task finished, because
380 the `received` timestamp is created when a result is pulled off of the zmq queue,
385 the `received` timestamp is created when a result is pulled off of the zmq queue,
381 which happens as a result of `client.spin()`.
386 which happens as a result of `client.spin()`.
382
387
383 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
388 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
384
389
385 """
390 """
386 return self.timedelta(self.submitted, self.received)
391 return self.timedelta(self.submitted, self.received)
387
392
388 def wait_interactive(self, interval=1., timeout=-1):
393 def wait_interactive(self, interval=1., timeout=-1):
389 """interactive wait, printing progress at regular intervals"""
394 """interactive wait, printing progress at regular intervals"""
390 if timeout is None:
395 if timeout is None:
391 timeout = -1
396 timeout = -1
392 N = len(self)
397 N = len(self)
393 tic = time.time()
398 tic = time.time()
394 while not self.ready() and (timeout < 0 or time.time() - tic <= timeout):
399 while not self.ready() and (timeout < 0 or time.time() - tic <= timeout):
395 self.wait(interval)
400 self.wait(interval)
396 clear_output()
401 clear_output()
397 print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
402 print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
398 sys.stdout.flush()
403 sys.stdout.flush()
399 print()
404 print()
400 print("done")
405 print("done")
401
406
402 def _republish_displaypub(self, content, eid):
407 def _republish_displaypub(self, content, eid):
403 """republish individual displaypub content dicts"""
408 """republish individual displaypub content dicts"""
404 try:
409 try:
405 ip = get_ipython()
410 ip = get_ipython()
406 except NameError:
411 except NameError:
407 # displaypub is meaningless outside IPython
412 # displaypub is meaningless outside IPython
408 return
413 return
409 md = content['metadata'] or {}
414 md = content['metadata'] or {}
410 md['engine'] = eid
415 md['engine'] = eid
411 ip.display_pub.publish(content['source'], content['data'], md)
416 ip.display_pub.publish(content['source'], content['data'], md)
412
417
413 def _display_stream(self, text, prefix='', file=None):
418 def _display_stream(self, text, prefix='', file=None):
414 if not text:
419 if not text:
415 # nothing to display
420 # nothing to display
416 return
421 return
417 if file is None:
422 if file is None:
418 file = sys.stdout
423 file = sys.stdout
419 end = '' if text.endswith('\n') else '\n'
424 end = '' if text.endswith('\n') else '\n'
420
425
421 multiline = text.count('\n') > int(text.endswith('\n'))
426 multiline = text.count('\n') > int(text.endswith('\n'))
422 if prefix and multiline and not text.startswith('\n'):
427 if prefix and multiline and not text.startswith('\n'):
423 prefix = prefix + '\n'
428 prefix = prefix + '\n'
424 print("%s%s" % (prefix, text), file=file, end=end)
429 print("%s%s" % (prefix, text), file=file, end=end)
425
430
426
431
427 def _display_single_result(self):
432 def _display_single_result(self):
428 self._display_stream(self.stdout)
433 self._display_stream(self.stdout)
429 self._display_stream(self.stderr, file=sys.stderr)
434 self._display_stream(self.stderr, file=sys.stderr)
430
435
431 try:
436 try:
432 get_ipython()
437 get_ipython()
433 except NameError:
438 except NameError:
434 # displaypub is meaningless outside IPython
439 # displaypub is meaningless outside IPython
435 return
440 return
436
441
437 for output in self.outputs:
442 for output in self.outputs:
438 self._republish_displaypub(output, self.engine_id)
443 self._republish_displaypub(output, self.engine_id)
439
444
440 if self.pyout is not None:
445 if self.pyout is not None:
441 display(self.get())
446 display(self.get())
442
447
443 def _wait_for_outputs(self, timeout=-1):
448 def _wait_for_outputs(self, timeout=-1):
444 """wait for the 'status=idle' message that indicates we have all outputs
449 """wait for the 'status=idle' message that indicates we have all outputs
445 """
450 """
446 if self._outputs_ready or not self._success:
451 if self._outputs_ready or not self._success:
447 # don't wait on errors
452 # don't wait on errors
448 return
453 return
449
454
450 # cast None to -1 for infinite timeout
455 # cast None to -1 for infinite timeout
451 if timeout is None:
456 if timeout is None:
452 timeout = -1
457 timeout = -1
453
458
454 tic = time.time()
459 tic = time.time()
455 while True:
460 while True:
456 self._client._flush_iopub(self._client._iopub_socket)
461 self._client._flush_iopub(self._client._iopub_socket)
457 self._outputs_ready = all(md['outputs_ready']
462 self._outputs_ready = all(md['outputs_ready']
458 for md in self._metadata)
463 for md in self._metadata)
459 if self._outputs_ready or \
464 if self._outputs_ready or \
460 (timeout >= 0 and time.time() > tic + timeout):
465 (timeout >= 0 and time.time() > tic + timeout):
461 break
466 break
462 time.sleep(0.01)
467 time.sleep(0.01)
463
468
464 @check_ready
469 @check_ready
465 def display_outputs(self, groupby="type"):
470 def display_outputs(self, groupby="type"):
466 """republish the outputs of the computation
471 """republish the outputs of the computation
467
472
468 Parameters
473 Parameters
469 ----------
474 ----------
470
475
471 groupby : str [default: type]
476 groupby : str [default: type]
472 if 'type':
477 if 'type':
473 Group outputs by type (show all stdout, then all stderr, etc.):
478 Group outputs by type (show all stdout, then all stderr, etc.):
474
479
475 [stdout:1] foo
480 [stdout:1] foo
476 [stdout:2] foo
481 [stdout:2] foo
477 [stderr:1] bar
482 [stderr:1] bar
478 [stderr:2] bar
483 [stderr:2] bar
479 if 'engine':
484 if 'engine':
480 Display outputs for each engine before moving on to the next:
485 Display outputs for each engine before moving on to the next:
481
486
482 [stdout:1] foo
487 [stdout:1] foo
483 [stderr:1] bar
488 [stderr:1] bar
484 [stdout:2] foo
489 [stdout:2] foo
485 [stderr:2] bar
490 [stderr:2] bar
486
491
487 if 'order':
492 if 'order':
488 Like 'type', but further collate individual displaypub
493 Like 'type', but further collate individual displaypub
489 outputs. This is meant for cases of each command producing
494 outputs. This is meant for cases of each command producing
490 several plots, and you would like to see all of the first
495 several plots, and you would like to see all of the first
491 plots together, then all of the second plots, and so on.
496 plots together, then all of the second plots, and so on.
492 """
497 """
493 if self._single_result:
498 if self._single_result:
494 self._display_single_result()
499 self._display_single_result()
495 return
500 return
496
501
497 stdouts = self.stdout
502 stdouts = self.stdout
498 stderrs = self.stderr
503 stderrs = self.stderr
499 pyouts = self.pyout
504 pyouts = self.pyout
500 output_lists = self.outputs
505 output_lists = self.outputs
501 results = self.get()
506 results = self.get()
502
507
503 targets = self.engine_id
508 targets = self.engine_id
504
509
505 if groupby == "engine":
510 if groupby == "engine":
506 for eid,stdout,stderr,outputs,r,pyout in zip(
511 for eid,stdout,stderr,outputs,r,pyout in zip(
507 targets, stdouts, stderrs, output_lists, results, pyouts
512 targets, stdouts, stderrs, output_lists, results, pyouts
508 ):
513 ):
509 self._display_stream(stdout, '[stdout:%i] ' % eid)
514 self._display_stream(stdout, '[stdout:%i] ' % eid)
510 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
515 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
511
516
512 try:
517 try:
513 get_ipython()
518 get_ipython()
514 except NameError:
519 except NameError:
515 # displaypub is meaningless outside IPython
520 # displaypub is meaningless outside IPython
516 return
521 return
517
522
518 if outputs or pyout is not None:
523 if outputs or pyout is not None:
519 _raw_text('[output:%i]' % eid)
524 _raw_text('[output:%i]' % eid)
520
525
521 for output in outputs:
526 for output in outputs:
522 self._republish_displaypub(output, eid)
527 self._republish_displaypub(output, eid)
523
528
524 if pyout is not None:
529 if pyout is not None:
525 display(r)
530 display(r)
526
531
527 elif groupby in ('type', 'order'):
532 elif groupby in ('type', 'order'):
528 # republish stdout:
533 # republish stdout:
529 for eid,stdout in zip(targets, stdouts):
534 for eid,stdout in zip(targets, stdouts):
530 self._display_stream(stdout, '[stdout:%i] ' % eid)
535 self._display_stream(stdout, '[stdout:%i] ' % eid)
531
536
532 # republish stderr:
537 # republish stderr:
533 for eid,stderr in zip(targets, stderrs):
538 for eid,stderr in zip(targets, stderrs):
534 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
539 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
535
540
536 try:
541 try:
537 get_ipython()
542 get_ipython()
538 except NameError:
543 except NameError:
539 # displaypub is meaningless outside IPython
544 # displaypub is meaningless outside IPython
540 return
545 return
541
546
542 if groupby == 'order':
547 if groupby == 'order':
543 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
548 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
544 N = max(len(outputs) for outputs in output_lists)
549 N = max(len(outputs) for outputs in output_lists)
545 for i in range(N):
550 for i in range(N):
546 for eid in targets:
551 for eid in targets:
547 outputs = output_dict[eid]
552 outputs = output_dict[eid]
548 if len(outputs) >= N:
553 if len(outputs) >= N:
549 _raw_text('[output:%i]' % eid)
554 _raw_text('[output:%i]' % eid)
550 self._republish_displaypub(outputs[i], eid)
555 self._republish_displaypub(outputs[i], eid)
551 else:
556 else:
552 # republish displaypub output
557 # republish displaypub output
553 for eid,outputs in zip(targets, output_lists):
558 for eid,outputs in zip(targets, output_lists):
554 if outputs:
559 if outputs:
555 _raw_text('[output:%i]' % eid)
560 _raw_text('[output:%i]' % eid)
556 for output in outputs:
561 for output in outputs:
557 self._republish_displaypub(output, eid)
562 self._republish_displaypub(output, eid)
558
563
559 # finally, add pyout:
564 # finally, add pyout:
560 for eid,r,pyout in zip(targets, results, pyouts):
565 for eid,r,pyout in zip(targets, results, pyouts):
561 if pyout is not None:
566 if pyout is not None:
562 display(r)
567 display(r)
563
568
564 else:
569 else:
565 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
570 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
566
571
567
572
568
573
569
574
570 class AsyncMapResult(AsyncResult):
575 class AsyncMapResult(AsyncResult):
571 """Class for representing results of non-blocking gathers.
576 """Class for representing results of non-blocking gathers.
572
577
573 This will properly reconstruct the gather.
578 This will properly reconstruct the gather.
574
579
575 This class is iterable at any time, and will wait on results as they come.
580 This class is iterable at any time, and will wait on results as they come.
576
581
577 If ordered=False, then the first results to arrive will come first, otherwise
582 If ordered=False, then the first results to arrive will come first, otherwise
578 results will be yielded in the order they were submitted.
583 results will be yielded in the order they were submitted.
579
584
580 """
585 """
581
586
582 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
587 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
583 AsyncResult.__init__(self, client, msg_ids, fname=fname)
588 AsyncResult.__init__(self, client, msg_ids, fname=fname)
584 self._mapObject = mapObject
589 self._mapObject = mapObject
585 self._single_result = False
590 self._single_result = False
586 self.ordered = ordered
591 self.ordered = ordered
587
592
588 def _reconstruct_result(self, res):
593 def _reconstruct_result(self, res):
589 """Perform the gather on the actual results."""
594 """Perform the gather on the actual results."""
590 return self._mapObject.joinPartitions(res)
595 return self._mapObject.joinPartitions(res)
591
596
592 # asynchronous iterator:
597 # asynchronous iterator:
593 def __iter__(self):
598 def __iter__(self):
594 it = self._ordered_iter if self.ordered else self._unordered_iter
599 it = self._ordered_iter if self.ordered else self._unordered_iter
595 for r in it():
600 for r in it():
596 yield r
601 yield r
597
602
598 # asynchronous ordered iterator:
603 # asynchronous ordered iterator:
599 def _ordered_iter(self):
604 def _ordered_iter(self):
600 """iterator for results *as they arrive*, preserving submission order."""
605 """iterator for results *as they arrive*, preserving submission order."""
601 try:
606 try:
602 rlist = self.get(0)
607 rlist = self.get(0)
603 except error.TimeoutError:
608 except error.TimeoutError:
604 # wait for each result individually
609 # wait for each result individually
605 for msg_id in self.msg_ids:
610 for msg_id in self.msg_ids:
606 ar = AsyncResult(self._client, msg_id, self._fname)
611 ar = AsyncResult(self._client, msg_id, self._fname)
607 rlist = ar.get()
612 rlist = ar.get()
608 try:
613 try:
609 for r in rlist:
614 for r in rlist:
610 yield r
615 yield r
611 except TypeError:
616 except TypeError:
612 # flattened, not a list
617 # flattened, not a list
613 # this could get broken by flattened data that returns iterables
618 # this could get broken by flattened data that returns iterables
614 # but most calls to map do not expose the `flatten` argument
619 # but most calls to map do not expose the `flatten` argument
615 yield rlist
620 yield rlist
616 else:
621 else:
617 # already done
622 # already done
618 for r in rlist:
623 for r in rlist:
619 yield r
624 yield r
620
625
621 # asynchronous unordered iterator:
626 # asynchronous unordered iterator:
622 def _unordered_iter(self):
627 def _unordered_iter(self):
623 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
628 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
624 try:
629 try:
625 rlist = self.get(0)
630 rlist = self.get(0)
626 except error.TimeoutError:
631 except error.TimeoutError:
627 pending = set(self.msg_ids)
632 pending = set(self.msg_ids)
628 while pending:
633 while pending:
629 try:
634 try:
630 self._client.wait(pending, 1e-3)
635 self._client.wait(pending, 1e-3)
631 except error.TimeoutError:
636 except error.TimeoutError:
632 # ignore timeout error, because that only means
637 # ignore timeout error, because that only means
633 # *some* jobs are outstanding
638 # *some* jobs are outstanding
634 pass
639 pass
635 # update ready set with those no longer outstanding:
640 # update ready set with those no longer outstanding:
636 ready = pending.difference(self._client.outstanding)
641 ready = pending.difference(self._client.outstanding)
637 # update pending to exclude those that are finished
642 # update pending to exclude those that are finished
638 pending = pending.difference(ready)
643 pending = pending.difference(ready)
639 while ready:
644 while ready:
640 msg_id = ready.pop()
645 msg_id = ready.pop()
641 ar = AsyncResult(self._client, msg_id, self._fname)
646 ar = AsyncResult(self._client, msg_id, self._fname)
642 rlist = ar.get()
647 rlist = ar.get()
643 try:
648 try:
644 for r in rlist:
649 for r in rlist:
645 yield r
650 yield r
646 except TypeError:
651 except TypeError:
647 # flattened, not a list
652 # flattened, not a list
648 # this could get broken by flattened data that returns iterables
653 # this could get broken by flattened data that returns iterables
649 # but most calls to map do not expose the `flatten` argument
654 # but most calls to map do not expose the `flatten` argument
650 yield rlist
655 yield rlist
651 else:
656 else:
652 # already done
657 # already done
653 for r in rlist:
658 for r in rlist:
654 yield r
659 yield r
655
660
656
661
657 class AsyncHubResult(AsyncResult):
662 class AsyncHubResult(AsyncResult):
658 """Class to wrap pending results that must be requested from the Hub.
663 """Class to wrap pending results that must be requested from the Hub.
659
664
660 Note that waiting/polling on these objects requires polling the Hubover the network,
665 Note that waiting/polling on these objects requires polling the Hubover the network,
661 so use `AsyncHubResult.wait()` sparingly.
666 so use `AsyncHubResult.wait()` sparingly.
662 """
667 """
663
668
664 def _wait_for_outputs(self, timeout=-1):
669 def _wait_for_outputs(self, timeout=-1):
665 """no-op, because HubResults are never incomplete"""
670 """no-op, because HubResults are never incomplete"""
666 self._outputs_ready = True
671 self._outputs_ready = True
667
672
668 def wait(self, timeout=-1):
673 def wait(self, timeout=-1):
669 """wait for result to complete."""
674 """wait for result to complete."""
670 start = time.time()
675 start = time.time()
671 if self._ready:
676 if self._ready:
672 return
677 return
673 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
678 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
674 local_ready = self._client.wait(local_ids, timeout)
679 local_ready = self._client.wait(local_ids, timeout)
675 if local_ready:
680 if local_ready:
676 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
681 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
677 if not remote_ids:
682 if not remote_ids:
678 self._ready = True
683 self._ready = True
679 else:
684 else:
680 rdict = self._client.result_status(remote_ids, status_only=False)
685 rdict = self._client.result_status(remote_ids, status_only=False)
681 pending = rdict['pending']
686 pending = rdict['pending']
682 while pending and (timeout < 0 or time.time() < start+timeout):
687 while pending and (timeout < 0 or time.time() < start+timeout):
683 rdict = self._client.result_status(remote_ids, status_only=False)
688 rdict = self._client.result_status(remote_ids, status_only=False)
684 pending = rdict['pending']
689 pending = rdict['pending']
685 if pending:
690 if pending:
686 time.sleep(0.1)
691 time.sleep(0.1)
687 if not pending:
692 if not pending:
688 self._ready = True
693 self._ready = True
689 if self._ready:
694 if self._ready:
690 try:
695 try:
691 results = map(self._client.results.get, self.msg_ids)
696 results = map(self._client.results.get, self.msg_ids)
692 self._result = results
697 self._result = results
693 if self._single_result:
698 if self._single_result:
694 r = results[0]
699 r = results[0]
695 if isinstance(r, Exception):
700 if isinstance(r, Exception):
696 raise r
701 raise r
697 else:
702 else:
698 results = error.collect_exceptions(results, self._fname)
703 results = error.collect_exceptions(results, self._fname)
699 self._result = self._reconstruct_result(results)
704 self._result = self._reconstruct_result(results)
700 except Exception as e:
705 except Exception as e:
701 self._exception = e
706 self._exception = e
702 self._success = False
707 self._success = False
703 else:
708 else:
704 self._success = True
709 self._success = True
705 finally:
710 finally:
706 self._metadata = map(self._client.metadata.get, self.msg_ids)
711 self._metadata = map(self._client.metadata.get, self.msg_ids)
707
712
708 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
713 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
General Comments 0
You need to be logged in to leave comments. Login now