kernelmanager.py
458 lines
| 14.9 KiB
| text/x-python
|
PythonLexer
Brian Granger
|
r2606 | """Kernel frontend classes. | ||
epatters
|
r2609 | TODO: Create logger to handle debugging and console messages. | ||
Brian Granger
|
r2606 | |||
""" | ||||
epatters
|
r2611 | # Standard library imports. | ||
Brian Granger
|
r2606 | from Queue import Queue, Empty | ||
epatters
|
r2667 | from subprocess import Popen | ||
Brian Granger
|
r2606 | from threading import Thread | ||
epatters
|
r2614 | import time | ||
Brian Granger
|
r2606 | import traceback | ||
epatters
|
r2611 | # System library imports. | ||
Brian Granger
|
r2606 | import zmq | ||
from zmq import POLLIN, POLLOUT, POLLERR | ||||
from zmq.eventloop import ioloop | ||||
epatters
|
r2611 | |||
# Local imports. | ||||
epatters
|
r2639 | from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \ | ||
Type | ||||
epatters
|
r2667 | from kernel import launch_kernel | ||
Brian Granger
|
r2606 | from session import Session | ||
epatters
|
r2667 | # Constants. | ||
LOCALHOST = '127.0.0.1' | ||||
Brian Granger
|
r2606 | |||
class MissingHandlerError(Exception):
    """Raised when no handler can be found for a request's message type."""
class ZmqSocketChannel(Thread):
    """The base class for the channels that use ZMQ sockets.

    Each channel is a daemon thread. The base class only stores the
    context, session and address; ``socket`` stays ``None`` until a
    subclass creates it.
    """

    def __init__(self, context, session, address=None):
        """Create the channel without starting its thread.

        Parameters
        ----------
        context : object
            Stored as ``self.context``.
        session : object
            Stored as ``self.session``.
        address : tuple or None, optional
            A tuple of form (ip address [str], port [int]). ``None``
            selects the default address, ``(LOCALHOST, 0)``.
        """
        super(ZmqSocketChannel, self).__init__()
        self.daemon = True

        self.context = context
        self.session = session
        # Assigned through the ``address`` property so that ``None`` is
        # replaced by the default address.
        self.address = address
        self.socket = None

    def stop(self):
        """Stop the thread's activity. Returns when the thread terminates.

        The thread will raise :class:`RuntimeError` if :meth:`start` is
        called again.
        """
        self.join()

    def get_address(self):
        """Get the channel's address.

        By default, a channel is on localhost with no port specified
        (port number 0).
        """
        return self._address

    def set_address(self, address):
        """Set the channel's address.

        Parameters
        ----------
        address : tuple or None
            Should be a tuple of form (ip address [str], port [int]), or
            None, in which case the address is reset to its default value.

        Raises
        ------
        RuntimeError
            If the channel's thread is currently running.
        """
        # FIXME: Validate address.
        if self.is_alive():  # This is Thread.is_alive
            raise RuntimeError("Cannot set address on a running channel!")
        if address is None:
            address = (LOCALHOST, 0)
        self._address = address

    # The setter was originally published under this misspelled name; keep
    # it so that existing callers continue to work.
    set_adresss = set_address

    address = property(get_address, set_address)
