##// END OF EJS Templates
wait for empty queues as well as tasks...
MinRK -
Show More
@@ -1,517 +1,522 b''
1 1 """Tests for parallel client.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import time
22 22 from datetime import datetime
23 23 from tempfile import mktemp
24 24
25 25 import zmq
26 26
27 27 from IPython import parallel
28 28 from IPython.parallel.client import client as clientmod
29 29 from IPython.parallel import error
30 30 from IPython.parallel import AsyncResult, AsyncHubResult
31 31 from IPython.parallel import LoadBalancedView, DirectView
32 32
33 33 from .clienttest import ClusterTestCase, segfault, wait, add_engines
34 34
35 35 def setup():
36 36 add_engines(4, total=True)
37 37
38 38 class TestClient(ClusterTestCase):
39 39
40 40 def test_ids(self):
41 41 n = len(self.client.ids)
42 42 self.add_engines(2)
43 43 self.assertEqual(len(self.client.ids), n+2)
44 44
45 45 def test_view_indexing(self):
46 46 """test index access for views"""
47 47 self.minimum_engines(4)
48 48 targets = self.client._build_targets('all')[-1]
49 49 v = self.client[:]
50 50 self.assertEqual(v.targets, targets)
51 51 t = self.client.ids[2]
52 52 v = self.client[t]
53 53 self.assertTrue(isinstance(v, DirectView))
54 54 self.assertEqual(v.targets, t)
55 55 t = self.client.ids[2:4]
56 56 v = self.client[t]
57 57 self.assertTrue(isinstance(v, DirectView))
58 58 self.assertEqual(v.targets, t)
59 59 v = self.client[::2]
60 60 self.assertTrue(isinstance(v, DirectView))
61 61 self.assertEqual(v.targets, targets[::2])
62 62 v = self.client[1::3]
63 63 self.assertTrue(isinstance(v, DirectView))
64 64 self.assertEqual(v.targets, targets[1::3])
65 65 v = self.client[:-3]
66 66 self.assertTrue(isinstance(v, DirectView))
67 67 self.assertEqual(v.targets, targets[:-3])
68 68 v = self.client[-1]
69 69 self.assertTrue(isinstance(v, DirectView))
70 70 self.assertEqual(v.targets, targets[-1])
71 71 self.assertRaises(TypeError, lambda : self.client[None])
72 72
73 73 def test_lbview_targets(self):
74 74 """test load_balanced_view targets"""
75 75 v = self.client.load_balanced_view()
76 76 self.assertEqual(v.targets, None)
77 77 v = self.client.load_balanced_view(-1)
78 78 self.assertEqual(v.targets, [self.client.ids[-1]])
79 79 v = self.client.load_balanced_view('all')
80 80 self.assertEqual(v.targets, None)
81 81
82 82 def test_dview_targets(self):
83 83 """test direct_view targets"""
84 84 v = self.client.direct_view()
85 85 self.assertEqual(v.targets, 'all')
86 86 v = self.client.direct_view('all')
87 87 self.assertEqual(v.targets, 'all')
88 88 v = self.client.direct_view(-1)
89 89 self.assertEqual(v.targets, self.client.ids[-1])
90 90
91 91 def test_lazy_all_targets(self):
92 92 """test lazy evaluation of rc.direct_view('all')"""
93 93 v = self.client.direct_view()
94 94 self.assertEqual(v.targets, 'all')
95 95
96 96 def double(x):
97 97 return x*2
98 98 seq = list(range(100))
99 99 ref = [ double(x) for x in seq ]
100 100
101 101 # add some engines, which should be used
102 102 self.add_engines(1)
103 103 n1 = len(self.client.ids)
104 104
105 105 # simple apply
106 106 r = v.apply_sync(lambda : 1)
107 107 self.assertEqual(r, [1] * n1)
108 108
109 109 # map goes through remotefunction
110 110 r = v.map_sync(double, seq)
111 111 self.assertEqual(r, ref)
112 112
113 113 # add a couple more engines, and try again
114 114 self.add_engines(2)
115 115 n2 = len(self.client.ids)
116 116 self.assertNotEqual(n2, n1)
117 117
118 118 # apply
119 119 r = v.apply_sync(lambda : 1)
120 120 self.assertEqual(r, [1] * n2)
121 121
122 122 # map
123 123 r = v.map_sync(double, seq)
124 124 self.assertEqual(r, ref)
125 125
126 126 def test_targets(self):
127 127 """test various valid targets arguments"""
128 128 build = self.client._build_targets
129 129 ids = self.client.ids
130 130 idents,targets = build(None)
131 131 self.assertEqual(ids, targets)
132 132
133 133 def test_clear(self):
134 134 """test clear behavior"""
135 135 self.minimum_engines(2)
136 136 v = self.client[:]
137 137 v.block=True
138 138 v.push(dict(a=5))
139 139 v.pull('a')
140 140 id0 = self.client.ids[-1]
141 141 self.client.clear(targets=id0, block=True)
142 142 a = self.client[:-1].get('a')
143 143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
144 144 self.client.clear(block=True)
145 145 for i in self.client.ids:
146 146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147 147
148 148 def test_get_result(self):
149 149 """test getting results from the Hub."""
150 150 c = clientmod.Client(profile='iptest')
151 151 t = c.ids[-1]
152 152 ar = c[t].apply_async(wait, 1)
153 153 # give the monitor time to notice the message
154 154 time.sleep(.25)
155 155 ahr = self.client.get_result(ar.msg_ids[0])
156 156 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 157 self.assertEqual(ahr.get(), ar.get())
158 158 ar2 = self.client.get_result(ar.msg_ids[0])
159 159 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 160 c.close()
161 161
162 162 def test_get_execute_result(self):
163 163 """test getting execute results from the Hub."""
164 164 c = clientmod.Client(profile='iptest')
165 165 t = c.ids[-1]
166 166 cell = '\n'.join([
167 167 'import time',
168 168 'time.sleep(0.25)',
169 169 '5'
170 170 ])
171 171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
172 172 # give the monitor time to notice the message
173 173 time.sleep(.25)
174 174 ahr = self.client.get_result(ar.msg_ids[0])
175 175 self.assertTrue(isinstance(ahr, AsyncHubResult))
176 176 self.assertEqual(ahr.get().pyout, ar.get().pyout)
177 177 ar2 = self.client.get_result(ar.msg_ids[0])
178 178 self.assertFalse(isinstance(ar2, AsyncHubResult))
179 179 c.close()
180 180
181 181 def test_ids_list(self):
182 182 """test client.ids"""
183 183 ids = self.client.ids
184 184 self.assertEqual(ids, self.client._ids)
185 185 self.assertFalse(ids is self.client._ids)
186 186 ids.remove(ids[-1])
187 187 self.assertNotEqual(ids, self.client._ids)
188 188
189 189 def test_queue_status(self):
190 190 ids = self.client.ids
191 191 id0 = ids[0]
192 192 qs = self.client.queue_status(targets=id0)
193 193 self.assertTrue(isinstance(qs, dict))
194 194 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
195 195 allqs = self.client.queue_status()
196 196 self.assertTrue(isinstance(allqs, dict))
197 197 intkeys = list(allqs.keys())
198 198 intkeys.remove('unassigned')
199 199 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
200 200 unassigned = allqs.pop('unassigned')
201 201 for eid,qs in allqs.items():
202 202 self.assertTrue(isinstance(qs, dict))
203 203 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
204 204
205 205 def test_shutdown(self):
206 206 ids = self.client.ids
207 207 id0 = ids[0]
208 208 self.client.shutdown(id0, block=True)
209 209 while id0 in self.client.ids:
210 210 time.sleep(0.1)
211 211 self.client.spin()
212 212
213 213 self.assertRaises(IndexError, lambda : self.client[id0])
214 214
215 215 def test_result_status(self):
216 216 pass
217 217 # to be written
218 218
219 219 def test_db_query_dt(self):
220 220 """test db query by date"""
221 221 hist = self.client.hub_history()
222 222 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
223 223 tic = middle['submitted']
224 224 before = self.client.db_query({'submitted' : {'$lt' : tic}})
225 225 after = self.client.db_query({'submitted' : {'$gte' : tic}})
226 226 self.assertEqual(len(before)+len(after),len(hist))
227 227 for b in before:
228 228 self.assertTrue(b['submitted'] < tic)
229 229 for a in after:
230 230 self.assertTrue(a['submitted'] >= tic)
231 231 same = self.client.db_query({'submitted' : tic})
232 232 for s in same:
233 233 self.assertTrue(s['submitted'] == tic)
234 234
235 235 def test_db_query_keys(self):
236 236 """test extracting subset of record keys"""
237 237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
238 238 for rec in found:
239 239 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
240 240
241 241 def test_db_query_default_keys(self):
242 242 """default db_query excludes buffers"""
243 243 found = self.client.db_query({'msg_id': {'$ne' : ''}})
244 244 for rec in found:
245 245 keys = set(rec.keys())
246 246 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
247 247 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
248 248
249 249 def test_db_query_msg_id(self):
250 250 """ensure msg_id is always in db queries"""
251 251 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
252 252 for rec in found:
253 253 self.assertTrue('msg_id' in rec.keys())
254 254 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
255 255 for rec in found:
256 256 self.assertTrue('msg_id' in rec.keys())
257 257 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
258 258 for rec in found:
259 259 self.assertTrue('msg_id' in rec.keys())
260 260
261 261 def test_db_query_get_result(self):
262 262 """pop in db_query shouldn't pop from result itself"""
263 263 self.client[:].apply_sync(lambda : 1)
264 264 found = self.client.db_query({'msg_id': {'$ne' : ''}})
265 265 rc2 = clientmod.Client(profile='iptest')
266 266 # If this bug is not fixed, this call will hang:
267 267 ar = rc2.get_result(self.client.history[-1])
268 268 ar.wait(2)
269 269 self.assertTrue(ar.ready())
270 270 ar.get()
271 271 rc2.close()
272 272
273 273 def test_db_query_in(self):
274 274 """test db query with '$in','$nin' operators"""
275 275 hist = self.client.hub_history()
276 276 even = hist[::2]
277 277 odd = hist[1::2]
278 278 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
279 279 found = [ r['msg_id'] for r in recs ]
280 280 self.assertEqual(set(even), set(found))
281 281 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
282 282 found = [ r['msg_id'] for r in recs ]
283 283 self.assertEqual(set(odd), set(found))
284 284
285 285 def test_hub_history(self):
286 286 hist = self.client.hub_history()
287 287 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
288 288 recdict = {}
289 289 for rec in recs:
290 290 recdict[rec['msg_id']] = rec
291 291
292 292 latest = datetime(1984,1,1)
293 293 for msg_id in hist:
294 294 rec = recdict[msg_id]
295 295 newt = rec['submitted']
296 296 self.assertTrue(newt >= latest)
297 297 latest = newt
298 298 ar = self.client[-1].apply_async(lambda : 1)
299 299 ar.get()
300 300 time.sleep(0.25)
301 301 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
302 302
303 303 def _wait_for_idle(self):
304 """wait for an engine to become idle, according to the Hub"""
304 """wait for the cluster to become idle, according to the everyone."""
305 305 rc = self.client
306 306
307 # step 0. wait for local results
308 # this should be sufficient 99% of the time.
309 rc.wait(timeout=5)
310
307 311 # step 1. wait for all requests to be noticed
308 312 # timeout 5s, polling every 100ms
309 313 msg_ids = set(rc.history)
310 314 hub_hist = rc.hub_history()
311 315 for i in range(50):
312 316 if msg_ids.difference(hub_hist):
313 317 time.sleep(0.1)
314 318 hub_hist = rc.hub_history()
315 319 else:
316 320 break
317 321
318 322 self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
319 323
320 324 # step 2. wait for all requests to be done
321 325 # timeout 5s, polling every 100ms
322 326 qs = rc.queue_status()
323 327 for i in range(50):
324 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
328 if qs['unassigned'] or any(qs[eid]['tasks'] + qs[eid]['queue'] for eid in rc.ids):
325 329 time.sleep(0.1)
326 330 qs = rc.queue_status()
327 331 else:
328 332 break
329 333
330 334 # ensure Hub up to date:
331 335 self.assertEqual(qs['unassigned'], 0)
332 336 for eid in rc.ids:
333 337 self.assertEqual(qs[eid]['tasks'], 0)
338 self.assertEqual(qs[eid]['queue'], 0)
334 339
335 340
336 341 def test_resubmit(self):
337 342 def f():
338 343 import random
339 344 return random.random()
340 345 v = self.client.load_balanced_view()
341 346 ar = v.apply_async(f)
342 347 r1 = ar.get(1)
343 348 # give the Hub a chance to notice:
344 349 self._wait_for_idle()
345 350 ahr = self.client.resubmit(ar.msg_ids)
346 351 r2 = ahr.get(1)
347 352 self.assertFalse(r1 == r2)
348 353
349 354 def test_resubmit_chain(self):
350 355 """resubmit resubmitted tasks"""
351 356 v = self.client.load_balanced_view()
352 357 ar = v.apply_async(lambda x: x, 'x'*1024)
353 358 ar.get()
354 359 self._wait_for_idle()
355 360 ars = [ar]
356 361
357 362 for i in range(10):
358 363 ar = ars[-1]
359 364 ar2 = self.client.resubmit(ar.msg_ids)
360 365
361 366 [ ar.get() for ar in ars ]
362 367
363 368 def test_resubmit_header(self):
364 369 """resubmit shouldn't clobber the whole header"""
365 370 def f():
366 371 import random
367 372 return random.random()
368 373 v = self.client.load_balanced_view()
369 374 v.retries = 1
370 375 ar = v.apply_async(f)
371 376 r1 = ar.get(1)
372 377 # give the Hub a chance to notice:
373 378 self._wait_for_idle()
374 379 ahr = self.client.resubmit(ar.msg_ids)
375 380 ahr.get(1)
376 381 time.sleep(0.5)
377 382 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
378 383 h1,h2 = [ r['header'] for r in records ]
379 384 for key in set(h1.keys()).union(set(h2.keys())):
380 385 if key in ('msg_id', 'date'):
381 386 self.assertNotEqual(h1[key], h2[key])
382 387 else:
383 388 self.assertEqual(h1[key], h2[key])
384 389
385 390 def test_resubmit_aborted(self):
386 391 def f():
387 392 import random
388 393 return random.random()
389 394 v = self.client.load_balanced_view()
390 395 # restrict to one engine, so we can put a sleep
391 396 # ahead of the task, so it will get aborted
392 397 eid = self.client.ids[-1]
393 398 v.targets = [eid]
394 399 sleep = v.apply_async(time.sleep, 0.5)
395 400 ar = v.apply_async(f)
396 401 ar.abort()
397 402 self.assertRaises(error.TaskAborted, ar.get)
398 403 # Give the Hub a chance to get up to date:
399 404 self._wait_for_idle()
400 405 ahr = self.client.resubmit(ar.msg_ids)
401 406 r2 = ahr.get(1)
402 407
403 408 def test_resubmit_inflight(self):
404 409 """resubmit of inflight task"""
405 410 v = self.client.load_balanced_view()
406 411 ar = v.apply_async(time.sleep,1)
407 412 # give the message a chance to arrive
408 413 time.sleep(0.2)
409 414 ahr = self.client.resubmit(ar.msg_ids)
410 415 ar.get(2)
411 416 ahr.get(2)
412 417
413 418 def test_resubmit_badkey(self):
414 419 """ensure KeyError on resubmit of nonexistant task"""
415 420 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
416 421
417 422 def test_purge_hub_results(self):
418 423 # ensure there are some tasks
419 424 for i in range(5):
420 425 self.client[:].apply_sync(lambda : 1)
421 426 # Wait for the Hub to realise the result is done:
422 427 # This prevents a race condition, where we
423 428 # might purge a result the Hub still thinks is pending.
424 429 self._wait_for_idle()
425 430 rc2 = clientmod.Client(profile='iptest')
426 431 hist = self.client.hub_history()
427 432 ahr = rc2.get_result([hist[-1]])
428 433 ahr.wait(10)
429 434 self.client.purge_hub_results(hist[-1])
430 435 newhist = self.client.hub_history()
431 436 self.assertEqual(len(newhist)+1,len(hist))
432 437 rc2.spin()
433 438 rc2.close()
434 439
435 440 def test_purge_local_results(self):
436 441 # ensure there are some tasks
437 442 res = []
438 443 for i in range(5):
439 444 res.append(self.client[:].apply_async(lambda : 1))
440 445 self._wait_for_idle()
441 446 self.client.wait(10) # wait for the results to come back
442 447 before = len(self.client.results)
443 448 self.assertEqual(len(self.client.metadata),before)
444 449 self.client.purge_local_results(res[-1])
445 450 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
446 451 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
447 452
448 453 def test_purge_all_hub_results(self):
449 454 self.client.purge_hub_results('all')
450 455 hist = self.client.hub_history()
451 456 self.assertEqual(len(hist), 0)
452 457
453 458 def test_purge_all_local_results(self):
454 459 self.client.purge_local_results('all')
455 460 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
456 461 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
457 462
458 463 def test_purge_all_results(self):
459 464 # ensure there are some tasks
460 465 for i in range(5):
461 466 self.client[:].apply_sync(lambda : 1)
462 467 self.client.wait(10)
463 468 self._wait_for_idle()
464 469 self.client.purge_results('all')
465 470 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
466 471 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
467 472 hist = self.client.hub_history()
468 473 self.assertEqual(len(hist), 0, msg="hub history not empty")
469 474
470 475 def test_purge_everything(self):
471 476 # ensure there are some tasks
472 477 for i in range(5):
473 478 self.client[:].apply_sync(lambda : 1)
474 479 self.client.wait(10)
475 480 self._wait_for_idle()
476 481 self.client.purge_everything()
477 482 # The client results
478 483 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
479 484 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
480 485 # The client "bookkeeping"
481 486 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
482 487 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
483 488 # the hub results
484 489 hist = self.client.hub_history()
485 490 self.assertEqual(len(hist), 0, msg="hub history not empty")
486 491
487 492
488 493 def test_spin_thread(self):
489 494 self.client.spin_thread(0.01)
490 495 ar = self.client[-1].apply_async(lambda : 1)
491 496 time.sleep(0.1)
492 497 self.assertTrue(ar.wall_time < 0.1,
493 498 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
494 499 )
495 500
496 501 def test_stop_spin_thread(self):
497 502 self.client.spin_thread(0.01)
498 503 self.client.stop_spin_thread()
499 504 ar = self.client[-1].apply_async(lambda : 1)
500 505 time.sleep(0.15)
501 506 self.assertTrue(ar.wall_time > 0.1,
502 507 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
503 508 )
504 509
505 510 def test_activate(self):
506 511 ip = get_ipython()
507 512 magics = ip.magics_manager.magics
508 513 self.assertTrue('px' in magics['line'])
509 514 self.assertTrue('px' in magics['cell'])
510 515 v0 = self.client.activate(-1, '0')
511 516 self.assertTrue('px0' in magics['line'])
512 517 self.assertTrue('px0' in magics['cell'])
513 518 self.assertEqual(v0.targets, self.client.ids[-1])
514 519 v0 = self.client.activate('all', 'all')
515 520 self.assertTrue('pxall' in magics['line'])
516 521 self.assertTrue('pxall' in magics['cell'])
517 522 self.assertEqual(v0.targets, 'all')
General Comments 0
You need to be logged in to leave comments. Login now