##// END OF EJS Templates
add banner to kernel_info_reply
MinRK -
Show More
@@ -1,850 +1,851 b''
1 """An interactive kernel that talks to frontends over 0MQ."""
1 """An interactive kernel that talks to frontends over 0MQ."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import print_function
6 from __future__ import print_function
7
7
8 import getpass
8 import getpass
9 import sys
9 import sys
10 import time
10 import time
11 import traceback
11 import traceback
12 import logging
12 import logging
13 import uuid
13 import uuid
14
14
15 from datetime import datetime
15 from datetime import datetime
16 from signal import (
16 from signal import (
17 signal, default_int_handler, SIGINT
17 signal, default_int_handler, SIGINT
18 )
18 )
19
19
20 import zmq
20 import zmq
21 from zmq.eventloop import ioloop
21 from zmq.eventloop import ioloop
22 from zmq.eventloop.zmqstream import ZMQStream
22 from zmq.eventloop.zmqstream import ZMQStream
23
23
24 from IPython.config.configurable import Configurable
24 from IPython.config.configurable import Configurable
25 from IPython.core.error import StdinNotImplementedError
25 from IPython.core.error import StdinNotImplementedError
26 from IPython.core import release
26 from IPython.core import release
27 from IPython.utils import py3compat
27 from IPython.utils import py3compat
28 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
28 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
29 from IPython.utils.jsonutil import json_clean
29 from IPython.utils.jsonutil import json_clean
30 from IPython.utils.tokenutil import token_at_cursor
30 from IPython.utils.tokenutil import token_at_cursor
31 from IPython.utils.traitlets import (
31 from IPython.utils.traitlets import (
32 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
32 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
33 Type, Bool,
33 Type, Bool,
34 )
34 )
35
35
36 from .serialize import serialize_object, unpack_apply_message
36 from .serialize import serialize_object, unpack_apply_message
37 from .session import Session
37 from .session import Session
38 from .zmqshell import ZMQInteractiveShell
38 from .zmqshell import ZMQInteractiveShell
39
39
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Main kernel class
42 # Main kernel class
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45 protocol_version = release.kernel_protocol_version
45 protocol_version = release.kernel_protocol_version
46 ipython_version = release.version
46 ipython_version = release.version
47 language_version = sys.version.split()[0]
47 language_version = sys.version.split()[0]
48
48
49
49
50 class Kernel(Configurable):
50 class Kernel(Configurable):
51
51
52 #---------------------------------------------------------------------------
52 #---------------------------------------------------------------------------
53 # Kernel interface
53 # Kernel interface
54 #---------------------------------------------------------------------------
54 #---------------------------------------------------------------------------
55
55
56 # attribute to override with a GUI
56 # attribute to override with a GUI
57 eventloop = Any(None)
57 eventloop = Any(None)
58 def _eventloop_changed(self, name, old, new):
58 def _eventloop_changed(self, name, old, new):
59 """schedule call to eventloop from IOLoop"""
59 """schedule call to eventloop from IOLoop"""
60 loop = ioloop.IOLoop.instance()
60 loop = ioloop.IOLoop.instance()
61 loop.add_callback(self.enter_eventloop)
61 loop.add_callback(self.enter_eventloop)
62
62
63 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
63 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
64 shell_class = Type(ZMQInteractiveShell)
64 shell_class = Type(ZMQInteractiveShell)
65
65
66 session = Instance(Session)
66 session = Instance(Session)
67 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
67 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
68 shell_streams = List()
68 shell_streams = List()
69 control_stream = Instance(ZMQStream)
69 control_stream = Instance(ZMQStream)
70 iopub_socket = Instance(zmq.Socket)
70 iopub_socket = Instance(zmq.Socket)
71 stdin_socket = Instance(zmq.Socket)
71 stdin_socket = Instance(zmq.Socket)
72 log = Instance(logging.Logger)
72 log = Instance(logging.Logger)
73
73
74 user_module = Any()
74 user_module = Any()
75 def _user_module_changed(self, name, old, new):
75 def _user_module_changed(self, name, old, new):
76 if self.shell is not None:
76 if self.shell is not None:
77 self.shell.user_module = new
77 self.shell.user_module = new
78
78
79 user_ns = Instance(dict, args=None, allow_none=True)
79 user_ns = Instance(dict, args=None, allow_none=True)
80 def _user_ns_changed(self, name, old, new):
80 def _user_ns_changed(self, name, old, new):
81 if self.shell is not None:
81 if self.shell is not None:
82 self.shell.user_ns = new
82 self.shell.user_ns = new
83 self.shell.init_user_ns()
83 self.shell.init_user_ns()
84
84
85 # identities:
85 # identities:
86 int_id = Integer(-1)
86 int_id = Integer(-1)
87 ident = Unicode()
87 ident = Unicode()
88
88
89 def _ident_default(self):
89 def _ident_default(self):
90 return unicode_type(uuid.uuid4())
90 return unicode_type(uuid.uuid4())
91
91
92 # Private interface
92 # Private interface
93
93
94 _darwin_app_nap = Bool(True, config=True,
94 _darwin_app_nap = Bool(True, config=True,
95 help="""Whether to use appnope for compatiblity with OS X App Nap.
95 help="""Whether to use appnope for compatiblity with OS X App Nap.
96
96
97 Only affects OS X >= 10.9.
97 Only affects OS X >= 10.9.
98 """
98 """
99 )
99 )
100
100
101 # track associations with current request
101 # track associations with current request
102 _allow_stdin = Bool(False)
102 _allow_stdin = Bool(False)
103 _parent_header = Dict()
103 _parent_header = Dict()
104 _parent_ident = Any(b'')
104 _parent_ident = Any(b'')
105 # Time to sleep after flushing the stdout/err buffers in each execute
105 # Time to sleep after flushing the stdout/err buffers in each execute
106 # cycle. While this introduces a hard limit on the minimal latency of the
106 # cycle. While this introduces a hard limit on the minimal latency of the
107 # execute cycle, it helps prevent output synchronization problems for
107 # execute cycle, it helps prevent output synchronization problems for
108 # clients.
108 # clients.
109 # Units are in seconds. The minimum zmq latency on local host is probably
109 # Units are in seconds. The minimum zmq latency on local host is probably
110 # ~150 microseconds, set this to 500us for now. We may need to increase it
110 # ~150 microseconds, set this to 500us for now. We may need to increase it
111 # a little if it's not enough after more interactive testing.
111 # a little if it's not enough after more interactive testing.
112 _execute_sleep = Float(0.0005, config=True)
112 _execute_sleep = Float(0.0005, config=True)
113
113
114 # Frequency of the kernel's event loop.
114 # Frequency of the kernel's event loop.
115 # Units are in seconds, kernel subclasses for GUI toolkits may need to
115 # Units are in seconds, kernel subclasses for GUI toolkits may need to
116 # adapt to milliseconds.
116 # adapt to milliseconds.
117 _poll_interval = Float(0.05, config=True)
117 _poll_interval = Float(0.05, config=True)
118
118
119 # If the shutdown was requested over the network, we leave here the
119 # If the shutdown was requested over the network, we leave here the
120 # necessary reply message so it can be sent by our registered atexit
120 # necessary reply message so it can be sent by our registered atexit
121 # handler. This ensures that the reply is only sent to clients truly at
121 # handler. This ensures that the reply is only sent to clients truly at
122 # the end of our shutdown process (which happens after the underlying
122 # the end of our shutdown process (which happens after the underlying
123 # IPython shell's own shutdown).
123 # IPython shell's own shutdown).
124 _shutdown_message = None
124 _shutdown_message = None
125
125
126 # This is a dict of port number that the kernel is listening on. It is set
126 # This is a dict of port number that the kernel is listening on. It is set
127 # by record_ports and used by connect_request.
127 # by record_ports and used by connect_request.
128 _recorded_ports = Dict()
128 _recorded_ports = Dict()
129
129
130 # A reference to the Python builtin 'raw_input' function.
130 # A reference to the Python builtin 'raw_input' function.
131 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
131 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
132 _sys_raw_input = Any()
132 _sys_raw_input = Any()
133 _sys_eval_input = Any()
133 _sys_eval_input = Any()
134
134
135 # set of aborted msg_ids
135 # set of aborted msg_ids
136 aborted = Set()
136 aborted = Set()
137
137
138
138
139 def __init__(self, **kwargs):
139 def __init__(self, **kwargs):
140 super(Kernel, self).__init__(**kwargs)
140 super(Kernel, self).__init__(**kwargs)
141
141
142 # Initialize the InteractiveShell subclass
142 # Initialize the InteractiveShell subclass
143 self.shell = self.shell_class.instance(parent=self,
143 self.shell = self.shell_class.instance(parent=self,
144 profile_dir = self.profile_dir,
144 profile_dir = self.profile_dir,
145 user_module = self.user_module,
145 user_module = self.user_module,
146 user_ns = self.user_ns,
146 user_ns = self.user_ns,
147 kernel = self,
147 kernel = self,
148 )
148 )
149 self.shell.displayhook.session = self.session
149 self.shell.displayhook.session = self.session
150 self.shell.displayhook.pub_socket = self.iopub_socket
150 self.shell.displayhook.pub_socket = self.iopub_socket
151 self.shell.displayhook.topic = self._topic('execute_result')
151 self.shell.displayhook.topic = self._topic('execute_result')
152 self.shell.display_pub.session = self.session
152 self.shell.display_pub.session = self.session
153 self.shell.display_pub.pub_socket = self.iopub_socket
153 self.shell.display_pub.pub_socket = self.iopub_socket
154 self.shell.data_pub.session = self.session
154 self.shell.data_pub.session = self.session
155 self.shell.data_pub.pub_socket = self.iopub_socket
155 self.shell.data_pub.pub_socket = self.iopub_socket
156
156
157 # TMP - hack while developing
157 # TMP - hack while developing
158 self.shell._reply_content = None
158 self.shell._reply_content = None
159
159
160 # Build dict of handlers for message types
160 # Build dict of handlers for message types
161 msg_types = [ 'execute_request', 'complete_request',
161 msg_types = [ 'execute_request', 'complete_request',
162 'object_info_request', 'history_request',
162 'object_info_request', 'history_request',
163 'kernel_info_request',
163 'kernel_info_request',
164 'connect_request', 'shutdown_request',
164 'connect_request', 'shutdown_request',
165 'apply_request',
165 'apply_request',
166 ]
166 ]
167 self.shell_handlers = {}
167 self.shell_handlers = {}
168 for msg_type in msg_types:
168 for msg_type in msg_types:
169 self.shell_handlers[msg_type] = getattr(self, msg_type)
169 self.shell_handlers[msg_type] = getattr(self, msg_type)
170
170
171 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
171 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
172 comm_manager = self.shell.comm_manager
172 comm_manager = self.shell.comm_manager
173 for msg_type in comm_msg_types:
173 for msg_type in comm_msg_types:
174 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
174 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
175
175
176 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
176 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
177 self.control_handlers = {}
177 self.control_handlers = {}
178 for msg_type in control_msg_types:
178 for msg_type in control_msg_types:
179 self.control_handlers[msg_type] = getattr(self, msg_type)
179 self.control_handlers[msg_type] = getattr(self, msg_type)
180
180
181
181
182 def dispatch_control(self, msg):
182 def dispatch_control(self, msg):
183 """dispatch control requests"""
183 """dispatch control requests"""
184 idents,msg = self.session.feed_identities(msg, copy=False)
184 idents,msg = self.session.feed_identities(msg, copy=False)
185 try:
185 try:
186 msg = self.session.unserialize(msg, content=True, copy=False)
186 msg = self.session.unserialize(msg, content=True, copy=False)
187 except:
187 except:
188 self.log.error("Invalid Control Message", exc_info=True)
188 self.log.error("Invalid Control Message", exc_info=True)
189 return
189 return
190
190
191 self.log.debug("Control received: %s", msg)
191 self.log.debug("Control received: %s", msg)
192
192
193 header = msg['header']
193 header = msg['header']
194 msg_id = header['msg_id']
194 msg_id = header['msg_id']
195 msg_type = header['msg_type']
195 msg_type = header['msg_type']
196
196
197 handler = self.control_handlers.get(msg_type, None)
197 handler = self.control_handlers.get(msg_type, None)
198 if handler is None:
198 if handler is None:
199 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
199 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
200 else:
200 else:
201 try:
201 try:
202 handler(self.control_stream, idents, msg)
202 handler(self.control_stream, idents, msg)
203 except Exception:
203 except Exception:
204 self.log.error("Exception in control handler:", exc_info=True)
204 self.log.error("Exception in control handler:", exc_info=True)
205
205
206 def dispatch_shell(self, stream, msg):
206 def dispatch_shell(self, stream, msg):
207 """dispatch shell requests"""
207 """dispatch shell requests"""
208 # flush control requests first
208 # flush control requests first
209 if self.control_stream:
209 if self.control_stream:
210 self.control_stream.flush()
210 self.control_stream.flush()
211
211
212 idents,msg = self.session.feed_identities(msg, copy=False)
212 idents,msg = self.session.feed_identities(msg, copy=False)
213 try:
213 try:
214 msg = self.session.unserialize(msg, content=True, copy=False)
214 msg = self.session.unserialize(msg, content=True, copy=False)
215 except:
215 except:
216 self.log.error("Invalid Message", exc_info=True)
216 self.log.error("Invalid Message", exc_info=True)
217 return
217 return
218
218
219 header = msg['header']
219 header = msg['header']
220 msg_id = header['msg_id']
220 msg_id = header['msg_id']
221 msg_type = msg['header']['msg_type']
221 msg_type = msg['header']['msg_type']
222
222
223 # Print some info about this message and leave a '--->' marker, so it's
223 # Print some info about this message and leave a '--->' marker, so it's
224 # easier to trace visually the message chain when debugging. Each
224 # easier to trace visually the message chain when debugging. Each
225 # handler prints its message at the end.
225 # handler prints its message at the end.
226 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
226 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
227 self.log.debug(' Content: %s\n --->\n ', msg['content'])
227 self.log.debug(' Content: %s\n --->\n ', msg['content'])
228
228
229 if msg_id in self.aborted:
229 if msg_id in self.aborted:
230 self.aborted.remove(msg_id)
230 self.aborted.remove(msg_id)
231 # is it safe to assume a msg_id will not be resubmitted?
231 # is it safe to assume a msg_id will not be resubmitted?
232 reply_type = msg_type.split('_')[0] + '_reply'
232 reply_type = msg_type.split('_')[0] + '_reply'
233 status = {'status' : 'aborted'}
233 status = {'status' : 'aborted'}
234 md = {'engine' : self.ident}
234 md = {'engine' : self.ident}
235 md.update(status)
235 md.update(status)
236 reply_msg = self.session.send(stream, reply_type, metadata=md,
236 reply_msg = self.session.send(stream, reply_type, metadata=md,
237 content=status, parent=msg, ident=idents)
237 content=status, parent=msg, ident=idents)
238 return
238 return
239
239
240 handler = self.shell_handlers.get(msg_type, None)
240 handler = self.shell_handlers.get(msg_type, None)
241 if handler is None:
241 if handler is None:
242 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
242 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
243 else:
243 else:
244 # ensure default_int_handler during handler call
244 # ensure default_int_handler during handler call
245 sig = signal(SIGINT, default_int_handler)
245 sig = signal(SIGINT, default_int_handler)
246 self.log.debug("%s: %s", msg_type, msg)
246 self.log.debug("%s: %s", msg_type, msg)
247 try:
247 try:
248 handler(stream, idents, msg)
248 handler(stream, idents, msg)
249 except Exception:
249 except Exception:
250 self.log.error("Exception in message handler:", exc_info=True)
250 self.log.error("Exception in message handler:", exc_info=True)
251 finally:
251 finally:
252 signal(SIGINT, sig)
252 signal(SIGINT, sig)
253
253
254 def enter_eventloop(self):
254 def enter_eventloop(self):
255 """enter eventloop"""
255 """enter eventloop"""
256 self.log.info("entering eventloop %s", self.eventloop)
256 self.log.info("entering eventloop %s", self.eventloop)
257 for stream in self.shell_streams:
257 for stream in self.shell_streams:
258 # flush any pending replies,
258 # flush any pending replies,
259 # which may be skipped by entering the eventloop
259 # which may be skipped by entering the eventloop
260 stream.flush(zmq.POLLOUT)
260 stream.flush(zmq.POLLOUT)
261 # restore default_int_handler
261 # restore default_int_handler
262 signal(SIGINT, default_int_handler)
262 signal(SIGINT, default_int_handler)
263 while self.eventloop is not None:
263 while self.eventloop is not None:
264 try:
264 try:
265 self.eventloop(self)
265 self.eventloop(self)
266 except KeyboardInterrupt:
266 except KeyboardInterrupt:
267 # Ctrl-C shouldn't crash the kernel
267 # Ctrl-C shouldn't crash the kernel
268 self.log.error("KeyboardInterrupt caught in kernel")
268 self.log.error("KeyboardInterrupt caught in kernel")
269 continue
269 continue
270 else:
270 else:
271 # eventloop exited cleanly, this means we should stop (right?)
271 # eventloop exited cleanly, this means we should stop (right?)
272 self.eventloop = None
272 self.eventloop = None
273 break
273 break
274 self.log.info("exiting eventloop")
274 self.log.info("exiting eventloop")
275
275
276 def start(self):
276 def start(self):
277 """register dispatchers for streams"""
277 """register dispatchers for streams"""
278 self.shell.exit_now = False
278 self.shell.exit_now = False
279 if self.control_stream:
279 if self.control_stream:
280 self.control_stream.on_recv(self.dispatch_control, copy=False)
280 self.control_stream.on_recv(self.dispatch_control, copy=False)
281
281
282 def make_dispatcher(stream):
282 def make_dispatcher(stream):
283 def dispatcher(msg):
283 def dispatcher(msg):
284 return self.dispatch_shell(stream, msg)
284 return self.dispatch_shell(stream, msg)
285 return dispatcher
285 return dispatcher
286
286
287 for s in self.shell_streams:
287 for s in self.shell_streams:
288 s.on_recv(make_dispatcher(s), copy=False)
288 s.on_recv(make_dispatcher(s), copy=False)
289
289
290 # publish idle status
290 # publish idle status
291 self._publish_status('starting')
291 self._publish_status('starting')
292
292
293 def do_one_iteration(self):
293 def do_one_iteration(self):
294 """step eventloop just once"""
294 """step eventloop just once"""
295 if self.control_stream:
295 if self.control_stream:
296 self.control_stream.flush()
296 self.control_stream.flush()
297 for stream in self.shell_streams:
297 for stream in self.shell_streams:
298 # handle at most one request per iteration
298 # handle at most one request per iteration
299 stream.flush(zmq.POLLIN, 1)
299 stream.flush(zmq.POLLIN, 1)
300 stream.flush(zmq.POLLOUT)
300 stream.flush(zmq.POLLOUT)
301
301
302
302
303 def record_ports(self, ports):
303 def record_ports(self, ports):
304 """Record the ports that this kernel is using.
304 """Record the ports that this kernel is using.
305
305
306 The creator of the Kernel instance must call this methods if they
306 The creator of the Kernel instance must call this methods if they
307 want the :meth:`connect_request` method to return the port numbers.
307 want the :meth:`connect_request` method to return the port numbers.
308 """
308 """
309 self._recorded_ports = ports
309 self._recorded_ports = ports
310
310
311 #---------------------------------------------------------------------------
311 #---------------------------------------------------------------------------
312 # Kernel request handlers
312 # Kernel request handlers
313 #---------------------------------------------------------------------------
313 #---------------------------------------------------------------------------
314
314
315 def _make_metadata(self, other=None):
315 def _make_metadata(self, other=None):
316 """init metadata dict, for execute/apply_reply"""
316 """init metadata dict, for execute/apply_reply"""
317 new_md = {
317 new_md = {
318 'dependencies_met' : True,
318 'dependencies_met' : True,
319 'engine' : self.ident,
319 'engine' : self.ident,
320 'started': datetime.now(),
320 'started': datetime.now(),
321 }
321 }
322 if other:
322 if other:
323 new_md.update(other)
323 new_md.update(other)
324 return new_md
324 return new_md
325
325
326 def _publish_execute_input(self, code, parent, execution_count):
326 def _publish_execute_input(self, code, parent, execution_count):
327 """Publish the code request on the iopub stream."""
327 """Publish the code request on the iopub stream."""
328
328
329 self.session.send(self.iopub_socket, u'execute_input',
329 self.session.send(self.iopub_socket, u'execute_input',
330 {u'code':code, u'execution_count': execution_count},
330 {u'code':code, u'execution_count': execution_count},
331 parent=parent, ident=self._topic('execute_input')
331 parent=parent, ident=self._topic('execute_input')
332 )
332 )
333
333
334 def _publish_status(self, status, parent=None):
334 def _publish_status(self, status, parent=None):
335 """send status (busy/idle) on IOPub"""
335 """send status (busy/idle) on IOPub"""
336 self.session.send(self.iopub_socket,
336 self.session.send(self.iopub_socket,
337 u'status',
337 u'status',
338 {u'execution_state': status},
338 {u'execution_state': status},
339 parent=parent,
339 parent=parent,
340 ident=self._topic('status'),
340 ident=self._topic('status'),
341 )
341 )
342
342
343 def _forward_input(self, allow_stdin=False):
343 def _forward_input(self, allow_stdin=False):
344 """Forward raw_input and getpass to the current frontend.
344 """Forward raw_input and getpass to the current frontend.
345
345
346 via input_request
346 via input_request
347 """
347 """
348 self._allow_stdin = allow_stdin
348 self._allow_stdin = allow_stdin
349
349
350 if py3compat.PY3:
350 if py3compat.PY3:
351 self._sys_raw_input = builtin_mod.input
351 self._sys_raw_input = builtin_mod.input
352 builtin_mod.input = self.raw_input
352 builtin_mod.input = self.raw_input
353 else:
353 else:
354 self._sys_raw_input = builtin_mod.raw_input
354 self._sys_raw_input = builtin_mod.raw_input
355 self._sys_eval_input = builtin_mod.input
355 self._sys_eval_input = builtin_mod.input
356 builtin_mod.raw_input = self.raw_input
356 builtin_mod.raw_input = self.raw_input
357 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
357 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
358 self._save_getpass = getpass.getpass
358 self._save_getpass = getpass.getpass
359 getpass.getpass = self.getpass
359 getpass.getpass = self.getpass
360
360
361 def _restore_input(self):
361 def _restore_input(self):
362 """Restore raw_input, getpass"""
362 """Restore raw_input, getpass"""
363 if py3compat.PY3:
363 if py3compat.PY3:
364 builtin_mod.input = self._sys_raw_input
364 builtin_mod.input = self._sys_raw_input
365 else:
365 else:
366 builtin_mod.raw_input = self._sys_raw_input
366 builtin_mod.raw_input = self._sys_raw_input
367 builtin_mod.input = self._sys_eval_input
367 builtin_mod.input = self._sys_eval_input
368
368
369 getpass.getpass = self._save_getpass
369 getpass.getpass = self._save_getpass
370
370
371 def set_parent(self, ident, parent):
371 def set_parent(self, ident, parent):
372 """Record the parent state
372 """Record the parent state
373
373
374 For associating side effects with their requests.
374 For associating side effects with their requests.
375 """
375 """
376 self._parent_ident = ident
376 self._parent_ident = ident
377 self._parent_header = parent
377 self._parent_header = parent
378 self.shell.set_parent(parent)
378 self.shell.set_parent(parent)
379
379
380 def execute_request(self, stream, ident, parent):
380 def execute_request(self, stream, ident, parent):
381 """handle an execute_request"""
381 """handle an execute_request"""
382
382
383 self._publish_status(u'busy', parent)
383 self._publish_status(u'busy', parent)
384
384
385 try:
385 try:
386 content = parent[u'content']
386 content = parent[u'content']
387 code = py3compat.cast_unicode_py2(content[u'code'])
387 code = py3compat.cast_unicode_py2(content[u'code'])
388 silent = content[u'silent']
388 silent = content[u'silent']
389 store_history = content.get(u'store_history', not silent)
389 store_history = content.get(u'store_history', not silent)
390 except:
390 except:
391 self.log.error("Got bad msg: ")
391 self.log.error("Got bad msg: ")
392 self.log.error("%s", parent)
392 self.log.error("%s", parent)
393 return
393 return
394
394
395 md = self._make_metadata(parent['metadata'])
395 md = self._make_metadata(parent['metadata'])
396
396
397 shell = self.shell # we'll need this a lot here
397 shell = self.shell # we'll need this a lot here
398
398
399 self._forward_input(content.get('allow_stdin', False))
399 self._forward_input(content.get('allow_stdin', False))
400 # Set the parent message of the display hook and out streams.
400 # Set the parent message of the display hook and out streams.
401 self.set_parent(ident, parent)
401 self.set_parent(ident, parent)
402
402
403 # Re-broadcast our input for the benefit of listening clients, and
403 # Re-broadcast our input for the benefit of listening clients, and
404 # start computing output
404 # start computing output
405 if not silent:
405 if not silent:
406 self._publish_execute_input(code, parent, shell.execution_count)
406 self._publish_execute_input(code, parent, shell.execution_count)
407
407
408 reply_content = {}
408 reply_content = {}
409 # FIXME: the shell calls the exception handler itself.
409 # FIXME: the shell calls the exception handler itself.
410 shell._reply_content = None
410 shell._reply_content = None
411 try:
411 try:
412 shell.run_cell(code, store_history=store_history, silent=silent)
412 shell.run_cell(code, store_history=store_history, silent=silent)
413 except:
413 except:
414 status = u'error'
414 status = u'error'
415 # FIXME: this code right now isn't being used yet by default,
415 # FIXME: this code right now isn't being used yet by default,
416 # because the run_cell() call above directly fires off exception
416 # because the run_cell() call above directly fires off exception
417 # reporting. This code, therefore, is only active in the scenario
417 # reporting. This code, therefore, is only active in the scenario
418 # where runlines itself has an unhandled exception. We need to
418 # where runlines itself has an unhandled exception. We need to
419 # uniformize this, for all exception construction to come from a
419 # uniformize this, for all exception construction to come from a
420 # single location in the codbase.
420 # single location in the codbase.
421 etype, evalue, tb = sys.exc_info()
421 etype, evalue, tb = sys.exc_info()
422 tb_list = traceback.format_exception(etype, evalue, tb)
422 tb_list = traceback.format_exception(etype, evalue, tb)
423 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
423 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
424 else:
424 else:
425 status = u'ok'
425 status = u'ok'
426 finally:
426 finally:
427 self._restore_input()
427 self._restore_input()
428
428
429 reply_content[u'status'] = status
429 reply_content[u'status'] = status
430
430
431 # Return the execution counter so clients can display prompts
431 # Return the execution counter so clients can display prompts
432 reply_content['execution_count'] = shell.execution_count - 1
432 reply_content['execution_count'] = shell.execution_count - 1
433
433
434 # FIXME - fish exception info out of shell, possibly left there by
434 # FIXME - fish exception info out of shell, possibly left there by
435 # runlines. We'll need to clean up this logic later.
435 # runlines. We'll need to clean up this logic later.
436 if shell._reply_content is not None:
436 if shell._reply_content is not None:
437 reply_content.update(shell._reply_content)
437 reply_content.update(shell._reply_content)
438 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
438 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
439 reply_content['engine_info'] = e_info
439 reply_content['engine_info'] = e_info
440 # reset after use
440 # reset after use
441 shell._reply_content = None
441 shell._reply_content = None
442
442
443 if 'traceback' in reply_content:
443 if 'traceback' in reply_content:
444 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
444 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
445
445
446
446
447 # At this point, we can tell whether the main code execution succeeded
447 # At this point, we can tell whether the main code execution succeeded
448 # or not. If it did, we proceed to evaluate user_expressions
448 # or not. If it did, we proceed to evaluate user_expressions
449 if reply_content['status'] == 'ok':
449 if reply_content['status'] == 'ok':
450 reply_content[u'user_expressions'] = \
450 reply_content[u'user_expressions'] = \
451 shell.user_expressions(content.get(u'user_expressions', {}))
451 shell.user_expressions(content.get(u'user_expressions', {}))
452 else:
452 else:
453 # If there was an error, don't even try to compute expressions
453 # If there was an error, don't even try to compute expressions
454 reply_content[u'user_expressions'] = {}
454 reply_content[u'user_expressions'] = {}
455
455
456 # Payloads should be retrieved regardless of outcome, so we can both
456 # Payloads should be retrieved regardless of outcome, so we can both
457 # recover partial output (that could have been generated early in a
457 # recover partial output (that could have been generated early in a
458 # block, before an error) and clear the payload system always.
458 # block, before an error) and clear the payload system always.
459 reply_content[u'payload'] = shell.payload_manager.read_payload()
459 reply_content[u'payload'] = shell.payload_manager.read_payload()
460 # Be agressive about clearing the payload because we don't want
460 # Be agressive about clearing the payload because we don't want
461 # it to sit in memory until the next execute_request comes in.
461 # it to sit in memory until the next execute_request comes in.
462 shell.payload_manager.clear_payload()
462 shell.payload_manager.clear_payload()
463
463
464 # Flush output before sending the reply.
464 # Flush output before sending the reply.
465 sys.stdout.flush()
465 sys.stdout.flush()
466 sys.stderr.flush()
466 sys.stderr.flush()
467 # FIXME: on rare occasions, the flush doesn't seem to make it to the
467 # FIXME: on rare occasions, the flush doesn't seem to make it to the
468 # clients... This seems to mitigate the problem, but we definitely need
468 # clients... This seems to mitigate the problem, but we definitely need
469 # to better understand what's going on.
469 # to better understand what's going on.
470 if self._execute_sleep:
470 if self._execute_sleep:
471 time.sleep(self._execute_sleep)
471 time.sleep(self._execute_sleep)
472
472
473 # Send the reply.
473 # Send the reply.
474 reply_content = json_clean(reply_content)
474 reply_content = json_clean(reply_content)
475
475
476 md['status'] = reply_content['status']
476 md['status'] = reply_content['status']
477 if reply_content['status'] == 'error' and \
477 if reply_content['status'] == 'error' and \
478 reply_content['ename'] == 'UnmetDependency':
478 reply_content['ename'] == 'UnmetDependency':
479 md['dependencies_met'] = False
479 md['dependencies_met'] = False
480
480
481 reply_msg = self.session.send(stream, u'execute_reply',
481 reply_msg = self.session.send(stream, u'execute_reply',
482 reply_content, parent, metadata=md,
482 reply_content, parent, metadata=md,
483 ident=ident)
483 ident=ident)
484
484
485 self.log.debug("%s", reply_msg)
485 self.log.debug("%s", reply_msg)
486
486
487 if not silent and reply_msg['content']['status'] == u'error':
487 if not silent and reply_msg['content']['status'] == u'error':
488 self._abort_queues()
488 self._abort_queues()
489
489
490 self._publish_status(u'idle', parent)
490 self._publish_status(u'idle', parent)
491
491
492 def complete_request(self, stream, ident, parent):
492 def complete_request(self, stream, ident, parent):
493 content = parent['content']
493 content = parent['content']
494 code = content['code']
494 code = content['code']
495 cursor_pos = content['cursor_pos']
495 cursor_pos = content['cursor_pos']
496
496
497 txt, matches = self.shell.complete('', code, cursor_pos)
497 txt, matches = self.shell.complete('', code, cursor_pos)
498 matches = {'matches' : matches,
498 matches = {'matches' : matches,
499 'matched_text' : txt,
499 'matched_text' : txt,
500 'status' : 'ok'}
500 'status' : 'ok'}
501 matches = json_clean(matches)
501 matches = json_clean(matches)
502 completion_msg = self.session.send(stream, 'complete_reply',
502 completion_msg = self.session.send(stream, 'complete_reply',
503 matches, parent, ident)
503 matches, parent, ident)
504 self.log.debug("%s", completion_msg)
504 self.log.debug("%s", completion_msg)
505
505
506 def object_info_request(self, stream, ident, parent):
506 def object_info_request(self, stream, ident, parent):
507 content = parent['content']
507 content = parent['content']
508
508
509 name = token_at_cursor(content['code'], content['cursor_pos'])
509 name = token_at_cursor(content['code'], content['cursor_pos'])
510 info = self.shell.object_inspect(name)
510 info = self.shell.object_inspect(name)
511
511
512 reply_content = {'status' : 'ok'}
512 reply_content = {'status' : 'ok'}
513 reply_content['data'] = data = {}
513 reply_content['data'] = data = {}
514 reply_content['metadata'] = {}
514 reply_content['metadata'] = {}
515 reply_content['name'] = name
515 reply_content['name'] = name
516 reply_content['found'] = info['found']
516 reply_content['found'] = info['found']
517 if info['found']:
517 if info['found']:
518 info_text = self.shell.object_inspect_text(
518 info_text = self.shell.object_inspect_text(
519 name,
519 name,
520 detail_level=content.get('detail_level', 0),
520 detail_level=content.get('detail_level', 0),
521 )
521 )
522 reply_content['data']['text/plain'] = info_text
522 reply_content['data']['text/plain'] = info_text
523 # Before we send this object over, we scrub it for JSON usage
523 # Before we send this object over, we scrub it for JSON usage
524 reply_content = json_clean(reply_content)
524 reply_content = json_clean(reply_content)
525 msg = self.session.send(stream, 'object_info_reply',
525 msg = self.session.send(stream, 'object_info_reply',
526 reply_content, parent, ident)
526 reply_content, parent, ident)
527 self.log.debug("%s", msg)
527 self.log.debug("%s", msg)
528
528
529 def history_request(self, stream, ident, parent):
529 def history_request(self, stream, ident, parent):
530 # We need to pull these out, as passing **kwargs doesn't work with
530 # We need to pull these out, as passing **kwargs doesn't work with
531 # unicode keys before Python 2.6.5.
531 # unicode keys before Python 2.6.5.
532 hist_access_type = parent['content']['hist_access_type']
532 hist_access_type = parent['content']['hist_access_type']
533 raw = parent['content']['raw']
533 raw = parent['content']['raw']
534 output = parent['content']['output']
534 output = parent['content']['output']
535 if hist_access_type == 'tail':
535 if hist_access_type == 'tail':
536 n = parent['content']['n']
536 n = parent['content']['n']
537 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
537 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
538 include_latest=True)
538 include_latest=True)
539
539
540 elif hist_access_type == 'range':
540 elif hist_access_type == 'range':
541 session = parent['content']['session']
541 session = parent['content']['session']
542 start = parent['content']['start']
542 start = parent['content']['start']
543 stop = parent['content']['stop']
543 stop = parent['content']['stop']
544 hist = self.shell.history_manager.get_range(session, start, stop,
544 hist = self.shell.history_manager.get_range(session, start, stop,
545 raw=raw, output=output)
545 raw=raw, output=output)
546
546
547 elif hist_access_type == 'search':
547 elif hist_access_type == 'search':
548 n = parent['content'].get('n')
548 n = parent['content'].get('n')
549 unique = parent['content'].get('unique', False)
549 unique = parent['content'].get('unique', False)
550 pattern = parent['content']['pattern']
550 pattern = parent['content']['pattern']
551 hist = self.shell.history_manager.search(
551 hist = self.shell.history_manager.search(
552 pattern, raw=raw, output=output, n=n, unique=unique)
552 pattern, raw=raw, output=output, n=n, unique=unique)
553
553
554 else:
554 else:
555 hist = []
555 hist = []
556 hist = list(hist)
556 hist = list(hist)
557 content = {'history' : hist}
557 content = {'history' : hist}
558 content = json_clean(content)
558 content = json_clean(content)
559 msg = self.session.send(stream, 'history_reply',
559 msg = self.session.send(stream, 'history_reply',
560 content, parent, ident)
560 content, parent, ident)
561 self.log.debug("Sending history reply with %i entries", len(hist))
561 self.log.debug("Sending history reply with %i entries", len(hist))
562
562
563 def connect_request(self, stream, ident, parent):
563 def connect_request(self, stream, ident, parent):
564 if self._recorded_ports is not None:
564 if self._recorded_ports is not None:
565 content = self._recorded_ports.copy()
565 content = self._recorded_ports.copy()
566 else:
566 else:
567 content = {}
567 content = {}
568 msg = self.session.send(stream, 'connect_reply',
568 msg = self.session.send(stream, 'connect_reply',
569 content, parent, ident)
569 content, parent, ident)
570 self.log.debug("%s", msg)
570 self.log.debug("%s", msg)
571
571
572 def kernel_info_request(self, stream, ident, parent):
572 def kernel_info_request(self, stream, ident, parent):
573 vinfo = {
573 vinfo = {
574 'protocol_version': protocol_version,
574 'protocol_version': protocol_version,
575 'ipython_version': ipython_version,
575 'ipython_version': ipython_version,
576 'language_version': language_version,
576 'language_version': language_version,
577 'language': 'python',
577 'language': 'python',
578 'banner': self.shell.banner,
578 }
579 }
579 msg = self.session.send(stream, 'kernel_info_reply',
580 msg = self.session.send(stream, 'kernel_info_reply',
580 vinfo, parent, ident)
581 vinfo, parent, ident)
581 self.log.debug("%s", msg)
582 self.log.debug("%s", msg)
582
583
583 def shutdown_request(self, stream, ident, parent):
584 def shutdown_request(self, stream, ident, parent):
584 self.shell.exit_now = True
585 self.shell.exit_now = True
585 content = dict(status='ok')
586 content = dict(status='ok')
586 content.update(parent['content'])
587 content.update(parent['content'])
587 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
588 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
588 # same content, but different msg_id for broadcasting on IOPub
589 # same content, but different msg_id for broadcasting on IOPub
589 self._shutdown_message = self.session.msg(u'shutdown_reply',
590 self._shutdown_message = self.session.msg(u'shutdown_reply',
590 content, parent
591 content, parent
591 )
592 )
592
593
593 self._at_shutdown()
594 self._at_shutdown()
594 # call sys.exit after a short delay
595 # call sys.exit after a short delay
595 loop = ioloop.IOLoop.instance()
596 loop = ioloop.IOLoop.instance()
596 loop.add_timeout(time.time()+0.1, loop.stop)
597 loop.add_timeout(time.time()+0.1, loop.stop)
597
598
598 #---------------------------------------------------------------------------
599 #---------------------------------------------------------------------------
599 # Engine methods
600 # Engine methods
600 #---------------------------------------------------------------------------
601 #---------------------------------------------------------------------------
601
602
602 def apply_request(self, stream, ident, parent):
603 def apply_request(self, stream, ident, parent):
603 try:
604 try:
604 content = parent[u'content']
605 content = parent[u'content']
605 bufs = parent[u'buffers']
606 bufs = parent[u'buffers']
606 msg_id = parent['header']['msg_id']
607 msg_id = parent['header']['msg_id']
607 except:
608 except:
608 self.log.error("Got bad msg: %s", parent, exc_info=True)
609 self.log.error("Got bad msg: %s", parent, exc_info=True)
609 return
610 return
610
611
611 self._publish_status(u'busy', parent)
612 self._publish_status(u'busy', parent)
612
613
613 # Set the parent message of the display hook and out streams.
614 # Set the parent message of the display hook and out streams.
614 shell = self.shell
615 shell = self.shell
615 shell.set_parent(parent)
616 shell.set_parent(parent)
616
617
617 # execute_input_msg = self.session.msg(u'execute_input',{u'code':code}, parent=parent)
618 # execute_input_msg = self.session.msg(u'execute_input',{u'code':code}, parent=parent)
618 # self.iopub_socket.send(execute_input_msg)
619 # self.iopub_socket.send(execute_input_msg)
619 # self.session.send(self.iopub_socket, u'execute_input', {u'code':code},parent=parent)
620 # self.session.send(self.iopub_socket, u'execute_input', {u'code':code},parent=parent)
620 md = self._make_metadata(parent['metadata'])
621 md = self._make_metadata(parent['metadata'])
621 try:
622 try:
622 working = shell.user_ns
623 working = shell.user_ns
623
624
624 prefix = "_"+str(msg_id).replace("-","")+"_"
625 prefix = "_"+str(msg_id).replace("-","")+"_"
625
626
626 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
627 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
627
628
628 fname = getattr(f, '__name__', 'f')
629 fname = getattr(f, '__name__', 'f')
629
630
630 fname = prefix+"f"
631 fname = prefix+"f"
631 argname = prefix+"args"
632 argname = prefix+"args"
632 kwargname = prefix+"kwargs"
633 kwargname = prefix+"kwargs"
633 resultname = prefix+"result"
634 resultname = prefix+"result"
634
635
635 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
636 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
636 # print ns
637 # print ns
637 working.update(ns)
638 working.update(ns)
638 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
639 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
639 try:
640 try:
640 exec(code, shell.user_global_ns, shell.user_ns)
641 exec(code, shell.user_global_ns, shell.user_ns)
641 result = working.get(resultname)
642 result = working.get(resultname)
642 finally:
643 finally:
643 for key in ns:
644 for key in ns:
644 working.pop(key)
645 working.pop(key)
645
646
646 result_buf = serialize_object(result,
647 result_buf = serialize_object(result,
647 buffer_threshold=self.session.buffer_threshold,
648 buffer_threshold=self.session.buffer_threshold,
648 item_threshold=self.session.item_threshold,
649 item_threshold=self.session.item_threshold,
649 )
650 )
650
651
651 except:
652 except:
652 # invoke IPython traceback formatting
653 # invoke IPython traceback formatting
653 shell.showtraceback()
654 shell.showtraceback()
654 # FIXME - fish exception info out of shell, possibly left there by
655 # FIXME - fish exception info out of shell, possibly left there by
655 # run_code. We'll need to clean up this logic later.
656 # run_code. We'll need to clean up this logic later.
656 reply_content = {}
657 reply_content = {}
657 if shell._reply_content is not None:
658 if shell._reply_content is not None:
658 reply_content.update(shell._reply_content)
659 reply_content.update(shell._reply_content)
659 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
660 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
660 reply_content['engine_info'] = e_info
661 reply_content['engine_info'] = e_info
661 # reset after use
662 # reset after use
662 shell._reply_content = None
663 shell._reply_content = None
663
664
664 self.session.send(self.iopub_socket, u'error', reply_content, parent=parent,
665 self.session.send(self.iopub_socket, u'error', reply_content, parent=parent,
665 ident=self._topic('error'))
666 ident=self._topic('error'))
666 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
667 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
667 result_buf = []
668 result_buf = []
668
669
669 if reply_content['ename'] == 'UnmetDependency':
670 if reply_content['ename'] == 'UnmetDependency':
670 md['dependencies_met'] = False
671 md['dependencies_met'] = False
671 else:
672 else:
672 reply_content = {'status' : 'ok'}
673 reply_content = {'status' : 'ok'}
673
674
674 # put 'ok'/'error' status in header, for scheduler introspection:
675 # put 'ok'/'error' status in header, for scheduler introspection:
675 md['status'] = reply_content['status']
676 md['status'] = reply_content['status']
676
677
677 # flush i/o
678 # flush i/o
678 sys.stdout.flush()
679 sys.stdout.flush()
679 sys.stderr.flush()
680 sys.stderr.flush()
680
681
681 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
682 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
682 parent=parent, ident=ident,buffers=result_buf, metadata=md)
683 parent=parent, ident=ident,buffers=result_buf, metadata=md)
683
684
684 self._publish_status(u'idle', parent)
685 self._publish_status(u'idle', parent)
685
686
686 #---------------------------------------------------------------------------
687 #---------------------------------------------------------------------------
687 # Control messages
688 # Control messages
688 #---------------------------------------------------------------------------
689 #---------------------------------------------------------------------------
689
690
690 def abort_request(self, stream, ident, parent):
691 def abort_request(self, stream, ident, parent):
691 """abort a specifig msg by id"""
692 """abort a specifig msg by id"""
692 msg_ids = parent['content'].get('msg_ids', None)
693 msg_ids = parent['content'].get('msg_ids', None)
693 if isinstance(msg_ids, string_types):
694 if isinstance(msg_ids, string_types):
694 msg_ids = [msg_ids]
695 msg_ids = [msg_ids]
695 if not msg_ids:
696 if not msg_ids:
696 self.abort_queues()
697 self.abort_queues()
697 for mid in msg_ids:
698 for mid in msg_ids:
698 self.aborted.add(str(mid))
699 self.aborted.add(str(mid))
699
700
700 content = dict(status='ok')
701 content = dict(status='ok')
701 reply_msg = self.session.send(stream, 'abort_reply', content=content,
702 reply_msg = self.session.send(stream, 'abort_reply', content=content,
702 parent=parent, ident=ident)
703 parent=parent, ident=ident)
703 self.log.debug("%s", reply_msg)
704 self.log.debug("%s", reply_msg)
704
705
705 def clear_request(self, stream, idents, parent):
706 def clear_request(self, stream, idents, parent):
706 """Clear our namespace."""
707 """Clear our namespace."""
707 self.shell.reset(False)
708 self.shell.reset(False)
708 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
709 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
709 content = dict(status='ok'))
710 content = dict(status='ok'))
710
711
711
712
712 #---------------------------------------------------------------------------
713 #---------------------------------------------------------------------------
713 # Protected interface
714 # Protected interface
714 #---------------------------------------------------------------------------
715 #---------------------------------------------------------------------------
715
716
716 def _wrap_exception(self, method=None):
717 def _wrap_exception(self, method=None):
717 # import here, because _wrap_exception is only used in parallel,
718 # import here, because _wrap_exception is only used in parallel,
718 # and parallel has higher min pyzmq version
719 # and parallel has higher min pyzmq version
719 from IPython.parallel.error import wrap_exception
720 from IPython.parallel.error import wrap_exception
720 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
721 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
721 content = wrap_exception(e_info)
722 content = wrap_exception(e_info)
722 return content
723 return content
723
724
724 def _topic(self, topic):
725 def _topic(self, topic):
725 """prefixed topic for IOPub messages"""
726 """prefixed topic for IOPub messages"""
726 if self.int_id >= 0:
727 if self.int_id >= 0:
727 base = "engine.%i" % self.int_id
728 base = "engine.%i" % self.int_id
728 else:
729 else:
729 base = "kernel.%s" % self.ident
730 base = "kernel.%s" % self.ident
730
731
731 return py3compat.cast_bytes("%s.%s" % (base, topic))
732 return py3compat.cast_bytes("%s.%s" % (base, topic))
732
733
733 def _abort_queues(self):
734 def _abort_queues(self):
734 for stream in self.shell_streams:
735 for stream in self.shell_streams:
735 if stream:
736 if stream:
736 self._abort_queue(stream)
737 self._abort_queue(stream)
737
738
738 def _abort_queue(self, stream):
739 def _abort_queue(self, stream):
739 poller = zmq.Poller()
740 poller = zmq.Poller()
740 poller.register(stream.socket, zmq.POLLIN)
741 poller.register(stream.socket, zmq.POLLIN)
741 while True:
742 while True:
742 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
743 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
743 if msg is None:
744 if msg is None:
744 return
745 return
745
746
746 self.log.info("Aborting:")
747 self.log.info("Aborting:")
747 self.log.info("%s", msg)
748 self.log.info("%s", msg)
748 msg_type = msg['header']['msg_type']
749 msg_type = msg['header']['msg_type']
749 reply_type = msg_type.split('_')[0] + '_reply'
750 reply_type = msg_type.split('_')[0] + '_reply'
750
751
751 status = {'status' : 'aborted'}
752 status = {'status' : 'aborted'}
752 md = {'engine' : self.ident}
753 md = {'engine' : self.ident}
753 md.update(status)
754 md.update(status)
754 reply_msg = self.session.send(stream, reply_type, metadata=md,
755 reply_msg = self.session.send(stream, reply_type, metadata=md,
755 content=status, parent=msg, ident=idents)
756 content=status, parent=msg, ident=idents)
756 self.log.debug("%s", reply_msg)
757 self.log.debug("%s", reply_msg)
757 # We need to wait a bit for requests to come in. This can probably
758 # We need to wait a bit for requests to come in. This can probably
758 # be set shorter for true asynchronous clients.
759 # be set shorter for true asynchronous clients.
759 poller.poll(50)
760 poller.poll(50)
760
761
761
762
762 def _no_raw_input(self):
763 def _no_raw_input(self):
763 """Raise StdinNotImplentedError if active frontend doesn't support
764 """Raise StdinNotImplentedError if active frontend doesn't support
764 stdin."""
765 stdin."""
765 raise StdinNotImplementedError("raw_input was called, but this "
766 raise StdinNotImplementedError("raw_input was called, but this "
766 "frontend does not support stdin.")
767 "frontend does not support stdin.")
767
768
768 def getpass(self, prompt=''):
769 def getpass(self, prompt=''):
769 """Forward getpass to frontends
770 """Forward getpass to frontends
770
771
771 Raises
772 Raises
772 ------
773 ------
773 StdinNotImplentedError if active frontend doesn't support stdin.
774 StdinNotImplentedError if active frontend doesn't support stdin.
774 """
775 """
775 if not self._allow_stdin:
776 if not self._allow_stdin:
776 raise StdinNotImplementedError(
777 raise StdinNotImplementedError(
777 "getpass was called, but this frontend does not support input requests."
778 "getpass was called, but this frontend does not support input requests."
778 )
779 )
779 return self._input_request(prompt,
780 return self._input_request(prompt,
780 self._parent_ident,
781 self._parent_ident,
781 self._parent_header,
782 self._parent_header,
782 password=True,
783 password=True,
783 )
784 )
784
785
785 def raw_input(self, prompt=''):
786 def raw_input(self, prompt=''):
786 """Forward raw_input to frontends
787 """Forward raw_input to frontends
787
788
788 Raises
789 Raises
789 ------
790 ------
790 StdinNotImplentedError if active frontend doesn't support stdin.
791 StdinNotImplentedError if active frontend doesn't support stdin.
791 """
792 """
792 if not self._allow_stdin:
793 if not self._allow_stdin:
793 raise StdinNotImplementedError(
794 raise StdinNotImplementedError(
794 "raw_input was called, but this frontend does not support input requests."
795 "raw_input was called, but this frontend does not support input requests."
795 )
796 )
796 return self._input_request(prompt,
797 return self._input_request(prompt,
797 self._parent_ident,
798 self._parent_ident,
798 self._parent_header,
799 self._parent_header,
799 password=False,
800 password=False,
800 )
801 )
801
802
802 def _input_request(self, prompt, ident, parent, password=False):
803 def _input_request(self, prompt, ident, parent, password=False):
803 # Flush output before making the request.
804 # Flush output before making the request.
804 sys.stderr.flush()
805 sys.stderr.flush()
805 sys.stdout.flush()
806 sys.stdout.flush()
806 # flush the stdin socket, to purge stale replies
807 # flush the stdin socket, to purge stale replies
807 while True:
808 while True:
808 try:
809 try:
809 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
810 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
810 except zmq.ZMQError as e:
811 except zmq.ZMQError as e:
811 if e.errno == zmq.EAGAIN:
812 if e.errno == zmq.EAGAIN:
812 break
813 break
813 else:
814 else:
814 raise
815 raise
815
816
816 # Send the input request.
817 # Send the input request.
817 content = json_clean(dict(prompt=prompt, password=password))
818 content = json_clean(dict(prompt=prompt, password=password))
818 self.session.send(self.stdin_socket, u'input_request', content, parent,
819 self.session.send(self.stdin_socket, u'input_request', content, parent,
819 ident=ident)
820 ident=ident)
820
821
821 # Await a response.
822 # Await a response.
822 while True:
823 while True:
823 try:
824 try:
824 ident, reply = self.session.recv(self.stdin_socket, 0)
825 ident, reply = self.session.recv(self.stdin_socket, 0)
825 except Exception:
826 except Exception:
826 self.log.warn("Invalid Message:", exc_info=True)
827 self.log.warn("Invalid Message:", exc_info=True)
827 except KeyboardInterrupt:
828 except KeyboardInterrupt:
828 # re-raise KeyboardInterrupt, to truncate traceback
829 # re-raise KeyboardInterrupt, to truncate traceback
829 raise KeyboardInterrupt
830 raise KeyboardInterrupt
830 else:
831 else:
831 break
832 break
832 try:
833 try:
833 value = py3compat.unicode_to_str(reply['content']['value'])
834 value = py3compat.unicode_to_str(reply['content']['value'])
834 except:
835 except:
835 self.log.error("Bad input_reply: %s", parent)
836 self.log.error("Bad input_reply: %s", parent)
836 value = ''
837 value = ''
837 if value == '\x04':
838 if value == '\x04':
838 # EOF
839 # EOF
839 raise EOFError
840 raise EOFError
840 return value
841 return value
841
842
842 def _at_shutdown(self):
843 def _at_shutdown(self):
843 """Actions taken at shutdown by the kernel, called by python's atexit.
844 """Actions taken at shutdown by the kernel, called by python's atexit.
844 """
845 """
845 # io.rprint("Kernel at_shutdown") # dbg
846 # io.rprint("Kernel at_shutdown") # dbg
846 if self._shutdown_message is not None:
847 if self._shutdown_message is not None:
847 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
848 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
848 self.log.debug("%s", self._shutdown_message)
849 self.log.debug("%s", self._shutdown_message)
849 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
850 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
850
851
General Comments 0
You need to be logged in to leave comments. Login now