##// END OF EJS Templates
test Composite/RemoteError tracebacks
MinRK -
Show More
@@ -1,678 +1,703 b''
1 1 # -*- coding: utf-8 -*-
2 2 """test View objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import platform
21 21 import time
22 22 from tempfile import mktemp
23 23 from StringIO import StringIO
24 24
25 25 import zmq
26 26 from nose import SkipTest
27 27
28 28 from IPython.testing import decorators as dec
29 29 from IPython.testing.ipunittest import ParametricTestCase
30 from IPython.utils.io import capture_output
30 31
31 32 from IPython import parallel as pmod
32 33 from IPython.parallel import error
33 34 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
34 35 from IPython.parallel import DirectView
35 36 from IPython.parallel.util import interactive
36 37
37 38 from IPython.parallel.tests import add_engines
38 39
39 40 from .clienttest import ClusterTestCase, crash, wait, skip_without
40 41
41 42 def setup():
42 43 add_engines(3, total=True)
43 44
44 45 class TestView(ClusterTestCase, ParametricTestCase):
45 46
46 47 def setUp(self):
47 48 # On Win XP, wait for resource cleanup, else parallel test group fails
48 49 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
49 50 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
50 51 time.sleep(2)
51 52 super(TestView, self).setUp()
52 53
53 54 def test_z_crash_mux(self):
54 55 """test graceful handling of engine death (direct)"""
55 56 raise SkipTest("crash tests disabled, due to undesirable crash reports")
56 57 # self.add_engines(1)
57 58 eid = self.client.ids[-1]
58 59 ar = self.client[eid].apply_async(crash)
59 60 self.assertRaisesRemote(error.EngineError, ar.get, 10)
60 61 eid = ar.engine_id
61 62 tic = time.time()
62 63 while eid in self.client.ids and time.time()-tic < 5:
63 64 time.sleep(.01)
64 65 self.client.spin()
65 66 self.assertFalse(eid in self.client.ids, "Engine should have died")
66 67
67 68 def test_push_pull(self):
68 69 """test pushing and pulling"""
69 70 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
70 71 t = self.client.ids[-1]
71 72 v = self.client[t]
72 73 push = v.push
73 74 pull = v.pull
74 75 v.block=True
75 76 nengines = len(self.client)
76 77 push({'data':data})
77 78 d = pull('data')
78 79 self.assertEqual(d, data)
79 80 self.client[:].push({'data':data})
80 81 d = self.client[:].pull('data', block=True)
81 82 self.assertEqual(d, nengines*[data])
82 83 ar = push({'data':data}, block=False)
83 84 self.assertTrue(isinstance(ar, AsyncResult))
84 85 r = ar.get()
85 86 ar = self.client[:].pull('data', block=False)
86 87 self.assertTrue(isinstance(ar, AsyncResult))
87 88 r = ar.get()
88 89 self.assertEqual(r, nengines*[data])
89 90 self.client[:].push(dict(a=10,b=20))
90 91 r = self.client[:].pull(('a','b'), block=True)
91 92 self.assertEqual(r, nengines*[[10,20]])
92 93
93 94 def test_push_pull_function(self):
94 95 "test pushing and pulling functions"
95 96 def testf(x):
96 97 return 2.0*x
97 98
98 99 t = self.client.ids[-1]
99 100 v = self.client[t]
100 101 v.block=True
101 102 push = v.push
102 103 pull = v.pull
103 104 execute = v.execute
104 105 push({'testf':testf})
105 106 r = pull('testf')
106 107 self.assertEqual(r(1.0), testf(1.0))
107 108 execute('r = testf(10)')
108 109 r = pull('r')
109 110 self.assertEqual(r, testf(10))
110 111 ar = self.client[:].push({'testf':testf}, block=False)
111 112 ar.get()
112 113 ar = self.client[:].pull('testf', block=False)
113 114 rlist = ar.get()
114 115 for r in rlist:
115 116 self.assertEqual(r(1.0), testf(1.0))
116 117 execute("def g(x): return x*x")
117 118 r = pull(('testf','g'))
118 119 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
119 120
120 121 def test_push_function_globals(self):
121 122 """test that pushed functions have access to globals"""
122 123 @interactive
123 124 def geta():
124 125 return a
125 126 # self.add_engines(1)
126 127 v = self.client[-1]
127 128 v.block=True
128 129 v['f'] = geta
129 130 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
130 131 v.execute('a=5')
131 132 v.execute('b=f()')
132 133 self.assertEqual(v['b'], 5)
133 134
134 135 def test_push_function_defaults(self):
135 136 """test that pushed functions preserve default args"""
136 137 def echo(a=10):
137 138 return a
138 139 v = self.client[-1]
139 140 v.block=True
140 141 v['f'] = echo
141 142 v.execute('b=f()')
142 143 self.assertEqual(v['b'], 10)
143 144
144 145 def test_get_result(self):
145 146 """test getting results from the Hub."""
146 147 c = pmod.Client(profile='iptest')
147 148 # self.add_engines(1)
148 149 t = c.ids[-1]
149 150 v = c[t]
150 151 v2 = self.client[t]
151 152 ar = v.apply_async(wait, 1)
152 153 # give the monitor time to notice the message
153 154 time.sleep(.25)
154 155 ahr = v2.get_result(ar.msg_ids)
155 156 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 157 self.assertEqual(ahr.get(), ar.get())
157 158 ar2 = v2.get_result(ar.msg_ids)
158 159 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 160 c.spin()
160 161 c.close()
161 162
162 163 def test_run_newline(self):
163 164 """test that run appends newline to files"""
164 165 tmpfile = mktemp()
165 166 with open(tmpfile, 'w') as f:
166 167 f.write("""def g():
167 168 return 5
168 169 """)
169 170 v = self.client[-1]
170 171 v.run(tmpfile, block=True)
171 172 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
172 173
173 174 def test_apply_tracked(self):
174 175 """test tracking for apply"""
175 176 # self.add_engines(1)
176 177 t = self.client.ids[-1]
177 178 v = self.client[t]
178 179 v.block=False
179 180 def echo(n=1024*1024, **kwargs):
180 181 with v.temp_flags(**kwargs):
181 182 return v.apply(lambda x: x, 'x'*n)
182 183 ar = echo(1, track=False)
183 184 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
184 185 self.assertTrue(ar.sent)
185 186 ar = echo(track=True)
186 187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
187 188 self.assertEqual(ar.sent, ar._tracker.done)
188 189 ar._tracker.wait()
189 190 self.assertTrue(ar.sent)
190 191
191 192 def test_push_tracked(self):
192 193 t = self.client.ids[-1]
193 194 ns = dict(x='x'*1024*1024)
194 195 v = self.client[t]
195 196 ar = v.push(ns, block=False, track=False)
196 197 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
197 198 self.assertTrue(ar.sent)
198 199
199 200 ar = v.push(ns, block=False, track=True)
200 201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
201 202 ar._tracker.wait()
202 203 self.assertEqual(ar.sent, ar._tracker.done)
203 204 self.assertTrue(ar.sent)
204 205 ar.get()
205 206
206 207 def test_scatter_tracked(self):
207 208 t = self.client.ids
208 209 x='x'*1024*1024
209 210 ar = self.client[t].scatter('x', x, block=False, track=False)
210 211 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
211 212 self.assertTrue(ar.sent)
212 213
213 214 ar = self.client[t].scatter('x', x, block=False, track=True)
214 215 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
215 216 self.assertEqual(ar.sent, ar._tracker.done)
216 217 ar._tracker.wait()
217 218 self.assertTrue(ar.sent)
218 219 ar.get()
219 220
220 221 def test_remote_reference(self):
221 222 v = self.client[-1]
222 223 v['a'] = 123
223 224 ra = pmod.Reference('a')
224 225 b = v.apply_sync(lambda x: x, ra)
225 226 self.assertEqual(b, 123)
226 227
227 228
228 229 def test_scatter_gather(self):
229 230 view = self.client[:]
230 231 seq1 = range(16)
231 232 view.scatter('a', seq1)
232 233 seq2 = view.gather('a', block=True)
233 234 self.assertEqual(seq2, seq1)
234 235 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
235 236
236 237 @skip_without('numpy')
237 238 def test_scatter_gather_numpy(self):
238 239 import numpy
239 240 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
240 241 view = self.client[:]
241 242 a = numpy.arange(64)
242 243 view.scatter('a', a, block=True)
243 244 b = view.gather('a', block=True)
244 245 assert_array_equal(b, a)
245 246
246 247 def test_scatter_gather_lazy(self):
247 248 """scatter/gather with targets='all'"""
248 249 view = self.client.direct_view(targets='all')
249 250 x = range(64)
250 251 view.scatter('x', x)
251 252 gathered = view.gather('x', block=True)
252 253 self.assertEqual(gathered, x)
253 254
254 255
255 256 @dec.known_failure_py3
256 257 @skip_without('numpy')
257 258 def test_push_numpy_nocopy(self):
258 259 import numpy
259 260 view = self.client[:]
260 261 a = numpy.arange(64)
261 262 view['A'] = a
262 263 @interactive
263 264 def check_writeable(x):
264 265 return x.flags.writeable
265 266
266 267 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
267 268 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
268 269
269 270 view.push(dict(B=a))
270 271 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
271 272 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
272 273
273 274 @skip_without('numpy')
274 275 def test_apply_numpy(self):
275 276 """view.apply(f, ndarray)"""
276 277 import numpy
277 278 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
278 279
279 280 A = numpy.random.random((100,100))
280 281 view = self.client[-1]
281 282 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
282 283 B = A.astype(dt)
283 284 C = view.apply_sync(lambda x:x, B)
284 285 assert_array_equal(B,C)
285 286
286 287 @skip_without('numpy')
287 288 def test_push_pull_recarray(self):
288 289 """push/pull recarrays"""
289 290 import numpy
290 291 from numpy.testing.utils import assert_array_equal
291 292
292 293 view = self.client[-1]
293 294
294 295 R = numpy.array([
295 296 (1, 'hi', 0.),
296 297 (2**30, 'there', 2.5),
297 298 (-99999, 'world', -12345.6789),
298 299 ], [('n', int), ('s', '|S10'), ('f', float)])
299 300
300 301 view['RR'] = R
301 302 R2 = view['RR']
302 303
303 304 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
304 305 self.assertEqual(r_dtype, R.dtype)
305 306 self.assertEqual(r_shape, R.shape)
306 307 self.assertEqual(R2.dtype, R.dtype)
307 308 self.assertEqual(R2.shape, R.shape)
308 309 assert_array_equal(R2, R)
309 310
310 311 def test_map(self):
311 312 view = self.client[:]
312 313 def f(x):
313 314 return x**2
314 315 data = range(16)
315 316 r = view.map_sync(f, data)
316 317 self.assertEqual(r, map(f, data))
317 318
318 319 def test_map_iterable(self):
319 320 """test map on iterables (direct)"""
320 321 view = self.client[:]
321 322 # 101 is prime, so it won't be evenly distributed
322 323 arr = range(101)
323 324 # ensure it will be an iterator, even in Python 3
324 325 it = iter(arr)
325 326 r = view.map_sync(lambda x:x, arr)
326 327 self.assertEqual(r, list(arr))
327 328
328 329 def test_scatter_gather_nonblocking(self):
329 330 data = range(16)
330 331 view = self.client[:]
331 332 view.scatter('a', data, block=False)
332 333 ar = view.gather('a', block=False)
333 334 self.assertEqual(ar.get(), data)
334 335
335 336 @skip_without('numpy')
336 337 def test_scatter_gather_numpy_nonblocking(self):
337 338 import numpy
338 339 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
339 340 a = numpy.arange(64)
340 341 view = self.client[:]
341 342 ar = view.scatter('a', a, block=False)
342 343 self.assertTrue(isinstance(ar, AsyncResult))
343 344 amr = view.gather('a', block=False)
344 345 self.assertTrue(isinstance(amr, AsyncMapResult))
345 346 assert_array_equal(amr.get(), a)
346 347
347 348 def test_execute(self):
348 349 view = self.client[:]
349 350 # self.client.debug=True
350 351 execute = view.execute
351 352 ar = execute('c=30', block=False)
352 353 self.assertTrue(isinstance(ar, AsyncResult))
353 354 ar = execute('d=[0,1,2]', block=False)
354 355 self.client.wait(ar, 1)
355 356 self.assertEqual(len(ar.get()), len(self.client))
356 357 for c in view['c']:
357 358 self.assertEqual(c, 30)
358 359
359 360 def test_abort(self):
360 361 view = self.client[-1]
361 362 ar = view.execute('import time; time.sleep(1)', block=False)
362 363 ar2 = view.apply_async(lambda : 2)
363 364 ar3 = view.apply_async(lambda : 3)
364 365 view.abort(ar2)
365 366 view.abort(ar3.msg_ids)
366 367 self.assertRaises(error.TaskAborted, ar2.get)
367 368 self.assertRaises(error.TaskAborted, ar3.get)
368 369
369 370 def test_abort_all(self):
370 371 """view.abort() aborts all outstanding tasks"""
371 372 view = self.client[-1]
372 373 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
373 374 view.abort()
374 375 view.wait(timeout=5)
375 376 for ar in ars[5:]:
376 377 self.assertRaises(error.TaskAborted, ar.get)
377 378
378 379 def test_temp_flags(self):
379 380 view = self.client[-1]
380 381 view.block=True
381 382 with view.temp_flags(block=False):
382 383 self.assertFalse(view.block)
383 384 self.assertTrue(view.block)
384 385
385 386 @dec.known_failure_py3
386 387 def test_importer(self):
387 388 view = self.client[-1]
388 389 view.clear(block=True)
389 390 with view.importer:
390 391 import re
391 392
392 393 @interactive
393 394 def findall(pat, s):
394 395 # this globals() step isn't necessary in real code
395 396 # only to prevent a closure in the test
396 397 re = globals()['re']
397 398 return re.findall(pat, s)
398 399
399 400 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
400 401
401 402 def test_unicode_execute(self):
402 403 """test executing unicode strings"""
403 404 v = self.client[-1]
404 405 v.block=True
405 406 if sys.version_info[0] >= 3:
406 407 code="a='é'"
407 408 else:
408 409 code=u"a=u'é'"
409 410 v.execute(code)
410 411 self.assertEqual(v['a'], u'é')
411 412
412 413 def test_unicode_apply_result(self):
413 414 """test unicode apply results"""
414 415 v = self.client[-1]
415 416 r = v.apply_sync(lambda : u'é')
416 417 self.assertEqual(r, u'é')
417 418
418 419 def test_unicode_apply_arg(self):
419 420 """test passing unicode arguments to apply"""
420 421 v = self.client[-1]
421 422
422 423 @interactive
423 424 def check_unicode(a, check):
424 425 assert isinstance(a, unicode), "%r is not unicode"%a
425 426 assert isinstance(check, bytes), "%r is not bytes"%check
426 427 assert a.encode('utf8') == check, "%s != %s"%(a,check)
427 428
428 429 for s in [ u'é', u'ßø®∫',u'asdf' ]:
429 430 try:
430 431 v.apply_sync(check_unicode, s, s.encode('utf8'))
431 432 except error.RemoteError as e:
432 433 if e.ename == 'AssertionError':
433 434 self.fail(e.evalue)
434 435 else:
435 436 raise e
436 437
437 438 def test_map_reference(self):
438 439 """view.map(<Reference>, *seqs) should work"""
439 440 v = self.client[:]
440 441 v.scatter('n', self.client.ids, flatten=True)
441 442 v.execute("f = lambda x,y: x*y")
442 443 rf = pmod.Reference('f')
443 444 nlist = list(range(10))
444 445 mlist = nlist[::-1]
445 446 expected = [ m*n for m,n in zip(mlist, nlist) ]
446 447 result = v.map_sync(rf, mlist, nlist)
447 448 self.assertEqual(result, expected)
448 449
449 450 def test_apply_reference(self):
450 451 """view.apply(<Reference>, *args) should work"""
451 452 v = self.client[:]
452 453 v.scatter('n', self.client.ids, flatten=True)
453 454 v.execute("f = lambda x: n*x")
454 455 rf = pmod.Reference('f')
455 456 result = v.apply_sync(rf, 5)
456 457 expected = [ 5*id for id in self.client.ids ]
457 458 self.assertEqual(result, expected)
458 459
459 460 def test_eval_reference(self):
460 461 v = self.client[self.client.ids[0]]
461 462 v['g'] = range(5)
462 463 rg = pmod.Reference('g[0]')
463 464 echo = lambda x:x
464 465 self.assertEqual(v.apply_sync(echo, rg), 0)
465 466
466 467 def test_reference_nameerror(self):
467 468 v = self.client[self.client.ids[0]]
468 469 r = pmod.Reference('elvis_has_left')
469 470 echo = lambda x:x
470 471 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
471 472
472 473 def test_single_engine_map(self):
473 474 e0 = self.client[self.client.ids[0]]
474 475 r = range(5)
475 476 check = [ -1*i for i in r ]
476 477 result = e0.map_sync(lambda x: -1*x, r)
477 478 self.assertEqual(result, check)
478 479
479 480 def test_len(self):
480 481 """len(view) makes sense"""
481 482 e0 = self.client[self.client.ids[0]]
482 483 yield self.assertEqual(len(e0), 1)
483 484 v = self.client[:]
484 485 yield self.assertEqual(len(v), len(self.client.ids))
485 486 v = self.client.direct_view('all')
486 487 yield self.assertEqual(len(v), len(self.client.ids))
487 488 v = self.client[:2]
488 489 yield self.assertEqual(len(v), 2)
489 490 v = self.client[:1]
490 491 yield self.assertEqual(len(v), 1)
491 492 v = self.client.load_balanced_view()
492 493 yield self.assertEqual(len(v), len(self.client.ids))
493 494 # parametric tests seem to require manual closing?
494 495 self.client.close()
495 496
496 497
497 498 # begin execute tests
498 499
499 500 def test_execute_reply(self):
500 501 e0 = self.client[self.client.ids[0]]
501 502 e0.block = True
502 503 ar = e0.execute("5", silent=False)
503 504 er = ar.get()
504 505 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
505 506 self.assertEqual(er.pyout['data']['text/plain'], '5')
506 507
507 508 def test_execute_reply_stdout(self):
508 509 e0 = self.client[self.client.ids[0]]
509 510 e0.block = True
510 511 ar = e0.execute("print (5)", silent=False)
511 512 er = ar.get()
512 513 self.assertEqual(er.stdout.strip(), '5')
513 514
514 515 def test_execute_pyout(self):
515 516 """execute triggers pyout with silent=False"""
516 517 view = self.client[:]
517 518 ar = view.execute("5", silent=False, block=True)
518 519
519 520 expected = [{'text/plain' : '5'}] * len(view)
520 521 mimes = [ out['data'] for out in ar.pyout ]
521 522 self.assertEqual(mimes, expected)
522 523
523 524 def test_execute_silent(self):
524 525 """execute does not trigger pyout with silent=True"""
525 526 view = self.client[:]
526 527 ar = view.execute("5", block=True)
527 528 expected = [None] * len(view)
528 529 self.assertEqual(ar.pyout, expected)
529 530
530 531 def test_execute_magic(self):
531 532 """execute accepts IPython commands"""
532 533 view = self.client[:]
533 534 view.execute("a = 5")
534 535 ar = view.execute("%whos", block=True)
535 536 # this will raise, if that failed
536 537 ar.get(5)
537 538 for stdout in ar.stdout:
538 539 lines = stdout.splitlines()
539 540 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
540 541 found = False
541 542 for line in lines[2:]:
542 543 split = line.split()
543 544 if split == ['a', 'int', '5']:
544 545 found = True
545 546 break
546 547 self.assertTrue(found, "whos output wrong: %s" % stdout)
547 548
548 549 def test_execute_displaypub(self):
549 550 """execute tracks display_pub output"""
550 551 view = self.client[:]
551 552 view.execute("from IPython.core.display import *")
552 553 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
553 554
554 555 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
555 556 for outputs in ar.outputs:
556 557 mimes = [ out['data'] for out in outputs ]
557 558 self.assertEqual(mimes, expected)
558 559
559 560 def test_apply_displaypub(self):
560 561 """apply tracks display_pub output"""
561 562 view = self.client[:]
562 563 view.execute("from IPython.core.display import *")
563 564
564 565 @interactive
565 566 def publish():
566 567 [ display(i) for i in range(5) ]
567 568
568 569 ar = view.apply_async(publish)
569 570 ar.get(5)
570 571 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
571 572 for outputs in ar.outputs:
572 573 mimes = [ out['data'] for out in outputs ]
573 574 self.assertEqual(mimes, expected)
574 575
575 576 def test_execute_raises(self):
576 577 """exceptions in execute requests raise appropriately"""
577 578 view = self.client[-1]
578 579 ar = view.execute("1/0")
579 580 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
580 581
582 def test_remoteerror_render_exception(self):
583 """RemoteErrors get nice tracebacks"""
584 view = self.client[-1]
585 ar = view.execute("1/0")
586 ip = get_ipython()
587 ip.user_ns['ar'] = ar
588 with capture_output() as io:
589 ip.run_cell("ar.get(2)")
590
591 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
592
593 def test_compositeerror_render_exception(self):
594 """CompositeErrors get nice tracebacks"""
595 view = self.client[:]
596 ar = view.execute("1/0")
597 ip = get_ipython()
598 ip.user_ns['ar'] = ar
599 with capture_output() as io:
600 ip.run_cell("ar.get(2)")
601
602 self.assertEqual(io.stdout.count('ZeroDivisionError'), len(view) * 2, io.stdout)
603 self.assertEqual(io.stdout.count('integer division'), len(view), io.stdout)
604 self.assertEqual(io.stdout.count(':execute'), len(view), io.stdout)
605
581 606 @dec.skipif_not_matplotlib
582 607 def test_magic_pylab(self):
583 608 """%pylab works on engines"""
584 609 view = self.client[-1]
585 610 ar = view.execute("%pylab inline")
586 611 # at least check if this raised:
587 612 reply = ar.get(5)
588 613 # include imports, in case user config
589 614 ar = view.execute("plot(rand(100))", silent=False)
590 615 reply = ar.get(5)
591 616 self.assertEqual(len(reply.outputs), 1)
592 617 output = reply.outputs[0]
593 618 self.assertTrue("data" in output)
594 619 data = output['data']
595 620 self.assertTrue("image/png" in data)
596 621
597 622 def test_func_default_func(self):
598 623 """interactively defined function as apply func default"""
599 624 def foo():
600 625 return 'foo'
601 626
602 627 def bar(f=foo):
603 628 return f()
604 629
605 630 view = self.client[-1]
606 631 ar = view.apply_async(bar)
607 632 r = ar.get(10)
608 633 self.assertEqual(r, 'foo')
609 634 def test_data_pub_single(self):
610 635 view = self.client[-1]
611 636 ar = view.execute('\n'.join([
612 637 'from IPython.zmq.datapub import publish_data',
613 638 'for i in range(5):',
614 639 ' publish_data(dict(i=i))'
615 640 ]), block=False)
616 641 self.assertTrue(isinstance(ar.data, dict))
617 642 ar.get(5)
618 643 self.assertEqual(ar.data, dict(i=4))
619 644
620 645 def test_data_pub(self):
621 646 view = self.client[:]
622 647 ar = view.execute('\n'.join([
623 648 'from IPython.zmq.datapub import publish_data',
624 649 'for i in range(5):',
625 650 ' publish_data(dict(i=i))'
626 651 ]), block=False)
627 652 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
628 653 ar.get(5)
629 654 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
630 655
631 656 def test_can_list_arg(self):
632 657 """args in lists are canned"""
633 658 view = self.client[-1]
634 659 view['a'] = 128
635 660 rA = pmod.Reference('a')
636 661 ar = view.apply_async(lambda x: x, [rA])
637 662 r = ar.get(5)
638 663 self.assertEqual(r, [128])
639 664
640 665 def test_can_dict_arg(self):
641 666 """args in dicts are canned"""
642 667 view = self.client[-1]
643 668 view['a'] = 128
644 669 rA = pmod.Reference('a')
645 670 ar = view.apply_async(lambda x: x, dict(foo=rA))
646 671 r = ar.get(5)
647 672 self.assertEqual(r, dict(foo=128))
648 673
649 674 def test_can_list_kwarg(self):
650 675 """kwargs in lists are canned"""
651 676 view = self.client[-1]
652 677 view['a'] = 128
653 678 rA = pmod.Reference('a')
654 679 ar = view.apply_async(lambda x=5: x, x=[rA])
655 680 r = ar.get(5)
656 681 self.assertEqual(r, [128])
657 682
658 683 def test_can_dict_kwarg(self):
659 684 """kwargs in dicts are canned"""
660 685 view = self.client[-1]
661 686 view['a'] = 128
662 687 rA = pmod.Reference('a')
663 688 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
664 689 r = ar.get(5)
665 690 self.assertEqual(r, dict(foo=128))
666 691
667 692 def test_map_ref(self):
668 693 """view.map works with references"""
669 694 view = self.client[:]
670 695 ranks = sorted(self.client.ids)
671 696 view.scatter('rank', ranks, flatten=True)
672 697 rrank = pmod.Reference('rank')
673 698
674 699 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
675 700 drank = amr.get(5)
676 701 self.assertEqual(drank, [ r*2 for r in ranks ])
677 702
678 703
General Comments 0
You need to be logged in to leave comments. Login now