kernelbase.py
675 lines
| 24.3 KiB
| text/x-python
|
PythonLexer
Thomas Kluyver
|
r16857 | """Base class for a kernel that talks to frontends over 0MQ.""" | ||
Fernando Perez
|
r2598 | |||
MinRK
|
r16566 | # Copyright (c) IPython Development Team. | ||
# Distributed under the terms of the Modified BSD License. | ||||
Fernando Perez
|
r2856 | from __future__ import print_function | ||
epatters
|
r2712 | |||
Fernando Perez
|
r2598 | import sys | ||
import time | ||||
Omar Andres Zapata Mesa
|
r3294 | import logging | ||
MinRK
|
r6788 | import uuid | ||
from datetime import datetime | ||||
Paul Ivanov
|
r5601 | from signal import ( | ||
Thomas Kluyver
|
r11130 | signal, default_int_handler, SIGINT | ||
Paul Ivanov
|
r5601 | ) | ||
MinRK
|
r6789 | |||
Fernando Perez
|
r2598 | import zmq | ||
MinRK
|
r6790 | from zmq.eventloop import ioloop | ||
from zmq.eventloop.zmqstream import ZMQStream | ||||
Fernando Perez
|
r2598 | |||
Brian Granger
|
r2759 | from IPython.config.configurable import Configurable | ||
MinRK
|
r4952 | from IPython.core.error import StdinNotImplementedError | ||
Takafumi Arakaki
|
r9094 | from IPython.core import release | ||
Thomas Kluyver
|
r4772 | from IPython.utils import py3compat | ||
Thomas Kluyver
|
r16857 | from IPython.utils.py3compat import unicode_type, string_types | ||
Fernando Perez
|
r2948 | from IPython.utils.jsonutil import json_clean | ||
MinRK
|
r3970 | from IPython.utils.traitlets import ( | ||
Thomas Kluyver
|
r16857 | Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool, | ||
MinRK
|
r3970 | ) | ||
MinRK
|
r3979 | |||
Thomas Kluyver
|
r13347 | from .session import Session | ||
Fernando Perez
|
r2598 | |||
Omar Andres Zapata Mesa
|
r3314 | |||
Thomas Kluyver
|
r17095 | class Kernel(Configurable): | ||
Brian Granger
|
r2759 | |||
epatters
|
#---------------------------------------------------------------------------
# Kernel interface
#---------------------------------------------------------------------------

# attribute to override with a GUI
# When set to a callable, `_eventloop_changed` schedules `enter_eventloop`,
# which invokes it with the kernel instance as its only argument.
eventloop = Any(None)
MinRK
|
def _eventloop_changed(self, name, old, new):
    """Queue :meth:`enter_eventloop` on the global IOLoop.

    Presumably invoked by traitlets whenever ``eventloop`` is assigned
    (``_<trait>_changed`` naming); the arguments are not used.  The event
    loop is not entered here, only scheduled via ``add_callback``.
    """
    ioloop.IOLoop.instance().add_callback(self.enter_eventloop)
MinRK
|
r5153 | |||
epatters
|
# Session used to (un)serialize and send every message in this class.
session = Instance(Session)
profile_dir = Instance('IPython.core.profiledir.ProfileDir')
# Streams carrying shell requests; each gets a dispatcher in `start`.
shell_streams = List()
# Stream carrying control requests, handled by `dispatch_control`.
control_stream = Instance(ZMQStream)
# Socket on which status / execute_input / shutdown messages are published.
iopub_socket = Instance(zmq.Socket)
# Socket used by `_input_request` to ask the frontend for input.
stdin_socket = Instance(zmq.Socket)
log = Instance(logging.Logger)

# identities:
# int_id >= 0 selects the "engine.<int_id>" IOPub topic prefix in `_topic`;
# otherwise the prefix is "kernel.<ident>".
int_id = Integer(-1)
ident = Unicode()
def _ident_default(self):
    """Return a freshly generated random UUID (``uuid.uuid4``) as text."""
    fresh_uuid = uuid.uuid4()
    return unicode_type(fresh_uuid)
