ipkernel.py
804 lines
| 30.2 KiB
| text/x-python
|
PythonLexer
Fernando Perez
|
r2598 | #!/usr/bin/env python | ||
"""A simple interactive kernel that talks to a frontend over 0MQ. | ||||
Things to do: | ||||
* Implement `set_parent` logic. Right before doing exec, the Kernel should | ||||
call set_parent on all the PUB objects with the message about to be executed. | ||||
* Implement random port and security key logic. | ||||
* Implement control messages. | ||||
* Implement event loop and poll version. | ||||
""" | ||||
epatters
|
r2712 | #----------------------------------------------------------------------------- | ||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
Fernando Perez
|
r2856 | from __future__ import print_function | ||
epatters
|
r2712 | |||
MinRK
|
r6789 | # Standard library imports | ||
Fernando Perez
|
r2598 | import __builtin__ | ||
import sys | ||||
import time | ||||
import traceback | ||||
Omar Andres Zapata Mesa
|
r3294 | import logging | ||
MinRK
|
r6788 | import uuid | ||
from datetime import datetime | ||||
Paul Ivanov
|
r5601 | from signal import ( | ||
MinRK
|
r6790 | signal, getsignal, default_int_handler, SIGINT, SIG_IGN | ||
Paul Ivanov
|
r5601 | ) | ||
MinRK
|
r6789 | |||
# System library imports | ||||
Fernando Perez
|
r2598 | import zmq | ||
MinRK
|
r6790 | from zmq.eventloop import ioloop | ||
from zmq.eventloop.zmqstream import ZMQStream | ||||
Fernando Perez
|
r2598 | |||
MinRK
|
r6789 | # Local imports | ||
Brian Granger
|
r2759 | from IPython.config.configurable import Configurable | ||
MinRK
|
r4952 | from IPython.core.error import StdinNotImplementedError | ||
Takafumi Arakaki
|
r9094 | from IPython.core import release | ||
Fernando Perez
|
r2856 | from IPython.utils import io | ||
Thomas Kluyver
|
r4772 | from IPython.utils import py3compat | ||
Fernando Perez
|
r2948 | from IPython.utils.jsonutil import json_clean | ||
MinRK
|
r3970 | from IPython.utils.traitlets import ( | ||
epatters
|
r8474 | Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode, | ||
Type | ||||
MinRK
|
r3970 | ) | ||
MinRK
|
r3979 | |||
MinRK
|
r6788 | from serialize import serialize_object, unpack_apply_message | ||
MinRK
|
r9357 | from session import Session | ||
epatters
|
r2778 | from zmqshell import ZMQInteractiveShell | ||
Fernando Perez
|
r2598 | |||
Omar Andres Zapata Mesa
|
r3314 | |||
epatters
|
r2712 | #----------------------------------------------------------------------------- | ||
Brian Granger
|
r2754 | # Main kernel class | ||
epatters
|
r2712 | #----------------------------------------------------------------------------- | ||
epatters
|
r2641 | |||
Takafumi Arakaki
|
# Version information reported by Kernel.kernel_info_request, stored as
# plain lists.
protocol_version = list(release.kernel_protocol_version_info)
ipython_version = list(release.version_info)
language_version = list(sys.version_info)[:3]
Takafumi Arakaki
|
r8830 | |||
Brian Granger
|
r2759 | class Kernel(Configurable): | ||
epatters
|
r2778 | #--------------------------------------------------------------------------- | ||
# Kernel interface | ||||
#--------------------------------------------------------------------------- | ||||
MinRK
|
r5153 | # attribute to override with a GUI | ||
eventloop = Any(None) | ||||
MinRK
|
def _eventloop_changed(self, name, old, new):
    """Schedule `enter_eventloop` on the IOLoop shortly after a change.

    The `name`, `old` and `new` arguments are accepted but not used; the
    call to `self.enter_eventloop` is queued 0.1 seconds from now.
    """
    io_loop = ioloop.IOLoop.instance()
    deadline = time.time() + 0.1
    io_loop.add_timeout(deadline, self.enter_eventloop)