Brian Granger
|
r2606 | |||
class SubSocketChannel(ZmqSocketChannel):
    """The channel that receives messages on a ZMQ SUB socket.

    The socket and its IOLoop are created in :meth:`run`, i.e. in the
    channel's own thread, so ``self.ioloop`` does not exist until the
    thread has been started.
    """

    def __init__(self, context, session, address=None):
        super(SubSocketChannel, self).__init__(context, session, address)

    def run(self):
        """Connect the SUB socket and run the IOLoop until it is stopped."""
        self.socket = self.context.socket(zmq.SUB)
        # An empty prefix subscribes to every message.
        self.socket.setsockopt(zmq.SUBSCRIBE, '')
        self.socket.setsockopt(zmq.IDENTITY, self.session.session)
        self.socket.connect('tcp://%s:%i' % self.address)
        self.ioloop = ioloop.IOLoop()
        self.ioloop.add_handler(self.socket, self._handle_events,
                                POLLIN | POLLERR)
        self.ioloop.start()

    def stop(self):
        """Stop the IOLoop, then wait for the thread to terminate."""
        self.ioloop.stop()
        super(SubSocketChannel, self).stop()

    def _handle_events(self, socket, events):
        """Dispatch the poll events reported by the IOLoop for the socket."""
        if events & POLLERR:
            self._handle_err()
        if events & POLLIN:
            self._handle_recv()

    def _handle_err(self):
        # We don't want to let this go silently, so eventually we should log.
        raise zmq.ZMQError()

    def _handle_recv(self):
        """Receive one JSON message and pass it to :meth:`call_handlers`."""
        msg = self.socket.recv_json()
        self.call_handlers(msg)

    def call_handlers(self, msg):
        """This method is called in the ioloop thread when a message arrives.

        Subclasses should override this method to handle incoming messages.
        It is important to remember that this method is called in the thread
        so that some logic must be done to ensure that the application level
        handlers are called in the application thread.
        """
        raise NotImplementedError(
            'call_handlers must be defined in a subclass.')

    def flush(self, timeout=1.0):
        """Immediately processes all pending messages on the SUB channel.

        This method is thread safe.

        Parameters
        ----------
        timeout : float, optional
            The maximum amount of time to spend flushing, in seconds. The
            default is one second.
        """
        # We do the IOLoop callback process twice to ensure that the IOLoop
        # gets to perform at least one full poll.
        stop_time = time.time() + timeout
        for _ in range(2):
            self._flushed = False
            self.ioloop.add_callback(self._flush)
            while not self._flushed and time.time() < stop_time:
                time.sleep(0.01)

    def _flush(self):
        """Callback run by the IOLoop to indicate that all events have been
        processed.
        """
        self._flushed = True
