##// END OF EJS Templates
truncate potentially long CompositeErrors...
MinRK -
Show More
@@ -1,346 +1,354 b''
1 1 # encoding: utf-8
2 2
3 3 """Classes and functions for kernel related errors and exceptions.
4 4
5 5 Inheritance diagram:
6 6
7 7 .. inheritance-diagram:: IPython.parallel.error
8 8 :parts: 3
9 9
10 10 Authors:
11 11
12 12 * Brian Granger
13 13 * Min RK
14 14 """
15 15 from __future__ import print_function
16 16
17 17 import sys
18 18 import traceback
19 19
20 20 __docformat__ = "restructuredtext en"
21 21
22 22 # Tell nose to skip this module
23 23 __test__ = {}
24 24
25 25 #-------------------------------------------------------------------------------
26 26 # Copyright (C) 2008-2011 The IPython Development Team
27 27 #
28 28 # Distributed under the terms of the BSD License. The full license is in
29 29 # the file COPYING, distributed as part of this software.
30 30 #-------------------------------------------------------------------------------
31 31
32 32 #-------------------------------------------------------------------------------
33 33 # Error classes
34 34 #-------------------------------------------------------------------------------
35 35 class IPythonError(Exception):
36 36 """Base exception that all of our exceptions inherit from.
37 37
38 38 This can be raised by code that doesn't have any more specific
39 39 information."""
40 40
41 41 pass
42 42
43 43 # Exceptions associated with the controller objects
44 44 class ControllerError(IPythonError): pass
45 45
46 46 class ControllerCreationError(ControllerError): pass
47 47
48 48
49 49 # Exceptions associated with the Engines
50 50 class EngineError(IPythonError): pass
51 51
52 52 class EngineCreationError(EngineError): pass
53 53
54 54 class KernelError(IPythonError):
55 55 pass
56 56
57 57 class NotDefined(KernelError):
58 58 def __init__(self, name):
59 59 self.name = name
60 60 self.args = (name,)
61 61
62 62 def __repr__(self):
63 63 return '<NotDefined: %s>' % self.name
64 64
65 65 __str__ = __repr__
66 66
67 67
68 68 class QueueCleared(KernelError):
69 69 pass
70 70
71 71
72 72 class IdInUse(KernelError):
73 73 pass
74 74
75 75
76 76 class ProtocolError(KernelError):
77 77 pass
78 78
79 79
80 80 class ConnectionError(KernelError):
81 81 pass
82 82
83 83
84 84 class InvalidEngineID(KernelError):
85 85 pass
86 86
87 87
88 88 class NoEnginesRegistered(KernelError):
89 89 pass
90 90
91 91
92 92 class InvalidClientID(KernelError):
93 93 pass
94 94
95 95
96 96 class InvalidDeferredID(KernelError):
97 97 pass
98 98
99 99
100 100 class SerializationError(KernelError):
101 101 pass
102 102
103 103
104 104 class MessageSizeError(KernelError):
105 105 pass
106 106
107 107
108 108 class PBMessageSizeError(MessageSizeError):
109 109 pass
110 110
111 111
112 112 class ResultNotCompleted(KernelError):
113 113 pass
114 114
115 115
116 116 class ResultAlreadyRetrieved(KernelError):
117 117 pass
118 118
119 119 class ClientError(KernelError):
120 120 pass
121 121
122 122
123 123 class TaskAborted(KernelError):
124 124 pass
125 125
126 126
127 127 class TaskTimeout(KernelError):
128 128 pass
129 129
130 130
131 131 class NotAPendingResult(KernelError):
132 132 pass
133 133
134 134
135 135 class UnpickleableException(KernelError):
136 136 pass
137 137
138 138
139 139 class AbortedPendingDeferredError(KernelError):
140 140 pass
141 141
142 142
143 143 class InvalidProperty(KernelError):
144 144 pass
145 145
146 146
147 147 class MissingBlockArgument(KernelError):
148 148 pass
149 149
150 150
151 151 class StopLocalExecution(KernelError):
152 152 pass
153 153
154 154
155 155 class SecurityError(KernelError):
156 156 pass
157 157
158 158
159 159 class FileTimeoutError(KernelError):
160 160 pass
161 161
162 162 class TimeoutError(KernelError):
163 163 pass
164 164
165 165 class UnmetDependency(KernelError):
166 166 pass
167 167
168 168 class ImpossibleDependency(UnmetDependency):
169 169 pass
170 170
171 171 class DependencyTimeout(ImpossibleDependency):
172 172 pass
173 173
174 174 class InvalidDependency(ImpossibleDependency):
175 175 pass
176 176
177 177 class RemoteError(KernelError):
178 178 """Error raised elsewhere"""
179 179 ename=None
180 180 evalue=None
181 181 traceback=None
182 182 engine_info=None
183 183
184 184 def __init__(self, ename, evalue, traceback, engine_info=None):
185 185 self.ename=ename
186 186 self.evalue=evalue
187 187 self.traceback=traceback
188 188 self.engine_info=engine_info or {}
189 189 self.args=(ename, evalue)
190 190
191 191 def __repr__(self):
192 192 engineid = self.engine_info.get('engine_id', ' ')
193 193 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
194 194
195 195 def __str__(self):
196 196 return "%s(%s)" % (self.ename, self.evalue)
197 197
198 198 def render_traceback(self):
199 199 """render traceback to a list of lines"""
200 200 return (self.traceback or "No traceback available").splitlines()
201 201
202 202 def _render_traceback_(self):
203 203 """Special method for custom tracebacks within IPython.
204 204
205 205 This will be called by IPython instead of displaying the local traceback.
206 206
207 207 It should return a traceback rendered as a list of lines.
208 208 """
209 209 return self.render_traceback()
210 210
211 211 def print_traceback(self, excid=None):
212 212 """print my traceback"""
213 213 print('\n'.join(self.render_traceback()))
214 214
215 215
216 216
217 217
218 218 class TaskRejectError(KernelError):
219 219 """Exception to raise when a task should be rejected by an engine.
220 220
221 221 This exception can be used to allow a task running on an engine to test
222 222 if the engine (or the user's namespace on the engine) has the needed
223 223 task dependencies. If not, the task should raise this exception. For
224 224 the task to be retried on another engine, the task should be created
225 225 with the `retries` argument > 1.
226 226
227 227 The advantage of this approach over our older properties system is that
228 228 tasks have full access to the user's namespace on the engines and the
229 229 properties don't have to be managed or tested by the controller.
230 230 """
231 231
232 232
233 233 class CompositeError(RemoteError):
234 234 """Error for representing possibly multiple errors on engines"""
235 tb_limit = 4 # limit on how many tracebacks to draw
236
235 237 def __init__(self, message, elist):
236 238 Exception.__init__(self, *(message, elist))
237 239 # Don't use pack_exception because it will conflict with the .message
238 240 # attribute that is being deprecated in 2.6 and beyond.
239 241 self.msg = message
240 242 self.elist = elist
241 243 self.args = [ e[0] for e in elist ]
242 244
243 245 def _get_engine_str(self, ei):
244 246 if not ei:
245 247 return '[Engine Exception]'
246 248 else:
247 249 return '[%s:%s]: ' % (ei['engine_id'], ei['method'])
248 250
249 251 def _get_traceback(self, ev):
250 252 try:
251 253 tb = ev._ipython_traceback_text
252 254 except AttributeError:
253 255 return 'No traceback available'
254 256 else:
255 257 return tb
256 258
257 259 def __str__(self):
258 260 s = str(self.msg)
259 for en, ev, etb, ei in self.elist:
261 for en, ev, etb, ei in self.elist[:self.tb_limit]:
260 262 engine_str = self._get_engine_str(ei)
261 263 s = s + '\n' + engine_str + en + ': ' + str(ev)
264 if len(self.elist) > self.tb_limit:
265 s = s + '\n.... %i more exceptions ...' % (len(self.elist) - self.tb_limit)
262 266 return s
263 267
264 268 def __repr__(self):
265 return "CompositeError(%i)"%len(self.elist)
269 return "CompositeError(%i)" % len(self.elist)
266 270
267 271 def render_traceback(self, excid=None):
268 272 """render one or all of my tracebacks to a list of lines"""
269 273 lines = []
270 274 if excid is None:
271 for (en,ev,etb,ei) in self.elist:
275 for (en,ev,etb,ei) in self.elist[:self.tb_limit]:
272 276 lines.append(self._get_engine_str(ei))
273 277 lines.extend((etb or 'No traceback available').splitlines())
274 278 lines.append('')
279 if len(self.elist) > self.tb_limit:
280 lines.append(
281 '... %i more exceptions ...' % (len(self.elist) - self.tb_limit)
282 )
275 283 else:
276 284 try:
277 285 en,ev,etb,ei = self.elist[excid]
278 286 except:
279 287 raise IndexError("an exception with index %i does not exist"%excid)
280 288 else:
281 289 lines.append(self._get_engine_str(ei))
282 290 lines.extend((etb or 'No traceback available').splitlines())
283 291
284 292 return lines
285 293
286 294 def print_traceback(self, excid=None):
287 295 print('\n'.join(self.render_traceback(excid)))
288 296
289 297 def raise_exception(self, excid=0):
290 298 try:
291 299 en,ev,etb,ei = self.elist[excid]
292 300 except:
293 301 raise IndexError("an exception with index %i does not exist"%excid)
294 302 else:
295 303 raise RemoteError(en, ev, etb, ei)
296 304
297 305
298 306 def collect_exceptions(rdict_or_list, method='unspecified'):
299 307 """check a result dict for errors, and raise CompositeError if any exist.
300 308 Passthrough otherwise."""
301 309 elist = []
302 310 if isinstance(rdict_or_list, dict):
303 311 rlist = rdict_or_list.values()
304 312 else:
305 313 rlist = rdict_or_list
306 314 for r in rlist:
307 315 if isinstance(r, RemoteError):
308 316 en, ev, etb, ei = r.ename, r.evalue, r.traceback, r.engine_info
309 317 # Sometimes we could have CompositeError in our list. Just take
310 318 # the errors out of them and put them in our new list. This
311 319 # has the effect of flattening lists of CompositeErrors into one
312 320 # CompositeError
313 321 if en=='CompositeError':
314 322 for e in ev.elist:
315 323 elist.append(e)
316 324 else:
317 325 elist.append((en, ev, etb, ei))
318 326 if len(elist)==0:
319 327 return rdict_or_list
320 328 else:
321 329 msg = "one or more exceptions from call to method: %s" % (method)
322 330 # This silliness is needed so the debugger has access to the exception
323 331 # instance (e in this case)
324 332 try:
325 333 raise CompositeError(msg, elist)
326 334 except CompositeError as e:
327 335 raise e
328 336
329 337 def wrap_exception(engine_info={}):
330 338 etype, evalue, tb = sys.exc_info()
331 339 stb = traceback.format_exception(etype, evalue, tb)
332 340 exc_content = {
333 341 'status' : 'error',
334 342 'traceback' : stb,
335 343 'ename' : unicode(etype.__name__),
336 344 'evalue' : unicode(evalue),
337 345 'engine_info' : engine_info
338 346 }
339 347 return exc_content
340 348
341 349 def unwrap_exception(content):
342 350 err = RemoteError(content['ename'], content['evalue'],
343 351 ''.join(content['traceback']),
344 352 content.get('engine_info', {}))
345 353 return err
346 354
@@ -1,737 +1,764 b''
1 1 # -*- coding: utf-8 -*-
2 2 """test View objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import platform
21 21 import time
22 22 from tempfile import mktemp
23 23 from StringIO import StringIO
24 24
25 25 import zmq
26 26 from nose import SkipTest
27 27 from nose.plugins.attrib import attr
28 28
29 29 from IPython.testing import decorators as dec
30 30 from IPython.testing.ipunittest import ParametricTestCase
31 31 from IPython.utils.io import capture_output
32 32
33 33 from IPython import parallel as pmod
34 34 from IPython.parallel import error
35 35 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
36 36 from IPython.parallel import DirectView
37 37 from IPython.parallel.util import interactive
38 38
39 39 from IPython.parallel.tests import add_engines
40 40
41 41 from .clienttest import ClusterTestCase, crash, wait, skip_without
42 42
43 43 def setup():
44 44 add_engines(3, total=True)
45 45
46 46 class TestView(ClusterTestCase, ParametricTestCase):
47 47
48 48 def setUp(self):
49 49 # On Win XP, wait for resource cleanup, else parallel test group fails
50 50 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
51 51 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
52 52 time.sleep(2)
53 53 super(TestView, self).setUp()
54 54
55 55 @attr('crash')
56 56 def test_z_crash_mux(self):
57 57 """test graceful handling of engine death (direct)"""
58 58 # self.add_engines(1)
59 59 eid = self.client.ids[-1]
60 60 ar = self.client[eid].apply_async(crash)
61 61 self.assertRaisesRemote(error.EngineError, ar.get, 10)
62 62 eid = ar.engine_id
63 63 tic = time.time()
64 64 while eid in self.client.ids and time.time()-tic < 5:
65 65 time.sleep(.01)
66 66 self.client.spin()
67 67 self.assertFalse(eid in self.client.ids, "Engine should have died")
68 68
69 69 def test_push_pull(self):
70 70 """test pushing and pulling"""
71 71 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
72 72 t = self.client.ids[-1]
73 73 v = self.client[t]
74 74 push = v.push
75 75 pull = v.pull
76 76 v.block=True
77 77 nengines = len(self.client)
78 78 push({'data':data})
79 79 d = pull('data')
80 80 self.assertEqual(d, data)
81 81 self.client[:].push({'data':data})
82 82 d = self.client[:].pull('data', block=True)
83 83 self.assertEqual(d, nengines*[data])
84 84 ar = push({'data':data}, block=False)
85 85 self.assertTrue(isinstance(ar, AsyncResult))
86 86 r = ar.get()
87 87 ar = self.client[:].pull('data', block=False)
88 88 self.assertTrue(isinstance(ar, AsyncResult))
89 89 r = ar.get()
90 90 self.assertEqual(r, nengines*[data])
91 91 self.client[:].push(dict(a=10,b=20))
92 92 r = self.client[:].pull(('a','b'), block=True)
93 93 self.assertEqual(r, nengines*[[10,20]])
94 94
95 95 def test_push_pull_function(self):
96 96 "test pushing and pulling functions"
97 97 def testf(x):
98 98 return 2.0*x
99 99
100 100 t = self.client.ids[-1]
101 101 v = self.client[t]
102 102 v.block=True
103 103 push = v.push
104 104 pull = v.pull
105 105 execute = v.execute
106 106 push({'testf':testf})
107 107 r = pull('testf')
108 108 self.assertEqual(r(1.0), testf(1.0))
109 109 execute('r = testf(10)')
110 110 r = pull('r')
111 111 self.assertEqual(r, testf(10))
112 112 ar = self.client[:].push({'testf':testf}, block=False)
113 113 ar.get()
114 114 ar = self.client[:].pull('testf', block=False)
115 115 rlist = ar.get()
116 116 for r in rlist:
117 117 self.assertEqual(r(1.0), testf(1.0))
118 118 execute("def g(x): return x*x")
119 119 r = pull(('testf','g'))
120 120 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
121 121
122 122 def test_push_function_globals(self):
123 123 """test that pushed functions have access to globals"""
124 124 @interactive
125 125 def geta():
126 126 return a
127 127 # self.add_engines(1)
128 128 v = self.client[-1]
129 129 v.block=True
130 130 v['f'] = geta
131 131 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
132 132 v.execute('a=5')
133 133 v.execute('b=f()')
134 134 self.assertEqual(v['b'], 5)
135 135
136 136 def test_push_function_defaults(self):
137 137 """test that pushed functions preserve default args"""
138 138 def echo(a=10):
139 139 return a
140 140 v = self.client[-1]
141 141 v.block=True
142 142 v['f'] = echo
143 143 v.execute('b=f()')
144 144 self.assertEqual(v['b'], 10)
145 145
146 146 def test_get_result(self):
147 147 """test getting results from the Hub."""
148 148 c = pmod.Client(profile='iptest')
149 149 # self.add_engines(1)
150 150 t = c.ids[-1]
151 151 v = c[t]
152 152 v2 = self.client[t]
153 153 ar = v.apply_async(wait, 1)
154 154 # give the monitor time to notice the message
155 155 time.sleep(.25)
156 156 ahr = v2.get_result(ar.msg_ids)
157 157 self.assertTrue(isinstance(ahr, AsyncHubResult))
158 158 self.assertEqual(ahr.get(), ar.get())
159 159 ar2 = v2.get_result(ar.msg_ids)
160 160 self.assertFalse(isinstance(ar2, AsyncHubResult))
161 161 c.spin()
162 162 c.close()
163 163
164 164 def test_run_newline(self):
165 165 """test that run appends newline to files"""
166 166 tmpfile = mktemp()
167 167 with open(tmpfile, 'w') as f:
168 168 f.write("""def g():
169 169 return 5
170 170 """)
171 171 v = self.client[-1]
172 172 v.run(tmpfile, block=True)
173 173 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
174 174
175 175 def test_apply_tracked(self):
176 176 """test tracking for apply"""
177 177 # self.add_engines(1)
178 178 t = self.client.ids[-1]
179 179 v = self.client[t]
180 180 v.block=False
181 181 def echo(n=1024*1024, **kwargs):
182 182 with v.temp_flags(**kwargs):
183 183 return v.apply(lambda x: x, 'x'*n)
184 184 ar = echo(1, track=False)
185 185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
186 186 self.assertTrue(ar.sent)
187 187 ar = echo(track=True)
188 188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 189 self.assertEqual(ar.sent, ar._tracker.done)
190 190 ar._tracker.wait()
191 191 self.assertTrue(ar.sent)
192 192
193 193 def test_push_tracked(self):
194 194 t = self.client.ids[-1]
195 195 ns = dict(x='x'*1024*1024)
196 196 v = self.client[t]
197 197 ar = v.push(ns, block=False, track=False)
198 198 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
199 199 self.assertTrue(ar.sent)
200 200
201 201 ar = v.push(ns, block=False, track=True)
202 202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 203 ar._tracker.wait()
204 204 self.assertEqual(ar.sent, ar._tracker.done)
205 205 self.assertTrue(ar.sent)
206 206 ar.get()
207 207
208 208 def test_scatter_tracked(self):
209 209 t = self.client.ids
210 210 x='x'*1024*1024
211 211 ar = self.client[t].scatter('x', x, block=False, track=False)
212 212 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
213 213 self.assertTrue(ar.sent)
214 214
215 215 ar = self.client[t].scatter('x', x, block=False, track=True)
216 216 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
217 217 self.assertEqual(ar.sent, ar._tracker.done)
218 218 ar._tracker.wait()
219 219 self.assertTrue(ar.sent)
220 220 ar.get()
221 221
222 222 def test_remote_reference(self):
223 223 v = self.client[-1]
224 224 v['a'] = 123
225 225 ra = pmod.Reference('a')
226 226 b = v.apply_sync(lambda x: x, ra)
227 227 self.assertEqual(b, 123)
228 228
229 229
230 230 def test_scatter_gather(self):
231 231 view = self.client[:]
232 232 seq1 = range(16)
233 233 view.scatter('a', seq1)
234 234 seq2 = view.gather('a', block=True)
235 235 self.assertEqual(seq2, seq1)
236 236 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
237 237
238 238 @skip_without('numpy')
239 239 def test_scatter_gather_numpy(self):
240 240 import numpy
241 241 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
242 242 view = self.client[:]
243 243 a = numpy.arange(64)
244 244 view.scatter('a', a, block=True)
245 245 b = view.gather('a', block=True)
246 246 assert_array_equal(b, a)
247 247
248 248 def test_scatter_gather_lazy(self):
249 249 """scatter/gather with targets='all'"""
250 250 view = self.client.direct_view(targets='all')
251 251 x = range(64)
252 252 view.scatter('x', x)
253 253 gathered = view.gather('x', block=True)
254 254 self.assertEqual(gathered, x)
255 255
256 256
257 257 @dec.known_failure_py3
258 258 @skip_without('numpy')
259 259 def test_push_numpy_nocopy(self):
260 260 import numpy
261 261 view = self.client[:]
262 262 a = numpy.arange(64)
263 263 view['A'] = a
264 264 @interactive
265 265 def check_writeable(x):
266 266 return x.flags.writeable
267 267
268 268 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
269 269 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
270 270
271 271 view.push(dict(B=a))
272 272 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
273 273 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
274 274
275 275 @skip_without('numpy')
276 276 def test_apply_numpy(self):
277 277 """view.apply(f, ndarray)"""
278 278 import numpy
279 279 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
280 280
281 281 A = numpy.random.random((100,100))
282 282 view = self.client[-1]
283 283 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
284 284 B = A.astype(dt)
285 285 C = view.apply_sync(lambda x:x, B)
286 286 assert_array_equal(B,C)
287 287
288 288 @skip_without('numpy')
289 289 def test_push_pull_recarray(self):
290 290 """push/pull recarrays"""
291 291 import numpy
292 292 from numpy.testing.utils import assert_array_equal
293 293
294 294 view = self.client[-1]
295 295
296 296 R = numpy.array([
297 297 (1, 'hi', 0.),
298 298 (2**30, 'there', 2.5),
299 299 (-99999, 'world', -12345.6789),
300 300 ], [('n', int), ('s', '|S10'), ('f', float)])
301 301
302 302 view['RR'] = R
303 303 R2 = view['RR']
304 304
305 305 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
306 306 self.assertEqual(r_dtype, R.dtype)
307 307 self.assertEqual(r_shape, R.shape)
308 308 self.assertEqual(R2.dtype, R.dtype)
309 309 self.assertEqual(R2.shape, R.shape)
310 310 assert_array_equal(R2, R)
311 311
312 312 @skip_without('pandas')
313 313 def test_push_pull_timeseries(self):
314 314 """push/pull pandas.TimeSeries"""
315 315 import pandas
316 316
317 317 ts = pandas.TimeSeries(range(10))
318 318
319 319 view = self.client[-1]
320 320
321 321 view.push(dict(ts=ts), block=True)
322 322 rts = view['ts']
323 323
324 324 self.assertEqual(type(rts), type(ts))
325 325 self.assertTrue((ts == rts).all())
326 326
327 327 def test_map(self):
328 328 view = self.client[:]
329 329 def f(x):
330 330 return x**2
331 331 data = range(16)
332 332 r = view.map_sync(f, data)
333 333 self.assertEqual(r, map(f, data))
334 334
335 335 def test_map_iterable(self):
336 336 """test map on iterables (direct)"""
337 337 view = self.client[:]
338 338 # 101 is prime, so it won't be evenly distributed
339 339 arr = range(101)
340 340 # ensure it will be an iterator, even in Python 3
341 341 it = iter(arr)
342 342 r = view.map_sync(lambda x:x, arr)
343 343 self.assertEqual(r, list(arr))
344 344
345 345 def test_scatter_gather_nonblocking(self):
346 346 data = range(16)
347 347 view = self.client[:]
348 348 view.scatter('a', data, block=False)
349 349 ar = view.gather('a', block=False)
350 350 self.assertEqual(ar.get(), data)
351 351
352 352 @skip_without('numpy')
353 353 def test_scatter_gather_numpy_nonblocking(self):
354 354 import numpy
355 355 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
356 356 a = numpy.arange(64)
357 357 view = self.client[:]
358 358 ar = view.scatter('a', a, block=False)
359 359 self.assertTrue(isinstance(ar, AsyncResult))
360 360 amr = view.gather('a', block=False)
361 361 self.assertTrue(isinstance(amr, AsyncMapResult))
362 362 assert_array_equal(amr.get(), a)
363 363
364 364 def test_execute(self):
365 365 view = self.client[:]
366 366 # self.client.debug=True
367 367 execute = view.execute
368 368 ar = execute('c=30', block=False)
369 369 self.assertTrue(isinstance(ar, AsyncResult))
370 370 ar = execute('d=[0,1,2]', block=False)
371 371 self.client.wait(ar, 1)
372 372 self.assertEqual(len(ar.get()), len(self.client))
373 373 for c in view['c']:
374 374 self.assertEqual(c, 30)
375 375
376 376 def test_abort(self):
377 377 view = self.client[-1]
378 378 ar = view.execute('import time; time.sleep(1)', block=False)
379 379 ar2 = view.apply_async(lambda : 2)
380 380 ar3 = view.apply_async(lambda : 3)
381 381 view.abort(ar2)
382 382 view.abort(ar3.msg_ids)
383 383 self.assertRaises(error.TaskAborted, ar2.get)
384 384 self.assertRaises(error.TaskAborted, ar3.get)
385 385
386 386 def test_abort_all(self):
387 387 """view.abort() aborts all outstanding tasks"""
388 388 view = self.client[-1]
389 389 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
390 390 view.abort()
391 391 view.wait(timeout=5)
392 392 for ar in ars[5:]:
393 393 self.assertRaises(error.TaskAborted, ar.get)
394 394
395 395 def test_temp_flags(self):
396 396 view = self.client[-1]
397 397 view.block=True
398 398 with view.temp_flags(block=False):
399 399 self.assertFalse(view.block)
400 400 self.assertTrue(view.block)
401 401
402 402 @dec.known_failure_py3
403 403 def test_importer(self):
404 404 view = self.client[-1]
405 405 view.clear(block=True)
406 406 with view.importer:
407 407 import re
408 408
409 409 @interactive
410 410 def findall(pat, s):
411 411 # this globals() step isn't necessary in real code
412 412 # only to prevent a closure in the test
413 413 re = globals()['re']
414 414 return re.findall(pat, s)
415 415
416 416 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
417 417
418 418 def test_unicode_execute(self):
419 419 """test executing unicode strings"""
420 420 v = self.client[-1]
421 421 v.block=True
422 422 if sys.version_info[0] >= 3:
423 423 code="a='é'"
424 424 else:
425 425 code=u"a=u'é'"
426 426 v.execute(code)
427 427 self.assertEqual(v['a'], u'é')
428 428
429 429 def test_unicode_apply_result(self):
430 430 """test unicode apply results"""
431 431 v = self.client[-1]
432 432 r = v.apply_sync(lambda : u'é')
433 433 self.assertEqual(r, u'é')
434 434
435 435 def test_unicode_apply_arg(self):
436 436 """test passing unicode arguments to apply"""
437 437 v = self.client[-1]
438 438
439 439 @interactive
440 440 def check_unicode(a, check):
441 441 assert isinstance(a, unicode), "%r is not unicode"%a
442 442 assert isinstance(check, bytes), "%r is not bytes"%check
443 443 assert a.encode('utf8') == check, "%s != %s"%(a,check)
444 444
445 445 for s in [ u'é', u'ßø®∫',u'asdf' ]:
446 446 try:
447 447 v.apply_sync(check_unicode, s, s.encode('utf8'))
448 448 except error.RemoteError as e:
449 449 if e.ename == 'AssertionError':
450 450 self.fail(e.evalue)
451 451 else:
452 452 raise e
453 453
454 454 def test_map_reference(self):
455 455 """view.map(<Reference>, *seqs) should work"""
456 456 v = self.client[:]
457 457 v.scatter('n', self.client.ids, flatten=True)
458 458 v.execute("f = lambda x,y: x*y")
459 459 rf = pmod.Reference('f')
460 460 nlist = list(range(10))
461 461 mlist = nlist[::-1]
462 462 expected = [ m*n for m,n in zip(mlist, nlist) ]
463 463 result = v.map_sync(rf, mlist, nlist)
464 464 self.assertEqual(result, expected)
465 465
466 466 def test_apply_reference(self):
467 467 """view.apply(<Reference>, *args) should work"""
468 468 v = self.client[:]
469 469 v.scatter('n', self.client.ids, flatten=True)
470 470 v.execute("f = lambda x: n*x")
471 471 rf = pmod.Reference('f')
472 472 result = v.apply_sync(rf, 5)
473 473 expected = [ 5*id for id in self.client.ids ]
474 474 self.assertEqual(result, expected)
475 475
476 476 def test_eval_reference(self):
477 477 v = self.client[self.client.ids[0]]
478 478 v['g'] = range(5)
479 479 rg = pmod.Reference('g[0]')
480 480 echo = lambda x:x
481 481 self.assertEqual(v.apply_sync(echo, rg), 0)
482 482
483 483 def test_reference_nameerror(self):
484 484 v = self.client[self.client.ids[0]]
485 485 r = pmod.Reference('elvis_has_left')
486 486 echo = lambda x:x
487 487 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
488 488
489 489 def test_single_engine_map(self):
490 490 e0 = self.client[self.client.ids[0]]
491 491 r = range(5)
492 492 check = [ -1*i for i in r ]
493 493 result = e0.map_sync(lambda x: -1*x, r)
494 494 self.assertEqual(result, check)
495 495
496 496 def test_len(self):
497 497 """len(view) makes sense"""
498 498 e0 = self.client[self.client.ids[0]]
499 499 yield self.assertEqual(len(e0), 1)
500 500 v = self.client[:]
501 501 yield self.assertEqual(len(v), len(self.client.ids))
502 502 v = self.client.direct_view('all')
503 503 yield self.assertEqual(len(v), len(self.client.ids))
504 504 v = self.client[:2]
505 505 yield self.assertEqual(len(v), 2)
506 506 v = self.client[:1]
507 507 yield self.assertEqual(len(v), 1)
508 508 v = self.client.load_balanced_view()
509 509 yield self.assertEqual(len(v), len(self.client.ids))
510 510 # parametric tests seem to require manual closing?
511 511 self.client.close()
512 512
513 513
514 514 # begin execute tests
515 515
516 516 def test_execute_reply(self):
517 517 e0 = self.client[self.client.ids[0]]
518 518 e0.block = True
519 519 ar = e0.execute("5", silent=False)
520 520 er = ar.get()
521 521 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
522 522 self.assertEqual(er.pyout['data']['text/plain'], '5')
523 523
524 524 def test_execute_reply_stdout(self):
525 525 e0 = self.client[self.client.ids[0]]
526 526 e0.block = True
527 527 ar = e0.execute("print (5)", silent=False)
528 528 er = ar.get()
529 529 self.assertEqual(er.stdout.strip(), '5')
530 530
531 531 def test_execute_pyout(self):
532 532 """execute triggers pyout with silent=False"""
533 533 view = self.client[:]
534 534 ar = view.execute("5", silent=False, block=True)
535 535
536 536 expected = [{'text/plain' : '5'}] * len(view)
537 537 mimes = [ out['data'] for out in ar.pyout ]
538 538 self.assertEqual(mimes, expected)
539 539
540 540 def test_execute_silent(self):
541 541 """execute does not trigger pyout with silent=True"""
542 542 view = self.client[:]
543 543 ar = view.execute("5", block=True)
544 544 expected = [None] * len(view)
545 545 self.assertEqual(ar.pyout, expected)
546 546
547 547 def test_execute_magic(self):
548 548 """execute accepts IPython commands"""
549 549 view = self.client[:]
550 550 view.execute("a = 5")
551 551 ar = view.execute("%whos", block=True)
552 552 # this will raise, if that failed
553 553 ar.get(5)
554 554 for stdout in ar.stdout:
555 555 lines = stdout.splitlines()
556 556 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
557 557 found = False
558 558 for line in lines[2:]:
559 559 split = line.split()
560 560 if split == ['a', 'int', '5']:
561 561 found = True
562 562 break
563 563 self.assertTrue(found, "whos output wrong: %s" % stdout)
564 564
565 565 def test_execute_displaypub(self):
566 566 """execute tracks display_pub output"""
567 567 view = self.client[:]
568 568 view.execute("from IPython.core.display import *")
569 569 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
570 570
571 571 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
572 572 for outputs in ar.outputs:
573 573 mimes = [ out['data'] for out in outputs ]
574 574 self.assertEqual(mimes, expected)
575 575
576 576 def test_apply_displaypub(self):
577 577 """apply tracks display_pub output"""
578 578 view = self.client[:]
579 579 view.execute("from IPython.core.display import *")
580 580
581 581 @interactive
582 582 def publish():
583 583 [ display(i) for i in range(5) ]
584 584
585 585 ar = view.apply_async(publish)
586 586 ar.get(5)
587 587 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
588 588 for outputs in ar.outputs:
589 589 mimes = [ out['data'] for out in outputs ]
590 590 self.assertEqual(mimes, expected)
591 591
592 592 def test_execute_raises(self):
593 593 """exceptions in execute requests raise appropriately"""
594 594 view = self.client[-1]
595 595 ar = view.execute("1/0")
596 596 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
597 597
598 598 def test_remoteerror_render_exception(self):
599 599 """RemoteErrors get nice tracebacks"""
600 600 view = self.client[-1]
601 601 ar = view.execute("1/0")
602 602 ip = get_ipython()
603 603 ip.user_ns['ar'] = ar
604 604 with capture_output() as io:
605 605 ip.run_cell("ar.get(2)")
606 606
607 607 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
608 608
609 609 def test_compositeerror_render_exception(self):
610 610 """CompositeErrors get nice tracebacks"""
611 611 view = self.client[:]
612 612 ar = view.execute("1/0")
613 613 ip = get_ipython()
614 614 ip.user_ns['ar'] = ar
615 615 with capture_output() as io:
616 616 ip.run_cell("ar.get(2)")
617 617
618 618 self.assertEqual(io.stdout.count('ZeroDivisionError'), len(view) * 2, io.stdout)
619 619 self.assertEqual(io.stdout.count('by zero'), len(view), io.stdout)
620 620 self.assertEqual(io.stdout.count(':execute'), len(view), io.stdout)
621 621
622 def test_compositeerror_truncate(self):
623 """Truncate CompositeErrors with many exceptions"""
624 view = self.client[:]
625 msg_ids = []
626 for i in range(10):
627 ar = view.execute("1/0")
628 msg_ids.extend(ar.msg_ids)
629
630 ar = self.client.get_result(msg_ids)
631 try:
632 ar.get()
633 except error.CompositeError as e:
634 pass
635 else:
636 self.fail("Should have raised CompositeError")
637
638 lines = e.render_traceback()
639 with capture_output() as io:
640 e.print_traceback()
641
642 self.assertTrue("more exceptions" in lines[-1])
643 count = e.tb_limit
644
645 self.assertEqual(io.stdout.count('ZeroDivisionError'), 2 * count, io.stdout)
646 self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
647 self.assertEqual(io.stdout.count(':execute'), count, io.stdout)
648
622 649 @dec.skipif_not_matplotlib
623 650 def test_magic_pylab(self):
624 651 """%pylab works on engines"""
625 652 view = self.client[-1]
626 653 ar = view.execute("%pylab inline")
627 654 # at least check if this raised:
628 655 reply = ar.get(5)
629 656 # include imports, in case user config
630 657 ar = view.execute("plot(rand(100))", silent=False)
631 658 reply = ar.get(5)
632 659 self.assertEqual(len(reply.outputs), 1)
633 660 output = reply.outputs[0]
634 661 self.assertTrue("data" in output)
635 662 data = output['data']
636 663 self.assertTrue("image/png" in data)
637 664
638 665 def test_func_default_func(self):
639 666 """interactively defined function as apply func default"""
640 667 def foo():
641 668 return 'foo'
642 669
643 670 def bar(f=foo):
644 671 return f()
645 672
646 673 view = self.client[-1]
647 674 ar = view.apply_async(bar)
648 675 r = ar.get(10)
649 676 self.assertEqual(r, 'foo')
650 677 def test_data_pub_single(self):
651 678 view = self.client[-1]
652 679 ar = view.execute('\n'.join([
653 680 'from IPython.kernel.zmq.datapub import publish_data',
654 681 'for i in range(5):',
655 682 ' publish_data(dict(i=i))'
656 683 ]), block=False)
657 684 self.assertTrue(isinstance(ar.data, dict))
658 685 ar.get(5)
659 686 self.assertEqual(ar.data, dict(i=4))
660 687
661 688 def test_data_pub(self):
662 689 view = self.client[:]
663 690 ar = view.execute('\n'.join([
664 691 'from IPython.kernel.zmq.datapub import publish_data',
665 692 'for i in range(5):',
666 693 ' publish_data(dict(i=i))'
667 694 ]), block=False)
668 695 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
669 696 ar.get(5)
670 697 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
671 698
672 699 def test_can_list_arg(self):
673 700 """args in lists are canned"""
674 701 view = self.client[-1]
675 702 view['a'] = 128
676 703 rA = pmod.Reference('a')
677 704 ar = view.apply_async(lambda x: x, [rA])
678 705 r = ar.get(5)
679 706 self.assertEqual(r, [128])
680 707
681 708 def test_can_dict_arg(self):
682 709 """args in dicts are canned"""
683 710 view = self.client[-1]
684 711 view['a'] = 128
685 712 rA = pmod.Reference('a')
686 713 ar = view.apply_async(lambda x: x, dict(foo=rA))
687 714 r = ar.get(5)
688 715 self.assertEqual(r, dict(foo=128))
689 716
690 717 def test_can_list_kwarg(self):
691 718 """kwargs in lists are canned"""
692 719 view = self.client[-1]
693 720 view['a'] = 128
694 721 rA = pmod.Reference('a')
695 722 ar = view.apply_async(lambda x=5: x, x=[rA])
696 723 r = ar.get(5)
697 724 self.assertEqual(r, [128])
698 725
699 726 def test_can_dict_kwarg(self):
700 727 """kwargs in dicts are canned"""
701 728 view = self.client[-1]
702 729 view['a'] = 128
703 730 rA = pmod.Reference('a')
704 731 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
705 732 r = ar.get(5)
706 733 self.assertEqual(r, dict(foo=128))
707 734
708 735 def test_map_ref(self):
709 736 """view.map works with references"""
710 737 view = self.client[:]
711 738 ranks = sorted(self.client.ids)
712 739 view.scatter('rank', ranks, flatten=True)
713 740 rrank = pmod.Reference('rank')
714 741
715 742 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
716 743 drank = amr.get(5)
717 744 self.assertEqual(drank, [ r*2 for r in ranks ])
718 745
719 746 def test_nested_getitem_setitem(self):
720 747 """get and set with view['a.b']"""
721 748 view = self.client[-1]
722 749 view.execute('\n'.join([
723 750 'class A(object): pass',
724 751 'a = A()',
725 752 'a.b = 128',
726 753 ]), block=True)
727 754 ra = pmod.Reference('a')
728 755
729 756 r = view.apply_sync(lambda x: x.b, ra)
730 757 self.assertEqual(r, 128)
731 758 self.assertEqual(view['a.b'], 128)
732 759
733 760 view['a.b'] = 0
734 761
735 762 r = view.apply_sync(lambda x: x.b, ra)
736 763 self.assertEqual(r, 0)
737 764 self.assertEqual(view['a.b'], 0)
General Comments 0
You need to be logged in to leave comments. Login now