##// END OF EJS Templates
test namedtuple as return and apply arg
MinRK -
Show More
@@ -1,737 +1,759 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test View objects
2 """test View objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import platform
20 import platform
21 import time
21 import time
22 from collections import namedtuple
22 from tempfile import mktemp
23 from tempfile import mktemp
23 from StringIO import StringIO
24 from StringIO import StringIO
24
25
25 import zmq
26 import zmq
26 from nose import SkipTest
27 from nose import SkipTest
27 from nose.plugins.attrib import attr
28 from nose.plugins.attrib import attr
28
29
29 from IPython.testing import decorators as dec
30 from IPython.testing import decorators as dec
30 from IPython.testing.ipunittest import ParametricTestCase
31 from IPython.testing.ipunittest import ParametricTestCase
31 from IPython.utils.io import capture_output
32 from IPython.utils.io import capture_output
32
33
33 from IPython import parallel as pmod
34 from IPython import parallel as pmod
34 from IPython.parallel import error
35 from IPython.parallel import error
35 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
36 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
36 from IPython.parallel import DirectView
37 from IPython.parallel import DirectView
37 from IPython.parallel.util import interactive
38 from IPython.parallel.util import interactive
38
39
39 from IPython.parallel.tests import add_engines
40 from IPython.parallel.tests import add_engines
40
41
41 from .clienttest import ClusterTestCase, crash, wait, skip_without
42 from .clienttest import ClusterTestCase, crash, wait, skip_without
42
43
43 def setup():
44 def setup():
44 add_engines(3, total=True)
45 add_engines(3, total=True)
45
46
47 point = namedtuple("point", "x y")
48
46 class TestView(ClusterTestCase, ParametricTestCase):
49 class TestView(ClusterTestCase, ParametricTestCase):
47
50
48 def setUp(self):
51 def setUp(self):
49 # On Win XP, wait for resource cleanup, else parallel test group fails
52 # On Win XP, wait for resource cleanup, else parallel test group fails
50 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
53 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
51 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
54 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
52 time.sleep(2)
55 time.sleep(2)
53 super(TestView, self).setUp()
56 super(TestView, self).setUp()
54
57
55 @attr('crash')
58 @attr('crash')
56 def test_z_crash_mux(self):
59 def test_z_crash_mux(self):
57 """test graceful handling of engine death (direct)"""
60 """test graceful handling of engine death (direct)"""
58 # self.add_engines(1)
61 # self.add_engines(1)
59 eid = self.client.ids[-1]
62 eid = self.client.ids[-1]
60 ar = self.client[eid].apply_async(crash)
63 ar = self.client[eid].apply_async(crash)
61 self.assertRaisesRemote(error.EngineError, ar.get, 10)
64 self.assertRaisesRemote(error.EngineError, ar.get, 10)
62 eid = ar.engine_id
65 eid = ar.engine_id
63 tic = time.time()
66 tic = time.time()
64 while eid in self.client.ids and time.time()-tic < 5:
67 while eid in self.client.ids and time.time()-tic < 5:
65 time.sleep(.01)
68 time.sleep(.01)
66 self.client.spin()
69 self.client.spin()
67 self.assertFalse(eid in self.client.ids, "Engine should have died")
70 self.assertFalse(eid in self.client.ids, "Engine should have died")
68
71
69 def test_push_pull(self):
72 def test_push_pull(self):
70 """test pushing and pulling"""
73 """test pushing and pulling"""
71 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
74 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
72 t = self.client.ids[-1]
75 t = self.client.ids[-1]
73 v = self.client[t]
76 v = self.client[t]
74 push = v.push
77 push = v.push
75 pull = v.pull
78 pull = v.pull
76 v.block=True
79 v.block=True
77 nengines = len(self.client)
80 nengines = len(self.client)
78 push({'data':data})
81 push({'data':data})
79 d = pull('data')
82 d = pull('data')
80 self.assertEqual(d, data)
83 self.assertEqual(d, data)
81 self.client[:].push({'data':data})
84 self.client[:].push({'data':data})
82 d = self.client[:].pull('data', block=True)
85 d = self.client[:].pull('data', block=True)
83 self.assertEqual(d, nengines*[data])
86 self.assertEqual(d, nengines*[data])
84 ar = push({'data':data}, block=False)
87 ar = push({'data':data}, block=False)
85 self.assertTrue(isinstance(ar, AsyncResult))
88 self.assertTrue(isinstance(ar, AsyncResult))
86 r = ar.get()
89 r = ar.get()
87 ar = self.client[:].pull('data', block=False)
90 ar = self.client[:].pull('data', block=False)
88 self.assertTrue(isinstance(ar, AsyncResult))
91 self.assertTrue(isinstance(ar, AsyncResult))
89 r = ar.get()
92 r = ar.get()
90 self.assertEqual(r, nengines*[data])
93 self.assertEqual(r, nengines*[data])
91 self.client[:].push(dict(a=10,b=20))
94 self.client[:].push(dict(a=10,b=20))
92 r = self.client[:].pull(('a','b'), block=True)
95 r = self.client[:].pull(('a','b'), block=True)
93 self.assertEqual(r, nengines*[[10,20]])
96 self.assertEqual(r, nengines*[[10,20]])
94
97
95 def test_push_pull_function(self):
98 def test_push_pull_function(self):
96 "test pushing and pulling functions"
99 "test pushing and pulling functions"
97 def testf(x):
100 def testf(x):
98 return 2.0*x
101 return 2.0*x
99
102
100 t = self.client.ids[-1]
103 t = self.client.ids[-1]
101 v = self.client[t]
104 v = self.client[t]
102 v.block=True
105 v.block=True
103 push = v.push
106 push = v.push
104 pull = v.pull
107 pull = v.pull
105 execute = v.execute
108 execute = v.execute
106 push({'testf':testf})
109 push({'testf':testf})
107 r = pull('testf')
110 r = pull('testf')
108 self.assertEqual(r(1.0), testf(1.0))
111 self.assertEqual(r(1.0), testf(1.0))
109 execute('r = testf(10)')
112 execute('r = testf(10)')
110 r = pull('r')
113 r = pull('r')
111 self.assertEqual(r, testf(10))
114 self.assertEqual(r, testf(10))
112 ar = self.client[:].push({'testf':testf}, block=False)
115 ar = self.client[:].push({'testf':testf}, block=False)
113 ar.get()
116 ar.get()
114 ar = self.client[:].pull('testf', block=False)
117 ar = self.client[:].pull('testf', block=False)
115 rlist = ar.get()
118 rlist = ar.get()
116 for r in rlist:
119 for r in rlist:
117 self.assertEqual(r(1.0), testf(1.0))
120 self.assertEqual(r(1.0), testf(1.0))
118 execute("def g(x): return x*x")
121 execute("def g(x): return x*x")
119 r = pull(('testf','g'))
122 r = pull(('testf','g'))
120 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
123 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
121
124
122 def test_push_function_globals(self):
125 def test_push_function_globals(self):
123 """test that pushed functions have access to globals"""
126 """test that pushed functions have access to globals"""
124 @interactive
127 @interactive
125 def geta():
128 def geta():
126 return a
129 return a
127 # self.add_engines(1)
130 # self.add_engines(1)
128 v = self.client[-1]
131 v = self.client[-1]
129 v.block=True
132 v.block=True
130 v['f'] = geta
133 v['f'] = geta
131 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
134 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
132 v.execute('a=5')
135 v.execute('a=5')
133 v.execute('b=f()')
136 v.execute('b=f()')
134 self.assertEqual(v['b'], 5)
137 self.assertEqual(v['b'], 5)
135
138
136 def test_push_function_defaults(self):
139 def test_push_function_defaults(self):
137 """test that pushed functions preserve default args"""
140 """test that pushed functions preserve default args"""
138 def echo(a=10):
141 def echo(a=10):
139 return a
142 return a
140 v = self.client[-1]
143 v = self.client[-1]
141 v.block=True
144 v.block=True
142 v['f'] = echo
145 v['f'] = echo
143 v.execute('b=f()')
146 v.execute('b=f()')
144 self.assertEqual(v['b'], 10)
147 self.assertEqual(v['b'], 10)
145
148
146 def test_get_result(self):
149 def test_get_result(self):
147 """test getting results from the Hub."""
150 """test getting results from the Hub."""
148 c = pmod.Client(profile='iptest')
151 c = pmod.Client(profile='iptest')
149 # self.add_engines(1)
152 # self.add_engines(1)
150 t = c.ids[-1]
153 t = c.ids[-1]
151 v = c[t]
154 v = c[t]
152 v2 = self.client[t]
155 v2 = self.client[t]
153 ar = v.apply_async(wait, 1)
156 ar = v.apply_async(wait, 1)
154 # give the monitor time to notice the message
157 # give the monitor time to notice the message
155 time.sleep(.25)
158 time.sleep(.25)
156 ahr = v2.get_result(ar.msg_ids)
159 ahr = v2.get_result(ar.msg_ids)
157 self.assertTrue(isinstance(ahr, AsyncHubResult))
160 self.assertTrue(isinstance(ahr, AsyncHubResult))
158 self.assertEqual(ahr.get(), ar.get())
161 self.assertEqual(ahr.get(), ar.get())
159 ar2 = v2.get_result(ar.msg_ids)
162 ar2 = v2.get_result(ar.msg_ids)
160 self.assertFalse(isinstance(ar2, AsyncHubResult))
163 self.assertFalse(isinstance(ar2, AsyncHubResult))
161 c.spin()
164 c.spin()
162 c.close()
165 c.close()
163
166
164 def test_run_newline(self):
167 def test_run_newline(self):
165 """test that run appends newline to files"""
168 """test that run appends newline to files"""
166 tmpfile = mktemp()
169 tmpfile = mktemp()
167 with open(tmpfile, 'w') as f:
170 with open(tmpfile, 'w') as f:
168 f.write("""def g():
171 f.write("""def g():
169 return 5
172 return 5
170 """)
173 """)
171 v = self.client[-1]
174 v = self.client[-1]
172 v.run(tmpfile, block=True)
175 v.run(tmpfile, block=True)
173 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
176 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
174
177
175 def test_apply_tracked(self):
178 def test_apply_tracked(self):
176 """test tracking for apply"""
179 """test tracking for apply"""
177 # self.add_engines(1)
180 # self.add_engines(1)
178 t = self.client.ids[-1]
181 t = self.client.ids[-1]
179 v = self.client[t]
182 v = self.client[t]
180 v.block=False
183 v.block=False
181 def echo(n=1024*1024, **kwargs):
184 def echo(n=1024*1024, **kwargs):
182 with v.temp_flags(**kwargs):
185 with v.temp_flags(**kwargs):
183 return v.apply(lambda x: x, 'x'*n)
186 return v.apply(lambda x: x, 'x'*n)
184 ar = echo(1, track=False)
187 ar = echo(1, track=False)
185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
186 self.assertTrue(ar.sent)
189 self.assertTrue(ar.sent)
187 ar = echo(track=True)
190 ar = echo(track=True)
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 self.assertEqual(ar.sent, ar._tracker.done)
192 self.assertEqual(ar.sent, ar._tracker.done)
190 ar._tracker.wait()
193 ar._tracker.wait()
191 self.assertTrue(ar.sent)
194 self.assertTrue(ar.sent)
192
195
193 def test_push_tracked(self):
196 def test_push_tracked(self):
194 t = self.client.ids[-1]
197 t = self.client.ids[-1]
195 ns = dict(x='x'*1024*1024)
198 ns = dict(x='x'*1024*1024)
196 v = self.client[t]
199 v = self.client[t]
197 ar = v.push(ns, block=False, track=False)
200 ar = v.push(ns, block=False, track=False)
198 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
199 self.assertTrue(ar.sent)
202 self.assertTrue(ar.sent)
200
203
201 ar = v.push(ns, block=False, track=True)
204 ar = v.push(ns, block=False, track=True)
202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 ar._tracker.wait()
206 ar._tracker.wait()
204 self.assertEqual(ar.sent, ar._tracker.done)
207 self.assertEqual(ar.sent, ar._tracker.done)
205 self.assertTrue(ar.sent)
208 self.assertTrue(ar.sent)
206 ar.get()
209 ar.get()
207
210
208 def test_scatter_tracked(self):
211 def test_scatter_tracked(self):
209 t = self.client.ids
212 t = self.client.ids
210 x='x'*1024*1024
213 x='x'*1024*1024
211 ar = self.client[t].scatter('x', x, block=False, track=False)
214 ar = self.client[t].scatter('x', x, block=False, track=False)
212 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
215 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
213 self.assertTrue(ar.sent)
216 self.assertTrue(ar.sent)
214
217
215 ar = self.client[t].scatter('x', x, block=False, track=True)
218 ar = self.client[t].scatter('x', x, block=False, track=True)
216 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
219 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
217 self.assertEqual(ar.sent, ar._tracker.done)
220 self.assertEqual(ar.sent, ar._tracker.done)
218 ar._tracker.wait()
221 ar._tracker.wait()
219 self.assertTrue(ar.sent)
222 self.assertTrue(ar.sent)
220 ar.get()
223 ar.get()
221
224
222 def test_remote_reference(self):
225 def test_remote_reference(self):
223 v = self.client[-1]
226 v = self.client[-1]
224 v['a'] = 123
227 v['a'] = 123
225 ra = pmod.Reference('a')
228 ra = pmod.Reference('a')
226 b = v.apply_sync(lambda x: x, ra)
229 b = v.apply_sync(lambda x: x, ra)
227 self.assertEqual(b, 123)
230 self.assertEqual(b, 123)
228
231
229
232
230 def test_scatter_gather(self):
233 def test_scatter_gather(self):
231 view = self.client[:]
234 view = self.client[:]
232 seq1 = range(16)
235 seq1 = range(16)
233 view.scatter('a', seq1)
236 view.scatter('a', seq1)
234 seq2 = view.gather('a', block=True)
237 seq2 = view.gather('a', block=True)
235 self.assertEqual(seq2, seq1)
238 self.assertEqual(seq2, seq1)
236 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
239 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
237
240
238 @skip_without('numpy')
241 @skip_without('numpy')
239 def test_scatter_gather_numpy(self):
242 def test_scatter_gather_numpy(self):
240 import numpy
243 import numpy
241 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
244 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
242 view = self.client[:]
245 view = self.client[:]
243 a = numpy.arange(64)
246 a = numpy.arange(64)
244 view.scatter('a', a, block=True)
247 view.scatter('a', a, block=True)
245 b = view.gather('a', block=True)
248 b = view.gather('a', block=True)
246 assert_array_equal(b, a)
249 assert_array_equal(b, a)
247
250
248 def test_scatter_gather_lazy(self):
251 def test_scatter_gather_lazy(self):
249 """scatter/gather with targets='all'"""
252 """scatter/gather with targets='all'"""
250 view = self.client.direct_view(targets='all')
253 view = self.client.direct_view(targets='all')
251 x = range(64)
254 x = range(64)
252 view.scatter('x', x)
255 view.scatter('x', x)
253 gathered = view.gather('x', block=True)
256 gathered = view.gather('x', block=True)
254 self.assertEqual(gathered, x)
257 self.assertEqual(gathered, x)
255
258
256
259
257 @dec.known_failure_py3
260 @dec.known_failure_py3
258 @skip_without('numpy')
261 @skip_without('numpy')
259 def test_push_numpy_nocopy(self):
262 def test_push_numpy_nocopy(self):
260 import numpy
263 import numpy
261 view = self.client[:]
264 view = self.client[:]
262 a = numpy.arange(64)
265 a = numpy.arange(64)
263 view['A'] = a
266 view['A'] = a
264 @interactive
267 @interactive
265 def check_writeable(x):
268 def check_writeable(x):
266 return x.flags.writeable
269 return x.flags.writeable
267
270
268 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
271 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
269 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
272 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
270
273
271 view.push(dict(B=a))
274 view.push(dict(B=a))
272 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
275 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
273 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
276 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
274
277
275 @skip_without('numpy')
278 @skip_without('numpy')
276 def test_apply_numpy(self):
279 def test_apply_numpy(self):
277 """view.apply(f, ndarray)"""
280 """view.apply(f, ndarray)"""
278 import numpy
281 import numpy
279 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
282 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
280
283
281 A = numpy.random.random((100,100))
284 A = numpy.random.random((100,100))
282 view = self.client[-1]
285 view = self.client[-1]
283 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
286 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
284 B = A.astype(dt)
287 B = A.astype(dt)
285 C = view.apply_sync(lambda x:x, B)
288 C = view.apply_sync(lambda x:x, B)
286 assert_array_equal(B,C)
289 assert_array_equal(B,C)
287
290
288 @skip_without('numpy')
291 @skip_without('numpy')
289 def test_push_pull_recarray(self):
292 def test_push_pull_recarray(self):
290 """push/pull recarrays"""
293 """push/pull recarrays"""
291 import numpy
294 import numpy
292 from numpy.testing.utils import assert_array_equal
295 from numpy.testing.utils import assert_array_equal
293
296
294 view = self.client[-1]
297 view = self.client[-1]
295
298
296 R = numpy.array([
299 R = numpy.array([
297 (1, 'hi', 0.),
300 (1, 'hi', 0.),
298 (2**30, 'there', 2.5),
301 (2**30, 'there', 2.5),
299 (-99999, 'world', -12345.6789),
302 (-99999, 'world', -12345.6789),
300 ], [('n', int), ('s', '|S10'), ('f', float)])
303 ], [('n', int), ('s', '|S10'), ('f', float)])
301
304
302 view['RR'] = R
305 view['RR'] = R
303 R2 = view['RR']
306 R2 = view['RR']
304
307
305 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
308 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
306 self.assertEqual(r_dtype, R.dtype)
309 self.assertEqual(r_dtype, R.dtype)
307 self.assertEqual(r_shape, R.shape)
310 self.assertEqual(r_shape, R.shape)
308 self.assertEqual(R2.dtype, R.dtype)
311 self.assertEqual(R2.dtype, R.dtype)
309 self.assertEqual(R2.shape, R.shape)
312 self.assertEqual(R2.shape, R.shape)
310 assert_array_equal(R2, R)
313 assert_array_equal(R2, R)
311
314
312 @skip_without('pandas')
315 @skip_without('pandas')
313 def test_push_pull_timeseries(self):
316 def test_push_pull_timeseries(self):
314 """push/pull pandas.TimeSeries"""
317 """push/pull pandas.TimeSeries"""
315 import pandas
318 import pandas
316
319
317 ts = pandas.TimeSeries(range(10))
320 ts = pandas.TimeSeries(range(10))
318
321
319 view = self.client[-1]
322 view = self.client[-1]
320
323
321 view.push(dict(ts=ts), block=True)
324 view.push(dict(ts=ts), block=True)
322 rts = view['ts']
325 rts = view['ts']
323
326
324 self.assertEqual(type(rts), type(ts))
327 self.assertEqual(type(rts), type(ts))
325 self.assertTrue((ts == rts).all())
328 self.assertTrue((ts == rts).all())
326
329
327 def test_map(self):
330 def test_map(self):
328 view = self.client[:]
331 view = self.client[:]
329 def f(x):
332 def f(x):
330 return x**2
333 return x**2
331 data = range(16)
334 data = range(16)
332 r = view.map_sync(f, data)
335 r = view.map_sync(f, data)
333 self.assertEqual(r, map(f, data))
336 self.assertEqual(r, map(f, data))
334
337
335 def test_map_iterable(self):
338 def test_map_iterable(self):
336 """test map on iterables (direct)"""
339 """test map on iterables (direct)"""
337 view = self.client[:]
340 view = self.client[:]
338 # 101 is prime, so it won't be evenly distributed
341 # 101 is prime, so it won't be evenly distributed
339 arr = range(101)
342 arr = range(101)
340 # ensure it will be an iterator, even in Python 3
343 # ensure it will be an iterator, even in Python 3
341 it = iter(arr)
344 it = iter(arr)
342 r = view.map_sync(lambda x:x, arr)
345 r = view.map_sync(lambda x:x, arr)
343 self.assertEqual(r, list(arr))
346 self.assertEqual(r, list(arr))
344
347
345 def test_scatter_gather_nonblocking(self):
348 def test_scatter_gather_nonblocking(self):
346 data = range(16)
349 data = range(16)
347 view = self.client[:]
350 view = self.client[:]
348 view.scatter('a', data, block=False)
351 view.scatter('a', data, block=False)
349 ar = view.gather('a', block=False)
352 ar = view.gather('a', block=False)
350 self.assertEqual(ar.get(), data)
353 self.assertEqual(ar.get(), data)
351
354
352 @skip_without('numpy')
355 @skip_without('numpy')
353 def test_scatter_gather_numpy_nonblocking(self):
356 def test_scatter_gather_numpy_nonblocking(self):
354 import numpy
357 import numpy
355 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
358 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
356 a = numpy.arange(64)
359 a = numpy.arange(64)
357 view = self.client[:]
360 view = self.client[:]
358 ar = view.scatter('a', a, block=False)
361 ar = view.scatter('a', a, block=False)
359 self.assertTrue(isinstance(ar, AsyncResult))
362 self.assertTrue(isinstance(ar, AsyncResult))
360 amr = view.gather('a', block=False)
363 amr = view.gather('a', block=False)
361 self.assertTrue(isinstance(amr, AsyncMapResult))
364 self.assertTrue(isinstance(amr, AsyncMapResult))
362 assert_array_equal(amr.get(), a)
365 assert_array_equal(amr.get(), a)
363
366
364 def test_execute(self):
367 def test_execute(self):
365 view = self.client[:]
368 view = self.client[:]
366 # self.client.debug=True
369 # self.client.debug=True
367 execute = view.execute
370 execute = view.execute
368 ar = execute('c=30', block=False)
371 ar = execute('c=30', block=False)
369 self.assertTrue(isinstance(ar, AsyncResult))
372 self.assertTrue(isinstance(ar, AsyncResult))
370 ar = execute('d=[0,1,2]', block=False)
373 ar = execute('d=[0,1,2]', block=False)
371 self.client.wait(ar, 1)
374 self.client.wait(ar, 1)
372 self.assertEqual(len(ar.get()), len(self.client))
375 self.assertEqual(len(ar.get()), len(self.client))
373 for c in view['c']:
376 for c in view['c']:
374 self.assertEqual(c, 30)
377 self.assertEqual(c, 30)
375
378
376 def test_abort(self):
379 def test_abort(self):
377 view = self.client[-1]
380 view = self.client[-1]
378 ar = view.execute('import time; time.sleep(1)', block=False)
381 ar = view.execute('import time; time.sleep(1)', block=False)
379 ar2 = view.apply_async(lambda : 2)
382 ar2 = view.apply_async(lambda : 2)
380 ar3 = view.apply_async(lambda : 3)
383 ar3 = view.apply_async(lambda : 3)
381 view.abort(ar2)
384 view.abort(ar2)
382 view.abort(ar3.msg_ids)
385 view.abort(ar3.msg_ids)
383 self.assertRaises(error.TaskAborted, ar2.get)
386 self.assertRaises(error.TaskAborted, ar2.get)
384 self.assertRaises(error.TaskAborted, ar3.get)
387 self.assertRaises(error.TaskAborted, ar3.get)
385
388
386 def test_abort_all(self):
389 def test_abort_all(self):
387 """view.abort() aborts all outstanding tasks"""
390 """view.abort() aborts all outstanding tasks"""
388 view = self.client[-1]
391 view = self.client[-1]
389 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
392 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
390 view.abort()
393 view.abort()
391 view.wait(timeout=5)
394 view.wait(timeout=5)
392 for ar in ars[5:]:
395 for ar in ars[5:]:
393 self.assertRaises(error.TaskAborted, ar.get)
396 self.assertRaises(error.TaskAborted, ar.get)
394
397
395 def test_temp_flags(self):
398 def test_temp_flags(self):
396 view = self.client[-1]
399 view = self.client[-1]
397 view.block=True
400 view.block=True
398 with view.temp_flags(block=False):
401 with view.temp_flags(block=False):
399 self.assertFalse(view.block)
402 self.assertFalse(view.block)
400 self.assertTrue(view.block)
403 self.assertTrue(view.block)
401
404
402 @dec.known_failure_py3
405 @dec.known_failure_py3
403 def test_importer(self):
406 def test_importer(self):
404 view = self.client[-1]
407 view = self.client[-1]
405 view.clear(block=True)
408 view.clear(block=True)
406 with view.importer:
409 with view.importer:
407 import re
410 import re
408
411
409 @interactive
412 @interactive
410 def findall(pat, s):
413 def findall(pat, s):
411 # this globals() step isn't necessary in real code
414 # this globals() step isn't necessary in real code
412 # only to prevent a closure in the test
415 # only to prevent a closure in the test
413 re = globals()['re']
416 re = globals()['re']
414 return re.findall(pat, s)
417 return re.findall(pat, s)
415
418
416 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
419 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
417
420
418 def test_unicode_execute(self):
421 def test_unicode_execute(self):
419 """test executing unicode strings"""
422 """test executing unicode strings"""
420 v = self.client[-1]
423 v = self.client[-1]
421 v.block=True
424 v.block=True
422 if sys.version_info[0] >= 3:
425 if sys.version_info[0] >= 3:
423 code="a='é'"
426 code="a='é'"
424 else:
427 else:
425 code=u"a=u'é'"
428 code=u"a=u'é'"
426 v.execute(code)
429 v.execute(code)
427 self.assertEqual(v['a'], u'é')
430 self.assertEqual(v['a'], u'é')
428
431
429 def test_unicode_apply_result(self):
432 def test_unicode_apply_result(self):
430 """test unicode apply results"""
433 """test unicode apply results"""
431 v = self.client[-1]
434 v = self.client[-1]
432 r = v.apply_sync(lambda : u'é')
435 r = v.apply_sync(lambda : u'é')
433 self.assertEqual(r, u'é')
436 self.assertEqual(r, u'é')
434
437
435 def test_unicode_apply_arg(self):
438 def test_unicode_apply_arg(self):
436 """test passing unicode arguments to apply"""
439 """test passing unicode arguments to apply"""
437 v = self.client[-1]
440 v = self.client[-1]
438
441
439 @interactive
442 @interactive
440 def check_unicode(a, check):
443 def check_unicode(a, check):
441 assert isinstance(a, unicode), "%r is not unicode"%a
444 assert isinstance(a, unicode), "%r is not unicode"%a
442 assert isinstance(check, bytes), "%r is not bytes"%check
445 assert isinstance(check, bytes), "%r is not bytes"%check
443 assert a.encode('utf8') == check, "%s != %s"%(a,check)
446 assert a.encode('utf8') == check, "%s != %s"%(a,check)
444
447
445 for s in [ u'é', u'ßø®∫',u'asdf' ]:
448 for s in [ u'é', u'ßø®∫',u'asdf' ]:
446 try:
449 try:
447 v.apply_sync(check_unicode, s, s.encode('utf8'))
450 v.apply_sync(check_unicode, s, s.encode('utf8'))
448 except error.RemoteError as e:
451 except error.RemoteError as e:
449 if e.ename == 'AssertionError':
452 if e.ename == 'AssertionError':
450 self.fail(e.evalue)
453 self.fail(e.evalue)
451 else:
454 else:
452 raise e
455 raise e
453
456
454 def test_map_reference(self):
457 def test_map_reference(self):
455 """view.map(<Reference>, *seqs) should work"""
458 """view.map(<Reference>, *seqs) should work"""
456 v = self.client[:]
459 v = self.client[:]
457 v.scatter('n', self.client.ids, flatten=True)
460 v.scatter('n', self.client.ids, flatten=True)
458 v.execute("f = lambda x,y: x*y")
461 v.execute("f = lambda x,y: x*y")
459 rf = pmod.Reference('f')
462 rf = pmod.Reference('f')
460 nlist = list(range(10))
463 nlist = list(range(10))
461 mlist = nlist[::-1]
464 mlist = nlist[::-1]
462 expected = [ m*n for m,n in zip(mlist, nlist) ]
465 expected = [ m*n for m,n in zip(mlist, nlist) ]
463 result = v.map_sync(rf, mlist, nlist)
466 result = v.map_sync(rf, mlist, nlist)
464 self.assertEqual(result, expected)
467 self.assertEqual(result, expected)
465
468
466 def test_apply_reference(self):
469 def test_apply_reference(self):
467 """view.apply(<Reference>, *args) should work"""
470 """view.apply(<Reference>, *args) should work"""
468 v = self.client[:]
471 v = self.client[:]
469 v.scatter('n', self.client.ids, flatten=True)
472 v.scatter('n', self.client.ids, flatten=True)
470 v.execute("f = lambda x: n*x")
473 v.execute("f = lambda x: n*x")
471 rf = pmod.Reference('f')
474 rf = pmod.Reference('f')
472 result = v.apply_sync(rf, 5)
475 result = v.apply_sync(rf, 5)
473 expected = [ 5*id for id in self.client.ids ]
476 expected = [ 5*id for id in self.client.ids ]
474 self.assertEqual(result, expected)
477 self.assertEqual(result, expected)
475
478
476 def test_eval_reference(self):
479 def test_eval_reference(self):
477 v = self.client[self.client.ids[0]]
480 v = self.client[self.client.ids[0]]
478 v['g'] = range(5)
481 v['g'] = range(5)
479 rg = pmod.Reference('g[0]')
482 rg = pmod.Reference('g[0]')
480 echo = lambda x:x
483 echo = lambda x:x
481 self.assertEqual(v.apply_sync(echo, rg), 0)
484 self.assertEqual(v.apply_sync(echo, rg), 0)
482
485
483 def test_reference_nameerror(self):
486 def test_reference_nameerror(self):
484 v = self.client[self.client.ids[0]]
487 v = self.client[self.client.ids[0]]
485 r = pmod.Reference('elvis_has_left')
488 r = pmod.Reference('elvis_has_left')
486 echo = lambda x:x
489 echo = lambda x:x
487 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
490 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
488
491
489 def test_single_engine_map(self):
492 def test_single_engine_map(self):
490 e0 = self.client[self.client.ids[0]]
493 e0 = self.client[self.client.ids[0]]
491 r = range(5)
494 r = range(5)
492 check = [ -1*i for i in r ]
495 check = [ -1*i for i in r ]
493 result = e0.map_sync(lambda x: -1*x, r)
496 result = e0.map_sync(lambda x: -1*x, r)
494 self.assertEqual(result, check)
497 self.assertEqual(result, check)
495
498
496 def test_len(self):
499 def test_len(self):
497 """len(view) makes sense"""
500 """len(view) makes sense"""
498 e0 = self.client[self.client.ids[0]]
501 e0 = self.client[self.client.ids[0]]
499 yield self.assertEqual(len(e0), 1)
502 yield self.assertEqual(len(e0), 1)
500 v = self.client[:]
503 v = self.client[:]
501 yield self.assertEqual(len(v), len(self.client.ids))
504 yield self.assertEqual(len(v), len(self.client.ids))
502 v = self.client.direct_view('all')
505 v = self.client.direct_view('all')
503 yield self.assertEqual(len(v), len(self.client.ids))
506 yield self.assertEqual(len(v), len(self.client.ids))
504 v = self.client[:2]
507 v = self.client[:2]
505 yield self.assertEqual(len(v), 2)
508 yield self.assertEqual(len(v), 2)
506 v = self.client[:1]
509 v = self.client[:1]
507 yield self.assertEqual(len(v), 1)
510 yield self.assertEqual(len(v), 1)
508 v = self.client.load_balanced_view()
511 v = self.client.load_balanced_view()
509 yield self.assertEqual(len(v), len(self.client.ids))
512 yield self.assertEqual(len(v), len(self.client.ids))
510 # parametric tests seem to require manual closing?
513 # parametric tests seem to require manual closing?
511 self.client.close()
514 self.client.close()
512
515
513
516
514 # begin execute tests
517 # begin execute tests
515
518
516 def test_execute_reply(self):
519 def test_execute_reply(self):
517 e0 = self.client[self.client.ids[0]]
520 e0 = self.client[self.client.ids[0]]
518 e0.block = True
521 e0.block = True
519 ar = e0.execute("5", silent=False)
522 ar = e0.execute("5", silent=False)
520 er = ar.get()
523 er = ar.get()
521 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
524 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
522 self.assertEqual(er.pyout['data']['text/plain'], '5')
525 self.assertEqual(er.pyout['data']['text/plain'], '5')
523
526
524 def test_execute_reply_stdout(self):
527 def test_execute_reply_stdout(self):
525 e0 = self.client[self.client.ids[0]]
528 e0 = self.client[self.client.ids[0]]
526 e0.block = True
529 e0.block = True
527 ar = e0.execute("print (5)", silent=False)
530 ar = e0.execute("print (5)", silent=False)
528 er = ar.get()
531 er = ar.get()
529 self.assertEqual(er.stdout.strip(), '5')
532 self.assertEqual(er.stdout.strip(), '5')
530
533
531 def test_execute_pyout(self):
534 def test_execute_pyout(self):
532 """execute triggers pyout with silent=False"""
535 """execute triggers pyout with silent=False"""
533 view = self.client[:]
536 view = self.client[:]
534 ar = view.execute("5", silent=False, block=True)
537 ar = view.execute("5", silent=False, block=True)
535
538
536 expected = [{'text/plain' : '5'}] * len(view)
539 expected = [{'text/plain' : '5'}] * len(view)
537 mimes = [ out['data'] for out in ar.pyout ]
540 mimes = [ out['data'] for out in ar.pyout ]
538 self.assertEqual(mimes, expected)
541 self.assertEqual(mimes, expected)
539
542
540 def test_execute_silent(self):
543 def test_execute_silent(self):
541 """execute does not trigger pyout with silent=True"""
544 """execute does not trigger pyout with silent=True"""
542 view = self.client[:]
545 view = self.client[:]
543 ar = view.execute("5", block=True)
546 ar = view.execute("5", block=True)
544 expected = [None] * len(view)
547 expected = [None] * len(view)
545 self.assertEqual(ar.pyout, expected)
548 self.assertEqual(ar.pyout, expected)
546
549
547 def test_execute_magic(self):
550 def test_execute_magic(self):
548 """execute accepts IPython commands"""
551 """execute accepts IPython commands"""
549 view = self.client[:]
552 view = self.client[:]
550 view.execute("a = 5")
553 view.execute("a = 5")
551 ar = view.execute("%whos", block=True)
554 ar = view.execute("%whos", block=True)
552 # this will raise, if that failed
555 # this will raise, if that failed
553 ar.get(5)
556 ar.get(5)
554 for stdout in ar.stdout:
557 for stdout in ar.stdout:
555 lines = stdout.splitlines()
558 lines = stdout.splitlines()
556 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
559 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
557 found = False
560 found = False
558 for line in lines[2:]:
561 for line in lines[2:]:
559 split = line.split()
562 split = line.split()
560 if split == ['a', 'int', '5']:
563 if split == ['a', 'int', '5']:
561 found = True
564 found = True
562 break
565 break
563 self.assertTrue(found, "whos output wrong: %s" % stdout)
566 self.assertTrue(found, "whos output wrong: %s" % stdout)
564
567
565 def test_execute_displaypub(self):
568 def test_execute_displaypub(self):
566 """execute tracks display_pub output"""
569 """execute tracks display_pub output"""
567 view = self.client[:]
570 view = self.client[:]
568 view.execute("from IPython.core.display import *")
571 view.execute("from IPython.core.display import *")
569 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
572 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
570
573
571 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
574 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
572 for outputs in ar.outputs:
575 for outputs in ar.outputs:
573 mimes = [ out['data'] for out in outputs ]
576 mimes = [ out['data'] for out in outputs ]
574 self.assertEqual(mimes, expected)
577 self.assertEqual(mimes, expected)
575
578
576 def test_apply_displaypub(self):
579 def test_apply_displaypub(self):
577 """apply tracks display_pub output"""
580 """apply tracks display_pub output"""
578 view = self.client[:]
581 view = self.client[:]
579 view.execute("from IPython.core.display import *")
582 view.execute("from IPython.core.display import *")
580
583
581 @interactive
584 @interactive
582 def publish():
585 def publish():
583 [ display(i) for i in range(5) ]
586 [ display(i) for i in range(5) ]
584
587
585 ar = view.apply_async(publish)
588 ar = view.apply_async(publish)
586 ar.get(5)
589 ar.get(5)
587 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
590 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
588 for outputs in ar.outputs:
591 for outputs in ar.outputs:
589 mimes = [ out['data'] for out in outputs ]
592 mimes = [ out['data'] for out in outputs ]
590 self.assertEqual(mimes, expected)
593 self.assertEqual(mimes, expected)
591
594
592 def test_execute_raises(self):
595 def test_execute_raises(self):
593 """exceptions in execute requests raise appropriately"""
596 """exceptions in execute requests raise appropriately"""
594 view = self.client[-1]
597 view = self.client[-1]
595 ar = view.execute("1/0")
598 ar = view.execute("1/0")
596 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
599 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
597
600
598 def test_remoteerror_render_exception(self):
601 def test_remoteerror_render_exception(self):
599 """RemoteErrors get nice tracebacks"""
602 """RemoteErrors get nice tracebacks"""
600 view = self.client[-1]
603 view = self.client[-1]
601 ar = view.execute("1/0")
604 ar = view.execute("1/0")
602 ip = get_ipython()
605 ip = get_ipython()
603 ip.user_ns['ar'] = ar
606 ip.user_ns['ar'] = ar
604 with capture_output() as io:
607 with capture_output() as io:
605 ip.run_cell("ar.get(2)")
608 ip.run_cell("ar.get(2)")
606
609
607 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
610 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
608
611
609 def test_compositeerror_render_exception(self):
612 def test_compositeerror_render_exception(self):
610 """CompositeErrors get nice tracebacks"""
613 """CompositeErrors get nice tracebacks"""
611 view = self.client[:]
614 view = self.client[:]
612 ar = view.execute("1/0")
615 ar = view.execute("1/0")
613 ip = get_ipython()
616 ip = get_ipython()
614 ip.user_ns['ar'] = ar
617 ip.user_ns['ar'] = ar
615 with capture_output() as io:
618 with capture_output() as io:
616 ip.run_cell("ar.get(2)")
619 ip.run_cell("ar.get(2)")
617
620
618 self.assertEqual(io.stdout.count('ZeroDivisionError'), len(view) * 2, io.stdout)
621 self.assertEqual(io.stdout.count('ZeroDivisionError'), len(view) * 2, io.stdout)
619 self.assertEqual(io.stdout.count('by zero'), len(view), io.stdout)
622 self.assertEqual(io.stdout.count('by zero'), len(view), io.stdout)
620 self.assertEqual(io.stdout.count(':execute'), len(view), io.stdout)
623 self.assertEqual(io.stdout.count(':execute'), len(view), io.stdout)
621
624
622 @dec.skipif_not_matplotlib
625 @dec.skipif_not_matplotlib
623 def test_magic_pylab(self):
626 def test_magic_pylab(self):
624 """%pylab works on engines"""
627 """%pylab works on engines"""
625 view = self.client[-1]
628 view = self.client[-1]
626 ar = view.execute("%pylab inline")
629 ar = view.execute("%pylab inline")
627 # at least check if this raised:
630 # at least check if this raised:
628 reply = ar.get(5)
631 reply = ar.get(5)
629 # include imports, in case user config
632 # include imports, in case user config
630 ar = view.execute("plot(rand(100))", silent=False)
633 ar = view.execute("plot(rand(100))", silent=False)
631 reply = ar.get(5)
634 reply = ar.get(5)
632 self.assertEqual(len(reply.outputs), 1)
635 self.assertEqual(len(reply.outputs), 1)
633 output = reply.outputs[0]
636 output = reply.outputs[0]
634 self.assertTrue("data" in output)
637 self.assertTrue("data" in output)
635 data = output['data']
638 data = output['data']
636 self.assertTrue("image/png" in data)
639 self.assertTrue("image/png" in data)
637
640
638 def test_func_default_func(self):
641 def test_func_default_func(self):
639 """interactively defined function as apply func default"""
642 """interactively defined function as apply func default"""
640 def foo():
643 def foo():
641 return 'foo'
644 return 'foo'
642
645
643 def bar(f=foo):
646 def bar(f=foo):
644 return f()
647 return f()
645
648
646 view = self.client[-1]
649 view = self.client[-1]
647 ar = view.apply_async(bar)
650 ar = view.apply_async(bar)
648 r = ar.get(10)
651 r = ar.get(10)
649 self.assertEqual(r, 'foo')
652 self.assertEqual(r, 'foo')
650 def test_data_pub_single(self):
653 def test_data_pub_single(self):
651 view = self.client[-1]
654 view = self.client[-1]
652 ar = view.execute('\n'.join([
655 ar = view.execute('\n'.join([
653 'from IPython.kernel.zmq.datapub import publish_data',
656 'from IPython.kernel.zmq.datapub import publish_data',
654 'for i in range(5):',
657 'for i in range(5):',
655 ' publish_data(dict(i=i))'
658 ' publish_data(dict(i=i))'
656 ]), block=False)
659 ]), block=False)
657 self.assertTrue(isinstance(ar.data, dict))
660 self.assertTrue(isinstance(ar.data, dict))
658 ar.get(5)
661 ar.get(5)
659 self.assertEqual(ar.data, dict(i=4))
662 self.assertEqual(ar.data, dict(i=4))
660
663
661 def test_data_pub(self):
664 def test_data_pub(self):
662 view = self.client[:]
665 view = self.client[:]
663 ar = view.execute('\n'.join([
666 ar = view.execute('\n'.join([
664 'from IPython.kernel.zmq.datapub import publish_data',
667 'from IPython.kernel.zmq.datapub import publish_data',
665 'for i in range(5):',
668 'for i in range(5):',
666 ' publish_data(dict(i=i))'
669 ' publish_data(dict(i=i))'
667 ]), block=False)
670 ]), block=False)
668 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
671 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
669 ar.get(5)
672 ar.get(5)
670 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
673 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
671
674
672 def test_can_list_arg(self):
675 def test_can_list_arg(self):
673 """args in lists are canned"""
676 """args in lists are canned"""
674 view = self.client[-1]
677 view = self.client[-1]
675 view['a'] = 128
678 view['a'] = 128
676 rA = pmod.Reference('a')
679 rA = pmod.Reference('a')
677 ar = view.apply_async(lambda x: x, [rA])
680 ar = view.apply_async(lambda x: x, [rA])
678 r = ar.get(5)
681 r = ar.get(5)
679 self.assertEqual(r, [128])
682 self.assertEqual(r, [128])
680
683
681 def test_can_dict_arg(self):
684 def test_can_dict_arg(self):
682 """args in dicts are canned"""
685 """args in dicts are canned"""
683 view = self.client[-1]
686 view = self.client[-1]
684 view['a'] = 128
687 view['a'] = 128
685 rA = pmod.Reference('a')
688 rA = pmod.Reference('a')
686 ar = view.apply_async(lambda x: x, dict(foo=rA))
689 ar = view.apply_async(lambda x: x, dict(foo=rA))
687 r = ar.get(5)
690 r = ar.get(5)
688 self.assertEqual(r, dict(foo=128))
691 self.assertEqual(r, dict(foo=128))
689
692
690 def test_can_list_kwarg(self):
693 def test_can_list_kwarg(self):
691 """kwargs in lists are canned"""
694 """kwargs in lists are canned"""
692 view = self.client[-1]
695 view = self.client[-1]
693 view['a'] = 128
696 view['a'] = 128
694 rA = pmod.Reference('a')
697 rA = pmod.Reference('a')
695 ar = view.apply_async(lambda x=5: x, x=[rA])
698 ar = view.apply_async(lambda x=5: x, x=[rA])
696 r = ar.get(5)
699 r = ar.get(5)
697 self.assertEqual(r, [128])
700 self.assertEqual(r, [128])
698
701
699 def test_can_dict_kwarg(self):
702 def test_can_dict_kwarg(self):
700 """kwargs in dicts are canned"""
703 """kwargs in dicts are canned"""
701 view = self.client[-1]
704 view = self.client[-1]
702 view['a'] = 128
705 view['a'] = 128
703 rA = pmod.Reference('a')
706 rA = pmod.Reference('a')
704 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
707 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
705 r = ar.get(5)
708 r = ar.get(5)
706 self.assertEqual(r, dict(foo=128))
709 self.assertEqual(r, dict(foo=128))
707
710
708 def test_map_ref(self):
711 def test_map_ref(self):
709 """view.map works with references"""
712 """view.map works with references"""
710 view = self.client[:]
713 view = self.client[:]
711 ranks = sorted(self.client.ids)
714 ranks = sorted(self.client.ids)
712 view.scatter('rank', ranks, flatten=True)
715 view.scatter('rank', ranks, flatten=True)
713 rrank = pmod.Reference('rank')
716 rrank = pmod.Reference('rank')
714
717
715 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
718 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
716 drank = amr.get(5)
719 drank = amr.get(5)
717 self.assertEqual(drank, [ r*2 for r in ranks ])
720 self.assertEqual(drank, [ r*2 for r in ranks ])
718
721
719 def test_nested_getitem_setitem(self):
722 def test_nested_getitem_setitem(self):
720 """get and set with view['a.b']"""
723 """get and set with view['a.b']"""
721 view = self.client[-1]
724 view = self.client[-1]
722 view.execute('\n'.join([
725 view.execute('\n'.join([
723 'class A(object): pass',
726 'class A(object): pass',
724 'a = A()',
727 'a = A()',
725 'a.b = 128',
728 'a.b = 128',
726 ]), block=True)
729 ]), block=True)
727 ra = pmod.Reference('a')
730 ra = pmod.Reference('a')
728
731
729 r = view.apply_sync(lambda x: x.b, ra)
732 r = view.apply_sync(lambda x: x.b, ra)
730 self.assertEqual(r, 128)
733 self.assertEqual(r, 128)
731 self.assertEqual(view['a.b'], 128)
734 self.assertEqual(view['a.b'], 128)
732
735
733 view['a.b'] = 0
736 view['a.b'] = 0
734
737
735 r = view.apply_sync(lambda x: x.b, ra)
738 r = view.apply_sync(lambda x: x.b, ra)
736 self.assertEqual(r, 0)
739 self.assertEqual(r, 0)
737 self.assertEqual(view['a.b'], 0)
740 self.assertEqual(view['a.b'], 0)
741
742 def test_return_namedtuple(self):
743 def namedtuplify(x, y):
744 from IPython.parallel.tests.test_view import point
745 return point(x, y)
746
747 view = self.client[-1]
748 p = view.apply_sync(namedtuplify, 1, 2)
749 self.assertEqual(p.x, 1)
750 self.assertEqual(p.y, 2)
751
752 def test_apply_namedtuple(self):
753 def echoxy(p):
754 return p.y, p.x
755
756 view = self.client[-1]
757 tup = view.apply_sync(echoxy, point(1, 2))
758 self.assertEqual(tup, (2,1))
759
General Comments 0
You need to be logged in to leave comments. Login now