##// END OF EJS Templates
use crash that will also kill a Windows engine...
MinRK -
Show More
@@ -1,117 +1,128 b''
1 """base class for parallel client tests"""
1 """base class for parallel client tests"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
4 # Copyright (C) 2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 import sys
10 import sys
11 import tempfile
11 import tempfile
12 import time
12 import time
13
13
14 from nose import SkipTest
14 from nose import SkipTest
15
15
16 import zmq
16 import zmq
17 from zmq.tests import BaseZMQTestCase
17 from zmq.tests import BaseZMQTestCase
18
18
19 from IPython.external.decorator import decorator
19 from IPython.external.decorator import decorator
20
20
21 from IPython.parallel import error
21 from IPython.parallel import error
22 from IPython.parallel import Client
22 from IPython.parallel import Client
23 from IPython.parallel.tests import processes,add_engines
23 from IPython.parallel.tests import processes,add_engines
24
24
25 # simple tasks for use in apply tests
25 # simple tasks for use in apply tests
26
26
27 def segfault():
27 def segfault():
28 """this will segfault"""
28 """this will segfault"""
29 import ctypes
29 import ctypes
30 ctypes.memset(-1,0,1)
30 ctypes.memset(-1,0,1)
31
31
32 def crash():
33 """from stdlib crashers in the test suite"""
34 import types
35 if sys.platform.startswith('win'):
36 import ctypes
37 ctypes.windll.kernel32.SetErrorMode(0x0002);
38
39 co = types.CodeType(0, 0, 0, 0, b'\x04\x71\x00\x00',
40 (), (), (), '', '', 1, b'')
41 exec(co)
42
32 def wait(n):
43 def wait(n):
33 """sleep for a time"""
44 """sleep for a time"""
34 import time
45 import time
35 time.sleep(n)
46 time.sleep(n)
36 return n
47 return n
37
48
38 def raiser(eclass):
49 def raiser(eclass):
39 """raise an exception"""
50 """raise an exception"""
40 raise eclass()
51 raise eclass()
41
52
42 # test decorator for skipping tests when libraries are unavailable
53 # test decorator for skipping tests when libraries are unavailable
43 def skip_without(*names):
54 def skip_without(*names):
44 """skip a test if some names are not importable"""
55 """skip a test if some names are not importable"""
45 @decorator
56 @decorator
46 def skip_without_names(f, *args, **kwargs):
57 def skip_without_names(f, *args, **kwargs):
47 """decorator to skip tests in the absence of numpy."""
58 """decorator to skip tests in the absence of numpy."""
48 for name in names:
59 for name in names:
49 try:
60 try:
50 __import__(name)
61 __import__(name)
51 except ImportError:
62 except ImportError:
52 raise SkipTest
63 raise SkipTest
53 return f(*args, **kwargs)
64 return f(*args, **kwargs)
54 return skip_without_names
65 return skip_without_names
55
66
56 class ClusterTestCase(BaseZMQTestCase):
67 class ClusterTestCase(BaseZMQTestCase):
57
68
58 def add_engines(self, n=1, block=True):
69 def add_engines(self, n=1, block=True):
59 """add multiple engines to our cluster"""
70 """add multiple engines to our cluster"""
60 self.engines.extend(add_engines(n))
71 self.engines.extend(add_engines(n))
61 if block:
72 if block:
62 self.wait_on_engines()
73 self.wait_on_engines()
63
74
64 def wait_on_engines(self, timeout=5):
75 def wait_on_engines(self, timeout=5):
65 """wait for our engines to connect."""
76 """wait for our engines to connect."""
66 n = len(self.engines)+self.base_engine_count
77 n = len(self.engines)+self.base_engine_count
67 tic = time.time()
78 tic = time.time()
68 while time.time()-tic < timeout and len(self.client.ids) < n:
79 while time.time()-tic < timeout and len(self.client.ids) < n:
69 time.sleep(0.1)
80 time.sleep(0.1)
70
81
71 assert not len(self.client.ids) < n, "waiting for engines timed out"
82 assert not len(self.client.ids) < n, "waiting for engines timed out"
72
83
73 def connect_client(self):
84 def connect_client(self):
74 """connect a client with my Context, and track its sockets for cleanup"""
85 """connect a client with my Context, and track its sockets for cleanup"""
75 c = Client(profile='iptest', context=self.context)
86 c = Client(profile='iptest', context=self.context)
76 for name in filter(lambda n:n.endswith('socket'), dir(c)):
87 for name in filter(lambda n:n.endswith('socket'), dir(c)):
77 s = getattr(c, name)
88 s = getattr(c, name)
78 s.setsockopt(zmq.LINGER, 0)
89 s.setsockopt(zmq.LINGER, 0)
79 self.sockets.append(s)
90 self.sockets.append(s)
80 return c
91 return c
81
92
82 def assertRaisesRemote(self, etype, f, *args, **kwargs):
93 def assertRaisesRemote(self, etype, f, *args, **kwargs):
83 try:
94 try:
84 try:
95 try:
85 f(*args, **kwargs)
96 f(*args, **kwargs)
86 except error.CompositeError as e:
97 except error.CompositeError as e:
87 e.raise_exception()
98 e.raise_exception()
88 except error.RemoteError as e:
99 except error.RemoteError as e:
89 self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(e.ename, etype.__name__))
100 self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(etype.__name__, e.ename))
90 else:
101 else:
91 self.fail("should have raised a RemoteError")
102 self.fail("should have raised a RemoteError")
92
103
93 def setUp(self):
104 def setUp(self):
94 BaseZMQTestCase.setUp(self)
105 BaseZMQTestCase.setUp(self)
95 self.client = self.connect_client()
106 self.client = self.connect_client()
96 # start every test with clean engine namespaces:
107 # start every test with clean engine namespaces:
97 self.client.clear(block=True)
108 self.client.clear(block=True)
98 self.base_engine_count=len(self.client.ids)
109 self.base_engine_count=len(self.client.ids)
99 self.engines=[]
110 self.engines=[]
100
111
101 def tearDown(self):
112 def tearDown(self):
102 # self.client.clear(block=True)
113 # self.client.clear(block=True)
103 # close fds:
114 # close fds:
104 for e in filter(lambda e: e.poll() is not None, processes):
115 for e in filter(lambda e: e.poll() is not None, processes):
105 processes.remove(e)
116 processes.remove(e)
106
117
107 # allow flushing of incoming messages to prevent crash on socket close
118 # allow flushing of incoming messages to prevent crash on socket close
108 self.client.wait(timeout=2)
119 self.client.wait(timeout=2)
109 # time.sleep(2)
120 # time.sleep(2)
110 self.client.spin()
121 self.client.spin()
111 self.client.close()
122 self.client.close()
112 BaseZMQTestCase.tearDown(self)
123 BaseZMQTestCase.tearDown(self)
113 # this will be redundant when pyzmq merges PR #88
124 # this will be redundant when pyzmq merges PR #88
114 # self.context.term()
125 # self.context.term()
115 # print tempfile.TemporaryFile().fileno(),
126 # print tempfile.TemporaryFile().fileno(),
116 # sys.stdout.flush()
127 # sys.stdout.flush()
117 No newline at end of file
128
@@ -1,414 +1,418 b''
1 """test View objects"""
1 """test View objects"""
2 #-------------------------------------------------------------------------------
2 #-------------------------------------------------------------------------------
3 # Copyright (C) 2011 The IPython Development Team
3 # Copyright (C) 2011 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-------------------------------------------------------------------------------
7 #-------------------------------------------------------------------------------
8
8
9 #-------------------------------------------------------------------------------
9 #-------------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-------------------------------------------------------------------------------
11 #-------------------------------------------------------------------------------
12
12
13 import sys
13 import sys
14 import time
14 import time
15 from tempfile import mktemp
15 from tempfile import mktemp
16 from StringIO import StringIO
16 from StringIO import StringIO
17
17
18 import zmq
18 import zmq
19
19
20 from IPython import parallel as pmod
20 from IPython import parallel as pmod
21 from IPython.parallel import error
21 from IPython.parallel import error
22 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
22 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
23 from IPython.parallel import LoadBalancedView, DirectView
23 from IPython.parallel import LoadBalancedView, DirectView
24 from IPython.parallel.util import interactive
24 from IPython.parallel.util import interactive
25
25
26 from IPython.parallel.tests import add_engines
26 from IPython.parallel.tests import add_engines
27
27
28 from .clienttest import ClusterTestCase, segfault, wait, skip_without
28 from .clienttest import ClusterTestCase, crash, wait, skip_without
29
29
30 def setup():
30 def setup():
31 add_engines(3)
31 add_engines(3)
32
32
33 class TestView(ClusterTestCase):
33 class TestView(ClusterTestCase):
34
34
35 def test_segfault_task(self):
35 def test_crash_task(self):
36 """test graceful handling of engine death (balanced)"""
36 """test graceful handling of engine death (balanced)"""
37 # self.add_engines(1)
37 # self.add_engines(1)
38 ar = self.client[-1].apply_async(segfault)
38 ar = self.client[-1].apply_async(crash)
39 self.assertRaisesRemote(error.EngineError, ar.get)
39 self.assertRaisesRemote(error.EngineError, ar.get)
40 eid = ar.engine_id
40 eid = ar.engine_id
41 while eid in self.client.ids:
41 tic = time.time()
42 while eid in self.client.ids and time.time()-tic < 5:
42 time.sleep(.01)
43 time.sleep(.01)
43 self.client.spin()
44 self.client.spin()
45 self.assertFalse(eid in self.client.ids, "Engine should have died")
44
46
45 def test_segfault_mux(self):
47 def test_crash_mux(self):
46 """test graceful handling of engine death (direct)"""
48 """test graceful handling of engine death (direct)"""
47 # self.add_engines(1)
49 # self.add_engines(1)
48 eid = self.client.ids[-1]
50 eid = self.client.ids[-1]
49 ar = self.client[eid].apply_async(segfault)
51 ar = self.client[eid].apply_async(crash)
50 self.assertRaisesRemote(error.EngineError, ar.get)
52 self.assertRaisesRemote(error.EngineError, ar.get)
51 eid = ar.engine_id
53 eid = ar.engine_id
52 while eid in self.client.ids:
54 tic = time.time()
55 while eid in self.client.ids and time.time()-tic < 5:
53 time.sleep(.01)
56 time.sleep(.01)
54 self.client.spin()
57 self.client.spin()
58 self.assertFalse(eid in self.client.ids, "Engine should have died")
55
59
56 def test_push_pull(self):
60 def test_push_pull(self):
57 """test pushing and pulling"""
61 """test pushing and pulling"""
58 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
62 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
59 t = self.client.ids[-1]
63 t = self.client.ids[-1]
60 v = self.client[t]
64 v = self.client[t]
61 push = v.push
65 push = v.push
62 pull = v.pull
66 pull = v.pull
63 v.block=True
67 v.block=True
64 nengines = len(self.client)
68 nengines = len(self.client)
65 push({'data':data})
69 push({'data':data})
66 d = pull('data')
70 d = pull('data')
67 self.assertEquals(d, data)
71 self.assertEquals(d, data)
68 self.client[:].push({'data':data})
72 self.client[:].push({'data':data})
69 d = self.client[:].pull('data', block=True)
73 d = self.client[:].pull('data', block=True)
70 self.assertEquals(d, nengines*[data])
74 self.assertEquals(d, nengines*[data])
71 ar = push({'data':data}, block=False)
75 ar = push({'data':data}, block=False)
72 self.assertTrue(isinstance(ar, AsyncResult))
76 self.assertTrue(isinstance(ar, AsyncResult))
73 r = ar.get()
77 r = ar.get()
74 ar = self.client[:].pull('data', block=False)
78 ar = self.client[:].pull('data', block=False)
75 self.assertTrue(isinstance(ar, AsyncResult))
79 self.assertTrue(isinstance(ar, AsyncResult))
76 r = ar.get()
80 r = ar.get()
77 self.assertEquals(r, nengines*[data])
81 self.assertEquals(r, nengines*[data])
78 self.client[:].push(dict(a=10,b=20))
82 self.client[:].push(dict(a=10,b=20))
79 r = self.client[:].pull(('a','b'), block=True)
83 r = self.client[:].pull(('a','b'), block=True)
80 self.assertEquals(r, nengines*[[10,20]])
84 self.assertEquals(r, nengines*[[10,20]])
81
85
82 def test_push_pull_function(self):
86 def test_push_pull_function(self):
83 "test pushing and pulling functions"
87 "test pushing and pulling functions"
84 def testf(x):
88 def testf(x):
85 return 2.0*x
89 return 2.0*x
86
90
87 t = self.client.ids[-1]
91 t = self.client.ids[-1]
88 v = self.client[t]
92 v = self.client[t]
89 v.block=True
93 v.block=True
90 push = v.push
94 push = v.push
91 pull = v.pull
95 pull = v.pull
92 execute = v.execute
96 execute = v.execute
93 push({'testf':testf})
97 push({'testf':testf})
94 r = pull('testf')
98 r = pull('testf')
95 self.assertEqual(r(1.0), testf(1.0))
99 self.assertEqual(r(1.0), testf(1.0))
96 execute('r = testf(10)')
100 execute('r = testf(10)')
97 r = pull('r')
101 r = pull('r')
98 self.assertEquals(r, testf(10))
102 self.assertEquals(r, testf(10))
99 ar = self.client[:].push({'testf':testf}, block=False)
103 ar = self.client[:].push({'testf':testf}, block=False)
100 ar.get()
104 ar.get()
101 ar = self.client[:].pull('testf', block=False)
105 ar = self.client[:].pull('testf', block=False)
102 rlist = ar.get()
106 rlist = ar.get()
103 for r in rlist:
107 for r in rlist:
104 self.assertEqual(r(1.0), testf(1.0))
108 self.assertEqual(r(1.0), testf(1.0))
105 execute("def g(x): return x*x")
109 execute("def g(x): return x*x")
106 r = pull(('testf','g'))
110 r = pull(('testf','g'))
107 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
111 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
108
112
109 def test_push_function_globals(self):
113 def test_push_function_globals(self):
110 """test that pushed functions have access to globals"""
114 """test that pushed functions have access to globals"""
111 @interactive
115 @interactive
112 def geta():
116 def geta():
113 return a
117 return a
114 # self.add_engines(1)
118 # self.add_engines(1)
115 v = self.client[-1]
119 v = self.client[-1]
116 v.block=True
120 v.block=True
117 v['f'] = geta
121 v['f'] = geta
118 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
122 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
119 v.execute('a=5')
123 v.execute('a=5')
120 v.execute('b=f()')
124 v.execute('b=f()')
121 self.assertEquals(v['b'], 5)
125 self.assertEquals(v['b'], 5)
122
126
123 def test_push_function_defaults(self):
127 def test_push_function_defaults(self):
124 """test that pushed functions preserve default args"""
128 """test that pushed functions preserve default args"""
125 def echo(a=10):
129 def echo(a=10):
126 return a
130 return a
127 v = self.client[-1]
131 v = self.client[-1]
128 v.block=True
132 v.block=True
129 v['f'] = echo
133 v['f'] = echo
130 v.execute('b=f()')
134 v.execute('b=f()')
131 self.assertEquals(v['b'], 10)
135 self.assertEquals(v['b'], 10)
132
136
133 def test_get_result(self):
137 def test_get_result(self):
134 """test getting results from the Hub."""
138 """test getting results from the Hub."""
135 c = pmod.Client(profile='iptest')
139 c = pmod.Client(profile='iptest')
136 # self.add_engines(1)
140 # self.add_engines(1)
137 t = c.ids[-1]
141 t = c.ids[-1]
138 v = c[t]
142 v = c[t]
139 v2 = self.client[t]
143 v2 = self.client[t]
140 ar = v.apply_async(wait, 1)
144 ar = v.apply_async(wait, 1)
141 # give the monitor time to notice the message
145 # give the monitor time to notice the message
142 time.sleep(.25)
146 time.sleep(.25)
143 ahr = v2.get_result(ar.msg_ids)
147 ahr = v2.get_result(ar.msg_ids)
144 self.assertTrue(isinstance(ahr, AsyncHubResult))
148 self.assertTrue(isinstance(ahr, AsyncHubResult))
145 self.assertEquals(ahr.get(), ar.get())
149 self.assertEquals(ahr.get(), ar.get())
146 ar2 = v2.get_result(ar.msg_ids)
150 ar2 = v2.get_result(ar.msg_ids)
147 self.assertFalse(isinstance(ar2, AsyncHubResult))
151 self.assertFalse(isinstance(ar2, AsyncHubResult))
148 c.spin()
152 c.spin()
149 c.close()
153 c.close()
150
154
151 def test_run_newline(self):
155 def test_run_newline(self):
152 """test that run appends newline to files"""
156 """test that run appends newline to files"""
153 tmpfile = mktemp()
157 tmpfile = mktemp()
154 with open(tmpfile, 'w') as f:
158 with open(tmpfile, 'w') as f:
155 f.write("""def g():
159 f.write("""def g():
156 return 5
160 return 5
157 """)
161 """)
158 v = self.client[-1]
162 v = self.client[-1]
159 v.run(tmpfile, block=True)
163 v.run(tmpfile, block=True)
160 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
164 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
161
165
162 def test_apply_tracked(self):
166 def test_apply_tracked(self):
163 """test tracking for apply"""
167 """test tracking for apply"""
164 # self.add_engines(1)
168 # self.add_engines(1)
165 t = self.client.ids[-1]
169 t = self.client.ids[-1]
166 v = self.client[t]
170 v = self.client[t]
167 v.block=False
171 v.block=False
168 def echo(n=1024*1024, **kwargs):
172 def echo(n=1024*1024, **kwargs):
169 with v.temp_flags(**kwargs):
173 with v.temp_flags(**kwargs):
170 return v.apply(lambda x: x, 'x'*n)
174 return v.apply(lambda x: x, 'x'*n)
171 ar = echo(1, track=False)
175 ar = echo(1, track=False)
172 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
176 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
173 self.assertTrue(ar.sent)
177 self.assertTrue(ar.sent)
174 ar = echo(track=True)
178 ar = echo(track=True)
175 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
179 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
176 self.assertEquals(ar.sent, ar._tracker.done)
180 self.assertEquals(ar.sent, ar._tracker.done)
177 ar._tracker.wait()
181 ar._tracker.wait()
178 self.assertTrue(ar.sent)
182 self.assertTrue(ar.sent)
179
183
180 def test_push_tracked(self):
184 def test_push_tracked(self):
181 t = self.client.ids[-1]
185 t = self.client.ids[-1]
182 ns = dict(x='x'*1024*1024)
186 ns = dict(x='x'*1024*1024)
183 v = self.client[t]
187 v = self.client[t]
184 ar = v.push(ns, block=False, track=False)
188 ar = v.push(ns, block=False, track=False)
185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
186 self.assertTrue(ar.sent)
190 self.assertTrue(ar.sent)
187
191
188 ar = v.push(ns, block=False, track=True)
192 ar = v.push(ns, block=False, track=True)
189 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
193 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
190 self.assertEquals(ar.sent, ar._tracker.done)
194 self.assertEquals(ar.sent, ar._tracker.done)
191 ar._tracker.wait()
195 ar._tracker.wait()
192 self.assertTrue(ar.sent)
196 self.assertTrue(ar.sent)
193 ar.get()
197 ar.get()
194
198
195 def test_scatter_tracked(self):
199 def test_scatter_tracked(self):
196 t = self.client.ids
200 t = self.client.ids
197 x='x'*1024*1024
201 x='x'*1024*1024
198 ar = self.client[t].scatter('x', x, block=False, track=False)
202 ar = self.client[t].scatter('x', x, block=False, track=False)
199 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
200 self.assertTrue(ar.sent)
204 self.assertTrue(ar.sent)
201
205
202 ar = self.client[t].scatter('x', x, block=False, track=True)
206 ar = self.client[t].scatter('x', x, block=False, track=True)
203 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
207 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
204 self.assertEquals(ar.sent, ar._tracker.done)
208 self.assertEquals(ar.sent, ar._tracker.done)
205 ar._tracker.wait()
209 ar._tracker.wait()
206 self.assertTrue(ar.sent)
210 self.assertTrue(ar.sent)
207 ar.get()
211 ar.get()
208
212
209 def test_remote_reference(self):
213 def test_remote_reference(self):
210 v = self.client[-1]
214 v = self.client[-1]
211 v['a'] = 123
215 v['a'] = 123
212 ra = pmod.Reference('a')
216 ra = pmod.Reference('a')
213 b = v.apply_sync(lambda x: x, ra)
217 b = v.apply_sync(lambda x: x, ra)
214 self.assertEquals(b, 123)
218 self.assertEquals(b, 123)
215
219
216
220
217 def test_scatter_gather(self):
221 def test_scatter_gather(self):
218 view = self.client[:]
222 view = self.client[:]
219 seq1 = range(16)
223 seq1 = range(16)
220 view.scatter('a', seq1)
224 view.scatter('a', seq1)
221 seq2 = view.gather('a', block=True)
225 seq2 = view.gather('a', block=True)
222 self.assertEquals(seq2, seq1)
226 self.assertEquals(seq2, seq1)
223 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
227 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
224
228
225 @skip_without('numpy')
229 @skip_without('numpy')
226 def test_scatter_gather_numpy(self):
230 def test_scatter_gather_numpy(self):
227 import numpy
231 import numpy
228 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
232 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
229 view = self.client[:]
233 view = self.client[:]
230 a = numpy.arange(64)
234 a = numpy.arange(64)
231 view.scatter('a', a)
235 view.scatter('a', a)
232 b = view.gather('a', block=True)
236 b = view.gather('a', block=True)
233 assert_array_equal(b, a)
237 assert_array_equal(b, a)
234
238
235 def test_map(self):
239 def test_map(self):
236 view = self.client[:]
240 view = self.client[:]
237 def f(x):
241 def f(x):
238 return x**2
242 return x**2
239 data = range(16)
243 data = range(16)
240 r = view.map_sync(f, data)
244 r = view.map_sync(f, data)
241 self.assertEquals(r, map(f, data))
245 self.assertEquals(r, map(f, data))
242
246
243 def test_scatterGatherNonblocking(self):
247 def test_scatterGatherNonblocking(self):
244 data = range(16)
248 data = range(16)
245 view = self.client[:]
249 view = self.client[:]
246 view.scatter('a', data, block=False)
250 view.scatter('a', data, block=False)
247 ar = view.gather('a', block=False)
251 ar = view.gather('a', block=False)
248 self.assertEquals(ar.get(), data)
252 self.assertEquals(ar.get(), data)
249
253
250 @skip_without('numpy')
254 @skip_without('numpy')
251 def test_scatter_gather_numpy_nonblocking(self):
255 def test_scatter_gather_numpy_nonblocking(self):
252 import numpy
256 import numpy
253 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
257 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
254 a = numpy.arange(64)
258 a = numpy.arange(64)
255 view = self.client[:]
259 view = self.client[:]
256 ar = view.scatter('a', a, block=False)
260 ar = view.scatter('a', a, block=False)
257 self.assertTrue(isinstance(ar, AsyncResult))
261 self.assertTrue(isinstance(ar, AsyncResult))
258 amr = view.gather('a', block=False)
262 amr = view.gather('a', block=False)
259 self.assertTrue(isinstance(amr, AsyncMapResult))
263 self.assertTrue(isinstance(amr, AsyncMapResult))
260 assert_array_equal(amr.get(), a)
264 assert_array_equal(amr.get(), a)
261
265
262 def test_execute(self):
266 def test_execute(self):
263 view = self.client[:]
267 view = self.client[:]
264 # self.client.debug=True
268 # self.client.debug=True
265 execute = view.execute
269 execute = view.execute
266 ar = execute('c=30', block=False)
270 ar = execute('c=30', block=False)
267 self.assertTrue(isinstance(ar, AsyncResult))
271 self.assertTrue(isinstance(ar, AsyncResult))
268 ar = execute('d=[0,1,2]', block=False)
272 ar = execute('d=[0,1,2]', block=False)
269 self.client.wait(ar, 1)
273 self.client.wait(ar, 1)
270 self.assertEquals(len(ar.get()), len(self.client))
274 self.assertEquals(len(ar.get()), len(self.client))
271 for c in view['c']:
275 for c in view['c']:
272 self.assertEquals(c, 30)
276 self.assertEquals(c, 30)
273
277
274 def test_abort(self):
278 def test_abort(self):
275 view = self.client[-1]
279 view = self.client[-1]
276 ar = view.execute('import time; time.sleep(0.25)', block=False)
280 ar = view.execute('import time; time.sleep(0.25)', block=False)
277 ar2 = view.apply_async(lambda : 2)
281 ar2 = view.apply_async(lambda : 2)
278 ar3 = view.apply_async(lambda : 3)
282 ar3 = view.apply_async(lambda : 3)
279 view.abort(ar2)
283 view.abort(ar2)
280 view.abort(ar3.msg_ids)
284 view.abort(ar3.msg_ids)
281 self.assertRaises(error.TaskAborted, ar2.get)
285 self.assertRaises(error.TaskAborted, ar2.get)
282 self.assertRaises(error.TaskAborted, ar3.get)
286 self.assertRaises(error.TaskAborted, ar3.get)
283
287
284 def test_temp_flags(self):
288 def test_temp_flags(self):
285 view = self.client[-1]
289 view = self.client[-1]
286 view.block=True
290 view.block=True
287 with view.temp_flags(block=False):
291 with view.temp_flags(block=False):
288 self.assertFalse(view.block)
292 self.assertFalse(view.block)
289 self.assertTrue(view.block)
293 self.assertTrue(view.block)
290
294
291 def test_importer(self):
295 def test_importer(self):
292 view = self.client[-1]
296 view = self.client[-1]
293 view.clear(block=True)
297 view.clear(block=True)
294 with view.importer:
298 with view.importer:
295 import re
299 import re
296
300
297 @interactive
301 @interactive
298 def findall(pat, s):
302 def findall(pat, s):
299 # this globals() step isn't necessary in real code
303 # this globals() step isn't necessary in real code
300 # only to prevent a closure in the test
304 # only to prevent a closure in the test
301 re = globals()['re']
305 re = globals()['re']
302 return re.findall(pat, s)
306 return re.findall(pat, s)
303
307
304 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
308 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
305
309
306 # parallel magic tests
310 # parallel magic tests
307
311
308 def test_magic_px_blocking(self):
312 def test_magic_px_blocking(self):
309 ip = get_ipython()
313 ip = get_ipython()
310 v = self.client[-1]
314 v = self.client[-1]
311 v.activate()
315 v.activate()
312 v.block=True
316 v.block=True
313
317
314 ip.magic_px('a=5')
318 ip.magic_px('a=5')
315 self.assertEquals(v['a'], 5)
319 self.assertEquals(v['a'], 5)
316 ip.magic_px('a=10')
320 ip.magic_px('a=10')
317 self.assertEquals(v['a'], 10)
321 self.assertEquals(v['a'], 10)
318 sio = StringIO()
322 sio = StringIO()
319 savestdout = sys.stdout
323 savestdout = sys.stdout
320 sys.stdout = sio
324 sys.stdout = sio
321 ip.magic_px('print a')
325 ip.magic_px('print a')
322 sys.stdout = savestdout
326 sys.stdout = savestdout
323 sio.read()
327 sio.read()
324 self.assertTrue('[stdout:%i]'%v.targets in sio.buf)
328 self.assertTrue('[stdout:%i]'%v.targets in sio.buf)
325 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
329 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
326
330
327 def test_magic_px_nonblocking(self):
331 def test_magic_px_nonblocking(self):
328 ip = get_ipython()
332 ip = get_ipython()
329 v = self.client[-1]
333 v = self.client[-1]
330 v.activate()
334 v.activate()
331 v.block=False
335 v.block=False
332
336
333 ip.magic_px('a=5')
337 ip.magic_px('a=5')
334 self.assertEquals(v['a'], 5)
338 self.assertEquals(v['a'], 5)
335 ip.magic_px('a=10')
339 ip.magic_px('a=10')
336 self.assertEquals(v['a'], 10)
340 self.assertEquals(v['a'], 10)
337 sio = StringIO()
341 sio = StringIO()
338 savestdout = sys.stdout
342 savestdout = sys.stdout
339 sys.stdout = sio
343 sys.stdout = sio
340 ip.magic_px('print a')
344 ip.magic_px('print a')
341 sys.stdout = savestdout
345 sys.stdout = savestdout
342 sio.read()
346 sio.read()
343 self.assertFalse('[stdout:%i]'%v.targets in sio.buf)
347 self.assertFalse('[stdout:%i]'%v.targets in sio.buf)
344 ip.magic_px('1/0')
348 ip.magic_px('1/0')
345 ar = v.get_result(-1)
349 ar = v.get_result(-1)
346 self.assertRaisesRemote(ZeroDivisionError, ar.get)
350 self.assertRaisesRemote(ZeroDivisionError, ar.get)
347
351
348 def test_magic_autopx_blocking(self):
352 def test_magic_autopx_blocking(self):
349 ip = get_ipython()
353 ip = get_ipython()
350 v = self.client[-1]
354 v = self.client[-1]
351 v.activate()
355 v.activate()
352 v.block=True
356 v.block=True
353
357
354 sio = StringIO()
358 sio = StringIO()
355 savestdout = sys.stdout
359 savestdout = sys.stdout
356 sys.stdout = sio
360 sys.stdout = sio
357 ip.magic_autopx()
361 ip.magic_autopx()
358 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
362 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
359 ip.run_cell('print b')
363 ip.run_cell('print b')
360 ip.run_cell("b/c")
364 ip.run_cell("b/c")
361 ip.run_code(compile('b*=2', '', 'single'))
365 ip.run_code(compile('b*=2', '', 'single'))
362 ip.magic_autopx()
366 ip.magic_autopx()
363 sys.stdout = savestdout
367 sys.stdout = savestdout
364 sio.read()
368 sio.read()
365 output = sio.buf.strip()
369 output = sio.buf.strip()
366 self.assertTrue(output.startswith('%autopx enabled'))
370 self.assertTrue(output.startswith('%autopx enabled'))
367 self.assertTrue(output.endswith('%autopx disabled'))
371 self.assertTrue(output.endswith('%autopx disabled'))
368 self.assertTrue('RemoteError: ZeroDivisionError' in output)
372 self.assertTrue('RemoteError: ZeroDivisionError' in output)
369 ar = v.get_result(-2)
373 ar = v.get_result(-2)
370 self.assertEquals(v['a'], 5)
374 self.assertEquals(v['a'], 5)
371 self.assertEquals(v['b'], 20)
375 self.assertEquals(v['b'], 20)
372 self.assertRaisesRemote(ZeroDivisionError, ar.get)
376 self.assertRaisesRemote(ZeroDivisionError, ar.get)
373
377
374 def test_magic_autopx_nonblocking(self):
378 def test_magic_autopx_nonblocking(self):
375 ip = get_ipython()
379 ip = get_ipython()
376 v = self.client[-1]
380 v = self.client[-1]
377 v.activate()
381 v.activate()
378 v.block=False
382 v.block=False
379
383
380 sio = StringIO()
384 sio = StringIO()
381 savestdout = sys.stdout
385 savestdout = sys.stdout
382 sys.stdout = sio
386 sys.stdout = sio
383 ip.magic_autopx()
387 ip.magic_autopx()
384 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
388 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
385 ip.run_cell('print b')
389 ip.run_cell('print b')
386 ip.run_cell("b/c")
390 ip.run_cell("b/c")
387 ip.run_code(compile('b*=2', '', 'single'))
391 ip.run_code(compile('b*=2', '', 'single'))
388 ip.magic_autopx()
392 ip.magic_autopx()
389 sys.stdout = savestdout
393 sys.stdout = savestdout
390 sio.read()
394 sio.read()
391 output = sio.buf.strip()
395 output = sio.buf.strip()
392 self.assertTrue(output.startswith('%autopx enabled'))
396 self.assertTrue(output.startswith('%autopx enabled'))
393 self.assertTrue(output.endswith('%autopx disabled'))
397 self.assertTrue(output.endswith('%autopx disabled'))
394 self.assertFalse('ZeroDivisionError' in output)
398 self.assertFalse('ZeroDivisionError' in output)
395 ar = v.get_result(-2)
399 ar = v.get_result(-2)
396 self.assertEquals(v['a'], 5)
400 self.assertEquals(v['a'], 5)
397 self.assertEquals(v['b'], 20)
401 self.assertEquals(v['b'], 20)
398 self.assertRaisesRemote(ZeroDivisionError, ar.get)
402 self.assertRaisesRemote(ZeroDivisionError, ar.get)
399
403
400 def test_magic_result(self):
404 def test_magic_result(self):
401 ip = get_ipython()
405 ip = get_ipython()
402 v = self.client[-1]
406 v = self.client[-1]
403 v.activate()
407 v.activate()
404 v['a'] = 111
408 v['a'] = 111
405 ra = v['a']
409 ra = v['a']
406
410
407 ar = ip.magic_result()
411 ar = ip.magic_result()
408 self.assertEquals(ar.msg_ids, [v.history[-1]])
412 self.assertEquals(ar.msg_ids, [v.history[-1]])
409 self.assertEquals(ar.get(), 111)
413 self.assertEquals(ar.get(), 111)
410 ar = ip.magic_result('-2')
414 ar = ip.magic_result('-2')
411 self.assertEquals(ar.msg_ids, [v.history[-2]])
415 self.assertEquals(ar.msg_ids, [v.history[-2]])
412
416
413
417
414
418
General Comments 0
You need to be logged in to leave comments. Login now