##// END OF EJS Templates
test chained resubmissions
MinRK -
Show More
@@ -1,408 +1,422 b''
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23 from tempfile import mktemp
23 from tempfile import mktemp
24
24
25 import zmq
25 import zmq
26
26
27 from IPython.parallel.client import client as clientmod
27 from IPython.parallel.client import client as clientmod
28 from IPython.parallel import error
28 from IPython.parallel import error
29 from IPython.parallel import AsyncResult, AsyncHubResult
29 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import LoadBalancedView, DirectView
30 from IPython.parallel import LoadBalancedView, DirectView
31
31
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
33
33
34 def setup():
34 def setup():
35 add_engines(4, total=True)
35 add_engines(4, total=True)
36
36
37 class TestClient(ClusterTestCase):
37 class TestClient(ClusterTestCase):
38
38
39 def test_ids(self):
39 def test_ids(self):
40 n = len(self.client.ids)
40 n = len(self.client.ids)
41 self.add_engines(2)
41 self.add_engines(2)
42 self.assertEquals(len(self.client.ids), n+2)
42 self.assertEquals(len(self.client.ids), n+2)
43
43
44 def test_view_indexing(self):
44 def test_view_indexing(self):
45 """test index access for views"""
45 """test index access for views"""
46 self.minimum_engines(4)
46 self.minimum_engines(4)
47 targets = self.client._build_targets('all')[-1]
47 targets = self.client._build_targets('all')[-1]
48 v = self.client[:]
48 v = self.client[:]
49 self.assertEquals(v.targets, targets)
49 self.assertEquals(v.targets, targets)
50 t = self.client.ids[2]
50 t = self.client.ids[2]
51 v = self.client[t]
51 v = self.client[t]
52 self.assert_(isinstance(v, DirectView))
52 self.assert_(isinstance(v, DirectView))
53 self.assertEquals(v.targets, t)
53 self.assertEquals(v.targets, t)
54 t = self.client.ids[2:4]
54 t = self.client.ids[2:4]
55 v = self.client[t]
55 v = self.client[t]
56 self.assert_(isinstance(v, DirectView))
56 self.assert_(isinstance(v, DirectView))
57 self.assertEquals(v.targets, t)
57 self.assertEquals(v.targets, t)
58 v = self.client[::2]
58 v = self.client[::2]
59 self.assert_(isinstance(v, DirectView))
59 self.assert_(isinstance(v, DirectView))
60 self.assertEquals(v.targets, targets[::2])
60 self.assertEquals(v.targets, targets[::2])
61 v = self.client[1::3]
61 v = self.client[1::3]
62 self.assert_(isinstance(v, DirectView))
62 self.assert_(isinstance(v, DirectView))
63 self.assertEquals(v.targets, targets[1::3])
63 self.assertEquals(v.targets, targets[1::3])
64 v = self.client[:-3]
64 v = self.client[:-3]
65 self.assert_(isinstance(v, DirectView))
65 self.assert_(isinstance(v, DirectView))
66 self.assertEquals(v.targets, targets[:-3])
66 self.assertEquals(v.targets, targets[:-3])
67 v = self.client[-1]
67 v = self.client[-1]
68 self.assert_(isinstance(v, DirectView))
68 self.assert_(isinstance(v, DirectView))
69 self.assertEquals(v.targets, targets[-1])
69 self.assertEquals(v.targets, targets[-1])
70 self.assertRaises(TypeError, lambda : self.client[None])
70 self.assertRaises(TypeError, lambda : self.client[None])
71
71
72 def test_lbview_targets(self):
72 def test_lbview_targets(self):
73 """test load_balanced_view targets"""
73 """test load_balanced_view targets"""
74 v = self.client.load_balanced_view()
74 v = self.client.load_balanced_view()
75 self.assertEquals(v.targets, None)
75 self.assertEquals(v.targets, None)
76 v = self.client.load_balanced_view(-1)
76 v = self.client.load_balanced_view(-1)
77 self.assertEquals(v.targets, [self.client.ids[-1]])
77 self.assertEquals(v.targets, [self.client.ids[-1]])
78 v = self.client.load_balanced_view('all')
78 v = self.client.load_balanced_view('all')
79 self.assertEquals(v.targets, None)
79 self.assertEquals(v.targets, None)
80
80
81 def test_dview_targets(self):
81 def test_dview_targets(self):
82 """test direct_view targets"""
82 """test direct_view targets"""
83 v = self.client.direct_view()
83 v = self.client.direct_view()
84 self.assertEquals(v.targets, 'all')
84 self.assertEquals(v.targets, 'all')
85 v = self.client.direct_view('all')
85 v = self.client.direct_view('all')
86 self.assertEquals(v.targets, 'all')
86 self.assertEquals(v.targets, 'all')
87 v = self.client.direct_view(-1)
87 v = self.client.direct_view(-1)
88 self.assertEquals(v.targets, self.client.ids[-1])
88 self.assertEquals(v.targets, self.client.ids[-1])
89
89
90 def test_lazy_all_targets(self):
90 def test_lazy_all_targets(self):
91 """test lazy evaluation of rc.direct_view('all')"""
91 """test lazy evaluation of rc.direct_view('all')"""
92 v = self.client.direct_view()
92 v = self.client.direct_view()
93 self.assertEquals(v.targets, 'all')
93 self.assertEquals(v.targets, 'all')
94
94
95 def double(x):
95 def double(x):
96 return x*2
96 return x*2
97 seq = range(100)
97 seq = range(100)
98 ref = [ double(x) for x in seq ]
98 ref = [ double(x) for x in seq ]
99
99
100 # add some engines, which should be used
100 # add some engines, which should be used
101 self.add_engines(1)
101 self.add_engines(1)
102 n1 = len(self.client.ids)
102 n1 = len(self.client.ids)
103
103
104 # simple apply
104 # simple apply
105 r = v.apply_sync(lambda : 1)
105 r = v.apply_sync(lambda : 1)
106 self.assertEquals(r, [1] * n1)
106 self.assertEquals(r, [1] * n1)
107
107
108 # map goes through remotefunction
108 # map goes through remotefunction
109 r = v.map_sync(double, seq)
109 r = v.map_sync(double, seq)
110 self.assertEquals(r, ref)
110 self.assertEquals(r, ref)
111
111
112 # add a couple more engines, and try again
112 # add a couple more engines, and try again
113 self.add_engines(2)
113 self.add_engines(2)
114 n2 = len(self.client.ids)
114 n2 = len(self.client.ids)
115 self.assertNotEquals(n2, n1)
115 self.assertNotEquals(n2, n1)
116
116
117 # apply
117 # apply
118 r = v.apply_sync(lambda : 1)
118 r = v.apply_sync(lambda : 1)
119 self.assertEquals(r, [1] * n2)
119 self.assertEquals(r, [1] * n2)
120
120
121 # map
121 # map
122 r = v.map_sync(double, seq)
122 r = v.map_sync(double, seq)
123 self.assertEquals(r, ref)
123 self.assertEquals(r, ref)
124
124
125 def test_targets(self):
125 def test_targets(self):
126 """test various valid targets arguments"""
126 """test various valid targets arguments"""
127 build = self.client._build_targets
127 build = self.client._build_targets
128 ids = self.client.ids
128 ids = self.client.ids
129 idents,targets = build(None)
129 idents,targets = build(None)
130 self.assertEquals(ids, targets)
130 self.assertEquals(ids, targets)
131
131
132 def test_clear(self):
132 def test_clear(self):
133 """test clear behavior"""
133 """test clear behavior"""
134 self.minimum_engines(2)
134 self.minimum_engines(2)
135 v = self.client[:]
135 v = self.client[:]
136 v.block=True
136 v.block=True
137 v.push(dict(a=5))
137 v.push(dict(a=5))
138 v.pull('a')
138 v.pull('a')
139 id0 = self.client.ids[-1]
139 id0 = self.client.ids[-1]
140 self.client.clear(targets=id0, block=True)
140 self.client.clear(targets=id0, block=True)
141 a = self.client[:-1].get('a')
141 a = self.client[:-1].get('a')
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.client.clear(block=True)
143 self.client.clear(block=True)
144 for i in self.client.ids:
144 for i in self.client.ids:
145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146
146
147 def test_get_result(self):
147 def test_get_result(self):
148 """test getting results from the Hub."""
148 """test getting results from the Hub."""
149 c = clientmod.Client(profile='iptest')
149 c = clientmod.Client(profile='iptest')
150 t = c.ids[-1]
150 t = c.ids[-1]
151 ar = c[t].apply_async(wait, 1)
151 ar = c[t].apply_async(wait, 1)
152 # give the monitor time to notice the message
152 # give the monitor time to notice the message
153 time.sleep(.25)
153 time.sleep(.25)
154 ahr = self.client.get_result(ar.msg_ids)
154 ahr = self.client.get_result(ar.msg_ids)
155 self.assertTrue(isinstance(ahr, AsyncHubResult))
155 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertEquals(ahr.get(), ar.get())
156 self.assertEquals(ahr.get(), ar.get())
157 ar2 = self.client.get_result(ar.msg_ids)
157 ar2 = self.client.get_result(ar.msg_ids)
158 self.assertFalse(isinstance(ar2, AsyncHubResult))
158 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 c.close()
159 c.close()
160
160
161 def test_ids_list(self):
161 def test_ids_list(self):
162 """test client.ids"""
162 """test client.ids"""
163 ids = self.client.ids
163 ids = self.client.ids
164 self.assertEquals(ids, self.client._ids)
164 self.assertEquals(ids, self.client._ids)
165 self.assertFalse(ids is self.client._ids)
165 self.assertFalse(ids is self.client._ids)
166 ids.remove(ids[-1])
166 ids.remove(ids[-1])
167 self.assertNotEquals(ids, self.client._ids)
167 self.assertNotEquals(ids, self.client._ids)
168
168
169 def test_queue_status(self):
169 def test_queue_status(self):
170 ids = self.client.ids
170 ids = self.client.ids
171 id0 = ids[0]
171 id0 = ids[0]
172 qs = self.client.queue_status(targets=id0)
172 qs = self.client.queue_status(targets=id0)
173 self.assertTrue(isinstance(qs, dict))
173 self.assertTrue(isinstance(qs, dict))
174 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
174 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
175 allqs = self.client.queue_status()
175 allqs = self.client.queue_status()
176 self.assertTrue(isinstance(allqs, dict))
176 self.assertTrue(isinstance(allqs, dict))
177 intkeys = list(allqs.keys())
177 intkeys = list(allqs.keys())
178 intkeys.remove('unassigned')
178 intkeys.remove('unassigned')
179 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
179 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
180 unassigned = allqs.pop('unassigned')
180 unassigned = allqs.pop('unassigned')
181 for eid,qs in allqs.items():
181 for eid,qs in allqs.items():
182 self.assertTrue(isinstance(qs, dict))
182 self.assertTrue(isinstance(qs, dict))
183 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
183 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
184
184
185 def test_shutdown(self):
185 def test_shutdown(self):
186 ids = self.client.ids
186 ids = self.client.ids
187 id0 = ids[0]
187 id0 = ids[0]
188 self.client.shutdown(id0, block=True)
188 self.client.shutdown(id0, block=True)
189 while id0 in self.client.ids:
189 while id0 in self.client.ids:
190 time.sleep(0.1)
190 time.sleep(0.1)
191 self.client.spin()
191 self.client.spin()
192
192
193 self.assertRaises(IndexError, lambda : self.client[id0])
193 self.assertRaises(IndexError, lambda : self.client[id0])
194
194
195 def test_result_status(self):
195 def test_result_status(self):
196 pass
196 pass
197 # to be written
197 # to be written
198
198
199 def test_db_query_dt(self):
199 def test_db_query_dt(self):
200 """test db query by date"""
200 """test db query by date"""
201 hist = self.client.hub_history()
201 hist = self.client.hub_history()
202 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
202 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
203 tic = middle['submitted']
203 tic = middle['submitted']
204 before = self.client.db_query({'submitted' : {'$lt' : tic}})
204 before = self.client.db_query({'submitted' : {'$lt' : tic}})
205 after = self.client.db_query({'submitted' : {'$gte' : tic}})
205 after = self.client.db_query({'submitted' : {'$gte' : tic}})
206 self.assertEquals(len(before)+len(after),len(hist))
206 self.assertEquals(len(before)+len(after),len(hist))
207 for b in before:
207 for b in before:
208 self.assertTrue(b['submitted'] < tic)
208 self.assertTrue(b['submitted'] < tic)
209 for a in after:
209 for a in after:
210 self.assertTrue(a['submitted'] >= tic)
210 self.assertTrue(a['submitted'] >= tic)
211 same = self.client.db_query({'submitted' : tic})
211 same = self.client.db_query({'submitted' : tic})
212 for s in same:
212 for s in same:
213 self.assertTrue(s['submitted'] == tic)
213 self.assertTrue(s['submitted'] == tic)
214
214
215 def test_db_query_keys(self):
215 def test_db_query_keys(self):
216 """test extracting subset of record keys"""
216 """test extracting subset of record keys"""
217 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
217 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
218 for rec in found:
218 for rec in found:
219 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
219 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
220
220
221 def test_db_query_default_keys(self):
221 def test_db_query_default_keys(self):
222 """default db_query excludes buffers"""
222 """default db_query excludes buffers"""
223 found = self.client.db_query({'msg_id': {'$ne' : ''}})
223 found = self.client.db_query({'msg_id': {'$ne' : ''}})
224 for rec in found:
224 for rec in found:
225 keys = set(rec.keys())
225 keys = set(rec.keys())
226 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
226 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
227 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
227 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
228
228
229 def test_db_query_msg_id(self):
229 def test_db_query_msg_id(self):
230 """ensure msg_id is always in db queries"""
230 """ensure msg_id is always in db queries"""
231 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
231 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
232 for rec in found:
232 for rec in found:
233 self.assertTrue('msg_id' in rec.keys())
233 self.assertTrue('msg_id' in rec.keys())
234 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
234 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
235 for rec in found:
235 for rec in found:
236 self.assertTrue('msg_id' in rec.keys())
236 self.assertTrue('msg_id' in rec.keys())
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
238 for rec in found:
238 for rec in found:
239 self.assertTrue('msg_id' in rec.keys())
239 self.assertTrue('msg_id' in rec.keys())
240
240
241 def test_db_query_get_result(self):
241 def test_db_query_get_result(self):
242 """pop in db_query shouldn't pop from result itself"""
242 """pop in db_query shouldn't pop from result itself"""
243 self.client[:].apply_sync(lambda : 1)
243 self.client[:].apply_sync(lambda : 1)
244 found = self.client.db_query({'msg_id': {'$ne' : ''}})
244 found = self.client.db_query({'msg_id': {'$ne' : ''}})
245 rc2 = clientmod.Client(profile='iptest')
245 rc2 = clientmod.Client(profile='iptest')
246 # If this bug is not fixed, this call will hang:
246 # If this bug is not fixed, this call will hang:
247 ar = rc2.get_result(self.client.history[-1])
247 ar = rc2.get_result(self.client.history[-1])
248 ar.wait(2)
248 ar.wait(2)
249 self.assertTrue(ar.ready())
249 self.assertTrue(ar.ready())
250 ar.get()
250 ar.get()
251 rc2.close()
251 rc2.close()
252
252
253 def test_db_query_in(self):
253 def test_db_query_in(self):
254 """test db query with '$in','$nin' operators"""
254 """test db query with '$in','$nin' operators"""
255 hist = self.client.hub_history()
255 hist = self.client.hub_history()
256 even = hist[::2]
256 even = hist[::2]
257 odd = hist[1::2]
257 odd = hist[1::2]
258 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
258 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
259 found = [ r['msg_id'] for r in recs ]
259 found = [ r['msg_id'] for r in recs ]
260 self.assertEquals(set(even), set(found))
260 self.assertEquals(set(even), set(found))
261 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
261 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
262 found = [ r['msg_id'] for r in recs ]
262 found = [ r['msg_id'] for r in recs ]
263 self.assertEquals(set(odd), set(found))
263 self.assertEquals(set(odd), set(found))
264
264
265 def test_hub_history(self):
265 def test_hub_history(self):
266 hist = self.client.hub_history()
266 hist = self.client.hub_history()
267 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
267 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
268 recdict = {}
268 recdict = {}
269 for rec in recs:
269 for rec in recs:
270 recdict[rec['msg_id']] = rec
270 recdict[rec['msg_id']] = rec
271
271
272 latest = datetime(1984,1,1)
272 latest = datetime(1984,1,1)
273 for msg_id in hist:
273 for msg_id in hist:
274 rec = recdict[msg_id]
274 rec = recdict[msg_id]
275 newt = rec['submitted']
275 newt = rec['submitted']
276 self.assertTrue(newt >= latest)
276 self.assertTrue(newt >= latest)
277 latest = newt
277 latest = newt
278 ar = self.client[-1].apply_async(lambda : 1)
278 ar = self.client[-1].apply_async(lambda : 1)
279 ar.get()
279 ar.get()
280 time.sleep(0.25)
280 time.sleep(0.25)
281 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
281 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
282
282
283 def _wait_for_idle(self):
283 def _wait_for_idle(self):
284 """wait for an engine to become idle, according to the Hub"""
284 """wait for an engine to become idle, according to the Hub"""
285 rc = self.client
285 rc = self.client
286
286
287 # timeout 2s, polling every 100ms
287 # timeout 2s, polling every 100ms
288 for i in range(20):
288 for i in range(20):
289 qs = rc.queue_status()
289 qs = rc.queue_status()
290 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
290 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
291 time.sleep(0.1)
291 time.sleep(0.1)
292 else:
292 else:
293 break
293 break
294
294
295 # ensure Hub up to date:
295 # ensure Hub up to date:
296 qs = rc.queue_status()
296 qs = rc.queue_status()
297 self.assertEquals(qs['unassigned'], 0)
297 self.assertEquals(qs['unassigned'], 0)
298 for eid in rc.ids:
298 for eid in rc.ids:
299 self.assertEquals(qs[eid]['tasks'], 0)
299 self.assertEquals(qs[eid]['tasks'], 0)
300
300
301
301
302 def test_resubmit(self):
302 def test_resubmit(self):
303 def f():
303 def f():
304 import random
304 import random
305 return random.random()
305 return random.random()
306 v = self.client.load_balanced_view()
306 v = self.client.load_balanced_view()
307 ar = v.apply_async(f)
307 ar = v.apply_async(f)
308 r1 = ar.get(1)
308 r1 = ar.get(1)
309 # give the Hub a chance to notice:
309 # give the Hub a chance to notice:
310 self._wait_for_idle()
310 self._wait_for_idle()
311 ahr = self.client.resubmit(ar.msg_ids)
311 ahr = self.client.resubmit(ar.msg_ids)
312 r2 = ahr.get(1)
312 r2 = ahr.get(1)
313 self.assertFalse(r1 == r2)
313 self.assertFalse(r1 == r2)
314
314
315 def test_resubmit_chain(self):
316 """resubmit resubmitted tasks"""
317 v = self.client.load_balanced_view()
318 ar = v.apply_async(lambda x: x, 'x'*1024)
319 ar.get()
320 self._wait_for_idle()
321 ars = [ar]
322
323 for i in range(10):
324 ar = ars[-1]
325 ar2 = self.client.resubmit(ar.msg_ids)
326
327 [ ar.get() for ar in ars ]
328
315 def test_resubmit_header(self):
329 def test_resubmit_header(self):
316 """resubmit shouldn't clobber the whole header"""
330 """resubmit shouldn't clobber the whole header"""
317 def f():
331 def f():
318 import random
332 import random
319 return random.random()
333 return random.random()
320 v = self.client.load_balanced_view()
334 v = self.client.load_balanced_view()
321 v.retries = 1
335 v.retries = 1
322 ar = v.apply_async(f)
336 ar = v.apply_async(f)
323 r1 = ar.get(1)
337 r1 = ar.get(1)
324 # give the Hub a chance to notice:
338 # give the Hub a chance to notice:
325 self._wait_for_idle()
339 self._wait_for_idle()
326 ahr = self.client.resubmit(ar.msg_ids)
340 ahr = self.client.resubmit(ar.msg_ids)
327 ahr.get(1)
341 ahr.get(1)
328 time.sleep(0.5)
342 time.sleep(0.5)
329 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
343 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
330 h1,h2 = [ r['header'] for r in records ]
344 h1,h2 = [ r['header'] for r in records ]
331 for key in set(h1.keys()).union(set(h2.keys())):
345 for key in set(h1.keys()).union(set(h2.keys())):
332 if key in ('msg_id', 'date'):
346 if key in ('msg_id', 'date'):
333 self.assertNotEquals(h1[key], h2[key])
347 self.assertNotEquals(h1[key], h2[key])
334 else:
348 else:
335 self.assertEquals(h1[key], h2[key])
349 self.assertEquals(h1[key], h2[key])
336
350
337 def test_resubmit_aborted(self):
351 def test_resubmit_aborted(self):
338 def f():
352 def f():
339 import random
353 import random
340 return random.random()
354 return random.random()
341 v = self.client.load_balanced_view()
355 v = self.client.load_balanced_view()
342 # restrict to one engine, so we can put a sleep
356 # restrict to one engine, so we can put a sleep
343 # ahead of the task, so it will get aborted
357 # ahead of the task, so it will get aborted
344 eid = self.client.ids[-1]
358 eid = self.client.ids[-1]
345 v.targets = [eid]
359 v.targets = [eid]
346 sleep = v.apply_async(time.sleep, 0.5)
360 sleep = v.apply_async(time.sleep, 0.5)
347 ar = v.apply_async(f)
361 ar = v.apply_async(f)
348 ar.abort()
362 ar.abort()
349 self.assertRaises(error.TaskAborted, ar.get)
363 self.assertRaises(error.TaskAborted, ar.get)
350 # Give the Hub a chance to get up to date:
364 # Give the Hub a chance to get up to date:
351 self._wait_for_idle()
365 self._wait_for_idle()
352 ahr = self.client.resubmit(ar.msg_ids)
366 ahr = self.client.resubmit(ar.msg_ids)
353 r2 = ahr.get(1)
367 r2 = ahr.get(1)
354
368
355 def test_resubmit_inflight(self):
369 def test_resubmit_inflight(self):
356 """resubmit of inflight task"""
370 """resubmit of inflight task"""
357 v = self.client.load_balanced_view()
371 v = self.client.load_balanced_view()
358 ar = v.apply_async(time.sleep,1)
372 ar = v.apply_async(time.sleep,1)
359 # give the message a chance to arrive
373 # give the message a chance to arrive
360 time.sleep(0.2)
374 time.sleep(0.2)
361 ahr = self.client.resubmit(ar.msg_ids)
375 ahr = self.client.resubmit(ar.msg_ids)
362 ar.get(2)
376 ar.get(2)
363 ahr.get(2)
377 ahr.get(2)
364
378
365 def test_resubmit_badkey(self):
379 def test_resubmit_badkey(self):
366 """ensure KeyError on resubmit of nonexistant task"""
380 """ensure KeyError on resubmit of nonexistant task"""
367 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
381 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
368
382
369 def test_purge_results(self):
383 def test_purge_results(self):
370 # ensure there are some tasks
384 # ensure there are some tasks
371 for i in range(5):
385 for i in range(5):
372 self.client[:].apply_sync(lambda : 1)
386 self.client[:].apply_sync(lambda : 1)
373 # Wait for the Hub to realise the result is done:
387 # Wait for the Hub to realise the result is done:
374 # This prevents a race condition, where we
388 # This prevents a race condition, where we
375 # might purge a result the Hub still thinks is pending.
389 # might purge a result the Hub still thinks is pending.
376 time.sleep(0.1)
390 time.sleep(0.1)
377 rc2 = clientmod.Client(profile='iptest')
391 rc2 = clientmod.Client(profile='iptest')
378 hist = self.client.hub_history()
392 hist = self.client.hub_history()
379 ahr = rc2.get_result([hist[-1]])
393 ahr = rc2.get_result([hist[-1]])
380 ahr.wait(10)
394 ahr.wait(10)
381 self.client.purge_results(hist[-1])
395 self.client.purge_results(hist[-1])
382 newhist = self.client.hub_history()
396 newhist = self.client.hub_history()
383 self.assertEquals(len(newhist)+1,len(hist))
397 self.assertEquals(len(newhist)+1,len(hist))
384 rc2.spin()
398 rc2.spin()
385 rc2.close()
399 rc2.close()
386
400
387 def test_purge_all_results(self):
401 def test_purge_all_results(self):
388 self.client.purge_results('all')
402 self.client.purge_results('all')
389 hist = self.client.hub_history()
403 hist = self.client.hub_history()
390 self.assertEquals(len(hist), 0)
404 self.assertEquals(len(hist), 0)
391
405
392 def test_spin_thread(self):
406 def test_spin_thread(self):
393 self.client.spin_thread(0.01)
407 self.client.spin_thread(0.01)
394 ar = self.client[-1].apply_async(lambda : 1)
408 ar = self.client[-1].apply_async(lambda : 1)
395 time.sleep(0.1)
409 time.sleep(0.1)
396 self.assertTrue(ar.wall_time < 0.1,
410 self.assertTrue(ar.wall_time < 0.1,
397 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
411 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
398 )
412 )
399
413
400 def test_stop_spin_thread(self):
414 def test_stop_spin_thread(self):
401 self.client.spin_thread(0.01)
415 self.client.spin_thread(0.01)
402 self.client.stop_spin_thread()
416 self.client.stop_spin_thread()
403 ar = self.client[-1].apply_async(lambda : 1)
417 ar = self.client[-1].apply_async(lambda : 1)
404 time.sleep(0.15)
418 time.sleep(0.15)
405 self.assertTrue(ar.wall_time > 0.1,
419 self.assertTrue(ar.wall_time > 0.1,
406 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
420 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
407 )
421 )
408
422
General Comments 0
You need to be logged in to leave comments. Login now