##// END OF EJS Templates
prevent race condition in purge_results test...
MinRK -
Show More
@@ -1,261 +1,268 b''
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23 from tempfile import mktemp
23 from tempfile import mktemp
24
24
25 import zmq
25 import zmq
26
26
27 from IPython.parallel.client import client as clientmod
27 from IPython.parallel.client import client as clientmod
28 from IPython.parallel import error
28 from IPython.parallel import error
29 from IPython.parallel import AsyncResult, AsyncHubResult
29 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import LoadBalancedView, DirectView
30 from IPython.parallel import LoadBalancedView, DirectView
31
31
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
33
33
34 def setup():
34 def setup():
35 add_engines(4)
35 add_engines(4)
36
36
37 class TestClient(ClusterTestCase):
37 class TestClient(ClusterTestCase):
38
38
39 def test_ids(self):
39 def test_ids(self):
40 n = len(self.client.ids)
40 n = len(self.client.ids)
41 self.add_engines(3)
41 self.add_engines(3)
42 self.assertEquals(len(self.client.ids), n+3)
42 self.assertEquals(len(self.client.ids), n+3)
43
43
44 def test_view_indexing(self):
44 def test_view_indexing(self):
45 """test index access for views"""
45 """test index access for views"""
46 self.add_engines(2)
46 self.add_engines(2)
47 targets = self.client._build_targets('all')[-1]
47 targets = self.client._build_targets('all')[-1]
48 v = self.client[:]
48 v = self.client[:]
49 self.assertEquals(v.targets, targets)
49 self.assertEquals(v.targets, targets)
50 t = self.client.ids[2]
50 t = self.client.ids[2]
51 v = self.client[t]
51 v = self.client[t]
52 self.assert_(isinstance(v, DirectView))
52 self.assert_(isinstance(v, DirectView))
53 self.assertEquals(v.targets, t)
53 self.assertEquals(v.targets, t)
54 t = self.client.ids[2:4]
54 t = self.client.ids[2:4]
55 v = self.client[t]
55 v = self.client[t]
56 self.assert_(isinstance(v, DirectView))
56 self.assert_(isinstance(v, DirectView))
57 self.assertEquals(v.targets, t)
57 self.assertEquals(v.targets, t)
58 v = self.client[::2]
58 v = self.client[::2]
59 self.assert_(isinstance(v, DirectView))
59 self.assert_(isinstance(v, DirectView))
60 self.assertEquals(v.targets, targets[::2])
60 self.assertEquals(v.targets, targets[::2])
61 v = self.client[1::3]
61 v = self.client[1::3]
62 self.assert_(isinstance(v, DirectView))
62 self.assert_(isinstance(v, DirectView))
63 self.assertEquals(v.targets, targets[1::3])
63 self.assertEquals(v.targets, targets[1::3])
64 v = self.client[:-3]
64 v = self.client[:-3]
65 self.assert_(isinstance(v, DirectView))
65 self.assert_(isinstance(v, DirectView))
66 self.assertEquals(v.targets, targets[:-3])
66 self.assertEquals(v.targets, targets[:-3])
67 v = self.client[-1]
67 v = self.client[-1]
68 self.assert_(isinstance(v, DirectView))
68 self.assert_(isinstance(v, DirectView))
69 self.assertEquals(v.targets, targets[-1])
69 self.assertEquals(v.targets, targets[-1])
70 self.assertRaises(TypeError, lambda : self.client[None])
70 self.assertRaises(TypeError, lambda : self.client[None])
71
71
72 def test_lbview_targets(self):
72 def test_lbview_targets(self):
73 """test load_balanced_view targets"""
73 """test load_balanced_view targets"""
74 v = self.client.load_balanced_view()
74 v = self.client.load_balanced_view()
75 self.assertEquals(v.targets, None)
75 self.assertEquals(v.targets, None)
76 v = self.client.load_balanced_view(-1)
76 v = self.client.load_balanced_view(-1)
77 self.assertEquals(v.targets, [self.client.ids[-1]])
77 self.assertEquals(v.targets, [self.client.ids[-1]])
78 v = self.client.load_balanced_view('all')
78 v = self.client.load_balanced_view('all')
79 self.assertEquals(v.targets, self.client.ids)
79 self.assertEquals(v.targets, self.client.ids)
80
80
81 def test_targets(self):
81 def test_targets(self):
82 """test various valid targets arguments"""
82 """test various valid targets arguments"""
83 build = self.client._build_targets
83 build = self.client._build_targets
84 ids = self.client.ids
84 ids = self.client.ids
85 idents,targets = build(None)
85 idents,targets = build(None)
86 self.assertEquals(ids, targets)
86 self.assertEquals(ids, targets)
87
87
88 def test_clear(self):
88 def test_clear(self):
89 """test clear behavior"""
89 """test clear behavior"""
90 # self.add_engines(2)
90 # self.add_engines(2)
91 v = self.client[:]
91 v = self.client[:]
92 v.block=True
92 v.block=True
93 v.push(dict(a=5))
93 v.push(dict(a=5))
94 v.pull('a')
94 v.pull('a')
95 id0 = self.client.ids[-1]
95 id0 = self.client.ids[-1]
96 self.client.clear(targets=id0, block=True)
96 self.client.clear(targets=id0, block=True)
97 a = self.client[:-1].get('a')
97 a = self.client[:-1].get('a')
98 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
98 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
99 self.client.clear(block=True)
99 self.client.clear(block=True)
100 for i in self.client.ids:
100 for i in self.client.ids:
101 # print i
101 # print i
102 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
102 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
103
103
104 def test_get_result(self):
104 def test_get_result(self):
105 """test getting results from the Hub."""
105 """test getting results from the Hub."""
106 c = clientmod.Client(profile='iptest')
106 c = clientmod.Client(profile='iptest')
107 # self.add_engines(1)
107 # self.add_engines(1)
108 t = c.ids[-1]
108 t = c.ids[-1]
109 ar = c[t].apply_async(wait, 1)
109 ar = c[t].apply_async(wait, 1)
110 # give the monitor time to notice the message
110 # give the monitor time to notice the message
111 time.sleep(.25)
111 time.sleep(.25)
112 ahr = self.client.get_result(ar.msg_ids)
112 ahr = self.client.get_result(ar.msg_ids)
113 self.assertTrue(isinstance(ahr, AsyncHubResult))
113 self.assertTrue(isinstance(ahr, AsyncHubResult))
114 self.assertEquals(ahr.get(), ar.get())
114 self.assertEquals(ahr.get(), ar.get())
115 ar2 = self.client.get_result(ar.msg_ids)
115 ar2 = self.client.get_result(ar.msg_ids)
116 self.assertFalse(isinstance(ar2, AsyncHubResult))
116 self.assertFalse(isinstance(ar2, AsyncHubResult))
117 c.close()
117 c.close()
118
118
119 def test_ids_list(self):
119 def test_ids_list(self):
120 """test client.ids"""
120 """test client.ids"""
121 # self.add_engines(2)
121 # self.add_engines(2)
122 ids = self.client.ids
122 ids = self.client.ids
123 self.assertEquals(ids, self.client._ids)
123 self.assertEquals(ids, self.client._ids)
124 self.assertFalse(ids is self.client._ids)
124 self.assertFalse(ids is self.client._ids)
125 ids.remove(ids[-1])
125 ids.remove(ids[-1])
126 self.assertNotEquals(ids, self.client._ids)
126 self.assertNotEquals(ids, self.client._ids)
127
127
128 def test_queue_status(self):
128 def test_queue_status(self):
129 # self.addEngine(4)
129 # self.addEngine(4)
130 ids = self.client.ids
130 ids = self.client.ids
131 id0 = ids[0]
131 id0 = ids[0]
132 qs = self.client.queue_status(targets=id0)
132 qs = self.client.queue_status(targets=id0)
133 self.assertTrue(isinstance(qs, dict))
133 self.assertTrue(isinstance(qs, dict))
134 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
134 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
135 allqs = self.client.queue_status()
135 allqs = self.client.queue_status()
136 self.assertTrue(isinstance(allqs, dict))
136 self.assertTrue(isinstance(allqs, dict))
137 intkeys = list(allqs.keys())
137 intkeys = list(allqs.keys())
138 intkeys.remove('unassigned')
138 intkeys.remove('unassigned')
139 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
139 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
140 unassigned = allqs.pop('unassigned')
140 unassigned = allqs.pop('unassigned')
141 for eid,qs in allqs.items():
141 for eid,qs in allqs.items():
142 self.assertTrue(isinstance(qs, dict))
142 self.assertTrue(isinstance(qs, dict))
143 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
143 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
144
144
145 def test_shutdown(self):
145 def test_shutdown(self):
146 # self.addEngine(4)
146 # self.addEngine(4)
147 ids = self.client.ids
147 ids = self.client.ids
148 id0 = ids[0]
148 id0 = ids[0]
149 self.client.shutdown(id0, block=True)
149 self.client.shutdown(id0, block=True)
150 while id0 in self.client.ids:
150 while id0 in self.client.ids:
151 time.sleep(0.1)
151 time.sleep(0.1)
152 self.client.spin()
152 self.client.spin()
153
153
154 self.assertRaises(IndexError, lambda : self.client[id0])
154 self.assertRaises(IndexError, lambda : self.client[id0])
155
155
156 def test_result_status(self):
156 def test_result_status(self):
157 pass
157 pass
158 # to be written
158 # to be written
159
159
160 def test_db_query_dt(self):
160 def test_db_query_dt(self):
161 """test db query by date"""
161 """test db query by date"""
162 hist = self.client.hub_history()
162 hist = self.client.hub_history()
163 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
163 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
164 tic = middle['submitted']
164 tic = middle['submitted']
165 before = self.client.db_query({'submitted' : {'$lt' : tic}})
165 before = self.client.db_query({'submitted' : {'$lt' : tic}})
166 after = self.client.db_query({'submitted' : {'$gte' : tic}})
166 after = self.client.db_query({'submitted' : {'$gte' : tic}})
167 self.assertEquals(len(before)+len(after),len(hist))
167 self.assertEquals(len(before)+len(after),len(hist))
168 for b in before:
168 for b in before:
169 self.assertTrue(b['submitted'] < tic)
169 self.assertTrue(b['submitted'] < tic)
170 for a in after:
170 for a in after:
171 self.assertTrue(a['submitted'] >= tic)
171 self.assertTrue(a['submitted'] >= tic)
172 same = self.client.db_query({'submitted' : tic})
172 same = self.client.db_query({'submitted' : tic})
173 for s in same:
173 for s in same:
174 self.assertTrue(s['submitted'] == tic)
174 self.assertTrue(s['submitted'] == tic)
175
175
176 def test_db_query_keys(self):
176 def test_db_query_keys(self):
177 """test extracting subset of record keys"""
177 """test extracting subset of record keys"""
178 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
178 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
179 for rec in found:
179 for rec in found:
180 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
180 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
181
181
182 def test_db_query_msg_id(self):
182 def test_db_query_msg_id(self):
183 """ensure msg_id is always in db queries"""
183 """ensure msg_id is always in db queries"""
184 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
184 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
185 for rec in found:
185 for rec in found:
186 self.assertTrue('msg_id' in rec.keys())
186 self.assertTrue('msg_id' in rec.keys())
187 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
187 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
188 for rec in found:
188 for rec in found:
189 self.assertTrue('msg_id' in rec.keys())
189 self.assertTrue('msg_id' in rec.keys())
190 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
190 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
191 for rec in found:
191 for rec in found:
192 self.assertTrue('msg_id' in rec.keys())
192 self.assertTrue('msg_id' in rec.keys())
193
193
194 def test_db_query_in(self):
194 def test_db_query_in(self):
195 """test db query with '$in','$nin' operators"""
195 """test db query with '$in','$nin' operators"""
196 hist = self.client.hub_history()
196 hist = self.client.hub_history()
197 even = hist[::2]
197 even = hist[::2]
198 odd = hist[1::2]
198 odd = hist[1::2]
199 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
199 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
200 found = [ r['msg_id'] for r in recs ]
200 found = [ r['msg_id'] for r in recs ]
201 self.assertEquals(set(even), set(found))
201 self.assertEquals(set(even), set(found))
202 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
202 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
203 found = [ r['msg_id'] for r in recs ]
203 found = [ r['msg_id'] for r in recs ]
204 self.assertEquals(set(odd), set(found))
204 self.assertEquals(set(odd), set(found))
205
205
206 def test_hub_history(self):
206 def test_hub_history(self):
207 hist = self.client.hub_history()
207 hist = self.client.hub_history()
208 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
208 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
209 recdict = {}
209 recdict = {}
210 for rec in recs:
210 for rec in recs:
211 recdict[rec['msg_id']] = rec
211 recdict[rec['msg_id']] = rec
212
212
213 latest = datetime(1984,1,1)
213 latest = datetime(1984,1,1)
214 for msg_id in hist:
214 for msg_id in hist:
215 rec = recdict[msg_id]
215 rec = recdict[msg_id]
216 newt = rec['submitted']
216 newt = rec['submitted']
217 self.assertTrue(newt >= latest)
217 self.assertTrue(newt >= latest)
218 latest = newt
218 latest = newt
219 ar = self.client[-1].apply_async(lambda : 1)
219 ar = self.client[-1].apply_async(lambda : 1)
220 ar.get()
220 ar.get()
221 time.sleep(0.25)
221 time.sleep(0.25)
222 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
222 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
223
223
224 def test_resubmit(self):
224 def test_resubmit(self):
225 def f():
225 def f():
226 import random
226 import random
227 return random.random()
227 return random.random()
228 v = self.client.load_balanced_view()
228 v = self.client.load_balanced_view()
229 ar = v.apply_async(f)
229 ar = v.apply_async(f)
230 r1 = ar.get(1)
230 r1 = ar.get(1)
231 ahr = self.client.resubmit(ar.msg_ids)
231 ahr = self.client.resubmit(ar.msg_ids)
232 r2 = ahr.get(1)
232 r2 = ahr.get(1)
233 self.assertFalse(r1 == r2)
233 self.assertFalse(r1 == r2)
234
234
235 def test_resubmit_inflight(self):
235 def test_resubmit_inflight(self):
236 """ensure ValueError on resubmit of inflight task"""
236 """ensure ValueError on resubmit of inflight task"""
237 v = self.client.load_balanced_view()
237 v = self.client.load_balanced_view()
238 ar = v.apply_async(time.sleep,1)
238 ar = v.apply_async(time.sleep,1)
239 # give the message a chance to arrive
239 # give the message a chance to arrive
240 time.sleep(0.2)
240 time.sleep(0.2)
241 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
241 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
242 ar.get(2)
242 ar.get(2)
243
243
244 def test_resubmit_badkey(self):
244 def test_resubmit_badkey(self):
245 """ensure KeyError on resubmit of nonexistant task"""
245 """ensure KeyError on resubmit of nonexistant task"""
246 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
246 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
247
247
248 def test_purge_results(self):
248 def test_purge_results(self):
249 # ensure there are some tasks
249 # ensure there are some tasks
250 for i in range(5):
250 for i in range(5):
251 self.client[:].apply_sync(lambda : 1)
251 self.client[:].apply_sync(lambda : 1)
252 # Wait for the Hub to realise the result is done:
253 # This prevents a race condition, where we
254 # might purge a result the Hub still thinks is pending.
255 time.sleep(0.1)
256 rc2 = clientmod.Client(profile='iptest')
252 hist = self.client.hub_history()
257 hist = self.client.hub_history()
258 ahr = rc2.get_result([hist[-1]])
259 ahr.wait(10)
253 self.client.purge_results(hist[-1])
260 self.client.purge_results(hist[-1])
254 newhist = self.client.hub_history()
261 newhist = self.client.hub_history()
255 self.assertEquals(len(newhist)+1,len(hist))
262 self.assertEquals(len(newhist)+1,len(hist))
256
263
257 def test_purge_all_results(self):
264 def test_purge_all_results(self):
258 self.client.purge_results('all')
265 self.client.purge_results('all')
259 hist = self.client.hub_history()
266 hist = self.client.hub_history()
260 self.assertEquals(len(hist), 0)
267 self.assertEquals(len(hist), 0)
261
268
General Comments 0
You need to be logged in to leave comments. Login now