Show More
@@ -82,6 +82,7 b' class AsyncResult(object):' | |||
|
82 | 82 | self._targets = targets |
|
83 | 83 | self._tracker = tracker |
|
84 | 84 | self._ready = False |
|
85 | self._outputs_ready = False | |
|
85 | 86 | self._success = None |
|
86 | 87 | self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ] |
|
87 | 88 | if len(msg_ids) == 1: |
@@ -134,6 +135,9 b' class AsyncResult(object):' | |||
|
134 | 135 | """Return whether the call has completed.""" |
|
135 | 136 | if not self._ready: |
|
136 | 137 | self.wait(0) |
|
138 | elif not self._outputs_ready: | |
|
139 | self._wait_for_outputs(0) | |
|
140 | ||
|
137 | 141 | return self._ready |
|
138 | 142 | |
|
139 | 143 | def wait(self, timeout=-1): |
@@ -142,6 +146,7 b' class AsyncResult(object):' | |||
|
142 | 146 | This method always returns None. |
|
143 | 147 | """ |
|
144 | 148 | if self._ready: |
|
149 | self._wait_for_outputs(timeout) | |
|
145 | 150 | return |
|
146 | 151 | self._ready = self._client.wait(self.msg_ids, timeout) |
|
147 | 152 | if self._ready: |
@@ -161,8 +166,10 b' class AsyncResult(object):' | |||
|
161 | 166 | else: |
|
162 | 167 | self._success = True |
|
163 | 168 | finally: |
|
164 | ||
|
165 | self._wait_for_outputs(10) | |
|
169 | if timeout is None or timeout < 0: | |
|
170 | # cutoff infinite wait at 10s | |
|
171 | timeout = 10 | |
|
172 | self._wait_for_outputs(timeout) | |
|
166 | 173 | |
|
167 | 174 | |
|
168 | 175 | def successful(self): |
@@ -251,6 +258,7 b' class AsyncResult(object):' | |||
|
251 | 258 | return error.collect_exceptions(self._result[key], self._fname) |
|
252 | 259 | elif isinstance(key, basestring): |
|
253 | 260 | # metadata proxy *does not* require that results are done |
|
261 | self.wait(0) | |
|
254 | 262 | values = [ md[key] for md in self._metadata ] |
|
255 | 263 | if self._single_result: |
|
256 | 264 | return values[0] |
@@ -377,11 +385,13 b' class AsyncResult(object):' | |||
|
377 | 385 | """ |
|
378 | 386 | return self.timedelta(self.submitted, self.received) |
|
379 | 387 | |
|
380 |
def wait_interactive(self, interval=1., timeout= |
|
|
388 | def wait_interactive(self, interval=1., timeout=-1): | |
|
381 | 389 | """interactive wait, printing progress at regular intervals""" |
|
390 | if timeout is None: | |
|
391 | timeout = -1 | |
|
382 | 392 | N = len(self) |
|
383 | 393 | tic = time.time() |
|
384 |
while not self.ready() and (timeout |
|
|
394 | while not self.ready() and (timeout < 0 or time.time() - tic <= timeout): | |
|
385 | 395 | self.wait(interval) |
|
386 | 396 | clear_output() |
|
387 | 397 | print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="") |
@@ -433,13 +443,21 b' class AsyncResult(object):' | |||
|
433 | 443 | def _wait_for_outputs(self, timeout=-1): |
|
434 | 444 | """wait for the 'status=idle' message that indicates we have all outputs |
|
435 | 445 | """ |
|
436 | if not self._success: | |
|
446 | if self._outputs_ready or not self._success: | |
|
437 | 447 | # don't wait on errors |
|
438 | 448 | return |
|
449 | ||
|
450 | # cast None to -1 for infinite timeout | |
|
451 | if timeout is None: | |
|
452 | timeout = -1 | |
|
453 | ||
|
439 | 454 | tic = time.time() |
|
440 | while not all(md['outputs_ready'] for md in self._metadata): | |
|
455 | self._client._flush_iopub(self._client._iopub_socket) | |
|
456 | self._outputs_ready = all(md['outputs_ready'] for md in self._metadata) | |
|
457 | while not self._outputs_ready: | |
|
441 | 458 | time.sleep(0.01) |
|
442 | 459 | self._client._flush_iopub(self._client._iopub_socket) |
|
460 | self._outputs_ready = all(md['outputs_ready'] for md in self._metadata) | |
|
443 | 461 | if timeout >= 0 and time.time() > tic + timeout: |
|
444 | 462 | break |
|
445 | 463 | |
@@ -643,9 +661,9 b' class AsyncHubResult(AsyncResult):' | |||
|
643 | 661 | so use `AsyncHubResult.wait()` sparingly. |
|
644 | 662 | """ |
|
645 | 663 | |
|
646 |
def _wait_for_outputs(self, timeout= |
|
|
664 | def _wait_for_outputs(self, timeout=-1): | |
|
647 | 665 | """no-op, because HubResults are never incomplete""" |
|
648 | return | |
|
666 | self._outputs_ready = True | |
|
649 | 667 | |
|
650 | 668 | def wait(self, timeout=-1): |
|
651 | 669 | """wait for result to complete.""" |
@@ -18,6 +18,8 b' Authors:' | |||
|
18 | 18 | |
|
19 | 19 | import time |
|
20 | 20 | |
|
21 | import nose.tools as nt | |
|
22 | ||
|
21 | 23 | from IPython.utils.io import capture_output |
|
22 | 24 | |
|
23 | 25 | from IPython.parallel.error import TimeoutError |
@@ -263,5 +265,30 b' class AsyncResultTest(ClusterTestCase):' | |||
|
263 | 265 | ar.display_outputs('engine') |
|
264 | 266 | self.assertEqual(io.stderr, '') |
|
265 | 267 | self.assertEqual(io.stdout, '') |
|
268 | ||
|
269 | def test_await_data(self): | |
|
270 | """asking for ar.data flushes outputs""" | |
|
271 | self.minimum_engines(1) | |
|
266 | 272 | |
|
273 | v = self.client[-1] | |
|
274 | ar = v.execute('\n'.join([ | |
|
275 | "import time", | |
|
276 | "from IPython.zmq.datapub import publish_data", | |
|
277 | "for i in range(5):", | |
|
278 | " publish_data(dict(i=i))", | |
|
279 | " time.sleep(0.1)", | |
|
280 | ]), block=False) | |
|
281 | found = set() | |
|
282 | tic = time.time() | |
|
283 | # timeout after 10s | |
|
284 | while time.time() <= tic + 10: | |
|
285 | if ar.data: | |
|
286 | found.add(ar.data['i']) | |
|
287 | if ar.data['i'] == 4: | |
|
288 | break | |
|
289 | time.sleep(0.05) | |
|
290 | ||
|
291 | ar.get(5) | |
|
292 | nt.assert_in(4, found) | |
|
293 | self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found) | |
|
267 | 294 |
General Comments 0
You need to be logged in to leave comments.
Login now