##// END OF EJS Templates
adapt kernel/error.py to zmq, improve error propagation.
MinRK -
Show More
@@ -0,0 +1,276 b''
1 # encoding: utf-8
2
3 """Classes and functions for kernel related errors and exceptions."""
4 from __future__ import print_function
5
6 __docformat__ = "restructuredtext en"
7
8 # Tell nose to skip this module
9 __test__ = {}
10
11 #-------------------------------------------------------------------------------
12 # Copyright (C) 2008 The IPython Development Team
13 #
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
16 #-------------------------------------------------------------------------------
17
18 #-------------------------------------------------------------------------------
19 # Error classes
20 #-------------------------------------------------------------------------------
21 class IPythonError(Exception):
22 """Base exception that all of our exceptions inherit from.
23
24 This can be raised by code that doesn't have any more specific
25 information."""
26
27 pass
28
29 # Exceptions associated with the controller objects
30 class ControllerError(IPythonError): pass
31
32 class ControllerCreationError(ControllerError): pass
33
34
35 # Exceptions associated with the Engines
36 class EngineError(IPythonError): pass
37
38 class EngineCreationError(EngineError): pass
39
40 class KernelError(IPythonError):
41 pass
42
43 class NotDefined(KernelError):
44 def __init__(self, name):
45 self.name = name
46 self.args = (name,)
47
48 def __repr__(self):
49 return '<NotDefined: %s>' % self.name
50
51 __str__ = __repr__
52
53
54 class QueueCleared(KernelError):
55 pass
56
57
58 class IdInUse(KernelError):
59 pass
60
61
62 class ProtocolError(KernelError):
63 pass
64
65
66 class ConnectionError(KernelError):
67 pass
68
69
70 class InvalidEngineID(KernelError):
71 pass
72
73
74 class NoEnginesRegistered(KernelError):
75 pass
76
77
78 class InvalidClientID(KernelError):
79 pass
80
81
82 class InvalidDeferredID(KernelError):
83 pass
84
85
86 class SerializationError(KernelError):
87 pass
88
89
90 class MessageSizeError(KernelError):
91 pass
92
93
94 class PBMessageSizeError(MessageSizeError):
95 pass
96
97
98 class ResultNotCompleted(KernelError):
99 pass
100
101
102 class ResultAlreadyRetrieved(KernelError):
103 pass
104
105 class ClientError(KernelError):
106 pass
107
108
109 class TaskAborted(KernelError):
110 pass
111
112
113 class TaskTimeout(KernelError):
114 pass
115
116
117 class NotAPendingResult(KernelError):
118 pass
119
120
121 class UnpickleableException(KernelError):
122 pass
123
124
125 class AbortedPendingDeferredError(KernelError):
126 pass
127
128
129 class InvalidProperty(KernelError):
130 pass
131
132
133 class MissingBlockArgument(KernelError):
134 pass
135
136
137 class StopLocalExecution(KernelError):
138 pass
139
140
141 class SecurityError(KernelError):
142 pass
143
144
145 class FileTimeoutError(KernelError):
146 pass
147
148 class RemoteError(KernelError):
149 """Error raised elsewhere"""
150 ename=None
151 evalue=None
152 traceback=None
153 engine_info=None
154
155 def __init__(self, ename, evalue, traceback, engine_info=None):
156 self.ename=ename
157 self.evalue=evalue
158 self.traceback=traceback
159 self.engine_info=engine_info or {}
160 self.args=(ename, evalue)
161
162 def __repr__(self):
163 engineid = self.engine_info.get('engineid', ' ')
164 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
165
166 def __str__(self):
167 sig = "%s(%s)"%(self.ename, self.evalue)
168 if self.traceback:
169 return sig + '\n' + self.traceback
170 else:
171 return sig
172
173
174 class TaskRejectError(KernelError):
175 """Exception to raise when a task should be rejected by an engine.
176
177 This exception can be used to allow a task running on an engine to test
178 if the engine (or the user's namespace on the engine) has the needed
179 task dependencies. If not, the task should raise this exception. For
180 the task to be retried on another engine, the task should be created
181 with the `retries` argument > 1.
182
183 The advantage of this approach over our older properties system is that
184 tasks have full access to the user's namespace on the engines and the
185 properties don't have to be managed or tested by the controller.
186 """
187
188
189 class CompositeError(KernelError):
190 """Error for representing possibly multiple errors on engines"""
191 def __init__(self, message, elist):
192 Exception.__init__(self, *(message, elist))
193 # Don't use pack_exception because it will conflict with the .message
194 # attribute that is being deprecated in 2.6 and beyond.
195 self.msg = message
196 self.elist = elist
197 self.args = [ e[0] for e in elist ]
198
199 def _get_engine_str(self, ei):
200 if not ei:
201 return '[Engine Exception]'
202 else:
203 return '[%i:%s]: ' % (ei['engineid'], ei['method'])
204
205 def _get_traceback(self, ev):
206 try:
207 tb = ev._ipython_traceback_text
208 except AttributeError:
209 return 'No traceback available'
210 else:
211 return tb
212
213 def __str__(self):
214 s = str(self.msg)
215 for en, ev, etb, ei in self.elist:
216 engine_str = self._get_engine_str(ei)
217 s = s + '\n' + engine_str + en + ': ' + str(ev)
218 return s
219
220 def __repr__(self):
221 return "CompositeError(%i)"%len(self.elist)
222
223 def print_tracebacks(self, excid=None):
224 if excid is None:
225 for (en,ev,etb,ei) in self.elist:
226 print (self._get_engine_str(ei))
227 print (etb or 'No traceback available')
228 print ()
229 else:
230 try:
231 en,ev,etb,ei = self.elist[excid]
232 except:
233 raise IndexError("an exception with index %i does not exist"%excid)
234 else:
235 print (self._get_engine_str(ei))
236 print (etb or 'No traceback available')
237
238 def raise_exception(self, excid=0):
239 try:
240 en,ev,etb,ei = self.elist[excid]
241 except:
242 raise IndexError("an exception with index %i does not exist"%excid)
243 else:
244 try:
245 raise RemoteError(en, ev, etb, ei)
246 except:
247 et,ev,tb = sys.exc_info()
248
249
250 def collect_exceptions(rdict, method):
251 """check a result dict for errors, and raise CompositeError if any exist.
252 Passthrough otherwise."""
253 elist = []
254 for r in rdict.values():
255 if isinstance(r, RemoteError):
256 en, ev, etb, ei = r.ename, r.evalue, r.traceback, r.engine_info
257 # Sometimes we could have CompositeError in our list. Just take
258 # the errors out of them and put them in our new list. This
259 # has the effect of flattening lists of CompositeErrors into one
260 # CompositeError
261 if en=='CompositeError':
262 for e in ev.elist:
263 elist.append(e)
264 else:
265 elist.append((en, ev, etb, ei))
266 if len(elist)==0:
267 return rdict
268 else:
269 msg = "one or more exceptions from call to method: %s" % (method)
270 # This silliness is needed so the debugger has access to the exception
271 # instance (e in this case)
272 try:
273 raise CompositeError(msg, elist)
274 except CompositeError, e:
275 raise e
276
@@ -27,6 +27,7 b' import streamsession as ss'
27 # from remotenamespace import RemoteNamespace
27 # from remotenamespace import RemoteNamespace
28 from view import DirectView, LoadBalancedView
28 from view import DirectView, LoadBalancedView
29 from dependency import Dependency, depend, require
29 from dependency import Dependency, depend, require
30 import error
30
31
31 def _push(ns):
32 def _push(ns):
32 globals().update(ns)
33 globals().update(ns)
@@ -128,12 +129,13 b' class AbortedTask(object):'
128 def __init__(self, msg_id):
129 def __init__(self, msg_id):
129 self.msg_id = msg_id
130 self.msg_id = msg_id
130
131
131 class ControllerError(Exception):
132 class ResultDict(dict):
132 """Exception Class for errors in the controller (not the Engine)."""
133 """A subclass of dict that raises errors if it has them."""
133 def __init__(self, etype, evalue, tb):
134 def __getitem__(self, key):
134 self.etype = etype
135 res = dict.__getitem__(self, key)
135 self.evalue = evalue
136 if isinstance(res, error.KernelError):
136 self.traceback=tb
137 raise res
138 return res
137
139
138 class Client(object):
140 class Client(object):
139 """A semi-synchronous client to the IPython ZMQ controller
141 """A semi-synchronous client to the IPython ZMQ controller
@@ -402,12 +404,18 b' class Client(object):'
402 if content['status'] == 'ok':
404 if content['status'] == 'ok':
403 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
405 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
404 elif content['status'] == 'aborted':
406 elif content['status'] == 'aborted':
405 self.results[msg_id] = AbortedTask(msg_id)
407 self.results[msg_id] = error.AbortedTask(msg_id)
406 elif content['status'] == 'resubmitted':
408 elif content['status'] == 'resubmitted':
407 # TODO: handle resubmission
409 # TODO: handle resubmission
408 pass
410 pass
409 else:
411 else:
410 self.results[msg_id] = ss.unwrap_exception(content)
412 e = ss.unwrap_exception(content)
413 e_uuid = e.engine_info['engineid']
414 for k,v in self._engines.iteritems():
415 if v == e_uuid:
416 e.engine_info['engineid'] = k
417 break
418 self.results[msg_id] = e
411
419
412 def _flush_notifications(self):
420 def _flush_notifications(self):
413 """Flush notifications of engine registrations waiting
421 """Flush notifications of engine registrations waiting
@@ -649,6 +657,13 b' class Client(object):'
649 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
657 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
650 return result
658 return result
651
659
660 def _maybe_raise(self, result):
661 """wrapper for maybe raising an exception if apply failed."""
662 if isinstance(result, error.RemoteError):
663 raise result
664
665 return result
666
652 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
667 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
653 after=None, follow=None):
668 after=None, follow=None):
654 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
669 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
@@ -758,7 +773,7 b' class Client(object):'
758 self.history.append(msg_id)
773 self.history.append(msg_id)
759 if block:
774 if block:
760 self.barrier(msg_id)
775 self.barrier(msg_id)
761 return self.results[msg_id]
776 return self._maybe_raise(self.results[msg_id])
762 else:
777 else:
763 return msg_id
778 return msg_id
764
779
@@ -795,12 +810,12 b' class Client(object):'
795 else:
810 else:
796 return msg_ids
811 return msg_ids
797 if len(msg_ids) == 1:
812 if len(msg_ids) == 1:
798 return self.results[msg_ids[0]]
813 return self._maybe_raise(self.results[msg_ids[0]])
799 else:
814 else:
800 result = {}
815 result = {}
801 for target,mid in zip(targets, msg_ids):
816 for target,mid in zip(targets, msg_ids):
802 result[target] = self.results[mid]
817 result[target] = self.results[mid]
803 return result
818 return error.collect_exceptions(result, f.__name__)
804
819
805 #--------------------------------------------------------------------------
820 #--------------------------------------------------------------------------
806 # Data movement
821 # Data movement
@@ -24,6 +24,7 b' import zmq'
24 from zmq.eventloop import ioloop, zmqstream
24 from zmq.eventloop import ioloop, zmqstream
25
25
26 # Local imports.
26 # Local imports.
27 from IPython.core import ultratb
27 from IPython.utils.traitlets import HasTraits, Instance, List
28 from IPython.utils.traitlets import HasTraits, Instance, List
28 from IPython.zmq.completer import KernelCompleter
29 from IPython.zmq.completer import KernelCompleter
29 from IPython.zmq.log import logger # a Logger object
30 from IPython.zmq.log import logger # a Logger object
@@ -74,6 +75,12 b' class Kernel(HasTraits):'
74 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
75 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
75 self.control_handlers[msg_type] = getattr(self, msg_type)
76 self.control_handlers[msg_type] = getattr(self, msg_type)
76
77
78
79 def _wrap_exception(self, method=None):
80 e_info = dict(engineid=self.identity, method=method)
81 content=wrap_exception(e_info)
82 return content
83
77 #-------------------- control handlers -----------------------------
84 #-------------------- control handlers -----------------------------
78 def abort_queues(self):
85 def abort_queues(self):
79 for stream in self.shell_streams:
86 for stream in self.shell_streams:
@@ -131,7 +138,7 b' class Kernel(HasTraits):'
131 try:
138 try:
132 self.abort_queues()
139 self.abort_queues()
133 except:
140 except:
134 content = wrap_exception()
141 content = self._wrap_exception('shutdown')
135 else:
142 else:
136 content = dict(parent['content'])
143 content = dict(parent['content'])
137 content['status'] = 'ok'
144 content['status'] = 'ok'
@@ -214,7 +221,7 b' class Kernel(HasTraits):'
214 sys.displayhook.set_parent(parent)
221 sys.displayhook.set_parent(parent)
215 exec comp_code in self.user_ns, self.user_ns
222 exec comp_code in self.user_ns, self.user_ns
216 except:
223 except:
217 exc_content = wrap_exception()
224 exc_content = self._wrap_exception('execute')
218 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
225 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
219 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
226 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
220 reply_content = exc_content
227 reply_content = exc_content
@@ -291,13 +298,13 b' class Kernel(HasTraits):'
291 packed_result,buf = serialize_object(result)
298 packed_result,buf = serialize_object(result)
292 result_buf = [packed_result]+buf
299 result_buf = [packed_result]+buf
293 except:
300 except:
294 exc_content = wrap_exception()
301 exc_content = self._wrap_exception('apply')
295 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
302 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
296 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
303 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
297 reply_content = exc_content
304 reply_content = exc_content
298 result_buf = []
305 result_buf = []
299
306
300 if etype is UnmetDependency:
307 if exc_content['ename'] == UnmetDependency.__name__:
301 sub['dependencies_met'] = False
308 sub['dependencies_met'] = False
302 else:
309 else:
303 reply_content = {'status' : 'ok'}
310 reply_content = {'status' : 'ok'}
@@ -17,6 +17,8 b' from zmq.eventloop.zmqstream import ZMQStream'
17 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
17 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
18 from IPython.utils.newserialized import serialize, unserialize
18 from IPython.utils.newserialized import serialize, unserialize
19
19
20 from IPython.zmq.parallel.error import RemoteError
21
20 try:
22 try:
21 import cPickle
23 import cPickle
22 pickle = cPickle
24 pickle = cPickle
@@ -60,25 +62,22 b' else:'
60 DELIM="<IDS|MSG>"
62 DELIM="<IDS|MSG>"
61 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
63 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
62
64
63 def wrap_exception():
65 def wrap_exception(engine_info={}):
64 etype, evalue, tb = sys.exc_info()
66 etype, evalue, tb = sys.exc_info()
65 tb = traceback.format_exception(etype, evalue, tb)
67 stb = traceback.format_exception(etype, evalue, tb)
66 exc_content = {
68 exc_content = {
67 'status' : 'error',
69 'status' : 'error',
68 'traceback' : [ line.encode('utf8') for line in tb ],
70 'traceback' : stb,
69 'etype' : str(etype).encode('utf8'),
71 'ename' : unicode(etype.__name__),
70 'evalue' : evalue.encode('utf8')
72 'evalue' : unicode(evalue),
73 'engine_info' : engine_info
71 }
74 }
72 return exc_content
75 return exc_content
73
76
74 class KernelError(Exception):
75 pass
76
77 def unwrap_exception(content):
77 def unwrap_exception(content):
78 err = KernelError(content['etype'], content['evalue'])
78 err = RemoteError(content['ename'], content['evalue'],
79 err.evalue = content['evalue']
79 ''.join(content['traceback']),
80 err.etype = content['etype']
80 content.get('engine_info', {}))
81 err.traceback = ''.join(content['traceback'])
82 return err
81 return err
83
82
84
83
@@ -402,7 +401,7 b' class StreamSession(object):'
402 pprint.pprint(buffers)
401 pprint.pprint(buffers)
403 return omsg
402 return omsg
404
403
405 def send_raw(self, stream, msg, flags=0, copy=True, idents=None):
404 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
406 """Send a raw message via idents.
405 """Send a raw message via idents.
407
406
408 Parameters
407 Parameters
General Comments 0
You need to be logged in to leave comments. Login now