##// END OF EJS Templates
don't use lazily-evaluated rc.ids in wait_for_idle...
MinRK -
Show More
@@ -1,522 +1,522 b''
1 1 """Tests for parallel client.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import time
22 22 from datetime import datetime
23 23 from tempfile import mktemp
24 24
25 25 import zmq
26 26
27 27 from IPython import parallel
28 28 from IPython.parallel.client import client as clientmod
29 29 from IPython.parallel import error
30 30 from IPython.parallel import AsyncResult, AsyncHubResult
31 31 from IPython.parallel import LoadBalancedView, DirectView
32 32
33 33 from .clienttest import ClusterTestCase, segfault, wait, add_engines
34 34
35 35 def setup():
36 36 add_engines(4, total=True)
37 37
38 38 class TestClient(ClusterTestCase):
39 39
40 40 def test_ids(self):
41 41 n = len(self.client.ids)
42 42 self.add_engines(2)
43 43 self.assertEqual(len(self.client.ids), n+2)
44 44
45 45 def test_view_indexing(self):
46 46 """test index access for views"""
47 47 self.minimum_engines(4)
48 48 targets = self.client._build_targets('all')[-1]
49 49 v = self.client[:]
50 50 self.assertEqual(v.targets, targets)
51 51 t = self.client.ids[2]
52 52 v = self.client[t]
53 53 self.assertTrue(isinstance(v, DirectView))
54 54 self.assertEqual(v.targets, t)
55 55 t = self.client.ids[2:4]
56 56 v = self.client[t]
57 57 self.assertTrue(isinstance(v, DirectView))
58 58 self.assertEqual(v.targets, t)
59 59 v = self.client[::2]
60 60 self.assertTrue(isinstance(v, DirectView))
61 61 self.assertEqual(v.targets, targets[::2])
62 62 v = self.client[1::3]
63 63 self.assertTrue(isinstance(v, DirectView))
64 64 self.assertEqual(v.targets, targets[1::3])
65 65 v = self.client[:-3]
66 66 self.assertTrue(isinstance(v, DirectView))
67 67 self.assertEqual(v.targets, targets[:-3])
68 68 v = self.client[-1]
69 69 self.assertTrue(isinstance(v, DirectView))
70 70 self.assertEqual(v.targets, targets[-1])
71 71 self.assertRaises(TypeError, lambda : self.client[None])
72 72
73 73 def test_lbview_targets(self):
74 74 """test load_balanced_view targets"""
75 75 v = self.client.load_balanced_view()
76 76 self.assertEqual(v.targets, None)
77 77 v = self.client.load_balanced_view(-1)
78 78 self.assertEqual(v.targets, [self.client.ids[-1]])
79 79 v = self.client.load_balanced_view('all')
80 80 self.assertEqual(v.targets, None)
81 81
82 82 def test_dview_targets(self):
83 83 """test direct_view targets"""
84 84 v = self.client.direct_view()
85 85 self.assertEqual(v.targets, 'all')
86 86 v = self.client.direct_view('all')
87 87 self.assertEqual(v.targets, 'all')
88 88 v = self.client.direct_view(-1)
89 89 self.assertEqual(v.targets, self.client.ids[-1])
90 90
91 91 def test_lazy_all_targets(self):
92 92 """test lazy evaluation of rc.direct_view('all')"""
93 93 v = self.client.direct_view()
94 94 self.assertEqual(v.targets, 'all')
95 95
96 96 def double(x):
97 97 return x*2
98 98 seq = list(range(100))
99 99 ref = [ double(x) for x in seq ]
100 100
101 101 # add some engines, which should be used
102 102 self.add_engines(1)
103 103 n1 = len(self.client.ids)
104 104
105 105 # simple apply
106 106 r = v.apply_sync(lambda : 1)
107 107 self.assertEqual(r, [1] * n1)
108 108
109 109 # map goes through remotefunction
110 110 r = v.map_sync(double, seq)
111 111 self.assertEqual(r, ref)
112 112
113 113 # add a couple more engines, and try again
114 114 self.add_engines(2)
115 115 n2 = len(self.client.ids)
116 116 self.assertNotEqual(n2, n1)
117 117
118 118 # apply
119 119 r = v.apply_sync(lambda : 1)
120 120 self.assertEqual(r, [1] * n2)
121 121
122 122 # map
123 123 r = v.map_sync(double, seq)
124 124 self.assertEqual(r, ref)
125 125
126 126 def test_targets(self):
127 127 """test various valid targets arguments"""
128 128 build = self.client._build_targets
129 129 ids = self.client.ids
130 130 idents,targets = build(None)
131 131 self.assertEqual(ids, targets)
132 132
133 133 def test_clear(self):
134 134 """test clear behavior"""
135 135 self.minimum_engines(2)
136 136 v = self.client[:]
137 137 v.block=True
138 138 v.push(dict(a=5))
139 139 v.pull('a')
140 140 id0 = self.client.ids[-1]
141 141 self.client.clear(targets=id0, block=True)
142 142 a = self.client[:-1].get('a')
143 143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
144 144 self.client.clear(block=True)
145 145 for i in self.client.ids:
146 146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147 147
148 148 def test_get_result(self):
149 149 """test getting results from the Hub."""
150 150 c = clientmod.Client(profile='iptest')
151 151 t = c.ids[-1]
152 152 ar = c[t].apply_async(wait, 1)
153 153 # give the monitor time to notice the message
154 154 time.sleep(.25)
155 155 ahr = self.client.get_result(ar.msg_ids[0])
156 156 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 157 self.assertEqual(ahr.get(), ar.get())
158 158 ar2 = self.client.get_result(ar.msg_ids[0])
159 159 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 160 c.close()
161 161
162 162 def test_get_execute_result(self):
163 163 """test getting execute results from the Hub."""
164 164 c = clientmod.Client(profile='iptest')
165 165 t = c.ids[-1]
166 166 cell = '\n'.join([
167 167 'import time',
168 168 'time.sleep(0.25)',
169 169 '5'
170 170 ])
171 171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
172 172 # give the monitor time to notice the message
173 173 time.sleep(.25)
174 174 ahr = self.client.get_result(ar.msg_ids[0])
175 175 self.assertTrue(isinstance(ahr, AsyncHubResult))
176 176 self.assertEqual(ahr.get().pyout, ar.get().pyout)
177 177 ar2 = self.client.get_result(ar.msg_ids[0])
178 178 self.assertFalse(isinstance(ar2, AsyncHubResult))
179 179 c.close()
180 180
181 181 def test_ids_list(self):
182 182 """test client.ids"""
183 183 ids = self.client.ids
184 184 self.assertEqual(ids, self.client._ids)
185 185 self.assertFalse(ids is self.client._ids)
186 186 ids.remove(ids[-1])
187 187 self.assertNotEqual(ids, self.client._ids)
188 188
189 189 def test_queue_status(self):
190 190 ids = self.client.ids
191 191 id0 = ids[0]
192 192 qs = self.client.queue_status(targets=id0)
193 193 self.assertTrue(isinstance(qs, dict))
194 194 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
195 195 allqs = self.client.queue_status()
196 196 self.assertTrue(isinstance(allqs, dict))
197 197 intkeys = list(allqs.keys())
198 198 intkeys.remove('unassigned')
199 199 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
200 200 unassigned = allqs.pop('unassigned')
201 201 for eid,qs in allqs.items():
202 202 self.assertTrue(isinstance(qs, dict))
203 203 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
204 204
205 205 def test_shutdown(self):
206 206 ids = self.client.ids
207 207 id0 = ids[0]
208 208 self.client.shutdown(id0, block=True)
209 209 while id0 in self.client.ids:
210 210 time.sleep(0.1)
211 211 self.client.spin()
212 212
213 213 self.assertRaises(IndexError, lambda : self.client[id0])
214 214
215 215 def test_result_status(self):
216 216 pass
217 217 # to be written
218 218
219 219 def test_db_query_dt(self):
220 220 """test db query by date"""
221 221 hist = self.client.hub_history()
222 222 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
223 223 tic = middle['submitted']
224 224 before = self.client.db_query({'submitted' : {'$lt' : tic}})
225 225 after = self.client.db_query({'submitted' : {'$gte' : tic}})
226 226 self.assertEqual(len(before)+len(after),len(hist))
227 227 for b in before:
228 228 self.assertTrue(b['submitted'] < tic)
229 229 for a in after:
230 230 self.assertTrue(a['submitted'] >= tic)
231 231 same = self.client.db_query({'submitted' : tic})
232 232 for s in same:
233 233 self.assertTrue(s['submitted'] == tic)
234 234
235 235 def test_db_query_keys(self):
236 236 """test extracting subset of record keys"""
237 237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
238 238 for rec in found:
239 239 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
240 240
241 241 def test_db_query_default_keys(self):
242 242 """default db_query excludes buffers"""
243 243 found = self.client.db_query({'msg_id': {'$ne' : ''}})
244 244 for rec in found:
245 245 keys = set(rec.keys())
246 246 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
247 247 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
248 248
249 249 def test_db_query_msg_id(self):
250 250 """ensure msg_id is always in db queries"""
251 251 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
252 252 for rec in found:
253 253 self.assertTrue('msg_id' in rec.keys())
254 254 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
255 255 for rec in found:
256 256 self.assertTrue('msg_id' in rec.keys())
257 257 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
258 258 for rec in found:
259 259 self.assertTrue('msg_id' in rec.keys())
260 260
261 261 def test_db_query_get_result(self):
262 262 """pop in db_query shouldn't pop from result itself"""
263 263 self.client[:].apply_sync(lambda : 1)
264 264 found = self.client.db_query({'msg_id': {'$ne' : ''}})
265 265 rc2 = clientmod.Client(profile='iptest')
266 266 # If this bug is not fixed, this call will hang:
267 267 ar = rc2.get_result(self.client.history[-1])
268 268 ar.wait(2)
269 269 self.assertTrue(ar.ready())
270 270 ar.get()
271 271 rc2.close()
272 272
273 273 def test_db_query_in(self):
274 274 """test db query with '$in','$nin' operators"""
275 275 hist = self.client.hub_history()
276 276 even = hist[::2]
277 277 odd = hist[1::2]
278 278 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
279 279 found = [ r['msg_id'] for r in recs ]
280 280 self.assertEqual(set(even), set(found))
281 281 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
282 282 found = [ r['msg_id'] for r in recs ]
283 283 self.assertEqual(set(odd), set(found))
284 284
285 285 def test_hub_history(self):
286 286 hist = self.client.hub_history()
287 287 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
288 288 recdict = {}
289 289 for rec in recs:
290 290 recdict[rec['msg_id']] = rec
291 291
292 292 latest = datetime(1984,1,1)
293 293 for msg_id in hist:
294 294 rec = recdict[msg_id]
295 295 newt = rec['submitted']
296 296 self.assertTrue(newt >= latest)
297 297 latest = newt
298 298 ar = self.client[-1].apply_async(lambda : 1)
299 299 ar.get()
300 300 time.sleep(0.25)
301 301 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
302 302
303 303 def _wait_for_idle(self):
304 304 """wait for the cluster to become idle, according to the everyone."""
305 305 rc = self.client
306 306
307 307 # step 0. wait for local results
308 308 # this should be sufficient 99% of the time.
309 309 rc.wait(timeout=5)
310 310
311 311 # step 1. wait for all requests to be noticed
312 312 # timeout 5s, polling every 100ms
313 313 msg_ids = set(rc.history)
314 314 hub_hist = rc.hub_history()
315 315 for i in range(50):
316 316 if msg_ids.difference(hub_hist):
317 317 time.sleep(0.1)
318 318 hub_hist = rc.hub_history()
319 319 else:
320 320 break
321 321
322 322 self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
323 323
324 324 # step 2. wait for all requests to be done
325 325 # timeout 5s, polling every 100ms
326 326 qs = rc.queue_status()
327 327 for i in range(50):
328 if qs['unassigned'] or any(qs[eid]['tasks'] + qs[eid]['queue'] for eid in rc.ids):
328 if qs['unassigned'] or any(qs[eid]['tasks'] + qs[eid]['queue'] for eid in qs if eid != 'unassigned'):
329 329 time.sleep(0.1)
330 330 qs = rc.queue_status()
331 331 else:
332 332 break
333 333
334 334 # ensure Hub up to date:
335 335 self.assertEqual(qs['unassigned'], 0)
336 336 for eid in rc.ids:
337 337 self.assertEqual(qs[eid]['tasks'], 0)
338 338 self.assertEqual(qs[eid]['queue'], 0)
339 339
340 340
341 341 def test_resubmit(self):
342 342 def f():
343 343 import random
344 344 return random.random()
345 345 v = self.client.load_balanced_view()
346 346 ar = v.apply_async(f)
347 347 r1 = ar.get(1)
348 348 # give the Hub a chance to notice:
349 349 self._wait_for_idle()
350 350 ahr = self.client.resubmit(ar.msg_ids)
351 351 r2 = ahr.get(1)
352 352 self.assertFalse(r1 == r2)
353 353
354 354 def test_resubmit_chain(self):
355 355 """resubmit resubmitted tasks"""
356 356 v = self.client.load_balanced_view()
357 357 ar = v.apply_async(lambda x: x, 'x'*1024)
358 358 ar.get()
359 359 self._wait_for_idle()
360 360 ars = [ar]
361 361
362 362 for i in range(10):
363 363 ar = ars[-1]
364 364 ar2 = self.client.resubmit(ar.msg_ids)
365 365
366 366 [ ar.get() for ar in ars ]
367 367
368 368 def test_resubmit_header(self):
369 369 """resubmit shouldn't clobber the whole header"""
370 370 def f():
371 371 import random
372 372 return random.random()
373 373 v = self.client.load_balanced_view()
374 374 v.retries = 1
375 375 ar = v.apply_async(f)
376 376 r1 = ar.get(1)
377 377 # give the Hub a chance to notice:
378 378 self._wait_for_idle()
379 379 ahr = self.client.resubmit(ar.msg_ids)
380 380 ahr.get(1)
381 381 time.sleep(0.5)
382 382 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
383 383 h1,h2 = [ r['header'] for r in records ]
384 384 for key in set(h1.keys()).union(set(h2.keys())):
385 385 if key in ('msg_id', 'date'):
386 386 self.assertNotEqual(h1[key], h2[key])
387 387 else:
388 388 self.assertEqual(h1[key], h2[key])
389 389
390 390 def test_resubmit_aborted(self):
391 391 def f():
392 392 import random
393 393 return random.random()
394 394 v = self.client.load_balanced_view()
395 395 # restrict to one engine, so we can put a sleep
396 396 # ahead of the task, so it will get aborted
397 397 eid = self.client.ids[-1]
398 398 v.targets = [eid]
399 399 sleep = v.apply_async(time.sleep, 0.5)
400 400 ar = v.apply_async(f)
401 401 ar.abort()
402 402 self.assertRaises(error.TaskAborted, ar.get)
403 403 # Give the Hub a chance to get up to date:
404 404 self._wait_for_idle()
405 405 ahr = self.client.resubmit(ar.msg_ids)
406 406 r2 = ahr.get(1)
407 407
408 408 def test_resubmit_inflight(self):
409 409 """resubmit of inflight task"""
410 410 v = self.client.load_balanced_view()
411 411 ar = v.apply_async(time.sleep,1)
412 412 # give the message a chance to arrive
413 413 time.sleep(0.2)
414 414 ahr = self.client.resubmit(ar.msg_ids)
415 415 ar.get(2)
416 416 ahr.get(2)
417 417
418 418 def test_resubmit_badkey(self):
419 419 """ensure KeyError on resubmit of nonexistant task"""
420 420 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
421 421
422 422 def test_purge_hub_results(self):
423 423 # ensure there are some tasks
424 424 for i in range(5):
425 425 self.client[:].apply_sync(lambda : 1)
426 426 # Wait for the Hub to realise the result is done:
427 427 # This prevents a race condition, where we
428 428 # might purge a result the Hub still thinks is pending.
429 429 self._wait_for_idle()
430 430 rc2 = clientmod.Client(profile='iptest')
431 431 hist = self.client.hub_history()
432 432 ahr = rc2.get_result([hist[-1]])
433 433 ahr.wait(10)
434 434 self.client.purge_hub_results(hist[-1])
435 435 newhist = self.client.hub_history()
436 436 self.assertEqual(len(newhist)+1,len(hist))
437 437 rc2.spin()
438 438 rc2.close()
439 439
440 440 def test_purge_local_results(self):
441 441 # ensure there are some tasks
442 442 res = []
443 443 for i in range(5):
444 444 res.append(self.client[:].apply_async(lambda : 1))
445 445 self._wait_for_idle()
446 446 self.client.wait(10) # wait for the results to come back
447 447 before = len(self.client.results)
448 448 self.assertEqual(len(self.client.metadata),before)
449 449 self.client.purge_local_results(res[-1])
450 450 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
451 451 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
452 452
453 453 def test_purge_all_hub_results(self):
454 454 self.client.purge_hub_results('all')
455 455 hist = self.client.hub_history()
456 456 self.assertEqual(len(hist), 0)
457 457
458 458 def test_purge_all_local_results(self):
459 459 self.client.purge_local_results('all')
460 460 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
461 461 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
462 462
463 463 def test_purge_all_results(self):
464 464 # ensure there are some tasks
465 465 for i in range(5):
466 466 self.client[:].apply_sync(lambda : 1)
467 467 self.client.wait(10)
468 468 self._wait_for_idle()
469 469 self.client.purge_results('all')
470 470 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
471 471 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
472 472 hist = self.client.hub_history()
473 473 self.assertEqual(len(hist), 0, msg="hub history not empty")
474 474
475 475 def test_purge_everything(self):
476 476 # ensure there are some tasks
477 477 for i in range(5):
478 478 self.client[:].apply_sync(lambda : 1)
479 479 self.client.wait(10)
480 480 self._wait_for_idle()
481 481 self.client.purge_everything()
482 482 # The client results
483 483 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
484 484 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
485 485 # The client "bookkeeping"
486 486 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
487 487 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
488 488 # the hub results
489 489 hist = self.client.hub_history()
490 490 self.assertEqual(len(hist), 0, msg="hub history not empty")
491 491
492 492
493 493 def test_spin_thread(self):
494 494 self.client.spin_thread(0.01)
495 495 ar = self.client[-1].apply_async(lambda : 1)
496 496 time.sleep(0.1)
497 497 self.assertTrue(ar.wall_time < 0.1,
498 498 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
499 499 )
500 500
501 501 def test_stop_spin_thread(self):
502 502 self.client.spin_thread(0.01)
503 503 self.client.stop_spin_thread()
504 504 ar = self.client[-1].apply_async(lambda : 1)
505 505 time.sleep(0.15)
506 506 self.assertTrue(ar.wall_time > 0.1,
507 507 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
508 508 )
509 509
510 510 def test_activate(self):
511 511 ip = get_ipython()
512 512 magics = ip.magics_manager.magics
513 513 self.assertTrue('px' in magics['line'])
514 514 self.assertTrue('px' in magics['cell'])
515 515 v0 = self.client.activate(-1, '0')
516 516 self.assertTrue('px0' in magics['line'])
517 517 self.assertTrue('px0' in magics['cell'])
518 518 self.assertEqual(v0.targets, self.client.ids[-1])
519 519 v0 = self.client.activate('all', 'all')
520 520 self.assertTrue('pxall' in magics['line'])
521 521 self.assertTrue('pxall' in magics['cell'])
522 522 self.assertEqual(v0.targets, 'all')
General Comments 0
You need to be logged in to leave comments. Login now