kernelmanager.py
1043 lines
| 37.4 KiB
| text/x-python
|
PythonLexer
Brian Granger
|
r2742 | """Base classes to manage the interaction with a running kernel. | ||
Brian Granger
|
r2606 | |||
epatters
|
r3027 | TODO | ||
Brian Granger
|
r2699 | * Create logger to handle debugging and console messages. | ||
Brian Granger
|
r2606 | """ | ||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2008-2010 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
epatters
|
r2611 | # Standard library imports. | ||
MinRK
|
r3928 | import errno | ||
MinRK
|
r5072 | import json | ||
Brian Granger
|
r2606 | from Queue import Queue, Empty | ||
epatters
|
r2667 | from subprocess import Popen | ||
MinRK
|
r4958 | import os | ||
epatters
|
r3027 | import signal | ||
epatters
|
r2995 | import sys | ||
Brian Granger
|
r2606 | from threading import Thread | ||
epatters
|
r2614 | import time | ||
Brian Granger
|
r2606 | |||
epatters
|
r2611 | # System library imports. | ||
Brian Granger
|
r2606 | import zmq | ||
from zmq import POLLIN, POLLOUT, POLLERR | ||||
from zmq.eventloop import ioloop | ||||
epatters
|
r2611 | |||
# Local imports. | ||||
MinRK
|
r4015 | from IPython.config.loader import Config | ||
MinRK
|
r3144 | from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS | ||
MinRK
|
r4958 | from IPython.utils.traitlets import ( | ||
HasTraits, Any, Instance, Type, Unicode, Int, Bool | ||||
) | ||||
MinRK
|
r4966 | from IPython.utils.py3compat import str_to_bytes | ||
MinRK
|
r4958 | from IPython.zmq.entry_point import write_connection_file | ||
MinRK
|
r4956 | from session import Session | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# Constants and exceptions | ||||
#----------------------------------------------------------------------------- | ||||
class InvalidPortNumber(Exception):
    """Raised when a channel is given a port number it cannot use."""
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
Fernando Perez
|
r2926 | # Utility functions | ||
#----------------------------------------------------------------------------- | ||||
# some utilities to validate message structure, these might get moved elsewhere | ||||
# if they prove to have more generic utility | ||||
def validate_string_list(lst):
    """Validate that the input is a list of strings.

    Parameters
    ----------
    lst : object
        The value to check.

    Raises
    ------
    ValueError
        If `lst` is not a list, or if any element of it is not a string.
    """
    # The offending value is wrapped in a 1-tuple before formatting: the %
    # operator unpacks a bare tuple as multiple format arguments, which
    # would turn the intended ValueError into a TypeError.
    if not isinstance(lst, list):
        raise ValueError('input %r must be a list' % (lst,))
    for x in lst:
        if not isinstance(x, basestring):
            raise ValueError('element %r in list must be a string' % (x,))
def validate_string_dict(dct):
    """Validate that the input is a dict with string keys and values.

    Parameters
    ----------
    dct : object
        The value to check.

    Raises
    ------
    ValueError
        If `dct` is not a dict, or if any key or value is not a string.
    """
    # Reject non-dicts up front so callers get the documented ValueError
    # rather than an AttributeError from the iteration below.
    if not isinstance(dct, dict):
        raise ValueError('input %r must be a dict' % (dct,))
    for k, v in dct.items():
        # Keys/values are wrapped in a 1-tuple so that tuple-valued entries
        # are reported instead of breaking the % formatting.
        if not isinstance(k, basestring):
            raise ValueError('key %r in dict must be a string' % (k,))
        if not isinstance(v, basestring):
            raise ValueError('value %r in dict must be a string' % (v,))
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r2699 | # ZMQ Socket Channel classes | ||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r2606 | |||
MinRK
|
class ZMQSocketChannel(Thread):
    """Thread-based foundation shared by the ZMQ socket channels.

    Instances are daemon threads.  This class stores the context, session
    and address; it never assigns ``socket``, ``ioloop`` or ``iostate``
    itself, although its helper methods rely on them being set.
    """

    context = None
    session = None
    socket = None
    ioloop = None
    iostate = None
    _address = None

    def __init__(self, context, session, address):
        """Set up the channel without starting its thread.

        Parameters
        ----------
        context : :class:`zmq.Context`
            The ZMQ context to use.
        session : :class:`session.Session`
            The session to use.
        address : tuple
            Standard (ip, port) tuple that the kernel is listening on.

        Raises
        ------
        InvalidPortNumber
            If the port in `address` is 0.
        """
        super(ZMQSocketChannel, self).__init__()
        self.daemon = True
        self.context = context
        self.session = session
        port = address[1]
        if port == 0:
            raise InvalidPortNumber('The port number for a channel cannot be 0.')
        self._address = address

    def _run_loop(self):
        """Run the ioloop, restarting it whenever the poller hits EINTR."""
        while True:
            try:
                self.ioloop.start()
            except zmq.ZMQError as e:
                # Only an interrupted system call is retried; anything else
                # propagates to the caller.
                if e.errno != errno.EINTR:
                    raise
            else:
                # start() returned normally: the loop was stopped.
                return

    def stop(self):
        """Wait for the channel's thread to finish.

        This calls :method:`Thread.join` and returns once the thread has
        terminated.  :class:`RuntimeError` will be raised if
        :method:`self.start` is called again afterwards.
        """
        self.join()

    @property
    def address(self):
        """The (ip, port) tuple that was passed to the constructor."""
        return self._address

    def add_io_state(self, state):
        """Turn on an IO state flag for this channel's socket.

        Parameters
        ----------
        state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
            The IO state flag to set.

        The change is scheduled through IOLoop.add_callback, which is what
        makes this safe to call from another thread.
        """
        def enable_state():
            if self.iostate & state:
                # Flag already present; nothing to update.
                return
            self.iostate |= state
            self.ioloop.update_handler(self.socket, self.iostate)
        self.ioloop.add_callback(enable_state)

    def drop_io_state(self, state):
        """Turn off an IO state flag for this channel's socket.

        Parameters
        ----------
        state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
            The IO state flag to clear.

        The change is scheduled through IOLoop.add_callback, which is what
        makes this safe to call from another thread.
        """
        def disable_state():
            if not self.iostate & state:
                # Flag is not set; nothing to update.
                return
            self.iostate &= ~state
            self.ioloop.update_handler(self.socket, self.iostate)
        self.ioloop.add_callback(disable_state)