Brian Granger
|
r2606 | |||
class XReqSocketChannel(ZmqSocketChannel):
    """The channel that sends requests over a ZMQ XREQ socket.

    Outgoing messages wait in ``command_queue`` until the socket is
    writable. For every queued request a handler is placed on
    ``handler_queue``; each received message is paired with the next handler
    taken from that queue.
    """

    handler_queue = None
    command_queue = None
    handlers = None
    _overriden_call_handler = None

    def __init__(self, context, session, address=None):
        self.handlers = {}
        self.handler_queue = Queue()
        self.command_queue = Queue()
        super(XReqSocketChannel, self).__init__(context, session, address)

    def run(self):
        """Connect the XREQ socket and run the IOLoop until it is stopped."""
        self.socket = self.context.socket(zmq.XREQ)
        self.socket.setsockopt(zmq.IDENTITY, self.session.session)
        self.socket.connect('tcp://%s:%i' % self.address)
        self.ioloop = ioloop.IOLoop()
        self.ioloop.add_handler(self.socket, self._handle_events,
                                POLLIN | POLLOUT | POLLERR)
        self.ioloop.start()

    def stop(self):
        """Stop the IOLoop, then wait for the thread to terminate."""
        self.ioloop.stop()
        super(XReqSocketChannel, self).stop()

    def _handle_events(self, socket, events):
        """Dispatch the poll events reported by the IOLoop for the socket."""
        # NOTE(review): the original comment here spoke of turning POLLOUT on
        # and off depending on pending requests, but POLLOUT is registered
        # once in run() and never changed.
        if events & POLLERR:
            self._handle_err()
        if events & POLLOUT:
            self._handle_send()
        if events & POLLIN:
            self._handle_recv()

    def _handle_recv(self):
        """Receive one JSON message and pass it to :meth:`call_handlers`."""
        msg = self.socket.recv_json()
        self.call_handlers(msg)

    def _handle_send(self):
        """Send the next queued message, if there is one."""
        try:
            msg = self.command_queue.get(False)
        except Empty:
            pass
        else:
            self.socket.send_json(msg)

    def _handle_err(self):
        # We don't want to let this go silently, so eventually we should log.
        raise zmq.ZMQError()

    def _queue_request(self, msg, callback):
        """Queue ``msg`` for sending together with the handler for its reply.

        Raises :class:`MissingHandlerError` (from :meth:`_find_handler`)
        before anything is queued if no handler is available.
        """
        handler = self._find_handler(msg['msg_type'], callback)
        self.handler_queue.put(handler)
        self.command_queue.put(msg)

    def execute(self, code, callback=None):
        """Queue an 'execute_request' for ``code``; return the message id."""
        # Create class for content/msg creation. Related to, but possibly
        # not in Session.
        content = dict(code=code)
        msg = self.session.msg('execute_request', content)
        self._queue_request(msg, callback)
        return msg['header']['msg_id']

    def complete(self, text, line, block=None, callback=None):
        """Queue a 'complete_request'; return the message id.

        Only ``text`` and ``line`` are placed in the message content; the
        ``block`` argument is accepted but not used.
        """
        content = dict(text=text, line=line)
        msg = self.session.msg('complete_request', content)
        self._queue_request(msg, callback)
        return msg['header']['msg_id']

    def object_info(self, oname, callback=None):
        """Queue an 'object_info_request' for ``oname``; return the message
        id.
        """
        content = dict(oname=oname)
        msg = self.session.msg('object_info_request', content)
        self._queue_request(msg, callback)
        return msg['header']['msg_id']

    def _find_handler(self, name, callback):
        """Return ``callback`` if given, else the handler registered under
        ``name`` in ``self.handlers``.

        Raises
        ------
        MissingHandlerError
            If ``callback`` is None and no handler is registered for ``name``.
        """
        if callback is not None:
            return callback
        handler = self.handlers.get(name)
        if handler is None:
            raise MissingHandlerError(
                'No handler defined for method: %s' % name)
        return handler

    def override_call_handler(self, func):
        """Permanently override the call_handler.

        The function func will be called as::

            func(handler, msg)

        And must call::

            handler(msg)

        in the main thread.
        """
        assert callable(func), "not a callable: %r" % func
        self._overriden_call_handler = func

    def call_handlers(self, msg):
        """Pair a received message with the next queued handler and call it.

        If no handler is queued, the message is printed instead.
        """
        try:
            handler = self.handler_queue.get(False)
        except Empty:
            # Parenthesised single-argument form prints the same text under
            # both the print statement and the print function.
            print("Message received with no handler!!!")
            print(msg)
        else:
            self.call_handler(handler, msg)

    def call_handler(self, handler, msg):
        """Invoke ``handler`` for ``msg`` through the configured dispatcher.

        The function installed with :meth:`override_call_handler` takes
        precedence; otherwise a ``_call_handler`` attribute is used if one
        exists.

        Raises
        ------
        RuntimeError
            If neither dispatcher is available.
        """
        if self._overriden_call_handler is not None:
            self._overriden_call_handler(handler, msg)
        elif hasattr(self, '_call_handler'):
            call_handler = getattr(self, '_call_handler')
            call_handler(handler, msg)
        else:
            raise RuntimeError('no handler!')
class RepSocketChannel(ZmqSocketChannel):
    """The channel for the REP socket. Currently only a placeholder."""

    def on_raw_input(self):
        """Placeholder hook; does nothing at present."""
        pass
