##// END OF EJS Templates
Merge pull request #2808 from minrk/parallel_wait...
Min RK -
r9181:9db88115 merge
parent child Browse files
Show More
@@ -1,502 +1,517
1 1 """Tests for parallel client.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import time
22 22 from datetime import datetime
23 23 from tempfile import mktemp
24 24
25 25 import zmq
26 26
27 27 from IPython import parallel
28 28 from IPython.parallel.client import client as clientmod
29 29 from IPython.parallel import error
30 30 from IPython.parallel import AsyncResult, AsyncHubResult
31 31 from IPython.parallel import LoadBalancedView, DirectView
32 32
33 33 from clienttest import ClusterTestCase, segfault, wait, add_engines
34 34
35 35 def setup():
36 36 add_engines(4, total=True)
37 37
38 38 class TestClient(ClusterTestCase):
39 39
40 40 def test_ids(self):
41 41 n = len(self.client.ids)
42 42 self.add_engines(2)
43 43 self.assertEqual(len(self.client.ids), n+2)
44 44
45 45 def test_view_indexing(self):
46 46 """test index access for views"""
47 47 self.minimum_engines(4)
48 48 targets = self.client._build_targets('all')[-1]
49 49 v = self.client[:]
50 50 self.assertEqual(v.targets, targets)
51 51 t = self.client.ids[2]
52 52 v = self.client[t]
53 53 self.assertTrue(isinstance(v, DirectView))
54 54 self.assertEqual(v.targets, t)
55 55 t = self.client.ids[2:4]
56 56 v = self.client[t]
57 57 self.assertTrue(isinstance(v, DirectView))
58 58 self.assertEqual(v.targets, t)
59 59 v = self.client[::2]
60 60 self.assertTrue(isinstance(v, DirectView))
61 61 self.assertEqual(v.targets, targets[::2])
62 62 v = self.client[1::3]
63 63 self.assertTrue(isinstance(v, DirectView))
64 64 self.assertEqual(v.targets, targets[1::3])
65 65 v = self.client[:-3]
66 66 self.assertTrue(isinstance(v, DirectView))
67 67 self.assertEqual(v.targets, targets[:-3])
68 68 v = self.client[-1]
69 69 self.assertTrue(isinstance(v, DirectView))
70 70 self.assertEqual(v.targets, targets[-1])
71 71 self.assertRaises(TypeError, lambda : self.client[None])
72 72
73 73 def test_lbview_targets(self):
74 74 """test load_balanced_view targets"""
75 75 v = self.client.load_balanced_view()
76 76 self.assertEqual(v.targets, None)
77 77 v = self.client.load_balanced_view(-1)
78 78 self.assertEqual(v.targets, [self.client.ids[-1]])
79 79 v = self.client.load_balanced_view('all')
80 80 self.assertEqual(v.targets, None)
81 81
82 82 def test_dview_targets(self):
83 83 """test direct_view targets"""
84 84 v = self.client.direct_view()
85 85 self.assertEqual(v.targets, 'all')
86 86 v = self.client.direct_view('all')
87 87 self.assertEqual(v.targets, 'all')
88 88 v = self.client.direct_view(-1)
89 89 self.assertEqual(v.targets, self.client.ids[-1])
90 90
91 91 def test_lazy_all_targets(self):
92 92 """test lazy evaluation of rc.direct_view('all')"""
93 93 v = self.client.direct_view()
94 94 self.assertEqual(v.targets, 'all')
95 95
96 96 def double(x):
97 97 return x*2
98 98 seq = range(100)
99 99 ref = [ double(x) for x in seq ]
100 100
101 101 # add some engines, which should be used
102 102 self.add_engines(1)
103 103 n1 = len(self.client.ids)
104 104
105 105 # simple apply
106 106 r = v.apply_sync(lambda : 1)
107 107 self.assertEqual(r, [1] * n1)
108 108
109 109 # map goes through remotefunction
110 110 r = v.map_sync(double, seq)
111 111 self.assertEqual(r, ref)
112 112
113 113 # add a couple more engines, and try again
114 114 self.add_engines(2)
115 115 n2 = len(self.client.ids)
116 116 self.assertNotEqual(n2, n1)
117 117
118 118 # apply
119 119 r = v.apply_sync(lambda : 1)
120 120 self.assertEqual(r, [1] * n2)
121 121
122 122 # map
123 123 r = v.map_sync(double, seq)
124 124 self.assertEqual(r, ref)
125 125
126 126 def test_targets(self):
127 127 """test various valid targets arguments"""
128 128 build = self.client._build_targets
129 129 ids = self.client.ids
130 130 idents,targets = build(None)
131 131 self.assertEqual(ids, targets)
132 132
133 133 def test_clear(self):
134 134 """test clear behavior"""
135 135 self.minimum_engines(2)
136 136 v = self.client[:]
137 137 v.block=True
138 138 v.push(dict(a=5))
139 139 v.pull('a')
140 140 id0 = self.client.ids[-1]
141 141 self.client.clear(targets=id0, block=True)
142 142 a = self.client[:-1].get('a')
143 143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
144 144 self.client.clear(block=True)
145 145 for i in self.client.ids:
146 146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147 147
148 148 def test_get_result(self):
149 149 """test getting results from the Hub."""
150 150 c = clientmod.Client(profile='iptest')
151 151 t = c.ids[-1]
152 152 ar = c[t].apply_async(wait, 1)
153 153 # give the monitor time to notice the message
154 154 time.sleep(.25)
155 155 ahr = self.client.get_result(ar.msg_ids)
156 156 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 157 self.assertEqual(ahr.get(), ar.get())
158 158 ar2 = self.client.get_result(ar.msg_ids)
159 159 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 160 c.close()
161 161
162 162 def test_get_execute_result(self):
163 163 """test getting execute results from the Hub."""
164 164 c = clientmod.Client(profile='iptest')
165 165 t = c.ids[-1]
166 166 cell = '\n'.join([
167 167 'import time',
168 168 'time.sleep(0.25)',
169 169 '5'
170 170 ])
171 171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
172 172 # give the monitor time to notice the message
173 173 time.sleep(.25)
174 174 ahr = self.client.get_result(ar.msg_ids)
175 175 self.assertTrue(isinstance(ahr, AsyncHubResult))
176 176 self.assertEqual(ahr.get().pyout, ar.get().pyout)
177 177 ar2 = self.client.get_result(ar.msg_ids)
178 178 self.assertFalse(isinstance(ar2, AsyncHubResult))
179 179 c.close()
180 180
181 181 def test_ids_list(self):
182 182 """test client.ids"""
183 183 ids = self.client.ids
184 184 self.assertEqual(ids, self.client._ids)
185 185 self.assertFalse(ids is self.client._ids)
186 186 ids.remove(ids[-1])
187 187 self.assertNotEqual(ids, self.client._ids)
188 188
189 189 def test_queue_status(self):
190 190 ids = self.client.ids
191 191 id0 = ids[0]
192 192 qs = self.client.queue_status(targets=id0)
193 193 self.assertTrue(isinstance(qs, dict))
194 194 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
195 195 allqs = self.client.queue_status()
196 196 self.assertTrue(isinstance(allqs, dict))
197 197 intkeys = list(allqs.keys())
198 198 intkeys.remove('unassigned')
199 199 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
200 200 unassigned = allqs.pop('unassigned')
201 201 for eid,qs in allqs.items():
202 202 self.assertTrue(isinstance(qs, dict))
203 203 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
204 204
205 205 def test_shutdown(self):
206 206 ids = self.client.ids
207 207 id0 = ids[0]
208 208 self.client.shutdown(id0, block=True)
209 209 while id0 in self.client.ids:
210 210 time.sleep(0.1)
211 211 self.client.spin()
212 212
213 213 self.assertRaises(IndexError, lambda : self.client[id0])
214 214
215 215 def test_result_status(self):
216 216 pass
217 217 # to be written
218 218
219 219 def test_db_query_dt(self):
220 220 """test db query by date"""
221 221 hist = self.client.hub_history()
222 222 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
223 223 tic = middle['submitted']
224 224 before = self.client.db_query({'submitted' : {'$lt' : tic}})
225 225 after = self.client.db_query({'submitted' : {'$gte' : tic}})
226 226 self.assertEqual(len(before)+len(after),len(hist))
227 227 for b in before:
228 228 self.assertTrue(b['submitted'] < tic)
229 229 for a in after:
230 230 self.assertTrue(a['submitted'] >= tic)
231 231 same = self.client.db_query({'submitted' : tic})
232 232 for s in same:
233 233 self.assertTrue(s['submitted'] == tic)
234 234
235 235 def test_db_query_keys(self):
236 236 """test extracting subset of record keys"""
237 237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
238 238 for rec in found:
239 239 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
240 240
241 241 def test_db_query_default_keys(self):
242 242 """default db_query excludes buffers"""
243 243 found = self.client.db_query({'msg_id': {'$ne' : ''}})
244 244 for rec in found:
245 245 keys = set(rec.keys())
246 246 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
247 247 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
248 248
249 249 def test_db_query_msg_id(self):
250 250 """ensure msg_id is always in db queries"""
251 251 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
252 252 for rec in found:
253 253 self.assertTrue('msg_id' in rec.keys())
254 254 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
255 255 for rec in found:
256 256 self.assertTrue('msg_id' in rec.keys())
257 257 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
258 258 for rec in found:
259 259 self.assertTrue('msg_id' in rec.keys())
260 260
261 261 def test_db_query_get_result(self):
262 262 """pop in db_query shouldn't pop from result itself"""
263 263 self.client[:].apply_sync(lambda : 1)
264 264 found = self.client.db_query({'msg_id': {'$ne' : ''}})
265 265 rc2 = clientmod.Client(profile='iptest')
266 266 # If this bug is not fixed, this call will hang:
267 267 ar = rc2.get_result(self.client.history[-1])
268 268 ar.wait(2)
269 269 self.assertTrue(ar.ready())
270 270 ar.get()
271 271 rc2.close()
272 272
273 273 def test_db_query_in(self):
274 274 """test db query with '$in','$nin' operators"""
275 275 hist = self.client.hub_history()
276 276 even = hist[::2]
277 277 odd = hist[1::2]
278 278 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
279 279 found = [ r['msg_id'] for r in recs ]
280 280 self.assertEqual(set(even), set(found))
281 281 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
282 282 found = [ r['msg_id'] for r in recs ]
283 283 self.assertEqual(set(odd), set(found))
284 284
285 285 def test_hub_history(self):
286 286 hist = self.client.hub_history()
287 287 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
288 288 recdict = {}
289 289 for rec in recs:
290 290 recdict[rec['msg_id']] = rec
291 291
292 292 latest = datetime(1984,1,1)
293 293 for msg_id in hist:
294 294 rec = recdict[msg_id]
295 295 newt = rec['submitted']
296 296 self.assertTrue(newt >= latest)
297 297 latest = newt
298 298 ar = self.client[-1].apply_async(lambda : 1)
299 299 ar.get()
300 300 time.sleep(0.25)
301 301 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
302 302
303 303 def _wait_for_idle(self):
304 304 """wait for an engine to become idle, according to the Hub"""
305 305 rc = self.client
306 306
307 # step 1. wait for all requests to be noticed
308 # timeout 5s, polling every 100ms
309 msg_ids = set(rc.history)
310 hub_hist = rc.hub_history()
311 for i in range(50):
312 if msg_ids.difference(hub_hist):
313 time.sleep(0.1)
314 hub_hist = rc.hub_history()
315 else:
316 break
317
318 self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
319
320 # step 2. wait for all requests to be done
307 321 # timeout 5s, polling every 100ms
308 322 qs = rc.queue_status()
309 323 for i in range(50):
310 324 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
311 325 time.sleep(0.1)
312 326 qs = rc.queue_status()
313 327 else:
314 328 break
315 329
316 330 # ensure Hub up to date:
317 331 self.assertEqual(qs['unassigned'], 0)
318 332 for eid in rc.ids:
319 333 self.assertEqual(qs[eid]['tasks'], 0)
320 334
321 335
322 336 def test_resubmit(self):
323 337 def f():
324 338 import random
325 339 return random.random()
326 340 v = self.client.load_balanced_view()
327 341 ar = v.apply_async(f)
328 342 r1 = ar.get(1)
329 343 # give the Hub a chance to notice:
330 344 self._wait_for_idle()
331 345 ahr = self.client.resubmit(ar.msg_ids)
332 346 r2 = ahr.get(1)
333 347 self.assertFalse(r1 == r2)
334 348
335 349 def test_resubmit_chain(self):
336 350 """resubmit resubmitted tasks"""
337 351 v = self.client.load_balanced_view()
338 352 ar = v.apply_async(lambda x: x, 'x'*1024)
339 353 ar.get()
340 354 self._wait_for_idle()
341 355 ars = [ar]
342 356
343 357 for i in range(10):
344 358 ar = ars[-1]
345 359 ar2 = self.client.resubmit(ar.msg_ids)
346 360
347 361 [ ar.get() for ar in ars ]
348 362
349 363 def test_resubmit_header(self):
350 364 """resubmit shouldn't clobber the whole header"""
351 365 def f():
352 366 import random
353 367 return random.random()
354 368 v = self.client.load_balanced_view()
355 369 v.retries = 1
356 370 ar = v.apply_async(f)
357 371 r1 = ar.get(1)
358 372 # give the Hub a chance to notice:
359 373 self._wait_for_idle()
360 374 ahr = self.client.resubmit(ar.msg_ids)
361 375 ahr.get(1)
362 376 time.sleep(0.5)
363 377 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
364 378 h1,h2 = [ r['header'] for r in records ]
365 379 for key in set(h1.keys()).union(set(h2.keys())):
366 380 if key in ('msg_id', 'date'):
367 381 self.assertNotEqual(h1[key], h2[key])
368 382 else:
369 383 self.assertEqual(h1[key], h2[key])
370 384
371 385 def test_resubmit_aborted(self):
372 386 def f():
373 387 import random
374 388 return random.random()
375 389 v = self.client.load_balanced_view()
376 390 # restrict to one engine, so we can put a sleep
377 391 # ahead of the task, so it will get aborted
378 392 eid = self.client.ids[-1]
379 393 v.targets = [eid]
380 394 sleep = v.apply_async(time.sleep, 0.5)
381 395 ar = v.apply_async(f)
382 396 ar.abort()
383 397 self.assertRaises(error.TaskAborted, ar.get)
384 398 # Give the Hub a chance to get up to date:
385 399 self._wait_for_idle()
386 400 ahr = self.client.resubmit(ar.msg_ids)
387 401 r2 = ahr.get(1)
388 402
389 403 def test_resubmit_inflight(self):
390 404 """resubmit of inflight task"""
391 405 v = self.client.load_balanced_view()
392 406 ar = v.apply_async(time.sleep,1)
393 407 # give the message a chance to arrive
394 408 time.sleep(0.2)
395 409 ahr = self.client.resubmit(ar.msg_ids)
396 410 ar.get(2)
397 411 ahr.get(2)
398 412
399 413 def test_resubmit_badkey(self):
400 414 """ensure KeyError on resubmit of nonexistant task"""
401 415 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
402 416
403 417 def test_purge_hub_results(self):
404 418 # ensure there are some tasks
405 419 for i in range(5):
406 420 self.client[:].apply_sync(lambda : 1)
407 421 # Wait for the Hub to realise the result is done:
408 422 # This prevents a race condition, where we
409 423 # might purge a result the Hub still thinks is pending.
410 time.sleep(0.1)
424 self._wait_for_idle()
411 425 rc2 = clientmod.Client(profile='iptest')
412 426 hist = self.client.hub_history()
413 427 ahr = rc2.get_result([hist[-1]])
414 428 ahr.wait(10)
415 429 self.client.purge_hub_results(hist[-1])
416 430 newhist = self.client.hub_history()
417 431 self.assertEqual(len(newhist)+1,len(hist))
418 432 rc2.spin()
419 433 rc2.close()
420 434
421 435 def test_purge_local_results(self):
422 436 # ensure there are some tasks
423 437 res = []
424 438 for i in range(5):
425 439 res.append(self.client[:].apply_async(lambda : 1))
426 time.sleep(0.1)
440 self._wait_for_idle()
427 441 self.client.wait(10) # wait for the results to come back
428 442 before = len(self.client.results)
429 443 self.assertEqual(len(self.client.metadata),before)
430 444 self.client.purge_local_results(res[-1])
431 445 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
432 446 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
433 447
434 448 def test_purge_all_hub_results(self):
435 449 self.client.purge_hub_results('all')
436 450 hist = self.client.hub_history()
437 451 self.assertEqual(len(hist), 0)
438 452
439 453 def test_purge_all_local_results(self):
440 454 self.client.purge_local_results('all')
441 455 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
442 456 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
443 457
444 458 def test_purge_all_results(self):
445 459 # ensure there are some tasks
446 460 for i in range(5):
447 461 self.client[:].apply_sync(lambda : 1)
448 462 self.client.wait(10)
463 self._wait_for_idle()
449 464 self.client.purge_results('all')
450 465 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
451 466 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
452 time.sleep(0.1)
453 hist = self.client.hub_history()#
467 hist = self.client.hub_history()
454 468 self.assertEqual(len(hist), 0, msg="hub history not empty")
455 469
456 470 def test_purge_everything(self):
457 471 # ensure there are some tasks
458 472 for i in range(5):
459 473 self.client[:].apply_sync(lambda : 1)
460 474 self.client.wait(10)
475 self._wait_for_idle()
461 476 self.client.purge_everything()
462 477 # The client results
463 478 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
464 479 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
465 # the hub results
466 hist = self.client.hub_history()
467 self.assertEqual(len(hist), 0, msg="hub history not empty")
468 480 # The client "bookkeeping"
469 481 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
470 482 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
483 # the hub results
484 hist = self.client.hub_history()
485 self.assertEqual(len(hist), 0, msg="hub history not empty")
471 486
472 487
473 488 def test_spin_thread(self):
474 489 self.client.spin_thread(0.01)
475 490 ar = self.client[-1].apply_async(lambda : 1)
476 491 time.sleep(0.1)
477 492 self.assertTrue(ar.wall_time < 0.1,
478 493 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
479 494 )
480 495
481 496 def test_stop_spin_thread(self):
482 497 self.client.spin_thread(0.01)
483 498 self.client.stop_spin_thread()
484 499 ar = self.client[-1].apply_async(lambda : 1)
485 500 time.sleep(0.15)
486 501 self.assertTrue(ar.wall_time > 0.1,
487 502 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
488 503 )
489 504
490 505 def test_activate(self):
491 506 ip = get_ipython()
492 507 magics = ip.magics_manager.magics
493 508 self.assertTrue('px' in magics['line'])
494 509 self.assertTrue('px' in magics['cell'])
495 510 v0 = self.client.activate(-1, '0')
496 511 self.assertTrue('px0' in magics['line'])
497 512 self.assertTrue('px0' in magics['cell'])
498 513 self.assertEqual(v0.targets, self.client.ids[-1])
499 514 v0 = self.client.activate('all', 'all')
500 515 self.assertTrue('pxall' in magics['line'])
501 516 self.assertTrue('pxall' in magics['cell'])
502 517 self.assertEqual(v0.targets, 'all')
General Comments 0
You need to be logged in to leave comments. Login now