Show More
@@ -18,6 +18,7 b' Authors:' | |||
|
18 | 18 | import os |
|
19 | 19 | import json |
|
20 | 20 | import sys |
|
21 | from threading import Thread, Event | |
|
21 | 22 | import time |
|
22 | 23 | import warnings |
|
23 | 24 | from datetime import datetime |
@@ -36,7 +37,7 b' from IPython.utils.jsonutil import rekey' | |||
|
36 | 37 | from IPython.utils.localinterfaces import LOCAL_IPS |
|
37 | 38 | from IPython.utils.path import get_ipython_dir |
|
38 | 39 | from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode, |
|
39 | Dict, List, Bool, Set) | |
|
40 | Dict, List, Bool, Set, Any) | |
|
40 | 41 | from IPython.external.decorator import decorator |
|
41 | 42 | from IPython.external.ssh import tunnel |
|
42 | 43 | |
@@ -249,6 +250,8 b' class Client(HasTraits):' | |||
|
249 | 250 | metadata = Instance('collections.defaultdict', (Metadata,)) |
|
250 | 251 | history = List() |
|
251 | 252 | debug = Bool(False) |
|
253 | _spin_thread = Any() | |
|
254 | _stop_spinning = Any() | |
|
252 | 255 | |
|
253 | 256 | profile=Unicode() |
|
254 | 257 | def _profile_default(self): |
@@ -299,6 +302,7 b' class Client(HasTraits):' | |||
|
299 | 302 | if context is None: |
|
300 | 303 | context = zmq.Context.instance() |
|
301 | 304 | self._context = context |
|
305 | self._stop_spinning = Event() | |
|
302 | 306 | |
|
303 | 307 | self._setup_profile_dir(self.profile, profile_dir, ipython_dir) |
|
304 | 308 | if self._cd is not None: |
@@ -785,12 +789,59 b' class Client(HasTraits):' | |||
|
785 | 789 | def close(self): |
|
786 | 790 | if self._closed: |
|
787 | 791 | return |
|
792 | self.stop_spin_thread() | |
|
788 | 793 | snames = filter(lambda n: n.endswith('socket'), dir(self)) |
|
789 | 794 | for socket in map(lambda name: getattr(self, name), snames): |
|
790 | 795 | if isinstance(socket, zmq.Socket) and not socket.closed: |
|
791 | 796 | socket.close() |
|
792 | 797 | self._closed = True |
|
793 | 798 | |
|
799 | def _spin_every(self, interval=1): | |
|
800 | """target func for use in spin_thread""" | |
|
801 | while True: | |
|
802 | if self._stop_spinning.is_set(): | |
|
803 | return | |
|
804 | time.sleep(interval) | |
|
805 | self.spin() | |
|
806 | ||
|
807 | def spin_thread(self, interval=1): | |
|
808 | """call Client.spin() in a background thread on some regular interval | |
|
809 | ||
|
810 | This helps ensure that messages don't pile up too much in the zmq queue | |
|
811 | while you are working on other things, or just leaving an idle terminal. | |
|
812 | ||
|
813 | It also helps limit potential padding of the `received` timestamp | |
|
814 | on AsyncResult objects, used for timings. | |
|
815 | ||
|
816 | Parameters | |
|
817 | ---------- | |
|
818 | ||
|
819 | interval : float, optional | |
|
820 | The interval on which to spin the client in the background thread | |
|
821 | (simply passed to time.sleep). | |
|
822 | ||
|
823 | Notes | |
|
824 | ----- | |
|
825 | ||
|
826 | For precision timing, you may want to use this method to put a bound | |
|
827 | on the jitter (in seconds) in `received` timestamps used | |
|
828 | in AsyncResult.wall_time. | |
|
829 | ||
|
830 | """ | |
|
831 | if self._spin_thread is not None: | |
|
832 | self.stop_spin_thread() | |
|
833 | self._stop_spinning.clear() | |
|
834 | self._spin_thread = Thread(target=self._spin_every, args=(interval,)) | |
|
835 | self._spin_thread.daemon = True | |
|
836 | self._spin_thread.start() | |
|
837 | ||
|
838 | def stop_spin_thread(self): | |
|
839 | """stop background spin_thread, if any""" | |
|
840 | if self._spin_thread is not None: | |
|
841 | self._stop_spinning.set() | |
|
842 | self._spin_thread.join() | |
|
843 | self._spin_thread = None | |
|
844 | ||
|
794 | 845 | def spin(self): |
|
795 | 846 | """Flush any registration notifications and execution results |
|
796 | 847 | waiting in the ZMQ queue. |
@@ -316,4 +316,22 b' class TestClient(ClusterTestCase):' | |||
|
316 | 316 | self.client.purge_results('all') |
|
317 | 317 | hist = self.client.hub_history() |
|
318 | 318 | self.assertEquals(len(hist), 0) |
|
319 | ||
|
320 | def test_spin_thread(self): | |
|
321 | self.client.spin_thread(0.01) | |
|
322 | ar = self.client[-1].apply_async(lambda : 1) | |
|
323 | time.sleep(0.1) | |
|
324 | self.assertTrue(ar.wall_time < 0.1, | |
|
325 | "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time | |
|
326 | ) | |
|
327 | ||
|
328 | def test_stop_spin_thread(self): | |
|
329 | self.client.spin_thread(0.01) | |
|
330 | self.client.stop_spin_thread() | |
|
331 | ar = self.client[-1].apply_async(lambda : 1) | |
|
332 | time.sleep(0.15) | |
|
333 | self.assertTrue(ar.wall_time > 0.1, | |
|
334 | "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time | |
|
335 | ) | |
|
336 | ||
|
319 | 337 |
General Comments 0
You need to be logged in to leave comments.
Login now