epatters
|
r2611 | |||
class KernelManager(HasTraits):
    """ Manages a kernel for a frontend.

    The SUB channel is for the frontend to receive messages published by the
    kernel.

    The REQ channel is for the frontend to make requests of the kernel.

    The REP channel is for the kernel to request stdin (raw_input) from the
    frontend.
    """

    # Whether the kernel manager is currently listening on its channels.
    is_listening = Bool(False)

    # The PyZMQ Context to use for communication with the kernel.
    context = Instance(zmq.Context, ())

    # The Session to use for communication with the kernel.
    session = Instance(Session, ())

    # The classes to use for the various channels.
    sub_channel_class = Type(SubSocketChannel)
    xreq_channel_class = Type(XReqSocketChannel)
    rep_channel_class = Type(RepSocketChannel)

    # Protected traits.
    _kernel = Instance(Popen)
    _sub_channel = Any
    _xreq_channel = Any
    _rep_channel = Any

    #--------------------------------------------------------------------------
    # Channel management methods:
    #--------------------------------------------------------------------------

    def start_listening(self):
        """Starts listening on the specified ports.

        Raises
        ------
        RuntimeError
            If the manager is already listening.
        """
        if self.is_listening:
            raise RuntimeError("Cannot start listening. Already listening!")
        self.is_listening = True
        self.sub_channel.start()
        self.xreq_channel.start()
        self.rep_channel.start()

    @property
    def is_alive(self):
        """ Returns whether the kernel is alive.

        At present this only reports whether the manager is listening.
        """
        # TODO: check if alive.
        return True if self.is_listening else False

    def stop_listening(self):
        """Stops listening. If not listening, does nothing. """
        if not self.is_listening:
            return
        self.is_listening = False
        self.sub_channel.stop()
        self.xreq_channel.stop()
        self.rep_channel.stop()

    #--------------------------------------------------------------------------
    # Kernel process management methods:
    #--------------------------------------------------------------------------

    def start_kernel(self):
        """Starts a kernel process and configures the manager to use it.

        If ports have been specified via the address attributes, they are
        used. Otherwise, open ports are chosen by the OS and the channel port
        attributes are configured as appropriate.

        Raises
        ------
        RuntimeError
            If the XREQ or SUB address is not on localhost.
        """
        xreq, sub = self.xreq_address, self.sub_address
        if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
            raise RuntimeError("Can only launch a kernel on localhost. "
                               "Make sure that the '*_address' attributes are "
                               "configured properly.")

        kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
        self.set_kernel(kernel)
        # Record the ports reported back by launch_kernel.
        self.xreq_address = (LOCALHOST, xrep)
        self.sub_address = (LOCALHOST, pub)

    def set_kernel(self, kernel):
        """Sets the kernel manager's kernel to an existing kernel process.

        It is *not* necessary to set a kernel to communicate with it via the
        channels, and those objects must be configured separately. It
        *is* necessary to set a kernel if you want to use the manager (or
        frontends that use the manager) to signal and/or kill the kernel.

        Parameters
        ----------
        kernel : Popen
            An existing kernel process.
        """
        self._kernel = kernel

    @property
    def has_kernel(self):
        """Returns whether a kernel process has been specified for the kernel
        manager.

        A kernel process can be set via 'start_kernel' or 'set_kernel'.
        """
        return self._kernel is not None

    def kill_kernel(self):
        """ Kill the running kernel.

        Raises
        ------
        RuntimeError
            If no kernel process has been set.
        """
        if self._kernel is not None:
            self._kernel.kill()
            self._kernel = None
        else:
            raise RuntimeError("Cannot kill kernel. No kernel is running!")

    def signal_kernel(self, signum):
        """ Sends a signal to the kernel.

        Raises
        ------
        RuntimeError
            If no kernel process has been set.
        """
        if self._kernel is not None:
            self._kernel.send_signal(signum)
        else:
            raise RuntimeError("Cannot signal kernel. No kernel is running!")

    #--------------------------------------------------------------------------
    # Channels used for communication with the kernel:
    #--------------------------------------------------------------------------

    @property
    def sub_channel(self):
        """Get the SUB socket channel object."""
        # Created on first access from the configured channel class.
        if self._sub_channel is None:
            self._sub_channel = self.sub_channel_class(self.context,
                                                       self.session)
        return self._sub_channel

    @property
    def xreq_channel(self):
        """Get the REQ socket channel object to make requests of the kernel."""
        if self._xreq_channel is None:
            self._xreq_channel = self.xreq_channel_class(self.context,
                                                         self.session)
        return self._xreq_channel

    @property
    def rep_channel(self):
        """Get the REP socket channel object to handle stdin (raw_input)."""
        if self._rep_channel is None:
            self._rep_channel = self.rep_channel_class(self.context,
                                                       self.session)
        return self._rep_channel

    #--------------------------------------------------------------------------
    # Delegates for the Channel address attributes:
    #--------------------------------------------------------------------------

    def get_sub_address(self):
        return self.sub_channel.address

    def set_sub_address(self, address):
        self.sub_channel.address = address

    sub_address = property(get_sub_address, set_sub_address,
                           doc="The address used by SUB socket channel.")

    def get_xreq_address(self):
        return self.xreq_channel.address

    def set_xreq_address(self, address):
        self.xreq_channel.address = address

    xreq_address = property(get_xreq_address, set_xreq_address,
                            doc="The address used by XREQ socket channel.")

    def get_rep_address(self):
        return self.rep_channel.address

    def set_rep_address(self, address):
        self.rep_channel.address = address

    rep_address = property(get_rep_address, set_rep_address,
                           doc="The address used by REP socket channel.")
epatters
|
r2632 | |||