MinRK
|
r5153 | |||
Brian Granger
|
r2760 | shell = Instance('IPython.core.interactiveshell.InteractiveShellABC') | ||
epatters
|
r8474 | shell_class = Type(ZMQInteractiveShell) | ||
epatters
|
r2778 | session = Instance(Session) | ||
MinRK
|
r5261 | profile_dir = Instance('IPython.core.profiledir.ProfileDir') | ||
MinRK
|
r6790 | shell_streams = List() | ||
control_stream = Instance(ZMQStream) | ||||
MinRK
|
r6804 | iopub_socket = Instance(zmq.Socket) | ||
MinRK
|
r6790 | stdin_socket = Instance(zmq.Socket) | ||
MinRK
|
r3970 | log = Instance(logging.Logger) | ||
MinRK
|
r6571 | |||
MinRK
|
r6826 | user_module = Any() | ||
MinRK
|
r6571 | def _user_module_changed(self, name, old, new): | ||
if self.shell is not None: | ||||
self.shell.user_module = new | ||||
user_ns = Dict(default_value=None) | ||||
def _user_ns_changed(self, name, old, new): | ||||
if self.shell is not None: | ||||
self.shell.user_ns = new | ||||
self.shell.init_user_ns() | ||||
Brian Granger
|
r2759 | |||
MinRK
|
r6789 | # identities: | ||
int_id = Integer(-1) | ||||
ident = Unicode() | ||||
def _ident_default(self): | ||||
return unicode(uuid.uuid4()) | ||||
Fernando Perez
|
r2940 | # Private interface | ||
MinRK
|
r6785 | |||
Fernando Perez
|
r2940 | # Time to sleep after flushing the stdout/err buffers in each execute | ||
# cycle. While this introduces a hard limit on the minimal latency of the | ||||
# execute cycle, it helps prevent output synchronization problems for | ||||
# clients. | ||||
# Units are in seconds. The minimum zmq latency on local host is probably | ||||
# ~150 microseconds, set this to 500us for now. We may need to increase it | ||||
# a little if it's not enough after more interactive testing. | ||||
_execute_sleep = Float(0.0005, config=True) | ||||
# Frequency of the kernel's event loop. | ||||
# Units are in seconds, kernel subclasses for GUI toolkits may need to | ||||
# adapt to milliseconds. | ||||
_poll_interval = Float(0.05, config=True) | ||||
Fernando Perez
|
r2972 | |||
# If the shutdown was requested over the network, we leave here the | ||||
# necessary reply message so it can be sent by our registered atexit | ||||
# handler. This ensures that the reply is only sent to clients truly at | ||||
# the end of our shutdown process (which happens after the underlying | ||||
# IPython shell's own shutdown). | ||||
_shutdown_message = None | ||||
Brian Granger
|
r3019 | |||
# This is a dict of the port numbers that the kernel is listening on. It is
# set by record_ports and used by connect_request.
MinRK
|
r3970 | _recorded_ports = Dict() | ||
Pietro Berkes
|
r8940 | |||
# A reference to the Python builtin 'raw_input' function. | ||||
# (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3) | ||||
_sys_raw_input = Any() | ||||
MinRK
|
r6785 | # set of aborted msg_ids | ||
aborted = Set() | ||||
MinRK
|
r3970 | |||
Omar Andres Zapata Mesa
|
r3294 | |||
Brian Granger
|
def __init__(self, **kwargs):
    """Create the shell, wire its publishers and build the handler tables.

    All keyword arguments are passed to the parent class initialiser.
    """
    super(Kernel, self).__init__(**kwargs)

    # Obtain the InteractiveShell subclass instance.
    shell = self.shell_class.instance(
        config=self.config,
        profile_dir=self.profile_dir,
        user_module=self.user_module,
        user_ns=self.user_ns,
    )
    self.shell = shell

    # Every publisher on the shell shares our session and IOPub socket.
    for publisher in (shell.displayhook, shell.display_pub, shell.data_pub):
        publisher.session = self.session
        publisher.pub_socket = self.iopub_socket
    shell.displayhook.topic = self._topic('pyout')

    # TMP - hack while developing
    shell._reply_content = None

    # Message type -> bound handler method, for the shell channel.
    shell_msg_types = [
        'execute_request', 'complete_request',
        'object_info_request', 'history_request',
        'kernel_info_request',
        'connect_request', 'shutdown_request',
        'apply_request',
    ]
    self.shell_handlers = dict(
        (msg_type, getattr(self, msg_type)) for msg_type in shell_msg_types
    )

    # The control channel accepts the shell types plus two of its own.
    control_msg_types = shell_msg_types + ['clear_request', 'abort_request']
    self.control_handlers = dict(
        (msg_type, getattr(self, msg_type)) for msg_type in control_msg_types
    )
