##// END OF EJS Templates
Merge pull request #2305 from minrk/render_traceback...
Min RK -
r8296:58335ad7 merge
parent child Browse files
Show More
@@ -1,335 +1,341
1 1 # encoding: utf-8
2 2
3 3 """Classes and functions for kernel related errors and exceptions.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * Min RK
9 9 """
10 10 from __future__ import print_function
11 11
12 12 import sys
13 13 import traceback
14 14
15 15 __docformat__ = "restructuredtext en"
16 16
17 17 # Tell nose to skip this module
18 18 __test__ = {}
19 19
20 20 #-------------------------------------------------------------------------------
21 21 # Copyright (C) 2008-2011 The IPython Development Team
22 22 #
23 23 # Distributed under the terms of the BSD License. The full license is in
24 24 # the file COPYING, distributed as part of this software.
25 25 #-------------------------------------------------------------------------------
26 26
27 27 #-------------------------------------------------------------------------------
28 28 # Error classes
29 29 #-------------------------------------------------------------------------------
30 30 class IPythonError(Exception):
31 31 """Base exception that all of our exceptions inherit from.
32 32
33 33 This can be raised by code that doesn't have any more specific
34 34 information."""
35 35
36 36 pass
37 37
38 38 # Exceptions associated with the controller objects
39 39 class ControllerError(IPythonError): pass
40 40
41 41 class ControllerCreationError(ControllerError): pass
42 42
43 43
44 44 # Exceptions associated with the Engines
45 45 class EngineError(IPythonError): pass
46 46
47 47 class EngineCreationError(EngineError): pass
48 48
49 49 class KernelError(IPythonError):
50 50 pass
51 51
52 52 class NotDefined(KernelError):
53 53 def __init__(self, name):
54 54 self.name = name
55 55 self.args = (name,)
56 56
57 57 def __repr__(self):
58 58 return '<NotDefined: %s>' % self.name
59 59
60 60 __str__ = __repr__
61 61
62 62
63 63 class QueueCleared(KernelError):
64 64 pass
65 65
66 66
67 67 class IdInUse(KernelError):
68 68 pass
69 69
70 70
71 71 class ProtocolError(KernelError):
72 72 pass
73 73
74 74
75 75 class ConnectionError(KernelError):
76 76 pass
77 77
78 78
79 79 class InvalidEngineID(KernelError):
80 80 pass
81 81
82 82
83 83 class NoEnginesRegistered(KernelError):
84 84 pass
85 85
86 86
87 87 class InvalidClientID(KernelError):
88 88 pass
89 89
90 90
91 91 class InvalidDeferredID(KernelError):
92 92 pass
93 93
94 94
95 95 class SerializationError(KernelError):
96 96 pass
97 97
98 98
99 99 class MessageSizeError(KernelError):
100 100 pass
101 101
102 102
103 103 class PBMessageSizeError(MessageSizeError):
104 104 pass
105 105
106 106
107 107 class ResultNotCompleted(KernelError):
108 108 pass
109 109
110 110
111 111 class ResultAlreadyRetrieved(KernelError):
112 112 pass
113 113
114 114 class ClientError(KernelError):
115 115 pass
116 116
117 117
118 118 class TaskAborted(KernelError):
119 119 pass
120 120
121 121
122 122 class TaskTimeout(KernelError):
123 123 pass
124 124
125 125
126 126 class NotAPendingResult(KernelError):
127 127 pass
128 128
129 129
130 130 class UnpickleableException(KernelError):
131 131 pass
132 132
133 133
134 134 class AbortedPendingDeferredError(KernelError):
135 135 pass
136 136
137 137
138 138 class InvalidProperty(KernelError):
139 139 pass
140 140
141 141
142 142 class MissingBlockArgument(KernelError):
143 143 pass
144 144
145 145
146 146 class StopLocalExecution(KernelError):
147 147 pass
148 148
149 149
150 150 class SecurityError(KernelError):
151 151 pass
152 152
153 153
154 154 class FileTimeoutError(KernelError):
155 155 pass
156 156
157 157 class TimeoutError(KernelError):
158 158 pass
159 159
160 160 class UnmetDependency(KernelError):
161 161 pass
162 162
163 163 class ImpossibleDependency(UnmetDependency):
164 164 pass
165 165
166 166 class DependencyTimeout(ImpossibleDependency):
167 167 pass
168 168
169 169 class InvalidDependency(ImpossibleDependency):
170 170 pass
171 171
172 172 class RemoteError(KernelError):
173 173 """Error raised elsewhere"""
174 174 ename=None
175 175 evalue=None
176 176 traceback=None
177 177 engine_info=None
178 178
179 179 def __init__(self, ename, evalue, traceback, engine_info=None):
180 180 self.ename=ename
181 181 self.evalue=evalue
182 182 self.traceback=traceback
183 183 self.engine_info=engine_info or {}
184 184 self.args=(ename, evalue)
185 185
186 186 def __repr__(self):
187 187 engineid = self.engine_info.get('engine_id', ' ')
188 188 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
189 189
190 190 def __str__(self):
191 191 return "%s(%s)" % (self.ename, self.evalue)
192 192
193 193 def render_traceback(self):
194 194 """render traceback to a list of lines"""
195 195 return (self.traceback or "No traceback available").splitlines()
196 196
197 # Special method for custom tracebacks within IPython
198 _render_traceback_ = render_traceback
197 def _render_traceback_(self):
198 """Special method for custom tracebacks within IPython.
199
200 This will be called by IPython instead of displaying the local traceback.
201
202 It should return a traceback rendered as a list of lines.
203 """
204 return self.render_traceback()
199 205
200 206 def print_traceback(self, excid=None):
201 207 """print my traceback"""
202 208 print('\n'.join(self.render_traceback()))
203 209
204 210
205 211
206 212
207 213 class TaskRejectError(KernelError):
208 214 """Exception to raise when a task should be rejected by an engine.
209 215
210 216 This exception can be used to allow a task running on an engine to test
211 217 if the engine (or the user's namespace on the engine) has the needed
212 218 task dependencies. If not, the task should raise this exception. For
213 219 the task to be retried on another engine, the task should be created
214 220 with the `retries` argument > 1.
215 221
216 222 The advantage of this approach over our older properties system is that
217 223 tasks have full access to the user's namespace on the engines and the
218 224 properties don't have to be managed or tested by the controller.
219 225 """
220 226
221 227
222 228 class CompositeError(RemoteError):
223 229 """Error for representing possibly multiple errors on engines"""
224 230 def __init__(self, message, elist):
225 231 Exception.__init__(self, *(message, elist))
226 232 # Don't use pack_exception because it will conflict with the .message
227 233 # attribute that is being deprecated in 2.6 and beyond.
228 234 self.msg = message
229 235 self.elist = elist
230 236 self.args = [ e[0] for e in elist ]
231 237
232 238 def _get_engine_str(self, ei):
233 239 if not ei:
234 240 return '[Engine Exception]'
235 241 else:
236 242 return '[%s:%s]: ' % (ei['engine_id'], ei['method'])
237 243
238 244 def _get_traceback(self, ev):
239 245 try:
240 246 tb = ev._ipython_traceback_text
241 247 except AttributeError:
242 248 return 'No traceback available'
243 249 else:
244 250 return tb
245 251
246 252 def __str__(self):
247 253 s = str(self.msg)
248 254 for en, ev, etb, ei in self.elist:
249 255 engine_str = self._get_engine_str(ei)
250 256 s = s + '\n' + engine_str + en + ': ' + str(ev)
251 257 return s
252 258
253 259 def __repr__(self):
254 260 return "CompositeError(%i)"%len(self.elist)
255 261
256 262 def render_traceback(self, excid=None):
257 263 """render one or all of my tracebacks to a list of lines"""
258 264 lines = []
259 265 if excid is None:
260 266 for (en,ev,etb,ei) in self.elist:
261 267 lines.append(self._get_engine_str(ei))
262 268 lines.extend((etb or 'No traceback available').splitlines())
263 269 lines.append('')
264 270 else:
265 271 try:
266 272 en,ev,etb,ei = self.elist[excid]
267 273 except:
268 274 raise IndexError("an exception with index %i does not exist"%excid)
269 275 else:
270 276 lines.append(self._get_engine_str(ei))
271 277 lines.extend((etb or 'No traceback available').splitlines())
272 278
273 279 return lines
274 280
275 281 def print_traceback(self, excid=None):
276 282 print('\n'.join(self.render_traceback(excid)))
277 283
278 284 def raise_exception(self, excid=0):
279 285 try:
280 286 en,ev,etb,ei = self.elist[excid]
281 287 except:
282 288 raise IndexError("an exception with index %i does not exist"%excid)
283 289 else:
284 290 raise RemoteError(en, ev, etb, ei)
285 291
286 292
287 293 def collect_exceptions(rdict_or_list, method='unspecified'):
288 294 """check a result dict for errors, and raise CompositeError if any exist.
289 295 Passthrough otherwise."""
290 296 elist = []
291 297 if isinstance(rdict_or_list, dict):
292 298 rlist = rdict_or_list.values()
293 299 else:
294 300 rlist = rdict_or_list
295 301 for r in rlist:
296 302 if isinstance(r, RemoteError):
297 303 en, ev, etb, ei = r.ename, r.evalue, r.traceback, r.engine_info
298 304 # Sometimes we could have CompositeError in our list. Just take
299 305 # the errors out of them and put them in our new list. This
300 306 # has the effect of flattening lists of CompositeErrors into one
301 307 # CompositeError
302 308 if en=='CompositeError':
303 309 for e in ev.elist:
304 310 elist.append(e)
305 311 else:
306 312 elist.append((en, ev, etb, ei))
307 313 if len(elist)==0:
308 314 return rdict_or_list
309 315 else:
310 316 msg = "one or more exceptions from call to method: %s" % (method)
311 317 # This silliness is needed so the debugger has access to the exception
312 318 # instance (e in this case)
313 319 try:
314 320 raise CompositeError(msg, elist)
315 321 except CompositeError as e:
316 322 raise e
317 323
318 324 def wrap_exception(engine_info={}):
319 325 etype, evalue, tb = sys.exc_info()
320 326 stb = traceback.format_exception(etype, evalue, tb)
321 327 exc_content = {
322 328 'status' : 'error',
323 329 'traceback' : stb,
324 330 'ename' : unicode(etype.__name__),
325 331 'evalue' : unicode(evalue),
326 332 'engine_info' : engine_info
327 333 }
328 334 return exc_content
329 335
330 336 def unwrap_exception(content):
331 337 err = RemoteError(content['ename'], content['evalue'],
332 338 ''.join(content['traceback']),
333 339 content.get('engine_info', {}))
334 340 return err
335 341
@@ -1,678 +1,703
1 1 # -*- coding: utf-8 -*-
2 2 """test View objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import platform
21 21 import time
22 22 from tempfile import mktemp
23 23 from StringIO import StringIO
24 24
25 25 import zmq
26 26 from nose import SkipTest
27 27
28 28 from IPython.testing import decorators as dec
29 29 from IPython.testing.ipunittest import ParametricTestCase
30 from IPython.utils.io import capture_output
30 31
31 32 from IPython import parallel as pmod
32 33 from IPython.parallel import error
33 34 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
34 35 from IPython.parallel import DirectView
35 36 from IPython.parallel.util import interactive
36 37
37 38 from IPython.parallel.tests import add_engines
38 39
39 40 from .clienttest import ClusterTestCase, crash, wait, skip_without
40 41
41 42 def setup():
42 43 add_engines(3, total=True)
43 44
44 45 class TestView(ClusterTestCase, ParametricTestCase):
45 46
46 47 def setUp(self):
47 48 # On Win XP, wait for resource cleanup, else parallel test group fails
48 49 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
49 50 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
50 51 time.sleep(2)
51 52 super(TestView, self).setUp()
52 53
53 54 def test_z_crash_mux(self):
54 55 """test graceful handling of engine death (direct)"""
55 56 raise SkipTest("crash tests disabled, due to undesirable crash reports")
56 57 # self.add_engines(1)
57 58 eid = self.client.ids[-1]
58 59 ar = self.client[eid].apply_async(crash)
59 60 self.assertRaisesRemote(error.EngineError, ar.get, 10)
60 61 eid = ar.engine_id
61 62 tic = time.time()
62 63 while eid in self.client.ids and time.time()-tic < 5:
63 64 time.sleep(.01)
64 65 self.client.spin()
65 66 self.assertFalse(eid in self.client.ids, "Engine should have died")
66 67
67 68 def test_push_pull(self):
68 69 """test pushing and pulling"""
69 70 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
70 71 t = self.client.ids[-1]
71 72 v = self.client[t]
72 73 push = v.push
73 74 pull = v.pull
74 75 v.block=True
75 76 nengines = len(self.client)
76 77 push({'data':data})
77 78 d = pull('data')
78 79 self.assertEqual(d, data)
79 80 self.client[:].push({'data':data})
80 81 d = self.client[:].pull('data', block=True)
81 82 self.assertEqual(d, nengines*[data])
82 83 ar = push({'data':data}, block=False)
83 84 self.assertTrue(isinstance(ar, AsyncResult))
84 85 r = ar.get()
85 86 ar = self.client[:].pull('data', block=False)
86 87 self.assertTrue(isinstance(ar, AsyncResult))
87 88 r = ar.get()
88 89 self.assertEqual(r, nengines*[data])
89 90 self.client[:].push(dict(a=10,b=20))
90 91 r = self.client[:].pull(('a','b'), block=True)
91 92 self.assertEqual(r, nengines*[[10,20]])
92 93
93 94 def test_push_pull_function(self):
94 95 "test pushing and pulling functions"
95 96 def testf(x):
96 97 return 2.0*x
97 98
98 99 t = self.client.ids[-1]
99 100 v = self.client[t]
100 101 v.block=True
101 102 push = v.push
102 103 pull = v.pull
103 104 execute = v.execute
104 105 push({'testf':testf})
105 106 r = pull('testf')
106 107 self.assertEqual(r(1.0), testf(1.0))
107 108 execute('r = testf(10)')
108 109 r = pull('r')
109 110 self.assertEqual(r, testf(10))
110 111 ar = self.client[:].push({'testf':testf}, block=False)
111 112 ar.get()
112 113 ar = self.client[:].pull('testf', block=False)
113 114 rlist = ar.get()
114 115 for r in rlist:
115 116 self.assertEqual(r(1.0), testf(1.0))
116 117 execute("def g(x): return x*x")
117 118 r = pull(('testf','g'))
118 119 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
119 120
120 121 def test_push_function_globals(self):
121 122 """test that pushed functions have access to globals"""
122 123 @interactive
123 124 def geta():
124 125 return a
125 126 # self.add_engines(1)
126 127 v = self.client[-1]
127 128 v.block=True
128 129 v['f'] = geta
129 130 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
130 131 v.execute('a=5')
131 132 v.execute('b=f()')
132 133 self.assertEqual(v['b'], 5)
133 134
134 135 def test_push_function_defaults(self):
135 136 """test that pushed functions preserve default args"""
136 137 def echo(a=10):
137 138 return a
138 139 v = self.client[-1]
139 140 v.block=True
140 141 v['f'] = echo
141 142 v.execute('b=f()')
142 143 self.assertEqual(v['b'], 10)
143 144
144 145 def test_get_result(self):
145 146 """test getting results from the Hub."""
146 147 c = pmod.Client(profile='iptest')
147 148 # self.add_engines(1)
148 149 t = c.ids[-1]
149 150 v = c[t]
150 151 v2 = self.client[t]
151 152 ar = v.apply_async(wait, 1)
152 153 # give the monitor time to notice the message
153 154 time.sleep(.25)
154 155 ahr = v2.get_result(ar.msg_ids)
155 156 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 157 self.assertEqual(ahr.get(), ar.get())
157 158 ar2 = v2.get_result(ar.msg_ids)
158 159 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 160 c.spin()
160 161 c.close()
161 162
162 163 def test_run_newline(self):
163 164 """test that run appends newline to files"""
164 165 tmpfile = mktemp()
165 166 with open(tmpfile, 'w') as f:
166 167 f.write("""def g():
167 168 return 5
168 169 """)
169 170 v = self.client[-1]
170 171 v.run(tmpfile, block=True)
171 172 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
172 173
173 174 def test_apply_tracked(self):
174 175 """test tracking for apply"""
175 176 # self.add_engines(1)
176 177 t = self.client.ids[-1]
177 178 v = self.client[t]
178 179 v.block=False
179 180 def echo(n=1024*1024, **kwargs):
180 181 with v.temp_flags(**kwargs):
181 182 return v.apply(lambda x: x, 'x'*n)
182 183 ar = echo(1, track=False)
183 184 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
184 185 self.assertTrue(ar.sent)
185 186 ar = echo(track=True)
186 187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
187 188 self.assertEqual(ar.sent, ar._tracker.done)
188 189 ar._tracker.wait()
189 190 self.assertTrue(ar.sent)
190 191
191 192 def test_push_tracked(self):
192 193 t = self.client.ids[-1]
193 194 ns = dict(x='x'*1024*1024)
194 195 v = self.client[t]
195 196 ar = v.push(ns, block=False, track=False)
196 197 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
197 198 self.assertTrue(ar.sent)
198 199
199 200 ar = v.push(ns, block=False, track=True)
200 201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
201 202 ar._tracker.wait()
202 203 self.assertEqual(ar.sent, ar._tracker.done)
203 204 self.assertTrue(ar.sent)
204 205 ar.get()
205 206
206 207 def test_scatter_tracked(self):
207 208 t = self.client.ids
208 209 x='x'*1024*1024
209 210 ar = self.client[t].scatter('x', x, block=False, track=False)
210 211 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
211 212 self.assertTrue(ar.sent)
212 213
213 214 ar = self.client[t].scatter('x', x, block=False, track=True)
214 215 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
215 216 self.assertEqual(ar.sent, ar._tracker.done)
216 217 ar._tracker.wait()
217 218 self.assertTrue(ar.sent)
218 219 ar.get()
219 220
220 221 def test_remote_reference(self):
221 222 v = self.client[-1]
222 223 v['a'] = 123
223 224 ra = pmod.Reference('a')
224 225 b = v.apply_sync(lambda x: x, ra)
225 226 self.assertEqual(b, 123)
226 227
227 228
228 229 def test_scatter_gather(self):
229 230 view = self.client[:]
230 231 seq1 = range(16)
231 232 view.scatter('a', seq1)
232 233 seq2 = view.gather('a', block=True)
233 234 self.assertEqual(seq2, seq1)
234 235 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
235 236
236 237 @skip_without('numpy')
237 238 def test_scatter_gather_numpy(self):
238 239 import numpy
239 240 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
240 241 view = self.client[:]
241 242 a = numpy.arange(64)
242 243 view.scatter('a', a, block=True)
243 244 b = view.gather('a', block=True)
244 245 assert_array_equal(b, a)
245 246
246 247 def test_scatter_gather_lazy(self):
247 248 """scatter/gather with targets='all'"""
248 249 view = self.client.direct_view(targets='all')
249 250 x = range(64)
250 251 view.scatter('x', x)
251 252 gathered = view.gather('x', block=True)
252 253 self.assertEqual(gathered, x)
253 254
254 255
255 256 @dec.known_failure_py3
256 257 @skip_without('numpy')
257 258 def test_push_numpy_nocopy(self):
258 259 import numpy
259 260 view = self.client[:]
260 261 a = numpy.arange(64)
261 262 view['A'] = a
262 263 @interactive
263 264 def check_writeable(x):
264 265 return x.flags.writeable
265 266
266 267 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
267 268 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
268 269
269 270 view.push(dict(B=a))
270 271 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
271 272 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
272 273
273 274 @skip_without('numpy')
274 275 def test_apply_numpy(self):
275 276 """view.apply(f, ndarray)"""
276 277 import numpy
277 278 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
278 279
279 280 A = numpy.random.random((100,100))
280 281 view = self.client[-1]
281 282 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
282 283 B = A.astype(dt)
283 284 C = view.apply_sync(lambda x:x, B)
284 285 assert_array_equal(B,C)
285 286
286 287 @skip_without('numpy')
287 288 def test_push_pull_recarray(self):
288 289 """push/pull recarrays"""
289 290 import numpy
290 291 from numpy.testing.utils import assert_array_equal
291 292
292 293 view = self.client[-1]
293 294
294 295 R = numpy.array([
295 296 (1, 'hi', 0.),
296 297 (2**30, 'there', 2.5),
297 298 (-99999, 'world', -12345.6789),
298 299 ], [('n', int), ('s', '|S10'), ('f', float)])
299 300
300 301 view['RR'] = R
301 302 R2 = view['RR']
302 303
303 304 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
304 305 self.assertEqual(r_dtype, R.dtype)
305 306 self.assertEqual(r_shape, R.shape)
306 307 self.assertEqual(R2.dtype, R.dtype)
307 308 self.assertEqual(R2.shape, R.shape)
308 309 assert_array_equal(R2, R)
309 310
310 311 def test_map(self):
311 312 view = self.client[:]
312 313 def f(x):
313 314 return x**2
314 315 data = range(16)
315 316 r = view.map_sync(f, data)
316 317 self.assertEqual(r, map(f, data))
317 318
318 319 def test_map_iterable(self):
319 320 """test map on iterables (direct)"""
320 321 view = self.client[:]
321 322 # 101 is prime, so it won't be evenly distributed
322 323 arr = range(101)
323 324 # ensure it will be an iterator, even in Python 3
324 325 it = iter(arr)
325 326 r = view.map_sync(lambda x:x, arr)
326 327 self.assertEqual(r, list(arr))
327 328
328 329 def test_scatter_gather_nonblocking(self):
329 330 data = range(16)
330 331 view = self.client[:]
331 332 view.scatter('a', data, block=False)
332 333 ar = view.gather('a', block=False)
333 334 self.assertEqual(ar.get(), data)
334 335
335 336 @skip_without('numpy')
336 337 def test_scatter_gather_numpy_nonblocking(self):
337 338 import numpy
338 339 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
339 340 a = numpy.arange(64)
340 341 view = self.client[:]
341 342 ar = view.scatter('a', a, block=False)
342 343 self.assertTrue(isinstance(ar, AsyncResult))
343 344 amr = view.gather('a', block=False)
344 345 self.assertTrue(isinstance(amr, AsyncMapResult))
345 346 assert_array_equal(amr.get(), a)
346 347
347 348 def test_execute(self):
348 349 view = self.client[:]
349 350 # self.client.debug=True
350 351 execute = view.execute
351 352 ar = execute('c=30', block=False)
352 353 self.assertTrue(isinstance(ar, AsyncResult))
353 354 ar = execute('d=[0,1,2]', block=False)
354 355 self.client.wait(ar, 1)
355 356 self.assertEqual(len(ar.get()), len(self.client))
356 357 for c in view['c']:
357 358 self.assertEqual(c, 30)
358 359
359 360 def test_abort(self):
360 361 view = self.client[-1]
361 362 ar = view.execute('import time; time.sleep(1)', block=False)
362 363 ar2 = view.apply_async(lambda : 2)
363 364 ar3 = view.apply_async(lambda : 3)
364 365 view.abort(ar2)
365 366 view.abort(ar3.msg_ids)
366 367 self.assertRaises(error.TaskAborted, ar2.get)
367 368 self.assertRaises(error.TaskAborted, ar3.get)
368 369
369 370 def test_abort_all(self):
370 371 """view.abort() aborts all outstanding tasks"""
371 372 view = self.client[-1]
372 373 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
373 374 view.abort()
374 375 view.wait(timeout=5)
375 376 for ar in ars[5:]:
376 377 self.assertRaises(error.TaskAborted, ar.get)
377 378
378 379 def test_temp_flags(self):
379 380 view = self.client[-1]
380 381 view.block=True
381 382 with view.temp_flags(block=False):
382 383 self.assertFalse(view.block)
383 384 self.assertTrue(view.block)
384 385
385 386 @dec.known_failure_py3
386 387 def test_importer(self):
387 388 view = self.client[-1]
388 389 view.clear(block=True)
389 390 with view.importer:
390 391 import re
391 392
392 393 @interactive
393 394 def findall(pat, s):
394 395 # this globals() step isn't necessary in real code
395 396 # only to prevent a closure in the test
396 397 re = globals()['re']
397 398 return re.findall(pat, s)
398 399
399 400 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
400 401
401 402 def test_unicode_execute(self):
402 403 """test executing unicode strings"""
403 404 v = self.client[-1]
404 405 v.block=True
405 406 if sys.version_info[0] >= 3:
406 407 code="a='é'"
407 408 else:
408 409 code=u"a=u'é'"
409 410 v.execute(code)
410 411 self.assertEqual(v['a'], u'é')
411 412
412 413 def test_unicode_apply_result(self):
413 414 """test unicode apply results"""
414 415 v = self.client[-1]
415 416 r = v.apply_sync(lambda : u'é')
416 417 self.assertEqual(r, u'é')
417 418
418 419 def test_unicode_apply_arg(self):
419 420 """test passing unicode arguments to apply"""
420 421 v = self.client[-1]
421 422
422 423 @interactive
423 424 def check_unicode(a, check):
424 425 assert isinstance(a, unicode), "%r is not unicode"%a
425 426 assert isinstance(check, bytes), "%r is not bytes"%check
426 427 assert a.encode('utf8') == check, "%s != %s"%(a,check)
427 428
428 429 for s in [ u'é', u'ßø®∫',u'asdf' ]:
429 430 try:
430 431 v.apply_sync(check_unicode, s, s.encode('utf8'))
431 432 except error.RemoteError as e:
432 433 if e.ename == 'AssertionError':
433 434 self.fail(e.evalue)
434 435 else:
435 436 raise e
436 437
437 438 def test_map_reference(self):
438 439 """view.map(<Reference>, *seqs) should work"""
439 440 v = self.client[:]
440 441 v.scatter('n', self.client.ids, flatten=True)
441 442 v.execute("f = lambda x,y: x*y")
442 443 rf = pmod.Reference('f')
443 444 nlist = list(range(10))
444 445 mlist = nlist[::-1]
445 446 expected = [ m*n for m,n in zip(mlist, nlist) ]
446 447 result = v.map_sync(rf, mlist, nlist)
447 448 self.assertEqual(result, expected)
448 449
449 450 def test_apply_reference(self):
450 451 """view.apply(<Reference>, *args) should work"""
451 452 v = self.client[:]
452 453 v.scatter('n', self.client.ids, flatten=True)
453 454 v.execute("f = lambda x: n*x")
454 455 rf = pmod.Reference('f')
455 456 result = v.apply_sync(rf, 5)
456 457 expected = [ 5*id for id in self.client.ids ]
457 458 self.assertEqual(result, expected)
458 459
459 460 def test_eval_reference(self):
460 461 v = self.client[self.client.ids[0]]
461 462 v['g'] = range(5)
462 463 rg = pmod.Reference('g[0]')
463 464 echo = lambda x:x
464 465 self.assertEqual(v.apply_sync(echo, rg), 0)
465 466
466 467 def test_reference_nameerror(self):
467 468 v = self.client[self.client.ids[0]]
468 469 r = pmod.Reference('elvis_has_left')
469 470 echo = lambda x:x
470 471 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
471 472
472 473 def test_single_engine_map(self):
473 474 e0 = self.client[self.client.ids[0]]
474 475 r = range(5)
475 476 check = [ -1*i for i in r ]
476 477 result = e0.map_sync(lambda x: -1*x, r)
477 478 self.assertEqual(result, check)
478 479
479 480 def test_len(self):
480 481 """len(view) makes sense"""
481 482 e0 = self.client[self.client.ids[0]]
482 483 yield self.assertEqual(len(e0), 1)
483 484 v = self.client[:]
484 485 yield self.assertEqual(len(v), len(self.client.ids))
485 486 v = self.client.direct_view('all')
486 487 yield self.assertEqual(len(v), len(self.client.ids))
487 488 v = self.client[:2]
488 489 yield self.assertEqual(len(v), 2)
489 490 v = self.client[:1]
490 491 yield self.assertEqual(len(v), 1)
491 492 v = self.client.load_balanced_view()
492 493 yield self.assertEqual(len(v), len(self.client.ids))
493 494 # parametric tests seem to require manual closing?
494 495 self.client.close()
495 496
496 497
497 498 # begin execute tests
498 499
499 500 def test_execute_reply(self):
500 501 e0 = self.client[self.client.ids[0]]
501 502 e0.block = True
502 503 ar = e0.execute("5", silent=False)
503 504 er = ar.get()
504 505 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
505 506 self.assertEqual(er.pyout['data']['text/plain'], '5')
506 507
507 508 def test_execute_reply_stdout(self):
508 509 e0 = self.client[self.client.ids[0]]
509 510 e0.block = True
510 511 ar = e0.execute("print (5)", silent=False)
511 512 er = ar.get()
512 513 self.assertEqual(er.stdout.strip(), '5')
513 514
514 515 def test_execute_pyout(self):
515 516 """execute triggers pyout with silent=False"""
516 517 view = self.client[:]
517 518 ar = view.execute("5", silent=False, block=True)
518 519
519 520 expected = [{'text/plain' : '5'}] * len(view)
520 521 mimes = [ out['data'] for out in ar.pyout ]
521 522 self.assertEqual(mimes, expected)
522 523
523 524 def test_execute_silent(self):
524 525 """execute does not trigger pyout with silent=True"""
525 526 view = self.client[:]
526 527 ar = view.execute("5", block=True)
527 528 expected = [None] * len(view)
528 529 self.assertEqual(ar.pyout, expected)
529 530
530 531 def test_execute_magic(self):
531 532 """execute accepts IPython commands"""
532 533 view = self.client[:]
533 534 view.execute("a = 5")
534 535 ar = view.execute("%whos", block=True)
535 536 # this will raise, if that failed
536 537 ar.get(5)
537 538 for stdout in ar.stdout:
538 539 lines = stdout.splitlines()
539 540 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
540 541 found = False
541 542 for line in lines[2:]:
542 543 split = line.split()
543 544 if split == ['a', 'int', '5']:
544 545 found = True
545 546 break
546 547 self.assertTrue(found, "whos output wrong: %s" % stdout)
547 548
548 549 def test_execute_displaypub(self):
549 550 """execute tracks display_pub output"""
550 551 view = self.client[:]
551 552 view.execute("from IPython.core.display import *")
552 553 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
553 554
554 555 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
555 556 for outputs in ar.outputs:
556 557 mimes = [ out['data'] for out in outputs ]
557 558 self.assertEqual(mimes, expected)
558 559
559 560 def test_apply_displaypub(self):
560 561 """apply tracks display_pub output"""
561 562 view = self.client[:]
562 563 view.execute("from IPython.core.display import *")
563 564
564 565 @interactive
565 566 def publish():
566 567 [ display(i) for i in range(5) ]
567 568
568 569 ar = view.apply_async(publish)
569 570 ar.get(5)
570 571 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
571 572 for outputs in ar.outputs:
572 573 mimes = [ out['data'] for out in outputs ]
573 574 self.assertEqual(mimes, expected)
574 575
575 576 def test_execute_raises(self):
576 577 """exceptions in execute requests raise appropriately"""
577 578 view = self.client[-1]
578 579 ar = view.execute("1/0")
579 580 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
580 581
582 def test_remoteerror_render_exception(self):
583 """RemoteErrors get nice tracebacks"""
584 view = self.client[-1]
585 ar = view.execute("1/0")
586 ip = get_ipython()
587 ip.user_ns['ar'] = ar
588 with capture_output() as io:
589 ip.run_cell("ar.get(2)")
590
591 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
592
593 def test_compositeerror_render_exception(self):
594 """CompositeErrors get nice tracebacks"""
595 view = self.client[:]
596 ar = view.execute("1/0")
597 ip = get_ipython()
598 ip.user_ns['ar'] = ar
599 with capture_output() as io:
600 ip.run_cell("ar.get(2)")
601
602 self.assertEqual(io.stdout.count('ZeroDivisionError'), len(view) * 2, io.stdout)
603 self.assertEqual(io.stdout.count('integer division'), len(view), io.stdout)
604 self.assertEqual(io.stdout.count(':execute'), len(view), io.stdout)
605
581 606 @dec.skipif_not_matplotlib
582 607 def test_magic_pylab(self):
583 608 """%pylab works on engines"""
584 609 view = self.client[-1]
585 610 ar = view.execute("%pylab inline")
586 611 # at least check if this raised:
587 612 reply = ar.get(5)
588 613 # include imports, in case user config
589 614 ar = view.execute("plot(rand(100))", silent=False)
590 615 reply = ar.get(5)
591 616 self.assertEqual(len(reply.outputs), 1)
592 617 output = reply.outputs[0]
593 618 self.assertTrue("data" in output)
594 619 data = output['data']
595 620 self.assertTrue("image/png" in data)
596 621
597 622 def test_func_default_func(self):
598 623 """interactively defined function as apply func default"""
599 624 def foo():
600 625 return 'foo'
601 626
602 627 def bar(f=foo):
603 628 return f()
604 629
605 630 view = self.client[-1]
606 631 ar = view.apply_async(bar)
607 632 r = ar.get(10)
608 633 self.assertEqual(r, 'foo')
609 634 def test_data_pub_single(self):
610 635 view = self.client[-1]
611 636 ar = view.execute('\n'.join([
612 637 'from IPython.zmq.datapub import publish_data',
613 638 'for i in range(5):',
614 639 ' publish_data(dict(i=i))'
615 640 ]), block=False)
616 641 self.assertTrue(isinstance(ar.data, dict))
617 642 ar.get(5)
618 643 self.assertEqual(ar.data, dict(i=4))
619 644
620 645 def test_data_pub(self):
621 646 view = self.client[:]
622 647 ar = view.execute('\n'.join([
623 648 'from IPython.zmq.datapub import publish_data',
624 649 'for i in range(5):',
625 650 ' publish_data(dict(i=i))'
626 651 ]), block=False)
627 652 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
628 653 ar.get(5)
629 654 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
630 655
631 656 def test_can_list_arg(self):
632 657 """args in lists are canned"""
633 658 view = self.client[-1]
634 659 view['a'] = 128
635 660 rA = pmod.Reference('a')
636 661 ar = view.apply_async(lambda x: x, [rA])
637 662 r = ar.get(5)
638 663 self.assertEqual(r, [128])
639 664
640 665 def test_can_dict_arg(self):
641 666 """args in dicts are canned"""
642 667 view = self.client[-1]
643 668 view['a'] = 128
644 669 rA = pmod.Reference('a')
645 670 ar = view.apply_async(lambda x: x, dict(foo=rA))
646 671 r = ar.get(5)
647 672 self.assertEqual(r, dict(foo=128))
648 673
649 674 def test_can_list_kwarg(self):
650 675 """kwargs in lists are canned"""
651 676 view = self.client[-1]
652 677 view['a'] = 128
653 678 rA = pmod.Reference('a')
654 679 ar = view.apply_async(lambda x=5: x, x=[rA])
655 680 r = ar.get(5)
656 681 self.assertEqual(r, [128])
657 682
658 683 def test_can_dict_kwarg(self):
659 684 """kwargs in dicts are canned"""
660 685 view = self.client[-1]
661 686 view['a'] = 128
662 687 rA = pmod.Reference('a')
663 688 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
664 689 r = ar.get(5)
665 690 self.assertEqual(r, dict(foo=128))
666 691
667 692 def test_map_ref(self):
668 693 """view.map works with references"""
669 694 view = self.client[:]
670 695 ranks = sorted(self.client.ids)
671 696 view.scatter('rank', ranks, flatten=True)
672 697 rrank = pmod.Reference('rank')
673 698
674 699 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
675 700 drank = amr.get(5)
676 701 self.assertEqual(drank, [ r*2 for r in ranks ])
677 702
678 703
General Comments 0
You need to be logged in to leave comments. Login now