##// END OF EJS Templates
wait for empty queues as well as tasks...
MinRK -
Show More
@@ -1,517 +1,522 b''
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23 from tempfile import mktemp
23 from tempfile import mktemp
24
24
25 import zmq
25 import zmq
26
26
27 from IPython import parallel
27 from IPython import parallel
28 from IPython.parallel.client import client as clientmod
28 from IPython.parallel.client import client as clientmod
29 from IPython.parallel import error
29 from IPython.parallel import error
30 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import AsyncResult, AsyncHubResult
31 from IPython.parallel import LoadBalancedView, DirectView
31 from IPython.parallel import LoadBalancedView, DirectView
32
32
33 from .clienttest import ClusterTestCase, segfault, wait, add_engines
33 from .clienttest import ClusterTestCase, segfault, wait, add_engines
34
34
35 def setup():
35 def setup():
36 add_engines(4, total=True)
36 add_engines(4, total=True)
37
37
38 class TestClient(ClusterTestCase):
38 class TestClient(ClusterTestCase):
39
39
40 def test_ids(self):
40 def test_ids(self):
41 n = len(self.client.ids)
41 n = len(self.client.ids)
42 self.add_engines(2)
42 self.add_engines(2)
43 self.assertEqual(len(self.client.ids), n+2)
43 self.assertEqual(len(self.client.ids), n+2)
44
44
45 def test_view_indexing(self):
45 def test_view_indexing(self):
46 """test index access for views"""
46 """test index access for views"""
47 self.minimum_engines(4)
47 self.minimum_engines(4)
48 targets = self.client._build_targets('all')[-1]
48 targets = self.client._build_targets('all')[-1]
49 v = self.client[:]
49 v = self.client[:]
50 self.assertEqual(v.targets, targets)
50 self.assertEqual(v.targets, targets)
51 t = self.client.ids[2]
51 t = self.client.ids[2]
52 v = self.client[t]
52 v = self.client[t]
53 self.assertTrue(isinstance(v, DirectView))
53 self.assertTrue(isinstance(v, DirectView))
54 self.assertEqual(v.targets, t)
54 self.assertEqual(v.targets, t)
55 t = self.client.ids[2:4]
55 t = self.client.ids[2:4]
56 v = self.client[t]
56 v = self.client[t]
57 self.assertTrue(isinstance(v, DirectView))
57 self.assertTrue(isinstance(v, DirectView))
58 self.assertEqual(v.targets, t)
58 self.assertEqual(v.targets, t)
59 v = self.client[::2]
59 v = self.client[::2]
60 self.assertTrue(isinstance(v, DirectView))
60 self.assertTrue(isinstance(v, DirectView))
61 self.assertEqual(v.targets, targets[::2])
61 self.assertEqual(v.targets, targets[::2])
62 v = self.client[1::3]
62 v = self.client[1::3]
63 self.assertTrue(isinstance(v, DirectView))
63 self.assertTrue(isinstance(v, DirectView))
64 self.assertEqual(v.targets, targets[1::3])
64 self.assertEqual(v.targets, targets[1::3])
65 v = self.client[:-3]
65 v = self.client[:-3]
66 self.assertTrue(isinstance(v, DirectView))
66 self.assertTrue(isinstance(v, DirectView))
67 self.assertEqual(v.targets, targets[:-3])
67 self.assertEqual(v.targets, targets[:-3])
68 v = self.client[-1]
68 v = self.client[-1]
69 self.assertTrue(isinstance(v, DirectView))
69 self.assertTrue(isinstance(v, DirectView))
70 self.assertEqual(v.targets, targets[-1])
70 self.assertEqual(v.targets, targets[-1])
71 self.assertRaises(TypeError, lambda : self.client[None])
71 self.assertRaises(TypeError, lambda : self.client[None])
72
72
73 def test_lbview_targets(self):
73 def test_lbview_targets(self):
74 """test load_balanced_view targets"""
74 """test load_balanced_view targets"""
75 v = self.client.load_balanced_view()
75 v = self.client.load_balanced_view()
76 self.assertEqual(v.targets, None)
76 self.assertEqual(v.targets, None)
77 v = self.client.load_balanced_view(-1)
77 v = self.client.load_balanced_view(-1)
78 self.assertEqual(v.targets, [self.client.ids[-1]])
78 self.assertEqual(v.targets, [self.client.ids[-1]])
79 v = self.client.load_balanced_view('all')
79 v = self.client.load_balanced_view('all')
80 self.assertEqual(v.targets, None)
80 self.assertEqual(v.targets, None)
81
81
82 def test_dview_targets(self):
82 def test_dview_targets(self):
83 """test direct_view targets"""
83 """test direct_view targets"""
84 v = self.client.direct_view()
84 v = self.client.direct_view()
85 self.assertEqual(v.targets, 'all')
85 self.assertEqual(v.targets, 'all')
86 v = self.client.direct_view('all')
86 v = self.client.direct_view('all')
87 self.assertEqual(v.targets, 'all')
87 self.assertEqual(v.targets, 'all')
88 v = self.client.direct_view(-1)
88 v = self.client.direct_view(-1)
89 self.assertEqual(v.targets, self.client.ids[-1])
89 self.assertEqual(v.targets, self.client.ids[-1])
90
90
91 def test_lazy_all_targets(self):
91 def test_lazy_all_targets(self):
92 """test lazy evaluation of rc.direct_view('all')"""
92 """test lazy evaluation of rc.direct_view('all')"""
93 v = self.client.direct_view()
93 v = self.client.direct_view()
94 self.assertEqual(v.targets, 'all')
94 self.assertEqual(v.targets, 'all')
95
95
96 def double(x):
96 def double(x):
97 return x*2
97 return x*2
98 seq = list(range(100))
98 seq = list(range(100))
99 ref = [ double(x) for x in seq ]
99 ref = [ double(x) for x in seq ]
100
100
101 # add some engines, which should be used
101 # add some engines, which should be used
102 self.add_engines(1)
102 self.add_engines(1)
103 n1 = len(self.client.ids)
103 n1 = len(self.client.ids)
104
104
105 # simple apply
105 # simple apply
106 r = v.apply_sync(lambda : 1)
106 r = v.apply_sync(lambda : 1)
107 self.assertEqual(r, [1] * n1)
107 self.assertEqual(r, [1] * n1)
108
108
109 # map goes through remotefunction
109 # map goes through remotefunction
110 r = v.map_sync(double, seq)
110 r = v.map_sync(double, seq)
111 self.assertEqual(r, ref)
111 self.assertEqual(r, ref)
112
112
113 # add a couple more engines, and try again
113 # add a couple more engines, and try again
114 self.add_engines(2)
114 self.add_engines(2)
115 n2 = len(self.client.ids)
115 n2 = len(self.client.ids)
116 self.assertNotEqual(n2, n1)
116 self.assertNotEqual(n2, n1)
117
117
118 # apply
118 # apply
119 r = v.apply_sync(lambda : 1)
119 r = v.apply_sync(lambda : 1)
120 self.assertEqual(r, [1] * n2)
120 self.assertEqual(r, [1] * n2)
121
121
122 # map
122 # map
123 r = v.map_sync(double, seq)
123 r = v.map_sync(double, seq)
124 self.assertEqual(r, ref)
124 self.assertEqual(r, ref)
125
125
126 def test_targets(self):
126 def test_targets(self):
127 """test various valid targets arguments"""
127 """test various valid targets arguments"""
128 build = self.client._build_targets
128 build = self.client._build_targets
129 ids = self.client.ids
129 ids = self.client.ids
130 idents,targets = build(None)
130 idents,targets = build(None)
131 self.assertEqual(ids, targets)
131 self.assertEqual(ids, targets)
132
132
133 def test_clear(self):
133 def test_clear(self):
134 """test clear behavior"""
134 """test clear behavior"""
135 self.minimum_engines(2)
135 self.minimum_engines(2)
136 v = self.client[:]
136 v = self.client[:]
137 v.block=True
137 v.block=True
138 v.push(dict(a=5))
138 v.push(dict(a=5))
139 v.pull('a')
139 v.pull('a')
140 id0 = self.client.ids[-1]
140 id0 = self.client.ids[-1]
141 self.client.clear(targets=id0, block=True)
141 self.client.clear(targets=id0, block=True)
142 a = self.client[:-1].get('a')
142 a = self.client[:-1].get('a')
143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
144 self.client.clear(block=True)
144 self.client.clear(block=True)
145 for i in self.client.ids:
145 for i in self.client.ids:
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147
147
148 def test_get_result(self):
148 def test_get_result(self):
149 """test getting results from the Hub."""
149 """test getting results from the Hub."""
150 c = clientmod.Client(profile='iptest')
150 c = clientmod.Client(profile='iptest')
151 t = c.ids[-1]
151 t = c.ids[-1]
152 ar = c[t].apply_async(wait, 1)
152 ar = c[t].apply_async(wait, 1)
153 # give the monitor time to notice the message
153 # give the monitor time to notice the message
154 time.sleep(.25)
154 time.sleep(.25)
155 ahr = self.client.get_result(ar.msg_ids[0])
155 ahr = self.client.get_result(ar.msg_ids[0])
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 self.assertEqual(ahr.get(), ar.get())
157 self.assertEqual(ahr.get(), ar.get())
158 ar2 = self.client.get_result(ar.msg_ids[0])
158 ar2 = self.client.get_result(ar.msg_ids[0])
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 c.close()
160 c.close()
161
161
162 def test_get_execute_result(self):
162 def test_get_execute_result(self):
163 """test getting execute results from the Hub."""
163 """test getting execute results from the Hub."""
164 c = clientmod.Client(profile='iptest')
164 c = clientmod.Client(profile='iptest')
165 t = c.ids[-1]
165 t = c.ids[-1]
166 cell = '\n'.join([
166 cell = '\n'.join([
167 'import time',
167 'import time',
168 'time.sleep(0.25)',
168 'time.sleep(0.25)',
169 '5'
169 '5'
170 ])
170 ])
171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
172 # give the monitor time to notice the message
172 # give the monitor time to notice the message
173 time.sleep(.25)
173 time.sleep(.25)
174 ahr = self.client.get_result(ar.msg_ids[0])
174 ahr = self.client.get_result(ar.msg_ids[0])
175 self.assertTrue(isinstance(ahr, AsyncHubResult))
175 self.assertTrue(isinstance(ahr, AsyncHubResult))
176 self.assertEqual(ahr.get().pyout, ar.get().pyout)
176 self.assertEqual(ahr.get().pyout, ar.get().pyout)
177 ar2 = self.client.get_result(ar.msg_ids[0])
177 ar2 = self.client.get_result(ar.msg_ids[0])
178 self.assertFalse(isinstance(ar2, AsyncHubResult))
178 self.assertFalse(isinstance(ar2, AsyncHubResult))
179 c.close()
179 c.close()
180
180
181 def test_ids_list(self):
181 def test_ids_list(self):
182 """test client.ids"""
182 """test client.ids"""
183 ids = self.client.ids
183 ids = self.client.ids
184 self.assertEqual(ids, self.client._ids)
184 self.assertEqual(ids, self.client._ids)
185 self.assertFalse(ids is self.client._ids)
185 self.assertFalse(ids is self.client._ids)
186 ids.remove(ids[-1])
186 ids.remove(ids[-1])
187 self.assertNotEqual(ids, self.client._ids)
187 self.assertNotEqual(ids, self.client._ids)
188
188
189 def test_queue_status(self):
189 def test_queue_status(self):
190 ids = self.client.ids
190 ids = self.client.ids
191 id0 = ids[0]
191 id0 = ids[0]
192 qs = self.client.queue_status(targets=id0)
192 qs = self.client.queue_status(targets=id0)
193 self.assertTrue(isinstance(qs, dict))
193 self.assertTrue(isinstance(qs, dict))
194 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
194 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
195 allqs = self.client.queue_status()
195 allqs = self.client.queue_status()
196 self.assertTrue(isinstance(allqs, dict))
196 self.assertTrue(isinstance(allqs, dict))
197 intkeys = list(allqs.keys())
197 intkeys = list(allqs.keys())
198 intkeys.remove('unassigned')
198 intkeys.remove('unassigned')
199 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
199 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
200 unassigned = allqs.pop('unassigned')
200 unassigned = allqs.pop('unassigned')
201 for eid,qs in allqs.items():
201 for eid,qs in allqs.items():
202 self.assertTrue(isinstance(qs, dict))
202 self.assertTrue(isinstance(qs, dict))
203 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
203 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
204
204
205 def test_shutdown(self):
205 def test_shutdown(self):
206 ids = self.client.ids
206 ids = self.client.ids
207 id0 = ids[0]
207 id0 = ids[0]
208 self.client.shutdown(id0, block=True)
208 self.client.shutdown(id0, block=True)
209 while id0 in self.client.ids:
209 while id0 in self.client.ids:
210 time.sleep(0.1)
210 time.sleep(0.1)
211 self.client.spin()
211 self.client.spin()
212
212
213 self.assertRaises(IndexError, lambda : self.client[id0])
213 self.assertRaises(IndexError, lambda : self.client[id0])
214
214
215 def test_result_status(self):
215 def test_result_status(self):
216 pass
216 pass
217 # to be written
217 # to be written
218
218
219 def test_db_query_dt(self):
219 def test_db_query_dt(self):
220 """test db query by date"""
220 """test db query by date"""
221 hist = self.client.hub_history()
221 hist = self.client.hub_history()
222 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
222 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
223 tic = middle['submitted']
223 tic = middle['submitted']
224 before = self.client.db_query({'submitted' : {'$lt' : tic}})
224 before = self.client.db_query({'submitted' : {'$lt' : tic}})
225 after = self.client.db_query({'submitted' : {'$gte' : tic}})
225 after = self.client.db_query({'submitted' : {'$gte' : tic}})
226 self.assertEqual(len(before)+len(after),len(hist))
226 self.assertEqual(len(before)+len(after),len(hist))
227 for b in before:
227 for b in before:
228 self.assertTrue(b['submitted'] < tic)
228 self.assertTrue(b['submitted'] < tic)
229 for a in after:
229 for a in after:
230 self.assertTrue(a['submitted'] >= tic)
230 self.assertTrue(a['submitted'] >= tic)
231 same = self.client.db_query({'submitted' : tic})
231 same = self.client.db_query({'submitted' : tic})
232 for s in same:
232 for s in same:
233 self.assertTrue(s['submitted'] == tic)
233 self.assertTrue(s['submitted'] == tic)
234
234
235 def test_db_query_keys(self):
235 def test_db_query_keys(self):
236 """test extracting subset of record keys"""
236 """test extracting subset of record keys"""
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
238 for rec in found:
238 for rec in found:
239 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
239 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
240
240
241 def test_db_query_default_keys(self):
241 def test_db_query_default_keys(self):
242 """default db_query excludes buffers"""
242 """default db_query excludes buffers"""
243 found = self.client.db_query({'msg_id': {'$ne' : ''}})
243 found = self.client.db_query({'msg_id': {'$ne' : ''}})
244 for rec in found:
244 for rec in found:
245 keys = set(rec.keys())
245 keys = set(rec.keys())
246 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
246 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
247 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
247 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
248
248
249 def test_db_query_msg_id(self):
249 def test_db_query_msg_id(self):
250 """ensure msg_id is always in db queries"""
250 """ensure msg_id is always in db queries"""
251 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
251 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
252 for rec in found:
252 for rec in found:
253 self.assertTrue('msg_id' in rec.keys())
253 self.assertTrue('msg_id' in rec.keys())
254 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
254 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
255 for rec in found:
255 for rec in found:
256 self.assertTrue('msg_id' in rec.keys())
256 self.assertTrue('msg_id' in rec.keys())
257 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
257 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
258 for rec in found:
258 for rec in found:
259 self.assertTrue('msg_id' in rec.keys())
259 self.assertTrue('msg_id' in rec.keys())
260
260
261 def test_db_query_get_result(self):
261 def test_db_query_get_result(self):
262 """pop in db_query shouldn't pop from result itself"""
262 """pop in db_query shouldn't pop from result itself"""
263 self.client[:].apply_sync(lambda : 1)
263 self.client[:].apply_sync(lambda : 1)
264 found = self.client.db_query({'msg_id': {'$ne' : ''}})
264 found = self.client.db_query({'msg_id': {'$ne' : ''}})
265 rc2 = clientmod.Client(profile='iptest')
265 rc2 = clientmod.Client(profile='iptest')
266 # If this bug is not fixed, this call will hang:
266 # If this bug is not fixed, this call will hang:
267 ar = rc2.get_result(self.client.history[-1])
267 ar = rc2.get_result(self.client.history[-1])
268 ar.wait(2)
268 ar.wait(2)
269 self.assertTrue(ar.ready())
269 self.assertTrue(ar.ready())
270 ar.get()
270 ar.get()
271 rc2.close()
271 rc2.close()
272
272
273 def test_db_query_in(self):
273 def test_db_query_in(self):
274 """test db query with '$in','$nin' operators"""
274 """test db query with '$in','$nin' operators"""
275 hist = self.client.hub_history()
275 hist = self.client.hub_history()
276 even = hist[::2]
276 even = hist[::2]
277 odd = hist[1::2]
277 odd = hist[1::2]
278 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
278 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
279 found = [ r['msg_id'] for r in recs ]
279 found = [ r['msg_id'] for r in recs ]
280 self.assertEqual(set(even), set(found))
280 self.assertEqual(set(even), set(found))
281 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
281 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
282 found = [ r['msg_id'] for r in recs ]
282 found = [ r['msg_id'] for r in recs ]
283 self.assertEqual(set(odd), set(found))
283 self.assertEqual(set(odd), set(found))
284
284
285 def test_hub_history(self):
285 def test_hub_history(self):
286 hist = self.client.hub_history()
286 hist = self.client.hub_history()
287 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
287 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
288 recdict = {}
288 recdict = {}
289 for rec in recs:
289 for rec in recs:
290 recdict[rec['msg_id']] = rec
290 recdict[rec['msg_id']] = rec
291
291
292 latest = datetime(1984,1,1)
292 latest = datetime(1984,1,1)
293 for msg_id in hist:
293 for msg_id in hist:
294 rec = recdict[msg_id]
294 rec = recdict[msg_id]
295 newt = rec['submitted']
295 newt = rec['submitted']
296 self.assertTrue(newt >= latest)
296 self.assertTrue(newt >= latest)
297 latest = newt
297 latest = newt
298 ar = self.client[-1].apply_async(lambda : 1)
298 ar = self.client[-1].apply_async(lambda : 1)
299 ar.get()
299 ar.get()
300 time.sleep(0.25)
300 time.sleep(0.25)
301 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
301 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
302
302
303 def _wait_for_idle(self):
303 def _wait_for_idle(self):
304 """wait for an engine to become idle, according to the Hub"""
304 """wait for the cluster to become idle, according to the everyone."""
305 rc = self.client
305 rc = self.client
306
306
307 # step 0. wait for local results
308 # this should be sufficient 99% of the time.
309 rc.wait(timeout=5)
310
307 # step 1. wait for all requests to be noticed
311 # step 1. wait for all requests to be noticed
308 # timeout 5s, polling every 100ms
312 # timeout 5s, polling every 100ms
309 msg_ids = set(rc.history)
313 msg_ids = set(rc.history)
310 hub_hist = rc.hub_history()
314 hub_hist = rc.hub_history()
311 for i in range(50):
315 for i in range(50):
312 if msg_ids.difference(hub_hist):
316 if msg_ids.difference(hub_hist):
313 time.sleep(0.1)
317 time.sleep(0.1)
314 hub_hist = rc.hub_history()
318 hub_hist = rc.hub_history()
315 else:
319 else:
316 break
320 break
317
321
318 self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
322 self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
319
323
320 # step 2. wait for all requests to be done
324 # step 2. wait for all requests to be done
321 # timeout 5s, polling every 100ms
325 # timeout 5s, polling every 100ms
322 qs = rc.queue_status()
326 qs = rc.queue_status()
323 for i in range(50):
327 for i in range(50):
324 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
328 if qs['unassigned'] or any(qs[eid]['tasks'] + qs[eid]['queue'] for eid in rc.ids):
325 time.sleep(0.1)
329 time.sleep(0.1)
326 qs = rc.queue_status()
330 qs = rc.queue_status()
327 else:
331 else:
328 break
332 break
329
333
330 # ensure Hub up to date:
334 # ensure Hub up to date:
331 self.assertEqual(qs['unassigned'], 0)
335 self.assertEqual(qs['unassigned'], 0)
332 for eid in rc.ids:
336 for eid in rc.ids:
333 self.assertEqual(qs[eid]['tasks'], 0)
337 self.assertEqual(qs[eid]['tasks'], 0)
338 self.assertEqual(qs[eid]['queue'], 0)
334
339
335
340
336 def test_resubmit(self):
341 def test_resubmit(self):
337 def f():
342 def f():
338 import random
343 import random
339 return random.random()
344 return random.random()
340 v = self.client.load_balanced_view()
345 v = self.client.load_balanced_view()
341 ar = v.apply_async(f)
346 ar = v.apply_async(f)
342 r1 = ar.get(1)
347 r1 = ar.get(1)
343 # give the Hub a chance to notice:
348 # give the Hub a chance to notice:
344 self._wait_for_idle()
349 self._wait_for_idle()
345 ahr = self.client.resubmit(ar.msg_ids)
350 ahr = self.client.resubmit(ar.msg_ids)
346 r2 = ahr.get(1)
351 r2 = ahr.get(1)
347 self.assertFalse(r1 == r2)
352 self.assertFalse(r1 == r2)
348
353
349 def test_resubmit_chain(self):
354 def test_resubmit_chain(self):
350 """resubmit resubmitted tasks"""
355 """resubmit resubmitted tasks"""
351 v = self.client.load_balanced_view()
356 v = self.client.load_balanced_view()
352 ar = v.apply_async(lambda x: x, 'x'*1024)
357 ar = v.apply_async(lambda x: x, 'x'*1024)
353 ar.get()
358 ar.get()
354 self._wait_for_idle()
359 self._wait_for_idle()
355 ars = [ar]
360 ars = [ar]
356
361
357 for i in range(10):
362 for i in range(10):
358 ar = ars[-1]
363 ar = ars[-1]
359 ar2 = self.client.resubmit(ar.msg_ids)
364 ar2 = self.client.resubmit(ar.msg_ids)
360
365
361 [ ar.get() for ar in ars ]
366 [ ar.get() for ar in ars ]
362
367
363 def test_resubmit_header(self):
368 def test_resubmit_header(self):
364 """resubmit shouldn't clobber the whole header"""
369 """resubmit shouldn't clobber the whole header"""
365 def f():
370 def f():
366 import random
371 import random
367 return random.random()
372 return random.random()
368 v = self.client.load_balanced_view()
373 v = self.client.load_balanced_view()
369 v.retries = 1
374 v.retries = 1
370 ar = v.apply_async(f)
375 ar = v.apply_async(f)
371 r1 = ar.get(1)
376 r1 = ar.get(1)
372 # give the Hub a chance to notice:
377 # give the Hub a chance to notice:
373 self._wait_for_idle()
378 self._wait_for_idle()
374 ahr = self.client.resubmit(ar.msg_ids)
379 ahr = self.client.resubmit(ar.msg_ids)
375 ahr.get(1)
380 ahr.get(1)
376 time.sleep(0.5)
381 time.sleep(0.5)
377 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
382 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
378 h1,h2 = [ r['header'] for r in records ]
383 h1,h2 = [ r['header'] for r in records ]
379 for key in set(h1.keys()).union(set(h2.keys())):
384 for key in set(h1.keys()).union(set(h2.keys())):
380 if key in ('msg_id', 'date'):
385 if key in ('msg_id', 'date'):
381 self.assertNotEqual(h1[key], h2[key])
386 self.assertNotEqual(h1[key], h2[key])
382 else:
387 else:
383 self.assertEqual(h1[key], h2[key])
388 self.assertEqual(h1[key], h2[key])
384
389
385 def test_resubmit_aborted(self):
390 def test_resubmit_aborted(self):
386 def f():
391 def f():
387 import random
392 import random
388 return random.random()
393 return random.random()
389 v = self.client.load_balanced_view()
394 v = self.client.load_balanced_view()
390 # restrict to one engine, so we can put a sleep
395 # restrict to one engine, so we can put a sleep
391 # ahead of the task, so it will get aborted
396 # ahead of the task, so it will get aborted
392 eid = self.client.ids[-1]
397 eid = self.client.ids[-1]
393 v.targets = [eid]
398 v.targets = [eid]
394 sleep = v.apply_async(time.sleep, 0.5)
399 sleep = v.apply_async(time.sleep, 0.5)
395 ar = v.apply_async(f)
400 ar = v.apply_async(f)
396 ar.abort()
401 ar.abort()
397 self.assertRaises(error.TaskAborted, ar.get)
402 self.assertRaises(error.TaskAborted, ar.get)
398 # Give the Hub a chance to get up to date:
403 # Give the Hub a chance to get up to date:
399 self._wait_for_idle()
404 self._wait_for_idle()
400 ahr = self.client.resubmit(ar.msg_ids)
405 ahr = self.client.resubmit(ar.msg_ids)
401 r2 = ahr.get(1)
406 r2 = ahr.get(1)
402
407
403 def test_resubmit_inflight(self):
408 def test_resubmit_inflight(self):
404 """resubmit of inflight task"""
409 """resubmit of inflight task"""
405 v = self.client.load_balanced_view()
410 v = self.client.load_balanced_view()
406 ar = v.apply_async(time.sleep,1)
411 ar = v.apply_async(time.sleep,1)
407 # give the message a chance to arrive
412 # give the message a chance to arrive
408 time.sleep(0.2)
413 time.sleep(0.2)
409 ahr = self.client.resubmit(ar.msg_ids)
414 ahr = self.client.resubmit(ar.msg_ids)
410 ar.get(2)
415 ar.get(2)
411 ahr.get(2)
416 ahr.get(2)
412
417
413 def test_resubmit_badkey(self):
418 def test_resubmit_badkey(self):
414 """ensure KeyError on resubmit of nonexistant task"""
419 """ensure KeyError on resubmit of nonexistant task"""
415 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
420 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
416
421
417 def test_purge_hub_results(self):
422 def test_purge_hub_results(self):
418 # ensure there are some tasks
423 # ensure there are some tasks
419 for i in range(5):
424 for i in range(5):
420 self.client[:].apply_sync(lambda : 1)
425 self.client[:].apply_sync(lambda : 1)
421 # Wait for the Hub to realise the result is done:
426 # Wait for the Hub to realise the result is done:
422 # This prevents a race condition, where we
427 # This prevents a race condition, where we
423 # might purge a result the Hub still thinks is pending.
428 # might purge a result the Hub still thinks is pending.
424 self._wait_for_idle()
429 self._wait_for_idle()
425 rc2 = clientmod.Client(profile='iptest')
430 rc2 = clientmod.Client(profile='iptest')
426 hist = self.client.hub_history()
431 hist = self.client.hub_history()
427 ahr = rc2.get_result([hist[-1]])
432 ahr = rc2.get_result([hist[-1]])
428 ahr.wait(10)
433 ahr.wait(10)
429 self.client.purge_hub_results(hist[-1])
434 self.client.purge_hub_results(hist[-1])
430 newhist = self.client.hub_history()
435 newhist = self.client.hub_history()
431 self.assertEqual(len(newhist)+1,len(hist))
436 self.assertEqual(len(newhist)+1,len(hist))
432 rc2.spin()
437 rc2.spin()
433 rc2.close()
438 rc2.close()
434
439
435 def test_purge_local_results(self):
440 def test_purge_local_results(self):
436 # ensure there are some tasks
441 # ensure there are some tasks
437 res = []
442 res = []
438 for i in range(5):
443 for i in range(5):
439 res.append(self.client[:].apply_async(lambda : 1))
444 res.append(self.client[:].apply_async(lambda : 1))
440 self._wait_for_idle()
445 self._wait_for_idle()
441 self.client.wait(10) # wait for the results to come back
446 self.client.wait(10) # wait for the results to come back
442 before = len(self.client.results)
447 before = len(self.client.results)
443 self.assertEqual(len(self.client.metadata),before)
448 self.assertEqual(len(self.client.metadata),before)
444 self.client.purge_local_results(res[-1])
449 self.client.purge_local_results(res[-1])
445 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
450 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
446 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
451 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
447
452
448 def test_purge_all_hub_results(self):
453 def test_purge_all_hub_results(self):
449 self.client.purge_hub_results('all')
454 self.client.purge_hub_results('all')
450 hist = self.client.hub_history()
455 hist = self.client.hub_history()
451 self.assertEqual(len(hist), 0)
456 self.assertEqual(len(hist), 0)
452
457
453 def test_purge_all_local_results(self):
458 def test_purge_all_local_results(self):
454 self.client.purge_local_results('all')
459 self.client.purge_local_results('all')
455 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
460 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
456 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
461 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
457
462
458 def test_purge_all_results(self):
463 def test_purge_all_results(self):
459 # ensure there are some tasks
464 # ensure there are some tasks
460 for i in range(5):
465 for i in range(5):
461 self.client[:].apply_sync(lambda : 1)
466 self.client[:].apply_sync(lambda : 1)
462 self.client.wait(10)
467 self.client.wait(10)
463 self._wait_for_idle()
468 self._wait_for_idle()
464 self.client.purge_results('all')
469 self.client.purge_results('all')
465 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
470 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
466 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
471 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
467 hist = self.client.hub_history()
472 hist = self.client.hub_history()
468 self.assertEqual(len(hist), 0, msg="hub history not empty")
473 self.assertEqual(len(hist), 0, msg="hub history not empty")
469
474
470 def test_purge_everything(self):
475 def test_purge_everything(self):
471 # ensure there are some tasks
476 # ensure there are some tasks
472 for i in range(5):
477 for i in range(5):
473 self.client[:].apply_sync(lambda : 1)
478 self.client[:].apply_sync(lambda : 1)
474 self.client.wait(10)
479 self.client.wait(10)
475 self._wait_for_idle()
480 self._wait_for_idle()
476 self.client.purge_everything()
481 self.client.purge_everything()
477 # The client results
482 # The client results
478 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
483 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
479 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
484 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
480 # The client "bookkeeping"
485 # The client "bookkeeping"
481 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
486 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
482 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
487 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
483 # the hub results
488 # the hub results
484 hist = self.client.hub_history()
489 hist = self.client.hub_history()
485 self.assertEqual(len(hist), 0, msg="hub history not empty")
490 self.assertEqual(len(hist), 0, msg="hub history not empty")
486
491
487
492
488 def test_spin_thread(self):
493 def test_spin_thread(self):
489 self.client.spin_thread(0.01)
494 self.client.spin_thread(0.01)
490 ar = self.client[-1].apply_async(lambda : 1)
495 ar = self.client[-1].apply_async(lambda : 1)
491 time.sleep(0.1)
496 time.sleep(0.1)
492 self.assertTrue(ar.wall_time < 0.1,
497 self.assertTrue(ar.wall_time < 0.1,
493 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
498 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
494 )
499 )
495
500
496 def test_stop_spin_thread(self):
501 def test_stop_spin_thread(self):
497 self.client.spin_thread(0.01)
502 self.client.spin_thread(0.01)
498 self.client.stop_spin_thread()
503 self.client.stop_spin_thread()
499 ar = self.client[-1].apply_async(lambda : 1)
504 ar = self.client[-1].apply_async(lambda : 1)
500 time.sleep(0.15)
505 time.sleep(0.15)
501 self.assertTrue(ar.wall_time > 0.1,
506 self.assertTrue(ar.wall_time > 0.1,
502 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
507 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
503 )
508 )
504
509
505 def test_activate(self):
510 def test_activate(self):
506 ip = get_ipython()
511 ip = get_ipython()
507 magics = ip.magics_manager.magics
512 magics = ip.magics_manager.magics
508 self.assertTrue('px' in magics['line'])
513 self.assertTrue('px' in magics['line'])
509 self.assertTrue('px' in magics['cell'])
514 self.assertTrue('px' in magics['cell'])
510 v0 = self.client.activate(-1, '0')
515 v0 = self.client.activate(-1, '0')
511 self.assertTrue('px0' in magics['line'])
516 self.assertTrue('px0' in magics['line'])
512 self.assertTrue('px0' in magics['cell'])
517 self.assertTrue('px0' in magics['cell'])
513 self.assertEqual(v0.targets, self.client.ids[-1])
518 self.assertEqual(v0.targets, self.client.ids[-1])
514 v0 = self.client.activate('all', 'all')
519 v0 = self.client.activate('all', 'all')
515 self.assertTrue('pxall' in magics['line'])
520 self.assertTrue('pxall' in magics['line'])
516 self.assertTrue('pxall' in magics['cell'])
521 self.assertTrue('pxall' in magics['cell'])
517 self.assertEqual(v0.targets, 'all')
522 self.assertEqual(v0.targets, 'all')
General Comments 0
You need to be logged in to leave comments. Login now