##// END OF EJS Templates
expedite IPython.parallel tests...
MinRK -
Show More
@@ -68,9 +68,18 b' def setup():'
68 68 time.sleep(0.1)
69 69 add_engines(1)
70 70
71 def add_engines(n=1, profile='iptest'):
71 def add_engines(n=1, profile='iptest', total=False):
72 """add a number of engines to a given profile.
73
74 If total is True, then already running engines are counted, and only
75 the additional engines necessary (if any) are started.
76 """
72 77 rc = Client(profile=profile)
73 78 base = len(rc)
79
80 if total:
81 n = max(n - base, 0)
82
74 83 eps = []
75 84 for i in range(n):
76 85 ep = TestProcessLauncher()
@@ -81,6 +81,13 b' class ClusterTestCase(BaseZMQTestCase):'
81 81 if block:
82 82 self.wait_on_engines()
83 83
84 def minimum_engines(self, n=1, block=True):
85 """add engines until there are at least n connected"""
86 self.engines.extend(add_engines(n, total=True))
87 if block:
88 self.wait_on_engines()
89
90
84 91 def wait_on_engines(self, timeout=5):
85 92 """wait for our engines to connect."""
86 93 n = len(self.engines)+self.base_engine_count
@@ -23,7 +23,7 b' from IPython.parallel.tests import add_engines'
23 23 from .clienttest import ClusterTestCase
24 24
25 25 def setup():
26 add_engines(2)
26 add_engines(2, total=True)
27 27
28 28 def wait(n):
29 29 import time
@@ -32,18 +32,18 b' from IPython.parallel import LoadBalancedView, DirectView'
32 32 from clienttest import ClusterTestCase, segfault, wait, add_engines
33 33
34 34 def setup():
35 add_engines(4)
35 add_engines(4, total=True)
36 36
37 37 class TestClient(ClusterTestCase):
38 38
39 39 def test_ids(self):
40 40 n = len(self.client.ids)
41 self.add_engines(3)
42 self.assertEquals(len(self.client.ids), n+3)
41 self.add_engines(2)
42 self.assertEquals(len(self.client.ids), n+2)
43 43
44 44 def test_view_indexing(self):
45 45 """test index access for views"""
46 self.add_engines(2)
46 self.minimum_engines(4)
47 47 targets = self.client._build_targets('all')[-1]
48 48 v = self.client[:]
49 49 self.assertEquals(v.targets, targets)
@@ -98,7 +98,7 b' class TestClient(ClusterTestCase):'
98 98 ref = [ double(x) for x in seq ]
99 99
100 100 # add some engines, which should be used
101 self.add_engines(2)
101 self.add_engines(1)
102 102 n1 = len(self.client.ids)
103 103
104 104 # simple apply
@@ -131,7 +131,7 b' class TestClient(ClusterTestCase):'
131 131
132 132 def test_clear(self):
133 133 """test clear behavior"""
134 # self.add_engines(2)
134 self.minimum_engines(2)
135 135 v = self.client[:]
136 136 v.block=True
137 137 v.push(dict(a=5))
@@ -142,13 +142,11 b' class TestClient(ClusterTestCase):'
142 142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 143 self.client.clear(block=True)
144 144 for i in self.client.ids:
145 # print i
146 145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147 146
148 147 def test_get_result(self):
149 148 """test getting results from the Hub."""
150 149 c = clientmod.Client(profile='iptest')
151 # self.add_engines(1)
152 150 t = c.ids[-1]
153 151 ar = c[t].apply_async(wait, 1)
154 152 # give the monitor time to notice the message
@@ -162,7 +160,6 b' class TestClient(ClusterTestCase):'
162 160
163 161 def test_ids_list(self):
164 162 """test client.ids"""
165 # self.add_engines(2)
166 163 ids = self.client.ids
167 164 self.assertEquals(ids, self.client._ids)
168 165 self.assertFalse(ids is self.client._ids)
@@ -170,7 +167,6 b' class TestClient(ClusterTestCase):'
170 167 self.assertNotEquals(ids, self.client._ids)
171 168
172 169 def test_queue_status(self):
173 # self.addEngine(4)
174 170 ids = self.client.ids
175 171 id0 = ids[0]
176 172 qs = self.client.queue_status(targets=id0)
@@ -187,7 +183,6 b' class TestClient(ClusterTestCase):'
187 183 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
188 184
189 185 def test_shutdown(self):
190 # self.addEngine(4)
191 186 ids = self.client.ids
192 187 id0 = ids[0]
193 188 self.client.shutdown(id0, block=True)
@@ -30,7 +30,7 b' from IPython.parallel.tests import add_engines'
30 30 from .clienttest import ClusterTestCase
31 31
32 32 def setup():
33 add_engines(1)
33 add_engines(1, total=True)
34 34
35 35 @pmod.require('time')
36 36 def wait(n):
@@ -30,7 +30,7 b' from IPython.parallel.tests import add_engines'
30 30 from .clienttest import ClusterTestCase, crash, wait, skip_without
31 31
32 32 def setup():
33 add_engines(3)
33 add_engines(3, total=True)
34 34
35 35 class TestLoadBalancedView(ClusterTestCase):
36 36
@@ -120,7 +120,6 b' class TestLoadBalancedView(ClusterTestCase):'
120 120 self.assertRaises(error.TaskAborted, ar3.get)
121 121
122 122 def test_retries(self):
123 add_engines(3)
124 123 view = self.view
125 124 view.timeout = 1 # prevent hang if this doesn't behave
126 125 def fail():
@@ -138,8 +137,7 b' class TestLoadBalancedView(ClusterTestCase):'
138 137 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
139 138
140 139 def test_impossible_dependency(self):
141 if len(self.client) < 2:
142 add_engines(2)
140 self.minimum_engines(2)
143 141 view = self.client.load_balanced_view()
144 142 ar1 = view.apply_async(lambda : 1)
145 143 ar1.get()
@@ -37,7 +37,7 b' from IPython.parallel.tests import add_engines'
37 37 from .clienttest import ClusterTestCase, crash, wait, skip_without
38 38
39 39 def setup():
40 add_engines(3)
40 add_engines(3, total=True)
41 41
42 42 class TestView(ClusterTestCase):
43 43
@@ -296,7 +296,7 b' class TestView(ClusterTestCase):'
296 296 def test_abort_all(self):
297 297 """view.abort() aborts all outstanding tasks"""
298 298 view = self.client[-1]
299 ars = [ view.apply_async(time.sleep, 1) for i in range(10) ]
299 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
300 300 view.abort()
301 301 view.wait(timeout=5)
302 302 for ar in ars[5:]:
General Comments 0
You need to be logged in to leave comments. Login now