MinRK
|
r6789 | |||
Fernando Perez
|
# Private interface

_darwin_app_nap = Bool(True, config=True,
    help="""Whether to use appnope for compatiblity with OS X App Nap.
    Only affects OS X >= 10.9.
    """
)

# track associations with current request
# `_allow_stdin` gates `getpass`/`raw_input`; `_parent_header` and
# `_parent_ident` are written by `set_parent`.
_allow_stdin = Bool(False)
_parent_header = Dict()
_parent_ident = Any(b'')
# Time to sleep after flushing the stdout/err buffers in each execute
# cycle. While this introduces a hard limit on the minimal latency of the
# execute cycle, it helps prevent output synchronization problems for
# clients.
# Units are in seconds. The minimum zmq latency on local host is probably
# ~150 microseconds, set this to 500us for now. We may need to increase it
# a little if it's not enough after more interactive testing.
_execute_sleep = Float(0.0005, config=True)
# Frequency of the kernel's event loop.
# Units are in seconds, kernel subclasses for GUI toolkits may need to
# adapt to milliseconds.
_poll_interval = Float(0.05, config=True)

# If the shutdown was requested over the network, we leave here the
# necessary reply message so it can be sent by our registered atexit
# handler. This ensures that the reply is only sent to clients truly at
# the end of our shutdown process (which happens after the underlying
# IPython shell's own shutdown).
_shutdown_message = None

# This is a dict of the port numbers that the kernel is listening on. It is
# set by record_ports and used by connect_request.
_recorded_ports = Dict()

# set of aborted msg_ids
# (filled by abort_request, consumed by dispatch_shell)
aborted = Set()

# Track execution count here. For IPython, we override this to use the
# execution count we store in the shell.
execution_count = 0
Omar Andres Zapata Mesa
|
r3294 | |||
Brian Granger
|
def __init__(self, **kwargs):
    """Initialise the kernel and build the message-type -> handler tables.

    ``shell_handlers`` maps each shell request type to the bound method of
    the same name.  ``control_handlers`` holds the same entries plus the
    control-only ``clear_request`` and ``abort_request``.
    """
    super(Kernel, self).__init__(**kwargs)

    # Each request type is served by the method that carries its name.
    shell_msg_types = [
        'execute_request', 'complete_request',
        'inspect_request', 'history_request',
        'kernel_info_request',
        'connect_request', 'shutdown_request',
        'apply_request',
    ]
    self.shell_handlers = dict(
        (name, getattr(self, name)) for name in shell_msg_types
    )

    # The control channel accepts every shell request plus two more.
    control_msg_types = shell_msg_types + ['clear_request', 'abort_request']
    self.control_handlers = dict(
        (name, getattr(self, name)) for name in control_msg_types
    )
MinRK
|
r13188 | |||
MinRK
|
def dispatch_control(self, msg):
    """Decode one control-channel message and pass it to its handler.

    Messages that cannot be unserialized, and message types with no entry
    in ``control_handlers``, are logged and dropped.  Exceptions raised by
    a handler are logged and not propagated.
    """
    idents, msg = self.session.feed_identities(msg, copy=False)
    try:
        msg = self.session.unserialize(msg, content=True, copy=False)
    except:
        self.log.error("Invalid Control Message", exc_info=True)
        return

    self.log.debug("Control received: %s", msg)

    msg_type = msg['header']['msg_type']
    handler = self.control_handlers.get(msg_type)
    if handler is None:
        self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
        return

    try:
        handler(self.control_stream, idents, msg)
    except Exception:
        self.log.error("Exception in control handler:", exc_info=True)
