##// END OF EJS Templates
test new execute/output behaviors
MinRK -
Show More
@@ -1,571 +1,657 b''
1 1 # -*- coding: utf-8 -*-
2 2 """test View objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import time
21 21 from tempfile import mktemp
22 22 from StringIO import StringIO
23 23
24 24 import zmq
25 25 from nose import SkipTest
26 26
27 27 from IPython.testing import decorators as dec
28 28 from IPython.testing.ipunittest import ParametricTestCase
29 29
30 30 from IPython import parallel as pmod
31 31 from IPython.parallel import error
32 32 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
33 33 from IPython.parallel import DirectView
34 34 from IPython.parallel.util import interactive
35 35
36 36 from IPython.parallel.tests import add_engines
37 37
38 38 from .clienttest import ClusterTestCase, crash, wait, skip_without
39 39
40 40 def setup():
41 41 add_engines(3, total=True)
42 42
43 43 class TestView(ClusterTestCase, ParametricTestCase):
44 44
45 45 def test_z_crash_mux(self):
46 46 """test graceful handling of engine death (direct)"""
47 47 raise SkipTest("crash tests disabled, due to undesirable crash reports")
48 48 # self.add_engines(1)
49 49 eid = self.client.ids[-1]
50 50 ar = self.client[eid].apply_async(crash)
51 51 self.assertRaisesRemote(error.EngineError, ar.get, 10)
52 52 eid = ar.engine_id
53 53 tic = time.time()
54 54 while eid in self.client.ids and time.time()-tic < 5:
55 55 time.sleep(.01)
56 56 self.client.spin()
57 57 self.assertFalse(eid in self.client.ids, "Engine should have died")
58 58
59 59 def test_push_pull(self):
60 60 """test pushing and pulling"""
61 61 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
62 62 t = self.client.ids[-1]
63 63 v = self.client[t]
64 64 push = v.push
65 65 pull = v.pull
66 66 v.block=True
67 67 nengines = len(self.client)
68 68 push({'data':data})
69 69 d = pull('data')
70 70 self.assertEquals(d, data)
71 71 self.client[:].push({'data':data})
72 72 d = self.client[:].pull('data', block=True)
73 73 self.assertEquals(d, nengines*[data])
74 74 ar = push({'data':data}, block=False)
75 75 self.assertTrue(isinstance(ar, AsyncResult))
76 76 r = ar.get()
77 77 ar = self.client[:].pull('data', block=False)
78 78 self.assertTrue(isinstance(ar, AsyncResult))
79 79 r = ar.get()
80 80 self.assertEquals(r, nengines*[data])
81 81 self.client[:].push(dict(a=10,b=20))
82 82 r = self.client[:].pull(('a','b'), block=True)
83 83 self.assertEquals(r, nengines*[[10,20]])
84 84
85 85 def test_push_pull_function(self):
86 86 "test pushing and pulling functions"
87 87 def testf(x):
88 88 return 2.0*x
89 89
90 90 t = self.client.ids[-1]
91 91 v = self.client[t]
92 92 v.block=True
93 93 push = v.push
94 94 pull = v.pull
95 95 execute = v.execute
96 96 push({'testf':testf})
97 97 r = pull('testf')
98 98 self.assertEqual(r(1.0), testf(1.0))
99 99 execute('r = testf(10)')
100 100 r = pull('r')
101 101 self.assertEquals(r, testf(10))
102 102 ar = self.client[:].push({'testf':testf}, block=False)
103 103 ar.get()
104 104 ar = self.client[:].pull('testf', block=False)
105 105 rlist = ar.get()
106 106 for r in rlist:
107 107 self.assertEqual(r(1.0), testf(1.0))
108 108 execute("def g(x): return x*x")
109 109 r = pull(('testf','g'))
110 110 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
111 111
112 112 def test_push_function_globals(self):
113 113 """test that pushed functions have access to globals"""
114 114 @interactive
115 115 def geta():
116 116 return a
117 117 # self.add_engines(1)
118 118 v = self.client[-1]
119 119 v.block=True
120 120 v['f'] = geta
121 121 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
122 122 v.execute('a=5')
123 123 v.execute('b=f()')
124 124 self.assertEquals(v['b'], 5)
125 125
126 126 def test_push_function_defaults(self):
127 127 """test that pushed functions preserve default args"""
128 128 def echo(a=10):
129 129 return a
130 130 v = self.client[-1]
131 131 v.block=True
132 132 v['f'] = echo
133 133 v.execute('b=f()')
134 134 self.assertEquals(v['b'], 10)
135 135
136 136 def test_get_result(self):
137 137 """test getting results from the Hub."""
138 138 c = pmod.Client(profile='iptest')
139 139 # self.add_engines(1)
140 140 t = c.ids[-1]
141 141 v = c[t]
142 142 v2 = self.client[t]
143 143 ar = v.apply_async(wait, 1)
144 144 # give the monitor time to notice the message
145 145 time.sleep(.25)
146 146 ahr = v2.get_result(ar.msg_ids)
147 147 self.assertTrue(isinstance(ahr, AsyncHubResult))
148 148 self.assertEquals(ahr.get(), ar.get())
149 149 ar2 = v2.get_result(ar.msg_ids)
150 150 self.assertFalse(isinstance(ar2, AsyncHubResult))
151 151 c.spin()
152 152 c.close()
153 153
154 154 def test_run_newline(self):
155 155 """test that run appends newline to files"""
156 156 tmpfile = mktemp()
157 157 with open(tmpfile, 'w') as f:
158 158 f.write("""def g():
159 159 return 5
160 160 """)
161 161 v = self.client[-1]
162 162 v.run(tmpfile, block=True)
163 163 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
164 164
165 165 def test_apply_tracked(self):
166 166 """test tracking for apply"""
167 167 # self.add_engines(1)
168 168 t = self.client.ids[-1]
169 169 v = self.client[t]
170 170 v.block=False
171 171 def echo(n=1024*1024, **kwargs):
172 172 with v.temp_flags(**kwargs):
173 173 return v.apply(lambda x: x, 'x'*n)
174 174 ar = echo(1, track=False)
175 175 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
176 176 self.assertTrue(ar.sent)
177 177 ar = echo(track=True)
178 178 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
179 179 self.assertEquals(ar.sent, ar._tracker.done)
180 180 ar._tracker.wait()
181 181 self.assertTrue(ar.sent)
182 182
183 183 def test_push_tracked(self):
184 184 t = self.client.ids[-1]
185 185 ns = dict(x='x'*1024*1024)
186 186 v = self.client[t]
187 187 ar = v.push(ns, block=False, track=False)
188 188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 189 self.assertTrue(ar.sent)
190 190
191 191 ar = v.push(ns, block=False, track=True)
192 192 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
193 193 ar._tracker.wait()
194 194 self.assertEquals(ar.sent, ar._tracker.done)
195 195 self.assertTrue(ar.sent)
196 196 ar.get()
197 197
198 198 def test_scatter_tracked(self):
199 199 t = self.client.ids
200 200 x='x'*1024*1024
201 201 ar = self.client[t].scatter('x', x, block=False, track=False)
202 202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 203 self.assertTrue(ar.sent)
204 204
205 205 ar = self.client[t].scatter('x', x, block=False, track=True)
206 206 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
207 207 self.assertEquals(ar.sent, ar._tracker.done)
208 208 ar._tracker.wait()
209 209 self.assertTrue(ar.sent)
210 210 ar.get()
211 211
212 212 def test_remote_reference(self):
213 213 v = self.client[-1]
214 214 v['a'] = 123
215 215 ra = pmod.Reference('a')
216 216 b = v.apply_sync(lambda x: x, ra)
217 217 self.assertEquals(b, 123)
218 218
219 219
220 220 def test_scatter_gather(self):
221 221 view = self.client[:]
222 222 seq1 = range(16)
223 223 view.scatter('a', seq1)
224 224 seq2 = view.gather('a', block=True)
225 225 self.assertEquals(seq2, seq1)
226 226 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
227 227
228 228 @skip_without('numpy')
229 229 def test_scatter_gather_numpy(self):
230 230 import numpy
231 231 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
232 232 view = self.client[:]
233 233 a = numpy.arange(64)
234 234 view.scatter('a', a)
235 235 b = view.gather('a', block=True)
236 236 assert_array_equal(b, a)
237 237
238 238 def test_scatter_gather_lazy(self):
239 239 """scatter/gather with targets='all'"""
240 240 view = self.client.direct_view(targets='all')
241 241 x = range(64)
242 242 view.scatter('x', x)
243 243 gathered = view.gather('x', block=True)
244 244 self.assertEquals(gathered, x)
245 245
246 246
247 247 @dec.known_failure_py3
248 248 @skip_without('numpy')
249 249 def test_push_numpy_nocopy(self):
250 250 import numpy
251 251 view = self.client[:]
252 252 a = numpy.arange(64)
253 253 view['A'] = a
254 254 @interactive
255 255 def check_writeable(x):
256 256 return x.flags.writeable
257 257
258 258 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
259 259 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
260 260
261 261 view.push(dict(B=a))
262 262 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
263 263 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
264 264
265 265 @skip_without('numpy')
266 266 def test_apply_numpy(self):
267 267 """view.apply(f, ndarray)"""
268 268 import numpy
269 269 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
270 270
271 271 A = numpy.random.random((100,100))
272 272 view = self.client[-1]
273 273 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
274 274 B = A.astype(dt)
275 275 C = view.apply_sync(lambda x:x, B)
276 276 assert_array_equal(B,C)
277 277
278 278 def test_map(self):
279 279 view = self.client[:]
280 280 def f(x):
281 281 return x**2
282 282 data = range(16)
283 283 r = view.map_sync(f, data)
284 284 self.assertEquals(r, map(f, data))
285 285
286 286 def test_map_iterable(self):
287 287 """test map on iterables (direct)"""
288 288 view = self.client[:]
289 289 # 101 is prime, so it won't be evenly distributed
290 290 arr = range(101)
291 291 # ensure it will be an iterator, even in Python 3
292 292 it = iter(arr)
293 293 r = view.map_sync(lambda x:x, arr)
294 294 self.assertEquals(r, list(arr))
295 295
296 296 def test_scatterGatherNonblocking(self):
297 297 data = range(16)
298 298 view = self.client[:]
299 299 view.scatter('a', data, block=False)
300 300 ar = view.gather('a', block=False)
301 301 self.assertEquals(ar.get(), data)
302 302
303 303 @skip_without('numpy')
304 304 def test_scatter_gather_numpy_nonblocking(self):
305 305 import numpy
306 306 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
307 307 a = numpy.arange(64)
308 308 view = self.client[:]
309 309 ar = view.scatter('a', a, block=False)
310 310 self.assertTrue(isinstance(ar, AsyncResult))
311 311 amr = view.gather('a', block=False)
312 312 self.assertTrue(isinstance(amr, AsyncMapResult))
313 313 assert_array_equal(amr.get(), a)
314 314
315 315 def test_execute(self):
316 316 view = self.client[:]
317 317 # self.client.debug=True
318 318 execute = view.execute
319 319 ar = execute('c=30', block=False)
320 320 self.assertTrue(isinstance(ar, AsyncResult))
321 321 ar = execute('d=[0,1,2]', block=False)
322 322 self.client.wait(ar, 1)
323 323 self.assertEquals(len(ar.get()), len(self.client))
324 324 for c in view['c']:
325 325 self.assertEquals(c, 30)
326 326
327 327 def test_abort(self):
328 328 view = self.client[-1]
329 329 ar = view.execute('import time; time.sleep(1)', block=False)
330 330 ar2 = view.apply_async(lambda : 2)
331 331 ar3 = view.apply_async(lambda : 3)
332 332 view.abort(ar2)
333 333 view.abort(ar3.msg_ids)
334 334 self.assertRaises(error.TaskAborted, ar2.get)
335 335 self.assertRaises(error.TaskAborted, ar3.get)
336 336
337 337 def test_abort_all(self):
338 338 """view.abort() aborts all outstanding tasks"""
339 339 view = self.client[-1]
340 340 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
341 341 view.abort()
342 342 view.wait(timeout=5)
343 343 for ar in ars[5:]:
344 344 self.assertRaises(error.TaskAborted, ar.get)
345 345
346 346 def test_temp_flags(self):
347 347 view = self.client[-1]
348 348 view.block=True
349 349 with view.temp_flags(block=False):
350 350 self.assertFalse(view.block)
351 351 self.assertTrue(view.block)
352 352
353 353 @dec.known_failure_py3
354 354 def test_importer(self):
355 355 view = self.client[-1]
356 356 view.clear(block=True)
357 357 with view.importer:
358 358 import re
359 359
360 360 @interactive
361 361 def findall(pat, s):
362 362 # this globals() step isn't necessary in real code
363 363 # only to prevent a closure in the test
364 364 re = globals()['re']
365 365 return re.findall(pat, s)
366 366
367 367 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
368 368
369 369 # parallel magic tests
370 370
371 371 def test_magic_px_blocking(self):
372 372 ip = get_ipython()
373 373 v = self.client[-1]
374 374 v.activate()
375 375 v.block=True
376 376
377 377 ip.magic_px('a=5')
378 378 self.assertEquals(v['a'], 5)
379 379 ip.magic_px('a=10')
380 380 self.assertEquals(v['a'], 10)
381 381 sio = StringIO()
382 382 savestdout = sys.stdout
383 383 sys.stdout = sio
384 384 # just 'print a' worst ~99% of the time, but this ensures that
385 385 # the stdout message has arrived when the result is finished:
386 386 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
387 387 sys.stdout = savestdout
388 388 buf = sio.getvalue()
389 389 self.assertTrue('[stdout:' in buf, buf)
390 390 self.assertTrue(buf.rstrip().endswith('10'))
391 391 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
392 392
393 393 def test_magic_px_nonblocking(self):
394 394 ip = get_ipython()
395 395 v = self.client[-1]
396 396 v.activate()
397 397 v.block=False
398 398
399 399 ip.magic_px('a=5')
400 400 self.assertEquals(v['a'], 5)
401 401 ip.magic_px('a=10')
402 402 self.assertEquals(v['a'], 10)
403 403 sio = StringIO()
404 404 savestdout = sys.stdout
405 405 sys.stdout = sio
406 406 ip.magic_px('print a')
407 407 sys.stdout = savestdout
408 408 buf = sio.getvalue()
409 409 self.assertFalse('[stdout:%i]'%v.targets in buf)
410 410 ip.magic_px('1/0')
411 411 ar = v.get_result(-1)
412 412 self.assertRaisesRemote(ZeroDivisionError, ar.get)
413 413
414 414 def test_magic_autopx_blocking(self):
415 415 ip = get_ipython()
416 416 v = self.client[-1]
417 417 v.activate()
418 418 v.block=True
419 419
420 420 sio = StringIO()
421 421 savestdout = sys.stdout
422 422 sys.stdout = sio
423 423 ip.magic_autopx()
424 424 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
425 425 ip.run_cell('b*=2')
426 426 ip.run_cell('print (b)')
427 427 ip.run_cell("b/c")
428 428 ip.magic_autopx()
429 429 sys.stdout = savestdout
430 430 output = sio.getvalue().strip()
431 431 self.assertTrue(output.startswith('%autopx enabled'))
432 432 self.assertTrue(output.endswith('%autopx disabled'))
433 433 self.assertTrue('RemoteError: ZeroDivisionError' in output)
434 434 ar = v.get_result(-1)
435 435 self.assertEquals(v['a'], 5)
436 436 self.assertEquals(v['b'], 20)
437 437 self.assertRaisesRemote(ZeroDivisionError, ar.get)
438 438
439 439 def test_magic_autopx_nonblocking(self):
440 440 ip = get_ipython()
441 441 v = self.client[-1]
442 442 v.activate()
443 443 v.block=False
444 444
445 445 sio = StringIO()
446 446 savestdout = sys.stdout
447 447 sys.stdout = sio
448 448 ip.magic_autopx()
449 449 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
450 450 ip.run_cell('print (b)')
451 451 ip.run_cell("b/c")
452 452 ip.run_cell('b*=2')
453 453 ip.magic_autopx()
454 454 sys.stdout = savestdout
455 455 output = sio.getvalue().strip()
456 456 self.assertTrue(output.startswith('%autopx enabled'))
457 457 self.assertTrue(output.endswith('%autopx disabled'))
458 458 self.assertFalse('ZeroDivisionError' in output)
459 459 ar = v.get_result(-2)
460 460 self.assertEquals(v['a'], 5)
461 461 self.assertEquals(v['b'], 20)
462 462 self.assertRaisesRemote(ZeroDivisionError, ar.get)
463 463
464 464 def test_magic_result(self):
465 465 ip = get_ipython()
466 466 v = self.client[-1]
467 467 v.activate()
468 468 v['a'] = 111
469 469 ra = v['a']
470 470
471 471 ar = ip.magic_result()
472 472 self.assertEquals(ar.msg_ids, [v.history[-1]])
473 473 self.assertEquals(ar.get(), 111)
474 474 ar = ip.magic_result('-2')
475 475 self.assertEquals(ar.msg_ids, [v.history[-2]])
476 476
477 477 def test_unicode_execute(self):
478 478 """test executing unicode strings"""
479 479 v = self.client[-1]
480 480 v.block=True
481 481 if sys.version_info[0] >= 3:
482 482 code="a='é'"
483 483 else:
484 484 code=u"a=u'é'"
485 485 v.execute(code)
486 486 self.assertEquals(v['a'], u'é')
487 487
488 488 def test_unicode_apply_result(self):
489 489 """test unicode apply results"""
490 490 v = self.client[-1]
491 491 r = v.apply_sync(lambda : u'é')
492 492 self.assertEquals(r, u'é')
493 493
494 494 def test_unicode_apply_arg(self):
495 495 """test passing unicode arguments to apply"""
496 496 v = self.client[-1]
497 497
498 498 @interactive
499 499 def check_unicode(a, check):
500 500 assert isinstance(a, unicode), "%r is not unicode"%a
501 501 assert isinstance(check, bytes), "%r is not bytes"%check
502 502 assert a.encode('utf8') == check, "%s != %s"%(a,check)
503 503
504 504 for s in [ u'é', u'ßø®∫',u'asdf' ]:
505 505 try:
506 506 v.apply_sync(check_unicode, s, s.encode('utf8'))
507 507 except error.RemoteError as e:
508 508 if e.ename == 'AssertionError':
509 509 self.fail(e.evalue)
510 510 else:
511 511 raise e
512 512
513 513 def test_map_reference(self):
514 514 """view.map(<Reference>, *seqs) should work"""
515 515 v = self.client[:]
516 516 v.scatter('n', self.client.ids, flatten=True)
517 517 v.execute("f = lambda x,y: x*y")
518 518 rf = pmod.Reference('f')
519 519 nlist = list(range(10))
520 520 mlist = nlist[::-1]
521 521 expected = [ m*n for m,n in zip(mlist, nlist) ]
522 522 result = v.map_sync(rf, mlist, nlist)
523 523 self.assertEquals(result, expected)
524 524
525 525 def test_apply_reference(self):
526 526 """view.apply(<Reference>, *args) should work"""
527 527 v = self.client[:]
528 528 v.scatter('n', self.client.ids, flatten=True)
529 529 v.execute("f = lambda x: n*x")
530 530 rf = pmod.Reference('f')
531 531 result = v.apply_sync(rf, 5)
532 532 expected = [ 5*id for id in self.client.ids ]
533 533 self.assertEquals(result, expected)
534 534
535 535 def test_eval_reference(self):
536 536 v = self.client[self.client.ids[0]]
537 537 v['g'] = range(5)
538 538 rg = pmod.Reference('g[0]')
539 539 echo = lambda x:x
540 540 self.assertEquals(v.apply_sync(echo, rg), 0)
541 541
542 542 def test_reference_nameerror(self):
543 543 v = self.client[self.client.ids[0]]
544 544 r = pmod.Reference('elvis_has_left')
545 545 echo = lambda x:x
546 546 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
547 547
548 548 def test_single_engine_map(self):
549 549 e0 = self.client[self.client.ids[0]]
550 550 r = range(5)
551 551 check = [ -1*i for i in r ]
552 552 result = e0.map_sync(lambda x: -1*x, r)
553 553 self.assertEquals(result, check)
554 554
555 555 def test_len(self):
556 556 """len(view) makes sense"""
557 557 e0 = self.client[self.client.ids[0]]
558 558 yield self.assertEquals(len(e0), 1)
559 559 v = self.client[:]
560 560 yield self.assertEquals(len(v), len(self.client.ids))
561 561 v = self.client.direct_view('all')
562 562 yield self.assertEquals(len(v), len(self.client.ids))
563 563 v = self.client[:2]
564 564 yield self.assertEquals(len(v), 2)
565 565 v = self.client[:1]
566 566 yield self.assertEquals(len(v), 1)
567 567 v = self.client.load_balanced_view()
568 568 yield self.assertEquals(len(v), len(self.client.ids))
569 569 # parametric tests seem to require manual closing?
570 570 self.client.close()
571 571
572
573 # begin execute tests
574
575 def test_execute_reply(self):
576 e0 = self.client[self.client.ids[0]]
577 e0.block = True
578 ar = e0.execute("5", silent=False)
579 er = ar.get()
580 time.sleep(0.2)
581 self.assertEquals(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
582 self.assertEquals(er.pyout['text/plain'], '5')
583
584 def test_execute_reply_stdout(self):
585 e0 = self.client[self.client.ids[0]]
586 e0.block = True
587 ar = e0.execute("print (5)", silent=False)
588 er = ar.get()
589 time.sleep(0.2)
590 self.assertEquals(er.stdout.strip(), '5')
591
592 def test_execute_pyout(self):
593 """execute triggers pyout with silent=False"""
594 view = self.client[:]
595 ar = view.execute("5", silent=False, block=True)
596 time.sleep(0.2)
597 expected = [{'text/plain' : '5'}] * len(view)
598 self.assertEquals(ar.pyout, expected)
599
600 def test_execute_silent(self):
601 """execute does not trigger pyout with silent=True"""
602 view = self.client[:]
603 ar = view.execute("5", block=True)
604 expected = [None] * len(view)
605 self.assertEquals(ar.pyout, expected)
606
607 def test_execute_magic(self):
608 """execute accepts IPython commands"""
609 view = self.client[:]
610 view.execute("a = 5")
611 ar = view.execute("%whos", block=True)
612 # this will raise, if that failed
613 ar.get(5)
614 time.sleep(0.2)
615 for stdout in ar.stdout:
616 lines = stdout.splitlines()
617 self.assertEquals(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
618 found = False
619 for line in lines[2:]:
620 split = line.split()
621 if split == ['a', 'int', '5']:
622 found = True
623 break
624 self.assertTrue(found, "whos output wrong: %s" % stdout)
625
626 def test_execute_displaypub(self):
627 """execute tracks display_pub output"""
628 view = self.client[:]
629 view.execute("from IPython.core.display import *")
630 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
631 time.sleep(0.2)
632 outs = [ {u'text/plain' : unicode(i)} for i in range(5) ]
633 expected = [outs] * len(view)
634 self.assertEquals(ar.outputs, expected)
635
636 def test_apply_displaypub(self):
637 """apply tracks display_pub output"""
638 view = self.client[:]
639 view.execute("from IPython.core.display import *")
640
641 @interactive
642 def publish():
643 [ display(i) for i in range(5) ]
644
645 ar = view.apply_async(publish)
646 ar.get(5)
647 time.sleep(0.2)
648 outs = [ {u'text/plain' : unicode(j)} for j in range(5) ]
649 expected = [outs] * len(view)
650 self.assertEquals(ar.outputs, expected)
651
652 def test_execute_raises(self):
653 """exceptions in execute requests raise appropriately"""
654 view = self.client[-1]
655 ar = view.execute("1/0")
656 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
657
General Comments 0
You need to be logged in to leave comments. Login now