MinRK
|
def dispatch_control(self, msg):
    """Decode one control-channel message and run its handler.

    Messages that cannot be unserialized, and message types with no
    registered handler, are logged as errors and dropped. Exceptions
    raised by a handler are logged, not propagated.
    """
    idents, raw = self.session.feed_identities(msg, copy=False)
    try:
        request = self.session.unserialize(raw, content=True, copy=False)
    except:
        self.log.error("Invalid Control Message", exc_info=True)
        return

    self.log.debug("Control received: %s", request)

    header = request['header']
    # Looked up but otherwise unused here; a header without it raises.
    msg_id = header['msg_id']
    msg_type = header['msg_type']

    handler = self.control_handlers.get(msg_type, None)
    if handler is None:
        self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
        return

    try:
        handler(self.control_stream, idents, request)
    except Exception:
        self.log.error("Exception in control handler:", exc_info=True)
MinRK
|
r6790 | |||
def dispatch_shell(self, stream, msg):
    """Decode one shell-channel message and run its handler.

    Pending control messages are flushed first. A request whose msg_id is
    in `self.aborted` is answered with an 'aborted' reply instead of being
    run. While a handler runs, SIGINT is bound to `default_int_handler`;
    the previous handler is restored afterwards.
    """
    # flush control requests first
    if self.control_stream:
        self.control_stream.flush()

    idents, raw = self.session.feed_identities(msg, copy=False)
    try:
        request = self.session.unserialize(raw, content=True, copy=False)
    except:
        self.log.error("Invalid Message", exc_info=True)
        return

    header = request['header']
    msg_id = header['msg_id']
    msg_type = request['header']['msg_type']

    # Print some info about this message and leave a '--->' marker, so it's
    # easier to trace visually the message chain when debugging. Each
    # handler prints its message at the end.
    self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
    self.log.debug(' Content: %s\n --->\n ', request['content'])

    if msg_id in self.aborted:
        self.aborted.remove(msg_id)
        # is it safe to assume a msg_id will not be resubmitted?
        reply_type = msg_type.split('_')[0] + '_reply'
        aborted = {'status': 'aborted'}
        metadata = {'engine': self.ident}
        metadata.update(aborted)
        self.session.send(stream, reply_type, metadata=metadata,
                          content=aborted, parent=request, ident=idents)
        return

    handler = self.shell_handlers.get(msg_type, None)
    if handler is None:
        self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
        return

    # ensure default_int_handler during handler call
    previous_handler = signal(SIGINT, default_int_handler)
    try:
        handler(stream, idents, request)
    except Exception:
        self.log.error("Exception in message handler:", exc_info=True)
    finally:
        signal(SIGINT, previous_handler)
def enter_eventloop(self):
    """Run `self.eventloop` until it returns normally.

    SIGINT is bound to `default_int_handler` first. A KeyboardInterrupt
    escaping the event loop is logged and the loop is called again; a
    normal return clears `self.eventloop` and ends the method.
    """
    self.log.info("entering eventloop")
    # restore default_int_handler
    signal(SIGINT, default_int_handler)
    while True:
        if self.eventloop is None:
            break
        try:
            self.eventloop(self)
        except KeyboardInterrupt:
            # Ctrl-C shouldn't crash the kernel
            self.log.error("KeyboardInterrupt caught in kernel")
            continue
        # eventloop exited cleanly, this means we should stop (right?)
        self.eventloop = None
        break
    self.log.info("exiting eventloop")
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def start(self):
    """Register the receive callbacks and announce that we are starting.

    Clears `shell.exit_now`, attaches `dispatch_control` to the control
    stream (when there is one) and a per-stream dispatcher to every shell
    stream, then publishes a 'starting' status on IOPub.
    """
    self.shell.exit_now = False
    if self.control_stream:
        self.control_stream.on_recv(self.dispatch_control, copy=False)

    def dispatcher_for(shell_stream):
        # Factory so each callback is bound to its own stream rather than
        # to the loop variable below.
        def on_message(msg):
            return self.dispatch_shell(shell_stream, msg)
        return on_message

    for shell_stream in self.shell_streams:
        shell_stream.on_recv(dispatcher_for(shell_stream), copy=False)

    # publish 'starting' status
    self._publish_status('starting')
MinRK
|
r6790 | |||
def do_one_iteration(self):
    """Step the event loop once.

    Flushes the control stream (when set), then for each shell stream
    flushes incoming events with a limit of 1 and outgoing events.
    """
    control = self.control_stream
    if control:
        control.flush()
    for shell_stream in self.shell_streams:
        # handle at most one request per iteration
        shell_stream.flush(zmq.POLLIN, 1)
        shell_stream.flush(zmq.POLLOUT)
MinRK
|
r5153 | |||
Brian Granger
|
r3019 | |||
MinRK
|
def record_ports(self, ports):
    """Remember the ports this kernel is using.

    Whoever creates the Kernel instance must call this method if
    :meth:`connect_request` is expected to report the port numbers.
    The object is stored as given in `self._recorded_ports`.
    """
    self._recorded_ports = ports
