##// END OF EJS Templates
Merge pull request #2808 from minrk/parallel_wait...
Min RK -
r9181:9db88115 merge
parent child Browse files
Show More
@@ -1,502 +1,517
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23 from tempfile import mktemp
23 from tempfile import mktemp
24
24
25 import zmq
25 import zmq
26
26
27 from IPython import parallel
27 from IPython import parallel
28 from IPython.parallel.client import client as clientmod
28 from IPython.parallel.client import client as clientmod
29 from IPython.parallel import error
29 from IPython.parallel import error
30 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import AsyncResult, AsyncHubResult
31 from IPython.parallel import LoadBalancedView, DirectView
31 from IPython.parallel import LoadBalancedView, DirectView
32
32
33 from clienttest import ClusterTestCase, segfault, wait, add_engines
33 from clienttest import ClusterTestCase, segfault, wait, add_engines
34
34
35 def setup():
35 def setup():
36 add_engines(4, total=True)
36 add_engines(4, total=True)
37
37
38 class TestClient(ClusterTestCase):
38 class TestClient(ClusterTestCase):
39
39
40 def test_ids(self):
40 def test_ids(self):
41 n = len(self.client.ids)
41 n = len(self.client.ids)
42 self.add_engines(2)
42 self.add_engines(2)
43 self.assertEqual(len(self.client.ids), n+2)
43 self.assertEqual(len(self.client.ids), n+2)
44
44
45 def test_view_indexing(self):
45 def test_view_indexing(self):
46 """test index access for views"""
46 """test index access for views"""
47 self.minimum_engines(4)
47 self.minimum_engines(4)
48 targets = self.client._build_targets('all')[-1]
48 targets = self.client._build_targets('all')[-1]
49 v = self.client[:]
49 v = self.client[:]
50 self.assertEqual(v.targets, targets)
50 self.assertEqual(v.targets, targets)
51 t = self.client.ids[2]
51 t = self.client.ids[2]
52 v = self.client[t]
52 v = self.client[t]
53 self.assertTrue(isinstance(v, DirectView))
53 self.assertTrue(isinstance(v, DirectView))
54 self.assertEqual(v.targets, t)
54 self.assertEqual(v.targets, t)
55 t = self.client.ids[2:4]
55 t = self.client.ids[2:4]
56 v = self.client[t]
56 v = self.client[t]
57 self.assertTrue(isinstance(v, DirectView))
57 self.assertTrue(isinstance(v, DirectView))
58 self.assertEqual(v.targets, t)
58 self.assertEqual(v.targets, t)
59 v = self.client[::2]
59 v = self.client[::2]
60 self.assertTrue(isinstance(v, DirectView))
60 self.assertTrue(isinstance(v, DirectView))
61 self.assertEqual(v.targets, targets[::2])
61 self.assertEqual(v.targets, targets[::2])
62 v = self.client[1::3]
62 v = self.client[1::3]
63 self.assertTrue(isinstance(v, DirectView))
63 self.assertTrue(isinstance(v, DirectView))
64 self.assertEqual(v.targets, targets[1::3])
64 self.assertEqual(v.targets, targets[1::3])
65 v = self.client[:-3]
65 v = self.client[:-3]
66 self.assertTrue(isinstance(v, DirectView))
66 self.assertTrue(isinstance(v, DirectView))
67 self.assertEqual(v.targets, targets[:-3])
67 self.assertEqual(v.targets, targets[:-3])
68 v = self.client[-1]
68 v = self.client[-1]
69 self.assertTrue(isinstance(v, DirectView))
69 self.assertTrue(isinstance(v, DirectView))
70 self.assertEqual(v.targets, targets[-1])
70 self.assertEqual(v.targets, targets[-1])
71 self.assertRaises(TypeError, lambda : self.client[None])
71 self.assertRaises(TypeError, lambda : self.client[None])
72
72
73 def test_lbview_targets(self):
73 def test_lbview_targets(self):
74 """test load_balanced_view targets"""
74 """test load_balanced_view targets"""
75 v = self.client.load_balanced_view()
75 v = self.client.load_balanced_view()
76 self.assertEqual(v.targets, None)
76 self.assertEqual(v.targets, None)
77 v = self.client.load_balanced_view(-1)
77 v = self.client.load_balanced_view(-1)
78 self.assertEqual(v.targets, [self.client.ids[-1]])
78 self.assertEqual(v.targets, [self.client.ids[-1]])
79 v = self.client.load_balanced_view('all')
79 v = self.client.load_balanced_view('all')
80 self.assertEqual(v.targets, None)
80 self.assertEqual(v.targets, None)
81
81
82 def test_dview_targets(self):
82 def test_dview_targets(self):
83 """test direct_view targets"""
83 """test direct_view targets"""
84 v = self.client.direct_view()
84 v = self.client.direct_view()
85 self.assertEqual(v.targets, 'all')
85 self.assertEqual(v.targets, 'all')
86 v = self.client.direct_view('all')
86 v = self.client.direct_view('all')
87 self.assertEqual(v.targets, 'all')
87 self.assertEqual(v.targets, 'all')
88 v = self.client.direct_view(-1)
88 v = self.client.direct_view(-1)
89 self.assertEqual(v.targets, self.client.ids[-1])
89 self.assertEqual(v.targets, self.client.ids[-1])
90
90
91 def test_lazy_all_targets(self):
91 def test_lazy_all_targets(self):
92 """test lazy evaluation of rc.direct_view('all')"""
92 """test lazy evaluation of rc.direct_view('all')"""
93 v = self.client.direct_view()
93 v = self.client.direct_view()
94 self.assertEqual(v.targets, 'all')
94 self.assertEqual(v.targets, 'all')
95
95
96 def double(x):
96 def double(x):
97 return x*2
97 return x*2
98 seq = range(100)
98 seq = range(100)
99 ref = [ double(x) for x in seq ]
99 ref = [ double(x) for x in seq ]
100
100
101 # add some engines, which should be used
101 # add some engines, which should be used
102 self.add_engines(1)
102 self.add_engines(1)
103 n1 = len(self.client.ids)
103 n1 = len(self.client.ids)
104
104
105 # simple apply
105 # simple apply
106 r = v.apply_sync(lambda : 1)
106 r = v.apply_sync(lambda : 1)
107 self.assertEqual(r, [1] * n1)
107 self.assertEqual(r, [1] * n1)
108
108
109 # map goes through remotefunction
109 # map goes through remotefunction
110 r = v.map_sync(double, seq)
110 r = v.map_sync(double, seq)
111 self.assertEqual(r, ref)
111 self.assertEqual(r, ref)
112
112
113 # add a couple more engines, and try again
113 # add a couple more engines, and try again
114 self.add_engines(2)
114 self.add_engines(2)
115 n2 = len(self.client.ids)
115 n2 = len(self.client.ids)
116 self.assertNotEqual(n2, n1)
116 self.assertNotEqual(n2, n1)
117
117
118 # apply
118 # apply
119 r = v.apply_sync(lambda : 1)
119 r = v.apply_sync(lambda : 1)
120 self.assertEqual(r, [1] * n2)
120 self.assertEqual(r, [1] * n2)
121
121
122 # map
122 # map
123 r = v.map_sync(double, seq)
123 r = v.map_sync(double, seq)
124 self.assertEqual(r, ref)
124 self.assertEqual(r, ref)
125
125
126 def test_targets(self):
126 def test_targets(self):
127 """test various valid targets arguments"""
127 """test various valid targets arguments"""
128 build = self.client._build_targets
128 build = self.client._build_targets
129 ids = self.client.ids
129 ids = self.client.ids
130 idents,targets = build(None)
130 idents,targets = build(None)
131 self.assertEqual(ids, targets)
131 self.assertEqual(ids, targets)
132
132
133 def test_clear(self):
133 def test_clear(self):
134 """test clear behavior"""
134 """test clear behavior"""
135 self.minimum_engines(2)
135 self.minimum_engines(2)
136 v = self.client[:]
136 v = self.client[:]
137 v.block=True
137 v.block=True
138 v.push(dict(a=5))
138 v.push(dict(a=5))
139 v.pull('a')
139 v.pull('a')
140 id0 = self.client.ids[-1]
140 id0 = self.client.ids[-1]
141 self.client.clear(targets=id0, block=True)
141 self.client.clear(targets=id0, block=True)
142 a = self.client[:-1].get('a')
142 a = self.client[:-1].get('a')
143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
144 self.client.clear(block=True)
144 self.client.clear(block=True)
145 for i in self.client.ids:
145 for i in self.client.ids:
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147
147
148 def test_get_result(self):
148 def test_get_result(self):
149 """test getting results from the Hub."""
149 """test getting results from the Hub."""
150 c = clientmod.Client(profile='iptest')
150 c = clientmod.Client(profile='iptest')
151 t = c.ids[-1]
151 t = c.ids[-1]
152 ar = c[t].apply_async(wait, 1)
152 ar = c[t].apply_async(wait, 1)
153 # give the monitor time to notice the message
153 # give the monitor time to notice the message
154 time.sleep(.25)
154 time.sleep(.25)
155 ahr = self.client.get_result(ar.msg_ids)
155 ahr = self.client.get_result(ar.msg_ids)
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 self.assertEqual(ahr.get(), ar.get())
157 self.assertEqual(ahr.get(), ar.get())
158 ar2 = self.client.get_result(ar.msg_ids)
158 ar2 = self.client.get_result(ar.msg_ids)
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 c.close()
160 c.close()
161
161
162 def test_get_execute_result(self):
162 def test_get_execute_result(self):
163 """test getting execute results from the Hub."""
163 """test getting execute results from the Hub."""
164 c = clientmod.Client(profile='iptest')
164 c = clientmod.Client(profile='iptest')
165 t = c.ids[-1]
165 t = c.ids[-1]
166 cell = '\n'.join([
166 cell = '\n'.join([
167 'import time',
167 'import time',
168 'time.sleep(0.25)',
168 'time.sleep(0.25)',
169 '5'
169 '5'
170 ])
170 ])
171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
172 # give the monitor time to notice the message
172 # give the monitor time to notice the message
173 time.sleep(.25)
173 time.sleep(.25)
174 ahr = self.client.get_result(ar.msg_ids)
174 ahr = self.client.get_result(ar.msg_ids)
175 self.assertTrue(isinstance(ahr, AsyncHubResult))
175 self.assertTrue(isinstance(ahr, AsyncHubResult))
176 self.assertEqual(ahr.get().pyout, ar.get().pyout)
176 self.assertEqual(ahr.get().pyout, ar.get().pyout)
177 ar2 = self.client.get_result(ar.msg_ids)
177 ar2 = self.client.get_result(ar.msg_ids)
178 self.assertFalse(isinstance(ar2, AsyncHubResult))
178 self.assertFalse(isinstance(ar2, AsyncHubResult))
179 c.close()
179 c.close()
180
180
181 def test_ids_list(self):
181 def test_ids_list(self):
182 """test client.ids"""
182 """test client.ids"""
183 ids = self.client.ids
183 ids = self.client.ids
184 self.assertEqual(ids, self.client._ids)
184 self.assertEqual(ids, self.client._ids)
185 self.assertFalse(ids is self.client._ids)
185 self.assertFalse(ids is self.client._ids)
186 ids.remove(ids[-1])
186 ids.remove(ids[-1])
187 self.assertNotEqual(ids, self.client._ids)
187 self.assertNotEqual(ids, self.client._ids)
188
188
189 def test_queue_status(self):
189 def test_queue_status(self):
190 ids = self.client.ids
190 ids = self.client.ids
191 id0 = ids[0]
191 id0 = ids[0]
192 qs = self.client.queue_status(targets=id0)
192 qs = self.client.queue_status(targets=id0)
193 self.assertTrue(isinstance(qs, dict))
193 self.assertTrue(isinstance(qs, dict))
194 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
194 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
195 allqs = self.client.queue_status()
195 allqs = self.client.queue_status()
196 self.assertTrue(isinstance(allqs, dict))
196 self.assertTrue(isinstance(allqs, dict))
197 intkeys = list(allqs.keys())
197 intkeys = list(allqs.keys())
198 intkeys.remove('unassigned')
198 intkeys.remove('unassigned')
199 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
199 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
200 unassigned = allqs.pop('unassigned')
200 unassigned = allqs.pop('unassigned')
201 for eid,qs in allqs.items():
201 for eid,qs in allqs.items():
202 self.assertTrue(isinstance(qs, dict))
202 self.assertTrue(isinstance(qs, dict))
203 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
203 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
204
204
205 def test_shutdown(self):
205 def test_shutdown(self):
206 ids = self.client.ids
206 ids = self.client.ids
207 id0 = ids[0]
207 id0 = ids[0]
208 self.client.shutdown(id0, block=True)
208 self.client.shutdown(id0, block=True)
209 while id0 in self.client.ids:
209 while id0 in self.client.ids:
210 time.sleep(0.1)
210 time.sleep(0.1)
211 self.client.spin()
211 self.client.spin()
212
212
213 self.assertRaises(IndexError, lambda : self.client[id0])
213 self.assertRaises(IndexError, lambda : self.client[id0])
214
214
215 def test_result_status(self):
215 def test_result_status(self):
216 pass
216 pass
217 # to be written
217 # to be written
218
218
219 def test_db_query_dt(self):
219 def test_db_query_dt(self):
220 """test db query by date"""
220 """test db query by date"""
221 hist = self.client.hub_history()
221 hist = self.client.hub_history()
222 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
222 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
223 tic = middle['submitted']
223 tic = middle['submitted']
224 before = self.client.db_query({'submitted' : {'$lt' : tic}})
224 before = self.client.db_query({'submitted' : {'$lt' : tic}})
225 after = self.client.db_query({'submitted' : {'$gte' : tic}})
225 after = self.client.db_query({'submitted' : {'$gte' : tic}})
226 self.assertEqual(len(before)+len(after),len(hist))
226 self.assertEqual(len(before)+len(after),len(hist))
227 for b in before:
227 for b in before:
228 self.assertTrue(b['submitted'] < tic)
228 self.assertTrue(b['submitted'] < tic)
229 for a in after:
229 for a in after:
230 self.assertTrue(a['submitted'] >= tic)
230 self.assertTrue(a['submitted'] >= tic)
231 same = self.client.db_query({'submitted' : tic})
231 same = self.client.db_query({'submitted' : tic})
232 for s in same:
232 for s in same:
233 self.assertTrue(s['submitted'] == tic)
233 self.assertTrue(s['submitted'] == tic)
234
234
235 def test_db_query_keys(self):
235 def test_db_query_keys(self):
236 """test extracting subset of record keys"""
236 """test extracting subset of record keys"""
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
238 for rec in found:
238 for rec in found:
239 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
239 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
240
240
241 def test_db_query_default_keys(self):
241 def test_db_query_default_keys(self):
242 """default db_query excludes buffers"""
242 """default db_query excludes buffers"""
243 found = self.client.db_query({'msg_id': {'$ne' : ''}})
243 found = self.client.db_query({'msg_id': {'$ne' : ''}})
244 for rec in found:
244 for rec in found:
245 keys = set(rec.keys())
245 keys = set(rec.keys())
246 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
246 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
247 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
247 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
248
248
249 def test_db_query_msg_id(self):
249 def test_db_query_msg_id(self):
250 """ensure msg_id is always in db queries"""
250 """ensure msg_id is always in db queries"""
251 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
251 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
252 for rec in found:
252 for rec in found:
253 self.assertTrue('msg_id' in rec.keys())
253 self.assertTrue('msg_id' in rec.keys())
254 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
254 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
255 for rec in found:
255 for rec in found:
256 self.assertTrue('msg_id' in rec.keys())
256 self.assertTrue('msg_id' in rec.keys())
257 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
257 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
258 for rec in found:
258 for rec in found:
259 self.assertTrue('msg_id' in rec.keys())
259 self.assertTrue('msg_id' in rec.keys())
260
260
261 def test_db_query_get_result(self):
261 def test_db_query_get_result(self):
262 """pop in db_query shouldn't pop from result itself"""
262 """pop in db_query shouldn't pop from result itself"""
263 self.client[:].apply_sync(lambda : 1)
263 self.client[:].apply_sync(lambda : 1)
264 found = self.client.db_query({'msg_id': {'$ne' : ''}})
264 found = self.client.db_query({'msg_id': {'$ne' : ''}})
265 rc2 = clientmod.Client(profile='iptest')
265 rc2 = clientmod.Client(profile='iptest')
266 # If this bug is not fixed, this call will hang:
266 # If this bug is not fixed, this call will hang:
267 ar = rc2.get_result(self.client.history[-1])
267 ar = rc2.get_result(self.client.history[-1])
268 ar.wait(2)
268 ar.wait(2)
269 self.assertTrue(ar.ready())
269 self.assertTrue(ar.ready())
270 ar.get()
270 ar.get()
271 rc2.close()
271 rc2.close()
272
272
273 def test_db_query_in(self):
273 def test_db_query_in(self):
274 """test db query with '$in','$nin' operators"""
274 """test db query with '$in','$nin' operators"""
275 hist = self.client.hub_history()
275 hist = self.client.hub_history()
276 even = hist[::2]
276 even = hist[::2]
277 odd = hist[1::2]
277 odd = hist[1::2]
278 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
278 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
279 found = [ r['msg_id'] for r in recs ]
279 found = [ r['msg_id'] for r in recs ]
280 self.assertEqual(set(even), set(found))
280 self.assertEqual(set(even), set(found))
281 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
281 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
282 found = [ r['msg_id'] for r in recs ]
282 found = [ r['msg_id'] for r in recs ]
283 self.assertEqual(set(odd), set(found))
283 self.assertEqual(set(odd), set(found))
284
284
285 def test_hub_history(self):
285 def test_hub_history(self):
286 hist = self.client.hub_history()
286 hist = self.client.hub_history()
287 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
287 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
288 recdict = {}
288 recdict = {}
289 for rec in recs:
289 for rec in recs:
290 recdict[rec['msg_id']] = rec
290 recdict[rec['msg_id']] = rec
291
291
292 latest = datetime(1984,1,1)
292 latest = datetime(1984,1,1)
293 for msg_id in hist:
293 for msg_id in hist:
294 rec = recdict[msg_id]
294 rec = recdict[msg_id]
295 newt = rec['submitted']
295 newt = rec['submitted']
296 self.assertTrue(newt >= latest)
296 self.assertTrue(newt >= latest)
297 latest = newt
297 latest = newt
298 ar = self.client[-1].apply_async(lambda : 1)
298 ar = self.client[-1].apply_async(lambda : 1)
299 ar.get()
299 ar.get()
300 time.sleep(0.25)
300 time.sleep(0.25)
301 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
301 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
302
302
303 def _wait_for_idle(self):
303 def _wait_for_idle(self):
304 """wait for an engine to become idle, according to the Hub"""
304 """wait for an engine to become idle, according to the Hub"""
305 rc = self.client
305 rc = self.client
306
306
307 # step 1. wait for all requests to be noticed
308 # timeout 5s, polling every 100ms
309 msg_ids = set(rc.history)
310 hub_hist = rc.hub_history()
311 for i in range(50):
312 if msg_ids.difference(hub_hist):
313 time.sleep(0.1)
314 hub_hist = rc.hub_history()
315 else:
316 break
317
318 self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
319
320 # step 2. wait for all requests to be done
307 # timeout 5s, polling every 100ms
321 # timeout 5s, polling every 100ms
308 qs = rc.queue_status()
322 qs = rc.queue_status()
309 for i in range(50):
323 for i in range(50):
310 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
324 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
311 time.sleep(0.1)
325 time.sleep(0.1)
312 qs = rc.queue_status()
326 qs = rc.queue_status()
313 else:
327 else:
314 break
328 break
315
329
316 # ensure Hub up to date:
330 # ensure Hub up to date:
317 self.assertEqual(qs['unassigned'], 0)
331 self.assertEqual(qs['unassigned'], 0)
318 for eid in rc.ids:
332 for eid in rc.ids:
319 self.assertEqual(qs[eid]['tasks'], 0)
333 self.assertEqual(qs[eid]['tasks'], 0)
320
334
321
335
322 def test_resubmit(self):
336 def test_resubmit(self):
323 def f():
337 def f():
324 import random
338 import random
325 return random.random()
339 return random.random()
326 v = self.client.load_balanced_view()
340 v = self.client.load_balanced_view()
327 ar = v.apply_async(f)
341 ar = v.apply_async(f)
328 r1 = ar.get(1)
342 r1 = ar.get(1)
329 # give the Hub a chance to notice:
343 # give the Hub a chance to notice:
330 self._wait_for_idle()
344 self._wait_for_idle()
331 ahr = self.client.resubmit(ar.msg_ids)
345 ahr = self.client.resubmit(ar.msg_ids)
332 r2 = ahr.get(1)
346 r2 = ahr.get(1)
333 self.assertFalse(r1 == r2)
347 self.assertFalse(r1 == r2)
334
348
335 def test_resubmit_chain(self):
349 def test_resubmit_chain(self):
336 """resubmit resubmitted tasks"""
350 """resubmit resubmitted tasks"""
337 v = self.client.load_balanced_view()
351 v = self.client.load_balanced_view()
338 ar = v.apply_async(lambda x: x, 'x'*1024)
352 ar = v.apply_async(lambda x: x, 'x'*1024)
339 ar.get()
353 ar.get()
340 self._wait_for_idle()
354 self._wait_for_idle()
341 ars = [ar]
355 ars = [ar]
342
356
343 for i in range(10):
357 for i in range(10):
344 ar = ars[-1]
358 ar = ars[-1]
345 ar2 = self.client.resubmit(ar.msg_ids)
359 ar2 = self.client.resubmit(ar.msg_ids)
346
360
347 [ ar.get() for ar in ars ]
361 [ ar.get() for ar in ars ]
348
362
349 def test_resubmit_header(self):
363 def test_resubmit_header(self):
350 """resubmit shouldn't clobber the whole header"""
364 """resubmit shouldn't clobber the whole header"""
351 def f():
365 def f():
352 import random
366 import random
353 return random.random()
367 return random.random()
354 v = self.client.load_balanced_view()
368 v = self.client.load_balanced_view()
355 v.retries = 1
369 v.retries = 1
356 ar = v.apply_async(f)
370 ar = v.apply_async(f)
357 r1 = ar.get(1)
371 r1 = ar.get(1)
358 # give the Hub a chance to notice:
372 # give the Hub a chance to notice:
359 self._wait_for_idle()
373 self._wait_for_idle()
360 ahr = self.client.resubmit(ar.msg_ids)
374 ahr = self.client.resubmit(ar.msg_ids)
361 ahr.get(1)
375 ahr.get(1)
362 time.sleep(0.5)
376 time.sleep(0.5)
363 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
377 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
364 h1,h2 = [ r['header'] for r in records ]
378 h1,h2 = [ r['header'] for r in records ]
365 for key in set(h1.keys()).union(set(h2.keys())):
379 for key in set(h1.keys()).union(set(h2.keys())):
366 if key in ('msg_id', 'date'):
380 if key in ('msg_id', 'date'):
367 self.assertNotEqual(h1[key], h2[key])
381 self.assertNotEqual(h1[key], h2[key])
368 else:
382 else:
369 self.assertEqual(h1[key], h2[key])
383 self.assertEqual(h1[key], h2[key])
370
384
371 def test_resubmit_aborted(self):
385 def test_resubmit_aborted(self):
372 def f():
386 def f():
373 import random
387 import random
374 return random.random()
388 return random.random()
375 v = self.client.load_balanced_view()
389 v = self.client.load_balanced_view()
376 # restrict to one engine, so we can put a sleep
390 # restrict to one engine, so we can put a sleep
377 # ahead of the task, so it will get aborted
391 # ahead of the task, so it will get aborted
378 eid = self.client.ids[-1]
392 eid = self.client.ids[-1]
379 v.targets = [eid]
393 v.targets = [eid]
380 sleep = v.apply_async(time.sleep, 0.5)
394 sleep = v.apply_async(time.sleep, 0.5)
381 ar = v.apply_async(f)
395 ar = v.apply_async(f)
382 ar.abort()
396 ar.abort()
383 self.assertRaises(error.TaskAborted, ar.get)
397 self.assertRaises(error.TaskAborted, ar.get)
384 # Give the Hub a chance to get up to date:
398 # Give the Hub a chance to get up to date:
385 self._wait_for_idle()
399 self._wait_for_idle()
386 ahr = self.client.resubmit(ar.msg_ids)
400 ahr = self.client.resubmit(ar.msg_ids)
387 r2 = ahr.get(1)
401 r2 = ahr.get(1)
388
402
389 def test_resubmit_inflight(self):
403 def test_resubmit_inflight(self):
390 """resubmit of inflight task"""
404 """resubmit of inflight task"""
391 v = self.client.load_balanced_view()
405 v = self.client.load_balanced_view()
392 ar = v.apply_async(time.sleep,1)
406 ar = v.apply_async(time.sleep,1)
393 # give the message a chance to arrive
407 # give the message a chance to arrive
394 time.sleep(0.2)
408 time.sleep(0.2)
395 ahr = self.client.resubmit(ar.msg_ids)
409 ahr = self.client.resubmit(ar.msg_ids)
396 ar.get(2)
410 ar.get(2)
397 ahr.get(2)
411 ahr.get(2)
398
412
399 def test_resubmit_badkey(self):
413 def test_resubmit_badkey(self):
400 """ensure KeyError on resubmit of nonexistant task"""
414 """ensure KeyError on resubmit of nonexistant task"""
401 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
415 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
402
416
403 def test_purge_hub_results(self):
417 def test_purge_hub_results(self):
404 # ensure there are some tasks
418 # ensure there are some tasks
405 for i in range(5):
419 for i in range(5):
406 self.client[:].apply_sync(lambda : 1)
420 self.client[:].apply_sync(lambda : 1)
407 # Wait for the Hub to realise the result is done:
421 # Wait for the Hub to realise the result is done:
408 # This prevents a race condition, where we
422 # This prevents a race condition, where we
409 # might purge a result the Hub still thinks is pending.
423 # might purge a result the Hub still thinks is pending.
410 time.sleep(0.1)
424 self._wait_for_idle()
411 rc2 = clientmod.Client(profile='iptest')
425 rc2 = clientmod.Client(profile='iptest')
412 hist = self.client.hub_history()
426 hist = self.client.hub_history()
413 ahr = rc2.get_result([hist[-1]])
427 ahr = rc2.get_result([hist[-1]])
414 ahr.wait(10)
428 ahr.wait(10)
415 self.client.purge_hub_results(hist[-1])
429 self.client.purge_hub_results(hist[-1])
416 newhist = self.client.hub_history()
430 newhist = self.client.hub_history()
417 self.assertEqual(len(newhist)+1,len(hist))
431 self.assertEqual(len(newhist)+1,len(hist))
418 rc2.spin()
432 rc2.spin()
419 rc2.close()
433 rc2.close()
420
434
421 def test_purge_local_results(self):
435 def test_purge_local_results(self):
422 # ensure there are some tasks
436 # ensure there are some tasks
423 res = []
437 res = []
424 for i in range(5):
438 for i in range(5):
425 res.append(self.client[:].apply_async(lambda : 1))
439 res.append(self.client[:].apply_async(lambda : 1))
426 time.sleep(0.1)
440 self._wait_for_idle()
427 self.client.wait(10) # wait for the results to come back
441 self.client.wait(10) # wait for the results to come back
428 before = len(self.client.results)
442 before = len(self.client.results)
429 self.assertEqual(len(self.client.metadata),before)
443 self.assertEqual(len(self.client.metadata),before)
430 self.client.purge_local_results(res[-1])
444 self.client.purge_local_results(res[-1])
431 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
445 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
432 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
446 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
433
447
434 def test_purge_all_hub_results(self):
448 def test_purge_all_hub_results(self):
435 self.client.purge_hub_results('all')
449 self.client.purge_hub_results('all')
436 hist = self.client.hub_history()
450 hist = self.client.hub_history()
437 self.assertEqual(len(hist), 0)
451 self.assertEqual(len(hist), 0)
438
452
439 def test_purge_all_local_results(self):
453 def test_purge_all_local_results(self):
440 self.client.purge_local_results('all')
454 self.client.purge_local_results('all')
441 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
455 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
442 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
456 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
443
457
444 def test_purge_all_results(self):
458 def test_purge_all_results(self):
445 # ensure there are some tasks
459 # ensure there are some tasks
446 for i in range(5):
460 for i in range(5):
447 self.client[:].apply_sync(lambda : 1)
461 self.client[:].apply_sync(lambda : 1)
448 self.client.wait(10)
462 self.client.wait(10)
463 self._wait_for_idle()
449 self.client.purge_results('all')
464 self.client.purge_results('all')
450 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
465 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
451 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
466 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
452 time.sleep(0.1)
467 hist = self.client.hub_history()
453 hist = self.client.hub_history()#
454 self.assertEqual(len(hist), 0, msg="hub history not empty")
468 self.assertEqual(len(hist), 0, msg="hub history not empty")
455
469
456 def test_purge_everything(self):
470 def test_purge_everything(self):
457 # ensure there are some tasks
471 # ensure there are some tasks
458 for i in range(5):
472 for i in range(5):
459 self.client[:].apply_sync(lambda : 1)
473 self.client[:].apply_sync(lambda : 1)
460 self.client.wait(10)
474 self.client.wait(10)
475 self._wait_for_idle()
461 self.client.purge_everything()
476 self.client.purge_everything()
462 # The client results
477 # The client results
463 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
478 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
464 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
479 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
465 # the hub results
466 hist = self.client.hub_history()
467 self.assertEqual(len(hist), 0, msg="hub history not empty")
468 # The client "bookkeeping"
480 # The client "bookkeeping"
469 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
481 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
470 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
482 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
483 # the hub results
484 hist = self.client.hub_history()
485 self.assertEqual(len(hist), 0, msg="hub history not empty")
471
486
472
487
473 def test_spin_thread(self):
488 def test_spin_thread(self):
474 self.client.spin_thread(0.01)
489 self.client.spin_thread(0.01)
475 ar = self.client[-1].apply_async(lambda : 1)
490 ar = self.client[-1].apply_async(lambda : 1)
476 time.sleep(0.1)
491 time.sleep(0.1)
477 self.assertTrue(ar.wall_time < 0.1,
492 self.assertTrue(ar.wall_time < 0.1,
478 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
493 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
479 )
494 )
480
495
481 def test_stop_spin_thread(self):
496 def test_stop_spin_thread(self):
482 self.client.spin_thread(0.01)
497 self.client.spin_thread(0.01)
483 self.client.stop_spin_thread()
498 self.client.stop_spin_thread()
484 ar = self.client[-1].apply_async(lambda : 1)
499 ar = self.client[-1].apply_async(lambda : 1)
485 time.sleep(0.15)
500 time.sleep(0.15)
486 self.assertTrue(ar.wall_time > 0.1,
501 self.assertTrue(ar.wall_time > 0.1,
487 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
502 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
488 )
503 )
489
504
490 def test_activate(self):
505 def test_activate(self):
491 ip = get_ipython()
506 ip = get_ipython()
492 magics = ip.magics_manager.magics
507 magics = ip.magics_manager.magics
493 self.assertTrue('px' in magics['line'])
508 self.assertTrue('px' in magics['line'])
494 self.assertTrue('px' in magics['cell'])
509 self.assertTrue('px' in magics['cell'])
495 v0 = self.client.activate(-1, '0')
510 v0 = self.client.activate(-1, '0')
496 self.assertTrue('px0' in magics['line'])
511 self.assertTrue('px0' in magics['line'])
497 self.assertTrue('px0' in magics['cell'])
512 self.assertTrue('px0' in magics['cell'])
498 self.assertEqual(v0.targets, self.client.ids[-1])
513 self.assertEqual(v0.targets, self.client.ids[-1])
499 v0 = self.client.activate('all', 'all')
514 v0 = self.client.activate('all', 'all')
500 self.assertTrue('pxall' in magics['line'])
515 self.assertTrue('pxall' in magics['line'])
501 self.assertTrue('pxall' in magics['cell'])
516 self.assertTrue('pxall' in magics['cell'])
502 self.assertEqual(v0.targets, 'all')
517 self.assertEqual(v0.targets, 'all')
General Comments 0
You need to be logged in to leave comments. Login now