##// END OF EJS Templates
make spin_thread tests more forgiving of slow VMs...
MinRK -
Show More
@@ -1,551 +1,555 b''
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23
23
24 import zmq
24 import zmq
25
25
26 from IPython import parallel
26 from IPython import parallel
27 from IPython.parallel.client import client as clientmod
27 from IPython.parallel.client import client as clientmod
28 from IPython.parallel import error
28 from IPython.parallel import error
29 from IPython.parallel import AsyncResult, AsyncHubResult
29 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import LoadBalancedView, DirectView
30 from IPython.parallel import LoadBalancedView, DirectView
31
31
32 from .clienttest import ClusterTestCase, segfault, wait, add_engines
32 from .clienttest import ClusterTestCase, segfault, wait, add_engines
33
33
34 def setup():
34 def setup():
35 add_engines(4, total=True)
35 add_engines(4, total=True)
36
36
37 class TestClient(ClusterTestCase):
37 class TestClient(ClusterTestCase):
38
38
39 def test_ids(self):
39 def test_ids(self):
40 n = len(self.client.ids)
40 n = len(self.client.ids)
41 self.add_engines(2)
41 self.add_engines(2)
42 self.assertEqual(len(self.client.ids), n+2)
42 self.assertEqual(len(self.client.ids), n+2)
43
43
44 def test_view_indexing(self):
44 def test_view_indexing(self):
45 """test index access for views"""
45 """test index access for views"""
46 self.minimum_engines(4)
46 self.minimum_engines(4)
47 targets = self.client._build_targets('all')[-1]
47 targets = self.client._build_targets('all')[-1]
48 v = self.client[:]
48 v = self.client[:]
49 self.assertEqual(v.targets, targets)
49 self.assertEqual(v.targets, targets)
50 t = self.client.ids[2]
50 t = self.client.ids[2]
51 v = self.client[t]
51 v = self.client[t]
52 self.assertTrue(isinstance(v, DirectView))
52 self.assertTrue(isinstance(v, DirectView))
53 self.assertEqual(v.targets, t)
53 self.assertEqual(v.targets, t)
54 t = self.client.ids[2:4]
54 t = self.client.ids[2:4]
55 v = self.client[t]
55 v = self.client[t]
56 self.assertTrue(isinstance(v, DirectView))
56 self.assertTrue(isinstance(v, DirectView))
57 self.assertEqual(v.targets, t)
57 self.assertEqual(v.targets, t)
58 v = self.client[::2]
58 v = self.client[::2]
59 self.assertTrue(isinstance(v, DirectView))
59 self.assertTrue(isinstance(v, DirectView))
60 self.assertEqual(v.targets, targets[::2])
60 self.assertEqual(v.targets, targets[::2])
61 v = self.client[1::3]
61 v = self.client[1::3]
62 self.assertTrue(isinstance(v, DirectView))
62 self.assertTrue(isinstance(v, DirectView))
63 self.assertEqual(v.targets, targets[1::3])
63 self.assertEqual(v.targets, targets[1::3])
64 v = self.client[:-3]
64 v = self.client[:-3]
65 self.assertTrue(isinstance(v, DirectView))
65 self.assertTrue(isinstance(v, DirectView))
66 self.assertEqual(v.targets, targets[:-3])
66 self.assertEqual(v.targets, targets[:-3])
67 v = self.client[-1]
67 v = self.client[-1]
68 self.assertTrue(isinstance(v, DirectView))
68 self.assertTrue(isinstance(v, DirectView))
69 self.assertEqual(v.targets, targets[-1])
69 self.assertEqual(v.targets, targets[-1])
70 self.assertRaises(TypeError, lambda : self.client[None])
70 self.assertRaises(TypeError, lambda : self.client[None])
71
71
72 def test_lbview_targets(self):
72 def test_lbview_targets(self):
73 """test load_balanced_view targets"""
73 """test load_balanced_view targets"""
74 v = self.client.load_balanced_view()
74 v = self.client.load_balanced_view()
75 self.assertEqual(v.targets, None)
75 self.assertEqual(v.targets, None)
76 v = self.client.load_balanced_view(-1)
76 v = self.client.load_balanced_view(-1)
77 self.assertEqual(v.targets, [self.client.ids[-1]])
77 self.assertEqual(v.targets, [self.client.ids[-1]])
78 v = self.client.load_balanced_view('all')
78 v = self.client.load_balanced_view('all')
79 self.assertEqual(v.targets, None)
79 self.assertEqual(v.targets, None)
80
80
81 def test_dview_targets(self):
81 def test_dview_targets(self):
82 """test direct_view targets"""
82 """test direct_view targets"""
83 v = self.client.direct_view()
83 v = self.client.direct_view()
84 self.assertEqual(v.targets, 'all')
84 self.assertEqual(v.targets, 'all')
85 v = self.client.direct_view('all')
85 v = self.client.direct_view('all')
86 self.assertEqual(v.targets, 'all')
86 self.assertEqual(v.targets, 'all')
87 v = self.client.direct_view(-1)
87 v = self.client.direct_view(-1)
88 self.assertEqual(v.targets, self.client.ids[-1])
88 self.assertEqual(v.targets, self.client.ids[-1])
89
89
90 def test_lazy_all_targets(self):
90 def test_lazy_all_targets(self):
91 """test lazy evaluation of rc.direct_view('all')"""
91 """test lazy evaluation of rc.direct_view('all')"""
92 v = self.client.direct_view()
92 v = self.client.direct_view()
93 self.assertEqual(v.targets, 'all')
93 self.assertEqual(v.targets, 'all')
94
94
95 def double(x):
95 def double(x):
96 return x*2
96 return x*2
97 seq = list(range(100))
97 seq = list(range(100))
98 ref = [ double(x) for x in seq ]
98 ref = [ double(x) for x in seq ]
99
99
100 # add some engines, which should be used
100 # add some engines, which should be used
101 self.add_engines(1)
101 self.add_engines(1)
102 n1 = len(self.client.ids)
102 n1 = len(self.client.ids)
103
103
104 # simple apply
104 # simple apply
105 r = v.apply_sync(lambda : 1)
105 r = v.apply_sync(lambda : 1)
106 self.assertEqual(r, [1] * n1)
106 self.assertEqual(r, [1] * n1)
107
107
108 # map goes through remotefunction
108 # map goes through remotefunction
109 r = v.map_sync(double, seq)
109 r = v.map_sync(double, seq)
110 self.assertEqual(r, ref)
110 self.assertEqual(r, ref)
111
111
112 # add a couple more engines, and try again
112 # add a couple more engines, and try again
113 self.add_engines(2)
113 self.add_engines(2)
114 n2 = len(self.client.ids)
114 n2 = len(self.client.ids)
115 self.assertNotEqual(n2, n1)
115 self.assertNotEqual(n2, n1)
116
116
117 # apply
117 # apply
118 r = v.apply_sync(lambda : 1)
118 r = v.apply_sync(lambda : 1)
119 self.assertEqual(r, [1] * n2)
119 self.assertEqual(r, [1] * n2)
120
120
121 # map
121 # map
122 r = v.map_sync(double, seq)
122 r = v.map_sync(double, seq)
123 self.assertEqual(r, ref)
123 self.assertEqual(r, ref)
124
124
125 def test_targets(self):
125 def test_targets(self):
126 """test various valid targets arguments"""
126 """test various valid targets arguments"""
127 build = self.client._build_targets
127 build = self.client._build_targets
128 ids = self.client.ids
128 ids = self.client.ids
129 idents,targets = build(None)
129 idents,targets = build(None)
130 self.assertEqual(ids, targets)
130 self.assertEqual(ids, targets)
131
131
132 def test_clear(self):
132 def test_clear(self):
133 """test clear behavior"""
133 """test clear behavior"""
134 self.minimum_engines(2)
134 self.minimum_engines(2)
135 v = self.client[:]
135 v = self.client[:]
136 v.block=True
136 v.block=True
137 v.push(dict(a=5))
137 v.push(dict(a=5))
138 v.pull('a')
138 v.pull('a')
139 id0 = self.client.ids[-1]
139 id0 = self.client.ids[-1]
140 self.client.clear(targets=id0, block=True)
140 self.client.clear(targets=id0, block=True)
141 a = self.client[:-1].get('a')
141 a = self.client[:-1].get('a')
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.client.clear(block=True)
143 self.client.clear(block=True)
144 for i in self.client.ids:
144 for i in self.client.ids:
145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146
146
147 def test_get_result(self):
147 def test_get_result(self):
148 """test getting results from the Hub."""
148 """test getting results from the Hub."""
149 c = clientmod.Client(profile='iptest')
149 c = clientmod.Client(profile='iptest')
150 t = c.ids[-1]
150 t = c.ids[-1]
151 ar = c[t].apply_async(wait, 1)
151 ar = c[t].apply_async(wait, 1)
152 # give the monitor time to notice the message
152 # give the monitor time to notice the message
153 time.sleep(.25)
153 time.sleep(.25)
154 ahr = self.client.get_result(ar.msg_ids[0])
154 ahr = self.client.get_result(ar.msg_ids[0])
155 self.assertTrue(isinstance(ahr, AsyncHubResult))
155 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertEqual(ahr.get(), ar.get())
156 self.assertEqual(ahr.get(), ar.get())
157 ar2 = self.client.get_result(ar.msg_ids[0])
157 ar2 = self.client.get_result(ar.msg_ids[0])
158 self.assertFalse(isinstance(ar2, AsyncHubResult))
158 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 c.close()
159 c.close()
160
160
161 def test_get_execute_result(self):
161 def test_get_execute_result(self):
162 """test getting execute results from the Hub."""
162 """test getting execute results from the Hub."""
163 c = clientmod.Client(profile='iptest')
163 c = clientmod.Client(profile='iptest')
164 t = c.ids[-1]
164 t = c.ids[-1]
165 cell = '\n'.join([
165 cell = '\n'.join([
166 'import time',
166 'import time',
167 'time.sleep(0.25)',
167 'time.sleep(0.25)',
168 '5'
168 '5'
169 ])
169 ])
170 ar = c[t].execute("import time; time.sleep(1)", silent=False)
170 ar = c[t].execute("import time; time.sleep(1)", silent=False)
171 # give the monitor time to notice the message
171 # give the monitor time to notice the message
172 time.sleep(.25)
172 time.sleep(.25)
173 ahr = self.client.get_result(ar.msg_ids[0])
173 ahr = self.client.get_result(ar.msg_ids[0])
174 self.assertTrue(isinstance(ahr, AsyncHubResult))
174 self.assertTrue(isinstance(ahr, AsyncHubResult))
175 self.assertEqual(ahr.get().pyout, ar.get().pyout)
175 self.assertEqual(ahr.get().pyout, ar.get().pyout)
176 ar2 = self.client.get_result(ar.msg_ids[0])
176 ar2 = self.client.get_result(ar.msg_ids[0])
177 self.assertFalse(isinstance(ar2, AsyncHubResult))
177 self.assertFalse(isinstance(ar2, AsyncHubResult))
178 c.close()
178 c.close()
179
179
180 def test_ids_list(self):
180 def test_ids_list(self):
181 """test client.ids"""
181 """test client.ids"""
182 ids = self.client.ids
182 ids = self.client.ids
183 self.assertEqual(ids, self.client._ids)
183 self.assertEqual(ids, self.client._ids)
184 self.assertFalse(ids is self.client._ids)
184 self.assertFalse(ids is self.client._ids)
185 ids.remove(ids[-1])
185 ids.remove(ids[-1])
186 self.assertNotEqual(ids, self.client._ids)
186 self.assertNotEqual(ids, self.client._ids)
187
187
188 def test_queue_status(self):
188 def test_queue_status(self):
189 ids = self.client.ids
189 ids = self.client.ids
190 id0 = ids[0]
190 id0 = ids[0]
191 qs = self.client.queue_status(targets=id0)
191 qs = self.client.queue_status(targets=id0)
192 self.assertTrue(isinstance(qs, dict))
192 self.assertTrue(isinstance(qs, dict))
193 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
193 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
194 allqs = self.client.queue_status()
194 allqs = self.client.queue_status()
195 self.assertTrue(isinstance(allqs, dict))
195 self.assertTrue(isinstance(allqs, dict))
196 intkeys = list(allqs.keys())
196 intkeys = list(allqs.keys())
197 intkeys.remove('unassigned')
197 intkeys.remove('unassigned')
198 print("intkeys", intkeys)
198 print("intkeys", intkeys)
199 intkeys = sorted(intkeys)
199 intkeys = sorted(intkeys)
200 ids = self.client.ids
200 ids = self.client.ids
201 print("client.ids", ids)
201 print("client.ids", ids)
202 ids = sorted(self.client.ids)
202 ids = sorted(self.client.ids)
203 self.assertEqual(intkeys, ids)
203 self.assertEqual(intkeys, ids)
204 unassigned = allqs.pop('unassigned')
204 unassigned = allqs.pop('unassigned')
205 for eid,qs in allqs.items():
205 for eid,qs in allqs.items():
206 self.assertTrue(isinstance(qs, dict))
206 self.assertTrue(isinstance(qs, dict))
207 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
207 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
208
208
209 def test_shutdown(self):
209 def test_shutdown(self):
210 ids = self.client.ids
210 ids = self.client.ids
211 id0 = ids[0]
211 id0 = ids[0]
212 self.client.shutdown(id0, block=True)
212 self.client.shutdown(id0, block=True)
213 while id0 in self.client.ids:
213 while id0 in self.client.ids:
214 time.sleep(0.1)
214 time.sleep(0.1)
215 self.client.spin()
215 self.client.spin()
216
216
217 self.assertRaises(IndexError, lambda : self.client[id0])
217 self.assertRaises(IndexError, lambda : self.client[id0])
218
218
219 def test_result_status(self):
219 def test_result_status(self):
220 pass
220 pass
221 # to be written
221 # to be written
222
222
223 def test_db_query_dt(self):
223 def test_db_query_dt(self):
224 """test db query by date"""
224 """test db query by date"""
225 hist = self.client.hub_history()
225 hist = self.client.hub_history()
226 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
226 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
227 tic = middle['submitted']
227 tic = middle['submitted']
228 before = self.client.db_query({'submitted' : {'$lt' : tic}})
228 before = self.client.db_query({'submitted' : {'$lt' : tic}})
229 after = self.client.db_query({'submitted' : {'$gte' : tic}})
229 after = self.client.db_query({'submitted' : {'$gte' : tic}})
230 self.assertEqual(len(before)+len(after),len(hist))
230 self.assertEqual(len(before)+len(after),len(hist))
231 for b in before:
231 for b in before:
232 self.assertTrue(b['submitted'] < tic)
232 self.assertTrue(b['submitted'] < tic)
233 for a in after:
233 for a in after:
234 self.assertTrue(a['submitted'] >= tic)
234 self.assertTrue(a['submitted'] >= tic)
235 same = self.client.db_query({'submitted' : tic})
235 same = self.client.db_query({'submitted' : tic})
236 for s in same:
236 for s in same:
237 self.assertTrue(s['submitted'] == tic)
237 self.assertTrue(s['submitted'] == tic)
238
238
239 def test_db_query_keys(self):
239 def test_db_query_keys(self):
240 """test extracting subset of record keys"""
240 """test extracting subset of record keys"""
241 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
241 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
242 for rec in found:
242 for rec in found:
243 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
243 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
244
244
245 def test_db_query_default_keys(self):
245 def test_db_query_default_keys(self):
246 """default db_query excludes buffers"""
246 """default db_query excludes buffers"""
247 found = self.client.db_query({'msg_id': {'$ne' : ''}})
247 found = self.client.db_query({'msg_id': {'$ne' : ''}})
248 for rec in found:
248 for rec in found:
249 keys = set(rec.keys())
249 keys = set(rec.keys())
250 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
250 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
251 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
251 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
252
252
253 def test_db_query_msg_id(self):
253 def test_db_query_msg_id(self):
254 """ensure msg_id is always in db queries"""
254 """ensure msg_id is always in db queries"""
255 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
255 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
256 for rec in found:
256 for rec in found:
257 self.assertTrue('msg_id' in rec.keys())
257 self.assertTrue('msg_id' in rec.keys())
258 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
258 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
259 for rec in found:
259 for rec in found:
260 self.assertTrue('msg_id' in rec.keys())
260 self.assertTrue('msg_id' in rec.keys())
261 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
261 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
262 for rec in found:
262 for rec in found:
263 self.assertTrue('msg_id' in rec.keys())
263 self.assertTrue('msg_id' in rec.keys())
264
264
265 def test_db_query_get_result(self):
265 def test_db_query_get_result(self):
266 """pop in db_query shouldn't pop from result itself"""
266 """pop in db_query shouldn't pop from result itself"""
267 self.client[:].apply_sync(lambda : 1)
267 self.client[:].apply_sync(lambda : 1)
268 found = self.client.db_query({'msg_id': {'$ne' : ''}})
268 found = self.client.db_query({'msg_id': {'$ne' : ''}})
269 rc2 = clientmod.Client(profile='iptest')
269 rc2 = clientmod.Client(profile='iptest')
270 # If this bug is not fixed, this call will hang:
270 # If this bug is not fixed, this call will hang:
271 ar = rc2.get_result(self.client.history[-1])
271 ar = rc2.get_result(self.client.history[-1])
272 ar.wait(2)
272 ar.wait(2)
273 self.assertTrue(ar.ready())
273 self.assertTrue(ar.ready())
274 ar.get()
274 ar.get()
275 rc2.close()
275 rc2.close()
276
276
277 def test_db_query_in(self):
277 def test_db_query_in(self):
278 """test db query with '$in','$nin' operators"""
278 """test db query with '$in','$nin' operators"""
279 hist = self.client.hub_history()
279 hist = self.client.hub_history()
280 even = hist[::2]
280 even = hist[::2]
281 odd = hist[1::2]
281 odd = hist[1::2]
282 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
282 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
283 found = [ r['msg_id'] for r in recs ]
283 found = [ r['msg_id'] for r in recs ]
284 self.assertEqual(set(even), set(found))
284 self.assertEqual(set(even), set(found))
285 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
285 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
286 found = [ r['msg_id'] for r in recs ]
286 found = [ r['msg_id'] for r in recs ]
287 self.assertEqual(set(odd), set(found))
287 self.assertEqual(set(odd), set(found))
288
288
289 def test_hub_history(self):
289 def test_hub_history(self):
290 hist = self.client.hub_history()
290 hist = self.client.hub_history()
291 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
291 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
292 recdict = {}
292 recdict = {}
293 for rec in recs:
293 for rec in recs:
294 recdict[rec['msg_id']] = rec
294 recdict[rec['msg_id']] = rec
295
295
296 latest = datetime(1984,1,1)
296 latest = datetime(1984,1,1)
297 for msg_id in hist:
297 for msg_id in hist:
298 rec = recdict[msg_id]
298 rec = recdict[msg_id]
299 newt = rec['submitted']
299 newt = rec['submitted']
300 self.assertTrue(newt >= latest)
300 self.assertTrue(newt >= latest)
301 latest = newt
301 latest = newt
302 ar = self.client[-1].apply_async(lambda : 1)
302 ar = self.client[-1].apply_async(lambda : 1)
303 ar.get()
303 ar.get()
304 time.sleep(0.25)
304 time.sleep(0.25)
305 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
305 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
306
306
307 def _wait_for_idle(self):
307 def _wait_for_idle(self):
308 """wait for the cluster to become idle, according to the everyone."""
308 """wait for the cluster to become idle, according to the everyone."""
309 rc = self.client
309 rc = self.client
310
310
311 # step 0. wait for local results
311 # step 0. wait for local results
312 # this should be sufficient 99% of the time.
312 # this should be sufficient 99% of the time.
313 rc.wait(timeout=5)
313 rc.wait(timeout=5)
314
314
315 # step 1. wait for all requests to be noticed
315 # step 1. wait for all requests to be noticed
316 # timeout 5s, polling every 100ms
316 # timeout 5s, polling every 100ms
317 msg_ids = set(rc.history)
317 msg_ids = set(rc.history)
318 hub_hist = rc.hub_history()
318 hub_hist = rc.hub_history()
319 for i in range(50):
319 for i in range(50):
320 if msg_ids.difference(hub_hist):
320 if msg_ids.difference(hub_hist):
321 time.sleep(0.1)
321 time.sleep(0.1)
322 hub_hist = rc.hub_history()
322 hub_hist = rc.hub_history()
323 else:
323 else:
324 break
324 break
325
325
326 self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
326 self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
327
327
328 # step 2. wait for all requests to be done
328 # step 2. wait for all requests to be done
329 # timeout 5s, polling every 100ms
329 # timeout 5s, polling every 100ms
330 qs = rc.queue_status()
330 qs = rc.queue_status()
331 for i in range(50):
331 for i in range(50):
332 if qs['unassigned'] or any(qs[eid]['tasks'] + qs[eid]['queue'] for eid in qs if eid != 'unassigned'):
332 if qs['unassigned'] or any(qs[eid]['tasks'] + qs[eid]['queue'] for eid in qs if eid != 'unassigned'):
333 time.sleep(0.1)
333 time.sleep(0.1)
334 qs = rc.queue_status()
334 qs = rc.queue_status()
335 else:
335 else:
336 break
336 break
337
337
338 # ensure Hub up to date:
338 # ensure Hub up to date:
339 self.assertEqual(qs['unassigned'], 0)
339 self.assertEqual(qs['unassigned'], 0)
340 for eid in [ eid for eid in qs if eid != 'unassigned' ]:
340 for eid in [ eid for eid in qs if eid != 'unassigned' ]:
341 self.assertEqual(qs[eid]['tasks'], 0)
341 self.assertEqual(qs[eid]['tasks'], 0)
342 self.assertEqual(qs[eid]['queue'], 0)
342 self.assertEqual(qs[eid]['queue'], 0)
343
343
344
344
345 def test_resubmit(self):
345 def test_resubmit(self):
346 def f():
346 def f():
347 import random
347 import random
348 return random.random()
348 return random.random()
349 v = self.client.load_balanced_view()
349 v = self.client.load_balanced_view()
350 ar = v.apply_async(f)
350 ar = v.apply_async(f)
351 r1 = ar.get(1)
351 r1 = ar.get(1)
352 # give the Hub a chance to notice:
352 # give the Hub a chance to notice:
353 self._wait_for_idle()
353 self._wait_for_idle()
354 ahr = self.client.resubmit(ar.msg_ids)
354 ahr = self.client.resubmit(ar.msg_ids)
355 r2 = ahr.get(1)
355 r2 = ahr.get(1)
356 self.assertFalse(r1 == r2)
356 self.assertFalse(r1 == r2)
357
357
358 def test_resubmit_chain(self):
358 def test_resubmit_chain(self):
359 """resubmit resubmitted tasks"""
359 """resubmit resubmitted tasks"""
360 v = self.client.load_balanced_view()
360 v = self.client.load_balanced_view()
361 ar = v.apply_async(lambda x: x, 'x'*1024)
361 ar = v.apply_async(lambda x: x, 'x'*1024)
362 ar.get()
362 ar.get()
363 self._wait_for_idle()
363 self._wait_for_idle()
364 ars = [ar]
364 ars = [ar]
365
365
366 for i in range(10):
366 for i in range(10):
367 ar = ars[-1]
367 ar = ars[-1]
368 ar2 = self.client.resubmit(ar.msg_ids)
368 ar2 = self.client.resubmit(ar.msg_ids)
369
369
370 [ ar.get() for ar in ars ]
370 [ ar.get() for ar in ars ]
371
371
372 def test_resubmit_header(self):
372 def test_resubmit_header(self):
373 """resubmit shouldn't clobber the whole header"""
373 """resubmit shouldn't clobber the whole header"""
374 def f():
374 def f():
375 import random
375 import random
376 return random.random()
376 return random.random()
377 v = self.client.load_balanced_view()
377 v = self.client.load_balanced_view()
378 v.retries = 1
378 v.retries = 1
379 ar = v.apply_async(f)
379 ar = v.apply_async(f)
380 r1 = ar.get(1)
380 r1 = ar.get(1)
381 # give the Hub a chance to notice:
381 # give the Hub a chance to notice:
382 self._wait_for_idle()
382 self._wait_for_idle()
383 ahr = self.client.resubmit(ar.msg_ids)
383 ahr = self.client.resubmit(ar.msg_ids)
384 ahr.get(1)
384 ahr.get(1)
385 time.sleep(0.5)
385 time.sleep(0.5)
386 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
386 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
387 h1,h2 = [ r['header'] for r in records ]
387 h1,h2 = [ r['header'] for r in records ]
388 for key in set(h1.keys()).union(set(h2.keys())):
388 for key in set(h1.keys()).union(set(h2.keys())):
389 if key in ('msg_id', 'date'):
389 if key in ('msg_id', 'date'):
390 self.assertNotEqual(h1[key], h2[key])
390 self.assertNotEqual(h1[key], h2[key])
391 else:
391 else:
392 self.assertEqual(h1[key], h2[key])
392 self.assertEqual(h1[key], h2[key])
393
393
394 def test_resubmit_aborted(self):
394 def test_resubmit_aborted(self):
395 def f():
395 def f():
396 import random
396 import random
397 return random.random()
397 return random.random()
398 v = self.client.load_balanced_view()
398 v = self.client.load_balanced_view()
399 # restrict to one engine, so we can put a sleep
399 # restrict to one engine, so we can put a sleep
400 # ahead of the task, so it will get aborted
400 # ahead of the task, so it will get aborted
401 eid = self.client.ids[-1]
401 eid = self.client.ids[-1]
402 v.targets = [eid]
402 v.targets = [eid]
403 sleep = v.apply_async(time.sleep, 0.5)
403 sleep = v.apply_async(time.sleep, 0.5)
404 ar = v.apply_async(f)
404 ar = v.apply_async(f)
405 ar.abort()
405 ar.abort()
406 self.assertRaises(error.TaskAborted, ar.get)
406 self.assertRaises(error.TaskAborted, ar.get)
407 # Give the Hub a chance to get up to date:
407 # Give the Hub a chance to get up to date:
408 self._wait_for_idle()
408 self._wait_for_idle()
409 ahr = self.client.resubmit(ar.msg_ids)
409 ahr = self.client.resubmit(ar.msg_ids)
410 r2 = ahr.get(1)
410 r2 = ahr.get(1)
411
411
412 def test_resubmit_inflight(self):
412 def test_resubmit_inflight(self):
413 """resubmit of inflight task"""
413 """resubmit of inflight task"""
414 v = self.client.load_balanced_view()
414 v = self.client.load_balanced_view()
415 ar = v.apply_async(time.sleep,1)
415 ar = v.apply_async(time.sleep,1)
416 # give the message a chance to arrive
416 # give the message a chance to arrive
417 time.sleep(0.2)
417 time.sleep(0.2)
418 ahr = self.client.resubmit(ar.msg_ids)
418 ahr = self.client.resubmit(ar.msg_ids)
419 ar.get(2)
419 ar.get(2)
420 ahr.get(2)
420 ahr.get(2)
421
421
422 def test_resubmit_badkey(self):
422 def test_resubmit_badkey(self):
423 """ensure KeyError on resubmit of nonexistant task"""
423 """ensure KeyError on resubmit of nonexistant task"""
424 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
424 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
425
425
426 def test_purge_hub_results(self):
426 def test_purge_hub_results(self):
427 # ensure there are some tasks
427 # ensure there are some tasks
428 for i in range(5):
428 for i in range(5):
429 self.client[:].apply_sync(lambda : 1)
429 self.client[:].apply_sync(lambda : 1)
430 # Wait for the Hub to realise the result is done:
430 # Wait for the Hub to realise the result is done:
431 # This prevents a race condition, where we
431 # This prevents a race condition, where we
432 # might purge a result the Hub still thinks is pending.
432 # might purge a result the Hub still thinks is pending.
433 self._wait_for_idle()
433 self._wait_for_idle()
434 rc2 = clientmod.Client(profile='iptest')
434 rc2 = clientmod.Client(profile='iptest')
435 hist = self.client.hub_history()
435 hist = self.client.hub_history()
436 ahr = rc2.get_result([hist[-1]])
436 ahr = rc2.get_result([hist[-1]])
437 ahr.wait(10)
437 ahr.wait(10)
438 self.client.purge_hub_results(hist[-1])
438 self.client.purge_hub_results(hist[-1])
439 newhist = self.client.hub_history()
439 newhist = self.client.hub_history()
440 self.assertEqual(len(newhist)+1,len(hist))
440 self.assertEqual(len(newhist)+1,len(hist))
441 rc2.spin()
441 rc2.spin()
442 rc2.close()
442 rc2.close()
443
443
444 def test_purge_local_results(self):
444 def test_purge_local_results(self):
445 # ensure there are some tasks
445 # ensure there are some tasks
446 res = []
446 res = []
447 for i in range(5):
447 for i in range(5):
448 res.append(self.client[:].apply_async(lambda : 1))
448 res.append(self.client[:].apply_async(lambda : 1))
449 self._wait_for_idle()
449 self._wait_for_idle()
450 self.client.wait(10) # wait for the results to come back
450 self.client.wait(10) # wait for the results to come back
451 before = len(self.client.results)
451 before = len(self.client.results)
452 self.assertEqual(len(self.client.metadata),before)
452 self.assertEqual(len(self.client.metadata),before)
453 self.client.purge_local_results(res[-1])
453 self.client.purge_local_results(res[-1])
454 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
454 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
455 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
455 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
456
456
457 def test_purge_local_results_outstanding(self):
457 def test_purge_local_results_outstanding(self):
458 v = self.client[-1]
458 v = self.client[-1]
459 ar = v.apply_async(lambda : 1)
459 ar = v.apply_async(lambda : 1)
460 msg_id = ar.msg_ids[0]
460 msg_id = ar.msg_ids[0]
461 ar.get()
461 ar.get()
462 self._wait_for_idle()
462 self._wait_for_idle()
463 ar2 = v.apply_async(time.sleep, 1)
463 ar2 = v.apply_async(time.sleep, 1)
464 self.assertIn(msg_id, self.client.results)
464 self.assertIn(msg_id, self.client.results)
465 self.assertIn(msg_id, self.client.metadata)
465 self.assertIn(msg_id, self.client.metadata)
466 self.client.purge_local_results(ar)
466 self.client.purge_local_results(ar)
467 self.assertNotIn(msg_id, self.client.results)
467 self.assertNotIn(msg_id, self.client.results)
468 self.assertNotIn(msg_id, self.client.metadata)
468 self.assertNotIn(msg_id, self.client.metadata)
469 with self.assertRaises(RuntimeError):
469 with self.assertRaises(RuntimeError):
470 self.client.purge_local_results(ar2)
470 self.client.purge_local_results(ar2)
471 ar2.get()
471 ar2.get()
472 self.client.purge_local_results(ar2)
472 self.client.purge_local_results(ar2)
473
473
474 def test_purge_all_local_results_outstanding(self):
474 def test_purge_all_local_results_outstanding(self):
475 v = self.client[-1]
475 v = self.client[-1]
476 ar = v.apply_async(time.sleep, 1)
476 ar = v.apply_async(time.sleep, 1)
477 with self.assertRaises(RuntimeError):
477 with self.assertRaises(RuntimeError):
478 self.client.purge_local_results('all')
478 self.client.purge_local_results('all')
479 ar.get()
479 ar.get()
480 self.client.purge_local_results('all')
480 self.client.purge_local_results('all')
481
481
482 def test_purge_all_hub_results(self):
482 def test_purge_all_hub_results(self):
483 self.client.purge_hub_results('all')
483 self.client.purge_hub_results('all')
484 hist = self.client.hub_history()
484 hist = self.client.hub_history()
485 self.assertEqual(len(hist), 0)
485 self.assertEqual(len(hist), 0)
486
486
487 def test_purge_all_local_results(self):
487 def test_purge_all_local_results(self):
488 self.client.purge_local_results('all')
488 self.client.purge_local_results('all')
489 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
489 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
490 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
490 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
491
491
492 def test_purge_all_results(self):
492 def test_purge_all_results(self):
493 # ensure there are some tasks
493 # ensure there are some tasks
494 for i in range(5):
494 for i in range(5):
495 self.client[:].apply_sync(lambda : 1)
495 self.client[:].apply_sync(lambda : 1)
496 self.client.wait(10)
496 self.client.wait(10)
497 self._wait_for_idle()
497 self._wait_for_idle()
498 self.client.purge_results('all')
498 self.client.purge_results('all')
499 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
499 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
500 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
500 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
501 hist = self.client.hub_history()
501 hist = self.client.hub_history()
502 self.assertEqual(len(hist), 0, msg="hub history not empty")
502 self.assertEqual(len(hist), 0, msg="hub history not empty")
503
503
504 def test_purge_everything(self):
504 def test_purge_everything(self):
505 # ensure there are some tasks
505 # ensure there are some tasks
506 for i in range(5):
506 for i in range(5):
507 self.client[:].apply_sync(lambda : 1)
507 self.client[:].apply_sync(lambda : 1)
508 self.client.wait(10)
508 self.client.wait(10)
509 self._wait_for_idle()
509 self._wait_for_idle()
510 self.client.purge_everything()
510 self.client.purge_everything()
511 # The client results
511 # The client results
512 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
512 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
513 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
513 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
514 # The client "bookkeeping"
514 # The client "bookkeeping"
515 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
515 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
516 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
516 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
517 # the hub results
517 # the hub results
518 hist = self.client.hub_history()
518 hist = self.client.hub_history()
519 self.assertEqual(len(hist), 0, msg="hub history not empty")
519 self.assertEqual(len(hist), 0, msg="hub history not empty")
520
520
521
521
522 def test_spin_thread(self):
522 def test_spin_thread(self):
523 self.client.spin_thread(0.01)
523 self.client.spin_thread(0.01)
524 ar = self.client[-1].apply_async(lambda : 1)
524 ar = self.client[-1].apply_async(lambda : 1)
525 time.sleep(0.1)
525 md = self.client.metadata[ar.msg_ids[0]]
526 self.assertTrue(ar.wall_time < 0.1,
526 # 3s timeout, 100ms poll
527 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
527 for i in range(30):
528 )
528 time.sleep(0.1)
529 if md['received'] is not None:
530 break
531 self.assertIsInstance(md['received'], datetime)
529
532
530 def test_stop_spin_thread(self):
533 def test_stop_spin_thread(self):
531 self.client.spin_thread(0.01)
534 self.client.spin_thread(0.01)
532 self.client.stop_spin_thread()
535 self.client.stop_spin_thread()
533 ar = self.client[-1].apply_async(lambda : 1)
536 ar = self.client[-1].apply_async(lambda : 1)
534 time.sleep(0.15)
537 md = self.client.metadata[ar.msg_ids[0]]
535 self.assertTrue(ar.wall_time > 0.1,
538 # 500ms timeout, 100ms poll
536 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
539 for i in range(5):
537 )
540 time.sleep(0.1)
541 self.assertIsNone(md['received'], None)
538
542
539 def test_activate(self):
543 def test_activate(self):
540 ip = get_ipython()
544 ip = get_ipython()
541 magics = ip.magics_manager.magics
545 magics = ip.magics_manager.magics
542 self.assertTrue('px' in magics['line'])
546 self.assertTrue('px' in magics['line'])
543 self.assertTrue('px' in magics['cell'])
547 self.assertTrue('px' in magics['cell'])
544 v0 = self.client.activate(-1, '0')
548 v0 = self.client.activate(-1, '0')
545 self.assertTrue('px0' in magics['line'])
549 self.assertTrue('px0' in magics['line'])
546 self.assertTrue('px0' in magics['cell'])
550 self.assertTrue('px0' in magics['cell'])
547 self.assertEqual(v0.targets, self.client.ids[-1])
551 self.assertEqual(v0.targets, self.client.ids[-1])
548 v0 = self.client.activate('all', 'all')
552 v0 = self.client.activate('all', 'all')
549 self.assertTrue('pxall' in magics['line'])
553 self.assertTrue('pxall' in magics['line'])
550 self.assertTrue('pxall' in magics['cell'])
554 self.assertTrue('pxall' in magics['cell'])
551 self.assertEqual(v0.targets, 'all')
555 self.assertEqual(v0.targets, 'all')
General Comments 0
You need to be logged in to leave comments. Login now