Brian Granger
|
r3019 | |||
epatters
|
r2778 | #--------------------------------------------------------------------------- | ||
# Kernel request handlers | ||||
#--------------------------------------------------------------------------- | ||||
MinRK
|
r6807 | |||
MinRK
|
r7957 | def _make_metadata(self, other=None): | ||
"""init metadata dict, for execute/apply_reply""" | ||||
new_md = { | ||||
MinRK
|
r6807 | 'dependencies_met' : True, | ||
'engine' : self.ident, | ||||
'started': datetime.now(), | ||||
} | ||||
MinRK
|
r7957 | if other: | ||
new_md.update(other) | ||||
return new_md | ||||
MinRK
|
r6807 | |||
Paul Ivanov
|
r6542 | def _publish_pyin(self, code, parent, execution_count): | ||
Fernando Perez
|
r2926 | """Publish the code request on the pyin stream.""" | ||
MinRK
|
r6804 | self.session.send(self.iopub_socket, u'pyin', | ||
MinRK
|
r6794 | {u'code':code, u'execution_count': execution_count}, | ||
MinRK
|
r6795 | parent=parent, ident=self._topic('pyin') | ||
MinRK
|
r6794 | ) | ||
MinRK
|
r7491 | |||
def _publish_status(self, status, parent=None): | ||||
"""send status (busy/idle) on IOPub""" | ||||
MinRK
|
r6804 | self.session.send(self.iopub_socket, | ||
Fernando Perez
|
r5471 | u'status', | ||
MinRK
|
r7491 | {u'execution_state': status}, | ||
MinRK
|
r6794 | parent=parent, | ||
MinRK
|
r6795 | ident=self._topic('status'), | ||
MinRK
|
r6794 | ) | ||
Fernando Perez
|
r5471 | |||
MinRK
|
r7491 | |||
def execute_request(self, stream, ident, parent):
    """Handle an execute_request: run the code and send an execute_reply.

    Publishes 'busy' on IOPub, temporarily replaces the builtin
    raw_input/input, runs the code through `shell.run_cell`, assembles the
    reply content (status, execution count, user variables/expressions,
    payload), sends it on `stream`, and finally publishes 'idle'.

    Parameters
    ----------
    stream : stream the reply is sent on.
    ident : identities passed to `session.send` for the reply.
    parent : the request message; its 'content' and 'metadata' are read.
    """
    self._publish_status(u'busy', parent)

    try:
        content = parent[u'content']
        code = content[u'code']
        silent = content[u'silent']
        # History is stored by default unless the request is silent.
        store_history = content.get(u'store_history', not silent)
    except:
        self.log.error("Got bad msg: ")
        self.log.error("%s", parent)
        # NOTE(review): this path returns after 'busy' was published
        # above without publishing 'idle'.
        return

    md = self._make_metadata(parent['metadata'])

    shell = self.shell # we'll need this a lot here

    # Replace raw_input. Note that it is not sufficient to replace
    # raw_input in the user namespace.
    if content.get('allow_stdin', False):
        raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
    else:
        raw_input = lambda prompt='' : self._no_raw_input()

    # Save the current builtin so it can be restored after run_cell.
    if py3compat.PY3:
        self._sys_raw_input = __builtin__.input
        __builtin__.input = raw_input
    else:
        self._sys_raw_input = __builtin__.raw_input
        __builtin__.raw_input = raw_input

    # Set the parent message of the display hook and out streams.
    shell.displayhook.set_parent(parent)
    shell.display_pub.set_parent(parent)
    shell.data_pub.set_parent(parent)
    # sys.stdout / sys.stderr may be objects without set_parent; in that
    # case the AttributeError is ignored.
    try:
        sys.stdout.set_parent(parent)
    except AttributeError:
        pass
    try:
        sys.stderr.set_parent(parent)
    except AttributeError:
        pass

    # Re-broadcast our input for the benefit of listening clients, and
    # start computing output
    if not silent:
        self._publish_pyin(code, parent, shell.execution_count)

    reply_content = {}
    try:
        # FIXME: the shell calls the exception handler itself.
        shell.run_cell(code, store_history=store_history, silent=silent)
    except:
        status = u'error'
        # FIXME: this code right now isn't being used yet by default,
        # because the run_cell() call above directly fires off exception
        # reporting. This code, therefore, is only active in the scenario
        # where runlines itself has an unhandled exception. We need to
        # uniformize this, for all exception construction to come from a
        # single location in the codebase.
        etype, evalue, tb = sys.exc_info()
        tb_list = traceback.format_exception(etype, evalue, tb)
        reply_content.update(shell._showtraceback(etype, evalue, tb_list))
    else:
        status = u'ok'
    finally:
        # Restore raw_input.
        if py3compat.PY3:
            __builtin__.input = self._sys_raw_input
        else:
            __builtin__.raw_input = self._sys_raw_input

    reply_content[u'status'] = status

    # Return the execution counter so clients can display prompts
    reply_content['execution_count'] = shell.execution_count - 1

    # FIXME - fish exception info out of shell, possibly left there by
    # runlines. We'll need to clean up this logic later.
    # Anything in shell._reply_content is merged over the keys set above
    # (so it can replace 'status').
    if shell._reply_content is not None:
        reply_content.update(shell._reply_content)
        e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
        reply_content['engine_info'] = e_info
        # reset after use
        shell._reply_content = None

    if 'traceback' in reply_content:
        self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))

    # At this point, we can tell whether the main code execution succeeded
    # or not. If it did, we proceed to evaluate user_variables/expressions
    if reply_content['status'] == 'ok':
        reply_content[u'user_variables'] = \
            shell.user_variables(content.get(u'user_variables', []))
        reply_content[u'user_expressions'] = \
            shell.user_expressions(content.get(u'user_expressions', {}))
    else:
        # If there was an error, don't even try to compute variables or
        # expressions
        reply_content[u'user_variables'] = {}
        reply_content[u'user_expressions'] = {}

    # Payloads should be retrieved regardless of outcome, so we can both
    # recover partial output (that could have been generated early in a
    # block, before an error) and clear the payload system always.
    reply_content[u'payload'] = shell.payload_manager.read_payload()
    # Be aggressive about clearing the payload because we don't want
    # it to sit in memory until the next execute_request comes in.
    shell.payload_manager.clear_payload()

    # Flush output before sending the reply.
    sys.stdout.flush()
    sys.stderr.flush()
    # FIXME: on rare occasions, the flush doesn't seem to make it to the
    # clients... This seems to mitigate the problem, but we definitely need
    # to better understand what's going on.
    if self._execute_sleep:
        time.sleep(self._execute_sleep)

    # Send the reply.
    reply_content = json_clean(reply_content)

    md['status'] = reply_content['status']
    # NOTE(review): an 'error' reply without an 'ename' key raises
    # KeyError here.
    if reply_content['status'] == 'error' and \
            reply_content['ename'] == 'UnmetDependency':
        md['dependencies_met'] = False

    reply_msg = self.session.send(stream, u'execute_reply',
                                  reply_content, parent, metadata=md,
                                  ident=ident)

    self.log.debug("%s", reply_msg)

    # After a failed, non-silent execution, abort the requests that are
    # already queued on the shell streams.
    if not silent and reply_msg['content']['status'] == u'error':
        self._abort_queues()

    self._publish_status(u'idle', parent)
