##// END OF EJS Templates
add Client.spin_thread()...
MinRK -
Show More
@@ -18,6 +18,7 b' Authors:'
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 import time
22 import time
22 import warnings
23 import warnings
23 from datetime import datetime
24 from datetime import datetime
@@ -36,7 +37,7 b' from IPython.utils.jsonutil import rekey'
36 from IPython.utils.localinterfaces import LOCAL_IPS
37 from IPython.utils.localinterfaces import LOCAL_IPS
37 from IPython.utils.path import get_ipython_dir
38 from IPython.utils.path import get_ipython_dir
38 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
39 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
39 Dict, List, Bool, Set)
40 Dict, List, Bool, Set, Any)
40 from IPython.external.decorator import decorator
41 from IPython.external.decorator import decorator
41 from IPython.external.ssh import tunnel
42 from IPython.external.ssh import tunnel
42
43
@@ -249,6 +250,8 b' class Client(HasTraits):'
249 metadata = Instance('collections.defaultdict', (Metadata,))
250 metadata = Instance('collections.defaultdict', (Metadata,))
250 history = List()
251 history = List()
251 debug = Bool(False)
252 debug = Bool(False)
253 _spin_thread = Any()
254 _stop_spinning = Any()
252
255
253 profile=Unicode()
256 profile=Unicode()
254 def _profile_default(self):
257 def _profile_default(self):
@@ -299,6 +302,7 b' class Client(HasTraits):'
299 if context is None:
302 if context is None:
300 context = zmq.Context.instance()
303 context = zmq.Context.instance()
301 self._context = context
304 self._context = context
305 self._stop_spinning = Event()
302
306
303 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
307 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
304 if self._cd is not None:
308 if self._cd is not None:
@@ -785,12 +789,59 b' class Client(HasTraits):'
785 def close(self):
789 def close(self):
786 if self._closed:
790 if self._closed:
787 return
791 return
792 self.stop_spin_thread()
788 snames = filter(lambda n: n.endswith('socket'), dir(self))
793 snames = filter(lambda n: n.endswith('socket'), dir(self))
789 for socket in map(lambda name: getattr(self, name), snames):
794 for socket in map(lambda name: getattr(self, name), snames):
790 if isinstance(socket, zmq.Socket) and not socket.closed:
795 if isinstance(socket, zmq.Socket) and not socket.closed:
791 socket.close()
796 socket.close()
792 self._closed = True
797 self._closed = True
793
798
799 def _spin_every(self, interval=1):
800 """target func for use in spin_thread"""
801 while True:
802 if self._stop_spinning.is_set():
803 return
804 time.sleep(interval)
805 self.spin()
806
807 def spin_thread(self, interval=1):
808 """call Client.spin() in a background thread on some regular interval
809
810 This helps ensure that messages don't pile up too much in the zmq queue
811 while you are working on other things, or just leaving an idle terminal.
812
813 It also helps limit potential padding of the `received` timestamp
814 on AsyncResult objects, used for timings.
815
816 Parameters
817 ----------
818
819 interval : float, optional
820 The interval on which to spin the client in the background thread
821 (simply passed to time.sleep).
822
823 Notes
824 -----
825
826 For precision timing, you may want to use this method to put a bound
827 on the jitter (in seconds) in `received` timestamps used
828 in AsyncResult.wall_time.
829
830 """
831 if self._spin_thread is not None:
832 self.stop_spin_thread()
833 self._stop_spinning.clear()
834 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
835 self._spin_thread.daemon = True
836 self._spin_thread.start()
837
838 def stop_spin_thread(self):
839 """stop background spin_thread, if any"""
840 if self._spin_thread is not None:
841 self._stop_spinning.set()
842 self._spin_thread.join()
843 self._spin_thread = None
844
794 def spin(self):
845 def spin(self):
795 """Flush any registration notifications and execution results
846 """Flush any registration notifications and execution results
796 waiting in the ZMQ queue.
847 waiting in the ZMQ queue.
@@ -316,4 +316,22 b' class TestClient(ClusterTestCase):'
316 self.client.purge_results('all')
316 self.client.purge_results('all')
317 hist = self.client.hub_history()
317 hist = self.client.hub_history()
318 self.assertEquals(len(hist), 0)
318 self.assertEquals(len(hist), 0)
319
320 def test_spin_thread(self):
321 self.client.spin_thread(0.01)
322 ar = self.client[-1].apply_async(lambda : 1)
323 time.sleep(0.1)
324 self.assertTrue(ar.wall_time < 0.1,
325 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
326 )
327
328 def test_stop_spin_thread(self):
329 self.client.spin_thread(0.01)
330 self.client.stop_spin_thread()
331 ar = self.client[-1].apply_async(lambda : 1)
332 time.sleep(0.15)
333 self.assertTrue(ar.wall_time > 0.1,
334 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
335 )
336
319
337
General Comments 0
You need to be logged in to leave comments. Login now