##// END OF EJS Templates
debug occasional error in test_queue_status...
MinRK -
Show More
@@ -1,546 +1,551 b''
1 1 """Tests for parallel client.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import time
22 22 from datetime import datetime
23 23
24 24 import zmq
25 25
26 26 from IPython import parallel
27 27 from IPython.parallel.client import client as clientmod
28 28 from IPython.parallel import error
29 29 from IPython.parallel import AsyncResult, AsyncHubResult
30 30 from IPython.parallel import LoadBalancedView, DirectView
31 31
32 32 from .clienttest import ClusterTestCase, segfault, wait, add_engines
33 33
34 34 def setup():
35 35 add_engines(4, total=True)
36 36
37 37 class TestClient(ClusterTestCase):
38 38
39 39 def test_ids(self):
40 40 n = len(self.client.ids)
41 41 self.add_engines(2)
42 42 self.assertEqual(len(self.client.ids), n+2)
43 43
44 44 def test_view_indexing(self):
45 45 """test index access for views"""
46 46 self.minimum_engines(4)
47 47 targets = self.client._build_targets('all')[-1]
48 48 v = self.client[:]
49 49 self.assertEqual(v.targets, targets)
50 50 t = self.client.ids[2]
51 51 v = self.client[t]
52 52 self.assertTrue(isinstance(v, DirectView))
53 53 self.assertEqual(v.targets, t)
54 54 t = self.client.ids[2:4]
55 55 v = self.client[t]
56 56 self.assertTrue(isinstance(v, DirectView))
57 57 self.assertEqual(v.targets, t)
58 58 v = self.client[::2]
59 59 self.assertTrue(isinstance(v, DirectView))
60 60 self.assertEqual(v.targets, targets[::2])
61 61 v = self.client[1::3]
62 62 self.assertTrue(isinstance(v, DirectView))
63 63 self.assertEqual(v.targets, targets[1::3])
64 64 v = self.client[:-3]
65 65 self.assertTrue(isinstance(v, DirectView))
66 66 self.assertEqual(v.targets, targets[:-3])
67 67 v = self.client[-1]
68 68 self.assertTrue(isinstance(v, DirectView))
69 69 self.assertEqual(v.targets, targets[-1])
70 70 self.assertRaises(TypeError, lambda : self.client[None])
71 71
72 72 def test_lbview_targets(self):
73 73 """test load_balanced_view targets"""
74 74 v = self.client.load_balanced_view()
75 75 self.assertEqual(v.targets, None)
76 76 v = self.client.load_balanced_view(-1)
77 77 self.assertEqual(v.targets, [self.client.ids[-1]])
78 78 v = self.client.load_balanced_view('all')
79 79 self.assertEqual(v.targets, None)
80 80
81 81 def test_dview_targets(self):
82 82 """test direct_view targets"""
83 83 v = self.client.direct_view()
84 84 self.assertEqual(v.targets, 'all')
85 85 v = self.client.direct_view('all')
86 86 self.assertEqual(v.targets, 'all')
87 87 v = self.client.direct_view(-1)
88 88 self.assertEqual(v.targets, self.client.ids[-1])
89 89
90 90 def test_lazy_all_targets(self):
91 91 """test lazy evaluation of rc.direct_view('all')"""
92 92 v = self.client.direct_view()
93 93 self.assertEqual(v.targets, 'all')
94 94
95 95 def double(x):
96 96 return x*2
97 97 seq = list(range(100))
98 98 ref = [ double(x) for x in seq ]
99 99
100 100 # add some engines, which should be used
101 101 self.add_engines(1)
102 102 n1 = len(self.client.ids)
103 103
104 104 # simple apply
105 105 r = v.apply_sync(lambda : 1)
106 106 self.assertEqual(r, [1] * n1)
107 107
108 108 # map goes through remotefunction
109 109 r = v.map_sync(double, seq)
110 110 self.assertEqual(r, ref)
111 111
112 112 # add a couple more engines, and try again
113 113 self.add_engines(2)
114 114 n2 = len(self.client.ids)
115 115 self.assertNotEqual(n2, n1)
116 116
117 117 # apply
118 118 r = v.apply_sync(lambda : 1)
119 119 self.assertEqual(r, [1] * n2)
120 120
121 121 # map
122 122 r = v.map_sync(double, seq)
123 123 self.assertEqual(r, ref)
124 124
125 125 def test_targets(self):
126 126 """test various valid targets arguments"""
127 127 build = self.client._build_targets
128 128 ids = self.client.ids
129 129 idents,targets = build(None)
130 130 self.assertEqual(ids, targets)
131 131
132 132 def test_clear(self):
133 133 """test clear behavior"""
134 134 self.minimum_engines(2)
135 135 v = self.client[:]
136 136 v.block=True
137 137 v.push(dict(a=5))
138 138 v.pull('a')
139 139 id0 = self.client.ids[-1]
140 140 self.client.clear(targets=id0, block=True)
141 141 a = self.client[:-1].get('a')
142 142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 143 self.client.clear(block=True)
144 144 for i in self.client.ids:
145 145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146 146
147 147 def test_get_result(self):
148 148 """test getting results from the Hub."""
149 149 c = clientmod.Client(profile='iptest')
150 150 t = c.ids[-1]
151 151 ar = c[t].apply_async(wait, 1)
152 152 # give the monitor time to notice the message
153 153 time.sleep(.25)
154 154 ahr = self.client.get_result(ar.msg_ids[0])
155 155 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 156 self.assertEqual(ahr.get(), ar.get())
157 157 ar2 = self.client.get_result(ar.msg_ids[0])
158 158 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 159 c.close()
160 160
161 161 def test_get_execute_result(self):
162 162 """test getting execute results from the Hub."""
163 163 c = clientmod.Client(profile='iptest')
164 164 t = c.ids[-1]
165 165 cell = '\n'.join([
166 166 'import time',
167 167 'time.sleep(0.25)',
168 168 '5'
169 169 ])
170 170 ar = c[t].execute("import time; time.sleep(1)", silent=False)
171 171 # give the monitor time to notice the message
172 172 time.sleep(.25)
173 173 ahr = self.client.get_result(ar.msg_ids[0])
174 174 self.assertTrue(isinstance(ahr, AsyncHubResult))
175 175 self.assertEqual(ahr.get().pyout, ar.get().pyout)
176 176 ar2 = self.client.get_result(ar.msg_ids[0])
177 177 self.assertFalse(isinstance(ar2, AsyncHubResult))
178 178 c.close()
179 179
180 180 def test_ids_list(self):
181 181 """test client.ids"""
182 182 ids = self.client.ids
183 183 self.assertEqual(ids, self.client._ids)
184 184 self.assertFalse(ids is self.client._ids)
185 185 ids.remove(ids[-1])
186 186 self.assertNotEqual(ids, self.client._ids)
187 187
188 188 def test_queue_status(self):
189 189 ids = self.client.ids
190 190 id0 = ids[0]
191 191 qs = self.client.queue_status(targets=id0)
192 192 self.assertTrue(isinstance(qs, dict))
193 193 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
194 194 allqs = self.client.queue_status()
195 195 self.assertTrue(isinstance(allqs, dict))
196 196 intkeys = list(allqs.keys())
197 197 intkeys.remove('unassigned')
198 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
198 print("intkeys", intkeys)
199 intkeys = sorted(intkeys)
200 ids = self.client.ids
201 print("client.ids", ids)
202 ids = sorted(self.client.ids)
203 self.assertEqual(intkeys, ids)
199 204 unassigned = allqs.pop('unassigned')
200 205 for eid,qs in allqs.items():
201 206 self.assertTrue(isinstance(qs, dict))
202 207 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
203 208
204 209 def test_shutdown(self):
205 210 ids = self.client.ids
206 211 id0 = ids[0]
207 212 self.client.shutdown(id0, block=True)
208 213 while id0 in self.client.ids:
209 214 time.sleep(0.1)
210 215 self.client.spin()
211 216
212 217 self.assertRaises(IndexError, lambda : self.client[id0])
213 218
214 219 def test_result_status(self):
215 220 pass
216 221 # to be written
217 222
218 223 def test_db_query_dt(self):
219 224 """test db query by date"""
220 225 hist = self.client.hub_history()
221 226 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
222 227 tic = middle['submitted']
223 228 before = self.client.db_query({'submitted' : {'$lt' : tic}})
224 229 after = self.client.db_query({'submitted' : {'$gte' : tic}})
225 230 self.assertEqual(len(before)+len(after),len(hist))
226 231 for b in before:
227 232 self.assertTrue(b['submitted'] < tic)
228 233 for a in after:
229 234 self.assertTrue(a['submitted'] >= tic)
230 235 same = self.client.db_query({'submitted' : tic})
231 236 for s in same:
232 237 self.assertTrue(s['submitted'] == tic)
233 238
234 239 def test_db_query_keys(self):
235 240 """test extracting subset of record keys"""
236 241 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
237 242 for rec in found:
238 243 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
239 244
240 245 def test_db_query_default_keys(self):
241 246 """default db_query excludes buffers"""
242 247 found = self.client.db_query({'msg_id': {'$ne' : ''}})
243 248 for rec in found:
244 249 keys = set(rec.keys())
245 250 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
246 251 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
247 252
248 253 def test_db_query_msg_id(self):
249 254 """ensure msg_id is always in db queries"""
250 255 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
251 256 for rec in found:
252 257 self.assertTrue('msg_id' in rec.keys())
253 258 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
254 259 for rec in found:
255 260 self.assertTrue('msg_id' in rec.keys())
256 261 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
257 262 for rec in found:
258 263 self.assertTrue('msg_id' in rec.keys())
259 264
260 265 def test_db_query_get_result(self):
261 266 """pop in db_query shouldn't pop from result itself"""
262 267 self.client[:].apply_sync(lambda : 1)
263 268 found = self.client.db_query({'msg_id': {'$ne' : ''}})
264 269 rc2 = clientmod.Client(profile='iptest')
265 270 # If this bug is not fixed, this call will hang:
266 271 ar = rc2.get_result(self.client.history[-1])
267 272 ar.wait(2)
268 273 self.assertTrue(ar.ready())
269 274 ar.get()
270 275 rc2.close()
271 276
272 277 def test_db_query_in(self):
273 278 """test db query with '$in','$nin' operators"""
274 279 hist = self.client.hub_history()
275 280 even = hist[::2]
276 281 odd = hist[1::2]
277 282 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
278 283 found = [ r['msg_id'] for r in recs ]
279 284 self.assertEqual(set(even), set(found))
280 285 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
281 286 found = [ r['msg_id'] for r in recs ]
282 287 self.assertEqual(set(odd), set(found))
283 288
284 289 def test_hub_history(self):
285 290 hist = self.client.hub_history()
286 291 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
287 292 recdict = {}
288 293 for rec in recs:
289 294 recdict[rec['msg_id']] = rec
290 295
291 296 latest = datetime(1984,1,1)
292 297 for msg_id in hist:
293 298 rec = recdict[msg_id]
294 299 newt = rec['submitted']
295 300 self.assertTrue(newt >= latest)
296 301 latest = newt
297 302 ar = self.client[-1].apply_async(lambda : 1)
298 303 ar.get()
299 304 time.sleep(0.25)
300 305 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
301 306
302 307 def _wait_for_idle(self):
303 308 """wait for the cluster to become idle, according to the everyone."""
304 309 rc = self.client
305 310
306 311 # step 0. wait for local results
307 312 # this should be sufficient 99% of the time.
308 313 rc.wait(timeout=5)
309 314
310 315 # step 1. wait for all requests to be noticed
311 316 # timeout 5s, polling every 100ms
312 317 msg_ids = set(rc.history)
313 318 hub_hist = rc.hub_history()
314 319 for i in range(50):
315 320 if msg_ids.difference(hub_hist):
316 321 time.sleep(0.1)
317 322 hub_hist = rc.hub_history()
318 323 else:
319 324 break
320 325
321 326 self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
322 327
323 328 # step 2. wait for all requests to be done
324 329 # timeout 5s, polling every 100ms
325 330 qs = rc.queue_status()
326 331 for i in range(50):
327 332 if qs['unassigned'] or any(qs[eid]['tasks'] + qs[eid]['queue'] for eid in qs if eid != 'unassigned'):
328 333 time.sleep(0.1)
329 334 qs = rc.queue_status()
330 335 else:
331 336 break
332 337
333 338 # ensure Hub up to date:
334 339 self.assertEqual(qs['unassigned'], 0)
335 340 for eid in [ eid for eid in qs if eid != 'unassigned' ]:
336 341 self.assertEqual(qs[eid]['tasks'], 0)
337 342 self.assertEqual(qs[eid]['queue'], 0)
338 343
339 344
340 345 def test_resubmit(self):
341 346 def f():
342 347 import random
343 348 return random.random()
344 349 v = self.client.load_balanced_view()
345 350 ar = v.apply_async(f)
346 351 r1 = ar.get(1)
347 352 # give the Hub a chance to notice:
348 353 self._wait_for_idle()
349 354 ahr = self.client.resubmit(ar.msg_ids)
350 355 r2 = ahr.get(1)
351 356 self.assertFalse(r1 == r2)
352 357
353 358 def test_resubmit_chain(self):
354 359 """resubmit resubmitted tasks"""
355 360 v = self.client.load_balanced_view()
356 361 ar = v.apply_async(lambda x: x, 'x'*1024)
357 362 ar.get()
358 363 self._wait_for_idle()
359 364 ars = [ar]
360 365
361 366 for i in range(10):
362 367 ar = ars[-1]
363 368 ar2 = self.client.resubmit(ar.msg_ids)
364 369
365 370 [ ar.get() for ar in ars ]
366 371
367 372 def test_resubmit_header(self):
368 373 """resubmit shouldn't clobber the whole header"""
369 374 def f():
370 375 import random
371 376 return random.random()
372 377 v = self.client.load_balanced_view()
373 378 v.retries = 1
374 379 ar = v.apply_async(f)
375 380 r1 = ar.get(1)
376 381 # give the Hub a chance to notice:
377 382 self._wait_for_idle()
378 383 ahr = self.client.resubmit(ar.msg_ids)
379 384 ahr.get(1)
380 385 time.sleep(0.5)
381 386 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
382 387 h1,h2 = [ r['header'] for r in records ]
383 388 for key in set(h1.keys()).union(set(h2.keys())):
384 389 if key in ('msg_id', 'date'):
385 390 self.assertNotEqual(h1[key], h2[key])
386 391 else:
387 392 self.assertEqual(h1[key], h2[key])
388 393
389 394 def test_resubmit_aborted(self):
390 395 def f():
391 396 import random
392 397 return random.random()
393 398 v = self.client.load_balanced_view()
394 399 # restrict to one engine, so we can put a sleep
395 400 # ahead of the task, so it will get aborted
396 401 eid = self.client.ids[-1]
397 402 v.targets = [eid]
398 403 sleep = v.apply_async(time.sleep, 0.5)
399 404 ar = v.apply_async(f)
400 405 ar.abort()
401 406 self.assertRaises(error.TaskAborted, ar.get)
402 407 # Give the Hub a chance to get up to date:
403 408 self._wait_for_idle()
404 409 ahr = self.client.resubmit(ar.msg_ids)
405 410 r2 = ahr.get(1)
406 411
407 412 def test_resubmit_inflight(self):
408 413 """resubmit of inflight task"""
409 414 v = self.client.load_balanced_view()
410 415 ar = v.apply_async(time.sleep,1)
411 416 # give the message a chance to arrive
412 417 time.sleep(0.2)
413 418 ahr = self.client.resubmit(ar.msg_ids)
414 419 ar.get(2)
415 420 ahr.get(2)
416 421
417 422 def test_resubmit_badkey(self):
418 423 """ensure KeyError on resubmit of nonexistant task"""
419 424 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
420 425
421 426 def test_purge_hub_results(self):
422 427 # ensure there are some tasks
423 428 for i in range(5):
424 429 self.client[:].apply_sync(lambda : 1)
425 430 # Wait for the Hub to realise the result is done:
426 431 # This prevents a race condition, where we
427 432 # might purge a result the Hub still thinks is pending.
428 433 self._wait_for_idle()
429 434 rc2 = clientmod.Client(profile='iptest')
430 435 hist = self.client.hub_history()
431 436 ahr = rc2.get_result([hist[-1]])
432 437 ahr.wait(10)
433 438 self.client.purge_hub_results(hist[-1])
434 439 newhist = self.client.hub_history()
435 440 self.assertEqual(len(newhist)+1,len(hist))
436 441 rc2.spin()
437 442 rc2.close()
438 443
439 444 def test_purge_local_results(self):
440 445 # ensure there are some tasks
441 446 res = []
442 447 for i in range(5):
443 448 res.append(self.client[:].apply_async(lambda : 1))
444 449 self._wait_for_idle()
445 450 self.client.wait(10) # wait for the results to come back
446 451 before = len(self.client.results)
447 452 self.assertEqual(len(self.client.metadata),before)
448 453 self.client.purge_local_results(res[-1])
449 454 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
450 455 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
451 456
452 457 def test_purge_local_results_outstanding(self):
453 458 v = self.client[-1]
454 459 ar = v.apply_async(lambda : 1)
455 460 msg_id = ar.msg_ids[0]
456 461 ar.get()
457 462 self._wait_for_idle()
458 463 ar2 = v.apply_async(time.sleep, 1)
459 464 self.assertIn(msg_id, self.client.results)
460 465 self.assertIn(msg_id, self.client.metadata)
461 466 self.client.purge_local_results(ar)
462 467 self.assertNotIn(msg_id, self.client.results)
463 468 self.assertNotIn(msg_id, self.client.metadata)
464 469 with self.assertRaises(RuntimeError):
465 470 self.client.purge_local_results(ar2)
466 471 ar2.get()
467 472 self.client.purge_local_results(ar2)
468 473
469 474 def test_purge_all_local_results_outstanding(self):
470 475 v = self.client[-1]
471 476 ar = v.apply_async(time.sleep, 1)
472 477 with self.assertRaises(RuntimeError):
473 478 self.client.purge_local_results('all')
474 479 ar.get()
475 480 self.client.purge_local_results('all')
476 481
477 482 def test_purge_all_hub_results(self):
478 483 self.client.purge_hub_results('all')
479 484 hist = self.client.hub_history()
480 485 self.assertEqual(len(hist), 0)
481 486
482 487 def test_purge_all_local_results(self):
483 488 self.client.purge_local_results('all')
484 489 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
485 490 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
486 491
487 492 def test_purge_all_results(self):
488 493 # ensure there are some tasks
489 494 for i in range(5):
490 495 self.client[:].apply_sync(lambda : 1)
491 496 self.client.wait(10)
492 497 self._wait_for_idle()
493 498 self.client.purge_results('all')
494 499 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
495 500 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
496 501 hist = self.client.hub_history()
497 502 self.assertEqual(len(hist), 0, msg="hub history not empty")
498 503
499 504 def test_purge_everything(self):
500 505 # ensure there are some tasks
501 506 for i in range(5):
502 507 self.client[:].apply_sync(lambda : 1)
503 508 self.client.wait(10)
504 509 self._wait_for_idle()
505 510 self.client.purge_everything()
506 511 # The client results
507 512 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
508 513 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
509 514 # The client "bookkeeping"
510 515 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
511 516 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
512 517 # the hub results
513 518 hist = self.client.hub_history()
514 519 self.assertEqual(len(hist), 0, msg="hub history not empty")
515 520
516 521
517 522 def test_spin_thread(self):
518 523 self.client.spin_thread(0.01)
519 524 ar = self.client[-1].apply_async(lambda : 1)
520 525 time.sleep(0.1)
521 526 self.assertTrue(ar.wall_time < 0.1,
522 527 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
523 528 )
524 529
525 530 def test_stop_spin_thread(self):
526 531 self.client.spin_thread(0.01)
527 532 self.client.stop_spin_thread()
528 533 ar = self.client[-1].apply_async(lambda : 1)
529 534 time.sleep(0.15)
530 535 self.assertTrue(ar.wall_time > 0.1,
531 536 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
532 537 )
533 538
534 539 def test_activate(self):
535 540 ip = get_ipython()
536 541 magics = ip.magics_manager.magics
537 542 self.assertTrue('px' in magics['line'])
538 543 self.assertTrue('px' in magics['cell'])
539 544 v0 = self.client.activate(-1, '0')
540 545 self.assertTrue('px0' in magics['line'])
541 546 self.assertTrue('px0' in magics['cell'])
542 547 self.assertEqual(v0.targets, self.client.ids[-1])
543 548 v0 = self.client.activate('all', 'all')
544 549 self.assertTrue('pxall' in magics['line'])
545 550 self.assertTrue('pxall' in magics['cell'])
546 551 self.assertEqual(v0.targets, 'all')
General Comments 0
You need to be logged in to leave comments. Login now