Brian Granger
|
r3035 | |||
MinRK
|
def complete_request(self, stream, ident, parent):
    """Handle a complete_request by sending a complete_reply on `stream`.

    The reply carries the matches and matched text returned by
    `self._complete(parent)` plus an 'ok' status.
    """
    matched_text, found = self._complete(parent)
    reply = json_clean({
        'matches': found,
        'matched_text': matched_text,
        'status': 'ok',
    })
    completion_msg = self.session.send(stream, 'complete_reply',
                                       reply, parent, ident)
    self.log.debug("%s", completion_msg)
epatters
|
r2778 | |||
MinRK
|
def object_info_request(self, stream, ident, parent):
    """Handle an object_info_request by sending an object_info_reply.

    Inspects the object named by the request's 'oname' at the requested
    'detail_level' (0 when absent).
    """
    request = parent['content']
    inspect_object = self.shell.object_inspect
    name = request['oname']
    level = request.get('detail_level', 0)
    info = inspect_object(name, detail_level=level)
    # Before we send this object over, we scrub it for JSON usage
    cleaned = json_clean(info)
    reply_msg = self.session.send(stream, 'object_info_reply',
                                  cleaned, parent, ident)
    self.log.debug("%s", reply_msg)
epatters
|
r2795 | |||
MinRK
|
def history_request(self, stream, ident, parent):
    """Handle a history_request by sending a history_reply on `stream`.

    The request's 'hist_access_type' selects the query: 'tail', 'range'
    or 'search'. Any other value yields an empty history.
    """
    # We need to pull these out, as passing **kwargs doesn't work with
    # unicode keys before Python 2.6.5.
    request = parent['content']
    access_type = request['hist_access_type']
    raw = request['raw']
    output = request['output']

    if access_type == 'tail':
        count = request['n']
        entries = self.shell.history_manager.get_tail(
            count, raw=raw, output=output, include_latest=True)
    elif access_type == 'range':
        session = request['session']
        start = request['start']
        stop = request['stop']
        entries = self.shell.history_manager.get_range(
            session, start, stop, raw=raw, output=output)
    elif access_type == 'search':
        count = request.get('n')
        unique = request.get('unique', False)
        pattern = request['pattern']
        entries = self.shell.history_manager.search(
            pattern, raw=raw, output=output, n=count, unique=unique)
    else:
        entries = []

    entries = list(entries)
    reply = json_clean({'history': entries})
    self.session.send(stream, 'history_reply', reply, parent, ident)
    self.log.debug("Sending history reply with %i entries", len(entries))