MinRK
|
r6790 | |||
def dispatch_shell(self, stream, msg):
    """Decode one shell-channel message and pass it to its handler.

    Pending control requests are flushed first.  A message whose id is in
    ``self.aborted`` is answered with an ``aborted`` reply instead of being
    handled.  While the handler runs, SIGINT is routed to
    ``default_int_handler``; the previous handler is restored afterwards.
    """
    # flush control requests first
    if self.control_stream:
        self.control_stream.flush()

    idents, msg = self.session.feed_identities(msg, copy=False)
    try:
        msg = self.session.unserialize(msg, content=True, copy=False)
    except:
        self.log.error("Invalid Message", exc_info=True)
        return

    header = msg['header']
    msg_id = header['msg_id']
    msg_type = header['msg_type']

    # Print some info about this message and leave a '--->' marker, so it's
    # easier to trace visually the message chain when debugging. Each
    # handler prints its message at the end.
    self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
    self.log.debug(' Content: %s\n --->\n ', msg['content'])

    if msg_id in self.aborted:
        self.aborted.remove(msg_id)
        # is it safe to assume a msg_id will not be resubmitted?
        reply_type = msg_type.split('_')[0] + '_reply'
        aborted_content = {'status': 'aborted'}
        aborted_md = {'engine': self.ident, 'status': 'aborted'}
        self.session.send(stream, reply_type, metadata=aborted_md,
                          content=aborted_content, parent=msg, ident=idents)
        return

    handler = self.shell_handlers.get(msg_type)
    if handler is None:
        self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
        return

    # ensure default_int_handler during handler call
    previous_sigint = signal(SIGINT, default_int_handler)
    self.log.debug("%s: %s", msg_type, msg)
    try:
        handler(stream, idents, msg)
    except Exception:
        self.log.error("Exception in message handler:", exc_info=True)
    finally:
        signal(SIGINT, previous_sigint)
def enter_eventloop(self):
    """Run ``self.eventloop`` until it returns without being interrupted.

    Pending outgoing replies on the shell streams are flushed first.  A
    ``KeyboardInterrupt`` raised by the event loop is logged and the loop
    is re-entered; a normal return clears ``self.eventloop`` and ends.
    """
    self.log.info("entering eventloop %s", self.eventloop)
    # flush any pending replies,
    # which may be skipped by entering the eventloop
    for shell_stream in self.shell_streams:
        shell_stream.flush(zmq.POLLOUT)

    # restore default_int_handler
    signal(SIGINT, default_int_handler)

    while self.eventloop is not None:
        try:
            self.eventloop(self)
        except KeyboardInterrupt:
            # Ctrl-C shouldn't crash the kernel; go round again.
            self.log.error("KeyboardInterrupt caught in kernel")
        else:
            # eventloop exited cleanly, this means we should stop (right?)
            # Clearing the attribute makes the loop condition fail.
            self.eventloop = None

    self.log.info("exiting eventloop")
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def start(self):
    """Register the receive callbacks and publish the 'starting' status.

    The control stream (if any) is wired to :meth:`dispatch_control`; each
    shell stream is wired to :meth:`dispatch_shell` bound to that stream.
    """
    if self.control_stream:
        self.control_stream.on_recv(self.dispatch_control, copy=False)

    def bind_shell_dispatch(shell_stream):
        # Bind the stream per call so each callback keeps its own stream.
        return lambda msg: self.dispatch_shell(shell_stream, msg)

    for shell_stream in self.shell_streams:
        shell_stream.on_recv(bind_shell_dispatch(shell_stream), copy=False)

    # publish 'starting' status
    self._publish_status('starting')
MinRK
|
r6790 | |||
def do_one_iteration(self):
    """Process pending traffic once: control first, then each shell stream."""
    control = self.control_stream
    if control:
        control.flush()
    for shell_stream in self.shell_streams:
        # handle at most one request per iteration
        shell_stream.flush(zmq.POLLIN, 1)
        shell_stream.flush(zmq.POLLOUT)
MinRK
|
r5153 | |||
Brian Granger
|
r3019 | |||
MinRK
|
def record_ports(self, ports):
    """Remember the ports this kernel is using.

    The creator of the Kernel instance must call this method if they want
    :meth:`connect_request` to return the port numbers.  ``ports`` is
    stored as-is in ``_recorded_ports``.
    """
    self._recorded_ports = ports
