##// END OF EJS Templates
test push/pull of pandas TimeSeries
MinRK -
Show More
@@ -1,722 +1,737 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test View objects
2 """test View objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import platform
20 import platform
21 import time
21 import time
22 from tempfile import mktemp
22 from tempfile import mktemp
23 from StringIO import StringIO
23 from StringIO import StringIO
24
24
25 import zmq
25 import zmq
26 from nose import SkipTest
26 from nose import SkipTest
27 from nose.plugins.attrib import attr
27 from nose.plugins.attrib import attr
28
28
29 from IPython.testing import decorators as dec
29 from IPython.testing import decorators as dec
30 from IPython.testing.ipunittest import ParametricTestCase
30 from IPython.testing.ipunittest import ParametricTestCase
31 from IPython.utils.io import capture_output
31 from IPython.utils.io import capture_output
32
32
33 from IPython import parallel as pmod
33 from IPython import parallel as pmod
34 from IPython.parallel import error
34 from IPython.parallel import error
35 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
35 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
36 from IPython.parallel import DirectView
36 from IPython.parallel import DirectView
37 from IPython.parallel.util import interactive
37 from IPython.parallel.util import interactive
38
38
39 from IPython.parallel.tests import add_engines
39 from IPython.parallel.tests import add_engines
40
40
41 from .clienttest import ClusterTestCase, crash, wait, skip_without
41 from .clienttest import ClusterTestCase, crash, wait, skip_without
42
42
43 def setup():
43 def setup():
44 add_engines(3, total=True)
44 add_engines(3, total=True)
45
45
46 class TestView(ClusterTestCase, ParametricTestCase):
46 class TestView(ClusterTestCase, ParametricTestCase):
47
47
48 def setUp(self):
48 def setUp(self):
49 # On Win XP, wait for resource cleanup, else parallel test group fails
49 # On Win XP, wait for resource cleanup, else parallel test group fails
50 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
50 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
51 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
51 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
52 time.sleep(2)
52 time.sleep(2)
53 super(TestView, self).setUp()
53 super(TestView, self).setUp()
54
54
55 @attr('crash')
55 @attr('crash')
56 def test_z_crash_mux(self):
56 def test_z_crash_mux(self):
57 """test graceful handling of engine death (direct)"""
57 """test graceful handling of engine death (direct)"""
58 # self.add_engines(1)
58 # self.add_engines(1)
59 eid = self.client.ids[-1]
59 eid = self.client.ids[-1]
60 ar = self.client[eid].apply_async(crash)
60 ar = self.client[eid].apply_async(crash)
61 self.assertRaisesRemote(error.EngineError, ar.get, 10)
61 self.assertRaisesRemote(error.EngineError, ar.get, 10)
62 eid = ar.engine_id
62 eid = ar.engine_id
63 tic = time.time()
63 tic = time.time()
64 while eid in self.client.ids and time.time()-tic < 5:
64 while eid in self.client.ids and time.time()-tic < 5:
65 time.sleep(.01)
65 time.sleep(.01)
66 self.client.spin()
66 self.client.spin()
67 self.assertFalse(eid in self.client.ids, "Engine should have died")
67 self.assertFalse(eid in self.client.ids, "Engine should have died")
68
68
69 def test_push_pull(self):
69 def test_push_pull(self):
70 """test pushing and pulling"""
70 """test pushing and pulling"""
71 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
71 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
72 t = self.client.ids[-1]
72 t = self.client.ids[-1]
73 v = self.client[t]
73 v = self.client[t]
74 push = v.push
74 push = v.push
75 pull = v.pull
75 pull = v.pull
76 v.block=True
76 v.block=True
77 nengines = len(self.client)
77 nengines = len(self.client)
78 push({'data':data})
78 push({'data':data})
79 d = pull('data')
79 d = pull('data')
80 self.assertEqual(d, data)
80 self.assertEqual(d, data)
81 self.client[:].push({'data':data})
81 self.client[:].push({'data':data})
82 d = self.client[:].pull('data', block=True)
82 d = self.client[:].pull('data', block=True)
83 self.assertEqual(d, nengines*[data])
83 self.assertEqual(d, nengines*[data])
84 ar = push({'data':data}, block=False)
84 ar = push({'data':data}, block=False)
85 self.assertTrue(isinstance(ar, AsyncResult))
85 self.assertTrue(isinstance(ar, AsyncResult))
86 r = ar.get()
86 r = ar.get()
87 ar = self.client[:].pull('data', block=False)
87 ar = self.client[:].pull('data', block=False)
88 self.assertTrue(isinstance(ar, AsyncResult))
88 self.assertTrue(isinstance(ar, AsyncResult))
89 r = ar.get()
89 r = ar.get()
90 self.assertEqual(r, nengines*[data])
90 self.assertEqual(r, nengines*[data])
91 self.client[:].push(dict(a=10,b=20))
91 self.client[:].push(dict(a=10,b=20))
92 r = self.client[:].pull(('a','b'), block=True)
92 r = self.client[:].pull(('a','b'), block=True)
93 self.assertEqual(r, nengines*[[10,20]])
93 self.assertEqual(r, nengines*[[10,20]])
94
94
95 def test_push_pull_function(self):
95 def test_push_pull_function(self):
96 "test pushing and pulling functions"
96 "test pushing and pulling functions"
97 def testf(x):
97 def testf(x):
98 return 2.0*x
98 return 2.0*x
99
99
100 t = self.client.ids[-1]
100 t = self.client.ids[-1]
101 v = self.client[t]
101 v = self.client[t]
102 v.block=True
102 v.block=True
103 push = v.push
103 push = v.push
104 pull = v.pull
104 pull = v.pull
105 execute = v.execute
105 execute = v.execute
106 push({'testf':testf})
106 push({'testf':testf})
107 r = pull('testf')
107 r = pull('testf')
108 self.assertEqual(r(1.0), testf(1.0))
108 self.assertEqual(r(1.0), testf(1.0))
109 execute('r = testf(10)')
109 execute('r = testf(10)')
110 r = pull('r')
110 r = pull('r')
111 self.assertEqual(r, testf(10))
111 self.assertEqual(r, testf(10))
112 ar = self.client[:].push({'testf':testf}, block=False)
112 ar = self.client[:].push({'testf':testf}, block=False)
113 ar.get()
113 ar.get()
114 ar = self.client[:].pull('testf', block=False)
114 ar = self.client[:].pull('testf', block=False)
115 rlist = ar.get()
115 rlist = ar.get()
116 for r in rlist:
116 for r in rlist:
117 self.assertEqual(r(1.0), testf(1.0))
117 self.assertEqual(r(1.0), testf(1.0))
118 execute("def g(x): return x*x")
118 execute("def g(x): return x*x")
119 r = pull(('testf','g'))
119 r = pull(('testf','g'))
120 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
120 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
121
121
122 def test_push_function_globals(self):
122 def test_push_function_globals(self):
123 """test that pushed functions have access to globals"""
123 """test that pushed functions have access to globals"""
124 @interactive
124 @interactive
125 def geta():
125 def geta():
126 return a
126 return a
127 # self.add_engines(1)
127 # self.add_engines(1)
128 v = self.client[-1]
128 v = self.client[-1]
129 v.block=True
129 v.block=True
130 v['f'] = geta
130 v['f'] = geta
131 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
131 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
132 v.execute('a=5')
132 v.execute('a=5')
133 v.execute('b=f()')
133 v.execute('b=f()')
134 self.assertEqual(v['b'], 5)
134 self.assertEqual(v['b'], 5)
135
135
136 def test_push_function_defaults(self):
136 def test_push_function_defaults(self):
137 """test that pushed functions preserve default args"""
137 """test that pushed functions preserve default args"""
138 def echo(a=10):
138 def echo(a=10):
139 return a
139 return a
140 v = self.client[-1]
140 v = self.client[-1]
141 v.block=True
141 v.block=True
142 v['f'] = echo
142 v['f'] = echo
143 v.execute('b=f()')
143 v.execute('b=f()')
144 self.assertEqual(v['b'], 10)
144 self.assertEqual(v['b'], 10)
145
145
146 def test_get_result(self):
146 def test_get_result(self):
147 """test getting results from the Hub."""
147 """test getting results from the Hub."""
148 c = pmod.Client(profile='iptest')
148 c = pmod.Client(profile='iptest')
149 # self.add_engines(1)
149 # self.add_engines(1)
150 t = c.ids[-1]
150 t = c.ids[-1]
151 v = c[t]
151 v = c[t]
152 v2 = self.client[t]
152 v2 = self.client[t]
153 ar = v.apply_async(wait, 1)
153 ar = v.apply_async(wait, 1)
154 # give the monitor time to notice the message
154 # give the monitor time to notice the message
155 time.sleep(.25)
155 time.sleep(.25)
156 ahr = v2.get_result(ar.msg_ids)
156 ahr = v2.get_result(ar.msg_ids)
157 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 self.assertTrue(isinstance(ahr, AsyncHubResult))
158 self.assertEqual(ahr.get(), ar.get())
158 self.assertEqual(ahr.get(), ar.get())
159 ar2 = v2.get_result(ar.msg_ids)
159 ar2 = v2.get_result(ar.msg_ids)
160 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 self.assertFalse(isinstance(ar2, AsyncHubResult))
161 c.spin()
161 c.spin()
162 c.close()
162 c.close()
163
163
164 def test_run_newline(self):
164 def test_run_newline(self):
165 """test that run appends newline to files"""
165 """test that run appends newline to files"""
166 tmpfile = mktemp()
166 tmpfile = mktemp()
167 with open(tmpfile, 'w') as f:
167 with open(tmpfile, 'w') as f:
168 f.write("""def g():
168 f.write("""def g():
169 return 5
169 return 5
170 """)
170 """)
171 v = self.client[-1]
171 v = self.client[-1]
172 v.run(tmpfile, block=True)
172 v.run(tmpfile, block=True)
173 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
173 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
174
174
175 def test_apply_tracked(self):
175 def test_apply_tracked(self):
176 """test tracking for apply"""
176 """test tracking for apply"""
177 # self.add_engines(1)
177 # self.add_engines(1)
178 t = self.client.ids[-1]
178 t = self.client.ids[-1]
179 v = self.client[t]
179 v = self.client[t]
180 v.block=False
180 v.block=False
181 def echo(n=1024*1024, **kwargs):
181 def echo(n=1024*1024, **kwargs):
182 with v.temp_flags(**kwargs):
182 with v.temp_flags(**kwargs):
183 return v.apply(lambda x: x, 'x'*n)
183 return v.apply(lambda x: x, 'x'*n)
184 ar = echo(1, track=False)
184 ar = echo(1, track=False)
185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
186 self.assertTrue(ar.sent)
186 self.assertTrue(ar.sent)
187 ar = echo(track=True)
187 ar = echo(track=True)
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 self.assertEqual(ar.sent, ar._tracker.done)
189 self.assertEqual(ar.sent, ar._tracker.done)
190 ar._tracker.wait()
190 ar._tracker.wait()
191 self.assertTrue(ar.sent)
191 self.assertTrue(ar.sent)
192
192
193 def test_push_tracked(self):
193 def test_push_tracked(self):
194 t = self.client.ids[-1]
194 t = self.client.ids[-1]
195 ns = dict(x='x'*1024*1024)
195 ns = dict(x='x'*1024*1024)
196 v = self.client[t]
196 v = self.client[t]
197 ar = v.push(ns, block=False, track=False)
197 ar = v.push(ns, block=False, track=False)
198 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
198 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
199 self.assertTrue(ar.sent)
199 self.assertTrue(ar.sent)
200
200
201 ar = v.push(ns, block=False, track=True)
201 ar = v.push(ns, block=False, track=True)
202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 ar._tracker.wait()
203 ar._tracker.wait()
204 self.assertEqual(ar.sent, ar._tracker.done)
204 self.assertEqual(ar.sent, ar._tracker.done)
205 self.assertTrue(ar.sent)
205 self.assertTrue(ar.sent)
206 ar.get()
206 ar.get()
207
207
208 def test_scatter_tracked(self):
208 def test_scatter_tracked(self):
209 t = self.client.ids
209 t = self.client.ids
210 x='x'*1024*1024
210 x='x'*1024*1024
211 ar = self.client[t].scatter('x', x, block=False, track=False)
211 ar = self.client[t].scatter('x', x, block=False, track=False)
212 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
212 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
213 self.assertTrue(ar.sent)
213 self.assertTrue(ar.sent)
214
214
215 ar = self.client[t].scatter('x', x, block=False, track=True)
215 ar = self.client[t].scatter('x', x, block=False, track=True)
216 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
216 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
217 self.assertEqual(ar.sent, ar._tracker.done)
217 self.assertEqual(ar.sent, ar._tracker.done)
218 ar._tracker.wait()
218 ar._tracker.wait()
219 self.assertTrue(ar.sent)
219 self.assertTrue(ar.sent)
220 ar.get()
220 ar.get()
221
221
222 def test_remote_reference(self):
222 def test_remote_reference(self):
223 v = self.client[-1]
223 v = self.client[-1]
224 v['a'] = 123
224 v['a'] = 123
225 ra = pmod.Reference('a')
225 ra = pmod.Reference('a')
226 b = v.apply_sync(lambda x: x, ra)
226 b = v.apply_sync(lambda x: x, ra)
227 self.assertEqual(b, 123)
227 self.assertEqual(b, 123)
228
228
229
229
230 def test_scatter_gather(self):
230 def test_scatter_gather(self):
231 view = self.client[:]
231 view = self.client[:]
232 seq1 = range(16)
232 seq1 = range(16)
233 view.scatter('a', seq1)
233 view.scatter('a', seq1)
234 seq2 = view.gather('a', block=True)
234 seq2 = view.gather('a', block=True)
235 self.assertEqual(seq2, seq1)
235 self.assertEqual(seq2, seq1)
236 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
236 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
237
237
238 @skip_without('numpy')
238 @skip_without('numpy')
239 def test_scatter_gather_numpy(self):
239 def test_scatter_gather_numpy(self):
240 import numpy
240 import numpy
241 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
241 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
242 view = self.client[:]
242 view = self.client[:]
243 a = numpy.arange(64)
243 a = numpy.arange(64)
244 view.scatter('a', a, block=True)
244 view.scatter('a', a, block=True)
245 b = view.gather('a', block=True)
245 b = view.gather('a', block=True)
246 assert_array_equal(b, a)
246 assert_array_equal(b, a)
247
247
248 def test_scatter_gather_lazy(self):
248 def test_scatter_gather_lazy(self):
249 """scatter/gather with targets='all'"""
249 """scatter/gather with targets='all'"""
250 view = self.client.direct_view(targets='all')
250 view = self.client.direct_view(targets='all')
251 x = range(64)
251 x = range(64)
252 view.scatter('x', x)
252 view.scatter('x', x)
253 gathered = view.gather('x', block=True)
253 gathered = view.gather('x', block=True)
254 self.assertEqual(gathered, x)
254 self.assertEqual(gathered, x)
255
255
256
256
257 @dec.known_failure_py3
257 @dec.known_failure_py3
258 @skip_without('numpy')
258 @skip_without('numpy')
259 def test_push_numpy_nocopy(self):
259 def test_push_numpy_nocopy(self):
260 import numpy
260 import numpy
261 view = self.client[:]
261 view = self.client[:]
262 a = numpy.arange(64)
262 a = numpy.arange(64)
263 view['A'] = a
263 view['A'] = a
264 @interactive
264 @interactive
265 def check_writeable(x):
265 def check_writeable(x):
266 return x.flags.writeable
266 return x.flags.writeable
267
267
268 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
268 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
269 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
269 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
270
270
271 view.push(dict(B=a))
271 view.push(dict(B=a))
272 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
272 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
273 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
273 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
274
274
275 @skip_without('numpy')
275 @skip_without('numpy')
276 def test_apply_numpy(self):
276 def test_apply_numpy(self):
277 """view.apply(f, ndarray)"""
277 """view.apply(f, ndarray)"""
278 import numpy
278 import numpy
279 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
279 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
280
280
281 A = numpy.random.random((100,100))
281 A = numpy.random.random((100,100))
282 view = self.client[-1]
282 view = self.client[-1]
283 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
283 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
284 B = A.astype(dt)
284 B = A.astype(dt)
285 C = view.apply_sync(lambda x:x, B)
285 C = view.apply_sync(lambda x:x, B)
286 assert_array_equal(B,C)
286 assert_array_equal(B,C)
287
287
288 @skip_without('numpy')
288 @skip_without('numpy')
289 def test_push_pull_recarray(self):
289 def test_push_pull_recarray(self):
290 """push/pull recarrays"""
290 """push/pull recarrays"""
291 import numpy
291 import numpy
292 from numpy.testing.utils import assert_array_equal
292 from numpy.testing.utils import assert_array_equal
293
293
294 view = self.client[-1]
294 view = self.client[-1]
295
295
296 R = numpy.array([
296 R = numpy.array([
297 (1, 'hi', 0.),
297 (1, 'hi', 0.),
298 (2**30, 'there', 2.5),
298 (2**30, 'there', 2.5),
299 (-99999, 'world', -12345.6789),
299 (-99999, 'world', -12345.6789),
300 ], [('n', int), ('s', '|S10'), ('f', float)])
300 ], [('n', int), ('s', '|S10'), ('f', float)])
301
301
302 view['RR'] = R
302 view['RR'] = R
303 R2 = view['RR']
303 R2 = view['RR']
304
304
305 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
305 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
306 self.assertEqual(r_dtype, R.dtype)
306 self.assertEqual(r_dtype, R.dtype)
307 self.assertEqual(r_shape, R.shape)
307 self.assertEqual(r_shape, R.shape)
308 self.assertEqual(R2.dtype, R.dtype)
308 self.assertEqual(R2.dtype, R.dtype)
309 self.assertEqual(R2.shape, R.shape)
309 self.assertEqual(R2.shape, R.shape)
310 assert_array_equal(R2, R)
310 assert_array_equal(R2, R)
311
311
312 @skip_without('pandas')
313 def test_push_pull_timeseries(self):
314 """push/pull pandas.TimeSeries"""
315 import pandas
316
317 ts = pandas.TimeSeries(range(10))
318
319 view = self.client[-1]
320
321 view.push(dict(ts=ts), block=True)
322 rts = view['ts']
323
324 self.assertEqual(type(rts), type(ts))
325 self.assertTrue((ts == rts).all())
326
312 def test_map(self):
327 def test_map(self):
313 view = self.client[:]
328 view = self.client[:]
314 def f(x):
329 def f(x):
315 return x**2
330 return x**2
316 data = range(16)
331 data = range(16)
317 r = view.map_sync(f, data)
332 r = view.map_sync(f, data)
318 self.assertEqual(r, map(f, data))
333 self.assertEqual(r, map(f, data))
319
334
320 def test_map_iterable(self):
335 def test_map_iterable(self):
321 """test map on iterables (direct)"""
336 """test map on iterables (direct)"""
322 view = self.client[:]
337 view = self.client[:]
323 # 101 is prime, so it won't be evenly distributed
338 # 101 is prime, so it won't be evenly distributed
324 arr = range(101)
339 arr = range(101)
325 # ensure it will be an iterator, even in Python 3
340 # ensure it will be an iterator, even in Python 3
326 it = iter(arr)
341 it = iter(arr)
327 r = view.map_sync(lambda x:x, arr)
342 r = view.map_sync(lambda x:x, arr)
328 self.assertEqual(r, list(arr))
343 self.assertEqual(r, list(arr))
329
344
330 def test_scatter_gather_nonblocking(self):
345 def test_scatter_gather_nonblocking(self):
331 data = range(16)
346 data = range(16)
332 view = self.client[:]
347 view = self.client[:]
333 view.scatter('a', data, block=False)
348 view.scatter('a', data, block=False)
334 ar = view.gather('a', block=False)
349 ar = view.gather('a', block=False)
335 self.assertEqual(ar.get(), data)
350 self.assertEqual(ar.get(), data)
336
351
337 @skip_without('numpy')
352 @skip_without('numpy')
338 def test_scatter_gather_numpy_nonblocking(self):
353 def test_scatter_gather_numpy_nonblocking(self):
339 import numpy
354 import numpy
340 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
355 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
341 a = numpy.arange(64)
356 a = numpy.arange(64)
342 view = self.client[:]
357 view = self.client[:]
343 ar = view.scatter('a', a, block=False)
358 ar = view.scatter('a', a, block=False)
344 self.assertTrue(isinstance(ar, AsyncResult))
359 self.assertTrue(isinstance(ar, AsyncResult))
345 amr = view.gather('a', block=False)
360 amr = view.gather('a', block=False)
346 self.assertTrue(isinstance(amr, AsyncMapResult))
361 self.assertTrue(isinstance(amr, AsyncMapResult))
347 assert_array_equal(amr.get(), a)
362 assert_array_equal(amr.get(), a)
348
363
349 def test_execute(self):
364 def test_execute(self):
350 view = self.client[:]
365 view = self.client[:]
351 # self.client.debug=True
366 # self.client.debug=True
352 execute = view.execute
367 execute = view.execute
353 ar = execute('c=30', block=False)
368 ar = execute('c=30', block=False)
354 self.assertTrue(isinstance(ar, AsyncResult))
369 self.assertTrue(isinstance(ar, AsyncResult))
355 ar = execute('d=[0,1,2]', block=False)
370 ar = execute('d=[0,1,2]', block=False)
356 self.client.wait(ar, 1)
371 self.client.wait(ar, 1)
357 self.assertEqual(len(ar.get()), len(self.client))
372 self.assertEqual(len(ar.get()), len(self.client))
358 for c in view['c']:
373 for c in view['c']:
359 self.assertEqual(c, 30)
374 self.assertEqual(c, 30)
360
375
361 def test_abort(self):
376 def test_abort(self):
362 view = self.client[-1]
377 view = self.client[-1]
363 ar = view.execute('import time; time.sleep(1)', block=False)
378 ar = view.execute('import time; time.sleep(1)', block=False)
364 ar2 = view.apply_async(lambda : 2)
379 ar2 = view.apply_async(lambda : 2)
365 ar3 = view.apply_async(lambda : 3)
380 ar3 = view.apply_async(lambda : 3)
366 view.abort(ar2)
381 view.abort(ar2)
367 view.abort(ar3.msg_ids)
382 view.abort(ar3.msg_ids)
368 self.assertRaises(error.TaskAborted, ar2.get)
383 self.assertRaises(error.TaskAborted, ar2.get)
369 self.assertRaises(error.TaskAborted, ar3.get)
384 self.assertRaises(error.TaskAborted, ar3.get)
370
385
371 def test_abort_all(self):
386 def test_abort_all(self):
372 """view.abort() aborts all outstanding tasks"""
387 """view.abort() aborts all outstanding tasks"""
373 view = self.client[-1]
388 view = self.client[-1]
374 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
389 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
375 view.abort()
390 view.abort()
376 view.wait(timeout=5)
391 view.wait(timeout=5)
377 for ar in ars[5:]:
392 for ar in ars[5:]:
378 self.assertRaises(error.TaskAborted, ar.get)
393 self.assertRaises(error.TaskAborted, ar.get)
379
394
380 def test_temp_flags(self):
395 def test_temp_flags(self):
381 view = self.client[-1]
396 view = self.client[-1]
382 view.block=True
397 view.block=True
383 with view.temp_flags(block=False):
398 with view.temp_flags(block=False):
384 self.assertFalse(view.block)
399 self.assertFalse(view.block)
385 self.assertTrue(view.block)
400 self.assertTrue(view.block)
386
401
387 @dec.known_failure_py3
402 @dec.known_failure_py3
388 def test_importer(self):
403 def test_importer(self):
389 view = self.client[-1]
404 view = self.client[-1]
390 view.clear(block=True)
405 view.clear(block=True)
391 with view.importer:
406 with view.importer:
392 import re
407 import re
393
408
394 @interactive
409 @interactive
395 def findall(pat, s):
410 def findall(pat, s):
396 # this globals() step isn't necessary in real code
411 # this globals() step isn't necessary in real code
397 # only to prevent a closure in the test
412 # only to prevent a closure in the test
398 re = globals()['re']
413 re = globals()['re']
399 return re.findall(pat, s)
414 return re.findall(pat, s)
400
415
401 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
416 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
402
417
403 def test_unicode_execute(self):
418 def test_unicode_execute(self):
404 """test executing unicode strings"""
419 """test executing unicode strings"""
405 v = self.client[-1]
420 v = self.client[-1]
406 v.block=True
421 v.block=True
407 if sys.version_info[0] >= 3:
422 if sys.version_info[0] >= 3:
408 code="a='é'"
423 code="a='é'"
409 else:
424 else:
410 code=u"a=u'é'"
425 code=u"a=u'é'"
411 v.execute(code)
426 v.execute(code)
412 self.assertEqual(v['a'], u'é')
427 self.assertEqual(v['a'], u'é')
413
428
414 def test_unicode_apply_result(self):
429 def test_unicode_apply_result(self):
415 """test unicode apply results"""
430 """test unicode apply results"""
416 v = self.client[-1]
431 v = self.client[-1]
417 r = v.apply_sync(lambda : u'é')
432 r = v.apply_sync(lambda : u'é')
418 self.assertEqual(r, u'é')
433 self.assertEqual(r, u'é')
419
434
420 def test_unicode_apply_arg(self):
435 def test_unicode_apply_arg(self):
421 """test passing unicode arguments to apply"""
436 """test passing unicode arguments to apply"""
422 v = self.client[-1]
437 v = self.client[-1]
423
438
424 @interactive
439 @interactive
425 def check_unicode(a, check):
440 def check_unicode(a, check):
426 assert isinstance(a, unicode), "%r is not unicode"%a
441 assert isinstance(a, unicode), "%r is not unicode"%a
427 assert isinstance(check, bytes), "%r is not bytes"%check
442 assert isinstance(check, bytes), "%r is not bytes"%check
428 assert a.encode('utf8') == check, "%s != %s"%(a,check)
443 assert a.encode('utf8') == check, "%s != %s"%(a,check)
429
444
430 for s in [ u'é', u'ßø®∫',u'asdf' ]:
445 for s in [ u'é', u'ßø®∫',u'asdf' ]:
431 try:
446 try:
432 v.apply_sync(check_unicode, s, s.encode('utf8'))
447 v.apply_sync(check_unicode, s, s.encode('utf8'))
433 except error.RemoteError as e:
448 except error.RemoteError as e:
434 if e.ename == 'AssertionError':
449 if e.ename == 'AssertionError':
435 self.fail(e.evalue)
450 self.fail(e.evalue)
436 else:
451 else:
437 raise e
452 raise e
438
453
439 def test_map_reference(self):
454 def test_map_reference(self):
440 """view.map(<Reference>, *seqs) should work"""
455 """view.map(<Reference>, *seqs) should work"""
441 v = self.client[:]
456 v = self.client[:]
442 v.scatter('n', self.client.ids, flatten=True)
457 v.scatter('n', self.client.ids, flatten=True)
443 v.execute("f = lambda x,y: x*y")
458 v.execute("f = lambda x,y: x*y")
444 rf = pmod.Reference('f')
459 rf = pmod.Reference('f')
445 nlist = list(range(10))
460 nlist = list(range(10))
446 mlist = nlist[::-1]
461 mlist = nlist[::-1]
447 expected = [ m*n for m,n in zip(mlist, nlist) ]
462 expected = [ m*n for m,n in zip(mlist, nlist) ]
448 result = v.map_sync(rf, mlist, nlist)
463 result = v.map_sync(rf, mlist, nlist)
449 self.assertEqual(result, expected)
464 self.assertEqual(result, expected)
450
465
451 def test_apply_reference(self):
466 def test_apply_reference(self):
452 """view.apply(<Reference>, *args) should work"""
467 """view.apply(<Reference>, *args) should work"""
453 v = self.client[:]
468 v = self.client[:]
454 v.scatter('n', self.client.ids, flatten=True)
469 v.scatter('n', self.client.ids, flatten=True)
455 v.execute("f = lambda x: n*x")
470 v.execute("f = lambda x: n*x")
456 rf = pmod.Reference('f')
471 rf = pmod.Reference('f')
457 result = v.apply_sync(rf, 5)
472 result = v.apply_sync(rf, 5)
458 expected = [ 5*id for id in self.client.ids ]
473 expected = [ 5*id for id in self.client.ids ]
459 self.assertEqual(result, expected)
474 self.assertEqual(result, expected)
460
475
461 def test_eval_reference(self):
476 def test_eval_reference(self):
462 v = self.client[self.client.ids[0]]
477 v = self.client[self.client.ids[0]]
463 v['g'] = range(5)
478 v['g'] = range(5)
464 rg = pmod.Reference('g[0]')
479 rg = pmod.Reference('g[0]')
465 echo = lambda x:x
480 echo = lambda x:x
466 self.assertEqual(v.apply_sync(echo, rg), 0)
481 self.assertEqual(v.apply_sync(echo, rg), 0)
467
482
468 def test_reference_nameerror(self):
483 def test_reference_nameerror(self):
469 v = self.client[self.client.ids[0]]
484 v = self.client[self.client.ids[0]]
470 r = pmod.Reference('elvis_has_left')
485 r = pmod.Reference('elvis_has_left')
471 echo = lambda x:x
486 echo = lambda x:x
472 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
487 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
473
488
474 def test_single_engine_map(self):
489 def test_single_engine_map(self):
475 e0 = self.client[self.client.ids[0]]
490 e0 = self.client[self.client.ids[0]]
476 r = range(5)
491 r = range(5)
477 check = [ -1*i for i in r ]
492 check = [ -1*i for i in r ]
478 result = e0.map_sync(lambda x: -1*x, r)
493 result = e0.map_sync(lambda x: -1*x, r)
479 self.assertEqual(result, check)
494 self.assertEqual(result, check)
480
495
481 def test_len(self):
496 def test_len(self):
482 """len(view) makes sense"""
497 """len(view) makes sense"""
483 e0 = self.client[self.client.ids[0]]
498 e0 = self.client[self.client.ids[0]]
484 yield self.assertEqual(len(e0), 1)
499 yield self.assertEqual(len(e0), 1)
485 v = self.client[:]
500 v = self.client[:]
486 yield self.assertEqual(len(v), len(self.client.ids))
501 yield self.assertEqual(len(v), len(self.client.ids))
487 v = self.client.direct_view('all')
502 v = self.client.direct_view('all')
488 yield self.assertEqual(len(v), len(self.client.ids))
503 yield self.assertEqual(len(v), len(self.client.ids))
489 v = self.client[:2]
504 v = self.client[:2]
490 yield self.assertEqual(len(v), 2)
505 yield self.assertEqual(len(v), 2)
491 v = self.client[:1]
506 v = self.client[:1]
492 yield self.assertEqual(len(v), 1)
507 yield self.assertEqual(len(v), 1)
493 v = self.client.load_balanced_view()
508 v = self.client.load_balanced_view()
494 yield self.assertEqual(len(v), len(self.client.ids))
509 yield self.assertEqual(len(v), len(self.client.ids))
495 # parametric tests seem to require manual closing?
510 # parametric tests seem to require manual closing?
496 self.client.close()
511 self.client.close()
497
512
498
513
499 # begin execute tests
514 # begin execute tests
500
515
501 def test_execute_reply(self):
516 def test_execute_reply(self):
502 e0 = self.client[self.client.ids[0]]
517 e0 = self.client[self.client.ids[0]]
503 e0.block = True
518 e0.block = True
504 ar = e0.execute("5", silent=False)
519 ar = e0.execute("5", silent=False)
505 er = ar.get()
520 er = ar.get()
506 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
521 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
507 self.assertEqual(er.pyout['data']['text/plain'], '5')
522 self.assertEqual(er.pyout['data']['text/plain'], '5')
508
523
509 def test_execute_reply_stdout(self):
524 def test_execute_reply_stdout(self):
510 e0 = self.client[self.client.ids[0]]
525 e0 = self.client[self.client.ids[0]]
511 e0.block = True
526 e0.block = True
512 ar = e0.execute("print (5)", silent=False)
527 ar = e0.execute("print (5)", silent=False)
513 er = ar.get()
528 er = ar.get()
514 self.assertEqual(er.stdout.strip(), '5')
529 self.assertEqual(er.stdout.strip(), '5')
515
530
516 def test_execute_pyout(self):
531 def test_execute_pyout(self):
517 """execute triggers pyout with silent=False"""
532 """execute triggers pyout with silent=False"""
518 view = self.client[:]
533 view = self.client[:]
519 ar = view.execute("5", silent=False, block=True)
534 ar = view.execute("5", silent=False, block=True)
520
535
521 expected = [{'text/plain' : '5'}] * len(view)
536 expected = [{'text/plain' : '5'}] * len(view)
522 mimes = [ out['data'] for out in ar.pyout ]
537 mimes = [ out['data'] for out in ar.pyout ]
523 self.assertEqual(mimes, expected)
538 self.assertEqual(mimes, expected)
524
539
525 def test_execute_silent(self):
540 def test_execute_silent(self):
526 """execute does not trigger pyout with silent=True"""
541 """execute does not trigger pyout with silent=True"""
527 view = self.client[:]
542 view = self.client[:]
528 ar = view.execute("5", block=True)
543 ar = view.execute("5", block=True)
529 expected = [None] * len(view)
544 expected = [None] * len(view)
530 self.assertEqual(ar.pyout, expected)
545 self.assertEqual(ar.pyout, expected)
531
546
532 def test_execute_magic(self):
547 def test_execute_magic(self):
533 """execute accepts IPython commands"""
548 """execute accepts IPython commands"""
534 view = self.client[:]
549 view = self.client[:]
535 view.execute("a = 5")
550 view.execute("a = 5")
536 ar = view.execute("%whos", block=True)
551 ar = view.execute("%whos", block=True)
537 # this will raise, if that failed
552 # this will raise, if that failed
538 ar.get(5)
553 ar.get(5)
539 for stdout in ar.stdout:
554 for stdout in ar.stdout:
540 lines = stdout.splitlines()
555 lines = stdout.splitlines()
541 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
556 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
542 found = False
557 found = False
543 for line in lines[2:]:
558 for line in lines[2:]:
544 split = line.split()
559 split = line.split()
545 if split == ['a', 'int', '5']:
560 if split == ['a', 'int', '5']:
546 found = True
561 found = True
547 break
562 break
548 self.assertTrue(found, "whos output wrong: %s" % stdout)
563 self.assertTrue(found, "whos output wrong: %s" % stdout)
549
564
550 def test_execute_displaypub(self):
565 def test_execute_displaypub(self):
551 """execute tracks display_pub output"""
566 """execute tracks display_pub output"""
552 view = self.client[:]
567 view = self.client[:]
553 view.execute("from IPython.core.display import *")
568 view.execute("from IPython.core.display import *")
554 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
569 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
555
570
556 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
571 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
557 for outputs in ar.outputs:
572 for outputs in ar.outputs:
558 mimes = [ out['data'] for out in outputs ]
573 mimes = [ out['data'] for out in outputs ]
559 self.assertEqual(mimes, expected)
574 self.assertEqual(mimes, expected)
560
575
561 def test_apply_displaypub(self):
576 def test_apply_displaypub(self):
562 """apply tracks display_pub output"""
577 """apply tracks display_pub output"""
563 view = self.client[:]
578 view = self.client[:]
564 view.execute("from IPython.core.display import *")
579 view.execute("from IPython.core.display import *")
565
580
566 @interactive
581 @interactive
567 def publish():
582 def publish():
568 [ display(i) for i in range(5) ]
583 [ display(i) for i in range(5) ]
569
584
570 ar = view.apply_async(publish)
585 ar = view.apply_async(publish)
571 ar.get(5)
586 ar.get(5)
572 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
587 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
573 for outputs in ar.outputs:
588 for outputs in ar.outputs:
574 mimes = [ out['data'] for out in outputs ]
589 mimes = [ out['data'] for out in outputs ]
575 self.assertEqual(mimes, expected)
590 self.assertEqual(mimes, expected)
576
591
577 def test_execute_raises(self):
592 def test_execute_raises(self):
578 """exceptions in execute requests raise appropriately"""
593 """exceptions in execute requests raise appropriately"""
579 view = self.client[-1]
594 view = self.client[-1]
580 ar = view.execute("1/0")
595 ar = view.execute("1/0")
581 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
596 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
582
597
583 def test_remoteerror_render_exception(self):
598 def test_remoteerror_render_exception(self):
584 """RemoteErrors get nice tracebacks"""
599 """RemoteErrors get nice tracebacks"""
585 view = self.client[-1]
600 view = self.client[-1]
586 ar = view.execute("1/0")
601 ar = view.execute("1/0")
587 ip = get_ipython()
602 ip = get_ipython()
588 ip.user_ns['ar'] = ar
603 ip.user_ns['ar'] = ar
589 with capture_output() as io:
604 with capture_output() as io:
590 ip.run_cell("ar.get(2)")
605 ip.run_cell("ar.get(2)")
591
606
592 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
607 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
593
608
594 def test_compositeerror_render_exception(self):
609 def test_compositeerror_render_exception(self):
595 """CompositeErrors get nice tracebacks"""
610 """CompositeErrors get nice tracebacks"""
596 view = self.client[:]
611 view = self.client[:]
597 ar = view.execute("1/0")
612 ar = view.execute("1/0")
598 ip = get_ipython()
613 ip = get_ipython()
599 ip.user_ns['ar'] = ar
614 ip.user_ns['ar'] = ar
600 with capture_output() as io:
615 with capture_output() as io:
601 ip.run_cell("ar.get(2)")
616 ip.run_cell("ar.get(2)")
602
617
603 self.assertEqual(io.stdout.count('ZeroDivisionError'), len(view) * 2, io.stdout)
618 self.assertEqual(io.stdout.count('ZeroDivisionError'), len(view) * 2, io.stdout)
604 self.assertEqual(io.stdout.count('by zero'), len(view), io.stdout)
619 self.assertEqual(io.stdout.count('by zero'), len(view), io.stdout)
605 self.assertEqual(io.stdout.count(':execute'), len(view), io.stdout)
620 self.assertEqual(io.stdout.count(':execute'), len(view), io.stdout)
606
621
607 @dec.skipif_not_matplotlib
622 @dec.skipif_not_matplotlib
608 def test_magic_pylab(self):
623 def test_magic_pylab(self):
609 """%pylab works on engines"""
624 """%pylab works on engines"""
610 view = self.client[-1]
625 view = self.client[-1]
611 ar = view.execute("%pylab inline")
626 ar = view.execute("%pylab inline")
612 # at least check if this raised:
627 # at least check if this raised:
613 reply = ar.get(5)
628 reply = ar.get(5)
614 # include imports, in case user config
629 # include imports, in case user config
615 ar = view.execute("plot(rand(100))", silent=False)
630 ar = view.execute("plot(rand(100))", silent=False)
616 reply = ar.get(5)
631 reply = ar.get(5)
617 self.assertEqual(len(reply.outputs), 1)
632 self.assertEqual(len(reply.outputs), 1)
618 output = reply.outputs[0]
633 output = reply.outputs[0]
619 self.assertTrue("data" in output)
634 self.assertTrue("data" in output)
620 data = output['data']
635 data = output['data']
621 self.assertTrue("image/png" in data)
636 self.assertTrue("image/png" in data)
622
637
623 def test_func_default_func(self):
638 def test_func_default_func(self):
624 """interactively defined function as apply func default"""
639 """interactively defined function as apply func default"""
625 def foo():
640 def foo():
626 return 'foo'
641 return 'foo'
627
642
628 def bar(f=foo):
643 def bar(f=foo):
629 return f()
644 return f()
630
645
631 view = self.client[-1]
646 view = self.client[-1]
632 ar = view.apply_async(bar)
647 ar = view.apply_async(bar)
633 r = ar.get(10)
648 r = ar.get(10)
634 self.assertEqual(r, 'foo')
649 self.assertEqual(r, 'foo')
635 def test_data_pub_single(self):
650 def test_data_pub_single(self):
636 view = self.client[-1]
651 view = self.client[-1]
637 ar = view.execute('\n'.join([
652 ar = view.execute('\n'.join([
638 'from IPython.zmq.datapub import publish_data',
653 'from IPython.zmq.datapub import publish_data',
639 'for i in range(5):',
654 'for i in range(5):',
640 ' publish_data(dict(i=i))'
655 ' publish_data(dict(i=i))'
641 ]), block=False)
656 ]), block=False)
642 self.assertTrue(isinstance(ar.data, dict))
657 self.assertTrue(isinstance(ar.data, dict))
643 ar.get(5)
658 ar.get(5)
644 self.assertEqual(ar.data, dict(i=4))
659 self.assertEqual(ar.data, dict(i=4))
645
660
646 def test_data_pub(self):
661 def test_data_pub(self):
647 view = self.client[:]
662 view = self.client[:]
648 ar = view.execute('\n'.join([
663 ar = view.execute('\n'.join([
649 'from IPython.zmq.datapub import publish_data',
664 'from IPython.zmq.datapub import publish_data',
650 'for i in range(5):',
665 'for i in range(5):',
651 ' publish_data(dict(i=i))'
666 ' publish_data(dict(i=i))'
652 ]), block=False)
667 ]), block=False)
653 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
668 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
654 ar.get(5)
669 ar.get(5)
655 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
670 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
656
671
657 def test_can_list_arg(self):
672 def test_can_list_arg(self):
658 """args in lists are canned"""
673 """args in lists are canned"""
659 view = self.client[-1]
674 view = self.client[-1]
660 view['a'] = 128
675 view['a'] = 128
661 rA = pmod.Reference('a')
676 rA = pmod.Reference('a')
662 ar = view.apply_async(lambda x: x, [rA])
677 ar = view.apply_async(lambda x: x, [rA])
663 r = ar.get(5)
678 r = ar.get(5)
664 self.assertEqual(r, [128])
679 self.assertEqual(r, [128])
665
680
666 def test_can_dict_arg(self):
681 def test_can_dict_arg(self):
667 """args in dicts are canned"""
682 """args in dicts are canned"""
668 view = self.client[-1]
683 view = self.client[-1]
669 view['a'] = 128
684 view['a'] = 128
670 rA = pmod.Reference('a')
685 rA = pmod.Reference('a')
671 ar = view.apply_async(lambda x: x, dict(foo=rA))
686 ar = view.apply_async(lambda x: x, dict(foo=rA))
672 r = ar.get(5)
687 r = ar.get(5)
673 self.assertEqual(r, dict(foo=128))
688 self.assertEqual(r, dict(foo=128))
674
689
675 def test_can_list_kwarg(self):
690 def test_can_list_kwarg(self):
676 """kwargs in lists are canned"""
691 """kwargs in lists are canned"""
677 view = self.client[-1]
692 view = self.client[-1]
678 view['a'] = 128
693 view['a'] = 128
679 rA = pmod.Reference('a')
694 rA = pmod.Reference('a')
680 ar = view.apply_async(lambda x=5: x, x=[rA])
695 ar = view.apply_async(lambda x=5: x, x=[rA])
681 r = ar.get(5)
696 r = ar.get(5)
682 self.assertEqual(r, [128])
697 self.assertEqual(r, [128])
683
698
684 def test_can_dict_kwarg(self):
699 def test_can_dict_kwarg(self):
685 """kwargs in dicts are canned"""
700 """kwargs in dicts are canned"""
686 view = self.client[-1]
701 view = self.client[-1]
687 view['a'] = 128
702 view['a'] = 128
688 rA = pmod.Reference('a')
703 rA = pmod.Reference('a')
689 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
704 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
690 r = ar.get(5)
705 r = ar.get(5)
691 self.assertEqual(r, dict(foo=128))
706 self.assertEqual(r, dict(foo=128))
692
707
693 def test_map_ref(self):
708 def test_map_ref(self):
694 """view.map works with references"""
709 """view.map works with references"""
695 view = self.client[:]
710 view = self.client[:]
696 ranks = sorted(self.client.ids)
711 ranks = sorted(self.client.ids)
697 view.scatter('rank', ranks, flatten=True)
712 view.scatter('rank', ranks, flatten=True)
698 rrank = pmod.Reference('rank')
713 rrank = pmod.Reference('rank')
699
714
700 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
715 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
701 drank = amr.get(5)
716 drank = amr.get(5)
702 self.assertEqual(drank, [ r*2 for r in ranks ])
717 self.assertEqual(drank, [ r*2 for r in ranks ])
703
718
704 def test_nested_getitem_setitem(self):
719 def test_nested_getitem_setitem(self):
705 """get and set with view['a.b']"""
720 """get and set with view['a.b']"""
706 view = self.client[-1]
721 view = self.client[-1]
707 view.execute('\n'.join([
722 view.execute('\n'.join([
708 'class A(object): pass',
723 'class A(object): pass',
709 'a = A()',
724 'a = A()',
710 'a.b = 128',
725 'a.b = 128',
711 ]), block=True)
726 ]), block=True)
712 ra = pmod.Reference('a')
727 ra = pmod.Reference('a')
713
728
714 r = view.apply_sync(lambda x: x.b, ra)
729 r = view.apply_sync(lambda x: x.b, ra)
715 self.assertEqual(r, 128)
730 self.assertEqual(r, 128)
716 self.assertEqual(view['a.b'], 128)
731 self.assertEqual(view['a.b'], 128)
717
732
718 view['a.b'] = 0
733 view['a.b'] = 0
719
734
720 r = view.apply_sync(lambda x: x.b, ra)
735 r = view.apply_sync(lambda x: x.b, ra)
721 self.assertEqual(r, 0)
736 self.assertEqual(r, 0)
722 self.assertEqual(view['a.b'], 0)
737 self.assertEqual(view['a.b'], 0)
General Comments 0
You need to be logged in to leave comments. Login now