Show More
@@ -82,6 +82,7 b' class AsyncResult(object):' | |||||
82 | self._targets = targets |
|
82 | self._targets = targets | |
83 | self._tracker = tracker |
|
83 | self._tracker = tracker | |
84 | self._ready = False |
|
84 | self._ready = False | |
|
85 | self._outputs_ready = False | |||
85 | self._success = None |
|
86 | self._success = None | |
86 | self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ] |
|
87 | self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ] | |
87 | if len(msg_ids) == 1: |
|
88 | if len(msg_ids) == 1: | |
@@ -134,6 +135,9 b' class AsyncResult(object):' | |||||
134 | """Return whether the call has completed.""" |
|
135 | """Return whether the call has completed.""" | |
135 | if not self._ready: |
|
136 | if not self._ready: | |
136 | self.wait(0) |
|
137 | self.wait(0) | |
|
138 | elif not self._outputs_ready: | |||
|
139 | self._wait_for_outputs(0) | |||
|
140 | ||||
137 | return self._ready |
|
141 | return self._ready | |
138 |
|
142 | |||
139 | def wait(self, timeout=-1): |
|
143 | def wait(self, timeout=-1): | |
@@ -142,6 +146,7 b' class AsyncResult(object):' | |||||
142 | This method always returns None. |
|
146 | This method always returns None. | |
143 | """ |
|
147 | """ | |
144 | if self._ready: |
|
148 | if self._ready: | |
|
149 | self._wait_for_outputs(timeout) | |||
145 | return |
|
150 | return | |
146 | self._ready = self._client.wait(self.msg_ids, timeout) |
|
151 | self._ready = self._client.wait(self.msg_ids, timeout) | |
147 | if self._ready: |
|
152 | if self._ready: | |
@@ -161,8 +166,10 b' class AsyncResult(object):' | |||||
161 | else: |
|
166 | else: | |
162 | self._success = True |
|
167 | self._success = True | |
163 | finally: |
|
168 | finally: | |
164 |
|
169 | if timeout is None or timeout < 0: | ||
165 | self._wait_for_outputs(10) |
|
170 | # cutoff infinite wait at 10s | |
|
171 | timeout = 10 | |||
|
172 | self._wait_for_outputs(timeout) | |||
166 |
|
173 | |||
167 |
|
174 | |||
168 | def successful(self): |
|
175 | def successful(self): | |
@@ -251,6 +258,7 b' class AsyncResult(object):' | |||||
251 | return error.collect_exceptions(self._result[key], self._fname) |
|
258 | return error.collect_exceptions(self._result[key], self._fname) | |
252 | elif isinstance(key, basestring): |
|
259 | elif isinstance(key, basestring): | |
253 | # metadata proxy *does not* require that results are done |
|
260 | # metadata proxy *does not* require that results are done | |
|
261 | self.wait(0) | |||
254 | values = [ md[key] for md in self._metadata ] |
|
262 | values = [ md[key] for md in self._metadata ] | |
255 | if self._single_result: |
|
263 | if self._single_result: | |
256 | return values[0] |
|
264 | return values[0] | |
@@ -377,11 +385,13 b' class AsyncResult(object):' | |||||
377 | """ |
|
385 | """ | |
378 | return self.timedelta(self.submitted, self.received) |
|
386 | return self.timedelta(self.submitted, self.received) | |
379 |
|
387 | |||
380 |
def wait_interactive(self, interval=1., timeout= |
|
388 | def wait_interactive(self, interval=1., timeout=-1): | |
381 | """interactive wait, printing progress at regular intervals""" |
|
389 | """interactive wait, printing progress at regular intervals""" | |
|
390 | if timeout is None: | |||
|
391 | timeout = -1 | |||
382 | N = len(self) |
|
392 | N = len(self) | |
383 | tic = time.time() |
|
393 | tic = time.time() | |
384 |
while not self.ready() and (timeout |
|
394 | while not self.ready() and (timeout < 0 or time.time() - tic <= timeout): | |
385 | self.wait(interval) |
|
395 | self.wait(interval) | |
386 | clear_output() |
|
396 | clear_output() | |
387 | print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="") |
|
397 | print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="") | |
@@ -433,13 +443,21 b' class AsyncResult(object):' | |||||
433 | def _wait_for_outputs(self, timeout=-1): |
|
443 | def _wait_for_outputs(self, timeout=-1): | |
434 | """wait for the 'status=idle' message that indicates we have all outputs |
|
444 | """wait for the 'status=idle' message that indicates we have all outputs | |
435 | """ |
|
445 | """ | |
436 | if not self._success: |
|
446 | if self._outputs_ready or not self._success: | |
437 | # don't wait on errors |
|
447 | # don't wait on errors | |
438 | return |
|
448 | return | |
|
449 | ||||
|
450 | # cast None to -1 for infinite timeout | |||
|
451 | if timeout is None: | |||
|
452 | timeout = -1 | |||
|
453 | ||||
439 | tic = time.time() |
|
454 | tic = time.time() | |
440 | while not all(md['outputs_ready'] for md in self._metadata): |
|
455 | self._client._flush_iopub(self._client._iopub_socket) | |
|
456 | self._outputs_ready = all(md['outputs_ready'] for md in self._metadata) | |||
|
457 | while not self._outputs_ready: | |||
441 | time.sleep(0.01) |
|
458 | time.sleep(0.01) | |
442 | self._client._flush_iopub(self._client._iopub_socket) |
|
459 | self._client._flush_iopub(self._client._iopub_socket) | |
|
460 | self._outputs_ready = all(md['outputs_ready'] for md in self._metadata) | |||
443 | if timeout >= 0 and time.time() > tic + timeout: |
|
461 | if timeout >= 0 and time.time() > tic + timeout: | |
444 | break |
|
462 | break | |
445 |
|
463 | |||
@@ -643,9 +661,9 b' class AsyncHubResult(AsyncResult):' | |||||
643 | so use `AsyncHubResult.wait()` sparingly. |
|
661 | so use `AsyncHubResult.wait()` sparingly. | |
644 | """ |
|
662 | """ | |
645 |
|
663 | |||
646 |
def _wait_for_outputs(self, timeout= |
|
664 | def _wait_for_outputs(self, timeout=-1): | |
647 | """no-op, because HubResults are never incomplete""" |
|
665 | """no-op, because HubResults are never incomplete""" | |
648 | return |
|
666 | self._outputs_ready = True | |
649 |
|
667 | |||
650 | def wait(self, timeout=-1): |
|
668 | def wait(self, timeout=-1): | |
651 | """wait for result to complete.""" |
|
669 | """wait for result to complete.""" |
@@ -18,6 +18,8 b' Authors:' | |||||
18 |
|
18 | |||
19 | import time |
|
19 | import time | |
20 |
|
20 | |||
|
21 | import nose.tools as nt | |||
|
22 | ||||
21 | from IPython.utils.io import capture_output |
|
23 | from IPython.utils.io import capture_output | |
22 |
|
24 | |||
23 | from IPython.parallel.error import TimeoutError |
|
25 | from IPython.parallel.error import TimeoutError | |
@@ -263,5 +265,30 b' class AsyncResultTest(ClusterTestCase):' | |||||
263 | ar.display_outputs('engine') |
|
265 | ar.display_outputs('engine') | |
264 | self.assertEqual(io.stderr, '') |
|
266 | self.assertEqual(io.stderr, '') | |
265 | self.assertEqual(io.stdout, '') |
|
267 | self.assertEqual(io.stdout, '') | |
|
268 | ||||
|
269 | def test_await_data(self): | |||
|
270 | """asking for ar.data flushes outputs""" | |||
|
271 | self.minimum_engines(1) | |||
266 |
|
272 | |||
|
273 | v = self.client[-1] | |||
|
274 | ar = v.execute('\n'.join([ | |||
|
275 | "import time", | |||
|
276 | "from IPython.zmq.datapub import publish_data", | |||
|
277 | "for i in range(5):", | |||
|
278 | " publish_data(dict(i=i))", | |||
|
279 | " time.sleep(0.1)", | |||
|
280 | ]), block=False) | |||
|
281 | found = set() | |||
|
282 | tic = time.time() | |||
|
283 | # timeout after 10s | |||
|
284 | while time.time() <= tic + 10: | |||
|
285 | if ar.data: | |||
|
286 | found.add(ar.data['i']) | |||
|
287 | if ar.data['i'] == 4: | |||
|
288 | break | |||
|
289 | time.sleep(0.05) | |||
|
290 | ||||
|
291 | ar.get(5) | |||
|
292 | nt.assert_in(4, found) | |||
|
293 | self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found) | |||
267 |
|
294 |
General Comments 0
You need to be logged in to leave comments.
Login now