##// END OF EJS Templates
clear _reply_content cache before using it...
MinRK -
Show More
@@ -1,799 +1,800 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """An interactive kernel that talks to frontends over 0MQ."""
2 """An interactive kernel that talks to frontends over 0MQ."""
3
3
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Imports
5 # Imports
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 from __future__ import print_function
7 from __future__ import print_function
8
8
9 # Standard library imports
9 # Standard library imports
10 import sys
10 import sys
11 import time
11 import time
12 import traceback
12 import traceback
13 import logging
13 import logging
14 import uuid
14 import uuid
15
15
16 from datetime import datetime
16 from datetime import datetime
17 from signal import (
17 from signal import (
18 signal, default_int_handler, SIGINT
18 signal, default_int_handler, SIGINT
19 )
19 )
20
20
21 # System library imports
21 # System library imports
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24 from zmq.eventloop.zmqstream import ZMQStream
24 from zmq.eventloop.zmqstream import ZMQStream
25
25
26 # Local imports
26 # Local imports
27 from IPython.config.configurable import Configurable
27 from IPython.config.configurable import Configurable
28 from IPython.core.error import StdinNotImplementedError
28 from IPython.core.error import StdinNotImplementedError
29 from IPython.core import release
29 from IPython.core import release
30 from IPython.utils import py3compat
30 from IPython.utils import py3compat
31 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
31 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
32 from IPython.utils.jsonutil import json_clean
32 from IPython.utils.jsonutil import json_clean
33 from IPython.utils.traitlets import (
33 from IPython.utils.traitlets import (
34 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
34 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
35 Type, Bool,
35 Type, Bool,
36 )
36 )
37
37
38 from .serialize import serialize_object, unpack_apply_message
38 from .serialize import serialize_object, unpack_apply_message
39 from .session import Session
39 from .session import Session
40 from .zmqshell import ZMQInteractiveShell
40 from .zmqshell import ZMQInteractiveShell
41
41
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Main kernel class
44 # Main kernel class
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 protocol_version = list(release.kernel_protocol_version_info)
47 protocol_version = list(release.kernel_protocol_version_info)
48 ipython_version = list(release.version_info)
48 ipython_version = list(release.version_info)
49 language_version = list(sys.version_info[:3])
49 language_version = list(sys.version_info[:3])
50
50
51
51
52 class Kernel(Configurable):
52 class Kernel(Configurable):
53
53
54 #---------------------------------------------------------------------------
54 #---------------------------------------------------------------------------
55 # Kernel interface
55 # Kernel interface
56 #---------------------------------------------------------------------------
56 #---------------------------------------------------------------------------
57
57
58 # attribute to override with a GUI
58 # attribute to override with a GUI
59 eventloop = Any(None)
59 eventloop = Any(None)
60 def _eventloop_changed(self, name, old, new):
60 def _eventloop_changed(self, name, old, new):
61 """schedule call to eventloop from IOLoop"""
61 """schedule call to eventloop from IOLoop"""
62 loop = ioloop.IOLoop.instance()
62 loop = ioloop.IOLoop.instance()
63 loop.add_callback(self.enter_eventloop)
63 loop.add_callback(self.enter_eventloop)
64
64
65 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
65 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
66 shell_class = Type(ZMQInteractiveShell)
66 shell_class = Type(ZMQInteractiveShell)
67
67
68 session = Instance(Session)
68 session = Instance(Session)
69 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
69 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
70 shell_streams = List()
70 shell_streams = List()
71 control_stream = Instance(ZMQStream)
71 control_stream = Instance(ZMQStream)
72 iopub_socket = Instance(zmq.Socket)
72 iopub_socket = Instance(zmq.Socket)
73 stdin_socket = Instance(zmq.Socket)
73 stdin_socket = Instance(zmq.Socket)
74 log = Instance(logging.Logger)
74 log = Instance(logging.Logger)
75
75
76 user_module = Any()
76 user_module = Any()
77 def _user_module_changed(self, name, old, new):
77 def _user_module_changed(self, name, old, new):
78 if self.shell is not None:
78 if self.shell is not None:
79 self.shell.user_module = new
79 self.shell.user_module = new
80
80
81 user_ns = Instance(dict, args=None, allow_none=True)
81 user_ns = Instance(dict, args=None, allow_none=True)
82 def _user_ns_changed(self, name, old, new):
82 def _user_ns_changed(self, name, old, new):
83 if self.shell is not None:
83 if self.shell is not None:
84 self.shell.user_ns = new
84 self.shell.user_ns = new
85 self.shell.init_user_ns()
85 self.shell.init_user_ns()
86
86
87 # identities:
87 # identities:
88 int_id = Integer(-1)
88 int_id = Integer(-1)
89 ident = Unicode()
89 ident = Unicode()
90
90
91 def _ident_default(self):
91 def _ident_default(self):
92 return unicode_type(uuid.uuid4())
92 return unicode_type(uuid.uuid4())
93
93
94 # Private interface
94 # Private interface
95
95
96 _darwin_app_nap = Bool(True, config=True,
96 _darwin_app_nap = Bool(True, config=True,
97 help="""Whether to use appnope for compatiblity with OS X App Nap.
97 help="""Whether to use appnope for compatiblity with OS X App Nap.
98
98
99 Only affects OS X >= 10.9.
99 Only affects OS X >= 10.9.
100 """
100 """
101 )
101 )
102
102
103 # Time to sleep after flushing the stdout/err buffers in each execute
103 # Time to sleep after flushing the stdout/err buffers in each execute
104 # cycle. While this introduces a hard limit on the minimal latency of the
104 # cycle. While this introduces a hard limit on the minimal latency of the
105 # execute cycle, it helps prevent output synchronization problems for
105 # execute cycle, it helps prevent output synchronization problems for
106 # clients.
106 # clients.
107 # Units are in seconds. The minimum zmq latency on local host is probably
107 # Units are in seconds. The minimum zmq latency on local host is probably
108 # ~150 microseconds, set this to 500us for now. We may need to increase it
108 # ~150 microseconds, set this to 500us for now. We may need to increase it
109 # a little if it's not enough after more interactive testing.
109 # a little if it's not enough after more interactive testing.
110 _execute_sleep = Float(0.0005, config=True)
110 _execute_sleep = Float(0.0005, config=True)
111
111
112 # Frequency of the kernel's event loop.
112 # Frequency of the kernel's event loop.
113 # Units are in seconds, kernel subclasses for GUI toolkits may need to
113 # Units are in seconds, kernel subclasses for GUI toolkits may need to
114 # adapt to milliseconds.
114 # adapt to milliseconds.
115 _poll_interval = Float(0.05, config=True)
115 _poll_interval = Float(0.05, config=True)
116
116
117 # If the shutdown was requested over the network, we leave here the
117 # If the shutdown was requested over the network, we leave here the
118 # necessary reply message so it can be sent by our registered atexit
118 # necessary reply message so it can be sent by our registered atexit
119 # handler. This ensures that the reply is only sent to clients truly at
119 # handler. This ensures that the reply is only sent to clients truly at
120 # the end of our shutdown process (which happens after the underlying
120 # the end of our shutdown process (which happens after the underlying
121 # IPython shell's own shutdown).
121 # IPython shell's own shutdown).
122 _shutdown_message = None
122 _shutdown_message = None
123
123
124 # This is a dict of port number that the kernel is listening on. It is set
124 # This is a dict of port number that the kernel is listening on. It is set
125 # by record_ports and used by connect_request.
125 # by record_ports and used by connect_request.
126 _recorded_ports = Dict()
126 _recorded_ports = Dict()
127
127
128 # A reference to the Python builtin 'raw_input' function.
128 # A reference to the Python builtin 'raw_input' function.
129 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
129 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
130 _sys_raw_input = Any()
130 _sys_raw_input = Any()
131 _sys_eval_input = Any()
131 _sys_eval_input = Any()
132
132
133 # set of aborted msg_ids
133 # set of aborted msg_ids
134 aborted = Set()
134 aborted = Set()
135
135
136
136
137 def __init__(self, **kwargs):
137 def __init__(self, **kwargs):
138 super(Kernel, self).__init__(**kwargs)
138 super(Kernel, self).__init__(**kwargs)
139
139
140 # Initialize the InteractiveShell subclass
140 # Initialize the InteractiveShell subclass
141 self.shell = self.shell_class.instance(parent=self,
141 self.shell = self.shell_class.instance(parent=self,
142 profile_dir = self.profile_dir,
142 profile_dir = self.profile_dir,
143 user_module = self.user_module,
143 user_module = self.user_module,
144 user_ns = self.user_ns,
144 user_ns = self.user_ns,
145 kernel = self,
145 kernel = self,
146 )
146 )
147 self.shell.displayhook.session = self.session
147 self.shell.displayhook.session = self.session
148 self.shell.displayhook.pub_socket = self.iopub_socket
148 self.shell.displayhook.pub_socket = self.iopub_socket
149 self.shell.displayhook.topic = self._topic('pyout')
149 self.shell.displayhook.topic = self._topic('pyout')
150 self.shell.display_pub.session = self.session
150 self.shell.display_pub.session = self.session
151 self.shell.display_pub.pub_socket = self.iopub_socket
151 self.shell.display_pub.pub_socket = self.iopub_socket
152 self.shell.data_pub.session = self.session
152 self.shell.data_pub.session = self.session
153 self.shell.data_pub.pub_socket = self.iopub_socket
153 self.shell.data_pub.pub_socket = self.iopub_socket
154
154
155 # TMP - hack while developing
155 # TMP - hack while developing
156 self.shell._reply_content = None
156 self.shell._reply_content = None
157
157
158 # Build dict of handlers for message types
158 # Build dict of handlers for message types
159 msg_types = [ 'execute_request', 'complete_request',
159 msg_types = [ 'execute_request', 'complete_request',
160 'object_info_request', 'history_request',
160 'object_info_request', 'history_request',
161 'kernel_info_request',
161 'kernel_info_request',
162 'connect_request', 'shutdown_request',
162 'connect_request', 'shutdown_request',
163 'apply_request',
163 'apply_request',
164 ]
164 ]
165 self.shell_handlers = {}
165 self.shell_handlers = {}
166 for msg_type in msg_types:
166 for msg_type in msg_types:
167 self.shell_handlers[msg_type] = getattr(self, msg_type)
167 self.shell_handlers[msg_type] = getattr(self, msg_type)
168
168
169 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
169 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
170 comm_manager = self.shell.comm_manager
170 comm_manager = self.shell.comm_manager
171 for msg_type in comm_msg_types:
171 for msg_type in comm_msg_types:
172 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
172 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
173
173
174 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
174 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
175 self.control_handlers = {}
175 self.control_handlers = {}
176 for msg_type in control_msg_types:
176 for msg_type in control_msg_types:
177 self.control_handlers[msg_type] = getattr(self, msg_type)
177 self.control_handlers[msg_type] = getattr(self, msg_type)
178
178
179
179
180 def dispatch_control(self, msg):
180 def dispatch_control(self, msg):
181 """dispatch control requests"""
181 """dispatch control requests"""
182 idents,msg = self.session.feed_identities(msg, copy=False)
182 idents,msg = self.session.feed_identities(msg, copy=False)
183 try:
183 try:
184 msg = self.session.unserialize(msg, content=True, copy=False)
184 msg = self.session.unserialize(msg, content=True, copy=False)
185 except:
185 except:
186 self.log.error("Invalid Control Message", exc_info=True)
186 self.log.error("Invalid Control Message", exc_info=True)
187 return
187 return
188
188
189 self.log.debug("Control received: %s", msg)
189 self.log.debug("Control received: %s", msg)
190
190
191 header = msg['header']
191 header = msg['header']
192 msg_id = header['msg_id']
192 msg_id = header['msg_id']
193 msg_type = header['msg_type']
193 msg_type = header['msg_type']
194
194
195 handler = self.control_handlers.get(msg_type, None)
195 handler = self.control_handlers.get(msg_type, None)
196 if handler is None:
196 if handler is None:
197 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
197 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
198 else:
198 else:
199 try:
199 try:
200 handler(self.control_stream, idents, msg)
200 handler(self.control_stream, idents, msg)
201 except Exception:
201 except Exception:
202 self.log.error("Exception in control handler:", exc_info=True)
202 self.log.error("Exception in control handler:", exc_info=True)
203
203
204 def dispatch_shell(self, stream, msg):
204 def dispatch_shell(self, stream, msg):
205 """dispatch shell requests"""
205 """dispatch shell requests"""
206 # flush control requests first
206 # flush control requests first
207 if self.control_stream:
207 if self.control_stream:
208 self.control_stream.flush()
208 self.control_stream.flush()
209
209
210 idents,msg = self.session.feed_identities(msg, copy=False)
210 idents,msg = self.session.feed_identities(msg, copy=False)
211 try:
211 try:
212 msg = self.session.unserialize(msg, content=True, copy=False)
212 msg = self.session.unserialize(msg, content=True, copy=False)
213 except:
213 except:
214 self.log.error("Invalid Message", exc_info=True)
214 self.log.error("Invalid Message", exc_info=True)
215 return
215 return
216
216
217 header = msg['header']
217 header = msg['header']
218 msg_id = header['msg_id']
218 msg_id = header['msg_id']
219 msg_type = msg['header']['msg_type']
219 msg_type = msg['header']['msg_type']
220
220
221 # Print some info about this message and leave a '--->' marker, so it's
221 # Print some info about this message and leave a '--->' marker, so it's
222 # easier to trace visually the message chain when debugging. Each
222 # easier to trace visually the message chain when debugging. Each
223 # handler prints its message at the end.
223 # handler prints its message at the end.
224 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
224 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
225 self.log.debug(' Content: %s\n --->\n ', msg['content'])
225 self.log.debug(' Content: %s\n --->\n ', msg['content'])
226
226
227 if msg_id in self.aborted:
227 if msg_id in self.aborted:
228 self.aborted.remove(msg_id)
228 self.aborted.remove(msg_id)
229 # is it safe to assume a msg_id will not be resubmitted?
229 # is it safe to assume a msg_id will not be resubmitted?
230 reply_type = msg_type.split('_')[0] + '_reply'
230 reply_type = msg_type.split('_')[0] + '_reply'
231 status = {'status' : 'aborted'}
231 status = {'status' : 'aborted'}
232 md = {'engine' : self.ident}
232 md = {'engine' : self.ident}
233 md.update(status)
233 md.update(status)
234 reply_msg = self.session.send(stream, reply_type, metadata=md,
234 reply_msg = self.session.send(stream, reply_type, metadata=md,
235 content=status, parent=msg, ident=idents)
235 content=status, parent=msg, ident=idents)
236 return
236 return
237
237
238 handler = self.shell_handlers.get(msg_type, None)
238 handler = self.shell_handlers.get(msg_type, None)
239 if handler is None:
239 if handler is None:
240 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
240 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
241 else:
241 else:
242 # ensure default_int_handler during handler call
242 # ensure default_int_handler during handler call
243 sig = signal(SIGINT, default_int_handler)
243 sig = signal(SIGINT, default_int_handler)
244 try:
244 try:
245 handler(stream, idents, msg)
245 handler(stream, idents, msg)
246 except Exception:
246 except Exception:
247 self.log.error("Exception in message handler:", exc_info=True)
247 self.log.error("Exception in message handler:", exc_info=True)
248 finally:
248 finally:
249 signal(SIGINT, sig)
249 signal(SIGINT, sig)
250
250
251 def enter_eventloop(self):
251 def enter_eventloop(self):
252 """enter eventloop"""
252 """enter eventloop"""
253 self.log.info("entering eventloop %s", self.eventloop)
253 self.log.info("entering eventloop %s", self.eventloop)
254 for stream in self.shell_streams:
254 for stream in self.shell_streams:
255 # flush any pending replies,
255 # flush any pending replies,
256 # which may be skipped by entering the eventloop
256 # which may be skipped by entering the eventloop
257 stream.flush(zmq.POLLOUT)
257 stream.flush(zmq.POLLOUT)
258 # restore default_int_handler
258 # restore default_int_handler
259 signal(SIGINT, default_int_handler)
259 signal(SIGINT, default_int_handler)
260 while self.eventloop is not None:
260 while self.eventloop is not None:
261 try:
261 try:
262 self.eventloop(self)
262 self.eventloop(self)
263 except KeyboardInterrupt:
263 except KeyboardInterrupt:
264 # Ctrl-C shouldn't crash the kernel
264 # Ctrl-C shouldn't crash the kernel
265 self.log.error("KeyboardInterrupt caught in kernel")
265 self.log.error("KeyboardInterrupt caught in kernel")
266 continue
266 continue
267 else:
267 else:
268 # eventloop exited cleanly, this means we should stop (right?)
268 # eventloop exited cleanly, this means we should stop (right?)
269 self.eventloop = None
269 self.eventloop = None
270 break
270 break
271 self.log.info("exiting eventloop")
271 self.log.info("exiting eventloop")
272
272
273 def start(self):
273 def start(self):
274 """register dispatchers for streams"""
274 """register dispatchers for streams"""
275 self.shell.exit_now = False
275 self.shell.exit_now = False
276 if self.control_stream:
276 if self.control_stream:
277 self.control_stream.on_recv(self.dispatch_control, copy=False)
277 self.control_stream.on_recv(self.dispatch_control, copy=False)
278
278
279 def make_dispatcher(stream):
279 def make_dispatcher(stream):
280 def dispatcher(msg):
280 def dispatcher(msg):
281 return self.dispatch_shell(stream, msg)
281 return self.dispatch_shell(stream, msg)
282 return dispatcher
282 return dispatcher
283
283
284 for s in self.shell_streams:
284 for s in self.shell_streams:
285 s.on_recv(make_dispatcher(s), copy=False)
285 s.on_recv(make_dispatcher(s), copy=False)
286
286
287 # publish idle status
287 # publish idle status
288 self._publish_status('starting')
288 self._publish_status('starting')
289
289
290 def do_one_iteration(self):
290 def do_one_iteration(self):
291 """step eventloop just once"""
291 """step eventloop just once"""
292 if self.control_stream:
292 if self.control_stream:
293 self.control_stream.flush()
293 self.control_stream.flush()
294 for stream in self.shell_streams:
294 for stream in self.shell_streams:
295 # handle at most one request per iteration
295 # handle at most one request per iteration
296 stream.flush(zmq.POLLIN, 1)
296 stream.flush(zmq.POLLIN, 1)
297 stream.flush(zmq.POLLOUT)
297 stream.flush(zmq.POLLOUT)
298
298
299
299
300 def record_ports(self, ports):
300 def record_ports(self, ports):
301 """Record the ports that this kernel is using.
301 """Record the ports that this kernel is using.
302
302
303 The creator of the Kernel instance must call this methods if they
303 The creator of the Kernel instance must call this methods if they
304 want the :meth:`connect_request` method to return the port numbers.
304 want the :meth:`connect_request` method to return the port numbers.
305 """
305 """
306 self._recorded_ports = ports
306 self._recorded_ports = ports
307
307
308 #---------------------------------------------------------------------------
308 #---------------------------------------------------------------------------
309 # Kernel request handlers
309 # Kernel request handlers
310 #---------------------------------------------------------------------------
310 #---------------------------------------------------------------------------
311
311
312 def _make_metadata(self, other=None):
312 def _make_metadata(self, other=None):
313 """init metadata dict, for execute/apply_reply"""
313 """init metadata dict, for execute/apply_reply"""
314 new_md = {
314 new_md = {
315 'dependencies_met' : True,
315 'dependencies_met' : True,
316 'engine' : self.ident,
316 'engine' : self.ident,
317 'started': datetime.now(),
317 'started': datetime.now(),
318 }
318 }
319 if other:
319 if other:
320 new_md.update(other)
320 new_md.update(other)
321 return new_md
321 return new_md
322
322
323 def _publish_pyin(self, code, parent, execution_count):
323 def _publish_pyin(self, code, parent, execution_count):
324 """Publish the code request on the pyin stream."""
324 """Publish the code request on the pyin stream."""
325
325
326 self.session.send(self.iopub_socket, u'pyin',
326 self.session.send(self.iopub_socket, u'pyin',
327 {u'code':code, u'execution_count': execution_count},
327 {u'code':code, u'execution_count': execution_count},
328 parent=parent, ident=self._topic('pyin')
328 parent=parent, ident=self._topic('pyin')
329 )
329 )
330
330
331 def _publish_status(self, status, parent=None):
331 def _publish_status(self, status, parent=None):
332 """send status (busy/idle) on IOPub"""
332 """send status (busy/idle) on IOPub"""
333 self.session.send(self.iopub_socket,
333 self.session.send(self.iopub_socket,
334 u'status',
334 u'status',
335 {u'execution_state': status},
335 {u'execution_state': status},
336 parent=parent,
336 parent=parent,
337 ident=self._topic('status'),
337 ident=self._topic('status'),
338 )
338 )
339
339
340
340
341 def execute_request(self, stream, ident, parent):
341 def execute_request(self, stream, ident, parent):
342 """handle an execute_request"""
342 """handle an execute_request"""
343
343
344 self._publish_status(u'busy', parent)
344 self._publish_status(u'busy', parent)
345
345
346 try:
346 try:
347 content = parent[u'content']
347 content = parent[u'content']
348 code = py3compat.cast_unicode_py2(content[u'code'])
348 code = py3compat.cast_unicode_py2(content[u'code'])
349 silent = content[u'silent']
349 silent = content[u'silent']
350 store_history = content.get(u'store_history', not silent)
350 store_history = content.get(u'store_history', not silent)
351 except:
351 except:
352 self.log.error("Got bad msg: ")
352 self.log.error("Got bad msg: ")
353 self.log.error("%s", parent)
353 self.log.error("%s", parent)
354 return
354 return
355
355
356 md = self._make_metadata(parent['metadata'])
356 md = self._make_metadata(parent['metadata'])
357
357
358 shell = self.shell # we'll need this a lot here
358 shell = self.shell # we'll need this a lot here
359
359
360 # Replace raw_input. Note that is not sufficient to replace
360 # Replace raw_input. Note that is not sufficient to replace
361 # raw_input in the user namespace.
361 # raw_input in the user namespace.
362 if content.get('allow_stdin', False):
362 if content.get('allow_stdin', False):
363 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
363 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
364 input = lambda prompt='': eval(raw_input(prompt))
364 input = lambda prompt='': eval(raw_input(prompt))
365 else:
365 else:
366 raw_input = input = lambda prompt='' : self._no_raw_input()
366 raw_input = input = lambda prompt='' : self._no_raw_input()
367
367
368 if py3compat.PY3:
368 if py3compat.PY3:
369 self._sys_raw_input = builtin_mod.input
369 self._sys_raw_input = builtin_mod.input
370 builtin_mod.input = raw_input
370 builtin_mod.input = raw_input
371 else:
371 else:
372 self._sys_raw_input = builtin_mod.raw_input
372 self._sys_raw_input = builtin_mod.raw_input
373 self._sys_eval_input = builtin_mod.input
373 self._sys_eval_input = builtin_mod.input
374 builtin_mod.raw_input = raw_input
374 builtin_mod.raw_input = raw_input
375 builtin_mod.input = input
375 builtin_mod.input = input
376
376
377 # Set the parent message of the display hook and out streams.
377 # Set the parent message of the display hook and out streams.
378 shell.set_parent(parent)
378 shell.set_parent(parent)
379
379
380 # Re-broadcast our input for the benefit of listening clients, and
380 # Re-broadcast our input for the benefit of listening clients, and
381 # start computing output
381 # start computing output
382 if not silent:
382 if not silent:
383 self._publish_pyin(code, parent, shell.execution_count)
383 self._publish_pyin(code, parent, shell.execution_count)
384
384
385 reply_content = {}
385 reply_content = {}
386 # FIXME: the shell calls the exception handler itself.
387 shell._reply_content = None
386 try:
388 try:
387 # FIXME: the shell calls the exception handler itself.
388 shell.run_cell(code, store_history=store_history, silent=silent)
389 shell.run_cell(code, store_history=store_history, silent=silent)
389 except:
390 except:
390 status = u'error'
391 status = u'error'
391 # FIXME: this code right now isn't being used yet by default,
392 # FIXME: this code right now isn't being used yet by default,
392 # because the run_cell() call above directly fires off exception
393 # because the run_cell() call above directly fires off exception
393 # reporting. This code, therefore, is only active in the scenario
394 # reporting. This code, therefore, is only active in the scenario
394 # where runlines itself has an unhandled exception. We need to
395 # where runlines itself has an unhandled exception. We need to
395 # uniformize this, for all exception construction to come from a
396 # uniformize this, for all exception construction to come from a
396 # single location in the codbase.
397 # single location in the codbase.
397 etype, evalue, tb = sys.exc_info()
398 etype, evalue, tb = sys.exc_info()
398 tb_list = traceback.format_exception(etype, evalue, tb)
399 tb_list = traceback.format_exception(etype, evalue, tb)
399 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
400 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
400 else:
401 else:
401 status = u'ok'
402 status = u'ok'
402 finally:
403 finally:
403 # Restore raw_input.
404 # Restore raw_input.
404 if py3compat.PY3:
405 if py3compat.PY3:
405 builtin_mod.input = self._sys_raw_input
406 builtin_mod.input = self._sys_raw_input
406 else:
407 else:
407 builtin_mod.raw_input = self._sys_raw_input
408 builtin_mod.raw_input = self._sys_raw_input
408 builtin_mod.input = self._sys_eval_input
409 builtin_mod.input = self._sys_eval_input
409
410
410 reply_content[u'status'] = status
411 reply_content[u'status'] = status
411
412
412 # Return the execution counter so clients can display prompts
413 # Return the execution counter so clients can display prompts
413 reply_content['execution_count'] = shell.execution_count - 1
414 reply_content['execution_count'] = shell.execution_count - 1
414
415
415 # FIXME - fish exception info out of shell, possibly left there by
416 # FIXME - fish exception info out of shell, possibly left there by
416 # runlines. We'll need to clean up this logic later.
417 # runlines. We'll need to clean up this logic later.
417 if shell._reply_content is not None:
418 if shell._reply_content is not None:
418 reply_content.update(shell._reply_content)
419 reply_content.update(shell._reply_content)
419 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
420 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
420 reply_content['engine_info'] = e_info
421 reply_content['engine_info'] = e_info
421 # reset after use
422 # reset after use
422 shell._reply_content = None
423 shell._reply_content = None
423
424
424 if 'traceback' in reply_content:
425 if 'traceback' in reply_content:
425 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
426 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
426
427
427
428
428 # At this point, we can tell whether the main code execution succeeded
429 # At this point, we can tell whether the main code execution succeeded
429 # or not. If it did, we proceed to evaluate user_variables/expressions
430 # or not. If it did, we proceed to evaluate user_variables/expressions
430 if reply_content['status'] == 'ok':
431 if reply_content['status'] == 'ok':
431 reply_content[u'user_variables'] = \
432 reply_content[u'user_variables'] = \
432 shell.user_variables(content.get(u'user_variables', []))
433 shell.user_variables(content.get(u'user_variables', []))
433 reply_content[u'user_expressions'] = \
434 reply_content[u'user_expressions'] = \
434 shell.user_expressions(content.get(u'user_expressions', {}))
435 shell.user_expressions(content.get(u'user_expressions', {}))
435 else:
436 else:
436 # If there was an error, don't even try to compute variables or
437 # If there was an error, don't even try to compute variables or
437 # expressions
438 # expressions
438 reply_content[u'user_variables'] = {}
439 reply_content[u'user_variables'] = {}
439 reply_content[u'user_expressions'] = {}
440 reply_content[u'user_expressions'] = {}
440
441
441 # Payloads should be retrieved regardless of outcome, so we can both
442 # Payloads should be retrieved regardless of outcome, so we can both
442 # recover partial output (that could have been generated early in a
443 # recover partial output (that could have been generated early in a
443 # block, before an error) and clear the payload system always.
444 # block, before an error) and clear the payload system always.
444 reply_content[u'payload'] = shell.payload_manager.read_payload()
445 reply_content[u'payload'] = shell.payload_manager.read_payload()
445 # Be agressive about clearing the payload because we don't want
446 # Be agressive about clearing the payload because we don't want
446 # it to sit in memory until the next execute_request comes in.
447 # it to sit in memory until the next execute_request comes in.
447 shell.payload_manager.clear_payload()
448 shell.payload_manager.clear_payload()
448
449
449 # Flush output before sending the reply.
450 # Flush output before sending the reply.
450 sys.stdout.flush()
451 sys.stdout.flush()
451 sys.stderr.flush()
452 sys.stderr.flush()
452 # FIXME: on rare occasions, the flush doesn't seem to make it to the
453 # FIXME: on rare occasions, the flush doesn't seem to make it to the
453 # clients... This seems to mitigate the problem, but we definitely need
454 # clients... This seems to mitigate the problem, but we definitely need
454 # to better understand what's going on.
455 # to better understand what's going on.
455 if self._execute_sleep:
456 if self._execute_sleep:
456 time.sleep(self._execute_sleep)
457 time.sleep(self._execute_sleep)
457
458
458 # Send the reply.
459 # Send the reply.
459 reply_content = json_clean(reply_content)
460 reply_content = json_clean(reply_content)
460
461
461 md['status'] = reply_content['status']
462 md['status'] = reply_content['status']
462 if reply_content['status'] == 'error' and \
463 if reply_content['status'] == 'error' and \
463 reply_content['ename'] == 'UnmetDependency':
464 reply_content['ename'] == 'UnmetDependency':
464 md['dependencies_met'] = False
465 md['dependencies_met'] = False
465
466
466 reply_msg = self.session.send(stream, u'execute_reply',
467 reply_msg = self.session.send(stream, u'execute_reply',
467 reply_content, parent, metadata=md,
468 reply_content, parent, metadata=md,
468 ident=ident)
469 ident=ident)
469
470
470 self.log.debug("%s", reply_msg)
471 self.log.debug("%s", reply_msg)
471
472
472 if not silent and reply_msg['content']['status'] == u'error':
473 if not silent and reply_msg['content']['status'] == u'error':
473 self._abort_queues()
474 self._abort_queues()
474
475
475 self._publish_status(u'idle', parent)
476 self._publish_status(u'idle', parent)
476
477
477 def complete_request(self, stream, ident, parent):
478 def complete_request(self, stream, ident, parent):
478 txt, matches = self._complete(parent)
479 txt, matches = self._complete(parent)
479 matches = {'matches' : matches,
480 matches = {'matches' : matches,
480 'matched_text' : txt,
481 'matched_text' : txt,
481 'status' : 'ok'}
482 'status' : 'ok'}
482 matches = json_clean(matches)
483 matches = json_clean(matches)
483 completion_msg = self.session.send(stream, 'complete_reply',
484 completion_msg = self.session.send(stream, 'complete_reply',
484 matches, parent, ident)
485 matches, parent, ident)
485 self.log.debug("%s", completion_msg)
486 self.log.debug("%s", completion_msg)
486
487
487 def object_info_request(self, stream, ident, parent):
488 def object_info_request(self, stream, ident, parent):
488 content = parent['content']
489 content = parent['content']
489 object_info = self.shell.object_inspect(content['oname'],
490 object_info = self.shell.object_inspect(content['oname'],
490 detail_level = content.get('detail_level', 0)
491 detail_level = content.get('detail_level', 0)
491 )
492 )
492 # Before we send this object over, we scrub it for JSON usage
493 # Before we send this object over, we scrub it for JSON usage
493 oinfo = json_clean(object_info)
494 oinfo = json_clean(object_info)
494 msg = self.session.send(stream, 'object_info_reply',
495 msg = self.session.send(stream, 'object_info_reply',
495 oinfo, parent, ident)
496 oinfo, parent, ident)
496 self.log.debug("%s", msg)
497 self.log.debug("%s", msg)
497
498
498 def history_request(self, stream, ident, parent):
499 def history_request(self, stream, ident, parent):
499 # We need to pull these out, as passing **kwargs doesn't work with
500 # We need to pull these out, as passing **kwargs doesn't work with
500 # unicode keys before Python 2.6.5.
501 # unicode keys before Python 2.6.5.
501 hist_access_type = parent['content']['hist_access_type']
502 hist_access_type = parent['content']['hist_access_type']
502 raw = parent['content']['raw']
503 raw = parent['content']['raw']
503 output = parent['content']['output']
504 output = parent['content']['output']
504 if hist_access_type == 'tail':
505 if hist_access_type == 'tail':
505 n = parent['content']['n']
506 n = parent['content']['n']
506 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
507 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
507 include_latest=True)
508 include_latest=True)
508
509
509 elif hist_access_type == 'range':
510 elif hist_access_type == 'range':
510 session = parent['content']['session']
511 session = parent['content']['session']
511 start = parent['content']['start']
512 start = parent['content']['start']
512 stop = parent['content']['stop']
513 stop = parent['content']['stop']
513 hist = self.shell.history_manager.get_range(session, start, stop,
514 hist = self.shell.history_manager.get_range(session, start, stop,
514 raw=raw, output=output)
515 raw=raw, output=output)
515
516
516 elif hist_access_type == 'search':
517 elif hist_access_type == 'search':
517 n = parent['content'].get('n')
518 n = parent['content'].get('n')
518 unique = parent['content'].get('unique', False)
519 unique = parent['content'].get('unique', False)
519 pattern = parent['content']['pattern']
520 pattern = parent['content']['pattern']
520 hist = self.shell.history_manager.search(
521 hist = self.shell.history_manager.search(
521 pattern, raw=raw, output=output, n=n, unique=unique)
522 pattern, raw=raw, output=output, n=n, unique=unique)
522
523
523 else:
524 else:
524 hist = []
525 hist = []
525 hist = list(hist)
526 hist = list(hist)
526 content = {'history' : hist}
527 content = {'history' : hist}
527 content = json_clean(content)
528 content = json_clean(content)
528 msg = self.session.send(stream, 'history_reply',
529 msg = self.session.send(stream, 'history_reply',
529 content, parent, ident)
530 content, parent, ident)
530 self.log.debug("Sending history reply with %i entries", len(hist))
531 self.log.debug("Sending history reply with %i entries", len(hist))
531
532
532 def connect_request(self, stream, ident, parent):
533 def connect_request(self, stream, ident, parent):
533 if self._recorded_ports is not None:
534 if self._recorded_ports is not None:
534 content = self._recorded_ports.copy()
535 content = self._recorded_ports.copy()
535 else:
536 else:
536 content = {}
537 content = {}
537 msg = self.session.send(stream, 'connect_reply',
538 msg = self.session.send(stream, 'connect_reply',
538 content, parent, ident)
539 content, parent, ident)
539 self.log.debug("%s", msg)
540 self.log.debug("%s", msg)
540
541
541 def kernel_info_request(self, stream, ident, parent):
542 def kernel_info_request(self, stream, ident, parent):
542 vinfo = {
543 vinfo = {
543 'protocol_version': protocol_version,
544 'protocol_version': protocol_version,
544 'ipython_version': ipython_version,
545 'ipython_version': ipython_version,
545 'language_version': language_version,
546 'language_version': language_version,
546 'language': 'python',
547 'language': 'python',
547 }
548 }
548 msg = self.session.send(stream, 'kernel_info_reply',
549 msg = self.session.send(stream, 'kernel_info_reply',
549 vinfo, parent, ident)
550 vinfo, parent, ident)
550 self.log.debug("%s", msg)
551 self.log.debug("%s", msg)
551
552
552 def shutdown_request(self, stream, ident, parent):
553 def shutdown_request(self, stream, ident, parent):
553 self.shell.exit_now = True
554 self.shell.exit_now = True
554 content = dict(status='ok')
555 content = dict(status='ok')
555 content.update(parent['content'])
556 content.update(parent['content'])
556 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
557 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
557 # same content, but different msg_id for broadcasting on IOPub
558 # same content, but different msg_id for broadcasting on IOPub
558 self._shutdown_message = self.session.msg(u'shutdown_reply',
559 self._shutdown_message = self.session.msg(u'shutdown_reply',
559 content, parent
560 content, parent
560 )
561 )
561
562
562 self._at_shutdown()
563 self._at_shutdown()
563 # call sys.exit after a short delay
564 # call sys.exit after a short delay
564 loop = ioloop.IOLoop.instance()
565 loop = ioloop.IOLoop.instance()
565 loop.add_timeout(time.time()+0.1, loop.stop)
566 loop.add_timeout(time.time()+0.1, loop.stop)
566
567
567 #---------------------------------------------------------------------------
568 #---------------------------------------------------------------------------
568 # Engine methods
569 # Engine methods
569 #---------------------------------------------------------------------------
570 #---------------------------------------------------------------------------
570
571
571 def apply_request(self, stream, ident, parent):
572 def apply_request(self, stream, ident, parent):
572 try:
573 try:
573 content = parent[u'content']
574 content = parent[u'content']
574 bufs = parent[u'buffers']
575 bufs = parent[u'buffers']
575 msg_id = parent['header']['msg_id']
576 msg_id = parent['header']['msg_id']
576 except:
577 except:
577 self.log.error("Got bad msg: %s", parent, exc_info=True)
578 self.log.error("Got bad msg: %s", parent, exc_info=True)
578 return
579 return
579
580
580 self._publish_status(u'busy', parent)
581 self._publish_status(u'busy', parent)
581
582
582 # Set the parent message of the display hook and out streams.
583 # Set the parent message of the display hook and out streams.
583 shell = self.shell
584 shell = self.shell
584 shell.set_parent(parent)
585 shell.set_parent(parent)
585
586
586 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
587 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
587 # self.iopub_socket.send(pyin_msg)
588 # self.iopub_socket.send(pyin_msg)
588 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
589 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
589 md = self._make_metadata(parent['metadata'])
590 md = self._make_metadata(parent['metadata'])
590 try:
591 try:
591 working = shell.user_ns
592 working = shell.user_ns
592
593
593 prefix = "_"+str(msg_id).replace("-","")+"_"
594 prefix = "_"+str(msg_id).replace("-","")+"_"
594
595
595 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
596 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
596
597
597 fname = getattr(f, '__name__', 'f')
598 fname = getattr(f, '__name__', 'f')
598
599
599 fname = prefix+"f"
600 fname = prefix+"f"
600 argname = prefix+"args"
601 argname = prefix+"args"
601 kwargname = prefix+"kwargs"
602 kwargname = prefix+"kwargs"
602 resultname = prefix+"result"
603 resultname = prefix+"result"
603
604
604 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
605 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
605 # print ns
606 # print ns
606 working.update(ns)
607 working.update(ns)
607 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
608 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
608 try:
609 try:
609 exec(code, shell.user_global_ns, shell.user_ns)
610 exec(code, shell.user_global_ns, shell.user_ns)
610 result = working.get(resultname)
611 result = working.get(resultname)
611 finally:
612 finally:
612 for key in ns:
613 for key in ns:
613 working.pop(key)
614 working.pop(key)
614
615
615 result_buf = serialize_object(result,
616 result_buf = serialize_object(result,
616 buffer_threshold=self.session.buffer_threshold,
617 buffer_threshold=self.session.buffer_threshold,
617 item_threshold=self.session.item_threshold,
618 item_threshold=self.session.item_threshold,
618 )
619 )
619
620
620 except:
621 except:
621 # invoke IPython traceback formatting
622 # invoke IPython traceback formatting
622 shell.showtraceback()
623 shell.showtraceback()
623 # FIXME - fish exception info out of shell, possibly left there by
624 # FIXME - fish exception info out of shell, possibly left there by
624 # run_code. We'll need to clean up this logic later.
625 # run_code. We'll need to clean up this logic later.
625 reply_content = {}
626 reply_content = {}
626 if shell._reply_content is not None:
627 if shell._reply_content is not None:
627 reply_content.update(shell._reply_content)
628 reply_content.update(shell._reply_content)
628 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
629 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
629 reply_content['engine_info'] = e_info
630 reply_content['engine_info'] = e_info
630 # reset after use
631 # reset after use
631 shell._reply_content = None
632 shell._reply_content = None
632
633
633 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
634 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
634 ident=self._topic('pyerr'))
635 ident=self._topic('pyerr'))
635 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
636 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
636 result_buf = []
637 result_buf = []
637
638
638 if reply_content['ename'] == 'UnmetDependency':
639 if reply_content['ename'] == 'UnmetDependency':
639 md['dependencies_met'] = False
640 md['dependencies_met'] = False
640 else:
641 else:
641 reply_content = {'status' : 'ok'}
642 reply_content = {'status' : 'ok'}
642
643
643 # put 'ok'/'error' status in header, for scheduler introspection:
644 # put 'ok'/'error' status in header, for scheduler introspection:
644 md['status'] = reply_content['status']
645 md['status'] = reply_content['status']
645
646
646 # flush i/o
647 # flush i/o
647 sys.stdout.flush()
648 sys.stdout.flush()
648 sys.stderr.flush()
649 sys.stderr.flush()
649
650
650 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
651 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
651 parent=parent, ident=ident,buffers=result_buf, metadata=md)
652 parent=parent, ident=ident,buffers=result_buf, metadata=md)
652
653
653 self._publish_status(u'idle', parent)
654 self._publish_status(u'idle', parent)
654
655
655 #---------------------------------------------------------------------------
656 #---------------------------------------------------------------------------
656 # Control messages
657 # Control messages
657 #---------------------------------------------------------------------------
658 #---------------------------------------------------------------------------
658
659
659 def abort_request(self, stream, ident, parent):
660 def abort_request(self, stream, ident, parent):
660 """abort a specifig msg by id"""
661 """abort a specifig msg by id"""
661 msg_ids = parent['content'].get('msg_ids', None)
662 msg_ids = parent['content'].get('msg_ids', None)
662 if isinstance(msg_ids, string_types):
663 if isinstance(msg_ids, string_types):
663 msg_ids = [msg_ids]
664 msg_ids = [msg_ids]
664 if not msg_ids:
665 if not msg_ids:
665 self.abort_queues()
666 self.abort_queues()
666 for mid in msg_ids:
667 for mid in msg_ids:
667 self.aborted.add(str(mid))
668 self.aborted.add(str(mid))
668
669
669 content = dict(status='ok')
670 content = dict(status='ok')
670 reply_msg = self.session.send(stream, 'abort_reply', content=content,
671 reply_msg = self.session.send(stream, 'abort_reply', content=content,
671 parent=parent, ident=ident)
672 parent=parent, ident=ident)
672 self.log.debug("%s", reply_msg)
673 self.log.debug("%s", reply_msg)
673
674
674 def clear_request(self, stream, idents, parent):
675 def clear_request(self, stream, idents, parent):
675 """Clear our namespace."""
676 """Clear our namespace."""
676 self.shell.reset(False)
677 self.shell.reset(False)
677 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
678 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
678 content = dict(status='ok'))
679 content = dict(status='ok'))
679
680
680
681
681 #---------------------------------------------------------------------------
682 #---------------------------------------------------------------------------
682 # Protected interface
683 # Protected interface
683 #---------------------------------------------------------------------------
684 #---------------------------------------------------------------------------
684
685
685 def _wrap_exception(self, method=None):
686 def _wrap_exception(self, method=None):
686 # import here, because _wrap_exception is only used in parallel,
687 # import here, because _wrap_exception is only used in parallel,
687 # and parallel has higher min pyzmq version
688 # and parallel has higher min pyzmq version
688 from IPython.parallel.error import wrap_exception
689 from IPython.parallel.error import wrap_exception
689 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
690 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
690 content = wrap_exception(e_info)
691 content = wrap_exception(e_info)
691 return content
692 return content
692
693
693 def _topic(self, topic):
694 def _topic(self, topic):
694 """prefixed topic for IOPub messages"""
695 """prefixed topic for IOPub messages"""
695 if self.int_id >= 0:
696 if self.int_id >= 0:
696 base = "engine.%i" % self.int_id
697 base = "engine.%i" % self.int_id
697 else:
698 else:
698 base = "kernel.%s" % self.ident
699 base = "kernel.%s" % self.ident
699
700
700 return py3compat.cast_bytes("%s.%s" % (base, topic))
701 return py3compat.cast_bytes("%s.%s" % (base, topic))
701
702
702 def _abort_queues(self):
703 def _abort_queues(self):
703 for stream in self.shell_streams:
704 for stream in self.shell_streams:
704 if stream:
705 if stream:
705 self._abort_queue(stream)
706 self._abort_queue(stream)
706
707
707 def _abort_queue(self, stream):
708 def _abort_queue(self, stream):
708 poller = zmq.Poller()
709 poller = zmq.Poller()
709 poller.register(stream.socket, zmq.POLLIN)
710 poller.register(stream.socket, zmq.POLLIN)
710 while True:
711 while True:
711 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
712 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
712 if msg is None:
713 if msg is None:
713 return
714 return
714
715
715 self.log.info("Aborting:")
716 self.log.info("Aborting:")
716 self.log.info("%s", msg)
717 self.log.info("%s", msg)
717 msg_type = msg['header']['msg_type']
718 msg_type = msg['header']['msg_type']
718 reply_type = msg_type.split('_')[0] + '_reply'
719 reply_type = msg_type.split('_')[0] + '_reply'
719
720
720 status = {'status' : 'aborted'}
721 status = {'status' : 'aborted'}
721 md = {'engine' : self.ident}
722 md = {'engine' : self.ident}
722 md.update(status)
723 md.update(status)
723 reply_msg = self.session.send(stream, reply_type, metadata=md,
724 reply_msg = self.session.send(stream, reply_type, metadata=md,
724 content=status, parent=msg, ident=idents)
725 content=status, parent=msg, ident=idents)
725 self.log.debug("%s", reply_msg)
726 self.log.debug("%s", reply_msg)
726 # We need to wait a bit for requests to come in. This can probably
727 # We need to wait a bit for requests to come in. This can probably
727 # be set shorter for true asynchronous clients.
728 # be set shorter for true asynchronous clients.
728 poller.poll(50)
729 poller.poll(50)
729
730
730
731
731 def _no_raw_input(self):
732 def _no_raw_input(self):
732 """Raise StdinNotImplentedError if active frontend doesn't support
733 """Raise StdinNotImplentedError if active frontend doesn't support
733 stdin."""
734 stdin."""
734 raise StdinNotImplementedError("raw_input was called, but this "
735 raise StdinNotImplementedError("raw_input was called, but this "
735 "frontend does not support stdin.")
736 "frontend does not support stdin.")
736
737
737 def _raw_input(self, prompt, ident, parent):
738 def _raw_input(self, prompt, ident, parent):
738 # Flush output before making the request.
739 # Flush output before making the request.
739 sys.stderr.flush()
740 sys.stderr.flush()
740 sys.stdout.flush()
741 sys.stdout.flush()
741 # flush the stdin socket, to purge stale replies
742 # flush the stdin socket, to purge stale replies
742 while True:
743 while True:
743 try:
744 try:
744 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
745 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
745 except zmq.ZMQError as e:
746 except zmq.ZMQError as e:
746 if e.errno == zmq.EAGAIN:
747 if e.errno == zmq.EAGAIN:
747 break
748 break
748 else:
749 else:
749 raise
750 raise
750
751
751 # Send the input request.
752 # Send the input request.
752 content = json_clean(dict(prompt=prompt))
753 content = json_clean(dict(prompt=prompt))
753 self.session.send(self.stdin_socket, u'input_request', content, parent,
754 self.session.send(self.stdin_socket, u'input_request', content, parent,
754 ident=ident)
755 ident=ident)
755
756
756 # Await a response.
757 # Await a response.
757 while True:
758 while True:
758 try:
759 try:
759 ident, reply = self.session.recv(self.stdin_socket, 0)
760 ident, reply = self.session.recv(self.stdin_socket, 0)
760 except Exception:
761 except Exception:
761 self.log.warn("Invalid Message:", exc_info=True)
762 self.log.warn("Invalid Message:", exc_info=True)
762 except KeyboardInterrupt:
763 except KeyboardInterrupt:
763 # re-raise KeyboardInterrupt, to truncate traceback
764 # re-raise KeyboardInterrupt, to truncate traceback
764 raise KeyboardInterrupt
765 raise KeyboardInterrupt
765 else:
766 else:
766 break
767 break
767 try:
768 try:
768 value = py3compat.unicode_to_str(reply['content']['value'])
769 value = py3compat.unicode_to_str(reply['content']['value'])
769 except:
770 except:
770 self.log.error("Got bad raw_input reply: ")
771 self.log.error("Got bad raw_input reply: ")
771 self.log.error("%s", parent)
772 self.log.error("%s", parent)
772 value = ''
773 value = ''
773 if value == '\x04':
774 if value == '\x04':
774 # EOF
775 # EOF
775 raise EOFError
776 raise EOFError
776 return value
777 return value
777
778
778 def _complete(self, msg):
779 def _complete(self, msg):
779 c = msg['content']
780 c = msg['content']
780 try:
781 try:
781 cpos = int(c['cursor_pos'])
782 cpos = int(c['cursor_pos'])
782 except:
783 except:
783 # If we don't get something that we can convert to an integer, at
784 # If we don't get something that we can convert to an integer, at
784 # least attempt the completion guessing the cursor is at the end of
785 # least attempt the completion guessing the cursor is at the end of
785 # the text, if there's any, and otherwise of the line
786 # the text, if there's any, and otherwise of the line
786 cpos = len(c['text'])
787 cpos = len(c['text'])
787 if cpos==0:
788 if cpos==0:
788 cpos = len(c['line'])
789 cpos = len(c['line'])
789 return self.shell.complete(c['text'], c['line'], cpos)
790 return self.shell.complete(c['text'], c['line'], cpos)
790
791
791 def _at_shutdown(self):
792 def _at_shutdown(self):
792 """Actions taken at shutdown by the kernel, called by python's atexit.
793 """Actions taken at shutdown by the kernel, called by python's atexit.
793 """
794 """
794 # io.rprint("Kernel at_shutdown") # dbg
795 # io.rprint("Kernel at_shutdown") # dbg
795 if self._shutdown_message is not None:
796 if self._shutdown_message is not None:
796 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
797 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
797 self.log.debug("%s", self._shutdown_message)
798 self.log.debug("%s", self._shutdown_message)
798 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
799 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
799
800
General Comments 0
You need to be logged in to leave comments. Login now