##// END OF EJS Templates
re-raise KeyboardInterrupt in raw_input...
MinRK -
Show More
@@ -1,803 +1,806 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 call set_parent on all the PUB objects with the message about to be executed.
7 call set_parent on all the PUB objects with the message about to be executed.
8 * Implement random port and security key logic.
8 * Implement random port and security key logic.
9 * Implement control messages.
9 * Implement control messages.
10 * Implement event loop and poll version.
10 * Implement event loop and poll version.
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 # Standard library imports
18 # Standard library imports
19 import __builtin__
19 import __builtin__
20 import sys
20 import sys
21 import time
21 import time
22 import traceback
22 import traceback
23 import logging
23 import logging
24 import uuid
24 import uuid
25
25
26 from datetime import datetime
26 from datetime import datetime
27 from signal import (
27 from signal import (
28 signal, default_int_handler, SIGINT
28 signal, default_int_handler, SIGINT
29 )
29 )
30
30
31 # System library imports
31 # System library imports
32 import zmq
32 import zmq
33 from zmq.eventloop import ioloop
33 from zmq.eventloop import ioloop
34 from zmq.eventloop.zmqstream import ZMQStream
34 from zmq.eventloop.zmqstream import ZMQStream
35
35
36 # Local imports
36 # Local imports
37 from IPython.config.configurable import Configurable
37 from IPython.config.configurable import Configurable
38 from IPython.core.error import StdinNotImplementedError
38 from IPython.core.error import StdinNotImplementedError
39 from IPython.core import release
39 from IPython.core import release
40 from IPython.utils import py3compat
40 from IPython.utils import py3compat
41 from IPython.utils.jsonutil import json_clean
41 from IPython.utils.jsonutil import json_clean
42 from IPython.utils.traitlets import (
42 from IPython.utils.traitlets import (
43 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
43 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
44 Type
44 Type
45 )
45 )
46
46
47 from serialize import serialize_object, unpack_apply_message
47 from serialize import serialize_object, unpack_apply_message
48 from session import Session
48 from session import Session
49 from zmqshell import ZMQInteractiveShell
49 from zmqshell import ZMQInteractiveShell
50
50
51
51
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53 # Main kernel class
53 # Main kernel class
54 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
55
55
56 protocol_version = list(release.kernel_protocol_version_info)
56 protocol_version = list(release.kernel_protocol_version_info)
57 ipython_version = list(release.version_info)
57 ipython_version = list(release.version_info)
58 language_version = list(sys.version_info[:3])
58 language_version = list(sys.version_info[:3])
59
59
60
60
61 class Kernel(Configurable):
61 class Kernel(Configurable):
62
62
63 #---------------------------------------------------------------------------
63 #---------------------------------------------------------------------------
64 # Kernel interface
64 # Kernel interface
65 #---------------------------------------------------------------------------
65 #---------------------------------------------------------------------------
66
66
67 # attribute to override with a GUI
67 # attribute to override with a GUI
68 eventloop = Any(None)
68 eventloop = Any(None)
69 def _eventloop_changed(self, name, old, new):
69 def _eventloop_changed(self, name, old, new):
70 """schedule call to eventloop from IOLoop"""
70 """schedule call to eventloop from IOLoop"""
71 loop = ioloop.IOLoop.instance()
71 loop = ioloop.IOLoop.instance()
72 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
72 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
73
73
74 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
74 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
75 shell_class = Type(ZMQInteractiveShell)
75 shell_class = Type(ZMQInteractiveShell)
76
76
77 session = Instance(Session)
77 session = Instance(Session)
78 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
78 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
79 shell_streams = List()
79 shell_streams = List()
80 control_stream = Instance(ZMQStream)
80 control_stream = Instance(ZMQStream)
81 iopub_socket = Instance(zmq.Socket)
81 iopub_socket = Instance(zmq.Socket)
82 stdin_socket = Instance(zmq.Socket)
82 stdin_socket = Instance(zmq.Socket)
83 log = Instance(logging.Logger)
83 log = Instance(logging.Logger)
84
84
85 user_module = Any()
85 user_module = Any()
86 def _user_module_changed(self, name, old, new):
86 def _user_module_changed(self, name, old, new):
87 if self.shell is not None:
87 if self.shell is not None:
88 self.shell.user_module = new
88 self.shell.user_module = new
89
89
90 user_ns = Dict(default_value=None)
90 user_ns = Dict(default_value=None)
91 def _user_ns_changed(self, name, old, new):
91 def _user_ns_changed(self, name, old, new):
92 if self.shell is not None:
92 if self.shell is not None:
93 self.shell.user_ns = new
93 self.shell.user_ns = new
94 self.shell.init_user_ns()
94 self.shell.init_user_ns()
95
95
96 # identities:
96 # identities:
97 int_id = Integer(-1)
97 int_id = Integer(-1)
98 ident = Unicode()
98 ident = Unicode()
99
99
100 def _ident_default(self):
100 def _ident_default(self):
101 return unicode(uuid.uuid4())
101 return unicode(uuid.uuid4())
102
102
103
103
104 # Private interface
104 # Private interface
105
105
106 # Time to sleep after flushing the stdout/err buffers in each execute
106 # Time to sleep after flushing the stdout/err buffers in each execute
107 # cycle. While this introduces a hard limit on the minimal latency of the
107 # cycle. While this introduces a hard limit on the minimal latency of the
108 # execute cycle, it helps prevent output synchronization problems for
108 # execute cycle, it helps prevent output synchronization problems for
109 # clients.
109 # clients.
110 # Units are in seconds. The minimum zmq latency on local host is probably
110 # Units are in seconds. The minimum zmq latency on local host is probably
111 # ~150 microseconds, set this to 500us for now. We may need to increase it
111 # ~150 microseconds, set this to 500us for now. We may need to increase it
112 # a little if it's not enough after more interactive testing.
112 # a little if it's not enough after more interactive testing.
113 _execute_sleep = Float(0.0005, config=True)
113 _execute_sleep = Float(0.0005, config=True)
114
114
115 # Frequency of the kernel's event loop.
115 # Frequency of the kernel's event loop.
116 # Units are in seconds, kernel subclasses for GUI toolkits may need to
116 # Units are in seconds, kernel subclasses for GUI toolkits may need to
117 # adapt to milliseconds.
117 # adapt to milliseconds.
118 _poll_interval = Float(0.05, config=True)
118 _poll_interval = Float(0.05, config=True)
119
119
120 # If the shutdown was requested over the network, we leave here the
120 # If the shutdown was requested over the network, we leave here the
121 # necessary reply message so it can be sent by our registered atexit
121 # necessary reply message so it can be sent by our registered atexit
122 # handler. This ensures that the reply is only sent to clients truly at
122 # handler. This ensures that the reply is only sent to clients truly at
123 # the end of our shutdown process (which happens after the underlying
123 # the end of our shutdown process (which happens after the underlying
124 # IPython shell's own shutdown).
124 # IPython shell's own shutdown).
125 _shutdown_message = None
125 _shutdown_message = None
126
126
127 # This is a dict of port number that the kernel is listening on. It is set
127 # This is a dict of port number that the kernel is listening on. It is set
128 # by record_ports and used by connect_request.
128 # by record_ports and used by connect_request.
129 _recorded_ports = Dict()
129 _recorded_ports = Dict()
130
130
131 # A reference to the Python builtin 'raw_input' function.
131 # A reference to the Python builtin 'raw_input' function.
132 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
132 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
133 _sys_raw_input = Any()
133 _sys_raw_input = Any()
134
134
135 # set of aborted msg_ids
135 # set of aborted msg_ids
136 aborted = Set()
136 aborted = Set()
137
137
138
138
139 def __init__(self, **kwargs):
139 def __init__(self, **kwargs):
140 super(Kernel, self).__init__(**kwargs)
140 super(Kernel, self).__init__(**kwargs)
141
141
142 # Initialize the InteractiveShell subclass
142 # Initialize the InteractiveShell subclass
143 self.shell = self.shell_class.instance(parent=self,
143 self.shell = self.shell_class.instance(parent=self,
144 profile_dir = self.profile_dir,
144 profile_dir = self.profile_dir,
145 user_module = self.user_module,
145 user_module = self.user_module,
146 user_ns = self.user_ns,
146 user_ns = self.user_ns,
147 )
147 )
148 self.shell.displayhook.session = self.session
148 self.shell.displayhook.session = self.session
149 self.shell.displayhook.pub_socket = self.iopub_socket
149 self.shell.displayhook.pub_socket = self.iopub_socket
150 self.shell.displayhook.topic = self._topic('pyout')
150 self.shell.displayhook.topic = self._topic('pyout')
151 self.shell.display_pub.session = self.session
151 self.shell.display_pub.session = self.session
152 self.shell.display_pub.pub_socket = self.iopub_socket
152 self.shell.display_pub.pub_socket = self.iopub_socket
153 self.shell.data_pub.session = self.session
153 self.shell.data_pub.session = self.session
154 self.shell.data_pub.pub_socket = self.iopub_socket
154 self.shell.data_pub.pub_socket = self.iopub_socket
155
155
156 # TMP - hack while developing
156 # TMP - hack while developing
157 self.shell._reply_content = None
157 self.shell._reply_content = None
158
158
159 # Build dict of handlers for message types
159 # Build dict of handlers for message types
160 msg_types = [ 'execute_request', 'complete_request',
160 msg_types = [ 'execute_request', 'complete_request',
161 'object_info_request', 'history_request',
161 'object_info_request', 'history_request',
162 'kernel_info_request',
162 'kernel_info_request',
163 'connect_request', 'shutdown_request',
163 'connect_request', 'shutdown_request',
164 'apply_request',
164 'apply_request',
165 ]
165 ]
166 self.shell_handlers = {}
166 self.shell_handlers = {}
167 for msg_type in msg_types:
167 for msg_type in msg_types:
168 self.shell_handlers[msg_type] = getattr(self, msg_type)
168 self.shell_handlers[msg_type] = getattr(self, msg_type)
169
169
170 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
170 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
171 self.control_handlers = {}
171 self.control_handlers = {}
172 for msg_type in control_msg_types:
172 for msg_type in control_msg_types:
173 self.control_handlers[msg_type] = getattr(self, msg_type)
173 self.control_handlers[msg_type] = getattr(self, msg_type)
174
174
175 def dispatch_control(self, msg):
175 def dispatch_control(self, msg):
176 """dispatch control requests"""
176 """dispatch control requests"""
177 idents,msg = self.session.feed_identities(msg, copy=False)
177 idents,msg = self.session.feed_identities(msg, copy=False)
178 try:
178 try:
179 msg = self.session.unserialize(msg, content=True, copy=False)
179 msg = self.session.unserialize(msg, content=True, copy=False)
180 except:
180 except:
181 self.log.error("Invalid Control Message", exc_info=True)
181 self.log.error("Invalid Control Message", exc_info=True)
182 return
182 return
183
183
184 self.log.debug("Control received: %s", msg)
184 self.log.debug("Control received: %s", msg)
185
185
186 header = msg['header']
186 header = msg['header']
187 msg_id = header['msg_id']
187 msg_id = header['msg_id']
188 msg_type = header['msg_type']
188 msg_type = header['msg_type']
189
189
190 handler = self.control_handlers.get(msg_type, None)
190 handler = self.control_handlers.get(msg_type, None)
191 if handler is None:
191 if handler is None:
192 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
192 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
193 else:
193 else:
194 try:
194 try:
195 handler(self.control_stream, idents, msg)
195 handler(self.control_stream, idents, msg)
196 except Exception:
196 except Exception:
197 self.log.error("Exception in control handler:", exc_info=True)
197 self.log.error("Exception in control handler:", exc_info=True)
198
198
199 def dispatch_shell(self, stream, msg):
199 def dispatch_shell(self, stream, msg):
200 """dispatch shell requests"""
200 """dispatch shell requests"""
201 # flush control requests first
201 # flush control requests first
202 if self.control_stream:
202 if self.control_stream:
203 self.control_stream.flush()
203 self.control_stream.flush()
204
204
205 idents,msg = self.session.feed_identities(msg, copy=False)
205 idents,msg = self.session.feed_identities(msg, copy=False)
206 try:
206 try:
207 msg = self.session.unserialize(msg, content=True, copy=False)
207 msg = self.session.unserialize(msg, content=True, copy=False)
208 except:
208 except:
209 self.log.error("Invalid Message", exc_info=True)
209 self.log.error("Invalid Message", exc_info=True)
210 return
210 return
211
211
212 header = msg['header']
212 header = msg['header']
213 msg_id = header['msg_id']
213 msg_id = header['msg_id']
214 msg_type = msg['header']['msg_type']
214 msg_type = msg['header']['msg_type']
215
215
216 # Print some info about this message and leave a '--->' marker, so it's
216 # Print some info about this message and leave a '--->' marker, so it's
217 # easier to trace visually the message chain when debugging. Each
217 # easier to trace visually the message chain when debugging. Each
218 # handler prints its message at the end.
218 # handler prints its message at the end.
219 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
219 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
220 self.log.debug(' Content: %s\n --->\n ', msg['content'])
220 self.log.debug(' Content: %s\n --->\n ', msg['content'])
221
221
222 if msg_id in self.aborted:
222 if msg_id in self.aborted:
223 self.aborted.remove(msg_id)
223 self.aborted.remove(msg_id)
224 # is it safe to assume a msg_id will not be resubmitted?
224 # is it safe to assume a msg_id will not be resubmitted?
225 reply_type = msg_type.split('_')[0] + '_reply'
225 reply_type = msg_type.split('_')[0] + '_reply'
226 status = {'status' : 'aborted'}
226 status = {'status' : 'aborted'}
227 md = {'engine' : self.ident}
227 md = {'engine' : self.ident}
228 md.update(status)
228 md.update(status)
229 reply_msg = self.session.send(stream, reply_type, metadata=md,
229 reply_msg = self.session.send(stream, reply_type, metadata=md,
230 content=status, parent=msg, ident=idents)
230 content=status, parent=msg, ident=idents)
231 return
231 return
232
232
233 handler = self.shell_handlers.get(msg_type, None)
233 handler = self.shell_handlers.get(msg_type, None)
234 if handler is None:
234 if handler is None:
235 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
235 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
236 else:
236 else:
237 # ensure default_int_handler during handler call
237 # ensure default_int_handler during handler call
238 sig = signal(SIGINT, default_int_handler)
238 sig = signal(SIGINT, default_int_handler)
239 try:
239 try:
240 handler(stream, idents, msg)
240 handler(stream, idents, msg)
241 except Exception:
241 except Exception:
242 self.log.error("Exception in message handler:", exc_info=True)
242 self.log.error("Exception in message handler:", exc_info=True)
243 finally:
243 finally:
244 signal(SIGINT, sig)
244 signal(SIGINT, sig)
245
245
246 def enter_eventloop(self):
246 def enter_eventloop(self):
247 """enter eventloop"""
247 """enter eventloop"""
248 self.log.info("entering eventloop")
248 self.log.info("entering eventloop")
249 # restore default_int_handler
249 # restore default_int_handler
250 signal(SIGINT, default_int_handler)
250 signal(SIGINT, default_int_handler)
251 while self.eventloop is not None:
251 while self.eventloop is not None:
252 try:
252 try:
253 self.eventloop(self)
253 self.eventloop(self)
254 except KeyboardInterrupt:
254 except KeyboardInterrupt:
255 # Ctrl-C shouldn't crash the kernel
255 # Ctrl-C shouldn't crash the kernel
256 self.log.error("KeyboardInterrupt caught in kernel")
256 self.log.error("KeyboardInterrupt caught in kernel")
257 continue
257 continue
258 else:
258 else:
259 # eventloop exited cleanly, this means we should stop (right?)
259 # eventloop exited cleanly, this means we should stop (right?)
260 self.eventloop = None
260 self.eventloop = None
261 break
261 break
262 self.log.info("exiting eventloop")
262 self.log.info("exiting eventloop")
263
263
264 def start(self):
264 def start(self):
265 """register dispatchers for streams"""
265 """register dispatchers for streams"""
266 self.shell.exit_now = False
266 self.shell.exit_now = False
267 if self.control_stream:
267 if self.control_stream:
268 self.control_stream.on_recv(self.dispatch_control, copy=False)
268 self.control_stream.on_recv(self.dispatch_control, copy=False)
269
269
270 def make_dispatcher(stream):
270 def make_dispatcher(stream):
271 def dispatcher(msg):
271 def dispatcher(msg):
272 return self.dispatch_shell(stream, msg)
272 return self.dispatch_shell(stream, msg)
273 return dispatcher
273 return dispatcher
274
274
275 for s in self.shell_streams:
275 for s in self.shell_streams:
276 s.on_recv(make_dispatcher(s), copy=False)
276 s.on_recv(make_dispatcher(s), copy=False)
277
277
278 # publish idle status
278 # publish idle status
279 self._publish_status('starting')
279 self._publish_status('starting')
280
280
281 def do_one_iteration(self):
281 def do_one_iteration(self):
282 """step eventloop just once"""
282 """step eventloop just once"""
283 if self.control_stream:
283 if self.control_stream:
284 self.control_stream.flush()
284 self.control_stream.flush()
285 for stream in self.shell_streams:
285 for stream in self.shell_streams:
286 # handle at most one request per iteration
286 # handle at most one request per iteration
287 stream.flush(zmq.POLLIN, 1)
287 stream.flush(zmq.POLLIN, 1)
288 stream.flush(zmq.POLLOUT)
288 stream.flush(zmq.POLLOUT)
289
289
290
290
291 def record_ports(self, ports):
291 def record_ports(self, ports):
292 """Record the ports that this kernel is using.
292 """Record the ports that this kernel is using.
293
293
294 The creator of the Kernel instance must call this methods if they
294 The creator of the Kernel instance must call this methods if they
295 want the :meth:`connect_request` method to return the port numbers.
295 want the :meth:`connect_request` method to return the port numbers.
296 """
296 """
297 self._recorded_ports = ports
297 self._recorded_ports = ports
298
298
299 #---------------------------------------------------------------------------
299 #---------------------------------------------------------------------------
300 # Kernel request handlers
300 # Kernel request handlers
301 #---------------------------------------------------------------------------
301 #---------------------------------------------------------------------------
302
302
303 def _make_metadata(self, other=None):
303 def _make_metadata(self, other=None):
304 """init metadata dict, for execute/apply_reply"""
304 """init metadata dict, for execute/apply_reply"""
305 new_md = {
305 new_md = {
306 'dependencies_met' : True,
306 'dependencies_met' : True,
307 'engine' : self.ident,
307 'engine' : self.ident,
308 'started': datetime.now(),
308 'started': datetime.now(),
309 }
309 }
310 if other:
310 if other:
311 new_md.update(other)
311 new_md.update(other)
312 return new_md
312 return new_md
313
313
314 def _publish_pyin(self, code, parent, execution_count):
314 def _publish_pyin(self, code, parent, execution_count):
315 """Publish the code request on the pyin stream."""
315 """Publish the code request on the pyin stream."""
316
316
317 self.session.send(self.iopub_socket, u'pyin',
317 self.session.send(self.iopub_socket, u'pyin',
318 {u'code':code, u'execution_count': execution_count},
318 {u'code':code, u'execution_count': execution_count},
319 parent=parent, ident=self._topic('pyin')
319 parent=parent, ident=self._topic('pyin')
320 )
320 )
321
321
322 def _publish_status(self, status, parent=None):
322 def _publish_status(self, status, parent=None):
323 """send status (busy/idle) on IOPub"""
323 """send status (busy/idle) on IOPub"""
324 self.session.send(self.iopub_socket,
324 self.session.send(self.iopub_socket,
325 u'status',
325 u'status',
326 {u'execution_state': status},
326 {u'execution_state': status},
327 parent=parent,
327 parent=parent,
328 ident=self._topic('status'),
328 ident=self._topic('status'),
329 )
329 )
330
330
331
331
332 def execute_request(self, stream, ident, parent):
332 def execute_request(self, stream, ident, parent):
333 """handle an execute_request"""
333 """handle an execute_request"""
334
334
335 self._publish_status(u'busy', parent)
335 self._publish_status(u'busy', parent)
336
336
337 try:
337 try:
338 content = parent[u'content']
338 content = parent[u'content']
339 code = content[u'code']
339 code = content[u'code']
340 silent = content[u'silent']
340 silent = content[u'silent']
341 store_history = content.get(u'store_history', not silent)
341 store_history = content.get(u'store_history', not silent)
342 except:
342 except:
343 self.log.error("Got bad msg: ")
343 self.log.error("Got bad msg: ")
344 self.log.error("%s", parent)
344 self.log.error("%s", parent)
345 return
345 return
346
346
347 md = self._make_metadata(parent['metadata'])
347 md = self._make_metadata(parent['metadata'])
348
348
349 shell = self.shell # we'll need this a lot here
349 shell = self.shell # we'll need this a lot here
350
350
351 # Replace raw_input. Note that is not sufficient to replace
351 # Replace raw_input. Note that is not sufficient to replace
352 # raw_input in the user namespace.
352 # raw_input in the user namespace.
353 if content.get('allow_stdin', False):
353 if content.get('allow_stdin', False):
354 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
354 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
355 else:
355 else:
356 raw_input = lambda prompt='' : self._no_raw_input()
356 raw_input = lambda prompt='' : self._no_raw_input()
357
357
358 if py3compat.PY3:
358 if py3compat.PY3:
359 self._sys_raw_input = __builtin__.input
359 self._sys_raw_input = __builtin__.input
360 __builtin__.input = raw_input
360 __builtin__.input = raw_input
361 else:
361 else:
362 self._sys_raw_input = __builtin__.raw_input
362 self._sys_raw_input = __builtin__.raw_input
363 __builtin__.raw_input = raw_input
363 __builtin__.raw_input = raw_input
364
364
365 # Set the parent message of the display hook and out streams.
365 # Set the parent message of the display hook and out streams.
366 shell.displayhook.set_parent(parent)
366 shell.displayhook.set_parent(parent)
367 shell.display_pub.set_parent(parent)
367 shell.display_pub.set_parent(parent)
368 shell.data_pub.set_parent(parent)
368 shell.data_pub.set_parent(parent)
369 try:
369 try:
370 sys.stdout.set_parent(parent)
370 sys.stdout.set_parent(parent)
371 except AttributeError:
371 except AttributeError:
372 pass
372 pass
373 try:
373 try:
374 sys.stderr.set_parent(parent)
374 sys.stderr.set_parent(parent)
375 except AttributeError:
375 except AttributeError:
376 pass
376 pass
377
377
378 # Re-broadcast our input for the benefit of listening clients, and
378 # Re-broadcast our input for the benefit of listening clients, and
379 # start computing output
379 # start computing output
380 if not silent:
380 if not silent:
381 self._publish_pyin(code, parent, shell.execution_count)
381 self._publish_pyin(code, parent, shell.execution_count)
382
382
383 reply_content = {}
383 reply_content = {}
384 try:
384 try:
385 # FIXME: the shell calls the exception handler itself.
385 # FIXME: the shell calls the exception handler itself.
386 shell.run_cell(code, store_history=store_history, silent=silent)
386 shell.run_cell(code, store_history=store_history, silent=silent)
387 except:
387 except:
388 status = u'error'
388 status = u'error'
389 # FIXME: this code right now isn't being used yet by default,
389 # FIXME: this code right now isn't being used yet by default,
390 # because the run_cell() call above directly fires off exception
390 # because the run_cell() call above directly fires off exception
391 # reporting. This code, therefore, is only active in the scenario
391 # reporting. This code, therefore, is only active in the scenario
392 # where runlines itself has an unhandled exception. We need to
392 # where runlines itself has an unhandled exception. We need to
393 # uniformize this, for all exception construction to come from a
393 # uniformize this, for all exception construction to come from a
394 # single location in the codbase.
394 # single location in the codbase.
395 etype, evalue, tb = sys.exc_info()
395 etype, evalue, tb = sys.exc_info()
396 tb_list = traceback.format_exception(etype, evalue, tb)
396 tb_list = traceback.format_exception(etype, evalue, tb)
397 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
397 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
398 else:
398 else:
399 status = u'ok'
399 status = u'ok'
400 finally:
400 finally:
401 # Restore raw_input.
401 # Restore raw_input.
402 if py3compat.PY3:
402 if py3compat.PY3:
403 __builtin__.input = self._sys_raw_input
403 __builtin__.input = self._sys_raw_input
404 else:
404 else:
405 __builtin__.raw_input = self._sys_raw_input
405 __builtin__.raw_input = self._sys_raw_input
406
406
407 reply_content[u'status'] = status
407 reply_content[u'status'] = status
408
408
409 # Return the execution counter so clients can display prompts
409 # Return the execution counter so clients can display prompts
410 reply_content['execution_count'] = shell.execution_count - 1
410 reply_content['execution_count'] = shell.execution_count - 1
411
411
412 # FIXME - fish exception info out of shell, possibly left there by
412 # FIXME - fish exception info out of shell, possibly left there by
413 # runlines. We'll need to clean up this logic later.
413 # runlines. We'll need to clean up this logic later.
414 if shell._reply_content is not None:
414 if shell._reply_content is not None:
415 reply_content.update(shell._reply_content)
415 reply_content.update(shell._reply_content)
416 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
416 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
417 reply_content['engine_info'] = e_info
417 reply_content['engine_info'] = e_info
418 # reset after use
418 # reset after use
419 shell._reply_content = None
419 shell._reply_content = None
420
420
421 if 'traceback' in reply_content:
421 if 'traceback' in reply_content:
422 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
422 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
423
423
424
424
425 # At this point, we can tell whether the main code execution succeeded
425 # At this point, we can tell whether the main code execution succeeded
426 # or not. If it did, we proceed to evaluate user_variables/expressions
426 # or not. If it did, we proceed to evaluate user_variables/expressions
427 if reply_content['status'] == 'ok':
427 if reply_content['status'] == 'ok':
428 reply_content[u'user_variables'] = \
428 reply_content[u'user_variables'] = \
429 shell.user_variables(content.get(u'user_variables', []))
429 shell.user_variables(content.get(u'user_variables', []))
430 reply_content[u'user_expressions'] = \
430 reply_content[u'user_expressions'] = \
431 shell.user_expressions(content.get(u'user_expressions', {}))
431 shell.user_expressions(content.get(u'user_expressions', {}))
432 else:
432 else:
433 # If there was an error, don't even try to compute variables or
433 # If there was an error, don't even try to compute variables or
434 # expressions
434 # expressions
435 reply_content[u'user_variables'] = {}
435 reply_content[u'user_variables'] = {}
436 reply_content[u'user_expressions'] = {}
436 reply_content[u'user_expressions'] = {}
437
437
438 # Payloads should be retrieved regardless of outcome, so we can both
438 # Payloads should be retrieved regardless of outcome, so we can both
439 # recover partial output (that could have been generated early in a
439 # recover partial output (that could have been generated early in a
440 # block, before an error) and clear the payload system always.
440 # block, before an error) and clear the payload system always.
441 reply_content[u'payload'] = shell.payload_manager.read_payload()
441 reply_content[u'payload'] = shell.payload_manager.read_payload()
442 # Be agressive about clearing the payload because we don't want
442 # Be agressive about clearing the payload because we don't want
443 # it to sit in memory until the next execute_request comes in.
443 # it to sit in memory until the next execute_request comes in.
444 shell.payload_manager.clear_payload()
444 shell.payload_manager.clear_payload()
445
445
446 # Flush output before sending the reply.
446 # Flush output before sending the reply.
447 sys.stdout.flush()
447 sys.stdout.flush()
448 sys.stderr.flush()
448 sys.stderr.flush()
449 # FIXME: on rare occasions, the flush doesn't seem to make it to the
449 # FIXME: on rare occasions, the flush doesn't seem to make it to the
450 # clients... This seems to mitigate the problem, but we definitely need
450 # clients... This seems to mitigate the problem, but we definitely need
451 # to better understand what's going on.
451 # to better understand what's going on.
452 if self._execute_sleep:
452 if self._execute_sleep:
453 time.sleep(self._execute_sleep)
453 time.sleep(self._execute_sleep)
454
454
455 # Send the reply.
455 # Send the reply.
456 reply_content = json_clean(reply_content)
456 reply_content = json_clean(reply_content)
457
457
458 md['status'] = reply_content['status']
458 md['status'] = reply_content['status']
459 if reply_content['status'] == 'error' and \
459 if reply_content['status'] == 'error' and \
460 reply_content['ename'] == 'UnmetDependency':
460 reply_content['ename'] == 'UnmetDependency':
461 md['dependencies_met'] = False
461 md['dependencies_met'] = False
462
462
463 reply_msg = self.session.send(stream, u'execute_reply',
463 reply_msg = self.session.send(stream, u'execute_reply',
464 reply_content, parent, metadata=md,
464 reply_content, parent, metadata=md,
465 ident=ident)
465 ident=ident)
466
466
467 self.log.debug("%s", reply_msg)
467 self.log.debug("%s", reply_msg)
468
468
469 if not silent and reply_msg['content']['status'] == u'error':
469 if not silent and reply_msg['content']['status'] == u'error':
470 self._abort_queues()
470 self._abort_queues()
471
471
472 self._publish_status(u'idle', parent)
472 self._publish_status(u'idle', parent)
473
473
474 def complete_request(self, stream, ident, parent):
474 def complete_request(self, stream, ident, parent):
475 txt, matches = self._complete(parent)
475 txt, matches = self._complete(parent)
476 matches = {'matches' : matches,
476 matches = {'matches' : matches,
477 'matched_text' : txt,
477 'matched_text' : txt,
478 'status' : 'ok'}
478 'status' : 'ok'}
479 matches = json_clean(matches)
479 matches = json_clean(matches)
480 completion_msg = self.session.send(stream, 'complete_reply',
480 completion_msg = self.session.send(stream, 'complete_reply',
481 matches, parent, ident)
481 matches, parent, ident)
482 self.log.debug("%s", completion_msg)
482 self.log.debug("%s", completion_msg)
483
483
484 def object_info_request(self, stream, ident, parent):
484 def object_info_request(self, stream, ident, parent):
485 content = parent['content']
485 content = parent['content']
486 object_info = self.shell.object_inspect(content['oname'],
486 object_info = self.shell.object_inspect(content['oname'],
487 detail_level = content.get('detail_level', 0)
487 detail_level = content.get('detail_level', 0)
488 )
488 )
489 # Before we send this object over, we scrub it for JSON usage
489 # Before we send this object over, we scrub it for JSON usage
490 oinfo = json_clean(object_info)
490 oinfo = json_clean(object_info)
491 msg = self.session.send(stream, 'object_info_reply',
491 msg = self.session.send(stream, 'object_info_reply',
492 oinfo, parent, ident)
492 oinfo, parent, ident)
493 self.log.debug("%s", msg)
493 self.log.debug("%s", msg)
494
494
495 def history_request(self, stream, ident, parent):
495 def history_request(self, stream, ident, parent):
496 # We need to pull these out, as passing **kwargs doesn't work with
496 # We need to pull these out, as passing **kwargs doesn't work with
497 # unicode keys before Python 2.6.5.
497 # unicode keys before Python 2.6.5.
498 hist_access_type = parent['content']['hist_access_type']
498 hist_access_type = parent['content']['hist_access_type']
499 raw = parent['content']['raw']
499 raw = parent['content']['raw']
500 output = parent['content']['output']
500 output = parent['content']['output']
501 if hist_access_type == 'tail':
501 if hist_access_type == 'tail':
502 n = parent['content']['n']
502 n = parent['content']['n']
503 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
503 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
504 include_latest=True)
504 include_latest=True)
505
505
506 elif hist_access_type == 'range':
506 elif hist_access_type == 'range':
507 session = parent['content']['session']
507 session = parent['content']['session']
508 start = parent['content']['start']
508 start = parent['content']['start']
509 stop = parent['content']['stop']
509 stop = parent['content']['stop']
510 hist = self.shell.history_manager.get_range(session, start, stop,
510 hist = self.shell.history_manager.get_range(session, start, stop,
511 raw=raw, output=output)
511 raw=raw, output=output)
512
512
513 elif hist_access_type == 'search':
513 elif hist_access_type == 'search':
514 n = parent['content'].get('n')
514 n = parent['content'].get('n')
515 unique = parent['content'].get('unique', False)
515 unique = parent['content'].get('unique', False)
516 pattern = parent['content']['pattern']
516 pattern = parent['content']['pattern']
517 hist = self.shell.history_manager.search(
517 hist = self.shell.history_manager.search(
518 pattern, raw=raw, output=output, n=n, unique=unique)
518 pattern, raw=raw, output=output, n=n, unique=unique)
519
519
520 else:
520 else:
521 hist = []
521 hist = []
522 hist = list(hist)
522 hist = list(hist)
523 content = {'history' : hist}
523 content = {'history' : hist}
524 content = json_clean(content)
524 content = json_clean(content)
525 msg = self.session.send(stream, 'history_reply',
525 msg = self.session.send(stream, 'history_reply',
526 content, parent, ident)
526 content, parent, ident)
527 self.log.debug("Sending history reply with %i entries", len(hist))
527 self.log.debug("Sending history reply with %i entries", len(hist))
528
528
529 def connect_request(self, stream, ident, parent):
529 def connect_request(self, stream, ident, parent):
530 if self._recorded_ports is not None:
530 if self._recorded_ports is not None:
531 content = self._recorded_ports.copy()
531 content = self._recorded_ports.copy()
532 else:
532 else:
533 content = {}
533 content = {}
534 msg = self.session.send(stream, 'connect_reply',
534 msg = self.session.send(stream, 'connect_reply',
535 content, parent, ident)
535 content, parent, ident)
536 self.log.debug("%s", msg)
536 self.log.debug("%s", msg)
537
537
538 def kernel_info_request(self, stream, ident, parent):
538 def kernel_info_request(self, stream, ident, parent):
539 vinfo = {
539 vinfo = {
540 'protocol_version': protocol_version,
540 'protocol_version': protocol_version,
541 'ipython_version': ipython_version,
541 'ipython_version': ipython_version,
542 'language_version': language_version,
542 'language_version': language_version,
543 'language': 'python',
543 'language': 'python',
544 }
544 }
545 msg = self.session.send(stream, 'kernel_info_reply',
545 msg = self.session.send(stream, 'kernel_info_reply',
546 vinfo, parent, ident)
546 vinfo, parent, ident)
547 self.log.debug("%s", msg)
547 self.log.debug("%s", msg)
548
548
549 def shutdown_request(self, stream, ident, parent):
549 def shutdown_request(self, stream, ident, parent):
550 self.shell.exit_now = True
550 self.shell.exit_now = True
551 content = dict(status='ok')
551 content = dict(status='ok')
552 content.update(parent['content'])
552 content.update(parent['content'])
553 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
553 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
554 # same content, but different msg_id for broadcasting on IOPub
554 # same content, but different msg_id for broadcasting on IOPub
555 self._shutdown_message = self.session.msg(u'shutdown_reply',
555 self._shutdown_message = self.session.msg(u'shutdown_reply',
556 content, parent
556 content, parent
557 )
557 )
558
558
559 self._at_shutdown()
559 self._at_shutdown()
560 # call sys.exit after a short delay
560 # call sys.exit after a short delay
561 loop = ioloop.IOLoop.instance()
561 loop = ioloop.IOLoop.instance()
562 loop.add_timeout(time.time()+0.1, loop.stop)
562 loop.add_timeout(time.time()+0.1, loop.stop)
563
563
564 #---------------------------------------------------------------------------
564 #---------------------------------------------------------------------------
565 # Engine methods
565 # Engine methods
566 #---------------------------------------------------------------------------
566 #---------------------------------------------------------------------------
567
567
568 def apply_request(self, stream, ident, parent):
568 def apply_request(self, stream, ident, parent):
569 try:
569 try:
570 content = parent[u'content']
570 content = parent[u'content']
571 bufs = parent[u'buffers']
571 bufs = parent[u'buffers']
572 msg_id = parent['header']['msg_id']
572 msg_id = parent['header']['msg_id']
573 except:
573 except:
574 self.log.error("Got bad msg: %s", parent, exc_info=True)
574 self.log.error("Got bad msg: %s", parent, exc_info=True)
575 return
575 return
576
576
577 self._publish_status(u'busy', parent)
577 self._publish_status(u'busy', parent)
578
578
579 # Set the parent message of the display hook and out streams.
579 # Set the parent message of the display hook and out streams.
580 shell = self.shell
580 shell = self.shell
581 shell.displayhook.set_parent(parent)
581 shell.displayhook.set_parent(parent)
582 shell.display_pub.set_parent(parent)
582 shell.display_pub.set_parent(parent)
583 shell.data_pub.set_parent(parent)
583 shell.data_pub.set_parent(parent)
584 try:
584 try:
585 sys.stdout.set_parent(parent)
585 sys.stdout.set_parent(parent)
586 except AttributeError:
586 except AttributeError:
587 pass
587 pass
588 try:
588 try:
589 sys.stderr.set_parent(parent)
589 sys.stderr.set_parent(parent)
590 except AttributeError:
590 except AttributeError:
591 pass
591 pass
592
592
593 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
593 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
594 # self.iopub_socket.send(pyin_msg)
594 # self.iopub_socket.send(pyin_msg)
595 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
595 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
596 md = self._make_metadata(parent['metadata'])
596 md = self._make_metadata(parent['metadata'])
597 try:
597 try:
598 working = shell.user_ns
598 working = shell.user_ns
599
599
600 prefix = "_"+str(msg_id).replace("-","")+"_"
600 prefix = "_"+str(msg_id).replace("-","")+"_"
601
601
602 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
602 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
603
603
604 fname = getattr(f, '__name__', 'f')
604 fname = getattr(f, '__name__', 'f')
605
605
606 fname = prefix+"f"
606 fname = prefix+"f"
607 argname = prefix+"args"
607 argname = prefix+"args"
608 kwargname = prefix+"kwargs"
608 kwargname = prefix+"kwargs"
609 resultname = prefix+"result"
609 resultname = prefix+"result"
610
610
611 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
611 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
612 # print ns
612 # print ns
613 working.update(ns)
613 working.update(ns)
614 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
614 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
615 try:
615 try:
616 exec code in shell.user_global_ns, shell.user_ns
616 exec code in shell.user_global_ns, shell.user_ns
617 result = working.get(resultname)
617 result = working.get(resultname)
618 finally:
618 finally:
619 for key in ns.iterkeys():
619 for key in ns.iterkeys():
620 working.pop(key)
620 working.pop(key)
621
621
622 result_buf = serialize_object(result,
622 result_buf = serialize_object(result,
623 buffer_threshold=self.session.buffer_threshold,
623 buffer_threshold=self.session.buffer_threshold,
624 item_threshold=self.session.item_threshold,
624 item_threshold=self.session.item_threshold,
625 )
625 )
626
626
627 except:
627 except:
628 # invoke IPython traceback formatting
628 # invoke IPython traceback formatting
629 shell.showtraceback()
629 shell.showtraceback()
630 # FIXME - fish exception info out of shell, possibly left there by
630 # FIXME - fish exception info out of shell, possibly left there by
631 # run_code. We'll need to clean up this logic later.
631 # run_code. We'll need to clean up this logic later.
632 reply_content = {}
632 reply_content = {}
633 if shell._reply_content is not None:
633 if shell._reply_content is not None:
634 reply_content.update(shell._reply_content)
634 reply_content.update(shell._reply_content)
635 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
635 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
636 reply_content['engine_info'] = e_info
636 reply_content['engine_info'] = e_info
637 # reset after use
637 # reset after use
638 shell._reply_content = None
638 shell._reply_content = None
639
639
640 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
640 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
641 ident=self._topic('pyerr'))
641 ident=self._topic('pyerr'))
642 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
642 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
643 result_buf = []
643 result_buf = []
644
644
645 if reply_content['ename'] == 'UnmetDependency':
645 if reply_content['ename'] == 'UnmetDependency':
646 md['dependencies_met'] = False
646 md['dependencies_met'] = False
647 else:
647 else:
648 reply_content = {'status' : 'ok'}
648 reply_content = {'status' : 'ok'}
649
649
650 # put 'ok'/'error' status in header, for scheduler introspection:
650 # put 'ok'/'error' status in header, for scheduler introspection:
651 md['status'] = reply_content['status']
651 md['status'] = reply_content['status']
652
652
653 # flush i/o
653 # flush i/o
654 sys.stdout.flush()
654 sys.stdout.flush()
655 sys.stderr.flush()
655 sys.stderr.flush()
656
656
657 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
657 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
658 parent=parent, ident=ident,buffers=result_buf, metadata=md)
658 parent=parent, ident=ident,buffers=result_buf, metadata=md)
659
659
660 self._publish_status(u'idle', parent)
660 self._publish_status(u'idle', parent)
661
661
662 #---------------------------------------------------------------------------
662 #---------------------------------------------------------------------------
663 # Control messages
663 # Control messages
664 #---------------------------------------------------------------------------
664 #---------------------------------------------------------------------------
665
665
666 def abort_request(self, stream, ident, parent):
666 def abort_request(self, stream, ident, parent):
667 """abort a specifig msg by id"""
667 """abort a specifig msg by id"""
668 msg_ids = parent['content'].get('msg_ids', None)
668 msg_ids = parent['content'].get('msg_ids', None)
669 if isinstance(msg_ids, basestring):
669 if isinstance(msg_ids, basestring):
670 msg_ids = [msg_ids]
670 msg_ids = [msg_ids]
671 if not msg_ids:
671 if not msg_ids:
672 self.abort_queues()
672 self.abort_queues()
673 for mid in msg_ids:
673 for mid in msg_ids:
674 self.aborted.add(str(mid))
674 self.aborted.add(str(mid))
675
675
676 content = dict(status='ok')
676 content = dict(status='ok')
677 reply_msg = self.session.send(stream, 'abort_reply', content=content,
677 reply_msg = self.session.send(stream, 'abort_reply', content=content,
678 parent=parent, ident=ident)
678 parent=parent, ident=ident)
679 self.log.debug("%s", reply_msg)
679 self.log.debug("%s", reply_msg)
680
680
681 def clear_request(self, stream, idents, parent):
681 def clear_request(self, stream, idents, parent):
682 """Clear our namespace."""
682 """Clear our namespace."""
683 self.shell.reset(False)
683 self.shell.reset(False)
684 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
684 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
685 content = dict(status='ok'))
685 content = dict(status='ok'))
686
686
687
687
688 #---------------------------------------------------------------------------
688 #---------------------------------------------------------------------------
689 # Protected interface
689 # Protected interface
690 #---------------------------------------------------------------------------
690 #---------------------------------------------------------------------------
691
691
692 def _wrap_exception(self, method=None):
692 def _wrap_exception(self, method=None):
693 # import here, because _wrap_exception is only used in parallel,
693 # import here, because _wrap_exception is only used in parallel,
694 # and parallel has higher min pyzmq version
694 # and parallel has higher min pyzmq version
695 from IPython.parallel.error import wrap_exception
695 from IPython.parallel.error import wrap_exception
696 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
696 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
697 content = wrap_exception(e_info)
697 content = wrap_exception(e_info)
698 return content
698 return content
699
699
700 def _topic(self, topic):
700 def _topic(self, topic):
701 """prefixed topic for IOPub messages"""
701 """prefixed topic for IOPub messages"""
702 if self.int_id >= 0:
702 if self.int_id >= 0:
703 base = "engine.%i" % self.int_id
703 base = "engine.%i" % self.int_id
704 else:
704 else:
705 base = "kernel.%s" % self.ident
705 base = "kernel.%s" % self.ident
706
706
707 return py3compat.cast_bytes("%s.%s" % (base, topic))
707 return py3compat.cast_bytes("%s.%s" % (base, topic))
708
708
709 def _abort_queues(self):
709 def _abort_queues(self):
710 for stream in self.shell_streams:
710 for stream in self.shell_streams:
711 if stream:
711 if stream:
712 self._abort_queue(stream)
712 self._abort_queue(stream)
713
713
714 def _abort_queue(self, stream):
714 def _abort_queue(self, stream):
715 poller = zmq.Poller()
715 poller = zmq.Poller()
716 poller.register(stream.socket, zmq.POLLIN)
716 poller.register(stream.socket, zmq.POLLIN)
717 while True:
717 while True:
718 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
718 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
719 if msg is None:
719 if msg is None:
720 return
720 return
721
721
722 self.log.info("Aborting:")
722 self.log.info("Aborting:")
723 self.log.info("%s", msg)
723 self.log.info("%s", msg)
724 msg_type = msg['header']['msg_type']
724 msg_type = msg['header']['msg_type']
725 reply_type = msg_type.split('_')[0] + '_reply'
725 reply_type = msg_type.split('_')[0] + '_reply'
726
726
727 status = {'status' : 'aborted'}
727 status = {'status' : 'aborted'}
728 md = {'engine' : self.ident}
728 md = {'engine' : self.ident}
729 md.update(status)
729 md.update(status)
730 reply_msg = self.session.send(stream, reply_type, metadata=md,
730 reply_msg = self.session.send(stream, reply_type, metadata=md,
731 content=status, parent=msg, ident=idents)
731 content=status, parent=msg, ident=idents)
732 self.log.debug("%s", reply_msg)
732 self.log.debug("%s", reply_msg)
733 # We need to wait a bit for requests to come in. This can probably
733 # We need to wait a bit for requests to come in. This can probably
734 # be set shorter for true asynchronous clients.
734 # be set shorter for true asynchronous clients.
735 poller.poll(50)
735 poller.poll(50)
736
736
737
737
738 def _no_raw_input(self):
738 def _no_raw_input(self):
739 """Raise StdinNotImplentedError if active frontend doesn't support
739 """Raise StdinNotImplentedError if active frontend doesn't support
740 stdin."""
740 stdin."""
741 raise StdinNotImplementedError("raw_input was called, but this "
741 raise StdinNotImplementedError("raw_input was called, but this "
742 "frontend does not support stdin.")
742 "frontend does not support stdin.")
743
743
744 def _raw_input(self, prompt, ident, parent):
744 def _raw_input(self, prompt, ident, parent):
745 # Flush output before making the request.
745 # Flush output before making the request.
746 sys.stderr.flush()
746 sys.stderr.flush()
747 sys.stdout.flush()
747 sys.stdout.flush()
748 # flush the stdin socket, to purge stale replies
748 # flush the stdin socket, to purge stale replies
749 while True:
749 while True:
750 try:
750 try:
751 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
751 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
752 except zmq.ZMQError as e:
752 except zmq.ZMQError as e:
753 if e.errno == zmq.EAGAIN:
753 if e.errno == zmq.EAGAIN:
754 break
754 break
755 else:
755 else:
756 raise
756 raise
757
757
758 # Send the input request.
758 # Send the input request.
759 content = json_clean(dict(prompt=prompt))
759 content = json_clean(dict(prompt=prompt))
760 self.session.send(self.stdin_socket, u'input_request', content, parent,
760 self.session.send(self.stdin_socket, u'input_request', content, parent,
761 ident=ident)
761 ident=ident)
762
762
763 # Await a response.
763 # Await a response.
764 while True:
764 while True:
765 try:
765 try:
766 ident, reply = self.session.recv(self.stdin_socket, 0)
766 ident, reply = self.session.recv(self.stdin_socket, 0)
767 except Exception:
767 except Exception:
768 self.log.warn("Invalid Message:", exc_info=True)
768 self.log.warn("Invalid Message:", exc_info=True)
769 except KeyboardInterrupt:
770 # re-raise KeyboardInterrupt, to truncate traceback
771 raise KeyboardInterrupt
769 else:
772 else:
770 break
773 break
771 try:
774 try:
772 value = py3compat.unicode_to_str(reply['content']['value'])
775 value = py3compat.unicode_to_str(reply['content']['value'])
773 except:
776 except:
774 self.log.error("Got bad raw_input reply: ")
777 self.log.error("Got bad raw_input reply: ")
775 self.log.error("%s", parent)
778 self.log.error("%s", parent)
776 value = ''
779 value = ''
777 if value == '\x04':
780 if value == '\x04':
778 # EOF
781 # EOF
779 raise EOFError
782 raise EOFError
780 return value
783 return value
781
784
782 def _complete(self, msg):
785 def _complete(self, msg):
783 c = msg['content']
786 c = msg['content']
784 try:
787 try:
785 cpos = int(c['cursor_pos'])
788 cpos = int(c['cursor_pos'])
786 except:
789 except:
787 # If we don't get something that we can convert to an integer, at
790 # If we don't get something that we can convert to an integer, at
788 # least attempt the completion guessing the cursor is at the end of
791 # least attempt the completion guessing the cursor is at the end of
789 # the text, if there's any, and otherwise of the line
792 # the text, if there's any, and otherwise of the line
790 cpos = len(c['text'])
793 cpos = len(c['text'])
791 if cpos==0:
794 if cpos==0:
792 cpos = len(c['line'])
795 cpos = len(c['line'])
793 return self.shell.complete(c['text'], c['line'], cpos)
796 return self.shell.complete(c['text'], c['line'], cpos)
794
797
795 def _at_shutdown(self):
798 def _at_shutdown(self):
796 """Actions taken at shutdown by the kernel, called by python's atexit.
799 """Actions taken at shutdown by the kernel, called by python's atexit.
797 """
800 """
798 # io.rprint("Kernel at_shutdown") # dbg
801 # io.rprint("Kernel at_shutdown") # dbg
799 if self._shutdown_message is not None:
802 if self._shutdown_message is not None:
800 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
803 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
801 self.log.debug("%s", self._shutdown_message)
804 self.log.debug("%s", self._shutdown_message)
802 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
805 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
803
806
General Comments 0
You need to be logged in to leave comments. Login now