##// END OF EJS Templates
Merge pull request #5463 from minrk/spin-slow...
Thomas Kluyver -
r16043:ef5762b6 merge
parent child Browse files
Show More
@@ -1,551 +1,555 b''
1 1 """Tests for parallel client.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import time
22 22 from datetime import datetime
23 23
24 24 import zmq
25 25
26 26 from IPython import parallel
27 27 from IPython.parallel.client import client as clientmod
28 28 from IPython.parallel import error
29 29 from IPython.parallel import AsyncResult, AsyncHubResult
30 30 from IPython.parallel import LoadBalancedView, DirectView
31 31
32 32 from .clienttest import ClusterTestCase, segfault, wait, add_engines
33 33
34 34 def setup():
35 35 add_engines(4, total=True)
36 36
37 37 class TestClient(ClusterTestCase):
38 38
39 39 def test_ids(self):
40 40 n = len(self.client.ids)
41 41 self.add_engines(2)
42 42 self.assertEqual(len(self.client.ids), n+2)
43 43
44 44 def test_view_indexing(self):
45 45 """test index access for views"""
46 46 self.minimum_engines(4)
47 47 targets = self.client._build_targets('all')[-1]
48 48 v = self.client[:]
49 49 self.assertEqual(v.targets, targets)
50 50 t = self.client.ids[2]
51 51 v = self.client[t]
52 52 self.assertTrue(isinstance(v, DirectView))
53 53 self.assertEqual(v.targets, t)
54 54 t = self.client.ids[2:4]
55 55 v = self.client[t]
56 56 self.assertTrue(isinstance(v, DirectView))
57 57 self.assertEqual(v.targets, t)
58 58 v = self.client[::2]
59 59 self.assertTrue(isinstance(v, DirectView))
60 60 self.assertEqual(v.targets, targets[::2])
61 61 v = self.client[1::3]
62 62 self.assertTrue(isinstance(v, DirectView))
63 63 self.assertEqual(v.targets, targets[1::3])
64 64 v = self.client[:-3]
65 65 self.assertTrue(isinstance(v, DirectView))
66 66 self.assertEqual(v.targets, targets[:-3])
67 67 v = self.client[-1]
68 68 self.assertTrue(isinstance(v, DirectView))
69 69 self.assertEqual(v.targets, targets[-1])
70 70 self.assertRaises(TypeError, lambda : self.client[None])
71 71
72 72 def test_lbview_targets(self):
73 73 """test load_balanced_view targets"""
74 74 v = self.client.load_balanced_view()
75 75 self.assertEqual(v.targets, None)
76 76 v = self.client.load_balanced_view(-1)
77 77 self.assertEqual(v.targets, [self.client.ids[-1]])
78 78 v = self.client.load_balanced_view('all')
79 79 self.assertEqual(v.targets, None)
80 80
81 81 def test_dview_targets(self):
82 82 """test direct_view targets"""
83 83 v = self.client.direct_view()
84 84 self.assertEqual(v.targets, 'all')
85 85 v = self.client.direct_view('all')
86 86 self.assertEqual(v.targets, 'all')
87 87 v = self.client.direct_view(-1)
88 88 self.assertEqual(v.targets, self.client.ids[-1])
89 89
90 90 def test_lazy_all_targets(self):
91 91 """test lazy evaluation of rc.direct_view('all')"""
92 92 v = self.client.direct_view()
93 93 self.assertEqual(v.targets, 'all')
94 94
95 95 def double(x):
96 96 return x*2
97 97 seq = list(range(100))
98 98 ref = [ double(x) for x in seq ]
99 99
100 100 # add some engines, which should be used
101 101 self.add_engines(1)
102 102 n1 = len(self.client.ids)
103 103
104 104 # simple apply
105 105 r = v.apply_sync(lambda : 1)
106 106 self.assertEqual(r, [1] * n1)
107 107
108 108 # map goes through remotefunction
109 109 r = v.map_sync(double, seq)
110 110 self.assertEqual(r, ref)
111 111
112 112 # add a couple more engines, and try again
113 113 self.add_engines(2)
114 114 n2 = len(self.client.ids)
115 115 self.assertNotEqual(n2, n1)
116 116
117 117 # apply
118 118 r = v.apply_sync(lambda : 1)
119 119 self.assertEqual(r, [1] * n2)
120 120
121 121 # map
122 122 r = v.map_sync(double, seq)
123 123 self.assertEqual(r, ref)
124 124
125 125 def test_targets(self):
126 126 """test various valid targets arguments"""
127 127 build = self.client._build_targets
128 128 ids = self.client.ids
129 129 idents,targets = build(None)
130 130 self.assertEqual(ids, targets)
131 131
132 132 def test_clear(self):
133 133 """test clear behavior"""
134 134 self.minimum_engines(2)
135 135 v = self.client[:]
136 136 v.block=True
137 137 v.push(dict(a=5))
138 138 v.pull('a')
139 139 id0 = self.client.ids[-1]
140 140 self.client.clear(targets=id0, block=True)
141 141 a = self.client[:-1].get('a')
142 142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 143 self.client.clear(block=True)
144 144 for i in self.client.ids:
145 145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146 146
147 147 def test_get_result(self):
148 148 """test getting results from the Hub."""
149 149 c = clientmod.Client(profile='iptest')
150 150 t = c.ids[-1]
151 151 ar = c[t].apply_async(wait, 1)
152 152 # give the monitor time to notice the message
153 153 time.sleep(.25)
154 154 ahr = self.client.get_result(ar.msg_ids[0])
155 155 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 156 self.assertEqual(ahr.get(), ar.get())
157 157 ar2 = self.client.get_result(ar.msg_ids[0])
158 158 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 159 c.close()
160 160
161 161 def test_get_execute_result(self):
162 162 """test getting execute results from the Hub."""
163 163 c = clientmod.Client(profile='iptest')
164 164 t = c.ids[-1]
165 165 cell = '\n'.join([
166 166 'import time',
167 167 'time.sleep(0.25)',
168 168 '5'
169 169 ])
170 170 ar = c[t].execute("import time; time.sleep(1)", silent=False)
171 171 # give the monitor time to notice the message
172 172 time.sleep(.25)
173 173 ahr = self.client.get_result(ar.msg_ids[0])
174 174 self.assertTrue(isinstance(ahr, AsyncHubResult))
175 175 self.assertEqual(ahr.get().pyout, ar.get().pyout)
176 176 ar2 = self.client.get_result(ar.msg_ids[0])
177 177 self.assertFalse(isinstance(ar2, AsyncHubResult))
178 178 c.close()
179 179
180 180 def test_ids_list(self):
181 181 """test client.ids"""
182 182 ids = self.client.ids
183 183 self.assertEqual(ids, self.client._ids)
184 184 self.assertFalse(ids is self.client._ids)
185 185 ids.remove(ids[-1])
186 186 self.assertNotEqual(ids, self.client._ids)
187 187
188 188 def test_queue_status(self):
189 189 ids = self.client.ids
190 190 id0 = ids[0]
191 191 qs = self.client.queue_status(targets=id0)
192 192 self.assertTrue(isinstance(qs, dict))
193 193 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
194 194 allqs = self.client.queue_status()
195 195 self.assertTrue(isinstance(allqs, dict))
196 196 intkeys = list(allqs.keys())
197 197 intkeys.remove('unassigned')
198 198 print("intkeys", intkeys)
199 199 intkeys = sorted(intkeys)
200 200 ids = self.client.ids
201 201 print("client.ids", ids)
202 202 ids = sorted(self.client.ids)
203 203 self.assertEqual(intkeys, ids)
204 204 unassigned = allqs.pop('unassigned')
205 205 for eid,qs in allqs.items():
206 206 self.assertTrue(isinstance(qs, dict))
207 207 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
208 208
209 209 def test_shutdown(self):
210 210 ids = self.client.ids
211 211 id0 = ids[0]
212 212 self.client.shutdown(id0, block=True)
213 213 while id0 in self.client.ids:
214 214 time.sleep(0.1)
215 215 self.client.spin()
216 216
217 217 self.assertRaises(IndexError, lambda : self.client[id0])
218 218
219 219 def test_result_status(self):
220 220 pass
221 221 # to be written
222 222
223 223 def test_db_query_dt(self):
224 224 """test db query by date"""
225 225 hist = self.client.hub_history()
226 226 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
227 227 tic = middle['submitted']
228 228 before = self.client.db_query({'submitted' : {'$lt' : tic}})
229 229 after = self.client.db_query({'submitted' : {'$gte' : tic}})
230 230 self.assertEqual(len(before)+len(after),len(hist))
231 231 for b in before:
232 232 self.assertTrue(b['submitted'] < tic)
233 233 for a in after:
234 234 self.assertTrue(a['submitted'] >= tic)
235 235 same = self.client.db_query({'submitted' : tic})
236 236 for s in same:
237 237 self.assertTrue(s['submitted'] == tic)
238 238
239 239 def test_db_query_keys(self):
240 240 """test extracting subset of record keys"""
241 241 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
242 242 for rec in found:
243 243 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
244 244
245 245 def test_db_query_default_keys(self):
246 246 """default db_query excludes buffers"""
247 247 found = self.client.db_query({'msg_id': {'$ne' : ''}})
248 248 for rec in found:
249 249 keys = set(rec.keys())
250 250 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
251 251 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
252 252
253 253 def test_db_query_msg_id(self):
254 254 """ensure msg_id is always in db queries"""
255 255 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
256 256 for rec in found:
257 257 self.assertTrue('msg_id' in rec.keys())
258 258 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
259 259 for rec in found:
260 260 self.assertTrue('msg_id' in rec.keys())
261 261 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
262 262 for rec in found:
263 263 self.assertTrue('msg_id' in rec.keys())
264 264
265 265 def test_db_query_get_result(self):
266 266 """pop in db_query shouldn't pop from result itself"""
267 267 self.client[:].apply_sync(lambda : 1)
268 268 found = self.client.db_query({'msg_id': {'$ne' : ''}})
269 269 rc2 = clientmod.Client(profile='iptest')
270 270 # If this bug is not fixed, this call will hang:
271 271 ar = rc2.get_result(self.client.history[-1])
272 272 ar.wait(2)
273 273 self.assertTrue(ar.ready())
274 274 ar.get()
275 275 rc2.close()
276 276
277 277 def test_db_query_in(self):
278 278 """test db query with '$in','$nin' operators"""
279 279 hist = self.client.hub_history()
280 280 even = hist[::2]
281 281 odd = hist[1::2]
282 282 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
283 283 found = [ r['msg_id'] for r in recs ]
284 284 self.assertEqual(set(even), set(found))
285 285 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
286 286 found = [ r['msg_id'] for r in recs ]
287 287 self.assertEqual(set(odd), set(found))
288 288
289 289 def test_hub_history(self):
290 290 hist = self.client.hub_history()
291 291 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
292 292 recdict = {}
293 293 for rec in recs:
294 294 recdict[rec['msg_id']] = rec
295 295
296 296 latest = datetime(1984,1,1)
297 297 for msg_id in hist:
298 298 rec = recdict[msg_id]
299 299 newt = rec['submitted']
300 300 self.assertTrue(newt >= latest)
301 301 latest = newt
302 302 ar = self.client[-1].apply_async(lambda : 1)
303 303 ar.get()
304 304 time.sleep(0.25)
305 305 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
306 306
307 307 def _wait_for_idle(self):
308 308 """wait for the cluster to become idle, according to the everyone."""
309 309 rc = self.client
310 310
311 311 # step 0. wait for local results
312 312 # this should be sufficient 99% of the time.
313 313 rc.wait(timeout=5)
314 314
315 315 # step 1. wait for all requests to be noticed
316 316 # timeout 5s, polling every 100ms
317 317 msg_ids = set(rc.history)
318 318 hub_hist = rc.hub_history()
319 319 for i in range(50):
320 320 if msg_ids.difference(hub_hist):
321 321 time.sleep(0.1)
322 322 hub_hist = rc.hub_history()
323 323 else:
324 324 break
325 325
326 326 self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
327 327
328 328 # step 2. wait for all requests to be done
329 329 # timeout 5s, polling every 100ms
330 330 qs = rc.queue_status()
331 331 for i in range(50):
332 332 if qs['unassigned'] or any(qs[eid]['tasks'] + qs[eid]['queue'] for eid in qs if eid != 'unassigned'):
333 333 time.sleep(0.1)
334 334 qs = rc.queue_status()
335 335 else:
336 336 break
337 337
338 338 # ensure Hub up to date:
339 339 self.assertEqual(qs['unassigned'], 0)
340 340 for eid in [ eid for eid in qs if eid != 'unassigned' ]:
341 341 self.assertEqual(qs[eid]['tasks'], 0)
342 342 self.assertEqual(qs[eid]['queue'], 0)
343 343
344 344
345 345 def test_resubmit(self):
346 346 def f():
347 347 import random
348 348 return random.random()
349 349 v = self.client.load_balanced_view()
350 350 ar = v.apply_async(f)
351 351 r1 = ar.get(1)
352 352 # give the Hub a chance to notice:
353 353 self._wait_for_idle()
354 354 ahr = self.client.resubmit(ar.msg_ids)
355 355 r2 = ahr.get(1)
356 356 self.assertFalse(r1 == r2)
357 357
358 358 def test_resubmit_chain(self):
359 359 """resubmit resubmitted tasks"""
360 360 v = self.client.load_balanced_view()
361 361 ar = v.apply_async(lambda x: x, 'x'*1024)
362 362 ar.get()
363 363 self._wait_for_idle()
364 364 ars = [ar]
365 365
366 366 for i in range(10):
367 367 ar = ars[-1]
368 368 ar2 = self.client.resubmit(ar.msg_ids)
369 369
370 370 [ ar.get() for ar in ars ]
371 371
372 372 def test_resubmit_header(self):
373 373 """resubmit shouldn't clobber the whole header"""
374 374 def f():
375 375 import random
376 376 return random.random()
377 377 v = self.client.load_balanced_view()
378 378 v.retries = 1
379 379 ar = v.apply_async(f)
380 380 r1 = ar.get(1)
381 381 # give the Hub a chance to notice:
382 382 self._wait_for_idle()
383 383 ahr = self.client.resubmit(ar.msg_ids)
384 384 ahr.get(1)
385 385 time.sleep(0.5)
386 386 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
387 387 h1,h2 = [ r['header'] for r in records ]
388 388 for key in set(h1.keys()).union(set(h2.keys())):
389 389 if key in ('msg_id', 'date'):
390 390 self.assertNotEqual(h1[key], h2[key])
391 391 else:
392 392 self.assertEqual(h1[key], h2[key])
393 393
394 394 def test_resubmit_aborted(self):
395 395 def f():
396 396 import random
397 397 return random.random()
398 398 v = self.client.load_balanced_view()
399 399 # restrict to one engine, so we can put a sleep
400 400 # ahead of the task, so it will get aborted
401 401 eid = self.client.ids[-1]
402 402 v.targets = [eid]
403 403 sleep = v.apply_async(time.sleep, 0.5)
404 404 ar = v.apply_async(f)
405 405 ar.abort()
406 406 self.assertRaises(error.TaskAborted, ar.get)
407 407 # Give the Hub a chance to get up to date:
408 408 self._wait_for_idle()
409 409 ahr = self.client.resubmit(ar.msg_ids)
410 410 r2 = ahr.get(1)
411 411
412 412 def test_resubmit_inflight(self):
413 413 """resubmit of inflight task"""
414 414 v = self.client.load_balanced_view()
415 415 ar = v.apply_async(time.sleep,1)
416 416 # give the message a chance to arrive
417 417 time.sleep(0.2)
418 418 ahr = self.client.resubmit(ar.msg_ids)
419 419 ar.get(2)
420 420 ahr.get(2)
421 421
422 422 def test_resubmit_badkey(self):
423 423 """ensure KeyError on resubmit of nonexistant task"""
424 424 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
425 425
426 426 def test_purge_hub_results(self):
427 427 # ensure there are some tasks
428 428 for i in range(5):
429 429 self.client[:].apply_sync(lambda : 1)
430 430 # Wait for the Hub to realise the result is done:
431 431 # This prevents a race condition, where we
432 432 # might purge a result the Hub still thinks is pending.
433 433 self._wait_for_idle()
434 434 rc2 = clientmod.Client(profile='iptest')
435 435 hist = self.client.hub_history()
436 436 ahr = rc2.get_result([hist[-1]])
437 437 ahr.wait(10)
438 438 self.client.purge_hub_results(hist[-1])
439 439 newhist = self.client.hub_history()
440 440 self.assertEqual(len(newhist)+1,len(hist))
441 441 rc2.spin()
442 442 rc2.close()
443 443
444 444 def test_purge_local_results(self):
445 445 # ensure there are some tasks
446 446 res = []
447 447 for i in range(5):
448 448 res.append(self.client[:].apply_async(lambda : 1))
449 449 self._wait_for_idle()
450 450 self.client.wait(10) # wait for the results to come back
451 451 before = len(self.client.results)
452 452 self.assertEqual(len(self.client.metadata),before)
453 453 self.client.purge_local_results(res[-1])
454 454 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
455 455 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
456 456
457 457 def test_purge_local_results_outstanding(self):
458 458 v = self.client[-1]
459 459 ar = v.apply_async(lambda : 1)
460 460 msg_id = ar.msg_ids[0]
461 461 ar.get()
462 462 self._wait_for_idle()
463 463 ar2 = v.apply_async(time.sleep, 1)
464 464 self.assertIn(msg_id, self.client.results)
465 465 self.assertIn(msg_id, self.client.metadata)
466 466 self.client.purge_local_results(ar)
467 467 self.assertNotIn(msg_id, self.client.results)
468 468 self.assertNotIn(msg_id, self.client.metadata)
469 469 with self.assertRaises(RuntimeError):
470 470 self.client.purge_local_results(ar2)
471 471 ar2.get()
472 472 self.client.purge_local_results(ar2)
473 473
474 474 def test_purge_all_local_results_outstanding(self):
475 475 v = self.client[-1]
476 476 ar = v.apply_async(time.sleep, 1)
477 477 with self.assertRaises(RuntimeError):
478 478 self.client.purge_local_results('all')
479 479 ar.get()
480 480 self.client.purge_local_results('all')
481 481
482 482 def test_purge_all_hub_results(self):
483 483 self.client.purge_hub_results('all')
484 484 hist = self.client.hub_history()
485 485 self.assertEqual(len(hist), 0)
486 486
487 487 def test_purge_all_local_results(self):
488 488 self.client.purge_local_results('all')
489 489 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
490 490 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
491 491
492 492 def test_purge_all_results(self):
493 493 # ensure there are some tasks
494 494 for i in range(5):
495 495 self.client[:].apply_sync(lambda : 1)
496 496 self.client.wait(10)
497 497 self._wait_for_idle()
498 498 self.client.purge_results('all')
499 499 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
500 500 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
501 501 hist = self.client.hub_history()
502 502 self.assertEqual(len(hist), 0, msg="hub history not empty")
503 503
504 504 def test_purge_everything(self):
505 505 # ensure there are some tasks
506 506 for i in range(5):
507 507 self.client[:].apply_sync(lambda : 1)
508 508 self.client.wait(10)
509 509 self._wait_for_idle()
510 510 self.client.purge_everything()
511 511 # The client results
512 512 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
513 513 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
514 514 # The client "bookkeeping"
515 515 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
516 516 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
517 517 # the hub results
518 518 hist = self.client.hub_history()
519 519 self.assertEqual(len(hist), 0, msg="hub history not empty")
520 520
521 521
522 522 def test_spin_thread(self):
523 523 self.client.spin_thread(0.01)
524 524 ar = self.client[-1].apply_async(lambda : 1)
525 md = self.client.metadata[ar.msg_ids[0]]
526 # 3s timeout, 100ms poll
527 for i in range(30):
525 528 time.sleep(0.1)
526 self.assertTrue(ar.wall_time < 0.1,
527 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
528 )
529 if md['received'] is not None:
530 break
531 self.assertIsInstance(md['received'], datetime)
529 532
530 533 def test_stop_spin_thread(self):
531 534 self.client.spin_thread(0.01)
532 535 self.client.stop_spin_thread()
533 536 ar = self.client[-1].apply_async(lambda : 1)
534 time.sleep(0.15)
535 self.assertTrue(ar.wall_time > 0.1,
536 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
537 )
537 md = self.client.metadata[ar.msg_ids[0]]
538 # 500ms timeout, 100ms poll
539 for i in range(5):
540 time.sleep(0.1)
541 self.assertIsNone(md['received'], None)
538 542
539 543 def test_activate(self):
540 544 ip = get_ipython()
541 545 magics = ip.magics_manager.magics
542 546 self.assertTrue('px' in magics['line'])
543 547 self.assertTrue('px' in magics['cell'])
544 548 v0 = self.client.activate(-1, '0')
545 549 self.assertTrue('px0' in magics['line'])
546 550 self.assertTrue('px0' in magics['cell'])
547 551 self.assertEqual(v0.targets, self.client.ids[-1])
548 552 v0 = self.client.activate('all', 'all')
549 553 self.assertTrue('pxall' in magics['line'])
550 554 self.assertTrue('pxall' in magics['cell'])
551 555 self.assertEqual(v0.targets, 'all')
General Comments 0
You need to be logged in to leave comments. Login now