##// END OF EJS Templates
adapt kernel/error.py to zmq, improve error propagation.
MinRK -
Show More
@@ -0,0 +1,276 b''
1 # encoding: utf-8
2
3 """Classes and functions for kernel related errors and exceptions."""
4 from __future__ import print_function
5
6 __docformat__ = "restructuredtext en"
7
8 # Tell nose to skip this module
9 __test__ = {}
10
11 #-------------------------------------------------------------------------------
12 # Copyright (C) 2008 The IPython Development Team
13 #
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
16 #-------------------------------------------------------------------------------
17
18 #-------------------------------------------------------------------------------
19 # Error classes
20 #-------------------------------------------------------------------------------
21 class IPythonError(Exception):
22 """Base exception that all of our exceptions inherit from.
23
24 This can be raised by code that doesn't have any more specific
25 information."""
26
27 pass
28
29 # Exceptions associated with the controller objects
30 class ControllerError(IPythonError): pass
31
32 class ControllerCreationError(ControllerError): pass
33
34
35 # Exceptions associated with the Engines
36 class EngineError(IPythonError): pass
37
38 class EngineCreationError(EngineError): pass
39
40 class KernelError(IPythonError):
41 pass
42
43 class NotDefined(KernelError):
44 def __init__(self, name):
45 self.name = name
46 self.args = (name,)
47
48 def __repr__(self):
49 return '<NotDefined: %s>' % self.name
50
51 __str__ = __repr__
52
53
54 class QueueCleared(KernelError):
55 pass
56
57
58 class IdInUse(KernelError):
59 pass
60
61
62 class ProtocolError(KernelError):
63 pass
64
65
66 class ConnectionError(KernelError):
67 pass
68
69
70 class InvalidEngineID(KernelError):
71 pass
72
73
74 class NoEnginesRegistered(KernelError):
75 pass
76
77
78 class InvalidClientID(KernelError):
79 pass
80
81
82 class InvalidDeferredID(KernelError):
83 pass
84
85
86 class SerializationError(KernelError):
87 pass
88
89
90 class MessageSizeError(KernelError):
91 pass
92
93
94 class PBMessageSizeError(MessageSizeError):
95 pass
96
97
98 class ResultNotCompleted(KernelError):
99 pass
100
101
102 class ResultAlreadyRetrieved(KernelError):
103 pass
104
105 class ClientError(KernelError):
106 pass
107
108
109 class TaskAborted(KernelError):
110 pass
111
112
113 class TaskTimeout(KernelError):
114 pass
115
116
117 class NotAPendingResult(KernelError):
118 pass
119
120
121 class UnpickleableException(KernelError):
122 pass
123
124
125 class AbortedPendingDeferredError(KernelError):
126 pass
127
128
129 class InvalidProperty(KernelError):
130 pass
131
132
133 class MissingBlockArgument(KernelError):
134 pass
135
136
137 class StopLocalExecution(KernelError):
138 pass
139
140
141 class SecurityError(KernelError):
142 pass
143
144
145 class FileTimeoutError(KernelError):
146 pass
147
148 class RemoteError(KernelError):
149 """Error raised elsewhere"""
150 ename=None
151 evalue=None
152 traceback=None
153 engine_info=None
154
155 def __init__(self, ename, evalue, traceback, engine_info=None):
156 self.ename=ename
157 self.evalue=evalue
158 self.traceback=traceback
159 self.engine_info=engine_info or {}
160 self.args=(ename, evalue)
161
162 def __repr__(self):
163 engineid = self.engine_info.get('engineid', ' ')
164 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
165
166 def __str__(self):
167 sig = "%s(%s)"%(self.ename, self.evalue)
168 if self.traceback:
169 return sig + '\n' + self.traceback
170 else:
171 return sig
172
173
174 class TaskRejectError(KernelError):
175 """Exception to raise when a task should be rejected by an engine.
176
177 This exception can be used to allow a task running on an engine to test
178 if the engine (or the user's namespace on the engine) has the needed
179 task dependencies. If not, the task should raise this exception. For
180 the task to be retried on another engine, the task should be created
181 with the `retries` argument > 1.
182
183 The advantage of this approach over our older properties system is that
184 tasks have full access to the user's namespace on the engines and the
185 properties don't have to be managed or tested by the controller.
186 """
187
188
189 class CompositeError(KernelError):
190 """Error for representing possibly multiple errors on engines"""
191 def __init__(self, message, elist):
192 Exception.__init__(self, *(message, elist))
193 # Don't use pack_exception because it will conflict with the .message
194 # attribute that is being deprecated in 2.6 and beyond.
195 self.msg = message
196 self.elist = elist
197 self.args = [ e[0] for e in elist ]
198
199 def _get_engine_str(self, ei):
200 if not ei:
201 return '[Engine Exception]'
202 else:
203 return '[%i:%s]: ' % (ei['engineid'], ei['method'])
204
205 def _get_traceback(self, ev):
206 try:
207 tb = ev._ipython_traceback_text
208 except AttributeError:
209 return 'No traceback available'
210 else:
211 return tb
212
213 def __str__(self):
214 s = str(self.msg)
215 for en, ev, etb, ei in self.elist:
216 engine_str = self._get_engine_str(ei)
217 s = s + '\n' + engine_str + en + ': ' + str(ev)
218 return s
219
220 def __repr__(self):
221 return "CompositeError(%i)"%len(self.elist)
222
223 def print_tracebacks(self, excid=None):
224 if excid is None:
225 for (en,ev,etb,ei) in self.elist:
226 print (self._get_engine_str(ei))
227 print (etb or 'No traceback available')
228 print ()
229 else:
230 try:
231 en,ev,etb,ei = self.elist[excid]
232 except:
233 raise IndexError("an exception with index %i does not exist"%excid)
234 else:
235 print (self._get_engine_str(ei))
236 print (etb or 'No traceback available')
237
238 def raise_exception(self, excid=0):
239 try:
240 en,ev,etb,ei = self.elist[excid]
241 except:
242 raise IndexError("an exception with index %i does not exist"%excid)
243 else:
244 try:
245 raise RemoteError(en, ev, etb, ei)
246 except:
247 et,ev,tb = sys.exc_info()
248
249
250 def collect_exceptions(rdict, method):
251 """check a result dict for errors, and raise CompositeError if any exist.
252 Passthrough otherwise."""
253 elist = []
254 for r in rdict.values():
255 if isinstance(r, RemoteError):
256 en, ev, etb, ei = r.ename, r.evalue, r.traceback, r.engine_info
257 # Sometimes we could have CompositeError in our list. Just take
258 # the errors out of them and put them in our new list. This
259 # has the effect of flattening lists of CompositeErrors into one
260 # CompositeError
261 if en=='CompositeError':
262 for e in ev.elist:
263 elist.append(e)
264 else:
265 elist.append((en, ev, etb, ei))
266 if len(elist)==0:
267 return rdict
268 else:
269 msg = "one or more exceptions from call to method: %s" % (method)
270 # This silliness is needed so the debugger has access to the exception
271 # instance (e in this case)
272 try:
273 raise CompositeError(msg, elist)
274 except CompositeError, e:
275 raise e
276
@@ -27,6 +27,7 b' import streamsession as ss'
27 27 # from remotenamespace import RemoteNamespace
28 28 from view import DirectView, LoadBalancedView
29 29 from dependency import Dependency, depend, require
30 import error
30 31
31 32 def _push(ns):
32 33 globals().update(ns)
@@ -128,12 +129,13 b' class AbortedTask(object):'
128 129 def __init__(self, msg_id):
129 130 self.msg_id = msg_id
130 131
131 class ControllerError(Exception):
132 """Exception Class for errors in the controller (not the Engine)."""
133 def __init__(self, etype, evalue, tb):
134 self.etype = etype
135 self.evalue = evalue
136 self.traceback=tb
132 class ResultDict(dict):
133 """A subclass of dict that raises errors if it has them."""
134 def __getitem__(self, key):
135 res = dict.__getitem__(self, key)
136 if isinstance(res, error.KernelError):
137 raise res
138 return res
137 139
138 140 class Client(object):
139 141 """A semi-synchronous client to the IPython ZMQ controller
@@ -402,12 +404,18 b' class Client(object):'
402 404 if content['status'] == 'ok':
403 405 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
404 406 elif content['status'] == 'aborted':
405 self.results[msg_id] = AbortedTask(msg_id)
407 self.results[msg_id] = error.AbortedTask(msg_id)
406 408 elif content['status'] == 'resubmitted':
407 409 # TODO: handle resubmission
408 410 pass
409 411 else:
410 self.results[msg_id] = ss.unwrap_exception(content)
412 e = ss.unwrap_exception(content)
413 e_uuid = e.engine_info['engineid']
414 for k,v in self._engines.iteritems():
415 if v == e_uuid:
416 e.engine_info['engineid'] = k
417 break
418 self.results[msg_id] = e
411 419
412 420 def _flush_notifications(self):
413 421 """Flush notifications of engine registrations waiting
@@ -649,6 +657,13 b' class Client(object):'
649 657 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
650 658 return result
651 659
660 def _maybe_raise(self, result):
661 """wrapper for maybe raising an exception if apply failed."""
662 if isinstance(result, error.RemoteError):
663 raise result
664
665 return result
666
652 667 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
653 668 after=None, follow=None):
654 669 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
@@ -758,7 +773,7 b' class Client(object):'
758 773 self.history.append(msg_id)
759 774 if block:
760 775 self.barrier(msg_id)
761 return self.results[msg_id]
776 return self._maybe_raise(self.results[msg_id])
762 777 else:
763 778 return msg_id
764 779
@@ -795,12 +810,12 b' class Client(object):'
795 810 else:
796 811 return msg_ids
797 812 if len(msg_ids) == 1:
798 return self.results[msg_ids[0]]
813 return self._maybe_raise(self.results[msg_ids[0]])
799 814 else:
800 815 result = {}
801 816 for target,mid in zip(targets, msg_ids):
802 817 result[target] = self.results[mid]
803 return result
818 return error.collect_exceptions(result, f.__name__)
804 819
805 820 #--------------------------------------------------------------------------
806 821 # Data movement
@@ -24,6 +24,7 b' import zmq'
24 24 from zmq.eventloop import ioloop, zmqstream
25 25
26 26 # Local imports.
27 from IPython.core import ultratb
27 28 from IPython.utils.traitlets import HasTraits, Instance, List
28 29 from IPython.zmq.completer import KernelCompleter
29 30 from IPython.zmq.log import logger # a Logger object
@@ -74,6 +75,12 b' class Kernel(HasTraits):'
74 75 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
75 76 self.control_handlers[msg_type] = getattr(self, msg_type)
76 77
78
79 def _wrap_exception(self, method=None):
80 e_info = dict(engineid=self.identity, method=method)
81 content=wrap_exception(e_info)
82 return content
83
77 84 #-------------------- control handlers -----------------------------
78 85 def abort_queues(self):
79 86 for stream in self.shell_streams:
@@ -131,7 +138,7 b' class Kernel(HasTraits):'
131 138 try:
132 139 self.abort_queues()
133 140 except:
134 content = wrap_exception()
141 content = self._wrap_exception('shutdown')
135 142 else:
136 143 content = dict(parent['content'])
137 144 content['status'] = 'ok'
@@ -214,7 +221,7 b' class Kernel(HasTraits):'
214 221 sys.displayhook.set_parent(parent)
215 222 exec comp_code in self.user_ns, self.user_ns
216 223 except:
217 exc_content = wrap_exception()
224 exc_content = self._wrap_exception('execute')
218 225 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
219 226 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
220 227 reply_content = exc_content
@@ -291,13 +298,13 b' class Kernel(HasTraits):'
291 298 packed_result,buf = serialize_object(result)
292 299 result_buf = [packed_result]+buf
293 300 except:
294 exc_content = wrap_exception()
301 exc_content = self._wrap_exception('apply')
295 302 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
296 303 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
297 304 reply_content = exc_content
298 305 result_buf = []
299 306
300 if etype is UnmetDependency:
307 if exc_content['ename'] == UnmetDependency.__name__:
301 308 sub['dependencies_met'] = False
302 309 else:
303 310 reply_content = {'status' : 'ok'}
@@ -17,6 +17,8 b' from zmq.eventloop.zmqstream import ZMQStream'
17 17 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
18 18 from IPython.utils.newserialized import serialize, unserialize
19 19
20 from IPython.zmq.parallel.error import RemoteError
21
20 22 try:
21 23 import cPickle
22 24 pickle = cPickle
@@ -60,25 +62,22 b' else:'
60 62 DELIM="<IDS|MSG>"
61 63 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
62 64
63 def wrap_exception():
65 def wrap_exception(engine_info={}):
64 66 etype, evalue, tb = sys.exc_info()
65 tb = traceback.format_exception(etype, evalue, tb)
67 stb = traceback.format_exception(etype, evalue, tb)
66 68 exc_content = {
67 69 'status' : 'error',
68 'traceback' : [ line.encode('utf8') for line in tb ],
69 'etype' : str(etype).encode('utf8'),
70 'evalue' : evalue.encode('utf8')
70 'traceback' : stb,
71 'ename' : unicode(etype.__name__),
72 'evalue' : unicode(evalue),
73 'engine_info' : engine_info
71 74 }
72 75 return exc_content
73 76
74 class KernelError(Exception):
75 pass
76
77 77 def unwrap_exception(content):
78 err = KernelError(content['etype'], content['evalue'])
79 err.evalue = content['evalue']
80 err.etype = content['etype']
81 err.traceback = ''.join(content['traceback'])
78 err = RemoteError(content['ename'], content['evalue'],
79 ''.join(content['traceback']),
80 content.get('engine_info', {}))
82 81 return err
83 82
84 83
@@ -402,7 +401,7 b' class StreamSession(object):'
402 401 pprint.pprint(buffers)
403 402 return omsg
404 403
405 def send_raw(self, stream, msg, flags=0, copy=True, idents=None):
404 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
406 405 """Send a raw message via idents.
407 406
408 407 Parameters
General Comments 0
You need to be logged in to leave comments. Login now