##// END OF EJS Templates
relax some timing constraints in parallel tests
MinRK -
Show More
@@ -1,122 +1,122 b''
1 1 """toplevel setup/teardown for parallel tests."""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2011 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import tempfile
16 16 import time
17 17 from subprocess import Popen
18 18
19 19 from IPython.utils.path import get_ipython_dir
20 20 from IPython.parallel import Client
21 21 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
22 22 ipengine_cmd_argv,
23 23 ipcontroller_cmd_argv,
24 24 SIGKILL,
25 25 ProcessStateError,
26 26 )
27 27
28 28 # globals
29 29 launchers = []
30 30 blackhole = open(os.devnull, 'w')
31 31
32 32 # Launcher class
33 33 class TestProcessLauncher(LocalProcessLauncher):
34 34 """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows"""
35 35 def start(self):
36 36 if self.state == 'before':
37 37 self.process = Popen(self.args,
38 38 stdout=blackhole, stderr=blackhole,
39 39 env=os.environ,
40 40 cwd=self.work_dir
41 41 )
42 42 self.notify_start(self.process.pid)
43 43 self.poll = self.process.poll
44 44 else:
45 45 s = 'The process was already started and has state: %r' % self.state
46 46 raise ProcessStateError(s)
47 47
48 48 # nose setup/teardown
49 49
50 50 def setup():
51 51 cluster_dir = os.path.join(get_ipython_dir(), 'profile_iptest')
52 52 engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
53 53 client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
54 54 for json in (engine_json, client_json):
55 55 if os.path.exists(json):
56 56 os.remove(json)
57 57
58 58 cp = TestProcessLauncher()
59 59 cp.cmd_and_args = ipcontroller_cmd_argv + \
60 60 ['--profile=iptest', '--log-level=50', '--ping=250']
61 61 cp.start()
62 62 launchers.append(cp)
63 63 tic = time.time()
64 64 while not os.path.exists(engine_json) or not os.path.exists(client_json):
65 65 if cp.poll() is not None:
66 66 print cp.poll()
67 67 raise RuntimeError("The test controller failed to start.")
68 elif time.time()-tic > 10:
68 elif time.time()-tic > 15:
69 69 raise RuntimeError("Timeout waiting for the test controller to start.")
70 70 time.sleep(0.1)
71 71 add_engines(1)
72 72
73 73 def add_engines(n=1, profile='iptest', total=False):
74 74 """add a number of engines to a given profile.
75 75
76 76 If total is True, then already running engines are counted, and only
77 77 the additional engines necessary (if any) are started.
78 78 """
79 79 rc = Client(profile=profile)
80 80 base = len(rc)
81 81
82 82 if total:
83 83 n = max(n - base, 0)
84 84
85 85 eps = []
86 86 for i in range(n):
87 87 ep = TestProcessLauncher()
88 88 ep.cmd_and_args = ipengine_cmd_argv + ['--profile=%s'%profile, '--log-level=50']
89 89 ep.start()
90 90 launchers.append(ep)
91 91 eps.append(ep)
92 92 tic = time.time()
93 93 while len(rc) < base+n:
94 94 if any([ ep.poll() is not None for ep in eps ]):
95 95 raise RuntimeError("A test engine failed to start.")
96 elif time.time()-tic > 10:
96 elif time.time()-tic > 15:
97 97 raise RuntimeError("Timeout waiting for engines to connect.")
98 98 time.sleep(.1)
99 99 rc.spin()
100 100 rc.close()
101 101 return eps
102 102
103 103 def teardown():
104 104 time.sleep(1)
105 105 while launchers:
106 106 p = launchers.pop()
107 107 if p.poll() is None:
108 108 try:
109 109 p.stop()
110 110 except Exception, e:
111 111 print e
112 112 pass
113 113 if p.poll() is None:
114 114 time.sleep(.25)
115 115 if p.poll() is None:
116 116 try:
117 117 print 'cleaning up test process...'
118 118 p.signal(SIGKILL)
119 119 except:
120 120 print "couldn't shutdown process: ", p
121 121 blackhole.close()
122 122
@@ -1,205 +1,205 b''
1 1 """Tests for asyncresult.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import time
20 20
21 21 from IPython.parallel.error import TimeoutError
22 22
23 23 from IPython.parallel import error, Client
24 24 from IPython.parallel.tests import add_engines
25 25 from .clienttest import ClusterTestCase
26 26
27 27 def setup():
28 28 add_engines(2, total=True)
29 29
30 30 def wait(n):
31 31 import time
32 32 time.sleep(n)
33 33 return n
34 34
35 35 class AsyncResultTest(ClusterTestCase):
36 36
37 37 def test_single_result_view(self):
38 38 """various one-target views get the right value for single_result"""
39 39 eid = self.client.ids[-1]
40 40 ar = self.client[eid].apply_async(lambda : 42)
41 41 self.assertEquals(ar.get(), 42)
42 42 ar = self.client[[eid]].apply_async(lambda : 42)
43 43 self.assertEquals(ar.get(), [42])
44 44 ar = self.client[-1:].apply_async(lambda : 42)
45 45 self.assertEquals(ar.get(), [42])
46 46
47 47 def test_get_after_done(self):
48 48 ar = self.client[-1].apply_async(lambda : 42)
49 49 ar.wait()
50 50 self.assertTrue(ar.ready())
51 51 self.assertEquals(ar.get(), 42)
52 52 self.assertEquals(ar.get(), 42)
53 53
54 54 def test_get_before_done(self):
55 55 ar = self.client[-1].apply_async(wait, 0.1)
56 56 self.assertRaises(TimeoutError, ar.get, 0)
57 57 ar.wait(0)
58 58 self.assertFalse(ar.ready())
59 59 self.assertEquals(ar.get(), 0.1)
60 60
61 61 def test_get_after_error(self):
62 62 ar = self.client[-1].apply_async(lambda : 1/0)
63 63 ar.wait(10)
64 64 self.assertRaisesRemote(ZeroDivisionError, ar.get)
65 65 self.assertRaisesRemote(ZeroDivisionError, ar.get)
66 66 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
67 67
68 68 def test_get_dict(self):
69 69 n = len(self.client)
70 70 ar = self.client[:].apply_async(lambda : 5)
71 71 self.assertEquals(ar.get(), [5]*n)
72 72 d = ar.get_dict()
73 73 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
74 74 for eid,r in d.iteritems():
75 75 self.assertEquals(r, 5)
76 76
77 77 def test_list_amr(self):
78 78 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
79 79 rlist = list(ar)
80 80
81 81 def test_getattr(self):
82 82 ar = self.client[:].apply_async(wait, 0.5)
83 83 self.assertRaises(AttributeError, lambda : ar._foo)
84 84 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
85 85 self.assertRaises(AttributeError, lambda : ar.foo)
86 86 self.assertRaises(AttributeError, lambda : ar.engine_id)
87 87 self.assertFalse(hasattr(ar, '__length_hint__'))
88 88 self.assertFalse(hasattr(ar, 'foo'))
89 89 self.assertFalse(hasattr(ar, 'engine_id'))
90 90 ar.get(5)
91 91 self.assertRaises(AttributeError, lambda : ar._foo)
92 92 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
93 93 self.assertRaises(AttributeError, lambda : ar.foo)
94 94 self.assertTrue(isinstance(ar.engine_id, list))
95 95 self.assertEquals(ar.engine_id, ar['engine_id'])
96 96 self.assertFalse(hasattr(ar, '__length_hint__'))
97 97 self.assertFalse(hasattr(ar, 'foo'))
98 98 self.assertTrue(hasattr(ar, 'engine_id'))
99 99
100 100 def test_getitem(self):
101 101 ar = self.client[:].apply_async(wait, 0.5)
102 102 self.assertRaises(TimeoutError, lambda : ar['foo'])
103 103 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
104 104 ar.get(5)
105 105 self.assertRaises(KeyError, lambda : ar['foo'])
106 106 self.assertTrue(isinstance(ar['engine_id'], list))
107 107 self.assertEquals(ar.engine_id, ar['engine_id'])
108 108
109 109 def test_single_result(self):
110 110 ar = self.client[-1].apply_async(wait, 0.5)
111 111 self.assertRaises(TimeoutError, lambda : ar['foo'])
112 112 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
113 113 self.assertTrue(ar.get(5) == 0.5)
114 114 self.assertTrue(isinstance(ar['engine_id'], int))
115 115 self.assertTrue(isinstance(ar.engine_id, int))
116 116 self.assertEquals(ar.engine_id, ar['engine_id'])
117 117
118 118 def test_abort(self):
119 119 e = self.client[-1]
120 120 ar = e.execute('import time; time.sleep(1)', block=False)
121 121 ar2 = e.apply_async(lambda : 2)
122 122 ar2.abort()
123 123 self.assertRaises(error.TaskAborted, ar2.get)
124 124 ar.get()
125 125
126 126 def test_len(self):
127 127 v = self.client.load_balanced_view()
128 128 ar = v.map_async(lambda x: x, range(10))
129 129 self.assertEquals(len(ar), 10)
130 130 ar = v.apply_async(lambda x: x, range(10))
131 131 self.assertEquals(len(ar), 1)
132 132 ar = self.client[:].apply_async(lambda x: x, range(10))
133 133 self.assertEquals(len(ar), len(self.client.ids))
134 134
135 135 def test_wall_time_single(self):
136 136 v = self.client.load_balanced_view()
137 137 ar = v.apply_async(time.sleep, 0.25)
138 138 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
139 139 ar.get(2)
140 140 self.assertTrue(ar.wall_time < 1.)
141 141 self.assertTrue(ar.wall_time > 0.2)
142 142
143 143 def test_wall_time_multi(self):
144 144 self.minimum_engines(4)
145 145 v = self.client[:]
146 146 ar = v.apply_async(time.sleep, 0.25)
147 147 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
148 148 ar.get(2)
149 149 self.assertTrue(ar.wall_time < 1.)
150 150 self.assertTrue(ar.wall_time > 0.2)
151 151
152 152 def test_serial_time_single(self):
153 153 v = self.client.load_balanced_view()
154 154 ar = v.apply_async(time.sleep, 0.25)
155 155 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
156 156 ar.get(2)
157 self.assertTrue(ar.serial_time < 0.5)
157 self.assertTrue(ar.serial_time < 1.)
158 158 self.assertTrue(ar.serial_time > 0.2)
159 159
160 160 def test_serial_time_multi(self):
161 161 self.minimum_engines(4)
162 162 v = self.client[:]
163 163 ar = v.apply_async(time.sleep, 0.25)
164 164 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
165 165 ar.get(2)
166 166 self.assertTrue(ar.serial_time < 2.)
167 167 self.assertTrue(ar.serial_time > 0.8)
168 168
169 169 def test_elapsed_single(self):
170 170 v = self.client.load_balanced_view()
171 171 ar = v.apply_async(time.sleep, 0.25)
172 172 while not ar.ready():
173 173 time.sleep(0.01)
174 self.assertTrue(ar.elapsed < 0.3)
175 self.assertTrue(ar.elapsed < 0.3)
174 self.assertTrue(ar.elapsed < 1)
175 self.assertTrue(ar.elapsed < 1)
176 176 ar.get(2)
177 177
178 178 def test_elapsed_multi(self):
179 179 v = self.client[:]
180 180 ar = v.apply_async(time.sleep, 0.25)
181 181 while not ar.ready():
182 182 time.sleep(0.01)
183 self.assertTrue(ar.elapsed < 0.3)
184 self.assertTrue(ar.elapsed < 0.3)
183 self.assertTrue(ar.elapsed < 1)
184 self.assertTrue(ar.elapsed < 1)
185 185 ar.get(2)
186 186
187 187 def test_hubresult_timestamps(self):
188 188 self.minimum_engines(4)
189 189 v = self.client[:]
190 190 ar = v.apply_async(time.sleep, 0.25)
191 191 ar.get(2)
192 192 rc2 = Client(profile='iptest')
193 193 # must have try/finally to close second Client, otherwise
194 194 # will have dangling sockets causing problems
195 195 try:
196 196 time.sleep(0.25)
197 197 hr = rc2.get_result(ar.msg_ids)
198 198 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
199 199 hr.get(1)
200 200 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
201 201 self.assertEquals(hr.serial_time, ar.serial_time)
202 202 finally:
203 203 rc2.close()
204 204
205 205
@@ -1,676 +1,689 b''
1 1 # -*- coding: utf-8 -*-
2 2 """test View objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import time
21 21 from tempfile import mktemp
22 22 from StringIO import StringIO
23 23
24 24 import zmq
25 25 from nose import SkipTest
26 26
27 27 from IPython.testing import decorators as dec
28 28 from IPython.testing.ipunittest import ParametricTestCase
29 29
30 30 from IPython import parallel as pmod
31 31 from IPython.parallel import error
32 32 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
33 33 from IPython.parallel import DirectView
34 34 from IPython.parallel.util import interactive
35 35
36 36 from IPython.parallel.tests import add_engines
37 37
38 38 from .clienttest import ClusterTestCase, crash, wait, skip_without
39 39
40 40 def setup():
41 41 add_engines(3, total=True)
42 42
43 43 class TestView(ClusterTestCase, ParametricTestCase):
44 44
45 45 def test_z_crash_mux(self):
46 46 """test graceful handling of engine death (direct)"""
47 47 raise SkipTest("crash tests disabled, due to undesirable crash reports")
48 48 # self.add_engines(1)
49 49 eid = self.client.ids[-1]
50 50 ar = self.client[eid].apply_async(crash)
51 51 self.assertRaisesRemote(error.EngineError, ar.get, 10)
52 52 eid = ar.engine_id
53 53 tic = time.time()
54 54 while eid in self.client.ids and time.time()-tic < 5:
55 55 time.sleep(.01)
56 56 self.client.spin()
57 57 self.assertFalse(eid in self.client.ids, "Engine should have died")
58 58
59 59 def test_push_pull(self):
60 60 """test pushing and pulling"""
61 61 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
62 62 t = self.client.ids[-1]
63 63 v = self.client[t]
64 64 push = v.push
65 65 pull = v.pull
66 66 v.block=True
67 67 nengines = len(self.client)
68 68 push({'data':data})
69 69 d = pull('data')
70 70 self.assertEquals(d, data)
71 71 self.client[:].push({'data':data})
72 72 d = self.client[:].pull('data', block=True)
73 73 self.assertEquals(d, nengines*[data])
74 74 ar = push({'data':data}, block=False)
75 75 self.assertTrue(isinstance(ar, AsyncResult))
76 76 r = ar.get()
77 77 ar = self.client[:].pull('data', block=False)
78 78 self.assertTrue(isinstance(ar, AsyncResult))
79 79 r = ar.get()
80 80 self.assertEquals(r, nengines*[data])
81 81 self.client[:].push(dict(a=10,b=20))
82 82 r = self.client[:].pull(('a','b'), block=True)
83 83 self.assertEquals(r, nengines*[[10,20]])
84 84
85 85 def test_push_pull_function(self):
86 86 "test pushing and pulling functions"
87 87 def testf(x):
88 88 return 2.0*x
89 89
90 90 t = self.client.ids[-1]
91 91 v = self.client[t]
92 92 v.block=True
93 93 push = v.push
94 94 pull = v.pull
95 95 execute = v.execute
96 96 push({'testf':testf})
97 97 r = pull('testf')
98 98 self.assertEqual(r(1.0), testf(1.0))
99 99 execute('r = testf(10)')
100 100 r = pull('r')
101 101 self.assertEquals(r, testf(10))
102 102 ar = self.client[:].push({'testf':testf}, block=False)
103 103 ar.get()
104 104 ar = self.client[:].pull('testf', block=False)
105 105 rlist = ar.get()
106 106 for r in rlist:
107 107 self.assertEqual(r(1.0), testf(1.0))
108 108 execute("def g(x): return x*x")
109 109 r = pull(('testf','g'))
110 110 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
111 111
112 112 def test_push_function_globals(self):
113 113 """test that pushed functions have access to globals"""
114 114 @interactive
115 115 def geta():
116 116 return a
117 117 # self.add_engines(1)
118 118 v = self.client[-1]
119 119 v.block=True
120 120 v['f'] = geta
121 121 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
122 122 v.execute('a=5')
123 123 v.execute('b=f()')
124 124 self.assertEquals(v['b'], 5)
125 125
126 126 def test_push_function_defaults(self):
127 127 """test that pushed functions preserve default args"""
128 128 def echo(a=10):
129 129 return a
130 130 v = self.client[-1]
131 131 v.block=True
132 132 v['f'] = echo
133 133 v.execute('b=f()')
134 134 self.assertEquals(v['b'], 10)
135 135
136 136 def test_get_result(self):
137 137 """test getting results from the Hub."""
138 138 c = pmod.Client(profile='iptest')
139 139 # self.add_engines(1)
140 140 t = c.ids[-1]
141 141 v = c[t]
142 142 v2 = self.client[t]
143 143 ar = v.apply_async(wait, 1)
144 144 # give the monitor time to notice the message
145 145 time.sleep(.25)
146 146 ahr = v2.get_result(ar.msg_ids)
147 147 self.assertTrue(isinstance(ahr, AsyncHubResult))
148 148 self.assertEquals(ahr.get(), ar.get())
149 149 ar2 = v2.get_result(ar.msg_ids)
150 150 self.assertFalse(isinstance(ar2, AsyncHubResult))
151 151 c.spin()
152 152 c.close()
153 153
154 154 def test_run_newline(self):
155 155 """test that run appends newline to files"""
156 156 tmpfile = mktemp()
157 157 with open(tmpfile, 'w') as f:
158 158 f.write("""def g():
159 159 return 5
160 160 """)
161 161 v = self.client[-1]
162 162 v.run(tmpfile, block=True)
163 163 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
164 164
165 165 def test_apply_tracked(self):
166 166 """test tracking for apply"""
167 167 # self.add_engines(1)
168 168 t = self.client.ids[-1]
169 169 v = self.client[t]
170 170 v.block=False
171 171 def echo(n=1024*1024, **kwargs):
172 172 with v.temp_flags(**kwargs):
173 173 return v.apply(lambda x: x, 'x'*n)
174 174 ar = echo(1, track=False)
175 175 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
176 176 self.assertTrue(ar.sent)
177 177 ar = echo(track=True)
178 178 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
179 179 self.assertEquals(ar.sent, ar._tracker.done)
180 180 ar._tracker.wait()
181 181 self.assertTrue(ar.sent)
182 182
183 183 def test_push_tracked(self):
184 184 t = self.client.ids[-1]
185 185 ns = dict(x='x'*1024*1024)
186 186 v = self.client[t]
187 187 ar = v.push(ns, block=False, track=False)
188 188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 189 self.assertTrue(ar.sent)
190 190
191 191 ar = v.push(ns, block=False, track=True)
192 192 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
193 193 ar._tracker.wait()
194 194 self.assertEquals(ar.sent, ar._tracker.done)
195 195 self.assertTrue(ar.sent)
196 196 ar.get()
197 197
198 198 def test_scatter_tracked(self):
199 199 t = self.client.ids
200 200 x='x'*1024*1024
201 201 ar = self.client[t].scatter('x', x, block=False, track=False)
202 202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 203 self.assertTrue(ar.sent)
204 204
205 205 ar = self.client[t].scatter('x', x, block=False, track=True)
206 206 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
207 207 self.assertEquals(ar.sent, ar._tracker.done)
208 208 ar._tracker.wait()
209 209 self.assertTrue(ar.sent)
210 210 ar.get()
211 211
212 212 def test_remote_reference(self):
213 213 v = self.client[-1]
214 214 v['a'] = 123
215 215 ra = pmod.Reference('a')
216 216 b = v.apply_sync(lambda x: x, ra)
217 217 self.assertEquals(b, 123)
218 218
219 219
220 220 def test_scatter_gather(self):
221 221 view = self.client[:]
222 222 seq1 = range(16)
223 223 view.scatter('a', seq1)
224 224 seq2 = view.gather('a', block=True)
225 225 self.assertEquals(seq2, seq1)
226 226 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
227 227
228 228 @skip_without('numpy')
229 229 def test_scatter_gather_numpy(self):
230 230 import numpy
231 231 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
232 232 view = self.client[:]
233 233 a = numpy.arange(64)
234 234 view.scatter('a', a)
235 235 b = view.gather('a', block=True)
236 236 assert_array_equal(b, a)
237 237
238 238 def test_scatter_gather_lazy(self):
239 239 """scatter/gather with targets='all'"""
240 240 view = self.client.direct_view(targets='all')
241 241 x = range(64)
242 242 view.scatter('x', x)
243 243 gathered = view.gather('x', block=True)
244 244 self.assertEquals(gathered, x)
245 245
246 246
247 247 @dec.known_failure_py3
248 248 @skip_without('numpy')
249 249 def test_push_numpy_nocopy(self):
250 250 import numpy
251 251 view = self.client[:]
252 252 a = numpy.arange(64)
253 253 view['A'] = a
254 254 @interactive
255 255 def check_writeable(x):
256 256 return x.flags.writeable
257 257
258 258 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
259 259 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
260 260
261 261 view.push(dict(B=a))
262 262 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
263 263 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
264 264
265 265 @skip_without('numpy')
266 266 def test_apply_numpy(self):
267 267 """view.apply(f, ndarray)"""
268 268 import numpy
269 269 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
270 270
271 271 A = numpy.random.random((100,100))
272 272 view = self.client[-1]
273 273 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
274 274 B = A.astype(dt)
275 275 C = view.apply_sync(lambda x:x, B)
276 276 assert_array_equal(B,C)
277 277
278 278 def test_map(self):
279 279 view = self.client[:]
280 280 def f(x):
281 281 return x**2
282 282 data = range(16)
283 283 r = view.map_sync(f, data)
284 284 self.assertEquals(r, map(f, data))
285 285
286 286 def test_map_iterable(self):
287 287 """test map on iterables (direct)"""
288 288 view = self.client[:]
289 289 # 101 is prime, so it won't be evenly distributed
290 290 arr = range(101)
291 291 # ensure it will be an iterator, even in Python 3
292 292 it = iter(arr)
293 293 r = view.map_sync(lambda x:x, arr)
294 294 self.assertEquals(r, list(arr))
295 295
296 296 def test_scatterGatherNonblocking(self):
297 297 data = range(16)
298 298 view = self.client[:]
299 299 view.scatter('a', data, block=False)
300 300 ar = view.gather('a', block=False)
301 301 self.assertEquals(ar.get(), data)
302 302
303 303 @skip_without('numpy')
304 304 def test_scatter_gather_numpy_nonblocking(self):
305 305 import numpy
306 306 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
307 307 a = numpy.arange(64)
308 308 view = self.client[:]
309 309 ar = view.scatter('a', a, block=False)
310 310 self.assertTrue(isinstance(ar, AsyncResult))
311 311 amr = view.gather('a', block=False)
312 312 self.assertTrue(isinstance(amr, AsyncMapResult))
313 313 assert_array_equal(amr.get(), a)
314 314
315 315 def test_execute(self):
316 316 view = self.client[:]
317 317 # self.client.debug=True
318 318 execute = view.execute
319 319 ar = execute('c=30', block=False)
320 320 self.assertTrue(isinstance(ar, AsyncResult))
321 321 ar = execute('d=[0,1,2]', block=False)
322 322 self.client.wait(ar, 1)
323 323 self.assertEquals(len(ar.get()), len(self.client))
324 324 for c in view['c']:
325 325 self.assertEquals(c, 30)
326 326
327 327 def test_abort(self):
328 328 view = self.client[-1]
329 329 ar = view.execute('import time; time.sleep(1)', block=False)
330 330 ar2 = view.apply_async(lambda : 2)
331 331 ar3 = view.apply_async(lambda : 3)
332 332 view.abort(ar2)
333 333 view.abort(ar3.msg_ids)
334 334 self.assertRaises(error.TaskAborted, ar2.get)
335 335 self.assertRaises(error.TaskAborted, ar3.get)
336 336
337 337 def test_abort_all(self):
338 338 """view.abort() aborts all outstanding tasks"""
339 339 view = self.client[-1]
340 340 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
341 341 view.abort()
342 342 view.wait(timeout=5)
343 343 for ar in ars[5:]:
344 344 self.assertRaises(error.TaskAborted, ar.get)
345 345
346 346 def test_temp_flags(self):
347 347 view = self.client[-1]
348 348 view.block=True
349 349 with view.temp_flags(block=False):
350 350 self.assertFalse(view.block)
351 351 self.assertTrue(view.block)
352 352
353 353 @dec.known_failure_py3
354 354 def test_importer(self):
355 355 view = self.client[-1]
356 356 view.clear(block=True)
357 357 with view.importer:
358 358 import re
359 359
360 360 @interactive
361 361 def findall(pat, s):
362 362 # this globals() step isn't necessary in real code
363 363 # only to prevent a closure in the test
364 364 re = globals()['re']
365 365 return re.findall(pat, s)
366 366
367 367 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
368 368
369 369 # parallel magic tests
370 370
371 371 def test_magic_px_blocking(self):
372 372 ip = get_ipython()
373 373 v = self.client[-1]
374 374 v.activate()
375 375 v.block=True
376 376
377 377 ip.magic_px('a=5')
378 378 self.assertEquals(v['a'], 5)
379 379 ip.magic_px('a=10')
380 380 self.assertEquals(v['a'], 10)
381 381 sio = StringIO()
382 382 savestdout = sys.stdout
383 383 sys.stdout = sio
384 384 # just 'print a' worst ~99% of the time, but this ensures that
385 385 # the stdout message has arrived when the result is finished:
386 386 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
387 387 sys.stdout = savestdout
388 388 buf = sio.getvalue()
389 389 self.assertTrue('[stdout:' in buf, buf)
390 390 self.assertTrue(buf.rstrip().endswith('10'))
391 391 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
392 392
393 393 def test_magic_px_nonblocking(self):
394 394 ip = get_ipython()
395 395 v = self.client[-1]
396 396 v.activate()
397 397 v.block=False
398 398
399 399 ip.magic_px('a=5')
400 400 self.assertEquals(v['a'], 5)
401 401 ip.magic_px('a=10')
402 402 self.assertEquals(v['a'], 10)
403 403 sio = StringIO()
404 404 savestdout = sys.stdout
405 405 sys.stdout = sio
406 406 ip.magic_px('print a')
407 407 sys.stdout = savestdout
408 408 buf = sio.getvalue()
409 409 self.assertFalse('[stdout:%i]'%v.targets in buf)
410 410 ip.magic_px('1/0')
411 411 ar = v.get_result(-1)
412 412 self.assertRaisesRemote(ZeroDivisionError, ar.get)
413 413
414 414 def test_magic_autopx_blocking(self):
415 415 ip = get_ipython()
416 416 v = self.client[-1]
417 417 v.activate()
418 418 v.block=True
419 419
420 420 sio = StringIO()
421 421 savestdout = sys.stdout
422 422 sys.stdout = sio
423 423 ip.magic_autopx()
424 424 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
425 425 ip.run_cell('b*=2')
426 426 ip.run_cell('print (b)')
427 427 ip.run_cell("b/c")
428 428 ip.magic_autopx()
429 429 sys.stdout = savestdout
430 430 output = sio.getvalue().strip()
431 431 self.assertTrue(output.startswith('%autopx enabled'))
432 432 self.assertTrue(output.endswith('%autopx disabled'))
433 433 self.assertTrue('RemoteError: ZeroDivisionError' in output)
434 434 ar = v.get_result(-1)
435 435 self.assertEquals(v['a'], 5)
436 436 self.assertEquals(v['b'], 20)
437 437 self.assertRaisesRemote(ZeroDivisionError, ar.get)
438 438
439 439 def test_magic_autopx_nonblocking(self):
440 440 ip = get_ipython()
441 441 v = self.client[-1]
442 442 v.activate()
443 443 v.block=False
444 444
445 445 sio = StringIO()
446 446 savestdout = sys.stdout
447 447 sys.stdout = sio
448 448 ip.magic_autopx()
449 449 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
450 450 ip.run_cell('print (b)')
451 451 ip.run_cell('import time; time.sleep(0.1)')
452 452 ip.run_cell("b/c")
453 453 ip.run_cell('b*=2')
454 454 ip.magic_autopx()
455 455 sys.stdout = savestdout
456 456 output = sio.getvalue().strip()
457 457 self.assertTrue(output.startswith('%autopx enabled'))
458 458 self.assertTrue(output.endswith('%autopx disabled'))
459 459 self.assertFalse('ZeroDivisionError' in output)
460 460 ar = v.get_result(-2)
461 461 self.assertRaisesRemote(ZeroDivisionError, ar.get)
462 462 # prevent TaskAborted on pulls, due to ZeroDivisionError
463 463 time.sleep(0.5)
464 464 self.assertEquals(v['a'], 5)
465 465 # b*=2 will not fire, due to abort
466 466 self.assertEquals(v['b'], 10)
467 467
468 468 def test_magic_result(self):
469 469 ip = get_ipython()
470 470 v = self.client[-1]
471 471 v.activate()
472 472 v['a'] = 111
473 473 ra = v['a']
474 474
475 475 ar = ip.magic_result()
476 476 self.assertEquals(ar.msg_ids, [v.history[-1]])
477 477 self.assertEquals(ar.get(), 111)
478 478 ar = ip.magic_result('-2')
479 479 self.assertEquals(ar.msg_ids, [v.history[-2]])
480 480
481 481 def test_unicode_execute(self):
482 482 """test executing unicode strings"""
483 483 v = self.client[-1]
484 484 v.block=True
485 485 if sys.version_info[0] >= 3:
486 486 code="a='é'"
487 487 else:
488 488 code=u"a=u'é'"
489 489 v.execute(code)
490 490 self.assertEquals(v['a'], u'é')
491 491
492 492 def test_unicode_apply_result(self):
493 493 """test unicode apply results"""
494 494 v = self.client[-1]
495 495 r = v.apply_sync(lambda : u'é')
496 496 self.assertEquals(r, u'é')
497 497
498 498 def test_unicode_apply_arg(self):
499 499 """test passing unicode arguments to apply"""
500 500 v = self.client[-1]
501 501
502 502 @interactive
503 503 def check_unicode(a, check):
504 504 assert isinstance(a, unicode), "%r is not unicode"%a
505 505 assert isinstance(check, bytes), "%r is not bytes"%check
506 506 assert a.encode('utf8') == check, "%s != %s"%(a,check)
507 507
508 508 for s in [ u'é', u'ßø®∫',u'asdf' ]:
509 509 try:
510 510 v.apply_sync(check_unicode, s, s.encode('utf8'))
511 511 except error.RemoteError as e:
512 512 if e.ename == 'AssertionError':
513 513 self.fail(e.evalue)
514 514 else:
515 515 raise e
516 516
517 517 def test_map_reference(self):
518 518 """view.map(<Reference>, *seqs) should work"""
519 519 v = self.client[:]
520 520 v.scatter('n', self.client.ids, flatten=True)
521 521 v.execute("f = lambda x,y: x*y")
522 522 rf = pmod.Reference('f')
523 523 nlist = list(range(10))
524 524 mlist = nlist[::-1]
525 525 expected = [ m*n for m,n in zip(mlist, nlist) ]
526 526 result = v.map_sync(rf, mlist, nlist)
527 527 self.assertEquals(result, expected)
528 528
529 529 def test_apply_reference(self):
530 530 """view.apply(<Reference>, *args) should work"""
531 531 v = self.client[:]
532 532 v.scatter('n', self.client.ids, flatten=True)
533 533 v.execute("f = lambda x: n*x")
534 534 rf = pmod.Reference('f')
535 535 result = v.apply_sync(rf, 5)
536 536 expected = [ 5*id for id in self.client.ids ]
537 537 self.assertEquals(result, expected)
538 538
539 539 def test_eval_reference(self):
540 540 v = self.client[self.client.ids[0]]
541 541 v['g'] = range(5)
542 542 rg = pmod.Reference('g[0]')
543 543 echo = lambda x:x
544 544 self.assertEquals(v.apply_sync(echo, rg), 0)
545 545
546 546 def test_reference_nameerror(self):
547 547 v = self.client[self.client.ids[0]]
548 548 r = pmod.Reference('elvis_has_left')
549 549 echo = lambda x:x
550 550 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
551 551
552 552 def test_single_engine_map(self):
553 553 e0 = self.client[self.client.ids[0]]
554 554 r = range(5)
555 555 check = [ -1*i for i in r ]
556 556 result = e0.map_sync(lambda x: -1*x, r)
557 557 self.assertEquals(result, check)
558 558
559 559 def test_len(self):
560 560 """len(view) makes sense"""
561 561 e0 = self.client[self.client.ids[0]]
562 562 yield self.assertEquals(len(e0), 1)
563 563 v = self.client[:]
564 564 yield self.assertEquals(len(v), len(self.client.ids))
565 565 v = self.client.direct_view('all')
566 566 yield self.assertEquals(len(v), len(self.client.ids))
567 567 v = self.client[:2]
568 568 yield self.assertEquals(len(v), 2)
569 569 v = self.client[:1]
570 570 yield self.assertEquals(len(v), 1)
571 571 v = self.client.load_balanced_view()
572 572 yield self.assertEquals(len(v), len(self.client.ids))
573 573 # parametric tests seem to require manual closing?
574 574 self.client.close()
575 575
576 576
577 577 # begin execute tests
578 def _wait_for(self, f, timeout=10):
579 tic = time.time()
580 while time.time() <= tic + timeout:
581 if f():
582 return
583 time.sleep(0.1)
584 self.client.spin()
585 if not f():
586 print "Warning: Awaited condition never arrived"
587
578 588
579 589 def test_execute_reply(self):
580 590 e0 = self.client[self.client.ids[0]]
581 591 e0.block = True
582 592 ar = e0.execute("5", silent=False)
583 593 er = ar.get()
584 time.sleep(0.2)
594 self._wait_for(lambda : bool(er.pyout))
585 595 self.assertEquals(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
586 596 self.assertEquals(er.pyout['text/plain'], '5')
587 597
588 598 def test_execute_reply_stdout(self):
589 599 e0 = self.client[self.client.ids[0]]
590 600 e0.block = True
591 601 ar = e0.execute("print (5)", silent=False)
592 602 er = ar.get()
593 time.sleep(0.2)
603 self._wait_for(lambda : bool(er.stdout))
594 604 self.assertEquals(er.stdout.strip(), '5')
595 605
596 606 def test_execute_pyout(self):
597 607 """execute triggers pyout with silent=False"""
598 608 view = self.client[:]
599 609 ar = view.execute("5", silent=False, block=True)
600 time.sleep(0.2)
610 self._wait_for(lambda : all(ar.pyout))
611
601 612 expected = [{'text/plain' : '5'}] * len(view)
602 613 self.assertEquals(ar.pyout, expected)
603 614
604 615 def test_execute_silent(self):
605 616 """execute does not trigger pyout with silent=True"""
606 617 view = self.client[:]
607 618 ar = view.execute("5", block=True)
608 619 expected = [None] * len(view)
609 620 self.assertEquals(ar.pyout, expected)
610 621
611 622 def test_execute_magic(self):
612 623 """execute accepts IPython commands"""
613 624 view = self.client[:]
614 625 view.execute("a = 5")
615 626 ar = view.execute("%whos", block=True)
616 627 # this will raise, if that failed
617 628 ar.get(5)
618 time.sleep(0.2)
629 self._wait_for(lambda : all(ar.stdout))
619 630 for stdout in ar.stdout:
620 631 lines = stdout.splitlines()
621 632 self.assertEquals(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
622 633 found = False
623 634 for line in lines[2:]:
624 635 split = line.split()
625 636 if split == ['a', 'int', '5']:
626 637 found = True
627 638 break
628 639 self.assertTrue(found, "whos output wrong: %s" % stdout)
629 640
630 641 def test_execute_displaypub(self):
631 642 """execute tracks display_pub output"""
632 643 view = self.client[:]
633 644 view.execute("from IPython.core.display import *")
634 645 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
635 time.sleep(0.2)
646
647 self._wait_for(lambda : all(len(er.outputs) >= 5 for er in ar))
636 648 outs = [ {u'text/plain' : unicode(i)} for i in range(5) ]
637 649 expected = [outs] * len(view)
638 650 self.assertEquals(ar.outputs, expected)
639 651
640 652 def test_apply_displaypub(self):
641 653 """apply tracks display_pub output"""
642 654 view = self.client[:]
643 655 view.execute("from IPython.core.display import *")
644 656
645 657 @interactive
646 658 def publish():
647 659 [ display(i) for i in range(5) ]
648 660
649 661 ar = view.apply_async(publish)
650 662 ar.get(5)
651 time.sleep(0.2)
663 self._wait_for(lambda : all(len(out) >= 5 for out in ar.outputs))
652 664 outs = [ {u'text/plain' : unicode(j)} for j in range(5) ]
653 665 expected = [outs] * len(view)
654 666 self.assertEquals(ar.outputs, expected)
655 667
656 668 def test_execute_raises(self):
657 669 """exceptions in execute requests raise appropriately"""
658 670 view = self.client[-1]
659 671 ar = view.execute("1/0")
660 672 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
661 673
662 674 @dec.skipif_not_matplotlib
663 675 def test_amagic_pylab(self):
664 676 """%pylab works on engines"""
665 677 view = self.client[-1]
666 678 ar = view.execute("%pylab inline")
667 679 # at least check if this raised:
668 680 reply = ar.get(5)
669 681 # include imports, in case user config
670 682 ar = view.execute("plot(rand(100))", silent=False)
671 683 reply = ar.get(5)
684 self._wait_for(lambda : all(ar.outputs))
672 685 self.assertEquals(len(reply.outputs), 1)
673 686 output = reply.outputs[0]
674 687 self.assertTrue("image/png" in output)
675 688
676 689
General Comments 0
You need to be logged in to leave comments. Login now