##// END OF EJS Templates
minor tweak to wait_for_idle in tests
MinRK -
Show More
@@ -1,436 +1,436 b''
1 1 """Tests for parallel client.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import time
22 22 from datetime import datetime
23 23 from tempfile import mktemp
24 24
25 25 import zmq
26 26
27 27 from IPython import parallel
28 28 from IPython.parallel.client import client as clientmod
29 29 from IPython.parallel import error
30 30 from IPython.parallel import AsyncResult, AsyncHubResult
31 31 from IPython.parallel import LoadBalancedView, DirectView
32 32
33 33 from clienttest import ClusterTestCase, segfault, wait, add_engines
34 34
35 35 def setup():
36 36 add_engines(4, total=True)
37 37
38 38 class TestClient(ClusterTestCase):
39 39
40 40 def test_ids(self):
41 41 n = len(self.client.ids)
42 42 self.add_engines(2)
43 43 self.assertEquals(len(self.client.ids), n+2)
44 44
45 45 def test_view_indexing(self):
46 46 """test index access for views"""
47 47 self.minimum_engines(4)
48 48 targets = self.client._build_targets('all')[-1]
49 49 v = self.client[:]
50 50 self.assertEquals(v.targets, targets)
51 51 t = self.client.ids[2]
52 52 v = self.client[t]
53 53 self.assert_(isinstance(v, DirectView))
54 54 self.assertEquals(v.targets, t)
55 55 t = self.client.ids[2:4]
56 56 v = self.client[t]
57 57 self.assert_(isinstance(v, DirectView))
58 58 self.assertEquals(v.targets, t)
59 59 v = self.client[::2]
60 60 self.assert_(isinstance(v, DirectView))
61 61 self.assertEquals(v.targets, targets[::2])
62 62 v = self.client[1::3]
63 63 self.assert_(isinstance(v, DirectView))
64 64 self.assertEquals(v.targets, targets[1::3])
65 65 v = self.client[:-3]
66 66 self.assert_(isinstance(v, DirectView))
67 67 self.assertEquals(v.targets, targets[:-3])
68 68 v = self.client[-1]
69 69 self.assert_(isinstance(v, DirectView))
70 70 self.assertEquals(v.targets, targets[-1])
71 71 self.assertRaises(TypeError, lambda : self.client[None])
72 72
73 73 def test_lbview_targets(self):
74 74 """test load_balanced_view targets"""
75 75 v = self.client.load_balanced_view()
76 76 self.assertEquals(v.targets, None)
77 77 v = self.client.load_balanced_view(-1)
78 78 self.assertEquals(v.targets, [self.client.ids[-1]])
79 79 v = self.client.load_balanced_view('all')
80 80 self.assertEquals(v.targets, None)
81 81
82 82 def test_dview_targets(self):
83 83 """test direct_view targets"""
84 84 v = self.client.direct_view()
85 85 self.assertEquals(v.targets, 'all')
86 86 v = self.client.direct_view('all')
87 87 self.assertEquals(v.targets, 'all')
88 88 v = self.client.direct_view(-1)
89 89 self.assertEquals(v.targets, self.client.ids[-1])
90 90
91 91 def test_lazy_all_targets(self):
92 92 """test lazy evaluation of rc.direct_view('all')"""
93 93 v = self.client.direct_view()
94 94 self.assertEquals(v.targets, 'all')
95 95
96 96 def double(x):
97 97 return x*2
98 98 seq = range(100)
99 99 ref = [ double(x) for x in seq ]
100 100
101 101 # add some engines, which should be used
102 102 self.add_engines(1)
103 103 n1 = len(self.client.ids)
104 104
105 105 # simple apply
106 106 r = v.apply_sync(lambda : 1)
107 107 self.assertEquals(r, [1] * n1)
108 108
109 109 # map goes through remotefunction
110 110 r = v.map_sync(double, seq)
111 111 self.assertEquals(r, ref)
112 112
113 113 # add a couple more engines, and try again
114 114 self.add_engines(2)
115 115 n2 = len(self.client.ids)
116 116 self.assertNotEquals(n2, n1)
117 117
118 118 # apply
119 119 r = v.apply_sync(lambda : 1)
120 120 self.assertEquals(r, [1] * n2)
121 121
122 122 # map
123 123 r = v.map_sync(double, seq)
124 124 self.assertEquals(r, ref)
125 125
126 126 def test_targets(self):
127 127 """test various valid targets arguments"""
128 128 build = self.client._build_targets
129 129 ids = self.client.ids
130 130 idents,targets = build(None)
131 131 self.assertEquals(ids, targets)
132 132
133 133 def test_clear(self):
134 134 """test clear behavior"""
135 135 self.minimum_engines(2)
136 136 v = self.client[:]
137 137 v.block=True
138 138 v.push(dict(a=5))
139 139 v.pull('a')
140 140 id0 = self.client.ids[-1]
141 141 self.client.clear(targets=id0, block=True)
142 142 a = self.client[:-1].get('a')
143 143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
144 144 self.client.clear(block=True)
145 145 for i in self.client.ids:
146 146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147 147
148 148 def test_get_result(self):
149 149 """test getting results from the Hub."""
150 150 c = clientmod.Client(profile='iptest')
151 151 t = c.ids[-1]
152 152 ar = c[t].apply_async(wait, 1)
153 153 # give the monitor time to notice the message
154 154 time.sleep(.25)
155 155 ahr = self.client.get_result(ar.msg_ids)
156 156 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 157 self.assertEquals(ahr.get(), ar.get())
158 158 ar2 = self.client.get_result(ar.msg_ids)
159 159 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 160 c.close()
161 161
162 162 def test_ids_list(self):
163 163 """test client.ids"""
164 164 ids = self.client.ids
165 165 self.assertEquals(ids, self.client._ids)
166 166 self.assertFalse(ids is self.client._ids)
167 167 ids.remove(ids[-1])
168 168 self.assertNotEquals(ids, self.client._ids)
169 169
170 170 def test_queue_status(self):
171 171 ids = self.client.ids
172 172 id0 = ids[0]
173 173 qs = self.client.queue_status(targets=id0)
174 174 self.assertTrue(isinstance(qs, dict))
175 175 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
176 176 allqs = self.client.queue_status()
177 177 self.assertTrue(isinstance(allqs, dict))
178 178 intkeys = list(allqs.keys())
179 179 intkeys.remove('unassigned')
180 180 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
181 181 unassigned = allqs.pop('unassigned')
182 182 for eid,qs in allqs.items():
183 183 self.assertTrue(isinstance(qs, dict))
184 184 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
185 185
186 186 def test_shutdown(self):
187 187 ids = self.client.ids
188 188 id0 = ids[0]
189 189 self.client.shutdown(id0, block=True)
190 190 while id0 in self.client.ids:
191 191 time.sleep(0.1)
192 192 self.client.spin()
193 193
194 194 self.assertRaises(IndexError, lambda : self.client[id0])
195 195
196 196 def test_result_status(self):
197 197 pass
198 198 # to be written
199 199
200 200 def test_db_query_dt(self):
201 201 """test db query by date"""
202 202 hist = self.client.hub_history()
203 203 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
204 204 tic = middle['submitted']
205 205 before = self.client.db_query({'submitted' : {'$lt' : tic}})
206 206 after = self.client.db_query({'submitted' : {'$gte' : tic}})
207 207 self.assertEquals(len(before)+len(after),len(hist))
208 208 for b in before:
209 209 self.assertTrue(b['submitted'] < tic)
210 210 for a in after:
211 211 self.assertTrue(a['submitted'] >= tic)
212 212 same = self.client.db_query({'submitted' : tic})
213 213 for s in same:
214 214 self.assertTrue(s['submitted'] == tic)
215 215
216 216 def test_db_query_keys(self):
217 217 """test extracting subset of record keys"""
218 218 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
219 219 for rec in found:
220 220 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
221 221
222 222 def test_db_query_default_keys(self):
223 223 """default db_query excludes buffers"""
224 224 found = self.client.db_query({'msg_id': {'$ne' : ''}})
225 225 for rec in found:
226 226 keys = set(rec.keys())
227 227 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
228 228 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
229 229
230 230 def test_db_query_msg_id(self):
231 231 """ensure msg_id is always in db queries"""
232 232 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
233 233 for rec in found:
234 234 self.assertTrue('msg_id' in rec.keys())
235 235 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
236 236 for rec in found:
237 237 self.assertTrue('msg_id' in rec.keys())
238 238 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
239 239 for rec in found:
240 240 self.assertTrue('msg_id' in rec.keys())
241 241
242 242 def test_db_query_get_result(self):
243 243 """pop in db_query shouldn't pop from result itself"""
244 244 self.client[:].apply_sync(lambda : 1)
245 245 found = self.client.db_query({'msg_id': {'$ne' : ''}})
246 246 rc2 = clientmod.Client(profile='iptest')
247 247 # If this bug is not fixed, this call will hang:
248 248 ar = rc2.get_result(self.client.history[-1])
249 249 ar.wait(2)
250 250 self.assertTrue(ar.ready())
251 251 ar.get()
252 252 rc2.close()
253 253
254 254 def test_db_query_in(self):
255 255 """test db query with '$in','$nin' operators"""
256 256 hist = self.client.hub_history()
257 257 even = hist[::2]
258 258 odd = hist[1::2]
259 259 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
260 260 found = [ r['msg_id'] for r in recs ]
261 261 self.assertEquals(set(even), set(found))
262 262 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
263 263 found = [ r['msg_id'] for r in recs ]
264 264 self.assertEquals(set(odd), set(found))
265 265
266 266 def test_hub_history(self):
267 267 hist = self.client.hub_history()
268 268 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
269 269 recdict = {}
270 270 for rec in recs:
271 271 recdict[rec['msg_id']] = rec
272 272
273 273 latest = datetime(1984,1,1)
274 274 for msg_id in hist:
275 275 rec = recdict[msg_id]
276 276 newt = rec['submitted']
277 277 self.assertTrue(newt >= latest)
278 278 latest = newt
279 279 ar = self.client[-1].apply_async(lambda : 1)
280 280 ar.get()
281 281 time.sleep(0.25)
282 282 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
283 283
284 284 def _wait_for_idle(self):
285 285 """wait for an engine to become idle, according to the Hub"""
286 286 rc = self.client
287 287
288 # timeout 2s, polling every 100ms
289 for i in range(20):
290 qs = rc.queue_status()
288 # timeout 5s, polling every 100ms
289 qs = rc.queue_status()
290 for i in range(50):
291 291 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
292 292 time.sleep(0.1)
293 qs = rc.queue_status()
293 294 else:
294 295 break
295 296
296 297 # ensure Hub up to date:
297 qs = rc.queue_status()
298 298 self.assertEquals(qs['unassigned'], 0)
299 299 for eid in rc.ids:
300 300 self.assertEquals(qs[eid]['tasks'], 0)
301 301
302 302
303 303 def test_resubmit(self):
304 304 def f():
305 305 import random
306 306 return random.random()
307 307 v = self.client.load_balanced_view()
308 308 ar = v.apply_async(f)
309 309 r1 = ar.get(1)
310 310 # give the Hub a chance to notice:
311 311 self._wait_for_idle()
312 312 ahr = self.client.resubmit(ar.msg_ids)
313 313 r2 = ahr.get(1)
314 314 self.assertFalse(r1 == r2)
315 315
316 316 def test_resubmit_chain(self):
317 317 """resubmit resubmitted tasks"""
318 318 v = self.client.load_balanced_view()
319 319 ar = v.apply_async(lambda x: x, 'x'*1024)
320 320 ar.get()
321 321 self._wait_for_idle()
322 322 ars = [ar]
323 323
324 324 for i in range(10):
325 325 ar = ars[-1]
326 326 ar2 = self.client.resubmit(ar.msg_ids)
327 327
328 328 [ ar.get() for ar in ars ]
329 329
330 330 def test_resubmit_header(self):
331 331 """resubmit shouldn't clobber the whole header"""
332 332 def f():
333 333 import random
334 334 return random.random()
335 335 v = self.client.load_balanced_view()
336 336 v.retries = 1
337 337 ar = v.apply_async(f)
338 338 r1 = ar.get(1)
339 339 # give the Hub a chance to notice:
340 340 self._wait_for_idle()
341 341 ahr = self.client.resubmit(ar.msg_ids)
342 342 ahr.get(1)
343 343 time.sleep(0.5)
344 344 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
345 345 h1,h2 = [ r['header'] for r in records ]
346 346 for key in set(h1.keys()).union(set(h2.keys())):
347 347 if key in ('msg_id', 'date'):
348 348 self.assertNotEquals(h1[key], h2[key])
349 349 else:
350 350 self.assertEquals(h1[key], h2[key])
351 351
352 352 def test_resubmit_aborted(self):
353 353 def f():
354 354 import random
355 355 return random.random()
356 356 v = self.client.load_balanced_view()
357 357 # restrict to one engine, so we can put a sleep
358 358 # ahead of the task, so it will get aborted
359 359 eid = self.client.ids[-1]
360 360 v.targets = [eid]
361 361 sleep = v.apply_async(time.sleep, 0.5)
362 362 ar = v.apply_async(f)
363 363 ar.abort()
364 364 self.assertRaises(error.TaskAborted, ar.get)
365 365 # Give the Hub a chance to get up to date:
366 366 self._wait_for_idle()
367 367 ahr = self.client.resubmit(ar.msg_ids)
368 368 r2 = ahr.get(1)
369 369
370 370 def test_resubmit_inflight(self):
371 371 """resubmit of inflight task"""
372 372 v = self.client.load_balanced_view()
373 373 ar = v.apply_async(time.sleep,1)
374 374 # give the message a chance to arrive
375 375 time.sleep(0.2)
376 376 ahr = self.client.resubmit(ar.msg_ids)
377 377 ar.get(2)
378 378 ahr.get(2)
379 379
380 380 def test_resubmit_badkey(self):
381 381 """ensure KeyError on resubmit of nonexistant task"""
382 382 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
383 383
384 384 def test_purge_results(self):
385 385 # ensure there are some tasks
386 386 for i in range(5):
387 387 self.client[:].apply_sync(lambda : 1)
388 388 # Wait for the Hub to realise the result is done:
389 389 # This prevents a race condition, where we
390 390 # might purge a result the Hub still thinks is pending.
391 391 time.sleep(0.1)
392 392 rc2 = clientmod.Client(profile='iptest')
393 393 hist = self.client.hub_history()
394 394 ahr = rc2.get_result([hist[-1]])
395 395 ahr.wait(10)
396 396 self.client.purge_results(hist[-1])
397 397 newhist = self.client.hub_history()
398 398 self.assertEquals(len(newhist)+1,len(hist))
399 399 rc2.spin()
400 400 rc2.close()
401 401
402 402 def test_purge_all_results(self):
403 403 self.client.purge_results('all')
404 404 hist = self.client.hub_history()
405 405 self.assertEquals(len(hist), 0)
406 406
407 407 def test_spin_thread(self):
408 408 self.client.spin_thread(0.01)
409 409 ar = self.client[-1].apply_async(lambda : 1)
410 410 time.sleep(0.1)
411 411 self.assertTrue(ar.wall_time < 0.1,
412 412 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
413 413 )
414 414
415 415 def test_stop_spin_thread(self):
416 416 self.client.spin_thread(0.01)
417 417 self.client.stop_spin_thread()
418 418 ar = self.client[-1].apply_async(lambda : 1)
419 419 time.sleep(0.15)
420 420 self.assertTrue(ar.wall_time > 0.1,
421 421 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
422 422 )
423 423
424 424 def test_activate(self):
425 425 ip = get_ipython()
426 426 magics = ip.magics_manager.magics
427 427 self.assertTrue('px' in magics['line'])
428 428 self.assertTrue('px' in magics['cell'])
429 429 v0 = self.client.activate(-1, '0')
430 430 self.assertTrue('px0' in magics['line'])
431 431 self.assertTrue('px0' in magics['cell'])
432 432 self.assertEquals(v0.targets, self.client.ids[-1])
433 433 v0 = self.client.activate('all', 'all')
434 434 self.assertTrue('pxall' in magics['line'])
435 435 self.assertTrue('pxall' in magics['cell'])
436 436 self.assertEquals(v0.targets, 'all')
General Comments 0
You need to be logged in to leave comments. Login now