##// END OF EJS Templates
relax some timing constraints in parallel tests
MinRK -
Show More
@@ -1,122 +1,122 b''
1 """toplevel setup/teardown for parallel tests."""
1 """toplevel setup/teardown for parallel tests."""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
4 # Copyright (C) 2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import os
14 import os
15 import tempfile
15 import tempfile
16 import time
16 import time
17 from subprocess import Popen
17 from subprocess import Popen
18
18
19 from IPython.utils.path import get_ipython_dir
19 from IPython.utils.path import get_ipython_dir
20 from IPython.parallel import Client
20 from IPython.parallel import Client
21 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
21 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
22 ipengine_cmd_argv,
22 ipengine_cmd_argv,
23 ipcontroller_cmd_argv,
23 ipcontroller_cmd_argv,
24 SIGKILL,
24 SIGKILL,
25 ProcessStateError,
25 ProcessStateError,
26 )
26 )
27
27
28 # globals
28 # globals
29 launchers = []
29 launchers = []
30 blackhole = open(os.devnull, 'w')
30 blackhole = open(os.devnull, 'w')
31
31
32 # Launcher class
32 # Launcher class
33 class TestProcessLauncher(LocalProcessLauncher):
33 class TestProcessLauncher(LocalProcessLauncher):
34 """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows"""
34 """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows"""
35 def start(self):
35 def start(self):
36 if self.state == 'before':
36 if self.state == 'before':
37 self.process = Popen(self.args,
37 self.process = Popen(self.args,
38 stdout=blackhole, stderr=blackhole,
38 stdout=blackhole, stderr=blackhole,
39 env=os.environ,
39 env=os.environ,
40 cwd=self.work_dir
40 cwd=self.work_dir
41 )
41 )
42 self.notify_start(self.process.pid)
42 self.notify_start(self.process.pid)
43 self.poll = self.process.poll
43 self.poll = self.process.poll
44 else:
44 else:
45 s = 'The process was already started and has state: %r' % self.state
45 s = 'The process was already started and has state: %r' % self.state
46 raise ProcessStateError(s)
46 raise ProcessStateError(s)
47
47
48 # nose setup/teardown
48 # nose setup/teardown
49
49
50 def setup():
50 def setup():
51 cluster_dir = os.path.join(get_ipython_dir(), 'profile_iptest')
51 cluster_dir = os.path.join(get_ipython_dir(), 'profile_iptest')
52 engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
52 engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
53 client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
53 client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
54 for json in (engine_json, client_json):
54 for json in (engine_json, client_json):
55 if os.path.exists(json):
55 if os.path.exists(json):
56 os.remove(json)
56 os.remove(json)
57
57
58 cp = TestProcessLauncher()
58 cp = TestProcessLauncher()
59 cp.cmd_and_args = ipcontroller_cmd_argv + \
59 cp.cmd_and_args = ipcontroller_cmd_argv + \
60 ['--profile=iptest', '--log-level=50', '--ping=250']
60 ['--profile=iptest', '--log-level=50', '--ping=250']
61 cp.start()
61 cp.start()
62 launchers.append(cp)
62 launchers.append(cp)
63 tic = time.time()
63 tic = time.time()
64 while not os.path.exists(engine_json) or not os.path.exists(client_json):
64 while not os.path.exists(engine_json) or not os.path.exists(client_json):
65 if cp.poll() is not None:
65 if cp.poll() is not None:
66 print cp.poll()
66 print cp.poll()
67 raise RuntimeError("The test controller failed to start.")
67 raise RuntimeError("The test controller failed to start.")
68 elif time.time()-tic > 10:
68 elif time.time()-tic > 15:
69 raise RuntimeError("Timeout waiting for the test controller to start.")
69 raise RuntimeError("Timeout waiting for the test controller to start.")
70 time.sleep(0.1)
70 time.sleep(0.1)
71 add_engines(1)
71 add_engines(1)
72
72
73 def add_engines(n=1, profile='iptest', total=False):
73 def add_engines(n=1, profile='iptest', total=False):
74 """add a number of engines to a given profile.
74 """add a number of engines to a given profile.
75
75
76 If total is True, then already running engines are counted, and only
76 If total is True, then already running engines are counted, and only
77 the additional engines necessary (if any) are started.
77 the additional engines necessary (if any) are started.
78 """
78 """
79 rc = Client(profile=profile)
79 rc = Client(profile=profile)
80 base = len(rc)
80 base = len(rc)
81
81
82 if total:
82 if total:
83 n = max(n - base, 0)
83 n = max(n - base, 0)
84
84
85 eps = []
85 eps = []
86 for i in range(n):
86 for i in range(n):
87 ep = TestProcessLauncher()
87 ep = TestProcessLauncher()
88 ep.cmd_and_args = ipengine_cmd_argv + ['--profile=%s'%profile, '--log-level=50']
88 ep.cmd_and_args = ipengine_cmd_argv + ['--profile=%s'%profile, '--log-level=50']
89 ep.start()
89 ep.start()
90 launchers.append(ep)
90 launchers.append(ep)
91 eps.append(ep)
91 eps.append(ep)
92 tic = time.time()
92 tic = time.time()
93 while len(rc) < base+n:
93 while len(rc) < base+n:
94 if any([ ep.poll() is not None for ep in eps ]):
94 if any([ ep.poll() is not None for ep in eps ]):
95 raise RuntimeError("A test engine failed to start.")
95 raise RuntimeError("A test engine failed to start.")
96 elif time.time()-tic > 10:
96 elif time.time()-tic > 15:
97 raise RuntimeError("Timeout waiting for engines to connect.")
97 raise RuntimeError("Timeout waiting for engines to connect.")
98 time.sleep(.1)
98 time.sleep(.1)
99 rc.spin()
99 rc.spin()
100 rc.close()
100 rc.close()
101 return eps
101 return eps
102
102
103 def teardown():
103 def teardown():
104 time.sleep(1)
104 time.sleep(1)
105 while launchers:
105 while launchers:
106 p = launchers.pop()
106 p = launchers.pop()
107 if p.poll() is None:
107 if p.poll() is None:
108 try:
108 try:
109 p.stop()
109 p.stop()
110 except Exception, e:
110 except Exception, e:
111 print e
111 print e
112 pass
112 pass
113 if p.poll() is None:
113 if p.poll() is None:
114 time.sleep(.25)
114 time.sleep(.25)
115 if p.poll() is None:
115 if p.poll() is None:
116 try:
116 try:
117 print 'cleaning up test process...'
117 print 'cleaning up test process...'
118 p.signal(SIGKILL)
118 p.signal(SIGKILL)
119 except:
119 except:
120 print "couldn't shutdown process: ", p
120 print "couldn't shutdown process: ", p
121 blackhole.close()
121 blackhole.close()
122
122
@@ -1,205 +1,205 b''
1 """Tests for asyncresult.py
1 """Tests for asyncresult.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import time
19 import time
20
20
21 from IPython.parallel.error import TimeoutError
21 from IPython.parallel.error import TimeoutError
22
22
23 from IPython.parallel import error, Client
23 from IPython.parallel import error, Client
24 from IPython.parallel.tests import add_engines
24 from IPython.parallel.tests import add_engines
25 from .clienttest import ClusterTestCase
25 from .clienttest import ClusterTestCase
26
26
27 def setup():
27 def setup():
28 add_engines(2, total=True)
28 add_engines(2, total=True)
29
29
30 def wait(n):
30 def wait(n):
31 import time
31 import time
32 time.sleep(n)
32 time.sleep(n)
33 return n
33 return n
34
34
35 class AsyncResultTest(ClusterTestCase):
35 class AsyncResultTest(ClusterTestCase):
36
36
37 def test_single_result_view(self):
37 def test_single_result_view(self):
38 """various one-target views get the right value for single_result"""
38 """various one-target views get the right value for single_result"""
39 eid = self.client.ids[-1]
39 eid = self.client.ids[-1]
40 ar = self.client[eid].apply_async(lambda : 42)
40 ar = self.client[eid].apply_async(lambda : 42)
41 self.assertEquals(ar.get(), 42)
41 self.assertEquals(ar.get(), 42)
42 ar = self.client[[eid]].apply_async(lambda : 42)
42 ar = self.client[[eid]].apply_async(lambda : 42)
43 self.assertEquals(ar.get(), [42])
43 self.assertEquals(ar.get(), [42])
44 ar = self.client[-1:].apply_async(lambda : 42)
44 ar = self.client[-1:].apply_async(lambda : 42)
45 self.assertEquals(ar.get(), [42])
45 self.assertEquals(ar.get(), [42])
46
46
47 def test_get_after_done(self):
47 def test_get_after_done(self):
48 ar = self.client[-1].apply_async(lambda : 42)
48 ar = self.client[-1].apply_async(lambda : 42)
49 ar.wait()
49 ar.wait()
50 self.assertTrue(ar.ready())
50 self.assertTrue(ar.ready())
51 self.assertEquals(ar.get(), 42)
51 self.assertEquals(ar.get(), 42)
52 self.assertEquals(ar.get(), 42)
52 self.assertEquals(ar.get(), 42)
53
53
54 def test_get_before_done(self):
54 def test_get_before_done(self):
55 ar = self.client[-1].apply_async(wait, 0.1)
55 ar = self.client[-1].apply_async(wait, 0.1)
56 self.assertRaises(TimeoutError, ar.get, 0)
56 self.assertRaises(TimeoutError, ar.get, 0)
57 ar.wait(0)
57 ar.wait(0)
58 self.assertFalse(ar.ready())
58 self.assertFalse(ar.ready())
59 self.assertEquals(ar.get(), 0.1)
59 self.assertEquals(ar.get(), 0.1)
60
60
61 def test_get_after_error(self):
61 def test_get_after_error(self):
62 ar = self.client[-1].apply_async(lambda : 1/0)
62 ar = self.client[-1].apply_async(lambda : 1/0)
63 ar.wait(10)
63 ar.wait(10)
64 self.assertRaisesRemote(ZeroDivisionError, ar.get)
64 self.assertRaisesRemote(ZeroDivisionError, ar.get)
65 self.assertRaisesRemote(ZeroDivisionError, ar.get)
65 self.assertRaisesRemote(ZeroDivisionError, ar.get)
66 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
66 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
67
67
68 def test_get_dict(self):
68 def test_get_dict(self):
69 n = len(self.client)
69 n = len(self.client)
70 ar = self.client[:].apply_async(lambda : 5)
70 ar = self.client[:].apply_async(lambda : 5)
71 self.assertEquals(ar.get(), [5]*n)
71 self.assertEquals(ar.get(), [5]*n)
72 d = ar.get_dict()
72 d = ar.get_dict()
73 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
73 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
74 for eid,r in d.iteritems():
74 for eid,r in d.iteritems():
75 self.assertEquals(r, 5)
75 self.assertEquals(r, 5)
76
76
77 def test_list_amr(self):
77 def test_list_amr(self):
78 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
78 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
79 rlist = list(ar)
79 rlist = list(ar)
80
80
81 def test_getattr(self):
81 def test_getattr(self):
82 ar = self.client[:].apply_async(wait, 0.5)
82 ar = self.client[:].apply_async(wait, 0.5)
83 self.assertRaises(AttributeError, lambda : ar._foo)
83 self.assertRaises(AttributeError, lambda : ar._foo)
84 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
84 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
85 self.assertRaises(AttributeError, lambda : ar.foo)
85 self.assertRaises(AttributeError, lambda : ar.foo)
86 self.assertRaises(AttributeError, lambda : ar.engine_id)
86 self.assertRaises(AttributeError, lambda : ar.engine_id)
87 self.assertFalse(hasattr(ar, '__length_hint__'))
87 self.assertFalse(hasattr(ar, '__length_hint__'))
88 self.assertFalse(hasattr(ar, 'foo'))
88 self.assertFalse(hasattr(ar, 'foo'))
89 self.assertFalse(hasattr(ar, 'engine_id'))
89 self.assertFalse(hasattr(ar, 'engine_id'))
90 ar.get(5)
90 ar.get(5)
91 self.assertRaises(AttributeError, lambda : ar._foo)
91 self.assertRaises(AttributeError, lambda : ar._foo)
92 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
92 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
93 self.assertRaises(AttributeError, lambda : ar.foo)
93 self.assertRaises(AttributeError, lambda : ar.foo)
94 self.assertTrue(isinstance(ar.engine_id, list))
94 self.assertTrue(isinstance(ar.engine_id, list))
95 self.assertEquals(ar.engine_id, ar['engine_id'])
95 self.assertEquals(ar.engine_id, ar['engine_id'])
96 self.assertFalse(hasattr(ar, '__length_hint__'))
96 self.assertFalse(hasattr(ar, '__length_hint__'))
97 self.assertFalse(hasattr(ar, 'foo'))
97 self.assertFalse(hasattr(ar, 'foo'))
98 self.assertTrue(hasattr(ar, 'engine_id'))
98 self.assertTrue(hasattr(ar, 'engine_id'))
99
99
100 def test_getitem(self):
100 def test_getitem(self):
101 ar = self.client[:].apply_async(wait, 0.5)
101 ar = self.client[:].apply_async(wait, 0.5)
102 self.assertRaises(TimeoutError, lambda : ar['foo'])
102 self.assertRaises(TimeoutError, lambda : ar['foo'])
103 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
103 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
104 ar.get(5)
104 ar.get(5)
105 self.assertRaises(KeyError, lambda : ar['foo'])
105 self.assertRaises(KeyError, lambda : ar['foo'])
106 self.assertTrue(isinstance(ar['engine_id'], list))
106 self.assertTrue(isinstance(ar['engine_id'], list))
107 self.assertEquals(ar.engine_id, ar['engine_id'])
107 self.assertEquals(ar.engine_id, ar['engine_id'])
108
108
109 def test_single_result(self):
109 def test_single_result(self):
110 ar = self.client[-1].apply_async(wait, 0.5)
110 ar = self.client[-1].apply_async(wait, 0.5)
111 self.assertRaises(TimeoutError, lambda : ar['foo'])
111 self.assertRaises(TimeoutError, lambda : ar['foo'])
112 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
112 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
113 self.assertTrue(ar.get(5) == 0.5)
113 self.assertTrue(ar.get(5) == 0.5)
114 self.assertTrue(isinstance(ar['engine_id'], int))
114 self.assertTrue(isinstance(ar['engine_id'], int))
115 self.assertTrue(isinstance(ar.engine_id, int))
115 self.assertTrue(isinstance(ar.engine_id, int))
116 self.assertEquals(ar.engine_id, ar['engine_id'])
116 self.assertEquals(ar.engine_id, ar['engine_id'])
117
117
118 def test_abort(self):
118 def test_abort(self):
119 e = self.client[-1]
119 e = self.client[-1]
120 ar = e.execute('import time; time.sleep(1)', block=False)
120 ar = e.execute('import time; time.sleep(1)', block=False)
121 ar2 = e.apply_async(lambda : 2)
121 ar2 = e.apply_async(lambda : 2)
122 ar2.abort()
122 ar2.abort()
123 self.assertRaises(error.TaskAborted, ar2.get)
123 self.assertRaises(error.TaskAborted, ar2.get)
124 ar.get()
124 ar.get()
125
125
126 def test_len(self):
126 def test_len(self):
127 v = self.client.load_balanced_view()
127 v = self.client.load_balanced_view()
128 ar = v.map_async(lambda x: x, range(10))
128 ar = v.map_async(lambda x: x, range(10))
129 self.assertEquals(len(ar), 10)
129 self.assertEquals(len(ar), 10)
130 ar = v.apply_async(lambda x: x, range(10))
130 ar = v.apply_async(lambda x: x, range(10))
131 self.assertEquals(len(ar), 1)
131 self.assertEquals(len(ar), 1)
132 ar = self.client[:].apply_async(lambda x: x, range(10))
132 ar = self.client[:].apply_async(lambda x: x, range(10))
133 self.assertEquals(len(ar), len(self.client.ids))
133 self.assertEquals(len(ar), len(self.client.ids))
134
134
135 def test_wall_time_single(self):
135 def test_wall_time_single(self):
136 v = self.client.load_balanced_view()
136 v = self.client.load_balanced_view()
137 ar = v.apply_async(time.sleep, 0.25)
137 ar = v.apply_async(time.sleep, 0.25)
138 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
138 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
139 ar.get(2)
139 ar.get(2)
140 self.assertTrue(ar.wall_time < 1.)
140 self.assertTrue(ar.wall_time < 1.)
141 self.assertTrue(ar.wall_time > 0.2)
141 self.assertTrue(ar.wall_time > 0.2)
142
142
143 def test_wall_time_multi(self):
143 def test_wall_time_multi(self):
144 self.minimum_engines(4)
144 self.minimum_engines(4)
145 v = self.client[:]
145 v = self.client[:]
146 ar = v.apply_async(time.sleep, 0.25)
146 ar = v.apply_async(time.sleep, 0.25)
147 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
147 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
148 ar.get(2)
148 ar.get(2)
149 self.assertTrue(ar.wall_time < 1.)
149 self.assertTrue(ar.wall_time < 1.)
150 self.assertTrue(ar.wall_time > 0.2)
150 self.assertTrue(ar.wall_time > 0.2)
151
151
152 def test_serial_time_single(self):
152 def test_serial_time_single(self):
153 v = self.client.load_balanced_view()
153 v = self.client.load_balanced_view()
154 ar = v.apply_async(time.sleep, 0.25)
154 ar = v.apply_async(time.sleep, 0.25)
155 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
155 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
156 ar.get(2)
156 ar.get(2)
157 self.assertTrue(ar.serial_time < 0.5)
157 self.assertTrue(ar.serial_time < 1.)
158 self.assertTrue(ar.serial_time > 0.2)
158 self.assertTrue(ar.serial_time > 0.2)
159
159
160 def test_serial_time_multi(self):
160 def test_serial_time_multi(self):
161 self.minimum_engines(4)
161 self.minimum_engines(4)
162 v = self.client[:]
162 v = self.client[:]
163 ar = v.apply_async(time.sleep, 0.25)
163 ar = v.apply_async(time.sleep, 0.25)
164 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
164 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
165 ar.get(2)
165 ar.get(2)
166 self.assertTrue(ar.serial_time < 2.)
166 self.assertTrue(ar.serial_time < 2.)
167 self.assertTrue(ar.serial_time > 0.8)
167 self.assertTrue(ar.serial_time > 0.8)
168
168
169 def test_elapsed_single(self):
169 def test_elapsed_single(self):
170 v = self.client.load_balanced_view()
170 v = self.client.load_balanced_view()
171 ar = v.apply_async(time.sleep, 0.25)
171 ar = v.apply_async(time.sleep, 0.25)
172 while not ar.ready():
172 while not ar.ready():
173 time.sleep(0.01)
173 time.sleep(0.01)
174 self.assertTrue(ar.elapsed < 0.3)
174 self.assertTrue(ar.elapsed < 1)
175 self.assertTrue(ar.elapsed < 0.3)
175 self.assertTrue(ar.elapsed < 1)
176 ar.get(2)
176 ar.get(2)
177
177
178 def test_elapsed_multi(self):
178 def test_elapsed_multi(self):
179 v = self.client[:]
179 v = self.client[:]
180 ar = v.apply_async(time.sleep, 0.25)
180 ar = v.apply_async(time.sleep, 0.25)
181 while not ar.ready():
181 while not ar.ready():
182 time.sleep(0.01)
182 time.sleep(0.01)
183 self.assertTrue(ar.elapsed < 0.3)
183 self.assertTrue(ar.elapsed < 1)
184 self.assertTrue(ar.elapsed < 0.3)
184 self.assertTrue(ar.elapsed < 1)
185 ar.get(2)
185 ar.get(2)
186
186
187 def test_hubresult_timestamps(self):
187 def test_hubresult_timestamps(self):
188 self.minimum_engines(4)
188 self.minimum_engines(4)
189 v = self.client[:]
189 v = self.client[:]
190 ar = v.apply_async(time.sleep, 0.25)
190 ar = v.apply_async(time.sleep, 0.25)
191 ar.get(2)
191 ar.get(2)
192 rc2 = Client(profile='iptest')
192 rc2 = Client(profile='iptest')
193 # must have try/finally to close second Client, otherwise
193 # must have try/finally to close second Client, otherwise
194 # will have dangling sockets causing problems
194 # will have dangling sockets causing problems
195 try:
195 try:
196 time.sleep(0.25)
196 time.sleep(0.25)
197 hr = rc2.get_result(ar.msg_ids)
197 hr = rc2.get_result(ar.msg_ids)
198 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
198 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
199 hr.get(1)
199 hr.get(1)
200 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
200 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
201 self.assertEquals(hr.serial_time, ar.serial_time)
201 self.assertEquals(hr.serial_time, ar.serial_time)
202 finally:
202 finally:
203 rc2.close()
203 rc2.close()
204
204
205
205
@@ -1,676 +1,689 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test View objects
2 """test View objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import time
20 import time
21 from tempfile import mktemp
21 from tempfile import mktemp
22 from StringIO import StringIO
22 from StringIO import StringIO
23
23
24 import zmq
24 import zmq
25 from nose import SkipTest
25 from nose import SkipTest
26
26
27 from IPython.testing import decorators as dec
27 from IPython.testing import decorators as dec
28 from IPython.testing.ipunittest import ParametricTestCase
28 from IPython.testing.ipunittest import ParametricTestCase
29
29
30 from IPython import parallel as pmod
30 from IPython import parallel as pmod
31 from IPython.parallel import error
31 from IPython.parallel import error
32 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
32 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
33 from IPython.parallel import DirectView
33 from IPython.parallel import DirectView
34 from IPython.parallel.util import interactive
34 from IPython.parallel.util import interactive
35
35
36 from IPython.parallel.tests import add_engines
36 from IPython.parallel.tests import add_engines
37
37
38 from .clienttest import ClusterTestCase, crash, wait, skip_without
38 from .clienttest import ClusterTestCase, crash, wait, skip_without
39
39
40 def setup():
40 def setup():
41 add_engines(3, total=True)
41 add_engines(3, total=True)
42
42
43 class TestView(ClusterTestCase, ParametricTestCase):
43 class TestView(ClusterTestCase, ParametricTestCase):
44
44
45 def test_z_crash_mux(self):
45 def test_z_crash_mux(self):
46 """test graceful handling of engine death (direct)"""
46 """test graceful handling of engine death (direct)"""
47 raise SkipTest("crash tests disabled, due to undesirable crash reports")
47 raise SkipTest("crash tests disabled, due to undesirable crash reports")
48 # self.add_engines(1)
48 # self.add_engines(1)
49 eid = self.client.ids[-1]
49 eid = self.client.ids[-1]
50 ar = self.client[eid].apply_async(crash)
50 ar = self.client[eid].apply_async(crash)
51 self.assertRaisesRemote(error.EngineError, ar.get, 10)
51 self.assertRaisesRemote(error.EngineError, ar.get, 10)
52 eid = ar.engine_id
52 eid = ar.engine_id
53 tic = time.time()
53 tic = time.time()
54 while eid in self.client.ids and time.time()-tic < 5:
54 while eid in self.client.ids and time.time()-tic < 5:
55 time.sleep(.01)
55 time.sleep(.01)
56 self.client.spin()
56 self.client.spin()
57 self.assertFalse(eid in self.client.ids, "Engine should have died")
57 self.assertFalse(eid in self.client.ids, "Engine should have died")
58
58
59 def test_push_pull(self):
59 def test_push_pull(self):
60 """test pushing and pulling"""
60 """test pushing and pulling"""
61 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
61 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
62 t = self.client.ids[-1]
62 t = self.client.ids[-1]
63 v = self.client[t]
63 v = self.client[t]
64 push = v.push
64 push = v.push
65 pull = v.pull
65 pull = v.pull
66 v.block=True
66 v.block=True
67 nengines = len(self.client)
67 nengines = len(self.client)
68 push({'data':data})
68 push({'data':data})
69 d = pull('data')
69 d = pull('data')
70 self.assertEquals(d, data)
70 self.assertEquals(d, data)
71 self.client[:].push({'data':data})
71 self.client[:].push({'data':data})
72 d = self.client[:].pull('data', block=True)
72 d = self.client[:].pull('data', block=True)
73 self.assertEquals(d, nengines*[data])
73 self.assertEquals(d, nengines*[data])
74 ar = push({'data':data}, block=False)
74 ar = push({'data':data}, block=False)
75 self.assertTrue(isinstance(ar, AsyncResult))
75 self.assertTrue(isinstance(ar, AsyncResult))
76 r = ar.get()
76 r = ar.get()
77 ar = self.client[:].pull('data', block=False)
77 ar = self.client[:].pull('data', block=False)
78 self.assertTrue(isinstance(ar, AsyncResult))
78 self.assertTrue(isinstance(ar, AsyncResult))
79 r = ar.get()
79 r = ar.get()
80 self.assertEquals(r, nengines*[data])
80 self.assertEquals(r, nengines*[data])
81 self.client[:].push(dict(a=10,b=20))
81 self.client[:].push(dict(a=10,b=20))
82 r = self.client[:].pull(('a','b'), block=True)
82 r = self.client[:].pull(('a','b'), block=True)
83 self.assertEquals(r, nengines*[[10,20]])
83 self.assertEquals(r, nengines*[[10,20]])
84
84
85 def test_push_pull_function(self):
85 def test_push_pull_function(self):
86 "test pushing and pulling functions"
86 "test pushing and pulling functions"
87 def testf(x):
87 def testf(x):
88 return 2.0*x
88 return 2.0*x
89
89
90 t = self.client.ids[-1]
90 t = self.client.ids[-1]
91 v = self.client[t]
91 v = self.client[t]
92 v.block=True
92 v.block=True
93 push = v.push
93 push = v.push
94 pull = v.pull
94 pull = v.pull
95 execute = v.execute
95 execute = v.execute
96 push({'testf':testf})
96 push({'testf':testf})
97 r = pull('testf')
97 r = pull('testf')
98 self.assertEqual(r(1.0), testf(1.0))
98 self.assertEqual(r(1.0), testf(1.0))
99 execute('r = testf(10)')
99 execute('r = testf(10)')
100 r = pull('r')
100 r = pull('r')
101 self.assertEquals(r, testf(10))
101 self.assertEquals(r, testf(10))
102 ar = self.client[:].push({'testf':testf}, block=False)
102 ar = self.client[:].push({'testf':testf}, block=False)
103 ar.get()
103 ar.get()
104 ar = self.client[:].pull('testf', block=False)
104 ar = self.client[:].pull('testf', block=False)
105 rlist = ar.get()
105 rlist = ar.get()
106 for r in rlist:
106 for r in rlist:
107 self.assertEqual(r(1.0), testf(1.0))
107 self.assertEqual(r(1.0), testf(1.0))
108 execute("def g(x): return x*x")
108 execute("def g(x): return x*x")
109 r = pull(('testf','g'))
109 r = pull(('testf','g'))
110 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
110 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
111
111
112 def test_push_function_globals(self):
112 def test_push_function_globals(self):
113 """test that pushed functions have access to globals"""
113 """test that pushed functions have access to globals"""
114 @interactive
114 @interactive
115 def geta():
115 def geta():
116 return a
116 return a
117 # self.add_engines(1)
117 # self.add_engines(1)
118 v = self.client[-1]
118 v = self.client[-1]
119 v.block=True
119 v.block=True
120 v['f'] = geta
120 v['f'] = geta
121 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
121 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
122 v.execute('a=5')
122 v.execute('a=5')
123 v.execute('b=f()')
123 v.execute('b=f()')
124 self.assertEquals(v['b'], 5)
124 self.assertEquals(v['b'], 5)
125
125
126 def test_push_function_defaults(self):
126 def test_push_function_defaults(self):
127 """test that pushed functions preserve default args"""
127 """test that pushed functions preserve default args"""
128 def echo(a=10):
128 def echo(a=10):
129 return a
129 return a
130 v = self.client[-1]
130 v = self.client[-1]
131 v.block=True
131 v.block=True
132 v['f'] = echo
132 v['f'] = echo
133 v.execute('b=f()')
133 v.execute('b=f()')
134 self.assertEquals(v['b'], 10)
134 self.assertEquals(v['b'], 10)
135
135
136 def test_get_result(self):
136 def test_get_result(self):
137 """test getting results from the Hub."""
137 """test getting results from the Hub."""
138 c = pmod.Client(profile='iptest')
138 c = pmod.Client(profile='iptest')
139 # self.add_engines(1)
139 # self.add_engines(1)
140 t = c.ids[-1]
140 t = c.ids[-1]
141 v = c[t]
141 v = c[t]
142 v2 = self.client[t]
142 v2 = self.client[t]
143 ar = v.apply_async(wait, 1)
143 ar = v.apply_async(wait, 1)
144 # give the monitor time to notice the message
144 # give the monitor time to notice the message
145 time.sleep(.25)
145 time.sleep(.25)
146 ahr = v2.get_result(ar.msg_ids)
146 ahr = v2.get_result(ar.msg_ids)
147 self.assertTrue(isinstance(ahr, AsyncHubResult))
147 self.assertTrue(isinstance(ahr, AsyncHubResult))
148 self.assertEquals(ahr.get(), ar.get())
148 self.assertEquals(ahr.get(), ar.get())
149 ar2 = v2.get_result(ar.msg_ids)
149 ar2 = v2.get_result(ar.msg_ids)
150 self.assertFalse(isinstance(ar2, AsyncHubResult))
150 self.assertFalse(isinstance(ar2, AsyncHubResult))
151 c.spin()
151 c.spin()
152 c.close()
152 c.close()
153
153
154 def test_run_newline(self):
154 def test_run_newline(self):
155 """test that run appends newline to files"""
155 """test that run appends newline to files"""
156 tmpfile = mktemp()
156 tmpfile = mktemp()
157 with open(tmpfile, 'w') as f:
157 with open(tmpfile, 'w') as f:
158 f.write("""def g():
158 f.write("""def g():
159 return 5
159 return 5
160 """)
160 """)
161 v = self.client[-1]
161 v = self.client[-1]
162 v.run(tmpfile, block=True)
162 v.run(tmpfile, block=True)
163 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
163 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
164
164
165 def test_apply_tracked(self):
165 def test_apply_tracked(self):
166 """test tracking for apply"""
166 """test tracking for apply"""
167 # self.add_engines(1)
167 # self.add_engines(1)
168 t = self.client.ids[-1]
168 t = self.client.ids[-1]
169 v = self.client[t]
169 v = self.client[t]
170 v.block=False
170 v.block=False
171 def echo(n=1024*1024, **kwargs):
171 def echo(n=1024*1024, **kwargs):
172 with v.temp_flags(**kwargs):
172 with v.temp_flags(**kwargs):
173 return v.apply(lambda x: x, 'x'*n)
173 return v.apply(lambda x: x, 'x'*n)
174 ar = echo(1, track=False)
174 ar = echo(1, track=False)
175 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
175 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
176 self.assertTrue(ar.sent)
176 self.assertTrue(ar.sent)
177 ar = echo(track=True)
177 ar = echo(track=True)
178 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
178 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
179 self.assertEquals(ar.sent, ar._tracker.done)
179 self.assertEquals(ar.sent, ar._tracker.done)
180 ar._tracker.wait()
180 ar._tracker.wait()
181 self.assertTrue(ar.sent)
181 self.assertTrue(ar.sent)
182
182
183 def test_push_tracked(self):
183 def test_push_tracked(self):
184 t = self.client.ids[-1]
184 t = self.client.ids[-1]
185 ns = dict(x='x'*1024*1024)
185 ns = dict(x='x'*1024*1024)
186 v = self.client[t]
186 v = self.client[t]
187 ar = v.push(ns, block=False, track=False)
187 ar = v.push(ns, block=False, track=False)
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 self.assertTrue(ar.sent)
189 self.assertTrue(ar.sent)
190
190
191 ar = v.push(ns, block=False, track=True)
191 ar = v.push(ns, block=False, track=True)
192 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
192 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
193 ar._tracker.wait()
193 ar._tracker.wait()
194 self.assertEquals(ar.sent, ar._tracker.done)
194 self.assertEquals(ar.sent, ar._tracker.done)
195 self.assertTrue(ar.sent)
195 self.assertTrue(ar.sent)
196 ar.get()
196 ar.get()
197
197
198 def test_scatter_tracked(self):
198 def test_scatter_tracked(self):
199 t = self.client.ids
199 t = self.client.ids
200 x='x'*1024*1024
200 x='x'*1024*1024
201 ar = self.client[t].scatter('x', x, block=False, track=False)
201 ar = self.client[t].scatter('x', x, block=False, track=False)
202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 self.assertTrue(ar.sent)
203 self.assertTrue(ar.sent)
204
204
205 ar = self.client[t].scatter('x', x, block=False, track=True)
205 ar = self.client[t].scatter('x', x, block=False, track=True)
206 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
206 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
207 self.assertEquals(ar.sent, ar._tracker.done)
207 self.assertEquals(ar.sent, ar._tracker.done)
208 ar._tracker.wait()
208 ar._tracker.wait()
209 self.assertTrue(ar.sent)
209 self.assertTrue(ar.sent)
210 ar.get()
210 ar.get()
211
211
212 def test_remote_reference(self):
212 def test_remote_reference(self):
213 v = self.client[-1]
213 v = self.client[-1]
214 v['a'] = 123
214 v['a'] = 123
215 ra = pmod.Reference('a')
215 ra = pmod.Reference('a')
216 b = v.apply_sync(lambda x: x, ra)
216 b = v.apply_sync(lambda x: x, ra)
217 self.assertEquals(b, 123)
217 self.assertEquals(b, 123)
218
218
219
219
220 def test_scatter_gather(self):
220 def test_scatter_gather(self):
221 view = self.client[:]
221 view = self.client[:]
222 seq1 = range(16)
222 seq1 = range(16)
223 view.scatter('a', seq1)
223 view.scatter('a', seq1)
224 seq2 = view.gather('a', block=True)
224 seq2 = view.gather('a', block=True)
225 self.assertEquals(seq2, seq1)
225 self.assertEquals(seq2, seq1)
226 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
226 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
227
227
228 @skip_without('numpy')
228 @skip_without('numpy')
229 def test_scatter_gather_numpy(self):
229 def test_scatter_gather_numpy(self):
230 import numpy
230 import numpy
231 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
231 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
232 view = self.client[:]
232 view = self.client[:]
233 a = numpy.arange(64)
233 a = numpy.arange(64)
234 view.scatter('a', a)
234 view.scatter('a', a)
235 b = view.gather('a', block=True)
235 b = view.gather('a', block=True)
236 assert_array_equal(b, a)
236 assert_array_equal(b, a)
237
237
238 def test_scatter_gather_lazy(self):
238 def test_scatter_gather_lazy(self):
239 """scatter/gather with targets='all'"""
239 """scatter/gather with targets='all'"""
240 view = self.client.direct_view(targets='all')
240 view = self.client.direct_view(targets='all')
241 x = range(64)
241 x = range(64)
242 view.scatter('x', x)
242 view.scatter('x', x)
243 gathered = view.gather('x', block=True)
243 gathered = view.gather('x', block=True)
244 self.assertEquals(gathered, x)
244 self.assertEquals(gathered, x)
245
245
246
246
247 @dec.known_failure_py3
247 @dec.known_failure_py3
248 @skip_without('numpy')
248 @skip_without('numpy')
249 def test_push_numpy_nocopy(self):
249 def test_push_numpy_nocopy(self):
250 import numpy
250 import numpy
251 view = self.client[:]
251 view = self.client[:]
252 a = numpy.arange(64)
252 a = numpy.arange(64)
253 view['A'] = a
253 view['A'] = a
254 @interactive
254 @interactive
255 def check_writeable(x):
255 def check_writeable(x):
256 return x.flags.writeable
256 return x.flags.writeable
257
257
258 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
258 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
259 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
259 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
260
260
261 view.push(dict(B=a))
261 view.push(dict(B=a))
262 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
262 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
263 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
263 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
264
264
265 @skip_without('numpy')
265 @skip_without('numpy')
266 def test_apply_numpy(self):
266 def test_apply_numpy(self):
267 """view.apply(f, ndarray)"""
267 """view.apply(f, ndarray)"""
268 import numpy
268 import numpy
269 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
269 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
270
270
271 A = numpy.random.random((100,100))
271 A = numpy.random.random((100,100))
272 view = self.client[-1]
272 view = self.client[-1]
273 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
273 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
274 B = A.astype(dt)
274 B = A.astype(dt)
275 C = view.apply_sync(lambda x:x, B)
275 C = view.apply_sync(lambda x:x, B)
276 assert_array_equal(B,C)
276 assert_array_equal(B,C)
277
277
278 def test_map(self):
278 def test_map(self):
279 view = self.client[:]
279 view = self.client[:]
280 def f(x):
280 def f(x):
281 return x**2
281 return x**2
282 data = range(16)
282 data = range(16)
283 r = view.map_sync(f, data)
283 r = view.map_sync(f, data)
284 self.assertEquals(r, map(f, data))
284 self.assertEquals(r, map(f, data))
285
285
286 def test_map_iterable(self):
286 def test_map_iterable(self):
287 """test map on iterables (direct)"""
287 """test map on iterables (direct)"""
288 view = self.client[:]
288 view = self.client[:]
289 # 101 is prime, so it won't be evenly distributed
289 # 101 is prime, so it won't be evenly distributed
290 arr = range(101)
290 arr = range(101)
291 # ensure it will be an iterator, even in Python 3
291 # ensure it will be an iterator, even in Python 3
292 it = iter(arr)
292 it = iter(arr)
293 r = view.map_sync(lambda x:x, arr)
293 r = view.map_sync(lambda x:x, arr)
294 self.assertEquals(r, list(arr))
294 self.assertEquals(r, list(arr))
295
295
296 def test_scatterGatherNonblocking(self):
296 def test_scatterGatherNonblocking(self):
297 data = range(16)
297 data = range(16)
298 view = self.client[:]
298 view = self.client[:]
299 view.scatter('a', data, block=False)
299 view.scatter('a', data, block=False)
300 ar = view.gather('a', block=False)
300 ar = view.gather('a', block=False)
301 self.assertEquals(ar.get(), data)
301 self.assertEquals(ar.get(), data)
302
302
303 @skip_without('numpy')
303 @skip_without('numpy')
304 def test_scatter_gather_numpy_nonblocking(self):
304 def test_scatter_gather_numpy_nonblocking(self):
305 import numpy
305 import numpy
306 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
306 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
307 a = numpy.arange(64)
307 a = numpy.arange(64)
308 view = self.client[:]
308 view = self.client[:]
309 ar = view.scatter('a', a, block=False)
309 ar = view.scatter('a', a, block=False)
310 self.assertTrue(isinstance(ar, AsyncResult))
310 self.assertTrue(isinstance(ar, AsyncResult))
311 amr = view.gather('a', block=False)
311 amr = view.gather('a', block=False)
312 self.assertTrue(isinstance(amr, AsyncMapResult))
312 self.assertTrue(isinstance(amr, AsyncMapResult))
313 assert_array_equal(amr.get(), a)
313 assert_array_equal(amr.get(), a)
314
314
315 def test_execute(self):
315 def test_execute(self):
316 view = self.client[:]
316 view = self.client[:]
317 # self.client.debug=True
317 # self.client.debug=True
318 execute = view.execute
318 execute = view.execute
319 ar = execute('c=30', block=False)
319 ar = execute('c=30', block=False)
320 self.assertTrue(isinstance(ar, AsyncResult))
320 self.assertTrue(isinstance(ar, AsyncResult))
321 ar = execute('d=[0,1,2]', block=False)
321 ar = execute('d=[0,1,2]', block=False)
322 self.client.wait(ar, 1)
322 self.client.wait(ar, 1)
323 self.assertEquals(len(ar.get()), len(self.client))
323 self.assertEquals(len(ar.get()), len(self.client))
324 for c in view['c']:
324 for c in view['c']:
325 self.assertEquals(c, 30)
325 self.assertEquals(c, 30)
326
326
327 def test_abort(self):
327 def test_abort(self):
328 view = self.client[-1]
328 view = self.client[-1]
329 ar = view.execute('import time; time.sleep(1)', block=False)
329 ar = view.execute('import time; time.sleep(1)', block=False)
330 ar2 = view.apply_async(lambda : 2)
330 ar2 = view.apply_async(lambda : 2)
331 ar3 = view.apply_async(lambda : 3)
331 ar3 = view.apply_async(lambda : 3)
332 view.abort(ar2)
332 view.abort(ar2)
333 view.abort(ar3.msg_ids)
333 view.abort(ar3.msg_ids)
334 self.assertRaises(error.TaskAborted, ar2.get)
334 self.assertRaises(error.TaskAborted, ar2.get)
335 self.assertRaises(error.TaskAborted, ar3.get)
335 self.assertRaises(error.TaskAborted, ar3.get)
336
336
337 def test_abort_all(self):
337 def test_abort_all(self):
338 """view.abort() aborts all outstanding tasks"""
338 """view.abort() aborts all outstanding tasks"""
339 view = self.client[-1]
339 view = self.client[-1]
340 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
340 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
341 view.abort()
341 view.abort()
342 view.wait(timeout=5)
342 view.wait(timeout=5)
343 for ar in ars[5:]:
343 for ar in ars[5:]:
344 self.assertRaises(error.TaskAborted, ar.get)
344 self.assertRaises(error.TaskAborted, ar.get)
345
345
346 def test_temp_flags(self):
346 def test_temp_flags(self):
347 view = self.client[-1]
347 view = self.client[-1]
348 view.block=True
348 view.block=True
349 with view.temp_flags(block=False):
349 with view.temp_flags(block=False):
350 self.assertFalse(view.block)
350 self.assertFalse(view.block)
351 self.assertTrue(view.block)
351 self.assertTrue(view.block)
352
352
353 @dec.known_failure_py3
353 @dec.known_failure_py3
354 def test_importer(self):
354 def test_importer(self):
355 view = self.client[-1]
355 view = self.client[-1]
356 view.clear(block=True)
356 view.clear(block=True)
357 with view.importer:
357 with view.importer:
358 import re
358 import re
359
359
360 @interactive
360 @interactive
361 def findall(pat, s):
361 def findall(pat, s):
362 # this globals() step isn't necessary in real code
362 # this globals() step isn't necessary in real code
363 # only to prevent a closure in the test
363 # only to prevent a closure in the test
364 re = globals()['re']
364 re = globals()['re']
365 return re.findall(pat, s)
365 return re.findall(pat, s)
366
366
367 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
367 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
368
368
369 # parallel magic tests
369 # parallel magic tests
370
370
371 def test_magic_px_blocking(self):
371 def test_magic_px_blocking(self):
372 ip = get_ipython()
372 ip = get_ipython()
373 v = self.client[-1]
373 v = self.client[-1]
374 v.activate()
374 v.activate()
375 v.block=True
375 v.block=True
376
376
377 ip.magic_px('a=5')
377 ip.magic_px('a=5')
378 self.assertEquals(v['a'], 5)
378 self.assertEquals(v['a'], 5)
379 ip.magic_px('a=10')
379 ip.magic_px('a=10')
380 self.assertEquals(v['a'], 10)
380 self.assertEquals(v['a'], 10)
381 sio = StringIO()
381 sio = StringIO()
382 savestdout = sys.stdout
382 savestdout = sys.stdout
383 sys.stdout = sio
383 sys.stdout = sio
384 # just 'print a' worst ~99% of the time, but this ensures that
384 # just 'print a' worst ~99% of the time, but this ensures that
385 # the stdout message has arrived when the result is finished:
385 # the stdout message has arrived when the result is finished:
386 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
386 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
387 sys.stdout = savestdout
387 sys.stdout = savestdout
388 buf = sio.getvalue()
388 buf = sio.getvalue()
389 self.assertTrue('[stdout:' in buf, buf)
389 self.assertTrue('[stdout:' in buf, buf)
390 self.assertTrue(buf.rstrip().endswith('10'))
390 self.assertTrue(buf.rstrip().endswith('10'))
391 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
391 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
392
392
393 def test_magic_px_nonblocking(self):
393 def test_magic_px_nonblocking(self):
394 ip = get_ipython()
394 ip = get_ipython()
395 v = self.client[-1]
395 v = self.client[-1]
396 v.activate()
396 v.activate()
397 v.block=False
397 v.block=False
398
398
399 ip.magic_px('a=5')
399 ip.magic_px('a=5')
400 self.assertEquals(v['a'], 5)
400 self.assertEquals(v['a'], 5)
401 ip.magic_px('a=10')
401 ip.magic_px('a=10')
402 self.assertEquals(v['a'], 10)
402 self.assertEquals(v['a'], 10)
403 sio = StringIO()
403 sio = StringIO()
404 savestdout = sys.stdout
404 savestdout = sys.stdout
405 sys.stdout = sio
405 sys.stdout = sio
406 ip.magic_px('print a')
406 ip.magic_px('print a')
407 sys.stdout = savestdout
407 sys.stdout = savestdout
408 buf = sio.getvalue()
408 buf = sio.getvalue()
409 self.assertFalse('[stdout:%i]'%v.targets in buf)
409 self.assertFalse('[stdout:%i]'%v.targets in buf)
410 ip.magic_px('1/0')
410 ip.magic_px('1/0')
411 ar = v.get_result(-1)
411 ar = v.get_result(-1)
412 self.assertRaisesRemote(ZeroDivisionError, ar.get)
412 self.assertRaisesRemote(ZeroDivisionError, ar.get)
413
413
414 def test_magic_autopx_blocking(self):
414 def test_magic_autopx_blocking(self):
415 ip = get_ipython()
415 ip = get_ipython()
416 v = self.client[-1]
416 v = self.client[-1]
417 v.activate()
417 v.activate()
418 v.block=True
418 v.block=True
419
419
420 sio = StringIO()
420 sio = StringIO()
421 savestdout = sys.stdout
421 savestdout = sys.stdout
422 sys.stdout = sio
422 sys.stdout = sio
423 ip.magic_autopx()
423 ip.magic_autopx()
424 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
424 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
425 ip.run_cell('b*=2')
425 ip.run_cell('b*=2')
426 ip.run_cell('print (b)')
426 ip.run_cell('print (b)')
427 ip.run_cell("b/c")
427 ip.run_cell("b/c")
428 ip.magic_autopx()
428 ip.magic_autopx()
429 sys.stdout = savestdout
429 sys.stdout = savestdout
430 output = sio.getvalue().strip()
430 output = sio.getvalue().strip()
431 self.assertTrue(output.startswith('%autopx enabled'))
431 self.assertTrue(output.startswith('%autopx enabled'))
432 self.assertTrue(output.endswith('%autopx disabled'))
432 self.assertTrue(output.endswith('%autopx disabled'))
433 self.assertTrue('RemoteError: ZeroDivisionError' in output)
433 self.assertTrue('RemoteError: ZeroDivisionError' in output)
434 ar = v.get_result(-1)
434 ar = v.get_result(-1)
435 self.assertEquals(v['a'], 5)
435 self.assertEquals(v['a'], 5)
436 self.assertEquals(v['b'], 20)
436 self.assertEquals(v['b'], 20)
437 self.assertRaisesRemote(ZeroDivisionError, ar.get)
437 self.assertRaisesRemote(ZeroDivisionError, ar.get)
438
438
439 def test_magic_autopx_nonblocking(self):
439 def test_magic_autopx_nonblocking(self):
440 ip = get_ipython()
440 ip = get_ipython()
441 v = self.client[-1]
441 v = self.client[-1]
442 v.activate()
442 v.activate()
443 v.block=False
443 v.block=False
444
444
445 sio = StringIO()
445 sio = StringIO()
446 savestdout = sys.stdout
446 savestdout = sys.stdout
447 sys.stdout = sio
447 sys.stdout = sio
448 ip.magic_autopx()
448 ip.magic_autopx()
449 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
449 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
450 ip.run_cell('print (b)')
450 ip.run_cell('print (b)')
451 ip.run_cell('import time; time.sleep(0.1)')
451 ip.run_cell('import time; time.sleep(0.1)')
452 ip.run_cell("b/c")
452 ip.run_cell("b/c")
453 ip.run_cell('b*=2')
453 ip.run_cell('b*=2')
454 ip.magic_autopx()
454 ip.magic_autopx()
455 sys.stdout = savestdout
455 sys.stdout = savestdout
456 output = sio.getvalue().strip()
456 output = sio.getvalue().strip()
457 self.assertTrue(output.startswith('%autopx enabled'))
457 self.assertTrue(output.startswith('%autopx enabled'))
458 self.assertTrue(output.endswith('%autopx disabled'))
458 self.assertTrue(output.endswith('%autopx disabled'))
459 self.assertFalse('ZeroDivisionError' in output)
459 self.assertFalse('ZeroDivisionError' in output)
460 ar = v.get_result(-2)
460 ar = v.get_result(-2)
461 self.assertRaisesRemote(ZeroDivisionError, ar.get)
461 self.assertRaisesRemote(ZeroDivisionError, ar.get)
462 # prevent TaskAborted on pulls, due to ZeroDivisionError
462 # prevent TaskAborted on pulls, due to ZeroDivisionError
463 time.sleep(0.5)
463 time.sleep(0.5)
464 self.assertEquals(v['a'], 5)
464 self.assertEquals(v['a'], 5)
465 # b*=2 will not fire, due to abort
465 # b*=2 will not fire, due to abort
466 self.assertEquals(v['b'], 10)
466 self.assertEquals(v['b'], 10)
467
467
468 def test_magic_result(self):
468 def test_magic_result(self):
469 ip = get_ipython()
469 ip = get_ipython()
470 v = self.client[-1]
470 v = self.client[-1]
471 v.activate()
471 v.activate()
472 v['a'] = 111
472 v['a'] = 111
473 ra = v['a']
473 ra = v['a']
474
474
475 ar = ip.magic_result()
475 ar = ip.magic_result()
476 self.assertEquals(ar.msg_ids, [v.history[-1]])
476 self.assertEquals(ar.msg_ids, [v.history[-1]])
477 self.assertEquals(ar.get(), 111)
477 self.assertEquals(ar.get(), 111)
478 ar = ip.magic_result('-2')
478 ar = ip.magic_result('-2')
479 self.assertEquals(ar.msg_ids, [v.history[-2]])
479 self.assertEquals(ar.msg_ids, [v.history[-2]])
480
480
481 def test_unicode_execute(self):
481 def test_unicode_execute(self):
482 """test executing unicode strings"""
482 """test executing unicode strings"""
483 v = self.client[-1]
483 v = self.client[-1]
484 v.block=True
484 v.block=True
485 if sys.version_info[0] >= 3:
485 if sys.version_info[0] >= 3:
486 code="a='é'"
486 code="a='é'"
487 else:
487 else:
488 code=u"a=u'é'"
488 code=u"a=u'é'"
489 v.execute(code)
489 v.execute(code)
490 self.assertEquals(v['a'], u'é')
490 self.assertEquals(v['a'], u'é')
491
491
492 def test_unicode_apply_result(self):
492 def test_unicode_apply_result(self):
493 """test unicode apply results"""
493 """test unicode apply results"""
494 v = self.client[-1]
494 v = self.client[-1]
495 r = v.apply_sync(lambda : u'é')
495 r = v.apply_sync(lambda : u'é')
496 self.assertEquals(r, u'é')
496 self.assertEquals(r, u'é')
497
497
498 def test_unicode_apply_arg(self):
498 def test_unicode_apply_arg(self):
499 """test passing unicode arguments to apply"""
499 """test passing unicode arguments to apply"""
500 v = self.client[-1]
500 v = self.client[-1]
501
501
502 @interactive
502 @interactive
503 def check_unicode(a, check):
503 def check_unicode(a, check):
504 assert isinstance(a, unicode), "%r is not unicode"%a
504 assert isinstance(a, unicode), "%r is not unicode"%a
505 assert isinstance(check, bytes), "%r is not bytes"%check
505 assert isinstance(check, bytes), "%r is not bytes"%check
506 assert a.encode('utf8') == check, "%s != %s"%(a,check)
506 assert a.encode('utf8') == check, "%s != %s"%(a,check)
507
507
508 for s in [ u'é', u'ßø®∫',u'asdf' ]:
508 for s in [ u'é', u'ßø®∫',u'asdf' ]:
509 try:
509 try:
510 v.apply_sync(check_unicode, s, s.encode('utf8'))
510 v.apply_sync(check_unicode, s, s.encode('utf8'))
511 except error.RemoteError as e:
511 except error.RemoteError as e:
512 if e.ename == 'AssertionError':
512 if e.ename == 'AssertionError':
513 self.fail(e.evalue)
513 self.fail(e.evalue)
514 else:
514 else:
515 raise e
515 raise e
516
516
517 def test_map_reference(self):
517 def test_map_reference(self):
518 """view.map(<Reference>, *seqs) should work"""
518 """view.map(<Reference>, *seqs) should work"""
519 v = self.client[:]
519 v = self.client[:]
520 v.scatter('n', self.client.ids, flatten=True)
520 v.scatter('n', self.client.ids, flatten=True)
521 v.execute("f = lambda x,y: x*y")
521 v.execute("f = lambda x,y: x*y")
522 rf = pmod.Reference('f')
522 rf = pmod.Reference('f')
523 nlist = list(range(10))
523 nlist = list(range(10))
524 mlist = nlist[::-1]
524 mlist = nlist[::-1]
525 expected = [ m*n for m,n in zip(mlist, nlist) ]
525 expected = [ m*n for m,n in zip(mlist, nlist) ]
526 result = v.map_sync(rf, mlist, nlist)
526 result = v.map_sync(rf, mlist, nlist)
527 self.assertEquals(result, expected)
527 self.assertEquals(result, expected)
528
528
529 def test_apply_reference(self):
529 def test_apply_reference(self):
530 """view.apply(<Reference>, *args) should work"""
530 """view.apply(<Reference>, *args) should work"""
531 v = self.client[:]
531 v = self.client[:]
532 v.scatter('n', self.client.ids, flatten=True)
532 v.scatter('n', self.client.ids, flatten=True)
533 v.execute("f = lambda x: n*x")
533 v.execute("f = lambda x: n*x")
534 rf = pmod.Reference('f')
534 rf = pmod.Reference('f')
535 result = v.apply_sync(rf, 5)
535 result = v.apply_sync(rf, 5)
536 expected = [ 5*id for id in self.client.ids ]
536 expected = [ 5*id for id in self.client.ids ]
537 self.assertEquals(result, expected)
537 self.assertEquals(result, expected)
538
538
539 def test_eval_reference(self):
539 def test_eval_reference(self):
540 v = self.client[self.client.ids[0]]
540 v = self.client[self.client.ids[0]]
541 v['g'] = range(5)
541 v['g'] = range(5)
542 rg = pmod.Reference('g[0]')
542 rg = pmod.Reference('g[0]')
543 echo = lambda x:x
543 echo = lambda x:x
544 self.assertEquals(v.apply_sync(echo, rg), 0)
544 self.assertEquals(v.apply_sync(echo, rg), 0)
545
545
546 def test_reference_nameerror(self):
546 def test_reference_nameerror(self):
547 v = self.client[self.client.ids[0]]
547 v = self.client[self.client.ids[0]]
548 r = pmod.Reference('elvis_has_left')
548 r = pmod.Reference('elvis_has_left')
549 echo = lambda x:x
549 echo = lambda x:x
550 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
550 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
551
551
552 def test_single_engine_map(self):
552 def test_single_engine_map(self):
553 e0 = self.client[self.client.ids[0]]
553 e0 = self.client[self.client.ids[0]]
554 r = range(5)
554 r = range(5)
555 check = [ -1*i for i in r ]
555 check = [ -1*i for i in r ]
556 result = e0.map_sync(lambda x: -1*x, r)
556 result = e0.map_sync(lambda x: -1*x, r)
557 self.assertEquals(result, check)
557 self.assertEquals(result, check)
558
558
559 def test_len(self):
559 def test_len(self):
560 """len(view) makes sense"""
560 """len(view) makes sense"""
561 e0 = self.client[self.client.ids[0]]
561 e0 = self.client[self.client.ids[0]]
562 yield self.assertEquals(len(e0), 1)
562 yield self.assertEquals(len(e0), 1)
563 v = self.client[:]
563 v = self.client[:]
564 yield self.assertEquals(len(v), len(self.client.ids))
564 yield self.assertEquals(len(v), len(self.client.ids))
565 v = self.client.direct_view('all')
565 v = self.client.direct_view('all')
566 yield self.assertEquals(len(v), len(self.client.ids))
566 yield self.assertEquals(len(v), len(self.client.ids))
567 v = self.client[:2]
567 v = self.client[:2]
568 yield self.assertEquals(len(v), 2)
568 yield self.assertEquals(len(v), 2)
569 v = self.client[:1]
569 v = self.client[:1]
570 yield self.assertEquals(len(v), 1)
570 yield self.assertEquals(len(v), 1)
571 v = self.client.load_balanced_view()
571 v = self.client.load_balanced_view()
572 yield self.assertEquals(len(v), len(self.client.ids))
572 yield self.assertEquals(len(v), len(self.client.ids))
573 # parametric tests seem to require manual closing?
573 # parametric tests seem to require manual closing?
574 self.client.close()
574 self.client.close()
575
575
576
576
577 # begin execute tests
577 # begin execute tests
578 def _wait_for(self, f, timeout=10):
579 tic = time.time()
580 while time.time() <= tic + timeout:
581 if f():
582 return
583 time.sleep(0.1)
584 self.client.spin()
585 if not f():
586 print "Warning: Awaited condition never arrived"
587
578
588
579 def test_execute_reply(self):
589 def test_execute_reply(self):
580 e0 = self.client[self.client.ids[0]]
590 e0 = self.client[self.client.ids[0]]
581 e0.block = True
591 e0.block = True
582 ar = e0.execute("5", silent=False)
592 ar = e0.execute("5", silent=False)
583 er = ar.get()
593 er = ar.get()
584 time.sleep(0.2)
594 self._wait_for(lambda : bool(er.pyout))
585 self.assertEquals(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
595 self.assertEquals(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
586 self.assertEquals(er.pyout['text/plain'], '5')
596 self.assertEquals(er.pyout['text/plain'], '5')
587
597
588 def test_execute_reply_stdout(self):
598 def test_execute_reply_stdout(self):
589 e0 = self.client[self.client.ids[0]]
599 e0 = self.client[self.client.ids[0]]
590 e0.block = True
600 e0.block = True
591 ar = e0.execute("print (5)", silent=False)
601 ar = e0.execute("print (5)", silent=False)
592 er = ar.get()
602 er = ar.get()
593 time.sleep(0.2)
603 self._wait_for(lambda : bool(er.stdout))
594 self.assertEquals(er.stdout.strip(), '5')
604 self.assertEquals(er.stdout.strip(), '5')
595
605
596 def test_execute_pyout(self):
606 def test_execute_pyout(self):
597 """execute triggers pyout with silent=False"""
607 """execute triggers pyout with silent=False"""
598 view = self.client[:]
608 view = self.client[:]
599 ar = view.execute("5", silent=False, block=True)
609 ar = view.execute("5", silent=False, block=True)
600 time.sleep(0.2)
610 self._wait_for(lambda : all(ar.pyout))
611
601 expected = [{'text/plain' : '5'}] * len(view)
612 expected = [{'text/plain' : '5'}] * len(view)
602 self.assertEquals(ar.pyout, expected)
613 self.assertEquals(ar.pyout, expected)
603
614
604 def test_execute_silent(self):
615 def test_execute_silent(self):
605 """execute does not trigger pyout with silent=True"""
616 """execute does not trigger pyout with silent=True"""
606 view = self.client[:]
617 view = self.client[:]
607 ar = view.execute("5", block=True)
618 ar = view.execute("5", block=True)
608 expected = [None] * len(view)
619 expected = [None] * len(view)
609 self.assertEquals(ar.pyout, expected)
620 self.assertEquals(ar.pyout, expected)
610
621
611 def test_execute_magic(self):
622 def test_execute_magic(self):
612 """execute accepts IPython commands"""
623 """execute accepts IPython commands"""
613 view = self.client[:]
624 view = self.client[:]
614 view.execute("a = 5")
625 view.execute("a = 5")
615 ar = view.execute("%whos", block=True)
626 ar = view.execute("%whos", block=True)
616 # this will raise, if that failed
627 # this will raise, if that failed
617 ar.get(5)
628 ar.get(5)
618 time.sleep(0.2)
629 self._wait_for(lambda : all(ar.stdout))
619 for stdout in ar.stdout:
630 for stdout in ar.stdout:
620 lines = stdout.splitlines()
631 lines = stdout.splitlines()
621 self.assertEquals(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
632 self.assertEquals(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
622 found = False
633 found = False
623 for line in lines[2:]:
634 for line in lines[2:]:
624 split = line.split()
635 split = line.split()
625 if split == ['a', 'int', '5']:
636 if split == ['a', 'int', '5']:
626 found = True
637 found = True
627 break
638 break
628 self.assertTrue(found, "whos output wrong: %s" % stdout)
639 self.assertTrue(found, "whos output wrong: %s" % stdout)
629
640
630 def test_execute_displaypub(self):
641 def test_execute_displaypub(self):
631 """execute tracks display_pub output"""
642 """execute tracks display_pub output"""
632 view = self.client[:]
643 view = self.client[:]
633 view.execute("from IPython.core.display import *")
644 view.execute("from IPython.core.display import *")
634 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
645 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
635 time.sleep(0.2)
646
647 self._wait_for(lambda : all(len(er.outputs) >= 5 for er in ar))
636 outs = [ {u'text/plain' : unicode(i)} for i in range(5) ]
648 outs = [ {u'text/plain' : unicode(i)} for i in range(5) ]
637 expected = [outs] * len(view)
649 expected = [outs] * len(view)
638 self.assertEquals(ar.outputs, expected)
650 self.assertEquals(ar.outputs, expected)
639
651
640 def test_apply_displaypub(self):
652 def test_apply_displaypub(self):
641 """apply tracks display_pub output"""
653 """apply tracks display_pub output"""
642 view = self.client[:]
654 view = self.client[:]
643 view.execute("from IPython.core.display import *")
655 view.execute("from IPython.core.display import *")
644
656
645 @interactive
657 @interactive
646 def publish():
658 def publish():
647 [ display(i) for i in range(5) ]
659 [ display(i) for i in range(5) ]
648
660
649 ar = view.apply_async(publish)
661 ar = view.apply_async(publish)
650 ar.get(5)
662 ar.get(5)
651 time.sleep(0.2)
663 self._wait_for(lambda : all(len(out) >= 5 for out in ar.outputs))
652 outs = [ {u'text/plain' : unicode(j)} for j in range(5) ]
664 outs = [ {u'text/plain' : unicode(j)} for j in range(5) ]
653 expected = [outs] * len(view)
665 expected = [outs] * len(view)
654 self.assertEquals(ar.outputs, expected)
666 self.assertEquals(ar.outputs, expected)
655
667
656 def test_execute_raises(self):
668 def test_execute_raises(self):
657 """exceptions in execute requests raise appropriately"""
669 """exceptions in execute requests raise appropriately"""
658 view = self.client[-1]
670 view = self.client[-1]
659 ar = view.execute("1/0")
671 ar = view.execute("1/0")
660 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
672 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
661
673
662 @dec.skipif_not_matplotlib
674 @dec.skipif_not_matplotlib
663 def test_amagic_pylab(self):
675 def test_amagic_pylab(self):
664 """%pylab works on engines"""
676 """%pylab works on engines"""
665 view = self.client[-1]
677 view = self.client[-1]
666 ar = view.execute("%pylab inline")
678 ar = view.execute("%pylab inline")
667 # at least check if this raised:
679 # at least check if this raised:
668 reply = ar.get(5)
680 reply = ar.get(5)
669 # include imports, in case user config
681 # include imports, in case user config
670 ar = view.execute("plot(rand(100))", silent=False)
682 ar = view.execute("plot(rand(100))", silent=False)
671 reply = ar.get(5)
683 reply = ar.get(5)
684 self._wait_for(lambda : all(ar.outputs))
672 self.assertEquals(len(reply.outputs), 1)
685 self.assertEquals(len(reply.outputs), 1)
673 output = reply.outputs[0]
686 output = reply.outputs[0]
674 self.assertTrue("image/png" in output)
687 self.assertTrue("image/png" in output)
675
688
676
689
General Comments 0
You need to be logged in to leave comments. Login now