##// END OF EJS Templates
debug occasional error in test_queue_status...
MinRK -
Show More
@@ -1,546 +1,551 b''
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23
23
24 import zmq
24 import zmq
25
25
26 from IPython import parallel
26 from IPython import parallel
27 from IPython.parallel.client import client as clientmod
27 from IPython.parallel.client import client as clientmod
28 from IPython.parallel import error
28 from IPython.parallel import error
29 from IPython.parallel import AsyncResult, AsyncHubResult
29 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import LoadBalancedView, DirectView
30 from IPython.parallel import LoadBalancedView, DirectView
31
31
32 from .clienttest import ClusterTestCase, segfault, wait, add_engines
32 from .clienttest import ClusterTestCase, segfault, wait, add_engines
33
33
34 def setup():
34 def setup():
35 add_engines(4, total=True)
35 add_engines(4, total=True)
36
36
37 class TestClient(ClusterTestCase):
37 class TestClient(ClusterTestCase):
38
38
39 def test_ids(self):
39 def test_ids(self):
40 n = len(self.client.ids)
40 n = len(self.client.ids)
41 self.add_engines(2)
41 self.add_engines(2)
42 self.assertEqual(len(self.client.ids), n+2)
42 self.assertEqual(len(self.client.ids), n+2)
43
43
44 def test_view_indexing(self):
44 def test_view_indexing(self):
45 """test index access for views"""
45 """test index access for views"""
46 self.minimum_engines(4)
46 self.minimum_engines(4)
47 targets = self.client._build_targets('all')[-1]
47 targets = self.client._build_targets('all')[-1]
48 v = self.client[:]
48 v = self.client[:]
49 self.assertEqual(v.targets, targets)
49 self.assertEqual(v.targets, targets)
50 t = self.client.ids[2]
50 t = self.client.ids[2]
51 v = self.client[t]
51 v = self.client[t]
52 self.assertTrue(isinstance(v, DirectView))
52 self.assertTrue(isinstance(v, DirectView))
53 self.assertEqual(v.targets, t)
53 self.assertEqual(v.targets, t)
54 t = self.client.ids[2:4]
54 t = self.client.ids[2:4]
55 v = self.client[t]
55 v = self.client[t]
56 self.assertTrue(isinstance(v, DirectView))
56 self.assertTrue(isinstance(v, DirectView))
57 self.assertEqual(v.targets, t)
57 self.assertEqual(v.targets, t)
58 v = self.client[::2]
58 v = self.client[::2]
59 self.assertTrue(isinstance(v, DirectView))
59 self.assertTrue(isinstance(v, DirectView))
60 self.assertEqual(v.targets, targets[::2])
60 self.assertEqual(v.targets, targets[::2])
61 v = self.client[1::3]
61 v = self.client[1::3]
62 self.assertTrue(isinstance(v, DirectView))
62 self.assertTrue(isinstance(v, DirectView))
63 self.assertEqual(v.targets, targets[1::3])
63 self.assertEqual(v.targets, targets[1::3])
64 v = self.client[:-3]
64 v = self.client[:-3]
65 self.assertTrue(isinstance(v, DirectView))
65 self.assertTrue(isinstance(v, DirectView))
66 self.assertEqual(v.targets, targets[:-3])
66 self.assertEqual(v.targets, targets[:-3])
67 v = self.client[-1]
67 v = self.client[-1]
68 self.assertTrue(isinstance(v, DirectView))
68 self.assertTrue(isinstance(v, DirectView))
69 self.assertEqual(v.targets, targets[-1])
69 self.assertEqual(v.targets, targets[-1])
70 self.assertRaises(TypeError, lambda : self.client[None])
70 self.assertRaises(TypeError, lambda : self.client[None])
71
71
72 def test_lbview_targets(self):
72 def test_lbview_targets(self):
73 """test load_balanced_view targets"""
73 """test load_balanced_view targets"""
74 v = self.client.load_balanced_view()
74 v = self.client.load_balanced_view()
75 self.assertEqual(v.targets, None)
75 self.assertEqual(v.targets, None)
76 v = self.client.load_balanced_view(-1)
76 v = self.client.load_balanced_view(-1)
77 self.assertEqual(v.targets, [self.client.ids[-1]])
77 self.assertEqual(v.targets, [self.client.ids[-1]])
78 v = self.client.load_balanced_view('all')
78 v = self.client.load_balanced_view('all')
79 self.assertEqual(v.targets, None)
79 self.assertEqual(v.targets, None)
80
80
81 def test_dview_targets(self):
81 def test_dview_targets(self):
82 """test direct_view targets"""
82 """test direct_view targets"""
83 v = self.client.direct_view()
83 v = self.client.direct_view()
84 self.assertEqual(v.targets, 'all')
84 self.assertEqual(v.targets, 'all')
85 v = self.client.direct_view('all')
85 v = self.client.direct_view('all')
86 self.assertEqual(v.targets, 'all')
86 self.assertEqual(v.targets, 'all')
87 v = self.client.direct_view(-1)
87 v = self.client.direct_view(-1)
88 self.assertEqual(v.targets, self.client.ids[-1])
88 self.assertEqual(v.targets, self.client.ids[-1])
89
89
90 def test_lazy_all_targets(self):
90 def test_lazy_all_targets(self):
91 """test lazy evaluation of rc.direct_view('all')"""
91 """test lazy evaluation of rc.direct_view('all')"""
92 v = self.client.direct_view()
92 v = self.client.direct_view()
93 self.assertEqual(v.targets, 'all')
93 self.assertEqual(v.targets, 'all')
94
94
95 def double(x):
95 def double(x):
96 return x*2
96 return x*2
97 seq = list(range(100))
97 seq = list(range(100))
98 ref = [ double(x) for x in seq ]
98 ref = [ double(x) for x in seq ]
99
99
100 # add some engines, which should be used
100 # add some engines, which should be used
101 self.add_engines(1)
101 self.add_engines(1)
102 n1 = len(self.client.ids)
102 n1 = len(self.client.ids)
103
103
104 # simple apply
104 # simple apply
105 r = v.apply_sync(lambda : 1)
105 r = v.apply_sync(lambda : 1)
106 self.assertEqual(r, [1] * n1)
106 self.assertEqual(r, [1] * n1)
107
107
108 # map goes through remotefunction
108 # map goes through remotefunction
109 r = v.map_sync(double, seq)
109 r = v.map_sync(double, seq)
110 self.assertEqual(r, ref)
110 self.assertEqual(r, ref)
111
111
112 # add a couple more engines, and try again
112 # add a couple more engines, and try again
113 self.add_engines(2)
113 self.add_engines(2)
114 n2 = len(self.client.ids)
114 n2 = len(self.client.ids)
115 self.assertNotEqual(n2, n1)
115 self.assertNotEqual(n2, n1)
116
116
117 # apply
117 # apply
118 r = v.apply_sync(lambda : 1)
118 r = v.apply_sync(lambda : 1)
119 self.assertEqual(r, [1] * n2)
119 self.assertEqual(r, [1] * n2)
120
120
121 # map
121 # map
122 r = v.map_sync(double, seq)
122 r = v.map_sync(double, seq)
123 self.assertEqual(r, ref)
123 self.assertEqual(r, ref)
124
124
125 def test_targets(self):
125 def test_targets(self):
126 """test various valid targets arguments"""
126 """test various valid targets arguments"""
127 build = self.client._build_targets
127 build = self.client._build_targets
128 ids = self.client.ids
128 ids = self.client.ids
129 idents,targets = build(None)
129 idents,targets = build(None)
130 self.assertEqual(ids, targets)
130 self.assertEqual(ids, targets)
131
131
132 def test_clear(self):
132 def test_clear(self):
133 """test clear behavior"""
133 """test clear behavior"""
134 self.minimum_engines(2)
134 self.minimum_engines(2)
135 v = self.client[:]
135 v = self.client[:]
136 v.block=True
136 v.block=True
137 v.push(dict(a=5))
137 v.push(dict(a=5))
138 v.pull('a')
138 v.pull('a')
139 id0 = self.client.ids[-1]
139 id0 = self.client.ids[-1]
140 self.client.clear(targets=id0, block=True)
140 self.client.clear(targets=id0, block=True)
141 a = self.client[:-1].get('a')
141 a = self.client[:-1].get('a')
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.client.clear(block=True)
143 self.client.clear(block=True)
144 for i in self.client.ids:
144 for i in self.client.ids:
145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146
146
147 def test_get_result(self):
147 def test_get_result(self):
148 """test getting results from the Hub."""
148 """test getting results from the Hub."""
149 c = clientmod.Client(profile='iptest')
149 c = clientmod.Client(profile='iptest')
150 t = c.ids[-1]
150 t = c.ids[-1]
151 ar = c[t].apply_async(wait, 1)
151 ar = c[t].apply_async(wait, 1)
152 # give the monitor time to notice the message
152 # give the monitor time to notice the message
153 time.sleep(.25)
153 time.sleep(.25)
154 ahr = self.client.get_result(ar.msg_ids[0])
154 ahr = self.client.get_result(ar.msg_ids[0])
155 self.assertTrue(isinstance(ahr, AsyncHubResult))
155 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertEqual(ahr.get(), ar.get())
156 self.assertEqual(ahr.get(), ar.get())
157 ar2 = self.client.get_result(ar.msg_ids[0])
157 ar2 = self.client.get_result(ar.msg_ids[0])
158 self.assertFalse(isinstance(ar2, AsyncHubResult))
158 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 c.close()
159 c.close()
160
160
161 def test_get_execute_result(self):
161 def test_get_execute_result(self):
162 """test getting execute results from the Hub."""
162 """test getting execute results from the Hub."""
163 c = clientmod.Client(profile='iptest')
163 c = clientmod.Client(profile='iptest')
164 t = c.ids[-1]
164 t = c.ids[-1]
165 cell = '\n'.join([
165 cell = '\n'.join([
166 'import time',
166 'import time',
167 'time.sleep(0.25)',
167 'time.sleep(0.25)',
168 '5'
168 '5'
169 ])
169 ])
170 ar = c[t].execute("import time; time.sleep(1)", silent=False)
170 ar = c[t].execute("import time; time.sleep(1)", silent=False)
171 # give the monitor time to notice the message
171 # give the monitor time to notice the message
172 time.sleep(.25)
172 time.sleep(.25)
173 ahr = self.client.get_result(ar.msg_ids[0])
173 ahr = self.client.get_result(ar.msg_ids[0])
174 self.assertTrue(isinstance(ahr, AsyncHubResult))
174 self.assertTrue(isinstance(ahr, AsyncHubResult))
175 self.assertEqual(ahr.get().pyout, ar.get().pyout)
175 self.assertEqual(ahr.get().pyout, ar.get().pyout)
176 ar2 = self.client.get_result(ar.msg_ids[0])
176 ar2 = self.client.get_result(ar.msg_ids[0])
177 self.assertFalse(isinstance(ar2, AsyncHubResult))
177 self.assertFalse(isinstance(ar2, AsyncHubResult))
178 c.close()
178 c.close()
179
179
180 def test_ids_list(self):
180 def test_ids_list(self):
181 """test client.ids"""
181 """test client.ids"""
182 ids = self.client.ids
182 ids = self.client.ids
183 self.assertEqual(ids, self.client._ids)
183 self.assertEqual(ids, self.client._ids)
184 self.assertFalse(ids is self.client._ids)
184 self.assertFalse(ids is self.client._ids)
185 ids.remove(ids[-1])
185 ids.remove(ids[-1])
186 self.assertNotEqual(ids, self.client._ids)
186 self.assertNotEqual(ids, self.client._ids)
187
187
188 def test_queue_status(self):
188 def test_queue_status(self):
189 ids = self.client.ids
189 ids = self.client.ids
190 id0 = ids[0]
190 id0 = ids[0]
191 qs = self.client.queue_status(targets=id0)
191 qs = self.client.queue_status(targets=id0)
192 self.assertTrue(isinstance(qs, dict))
192 self.assertTrue(isinstance(qs, dict))
193 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
193 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
194 allqs = self.client.queue_status()
194 allqs = self.client.queue_status()
195 self.assertTrue(isinstance(allqs, dict))
195 self.assertTrue(isinstance(allqs, dict))
196 intkeys = list(allqs.keys())
196 intkeys = list(allqs.keys())
197 intkeys.remove('unassigned')
197 intkeys.remove('unassigned')
198 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
198 print("intkeys", intkeys)
199 intkeys = sorted(intkeys)
200 ids = self.client.ids
201 print("client.ids", ids)
202 ids = sorted(self.client.ids)
203 self.assertEqual(intkeys, ids)
199 unassigned = allqs.pop('unassigned')
204 unassigned = allqs.pop('unassigned')
200 for eid,qs in allqs.items():
205 for eid,qs in allqs.items():
201 self.assertTrue(isinstance(qs, dict))
206 self.assertTrue(isinstance(qs, dict))
202 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
207 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
203
208
204 def test_shutdown(self):
209 def test_shutdown(self):
205 ids = self.client.ids
210 ids = self.client.ids
206 id0 = ids[0]
211 id0 = ids[0]
207 self.client.shutdown(id0, block=True)
212 self.client.shutdown(id0, block=True)
208 while id0 in self.client.ids:
213 while id0 in self.client.ids:
209 time.sleep(0.1)
214 time.sleep(0.1)
210 self.client.spin()
215 self.client.spin()
211
216
212 self.assertRaises(IndexError, lambda : self.client[id0])
217 self.assertRaises(IndexError, lambda : self.client[id0])
213
218
214 def test_result_status(self):
219 def test_result_status(self):
215 pass
220 pass
216 # to be written
221 # to be written
217
222
218 def test_db_query_dt(self):
223 def test_db_query_dt(self):
219 """test db query by date"""
224 """test db query by date"""
220 hist = self.client.hub_history()
225 hist = self.client.hub_history()
221 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
226 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
222 tic = middle['submitted']
227 tic = middle['submitted']
223 before = self.client.db_query({'submitted' : {'$lt' : tic}})
228 before = self.client.db_query({'submitted' : {'$lt' : tic}})
224 after = self.client.db_query({'submitted' : {'$gte' : tic}})
229 after = self.client.db_query({'submitted' : {'$gte' : tic}})
225 self.assertEqual(len(before)+len(after),len(hist))
230 self.assertEqual(len(before)+len(after),len(hist))
226 for b in before:
231 for b in before:
227 self.assertTrue(b['submitted'] < tic)
232 self.assertTrue(b['submitted'] < tic)
228 for a in after:
233 for a in after:
229 self.assertTrue(a['submitted'] >= tic)
234 self.assertTrue(a['submitted'] >= tic)
230 same = self.client.db_query({'submitted' : tic})
235 same = self.client.db_query({'submitted' : tic})
231 for s in same:
236 for s in same:
232 self.assertTrue(s['submitted'] == tic)
237 self.assertTrue(s['submitted'] == tic)
233
238
234 def test_db_query_keys(self):
239 def test_db_query_keys(self):
235 """test extracting subset of record keys"""
240 """test extracting subset of record keys"""
236 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
241 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
237 for rec in found:
242 for rec in found:
238 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
243 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
239
244
240 def test_db_query_default_keys(self):
245 def test_db_query_default_keys(self):
241 """default db_query excludes buffers"""
246 """default db_query excludes buffers"""
242 found = self.client.db_query({'msg_id': {'$ne' : ''}})
247 found = self.client.db_query({'msg_id': {'$ne' : ''}})
243 for rec in found:
248 for rec in found:
244 keys = set(rec.keys())
249 keys = set(rec.keys())
245 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
250 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
246 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
251 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
247
252
248 def test_db_query_msg_id(self):
253 def test_db_query_msg_id(self):
249 """ensure msg_id is always in db queries"""
254 """ensure msg_id is always in db queries"""
250 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
255 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
251 for rec in found:
256 for rec in found:
252 self.assertTrue('msg_id' in rec.keys())
257 self.assertTrue('msg_id' in rec.keys())
253 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
258 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
254 for rec in found:
259 for rec in found:
255 self.assertTrue('msg_id' in rec.keys())
260 self.assertTrue('msg_id' in rec.keys())
256 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
261 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
257 for rec in found:
262 for rec in found:
258 self.assertTrue('msg_id' in rec.keys())
263 self.assertTrue('msg_id' in rec.keys())
259
264
260 def test_db_query_get_result(self):
265 def test_db_query_get_result(self):
261 """pop in db_query shouldn't pop from result itself"""
266 """pop in db_query shouldn't pop from result itself"""
262 self.client[:].apply_sync(lambda : 1)
267 self.client[:].apply_sync(lambda : 1)
263 found = self.client.db_query({'msg_id': {'$ne' : ''}})
268 found = self.client.db_query({'msg_id': {'$ne' : ''}})
264 rc2 = clientmod.Client(profile='iptest')
269 rc2 = clientmod.Client(profile='iptest')
265 # If this bug is not fixed, this call will hang:
270 # If this bug is not fixed, this call will hang:
266 ar = rc2.get_result(self.client.history[-1])
271 ar = rc2.get_result(self.client.history[-1])
267 ar.wait(2)
272 ar.wait(2)
268 self.assertTrue(ar.ready())
273 self.assertTrue(ar.ready())
269 ar.get()
274 ar.get()
270 rc2.close()
275 rc2.close()
271
276
272 def test_db_query_in(self):
277 def test_db_query_in(self):
273 """test db query with '$in','$nin' operators"""
278 """test db query with '$in','$nin' operators"""
274 hist = self.client.hub_history()
279 hist = self.client.hub_history()
275 even = hist[::2]
280 even = hist[::2]
276 odd = hist[1::2]
281 odd = hist[1::2]
277 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
282 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
278 found = [ r['msg_id'] for r in recs ]
283 found = [ r['msg_id'] for r in recs ]
279 self.assertEqual(set(even), set(found))
284 self.assertEqual(set(even), set(found))
280 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
285 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
281 found = [ r['msg_id'] for r in recs ]
286 found = [ r['msg_id'] for r in recs ]
282 self.assertEqual(set(odd), set(found))
287 self.assertEqual(set(odd), set(found))
283
288
284 def test_hub_history(self):
289 def test_hub_history(self):
285 hist = self.client.hub_history()
290 hist = self.client.hub_history()
286 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
291 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
287 recdict = {}
292 recdict = {}
288 for rec in recs:
293 for rec in recs:
289 recdict[rec['msg_id']] = rec
294 recdict[rec['msg_id']] = rec
290
295
291 latest = datetime(1984,1,1)
296 latest = datetime(1984,1,1)
292 for msg_id in hist:
297 for msg_id in hist:
293 rec = recdict[msg_id]
298 rec = recdict[msg_id]
294 newt = rec['submitted']
299 newt = rec['submitted']
295 self.assertTrue(newt >= latest)
300 self.assertTrue(newt >= latest)
296 latest = newt
301 latest = newt
297 ar = self.client[-1].apply_async(lambda : 1)
302 ar = self.client[-1].apply_async(lambda : 1)
298 ar.get()
303 ar.get()
299 time.sleep(0.25)
304 time.sleep(0.25)
300 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
305 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
301
306
302 def _wait_for_idle(self):
307 def _wait_for_idle(self):
303 """wait for the cluster to become idle, according to the everyone."""
308 """wait for the cluster to become idle, according to the everyone."""
304 rc = self.client
309 rc = self.client
305
310
306 # step 0. wait for local results
311 # step 0. wait for local results
307 # this should be sufficient 99% of the time.
312 # this should be sufficient 99% of the time.
308 rc.wait(timeout=5)
313 rc.wait(timeout=5)
309
314
310 # step 1. wait for all requests to be noticed
315 # step 1. wait for all requests to be noticed
311 # timeout 5s, polling every 100ms
316 # timeout 5s, polling every 100ms
312 msg_ids = set(rc.history)
317 msg_ids = set(rc.history)
313 hub_hist = rc.hub_history()
318 hub_hist = rc.hub_history()
314 for i in range(50):
319 for i in range(50):
315 if msg_ids.difference(hub_hist):
320 if msg_ids.difference(hub_hist):
316 time.sleep(0.1)
321 time.sleep(0.1)
317 hub_hist = rc.hub_history()
322 hub_hist = rc.hub_history()
318 else:
323 else:
319 break
324 break
320
325
321 self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
326 self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
322
327
323 # step 2. wait for all requests to be done
328 # step 2. wait for all requests to be done
324 # timeout 5s, polling every 100ms
329 # timeout 5s, polling every 100ms
325 qs = rc.queue_status()
330 qs = rc.queue_status()
326 for i in range(50):
331 for i in range(50):
327 if qs['unassigned'] or any(qs[eid]['tasks'] + qs[eid]['queue'] for eid in qs if eid != 'unassigned'):
332 if qs['unassigned'] or any(qs[eid]['tasks'] + qs[eid]['queue'] for eid in qs if eid != 'unassigned'):
328 time.sleep(0.1)
333 time.sleep(0.1)
329 qs = rc.queue_status()
334 qs = rc.queue_status()
330 else:
335 else:
331 break
336 break
332
337
333 # ensure Hub up to date:
338 # ensure Hub up to date:
334 self.assertEqual(qs['unassigned'], 0)
339 self.assertEqual(qs['unassigned'], 0)
335 for eid in [ eid for eid in qs if eid != 'unassigned' ]:
340 for eid in [ eid for eid in qs if eid != 'unassigned' ]:
336 self.assertEqual(qs[eid]['tasks'], 0)
341 self.assertEqual(qs[eid]['tasks'], 0)
337 self.assertEqual(qs[eid]['queue'], 0)
342 self.assertEqual(qs[eid]['queue'], 0)
338
343
339
344
340 def test_resubmit(self):
345 def test_resubmit(self):
341 def f():
346 def f():
342 import random
347 import random
343 return random.random()
348 return random.random()
344 v = self.client.load_balanced_view()
349 v = self.client.load_balanced_view()
345 ar = v.apply_async(f)
350 ar = v.apply_async(f)
346 r1 = ar.get(1)
351 r1 = ar.get(1)
347 # give the Hub a chance to notice:
352 # give the Hub a chance to notice:
348 self._wait_for_idle()
353 self._wait_for_idle()
349 ahr = self.client.resubmit(ar.msg_ids)
354 ahr = self.client.resubmit(ar.msg_ids)
350 r2 = ahr.get(1)
355 r2 = ahr.get(1)
351 self.assertFalse(r1 == r2)
356 self.assertFalse(r1 == r2)
352
357
353 def test_resubmit_chain(self):
358 def test_resubmit_chain(self):
354 """resubmit resubmitted tasks"""
359 """resubmit resubmitted tasks"""
355 v = self.client.load_balanced_view()
360 v = self.client.load_balanced_view()
356 ar = v.apply_async(lambda x: x, 'x'*1024)
361 ar = v.apply_async(lambda x: x, 'x'*1024)
357 ar.get()
362 ar.get()
358 self._wait_for_idle()
363 self._wait_for_idle()
359 ars = [ar]
364 ars = [ar]
360
365
361 for i in range(10):
366 for i in range(10):
362 ar = ars[-1]
367 ar = ars[-1]
363 ar2 = self.client.resubmit(ar.msg_ids)
368 ar2 = self.client.resubmit(ar.msg_ids)
364
369
365 [ ar.get() for ar in ars ]
370 [ ar.get() for ar in ars ]
366
371
367 def test_resubmit_header(self):
372 def test_resubmit_header(self):
368 """resubmit shouldn't clobber the whole header"""
373 """resubmit shouldn't clobber the whole header"""
369 def f():
374 def f():
370 import random
375 import random
371 return random.random()
376 return random.random()
372 v = self.client.load_balanced_view()
377 v = self.client.load_balanced_view()
373 v.retries = 1
378 v.retries = 1
374 ar = v.apply_async(f)
379 ar = v.apply_async(f)
375 r1 = ar.get(1)
380 r1 = ar.get(1)
376 # give the Hub a chance to notice:
381 # give the Hub a chance to notice:
377 self._wait_for_idle()
382 self._wait_for_idle()
378 ahr = self.client.resubmit(ar.msg_ids)
383 ahr = self.client.resubmit(ar.msg_ids)
379 ahr.get(1)
384 ahr.get(1)
380 time.sleep(0.5)
385 time.sleep(0.5)
381 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
386 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
382 h1,h2 = [ r['header'] for r in records ]
387 h1,h2 = [ r['header'] for r in records ]
383 for key in set(h1.keys()).union(set(h2.keys())):
388 for key in set(h1.keys()).union(set(h2.keys())):
384 if key in ('msg_id', 'date'):
389 if key in ('msg_id', 'date'):
385 self.assertNotEqual(h1[key], h2[key])
390 self.assertNotEqual(h1[key], h2[key])
386 else:
391 else:
387 self.assertEqual(h1[key], h2[key])
392 self.assertEqual(h1[key], h2[key])
388
393
389 def test_resubmit_aborted(self):
394 def test_resubmit_aborted(self):
390 def f():
395 def f():
391 import random
396 import random
392 return random.random()
397 return random.random()
393 v = self.client.load_balanced_view()
398 v = self.client.load_balanced_view()
394 # restrict to one engine, so we can put a sleep
399 # restrict to one engine, so we can put a sleep
395 # ahead of the task, so it will get aborted
400 # ahead of the task, so it will get aborted
396 eid = self.client.ids[-1]
401 eid = self.client.ids[-1]
397 v.targets = [eid]
402 v.targets = [eid]
398 sleep = v.apply_async(time.sleep, 0.5)
403 sleep = v.apply_async(time.sleep, 0.5)
399 ar = v.apply_async(f)
404 ar = v.apply_async(f)
400 ar.abort()
405 ar.abort()
401 self.assertRaises(error.TaskAborted, ar.get)
406 self.assertRaises(error.TaskAborted, ar.get)
402 # Give the Hub a chance to get up to date:
407 # Give the Hub a chance to get up to date:
403 self._wait_for_idle()
408 self._wait_for_idle()
404 ahr = self.client.resubmit(ar.msg_ids)
409 ahr = self.client.resubmit(ar.msg_ids)
405 r2 = ahr.get(1)
410 r2 = ahr.get(1)
406
411
407 def test_resubmit_inflight(self):
412 def test_resubmit_inflight(self):
408 """resubmit of inflight task"""
413 """resubmit of inflight task"""
409 v = self.client.load_balanced_view()
414 v = self.client.load_balanced_view()
410 ar = v.apply_async(time.sleep,1)
415 ar = v.apply_async(time.sleep,1)
411 # give the message a chance to arrive
416 # give the message a chance to arrive
412 time.sleep(0.2)
417 time.sleep(0.2)
413 ahr = self.client.resubmit(ar.msg_ids)
418 ahr = self.client.resubmit(ar.msg_ids)
414 ar.get(2)
419 ar.get(2)
415 ahr.get(2)
420 ahr.get(2)
416
421
417 def test_resubmit_badkey(self):
422 def test_resubmit_badkey(self):
418 """ensure KeyError on resubmit of nonexistant task"""
423 """ensure KeyError on resubmit of nonexistant task"""
419 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
424 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
420
425
421 def test_purge_hub_results(self):
426 def test_purge_hub_results(self):
422 # ensure there are some tasks
427 # ensure there are some tasks
423 for i in range(5):
428 for i in range(5):
424 self.client[:].apply_sync(lambda : 1)
429 self.client[:].apply_sync(lambda : 1)
425 # Wait for the Hub to realise the result is done:
430 # Wait for the Hub to realise the result is done:
426 # This prevents a race condition, where we
431 # This prevents a race condition, where we
427 # might purge a result the Hub still thinks is pending.
432 # might purge a result the Hub still thinks is pending.
428 self._wait_for_idle()
433 self._wait_for_idle()
429 rc2 = clientmod.Client(profile='iptest')
434 rc2 = clientmod.Client(profile='iptest')
430 hist = self.client.hub_history()
435 hist = self.client.hub_history()
431 ahr = rc2.get_result([hist[-1]])
436 ahr = rc2.get_result([hist[-1]])
432 ahr.wait(10)
437 ahr.wait(10)
433 self.client.purge_hub_results(hist[-1])
438 self.client.purge_hub_results(hist[-1])
434 newhist = self.client.hub_history()
439 newhist = self.client.hub_history()
435 self.assertEqual(len(newhist)+1,len(hist))
440 self.assertEqual(len(newhist)+1,len(hist))
436 rc2.spin()
441 rc2.spin()
437 rc2.close()
442 rc2.close()
438
443
439 def test_purge_local_results(self):
444 def test_purge_local_results(self):
440 # ensure there are some tasks
445 # ensure there are some tasks
441 res = []
446 res = []
442 for i in range(5):
447 for i in range(5):
443 res.append(self.client[:].apply_async(lambda : 1))
448 res.append(self.client[:].apply_async(lambda : 1))
444 self._wait_for_idle()
449 self._wait_for_idle()
445 self.client.wait(10) # wait for the results to come back
450 self.client.wait(10) # wait for the results to come back
446 before = len(self.client.results)
451 before = len(self.client.results)
447 self.assertEqual(len(self.client.metadata),before)
452 self.assertEqual(len(self.client.metadata),before)
448 self.client.purge_local_results(res[-1])
453 self.client.purge_local_results(res[-1])
449 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
454 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
450 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
455 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
451
456
452 def test_purge_local_results_outstanding(self):
457 def test_purge_local_results_outstanding(self):
453 v = self.client[-1]
458 v = self.client[-1]
454 ar = v.apply_async(lambda : 1)
459 ar = v.apply_async(lambda : 1)
455 msg_id = ar.msg_ids[0]
460 msg_id = ar.msg_ids[0]
456 ar.get()
461 ar.get()
457 self._wait_for_idle()
462 self._wait_for_idle()
458 ar2 = v.apply_async(time.sleep, 1)
463 ar2 = v.apply_async(time.sleep, 1)
459 self.assertIn(msg_id, self.client.results)
464 self.assertIn(msg_id, self.client.results)
460 self.assertIn(msg_id, self.client.metadata)
465 self.assertIn(msg_id, self.client.metadata)
461 self.client.purge_local_results(ar)
466 self.client.purge_local_results(ar)
462 self.assertNotIn(msg_id, self.client.results)
467 self.assertNotIn(msg_id, self.client.results)
463 self.assertNotIn(msg_id, self.client.metadata)
468 self.assertNotIn(msg_id, self.client.metadata)
464 with self.assertRaises(RuntimeError):
469 with self.assertRaises(RuntimeError):
465 self.client.purge_local_results(ar2)
470 self.client.purge_local_results(ar2)
466 ar2.get()
471 ar2.get()
467 self.client.purge_local_results(ar2)
472 self.client.purge_local_results(ar2)
468
473
469 def test_purge_all_local_results_outstanding(self):
474 def test_purge_all_local_results_outstanding(self):
470 v = self.client[-1]
475 v = self.client[-1]
471 ar = v.apply_async(time.sleep, 1)
476 ar = v.apply_async(time.sleep, 1)
472 with self.assertRaises(RuntimeError):
477 with self.assertRaises(RuntimeError):
473 self.client.purge_local_results('all')
478 self.client.purge_local_results('all')
474 ar.get()
479 ar.get()
475 self.client.purge_local_results('all')
480 self.client.purge_local_results('all')
476
481
477 def test_purge_all_hub_results(self):
482 def test_purge_all_hub_results(self):
478 self.client.purge_hub_results('all')
483 self.client.purge_hub_results('all')
479 hist = self.client.hub_history()
484 hist = self.client.hub_history()
480 self.assertEqual(len(hist), 0)
485 self.assertEqual(len(hist), 0)
481
486
482 def test_purge_all_local_results(self):
487 def test_purge_all_local_results(self):
483 self.client.purge_local_results('all')
488 self.client.purge_local_results('all')
484 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
489 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
485 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
490 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
486
491
487 def test_purge_all_results(self):
492 def test_purge_all_results(self):
488 # ensure there are some tasks
493 # ensure there are some tasks
489 for i in range(5):
494 for i in range(5):
490 self.client[:].apply_sync(lambda : 1)
495 self.client[:].apply_sync(lambda : 1)
491 self.client.wait(10)
496 self.client.wait(10)
492 self._wait_for_idle()
497 self._wait_for_idle()
493 self.client.purge_results('all')
498 self.client.purge_results('all')
494 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
499 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
495 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
500 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
496 hist = self.client.hub_history()
501 hist = self.client.hub_history()
497 self.assertEqual(len(hist), 0, msg="hub history not empty")
502 self.assertEqual(len(hist), 0, msg="hub history not empty")
498
503
499 def test_purge_everything(self):
504 def test_purge_everything(self):
500 # ensure there are some tasks
505 # ensure there are some tasks
501 for i in range(5):
506 for i in range(5):
502 self.client[:].apply_sync(lambda : 1)
507 self.client[:].apply_sync(lambda : 1)
503 self.client.wait(10)
508 self.client.wait(10)
504 self._wait_for_idle()
509 self._wait_for_idle()
505 self.client.purge_everything()
510 self.client.purge_everything()
506 # The client results
511 # The client results
507 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
512 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
508 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
513 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
509 # The client "bookkeeping"
514 # The client "bookkeeping"
510 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
515 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
511 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
516 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
512 # the hub results
517 # the hub results
513 hist = self.client.hub_history()
518 hist = self.client.hub_history()
514 self.assertEqual(len(hist), 0, msg="hub history not empty")
519 self.assertEqual(len(hist), 0, msg="hub history not empty")
515
520
516
521
517 def test_spin_thread(self):
522 def test_spin_thread(self):
518 self.client.spin_thread(0.01)
523 self.client.spin_thread(0.01)
519 ar = self.client[-1].apply_async(lambda : 1)
524 ar = self.client[-1].apply_async(lambda : 1)
520 time.sleep(0.1)
525 time.sleep(0.1)
521 self.assertTrue(ar.wall_time < 0.1,
526 self.assertTrue(ar.wall_time < 0.1,
522 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
527 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
523 )
528 )
524
529
525 def test_stop_spin_thread(self):
530 def test_stop_spin_thread(self):
526 self.client.spin_thread(0.01)
531 self.client.spin_thread(0.01)
527 self.client.stop_spin_thread()
532 self.client.stop_spin_thread()
528 ar = self.client[-1].apply_async(lambda : 1)
533 ar = self.client[-1].apply_async(lambda : 1)
529 time.sleep(0.15)
534 time.sleep(0.15)
530 self.assertTrue(ar.wall_time > 0.1,
535 self.assertTrue(ar.wall_time > 0.1,
531 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
536 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
532 )
537 )
533
538
534 def test_activate(self):
539 def test_activate(self):
535 ip = get_ipython()
540 ip = get_ipython()
536 magics = ip.magics_manager.magics
541 magics = ip.magics_manager.magics
537 self.assertTrue('px' in magics['line'])
542 self.assertTrue('px' in magics['line'])
538 self.assertTrue('px' in magics['cell'])
543 self.assertTrue('px' in magics['cell'])
539 v0 = self.client.activate(-1, '0')
544 v0 = self.client.activate(-1, '0')
540 self.assertTrue('px0' in magics['line'])
545 self.assertTrue('px0' in magics['line'])
541 self.assertTrue('px0' in magics['cell'])
546 self.assertTrue('px0' in magics['cell'])
542 self.assertEqual(v0.targets, self.client.ids[-1])
547 self.assertEqual(v0.targets, self.client.ids[-1])
543 v0 = self.client.activate('all', 'all')
548 v0 = self.client.activate('all', 'all')
544 self.assertTrue('pxall' in magics['line'])
549 self.assertTrue('pxall' in magics['line'])
545 self.assertTrue('pxall' in magics['cell'])
550 self.assertTrue('pxall' in magics['cell'])
546 self.assertEqual(v0.targets, 'all')
551 self.assertEqual(v0.targets, 'all')
General Comments 0
You need to be logged in to leave comments. Login now