##// END OF EJS Templates
Mark test_push_numpy_nocopy as a known failure for Python 3 (gh-1699)
Thomas Kluyver -
Show More
@@ -1,543 +1,544 b''
1 1 # -*- coding: utf-8 -*-
2 2 """test View objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import time
21 21 from tempfile import mktemp
22 22 from StringIO import StringIO
23 23
24 24 import zmq
25 25 from nose import SkipTest
26 26
27 27 from IPython.testing import decorators as dec
28 28
29 29 from IPython import parallel as pmod
30 30 from IPython.parallel import error
31 31 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
32 32 from IPython.parallel import DirectView
33 33 from IPython.parallel.util import interactive
34 34
35 35 from IPython.parallel.tests import add_engines
36 36
37 37 from .clienttest import ClusterTestCase, crash, wait, skip_without
38 38
39 39 def setup():
40 40 add_engines(3, total=True)
41 41
42 42 class TestView(ClusterTestCase):
43 43
44 44 def test_z_crash_mux(self):
45 45 """test graceful handling of engine death (direct)"""
46 46 raise SkipTest("crash tests disabled, due to undesirable crash reports")
47 47 # self.add_engines(1)
48 48 eid = self.client.ids[-1]
49 49 ar = self.client[eid].apply_async(crash)
50 50 self.assertRaisesRemote(error.EngineError, ar.get, 10)
51 51 eid = ar.engine_id
52 52 tic = time.time()
53 53 while eid in self.client.ids and time.time()-tic < 5:
54 54 time.sleep(.01)
55 55 self.client.spin()
56 56 self.assertFalse(eid in self.client.ids, "Engine should have died")
57 57
58 58 def test_push_pull(self):
59 59 """test pushing and pulling"""
60 60 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
61 61 t = self.client.ids[-1]
62 62 v = self.client[t]
63 63 push = v.push
64 64 pull = v.pull
65 65 v.block=True
66 66 nengines = len(self.client)
67 67 push({'data':data})
68 68 d = pull('data')
69 69 self.assertEquals(d, data)
70 70 self.client[:].push({'data':data})
71 71 d = self.client[:].pull('data', block=True)
72 72 self.assertEquals(d, nengines*[data])
73 73 ar = push({'data':data}, block=False)
74 74 self.assertTrue(isinstance(ar, AsyncResult))
75 75 r = ar.get()
76 76 ar = self.client[:].pull('data', block=False)
77 77 self.assertTrue(isinstance(ar, AsyncResult))
78 78 r = ar.get()
79 79 self.assertEquals(r, nengines*[data])
80 80 self.client[:].push(dict(a=10,b=20))
81 81 r = self.client[:].pull(('a','b'), block=True)
82 82 self.assertEquals(r, nengines*[[10,20]])
83 83
84 84 def test_push_pull_function(self):
85 85 "test pushing and pulling functions"
86 86 def testf(x):
87 87 return 2.0*x
88 88
89 89 t = self.client.ids[-1]
90 90 v = self.client[t]
91 91 v.block=True
92 92 push = v.push
93 93 pull = v.pull
94 94 execute = v.execute
95 95 push({'testf':testf})
96 96 r = pull('testf')
97 97 self.assertEqual(r(1.0), testf(1.0))
98 98 execute('r = testf(10)')
99 99 r = pull('r')
100 100 self.assertEquals(r, testf(10))
101 101 ar = self.client[:].push({'testf':testf}, block=False)
102 102 ar.get()
103 103 ar = self.client[:].pull('testf', block=False)
104 104 rlist = ar.get()
105 105 for r in rlist:
106 106 self.assertEqual(r(1.0), testf(1.0))
107 107 execute("def g(x): return x*x")
108 108 r = pull(('testf','g'))
109 109 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
110 110
111 111 def test_push_function_globals(self):
112 112 """test that pushed functions have access to globals"""
113 113 @interactive
114 114 def geta():
115 115 return a
116 116 # self.add_engines(1)
117 117 v = self.client[-1]
118 118 v.block=True
119 119 v['f'] = geta
120 120 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
121 121 v.execute('a=5')
122 122 v.execute('b=f()')
123 123 self.assertEquals(v['b'], 5)
124 124
125 125 def test_push_function_defaults(self):
126 126 """test that pushed functions preserve default args"""
127 127 def echo(a=10):
128 128 return a
129 129 v = self.client[-1]
130 130 v.block=True
131 131 v['f'] = echo
132 132 v.execute('b=f()')
133 133 self.assertEquals(v['b'], 10)
134 134
135 135 def test_get_result(self):
136 136 """test getting results from the Hub."""
137 137 c = pmod.Client(profile='iptest')
138 138 # self.add_engines(1)
139 139 t = c.ids[-1]
140 140 v = c[t]
141 141 v2 = self.client[t]
142 142 ar = v.apply_async(wait, 1)
143 143 # give the monitor time to notice the message
144 144 time.sleep(.25)
145 145 ahr = v2.get_result(ar.msg_ids)
146 146 self.assertTrue(isinstance(ahr, AsyncHubResult))
147 147 self.assertEquals(ahr.get(), ar.get())
148 148 ar2 = v2.get_result(ar.msg_ids)
149 149 self.assertFalse(isinstance(ar2, AsyncHubResult))
150 150 c.spin()
151 151 c.close()
152 152
153 153 def test_run_newline(self):
154 154 """test that run appends newline to files"""
155 155 tmpfile = mktemp()
156 156 with open(tmpfile, 'w') as f:
157 157 f.write("""def g():
158 158 return 5
159 159 """)
160 160 v = self.client[-1]
161 161 v.run(tmpfile, block=True)
162 162 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
163 163
164 164 def test_apply_tracked(self):
165 165 """test tracking for apply"""
166 166 # self.add_engines(1)
167 167 t = self.client.ids[-1]
168 168 v = self.client[t]
169 169 v.block=False
170 170 def echo(n=1024*1024, **kwargs):
171 171 with v.temp_flags(**kwargs):
172 172 return v.apply(lambda x: x, 'x'*n)
173 173 ar = echo(1, track=False)
174 174 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
175 175 self.assertTrue(ar.sent)
176 176 ar = echo(track=True)
177 177 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
178 178 self.assertEquals(ar.sent, ar._tracker.done)
179 179 ar._tracker.wait()
180 180 self.assertTrue(ar.sent)
181 181
182 182 def test_push_tracked(self):
183 183 t = self.client.ids[-1]
184 184 ns = dict(x='x'*1024*1024)
185 185 v = self.client[t]
186 186 ar = v.push(ns, block=False, track=False)
187 187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 188 self.assertTrue(ar.sent)
189 189
190 190 ar = v.push(ns, block=False, track=True)
191 191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
192 192 ar._tracker.wait()
193 193 self.assertEquals(ar.sent, ar._tracker.done)
194 194 self.assertTrue(ar.sent)
195 195 ar.get()
196 196
197 197 def test_scatter_tracked(self):
198 198 t = self.client.ids
199 199 x='x'*1024*1024
200 200 ar = self.client[t].scatter('x', x, block=False, track=False)
201 201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 202 self.assertTrue(ar.sent)
203 203
204 204 ar = self.client[t].scatter('x', x, block=False, track=True)
205 205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
206 206 self.assertEquals(ar.sent, ar._tracker.done)
207 207 ar._tracker.wait()
208 208 self.assertTrue(ar.sent)
209 209 ar.get()
210 210
211 211 def test_remote_reference(self):
212 212 v = self.client[-1]
213 213 v['a'] = 123
214 214 ra = pmod.Reference('a')
215 215 b = v.apply_sync(lambda x: x, ra)
216 216 self.assertEquals(b, 123)
217 217
218 218
219 219 def test_scatter_gather(self):
220 220 view = self.client[:]
221 221 seq1 = range(16)
222 222 view.scatter('a', seq1)
223 223 seq2 = view.gather('a', block=True)
224 224 self.assertEquals(seq2, seq1)
225 225 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
226 226
227 227 @skip_without('numpy')
228 228 def test_scatter_gather_numpy(self):
229 229 import numpy
230 230 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
231 231 view = self.client[:]
232 232 a = numpy.arange(64)
233 233 view.scatter('a', a)
234 234 b = view.gather('a', block=True)
235 235 assert_array_equal(b, a)
236 236
237 @dec.known_failure_py3
237 238 @skip_without('numpy')
238 239 def test_push_numpy_nocopy(self):
239 240 import numpy
240 241 view = self.client[:]
241 242 a = numpy.arange(64)
242 243 view['A'] = a
243 244 @interactive
244 245 def check_writeable(x):
245 246 return x.flags.writeable
246 247
247 248 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
248 249 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
249 250
250 251 view.push(dict(B=a))
251 252 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
252 253 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
253 254
254 255 @skip_without('numpy')
255 256 def test_apply_numpy(self):
256 257 """view.apply(f, ndarray)"""
257 258 import numpy
258 259 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
259 260
260 261 A = numpy.random.random((100,100))
261 262 view = self.client[-1]
262 263 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
263 264 B = A.astype(dt)
264 265 C = view.apply_sync(lambda x:x, B)
265 266 assert_array_equal(B,C)
266 267
267 268 def test_map(self):
268 269 view = self.client[:]
269 270 def f(x):
270 271 return x**2
271 272 data = range(16)
272 273 r = view.map_sync(f, data)
273 274 self.assertEquals(r, map(f, data))
274 275
275 276 def test_map_iterable(self):
276 277 """test map on iterables (direct)"""
277 278 view = self.client[:]
278 279 # 101 is prime, so it won't be evenly distributed
279 280 arr = range(101)
280 281 # ensure it will be an iterator, even in Python 3
281 282 it = iter(arr)
282 283 r = view.map_sync(lambda x:x, arr)
283 284 self.assertEquals(r, list(arr))
284 285
285 286 def test_scatterGatherNonblocking(self):
286 287 data = range(16)
287 288 view = self.client[:]
288 289 view.scatter('a', data, block=False)
289 290 ar = view.gather('a', block=False)
290 291 self.assertEquals(ar.get(), data)
291 292
292 293 @skip_without('numpy')
293 294 def test_scatter_gather_numpy_nonblocking(self):
294 295 import numpy
295 296 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
296 297 a = numpy.arange(64)
297 298 view = self.client[:]
298 299 ar = view.scatter('a', a, block=False)
299 300 self.assertTrue(isinstance(ar, AsyncResult))
300 301 amr = view.gather('a', block=False)
301 302 self.assertTrue(isinstance(amr, AsyncMapResult))
302 303 assert_array_equal(amr.get(), a)
303 304
304 305 def test_execute(self):
305 306 view = self.client[:]
306 307 # self.client.debug=True
307 308 execute = view.execute
308 309 ar = execute('c=30', block=False)
309 310 self.assertTrue(isinstance(ar, AsyncResult))
310 311 ar = execute('d=[0,1,2]', block=False)
311 312 self.client.wait(ar, 1)
312 313 self.assertEquals(len(ar.get()), len(self.client))
313 314 for c in view['c']:
314 315 self.assertEquals(c, 30)
315 316
316 317 def test_abort(self):
317 318 view = self.client[-1]
318 319 ar = view.execute('import time; time.sleep(1)', block=False)
319 320 ar2 = view.apply_async(lambda : 2)
320 321 ar3 = view.apply_async(lambda : 3)
321 322 view.abort(ar2)
322 323 view.abort(ar3.msg_ids)
323 324 self.assertRaises(error.TaskAborted, ar2.get)
324 325 self.assertRaises(error.TaskAborted, ar3.get)
325 326
326 327 def test_abort_all(self):
327 328 """view.abort() aborts all outstanding tasks"""
328 329 view = self.client[-1]
329 330 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
330 331 view.abort()
331 332 view.wait(timeout=5)
332 333 for ar in ars[5:]:
333 334 self.assertRaises(error.TaskAborted, ar.get)
334 335
335 336 def test_temp_flags(self):
336 337 view = self.client[-1]
337 338 view.block=True
338 339 with view.temp_flags(block=False):
339 340 self.assertFalse(view.block)
340 341 self.assertTrue(view.block)
341 342
342 343 @dec.known_failure_py3
343 344 def test_importer(self):
344 345 view = self.client[-1]
345 346 view.clear(block=True)
346 347 with view.importer:
347 348 import re
348 349
349 350 @interactive
350 351 def findall(pat, s):
351 352 # this globals() step isn't necessary in real code
352 353 # only to prevent a closure in the test
353 354 re = globals()['re']
354 355 return re.findall(pat, s)
355 356
356 357 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
357 358
358 359 # parallel magic tests
359 360
360 361 def test_magic_px_blocking(self):
361 362 ip = get_ipython()
362 363 v = self.client[-1]
363 364 v.activate()
364 365 v.block=True
365 366
366 367 ip.magic_px('a=5')
367 368 self.assertEquals(v['a'], 5)
368 369 ip.magic_px('a=10')
369 370 self.assertEquals(v['a'], 10)
370 371 sio = StringIO()
371 372 savestdout = sys.stdout
372 373 sys.stdout = sio
373 374 # just 'print a' worst ~99% of the time, but this ensures that
374 375 # the stdout message has arrived when the result is finished:
375 376 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
376 377 sys.stdout = savestdout
377 378 buf = sio.getvalue()
378 379 self.assertTrue('[stdout:' in buf, buf)
379 380 self.assertTrue(buf.rstrip().endswith('10'))
380 381 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
381 382
382 383 def test_magic_px_nonblocking(self):
383 384 ip = get_ipython()
384 385 v = self.client[-1]
385 386 v.activate()
386 387 v.block=False
387 388
388 389 ip.magic_px('a=5')
389 390 self.assertEquals(v['a'], 5)
390 391 ip.magic_px('a=10')
391 392 self.assertEquals(v['a'], 10)
392 393 sio = StringIO()
393 394 savestdout = sys.stdout
394 395 sys.stdout = sio
395 396 ip.magic_px('print a')
396 397 sys.stdout = savestdout
397 398 buf = sio.getvalue()
398 399 self.assertFalse('[stdout:%i]'%v.targets in buf)
399 400 ip.magic_px('1/0')
400 401 ar = v.get_result(-1)
401 402 self.assertRaisesRemote(ZeroDivisionError, ar.get)
402 403
403 404 def test_magic_autopx_blocking(self):
404 405 ip = get_ipython()
405 406 v = self.client[-1]
406 407 v.activate()
407 408 v.block=True
408 409
409 410 sio = StringIO()
410 411 savestdout = sys.stdout
411 412 sys.stdout = sio
412 413 ip.magic_autopx()
413 414 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
414 415 ip.run_cell('print b')
415 416 ip.run_cell("b/c")
416 417 ip.run_code(compile('b*=2', '', 'single'))
417 418 ip.magic_autopx()
418 419 sys.stdout = savestdout
419 420 output = sio.getvalue().strip()
420 421 self.assertTrue(output.startswith('%autopx enabled'))
421 422 self.assertTrue(output.endswith('%autopx disabled'))
422 423 self.assertTrue('RemoteError: ZeroDivisionError' in output)
423 424 ar = v.get_result(-2)
424 425 self.assertEquals(v['a'], 5)
425 426 self.assertEquals(v['b'], 20)
426 427 self.assertRaisesRemote(ZeroDivisionError, ar.get)
427 428
428 429 def test_magic_autopx_nonblocking(self):
429 430 ip = get_ipython()
430 431 v = self.client[-1]
431 432 v.activate()
432 433 v.block=False
433 434
434 435 sio = StringIO()
435 436 savestdout = sys.stdout
436 437 sys.stdout = sio
437 438 ip.magic_autopx()
438 439 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
439 440 ip.run_cell('print b')
440 441 ip.run_cell("b/c")
441 442 ip.run_code(compile('b*=2', '', 'single'))
442 443 ip.magic_autopx()
443 444 sys.stdout = savestdout
444 445 output = sio.getvalue().strip()
445 446 self.assertTrue(output.startswith('%autopx enabled'))
446 447 self.assertTrue(output.endswith('%autopx disabled'))
447 448 self.assertFalse('ZeroDivisionError' in output)
448 449 ar = v.get_result(-2)
449 450 self.assertEquals(v['a'], 5)
450 451 self.assertEquals(v['b'], 20)
451 452 self.assertRaisesRemote(ZeroDivisionError, ar.get)
452 453
453 454 def test_magic_result(self):
454 455 ip = get_ipython()
455 456 v = self.client[-1]
456 457 v.activate()
457 458 v['a'] = 111
458 459 ra = v['a']
459 460
460 461 ar = ip.magic_result()
461 462 self.assertEquals(ar.msg_ids, [v.history[-1]])
462 463 self.assertEquals(ar.get(), 111)
463 464 ar = ip.magic_result('-2')
464 465 self.assertEquals(ar.msg_ids, [v.history[-2]])
465 466
466 467 def test_unicode_execute(self):
467 468 """test executing unicode strings"""
468 469 v = self.client[-1]
469 470 v.block=True
470 471 if sys.version_info[0] >= 3:
471 472 code="a='é'"
472 473 else:
473 474 code=u"a=u'é'"
474 475 v.execute(code)
475 476 self.assertEquals(v['a'], u'é')
476 477
477 478 def test_unicode_apply_result(self):
478 479 """test unicode apply results"""
479 480 v = self.client[-1]
480 481 r = v.apply_sync(lambda : u'é')
481 482 self.assertEquals(r, u'é')
482 483
483 484 def test_unicode_apply_arg(self):
484 485 """test passing unicode arguments to apply"""
485 486 v = self.client[-1]
486 487
487 488 @interactive
488 489 def check_unicode(a, check):
489 490 assert isinstance(a, unicode), "%r is not unicode"%a
490 491 assert isinstance(check, bytes), "%r is not bytes"%check
491 492 assert a.encode('utf8') == check, "%s != %s"%(a,check)
492 493
493 494 for s in [ u'é', u'ßø®∫',u'asdf' ]:
494 495 try:
495 496 v.apply_sync(check_unicode, s, s.encode('utf8'))
496 497 except error.RemoteError as e:
497 498 if e.ename == 'AssertionError':
498 499 self.fail(e.evalue)
499 500 else:
500 501 raise e
501 502
502 503 def test_map_reference(self):
503 504 """view.map(<Reference>, *seqs) should work"""
504 505 v = self.client[:]
505 506 v.scatter('n', self.client.ids, flatten=True)
506 507 v.execute("f = lambda x,y: x*y")
507 508 rf = pmod.Reference('f')
508 509 nlist = list(range(10))
509 510 mlist = nlist[::-1]
510 511 expected = [ m*n for m,n in zip(mlist, nlist) ]
511 512 result = v.map_sync(rf, mlist, nlist)
512 513 self.assertEquals(result, expected)
513 514
514 515 def test_apply_reference(self):
515 516 """view.apply(<Reference>, *args) should work"""
516 517 v = self.client[:]
517 518 v.scatter('n', self.client.ids, flatten=True)
518 519 v.execute("f = lambda x: n*x")
519 520 rf = pmod.Reference('f')
520 521 result = v.apply_sync(rf, 5)
521 522 expected = [ 5*id for id in self.client.ids ]
522 523 self.assertEquals(result, expected)
523 524
524 525 def test_eval_reference(self):
525 526 v = self.client[self.client.ids[0]]
526 527 v['g'] = range(5)
527 528 rg = pmod.Reference('g[0]')
528 529 echo = lambda x:x
529 530 self.assertEquals(v.apply_sync(echo, rg), 0)
530 531
531 532 def test_reference_nameerror(self):
532 533 v = self.client[self.client.ids[0]]
533 534 r = pmod.Reference('elvis_has_left')
534 535 echo = lambda x:x
535 536 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
536 537
537 538 def test_single_engine_map(self):
538 539 e0 = self.client[self.client.ids[0]]
539 540 r = range(5)
540 541 check = [ -1*i for i in r ]
541 542 result = e0.map_sync(lambda x: -1*x, r)
542 543 self.assertEquals(result, check)
543 544
General Comments 0
You need to be logged in to leave comments. Login now