Brian Granger
|
r3019 | |||
MinRK
|
def connect_request(self, stream, ident, parent):
    """Handle a connect_request by replying with the recorded ports.

    The reply content is a copy of `self._recorded_ports`, or an empty
    dict when that attribute is None.
    """
    recorded = self._recorded_ports
    reply = {} if recorded is None else recorded.copy()
    reply_msg = self.session.send(stream, 'connect_reply',
                                  reply, parent, ident)
    self.log.debug("%s", reply_msg)
Brian Granger
|
r3019 | |||
Takafumi Arakaki
|
def kernel_info_request(self, stream, ident, parent):
    """Handle a kernel_info_request by sending the version information.

    The reply carries the module-level protocol, IPython and language
    versions and the language name 'python'.
    """
    info = dict(
        protocol_version=protocol_version,
        ipython_version=ipython_version,
        language_version=language_version,
        language='python',
    )
    reply_msg = self.session.send(stream, 'kernel_info_reply',
                                  info, parent, ident)
    self.log.debug("%s", reply_msg)
MinRK
|
def shutdown_request(self, stream, ident, parent):
    """Handle a shutdown_request.

    Flags the shell to exit, sends a shutdown_reply on `stream`, stores a
    second reply message in `self._shutdown_message`, calls
    `self._at_shutdown()` and schedules the IOLoop to stop 0.1 seconds
    from now.
    """
    self.shell.exit_now = True

    # Reply content: status 'ok' overlaid with the request's content.
    reply = dict(status='ok')
    reply.update(parent['content'])
    self.session.send(stream, u'shutdown_reply', reply, parent, ident=ident)

    # same content, but different msg_id for broadcasting on IOPub
    self._shutdown_message = self.session.msg(u'shutdown_reply', reply, parent)

    self._at_shutdown()

    # stop the IOLoop after a short delay
    io_loop = ioloop.IOLoop.instance()
    deadline = time.time() + 0.1
    io_loop.add_timeout(deadline, io_loop.stop)
Brian Granger
|
r3019 | |||
epatters
|
r2778 | #--------------------------------------------------------------------------- | ||
MinRK
|
r6782 | # Engine methods | ||
#--------------------------------------------------------------------------- | ||||
MinRK
|
def apply_request(self, stream, ident, parent):
    """Handle an apply_request: call a function and send an apply_reply.

    The function and its arguments are unpacked from the message buffers,
    bound to temporary names in the user namespace, called there, and the
    serialized result is returned in the reply buffers. On failure a
    'pyerr' message is published on IOPub and the reply carries the error
    content with empty buffers.

    Parameters
    ----------
    stream : stream the reply is sent on.
    ident : identities passed to `session.send` for the reply.
    parent : the request message; 'content', 'buffers', 'metadata' and
        the header's 'msg_id' are read.
    """
    try:
        # 'content' is looked up only to validate the message shape.
        content = parent[u'content']
        bufs = parent[u'buffers']
        msg_id = parent['header']['msg_id']
    except:
        self.log.error("Got bad msg: %s", parent, exc_info=True)
        return

    self._publish_status(u'busy', parent)

    # Set the parent message of the display hook and out streams.
    shell = self.shell
    shell.displayhook.set_parent(parent)
    shell.display_pub.set_parent(parent)
    shell.data_pub.set_parent(parent)
    # sys.stdout / sys.stderr may be objects without set_parent.
    try:
        sys.stdout.set_parent(parent)
    except AttributeError:
        pass
    try:
        sys.stderr.set_parent(parent)
    except AttributeError:
        pass

    md = self._make_metadata(parent['metadata'])
    try:
        working = shell.user_ns

        # Temporary names are prefixed with the msg_id (dashes removed).
        prefix = "_" + str(msg_id).replace("-", "") + "_"

        f, args, kwargs = unpack_apply_message(bufs, working, copy=False)

        fname = prefix + "f"
        argname = prefix + "args"
        kwargname = prefix + "kwargs"
        resultname = prefix + "result"

        ns = {fname: f, argname: args, kwargname: kwargs, resultname: None}
        working.update(ns)
        code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
        try:
            # The call form of exec is accepted by Python 2 and Python 3.
            exec(code, shell.user_global_ns, shell.user_ns)
            result = working.get(resultname)
        finally:
            # Always remove the temporaries. A missing key (e.g. removed
            # by the called code) must not raise here and hide the
            # original error.
            for key in ns:
                working.pop(key, None)

        result_buf = serialize_object(result,
            buffer_threshold=self.session.buffer_threshold,
            item_threshold=self.session.item_threshold,
        )

    except:
        # invoke IPython traceback formatting
        shell.showtraceback()
        # FIXME - fish exception info out of shell, possibly left there by
        # run_code. We'll need to clean up this logic later.
        reply_content = {}
        if shell._reply_content is not None:
            reply_content.update(shell._reply_content)
            e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
            reply_content['engine_info'] = e_info
            # reset after use
            shell._reply_content = None

        self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
                          ident=self._topic('pyerr'))
        self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
        result_buf = []

        if reply_content['ename'] == 'UnmetDependency':
            md['dependencies_met'] = False
    else:
        reply_content = {'status': 'ok'}

    # put 'ok'/'error' status in header, for scheduler introspection:
    md['status'] = reply_content['status']

    # flush i/o
    sys.stdout.flush()
    sys.stderr.flush()

    self.session.send(stream, u'apply_reply', reply_content,
                      parent=parent, ident=ident, buffers=result_buf, metadata=md)

    self._publish_status(u'idle', parent)
