##// END OF EJS Templates
py3: assign e so it survives outside the scope
MinRK -
Show More
@@ -1,789 +1,789 b''
1 1 # -*- coding: utf-8 -*-
2 2 """test View objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import platform
21 21 import time
22 22 from collections import namedtuple
23 23 from tempfile import mktemp
24 24 from StringIO import StringIO
25 25
26 26 import zmq
27 27 from nose import SkipTest
28 28 from nose.plugins.attrib import attr
29 29
30 30 from IPython.testing import decorators as dec
31 31 from IPython.testing.ipunittest import ParametricTestCase
32 32 from IPython.utils.io import capture_output
33 33
34 34 from IPython import parallel as pmod
35 35 from IPython.parallel import error
36 36 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
37 37 from IPython.parallel import DirectView
38 38 from IPython.parallel.util import interactive
39 39
40 40 from IPython.parallel.tests import add_engines
41 41
42 42 from .clienttest import ClusterTestCase, crash, wait, skip_without
43 43
44 44 def setup():
45 45 add_engines(3, total=True)
46 46
47 47 point = namedtuple("point", "x y")
48 48
49 49 class TestView(ClusterTestCase, ParametricTestCase):
50 50
51 51 def setUp(self):
52 52 # On Win XP, wait for resource cleanup, else parallel test group fails
53 53 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
54 54 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
55 55 time.sleep(2)
56 56 super(TestView, self).setUp()
57 57
58 58 @attr('crash')
59 59 def test_z_crash_mux(self):
60 60 """test graceful handling of engine death (direct)"""
61 61 # self.add_engines(1)
62 62 eid = self.client.ids[-1]
63 63 ar = self.client[eid].apply_async(crash)
64 64 self.assertRaisesRemote(error.EngineError, ar.get, 10)
65 65 eid = ar.engine_id
66 66 tic = time.time()
67 67 while eid in self.client.ids and time.time()-tic < 5:
68 68 time.sleep(.01)
69 69 self.client.spin()
70 70 self.assertFalse(eid in self.client.ids, "Engine should have died")
71 71
72 72 def test_push_pull(self):
73 73 """test pushing and pulling"""
74 74 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
75 75 t = self.client.ids[-1]
76 76 v = self.client[t]
77 77 push = v.push
78 78 pull = v.pull
79 79 v.block=True
80 80 nengines = len(self.client)
81 81 push({'data':data})
82 82 d = pull('data')
83 83 self.assertEqual(d, data)
84 84 self.client[:].push({'data':data})
85 85 d = self.client[:].pull('data', block=True)
86 86 self.assertEqual(d, nengines*[data])
87 87 ar = push({'data':data}, block=False)
88 88 self.assertTrue(isinstance(ar, AsyncResult))
89 89 r = ar.get()
90 90 ar = self.client[:].pull('data', block=False)
91 91 self.assertTrue(isinstance(ar, AsyncResult))
92 92 r = ar.get()
93 93 self.assertEqual(r, nengines*[data])
94 94 self.client[:].push(dict(a=10,b=20))
95 95 r = self.client[:].pull(('a','b'), block=True)
96 96 self.assertEqual(r, nengines*[[10,20]])
97 97
98 98 def test_push_pull_function(self):
99 99 "test pushing and pulling functions"
100 100 def testf(x):
101 101 return 2.0*x
102 102
103 103 t = self.client.ids[-1]
104 104 v = self.client[t]
105 105 v.block=True
106 106 push = v.push
107 107 pull = v.pull
108 108 execute = v.execute
109 109 push({'testf':testf})
110 110 r = pull('testf')
111 111 self.assertEqual(r(1.0), testf(1.0))
112 112 execute('r = testf(10)')
113 113 r = pull('r')
114 114 self.assertEqual(r, testf(10))
115 115 ar = self.client[:].push({'testf':testf}, block=False)
116 116 ar.get()
117 117 ar = self.client[:].pull('testf', block=False)
118 118 rlist = ar.get()
119 119 for r in rlist:
120 120 self.assertEqual(r(1.0), testf(1.0))
121 121 execute("def g(x): return x*x")
122 122 r = pull(('testf','g'))
123 123 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
124 124
125 125 def test_push_function_globals(self):
126 126 """test that pushed functions have access to globals"""
127 127 @interactive
128 128 def geta():
129 129 return a
130 130 # self.add_engines(1)
131 131 v = self.client[-1]
132 132 v.block=True
133 133 v['f'] = geta
134 134 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
135 135 v.execute('a=5')
136 136 v.execute('b=f()')
137 137 self.assertEqual(v['b'], 5)
138 138
139 139 def test_push_function_defaults(self):
140 140 """test that pushed functions preserve default args"""
141 141 def echo(a=10):
142 142 return a
143 143 v = self.client[-1]
144 144 v.block=True
145 145 v['f'] = echo
146 146 v.execute('b=f()')
147 147 self.assertEqual(v['b'], 10)
148 148
149 149 def test_get_result(self):
150 150 """test getting results from the Hub."""
151 151 c = pmod.Client(profile='iptest')
152 152 # self.add_engines(1)
153 153 t = c.ids[-1]
154 154 v = c[t]
155 155 v2 = self.client[t]
156 156 ar = v.apply_async(wait, 1)
157 157 # give the monitor time to notice the message
158 158 time.sleep(.25)
159 159 ahr = v2.get_result(ar.msg_ids)
160 160 self.assertTrue(isinstance(ahr, AsyncHubResult))
161 161 self.assertEqual(ahr.get(), ar.get())
162 162 ar2 = v2.get_result(ar.msg_ids)
163 163 self.assertFalse(isinstance(ar2, AsyncHubResult))
164 164 c.spin()
165 165 c.close()
166 166
167 167 def test_run_newline(self):
168 168 """test that run appends newline to files"""
169 169 tmpfile = mktemp()
170 170 with open(tmpfile, 'w') as f:
171 171 f.write("""def g():
172 172 return 5
173 173 """)
174 174 v = self.client[-1]
175 175 v.run(tmpfile, block=True)
176 176 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
177 177
178 178 def test_apply_tracked(self):
179 179 """test tracking for apply"""
180 180 # self.add_engines(1)
181 181 t = self.client.ids[-1]
182 182 v = self.client[t]
183 183 v.block=False
184 184 def echo(n=1024*1024, **kwargs):
185 185 with v.temp_flags(**kwargs):
186 186 return v.apply(lambda x: x, 'x'*n)
187 187 ar = echo(1, track=False)
188 188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 189 self.assertTrue(ar.sent)
190 190 ar = echo(track=True)
191 191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
192 192 self.assertEqual(ar.sent, ar._tracker.done)
193 193 ar._tracker.wait()
194 194 self.assertTrue(ar.sent)
195 195
196 196 def test_push_tracked(self):
197 197 t = self.client.ids[-1]
198 198 ns = dict(x='x'*1024*1024)
199 199 v = self.client[t]
200 200 ar = v.push(ns, block=False, track=False)
201 201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 202 self.assertTrue(ar.sent)
203 203
204 204 ar = v.push(ns, block=False, track=True)
205 205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
206 206 ar._tracker.wait()
207 207 self.assertEqual(ar.sent, ar._tracker.done)
208 208 self.assertTrue(ar.sent)
209 209 ar.get()
210 210
211 211 def test_scatter_tracked(self):
212 212 t = self.client.ids
213 213 x='x'*1024*1024
214 214 ar = self.client[t].scatter('x', x, block=False, track=False)
215 215 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
216 216 self.assertTrue(ar.sent)
217 217
218 218 ar = self.client[t].scatter('x', x, block=False, track=True)
219 219 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
220 220 self.assertEqual(ar.sent, ar._tracker.done)
221 221 ar._tracker.wait()
222 222 self.assertTrue(ar.sent)
223 223 ar.get()
224 224
225 225 def test_remote_reference(self):
226 226 v = self.client[-1]
227 227 v['a'] = 123
228 228 ra = pmod.Reference('a')
229 229 b = v.apply_sync(lambda x: x, ra)
230 230 self.assertEqual(b, 123)
231 231
232 232
233 233 def test_scatter_gather(self):
234 234 view = self.client[:]
235 235 seq1 = range(16)
236 236 view.scatter('a', seq1)
237 237 seq2 = view.gather('a', block=True)
238 238 self.assertEqual(seq2, seq1)
239 239 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
240 240
241 241 @skip_without('numpy')
242 242 def test_scatter_gather_numpy(self):
243 243 import numpy
244 244 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
245 245 view = self.client[:]
246 246 a = numpy.arange(64)
247 247 view.scatter('a', a, block=True)
248 248 b = view.gather('a', block=True)
249 249 assert_array_equal(b, a)
250 250
251 251 def test_scatter_gather_lazy(self):
252 252 """scatter/gather with targets='all'"""
253 253 view = self.client.direct_view(targets='all')
254 254 x = range(64)
255 255 view.scatter('x', x)
256 256 gathered = view.gather('x', block=True)
257 257 self.assertEqual(gathered, x)
258 258
259 259
260 260 @dec.known_failure_py3
261 261 @skip_without('numpy')
262 262 def test_push_numpy_nocopy(self):
263 263 import numpy
264 264 view = self.client[:]
265 265 a = numpy.arange(64)
266 266 view['A'] = a
267 267 @interactive
268 268 def check_writeable(x):
269 269 return x.flags.writeable
270 270
271 271 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
272 272 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
273 273
274 274 view.push(dict(B=a))
275 275 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
276 276 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
277 277
278 278 @skip_without('numpy')
279 279 def test_apply_numpy(self):
280 280 """view.apply(f, ndarray)"""
281 281 import numpy
282 282 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
283 283
284 284 A = numpy.random.random((100,100))
285 285 view = self.client[-1]
286 286 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
287 287 B = A.astype(dt)
288 288 C = view.apply_sync(lambda x:x, B)
289 289 assert_array_equal(B,C)
290 290
291 291 @skip_without('numpy')
292 292 def test_push_pull_recarray(self):
293 293 """push/pull recarrays"""
294 294 import numpy
295 295 from numpy.testing.utils import assert_array_equal
296 296
297 297 view = self.client[-1]
298 298
299 299 R = numpy.array([
300 300 (1, 'hi', 0.),
301 301 (2**30, 'there', 2.5),
302 302 (-99999, 'world', -12345.6789),
303 303 ], [('n', int), ('s', '|S10'), ('f', float)])
304 304
305 305 view['RR'] = R
306 306 R2 = view['RR']
307 307
308 308 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
309 309 self.assertEqual(r_dtype, R.dtype)
310 310 self.assertEqual(r_shape, R.shape)
311 311 self.assertEqual(R2.dtype, R.dtype)
312 312 self.assertEqual(R2.shape, R.shape)
313 313 assert_array_equal(R2, R)
314 314
315 315 @skip_without('pandas')
316 316 def test_push_pull_timeseries(self):
317 317 """push/pull pandas.TimeSeries"""
318 318 import pandas
319 319
320 320 ts = pandas.TimeSeries(range(10))
321 321
322 322 view = self.client[-1]
323 323
324 324 view.push(dict(ts=ts), block=True)
325 325 rts = view['ts']
326 326
327 327 self.assertEqual(type(rts), type(ts))
328 328 self.assertTrue((ts == rts).all())
329 329
330 330 def test_map(self):
331 331 view = self.client[:]
332 332 def f(x):
333 333 return x**2
334 334 data = range(16)
335 335 r = view.map_sync(f, data)
336 336 self.assertEqual(r, map(f, data))
337 337
338 338 def test_map_iterable(self):
339 339 """test map on iterables (direct)"""
340 340 view = self.client[:]
341 341 # 101 is prime, so it won't be evenly distributed
342 342 arr = range(101)
343 343 # ensure it will be an iterator, even in Python 3
344 344 it = iter(arr)
345 345 r = view.map_sync(lambda x:x, arr)
346 346 self.assertEqual(r, list(arr))
347 347
348 348 def test_scatter_gather_nonblocking(self):
349 349 data = range(16)
350 350 view = self.client[:]
351 351 view.scatter('a', data, block=False)
352 352 ar = view.gather('a', block=False)
353 353 self.assertEqual(ar.get(), data)
354 354
355 355 @skip_without('numpy')
356 356 def test_scatter_gather_numpy_nonblocking(self):
357 357 import numpy
358 358 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
359 359 a = numpy.arange(64)
360 360 view = self.client[:]
361 361 ar = view.scatter('a', a, block=False)
362 362 self.assertTrue(isinstance(ar, AsyncResult))
363 363 amr = view.gather('a', block=False)
364 364 self.assertTrue(isinstance(amr, AsyncMapResult))
365 365 assert_array_equal(amr.get(), a)
366 366
367 367 def test_execute(self):
368 368 view = self.client[:]
369 369 # self.client.debug=True
370 370 execute = view.execute
371 371 ar = execute('c=30', block=False)
372 372 self.assertTrue(isinstance(ar, AsyncResult))
373 373 ar = execute('d=[0,1,2]', block=False)
374 374 self.client.wait(ar, 1)
375 375 self.assertEqual(len(ar.get()), len(self.client))
376 376 for c in view['c']:
377 377 self.assertEqual(c, 30)
378 378
379 379 def test_abort(self):
380 380 view = self.client[-1]
381 381 ar = view.execute('import time; time.sleep(1)', block=False)
382 382 ar2 = view.apply_async(lambda : 2)
383 383 ar3 = view.apply_async(lambda : 3)
384 384 view.abort(ar2)
385 385 view.abort(ar3.msg_ids)
386 386 self.assertRaises(error.TaskAborted, ar2.get)
387 387 self.assertRaises(error.TaskAborted, ar3.get)
388 388
389 389 def test_abort_all(self):
390 390 """view.abort() aborts all outstanding tasks"""
391 391 view = self.client[-1]
392 392 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
393 393 view.abort()
394 394 view.wait(timeout=5)
395 395 for ar in ars[5:]:
396 396 self.assertRaises(error.TaskAborted, ar.get)
397 397
398 398 def test_temp_flags(self):
399 399 view = self.client[-1]
400 400 view.block=True
401 401 with view.temp_flags(block=False):
402 402 self.assertFalse(view.block)
403 403 self.assertTrue(view.block)
404 404
405 405 @dec.known_failure_py3
406 406 def test_importer(self):
407 407 view = self.client[-1]
408 408 view.clear(block=True)
409 409 with view.importer:
410 410 import re
411 411
412 412 @interactive
413 413 def findall(pat, s):
414 414 # this globals() step isn't necessary in real code
415 415 # only to prevent a closure in the test
416 416 re = globals()['re']
417 417 return re.findall(pat, s)
418 418
419 419 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
420 420
421 421 def test_unicode_execute(self):
422 422 """test executing unicode strings"""
423 423 v = self.client[-1]
424 424 v.block=True
425 425 if sys.version_info[0] >= 3:
426 426 code="a='é'"
427 427 else:
428 428 code=u"a=u'é'"
429 429 v.execute(code)
430 430 self.assertEqual(v['a'], u'é')
431 431
432 432 def test_unicode_apply_result(self):
433 433 """test unicode apply results"""
434 434 v = self.client[-1]
435 435 r = v.apply_sync(lambda : u'é')
436 436 self.assertEqual(r, u'é')
437 437
438 438 def test_unicode_apply_arg(self):
439 439 """test passing unicode arguments to apply"""
440 440 v = self.client[-1]
441 441
442 442 @interactive
443 443 def check_unicode(a, check):
444 444 assert isinstance(a, unicode), "%r is not unicode"%a
445 445 assert isinstance(check, bytes), "%r is not bytes"%check
446 446 assert a.encode('utf8') == check, "%s != %s"%(a,check)
447 447
448 448 for s in [ u'é', u'ßø®∫',u'asdf' ]:
449 449 try:
450 450 v.apply_sync(check_unicode, s, s.encode('utf8'))
451 451 except error.RemoteError as e:
452 452 if e.ename == 'AssertionError':
453 453 self.fail(e.evalue)
454 454 else:
455 455 raise e
456 456
457 457 def test_map_reference(self):
458 458 """view.map(<Reference>, *seqs) should work"""
459 459 v = self.client[:]
460 460 v.scatter('n', self.client.ids, flatten=True)
461 461 v.execute("f = lambda x,y: x*y")
462 462 rf = pmod.Reference('f')
463 463 nlist = list(range(10))
464 464 mlist = nlist[::-1]
465 465 expected = [ m*n for m,n in zip(mlist, nlist) ]
466 466 result = v.map_sync(rf, mlist, nlist)
467 467 self.assertEqual(result, expected)
468 468
469 469 def test_apply_reference(self):
470 470 """view.apply(<Reference>, *args) should work"""
471 471 v = self.client[:]
472 472 v.scatter('n', self.client.ids, flatten=True)
473 473 v.execute("f = lambda x: n*x")
474 474 rf = pmod.Reference('f')
475 475 result = v.apply_sync(rf, 5)
476 476 expected = [ 5*id for id in self.client.ids ]
477 477 self.assertEqual(result, expected)
478 478
479 479 def test_eval_reference(self):
480 480 v = self.client[self.client.ids[0]]
481 481 v['g'] = range(5)
482 482 rg = pmod.Reference('g[0]')
483 483 echo = lambda x:x
484 484 self.assertEqual(v.apply_sync(echo, rg), 0)
485 485
486 486 def test_reference_nameerror(self):
487 487 v = self.client[self.client.ids[0]]
488 488 r = pmod.Reference('elvis_has_left')
489 489 echo = lambda x:x
490 490 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
491 491
492 492 def test_single_engine_map(self):
493 493 e0 = self.client[self.client.ids[0]]
494 494 r = range(5)
495 495 check = [ -1*i for i in r ]
496 496 result = e0.map_sync(lambda x: -1*x, r)
497 497 self.assertEqual(result, check)
498 498
499 499 def test_len(self):
500 500 """len(view) makes sense"""
501 501 e0 = self.client[self.client.ids[0]]
502 502 yield self.assertEqual(len(e0), 1)
503 503 v = self.client[:]
504 504 yield self.assertEqual(len(v), len(self.client.ids))
505 505 v = self.client.direct_view('all')
506 506 yield self.assertEqual(len(v), len(self.client.ids))
507 507 v = self.client[:2]
508 508 yield self.assertEqual(len(v), 2)
509 509 v = self.client[:1]
510 510 yield self.assertEqual(len(v), 1)
511 511 v = self.client.load_balanced_view()
512 512 yield self.assertEqual(len(v), len(self.client.ids))
513 513 # parametric tests seem to require manual closing?
514 514 self.client.close()
515 515
516 516
517 517 # begin execute tests
518 518
519 519 def test_execute_reply(self):
520 520 e0 = self.client[self.client.ids[0]]
521 521 e0.block = True
522 522 ar = e0.execute("5", silent=False)
523 523 er = ar.get()
524 524 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
525 525 self.assertEqual(er.pyout['data']['text/plain'], '5')
526 526
527 527 def test_execute_reply_stdout(self):
528 528 e0 = self.client[self.client.ids[0]]
529 529 e0.block = True
530 530 ar = e0.execute("print (5)", silent=False)
531 531 er = ar.get()
532 532 self.assertEqual(er.stdout.strip(), '5')
533 533
534 534 def test_execute_pyout(self):
535 535 """execute triggers pyout with silent=False"""
536 536 view = self.client[:]
537 537 ar = view.execute("5", silent=False, block=True)
538 538
539 539 expected = [{'text/plain' : '5'}] * len(view)
540 540 mimes = [ out['data'] for out in ar.pyout ]
541 541 self.assertEqual(mimes, expected)
542 542
543 543 def test_execute_silent(self):
544 544 """execute does not trigger pyout with silent=True"""
545 545 view = self.client[:]
546 546 ar = view.execute("5", block=True)
547 547 expected = [None] * len(view)
548 548 self.assertEqual(ar.pyout, expected)
549 549
550 550 def test_execute_magic(self):
551 551 """execute accepts IPython commands"""
552 552 view = self.client[:]
553 553 view.execute("a = 5")
554 554 ar = view.execute("%whos", block=True)
555 555 # this will raise, if that failed
556 556 ar.get(5)
557 557 for stdout in ar.stdout:
558 558 lines = stdout.splitlines()
559 559 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
560 560 found = False
561 561 for line in lines[2:]:
562 562 split = line.split()
563 563 if split == ['a', 'int', '5']:
564 564 found = True
565 565 break
566 566 self.assertTrue(found, "whos output wrong: %s" % stdout)
567 567
568 568 def test_execute_displaypub(self):
569 569 """execute tracks display_pub output"""
570 570 view = self.client[:]
571 571 view.execute("from IPython.core.display import *")
572 572 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
573 573
574 574 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
575 575 for outputs in ar.outputs:
576 576 mimes = [ out['data'] for out in outputs ]
577 577 self.assertEqual(mimes, expected)
578 578
579 579 def test_apply_displaypub(self):
580 580 """apply tracks display_pub output"""
581 581 view = self.client[:]
582 582 view.execute("from IPython.core.display import *")
583 583
584 584 @interactive
585 585 def publish():
586 586 [ display(i) for i in range(5) ]
587 587
588 588 ar = view.apply_async(publish)
589 589 ar.get(5)
590 590 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
591 591 for outputs in ar.outputs:
592 592 mimes = [ out['data'] for out in outputs ]
593 593 self.assertEqual(mimes, expected)
594 594
595 595 def test_execute_raises(self):
596 596 """exceptions in execute requests raise appropriately"""
597 597 view = self.client[-1]
598 598 ar = view.execute("1/0")
599 599 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
600 600
601 601 def test_remoteerror_render_exception(self):
602 602 """RemoteErrors get nice tracebacks"""
603 603 view = self.client[-1]
604 604 ar = view.execute("1/0")
605 605 ip = get_ipython()
606 606 ip.user_ns['ar'] = ar
607 607 with capture_output() as io:
608 608 ip.run_cell("ar.get(2)")
609 609
610 610 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
611 611
612 612 def test_compositeerror_render_exception(self):
613 613 """CompositeErrors get nice tracebacks"""
614 614 view = self.client[:]
615 615 ar = view.execute("1/0")
616 616 ip = get_ipython()
617 617 ip.user_ns['ar'] = ar
618 618
619 619 with capture_output() as io:
620 620 ip.run_cell("ar.get(2)")
621 621
622 622 count = min(error.CompositeError.tb_limit, len(view))
623 623
624 624 self.assertEqual(io.stdout.count('ZeroDivisionError'), count * 2, io.stdout)
625 625 self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
626 626 self.assertEqual(io.stdout.count(':execute'), count, io.stdout)
627 627
628 628 def test_compositeerror_truncate(self):
629 629 """Truncate CompositeErrors with many exceptions"""
630 630 view = self.client[:]
631 631 msg_ids = []
632 632 for i in range(10):
633 633 ar = view.execute("1/0")
634 634 msg_ids.extend(ar.msg_ids)
635 635
636 636 ar = self.client.get_result(msg_ids)
637 637 try:
638 638 ar.get()
639 except error.CompositeError as e:
640 pass
639 except error.CompositeError as _e:
640 e = _e
641 641 else:
642 642 self.fail("Should have raised CompositeError")
643 643
644 644 lines = e.render_traceback()
645 645 with capture_output() as io:
646 646 e.print_traceback()
647 647
648 648 self.assertTrue("more exceptions" in lines[-1])
649 649 count = e.tb_limit
650 650
651 651 self.assertEqual(io.stdout.count('ZeroDivisionError'), 2 * count, io.stdout)
652 652 self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
653 653 self.assertEqual(io.stdout.count(':execute'), count, io.stdout)
654 654
655 655 @dec.skipif_not_matplotlib
656 656 def test_magic_pylab(self):
657 657 """%pylab works on engines"""
658 658 view = self.client[-1]
659 659 ar = view.execute("%pylab inline")
660 660 # at least check if this raised:
661 661 reply = ar.get(5)
662 662 # include imports, in case user config
663 663 ar = view.execute("plot(rand(100))", silent=False)
664 664 reply = ar.get(5)
665 665 self.assertEqual(len(reply.outputs), 1)
666 666 output = reply.outputs[0]
667 667 self.assertTrue("data" in output)
668 668 data = output['data']
669 669 self.assertTrue("image/png" in data)
670 670
671 671 def test_func_default_func(self):
672 672 """interactively defined function as apply func default"""
673 673 def foo():
674 674 return 'foo'
675 675
676 676 def bar(f=foo):
677 677 return f()
678 678
679 679 view = self.client[-1]
680 680 ar = view.apply_async(bar)
681 681 r = ar.get(10)
682 682 self.assertEqual(r, 'foo')
683 683 def test_data_pub_single(self):
684 684 view = self.client[-1]
685 685 ar = view.execute('\n'.join([
686 686 'from IPython.kernel.zmq.datapub import publish_data',
687 687 'for i in range(5):',
688 688 ' publish_data(dict(i=i))'
689 689 ]), block=False)
690 690 self.assertTrue(isinstance(ar.data, dict))
691 691 ar.get(5)
692 692 self.assertEqual(ar.data, dict(i=4))
693 693
694 694 def test_data_pub(self):
695 695 view = self.client[:]
696 696 ar = view.execute('\n'.join([
697 697 'from IPython.kernel.zmq.datapub import publish_data',
698 698 'for i in range(5):',
699 699 ' publish_data(dict(i=i))'
700 700 ]), block=False)
701 701 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
702 702 ar.get(5)
703 703 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
704 704
705 705 def test_can_list_arg(self):
706 706 """args in lists are canned"""
707 707 view = self.client[-1]
708 708 view['a'] = 128
709 709 rA = pmod.Reference('a')
710 710 ar = view.apply_async(lambda x: x, [rA])
711 711 r = ar.get(5)
712 712 self.assertEqual(r, [128])
713 713
714 714 def test_can_dict_arg(self):
715 715 """args in dicts are canned"""
716 716 view = self.client[-1]
717 717 view['a'] = 128
718 718 rA = pmod.Reference('a')
719 719 ar = view.apply_async(lambda x: x, dict(foo=rA))
720 720 r = ar.get(5)
721 721 self.assertEqual(r, dict(foo=128))
722 722
723 723 def test_can_list_kwarg(self):
724 724 """kwargs in lists are canned"""
725 725 view = self.client[-1]
726 726 view['a'] = 128
727 727 rA = pmod.Reference('a')
728 728 ar = view.apply_async(lambda x=5: x, x=[rA])
729 729 r = ar.get(5)
730 730 self.assertEqual(r, [128])
731 731
732 732 def test_can_dict_kwarg(self):
733 733 """kwargs in dicts are canned"""
734 734 view = self.client[-1]
735 735 view['a'] = 128
736 736 rA = pmod.Reference('a')
737 737 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
738 738 r = ar.get(5)
739 739 self.assertEqual(r, dict(foo=128))
740 740
741 741 def test_map_ref(self):
742 742 """view.map works with references"""
743 743 view = self.client[:]
744 744 ranks = sorted(self.client.ids)
745 745 view.scatter('rank', ranks, flatten=True)
746 746 rrank = pmod.Reference('rank')
747 747
748 748 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
749 749 drank = amr.get(5)
750 750 self.assertEqual(drank, [ r*2 for r in ranks ])
751 751
752 752 def test_nested_getitem_setitem(self):
753 753 """get and set with view['a.b']"""
754 754 view = self.client[-1]
755 755 view.execute('\n'.join([
756 756 'class A(object): pass',
757 757 'a = A()',
758 758 'a.b = 128',
759 759 ]), block=True)
760 760 ra = pmod.Reference('a')
761 761
762 762 r = view.apply_sync(lambda x: x.b, ra)
763 763 self.assertEqual(r, 128)
764 764 self.assertEqual(view['a.b'], 128)
765 765
766 766 view['a.b'] = 0
767 767
768 768 r = view.apply_sync(lambda x: x.b, ra)
769 769 self.assertEqual(r, 0)
770 770 self.assertEqual(view['a.b'], 0)
771 771
772 772 def test_return_namedtuple(self):
773 773 def namedtuplify(x, y):
774 774 from IPython.parallel.tests.test_view import point
775 775 return point(x, y)
776 776
777 777 view = self.client[-1]
778 778 p = view.apply_sync(namedtuplify, 1, 2)
779 779 self.assertEqual(p.x, 1)
780 780 self.assertEqual(p.y, 2)
781 781
782 782 def test_apply_namedtuple(self):
783 783 def echoxy(p):
784 784 return p.y, p.x
785 785
786 786 view = self.client[-1]
787 787 tup = view.apply_sync(echoxy, point(1, 2))
788 788 self.assertEqual(tup, (2,1))
789 789
General Comments 0
You need to be logged in to leave comments. Login now