clienttest.py
54 lines
| 2.0 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3595 | import time | |
from signal import SIGINT | |||
from multiprocessing import Process | |||
from zmq.tests import BaseZMQTestCase | |||
from IPython.zmq.parallel.ipcluster import launch_process | |||
from IPython.zmq.parallel.entry_point import select_random_ports | |||
from IPython.zmq.parallel.client import Client | |||
from IPython.zmq.parallel.tests import cluster_logs,add_engine | |||
class ClusterTestCase(BaseZMQTestCase):
    """Test case base class that talks to a cluster through a Client.

    ``setUp`` connects ``self.client`` and records how many engines were
    already registered at that moment (``self.base_engine_count``).  Engines
    started through ``add_engines`` are tracked in ``self.engines`` and are
    terminated again in ``tearDown``.
    """

    def add_engines(self, n=1):
        """Add ``n`` engines to our cluster and track them for cleanup."""
        for _ in range(n):
            self.engines.append(add_engine())

    def wait_on_engines(self, timeout=None):
        """Wait for our engines to connect.

        Polls ``self.client.ids`` every 0.1 seconds until it lists at least
        as many engines as were present at ``setUp`` plus the ones added via
        ``add_engines``.

        ``timeout`` is the maximum number of seconds to wait.  The default,
        ``None``, waits indefinitely.  When the timeout expires before all
        engines are visible, ``RuntimeError`` is raised.
        """
        deadline = None if timeout is None else time.time() + timeout
        while True:
            connected = len(self.client.ids)
            expected = len(self.engines) + self.base_engine_count
            if connected >= expected:
                return
            if deadline is not None and time.time() >= deadline:
                raise RuntimeError(
                    "timed out after %.1fs waiting for engines: "
                    "%i of %i connected" % (timeout, connected, expected)
                )
            time.sleep(0.1)

    def start_cluster(self, n=1):
        """Start a cluster (disabled).

        Always raises ``NotImplementedError``.  The ``n`` parameter is kept
        only so the signature stays unchanged for existing callers.
        """
        raise NotImplementedError("Don't use this anymore")

    def connect_client(self, port=None):
        """Connect a client with my Context, and track its sockets for cleanup.

        ``port`` defaults to ``cluster_logs['regport']``.  The client connects
        to that port on 127.0.0.1.  Returns the new ``Client``.
        """
        if port is None:
            port = cluster_logs['regport']
        c = Client('tcp://127.0.0.1:%i' % port, context=self.context)
        # Every attribute of the client whose name ends in 'socket' is added
        # to self.sockets so that it is tracked for cleanup.
        for name in dir(c):
            if name.endswith('socket'):
                self.sockets.append(getattr(c, name))
        return c

    def setUp(self):
        """Connect a client and remember the engines that already exist."""
        BaseZMQTestCase.setUp(self)
        self.client = self.connect_client()
        # Engines registered before this test started; wait_on_engines adds
        # this to the number of engines the test itself launches.
        self.base_engine_count = len(self.client.ids)
        self.engines = []

    def tearDown(self):
        """Terminate the engines this test started, then run base cleanup."""
        for engine in self.engines:
            # Only engines whose poll() returns None are terminated
            # (presumably "still running", as with subprocess.Popen; the type
            # returned by add_engine is not visible here).
            if engine.poll() is None:
                engine.terminate()
        # NOTE: this does not wait for client.ids to shrink back to
        # base_engine_count; a polling loop that did so was disabled.
        del self.engines
        BaseZMQTestCase.tearDown(self)