##// END OF EJS Templates
test chained resubmissions
MinRK -
Show More
@@ -1,408 +1,422 b''
1 1 """Tests for parallel client.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import time
22 22 from datetime import datetime
23 23 from tempfile import mktemp
24 24
25 25 import zmq
26 26
27 27 from IPython.parallel.client import client as clientmod
28 28 from IPython.parallel import error
29 29 from IPython.parallel import AsyncResult, AsyncHubResult
30 30 from IPython.parallel import LoadBalancedView, DirectView
31 31
32 32 from clienttest import ClusterTestCase, segfault, wait, add_engines
33 33
34 34 def setup():
35 35 add_engines(4, total=True)
36 36
37 37 class TestClient(ClusterTestCase):
38 38
39 39 def test_ids(self):
40 40 n = len(self.client.ids)
41 41 self.add_engines(2)
42 42 self.assertEquals(len(self.client.ids), n+2)
43 43
44 44 def test_view_indexing(self):
45 45 """test index access for views"""
46 46 self.minimum_engines(4)
47 47 targets = self.client._build_targets('all')[-1]
48 48 v = self.client[:]
49 49 self.assertEquals(v.targets, targets)
50 50 t = self.client.ids[2]
51 51 v = self.client[t]
52 52 self.assert_(isinstance(v, DirectView))
53 53 self.assertEquals(v.targets, t)
54 54 t = self.client.ids[2:4]
55 55 v = self.client[t]
56 56 self.assert_(isinstance(v, DirectView))
57 57 self.assertEquals(v.targets, t)
58 58 v = self.client[::2]
59 59 self.assert_(isinstance(v, DirectView))
60 60 self.assertEquals(v.targets, targets[::2])
61 61 v = self.client[1::3]
62 62 self.assert_(isinstance(v, DirectView))
63 63 self.assertEquals(v.targets, targets[1::3])
64 64 v = self.client[:-3]
65 65 self.assert_(isinstance(v, DirectView))
66 66 self.assertEquals(v.targets, targets[:-3])
67 67 v = self.client[-1]
68 68 self.assert_(isinstance(v, DirectView))
69 69 self.assertEquals(v.targets, targets[-1])
70 70 self.assertRaises(TypeError, lambda : self.client[None])
71 71
72 72 def test_lbview_targets(self):
73 73 """test load_balanced_view targets"""
74 74 v = self.client.load_balanced_view()
75 75 self.assertEquals(v.targets, None)
76 76 v = self.client.load_balanced_view(-1)
77 77 self.assertEquals(v.targets, [self.client.ids[-1]])
78 78 v = self.client.load_balanced_view('all')
79 79 self.assertEquals(v.targets, None)
80 80
81 81 def test_dview_targets(self):
82 82 """test direct_view targets"""
83 83 v = self.client.direct_view()
84 84 self.assertEquals(v.targets, 'all')
85 85 v = self.client.direct_view('all')
86 86 self.assertEquals(v.targets, 'all')
87 87 v = self.client.direct_view(-1)
88 88 self.assertEquals(v.targets, self.client.ids[-1])
89 89
90 90 def test_lazy_all_targets(self):
91 91 """test lazy evaluation of rc.direct_view('all')"""
92 92 v = self.client.direct_view()
93 93 self.assertEquals(v.targets, 'all')
94 94
95 95 def double(x):
96 96 return x*2
97 97 seq = range(100)
98 98 ref = [ double(x) for x in seq ]
99 99
100 100 # add some engines, which should be used
101 101 self.add_engines(1)
102 102 n1 = len(self.client.ids)
103 103
104 104 # simple apply
105 105 r = v.apply_sync(lambda : 1)
106 106 self.assertEquals(r, [1] * n1)
107 107
108 108 # map goes through remotefunction
109 109 r = v.map_sync(double, seq)
110 110 self.assertEquals(r, ref)
111 111
112 112 # add a couple more engines, and try again
113 113 self.add_engines(2)
114 114 n2 = len(self.client.ids)
115 115 self.assertNotEquals(n2, n1)
116 116
117 117 # apply
118 118 r = v.apply_sync(lambda : 1)
119 119 self.assertEquals(r, [1] * n2)
120 120
121 121 # map
122 122 r = v.map_sync(double, seq)
123 123 self.assertEquals(r, ref)
124 124
125 125 def test_targets(self):
126 126 """test various valid targets arguments"""
127 127 build = self.client._build_targets
128 128 ids = self.client.ids
129 129 idents,targets = build(None)
130 130 self.assertEquals(ids, targets)
131 131
132 132 def test_clear(self):
133 133 """test clear behavior"""
134 134 self.minimum_engines(2)
135 135 v = self.client[:]
136 136 v.block=True
137 137 v.push(dict(a=5))
138 138 v.pull('a')
139 139 id0 = self.client.ids[-1]
140 140 self.client.clear(targets=id0, block=True)
141 141 a = self.client[:-1].get('a')
142 142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 143 self.client.clear(block=True)
144 144 for i in self.client.ids:
145 145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146 146
147 147 def test_get_result(self):
148 148 """test getting results from the Hub."""
149 149 c = clientmod.Client(profile='iptest')
150 150 t = c.ids[-1]
151 151 ar = c[t].apply_async(wait, 1)
152 152 # give the monitor time to notice the message
153 153 time.sleep(.25)
154 154 ahr = self.client.get_result(ar.msg_ids)
155 155 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 156 self.assertEquals(ahr.get(), ar.get())
157 157 ar2 = self.client.get_result(ar.msg_ids)
158 158 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 159 c.close()
160 160
161 161 def test_ids_list(self):
162 162 """test client.ids"""
163 163 ids = self.client.ids
164 164 self.assertEquals(ids, self.client._ids)
165 165 self.assertFalse(ids is self.client._ids)
166 166 ids.remove(ids[-1])
167 167 self.assertNotEquals(ids, self.client._ids)
168 168
169 169 def test_queue_status(self):
170 170 ids = self.client.ids
171 171 id0 = ids[0]
172 172 qs = self.client.queue_status(targets=id0)
173 173 self.assertTrue(isinstance(qs, dict))
174 174 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
175 175 allqs = self.client.queue_status()
176 176 self.assertTrue(isinstance(allqs, dict))
177 177 intkeys = list(allqs.keys())
178 178 intkeys.remove('unassigned')
179 179 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
180 180 unassigned = allqs.pop('unassigned')
181 181 for eid,qs in allqs.items():
182 182 self.assertTrue(isinstance(qs, dict))
183 183 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
184 184
185 185 def test_shutdown(self):
186 186 ids = self.client.ids
187 187 id0 = ids[0]
188 188 self.client.shutdown(id0, block=True)
189 189 while id0 in self.client.ids:
190 190 time.sleep(0.1)
191 191 self.client.spin()
192 192
193 193 self.assertRaises(IndexError, lambda : self.client[id0])
194 194
195 195 def test_result_status(self):
196 196 pass
197 197 # to be written
198 198
199 199 def test_db_query_dt(self):
200 200 """test db query by date"""
201 201 hist = self.client.hub_history()
202 202 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
203 203 tic = middle['submitted']
204 204 before = self.client.db_query({'submitted' : {'$lt' : tic}})
205 205 after = self.client.db_query({'submitted' : {'$gte' : tic}})
206 206 self.assertEquals(len(before)+len(after),len(hist))
207 207 for b in before:
208 208 self.assertTrue(b['submitted'] < tic)
209 209 for a in after:
210 210 self.assertTrue(a['submitted'] >= tic)
211 211 same = self.client.db_query({'submitted' : tic})
212 212 for s in same:
213 213 self.assertTrue(s['submitted'] == tic)
214 214
215 215 def test_db_query_keys(self):
216 216 """test extracting subset of record keys"""
217 217 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
218 218 for rec in found:
219 219 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
220 220
221 221 def test_db_query_default_keys(self):
222 222 """default db_query excludes buffers"""
223 223 found = self.client.db_query({'msg_id': {'$ne' : ''}})
224 224 for rec in found:
225 225 keys = set(rec.keys())
226 226 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
227 227 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
228 228
229 229 def test_db_query_msg_id(self):
230 230 """ensure msg_id is always in db queries"""
231 231 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
232 232 for rec in found:
233 233 self.assertTrue('msg_id' in rec.keys())
234 234 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
235 235 for rec in found:
236 236 self.assertTrue('msg_id' in rec.keys())
237 237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
238 238 for rec in found:
239 239 self.assertTrue('msg_id' in rec.keys())
240 240
241 241 def test_db_query_get_result(self):
242 242 """pop in db_query shouldn't pop from result itself"""
243 243 self.client[:].apply_sync(lambda : 1)
244 244 found = self.client.db_query({'msg_id': {'$ne' : ''}})
245 245 rc2 = clientmod.Client(profile='iptest')
246 246 # If this bug is not fixed, this call will hang:
247 247 ar = rc2.get_result(self.client.history[-1])
248 248 ar.wait(2)
249 249 self.assertTrue(ar.ready())
250 250 ar.get()
251 251 rc2.close()
252 252
253 253 def test_db_query_in(self):
254 254 """test db query with '$in','$nin' operators"""
255 255 hist = self.client.hub_history()
256 256 even = hist[::2]
257 257 odd = hist[1::2]
258 258 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
259 259 found = [ r['msg_id'] for r in recs ]
260 260 self.assertEquals(set(even), set(found))
261 261 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
262 262 found = [ r['msg_id'] for r in recs ]
263 263 self.assertEquals(set(odd), set(found))
264 264
265 265 def test_hub_history(self):
266 266 hist = self.client.hub_history()
267 267 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
268 268 recdict = {}
269 269 for rec in recs:
270 270 recdict[rec['msg_id']] = rec
271 271
272 272 latest = datetime(1984,1,1)
273 273 for msg_id in hist:
274 274 rec = recdict[msg_id]
275 275 newt = rec['submitted']
276 276 self.assertTrue(newt >= latest)
277 277 latest = newt
278 278 ar = self.client[-1].apply_async(lambda : 1)
279 279 ar.get()
280 280 time.sleep(0.25)
281 281 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
282 282
283 283 def _wait_for_idle(self):
284 284 """wait for an engine to become idle, according to the Hub"""
285 285 rc = self.client
286 286
287 287 # timeout 2s, polling every 100ms
288 288 for i in range(20):
289 289 qs = rc.queue_status()
290 290 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
291 291 time.sleep(0.1)
292 292 else:
293 293 break
294 294
295 295 # ensure Hub up to date:
296 296 qs = rc.queue_status()
297 297 self.assertEquals(qs['unassigned'], 0)
298 298 for eid in rc.ids:
299 299 self.assertEquals(qs[eid]['tasks'], 0)
300 300
301 301
302 302 def test_resubmit(self):
303 303 def f():
304 304 import random
305 305 return random.random()
306 306 v = self.client.load_balanced_view()
307 307 ar = v.apply_async(f)
308 308 r1 = ar.get(1)
309 309 # give the Hub a chance to notice:
310 310 self._wait_for_idle()
311 311 ahr = self.client.resubmit(ar.msg_ids)
312 312 r2 = ahr.get(1)
313 313 self.assertFalse(r1 == r2)
314 314
315 def test_resubmit_chain(self):
316 """resubmit resubmitted tasks"""
317 v = self.client.load_balanced_view()
318 ar = v.apply_async(lambda x: x, 'x'*1024)
319 ar.get()
320 self._wait_for_idle()
321 ars = [ar]
322
323 for i in range(10):
324 ar = ars[-1]
325 ar2 = self.client.resubmit(ar.msg_ids)
326
327 [ ar.get() for ar in ars ]
328
315 329 def test_resubmit_header(self):
316 330 """resubmit shouldn't clobber the whole header"""
317 331 def f():
318 332 import random
319 333 return random.random()
320 334 v = self.client.load_balanced_view()
321 335 v.retries = 1
322 336 ar = v.apply_async(f)
323 337 r1 = ar.get(1)
324 338 # give the Hub a chance to notice:
325 339 self._wait_for_idle()
326 340 ahr = self.client.resubmit(ar.msg_ids)
327 341 ahr.get(1)
328 342 time.sleep(0.5)
329 343 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
330 344 h1,h2 = [ r['header'] for r in records ]
331 345 for key in set(h1.keys()).union(set(h2.keys())):
332 346 if key in ('msg_id', 'date'):
333 347 self.assertNotEquals(h1[key], h2[key])
334 348 else:
335 349 self.assertEquals(h1[key], h2[key])
336 350
337 351 def test_resubmit_aborted(self):
338 352 def f():
339 353 import random
340 354 return random.random()
341 355 v = self.client.load_balanced_view()
342 356 # restrict to one engine, so we can put a sleep
343 357 # ahead of the task, so it will get aborted
344 358 eid = self.client.ids[-1]
345 359 v.targets = [eid]
346 360 sleep = v.apply_async(time.sleep, 0.5)
347 361 ar = v.apply_async(f)
348 362 ar.abort()
349 363 self.assertRaises(error.TaskAborted, ar.get)
350 364 # Give the Hub a chance to get up to date:
351 365 self._wait_for_idle()
352 366 ahr = self.client.resubmit(ar.msg_ids)
353 367 r2 = ahr.get(1)
354 368
355 369 def test_resubmit_inflight(self):
356 370 """resubmit of inflight task"""
357 371 v = self.client.load_balanced_view()
358 372 ar = v.apply_async(time.sleep,1)
359 373 # give the message a chance to arrive
360 374 time.sleep(0.2)
361 375 ahr = self.client.resubmit(ar.msg_ids)
362 376 ar.get(2)
363 377 ahr.get(2)
364 378
365 379 def test_resubmit_badkey(self):
366 380 """ensure KeyError on resubmit of nonexistant task"""
367 381 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
368 382
369 383 def test_purge_results(self):
370 384 # ensure there are some tasks
371 385 for i in range(5):
372 386 self.client[:].apply_sync(lambda : 1)
373 387 # Wait for the Hub to realise the result is done:
374 388 # This prevents a race condition, where we
375 389 # might purge a result the Hub still thinks is pending.
376 390 time.sleep(0.1)
377 391 rc2 = clientmod.Client(profile='iptest')
378 392 hist = self.client.hub_history()
379 393 ahr = rc2.get_result([hist[-1]])
380 394 ahr.wait(10)
381 395 self.client.purge_results(hist[-1])
382 396 newhist = self.client.hub_history()
383 397 self.assertEquals(len(newhist)+1,len(hist))
384 398 rc2.spin()
385 399 rc2.close()
386 400
387 401 def test_purge_all_results(self):
388 402 self.client.purge_results('all')
389 403 hist = self.client.hub_history()
390 404 self.assertEquals(len(hist), 0)
391 405
392 406 def test_spin_thread(self):
393 407 self.client.spin_thread(0.01)
394 408 ar = self.client[-1].apply_async(lambda : 1)
395 409 time.sleep(0.1)
396 410 self.assertTrue(ar.wall_time < 0.1,
397 411 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
398 412 )
399 413
400 414 def test_stop_spin_thread(self):
401 415 self.client.spin_thread(0.01)
402 416 self.client.stop_spin_thread()
403 417 ar = self.client[-1].apply_async(lambda : 1)
404 418 time.sleep(0.15)
405 419 self.assertTrue(ar.wall_time > 0.1,
406 420 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
407 421 )
408 422
General Comments 0
You need to be logged in to leave comments. Login now