##// END OF EJS Templates
further tweaks to parallel tests...
MinRK -
Show More
@@ -1,126 +1,129 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test LoadBalancedView objects
2 """test LoadBalancedView objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import time
20 import time
21
21
22 import zmq
22 import zmq
23 from nose import SkipTest
23
24
24 from IPython import parallel as pmod
25 from IPython import parallel as pmod
25 from IPython.parallel import error
26 from IPython.parallel import error
26
27
27 from IPython.parallel.tests import add_engines
28 from IPython.parallel.tests import add_engines
28
29
29 from .clienttest import ClusterTestCase, crash, wait, skip_without
30 from .clienttest import ClusterTestCase, crash, wait, skip_without
30
31
31 def setup():
32 def setup():
32 add_engines(3)
33 add_engines(3)
33
34
34 class TestLoadBalancedView(ClusterTestCase):
35 class TestLoadBalancedView(ClusterTestCase):
35
36
36 def setUp(self):
37 def setUp(self):
37 ClusterTestCase.setUp(self)
38 ClusterTestCase.setUp(self)
38 self.view = self.client.load_balanced_view()
39 self.view = self.client.load_balanced_view()
39
40
40 def test_z_crash_task(self):
41 def test_z_crash_task(self):
41 """test graceful handling of engine death (balanced)"""
42 """test graceful handling of engine death (balanced)"""
43 raise SkipTest("crash tests disabled, due to undesirable crash reports")
42 # self.add_engines(1)
44 # self.add_engines(1)
43 ar = self.view.apply_async(crash)
45 ar = self.view.apply_async(crash)
44 self.assertRaisesRemote(error.EngineError, ar.get, 10)
46 self.assertRaisesRemote(error.EngineError, ar.get, 10)
45 eid = ar.engine_id
47 eid = ar.engine_id
46 tic = time.time()
48 tic = time.time()
47 while eid in self.client.ids and time.time()-tic < 5:
49 while eid in self.client.ids and time.time()-tic < 5:
48 time.sleep(.01)
50 time.sleep(.01)
49 self.client.spin()
51 self.client.spin()
50 self.assertFalse(eid in self.client.ids, "Engine should have died")
52 self.assertFalse(eid in self.client.ids, "Engine should have died")
51
53
52 def test_map(self):
54 def test_map(self):
53 def f(x):
55 def f(x):
54 return x**2
56 return x**2
55 data = range(16)
57 data = range(16)
56 r = self.view.map_sync(f, data)
58 r = self.view.map_sync(f, data)
57 self.assertEquals(r, map(f, data))
59 self.assertEquals(r, map(f, data))
58
60
59 def test_abort(self):
61 def test_abort(self):
60 view = self.view
62 view = self.view
61 ar = self.client[:].apply_async(time.sleep, .5)
63 ar = self.client[:].apply_async(time.sleep, .5)
62 ar = self.client[:].apply_async(time.sleep, .5)
64 ar = self.client[:].apply_async(time.sleep, .5)
65 time.sleep(0.2)
63 ar2 = view.apply_async(lambda : 2)
66 ar2 = view.apply_async(lambda : 2)
64 ar3 = view.apply_async(lambda : 3)
67 ar3 = view.apply_async(lambda : 3)
65 view.abort(ar2)
68 view.abort(ar2)
66 view.abort(ar3.msg_ids)
69 view.abort(ar3.msg_ids)
67 self.assertRaises(error.TaskAborted, ar2.get)
70 self.assertRaises(error.TaskAborted, ar2.get)
68 self.assertRaises(error.TaskAborted, ar3.get)
71 self.assertRaises(error.TaskAborted, ar3.get)
69
72
70 def test_retries(self):
73 def test_retries(self):
71 add_engines(3)
74 add_engines(3)
72 view = self.view
75 view = self.view
73 view.timeout = 1 # prevent hang if this doesn't behave
76 view.timeout = 1 # prevent hang if this doesn't behave
74 def fail():
77 def fail():
75 assert False
78 assert False
76 for r in range(len(self.client)-1):
79 for r in range(len(self.client)-1):
77 with view.temp_flags(retries=r):
80 with view.temp_flags(retries=r):
78 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
81 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
79
82
80 with view.temp_flags(retries=len(self.client), timeout=0.25):
83 with view.temp_flags(retries=len(self.client), timeout=0.25):
81 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
84 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
82
85
83 def test_invalid_dependency(self):
86 def test_invalid_dependency(self):
84 view = self.view
87 view = self.view
85 with view.temp_flags(after='12345'):
88 with view.temp_flags(after='12345'):
86 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
89 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
87
90
88 def test_impossible_dependency(self):
91 def test_impossible_dependency(self):
89 if len(self.client) < 2:
92 if len(self.client) < 2:
90 add_engines(2)
93 add_engines(2)
91 view = self.client.load_balanced_view()
94 view = self.client.load_balanced_view()
92 ar1 = view.apply_async(lambda : 1)
95 ar1 = view.apply_async(lambda : 1)
93 ar1.get()
96 ar1.get()
94 e1 = ar1.engine_id
97 e1 = ar1.engine_id
95 e2 = e1
98 e2 = e1
96 while e2 == e1:
99 while e2 == e1:
97 ar2 = view.apply_async(lambda : 1)
100 ar2 = view.apply_async(lambda : 1)
98 ar2.get()
101 ar2.get()
99 e2 = ar2.engine_id
102 e2 = ar2.engine_id
100
103
101 with view.temp_flags(follow=[ar1, ar2]):
104 with view.temp_flags(follow=[ar1, ar2]):
102 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
105 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
103
106
104
107
105 def test_follow(self):
108 def test_follow(self):
106 ar = self.view.apply_async(lambda : 1)
109 ar = self.view.apply_async(lambda : 1)
107 ar.get()
110 ar.get()
108 ars = []
111 ars = []
109 first_id = ar.engine_id
112 first_id = ar.engine_id
110
113
111 self.view.follow = ar
114 self.view.follow = ar
112 for i in range(5):
115 for i in range(5):
113 ars.append(self.view.apply_async(lambda : 1))
116 ars.append(self.view.apply_async(lambda : 1))
114 self.view.wait(ars)
117 self.view.wait(ars)
115 for ar in ars:
118 for ar in ars:
116 self.assertEquals(ar.engine_id, first_id)
119 self.assertEquals(ar.engine_id, first_id)
117
120
118 def test_after(self):
121 def test_after(self):
119 view = self.view
122 view = self.view
120 ar = view.apply_async(time.sleep, 0.5)
123 ar = view.apply_async(time.sleep, 0.5)
121 with view.temp_flags(after=ar):
124 with view.temp_flags(after=ar):
122 ar2 = view.apply_async(lambda : 1)
125 ar2 = view.apply_async(lambda : 1)
123
126
124 ar.wait()
127 ar.wait()
125 ar2.wait()
128 ar2.wait()
126 self.assertTrue(ar2.started > ar.completed)
129 self.assertTrue(ar2.started > ar.completed)
@@ -1,447 +1,451 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test View objects
2 """test View objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import time
20 import time
21 from tempfile import mktemp
21 from tempfile import mktemp
22 from StringIO import StringIO
22 from StringIO import StringIO
23
23
24 import zmq
24 import zmq
25 from nose import SkipTest
25
26
26 from IPython import parallel as pmod
27 from IPython import parallel as pmod
27 from IPython.parallel import error
28 from IPython.parallel import error
28 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
29 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
29 from IPython.parallel import DirectView
30 from IPython.parallel import DirectView
30 from IPython.parallel.util import interactive
31 from IPython.parallel.util import interactive
31
32
32 from IPython.parallel.tests import add_engines
33 from IPython.parallel.tests import add_engines
33
34
34 from .clienttest import ClusterTestCase, crash, wait, skip_without
35 from .clienttest import ClusterTestCase, crash, wait, skip_without
35
36
36 def setup():
37 def setup():
37 add_engines(3)
38 add_engines(3)
38
39
39 class TestView(ClusterTestCase):
40 class TestView(ClusterTestCase):
40
41
41 def test_z_crash_mux(self):
42 def test_z_crash_mux(self):
42 """test graceful handling of engine death (direct)"""
43 """test graceful handling of engine death (direct)"""
44 raise SkipTest("crash tests disabled, due to undesirable crash reports")
43 # self.add_engines(1)
45 # self.add_engines(1)
44 eid = self.client.ids[-1]
46 eid = self.client.ids[-1]
45 ar = self.client[eid].apply_async(crash)
47 ar = self.client[eid].apply_async(crash)
46 self.assertRaisesRemote(error.EngineError, ar.get, 10)
48 self.assertRaisesRemote(error.EngineError, ar.get, 10)
47 eid = ar.engine_id
49 eid = ar.engine_id
48 tic = time.time()
50 tic = time.time()
49 while eid in self.client.ids and time.time()-tic < 5:
51 while eid in self.client.ids and time.time()-tic < 5:
50 time.sleep(.01)
52 time.sleep(.01)
51 self.client.spin()
53 self.client.spin()
52 self.assertFalse(eid in self.client.ids, "Engine should have died")
54 self.assertFalse(eid in self.client.ids, "Engine should have died")
53
55
54 def test_push_pull(self):
56 def test_push_pull(self):
55 """test pushing and pulling"""
57 """test pushing and pulling"""
56 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
58 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
57 t = self.client.ids[-1]
59 t = self.client.ids[-1]
58 v = self.client[t]
60 v = self.client[t]
59 push = v.push
61 push = v.push
60 pull = v.pull
62 pull = v.pull
61 v.block=True
63 v.block=True
62 nengines = len(self.client)
64 nengines = len(self.client)
63 push({'data':data})
65 push({'data':data})
64 d = pull('data')
66 d = pull('data')
65 self.assertEquals(d, data)
67 self.assertEquals(d, data)
66 self.client[:].push({'data':data})
68 self.client[:].push({'data':data})
67 d = self.client[:].pull('data', block=True)
69 d = self.client[:].pull('data', block=True)
68 self.assertEquals(d, nengines*[data])
70 self.assertEquals(d, nengines*[data])
69 ar = push({'data':data}, block=False)
71 ar = push({'data':data}, block=False)
70 self.assertTrue(isinstance(ar, AsyncResult))
72 self.assertTrue(isinstance(ar, AsyncResult))
71 r = ar.get()
73 r = ar.get()
72 ar = self.client[:].pull('data', block=False)
74 ar = self.client[:].pull('data', block=False)
73 self.assertTrue(isinstance(ar, AsyncResult))
75 self.assertTrue(isinstance(ar, AsyncResult))
74 r = ar.get()
76 r = ar.get()
75 self.assertEquals(r, nengines*[data])
77 self.assertEquals(r, nengines*[data])
76 self.client[:].push(dict(a=10,b=20))
78 self.client[:].push(dict(a=10,b=20))
77 r = self.client[:].pull(('a','b'), block=True)
79 r = self.client[:].pull(('a','b'), block=True)
78 self.assertEquals(r, nengines*[[10,20]])
80 self.assertEquals(r, nengines*[[10,20]])
79
81
80 def test_push_pull_function(self):
82 def test_push_pull_function(self):
81 "test pushing and pulling functions"
83 "test pushing and pulling functions"
82 def testf(x):
84 def testf(x):
83 return 2.0*x
85 return 2.0*x
84
86
85 t = self.client.ids[-1]
87 t = self.client.ids[-1]
86 v = self.client[t]
88 v = self.client[t]
87 v.block=True
89 v.block=True
88 push = v.push
90 push = v.push
89 pull = v.pull
91 pull = v.pull
90 execute = v.execute
92 execute = v.execute
91 push({'testf':testf})
93 push({'testf':testf})
92 r = pull('testf')
94 r = pull('testf')
93 self.assertEqual(r(1.0), testf(1.0))
95 self.assertEqual(r(1.0), testf(1.0))
94 execute('r = testf(10)')
96 execute('r = testf(10)')
95 r = pull('r')
97 r = pull('r')
96 self.assertEquals(r, testf(10))
98 self.assertEquals(r, testf(10))
97 ar = self.client[:].push({'testf':testf}, block=False)
99 ar = self.client[:].push({'testf':testf}, block=False)
98 ar.get()
100 ar.get()
99 ar = self.client[:].pull('testf', block=False)
101 ar = self.client[:].pull('testf', block=False)
100 rlist = ar.get()
102 rlist = ar.get()
101 for r in rlist:
103 for r in rlist:
102 self.assertEqual(r(1.0), testf(1.0))
104 self.assertEqual(r(1.0), testf(1.0))
103 execute("def g(x): return x*x")
105 execute("def g(x): return x*x")
104 r = pull(('testf','g'))
106 r = pull(('testf','g'))
105 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
107 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
106
108
107 def test_push_function_globals(self):
109 def test_push_function_globals(self):
108 """test that pushed functions have access to globals"""
110 """test that pushed functions have access to globals"""
109 @interactive
111 @interactive
110 def geta():
112 def geta():
111 return a
113 return a
112 # self.add_engines(1)
114 # self.add_engines(1)
113 v = self.client[-1]
115 v = self.client[-1]
114 v.block=True
116 v.block=True
115 v['f'] = geta
117 v['f'] = geta
116 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
118 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
117 v.execute('a=5')
119 v.execute('a=5')
118 v.execute('b=f()')
120 v.execute('b=f()')
119 self.assertEquals(v['b'], 5)
121 self.assertEquals(v['b'], 5)
120
122
121 def test_push_function_defaults(self):
123 def test_push_function_defaults(self):
122 """test that pushed functions preserve default args"""
124 """test that pushed functions preserve default args"""
123 def echo(a=10):
125 def echo(a=10):
124 return a
126 return a
125 v = self.client[-1]
127 v = self.client[-1]
126 v.block=True
128 v.block=True
127 v['f'] = echo
129 v['f'] = echo
128 v.execute('b=f()')
130 v.execute('b=f()')
129 self.assertEquals(v['b'], 10)
131 self.assertEquals(v['b'], 10)
130
132
131 def test_get_result(self):
133 def test_get_result(self):
132 """test getting results from the Hub."""
134 """test getting results from the Hub."""
133 c = pmod.Client(profile='iptest')
135 c = pmod.Client(profile='iptest')
134 # self.add_engines(1)
136 # self.add_engines(1)
135 t = c.ids[-1]
137 t = c.ids[-1]
136 v = c[t]
138 v = c[t]
137 v2 = self.client[t]
139 v2 = self.client[t]
138 ar = v.apply_async(wait, 1)
140 ar = v.apply_async(wait, 1)
139 # give the monitor time to notice the message
141 # give the monitor time to notice the message
140 time.sleep(.25)
142 time.sleep(.25)
141 ahr = v2.get_result(ar.msg_ids)
143 ahr = v2.get_result(ar.msg_ids)
142 self.assertTrue(isinstance(ahr, AsyncHubResult))
144 self.assertTrue(isinstance(ahr, AsyncHubResult))
143 self.assertEquals(ahr.get(), ar.get())
145 self.assertEquals(ahr.get(), ar.get())
144 ar2 = v2.get_result(ar.msg_ids)
146 ar2 = v2.get_result(ar.msg_ids)
145 self.assertFalse(isinstance(ar2, AsyncHubResult))
147 self.assertFalse(isinstance(ar2, AsyncHubResult))
146 c.spin()
148 c.spin()
147 c.close()
149 c.close()
148
150
149 def test_run_newline(self):
151 def test_run_newline(self):
150 """test that run appends newline to files"""
152 """test that run appends newline to files"""
151 tmpfile = mktemp()
153 tmpfile = mktemp()
152 with open(tmpfile, 'w') as f:
154 with open(tmpfile, 'w') as f:
153 f.write("""def g():
155 f.write("""def g():
154 return 5
156 return 5
155 """)
157 """)
156 v = self.client[-1]
158 v = self.client[-1]
157 v.run(tmpfile, block=True)
159 v.run(tmpfile, block=True)
158 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
160 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
159
161
160 def test_apply_tracked(self):
162 def test_apply_tracked(self):
161 """test tracking for apply"""
163 """test tracking for apply"""
162 # self.add_engines(1)
164 # self.add_engines(1)
163 t = self.client.ids[-1]
165 t = self.client.ids[-1]
164 v = self.client[t]
166 v = self.client[t]
165 v.block=False
167 v.block=False
166 def echo(n=1024*1024, **kwargs):
168 def echo(n=1024*1024, **kwargs):
167 with v.temp_flags(**kwargs):
169 with v.temp_flags(**kwargs):
168 return v.apply(lambda x: x, 'x'*n)
170 return v.apply(lambda x: x, 'x'*n)
169 ar = echo(1, track=False)
171 ar = echo(1, track=False)
170 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
172 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
171 self.assertTrue(ar.sent)
173 self.assertTrue(ar.sent)
172 ar = echo(track=True)
174 ar = echo(track=True)
173 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
175 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
174 self.assertEquals(ar.sent, ar._tracker.done)
176 self.assertEquals(ar.sent, ar._tracker.done)
175 ar._tracker.wait()
177 ar._tracker.wait()
176 self.assertTrue(ar.sent)
178 self.assertTrue(ar.sent)
177
179
178 def test_push_tracked(self):
180 def test_push_tracked(self):
179 t = self.client.ids[-1]
181 t = self.client.ids[-1]
180 ns = dict(x='x'*1024*1024)
182 ns = dict(x='x'*1024*1024)
181 v = self.client[t]
183 v = self.client[t]
182 ar = v.push(ns, block=False, track=False)
184 ar = v.push(ns, block=False, track=False)
183 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
184 self.assertTrue(ar.sent)
186 self.assertTrue(ar.sent)
185
187
186 ar = v.push(ns, block=False, track=True)
188 ar = v.push(ns, block=False, track=True)
187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertEquals(ar.sent, ar._tracker.done)
189 ar._tracker.wait()
190 ar._tracker.wait()
191 self.assertEquals(ar.sent, ar._tracker.done)
190 self.assertTrue(ar.sent)
192 self.assertTrue(ar.sent)
191 ar.get()
193 ar.get()
192
194
193 def test_scatter_tracked(self):
195 def test_scatter_tracked(self):
194 t = self.client.ids
196 t = self.client.ids
195 x='x'*1024*1024
197 x='x'*1024*1024
196 ar = self.client[t].scatter('x', x, block=False, track=False)
198 ar = self.client[t].scatter('x', x, block=False, track=False)
197 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
199 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
198 self.assertTrue(ar.sent)
200 self.assertTrue(ar.sent)
199
201
200 ar = self.client[t].scatter('x', x, block=False, track=True)
202 ar = self.client[t].scatter('x', x, block=False, track=True)
201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 self.assertEquals(ar.sent, ar._tracker.done)
204 self.assertEquals(ar.sent, ar._tracker.done)
203 ar._tracker.wait()
205 ar._tracker.wait()
204 self.assertTrue(ar.sent)
206 self.assertTrue(ar.sent)
205 ar.get()
207 ar.get()
206
208
207 def test_remote_reference(self):
209 def test_remote_reference(self):
208 v = self.client[-1]
210 v = self.client[-1]
209 v['a'] = 123
211 v['a'] = 123
210 ra = pmod.Reference('a')
212 ra = pmod.Reference('a')
211 b = v.apply_sync(lambda x: x, ra)
213 b = v.apply_sync(lambda x: x, ra)
212 self.assertEquals(b, 123)
214 self.assertEquals(b, 123)
213
215
214
216
215 def test_scatter_gather(self):
217 def test_scatter_gather(self):
216 view = self.client[:]
218 view = self.client[:]
217 seq1 = range(16)
219 seq1 = range(16)
218 view.scatter('a', seq1)
220 view.scatter('a', seq1)
219 seq2 = view.gather('a', block=True)
221 seq2 = view.gather('a', block=True)
220 self.assertEquals(seq2, seq1)
222 self.assertEquals(seq2, seq1)
221 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
223 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
222
224
223 @skip_without('numpy')
225 @skip_without('numpy')
224 def test_scatter_gather_numpy(self):
226 def test_scatter_gather_numpy(self):
225 import numpy
227 import numpy
226 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
228 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
227 view = self.client[:]
229 view = self.client[:]
228 a = numpy.arange(64)
230 a = numpy.arange(64)
229 view.scatter('a', a)
231 view.scatter('a', a)
230 b = view.gather('a', block=True)
232 b = view.gather('a', block=True)
231 assert_array_equal(b, a)
233 assert_array_equal(b, a)
232
234
233 def test_map(self):
235 def test_map(self):
234 view = self.client[:]
236 view = self.client[:]
235 def f(x):
237 def f(x):
236 return x**2
238 return x**2
237 data = range(16)
239 data = range(16)
238 r = view.map_sync(f, data)
240 r = view.map_sync(f, data)
239 self.assertEquals(r, map(f, data))
241 self.assertEquals(r, map(f, data))
240
242
241 def test_scatterGatherNonblocking(self):
243 def test_scatterGatherNonblocking(self):
242 data = range(16)
244 data = range(16)
243 view = self.client[:]
245 view = self.client[:]
244 view.scatter('a', data, block=False)
246 view.scatter('a', data, block=False)
245 ar = view.gather('a', block=False)
247 ar = view.gather('a', block=False)
246 self.assertEquals(ar.get(), data)
248 self.assertEquals(ar.get(), data)
247
249
248 @skip_without('numpy')
250 @skip_without('numpy')
249 def test_scatter_gather_numpy_nonblocking(self):
251 def test_scatter_gather_numpy_nonblocking(self):
250 import numpy
252 import numpy
251 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
253 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
252 a = numpy.arange(64)
254 a = numpy.arange(64)
253 view = self.client[:]
255 view = self.client[:]
254 ar = view.scatter('a', a, block=False)
256 ar = view.scatter('a', a, block=False)
255 self.assertTrue(isinstance(ar, AsyncResult))
257 self.assertTrue(isinstance(ar, AsyncResult))
256 amr = view.gather('a', block=False)
258 amr = view.gather('a', block=False)
257 self.assertTrue(isinstance(amr, AsyncMapResult))
259 self.assertTrue(isinstance(amr, AsyncMapResult))
258 assert_array_equal(amr.get(), a)
260 assert_array_equal(amr.get(), a)
259
261
260 def test_execute(self):
262 def test_execute(self):
261 view = self.client[:]
263 view = self.client[:]
262 # self.client.debug=True
264 # self.client.debug=True
263 execute = view.execute
265 execute = view.execute
264 ar = execute('c=30', block=False)
266 ar = execute('c=30', block=False)
265 self.assertTrue(isinstance(ar, AsyncResult))
267 self.assertTrue(isinstance(ar, AsyncResult))
266 ar = execute('d=[0,1,2]', block=False)
268 ar = execute('d=[0,1,2]', block=False)
267 self.client.wait(ar, 1)
269 self.client.wait(ar, 1)
268 self.assertEquals(len(ar.get()), len(self.client))
270 self.assertEquals(len(ar.get()), len(self.client))
269 for c in view['c']:
271 for c in view['c']:
270 self.assertEquals(c, 30)
272 self.assertEquals(c, 30)
271
273
272 def test_abort(self):
274 def test_abort(self):
273 view = self.client[-1]
275 view = self.client[-1]
274 ar = view.execute('import time; time.sleep(0.25)', block=False)
276 ar = view.execute('import time; time.sleep(1)', block=False)
275 ar2 = view.apply_async(lambda : 2)
277 ar2 = view.apply_async(lambda : 2)
276 ar3 = view.apply_async(lambda : 3)
278 ar3 = view.apply_async(lambda : 3)
277 view.abort(ar2)
279 view.abort(ar2)
278 view.abort(ar3.msg_ids)
280 view.abort(ar3.msg_ids)
279 self.assertRaises(error.TaskAborted, ar2.get)
281 self.assertRaises(error.TaskAborted, ar2.get)
280 self.assertRaises(error.TaskAborted, ar3.get)
282 self.assertRaises(error.TaskAborted, ar3.get)
281
283
282 def test_temp_flags(self):
284 def test_temp_flags(self):
283 view = self.client[-1]
285 view = self.client[-1]
284 view.block=True
286 view.block=True
285 with view.temp_flags(block=False):
287 with view.temp_flags(block=False):
286 self.assertFalse(view.block)
288 self.assertFalse(view.block)
287 self.assertTrue(view.block)
289 self.assertTrue(view.block)
288
290
289 def test_importer(self):
291 def test_importer(self):
290 view = self.client[-1]
292 view = self.client[-1]
291 view.clear(block=True)
293 view.clear(block=True)
292 with view.importer:
294 with view.importer:
293 import re
295 import re
294
296
295 @interactive
297 @interactive
296 def findall(pat, s):
298 def findall(pat, s):
297 # this globals() step isn't necessary in real code
299 # this globals() step isn't necessary in real code
298 # only to prevent a closure in the test
300 # only to prevent a closure in the test
299 re = globals()['re']
301 re = globals()['re']
300 return re.findall(pat, s)
302 return re.findall(pat, s)
301
303
302 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
304 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
303
305
304 # parallel magic tests
306 # parallel magic tests
305
307
306 def test_magic_px_blocking(self):
308 def test_magic_px_blocking(self):
307 ip = get_ipython()
309 ip = get_ipython()
308 v = self.client[-1]
310 v = self.client[-1]
309 v.activate()
311 v.activate()
310 v.block=True
312 v.block=True
311
313
312 ip.magic_px('a=5')
314 ip.magic_px('a=5')
313 self.assertEquals(v['a'], 5)
315 self.assertEquals(v['a'], 5)
314 ip.magic_px('a=10')
316 ip.magic_px('a=10')
315 self.assertEquals(v['a'], 10)
317 self.assertEquals(v['a'], 10)
316 sio = StringIO()
318 sio = StringIO()
317 savestdout = sys.stdout
319 savestdout = sys.stdout
318 sys.stdout = sio
320 sys.stdout = sio
319 ip.magic_px('print a')
321 # just 'print a' worst ~99% of the time, but this ensures that
322 # the stdout message has arrived when the result is finished:
323 ip.magic_px('import sys,time;print a; sys.stdout.flush();time.sleep(0.2)')
320 sys.stdout = savestdout
324 sys.stdout = savestdout
321 buf = sio.getvalue()
325 buf = sio.getvalue()
322 self.assertTrue('[stdout:' in buf, buf)
326 self.assertTrue('[stdout:' in buf, buf)
323 self.assertTrue(buf.rstrip().endswith('10'))
327 self.assertTrue(buf.rstrip().endswith('10'))
324 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
328 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
325
329
326 def test_magic_px_nonblocking(self):
330 def test_magic_px_nonblocking(self):
327 ip = get_ipython()
331 ip = get_ipython()
328 v = self.client[-1]
332 v = self.client[-1]
329 v.activate()
333 v.activate()
330 v.block=False
334 v.block=False
331
335
332 ip.magic_px('a=5')
336 ip.magic_px('a=5')
333 self.assertEquals(v['a'], 5)
337 self.assertEquals(v['a'], 5)
334 ip.magic_px('a=10')
338 ip.magic_px('a=10')
335 self.assertEquals(v['a'], 10)
339 self.assertEquals(v['a'], 10)
336 sio = StringIO()
340 sio = StringIO()
337 savestdout = sys.stdout
341 savestdout = sys.stdout
338 sys.stdout = sio
342 sys.stdout = sio
339 ip.magic_px('print a')
343 ip.magic_px('print a')
340 sys.stdout = savestdout
344 sys.stdout = savestdout
341 buf = sio.getvalue()
345 buf = sio.getvalue()
342 self.assertFalse('[stdout:%i]'%v.targets in buf)
346 self.assertFalse('[stdout:%i]'%v.targets in buf)
343 ip.magic_px('1/0')
347 ip.magic_px('1/0')
344 ar = v.get_result(-1)
348 ar = v.get_result(-1)
345 self.assertRaisesRemote(ZeroDivisionError, ar.get)
349 self.assertRaisesRemote(ZeroDivisionError, ar.get)
346
350
347 def test_magic_autopx_blocking(self):
351 def test_magic_autopx_blocking(self):
348 ip = get_ipython()
352 ip = get_ipython()
349 v = self.client[-1]
353 v = self.client[-1]
350 v.activate()
354 v.activate()
351 v.block=True
355 v.block=True
352
356
353 sio = StringIO()
357 sio = StringIO()
354 savestdout = sys.stdout
358 savestdout = sys.stdout
355 sys.stdout = sio
359 sys.stdout = sio
356 ip.magic_autopx()
360 ip.magic_autopx()
357 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
361 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
358 ip.run_cell('print b')
362 ip.run_cell('print b')
359 ip.run_cell("b/c")
363 ip.run_cell("b/c")
360 ip.run_code(compile('b*=2', '', 'single'))
364 ip.run_code(compile('b*=2', '', 'single'))
361 ip.magic_autopx()
365 ip.magic_autopx()
362 sys.stdout = savestdout
366 sys.stdout = savestdout
363 output = sio.getvalue().strip()
367 output = sio.getvalue().strip()
364 self.assertTrue(output.startswith('%autopx enabled'))
368 self.assertTrue(output.startswith('%autopx enabled'))
365 self.assertTrue(output.endswith('%autopx disabled'))
369 self.assertTrue(output.endswith('%autopx disabled'))
366 self.assertTrue('RemoteError: ZeroDivisionError' in output)
370 self.assertTrue('RemoteError: ZeroDivisionError' in output)
367 ar = v.get_result(-2)
371 ar = v.get_result(-2)
368 self.assertEquals(v['a'], 5)
372 self.assertEquals(v['a'], 5)
369 self.assertEquals(v['b'], 20)
373 self.assertEquals(v['b'], 20)
370 self.assertRaisesRemote(ZeroDivisionError, ar.get)
374 self.assertRaisesRemote(ZeroDivisionError, ar.get)
371
375
372 def test_magic_autopx_nonblocking(self):
376 def test_magic_autopx_nonblocking(self):
373 ip = get_ipython()
377 ip = get_ipython()
374 v = self.client[-1]
378 v = self.client[-1]
375 v.activate()
379 v.activate()
376 v.block=False
380 v.block=False
377
381
378 sio = StringIO()
382 sio = StringIO()
379 savestdout = sys.stdout
383 savestdout = sys.stdout
380 sys.stdout = sio
384 sys.stdout = sio
381 ip.magic_autopx()
385 ip.magic_autopx()
382 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
386 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
383 ip.run_cell('print b')
387 ip.run_cell('print b')
384 ip.run_cell("b/c")
388 ip.run_cell("b/c")
385 ip.run_code(compile('b*=2', '', 'single'))
389 ip.run_code(compile('b*=2', '', 'single'))
386 ip.magic_autopx()
390 ip.magic_autopx()
387 sys.stdout = savestdout
391 sys.stdout = savestdout
388 output = sio.getvalue().strip()
392 output = sio.getvalue().strip()
389 self.assertTrue(output.startswith('%autopx enabled'))
393 self.assertTrue(output.startswith('%autopx enabled'))
390 self.assertTrue(output.endswith('%autopx disabled'))
394 self.assertTrue(output.endswith('%autopx disabled'))
391 self.assertFalse('ZeroDivisionError' in output)
395 self.assertFalse('ZeroDivisionError' in output)
392 ar = v.get_result(-2)
396 ar = v.get_result(-2)
393 self.assertEquals(v['a'], 5)
397 self.assertEquals(v['a'], 5)
394 self.assertEquals(v['b'], 20)
398 self.assertEquals(v['b'], 20)
395 self.assertRaisesRemote(ZeroDivisionError, ar.get)
399 self.assertRaisesRemote(ZeroDivisionError, ar.get)
396
400
397 def test_magic_result(self):
401 def test_magic_result(self):
398 ip = get_ipython()
402 ip = get_ipython()
399 v = self.client[-1]
403 v = self.client[-1]
400 v.activate()
404 v.activate()
401 v['a'] = 111
405 v['a'] = 111
402 ra = v['a']
406 ra = v['a']
403
407
404 ar = ip.magic_result()
408 ar = ip.magic_result()
405 self.assertEquals(ar.msg_ids, [v.history[-1]])
409 self.assertEquals(ar.msg_ids, [v.history[-1]])
406 self.assertEquals(ar.get(), 111)
410 self.assertEquals(ar.get(), 111)
407 ar = ip.magic_result('-2')
411 ar = ip.magic_result('-2')
408 self.assertEquals(ar.msg_ids, [v.history[-2]])
412 self.assertEquals(ar.msg_ids, [v.history[-2]])
409
413
410 def test_unicode_execute(self):
414 def test_unicode_execute(self):
411 """test executing unicode strings"""
415 """test executing unicode strings"""
412 v = self.client[-1]
416 v = self.client[-1]
413 v.block=True
417 v.block=True
414 if sys.version_info[0] >= 3:
418 if sys.version_info[0] >= 3:
415 code="a='é'"
419 code="a='é'"
416 else:
420 else:
417 code=u"a=u'é'"
421 code=u"a=u'é'"
418 v.execute(code)
422 v.execute(code)
419 self.assertEquals(v['a'], u'é')
423 self.assertEquals(v['a'], u'é')
420
424
421 def test_unicode_apply_result(self):
425 def test_unicode_apply_result(self):
422 """test unicode apply results"""
426 """test unicode apply results"""
423 v = self.client[-1]
427 v = self.client[-1]
424 r = v.apply_sync(lambda : u'é')
428 r = v.apply_sync(lambda : u'é')
425 self.assertEquals(r, u'é')
429 self.assertEquals(r, u'é')
426
430
427 def test_unicode_apply_arg(self):
431 def test_unicode_apply_arg(self):
428 """test passing unicode arguments to apply"""
432 """test passing unicode arguments to apply"""
429 v = self.client[-1]
433 v = self.client[-1]
430
434
431 @interactive
435 @interactive
432 def check_unicode(a, check):
436 def check_unicode(a, check):
433 assert isinstance(a, unicode), "%r is not unicode"%a
437 assert isinstance(a, unicode), "%r is not unicode"%a
434 assert isinstance(check, bytes), "%r is not bytes"%check
438 assert isinstance(check, bytes), "%r is not bytes"%check
435 assert a.encode('utf8') == check, "%s != %s"%(a,check)
439 assert a.encode('utf8') == check, "%s != %s"%(a,check)
436
440
437 for s in [ u'é', u'ßø®∫',u'asdf' ]:
441 for s in [ u'é', u'ßø®∫',u'asdf' ]:
438 try:
442 try:
439 v.apply_sync(check_unicode, s, s.encode('utf8'))
443 v.apply_sync(check_unicode, s, s.encode('utf8'))
440 except error.RemoteError as e:
444 except error.RemoteError as e:
441 if e.ename == 'AssertionError':
445 if e.ename == 'AssertionError':
442 self.fail(e.evalue)
446 self.fail(e.evalue)
443 else:
447 else:
444 raise e
448 raise e
445
449
446
450
447
451
General Comments 0
You need to be logged in to leave comments. Login now