##// END OF EJS Templates
test that pushing numpy arrays is zero-copy
MinRK -
Show More
@@ -1,507 +1,524 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test View objects
2 """test View objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import time
20 import time
21 from tempfile import mktemp
21 from tempfile import mktemp
22 from StringIO import StringIO
22 from StringIO import StringIO
23
23
24 import zmq
24 import zmq
25 from nose import SkipTest
25 from nose import SkipTest
26
26
27 from IPython.testing import decorators as dec
27 from IPython.testing import decorators as dec
28
28
29 from IPython import parallel as pmod
29 from IPython import parallel as pmod
30 from IPython.parallel import error
30 from IPython.parallel import error
31 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
31 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
32 from IPython.parallel import DirectView
32 from IPython.parallel import DirectView
33 from IPython.parallel.util import interactive
33 from IPython.parallel.util import interactive
34
34
35 from IPython.parallel.tests import add_engines
35 from IPython.parallel.tests import add_engines
36
36
37 from .clienttest import ClusterTestCase, crash, wait, skip_without
37 from .clienttest import ClusterTestCase, crash, wait, skip_without
38
38
39 def setup():
39 def setup():
40 add_engines(3, total=True)
40 add_engines(3, total=True)
41
41
42 class TestView(ClusterTestCase):
42 class TestView(ClusterTestCase):
43
43
44 def test_z_crash_mux(self):
44 def test_z_crash_mux(self):
45 """test graceful handling of engine death (direct)"""
45 """test graceful handling of engine death (direct)"""
46 raise SkipTest("crash tests disabled, due to undesirable crash reports")
46 raise SkipTest("crash tests disabled, due to undesirable crash reports")
47 # self.add_engines(1)
47 # self.add_engines(1)
48 eid = self.client.ids[-1]
48 eid = self.client.ids[-1]
49 ar = self.client[eid].apply_async(crash)
49 ar = self.client[eid].apply_async(crash)
50 self.assertRaisesRemote(error.EngineError, ar.get, 10)
50 self.assertRaisesRemote(error.EngineError, ar.get, 10)
51 eid = ar.engine_id
51 eid = ar.engine_id
52 tic = time.time()
52 tic = time.time()
53 while eid in self.client.ids and time.time()-tic < 5:
53 while eid in self.client.ids and time.time()-tic < 5:
54 time.sleep(.01)
54 time.sleep(.01)
55 self.client.spin()
55 self.client.spin()
56 self.assertFalse(eid in self.client.ids, "Engine should have died")
56 self.assertFalse(eid in self.client.ids, "Engine should have died")
57
57
58 def test_push_pull(self):
58 def test_push_pull(self):
59 """test pushing and pulling"""
59 """test pushing and pulling"""
60 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
60 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
61 t = self.client.ids[-1]
61 t = self.client.ids[-1]
62 v = self.client[t]
62 v = self.client[t]
63 push = v.push
63 push = v.push
64 pull = v.pull
64 pull = v.pull
65 v.block=True
65 v.block=True
66 nengines = len(self.client)
66 nengines = len(self.client)
67 push({'data':data})
67 push({'data':data})
68 d = pull('data')
68 d = pull('data')
69 self.assertEquals(d, data)
69 self.assertEquals(d, data)
70 self.client[:].push({'data':data})
70 self.client[:].push({'data':data})
71 d = self.client[:].pull('data', block=True)
71 d = self.client[:].pull('data', block=True)
72 self.assertEquals(d, nengines*[data])
72 self.assertEquals(d, nengines*[data])
73 ar = push({'data':data}, block=False)
73 ar = push({'data':data}, block=False)
74 self.assertTrue(isinstance(ar, AsyncResult))
74 self.assertTrue(isinstance(ar, AsyncResult))
75 r = ar.get()
75 r = ar.get()
76 ar = self.client[:].pull('data', block=False)
76 ar = self.client[:].pull('data', block=False)
77 self.assertTrue(isinstance(ar, AsyncResult))
77 self.assertTrue(isinstance(ar, AsyncResult))
78 r = ar.get()
78 r = ar.get()
79 self.assertEquals(r, nengines*[data])
79 self.assertEquals(r, nengines*[data])
80 self.client[:].push(dict(a=10,b=20))
80 self.client[:].push(dict(a=10,b=20))
81 r = self.client[:].pull(('a','b'), block=True)
81 r = self.client[:].pull(('a','b'), block=True)
82 self.assertEquals(r, nengines*[[10,20]])
82 self.assertEquals(r, nengines*[[10,20]])
83
83
84 def test_push_pull_function(self):
84 def test_push_pull_function(self):
85 "test pushing and pulling functions"
85 "test pushing and pulling functions"
86 def testf(x):
86 def testf(x):
87 return 2.0*x
87 return 2.0*x
88
88
89 t = self.client.ids[-1]
89 t = self.client.ids[-1]
90 v = self.client[t]
90 v = self.client[t]
91 v.block=True
91 v.block=True
92 push = v.push
92 push = v.push
93 pull = v.pull
93 pull = v.pull
94 execute = v.execute
94 execute = v.execute
95 push({'testf':testf})
95 push({'testf':testf})
96 r = pull('testf')
96 r = pull('testf')
97 self.assertEqual(r(1.0), testf(1.0))
97 self.assertEqual(r(1.0), testf(1.0))
98 execute('r = testf(10)')
98 execute('r = testf(10)')
99 r = pull('r')
99 r = pull('r')
100 self.assertEquals(r, testf(10))
100 self.assertEquals(r, testf(10))
101 ar = self.client[:].push({'testf':testf}, block=False)
101 ar = self.client[:].push({'testf':testf}, block=False)
102 ar.get()
102 ar.get()
103 ar = self.client[:].pull('testf', block=False)
103 ar = self.client[:].pull('testf', block=False)
104 rlist = ar.get()
104 rlist = ar.get()
105 for r in rlist:
105 for r in rlist:
106 self.assertEqual(r(1.0), testf(1.0))
106 self.assertEqual(r(1.0), testf(1.0))
107 execute("def g(x): return x*x")
107 execute("def g(x): return x*x")
108 r = pull(('testf','g'))
108 r = pull(('testf','g'))
109 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
109 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
110
110
111 def test_push_function_globals(self):
111 def test_push_function_globals(self):
112 """test that pushed functions have access to globals"""
112 """test that pushed functions have access to globals"""
113 @interactive
113 @interactive
114 def geta():
114 def geta():
115 return a
115 return a
116 # self.add_engines(1)
116 # self.add_engines(1)
117 v = self.client[-1]
117 v = self.client[-1]
118 v.block=True
118 v.block=True
119 v['f'] = geta
119 v['f'] = geta
120 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
120 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
121 v.execute('a=5')
121 v.execute('a=5')
122 v.execute('b=f()')
122 v.execute('b=f()')
123 self.assertEquals(v['b'], 5)
123 self.assertEquals(v['b'], 5)
124
124
125 def test_push_function_defaults(self):
125 def test_push_function_defaults(self):
126 """test that pushed functions preserve default args"""
126 """test that pushed functions preserve default args"""
127 def echo(a=10):
127 def echo(a=10):
128 return a
128 return a
129 v = self.client[-1]
129 v = self.client[-1]
130 v.block=True
130 v.block=True
131 v['f'] = echo
131 v['f'] = echo
132 v.execute('b=f()')
132 v.execute('b=f()')
133 self.assertEquals(v['b'], 10)
133 self.assertEquals(v['b'], 10)
134
134
135 def test_get_result(self):
135 def test_get_result(self):
136 """test getting results from the Hub."""
136 """test getting results from the Hub."""
137 c = pmod.Client(profile='iptest')
137 c = pmod.Client(profile='iptest')
138 # self.add_engines(1)
138 # self.add_engines(1)
139 t = c.ids[-1]
139 t = c.ids[-1]
140 v = c[t]
140 v = c[t]
141 v2 = self.client[t]
141 v2 = self.client[t]
142 ar = v.apply_async(wait, 1)
142 ar = v.apply_async(wait, 1)
143 # give the monitor time to notice the message
143 # give the monitor time to notice the message
144 time.sleep(.25)
144 time.sleep(.25)
145 ahr = v2.get_result(ar.msg_ids)
145 ahr = v2.get_result(ar.msg_ids)
146 self.assertTrue(isinstance(ahr, AsyncHubResult))
146 self.assertTrue(isinstance(ahr, AsyncHubResult))
147 self.assertEquals(ahr.get(), ar.get())
147 self.assertEquals(ahr.get(), ar.get())
148 ar2 = v2.get_result(ar.msg_ids)
148 ar2 = v2.get_result(ar.msg_ids)
149 self.assertFalse(isinstance(ar2, AsyncHubResult))
149 self.assertFalse(isinstance(ar2, AsyncHubResult))
150 c.spin()
150 c.spin()
151 c.close()
151 c.close()
152
152
153 def test_run_newline(self):
153 def test_run_newline(self):
154 """test that run appends newline to files"""
154 """test that run appends newline to files"""
155 tmpfile = mktemp()
155 tmpfile = mktemp()
156 with open(tmpfile, 'w') as f:
156 with open(tmpfile, 'w') as f:
157 f.write("""def g():
157 f.write("""def g():
158 return 5
158 return 5
159 """)
159 """)
160 v = self.client[-1]
160 v = self.client[-1]
161 v.run(tmpfile, block=True)
161 v.run(tmpfile, block=True)
162 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
162 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
163
163
164 def test_apply_tracked(self):
164 def test_apply_tracked(self):
165 """test tracking for apply"""
165 """test tracking for apply"""
166 # self.add_engines(1)
166 # self.add_engines(1)
167 t = self.client.ids[-1]
167 t = self.client.ids[-1]
168 v = self.client[t]
168 v = self.client[t]
169 v.block=False
169 v.block=False
170 def echo(n=1024*1024, **kwargs):
170 def echo(n=1024*1024, **kwargs):
171 with v.temp_flags(**kwargs):
171 with v.temp_flags(**kwargs):
172 return v.apply(lambda x: x, 'x'*n)
172 return v.apply(lambda x: x, 'x'*n)
173 ar = echo(1, track=False)
173 ar = echo(1, track=False)
174 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
174 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
175 self.assertTrue(ar.sent)
175 self.assertTrue(ar.sent)
176 ar = echo(track=True)
176 ar = echo(track=True)
177 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
177 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
178 self.assertEquals(ar.sent, ar._tracker.done)
178 self.assertEquals(ar.sent, ar._tracker.done)
179 ar._tracker.wait()
179 ar._tracker.wait()
180 self.assertTrue(ar.sent)
180 self.assertTrue(ar.sent)
181
181
182 def test_push_tracked(self):
182 def test_push_tracked(self):
183 t = self.client.ids[-1]
183 t = self.client.ids[-1]
184 ns = dict(x='x'*1024*1024)
184 ns = dict(x='x'*1024*1024)
185 v = self.client[t]
185 v = self.client[t]
186 ar = v.push(ns, block=False, track=False)
186 ar = v.push(ns, block=False, track=False)
187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertTrue(ar.sent)
188 self.assertTrue(ar.sent)
189
189
190 ar = v.push(ns, block=False, track=True)
190 ar = v.push(ns, block=False, track=True)
191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
192 ar._tracker.wait()
192 ar._tracker.wait()
193 self.assertEquals(ar.sent, ar._tracker.done)
193 self.assertEquals(ar.sent, ar._tracker.done)
194 self.assertTrue(ar.sent)
194 self.assertTrue(ar.sent)
195 ar.get()
195 ar.get()
196
196
197 def test_scatter_tracked(self):
197 def test_scatter_tracked(self):
198 t = self.client.ids
198 t = self.client.ids
199 x='x'*1024*1024
199 x='x'*1024*1024
200 ar = self.client[t].scatter('x', x, block=False, track=False)
200 ar = self.client[t].scatter('x', x, block=False, track=False)
201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 self.assertTrue(ar.sent)
202 self.assertTrue(ar.sent)
203
203
204 ar = self.client[t].scatter('x', x, block=False, track=True)
204 ar = self.client[t].scatter('x', x, block=False, track=True)
205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
206 self.assertEquals(ar.sent, ar._tracker.done)
206 self.assertEquals(ar.sent, ar._tracker.done)
207 ar._tracker.wait()
207 ar._tracker.wait()
208 self.assertTrue(ar.sent)
208 self.assertTrue(ar.sent)
209 ar.get()
209 ar.get()
210
210
211 def test_remote_reference(self):
211 def test_remote_reference(self):
212 v = self.client[-1]
212 v = self.client[-1]
213 v['a'] = 123
213 v['a'] = 123
214 ra = pmod.Reference('a')
214 ra = pmod.Reference('a')
215 b = v.apply_sync(lambda x: x, ra)
215 b = v.apply_sync(lambda x: x, ra)
216 self.assertEquals(b, 123)
216 self.assertEquals(b, 123)
217
217
218
218
219 def test_scatter_gather(self):
219 def test_scatter_gather(self):
220 view = self.client[:]
220 view = self.client[:]
221 seq1 = range(16)
221 seq1 = range(16)
222 view.scatter('a', seq1)
222 view.scatter('a', seq1)
223 seq2 = view.gather('a', block=True)
223 seq2 = view.gather('a', block=True)
224 self.assertEquals(seq2, seq1)
224 self.assertEquals(seq2, seq1)
225 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
225 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
226
226
227 @skip_without('numpy')
227 @skip_without('numpy')
228 def test_scatter_gather_numpy(self):
228 def test_scatter_gather_numpy(self):
229 import numpy
229 import numpy
230 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
230 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
231 view = self.client[:]
231 view = self.client[:]
232 a = numpy.arange(64)
232 a = numpy.arange(64)
233 view.scatter('a', a)
233 view.scatter('a', a)
234 b = view.gather('a', block=True)
234 b = view.gather('a', block=True)
235 assert_array_equal(b, a)
235 assert_array_equal(b, a)
236
237 @skip_without('numpy')
238 def test_push_numpy_nocopy(self):
239 import numpy
240 view = self.client[:]
241 a = numpy.arange(64)
242 view['A'] = a
243 @interactive
244 def check_writeable(x):
245 return x.flags.writeable
246
247 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
248 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
249
250 view.push(dict(B=a))
251 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
252 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
236
253
237 def test_map(self):
254 def test_map(self):
238 view = self.client[:]
255 view = self.client[:]
239 def f(x):
256 def f(x):
240 return x**2
257 return x**2
241 data = range(16)
258 data = range(16)
242 r = view.map_sync(f, data)
259 r = view.map_sync(f, data)
243 self.assertEquals(r, map(f, data))
260 self.assertEquals(r, map(f, data))
244
261
245 def test_map_iterable(self):
262 def test_map_iterable(self):
246 """test map on iterables (direct)"""
263 """test map on iterables (direct)"""
247 view = self.client[:]
264 view = self.client[:]
248 # 101 is prime, so it won't be evenly distributed
265 # 101 is prime, so it won't be evenly distributed
249 arr = range(101)
266 arr = range(101)
250 # ensure it will be an iterator, even in Python 3
267 # ensure it will be an iterator, even in Python 3
251 it = iter(arr)
268 it = iter(arr)
252 r = view.map_sync(lambda x:x, arr)
269 r = view.map_sync(lambda x:x, arr)
253 self.assertEquals(r, list(arr))
270 self.assertEquals(r, list(arr))
254
271
255 def test_scatterGatherNonblocking(self):
272 def test_scatterGatherNonblocking(self):
256 data = range(16)
273 data = range(16)
257 view = self.client[:]
274 view = self.client[:]
258 view.scatter('a', data, block=False)
275 view.scatter('a', data, block=False)
259 ar = view.gather('a', block=False)
276 ar = view.gather('a', block=False)
260 self.assertEquals(ar.get(), data)
277 self.assertEquals(ar.get(), data)
261
278
262 @skip_without('numpy')
279 @skip_without('numpy')
263 def test_scatter_gather_numpy_nonblocking(self):
280 def test_scatter_gather_numpy_nonblocking(self):
264 import numpy
281 import numpy
265 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
282 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
266 a = numpy.arange(64)
283 a = numpy.arange(64)
267 view = self.client[:]
284 view = self.client[:]
268 ar = view.scatter('a', a, block=False)
285 ar = view.scatter('a', a, block=False)
269 self.assertTrue(isinstance(ar, AsyncResult))
286 self.assertTrue(isinstance(ar, AsyncResult))
270 amr = view.gather('a', block=False)
287 amr = view.gather('a', block=False)
271 self.assertTrue(isinstance(amr, AsyncMapResult))
288 self.assertTrue(isinstance(amr, AsyncMapResult))
272 assert_array_equal(amr.get(), a)
289 assert_array_equal(amr.get(), a)
273
290
274 def test_execute(self):
291 def test_execute(self):
275 view = self.client[:]
292 view = self.client[:]
276 # self.client.debug=True
293 # self.client.debug=True
277 execute = view.execute
294 execute = view.execute
278 ar = execute('c=30', block=False)
295 ar = execute('c=30', block=False)
279 self.assertTrue(isinstance(ar, AsyncResult))
296 self.assertTrue(isinstance(ar, AsyncResult))
280 ar = execute('d=[0,1,2]', block=False)
297 ar = execute('d=[0,1,2]', block=False)
281 self.client.wait(ar, 1)
298 self.client.wait(ar, 1)
282 self.assertEquals(len(ar.get()), len(self.client))
299 self.assertEquals(len(ar.get()), len(self.client))
283 for c in view['c']:
300 for c in view['c']:
284 self.assertEquals(c, 30)
301 self.assertEquals(c, 30)
285
302
286 def test_abort(self):
303 def test_abort(self):
287 view = self.client[-1]
304 view = self.client[-1]
288 ar = view.execute('import time; time.sleep(1)', block=False)
305 ar = view.execute('import time; time.sleep(1)', block=False)
289 ar2 = view.apply_async(lambda : 2)
306 ar2 = view.apply_async(lambda : 2)
290 ar3 = view.apply_async(lambda : 3)
307 ar3 = view.apply_async(lambda : 3)
291 view.abort(ar2)
308 view.abort(ar2)
292 view.abort(ar3.msg_ids)
309 view.abort(ar3.msg_ids)
293 self.assertRaises(error.TaskAborted, ar2.get)
310 self.assertRaises(error.TaskAborted, ar2.get)
294 self.assertRaises(error.TaskAborted, ar3.get)
311 self.assertRaises(error.TaskAborted, ar3.get)
295
312
296 def test_abort_all(self):
313 def test_abort_all(self):
297 """view.abort() aborts all outstanding tasks"""
314 """view.abort() aborts all outstanding tasks"""
298 view = self.client[-1]
315 view = self.client[-1]
299 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
316 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
300 view.abort()
317 view.abort()
301 view.wait(timeout=5)
318 view.wait(timeout=5)
302 for ar in ars[5:]:
319 for ar in ars[5:]:
303 self.assertRaises(error.TaskAborted, ar.get)
320 self.assertRaises(error.TaskAborted, ar.get)
304
321
305 def test_temp_flags(self):
322 def test_temp_flags(self):
306 view = self.client[-1]
323 view = self.client[-1]
307 view.block=True
324 view.block=True
308 with view.temp_flags(block=False):
325 with view.temp_flags(block=False):
309 self.assertFalse(view.block)
326 self.assertFalse(view.block)
310 self.assertTrue(view.block)
327 self.assertTrue(view.block)
311
328
312 @dec.known_failure_py3
329 @dec.known_failure_py3
313 def test_importer(self):
330 def test_importer(self):
314 view = self.client[-1]
331 view = self.client[-1]
315 view.clear(block=True)
332 view.clear(block=True)
316 with view.importer:
333 with view.importer:
317 import re
334 import re
318
335
319 @interactive
336 @interactive
320 def findall(pat, s):
337 def findall(pat, s):
321 # this globals() step isn't necessary in real code
338 # this globals() step isn't necessary in real code
322 # only to prevent a closure in the test
339 # only to prevent a closure in the test
323 re = globals()['re']
340 re = globals()['re']
324 return re.findall(pat, s)
341 return re.findall(pat, s)
325
342
326 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
343 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
327
344
328 # parallel magic tests
345 # parallel magic tests
329
346
330 def test_magic_px_blocking(self):
347 def test_magic_px_blocking(self):
331 ip = get_ipython()
348 ip = get_ipython()
332 v = self.client[-1]
349 v = self.client[-1]
333 v.activate()
350 v.activate()
334 v.block=True
351 v.block=True
335
352
336 ip.magic_px('a=5')
353 ip.magic_px('a=5')
337 self.assertEquals(v['a'], 5)
354 self.assertEquals(v['a'], 5)
338 ip.magic_px('a=10')
355 ip.magic_px('a=10')
339 self.assertEquals(v['a'], 10)
356 self.assertEquals(v['a'], 10)
340 sio = StringIO()
357 sio = StringIO()
341 savestdout = sys.stdout
358 savestdout = sys.stdout
342 sys.stdout = sio
359 sys.stdout = sio
343 # just 'print a' worst ~99% of the time, but this ensures that
360 # just 'print a' worst ~99% of the time, but this ensures that
344 # the stdout message has arrived when the result is finished:
361 # the stdout message has arrived when the result is finished:
345 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
362 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
346 sys.stdout = savestdout
363 sys.stdout = savestdout
347 buf = sio.getvalue()
364 buf = sio.getvalue()
348 self.assertTrue('[stdout:' in buf, buf)
365 self.assertTrue('[stdout:' in buf, buf)
349 self.assertTrue(buf.rstrip().endswith('10'))
366 self.assertTrue(buf.rstrip().endswith('10'))
350 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
367 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
351
368
352 def test_magic_px_nonblocking(self):
369 def test_magic_px_nonblocking(self):
353 ip = get_ipython()
370 ip = get_ipython()
354 v = self.client[-1]
371 v = self.client[-1]
355 v.activate()
372 v.activate()
356 v.block=False
373 v.block=False
357
374
358 ip.magic_px('a=5')
375 ip.magic_px('a=5')
359 self.assertEquals(v['a'], 5)
376 self.assertEquals(v['a'], 5)
360 ip.magic_px('a=10')
377 ip.magic_px('a=10')
361 self.assertEquals(v['a'], 10)
378 self.assertEquals(v['a'], 10)
362 sio = StringIO()
379 sio = StringIO()
363 savestdout = sys.stdout
380 savestdout = sys.stdout
364 sys.stdout = sio
381 sys.stdout = sio
365 ip.magic_px('print a')
382 ip.magic_px('print a')
366 sys.stdout = savestdout
383 sys.stdout = savestdout
367 buf = sio.getvalue()
384 buf = sio.getvalue()
368 self.assertFalse('[stdout:%i]'%v.targets in buf)
385 self.assertFalse('[stdout:%i]'%v.targets in buf)
369 ip.magic_px('1/0')
386 ip.magic_px('1/0')
370 ar = v.get_result(-1)
387 ar = v.get_result(-1)
371 self.assertRaisesRemote(ZeroDivisionError, ar.get)
388 self.assertRaisesRemote(ZeroDivisionError, ar.get)
372
389
373 def test_magic_autopx_blocking(self):
390 def test_magic_autopx_blocking(self):
374 ip = get_ipython()
391 ip = get_ipython()
375 v = self.client[-1]
392 v = self.client[-1]
376 v.activate()
393 v.activate()
377 v.block=True
394 v.block=True
378
395
379 sio = StringIO()
396 sio = StringIO()
380 savestdout = sys.stdout
397 savestdout = sys.stdout
381 sys.stdout = sio
398 sys.stdout = sio
382 ip.magic_autopx()
399 ip.magic_autopx()
383 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
400 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
384 ip.run_cell('print b')
401 ip.run_cell('print b')
385 ip.run_cell("b/c")
402 ip.run_cell("b/c")
386 ip.run_code(compile('b*=2', '', 'single'))
403 ip.run_code(compile('b*=2', '', 'single'))
387 ip.magic_autopx()
404 ip.magic_autopx()
388 sys.stdout = savestdout
405 sys.stdout = savestdout
389 output = sio.getvalue().strip()
406 output = sio.getvalue().strip()
390 self.assertTrue(output.startswith('%autopx enabled'))
407 self.assertTrue(output.startswith('%autopx enabled'))
391 self.assertTrue(output.endswith('%autopx disabled'))
408 self.assertTrue(output.endswith('%autopx disabled'))
392 self.assertTrue('RemoteError: ZeroDivisionError' in output)
409 self.assertTrue('RemoteError: ZeroDivisionError' in output)
393 ar = v.get_result(-2)
410 ar = v.get_result(-2)
394 self.assertEquals(v['a'], 5)
411 self.assertEquals(v['a'], 5)
395 self.assertEquals(v['b'], 20)
412 self.assertEquals(v['b'], 20)
396 self.assertRaisesRemote(ZeroDivisionError, ar.get)
413 self.assertRaisesRemote(ZeroDivisionError, ar.get)
397
414
398 def test_magic_autopx_nonblocking(self):
415 def test_magic_autopx_nonblocking(self):
399 ip = get_ipython()
416 ip = get_ipython()
400 v = self.client[-1]
417 v = self.client[-1]
401 v.activate()
418 v.activate()
402 v.block=False
419 v.block=False
403
420
404 sio = StringIO()
421 sio = StringIO()
405 savestdout = sys.stdout
422 savestdout = sys.stdout
406 sys.stdout = sio
423 sys.stdout = sio
407 ip.magic_autopx()
424 ip.magic_autopx()
408 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
425 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
409 ip.run_cell('print b')
426 ip.run_cell('print b')
410 ip.run_cell("b/c")
427 ip.run_cell("b/c")
411 ip.run_code(compile('b*=2', '', 'single'))
428 ip.run_code(compile('b*=2', '', 'single'))
412 ip.magic_autopx()
429 ip.magic_autopx()
413 sys.stdout = savestdout
430 sys.stdout = savestdout
414 output = sio.getvalue().strip()
431 output = sio.getvalue().strip()
415 self.assertTrue(output.startswith('%autopx enabled'))
432 self.assertTrue(output.startswith('%autopx enabled'))
416 self.assertTrue(output.endswith('%autopx disabled'))
433 self.assertTrue(output.endswith('%autopx disabled'))
417 self.assertFalse('ZeroDivisionError' in output)
434 self.assertFalse('ZeroDivisionError' in output)
418 ar = v.get_result(-2)
435 ar = v.get_result(-2)
419 self.assertEquals(v['a'], 5)
436 self.assertEquals(v['a'], 5)
420 self.assertEquals(v['b'], 20)
437 self.assertEquals(v['b'], 20)
421 self.assertRaisesRemote(ZeroDivisionError, ar.get)
438 self.assertRaisesRemote(ZeroDivisionError, ar.get)
422
439
423 def test_magic_result(self):
440 def test_magic_result(self):
424 ip = get_ipython()
441 ip = get_ipython()
425 v = self.client[-1]
442 v = self.client[-1]
426 v.activate()
443 v.activate()
427 v['a'] = 111
444 v['a'] = 111
428 ra = v['a']
445 ra = v['a']
429
446
430 ar = ip.magic_result()
447 ar = ip.magic_result()
431 self.assertEquals(ar.msg_ids, [v.history[-1]])
448 self.assertEquals(ar.msg_ids, [v.history[-1]])
432 self.assertEquals(ar.get(), 111)
449 self.assertEquals(ar.get(), 111)
433 ar = ip.magic_result('-2')
450 ar = ip.magic_result('-2')
434 self.assertEquals(ar.msg_ids, [v.history[-2]])
451 self.assertEquals(ar.msg_ids, [v.history[-2]])
435
452
436 def test_unicode_execute(self):
453 def test_unicode_execute(self):
437 """test executing unicode strings"""
454 """test executing unicode strings"""
438 v = self.client[-1]
455 v = self.client[-1]
439 v.block=True
456 v.block=True
440 if sys.version_info[0] >= 3:
457 if sys.version_info[0] >= 3:
441 code="a='é'"
458 code="a='é'"
442 else:
459 else:
443 code=u"a=u'é'"
460 code=u"a=u'é'"
444 v.execute(code)
461 v.execute(code)
445 self.assertEquals(v['a'], u'é')
462 self.assertEquals(v['a'], u'é')
446
463
447 def test_unicode_apply_result(self):
464 def test_unicode_apply_result(self):
448 """test unicode apply results"""
465 """test unicode apply results"""
449 v = self.client[-1]
466 v = self.client[-1]
450 r = v.apply_sync(lambda : u'é')
467 r = v.apply_sync(lambda : u'é')
451 self.assertEquals(r, u'é')
468 self.assertEquals(r, u'é')
452
469
453 def test_unicode_apply_arg(self):
470 def test_unicode_apply_arg(self):
454 """test passing unicode arguments to apply"""
471 """test passing unicode arguments to apply"""
455 v = self.client[-1]
472 v = self.client[-1]
456
473
457 @interactive
474 @interactive
458 def check_unicode(a, check):
475 def check_unicode(a, check):
459 assert isinstance(a, unicode), "%r is not unicode"%a
476 assert isinstance(a, unicode), "%r is not unicode"%a
460 assert isinstance(check, bytes), "%r is not bytes"%check
477 assert isinstance(check, bytes), "%r is not bytes"%check
461 assert a.encode('utf8') == check, "%s != %s"%(a,check)
478 assert a.encode('utf8') == check, "%s != %s"%(a,check)
462
479
463 for s in [ u'é', u'ßø®∫',u'asdf' ]:
480 for s in [ u'é', u'ßø®∫',u'asdf' ]:
464 try:
481 try:
465 v.apply_sync(check_unicode, s, s.encode('utf8'))
482 v.apply_sync(check_unicode, s, s.encode('utf8'))
466 except error.RemoteError as e:
483 except error.RemoteError as e:
467 if e.ename == 'AssertionError':
484 if e.ename == 'AssertionError':
468 self.fail(e.evalue)
485 self.fail(e.evalue)
469 else:
486 else:
470 raise e
487 raise e
471
488
472 def test_map_reference(self):
489 def test_map_reference(self):
473 """view.map(<Reference>, *seqs) should work"""
490 """view.map(<Reference>, *seqs) should work"""
474 v = self.client[:]
491 v = self.client[:]
475 v.scatter('n', self.client.ids, flatten=True)
492 v.scatter('n', self.client.ids, flatten=True)
476 v.execute("f = lambda x,y: x*y")
493 v.execute("f = lambda x,y: x*y")
477 rf = pmod.Reference('f')
494 rf = pmod.Reference('f')
478 nlist = list(range(10))
495 nlist = list(range(10))
479 mlist = nlist[::-1]
496 mlist = nlist[::-1]
480 expected = [ m*n for m,n in zip(mlist, nlist) ]
497 expected = [ m*n for m,n in zip(mlist, nlist) ]
481 result = v.map_sync(rf, mlist, nlist)
498 result = v.map_sync(rf, mlist, nlist)
482 self.assertEquals(result, expected)
499 self.assertEquals(result, expected)
483
500
484 def test_apply_reference(self):
501 def test_apply_reference(self):
485 """view.apply(<Reference>, *args) should work"""
502 """view.apply(<Reference>, *args) should work"""
486 v = self.client[:]
503 v = self.client[:]
487 v.scatter('n', self.client.ids, flatten=True)
504 v.scatter('n', self.client.ids, flatten=True)
488 v.execute("f = lambda x: n*x")
505 v.execute("f = lambda x: n*x")
489 rf = pmod.Reference('f')
506 rf = pmod.Reference('f')
490 result = v.apply_sync(rf, 5)
507 result = v.apply_sync(rf, 5)
491 expected = [ 5*id for id in self.client.ids ]
508 expected = [ 5*id for id in self.client.ids ]
492 self.assertEquals(result, expected)
509 self.assertEquals(result, expected)
493
510
494 def test_eval_reference(self):
511 def test_eval_reference(self):
495 v = self.client[self.client.ids[0]]
512 v = self.client[self.client.ids[0]]
496 v['g'] = range(5)
513 v['g'] = range(5)
497 rg = pmod.Reference('g[0]')
514 rg = pmod.Reference('g[0]')
498 echo = lambda x:x
515 echo = lambda x:x
499 self.assertEquals(v.apply_sync(echo, rg), 0)
516 self.assertEquals(v.apply_sync(echo, rg), 0)
500
517
501 def test_reference_nameerror(self):
518 def test_reference_nameerror(self):
502 v = self.client[self.client.ids[0]]
519 v = self.client[self.client.ids[0]]
503 r = pmod.Reference('elvis_has_left')
520 r = pmod.Reference('elvis_has_left')
504 echo = lambda x:x
521 echo = lambda x:x
505 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
522 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
506
523
507
524
General Comments 0
You need to be logged in to leave comments. Login now