##// END OF EJS Templates
test resubmit of aborted tasks (#1647)
MinRK -
Show More
@@ -1,349 +1,386 b''
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23 from tempfile import mktemp
23 from tempfile import mktemp
24
24
25 import zmq
25 import zmq
26
26
27 from IPython.parallel.client import client as clientmod
27 from IPython.parallel.client import client as clientmod
28 from IPython.parallel import error
28 from IPython.parallel import error
29 from IPython.parallel import AsyncResult, AsyncHubResult
29 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import LoadBalancedView, DirectView
30 from IPython.parallel import LoadBalancedView, DirectView
31
31
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
33
33
34 def setup():
34 def setup():
35 add_engines(4, total=True)
35 add_engines(4, total=True)
36
36
37 class TestClient(ClusterTestCase):
37 class TestClient(ClusterTestCase):
38
38
39 def test_ids(self):
39 def test_ids(self):
40 n = len(self.client.ids)
40 n = len(self.client.ids)
41 self.add_engines(2)
41 self.add_engines(2)
42 self.assertEquals(len(self.client.ids), n+2)
42 self.assertEquals(len(self.client.ids), n+2)
43
43
44 def test_view_indexing(self):
44 def test_view_indexing(self):
45 """test index access for views"""
45 """test index access for views"""
46 self.minimum_engines(4)
46 self.minimum_engines(4)
47 targets = self.client._build_targets('all')[-1]
47 targets = self.client._build_targets('all')[-1]
48 v = self.client[:]
48 v = self.client[:]
49 self.assertEquals(v.targets, targets)
49 self.assertEquals(v.targets, targets)
50 t = self.client.ids[2]
50 t = self.client.ids[2]
51 v = self.client[t]
51 v = self.client[t]
52 self.assert_(isinstance(v, DirectView))
52 self.assert_(isinstance(v, DirectView))
53 self.assertEquals(v.targets, t)
53 self.assertEquals(v.targets, t)
54 t = self.client.ids[2:4]
54 t = self.client.ids[2:4]
55 v = self.client[t]
55 v = self.client[t]
56 self.assert_(isinstance(v, DirectView))
56 self.assert_(isinstance(v, DirectView))
57 self.assertEquals(v.targets, t)
57 self.assertEquals(v.targets, t)
58 v = self.client[::2]
58 v = self.client[::2]
59 self.assert_(isinstance(v, DirectView))
59 self.assert_(isinstance(v, DirectView))
60 self.assertEquals(v.targets, targets[::2])
60 self.assertEquals(v.targets, targets[::2])
61 v = self.client[1::3]
61 v = self.client[1::3]
62 self.assert_(isinstance(v, DirectView))
62 self.assert_(isinstance(v, DirectView))
63 self.assertEquals(v.targets, targets[1::3])
63 self.assertEquals(v.targets, targets[1::3])
64 v = self.client[:-3]
64 v = self.client[:-3]
65 self.assert_(isinstance(v, DirectView))
65 self.assert_(isinstance(v, DirectView))
66 self.assertEquals(v.targets, targets[:-3])
66 self.assertEquals(v.targets, targets[:-3])
67 v = self.client[-1]
67 v = self.client[-1]
68 self.assert_(isinstance(v, DirectView))
68 self.assert_(isinstance(v, DirectView))
69 self.assertEquals(v.targets, targets[-1])
69 self.assertEquals(v.targets, targets[-1])
70 self.assertRaises(TypeError, lambda : self.client[None])
70 self.assertRaises(TypeError, lambda : self.client[None])
71
71
72 def test_lbview_targets(self):
72 def test_lbview_targets(self):
73 """test load_balanced_view targets"""
73 """test load_balanced_view targets"""
74 v = self.client.load_balanced_view()
74 v = self.client.load_balanced_view()
75 self.assertEquals(v.targets, None)
75 self.assertEquals(v.targets, None)
76 v = self.client.load_balanced_view(-1)
76 v = self.client.load_balanced_view(-1)
77 self.assertEquals(v.targets, [self.client.ids[-1]])
77 self.assertEquals(v.targets, [self.client.ids[-1]])
78 v = self.client.load_balanced_view('all')
78 v = self.client.load_balanced_view('all')
79 self.assertEquals(v.targets, None)
79 self.assertEquals(v.targets, None)
80
80
81 def test_dview_targets(self):
81 def test_dview_targets(self):
82 """test direct_view targets"""
82 """test direct_view targets"""
83 v = self.client.direct_view()
83 v = self.client.direct_view()
84 self.assertEquals(v.targets, 'all')
84 self.assertEquals(v.targets, 'all')
85 v = self.client.direct_view('all')
85 v = self.client.direct_view('all')
86 self.assertEquals(v.targets, 'all')
86 self.assertEquals(v.targets, 'all')
87 v = self.client.direct_view(-1)
87 v = self.client.direct_view(-1)
88 self.assertEquals(v.targets, self.client.ids[-1])
88 self.assertEquals(v.targets, self.client.ids[-1])
89
89
90 def test_lazy_all_targets(self):
90 def test_lazy_all_targets(self):
91 """test lazy evaluation of rc.direct_view('all')"""
91 """test lazy evaluation of rc.direct_view('all')"""
92 v = self.client.direct_view()
92 v = self.client.direct_view()
93 self.assertEquals(v.targets, 'all')
93 self.assertEquals(v.targets, 'all')
94
94
95 def double(x):
95 def double(x):
96 return x*2
96 return x*2
97 seq = range(100)
97 seq = range(100)
98 ref = [ double(x) for x in seq ]
98 ref = [ double(x) for x in seq ]
99
99
100 # add some engines, which should be used
100 # add some engines, which should be used
101 self.add_engines(1)
101 self.add_engines(1)
102 n1 = len(self.client.ids)
102 n1 = len(self.client.ids)
103
103
104 # simple apply
104 # simple apply
105 r = v.apply_sync(lambda : 1)
105 r = v.apply_sync(lambda : 1)
106 self.assertEquals(r, [1] * n1)
106 self.assertEquals(r, [1] * n1)
107
107
108 # map goes through remotefunction
108 # map goes through remotefunction
109 r = v.map_sync(double, seq)
109 r = v.map_sync(double, seq)
110 self.assertEquals(r, ref)
110 self.assertEquals(r, ref)
111
111
112 # add a couple more engines, and try again
112 # add a couple more engines, and try again
113 self.add_engines(2)
113 self.add_engines(2)
114 n2 = len(self.client.ids)
114 n2 = len(self.client.ids)
115 self.assertNotEquals(n2, n1)
115 self.assertNotEquals(n2, n1)
116
116
117 # apply
117 # apply
118 r = v.apply_sync(lambda : 1)
118 r = v.apply_sync(lambda : 1)
119 self.assertEquals(r, [1] * n2)
119 self.assertEquals(r, [1] * n2)
120
120
121 # map
121 # map
122 r = v.map_sync(double, seq)
122 r = v.map_sync(double, seq)
123 self.assertEquals(r, ref)
123 self.assertEquals(r, ref)
124
124
125 def test_targets(self):
125 def test_targets(self):
126 """test various valid targets arguments"""
126 """test various valid targets arguments"""
127 build = self.client._build_targets
127 build = self.client._build_targets
128 ids = self.client.ids
128 ids = self.client.ids
129 idents,targets = build(None)
129 idents,targets = build(None)
130 self.assertEquals(ids, targets)
130 self.assertEquals(ids, targets)
131
131
132 def test_clear(self):
132 def test_clear(self):
133 """test clear behavior"""
133 """test clear behavior"""
134 self.minimum_engines(2)
134 self.minimum_engines(2)
135 v = self.client[:]
135 v = self.client[:]
136 v.block=True
136 v.block=True
137 v.push(dict(a=5))
137 v.push(dict(a=5))
138 v.pull('a')
138 v.pull('a')
139 id0 = self.client.ids[-1]
139 id0 = self.client.ids[-1]
140 self.client.clear(targets=id0, block=True)
140 self.client.clear(targets=id0, block=True)
141 a = self.client[:-1].get('a')
141 a = self.client[:-1].get('a')
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.client.clear(block=True)
143 self.client.clear(block=True)
144 for i in self.client.ids:
144 for i in self.client.ids:
145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146
146
147 def test_get_result(self):
147 def test_get_result(self):
148 """test getting results from the Hub."""
148 """test getting results from the Hub."""
149 c = clientmod.Client(profile='iptest')
149 c = clientmod.Client(profile='iptest')
150 t = c.ids[-1]
150 t = c.ids[-1]
151 ar = c[t].apply_async(wait, 1)
151 ar = c[t].apply_async(wait, 1)
152 # give the monitor time to notice the message
152 # give the monitor time to notice the message
153 time.sleep(.25)
153 time.sleep(.25)
154 ahr = self.client.get_result(ar.msg_ids)
154 ahr = self.client.get_result(ar.msg_ids)
155 self.assertTrue(isinstance(ahr, AsyncHubResult))
155 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertEquals(ahr.get(), ar.get())
156 self.assertEquals(ahr.get(), ar.get())
157 ar2 = self.client.get_result(ar.msg_ids)
157 ar2 = self.client.get_result(ar.msg_ids)
158 self.assertFalse(isinstance(ar2, AsyncHubResult))
158 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 c.close()
159 c.close()
160
160
161 def test_ids_list(self):
161 def test_ids_list(self):
162 """test client.ids"""
162 """test client.ids"""
163 ids = self.client.ids
163 ids = self.client.ids
164 self.assertEquals(ids, self.client._ids)
164 self.assertEquals(ids, self.client._ids)
165 self.assertFalse(ids is self.client._ids)
165 self.assertFalse(ids is self.client._ids)
166 ids.remove(ids[-1])
166 ids.remove(ids[-1])
167 self.assertNotEquals(ids, self.client._ids)
167 self.assertNotEquals(ids, self.client._ids)
168
168
169 def test_queue_status(self):
169 def test_queue_status(self):
170 ids = self.client.ids
170 ids = self.client.ids
171 id0 = ids[0]
171 id0 = ids[0]
172 qs = self.client.queue_status(targets=id0)
172 qs = self.client.queue_status(targets=id0)
173 self.assertTrue(isinstance(qs, dict))
173 self.assertTrue(isinstance(qs, dict))
174 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
174 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
175 allqs = self.client.queue_status()
175 allqs = self.client.queue_status()
176 self.assertTrue(isinstance(allqs, dict))
176 self.assertTrue(isinstance(allqs, dict))
177 intkeys = list(allqs.keys())
177 intkeys = list(allqs.keys())
178 intkeys.remove('unassigned')
178 intkeys.remove('unassigned')
179 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
179 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
180 unassigned = allqs.pop('unassigned')
180 unassigned = allqs.pop('unassigned')
181 for eid,qs in allqs.items():
181 for eid,qs in allqs.items():
182 self.assertTrue(isinstance(qs, dict))
182 self.assertTrue(isinstance(qs, dict))
183 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
183 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
184
184
185 def test_shutdown(self):
185 def test_shutdown(self):
186 ids = self.client.ids
186 ids = self.client.ids
187 id0 = ids[0]
187 id0 = ids[0]
188 self.client.shutdown(id0, block=True)
188 self.client.shutdown(id0, block=True)
189 while id0 in self.client.ids:
189 while id0 in self.client.ids:
190 time.sleep(0.1)
190 time.sleep(0.1)
191 self.client.spin()
191 self.client.spin()
192
192
193 self.assertRaises(IndexError, lambda : self.client[id0])
193 self.assertRaises(IndexError, lambda : self.client[id0])
194
194
195 def test_result_status(self):
195 def test_result_status(self):
196 pass
196 pass
197 # to be written
197 # to be written
198
198
199 def test_db_query_dt(self):
199 def test_db_query_dt(self):
200 """test db query by date"""
200 """test db query by date"""
201 hist = self.client.hub_history()
201 hist = self.client.hub_history()
202 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
202 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
203 tic = middle['submitted']
203 tic = middle['submitted']
204 before = self.client.db_query({'submitted' : {'$lt' : tic}})
204 before = self.client.db_query({'submitted' : {'$lt' : tic}})
205 after = self.client.db_query({'submitted' : {'$gte' : tic}})
205 after = self.client.db_query({'submitted' : {'$gte' : tic}})
206 self.assertEquals(len(before)+len(after),len(hist))
206 self.assertEquals(len(before)+len(after),len(hist))
207 for b in before:
207 for b in before:
208 self.assertTrue(b['submitted'] < tic)
208 self.assertTrue(b['submitted'] < tic)
209 for a in after:
209 for a in after:
210 self.assertTrue(a['submitted'] >= tic)
210 self.assertTrue(a['submitted'] >= tic)
211 same = self.client.db_query({'submitted' : tic})
211 same = self.client.db_query({'submitted' : tic})
212 for s in same:
212 for s in same:
213 self.assertTrue(s['submitted'] == tic)
213 self.assertTrue(s['submitted'] == tic)
214
214
215 def test_db_query_keys(self):
215 def test_db_query_keys(self):
216 """test extracting subset of record keys"""
216 """test extracting subset of record keys"""
217 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
217 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
218 for rec in found:
218 for rec in found:
219 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
219 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
220
220
221 def test_db_query_default_keys(self):
221 def test_db_query_default_keys(self):
222 """default db_query excludes buffers"""
222 """default db_query excludes buffers"""
223 found = self.client.db_query({'msg_id': {'$ne' : ''}})
223 found = self.client.db_query({'msg_id': {'$ne' : ''}})
224 for rec in found:
224 for rec in found:
225 keys = set(rec.keys())
225 keys = set(rec.keys())
226 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
226 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
227 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
227 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
228
228
229 def test_db_query_msg_id(self):
229 def test_db_query_msg_id(self):
230 """ensure msg_id is always in db queries"""
230 """ensure msg_id is always in db queries"""
231 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
231 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
232 for rec in found:
232 for rec in found:
233 self.assertTrue('msg_id' in rec.keys())
233 self.assertTrue('msg_id' in rec.keys())
234 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
234 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
235 for rec in found:
235 for rec in found:
236 self.assertTrue('msg_id' in rec.keys())
236 self.assertTrue('msg_id' in rec.keys())
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
238 for rec in found:
238 for rec in found:
239 self.assertTrue('msg_id' in rec.keys())
239 self.assertTrue('msg_id' in rec.keys())
240
240
241 def test_db_query_get_result(self):
241 def test_db_query_get_result(self):
242 """pop in db_query shouldn't pop from result itself"""
242 """pop in db_query shouldn't pop from result itself"""
243 self.client[:].apply_sync(lambda : 1)
243 self.client[:].apply_sync(lambda : 1)
244 found = self.client.db_query({'msg_id': {'$ne' : ''}})
244 found = self.client.db_query({'msg_id': {'$ne' : ''}})
245 rc2 = clientmod.Client(profile='iptest')
245 rc2 = clientmod.Client(profile='iptest')
246 # If this bug is not fixed, this call will hang:
246 # If this bug is not fixed, this call will hang:
247 ar = rc2.get_result(self.client.history[-1])
247 ar = rc2.get_result(self.client.history[-1])
248 ar.wait(2)
248 ar.wait(2)
249 self.assertTrue(ar.ready())
249 self.assertTrue(ar.ready())
250 ar.get()
250 ar.get()
251 rc2.close()
251 rc2.close()
252
252
253 def test_db_query_in(self):
253 def test_db_query_in(self):
254 """test db query with '$in','$nin' operators"""
254 """test db query with '$in','$nin' operators"""
255 hist = self.client.hub_history()
255 hist = self.client.hub_history()
256 even = hist[::2]
256 even = hist[::2]
257 odd = hist[1::2]
257 odd = hist[1::2]
258 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
258 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
259 found = [ r['msg_id'] for r in recs ]
259 found = [ r['msg_id'] for r in recs ]
260 self.assertEquals(set(even), set(found))
260 self.assertEquals(set(even), set(found))
261 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
261 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
262 found = [ r['msg_id'] for r in recs ]
262 found = [ r['msg_id'] for r in recs ]
263 self.assertEquals(set(odd), set(found))
263 self.assertEquals(set(odd), set(found))
264
264
265 def test_hub_history(self):
265 def test_hub_history(self):
266 hist = self.client.hub_history()
266 hist = self.client.hub_history()
267 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
267 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
268 recdict = {}
268 recdict = {}
269 for rec in recs:
269 for rec in recs:
270 recdict[rec['msg_id']] = rec
270 recdict[rec['msg_id']] = rec
271
271
272 latest = datetime(1984,1,1)
272 latest = datetime(1984,1,1)
273 for msg_id in hist:
273 for msg_id in hist:
274 rec = recdict[msg_id]
274 rec = recdict[msg_id]
275 newt = rec['submitted']
275 newt = rec['submitted']
276 self.assertTrue(newt >= latest)
276 self.assertTrue(newt >= latest)
277 latest = newt
277 latest = newt
278 ar = self.client[-1].apply_async(lambda : 1)
278 ar = self.client[-1].apply_async(lambda : 1)
279 ar.get()
279 ar.get()
280 time.sleep(0.25)
280 time.sleep(0.25)
281 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
281 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
282
282
283 def _wait_for_idle(self):
284 """wait for an engine to become idle, according to the Hub"""
285 rc = self.client
286
287 # timeout 2s, polling every 100ms
288 for i in range(20):
289 qs = rc.queue_status()
290 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
291 time.sleep(0.1)
292 else:
293 break
294
295 # ensure Hub up to date:
296 qs = rc.queue_status()
297 self.assertEquals(qs['unassigned'], 0)
298 for eid in rc.ids:
299 self.assertEquals(qs[eid]['tasks'], 0)
300
301
283 def test_resubmit(self):
302 def test_resubmit(self):
284 def f():
303 def f():
285 import random
304 import random
286 return random.random()
305 return random.random()
287 v = self.client.load_balanced_view()
306 v = self.client.load_balanced_view()
288 ar = v.apply_async(f)
307 ar = v.apply_async(f)
289 r1 = ar.get(1)
308 r1 = ar.get(1)
290 # give the Hub a chance to notice:
309 # give the Hub a chance to notice:
291 time.sleep(0.5)
310 self._wait_for_idle()
292 ahr = self.client.resubmit(ar.msg_ids)
311 ahr = self.client.resubmit(ar.msg_ids)
293 r2 = ahr.get(1)
312 r2 = ahr.get(1)
294 self.assertFalse(r1 == r2)
313 self.assertFalse(r1 == r2)
295
314
315 def test_resubmit_aborted(self):
316 def f():
317 import random
318 return random.random()
319 v = self.client.load_balanced_view()
320 # restrict to one engine, so we can put a sleep
321 # ahead of the task, so it will get aborted
322 eid = self.client.ids[-1]
323 v.targets = [eid]
324 sleep = v.apply_async(time.sleep, 0.5)
325 ar = v.apply_async(f)
326 ar.abort()
327 self.assertRaises(error.TaskAborted, ar.get)
328 # Give the Hub a chance to get up to date:
329 self._wait_for_idle()
330 ahr = self.client.resubmit(ar.msg_ids)
331 r2 = ahr.get(1)
332
296 def test_resubmit_inflight(self):
333 def test_resubmit_inflight(self):
297 """ensure ValueError on resubmit of inflight task"""
334 """ensure ValueError on resubmit of inflight task"""
298 v = self.client.load_balanced_view()
335 v = self.client.load_balanced_view()
299 ar = v.apply_async(time.sleep,1)
336 ar = v.apply_async(time.sleep,1)
300 # give the message a chance to arrive
337 # give the message a chance to arrive
301 time.sleep(0.2)
338 time.sleep(0.2)
302 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
339 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
303 ar.get(2)
340 ar.get(2)
304
341
305 def test_resubmit_badkey(self):
342 def test_resubmit_badkey(self):
306 """ensure KeyError on resubmit of nonexistant task"""
343 """ensure KeyError on resubmit of nonexistant task"""
307 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
344 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
308
345
309 def test_purge_results(self):
346 def test_purge_results(self):
310 # ensure there are some tasks
347 # ensure there are some tasks
311 for i in range(5):
348 for i in range(5):
312 self.client[:].apply_sync(lambda : 1)
349 self.client[:].apply_sync(lambda : 1)
313 # Wait for the Hub to realise the result is done:
350 # Wait for the Hub to realise the result is done:
314 # This prevents a race condition, where we
351 # This prevents a race condition, where we
315 # might purge a result the Hub still thinks is pending.
352 # might purge a result the Hub still thinks is pending.
316 time.sleep(0.1)
353 time.sleep(0.1)
317 rc2 = clientmod.Client(profile='iptest')
354 rc2 = clientmod.Client(profile='iptest')
318 hist = self.client.hub_history()
355 hist = self.client.hub_history()
319 ahr = rc2.get_result([hist[-1]])
356 ahr = rc2.get_result([hist[-1]])
320 ahr.wait(10)
357 ahr.wait(10)
321 self.client.purge_results(hist[-1])
358 self.client.purge_results(hist[-1])
322 newhist = self.client.hub_history()
359 newhist = self.client.hub_history()
323 self.assertEquals(len(newhist)+1,len(hist))
360 self.assertEquals(len(newhist)+1,len(hist))
324 rc2.spin()
361 rc2.spin()
325 rc2.close()
362 rc2.close()
326
363
327 def test_purge_all_results(self):
364 def test_purge_all_results(self):
328 self.client.purge_results('all')
365 self.client.purge_results('all')
329 hist = self.client.hub_history()
366 hist = self.client.hub_history()
330 self.assertEquals(len(hist), 0)
367 self.assertEquals(len(hist), 0)
331
368
332 def test_spin_thread(self):
369 def test_spin_thread(self):
333 self.client.spin_thread(0.01)
370 self.client.spin_thread(0.01)
334 ar = self.client[-1].apply_async(lambda : 1)
371 ar = self.client[-1].apply_async(lambda : 1)
335 time.sleep(0.1)
372 time.sleep(0.1)
336 self.assertTrue(ar.wall_time < 0.1,
373 self.assertTrue(ar.wall_time < 0.1,
337 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
374 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
338 )
375 )
339
376
340 def test_stop_spin_thread(self):
377 def test_stop_spin_thread(self):
341 self.client.spin_thread(0.01)
378 self.client.spin_thread(0.01)
342 self.client.stop_spin_thread()
379 self.client.stop_spin_thread()
343 ar = self.client[-1].apply_async(lambda : 1)
380 ar = self.client[-1].apply_async(lambda : 1)
344 time.sleep(0.15)
381 time.sleep(0.15)
345 self.assertTrue(ar.wall_time > 0.1,
382 self.assertTrue(ar.wall_time > 0.1,
346 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
383 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
347 )
384 )
348
385
349
386
General Comments 0
You need to be logged in to leave comments. Login now