Brian Granger
|
r3019 | |||
epatters
|
r2778 | #--------------------------------------------------------------------------- | ||
# Kernel request handlers | ||||
#--------------------------------------------------------------------------- | ||||
MinRK
|
r6807 | |||
MinRK
|
r7957 | def _make_metadata(self, other=None): | ||
"""init metadata dict, for execute/apply_reply""" | ||||
new_md = { | ||||
MinRK
|
r6807 | 'dependencies_met' : True, | ||
'engine' : self.ident, | ||||
'started': datetime.now(), | ||||
} | ||||
MinRK
|
r7957 | if other: | ||
new_md.update(other) | ||||
return new_md | ||||
MinRK
|
r6807 | |||
MinRK
|
def _publish_execute_input(self, code, parent, execution_count):
    """Send an ``execute_input`` message with the code on the IOPub socket."""
    content = {u'code': code, u'execution_count': execution_count}
    self.session.send(
        self.iopub_socket, u'execute_input', content,
        parent=parent, ident=self._topic('execute_input'),
    )
MinRK
|
r7491 | |||
def _publish_status(self, status, parent=None):
    """Send a ``status`` message on IOPub with ``status`` as execution_state."""
    content = {u'execution_state': status}
    self.session.send(
        self.iopub_socket, u'status', content,
        parent=parent, ident=self._topic('status'),
    )
MinRK
|
r16573 | |||
MinRK
|
def set_parent(self, ident, parent):
    """Record the request that is currently being processed.

    Side effects (IOPub messages) and replies are associated with the
    request that caused them via the parent header.  The parent identity
    is used to route input_request messages on the stdin channel.
    """
    self._parent_ident, self._parent_header = ident, parent
Thomas Kluyver
|
r16857 | |||
def send_response(self, stream, msg_or_type, content=None, ident=None,
                  buffers=None, track=False, header=None, metadata=None):
    """Send a response to the message currently being processed.

    Accepts all the parameters of
    :meth:`IPython.kernel.zmq.session.Session.send` except ``parent``,
    which is taken from the header stored by :meth:`set_parent`.

    Returns whatever ``self.session.send`` returns.
    """
    current_parent = self._parent_header
    return self.session.send(
        stream, msg_or_type, content, current_parent,
        ident, buffers, track, header, metadata
    )
MinRK
|
r16574 | |||
MinRK
|
def execute_request(self, stream, ident, parent):
    """Handle an ``execute_request`` message.

    Publishes ``busy`` on IOPub, runs the code through :meth:`do_execute`,
    sends the ``execute_reply`` on ``stream``, and publishes ``idle``.

    ``idle`` is published on every exit path, including a malformed
    request or an exception raised while executing, so that frontends are
    never left showing the kernel as busy.

    Parameters
    ----------
    stream : the stream the reply is sent on
    ident : routing identities of the requester
    parent : the unserialized request message (dict)
    """
    self._publish_status(u'busy', parent)
    try:
        try:
            content = parent[u'content']
            code = py3compat.cast_unicode_py2(content[u'code'])
            silent = content[u'silent']
            store_history = content.get(u'store_history', not silent)
            user_expressions = content.get('user_expressions', {})
            allow_stdin = content.get('allow_stdin', False)
        except Exception:
            self.log.error("Got bad msg: ")
            self.log.error("%s", parent, exc_info=True)
            return

        md = self._make_metadata(parent['metadata'])

        # Set the parent message of the display hook and out streams.
        self.set_parent(ident, parent)

        # Re-broadcast our input for the benefit of listening clients, and
        # start computing output
        if not silent:
            self.execution_count += 1
            self._publish_execute_input(code, parent, self.execution_count)

        reply_content = self.do_execute(code, silent, store_history,
                                        user_expressions, allow_stdin)

        # Flush output before sending the reply.
        sys.stdout.flush()
        sys.stderr.flush()
        # FIXME: on rare occasions, the flush doesn't seem to make it to the
        # clients... This seems to mitigate the problem, but we definitely need
        # to better understand what's going on.
        if self._execute_sleep:
            time.sleep(self._execute_sleep)

        # Send the reply.
        reply_content = json_clean(reply_content)

        md['status'] = reply_content['status']
        if reply_content['status'] == 'error' and \
                reply_content.get('ename') == 'UnmetDependency':
            md['dependencies_met'] = False

        reply_msg = self.session.send(stream, u'execute_reply',
                                      reply_content, parent, metadata=md,
                                      ident=ident)
        self.log.debug("%s", reply_msg)

        # A failed (non-silent) execution aborts whatever is still queued.
        if not silent and reply_msg['content']['status'] == u'error':
            self._abort_queues()
    finally:
        # Always pair the 'busy' published above with an 'idle'.
        self._publish_status(u'idle', parent)
