##// END OF EJS Templates
added preliminary tests for zmq.parallel
MinRK -
Show More
@@ -0,0 +1,51 b''
1 """toplevel setup/teardown for prallel tests."""
2 import time
3
4 from IPython.zmq.parallel.ipcluster import launch_process
5 from IPython.zmq.parallel.entry_point import select_random_ports
6 # from multiprocessing import Process
7
8 cluster_logs = dict(
9 regport=0,
10 processes = [],
11 )
12
13 def setup():
14 p = select_random_ports(1)[0]
15 cluster_logs['regport']=p
16 cp = launch_process('controller',('--scheduler lru --ping 100 --regport %i'%p).split())
17 # cp.start()
18 cluster_logs['processes'].append(cp)
19 add_engine(p)
20 time.sleep(2)
21
22 def add_engine(port=None):
23 if port is None:
24 port = cluster_logs['regport']
25 ep = launch_process('engine', ['--regport',str(port)])
26 # ep.start()
27 cluster_logs['processes'].append(ep)
28 return ep
29
30 def teardown():
31 time.sleep(1)
32 processes = cluster_logs['processes']
33 while processes:
34 p = processes.pop()
35 if p.poll() is None:
36 try:
37 print 'terminating'
38 p.terminate()
39 except Exception, e:
40 print e
41 pass
42 if p.poll() is None:
43 time.sleep(.25)
44 if p.poll() is None:
45 try:
46 print 'killing'
47 p.kill()
48 except:
49 print "couldn't shutdown process: ",p
50
51
@@ -0,0 +1,55 b''
1 import time
2 from signal import SIGINT
3 from multiprocessing import Process
4
5 from zmq.tests import BaseZMQTestCase
6
7 from IPython.zmq.parallel.ipcluster import launch_process
8 from IPython.zmq.parallel.entry_point import select_random_ports
9 from IPython.zmq.parallel.client import Client
10 from IPython.zmq.parallel.tests import cluster_logs,add_engine
11
12
13 class ClusterTestCase(BaseZMQTestCase):
14
15 def add_engines(self, n=1):
16 """add multiple engines to our cluster"""
17 for i in range(n):
18 self.engines.append(add_engine())
19
20 def wait_on_engines(self):
21 """wait for our engines to connect."""
22 while len(self.client.ids) < len(self.engines)+self.base_engine_count:
23 time.sleep(0.1)
24
25 def start_cluster(self, n=1):
26 """start a cluster"""
27 raise NotImplementedError("Don't use this anymore")
28 rport = select_random_ports(1)[0]
29 args = [ '--regport', str(rport), '--ip', '127.0.0.1' ]
30 cp = launch_process('controller', args)
31 eps = [ launch_process('engine', args+['--ident', 'engine-%i'%i]) for i in range(n) ]
32 return rport, args, cp, eps
33
34 def connect_client(self, port=None):
35 """connect a client with my Context, and track its sockets for cleanup"""
36 if port is None:
37 port = cluster_logs['regport']
38 c = Client('tcp://127.0.0.1:%i'%port,context=self.context)
39 for name in filter(lambda n:n.endswith('socket'), dir(c)):
40 self.sockets.append(getattr(c, name))
41 return c
42
43 def setUp(self):
44 BaseZMQTestCase.setUp(self)
45 self.client = self.connect_client()
46 self.base_engine_count=len(self.client.ids)
47 self.engines=[]
48
49 def tearDown(self):
50 [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
51 # while len(self.client.ids) > self.base_engine_count:
52 # time.sleep(.1)
53 del self.engines
54 BaseZMQTestCase.tearDown(self)
55 No newline at end of file
@@ -0,0 +1,42 b''
1 import time
2
3 from IPython.zmq.parallel.view import LoadBalancedView, DirectView
4
5 from clienttest import ClusterTestCase
6
7 class TestClient(ClusterTestCase):
8
9 def test_ids(self):
10 self.assertEquals(len(self.client.ids), 1)
11 self.add_engines(3)
12 self.wait_on_engines()
13 self.assertEquals(self.client.ids, set(range(4)))
14
15 def test_segfault(self):
16 def segfault():
17 import ctypes
18 ctypes.memset(-1,0,1)
19 self.client[0].apply(segfault)
20 while 0 in self.client.ids:
21 time.sleep(.01)
22 self.client.spin()
23
24 def test_view_indexing(self):
25 self.add_engines(7)
26 self.wait_on_engines()
27 targets = self.client._build_targets('all')[-1]
28 v = self.client[:]
29 self.assertEquals(v.targets, targets)
30 v =self.client[2]
31 self.assertEquals(v.targets, 2)
32 v =self.client[1,2]
33 self.assertEquals(v.targets, [1,2])
34 v =self.client[::2]
35 self.assertEquals(v.targets, targets[::2])
36 v =self.client[1::3]
37 self.assertEquals(v.targets, targets[1::3])
38 v =self.client[:-3]
39 self.assertEquals(v.targets, targets[:-3])
40 v =self.client[None]
41 self.assert_(isinstance(v, LoadBalancedView))
42 No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now