##// END OF EJS Templates
Merge pull request #2025 from jdmarch/win_xp_parallel_test_wait_for_cleanup...
Min RK -
r7690:b1eae246 merge
parent child Browse files
Show More
@@ -1,565 +1,573 b''
1 1 # -*- coding: utf-8 -*-
2 2 """test View objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 import platform
20 21 import time
21 22 from tempfile import mktemp
22 23 from StringIO import StringIO
23 24
24 25 import zmq
25 26 from nose import SkipTest
26 27
27 28 from IPython.testing import decorators as dec
28 29 from IPython.testing.ipunittest import ParametricTestCase
29 30
30 31 from IPython import parallel as pmod
31 32 from IPython.parallel import error
32 33 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
33 34 from IPython.parallel import DirectView
34 35 from IPython.parallel.util import interactive
35 36
36 37 from IPython.parallel.tests import add_engines
37 38
38 39 from .clienttest import ClusterTestCase, crash, wait, skip_without
39 40
40 41 def setup():
41 42 add_engines(3, total=True)
42 43
43 44 class TestView(ClusterTestCase, ParametricTestCase):
44 45
46 def setUp(self):
47 # On Win XP, wait for resource cleanup, else parallel test group fails
48 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
49 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
50 time.sleep(2)
51 super(TestView, self).setUp()
52
45 53 def test_z_crash_mux(self):
46 54 """test graceful handling of engine death (direct)"""
47 55 raise SkipTest("crash tests disabled, due to undesirable crash reports")
48 56 # self.add_engines(1)
49 57 eid = self.client.ids[-1]
50 58 ar = self.client[eid].apply_async(crash)
51 59 self.assertRaisesRemote(error.EngineError, ar.get, 10)
52 60 eid = ar.engine_id
53 61 tic = time.time()
54 62 while eid in self.client.ids and time.time()-tic < 5:
55 63 time.sleep(.01)
56 64 self.client.spin()
57 65 self.assertFalse(eid in self.client.ids, "Engine should have died")
58 66
59 67 def test_push_pull(self):
60 68 """test pushing and pulling"""
61 69 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
62 70 t = self.client.ids[-1]
63 71 v = self.client[t]
64 72 push = v.push
65 73 pull = v.pull
66 74 v.block=True
67 75 nengines = len(self.client)
68 76 push({'data':data})
69 77 d = pull('data')
70 78 self.assertEquals(d, data)
71 79 self.client[:].push({'data':data})
72 80 d = self.client[:].pull('data', block=True)
73 81 self.assertEquals(d, nengines*[data])
74 82 ar = push({'data':data}, block=False)
75 83 self.assertTrue(isinstance(ar, AsyncResult))
76 84 r = ar.get()
77 85 ar = self.client[:].pull('data', block=False)
78 86 self.assertTrue(isinstance(ar, AsyncResult))
79 87 r = ar.get()
80 88 self.assertEquals(r, nengines*[data])
81 89 self.client[:].push(dict(a=10,b=20))
82 90 r = self.client[:].pull(('a','b'), block=True)
83 91 self.assertEquals(r, nengines*[[10,20]])
84 92
85 93 def test_push_pull_function(self):
86 94 "test pushing and pulling functions"
87 95 def testf(x):
88 96 return 2.0*x
89 97
90 98 t = self.client.ids[-1]
91 99 v = self.client[t]
92 100 v.block=True
93 101 push = v.push
94 102 pull = v.pull
95 103 execute = v.execute
96 104 push({'testf':testf})
97 105 r = pull('testf')
98 106 self.assertEqual(r(1.0), testf(1.0))
99 107 execute('r = testf(10)')
100 108 r = pull('r')
101 109 self.assertEquals(r, testf(10))
102 110 ar = self.client[:].push({'testf':testf}, block=False)
103 111 ar.get()
104 112 ar = self.client[:].pull('testf', block=False)
105 113 rlist = ar.get()
106 114 for r in rlist:
107 115 self.assertEqual(r(1.0), testf(1.0))
108 116 execute("def g(x): return x*x")
109 117 r = pull(('testf','g'))
110 118 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
111 119
112 120 def test_push_function_globals(self):
113 121 """test that pushed functions have access to globals"""
114 122 @interactive
115 123 def geta():
116 124 return a
117 125 # self.add_engines(1)
118 126 v = self.client[-1]
119 127 v.block=True
120 128 v['f'] = geta
121 129 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
122 130 v.execute('a=5')
123 131 v.execute('b=f()')
124 132 self.assertEquals(v['b'], 5)
125 133
126 134 def test_push_function_defaults(self):
127 135 """test that pushed functions preserve default args"""
128 136 def echo(a=10):
129 137 return a
130 138 v = self.client[-1]
131 139 v.block=True
132 140 v['f'] = echo
133 141 v.execute('b=f()')
134 142 self.assertEquals(v['b'], 10)
135 143
136 144 def test_get_result(self):
137 145 """test getting results from the Hub."""
138 146 c = pmod.Client(profile='iptest')
139 147 # self.add_engines(1)
140 148 t = c.ids[-1]
141 149 v = c[t]
142 150 v2 = self.client[t]
143 151 ar = v.apply_async(wait, 1)
144 152 # give the monitor time to notice the message
145 153 time.sleep(.25)
146 154 ahr = v2.get_result(ar.msg_ids)
147 155 self.assertTrue(isinstance(ahr, AsyncHubResult))
148 156 self.assertEquals(ahr.get(), ar.get())
149 157 ar2 = v2.get_result(ar.msg_ids)
150 158 self.assertFalse(isinstance(ar2, AsyncHubResult))
151 159 c.spin()
152 160 c.close()
153 161
154 162 def test_run_newline(self):
155 163 """test that run appends newline to files"""
156 164 tmpfile = mktemp()
157 165 with open(tmpfile, 'w') as f:
158 166 f.write("""def g():
159 167 return 5
160 168 """)
161 169 v = self.client[-1]
162 170 v.run(tmpfile, block=True)
163 171 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
164 172
165 173 def test_apply_tracked(self):
166 174 """test tracking for apply"""
167 175 # self.add_engines(1)
168 176 t = self.client.ids[-1]
169 177 v = self.client[t]
170 178 v.block=False
171 179 def echo(n=1024*1024, **kwargs):
172 180 with v.temp_flags(**kwargs):
173 181 return v.apply(lambda x: x, 'x'*n)
174 182 ar = echo(1, track=False)
175 183 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
176 184 self.assertTrue(ar.sent)
177 185 ar = echo(track=True)
178 186 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
179 187 self.assertEquals(ar.sent, ar._tracker.done)
180 188 ar._tracker.wait()
181 189 self.assertTrue(ar.sent)
182 190
183 191 def test_push_tracked(self):
184 192 t = self.client.ids[-1]
185 193 ns = dict(x='x'*1024*1024)
186 194 v = self.client[t]
187 195 ar = v.push(ns, block=False, track=False)
188 196 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 197 self.assertTrue(ar.sent)
190 198
191 199 ar = v.push(ns, block=False, track=True)
192 200 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
193 201 ar._tracker.wait()
194 202 self.assertEquals(ar.sent, ar._tracker.done)
195 203 self.assertTrue(ar.sent)
196 204 ar.get()
197 205
198 206 def test_scatter_tracked(self):
199 207 t = self.client.ids
200 208 x='x'*1024*1024
201 209 ar = self.client[t].scatter('x', x, block=False, track=False)
202 210 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 211 self.assertTrue(ar.sent)
204 212
205 213 ar = self.client[t].scatter('x', x, block=False, track=True)
206 214 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
207 215 self.assertEquals(ar.sent, ar._tracker.done)
208 216 ar._tracker.wait()
209 217 self.assertTrue(ar.sent)
210 218 ar.get()
211 219
212 220 def test_remote_reference(self):
213 221 v = self.client[-1]
214 222 v['a'] = 123
215 223 ra = pmod.Reference('a')
216 224 b = v.apply_sync(lambda x: x, ra)
217 225 self.assertEquals(b, 123)
218 226
219 227
220 228 def test_scatter_gather(self):
221 229 view = self.client[:]
222 230 seq1 = range(16)
223 231 view.scatter('a', seq1)
224 232 seq2 = view.gather('a', block=True)
225 233 self.assertEquals(seq2, seq1)
226 234 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
227 235
228 236 @skip_without('numpy')
229 237 def test_scatter_gather_numpy(self):
230 238 import numpy
231 239 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
232 240 view = self.client[:]
233 241 a = numpy.arange(64)
234 242 view.scatter('a', a)
235 243 b = view.gather('a', block=True)
236 244 assert_array_equal(b, a)
237 245
238 246 def test_scatter_gather_lazy(self):
239 247 """scatter/gather with targets='all'"""
240 248 view = self.client.direct_view(targets='all')
241 249 x = range(64)
242 250 view.scatter('x', x)
243 251 gathered = view.gather('x', block=True)
244 252 self.assertEquals(gathered, x)
245 253
246 254
247 255 @dec.known_failure_py3
248 256 @skip_without('numpy')
249 257 def test_push_numpy_nocopy(self):
250 258 import numpy
251 259 view = self.client[:]
252 260 a = numpy.arange(64)
253 261 view['A'] = a
254 262 @interactive
255 263 def check_writeable(x):
256 264 return x.flags.writeable
257 265
258 266 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
259 267 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
260 268
261 269 view.push(dict(B=a))
262 270 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
263 271 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
264 272
265 273 @skip_without('numpy')
266 274 def test_apply_numpy(self):
267 275 """view.apply(f, ndarray)"""
268 276 import numpy
269 277 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
270 278
271 279 A = numpy.random.random((100,100))
272 280 view = self.client[-1]
273 281 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
274 282 B = A.astype(dt)
275 283 C = view.apply_sync(lambda x:x, B)
276 284 assert_array_equal(B,C)
277 285
278 286 def test_map(self):
279 287 view = self.client[:]
280 288 def f(x):
281 289 return x**2
282 290 data = range(16)
283 291 r = view.map_sync(f, data)
284 292 self.assertEquals(r, map(f, data))
285 293
286 294 def test_map_iterable(self):
287 295 """test map on iterables (direct)"""
288 296 view = self.client[:]
289 297 # 101 is prime, so it won't be evenly distributed
290 298 arr = range(101)
291 299 # ensure it will be an iterator, even in Python 3
292 300 it = iter(arr)
293 301 r = view.map_sync(lambda x:x, arr)
294 302 self.assertEquals(r, list(arr))
295 303
296 304 def test_scatterGatherNonblocking(self):
297 305 data = range(16)
298 306 view = self.client[:]
299 307 view.scatter('a', data, block=False)
300 308 ar = view.gather('a', block=False)
301 309 self.assertEquals(ar.get(), data)
302 310
303 311 @skip_without('numpy')
304 312 def test_scatter_gather_numpy_nonblocking(self):
305 313 import numpy
306 314 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
307 315 a = numpy.arange(64)
308 316 view = self.client[:]
309 317 ar = view.scatter('a', a, block=False)
310 318 self.assertTrue(isinstance(ar, AsyncResult))
311 319 amr = view.gather('a', block=False)
312 320 self.assertTrue(isinstance(amr, AsyncMapResult))
313 321 assert_array_equal(amr.get(), a)
314 322
315 323 def test_execute(self):
316 324 view = self.client[:]
317 325 # self.client.debug=True
318 326 execute = view.execute
319 327 ar = execute('c=30', block=False)
320 328 self.assertTrue(isinstance(ar, AsyncResult))
321 329 ar = execute('d=[0,1,2]', block=False)
322 330 self.client.wait(ar, 1)
323 331 self.assertEquals(len(ar.get()), len(self.client))
324 332 for c in view['c']:
325 333 self.assertEquals(c, 30)
326 334
327 335 def test_abort(self):
328 336 view = self.client[-1]
329 337 ar = view.execute('import time; time.sleep(1)', block=False)
330 338 ar2 = view.apply_async(lambda : 2)
331 339 ar3 = view.apply_async(lambda : 3)
332 340 view.abort(ar2)
333 341 view.abort(ar3.msg_ids)
334 342 self.assertRaises(error.TaskAborted, ar2.get)
335 343 self.assertRaises(error.TaskAborted, ar3.get)
336 344
337 345 def test_abort_all(self):
338 346 """view.abort() aborts all outstanding tasks"""
339 347 view = self.client[-1]
340 348 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
341 349 view.abort()
342 350 view.wait(timeout=5)
343 351 for ar in ars[5:]:
344 352 self.assertRaises(error.TaskAborted, ar.get)
345 353
346 354 def test_temp_flags(self):
347 355 view = self.client[-1]
348 356 view.block=True
349 357 with view.temp_flags(block=False):
350 358 self.assertFalse(view.block)
351 359 self.assertTrue(view.block)
352 360
353 361 @dec.known_failure_py3
354 362 def test_importer(self):
355 363 view = self.client[-1]
356 364 view.clear(block=True)
357 365 with view.importer:
358 366 import re
359 367
360 368 @interactive
361 369 def findall(pat, s):
362 370 # this globals() step isn't necessary in real code
363 371 # only to prevent a closure in the test
364 372 re = globals()['re']
365 373 return re.findall(pat, s)
366 374
367 375 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
368 376
369 377 def test_unicode_execute(self):
370 378 """test executing unicode strings"""
371 379 v = self.client[-1]
372 380 v.block=True
373 381 if sys.version_info[0] >= 3:
374 382 code="a='é'"
375 383 else:
376 384 code=u"a=u'é'"
377 385 v.execute(code)
378 386 self.assertEquals(v['a'], u'é')
379 387
380 388 def test_unicode_apply_result(self):
381 389 """test unicode apply results"""
382 390 v = self.client[-1]
383 391 r = v.apply_sync(lambda : u'é')
384 392 self.assertEquals(r, u'é')
385 393
386 394 def test_unicode_apply_arg(self):
387 395 """test passing unicode arguments to apply"""
388 396 v = self.client[-1]
389 397
390 398 @interactive
391 399 def check_unicode(a, check):
392 400 assert isinstance(a, unicode), "%r is not unicode"%a
393 401 assert isinstance(check, bytes), "%r is not bytes"%check
394 402 assert a.encode('utf8') == check, "%s != %s"%(a,check)
395 403
396 404 for s in [ u'é', u'ßø®∫',u'asdf' ]:
397 405 try:
398 406 v.apply_sync(check_unicode, s, s.encode('utf8'))
399 407 except error.RemoteError as e:
400 408 if e.ename == 'AssertionError':
401 409 self.fail(e.evalue)
402 410 else:
403 411 raise e
404 412
405 413 def test_map_reference(self):
406 414 """view.map(<Reference>, *seqs) should work"""
407 415 v = self.client[:]
408 416 v.scatter('n', self.client.ids, flatten=True)
409 417 v.execute("f = lambda x,y: x*y")
410 418 rf = pmod.Reference('f')
411 419 nlist = list(range(10))
412 420 mlist = nlist[::-1]
413 421 expected = [ m*n for m,n in zip(mlist, nlist) ]
414 422 result = v.map_sync(rf, mlist, nlist)
415 423 self.assertEquals(result, expected)
416 424
417 425 def test_apply_reference(self):
418 426 """view.apply(<Reference>, *args) should work"""
419 427 v = self.client[:]
420 428 v.scatter('n', self.client.ids, flatten=True)
421 429 v.execute("f = lambda x: n*x")
422 430 rf = pmod.Reference('f')
423 431 result = v.apply_sync(rf, 5)
424 432 expected = [ 5*id for id in self.client.ids ]
425 433 self.assertEquals(result, expected)
426 434
427 435 def test_eval_reference(self):
428 436 v = self.client[self.client.ids[0]]
429 437 v['g'] = range(5)
430 438 rg = pmod.Reference('g[0]')
431 439 echo = lambda x:x
432 440 self.assertEquals(v.apply_sync(echo, rg), 0)
433 441
434 442 def test_reference_nameerror(self):
435 443 v = self.client[self.client.ids[0]]
436 444 r = pmod.Reference('elvis_has_left')
437 445 echo = lambda x:x
438 446 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
439 447
440 448 def test_single_engine_map(self):
441 449 e0 = self.client[self.client.ids[0]]
442 450 r = range(5)
443 451 check = [ -1*i for i in r ]
444 452 result = e0.map_sync(lambda x: -1*x, r)
445 453 self.assertEquals(result, check)
446 454
447 455 def test_len(self):
448 456 """len(view) makes sense"""
449 457 e0 = self.client[self.client.ids[0]]
450 458 yield self.assertEquals(len(e0), 1)
451 459 v = self.client[:]
452 460 yield self.assertEquals(len(v), len(self.client.ids))
453 461 v = self.client.direct_view('all')
454 462 yield self.assertEquals(len(v), len(self.client.ids))
455 463 v = self.client[:2]
456 464 yield self.assertEquals(len(v), 2)
457 465 v = self.client[:1]
458 466 yield self.assertEquals(len(v), 1)
459 467 v = self.client.load_balanced_view()
460 468 yield self.assertEquals(len(v), len(self.client.ids))
461 469 # parametric tests seem to require manual closing?
462 470 self.client.close()
463 471
464 472
465 473 # begin execute tests
466 474
467 475 def test_execute_reply(self):
468 476 e0 = self.client[self.client.ids[0]]
469 477 e0.block = True
470 478 ar = e0.execute("5", silent=False)
471 479 er = ar.get()
472 480 self.assertEquals(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
473 481 self.assertEquals(er.pyout['data']['text/plain'], '5')
474 482
475 483 def test_execute_reply_stdout(self):
476 484 e0 = self.client[self.client.ids[0]]
477 485 e0.block = True
478 486 ar = e0.execute("print (5)", silent=False)
479 487 er = ar.get()
480 488 self.assertEquals(er.stdout.strip(), '5')
481 489
482 490 def test_execute_pyout(self):
483 491 """execute triggers pyout with silent=False"""
484 492 view = self.client[:]
485 493 ar = view.execute("5", silent=False, block=True)
486 494
487 495 expected = [{'text/plain' : '5'}] * len(view)
488 496 mimes = [ out['data'] for out in ar.pyout ]
489 497 self.assertEquals(mimes, expected)
490 498
491 499 def test_execute_silent(self):
492 500 """execute does not trigger pyout with silent=True"""
493 501 view = self.client[:]
494 502 ar = view.execute("5", block=True)
495 503 expected = [None] * len(view)
496 504 self.assertEquals(ar.pyout, expected)
497 505
498 506 def test_execute_magic(self):
499 507 """execute accepts IPython commands"""
500 508 view = self.client[:]
501 509 view.execute("a = 5")
502 510 ar = view.execute("%whos", block=True)
503 511 # this will raise, if that failed
504 512 ar.get(5)
505 513 for stdout in ar.stdout:
506 514 lines = stdout.splitlines()
507 515 self.assertEquals(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
508 516 found = False
509 517 for line in lines[2:]:
510 518 split = line.split()
511 519 if split == ['a', 'int', '5']:
512 520 found = True
513 521 break
514 522 self.assertTrue(found, "whos output wrong: %s" % stdout)
515 523
516 524 def test_execute_displaypub(self):
517 525 """execute tracks display_pub output"""
518 526 view = self.client[:]
519 527 view.execute("from IPython.core.display import *")
520 528 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
521 529
522 530 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
523 531 for outputs in ar.outputs:
524 532 mimes = [ out['data'] for out in outputs ]
525 533 self.assertEquals(mimes, expected)
526 534
527 535 def test_apply_displaypub(self):
528 536 """apply tracks display_pub output"""
529 537 view = self.client[:]
530 538 view.execute("from IPython.core.display import *")
531 539
532 540 @interactive
533 541 def publish():
534 542 [ display(i) for i in range(5) ]
535 543
536 544 ar = view.apply_async(publish)
537 545 ar.get(5)
538 546 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
539 547 for outputs in ar.outputs:
540 548 mimes = [ out['data'] for out in outputs ]
541 549 self.assertEquals(mimes, expected)
542 550
543 551 def test_execute_raises(self):
544 552 """exceptions in execute requests raise appropriately"""
545 553 view = self.client[-1]
546 554 ar = view.execute("1/0")
547 555 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
548 556
549 557 @dec.skipif_not_matplotlib
550 558 def test_magic_pylab(self):
551 559 """%pylab works on engines"""
552 560 view = self.client[-1]
553 561 ar = view.execute("%pylab inline")
554 562 # at least check if this raised:
555 563 reply = ar.get(5)
556 564 # include imports, in case user config
557 565 ar = view.execute("plot(rand(100))", silent=False)
558 566 reply = ar.get(5)
559 567 self.assertEquals(len(reply.outputs), 1)
560 568 output = reply.outputs[0]
561 569 self.assertTrue("data" in output)
562 570 data = output['data']
563 571 self.assertTrue("image/png" in data)
564 572
565 573
General Comments 0
You need to be logged in to leave comments. Login now