##// END OF EJS Templates
test very short timeouts...
MinRK -
Show More
@@ -1,211 +1,219
1 1 # -*- coding: utf-8 -*-
2 2 """test LoadBalancedView objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import time
21 21
22 22 import zmq
23 23 from nose import SkipTest
24 24 from nose.plugins.attrib import attr
25 25
26 26 from IPython import parallel as pmod
27 27 from IPython.parallel import error
28 28
29 29 from IPython.parallel.tests import add_engines
30 30
31 31 from .clienttest import ClusterTestCase, crash, wait, skip_without
32 32
33 33 def setup():
34 34 add_engines(3, total=True)
35 35
36 36 class TestLoadBalancedView(ClusterTestCase):
37 37
38 38 def setUp(self):
39 39 ClusterTestCase.setUp(self)
40 40 self.view = self.client.load_balanced_view()
41 41
42 42 @attr('crash')
43 43 def test_z_crash_task(self):
44 44 """test graceful handling of engine death (balanced)"""
45 45 # self.add_engines(1)
46 46 ar = self.view.apply_async(crash)
47 47 self.assertRaisesRemote(error.EngineError, ar.get, 10)
48 48 eid = ar.engine_id
49 49 tic = time.time()
50 50 while eid in self.client.ids and time.time()-tic < 5:
51 51 time.sleep(.01)
52 52 self.client.spin()
53 53 self.assertFalse(eid in self.client.ids, "Engine should have died")
54 54
55 55 def test_map(self):
56 56 def f(x):
57 57 return x**2
58 58 data = range(16)
59 59 r = self.view.map_sync(f, data)
60 60 self.assertEqual(r, map(f, data))
61 61
62 62 def test_map_generator(self):
63 63 def f(x):
64 64 return x**2
65 65
66 66 data = range(16)
67 67 r = self.view.map_sync(f, iter(data))
68 68 self.assertEqual(r, map(f, iter(data)))
69 69
70 70 def test_map_short_first(self):
71 71 def f(x,y):
72 72 if y is None:
73 73 return y
74 74 if x is None:
75 75 return x
76 76 return x*y
77 77 data = range(10)
78 78 data2 = range(4)
79 79
80 80 r = self.view.map_sync(f, data, data2)
81 81 self.assertEqual(r, map(f, data, data2))
82 82
83 83 def test_map_short_last(self):
84 84 def f(x,y):
85 85 if y is None:
86 86 return y
87 87 if x is None:
88 88 return x
89 89 return x*y
90 90 data = range(4)
91 91 data2 = range(10)
92 92
93 93 r = self.view.map_sync(f, data, data2)
94 94 self.assertEqual(r, map(f, data, data2))
95 95
96 96 def test_map_unordered(self):
97 97 def f(x):
98 98 return x**2
99 99 def slow_f(x):
100 100 import time
101 101 time.sleep(0.05*x)
102 102 return x**2
103 103 data = range(16,0,-1)
104 104 reference = map(f, data)
105 105
106 106 amr = self.view.map_async(slow_f, data, ordered=False)
107 107 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
108 108 # check individual elements, retrieved as they come
109 109 # list comprehension uses __iter__
110 110 astheycame = [ r for r in amr ]
111 111 # Ensure that at least one result came out of order:
112 112 self.assertNotEqual(astheycame, reference, "should not have preserved order")
113 113 self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")
114 114
115 115 def test_map_ordered(self):
116 116 def f(x):
117 117 return x**2
118 118 def slow_f(x):
119 119 import time
120 120 time.sleep(0.05*x)
121 121 return x**2
122 122 data = range(16,0,-1)
123 123 reference = map(f, data)
124 124
125 125 amr = self.view.map_async(slow_f, data)
126 126 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
127 127 # check individual elements, retrieved as they come
128 128 # list(amr) uses __iter__
129 129 astheycame = list(amr)
130 130 # Ensure that results came in order
131 131 self.assertEqual(astheycame, reference)
132 132 self.assertEqual(amr.result, reference)
133 133
134 134 def test_map_iterable(self):
135 135 """test map on iterables (balanced)"""
136 136 view = self.view
137 137 # 101 is prime, so it won't be evenly distributed
138 138 arr = range(101)
139 139 # so that it will be an iterator, even in Python 3
140 140 it = iter(arr)
141 141 r = view.map_sync(lambda x:x, arr)
142 142 self.assertEqual(r, list(arr))
143 143
144 144
145 145 def test_abort(self):
146 146 view = self.view
147 147 ar = self.client[:].apply_async(time.sleep, .5)
148 148 ar = self.client[:].apply_async(time.sleep, .5)
149 149 time.sleep(0.2)
150 150 ar2 = view.apply_async(lambda : 2)
151 151 ar3 = view.apply_async(lambda : 3)
152 152 view.abort(ar2)
153 153 view.abort(ar3.msg_ids)
154 154 self.assertRaises(error.TaskAborted, ar2.get)
155 155 self.assertRaises(error.TaskAborted, ar3.get)
156 156
157 157 def test_retries(self):
158 158 view = self.view
159 view.timeout = 1 # prevent hang if this doesn't behave
160 159 def fail():
161 160 assert False
162 161 for r in range(len(self.client)-1):
163 162 with view.temp_flags(retries=r):
164 163 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
165 164
166 with view.temp_flags(retries=len(self.client), timeout=0.25):
165 with view.temp_flags(retries=len(self.client), timeout=0.1):
167 166 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
168 167
168 def test_short_timeout(self):
169 view = self.view
170 def fail():
171 import time
172 time.sleep(0.25)
173 assert False
174 with view.temp_flags(retries=1, timeout=0.01):
175 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
176
169 177 def test_invalid_dependency(self):
170 178 view = self.view
171 179 with view.temp_flags(after='12345'):
172 180 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
173 181
174 182 def test_impossible_dependency(self):
175 183 self.minimum_engines(2)
176 184 view = self.client.load_balanced_view()
177 185 ar1 = view.apply_async(lambda : 1)
178 186 ar1.get()
179 187 e1 = ar1.engine_id
180 188 e2 = e1
181 189 while e2 == e1:
182 190 ar2 = view.apply_async(lambda : 1)
183 191 ar2.get()
184 192 e2 = ar2.engine_id
185 193
186 194 with view.temp_flags(follow=[ar1, ar2]):
187 195 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
188 196
189 197
190 198 def test_follow(self):
191 199 ar = self.view.apply_async(lambda : 1)
192 200 ar.get()
193 201 ars = []
194 202 first_id = ar.engine_id
195 203
196 204 self.view.follow = ar
197 205 for i in range(5):
198 206 ars.append(self.view.apply_async(lambda : 1))
199 207 self.view.wait(ars)
200 208 for ar in ars:
201 209 self.assertEqual(ar.engine_id, first_id)
202 210
203 211 def test_after(self):
204 212 view = self.view
205 213 ar = view.apply_async(time.sleep, 0.5)
206 214 with view.temp_flags(after=ar):
207 215 ar2 = view.apply_async(lambda : 1)
208 216
209 217 ar.wait()
210 218 ar2.wait()
211 219 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
General Comments 0
You need to be logged in to leave comments. Login now