##// END OF EJS Templates
fix count when testing composite error output
MinRK -
Show More
@@ -1,786 +1,789 b''
1 1 # -*- coding: utf-8 -*-
2 2 """test View objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import platform
21 21 import time
22 22 from collections import namedtuple
23 23 from tempfile import mktemp
24 24 from StringIO import StringIO
25 25
26 26 import zmq
27 27 from nose import SkipTest
28 28 from nose.plugins.attrib import attr
29 29
30 30 from IPython.testing import decorators as dec
31 31 from IPython.testing.ipunittest import ParametricTestCase
32 32 from IPython.utils.io import capture_output
33 33
34 34 from IPython import parallel as pmod
35 35 from IPython.parallel import error
36 36 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
37 37 from IPython.parallel import DirectView
38 38 from IPython.parallel.util import interactive
39 39
40 40 from IPython.parallel.tests import add_engines
41 41
42 42 from .clienttest import ClusterTestCase, crash, wait, skip_without
43 43
44 44 def setup():
45 45 add_engines(3, total=True)
46 46
47 47 point = namedtuple("point", "x y")
48 48
49 49 class TestView(ClusterTestCase, ParametricTestCase):
50 50
51 51 def setUp(self):
52 52 # On Win XP, wait for resource cleanup, else parallel test group fails
53 53 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
54 54 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
55 55 time.sleep(2)
56 56 super(TestView, self).setUp()
57 57
58 58 @attr('crash')
59 59 def test_z_crash_mux(self):
60 60 """test graceful handling of engine death (direct)"""
61 61 # self.add_engines(1)
62 62 eid = self.client.ids[-1]
63 63 ar = self.client[eid].apply_async(crash)
64 64 self.assertRaisesRemote(error.EngineError, ar.get, 10)
65 65 eid = ar.engine_id
66 66 tic = time.time()
67 67 while eid in self.client.ids and time.time()-tic < 5:
68 68 time.sleep(.01)
69 69 self.client.spin()
70 70 self.assertFalse(eid in self.client.ids, "Engine should have died")
71 71
72 72 def test_push_pull(self):
73 73 """test pushing and pulling"""
74 74 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
75 75 t = self.client.ids[-1]
76 76 v = self.client[t]
77 77 push = v.push
78 78 pull = v.pull
79 79 v.block=True
80 80 nengines = len(self.client)
81 81 push({'data':data})
82 82 d = pull('data')
83 83 self.assertEqual(d, data)
84 84 self.client[:].push({'data':data})
85 85 d = self.client[:].pull('data', block=True)
86 86 self.assertEqual(d, nengines*[data])
87 87 ar = push({'data':data}, block=False)
88 88 self.assertTrue(isinstance(ar, AsyncResult))
89 89 r = ar.get()
90 90 ar = self.client[:].pull('data', block=False)
91 91 self.assertTrue(isinstance(ar, AsyncResult))
92 92 r = ar.get()
93 93 self.assertEqual(r, nengines*[data])
94 94 self.client[:].push(dict(a=10,b=20))
95 95 r = self.client[:].pull(('a','b'), block=True)
96 96 self.assertEqual(r, nengines*[[10,20]])
97 97
98 98 def test_push_pull_function(self):
99 99 "test pushing and pulling functions"
100 100 def testf(x):
101 101 return 2.0*x
102 102
103 103 t = self.client.ids[-1]
104 104 v = self.client[t]
105 105 v.block=True
106 106 push = v.push
107 107 pull = v.pull
108 108 execute = v.execute
109 109 push({'testf':testf})
110 110 r = pull('testf')
111 111 self.assertEqual(r(1.0), testf(1.0))
112 112 execute('r = testf(10)')
113 113 r = pull('r')
114 114 self.assertEqual(r, testf(10))
115 115 ar = self.client[:].push({'testf':testf}, block=False)
116 116 ar.get()
117 117 ar = self.client[:].pull('testf', block=False)
118 118 rlist = ar.get()
119 119 for r in rlist:
120 120 self.assertEqual(r(1.0), testf(1.0))
121 121 execute("def g(x): return x*x")
122 122 r = pull(('testf','g'))
123 123 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
124 124
125 125 def test_push_function_globals(self):
126 126 """test that pushed functions have access to globals"""
127 127 @interactive
128 128 def geta():
129 129 return a
130 130 # self.add_engines(1)
131 131 v = self.client[-1]
132 132 v.block=True
133 133 v['f'] = geta
134 134 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
135 135 v.execute('a=5')
136 136 v.execute('b=f()')
137 137 self.assertEqual(v['b'], 5)
138 138
139 139 def test_push_function_defaults(self):
140 140 """test that pushed functions preserve default args"""
141 141 def echo(a=10):
142 142 return a
143 143 v = self.client[-1]
144 144 v.block=True
145 145 v['f'] = echo
146 146 v.execute('b=f()')
147 147 self.assertEqual(v['b'], 10)
148 148
149 149 def test_get_result(self):
150 150 """test getting results from the Hub."""
151 151 c = pmod.Client(profile='iptest')
152 152 # self.add_engines(1)
153 153 t = c.ids[-1]
154 154 v = c[t]
155 155 v2 = self.client[t]
156 156 ar = v.apply_async(wait, 1)
157 157 # give the monitor time to notice the message
158 158 time.sleep(.25)
159 159 ahr = v2.get_result(ar.msg_ids)
160 160 self.assertTrue(isinstance(ahr, AsyncHubResult))
161 161 self.assertEqual(ahr.get(), ar.get())
162 162 ar2 = v2.get_result(ar.msg_ids)
163 163 self.assertFalse(isinstance(ar2, AsyncHubResult))
164 164 c.spin()
165 165 c.close()
166 166
167 167 def test_run_newline(self):
168 168 """test that run appends newline to files"""
169 169 tmpfile = mktemp()
170 170 with open(tmpfile, 'w') as f:
171 171 f.write("""def g():
172 172 return 5
173 173 """)
174 174 v = self.client[-1]
175 175 v.run(tmpfile, block=True)
176 176 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
177 177
178 178 def test_apply_tracked(self):
179 179 """test tracking for apply"""
180 180 # self.add_engines(1)
181 181 t = self.client.ids[-1]
182 182 v = self.client[t]
183 183 v.block=False
184 184 def echo(n=1024*1024, **kwargs):
185 185 with v.temp_flags(**kwargs):
186 186 return v.apply(lambda x: x, 'x'*n)
187 187 ar = echo(1, track=False)
188 188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 189 self.assertTrue(ar.sent)
190 190 ar = echo(track=True)
191 191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
192 192 self.assertEqual(ar.sent, ar._tracker.done)
193 193 ar._tracker.wait()
194 194 self.assertTrue(ar.sent)
195 195
196 196 def test_push_tracked(self):
197 197 t = self.client.ids[-1]
198 198 ns = dict(x='x'*1024*1024)
199 199 v = self.client[t]
200 200 ar = v.push(ns, block=False, track=False)
201 201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 202 self.assertTrue(ar.sent)
203 203
204 204 ar = v.push(ns, block=False, track=True)
205 205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
206 206 ar._tracker.wait()
207 207 self.assertEqual(ar.sent, ar._tracker.done)
208 208 self.assertTrue(ar.sent)
209 209 ar.get()
210 210
211 211 def test_scatter_tracked(self):
212 212 t = self.client.ids
213 213 x='x'*1024*1024
214 214 ar = self.client[t].scatter('x', x, block=False, track=False)
215 215 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
216 216 self.assertTrue(ar.sent)
217 217
218 218 ar = self.client[t].scatter('x', x, block=False, track=True)
219 219 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
220 220 self.assertEqual(ar.sent, ar._tracker.done)
221 221 ar._tracker.wait()
222 222 self.assertTrue(ar.sent)
223 223 ar.get()
224 224
225 225 def test_remote_reference(self):
226 226 v = self.client[-1]
227 227 v['a'] = 123
228 228 ra = pmod.Reference('a')
229 229 b = v.apply_sync(lambda x: x, ra)
230 230 self.assertEqual(b, 123)
231 231
232 232
233 233 def test_scatter_gather(self):
234 234 view = self.client[:]
235 235 seq1 = range(16)
236 236 view.scatter('a', seq1)
237 237 seq2 = view.gather('a', block=True)
238 238 self.assertEqual(seq2, seq1)
239 239 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
240 240
241 241 @skip_without('numpy')
242 242 def test_scatter_gather_numpy(self):
243 243 import numpy
244 244 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
245 245 view = self.client[:]
246 246 a = numpy.arange(64)
247 247 view.scatter('a', a, block=True)
248 248 b = view.gather('a', block=True)
249 249 assert_array_equal(b, a)
250 250
251 251 def test_scatter_gather_lazy(self):
252 252 """scatter/gather with targets='all'"""
253 253 view = self.client.direct_view(targets='all')
254 254 x = range(64)
255 255 view.scatter('x', x)
256 256 gathered = view.gather('x', block=True)
257 257 self.assertEqual(gathered, x)
258 258
259 259
260 260 @dec.known_failure_py3
261 261 @skip_without('numpy')
262 262 def test_push_numpy_nocopy(self):
263 263 import numpy
264 264 view = self.client[:]
265 265 a = numpy.arange(64)
266 266 view['A'] = a
267 267 @interactive
268 268 def check_writeable(x):
269 269 return x.flags.writeable
270 270
271 271 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
272 272 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
273 273
274 274 view.push(dict(B=a))
275 275 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
276 276 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
277 277
278 278 @skip_without('numpy')
279 279 def test_apply_numpy(self):
280 280 """view.apply(f, ndarray)"""
281 281 import numpy
282 282 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
283 283
284 284 A = numpy.random.random((100,100))
285 285 view = self.client[-1]
286 286 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
287 287 B = A.astype(dt)
288 288 C = view.apply_sync(lambda x:x, B)
289 289 assert_array_equal(B,C)
290 290
291 291 @skip_without('numpy')
292 292 def test_push_pull_recarray(self):
293 293 """push/pull recarrays"""
294 294 import numpy
295 295 from numpy.testing.utils import assert_array_equal
296 296
297 297 view = self.client[-1]
298 298
299 299 R = numpy.array([
300 300 (1, 'hi', 0.),
301 301 (2**30, 'there', 2.5),
302 302 (-99999, 'world', -12345.6789),
303 303 ], [('n', int), ('s', '|S10'), ('f', float)])
304 304
305 305 view['RR'] = R
306 306 R2 = view['RR']
307 307
308 308 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
309 309 self.assertEqual(r_dtype, R.dtype)
310 310 self.assertEqual(r_shape, R.shape)
311 311 self.assertEqual(R2.dtype, R.dtype)
312 312 self.assertEqual(R2.shape, R.shape)
313 313 assert_array_equal(R2, R)
314 314
315 315 @skip_without('pandas')
316 316 def test_push_pull_timeseries(self):
317 317 """push/pull pandas.TimeSeries"""
318 318 import pandas
319 319
320 320 ts = pandas.TimeSeries(range(10))
321 321
322 322 view = self.client[-1]
323 323
324 324 view.push(dict(ts=ts), block=True)
325 325 rts = view['ts']
326 326
327 327 self.assertEqual(type(rts), type(ts))
328 328 self.assertTrue((ts == rts).all())
329 329
330 330 def test_map(self):
331 331 view = self.client[:]
332 332 def f(x):
333 333 return x**2
334 334 data = range(16)
335 335 r = view.map_sync(f, data)
336 336 self.assertEqual(r, map(f, data))
337 337
338 338 def test_map_iterable(self):
339 339 """test map on iterables (direct)"""
340 340 view = self.client[:]
341 341 # 101 is prime, so it won't be evenly distributed
342 342 arr = range(101)
343 343 # ensure it will be an iterator, even in Python 3
344 344 it = iter(arr)
345 345 r = view.map_sync(lambda x:x, arr)
346 346 self.assertEqual(r, list(arr))
347 347
348 348 def test_scatter_gather_nonblocking(self):
349 349 data = range(16)
350 350 view = self.client[:]
351 351 view.scatter('a', data, block=False)
352 352 ar = view.gather('a', block=False)
353 353 self.assertEqual(ar.get(), data)
354 354
355 355 @skip_without('numpy')
356 356 def test_scatter_gather_numpy_nonblocking(self):
357 357 import numpy
358 358 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
359 359 a = numpy.arange(64)
360 360 view = self.client[:]
361 361 ar = view.scatter('a', a, block=False)
362 362 self.assertTrue(isinstance(ar, AsyncResult))
363 363 amr = view.gather('a', block=False)
364 364 self.assertTrue(isinstance(amr, AsyncMapResult))
365 365 assert_array_equal(amr.get(), a)
366 366
367 367 def test_execute(self):
368 368 view = self.client[:]
369 369 # self.client.debug=True
370 370 execute = view.execute
371 371 ar = execute('c=30', block=False)
372 372 self.assertTrue(isinstance(ar, AsyncResult))
373 373 ar = execute('d=[0,1,2]', block=False)
374 374 self.client.wait(ar, 1)
375 375 self.assertEqual(len(ar.get()), len(self.client))
376 376 for c in view['c']:
377 377 self.assertEqual(c, 30)
378 378
379 379 def test_abort(self):
380 380 view = self.client[-1]
381 381 ar = view.execute('import time; time.sleep(1)', block=False)
382 382 ar2 = view.apply_async(lambda : 2)
383 383 ar3 = view.apply_async(lambda : 3)
384 384 view.abort(ar2)
385 385 view.abort(ar3.msg_ids)
386 386 self.assertRaises(error.TaskAborted, ar2.get)
387 387 self.assertRaises(error.TaskAborted, ar3.get)
388 388
389 389 def test_abort_all(self):
390 390 """view.abort() aborts all outstanding tasks"""
391 391 view = self.client[-1]
392 392 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
393 393 view.abort()
394 394 view.wait(timeout=5)
395 395 for ar in ars[5:]:
396 396 self.assertRaises(error.TaskAborted, ar.get)
397 397
398 398 def test_temp_flags(self):
399 399 view = self.client[-1]
400 400 view.block=True
401 401 with view.temp_flags(block=False):
402 402 self.assertFalse(view.block)
403 403 self.assertTrue(view.block)
404 404
405 405 @dec.known_failure_py3
406 406 def test_importer(self):
407 407 view = self.client[-1]
408 408 view.clear(block=True)
409 409 with view.importer:
410 410 import re
411 411
412 412 @interactive
413 413 def findall(pat, s):
414 414 # this globals() step isn't necessary in real code
415 415 # only to prevent a closure in the test
416 416 re = globals()['re']
417 417 return re.findall(pat, s)
418 418
419 419 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
420 420
421 421 def test_unicode_execute(self):
422 422 """test executing unicode strings"""
423 423 v = self.client[-1]
424 424 v.block=True
425 425 if sys.version_info[0] >= 3:
426 426 code="a='é'"
427 427 else:
428 428 code=u"a=u'é'"
429 429 v.execute(code)
430 430 self.assertEqual(v['a'], u'é')
431 431
432 432 def test_unicode_apply_result(self):
433 433 """test unicode apply results"""
434 434 v = self.client[-1]
435 435 r = v.apply_sync(lambda : u'é')
436 436 self.assertEqual(r, u'é')
437 437
438 438 def test_unicode_apply_arg(self):
439 439 """test passing unicode arguments to apply"""
440 440 v = self.client[-1]
441 441
442 442 @interactive
443 443 def check_unicode(a, check):
444 444 assert isinstance(a, unicode), "%r is not unicode"%a
445 445 assert isinstance(check, bytes), "%r is not bytes"%check
446 446 assert a.encode('utf8') == check, "%s != %s"%(a,check)
447 447
448 448 for s in [ u'é', u'ßø®∫',u'asdf' ]:
449 449 try:
450 450 v.apply_sync(check_unicode, s, s.encode('utf8'))
451 451 except error.RemoteError as e:
452 452 if e.ename == 'AssertionError':
453 453 self.fail(e.evalue)
454 454 else:
455 455 raise e
456 456
457 457 def test_map_reference(self):
458 458 """view.map(<Reference>, *seqs) should work"""
459 459 v = self.client[:]
460 460 v.scatter('n', self.client.ids, flatten=True)
461 461 v.execute("f = lambda x,y: x*y")
462 462 rf = pmod.Reference('f')
463 463 nlist = list(range(10))
464 464 mlist = nlist[::-1]
465 465 expected = [ m*n for m,n in zip(mlist, nlist) ]
466 466 result = v.map_sync(rf, mlist, nlist)
467 467 self.assertEqual(result, expected)
468 468
469 469 def test_apply_reference(self):
470 470 """view.apply(<Reference>, *args) should work"""
471 471 v = self.client[:]
472 472 v.scatter('n', self.client.ids, flatten=True)
473 473 v.execute("f = lambda x: n*x")
474 474 rf = pmod.Reference('f')
475 475 result = v.apply_sync(rf, 5)
476 476 expected = [ 5*id for id in self.client.ids ]
477 477 self.assertEqual(result, expected)
478 478
479 479 def test_eval_reference(self):
480 480 v = self.client[self.client.ids[0]]
481 481 v['g'] = range(5)
482 482 rg = pmod.Reference('g[0]')
483 483 echo = lambda x:x
484 484 self.assertEqual(v.apply_sync(echo, rg), 0)
485 485
486 486 def test_reference_nameerror(self):
487 487 v = self.client[self.client.ids[0]]
488 488 r = pmod.Reference('elvis_has_left')
489 489 echo = lambda x:x
490 490 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
491 491
492 492 def test_single_engine_map(self):
493 493 e0 = self.client[self.client.ids[0]]
494 494 r = range(5)
495 495 check = [ -1*i for i in r ]
496 496 result = e0.map_sync(lambda x: -1*x, r)
497 497 self.assertEqual(result, check)
498 498
499 499 def test_len(self):
500 500 """len(view) makes sense"""
501 501 e0 = self.client[self.client.ids[0]]
502 502 yield self.assertEqual(len(e0), 1)
503 503 v = self.client[:]
504 504 yield self.assertEqual(len(v), len(self.client.ids))
505 505 v = self.client.direct_view('all')
506 506 yield self.assertEqual(len(v), len(self.client.ids))
507 507 v = self.client[:2]
508 508 yield self.assertEqual(len(v), 2)
509 509 v = self.client[:1]
510 510 yield self.assertEqual(len(v), 1)
511 511 v = self.client.load_balanced_view()
512 512 yield self.assertEqual(len(v), len(self.client.ids))
513 513 # parametric tests seem to require manual closing?
514 514 self.client.close()
515 515
516 516
517 517 # begin execute tests
518 518
519 519 def test_execute_reply(self):
520 520 e0 = self.client[self.client.ids[0]]
521 521 e0.block = True
522 522 ar = e0.execute("5", silent=False)
523 523 er = ar.get()
524 524 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
525 525 self.assertEqual(er.pyout['data']['text/plain'], '5')
526 526
527 527 def test_execute_reply_stdout(self):
528 528 e0 = self.client[self.client.ids[0]]
529 529 e0.block = True
530 530 ar = e0.execute("print (5)", silent=False)
531 531 er = ar.get()
532 532 self.assertEqual(er.stdout.strip(), '5')
533 533
534 534 def test_execute_pyout(self):
535 535 """execute triggers pyout with silent=False"""
536 536 view = self.client[:]
537 537 ar = view.execute("5", silent=False, block=True)
538 538
539 539 expected = [{'text/plain' : '5'}] * len(view)
540 540 mimes = [ out['data'] for out in ar.pyout ]
541 541 self.assertEqual(mimes, expected)
542 542
543 543 def test_execute_silent(self):
544 544 """execute does not trigger pyout with silent=True"""
545 545 view = self.client[:]
546 546 ar = view.execute("5", block=True)
547 547 expected = [None] * len(view)
548 548 self.assertEqual(ar.pyout, expected)
549 549
550 550 def test_execute_magic(self):
551 551 """execute accepts IPython commands"""
552 552 view = self.client[:]
553 553 view.execute("a = 5")
554 554 ar = view.execute("%whos", block=True)
555 555 # this will raise, if that failed
556 556 ar.get(5)
557 557 for stdout in ar.stdout:
558 558 lines = stdout.splitlines()
559 559 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
560 560 found = False
561 561 for line in lines[2:]:
562 562 split = line.split()
563 563 if split == ['a', 'int', '5']:
564 564 found = True
565 565 break
566 566 self.assertTrue(found, "whos output wrong: %s" % stdout)
567 567
568 568 def test_execute_displaypub(self):
569 569 """execute tracks display_pub output"""
570 570 view = self.client[:]
571 571 view.execute("from IPython.core.display import *")
572 572 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
573 573
574 574 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
575 575 for outputs in ar.outputs:
576 576 mimes = [ out['data'] for out in outputs ]
577 577 self.assertEqual(mimes, expected)
578 578
579 579 def test_apply_displaypub(self):
580 580 """apply tracks display_pub output"""
581 581 view = self.client[:]
582 582 view.execute("from IPython.core.display import *")
583 583
584 584 @interactive
585 585 def publish():
586 586 [ display(i) for i in range(5) ]
587 587
588 588 ar = view.apply_async(publish)
589 589 ar.get(5)
590 590 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
591 591 for outputs in ar.outputs:
592 592 mimes = [ out['data'] for out in outputs ]
593 593 self.assertEqual(mimes, expected)
594 594
595 595 def test_execute_raises(self):
596 596 """exceptions in execute requests raise appropriately"""
597 597 view = self.client[-1]
598 598 ar = view.execute("1/0")
599 599 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
600 600
601 601 def test_remoteerror_render_exception(self):
602 602 """RemoteErrors get nice tracebacks"""
603 603 view = self.client[-1]
604 604 ar = view.execute("1/0")
605 605 ip = get_ipython()
606 606 ip.user_ns['ar'] = ar
607 607 with capture_output() as io:
608 608 ip.run_cell("ar.get(2)")
609 609
610 610 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
611 611
612 612 def test_compositeerror_render_exception(self):
613 613 """CompositeErrors get nice tracebacks"""
614 614 view = self.client[:]
615 615 ar = view.execute("1/0")
616 616 ip = get_ipython()
617 617 ip.user_ns['ar'] = ar
618
618 619 with capture_output() as io:
619 620 ip.run_cell("ar.get(2)")
620 621
621 self.assertEqual(io.stdout.count('ZeroDivisionError'), len(view) * 2, io.stdout)
622 self.assertEqual(io.stdout.count('by zero'), len(view), io.stdout)
623 self.assertEqual(io.stdout.count(':execute'), len(view), io.stdout)
622 count = min(error.CompositeError.tb_limit, len(view))
623
624 self.assertEqual(io.stdout.count('ZeroDivisionError'), count * 2, io.stdout)
625 self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
626 self.assertEqual(io.stdout.count(':execute'), count, io.stdout)
624 627
625 628 def test_compositeerror_truncate(self):
626 629 """Truncate CompositeErrors with many exceptions"""
627 630 view = self.client[:]
628 631 msg_ids = []
629 632 for i in range(10):
630 633 ar = view.execute("1/0")
631 634 msg_ids.extend(ar.msg_ids)
632 635
633 636 ar = self.client.get_result(msg_ids)
634 637 try:
635 638 ar.get()
636 639 except error.CompositeError as e:
637 640 pass
638 641 else:
639 642 self.fail("Should have raised CompositeError")
640 643
641 644 lines = e.render_traceback()
642 645 with capture_output() as io:
643 646 e.print_traceback()
644 647
645 648 self.assertTrue("more exceptions" in lines[-1])
646 649 count = e.tb_limit
647 650
648 651 self.assertEqual(io.stdout.count('ZeroDivisionError'), 2 * count, io.stdout)
649 652 self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
650 653 self.assertEqual(io.stdout.count(':execute'), count, io.stdout)
651 654
652 655 @dec.skipif_not_matplotlib
653 656 def test_magic_pylab(self):
654 657 """%pylab works on engines"""
655 658 view = self.client[-1]
656 659 ar = view.execute("%pylab inline")
657 660 # at least check if this raised:
658 661 reply = ar.get(5)
659 662 # include imports, in case user config
660 663 ar = view.execute("plot(rand(100))", silent=False)
661 664 reply = ar.get(5)
662 665 self.assertEqual(len(reply.outputs), 1)
663 666 output = reply.outputs[0]
664 667 self.assertTrue("data" in output)
665 668 data = output['data']
666 669 self.assertTrue("image/png" in data)
667 670
668 671 def test_func_default_func(self):
669 672 """interactively defined function as apply func default"""
670 673 def foo():
671 674 return 'foo'
672 675
673 676 def bar(f=foo):
674 677 return f()
675 678
676 679 view = self.client[-1]
677 680 ar = view.apply_async(bar)
678 681 r = ar.get(10)
679 682 self.assertEqual(r, 'foo')
680 683 def test_data_pub_single(self):
681 684 view = self.client[-1]
682 685 ar = view.execute('\n'.join([
683 686 'from IPython.kernel.zmq.datapub import publish_data',
684 687 'for i in range(5):',
685 688 ' publish_data(dict(i=i))'
686 689 ]), block=False)
687 690 self.assertTrue(isinstance(ar.data, dict))
688 691 ar.get(5)
689 692 self.assertEqual(ar.data, dict(i=4))
690 693
691 694 def test_data_pub(self):
692 695 view = self.client[:]
693 696 ar = view.execute('\n'.join([
694 697 'from IPython.kernel.zmq.datapub import publish_data',
695 698 'for i in range(5):',
696 699 ' publish_data(dict(i=i))'
697 700 ]), block=False)
698 701 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
699 702 ar.get(5)
700 703 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
701 704
702 705 def test_can_list_arg(self):
703 706 """args in lists are canned"""
704 707 view = self.client[-1]
705 708 view['a'] = 128
706 709 rA = pmod.Reference('a')
707 710 ar = view.apply_async(lambda x: x, [rA])
708 711 r = ar.get(5)
709 712 self.assertEqual(r, [128])
710 713
711 714 def test_can_dict_arg(self):
712 715 """args in dicts are canned"""
713 716 view = self.client[-1]
714 717 view['a'] = 128
715 718 rA = pmod.Reference('a')
716 719 ar = view.apply_async(lambda x: x, dict(foo=rA))
717 720 r = ar.get(5)
718 721 self.assertEqual(r, dict(foo=128))
719 722
720 723 def test_can_list_kwarg(self):
721 724 """kwargs in lists are canned"""
722 725 view = self.client[-1]
723 726 view['a'] = 128
724 727 rA = pmod.Reference('a')
725 728 ar = view.apply_async(lambda x=5: x, x=[rA])
726 729 r = ar.get(5)
727 730 self.assertEqual(r, [128])
728 731
729 732 def test_can_dict_kwarg(self):
730 733 """kwargs in dicts are canned"""
731 734 view = self.client[-1]
732 735 view['a'] = 128
733 736 rA = pmod.Reference('a')
734 737 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
735 738 r = ar.get(5)
736 739 self.assertEqual(r, dict(foo=128))
737 740
738 741 def test_map_ref(self):
739 742 """view.map works with references"""
740 743 view = self.client[:]
741 744 ranks = sorted(self.client.ids)
742 745 view.scatter('rank', ranks, flatten=True)
743 746 rrank = pmod.Reference('rank')
744 747
745 748 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
746 749 drank = amr.get(5)
747 750 self.assertEqual(drank, [ r*2 for r in ranks ])
748 751
749 752 def test_nested_getitem_setitem(self):
750 753 """get and set with view['a.b']"""
751 754 view = self.client[-1]
752 755 view.execute('\n'.join([
753 756 'class A(object): pass',
754 757 'a = A()',
755 758 'a.b = 128',
756 759 ]), block=True)
757 760 ra = pmod.Reference('a')
758 761
759 762 r = view.apply_sync(lambda x: x.b, ra)
760 763 self.assertEqual(r, 128)
761 764 self.assertEqual(view['a.b'], 128)
762 765
763 766 view['a.b'] = 0
764 767
765 768 r = view.apply_sync(lambda x: x.b, ra)
766 769 self.assertEqual(r, 0)
767 770 self.assertEqual(view['a.b'], 0)
768 771
769 772 def test_return_namedtuple(self):
770 773 def namedtuplify(x, y):
771 774 from IPython.parallel.tests.test_view import point
772 775 return point(x, y)
773 776
774 777 view = self.client[-1]
775 778 p = view.apply_sync(namedtuplify, 1, 2)
776 779 self.assertEqual(p.x, 1)
777 780 self.assertEqual(p.y, 2)
778 781
779 782 def test_apply_namedtuple(self):
780 783 def echoxy(p):
781 784 return p.y, p.x
782 785
783 786 view = self.client[-1]
784 787 tup = view.apply_sync(echoxy, point(1, 2))
785 788 self.assertEqual(tup, (2,1))
786 789
General Comments 0
You need to be logged in to leave comments. Login now