MinRK
|
r6782 | #--------------------------------------------------------------------------- | ||
# Control messages | ||||
#--------------------------------------------------------------------------- | ||||
MinRK
|
def abort_request(self, stream, ident, parent):
    """Abort specific messages by id, or everything queued.

    The request's 'msg_ids' may be a single id or a sequence of ids; each
    is added to `self.aborted`, which `dispatch_shell` checks before
    running a request. When 'msg_ids' is missing or empty, the shell
    queues are aborted via `_abort_queues()` instead. An abort_reply with
    status 'ok' is sent on `stream` in every case.
    """
    msg_ids = parent['content'].get('msg_ids', None)
    # A lone id is wrapped in a list. (bytes, type(u'')) is (str, unicode)
    # on Python 2, i.e. what basestring covers, and also exists on
    # Python 3.
    if isinstance(msg_ids, (bytes, type(u''))):
        msg_ids = [msg_ids]
    if not msg_ids:
        self._abort_queues()
        # Nothing to record below; also avoids iterating None.
        msg_ids = []
    for mid in msg_ids:
        self.aborted.add(str(mid))

    content = dict(status='ok')
    reply_msg = self.session.send(stream, 'abort_reply', content=content,
                                  parent=parent, ident=ident)
    self.log.debug("%s", reply_msg)
MinRK
|
r6782 | |||
MinRK
|
def clear_request(self, stream, idents, parent):
    """Reset the shell (``shell.reset(False)``) and send a clear_reply."""
    self.shell.reset(False)
    self.session.send(
        stream, 'clear_reply',
        ident=idents, parent=parent, content=dict(status='ok'),
    )
#--------------------------------------------------------------------------- | ||||
epatters
|
r2778 | # Protected interface | ||
#--------------------------------------------------------------------------- | ||||
Fernando Perez
|
r2598 | |||
MinRK
|
def _wrap_exception(self, method=None):
    """Return `wrap_exception(...)` content tagged with this engine's info.

    `method` is recorded in the engine info passed to `wrap_exception`.
    """
    # import here, because _wrap_exception is only used in parallel,
    # and parallel has higher min pyzmq version
    from IPython.parallel.error import wrap_exception
    engine_info = dict(
        engine_uuid=self.ident,
        engine_id=self.int_id,
        method=method,
    )
    return wrap_exception(engine_info)
MinRK
|
def _topic(self, topic):
    """Return `topic` as bytes, prefixed for IOPub messages.

    The prefix is ``engine.<int_id>`` when ``int_id`` is non-negative,
    and ``kernel.<ident>`` otherwise.
    """
    engine_id = self.int_id
    prefix = ("engine.%i" % engine_id) if engine_id >= 0 else ("kernel.%s" % self.ident)
    full_topic = "%s.%s" % (prefix, topic)
    return py3compat.cast_bytes(full_topic)
