##// END OF EJS Templates
add basic unicode tests in test_view...
MinRK -
Show More
@@ -1,418 +1,452
1 1 """test View objects"""
2 # -*- coding: utf-8 -*-
2 3 #-------------------------------------------------------------------------------
3 4 # Copyright (C) 2011 The IPython Development Team
4 5 #
5 6 # Distributed under the terms of the BSD License. The full license is in
6 7 # the file COPYING, distributed as part of this software.
7 8 #-------------------------------------------------------------------------------
8 9
9 10 #-------------------------------------------------------------------------------
10 11 # Imports
11 12 #-------------------------------------------------------------------------------
12 13
13 14 import sys
14 15 import time
15 16 from tempfile import mktemp
16 17 from StringIO import StringIO
17 18
18 19 import zmq
19 20
20 21 from IPython import parallel as pmod
21 22 from IPython.parallel import error
22 23 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
23 24 from IPython.parallel import LoadBalancedView, DirectView
24 25 from IPython.parallel.util import interactive
25 26
26 27 from IPython.parallel.tests import add_engines
27 28
28 29 from .clienttest import ClusterTestCase, crash, wait, skip_without
29 30
30 31 def setup():
31 32 add_engines(3)
32 33
33 34 class TestView(ClusterTestCase):
34 35
35 def test_crash_task(self):
36 def test_z_crash_task(self):
36 37 """test graceful handling of engine death (balanced)"""
37 38 # self.add_engines(1)
38 39 ar = self.client[-1].apply_async(crash)
39 40 self.assertRaisesRemote(error.EngineError, ar.get)
40 41 eid = ar.engine_id
41 42 tic = time.time()
42 43 while eid in self.client.ids and time.time()-tic < 5:
43 44 time.sleep(.01)
44 45 self.client.spin()
45 46 self.assertFalse(eid in self.client.ids, "Engine should have died")
46 47
47 def test_crash_mux(self):
48 def test_z_crash_mux(self):
48 49 """test graceful handling of engine death (direct)"""
49 50 # self.add_engines(1)
50 51 eid = self.client.ids[-1]
51 52 ar = self.client[eid].apply_async(crash)
52 53 self.assertRaisesRemote(error.EngineError, ar.get)
53 54 eid = ar.engine_id
54 55 tic = time.time()
55 56 while eid in self.client.ids and time.time()-tic < 5:
56 57 time.sleep(.01)
57 58 self.client.spin()
58 59 self.assertFalse(eid in self.client.ids, "Engine should have died")
59 60
60 61 def test_push_pull(self):
61 62 """test pushing and pulling"""
62 63 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
63 64 t = self.client.ids[-1]
64 65 v = self.client[t]
65 66 push = v.push
66 67 pull = v.pull
67 68 v.block=True
68 69 nengines = len(self.client)
69 70 push({'data':data})
70 71 d = pull('data')
71 72 self.assertEquals(d, data)
72 73 self.client[:].push({'data':data})
73 74 d = self.client[:].pull('data', block=True)
74 75 self.assertEquals(d, nengines*[data])
75 76 ar = push({'data':data}, block=False)
76 77 self.assertTrue(isinstance(ar, AsyncResult))
77 78 r = ar.get()
78 79 ar = self.client[:].pull('data', block=False)
79 80 self.assertTrue(isinstance(ar, AsyncResult))
80 81 r = ar.get()
81 82 self.assertEquals(r, nengines*[data])
82 83 self.client[:].push(dict(a=10,b=20))
83 84 r = self.client[:].pull(('a','b'), block=True)
84 85 self.assertEquals(r, nengines*[[10,20]])
85 86
86 87 def test_push_pull_function(self):
87 88 "test pushing and pulling functions"
88 89 def testf(x):
89 90 return 2.0*x
90 91
91 92 t = self.client.ids[-1]
92 93 v = self.client[t]
93 94 v.block=True
94 95 push = v.push
95 96 pull = v.pull
96 97 execute = v.execute
97 98 push({'testf':testf})
98 99 r = pull('testf')
99 100 self.assertEqual(r(1.0), testf(1.0))
100 101 execute('r = testf(10)')
101 102 r = pull('r')
102 103 self.assertEquals(r, testf(10))
103 104 ar = self.client[:].push({'testf':testf}, block=False)
104 105 ar.get()
105 106 ar = self.client[:].pull('testf', block=False)
106 107 rlist = ar.get()
107 108 for r in rlist:
108 109 self.assertEqual(r(1.0), testf(1.0))
109 110 execute("def g(x): return x*x")
110 111 r = pull(('testf','g'))
111 112 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
112 113
113 114 def test_push_function_globals(self):
114 115 """test that pushed functions have access to globals"""
115 116 @interactive
116 117 def geta():
117 118 return a
118 119 # self.add_engines(1)
119 120 v = self.client[-1]
120 121 v.block=True
121 122 v['f'] = geta
122 123 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
123 124 v.execute('a=5')
124 125 v.execute('b=f()')
125 126 self.assertEquals(v['b'], 5)
126 127
127 128 def test_push_function_defaults(self):
128 129 """test that pushed functions preserve default args"""
129 130 def echo(a=10):
130 131 return a
131 132 v = self.client[-1]
132 133 v.block=True
133 134 v['f'] = echo
134 135 v.execute('b=f()')
135 136 self.assertEquals(v['b'], 10)
136 137
137 138 def test_get_result(self):
138 139 """test getting results from the Hub."""
139 140 c = pmod.Client(profile='iptest')
140 141 # self.add_engines(1)
141 142 t = c.ids[-1]
142 143 v = c[t]
143 144 v2 = self.client[t]
144 145 ar = v.apply_async(wait, 1)
145 146 # give the monitor time to notice the message
146 147 time.sleep(.25)
147 148 ahr = v2.get_result(ar.msg_ids)
148 149 self.assertTrue(isinstance(ahr, AsyncHubResult))
149 150 self.assertEquals(ahr.get(), ar.get())
150 151 ar2 = v2.get_result(ar.msg_ids)
151 152 self.assertFalse(isinstance(ar2, AsyncHubResult))
152 153 c.spin()
153 154 c.close()
154 155
155 156 def test_run_newline(self):
156 157 """test that run appends newline to files"""
157 158 tmpfile = mktemp()
158 159 with open(tmpfile, 'w') as f:
159 160 f.write("""def g():
160 161 return 5
161 162 """)
162 163 v = self.client[-1]
163 164 v.run(tmpfile, block=True)
164 165 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
165 166
166 167 def test_apply_tracked(self):
167 168 """test tracking for apply"""
168 169 # self.add_engines(1)
169 170 t = self.client.ids[-1]
170 171 v = self.client[t]
171 172 v.block=False
172 173 def echo(n=1024*1024, **kwargs):
173 174 with v.temp_flags(**kwargs):
174 175 return v.apply(lambda x: x, 'x'*n)
175 176 ar = echo(1, track=False)
176 177 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
177 178 self.assertTrue(ar.sent)
178 179 ar = echo(track=True)
179 180 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
180 181 self.assertEquals(ar.sent, ar._tracker.done)
181 182 ar._tracker.wait()
182 183 self.assertTrue(ar.sent)
183 184
184 185 def test_push_tracked(self):
185 186 t = self.client.ids[-1]
186 187 ns = dict(x='x'*1024*1024)
187 188 v = self.client[t]
188 189 ar = v.push(ns, block=False, track=False)
189 190 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
190 191 self.assertTrue(ar.sent)
191 192
192 193 ar = v.push(ns, block=False, track=True)
193 194 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
194 195 self.assertEquals(ar.sent, ar._tracker.done)
195 196 ar._tracker.wait()
196 197 self.assertTrue(ar.sent)
197 198 ar.get()
198 199
199 200 def test_scatter_tracked(self):
200 201 t = self.client.ids
201 202 x='x'*1024*1024
202 203 ar = self.client[t].scatter('x', x, block=False, track=False)
203 204 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
204 205 self.assertTrue(ar.sent)
205 206
206 207 ar = self.client[t].scatter('x', x, block=False, track=True)
207 208 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
208 209 self.assertEquals(ar.sent, ar._tracker.done)
209 210 ar._tracker.wait()
210 211 self.assertTrue(ar.sent)
211 212 ar.get()
212 213
213 214 def test_remote_reference(self):
214 215 v = self.client[-1]
215 216 v['a'] = 123
216 217 ra = pmod.Reference('a')
217 218 b = v.apply_sync(lambda x: x, ra)
218 219 self.assertEquals(b, 123)
219 220
220 221
221 222 def test_scatter_gather(self):
222 223 view = self.client[:]
223 224 seq1 = range(16)
224 225 view.scatter('a', seq1)
225 226 seq2 = view.gather('a', block=True)
226 227 self.assertEquals(seq2, seq1)
227 228 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
228 229
229 230 @skip_without('numpy')
230 231 def test_scatter_gather_numpy(self):
231 232 import numpy
232 233 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
233 234 view = self.client[:]
234 235 a = numpy.arange(64)
235 236 view.scatter('a', a)
236 237 b = view.gather('a', block=True)
237 238 assert_array_equal(b, a)
238 239
239 240 def test_map(self):
240 241 view = self.client[:]
241 242 def f(x):
242 243 return x**2
243 244 data = range(16)
244 245 r = view.map_sync(f, data)
245 246 self.assertEquals(r, map(f, data))
246 247
247 248 def test_scatterGatherNonblocking(self):
248 249 data = range(16)
249 250 view = self.client[:]
250 251 view.scatter('a', data, block=False)
251 252 ar = view.gather('a', block=False)
252 253 self.assertEquals(ar.get(), data)
253 254
254 255 @skip_without('numpy')
255 256 def test_scatter_gather_numpy_nonblocking(self):
256 257 import numpy
257 258 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
258 259 a = numpy.arange(64)
259 260 view = self.client[:]
260 261 ar = view.scatter('a', a, block=False)
261 262 self.assertTrue(isinstance(ar, AsyncResult))
262 263 amr = view.gather('a', block=False)
263 264 self.assertTrue(isinstance(amr, AsyncMapResult))
264 265 assert_array_equal(amr.get(), a)
265 266
266 267 def test_execute(self):
267 268 view = self.client[:]
268 269 # self.client.debug=True
269 270 execute = view.execute
270 271 ar = execute('c=30', block=False)
271 272 self.assertTrue(isinstance(ar, AsyncResult))
272 273 ar = execute('d=[0,1,2]', block=False)
273 274 self.client.wait(ar, 1)
274 275 self.assertEquals(len(ar.get()), len(self.client))
275 276 for c in view['c']:
276 277 self.assertEquals(c, 30)
277 278
278 279 def test_abort(self):
279 280 view = self.client[-1]
280 281 ar = view.execute('import time; time.sleep(0.25)', block=False)
281 282 ar2 = view.apply_async(lambda : 2)
282 283 ar3 = view.apply_async(lambda : 3)
283 284 view.abort(ar2)
284 285 view.abort(ar3.msg_ids)
285 286 self.assertRaises(error.TaskAborted, ar2.get)
286 287 self.assertRaises(error.TaskAborted, ar3.get)
287 288
288 289 def test_temp_flags(self):
289 290 view = self.client[-1]
290 291 view.block=True
291 292 with view.temp_flags(block=False):
292 293 self.assertFalse(view.block)
293 294 self.assertTrue(view.block)
294 295
295 296 def test_importer(self):
296 297 view = self.client[-1]
297 298 view.clear(block=True)
298 299 with view.importer:
299 300 import re
300 301
301 302 @interactive
302 303 def findall(pat, s):
303 304 # this globals() step isn't necessary in real code
304 305 # only to prevent a closure in the test
305 306 re = globals()['re']
306 307 return re.findall(pat, s)
307 308
308 309 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
309 310
310 311 # parallel magic tests
311 312
312 313 def test_magic_px_blocking(self):
313 314 ip = get_ipython()
314 315 v = self.client[-1]
315 316 v.activate()
316 317 v.block=True
317 318
318 319 ip.magic_px('a=5')
319 320 self.assertEquals(v['a'], 5)
320 321 ip.magic_px('a=10')
321 322 self.assertEquals(v['a'], 10)
322 323 sio = StringIO()
323 324 savestdout = sys.stdout
324 325 sys.stdout = sio
325 326 ip.magic_px('print a')
326 327 sys.stdout = savestdout
327 328 sio.read()
328 329 self.assertTrue('[stdout:%i]'%v.targets in sio.buf)
329 330 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
330 331
331 332 def test_magic_px_nonblocking(self):
332 333 ip = get_ipython()
333 334 v = self.client[-1]
334 335 v.activate()
335 336 v.block=False
336 337
337 338 ip.magic_px('a=5')
338 339 self.assertEquals(v['a'], 5)
339 340 ip.magic_px('a=10')
340 341 self.assertEquals(v['a'], 10)
341 342 sio = StringIO()
342 343 savestdout = sys.stdout
343 344 sys.stdout = sio
344 345 ip.magic_px('print a')
345 346 sys.stdout = savestdout
346 347 sio.read()
347 348 self.assertFalse('[stdout:%i]'%v.targets in sio.buf)
348 349 ip.magic_px('1/0')
349 350 ar = v.get_result(-1)
350 351 self.assertRaisesRemote(ZeroDivisionError, ar.get)
351 352
352 353 def test_magic_autopx_blocking(self):
353 354 ip = get_ipython()
354 355 v = self.client[-1]
355 356 v.activate()
356 357 v.block=True
357 358
358 359 sio = StringIO()
359 360 savestdout = sys.stdout
360 361 sys.stdout = sio
361 362 ip.magic_autopx()
362 363 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
363 364 ip.run_cell('print b')
364 365 ip.run_cell("b/c")
365 366 ip.run_code(compile('b*=2', '', 'single'))
366 367 ip.magic_autopx()
367 368 sys.stdout = savestdout
368 369 sio.read()
369 370 output = sio.buf.strip()
370 371 self.assertTrue(output.startswith('%autopx enabled'))
371 372 self.assertTrue(output.endswith('%autopx disabled'))
372 373 self.assertTrue('RemoteError: ZeroDivisionError' in output)
373 374 ar = v.get_result(-2)
374 375 self.assertEquals(v['a'], 5)
375 376 self.assertEquals(v['b'], 20)
376 377 self.assertRaisesRemote(ZeroDivisionError, ar.get)
377 378
378 379 def test_magic_autopx_nonblocking(self):
379 380 ip = get_ipython()
380 381 v = self.client[-1]
381 382 v.activate()
382 383 v.block=False
383 384
384 385 sio = StringIO()
385 386 savestdout = sys.stdout
386 387 sys.stdout = sio
387 388 ip.magic_autopx()
388 389 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
389 390 ip.run_cell('print b')
390 391 ip.run_cell("b/c")
391 392 ip.run_code(compile('b*=2', '', 'single'))
392 393 ip.magic_autopx()
393 394 sys.stdout = savestdout
394 395 sio.read()
395 396 output = sio.buf.strip()
396 397 self.assertTrue(output.startswith('%autopx enabled'))
397 398 self.assertTrue(output.endswith('%autopx disabled'))
398 399 self.assertFalse('ZeroDivisionError' in output)
399 400 ar = v.get_result(-2)
400 401 self.assertEquals(v['a'], 5)
401 402 self.assertEquals(v['b'], 20)
402 403 self.assertRaisesRemote(ZeroDivisionError, ar.get)
403 404
404 405 def test_magic_result(self):
405 406 ip = get_ipython()
406 407 v = self.client[-1]
407 408 v.activate()
408 409 v['a'] = 111
409 410 ra = v['a']
410 411
411 412 ar = ip.magic_result()
412 413 self.assertEquals(ar.msg_ids, [v.history[-1]])
413 414 self.assertEquals(ar.get(), 111)
414 415 ar = ip.magic_result('-2')
415 416 self.assertEquals(ar.msg_ids, [v.history[-2]])
417
418 def test_unicode_execute(self):
419 """test executing unicode strings"""
420 v = self.client[-1]
421 v.block=True
422 code=u"a=u'é'"
423 v.execute(code)
424 self.assertEquals(v['a'], u'é')
425
426 def test_unicode_apply_result(self):
427 """test unicode apply results"""
428 v = self.client[-1]
429 r = v.apply_sync(lambda : u'é')
430 self.assertEquals(r, u'é')
431
432 def test_unicode_apply_arg(self):
433 """test passing unicode arguments to apply"""
434 v = self.client[-1]
435
436 @interactive
437 def check_unicode(a, check):
438 assert isinstance(a, unicode), "%r is not unicode"%a
439 assert isinstance(check, bytes), "%r is not bytes"%check
440 assert a.encode('utf8') == check, "%s != %s"%(a,check)
441
442 for s in [ u'é', u'ßø®∫','asdf'.decode() ]:
443 try:
444 v.apply_sync(check_unicode, s, s.encode('utf8'))
445 except error.RemoteError as e:
446 if e.ename == 'AssertionError':
447 self.fail(e.evalue)
448 else:
449 raise e
416 450
417 451
418 452
General Comments 0
You need to be logged in to leave comments. Login now