##// END OF EJS Templates
prevent race condition in purge_results test...
MinRK -
Show More
@@ -1,261 +1,268 b''
1 1 """Tests for parallel client.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import time
22 22 from datetime import datetime
23 23 from tempfile import mktemp
24 24
25 25 import zmq
26 26
27 27 from IPython.parallel.client import client as clientmod
28 28 from IPython.parallel import error
29 29 from IPython.parallel import AsyncResult, AsyncHubResult
30 30 from IPython.parallel import LoadBalancedView, DirectView
31 31
32 32 from clienttest import ClusterTestCase, segfault, wait, add_engines
33 33
34 34 def setup():
35 35 add_engines(4)
36 36
37 37 class TestClient(ClusterTestCase):
38 38
39 39 def test_ids(self):
40 40 n = len(self.client.ids)
41 41 self.add_engines(3)
42 42 self.assertEquals(len(self.client.ids), n+3)
43 43
44 44 def test_view_indexing(self):
45 45 """test index access for views"""
46 46 self.add_engines(2)
47 47 targets = self.client._build_targets('all')[-1]
48 48 v = self.client[:]
49 49 self.assertEquals(v.targets, targets)
50 50 t = self.client.ids[2]
51 51 v = self.client[t]
52 52 self.assert_(isinstance(v, DirectView))
53 53 self.assertEquals(v.targets, t)
54 54 t = self.client.ids[2:4]
55 55 v = self.client[t]
56 56 self.assert_(isinstance(v, DirectView))
57 57 self.assertEquals(v.targets, t)
58 58 v = self.client[::2]
59 59 self.assert_(isinstance(v, DirectView))
60 60 self.assertEquals(v.targets, targets[::2])
61 61 v = self.client[1::3]
62 62 self.assert_(isinstance(v, DirectView))
63 63 self.assertEquals(v.targets, targets[1::3])
64 64 v = self.client[:-3]
65 65 self.assert_(isinstance(v, DirectView))
66 66 self.assertEquals(v.targets, targets[:-3])
67 67 v = self.client[-1]
68 68 self.assert_(isinstance(v, DirectView))
69 69 self.assertEquals(v.targets, targets[-1])
70 70 self.assertRaises(TypeError, lambda : self.client[None])
71 71
72 72 def test_lbview_targets(self):
73 73 """test load_balanced_view targets"""
74 74 v = self.client.load_balanced_view()
75 75 self.assertEquals(v.targets, None)
76 76 v = self.client.load_balanced_view(-1)
77 77 self.assertEquals(v.targets, [self.client.ids[-1]])
78 78 v = self.client.load_balanced_view('all')
79 79 self.assertEquals(v.targets, self.client.ids)
80 80
81 81 def test_targets(self):
82 82 """test various valid targets arguments"""
83 83 build = self.client._build_targets
84 84 ids = self.client.ids
85 85 idents,targets = build(None)
86 86 self.assertEquals(ids, targets)
87 87
88 88 def test_clear(self):
89 89 """test clear behavior"""
90 90 # self.add_engines(2)
91 91 v = self.client[:]
92 92 v.block=True
93 93 v.push(dict(a=5))
94 94 v.pull('a')
95 95 id0 = self.client.ids[-1]
96 96 self.client.clear(targets=id0, block=True)
97 97 a = self.client[:-1].get('a')
98 98 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
99 99 self.client.clear(block=True)
100 100 for i in self.client.ids:
101 101 # print i
102 102 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
103 103
104 104 def test_get_result(self):
105 105 """test getting results from the Hub."""
106 106 c = clientmod.Client(profile='iptest')
107 107 # self.add_engines(1)
108 108 t = c.ids[-1]
109 109 ar = c[t].apply_async(wait, 1)
110 110 # give the monitor time to notice the message
111 111 time.sleep(.25)
112 112 ahr = self.client.get_result(ar.msg_ids)
113 113 self.assertTrue(isinstance(ahr, AsyncHubResult))
114 114 self.assertEquals(ahr.get(), ar.get())
115 115 ar2 = self.client.get_result(ar.msg_ids)
116 116 self.assertFalse(isinstance(ar2, AsyncHubResult))
117 117 c.close()
118 118
119 119 def test_ids_list(self):
120 120 """test client.ids"""
121 121 # self.add_engines(2)
122 122 ids = self.client.ids
123 123 self.assertEquals(ids, self.client._ids)
124 124 self.assertFalse(ids is self.client._ids)
125 125 ids.remove(ids[-1])
126 126 self.assertNotEquals(ids, self.client._ids)
127 127
128 128 def test_queue_status(self):
129 129 # self.addEngine(4)
130 130 ids = self.client.ids
131 131 id0 = ids[0]
132 132 qs = self.client.queue_status(targets=id0)
133 133 self.assertTrue(isinstance(qs, dict))
134 134 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
135 135 allqs = self.client.queue_status()
136 136 self.assertTrue(isinstance(allqs, dict))
137 137 intkeys = list(allqs.keys())
138 138 intkeys.remove('unassigned')
139 139 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
140 140 unassigned = allqs.pop('unassigned')
141 141 for eid,qs in allqs.items():
142 142 self.assertTrue(isinstance(qs, dict))
143 143 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
144 144
145 145 def test_shutdown(self):
146 146 # self.addEngine(4)
147 147 ids = self.client.ids
148 148 id0 = ids[0]
149 149 self.client.shutdown(id0, block=True)
150 150 while id0 in self.client.ids:
151 151 time.sleep(0.1)
152 152 self.client.spin()
153 153
154 154 self.assertRaises(IndexError, lambda : self.client[id0])
155 155
156 156 def test_result_status(self):
157 157 pass
158 158 # to be written
159 159
160 160 def test_db_query_dt(self):
161 161 """test db query by date"""
162 162 hist = self.client.hub_history()
163 163 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
164 164 tic = middle['submitted']
165 165 before = self.client.db_query({'submitted' : {'$lt' : tic}})
166 166 after = self.client.db_query({'submitted' : {'$gte' : tic}})
167 167 self.assertEquals(len(before)+len(after),len(hist))
168 168 for b in before:
169 169 self.assertTrue(b['submitted'] < tic)
170 170 for a in after:
171 171 self.assertTrue(a['submitted'] >= tic)
172 172 same = self.client.db_query({'submitted' : tic})
173 173 for s in same:
174 174 self.assertTrue(s['submitted'] == tic)
175 175
176 176 def test_db_query_keys(self):
177 177 """test extracting subset of record keys"""
178 178 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
179 179 for rec in found:
180 180 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
181 181
182 182 def test_db_query_msg_id(self):
183 183 """ensure msg_id is always in db queries"""
184 184 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
185 185 for rec in found:
186 186 self.assertTrue('msg_id' in rec.keys())
187 187 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
188 188 for rec in found:
189 189 self.assertTrue('msg_id' in rec.keys())
190 190 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
191 191 for rec in found:
192 192 self.assertTrue('msg_id' in rec.keys())
193 193
194 194 def test_db_query_in(self):
195 195 """test db query with '$in','$nin' operators"""
196 196 hist = self.client.hub_history()
197 197 even = hist[::2]
198 198 odd = hist[1::2]
199 199 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
200 200 found = [ r['msg_id'] for r in recs ]
201 201 self.assertEquals(set(even), set(found))
202 202 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
203 203 found = [ r['msg_id'] for r in recs ]
204 204 self.assertEquals(set(odd), set(found))
205 205
206 206 def test_hub_history(self):
207 207 hist = self.client.hub_history()
208 208 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
209 209 recdict = {}
210 210 for rec in recs:
211 211 recdict[rec['msg_id']] = rec
212 212
213 213 latest = datetime(1984,1,1)
214 214 for msg_id in hist:
215 215 rec = recdict[msg_id]
216 216 newt = rec['submitted']
217 217 self.assertTrue(newt >= latest)
218 218 latest = newt
219 219 ar = self.client[-1].apply_async(lambda : 1)
220 220 ar.get()
221 221 time.sleep(0.25)
222 222 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
223 223
224 224 def test_resubmit(self):
225 225 def f():
226 226 import random
227 227 return random.random()
228 228 v = self.client.load_balanced_view()
229 229 ar = v.apply_async(f)
230 230 r1 = ar.get(1)
231 231 ahr = self.client.resubmit(ar.msg_ids)
232 232 r2 = ahr.get(1)
233 233 self.assertFalse(r1 == r2)
234 234
235 235 def test_resubmit_inflight(self):
236 236 """ensure ValueError on resubmit of inflight task"""
237 237 v = self.client.load_balanced_view()
238 238 ar = v.apply_async(time.sleep,1)
239 239 # give the message a chance to arrive
240 240 time.sleep(0.2)
241 241 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
242 242 ar.get(2)
243 243
244 244 def test_resubmit_badkey(self):
245 245 """ensure KeyError on resubmit of nonexistant task"""
246 246 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
247 247
248 248 def test_purge_results(self):
249 249 # ensure there are some tasks
250 250 for i in range(5):
251 251 self.client[:].apply_sync(lambda : 1)
252 # Wait for the Hub to realise the result is done:
253 # This prevents a race condition, where we
254 # might purge a result the Hub still thinks is pending.
255 time.sleep(0.1)
256 rc2 = clientmod.Client(profile='iptest')
252 257 hist = self.client.hub_history()
258 ahr = rc2.get_result([hist[-1]])
259 ahr.wait(10)
253 260 self.client.purge_results(hist[-1])
254 261 newhist = self.client.hub_history()
255 262 self.assertEquals(len(newhist)+1,len(hist))
256 263
257 264 def test_purge_all_results(self):
258 265 self.client.purge_results('all')
259 266 hist = self.client.hub_history()
260 267 self.assertEquals(len(hist), 0)
261 268
General Comments 0
You need to be logged in to leave comments. Login now