Brian Granger
|
r3035 | |||
Thomas Kluyver
|
def do_execute(self, code, silent, store_history=True,
               user_expressions=None, allow_stdin=False):
    """Execute user code. Must be overridden by subclasses.

    Parameters
    ----------
    code : the source to execute
    silent : whether the execution was requested as silent
    store_history : whether the execution should be recorded in history
    user_expressions : mapping of expressions from the request, if any
    allow_stdin : whether the frontend said it can answer input requests

    :meth:`execute_request` reads ``status`` (and ``ename`` on errors)
    from the returned dict.

    Raises
    ------
    NotImplementedError
        Always, in this base class.
    """
    # NOTE: the fourth parameter was previously misspelled
    # ``user_experssions``; positional callers are unaffected.
    raise NotImplementedError
MinRK
|
def complete_request(self, stream, ident, parent):
    """Handle a ``complete_request``: reply with :meth:`do_complete`'s result."""
    request = parent['content']
    matches = json_clean(
        self.do_complete(request['code'], request['cursor_pos'])
    )
    completion_msg = self.session.send(stream, 'complete_reply',
                                       matches, parent, ident)
    self.log.debug("%s", completion_msg)
epatters
|
r2778 | |||
Thomas Kluyver
|
def do_complete(self, code, cursor_pos):
    """Find completions; this base implementation finds none.

    Returns an ``ok`` reply with an empty ``matches`` list and both cursor
    bounds set to ``cursor_pos``.  Override in subclasses.
    """
    reply = {'matches': []}
    reply['cursor_end'] = cursor_pos
    reply['cursor_start'] = cursor_pos
    reply['metadata'] = {}
    reply['status'] = 'ok'
    return reply
MinRK
|
def inspect_request(self, stream, ident, parent):
    """Handle an ``inspect_request``: reply with :meth:`do_inspect`'s result.

    ``detail_level`` defaults to 0 when the request omits it.
    """
    request = parent['content']
    raw_reply = self.do_inspect(request['code'], request['cursor_pos'],
                                request.get('detail_level', 0))
    # Before we send this object over, we scrub it for JSON usage
    reply_content = json_clean(raw_reply)
    msg = self.session.send(stream, 'inspect_reply',
                            reply_content, parent, ident)
    self.log.debug("%s", msg)
epatters
|
r2795 | |||
Thomas Kluyver
|
def do_inspect(self, code, cursor_pos, detail_level=0):
    """Introspect code; this base implementation reports nothing found.

    Override in subclasses to allow introspection.
    """
    return dict([
        ('status', 'ok'),
        ('data', {}),
        ('metadata', {}),
        ('found', False),
    ])
MinRK
|
def history_request(self, stream, ident, parent):
    """Handle a ``history_request``.

    The request content is expanded as keyword arguments to
    :meth:`do_history`; the result is sent back as ``history_reply``.
    """
    history = self.do_history(**parent['content'])
    reply_content = json_clean(history)
    msg = self.session.send(stream, 'history_reply',
                            reply_content, parent, ident)
    self.log.debug("%s", msg)
Brian Granger
|
r3019 | |||
Thomas Kluyver
|
def do_history(self, hist_access_type, output, raw, session=None, start=None,
               stop=None, n=None, pattern=None, unique=False):
    """Access history; this base implementation returns an empty history.

    Override in subclasses to access history.
    """
    no_entries = []
    return {'history': no_entries}
