avoid double-iteration over engine IDs in get_dict
MinRK
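For context, here is a minimal usage sketch of `AsyncResult.get_dict()`. It is a hypothetical session, not part of this changeset, and assumes a running IPython.parallel cluster with a few engines:

    # Hypothetical example (assumes an IPython.parallel cluster is running).
    from IPython.parallel import Client

    rc = Client()                    # connect to the running cluster
    view = rc[:]                     # DirectView over all engines

    def host():
        import socket
        return socket.gethostname()

    ar = view.apply_async(host)      # one task per engine -> one AsyncResult
    # get_dict() keys each result by the engine it ran on, and raises
    # ValueError if more than one job in this AsyncResult ran on one engine.
    by_engine = ar.get_dict()        # e.g. {0: 'node1', 1: 'node1', 2: 'node2'}

The change below builds that dict in a single pass over `zip(engine_ids, results)`, instead of first scanning `engine_ids` for duplicates and then zipping at the end.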
@@ -1,713 +1,715 @@
1 """AsyncResult objects for the client
1 """AsyncResult objects for the client
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import print_function
18 from __future__ import print_function
19
19
20 import sys
20 import sys
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23
23
24 from zmq import MessageTracker
24 from zmq import MessageTracker
25
25
26 from IPython.core.display import clear_output, display, display_pretty
26 from IPython.core.display import clear_output, display, display_pretty
27 from IPython.external.decorator import decorator
27 from IPython.external.decorator import decorator
28 from IPython.parallel import error
28 from IPython.parallel import error
29
29
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31 # Functions
31 # Functions
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33
33
34 def _total_seconds(td):
34 def _total_seconds(td):
35 """timedelta.total_seconds was added in 2.7"""
35 """timedelta.total_seconds was added in 2.7"""
36 try:
36 try:
37 # Python >= 2.7
37 # Python >= 2.7
38 return td.total_seconds()
38 return td.total_seconds()
39 except AttributeError:
39 except AttributeError:
40 # Python 2.6
40 # Python 2.6
41 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
41 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
42
42
43 def _raw_text(s):
43 def _raw_text(s):
44 display_pretty(s, raw=True)
44 display_pretty(s, raw=True)
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Classes
47 # Classes
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 # global empty tracker that's always done:
50 # global empty tracker that's always done:
51 finished_tracker = MessageTracker()
51 finished_tracker = MessageTracker()
52
52
53 @decorator
53 @decorator
54 def check_ready(f, self, *args, **kwargs):
54 def check_ready(f, self, *args, **kwargs):
55 """Call spin() to sync state prior to calling the method."""
55 """Call spin() to sync state prior to calling the method."""
56 self.wait(0)
56 self.wait(0)
57 if not self._ready:
57 if not self._ready:
58 raise error.TimeoutError("result not ready")
58 raise error.TimeoutError("result not ready")
59 return f(self, *args, **kwargs)
59 return f(self, *args, **kwargs)
60
60
61 class AsyncResult(object):
61 class AsyncResult(object):
62 """Class for representing results of non-blocking calls.
62 """Class for representing results of non-blocking calls.
63
63
64 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
64 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
65 """
65 """
66
66
67 msg_ids = None
67 msg_ids = None
68 _targets = None
68 _targets = None
69 _tracker = None
69 _tracker = None
70 _single_result = False
70 _single_result = False
71
71
72 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
72 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
73 if isinstance(msg_ids, basestring):
73 if isinstance(msg_ids, basestring):
74 # always a list
74 # always a list
75 msg_ids = [msg_ids]
75 msg_ids = [msg_ids]
76 if tracker is None:
76 if tracker is None:
77 # default to always done
77 # default to always done
78 tracker = finished_tracker
78 tracker = finished_tracker
79 self._client = client
79 self._client = client
80 self.msg_ids = msg_ids
80 self.msg_ids = msg_ids
81 self._fname=fname
81 self._fname=fname
82 self._targets = targets
82 self._targets = targets
83 self._tracker = tracker
83 self._tracker = tracker
84 self._ready = False
84 self._ready = False
85 self._outputs_ready = False
85 self._outputs_ready = False
86 self._success = None
86 self._success = None
87 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
87 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
88 if len(msg_ids) == 1:
88 if len(msg_ids) == 1:
89 self._single_result = not isinstance(targets, (list, tuple))
89 self._single_result = not isinstance(targets, (list, tuple))
90 else:
90 else:
91 self._single_result = False
91 self._single_result = False
92
92
93 def __repr__(self):
93 def __repr__(self):
94 if self._ready:
94 if self._ready:
95 return "<%s: finished>"%(self.__class__.__name__)
95 return "<%s: finished>"%(self.__class__.__name__)
96 else:
96 else:
97 return "<%s: %s>"%(self.__class__.__name__,self._fname)
97 return "<%s: %s>"%(self.__class__.__name__,self._fname)
98
98
99
99
100 def _reconstruct_result(self, res):
100 def _reconstruct_result(self, res):
101 """Reconstruct our result from actual result list (always a list)
101 """Reconstruct our result from actual result list (always a list)
102
102
103 Override me in subclasses for turning a list of results
103 Override me in subclasses for turning a list of results
104 into the expected form.
104 into the expected form.
105 """
105 """
106 if self._single_result:
106 if self._single_result:
107 return res[0]
107 return res[0]
108 else:
108 else:
109 return res
109 return res
110
110
111 def get(self, timeout=-1):
111 def get(self, timeout=-1):
112 """Return the result when it arrives.
112 """Return the result when it arrives.
113
113
114 If `timeout` is not ``None`` and the result does not arrive within
114 If `timeout` is not ``None`` and the result does not arrive within
115 `timeout` seconds then ``TimeoutError`` is raised. If the
115 `timeout` seconds then ``TimeoutError`` is raised. If the
116 remote call raised an exception then that exception will be reraised
116 remote call raised an exception then that exception will be reraised
117 by get() inside a `RemoteError`.
117 by get() inside a `RemoteError`.
118 """
118 """
119 if not self.ready():
119 if not self.ready():
120 self.wait(timeout)
120 self.wait(timeout)
121
121
122 if self._ready:
122 if self._ready:
123 if self._success:
123 if self._success:
124 return self._result
124 return self._result
125 else:
125 else:
126 raise self._exception
126 raise self._exception
127 else:
127 else:
128 raise error.TimeoutError("Result not ready.")
128 raise error.TimeoutError("Result not ready.")
129
129
130 def _check_ready(self):
130 def _check_ready(self):
131 if not self.ready():
131 if not self.ready():
132 raise error.TimeoutError("Result not ready.")
132 raise error.TimeoutError("Result not ready.")
133
133
134 def ready(self):
134 def ready(self):
135 """Return whether the call has completed."""
135 """Return whether the call has completed."""
136 if not self._ready:
136 if not self._ready:
137 self.wait(0)
137 self.wait(0)
138 elif not self._outputs_ready:
138 elif not self._outputs_ready:
139 self._wait_for_outputs(0)
139 self._wait_for_outputs(0)
140
140
141 return self._ready
141 return self._ready
142
142
143 def wait(self, timeout=-1):
143 def wait(self, timeout=-1):
144 """Wait until the result is available or until `timeout` seconds pass.
144 """Wait until the result is available or until `timeout` seconds pass.
145
145
146 This method always returns None.
146 This method always returns None.
147 """
147 """
148 if self._ready:
148 if self._ready:
149 self._wait_for_outputs(timeout)
149 self._wait_for_outputs(timeout)
150 return
150 return
151 self._ready = self._client.wait(self.msg_ids, timeout)
151 self._ready = self._client.wait(self.msg_ids, timeout)
152 if self._ready:
152 if self._ready:
153 try:
153 try:
154 results = map(self._client.results.get, self.msg_ids)
154 results = map(self._client.results.get, self.msg_ids)
155 self._result = results
155 self._result = results
156 if self._single_result:
156 if self._single_result:
157 r = results[0]
157 r = results[0]
158 if isinstance(r, Exception):
158 if isinstance(r, Exception):
159 raise r
159 raise r
160 else:
160 else:
161 results = error.collect_exceptions(results, self._fname)
161 results = error.collect_exceptions(results, self._fname)
162 self._result = self._reconstruct_result(results)
162 self._result = self._reconstruct_result(results)
163 except Exception as e:
163 except Exception as e:
164 self._exception = e
164 self._exception = e
165 self._success = False
165 self._success = False
166 else:
166 else:
167 self._success = True
167 self._success = True
168 finally:
168 finally:
169 if timeout is None or timeout < 0:
169 if timeout is None or timeout < 0:
170 # cutoff infinite wait at 10s
170 # cutoff infinite wait at 10s
171 timeout = 10
171 timeout = 10
172 self._wait_for_outputs(timeout)
172 self._wait_for_outputs(timeout)
173
173
174
174
175 def successful(self):
175 def successful(self):
176 """Return whether the call completed without raising an exception.
176 """Return whether the call completed without raising an exception.
177
177
178 Will raise ``AssertionError`` if the result is not ready.
178 Will raise ``AssertionError`` if the result is not ready.
179 """
179 """
180 assert self.ready()
180 assert self.ready()
181 return self._success
181 return self._success
182
182
183 #----------------------------------------------------------------
183 #----------------------------------------------------------------
184 # Extra methods not in mp.pool.AsyncResult
184 # Extra methods not in mp.pool.AsyncResult
185 #----------------------------------------------------------------
185 #----------------------------------------------------------------
186
186
187 def get_dict(self, timeout=-1):
187 def get_dict(self, timeout=-1):
188 """Get the results as a dict, keyed by engine_id.
188 """Get the results as a dict, keyed by engine_id.
189
189
190 timeout behavior is described in `get()`.
190 timeout behavior is described in `get()`.
191 """
191 """
192
192
193 results = self.get(timeout)
193 results = self.get(timeout)
194 if self._single_result:
194 if self._single_result:
195 results = [results]
195 results = [results]
196 engine_ids = [ md['engine_id'] for md in self._metadata ]
196 engine_ids = [ md['engine_id'] for md in self._metadata ]
197
197
-        seen = set()
-        for engine_id in engine_ids:
-            if engine_id in seen:
-                raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
-                    engine_ids.count(engine_id), engine_id))
-            else:
-                seen.add(engine_id)
+
+        rdict = {}
+        for engine_id, result in zip(engine_ids, results):
+            if engine_id in rdict:
+                raise ValueError("Cannot build dict, %i jobs ran on engine #%i" % (
+                    engine_ids.count(engine_id), engine_id)
+                )
+            else:
+                rdict[engine_id] = result

-        return dict(zip(engine_ids,results))
+        return rdict

    @property
    def result(self):
        """result property wrapper for `get(timeout=-1)`."""
        return self.get()

    # abbreviated alias:
    r = result

    @property
    def metadata(self):
        """property for accessing execution metadata."""
        if self._single_result:
            return self._metadata[0]
        else:
            return self._metadata

    @property
    def result_dict(self):
        """result property as a dict."""
        return self.get_dict()

    def __dict__(self):
        return self.get_dict(0)

    def abort(self):
        """abort my tasks."""
        assert not self.ready(), "Can't abort, I am already done!"
        return self._client.abort(self.msg_ids, targets=self._targets, block=True)

    @property
    def sent(self):
        """check whether my messages have been sent."""
        return self._tracker.done

    def wait_for_send(self, timeout=-1):
        """wait for pyzmq send to complete.

        This is necessary when sending arrays that you intend to edit in-place.
        `timeout` is in seconds, and will raise TimeoutError if it is reached
        before the send completes.
        """
        return self._tracker.wait(timeout)

    #-------------------------------------
    # dict-access
    #-------------------------------------

    def __getitem__(self, key):
        """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
        """
        if isinstance(key, int):
            self._check_ready()
            return error.collect_exceptions([self._result[key]], self._fname)[0]
        elif isinstance(key, slice):
            self._check_ready()
            return error.collect_exceptions(self._result[key], self._fname)
        elif isinstance(key, basestring):
            # metadata proxy *does not* require that results are done
            self.wait(0)
            values = [ md[key] for md in self._metadata ]
            if self._single_result:
                return values[0]
            else:
                return values
        else:
            raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))

    def __getattr__(self, key):
        """getattr maps to getitem for convenient attr access to metadata."""
        try:
            return self.__getitem__(key)
        except (error.TimeoutError, KeyError):
            raise AttributeError("%r object has no attribute %r"%(
                self.__class__.__name__, key))

    # asynchronous iterator:
    def __iter__(self):
        if self._single_result:
            raise TypeError("AsyncResults with a single result are not iterable.")
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                yield ar.get()
        else:
            # already done
            for r in rlist:
                yield r

    def __len__(self):
        return len(self.msg_ids)

    #-------------------------------------
    # Sugar methods and attributes
    #-------------------------------------

    def timedelta(self, start, end, start_key=min, end_key=max):
        """compute the difference between two sets of timestamps

        The default behavior is to use the earliest of the first
        and the latest of the second list, but this can be changed
        by passing a different `start_key` or `end_key`.

        Parameters
        ----------

        start : one or more datetime objects (e.g. ar.submitted)
        end : one or more datetime objects (e.g. ar.received)
        start_key : callable
            Function to call on `start` to extract the relevant
            entry [default: min]
        end_key : callable
            Function to call on `end` to extract the relevant
            entry [default: max]

        Returns
        -------

        dt : float
            The time elapsed (in seconds) between the two selected timestamps.
        """
        if not isinstance(start, datetime):
            # handle single_result AsyncResults, where ar.stamp is single object,
            # not a list
            start = start_key(start)
        if not isinstance(end, datetime):
            # handle single_result AsyncResults, where ar.stamp is single object,
            # not a list
            end = end_key(end)
        return _total_seconds(end - start)

    @property
    def progress(self):
        """the number of tasks which have been completed at this point.

        Fractional progress would be given by 1.0 * ar.progress / len(ar)
        """
        self.wait(0)
        return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))

    @property
    def elapsed(self):
        """elapsed time since initial submission"""
        if self.ready():
            return self.wall_time

        now = submitted = datetime.now()
        for msg_id in self.msg_ids:
            if msg_id in self._client.metadata:
                stamp = self._client.metadata[msg_id]['submitted']
                if stamp and stamp < submitted:
                    submitted = stamp
        return _total_seconds(now-submitted)

    @property
    @check_ready
    def serial_time(self):
        """serial computation time of a parallel calculation

        Computed as the sum of (completed-started) of each task
        """
        t = 0
        for md in self._metadata:
            t += _total_seconds(md['completed'] - md['started'])
        return t

    @property
    @check_ready
    def wall_time(self):
        """actual computation time of a parallel calculation

        Computed as the time between the latest `received` stamp
        and the earliest `submitted`.

        Only reliable if Client was spinning/waiting when the task finished, because
        the `received` timestamp is created when a result is pulled off of the zmq queue,
        which happens as a result of `client.spin()`.

        For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.

        """
        return self.timedelta(self.submitted, self.received)

    def wait_interactive(self, interval=1., timeout=-1):
        """interactive wait, printing progress at regular intervals"""
        if timeout is None:
            timeout = -1
        N = len(self)
        tic = time.time()
        while not self.ready() and (timeout < 0 or time.time() - tic <= timeout):
            self.wait(interval)
            clear_output()
            print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
            sys.stdout.flush()
        print()
        print("done")

    def _republish_displaypub(self, content, eid):
        """republish individual displaypub content dicts"""
        try:
            ip = get_ipython()
        except NameError:
            # displaypub is meaningless outside IPython
            return
        md = content['metadata'] or {}
        md['engine'] = eid
        ip.display_pub.publish(content['source'], content['data'], md)

    def _display_stream(self, text, prefix='', file=None):
        if not text:
            # nothing to display
            return
        if file is None:
            file = sys.stdout
        end = '' if text.endswith('\n') else '\n'

        multiline = text.count('\n') > int(text.endswith('\n'))
        if prefix and multiline and not text.startswith('\n'):
            prefix = prefix + '\n'
        print("%s%s" % (prefix, text), file=file, end=end)


    def _display_single_result(self):
        self._display_stream(self.stdout)
        self._display_stream(self.stderr, file=sys.stderr)

        try:
            get_ipython()
        except NameError:
            # displaypub is meaningless outside IPython
            return

        for output in self.outputs:
            self._republish_displaypub(output, self.engine_id)

        if self.pyout is not None:
            display(self.get())

    def _wait_for_outputs(self, timeout=-1):
        """wait for the 'status=idle' message that indicates we have all outputs
        """
        if self._outputs_ready or not self._success:
            # don't wait on errors
            return

        # cast None to -1 for infinite timeout
        if timeout is None:
            timeout = -1

        tic = time.time()
        while True:
            self._client._flush_iopub(self._client._iopub_socket)
            self._outputs_ready = all(md['outputs_ready']
                                      for md in self._metadata)
            if self._outputs_ready or \
               (timeout >= 0 and time.time() > tic + timeout):
                break
            time.sleep(0.01)

    @check_ready
    def display_outputs(self, groupby="type"):
        """republish the outputs of the computation

        Parameters
        ----------

        groupby : str [default: type]
            if 'type':
                Group outputs by type (show all stdout, then all stderr, etc.):

                [stdout:1] foo
                [stdout:2] foo
                [stderr:1] bar
                [stderr:2] bar
            if 'engine':
                Display outputs for each engine before moving on to the next:

                [stdout:1] foo
                [stderr:1] bar
                [stdout:2] foo
                [stderr:2] bar

            if 'order':
                Like 'type', but further collate individual displaypub
                outputs. This is meant for cases of each command producing
                several plots, and you would like to see all of the first
                plots together, then all of the second plots, and so on.
        """
        if self._single_result:
            self._display_single_result()
            return

        stdouts = self.stdout
        stderrs = self.stderr
        pyouts = self.pyout
        output_lists = self.outputs
        results = self.get()

        targets = self.engine_id

        if groupby == "engine":
            for eid,stdout,stderr,outputs,r,pyout in zip(
                    targets, stdouts, stderrs, output_lists, results, pyouts
                ):
                self._display_stream(stdout, '[stdout:%i] ' % eid)
                self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)

                try:
                    get_ipython()
                except NameError:
                    # displaypub is meaningless outside IPython
                    return

                if outputs or pyout is not None:
                    _raw_text('[output:%i]' % eid)

                for output in outputs:
                    self._republish_displaypub(output, eid)

                if pyout is not None:
                    display(r)

        elif groupby in ('type', 'order'):
            # republish stdout:
            for eid,stdout in zip(targets, stdouts):
                self._display_stream(stdout, '[stdout:%i] ' % eid)

            # republish stderr:
            for eid,stderr in zip(targets, stderrs):
                self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)

            try:
                get_ipython()
            except NameError:
                # displaypub is meaningless outside IPython
                return

            if groupby == 'order':
                output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
                N = max(len(outputs) for outputs in output_lists)
                for i in range(N):
                    for eid in targets:
                        outputs = output_dict[eid]
                        if len(outputs) >= N:
                            _raw_text('[output:%i]' % eid)
                            self._republish_displaypub(outputs[i], eid)
            else:
                # republish displaypub output
                for eid,outputs in zip(targets, output_lists):
                    if outputs:
                        _raw_text('[output:%i]' % eid)
                    for output in outputs:
                        self._republish_displaypub(output, eid)

            # finally, add pyout:
            for eid,r,pyout in zip(targets, results, pyouts):
                if pyout is not None:
                    display(r)

        else:
            raise ValueError("groupby must be one of 'type', 'engine', 'order', not %r" % groupby)




class AsyncMapResult(AsyncResult):
    """Class for representing results of non-blocking gathers.

    This will properly reconstruct the gather.

    This class is iterable at any time, and will wait on results as they come.

    If ordered=False, then the first results to arrive will come first, otherwise
    results will be yielded in the order they were submitted.

    """

    def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
        AsyncResult.__init__(self, client, msg_ids, fname=fname)
        self._mapObject = mapObject
        self._single_result = False
        self.ordered = ordered

    def _reconstruct_result(self, res):
        """Perform the gather on the actual results."""
        return self._mapObject.joinPartitions(res)

    # asynchronous iterator:
    def __iter__(self):
        it = self._ordered_iter if self.ordered else self._unordered_iter
        for r in it():
            yield r

    # asynchronous ordered iterator:
    def _ordered_iter(self):
        """iterator for results *as they arrive*, preserving submission order."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                rlist = ar.get()
                try:
                    for r in rlist:
                        yield r
                except TypeError:
                    # flattened, not a list
                    # this could get broken by flattened data that returns iterables
                    # but most calls to map do not expose the `flatten` argument
                    yield rlist
        else:
            # already done
            for r in rlist:
                yield r

    # asynchronous unordered iterator:
    def _unordered_iter(self):
        """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            pending = set(self.msg_ids)
            while pending:
                try:
                    self._client.wait(pending, 1e-3)
                except error.TimeoutError:
                    # ignore timeout error, because that only means
                    # *some* jobs are outstanding
                    pass
                # update ready set with those no longer outstanding:
                ready = pending.difference(self._client.outstanding)
                # update pending to exclude those that are finished
                pending = pending.difference(ready)
                while ready:
                    msg_id = ready.pop()
                    ar = AsyncResult(self._client, msg_id, self._fname)
                    rlist = ar.get()
                    try:
                        for r in rlist:
                            yield r
                    except TypeError:
                        # flattened, not a list
                        # this could get broken by flattened data that returns iterables
                        # but most calls to map do not expose the `flatten` argument
                        yield rlist
        else:
            # already done
            for r in rlist:
                yield r


class AsyncHubResult(AsyncResult):
    """Class to wrap pending results that must be requested from the Hub.

    Note that waiting/polling on these objects requires polling the Hub over the network,
    so use `AsyncHubResult.wait()` sparingly.
    """

    def _wait_for_outputs(self, timeout=-1):
        """no-op, because HubResults are never incomplete"""
        self._outputs_ready = True

    def wait(self, timeout=-1):
        """wait for result to complete."""
        start = time.time()
        if self._ready:
            return
        local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
        local_ready = self._client.wait(local_ids, timeout)
        if local_ready:
            remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
            if not remote_ids:
                self._ready = True
            else:
                rdict = self._client.result_status(remote_ids, status_only=False)
                pending = rdict['pending']
                while pending and (timeout < 0 or time.time() < start+timeout):
                    rdict = self._client.result_status(remote_ids, status_only=False)
                    pending = rdict['pending']
                    if pending:
                        time.sleep(0.1)
                if not pending:
                    self._ready = True
        if self._ready:
            try:
                results = map(self._client.results.get, self.msg_ids)
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                    self._result = self._reconstruct_result(results)
            except Exception as e:
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                self._metadata = map(self._client.metadata.get, self.msg_ids)

__all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
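
A second hypothetical sketch, again assuming a running cluster, showing a few of the conveniences defined above (`wait_interactive`, `elapsed`, and ordered iteration over an AsyncMapResult); it is an illustration only, not part of this changeset:

    # Hypothetical example (assumes an IPython.parallel cluster is running).
    from IPython.parallel import Client

    rc = Client()
    view = rc.load_balanced_view()
    amr = view.map_async(lambda x: x ** 2, range(8))   # returns an AsyncMapResult
    amr.wait_interactive()        # prints "%4i/%i tasks finished after %4i s", then "done"
    print(amr.elapsed)            # seconds since the earliest submission
    print(list(amr))              # results yielded in submission order (ordered=True)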