##// END OF EJS Templates
expedite IPython.parallel tests...
MinRK -
Show More
@@ -68,9 +68,18 b' def setup():'
68 time.sleep(0.1)
68 time.sleep(0.1)
69 add_engines(1)
69 add_engines(1)
70
70
71 def add_engines(n=1, profile='iptest'):
71 def add_engines(n=1, profile='iptest', total=False):
72 """add a number of engines to a given profile.
73
74 If total is True, then already running engines are counted, and only
75 the additional engines necessary (if any) are started.
76 """
72 rc = Client(profile=profile)
77 rc = Client(profile=profile)
73 base = len(rc)
78 base = len(rc)
79
80 if total:
81 n = max(n - base, 0)
82
74 eps = []
83 eps = []
75 for i in range(n):
84 for i in range(n):
76 ep = TestProcessLauncher()
85 ep = TestProcessLauncher()
@@ -81,6 +81,13 b' class ClusterTestCase(BaseZMQTestCase):'
81 if block:
81 if block:
82 self.wait_on_engines()
82 self.wait_on_engines()
83
83
84 def minimum_engines(self, n=1, block=True):
85 """add engines until there are at least n connected"""
86 self.engines.extend(add_engines(n, total=True))
87 if block:
88 self.wait_on_engines()
89
90
84 def wait_on_engines(self, timeout=5):
91 def wait_on_engines(self, timeout=5):
85 """wait for our engines to connect."""
92 """wait for our engines to connect."""
86 n = len(self.engines)+self.base_engine_count
93 n = len(self.engines)+self.base_engine_count
@@ -23,7 +23,7 b' from IPython.parallel.tests import add_engines'
23 from .clienttest import ClusterTestCase
23 from .clienttest import ClusterTestCase
24
24
25 def setup():
25 def setup():
26 add_engines(2)
26 add_engines(2, total=True)
27
27
28 def wait(n):
28 def wait(n):
29 import time
29 import time
@@ -32,18 +32,18 b' from IPython.parallel import LoadBalancedView, DirectView'
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
33
33
34 def setup():
34 def setup():
35 add_engines(4)
35 add_engines(4, total=True)
36
36
37 class TestClient(ClusterTestCase):
37 class TestClient(ClusterTestCase):
38
38
39 def test_ids(self):
39 def test_ids(self):
40 n = len(self.client.ids)
40 n = len(self.client.ids)
41 self.add_engines(3)
41 self.add_engines(2)
42 self.assertEquals(len(self.client.ids), n+3)
42 self.assertEquals(len(self.client.ids), n+2)
43
43
44 def test_view_indexing(self):
44 def test_view_indexing(self):
45 """test index access for views"""
45 """test index access for views"""
46 self.add_engines(2)
46 self.minimum_engines(4)
47 targets = self.client._build_targets('all')[-1]
47 targets = self.client._build_targets('all')[-1]
48 v = self.client[:]
48 v = self.client[:]
49 self.assertEquals(v.targets, targets)
49 self.assertEquals(v.targets, targets)
@@ -98,7 +98,7 b' class TestClient(ClusterTestCase):'
98 ref = [ double(x) for x in seq ]
98 ref = [ double(x) for x in seq ]
99
99
100 # add some engines, which should be used
100 # add some engines, which should be used
101 self.add_engines(2)
101 self.add_engines(1)
102 n1 = len(self.client.ids)
102 n1 = len(self.client.ids)
103
103
104 # simple apply
104 # simple apply
@@ -131,7 +131,7 b' class TestClient(ClusterTestCase):'
131
131
132 def test_clear(self):
132 def test_clear(self):
133 """test clear behavior"""
133 """test clear behavior"""
134 # self.add_engines(2)
134 self.minimum_engines(2)
135 v = self.client[:]
135 v = self.client[:]
136 v.block=True
136 v.block=True
137 v.push(dict(a=5))
137 v.push(dict(a=5))
@@ -142,13 +142,11 b' class TestClient(ClusterTestCase):'
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.client.clear(block=True)
143 self.client.clear(block=True)
144 for i in self.client.ids:
144 for i in self.client.ids:
145 # print i
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147
146
148 def test_get_result(self):
147 def test_get_result(self):
149 """test getting results from the Hub."""
148 """test getting results from the Hub."""
150 c = clientmod.Client(profile='iptest')
149 c = clientmod.Client(profile='iptest')
151 # self.add_engines(1)
152 t = c.ids[-1]
150 t = c.ids[-1]
153 ar = c[t].apply_async(wait, 1)
151 ar = c[t].apply_async(wait, 1)
154 # give the monitor time to notice the message
152 # give the monitor time to notice the message
@@ -162,7 +160,6 b' class TestClient(ClusterTestCase):'
162
160
163 def test_ids_list(self):
161 def test_ids_list(self):
164 """test client.ids"""
162 """test client.ids"""
165 # self.add_engines(2)
166 ids = self.client.ids
163 ids = self.client.ids
167 self.assertEquals(ids, self.client._ids)
164 self.assertEquals(ids, self.client._ids)
168 self.assertFalse(ids is self.client._ids)
165 self.assertFalse(ids is self.client._ids)
@@ -170,7 +167,6 b' class TestClient(ClusterTestCase):'
170 self.assertNotEquals(ids, self.client._ids)
167 self.assertNotEquals(ids, self.client._ids)
171
168
172 def test_queue_status(self):
169 def test_queue_status(self):
173 # self.addEngine(4)
174 ids = self.client.ids
170 ids = self.client.ids
175 id0 = ids[0]
171 id0 = ids[0]
176 qs = self.client.queue_status(targets=id0)
172 qs = self.client.queue_status(targets=id0)
@@ -187,7 +183,6 b' class TestClient(ClusterTestCase):'
187 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
183 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
188
184
189 def test_shutdown(self):
185 def test_shutdown(self):
190 # self.addEngine(4)
191 ids = self.client.ids
186 ids = self.client.ids
192 id0 = ids[0]
187 id0 = ids[0]
193 self.client.shutdown(id0, block=True)
188 self.client.shutdown(id0, block=True)
@@ -30,7 +30,7 b' from IPython.parallel.tests import add_engines'
30 from .clienttest import ClusterTestCase
30 from .clienttest import ClusterTestCase
31
31
32 def setup():
32 def setup():
33 add_engines(1)
33 add_engines(1, total=True)
34
34
35 @pmod.require('time')
35 @pmod.require('time')
36 def wait(n):
36 def wait(n):
@@ -30,7 +30,7 b' from IPython.parallel.tests import add_engines'
30 from .clienttest import ClusterTestCase, crash, wait, skip_without
30 from .clienttest import ClusterTestCase, crash, wait, skip_without
31
31
32 def setup():
32 def setup():
33 add_engines(3)
33 add_engines(3, total=True)
34
34
35 class TestLoadBalancedView(ClusterTestCase):
35 class TestLoadBalancedView(ClusterTestCase):
36
36
@@ -120,7 +120,6 b' class TestLoadBalancedView(ClusterTestCase):'
120 self.assertRaises(error.TaskAborted, ar3.get)
120 self.assertRaises(error.TaskAborted, ar3.get)
121
121
122 def test_retries(self):
122 def test_retries(self):
123 add_engines(3)
124 view = self.view
123 view = self.view
125 view.timeout = 1 # prevent hang if this doesn't behave
124 view.timeout = 1 # prevent hang if this doesn't behave
126 def fail():
125 def fail():
@@ -138,8 +137,7 b' class TestLoadBalancedView(ClusterTestCase):'
138 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
137 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
139
138
140 def test_impossible_dependency(self):
139 def test_impossible_dependency(self):
141 if len(self.client) < 2:
140 self.minimum_engines(2)
142 add_engines(2)
143 view = self.client.load_balanced_view()
141 view = self.client.load_balanced_view()
144 ar1 = view.apply_async(lambda : 1)
142 ar1 = view.apply_async(lambda : 1)
145 ar1.get()
143 ar1.get()
@@ -37,7 +37,7 b' from IPython.parallel.tests import add_engines'
37 from .clienttest import ClusterTestCase, crash, wait, skip_without
37 from .clienttest import ClusterTestCase, crash, wait, skip_without
38
38
39 def setup():
39 def setup():
40 add_engines(3)
40 add_engines(3, total=True)
41
41
42 class TestView(ClusterTestCase):
42 class TestView(ClusterTestCase):
43
43
@@ -296,7 +296,7 b' class TestView(ClusterTestCase):'
296 def test_abort_all(self):
296 def test_abort_all(self):
297 """view.abort() aborts all outstanding tasks"""
297 """view.abort() aborts all outstanding tasks"""
298 view = self.client[-1]
298 view = self.client[-1]
299 ars = [ view.apply_async(time.sleep, 1) for i in range(10) ]
299 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
300 view.abort()
300 view.abort()
301 view.wait(timeout=5)
301 view.wait(timeout=5)
302 for ar in ars[5:]:
302 for ar in ars[5:]:
General Comments 0
You need to be logged in to leave comments. Login now