import time
from signal import SIGINT
from multiprocessing import Process

from zmq.tests import BaseZMQTestCase

from IPython.zmq.parallel.ipcluster import launch_process
from IPython.zmq.parallel.entry_point import select_random_ports
from IPython.zmq.parallel.client import Client
from IPython.zmq.parallel.tests import cluster_logs, add_engine


class ClusterTestCase(BaseZMQTestCase):
    """Base test case for tests that talk to a running cluster.

    ``setUp`` connects a ``Client`` and records how many engines were already
    registered; ``tearDown`` terminates any engines this test case added
    through ``add_engines``.
    """

    def add_engines(self, n=1):
        """Add ``n`` engines to our cluster and track them in ``self.engines``.

        Each engine is whatever ``add_engine()`` returns; ``tearDown`` calls
        ``poll()`` and ``terminate()`` on it, so it is presumably a
        ``subprocess.Popen``-like handle (TODO confirm against ``add_engine``).
        """
        for _ in range(n):
            self.engines.append(add_engine())

    def wait_on_engines(self):
        """Block until every engine we added is visible in ``self.client.ids``.

        Polls every 0.1 s.  There is no timeout: if an engine never
        registers, this call never returns.
        """
        expected = len(self.engines) + self.base_engine_count
        while len(self.client.ids) < expected:
            time.sleep(0.1)

    def start_cluster(self, n=1):
        """Deprecated: always raises ``NotImplementedError``.

        The previous implementation (launching a controller and ``n`` engines
        on a random registration port) sat after the ``raise`` and could never
        run, so it has been removed.
        """
        raise NotImplementedError("Don't use this anymore")

    def connect_client(self, port=None):
        """Connect a client with my Context, and track its sockets for cleanup.

        Parameters
        ----------
        port : int, optional
            Port on 127.0.0.1 to connect to.  Defaults to
            ``cluster_logs['regport']``.

        Returns
        -------
        The newly created ``Client``.
        """
        if port is None:
            port = cluster_logs['regport']
        client = Client('tcp://127.0.0.1:%i' % port, context=self.context)
        # Every client attribute whose name ends in 'socket' is appended to
        # self.sockets (presumably closed by BaseZMQTestCase.tearDown --
        # TODO confirm).
        self.sockets.extend(
            getattr(client, name)
            for name in dir(client)
            if name.endswith('socket')
        )
        return client

    def setUp(self):
        """Connect a client and record the pre-existing engine count."""
        BaseZMQTestCase.setUp(self)
        self.client = self.connect_client()
        # Engines registered before this test started; wait_on_engines adds
        # this to the number of engines we launch ourselves.
        self.base_engine_count = len(self.client.ids)
        self.engines = []

    def tearDown(self):
        """Terminate engines that are still running, then tear down the base."""
        for engine in self.engines:
            # poll() returning None means the process has not exited yet.
            if engine.poll() is None:
                engine.terminate()
        # NOTE: waiting here for len(self.client.ids) to fall back to
        # self.base_engine_count was previously disabled and remains so.
        del self.engines
        BaseZMQTestCase.tearDown(self)