kernelmanager.py
786 lines
| 27.9 KiB
| text/x-python
|
PythonLexer
Brian Granger
|
r2742 | """Base classes to manage the interaction with a running kernel. | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | Todo | ||
==== | ||||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | * Create logger to handle debugging and console messages. | ||
Brian Granger
|
r2606 | """ | ||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2008-2010 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
epatters
|
r2611 | # Standard library imports. | ||
Brian Granger
|
r2606 | from Queue import Queue, Empty | ||
epatters
|
r2667 | from subprocess import Popen | ||
Brian Granger
|
r2606 | from threading import Thread | ||
epatters
|
r2614 | import time | ||
Brian Granger
|
r2606 | |||
epatters
|
r2611 | # System library imports. | ||
Brian Granger
|
r2606 | import zmq | ||
from zmq import POLLIN, POLLOUT, POLLERR | ||||
from zmq.eventloop import ioloop | ||||
epatters
|
r2611 | |||
# Local imports. | ||||
Fernando Perez
|
r2926 | from IPython.utils import io | ||
Brian Granger
|
r2742 | from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress | ||
Brian Granger
|
r2606 | from session import Session | ||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# Constants and exceptions | ||||
#----------------------------------------------------------------------------- | ||||
epatters
|
r2667 | LOCALHOST = '127.0.0.1' | ||
Brian Granger
|
r2699 | class InvalidPortNumber(Exception): | ||
pass | ||||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
Fernando Perez
|
r2926 | # Utility functions | ||
#----------------------------------------------------------------------------- | ||||
# some utilities to validate message structure, these might get moved elsewhere | ||||
# if they prove to have more generic utility | ||||
def validate_string_list(lst): | ||||
"""Validate that the input is a list of strings. | ||||
Raises ValueError if not.""" | ||||
if not isinstance(lst, list): | ||||
raise ValueError('input %r must be a list' % lst) | ||||
for x in lst: | ||||
if not isinstance(x, basestring): | ||||
raise ValueError('element %r in list must be a string' % x) | ||||
def validate_string_dict(dct): | ||||
"""Validate that the input is a dict with string keys and values. | ||||
Raises ValueError if not.""" | ||||
for k,v in dct.iteritems(): | ||||
if not isinstance(k, basestring): | ||||
raise ValueError('key %r in dict must be a string' % k) | ||||
if not isinstance(v, basestring): | ||||
raise ValueError('value %r in dict must be a string' % v) | ||||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r2699 | # ZMQ Socket Channel classes | ||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r2606 | |||
class ZmqSocketChannel(Thread): | ||||
Brian Granger
|
r2699 | """The base class for the channels that use ZMQ sockets. | ||
epatters
|
r2631 | """ | ||
Brian Granger
|
r2695 | context = None | ||
session = None | ||||
socket = None | ||||
ioloop = None | ||||
iostate = None | ||||
Brian Granger
|
r2699 | _address = None | ||
def __init__(self, context, session, address): | ||||
"""Create a channel | ||||
Brian Granger
|
r2695 | |||
Brian Granger
|
r2699 | Parameters | ||
---------- | ||||
Brian Granger
|
r2742 | context : :class:`zmq.Context` | ||
Brian Granger
|
r2699 | The ZMQ context to use. | ||
Brian Granger
|
r2742 | session : :class:`session.Session` | ||
Brian Granger
|
r2699 | The session to use. | ||
address : tuple | ||||
Standard (ip, port) tuple that the kernel is listening on. | ||||
""" | ||||
epatters
|
r2632 | super(ZmqSocketChannel, self).__init__() | ||
self.daemon = True | ||||
Brian Granger
|
r2606 | self.context = context | ||
self.session = session | ||||
Brian Granger
|
r2699 | if address[1] == 0: | ||
epatters
|
r2702 | message = 'The port number for a channel cannot be 0.' | ||
raise InvalidPortNumber(message) | ||||
Brian Granger
|
r2699 | self._address = address | ||
epatters
|
r2631 | |||
epatters
|
r2632 | def stop(self): | ||
Brian Granger
|
r2699 | """Stop the channel's activity. | ||
Brian Granger
|
r2691 | |||
Brian Granger
|
r2699 | This calls :method:`Thread.join` and returns when the thread | ||
terminates. :class:`RuntimeError` will be raised if | ||||
:method:`self.start` is called again. | ||||
epatters
|
r2632 | """ | ||
epatters
|
r2642 | self.join() | ||
Brian Granger
|
r2699 | @property | ||
def address(self): | ||||
"""Get the channel's address as an (ip, port) tuple. | ||||
By the default, the address is (localhost, 0), where 0 means a random | ||||
port. | ||||
epatters
|
r2632 | """ | ||
return self._address | ||||
Brian Granger
|
r2695 | def add_io_state(self, state): | ||
"""Add IO state to the eventloop. | ||||
Brian Granger
|
r2699 | Parameters | ||
---------- | ||||
state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR | ||||
The IO state flag to set. | ||||
Brian Granger
|
r2695 | This is thread safe as it uses the thread safe IOLoop.add_callback. | ||
""" | ||||
def add_io_state_callback(): | ||||
if not self.iostate & state: | ||||
self.iostate = self.iostate | state | ||||
self.ioloop.update_handler(self.socket, self.iostate) | ||||
self.ioloop.add_callback(add_io_state_callback) | ||||
def drop_io_state(self, state): | ||||
"""Drop IO state from the eventloop. | ||||
Brian Granger
|
r2699 | Parameters | ||
---------- | ||||
state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR | ||||
The IO state flag to set. | ||||
Brian Granger
|
r2695 | This is thread safe as it uses the thread safe IOLoop.add_callback. | ||
""" | ||||
def drop_io_state_callback(): | ||||
if self.iostate & state: | ||||
self.iostate = self.iostate & (~state) | ||||
self.ioloop.update_handler(self.socket, self.iostate) | ||||
self.ioloop.add_callback(drop_io_state_callback) | ||||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | class XReqSocketChannel(ZmqSocketChannel): | ||
"""The XREQ channel for issues request/replies to the kernel. | ||||
""" | ||||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | command_queue = None | ||
def __init__(self, context, session, address): | ||||
self.command_queue = Queue() | ||||
super(XReqSocketChannel, self).__init__(context, session, address) | ||||
Brian Granger
|
r2606 | |||
def run(self): | ||||
Brian Granger
|
r2699 | """The thread's main activity. Call start() instead.""" | ||
self.socket = self.context.socket(zmq.XREQ) | ||||
Brian Granger
|
r2606 | self.socket.setsockopt(zmq.IDENTITY, self.session.session) | ||
epatters
|
r2632 | self.socket.connect('tcp://%s:%i' % self.address) | ||
Brian Granger
|
r2606 | self.ioloop = ioloop.IOLoop() | ||
Brian Granger
|
r2699 | self.iostate = POLLERR|POLLIN | ||
Brian Granger
|
r2606 | self.ioloop.add_handler(self.socket, self._handle_events, | ||
Brian Granger
|
r2695 | self.iostate) | ||
Brian Granger
|
r2606 | self.ioloop.start() | ||
epatters
|
r2632 | def stop(self): | ||
self.ioloop.stop() | ||||
Brian Granger
|
r2699 | super(XReqSocketChannel, self).stop() | ||
Brian Granger
|
r2606 | |||
def call_handlers(self, msg): | ||||
Brian Granger
|
r2692 | """This method is called in the ioloop thread when a message arrives. | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2692 | Subclasses should override this method to handle incoming messages. | ||
It is important to remember that this method is called in the thread | ||||
so that some logic must be done to ensure that the application leve | ||||
handlers are called in the application thread. | ||||
""" | ||||
raise NotImplementedError('call_handlers must be defined in a subclass.') | ||||
Brian Granger
|
r2606 | |||
Fernando Perez
|
r2926 | def execute(self, code, silent=False, | ||
user_variables=None, user_expressions=None): | ||||
Brian Granger
|
r2699 | """Execute code in the kernel. | ||
epatters
|
r2672 | |||
Parameters | ||||
---------- | ||||
Brian Granger
|
r2699 | code : str | ||
A string of Python code. | ||||
Fernando Perez
|
r2926 | |||
epatters
|
r2844 | silent : bool, optional (default False) | ||
If set, the kernel will execute the code as quietly possible. | ||||
epatters
|
r2614 | |||
Fernando Perez
|
r2926 | user_variables : list, optional | ||
A list of variable names to pull from the user's namespace. They | ||||
will come back as a dict with these names as keys and their | ||||
:func:`repr` as values. | ||||
user_expressions : dict, optional | ||||
A dict with string keys and to pull from the user's | ||||
namespace. They will come back as a dict with these names as keys | ||||
and their :func:`repr` as values. | ||||
Brian Granger
|
r2699 | Returns | ||
------- | ||||
The msg_id of the message sent. | ||||
""" | ||||
Fernando Perez
|
r2926 | if user_variables is None: | ||
user_variables = [] | ||||
if user_expressions is None: | ||||
user_expressions = {} | ||||
# Don't waste network traffic if inputs are invalid | ||||
if not isinstance(code, basestring): | ||||
raise ValueError('code %r must be a string' % code) | ||||
validate_string_list(user_variables) | ||||
validate_string_dict(user_expressions) | ||||
Brian Granger
|
r2699 | # Create class for content/msg creation. Related to, but possibly | ||
# not in Session. | ||||
Fernando Perez
|
r2926 | content = dict(code=code, silent=silent, | ||
user_variables=user_variables, | ||||
user_expressions=user_expressions) | ||||
Brian Granger
|
r2699 | msg = self.session.msg('execute_request', content) | ||
self._queue_request(msg) | ||||
return msg['header']['msg_id'] | ||||
Brian Granger
|
r2606 | |||
Fernando Perez
|
r2839 | def complete(self, text, line, cursor_pos, block=None): | ||
epatters
|
r2841 | """Tab complete text in the kernel's namespace. | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | Parameters | ||
---------- | ||||
text : str | ||||
The text to complete. | ||||
line : str | ||||
The full line of text that is the surrounding context for the | ||||
text to complete. | ||||
epatters
|
r2841 | cursor_pos : int | ||
The position of the cursor in the line where the completion was | ||||
requested. | ||||
block : str, optional | ||||
Brian Granger
|
r2699 | The full block of code in which the completion is being requested. | ||
Returns | ||||
------- | ||||
The msg_id of the message sent. | ||||
""" | ||||
Fernando Perez
|
r2839 | content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos) | ||
Brian Granger
|
r2699 | msg = self.session.msg('complete_request', content) | ||
self._queue_request(msg) | ||||
return msg['header']['msg_id'] | ||||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | def object_info(self, oname): | ||
"""Get metadata information about an object. | ||||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | Parameters | ||
---------- | ||||
oname : str | ||||
A string specifying the object name. | ||||
Returns | ||||
------- | ||||
The msg_id of the message sent. | ||||
""" | ||||
content = dict(oname=oname) | ||||
msg = self.session.msg('object_info_request', content) | ||||
self._queue_request(msg) | ||||
return msg['header']['msg_id'] | ||||
epatters
|
r2632 | |||
epatters
|
r2844 | def history(self, index=None, raw=False, output=True): | ||
"""Get the history list. | ||||
Parameters | ||||
---------- | ||||
index : n or (n1, n2) or None | ||||
If n, then the last entries. If a tuple, then all in | ||||
range(n1, n2). If None, then all entries. Raises IndexError if | ||||
the format of index is incorrect. | ||||
raw : bool | ||||
If True, return the raw input. | ||||
output : bool | ||||
If True, then return the output as well. | ||||
Returns | ||||
------- | ||||
The msg_id of the message sent. | ||||
""" | ||||
content = dict(index=index, raw=raw, output=output) | ||||
msg = self.session.msg('history_request', content) | ||||
self._queue_request(msg) | ||||
return msg['header']['msg_id'] | ||||
Brian Granger
|
r2606 | def _handle_events(self, socket, events): | ||
if events & POLLERR: | ||||
self._handle_err() | ||||
if events & POLLOUT: | ||||
self._handle_send() | ||||
if events & POLLIN: | ||||
self._handle_recv() | ||||
def _handle_recv(self): | ||||
msg = self.socket.recv_json() | ||||
epatters
|
r2609 | self.call_handlers(msg) | ||
Brian Granger
|
r2606 | |||
def _handle_send(self): | ||||
try: | ||||
msg = self.command_queue.get(False) | ||||
except Empty: | ||||
pass | ||||
else: | ||||
self.socket.send_json(msg) | ||||
Brian Granger
|
r2695 | if self.command_queue.empty(): | ||
self.drop_io_state(POLLOUT) | ||||
Brian Granger
|
r2606 | |||
def _handle_err(self): | ||||
Brian Granger
|
r2692 | # We don't want to let this go silently, so eventually we should log. | ||
Brian Granger
|
r2694 | raise zmq.ZMQError() | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | def _queue_request(self, msg): | ||
Brian Granger
|
r2606 | self.command_queue.put(msg) | ||
Brian Granger
|
r2695 | self.add_io_state(POLLOUT) | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | |||
class SubSocketChannel(ZmqSocketChannel): | ||||
"""The SUB channel which listens for messages that the kernel publishes. | ||||
""" | ||||
def __init__(self, context, session, address): | ||||
super(SubSocketChannel, self).__init__(context, session, address) | ||||
def run(self): | ||||
"""The thread's main activity. Call start() instead.""" | ||||
self.socket = self.context.socket(zmq.SUB) | ||||
self.socket.setsockopt(zmq.SUBSCRIBE,'') | ||||
self.socket.setsockopt(zmq.IDENTITY, self.session.session) | ||||
self.socket.connect('tcp://%s:%i' % self.address) | ||||
self.ioloop = ioloop.IOLoop() | ||||
self.iostate = POLLIN|POLLERR | ||||
self.ioloop.add_handler(self.socket, self._handle_events, | ||||
self.iostate) | ||||
self.ioloop.start() | ||||
def stop(self): | ||||
self.ioloop.stop() | ||||
super(SubSocketChannel, self).stop() | ||||
Brian Granger
|
r2697 | def call_handlers(self, msg): | ||
"""This method is called in the ioloop thread when a message arrives. | ||||
Subclasses should override this method to handle incoming messages. | ||||
It is important to remember that this method is called in the thread | ||||
so that some logic must be done to ensure that the application leve | ||||
handlers are called in the application thread. | ||||
""" | ||||
raise NotImplementedError('call_handlers must be defined in a subclass.') | ||||
Brian Granger
|
r2699 | def flush(self, timeout=1.0): | ||
"""Immediately processes all pending messages on the SUB channel. | ||||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2742 | Callers should use this method to ensure that :method:`call_handlers` | ||
has been called for all messages that have been received on the | ||||
0MQ SUB socket of this channel. | ||||
Brian Granger
|
r2699 | This method is thread safe. | ||
Brian Granger
|
r2606 | |||
Brian Granger
|
r2699 | Parameters | ||
---------- | ||||
timeout : float, optional | ||||
The maximum amount of time to spend flushing, in seconds. The | ||||
default is one second. | ||||
""" | ||||
# We do the IOLoop callback process twice to ensure that the IOLoop | ||||
# gets to perform at least one full poll. | ||||
stop_time = time.time() + timeout | ||||
for i in xrange(2): | ||||
self._flushed = False | ||||
self.ioloop.add_callback(self._flush) | ||||
while not self._flushed and time.time() < stop_time: | ||||
time.sleep(0.01) | ||||
def _handle_events(self, socket, events): | ||||
# Turn on and off POLLOUT depending on if we have made a request | ||||
if events & POLLERR: | ||||
self._handle_err() | ||||
if events & POLLIN: | ||||
self._handle_recv() | ||||
def _handle_err(self): | ||||
# We don't want to let this go silently, so eventually we should log. | ||||
raise zmq.ZMQError() | ||||
def _handle_recv(self): | ||||
# Get all of the messages we can | ||||
while True: | ||||
try: | ||||
msg = self.socket.recv_json(zmq.NOBLOCK) | ||||
except zmq.ZMQError: | ||||
# Check the errno? | ||||
Brian Granger
|
r2742 | # Will this trigger POLLERR? | ||
Brian Granger
|
r2699 | break | ||
else: | ||||
self.call_handlers(msg) | ||||
def _flush(self): | ||||
"""Callback for :method:`self.flush`.""" | ||||
self._flushed = True | ||||
Brian Granger
|
r2606 | |||
class RepSocketChannel(ZmqSocketChannel): | ||||
Brian Granger
|
r2699 | """A reply channel to handle raw_input requests that the kernel makes.""" | ||
Brian Granger
|
r2606 | |||
epatters
|
r2707 | msg_queue = None | ||
def __init__(self, context, session, address): | ||||
self.msg_queue = Queue() | ||||
super(RepSocketChannel, self).__init__(context, session, address) | ||||
epatters
|
r2701 | def run(self): | ||
"""The thread's main activity. Call start() instead.""" | ||||
epatters
|
r2707 | self.socket = self.context.socket(zmq.XREQ) | ||
self.socket.setsockopt(zmq.IDENTITY, self.session.session) | ||||
self.socket.connect('tcp://%s:%i' % self.address) | ||||
epatters
|
r2701 | self.ioloop = ioloop.IOLoop() | ||
epatters
|
r2707 | self.iostate = POLLERR|POLLIN | ||
self.ioloop.add_handler(self.socket, self._handle_events, | ||||
self.iostate) | ||||
epatters
|
r2701 | self.ioloop.start() | ||
def stop(self): | ||||
self.ioloop.stop() | ||||
epatters
|
r2702 | super(RepSocketChannel, self).stop() | ||
Brian Granger
|
r2606 | |||
epatters
|
r2707 | def call_handlers(self, msg): | ||
"""This method is called in the ioloop thread when a message arrives. | ||||
Subclasses should override this method to handle incoming messages. | ||||
It is important to remember that this method is called in the thread | ||||
so that some logic must be done to ensure that the application leve | ||||
handlers are called in the application thread. | ||||
""" | ||||
raise NotImplementedError('call_handlers must be defined in a subclass.') | ||||
epatters
|
r2730 | def input(self, string): | ||
"""Send a string of raw input to the kernel.""" | ||||
content = dict(value=string) | ||||
msg = self.session.msg('input_reply', content) | ||||
epatters
|
r2707 | self._queue_reply(msg) | ||
def _handle_events(self, socket, events): | ||||
if events & POLLERR: | ||||
self._handle_err() | ||||
if events & POLLOUT: | ||||
self._handle_send() | ||||
if events & POLLIN: | ||||
self._handle_recv() | ||||
def _handle_recv(self): | ||||
msg = self.socket.recv_json() | ||||
self.call_handlers(msg) | ||||
def _handle_send(self): | ||||
try: | ||||
msg = self.msg_queue.get(False) | ||||
except Empty: | ||||
pass | ||||
else: | ||||
self.socket.send_json(msg) | ||||
if self.msg_queue.empty(): | ||||
self.drop_io_state(POLLOUT) | ||||
def _handle_err(self): | ||||
# We don't want to let this go silently, so eventually we should log. | ||||
raise zmq.ZMQError() | ||||
def _queue_reply(self, msg): | ||||
self.msg_queue.put(msg) | ||||
self.add_io_state(POLLOUT) | ||||
epatters
|
r2611 | |||
Brian Granger
|
r2910 | class HBSocketChannel(ZmqSocketChannel): | ||
epatters
|
r2915 | """The heartbeat channel which monitors the kernel heartbeat.""" | ||
Brian Granger
|
r2910 | |||
Brian Granger
|
r2925 | time_to_dead = 3.0 | ||
Brian Granger
|
r2910 | socket = None | ||
poller = None | ||||
def __init__(self, context, session, address): | ||||
super(HBSocketChannel, self).__init__(context, session, address) | ||||
epatters
|
r2915 | self._running = False | ||
Brian Granger
|
r2910 | |||
def _create_socket(self): | ||||
self.socket = self.context.socket(zmq.REQ) | ||||
self.socket.setsockopt(zmq.IDENTITY, self.session.session) | ||||
self.socket.connect('tcp://%s:%i' % self.address) | ||||
self.poller = zmq.Poller() | ||||
self.poller.register(self.socket, zmq.POLLIN) | ||||
def run(self): | ||||
"""The thread's main activity. Call start() instead.""" | ||||
self._create_socket() | ||||
epatters
|
r2915 | self._running = True | ||
Brian Granger
|
r2925 | # Wait 2 seconds for the kernel to come up and the sockets to auto | ||
# connect. If we don't we will see the kernel as dead. Also, before | ||||
# the sockets are connected, the poller.poll line below is returning | ||||
# too fast. This avoids that because the polling doesn't start until | ||||
# after the sockets are connected. | ||||
time.sleep(2.0) | ||||
epatters
|
r2915 | while self._running: | ||
Brian Granger
|
r2910 | since_last_heartbeat = 0.0 | ||
request_time = time.time() | ||||
try: | ||||
Fernando Perez
|
r2926 | #io.rprint('Ping from HB channel') # dbg | ||
Brian Granger
|
r2910 | self.socket.send_json('ping') | ||
except zmq.ZMQError, e: | ||||
Fernando Perez
|
r2926 | #io.rprint('*** HB Error:', e) # dbg | ||
Brian Granger
|
r2910 | if e.errno == zmq.EFSM: | ||
Fernando Perez
|
r2926 | #io.rprint('sleep...', self.time_to_dead) # dbg | ||
Brian Granger
|
r2910 | time.sleep(self.time_to_dead) | ||
self._create_socket() | ||||
else: | ||||
raise | ||||
else: | ||||
while True: | ||||
try: | ||||
Fernando Perez
|
r2926 | self.socket.recv_json(zmq.NOBLOCK) | ||
Brian Granger
|
r2910 | except zmq.ZMQError, e: | ||
Fernando Perez
|
r2926 | #io.rprint('*** HB Error 2:', e) # dbg | ||
Brian Granger
|
r2910 | if e.errno == zmq.EAGAIN: | ||
Fernando Perez
|
r2926 | before_poll = time.time() | ||
until_dead = self.time_to_dead - (before_poll - | ||||
epatters
|
r2915 | request_time) | ||
Fernando Perez
|
r2926 | |||
# When the return value of poll() is an empty list, | ||||
# that is when things have gone wrong (zeromq bug). | ||||
# As long as it is not an empty list, poll is | ||||
# working correctly even if it returns quickly. | ||||
# Note: poll timeout is in milliseconds. | ||||
self.poller.poll(1000*until_dead) | ||||
Brian Granger
|
r2910 | since_last_heartbeat = time.time() - request_time | ||
if since_last_heartbeat > self.time_to_dead: | ||||
self.call_handlers(since_last_heartbeat) | ||||
break | ||||
else: | ||||
epatters
|
r2915 | # FIXME: We should probably log this instead. | ||
Brian Granger
|
r2910 | raise | ||
else: | ||||
epatters
|
r2915 | until_dead = self.time_to_dead - (time.time() - | ||
request_time) | ||||
Brian Granger
|
r2910 | if until_dead > 0.0: | ||
Fernando Perez
|
r2926 | #io.rprint('sleep...', self.time_to_dead) # dbg | ||
Brian Granger
|
r2910 | time.sleep(until_dead) | ||
break | ||||
epatters
|
r2915 | def stop(self): | ||
self._running = False | ||||
super(HBSocketChannel, self).stop() | ||||
Brian Granger
|
r2910 | def call_handlers(self, since_last_heartbeat): | ||
"""This method is called in the ioloop thread when a message arrives. | ||||
Subclasses should override this method to handle incoming messages. | ||||
It is important to remember that this method is called in the thread | ||||
so that some logic must be done to ensure that the application leve | ||||
handlers are called in the application thread. | ||||
""" | ||||
raise NotImplementedError('call_handlers must be defined in a subclass.') | ||||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# Main kernel manager class | ||||
#----------------------------------------------------------------------------- | ||||
epatters
|
r2611 | class KernelManager(HasTraits): | ||
epatters
|
r2631 | """ Manages a kernel for a frontend. | ||
epatters
|
r2611 | |||
epatters
|
r2631 | The SUB channel is for the frontend to receive messages published by the | ||
kernel. | ||||
The REQ channel is for the frontend to make requests of the kernel. | ||||
The REP channel is for the kernel to request stdin (raw_input) from the | ||||
frontend. | ||||
""" | ||||
epatters
|
r2611 | # The PyZMQ Context to use for communication with the kernel. | ||
Brian Granger
|
r2753 | context = Instance(zmq.Context,(),{}) | ||
epatters
|
r2611 | |||
# The Session to use for communication with the kernel. | ||||
Brian Granger
|
r2753 | session = Instance(Session,(),{}) | ||
epatters
|
r2611 | |||
epatters
|
r2730 | # The kernel process with which the KernelManager is communicating. | ||
kernel = Instance(Popen) | ||||
epatters
|
r2758 | # The addresses for the communication channels. | ||
xreq_address = TCPAddress((LOCALHOST, 0)) | ||||
sub_address = TCPAddress((LOCALHOST, 0)) | ||||
rep_address = TCPAddress((LOCALHOST, 0)) | ||||
Brian Granger
|
r2910 | hb_address = TCPAddress((LOCALHOST, 0)) | ||
epatters
|
r2758 | |||
epatters
|
r2611 | # The classes to use for the various channels. | ||
xreq_channel_class = Type(XReqSocketChannel) | ||||
Brian Granger
|
r2699 | sub_channel_class = Type(SubSocketChannel) | ||
epatters
|
r2611 | rep_channel_class = Type(RepSocketChannel) | ||
Brian Granger
|
r2910 | hb_channel_class = Type(HBSocketChannel) | ||
epatters
|
r2631 | |||
epatters
|
r2611 | # Protected traits. | ||
epatters
|
r2851 | _launch_args = Any | ||
epatters
|
r2611 | _xreq_channel = Any | ||
Brian Granger
|
r2699 | _sub_channel = Any | ||
epatters
|
r2611 | _rep_channel = Any | ||
Brian Granger
|
r2910 | _hb_channel = Any | ||
epatters
|
r2611 | |||
epatters
|
r2758 | #-------------------------------------------------------------------------- | ||
epatters
|
r2686 | # Channel management methods: | ||
#-------------------------------------------------------------------------- | ||||
epatters
|
r2611 | |||
Brian Granger
|
r2699 | def start_channels(self): | ||
"""Starts the channels for this kernel. | ||||
This will create the channels if they do not exist and then start | ||||
them. If port numbers of 0 are being used (random ports) then you | ||||
must first call :method:`start_kernel`. If the channels have been | ||||
stopped and you call this, :class:`RuntimeError` will be raised. | ||||
epatters
|
r2639 | """ | ||
Brian Granger
|
r2699 | self.xreq_channel.start() | ||
self.sub_channel.start() | ||||
self.rep_channel.start() | ||||
Brian Granger
|
r2910 | self.hb_channel.start() | ||
epatters
|
r2639 | |||
Brian Granger
|
r2699 | def stop_channels(self): | ||
"""Stops the channels for this kernel. | ||||
This stops the channels by joining their threads. If the channels | ||||
were not started, :class:`RuntimeError` will be raised. | ||||
""" | ||||
self.xreq_channel.stop() | ||||
self.sub_channel.stop() | ||||
self.rep_channel.stop() | ||||
Brian Granger
|
r2910 | self.hb_channel.stop() | ||
epatters
|
r2686 | |||
Brian Granger
|
r2699 | @property | ||
def channels_running(self): | ||||
"""Are all of the channels created and running?""" | ||||
return self.xreq_channel.is_alive() \ | ||||
and self.sub_channel.is_alive() \ | ||||
Brian Granger
|
r2910 | and self.rep_channel.is_alive() \ | ||
and self.hb_channel.is_alive() | ||||
epatters
|
r2639 | |||
epatters
|
r2686 | #-------------------------------------------------------------------------- | ||
# Kernel process management methods: | ||||
#-------------------------------------------------------------------------- | ||||
epatters
|
r2851 | def start_kernel(self, **kw): | ||
epatters
|
r2686 | """Starts a kernel process and configures the manager to use it. | ||
Brian Granger
|
r2699 | If random ports (port=0) are being used, this method must be called | ||
before the channels are created. | ||||
epatters
|
r2758 | |||
Parameters: | ||||
----------- | ||||
epatters
|
r2778 | ipython : bool, optional (default True) | ||
Whether to use an IPython kernel instead of a plain Python kernel. | ||||
epatters
|
r2611 | """ | ||
Brian Granger
|
r2910 | xreq, sub, rep, hb = self.xreq_address, self.sub_address, \ | ||
self.rep_address, self.hb_address | ||||
epatters
|
r2915 | if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \ | ||
rep[0] != LOCALHOST or hb[0] != LOCALHOST: | ||||
epatters
|
r2667 | raise RuntimeError("Can only launch a kernel on localhost." | ||
"Make sure that the '*_address' attributes are " | ||||
"configured properly.") | ||||
epatters
|
r2851 | self._launch_args = kw.copy() | ||
if kw.pop('ipython', True): | ||||
epatters
|
r2778 | from ipkernel import launch_kernel as launch | ||
else: | ||||
from pykernel import launch_kernel as launch | ||||
Brian Granger
|
r2910 | self.kernel, xrep, pub, req, hb = launch( | ||
epatters
|
r2915 | xrep_port=xreq[1], pub_port=sub[1], | ||
req_port=rep[1], hb_port=hb[1], **kw) | ||||
Brian Granger
|
r2753 | self.xreq_address = (LOCALHOST, xrep) | ||
self.sub_address = (LOCALHOST, pub) | ||||
self.rep_address = (LOCALHOST, req) | ||||
Brian Granger
|
r2910 | self.hb_address = (LOCALHOST, hb) | ||
epatters
|
r2686 | |||
epatters
|
r2851 | def restart_kernel(self): | ||
"""Restarts a kernel with the same arguments that were used to launch | ||||
it. If the old kernel was launched with random ports, the same ports | ||||
will be used for the new kernel. | ||||
""" | ||||
if self._launch_args is None: | ||||
raise RuntimeError("Cannot restart the kernel. " | ||||
"No previous call to 'start_kernel'.") | ||||
else: | ||||
if self.has_kernel: | ||||
self.kill_kernel() | ||||
epatters
|
r2915 | self.start_kernel(**self._launch_args) | ||
epatters
|
r2851 | |||
epatters
|
r2686 | @property | ||
def has_kernel(self): | ||||
"""Returns whether a kernel process has been specified for the kernel | ||||
manager. | ||||
epatters
|
r2632 | """ | ||
epatters
|
r2730 | return self.kernel is not None | ||
epatters
|
r2686 | |||
def kill_kernel(self): | ||||
""" Kill the running kernel. """ | ||||
epatters
|
r2730 | if self.kernel is not None: | ||
self.kernel.kill() | ||||
self.kernel = None | ||||
epatters
|
r2639 | else: | ||
epatters
|
r2686 | raise RuntimeError("Cannot kill kernel. No kernel is running!") | ||
epatters
|
r2611 | |||
def signal_kernel(self, signum): | ||||
epatters
|
r2686 | """ Sends a signal to the kernel. """ | ||
epatters
|
r2730 | if self.kernel is not None: | ||
self.kernel.send_signal(signum) | ||||
epatters
|
r2686 | else: | ||
raise RuntimeError("Cannot signal kernel. No kernel is running!") | ||||
epatters
|
r2611 | |||
Brian Granger
|
r2699 | @property | ||
def is_alive(self): | ||||
"""Is the kernel process still running?""" | ||||
epatters
|
r2730 | if self.kernel is not None: | ||
if self.kernel.poll() is None: | ||||
Brian Granger
|
r2699 | return True | ||
else: | ||||
return False | ||||
else: | ||||
# We didn't start the kernel with this KernelManager so we don't | ||||
# know if it is running. We should use a heartbeat for this case. | ||||
return True | ||||
epatters
|
r2632 | #-------------------------------------------------------------------------- | ||
# Channels used for communication with the kernel: | ||||
#-------------------------------------------------------------------------- | ||||
epatters
|
r2611 | @property | ||
def xreq_channel(self): | ||||
"""Get the REQ socket channel object to make requests of the kernel.""" | ||||
if self._xreq_channel is None: | ||||
epatters
|
r2631 | self._xreq_channel = self.xreq_channel_class(self.context, | ||
Brian Granger
|
r2699 | self.session, | ||
self.xreq_address) | ||||
epatters
|
r2611 | return self._xreq_channel | ||
@property | ||||
Brian Granger
|
r2699 | def sub_channel(self): | ||
"""Get the SUB socket channel object.""" | ||||
if self._sub_channel is None: | ||||
self._sub_channel = self.sub_channel_class(self.context, | ||||
self.session, | ||||
self.sub_address) | ||||
return self._sub_channel | ||||
@property | ||||
epatters
|
r2611 | def rep_channel(self): | ||
"""Get the REP socket channel object to handle stdin (raw_input).""" | ||||
if self._rep_channel is None: | ||||
epatters
|
r2631 | self._rep_channel = self.rep_channel_class(self.context, | ||
Brian Granger
|
r2699 | self.session, | ||
self.rep_address) | ||||
epatters
|
r2611 | return self._rep_channel | ||
Brian Granger
|
r2910 | |||
@property | ||||
def hb_channel(self): | ||||
"""Get the REP socket channel object to handle stdin (raw_input).""" | ||||
if self._hb_channel is None: | ||||
self._hb_channel = self.hb_channel_class(self.context, | ||||
self.session, | ||||
self.hb_address) | ||||
return self._hb_channel | ||||