Show More
@@ -18,6 +18,7 b' Authors:' | |||||
18 | import os |
|
18 | import os | |
19 | import json |
|
19 | import json | |
20 | import sys |
|
20 | import sys | |
|
21 | from threading import Thread, Event | |||
21 | import time |
|
22 | import time | |
22 | import warnings |
|
23 | import warnings | |
23 | from datetime import datetime |
|
24 | from datetime import datetime | |
@@ -36,7 +37,7 b' from IPython.utils.jsonutil import rekey' | |||||
36 | from IPython.utils.localinterfaces import LOCAL_IPS |
|
37 | from IPython.utils.localinterfaces import LOCAL_IPS | |
37 | from IPython.utils.path import get_ipython_dir |
|
38 | from IPython.utils.path import get_ipython_dir | |
38 | from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode, |
|
39 | from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode, | |
39 | Dict, List, Bool, Set) |
|
40 | Dict, List, Bool, Set, Any) | |
40 | from IPython.external.decorator import decorator |
|
41 | from IPython.external.decorator import decorator | |
41 | from IPython.external.ssh import tunnel |
|
42 | from IPython.external.ssh import tunnel | |
42 |
|
43 | |||
@@ -249,6 +250,8 b' class Client(HasTraits):' | |||||
249 | metadata = Instance('collections.defaultdict', (Metadata,)) |
|
250 | metadata = Instance('collections.defaultdict', (Metadata,)) | |
250 | history = List() |
|
251 | history = List() | |
251 | debug = Bool(False) |
|
252 | debug = Bool(False) | |
|
253 | _spin_thread = Any() | |||
|
254 | _stop_spinning = Any() | |||
252 |
|
255 | |||
253 | profile=Unicode() |
|
256 | profile=Unicode() | |
254 | def _profile_default(self): |
|
257 | def _profile_default(self): | |
@@ -299,6 +302,7 b' class Client(HasTraits):' | |||||
299 | if context is None: |
|
302 | if context is None: | |
300 | context = zmq.Context.instance() |
|
303 | context = zmq.Context.instance() | |
301 | self._context = context |
|
304 | self._context = context | |
|
305 | self._stop_spinning = Event() | |||
302 |
|
306 | |||
303 | self._setup_profile_dir(self.profile, profile_dir, ipython_dir) |
|
307 | self._setup_profile_dir(self.profile, profile_dir, ipython_dir) | |
304 | if self._cd is not None: |
|
308 | if self._cd is not None: | |
@@ -785,12 +789,59 b' class Client(HasTraits):' | |||||
785 | def close(self): |
|
789 | def close(self): | |
786 | if self._closed: |
|
790 | if self._closed: | |
787 | return |
|
791 | return | |
|
792 | self.stop_spin_thread() | |||
788 | snames = filter(lambda n: n.endswith('socket'), dir(self)) |
|
793 | snames = filter(lambda n: n.endswith('socket'), dir(self)) | |
789 | for socket in map(lambda name: getattr(self, name), snames): |
|
794 | for socket in map(lambda name: getattr(self, name), snames): | |
790 | if isinstance(socket, zmq.Socket) and not socket.closed: |
|
795 | if isinstance(socket, zmq.Socket) and not socket.closed: | |
791 | socket.close() |
|
796 | socket.close() | |
792 | self._closed = True |
|
797 | self._closed = True | |
793 |
|
798 | |||
|
799 | def _spin_every(self, interval=1): | |||
|
800 | """target func for use in spin_thread""" | |||
|
801 | while True: | |||
|
802 | if self._stop_spinning.is_set(): | |||
|
803 | return | |||
|
804 | time.sleep(interval) | |||
|
805 | self.spin() | |||
|
806 | ||||
|
807 | def spin_thread(self, interval=1): | |||
|
808 | """call Client.spin() in a background thread on some regular interval | |||
|
809 | ||||
|
810 | This helps ensure that messages don't pile up too much in the zmq queue | |||
|
811 | while you are working on other things, or just leaving an idle terminal. | |||
|
812 | ||||
|
813 | It also helps limit potential padding of the `received` timestamp | |||
|
814 | on AsyncResult objects, used for timings. | |||
|
815 | ||||
|
816 | Parameters | |||
|
817 | ---------- | |||
|
818 | ||||
|
819 | interval : float, optional | |||
|
820 | The interval on which to spin the client in the background thread | |||
|
821 | (simply passed to time.sleep). | |||
|
822 | ||||
|
823 | Notes | |||
|
824 | ----- | |||
|
825 | ||||
|
826 | For precision timing, you may want to use this method to put a bound | |||
|
827 | on the jitter (in seconds) in `received` timestamps used | |||
|
828 | in AsyncResult.wall_time. | |||
|
829 | ||||
|
830 | """ | |||
|
831 | if self._spin_thread is not None: | |||
|
832 | self.stop_spin_thread() | |||
|
833 | self._stop_spinning.clear() | |||
|
834 | self._spin_thread = Thread(target=self._spin_every, args=(interval,)) | |||
|
835 | self._spin_thread.daemon = True | |||
|
836 | self._spin_thread.start() | |||
|
837 | ||||
|
838 | def stop_spin_thread(self): | |||
|
839 | """stop background spin_thread, if any""" | |||
|
840 | if self._spin_thread is not None: | |||
|
841 | self._stop_spinning.set() | |||
|
842 | self._spin_thread.join() | |||
|
843 | self._spin_thread = None | |||
|
844 | ||||
794 | def spin(self): |
|
845 | def spin(self): | |
795 | """Flush any registration notifications and execution results |
|
846 | """Flush any registration notifications and execution results | |
796 | waiting in the ZMQ queue. |
|
847 | waiting in the ZMQ queue. |
@@ -316,4 +316,22 b' class TestClient(ClusterTestCase):' | |||||
316 | self.client.purge_results('all') |
|
316 | self.client.purge_results('all') | |
317 | hist = self.client.hub_history() |
|
317 | hist = self.client.hub_history() | |
318 | self.assertEquals(len(hist), 0) |
|
318 | self.assertEquals(len(hist), 0) | |
|
319 | ||||
|
320 | def test_spin_thread(self): | |||
|
321 | self.client.spin_thread(0.01) | |||
|
322 | ar = self.client[-1].apply_async(lambda : 1) | |||
|
323 | time.sleep(0.1) | |||
|
324 | self.assertTrue(ar.wall_time < 0.1, | |||
|
325 | "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time | |||
|
326 | ) | |||
|
327 | ||||
|
328 | def test_stop_spin_thread(self): | |||
|
329 | self.client.spin_thread(0.01) | |||
|
330 | self.client.stop_spin_thread() | |||
|
331 | ar = self.client[-1].apply_async(lambda : 1) | |||
|
332 | time.sleep(0.15) | |||
|
333 | self.assertTrue(ar.wall_time > 0.1, | |||
|
334 | "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time | |||
|
335 | ) | |||
|
336 | ||||
319 |
|
337 |
General Comments 0
You need to be logged in to leave comments.
Login now