##// END OF EJS Templates
tag crash tests with @attr('crash')...
MinRK -
Show More
@@ -1,176 +1,177 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test LoadBalancedView objects
2 """test LoadBalancedView objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import time
20 import time
21
21
22 import zmq
22 import zmq
23 from nose import SkipTest
23 from nose import SkipTest
24 from nose.plugins.attrib import attr
24
25
25 from IPython import parallel as pmod
26 from IPython import parallel as pmod
26 from IPython.parallel import error
27 from IPython.parallel import error
27
28
28 from IPython.parallel.tests import add_engines
29 from IPython.parallel.tests import add_engines
29
30
30 from .clienttest import ClusterTestCase, crash, wait, skip_without
31 from .clienttest import ClusterTestCase, crash, wait, skip_without
31
32
32 def setup():
33 def setup():
33 add_engines(3, total=True)
34 add_engines(3, total=True)
34
35
35 class TestLoadBalancedView(ClusterTestCase):
36 class TestLoadBalancedView(ClusterTestCase):
36
37
37 def setUp(self):
38 def setUp(self):
38 ClusterTestCase.setUp(self)
39 ClusterTestCase.setUp(self)
39 self.view = self.client.load_balanced_view()
40 self.view = self.client.load_balanced_view()
40
41
42 @attr('crash')
41 def test_z_crash_task(self):
43 def test_z_crash_task(self):
42 """test graceful handling of engine death (balanced)"""
44 """test graceful handling of engine death (balanced)"""
43 raise SkipTest("crash tests disabled, due to undesirable crash reports")
44 # self.add_engines(1)
45 # self.add_engines(1)
45 ar = self.view.apply_async(crash)
46 ar = self.view.apply_async(crash)
46 self.assertRaisesRemote(error.EngineError, ar.get, 10)
47 self.assertRaisesRemote(error.EngineError, ar.get, 10)
47 eid = ar.engine_id
48 eid = ar.engine_id
48 tic = time.time()
49 tic = time.time()
49 while eid in self.client.ids and time.time()-tic < 5:
50 while eid in self.client.ids and time.time()-tic < 5:
50 time.sleep(.01)
51 time.sleep(.01)
51 self.client.spin()
52 self.client.spin()
52 self.assertFalse(eid in self.client.ids, "Engine should have died")
53 self.assertFalse(eid in self.client.ids, "Engine should have died")
53
54
54 def test_map(self):
55 def test_map(self):
55 def f(x):
56 def f(x):
56 return x**2
57 return x**2
57 data = range(16)
58 data = range(16)
58 r = self.view.map_sync(f, data)
59 r = self.view.map_sync(f, data)
59 self.assertEqual(r, map(f, data))
60 self.assertEqual(r, map(f, data))
60
61
61 def test_map_unordered(self):
62 def test_map_unordered(self):
62 def f(x):
63 def f(x):
63 return x**2
64 return x**2
64 def slow_f(x):
65 def slow_f(x):
65 import time
66 import time
66 time.sleep(0.05*x)
67 time.sleep(0.05*x)
67 return x**2
68 return x**2
68 data = range(16,0,-1)
69 data = range(16,0,-1)
69 reference = map(f, data)
70 reference = map(f, data)
70
71
71 amr = self.view.map_async(slow_f, data, ordered=False)
72 amr = self.view.map_async(slow_f, data, ordered=False)
72 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
73 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
73 # check individual elements, retrieved as they come
74 # check individual elements, retrieved as they come
74 # list comprehension uses __iter__
75 # list comprehension uses __iter__
75 astheycame = [ r for r in amr ]
76 astheycame = [ r for r in amr ]
76 # Ensure that at least one result came out of order:
77 # Ensure that at least one result came out of order:
77 self.assertNotEqual(astheycame, reference, "should not have preserved order")
78 self.assertNotEqual(astheycame, reference, "should not have preserved order")
78 self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")
79 self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")
79
80
80 def test_map_ordered(self):
81 def test_map_ordered(self):
81 def f(x):
82 def f(x):
82 return x**2
83 return x**2
83 def slow_f(x):
84 def slow_f(x):
84 import time
85 import time
85 time.sleep(0.05*x)
86 time.sleep(0.05*x)
86 return x**2
87 return x**2
87 data = range(16,0,-1)
88 data = range(16,0,-1)
88 reference = map(f, data)
89 reference = map(f, data)
89
90
90 amr = self.view.map_async(slow_f, data)
91 amr = self.view.map_async(slow_f, data)
91 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
92 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
92 # check individual elements, retrieved as they come
93 # check individual elements, retrieved as they come
93 # list(amr) uses __iter__
94 # list(amr) uses __iter__
94 astheycame = list(amr)
95 astheycame = list(amr)
95 # Ensure that results came in order
96 # Ensure that results came in order
96 self.assertEqual(astheycame, reference)
97 self.assertEqual(astheycame, reference)
97 self.assertEqual(amr.result, reference)
98 self.assertEqual(amr.result, reference)
98
99
99 def test_map_iterable(self):
100 def test_map_iterable(self):
100 """test map on iterables (balanced)"""
101 """test map on iterables (balanced)"""
101 view = self.view
102 view = self.view
102 # 101 is prime, so it won't be evenly distributed
103 # 101 is prime, so it won't be evenly distributed
103 arr = range(101)
104 arr = range(101)
104 # so that it will be an iterator, even in Python 3
105 # so that it will be an iterator, even in Python 3
105 it = iter(arr)
106 it = iter(arr)
106 r = view.map_sync(lambda x:x, arr)
107 r = view.map_sync(lambda x:x, arr)
107 self.assertEqual(r, list(arr))
108 self.assertEqual(r, list(arr))
108
109
109
110
110 def test_abort(self):
111 def test_abort(self):
111 view = self.view
112 view = self.view
112 ar = self.client[:].apply_async(time.sleep, .5)
113 ar = self.client[:].apply_async(time.sleep, .5)
113 ar = self.client[:].apply_async(time.sleep, .5)
114 ar = self.client[:].apply_async(time.sleep, .5)
114 time.sleep(0.2)
115 time.sleep(0.2)
115 ar2 = view.apply_async(lambda : 2)
116 ar2 = view.apply_async(lambda : 2)
116 ar3 = view.apply_async(lambda : 3)
117 ar3 = view.apply_async(lambda : 3)
117 view.abort(ar2)
118 view.abort(ar2)
118 view.abort(ar3.msg_ids)
119 view.abort(ar3.msg_ids)
119 self.assertRaises(error.TaskAborted, ar2.get)
120 self.assertRaises(error.TaskAborted, ar2.get)
120 self.assertRaises(error.TaskAborted, ar3.get)
121 self.assertRaises(error.TaskAborted, ar3.get)
121
122
122 def test_retries(self):
123 def test_retries(self):
123 view = self.view
124 view = self.view
124 view.timeout = 1 # prevent hang if this doesn't behave
125 view.timeout = 1 # prevent hang if this doesn't behave
125 def fail():
126 def fail():
126 assert False
127 assert False
127 for r in range(len(self.client)-1):
128 for r in range(len(self.client)-1):
128 with view.temp_flags(retries=r):
129 with view.temp_flags(retries=r):
129 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
130 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
130
131
131 with view.temp_flags(retries=len(self.client), timeout=0.25):
132 with view.temp_flags(retries=len(self.client), timeout=0.25):
132 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
133 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
133
134
134 def test_invalid_dependency(self):
135 def test_invalid_dependency(self):
135 view = self.view
136 view = self.view
136 with view.temp_flags(after='12345'):
137 with view.temp_flags(after='12345'):
137 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
138 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
138
139
139 def test_impossible_dependency(self):
140 def test_impossible_dependency(self):
140 self.minimum_engines(2)
141 self.minimum_engines(2)
141 view = self.client.load_balanced_view()
142 view = self.client.load_balanced_view()
142 ar1 = view.apply_async(lambda : 1)
143 ar1 = view.apply_async(lambda : 1)
143 ar1.get()
144 ar1.get()
144 e1 = ar1.engine_id
145 e1 = ar1.engine_id
145 e2 = e1
146 e2 = e1
146 while e2 == e1:
147 while e2 == e1:
147 ar2 = view.apply_async(lambda : 1)
148 ar2 = view.apply_async(lambda : 1)
148 ar2.get()
149 ar2.get()
149 e2 = ar2.engine_id
150 e2 = ar2.engine_id
150
151
151 with view.temp_flags(follow=[ar1, ar2]):
152 with view.temp_flags(follow=[ar1, ar2]):
152 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
153 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
153
154
154
155
155 def test_follow(self):
156 def test_follow(self):
156 ar = self.view.apply_async(lambda : 1)
157 ar = self.view.apply_async(lambda : 1)
157 ar.get()
158 ar.get()
158 ars = []
159 ars = []
159 first_id = ar.engine_id
160 first_id = ar.engine_id
160
161
161 self.view.follow = ar
162 self.view.follow = ar
162 for i in range(5):
163 for i in range(5):
163 ars.append(self.view.apply_async(lambda : 1))
164 ars.append(self.view.apply_async(lambda : 1))
164 self.view.wait(ars)
165 self.view.wait(ars)
165 for ar in ars:
166 for ar in ars:
166 self.assertEqual(ar.engine_id, first_id)
167 self.assertEqual(ar.engine_id, first_id)
167
168
168 def test_after(self):
169 def test_after(self):
169 view = self.view
170 view = self.view
170 ar = view.apply_async(time.sleep, 0.5)
171 ar = view.apply_async(time.sleep, 0.5)
171 with view.temp_flags(after=ar):
172 with view.temp_flags(after=ar):
172 ar2 = view.apply_async(lambda : 1)
173 ar2 = view.apply_async(lambda : 1)
173
174
174 ar.wait()
175 ar.wait()
175 ar2.wait()
176 ar2.wait()
176 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
177 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
@@ -1,678 +1,679 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test View objects
2 """test View objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import platform
20 import platform
21 import time
21 import time
22 from tempfile import mktemp
22 from tempfile import mktemp
23 from StringIO import StringIO
23 from StringIO import StringIO
24
24
25 import zmq
25 import zmq
26 from nose import SkipTest
26 from nose import SkipTest
27 from nose.plugins.attrib import attr
27
28
28 from IPython.testing import decorators as dec
29 from IPython.testing import decorators as dec
29 from IPython.testing.ipunittest import ParametricTestCase
30 from IPython.testing.ipunittest import ParametricTestCase
30
31
31 from IPython import parallel as pmod
32 from IPython import parallel as pmod
32 from IPython.parallel import error
33 from IPython.parallel import error
33 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
34 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
34 from IPython.parallel import DirectView
35 from IPython.parallel import DirectView
35 from IPython.parallel.util import interactive
36 from IPython.parallel.util import interactive
36
37
37 from IPython.parallel.tests import add_engines
38 from IPython.parallel.tests import add_engines
38
39
39 from .clienttest import ClusterTestCase, crash, wait, skip_without
40 from .clienttest import ClusterTestCase, crash, wait, skip_without
40
41
41 def setup():
42 def setup():
42 add_engines(3, total=True)
43 add_engines(3, total=True)
43
44
44 class TestView(ClusterTestCase, ParametricTestCase):
45 class TestView(ClusterTestCase, ParametricTestCase):
45
46
46 def setUp(self):
47 def setUp(self):
47 # On Win XP, wait for resource cleanup, else parallel test group fails
48 # On Win XP, wait for resource cleanup, else parallel test group fails
48 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
49 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
49 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
50 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
50 time.sleep(2)
51 time.sleep(2)
51 super(TestView, self).setUp()
52 super(TestView, self).setUp()
52
53
54 @attr('crash')
53 def test_z_crash_mux(self):
55 def test_z_crash_mux(self):
54 """test graceful handling of engine death (direct)"""
56 """test graceful handling of engine death (direct)"""
55 raise SkipTest("crash tests disabled, due to undesirable crash reports")
56 # self.add_engines(1)
57 # self.add_engines(1)
57 eid = self.client.ids[-1]
58 eid = self.client.ids[-1]
58 ar = self.client[eid].apply_async(crash)
59 ar = self.client[eid].apply_async(crash)
59 self.assertRaisesRemote(error.EngineError, ar.get, 10)
60 self.assertRaisesRemote(error.EngineError, ar.get, 10)
60 eid = ar.engine_id
61 eid = ar.engine_id
61 tic = time.time()
62 tic = time.time()
62 while eid in self.client.ids and time.time()-tic < 5:
63 while eid in self.client.ids and time.time()-tic < 5:
63 time.sleep(.01)
64 time.sleep(.01)
64 self.client.spin()
65 self.client.spin()
65 self.assertFalse(eid in self.client.ids, "Engine should have died")
66 self.assertFalse(eid in self.client.ids, "Engine should have died")
66
67
67 def test_push_pull(self):
68 def test_push_pull(self):
68 """test pushing and pulling"""
69 """test pushing and pulling"""
69 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
70 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
70 t = self.client.ids[-1]
71 t = self.client.ids[-1]
71 v = self.client[t]
72 v = self.client[t]
72 push = v.push
73 push = v.push
73 pull = v.pull
74 pull = v.pull
74 v.block=True
75 v.block=True
75 nengines = len(self.client)
76 nengines = len(self.client)
76 push({'data':data})
77 push({'data':data})
77 d = pull('data')
78 d = pull('data')
78 self.assertEqual(d, data)
79 self.assertEqual(d, data)
79 self.client[:].push({'data':data})
80 self.client[:].push({'data':data})
80 d = self.client[:].pull('data', block=True)
81 d = self.client[:].pull('data', block=True)
81 self.assertEqual(d, nengines*[data])
82 self.assertEqual(d, nengines*[data])
82 ar = push({'data':data}, block=False)
83 ar = push({'data':data}, block=False)
83 self.assertTrue(isinstance(ar, AsyncResult))
84 self.assertTrue(isinstance(ar, AsyncResult))
84 r = ar.get()
85 r = ar.get()
85 ar = self.client[:].pull('data', block=False)
86 ar = self.client[:].pull('data', block=False)
86 self.assertTrue(isinstance(ar, AsyncResult))
87 self.assertTrue(isinstance(ar, AsyncResult))
87 r = ar.get()
88 r = ar.get()
88 self.assertEqual(r, nengines*[data])
89 self.assertEqual(r, nengines*[data])
89 self.client[:].push(dict(a=10,b=20))
90 self.client[:].push(dict(a=10,b=20))
90 r = self.client[:].pull(('a','b'), block=True)
91 r = self.client[:].pull(('a','b'), block=True)
91 self.assertEqual(r, nengines*[[10,20]])
92 self.assertEqual(r, nengines*[[10,20]])
92
93
93 def test_push_pull_function(self):
94 def test_push_pull_function(self):
94 "test pushing and pulling functions"
95 "test pushing and pulling functions"
95 def testf(x):
96 def testf(x):
96 return 2.0*x
97 return 2.0*x
97
98
98 t = self.client.ids[-1]
99 t = self.client.ids[-1]
99 v = self.client[t]
100 v = self.client[t]
100 v.block=True
101 v.block=True
101 push = v.push
102 push = v.push
102 pull = v.pull
103 pull = v.pull
103 execute = v.execute
104 execute = v.execute
104 push({'testf':testf})
105 push({'testf':testf})
105 r = pull('testf')
106 r = pull('testf')
106 self.assertEqual(r(1.0), testf(1.0))
107 self.assertEqual(r(1.0), testf(1.0))
107 execute('r = testf(10)')
108 execute('r = testf(10)')
108 r = pull('r')
109 r = pull('r')
109 self.assertEqual(r, testf(10))
110 self.assertEqual(r, testf(10))
110 ar = self.client[:].push({'testf':testf}, block=False)
111 ar = self.client[:].push({'testf':testf}, block=False)
111 ar.get()
112 ar.get()
112 ar = self.client[:].pull('testf', block=False)
113 ar = self.client[:].pull('testf', block=False)
113 rlist = ar.get()
114 rlist = ar.get()
114 for r in rlist:
115 for r in rlist:
115 self.assertEqual(r(1.0), testf(1.0))
116 self.assertEqual(r(1.0), testf(1.0))
116 execute("def g(x): return x*x")
117 execute("def g(x): return x*x")
117 r = pull(('testf','g'))
118 r = pull(('testf','g'))
118 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
119 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
119
120
120 def test_push_function_globals(self):
121 def test_push_function_globals(self):
121 """test that pushed functions have access to globals"""
122 """test that pushed functions have access to globals"""
122 @interactive
123 @interactive
123 def geta():
124 def geta():
124 return a
125 return a
125 # self.add_engines(1)
126 # self.add_engines(1)
126 v = self.client[-1]
127 v = self.client[-1]
127 v.block=True
128 v.block=True
128 v['f'] = geta
129 v['f'] = geta
129 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
130 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
130 v.execute('a=5')
131 v.execute('a=5')
131 v.execute('b=f()')
132 v.execute('b=f()')
132 self.assertEqual(v['b'], 5)
133 self.assertEqual(v['b'], 5)
133
134
134 def test_push_function_defaults(self):
135 def test_push_function_defaults(self):
135 """test that pushed functions preserve default args"""
136 """test that pushed functions preserve default args"""
136 def echo(a=10):
137 def echo(a=10):
137 return a
138 return a
138 v = self.client[-1]
139 v = self.client[-1]
139 v.block=True
140 v.block=True
140 v['f'] = echo
141 v['f'] = echo
141 v.execute('b=f()')
142 v.execute('b=f()')
142 self.assertEqual(v['b'], 10)
143 self.assertEqual(v['b'], 10)
143
144
144 def test_get_result(self):
145 def test_get_result(self):
145 """test getting results from the Hub."""
146 """test getting results from the Hub."""
146 c = pmod.Client(profile='iptest')
147 c = pmod.Client(profile='iptest')
147 # self.add_engines(1)
148 # self.add_engines(1)
148 t = c.ids[-1]
149 t = c.ids[-1]
149 v = c[t]
150 v = c[t]
150 v2 = self.client[t]
151 v2 = self.client[t]
151 ar = v.apply_async(wait, 1)
152 ar = v.apply_async(wait, 1)
152 # give the monitor time to notice the message
153 # give the monitor time to notice the message
153 time.sleep(.25)
154 time.sleep(.25)
154 ahr = v2.get_result(ar.msg_ids)
155 ahr = v2.get_result(ar.msg_ids)
155 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertEqual(ahr.get(), ar.get())
157 self.assertEqual(ahr.get(), ar.get())
157 ar2 = v2.get_result(ar.msg_ids)
158 ar2 = v2.get_result(ar.msg_ids)
158 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 c.spin()
160 c.spin()
160 c.close()
161 c.close()
161
162
162 def test_run_newline(self):
163 def test_run_newline(self):
163 """test that run appends newline to files"""
164 """test that run appends newline to files"""
164 tmpfile = mktemp()
165 tmpfile = mktemp()
165 with open(tmpfile, 'w') as f:
166 with open(tmpfile, 'w') as f:
166 f.write("""def g():
167 f.write("""def g():
167 return 5
168 return 5
168 """)
169 """)
169 v = self.client[-1]
170 v = self.client[-1]
170 v.run(tmpfile, block=True)
171 v.run(tmpfile, block=True)
171 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
172 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
172
173
173 def test_apply_tracked(self):
174 def test_apply_tracked(self):
174 """test tracking for apply"""
175 """test tracking for apply"""
175 # self.add_engines(1)
176 # self.add_engines(1)
176 t = self.client.ids[-1]
177 t = self.client.ids[-1]
177 v = self.client[t]
178 v = self.client[t]
178 v.block=False
179 v.block=False
179 def echo(n=1024*1024, **kwargs):
180 def echo(n=1024*1024, **kwargs):
180 with v.temp_flags(**kwargs):
181 with v.temp_flags(**kwargs):
181 return v.apply(lambda x: x, 'x'*n)
182 return v.apply(lambda x: x, 'x'*n)
182 ar = echo(1, track=False)
183 ar = echo(1, track=False)
183 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
184 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
184 self.assertTrue(ar.sent)
185 self.assertTrue(ar.sent)
185 ar = echo(track=True)
186 ar = echo(track=True)
186 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
187 self.assertEqual(ar.sent, ar._tracker.done)
188 self.assertEqual(ar.sent, ar._tracker.done)
188 ar._tracker.wait()
189 ar._tracker.wait()
189 self.assertTrue(ar.sent)
190 self.assertTrue(ar.sent)
190
191
191 def test_push_tracked(self):
192 def test_push_tracked(self):
192 t = self.client.ids[-1]
193 t = self.client.ids[-1]
193 ns = dict(x='x'*1024*1024)
194 ns = dict(x='x'*1024*1024)
194 v = self.client[t]
195 v = self.client[t]
195 ar = v.push(ns, block=False, track=False)
196 ar = v.push(ns, block=False, track=False)
196 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
197 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
197 self.assertTrue(ar.sent)
198 self.assertTrue(ar.sent)
198
199
199 ar = v.push(ns, block=False, track=True)
200 ar = v.push(ns, block=False, track=True)
200 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
201 ar._tracker.wait()
202 ar._tracker.wait()
202 self.assertEqual(ar.sent, ar._tracker.done)
203 self.assertEqual(ar.sent, ar._tracker.done)
203 self.assertTrue(ar.sent)
204 self.assertTrue(ar.sent)
204 ar.get()
205 ar.get()
205
206
206 def test_scatter_tracked(self):
207 def test_scatter_tracked(self):
207 t = self.client.ids
208 t = self.client.ids
208 x='x'*1024*1024
209 x='x'*1024*1024
209 ar = self.client[t].scatter('x', x, block=False, track=False)
210 ar = self.client[t].scatter('x', x, block=False, track=False)
210 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
211 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
211 self.assertTrue(ar.sent)
212 self.assertTrue(ar.sent)
212
213
213 ar = self.client[t].scatter('x', x, block=False, track=True)
214 ar = self.client[t].scatter('x', x, block=False, track=True)
214 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
215 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
215 self.assertEqual(ar.sent, ar._tracker.done)
216 self.assertEqual(ar.sent, ar._tracker.done)
216 ar._tracker.wait()
217 ar._tracker.wait()
217 self.assertTrue(ar.sent)
218 self.assertTrue(ar.sent)
218 ar.get()
219 ar.get()
219
220
220 def test_remote_reference(self):
221 def test_remote_reference(self):
221 v = self.client[-1]
222 v = self.client[-1]
222 v['a'] = 123
223 v['a'] = 123
223 ra = pmod.Reference('a')
224 ra = pmod.Reference('a')
224 b = v.apply_sync(lambda x: x, ra)
225 b = v.apply_sync(lambda x: x, ra)
225 self.assertEqual(b, 123)
226 self.assertEqual(b, 123)
226
227
227
228
228 def test_scatter_gather(self):
229 def test_scatter_gather(self):
229 view = self.client[:]
230 view = self.client[:]
230 seq1 = range(16)
231 seq1 = range(16)
231 view.scatter('a', seq1)
232 view.scatter('a', seq1)
232 seq2 = view.gather('a', block=True)
233 seq2 = view.gather('a', block=True)
233 self.assertEqual(seq2, seq1)
234 self.assertEqual(seq2, seq1)
234 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
235 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
235
236
236 @skip_without('numpy')
237 @skip_without('numpy')
237 def test_scatter_gather_numpy(self):
238 def test_scatter_gather_numpy(self):
238 import numpy
239 import numpy
239 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
240 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
240 view = self.client[:]
241 view = self.client[:]
241 a = numpy.arange(64)
242 a = numpy.arange(64)
242 view.scatter('a', a, block=True)
243 view.scatter('a', a, block=True)
243 b = view.gather('a', block=True)
244 b = view.gather('a', block=True)
244 assert_array_equal(b, a)
245 assert_array_equal(b, a)
245
246
246 def test_scatter_gather_lazy(self):
247 def test_scatter_gather_lazy(self):
247 """scatter/gather with targets='all'"""
248 """scatter/gather with targets='all'"""
248 view = self.client.direct_view(targets='all')
249 view = self.client.direct_view(targets='all')
249 x = range(64)
250 x = range(64)
250 view.scatter('x', x)
251 view.scatter('x', x)
251 gathered = view.gather('x', block=True)
252 gathered = view.gather('x', block=True)
252 self.assertEqual(gathered, x)
253 self.assertEqual(gathered, x)
253
254
254
255
255 @dec.known_failure_py3
256 @dec.known_failure_py3
256 @skip_without('numpy')
257 @skip_without('numpy')
257 def test_push_numpy_nocopy(self):
258 def test_push_numpy_nocopy(self):
258 import numpy
259 import numpy
259 view = self.client[:]
260 view = self.client[:]
260 a = numpy.arange(64)
261 a = numpy.arange(64)
261 view['A'] = a
262 view['A'] = a
262 @interactive
263 @interactive
263 def check_writeable(x):
264 def check_writeable(x):
264 return x.flags.writeable
265 return x.flags.writeable
265
266
266 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
267 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
267 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
268 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
268
269
269 view.push(dict(B=a))
270 view.push(dict(B=a))
270 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
271 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
271 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
272 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
272
273
273 @skip_without('numpy')
274 @skip_without('numpy')
274 def test_apply_numpy(self):
275 def test_apply_numpy(self):
275 """view.apply(f, ndarray)"""
276 """view.apply(f, ndarray)"""
276 import numpy
277 import numpy
277 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
278 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
278
279
279 A = numpy.random.random((100,100))
280 A = numpy.random.random((100,100))
280 view = self.client[-1]
281 view = self.client[-1]
281 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
282 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
282 B = A.astype(dt)
283 B = A.astype(dt)
283 C = view.apply_sync(lambda x:x, B)
284 C = view.apply_sync(lambda x:x, B)
284 assert_array_equal(B,C)
285 assert_array_equal(B,C)
285
286
286 @skip_without('numpy')
287 @skip_without('numpy')
287 def test_push_pull_recarray(self):
288 def test_push_pull_recarray(self):
288 """push/pull recarrays"""
289 """push/pull recarrays"""
289 import numpy
290 import numpy
290 from numpy.testing.utils import assert_array_equal
291 from numpy.testing.utils import assert_array_equal
291
292
292 view = self.client[-1]
293 view = self.client[-1]
293
294
294 R = numpy.array([
295 R = numpy.array([
295 (1, 'hi', 0.),
296 (1, 'hi', 0.),
296 (2**30, 'there', 2.5),
297 (2**30, 'there', 2.5),
297 (-99999, 'world', -12345.6789),
298 (-99999, 'world', -12345.6789),
298 ], [('n', int), ('s', '|S10'), ('f', float)])
299 ], [('n', int), ('s', '|S10'), ('f', float)])
299
300
300 view['RR'] = R
301 view['RR'] = R
301 R2 = view['RR']
302 R2 = view['RR']
302
303
303 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
304 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
304 self.assertEqual(r_dtype, R.dtype)
305 self.assertEqual(r_dtype, R.dtype)
305 self.assertEqual(r_shape, R.shape)
306 self.assertEqual(r_shape, R.shape)
306 self.assertEqual(R2.dtype, R.dtype)
307 self.assertEqual(R2.dtype, R.dtype)
307 self.assertEqual(R2.shape, R.shape)
308 self.assertEqual(R2.shape, R.shape)
308 assert_array_equal(R2, R)
309 assert_array_equal(R2, R)
309
310
310 def test_map(self):
311 def test_map(self):
311 view = self.client[:]
312 view = self.client[:]
312 def f(x):
313 def f(x):
313 return x**2
314 return x**2
314 data = range(16)
315 data = range(16)
315 r = view.map_sync(f, data)
316 r = view.map_sync(f, data)
316 self.assertEqual(r, map(f, data))
317 self.assertEqual(r, map(f, data))
317
318
318 def test_map_iterable(self):
319 def test_map_iterable(self):
319 """test map on iterables (direct)"""
320 """test map on iterables (direct)"""
320 view = self.client[:]
321 view = self.client[:]
321 # 101 is prime, so it won't be evenly distributed
322 # 101 is prime, so it won't be evenly distributed
322 arr = range(101)
323 arr = range(101)
323 # ensure it will be an iterator, even in Python 3
324 # ensure it will be an iterator, even in Python 3
324 it = iter(arr)
325 it = iter(arr)
325 r = view.map_sync(lambda x:x, arr)
326 r = view.map_sync(lambda x:x, arr)
326 self.assertEqual(r, list(arr))
327 self.assertEqual(r, list(arr))
327
328
328 def test_scatter_gather_nonblocking(self):
329 def test_scatter_gather_nonblocking(self):
329 data = range(16)
330 data = range(16)
330 view = self.client[:]
331 view = self.client[:]
331 view.scatter('a', data, block=False)
332 view.scatter('a', data, block=False)
332 ar = view.gather('a', block=False)
333 ar = view.gather('a', block=False)
333 self.assertEqual(ar.get(), data)
334 self.assertEqual(ar.get(), data)
334
335
335 @skip_without('numpy')
336 @skip_without('numpy')
336 def test_scatter_gather_numpy_nonblocking(self):
337 def test_scatter_gather_numpy_nonblocking(self):
337 import numpy
338 import numpy
338 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
339 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
339 a = numpy.arange(64)
340 a = numpy.arange(64)
340 view = self.client[:]
341 view = self.client[:]
341 ar = view.scatter('a', a, block=False)
342 ar = view.scatter('a', a, block=False)
342 self.assertTrue(isinstance(ar, AsyncResult))
343 self.assertTrue(isinstance(ar, AsyncResult))
343 amr = view.gather('a', block=False)
344 amr = view.gather('a', block=False)
344 self.assertTrue(isinstance(amr, AsyncMapResult))
345 self.assertTrue(isinstance(amr, AsyncMapResult))
345 assert_array_equal(amr.get(), a)
346 assert_array_equal(amr.get(), a)
346
347
347 def test_execute(self):
348 def test_execute(self):
348 view = self.client[:]
349 view = self.client[:]
349 # self.client.debug=True
350 # self.client.debug=True
350 execute = view.execute
351 execute = view.execute
351 ar = execute('c=30', block=False)
352 ar = execute('c=30', block=False)
352 self.assertTrue(isinstance(ar, AsyncResult))
353 self.assertTrue(isinstance(ar, AsyncResult))
353 ar = execute('d=[0,1,2]', block=False)
354 ar = execute('d=[0,1,2]', block=False)
354 self.client.wait(ar, 1)
355 self.client.wait(ar, 1)
355 self.assertEqual(len(ar.get()), len(self.client))
356 self.assertEqual(len(ar.get()), len(self.client))
356 for c in view['c']:
357 for c in view['c']:
357 self.assertEqual(c, 30)
358 self.assertEqual(c, 30)
358
359
359 def test_abort(self):
360 def test_abort(self):
360 view = self.client[-1]
361 view = self.client[-1]
361 ar = view.execute('import time; time.sleep(1)', block=False)
362 ar = view.execute('import time; time.sleep(1)', block=False)
362 ar2 = view.apply_async(lambda : 2)
363 ar2 = view.apply_async(lambda : 2)
363 ar3 = view.apply_async(lambda : 3)
364 ar3 = view.apply_async(lambda : 3)
364 view.abort(ar2)
365 view.abort(ar2)
365 view.abort(ar3.msg_ids)
366 view.abort(ar3.msg_ids)
366 self.assertRaises(error.TaskAborted, ar2.get)
367 self.assertRaises(error.TaskAborted, ar2.get)
367 self.assertRaises(error.TaskAborted, ar3.get)
368 self.assertRaises(error.TaskAborted, ar3.get)
368
369
369 def test_abort_all(self):
370 def test_abort_all(self):
370 """view.abort() aborts all outstanding tasks"""
371 """view.abort() aborts all outstanding tasks"""
371 view = self.client[-1]
372 view = self.client[-1]
372 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
373 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
373 view.abort()
374 view.abort()
374 view.wait(timeout=5)
375 view.wait(timeout=5)
375 for ar in ars[5:]:
376 for ar in ars[5:]:
376 self.assertRaises(error.TaskAborted, ar.get)
377 self.assertRaises(error.TaskAborted, ar.get)
377
378
378 def test_temp_flags(self):
379 def test_temp_flags(self):
379 view = self.client[-1]
380 view = self.client[-1]
380 view.block=True
381 view.block=True
381 with view.temp_flags(block=False):
382 with view.temp_flags(block=False):
382 self.assertFalse(view.block)
383 self.assertFalse(view.block)
383 self.assertTrue(view.block)
384 self.assertTrue(view.block)
384
385
385 @dec.known_failure_py3
386 @dec.known_failure_py3
386 def test_importer(self):
387 def test_importer(self):
387 view = self.client[-1]
388 view = self.client[-1]
388 view.clear(block=True)
389 view.clear(block=True)
389 with view.importer:
390 with view.importer:
390 import re
391 import re
391
392
392 @interactive
393 @interactive
393 def findall(pat, s):
394 def findall(pat, s):
394 # this globals() step isn't necessary in real code
395 # this globals() step isn't necessary in real code
395 # only to prevent a closure in the test
396 # only to prevent a closure in the test
396 re = globals()['re']
397 re = globals()['re']
397 return re.findall(pat, s)
398 return re.findall(pat, s)
398
399
399 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
400 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
400
401
401 def test_unicode_execute(self):
402 def test_unicode_execute(self):
402 """test executing unicode strings"""
403 """test executing unicode strings"""
403 v = self.client[-1]
404 v = self.client[-1]
404 v.block=True
405 v.block=True
405 if sys.version_info[0] >= 3:
406 if sys.version_info[0] >= 3:
406 code="a='é'"
407 code="a='é'"
407 else:
408 else:
408 code=u"a=u'é'"
409 code=u"a=u'é'"
409 v.execute(code)
410 v.execute(code)
410 self.assertEqual(v['a'], u'é')
411 self.assertEqual(v['a'], u'é')
411
412
412 def test_unicode_apply_result(self):
413 def test_unicode_apply_result(self):
413 """test unicode apply results"""
414 """test unicode apply results"""
414 v = self.client[-1]
415 v = self.client[-1]
415 r = v.apply_sync(lambda : u'é')
416 r = v.apply_sync(lambda : u'é')
416 self.assertEqual(r, u'é')
417 self.assertEqual(r, u'é')
417
418
418 def test_unicode_apply_arg(self):
419 def test_unicode_apply_arg(self):
419 """test passing unicode arguments to apply"""
420 """test passing unicode arguments to apply"""
420 v = self.client[-1]
421 v = self.client[-1]
421
422
422 @interactive
423 @interactive
423 def check_unicode(a, check):
424 def check_unicode(a, check):
424 assert isinstance(a, unicode), "%r is not unicode"%a
425 assert isinstance(a, unicode), "%r is not unicode"%a
425 assert isinstance(check, bytes), "%r is not bytes"%check
426 assert isinstance(check, bytes), "%r is not bytes"%check
426 assert a.encode('utf8') == check, "%s != %s"%(a,check)
427 assert a.encode('utf8') == check, "%s != %s"%(a,check)
427
428
428 for s in [ u'é', u'ßø®∫',u'asdf' ]:
429 for s in [ u'é', u'ßø®∫',u'asdf' ]:
429 try:
430 try:
430 v.apply_sync(check_unicode, s, s.encode('utf8'))
431 v.apply_sync(check_unicode, s, s.encode('utf8'))
431 except error.RemoteError as e:
432 except error.RemoteError as e:
432 if e.ename == 'AssertionError':
433 if e.ename == 'AssertionError':
433 self.fail(e.evalue)
434 self.fail(e.evalue)
434 else:
435 else:
435 raise e
436 raise e
436
437
437 def test_map_reference(self):
438 def test_map_reference(self):
438 """view.map(<Reference>, *seqs) should work"""
439 """view.map(<Reference>, *seqs) should work"""
439 v = self.client[:]
440 v = self.client[:]
440 v.scatter('n', self.client.ids, flatten=True)
441 v.scatter('n', self.client.ids, flatten=True)
441 v.execute("f = lambda x,y: x*y")
442 v.execute("f = lambda x,y: x*y")
442 rf = pmod.Reference('f')
443 rf = pmod.Reference('f')
443 nlist = list(range(10))
444 nlist = list(range(10))
444 mlist = nlist[::-1]
445 mlist = nlist[::-1]
445 expected = [ m*n for m,n in zip(mlist, nlist) ]
446 expected = [ m*n for m,n in zip(mlist, nlist) ]
446 result = v.map_sync(rf, mlist, nlist)
447 result = v.map_sync(rf, mlist, nlist)
447 self.assertEqual(result, expected)
448 self.assertEqual(result, expected)
448
449
449 def test_apply_reference(self):
450 def test_apply_reference(self):
450 """view.apply(<Reference>, *args) should work"""
451 """view.apply(<Reference>, *args) should work"""
451 v = self.client[:]
452 v = self.client[:]
452 v.scatter('n', self.client.ids, flatten=True)
453 v.scatter('n', self.client.ids, flatten=True)
453 v.execute("f = lambda x: n*x")
454 v.execute("f = lambda x: n*x")
454 rf = pmod.Reference('f')
455 rf = pmod.Reference('f')
455 result = v.apply_sync(rf, 5)
456 result = v.apply_sync(rf, 5)
456 expected = [ 5*id for id in self.client.ids ]
457 expected = [ 5*id for id in self.client.ids ]
457 self.assertEqual(result, expected)
458 self.assertEqual(result, expected)
458
459
459 def test_eval_reference(self):
460 def test_eval_reference(self):
460 v = self.client[self.client.ids[0]]
461 v = self.client[self.client.ids[0]]
461 v['g'] = range(5)
462 v['g'] = range(5)
462 rg = pmod.Reference('g[0]')
463 rg = pmod.Reference('g[0]')
463 echo = lambda x:x
464 echo = lambda x:x
464 self.assertEqual(v.apply_sync(echo, rg), 0)
465 self.assertEqual(v.apply_sync(echo, rg), 0)
465
466
466 def test_reference_nameerror(self):
467 def test_reference_nameerror(self):
467 v = self.client[self.client.ids[0]]
468 v = self.client[self.client.ids[0]]
468 r = pmod.Reference('elvis_has_left')
469 r = pmod.Reference('elvis_has_left')
469 echo = lambda x:x
470 echo = lambda x:x
470 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
471 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
471
472
472 def test_single_engine_map(self):
473 def test_single_engine_map(self):
473 e0 = self.client[self.client.ids[0]]
474 e0 = self.client[self.client.ids[0]]
474 r = range(5)
475 r = range(5)
475 check = [ -1*i for i in r ]
476 check = [ -1*i for i in r ]
476 result = e0.map_sync(lambda x: -1*x, r)
477 result = e0.map_sync(lambda x: -1*x, r)
477 self.assertEqual(result, check)
478 self.assertEqual(result, check)
478
479
479 def test_len(self):
480 def test_len(self):
480 """len(view) makes sense"""
481 """len(view) makes sense"""
481 e0 = self.client[self.client.ids[0]]
482 e0 = self.client[self.client.ids[0]]
482 yield self.assertEqual(len(e0), 1)
483 yield self.assertEqual(len(e0), 1)
483 v = self.client[:]
484 v = self.client[:]
484 yield self.assertEqual(len(v), len(self.client.ids))
485 yield self.assertEqual(len(v), len(self.client.ids))
485 v = self.client.direct_view('all')
486 v = self.client.direct_view('all')
486 yield self.assertEqual(len(v), len(self.client.ids))
487 yield self.assertEqual(len(v), len(self.client.ids))
487 v = self.client[:2]
488 v = self.client[:2]
488 yield self.assertEqual(len(v), 2)
489 yield self.assertEqual(len(v), 2)
489 v = self.client[:1]
490 v = self.client[:1]
490 yield self.assertEqual(len(v), 1)
491 yield self.assertEqual(len(v), 1)
491 v = self.client.load_balanced_view()
492 v = self.client.load_balanced_view()
492 yield self.assertEqual(len(v), len(self.client.ids))
493 yield self.assertEqual(len(v), len(self.client.ids))
493 # parametric tests seem to require manual closing?
494 # parametric tests seem to require manual closing?
494 self.client.close()
495 self.client.close()
495
496
496
497
497 # begin execute tests
498 # begin execute tests
498
499
499 def test_execute_reply(self):
500 def test_execute_reply(self):
500 e0 = self.client[self.client.ids[0]]
501 e0 = self.client[self.client.ids[0]]
501 e0.block = True
502 e0.block = True
502 ar = e0.execute("5", silent=False)
503 ar = e0.execute("5", silent=False)
503 er = ar.get()
504 er = ar.get()
504 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
505 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
505 self.assertEqual(er.pyout['data']['text/plain'], '5')
506 self.assertEqual(er.pyout['data']['text/plain'], '5')
506
507
507 def test_execute_reply_stdout(self):
508 def test_execute_reply_stdout(self):
508 e0 = self.client[self.client.ids[0]]
509 e0 = self.client[self.client.ids[0]]
509 e0.block = True
510 e0.block = True
510 ar = e0.execute("print (5)", silent=False)
511 ar = e0.execute("print (5)", silent=False)
511 er = ar.get()
512 er = ar.get()
512 self.assertEqual(er.stdout.strip(), '5')
513 self.assertEqual(er.stdout.strip(), '5')
513
514
514 def test_execute_pyout(self):
515 def test_execute_pyout(self):
515 """execute triggers pyout with silent=False"""
516 """execute triggers pyout with silent=False"""
516 view = self.client[:]
517 view = self.client[:]
517 ar = view.execute("5", silent=False, block=True)
518 ar = view.execute("5", silent=False, block=True)
518
519
519 expected = [{'text/plain' : '5'}] * len(view)
520 expected = [{'text/plain' : '5'}] * len(view)
520 mimes = [ out['data'] for out in ar.pyout ]
521 mimes = [ out['data'] for out in ar.pyout ]
521 self.assertEqual(mimes, expected)
522 self.assertEqual(mimes, expected)
522
523
523 def test_execute_silent(self):
524 def test_execute_silent(self):
524 """execute does not trigger pyout with silent=True"""
525 """execute does not trigger pyout with silent=True"""
525 view = self.client[:]
526 view = self.client[:]
526 ar = view.execute("5", block=True)
527 ar = view.execute("5", block=True)
527 expected = [None] * len(view)
528 expected = [None] * len(view)
528 self.assertEqual(ar.pyout, expected)
529 self.assertEqual(ar.pyout, expected)
529
530
530 def test_execute_magic(self):
531 def test_execute_magic(self):
531 """execute accepts IPython commands"""
532 """execute accepts IPython commands"""
532 view = self.client[:]
533 view = self.client[:]
533 view.execute("a = 5")
534 view.execute("a = 5")
534 ar = view.execute("%whos", block=True)
535 ar = view.execute("%whos", block=True)
535 # this will raise, if that failed
536 # this will raise, if that failed
536 ar.get(5)
537 ar.get(5)
537 for stdout in ar.stdout:
538 for stdout in ar.stdout:
538 lines = stdout.splitlines()
539 lines = stdout.splitlines()
539 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
540 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
540 found = False
541 found = False
541 for line in lines[2:]:
542 for line in lines[2:]:
542 split = line.split()
543 split = line.split()
543 if split == ['a', 'int', '5']:
544 if split == ['a', 'int', '5']:
544 found = True
545 found = True
545 break
546 break
546 self.assertTrue(found, "whos output wrong: %s" % stdout)
547 self.assertTrue(found, "whos output wrong: %s" % stdout)
547
548
548 def test_execute_displaypub(self):
549 def test_execute_displaypub(self):
549 """execute tracks display_pub output"""
550 """execute tracks display_pub output"""
550 view = self.client[:]
551 view = self.client[:]
551 view.execute("from IPython.core.display import *")
552 view.execute("from IPython.core.display import *")
552 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
553 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
553
554
554 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
555 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
555 for outputs in ar.outputs:
556 for outputs in ar.outputs:
556 mimes = [ out['data'] for out in outputs ]
557 mimes = [ out['data'] for out in outputs ]
557 self.assertEqual(mimes, expected)
558 self.assertEqual(mimes, expected)
558
559
559 def test_apply_displaypub(self):
560 def test_apply_displaypub(self):
560 """apply tracks display_pub output"""
561 """apply tracks display_pub output"""
561 view = self.client[:]
562 view = self.client[:]
562 view.execute("from IPython.core.display import *")
563 view.execute("from IPython.core.display import *")
563
564
564 @interactive
565 @interactive
565 def publish():
566 def publish():
566 [ display(i) for i in range(5) ]
567 [ display(i) for i in range(5) ]
567
568
568 ar = view.apply_async(publish)
569 ar = view.apply_async(publish)
569 ar.get(5)
570 ar.get(5)
570 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
571 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
571 for outputs in ar.outputs:
572 for outputs in ar.outputs:
572 mimes = [ out['data'] for out in outputs ]
573 mimes = [ out['data'] for out in outputs ]
573 self.assertEqual(mimes, expected)
574 self.assertEqual(mimes, expected)
574
575
575 def test_execute_raises(self):
576 def test_execute_raises(self):
576 """exceptions in execute requests raise appropriately"""
577 """exceptions in execute requests raise appropriately"""
577 view = self.client[-1]
578 view = self.client[-1]
578 ar = view.execute("1/0")
579 ar = view.execute("1/0")
579 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
580 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
580
581
581 @dec.skipif_not_matplotlib
582 @dec.skipif_not_matplotlib
582 def test_magic_pylab(self):
583 def test_magic_pylab(self):
583 """%pylab works on engines"""
584 """%pylab works on engines"""
584 view = self.client[-1]
585 view = self.client[-1]
585 ar = view.execute("%pylab inline")
586 ar = view.execute("%pylab inline")
586 # at least check if this raised:
587 # at least check if this raised:
587 reply = ar.get(5)
588 reply = ar.get(5)
588 # include imports, in case user config
589 # include imports, in case user config
589 ar = view.execute("plot(rand(100))", silent=False)
590 ar = view.execute("plot(rand(100))", silent=False)
590 reply = ar.get(5)
591 reply = ar.get(5)
591 self.assertEqual(len(reply.outputs), 1)
592 self.assertEqual(len(reply.outputs), 1)
592 output = reply.outputs[0]
593 output = reply.outputs[0]
593 self.assertTrue("data" in output)
594 self.assertTrue("data" in output)
594 data = output['data']
595 data = output['data']
595 self.assertTrue("image/png" in data)
596 self.assertTrue("image/png" in data)
596
597
597 def test_func_default_func(self):
598 def test_func_default_func(self):
598 """interactively defined function as apply func default"""
599 """interactively defined function as apply func default"""
599 def foo():
600 def foo():
600 return 'foo'
601 return 'foo'
601
602
602 def bar(f=foo):
603 def bar(f=foo):
603 return f()
604 return f()
604
605
605 view = self.client[-1]
606 view = self.client[-1]
606 ar = view.apply_async(bar)
607 ar = view.apply_async(bar)
607 r = ar.get(10)
608 r = ar.get(10)
608 self.assertEqual(r, 'foo')
609 self.assertEqual(r, 'foo')
609 def test_data_pub_single(self):
610 def test_data_pub_single(self):
610 view = self.client[-1]
611 view = self.client[-1]
611 ar = view.execute('\n'.join([
612 ar = view.execute('\n'.join([
612 'from IPython.zmq.datapub import publish_data',
613 'from IPython.zmq.datapub import publish_data',
613 'for i in range(5):',
614 'for i in range(5):',
614 ' publish_data(dict(i=i))'
615 ' publish_data(dict(i=i))'
615 ]), block=False)
616 ]), block=False)
616 self.assertTrue(isinstance(ar.data, dict))
617 self.assertTrue(isinstance(ar.data, dict))
617 ar.get(5)
618 ar.get(5)
618 self.assertEqual(ar.data, dict(i=4))
619 self.assertEqual(ar.data, dict(i=4))
619
620
620 def test_data_pub(self):
621 def test_data_pub(self):
621 view = self.client[:]
622 view = self.client[:]
622 ar = view.execute('\n'.join([
623 ar = view.execute('\n'.join([
623 'from IPython.zmq.datapub import publish_data',
624 'from IPython.zmq.datapub import publish_data',
624 'for i in range(5):',
625 'for i in range(5):',
625 ' publish_data(dict(i=i))'
626 ' publish_data(dict(i=i))'
626 ]), block=False)
627 ]), block=False)
627 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
628 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
628 ar.get(5)
629 ar.get(5)
629 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
630 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
630
631
631 def test_can_list_arg(self):
632 def test_can_list_arg(self):
632 """args in lists are canned"""
633 """args in lists are canned"""
633 view = self.client[-1]
634 view = self.client[-1]
634 view['a'] = 128
635 view['a'] = 128
635 rA = pmod.Reference('a')
636 rA = pmod.Reference('a')
636 ar = view.apply_async(lambda x: x, [rA])
637 ar = view.apply_async(lambda x: x, [rA])
637 r = ar.get(5)
638 r = ar.get(5)
638 self.assertEqual(r, [128])
639 self.assertEqual(r, [128])
639
640
640 def test_can_dict_arg(self):
641 def test_can_dict_arg(self):
641 """args in dicts are canned"""
642 """args in dicts are canned"""
642 view = self.client[-1]
643 view = self.client[-1]
643 view['a'] = 128
644 view['a'] = 128
644 rA = pmod.Reference('a')
645 rA = pmod.Reference('a')
645 ar = view.apply_async(lambda x: x, dict(foo=rA))
646 ar = view.apply_async(lambda x: x, dict(foo=rA))
646 r = ar.get(5)
647 r = ar.get(5)
647 self.assertEqual(r, dict(foo=128))
648 self.assertEqual(r, dict(foo=128))
648
649
649 def test_can_list_kwarg(self):
650 def test_can_list_kwarg(self):
650 """kwargs in lists are canned"""
651 """kwargs in lists are canned"""
651 view = self.client[-1]
652 view = self.client[-1]
652 view['a'] = 128
653 view['a'] = 128
653 rA = pmod.Reference('a')
654 rA = pmod.Reference('a')
654 ar = view.apply_async(lambda x=5: x, x=[rA])
655 ar = view.apply_async(lambda x=5: x, x=[rA])
655 r = ar.get(5)
656 r = ar.get(5)
656 self.assertEqual(r, [128])
657 self.assertEqual(r, [128])
657
658
658 def test_can_dict_kwarg(self):
659 def test_can_dict_kwarg(self):
659 """kwargs in dicts are canned"""
660 """kwargs in dicts are canned"""
660 view = self.client[-1]
661 view = self.client[-1]
661 view['a'] = 128
662 view['a'] = 128
662 rA = pmod.Reference('a')
663 rA = pmod.Reference('a')
663 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
664 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
664 r = ar.get(5)
665 r = ar.get(5)
665 self.assertEqual(r, dict(foo=128))
666 self.assertEqual(r, dict(foo=128))
666
667
667 def test_map_ref(self):
668 def test_map_ref(self):
668 """view.map works with references"""
669 """view.map works with references"""
669 view = self.client[:]
670 view = self.client[:]
670 ranks = sorted(self.client.ids)
671 ranks = sorted(self.client.ids)
671 view.scatter('rank', ranks, flatten=True)
672 view.scatter('rank', ranks, flatten=True)
672 rrank = pmod.Reference('rank')
673 rrank = pmod.Reference('rank')
673
674
674 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
675 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
675 drank = amr.get(5)
676 drank = amr.get(5)
676 self.assertEqual(drank, [ r*2 for r in ranks ])
677 self.assertEqual(drank, [ r*2 for r in ranks ])
677
678
678
679
General Comments 0
You need to be logged in to leave comments. Login now