MinRK
|
r6785 | def _abort_queues(self): | ||
MinRK
|
r6790 | for stream in self.shell_streams: | ||
if stream: | ||||
self._abort_queue(stream) | ||||
MinRK
|
r6785 | |||
MinRK
|
def _abort_queue(self, stream):
    """Reply 'aborted' to every request currently readable on `stream`.

    Messages are received without blocking; the loop ends as soon as a
    receive yields no message.
    """
    incoming = zmq.Poller()
    incoming.register(stream.socket, zmq.POLLIN)

    while True:
        idents, msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
        if msg is None:
            break

        self.log.info("Aborting:")
        self.log.info("%s", msg)

        # e.g. 'execute_request' -> 'execute_reply'
        request_type = msg['header']['msg_type']
        reply_type = request_type.split('_')[0] + '_reply'

        status = {'status' : 'aborted'}
        # metadata carries the engine identity plus the same status
        md = dict(status, engine=self.ident)
        reply_msg = self.session.send(stream, reply_type, metadata=md,
                                      content=status, parent=msg, ident=idents)
        self.log.debug("%s", reply_msg)

        # We need to wait a bit for requests to come in. This can probably
        # be set shorter for true asynchronous clients.
        incoming.poll(50)
MinRK
|
r6785 | |||
epatters
|
r2778 | |||
MinRK
|
def _no_raw_input(self):
    """Raise StdinNotImplementedError if active frontend doesn't support
    stdin."""
    message = ("raw_input was called, but this "
               "frontend does not support stdin.")
    raise StdinNotImplementedError(message)
MinRK
|
r4952 | |||
epatters
|
def _raw_input(self, prompt, ident, parent):
    """Request a line of input from the frontend over the stdin socket.

    Parameters
    ----------
    prompt : prompt text sent in the ``input_request``
    ident : routing identity used when sending the request
    parent : the message on whose behalf input is requested

    Returns
    -------
    The ``value`` field of the reply content, or ``''`` if the reply does
    not have the expected structure.

    Raises
    ------
    EOFError
        If the reply value is ``'\\x04'`` (EOF).
    """
    # Flush output before making the request.
    sys.stderr.flush()
    sys.stdout.flush()

    # flush the stdin socket, to purge stale replies
    while True:
        try:
            self.stdin_socket.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError as e:
            if e.errno != zmq.EAGAIN:
                raise
            # EAGAIN: nothing left to read
            break

    # Send the input request.
    content = json_clean(dict(prompt=prompt))
    self.session.send(self.stdin_socket, u'input_request', content, parent,
                      ident=ident)

    # Await a response, skipping anything the session fails to receive.
    while True:
        try:
            # the reply's routing identity is not needed here
            _, reply = self.session.recv(self.stdin_socket, 0)
        except Exception:
            self.log.warn("Invalid Message:", exc_info=True)
        else:
            break

    try:
        value = reply['content']['value']
    except Exception:
        # Best effort on a malformed reply, but do not swallow
        # KeyboardInterrupt / SystemExit as a bare ``except:`` would.
        self.log.error("Got bad raw_input reply: ")
        self.log.error("%s", parent)
        value = ''

    if value == '\x04':
        # EOF
        raise EOFError
    return value
Bernardo B. Marques
|
r4872 | |||
epatters
|
r2778 | def _complete(self, msg): | ||
Fernando Perez
|
r2839 | c = msg['content'] | ||
try: | ||||
cpos = int(c['cursor_pos']) | ||||
except: | ||||
# If we don't get something that we can convert to an integer, at | ||||
Fernando Perez
|
r2856 | # least attempt the completion guessing the cursor is at the end of | ||
# the text, if there's any, and otherwise of the line | ||||
Fernando Perez
|
r2839 | cpos = len(c['text']) | ||
Fernando Perez
|
r2856 | if cpos==0: | ||
cpos = len(c['line']) | ||||
Fernando Perez
|
r2839 | return self.shell.complete(c['text'], c['line'], cpos) | ||
Fernando Perez
|
r2598 | |||
Fernando Perez
|
def _at_shutdown(self):
    """Actions taken at shutdown by the kernel, called by python's atexit.

    Sends the stored shutdown message (if any) on the IOPub socket, then
    flushes outgoing data on every shell stream.
    """
    shutdown_msg = self._shutdown_message
    if shutdown_msg is not None:
        self.session.send(self.iopub_socket, shutdown_msg,
                          ident=self._topic('shutdown'))
        self.log.debug("%s", shutdown_msg)

    for shell_stream in self.shell_streams:
        shell_stream.flush(zmq.POLLOUT)
Fernando Perez
|
r2972 | |||