##// END OF EJS Templates
Merge pull request #4568 from minrk/wait-fix-again-keyerror...
Min RK -
r13692:b6dce27d merge
parent child Browse files
Show More
@@ -1,522 +1,522
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23 from tempfile import mktemp
23 from tempfile import mktemp
24
24
25 import zmq
25 import zmq
26
26
27 from IPython import parallel
27 from IPython import parallel
28 from IPython.parallel.client import client as clientmod
28 from IPython.parallel.client import client as clientmod
29 from IPython.parallel import error
29 from IPython.parallel import error
30 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import AsyncResult, AsyncHubResult
31 from IPython.parallel import LoadBalancedView, DirectView
31 from IPython.parallel import LoadBalancedView, DirectView
32
32
33 from .clienttest import ClusterTestCase, segfault, wait, add_engines
33 from .clienttest import ClusterTestCase, segfault, wait, add_engines
34
34
35 def setup():
35 def setup():
36 add_engines(4, total=True)
36 add_engines(4, total=True)
37
37
38 class TestClient(ClusterTestCase):
38 class TestClient(ClusterTestCase):
39
39
40 def test_ids(self):
40 def test_ids(self):
41 n = len(self.client.ids)
41 n = len(self.client.ids)
42 self.add_engines(2)
42 self.add_engines(2)
43 self.assertEqual(len(self.client.ids), n+2)
43 self.assertEqual(len(self.client.ids), n+2)
44
44
45 def test_view_indexing(self):
45 def test_view_indexing(self):
46 """test index access for views"""
46 """test index access for views"""
47 self.minimum_engines(4)
47 self.minimum_engines(4)
48 targets = self.client._build_targets('all')[-1]
48 targets = self.client._build_targets('all')[-1]
49 v = self.client[:]
49 v = self.client[:]
50 self.assertEqual(v.targets, targets)
50 self.assertEqual(v.targets, targets)
51 t = self.client.ids[2]
51 t = self.client.ids[2]
52 v = self.client[t]
52 v = self.client[t]
53 self.assertTrue(isinstance(v, DirectView))
53 self.assertTrue(isinstance(v, DirectView))
54 self.assertEqual(v.targets, t)
54 self.assertEqual(v.targets, t)
55 t = self.client.ids[2:4]
55 t = self.client.ids[2:4]
56 v = self.client[t]
56 v = self.client[t]
57 self.assertTrue(isinstance(v, DirectView))
57 self.assertTrue(isinstance(v, DirectView))
58 self.assertEqual(v.targets, t)
58 self.assertEqual(v.targets, t)
59 v = self.client[::2]
59 v = self.client[::2]
60 self.assertTrue(isinstance(v, DirectView))
60 self.assertTrue(isinstance(v, DirectView))
61 self.assertEqual(v.targets, targets[::2])
61 self.assertEqual(v.targets, targets[::2])
62 v = self.client[1::3]
62 v = self.client[1::3]
63 self.assertTrue(isinstance(v, DirectView))
63 self.assertTrue(isinstance(v, DirectView))
64 self.assertEqual(v.targets, targets[1::3])
64 self.assertEqual(v.targets, targets[1::3])
65 v = self.client[:-3]
65 v = self.client[:-3]
66 self.assertTrue(isinstance(v, DirectView))
66 self.assertTrue(isinstance(v, DirectView))
67 self.assertEqual(v.targets, targets[:-3])
67 self.assertEqual(v.targets, targets[:-3])
68 v = self.client[-1]
68 v = self.client[-1]
69 self.assertTrue(isinstance(v, DirectView))
69 self.assertTrue(isinstance(v, DirectView))
70 self.assertEqual(v.targets, targets[-1])
70 self.assertEqual(v.targets, targets[-1])
71 self.assertRaises(TypeError, lambda : self.client[None])
71 self.assertRaises(TypeError, lambda : self.client[None])
72
72
73 def test_lbview_targets(self):
73 def test_lbview_targets(self):
74 """test load_balanced_view targets"""
74 """test load_balanced_view targets"""
75 v = self.client.load_balanced_view()
75 v = self.client.load_balanced_view()
76 self.assertEqual(v.targets, None)
76 self.assertEqual(v.targets, None)
77 v = self.client.load_balanced_view(-1)
77 v = self.client.load_balanced_view(-1)
78 self.assertEqual(v.targets, [self.client.ids[-1]])
78 self.assertEqual(v.targets, [self.client.ids[-1]])
79 v = self.client.load_balanced_view('all')
79 v = self.client.load_balanced_view('all')
80 self.assertEqual(v.targets, None)
80 self.assertEqual(v.targets, None)
81
81
82 def test_dview_targets(self):
82 def test_dview_targets(self):
83 """test direct_view targets"""
83 """test direct_view targets"""
84 v = self.client.direct_view()
84 v = self.client.direct_view()
85 self.assertEqual(v.targets, 'all')
85 self.assertEqual(v.targets, 'all')
86 v = self.client.direct_view('all')
86 v = self.client.direct_view('all')
87 self.assertEqual(v.targets, 'all')
87 self.assertEqual(v.targets, 'all')
88 v = self.client.direct_view(-1)
88 v = self.client.direct_view(-1)
89 self.assertEqual(v.targets, self.client.ids[-1])
89 self.assertEqual(v.targets, self.client.ids[-1])
90
90
91 def test_lazy_all_targets(self):
91 def test_lazy_all_targets(self):
92 """test lazy evaluation of rc.direct_view('all')"""
92 """test lazy evaluation of rc.direct_view('all')"""
93 v = self.client.direct_view()
93 v = self.client.direct_view()
94 self.assertEqual(v.targets, 'all')
94 self.assertEqual(v.targets, 'all')
95
95
96 def double(x):
96 def double(x):
97 return x*2
97 return x*2
98 seq = list(range(100))
98 seq = list(range(100))
99 ref = [ double(x) for x in seq ]
99 ref = [ double(x) for x in seq ]
100
100
101 # add some engines, which should be used
101 # add some engines, which should be used
102 self.add_engines(1)
102 self.add_engines(1)
103 n1 = len(self.client.ids)
103 n1 = len(self.client.ids)
104
104
105 # simple apply
105 # simple apply
106 r = v.apply_sync(lambda : 1)
106 r = v.apply_sync(lambda : 1)
107 self.assertEqual(r, [1] * n1)
107 self.assertEqual(r, [1] * n1)
108
108
109 # map goes through remotefunction
109 # map goes through remotefunction
110 r = v.map_sync(double, seq)
110 r = v.map_sync(double, seq)
111 self.assertEqual(r, ref)
111 self.assertEqual(r, ref)
112
112
113 # add a couple more engines, and try again
113 # add a couple more engines, and try again
114 self.add_engines(2)
114 self.add_engines(2)
115 n2 = len(self.client.ids)
115 n2 = len(self.client.ids)
116 self.assertNotEqual(n2, n1)
116 self.assertNotEqual(n2, n1)
117
117
118 # apply
118 # apply
119 r = v.apply_sync(lambda : 1)
119 r = v.apply_sync(lambda : 1)
120 self.assertEqual(r, [1] * n2)
120 self.assertEqual(r, [1] * n2)
121
121
122 # map
122 # map
123 r = v.map_sync(double, seq)
123 r = v.map_sync(double, seq)
124 self.assertEqual(r, ref)
124 self.assertEqual(r, ref)
125
125
126 def test_targets(self):
126 def test_targets(self):
127 """test various valid targets arguments"""
127 """test various valid targets arguments"""
128 build = self.client._build_targets
128 build = self.client._build_targets
129 ids = self.client.ids
129 ids = self.client.ids
130 idents,targets = build(None)
130 idents,targets = build(None)
131 self.assertEqual(ids, targets)
131 self.assertEqual(ids, targets)
132
132
133 def test_clear(self):
133 def test_clear(self):
134 """test clear behavior"""
134 """test clear behavior"""
135 self.minimum_engines(2)
135 self.minimum_engines(2)
136 v = self.client[:]
136 v = self.client[:]
137 v.block=True
137 v.block=True
138 v.push(dict(a=5))
138 v.push(dict(a=5))
139 v.pull('a')
139 v.pull('a')
140 id0 = self.client.ids[-1]
140 id0 = self.client.ids[-1]
141 self.client.clear(targets=id0, block=True)
141 self.client.clear(targets=id0, block=True)
142 a = self.client[:-1].get('a')
142 a = self.client[:-1].get('a')
143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
144 self.client.clear(block=True)
144 self.client.clear(block=True)
145 for i in self.client.ids:
145 for i in self.client.ids:
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147
147
148 def test_get_result(self):
148 def test_get_result(self):
149 """test getting results from the Hub."""
149 """test getting results from the Hub."""
150 c = clientmod.Client(profile='iptest')
150 c = clientmod.Client(profile='iptest')
151 t = c.ids[-1]
151 t = c.ids[-1]
152 ar = c[t].apply_async(wait, 1)
152 ar = c[t].apply_async(wait, 1)
153 # give the monitor time to notice the message
153 # give the monitor time to notice the message
154 time.sleep(.25)
154 time.sleep(.25)
155 ahr = self.client.get_result(ar.msg_ids[0])
155 ahr = self.client.get_result(ar.msg_ids[0])
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 self.assertEqual(ahr.get(), ar.get())
157 self.assertEqual(ahr.get(), ar.get())
158 ar2 = self.client.get_result(ar.msg_ids[0])
158 ar2 = self.client.get_result(ar.msg_ids[0])
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 c.close()
160 c.close()
161
161
162 def test_get_execute_result(self):
162 def test_get_execute_result(self):
163 """test getting execute results from the Hub."""
163 """test getting execute results from the Hub."""
164 c = clientmod.Client(profile='iptest')
164 c = clientmod.Client(profile='iptest')
165 t = c.ids[-1]
165 t = c.ids[-1]
166 cell = '\n'.join([
166 cell = '\n'.join([
167 'import time',
167 'import time',
168 'time.sleep(0.25)',
168 'time.sleep(0.25)',
169 '5'
169 '5'
170 ])
170 ])
171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
172 # give the monitor time to notice the message
172 # give the monitor time to notice the message
173 time.sleep(.25)
173 time.sleep(.25)
174 ahr = self.client.get_result(ar.msg_ids[0])
174 ahr = self.client.get_result(ar.msg_ids[0])
175 self.assertTrue(isinstance(ahr, AsyncHubResult))
175 self.assertTrue(isinstance(ahr, AsyncHubResult))
176 self.assertEqual(ahr.get().pyout, ar.get().pyout)
176 self.assertEqual(ahr.get().pyout, ar.get().pyout)
177 ar2 = self.client.get_result(ar.msg_ids[0])
177 ar2 = self.client.get_result(ar.msg_ids[0])
178 self.assertFalse(isinstance(ar2, AsyncHubResult))
178 self.assertFalse(isinstance(ar2, AsyncHubResult))
179 c.close()
179 c.close()
180
180
181 def test_ids_list(self):
181 def test_ids_list(self):
182 """test client.ids"""
182 """test client.ids"""
183 ids = self.client.ids
183 ids = self.client.ids
184 self.assertEqual(ids, self.client._ids)
184 self.assertEqual(ids, self.client._ids)
185 self.assertFalse(ids is self.client._ids)
185 self.assertFalse(ids is self.client._ids)
186 ids.remove(ids[-1])
186 ids.remove(ids[-1])
187 self.assertNotEqual(ids, self.client._ids)
187 self.assertNotEqual(ids, self.client._ids)
188
188
189 def test_queue_status(self):
189 def test_queue_status(self):
190 ids = self.client.ids
190 ids = self.client.ids
191 id0 = ids[0]
191 id0 = ids[0]
192 qs = self.client.queue_status(targets=id0)
192 qs = self.client.queue_status(targets=id0)
193 self.assertTrue(isinstance(qs, dict))
193 self.assertTrue(isinstance(qs, dict))
194 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
194 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
195 allqs = self.client.queue_status()
195 allqs = self.client.queue_status()
196 self.assertTrue(isinstance(allqs, dict))
196 self.assertTrue(isinstance(allqs, dict))
197 intkeys = list(allqs.keys())
197 intkeys = list(allqs.keys())
198 intkeys.remove('unassigned')
198 intkeys.remove('unassigned')
199 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
199 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
200 unassigned = allqs.pop('unassigned')
200 unassigned = allqs.pop('unassigned')
201 for eid,qs in allqs.items():
201 for eid,qs in allqs.items():
202 self.assertTrue(isinstance(qs, dict))
202 self.assertTrue(isinstance(qs, dict))
203 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
203 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
204
204
205 def test_shutdown(self):
205 def test_shutdown(self):
206 ids = self.client.ids
206 ids = self.client.ids
207 id0 = ids[0]
207 id0 = ids[0]
208 self.client.shutdown(id0, block=True)
208 self.client.shutdown(id0, block=True)
209 while id0 in self.client.ids:
209 while id0 in self.client.ids:
210 time.sleep(0.1)
210 time.sleep(0.1)
211 self.client.spin()
211 self.client.spin()
212
212
213 self.assertRaises(IndexError, lambda : self.client[id0])
213 self.assertRaises(IndexError, lambda : self.client[id0])
214
214
215 def test_result_status(self):
215 def test_result_status(self):
216 pass
216 pass
217 # to be written
217 # to be written
218
218
219 def test_db_query_dt(self):
219 def test_db_query_dt(self):
220 """test db query by date"""
220 """test db query by date"""
221 hist = self.client.hub_history()
221 hist = self.client.hub_history()
222 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
222 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
223 tic = middle['submitted']
223 tic = middle['submitted']
224 before = self.client.db_query({'submitted' : {'$lt' : tic}})
224 before = self.client.db_query({'submitted' : {'$lt' : tic}})
225 after = self.client.db_query({'submitted' : {'$gte' : tic}})
225 after = self.client.db_query({'submitted' : {'$gte' : tic}})
226 self.assertEqual(len(before)+len(after),len(hist))
226 self.assertEqual(len(before)+len(after),len(hist))
227 for b in before:
227 for b in before:
228 self.assertTrue(b['submitted'] < tic)
228 self.assertTrue(b['submitted'] < tic)
229 for a in after:
229 for a in after:
230 self.assertTrue(a['submitted'] >= tic)
230 self.assertTrue(a['submitted'] >= tic)
231 same = self.client.db_query({'submitted' : tic})
231 same = self.client.db_query({'submitted' : tic})
232 for s in same:
232 for s in same:
233 self.assertTrue(s['submitted'] == tic)
233 self.assertTrue(s['submitted'] == tic)
234
234
235 def test_db_query_keys(self):
235 def test_db_query_keys(self):
236 """test extracting subset of record keys"""
236 """test extracting subset of record keys"""
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
238 for rec in found:
238 for rec in found:
239 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
239 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
240
240
241 def test_db_query_default_keys(self):
241 def test_db_query_default_keys(self):
242 """default db_query excludes buffers"""
242 """default db_query excludes buffers"""
243 found = self.client.db_query({'msg_id': {'$ne' : ''}})
243 found = self.client.db_query({'msg_id': {'$ne' : ''}})
244 for rec in found:
244 for rec in found:
245 keys = set(rec.keys())
245 keys = set(rec.keys())
246 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
246 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
247 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
247 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
248
248
249 def test_db_query_msg_id(self):
249 def test_db_query_msg_id(self):
250 """ensure msg_id is always in db queries"""
250 """ensure msg_id is always in db queries"""
251 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
251 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
252 for rec in found:
252 for rec in found:
253 self.assertTrue('msg_id' in rec.keys())
253 self.assertTrue('msg_id' in rec.keys())
254 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
254 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
255 for rec in found:
255 for rec in found:
256 self.assertTrue('msg_id' in rec.keys())
256 self.assertTrue('msg_id' in rec.keys())
257 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
257 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
258 for rec in found:
258 for rec in found:
259 self.assertTrue('msg_id' in rec.keys())
259 self.assertTrue('msg_id' in rec.keys())
260
260
261 def test_db_query_get_result(self):
261 def test_db_query_get_result(self):
262 """pop in db_query shouldn't pop from result itself"""
262 """pop in db_query shouldn't pop from result itself"""
263 self.client[:].apply_sync(lambda : 1)
263 self.client[:].apply_sync(lambda : 1)
264 found = self.client.db_query({'msg_id': {'$ne' : ''}})
264 found = self.client.db_query({'msg_id': {'$ne' : ''}})
265 rc2 = clientmod.Client(profile='iptest')
265 rc2 = clientmod.Client(profile='iptest')
266 # If this bug is not fixed, this call will hang:
266 # If this bug is not fixed, this call will hang:
267 ar = rc2.get_result(self.client.history[-1])
267 ar = rc2.get_result(self.client.history[-1])
268 ar.wait(2)
268 ar.wait(2)
269 self.assertTrue(ar.ready())
269 self.assertTrue(ar.ready())
270 ar.get()
270 ar.get()
271 rc2.close()
271 rc2.close()
272
272
273 def test_db_query_in(self):
273 def test_db_query_in(self):
274 """test db query with '$in','$nin' operators"""
274 """test db query with '$in','$nin' operators"""
275 hist = self.client.hub_history()
275 hist = self.client.hub_history()
276 even = hist[::2]
276 even = hist[::2]
277 odd = hist[1::2]
277 odd = hist[1::2]
278 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
278 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
279 found = [ r['msg_id'] for r in recs ]
279 found = [ r['msg_id'] for r in recs ]
280 self.assertEqual(set(even), set(found))
280 self.assertEqual(set(even), set(found))
281 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
281 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
282 found = [ r['msg_id'] for r in recs ]
282 found = [ r['msg_id'] for r in recs ]
283 self.assertEqual(set(odd), set(found))
283 self.assertEqual(set(odd), set(found))
284
284
285 def test_hub_history(self):
285 def test_hub_history(self):
286 hist = self.client.hub_history()
286 hist = self.client.hub_history()
287 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
287 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
288 recdict = {}
288 recdict = {}
289 for rec in recs:
289 for rec in recs:
290 recdict[rec['msg_id']] = rec
290 recdict[rec['msg_id']] = rec
291
291
292 latest = datetime(1984,1,1)
292 latest = datetime(1984,1,1)
293 for msg_id in hist:
293 for msg_id in hist:
294 rec = recdict[msg_id]
294 rec = recdict[msg_id]
295 newt = rec['submitted']
295 newt = rec['submitted']
296 self.assertTrue(newt >= latest)
296 self.assertTrue(newt >= latest)
297 latest = newt
297 latest = newt
298 ar = self.client[-1].apply_async(lambda : 1)
298 ar = self.client[-1].apply_async(lambda : 1)
299 ar.get()
299 ar.get()
300 time.sleep(0.25)
300 time.sleep(0.25)
301 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
301 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
302
302
303 def _wait_for_idle(self):
303 def _wait_for_idle(self):
304 """wait for the cluster to become idle, according to the everyone."""
304 """wait for the cluster to become idle, according to the everyone."""
305 rc = self.client
305 rc = self.client
306
306
307 # step 0. wait for local results
307 # step 0. wait for local results
308 # this should be sufficient 99% of the time.
308 # this should be sufficient 99% of the time.
309 rc.wait(timeout=5)
309 rc.wait(timeout=5)
310
310
311 # step 1. wait for all requests to be noticed
311 # step 1. wait for all requests to be noticed
312 # timeout 5s, polling every 100ms
312 # timeout 5s, polling every 100ms
313 msg_ids = set(rc.history)
313 msg_ids = set(rc.history)
314 hub_hist = rc.hub_history()
314 hub_hist = rc.hub_history()
315 for i in range(50):
315 for i in range(50):
316 if msg_ids.difference(hub_hist):
316 if msg_ids.difference(hub_hist):
317 time.sleep(0.1)
317 time.sleep(0.1)
318 hub_hist = rc.hub_history()
318 hub_hist = rc.hub_history()
319 else:
319 else:
320 break
320 break
321
321
322 self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
322 self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
323
323
324 # step 2. wait for all requests to be done
324 # step 2. wait for all requests to be done
325 # timeout 5s, polling every 100ms
325 # timeout 5s, polling every 100ms
326 qs = rc.queue_status()
326 qs = rc.queue_status()
327 for i in range(50):
327 for i in range(50):
328 if qs['unassigned'] or any(qs[eid]['tasks'] + qs[eid]['queue'] for eid in rc.ids):
328 if qs['unassigned'] or any(qs[eid]['tasks'] + qs[eid]['queue'] for eid in qs if eid != 'unassigned'):
329 time.sleep(0.1)
329 time.sleep(0.1)
330 qs = rc.queue_status()
330 qs = rc.queue_status()
331 else:
331 else:
332 break
332 break
333
333
334 # ensure Hub up to date:
334 # ensure Hub up to date:
335 self.assertEqual(qs['unassigned'], 0)
335 self.assertEqual(qs['unassigned'], 0)
336 for eid in rc.ids:
336 for eid in rc.ids:
337 self.assertEqual(qs[eid]['tasks'], 0)
337 self.assertEqual(qs[eid]['tasks'], 0)
338 self.assertEqual(qs[eid]['queue'], 0)
338 self.assertEqual(qs[eid]['queue'], 0)
339
339
340
340
341 def test_resubmit(self):
341 def test_resubmit(self):
342 def f():
342 def f():
343 import random
343 import random
344 return random.random()
344 return random.random()
345 v = self.client.load_balanced_view()
345 v = self.client.load_balanced_view()
346 ar = v.apply_async(f)
346 ar = v.apply_async(f)
347 r1 = ar.get(1)
347 r1 = ar.get(1)
348 # give the Hub a chance to notice:
348 # give the Hub a chance to notice:
349 self._wait_for_idle()
349 self._wait_for_idle()
350 ahr = self.client.resubmit(ar.msg_ids)
350 ahr = self.client.resubmit(ar.msg_ids)
351 r2 = ahr.get(1)
351 r2 = ahr.get(1)
352 self.assertFalse(r1 == r2)
352 self.assertFalse(r1 == r2)
353
353
354 def test_resubmit_chain(self):
354 def test_resubmit_chain(self):
355 """resubmit resubmitted tasks"""
355 """resubmit resubmitted tasks"""
356 v = self.client.load_balanced_view()
356 v = self.client.load_balanced_view()
357 ar = v.apply_async(lambda x: x, 'x'*1024)
357 ar = v.apply_async(lambda x: x, 'x'*1024)
358 ar.get()
358 ar.get()
359 self._wait_for_idle()
359 self._wait_for_idle()
360 ars = [ar]
360 ars = [ar]
361
361
362 for i in range(10):
362 for i in range(10):
363 ar = ars[-1]
363 ar = ars[-1]
364 ar2 = self.client.resubmit(ar.msg_ids)
364 ar2 = self.client.resubmit(ar.msg_ids)
365
365
366 [ ar.get() for ar in ars ]
366 [ ar.get() for ar in ars ]
367
367
368 def test_resubmit_header(self):
368 def test_resubmit_header(self):
369 """resubmit shouldn't clobber the whole header"""
369 """resubmit shouldn't clobber the whole header"""
370 def f():
370 def f():
371 import random
371 import random
372 return random.random()
372 return random.random()
373 v = self.client.load_balanced_view()
373 v = self.client.load_balanced_view()
374 v.retries = 1
374 v.retries = 1
375 ar = v.apply_async(f)
375 ar = v.apply_async(f)
376 r1 = ar.get(1)
376 r1 = ar.get(1)
377 # give the Hub a chance to notice:
377 # give the Hub a chance to notice:
378 self._wait_for_idle()
378 self._wait_for_idle()
379 ahr = self.client.resubmit(ar.msg_ids)
379 ahr = self.client.resubmit(ar.msg_ids)
380 ahr.get(1)
380 ahr.get(1)
381 time.sleep(0.5)
381 time.sleep(0.5)
382 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
382 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
383 h1,h2 = [ r['header'] for r in records ]
383 h1,h2 = [ r['header'] for r in records ]
384 for key in set(h1.keys()).union(set(h2.keys())):
384 for key in set(h1.keys()).union(set(h2.keys())):
385 if key in ('msg_id', 'date'):
385 if key in ('msg_id', 'date'):
386 self.assertNotEqual(h1[key], h2[key])
386 self.assertNotEqual(h1[key], h2[key])
387 else:
387 else:
388 self.assertEqual(h1[key], h2[key])
388 self.assertEqual(h1[key], h2[key])
389
389
390 def test_resubmit_aborted(self):
390 def test_resubmit_aborted(self):
391 def f():
391 def f():
392 import random
392 import random
393 return random.random()
393 return random.random()
394 v = self.client.load_balanced_view()
394 v = self.client.load_balanced_view()
395 # restrict to one engine, so we can put a sleep
395 # restrict to one engine, so we can put a sleep
396 # ahead of the task, so it will get aborted
396 # ahead of the task, so it will get aborted
397 eid = self.client.ids[-1]
397 eid = self.client.ids[-1]
398 v.targets = [eid]
398 v.targets = [eid]
399 sleep = v.apply_async(time.sleep, 0.5)
399 sleep = v.apply_async(time.sleep, 0.5)
400 ar = v.apply_async(f)
400 ar = v.apply_async(f)
401 ar.abort()
401 ar.abort()
402 self.assertRaises(error.TaskAborted, ar.get)
402 self.assertRaises(error.TaskAborted, ar.get)
403 # Give the Hub a chance to get up to date:
403 # Give the Hub a chance to get up to date:
404 self._wait_for_idle()
404 self._wait_for_idle()
405 ahr = self.client.resubmit(ar.msg_ids)
405 ahr = self.client.resubmit(ar.msg_ids)
406 r2 = ahr.get(1)
406 r2 = ahr.get(1)
407
407
408 def test_resubmit_inflight(self):
408 def test_resubmit_inflight(self):
409 """resubmit of inflight task"""
409 """resubmit of inflight task"""
410 v = self.client.load_balanced_view()
410 v = self.client.load_balanced_view()
411 ar = v.apply_async(time.sleep,1)
411 ar = v.apply_async(time.sleep,1)
412 # give the message a chance to arrive
412 # give the message a chance to arrive
413 time.sleep(0.2)
413 time.sleep(0.2)
414 ahr = self.client.resubmit(ar.msg_ids)
414 ahr = self.client.resubmit(ar.msg_ids)
415 ar.get(2)
415 ar.get(2)
416 ahr.get(2)
416 ahr.get(2)
417
417
418 def test_resubmit_badkey(self):
418 def test_resubmit_badkey(self):
419 """ensure KeyError on resubmit of nonexistant task"""
419 """ensure KeyError on resubmit of nonexistant task"""
420 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
420 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
421
421
422 def test_purge_hub_results(self):
422 def test_purge_hub_results(self):
423 # ensure there are some tasks
423 # ensure there are some tasks
424 for i in range(5):
424 for i in range(5):
425 self.client[:].apply_sync(lambda : 1)
425 self.client[:].apply_sync(lambda : 1)
426 # Wait for the Hub to realise the result is done:
426 # Wait for the Hub to realise the result is done:
427 # This prevents a race condition, where we
427 # This prevents a race condition, where we
428 # might purge a result the Hub still thinks is pending.
428 # might purge a result the Hub still thinks is pending.
429 self._wait_for_idle()
429 self._wait_for_idle()
430 rc2 = clientmod.Client(profile='iptest')
430 rc2 = clientmod.Client(profile='iptest')
431 hist = self.client.hub_history()
431 hist = self.client.hub_history()
432 ahr = rc2.get_result([hist[-1]])
432 ahr = rc2.get_result([hist[-1]])
433 ahr.wait(10)
433 ahr.wait(10)
434 self.client.purge_hub_results(hist[-1])
434 self.client.purge_hub_results(hist[-1])
435 newhist = self.client.hub_history()
435 newhist = self.client.hub_history()
436 self.assertEqual(len(newhist)+1,len(hist))
436 self.assertEqual(len(newhist)+1,len(hist))
437 rc2.spin()
437 rc2.spin()
438 rc2.close()
438 rc2.close()
439
439
440 def test_purge_local_results(self):
440 def test_purge_local_results(self):
441 # ensure there are some tasks
441 # ensure there are some tasks
442 res = []
442 res = []
443 for i in range(5):
443 for i in range(5):
444 res.append(self.client[:].apply_async(lambda : 1))
444 res.append(self.client[:].apply_async(lambda : 1))
445 self._wait_for_idle()
445 self._wait_for_idle()
446 self.client.wait(10) # wait for the results to come back
446 self.client.wait(10) # wait for the results to come back
447 before = len(self.client.results)
447 before = len(self.client.results)
448 self.assertEqual(len(self.client.metadata),before)
448 self.assertEqual(len(self.client.metadata),before)
449 self.client.purge_local_results(res[-1])
449 self.client.purge_local_results(res[-1])
450 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
450 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
451 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
451 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
452
452
453 def test_purge_all_hub_results(self):
453 def test_purge_all_hub_results(self):
454 self.client.purge_hub_results('all')
454 self.client.purge_hub_results('all')
455 hist = self.client.hub_history()
455 hist = self.client.hub_history()
456 self.assertEqual(len(hist), 0)
456 self.assertEqual(len(hist), 0)
457
457
458 def test_purge_all_local_results(self):
458 def test_purge_all_local_results(self):
459 self.client.purge_local_results('all')
459 self.client.purge_local_results('all')
460 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
460 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
461 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
461 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
462
462
463 def test_purge_all_results(self):
463 def test_purge_all_results(self):
464 # ensure there are some tasks
464 # ensure there are some tasks
465 for i in range(5):
465 for i in range(5):
466 self.client[:].apply_sync(lambda : 1)
466 self.client[:].apply_sync(lambda : 1)
467 self.client.wait(10)
467 self.client.wait(10)
468 self._wait_for_idle()
468 self._wait_for_idle()
469 self.client.purge_results('all')
469 self.client.purge_results('all')
470 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
470 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
471 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
471 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
472 hist = self.client.hub_history()
472 hist = self.client.hub_history()
473 self.assertEqual(len(hist), 0, msg="hub history not empty")
473 self.assertEqual(len(hist), 0, msg="hub history not empty")
474
474
475 def test_purge_everything(self):
475 def test_purge_everything(self):
476 # ensure there are some tasks
476 # ensure there are some tasks
477 for i in range(5):
477 for i in range(5):
478 self.client[:].apply_sync(lambda : 1)
478 self.client[:].apply_sync(lambda : 1)
479 self.client.wait(10)
479 self.client.wait(10)
480 self._wait_for_idle()
480 self._wait_for_idle()
481 self.client.purge_everything()
481 self.client.purge_everything()
482 # The client results
482 # The client results
483 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
483 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
484 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
484 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
485 # The client "bookkeeping"
485 # The client "bookkeeping"
486 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
486 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
487 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
487 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
488 # the hub results
488 # the hub results
489 hist = self.client.hub_history()
489 hist = self.client.hub_history()
490 self.assertEqual(len(hist), 0, msg="hub history not empty")
490 self.assertEqual(len(hist), 0, msg="hub history not empty")
491
491
492
492
493 def test_spin_thread(self):
493 def test_spin_thread(self):
494 self.client.spin_thread(0.01)
494 self.client.spin_thread(0.01)
495 ar = self.client[-1].apply_async(lambda : 1)
495 ar = self.client[-1].apply_async(lambda : 1)
496 time.sleep(0.1)
496 time.sleep(0.1)
497 self.assertTrue(ar.wall_time < 0.1,
497 self.assertTrue(ar.wall_time < 0.1,
498 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
498 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
499 )
499 )
500
500
501 def test_stop_spin_thread(self):
501 def test_stop_spin_thread(self):
502 self.client.spin_thread(0.01)
502 self.client.spin_thread(0.01)
503 self.client.stop_spin_thread()
503 self.client.stop_spin_thread()
504 ar = self.client[-1].apply_async(lambda : 1)
504 ar = self.client[-1].apply_async(lambda : 1)
505 time.sleep(0.15)
505 time.sleep(0.15)
506 self.assertTrue(ar.wall_time > 0.1,
506 self.assertTrue(ar.wall_time > 0.1,
507 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
507 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
508 )
508 )
509
509
510 def test_activate(self):
510 def test_activate(self):
511 ip = get_ipython()
511 ip = get_ipython()
512 magics = ip.magics_manager.magics
512 magics = ip.magics_manager.magics
513 self.assertTrue('px' in magics['line'])
513 self.assertTrue('px' in magics['line'])
514 self.assertTrue('px' in magics['cell'])
514 self.assertTrue('px' in magics['cell'])
515 v0 = self.client.activate(-1, '0')
515 v0 = self.client.activate(-1, '0')
516 self.assertTrue('px0' in magics['line'])
516 self.assertTrue('px0' in magics['line'])
517 self.assertTrue('px0' in magics['cell'])
517 self.assertTrue('px0' in magics['cell'])
518 self.assertEqual(v0.targets, self.client.ids[-1])
518 self.assertEqual(v0.targets, self.client.ids[-1])
519 v0 = self.client.activate('all', 'all')
519 v0 = self.client.activate('all', 'all')
520 self.assertTrue('pxall' in magics['line'])
520 self.assertTrue('pxall' in magics['line'])
521 self.assertTrue('pxall' in magics['cell'])
521 self.assertTrue('pxall' in magics['cell'])
522 self.assertEqual(v0.targets, 'all')
522 self.assertEqual(v0.targets, 'all')
General Comments 0
You need to be logged in to leave comments. Login now