##// END OF EJS Templates
Add Testcases for new purging methods in parallel.Client...
Jan Schulz -
Show More
@@ -1,455 +1,502 b''
1 1 """Tests for parallel client.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import time
22 22 from datetime import datetime
23 23 from tempfile import mktemp
24 24
25 25 import zmq
26 26
27 27 from IPython import parallel
28 28 from IPython.parallel.client import client as clientmod
29 29 from IPython.parallel import error
30 30 from IPython.parallel import AsyncResult, AsyncHubResult
31 31 from IPython.parallel import LoadBalancedView, DirectView
32 32
33 33 from clienttest import ClusterTestCase, segfault, wait, add_engines
34 34
35 35 def setup():
36 36 add_engines(4, total=True)
37 37
38 38 class TestClient(ClusterTestCase):
39 39
40 40 def test_ids(self):
41 41 n = len(self.client.ids)
42 42 self.add_engines(2)
43 43 self.assertEqual(len(self.client.ids), n+2)
44 44
45 45 def test_view_indexing(self):
46 46 """test index access for views"""
47 47 self.minimum_engines(4)
48 48 targets = self.client._build_targets('all')[-1]
49 49 v = self.client[:]
50 50 self.assertEqual(v.targets, targets)
51 51 t = self.client.ids[2]
52 52 v = self.client[t]
53 53 self.assertTrue(isinstance(v, DirectView))
54 54 self.assertEqual(v.targets, t)
55 55 t = self.client.ids[2:4]
56 56 v = self.client[t]
57 57 self.assertTrue(isinstance(v, DirectView))
58 58 self.assertEqual(v.targets, t)
59 59 v = self.client[::2]
60 60 self.assertTrue(isinstance(v, DirectView))
61 61 self.assertEqual(v.targets, targets[::2])
62 62 v = self.client[1::3]
63 63 self.assertTrue(isinstance(v, DirectView))
64 64 self.assertEqual(v.targets, targets[1::3])
65 65 v = self.client[:-3]
66 66 self.assertTrue(isinstance(v, DirectView))
67 67 self.assertEqual(v.targets, targets[:-3])
68 68 v = self.client[-1]
69 69 self.assertTrue(isinstance(v, DirectView))
70 70 self.assertEqual(v.targets, targets[-1])
71 71 self.assertRaises(TypeError, lambda : self.client[None])
72 72
73 73 def test_lbview_targets(self):
74 74 """test load_balanced_view targets"""
75 75 v = self.client.load_balanced_view()
76 76 self.assertEqual(v.targets, None)
77 77 v = self.client.load_balanced_view(-1)
78 78 self.assertEqual(v.targets, [self.client.ids[-1]])
79 79 v = self.client.load_balanced_view('all')
80 80 self.assertEqual(v.targets, None)
81 81
82 82 def test_dview_targets(self):
83 83 """test direct_view targets"""
84 84 v = self.client.direct_view()
85 85 self.assertEqual(v.targets, 'all')
86 86 v = self.client.direct_view('all')
87 87 self.assertEqual(v.targets, 'all')
88 88 v = self.client.direct_view(-1)
89 89 self.assertEqual(v.targets, self.client.ids[-1])
90 90
91 91 def test_lazy_all_targets(self):
92 92 """test lazy evaluation of rc.direct_view('all')"""
93 93 v = self.client.direct_view()
94 94 self.assertEqual(v.targets, 'all')
95 95
96 96 def double(x):
97 97 return x*2
98 98 seq = range(100)
99 99 ref = [ double(x) for x in seq ]
100 100
101 101 # add some engines, which should be used
102 102 self.add_engines(1)
103 103 n1 = len(self.client.ids)
104 104
105 105 # simple apply
106 106 r = v.apply_sync(lambda : 1)
107 107 self.assertEqual(r, [1] * n1)
108 108
109 109 # map goes through remotefunction
110 110 r = v.map_sync(double, seq)
111 111 self.assertEqual(r, ref)
112 112
113 113 # add a couple more engines, and try again
114 114 self.add_engines(2)
115 115 n2 = len(self.client.ids)
116 116 self.assertNotEqual(n2, n1)
117 117
118 118 # apply
119 119 r = v.apply_sync(lambda : 1)
120 120 self.assertEqual(r, [1] * n2)
121 121
122 122 # map
123 123 r = v.map_sync(double, seq)
124 124 self.assertEqual(r, ref)
125 125
126 126 def test_targets(self):
127 127 """test various valid targets arguments"""
128 128 build = self.client._build_targets
129 129 ids = self.client.ids
130 130 idents,targets = build(None)
131 131 self.assertEqual(ids, targets)
132 132
133 133 def test_clear(self):
134 134 """test clear behavior"""
135 135 self.minimum_engines(2)
136 136 v = self.client[:]
137 137 v.block=True
138 138 v.push(dict(a=5))
139 139 v.pull('a')
140 140 id0 = self.client.ids[-1]
141 141 self.client.clear(targets=id0, block=True)
142 142 a = self.client[:-1].get('a')
143 143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
144 144 self.client.clear(block=True)
145 145 for i in self.client.ids:
146 146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147 147
148 148 def test_get_result(self):
149 149 """test getting results from the Hub."""
150 150 c = clientmod.Client(profile='iptest')
151 151 t = c.ids[-1]
152 152 ar = c[t].apply_async(wait, 1)
153 153 # give the monitor time to notice the message
154 154 time.sleep(.25)
155 155 ahr = self.client.get_result(ar.msg_ids)
156 156 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 157 self.assertEqual(ahr.get(), ar.get())
158 158 ar2 = self.client.get_result(ar.msg_ids)
159 159 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 160 c.close()
161 161
162 162 def test_get_execute_result(self):
163 163 """test getting execute results from the Hub."""
164 164 c = clientmod.Client(profile='iptest')
165 165 t = c.ids[-1]
166 166 cell = '\n'.join([
167 167 'import time',
168 168 'time.sleep(0.25)',
169 169 '5'
170 170 ])
171 171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
172 172 # give the monitor time to notice the message
173 173 time.sleep(.25)
174 174 ahr = self.client.get_result(ar.msg_ids)
175 175 self.assertTrue(isinstance(ahr, AsyncHubResult))
176 176 self.assertEqual(ahr.get().pyout, ar.get().pyout)
177 177 ar2 = self.client.get_result(ar.msg_ids)
178 178 self.assertFalse(isinstance(ar2, AsyncHubResult))
179 179 c.close()
180 180
181 181 def test_ids_list(self):
182 182 """test client.ids"""
183 183 ids = self.client.ids
184 184 self.assertEqual(ids, self.client._ids)
185 185 self.assertFalse(ids is self.client._ids)
186 186 ids.remove(ids[-1])
187 187 self.assertNotEqual(ids, self.client._ids)
188 188
189 189 def test_queue_status(self):
190 190 ids = self.client.ids
191 191 id0 = ids[0]
192 192 qs = self.client.queue_status(targets=id0)
193 193 self.assertTrue(isinstance(qs, dict))
194 194 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
195 195 allqs = self.client.queue_status()
196 196 self.assertTrue(isinstance(allqs, dict))
197 197 intkeys = list(allqs.keys())
198 198 intkeys.remove('unassigned')
199 199 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
200 200 unassigned = allqs.pop('unassigned')
201 201 for eid,qs in allqs.items():
202 202 self.assertTrue(isinstance(qs, dict))
203 203 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
204 204
205 205 def test_shutdown(self):
206 206 ids = self.client.ids
207 207 id0 = ids[0]
208 208 self.client.shutdown(id0, block=True)
209 209 while id0 in self.client.ids:
210 210 time.sleep(0.1)
211 211 self.client.spin()
212 212
213 213 self.assertRaises(IndexError, lambda : self.client[id0])
214 214
215 215 def test_result_status(self):
216 216 pass
217 217 # to be written
218 218
219 219 def test_db_query_dt(self):
220 220 """test db query by date"""
221 221 hist = self.client.hub_history()
222 222 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
223 223 tic = middle['submitted']
224 224 before = self.client.db_query({'submitted' : {'$lt' : tic}})
225 225 after = self.client.db_query({'submitted' : {'$gte' : tic}})
226 226 self.assertEqual(len(before)+len(after),len(hist))
227 227 for b in before:
228 228 self.assertTrue(b['submitted'] < tic)
229 229 for a in after:
230 230 self.assertTrue(a['submitted'] >= tic)
231 231 same = self.client.db_query({'submitted' : tic})
232 232 for s in same:
233 233 self.assertTrue(s['submitted'] == tic)
234 234
235 235 def test_db_query_keys(self):
236 236 """test extracting subset of record keys"""
237 237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
238 238 for rec in found:
239 239 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
240 240
241 241 def test_db_query_default_keys(self):
242 242 """default db_query excludes buffers"""
243 243 found = self.client.db_query({'msg_id': {'$ne' : ''}})
244 244 for rec in found:
245 245 keys = set(rec.keys())
246 246 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
247 247 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
248 248
249 249 def test_db_query_msg_id(self):
250 250 """ensure msg_id is always in db queries"""
251 251 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
252 252 for rec in found:
253 253 self.assertTrue('msg_id' in rec.keys())
254 254 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
255 255 for rec in found:
256 256 self.assertTrue('msg_id' in rec.keys())
257 257 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
258 258 for rec in found:
259 259 self.assertTrue('msg_id' in rec.keys())
260 260
261 261 def test_db_query_get_result(self):
262 262 """pop in db_query shouldn't pop from result itself"""
263 263 self.client[:].apply_sync(lambda : 1)
264 264 found = self.client.db_query({'msg_id': {'$ne' : ''}})
265 265 rc2 = clientmod.Client(profile='iptest')
266 266 # If this bug is not fixed, this call will hang:
267 267 ar = rc2.get_result(self.client.history[-1])
268 268 ar.wait(2)
269 269 self.assertTrue(ar.ready())
270 270 ar.get()
271 271 rc2.close()
272 272
273 273 def test_db_query_in(self):
274 274 """test db query with '$in','$nin' operators"""
275 275 hist = self.client.hub_history()
276 276 even = hist[::2]
277 277 odd = hist[1::2]
278 278 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
279 279 found = [ r['msg_id'] for r in recs ]
280 280 self.assertEqual(set(even), set(found))
281 281 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
282 282 found = [ r['msg_id'] for r in recs ]
283 283 self.assertEqual(set(odd), set(found))
284 284
285 285 def test_hub_history(self):
286 286 hist = self.client.hub_history()
287 287 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
288 288 recdict = {}
289 289 for rec in recs:
290 290 recdict[rec['msg_id']] = rec
291 291
292 292 latest = datetime(1984,1,1)
293 293 for msg_id in hist:
294 294 rec = recdict[msg_id]
295 295 newt = rec['submitted']
296 296 self.assertTrue(newt >= latest)
297 297 latest = newt
298 298 ar = self.client[-1].apply_async(lambda : 1)
299 299 ar.get()
300 300 time.sleep(0.25)
301 301 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
302 302
303 303 def _wait_for_idle(self):
304 304 """wait for an engine to become idle, according to the Hub"""
305 305 rc = self.client
306 306
307 307 # timeout 5s, polling every 100ms
308 308 qs = rc.queue_status()
309 309 for i in range(50):
310 310 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
311 311 time.sleep(0.1)
312 312 qs = rc.queue_status()
313 313 else:
314 314 break
315 315
316 316 # ensure Hub up to date:
317 317 self.assertEqual(qs['unassigned'], 0)
318 318 for eid in rc.ids:
319 319 self.assertEqual(qs[eid]['tasks'], 0)
320 320
321 321
322 322 def test_resubmit(self):
323 323 def f():
324 324 import random
325 325 return random.random()
326 326 v = self.client.load_balanced_view()
327 327 ar = v.apply_async(f)
328 328 r1 = ar.get(1)
329 329 # give the Hub a chance to notice:
330 330 self._wait_for_idle()
331 331 ahr = self.client.resubmit(ar.msg_ids)
332 332 r2 = ahr.get(1)
333 333 self.assertFalse(r1 == r2)
334 334
335 335 def test_resubmit_chain(self):
336 336 """resubmit resubmitted tasks"""
337 337 v = self.client.load_balanced_view()
338 338 ar = v.apply_async(lambda x: x, 'x'*1024)
339 339 ar.get()
340 340 self._wait_for_idle()
341 341 ars = [ar]
342 342
343 343 for i in range(10):
344 344 ar = ars[-1]
345 345 ar2 = self.client.resubmit(ar.msg_ids)
346 346
347 347 [ ar.get() for ar in ars ]
348 348
349 349 def test_resubmit_header(self):
350 350 """resubmit shouldn't clobber the whole header"""
351 351 def f():
352 352 import random
353 353 return random.random()
354 354 v = self.client.load_balanced_view()
355 355 v.retries = 1
356 356 ar = v.apply_async(f)
357 357 r1 = ar.get(1)
358 358 # give the Hub a chance to notice:
359 359 self._wait_for_idle()
360 360 ahr = self.client.resubmit(ar.msg_ids)
361 361 ahr.get(1)
362 362 time.sleep(0.5)
363 363 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
364 364 h1,h2 = [ r['header'] for r in records ]
365 365 for key in set(h1.keys()).union(set(h2.keys())):
366 366 if key in ('msg_id', 'date'):
367 367 self.assertNotEqual(h1[key], h2[key])
368 368 else:
369 369 self.assertEqual(h1[key], h2[key])
370 370
371 371 def test_resubmit_aborted(self):
372 372 def f():
373 373 import random
374 374 return random.random()
375 375 v = self.client.load_balanced_view()
376 376 # restrict to one engine, so we can put a sleep
377 377 # ahead of the task, so it will get aborted
378 378 eid = self.client.ids[-1]
379 379 v.targets = [eid]
380 380 sleep = v.apply_async(time.sleep, 0.5)
381 381 ar = v.apply_async(f)
382 382 ar.abort()
383 383 self.assertRaises(error.TaskAborted, ar.get)
384 384 # Give the Hub a chance to get up to date:
385 385 self._wait_for_idle()
386 386 ahr = self.client.resubmit(ar.msg_ids)
387 387 r2 = ahr.get(1)
388 388
389 389 def test_resubmit_inflight(self):
390 390 """resubmit of inflight task"""
391 391 v = self.client.load_balanced_view()
392 392 ar = v.apply_async(time.sleep,1)
393 393 # give the message a chance to arrive
394 394 time.sleep(0.2)
395 395 ahr = self.client.resubmit(ar.msg_ids)
396 396 ar.get(2)
397 397 ahr.get(2)
398 398
399 399 def test_resubmit_badkey(self):
400 400 """ensure KeyError on resubmit of nonexistant task"""
401 401 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
402 402
403 def test_purge_results(self):
403 def test_purge_hub_results(self):
404 404 # ensure there are some tasks
405 405 for i in range(5):
406 406 self.client[:].apply_sync(lambda : 1)
407 407 # Wait for the Hub to realise the result is done:
408 408 # This prevents a race condition, where we
409 409 # might purge a result the Hub still thinks is pending.
410 410 time.sleep(0.1)
411 411 rc2 = clientmod.Client(profile='iptest')
412 412 hist = self.client.hub_history()
413 413 ahr = rc2.get_result([hist[-1]])
414 414 ahr.wait(10)
415 self.client.purge_results(hist[-1])
415 self.client.purge_hub_results(hist[-1])
416 416 newhist = self.client.hub_history()
417 417 self.assertEqual(len(newhist)+1,len(hist))
418 418 rc2.spin()
419 419 rc2.close()
420 420
421 def test_purge_local_results(self):
422 # ensure there are some tasks
423 res = []
424 for i in range(5):
425 res.append(self.client[:].apply_async(lambda : 1))
426 time.sleep(0.1)
427 self.client.wait(10) # wait for the results to come back
428 before = len(self.client.results)
429 self.assertEqual(len(self.client.metadata),before)
430 self.client.purge_local_results(res[-1])
431 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
432 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
433
434 def test_purge_all_hub_results(self):
435 self.client.purge_hub_results('all')
436 hist = self.client.hub_history()
437 self.assertEqual(len(hist), 0)
438
439 def test_purge_all_local_results(self):
440 self.client.purge_local_results('all')
441 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
442 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
443
421 444 def test_purge_all_results(self):
445 # ensure there are some tasks
446 for i in range(5):
447 self.client[:].apply_sync(lambda : 1)
448 self.client.wait(10)
422 449 self.client.purge_results('all')
450 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
451 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
452 time.sleep(0.1)
453 hist = self.client.hub_history()#
454 self.assertEqual(len(hist), 0, msg="hub history not empty")
455
456 def test_purge_everything(self):
457 # ensure there are some tasks
458 for i in range(5):
459 self.client[:].apply_sync(lambda : 1)
460 self.client.wait(10)
461 self.client.purge_everything()
462 # The client results
463 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
464 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
465 # the hub results
423 466 hist = self.client.hub_history()
424 self.assertEqual(len(hist), 0)
467 self.assertEqual(len(hist), 0, msg="hub history not empty")
468 # The client "bookkeeping"
469 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
470 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
471
425 472
426 473 def test_spin_thread(self):
427 474 self.client.spin_thread(0.01)
428 475 ar = self.client[-1].apply_async(lambda : 1)
429 476 time.sleep(0.1)
430 477 self.assertTrue(ar.wall_time < 0.1,
431 478 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
432 479 )
433 480
434 481 def test_stop_spin_thread(self):
435 482 self.client.spin_thread(0.01)
436 483 self.client.stop_spin_thread()
437 484 ar = self.client[-1].apply_async(lambda : 1)
438 485 time.sleep(0.15)
439 486 self.assertTrue(ar.wall_time > 0.1,
440 487 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
441 488 )
442 489
443 490 def test_activate(self):
444 491 ip = get_ipython()
445 492 magics = ip.magics_manager.magics
446 493 self.assertTrue('px' in magics['line'])
447 494 self.assertTrue('px' in magics['cell'])
448 495 v0 = self.client.activate(-1, '0')
449 496 self.assertTrue('px0' in magics['line'])
450 497 self.assertTrue('px0' in magics['cell'])
451 498 self.assertEqual(v0.targets, self.client.ids[-1])
452 499 v0 = self.client.activate('all', 'all')
453 500 self.assertTrue('pxall' in magics['line'])
454 501 self.assertTrue('pxall' in magics['cell'])
455 502 self.assertEqual(v0.targets, 'all')
General Comments 0
You need to be logged in to leave comments. Login now