##// END OF EJS Templates
move coding declaration above docstring in test_view...
MinRK -
Show More
@@ -1,125 +1,125 b''
1 # -*- coding: utf-8 -*-
1 2 """test LoadBalancedView objects
2 3
3 4 Authors:
4 5
5 6 * Min RK
6 7 """
7 # -*- coding: utf-8 -*-
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import time
21 21
22 22 import zmq
23 23
24 24 from IPython import parallel as pmod
25 25 from IPython.parallel import error
26 26
27 27 from IPython.parallel.tests import add_engines
28 28
29 29 from .clienttest import ClusterTestCase, crash, wait, skip_without
30 30
31 31 def setup():
32 32 add_engines(3)
33 33
34 34 class TestLoadBalancedView(ClusterTestCase):
35 35
36 36 def setUp(self):
37 37 ClusterTestCase.setUp(self)
38 38 self.view = self.client.load_balanced_view()
39 39
40 40 def test_z_crash_task(self):
41 41 """test graceful handling of engine death (balanced)"""
42 42 # self.add_engines(1)
43 43 ar = self.view.apply_async(crash)
44 44 self.assertRaisesRemote(error.EngineError, ar.get, 10)
45 45 eid = ar.engine_id
46 46 tic = time.time()
47 47 while eid in self.client.ids and time.time()-tic < 5:
48 48 time.sleep(.01)
49 49 self.client.spin()
50 50 self.assertFalse(eid in self.client.ids, "Engine should have died")
51 51
52 52 def test_map(self):
53 53 def f(x):
54 54 return x**2
55 55 data = range(16)
56 56 r = self.view.map_sync(f, data)
57 57 self.assertEquals(r, map(f, data))
58 58
59 59 def test_abort(self):
60 60 view = self.view
61 61 ar = self.client[:].apply_async(time.sleep, .5)
62 62 ar2 = view.apply_async(lambda : 2)
63 63 ar3 = view.apply_async(lambda : 3)
64 64 view.abort(ar2)
65 65 view.abort(ar3.msg_ids)
66 66 self.assertRaises(error.TaskAborted, ar2.get)
67 67 self.assertRaises(error.TaskAborted, ar3.get)
68 68
69 69 def test_retries(self):
70 70 add_engines(3)
71 71 view = self.view
72 72 view.timeout = 1 # prevent hang if this doesn't behave
73 73 def fail():
74 74 assert False
75 75 for r in range(len(self.client)-1):
76 76 with view.temp_flags(retries=r):
77 77 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
78 78
79 79 with view.temp_flags(retries=len(self.client), timeout=0.25):
80 80 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
81 81
82 82 def test_invalid_dependency(self):
83 83 view = self.view
84 84 with view.temp_flags(after='12345'):
85 85 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
86 86
87 87 def test_impossible_dependency(self):
88 88 if len(self.client) < 2:
89 89 add_engines(2)
90 90 view = self.client.load_balanced_view()
91 91 ar1 = view.apply_async(lambda : 1)
92 92 ar1.get()
93 93 e1 = ar1.engine_id
94 94 e2 = e1
95 95 while e2 == e1:
96 96 ar2 = view.apply_async(lambda : 1)
97 97 ar2.get()
98 98 e2 = ar2.engine_id
99 99
100 100 with view.temp_flags(follow=[ar1, ar2]):
101 101 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
102 102
103 103
104 104 def test_follow(self):
105 105 ar = self.view.apply_async(lambda : 1)
106 106 ar.get()
107 107 ars = []
108 108 first_id = ar.engine_id
109 109
110 110 self.view.follow = ar
111 111 for i in range(5):
112 112 ars.append(self.view.apply_async(lambda : 1))
113 113 self.view.wait(ars)
114 114 for ar in ars:
115 115 self.assertEquals(ar.engine_id, first_id)
116 116
117 117 def test_after(self):
118 118 view = self.view
119 119 ar = view.apply_async(time.sleep, 0.5)
120 120 with view.temp_flags(after=ar):
121 121 ar2 = view.apply_async(lambda : 1)
122 122
123 123 ar.wait()
124 124 ar2.wait()
125 125 self.assertTrue(ar2.started > ar.completed)
@@ -1,446 +1,446 b''
1 # -*- coding: utf-8 -*-
1 2 """test View objects
2 3
3 4 Authors:
4 5
5 6 * Min RK
6 7 """
7 # -*- coding: utf-8 -*-
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import time
21 21 from tempfile import mktemp
22 22 from StringIO import StringIO
23 23
24 24 import zmq
25 25
26 26 from IPython import parallel as pmod
27 27 from IPython.parallel import error
28 28 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
29 29 from IPython.parallel import DirectView
30 30 from IPython.parallel.util import interactive
31 31
32 32 from IPython.parallel.tests import add_engines
33 33
34 34 from .clienttest import ClusterTestCase, crash, wait, skip_without
35 35
36 36 def setup():
37 37 add_engines(3)
38 38
39 39 class TestView(ClusterTestCase):
40 40
41 41 def test_z_crash_mux(self):
42 42 """test graceful handling of engine death (direct)"""
43 43 # self.add_engines(1)
44 44 eid = self.client.ids[-1]
45 45 ar = self.client[eid].apply_async(crash)
46 46 self.assertRaisesRemote(error.EngineError, ar.get)
47 47 eid = ar.engine_id
48 48 tic = time.time()
49 49 while eid in self.client.ids and time.time()-tic < 5:
50 50 time.sleep(.01)
51 51 self.client.spin()
52 52 self.assertFalse(eid in self.client.ids, "Engine should have died")
53 53
54 54 def test_push_pull(self):
55 55 """test pushing and pulling"""
56 56 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
57 57 t = self.client.ids[-1]
58 58 v = self.client[t]
59 59 push = v.push
60 60 pull = v.pull
61 61 v.block=True
62 62 nengines = len(self.client)
63 63 push({'data':data})
64 64 d = pull('data')
65 65 self.assertEquals(d, data)
66 66 self.client[:].push({'data':data})
67 67 d = self.client[:].pull('data', block=True)
68 68 self.assertEquals(d, nengines*[data])
69 69 ar = push({'data':data}, block=False)
70 70 self.assertTrue(isinstance(ar, AsyncResult))
71 71 r = ar.get()
72 72 ar = self.client[:].pull('data', block=False)
73 73 self.assertTrue(isinstance(ar, AsyncResult))
74 74 r = ar.get()
75 75 self.assertEquals(r, nengines*[data])
76 76 self.client[:].push(dict(a=10,b=20))
77 77 r = self.client[:].pull(('a','b'), block=True)
78 78 self.assertEquals(r, nengines*[[10,20]])
79 79
80 80 def test_push_pull_function(self):
81 81 "test pushing and pulling functions"
82 82 def testf(x):
83 83 return 2.0*x
84 84
85 85 t = self.client.ids[-1]
86 86 v = self.client[t]
87 87 v.block=True
88 88 push = v.push
89 89 pull = v.pull
90 90 execute = v.execute
91 91 push({'testf':testf})
92 92 r = pull('testf')
93 93 self.assertEqual(r(1.0), testf(1.0))
94 94 execute('r = testf(10)')
95 95 r = pull('r')
96 96 self.assertEquals(r, testf(10))
97 97 ar = self.client[:].push({'testf':testf}, block=False)
98 98 ar.get()
99 99 ar = self.client[:].pull('testf', block=False)
100 100 rlist = ar.get()
101 101 for r in rlist:
102 102 self.assertEqual(r(1.0), testf(1.0))
103 103 execute("def g(x): return x*x")
104 104 r = pull(('testf','g'))
105 105 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
106 106
107 107 def test_push_function_globals(self):
108 108 """test that pushed functions have access to globals"""
109 109 @interactive
110 110 def geta():
111 111 return a
112 112 # self.add_engines(1)
113 113 v = self.client[-1]
114 114 v.block=True
115 115 v['f'] = geta
116 116 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
117 117 v.execute('a=5')
118 118 v.execute('b=f()')
119 119 self.assertEquals(v['b'], 5)
120 120
121 121 def test_push_function_defaults(self):
122 122 """test that pushed functions preserve default args"""
123 123 def echo(a=10):
124 124 return a
125 125 v = self.client[-1]
126 126 v.block=True
127 127 v['f'] = echo
128 128 v.execute('b=f()')
129 129 self.assertEquals(v['b'], 10)
130 130
131 131 def test_get_result(self):
132 132 """test getting results from the Hub."""
133 133 c = pmod.Client(profile='iptest')
134 134 # self.add_engines(1)
135 135 t = c.ids[-1]
136 136 v = c[t]
137 137 v2 = self.client[t]
138 138 ar = v.apply_async(wait, 1)
139 139 # give the monitor time to notice the message
140 140 time.sleep(.25)
141 141 ahr = v2.get_result(ar.msg_ids)
142 142 self.assertTrue(isinstance(ahr, AsyncHubResult))
143 143 self.assertEquals(ahr.get(), ar.get())
144 144 ar2 = v2.get_result(ar.msg_ids)
145 145 self.assertFalse(isinstance(ar2, AsyncHubResult))
146 146 c.spin()
147 147 c.close()
148 148
149 149 def test_run_newline(self):
150 150 """test that run appends newline to files"""
151 151 tmpfile = mktemp()
152 152 with open(tmpfile, 'w') as f:
153 153 f.write("""def g():
154 154 return 5
155 155 """)
156 156 v = self.client[-1]
157 157 v.run(tmpfile, block=True)
158 158 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
159 159
160 160 def test_apply_tracked(self):
161 161 """test tracking for apply"""
162 162 # self.add_engines(1)
163 163 t = self.client.ids[-1]
164 164 v = self.client[t]
165 165 v.block=False
166 166 def echo(n=1024*1024, **kwargs):
167 167 with v.temp_flags(**kwargs):
168 168 return v.apply(lambda x: x, 'x'*n)
169 169 ar = echo(1, track=False)
170 170 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
171 171 self.assertTrue(ar.sent)
172 172 ar = echo(track=True)
173 173 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
174 174 self.assertEquals(ar.sent, ar._tracker.done)
175 175 ar._tracker.wait()
176 176 self.assertTrue(ar.sent)
177 177
178 178 def test_push_tracked(self):
179 179 t = self.client.ids[-1]
180 180 ns = dict(x='x'*1024*1024)
181 181 v = self.client[t]
182 182 ar = v.push(ns, block=False, track=False)
183 183 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
184 184 self.assertTrue(ar.sent)
185 185
186 186 ar = v.push(ns, block=False, track=True)
187 187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 188 self.assertEquals(ar.sent, ar._tracker.done)
189 189 ar._tracker.wait()
190 190 self.assertTrue(ar.sent)
191 191 ar.get()
192 192
193 193 def test_scatter_tracked(self):
194 194 t = self.client.ids
195 195 x='x'*1024*1024
196 196 ar = self.client[t].scatter('x', x, block=False, track=False)
197 197 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
198 198 self.assertTrue(ar.sent)
199 199
200 200 ar = self.client[t].scatter('x', x, block=False, track=True)
201 201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 202 self.assertEquals(ar.sent, ar._tracker.done)
203 203 ar._tracker.wait()
204 204 self.assertTrue(ar.sent)
205 205 ar.get()
206 206
207 207 def test_remote_reference(self):
208 208 v = self.client[-1]
209 209 v['a'] = 123
210 210 ra = pmod.Reference('a')
211 211 b = v.apply_sync(lambda x: x, ra)
212 212 self.assertEquals(b, 123)
213 213
214 214
215 215 def test_scatter_gather(self):
216 216 view = self.client[:]
217 217 seq1 = range(16)
218 218 view.scatter('a', seq1)
219 219 seq2 = view.gather('a', block=True)
220 220 self.assertEquals(seq2, seq1)
221 221 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
222 222
223 223 @skip_without('numpy')
224 224 def test_scatter_gather_numpy(self):
225 225 import numpy
226 226 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
227 227 view = self.client[:]
228 228 a = numpy.arange(64)
229 229 view.scatter('a', a)
230 230 b = view.gather('a', block=True)
231 231 assert_array_equal(b, a)
232 232
233 233 def test_map(self):
234 234 view = self.client[:]
235 235 def f(x):
236 236 return x**2
237 237 data = range(16)
238 238 r = view.map_sync(f, data)
239 239 self.assertEquals(r, map(f, data))
240 240
241 241 def test_scatterGatherNonblocking(self):
242 242 data = range(16)
243 243 view = self.client[:]
244 244 view.scatter('a', data, block=False)
245 245 ar = view.gather('a', block=False)
246 246 self.assertEquals(ar.get(), data)
247 247
248 248 @skip_without('numpy')
249 249 def test_scatter_gather_numpy_nonblocking(self):
250 250 import numpy
251 251 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
252 252 a = numpy.arange(64)
253 253 view = self.client[:]
254 254 ar = view.scatter('a', a, block=False)
255 255 self.assertTrue(isinstance(ar, AsyncResult))
256 256 amr = view.gather('a', block=False)
257 257 self.assertTrue(isinstance(amr, AsyncMapResult))
258 258 assert_array_equal(amr.get(), a)
259 259
260 260 def test_execute(self):
261 261 view = self.client[:]
262 262 # self.client.debug=True
263 263 execute = view.execute
264 264 ar = execute('c=30', block=False)
265 265 self.assertTrue(isinstance(ar, AsyncResult))
266 266 ar = execute('d=[0,1,2]', block=False)
267 267 self.client.wait(ar, 1)
268 268 self.assertEquals(len(ar.get()), len(self.client))
269 269 for c in view['c']:
270 270 self.assertEquals(c, 30)
271 271
272 272 def test_abort(self):
273 273 view = self.client[-1]
274 274 ar = view.execute('import time; time.sleep(0.25)', block=False)
275 275 ar2 = view.apply_async(lambda : 2)
276 276 ar3 = view.apply_async(lambda : 3)
277 277 view.abort(ar2)
278 278 view.abort(ar3.msg_ids)
279 279 self.assertRaises(error.TaskAborted, ar2.get)
280 280 self.assertRaises(error.TaskAborted, ar3.get)
281 281
282 282 def test_temp_flags(self):
283 283 view = self.client[-1]
284 284 view.block=True
285 285 with view.temp_flags(block=False):
286 286 self.assertFalse(view.block)
287 287 self.assertTrue(view.block)
288 288
289 289 def test_importer(self):
290 290 view = self.client[-1]
291 291 view.clear(block=True)
292 292 with view.importer:
293 293 import re
294 294
295 295 @interactive
296 296 def findall(pat, s):
297 297 # this globals() step isn't necessary in real code
298 298 # only to prevent a closure in the test
299 299 re = globals()['re']
300 300 return re.findall(pat, s)
301 301
302 302 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
303 303
304 304 # parallel magic tests
305 305
306 306 def test_magic_px_blocking(self):
307 307 ip = get_ipython()
308 308 v = self.client[-1]
309 309 v.activate()
310 310 v.block=True
311 311
312 312 ip.magic_px('a=5')
313 313 self.assertEquals(v['a'], 5)
314 314 ip.magic_px('a=10')
315 315 self.assertEquals(v['a'], 10)
316 316 sio = StringIO()
317 317 savestdout = sys.stdout
318 318 sys.stdout = sio
319 319 ip.magic_px('print a')
320 320 sys.stdout = savestdout
321 321 sio.read()
322 322 self.assertTrue('[stdout:%i]'%v.targets in sio.buf)
323 323 self.assertTrue(sio.buf.rstrip().endswith('10'))
324 324 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
325 325
326 326 def test_magic_px_nonblocking(self):
327 327 ip = get_ipython()
328 328 v = self.client[-1]
329 329 v.activate()
330 330 v.block=False
331 331
332 332 ip.magic_px('a=5')
333 333 self.assertEquals(v['a'], 5)
334 334 ip.magic_px('a=10')
335 335 self.assertEquals(v['a'], 10)
336 336 sio = StringIO()
337 337 savestdout = sys.stdout
338 338 sys.stdout = sio
339 339 ip.magic_px('print a')
340 340 sys.stdout = savestdout
341 341 sio.read()
342 342 self.assertFalse('[stdout:%i]'%v.targets in sio.buf)
343 343 ip.magic_px('1/0')
344 344 ar = v.get_result(-1)
345 345 self.assertRaisesRemote(ZeroDivisionError, ar.get)
346 346
347 347 def test_magic_autopx_blocking(self):
348 348 ip = get_ipython()
349 349 v = self.client[-1]
350 350 v.activate()
351 351 v.block=True
352 352
353 353 sio = StringIO()
354 354 savestdout = sys.stdout
355 355 sys.stdout = sio
356 356 ip.magic_autopx()
357 357 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
358 358 ip.run_cell('print b')
359 359 ip.run_cell("b/c")
360 360 ip.run_code(compile('b*=2', '', 'single'))
361 361 ip.magic_autopx()
362 362 sys.stdout = savestdout
363 363 sio.read()
364 364 output = sio.buf.strip()
365 365 self.assertTrue(output.startswith('%autopx enabled'))
366 366 self.assertTrue(output.endswith('%autopx disabled'))
367 367 self.assertTrue('RemoteError: ZeroDivisionError' in output)
368 368 ar = v.get_result(-2)
369 369 self.assertEquals(v['a'], 5)
370 370 self.assertEquals(v['b'], 20)
371 371 self.assertRaisesRemote(ZeroDivisionError, ar.get)
372 372
373 373 def test_magic_autopx_nonblocking(self):
374 374 ip = get_ipython()
375 375 v = self.client[-1]
376 376 v.activate()
377 377 v.block=False
378 378
379 379 sio = StringIO()
380 380 savestdout = sys.stdout
381 381 sys.stdout = sio
382 382 ip.magic_autopx()
383 383 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
384 384 ip.run_cell('print b')
385 385 ip.run_cell("b/c")
386 386 ip.run_code(compile('b*=2', '', 'single'))
387 387 ip.magic_autopx()
388 388 sys.stdout = savestdout
389 389 sio.read()
390 390 output = sio.buf.strip()
391 391 self.assertTrue(output.startswith('%autopx enabled'))
392 392 self.assertTrue(output.endswith('%autopx disabled'))
393 393 self.assertFalse('ZeroDivisionError' in output)
394 394 ar = v.get_result(-2)
395 395 self.assertEquals(v['a'], 5)
396 396 self.assertEquals(v['b'], 20)
397 397 self.assertRaisesRemote(ZeroDivisionError, ar.get)
398 398
399 399 def test_magic_result(self):
400 400 ip = get_ipython()
401 401 v = self.client[-1]
402 402 v.activate()
403 403 v['a'] = 111
404 404 ra = v['a']
405 405
406 406 ar = ip.magic_result()
407 407 self.assertEquals(ar.msg_ids, [v.history[-1]])
408 408 self.assertEquals(ar.get(), 111)
409 409 ar = ip.magic_result('-2')
410 410 self.assertEquals(ar.msg_ids, [v.history[-2]])
411 411
412 412 def test_unicode_execute(self):
413 413 """test executing unicode strings"""
414 414 v = self.client[-1]
415 415 v.block=True
416 416 code=u"a=u'é'"
417 417 v.execute(code)
418 418 self.assertEquals(v['a'], u'é')
419 419
420 420 def test_unicode_apply_result(self):
421 421 """test unicode apply results"""
422 422 v = self.client[-1]
423 423 r = v.apply_sync(lambda : u'é')
424 424 self.assertEquals(r, u'é')
425 425
426 426 def test_unicode_apply_arg(self):
427 427 """test passing unicode arguments to apply"""
428 428 v = self.client[-1]
429 429
430 430 @interactive
431 431 def check_unicode(a, check):
432 432 assert isinstance(a, unicode), "%r is not unicode"%a
433 433 assert isinstance(check, bytes), "%r is not bytes"%check
434 434 assert a.encode('utf8') == check, "%s != %s"%(a,check)
435 435
436 436 for s in [ u'é', u'ßø®∫','asdf'.decode() ]:
437 437 try:
438 438 v.apply_sync(check_unicode, s, s.encode('utf8'))
439 439 except error.RemoteError as e:
440 440 if e.ename == 'AssertionError':
441 441 self.fail(e.evalue)
442 442 else:
443 443 raise e
444 444
445 445
446 446
General Comments 0
You need to be logged in to leave comments. Login now