##// END OF EJS Templates
Merge pull request #2255 from minrk/arwait...
Fernando Perez -
r8149:99eddce3 merge
parent child Browse files
Show More
@@ -82,6 +82,7 b' class AsyncResult(object):'
82 82 self._targets = targets
83 83 self._tracker = tracker
84 84 self._ready = False
85 self._outputs_ready = False
85 86 self._success = None
86 87 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
87 88 if len(msg_ids) == 1:
@@ -134,6 +135,9 b' class AsyncResult(object):'
134 135 """Return whether the call has completed."""
135 136 if not self._ready:
136 137 self.wait(0)
138 elif not self._outputs_ready:
139 self._wait_for_outputs(0)
140
137 141 return self._ready
138 142
139 143 def wait(self, timeout=-1):
@@ -142,6 +146,7 b' class AsyncResult(object):'
142 146 This method always returns None.
143 147 """
144 148 if self._ready:
149 self._wait_for_outputs(timeout)
145 150 return
146 151 self._ready = self._client.wait(self.msg_ids, timeout)
147 152 if self._ready:
@@ -161,8 +166,10 b' class AsyncResult(object):'
161 166 else:
162 167 self._success = True
163 168 finally:
164
165 self._wait_for_outputs(10)
169 if timeout is None or timeout < 0:
170 # cutoff infinite wait at 10s
171 timeout = 10
172 self._wait_for_outputs(timeout)
166 173
167 174
168 175 def successful(self):
@@ -251,6 +258,7 b' class AsyncResult(object):'
251 258 return error.collect_exceptions(self._result[key], self._fname)
252 259 elif isinstance(key, basestring):
253 260 # metadata proxy *does not* require that results are done
261 self.wait(0)
254 262 values = [ md[key] for md in self._metadata ]
255 263 if self._single_result:
256 264 return values[0]
@@ -377,11 +385,13 b' class AsyncResult(object):'
377 385 """
378 386 return self.timedelta(self.submitted, self.received)
379 387
380 def wait_interactive(self, interval=1., timeout=None):
388 def wait_interactive(self, interval=1., timeout=-1):
381 389 """interactive wait, printing progress at regular intervals"""
390 if timeout is None:
391 timeout = -1
382 392 N = len(self)
383 393 tic = time.time()
384 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
394 while not self.ready() and (timeout < 0 or time.time() - tic <= timeout):
385 395 self.wait(interval)
386 396 clear_output()
387 397 print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
@@ -433,13 +443,21 b' class AsyncResult(object):'
433 443 def _wait_for_outputs(self, timeout=-1):
434 444 """wait for the 'status=idle' message that indicates we have all outputs
435 445 """
436 if not self._success:
446 if self._outputs_ready or not self._success:
437 447 # don't wait on errors
438 448 return
449
450 # cast None to -1 for infinite timeout
451 if timeout is None:
452 timeout = -1
453
439 454 tic = time.time()
440 while not all(md['outputs_ready'] for md in self._metadata):
455 self._client._flush_iopub(self._client._iopub_socket)
456 self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
457 while not self._outputs_ready:
441 458 time.sleep(0.01)
442 459 self._client._flush_iopub(self._client._iopub_socket)
460 self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
443 461 if timeout >= 0 and time.time() > tic + timeout:
444 462 break
445 463
@@ -643,9 +661,9 b' class AsyncHubResult(AsyncResult):'
643 661 so use `AsyncHubResult.wait()` sparingly.
644 662 """
645 663
646 def _wait_for_outputs(self, timeout=None):
664 def _wait_for_outputs(self, timeout=-1):
647 665 """no-op, because HubResults are never incomplete"""
648 return
666 self._outputs_ready = True
649 667
650 668 def wait(self, timeout=-1):
651 669 """wait for result to complete."""
@@ -18,6 +18,8 b' Authors:'
18 18
19 19 import time
20 20
21 import nose.tools as nt
22
21 23 from IPython.utils.io import capture_output
22 24
23 25 from IPython.parallel.error import TimeoutError
@@ -263,5 +265,30 b' class AsyncResultTest(ClusterTestCase):'
263 265 ar.display_outputs('engine')
264 266 self.assertEqual(io.stderr, '')
265 267 self.assertEqual(io.stdout, '')
268
269 def test_await_data(self):
270 """asking for ar.data flushes outputs"""
271 self.minimum_engines(1)
266 272
273 v = self.client[-1]
274 ar = v.execute('\n'.join([
275 "import time",
276 "from IPython.zmq.datapub import publish_data",
277 "for i in range(5):",
278 " publish_data(dict(i=i))",
279 " time.sleep(0.1)",
280 ]), block=False)
281 found = set()
282 tic = time.time()
283 # timeout after 10s
284 while time.time() <= tic + 10:
285 if ar.data:
286 found.add(ar.data['i'])
287 if ar.data['i'] == 4:
288 break
289 time.sleep(0.05)
290
291 ar.get(5)
292 nt.assert_in(4, found)
293 self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found)
267 294
General Comments 0
You need to be logged in to leave comments. Login now