##// END OF EJS Templates
test namedtuple as return and apply arg
MinRK -
Show More
@@ -1,737 +1,759 b''
1 1 # -*- coding: utf-8 -*-
2 2 """test View objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import platform
21 21 import time
22 from collections import namedtuple
22 23 from tempfile import mktemp
23 24 from StringIO import StringIO
24 25
25 26 import zmq
26 27 from nose import SkipTest
27 28 from nose.plugins.attrib import attr
28 29
29 30 from IPython.testing import decorators as dec
30 31 from IPython.testing.ipunittest import ParametricTestCase
31 32 from IPython.utils.io import capture_output
32 33
33 34 from IPython import parallel as pmod
34 35 from IPython.parallel import error
35 36 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
36 37 from IPython.parallel import DirectView
37 38 from IPython.parallel.util import interactive
38 39
39 40 from IPython.parallel.tests import add_engines
40 41
41 42 from .clienttest import ClusterTestCase, crash, wait, skip_without
42 43
43 44 def setup():
44 45 add_engines(3, total=True)
45 46
47 point = namedtuple("point", "x y")
48
46 49 class TestView(ClusterTestCase, ParametricTestCase):
47 50
48 51 def setUp(self):
49 52 # On Win XP, wait for resource cleanup, else parallel test group fails
50 53 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
51 54 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
52 55 time.sleep(2)
53 56 super(TestView, self).setUp()
54 57
55 58 @attr('crash')
56 59 def test_z_crash_mux(self):
57 60 """test graceful handling of engine death (direct)"""
58 61 # self.add_engines(1)
59 62 eid = self.client.ids[-1]
60 63 ar = self.client[eid].apply_async(crash)
61 64 self.assertRaisesRemote(error.EngineError, ar.get, 10)
62 65 eid = ar.engine_id
63 66 tic = time.time()
64 67 while eid in self.client.ids and time.time()-tic < 5:
65 68 time.sleep(.01)
66 69 self.client.spin()
67 70 self.assertFalse(eid in self.client.ids, "Engine should have died")
68 71
69 72 def test_push_pull(self):
70 73 """test pushing and pulling"""
71 74 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
72 75 t = self.client.ids[-1]
73 76 v = self.client[t]
74 77 push = v.push
75 78 pull = v.pull
76 79 v.block=True
77 80 nengines = len(self.client)
78 81 push({'data':data})
79 82 d = pull('data')
80 83 self.assertEqual(d, data)
81 84 self.client[:].push({'data':data})
82 85 d = self.client[:].pull('data', block=True)
83 86 self.assertEqual(d, nengines*[data])
84 87 ar = push({'data':data}, block=False)
85 88 self.assertTrue(isinstance(ar, AsyncResult))
86 89 r = ar.get()
87 90 ar = self.client[:].pull('data', block=False)
88 91 self.assertTrue(isinstance(ar, AsyncResult))
89 92 r = ar.get()
90 93 self.assertEqual(r, nengines*[data])
91 94 self.client[:].push(dict(a=10,b=20))
92 95 r = self.client[:].pull(('a','b'), block=True)
93 96 self.assertEqual(r, nengines*[[10,20]])
94 97
95 98 def test_push_pull_function(self):
96 99 "test pushing and pulling functions"
97 100 def testf(x):
98 101 return 2.0*x
99 102
100 103 t = self.client.ids[-1]
101 104 v = self.client[t]
102 105 v.block=True
103 106 push = v.push
104 107 pull = v.pull
105 108 execute = v.execute
106 109 push({'testf':testf})
107 110 r = pull('testf')
108 111 self.assertEqual(r(1.0), testf(1.0))
109 112 execute('r = testf(10)')
110 113 r = pull('r')
111 114 self.assertEqual(r, testf(10))
112 115 ar = self.client[:].push({'testf':testf}, block=False)
113 116 ar.get()
114 117 ar = self.client[:].pull('testf', block=False)
115 118 rlist = ar.get()
116 119 for r in rlist:
117 120 self.assertEqual(r(1.0), testf(1.0))
118 121 execute("def g(x): return x*x")
119 122 r = pull(('testf','g'))
120 123 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
121 124
122 125 def test_push_function_globals(self):
123 126 """test that pushed functions have access to globals"""
124 127 @interactive
125 128 def geta():
126 129 return a
127 130 # self.add_engines(1)
128 131 v = self.client[-1]
129 132 v.block=True
130 133 v['f'] = geta
131 134 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
132 135 v.execute('a=5')
133 136 v.execute('b=f()')
134 137 self.assertEqual(v['b'], 5)
135 138
136 139 def test_push_function_defaults(self):
137 140 """test that pushed functions preserve default args"""
138 141 def echo(a=10):
139 142 return a
140 143 v = self.client[-1]
141 144 v.block=True
142 145 v['f'] = echo
143 146 v.execute('b=f()')
144 147 self.assertEqual(v['b'], 10)
145 148
146 149 def test_get_result(self):
147 150 """test getting results from the Hub."""
148 151 c = pmod.Client(profile='iptest')
149 152 # self.add_engines(1)
150 153 t = c.ids[-1]
151 154 v = c[t]
152 155 v2 = self.client[t]
153 156 ar = v.apply_async(wait, 1)
154 157 # give the monitor time to notice the message
155 158 time.sleep(.25)
156 159 ahr = v2.get_result(ar.msg_ids)
157 160 self.assertTrue(isinstance(ahr, AsyncHubResult))
158 161 self.assertEqual(ahr.get(), ar.get())
159 162 ar2 = v2.get_result(ar.msg_ids)
160 163 self.assertFalse(isinstance(ar2, AsyncHubResult))
161 164 c.spin()
162 165 c.close()
163 166
164 167 def test_run_newline(self):
165 168 """test that run appends newline to files"""
166 169 tmpfile = mktemp()
167 170 with open(tmpfile, 'w') as f:
168 171 f.write("""def g():
169 172 return 5
170 173 """)
171 174 v = self.client[-1]
172 175 v.run(tmpfile, block=True)
173 176 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
174 177
175 178 def test_apply_tracked(self):
176 179 """test tracking for apply"""
177 180 # self.add_engines(1)
178 181 t = self.client.ids[-1]
179 182 v = self.client[t]
180 183 v.block=False
181 184 def echo(n=1024*1024, **kwargs):
182 185 with v.temp_flags(**kwargs):
183 186 return v.apply(lambda x: x, 'x'*n)
184 187 ar = echo(1, track=False)
185 188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
186 189 self.assertTrue(ar.sent)
187 190 ar = echo(track=True)
188 191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 192 self.assertEqual(ar.sent, ar._tracker.done)
190 193 ar._tracker.wait()
191 194 self.assertTrue(ar.sent)
192 195
193 196 def test_push_tracked(self):
194 197 t = self.client.ids[-1]
195 198 ns = dict(x='x'*1024*1024)
196 199 v = self.client[t]
197 200 ar = v.push(ns, block=False, track=False)
198 201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
199 202 self.assertTrue(ar.sent)
200 203
201 204 ar = v.push(ns, block=False, track=True)
202 205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 206 ar._tracker.wait()
204 207 self.assertEqual(ar.sent, ar._tracker.done)
205 208 self.assertTrue(ar.sent)
206 209 ar.get()
207 210
208 211 def test_scatter_tracked(self):
209 212 t = self.client.ids
210 213 x='x'*1024*1024
211 214 ar = self.client[t].scatter('x', x, block=False, track=False)
212 215 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
213 216 self.assertTrue(ar.sent)
214 217
215 218 ar = self.client[t].scatter('x', x, block=False, track=True)
216 219 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
217 220 self.assertEqual(ar.sent, ar._tracker.done)
218 221 ar._tracker.wait()
219 222 self.assertTrue(ar.sent)
220 223 ar.get()
221 224
222 225 def test_remote_reference(self):
223 226 v = self.client[-1]
224 227 v['a'] = 123
225 228 ra = pmod.Reference('a')
226 229 b = v.apply_sync(lambda x: x, ra)
227 230 self.assertEqual(b, 123)
228 231
229 232
230 233 def test_scatter_gather(self):
231 234 view = self.client[:]
232 235 seq1 = range(16)
233 236 view.scatter('a', seq1)
234 237 seq2 = view.gather('a', block=True)
235 238 self.assertEqual(seq2, seq1)
236 239 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
237 240
238 241 @skip_without('numpy')
239 242 def test_scatter_gather_numpy(self):
240 243 import numpy
241 244 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
242 245 view = self.client[:]
243 246 a = numpy.arange(64)
244 247 view.scatter('a', a, block=True)
245 248 b = view.gather('a', block=True)
246 249 assert_array_equal(b, a)
247 250
248 251 def test_scatter_gather_lazy(self):
249 252 """scatter/gather with targets='all'"""
250 253 view = self.client.direct_view(targets='all')
251 254 x = range(64)
252 255 view.scatter('x', x)
253 256 gathered = view.gather('x', block=True)
254 257 self.assertEqual(gathered, x)
255 258
256 259
257 260 @dec.known_failure_py3
258 261 @skip_without('numpy')
259 262 def test_push_numpy_nocopy(self):
260 263 import numpy
261 264 view = self.client[:]
262 265 a = numpy.arange(64)
263 266 view['A'] = a
264 267 @interactive
265 268 def check_writeable(x):
266 269 return x.flags.writeable
267 270
268 271 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
269 272 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
270 273
271 274 view.push(dict(B=a))
272 275 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
273 276 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
274 277
275 278 @skip_without('numpy')
276 279 def test_apply_numpy(self):
277 280 """view.apply(f, ndarray)"""
278 281 import numpy
279 282 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
280 283
281 284 A = numpy.random.random((100,100))
282 285 view = self.client[-1]
283 286 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
284 287 B = A.astype(dt)
285 288 C = view.apply_sync(lambda x:x, B)
286 289 assert_array_equal(B,C)
287 290
288 291 @skip_without('numpy')
289 292 def test_push_pull_recarray(self):
290 293 """push/pull recarrays"""
291 294 import numpy
292 295 from numpy.testing.utils import assert_array_equal
293 296
294 297 view = self.client[-1]
295 298
296 299 R = numpy.array([
297 300 (1, 'hi', 0.),
298 301 (2**30, 'there', 2.5),
299 302 (-99999, 'world', -12345.6789),
300 303 ], [('n', int), ('s', '|S10'), ('f', float)])
301 304
302 305 view['RR'] = R
303 306 R2 = view['RR']
304 307
305 308 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
306 309 self.assertEqual(r_dtype, R.dtype)
307 310 self.assertEqual(r_shape, R.shape)
308 311 self.assertEqual(R2.dtype, R.dtype)
309 312 self.assertEqual(R2.shape, R.shape)
310 313 assert_array_equal(R2, R)
311 314
312 315 @skip_without('pandas')
313 316 def test_push_pull_timeseries(self):
314 317 """push/pull pandas.TimeSeries"""
315 318 import pandas
316 319
317 320 ts = pandas.TimeSeries(range(10))
318 321
319 322 view = self.client[-1]
320 323
321 324 view.push(dict(ts=ts), block=True)
322 325 rts = view['ts']
323 326
324 327 self.assertEqual(type(rts), type(ts))
325 328 self.assertTrue((ts == rts).all())
326 329
327 330 def test_map(self):
328 331 view = self.client[:]
329 332 def f(x):
330 333 return x**2
331 334 data = range(16)
332 335 r = view.map_sync(f, data)
333 336 self.assertEqual(r, map(f, data))
334 337
335 338 def test_map_iterable(self):
336 339 """test map on iterables (direct)"""
337 340 view = self.client[:]
338 341 # 101 is prime, so it won't be evenly distributed
339 342 arr = range(101)
340 343 # ensure it will be an iterator, even in Python 3
341 344 it = iter(arr)
342 345 r = view.map_sync(lambda x:x, arr)
343 346 self.assertEqual(r, list(arr))
344 347
345 348 def test_scatter_gather_nonblocking(self):
346 349 data = range(16)
347 350 view = self.client[:]
348 351 view.scatter('a', data, block=False)
349 352 ar = view.gather('a', block=False)
350 353 self.assertEqual(ar.get(), data)
351 354
352 355 @skip_without('numpy')
353 356 def test_scatter_gather_numpy_nonblocking(self):
354 357 import numpy
355 358 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
356 359 a = numpy.arange(64)
357 360 view = self.client[:]
358 361 ar = view.scatter('a', a, block=False)
359 362 self.assertTrue(isinstance(ar, AsyncResult))
360 363 amr = view.gather('a', block=False)
361 364 self.assertTrue(isinstance(amr, AsyncMapResult))
362 365 assert_array_equal(amr.get(), a)
363 366
364 367 def test_execute(self):
365 368 view = self.client[:]
366 369 # self.client.debug=True
367 370 execute = view.execute
368 371 ar = execute('c=30', block=False)
369 372 self.assertTrue(isinstance(ar, AsyncResult))
370 373 ar = execute('d=[0,1,2]', block=False)
371 374 self.client.wait(ar, 1)
372 375 self.assertEqual(len(ar.get()), len(self.client))
373 376 for c in view['c']:
374 377 self.assertEqual(c, 30)
375 378
376 379 def test_abort(self):
377 380 view = self.client[-1]
378 381 ar = view.execute('import time; time.sleep(1)', block=False)
379 382 ar2 = view.apply_async(lambda : 2)
380 383 ar3 = view.apply_async(lambda : 3)
381 384 view.abort(ar2)
382 385 view.abort(ar3.msg_ids)
383 386 self.assertRaises(error.TaskAborted, ar2.get)
384 387 self.assertRaises(error.TaskAborted, ar3.get)
385 388
386 389 def test_abort_all(self):
387 390 """view.abort() aborts all outstanding tasks"""
388 391 view = self.client[-1]
389 392 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
390 393 view.abort()
391 394 view.wait(timeout=5)
392 395 for ar in ars[5:]:
393 396 self.assertRaises(error.TaskAborted, ar.get)
394 397
395 398 def test_temp_flags(self):
396 399 view = self.client[-1]
397 400 view.block=True
398 401 with view.temp_flags(block=False):
399 402 self.assertFalse(view.block)
400 403 self.assertTrue(view.block)
401 404
402 405 @dec.known_failure_py3
403 406 def test_importer(self):
404 407 view = self.client[-1]
405 408 view.clear(block=True)
406 409 with view.importer:
407 410 import re
408 411
409 412 @interactive
410 413 def findall(pat, s):
411 414 # this globals() step isn't necessary in real code
412 415 # only to prevent a closure in the test
413 416 re = globals()['re']
414 417 return re.findall(pat, s)
415 418
416 419 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
417 420
418 421 def test_unicode_execute(self):
419 422 """test executing unicode strings"""
420 423 v = self.client[-1]
421 424 v.block=True
422 425 if sys.version_info[0] >= 3:
423 426 code="a='é'"
424 427 else:
425 428 code=u"a=u'é'"
426 429 v.execute(code)
427 430 self.assertEqual(v['a'], u'é')
428 431
429 432 def test_unicode_apply_result(self):
430 433 """test unicode apply results"""
431 434 v = self.client[-1]
432 435 r = v.apply_sync(lambda : u'é')
433 436 self.assertEqual(r, u'é')
434 437
435 438 def test_unicode_apply_arg(self):
436 439 """test passing unicode arguments to apply"""
437 440 v = self.client[-1]
438 441
439 442 @interactive
440 443 def check_unicode(a, check):
441 444 assert isinstance(a, unicode), "%r is not unicode"%a
442 445 assert isinstance(check, bytes), "%r is not bytes"%check
443 446 assert a.encode('utf8') == check, "%s != %s"%(a,check)
444 447
445 448 for s in [ u'é', u'ßø®∫',u'asdf' ]:
446 449 try:
447 450 v.apply_sync(check_unicode, s, s.encode('utf8'))
448 451 except error.RemoteError as e:
449 452 if e.ename == 'AssertionError':
450 453 self.fail(e.evalue)
451 454 else:
452 455 raise e
453 456
454 457 def test_map_reference(self):
455 458 """view.map(<Reference>, *seqs) should work"""
456 459 v = self.client[:]
457 460 v.scatter('n', self.client.ids, flatten=True)
458 461 v.execute("f = lambda x,y: x*y")
459 462 rf = pmod.Reference('f')
460 463 nlist = list(range(10))
461 464 mlist = nlist[::-1]
462 465 expected = [ m*n for m,n in zip(mlist, nlist) ]
463 466 result = v.map_sync(rf, mlist, nlist)
464 467 self.assertEqual(result, expected)
465 468
466 469 def test_apply_reference(self):
467 470 """view.apply(<Reference>, *args) should work"""
468 471 v = self.client[:]
469 472 v.scatter('n', self.client.ids, flatten=True)
470 473 v.execute("f = lambda x: n*x")
471 474 rf = pmod.Reference('f')
472 475 result = v.apply_sync(rf, 5)
473 476 expected = [ 5*id for id in self.client.ids ]
474 477 self.assertEqual(result, expected)
475 478
476 479 def test_eval_reference(self):
477 480 v = self.client[self.client.ids[0]]
478 481 v['g'] = range(5)
479 482 rg = pmod.Reference('g[0]')
480 483 echo = lambda x:x
481 484 self.assertEqual(v.apply_sync(echo, rg), 0)
482 485
483 486 def test_reference_nameerror(self):
484 487 v = self.client[self.client.ids[0]]
485 488 r = pmod.Reference('elvis_has_left')
486 489 echo = lambda x:x
487 490 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
488 491
489 492 def test_single_engine_map(self):
490 493 e0 = self.client[self.client.ids[0]]
491 494 r = range(5)
492 495 check = [ -1*i for i in r ]
493 496 result = e0.map_sync(lambda x: -1*x, r)
494 497 self.assertEqual(result, check)
495 498
496 499 def test_len(self):
497 500 """len(view) makes sense"""
498 501 e0 = self.client[self.client.ids[0]]
499 502 yield self.assertEqual(len(e0), 1)
500 503 v = self.client[:]
501 504 yield self.assertEqual(len(v), len(self.client.ids))
502 505 v = self.client.direct_view('all')
503 506 yield self.assertEqual(len(v), len(self.client.ids))
504 507 v = self.client[:2]
505 508 yield self.assertEqual(len(v), 2)
506 509 v = self.client[:1]
507 510 yield self.assertEqual(len(v), 1)
508 511 v = self.client.load_balanced_view()
509 512 yield self.assertEqual(len(v), len(self.client.ids))
510 513 # parametric tests seem to require manual closing?
511 514 self.client.close()
512 515
513 516
514 517 # begin execute tests
515 518
516 519 def test_execute_reply(self):
517 520 e0 = self.client[self.client.ids[0]]
518 521 e0.block = True
519 522 ar = e0.execute("5", silent=False)
520 523 er = ar.get()
521 524 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
522 525 self.assertEqual(er.pyout['data']['text/plain'], '5')
523 526
524 527 def test_execute_reply_stdout(self):
525 528 e0 = self.client[self.client.ids[0]]
526 529 e0.block = True
527 530 ar = e0.execute("print (5)", silent=False)
528 531 er = ar.get()
529 532 self.assertEqual(er.stdout.strip(), '5')
530 533
531 534 def test_execute_pyout(self):
532 535 """execute triggers pyout with silent=False"""
533 536 view = self.client[:]
534 537 ar = view.execute("5", silent=False, block=True)
535 538
536 539 expected = [{'text/plain' : '5'}] * len(view)
537 540 mimes = [ out['data'] for out in ar.pyout ]
538 541 self.assertEqual(mimes, expected)
539 542
540 543 def test_execute_silent(self):
541 544 """execute does not trigger pyout with silent=True"""
542 545 view = self.client[:]
543 546 ar = view.execute("5", block=True)
544 547 expected = [None] * len(view)
545 548 self.assertEqual(ar.pyout, expected)
546 549
547 550 def test_execute_magic(self):
548 551 """execute accepts IPython commands"""
549 552 view = self.client[:]
550 553 view.execute("a = 5")
551 554 ar = view.execute("%whos", block=True)
552 555 # this will raise, if that failed
553 556 ar.get(5)
554 557 for stdout in ar.stdout:
555 558 lines = stdout.splitlines()
556 559 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
557 560 found = False
558 561 for line in lines[2:]:
559 562 split = line.split()
560 563 if split == ['a', 'int', '5']:
561 564 found = True
562 565 break
563 566 self.assertTrue(found, "whos output wrong: %s" % stdout)
564 567
565 568 def test_execute_displaypub(self):
566 569 """execute tracks display_pub output"""
567 570 view = self.client[:]
568 571 view.execute("from IPython.core.display import *")
569 572 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
570 573
571 574 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
572 575 for outputs in ar.outputs:
573 576 mimes = [ out['data'] for out in outputs ]
574 577 self.assertEqual(mimes, expected)
575 578
576 579 def test_apply_displaypub(self):
577 580 """apply tracks display_pub output"""
578 581 view = self.client[:]
579 582 view.execute("from IPython.core.display import *")
580 583
581 584 @interactive
582 585 def publish():
583 586 [ display(i) for i in range(5) ]
584 587
585 588 ar = view.apply_async(publish)
586 589 ar.get(5)
587 590 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
588 591 for outputs in ar.outputs:
589 592 mimes = [ out['data'] for out in outputs ]
590 593 self.assertEqual(mimes, expected)
591 594
592 595 def test_execute_raises(self):
593 596 """exceptions in execute requests raise appropriately"""
594 597 view = self.client[-1]
595 598 ar = view.execute("1/0")
596 599 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
597 600
598 601 def test_remoteerror_render_exception(self):
599 602 """RemoteErrors get nice tracebacks"""
600 603 view = self.client[-1]
601 604 ar = view.execute("1/0")
602 605 ip = get_ipython()
603 606 ip.user_ns['ar'] = ar
604 607 with capture_output() as io:
605 608 ip.run_cell("ar.get(2)")
606 609
607 610 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
608 611
609 612 def test_compositeerror_render_exception(self):
610 613 """CompositeErrors get nice tracebacks"""
611 614 view = self.client[:]
612 615 ar = view.execute("1/0")
613 616 ip = get_ipython()
614 617 ip.user_ns['ar'] = ar
615 618 with capture_output() as io:
616 619 ip.run_cell("ar.get(2)")
617 620
618 621 self.assertEqual(io.stdout.count('ZeroDivisionError'), len(view) * 2, io.stdout)
619 622 self.assertEqual(io.stdout.count('by zero'), len(view), io.stdout)
620 623 self.assertEqual(io.stdout.count(':execute'), len(view), io.stdout)
621 624
622 625 @dec.skipif_not_matplotlib
623 626 def test_magic_pylab(self):
624 627 """%pylab works on engines"""
625 628 view = self.client[-1]
626 629 ar = view.execute("%pylab inline")
627 630 # at least check if this raised:
628 631 reply = ar.get(5)
629 632 # include imports, in case user config
630 633 ar = view.execute("plot(rand(100))", silent=False)
631 634 reply = ar.get(5)
632 635 self.assertEqual(len(reply.outputs), 1)
633 636 output = reply.outputs[0]
634 637 self.assertTrue("data" in output)
635 638 data = output['data']
636 639 self.assertTrue("image/png" in data)
637 640
638 641 def test_func_default_func(self):
639 642 """interactively defined function as apply func default"""
640 643 def foo():
641 644 return 'foo'
642 645
643 646 def bar(f=foo):
644 647 return f()
645 648
646 649 view = self.client[-1]
647 650 ar = view.apply_async(bar)
648 651 r = ar.get(10)
649 652 self.assertEqual(r, 'foo')
650 653 def test_data_pub_single(self):
651 654 view = self.client[-1]
652 655 ar = view.execute('\n'.join([
653 656 'from IPython.kernel.zmq.datapub import publish_data',
654 657 'for i in range(5):',
655 658 ' publish_data(dict(i=i))'
656 659 ]), block=False)
657 660 self.assertTrue(isinstance(ar.data, dict))
658 661 ar.get(5)
659 662 self.assertEqual(ar.data, dict(i=4))
660 663
661 664 def test_data_pub(self):
662 665 view = self.client[:]
663 666 ar = view.execute('\n'.join([
664 667 'from IPython.kernel.zmq.datapub import publish_data',
665 668 'for i in range(5):',
666 669 ' publish_data(dict(i=i))'
667 670 ]), block=False)
668 671 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
669 672 ar.get(5)
670 673 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
671 674
672 675 def test_can_list_arg(self):
673 676 """args in lists are canned"""
674 677 view = self.client[-1]
675 678 view['a'] = 128
676 679 rA = pmod.Reference('a')
677 680 ar = view.apply_async(lambda x: x, [rA])
678 681 r = ar.get(5)
679 682 self.assertEqual(r, [128])
680 683
681 684 def test_can_dict_arg(self):
682 685 """args in dicts are canned"""
683 686 view = self.client[-1]
684 687 view['a'] = 128
685 688 rA = pmod.Reference('a')
686 689 ar = view.apply_async(lambda x: x, dict(foo=rA))
687 690 r = ar.get(5)
688 691 self.assertEqual(r, dict(foo=128))
689 692
690 693 def test_can_list_kwarg(self):
691 694 """kwargs in lists are canned"""
692 695 view = self.client[-1]
693 696 view['a'] = 128
694 697 rA = pmod.Reference('a')
695 698 ar = view.apply_async(lambda x=5: x, x=[rA])
696 699 r = ar.get(5)
697 700 self.assertEqual(r, [128])
698 701
699 702 def test_can_dict_kwarg(self):
700 703 """kwargs in dicts are canned"""
701 704 view = self.client[-1]
702 705 view['a'] = 128
703 706 rA = pmod.Reference('a')
704 707 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
705 708 r = ar.get(5)
706 709 self.assertEqual(r, dict(foo=128))
707 710
708 711 def test_map_ref(self):
709 712 """view.map works with references"""
710 713 view = self.client[:]
711 714 ranks = sorted(self.client.ids)
712 715 view.scatter('rank', ranks, flatten=True)
713 716 rrank = pmod.Reference('rank')
714 717
715 718 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
716 719 drank = amr.get(5)
717 720 self.assertEqual(drank, [ r*2 for r in ranks ])
718 721
719 722 def test_nested_getitem_setitem(self):
720 723 """get and set with view['a.b']"""
721 724 view = self.client[-1]
722 725 view.execute('\n'.join([
723 726 'class A(object): pass',
724 727 'a = A()',
725 728 'a.b = 128',
726 729 ]), block=True)
727 730 ra = pmod.Reference('a')
728 731
729 732 r = view.apply_sync(lambda x: x.b, ra)
730 733 self.assertEqual(r, 128)
731 734 self.assertEqual(view['a.b'], 128)
732 735
733 736 view['a.b'] = 0
734 737
735 738 r = view.apply_sync(lambda x: x.b, ra)
736 739 self.assertEqual(r, 0)
737 740 self.assertEqual(view['a.b'], 0)
741
742 def test_return_namedtuple(self):
743 def namedtuplify(x, y):
744 from IPython.parallel.tests.test_view import point
745 return point(x, y)
746
747 view = self.client[-1]
748 p = view.apply_sync(namedtuplify, 1, 2)
749 self.assertEqual(p.x, 1)
750 self.assertEqual(p.y, 2)
751
752 def test_apply_namedtuple(self):
753 def echoxy(p):
754 return p.y, p.x
755
756 view = self.client[-1]
757 tup = view.apply_sync(echoxy, point(1, 2))
758 self.assertEqual(tup, (2,1))
759
General Comments 0
You need to be logged in to leave comments. Login now