diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index d92ab6b..ef122fb 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -18,6 +18,7 @@ Authors: import os import json import sys +from threading import Thread, Event import time import warnings from datetime import datetime @@ -36,7 +37,7 @@ from IPython.utils.jsonutil import rekey from IPython.utils.localinterfaces import LOCAL_IPS from IPython.utils.path import get_ipython_dir from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode, - Dict, List, Bool, Set) + Dict, List, Bool, Set, Any) from IPython.external.decorator import decorator from IPython.external.ssh import tunnel @@ -249,6 +250,8 @@ class Client(HasTraits): metadata = Instance('collections.defaultdict', (Metadata,)) history = List() debug = Bool(False) + _spin_thread = Any() + _stop_spinning = Any() profile=Unicode() def _profile_default(self): @@ -299,6 +302,7 @@ class Client(HasTraits): if context is None: context = zmq.Context.instance() self._context = context + self._stop_spinning = Event() self._setup_profile_dir(self.profile, profile_dir, ipython_dir) if self._cd is not None: @@ -785,12 +789,59 @@ class Client(HasTraits): def close(self): if self._closed: return + self.stop_spin_thread() snames = filter(lambda n: n.endswith('socket'), dir(self)) for socket in map(lambda name: getattr(self, name), snames): if isinstance(socket, zmq.Socket) and not socket.closed: socket.close() self._closed = True + def _spin_every(self, interval=1): + """target func for use in spin_thread""" + while True: + if self._stop_spinning.is_set(): + return + time.sleep(interval) + self.spin() + + def spin_thread(self, interval=1): + """call Client.spin() in a background thread on some regular interval + + This helps ensure that messages don't pile up too much in the zmq queue + while you are working on other things, or just leaving an idle terminal. + + It also helps limit potential padding of the `received` timestamp + on AsyncResult objects, used for timings. + + Parameters + ---------- + + interval : float, optional + The interval on which to spin the client in the background thread + (simply passed to time.sleep). + + Notes + ----- + + For precision timing, you may want to use this method to put a bound + on the jitter (in seconds) in `received` timestamps used + in AsyncResult.wall_time. + + """ + if self._spin_thread is not None: + self.stop_spin_thread() + self._stop_spinning.clear() + self._spin_thread = Thread(target=self._spin_every, args=(interval,)) + self._spin_thread.daemon = True + self._spin_thread.start() + + def stop_spin_thread(self): + """stop background spin_thread, if any""" + if self._spin_thread is not None: + self._stop_spinning.set() + self._spin_thread.join() + self._spin_thread = None + def spin(self): """Flush any registration notifications and execution results waiting in the ZMQ queue. diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py index bd6ab3a..0980294 100644 --- a/IPython/parallel/tests/test_client.py +++ b/IPython/parallel/tests/test_client.py @@ -316,4 +316,22 @@ class TestClient(ClusterTestCase): self.client.purge_results('all') hist = self.client.hub_history() self.assertEquals(len(hist), 0) + + def test_spin_thread(self): + self.client.spin_thread(0.01) + ar = self.client[-1].apply_async(lambda : 1) + time.sleep(0.1) + self.assertTrue(ar.wall_time < 0.1, + "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time + ) + + def test_stop_spin_thread(self): + self.client.spin_thread(0.01) + self.client.stop_spin_thread() + ar = self.client[-1].apply_async(lambda : 1) + time.sleep(0.15) + self.assertTrue(ar.wall_time > 0.1, + "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time + ) +