##// END OF EJS Templates
fix out of sync parallel tests...
MinRK -
Show More
@@ -1,147 +1,148 b''
1 """Tests for parallel client.py"""
1 """Tests for parallel client.py"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
4 # Copyright (C) 2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import time
14 import time
15 from tempfile import mktemp
15 from tempfile import mktemp
16
16
17 import zmq
17 import zmq
18
18
19 from IPython.parallel.client import client as clientmod
19 from IPython.parallel.client import client as clientmod
20 from IPython.parallel import error
20 from IPython.parallel import error
21 from IPython.parallel import AsyncResult, AsyncHubResult
21 from IPython.parallel import AsyncResult, AsyncHubResult
22 from IPython.parallel import LoadBalancedView, DirectView
22 from IPython.parallel import LoadBalancedView, DirectView
23
23
24 from clienttest import ClusterTestCase, segfault, wait, add_engines
24 from clienttest import ClusterTestCase, segfault, wait, add_engines
25
25
26 def setup():
26 def setup():
27 add_engines(4)
27 add_engines(4)
28
28
29 class TestClient(ClusterTestCase):
29 class TestClient(ClusterTestCase):
30
30
31 def test_ids(self):
31 def test_ids(self):
32 n = len(self.client.ids)
32 n = len(self.client.ids)
33 self.add_engines(3)
33 self.add_engines(3)
34 self.assertEquals(len(self.client.ids), n+3)
34 self.assertEquals(len(self.client.ids), n+3)
35
35
36 def test_view_indexing(self):
36 def test_view_indexing(self):
37 """test index access for views"""
37 """test index access for views"""
38 self.add_engines(2)
38 self.add_engines(2)
39 targets = self.client._build_targets('all')[-1]
39 targets = self.client._build_targets('all')[-1]
40 v = self.client[:]
40 v = self.client[:]
41 self.assertEquals(v.targets, targets)
41 self.assertEquals(v.targets, targets)
42 t = self.client.ids[2]
42 t = self.client.ids[2]
43 v = self.client[t]
43 v = self.client[t]
44 self.assert_(isinstance(v, DirectView))
44 self.assert_(isinstance(v, DirectView))
45 self.assertEquals(v.targets, t)
45 self.assertEquals(v.targets, t)
46 t = self.client.ids[2:4]
46 t = self.client.ids[2:4]
47 v = self.client[t]
47 v = self.client[t]
48 self.assert_(isinstance(v, DirectView))
48 self.assert_(isinstance(v, DirectView))
49 self.assertEquals(v.targets, t)
49 self.assertEquals(v.targets, t)
50 v = self.client[::2]
50 v = self.client[::2]
51 self.assert_(isinstance(v, DirectView))
51 self.assert_(isinstance(v, DirectView))
52 self.assertEquals(v.targets, targets[::2])
52 self.assertEquals(v.targets, targets[::2])
53 v = self.client[1::3]
53 v = self.client[1::3]
54 self.assert_(isinstance(v, DirectView))
54 self.assert_(isinstance(v, DirectView))
55 self.assertEquals(v.targets, targets[1::3])
55 self.assertEquals(v.targets, targets[1::3])
56 v = self.client[:-3]
56 v = self.client[:-3]
57 self.assert_(isinstance(v, DirectView))
57 self.assert_(isinstance(v, DirectView))
58 self.assertEquals(v.targets, targets[:-3])
58 self.assertEquals(v.targets, targets[:-3])
59 v = self.client[-1]
59 v = self.client[-1]
60 self.assert_(isinstance(v, DirectView))
60 self.assert_(isinstance(v, DirectView))
61 self.assertEquals(v.targets, targets[-1])
61 self.assertEquals(v.targets, targets[-1])
62 self.assertRaises(TypeError, lambda : self.client[None])
62 self.assertRaises(TypeError, lambda : self.client[None])
63
63
64 def test_lbview_targets(self):
64 def test_lbview_targets(self):
65 """test load_balanced_view targets"""
65 """test load_balanced_view targets"""
66 v = self.client.load_balanced_view()
66 v = self.client.load_balanced_view()
67 self.assertEquals(v.targets, None)
67 self.assertEquals(v.targets, None)
68 v = self.client.load_balanced_view(-1)
68 v = self.client.load_balanced_view(-1)
69 self.assertEquals(v.targets, [self.client.ids[-1]])
69 self.assertEquals(v.targets, [self.client.ids[-1]])
70 v = self.client.load_balanced_view('all')
70 v = self.client.load_balanced_view('all')
71 self.assertEquals(v.targets, self.client.ids)
71 self.assertEquals(v.targets, self.client.ids)
72
72
73 def test_targets(self):
73 def test_targets(self):
74 """test various valid targets arguments"""
74 """test various valid targets arguments"""
75 build = self.client._build_targets
75 build = self.client._build_targets
76 ids = self.client.ids
76 ids = self.client.ids
77 idents,targets = build(None)
77 idents,targets = build(None)
78 self.assertEquals(ids, targets)
78 self.assertEquals(ids, targets)
79
79
80 def test_clear(self):
80 def test_clear(self):
81 """test clear behavior"""
81 """test clear behavior"""
82 # self.add_engines(2)
82 # self.add_engines(2)
83 v = self.client[:]
83 v = self.client[:]
84 v.block=True
84 v.block=True
85 v.push(dict(a=5))
85 v.push(dict(a=5))
86 v.pull('a')
86 v.pull('a')
87 id0 = self.client.ids[-1]
87 id0 = self.client.ids[-1]
88 self.client.clear(targets=id0)
88 self.client.clear(targets=id0)
89 self.client[:-1].pull('a')
89 self.client[:-1].pull('a')
90 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
90 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
91 self.client.clear(block=True)
91 self.client.clear(block=True)
92 for i in self.client.ids:
92 for i in self.client.ids:
93 # print i
93 # print i
94 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
94 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
95
95
96 def test_get_result(self):
96 def test_get_result(self):
97 """test getting results from the Hub."""
97 """test getting results from the Hub."""
98 c = clientmod.Client(profile='iptest')
98 c = clientmod.Client(profile='iptest')
99 # self.add_engines(1)
99 # self.add_engines(1)
100 t = c.ids[-1]
100 t = c.ids[-1]
101 ar = c[t].apply_async(wait, 1)
101 ar = c[t].apply_async(wait, 1)
102 # give the monitor time to notice the message
102 # give the monitor time to notice the message
103 time.sleep(.25)
103 time.sleep(.25)
104 ahr = self.client.get_result(ar.msg_ids)
104 ahr = self.client.get_result(ar.msg_ids)
105 self.assertTrue(isinstance(ahr, AsyncHubResult))
105 self.assertTrue(isinstance(ahr, AsyncHubResult))
106 self.assertEquals(ahr.get(), ar.get())
106 self.assertEquals(ahr.get(), ar.get())
107 ar2 = self.client.get_result(ar.msg_ids)
107 ar2 = self.client.get_result(ar.msg_ids)
108 self.assertFalse(isinstance(ar2, AsyncHubResult))
108 self.assertFalse(isinstance(ar2, AsyncHubResult))
109 c.close()
109 c.close()
110
110
111 def test_ids_list(self):
111 def test_ids_list(self):
112 """test client.ids"""
112 """test client.ids"""
113 # self.add_engines(2)
113 # self.add_engines(2)
114 ids = self.client.ids
114 ids = self.client.ids
115 self.assertEquals(ids, self.client._ids)
115 self.assertEquals(ids, self.client._ids)
116 self.assertFalse(ids is self.client._ids)
116 self.assertFalse(ids is self.client._ids)
117 ids.remove(ids[-1])
117 ids.remove(ids[-1])
118 self.assertNotEquals(ids, self.client._ids)
118 self.assertNotEquals(ids, self.client._ids)
119
119
120 def test_queue_status(self):
120 def test_queue_status(self):
121 # self.addEngine(4)
121 # self.addEngine(4)
122 ids = self.client.ids
122 ids = self.client.ids
123 id0 = ids[0]
123 id0 = ids[0]
124 qs = self.client.queue_status(targets=id0)
124 qs = self.client.queue_status(targets=id0)
125 self.assertTrue(isinstance(qs, dict))
125 self.assertTrue(isinstance(qs, dict))
126 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
126 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
127 allqs = self.client.queue_status()
127 allqs = self.client.queue_status()
128 self.assertTrue(isinstance(allqs, dict))
128 self.assertTrue(isinstance(allqs, dict))
129 self.assertEquals(sorted(allqs.keys()), self.client.ids)
129 self.assertEquals(sorted(allqs.keys()), sorted(self.client.ids + ['unassigned']))
130 unassigned = allqs.pop('unassigned')
130 for eid,qs in allqs.items():
131 for eid,qs in allqs.items():
131 self.assertTrue(isinstance(qs, dict))
132 self.assertTrue(isinstance(qs, dict))
132 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
133 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
133
134
134 def test_shutdown(self):
135 def test_shutdown(self):
135 # self.addEngine(4)
136 # self.addEngine(4)
136 ids = self.client.ids
137 ids = self.client.ids
137 id0 = ids[0]
138 id0 = ids[0]
138 self.client.shutdown(id0, block=True)
139 self.client.shutdown(id0, block=True)
139 while id0 in self.client.ids:
140 while id0 in self.client.ids:
140 time.sleep(0.1)
141 time.sleep(0.1)
141 self.client.spin()
142 self.client.spin()
142
143
143 self.assertRaises(IndexError, lambda : self.client[id0])
144 self.assertRaises(IndexError, lambda : self.client[id0])
144
145
145 def test_result_status(self):
146 def test_result_status(self):
146 pass
147 pass
147 # to be written
148 # to be written
@@ -1,301 +1,302 b''
1 """test View objects"""
1 """test View objects"""
2 #-------------------------------------------------------------------------------
2 #-------------------------------------------------------------------------------
3 # Copyright (C) 2011 The IPython Development Team
3 # Copyright (C) 2011 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-------------------------------------------------------------------------------
7 #-------------------------------------------------------------------------------
8
8
9 #-------------------------------------------------------------------------------
9 #-------------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-------------------------------------------------------------------------------
11 #-------------------------------------------------------------------------------
12
12
13 import time
13 import time
14 from tempfile import mktemp
14 from tempfile import mktemp
15
15
16 import zmq
16 import zmq
17
17
18 from IPython import parallel as pmod
18 from IPython import parallel as pmod
19 from IPython.parallel import error
19 from IPython.parallel import error
20 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
20 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
21 from IPython.parallel import LoadBalancedView, DirectView
21 from IPython.parallel import LoadBalancedView, DirectView
22 from IPython.parallel.util import interactive
22 from IPython.parallel.util import interactive
23
23
24 from IPython.parallel.tests import add_engines
24 from IPython.parallel.tests import add_engines
25
25
26 from .clienttest import ClusterTestCase, segfault, wait, skip_without
26 from .clienttest import ClusterTestCase, segfault, wait, skip_without
27
27
28 def setup():
28 def setup():
29 add_engines(3)
29 add_engines(3)
30
30
31 class TestView(ClusterTestCase):
31 class TestView(ClusterTestCase):
32
32
33 def test_segfault_task(self):
33 def test_segfault_task(self):
34 """test graceful handling of engine death (balanced)"""
34 """test graceful handling of engine death (balanced)"""
35 # self.add_engines(1)
35 # self.add_engines(1)
36 ar = self.client[-1].apply_async(segfault)
36 ar = self.client[-1].apply_async(segfault)
37 self.assertRaisesRemote(error.EngineError, ar.get)
37 self.assertRaisesRemote(error.EngineError, ar.get)
38 eid = ar.engine_id
38 eid = ar.engine_id
39 while eid in self.client.ids:
39 while eid in self.client.ids:
40 time.sleep(.01)
40 time.sleep(.01)
41 self.client.spin()
41 self.client.spin()
42
42
43 def test_segfault_mux(self):
43 def test_segfault_mux(self):
44 """test graceful handling of engine death (direct)"""
44 """test graceful handling of engine death (direct)"""
45 # self.add_engines(1)
45 # self.add_engines(1)
46 eid = self.client.ids[-1]
46 eid = self.client.ids[-1]
47 ar = self.client[eid].apply_async(segfault)
47 ar = self.client[eid].apply_async(segfault)
48 self.assertRaisesRemote(error.EngineError, ar.get)
48 self.assertRaisesRemote(error.EngineError, ar.get)
49 eid = ar.engine_id
49 eid = ar.engine_id
50 while eid in self.client.ids:
50 while eid in self.client.ids:
51 time.sleep(.01)
51 time.sleep(.01)
52 self.client.spin()
52 self.client.spin()
53
53
54 def test_push_pull(self):
54 def test_push_pull(self):
55 """test pushing and pulling"""
55 """test pushing and pulling"""
56 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
56 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
57 t = self.client.ids[-1]
57 t = self.client.ids[-1]
58 v = self.client[t]
58 v = self.client[t]
59 push = v.push
59 push = v.push
60 pull = v.pull
60 pull = v.pull
61 v.block=True
61 v.block=True
62 nengines = len(self.client)
62 nengines = len(self.client)
63 push({'data':data})
63 push({'data':data})
64 d = pull('data')
64 d = pull('data')
65 self.assertEquals(d, data)
65 self.assertEquals(d, data)
66 self.client[:].push({'data':data})
66 self.client[:].push({'data':data})
67 d = self.client[:].pull('data', block=True)
67 d = self.client[:].pull('data', block=True)
68 self.assertEquals(d, nengines*[data])
68 self.assertEquals(d, nengines*[data])
69 ar = push({'data':data}, block=False)
69 ar = push({'data':data}, block=False)
70 self.assertTrue(isinstance(ar, AsyncResult))
70 self.assertTrue(isinstance(ar, AsyncResult))
71 r = ar.get()
71 r = ar.get()
72 ar = self.client[:].pull('data', block=False)
72 ar = self.client[:].pull('data', block=False)
73 self.assertTrue(isinstance(ar, AsyncResult))
73 self.assertTrue(isinstance(ar, AsyncResult))
74 r = ar.get()
74 r = ar.get()
75 self.assertEquals(r, nengines*[data])
75 self.assertEquals(r, nengines*[data])
76 self.client[:].push(dict(a=10,b=20))
76 self.client[:].push(dict(a=10,b=20))
77 r = self.client[:].pull(('a','b'))
77 r = self.client[:].pull(('a','b'), block=True)
78 self.assertEquals(r, nengines*[[10,20]])
78 self.assertEquals(r, nengines*[[10,20]])
79
79
80 def test_push_pull_function(self):
80 def test_push_pull_function(self):
81 "test pushing and pulling functions"
81 "test pushing and pulling functions"
82 def testf(x):
82 def testf(x):
83 return 2.0*x
83 return 2.0*x
84
84
85 t = self.client.ids[-1]
85 t = self.client.ids[-1]
86 self.client[t].block=True
86 v = self.client[t]
87 push = self.client[t].push
87 v.block=True
88 pull = self.client[t].pull
88 push = v.push
89 execute = self.client[t].execute
89 pull = v.pull
90 execute = v.execute
90 push({'testf':testf})
91 push({'testf':testf})
91 r = pull('testf')
92 r = pull('testf')
92 self.assertEqual(r(1.0), testf(1.0))
93 self.assertEqual(r(1.0), testf(1.0))
93 execute('r = testf(10)')
94 execute('r = testf(10)')
94 r = pull('r')
95 r = pull('r')
95 self.assertEquals(r, testf(10))
96 self.assertEquals(r, testf(10))
96 ar = self.client[:].push({'testf':testf}, block=False)
97 ar = self.client[:].push({'testf':testf}, block=False)
97 ar.get()
98 ar.get()
98 ar = self.client[:].pull('testf', block=False)
99 ar = self.client[:].pull('testf', block=False)
99 rlist = ar.get()
100 rlist = ar.get()
100 for r in rlist:
101 for r in rlist:
101 self.assertEqual(r(1.0), testf(1.0))
102 self.assertEqual(r(1.0), testf(1.0))
102 execute("def g(x): return x*x")
103 execute("def g(x): return x*x")
103 r = pull(('testf','g'))
104 r = pull(('testf','g'))
104 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
105 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
105
106
106 def test_push_function_globals(self):
107 def test_push_function_globals(self):
107 """test that pushed functions have access to globals"""
108 """test that pushed functions have access to globals"""
108 @interactive
109 @interactive
109 def geta():
110 def geta():
110 return a
111 return a
111 # self.add_engines(1)
112 # self.add_engines(1)
112 v = self.client[-1]
113 v = self.client[-1]
113 v.block=True
114 v.block=True
114 v['f'] = geta
115 v['f'] = geta
115 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
116 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
116 v.execute('a=5')
117 v.execute('a=5')
117 v.execute('b=f()')
118 v.execute('b=f()')
118 self.assertEquals(v['b'], 5)
119 self.assertEquals(v['b'], 5)
119
120
120 def test_push_function_defaults(self):
121 def test_push_function_defaults(self):
121 """test that pushed functions preserve default args"""
122 """test that pushed functions preserve default args"""
122 def echo(a=10):
123 def echo(a=10):
123 return a
124 return a
124 v = self.client[-1]
125 v = self.client[-1]
125 v.block=True
126 v.block=True
126 v['f'] = echo
127 v['f'] = echo
127 v.execute('b=f()')
128 v.execute('b=f()')
128 self.assertEquals(v['b'], 10)
129 self.assertEquals(v['b'], 10)
129
130
130 def test_get_result(self):
131 def test_get_result(self):
131 """test getting results from the Hub."""
132 """test getting results from the Hub."""
132 c = pmod.Client(profile='iptest')
133 c = pmod.Client(profile='iptest')
133 # self.add_engines(1)
134 # self.add_engines(1)
134 t = c.ids[-1]
135 t = c.ids[-1]
135 v = c[t]
136 v = c[t]
136 v2 = self.client[t]
137 v2 = self.client[t]
137 ar = v.apply_async(wait, 1)
138 ar = v.apply_async(wait, 1)
138 # give the monitor time to notice the message
139 # give the monitor time to notice the message
139 time.sleep(.25)
140 time.sleep(.25)
140 ahr = v2.get_result(ar.msg_ids)
141 ahr = v2.get_result(ar.msg_ids)
141 self.assertTrue(isinstance(ahr, AsyncHubResult))
142 self.assertTrue(isinstance(ahr, AsyncHubResult))
142 self.assertEquals(ahr.get(), ar.get())
143 self.assertEquals(ahr.get(), ar.get())
143 ar2 = v2.get_result(ar.msg_ids)
144 ar2 = v2.get_result(ar.msg_ids)
144 self.assertFalse(isinstance(ar2, AsyncHubResult))
145 self.assertFalse(isinstance(ar2, AsyncHubResult))
145 c.spin()
146 c.spin()
146 c.close()
147 c.close()
147
148
148 def test_run_newline(self):
149 def test_run_newline(self):
149 """test that run appends newline to files"""
150 """test that run appends newline to files"""
150 tmpfile = mktemp()
151 tmpfile = mktemp()
151 with open(tmpfile, 'w') as f:
152 with open(tmpfile, 'w') as f:
152 f.write("""def g():
153 f.write("""def g():
153 return 5
154 return 5
154 """)
155 """)
155 v = self.client[-1]
156 v = self.client[-1]
156 v.run(tmpfile, block=True)
157 v.run(tmpfile, block=True)
157 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
158 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
158
159
159 def test_apply_tracked(self):
160 def test_apply_tracked(self):
160 """test tracking for apply"""
161 """test tracking for apply"""
161 # self.add_engines(1)
162 # self.add_engines(1)
162 t = self.client.ids[-1]
163 t = self.client.ids[-1]
163 v = self.client[t]
164 v = self.client[t]
164 v.block=False
165 v.block=False
165 def echo(n=1024*1024, **kwargs):
166 def echo(n=1024*1024, **kwargs):
166 with v.temp_flags(**kwargs):
167 with v.temp_flags(**kwargs):
167 return v.apply(lambda x: x, 'x'*n)
168 return v.apply(lambda x: x, 'x'*n)
168 ar = echo(1, track=False)
169 ar = echo(1, track=False)
169 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
170 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
170 self.assertTrue(ar.sent)
171 self.assertTrue(ar.sent)
171 ar = echo(track=True)
172 ar = echo(track=True)
172 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
173 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
173 self.assertEquals(ar.sent, ar._tracker.done)
174 self.assertEquals(ar.sent, ar._tracker.done)
174 ar._tracker.wait()
175 ar._tracker.wait()
175 self.assertTrue(ar.sent)
176 self.assertTrue(ar.sent)
176
177
177 def test_push_tracked(self):
178 def test_push_tracked(self):
178 t = self.client.ids[-1]
179 t = self.client.ids[-1]
179 ns = dict(x='x'*1024*1024)
180 ns = dict(x='x'*1024*1024)
180 v = self.client[t]
181 v = self.client[t]
181 ar = v.push(ns, block=False, track=False)
182 ar = v.push(ns, block=False, track=False)
182 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
183 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
183 self.assertTrue(ar.sent)
184 self.assertTrue(ar.sent)
184
185
185 ar = v.push(ns, block=False, track=True)
186 ar = v.push(ns, block=False, track=True)
186 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
187 self.assertEquals(ar.sent, ar._tracker.done)
188 self.assertEquals(ar.sent, ar._tracker.done)
188 ar._tracker.wait()
189 ar._tracker.wait()
189 self.assertTrue(ar.sent)
190 self.assertTrue(ar.sent)
190 ar.get()
191 ar.get()
191
192
192 def test_scatter_tracked(self):
193 def test_scatter_tracked(self):
193 t = self.client.ids
194 t = self.client.ids
194 x='x'*1024*1024
195 x='x'*1024*1024
195 ar = self.client[t].scatter('x', x, block=False, track=False)
196 ar = self.client[t].scatter('x', x, block=False, track=False)
196 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
197 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
197 self.assertTrue(ar.sent)
198 self.assertTrue(ar.sent)
198
199
199 ar = self.client[t].scatter('x', x, block=False, track=True)
200 ar = self.client[t].scatter('x', x, block=False, track=True)
200 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
201 self.assertEquals(ar.sent, ar._tracker.done)
202 self.assertEquals(ar.sent, ar._tracker.done)
202 ar._tracker.wait()
203 ar._tracker.wait()
203 self.assertTrue(ar.sent)
204 self.assertTrue(ar.sent)
204 ar.get()
205 ar.get()
205
206
206 def test_remote_reference(self):
207 def test_remote_reference(self):
207 v = self.client[-1]
208 v = self.client[-1]
208 v['a'] = 123
209 v['a'] = 123
209 ra = pmod.Reference('a')
210 ra = pmod.Reference('a')
210 b = v.apply_sync(lambda x: x, ra)
211 b = v.apply_sync(lambda x: x, ra)
211 self.assertEquals(b, 123)
212 self.assertEquals(b, 123)
212
213
213
214
214 def test_scatter_gather(self):
215 def test_scatter_gather(self):
215 view = self.client[:]
216 view = self.client[:]
216 seq1 = range(16)
217 seq1 = range(16)
217 view.scatter('a', seq1)
218 view.scatter('a', seq1)
218 seq2 = view.gather('a', block=True)
219 seq2 = view.gather('a', block=True)
219 self.assertEquals(seq2, seq1)
220 self.assertEquals(seq2, seq1)
220 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
221 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
221
222
222 @skip_without('numpy')
223 @skip_without('numpy')
223 def test_scatter_gather_numpy(self):
224 def test_scatter_gather_numpy(self):
224 import numpy
225 import numpy
225 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
226 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
226 view = self.client[:]
227 view = self.client[:]
227 a = numpy.arange(64)
228 a = numpy.arange(64)
228 view.scatter('a', a)
229 view.scatter('a', a)
229 b = view.gather('a', block=True)
230 b = view.gather('a', block=True)
230 assert_array_equal(b, a)
231 assert_array_equal(b, a)
231
232
232 def test_map(self):
233 def test_map(self):
233 view = self.client[:]
234 view = self.client[:]
234 def f(x):
235 def f(x):
235 return x**2
236 return x**2
236 data = range(16)
237 data = range(16)
237 r = view.map_sync(f, data)
238 r = view.map_sync(f, data)
238 self.assertEquals(r, map(f, data))
239 self.assertEquals(r, map(f, data))
239
240
240 def test_scatterGatherNonblocking(self):
241 def test_scatterGatherNonblocking(self):
241 data = range(16)
242 data = range(16)
242 view = self.client[:]
243 view = self.client[:]
243 view.scatter('a', data, block=False)
244 view.scatter('a', data, block=False)
244 ar = view.gather('a', block=False)
245 ar = view.gather('a', block=False)
245 self.assertEquals(ar.get(), data)
246 self.assertEquals(ar.get(), data)
246
247
247 @skip_without('numpy')
248 @skip_without('numpy')
248 def test_scatter_gather_numpy_nonblocking(self):
249 def test_scatter_gather_numpy_nonblocking(self):
249 import numpy
250 import numpy
250 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
251 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
251 a = numpy.arange(64)
252 a = numpy.arange(64)
252 view = self.client[:]
253 view = self.client[:]
253 ar = view.scatter('a', a, block=False)
254 ar = view.scatter('a', a, block=False)
254 self.assertTrue(isinstance(ar, AsyncResult))
255 self.assertTrue(isinstance(ar, AsyncResult))
255 amr = view.gather('a', block=False)
256 amr = view.gather('a', block=False)
256 self.assertTrue(isinstance(amr, AsyncMapResult))
257 self.assertTrue(isinstance(amr, AsyncMapResult))
257 assert_array_equal(amr.get(), a)
258 assert_array_equal(amr.get(), a)
258
259
259 def test_execute(self):
260 def test_execute(self):
260 view = self.client[:]
261 view = self.client[:]
261 # self.client.debug=True
262 # self.client.debug=True
262 execute = view.execute
263 execute = view.execute
263 ar = execute('c=30', block=False)
264 ar = execute('c=30', block=False)
264 self.assertTrue(isinstance(ar, AsyncResult))
265 self.assertTrue(isinstance(ar, AsyncResult))
265 ar = execute('d=[0,1,2]', block=False)
266 ar = execute('d=[0,1,2]', block=False)
266 self.client.wait(ar, 1)
267 self.client.wait(ar, 1)
267 self.assertEquals(len(ar.get()), len(self.client))
268 self.assertEquals(len(ar.get()), len(self.client))
268 for c in view['c']:
269 for c in view['c']:
269 self.assertEquals(c, 30)
270 self.assertEquals(c, 30)
270
271
271 def test_abort(self):
272 def test_abort(self):
272 view = self.client[-1]
273 view = self.client[-1]
273 ar = view.execute('import time; time.sleep(0.25)', block=False)
274 ar = view.execute('import time; time.sleep(0.25)', block=False)
274 ar2 = view.apply_async(lambda : 2)
275 ar2 = view.apply_async(lambda : 2)
275 ar3 = view.apply_async(lambda : 3)
276 ar3 = view.apply_async(lambda : 3)
276 view.abort(ar2)
277 view.abort(ar2)
277 view.abort(ar3.msg_ids)
278 view.abort(ar3.msg_ids)
278 self.assertRaises(error.TaskAborted, ar2.get)
279 self.assertRaises(error.TaskAborted, ar2.get)
279 self.assertRaises(error.TaskAborted, ar3.get)
280 self.assertRaises(error.TaskAborted, ar3.get)
280
281
281 def test_temp_flags(self):
282 def test_temp_flags(self):
282 view = self.client[-1]
283 view = self.client[-1]
283 view.block=True
284 view.block=True
284 with view.temp_flags(block=False):
285 with view.temp_flags(block=False):
285 self.assertFalse(view.block)
286 self.assertFalse(view.block)
286 self.assertTrue(view.block)
287 self.assertTrue(view.block)
287
288
288 def test_importer(self):
289 def test_importer(self):
289 view = self.client[-1]
290 view = self.client[-1]
290 view.clear(block=True)
291 view.clear(block=True)
291 with view.importer:
292 with view.importer:
292 import re
293 import re
293
294
294 @interactive
295 @interactive
295 def findall(pat, s):
296 def findall(pat, s):
296 # this globals() step isn't necessary in real code
297 # this globals() step isn't necessary in real code
297 # only to prevent a closure in the test
298 # only to prevent a closure in the test
298 return globals()['re'].findall(pat, s)
299 return globals()['re'].findall(pat, s)
299
300
300 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
301 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
301
302
General Comments 0
You need to be logged in to leave comments. Login now