From 39eab52f53f11eec8661c6d51669840938bf4ca1 2011-04-08 00:38:14
From: MinRK
Date: 2011-04-08 00:38:14
Subject: [PATCH] added preliminary tests for zmq.parallel

---

Review notes (this section is not part of the commit message):
* Line structure and indentation of the patch were restored.
* The payload was amended in review, so the blob ids on the "index" lines
  still name the originally recorded files and were not regenerated.
* tests/__init__.py: docstring typo fixed, "except X as e" syntax, the bare
  except around kill() narrowed to Exception, commented-out code dropped.
* clienttest.py: unreachable code after the raise in start_cluster dropped,
  side-effect comprehension/filter replaced by loops, newline at EOF.
* test_client.py: absolute import of clienttest, assertEqual/assertTrue
  instead of the deprecated aliases, newline at EOF.

diff --git a/IPython/zmq/parallel/tests/__init__.py b/IPython/zmq/parallel/tests/__init__.py
new file mode 100644
index 0000000..9ab35fe
--- /dev/null
+++ b/IPython/zmq/parallel/tests/__init__.py
@@ -0,0 +1,56 @@
+"""toplevel setup/teardown for parallel tests."""
+import time
+
+from IPython.zmq.parallel.ipcluster import launch_process
+from IPython.zmq.parallel.entry_point import select_random_ports
+
+# Shared record of the running test cluster: the controller's registration
+# port and every process launched by setup() / add_engine().
+cluster_logs = dict(
+    regport=0,
+    processes = [],
+)
+
+def setup():
+    """Launch a controller and one engine on a port from select_random_ports."""
+    p = select_random_ports(1)[0]
+    cluster_logs['regport']=p
+    cp = launch_process('controller',('--scheduler lru --ping 100 --regport %i'%p).split())
+    cluster_logs['processes'].append(cp)
+    add_engine(p)
+    # NOTE(review): fixed pause, presumably to let the engine register
+    time.sleep(2)
+
+def add_engine(port=None):
+    """Launch one engine and record its process in cluster_logs.
+
+    port is the registration port handed to the engine; when it is None the
+    port stored in cluster_logs['regport'] is used.  Returns the process.
+    """
+    if port is None:
+        port = cluster_logs['regport']
+    ep = launch_process('engine', ['--regport',str(port)])
+    cluster_logs['processes'].append(ep)
+    return ep
+
+def teardown():
+    """Stop every recorded process: terminate() first, kill() as a fallback."""
+    time.sleep(1)
+    processes = cluster_logs['processes']
+    while processes:
+        p = processes.pop()
+        if p.poll() is None:
+            try:
+                print 'terminating'
+                p.terminate()
+            except Exception as e:
+                print e
+        if p.poll() is None:
+            # give terminate() a moment to take effect before escalating
+            time.sleep(.25)
+        if p.poll() is None:
+            try:
+                print 'killing'
+                p.kill()
+            except Exception:
+                print "couldn't shutdown process: ",p
diff --git a/IPython/zmq/parallel/tests/clienttest.py b/IPython/zmq/parallel/tests/clienttest.py
new file mode 100644
index 0000000..88c5a75
--- /dev/null
+++ b/IPython/zmq/parallel/tests/clienttest.py
@@ -0,0 +1,61 @@
+import time
+from signal import SIGINT
+from multiprocessing import Process
+
+from zmq.tests import BaseZMQTestCase
+
+from IPython.zmq.parallel.ipcluster import launch_process
+from IPython.zmq.parallel.entry_point import select_random_ports
+from IPython.zmq.parallel.client import Client
+from IPython.zmq.parallel.tests import cluster_logs,add_engine
+
+
+class ClusterTestCase(BaseZMQTestCase):
+    """TestCase base with a Client connected to the shared test cluster.
+
+    setUp connects self.client on the port stored in cluster_logs and records
+    how many engines are already registered; engines added with add_engines()
+    are terminated again in tearDown.
+    """
+
+    def add_engines(self, n=1):
+        """add multiple engines to our cluster"""
+        for i in range(n):
+            self.engines.append(add_engine())
+
+    def wait_on_engines(self):
+        """wait for our engines to connect."""
+        # NOTE(review): there is no timeout; this polls until the client sees
+        # the pre-existing engines plus every engine added by this test.
+        while len(self.client.ids) < len(self.engines)+self.base_engine_count:
+            time.sleep(0.1)
+
+    def start_cluster(self, n=1):
+        """start a cluster (disabled: always raises NotImplementedError)"""
+        raise NotImplementedError("Don't use this anymore")
+
+    def connect_client(self, port=None):
+        """connect a client with my Context, and track its sockets for cleanup"""
+        if port is None:
+            port = cluster_logs['regport']
+        c = Client('tcp://127.0.0.1:%i'%port,context=self.context)
+        for name in dir(c):
+            if name.endswith('socket'):
+                self.sockets.append(getattr(c, name))
+        return c
+
+    def setUp(self):
+        BaseZMQTestCase.setUp(self)
+        self.client = self.connect_client()
+        # engines already registered before this test adds its own
+        self.base_engine_count=len(self.client.ids)
+        self.engines=[]
+
+    def tearDown(self):
+        # terminate only the engines this test started that are still running
+        for e in self.engines:
+            if e.poll() is None:
+                e.terminate()
+        # NOTE(review): does not wait for those engines to leave client.ids
+        del self.engines
+        BaseZMQTestCase.tearDown(self)
diff --git a/IPython/zmq/parallel/tests/test_client.py b/IPython/zmq/parallel/tests/test_client.py
new file mode 100644
index 0000000..c5281eb
--- /dev/null
+++ b/IPython/zmq/parallel/tests/test_client.py
@@ -0,0 +1,46 @@
+import time
+
+from IPython.zmq.parallel.view import LoadBalancedView, DirectView
+
+from IPython.zmq.parallel.tests.clienttest import ClusterTestCase
+
+class TestClient(ClusterTestCase):
+
+    def test_ids(self):
+        # one engine is expected to be registered before the test adds three
+        self.assertEqual(len(self.client.ids), 1)
+        self.add_engines(3)
+        self.wait_on_engines()
+        self.assertEqual(self.client.ids, set(range(4)))
+
+    def test_segfault(self):
+        def segfault():
+            import ctypes
+            # memset on an invalid address, meant to crash the engine process
+            ctypes.memset(-1,0,1)
+        self.client[0].apply(segfault)
+        # NOTE(review): no timeout -- loops until id 0 leaves client.ids
+        while 0 in self.client.ids:
+            time.sleep(.01)
+            self.client.spin()
+
+    def test_view_indexing(self):
+        self.add_engines(7)
+        self.wait_on_engines()
+        targets = self.client._build_targets('all')[-1]
+        # slices, ints and tuples select the corresponding targets
+        v = self.client[:]
+        self.assertEqual(v.targets, targets)
+        v = self.client[2]
+        self.assertEqual(v.targets, 2)
+        v = self.client[1,2]
+        self.assertEqual(v.targets, [1,2])
+        v = self.client[::2]
+        self.assertEqual(v.targets, targets[::2])
+        v = self.client[1::3]
+        self.assertEqual(v.targets, targets[1::3])
+        v = self.client[:-3]
+        self.assertEqual(v.targets, targets[:-3])
+        # indexing with None gives a load-balanced view
+        v = self.client[None]
+        self.assertTrue(isinstance(v, LoadBalancedView))