##// END OF EJS Templates
test view.map on a generator
MinRK -
Show More
@@ -1,203 +1,211 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test LoadBalancedView objects
2 """test LoadBalancedView objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import time
20 import time
21
21
22 import zmq
22 import zmq
23 from nose import SkipTest
23 from nose import SkipTest
24 from nose.plugins.attrib import attr
24 from nose.plugins.attrib import attr
25
25
26 from IPython import parallel as pmod
26 from IPython import parallel as pmod
27 from IPython.parallel import error
27 from IPython.parallel import error
28
28
29 from IPython.parallel.tests import add_engines
29 from IPython.parallel.tests import add_engines
30
30
31 from .clienttest import ClusterTestCase, crash, wait, skip_without
31 from .clienttest import ClusterTestCase, crash, wait, skip_without
32
32
33 def setup():
33 def setup():
34 add_engines(3, total=True)
34 add_engines(3, total=True)
35
35
36 class TestLoadBalancedView(ClusterTestCase):
36 class TestLoadBalancedView(ClusterTestCase):
37
37
38 def setUp(self):
38 def setUp(self):
39 ClusterTestCase.setUp(self)
39 ClusterTestCase.setUp(self)
40 self.view = self.client.load_balanced_view()
40 self.view = self.client.load_balanced_view()
41
41
42 @attr('crash')
42 @attr('crash')
43 def test_z_crash_task(self):
43 def test_z_crash_task(self):
44 """test graceful handling of engine death (balanced)"""
44 """test graceful handling of engine death (balanced)"""
45 # self.add_engines(1)
45 # self.add_engines(1)
46 ar = self.view.apply_async(crash)
46 ar = self.view.apply_async(crash)
47 self.assertRaisesRemote(error.EngineError, ar.get, 10)
47 self.assertRaisesRemote(error.EngineError, ar.get, 10)
48 eid = ar.engine_id
48 eid = ar.engine_id
49 tic = time.time()
49 tic = time.time()
50 while eid in self.client.ids and time.time()-tic < 5:
50 while eid in self.client.ids and time.time()-tic < 5:
51 time.sleep(.01)
51 time.sleep(.01)
52 self.client.spin()
52 self.client.spin()
53 self.assertFalse(eid in self.client.ids, "Engine should have died")
53 self.assertFalse(eid in self.client.ids, "Engine should have died")
54
54
55 def test_map(self):
55 def test_map(self):
56 def f(x):
56 def f(x):
57 return x**2
57 return x**2
58 data = range(16)
58 data = range(16)
59 r = self.view.map_sync(f, data)
59 r = self.view.map_sync(f, data)
60 self.assertEqual(r, map(f, data))
60 self.assertEqual(r, map(f, data))
61
61
62 def test_map_generator(self):
63 def f(x):
64 return x**2
65
66 data = range(16)
67 r = self.view.map_sync(f, iter(data))
68 self.assertEqual(r, map(f, iter(data)))
69
62 def test_map_short_first(self):
70 def test_map_short_first(self):
63 def f(x,y):
71 def f(x,y):
64 if y is None:
72 if y is None:
65 return y
73 return y
66 if x is None:
74 if x is None:
67 return x
75 return x
68 return x*y
76 return x*y
69 data = range(10)
77 data = range(10)
70 data2 = range(4)
78 data2 = range(4)
71
79
72 r = self.view.map_sync(f, data, data2)
80 r = self.view.map_sync(f, data, data2)
73 self.assertEqual(r, map(f, data, data2))
81 self.assertEqual(r, map(f, data, data2))
74
82
75 def test_map_short_last(self):
83 def test_map_short_last(self):
76 def f(x,y):
84 def f(x,y):
77 if y is None:
85 if y is None:
78 return y
86 return y
79 if x is None:
87 if x is None:
80 return x
88 return x
81 return x*y
89 return x*y
82 data = range(4)
90 data = range(4)
83 data2 = range(10)
91 data2 = range(10)
84
92
85 r = self.view.map_sync(f, data, data2)
93 r = self.view.map_sync(f, data, data2)
86 self.assertEqual(r, map(f, data, data2))
94 self.assertEqual(r, map(f, data, data2))
87
95
88 def test_map_unordered(self):
96 def test_map_unordered(self):
89 def f(x):
97 def f(x):
90 return x**2
98 return x**2
91 def slow_f(x):
99 def slow_f(x):
92 import time
100 import time
93 time.sleep(0.05*x)
101 time.sleep(0.05*x)
94 return x**2
102 return x**2
95 data = range(16,0,-1)
103 data = range(16,0,-1)
96 reference = map(f, data)
104 reference = map(f, data)
97
105
98 amr = self.view.map_async(slow_f, data, ordered=False)
106 amr = self.view.map_async(slow_f, data, ordered=False)
99 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
107 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
100 # check individual elements, retrieved as they come
108 # check individual elements, retrieved as they come
101 # list comprehension uses __iter__
109 # list comprehension uses __iter__
102 astheycame = [ r for r in amr ]
110 astheycame = [ r for r in amr ]
103 # Ensure that at least one result came out of order:
111 # Ensure that at least one result came out of order:
104 self.assertNotEqual(astheycame, reference, "should not have preserved order")
112 self.assertNotEqual(astheycame, reference, "should not have preserved order")
105 self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")
113 self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")
106
114
107 def test_map_ordered(self):
115 def test_map_ordered(self):
108 def f(x):
116 def f(x):
109 return x**2
117 return x**2
110 def slow_f(x):
118 def slow_f(x):
111 import time
119 import time
112 time.sleep(0.05*x)
120 time.sleep(0.05*x)
113 return x**2
121 return x**2
114 data = range(16,0,-1)
122 data = range(16,0,-1)
115 reference = map(f, data)
123 reference = map(f, data)
116
124
117 amr = self.view.map_async(slow_f, data)
125 amr = self.view.map_async(slow_f, data)
118 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
126 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
119 # check individual elements, retrieved as they come
127 # check individual elements, retrieved as they come
120 # list(amr) uses __iter__
128 # list(amr) uses __iter__
121 astheycame = list(amr)
129 astheycame = list(amr)
122 # Ensure that results came in order
130 # Ensure that results came in order
123 self.assertEqual(astheycame, reference)
131 self.assertEqual(astheycame, reference)
124 self.assertEqual(amr.result, reference)
132 self.assertEqual(amr.result, reference)
125
133
126 def test_map_iterable(self):
134 def test_map_iterable(self):
127 """test map on iterables (balanced)"""
135 """test map on iterables (balanced)"""
128 view = self.view
136 view = self.view
129 # 101 is prime, so it won't be evenly distributed
137 # 101 is prime, so it won't be evenly distributed
130 arr = range(101)
138 arr = range(101)
131 # so that it will be an iterator, even in Python 3
139 # so that it will be an iterator, even in Python 3
132 it = iter(arr)
140 it = iter(arr)
133 r = view.map_sync(lambda x:x, arr)
141 r = view.map_sync(lambda x:x, arr)
134 self.assertEqual(r, list(arr))
142 self.assertEqual(r, list(arr))
135
143
136
144
137 def test_abort(self):
145 def test_abort(self):
138 view = self.view
146 view = self.view
139 ar = self.client[:].apply_async(time.sleep, .5)
147 ar = self.client[:].apply_async(time.sleep, .5)
140 ar = self.client[:].apply_async(time.sleep, .5)
148 ar = self.client[:].apply_async(time.sleep, .5)
141 time.sleep(0.2)
149 time.sleep(0.2)
142 ar2 = view.apply_async(lambda : 2)
150 ar2 = view.apply_async(lambda : 2)
143 ar3 = view.apply_async(lambda : 3)
151 ar3 = view.apply_async(lambda : 3)
144 view.abort(ar2)
152 view.abort(ar2)
145 view.abort(ar3.msg_ids)
153 view.abort(ar3.msg_ids)
146 self.assertRaises(error.TaskAborted, ar2.get)
154 self.assertRaises(error.TaskAborted, ar2.get)
147 self.assertRaises(error.TaskAborted, ar3.get)
155 self.assertRaises(error.TaskAborted, ar3.get)
148
156
149 def test_retries(self):
157 def test_retries(self):
150 view = self.view
158 view = self.view
151 view.timeout = 1 # prevent hang if this doesn't behave
159 view.timeout = 1 # prevent hang if this doesn't behave
152 def fail():
160 def fail():
153 assert False
161 assert False
154 for r in range(len(self.client)-1):
162 for r in range(len(self.client)-1):
155 with view.temp_flags(retries=r):
163 with view.temp_flags(retries=r):
156 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
164 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
157
165
158 with view.temp_flags(retries=len(self.client), timeout=0.25):
166 with view.temp_flags(retries=len(self.client), timeout=0.25):
159 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
167 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
160
168
161 def test_invalid_dependency(self):
169 def test_invalid_dependency(self):
162 view = self.view
170 view = self.view
163 with view.temp_flags(after='12345'):
171 with view.temp_flags(after='12345'):
164 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
172 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
165
173
166 def test_impossible_dependency(self):
174 def test_impossible_dependency(self):
167 self.minimum_engines(2)
175 self.minimum_engines(2)
168 view = self.client.load_balanced_view()
176 view = self.client.load_balanced_view()
169 ar1 = view.apply_async(lambda : 1)
177 ar1 = view.apply_async(lambda : 1)
170 ar1.get()
178 ar1.get()
171 e1 = ar1.engine_id
179 e1 = ar1.engine_id
172 e2 = e1
180 e2 = e1
173 while e2 == e1:
181 while e2 == e1:
174 ar2 = view.apply_async(lambda : 1)
182 ar2 = view.apply_async(lambda : 1)
175 ar2.get()
183 ar2.get()
176 e2 = ar2.engine_id
184 e2 = ar2.engine_id
177
185
178 with view.temp_flags(follow=[ar1, ar2]):
186 with view.temp_flags(follow=[ar1, ar2]):
179 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
187 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
180
188
181
189
182 def test_follow(self):
190 def test_follow(self):
183 ar = self.view.apply_async(lambda : 1)
191 ar = self.view.apply_async(lambda : 1)
184 ar.get()
192 ar.get()
185 ars = []
193 ars = []
186 first_id = ar.engine_id
194 first_id = ar.engine_id
187
195
188 self.view.follow = ar
196 self.view.follow = ar
189 for i in range(5):
197 for i in range(5):
190 ars.append(self.view.apply_async(lambda : 1))
198 ars.append(self.view.apply_async(lambda : 1))
191 self.view.wait(ars)
199 self.view.wait(ars)
192 for ar in ars:
200 for ar in ars:
193 self.assertEqual(ar.engine_id, first_id)
201 self.assertEqual(ar.engine_id, first_id)
194
202
195 def test_after(self):
203 def test_after(self):
196 view = self.view
204 view = self.view
197 ar = view.apply_async(time.sleep, 0.5)
205 ar = view.apply_async(time.sleep, 0.5)
198 with view.temp_flags(after=ar):
206 with view.temp_flags(after=ar):
199 ar2 = view.apply_async(lambda : 1)
207 ar2 = view.apply_async(lambda : 1)
200
208
201 ar.wait()
209 ar.wait()
202 ar2.wait()
210 ar2.wait()
203 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
211 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
General Comments 0
You need to be logged in to leave comments. Login now