##// END OF EJS Templates
Backport PR #6374: fix _abort_queues typo
Thomas Kluyver -
Show More
@@ -1,800 +1,800 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """An interactive kernel that talks to frontends over 0MQ."""
2 """An interactive kernel that talks to frontends over 0MQ."""
3
3
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Imports
5 # Imports
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 from __future__ import print_function
7 from __future__ import print_function
8
8
9 # Standard library imports
9 # Standard library imports
10 import sys
10 import sys
11 import time
11 import time
12 import traceback
12 import traceback
13 import logging
13 import logging
14 import uuid
14 import uuid
15
15
16 from datetime import datetime
16 from datetime import datetime
17 from signal import (
17 from signal import (
18 signal, default_int_handler, SIGINT
18 signal, default_int_handler, SIGINT
19 )
19 )
20
20
21 # System library imports
21 # System library imports
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24 from zmq.eventloop.zmqstream import ZMQStream
24 from zmq.eventloop.zmqstream import ZMQStream
25
25
26 # Local imports
26 # Local imports
27 from IPython.config.configurable import Configurable
27 from IPython.config.configurable import Configurable
28 from IPython.core.error import StdinNotImplementedError
28 from IPython.core.error import StdinNotImplementedError
29 from IPython.core import release
29 from IPython.core import release
30 from IPython.utils import py3compat
30 from IPython.utils import py3compat
31 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
31 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
32 from IPython.utils.jsonutil import json_clean
32 from IPython.utils.jsonutil import json_clean
33 from IPython.utils.traitlets import (
33 from IPython.utils.traitlets import (
34 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
34 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
35 Type, Bool,
35 Type, Bool,
36 )
36 )
37
37
38 from .serialize import serialize_object, unpack_apply_message
38 from .serialize import serialize_object, unpack_apply_message
39 from .session import Session
39 from .session import Session
40 from .zmqshell import ZMQInteractiveShell
40 from .zmqshell import ZMQInteractiveShell
41
41
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Main kernel class
44 # Main kernel class
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 protocol_version = list(release.kernel_protocol_version_info)
47 protocol_version = list(release.kernel_protocol_version_info)
48 ipython_version = list(release.version_info)
48 ipython_version = list(release.version_info)
49 language_version = list(sys.version_info[:3])
49 language_version = list(sys.version_info[:3])
50
50
51
51
52 class Kernel(Configurable):
52 class Kernel(Configurable):
53
53
54 #---------------------------------------------------------------------------
54 #---------------------------------------------------------------------------
55 # Kernel interface
55 # Kernel interface
56 #---------------------------------------------------------------------------
56 #---------------------------------------------------------------------------
57
57
58 # attribute to override with a GUI
58 # attribute to override with a GUI
59 eventloop = Any(None)
59 eventloop = Any(None)
60 def _eventloop_changed(self, name, old, new):
60 def _eventloop_changed(self, name, old, new):
61 """schedule call to eventloop from IOLoop"""
61 """schedule call to eventloop from IOLoop"""
62 loop = ioloop.IOLoop.instance()
62 loop = ioloop.IOLoop.instance()
63 loop.add_callback(self.enter_eventloop)
63 loop.add_callback(self.enter_eventloop)
64
64
65 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
65 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
66 shell_class = Type(ZMQInteractiveShell)
66 shell_class = Type(ZMQInteractiveShell)
67
67
68 session = Instance(Session)
68 session = Instance(Session)
69 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
69 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
70 shell_streams = List()
70 shell_streams = List()
71 control_stream = Instance(ZMQStream)
71 control_stream = Instance(ZMQStream)
72 iopub_socket = Instance(zmq.Socket)
72 iopub_socket = Instance(zmq.Socket)
73 stdin_socket = Instance(zmq.Socket)
73 stdin_socket = Instance(zmq.Socket)
74 log = Instance(logging.Logger)
74 log = Instance(logging.Logger)
75
75
76 user_module = Any()
76 user_module = Any()
77 def _user_module_changed(self, name, old, new):
77 def _user_module_changed(self, name, old, new):
78 if self.shell is not None:
78 if self.shell is not None:
79 self.shell.user_module = new
79 self.shell.user_module = new
80
80
81 user_ns = Instance(dict, args=None, allow_none=True)
81 user_ns = Instance(dict, args=None, allow_none=True)
82 def _user_ns_changed(self, name, old, new):
82 def _user_ns_changed(self, name, old, new):
83 if self.shell is not None:
83 if self.shell is not None:
84 self.shell.user_ns = new
84 self.shell.user_ns = new
85 self.shell.init_user_ns()
85 self.shell.init_user_ns()
86
86
87 # identities:
87 # identities:
88 int_id = Integer(-1)
88 int_id = Integer(-1)
89 ident = Unicode()
89 ident = Unicode()
90
90
91 def _ident_default(self):
91 def _ident_default(self):
92 return unicode_type(uuid.uuid4())
92 return unicode_type(uuid.uuid4())
93
93
94 # Private interface
94 # Private interface
95
95
96 _darwin_app_nap = Bool(True, config=True,
96 _darwin_app_nap = Bool(True, config=True,
97 help="""Whether to use appnope for compatiblity with OS X App Nap.
97 help="""Whether to use appnope for compatiblity with OS X App Nap.
98
98
99 Only affects OS X >= 10.9.
99 Only affects OS X >= 10.9.
100 """
100 """
101 )
101 )
102
102
103 # Time to sleep after flushing the stdout/err buffers in each execute
103 # Time to sleep after flushing the stdout/err buffers in each execute
104 # cycle. While this introduces a hard limit on the minimal latency of the
104 # cycle. While this introduces a hard limit on the minimal latency of the
105 # execute cycle, it helps prevent output synchronization problems for
105 # execute cycle, it helps prevent output synchronization problems for
106 # clients.
106 # clients.
107 # Units are in seconds. The minimum zmq latency on local host is probably
107 # Units are in seconds. The minimum zmq latency on local host is probably
108 # ~150 microseconds, set this to 500us for now. We may need to increase it
108 # ~150 microseconds, set this to 500us for now. We may need to increase it
109 # a little if it's not enough after more interactive testing.
109 # a little if it's not enough after more interactive testing.
110 _execute_sleep = Float(0.0005, config=True)
110 _execute_sleep = Float(0.0005, config=True)
111
111
112 # Frequency of the kernel's event loop.
112 # Frequency of the kernel's event loop.
113 # Units are in seconds, kernel subclasses for GUI toolkits may need to
113 # Units are in seconds, kernel subclasses for GUI toolkits may need to
114 # adapt to milliseconds.
114 # adapt to milliseconds.
115 _poll_interval = Float(0.05, config=True)
115 _poll_interval = Float(0.05, config=True)
116
116
117 # If the shutdown was requested over the network, we leave here the
117 # If the shutdown was requested over the network, we leave here the
118 # necessary reply message so it can be sent by our registered atexit
118 # necessary reply message so it can be sent by our registered atexit
119 # handler. This ensures that the reply is only sent to clients truly at
119 # handler. This ensures that the reply is only sent to clients truly at
120 # the end of our shutdown process (which happens after the underlying
120 # the end of our shutdown process (which happens after the underlying
121 # IPython shell's own shutdown).
121 # IPython shell's own shutdown).
122 _shutdown_message = None
122 _shutdown_message = None
123
123
124 # This is a dict of port number that the kernel is listening on. It is set
124 # This is a dict of port number that the kernel is listening on. It is set
125 # by record_ports and used by connect_request.
125 # by record_ports and used by connect_request.
126 _recorded_ports = Dict()
126 _recorded_ports = Dict()
127
127
128 # A reference to the Python builtin 'raw_input' function.
128 # A reference to the Python builtin 'raw_input' function.
129 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
129 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
130 _sys_raw_input = Any()
130 _sys_raw_input = Any()
131 _sys_eval_input = Any()
131 _sys_eval_input = Any()
132
132
133 # set of aborted msg_ids
133 # set of aborted msg_ids
134 aborted = Set()
134 aborted = Set()
135
135
136
136
137 def __init__(self, **kwargs):
137 def __init__(self, **kwargs):
138 super(Kernel, self).__init__(**kwargs)
138 super(Kernel, self).__init__(**kwargs)
139
139
140 # Initialize the InteractiveShell subclass
140 # Initialize the InteractiveShell subclass
141 self.shell = self.shell_class.instance(parent=self,
141 self.shell = self.shell_class.instance(parent=self,
142 profile_dir = self.profile_dir,
142 profile_dir = self.profile_dir,
143 user_module = self.user_module,
143 user_module = self.user_module,
144 user_ns = self.user_ns,
144 user_ns = self.user_ns,
145 kernel = self,
145 kernel = self,
146 )
146 )
147 self.shell.displayhook.session = self.session
147 self.shell.displayhook.session = self.session
148 self.shell.displayhook.pub_socket = self.iopub_socket
148 self.shell.displayhook.pub_socket = self.iopub_socket
149 self.shell.displayhook.topic = self._topic('pyout')
149 self.shell.displayhook.topic = self._topic('pyout')
150 self.shell.display_pub.session = self.session
150 self.shell.display_pub.session = self.session
151 self.shell.display_pub.pub_socket = self.iopub_socket
151 self.shell.display_pub.pub_socket = self.iopub_socket
152 self.shell.data_pub.session = self.session
152 self.shell.data_pub.session = self.session
153 self.shell.data_pub.pub_socket = self.iopub_socket
153 self.shell.data_pub.pub_socket = self.iopub_socket
154
154
155 # TMP - hack while developing
155 # TMP - hack while developing
156 self.shell._reply_content = None
156 self.shell._reply_content = None
157
157
158 # Build dict of handlers for message types
158 # Build dict of handlers for message types
159 msg_types = [ 'execute_request', 'complete_request',
159 msg_types = [ 'execute_request', 'complete_request',
160 'object_info_request', 'history_request',
160 'object_info_request', 'history_request',
161 'kernel_info_request',
161 'kernel_info_request',
162 'connect_request', 'shutdown_request',
162 'connect_request', 'shutdown_request',
163 'apply_request',
163 'apply_request',
164 ]
164 ]
165 self.shell_handlers = {}
165 self.shell_handlers = {}
166 for msg_type in msg_types:
166 for msg_type in msg_types:
167 self.shell_handlers[msg_type] = getattr(self, msg_type)
167 self.shell_handlers[msg_type] = getattr(self, msg_type)
168
168
169 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
169 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
170 comm_manager = self.shell.comm_manager
170 comm_manager = self.shell.comm_manager
171 for msg_type in comm_msg_types:
171 for msg_type in comm_msg_types:
172 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
172 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
173
173
174 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
174 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
175 self.control_handlers = {}
175 self.control_handlers = {}
176 for msg_type in control_msg_types:
176 for msg_type in control_msg_types:
177 self.control_handlers[msg_type] = getattr(self, msg_type)
177 self.control_handlers[msg_type] = getattr(self, msg_type)
178
178
179
179
180 def dispatch_control(self, msg):
180 def dispatch_control(self, msg):
181 """dispatch control requests"""
181 """dispatch control requests"""
182 idents,msg = self.session.feed_identities(msg, copy=False)
182 idents,msg = self.session.feed_identities(msg, copy=False)
183 try:
183 try:
184 msg = self.session.unserialize(msg, content=True, copy=False)
184 msg = self.session.unserialize(msg, content=True, copy=False)
185 except:
185 except:
186 self.log.error("Invalid Control Message", exc_info=True)
186 self.log.error("Invalid Control Message", exc_info=True)
187 return
187 return
188
188
189 self.log.debug("Control received: %s", msg)
189 self.log.debug("Control received: %s", msg)
190
190
191 header = msg['header']
191 header = msg['header']
192 msg_id = header['msg_id']
192 msg_id = header['msg_id']
193 msg_type = header['msg_type']
193 msg_type = header['msg_type']
194
194
195 handler = self.control_handlers.get(msg_type, None)
195 handler = self.control_handlers.get(msg_type, None)
196 if handler is None:
196 if handler is None:
197 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
197 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
198 else:
198 else:
199 try:
199 try:
200 handler(self.control_stream, idents, msg)
200 handler(self.control_stream, idents, msg)
201 except Exception:
201 except Exception:
202 self.log.error("Exception in control handler:", exc_info=True)
202 self.log.error("Exception in control handler:", exc_info=True)
203
203
204 def dispatch_shell(self, stream, msg):
204 def dispatch_shell(self, stream, msg):
205 """dispatch shell requests"""
205 """dispatch shell requests"""
206 # flush control requests first
206 # flush control requests first
207 if self.control_stream:
207 if self.control_stream:
208 self.control_stream.flush()
208 self.control_stream.flush()
209
209
210 idents,msg = self.session.feed_identities(msg, copy=False)
210 idents,msg = self.session.feed_identities(msg, copy=False)
211 try:
211 try:
212 msg = self.session.unserialize(msg, content=True, copy=False)
212 msg = self.session.unserialize(msg, content=True, copy=False)
213 except:
213 except:
214 self.log.error("Invalid Message", exc_info=True)
214 self.log.error("Invalid Message", exc_info=True)
215 return
215 return
216
216
217 header = msg['header']
217 header = msg['header']
218 msg_id = header['msg_id']
218 msg_id = header['msg_id']
219 msg_type = msg['header']['msg_type']
219 msg_type = msg['header']['msg_type']
220
220
221 # Print some info about this message and leave a '--->' marker, so it's
221 # Print some info about this message and leave a '--->' marker, so it's
222 # easier to trace visually the message chain when debugging. Each
222 # easier to trace visually the message chain when debugging. Each
223 # handler prints its message at the end.
223 # handler prints its message at the end.
224 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
224 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
225 self.log.debug(' Content: %s\n --->\n ', msg['content'])
225 self.log.debug(' Content: %s\n --->\n ', msg['content'])
226
226
227 if msg_id in self.aborted:
227 if msg_id in self.aborted:
228 self.aborted.remove(msg_id)
228 self.aborted.remove(msg_id)
229 # is it safe to assume a msg_id will not be resubmitted?
229 # is it safe to assume a msg_id will not be resubmitted?
230 reply_type = msg_type.split('_')[0] + '_reply'
230 reply_type = msg_type.split('_')[0] + '_reply'
231 status = {'status' : 'aborted'}
231 status = {'status' : 'aborted'}
232 md = {'engine' : self.ident}
232 md = {'engine' : self.ident}
233 md.update(status)
233 md.update(status)
234 reply_msg = self.session.send(stream, reply_type, metadata=md,
234 reply_msg = self.session.send(stream, reply_type, metadata=md,
235 content=status, parent=msg, ident=idents)
235 content=status, parent=msg, ident=idents)
236 return
236 return
237
237
238 handler = self.shell_handlers.get(msg_type, None)
238 handler = self.shell_handlers.get(msg_type, None)
239 if handler is None:
239 if handler is None:
240 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
240 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
241 else:
241 else:
242 # ensure default_int_handler during handler call
242 # ensure default_int_handler during handler call
243 sig = signal(SIGINT, default_int_handler)
243 sig = signal(SIGINT, default_int_handler)
244 try:
244 try:
245 handler(stream, idents, msg)
245 handler(stream, idents, msg)
246 except Exception:
246 except Exception:
247 self.log.error("Exception in message handler:", exc_info=True)
247 self.log.error("Exception in message handler:", exc_info=True)
248 finally:
248 finally:
249 signal(SIGINT, sig)
249 signal(SIGINT, sig)
250
250
251 def enter_eventloop(self):
251 def enter_eventloop(self):
252 """enter eventloop"""
252 """enter eventloop"""
253 self.log.info("entering eventloop %s", self.eventloop)
253 self.log.info("entering eventloop %s", self.eventloop)
254 for stream in self.shell_streams:
254 for stream in self.shell_streams:
255 # flush any pending replies,
255 # flush any pending replies,
256 # which may be skipped by entering the eventloop
256 # which may be skipped by entering the eventloop
257 stream.flush(zmq.POLLOUT)
257 stream.flush(zmq.POLLOUT)
258 # restore default_int_handler
258 # restore default_int_handler
259 signal(SIGINT, default_int_handler)
259 signal(SIGINT, default_int_handler)
260 while self.eventloop is not None:
260 while self.eventloop is not None:
261 try:
261 try:
262 self.eventloop(self)
262 self.eventloop(self)
263 except KeyboardInterrupt:
263 except KeyboardInterrupt:
264 # Ctrl-C shouldn't crash the kernel
264 # Ctrl-C shouldn't crash the kernel
265 self.log.error("KeyboardInterrupt caught in kernel")
265 self.log.error("KeyboardInterrupt caught in kernel")
266 continue
266 continue
267 else:
267 else:
268 # eventloop exited cleanly, this means we should stop (right?)
268 # eventloop exited cleanly, this means we should stop (right?)
269 self.eventloop = None
269 self.eventloop = None
270 break
270 break
271 self.log.info("exiting eventloop")
271 self.log.info("exiting eventloop")
272
272
273 def start(self):
273 def start(self):
274 """register dispatchers for streams"""
274 """register dispatchers for streams"""
275 self.shell.exit_now = False
275 self.shell.exit_now = False
276 if self.control_stream:
276 if self.control_stream:
277 self.control_stream.on_recv(self.dispatch_control, copy=False)
277 self.control_stream.on_recv(self.dispatch_control, copy=False)
278
278
279 def make_dispatcher(stream):
279 def make_dispatcher(stream):
280 def dispatcher(msg):
280 def dispatcher(msg):
281 return self.dispatch_shell(stream, msg)
281 return self.dispatch_shell(stream, msg)
282 return dispatcher
282 return dispatcher
283
283
284 for s in self.shell_streams:
284 for s in self.shell_streams:
285 s.on_recv(make_dispatcher(s), copy=False)
285 s.on_recv(make_dispatcher(s), copy=False)
286
286
287 # publish idle status
287 # publish idle status
288 self._publish_status('starting')
288 self._publish_status('starting')
289
289
290 def do_one_iteration(self):
290 def do_one_iteration(self):
291 """step eventloop just once"""
291 """step eventloop just once"""
292 if self.control_stream:
292 if self.control_stream:
293 self.control_stream.flush()
293 self.control_stream.flush()
294 for stream in self.shell_streams:
294 for stream in self.shell_streams:
295 # handle at most one request per iteration
295 # handle at most one request per iteration
296 stream.flush(zmq.POLLIN, 1)
296 stream.flush(zmq.POLLIN, 1)
297 stream.flush(zmq.POLLOUT)
297 stream.flush(zmq.POLLOUT)
298
298
299
299
300 def record_ports(self, ports):
300 def record_ports(self, ports):
301 """Record the ports that this kernel is using.
301 """Record the ports that this kernel is using.
302
302
303 The creator of the Kernel instance must call this methods if they
303 The creator of the Kernel instance must call this methods if they
304 want the :meth:`connect_request` method to return the port numbers.
304 want the :meth:`connect_request` method to return the port numbers.
305 """
305 """
306 self._recorded_ports = ports
306 self._recorded_ports = ports
307
307
308 #---------------------------------------------------------------------------
308 #---------------------------------------------------------------------------
309 # Kernel request handlers
309 # Kernel request handlers
310 #---------------------------------------------------------------------------
310 #---------------------------------------------------------------------------
311
311
312 def _make_metadata(self, other=None):
312 def _make_metadata(self, other=None):
313 """init metadata dict, for execute/apply_reply"""
313 """init metadata dict, for execute/apply_reply"""
314 new_md = {
314 new_md = {
315 'dependencies_met' : True,
315 'dependencies_met' : True,
316 'engine' : self.ident,
316 'engine' : self.ident,
317 'started': datetime.now(),
317 'started': datetime.now(),
318 }
318 }
319 if other:
319 if other:
320 new_md.update(other)
320 new_md.update(other)
321 return new_md
321 return new_md
322
322
323 def _publish_pyin(self, code, parent, execution_count):
323 def _publish_pyin(self, code, parent, execution_count):
324 """Publish the code request on the pyin stream."""
324 """Publish the code request on the pyin stream."""
325
325
326 self.session.send(self.iopub_socket, u'pyin',
326 self.session.send(self.iopub_socket, u'pyin',
327 {u'code':code, u'execution_count': execution_count},
327 {u'code':code, u'execution_count': execution_count},
328 parent=parent, ident=self._topic('pyin')
328 parent=parent, ident=self._topic('pyin')
329 )
329 )
330
330
331 def _publish_status(self, status, parent=None):
331 def _publish_status(self, status, parent=None):
332 """send status (busy/idle) on IOPub"""
332 """send status (busy/idle) on IOPub"""
333 self.session.send(self.iopub_socket,
333 self.session.send(self.iopub_socket,
334 u'status',
334 u'status',
335 {u'execution_state': status},
335 {u'execution_state': status},
336 parent=parent,
336 parent=parent,
337 ident=self._topic('status'),
337 ident=self._topic('status'),
338 )
338 )
339
339
340
340
341 def execute_request(self, stream, ident, parent):
341 def execute_request(self, stream, ident, parent):
342 """handle an execute_request"""
342 """handle an execute_request"""
343
343
344 self._publish_status(u'busy', parent)
344 self._publish_status(u'busy', parent)
345
345
346 try:
346 try:
347 content = parent[u'content']
347 content = parent[u'content']
348 code = py3compat.cast_unicode_py2(content[u'code'])
348 code = py3compat.cast_unicode_py2(content[u'code'])
349 silent = content[u'silent']
349 silent = content[u'silent']
350 store_history = content.get(u'store_history', not silent)
350 store_history = content.get(u'store_history', not silent)
351 except:
351 except:
352 self.log.error("Got bad msg: ")
352 self.log.error("Got bad msg: ")
353 self.log.error("%s", parent)
353 self.log.error("%s", parent)
354 return
354 return
355
355
356 md = self._make_metadata(parent['metadata'])
356 md = self._make_metadata(parent['metadata'])
357
357
358 shell = self.shell # we'll need this a lot here
358 shell = self.shell # we'll need this a lot here
359
359
360 # Replace raw_input. Note that is not sufficient to replace
360 # Replace raw_input. Note that is not sufficient to replace
361 # raw_input in the user namespace.
361 # raw_input in the user namespace.
362 if content.get('allow_stdin', False):
362 if content.get('allow_stdin', False):
363 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
363 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
364 input = lambda prompt='': eval(raw_input(prompt))
364 input = lambda prompt='': eval(raw_input(prompt))
365 else:
365 else:
366 raw_input = input = lambda prompt='' : self._no_raw_input()
366 raw_input = input = lambda prompt='' : self._no_raw_input()
367
367
368 if py3compat.PY3:
368 if py3compat.PY3:
369 self._sys_raw_input = builtin_mod.input
369 self._sys_raw_input = builtin_mod.input
370 builtin_mod.input = raw_input
370 builtin_mod.input = raw_input
371 else:
371 else:
372 self._sys_raw_input = builtin_mod.raw_input
372 self._sys_raw_input = builtin_mod.raw_input
373 self._sys_eval_input = builtin_mod.input
373 self._sys_eval_input = builtin_mod.input
374 builtin_mod.raw_input = raw_input
374 builtin_mod.raw_input = raw_input
375 builtin_mod.input = input
375 builtin_mod.input = input
376
376
377 # Set the parent message of the display hook and out streams.
377 # Set the parent message of the display hook and out streams.
378 shell.set_parent(parent)
378 shell.set_parent(parent)
379
379
380 # Re-broadcast our input for the benefit of listening clients, and
380 # Re-broadcast our input for the benefit of listening clients, and
381 # start computing output
381 # start computing output
382 if not silent:
382 if not silent:
383 self._publish_pyin(code, parent, shell.execution_count)
383 self._publish_pyin(code, parent, shell.execution_count)
384
384
385 reply_content = {}
385 reply_content = {}
386 # FIXME: the shell calls the exception handler itself.
386 # FIXME: the shell calls the exception handler itself.
387 shell._reply_content = None
387 shell._reply_content = None
388 try:
388 try:
389 shell.run_cell(code, store_history=store_history, silent=silent)
389 shell.run_cell(code, store_history=store_history, silent=silent)
390 except:
390 except:
391 status = u'error'
391 status = u'error'
392 # FIXME: this code right now isn't being used yet by default,
392 # FIXME: this code right now isn't being used yet by default,
393 # because the run_cell() call above directly fires off exception
393 # because the run_cell() call above directly fires off exception
394 # reporting. This code, therefore, is only active in the scenario
394 # reporting. This code, therefore, is only active in the scenario
395 # where runlines itself has an unhandled exception. We need to
395 # where runlines itself has an unhandled exception. We need to
396 # uniformize this, for all exception construction to come from a
396 # uniformize this, for all exception construction to come from a
397 # single location in the codbase.
397 # single location in the codbase.
398 etype, evalue, tb = sys.exc_info()
398 etype, evalue, tb = sys.exc_info()
399 tb_list = traceback.format_exception(etype, evalue, tb)
399 tb_list = traceback.format_exception(etype, evalue, tb)
400 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
400 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
401 else:
401 else:
402 status = u'ok'
402 status = u'ok'
403 finally:
403 finally:
404 # Restore raw_input.
404 # Restore raw_input.
405 if py3compat.PY3:
405 if py3compat.PY3:
406 builtin_mod.input = self._sys_raw_input
406 builtin_mod.input = self._sys_raw_input
407 else:
407 else:
408 builtin_mod.raw_input = self._sys_raw_input
408 builtin_mod.raw_input = self._sys_raw_input
409 builtin_mod.input = self._sys_eval_input
409 builtin_mod.input = self._sys_eval_input
410
410
411 reply_content[u'status'] = status
411 reply_content[u'status'] = status
412
412
413 # Return the execution counter so clients can display prompts
413 # Return the execution counter so clients can display prompts
414 reply_content['execution_count'] = shell.execution_count - 1
414 reply_content['execution_count'] = shell.execution_count - 1
415
415
416 # FIXME - fish exception info out of shell, possibly left there by
416 # FIXME - fish exception info out of shell, possibly left there by
417 # runlines. We'll need to clean up this logic later.
417 # runlines. We'll need to clean up this logic later.
418 if shell._reply_content is not None:
418 if shell._reply_content is not None:
419 reply_content.update(shell._reply_content)
419 reply_content.update(shell._reply_content)
420 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
420 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
421 reply_content['engine_info'] = e_info
421 reply_content['engine_info'] = e_info
422 # reset after use
422 # reset after use
423 shell._reply_content = None
423 shell._reply_content = None
424
424
425 if 'traceback' in reply_content:
425 if 'traceback' in reply_content:
426 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
426 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
427
427
428
428
429 # At this point, we can tell whether the main code execution succeeded
429 # At this point, we can tell whether the main code execution succeeded
430 # or not. If it did, we proceed to evaluate user_variables/expressions
430 # or not. If it did, we proceed to evaluate user_variables/expressions
431 if reply_content['status'] == 'ok':
431 if reply_content['status'] == 'ok':
432 reply_content[u'user_variables'] = \
432 reply_content[u'user_variables'] = \
433 shell.user_variables(content.get(u'user_variables', []))
433 shell.user_variables(content.get(u'user_variables', []))
434 reply_content[u'user_expressions'] = \
434 reply_content[u'user_expressions'] = \
435 shell.user_expressions(content.get(u'user_expressions', {}))
435 shell.user_expressions(content.get(u'user_expressions', {}))
436 else:
436 else:
437 # If there was an error, don't even try to compute variables or
437 # If there was an error, don't even try to compute variables or
438 # expressions
438 # expressions
439 reply_content[u'user_variables'] = {}
439 reply_content[u'user_variables'] = {}
440 reply_content[u'user_expressions'] = {}
440 reply_content[u'user_expressions'] = {}
441
441
442 # Payloads should be retrieved regardless of outcome, so we can both
442 # Payloads should be retrieved regardless of outcome, so we can both
443 # recover partial output (that could have been generated early in a
443 # recover partial output (that could have been generated early in a
444 # block, before an error) and clear the payload system always.
444 # block, before an error) and clear the payload system always.
445 reply_content[u'payload'] = shell.payload_manager.read_payload()
445 reply_content[u'payload'] = shell.payload_manager.read_payload()
446 # Be agressive about clearing the payload because we don't want
446 # Be agressive about clearing the payload because we don't want
447 # it to sit in memory until the next execute_request comes in.
447 # it to sit in memory until the next execute_request comes in.
448 shell.payload_manager.clear_payload()
448 shell.payload_manager.clear_payload()
449
449
450 # Flush output before sending the reply.
450 # Flush output before sending the reply.
451 sys.stdout.flush()
451 sys.stdout.flush()
452 sys.stderr.flush()
452 sys.stderr.flush()
453 # FIXME: on rare occasions, the flush doesn't seem to make it to the
453 # FIXME: on rare occasions, the flush doesn't seem to make it to the
454 # clients... This seems to mitigate the problem, but we definitely need
454 # clients... This seems to mitigate the problem, but we definitely need
455 # to better understand what's going on.
455 # to better understand what's going on.
456 if self._execute_sleep:
456 if self._execute_sleep:
457 time.sleep(self._execute_sleep)
457 time.sleep(self._execute_sleep)
458
458
459 # Send the reply.
459 # Send the reply.
460 reply_content = json_clean(reply_content)
460 reply_content = json_clean(reply_content)
461
461
462 md['status'] = reply_content['status']
462 md['status'] = reply_content['status']
463 if reply_content['status'] == 'error' and \
463 if reply_content['status'] == 'error' and \
464 reply_content['ename'] == 'UnmetDependency':
464 reply_content['ename'] == 'UnmetDependency':
465 md['dependencies_met'] = False
465 md['dependencies_met'] = False
466
466
467 reply_msg = self.session.send(stream, u'execute_reply',
467 reply_msg = self.session.send(stream, u'execute_reply',
468 reply_content, parent, metadata=md,
468 reply_content, parent, metadata=md,
469 ident=ident)
469 ident=ident)
470
470
471 self.log.debug("%s", reply_msg)
471 self.log.debug("%s", reply_msg)
472
472
473 if not silent and reply_msg['content']['status'] == u'error':
473 if not silent and reply_msg['content']['status'] == u'error':
474 self._abort_queues()
474 self._abort_queues()
475
475
476 self._publish_status(u'idle', parent)
476 self._publish_status(u'idle', parent)
477
477
478 def complete_request(self, stream, ident, parent):
478 def complete_request(self, stream, ident, parent):
479 txt, matches = self._complete(parent)
479 txt, matches = self._complete(parent)
480 matches = {'matches' : matches,
480 matches = {'matches' : matches,
481 'matched_text' : txt,
481 'matched_text' : txt,
482 'status' : 'ok'}
482 'status' : 'ok'}
483 matches = json_clean(matches)
483 matches = json_clean(matches)
484 completion_msg = self.session.send(stream, 'complete_reply',
484 completion_msg = self.session.send(stream, 'complete_reply',
485 matches, parent, ident)
485 matches, parent, ident)
486 self.log.debug("%s", completion_msg)
486 self.log.debug("%s", completion_msg)
487
487
488 def object_info_request(self, stream, ident, parent):
488 def object_info_request(self, stream, ident, parent):
489 content = parent['content']
489 content = parent['content']
490 object_info = self.shell.object_inspect(content['oname'],
490 object_info = self.shell.object_inspect(content['oname'],
491 detail_level = content.get('detail_level', 0)
491 detail_level = content.get('detail_level', 0)
492 )
492 )
493 # Before we send this object over, we scrub it for JSON usage
493 # Before we send this object over, we scrub it for JSON usage
494 oinfo = json_clean(object_info)
494 oinfo = json_clean(object_info)
495 msg = self.session.send(stream, 'object_info_reply',
495 msg = self.session.send(stream, 'object_info_reply',
496 oinfo, parent, ident)
496 oinfo, parent, ident)
497 self.log.debug("%s", msg)
497 self.log.debug("%s", msg)
498
498
499 def history_request(self, stream, ident, parent):
499 def history_request(self, stream, ident, parent):
500 # We need to pull these out, as passing **kwargs doesn't work with
500 # We need to pull these out, as passing **kwargs doesn't work with
501 # unicode keys before Python 2.6.5.
501 # unicode keys before Python 2.6.5.
502 hist_access_type = parent['content']['hist_access_type']
502 hist_access_type = parent['content']['hist_access_type']
503 raw = parent['content']['raw']
503 raw = parent['content']['raw']
504 output = parent['content']['output']
504 output = parent['content']['output']
505 if hist_access_type == 'tail':
505 if hist_access_type == 'tail':
506 n = parent['content']['n']
506 n = parent['content']['n']
507 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
507 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
508 include_latest=True)
508 include_latest=True)
509
509
510 elif hist_access_type == 'range':
510 elif hist_access_type == 'range':
511 session = parent['content']['session']
511 session = parent['content']['session']
512 start = parent['content']['start']
512 start = parent['content']['start']
513 stop = parent['content']['stop']
513 stop = parent['content']['stop']
514 hist = self.shell.history_manager.get_range(session, start, stop,
514 hist = self.shell.history_manager.get_range(session, start, stop,
515 raw=raw, output=output)
515 raw=raw, output=output)
516
516
517 elif hist_access_type == 'search':
517 elif hist_access_type == 'search':
518 n = parent['content'].get('n')
518 n = parent['content'].get('n')
519 unique = parent['content'].get('unique', False)
519 unique = parent['content'].get('unique', False)
520 pattern = parent['content']['pattern']
520 pattern = parent['content']['pattern']
521 hist = self.shell.history_manager.search(
521 hist = self.shell.history_manager.search(
522 pattern, raw=raw, output=output, n=n, unique=unique)
522 pattern, raw=raw, output=output, n=n, unique=unique)
523
523
524 else:
524 else:
525 hist = []
525 hist = []
526 hist = list(hist)
526 hist = list(hist)
527 content = {'history' : hist}
527 content = {'history' : hist}
528 content = json_clean(content)
528 content = json_clean(content)
529 msg = self.session.send(stream, 'history_reply',
529 msg = self.session.send(stream, 'history_reply',
530 content, parent, ident)
530 content, parent, ident)
531 self.log.debug("Sending history reply with %i entries", len(hist))
531 self.log.debug("Sending history reply with %i entries", len(hist))
532
532
533 def connect_request(self, stream, ident, parent):
533 def connect_request(self, stream, ident, parent):
534 if self._recorded_ports is not None:
534 if self._recorded_ports is not None:
535 content = self._recorded_ports.copy()
535 content = self._recorded_ports.copy()
536 else:
536 else:
537 content = {}
537 content = {}
538 msg = self.session.send(stream, 'connect_reply',
538 msg = self.session.send(stream, 'connect_reply',
539 content, parent, ident)
539 content, parent, ident)
540 self.log.debug("%s", msg)
540 self.log.debug("%s", msg)
541
541
542 def kernel_info_request(self, stream, ident, parent):
542 def kernel_info_request(self, stream, ident, parent):
543 vinfo = {
543 vinfo = {
544 'protocol_version': protocol_version,
544 'protocol_version': protocol_version,
545 'ipython_version': ipython_version,
545 'ipython_version': ipython_version,
546 'language_version': language_version,
546 'language_version': language_version,
547 'language': 'python',
547 'language': 'python',
548 }
548 }
549 msg = self.session.send(stream, 'kernel_info_reply',
549 msg = self.session.send(stream, 'kernel_info_reply',
550 vinfo, parent, ident)
550 vinfo, parent, ident)
551 self.log.debug("%s", msg)
551 self.log.debug("%s", msg)
552
552
553 def shutdown_request(self, stream, ident, parent):
553 def shutdown_request(self, stream, ident, parent):
554 self.shell.exit_now = True
554 self.shell.exit_now = True
555 content = dict(status='ok')
555 content = dict(status='ok')
556 content.update(parent['content'])
556 content.update(parent['content'])
557 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
557 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
558 # same content, but different msg_id for broadcasting on IOPub
558 # same content, but different msg_id for broadcasting on IOPub
559 self._shutdown_message = self.session.msg(u'shutdown_reply',
559 self._shutdown_message = self.session.msg(u'shutdown_reply',
560 content, parent
560 content, parent
561 )
561 )
562
562
563 self._at_shutdown()
563 self._at_shutdown()
564 # call sys.exit after a short delay
564 # call sys.exit after a short delay
565 loop = ioloop.IOLoop.instance()
565 loop = ioloop.IOLoop.instance()
566 loop.add_timeout(time.time()+0.1, loop.stop)
566 loop.add_timeout(time.time()+0.1, loop.stop)
567
567
568 #---------------------------------------------------------------------------
568 #---------------------------------------------------------------------------
569 # Engine methods
569 # Engine methods
570 #---------------------------------------------------------------------------
570 #---------------------------------------------------------------------------
571
571
572 def apply_request(self, stream, ident, parent):
572 def apply_request(self, stream, ident, parent):
573 try:
573 try:
574 content = parent[u'content']
574 content = parent[u'content']
575 bufs = parent[u'buffers']
575 bufs = parent[u'buffers']
576 msg_id = parent['header']['msg_id']
576 msg_id = parent['header']['msg_id']
577 except:
577 except:
578 self.log.error("Got bad msg: %s", parent, exc_info=True)
578 self.log.error("Got bad msg: %s", parent, exc_info=True)
579 return
579 return
580
580
581 self._publish_status(u'busy', parent)
581 self._publish_status(u'busy', parent)
582
582
583 # Set the parent message of the display hook and out streams.
583 # Set the parent message of the display hook and out streams.
584 shell = self.shell
584 shell = self.shell
585 shell.set_parent(parent)
585 shell.set_parent(parent)
586
586
587 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
587 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
588 # self.iopub_socket.send(pyin_msg)
588 # self.iopub_socket.send(pyin_msg)
589 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
589 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
590 md = self._make_metadata(parent['metadata'])
590 md = self._make_metadata(parent['metadata'])
591 try:
591 try:
592 working = shell.user_ns
592 working = shell.user_ns
593
593
594 prefix = "_"+str(msg_id).replace("-","")+"_"
594 prefix = "_"+str(msg_id).replace("-","")+"_"
595
595
596 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
596 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
597
597
598 fname = getattr(f, '__name__', 'f')
598 fname = getattr(f, '__name__', 'f')
599
599
600 fname = prefix+"f"
600 fname = prefix+"f"
601 argname = prefix+"args"
601 argname = prefix+"args"
602 kwargname = prefix+"kwargs"
602 kwargname = prefix+"kwargs"
603 resultname = prefix+"result"
603 resultname = prefix+"result"
604
604
605 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
605 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
606 # print ns
606 # print ns
607 working.update(ns)
607 working.update(ns)
608 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
608 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
609 try:
609 try:
610 exec(code, shell.user_global_ns, shell.user_ns)
610 exec(code, shell.user_global_ns, shell.user_ns)
611 result = working.get(resultname)
611 result = working.get(resultname)
612 finally:
612 finally:
613 for key in ns:
613 for key in ns:
614 working.pop(key)
614 working.pop(key)
615
615
616 result_buf = serialize_object(result,
616 result_buf = serialize_object(result,
617 buffer_threshold=self.session.buffer_threshold,
617 buffer_threshold=self.session.buffer_threshold,
618 item_threshold=self.session.item_threshold,
618 item_threshold=self.session.item_threshold,
619 )
619 )
620
620
621 except:
621 except:
622 # invoke IPython traceback formatting
622 # invoke IPython traceback formatting
623 shell.showtraceback()
623 shell.showtraceback()
624 # FIXME - fish exception info out of shell, possibly left there by
624 # FIXME - fish exception info out of shell, possibly left there by
625 # run_code. We'll need to clean up this logic later.
625 # run_code. We'll need to clean up this logic later.
626 reply_content = {}
626 reply_content = {}
627 if shell._reply_content is not None:
627 if shell._reply_content is not None:
628 reply_content.update(shell._reply_content)
628 reply_content.update(shell._reply_content)
629 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
629 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
630 reply_content['engine_info'] = e_info
630 reply_content['engine_info'] = e_info
631 # reset after use
631 # reset after use
632 shell._reply_content = None
632 shell._reply_content = None
633
633
634 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
634 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
635 ident=self._topic('pyerr'))
635 ident=self._topic('pyerr'))
636 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
636 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
637 result_buf = []
637 result_buf = []
638
638
639 if reply_content['ename'] == 'UnmetDependency':
639 if reply_content['ename'] == 'UnmetDependency':
640 md['dependencies_met'] = False
640 md['dependencies_met'] = False
641 else:
641 else:
642 reply_content = {'status' : 'ok'}
642 reply_content = {'status' : 'ok'}
643
643
644 # put 'ok'/'error' status in header, for scheduler introspection:
644 # put 'ok'/'error' status in header, for scheduler introspection:
645 md['status'] = reply_content['status']
645 md['status'] = reply_content['status']
646
646
647 # flush i/o
647 # flush i/o
648 sys.stdout.flush()
648 sys.stdout.flush()
649 sys.stderr.flush()
649 sys.stderr.flush()
650
650
651 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
651 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
652 parent=parent, ident=ident,buffers=result_buf, metadata=md)
652 parent=parent, ident=ident,buffers=result_buf, metadata=md)
653
653
654 self._publish_status(u'idle', parent)
654 self._publish_status(u'idle', parent)
655
655
656 #---------------------------------------------------------------------------
656 #---------------------------------------------------------------------------
657 # Control messages
657 # Control messages
658 #---------------------------------------------------------------------------
658 #---------------------------------------------------------------------------
659
659
660 def abort_request(self, stream, ident, parent):
660 def abort_request(self, stream, ident, parent):
661 """abort a specifig msg by id"""
661 """abort a specific msg by id"""
662 msg_ids = parent['content'].get('msg_ids', None)
662 msg_ids = parent['content'].get('msg_ids', None)
663 if isinstance(msg_ids, string_types):
663 if isinstance(msg_ids, string_types):
664 msg_ids = [msg_ids]
664 msg_ids = [msg_ids]
665 if not msg_ids:
665 if not msg_ids:
666 self.abort_queues()
666 self._abort_queues()
667 for mid in msg_ids:
667 for mid in msg_ids:
668 self.aborted.add(str(mid))
668 self.aborted.add(str(mid))
669
669
670 content = dict(status='ok')
670 content = dict(status='ok')
671 reply_msg = self.session.send(stream, 'abort_reply', content=content,
671 reply_msg = self.session.send(stream, 'abort_reply', content=content,
672 parent=parent, ident=ident)
672 parent=parent, ident=ident)
673 self.log.debug("%s", reply_msg)
673 self.log.debug("%s", reply_msg)
674
674
675 def clear_request(self, stream, idents, parent):
675 def clear_request(self, stream, idents, parent):
676 """Clear our namespace."""
676 """Clear our namespace."""
677 self.shell.reset(False)
677 self.shell.reset(False)
678 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
678 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
679 content = dict(status='ok'))
679 content = dict(status='ok'))
680
680
681
681
682 #---------------------------------------------------------------------------
682 #---------------------------------------------------------------------------
683 # Protected interface
683 # Protected interface
684 #---------------------------------------------------------------------------
684 #---------------------------------------------------------------------------
685
685
686 def _wrap_exception(self, method=None):
686 def _wrap_exception(self, method=None):
687 # import here, because _wrap_exception is only used in parallel,
687 # import here, because _wrap_exception is only used in parallel,
688 # and parallel has higher min pyzmq version
688 # and parallel has higher min pyzmq version
689 from IPython.parallel.error import wrap_exception
689 from IPython.parallel.error import wrap_exception
690 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
690 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
691 content = wrap_exception(e_info)
691 content = wrap_exception(e_info)
692 return content
692 return content
693
693
694 def _topic(self, topic):
694 def _topic(self, topic):
695 """prefixed topic for IOPub messages"""
695 """prefixed topic for IOPub messages"""
696 if self.int_id >= 0:
696 if self.int_id >= 0:
697 base = "engine.%i" % self.int_id
697 base = "engine.%i" % self.int_id
698 else:
698 else:
699 base = "kernel.%s" % self.ident
699 base = "kernel.%s" % self.ident
700
700
701 return py3compat.cast_bytes("%s.%s" % (base, topic))
701 return py3compat.cast_bytes("%s.%s" % (base, topic))
702
702
703 def _abort_queues(self):
703 def _abort_queues(self):
704 for stream in self.shell_streams:
704 for stream in self.shell_streams:
705 if stream:
705 if stream:
706 self._abort_queue(stream)
706 self._abort_queue(stream)
707
707
708 def _abort_queue(self, stream):
708 def _abort_queue(self, stream):
709 poller = zmq.Poller()
709 poller = zmq.Poller()
710 poller.register(stream.socket, zmq.POLLIN)
710 poller.register(stream.socket, zmq.POLLIN)
711 while True:
711 while True:
712 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
712 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
713 if msg is None:
713 if msg is None:
714 return
714 return
715
715
716 self.log.info("Aborting:")
716 self.log.info("Aborting:")
717 self.log.info("%s", msg)
717 self.log.info("%s", msg)
718 msg_type = msg['header']['msg_type']
718 msg_type = msg['header']['msg_type']
719 reply_type = msg_type.split('_')[0] + '_reply'
719 reply_type = msg_type.split('_')[0] + '_reply'
720
720
721 status = {'status' : 'aborted'}
721 status = {'status' : 'aborted'}
722 md = {'engine' : self.ident}
722 md = {'engine' : self.ident}
723 md.update(status)
723 md.update(status)
724 reply_msg = self.session.send(stream, reply_type, metadata=md,
724 reply_msg = self.session.send(stream, reply_type, metadata=md,
725 content=status, parent=msg, ident=idents)
725 content=status, parent=msg, ident=idents)
726 self.log.debug("%s", reply_msg)
726 self.log.debug("%s", reply_msg)
727 # We need to wait a bit for requests to come in. This can probably
727 # We need to wait a bit for requests to come in. This can probably
728 # be set shorter for true asynchronous clients.
728 # be set shorter for true asynchronous clients.
729 poller.poll(50)
729 poller.poll(50)
730
730
731
731
732 def _no_raw_input(self):
732 def _no_raw_input(self):
733 """Raise StdinNotImplentedError if active frontend doesn't support
733 """Raise StdinNotImplentedError if active frontend doesn't support
734 stdin."""
734 stdin."""
735 raise StdinNotImplementedError("raw_input was called, but this "
735 raise StdinNotImplementedError("raw_input was called, but this "
736 "frontend does not support stdin.")
736 "frontend does not support stdin.")
737
737
738 def _raw_input(self, prompt, ident, parent):
738 def _raw_input(self, prompt, ident, parent):
739 # Flush output before making the request.
739 # Flush output before making the request.
740 sys.stderr.flush()
740 sys.stderr.flush()
741 sys.stdout.flush()
741 sys.stdout.flush()
742 # flush the stdin socket, to purge stale replies
742 # flush the stdin socket, to purge stale replies
743 while True:
743 while True:
744 try:
744 try:
745 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
745 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
746 except zmq.ZMQError as e:
746 except zmq.ZMQError as e:
747 if e.errno == zmq.EAGAIN:
747 if e.errno == zmq.EAGAIN:
748 break
748 break
749 else:
749 else:
750 raise
750 raise
751
751
752 # Send the input request.
752 # Send the input request.
753 content = json_clean(dict(prompt=prompt))
753 content = json_clean(dict(prompt=prompt))
754 self.session.send(self.stdin_socket, u'input_request', content, parent,
754 self.session.send(self.stdin_socket, u'input_request', content, parent,
755 ident=ident)
755 ident=ident)
756
756
757 # Await a response.
757 # Await a response.
758 while True:
758 while True:
759 try:
759 try:
760 ident, reply = self.session.recv(self.stdin_socket, 0)
760 ident, reply = self.session.recv(self.stdin_socket, 0)
761 except Exception:
761 except Exception:
762 self.log.warn("Invalid Message:", exc_info=True)
762 self.log.warn("Invalid Message:", exc_info=True)
763 except KeyboardInterrupt:
763 except KeyboardInterrupt:
764 # re-raise KeyboardInterrupt, to truncate traceback
764 # re-raise KeyboardInterrupt, to truncate traceback
765 raise KeyboardInterrupt
765 raise KeyboardInterrupt
766 else:
766 else:
767 break
767 break
768 try:
768 try:
769 value = py3compat.unicode_to_str(reply['content']['value'])
769 value = py3compat.unicode_to_str(reply['content']['value'])
770 except:
770 except:
771 self.log.error("Got bad raw_input reply: ")
771 self.log.error("Got bad raw_input reply: ")
772 self.log.error("%s", parent)
772 self.log.error("%s", parent)
773 value = ''
773 value = ''
774 if value == '\x04':
774 if value == '\x04':
775 # EOF
775 # EOF
776 raise EOFError
776 raise EOFError
777 return value
777 return value
778
778
779 def _complete(self, msg):
779 def _complete(self, msg):
780 c = msg['content']
780 c = msg['content']
781 try:
781 try:
782 cpos = int(c['cursor_pos'])
782 cpos = int(c['cursor_pos'])
783 except:
783 except:
784 # If we don't get something that we can convert to an integer, at
784 # If we don't get something that we can convert to an integer, at
785 # least attempt the completion guessing the cursor is at the end of
785 # least attempt the completion guessing the cursor is at the end of
786 # the text, if there's any, and otherwise of the line
786 # the text, if there's any, and otherwise of the line
787 cpos = len(c['text'])
787 cpos = len(c['text'])
788 if cpos==0:
788 if cpos==0:
789 cpos = len(c['line'])
789 cpos = len(c['line'])
790 return self.shell.complete(c['text'], c['line'], cpos)
790 return self.shell.complete(c['text'], c['line'], cpos)
791
791
792 def _at_shutdown(self):
792 def _at_shutdown(self):
793 """Actions taken at shutdown by the kernel, called by python's atexit.
793 """Actions taken at shutdown by the kernel, called by python's atexit.
794 """
794 """
795 # io.rprint("Kernel at_shutdown") # dbg
795 # io.rprint("Kernel at_shutdown") # dbg
796 if self._shutdown_message is not None:
796 if self._shutdown_message is not None:
797 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
797 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
798 self.log.debug("%s", self._shutdown_message)
798 self.log.debug("%s", self._shutdown_message)
799 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
799 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
800
800
General Comments 0
You need to be logged in to leave comments. Login now