MinRK
|
def connect_request(self, stream, ident, parent):
    """Handle a ``connect_request``: reply with a copy of the recorded ports.

    The reply content is empty when ``_recorded_ports`` is ``None``.
    """
    ports = self._recorded_ports
    content = {} if ports is None else ports.copy()
    msg = self.session.send(stream, 'connect_reply',
                            content, parent, ident)
    self.log.debug("%s", msg)
Brian Granger
|
r3019 | |||
Thomas Kluyver
|
@property
def kernel_info(self):
    """Content dict for ``kernel_info_reply``.

    Reads ``implementation``, ``implementation_version``, ``language``,
    ``language_version`` and ``banner`` from the instance; the protocol
    version comes from ``release.kernel_protocol_version``.
    """
    info = {'protocol_version': release.kernel_protocol_version}
    info['implementation'] = self.implementation
    info['implementation_version'] = self.implementation_version
    info['language'] = self.language
    info['language_version'] = self.language_version
    info['banner'] = self.banner
    return info
Thomas Kluyver
|
r16857 | |||
def kernel_info_request(self, stream, ident, parent):
    """Handle a ``kernel_info_request``: reply with :attr:`kernel_info`."""
    info = self.kernel_info
    msg = self.session.send(stream, 'kernel_info_reply',
                            info, parent, ident)
    self.log.debug("%s", msg)
MinRK
|
def shutdown_request(self, stream, ident, parent):
    """Handle a ``shutdown_request``.

    Calls :meth:`do_shutdown`, replies on ``stream``, stores a second
    ``shutdown_reply`` message for IOPub, runs :meth:`_at_shutdown`, and
    schedules the IOLoop to stop 0.1 seconds later.
    """
    restart = parent['content']['restart']
    content = self.do_shutdown(restart)
    self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
    # same content, but different msg_id for broadcasting on IOPub
    self._shutdown_message = self.session.msg(
        u'shutdown_reply', content, parent
    )
    self._at_shutdown()
    # stop the IOLoop after a short delay
    loop = ioloop.IOLoop.instance()
    stop_at = time.time() + 0.1
    loop.add_timeout(stop_at, loop.stop)
Brian Granger
|
r3019 | |||
Thomas Kluyver
|
def do_shutdown(self, restart):
    """Hook run when the frontend asks the kernel to shut down.

    Override in subclasses to do things at shutdown.  The base version
    returns an ``ok`` reply echoing the ``restart`` flag.
    """
    return dict(status='ok', restart=restart)
epatters
|
r2778 | #--------------------------------------------------------------------------- | ||
MinRK
|
r6782 | # Engine methods | ||
#--------------------------------------------------------------------------- | ||||
MinRK
|
def apply_request(self, stream, ident, parent):
    """Handle an ``apply_request``.

    A message missing ``content``, ``buffers`` or ``header.msg_id`` is
    logged and dropped.  Otherwise ``busy`` is published, the work is
    delegated to :meth:`do_apply`, an ``apply_reply`` carrying the result
    buffers is sent, and ``idle`` is published.
    """
    try:
        content = parent[u'content']
        bufs = parent[u'buffers']
        msg_id = parent['header']['msg_id']
    except:
        self.log.error("Got bad msg: %s", parent, exc_info=True)
        return

    self._publish_status(u'busy', parent)

    # Set the parent message of the display hook and out streams.
    self.set_parent(ident, parent)

    md = self._make_metadata(parent['metadata'])
    reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)

    # put 'ok'/'error' status in header, for scheduler introspection:
    md['status'] = reply_content['status']

    # flush i/o
    for out in (sys.stdout, sys.stderr):
        out.flush()

    self.session.send(
        stream, u'apply_reply', reply_content,
        parent=parent, ident=ident, buffers=result_buf, metadata=md,
    )

    self._publish_status(u'idle', parent)
