##// END OF EJS Templates
Merge pull request #2025 from jdmarch/win_xp_parallel_test_wait_for_cleanup...
Min RK -
r7690:b1eae246 merge
parent child Browse files
Show More
@@ -1,565 +1,573 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test View objects
2 """test View objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import platform
20 import time
21 import time
21 from tempfile import mktemp
22 from tempfile import mktemp
22 from StringIO import StringIO
23 from StringIO import StringIO
23
24
24 import zmq
25 import zmq
25 from nose import SkipTest
26 from nose import SkipTest
26
27
27 from IPython.testing import decorators as dec
28 from IPython.testing import decorators as dec
28 from IPython.testing.ipunittest import ParametricTestCase
29 from IPython.testing.ipunittest import ParametricTestCase
29
30
30 from IPython import parallel as pmod
31 from IPython import parallel as pmod
31 from IPython.parallel import error
32 from IPython.parallel import error
32 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
33 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
33 from IPython.parallel import DirectView
34 from IPython.parallel import DirectView
34 from IPython.parallel.util import interactive
35 from IPython.parallel.util import interactive
35
36
36 from IPython.parallel.tests import add_engines
37 from IPython.parallel.tests import add_engines
37
38
38 from .clienttest import ClusterTestCase, crash, wait, skip_without
39 from .clienttest import ClusterTestCase, crash, wait, skip_without
39
40
40 def setup():
41 def setup():
41 add_engines(3, total=True)
42 add_engines(3, total=True)
42
43
43 class TestView(ClusterTestCase, ParametricTestCase):
44 class TestView(ClusterTestCase, ParametricTestCase):
44
45
46 def setUp(self):
47 # On Win XP, wait for resource cleanup, else parallel test group fails
48 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
49 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
50 time.sleep(2)
51 super(TestView, self).setUp()
52
45 def test_z_crash_mux(self):
53 def test_z_crash_mux(self):
46 """test graceful handling of engine death (direct)"""
54 """test graceful handling of engine death (direct)"""
47 raise SkipTest("crash tests disabled, due to undesirable crash reports")
55 raise SkipTest("crash tests disabled, due to undesirable crash reports")
48 # self.add_engines(1)
56 # self.add_engines(1)
49 eid = self.client.ids[-1]
57 eid = self.client.ids[-1]
50 ar = self.client[eid].apply_async(crash)
58 ar = self.client[eid].apply_async(crash)
51 self.assertRaisesRemote(error.EngineError, ar.get, 10)
59 self.assertRaisesRemote(error.EngineError, ar.get, 10)
52 eid = ar.engine_id
60 eid = ar.engine_id
53 tic = time.time()
61 tic = time.time()
54 while eid in self.client.ids and time.time()-tic < 5:
62 while eid in self.client.ids and time.time()-tic < 5:
55 time.sleep(.01)
63 time.sleep(.01)
56 self.client.spin()
64 self.client.spin()
57 self.assertFalse(eid in self.client.ids, "Engine should have died")
65 self.assertFalse(eid in self.client.ids, "Engine should have died")
58
66
59 def test_push_pull(self):
67 def test_push_pull(self):
60 """test pushing and pulling"""
68 """test pushing and pulling"""
61 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
69 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
62 t = self.client.ids[-1]
70 t = self.client.ids[-1]
63 v = self.client[t]
71 v = self.client[t]
64 push = v.push
72 push = v.push
65 pull = v.pull
73 pull = v.pull
66 v.block=True
74 v.block=True
67 nengines = len(self.client)
75 nengines = len(self.client)
68 push({'data':data})
76 push({'data':data})
69 d = pull('data')
77 d = pull('data')
70 self.assertEquals(d, data)
78 self.assertEquals(d, data)
71 self.client[:].push({'data':data})
79 self.client[:].push({'data':data})
72 d = self.client[:].pull('data', block=True)
80 d = self.client[:].pull('data', block=True)
73 self.assertEquals(d, nengines*[data])
81 self.assertEquals(d, nengines*[data])
74 ar = push({'data':data}, block=False)
82 ar = push({'data':data}, block=False)
75 self.assertTrue(isinstance(ar, AsyncResult))
83 self.assertTrue(isinstance(ar, AsyncResult))
76 r = ar.get()
84 r = ar.get()
77 ar = self.client[:].pull('data', block=False)
85 ar = self.client[:].pull('data', block=False)
78 self.assertTrue(isinstance(ar, AsyncResult))
86 self.assertTrue(isinstance(ar, AsyncResult))
79 r = ar.get()
87 r = ar.get()
80 self.assertEquals(r, nengines*[data])
88 self.assertEquals(r, nengines*[data])
81 self.client[:].push(dict(a=10,b=20))
89 self.client[:].push(dict(a=10,b=20))
82 r = self.client[:].pull(('a','b'), block=True)
90 r = self.client[:].pull(('a','b'), block=True)
83 self.assertEquals(r, nengines*[[10,20]])
91 self.assertEquals(r, nengines*[[10,20]])
84
92
85 def test_push_pull_function(self):
93 def test_push_pull_function(self):
86 "test pushing and pulling functions"
94 "test pushing and pulling functions"
87 def testf(x):
95 def testf(x):
88 return 2.0*x
96 return 2.0*x
89
97
90 t = self.client.ids[-1]
98 t = self.client.ids[-1]
91 v = self.client[t]
99 v = self.client[t]
92 v.block=True
100 v.block=True
93 push = v.push
101 push = v.push
94 pull = v.pull
102 pull = v.pull
95 execute = v.execute
103 execute = v.execute
96 push({'testf':testf})
104 push({'testf':testf})
97 r = pull('testf')
105 r = pull('testf')
98 self.assertEqual(r(1.0), testf(1.0))
106 self.assertEqual(r(1.0), testf(1.0))
99 execute('r = testf(10)')
107 execute('r = testf(10)')
100 r = pull('r')
108 r = pull('r')
101 self.assertEquals(r, testf(10))
109 self.assertEquals(r, testf(10))
102 ar = self.client[:].push({'testf':testf}, block=False)
110 ar = self.client[:].push({'testf':testf}, block=False)
103 ar.get()
111 ar.get()
104 ar = self.client[:].pull('testf', block=False)
112 ar = self.client[:].pull('testf', block=False)
105 rlist = ar.get()
113 rlist = ar.get()
106 for r in rlist:
114 for r in rlist:
107 self.assertEqual(r(1.0), testf(1.0))
115 self.assertEqual(r(1.0), testf(1.0))
108 execute("def g(x): return x*x")
116 execute("def g(x): return x*x")
109 r = pull(('testf','g'))
117 r = pull(('testf','g'))
110 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
118 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
111
119
112 def test_push_function_globals(self):
120 def test_push_function_globals(self):
113 """test that pushed functions have access to globals"""
121 """test that pushed functions have access to globals"""
114 @interactive
122 @interactive
115 def geta():
123 def geta():
116 return a
124 return a
117 # self.add_engines(1)
125 # self.add_engines(1)
118 v = self.client[-1]
126 v = self.client[-1]
119 v.block=True
127 v.block=True
120 v['f'] = geta
128 v['f'] = geta
121 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
129 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
122 v.execute('a=5')
130 v.execute('a=5')
123 v.execute('b=f()')
131 v.execute('b=f()')
124 self.assertEquals(v['b'], 5)
132 self.assertEquals(v['b'], 5)
125
133
126 def test_push_function_defaults(self):
134 def test_push_function_defaults(self):
127 """test that pushed functions preserve default args"""
135 """test that pushed functions preserve default args"""
128 def echo(a=10):
136 def echo(a=10):
129 return a
137 return a
130 v = self.client[-1]
138 v = self.client[-1]
131 v.block=True
139 v.block=True
132 v['f'] = echo
140 v['f'] = echo
133 v.execute('b=f()')
141 v.execute('b=f()')
134 self.assertEquals(v['b'], 10)
142 self.assertEquals(v['b'], 10)
135
143
136 def test_get_result(self):
144 def test_get_result(self):
137 """test getting results from the Hub."""
145 """test getting results from the Hub."""
138 c = pmod.Client(profile='iptest')
146 c = pmod.Client(profile='iptest')
139 # self.add_engines(1)
147 # self.add_engines(1)
140 t = c.ids[-1]
148 t = c.ids[-1]
141 v = c[t]
149 v = c[t]
142 v2 = self.client[t]
150 v2 = self.client[t]
143 ar = v.apply_async(wait, 1)
151 ar = v.apply_async(wait, 1)
144 # give the monitor time to notice the message
152 # give the monitor time to notice the message
145 time.sleep(.25)
153 time.sleep(.25)
146 ahr = v2.get_result(ar.msg_ids)
154 ahr = v2.get_result(ar.msg_ids)
147 self.assertTrue(isinstance(ahr, AsyncHubResult))
155 self.assertTrue(isinstance(ahr, AsyncHubResult))
148 self.assertEquals(ahr.get(), ar.get())
156 self.assertEquals(ahr.get(), ar.get())
149 ar2 = v2.get_result(ar.msg_ids)
157 ar2 = v2.get_result(ar.msg_ids)
150 self.assertFalse(isinstance(ar2, AsyncHubResult))
158 self.assertFalse(isinstance(ar2, AsyncHubResult))
151 c.spin()
159 c.spin()
152 c.close()
160 c.close()
153
161
154 def test_run_newline(self):
162 def test_run_newline(self):
155 """test that run appends newline to files"""
163 """test that run appends newline to files"""
156 tmpfile = mktemp()
164 tmpfile = mktemp()
157 with open(tmpfile, 'w') as f:
165 with open(tmpfile, 'w') as f:
158 f.write("""def g():
166 f.write("""def g():
159 return 5
167 return 5
160 """)
168 """)
161 v = self.client[-1]
169 v = self.client[-1]
162 v.run(tmpfile, block=True)
170 v.run(tmpfile, block=True)
163 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
171 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
164
172
165 def test_apply_tracked(self):
173 def test_apply_tracked(self):
166 """test tracking for apply"""
174 """test tracking for apply"""
167 # self.add_engines(1)
175 # self.add_engines(1)
168 t = self.client.ids[-1]
176 t = self.client.ids[-1]
169 v = self.client[t]
177 v = self.client[t]
170 v.block=False
178 v.block=False
171 def echo(n=1024*1024, **kwargs):
179 def echo(n=1024*1024, **kwargs):
172 with v.temp_flags(**kwargs):
180 with v.temp_flags(**kwargs):
173 return v.apply(lambda x: x, 'x'*n)
181 return v.apply(lambda x: x, 'x'*n)
174 ar = echo(1, track=False)
182 ar = echo(1, track=False)
175 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
183 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
176 self.assertTrue(ar.sent)
184 self.assertTrue(ar.sent)
177 ar = echo(track=True)
185 ar = echo(track=True)
178 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
186 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
179 self.assertEquals(ar.sent, ar._tracker.done)
187 self.assertEquals(ar.sent, ar._tracker.done)
180 ar._tracker.wait()
188 ar._tracker.wait()
181 self.assertTrue(ar.sent)
189 self.assertTrue(ar.sent)
182
190
183 def test_push_tracked(self):
191 def test_push_tracked(self):
184 t = self.client.ids[-1]
192 t = self.client.ids[-1]
185 ns = dict(x='x'*1024*1024)
193 ns = dict(x='x'*1024*1024)
186 v = self.client[t]
194 v = self.client[t]
187 ar = v.push(ns, block=False, track=False)
195 ar = v.push(ns, block=False, track=False)
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
196 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 self.assertTrue(ar.sent)
197 self.assertTrue(ar.sent)
190
198
191 ar = v.push(ns, block=False, track=True)
199 ar = v.push(ns, block=False, track=True)
192 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
200 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
193 ar._tracker.wait()
201 ar._tracker.wait()
194 self.assertEquals(ar.sent, ar._tracker.done)
202 self.assertEquals(ar.sent, ar._tracker.done)
195 self.assertTrue(ar.sent)
203 self.assertTrue(ar.sent)
196 ar.get()
204 ar.get()
197
205
198 def test_scatter_tracked(self):
206 def test_scatter_tracked(self):
199 t = self.client.ids
207 t = self.client.ids
200 x='x'*1024*1024
208 x='x'*1024*1024
201 ar = self.client[t].scatter('x', x, block=False, track=False)
209 ar = self.client[t].scatter('x', x, block=False, track=False)
202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
210 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 self.assertTrue(ar.sent)
211 self.assertTrue(ar.sent)
204
212
205 ar = self.client[t].scatter('x', x, block=False, track=True)
213 ar = self.client[t].scatter('x', x, block=False, track=True)
206 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
214 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
207 self.assertEquals(ar.sent, ar._tracker.done)
215 self.assertEquals(ar.sent, ar._tracker.done)
208 ar._tracker.wait()
216 ar._tracker.wait()
209 self.assertTrue(ar.sent)
217 self.assertTrue(ar.sent)
210 ar.get()
218 ar.get()
211
219
212 def test_remote_reference(self):
220 def test_remote_reference(self):
213 v = self.client[-1]
221 v = self.client[-1]
214 v['a'] = 123
222 v['a'] = 123
215 ra = pmod.Reference('a')
223 ra = pmod.Reference('a')
216 b = v.apply_sync(lambda x: x, ra)
224 b = v.apply_sync(lambda x: x, ra)
217 self.assertEquals(b, 123)
225 self.assertEquals(b, 123)
218
226
219
227
220 def test_scatter_gather(self):
228 def test_scatter_gather(self):
221 view = self.client[:]
229 view = self.client[:]
222 seq1 = range(16)
230 seq1 = range(16)
223 view.scatter('a', seq1)
231 view.scatter('a', seq1)
224 seq2 = view.gather('a', block=True)
232 seq2 = view.gather('a', block=True)
225 self.assertEquals(seq2, seq1)
233 self.assertEquals(seq2, seq1)
226 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
234 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
227
235
228 @skip_without('numpy')
236 @skip_without('numpy')
229 def test_scatter_gather_numpy(self):
237 def test_scatter_gather_numpy(self):
230 import numpy
238 import numpy
231 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
239 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
232 view = self.client[:]
240 view = self.client[:]
233 a = numpy.arange(64)
241 a = numpy.arange(64)
234 view.scatter('a', a)
242 view.scatter('a', a)
235 b = view.gather('a', block=True)
243 b = view.gather('a', block=True)
236 assert_array_equal(b, a)
244 assert_array_equal(b, a)
237
245
238 def test_scatter_gather_lazy(self):
246 def test_scatter_gather_lazy(self):
239 """scatter/gather with targets='all'"""
247 """scatter/gather with targets='all'"""
240 view = self.client.direct_view(targets='all')
248 view = self.client.direct_view(targets='all')
241 x = range(64)
249 x = range(64)
242 view.scatter('x', x)
250 view.scatter('x', x)
243 gathered = view.gather('x', block=True)
251 gathered = view.gather('x', block=True)
244 self.assertEquals(gathered, x)
252 self.assertEquals(gathered, x)
245
253
246
254
247 @dec.known_failure_py3
255 @dec.known_failure_py3
248 @skip_without('numpy')
256 @skip_without('numpy')
249 def test_push_numpy_nocopy(self):
257 def test_push_numpy_nocopy(self):
250 import numpy
258 import numpy
251 view = self.client[:]
259 view = self.client[:]
252 a = numpy.arange(64)
260 a = numpy.arange(64)
253 view['A'] = a
261 view['A'] = a
254 @interactive
262 @interactive
255 def check_writeable(x):
263 def check_writeable(x):
256 return x.flags.writeable
264 return x.flags.writeable
257
265
258 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
266 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
259 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
267 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
260
268
261 view.push(dict(B=a))
269 view.push(dict(B=a))
262 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
270 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
263 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
271 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
264
272
265 @skip_without('numpy')
273 @skip_without('numpy')
266 def test_apply_numpy(self):
274 def test_apply_numpy(self):
267 """view.apply(f, ndarray)"""
275 """view.apply(f, ndarray)"""
268 import numpy
276 import numpy
269 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
277 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
270
278
271 A = numpy.random.random((100,100))
279 A = numpy.random.random((100,100))
272 view = self.client[-1]
280 view = self.client[-1]
273 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
281 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
274 B = A.astype(dt)
282 B = A.astype(dt)
275 C = view.apply_sync(lambda x:x, B)
283 C = view.apply_sync(lambda x:x, B)
276 assert_array_equal(B,C)
284 assert_array_equal(B,C)
277
285
278 def test_map(self):
286 def test_map(self):
279 view = self.client[:]
287 view = self.client[:]
280 def f(x):
288 def f(x):
281 return x**2
289 return x**2
282 data = range(16)
290 data = range(16)
283 r = view.map_sync(f, data)
291 r = view.map_sync(f, data)
284 self.assertEquals(r, map(f, data))
292 self.assertEquals(r, map(f, data))
285
293
286 def test_map_iterable(self):
294 def test_map_iterable(self):
287 """test map on iterables (direct)"""
295 """test map on iterables (direct)"""
288 view = self.client[:]
296 view = self.client[:]
289 # 101 is prime, so it won't be evenly distributed
297 # 101 is prime, so it won't be evenly distributed
290 arr = range(101)
298 arr = range(101)
291 # ensure it will be an iterator, even in Python 3
299 # ensure it will be an iterator, even in Python 3
292 it = iter(arr)
300 it = iter(arr)
293 r = view.map_sync(lambda x:x, arr)
301 r = view.map_sync(lambda x:x, arr)
294 self.assertEquals(r, list(arr))
302 self.assertEquals(r, list(arr))
295
303
296 def test_scatterGatherNonblocking(self):
304 def test_scatterGatherNonblocking(self):
297 data = range(16)
305 data = range(16)
298 view = self.client[:]
306 view = self.client[:]
299 view.scatter('a', data, block=False)
307 view.scatter('a', data, block=False)
300 ar = view.gather('a', block=False)
308 ar = view.gather('a', block=False)
301 self.assertEquals(ar.get(), data)
309 self.assertEquals(ar.get(), data)
302
310
303 @skip_without('numpy')
311 @skip_without('numpy')
304 def test_scatter_gather_numpy_nonblocking(self):
312 def test_scatter_gather_numpy_nonblocking(self):
305 import numpy
313 import numpy
306 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
314 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
307 a = numpy.arange(64)
315 a = numpy.arange(64)
308 view = self.client[:]
316 view = self.client[:]
309 ar = view.scatter('a', a, block=False)
317 ar = view.scatter('a', a, block=False)
310 self.assertTrue(isinstance(ar, AsyncResult))
318 self.assertTrue(isinstance(ar, AsyncResult))
311 amr = view.gather('a', block=False)
319 amr = view.gather('a', block=False)
312 self.assertTrue(isinstance(amr, AsyncMapResult))
320 self.assertTrue(isinstance(amr, AsyncMapResult))
313 assert_array_equal(amr.get(), a)
321 assert_array_equal(amr.get(), a)
314
322
315 def test_execute(self):
323 def test_execute(self):
316 view = self.client[:]
324 view = self.client[:]
317 # self.client.debug=True
325 # self.client.debug=True
318 execute = view.execute
326 execute = view.execute
319 ar = execute('c=30', block=False)
327 ar = execute('c=30', block=False)
320 self.assertTrue(isinstance(ar, AsyncResult))
328 self.assertTrue(isinstance(ar, AsyncResult))
321 ar = execute('d=[0,1,2]', block=False)
329 ar = execute('d=[0,1,2]', block=False)
322 self.client.wait(ar, 1)
330 self.client.wait(ar, 1)
323 self.assertEquals(len(ar.get()), len(self.client))
331 self.assertEquals(len(ar.get()), len(self.client))
324 for c in view['c']:
332 for c in view['c']:
325 self.assertEquals(c, 30)
333 self.assertEquals(c, 30)
326
334
327 def test_abort(self):
335 def test_abort(self):
328 view = self.client[-1]
336 view = self.client[-1]
329 ar = view.execute('import time; time.sleep(1)', block=False)
337 ar = view.execute('import time; time.sleep(1)', block=False)
330 ar2 = view.apply_async(lambda : 2)
338 ar2 = view.apply_async(lambda : 2)
331 ar3 = view.apply_async(lambda : 3)
339 ar3 = view.apply_async(lambda : 3)
332 view.abort(ar2)
340 view.abort(ar2)
333 view.abort(ar3.msg_ids)
341 view.abort(ar3.msg_ids)
334 self.assertRaises(error.TaskAborted, ar2.get)
342 self.assertRaises(error.TaskAborted, ar2.get)
335 self.assertRaises(error.TaskAborted, ar3.get)
343 self.assertRaises(error.TaskAborted, ar3.get)
336
344
337 def test_abort_all(self):
345 def test_abort_all(self):
338 """view.abort() aborts all outstanding tasks"""
346 """view.abort() aborts all outstanding tasks"""
339 view = self.client[-1]
347 view = self.client[-1]
340 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
348 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
341 view.abort()
349 view.abort()
342 view.wait(timeout=5)
350 view.wait(timeout=5)
343 for ar in ars[5:]:
351 for ar in ars[5:]:
344 self.assertRaises(error.TaskAborted, ar.get)
352 self.assertRaises(error.TaskAborted, ar.get)
345
353
346 def test_temp_flags(self):
354 def test_temp_flags(self):
347 view = self.client[-1]
355 view = self.client[-1]
348 view.block=True
356 view.block=True
349 with view.temp_flags(block=False):
357 with view.temp_flags(block=False):
350 self.assertFalse(view.block)
358 self.assertFalse(view.block)
351 self.assertTrue(view.block)
359 self.assertTrue(view.block)
352
360
353 @dec.known_failure_py3
361 @dec.known_failure_py3
354 def test_importer(self):
362 def test_importer(self):
355 view = self.client[-1]
363 view = self.client[-1]
356 view.clear(block=True)
364 view.clear(block=True)
357 with view.importer:
365 with view.importer:
358 import re
366 import re
359
367
360 @interactive
368 @interactive
361 def findall(pat, s):
369 def findall(pat, s):
362 # this globals() step isn't necessary in real code
370 # this globals() step isn't necessary in real code
363 # only to prevent a closure in the test
371 # only to prevent a closure in the test
364 re = globals()['re']
372 re = globals()['re']
365 return re.findall(pat, s)
373 return re.findall(pat, s)
366
374
367 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
375 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
368
376
369 def test_unicode_execute(self):
377 def test_unicode_execute(self):
370 """test executing unicode strings"""
378 """test executing unicode strings"""
371 v = self.client[-1]
379 v = self.client[-1]
372 v.block=True
380 v.block=True
373 if sys.version_info[0] >= 3:
381 if sys.version_info[0] >= 3:
374 code="a='é'"
382 code="a='é'"
375 else:
383 else:
376 code=u"a=u'é'"
384 code=u"a=u'é'"
377 v.execute(code)
385 v.execute(code)
378 self.assertEquals(v['a'], u'é')
386 self.assertEquals(v['a'], u'é')
379
387
380 def test_unicode_apply_result(self):
388 def test_unicode_apply_result(self):
381 """test unicode apply results"""
389 """test unicode apply results"""
382 v = self.client[-1]
390 v = self.client[-1]
383 r = v.apply_sync(lambda : u'é')
391 r = v.apply_sync(lambda : u'é')
384 self.assertEquals(r, u'é')
392 self.assertEquals(r, u'é')
385
393
386 def test_unicode_apply_arg(self):
394 def test_unicode_apply_arg(self):
387 """test passing unicode arguments to apply"""
395 """test passing unicode arguments to apply"""
388 v = self.client[-1]
396 v = self.client[-1]
389
397
390 @interactive
398 @interactive
391 def check_unicode(a, check):
399 def check_unicode(a, check):
392 assert isinstance(a, unicode), "%r is not unicode"%a
400 assert isinstance(a, unicode), "%r is not unicode"%a
393 assert isinstance(check, bytes), "%r is not bytes"%check
401 assert isinstance(check, bytes), "%r is not bytes"%check
394 assert a.encode('utf8') == check, "%s != %s"%(a,check)
402 assert a.encode('utf8') == check, "%s != %s"%(a,check)
395
403
396 for s in [ u'é', u'ßø®∫',u'asdf' ]:
404 for s in [ u'é', u'ßø®∫',u'asdf' ]:
397 try:
405 try:
398 v.apply_sync(check_unicode, s, s.encode('utf8'))
406 v.apply_sync(check_unicode, s, s.encode('utf8'))
399 except error.RemoteError as e:
407 except error.RemoteError as e:
400 if e.ename == 'AssertionError':
408 if e.ename == 'AssertionError':
401 self.fail(e.evalue)
409 self.fail(e.evalue)
402 else:
410 else:
403 raise e
411 raise e
404
412
405 def test_map_reference(self):
413 def test_map_reference(self):
406 """view.map(<Reference>, *seqs) should work"""
414 """view.map(<Reference>, *seqs) should work"""
407 v = self.client[:]
415 v = self.client[:]
408 v.scatter('n', self.client.ids, flatten=True)
416 v.scatter('n', self.client.ids, flatten=True)
409 v.execute("f = lambda x,y: x*y")
417 v.execute("f = lambda x,y: x*y")
410 rf = pmod.Reference('f')
418 rf = pmod.Reference('f')
411 nlist = list(range(10))
419 nlist = list(range(10))
412 mlist = nlist[::-1]
420 mlist = nlist[::-1]
413 expected = [ m*n for m,n in zip(mlist, nlist) ]
421 expected = [ m*n for m,n in zip(mlist, nlist) ]
414 result = v.map_sync(rf, mlist, nlist)
422 result = v.map_sync(rf, mlist, nlist)
415 self.assertEquals(result, expected)
423 self.assertEquals(result, expected)
416
424
417 def test_apply_reference(self):
425 def test_apply_reference(self):
418 """view.apply(<Reference>, *args) should work"""
426 """view.apply(<Reference>, *args) should work"""
419 v = self.client[:]
427 v = self.client[:]
420 v.scatter('n', self.client.ids, flatten=True)
428 v.scatter('n', self.client.ids, flatten=True)
421 v.execute("f = lambda x: n*x")
429 v.execute("f = lambda x: n*x")
422 rf = pmod.Reference('f')
430 rf = pmod.Reference('f')
423 result = v.apply_sync(rf, 5)
431 result = v.apply_sync(rf, 5)
424 expected = [ 5*id for id in self.client.ids ]
432 expected = [ 5*id for id in self.client.ids ]
425 self.assertEquals(result, expected)
433 self.assertEquals(result, expected)
426
434
427 def test_eval_reference(self):
435 def test_eval_reference(self):
428 v = self.client[self.client.ids[0]]
436 v = self.client[self.client.ids[0]]
429 v['g'] = range(5)
437 v['g'] = range(5)
430 rg = pmod.Reference('g[0]')
438 rg = pmod.Reference('g[0]')
431 echo = lambda x:x
439 echo = lambda x:x
432 self.assertEquals(v.apply_sync(echo, rg), 0)
440 self.assertEquals(v.apply_sync(echo, rg), 0)
433
441
434 def test_reference_nameerror(self):
442 def test_reference_nameerror(self):
435 v = self.client[self.client.ids[0]]
443 v = self.client[self.client.ids[0]]
436 r = pmod.Reference('elvis_has_left')
444 r = pmod.Reference('elvis_has_left')
437 echo = lambda x:x
445 echo = lambda x:x
438 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
446 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
439
447
440 def test_single_engine_map(self):
448 def test_single_engine_map(self):
441 e0 = self.client[self.client.ids[0]]
449 e0 = self.client[self.client.ids[0]]
442 r = range(5)
450 r = range(5)
443 check = [ -1*i for i in r ]
451 check = [ -1*i for i in r ]
444 result = e0.map_sync(lambda x: -1*x, r)
452 result = e0.map_sync(lambda x: -1*x, r)
445 self.assertEquals(result, check)
453 self.assertEquals(result, check)
446
454
447 def test_len(self):
455 def test_len(self):
448 """len(view) makes sense"""
456 """len(view) makes sense"""
449 e0 = self.client[self.client.ids[0]]
457 e0 = self.client[self.client.ids[0]]
450 yield self.assertEquals(len(e0), 1)
458 yield self.assertEquals(len(e0), 1)
451 v = self.client[:]
459 v = self.client[:]
452 yield self.assertEquals(len(v), len(self.client.ids))
460 yield self.assertEquals(len(v), len(self.client.ids))
453 v = self.client.direct_view('all')
461 v = self.client.direct_view('all')
454 yield self.assertEquals(len(v), len(self.client.ids))
462 yield self.assertEquals(len(v), len(self.client.ids))
455 v = self.client[:2]
463 v = self.client[:2]
456 yield self.assertEquals(len(v), 2)
464 yield self.assertEquals(len(v), 2)
457 v = self.client[:1]
465 v = self.client[:1]
458 yield self.assertEquals(len(v), 1)
466 yield self.assertEquals(len(v), 1)
459 v = self.client.load_balanced_view()
467 v = self.client.load_balanced_view()
460 yield self.assertEquals(len(v), len(self.client.ids))
468 yield self.assertEquals(len(v), len(self.client.ids))
461 # parametric tests seem to require manual closing?
469 # parametric tests seem to require manual closing?
462 self.client.close()
470 self.client.close()
463
471
464
472
465 # begin execute tests
473 # begin execute tests
466
474
467 def test_execute_reply(self):
475 def test_execute_reply(self):
468 e0 = self.client[self.client.ids[0]]
476 e0 = self.client[self.client.ids[0]]
469 e0.block = True
477 e0.block = True
470 ar = e0.execute("5", silent=False)
478 ar = e0.execute("5", silent=False)
471 er = ar.get()
479 er = ar.get()
472 self.assertEquals(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
480 self.assertEquals(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
473 self.assertEquals(er.pyout['data']['text/plain'], '5')
481 self.assertEquals(er.pyout['data']['text/plain'], '5')
474
482
475 def test_execute_reply_stdout(self):
483 def test_execute_reply_stdout(self):
476 e0 = self.client[self.client.ids[0]]
484 e0 = self.client[self.client.ids[0]]
477 e0.block = True
485 e0.block = True
478 ar = e0.execute("print (5)", silent=False)
486 ar = e0.execute("print (5)", silent=False)
479 er = ar.get()
487 er = ar.get()
480 self.assertEquals(er.stdout.strip(), '5')
488 self.assertEquals(er.stdout.strip(), '5')
481
489
482 def test_execute_pyout(self):
490 def test_execute_pyout(self):
483 """execute triggers pyout with silent=False"""
491 """execute triggers pyout with silent=False"""
484 view = self.client[:]
492 view = self.client[:]
485 ar = view.execute("5", silent=False, block=True)
493 ar = view.execute("5", silent=False, block=True)
486
494
487 expected = [{'text/plain' : '5'}] * len(view)
495 expected = [{'text/plain' : '5'}] * len(view)
488 mimes = [ out['data'] for out in ar.pyout ]
496 mimes = [ out['data'] for out in ar.pyout ]
489 self.assertEquals(mimes, expected)
497 self.assertEquals(mimes, expected)
490
498
491 def test_execute_silent(self):
499 def test_execute_silent(self):
492 """execute does not trigger pyout with silent=True"""
500 """execute does not trigger pyout with silent=True"""
493 view = self.client[:]
501 view = self.client[:]
494 ar = view.execute("5", block=True)
502 ar = view.execute("5", block=True)
495 expected = [None] * len(view)
503 expected = [None] * len(view)
496 self.assertEquals(ar.pyout, expected)
504 self.assertEquals(ar.pyout, expected)
497
505
498 def test_execute_magic(self):
506 def test_execute_magic(self):
499 """execute accepts IPython commands"""
507 """execute accepts IPython commands"""
500 view = self.client[:]
508 view = self.client[:]
501 view.execute("a = 5")
509 view.execute("a = 5")
502 ar = view.execute("%whos", block=True)
510 ar = view.execute("%whos", block=True)
503 # this will raise, if that failed
511 # this will raise, if that failed
504 ar.get(5)
512 ar.get(5)
505 for stdout in ar.stdout:
513 for stdout in ar.stdout:
506 lines = stdout.splitlines()
514 lines = stdout.splitlines()
507 self.assertEquals(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
515 self.assertEquals(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
508 found = False
516 found = False
509 for line in lines[2:]:
517 for line in lines[2:]:
510 split = line.split()
518 split = line.split()
511 if split == ['a', 'int', '5']:
519 if split == ['a', 'int', '5']:
512 found = True
520 found = True
513 break
521 break
514 self.assertTrue(found, "whos output wrong: %s" % stdout)
522 self.assertTrue(found, "whos output wrong: %s" % stdout)
515
523
516 def test_execute_displaypub(self):
524 def test_execute_displaypub(self):
517 """execute tracks display_pub output"""
525 """execute tracks display_pub output"""
518 view = self.client[:]
526 view = self.client[:]
519 view.execute("from IPython.core.display import *")
527 view.execute("from IPython.core.display import *")
520 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
528 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
521
529
522 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
530 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
523 for outputs in ar.outputs:
531 for outputs in ar.outputs:
524 mimes = [ out['data'] for out in outputs ]
532 mimes = [ out['data'] for out in outputs ]
525 self.assertEquals(mimes, expected)
533 self.assertEquals(mimes, expected)
526
534
527 def test_apply_displaypub(self):
535 def test_apply_displaypub(self):
528 """apply tracks display_pub output"""
536 """apply tracks display_pub output"""
529 view = self.client[:]
537 view = self.client[:]
530 view.execute("from IPython.core.display import *")
538 view.execute("from IPython.core.display import *")
531
539
532 @interactive
540 @interactive
533 def publish():
541 def publish():
534 [ display(i) for i in range(5) ]
542 [ display(i) for i in range(5) ]
535
543
536 ar = view.apply_async(publish)
544 ar = view.apply_async(publish)
537 ar.get(5)
545 ar.get(5)
538 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
546 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
539 for outputs in ar.outputs:
547 for outputs in ar.outputs:
540 mimes = [ out['data'] for out in outputs ]
548 mimes = [ out['data'] for out in outputs ]
541 self.assertEquals(mimes, expected)
549 self.assertEquals(mimes, expected)
542
550
543 def test_execute_raises(self):
551 def test_execute_raises(self):
544 """exceptions in execute requests raise appropriately"""
552 """exceptions in execute requests raise appropriately"""
545 view = self.client[-1]
553 view = self.client[-1]
546 ar = view.execute("1/0")
554 ar = view.execute("1/0")
547 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
555 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
548
556
549 @dec.skipif_not_matplotlib
557 @dec.skipif_not_matplotlib
550 def test_magic_pylab(self):
558 def test_magic_pylab(self):
551 """%pylab works on engines"""
559 """%pylab works on engines"""
552 view = self.client[-1]
560 view = self.client[-1]
553 ar = view.execute("%pylab inline")
561 ar = view.execute("%pylab inline")
554 # at least check if this raised:
562 # at least check if this raised:
555 reply = ar.get(5)
563 reply = ar.get(5)
556 # include imports, in case user config
564 # include imports, in case user config
557 ar = view.execute("plot(rand(100))", silent=False)
565 ar = view.execute("plot(rand(100))", silent=False)
558 reply = ar.get(5)
566 reply = ar.get(5)
559 self.assertEquals(len(reply.outputs), 1)
567 self.assertEquals(len(reply.outputs), 1)
560 output = reply.outputs[0]
568 output = reply.outputs[0]
561 self.assertTrue("data" in output)
569 self.assertTrue("data" in output)
562 data = output['data']
570 data = output['data']
563 self.assertTrue("image/png" in data)
571 self.assertTrue("image/png" in data)
564
572
565
573
General Comments 0
You need to be logged in to leave comments. Login now