##// END OF EJS Templates
better flush iopub with AsyncResults...
MinRK -
Show More
@@ -82,6 +82,7 b' class AsyncResult(object):'
82 self._targets = targets
82 self._targets = targets
83 self._tracker = tracker
83 self._tracker = tracker
84 self._ready = False
84 self._ready = False
85 self._outputs_ready = False
85 self._success = None
86 self._success = None
86 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
87 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
87 if len(msg_ids) == 1:
88 if len(msg_ids) == 1:
@@ -134,6 +135,9 b' class AsyncResult(object):'
134 """Return whether the call has completed."""
135 """Return whether the call has completed."""
135 if not self._ready:
136 if not self._ready:
136 self.wait(0)
137 self.wait(0)
138 elif not self._outputs_ready:
139 self._wait_for_outputs(0)
140
137 return self._ready
141 return self._ready
138
142
139 def wait(self, timeout=-1):
143 def wait(self, timeout=-1):
@@ -142,6 +146,7 b' class AsyncResult(object):'
142 This method always returns None.
146 This method always returns None.
143 """
147 """
144 if self._ready:
148 if self._ready:
149 self._wait_for_outputs(timeout)
145 return
150 return
146 self._ready = self._client.wait(self.msg_ids, timeout)
151 self._ready = self._client.wait(self.msg_ids, timeout)
147 if self._ready:
152 if self._ready:
@@ -161,8 +166,9 b' class AsyncResult(object):'
161 else:
166 else:
162 self._success = True
167 self._success = True
163 finally:
168 finally:
164
169 if timeout is not None and timeout < 0:
165 self._wait_for_outputs(10)
170 timeout = 10
171 self._wait_for_outputs(timeout)
166
172
167
173
168 def successful(self):
174 def successful(self):
@@ -251,6 +257,7 b' class AsyncResult(object):'
251 return error.collect_exceptions(self._result[key], self._fname)
257 return error.collect_exceptions(self._result[key], self._fname)
252 elif isinstance(key, basestring):
258 elif isinstance(key, basestring):
253 # metadata proxy *does not* require that results are done
259 # metadata proxy *does not* require that results are done
260 self.wait(0)
254 values = [ md[key] for md in self._metadata ]
261 values = [ md[key] for md in self._metadata ]
255 if self._single_result:
262 if self._single_result:
256 return values[0]
263 return values[0]
@@ -377,11 +384,13 b' class AsyncResult(object):'
377 """
384 """
378 return self.timedelta(self.submitted, self.received)
385 return self.timedelta(self.submitted, self.received)
379
386
380 def wait_interactive(self, interval=1., timeout=None):
387 def wait_interactive(self, interval=1., timeout=-1):
381 """interactive wait, printing progress at regular intervals"""
388 """interactive wait, printing progress at regular intervals"""
389 if timeout is None:
390 timeout = -1
382 N = len(self)
391 N = len(self)
383 tic = time.time()
392 tic = time.time()
384 while not self.ready() and (timeout is None or time.time() - tic <= timeout):
393 while not self.ready() and (timeout < 0 or time.time() - tic <= timeout):
385 self.wait(interval)
394 self.wait(interval)
386 clear_output()
395 clear_output()
387 print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
396 print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
@@ -433,13 +442,21 b' class AsyncResult(object):'
433 def _wait_for_outputs(self, timeout=-1):
442 def _wait_for_outputs(self, timeout=-1):
434 """wait for the 'status=idle' message that indicates we have all outputs
443 """wait for the 'status=idle' message that indicates we have all outputs
435 """
444 """
436 if not self._success:
445 if self._outputs_ready or not self._success:
437 # don't wait on errors
446 # don't wait on errors
438 return
447 return
448
449 # cast None to -1 for infinite timeout
450 if timeout is None:
451 timeout = -1
452
439 tic = time.time()
453 tic = time.time()
440 while not all(md['outputs_ready'] for md in self._metadata):
454 self._client._flush_iopub(self._client._iopub_socket)
455 self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
456 while not self._outputs_ready:
441 time.sleep(0.01)
457 time.sleep(0.01)
442 self._client._flush_iopub(self._client._iopub_socket)
458 self._client._flush_iopub(self._client._iopub_socket)
459 self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
443 if timeout >= 0 and time.time() > tic + timeout:
460 if timeout >= 0 and time.time() > tic + timeout:
444 break
461 break
445
462
@@ -643,9 +660,9 b' class AsyncHubResult(AsyncResult):'
643 so use `AsyncHubResult.wait()` sparingly.
660 so use `AsyncHubResult.wait()` sparingly.
644 """
661 """
645
662
646 def _wait_for_outputs(self, timeout=None):
663 def _wait_for_outputs(self, timeout=-1):
647 """no-op, because HubResults are never incomplete"""
664 """no-op, because HubResults are never incomplete"""
648 return
665 self._outputs_ready = True
649
666
650 def wait(self, timeout=-1):
667 def wait(self, timeout=-1):
651 """wait for result to complete."""
668 """wait for result to complete."""
@@ -18,6 +18,8 b' Authors:'
18
18
19 import time
19 import time
20
20
21 import nose.tools as nt
22
21 from IPython.utils.io import capture_output
23 from IPython.utils.io import capture_output
22
24
23 from IPython.parallel.error import TimeoutError
25 from IPython.parallel.error import TimeoutError
@@ -263,5 +265,30 b' class AsyncResultTest(ClusterTestCase):'
263 ar.display_outputs('engine')
265 ar.display_outputs('engine')
264 self.assertEqual(io.stderr, '')
266 self.assertEqual(io.stderr, '')
265 self.assertEqual(io.stdout, '')
267 self.assertEqual(io.stdout, '')
268
269 def test_await_data(self):
270 """asking for ar.data flushes outputs"""
271 self.minimum_engines(1)
266
272
273 v = self.client[-1]
274 ar = v.execute('\n'.join([
275 "import time",
276 "from IPython.zmq.datapub import publish_data",
277 "for i in range(5):",
278 " publish_data(dict(i=i))",
279 " time.sleep(0.1)",
280 ]), block=False)
281 found = set()
282 tic = time.time()
283 # timeout after 10s
284 while time.time() <= tic + 10:
285 if ar.data:
286 found.add(ar.data['i'])
287 if ar.data['i'] == 4:
288 break
289 time.sleep(0.05)
290
291 ar.get(5)
292 nt.assert_in(4, found)
293 self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found)
267
294
General Comments 0
You need to be logged in to leave comments. Login now