Thomas Kluyver
|
def do_apply(self, content, bufs, msg_id, reply_metadata):
    """Hook for the IPython parallel framework; not implemented here.

    :meth:`apply_request` expects overrides to return a
    ``(reply_content, result_buffers)`` pair.
    """
    raise NotImplementedError()
MinRK
|
r6782 | #--------------------------------------------------------------------------- | ||
# Control messages | ||||
#--------------------------------------------------------------------------- | ||||
MinRK
|
def abort_request(self, stream, ident, parent):
    """Abort specific messages by id, or everything queued if none given.

    ``content['msg_ids']`` may be a single id (string) or a list of ids.
    Each id is added to ``self.aborted`` so that :meth:`dispatch_shell`
    answers it with an ``aborted`` reply.  When no ids are supplied, the
    shell queues are aborted via :meth:`_abort_queues` instead.

    An ``abort_reply`` with status ``ok`` is sent on ``stream``.
    """
    msg_ids = parent['content'].get('msg_ids', None)
    if isinstance(msg_ids, string_types):
        msg_ids = [msg_ids]
    if not msg_ids:
        # The method is `_abort_queues`; `abort_queues` does not exist.
        self._abort_queues()
        # Nothing to mark individually (also covers msg_ids being None).
        msg_ids = []
    for mid in msg_ids:
        self.aborted.add(str(mid))

    content = dict(status='ok')
    reply_msg = self.session.send(stream, 'abort_reply', content=content,
                                  parent=parent, ident=ident)
    self.log.debug("%s", reply_msg)
MinRK
|
r6782 | |||
MinRK
|
def clear_request(self, stream, idents, parent):
    """Handle a ``clear_request``: reply with the result of :meth:`do_clear`."""
    reply_content = self.do_clear()
    self.session.send(
        stream, 'clear_reply',
        ident=idents, parent=parent, content=reply_content,
    )
MinRK
|
r6782 | |||
Thomas Kluyver
|
def do_clear(self):
    """Clear the namespace; not implemented in the base class.

    Override in subclasses.  This is only required for IPython.parallel.
    """
    raise NotImplementedError()
MinRK
|
r6782 | #--------------------------------------------------------------------------- | ||
epatters
|
r2778 | # Protected interface | ||
#--------------------------------------------------------------------------- | ||||
Fernando Perez
|
r2598 | |||
MinRK
|
def _topic(self, topic):
    """Return the prefixed IOPub topic for ``topic``, as bytes.

    The prefix is ``engine.<int_id>`` when ``int_id`` is non-negative and
    ``kernel.<ident>`` otherwise.
    """
    engine_id = self.int_id
    if engine_id >= 0:
        prefix = "engine.%i" % engine_id
    else:
        prefix = "kernel.%s" % self.ident
    full_topic = "%s.%s" % (prefix, topic)
    return py3compat.cast_bytes(full_topic)
MinRK
|
def _abort_queues(self):
    """Run :meth:`_abort_queue` on every truthy shell stream."""
    for shell_stream in self.shell_streams:
        if not shell_stream:
            continue
        self._abort_queue(shell_stream)
MinRK
|
r6785 | |||
MinRK
|
def _abort_queue(self, stream):
    """Reply ``aborted`` to every request currently readable on ``stream``.

    Requests are received without blocking; the method returns as soon as
    a receive yields no message.  After each reply it polls the socket
    for up to 50 ms so closely following requests are also drained.
    """
    poller = zmq.Poller()
    poller.register(stream.socket, zmq.POLLIN)
    while True:
        idents, pending = self.session.recv(stream, zmq.NOBLOCK, content=True)
        if pending is None:
            return

        self.log.info("Aborting:")
        self.log.info("%s", pending)

        request_type = pending['header']['msg_type']
        reply_type = request_type.split('_')[0] + '_reply'

        aborted_content = {'status': 'aborted'}
        aborted_md = {'engine': self.ident, 'status': 'aborted'}
        reply_msg = self.session.send(stream, reply_type, metadata=aborted_md,
                                      content=aborted_content, parent=pending,
                                      ident=idents)
        self.log.debug("%s", reply_msg)
        # We need to wait a bit for requests to come in. This can probably
        # be set shorter for true asynchronous clients.
        poller.poll(50)
