##// END OF EJS Templates
further tweaks to parallel tests...
MinRK -
Show More
@@ -1,126 +1,129 b''
1 1 # -*- coding: utf-8 -*-
2 2 """test LoadBalancedView objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import time
21 21
22 22 import zmq
23 from nose import SkipTest
23 24
24 25 from IPython import parallel as pmod
25 26 from IPython.parallel import error
26 27
27 28 from IPython.parallel.tests import add_engines
28 29
29 30 from .clienttest import ClusterTestCase, crash, wait, skip_without
30 31
31 32 def setup():
32 33 add_engines(3)
33 34
34 35 class TestLoadBalancedView(ClusterTestCase):
35 36
36 37 def setUp(self):
37 38 ClusterTestCase.setUp(self)
38 39 self.view = self.client.load_balanced_view()
39 40
40 41 def test_z_crash_task(self):
41 42 """test graceful handling of engine death (balanced)"""
43 raise SkipTest("crash tests disabled, due to undesirable crash reports")
42 44 # self.add_engines(1)
43 45 ar = self.view.apply_async(crash)
44 46 self.assertRaisesRemote(error.EngineError, ar.get, 10)
45 47 eid = ar.engine_id
46 48 tic = time.time()
47 49 while eid in self.client.ids and time.time()-tic < 5:
48 50 time.sleep(.01)
49 51 self.client.spin()
50 52 self.assertFalse(eid in self.client.ids, "Engine should have died")
51 53
52 54 def test_map(self):
53 55 def f(x):
54 56 return x**2
55 57 data = range(16)
56 58 r = self.view.map_sync(f, data)
57 59 self.assertEquals(r, map(f, data))
58 60
59 61 def test_abort(self):
60 62 view = self.view
61 63 ar = self.client[:].apply_async(time.sleep, .5)
62 64 ar = self.client[:].apply_async(time.sleep, .5)
65 time.sleep(0.2)
63 66 ar2 = view.apply_async(lambda : 2)
64 67 ar3 = view.apply_async(lambda : 3)
65 68 view.abort(ar2)
66 69 view.abort(ar3.msg_ids)
67 70 self.assertRaises(error.TaskAborted, ar2.get)
68 71 self.assertRaises(error.TaskAborted, ar3.get)
69 72
70 73 def test_retries(self):
71 74 add_engines(3)
72 75 view = self.view
73 76 view.timeout = 1 # prevent hang if this doesn't behave
74 77 def fail():
75 78 assert False
76 79 for r in range(len(self.client)-1):
77 80 with view.temp_flags(retries=r):
78 81 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
79 82
80 83 with view.temp_flags(retries=len(self.client), timeout=0.25):
81 84 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
82 85
83 86 def test_invalid_dependency(self):
84 87 view = self.view
85 88 with view.temp_flags(after='12345'):
86 89 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
87 90
88 91 def test_impossible_dependency(self):
89 92 if len(self.client) < 2:
90 93 add_engines(2)
91 94 view = self.client.load_balanced_view()
92 95 ar1 = view.apply_async(lambda : 1)
93 96 ar1.get()
94 97 e1 = ar1.engine_id
95 98 e2 = e1
96 99 while e2 == e1:
97 100 ar2 = view.apply_async(lambda : 1)
98 101 ar2.get()
99 102 e2 = ar2.engine_id
100 103
101 104 with view.temp_flags(follow=[ar1, ar2]):
102 105 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
103 106
104 107
105 108 def test_follow(self):
106 109 ar = self.view.apply_async(lambda : 1)
107 110 ar.get()
108 111 ars = []
109 112 first_id = ar.engine_id
110 113
111 114 self.view.follow = ar
112 115 for i in range(5):
113 116 ars.append(self.view.apply_async(lambda : 1))
114 117 self.view.wait(ars)
115 118 for ar in ars:
116 119 self.assertEquals(ar.engine_id, first_id)
117 120
118 121 def test_after(self):
119 122 view = self.view
120 123 ar = view.apply_async(time.sleep, 0.5)
121 124 with view.temp_flags(after=ar):
122 125 ar2 = view.apply_async(lambda : 1)
123 126
124 127 ar.wait()
125 128 ar2.wait()
126 129 self.assertTrue(ar2.started > ar.completed)
@@ -1,447 +1,451 b''
1 1 # -*- coding: utf-8 -*-
2 2 """test View objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import time
21 21 from tempfile import mktemp
22 22 from StringIO import StringIO
23 23
24 24 import zmq
25 from nose import SkipTest
25 26
26 27 from IPython import parallel as pmod
27 28 from IPython.parallel import error
28 29 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
29 30 from IPython.parallel import DirectView
30 31 from IPython.parallel.util import interactive
31 32
32 33 from IPython.parallel.tests import add_engines
33 34
34 35 from .clienttest import ClusterTestCase, crash, wait, skip_without
35 36
36 37 def setup():
37 38 add_engines(3)
38 39
39 40 class TestView(ClusterTestCase):
40 41
41 42 def test_z_crash_mux(self):
42 43 """test graceful handling of engine death (direct)"""
44 raise SkipTest("crash tests disabled, due to undesirable crash reports")
43 45 # self.add_engines(1)
44 46 eid = self.client.ids[-1]
45 47 ar = self.client[eid].apply_async(crash)
46 48 self.assertRaisesRemote(error.EngineError, ar.get, 10)
47 49 eid = ar.engine_id
48 50 tic = time.time()
49 51 while eid in self.client.ids and time.time()-tic < 5:
50 52 time.sleep(.01)
51 53 self.client.spin()
52 54 self.assertFalse(eid in self.client.ids, "Engine should have died")
53 55
54 56 def test_push_pull(self):
55 57 """test pushing and pulling"""
56 58 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
57 59 t = self.client.ids[-1]
58 60 v = self.client[t]
59 61 push = v.push
60 62 pull = v.pull
61 63 v.block=True
62 64 nengines = len(self.client)
63 65 push({'data':data})
64 66 d = pull('data')
65 67 self.assertEquals(d, data)
66 68 self.client[:].push({'data':data})
67 69 d = self.client[:].pull('data', block=True)
68 70 self.assertEquals(d, nengines*[data])
69 71 ar = push({'data':data}, block=False)
70 72 self.assertTrue(isinstance(ar, AsyncResult))
71 73 r = ar.get()
72 74 ar = self.client[:].pull('data', block=False)
73 75 self.assertTrue(isinstance(ar, AsyncResult))
74 76 r = ar.get()
75 77 self.assertEquals(r, nengines*[data])
76 78 self.client[:].push(dict(a=10,b=20))
77 79 r = self.client[:].pull(('a','b'), block=True)
78 80 self.assertEquals(r, nengines*[[10,20]])
79 81
80 82 def test_push_pull_function(self):
81 83 "test pushing and pulling functions"
82 84 def testf(x):
83 85 return 2.0*x
84 86
85 87 t = self.client.ids[-1]
86 88 v = self.client[t]
87 89 v.block=True
88 90 push = v.push
89 91 pull = v.pull
90 92 execute = v.execute
91 93 push({'testf':testf})
92 94 r = pull('testf')
93 95 self.assertEqual(r(1.0), testf(1.0))
94 96 execute('r = testf(10)')
95 97 r = pull('r')
96 98 self.assertEquals(r, testf(10))
97 99 ar = self.client[:].push({'testf':testf}, block=False)
98 100 ar.get()
99 101 ar = self.client[:].pull('testf', block=False)
100 102 rlist = ar.get()
101 103 for r in rlist:
102 104 self.assertEqual(r(1.0), testf(1.0))
103 105 execute("def g(x): return x*x")
104 106 r = pull(('testf','g'))
105 107 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
106 108
107 109 def test_push_function_globals(self):
108 110 """test that pushed functions have access to globals"""
109 111 @interactive
110 112 def geta():
111 113 return a
112 114 # self.add_engines(1)
113 115 v = self.client[-1]
114 116 v.block=True
115 117 v['f'] = geta
116 118 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
117 119 v.execute('a=5')
118 120 v.execute('b=f()')
119 121 self.assertEquals(v['b'], 5)
120 122
121 123 def test_push_function_defaults(self):
122 124 """test that pushed functions preserve default args"""
123 125 def echo(a=10):
124 126 return a
125 127 v = self.client[-1]
126 128 v.block=True
127 129 v['f'] = echo
128 130 v.execute('b=f()')
129 131 self.assertEquals(v['b'], 10)
130 132
131 133 def test_get_result(self):
132 134 """test getting results from the Hub."""
133 135 c = pmod.Client(profile='iptest')
134 136 # self.add_engines(1)
135 137 t = c.ids[-1]
136 138 v = c[t]
137 139 v2 = self.client[t]
138 140 ar = v.apply_async(wait, 1)
139 141 # give the monitor time to notice the message
140 142 time.sleep(.25)
141 143 ahr = v2.get_result(ar.msg_ids)
142 144 self.assertTrue(isinstance(ahr, AsyncHubResult))
143 145 self.assertEquals(ahr.get(), ar.get())
144 146 ar2 = v2.get_result(ar.msg_ids)
145 147 self.assertFalse(isinstance(ar2, AsyncHubResult))
146 148 c.spin()
147 149 c.close()
148 150
149 151 def test_run_newline(self):
150 152 """test that run appends newline to files"""
151 153 tmpfile = mktemp()
152 154 with open(tmpfile, 'w') as f:
153 155 f.write("""def g():
154 156 return 5
155 157 """)
156 158 v = self.client[-1]
157 159 v.run(tmpfile, block=True)
158 160 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
159 161
160 162 def test_apply_tracked(self):
161 163 """test tracking for apply"""
162 164 # self.add_engines(1)
163 165 t = self.client.ids[-1]
164 166 v = self.client[t]
165 167 v.block=False
166 168 def echo(n=1024*1024, **kwargs):
167 169 with v.temp_flags(**kwargs):
168 170 return v.apply(lambda x: x, 'x'*n)
169 171 ar = echo(1, track=False)
170 172 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
171 173 self.assertTrue(ar.sent)
172 174 ar = echo(track=True)
173 175 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
174 176 self.assertEquals(ar.sent, ar._tracker.done)
175 177 ar._tracker.wait()
176 178 self.assertTrue(ar.sent)
177 179
178 180 def test_push_tracked(self):
179 181 t = self.client.ids[-1]
180 182 ns = dict(x='x'*1024*1024)
181 183 v = self.client[t]
182 184 ar = v.push(ns, block=False, track=False)
183 185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
184 186 self.assertTrue(ar.sent)
185 187
186 188 ar = v.push(ns, block=False, track=True)
187 189 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertEquals(ar.sent, ar._tracker.done)
189 190 ar._tracker.wait()
191 self.assertEquals(ar.sent, ar._tracker.done)
190 192 self.assertTrue(ar.sent)
191 193 ar.get()
192 194
193 195 def test_scatter_tracked(self):
194 196 t = self.client.ids
195 197 x='x'*1024*1024
196 198 ar = self.client[t].scatter('x', x, block=False, track=False)
197 199 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
198 200 self.assertTrue(ar.sent)
199 201
200 202 ar = self.client[t].scatter('x', x, block=False, track=True)
201 203 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 204 self.assertEquals(ar.sent, ar._tracker.done)
203 205 ar._tracker.wait()
204 206 self.assertTrue(ar.sent)
205 207 ar.get()
206 208
207 209 def test_remote_reference(self):
208 210 v = self.client[-1]
209 211 v['a'] = 123
210 212 ra = pmod.Reference('a')
211 213 b = v.apply_sync(lambda x: x, ra)
212 214 self.assertEquals(b, 123)
213 215
214 216
215 217 def test_scatter_gather(self):
216 218 view = self.client[:]
217 219 seq1 = range(16)
218 220 view.scatter('a', seq1)
219 221 seq2 = view.gather('a', block=True)
220 222 self.assertEquals(seq2, seq1)
221 223 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
222 224
223 225 @skip_without('numpy')
224 226 def test_scatter_gather_numpy(self):
225 227 import numpy
226 228 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
227 229 view = self.client[:]
228 230 a = numpy.arange(64)
229 231 view.scatter('a', a)
230 232 b = view.gather('a', block=True)
231 233 assert_array_equal(b, a)
232 234
233 235 def test_map(self):
234 236 view = self.client[:]
235 237 def f(x):
236 238 return x**2
237 239 data = range(16)
238 240 r = view.map_sync(f, data)
239 241 self.assertEquals(r, map(f, data))
240 242
241 243 def test_scatterGatherNonblocking(self):
242 244 data = range(16)
243 245 view = self.client[:]
244 246 view.scatter('a', data, block=False)
245 247 ar = view.gather('a', block=False)
246 248 self.assertEquals(ar.get(), data)
247 249
248 250 @skip_without('numpy')
249 251 def test_scatter_gather_numpy_nonblocking(self):
250 252 import numpy
251 253 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
252 254 a = numpy.arange(64)
253 255 view = self.client[:]
254 256 ar = view.scatter('a', a, block=False)
255 257 self.assertTrue(isinstance(ar, AsyncResult))
256 258 amr = view.gather('a', block=False)
257 259 self.assertTrue(isinstance(amr, AsyncMapResult))
258 260 assert_array_equal(amr.get(), a)
259 261
260 262 def test_execute(self):
261 263 view = self.client[:]
262 264 # self.client.debug=True
263 265 execute = view.execute
264 266 ar = execute('c=30', block=False)
265 267 self.assertTrue(isinstance(ar, AsyncResult))
266 268 ar = execute('d=[0,1,2]', block=False)
267 269 self.client.wait(ar, 1)
268 270 self.assertEquals(len(ar.get()), len(self.client))
269 271 for c in view['c']:
270 272 self.assertEquals(c, 30)
271 273
272 274 def test_abort(self):
273 275 view = self.client[-1]
274 ar = view.execute('import time; time.sleep(0.25)', block=False)
276 ar = view.execute('import time; time.sleep(1)', block=False)
275 277 ar2 = view.apply_async(lambda : 2)
276 278 ar3 = view.apply_async(lambda : 3)
277 279 view.abort(ar2)
278 280 view.abort(ar3.msg_ids)
279 281 self.assertRaises(error.TaskAborted, ar2.get)
280 282 self.assertRaises(error.TaskAborted, ar3.get)
281 283
282 284 def test_temp_flags(self):
283 285 view = self.client[-1]
284 286 view.block=True
285 287 with view.temp_flags(block=False):
286 288 self.assertFalse(view.block)
287 289 self.assertTrue(view.block)
288 290
289 291 def test_importer(self):
290 292 view = self.client[-1]
291 293 view.clear(block=True)
292 294 with view.importer:
293 295 import re
294 296
295 297 @interactive
296 298 def findall(pat, s):
297 299 # this globals() step isn't necessary in real code
298 300 # only to prevent a closure in the test
299 301 re = globals()['re']
300 302 return re.findall(pat, s)
301 303
302 304 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
303 305
304 306 # parallel magic tests
305 307
306 308 def test_magic_px_blocking(self):
307 309 ip = get_ipython()
308 310 v = self.client[-1]
309 311 v.activate()
310 312 v.block=True
311 313
312 314 ip.magic_px('a=5')
313 315 self.assertEquals(v['a'], 5)
314 316 ip.magic_px('a=10')
315 317 self.assertEquals(v['a'], 10)
316 318 sio = StringIO()
317 319 savestdout = sys.stdout
318 320 sys.stdout = sio
319 ip.magic_px('print a')
321 # just 'print a' worst ~99% of the time, but this ensures that
322 # the stdout message has arrived when the result is finished:
323 ip.magic_px('import sys,time;print a; sys.stdout.flush();time.sleep(0.2)')
320 324 sys.stdout = savestdout
321 325 buf = sio.getvalue()
322 326 self.assertTrue('[stdout:' in buf, buf)
323 327 self.assertTrue(buf.rstrip().endswith('10'))
324 328 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
325 329
326 330 def test_magic_px_nonblocking(self):
327 331 ip = get_ipython()
328 332 v = self.client[-1]
329 333 v.activate()
330 334 v.block=False
331 335
332 336 ip.magic_px('a=5')
333 337 self.assertEquals(v['a'], 5)
334 338 ip.magic_px('a=10')
335 339 self.assertEquals(v['a'], 10)
336 340 sio = StringIO()
337 341 savestdout = sys.stdout
338 342 sys.stdout = sio
339 343 ip.magic_px('print a')
340 344 sys.stdout = savestdout
341 345 buf = sio.getvalue()
342 346 self.assertFalse('[stdout:%i]'%v.targets in buf)
343 347 ip.magic_px('1/0')
344 348 ar = v.get_result(-1)
345 349 self.assertRaisesRemote(ZeroDivisionError, ar.get)
346 350
347 351 def test_magic_autopx_blocking(self):
348 352 ip = get_ipython()
349 353 v = self.client[-1]
350 354 v.activate()
351 355 v.block=True
352 356
353 357 sio = StringIO()
354 358 savestdout = sys.stdout
355 359 sys.stdout = sio
356 360 ip.magic_autopx()
357 361 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
358 362 ip.run_cell('print b')
359 363 ip.run_cell("b/c")
360 364 ip.run_code(compile('b*=2', '', 'single'))
361 365 ip.magic_autopx()
362 366 sys.stdout = savestdout
363 367 output = sio.getvalue().strip()
364 368 self.assertTrue(output.startswith('%autopx enabled'))
365 369 self.assertTrue(output.endswith('%autopx disabled'))
366 370 self.assertTrue('RemoteError: ZeroDivisionError' in output)
367 371 ar = v.get_result(-2)
368 372 self.assertEquals(v['a'], 5)
369 373 self.assertEquals(v['b'], 20)
370 374 self.assertRaisesRemote(ZeroDivisionError, ar.get)
371 375
372 376 def test_magic_autopx_nonblocking(self):
373 377 ip = get_ipython()
374 378 v = self.client[-1]
375 379 v.activate()
376 380 v.block=False
377 381
378 382 sio = StringIO()
379 383 savestdout = sys.stdout
380 384 sys.stdout = sio
381 385 ip.magic_autopx()
382 386 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
383 387 ip.run_cell('print b')
384 388 ip.run_cell("b/c")
385 389 ip.run_code(compile('b*=2', '', 'single'))
386 390 ip.magic_autopx()
387 391 sys.stdout = savestdout
388 392 output = sio.getvalue().strip()
389 393 self.assertTrue(output.startswith('%autopx enabled'))
390 394 self.assertTrue(output.endswith('%autopx disabled'))
391 395 self.assertFalse('ZeroDivisionError' in output)
392 396 ar = v.get_result(-2)
393 397 self.assertEquals(v['a'], 5)
394 398 self.assertEquals(v['b'], 20)
395 399 self.assertRaisesRemote(ZeroDivisionError, ar.get)
396 400
397 401 def test_magic_result(self):
398 402 ip = get_ipython()
399 403 v = self.client[-1]
400 404 v.activate()
401 405 v['a'] = 111
402 406 ra = v['a']
403 407
404 408 ar = ip.magic_result()
405 409 self.assertEquals(ar.msg_ids, [v.history[-1]])
406 410 self.assertEquals(ar.get(), 111)
407 411 ar = ip.magic_result('-2')
408 412 self.assertEquals(ar.msg_ids, [v.history[-2]])
409 413
410 414 def test_unicode_execute(self):
411 415 """test executing unicode strings"""
412 416 v = self.client[-1]
413 417 v.block=True
414 418 if sys.version_info[0] >= 3:
415 419 code="a='é'"
416 420 else:
417 421 code=u"a=u'é'"
418 422 v.execute(code)
419 423 self.assertEquals(v['a'], u'é')
420 424
421 425 def test_unicode_apply_result(self):
422 426 """test unicode apply results"""
423 427 v = self.client[-1]
424 428 r = v.apply_sync(lambda : u'é')
425 429 self.assertEquals(r, u'é')
426 430
427 431 def test_unicode_apply_arg(self):
428 432 """test passing unicode arguments to apply"""
429 433 v = self.client[-1]
430 434
431 435 @interactive
432 436 def check_unicode(a, check):
433 437 assert isinstance(a, unicode), "%r is not unicode"%a
434 438 assert isinstance(check, bytes), "%r is not bytes"%check
435 439 assert a.encode('utf8') == check, "%s != %s"%(a,check)
436 440
437 441 for s in [ u'é', u'ßø®∫',u'asdf' ]:
438 442 try:
439 443 v.apply_sync(check_unicode, s, s.encode('utf8'))
440 444 except error.RemoteError as e:
441 445 if e.ename == 'AssertionError':
442 446 self.fail(e.evalue)
443 447 else:
444 448 raise e
445 449
446 450
447 451
General Comments 0
You need to be logged in to leave comments. Login now