Show More
@@ -68,9 +68,18 b' def setup():' | |||
|
68 | 68 | time.sleep(0.1) |
|
69 | 69 | add_engines(1) |
|
70 | 70 | |
|
71 | def add_engines(n=1, profile='iptest'): | |
|
71 | def add_engines(n=1, profile='iptest', total=False): | |
|
72 | """add a number of engines to a given profile. | |
|
73 | ||
|
74 | If total is True, then already running engines are counted, and only | |
|
75 | the additional engines necessary (if any) are started. | |
|
76 | """ | |
|
72 | 77 | rc = Client(profile=profile) |
|
73 | 78 | base = len(rc) |
|
79 | ||
|
80 | if total: | |
|
81 | n = max(n - base, 0) | |
|
82 | ||
|
74 | 83 | eps = [] |
|
75 | 84 | for i in range(n): |
|
76 | 85 | ep = TestProcessLauncher() |
@@ -81,6 +81,13 b' class ClusterTestCase(BaseZMQTestCase):' | |||
|
81 | 81 | if block: |
|
82 | 82 | self.wait_on_engines() |
|
83 | 83 | |
|
84 | def minimum_engines(self, n=1, block=True): | |
|
85 | """add engines until there are at least n connected""" | |
|
86 | self.engines.extend(add_engines(n, total=True)) | |
|
87 | if block: | |
|
88 | self.wait_on_engines() | |
|
89 | ||
|
90 | ||
|
84 | 91 | def wait_on_engines(self, timeout=5): |
|
85 | 92 | """wait for our engines to connect.""" |
|
86 | 93 | n = len(self.engines)+self.base_engine_count |
@@ -23,7 +23,7 b' from IPython.parallel.tests import add_engines' | |||
|
23 | 23 | from .clienttest import ClusterTestCase |
|
24 | 24 | |
|
25 | 25 | def setup(): |
|
26 | add_engines(2) | |
|
26 | add_engines(2, total=True) | |
|
27 | 27 | |
|
28 | 28 | def wait(n): |
|
29 | 29 | import time |
@@ -32,18 +32,18 b' from IPython.parallel import LoadBalancedView, DirectView' | |||
|
32 | 32 | from clienttest import ClusterTestCase, segfault, wait, add_engines |
|
33 | 33 | |
|
34 | 34 | def setup(): |
|
35 | add_engines(4) | |
|
35 | add_engines(4, total=True) | |
|
36 | 36 | |
|
37 | 37 | class TestClient(ClusterTestCase): |
|
38 | 38 | |
|
39 | 39 | def test_ids(self): |
|
40 | 40 | n = len(self.client.ids) |
|
41 |
self.add_engines( |
|
|
42 |
self.assertEquals(len(self.client.ids), n+ |
|
|
41 | self.add_engines(2) | |
|
42 | self.assertEquals(len(self.client.ids), n+2) | |
|
43 | 43 | |
|
44 | 44 | def test_view_indexing(self): |
|
45 | 45 | """test index access for views""" |
|
46 |
self. |
|
|
46 | self.minimum_engines(4) | |
|
47 | 47 | targets = self.client._build_targets('all')[-1] |
|
48 | 48 | v = self.client[:] |
|
49 | 49 | self.assertEquals(v.targets, targets) |
@@ -98,7 +98,7 b' class TestClient(ClusterTestCase):' | |||
|
98 | 98 | ref = [ double(x) for x in seq ] |
|
99 | 99 | |
|
100 | 100 | # add some engines, which should be used |
|
101 |
self.add_engines( |
|
|
101 | self.add_engines(1) | |
|
102 | 102 | n1 = len(self.client.ids) |
|
103 | 103 | |
|
104 | 104 | # simple apply |
@@ -131,7 +131,7 b' class TestClient(ClusterTestCase):' | |||
|
131 | 131 | |
|
132 | 132 | def test_clear(self): |
|
133 | 133 | """test clear behavior""" |
|
134 |
|
|
|
134 | self.minimum_engines(2) | |
|
135 | 135 | v = self.client[:] |
|
136 | 136 | v.block=True |
|
137 | 137 | v.push(dict(a=5)) |
@@ -142,13 +142,11 b' class TestClient(ClusterTestCase):' | |||
|
142 | 142 | self.assertRaisesRemote(NameError, self.client[id0].get, 'a') |
|
143 | 143 | self.client.clear(block=True) |
|
144 | 144 | for i in self.client.ids: |
|
145 | # print i | |
|
146 | 145 | self.assertRaisesRemote(NameError, self.client[i].get, 'a') |
|
147 | 146 | |
|
148 | 147 | def test_get_result(self): |
|
149 | 148 | """test getting results from the Hub.""" |
|
150 | 149 | c = clientmod.Client(profile='iptest') |
|
151 | # self.add_engines(1) | |
|
152 | 150 | t = c.ids[-1] |
|
153 | 151 | ar = c[t].apply_async(wait, 1) |
|
154 | 152 | # give the monitor time to notice the message |
@@ -162,7 +160,6 b' class TestClient(ClusterTestCase):' | |||
|
162 | 160 | |
|
163 | 161 | def test_ids_list(self): |
|
164 | 162 | """test client.ids""" |
|
165 | # self.add_engines(2) | |
|
166 | 163 | ids = self.client.ids |
|
167 | 164 | self.assertEquals(ids, self.client._ids) |
|
168 | 165 | self.assertFalse(ids is self.client._ids) |
@@ -170,7 +167,6 b' class TestClient(ClusterTestCase):' | |||
|
170 | 167 | self.assertNotEquals(ids, self.client._ids) |
|
171 | 168 | |
|
172 | 169 | def test_queue_status(self): |
|
173 | # self.addEngine(4) | |
|
174 | 170 | ids = self.client.ids |
|
175 | 171 | id0 = ids[0] |
|
176 | 172 | qs = self.client.queue_status(targets=id0) |
@@ -187,7 +183,6 b' class TestClient(ClusterTestCase):' | |||
|
187 | 183 | self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks']) |
|
188 | 184 | |
|
189 | 185 | def test_shutdown(self): |
|
190 | # self.addEngine(4) | |
|
191 | 186 | ids = self.client.ids |
|
192 | 187 | id0 = ids[0] |
|
193 | 188 | self.client.shutdown(id0, block=True) |
@@ -30,7 +30,7 b' from IPython.parallel.tests import add_engines' | |||
|
30 | 30 | from .clienttest import ClusterTestCase |
|
31 | 31 | |
|
32 | 32 | def setup(): |
|
33 | add_engines(1) | |
|
33 | add_engines(1, total=True) | |
|
34 | 34 | |
|
35 | 35 | @pmod.require('time') |
|
36 | 36 | def wait(n): |
@@ -30,7 +30,7 b' from IPython.parallel.tests import add_engines' | |||
|
30 | 30 | from .clienttest import ClusterTestCase, crash, wait, skip_without |
|
31 | 31 | |
|
32 | 32 | def setup(): |
|
33 | add_engines(3) | |
|
33 | add_engines(3, total=True) | |
|
34 | 34 | |
|
35 | 35 | class TestLoadBalancedView(ClusterTestCase): |
|
36 | 36 | |
@@ -120,7 +120,6 b' class TestLoadBalancedView(ClusterTestCase):' | |||
|
120 | 120 | self.assertRaises(error.TaskAborted, ar3.get) |
|
121 | 121 | |
|
122 | 122 | def test_retries(self): |
|
123 | add_engines(3) | |
|
124 | 123 | view = self.view |
|
125 | 124 | view.timeout = 1 # prevent hang if this doesn't behave |
|
126 | 125 | def fail(): |
@@ -138,8 +137,7 b' class TestLoadBalancedView(ClusterTestCase):' | |||
|
138 | 137 | self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1) |
|
139 | 138 | |
|
140 | 139 | def test_impossible_dependency(self): |
|
141 | if len(self.client) < 2: | |
|
142 | add_engines(2) | |
|
140 | self.minimum_engines(2) | |
|
143 | 141 | view = self.client.load_balanced_view() |
|
144 | 142 | ar1 = view.apply_async(lambda : 1) |
|
145 | 143 | ar1.get() |
@@ -37,7 +37,7 b' from IPython.parallel.tests import add_engines' | |||
|
37 | 37 | from .clienttest import ClusterTestCase, crash, wait, skip_without |
|
38 | 38 | |
|
39 | 39 | def setup(): |
|
40 | add_engines(3) | |
|
40 | add_engines(3, total=True) | |
|
41 | 41 | |
|
42 | 42 | class TestView(ClusterTestCase): |
|
43 | 43 | |
@@ -296,7 +296,7 b' class TestView(ClusterTestCase):' | |||
|
296 | 296 | def test_abort_all(self): |
|
297 | 297 | """view.abort() aborts all outstanding tasks""" |
|
298 | 298 | view = self.client[-1] |
|
299 |
ars = [ view.apply_async(time.sleep, |
|
|
299 | ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ] | |
|
300 | 300 | view.abort() |
|
301 | 301 | view.wait(timeout=5) |
|
302 | 302 | for ar in ars[5:]: |
General Comments 0
You need to be logged in to leave comments.
Login now