##// END OF EJS Templates
test new execute/output behaviors
MinRK -
Show More
@@ -1,571 +1,657 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test View objects
2 """test View objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import time
20 import time
21 from tempfile import mktemp
21 from tempfile import mktemp
22 from StringIO import StringIO
22 from StringIO import StringIO
23
23
24 import zmq
24 import zmq
25 from nose import SkipTest
25 from nose import SkipTest
26
26
27 from IPython.testing import decorators as dec
27 from IPython.testing import decorators as dec
28 from IPython.testing.ipunittest import ParametricTestCase
28 from IPython.testing.ipunittest import ParametricTestCase
29
29
30 from IPython import parallel as pmod
30 from IPython import parallel as pmod
31 from IPython.parallel import error
31 from IPython.parallel import error
32 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
32 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
33 from IPython.parallel import DirectView
33 from IPython.parallel import DirectView
34 from IPython.parallel.util import interactive
34 from IPython.parallel.util import interactive
35
35
36 from IPython.parallel.tests import add_engines
36 from IPython.parallel.tests import add_engines
37
37
38 from .clienttest import ClusterTestCase, crash, wait, skip_without
38 from .clienttest import ClusterTestCase, crash, wait, skip_without
39
39
40 def setup():
40 def setup():
41 add_engines(3, total=True)
41 add_engines(3, total=True)
42
42
43 class TestView(ClusterTestCase, ParametricTestCase):
43 class TestView(ClusterTestCase, ParametricTestCase):
44
44
45 def test_z_crash_mux(self):
45 def test_z_crash_mux(self):
46 """test graceful handling of engine death (direct)"""
46 """test graceful handling of engine death (direct)"""
47 raise SkipTest("crash tests disabled, due to undesirable crash reports")
47 raise SkipTest("crash tests disabled, due to undesirable crash reports")
48 # self.add_engines(1)
48 # self.add_engines(1)
49 eid = self.client.ids[-1]
49 eid = self.client.ids[-1]
50 ar = self.client[eid].apply_async(crash)
50 ar = self.client[eid].apply_async(crash)
51 self.assertRaisesRemote(error.EngineError, ar.get, 10)
51 self.assertRaisesRemote(error.EngineError, ar.get, 10)
52 eid = ar.engine_id
52 eid = ar.engine_id
53 tic = time.time()
53 tic = time.time()
54 while eid in self.client.ids and time.time()-tic < 5:
54 while eid in self.client.ids and time.time()-tic < 5:
55 time.sleep(.01)
55 time.sleep(.01)
56 self.client.spin()
56 self.client.spin()
57 self.assertFalse(eid in self.client.ids, "Engine should have died")
57 self.assertFalse(eid in self.client.ids, "Engine should have died")
58
58
59 def test_push_pull(self):
59 def test_push_pull(self):
60 """test pushing and pulling"""
60 """test pushing and pulling"""
61 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
61 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
62 t = self.client.ids[-1]
62 t = self.client.ids[-1]
63 v = self.client[t]
63 v = self.client[t]
64 push = v.push
64 push = v.push
65 pull = v.pull
65 pull = v.pull
66 v.block=True
66 v.block=True
67 nengines = len(self.client)
67 nengines = len(self.client)
68 push({'data':data})
68 push({'data':data})
69 d = pull('data')
69 d = pull('data')
70 self.assertEquals(d, data)
70 self.assertEquals(d, data)
71 self.client[:].push({'data':data})
71 self.client[:].push({'data':data})
72 d = self.client[:].pull('data', block=True)
72 d = self.client[:].pull('data', block=True)
73 self.assertEquals(d, nengines*[data])
73 self.assertEquals(d, nengines*[data])
74 ar = push({'data':data}, block=False)
74 ar = push({'data':data}, block=False)
75 self.assertTrue(isinstance(ar, AsyncResult))
75 self.assertTrue(isinstance(ar, AsyncResult))
76 r = ar.get()
76 r = ar.get()
77 ar = self.client[:].pull('data', block=False)
77 ar = self.client[:].pull('data', block=False)
78 self.assertTrue(isinstance(ar, AsyncResult))
78 self.assertTrue(isinstance(ar, AsyncResult))
79 r = ar.get()
79 r = ar.get()
80 self.assertEquals(r, nengines*[data])
80 self.assertEquals(r, nengines*[data])
81 self.client[:].push(dict(a=10,b=20))
81 self.client[:].push(dict(a=10,b=20))
82 r = self.client[:].pull(('a','b'), block=True)
82 r = self.client[:].pull(('a','b'), block=True)
83 self.assertEquals(r, nengines*[[10,20]])
83 self.assertEquals(r, nengines*[[10,20]])
84
84
85 def test_push_pull_function(self):
85 def test_push_pull_function(self):
86 "test pushing and pulling functions"
86 "test pushing and pulling functions"
87 def testf(x):
87 def testf(x):
88 return 2.0*x
88 return 2.0*x
89
89
90 t = self.client.ids[-1]
90 t = self.client.ids[-1]
91 v = self.client[t]
91 v = self.client[t]
92 v.block=True
92 v.block=True
93 push = v.push
93 push = v.push
94 pull = v.pull
94 pull = v.pull
95 execute = v.execute
95 execute = v.execute
96 push({'testf':testf})
96 push({'testf':testf})
97 r = pull('testf')
97 r = pull('testf')
98 self.assertEqual(r(1.0), testf(1.0))
98 self.assertEqual(r(1.0), testf(1.0))
99 execute('r = testf(10)')
99 execute('r = testf(10)')
100 r = pull('r')
100 r = pull('r')
101 self.assertEquals(r, testf(10))
101 self.assertEquals(r, testf(10))
102 ar = self.client[:].push({'testf':testf}, block=False)
102 ar = self.client[:].push({'testf':testf}, block=False)
103 ar.get()
103 ar.get()
104 ar = self.client[:].pull('testf', block=False)
104 ar = self.client[:].pull('testf', block=False)
105 rlist = ar.get()
105 rlist = ar.get()
106 for r in rlist:
106 for r in rlist:
107 self.assertEqual(r(1.0), testf(1.0))
107 self.assertEqual(r(1.0), testf(1.0))
108 execute("def g(x): return x*x")
108 execute("def g(x): return x*x")
109 r = pull(('testf','g'))
109 r = pull(('testf','g'))
110 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
110 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
111
111
112 def test_push_function_globals(self):
112 def test_push_function_globals(self):
113 """test that pushed functions have access to globals"""
113 """test that pushed functions have access to globals"""
114 @interactive
114 @interactive
115 def geta():
115 def geta():
116 return a
116 return a
117 # self.add_engines(1)
117 # self.add_engines(1)
118 v = self.client[-1]
118 v = self.client[-1]
119 v.block=True
119 v.block=True
120 v['f'] = geta
120 v['f'] = geta
121 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
121 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
122 v.execute('a=5')
122 v.execute('a=5')
123 v.execute('b=f()')
123 v.execute('b=f()')
124 self.assertEquals(v['b'], 5)
124 self.assertEquals(v['b'], 5)
125
125
126 def test_push_function_defaults(self):
126 def test_push_function_defaults(self):
127 """test that pushed functions preserve default args"""
127 """test that pushed functions preserve default args"""
128 def echo(a=10):
128 def echo(a=10):
129 return a
129 return a
130 v = self.client[-1]
130 v = self.client[-1]
131 v.block=True
131 v.block=True
132 v['f'] = echo
132 v['f'] = echo
133 v.execute('b=f()')
133 v.execute('b=f()')
134 self.assertEquals(v['b'], 10)
134 self.assertEquals(v['b'], 10)
135
135
136 def test_get_result(self):
136 def test_get_result(self):
137 """test getting results from the Hub."""
137 """test getting results from the Hub."""
138 c = pmod.Client(profile='iptest')
138 c = pmod.Client(profile='iptest')
139 # self.add_engines(1)
139 # self.add_engines(1)
140 t = c.ids[-1]
140 t = c.ids[-1]
141 v = c[t]
141 v = c[t]
142 v2 = self.client[t]
142 v2 = self.client[t]
143 ar = v.apply_async(wait, 1)
143 ar = v.apply_async(wait, 1)
144 # give the monitor time to notice the message
144 # give the monitor time to notice the message
145 time.sleep(.25)
145 time.sleep(.25)
146 ahr = v2.get_result(ar.msg_ids)
146 ahr = v2.get_result(ar.msg_ids)
147 self.assertTrue(isinstance(ahr, AsyncHubResult))
147 self.assertTrue(isinstance(ahr, AsyncHubResult))
148 self.assertEquals(ahr.get(), ar.get())
148 self.assertEquals(ahr.get(), ar.get())
149 ar2 = v2.get_result(ar.msg_ids)
149 ar2 = v2.get_result(ar.msg_ids)
150 self.assertFalse(isinstance(ar2, AsyncHubResult))
150 self.assertFalse(isinstance(ar2, AsyncHubResult))
151 c.spin()
151 c.spin()
152 c.close()
152 c.close()
153
153
154 def test_run_newline(self):
154 def test_run_newline(self):
155 """test that run appends newline to files"""
155 """test that run appends newline to files"""
156 tmpfile = mktemp()
156 tmpfile = mktemp()
157 with open(tmpfile, 'w') as f:
157 with open(tmpfile, 'w') as f:
158 f.write("""def g():
158 f.write("""def g():
159 return 5
159 return 5
160 """)
160 """)
161 v = self.client[-1]
161 v = self.client[-1]
162 v.run(tmpfile, block=True)
162 v.run(tmpfile, block=True)
163 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
163 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
164
164
165 def test_apply_tracked(self):
165 def test_apply_tracked(self):
166 """test tracking for apply"""
166 """test tracking for apply"""
167 # self.add_engines(1)
167 # self.add_engines(1)
168 t = self.client.ids[-1]
168 t = self.client.ids[-1]
169 v = self.client[t]
169 v = self.client[t]
170 v.block=False
170 v.block=False
171 def echo(n=1024*1024, **kwargs):
171 def echo(n=1024*1024, **kwargs):
172 with v.temp_flags(**kwargs):
172 with v.temp_flags(**kwargs):
173 return v.apply(lambda x: x, 'x'*n)
173 return v.apply(lambda x: x, 'x'*n)
174 ar = echo(1, track=False)
174 ar = echo(1, track=False)
175 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
175 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
176 self.assertTrue(ar.sent)
176 self.assertTrue(ar.sent)
177 ar = echo(track=True)
177 ar = echo(track=True)
178 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
178 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
179 self.assertEquals(ar.sent, ar._tracker.done)
179 self.assertEquals(ar.sent, ar._tracker.done)
180 ar._tracker.wait()
180 ar._tracker.wait()
181 self.assertTrue(ar.sent)
181 self.assertTrue(ar.sent)
182
182
183 def test_push_tracked(self):
183 def test_push_tracked(self):
184 t = self.client.ids[-1]
184 t = self.client.ids[-1]
185 ns = dict(x='x'*1024*1024)
185 ns = dict(x='x'*1024*1024)
186 v = self.client[t]
186 v = self.client[t]
187 ar = v.push(ns, block=False, track=False)
187 ar = v.push(ns, block=False, track=False)
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 self.assertTrue(ar.sent)
189 self.assertTrue(ar.sent)
190
190
191 ar = v.push(ns, block=False, track=True)
191 ar = v.push(ns, block=False, track=True)
192 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
192 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
193 ar._tracker.wait()
193 ar._tracker.wait()
194 self.assertEquals(ar.sent, ar._tracker.done)
194 self.assertEquals(ar.sent, ar._tracker.done)
195 self.assertTrue(ar.sent)
195 self.assertTrue(ar.sent)
196 ar.get()
196 ar.get()
197
197
198 def test_scatter_tracked(self):
198 def test_scatter_tracked(self):
199 t = self.client.ids
199 t = self.client.ids
200 x='x'*1024*1024
200 x='x'*1024*1024
201 ar = self.client[t].scatter('x', x, block=False, track=False)
201 ar = self.client[t].scatter('x', x, block=False, track=False)
202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 self.assertTrue(ar.sent)
203 self.assertTrue(ar.sent)
204
204
205 ar = self.client[t].scatter('x', x, block=False, track=True)
205 ar = self.client[t].scatter('x', x, block=False, track=True)
206 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
206 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
207 self.assertEquals(ar.sent, ar._tracker.done)
207 self.assertEquals(ar.sent, ar._tracker.done)
208 ar._tracker.wait()
208 ar._tracker.wait()
209 self.assertTrue(ar.sent)
209 self.assertTrue(ar.sent)
210 ar.get()
210 ar.get()
211
211
212 def test_remote_reference(self):
212 def test_remote_reference(self):
213 v = self.client[-1]
213 v = self.client[-1]
214 v['a'] = 123
214 v['a'] = 123
215 ra = pmod.Reference('a')
215 ra = pmod.Reference('a')
216 b = v.apply_sync(lambda x: x, ra)
216 b = v.apply_sync(lambda x: x, ra)
217 self.assertEquals(b, 123)
217 self.assertEquals(b, 123)
218
218
219
219
220 def test_scatter_gather(self):
220 def test_scatter_gather(self):
221 view = self.client[:]
221 view = self.client[:]
222 seq1 = range(16)
222 seq1 = range(16)
223 view.scatter('a', seq1)
223 view.scatter('a', seq1)
224 seq2 = view.gather('a', block=True)
224 seq2 = view.gather('a', block=True)
225 self.assertEquals(seq2, seq1)
225 self.assertEquals(seq2, seq1)
226 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
226 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
227
227
228 @skip_without('numpy')
228 @skip_without('numpy')
229 def test_scatter_gather_numpy(self):
229 def test_scatter_gather_numpy(self):
230 import numpy
230 import numpy
231 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
231 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
232 view = self.client[:]
232 view = self.client[:]
233 a = numpy.arange(64)
233 a = numpy.arange(64)
234 view.scatter('a', a)
234 view.scatter('a', a)
235 b = view.gather('a', block=True)
235 b = view.gather('a', block=True)
236 assert_array_equal(b, a)
236 assert_array_equal(b, a)
237
237
238 def test_scatter_gather_lazy(self):
238 def test_scatter_gather_lazy(self):
239 """scatter/gather with targets='all'"""
239 """scatter/gather with targets='all'"""
240 view = self.client.direct_view(targets='all')
240 view = self.client.direct_view(targets='all')
241 x = range(64)
241 x = range(64)
242 view.scatter('x', x)
242 view.scatter('x', x)
243 gathered = view.gather('x', block=True)
243 gathered = view.gather('x', block=True)
244 self.assertEquals(gathered, x)
244 self.assertEquals(gathered, x)
245
245
246
246
247 @dec.known_failure_py3
247 @dec.known_failure_py3
248 @skip_without('numpy')
248 @skip_without('numpy')
249 def test_push_numpy_nocopy(self):
249 def test_push_numpy_nocopy(self):
250 import numpy
250 import numpy
251 view = self.client[:]
251 view = self.client[:]
252 a = numpy.arange(64)
252 a = numpy.arange(64)
253 view['A'] = a
253 view['A'] = a
254 @interactive
254 @interactive
255 def check_writeable(x):
255 def check_writeable(x):
256 return x.flags.writeable
256 return x.flags.writeable
257
257
258 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
258 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
259 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
259 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
260
260
261 view.push(dict(B=a))
261 view.push(dict(B=a))
262 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
262 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
263 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
263 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
264
264
265 @skip_without('numpy')
265 @skip_without('numpy')
266 def test_apply_numpy(self):
266 def test_apply_numpy(self):
267 """view.apply(f, ndarray)"""
267 """view.apply(f, ndarray)"""
268 import numpy
268 import numpy
269 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
269 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
270
270
271 A = numpy.random.random((100,100))
271 A = numpy.random.random((100,100))
272 view = self.client[-1]
272 view = self.client[-1]
273 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
273 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
274 B = A.astype(dt)
274 B = A.astype(dt)
275 C = view.apply_sync(lambda x:x, B)
275 C = view.apply_sync(lambda x:x, B)
276 assert_array_equal(B,C)
276 assert_array_equal(B,C)
277
277
278 def test_map(self):
278 def test_map(self):
279 view = self.client[:]
279 view = self.client[:]
280 def f(x):
280 def f(x):
281 return x**2
281 return x**2
282 data = range(16)
282 data = range(16)
283 r = view.map_sync(f, data)
283 r = view.map_sync(f, data)
284 self.assertEquals(r, map(f, data))
284 self.assertEquals(r, map(f, data))
285
285
286 def test_map_iterable(self):
286 def test_map_iterable(self):
287 """test map on iterables (direct)"""
287 """test map on iterables (direct)"""
288 view = self.client[:]
288 view = self.client[:]
289 # 101 is prime, so it won't be evenly distributed
289 # 101 is prime, so it won't be evenly distributed
290 arr = range(101)
290 arr = range(101)
291 # ensure it will be an iterator, even in Python 3
291 # ensure it will be an iterator, even in Python 3
292 it = iter(arr)
292 it = iter(arr)
293 r = view.map_sync(lambda x:x, arr)
293 r = view.map_sync(lambda x:x, arr)
294 self.assertEquals(r, list(arr))
294 self.assertEquals(r, list(arr))
295
295
296 def test_scatterGatherNonblocking(self):
296 def test_scatterGatherNonblocking(self):
297 data = range(16)
297 data = range(16)
298 view = self.client[:]
298 view = self.client[:]
299 view.scatter('a', data, block=False)
299 view.scatter('a', data, block=False)
300 ar = view.gather('a', block=False)
300 ar = view.gather('a', block=False)
301 self.assertEquals(ar.get(), data)
301 self.assertEquals(ar.get(), data)
302
302
303 @skip_without('numpy')
303 @skip_without('numpy')
304 def test_scatter_gather_numpy_nonblocking(self):
304 def test_scatter_gather_numpy_nonblocking(self):
305 import numpy
305 import numpy
306 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
306 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
307 a = numpy.arange(64)
307 a = numpy.arange(64)
308 view = self.client[:]
308 view = self.client[:]
309 ar = view.scatter('a', a, block=False)
309 ar = view.scatter('a', a, block=False)
310 self.assertTrue(isinstance(ar, AsyncResult))
310 self.assertTrue(isinstance(ar, AsyncResult))
311 amr = view.gather('a', block=False)
311 amr = view.gather('a', block=False)
312 self.assertTrue(isinstance(amr, AsyncMapResult))
312 self.assertTrue(isinstance(amr, AsyncMapResult))
313 assert_array_equal(amr.get(), a)
313 assert_array_equal(amr.get(), a)
314
314
315 def test_execute(self):
315 def test_execute(self):
316 view = self.client[:]
316 view = self.client[:]
317 # self.client.debug=True
317 # self.client.debug=True
318 execute = view.execute
318 execute = view.execute
319 ar = execute('c=30', block=False)
319 ar = execute('c=30', block=False)
320 self.assertTrue(isinstance(ar, AsyncResult))
320 self.assertTrue(isinstance(ar, AsyncResult))
321 ar = execute('d=[0,1,2]', block=False)
321 ar = execute('d=[0,1,2]', block=False)
322 self.client.wait(ar, 1)
322 self.client.wait(ar, 1)
323 self.assertEquals(len(ar.get()), len(self.client))
323 self.assertEquals(len(ar.get()), len(self.client))
324 for c in view['c']:
324 for c in view['c']:
325 self.assertEquals(c, 30)
325 self.assertEquals(c, 30)
326
326
327 def test_abort(self):
327 def test_abort(self):
328 view = self.client[-1]
328 view = self.client[-1]
329 ar = view.execute('import time; time.sleep(1)', block=False)
329 ar = view.execute('import time; time.sleep(1)', block=False)
330 ar2 = view.apply_async(lambda : 2)
330 ar2 = view.apply_async(lambda : 2)
331 ar3 = view.apply_async(lambda : 3)
331 ar3 = view.apply_async(lambda : 3)
332 view.abort(ar2)
332 view.abort(ar2)
333 view.abort(ar3.msg_ids)
333 view.abort(ar3.msg_ids)
334 self.assertRaises(error.TaskAborted, ar2.get)
334 self.assertRaises(error.TaskAborted, ar2.get)
335 self.assertRaises(error.TaskAborted, ar3.get)
335 self.assertRaises(error.TaskAborted, ar3.get)
336
336
337 def test_abort_all(self):
337 def test_abort_all(self):
338 """view.abort() aborts all outstanding tasks"""
338 """view.abort() aborts all outstanding tasks"""
339 view = self.client[-1]
339 view = self.client[-1]
340 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
340 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
341 view.abort()
341 view.abort()
342 view.wait(timeout=5)
342 view.wait(timeout=5)
343 for ar in ars[5:]:
343 for ar in ars[5:]:
344 self.assertRaises(error.TaskAborted, ar.get)
344 self.assertRaises(error.TaskAborted, ar.get)
345
345
346 def test_temp_flags(self):
346 def test_temp_flags(self):
347 view = self.client[-1]
347 view = self.client[-1]
348 view.block=True
348 view.block=True
349 with view.temp_flags(block=False):
349 with view.temp_flags(block=False):
350 self.assertFalse(view.block)
350 self.assertFalse(view.block)
351 self.assertTrue(view.block)
351 self.assertTrue(view.block)
352
352
353 @dec.known_failure_py3
353 @dec.known_failure_py3
354 def test_importer(self):
354 def test_importer(self):
355 view = self.client[-1]
355 view = self.client[-1]
356 view.clear(block=True)
356 view.clear(block=True)
357 with view.importer:
357 with view.importer:
358 import re
358 import re
359
359
360 @interactive
360 @interactive
361 def findall(pat, s):
361 def findall(pat, s):
362 # this globals() step isn't necessary in real code
362 # this globals() step isn't necessary in real code
363 # only to prevent a closure in the test
363 # only to prevent a closure in the test
364 re = globals()['re']
364 re = globals()['re']
365 return re.findall(pat, s)
365 return re.findall(pat, s)
366
366
367 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
367 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
368
368
369 # parallel magic tests
369 # parallel magic tests
370
370
371 def test_magic_px_blocking(self):
371 def test_magic_px_blocking(self):
372 ip = get_ipython()
372 ip = get_ipython()
373 v = self.client[-1]
373 v = self.client[-1]
374 v.activate()
374 v.activate()
375 v.block=True
375 v.block=True
376
376
377 ip.magic_px('a=5')
377 ip.magic_px('a=5')
378 self.assertEquals(v['a'], 5)
378 self.assertEquals(v['a'], 5)
379 ip.magic_px('a=10')
379 ip.magic_px('a=10')
380 self.assertEquals(v['a'], 10)
380 self.assertEquals(v['a'], 10)
381 sio = StringIO()
381 sio = StringIO()
382 savestdout = sys.stdout
382 savestdout = sys.stdout
383 sys.stdout = sio
383 sys.stdout = sio
384 # just 'print a' worst ~99% of the time, but this ensures that
384 # just 'print a' worst ~99% of the time, but this ensures that
385 # the stdout message has arrived when the result is finished:
385 # the stdout message has arrived when the result is finished:
386 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
386 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
387 sys.stdout = savestdout
387 sys.stdout = savestdout
388 buf = sio.getvalue()
388 buf = sio.getvalue()
389 self.assertTrue('[stdout:' in buf, buf)
389 self.assertTrue('[stdout:' in buf, buf)
390 self.assertTrue(buf.rstrip().endswith('10'))
390 self.assertTrue(buf.rstrip().endswith('10'))
391 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
391 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
392
392
393 def test_magic_px_nonblocking(self):
393 def test_magic_px_nonblocking(self):
394 ip = get_ipython()
394 ip = get_ipython()
395 v = self.client[-1]
395 v = self.client[-1]
396 v.activate()
396 v.activate()
397 v.block=False
397 v.block=False
398
398
399 ip.magic_px('a=5')
399 ip.magic_px('a=5')
400 self.assertEquals(v['a'], 5)
400 self.assertEquals(v['a'], 5)
401 ip.magic_px('a=10')
401 ip.magic_px('a=10')
402 self.assertEquals(v['a'], 10)
402 self.assertEquals(v['a'], 10)
403 sio = StringIO()
403 sio = StringIO()
404 savestdout = sys.stdout
404 savestdout = sys.stdout
405 sys.stdout = sio
405 sys.stdout = sio
406 ip.magic_px('print a')
406 ip.magic_px('print a')
407 sys.stdout = savestdout
407 sys.stdout = savestdout
408 buf = sio.getvalue()
408 buf = sio.getvalue()
409 self.assertFalse('[stdout:%i]'%v.targets in buf)
409 self.assertFalse('[stdout:%i]'%v.targets in buf)
410 ip.magic_px('1/0')
410 ip.magic_px('1/0')
411 ar = v.get_result(-1)
411 ar = v.get_result(-1)
412 self.assertRaisesRemote(ZeroDivisionError, ar.get)
412 self.assertRaisesRemote(ZeroDivisionError, ar.get)
413
413
414 def test_magic_autopx_blocking(self):
414 def test_magic_autopx_blocking(self):
415 ip = get_ipython()
415 ip = get_ipython()
416 v = self.client[-1]
416 v = self.client[-1]
417 v.activate()
417 v.activate()
418 v.block=True
418 v.block=True
419
419
420 sio = StringIO()
420 sio = StringIO()
421 savestdout = sys.stdout
421 savestdout = sys.stdout
422 sys.stdout = sio
422 sys.stdout = sio
423 ip.magic_autopx()
423 ip.magic_autopx()
424 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
424 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
425 ip.run_cell('b*=2')
425 ip.run_cell('b*=2')
426 ip.run_cell('print (b)')
426 ip.run_cell('print (b)')
427 ip.run_cell("b/c")
427 ip.run_cell("b/c")
428 ip.magic_autopx()
428 ip.magic_autopx()
429 sys.stdout = savestdout
429 sys.stdout = savestdout
430 output = sio.getvalue().strip()
430 output = sio.getvalue().strip()
431 self.assertTrue(output.startswith('%autopx enabled'))
431 self.assertTrue(output.startswith('%autopx enabled'))
432 self.assertTrue(output.endswith('%autopx disabled'))
432 self.assertTrue(output.endswith('%autopx disabled'))
433 self.assertTrue('RemoteError: ZeroDivisionError' in output)
433 self.assertTrue('RemoteError: ZeroDivisionError' in output)
434 ar = v.get_result(-1)
434 ar = v.get_result(-1)
435 self.assertEquals(v['a'], 5)
435 self.assertEquals(v['a'], 5)
436 self.assertEquals(v['b'], 20)
436 self.assertEquals(v['b'], 20)
437 self.assertRaisesRemote(ZeroDivisionError, ar.get)
437 self.assertRaisesRemote(ZeroDivisionError, ar.get)
438
438
439 def test_magic_autopx_nonblocking(self):
439 def test_magic_autopx_nonblocking(self):
440 ip = get_ipython()
440 ip = get_ipython()
441 v = self.client[-1]
441 v = self.client[-1]
442 v.activate()
442 v.activate()
443 v.block=False
443 v.block=False
444
444
445 sio = StringIO()
445 sio = StringIO()
446 savestdout = sys.stdout
446 savestdout = sys.stdout
447 sys.stdout = sio
447 sys.stdout = sio
448 ip.magic_autopx()
448 ip.magic_autopx()
449 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
449 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
450 ip.run_cell('print (b)')
450 ip.run_cell('print (b)')
451 ip.run_cell("b/c")
451 ip.run_cell("b/c")
452 ip.run_cell('b*=2')
452 ip.run_cell('b*=2')
453 ip.magic_autopx()
453 ip.magic_autopx()
454 sys.stdout = savestdout
454 sys.stdout = savestdout
455 output = sio.getvalue().strip()
455 output = sio.getvalue().strip()
456 self.assertTrue(output.startswith('%autopx enabled'))
456 self.assertTrue(output.startswith('%autopx enabled'))
457 self.assertTrue(output.endswith('%autopx disabled'))
457 self.assertTrue(output.endswith('%autopx disabled'))
458 self.assertFalse('ZeroDivisionError' in output)
458 self.assertFalse('ZeroDivisionError' in output)
459 ar = v.get_result(-2)
459 ar = v.get_result(-2)
460 self.assertEquals(v['a'], 5)
460 self.assertEquals(v['a'], 5)
461 self.assertEquals(v['b'], 20)
461 self.assertEquals(v['b'], 20)
462 self.assertRaisesRemote(ZeroDivisionError, ar.get)
462 self.assertRaisesRemote(ZeroDivisionError, ar.get)
463
463
464 def test_magic_result(self):
464 def test_magic_result(self):
465 ip = get_ipython()
465 ip = get_ipython()
466 v = self.client[-1]
466 v = self.client[-1]
467 v.activate()
467 v.activate()
468 v['a'] = 111
468 v['a'] = 111
469 ra = v['a']
469 ra = v['a']
470
470
471 ar = ip.magic_result()
471 ar = ip.magic_result()
472 self.assertEquals(ar.msg_ids, [v.history[-1]])
472 self.assertEquals(ar.msg_ids, [v.history[-1]])
473 self.assertEquals(ar.get(), 111)
473 self.assertEquals(ar.get(), 111)
474 ar = ip.magic_result('-2')
474 ar = ip.magic_result('-2')
475 self.assertEquals(ar.msg_ids, [v.history[-2]])
475 self.assertEquals(ar.msg_ids, [v.history[-2]])
476
476
477 def test_unicode_execute(self):
477 def test_unicode_execute(self):
478 """test executing unicode strings"""
478 """test executing unicode strings"""
479 v = self.client[-1]
479 v = self.client[-1]
480 v.block=True
480 v.block=True
481 if sys.version_info[0] >= 3:
481 if sys.version_info[0] >= 3:
482 code="a='é'"
482 code="a='é'"
483 else:
483 else:
484 code=u"a=u'é'"
484 code=u"a=u'é'"
485 v.execute(code)
485 v.execute(code)
486 self.assertEquals(v['a'], u'é')
486 self.assertEquals(v['a'], u'é')
487
487
488 def test_unicode_apply_result(self):
488 def test_unicode_apply_result(self):
489 """test unicode apply results"""
489 """test unicode apply results"""
490 v = self.client[-1]
490 v = self.client[-1]
491 r = v.apply_sync(lambda : u'é')
491 r = v.apply_sync(lambda : u'é')
492 self.assertEquals(r, u'é')
492 self.assertEquals(r, u'é')
493
493
494 def test_unicode_apply_arg(self):
494 def test_unicode_apply_arg(self):
495 """test passing unicode arguments to apply"""
495 """test passing unicode arguments to apply"""
496 v = self.client[-1]
496 v = self.client[-1]
497
497
498 @interactive
498 @interactive
499 def check_unicode(a, check):
499 def check_unicode(a, check):
500 assert isinstance(a, unicode), "%r is not unicode"%a
500 assert isinstance(a, unicode), "%r is not unicode"%a
501 assert isinstance(check, bytes), "%r is not bytes"%check
501 assert isinstance(check, bytes), "%r is not bytes"%check
502 assert a.encode('utf8') == check, "%s != %s"%(a,check)
502 assert a.encode('utf8') == check, "%s != %s"%(a,check)
503
503
504 for s in [ u'é', u'ßø®∫',u'asdf' ]:
504 for s in [ u'é', u'ßø®∫',u'asdf' ]:
505 try:
505 try:
506 v.apply_sync(check_unicode, s, s.encode('utf8'))
506 v.apply_sync(check_unicode, s, s.encode('utf8'))
507 except error.RemoteError as e:
507 except error.RemoteError as e:
508 if e.ename == 'AssertionError':
508 if e.ename == 'AssertionError':
509 self.fail(e.evalue)
509 self.fail(e.evalue)
510 else:
510 else:
511 raise e
511 raise e
512
512
513 def test_map_reference(self):
513 def test_map_reference(self):
514 """view.map(<Reference>, *seqs) should work"""
514 """view.map(<Reference>, *seqs) should work"""
515 v = self.client[:]
515 v = self.client[:]
516 v.scatter('n', self.client.ids, flatten=True)
516 v.scatter('n', self.client.ids, flatten=True)
517 v.execute("f = lambda x,y: x*y")
517 v.execute("f = lambda x,y: x*y")
518 rf = pmod.Reference('f')
518 rf = pmod.Reference('f')
519 nlist = list(range(10))
519 nlist = list(range(10))
520 mlist = nlist[::-1]
520 mlist = nlist[::-1]
521 expected = [ m*n for m,n in zip(mlist, nlist) ]
521 expected = [ m*n for m,n in zip(mlist, nlist) ]
522 result = v.map_sync(rf, mlist, nlist)
522 result = v.map_sync(rf, mlist, nlist)
523 self.assertEquals(result, expected)
523 self.assertEquals(result, expected)
524
524
525 def test_apply_reference(self):
525 def test_apply_reference(self):
526 """view.apply(<Reference>, *args) should work"""
526 """view.apply(<Reference>, *args) should work"""
527 v = self.client[:]
527 v = self.client[:]
528 v.scatter('n', self.client.ids, flatten=True)
528 v.scatter('n', self.client.ids, flatten=True)
529 v.execute("f = lambda x: n*x")
529 v.execute("f = lambda x: n*x")
530 rf = pmod.Reference('f')
530 rf = pmod.Reference('f')
531 result = v.apply_sync(rf, 5)
531 result = v.apply_sync(rf, 5)
532 expected = [ 5*id for id in self.client.ids ]
532 expected = [ 5*id for id in self.client.ids ]
533 self.assertEquals(result, expected)
533 self.assertEquals(result, expected)
534
534
535 def test_eval_reference(self):
535 def test_eval_reference(self):
536 v = self.client[self.client.ids[0]]
536 v = self.client[self.client.ids[0]]
537 v['g'] = range(5)
537 v['g'] = range(5)
538 rg = pmod.Reference('g[0]')
538 rg = pmod.Reference('g[0]')
539 echo = lambda x:x
539 echo = lambda x:x
540 self.assertEquals(v.apply_sync(echo, rg), 0)
540 self.assertEquals(v.apply_sync(echo, rg), 0)
541
541
542 def test_reference_nameerror(self):
542 def test_reference_nameerror(self):
543 v = self.client[self.client.ids[0]]
543 v = self.client[self.client.ids[0]]
544 r = pmod.Reference('elvis_has_left')
544 r = pmod.Reference('elvis_has_left')
545 echo = lambda x:x
545 echo = lambda x:x
546 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
546 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
547
547
548 def test_single_engine_map(self):
548 def test_single_engine_map(self):
549 e0 = self.client[self.client.ids[0]]
549 e0 = self.client[self.client.ids[0]]
550 r = range(5)
550 r = range(5)
551 check = [ -1*i for i in r ]
551 check = [ -1*i for i in r ]
552 result = e0.map_sync(lambda x: -1*x, r)
552 result = e0.map_sync(lambda x: -1*x, r)
553 self.assertEquals(result, check)
553 self.assertEquals(result, check)
554
554
555 def test_len(self):
555 def test_len(self):
556 """len(view) makes sense"""
556 """len(view) makes sense"""
557 e0 = self.client[self.client.ids[0]]
557 e0 = self.client[self.client.ids[0]]
558 yield self.assertEquals(len(e0), 1)
558 yield self.assertEquals(len(e0), 1)
559 v = self.client[:]
559 v = self.client[:]
560 yield self.assertEquals(len(v), len(self.client.ids))
560 yield self.assertEquals(len(v), len(self.client.ids))
561 v = self.client.direct_view('all')
561 v = self.client.direct_view('all')
562 yield self.assertEquals(len(v), len(self.client.ids))
562 yield self.assertEquals(len(v), len(self.client.ids))
563 v = self.client[:2]
563 v = self.client[:2]
564 yield self.assertEquals(len(v), 2)
564 yield self.assertEquals(len(v), 2)
565 v = self.client[:1]
565 v = self.client[:1]
566 yield self.assertEquals(len(v), 1)
566 yield self.assertEquals(len(v), 1)
567 v = self.client.load_balanced_view()
567 v = self.client.load_balanced_view()
568 yield self.assertEquals(len(v), len(self.client.ids))
568 yield self.assertEquals(len(v), len(self.client.ids))
569 # parametric tests seem to require manual closing?
569 # parametric tests seem to require manual closing?
570 self.client.close()
570 self.client.close()
571
571
572
573 # begin execute tests
574
575 def test_execute_reply(self):
576 e0 = self.client[self.client.ids[0]]
577 e0.block = True
578 ar = e0.execute("5", silent=False)
579 er = ar.get()
580 time.sleep(0.2)
581 self.assertEquals(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
582 self.assertEquals(er.pyout['text/plain'], '5')
583
584 def test_execute_reply_stdout(self):
585 e0 = self.client[self.client.ids[0]]
586 e0.block = True
587 ar = e0.execute("print (5)", silent=False)
588 er = ar.get()
589 time.sleep(0.2)
590 self.assertEquals(er.stdout.strip(), '5')
591
592 def test_execute_pyout(self):
593 """execute triggers pyout with silent=False"""
594 view = self.client[:]
595 ar = view.execute("5", silent=False, block=True)
596 time.sleep(0.2)
597 expected = [{'text/plain' : '5'}] * len(view)
598 self.assertEquals(ar.pyout, expected)
599
600 def test_execute_silent(self):
601 """execute does not trigger pyout with silent=True"""
602 view = self.client[:]
603 ar = view.execute("5", block=True)
604 expected = [None] * len(view)
605 self.assertEquals(ar.pyout, expected)
606
607 def test_execute_magic(self):
608 """execute accepts IPython commands"""
609 view = self.client[:]
610 view.execute("a = 5")
611 ar = view.execute("%whos", block=True)
612 # this will raise, if that failed
613 ar.get(5)
614 time.sleep(0.2)
615 for stdout in ar.stdout:
616 lines = stdout.splitlines()
617 self.assertEquals(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
618 found = False
619 for line in lines[2:]:
620 split = line.split()
621 if split == ['a', 'int', '5']:
622 found = True
623 break
624 self.assertTrue(found, "whos output wrong: %s" % stdout)
625
626 def test_execute_displaypub(self):
627 """execute tracks display_pub output"""
628 view = self.client[:]
629 view.execute("from IPython.core.display import *")
630 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
631 time.sleep(0.2)
632 outs = [ {u'text/plain' : unicode(i)} for i in range(5) ]
633 expected = [outs] * len(view)
634 self.assertEquals(ar.outputs, expected)
635
636 def test_apply_displaypub(self):
637 """apply tracks display_pub output"""
638 view = self.client[:]
639 view.execute("from IPython.core.display import *")
640
641 @interactive
642 def publish():
643 [ display(i) for i in range(5) ]
644
645 ar = view.apply_async(publish)
646 ar.get(5)
647 time.sleep(0.2)
648 outs = [ {u'text/plain' : unicode(j)} for j in range(5) ]
649 expected = [outs] * len(view)
650 self.assertEquals(ar.outputs, expected)
651
652 def test_execute_raises(self):
653 """exceptions in execute requests raise appropriately"""
654 view = self.client[-1]
655 ar = view.execute("1/0")
656 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
657
General Comments 0
You need to be logged in to leave comments. Login now