##// END OF EJS Templates
Merge pull request #2255 from minrk/arwait...
Fernando Perez -
r8149:99eddce3 merge
parent child Browse files
Show More
@@ -1,690 +1,708
"""AsyncResult objects for the client

Authors:

* MinRK
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

from __future__ import print_function

import sys
import time
from datetime import datetime

from zmq import MessageTracker

from IPython.core.display import clear_output, display, display_pretty
from IPython.external.decorator import decorator
from IPython.parallel import error

#-----------------------------------------------------------------------------
# Functions
#-----------------------------------------------------------------------------
33
33
34 def _total_seconds(td):
34 def _total_seconds(td):
35 """timedelta.total_seconds was added in 2.7"""
35 """timedelta.total_seconds was added in 2.7"""
36 try:
36 try:
37 # Python >= 2.7
37 # Python >= 2.7
38 return td.total_seconds()
38 return td.total_seconds()
39 except AttributeError:
39 except AttributeError:
40 # Python 2.6
40 # Python 2.6
41 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
41 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
42
42
def _raw_text(s):
    """Display *s* as raw (pre-formatted) text via IPython's display machinery."""
    display_pretty(s, raw=True)
45
45
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------

# global empty tracker that's always done:
finished_tracker = MessageTracker()
52
52
@decorator
def check_ready(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method.

    Raises ``error.TimeoutError`` if the result is not ready after a
    zero-timeout wait.
    """
    self.wait(0)
    if not self._ready:
        raise error.TimeoutError("result not ready")
    return f(self, *args, **kwargs)
60
60
class AsyncResult(object):
    """Class for representing results of non-blocking calls.

    Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
    """

    msg_ids = None
    _targets = None
    _tracker = None
    _single_result = False

    def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
        if isinstance(msg_ids, basestring):
            # always a list
            msg_ids = [msg_ids]
        if tracker is None:
            # default to always done
            tracker = finished_tracker
        self._client = client
        self.msg_ids = msg_ids
        self._fname=fname
        self._targets = targets
        self._tracker = tracker

        self._ready = False
        # True once the 'status=idle' iopub messages for all tasks have arrived
        self._outputs_ready = False
        self._success = None
        self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
        if len(msg_ids) == 1:
            self._single_result = not isinstance(targets, (list, tuple))
        else:
            self._single_result = False

    def __repr__(self):
        if self._ready:
            return "<%s: finished>"%(self.__class__.__name__)
        else:
            return "<%s: %s>"%(self.__class__.__name__,self._fname)


    def _reconstruct_result(self, res):
        """Reconstruct our result from actual result list (always a list)

        Override me in subclasses for turning a list of results
        into the expected form.
        """
        if self._single_result:
            return res[0]
        else:
            return res

    def get(self, timeout=-1):
        """Return the result when it arrives.

        If `timeout` is not ``None`` and the result does not arrive within
        `timeout` seconds then ``TimeoutError`` is raised. If the
        remote call raised an exception then that exception will be reraised
        by get() inside a `RemoteError`.
        """
        if not self.ready():
            self.wait(timeout)

        if self._ready:
            if self._success:
                return self._result
            else:
                raise self._exception
        else:
            raise error.TimeoutError("Result not ready.")

    def _check_ready(self):
        if not self.ready():
            raise error.TimeoutError("Result not ready.")

    def ready(self):
        """Return whether the call has completed."""
        if not self._ready:
            self.wait(0)
        elif not self._outputs_ready:
            # results are in, but output messages may still be pending
            self._wait_for_outputs(0)

        return self._ready

    def wait(self, timeout=-1):
        """Wait until the result is available or until `timeout` seconds pass.

        This method always returns None.
        """
        if self._ready:
            # results already arrived; only wait on pending output messages
            self._wait_for_outputs(timeout)
            return
        self._ready = self._client.wait(self.msg_ids, timeout)
        if self._ready:
            try:
                results = map(self._client.results.get, self.msg_ids)
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                    self._result = self._reconstruct_result(results)
            except Exception as e:
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                if timeout is None or timeout < 0:
                    # cutoff infinite wait at 10s
                    timeout = 10
                self._wait_for_outputs(timeout)


    def successful(self):
        """Return whether the call completed without raising an exception.

        Will raise ``AssertionError`` if the result is not ready.
        """
        assert self.ready()
        return self._success

    #----------------------------------------------------------------
    # Extra methods not in mp.pool.AsyncResult
    #----------------------------------------------------------------

    def get_dict(self, timeout=-1):
        """Get the results as a dict, keyed by engine_id.

        timeout behavior is described in `get()`.
        """

        results = self.get(timeout)
        engine_ids = [ md['engine_id'] for md in self._metadata ]
        bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
        maxcount = bycount.count(bycount[-1])
        if maxcount > 1:
            raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
                    maxcount, bycount[-1]))

        return dict(zip(engine_ids,results))

    @property
    def result(self):
        """result property wrapper for `get(timeout=-1)`."""
        return self.get()

    # abbreviated alias:
    r = result

    @property
    def metadata(self):
        """property for accessing execution metadata."""
        if self._single_result:
            return self._metadata[0]
        else:
            return self._metadata

    @property
    def result_dict(self):
        """result property as a dict."""
        return self.get_dict()

    def __dict__(self):
        return self.get_dict(0)

    def abort(self):
        """abort my tasks."""
        assert not self.ready(), "Can't abort, I am already done!"
        return self._client.abort(self.msg_ids, targets=self._targets, block=True)

    @property
    def sent(self):
        """check whether my messages have been sent."""
        return self._tracker.done

    def wait_for_send(self, timeout=-1):
        """wait for pyzmq send to complete.

        This is necessary when sending arrays that you intend to edit in-place.
        `timeout` is in seconds, and will raise TimeoutError if it is reached
        before the send completes.
        """
        return self._tracker.wait(timeout)

    #-------------------------------------
    # dict-access
    #-------------------------------------

    def __getitem__(self, key):
        """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
        """
        if isinstance(key, int):
            self._check_ready()
            return error.collect_exceptions([self._result[key]], self._fname)[0]
        elif isinstance(key, slice):
            self._check_ready()
            return error.collect_exceptions(self._result[key], self._fname)
        elif isinstance(key, basestring):
            # metadata proxy *does not* require that results are done
            self.wait(0)
            values = [ md[key] for md in self._metadata ]
            if self._single_result:
                return values[0]
            else:
                return values
        else:
            raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))

    def __getattr__(self, key):
        """getattr maps to getitem for convenient attr access to metadata."""
        try:
            return self.__getitem__(key)
        except (error.TimeoutError, KeyError):
            raise AttributeError("%r object has no attribute %r"%(
                    self.__class__.__name__, key))

    # asynchronous iterator:
    def __iter__(self):
        if self._single_result:
            raise TypeError("AsyncResults with a single result are not iterable.")
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                yield ar.get()
        else:
            # already done
            for r in rlist:
                yield r

    def __len__(self):
        return len(self.msg_ids)

    #-------------------------------------
    # Sugar methods and attributes
    #-------------------------------------

    def timedelta(self, start, end, start_key=min, end_key=max):
        """compute the difference between two sets of timestamps

        The default behavior is to use the earliest of the first
        and the latest of the second list, but this can be changed
        by passing a different

        Parameters
        ----------

        start : one or more datetime objects (e.g. ar.submitted)
        end : one or more datetime objects (e.g. ar.received)
        start_key : callable
            Function to call on `start` to extract the relevant
            entry [defalt: min]
        end_key : callable
            Function to call on `end` to extract the relevant
            entry [default: max]

        Returns
        -------

        dt : float
            The time elapsed (in seconds) between the two selected timestamps.
        """
        if not isinstance(start, datetime):
            # handle single_result AsyncResults, where ar.stamp is single object,
            # not a list
            start = start_key(start)
        if not isinstance(end, datetime):
            # handle single_result AsyncResults, where ar.stamp is single object,
            # not a list
            end = end_key(end)
        return _total_seconds(end - start)

    @property
    def progress(self):
        """the number of tasks which have been completed at this point.

        Fractional progress would be given by 1.0 * ar.progress / len(ar)
        """
        self.wait(0)
        return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))

    @property
    def elapsed(self):
        """elapsed time since initial submission"""
        if self.ready():
            return self.wall_time

        now = submitted = datetime.now()
        for msg_id in self.msg_ids:
            if msg_id in self._client.metadata:
                stamp = self._client.metadata[msg_id]['submitted']
                if stamp and stamp < submitted:
                    submitted = stamp
        return _total_seconds(now-submitted)

    @property
    @check_ready
    def serial_time(self):
        """serial computation time of a parallel calculation

        Computed as the sum of (completed-started) of each task
        """
        t = 0
        for md in self._metadata:
            t += _total_seconds(md['completed'] - md['started'])
        return t

    @property
    @check_ready
    def wall_time(self):
        """actual computation time of a parallel calculation

        Computed as the time between the latest `received` stamp
        and the earliest `submitted`.

        Only reliable if Client was spinning/waiting when the task finished, because
        the `received` timestamp is created when a result is pulled off of the zmq queue,
        which happens as a result of `client.spin()`.

        For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.

        """
        return self.timedelta(self.submitted, self.received)

    def wait_interactive(self, interval=1., timeout=-1):
        """interactive wait, printing progress at regular intervals"""
        if timeout is None:
            # normalize None to -1 (infinite wait)
            timeout = -1
        N = len(self)
        tic = time.time()
        while not self.ready() and (timeout < 0 or time.time() - tic <= timeout):
            self.wait(interval)
            clear_output()
            print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
            sys.stdout.flush()
        print()
        print("done")

    def _republish_displaypub(self, content, eid):
        """republish individual displaypub content dicts"""
        try:
            ip = get_ipython()
        except NameError:
            # displaypub is meaningless outside IPython
            return
        md = content['metadata'] or {}
        md['engine'] = eid
        ip.display_pub.publish(content['source'], content['data'], md)

    def _display_stream(self, text, prefix='', file=None):
        if not text:
            # nothing to display
            return
        if file is None:
            file = sys.stdout
        end = '' if text.endswith('\n') else '\n'

        multiline = text.count('\n') > int(text.endswith('\n'))
        if prefix and multiline and not text.startswith('\n'):
            # prefix would be squashed onto the first line of multiline output,
            # so put it on its own line
            prefix = prefix + '\n'
        print("%s%s" % (prefix, text), file=file, end=end)


    def _display_single_result(self):
        self._display_stream(self.stdout)
        self._display_stream(self.stderr, file=sys.stderr)

        try:
            get_ipython()
        except NameError:
            # displaypub is meaningless outside IPython
            return

        for output in self.outputs:
            self._republish_displaypub(output, self.engine_id)

        if self.pyout is not None:
            display(self.get())

    def _wait_for_outputs(self, timeout=-1):
        """wait for the 'status=idle' message that indicates we have all outputs
        """
        if self._outputs_ready or not self._success:
            # don't wait on errors
            return

        # cast None to -1 for infinite timeout
        if timeout is None:
            timeout = -1

        tic = time.time()
        # flush once before checking, so already-arrived outputs are seen
        self._client._flush_iopub(self._client._iopub_socket)
        self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
        while not self._outputs_ready:
            time.sleep(0.01)
            self._client._flush_iopub(self._client._iopub_socket)
            self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
            if timeout >= 0 and time.time() > tic + timeout:
                break

    @check_ready
    def display_outputs(self, groupby="type"):
        """republish the outputs of the computation

        Parameters
        ----------

        groupby : str [default: type]
            if 'type':
                Group outputs by type (show all stdout, then all stderr, etc.):

                [stdout:1] foo
                [stdout:2] foo
                [stderr:1] bar
                [stderr:2] bar
            if 'engine':
                Display outputs for each engine before moving on to the next:

                [stdout:1] foo
                [stderr:1] bar
                [stdout:2] foo
                [stderr:2] bar

            if 'order':
                Like 'type', but further collate individual displaypub
                outputs. This is meant for cases of each command producing
                several plots, and you would like to see all of the first
                plots together, then all of the second plots, and so on.
        """
        if self._single_result:
            self._display_single_result()
            return

        stdouts = self.stdout
        stderrs = self.stderr
        pyouts = self.pyout
        output_lists = self.outputs
        results = self.get()

        targets = self.engine_id

        if groupby == "engine":
            for eid,stdout,stderr,outputs,r,pyout in zip(
                    targets, stdouts, stderrs, output_lists, results, pyouts
                ):
                self._display_stream(stdout, '[stdout:%i] ' % eid)
                self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)

                try:
                    get_ipython()
                except NameError:
                    # displaypub is meaningless outside IPython
                    return

                if outputs or pyout is not None:
                    _raw_text('[output:%i]' % eid)

                for output in outputs:
                    self._republish_displaypub(output, eid)

                if pyout is not None:
                    display(r)

        elif groupby in ('type', 'order'):
            # republish stdout:
            for eid,stdout in zip(targets, stdouts):
                self._display_stream(stdout, '[stdout:%i] ' % eid)

            # republish stderr:
            for eid,stderr in zip(targets, stderrs):
                self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)

            try:
                get_ipython()
            except NameError:
                # displaypub is meaningless outside IPython
                return

            if groupby == 'order':
                output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
                N = max(len(outputs) for outputs in output_lists)
                for i in range(N):
                    for eid in targets:
                        outputs = output_dict[eid]
                        if len(outputs) >= N:
                            _raw_text('[output:%i]' % eid)
                            self._republish_displaypub(outputs[i], eid)
            else:
                # republish displaypub output
                for eid,outputs in zip(targets, output_lists):
                    if outputs:
                        _raw_text('[output:%i]' % eid)
                    for output in outputs:
                        self._republish_displaypub(output, eid)

            # finally, add pyout:
            for eid,r,pyout in zip(targets, results, pyouts):
                if pyout is not None:
                    display(r)

        else:
            raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
548
566
549
567
550
568
551
569
552 class AsyncMapResult(AsyncResult):
570 class AsyncMapResult(AsyncResult):
553 """Class for representing results of non-blocking gathers.
571 """Class for representing results of non-blocking gathers.
554
572
555 This will properly reconstruct the gather.
573 This will properly reconstruct the gather.
556
574
557 This class is iterable at any time, and will wait on results as they come.
575 This class is iterable at any time, and will wait on results as they come.
558
576
559 If ordered=False, then the first results to arrive will come first, otherwise
577 If ordered=False, then the first results to arrive will come first, otherwise
560 results will be yielded in the order they were submitted.
578 results will be yielded in the order they were submitted.
561
579
562 """
580 """
563
581
564 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
582 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
565 AsyncResult.__init__(self, client, msg_ids, fname=fname)
583 AsyncResult.__init__(self, client, msg_ids, fname=fname)
566 self._mapObject = mapObject
584 self._mapObject = mapObject
567 self._single_result = False
585 self._single_result = False
568 self.ordered = ordered
586 self.ordered = ordered
569
587
570 def _reconstruct_result(self, res):
588 def _reconstruct_result(self, res):
571 """Perform the gather on the actual results."""
589 """Perform the gather on the actual results."""
572 return self._mapObject.joinPartitions(res)
590 return self._mapObject.joinPartitions(res)
573
591
574 # asynchronous iterator:
592 # asynchronous iterator:
575 def __iter__(self):
593 def __iter__(self):
576 it = self._ordered_iter if self.ordered else self._unordered_iter
594 it = self._ordered_iter if self.ordered else self._unordered_iter
577 for r in it():
595 for r in it():
578 yield r
596 yield r
579
597
580 # asynchronous ordered iterator:
598 # asynchronous ordered iterator:
581 def _ordered_iter(self):
599 def _ordered_iter(self):
582 """iterator for results *as they arrive*, preserving submission order."""
600 """iterator for results *as they arrive*, preserving submission order."""
583 try:
601 try:
584 rlist = self.get(0)
602 rlist = self.get(0)
585 except error.TimeoutError:
603 except error.TimeoutError:
586 # wait for each result individually
604 # wait for each result individually
587 for msg_id in self.msg_ids:
605 for msg_id in self.msg_ids:
588 ar = AsyncResult(self._client, msg_id, self._fname)
606 ar = AsyncResult(self._client, msg_id, self._fname)
589 rlist = ar.get()
607 rlist = ar.get()
590 try:
608 try:
591 for r in rlist:
609 for r in rlist:
592 yield r
610 yield r
593 except TypeError:
611 except TypeError:
594 # flattened, not a list
612 # flattened, not a list
595 # this could get broken by flattened data that returns iterables
613 # this could get broken by flattened data that returns iterables
596 # but most calls to map do not expose the `flatten` argument
614 # but most calls to map do not expose the `flatten` argument
597 yield rlist
615 yield rlist
598 else:
616 else:
599 # already done
617 # already done
600 for r in rlist:
618 for r in rlist:
601 yield r
619 yield r
602
620
603 # asynchronous unordered iterator:
621 # asynchronous unordered iterator:
604 def _unordered_iter(self):
622 def _unordered_iter(self):
605 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
623 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
606 try:
624 try:
607 rlist = self.get(0)
625 rlist = self.get(0)
608 except error.TimeoutError:
626 except error.TimeoutError:
609 pending = set(self.msg_ids)
627 pending = set(self.msg_ids)
610 while pending:
628 while pending:
611 try:
629 try:
612 self._client.wait(pending, 1e-3)
630 self._client.wait(pending, 1e-3)
613 except error.TimeoutError:
631 except error.TimeoutError:
614 # ignore timeout error, because that only means
632 # ignore timeout error, because that only means
615 # *some* jobs are outstanding
633 # *some* jobs are outstanding
616 pass
634 pass
617 # update ready set with those no longer outstanding:
635 # update ready set with those no longer outstanding:
618 ready = pending.difference(self._client.outstanding)
636 ready = pending.difference(self._client.outstanding)
619 # update pending to exclude those that are finished
637 # update pending to exclude those that are finished
620 pending = pending.difference(ready)
638 pending = pending.difference(ready)
621 while ready:
639 while ready:
622 msg_id = ready.pop()
640 msg_id = ready.pop()
623 ar = AsyncResult(self._client, msg_id, self._fname)
641 ar = AsyncResult(self._client, msg_id, self._fname)
624 rlist = ar.get()
642 rlist = ar.get()
625 try:
643 try:
626 for r in rlist:
644 for r in rlist:
627 yield r
645 yield r
628 except TypeError:
646 except TypeError:
629 # flattened, not a list
647 # flattened, not a list
630 # this could get broken by flattened data that returns iterables
648 # this could get broken by flattened data that returns iterables
631 # but most calls to map do not expose the `flatten` argument
649 # but most calls to map do not expose the `flatten` argument
632 yield rlist
650 yield rlist
633 else:
651 else:
634 # already done
652 # already done
635 for r in rlist:
653 for r in rlist:
636 yield r
654 yield r
637
655
638
656
639 class AsyncHubResult(AsyncResult):
657 class AsyncHubResult(AsyncResult):
640 """Class to wrap pending results that must be requested from the Hub.
658 """Class to wrap pending results that must be requested from the Hub.
641
659
642 Note that waiting/polling on these objects requires polling the Hubover the network,
660 Note that waiting/polling on these objects requires polling the Hubover the network,
643 so use `AsyncHubResult.wait()` sparingly.
661 so use `AsyncHubResult.wait()` sparingly.
644 """
662 """
645
663
646 def _wait_for_outputs(self, timeout=None):
664 def _wait_for_outputs(self, timeout=-1):
647 """no-op, because HubResults are never incomplete"""
665 """no-op, because HubResults are never incomplete"""
648 return
666 self._outputs_ready = True
649
667
650 def wait(self, timeout=-1):
668 def wait(self, timeout=-1):
651 """wait for result to complete."""
669 """wait for result to complete."""
652 start = time.time()
670 start = time.time()
653 if self._ready:
671 if self._ready:
654 return
672 return
655 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
673 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
656 local_ready = self._client.wait(local_ids, timeout)
674 local_ready = self._client.wait(local_ids, timeout)
657 if local_ready:
675 if local_ready:
658 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
676 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
659 if not remote_ids:
677 if not remote_ids:
660 self._ready = True
678 self._ready = True
661 else:
679 else:
662 rdict = self._client.result_status(remote_ids, status_only=False)
680 rdict = self._client.result_status(remote_ids, status_only=False)
663 pending = rdict['pending']
681 pending = rdict['pending']
664 while pending and (timeout < 0 or time.time() < start+timeout):
682 while pending and (timeout < 0 or time.time() < start+timeout):
665 rdict = self._client.result_status(remote_ids, status_only=False)
683 rdict = self._client.result_status(remote_ids, status_only=False)
666 pending = rdict['pending']
684 pending = rdict['pending']
667 if pending:
685 if pending:
668 time.sleep(0.1)
686 time.sleep(0.1)
669 if not pending:
687 if not pending:
670 self._ready = True
688 self._ready = True
671 if self._ready:
689 if self._ready:
672 try:
690 try:
673 results = map(self._client.results.get, self.msg_ids)
691 results = map(self._client.results.get, self.msg_ids)
674 self._result = results
692 self._result = results
675 if self._single_result:
693 if self._single_result:
676 r = results[0]
694 r = results[0]
677 if isinstance(r, Exception):
695 if isinstance(r, Exception):
678 raise r
696 raise r
679 else:
697 else:
680 results = error.collect_exceptions(results, self._fname)
698 results = error.collect_exceptions(results, self._fname)
681 self._result = self._reconstruct_result(results)
699 self._result = self._reconstruct_result(results)
682 except Exception as e:
700 except Exception as e:
683 self._exception = e
701 self._exception = e
684 self._success = False
702 self._success = False
685 else:
703 else:
686 self._success = True
704 self._success = True
687 finally:
705 finally:
688 self._metadata = map(self._client.metadata.get, self.msg_ids)
706 self._metadata = map(self._client.metadata.get, self.msg_ids)
689
707
690 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
708 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
@@ -1,267 +1,294
1 """Tests for asyncresult.py
1 """Tests for asyncresult.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import time
19 import time
20
20
21 import nose.tools as nt
22
21 from IPython.utils.io import capture_output
23 from IPython.utils.io import capture_output
22
24
23 from IPython.parallel.error import TimeoutError
25 from IPython.parallel.error import TimeoutError
24 from IPython.parallel import error, Client
26 from IPython.parallel import error, Client
25 from IPython.parallel.tests import add_engines
27 from IPython.parallel.tests import add_engines
26 from .clienttest import ClusterTestCase
28 from .clienttest import ClusterTestCase
27
29
28 def setup():
30 def setup():
29 add_engines(2, total=True)
31 add_engines(2, total=True)
30
32
31 def wait(n):
33 def wait(n):
32 import time
34 import time
33 time.sleep(n)
35 time.sleep(n)
34 return n
36 return n
35
37
36 class AsyncResultTest(ClusterTestCase):
38 class AsyncResultTest(ClusterTestCase):
37
39
38 def test_single_result_view(self):
40 def test_single_result_view(self):
39 """various one-target views get the right value for single_result"""
41 """various one-target views get the right value for single_result"""
40 eid = self.client.ids[-1]
42 eid = self.client.ids[-1]
41 ar = self.client[eid].apply_async(lambda : 42)
43 ar = self.client[eid].apply_async(lambda : 42)
42 self.assertEqual(ar.get(), 42)
44 self.assertEqual(ar.get(), 42)
43 ar = self.client[[eid]].apply_async(lambda : 42)
45 ar = self.client[[eid]].apply_async(lambda : 42)
44 self.assertEqual(ar.get(), [42])
46 self.assertEqual(ar.get(), [42])
45 ar = self.client[-1:].apply_async(lambda : 42)
47 ar = self.client[-1:].apply_async(lambda : 42)
46 self.assertEqual(ar.get(), [42])
48 self.assertEqual(ar.get(), [42])
47
49
48 def test_get_after_done(self):
50 def test_get_after_done(self):
49 ar = self.client[-1].apply_async(lambda : 42)
51 ar = self.client[-1].apply_async(lambda : 42)
50 ar.wait()
52 ar.wait()
51 self.assertTrue(ar.ready())
53 self.assertTrue(ar.ready())
52 self.assertEqual(ar.get(), 42)
54 self.assertEqual(ar.get(), 42)
53 self.assertEqual(ar.get(), 42)
55 self.assertEqual(ar.get(), 42)
54
56
55 def test_get_before_done(self):
57 def test_get_before_done(self):
56 ar = self.client[-1].apply_async(wait, 0.1)
58 ar = self.client[-1].apply_async(wait, 0.1)
57 self.assertRaises(TimeoutError, ar.get, 0)
59 self.assertRaises(TimeoutError, ar.get, 0)
58 ar.wait(0)
60 ar.wait(0)
59 self.assertFalse(ar.ready())
61 self.assertFalse(ar.ready())
60 self.assertEqual(ar.get(), 0.1)
62 self.assertEqual(ar.get(), 0.1)
61
63
62 def test_get_after_error(self):
64 def test_get_after_error(self):
63 ar = self.client[-1].apply_async(lambda : 1/0)
65 ar = self.client[-1].apply_async(lambda : 1/0)
64 ar.wait(10)
66 ar.wait(10)
65 self.assertRaisesRemote(ZeroDivisionError, ar.get)
67 self.assertRaisesRemote(ZeroDivisionError, ar.get)
66 self.assertRaisesRemote(ZeroDivisionError, ar.get)
68 self.assertRaisesRemote(ZeroDivisionError, ar.get)
67 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
69 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
68
70
69 def test_get_dict(self):
71 def test_get_dict(self):
70 n = len(self.client)
72 n = len(self.client)
71 ar = self.client[:].apply_async(lambda : 5)
73 ar = self.client[:].apply_async(lambda : 5)
72 self.assertEqual(ar.get(), [5]*n)
74 self.assertEqual(ar.get(), [5]*n)
73 d = ar.get_dict()
75 d = ar.get_dict()
74 self.assertEqual(sorted(d.keys()), sorted(self.client.ids))
76 self.assertEqual(sorted(d.keys()), sorted(self.client.ids))
75 for eid,r in d.iteritems():
77 for eid,r in d.iteritems():
76 self.assertEqual(r, 5)
78 self.assertEqual(r, 5)
77
79
78 def test_list_amr(self):
80 def test_list_amr(self):
79 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
81 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
80 rlist = list(ar)
82 rlist = list(ar)
81
83
82 def test_getattr(self):
84 def test_getattr(self):
83 ar = self.client[:].apply_async(wait, 0.5)
85 ar = self.client[:].apply_async(wait, 0.5)
84 self.assertEqual(ar.engine_id, [None] * len(ar))
86 self.assertEqual(ar.engine_id, [None] * len(ar))
85 self.assertRaises(AttributeError, lambda : ar._foo)
87 self.assertRaises(AttributeError, lambda : ar._foo)
86 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
88 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
87 self.assertRaises(AttributeError, lambda : ar.foo)
89 self.assertRaises(AttributeError, lambda : ar.foo)
88 self.assertFalse(hasattr(ar, '__length_hint__'))
90 self.assertFalse(hasattr(ar, '__length_hint__'))
89 self.assertFalse(hasattr(ar, 'foo'))
91 self.assertFalse(hasattr(ar, 'foo'))
90 self.assertTrue(hasattr(ar, 'engine_id'))
92 self.assertTrue(hasattr(ar, 'engine_id'))
91 ar.get(5)
93 ar.get(5)
92 self.assertRaises(AttributeError, lambda : ar._foo)
94 self.assertRaises(AttributeError, lambda : ar._foo)
93 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
95 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
94 self.assertRaises(AttributeError, lambda : ar.foo)
96 self.assertRaises(AttributeError, lambda : ar.foo)
95 self.assertTrue(isinstance(ar.engine_id, list))
97 self.assertTrue(isinstance(ar.engine_id, list))
96 self.assertEqual(ar.engine_id, ar['engine_id'])
98 self.assertEqual(ar.engine_id, ar['engine_id'])
97 self.assertFalse(hasattr(ar, '__length_hint__'))
99 self.assertFalse(hasattr(ar, '__length_hint__'))
98 self.assertFalse(hasattr(ar, 'foo'))
100 self.assertFalse(hasattr(ar, 'foo'))
99 self.assertTrue(hasattr(ar, 'engine_id'))
101 self.assertTrue(hasattr(ar, 'engine_id'))
100
102
101 def test_getitem(self):
103 def test_getitem(self):
102 ar = self.client[:].apply_async(wait, 0.5)
104 ar = self.client[:].apply_async(wait, 0.5)
103 self.assertEqual(ar['engine_id'], [None] * len(ar))
105 self.assertEqual(ar['engine_id'], [None] * len(ar))
104 self.assertRaises(KeyError, lambda : ar['foo'])
106 self.assertRaises(KeyError, lambda : ar['foo'])
105 ar.get(5)
107 ar.get(5)
106 self.assertRaises(KeyError, lambda : ar['foo'])
108 self.assertRaises(KeyError, lambda : ar['foo'])
107 self.assertTrue(isinstance(ar['engine_id'], list))
109 self.assertTrue(isinstance(ar['engine_id'], list))
108 self.assertEqual(ar.engine_id, ar['engine_id'])
110 self.assertEqual(ar.engine_id, ar['engine_id'])
109
111
110 def test_single_result(self):
112 def test_single_result(self):
111 ar = self.client[-1].apply_async(wait, 0.5)
113 ar = self.client[-1].apply_async(wait, 0.5)
112 self.assertRaises(KeyError, lambda : ar['foo'])
114 self.assertRaises(KeyError, lambda : ar['foo'])
113 self.assertEqual(ar['engine_id'], None)
115 self.assertEqual(ar['engine_id'], None)
114 self.assertTrue(ar.get(5) == 0.5)
116 self.assertTrue(ar.get(5) == 0.5)
115 self.assertTrue(isinstance(ar['engine_id'], int))
117 self.assertTrue(isinstance(ar['engine_id'], int))
116 self.assertTrue(isinstance(ar.engine_id, int))
118 self.assertTrue(isinstance(ar.engine_id, int))
117 self.assertEqual(ar.engine_id, ar['engine_id'])
119 self.assertEqual(ar.engine_id, ar['engine_id'])
118
120
119 def test_abort(self):
121 def test_abort(self):
120 e = self.client[-1]
122 e = self.client[-1]
121 ar = e.execute('import time; time.sleep(1)', block=False)
123 ar = e.execute('import time; time.sleep(1)', block=False)
122 ar2 = e.apply_async(lambda : 2)
124 ar2 = e.apply_async(lambda : 2)
123 ar2.abort()
125 ar2.abort()
124 self.assertRaises(error.TaskAborted, ar2.get)
126 self.assertRaises(error.TaskAborted, ar2.get)
125 ar.get()
127 ar.get()
126
128
127 def test_len(self):
129 def test_len(self):
128 v = self.client.load_balanced_view()
130 v = self.client.load_balanced_view()
129 ar = v.map_async(lambda x: x, range(10))
131 ar = v.map_async(lambda x: x, range(10))
130 self.assertEqual(len(ar), 10)
132 self.assertEqual(len(ar), 10)
131 ar = v.apply_async(lambda x: x, range(10))
133 ar = v.apply_async(lambda x: x, range(10))
132 self.assertEqual(len(ar), 1)
134 self.assertEqual(len(ar), 1)
133 ar = self.client[:].apply_async(lambda x: x, range(10))
135 ar = self.client[:].apply_async(lambda x: x, range(10))
134 self.assertEqual(len(ar), len(self.client.ids))
136 self.assertEqual(len(ar), len(self.client.ids))
135
137
136 def test_wall_time_single(self):
138 def test_wall_time_single(self):
137 v = self.client.load_balanced_view()
139 v = self.client.load_balanced_view()
138 ar = v.apply_async(time.sleep, 0.25)
140 ar = v.apply_async(time.sleep, 0.25)
139 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
141 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
140 ar.get(2)
142 ar.get(2)
141 self.assertTrue(ar.wall_time < 1.)
143 self.assertTrue(ar.wall_time < 1.)
142 self.assertTrue(ar.wall_time > 0.2)
144 self.assertTrue(ar.wall_time > 0.2)
143
145
144 def test_wall_time_multi(self):
146 def test_wall_time_multi(self):
145 self.minimum_engines(4)
147 self.minimum_engines(4)
146 v = self.client[:]
148 v = self.client[:]
147 ar = v.apply_async(time.sleep, 0.25)
149 ar = v.apply_async(time.sleep, 0.25)
148 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
150 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
149 ar.get(2)
151 ar.get(2)
150 self.assertTrue(ar.wall_time < 1.)
152 self.assertTrue(ar.wall_time < 1.)
151 self.assertTrue(ar.wall_time > 0.2)
153 self.assertTrue(ar.wall_time > 0.2)
152
154
153 def test_serial_time_single(self):
155 def test_serial_time_single(self):
154 v = self.client.load_balanced_view()
156 v = self.client.load_balanced_view()
155 ar = v.apply_async(time.sleep, 0.25)
157 ar = v.apply_async(time.sleep, 0.25)
156 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
158 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
157 ar.get(2)
159 ar.get(2)
158 self.assertTrue(ar.serial_time < 1.)
160 self.assertTrue(ar.serial_time < 1.)
159 self.assertTrue(ar.serial_time > 0.2)
161 self.assertTrue(ar.serial_time > 0.2)
160
162
161 def test_serial_time_multi(self):
163 def test_serial_time_multi(self):
162 self.minimum_engines(4)
164 self.minimum_engines(4)
163 v = self.client[:]
165 v = self.client[:]
164 ar = v.apply_async(time.sleep, 0.25)
166 ar = v.apply_async(time.sleep, 0.25)
165 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
167 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
166 ar.get(2)
168 ar.get(2)
167 self.assertTrue(ar.serial_time < 2.)
169 self.assertTrue(ar.serial_time < 2.)
168 self.assertTrue(ar.serial_time > 0.8)
170 self.assertTrue(ar.serial_time > 0.8)
169
171
170 def test_elapsed_single(self):
172 def test_elapsed_single(self):
171 v = self.client.load_balanced_view()
173 v = self.client.load_balanced_view()
172 ar = v.apply_async(time.sleep, 0.25)
174 ar = v.apply_async(time.sleep, 0.25)
173 while not ar.ready():
175 while not ar.ready():
174 time.sleep(0.01)
176 time.sleep(0.01)
175 self.assertTrue(ar.elapsed < 1)
177 self.assertTrue(ar.elapsed < 1)
176 self.assertTrue(ar.elapsed < 1)
178 self.assertTrue(ar.elapsed < 1)
177 ar.get(2)
179 ar.get(2)
178
180
179 def test_elapsed_multi(self):
181 def test_elapsed_multi(self):
180 v = self.client[:]
182 v = self.client[:]
181 ar = v.apply_async(time.sleep, 0.25)
183 ar = v.apply_async(time.sleep, 0.25)
182 while not ar.ready():
184 while not ar.ready():
183 time.sleep(0.01)
185 time.sleep(0.01)
184 self.assertTrue(ar.elapsed < 1)
186 self.assertTrue(ar.elapsed < 1)
185 self.assertTrue(ar.elapsed < 1)
187 self.assertTrue(ar.elapsed < 1)
186 ar.get(2)
188 ar.get(2)
187
189
188 def test_hubresult_timestamps(self):
190 def test_hubresult_timestamps(self):
189 self.minimum_engines(4)
191 self.minimum_engines(4)
190 v = self.client[:]
192 v = self.client[:]
191 ar = v.apply_async(time.sleep, 0.25)
193 ar = v.apply_async(time.sleep, 0.25)
192 ar.get(2)
194 ar.get(2)
193 rc2 = Client(profile='iptest')
195 rc2 = Client(profile='iptest')
194 # must have try/finally to close second Client, otherwise
196 # must have try/finally to close second Client, otherwise
195 # will have dangling sockets causing problems
197 # will have dangling sockets causing problems
196 try:
198 try:
197 time.sleep(0.25)
199 time.sleep(0.25)
198 hr = rc2.get_result(ar.msg_ids)
200 hr = rc2.get_result(ar.msg_ids)
199 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
201 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
200 hr.get(1)
202 hr.get(1)
201 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
203 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
202 self.assertEqual(hr.serial_time, ar.serial_time)
204 self.assertEqual(hr.serial_time, ar.serial_time)
203 finally:
205 finally:
204 rc2.close()
206 rc2.close()
205
207
206 def test_display_empty_streams_single(self):
208 def test_display_empty_streams_single(self):
207 """empty stdout/err are not displayed (single result)"""
209 """empty stdout/err are not displayed (single result)"""
208 self.minimum_engines(1)
210 self.minimum_engines(1)
209
211
210 v = self.client[-1]
212 v = self.client[-1]
211 ar = v.execute("print (5555)")
213 ar = v.execute("print (5555)")
212 ar.get(5)
214 ar.get(5)
213 with capture_output() as io:
215 with capture_output() as io:
214 ar.display_outputs()
216 ar.display_outputs()
215 self.assertEqual(io.stderr, '')
217 self.assertEqual(io.stderr, '')
216 self.assertEqual('5555\n', io.stdout)
218 self.assertEqual('5555\n', io.stdout)
217
219
218 ar = v.execute("a=5")
220 ar = v.execute("a=5")
219 ar.get(5)
221 ar.get(5)
220 with capture_output() as io:
222 with capture_output() as io:
221 ar.display_outputs()
223 ar.display_outputs()
222 self.assertEqual(io.stderr, '')
224 self.assertEqual(io.stderr, '')
223 self.assertEqual(io.stdout, '')
225 self.assertEqual(io.stdout, '')
224
226
225 def test_display_empty_streams_type(self):
227 def test_display_empty_streams_type(self):
226 """empty stdout/err are not displayed (groupby type)"""
228 """empty stdout/err are not displayed (groupby type)"""
227 self.minimum_engines(1)
229 self.minimum_engines(1)
228
230
229 v = self.client[:]
231 v = self.client[:]
230 ar = v.execute("print (5555)")
232 ar = v.execute("print (5555)")
231 ar.get(5)
233 ar.get(5)
232 with capture_output() as io:
234 with capture_output() as io:
233 ar.display_outputs()
235 ar.display_outputs()
234 self.assertEqual(io.stderr, '')
236 self.assertEqual(io.stderr, '')
235 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
237 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
236 self.assertFalse('\n\n' in io.stdout, io.stdout)
238 self.assertFalse('\n\n' in io.stdout, io.stdout)
237 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
239 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
238
240
239 ar = v.execute("a=5")
241 ar = v.execute("a=5")
240 ar.get(5)
242 ar.get(5)
241 with capture_output() as io:
243 with capture_output() as io:
242 ar.display_outputs()
244 ar.display_outputs()
243 self.assertEqual(io.stderr, '')
245 self.assertEqual(io.stderr, '')
244 self.assertEqual(io.stdout, '')
246 self.assertEqual(io.stdout, '')
245
247
246 def test_display_empty_streams_engine(self):
248 def test_display_empty_streams_engine(self):
247 """empty stdout/err are not displayed (groupby engine)"""
249 """empty stdout/err are not displayed (groupby engine)"""
248 self.minimum_engines(1)
250 self.minimum_engines(1)
249
251
250 v = self.client[:]
252 v = self.client[:]
251 ar = v.execute("print (5555)")
253 ar = v.execute("print (5555)")
252 ar.get(5)
254 ar.get(5)
253 with capture_output() as io:
255 with capture_output() as io:
254 ar.display_outputs('engine')
256 ar.display_outputs('engine')
255 self.assertEqual(io.stderr, '')
257 self.assertEqual(io.stderr, '')
256 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
258 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
257 self.assertFalse('\n\n' in io.stdout, io.stdout)
259 self.assertFalse('\n\n' in io.stdout, io.stdout)
258 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
260 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
259
261
260 ar = v.execute("a=5")
262 ar = v.execute("a=5")
261 ar.get(5)
263 ar.get(5)
262 with capture_output() as io:
264 with capture_output() as io:
263 ar.display_outputs('engine')
265 ar.display_outputs('engine')
264 self.assertEqual(io.stderr, '')
266 self.assertEqual(io.stderr, '')
265 self.assertEqual(io.stdout, '')
267 self.assertEqual(io.stdout, '')
268
269 def test_await_data(self):
270 """asking for ar.data flushes outputs"""
271 self.minimum_engines(1)
266
272
273 v = self.client[-1]
274 ar = v.execute('\n'.join([
275 "import time",
276 "from IPython.zmq.datapub import publish_data",
277 "for i in range(5):",
278 " publish_data(dict(i=i))",
279 " time.sleep(0.1)",
280 ]), block=False)
281 found = set()
282 tic = time.time()
283 # timeout after 10s
284 while time.time() <= tic + 10:
285 if ar.data:
286 found.add(ar.data['i'])
287 if ar.data['i'] == 4:
288 break
289 time.sleep(0.05)
290
291 ar.get(5)
292 nt.assert_in(4, found)
293 self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found)
267
294
General Comments 0
You need to be logged in to leave comments. Login now