##// END OF EJS Templates
test very short timeouts...
MinRK -
Show More
@@ -1,211 +1,219
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test LoadBalancedView objects
2 """test LoadBalancedView objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import time
20 import time
21
21
22 import zmq
22 import zmq
23 from nose import SkipTest
23 from nose import SkipTest
24 from nose.plugins.attrib import attr
24 from nose.plugins.attrib import attr
25
25
26 from IPython import parallel as pmod
26 from IPython import parallel as pmod
27 from IPython.parallel import error
27 from IPython.parallel import error
28
28
29 from IPython.parallel.tests import add_engines
29 from IPython.parallel.tests import add_engines
30
30
31 from .clienttest import ClusterTestCase, crash, wait, skip_without
31 from .clienttest import ClusterTestCase, crash, wait, skip_without
32
32
33 def setup():
33 def setup():
34 add_engines(3, total=True)
34 add_engines(3, total=True)
35
35
36 class TestLoadBalancedView(ClusterTestCase):
36 class TestLoadBalancedView(ClusterTestCase):
37
37
38 def setUp(self):
38 def setUp(self):
39 ClusterTestCase.setUp(self)
39 ClusterTestCase.setUp(self)
40 self.view = self.client.load_balanced_view()
40 self.view = self.client.load_balanced_view()
41
41
42 @attr('crash')
42 @attr('crash')
43 def test_z_crash_task(self):
43 def test_z_crash_task(self):
44 """test graceful handling of engine death (balanced)"""
44 """test graceful handling of engine death (balanced)"""
45 # self.add_engines(1)
45 # self.add_engines(1)
46 ar = self.view.apply_async(crash)
46 ar = self.view.apply_async(crash)
47 self.assertRaisesRemote(error.EngineError, ar.get, 10)
47 self.assertRaisesRemote(error.EngineError, ar.get, 10)
48 eid = ar.engine_id
48 eid = ar.engine_id
49 tic = time.time()
49 tic = time.time()
50 while eid in self.client.ids and time.time()-tic < 5:
50 while eid in self.client.ids and time.time()-tic < 5:
51 time.sleep(.01)
51 time.sleep(.01)
52 self.client.spin()
52 self.client.spin()
53 self.assertFalse(eid in self.client.ids, "Engine should have died")
53 self.assertFalse(eid in self.client.ids, "Engine should have died")
54
54
55 def test_map(self):
55 def test_map(self):
56 def f(x):
56 def f(x):
57 return x**2
57 return x**2
58 data = range(16)
58 data = range(16)
59 r = self.view.map_sync(f, data)
59 r = self.view.map_sync(f, data)
60 self.assertEqual(r, map(f, data))
60 self.assertEqual(r, map(f, data))
61
61
62 def test_map_generator(self):
62 def test_map_generator(self):
63 def f(x):
63 def f(x):
64 return x**2
64 return x**2
65
65
66 data = range(16)
66 data = range(16)
67 r = self.view.map_sync(f, iter(data))
67 r = self.view.map_sync(f, iter(data))
68 self.assertEqual(r, map(f, iter(data)))
68 self.assertEqual(r, map(f, iter(data)))
69
69
70 def test_map_short_first(self):
70 def test_map_short_first(self):
71 def f(x,y):
71 def f(x,y):
72 if y is None:
72 if y is None:
73 return y
73 return y
74 if x is None:
74 if x is None:
75 return x
75 return x
76 return x*y
76 return x*y
77 data = range(10)
77 data = range(10)
78 data2 = range(4)
78 data2 = range(4)
79
79
80 r = self.view.map_sync(f, data, data2)
80 r = self.view.map_sync(f, data, data2)
81 self.assertEqual(r, map(f, data, data2))
81 self.assertEqual(r, map(f, data, data2))
82
82
83 def test_map_short_last(self):
83 def test_map_short_last(self):
84 def f(x,y):
84 def f(x,y):
85 if y is None:
85 if y is None:
86 return y
86 return y
87 if x is None:
87 if x is None:
88 return x
88 return x
89 return x*y
89 return x*y
90 data = range(4)
90 data = range(4)
91 data2 = range(10)
91 data2 = range(10)
92
92
93 r = self.view.map_sync(f, data, data2)
93 r = self.view.map_sync(f, data, data2)
94 self.assertEqual(r, map(f, data, data2))
94 self.assertEqual(r, map(f, data, data2))
95
95
96 def test_map_unordered(self):
96 def test_map_unordered(self):
97 def f(x):
97 def f(x):
98 return x**2
98 return x**2
99 def slow_f(x):
99 def slow_f(x):
100 import time
100 import time
101 time.sleep(0.05*x)
101 time.sleep(0.05*x)
102 return x**2
102 return x**2
103 data = range(16,0,-1)
103 data = range(16,0,-1)
104 reference = map(f, data)
104 reference = map(f, data)
105
105
106 amr = self.view.map_async(slow_f, data, ordered=False)
106 amr = self.view.map_async(slow_f, data, ordered=False)
107 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
107 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
108 # check individual elements, retrieved as they come
108 # check individual elements, retrieved as they come
109 # list comprehension uses __iter__
109 # list comprehension uses __iter__
110 astheycame = [ r for r in amr ]
110 astheycame = [ r for r in amr ]
111 # Ensure that at least one result came out of order:
111 # Ensure that at least one result came out of order:
112 self.assertNotEqual(astheycame, reference, "should not have preserved order")
112 self.assertNotEqual(astheycame, reference, "should not have preserved order")
113 self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")
113 self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")
114
114
115 def test_map_ordered(self):
115 def test_map_ordered(self):
116 def f(x):
116 def f(x):
117 return x**2
117 return x**2
118 def slow_f(x):
118 def slow_f(x):
119 import time
119 import time
120 time.sleep(0.05*x)
120 time.sleep(0.05*x)
121 return x**2
121 return x**2
122 data = range(16,0,-1)
122 data = range(16,0,-1)
123 reference = map(f, data)
123 reference = map(f, data)
124
124
125 amr = self.view.map_async(slow_f, data)
125 amr = self.view.map_async(slow_f, data)
126 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
126 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
127 # check individual elements, retrieved as they come
127 # check individual elements, retrieved as they come
128 # list(amr) uses __iter__
128 # list(amr) uses __iter__
129 astheycame = list(amr)
129 astheycame = list(amr)
130 # Ensure that results came in order
130 # Ensure that results came in order
131 self.assertEqual(astheycame, reference)
131 self.assertEqual(astheycame, reference)
132 self.assertEqual(amr.result, reference)
132 self.assertEqual(amr.result, reference)
133
133
134 def test_map_iterable(self):
134 def test_map_iterable(self):
135 """test map on iterables (balanced)"""
135 """test map on iterables (balanced)"""
136 view = self.view
136 view = self.view
137 # 101 is prime, so it won't be evenly distributed
137 # 101 is prime, so it won't be evenly distributed
138 arr = range(101)
138 arr = range(101)
139 # so that it will be an iterator, even in Python 3
139 # so that it will be an iterator, even in Python 3
140 it = iter(arr)
140 it = iter(arr)
141 r = view.map_sync(lambda x:x, arr)
141 r = view.map_sync(lambda x:x, arr)
142 self.assertEqual(r, list(arr))
142 self.assertEqual(r, list(arr))
143
143
144
144
145 def test_abort(self):
145 def test_abort(self):
146 view = self.view
146 view = self.view
147 ar = self.client[:].apply_async(time.sleep, .5)
147 ar = self.client[:].apply_async(time.sleep, .5)
148 ar = self.client[:].apply_async(time.sleep, .5)
148 ar = self.client[:].apply_async(time.sleep, .5)
149 time.sleep(0.2)
149 time.sleep(0.2)
150 ar2 = view.apply_async(lambda : 2)
150 ar2 = view.apply_async(lambda : 2)
151 ar3 = view.apply_async(lambda : 3)
151 ar3 = view.apply_async(lambda : 3)
152 view.abort(ar2)
152 view.abort(ar2)
153 view.abort(ar3.msg_ids)
153 view.abort(ar3.msg_ids)
154 self.assertRaises(error.TaskAborted, ar2.get)
154 self.assertRaises(error.TaskAborted, ar2.get)
155 self.assertRaises(error.TaskAborted, ar3.get)
155 self.assertRaises(error.TaskAborted, ar3.get)
156
156
157 def test_retries(self):
157 def test_retries(self):
158 view = self.view
158 view = self.view
159 view.timeout = 1 # prevent hang if this doesn't behave
160 def fail():
159 def fail():
161 assert False
160 assert False
162 for r in range(len(self.client)-1):
161 for r in range(len(self.client)-1):
163 with view.temp_flags(retries=r):
162 with view.temp_flags(retries=r):
164 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
163 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
165
164
166 with view.temp_flags(retries=len(self.client), timeout=0.25):
165 with view.temp_flags(retries=len(self.client), timeout=0.1):
167 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
166 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
168
167
168 def test_short_timeout(self):
169 view = self.view
170 def fail():
171 import time
172 time.sleep(0.25)
173 assert False
174 with view.temp_flags(retries=1, timeout=0.01):
175 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
176
169 def test_invalid_dependency(self):
177 def test_invalid_dependency(self):
170 view = self.view
178 view = self.view
171 with view.temp_flags(after='12345'):
179 with view.temp_flags(after='12345'):
172 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
180 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
173
181
174 def test_impossible_dependency(self):
182 def test_impossible_dependency(self):
175 self.minimum_engines(2)
183 self.minimum_engines(2)
176 view = self.client.load_balanced_view()
184 view = self.client.load_balanced_view()
177 ar1 = view.apply_async(lambda : 1)
185 ar1 = view.apply_async(lambda : 1)
178 ar1.get()
186 ar1.get()
179 e1 = ar1.engine_id
187 e1 = ar1.engine_id
180 e2 = e1
188 e2 = e1
181 while e2 == e1:
189 while e2 == e1:
182 ar2 = view.apply_async(lambda : 1)
190 ar2 = view.apply_async(lambda : 1)
183 ar2.get()
191 ar2.get()
184 e2 = ar2.engine_id
192 e2 = ar2.engine_id
185
193
186 with view.temp_flags(follow=[ar1, ar2]):
194 with view.temp_flags(follow=[ar1, ar2]):
187 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
195 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
188
196
189
197
190 def test_follow(self):
198 def test_follow(self):
191 ar = self.view.apply_async(lambda : 1)
199 ar = self.view.apply_async(lambda : 1)
192 ar.get()
200 ar.get()
193 ars = []
201 ars = []
194 first_id = ar.engine_id
202 first_id = ar.engine_id
195
203
196 self.view.follow = ar
204 self.view.follow = ar
197 for i in range(5):
205 for i in range(5):
198 ars.append(self.view.apply_async(lambda : 1))
206 ars.append(self.view.apply_async(lambda : 1))
199 self.view.wait(ars)
207 self.view.wait(ars)
200 for ar in ars:
208 for ar in ars:
201 self.assertEqual(ar.engine_id, first_id)
209 self.assertEqual(ar.engine_id, first_id)
202
210
203 def test_after(self):
211 def test_after(self):
204 view = self.view
212 view = self.view
205 ar = view.apply_async(time.sleep, 0.5)
213 ar = view.apply_async(time.sleep, 0.5)
206 with view.temp_flags(after=ar):
214 with view.temp_flags(after=ar):
207 ar2 = view.apply_async(lambda : 1)
215 ar2 = view.apply_async(lambda : 1)
208
216
209 ar.wait()
217 ar.wait()
210 ar2.wait()
218 ar2.wait()
211 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
219 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
General Comments 0
You need to be logged in to leave comments. Login now