Brian Granger
|
r2606 | |||
MinRK
|
class ShellSocketChannel(ZMQSocketChannel):
    """The DEALER channel for issuing requests to the kernel and receiving
    its replies.

    Requests are placed on ``command_queue`` and sent from the channel's
    own ioloop thread when the socket becomes writable.
    """

    command_queue = None
    # flag for whether execute requests should be allowed to call raw_input:
    allow_stdin = True

    def __init__(self, context, session, address):
        super(ShellSocketChannel, self).__init__(context, session, address)
        self.command_queue = Queue()
        self.ioloop = ioloop.IOLoop()

    def run(self):
        """The thread's main activity.  Call start() instead."""
        self.socket = self.context.socket(zmq.DEALER)
        self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
        self.socket.connect('tcp://%s:%i' % self.address)
        self.iostate = POLLERR|POLLIN
        self.ioloop.add_handler(self.socket, self._handle_events,
                                self.iostate)
        self._run_loop()

    def stop(self):
        """Stop the ioloop, then wait for the thread to terminate."""
        self.ioloop.stop()
        super(ShellSocketChannel, self).stop()

    def call_handlers(self, msg):
        """This method is called in the ioloop thread when a message arrives.

        Subclasses should override this method to handle incoming messages.
        It is important to remember that this method is called in the
        channel's thread, so some logic must be done to ensure that the
        application level handlers are called in the application thread.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')

    def execute(self, code, silent=False,
                user_variables=None, user_expressions=None, allow_stdin=None):
        """Execute code in the kernel.

        Parameters
        ----------
        code : str
            A string of Python code.

        silent : bool, optional (default False)
            If set, the kernel will execute the code as quietly as possible.

        user_variables : list, optional
            A list of variable names to pull from the user's namespace.  They
            will come back as a dict with these names as keys and their
            :func:`repr` as values.

        user_expressions : dict, optional
            A dict with string keys and string values to pull from the
            user's namespace.  They will come back as a dict with these
            names as keys and their :func:`repr` as values.

        allow_stdin : bool, optional
            Flag for whether the executed code is allowed to call raw_input.
            When omitted, the channel's ``allow_stdin`` attribute is used.

        Returns
        -------
        The msg_id of the message sent.

        Raises
        ------
        ValueError
            If `code` is not a string, or if `user_variables` or
            `user_expressions` fail validation.
        """
        if user_variables is None:
            user_variables = []
        if user_expressions is None:
            user_expressions = {}
        if allow_stdin is None:
            allow_stdin = self.allow_stdin

        # Don't waste network traffic if inputs are invalid
        if not isinstance(code, basestring):
            # 1-tuple wrapper: a tuple passed as `code` must be reported via
            # ValueError, not break the % formatting with a TypeError.
            raise ValueError('code %r must be a string' % (code,))
        validate_string_list(user_variables)
        validate_string_dict(user_expressions)

        # Create class for content/msg creation. Related to, but possibly
        # not in Session.
        content = dict(code=code, silent=silent,
                       user_variables=user_variables,
                       user_expressions=user_expressions,
                       allow_stdin=allow_stdin,
                       )
        msg = self.session.msg('execute_request', content)
        self._queue_request(msg)
        return msg['header']['msg_id']

    def complete(self, text, line, cursor_pos, block=None):
        """Tab complete text in the kernel's namespace.

        Parameters
        ----------
        text : str
            The text to complete.
        line : str
            The full line of text that is the surrounding context for the
            text to complete.
        cursor_pos : int
            The position of the cursor in the line where the completion was
            requested.
        block : str, optional
            The full block of code in which the completion is being requested.

        Returns
        -------
        The msg_id of the message sent.
        """
        content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
        msg = self.session.msg('complete_request', content)
        self._queue_request(msg)
        return msg['header']['msg_id']

    def object_info(self, oname):
        """Get metadata information about an object.

        Parameters
        ----------
        oname : str
            A string specifying the object name.

        Returns
        -------
        The msg_id of the message sent.
        """
        content = dict(oname=oname)
        msg = self.session.msg('object_info_request', content)
        self._queue_request(msg)
        return msg['header']['msg_id']

    def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
        """Get entries from the history list.

        Parameters
        ----------
        raw : bool
            If True, return the raw input.
        output : bool
            If True, then return the output as well.
        hist_access_type : str
            'range' (fill in session, start and stop params), 'tail' (fill in n)
             or 'search' (fill in pattern param).

        session : int
            For a range request, the session from which to get lines. Session
            numbers are positive integers; negative ones count back from the
            current session.
        start : int
            The first line number of a history range.
        stop : int
            The final (excluded) line number of a history range.

        n : int
            The number of lines of history to get for a tail request.

        pattern : str
            The glob-syntax pattern for a search request.

        Returns
        -------
        The msg_id of the message sent.
        """
        content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
                       **kwargs)
        msg = self.session.msg('history_request', content)
        self._queue_request(msg)
        return msg['header']['msg_id']

    def shutdown(self, restart=False):
        """Request an immediate kernel shutdown.

        Upon receipt of the (empty) reply, client code can safely assume that
        the kernel has shut down and it's safe to forcefully terminate it if
        it's still alive.

        The kernel will send the reply via a function registered with Python's
        atexit module, ensuring it's truly done as the kernel is done with all
        normal operation.

        Parameters
        ----------
        restart : bool, optional
            Value sent in the ``restart`` field of the shutdown request.

        Returns
        -------
        The msg_id of the message sent.
        """
        # Send quit message to kernel. Once we implement kernel-side setattr,
        # this should probably be done that way, but for now this will do.
        msg = self.session.msg('shutdown_request', {'restart':restart})
        self._queue_request(msg)
        return msg['header']['msg_id']

    def _handle_events(self, socket, events):
        """Dispatch the poll events reported by the ioloop."""
        if events & POLLERR:
            self._handle_err()
        if events & POLLOUT:
            self._handle_send()
        if events & POLLIN:
            self._handle_recv()

    def _handle_recv(self):
        """Receive one message and hand it to :method:`call_handlers`."""
        ident,msg = self.session.recv(self.socket, 0)
        self.call_handlers(msg)

    def _handle_send(self):
        """Send one queued request, if any, and stop polling for POLLOUT
        once the queue is empty."""
        try:
            msg = self.command_queue.get(False)
        except Empty:
            pass
        else:
            self.session.send(self.socket,msg)
        if self.command_queue.empty():
            self.drop_io_state(POLLOUT)

    def _handle_err(self):
        # We don't want to let this go silently, so eventually we should log.
        raise zmq.ZMQError()

    def _queue_request(self, msg):
        """Queue a request and ask the ioloop to watch for writability."""
        self.command_queue.put(msg)
        self.add_io_state(POLLOUT)
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | |||
MinRK
|
class SubSocketChannel(ZMQSocketChannel):
    """Channel wrapping a SUB socket that receives what the kernel publishes.
    """

    def __init__(self, context, session, address):
        super(SubSocketChannel, self).__init__(context, session, address)
        self.ioloop = ioloop.IOLoop()

    def run(self):
        """Thread body: connect the SUB socket and run the ioloop.

        Use start() rather than calling this directly.
        """
        sock = self.context.socket(zmq.SUB)
        self.socket = sock
        # An empty prefix subscribes to every published message.
        sock.setsockopt(zmq.SUBSCRIBE,b'')
        sock.setsockopt(zmq.IDENTITY, self.session.bsession)
        sock.connect('tcp://%s:%i' % self.address)
        self.iostate = POLLIN|POLLERR
        self.ioloop.add_handler(sock, self._handle_events, self.iostate)
        self._run_loop()

    def stop(self):
        """Stop the ioloop, then wait for the thread to terminate."""
        self.ioloop.stop()
        super(SubSocketChannel, self).stop()

    def call_handlers(self, msg):
        """Hook invoked in the ioloop thread for every received message.

        Subclasses must override it.  Because it runs in the channel's
        thread, an override is responsible for getting application level
        handlers called in the application thread.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')

    def flush(self, timeout=1.0):
        """Immediately processes all pending messages on the SUB channel.

        Callers should use this method to ensure that :method:`call_handlers`
        has been called for all messages that have been received on the
        0MQ SUB socket of this channel.

        This method is thread safe.

        Parameters
        ----------
        timeout : float, optional
            The maximum amount of time to spend flushing, in seconds. The
            default is one second.
        """
        deadline = time.time() + timeout
        # Two round trips through the IOLoop callback queue, so that the
        # loop gets to perform at least one full poll in between.
        for _ in range(2):
            self._flushed = False
            self.ioloop.add_callback(self._flush)
            while not self._flushed and time.time() < deadline:
                time.sleep(0.01)

    def _handle_events(self, socket, events):
        """Dispatch the poll events reported by the ioloop."""
        dispatch = (
            (POLLERR, self._handle_err),
            (POLLIN, self._handle_recv),
        )
        for flag, handler in dispatch:
            if events & flag:
                handler()

    def _handle_err(self):
        # We don't want to let this go silently, so eventually we should log.
        raise zmq.ZMQError()

    def _handle_recv(self):
        """Drain the socket, passing each message to call_handlers."""
        while True:
            try:
                ident, msg = self.session.recv(self.socket)
            except zmq.ZMQError:
                # Check the errno?
                # Will this trigger POLLERR?
                return
            if msg is None:
                return
            self.call_handlers(msg)

    def _flush(self):
        """Callback for :method:`self.flush`."""
        self._flushed = True
