kernelmanager.py
1130 lines
| 39.5 KiB
| text/x-python
|
PythonLexer
Brian Granger
|
r2742 | """Base classes to manage the interaction with a running kernel. | ||
Brian Granger
|
r2606 | |||
epatters
|
r3027 | TODO | ||
Brian Granger
|
r2699 | * Create logger to handle debugging and console messages. | ||
Brian Granger
|
r2606 | """ | ||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
Matthias BUSSONNIER
|
r5390 | # Copyright (C) 2008-2011 The IPython Development Team | ||
Brian Granger
|
r2699 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r9372 | from __future__ import absolute_import | ||
MinRK
|
r9370 | # Standard library imports | ||
MinRK
|
r6319 | import atexit | ||
MinRK
|
r3928 | import errno | ||
MinRK
|
r5072 | import json | ||
epatters
|
r2667 | from subprocess import Popen | ||
MinRK
|
r4958 | import os | ||
epatters
|
r3027 | import signal | ||
epatters
|
r2995 | import sys | ||
Brian Granger
|
r2606 | from threading import Thread | ||
epatters
|
r2614 | import time | ||
Brian Granger
|
r2606 | |||
MinRK
|
r9370 | # System library imports | ||
Brian Granger
|
r2606 | import zmq | ||
MinRK
|
r5649 | # import ZMQError in top-level namespace, to avoid ugly attribute-error messages | ||
# during garbage collection of threads at exit: | ||||
from zmq import ZMQError | ||||
MinRK
|
r5377 | from zmq.eventloop import ioloop, zmqstream | ||
epatters
|
r2611 | |||
MinRK
|
r9370 | # Local imports | ||
Brian E. Granger
|
r9116 | from IPython.config.configurable import Configurable | ||
MinRK
|
r3144 | from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS | ||
MinRK
|
r4958 | from IPython.utils.traitlets import ( | ||
MinRK
|
r9348 | Any, Instance, Type, Unicode, List, Integer, Bool, CaselessStrEnum | ||
MinRK
|
r4958 | ) | ||
MinRK
|
r4966 | from IPython.utils.py3compat import str_to_bytes | ||
MinRK
|
r9353 | from IPython.kernel import ( | ||
MinRK
|
r9348 | write_connection_file, | ||
MinRK
|
r9350 | make_ipkernel_cmd, | ||
MinRK
|
r9348 | launch_kernel, | ||
) | ||||
MinRK
|
r9376 | from .zmq.session import Session | ||
from .kernelmanagerabc import ( | ||||
Brian Granger
|
r9121 | ShellChannelABC, IOPubChannelABC, | ||
HBChannelABC, StdInChannelABC, | ||||
KernelManagerABC | ||||
) | ||||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# Constants and exceptions | ||||
#----------------------------------------------------------------------------- | ||||
class InvalidPortNumber(Exception): | ||||
pass | ||||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
Fernando Perez
|
r2926 | # Utility functions | ||
#----------------------------------------------------------------------------- | ||||
# some utilities to validate message structure, these might get moved elsewhere | ||||
# if they prove to have more generic utility | ||||
def validate_string_list(lst): | ||||
"""Validate that the input is a list of strings. | ||||
Raises ValueError if not.""" | ||||
if not isinstance(lst, list): | ||||
raise ValueError('input %r must be a list' % lst) | ||||
for x in lst: | ||||
if not isinstance(x, basestring): | ||||
raise ValueError('element %r in list must be a string' % x) | ||||
def validate_string_dict(dct): | ||||
"""Validate that the input is a dict with string keys and values. | ||||
Raises ValueError if not.""" | ||||
for k,v in dct.iteritems(): | ||||
if not isinstance(k, basestring): | ||||
raise ValueError('key %r in dict must be a string' % k) | ||||
if not isinstance(v, basestring): | ||||
raise ValueError('value %r in dict must be a string' % v) | ||||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r2699 | # ZMQ Socket Channel classes | ||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r2606 | |||
MinRK
|
r3974 | class ZMQSocketChannel(Thread): | ||
Brian E. Granger
|
r9129 | """The base class for the channels that use ZMQ sockets.""" | ||
Brian Granger
|
r2695 | context = None | ||
session = None | ||||
socket = None | ||||
ioloop = None | ||||
MinRK
|
r5377 | stream = None | ||
Brian Granger
|
r2699 | _address = None | ||
MinRK
|
r6319 | _exiting = False | ||
Brian Granger
|
r2699 | |||
def __init__(self, context, session, address): | ||||
Brian E. Granger
|
r9129 | """Create a channel. | ||
Brian Granger
|
r2695 | |||
Brian Granger
|
r2699 | Parameters | ||
---------- | ||||
Brian Granger
|
r2742 | context : :class:`zmq.Context` | ||
Brian Granger
|
r2699 | The ZMQ context to use. | ||
Brian Granger
|
r2742 | session : :class:`session.Session` | ||
Brian Granger
|
r2699 | The session to use. | ||
MinRK
|
r7321 | address : zmq url | ||
Brian Granger
|
r2699 | Standard (ip, port) tuple that the kernel is listening on. | ||
""" | ||||
MinRK
|
r3974 | super(ZMQSocketChannel, self).__init__() | ||
epatters
|
r2632 | self.daemon = True | ||
Brian Granger
|
r2606 | self.context = context | ||
self.session = session | ||||
MinRK
|
r7321 | if isinstance(address, tuple): | ||
if address[1] == 0: | ||||
message = 'The port number for a channel cannot be 0.' | ||||
raise InvalidPortNumber(message) | ||||
address = "tcp://%s:%i" % address | ||||
Brian Granger
|
r2699 | self._address = address | ||
MinRK
|
r6319 | atexit.register(self._notice_exit) | ||
def _notice_exit(self): | ||||
self._exiting = True | ||||
epatters
|
r2631 | |||
MinRK
|
r3928 | def _run_loop(self): | ||
"""Run my loop, ignoring EINTR events in the poller""" | ||||
while True: | ||||
try: | ||||
self.ioloop.start() | ||||
MinRK
|
r5649 | except ZMQError as e: | ||
MinRK
|
r3928 | if e.errno == errno.EINTR: | ||
continue | ||||
else: | ||||
raise | ||||
MinRK
|
r6319 | except Exception: | ||
if self._exiting: | ||||
break | ||||
else: | ||||
raise | ||||
MinRK
|
r3928 | else: | ||
break | ||||
Bernardo B. Marques
|
r4872 | |||
epatters
|
r2632 | def stop(self): | ||
Brian E. Granger
|
r9129 | """Stop the channel's event loop and join its thread. | ||
Brian Granger
|
r2691 | |||
Brian Granger
|
r2699 | This calls :method:`Thread.join` and returns when the thread | ||
Bernardo B. Marques
|
r4872 | terminates. :class:`RuntimeError` will be raised if | ||
Brian Granger
|
r2699 | :method:`self.start` is called again. | ||
epatters
|
r2632 | """ | ||
epatters
|
r2642 | self.join() | ||
Brian Granger
|
r2699 | @property | ||
def address(self): | ||||
Brian E. Granger
|
r9129 | """Get the channel's address as a zmq url string. | ||
These URLS have the form: 'tcp://127.0.0.1:5555'. | ||||
epatters
|
r2632 | """ | ||
return self._address | ||||
MinRK
|
r5377 | def _queue_send(self, msg): | ||
"""Queue a message to be sent from the IOLoop's thread. | ||||
Brian Granger
|
r2699 | Parameters | ||
---------- | ||||
MinRK
|
r5377 | msg : message to send | ||
This is threadsafe, as it uses IOLoop.add_callback to give the loop's | ||||
thread control of the action. | ||||
Brian Granger
|
r2695 | """ | ||
MinRK
|
r5377 | def thread_send(): | ||
self.session.send(self.stream, msg) | ||||
self.ioloop.add_callback(thread_send) | ||||
Brian Granger
|
r2699 | |||
MinRK
|
r5377 | def _handle_recv(self, msg): | ||
Brian E. Granger
|
r9129 | """Callback for stream.on_recv. | ||
Unpacks message, and calls handlers with it. | ||||
Brian Granger
|
r2695 | """ | ||
MinRK
|
r5377 | ident,smsg = self.session.feed_identities(msg) | ||
self.call_handlers(self.session.unserialize(smsg)) | ||||
Brian Granger
|
r2695 | |||
Brian Granger
|
r2606 | |||
Brian Granger
|
r9120 | class ShellChannel(ZMQSocketChannel): | ||
Brian E. Granger
|
r9129 | """The shell channel for issuing request/replies to the kernel.""" | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | command_queue = None | ||
MinRK
|
r4952 | # flag for whether execute requests should be allowed to call raw_input: | ||
allow_stdin = True | ||||
Brian Granger
|
r2699 | |||
def __init__(self, context, session, address): | ||||
Brian Granger
|
r9120 | super(ShellChannel, self).__init__(context, session, address) | ||
epatters
|
r2996 | self.ioloop = ioloop.IOLoop() | ||
Brian Granger
|
r2606 | |||
def run(self): | ||||
Brian Granger
|
r2699 | """The thread's main activity. Call start() instead.""" | ||
MinRK
|
r4725 | self.socket = self.context.socket(zmq.DEALER) | ||
MinRK
|
r4770 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) | ||
MinRK
|
r7321 | self.socket.connect(self.address) | ||
MinRK
|
r5377 | self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) | ||
self.stream.on_recv(self._handle_recv) | ||||
MinRK
|
r3928 | self._run_loop() | ||
MinRK
|
r6564 | try: | ||
self.socket.close() | ||||
except: | ||||
pass | ||||
Brian Granger
|
r2606 | |||
epatters
|
r2632 | def stop(self): | ||
Brian E. Granger
|
r9129 | """Stop the channel's event loop and join its thread.""" | ||
epatters
|
r2632 | self.ioloop.stop() | ||
Brian Granger
|
r9120 | super(ShellChannel, self).stop() | ||
Brian Granger
|
r2606 | |||
def call_handlers(self, msg): | ||||
Brian Granger
|
r2692 | """This method is called in the ioloop thread when a message arrives. | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2692 | Subclasses should override this method to handle incoming messages. | ||
It is important to remember that this method is called in the thread | ||||
so that some logic must be done to ensure that the application leve | ||||
handlers are called in the application thread. | ||||
""" | ||||
raise NotImplementedError('call_handlers must be defined in a subclass.') | ||||
Brian Granger
|
r2606 | |||
Jason Grout
|
r8000 | def execute(self, code, silent=False, store_history=True, | ||
MinRK
|
r4952 | user_variables=None, user_expressions=None, allow_stdin=None): | ||
Brian Granger
|
r2699 | """Execute code in the kernel. | ||
epatters
|
r2672 | |||
Parameters | ||||
---------- | ||||
Brian Granger
|
r2699 | code : str | ||
A string of Python code. | ||||
Bernardo B. Marques
|
r4872 | |||
epatters
|
r2844 | silent : bool, optional (default False) | ||
Jason Grout
|
r8000 | If set, the kernel will execute the code as quietly possible, and | ||
will force store_history to be False. | ||||
store_history : bool, optional (default True) | ||||
Jason Grout
|
r8001 | If set, the kernel will store command history. This is forced | ||
Jason Grout
|
r8000 | to be False if silent is True. | ||
epatters
|
r2614 | |||
Fernando Perez
|
r2926 | user_variables : list, optional | ||
A list of variable names to pull from the user's namespace. They | ||||
will come back as a dict with these names as keys and their | ||||
:func:`repr` as values. | ||||
Bernardo B. Marques
|
r4872 | |||
Fernando Perez
|
r2926 | user_expressions : dict, optional | ||
epatters
|
r8349 | A dict mapping names to expressions to be evaluated in the user's | ||
dict. The expression values are returned as strings formatted using | ||||
:func:`repr`. | ||||
allow_stdin : bool, optional (default self.allow_stdin) | ||||
Flag for whether the kernel can send stdin requests to frontends. | ||||
Some frontends (e.g. the Notebook) do not support stdin requests. | ||||
If raw_input is called from code executed from such a frontend, a | ||||
StdinNotImplementedError will be raised. | ||||
MinRK
|
r4952 | |||
Brian Granger
|
r2699 | Returns | ||
------- | ||||
The msg_id of the message sent. | ||||
""" | ||||
Fernando Perez
|
r2926 | if user_variables is None: | ||
user_variables = [] | ||||
if user_expressions is None: | ||||
user_expressions = {} | ||||
MinRK
|
r4952 | if allow_stdin is None: | ||
allow_stdin = self.allow_stdin | ||||
Fernando Perez
|
r2926 | # Don't waste network traffic if inputs are invalid | ||
if not isinstance(code, basestring): | ||||
raise ValueError('code %r must be a string' % code) | ||||
validate_string_list(user_variables) | ||||
validate_string_dict(user_expressions) | ||||
Brian Granger
|
r2699 | # Create class for content/msg creation. Related to, but possibly | ||
# not in Session. | ||||
Jason Grout
|
r8000 | content = dict(code=code, silent=silent, store_history=store_history, | ||
Fernando Perez
|
r2926 | user_variables=user_variables, | ||
MinRK
|
r4952 | user_expressions=user_expressions, | ||
allow_stdin=allow_stdin, | ||||
) | ||||
Brian Granger
|
r2699 | msg = self.session.msg('execute_request', content) | ||
MinRK
|
r5377 | self._queue_send(msg) | ||
Brian Granger
|
r2699 | return msg['header']['msg_id'] | ||
Brian Granger
|
r2606 | |||
Fernando Perez
|
r2839 | def complete(self, text, line, cursor_pos, block=None): | ||
epatters
|
r2841 | """Tab complete text in the kernel's namespace. | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | Parameters | ||
---------- | ||||
text : str | ||||
The text to complete. | ||||
line : str | ||||
Bernardo B. Marques
|
r4872 | The full line of text that is the surrounding context for the | ||
Brian Granger
|
r2699 | text to complete. | ||
epatters
|
r2841 | cursor_pos : int | ||
The position of the cursor in the line where the completion was | ||||
requested. | ||||
block : str, optional | ||||
Brian Granger
|
r2699 | The full block of code in which the completion is being requested. | ||
Returns | ||||
------- | ||||
The msg_id of the message sent. | ||||
""" | ||||
Fernando Perez
|
r2839 | content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos) | ||
Brian Granger
|
r2699 | msg = self.session.msg('complete_request', content) | ||
MinRK
|
r5377 | self._queue_send(msg) | ||
Brian Granger
|
r2699 | return msg['header']['msg_id'] | ||
Brian Granger
|
r2606 | |||
MinRK
|
r6556 | def object_info(self, oname, detail_level=0): | ||
Brian E. Granger
|
r9129 | """Get metadata information about an object in the kernel's namespace. | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | Parameters | ||
---------- | ||||
oname : str | ||||
A string specifying the object name. | ||||
MinRK
|
r6556 | detail_level : int, optional | ||
The level of detail for the introspection (0-2) | ||||
Bernardo B. Marques
|
r4872 | |||
Brian Granger
|
r2699 | Returns | ||
------- | ||||
The msg_id of the message sent. | ||||
""" | ||||
MinRK
|
r6556 | content = dict(oname=oname, detail_level=detail_level) | ||
Brian Granger
|
r2699 | msg = self.session.msg('object_info_request', content) | ||
MinRK
|
r5377 | self._queue_send(msg) | ||
Brian Granger
|
r2699 | return msg['header']['msg_id'] | ||
epatters
|
r2632 | |||
Thomas Kluyver
|
r3819 | def history(self, raw=True, output=False, hist_access_type='range', **kwargs): | ||
Brian E. Granger
|
r9129 | """Get entries from the kernel's history list. | ||
epatters
|
r2844 | |||
Parameters | ||||
---------- | ||||
raw : bool | ||||
If True, return the raw input. | ||||
output : bool | ||||
If True, then return the output as well. | ||||
Thomas Kluyver
|
r3819 | hist_access_type : str | ||
'range' (fill in session, start and stop params), 'tail' (fill in n) | ||||
or 'search' (fill in pattern param). | ||||
Bernardo B. Marques
|
r4872 | |||
Thomas Kluyver
|
r3819 | session : int | ||
For a range request, the session from which to get lines. Session | ||||
numbers are positive integers; negative ones count back from the | ||||
current session. | ||||
start : int | ||||
The first line number of a history range. | ||||
stop : int | ||||
The final (excluded) line number of a history range. | ||||
Bernardo B. Marques
|
r4872 | |||
Thomas Kluyver
|
r3819 | n : int | ||
The number of lines of history to get for a tail request. | ||||
Bernardo B. Marques
|
r4872 | |||
Thomas Kluyver
|
r3819 | pattern : str | ||
The glob-syntax pattern for a search request. | ||||
epatters
|
r2844 | |||
Returns | ||||
------- | ||||
The msg_id of the message sent. | ||||
""" | ||||
Thomas Kluyver
|
r3819 | content = dict(raw=raw, output=output, hist_access_type=hist_access_type, | ||
**kwargs) | ||||
Thomas Kluyver
|
r3817 | msg = self.session.msg('history_request', content) | ||
MinRK
|
r5377 | self._queue_send(msg) | ||
epatters
|
r2844 | return msg['header']['msg_id'] | ||
Takafumi Arakaki
|
r8905 | def kernel_info(self): | ||
"""Request kernel info.""" | ||||
Takafumi Arakaki
|
r8879 | msg = self.session.msg('kernel_info_request') | ||
Takafumi Arakaki
|
r8831 | self._queue_send(msg) | ||
return msg['header']['msg_id'] | ||||
MinRK
|
r3089 | def shutdown(self, restart=False): | ||
Fernando Perez
|
r2972 | """Request an immediate kernel shutdown. | ||
Upon receipt of the (empty) reply, client code can safely assume that | ||||
the kernel has shut down and it's safe to forcefully terminate it if | ||||
it's still alive. | ||||
The kernel will send the reply via a function registered with Python's | ||||
atexit module, ensuring it's truly done as the kernel is done with all | ||||
normal operation. | ||||
""" | ||||
# Send quit message to kernel. Once we implement kernel-side setattr, | ||||
# this should probably be done that way, but for now this will do. | ||||
MinRK
|
r3089 | msg = self.session.msg('shutdown_request', {'restart':restart}) | ||
MinRK
|
r5377 | self._queue_send(msg) | ||
Fernando Perez
|
r2972 | return msg['header']['msg_id'] | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | |||
Brian Granger
|
r9120 | class IOPubChannel(ZMQSocketChannel): | ||
Brian E. Granger
|
r9129 | """The iopub channel which listens for messages that the kernel publishes. | ||
This channel is where all output is published to frontends. | ||||
Brian Granger
|
r2699 | """ | ||
def __init__(self, context, session, address): | ||||
Brian Granger
|
r9120 | super(IOPubChannel, self).__init__(context, session, address) | ||
epatters
|
r2996 | self.ioloop = ioloop.IOLoop() | ||
Brian Granger
|
r2699 | |||
def run(self): | ||||
"""The thread's main activity. Call start() instead.""" | ||||
self.socket = self.context.socket(zmq.SUB) | ||||
Thomas Kluyver
|
r4735 | self.socket.setsockopt(zmq.SUBSCRIBE,b'') | ||
MinRK
|
r4770 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) | ||
MinRK
|
r7321 | self.socket.connect(self.address) | ||
MinRK
|
r5377 | self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) | ||
self.stream.on_recv(self._handle_recv) | ||||
MinRK
|
r3928 | self._run_loop() | ||
MinRK
|
r6564 | try: | ||
self.socket.close() | ||||
except: | ||||
pass | ||||
Brian Granger
|
r2699 | |||
def stop(self): | ||||
Brian E. Granger
|
r9129 | """Stop the channel's event loop and join its thread.""" | ||
Brian Granger
|
r2699 | self.ioloop.stop() | ||
Brian Granger
|
r9120 | super(IOPubChannel, self).stop() | ||
Brian Granger
|
r2699 | |||
Brian Granger
|
r2697 | def call_handlers(self, msg): | ||
"""This method is called in the ioloop thread when a message arrives. | ||||
Subclasses should override this method to handle incoming messages. | ||||
It is important to remember that this method is called in the thread | ||||
so that some logic must be done to ensure that the application leve | ||||
handlers are called in the application thread. | ||||
""" | ||||
raise NotImplementedError('call_handlers must be defined in a subclass.') | ||||
Brian Granger
|
r2699 | def flush(self, timeout=1.0): | ||
Brian E. Granger
|
r9129 | """Immediately processes all pending messages on the iopub channel. | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2742 | Callers should use this method to ensure that :method:`call_handlers` | ||
has been called for all messages that have been received on the | ||||
0MQ SUB socket of this channel. | ||||
Brian Granger
|
r2699 | This method is thread safe. | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | Parameters | ||
---------- | ||||
timeout : float, optional | ||||
The maximum amount of time to spend flushing, in seconds. The | ||||
default is one second. | ||||
""" | ||||
# We do the IOLoop callback process twice to ensure that the IOLoop | ||||
# gets to perform at least one full poll. | ||||
stop_time = time.time() + timeout | ||||
for i in xrange(2): | ||||
self._flushed = False | ||||
self.ioloop.add_callback(self._flush) | ||||
while not self._flushed and time.time() < stop_time: | ||||
time.sleep(0.01) | ||||
def _flush(self): | ||||
"""Callback for :method:`self.flush`.""" | ||||
MinRK
|
r5377 | self.stream.flush() | ||
Brian Granger
|
r2699 | self._flushed = True | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r9120 | class StdInChannel(ZMQSocketChannel): | ||
Brian E. Granger
|
r9129 | """The stdin channel to handle raw_input requests that the kernel makes.""" | ||
Brian Granger
|
r2606 | |||
epatters
|
r2707 | msg_queue = None | ||
def __init__(self, context, session, address): | ||||
Brian Granger
|
r9120 | super(StdInChannel, self).__init__(context, session, address) | ||
epatters
|
r2996 | self.ioloop = ioloop.IOLoop() | ||
epatters
|
r2707 | |||
epatters
|
r2701 | def run(self): | ||
"""The thread's main activity. Call start() instead.""" | ||||
MinRK
|
r4725 | self.socket = self.context.socket(zmq.DEALER) | ||
MinRK
|
r4770 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) | ||
MinRK
|
r7321 | self.socket.connect(self.address) | ||
MinRK
|
r5377 | self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) | ||
self.stream.on_recv(self._handle_recv) | ||||
MinRK
|
r3928 | self._run_loop() | ||
MinRK
|
r6564 | try: | ||
self.socket.close() | ||||
except: | ||||
pass | ||||
epatters
|
r2701 | |||
def stop(self): | ||||
Brian E. Granger
|
r9129 | """Stop the channel's event loop and join its thread.""" | ||
epatters
|
r2701 | self.ioloop.stop() | ||
Brian Granger
|
r9120 | super(StdInChannel, self).stop() | ||
Brian Granger
|
r2606 | |||
epatters
|
r2707 | def call_handlers(self, msg): | ||
"""This method is called in the ioloop thread when a message arrives. | ||||
Subclasses should override this method to handle incoming messages. | ||||
It is important to remember that this method is called in the thread | ||||
so that some logic must be done to ensure that the application leve | ||||
handlers are called in the application thread. | ||||
""" | ||||
raise NotImplementedError('call_handlers must be defined in a subclass.') | ||||
epatters
|
r2730 | def input(self, string): | ||
"""Send a string of raw input to the kernel.""" | ||||
content = dict(value=string) | ||||
msg = self.session.msg('input_reply', content) | ||||
MinRK
|
r5377 | self._queue_send(msg) | ||
epatters
|
r2611 | |||
Brian Granger
|
r9120 | class HBChannel(ZMQSocketChannel): | ||
epatters
|
r3032 | """The heartbeat channel which monitors the kernel heartbeat. | ||
Note that the heartbeat channel is paused by default. As long as you start | ||||
this channel, the kernel manager will ensure that it is paused and un-paused | ||||
as appropriate. | ||||
""" | ||||
Brian Granger
|
r2910 | |||
Brian Granger
|
r2925 | time_to_dead = 3.0 | ||
Brian Granger
|
r2910 | socket = None | ||
poller = None | ||||
Brian Granger
|
r3023 | _running = None | ||
_pause = None | ||||
MinRK
|
r5614 | _beating = None | ||
Brian Granger
|
r2910 | |||
def __init__(self, context, session, address): | ||||
Brian Granger
|
r9120 | super(HBChannel, self).__init__(context, session, address) | ||
epatters
|
r2915 | self._running = False | ||
MinRK
|
r5614 | self._pause =True | ||
self.poller = zmq.Poller() | ||||
Brian Granger
|
r2910 | |||
def _create_socket(self): | ||||
MinRK
|
r5614 | if self.socket is not None: | ||
# close previous socket, before opening a new one | ||||
self.poller.unregister(self.socket) | ||||
self.socket.close() | ||||
Brian Granger
|
r2910 | self.socket = self.context.socket(zmq.REQ) | ||
MinRK
|
r5614 | self.socket.setsockopt(zmq.LINGER, 0) | ||
MinRK
|
r7321 | self.socket.connect(self.address) | ||
MinRK
|
r5614 | |||
Brian Granger
|
r2910 | self.poller.register(self.socket, zmq.POLLIN) | ||
MinRK
|
r5614 | |||
def _poll(self, start_time): | ||||
Brian E. Granger
|
r9129 | """poll for heartbeat replies until we reach self.time_to_dead. | ||
MinRK
|
r5614 | |||
Ignores interrupts, and returns the result of poll(), which | ||||
will be an empty list if no messages arrived before the timeout, | ||||
or the event tuple if there is a message to receive. | ||||
""" | ||||
until_dead = self.time_to_dead - (time.time() - start_time) | ||||
# ensure poll at least once | ||||
until_dead = max(until_dead, 1e-3) | ||||
events = [] | ||||
while True: | ||||
try: | ||||
events = self.poller.poll(1000 * until_dead) | ||||
MinRK
|
r5649 | except ZMQError as e: | ||
MinRK
|
r5614 | if e.errno == errno.EINTR: | ||
# ignore interrupts during heartbeat | ||||
# this may never actually happen | ||||
until_dead = self.time_to_dead - (time.time() - start_time) | ||||
until_dead = max(until_dead, 1e-3) | ||||
pass | ||||
else: | ||||
raise | ||||
MinRK
|
r6319 | except Exception: | ||
if self._exiting: | ||||
break | ||||
else: | ||||
raise | ||||
MinRK
|
r5614 | else: | ||
break | ||||
return events | ||||
Brian Granger
|
r2910 | |||
def run(self): | ||||
"""The thread's main activity. Call start() instead.""" | ||||
self._create_socket() | ||||
epatters
|
r2915 | self._running = True | ||
MinRK
|
r5614 | self._beating = True | ||
epatters
|
r2915 | while self._running: | ||
Brian Granger
|
r3023 | if self._pause: | ||
MinRK
|
r5614 | # just sleep, and skip the rest of the loop | ||
Brian Granger
|
r3023 | time.sleep(self.time_to_dead) | ||
MinRK
|
r5614 | continue | ||
since_last_heartbeat = 0.0 | ||||
# io.rprint('Ping from HB channel') # dbg | ||||
# no need to catch EFSM here, because the previous event was | ||||
# either a recv or connect, which cannot be followed by EFSM | ||||
self.socket.send(b'ping') | ||||
request_time = time.time() | ||||
ready = self._poll(request_time) | ||||
if ready: | ||||
self._beating = True | ||||
# the poll above guarantees we have something to recv | ||||
self.socket.recv() | ||||
# sleep the remainder of the cycle | ||||
remainder = self.time_to_dead - (time.time() - request_time) | ||||
if remainder > 0: | ||||
time.sleep(remainder) | ||||
continue | ||||
Brian Granger
|
r2910 | else: | ||
MinRK
|
r5614 | # nothing was received within the time limit, signal heart failure | ||
self._beating = False | ||||
since_last_heartbeat = time.time() - request_time | ||||
self.call_handlers(since_last_heartbeat) | ||||
# and close/reopen the socket, because the REQ/REP cycle has been broken | ||||
self._create_socket() | ||||
continue | ||||
MinRK
|
r6564 | try: | ||
self.socket.close() | ||||
except: | ||||
pass | ||||
Brian Granger
|
r3023 | |||
def pause(self): | ||||
"""Pause the heartbeat.""" | ||||
self._pause = True | ||||
def unpause(self): | ||||
"""Unpause the heartbeat.""" | ||||
self._pause = False | ||||
def is_beating(self): | ||||
MinRK
|
r5614 | """Is the heartbeat running and responsive (and not paused).""" | ||
if self.is_alive() and not self._pause and self._beating: | ||||
Brian Granger
|
r3023 | return True | ||
else: | ||||
return False | ||||
Brian Granger
|
r2910 | |||
epatters
|
r2915 | def stop(self): | ||
Brian E. Granger
|
r9129 | """Stop the channel's event loop and join its thread.""" | ||
epatters
|
r2915 | self._running = False | ||
Brian Granger
|
r9120 | super(HBChannel, self).stop() | ||
epatters
|
r2915 | |||
Brian Granger
|
r2910 | def call_handlers(self, since_last_heartbeat): | ||
"""This method is called in the ioloop thread when a message arrives. | ||||
Subclasses should override this method to handle incoming messages. | ||||
It is important to remember that this method is called in the thread | ||||
MinRK
|
r5614 | so that some logic must be done to ensure that the application level | ||
Brian Granger
|
r2910 | handlers are called in the application thread. | ||
""" | ||||
raise NotImplementedError('call_handlers must be defined in a subclass.') | ||||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# Main kernel manager class | ||||
#----------------------------------------------------------------------------- | ||||
Brian E. Granger
|
r9116 | class KernelManager(Configurable): | ||
Brian E. Granger
|
r9129 | """Manages a single kernel on this host along with its channels. | ||
epatters
|
r2611 | |||
Brian E. Granger
|
r9129 | There are four channels associated with each kernel: | ||
Bernardo B. Marques
|
r4872 | |||
Brian E. Granger
|
r9129 | * shell: for request/reply calls to the kernel. | ||
* iopub: for the kernel to publish results to frontends. | ||||
* hb: for monitoring the kernel's heartbeat. | ||||
* stdin: for frontends to reply to raw_input calls in the kernel. | ||||
Bernardo B. Marques
|
r4872 | |||
Brian E. Granger
|
r9129 | The usage of the channels that this class manages is optional. It is | ||
entirely possible to connect to the kernels directly using ZeroMQ | ||||
sockets. These channels are useful primarily for talking to a kernel | ||||
whose :class:`KernelManager` is in the same process. | ||||
This version manages kernels started using Popen. | ||||
epatters
|
r2631 | """ | ||
epatters
|
r2611 | # The PyZMQ Context to use for communication with the kernel. | ||
MinRK
|
r4015 | context = Instance(zmq.Context) | ||
def _context_default(self): | ||||
return zmq.Context.instance() | ||||
epatters
|
r2611 | |||
# The Session to use for communication with the kernel. | ||||
MinRK
|
r4015 | session = Instance(Session) | ||
epatters
|
r8408 | def _session_default(self): | ||
return Session(config=self.config) | ||||
epatters
|
r2611 | |||
epatters
|
r2730 | # The kernel process with which the KernelManager is communicating. | ||
MinRK
|
r9348 | # generally a Popen instance | ||
kernel = Any() | ||||
kernel_cmd = List(Unicode, config=True, | ||||
help="""The Popen Command to launch the kernel. | ||||
Override this if you have a custom | ||||
""" | ||||
) | ||||
def _kernel_cmd_changed(self, name, old, new): | ||||
self.ipython_kernel = False | ||||
ipython_kernel = Bool(True) | ||||
epatters
|
r2730 | |||
Bernardo B. Marques
|
r4872 | # The addresses for the communication channels. | ||
MinRK
|
r4958 | connection_file = Unicode('') | ||
MinRK
|
r7321 | |||
Brian E. Granger
|
r9116 | transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True) | ||
MinRK
|
r9175 | ip = Unicode(LOCALHOST, config=True, | ||
help="""Set the kernel\'s IP address [default localhost]. | ||||
If the IP address is something other than localhost, then | ||||
Consoles on other machines will be able to connect | ||||
to the Kernel, so be careful!""" | ||||
) | ||||
def _ip_default(self): | ||||
if self.transport == 'ipc': | ||||
if self.connection_file: | ||||
return os.path.splitext(self.connection_file)[0] + '-ipc' | ||||
else: | ||||
return 'kernel-ipc' | ||||
else: | ||||
return LOCALHOST | ||||
MinRK
|
r5170 | def _ip_changed(self, name, old, new): | ||
if new == '*': | ||||
self.ip = '0.0.0.0' | ||||
MinRK
|
r5344 | shell_port = Integer(0) | ||
iopub_port = Integer(0) | ||||
stdin_port = Integer(0) | ||||
hb_port = Integer(0) | ||||
epatters
|
r2758 | |||
epatters
|
r2611 | # The classes to use for the various channels. | ||
Brian Granger
|
r9120 | shell_channel_class = Type(ShellChannel) | ||
iopub_channel_class = Type(IOPubChannel) | ||||
stdin_channel_class = Type(StdInChannel) | ||||
hb_channel_class = Type(HBChannel) | ||||
Brian Granger
|
r3046 | |||
epatters
|
r2611 | # Protected traits. | ||
epatters
|
r2851 | _launch_args = Any | ||
MinRK
|
r3974 | _shell_channel = Any | ||
Brian Granger
|
r9120 | _iopub_channel = Any | ||
MinRK
|
r3974 | _stdin_channel = Any | ||
Brian Granger
|
r2910 | _hb_channel = Any | ||
MinRK
|
r4958 | _connection_file_written=Bool(False) | ||
MinRK
|
r9175 | |||
def __del__(self): | ||||
Brian E. Granger
|
r9154 | self.cleanup_connection_file() | ||
Brian E. Granger
|
r9152 | |||
epatters
|
r2758 | #-------------------------------------------------------------------------- | ||
epatters
|
r2686 | # Channel management methods: | ||
#-------------------------------------------------------------------------- | ||||
epatters
|
r2611 | |||
Brian Granger
|
r9121 | def start_channels(self, shell=True, iopub=True, stdin=True, hb=True): | ||
epatters
|
r3032 | """Starts the channels for this kernel. | ||
Brian Granger
|
r2699 | |||
This will create the channels if they do not exist and then start | ||||
Brian E. Granger
|
r9129 | them (their activity runs in a thread). If port numbers of 0 are | ||
being used (random ports) then you must first call | ||||
:method:`start_kernel`. If the channels have been stopped and you | ||||
call this, :class:`RuntimeError` will be raised. | ||||
epatters
|
r2639 | """ | ||
MinRK
|
r3974 | if shell: | ||
self.shell_channel.start() | ||||
Brian Granger
|
r9121 | if iopub: | ||
Brian Granger
|
r9120 | self.iopub_channel.start() | ||
MinRK
|
r3974 | if stdin: | ||
self.stdin_channel.start() | ||||
MinRK
|
r4952 | self.shell_channel.allow_stdin = True | ||
else: | ||||
self.shell_channel.allow_stdin = False | ||||
epatters
|
r3032 | if hb: | ||
self.hb_channel.start() | ||||
epatters
|
r2639 | |||
Brian Granger
|
r2699 | def stop_channels(self): | ||
epatters
|
r2994 | """Stops all the running channels for this kernel. | ||
Brian E. Granger
|
r9129 | |||
This stops their event loops and joins their threads. | ||||
Brian Granger
|
r2699 | """ | ||
MinRK
|
r3974 | if self.shell_channel.is_alive(): | ||
self.shell_channel.stop() | ||||
Brian Granger
|
r9120 | if self.iopub_channel.is_alive(): | ||
self.iopub_channel.stop() | ||||
MinRK
|
r3974 | if self.stdin_channel.is_alive(): | ||
self.stdin_channel.stop() | ||||
epatters
|
r3032 | if self.hb_channel.is_alive(): | ||
self.hb_channel.stop() | ||||
epatters
|
r2686 | |||
Brian Granger
|
r2699 | @property | ||
def channels_running(self): | ||||
epatters
|
r2994 | """Are any of the channels created and running?""" | ||
Brian Granger
|
r9120 | return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or | ||
MinRK
|
r3974 | self.stdin_channel.is_alive() or self.hb_channel.is_alive()) | ||
epatters
|
r2639 | |||
Brian Granger
|
r9121 | def _make_url(self, port): | ||
Brian E. Granger
|
r9129 | """Make a zmq url with a port. | ||
There are two cases that this handles: | ||||
* tcp: tcp://ip:port | ||||
* ipc: ipc://ip-port | ||||
""" | ||||
Brian Granger
|
r9121 | if self.transport == 'tcp': | ||
return "tcp://%s:%i" % (self.ip, port) | ||||
else: | ||||
return "%s://%s-%s" % (self.transport, self.ip, port) | ||||
@property | ||||
def shell_channel(self): | ||||
Brian E. Granger
|
r9129 | """Get the shell channel object for this kernel.""" | ||
Brian Granger
|
r9121 | if self._shell_channel is None: | ||
Brian E. Granger
|
r9129 | self._shell_channel = self.shell_channel_class( | ||
self.context, self.session, self._make_url(self.shell_port) | ||||
Brian Granger
|
r9121 | ) | ||
return self._shell_channel | ||||
@property | ||||
def iopub_channel(self): | ||||
Brian E. Granger
|
r9129 | """Get the iopub channel object for this kernel.""" | ||
Brian Granger
|
r9121 | if self._iopub_channel is None: | ||
Brian E. Granger
|
r9129 | self._iopub_channel = self.iopub_channel_class( | ||
self.context, self.session, self._make_url(self.iopub_port) | ||||
Brian Granger
|
r9121 | ) | ||
return self._iopub_channel | ||||
@property | ||||
def stdin_channel(self): | ||||
Brian E. Granger
|
r9129 | """Get the stdin channel object for this kernel.""" | ||
Brian Granger
|
r9121 | if self._stdin_channel is None: | ||
Brian E. Granger
|
r9129 | self._stdin_channel = self.stdin_channel_class( | ||
self.context, self.session, self._make_url(self.stdin_port) | ||||
Brian Granger
|
r9121 | ) | ||
return self._stdin_channel | ||||
@property | ||||
def hb_channel(self): | ||||
Brian E. Granger
|
r9129 | """Get the hb channel object for this kernel.""" | ||
Brian Granger
|
r9121 | if self._hb_channel is None: | ||
Brian E. Granger
|
r9129 | self._hb_channel = self.hb_channel_class( | ||
self.context, self.session, self._make_url(self.hb_port) | ||||
Brian Granger
|
r9121 | ) | ||
return self._hb_channel | ||||
epatters
|
r2686 | #-------------------------------------------------------------------------- | ||
Brian E. Granger
|
r9151 | # Connection and ipc file management | ||
epatters
|
r2686 | #-------------------------------------------------------------------------- | ||
MinRK
|
r4958 | |||
MinRK
|
r5609 | def cleanup_connection_file(self): | ||
Brian E. Granger
|
r9129 | """Cleanup connection file *if we wrote it* | ||
MinRK
|
r5609 | |||
Will not raise if the connection file was already removed somehow. | ||||
""" | ||||
if self._connection_file_written: | ||||
# cleanup connection files on full shutdown of kernel we started | ||||
self._connection_file_written = False | ||||
try: | ||||
os.remove(self.connection_file) | ||||
MinRK
|
r9513 | except (IOError, OSError, AttributeError): | ||
MinRK
|
r7321 | pass | ||
Brian Granger
|
r9119 | def cleanup_ipc_files(self): | ||
Brian E. Granger
|
r9129 | """Cleanup ipc files if we wrote them.""" | ||
MinRK
|
r7321 | if self.transport != 'ipc': | ||
return | ||||
for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port): | ||||
ipcfile = "%s-%i" % (self.ip, port) | ||||
try: | ||||
os.remove(ipcfile) | ||||
except (IOError, OSError): | ||||
MinRK
|
r5609 | pass | ||
Brian E. Granger
|
r9116 | |||
MinRK
|
r4966 | def load_connection_file(self): | ||
Brian E. Granger
|
r9129 | """Load connection info from JSON dict in self.connection_file.""" | ||
MinRK
|
r4966 | with open(self.connection_file) as f: | ||
cfg = json.loads(f.read()) | ||||
Brian E. Granger
|
r9116 | |||
MinRK
|
r7321 | from pprint import pprint | ||
pprint(cfg) | ||||
self.transport = cfg.get('transport', 'tcp') | ||||
MinRK
|
r4966 | self.ip = cfg['ip'] | ||
self.shell_port = cfg['shell_port'] | ||||
self.stdin_port = cfg['stdin_port'] | ||||
self.iopub_port = cfg['iopub_port'] | ||||
self.hb_port = cfg['hb_port'] | ||||
self.session.key = str_to_bytes(cfg['key']) | ||||
MinRK
|
r4958 | def write_connection_file(self): | ||
Brian E. Granger
|
r9129 | """Write connection info to JSON dict in self.connection_file.""" | ||
MinRK
|
r4958 | if self._connection_file_written: | ||
return | ||||
self.connection_file,cfg = write_connection_file(self.connection_file, | ||||
MinRK
|
r7321 | transport=self.transport, ip=self.ip, key=self.session.key, | ||
MinRK
|
r4959 | stdin_port=self.stdin_port, iopub_port=self.iopub_port, | ||
MinRK
|
r4958 | shell_port=self.shell_port, hb_port=self.hb_port) | ||
# write_connection_file also sets default ports: | ||||
self.shell_port = cfg['shell_port'] | ||||
self.stdin_port = cfg['stdin_port'] | ||||
MinRK
|
r4959 | self.iopub_port = cfg['iopub_port'] | ||
MinRK
|
r4958 | self.hb_port = cfg['hb_port'] | ||
self._connection_file_written = True | ||||
Brian Granger
|
r9121 | |||
#-------------------------------------------------------------------------- | ||||
Brian E. Granger
|
r9151 | # Kernel management | ||
Brian Granger
|
r9121 | #-------------------------------------------------------------------------- | ||
MinRK
|
r9348 | |||
def format_kernel_cmd(self, **kw): | ||||
"""format templated args (e.g. {connection_file})""" | ||||
if self.kernel_cmd: | ||||
cmd = self.kernel_cmd | ||||
else: | ||||
MinRK
|
r9350 | cmd = make_ipkernel_cmd( | ||
MinRK
|
r9372 | 'from IPython.kernel.zmq.kernelapp import main; main()', | ||
MinRK
|
r9348 | **kw | ||
) | ||||
ns = dict(connection_file=self.connection_file) | ||||
ns.update(self._launch_args) | ||||
return [ c.format(**ns) for c in cmd ] | ||||
MinRK
|
r9349 | |||
def _launch_kernel(self, kernel_cmd, **kw): | ||||
"""actually launch the kernel | ||||
override in a subclass to launch kernel subprocesses differently | ||||
""" | ||||
return launch_kernel(kernel_cmd, **kw) | ||||
epatters
|
r2851 | def start_kernel(self, **kw): | ||
Brian E. Granger
|
r9129 | """Starts a kernel on this host in a separate process. | ||
epatters
|
r2686 | |||
Brian Granger
|
r2699 | If random ports (port=0) are being used, this method must be called | ||
before the channels are created. | ||||
epatters
|
r2758 | |||
Parameters: | ||||
----------- | ||||
epatters
|
r3784 | **kw : optional | ||
MinRK
|
r9352 | keyword arguments that are passed down to build the kernel_cmd | ||
and launching the kernel (e.g. Popen kwargs). | ||||
epatters
|
r2611 | """ | ||
MinRK
|
r7321 | if self.transport == 'tcp' and self.ip not in LOCAL_IPS: | ||
MinRK
|
r3144 | raise RuntimeError("Can only launch a kernel on a local interface. " | ||
epatters
|
r2667 | "Make sure that the '*_address' attributes are " | ||
MinRK
|
r3144 | "configured properly. " | ||
"Currently valid addresses are: %s"%LOCAL_IPS | ||||
) | ||||
MinRK
|
r4958 | |||
# write connection file / get default ports | ||||
self.write_connection_file() | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r9348 | # save kwargs for use in restart | ||
epatters
|
r2851 | self._launch_args = kw.copy() | ||
MinRK
|
r9348 | # build the Popen cmd | ||
kernel_cmd = self.format_kernel_cmd(**kw) | ||||
# launch the kernel subprocess | ||||
MinRK
|
r9349 | self.kernel = self._launch_kernel(kernel_cmd, | ||
ipython_kernel=self.ipython_kernel, | ||||
**kw) | ||||
epatters
|
r2686 | |||
Brian Granger
|
r9119 | def shutdown_kernel(self, now=False, restart=False): | ||
Brian E. Granger
|
r9129 | """Attempts to the stop the kernel process cleanly. | ||
This attempts to shutdown the kernels cleanly by: | ||||
1. Sending it a shutdown message over the shell channel. | ||||
2. If that fails, the kernel is shutdown forcibly by sending it | ||||
a signal. | ||||
epatters
|
r8487 | |||
Brian E. Granger
|
r9129 | Parameters: | ||
----------- | ||||
now : bool | ||||
Should the kernel be forcible killed *now*. This skips the | ||||
first, nice shutdown attempt. | ||||
restart: bool | ||||
Will this kernel be restarted after it is shutdown. When this | ||||
is True, connection files will not be cleaned up. | ||||
epatters
|
r2961 | """ | ||
epatters
|
r2995 | # FIXME: Shutdown does not work on Windows due to ZMQ errors! | ||
if sys.platform == 'win32': | ||||
Brian E. Granger
|
r9130 | self._kill_kernel() | ||
epatters
|
r2995 | return | ||
epatters
|
r3032 | # Pause the heart beat channel if it exists. | ||
if self._hb_channel is not None: | ||||
self._hb_channel.pause() | ||||
Brian Granger
|
r9119 | if now: | ||
epatters
|
r2961 | if self.has_kernel: | ||
Brian E. Granger
|
r9130 | self._kill_kernel() | ||
Brian Granger
|
r9119 | else: | ||
# Don't send any additional kernel kill messages immediately, to give | ||||
# the kernel a chance to properly execute shutdown actions. Wait for at | ||||
# most 1s, checking every 0.1s. | ||||
self.shell_channel.shutdown(restart=restart) | ||||
for i in range(10): | ||||
if self.is_alive: | ||||
time.sleep(0.1) | ||||
else: | ||||
break | ||||
else: | ||||
# OK, we've waited long enough. | ||||
if self.has_kernel: | ||||
Brian E. Granger
|
r9130 | self._kill_kernel() | ||
Bernardo B. Marques
|
r4872 | |||
Brian E. Granger
|
r9117 | if not restart: | ||
self.cleanup_connection_file() | ||||
Brian E. Granger
|
r9151 | self.cleanup_ipc_files() | ||
Brian Granger
|
r9119 | else: | ||
self.cleanup_ipc_files() | ||||
MinRK
|
r4958 | |||
epatters
|
r3784 | def restart_kernel(self, now=False, **kw): | ||
"""Restarts a kernel with the arguments that were used to launch it. | ||||
Bernardo B. Marques
|
r4872 | |||
epatters
|
r3784 | If the old kernel was launched with random ports, the same ports will be | ||
Brian E. Granger
|
r9129 | used for the new kernel. The same connection file is used again. | ||
Fernando Perez
|
r2972 | |||
Parameters | ||||
---------- | ||||
Fernando Perez
|
r3030 | now : bool, optional | ||
epatters
|
r3784 | If True, the kernel is forcefully restarted *immediately*, without | ||
having a chance to do any cleanup action. Otherwise the kernel is | ||||
given 1s to clean up before a forceful restart is issued. | ||||
Fernando Perez
|
r2972 | |||
epatters
|
r3784 | In all cases the kernel is restarted, the only difference is whether | ||
it is given a chance to perform a clean shutdown or not. | ||||
**kw : optional | ||||
Brian E. Granger
|
r9129 | Any options specified here will overwrite those used to launch the | ||
epatters
|
r3784 | kernel. | ||
epatters
|
r2851 | """ | ||
if self._launch_args is None: | ||||
raise RuntimeError("Cannot restart the kernel. " | ||||
"No previous call to 'start_kernel'.") | ||||
else: | ||||
epatters
|
r3784 | # Stop currently running kernel. | ||
Brian Granger
|
r9119 | self.shutdown_kernel(now=now, restart=True) | ||
epatters
|
r3784 | |||
# Start new kernel. | ||||
self._launch_args.update(kw) | ||||
epatters
|
r2915 | self.start_kernel(**self._launch_args) | ||
epatters
|
r2851 | |||
epatters
|
r2995 | # FIXME: Messages get dropped in Windows due to probable ZMQ bug | ||
# unless there is some delay here. | ||||
if sys.platform == 'win32': | ||||
time.sleep(0.2) | ||||
epatters
|
r2686 | @property | ||
def has_kernel(self): | ||||
Brian E. Granger
|
r9129 | """Has a kernel been started that we are managing.""" | ||
epatters
|
r2730 | return self.kernel is not None | ||
epatters
|
r2686 | |||
Brian E. Granger
|
r9130 | def _kill_kernel(self): | ||
Brian E. Granger
|
r9129 | """Kill the running kernel. | ||
epatters
|
r8487 | |||
Brian E. Granger
|
r9129 | This is a private method, callers should use shutdown_kernel(now=True). | ||
epatters
|
r8487 | """ | ||
Brian Granger
|
r3026 | if self.has_kernel: | ||
epatters
|
r3032 | # Pause the heart beat channel if it exists. | ||
if self._hb_channel is not None: | ||||
self._hb_channel.pause() | ||||
epatters
|
r8487 | # Signal the kernel to terminate (sends SIGKILL on Unix and calls | ||
# TerminateProcess() on Win32). | ||||
epatters
|
r3034 | try: | ||
self.kernel.kill() | ||||
Matthias BUSSONNIER
|
r7787 | except OSError as e: | ||
epatters
|
r3034 | # In Windows, we will get an Access Denied error if the process | ||
# has already terminated. Ignore it. | ||||
epatters
|
r3827 | if sys.platform == 'win32': | ||
if e.winerror != 5: | ||||
raise | ||||
# On Unix, we may get an ESRCH error if the process has already | ||||
# terminated. Ignore it. | ||||
else: | ||||
from errno import ESRCH | ||||
if e.errno != ESRCH: | ||||
raise | ||||
epatters
|
r8487 | |||
# Block until the kernel terminates. | ||||
self.kernel.wait() | ||||
epatters
|
r2730 | self.kernel = None | ||
epatters
|
r2639 | else: | ||
epatters
|
r2686 | raise RuntimeError("Cannot kill kernel. No kernel is running!") | ||
epatters
|
r2611 | |||
epatters
|
r3027 | def interrupt_kernel(self): | ||
Brian E. Granger
|
r9129 | """Interrupts the kernel by sending it a signal. | ||
epatters
|
r8487 | |||
Unlike ``signal_kernel``, this operation is well supported on all | ||||
platforms. | ||||
epatters
|
r3027 | """ | ||
if self.has_kernel: | ||||
if sys.platform == 'win32': | ||||
Thomas Kluyver
|
r9487 | from .zmq.parentpoller import ParentPollerWindows as Poller | ||
epatters
|
r3027 | Poller.send_interrupt(self.kernel.win32_interrupt_event) | ||
else: | ||||
self.kernel.send_signal(signal.SIGINT) | ||||
else: | ||||
raise RuntimeError("Cannot interrupt kernel. No kernel is running!") | ||||
epatters
|
r2611 | def signal_kernel(self, signum): | ||
Brian E. Granger
|
r9129 | """Sends a signal to the kernel. | ||
epatters
|
r8487 | |||
Note that since only SIGTERM is supported on Windows, this function is | ||||
only useful on Unix systems. | ||||
epatters
|
r3027 | """ | ||
Brian Granger
|
r3026 | if self.has_kernel: | ||
epatters
|
r2730 | self.kernel.send_signal(signum) | ||
epatters
|
r2686 | else: | ||
raise RuntimeError("Cannot signal kernel. No kernel is running!") | ||||
epatters
|
r2611 | |||
Brian Granger
|
r2699 | @property | ||
def is_alive(self): | ||||
"""Is the kernel process still running?""" | ||||
Brian Granger
|
r3026 | if self.has_kernel: | ||
epatters
|
r2730 | if self.kernel.poll() is None: | ||
Brian Granger
|
r2699 | return True | ||
else: | ||||
return False | ||||
MinRK
|
r5614 | elif self._hb_channel is not None: | ||
# We didn't start the kernel with this KernelManager so we | ||||
# use the heartbeat. | ||||
return self._hb_channel.is_beating() | ||||
Brian Granger
|
r2699 | else: | ||
MinRK
|
r5614 | # no heartbeat and not local, we can't tell if it's running, | ||
# so naively return True | ||||
Brian Granger
|
r2699 | return True | ||
Brian Granger
|
r9121 | #----------------------------------------------------------------------------- | ||
# ABC Registration | ||||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r2910 | |||
Brian Granger
|
r9121 | ShellChannelABC.register(ShellChannel) | ||
IOPubChannelABC.register(IOPubChannel) | ||||
HBChannelABC.register(HBChannel) | ||||
StdInChannelABC.register(StdInChannel) | ||||
KernelManagerABC.register(KernelManager) | ||||
Brian E. Granger
|
r9129 | |||