##// END OF EJS Templates
Merge pull request #2255 from minrk/arwait...
Fernando Perez -
r8149:99eddce3 merge
parent child Browse files
Show More
@@ -82,6 +82,7 b' class AsyncResult(object):'
82 self._targets = targets
82 self._targets = targets
83 self._tracker = tracker
83 self._tracker = tracker
84 self._ready = False
84 self._ready = False
85 self._outputs_ready = False
85 self._success = None
86 self._success = None
86 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
87 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
87 if len(msg_ids) == 1:
88 if len(msg_ids) == 1:
@@ -134,6 +135,9 b' class AsyncResult(object):'
134 """Return whether the call has completed."""
135 """Return whether the call has completed."""
135 if not self._ready:
136 if not self._ready:
136 self.wait(0)
137 self.wait(0)
138 elif not self._outputs_ready:
139 self._wait_for_outputs(0)
140
137 return self._ready
141 return self._ready
138
142
139 def wait(self, timeout=-1):
143 def wait(self, timeout=-1):
@@ -142,6 +146,7 b' class AsyncResult(object):'
142 This method always returns None.
146 This method always returns None.
143 """
147 """
144 if self._ready:
148 if self._ready:
149 self._wait_for_outputs(timeout)
145 return
150 return
146 self._ready = self._client.wait(self.msg_ids, timeout)
151 self._ready = self._client.wait(self.msg_ids, timeout)
147 if self._ready:
152 if self._ready:
@@ -161,8 +166,10 b' class AsyncResult(object):'
161 else:
166 else:
162 self._success = True
167 self._success = True
163 finally:
168 finally:
164
169 if timeout is None or timeout < 0:
165 self._wait_for_outputs(10)
170 # cutoff infinite wait at 10s
171 timeout = 10
172 self._wait_for_outputs(timeout)
166
173
167
174
168 def successful(self):
175 def successful(self):
@@ -251,6 +258,7 b' class AsyncResult(object):'
251 return error.collect_exceptions(self._result[key], self._fname)
258 return error.collect_exceptions(self._result[key], self._fname)
252 elif isinstance(key, basestring):
259 elif isinstance(key, basestring):
253 # metadata proxy *does not* require that results are done
260 # metadata proxy *does not* require that results are done
261 self.wait(0)
254 values = [ md[key] for md in self._metadata ]
262 values = [ md[key] for md in self._metadata ]
255 if self._single_result:
263 if self._single_result:
256 return values[0]
264 return values[0]
@@ -377,11 +385,13 b' class AsyncResult(object):'
377 """
385 """
378 return self.timedelta(self.submitted, self.received)
386 return self.timedelta(self.submitted, self.received)
379
387
380 def wait_interactive(self, interval=1., timeout=None):
388 def wait_interactive(self, interval=1., timeout=-1):
381 """interactive wait, printing progress at regular intervals"""
389 """interactive wait, printing progress at regular intervals"""
390 if timeout is None:
391 timeout = -1
382 N = len(self)
392 N = len(self)
383 tic = time.time()
393 tic = time.time()
384 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
394 while not self.ready() and (timeout < 0 or time.time() - tic <= timeout):
385 self.wait(interval)
395 self.wait(interval)
386 clear_output()
396 clear_output()
387 print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
397 print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
@@ -433,13 +443,21 b' class AsyncResult(object):'
433 def _wait_for_outputs(self, timeout=-1):
443 def _wait_for_outputs(self, timeout=-1):
434 """wait for the 'status=idle' message that indicates we have all outputs
444 """wait for the 'status=idle' message that indicates we have all outputs
435 """
445 """
436 if not self._success:
446 if self._outputs_ready or not self._success:
437 # don't wait on errors
447 # don't wait on errors
438 return
448 return
449
450 # cast None to -1 for infinite timeout
451 if timeout is None:
452 timeout = -1
453
439 tic = time.time()
454 tic = time.time()
440 while not all(md['outputs_ready'] for md in self._metadata):
455 self._client._flush_iopub(self._client._iopub_socket)
456 self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
457 while not self._outputs_ready:
441 time.sleep(0.01)
458 time.sleep(0.01)
442 self._client._flush_iopub(self._client._iopub_socket)
459 self._client._flush_iopub(self._client._iopub_socket)
460 self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
443 if timeout >= 0 and time.time() > tic + timeout:
461 if timeout >= 0 and time.time() > tic + timeout:
444 break
462 break
445
463
@@ -643,9 +661,9 b' class AsyncHubResult(AsyncResult):'
643 so use `AsyncHubResult.wait()` sparingly.
661 so use `AsyncHubResult.wait()` sparingly.
644 """
662 """
645
663
646 def _wait_for_outputs(self, timeout=None):
664 def _wait_for_outputs(self, timeout=-1):
647 """no-op, because HubResults are never incomplete"""
665 """no-op, because HubResults are never incomplete"""
648 return
666 self._outputs_ready = True
649
667
650 def wait(self, timeout=-1):
668 def wait(self, timeout=-1):
651 """wait for result to complete."""
669 """wait for result to complete."""
@@ -18,6 +18,8 b' Authors:'
18
18
19 import time
19 import time
20
20
21 import nose.tools as nt
22
21 from IPython.utils.io import capture_output
23 from IPython.utils.io import capture_output
22
24
23 from IPython.parallel.error import TimeoutError
25 from IPython.parallel.error import TimeoutError
@@ -263,5 +265,30 b' class AsyncResultTest(ClusterTestCase):'
263 ar.display_outputs('engine')
265 ar.display_outputs('engine')
264 self.assertEqual(io.stderr, '')
266 self.assertEqual(io.stderr, '')
265 self.assertEqual(io.stdout, '')
267 self.assertEqual(io.stdout, '')
268
269 def test_await_data(self):
270 """asking for ar.data flushes outputs"""
271 self.minimum_engines(1)
266
272
273 v = self.client[-1]
274 ar = v.execute('\n'.join([
275 "import time",
276 "from IPython.zmq.datapub import publish_data",
277 "for i in range(5):",
278 " publish_data(dict(i=i))",
279 " time.sleep(0.1)",
280 ]), block=False)
281 found = set()
282 tic = time.time()
283 # timeout after 10s
284 while time.time() <= tic + 10:
285 if ar.data:
286 found.add(ar.data['i'])
287 if ar.data['i'] == 4:
288 break
289 time.sleep(0.05)
290
291 ar.get(5)
292 nt.assert_in(4, found)
293 self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found)
267
294
General Comments 0
You need to be logged in to leave comments. Login now