import time
from signal import SIGINT
from multiprocessing import Process

from nose import SkipTest

from zmq.tests import BaseZMQTestCase

from IPython.external.decorator import decorator

from IPython.zmq.parallel.ipcluster import launch_process
from IPython.zmq.parallel.entry_point import select_random_ports
from IPython.zmq.parallel.client import Client
from IPython.zmq.parallel.tests import processes, add_engine


# simple tasks for use in apply tests

def segfault():
    """write one zero byte to address -1 via ctypes; intended to crash the process"""
    # imported locally so the function carries its own dependency
    # (presumably because it is shipped to an engine by apply -- confirm)
    import ctypes
    ctypes.memset(-1, 0, 1)


def wait(n):
    """sleep for `n` seconds, then return `n`"""
    # local import on purpose, same reasoning as in segfault()
    import time
    time.sleep(n)
    return n


def raiser(eclass):
    """raise a no-argument instance of the exception class `eclass`"""
    raise eclass()


# test decorator for skipping tests when libraries are unavailable
def skip_without(*names):
    """skip a test if some names are not importable

    Returns a decorator; the decorated test raises SkipTest when any
    module listed in `names` cannot be imported, and runs normally
    otherwise.
    """
    @decorator
    def skip_without_names(f, *args, **kwargs):
        """run `f` only if every module in `names` is importable, else skip."""
        for name in names:
            try:
                __import__(name)
            except ImportError:
                raise SkipTest
        return f(*args, **kwargs)
    return skip_without_names


class ClusterTestCase(BaseZMQTestCase):
    """base test case that connects a Client and manages extra engines.

    setUp connects `self.client` and records how many engines were already
    registered; engines started through add_engines() are tracked in
    `self.engines` and terminated again in tearDown.
    """

    def add_engines(self, n=1, block=True):
        """add multiple engines to our cluster

        Starts `n` engines with add_engine(); when `block` is true, waits
        until they have all registered with the client.
        """
        for _ in range(n):
            self.engines.append(add_engine())
        if block:
            self.wait_on_engines()

    def wait_on_engines(self, timeout=5):
        """wait for our engines to connect.

        Polls the client every 0.1s until the pre-existing engines plus
        the ones we started are all registered, or `timeout` seconds have
        passed.  Raises AssertionError if the engines are still missing.
        """
        n = len(self.engines) + self.base_engine_count
        tic = time.time()
        while time.time() - tic < timeout and len(self.client.ids) < n:
            time.sleep(0.1)

        # compare the *number* of ids, as the loop above does; comparing the
        # ids list itself against an int never reported a timeout
        assert not len(self.client.ids) < n, "waiting for engines timed out"

    def connect_client(self):
        """connect a client with my Context, and track its sockets for cleanup"""
        c = Client(profile='iptest', context=self.context)
        # every attribute whose name ends in 'socket' is registered for cleanup
        for name in dir(c):
            if name.endswith('socket'):
                self.sockets.append(getattr(c, name))
        return c

    def setUp(self):
        """connect the client and remember the initial engine count."""
        BaseZMQTestCase.setUp(self)
        self.client = self.connect_client()
        self.base_engine_count = len(self.client.ids)
        self.engines = []

    def tearDown(self):
        """terminate the engines this test started, then run base cleanup."""
        for engine in self.engines:
            # poll() returning None is taken to mean "still running"
            # (assumes add_engine() returns a Popen-like object -- confirm)
            if engine.poll() is None:
                engine.terminate()
        # disabled: waiting for the terminated engines to unregister
        # while len(self.client.ids) > self.base_engine_count:
        #     time.sleep(.1)
        del self.engines
        BaseZMQTestCase.tearDown(self)