kernelmanager.py
910 lines
| 32.7 KiB
| text/x-python
|
PythonLexer
Brian Granger
|
r2742 | """Base classes to manage the interaction with a running kernel. | ||
Brian Granger
|
r2606 | |||
epatters
|
r3027 | TODO | ||
Brian Granger
|
r2699 | * Create logger to handle debugging and console messages. | ||
Brian Granger
|
r2606 | """ | ||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2008-2010 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
epatters
|
r2611 | # Standard library imports. | ||
Brian Granger
|
r3046 | import atexit | ||
Brian Granger
|
r2606 | from Queue import Queue, Empty | ||
epatters
|
r2667 | from subprocess import Popen | ||
epatters
|
r3027 | import signal | ||
epatters
|
r2995 | import sys | ||
Brian Granger
|
r2606 | from threading import Thread | ||
epatters
|
r2614 | import time | ||
Omar Andres Zapata Mesa
|
r3294 | import logging | ||
Brian Granger
|
r2606 | |||
epatters
|
r2611 | # System library imports. | ||
Brian Granger
|
r2606 | import zmq | ||
from zmq import POLLIN, POLLOUT, POLLERR | ||||
from zmq.eventloop import ioloop | ||||
epatters
|
r2611 | |||
# Local imports. | ||||
Fernando Perez
|
r2926 | from IPython.utils import io | ||
MinRK
|
r3144 | from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS | ||
Brian Granger
|
r2742 | from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress | ||
MinRK
|
r3269 | from session import Session, Message | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# Constants and exceptions | ||||
#----------------------------------------------------------------------------- | ||||
class InvalidPortNumber(Exception):
    """Signals that a channel was handed a port number it cannot use."""
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
Fernando Perez
|
r2926 | # Utility functions | ||
#----------------------------------------------------------------------------- | ||||
# some utilities to validate message structure, these might get moved elsewhere | ||||
# if they prove to have more generic utility | ||||
def validate_string_list(lst):
    """Validate that the input is a list of strings.

    Parameters
    ----------
    lst : object
        The value to check.

    Raises
    ------
    ValueError
        If `lst` is not a list, or if any element of it is not a string.
    """
    if not isinstance(lst, list):
        # The operand is wrapped in a 1-tuple on purpose: a bare tuple on the
        # right-hand side of ``%`` is unpacked as the argument list, so
        # reporting a tuple input would raise TypeError (or show only its
        # single element) instead of the documented ValueError.
        raise ValueError('input %r must be a list' % (lst,))
    for item in lst:
        if not isinstance(item, basestring):
            raise ValueError('element %r in list must be a string' % (item,))
def validate_string_dict(dct):
    """Validate that the input is a dict with string keys and values.

    Parameters
    ----------
    dct : object
        The value to check.

    Raises
    ------
    ValueError
        If `dct` is not a dict, or if any key or value is not a string.
    """
    # Mirror validate_string_list: reject the wrong container type with the
    # documented ValueError rather than an AttributeError from .iteritems().
    if not isinstance(dct, dict):
        raise ValueError('input %r must be a dict' % (dct,))
    for key, value in dct.iteritems():
        # Operands are wrapped in 1-tuples so that a tuple key/value is
        # reported correctly instead of being unpacked by ``%``.
        if not isinstance(key, basestring):
            raise ValueError('key %r in dict must be a string' % (key,))
        if not isinstance(value, basestring):
            raise ValueError('value %r in dict must be a string' % (value,))
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r2699 | # ZMQ Socket Channel classes | ||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r2606 | |||
class ZmqSocketChannel(Thread):
    """Base class for the threaded channels that use ZMQ sockets.

    Each channel is a daemon thread. Subclasses are expected to provide the
    ``socket``, ``ioloop`` and ``iostate`` attributes before the IO state
    helpers below are used.
    """

    context = None
    session = None
    socket = None
    ioloop = None
    iostate = None
    _address = None

    def __init__(self, context, session, address):
        """Create a channel.

        Parameters
        ----------
        context : :class:`zmq.Context`
            The ZMQ context to use.
        session : :class:`session.Session`
            The session to use.
        address : tuple
            Standard (ip, port) tuple that the kernel is listening on.

        Raises
        ------
        InvalidPortNumber
            If the port in `address` is 0.
        """
        super(ZmqSocketChannel, self).__init__()
        self.daemon = True
        self.context = context
        self.session = session
        port = address[1]
        if port == 0:
            raise InvalidPortNumber(
                'The port number for a channel cannot be 0.')
        self._address = address

    def stop(self):
        """Stop the channel's activity.

        This calls :method:`Thread.join` and returns when the thread
        terminates. :class:`RuntimeError` will be raised if
        :method:`self.start` is called again.
        """
        self.join()

    @property
    def address(self):
        """The channel's address as the (ip, port) tuple given at creation.

        The port is never 0, because the constructor rejects it.
        """
        return self._address

    def add_io_state(self, state):
        """Add IO state to the eventloop.

        Parameters
        ----------
        state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
            The IO state flag to set.

        The update is scheduled through IOLoop.add_callback rather than
        applied directly.
        """
        def _enable():
            # Nothing to do when the flag is already part of the state.
            if self.iostate & state:
                return
            self.iostate |= state
            self.ioloop.update_handler(self.socket, self.iostate)

        self.ioloop.add_callback(_enable)

    def drop_io_state(self, state):
        """Drop IO state from the eventloop.

        Parameters
        ----------
        state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
            The IO state flag to clear.

        The update is scheduled through IOLoop.add_callback rather than
        applied directly.
        """
        def _disable():
            # Nothing to do when the flag is not currently set.
            if not self.iostate & state:
                return
            self.iostate &= ~state
            self.ioloop.update_handler(self.socket, self.iostate)

        self.ioloop.add_callback(_disable)