MinRK
|
r6785 | |||
epatters
|
r2778 | |||
MinRK
|
def _no_raw_input(self):
    """Always raise StdinNotImplementedError.

    Used when the active frontend doesn't support stdin.
    """
    message = ("raw_input was called, but this "
               "frontend does not support stdin.")
    raise StdinNotImplementedError(message)
MinRK
|
r16573 | |||
MinRK
|
def getpass(self, prompt=''):
    """Forward getpass to frontends

    The request is sent with ``password=True`` for the current parent
    message.

    Raises
    ------
    StdinNotImplementedError if active frontend doesn't support stdin.
    """
    if self._allow_stdin:
        return self._input_request(
            prompt, self._parent_ident, self._parent_header, password=True,
        )
    raise StdinNotImplementedError(
        "getpass was called, but this frontend does not support input requests."
    )
def raw_input(self, prompt=''):
    """Forward raw_input to frontends

    The request is sent with ``password=False`` for the current parent
    message.

    Raises
    ------
    StdinNotImplementedError if active frontend doesn't support stdin.
    """
    if self._allow_stdin:
        return self._input_request(
            prompt, self._parent_ident, self._parent_header, password=False,
        )
    raise StdinNotImplementedError(
        "raw_input was called, but this frontend does not support input requests."
    )
def _input_request(self, prompt, ident, parent, password=False):
    """Ask the frontend for a line of input and return it.

    Parameters
    ----------
    prompt : text shown to the user
    ident : routing identity the ``input_request`` is addressed to
    parent : header of the request this input belongs to
    password : whether the frontend is asked for a password

    Returns
    -------
    The ``value`` field of the reply, or ``''`` when the reply is malformed.

    Raises
    ------
    EOFError
        If the reply value is ``'\\x04'`` (Ctrl-D).
    KeyboardInterrupt
        If interrupted while waiting for the reply.
    zmq.ZMQError
        If purging the stdin socket fails for a reason other than EAGAIN.
    """
    # Flush output before making the request.
    sys.stderr.flush()
    sys.stdout.flush()
    # flush the stdin socket, to purge stale replies
    while True:
        try:
            self.stdin_socket.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError as e:
            if e.errno == zmq.EAGAIN:
                break
            raise

    # Send the input request.
    content = json_clean(dict(prompt=prompt, password=password))
    self.session.send(self.stdin_socket, u'input_request', content, parent,
                      ident=ident)

    # Await a response.
    reply = None
    while True:
        try:
            # The identities of the reply are not needed; keep `ident` intact.
            _, reply = self.session.recv(self.stdin_socket, 0)
        except Exception:
            self.log.warning("Invalid Message:", exc_info=True)
        except KeyboardInterrupt:
            # re-raise KeyboardInterrupt, to truncate traceback
            raise KeyboardInterrupt
        else:
            break
    try:
        value = py3compat.unicode_to_str(reply['content']['value'])
    except Exception:
        # Log the reply that could not be used, not the original request.
        self.log.error("Bad input_reply: %s", reply)
        value = ''
    if value == '\x04':
        # EOF
        raise EOFError
    return value
Bernardo B. Marques
|
r4872 | |||
Fernando Perez
|
def _at_shutdown(self):
    """Actions taken at shutdown by the kernel.

    If a shutdown message has been stored, it is published on IOPub under
    the ``shutdown`` topic.  Outgoing data on every shell stream is then
    flushed.
    """
    # io.rprint("Kernel at_shutdown") # dbg
    shutdown_msg = self._shutdown_message
    if shutdown_msg is not None:
        self.session.send(self.iopub_socket, shutdown_msg,
                          ident=self._topic('shutdown'))
        self.log.debug("%s", shutdown_msg)
    for shell_stream in self.shell_streams:
        shell_stream.flush(zmq.POLLOUT)
Fernando Perez
|
r2972 | |||