##// END OF EJS Templates
minor tweak to wait_for_idle in tests
MinRK -
Show More
@@ -1,436 +1,436 b''
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23 from tempfile import mktemp
23 from tempfile import mktemp
24
24
25 import zmq
25 import zmq
26
26
27 from IPython import parallel
27 from IPython import parallel
28 from IPython.parallel.client import client as clientmod
28 from IPython.parallel.client import client as clientmod
29 from IPython.parallel import error
29 from IPython.parallel import error
30 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import AsyncResult, AsyncHubResult
31 from IPython.parallel import LoadBalancedView, DirectView
31 from IPython.parallel import LoadBalancedView, DirectView
32
32
33 from clienttest import ClusterTestCase, segfault, wait, add_engines
33 from clienttest import ClusterTestCase, segfault, wait, add_engines
34
34
35 def setup():
35 def setup():
36 add_engines(4, total=True)
36 add_engines(4, total=True)
37
37
38 class TestClient(ClusterTestCase):
38 class TestClient(ClusterTestCase):
39
39
40 def test_ids(self):
40 def test_ids(self):
41 n = len(self.client.ids)
41 n = len(self.client.ids)
42 self.add_engines(2)
42 self.add_engines(2)
43 self.assertEquals(len(self.client.ids), n+2)
43 self.assertEquals(len(self.client.ids), n+2)
44
44
45 def test_view_indexing(self):
45 def test_view_indexing(self):
46 """test index access for views"""
46 """test index access for views"""
47 self.minimum_engines(4)
47 self.minimum_engines(4)
48 targets = self.client._build_targets('all')[-1]
48 targets = self.client._build_targets('all')[-1]
49 v = self.client[:]
49 v = self.client[:]
50 self.assertEquals(v.targets, targets)
50 self.assertEquals(v.targets, targets)
51 t = self.client.ids[2]
51 t = self.client.ids[2]
52 v = self.client[t]
52 v = self.client[t]
53 self.assert_(isinstance(v, DirectView))
53 self.assert_(isinstance(v, DirectView))
54 self.assertEquals(v.targets, t)
54 self.assertEquals(v.targets, t)
55 t = self.client.ids[2:4]
55 t = self.client.ids[2:4]
56 v = self.client[t]
56 v = self.client[t]
57 self.assert_(isinstance(v, DirectView))
57 self.assert_(isinstance(v, DirectView))
58 self.assertEquals(v.targets, t)
58 self.assertEquals(v.targets, t)
59 v = self.client[::2]
59 v = self.client[::2]
60 self.assert_(isinstance(v, DirectView))
60 self.assert_(isinstance(v, DirectView))
61 self.assertEquals(v.targets, targets[::2])
61 self.assertEquals(v.targets, targets[::2])
62 v = self.client[1::3]
62 v = self.client[1::3]
63 self.assert_(isinstance(v, DirectView))
63 self.assert_(isinstance(v, DirectView))
64 self.assertEquals(v.targets, targets[1::3])
64 self.assertEquals(v.targets, targets[1::3])
65 v = self.client[:-3]
65 v = self.client[:-3]
66 self.assert_(isinstance(v, DirectView))
66 self.assert_(isinstance(v, DirectView))
67 self.assertEquals(v.targets, targets[:-3])
67 self.assertEquals(v.targets, targets[:-3])
68 v = self.client[-1]
68 v = self.client[-1]
69 self.assert_(isinstance(v, DirectView))
69 self.assert_(isinstance(v, DirectView))
70 self.assertEquals(v.targets, targets[-1])
70 self.assertEquals(v.targets, targets[-1])
71 self.assertRaises(TypeError, lambda : self.client[None])
71 self.assertRaises(TypeError, lambda : self.client[None])
72
72
73 def test_lbview_targets(self):
73 def test_lbview_targets(self):
74 """test load_balanced_view targets"""
74 """test load_balanced_view targets"""
75 v = self.client.load_balanced_view()
75 v = self.client.load_balanced_view()
76 self.assertEquals(v.targets, None)
76 self.assertEquals(v.targets, None)
77 v = self.client.load_balanced_view(-1)
77 v = self.client.load_balanced_view(-1)
78 self.assertEquals(v.targets, [self.client.ids[-1]])
78 self.assertEquals(v.targets, [self.client.ids[-1]])
79 v = self.client.load_balanced_view('all')
79 v = self.client.load_balanced_view('all')
80 self.assertEquals(v.targets, None)
80 self.assertEquals(v.targets, None)
81
81
82 def test_dview_targets(self):
82 def test_dview_targets(self):
83 """test direct_view targets"""
83 """test direct_view targets"""
84 v = self.client.direct_view()
84 v = self.client.direct_view()
85 self.assertEquals(v.targets, 'all')
85 self.assertEquals(v.targets, 'all')
86 v = self.client.direct_view('all')
86 v = self.client.direct_view('all')
87 self.assertEquals(v.targets, 'all')
87 self.assertEquals(v.targets, 'all')
88 v = self.client.direct_view(-1)
88 v = self.client.direct_view(-1)
89 self.assertEquals(v.targets, self.client.ids[-1])
89 self.assertEquals(v.targets, self.client.ids[-1])
90
90
91 def test_lazy_all_targets(self):
91 def test_lazy_all_targets(self):
92 """test lazy evaluation of rc.direct_view('all')"""
92 """test lazy evaluation of rc.direct_view('all')"""
93 v = self.client.direct_view()
93 v = self.client.direct_view()
94 self.assertEquals(v.targets, 'all')
94 self.assertEquals(v.targets, 'all')
95
95
96 def double(x):
96 def double(x):
97 return x*2
97 return x*2
98 seq = range(100)
98 seq = range(100)
99 ref = [ double(x) for x in seq ]
99 ref = [ double(x) for x in seq ]
100
100
101 # add some engines, which should be used
101 # add some engines, which should be used
102 self.add_engines(1)
102 self.add_engines(1)
103 n1 = len(self.client.ids)
103 n1 = len(self.client.ids)
104
104
105 # simple apply
105 # simple apply
106 r = v.apply_sync(lambda : 1)
106 r = v.apply_sync(lambda : 1)
107 self.assertEquals(r, [1] * n1)
107 self.assertEquals(r, [1] * n1)
108
108
109 # map goes through remotefunction
109 # map goes through remotefunction
110 r = v.map_sync(double, seq)
110 r = v.map_sync(double, seq)
111 self.assertEquals(r, ref)
111 self.assertEquals(r, ref)
112
112
113 # add a couple more engines, and try again
113 # add a couple more engines, and try again
114 self.add_engines(2)
114 self.add_engines(2)
115 n2 = len(self.client.ids)
115 n2 = len(self.client.ids)
116 self.assertNotEquals(n2, n1)
116 self.assertNotEquals(n2, n1)
117
117
118 # apply
118 # apply
119 r = v.apply_sync(lambda : 1)
119 r = v.apply_sync(lambda : 1)
120 self.assertEquals(r, [1] * n2)
120 self.assertEquals(r, [1] * n2)
121
121
122 # map
122 # map
123 r = v.map_sync(double, seq)
123 r = v.map_sync(double, seq)
124 self.assertEquals(r, ref)
124 self.assertEquals(r, ref)
125
125
126 def test_targets(self):
126 def test_targets(self):
127 """test various valid targets arguments"""
127 """test various valid targets arguments"""
128 build = self.client._build_targets
128 build = self.client._build_targets
129 ids = self.client.ids
129 ids = self.client.ids
130 idents,targets = build(None)
130 idents,targets = build(None)
131 self.assertEquals(ids, targets)
131 self.assertEquals(ids, targets)
132
132
133 def test_clear(self):
133 def test_clear(self):
134 """test clear behavior"""
134 """test clear behavior"""
135 self.minimum_engines(2)
135 self.minimum_engines(2)
136 v = self.client[:]
136 v = self.client[:]
137 v.block=True
137 v.block=True
138 v.push(dict(a=5))
138 v.push(dict(a=5))
139 v.pull('a')
139 v.pull('a')
140 id0 = self.client.ids[-1]
140 id0 = self.client.ids[-1]
141 self.client.clear(targets=id0, block=True)
141 self.client.clear(targets=id0, block=True)
142 a = self.client[:-1].get('a')
142 a = self.client[:-1].get('a')
143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
144 self.client.clear(block=True)
144 self.client.clear(block=True)
145 for i in self.client.ids:
145 for i in self.client.ids:
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147
147
148 def test_get_result(self):
148 def test_get_result(self):
149 """test getting results from the Hub."""
149 """test getting results from the Hub."""
150 c = clientmod.Client(profile='iptest')
150 c = clientmod.Client(profile='iptest')
151 t = c.ids[-1]
151 t = c.ids[-1]
152 ar = c[t].apply_async(wait, 1)
152 ar = c[t].apply_async(wait, 1)
153 # give the monitor time to notice the message
153 # give the monitor time to notice the message
154 time.sleep(.25)
154 time.sleep(.25)
155 ahr = self.client.get_result(ar.msg_ids)
155 ahr = self.client.get_result(ar.msg_ids)
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 self.assertEquals(ahr.get(), ar.get())
157 self.assertEquals(ahr.get(), ar.get())
158 ar2 = self.client.get_result(ar.msg_ids)
158 ar2 = self.client.get_result(ar.msg_ids)
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 c.close()
160 c.close()
161
161
162 def test_ids_list(self):
162 def test_ids_list(self):
163 """test client.ids"""
163 """test client.ids"""
164 ids = self.client.ids
164 ids = self.client.ids
165 self.assertEquals(ids, self.client._ids)
165 self.assertEquals(ids, self.client._ids)
166 self.assertFalse(ids is self.client._ids)
166 self.assertFalse(ids is self.client._ids)
167 ids.remove(ids[-1])
167 ids.remove(ids[-1])
168 self.assertNotEquals(ids, self.client._ids)
168 self.assertNotEquals(ids, self.client._ids)
169
169
170 def test_queue_status(self):
170 def test_queue_status(self):
171 ids = self.client.ids
171 ids = self.client.ids
172 id0 = ids[0]
172 id0 = ids[0]
173 qs = self.client.queue_status(targets=id0)
173 qs = self.client.queue_status(targets=id0)
174 self.assertTrue(isinstance(qs, dict))
174 self.assertTrue(isinstance(qs, dict))
175 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
175 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
176 allqs = self.client.queue_status()
176 allqs = self.client.queue_status()
177 self.assertTrue(isinstance(allqs, dict))
177 self.assertTrue(isinstance(allqs, dict))
178 intkeys = list(allqs.keys())
178 intkeys = list(allqs.keys())
179 intkeys.remove('unassigned')
179 intkeys.remove('unassigned')
180 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
180 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
181 unassigned = allqs.pop('unassigned')
181 unassigned = allqs.pop('unassigned')
182 for eid,qs in allqs.items():
182 for eid,qs in allqs.items():
183 self.assertTrue(isinstance(qs, dict))
183 self.assertTrue(isinstance(qs, dict))
184 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
184 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
185
185
186 def test_shutdown(self):
186 def test_shutdown(self):
187 ids = self.client.ids
187 ids = self.client.ids
188 id0 = ids[0]
188 id0 = ids[0]
189 self.client.shutdown(id0, block=True)
189 self.client.shutdown(id0, block=True)
190 while id0 in self.client.ids:
190 while id0 in self.client.ids:
191 time.sleep(0.1)
191 time.sleep(0.1)
192 self.client.spin()
192 self.client.spin()
193
193
194 self.assertRaises(IndexError, lambda : self.client[id0])
194 self.assertRaises(IndexError, lambda : self.client[id0])
195
195
196 def test_result_status(self):
196 def test_result_status(self):
197 pass
197 pass
198 # to be written
198 # to be written
199
199
200 def test_db_query_dt(self):
200 def test_db_query_dt(self):
201 """test db query by date"""
201 """test db query by date"""
202 hist = self.client.hub_history()
202 hist = self.client.hub_history()
203 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
203 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
204 tic = middle['submitted']
204 tic = middle['submitted']
205 before = self.client.db_query({'submitted' : {'$lt' : tic}})
205 before = self.client.db_query({'submitted' : {'$lt' : tic}})
206 after = self.client.db_query({'submitted' : {'$gte' : tic}})
206 after = self.client.db_query({'submitted' : {'$gte' : tic}})
207 self.assertEquals(len(before)+len(after),len(hist))
207 self.assertEquals(len(before)+len(after),len(hist))
208 for b in before:
208 for b in before:
209 self.assertTrue(b['submitted'] < tic)
209 self.assertTrue(b['submitted'] < tic)
210 for a in after:
210 for a in after:
211 self.assertTrue(a['submitted'] >= tic)
211 self.assertTrue(a['submitted'] >= tic)
212 same = self.client.db_query({'submitted' : tic})
212 same = self.client.db_query({'submitted' : tic})
213 for s in same:
213 for s in same:
214 self.assertTrue(s['submitted'] == tic)
214 self.assertTrue(s['submitted'] == tic)
215
215
216 def test_db_query_keys(self):
216 def test_db_query_keys(self):
217 """test extracting subset of record keys"""
217 """test extracting subset of record keys"""
218 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
218 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
219 for rec in found:
219 for rec in found:
220 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
220 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
221
221
222 def test_db_query_default_keys(self):
222 def test_db_query_default_keys(self):
223 """default db_query excludes buffers"""
223 """default db_query excludes buffers"""
224 found = self.client.db_query({'msg_id': {'$ne' : ''}})
224 found = self.client.db_query({'msg_id': {'$ne' : ''}})
225 for rec in found:
225 for rec in found:
226 keys = set(rec.keys())
226 keys = set(rec.keys())
227 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
227 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
228 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
228 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
229
229
230 def test_db_query_msg_id(self):
230 def test_db_query_msg_id(self):
231 """ensure msg_id is always in db queries"""
231 """ensure msg_id is always in db queries"""
232 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
232 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
233 for rec in found:
233 for rec in found:
234 self.assertTrue('msg_id' in rec.keys())
234 self.assertTrue('msg_id' in rec.keys())
235 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
235 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
236 for rec in found:
236 for rec in found:
237 self.assertTrue('msg_id' in rec.keys())
237 self.assertTrue('msg_id' in rec.keys())
238 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
238 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
239 for rec in found:
239 for rec in found:
240 self.assertTrue('msg_id' in rec.keys())
240 self.assertTrue('msg_id' in rec.keys())
241
241
242 def test_db_query_get_result(self):
242 def test_db_query_get_result(self):
243 """pop in db_query shouldn't pop from result itself"""
243 """pop in db_query shouldn't pop from result itself"""
244 self.client[:].apply_sync(lambda : 1)
244 self.client[:].apply_sync(lambda : 1)
245 found = self.client.db_query({'msg_id': {'$ne' : ''}})
245 found = self.client.db_query({'msg_id': {'$ne' : ''}})
246 rc2 = clientmod.Client(profile='iptest')
246 rc2 = clientmod.Client(profile='iptest')
247 # If this bug is not fixed, this call will hang:
247 # If this bug is not fixed, this call will hang:
248 ar = rc2.get_result(self.client.history[-1])
248 ar = rc2.get_result(self.client.history[-1])
249 ar.wait(2)
249 ar.wait(2)
250 self.assertTrue(ar.ready())
250 self.assertTrue(ar.ready())
251 ar.get()
251 ar.get()
252 rc2.close()
252 rc2.close()
253
253
254 def test_db_query_in(self):
254 def test_db_query_in(self):
255 """test db query with '$in','$nin' operators"""
255 """test db query with '$in','$nin' operators"""
256 hist = self.client.hub_history()
256 hist = self.client.hub_history()
257 even = hist[::2]
257 even = hist[::2]
258 odd = hist[1::2]
258 odd = hist[1::2]
259 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
259 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
260 found = [ r['msg_id'] for r in recs ]
260 found = [ r['msg_id'] for r in recs ]
261 self.assertEquals(set(even), set(found))
261 self.assertEquals(set(even), set(found))
262 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
262 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
263 found = [ r['msg_id'] for r in recs ]
263 found = [ r['msg_id'] for r in recs ]
264 self.assertEquals(set(odd), set(found))
264 self.assertEquals(set(odd), set(found))
265
265
266 def test_hub_history(self):
266 def test_hub_history(self):
267 hist = self.client.hub_history()
267 hist = self.client.hub_history()
268 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
268 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
269 recdict = {}
269 recdict = {}
270 for rec in recs:
270 for rec in recs:
271 recdict[rec['msg_id']] = rec
271 recdict[rec['msg_id']] = rec
272
272
273 latest = datetime(1984,1,1)
273 latest = datetime(1984,1,1)
274 for msg_id in hist:
274 for msg_id in hist:
275 rec = recdict[msg_id]
275 rec = recdict[msg_id]
276 newt = rec['submitted']
276 newt = rec['submitted']
277 self.assertTrue(newt >= latest)
277 self.assertTrue(newt >= latest)
278 latest = newt
278 latest = newt
279 ar = self.client[-1].apply_async(lambda : 1)
279 ar = self.client[-1].apply_async(lambda : 1)
280 ar.get()
280 ar.get()
281 time.sleep(0.25)
281 time.sleep(0.25)
282 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
282 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
283
283
284 def _wait_for_idle(self):
284 def _wait_for_idle(self):
285 """wait for an engine to become idle, according to the Hub"""
285 """wait for an engine to become idle, according to the Hub"""
286 rc = self.client
286 rc = self.client
287
287
288 # timeout 2s, polling every 100ms
288 # timeout 5s, polling every 100ms
289 for i in range(20):
289 qs = rc.queue_status()
290 qs = rc.queue_status()
290 for i in range(50):
291 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
291 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
292 time.sleep(0.1)
292 time.sleep(0.1)
293 qs = rc.queue_status()
293 else:
294 else:
294 break
295 break
295
296
296 # ensure Hub up to date:
297 # ensure Hub up to date:
297 qs = rc.queue_status()
298 self.assertEquals(qs['unassigned'], 0)
298 self.assertEquals(qs['unassigned'], 0)
299 for eid in rc.ids:
299 for eid in rc.ids:
300 self.assertEquals(qs[eid]['tasks'], 0)
300 self.assertEquals(qs[eid]['tasks'], 0)
301
301
302
302
303 def test_resubmit(self):
303 def test_resubmit(self):
304 def f():
304 def f():
305 import random
305 import random
306 return random.random()
306 return random.random()
307 v = self.client.load_balanced_view()
307 v = self.client.load_balanced_view()
308 ar = v.apply_async(f)
308 ar = v.apply_async(f)
309 r1 = ar.get(1)
309 r1 = ar.get(1)
310 # give the Hub a chance to notice:
310 # give the Hub a chance to notice:
311 self._wait_for_idle()
311 self._wait_for_idle()
312 ahr = self.client.resubmit(ar.msg_ids)
312 ahr = self.client.resubmit(ar.msg_ids)
313 r2 = ahr.get(1)
313 r2 = ahr.get(1)
314 self.assertFalse(r1 == r2)
314 self.assertFalse(r1 == r2)
315
315
316 def test_resubmit_chain(self):
316 def test_resubmit_chain(self):
317 """resubmit resubmitted tasks"""
317 """resubmit resubmitted tasks"""
318 v = self.client.load_balanced_view()
318 v = self.client.load_balanced_view()
319 ar = v.apply_async(lambda x: x, 'x'*1024)
319 ar = v.apply_async(lambda x: x, 'x'*1024)
320 ar.get()
320 ar.get()
321 self._wait_for_idle()
321 self._wait_for_idle()
322 ars = [ar]
322 ars = [ar]
323
323
324 for i in range(10):
324 for i in range(10):
325 ar = ars[-1]
325 ar = ars[-1]
326 ar2 = self.client.resubmit(ar.msg_ids)
326 ar2 = self.client.resubmit(ar.msg_ids)
327
327
328 [ ar.get() for ar in ars ]
328 [ ar.get() for ar in ars ]
329
329
330 def test_resubmit_header(self):
330 def test_resubmit_header(self):
331 """resubmit shouldn't clobber the whole header"""
331 """resubmit shouldn't clobber the whole header"""
332 def f():
332 def f():
333 import random
333 import random
334 return random.random()
334 return random.random()
335 v = self.client.load_balanced_view()
335 v = self.client.load_balanced_view()
336 v.retries = 1
336 v.retries = 1
337 ar = v.apply_async(f)
337 ar = v.apply_async(f)
338 r1 = ar.get(1)
338 r1 = ar.get(1)
339 # give the Hub a chance to notice:
339 # give the Hub a chance to notice:
340 self._wait_for_idle()
340 self._wait_for_idle()
341 ahr = self.client.resubmit(ar.msg_ids)
341 ahr = self.client.resubmit(ar.msg_ids)
342 ahr.get(1)
342 ahr.get(1)
343 time.sleep(0.5)
343 time.sleep(0.5)
344 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
344 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
345 h1,h2 = [ r['header'] for r in records ]
345 h1,h2 = [ r['header'] for r in records ]
346 for key in set(h1.keys()).union(set(h2.keys())):
346 for key in set(h1.keys()).union(set(h2.keys())):
347 if key in ('msg_id', 'date'):
347 if key in ('msg_id', 'date'):
348 self.assertNotEquals(h1[key], h2[key])
348 self.assertNotEquals(h1[key], h2[key])
349 else:
349 else:
350 self.assertEquals(h1[key], h2[key])
350 self.assertEquals(h1[key], h2[key])
351
351
352 def test_resubmit_aborted(self):
352 def test_resubmit_aborted(self):
353 def f():
353 def f():
354 import random
354 import random
355 return random.random()
355 return random.random()
356 v = self.client.load_balanced_view()
356 v = self.client.load_balanced_view()
357 # restrict to one engine, so we can put a sleep
357 # restrict to one engine, so we can put a sleep
358 # ahead of the task, so it will get aborted
358 # ahead of the task, so it will get aborted
359 eid = self.client.ids[-1]
359 eid = self.client.ids[-1]
360 v.targets = [eid]
360 v.targets = [eid]
361 sleep = v.apply_async(time.sleep, 0.5)
361 sleep = v.apply_async(time.sleep, 0.5)
362 ar = v.apply_async(f)
362 ar = v.apply_async(f)
363 ar.abort()
363 ar.abort()
364 self.assertRaises(error.TaskAborted, ar.get)
364 self.assertRaises(error.TaskAborted, ar.get)
365 # Give the Hub a chance to get up to date:
365 # Give the Hub a chance to get up to date:
366 self._wait_for_idle()
366 self._wait_for_idle()
367 ahr = self.client.resubmit(ar.msg_ids)
367 ahr = self.client.resubmit(ar.msg_ids)
368 r2 = ahr.get(1)
368 r2 = ahr.get(1)
369
369
370 def test_resubmit_inflight(self):
370 def test_resubmit_inflight(self):
371 """resubmit of inflight task"""
371 """resubmit of inflight task"""
372 v = self.client.load_balanced_view()
372 v = self.client.load_balanced_view()
373 ar = v.apply_async(time.sleep,1)
373 ar = v.apply_async(time.sleep,1)
374 # give the message a chance to arrive
374 # give the message a chance to arrive
375 time.sleep(0.2)
375 time.sleep(0.2)
376 ahr = self.client.resubmit(ar.msg_ids)
376 ahr = self.client.resubmit(ar.msg_ids)
377 ar.get(2)
377 ar.get(2)
378 ahr.get(2)
378 ahr.get(2)
379
379
380 def test_resubmit_badkey(self):
380 def test_resubmit_badkey(self):
381 """ensure KeyError on resubmit of nonexistant task"""
381 """ensure KeyError on resubmit of nonexistant task"""
382 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
382 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
383
383
384 def test_purge_results(self):
384 def test_purge_results(self):
385 # ensure there are some tasks
385 # ensure there are some tasks
386 for i in range(5):
386 for i in range(5):
387 self.client[:].apply_sync(lambda : 1)
387 self.client[:].apply_sync(lambda : 1)
388 # Wait for the Hub to realise the result is done:
388 # Wait for the Hub to realise the result is done:
389 # This prevents a race condition, where we
389 # This prevents a race condition, where we
390 # might purge a result the Hub still thinks is pending.
390 # might purge a result the Hub still thinks is pending.
391 time.sleep(0.1)
391 time.sleep(0.1)
392 rc2 = clientmod.Client(profile='iptest')
392 rc2 = clientmod.Client(profile='iptest')
393 hist = self.client.hub_history()
393 hist = self.client.hub_history()
394 ahr = rc2.get_result([hist[-1]])
394 ahr = rc2.get_result([hist[-1]])
395 ahr.wait(10)
395 ahr.wait(10)
396 self.client.purge_results(hist[-1])
396 self.client.purge_results(hist[-1])
397 newhist = self.client.hub_history()
397 newhist = self.client.hub_history()
398 self.assertEquals(len(newhist)+1,len(hist))
398 self.assertEquals(len(newhist)+1,len(hist))
399 rc2.spin()
399 rc2.spin()
400 rc2.close()
400 rc2.close()
401
401
402 def test_purge_all_results(self):
402 def test_purge_all_results(self):
403 self.client.purge_results('all')
403 self.client.purge_results('all')
404 hist = self.client.hub_history()
404 hist = self.client.hub_history()
405 self.assertEquals(len(hist), 0)
405 self.assertEquals(len(hist), 0)
406
406
407 def test_spin_thread(self):
407 def test_spin_thread(self):
408 self.client.spin_thread(0.01)
408 self.client.spin_thread(0.01)
409 ar = self.client[-1].apply_async(lambda : 1)
409 ar = self.client[-1].apply_async(lambda : 1)
410 time.sleep(0.1)
410 time.sleep(0.1)
411 self.assertTrue(ar.wall_time < 0.1,
411 self.assertTrue(ar.wall_time < 0.1,
412 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
412 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
413 )
413 )
414
414
415 def test_stop_spin_thread(self):
415 def test_stop_spin_thread(self):
416 self.client.spin_thread(0.01)
416 self.client.spin_thread(0.01)
417 self.client.stop_spin_thread()
417 self.client.stop_spin_thread()
418 ar = self.client[-1].apply_async(lambda : 1)
418 ar = self.client[-1].apply_async(lambda : 1)
419 time.sleep(0.15)
419 time.sleep(0.15)
420 self.assertTrue(ar.wall_time > 0.1,
420 self.assertTrue(ar.wall_time > 0.1,
421 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
421 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
422 )
422 )
423
423
424 def test_activate(self):
424 def test_activate(self):
425 ip = get_ipython()
425 ip = get_ipython()
426 magics = ip.magics_manager.magics
426 magics = ip.magics_manager.magics
427 self.assertTrue('px' in magics['line'])
427 self.assertTrue('px' in magics['line'])
428 self.assertTrue('px' in magics['cell'])
428 self.assertTrue('px' in magics['cell'])
429 v0 = self.client.activate(-1, '0')
429 v0 = self.client.activate(-1, '0')
430 self.assertTrue('px0' in magics['line'])
430 self.assertTrue('px0' in magics['line'])
431 self.assertTrue('px0' in magics['cell'])
431 self.assertTrue('px0' in magics['cell'])
432 self.assertEquals(v0.targets, self.client.ids[-1])
432 self.assertEquals(v0.targets, self.client.ids[-1])
433 v0 = self.client.activate('all', 'all')
433 v0 = self.client.activate('all', 'all')
434 self.assertTrue('pxall' in magics['line'])
434 self.assertTrue('pxall' in magics['line'])
435 self.assertTrue('pxall' in magics['cell'])
435 self.assertTrue('pxall' in magics['cell'])
436 self.assertEquals(v0.targets, 'all')
436 self.assertEquals(v0.targets, 'all')
General Comments 0
You need to be logged in to leave comments. Login now