Brian Granger
|
r2606 | |||
Brian Granger
|
class XReqSocketChannel(ZmqSocketChannel):
    """The XREQ channel for issuing requests to the kernel and receiving
    its replies.

    Requests are placed on ``command_queue`` and written to the socket by the
    channel's own IOLoop when the socket reports POLLOUT.
    """

    command_queue = None

    def __init__(self, context, session, address):
        super(XReqSocketChannel, self).__init__(context, session, address)
        self.command_queue = Queue()
        self.ioloop = ioloop.IOLoop()

    def run(self):
        """The thread's main activity.  Call start() instead."""
        self.socket = self.context.socket(zmq.XREQ)
        self.socket.setsockopt(zmq.IDENTITY, self.session.session)
        self.socket.connect('tcp://%s:%i' % self.address)
        self.iostate = POLLERR|POLLIN
        self.ioloop.add_handler(self.socket, self._handle_events,
                                self.iostate)
        self.ioloop.start()

    def stop(self):
        """Stop the IOLoop, then wait for the thread to finish."""
        self.ioloop.stop()
        super(XReqSocketChannel, self).stop()

    def call_handlers(self, msg):
        """This method is called in the ioloop thread when a message arrives.

        Subclasses should override this method to handle incoming messages.
        It is important to remember that this method is called in the thread
        so that some logic must be done to ensure that the application level
        handlers are called in the application thread.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')

    def execute(self, code, silent=False,
                user_variables=None, user_expressions=None):
        """Execute code in the kernel.

        Parameters
        ----------
        code : str
            A string of Python code.

        silent : bool, optional (default False)
            If set, the kernel will execute the code as quietly as possible.

        user_variables : list, optional
            A list of variable names to pull from the user's namespace.  They
            will come back as a dict with these names as keys and their
            :func:`repr` as values.

        user_expressions : dict, optional
            A dict with string keys and string values to pull from the user's
            namespace.  They will come back as a dict with these names as keys
            and their :func:`repr` as values.

        Returns
        -------
        The msg_id of the message sent.

        Raises
        ------
        ValueError
            If `code` is not a string, or if `user_variables` /
            `user_expressions` fail validation.
        """
        if user_variables is None:
            user_variables = []
        if user_expressions is None:
            user_expressions = {}

        # Don't waste network traffic if inputs are invalid
        if not isinstance(code, basestring):
            # 1-tuple operand: a tuple passed as ``code`` must be reported,
            # not unpacked by ``%`` (which would raise TypeError instead).
            raise ValueError('code %r must be a string' % (code,))
        validate_string_list(user_variables)
        validate_string_dict(user_expressions)

        # Create class for content/msg creation. Related to, but possibly
        # not in Session.
        content = dict(code=code, silent=silent,
                       user_variables=user_variables,
                       user_expressions=user_expressions)
        msg = self.session.msg('execute_request', content)
        self._queue_request(msg)
        return msg['header']['msg_id']

    def complete(self, text, line, cursor_pos, block=None):
        """Tab complete text in the kernel's namespace.

        Parameters
        ----------
        text : str
            The text to complete.
        line : str
            The full line of text that is the surrounding context for the
            text to complete.
        cursor_pos : int
            The position of the cursor in the line where the completion was
            requested.
        block : str, optional
            The full block of code in which the completion is being requested.

        Returns
        -------
        The msg_id of the message sent.
        """
        content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
        msg = self.session.msg('complete_request', content)
        self._queue_request(msg)
        return msg['header']['msg_id']

    def object_info(self, oname):
        """Get metadata information about an object.

        Parameters
        ----------
        oname : str
            A string specifying the object name.

        Returns
        -------
        The msg_id of the message sent.
        """
        content = dict(oname=oname)
        msg = self.session.msg('object_info_request', content)
        self._queue_request(msg)
        return msg['header']['msg_id']

    def history(self, index=None, raw=False, output=True):
        """Get the history list.

        Parameters
        ----------
        index : n or (n1, n2) or None
            If n, then the last n entries. If a tuple, then all in
            range(n1, n2). If None, then all entries. Raises IndexError if
            the format of index is incorrect.
        raw : bool
            If True, return the raw input.
        output : bool
            If True, then return the output as well.

        Returns
        -------
        The msg_id of the message sent.
        """
        content = dict(index=index, raw=raw, output=output)
        msg = self.session.msg('history_request', content)
        self._queue_request(msg)
        return msg['header']['msg_id']

    def shutdown(self, restart=False):
        """Request an immediate kernel shutdown.

        Upon receipt of the (empty) reply, client code can safely assume that
        the kernel has shut down and it's safe to forcefully terminate it if
        it's still alive.

        The kernel will send the reply via a function registered with Python's
        atexit module, ensuring it's truly done as the kernel is done with all
        normal operation.

        Returns
        -------
        The msg_id of the message sent.
        """
        # Send quit message to kernel. Once we implement kernel-side setattr,
        # this should probably be done that way, but for now this will do.
        msg = self.session.msg('shutdown_request', {'restart':restart})
        self._queue_request(msg)
        return msg['header']['msg_id']

    def _handle_events(self, socket, events):
        """Dispatch the poll events reported for the socket."""
        if events & POLLERR:
            self._handle_err()
        if events & POLLOUT:
            self._handle_send()
        if events & POLLIN:
            self._handle_recv()

    def _handle_recv(self):
        """Receive one message and hand it to call_handlers."""
        ident, msg = self.session.recv(self.socket, 0)
        self.call_handlers(msg)

    def _handle_send(self):
        """Send at most one queued request; stop polling for POLLOUT once
        the queue is empty."""
        try:
            msg = self.command_queue.get(False)
        except Empty:
            pass
        else:
            self.session.send(self.socket, msg)
        if self.command_queue.empty():
            self.drop_io_state(POLLOUT)

    def _handle_err(self):
        # We don't want to let this go silently, so eventually we should log.
        raise zmq.ZMQError()

    def _queue_request(self, msg):
        """Queue a request and ask the IOLoop to watch for POLLOUT."""
        self.command_queue.put(msg)
        self.add_io_state(POLLOUT)
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | |||
class SubSocketChannel(ZmqSocketChannel):
    """The SUB channel which listens for messages that the kernel publishes.
    """

    def __init__(self, context, session, address):
        super(SubSocketChannel, self).__init__(context, session, address)
        self.ioloop = ioloop.IOLoop()

    def run(self):
        """The thread's main activity.  Call start() instead."""
        sock = self.context.socket(zmq.SUB)
        self.socket = sock
        sock.setsockopt(zmq.SUBSCRIBE,'')
        sock.setsockopt(zmq.IDENTITY, self.session.session)
        sock.connect('tcp://%s:%i' % self.address)
        self.iostate = POLLIN|POLLERR
        self.ioloop.add_handler(sock, self._handle_events, self.iostate)
        self.ioloop.start()

    def stop(self):
        """Stop the IOLoop, then wait for the thread to finish."""
        self.ioloop.stop()
        super(SubSocketChannel, self).stop()

    def call_handlers(self, msg):
        """This method is called in the ioloop thread when a message arrives.

        Subclasses should override this method to handle incoming messages.
        It is important to remember that this method is called in the thread
        so that some logic must be done to ensure that the application level
        handlers are called in the application thread.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')

    def flush(self, timeout=1.0):
        """Immediately processes all pending messages on the SUB channel.

        Callers should use this method to ensure that :method:`call_handlers`
        has been called for all messages that have been received on the
        0MQ SUB socket of this channel.

        This method is thread safe.

        Parameters
        ----------
        timeout : float, optional
            The maximum amount of time to spend flushing, in seconds. The
            default is one second.
        """
        deadline = time.time() + timeout
        # We do the IOLoop callback process twice to ensure that the IOLoop
        # gets to perform at least one full poll.
        for _ in range(2):
            self._flushed = False
            self.ioloop.add_callback(self._flush)
            while time.time() < deadline:
                if self._flushed:
                    break
                time.sleep(0.01)

    def _handle_events(self, socket, events):
        """Dispatch the poll events reported for the socket."""
        if events & POLLERR:
            self._handle_err()
        if events & POLLIN:
            self._handle_recv()

    def _handle_err(self):
        # We don't want to let this go silently, so eventually we should log.
        raise zmq.ZMQError()

    def _handle_recv(self):
        """Drain the socket, passing each message to call_handlers."""
        while True:
            try:
                ident, msg = self.session.recv(self.socket)
            except zmq.ZMQError:
                # Check the errno?
                # Will this trigger POLLERR?
                return
            if msg is None:
                return
            self.call_handlers(msg)

    def _flush(self):
        """Callback for :method:`self.flush`."""
        self._flushed = True
Brian Granger
|
r2606 | |||
class RepSocketChannel(ZmqSocketChannel):
    """A reply channel to handle raw_input requests that the kernel makes."""

    msg_queue = None

    def __init__(self, context, session, address):
        super(RepSocketChannel, self).__init__(context, session, address)
        self.ioloop = ioloop.IOLoop()
        self.msg_queue = Queue()

    def run(self):
        """The thread's main activity.  Call start() instead."""
        sock = self.context.socket(zmq.XREQ)
        self.socket = sock
        sock.setsockopt(zmq.IDENTITY, self.session.session)
        sock.connect('tcp://%s:%i' % self.address)
        self.iostate = POLLERR|POLLIN
        self.ioloop.add_handler(sock, self._handle_events, self.iostate)
        self.ioloop.start()

    def stop(self):
        """Stop the IOLoop, then wait for the thread to finish."""
        self.ioloop.stop()
        super(RepSocketChannel, self).stop()

    def call_handlers(self, msg):
        """This method is called in the ioloop thread when a message arrives.

        Subclasses should override this method to handle incoming messages.
        It is important to remember that this method is called in the thread
        so that some logic must be done to ensure that the application level
        handlers are called in the application thread.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')

    def input(self, string):
        """Send a string of raw input to the kernel."""
        reply = self.session.msg('input_reply', dict(value=string))
        self._queue_reply(reply)

    def _handle_events(self, socket, events):
        """Dispatch the poll events reported for the socket."""
        if events & POLLERR:
            self._handle_err()
        if events & POLLOUT:
            self._handle_send()
        if events & POLLIN:
            self._handle_recv()

    def _handle_recv(self):
        """Receive one message and hand it to call_handlers."""
        ident, msg = self.session.recv(self.socket, 0)
        self.call_handlers(msg)

    def _handle_send(self):
        """Send at most one queued reply; stop polling for POLLOUT once the
        queue is empty."""
        try:
            pending = self.msg_queue.get(False)
        except Empty:
            pass
        else:
            self.session.send(self.socket, pending)
        if self.msg_queue.empty():
            self.drop_io_state(POLLOUT)

    def _handle_err(self):
        # We don't want to let this go silently, so eventually we should log.
        raise zmq.ZMQError()

    def _queue_reply(self, msg):
        """Queue a reply and ask the IOLoop to watch for POLLOUT."""
        self.msg_queue.put(msg)
        self.add_io_state(POLLOUT)
epatters
|
r2611 | |||
Brian Granger
|
class HBSocketChannel(ZmqSocketChannel):
    """The heartbeat channel which monitors the kernel heartbeat.

    Note that the heartbeat channel is paused by default. As long as you start
    this channel, the kernel manager will ensure that it is paused and un-paused
    as appropriate.
    """

    # Seconds without a heartbeat reply before call_handlers is invoked.
    time_to_dead = 3.0
    socket = None
    poller = None
    _running = None
    _pause = None

    def __init__(self, context, session, address):
        super(HBSocketChannel, self).__init__(context, session, address)
        self._running = False
        self._pause = True

    def _create_socket(self):
        """Create and connect a REQ socket and register it with a new poller."""
        self.socket = self.context.socket(zmq.REQ)
        self.socket.setsockopt(zmq.IDENTITY, self.session.session)
        self.socket.connect('tcp://%s:%i' % self.address)
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)

    def run(self):
        """The thread's main activity.  Call start() instead."""
        self._create_socket()
        self._running = True
        while self._running:
            if self._pause:
                time.sleep(self.time_to_dead)
                continue

            since_last_heartbeat = 0.0
            request_time = time.time()
            try:
                #io.rprint('Ping from HB channel') # dbg
                self.socket.send(b'ping')
            except zmq.ZMQError as e:
                #io.rprint('*** HB Error:', e) # dbg
                if e.errno != zmq.EFSM:
                    raise
                # The REQ socket is in the wrong state for a send; wait and
                # start over with a fresh socket.
                #io.rprint('sleep...', self.time_to_dead) # dbg
                time.sleep(self.time_to_dead)
                self._create_socket()
                continue

            while True:
                try:
                    self.socket.recv(zmq.NOBLOCK)
                except zmq.ZMQError as e:
                    #io.rprint('*** HB Error 2:', e) # dbg
                    if e.errno != zmq.EAGAIN:
                        # FIXME: We should probably log this instead.
                        raise
                    before_poll = time.time()
                    until_dead = self.time_to_dead - (before_poll -
                                                      request_time)

                    # When the return value of poll() is an empty
                    # list, that is when things have gone wrong
                    # (zeromq bug). As long as it is not an empty
                    # list, poll is working correctly even if it
                    # returns quickly. Note: poll timeout is in
                    # milliseconds.
                    #
                    # The timeout is clamped at zero: a negative value would
                    # make poll() wait without a timeout, so a dead kernel
                    # would never be reported.
                    self.poller.poll(1000 * max(until_dead, 0.0))

                    since_last_heartbeat = time.time() - request_time
                    if since_last_heartbeat > self.time_to_dead:
                        self.call_handlers(since_last_heartbeat)
                        break
                else:
                    # Reply received: wait out the rest of the interval
                    # before sending the next ping.
                    until_dead = self.time_to_dead - (time.time() -
                                                      request_time)
                    if until_dead > 0.0:
                        #io.rprint('sleep...', self.time_to_dead) # dbg
                        time.sleep(until_dead)
                    break

    def pause(self):
        """Pause the heartbeat."""
        self._pause = True

    def unpause(self):
        """Unpause the heartbeat."""
        self._pause = False

    def is_beating(self):
        """Is the heartbeat running and not paused."""
        if self.is_alive() and not self._pause:
            return True
        else:
            return False

    def stop(self):
        """Ask the run loop to exit, then wait for the thread to finish."""
        self._running = False
        super(HBSocketChannel, self).stop()

    def call_handlers(self, since_last_heartbeat):
        """This method is called in the heartbeat thread when no reply has
        been received for more than ``time_to_dead`` seconds.

        Subclasses should override this method to react to the missed
        heartbeat. It is important to remember that this method is called in
        the channel's thread so that some logic must be done to ensure that
        the application level handlers are called in the application thread.

        Parameters
        ----------
        since_last_heartbeat : float
            Seconds elapsed since the unanswered ping was sent.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# Main kernel manager class | ||||
#----------------------------------------------------------------------------- | ||||
epatters
|
r2611 | class KernelManager(HasTraits): | ||
epatters
|
r2631 | """ Manages a kernel for a frontend. | ||
epatters
|
r2611 | |||
epatters
|
r2631 | The SUB channel is for the frontend to receive messages published by the | ||
kernel. | ||||
The REQ channel is for the frontend to make requests of the kernel. | ||||
The REP channel is for the kernel to request stdin (raw_input) from the | ||||
frontend. | ||||
""" | ||||
epatters
|
r2611 | # The PyZMQ Context to use for communication with the kernel. | ||
Brian Granger
|
r2753 | context = Instance(zmq.Context,(),{}) | ||
epatters
|
r2611 | |||
# The Session to use for communication with the kernel. | ||||
Brian Granger
|
r2753 | session = Instance(Session,(),{}) | ||
epatters
|
r2611 | |||
epatters
|
r2730 | # The kernel process with which the KernelManager is communicating. | ||
kernel = Instance(Popen) | ||||
epatters
|
r2758 | # The addresses for the communication channels. | ||
xreq_address = TCPAddress((LOCALHOST, 0)) | ||||
sub_address = TCPAddress((LOCALHOST, 0)) | ||||
rep_address = TCPAddress((LOCALHOST, 0)) | ||||
Brian Granger
|
r2910 | hb_address = TCPAddress((LOCALHOST, 0)) | ||
epatters
|
r2758 | |||
epatters
|
r2611 | # The classes to use for the various channels. | ||
xreq_channel_class = Type(XReqSocketChannel) | ||||
Brian Granger
|
r2699 | sub_channel_class = Type(SubSocketChannel) | ||
epatters
|
r2611 | rep_channel_class = Type(RepSocketChannel) | ||
Brian Granger
|
r2910 | hb_channel_class = Type(HBSocketChannel) | ||
Brian Granger
|
r3046 | |||
epatters
|
r2611 | # Protected traits. | ||
epatters
|
r2851 | _launch_args = Any | ||
epatters
|
r2611 | _xreq_channel = Any | ||
Brian Granger
|
r2699 | _sub_channel = Any | ||
epatters
|
r2611 | _rep_channel = Any | ||
Brian Granger
|
r2910 | _hb_channel = Any | ||
epatters
|
r2611 | |||
Brian Granger
|
def __init__(self, **kwargs):
    """Initialise the manager, forwarding all keyword arguments to the
    parent class constructor."""
    super(KernelManager, self).__init__(**kwargs)
    # Uncomment this to try closing the context.
    # atexit.register(self.context.close)
Brian Granger
|
r3046 | |||
epatters
|
r2758 | #-------------------------------------------------------------------------- | ||
epatters
|
r2686 | # Channel management methods: | ||
#-------------------------------------------------------------------------- | ||||
epatters
|
r2611 | |||
epatters
|
def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
    """Starts the channels for this kernel.

    This will create the channels if they do not exist and then start
    them. If port numbers of 0 are being used (random ports) then you
    must first call :method:`start_kernel`. If the channels have been
    stopped and you call this, :class:`RuntimeError` will be raised.

    Parameters
    ----------
    xreq, sub, rep, hb : bool, optional (default True)
        Whether to start the corresponding channel.
    """
    requested = (
        (xreq, 'xreq_channel'),
        (sub, 'sub_channel'),
        (rep, 'rep_channel'),
        (hb, 'hb_channel'),
    )
    for wanted, attr in requested:
        # Only touch the channel attribute when it was actually requested.
        if wanted:
            getattr(self, attr).start()
epatters
|
r2639 | |||
Brian Granger
|
def stop_channels(self):
    """Stops all the running channels for this kernel.

    Channels whose thread is not alive are left alone.
    """
    for attr in ('xreq_channel', 'sub_channel', 'rep_channel', 'hb_channel'):
        channel = getattr(self, attr)
        if channel.is_alive():
            channel.stop()
epatters
|
r2686 | |||
Brian Granger
|
@property
def channels_running(self):
    """Are any of the channels created and running?"""
    # Check the channels in order and stop at the first live one.
    for attr in ('xreq_channel', 'sub_channel', 'rep_channel', 'hb_channel'):
        if getattr(self, attr).is_alive():
            return True
    return False
epatters
|
r2639 | |||
epatters
|
r2686 | #-------------------------------------------------------------------------- | ||
# Kernel process management methods: | ||||
#-------------------------------------------------------------------------- | ||||
epatters
|
def start_kernel(self, **kw):
    """Starts a kernel process and configures the manager to use it.

    If random ports (port=0) are being used, this method must be called
    before the channels are created.

    Parameters
    ----------
    ipython : bool, optional (default True)
        Whether to use an IPython kernel instead of a plain Python kernel.

    Any remaining keyword arguments are passed on to ``launch_kernel``.

    Raises
    ------
    RuntimeError
        If any of the configured channel addresses is not a local interface.
    """
    xreq = self.xreq_address
    sub = self.sub_address
    rep = self.rep_address
    hb = self.hb_address
    for addr in (xreq, sub, rep, hb):
        if addr[0] not in LOCAL_IPS:
            raise RuntimeError("Can only launch a kernel on a local interface. "
                               "Make sure that the '*_address' attributes are "
                               "configured properly. "
                               "Currently valid addresses are: %s"%LOCAL_IPS
                               )

    # Remember the arguments (including 'ipython') so that restart_kernel
    # can relaunch with the same settings.
    self._launch_args = kw.copy()
    use_ipython = kw.pop('ipython', True)
    if use_ipython:
        from ipkernel import launch_kernel
    else:
        from pykernel import launch_kernel

    self.kernel, xrep, pub, req, _hb = launch_kernel(
        xrep_port=xreq[1], pub_port=sub[1],
        req_port=rep[1], hb_port=hb[1], **kw)

    # Record the ports the kernel reported back, keeping the configured IPs.
    self.xreq_address = (xreq[0], xrep)
    self.sub_address = (sub[0], pub)
    self.rep_address = (rep[0], req)
    self.hb_address = (hb[0], _hb)
epatters
|
r2686 | |||
MinRK
|
def shutdown_kernel(self, restart=False):
    """ Attempts to stop the kernel process cleanly. If the kernel
    cannot be stopped, it is killed, if possible.

    Parameters
    ----------
    restart : bool, optional (default False)
        Passed along in the shutdown request sent on the XREQ channel.
    """
    # FIXME: Shutdown does not work on Windows due to ZMQ errors!
    if sys.platform == 'win32':
        self.kill_kernel()
        return

    # Pause the heart beat channel if it exists.
    heartbeat = self._hb_channel
    if heartbeat is not None:
        heartbeat.pause()

    # Don't send any additional kernel kill messages immediately, to give
    # the kernel a chance to properly execute shutdown actions. Wait for at
    # most 1s, checking every 0.1s.
    self.xreq_channel.shutdown(restart=restart)
    attempts_left = 10
    while attempts_left:
        if not self.is_alive:
            # The kernel went away on its own; nothing more to do.
            return
        time.sleep(0.1)
        attempts_left -= 1

    # OK, we've waited long enough.
    if self.has_kernel:
        self.kill_kernel()
Fernando Perez
|
r2972 | |||
Fernando Perez
|
def restart_kernel(self, now=False):
    """Restarts a kernel with the same arguments that were used to launch
    it. If the old kernel was launched with random ports, the same ports
    will be used for the new kernel.

    Parameters
    ----------
    now : bool, optional
        If True, the kernel is forcefully restarted *immediately*, without
        having a chance to do any cleanup action.  Otherwise the kernel is
        given 1s to clean up before a forceful restart is issued.

        In all cases the kernel is restarted, the only difference is whether
        it is given a chance to perform a clean shutdown or not.

    Raises
    ------
    RuntimeError
        If :method:`start_kernel` has not been called before.
    """
    launch_args = self._launch_args
    if launch_args is None:
        raise RuntimeError("Cannot restart the kernel. "
                           "No previous call to 'start_kernel'.")

    if self.has_kernel:
        if now:
            self.kill_kernel()
        else:
            self.shutdown_kernel(restart=True)
    self.start_kernel(**launch_args)

    # FIXME: Messages get dropped in Windows due to probable ZMQ bug
    # unless there is some delay here.
    if sys.platform == 'win32':
        time.sleep(0.2)
epatters
|
@property
def has_kernel(self):
    """Whether a kernel process is currently attached to this manager,
    i.e. ``self.kernel`` is not None.
    """
    attached = self.kernel is not None
    return attached
epatters
|
r2686 | |||
def kill_kernel(self):
    """Forcefully kill the running kernel process.

    The heartbeat channel, if one has been created, is paused first. The
    kernel handle's ``kill()`` is then called and ``self.kernel`` is reset
    to None. A kernel that turns out to have exited already is treated as
    successfully killed.

    Raises
    ------
    RuntimeError
        If no kernel process is attached to this manager.
    OSError
        If killing the process fails for any reason other than the process
        having already terminated; ``self.kernel`` is left untouched then.
    """
    # Function-scope import: keeps the method self-contained without
    # touching the module's import block.
    import errno

    if not self.has_kernel:
        raise RuntimeError("Cannot kill kernel. No kernel is running!")

    # Pause the heart beat channel if it exists.
    if self._hb_channel is not None:
        self._hb_channel.pause()

    # Attempt to kill the kernel.
    try:
        self.kernel.kill()
    except OSError as e:
        # An already-terminated process is not an error here:
        #  * on Windows it shows up as an Access Denied error (winerror 5);
        #  * on POSIX it shows up as ESRCH ("No such process").
        # Anything else is a genuine failure and is propagated.
        access_denied = (sys.platform == 'win32' and
                         getattr(e, 'winerror', None) == 5)
        if not (access_denied or e.errno == errno.ESRCH):
            raise
    self.kernel = None
epatters
|
r2611 | |||
epatters
|
def interrupt_kernel(self):
    """Interrupt the kernel.

    Unlike ``signal_kernel``, this operation is well supported on all
    platforms: on Windows the kernel's interrupt event is triggered through
    ``ParentPollerWindows.send_interrupt``, elsewhere SIGINT is sent to the
    kernel process.

    Raises
    ------
    RuntimeError
        If no kernel process is attached to this manager.
    """
    if not self.has_kernel:
        raise RuntimeError("Cannot interrupt kernel. No kernel is running!")

    if sys.platform != 'win32':
        self.kernel.send_signal(signal.SIGINT)
        return

    # Windows has no SIGINT delivery; go through the poller's event instead.
    from parentpoller import ParentPollerWindows as Poller
    Poller.send_interrupt(self.kernel.win32_interrupt_event)
epatters
|
def signal_kernel(self, signum):
    """Send a signal to the kernel process.

    Note that since only SIGTERM is supported on Windows, this method is
    only really useful on Unix systems.

    Parameters
    ----------
    signum : int
        The signal number, forwarded unchanged to the kernel handle's
        ``send_signal``.

    Raises
    ------
    RuntimeError
        If no kernel process is attached to this manager.
    """
    if not self.has_kernel:
        raise RuntimeError("Cannot signal kernel. No kernel is running!")
    self.kernel.send_signal(signum)
epatters
|
r2611 | |||
Brian Granger
|
@property
def is_alive(self):
    """Is the kernel process still running?

    Returns
    -------
    bool
        For a kernel attached to this manager, True while the handle's
        ``poll()`` returns None. When no kernel is attached, always True.
    """
    # FIXME: not using a heartbeat means this method is broken for any
    # remote kernel, it's only capable of handling local kernels.
    if not self.has_kernel:
        # We didn't start the kernel with this KernelManager so we don't
        # know if it is running. We should use a heartbeat for this case.
        return True
    return self.kernel.poll() is None
epatters
|
r2632 | #-------------------------------------------------------------------------- | ||
# Channels used for communication with the kernel: | ||||
#-------------------------------------------------------------------------- | ||||
epatters
|
@property
def xreq_channel(self):
    """Get the REQ socket channel object to make requests of the kernel.

    The channel is created on first access from ``xreq_channel_class``,
    using this manager's context, session and ``xreq_address``, and the
    same object is returned on later accesses.
    """
    if self._xreq_channel is None:
        make_channel = self.xreq_channel_class
        self._xreq_channel = make_channel(
            self.context, self.session, self.xreq_address)
    return self._xreq_channel
@property
def sub_channel(self):
    """Get the SUB socket channel object.

    The channel is created on first access from ``sub_channel_class``,
    using this manager's context, session and ``sub_address``, and the
    same object is returned on later accesses.
    """
    if self._sub_channel is None:
        make_channel = self.sub_channel_class
        self._sub_channel = make_channel(
            self.context, self.session, self.sub_address)
    return self._sub_channel
@property
def rep_channel(self):
    """Get the REP socket channel object to handle stdin (raw_input).

    The channel is created on first access from ``rep_channel_class``,
    using this manager's context, session and ``rep_address``, and the
    same object is returned on later accesses.
    """
    if self._rep_channel is None:
        make_channel = self.rep_channel_class
        self._rep_channel = make_channel(
            self.context, self.session, self.rep_address)
    return self._rep_channel
Brian Granger
|
r2910 | |||
@property
def hb_channel(self):
    """Get the heartbeat socket channel object to check that the kernel
    is alive.

    The channel is created on first access from ``hb_channel_class``,
    using this manager's context, session and ``hb_address``, and the
    same object is returned on later accesses.
    """
    if self._hb_channel is None:
        make_channel = self.hb_channel_class
        self._hb_channel = make_channel(
            self.context, self.session, self.hb_address)
    return self._hb_channel