##// END OF EJS Templates
test view.sync_imports
MinRK -
Show More
@@ -1,808 +1,834 b''
1 1 # -*- coding: utf-8 -*-
2 2 """test View objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import base64
20 20 import sys
21 21 import platform
22 22 import time
23 23 from collections import namedtuple
24 24 from tempfile import mktemp
25 25
26 26 import zmq
27 27 from nose.plugins.attrib import attr
28 28
29 29 from IPython.testing import decorators as dec
30 30 from IPython.utils.io import capture_output
31 31
32 32 from IPython import parallel as pmod
33 33 from IPython.parallel import error
34 34 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
35 35 from IPython.parallel.util import interactive
36 36
37 37 from IPython.parallel.tests import add_engines
38 38
39 39 from .clienttest import ClusterTestCase, crash, wait, skip_without
40 40
41 41 def setup():
42 42 add_engines(3, total=True)
43 43
44 44 point = namedtuple("point", "x y")
45 45
46 46 class TestView(ClusterTestCase):
47 47
48 48 def setUp(self):
49 49 # On Win XP, wait for resource cleanup, else parallel test group fails
50 50 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
51 51 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
52 52 time.sleep(2)
53 53 super(TestView, self).setUp()
54 54
55 55 @attr('crash')
56 56 def test_z_crash_mux(self):
57 57 """test graceful handling of engine death (direct)"""
58 58 # self.add_engines(1)
59 59 eid = self.client.ids[-1]
60 60 ar = self.client[eid].apply_async(crash)
61 61 self.assertRaisesRemote(error.EngineError, ar.get, 10)
62 62 eid = ar.engine_id
63 63 tic = time.time()
64 64 while eid in self.client.ids and time.time()-tic < 5:
65 65 time.sleep(.01)
66 66 self.client.spin()
67 67 self.assertFalse(eid in self.client.ids, "Engine should have died")
68 68
69 69 def test_push_pull(self):
70 70 """test pushing and pulling"""
71 71 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
72 72 t = self.client.ids[-1]
73 73 v = self.client[t]
74 74 push = v.push
75 75 pull = v.pull
76 76 v.block=True
77 77 nengines = len(self.client)
78 78 push({'data':data})
79 79 d = pull('data')
80 80 self.assertEqual(d, data)
81 81 self.client[:].push({'data':data})
82 82 d = self.client[:].pull('data', block=True)
83 83 self.assertEqual(d, nengines*[data])
84 84 ar = push({'data':data}, block=False)
85 85 self.assertTrue(isinstance(ar, AsyncResult))
86 86 r = ar.get()
87 87 ar = self.client[:].pull('data', block=False)
88 88 self.assertTrue(isinstance(ar, AsyncResult))
89 89 r = ar.get()
90 90 self.assertEqual(r, nengines*[data])
91 91 self.client[:].push(dict(a=10,b=20))
92 92 r = self.client[:].pull(('a','b'), block=True)
93 93 self.assertEqual(r, nengines*[[10,20]])
94 94
95 95 def test_push_pull_function(self):
96 96 "test pushing and pulling functions"
97 97 def testf(x):
98 98 return 2.0*x
99 99
100 100 t = self.client.ids[-1]
101 101 v = self.client[t]
102 102 v.block=True
103 103 push = v.push
104 104 pull = v.pull
105 105 execute = v.execute
106 106 push({'testf':testf})
107 107 r = pull('testf')
108 108 self.assertEqual(r(1.0), testf(1.0))
109 109 execute('r = testf(10)')
110 110 r = pull('r')
111 111 self.assertEqual(r, testf(10))
112 112 ar = self.client[:].push({'testf':testf}, block=False)
113 113 ar.get()
114 114 ar = self.client[:].pull('testf', block=False)
115 115 rlist = ar.get()
116 116 for r in rlist:
117 117 self.assertEqual(r(1.0), testf(1.0))
118 118 execute("def g(x): return x*x")
119 119 r = pull(('testf','g'))
120 120 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
121 121
122 122 def test_push_function_globals(self):
123 123 """test that pushed functions have access to globals"""
124 124 @interactive
125 125 def geta():
126 126 return a
127 127 # self.add_engines(1)
128 128 v = self.client[-1]
129 129 v.block=True
130 130 v['f'] = geta
131 131 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
132 132 v.execute('a=5')
133 133 v.execute('b=f()')
134 134 self.assertEqual(v['b'], 5)
135 135
136 136 def test_push_function_defaults(self):
137 137 """test that pushed functions preserve default args"""
138 138 def echo(a=10):
139 139 return a
140 140 v = self.client[-1]
141 141 v.block=True
142 142 v['f'] = echo
143 143 v.execute('b=f()')
144 144 self.assertEqual(v['b'], 10)
145 145
146 146 def test_get_result(self):
147 147 """test getting results from the Hub."""
148 148 c = pmod.Client(profile='iptest')
149 149 # self.add_engines(1)
150 150 t = c.ids[-1]
151 151 v = c[t]
152 152 v2 = self.client[t]
153 153 ar = v.apply_async(wait, 1)
154 154 # give the monitor time to notice the message
155 155 time.sleep(.25)
156 156 ahr = v2.get_result(ar.msg_ids[0])
157 157 self.assertTrue(isinstance(ahr, AsyncHubResult))
158 158 self.assertEqual(ahr.get(), ar.get())
159 159 ar2 = v2.get_result(ar.msg_ids[0])
160 160 self.assertFalse(isinstance(ar2, AsyncHubResult))
161 161 c.spin()
162 162 c.close()
163 163
164 164 def test_run_newline(self):
165 165 """test that run appends newline to files"""
166 166 tmpfile = mktemp()
167 167 with open(tmpfile, 'w') as f:
168 168 f.write("""def g():
169 169 return 5
170 170 """)
171 171 v = self.client[-1]
172 172 v.run(tmpfile, block=True)
173 173 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
174 174
175 175 def test_apply_tracked(self):
176 176 """test tracking for apply"""
177 177 # self.add_engines(1)
178 178 t = self.client.ids[-1]
179 179 v = self.client[t]
180 180 v.block=False
181 181 def echo(n=1024*1024, **kwargs):
182 182 with v.temp_flags(**kwargs):
183 183 return v.apply(lambda x: x, 'x'*n)
184 184 ar = echo(1, track=False)
185 185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
186 186 self.assertTrue(ar.sent)
187 187 ar = echo(track=True)
188 188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 189 self.assertEqual(ar.sent, ar._tracker.done)
190 190 ar._tracker.wait()
191 191 self.assertTrue(ar.sent)
192 192
193 193 def test_push_tracked(self):
194 194 t = self.client.ids[-1]
195 195 ns = dict(x='x'*1024*1024)
196 196 v = self.client[t]
197 197 ar = v.push(ns, block=False, track=False)
198 198 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
199 199 self.assertTrue(ar.sent)
200 200
201 201 ar = v.push(ns, block=False, track=True)
202 202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 203 ar._tracker.wait()
204 204 self.assertEqual(ar.sent, ar._tracker.done)
205 205 self.assertTrue(ar.sent)
206 206 ar.get()
207 207
208 208 def test_scatter_tracked(self):
209 209 t = self.client.ids
210 210 x='x'*1024*1024
211 211 ar = self.client[t].scatter('x', x, block=False, track=False)
212 212 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
213 213 self.assertTrue(ar.sent)
214 214
215 215 ar = self.client[t].scatter('x', x, block=False, track=True)
216 216 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
217 217 self.assertEqual(ar.sent, ar._tracker.done)
218 218 ar._tracker.wait()
219 219 self.assertTrue(ar.sent)
220 220 ar.get()
221 221
222 222 def test_remote_reference(self):
223 223 v = self.client[-1]
224 224 v['a'] = 123
225 225 ra = pmod.Reference('a')
226 226 b = v.apply_sync(lambda x: x, ra)
227 227 self.assertEqual(b, 123)
228 228
229 229
230 230 def test_scatter_gather(self):
231 231 view = self.client[:]
232 232 seq1 = range(16)
233 233 view.scatter('a', seq1)
234 234 seq2 = view.gather('a', block=True)
235 235 self.assertEqual(seq2, seq1)
236 236 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
237 237
238 238 @skip_without('numpy')
239 239 def test_scatter_gather_numpy(self):
240 240 import numpy
241 241 from numpy.testing.utils import assert_array_equal
242 242 view = self.client[:]
243 243 a = numpy.arange(64)
244 244 view.scatter('a', a, block=True)
245 245 b = view.gather('a', block=True)
246 246 assert_array_equal(b, a)
247 247
248 248 def test_scatter_gather_lazy(self):
249 249 """scatter/gather with targets='all'"""
250 250 view = self.client.direct_view(targets='all')
251 251 x = range(64)
252 252 view.scatter('x', x)
253 253 gathered = view.gather('x', block=True)
254 254 self.assertEqual(gathered, x)
255 255
256 256
257 257 @dec.known_failure_py3
258 258 @skip_without('numpy')
259 259 def test_push_numpy_nocopy(self):
260 260 import numpy
261 261 view = self.client[:]
262 262 a = numpy.arange(64)
263 263 view['A'] = a
264 264 @interactive
265 265 def check_writeable(x):
266 266 return x.flags.writeable
267 267
268 268 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
269 269 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
270 270
271 271 view.push(dict(B=a))
272 272 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
273 273 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
274 274
275 275 @skip_without('numpy')
276 276 def test_apply_numpy(self):
277 277 """view.apply(f, ndarray)"""
278 278 import numpy
279 279 from numpy.testing.utils import assert_array_equal
280 280
281 281 A = numpy.random.random((100,100))
282 282 view = self.client[-1]
283 283 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
284 284 B = A.astype(dt)
285 285 C = view.apply_sync(lambda x:x, B)
286 286 assert_array_equal(B,C)
287 287
288 288 @skip_without('numpy')
289 289 def test_push_pull_recarray(self):
290 290 """push/pull recarrays"""
291 291 import numpy
292 292 from numpy.testing.utils import assert_array_equal
293 293
294 294 view = self.client[-1]
295 295
296 296 R = numpy.array([
297 297 (1, 'hi', 0.),
298 298 (2**30, 'there', 2.5),
299 299 (-99999, 'world', -12345.6789),
300 300 ], [('n', int), ('s', '|S10'), ('f', float)])
301 301
302 302 view['RR'] = R
303 303 R2 = view['RR']
304 304
305 305 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
306 306 self.assertEqual(r_dtype, R.dtype)
307 307 self.assertEqual(r_shape, R.shape)
308 308 self.assertEqual(R2.dtype, R.dtype)
309 309 self.assertEqual(R2.shape, R.shape)
310 310 assert_array_equal(R2, R)
311 311
312 312 @skip_without('pandas')
313 313 def test_push_pull_timeseries(self):
314 314 """push/pull pandas.TimeSeries"""
315 315 import pandas
316 316
317 317 ts = pandas.TimeSeries(range(10))
318 318
319 319 view = self.client[-1]
320 320
321 321 view.push(dict(ts=ts), block=True)
322 322 rts = view['ts']
323 323
324 324 self.assertEqual(type(rts), type(ts))
325 325 self.assertTrue((ts == rts).all())
326 326
327 327 def test_map(self):
328 328 view = self.client[:]
329 329 def f(x):
330 330 return x**2
331 331 data = range(16)
332 332 r = view.map_sync(f, data)
333 333 self.assertEqual(r, map(f, data))
334 334
335 335 def test_map_iterable(self):
336 336 """test map on iterables (direct)"""
337 337 view = self.client[:]
338 338 # 101 is prime, so it won't be evenly distributed
339 339 arr = range(101)
340 340 # ensure it will be an iterator, even in Python 3
341 341 it = iter(arr)
342 342 r = view.map_sync(lambda x: x, it)
343 343 self.assertEqual(r, list(arr))
344 344
345 345 @skip_without('numpy')
346 346 def test_map_numpy(self):
347 347 """test map on numpy arrays (direct)"""
348 348 import numpy
349 349 from numpy.testing.utils import assert_array_equal
350 350
351 351 view = self.client[:]
352 352 # 101 is prime, so it won't be evenly distributed
353 353 arr = numpy.arange(101)
354 354 r = view.map_sync(lambda x: x, arr)
355 355 assert_array_equal(r, arr)
356 356
357 357 def test_scatter_gather_nonblocking(self):
358 358 data = range(16)
359 359 view = self.client[:]
360 360 view.scatter('a', data, block=False)
361 361 ar = view.gather('a', block=False)
362 362 self.assertEqual(ar.get(), data)
363 363
364 364 @skip_without('numpy')
365 365 def test_scatter_gather_numpy_nonblocking(self):
366 366 import numpy
367 367 from numpy.testing.utils import assert_array_equal
368 368 a = numpy.arange(64)
369 369 view = self.client[:]
370 370 ar = view.scatter('a', a, block=False)
371 371 self.assertTrue(isinstance(ar, AsyncResult))
372 372 amr = view.gather('a', block=False)
373 373 self.assertTrue(isinstance(amr, AsyncMapResult))
374 374 assert_array_equal(amr.get(), a)
375 375
376 376 def test_execute(self):
377 377 view = self.client[:]
378 378 # self.client.debug=True
379 379 execute = view.execute
380 380 ar = execute('c=30', block=False)
381 381 self.assertTrue(isinstance(ar, AsyncResult))
382 382 ar = execute('d=[0,1,2]', block=False)
383 383 self.client.wait(ar, 1)
384 384 self.assertEqual(len(ar.get()), len(self.client))
385 385 for c in view['c']:
386 386 self.assertEqual(c, 30)
387 387
388 388 def test_abort(self):
389 389 view = self.client[-1]
390 390 ar = view.execute('import time; time.sleep(1)', block=False)
391 391 ar2 = view.apply_async(lambda : 2)
392 392 ar3 = view.apply_async(lambda : 3)
393 393 view.abort(ar2)
394 394 view.abort(ar3.msg_ids)
395 395 self.assertRaises(error.TaskAborted, ar2.get)
396 396 self.assertRaises(error.TaskAborted, ar3.get)
397 397
398 398 def test_abort_all(self):
399 399 """view.abort() aborts all outstanding tasks"""
400 400 view = self.client[-1]
401 401 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
402 402 view.abort()
403 403 view.wait(timeout=5)
404 404 for ar in ars[5:]:
405 405 self.assertRaises(error.TaskAborted, ar.get)
406 406
407 407 def test_temp_flags(self):
408 408 view = self.client[-1]
409 409 view.block=True
410 410 with view.temp_flags(block=False):
411 411 self.assertFalse(view.block)
412 412 self.assertTrue(view.block)
413 413
414 414 @dec.known_failure_py3
415 415 def test_importer(self):
416 416 view = self.client[-1]
417 417 view.clear(block=True)
418 418 with view.importer:
419 419 import re
420 420
421 421 @interactive
422 422 def findall(pat, s):
423 423 # this globals() step isn't necessary in real code
424 424 # only to prevent a closure in the test
425 425 re = globals()['re']
426 426 return re.findall(pat, s)
427 427
428 428 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
429 429
430 430 def test_unicode_execute(self):
431 431 """test executing unicode strings"""
432 432 v = self.client[-1]
433 433 v.block=True
434 434 if sys.version_info[0] >= 3:
435 435 code="a='é'"
436 436 else:
437 437 code=u"a=u'é'"
438 438 v.execute(code)
439 439 self.assertEqual(v['a'], u'é')
440 440
441 441 def test_unicode_apply_result(self):
442 442 """test unicode apply results"""
443 443 v = self.client[-1]
444 444 r = v.apply_sync(lambda : u'é')
445 445 self.assertEqual(r, u'é')
446 446
447 447 def test_unicode_apply_arg(self):
448 448 """test passing unicode arguments to apply"""
449 449 v = self.client[-1]
450 450
451 451 @interactive
452 452 def check_unicode(a, check):
453 453 assert isinstance(a, unicode), "%r is not unicode"%a
454 454 assert isinstance(check, bytes), "%r is not bytes"%check
455 455 assert a.encode('utf8') == check, "%s != %s"%(a,check)
456 456
457 457 for s in [ u'é', u'ßø®∫',u'asdf' ]:
458 458 try:
459 459 v.apply_sync(check_unicode, s, s.encode('utf8'))
460 460 except error.RemoteError as e:
461 461 if e.ename == 'AssertionError':
462 462 self.fail(e.evalue)
463 463 else:
464 464 raise e
465 465
466 466 def test_map_reference(self):
467 467 """view.map(<Reference>, *seqs) should work"""
468 468 v = self.client[:]
469 469 v.scatter('n', self.client.ids, flatten=True)
470 470 v.execute("f = lambda x,y: x*y")
471 471 rf = pmod.Reference('f')
472 472 nlist = list(range(10))
473 473 mlist = nlist[::-1]
474 474 expected = [ m*n for m,n in zip(mlist, nlist) ]
475 475 result = v.map_sync(rf, mlist, nlist)
476 476 self.assertEqual(result, expected)
477 477
478 478 def test_apply_reference(self):
479 479 """view.apply(<Reference>, *args) should work"""
480 480 v = self.client[:]
481 481 v.scatter('n', self.client.ids, flatten=True)
482 482 v.execute("f = lambda x: n*x")
483 483 rf = pmod.Reference('f')
484 484 result = v.apply_sync(rf, 5)
485 485 expected = [ 5*id for id in self.client.ids ]
486 486 self.assertEqual(result, expected)
487 487
488 488 def test_eval_reference(self):
489 489 v = self.client[self.client.ids[0]]
490 490 v['g'] = range(5)
491 491 rg = pmod.Reference('g[0]')
492 492 echo = lambda x:x
493 493 self.assertEqual(v.apply_sync(echo, rg), 0)
494 494
495 495 def test_reference_nameerror(self):
496 496 v = self.client[self.client.ids[0]]
497 497 r = pmod.Reference('elvis_has_left')
498 498 echo = lambda x:x
499 499 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
500 500
501 501 def test_single_engine_map(self):
502 502 e0 = self.client[self.client.ids[0]]
503 503 r = range(5)
504 504 check = [ -1*i for i in r ]
505 505 result = e0.map_sync(lambda x: -1*x, r)
506 506 self.assertEqual(result, check)
507 507
508 508 def test_len(self):
509 509 """len(view) makes sense"""
510 510 e0 = self.client[self.client.ids[0]]
511 511 self.assertEqual(len(e0), 1)
512 512 v = self.client[:]
513 513 self.assertEqual(len(v), len(self.client.ids))
514 514 v = self.client.direct_view('all')
515 515 self.assertEqual(len(v), len(self.client.ids))
516 516 v = self.client[:2]
517 517 self.assertEqual(len(v), 2)
518 518 v = self.client[:1]
519 519 self.assertEqual(len(v), 1)
520 520 v = self.client.load_balanced_view()
521 521 self.assertEqual(len(v), len(self.client.ids))
522 522
523 523
524 524 # begin execute tests
525 525
526 526 def test_execute_reply(self):
527 527 e0 = self.client[self.client.ids[0]]
528 528 e0.block = True
529 529 ar = e0.execute("5", silent=False)
530 530 er = ar.get()
531 531 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
532 532 self.assertEqual(er.pyout['data']['text/plain'], '5')
533 533
534 534 def test_execute_reply_rich(self):
535 535 e0 = self.client[self.client.ids[0]]
536 536 e0.block = True
537 537 e0.execute("from IPython.display import Image, HTML")
538 538 ar = e0.execute("Image(data=b'garbage', format='png', width=10)", silent=False)
539 539 er = ar.get()
540 540 b64data = base64.encodestring(b'garbage').decode('ascii')
541 541 self.assertEqual(er._repr_png_(), (b64data, dict(width=10)))
542 542 ar = e0.execute("HTML('<b>bold</b>')", silent=False)
543 543 er = ar.get()
544 544 self.assertEqual(er._repr_html_(), "<b>bold</b>")
545 545
546 546 def test_execute_reply_stdout(self):
547 547 e0 = self.client[self.client.ids[0]]
548 548 e0.block = True
549 549 ar = e0.execute("print (5)", silent=False)
550 550 er = ar.get()
551 551 self.assertEqual(er.stdout.strip(), '5')
552 552
553 553 def test_execute_pyout(self):
554 554 """execute triggers pyout with silent=False"""
555 555 view = self.client[:]
556 556 ar = view.execute("5", silent=False, block=True)
557 557
558 558 expected = [{'text/plain' : '5'}] * len(view)
559 559 mimes = [ out['data'] for out in ar.pyout ]
560 560 self.assertEqual(mimes, expected)
561 561
562 562 def test_execute_silent(self):
563 563 """execute does not trigger pyout with silent=True"""
564 564 view = self.client[:]
565 565 ar = view.execute("5", block=True)
566 566 expected = [None] * len(view)
567 567 self.assertEqual(ar.pyout, expected)
568 568
569 569 def test_execute_magic(self):
570 570 """execute accepts IPython commands"""
571 571 view = self.client[:]
572 572 view.execute("a = 5")
573 573 ar = view.execute("%whos", block=True)
574 574 # this will raise, if that failed
575 575 ar.get(5)
576 576 for stdout in ar.stdout:
577 577 lines = stdout.splitlines()
578 578 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
579 579 found = False
580 580 for line in lines[2:]:
581 581 split = line.split()
582 582 if split == ['a', 'int', '5']:
583 583 found = True
584 584 break
585 585 self.assertTrue(found, "whos output wrong: %s" % stdout)
586 586
587 587 def test_execute_displaypub(self):
588 588 """execute tracks display_pub output"""
589 589 view = self.client[:]
590 590 view.execute("from IPython.core.display import *")
591 591 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
592 592
593 593 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
594 594 for outputs in ar.outputs:
595 595 mimes = [ out['data'] for out in outputs ]
596 596 self.assertEqual(mimes, expected)
597 597
598 598 def test_apply_displaypub(self):
599 599 """apply tracks display_pub output"""
600 600 view = self.client[:]
601 601 view.execute("from IPython.core.display import *")
602 602
603 603 @interactive
604 604 def publish():
605 605 [ display(i) for i in range(5) ]
606 606
607 607 ar = view.apply_async(publish)
608 608 ar.get(5)
609 609 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
610 610 for outputs in ar.outputs:
611 611 mimes = [ out['data'] for out in outputs ]
612 612 self.assertEqual(mimes, expected)
613 613
614 614 def test_execute_raises(self):
615 615 """exceptions in execute requests raise appropriately"""
616 616 view = self.client[-1]
617 617 ar = view.execute("1/0")
618 618 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
619 619
620 620 def test_remoteerror_render_exception(self):
621 621 """RemoteErrors get nice tracebacks"""
622 622 view = self.client[-1]
623 623 ar = view.execute("1/0")
624 624 ip = get_ipython()
625 625 ip.user_ns['ar'] = ar
626 626 with capture_output() as io:
627 627 ip.run_cell("ar.get(2)")
628 628
629 629 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
630 630
631 631 def test_compositeerror_render_exception(self):
632 632 """CompositeErrors get nice tracebacks"""
633 633 view = self.client[:]
634 634 ar = view.execute("1/0")
635 635 ip = get_ipython()
636 636 ip.user_ns['ar'] = ar
637 637
638 638 with capture_output() as io:
639 639 ip.run_cell("ar.get(2)")
640 640
641 641 count = min(error.CompositeError.tb_limit, len(view))
642 642
643 643 self.assertEqual(io.stdout.count('ZeroDivisionError'), count * 2, io.stdout)
644 644 self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
645 645 self.assertEqual(io.stdout.count(':execute'), count, io.stdout)
646 646
647 647 def test_compositeerror_truncate(self):
648 648 """Truncate CompositeErrors with many exceptions"""
649 649 view = self.client[:]
650 650 msg_ids = []
651 651 for i in range(10):
652 652 ar = view.execute("1/0")
653 653 msg_ids.extend(ar.msg_ids)
654 654
655 655 ar = self.client.get_result(msg_ids)
656 656 try:
657 657 ar.get()
658 658 except error.CompositeError as _e:
659 659 e = _e
660 660 else:
661 661 self.fail("Should have raised CompositeError")
662 662
663 663 lines = e.render_traceback()
664 664 with capture_output() as io:
665 665 e.print_traceback()
666 666
667 667 self.assertTrue("more exceptions" in lines[-1])
668 668 count = e.tb_limit
669 669
670 670 self.assertEqual(io.stdout.count('ZeroDivisionError'), 2 * count, io.stdout)
671 671 self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
672 672 self.assertEqual(io.stdout.count(':execute'), count, io.stdout)
673 673
674 674 @dec.skipif_not_matplotlib
675 675 def test_magic_pylab(self):
676 676 """%pylab works on engines"""
677 677 view = self.client[-1]
678 678 ar = view.execute("%pylab inline")
679 679 # at least check if this raised:
680 680 reply = ar.get(5)
681 681 # include imports, in case user config
682 682 ar = view.execute("plot(rand(100))", silent=False)
683 683 reply = ar.get(5)
684 684 self.assertEqual(len(reply.outputs), 1)
685 685 output = reply.outputs[0]
686 686 self.assertTrue("data" in output)
687 687 data = output['data']
688 688 self.assertTrue("image/png" in data)
689 689
690 690 def test_func_default_func(self):
691 691 """interactively defined function as apply func default"""
692 692 def foo():
693 693 return 'foo'
694 694
695 695 def bar(f=foo):
696 696 return f()
697 697
698 698 view = self.client[-1]
699 699 ar = view.apply_async(bar)
700 700 r = ar.get(10)
701 701 self.assertEqual(r, 'foo')
702 702 def test_data_pub_single(self):
703 703 view = self.client[-1]
704 704 ar = view.execute('\n'.join([
705 705 'from IPython.kernel.zmq.datapub import publish_data',
706 706 'for i in range(5):',
707 707 ' publish_data(dict(i=i))'
708 708 ]), block=False)
709 709 self.assertTrue(isinstance(ar.data, dict))
710 710 ar.get(5)
711 711 self.assertEqual(ar.data, dict(i=4))
712 712
713 713 def test_data_pub(self):
714 714 view = self.client[:]
715 715 ar = view.execute('\n'.join([
716 716 'from IPython.kernel.zmq.datapub import publish_data',
717 717 'for i in range(5):',
718 718 ' publish_data(dict(i=i))'
719 719 ]), block=False)
720 720 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
721 721 ar.get(5)
722 722 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
723 723
724 724 def test_can_list_arg(self):
725 725 """args in lists are canned"""
726 726 view = self.client[-1]
727 727 view['a'] = 128
728 728 rA = pmod.Reference('a')
729 729 ar = view.apply_async(lambda x: x, [rA])
730 730 r = ar.get(5)
731 731 self.assertEqual(r, [128])
732 732
733 733 def test_can_dict_arg(self):
734 734 """args in dicts are canned"""
735 735 view = self.client[-1]
736 736 view['a'] = 128
737 737 rA = pmod.Reference('a')
738 738 ar = view.apply_async(lambda x: x, dict(foo=rA))
739 739 r = ar.get(5)
740 740 self.assertEqual(r, dict(foo=128))
741 741
742 742 def test_can_list_kwarg(self):
743 743 """kwargs in lists are canned"""
744 744 view = self.client[-1]
745 745 view['a'] = 128
746 746 rA = pmod.Reference('a')
747 747 ar = view.apply_async(lambda x=5: x, x=[rA])
748 748 r = ar.get(5)
749 749 self.assertEqual(r, [128])
750 750
751 751 def test_can_dict_kwarg(self):
752 752 """kwargs in dicts are canned"""
753 753 view = self.client[-1]
754 754 view['a'] = 128
755 755 rA = pmod.Reference('a')
756 756 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
757 757 r = ar.get(5)
758 758 self.assertEqual(r, dict(foo=128))
759 759
760 760 def test_map_ref(self):
761 761 """view.map works with references"""
762 762 view = self.client[:]
763 763 ranks = sorted(self.client.ids)
764 764 view.scatter('rank', ranks, flatten=True)
765 765 rrank = pmod.Reference('rank')
766 766
767 767 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
768 768 drank = amr.get(5)
769 769 self.assertEqual(drank, [ r*2 for r in ranks ])
770 770
771 771 def test_nested_getitem_setitem(self):
772 772 """get and set with view['a.b']"""
773 773 view = self.client[-1]
774 774 view.execute('\n'.join([
775 775 'class A(object): pass',
776 776 'a = A()',
777 777 'a.b = 128',
778 778 ]), block=True)
779 779 ra = pmod.Reference('a')
780 780
781 781 r = view.apply_sync(lambda x: x.b, ra)
782 782 self.assertEqual(r, 128)
783 783 self.assertEqual(view['a.b'], 128)
784 784
785 785 view['a.b'] = 0
786 786
787 787 r = view.apply_sync(lambda x: x.b, ra)
788 788 self.assertEqual(r, 0)
789 789 self.assertEqual(view['a.b'], 0)
790 790
791 791 def test_return_namedtuple(self):
792 792 def namedtuplify(x, y):
793 793 from IPython.parallel.tests.test_view import point
794 794 return point(x, y)
795 795
796 796 view = self.client[-1]
797 797 p = view.apply_sync(namedtuplify, 1, 2)
798 798 self.assertEqual(p.x, 1)
799 799 self.assertEqual(p.y, 2)
800 800
801 801 def test_apply_namedtuple(self):
802 802 def echoxy(p):
803 803 return p.y, p.x
804 804
805 805 view = self.client[-1]
806 806 tup = view.apply_sync(echoxy, point(1, 2))
807 807 self.assertEqual(tup, (2,1))
808
809 def test_sync_imports(self):
810 view = self.client[-1]
811 with capture_output() as io:
812 with view.sync_imports():
813 import IPython
814 self.assertIn("IPython", io.stdout)
815
816 @interactive
817 def find_ipython():
818 return 'IPython' in globals()
819
820 assert view.apply_sync(find_ipython)
821
822 def test_sync_imports_quiet(self):
823 view = self.client[-1]
824 with capture_output() as io:
825 with view.sync_imports(quiet=True):
826 import IPython
827 self.assertEqual(io.stdout, '')
828
829 @interactive
830 def find_ipython():
831 return 'IPython' in globals()
832
833 assert view.apply_sync(find_ipython)
808 834
General Comments 0
You need to be logged in to leave comments. Login now