##// END OF EJS Templates
test view.sync_imports
MinRK -
Show More
@@ -1,808 +1,834 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test View objects
2 """test View objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import base64
19 import base64
20 import sys
20 import sys
21 import platform
21 import platform
22 import time
22 import time
23 from collections import namedtuple
23 from collections import namedtuple
24 from tempfile import mktemp
24 from tempfile import mktemp
25
25
26 import zmq
26 import zmq
27 from nose.plugins.attrib import attr
27 from nose.plugins.attrib import attr
28
28
29 from IPython.testing import decorators as dec
29 from IPython.testing import decorators as dec
30 from IPython.utils.io import capture_output
30 from IPython.utils.io import capture_output
31
31
32 from IPython import parallel as pmod
32 from IPython import parallel as pmod
33 from IPython.parallel import error
33 from IPython.parallel import error
34 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
34 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
35 from IPython.parallel.util import interactive
35 from IPython.parallel.util import interactive
36
36
37 from IPython.parallel.tests import add_engines
37 from IPython.parallel.tests import add_engines
38
38
39 from .clienttest import ClusterTestCase, crash, wait, skip_without
39 from .clienttest import ClusterTestCase, crash, wait, skip_without
40
40
41 def setup():
41 def setup():
42 add_engines(3, total=True)
42 add_engines(3, total=True)
43
43
44 point = namedtuple("point", "x y")
44 point = namedtuple("point", "x y")
45
45
46 class TestView(ClusterTestCase):
46 class TestView(ClusterTestCase):
47
47
48 def setUp(self):
48 def setUp(self):
49 # On Win XP, wait for resource cleanup, else parallel test group fails
49 # On Win XP, wait for resource cleanup, else parallel test group fails
50 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
50 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
51 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
51 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
52 time.sleep(2)
52 time.sleep(2)
53 super(TestView, self).setUp()
53 super(TestView, self).setUp()
54
54
55 @attr('crash')
55 @attr('crash')
56 def test_z_crash_mux(self):
56 def test_z_crash_mux(self):
57 """test graceful handling of engine death (direct)"""
57 """test graceful handling of engine death (direct)"""
58 # self.add_engines(1)
58 # self.add_engines(1)
59 eid = self.client.ids[-1]
59 eid = self.client.ids[-1]
60 ar = self.client[eid].apply_async(crash)
60 ar = self.client[eid].apply_async(crash)
61 self.assertRaisesRemote(error.EngineError, ar.get, 10)
61 self.assertRaisesRemote(error.EngineError, ar.get, 10)
62 eid = ar.engine_id
62 eid = ar.engine_id
63 tic = time.time()
63 tic = time.time()
64 while eid in self.client.ids and time.time()-tic < 5:
64 while eid in self.client.ids and time.time()-tic < 5:
65 time.sleep(.01)
65 time.sleep(.01)
66 self.client.spin()
66 self.client.spin()
67 self.assertFalse(eid in self.client.ids, "Engine should have died")
67 self.assertFalse(eid in self.client.ids, "Engine should have died")
68
68
69 def test_push_pull(self):
69 def test_push_pull(self):
70 """test pushing and pulling"""
70 """test pushing and pulling"""
71 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
71 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
72 t = self.client.ids[-1]
72 t = self.client.ids[-1]
73 v = self.client[t]
73 v = self.client[t]
74 push = v.push
74 push = v.push
75 pull = v.pull
75 pull = v.pull
76 v.block=True
76 v.block=True
77 nengines = len(self.client)
77 nengines = len(self.client)
78 push({'data':data})
78 push({'data':data})
79 d = pull('data')
79 d = pull('data')
80 self.assertEqual(d, data)
80 self.assertEqual(d, data)
81 self.client[:].push({'data':data})
81 self.client[:].push({'data':data})
82 d = self.client[:].pull('data', block=True)
82 d = self.client[:].pull('data', block=True)
83 self.assertEqual(d, nengines*[data])
83 self.assertEqual(d, nengines*[data])
84 ar = push({'data':data}, block=False)
84 ar = push({'data':data}, block=False)
85 self.assertTrue(isinstance(ar, AsyncResult))
85 self.assertTrue(isinstance(ar, AsyncResult))
86 r = ar.get()
86 r = ar.get()
87 ar = self.client[:].pull('data', block=False)
87 ar = self.client[:].pull('data', block=False)
88 self.assertTrue(isinstance(ar, AsyncResult))
88 self.assertTrue(isinstance(ar, AsyncResult))
89 r = ar.get()
89 r = ar.get()
90 self.assertEqual(r, nengines*[data])
90 self.assertEqual(r, nengines*[data])
91 self.client[:].push(dict(a=10,b=20))
91 self.client[:].push(dict(a=10,b=20))
92 r = self.client[:].pull(('a','b'), block=True)
92 r = self.client[:].pull(('a','b'), block=True)
93 self.assertEqual(r, nengines*[[10,20]])
93 self.assertEqual(r, nengines*[[10,20]])
94
94
95 def test_push_pull_function(self):
95 def test_push_pull_function(self):
96 "test pushing and pulling functions"
96 "test pushing and pulling functions"
97 def testf(x):
97 def testf(x):
98 return 2.0*x
98 return 2.0*x
99
99
100 t = self.client.ids[-1]
100 t = self.client.ids[-1]
101 v = self.client[t]
101 v = self.client[t]
102 v.block=True
102 v.block=True
103 push = v.push
103 push = v.push
104 pull = v.pull
104 pull = v.pull
105 execute = v.execute
105 execute = v.execute
106 push({'testf':testf})
106 push({'testf':testf})
107 r = pull('testf')
107 r = pull('testf')
108 self.assertEqual(r(1.0), testf(1.0))
108 self.assertEqual(r(1.0), testf(1.0))
109 execute('r = testf(10)')
109 execute('r = testf(10)')
110 r = pull('r')
110 r = pull('r')
111 self.assertEqual(r, testf(10))
111 self.assertEqual(r, testf(10))
112 ar = self.client[:].push({'testf':testf}, block=False)
112 ar = self.client[:].push({'testf':testf}, block=False)
113 ar.get()
113 ar.get()
114 ar = self.client[:].pull('testf', block=False)
114 ar = self.client[:].pull('testf', block=False)
115 rlist = ar.get()
115 rlist = ar.get()
116 for r in rlist:
116 for r in rlist:
117 self.assertEqual(r(1.0), testf(1.0))
117 self.assertEqual(r(1.0), testf(1.0))
118 execute("def g(x): return x*x")
118 execute("def g(x): return x*x")
119 r = pull(('testf','g'))
119 r = pull(('testf','g'))
120 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
120 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
121
121
122 def test_push_function_globals(self):
122 def test_push_function_globals(self):
123 """test that pushed functions have access to globals"""
123 """test that pushed functions have access to globals"""
124 @interactive
124 @interactive
125 def geta():
125 def geta():
126 return a
126 return a
127 # self.add_engines(1)
127 # self.add_engines(1)
128 v = self.client[-1]
128 v = self.client[-1]
129 v.block=True
129 v.block=True
130 v['f'] = geta
130 v['f'] = geta
131 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
131 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
132 v.execute('a=5')
132 v.execute('a=5')
133 v.execute('b=f()')
133 v.execute('b=f()')
134 self.assertEqual(v['b'], 5)
134 self.assertEqual(v['b'], 5)
135
135
136 def test_push_function_defaults(self):
136 def test_push_function_defaults(self):
137 """test that pushed functions preserve default args"""
137 """test that pushed functions preserve default args"""
138 def echo(a=10):
138 def echo(a=10):
139 return a
139 return a
140 v = self.client[-1]
140 v = self.client[-1]
141 v.block=True
141 v.block=True
142 v['f'] = echo
142 v['f'] = echo
143 v.execute('b=f()')
143 v.execute('b=f()')
144 self.assertEqual(v['b'], 10)
144 self.assertEqual(v['b'], 10)
145
145
146 def test_get_result(self):
146 def test_get_result(self):
147 """test getting results from the Hub."""
147 """test getting results from the Hub."""
148 c = pmod.Client(profile='iptest')
148 c = pmod.Client(profile='iptest')
149 # self.add_engines(1)
149 # self.add_engines(1)
150 t = c.ids[-1]
150 t = c.ids[-1]
151 v = c[t]
151 v = c[t]
152 v2 = self.client[t]
152 v2 = self.client[t]
153 ar = v.apply_async(wait, 1)
153 ar = v.apply_async(wait, 1)
154 # give the monitor time to notice the message
154 # give the monitor time to notice the message
155 time.sleep(.25)
155 time.sleep(.25)
156 ahr = v2.get_result(ar.msg_ids[0])
156 ahr = v2.get_result(ar.msg_ids[0])
157 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 self.assertTrue(isinstance(ahr, AsyncHubResult))
158 self.assertEqual(ahr.get(), ar.get())
158 self.assertEqual(ahr.get(), ar.get())
159 ar2 = v2.get_result(ar.msg_ids[0])
159 ar2 = v2.get_result(ar.msg_ids[0])
160 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 self.assertFalse(isinstance(ar2, AsyncHubResult))
161 c.spin()
161 c.spin()
162 c.close()
162 c.close()
163
163
164 def test_run_newline(self):
164 def test_run_newline(self):
165 """test that run appends newline to files"""
165 """test that run appends newline to files"""
166 tmpfile = mktemp()
166 tmpfile = mktemp()
167 with open(tmpfile, 'w') as f:
167 with open(tmpfile, 'w') as f:
168 f.write("""def g():
168 f.write("""def g():
169 return 5
169 return 5
170 """)
170 """)
171 v = self.client[-1]
171 v = self.client[-1]
172 v.run(tmpfile, block=True)
172 v.run(tmpfile, block=True)
173 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
173 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
174
174
175 def test_apply_tracked(self):
175 def test_apply_tracked(self):
176 """test tracking for apply"""
176 """test tracking for apply"""
177 # self.add_engines(1)
177 # self.add_engines(1)
178 t = self.client.ids[-1]
178 t = self.client.ids[-1]
179 v = self.client[t]
179 v = self.client[t]
180 v.block=False
180 v.block=False
181 def echo(n=1024*1024, **kwargs):
181 def echo(n=1024*1024, **kwargs):
182 with v.temp_flags(**kwargs):
182 with v.temp_flags(**kwargs):
183 return v.apply(lambda x: x, 'x'*n)
183 return v.apply(lambda x: x, 'x'*n)
184 ar = echo(1, track=False)
184 ar = echo(1, track=False)
185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
186 self.assertTrue(ar.sent)
186 self.assertTrue(ar.sent)
187 ar = echo(track=True)
187 ar = echo(track=True)
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 self.assertEqual(ar.sent, ar._tracker.done)
189 self.assertEqual(ar.sent, ar._tracker.done)
190 ar._tracker.wait()
190 ar._tracker.wait()
191 self.assertTrue(ar.sent)
191 self.assertTrue(ar.sent)
192
192
193 def test_push_tracked(self):
193 def test_push_tracked(self):
194 t = self.client.ids[-1]
194 t = self.client.ids[-1]
195 ns = dict(x='x'*1024*1024)
195 ns = dict(x='x'*1024*1024)
196 v = self.client[t]
196 v = self.client[t]
197 ar = v.push(ns, block=False, track=False)
197 ar = v.push(ns, block=False, track=False)
198 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
198 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
199 self.assertTrue(ar.sent)
199 self.assertTrue(ar.sent)
200
200
201 ar = v.push(ns, block=False, track=True)
201 ar = v.push(ns, block=False, track=True)
202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 ar._tracker.wait()
203 ar._tracker.wait()
204 self.assertEqual(ar.sent, ar._tracker.done)
204 self.assertEqual(ar.sent, ar._tracker.done)
205 self.assertTrue(ar.sent)
205 self.assertTrue(ar.sent)
206 ar.get()
206 ar.get()
207
207
208 def test_scatter_tracked(self):
208 def test_scatter_tracked(self):
209 t = self.client.ids
209 t = self.client.ids
210 x='x'*1024*1024
210 x='x'*1024*1024
211 ar = self.client[t].scatter('x', x, block=False, track=False)
211 ar = self.client[t].scatter('x', x, block=False, track=False)
212 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
212 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
213 self.assertTrue(ar.sent)
213 self.assertTrue(ar.sent)
214
214
215 ar = self.client[t].scatter('x', x, block=False, track=True)
215 ar = self.client[t].scatter('x', x, block=False, track=True)
216 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
216 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
217 self.assertEqual(ar.sent, ar._tracker.done)
217 self.assertEqual(ar.sent, ar._tracker.done)
218 ar._tracker.wait()
218 ar._tracker.wait()
219 self.assertTrue(ar.sent)
219 self.assertTrue(ar.sent)
220 ar.get()
220 ar.get()
221
221
222 def test_remote_reference(self):
222 def test_remote_reference(self):
223 v = self.client[-1]
223 v = self.client[-1]
224 v['a'] = 123
224 v['a'] = 123
225 ra = pmod.Reference('a')
225 ra = pmod.Reference('a')
226 b = v.apply_sync(lambda x: x, ra)
226 b = v.apply_sync(lambda x: x, ra)
227 self.assertEqual(b, 123)
227 self.assertEqual(b, 123)
228
228
229
229
230 def test_scatter_gather(self):
230 def test_scatter_gather(self):
231 view = self.client[:]
231 view = self.client[:]
232 seq1 = range(16)
232 seq1 = range(16)
233 view.scatter('a', seq1)
233 view.scatter('a', seq1)
234 seq2 = view.gather('a', block=True)
234 seq2 = view.gather('a', block=True)
235 self.assertEqual(seq2, seq1)
235 self.assertEqual(seq2, seq1)
236 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
236 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
237
237
238 @skip_without('numpy')
238 @skip_without('numpy')
239 def test_scatter_gather_numpy(self):
239 def test_scatter_gather_numpy(self):
240 import numpy
240 import numpy
241 from numpy.testing.utils import assert_array_equal
241 from numpy.testing.utils import assert_array_equal
242 view = self.client[:]
242 view = self.client[:]
243 a = numpy.arange(64)
243 a = numpy.arange(64)
244 view.scatter('a', a, block=True)
244 view.scatter('a', a, block=True)
245 b = view.gather('a', block=True)
245 b = view.gather('a', block=True)
246 assert_array_equal(b, a)
246 assert_array_equal(b, a)
247
247
248 def test_scatter_gather_lazy(self):
248 def test_scatter_gather_lazy(self):
249 """scatter/gather with targets='all'"""
249 """scatter/gather with targets='all'"""
250 view = self.client.direct_view(targets='all')
250 view = self.client.direct_view(targets='all')
251 x = range(64)
251 x = range(64)
252 view.scatter('x', x)
252 view.scatter('x', x)
253 gathered = view.gather('x', block=True)
253 gathered = view.gather('x', block=True)
254 self.assertEqual(gathered, x)
254 self.assertEqual(gathered, x)
255
255
256
256
257 @dec.known_failure_py3
257 @dec.known_failure_py3
258 @skip_without('numpy')
258 @skip_without('numpy')
259 def test_push_numpy_nocopy(self):
259 def test_push_numpy_nocopy(self):
260 import numpy
260 import numpy
261 view = self.client[:]
261 view = self.client[:]
262 a = numpy.arange(64)
262 a = numpy.arange(64)
263 view['A'] = a
263 view['A'] = a
264 @interactive
264 @interactive
265 def check_writeable(x):
265 def check_writeable(x):
266 return x.flags.writeable
266 return x.flags.writeable
267
267
268 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
268 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
269 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
269 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
270
270
271 view.push(dict(B=a))
271 view.push(dict(B=a))
272 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
272 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
273 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
273 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
274
274
275 @skip_without('numpy')
275 @skip_without('numpy')
276 def test_apply_numpy(self):
276 def test_apply_numpy(self):
277 """view.apply(f, ndarray)"""
277 """view.apply(f, ndarray)"""
278 import numpy
278 import numpy
279 from numpy.testing.utils import assert_array_equal
279 from numpy.testing.utils import assert_array_equal
280
280
281 A = numpy.random.random((100,100))
281 A = numpy.random.random((100,100))
282 view = self.client[-1]
282 view = self.client[-1]
283 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
283 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
284 B = A.astype(dt)
284 B = A.astype(dt)
285 C = view.apply_sync(lambda x:x, B)
285 C = view.apply_sync(lambda x:x, B)
286 assert_array_equal(B,C)
286 assert_array_equal(B,C)
287
287
288 @skip_without('numpy')
288 @skip_without('numpy')
289 def test_push_pull_recarray(self):
289 def test_push_pull_recarray(self):
290 """push/pull recarrays"""
290 """push/pull recarrays"""
291 import numpy
291 import numpy
292 from numpy.testing.utils import assert_array_equal
292 from numpy.testing.utils import assert_array_equal
293
293
294 view = self.client[-1]
294 view = self.client[-1]
295
295
296 R = numpy.array([
296 R = numpy.array([
297 (1, 'hi', 0.),
297 (1, 'hi', 0.),
298 (2**30, 'there', 2.5),
298 (2**30, 'there', 2.5),
299 (-99999, 'world', -12345.6789),
299 (-99999, 'world', -12345.6789),
300 ], [('n', int), ('s', '|S10'), ('f', float)])
300 ], [('n', int), ('s', '|S10'), ('f', float)])
301
301
302 view['RR'] = R
302 view['RR'] = R
303 R2 = view['RR']
303 R2 = view['RR']
304
304
305 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
305 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
306 self.assertEqual(r_dtype, R.dtype)
306 self.assertEqual(r_dtype, R.dtype)
307 self.assertEqual(r_shape, R.shape)
307 self.assertEqual(r_shape, R.shape)
308 self.assertEqual(R2.dtype, R.dtype)
308 self.assertEqual(R2.dtype, R.dtype)
309 self.assertEqual(R2.shape, R.shape)
309 self.assertEqual(R2.shape, R.shape)
310 assert_array_equal(R2, R)
310 assert_array_equal(R2, R)
311
311
312 @skip_without('pandas')
312 @skip_without('pandas')
313 def test_push_pull_timeseries(self):
313 def test_push_pull_timeseries(self):
314 """push/pull pandas.TimeSeries"""
314 """push/pull pandas.TimeSeries"""
315 import pandas
315 import pandas
316
316
317 ts = pandas.TimeSeries(range(10))
317 ts = pandas.TimeSeries(range(10))
318
318
319 view = self.client[-1]
319 view = self.client[-1]
320
320
321 view.push(dict(ts=ts), block=True)
321 view.push(dict(ts=ts), block=True)
322 rts = view['ts']
322 rts = view['ts']
323
323
324 self.assertEqual(type(rts), type(ts))
324 self.assertEqual(type(rts), type(ts))
325 self.assertTrue((ts == rts).all())
325 self.assertTrue((ts == rts).all())
326
326
327 def test_map(self):
327 def test_map(self):
328 view = self.client[:]
328 view = self.client[:]
329 def f(x):
329 def f(x):
330 return x**2
330 return x**2
331 data = range(16)
331 data = range(16)
332 r = view.map_sync(f, data)
332 r = view.map_sync(f, data)
333 self.assertEqual(r, map(f, data))
333 self.assertEqual(r, map(f, data))
334
334
335 def test_map_iterable(self):
335 def test_map_iterable(self):
336 """test map on iterables (direct)"""
336 """test map on iterables (direct)"""
337 view = self.client[:]
337 view = self.client[:]
338 # 101 is prime, so it won't be evenly distributed
338 # 101 is prime, so it won't be evenly distributed
339 arr = range(101)
339 arr = range(101)
340 # ensure it will be an iterator, even in Python 3
340 # ensure it will be an iterator, even in Python 3
341 it = iter(arr)
341 it = iter(arr)
342 r = view.map_sync(lambda x: x, it)
342 r = view.map_sync(lambda x: x, it)
343 self.assertEqual(r, list(arr))
343 self.assertEqual(r, list(arr))
344
344
345 @skip_without('numpy')
345 @skip_without('numpy')
346 def test_map_numpy(self):
346 def test_map_numpy(self):
347 """test map on numpy arrays (direct)"""
347 """test map on numpy arrays (direct)"""
348 import numpy
348 import numpy
349 from numpy.testing.utils import assert_array_equal
349 from numpy.testing.utils import assert_array_equal
350
350
351 view = self.client[:]
351 view = self.client[:]
352 # 101 is prime, so it won't be evenly distributed
352 # 101 is prime, so it won't be evenly distributed
353 arr = numpy.arange(101)
353 arr = numpy.arange(101)
354 r = view.map_sync(lambda x: x, arr)
354 r = view.map_sync(lambda x: x, arr)
355 assert_array_equal(r, arr)
355 assert_array_equal(r, arr)
356
356
357 def test_scatter_gather_nonblocking(self):
357 def test_scatter_gather_nonblocking(self):
358 data = range(16)
358 data = range(16)
359 view = self.client[:]
359 view = self.client[:]
360 view.scatter('a', data, block=False)
360 view.scatter('a', data, block=False)
361 ar = view.gather('a', block=False)
361 ar = view.gather('a', block=False)
362 self.assertEqual(ar.get(), data)
362 self.assertEqual(ar.get(), data)
363
363
364 @skip_without('numpy')
364 @skip_without('numpy')
365 def test_scatter_gather_numpy_nonblocking(self):
365 def test_scatter_gather_numpy_nonblocking(self):
366 import numpy
366 import numpy
367 from numpy.testing.utils import assert_array_equal
367 from numpy.testing.utils import assert_array_equal
368 a = numpy.arange(64)
368 a = numpy.arange(64)
369 view = self.client[:]
369 view = self.client[:]
370 ar = view.scatter('a', a, block=False)
370 ar = view.scatter('a', a, block=False)
371 self.assertTrue(isinstance(ar, AsyncResult))
371 self.assertTrue(isinstance(ar, AsyncResult))
372 amr = view.gather('a', block=False)
372 amr = view.gather('a', block=False)
373 self.assertTrue(isinstance(amr, AsyncMapResult))
373 self.assertTrue(isinstance(amr, AsyncMapResult))
374 assert_array_equal(amr.get(), a)
374 assert_array_equal(amr.get(), a)
375
375
376 def test_execute(self):
376 def test_execute(self):
377 view = self.client[:]
377 view = self.client[:]
378 # self.client.debug=True
378 # self.client.debug=True
379 execute = view.execute
379 execute = view.execute
380 ar = execute('c=30', block=False)
380 ar = execute('c=30', block=False)
381 self.assertTrue(isinstance(ar, AsyncResult))
381 self.assertTrue(isinstance(ar, AsyncResult))
382 ar = execute('d=[0,1,2]', block=False)
382 ar = execute('d=[0,1,2]', block=False)
383 self.client.wait(ar, 1)
383 self.client.wait(ar, 1)
384 self.assertEqual(len(ar.get()), len(self.client))
384 self.assertEqual(len(ar.get()), len(self.client))
385 for c in view['c']:
385 for c in view['c']:
386 self.assertEqual(c, 30)
386 self.assertEqual(c, 30)
387
387
388 def test_abort(self):
388 def test_abort(self):
389 view = self.client[-1]
389 view = self.client[-1]
390 ar = view.execute('import time; time.sleep(1)', block=False)
390 ar = view.execute('import time; time.sleep(1)', block=False)
391 ar2 = view.apply_async(lambda : 2)
391 ar2 = view.apply_async(lambda : 2)
392 ar3 = view.apply_async(lambda : 3)
392 ar3 = view.apply_async(lambda : 3)
393 view.abort(ar2)
393 view.abort(ar2)
394 view.abort(ar3.msg_ids)
394 view.abort(ar3.msg_ids)
395 self.assertRaises(error.TaskAborted, ar2.get)
395 self.assertRaises(error.TaskAborted, ar2.get)
396 self.assertRaises(error.TaskAborted, ar3.get)
396 self.assertRaises(error.TaskAborted, ar3.get)
397
397
398 def test_abort_all(self):
398 def test_abort_all(self):
399 """view.abort() aborts all outstanding tasks"""
399 """view.abort() aborts all outstanding tasks"""
400 view = self.client[-1]
400 view = self.client[-1]
401 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
401 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
402 view.abort()
402 view.abort()
403 view.wait(timeout=5)
403 view.wait(timeout=5)
404 for ar in ars[5:]:
404 for ar in ars[5:]:
405 self.assertRaises(error.TaskAborted, ar.get)
405 self.assertRaises(error.TaskAborted, ar.get)
406
406
407 def test_temp_flags(self):
407 def test_temp_flags(self):
408 view = self.client[-1]
408 view = self.client[-1]
409 view.block=True
409 view.block=True
410 with view.temp_flags(block=False):
410 with view.temp_flags(block=False):
411 self.assertFalse(view.block)
411 self.assertFalse(view.block)
412 self.assertTrue(view.block)
412 self.assertTrue(view.block)
413
413
414 @dec.known_failure_py3
414 @dec.known_failure_py3
415 def test_importer(self):
415 def test_importer(self):
416 view = self.client[-1]
416 view = self.client[-1]
417 view.clear(block=True)
417 view.clear(block=True)
418 with view.importer:
418 with view.importer:
419 import re
419 import re
420
420
421 @interactive
421 @interactive
422 def findall(pat, s):
422 def findall(pat, s):
423 # this globals() step isn't necessary in real code
423 # this globals() step isn't necessary in real code
424 # only to prevent a closure in the test
424 # only to prevent a closure in the test
425 re = globals()['re']
425 re = globals()['re']
426 return re.findall(pat, s)
426 return re.findall(pat, s)
427
427
428 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
428 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
429
429
430 def test_unicode_execute(self):
430 def test_unicode_execute(self):
431 """test executing unicode strings"""
431 """test executing unicode strings"""
432 v = self.client[-1]
432 v = self.client[-1]
433 v.block=True
433 v.block=True
434 if sys.version_info[0] >= 3:
434 if sys.version_info[0] >= 3:
435 code="a='é'"
435 code="a='é'"
436 else:
436 else:
437 code=u"a=u'é'"
437 code=u"a=u'é'"
438 v.execute(code)
438 v.execute(code)
439 self.assertEqual(v['a'], u'é')
439 self.assertEqual(v['a'], u'é')
440
440
441 def test_unicode_apply_result(self):
441 def test_unicode_apply_result(self):
442 """test unicode apply results"""
442 """test unicode apply results"""
443 v = self.client[-1]
443 v = self.client[-1]
444 r = v.apply_sync(lambda : u'é')
444 r = v.apply_sync(lambda : u'é')
445 self.assertEqual(r, u'é')
445 self.assertEqual(r, u'é')
446
446
447 def test_unicode_apply_arg(self):
447 def test_unicode_apply_arg(self):
448 """test passing unicode arguments to apply"""
448 """test passing unicode arguments to apply"""
449 v = self.client[-1]
449 v = self.client[-1]
450
450
451 @interactive
451 @interactive
452 def check_unicode(a, check):
452 def check_unicode(a, check):
453 assert isinstance(a, unicode), "%r is not unicode"%a
453 assert isinstance(a, unicode), "%r is not unicode"%a
454 assert isinstance(check, bytes), "%r is not bytes"%check
454 assert isinstance(check, bytes), "%r is not bytes"%check
455 assert a.encode('utf8') == check, "%s != %s"%(a,check)
455 assert a.encode('utf8') == check, "%s != %s"%(a,check)
456
456
457 for s in [ u'é', u'ßø®∫',u'asdf' ]:
457 for s in [ u'é', u'ßø®∫',u'asdf' ]:
458 try:
458 try:
459 v.apply_sync(check_unicode, s, s.encode('utf8'))
459 v.apply_sync(check_unicode, s, s.encode('utf8'))
460 except error.RemoteError as e:
460 except error.RemoteError as e:
461 if e.ename == 'AssertionError':
461 if e.ename == 'AssertionError':
462 self.fail(e.evalue)
462 self.fail(e.evalue)
463 else:
463 else:
464 raise e
464 raise e
465
465
466 def test_map_reference(self):
466 def test_map_reference(self):
467 """view.map(<Reference>, *seqs) should work"""
467 """view.map(<Reference>, *seqs) should work"""
468 v = self.client[:]
468 v = self.client[:]
469 v.scatter('n', self.client.ids, flatten=True)
469 v.scatter('n', self.client.ids, flatten=True)
470 v.execute("f = lambda x,y: x*y")
470 v.execute("f = lambda x,y: x*y")
471 rf = pmod.Reference('f')
471 rf = pmod.Reference('f')
472 nlist = list(range(10))
472 nlist = list(range(10))
473 mlist = nlist[::-1]
473 mlist = nlist[::-1]
474 expected = [ m*n for m,n in zip(mlist, nlist) ]
474 expected = [ m*n for m,n in zip(mlist, nlist) ]
475 result = v.map_sync(rf, mlist, nlist)
475 result = v.map_sync(rf, mlist, nlist)
476 self.assertEqual(result, expected)
476 self.assertEqual(result, expected)
477
477
478 def test_apply_reference(self):
478 def test_apply_reference(self):
479 """view.apply(<Reference>, *args) should work"""
479 """view.apply(<Reference>, *args) should work"""
480 v = self.client[:]
480 v = self.client[:]
481 v.scatter('n', self.client.ids, flatten=True)
481 v.scatter('n', self.client.ids, flatten=True)
482 v.execute("f = lambda x: n*x")
482 v.execute("f = lambda x: n*x")
483 rf = pmod.Reference('f')
483 rf = pmod.Reference('f')
484 result = v.apply_sync(rf, 5)
484 result = v.apply_sync(rf, 5)
485 expected = [ 5*id for id in self.client.ids ]
485 expected = [ 5*id for id in self.client.ids ]
486 self.assertEqual(result, expected)
486 self.assertEqual(result, expected)
487
487
488 def test_eval_reference(self):
488 def test_eval_reference(self):
489 v = self.client[self.client.ids[0]]
489 v = self.client[self.client.ids[0]]
490 v['g'] = range(5)
490 v['g'] = range(5)
491 rg = pmod.Reference('g[0]')
491 rg = pmod.Reference('g[0]')
492 echo = lambda x:x
492 echo = lambda x:x
493 self.assertEqual(v.apply_sync(echo, rg), 0)
493 self.assertEqual(v.apply_sync(echo, rg), 0)
494
494
495 def test_reference_nameerror(self):
495 def test_reference_nameerror(self):
496 v = self.client[self.client.ids[0]]
496 v = self.client[self.client.ids[0]]
497 r = pmod.Reference('elvis_has_left')
497 r = pmod.Reference('elvis_has_left')
498 echo = lambda x:x
498 echo = lambda x:x
499 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
499 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
500
500
501 def test_single_engine_map(self):
501 def test_single_engine_map(self):
502 e0 = self.client[self.client.ids[0]]
502 e0 = self.client[self.client.ids[0]]
503 r = range(5)
503 r = range(5)
504 check = [ -1*i for i in r ]
504 check = [ -1*i for i in r ]
505 result = e0.map_sync(lambda x: -1*x, r)
505 result = e0.map_sync(lambda x: -1*x, r)
506 self.assertEqual(result, check)
506 self.assertEqual(result, check)
507
507
508 def test_len(self):
508 def test_len(self):
509 """len(view) makes sense"""
509 """len(view) makes sense"""
510 e0 = self.client[self.client.ids[0]]
510 e0 = self.client[self.client.ids[0]]
511 self.assertEqual(len(e0), 1)
511 self.assertEqual(len(e0), 1)
512 v = self.client[:]
512 v = self.client[:]
513 self.assertEqual(len(v), len(self.client.ids))
513 self.assertEqual(len(v), len(self.client.ids))
514 v = self.client.direct_view('all')
514 v = self.client.direct_view('all')
515 self.assertEqual(len(v), len(self.client.ids))
515 self.assertEqual(len(v), len(self.client.ids))
516 v = self.client[:2]
516 v = self.client[:2]
517 self.assertEqual(len(v), 2)
517 self.assertEqual(len(v), 2)
518 v = self.client[:1]
518 v = self.client[:1]
519 self.assertEqual(len(v), 1)
519 self.assertEqual(len(v), 1)
520 v = self.client.load_balanced_view()
520 v = self.client.load_balanced_view()
521 self.assertEqual(len(v), len(self.client.ids))
521 self.assertEqual(len(v), len(self.client.ids))
522
522
523
523
524 # begin execute tests
524 # begin execute tests
525
525
526 def test_execute_reply(self):
526 def test_execute_reply(self):
527 e0 = self.client[self.client.ids[0]]
527 e0 = self.client[self.client.ids[0]]
528 e0.block = True
528 e0.block = True
529 ar = e0.execute("5", silent=False)
529 ar = e0.execute("5", silent=False)
530 er = ar.get()
530 er = ar.get()
531 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
531 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
532 self.assertEqual(er.pyout['data']['text/plain'], '5')
532 self.assertEqual(er.pyout['data']['text/plain'], '5')
533
533
534 def test_execute_reply_rich(self):
534 def test_execute_reply_rich(self):
535 e0 = self.client[self.client.ids[0]]
535 e0 = self.client[self.client.ids[0]]
536 e0.block = True
536 e0.block = True
537 e0.execute("from IPython.display import Image, HTML")
537 e0.execute("from IPython.display import Image, HTML")
538 ar = e0.execute("Image(data=b'garbage', format='png', width=10)", silent=False)
538 ar = e0.execute("Image(data=b'garbage', format='png', width=10)", silent=False)
539 er = ar.get()
539 er = ar.get()
540 b64data = base64.encodestring(b'garbage').decode('ascii')
540 b64data = base64.encodestring(b'garbage').decode('ascii')
541 self.assertEqual(er._repr_png_(), (b64data, dict(width=10)))
541 self.assertEqual(er._repr_png_(), (b64data, dict(width=10)))
542 ar = e0.execute("HTML('<b>bold</b>')", silent=False)
542 ar = e0.execute("HTML('<b>bold</b>')", silent=False)
543 er = ar.get()
543 er = ar.get()
544 self.assertEqual(er._repr_html_(), "<b>bold</b>")
544 self.assertEqual(er._repr_html_(), "<b>bold</b>")
545
545
546 def test_execute_reply_stdout(self):
546 def test_execute_reply_stdout(self):
547 e0 = self.client[self.client.ids[0]]
547 e0 = self.client[self.client.ids[0]]
548 e0.block = True
548 e0.block = True
549 ar = e0.execute("print (5)", silent=False)
549 ar = e0.execute("print (5)", silent=False)
550 er = ar.get()
550 er = ar.get()
551 self.assertEqual(er.stdout.strip(), '5')
551 self.assertEqual(er.stdout.strip(), '5')
552
552
553 def test_execute_pyout(self):
553 def test_execute_pyout(self):
554 """execute triggers pyout with silent=False"""
554 """execute triggers pyout with silent=False"""
555 view = self.client[:]
555 view = self.client[:]
556 ar = view.execute("5", silent=False, block=True)
556 ar = view.execute("5", silent=False, block=True)
557
557
558 expected = [{'text/plain' : '5'}] * len(view)
558 expected = [{'text/plain' : '5'}] * len(view)
559 mimes = [ out['data'] for out in ar.pyout ]
559 mimes = [ out['data'] for out in ar.pyout ]
560 self.assertEqual(mimes, expected)
560 self.assertEqual(mimes, expected)
561
561
562 def test_execute_silent(self):
562 def test_execute_silent(self):
563 """execute does not trigger pyout with silent=True"""
563 """execute does not trigger pyout with silent=True"""
564 view = self.client[:]
564 view = self.client[:]
565 ar = view.execute("5", block=True)
565 ar = view.execute("5", block=True)
566 expected = [None] * len(view)
566 expected = [None] * len(view)
567 self.assertEqual(ar.pyout, expected)
567 self.assertEqual(ar.pyout, expected)
568
568
569 def test_execute_magic(self):
569 def test_execute_magic(self):
570 """execute accepts IPython commands"""
570 """execute accepts IPython commands"""
571 view = self.client[:]
571 view = self.client[:]
572 view.execute("a = 5")
572 view.execute("a = 5")
573 ar = view.execute("%whos", block=True)
573 ar = view.execute("%whos", block=True)
574 # this will raise, if that failed
574 # this will raise, if that failed
575 ar.get(5)
575 ar.get(5)
576 for stdout in ar.stdout:
576 for stdout in ar.stdout:
577 lines = stdout.splitlines()
577 lines = stdout.splitlines()
578 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
578 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
579 found = False
579 found = False
580 for line in lines[2:]:
580 for line in lines[2:]:
581 split = line.split()
581 split = line.split()
582 if split == ['a', 'int', '5']:
582 if split == ['a', 'int', '5']:
583 found = True
583 found = True
584 break
584 break
585 self.assertTrue(found, "whos output wrong: %s" % stdout)
585 self.assertTrue(found, "whos output wrong: %s" % stdout)
586
586
587 def test_execute_displaypub(self):
587 def test_execute_displaypub(self):
588 """execute tracks display_pub output"""
588 """execute tracks display_pub output"""
589 view = self.client[:]
589 view = self.client[:]
590 view.execute("from IPython.core.display import *")
590 view.execute("from IPython.core.display import *")
591 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
591 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
592
592
593 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
593 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
594 for outputs in ar.outputs:
594 for outputs in ar.outputs:
595 mimes = [ out['data'] for out in outputs ]
595 mimes = [ out['data'] for out in outputs ]
596 self.assertEqual(mimes, expected)
596 self.assertEqual(mimes, expected)
597
597
598 def test_apply_displaypub(self):
598 def test_apply_displaypub(self):
599 """apply tracks display_pub output"""
599 """apply tracks display_pub output"""
600 view = self.client[:]
600 view = self.client[:]
601 view.execute("from IPython.core.display import *")
601 view.execute("from IPython.core.display import *")
602
602
603 @interactive
603 @interactive
604 def publish():
604 def publish():
605 [ display(i) for i in range(5) ]
605 [ display(i) for i in range(5) ]
606
606
607 ar = view.apply_async(publish)
607 ar = view.apply_async(publish)
608 ar.get(5)
608 ar.get(5)
609 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
609 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
610 for outputs in ar.outputs:
610 for outputs in ar.outputs:
611 mimes = [ out['data'] for out in outputs ]
611 mimes = [ out['data'] for out in outputs ]
612 self.assertEqual(mimes, expected)
612 self.assertEqual(mimes, expected)
613
613
614 def test_execute_raises(self):
614 def test_execute_raises(self):
615 """exceptions in execute requests raise appropriately"""
615 """exceptions in execute requests raise appropriately"""
616 view = self.client[-1]
616 view = self.client[-1]
617 ar = view.execute("1/0")
617 ar = view.execute("1/0")
618 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
618 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
619
619
620 def test_remoteerror_render_exception(self):
620 def test_remoteerror_render_exception(self):
621 """RemoteErrors get nice tracebacks"""
621 """RemoteErrors get nice tracebacks"""
622 view = self.client[-1]
622 view = self.client[-1]
623 ar = view.execute("1/0")
623 ar = view.execute("1/0")
624 ip = get_ipython()
624 ip = get_ipython()
625 ip.user_ns['ar'] = ar
625 ip.user_ns['ar'] = ar
626 with capture_output() as io:
626 with capture_output() as io:
627 ip.run_cell("ar.get(2)")
627 ip.run_cell("ar.get(2)")
628
628
629 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
629 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
630
630
631 def test_compositeerror_render_exception(self):
631 def test_compositeerror_render_exception(self):
632 """CompositeErrors get nice tracebacks"""
632 """CompositeErrors get nice tracebacks"""
633 view = self.client[:]
633 view = self.client[:]
634 ar = view.execute("1/0")
634 ar = view.execute("1/0")
635 ip = get_ipython()
635 ip = get_ipython()
636 ip.user_ns['ar'] = ar
636 ip.user_ns['ar'] = ar
637
637
638 with capture_output() as io:
638 with capture_output() as io:
639 ip.run_cell("ar.get(2)")
639 ip.run_cell("ar.get(2)")
640
640
641 count = min(error.CompositeError.tb_limit, len(view))
641 count = min(error.CompositeError.tb_limit, len(view))
642
642
643 self.assertEqual(io.stdout.count('ZeroDivisionError'), count * 2, io.stdout)
643 self.assertEqual(io.stdout.count('ZeroDivisionError'), count * 2, io.stdout)
644 self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
644 self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
645 self.assertEqual(io.stdout.count(':execute'), count, io.stdout)
645 self.assertEqual(io.stdout.count(':execute'), count, io.stdout)
646
646
647 def test_compositeerror_truncate(self):
647 def test_compositeerror_truncate(self):
648 """Truncate CompositeErrors with many exceptions"""
648 """Truncate CompositeErrors with many exceptions"""
649 view = self.client[:]
649 view = self.client[:]
650 msg_ids = []
650 msg_ids = []
651 for i in range(10):
651 for i in range(10):
652 ar = view.execute("1/0")
652 ar = view.execute("1/0")
653 msg_ids.extend(ar.msg_ids)
653 msg_ids.extend(ar.msg_ids)
654
654
655 ar = self.client.get_result(msg_ids)
655 ar = self.client.get_result(msg_ids)
656 try:
656 try:
657 ar.get()
657 ar.get()
658 except error.CompositeError as _e:
658 except error.CompositeError as _e:
659 e = _e
659 e = _e
660 else:
660 else:
661 self.fail("Should have raised CompositeError")
661 self.fail("Should have raised CompositeError")
662
662
663 lines = e.render_traceback()
663 lines = e.render_traceback()
664 with capture_output() as io:
664 with capture_output() as io:
665 e.print_traceback()
665 e.print_traceback()
666
666
667 self.assertTrue("more exceptions" in lines[-1])
667 self.assertTrue("more exceptions" in lines[-1])
668 count = e.tb_limit
668 count = e.tb_limit
669
669
670 self.assertEqual(io.stdout.count('ZeroDivisionError'), 2 * count, io.stdout)
670 self.assertEqual(io.stdout.count('ZeroDivisionError'), 2 * count, io.stdout)
671 self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
671 self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
672 self.assertEqual(io.stdout.count(':execute'), count, io.stdout)
672 self.assertEqual(io.stdout.count(':execute'), count, io.stdout)
673
673
674 @dec.skipif_not_matplotlib
674 @dec.skipif_not_matplotlib
675 def test_magic_pylab(self):
675 def test_magic_pylab(self):
676 """%pylab works on engines"""
676 """%pylab works on engines"""
677 view = self.client[-1]
677 view = self.client[-1]
678 ar = view.execute("%pylab inline")
678 ar = view.execute("%pylab inline")
679 # at least check if this raised:
679 # at least check if this raised:
680 reply = ar.get(5)
680 reply = ar.get(5)
681 # include imports, in case user config
681 # include imports, in case user config
682 ar = view.execute("plot(rand(100))", silent=False)
682 ar = view.execute("plot(rand(100))", silent=False)
683 reply = ar.get(5)
683 reply = ar.get(5)
684 self.assertEqual(len(reply.outputs), 1)
684 self.assertEqual(len(reply.outputs), 1)
685 output = reply.outputs[0]
685 output = reply.outputs[0]
686 self.assertTrue("data" in output)
686 self.assertTrue("data" in output)
687 data = output['data']
687 data = output['data']
688 self.assertTrue("image/png" in data)
688 self.assertTrue("image/png" in data)
689
689
690 def test_func_default_func(self):
690 def test_func_default_func(self):
691 """interactively defined function as apply func default"""
691 """interactively defined function as apply func default"""
692 def foo():
692 def foo():
693 return 'foo'
693 return 'foo'
694
694
695 def bar(f=foo):
695 def bar(f=foo):
696 return f()
696 return f()
697
697
698 view = self.client[-1]
698 view = self.client[-1]
699 ar = view.apply_async(bar)
699 ar = view.apply_async(bar)
700 r = ar.get(10)
700 r = ar.get(10)
701 self.assertEqual(r, 'foo')
701 self.assertEqual(r, 'foo')
702 def test_data_pub_single(self):
702 def test_data_pub_single(self):
703 view = self.client[-1]
703 view = self.client[-1]
704 ar = view.execute('\n'.join([
704 ar = view.execute('\n'.join([
705 'from IPython.kernel.zmq.datapub import publish_data',
705 'from IPython.kernel.zmq.datapub import publish_data',
706 'for i in range(5):',
706 'for i in range(5):',
707 ' publish_data(dict(i=i))'
707 ' publish_data(dict(i=i))'
708 ]), block=False)
708 ]), block=False)
709 self.assertTrue(isinstance(ar.data, dict))
709 self.assertTrue(isinstance(ar.data, dict))
710 ar.get(5)
710 ar.get(5)
711 self.assertEqual(ar.data, dict(i=4))
711 self.assertEqual(ar.data, dict(i=4))
712
712
713 def test_data_pub(self):
713 def test_data_pub(self):
714 view = self.client[:]
714 view = self.client[:]
715 ar = view.execute('\n'.join([
715 ar = view.execute('\n'.join([
716 'from IPython.kernel.zmq.datapub import publish_data',
716 'from IPython.kernel.zmq.datapub import publish_data',
717 'for i in range(5):',
717 'for i in range(5):',
718 ' publish_data(dict(i=i))'
718 ' publish_data(dict(i=i))'
719 ]), block=False)
719 ]), block=False)
720 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
720 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
721 ar.get(5)
721 ar.get(5)
722 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
722 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
723
723
724 def test_can_list_arg(self):
724 def test_can_list_arg(self):
725 """args in lists are canned"""
725 """args in lists are canned"""
726 view = self.client[-1]
726 view = self.client[-1]
727 view['a'] = 128
727 view['a'] = 128
728 rA = pmod.Reference('a')
728 rA = pmod.Reference('a')
729 ar = view.apply_async(lambda x: x, [rA])
729 ar = view.apply_async(lambda x: x, [rA])
730 r = ar.get(5)
730 r = ar.get(5)
731 self.assertEqual(r, [128])
731 self.assertEqual(r, [128])
732
732
733 def test_can_dict_arg(self):
733 def test_can_dict_arg(self):
734 """args in dicts are canned"""
734 """args in dicts are canned"""
735 view = self.client[-1]
735 view = self.client[-1]
736 view['a'] = 128
736 view['a'] = 128
737 rA = pmod.Reference('a')
737 rA = pmod.Reference('a')
738 ar = view.apply_async(lambda x: x, dict(foo=rA))
738 ar = view.apply_async(lambda x: x, dict(foo=rA))
739 r = ar.get(5)
739 r = ar.get(5)
740 self.assertEqual(r, dict(foo=128))
740 self.assertEqual(r, dict(foo=128))
741
741
742 def test_can_list_kwarg(self):
742 def test_can_list_kwarg(self):
743 """kwargs in lists are canned"""
743 """kwargs in lists are canned"""
744 view = self.client[-1]
744 view = self.client[-1]
745 view['a'] = 128
745 view['a'] = 128
746 rA = pmod.Reference('a')
746 rA = pmod.Reference('a')
747 ar = view.apply_async(lambda x=5: x, x=[rA])
747 ar = view.apply_async(lambda x=5: x, x=[rA])
748 r = ar.get(5)
748 r = ar.get(5)
749 self.assertEqual(r, [128])
749 self.assertEqual(r, [128])
750
750
751 def test_can_dict_kwarg(self):
751 def test_can_dict_kwarg(self):
752 """kwargs in dicts are canned"""
752 """kwargs in dicts are canned"""
753 view = self.client[-1]
753 view = self.client[-1]
754 view['a'] = 128
754 view['a'] = 128
755 rA = pmod.Reference('a')
755 rA = pmod.Reference('a')
756 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
756 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
757 r = ar.get(5)
757 r = ar.get(5)
758 self.assertEqual(r, dict(foo=128))
758 self.assertEqual(r, dict(foo=128))
759
759
760 def test_map_ref(self):
760 def test_map_ref(self):
761 """view.map works with references"""
761 """view.map works with references"""
762 view = self.client[:]
762 view = self.client[:]
763 ranks = sorted(self.client.ids)
763 ranks = sorted(self.client.ids)
764 view.scatter('rank', ranks, flatten=True)
764 view.scatter('rank', ranks, flatten=True)
765 rrank = pmod.Reference('rank')
765 rrank = pmod.Reference('rank')
766
766
767 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
767 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
768 drank = amr.get(5)
768 drank = amr.get(5)
769 self.assertEqual(drank, [ r*2 for r in ranks ])
769 self.assertEqual(drank, [ r*2 for r in ranks ])
770
770
771 def test_nested_getitem_setitem(self):
771 def test_nested_getitem_setitem(self):
772 """get and set with view['a.b']"""
772 """get and set with view['a.b']"""
773 view = self.client[-1]
773 view = self.client[-1]
774 view.execute('\n'.join([
774 view.execute('\n'.join([
775 'class A(object): pass',
775 'class A(object): pass',
776 'a = A()',
776 'a = A()',
777 'a.b = 128',
777 'a.b = 128',
778 ]), block=True)
778 ]), block=True)
779 ra = pmod.Reference('a')
779 ra = pmod.Reference('a')
780
780
781 r = view.apply_sync(lambda x: x.b, ra)
781 r = view.apply_sync(lambda x: x.b, ra)
782 self.assertEqual(r, 128)
782 self.assertEqual(r, 128)
783 self.assertEqual(view['a.b'], 128)
783 self.assertEqual(view['a.b'], 128)
784
784
785 view['a.b'] = 0
785 view['a.b'] = 0
786
786
787 r = view.apply_sync(lambda x: x.b, ra)
787 r = view.apply_sync(lambda x: x.b, ra)
788 self.assertEqual(r, 0)
788 self.assertEqual(r, 0)
789 self.assertEqual(view['a.b'], 0)
789 self.assertEqual(view['a.b'], 0)
790
790
791 def test_return_namedtuple(self):
791 def test_return_namedtuple(self):
792 def namedtuplify(x, y):
792 def namedtuplify(x, y):
793 from IPython.parallel.tests.test_view import point
793 from IPython.parallel.tests.test_view import point
794 return point(x, y)
794 return point(x, y)
795
795
796 view = self.client[-1]
796 view = self.client[-1]
797 p = view.apply_sync(namedtuplify, 1, 2)
797 p = view.apply_sync(namedtuplify, 1, 2)
798 self.assertEqual(p.x, 1)
798 self.assertEqual(p.x, 1)
799 self.assertEqual(p.y, 2)
799 self.assertEqual(p.y, 2)
800
800
801 def test_apply_namedtuple(self):
801 def test_apply_namedtuple(self):
802 def echoxy(p):
802 def echoxy(p):
803 return p.y, p.x
803 return p.y, p.x
804
804
805 view = self.client[-1]
805 view = self.client[-1]
806 tup = view.apply_sync(echoxy, point(1, 2))
806 tup = view.apply_sync(echoxy, point(1, 2))
807 self.assertEqual(tup, (2,1))
807 self.assertEqual(tup, (2,1))
808
809 def test_sync_imports(self):
810 view = self.client[-1]
811 with capture_output() as io:
812 with view.sync_imports():
813 import IPython
814 self.assertIn("IPython", io.stdout)
815
816 @interactive
817 def find_ipython():
818 return 'IPython' in globals()
819
820 assert view.apply_sync(find_ipython)
821
822 def test_sync_imports_quiet(self):
823 view = self.client[-1]
824 with capture_output() as io:
825 with view.sync_imports(quiet=True):
826 import IPython
827 self.assertEqual(io.stdout, '')
828
829 @interactive
830 def find_ipython():
831 return 'IPython' in globals()
832
833 assert view.apply_sync(find_ipython)
808
834
General Comments 0
You need to be logged in to leave comments. Login now