Brian Granger
|
r2606 | |||
MinRK
|
class StdInSocketChannel(ZMQSocketChannel):
    """Reply channel used to answer the kernel's raw_input requests."""

    msg_queue = None

    def __init__(self, context, session, address):
        super(StdInSocketChannel, self).__init__(context, session, address)
        self.ioloop = ioloop.IOLoop()
        self.msg_queue = Queue()

    def run(self):
        """Thread body: connect the DEALER socket and run the ioloop.

        Use start() rather than calling this directly.
        """
        sock = self.context.socket(zmq.DEALER)
        self.socket = sock
        sock.setsockopt(zmq.IDENTITY, self.session.bsession)
        sock.connect('tcp://%s:%i' % self.address)
        self.iostate = POLLERR|POLLIN
        self.ioloop.add_handler(sock, self._handle_events, self.iostate)
        self._run_loop()

    def stop(self):
        """Stop the ioloop, then wait for the thread to terminate."""
        self.ioloop.stop()
        super(StdInSocketChannel, self).stop()

    def call_handlers(self, msg):
        """Hook invoked in the ioloop thread for every received message.

        Subclasses must override it.  Because it runs in the channel's
        thread, an override is responsible for getting application level
        handlers called in the application thread.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')

    def input(self, string):
        """Send a string of raw input to the kernel."""
        reply = self.session.msg('input_reply', dict(value=string))
        self._queue_reply(reply)

    def _handle_events(self, socket, events):
        """Dispatch the poll events reported by the ioloop."""
        dispatch = (
            (POLLERR, self._handle_err),
            (POLLOUT, self._handle_send),
            (POLLIN, self._handle_recv),
        )
        for flag, handler in dispatch:
            if events & flag:
                handler()

    def _handle_recv(self):
        """Receive one message and hand it to call_handlers."""
        ident, msg = self.session.recv(self.socket, 0)
        self.call_handlers(msg)

    def _handle_send(self):
        """Send one queued reply, if any, and stop polling for POLLOUT once
        the queue is empty."""
        try:
            pending = self.msg_queue.get(False)
        except Empty:
            pass
        else:
            self.session.send(self.socket, pending)
        if self.msg_queue.empty():
            self.drop_io_state(POLLOUT)

    def _handle_err(self):
        # We don't want to let this go silently, so eventually we should log.
        raise zmq.ZMQError()

    def _queue_reply(self, msg):
        """Queue a reply and ask the ioloop to watch for writability."""
        self.msg_queue.put(msg)
        self.add_io_state(POLLOUT)
epatters
|
r2611 | |||
MinRK
|
class HBSocketChannel(ZMQSocketChannel):
    """The heartbeat channel which monitors the kernel heartbeat.

    Note that the heartbeat channel is paused by default. As long as you start
    this channel, the kernel manager will ensure that it is paused and un-paused
    as appropriate.
    """

    # Seconds without a heartbeat reply before call_handlers is invoked; also
    # the sleep interval used while the channel is paused.
    time_to_dead = 3.0
    socket = None
    poller = None
    _running = None
    _pause = None

    def __init__(self, context, session, address):
        super(HBSocketChannel, self).__init__(context, session, address)
        self._running = False
        self._pause = True

    def _create_socket(self):
        """(Re)create the REQ socket and a poller registered for it."""
        if self.socket is not None:
            # This method is called again after an EFSM error; close the
            # socket being replaced instead of leaving it open.
            self.socket.close()
        self.socket = self.context.socket(zmq.REQ)
        self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
        self.socket.connect('tcp://%s:%i' % self.address)
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)

    def run(self):
        """The thread's main activity.  Call start() instead."""
        self._create_socket()
        self._running = True
        while self._running:
            if self._pause:
                time.sleep(self.time_to_dead)
            else:
                since_last_heartbeat = 0.0
                request_time = time.time()
                try:
                    self.socket.send(b'ping')
                except zmq.ZMQError as e:
                    if e.errno == zmq.EFSM:
                        # The REQ socket is in the wrong state for a send:
                        # wait, then start over with a fresh socket.
                        time.sleep(self.time_to_dead)
                        self._create_socket()
                    else:
                        raise
                else:
                    while True:
                        try:
                            self.socket.recv(zmq.NOBLOCK)
                        except zmq.ZMQError as e:
                            if e.errno == zmq.EAGAIN:
                                before_poll = time.time()
                                until_dead = self.time_to_dead - (before_poll -
                                                                  request_time)

                                # When the return value of poll() is an empty
                                # list, that is when things have gone wrong
                                # (zeromq bug). As long as it is not an empty
                                # list, poll is working correctly even if it
                                # returns quickly. Note: poll timeout is in
                                # milliseconds.
                                if until_dead > 0.0:
                                    while True:
                                        try:
                                            self.poller.poll(1000 * until_dead)
                                        except zmq.ZMQError as poll_error:
                                            # Retry polls interrupted by a
                                            # signal; re-raise the rest.
                                            if poll_error.errno == errno.EINTR:
                                                continue
                                            else:
                                                raise
                                        else:
                                            break

                                since_last_heartbeat = time.time()-request_time
                                if since_last_heartbeat > self.time_to_dead:
                                    self.call_handlers(since_last_heartbeat)
                                    break
                            else:
                                # FIXME: We should probably log this instead.
                                raise
                        else:
                            # Reply received: wait out the remainder of the
                            # interval before sending the next ping.
                            until_dead = self.time_to_dead - (time.time() -
                                                              request_time)
                            if until_dead > 0.0:
                                time.sleep(until_dead)
                            break

    def pause(self):
        """Pause the heartbeat."""
        self._pause = True

    def unpause(self):
        """Unpause the heartbeat."""
        self._pause = False

    def is_beating(self):
        """Is the heartbeat running and not paused."""
        if self.is_alive() and not self._pause:
            return True
        else:
            return False

    def stop(self):
        """Ask the run loop to exit, then wait for the thread to terminate."""
        self._running = False
        super(HBSocketChannel, self).stop()

    def call_handlers(self, since_last_heartbeat):
        """This method is called in the channel's thread when no heartbeat
        reply has arrived within ``time_to_dead`` seconds.

        Subclasses should override this method to react to the missed
        heartbeat.  It is important to remember that this method is called
        in the channel's thread, so some logic must be done to ensure that
        the application level handlers are called in the application thread.

        Parameters
        ----------
        since_last_heartbeat : float
            Seconds elapsed since the unanswered ping was sent.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# Main kernel manager class | ||||
#----------------------------------------------------------------------------- | ||||
epatters
|
r2611 | class KernelManager(HasTraits): | ||
epatters
|
r2631 | """ Manages a kernel for a frontend. | ||
epatters
|
r2611 | |||
epatters
|
r2631 | The SUB channel is for the frontend to receive messages published by the | ||
kernel. | ||||
Bernardo B. Marques
|
r4872 | |||
epatters
|
r2631 | The REQ channel is for the frontend to make requests of the kernel. | ||
Bernardo B. Marques
|
r4872 | |||
epatters
|
r2631 | The REP channel is for the kernel to request stdin (raw_input) from the | ||
frontend. | ||||
""" | ||||
MinRK
|
r4015 | # config object for passing to child configurables | ||
config = Instance(Config) | ||||
Bernardo B. Marques
|
r4872 | |||
epatters
|
r2611 | # The PyZMQ Context to use for communication with the kernel. | ||
MinRK
|
r4015 | context = Instance(zmq.Context) | ||
def _context_default(self): | ||||
return zmq.Context.instance() | ||||
epatters
|
r2611 | |||
# The Session to use for communication with the kernel. | ||||
MinRK
|
r4015 | session = Instance(Session) | ||
epatters
|
r2611 | |||
epatters
|
r2730 | # The kernel process with which the KernelManager is communicating. | ||
kernel = Instance(Popen) | ||||
Bernardo B. Marques
|
r4872 | # The addresses for the communication channels. | ||
MinRK
|
r4958 | connection_file = Unicode('') | ||
MinRK
|
r4956 | ip = Unicode(LOCALHOST) | ||
shell_port = Int(0) | ||||
MinRK
|
r4959 | iopub_port = Int(0) | ||
MinRK
|
r4956 | stdin_port = Int(0) | ||
hb_port = Int(0) | ||||
epatters
|
r2758 | |||
epatters
|
r2611 | # The classes to use for the various channels. | ||
MinRK
|
r3974 | shell_channel_class = Type(ShellSocketChannel) | ||
Brian Granger
|
r2699 | sub_channel_class = Type(SubSocketChannel) | ||
MinRK
|
r3974 | stdin_channel_class = Type(StdInSocketChannel) | ||
Brian Granger
|
r2910 | hb_channel_class = Type(HBSocketChannel) | ||
Brian Granger
|
r3046 | |||
epatters
|
r2611 | # Protected traits. | ||
epatters
|
r2851 | _launch_args = Any | ||
MinRK
|
r3974 | _shell_channel = Any | ||
Brian Granger
|
r2699 | _sub_channel = Any | ||
MinRK
|
r3974 | _stdin_channel = Any | ||
Brian Granger
|
r2910 | _hb_channel = Any | ||
MinRK
|
r4958 | _connection_file_written=Bool(False) | ||
epatters
|
r2611 | |||
Brian Granger
|
def __init__(self, **kwargs):
    """Initialise the traits and make sure a Session is available.

    Keyword arguments are forwarded to the parent constructor.  If no
    ``session`` was supplied, one is built from ``self.config``.
    """
    super(KernelManager, self).__init__(**kwargs)
    if self.session is not None:
        return
    self.session = Session(config=self.config)
MinRK
|
r4958 | |||
def __del__(self):
    """Remove the connection file if this manager wrote it.

    Removal is best-effort: a file that is already gone (or otherwise
    cannot be removed) is silently ignored.
    """
    if self._connection_file_written:
        # cleanup connection files on full shutdown of kernel we started
        self._connection_file_written = False
        try:
            os.remove(self.connection_file)
        except (IOError, OSError):
            # os.remove reports failures as OSError, which is not an
            # IOError subclass on Python 2; catch both.
            pass
Brian Granger
|
r3046 | |||
epatters
|
r2758 | #-------------------------------------------------------------------------- | ||
epatters
|
r2686 | # Channel management methods: | ||
#-------------------------------------------------------------------------- | ||||
epatters
|
r2611 | |||
MinRK
|
def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
    """Start the selected channels for this kernel.

    The channels are created on demand and then started.  When random
    ports (port number 0) are in use, :method:`start_kernel` has to be
    called first.  Starting channels that were previously stopped raises
    :class:`RuntimeError`.

    Parameters
    ----------
    shell, sub, stdin, hb : bool, optional
        Whether to start the corresponding channel.  The value of `stdin`
        also decides whether the shell channel allows stdin requests.
    """
    if shell:
        self.shell_channel.start()
    if sub:
        self.sub_channel.start()
    if stdin:
        self.stdin_channel.start()
    # Execute requests may only ask for input when the stdin channel runs.
    self.shell_channel.allow_stdin = bool(stdin)
    if hb:
        self.hb_channel.start()
epatters
|
r2639 | |||
Brian Granger
|
def stop_channels(self):
    """Stop every channel of this kernel whose thread is still alive."""
    channel_names = ('shell_channel', 'sub_channel',
                     'stdin_channel', 'hb_channel')
    for attr in channel_names:
        channel = getattr(self, attr)
        if channel.is_alive():
            channel.stop()
epatters
|
r2686 | |||
Brian Granger
|
@property
def channels_running(self):
    """True if at least one of the four channels has a live thread."""
    channel_names = ('shell_channel', 'sub_channel',
                     'stdin_channel', 'hb_channel')
    return any(getattr(self, attr).is_alive() for attr in channel_names)
epatters
|
r2639 | |||
epatters
|
r2686 | #-------------------------------------------------------------------------- | ||
# Kernel process management methods: | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r4958 | |||
MinRK
|
def load_connection_file(self):
    """Load connection info from the JSON dict stored in self.connection_file.

    Sets ``ip``, the four port attributes and ``session.key`` from the
    corresponding entries of the file.
    """
    with open(self.connection_file) as stream:
        info = json.load(stream)

    # Attribute names match the keys used in the connection file.
    for field in ('ip', 'shell_port', 'stdin_port', 'iopub_port', 'hb_port'):
        setattr(self, field, info[field])
    self.session.key = str_to_bytes(info['key'])
MinRK
|
def write_connection_file(self):
    """Write connection info to the JSON dict in self.connection_file.

    Does nothing if this manager has already written its connection file.
    """
    if self._connection_file_written:
        return

    # The module-level helper returns the final file name together with the
    # configuration dict it wrote.
    fname, info = write_connection_file(
        self.connection_file,
        ip=self.ip,
        key=self.session.key,
        stdin_port=self.stdin_port,
        iopub_port=self.iopub_port,
        shell_port=self.shell_port,
        hb_port=self.hb_port,
    )
    self.connection_file = fname

    # write_connection_file also sets default ports, so copy them back:
    for port in ('shell_port', 'stdin_port', 'iopub_port', 'hb_port'):
        setattr(self, port, info[port])

    self._connection_file_written = True
epatters
|
def start_kernel(self, **kw):
    """Start a kernel process and configure the manager to use it.

    If random ports (port=0) are being used, this method must be called
    before the channels are created.

    Parameters
    ----------
    ipython : bool, optional (default True)
        Whether to use an IPython kernel instead of a plain Python kernel.
        Only consulted when no ``launcher`` is given.

    launcher : callable, optional (default None)
        A custom function for launching the kernel process (generally a
        wrapper around ``entry_point.base_launch_kernel``). In most cases,
        it should not be necessary to use this parameter.

    **kw : optional
        See respective options for IPython and Python kernels.

    Raises
    ------
    RuntimeError
        If ``self.ip`` is not one of the addresses in ``LOCAL_IPS``.
    """
    if self.ip not in LOCAL_IPS:
        # Wrap LOCAL_IPS in a 1-tuple so the formatting works no matter
        # what kind of sequence it is.
        raise RuntimeError("Can only launch a kernel on a local interface. "
                           "Make sure that the 'ip' attribute is "
                           "configured properly. "
                           "Currently valid addresses are: %s" % (LOCAL_IPS,))

    # write connection file / get default ports
    self.write_connection_file()

    # Keep the complete argument set (before 'launcher'/'ipython' are popped)
    # so that restart_kernel can replay it.
    self._launch_args = kw.copy()

    launch_kernel = kw.pop('launcher', None)
    if launch_kernel is None:
        if kw.pop('ipython', True):
            from ipkernel import launch_kernel
        else:
            from pykernel import launch_kernel
    self.kernel = launch_kernel(fname=self.connection_file, **kw)
epatters
|
r2686 | |||
MinRK
|
def shutdown_kernel(self, restart=False):
    """Attempt to stop the kernel process cleanly.

    If the kernel cannot be stopped, it is killed, if possible.

    Parameters
    ----------
    restart : bool, optional (default False)
        Passed on to the shell channel's shutdown request.  When true, the
        connection file written by this manager is kept; otherwise it is
        removed once the kernel has been shut down.
    """
    # FIXME: Shutdown does not work on Windows due to ZMQ errors!
    if sys.platform == 'win32':
        self.kill_kernel()
    else:
        # Pause the heart beat channel if it exists.
        if self._hb_channel is not None:
            self._hb_channel.pause()

        # Don't send any additional kernel kill messages immediately, to
        # give the kernel a chance to properly execute shutdown actions.
        # Wait for at most 1s, checking every 0.1s.
        self.shell_channel.shutdown(restart=restart)
        for _ in range(10):
            if not self.is_alive:
                break
            time.sleep(0.1)
        else:
            # OK, we've waited long enough.
            if self.has_kernel:
                self.kill_kernel()

    if not restart and self._connection_file_written:
        # cleanup connection files on full shutdown of kernel we started
        self._connection_file_written = False
        try:
            os.remove(self.connection_file)
        except (IOError, OSError):
            # os.remove reports failures (e.g. file already gone) as
            # OSError; removal is best-effort, so ignore them.
            pass
epatters
|
def restart_kernel(self, now=False, **kw):
    """Restart a kernel with the arguments that were used to launch it.

    If the old kernel was launched with random ports, the same ports will be
    used for the new kernel.

    Parameters
    ----------
    now : bool, optional
        If True, the kernel is forcefully restarted *immediately*, without
        having a chance to do any cleanup action.  Otherwise the kernel is
        given 1s to clean up before a forceful restart is issued.

        In all cases the kernel is restarted, the only difference is whether
        it is given a chance to perform a clean shutdown or not.

    **kw : optional
        Any options specified here will replace those used to launch the
        kernel.

    Raises
    ------
    RuntimeError
        If ``start_kernel`` has not been called before.
    """
    launch_args = self._launch_args
    if launch_args is None:
        raise RuntimeError("Cannot restart the kernel. "
                           "No previous call to 'start_kernel'.")

    # Stop currently running kernel.
    if self.has_kernel:
        stop = self.kill_kernel if now else None
        if stop is not None:
            stop()
        else:
            self.shutdown_kernel(restart=True)

    # Start new kernel.
    self._launch_args.update(kw)
    self.start_kernel(**self._launch_args)

    # FIXME: Messages get dropped in Windows due to probable ZMQ bug
    # unless there is some delay here.
    if sys.platform == 'win32':
        time.sleep(0.2)
epatters
|
@property
def has_kernel(self):
    """Whether a kernel process has been specified for the kernel manager.

    True exactly when ``self.kernel`` is set to something other than None.
    """
    process = self.kernel
    return process is not None
epatters
|
r2686 | |||
def kill_kernel(self):
    """Kill the running kernel.

    Raises
    ------
    RuntimeError
        If no kernel process is attached to this manager.
    OSError
        If killing the process fails for any reason other than the process
        having already terminated.
    """
    if not self.has_kernel:
        raise RuntimeError("Cannot kill kernel. No kernel is running!")

    # Pause the heart beat channel if it exists.
    if self._hb_channel is not None:
        self._hb_channel.pause()

    # Attempt to kill the kernel.
    try:
        self.kernel.kill()
    except OSError as e:
        if sys.platform == 'win32':
            # In Windows, we will get an Access Denied error (winerror 5)
            # if the process has already terminated. Ignore it.
            if e.winerror != 5:
                raise
        elif e.errno != errno.ESRCH:
            # On Unix, we may get an ESRCH error if the process has already
            # terminated. Ignore that one, propagate everything else.
            raise
    self.kernel = None
epatters
|
r2611 | |||
epatters
|
def interrupt_kernel(self):
    """Interrupt the kernel.

    Unlike ``signal_kernel``, this operation is well supported on all
    platforms.  Raises RuntimeError when no kernel is running.
    """
    if not self.has_kernel:
        raise RuntimeError("Cannot interrupt kernel. No kernel is running!")

    if sys.platform != 'win32':
        self.kernel.send_signal(signal.SIGINT)
        return

    # Windows: trigger the kernel's interrupt event via the parent poller.
    from parentpoller import ParentPollerWindows as Poller
    Poller.send_interrupt(self.kernel.win32_interrupt_event)
epatters
|
def signal_kernel(self, signum):
    """Send the signal ``signum`` to the kernel process.

    Note that since only SIGTERM is supported on Windows, this function is
    only useful on Unix systems.  Raises RuntimeError when no kernel is
    running.
    """
    if not self.has_kernel:
        raise RuntimeError("Cannot signal kernel. No kernel is running!")
    self.kernel.send_signal(signum)
epatters
|
r2611 | |||
Brian Granger
|
@property
def is_alive(self):
    """Is the kernel process still running?"""
    # FIXME: not using a heartbeat means this method is broken for any
    # remote kernel, it's only capable of handling local kernels.
    if not self.has_kernel:
        # We didn't start the kernel with this KernelManager so we don't
        # know if it is running. We should use a heartbeat for this case.
        return True
    # poll() yields None for as long as the process has not exited.
    return self.kernel.poll() is None
epatters
|
r2632 | #-------------------------------------------------------------------------- | ||
# Channels used for communication with the kernel: | ||||
#-------------------------------------------------------------------------- | ||||
epatters
|
@property
def shell_channel(self):
    """Get the shell channel object used to make requests of the kernel.

    The channel is built from ``shell_channel_class`` on first access and
    cached in ``_shell_channel`` afterwards.
    """
    if self._shell_channel is None:
        factory = self.shell_channel_class
        self._shell_channel = factory(
            self.context, self.session, (self.ip, self.shell_port))
    return self._shell_channel
epatters
|
r2611 | |||
@property
def sub_channel(self):
    """Get the SUB socket channel object.

    The channel is built from ``sub_channel_class`` on first access, bound
    to the iopub port, and cached in ``_sub_channel`` afterwards.
    """
    if self._sub_channel is None:
        factory = self.sub_channel_class
        self._sub_channel = factory(
            self.context, self.session, (self.ip, self.iopub_port))
    return self._sub_channel
@property
def stdin_channel(self):
    """Get the channel object that handles stdin (raw_input) requests.

    The channel is built from ``stdin_channel_class`` on first access and
    cached in ``_stdin_channel`` afterwards.
    """
    if self._stdin_channel is None:
        factory = self.stdin_channel_class
        self._stdin_channel = factory(
            self.context, self.session, (self.ip, self.stdin_port))
    return self._stdin_channel
Brian Granger
|
r2910 | |||
@property
def hb_channel(self):
    """Get the heartbeat socket channel object to check that the
    kernel is alive.

    The channel is built from ``hb_channel_class`` on first access and
    cached in ``_hb_channel`` afterwards.
    """
    if self._hb_channel is None:
        factory = self.hb_channel_class
        self._hb_channel = factory(
            self.context, self.session, (self.ip, self.hb_port))
    return self._hb_channel