##// END OF EJS Templates
add Client.spin_thread()...
MinRK -
Show More
@@ -18,6 +18,7 b' Authors:'
18 18 import os
19 19 import json
20 20 import sys
21 from threading import Thread, Event
21 22 import time
22 23 import warnings
23 24 from datetime import datetime
@@ -36,7 +37,7 b' from IPython.utils.jsonutil import rekey'
36 37 from IPython.utils.localinterfaces import LOCAL_IPS
37 38 from IPython.utils.path import get_ipython_dir
38 39 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
39 Dict, List, Bool, Set)
40 Dict, List, Bool, Set, Any)
40 41 from IPython.external.decorator import decorator
41 42 from IPython.external.ssh import tunnel
42 43
@@ -249,6 +250,8 b' class Client(HasTraits):'
249 250 metadata = Instance('collections.defaultdict', (Metadata,))
250 251 history = List()
251 252 debug = Bool(False)
253 _spin_thread = Any()
254 _stop_spinning = Any()
252 255
253 256 profile=Unicode()
254 257 def _profile_default(self):
@@ -299,6 +302,7 b' class Client(HasTraits):'
299 302 if context is None:
300 303 context = zmq.Context.instance()
301 304 self._context = context
305 self._stop_spinning = Event()
302 306
303 307 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
304 308 if self._cd is not None:
@@ -785,12 +789,59 b' class Client(HasTraits):'
785 789 def close(self):
786 790 if self._closed:
787 791 return
792 self.stop_spin_thread()
788 793 snames = filter(lambda n: n.endswith('socket'), dir(self))
789 794 for socket in map(lambda name: getattr(self, name), snames):
790 795 if isinstance(socket, zmq.Socket) and not socket.closed:
791 796 socket.close()
792 797 self._closed = True
793 798
799 def _spin_every(self, interval=1):
800 """target func for use in spin_thread"""
801 while True:
802 if self._stop_spinning.is_set():
803 return
804 time.sleep(interval)
805 self.spin()
806
807 def spin_thread(self, interval=1):
808 """call Client.spin() in a background thread on some regular interval
809
810 This helps ensure that messages don't pile up too much in the zmq queue
811 while you are working on other things, or just leaving an idle terminal.
812
813 It also helps limit potential padding of the `received` timestamp
814 on AsyncResult objects, used for timings.
815
816 Parameters
817 ----------
818
819 interval : float, optional
820 The interval on which to spin the client in the background thread
821 (simply passed to time.sleep).
822
823 Notes
824 -----
825
826 For precision timing, you may want to use this method to put a bound
827 on the jitter (in seconds) in `received` timestamps used
828 in AsyncResult.wall_time.
829
830 """
831 if self._spin_thread is not None:
832 self.stop_spin_thread()
833 self._stop_spinning.clear()
834 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
835 self._spin_thread.daemon = True
836 self._spin_thread.start()
837
838 def stop_spin_thread(self):
839 """stop background spin_thread, if any"""
840 if self._spin_thread is not None:
841 self._stop_spinning.set()
842 self._spin_thread.join()
843 self._spin_thread = None
844
794 845 def spin(self):
795 846 """Flush any registration notifications and execution results
796 847 waiting in the ZMQ queue.
@@ -317,3 +317,21 b' class TestClient(ClusterTestCase):'
317 317 hist = self.client.hub_history()
318 318 self.assertEquals(len(hist), 0)
319 319
320 def test_spin_thread(self):
321 self.client.spin_thread(0.01)
322 ar = self.client[-1].apply_async(lambda : 1)
323 time.sleep(0.1)
324 self.assertTrue(ar.wall_time < 0.1,
325 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
326 )
327
328 def test_stop_spin_thread(self):
329 self.client.spin_thread(0.01)
330 self.client.stop_spin_thread()
331 ar = self.client[-1].apply_async(lambda : 1)
332 time.sleep(0.15)
333 self.assertTrue(ar.wall_time > 0.1,
334 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
335 )
336
337
General Comments 0
You need to be logged in to leave comments. Login now