kernelmanager.py
451 lines
| 14.1 KiB
| text/x-python
|
PythonLexer
Brian Granger
|
r2606 | """Kernel frontend classes. | ||
epatters
|
r2609 | TODO: Create logger to handle debugging and console messages. | ||
Brian Granger
|
r2606 | |||
""" | ||||
epatters
|
r2611 | # Standard library imports. | ||
Brian Granger
|
r2606 | from Queue import Queue, Empty | ||
epatters
|
r2667 | from subprocess import Popen | ||
Brian Granger
|
r2606 | from threading import Thread | ||
epatters
|
r2614 | import time | ||
Brian Granger
|
r2606 | import traceback | ||
epatters
|
r2611 | # System library imports. | ||
Brian Granger
|
r2606 | import zmq | ||
from zmq import POLLIN, POLLOUT, POLLERR | ||||
from zmq.eventloop import ioloop | ||||
epatters
|
r2611 | |||
# Local imports. | ||||
epatters
|
r2639 | from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \ | ||
Type | ||||
epatters
|
r2667 | from kernel import launch_kernel | ||
Brian Granger
|
r2606 | from session import Session | ||
epatters
|
r2667 | # Constants. | ||
LOCALHOST = '127.0.0.1' | ||||
Brian Granger
|
r2606 | |||
class MissingHandlerError(Exception):
    """ Raised when no handler can be found for a request's message type.
    """
class ZmqSocketChannel(Thread):
    """ The base class for the channels that use ZMQ sockets.

    A channel is a daemon thread that stores the ZMQ context, the session
    and the address that subclasses use in their ``run`` method.
    """

    def __init__(self, context, session, address=None):
        """ Create the channel.

        context : the ZMQ context subclasses create their socket from.
        session : the session object used by subclasses.
        address : (ip address [str], port [int]) tuple, or None for the
            default address.
        """
        super(ZmqSocketChannel, self).__init__()
        self.daemon = True

        self.context = context
        self.session = session
        # Goes through the 'address' property, so None becomes the default.
        self.address = address
        self.socket = None

    def stop(self):
        """ Stop the thread's activity. Returns when the thread terminates.

        Calling this on a channel that is not running is a no-op apart from
        re-arming the thread so that it can be started (again).
        """
        # join() raises RuntimeError on a thread that was never started, so
        # only wait when there is actually something to wait for.
        if self.is_alive():
            self.join()

        # Allow the thread to be started again.
        # FIXME: Although this works (and there's no reason why it shouldn't),
        # it feels wrong. Is there a cleaner way to achieve this?
        was_daemon = self.daemon
        Thread.__init__(self)
        # Thread.__init__ resets the daemon flag to that of the calling
        # thread; restore it so a restarted channel is still a daemon.
        self.daemon = was_daemon

    def get_address(self):
        """ Get the channel's address. By default, a channel is on
            localhost with no port specified (a negative port number).
        """
        return self._address

    def set_address(self, address):
        """ Set the channel's address. Should be a tuple of form:
                (ip address [str], port [int]).
            or None, in which case the address is reset to its default value.

            Raises RuntimeError if the channel thread is running.
        """
        # FIXME: Validate address.
        if self.is_alive():
            raise RuntimeError("Cannot set address on a running channel!")
        if address is None:
            address = (LOCALHOST, -1)
        self._address = address

    # Backward-compatible alias for the original, misspelled setter name.
    set_adresss = set_address

    address = property(get_address, set_address)
Brian Granger
|
r2606 | |||
class SubSocketChannel(ZmqSocketChannel):
    """ Channel that receives messages on a zmq.SUB socket and dispatches
    them to callbacks registered per message type.
    """

    # Maps msg_type -> callback; replaced by a per-instance dict in __init__.
    handlers = None
    # Optional callable installed by override_call_handler().
    _overriden_call_handler = None

    def __init__(self, context, session, address=None):
        self.handlers = {}
        super(SubSocketChannel, self).__init__(context, session, address)

    def run(self):
        """ Thread body: connect the SUB socket and run an IOLoop on it.
        """
        self.socket = self.context.socket(zmq.SUB)
        # Subscribe with an empty prefix.
        self.socket.setsockopt(zmq.SUBSCRIBE,'')
        self.socket.setsockopt(zmq.IDENTITY, self.session.session)
        self.socket.connect('tcp://%s:%i' % self.address)
        self.ioloop = ioloop.IOLoop()
        self.ioloop.add_handler(self.socket, self._handle_events,
                                POLLIN|POLLERR)
        self.ioloop.start()

    def stop(self):
        """ Stop the IOLoop, then stop the thread via the base class.
        """
        self.ioloop.stop()
        super(SubSocketChannel, self).stop()

    def _handle_events(self, socket, events):
        """ IOLoop callback: dispatch on the poll flags set in 'events'.
        """
        if events & POLLERR:
            self._handle_err()
        if events & POLLIN:
            self._handle_recv()

    def _handle_err(self):
        # NOTE(review): pyzmq spells its exception class 'ZMQError'; the
        # previous 'zmq.ZmqError' looks like a typo -- confirm against the
        # installed pyzmq.
        raise zmq.ZMQError()

    def _handle_recv(self):
        """ Receive one JSON message and hand it to the handlers.
        """
        msg = self.socket.recv_json()
        self.call_handlers(msg)

    def override_call_handler(self, func):
        """Permanently override the call_handler.

        The function func will be called as::

            func(handler, msg)

        And must call::

            handler(msg)

        in the main thread.
        """
        assert callable(func), "not a callable: %r" % func
        self._overriden_call_handler = func

    def call_handlers(self, msg):
        """ Invoke the callback registered for msg['msg_type'], if any.

        Exceptions raised while calling the handler are printed and not
        propagated.
        """
        handler = self.handlers.get(msg['msg_type'], None)
        if handler is None:
            return
        try:
            self.call_handler(handler, msg)
        except Exception:
            # XXX: This should be logged at least
            # print_exc() reports the exception being handled here;
            # print_last() reads sys.last_*, which is not set by this path.
            traceback.print_exc()

    def call_handler(self, handler, msg):
        """ Call 'handler' with 'msg' through the override installed with
        override_call_handler(), or else through a '_call_handler'
        attribute if one exists. Raises RuntimeError if neither is present.
        """
        if self._overriden_call_handler is not None:
            self._overriden_call_handler(handler, msg)
        elif hasattr(self, '_call_handler'):
            call_handler = getattr(self, '_call_handler')
            call_handler(handler, msg)
        else:
            raise RuntimeError('no handler!')

    def add_handler(self, callback, msg_type):
        """Register a callback for msg type."""
        self.handlers[msg_type] = callback

    def remove_handler(self, msg_type):
        """Remove the callback for msg type."""
        self.handlers.pop(msg_type, None)

    def flush(self):
        """Immediately processes all pending messages on the SUB channel. This
        method is thread safe.

        Blocks, polling every 10 ms, until the IOLoop has run the callback
        queued here.
        """
        self._flushed = False
        self.ioloop.add_callback(self._flush)
        while not self._flushed:
            time.sleep(0.01)

    def _flush(self):
        """Called in this thread by the IOLoop to indicate that all events have
        been processed.
        """
        self._flushed = True
Brian Granger
|
r2606 | |||
class XReqSocketChannel(ZmqSocketChannel):
    """ Channel that sends requests on a zmq.XREQ socket and passes each
    received message to the next handler queued by a request.
    """

    # Handlers waiting for a reply, in the order requests were queued.
    handler_queue = None
    # Outgoing request messages waiting to be sent.
    command_queue = None
    # Maps msg_type -> default handler, used when no callback is given.
    handlers = None
    # Optional callable installed by override_call_handler().
    _overriden_call_handler = None

    def __init__(self, context, session, address=None):
        self.handlers = {}
        self.handler_queue = Queue()
        self.command_queue = Queue()
        super(XReqSocketChannel, self).__init__(context, session, address)

    def run(self):
        """ Thread body: connect the XREQ socket and run an IOLoop on it.
        """
        self.socket = self.context.socket(zmq.XREQ)
        self.socket.setsockopt(zmq.IDENTITY, self.session.session)
        self.socket.connect('tcp://%s:%i' % self.address)
        self.ioloop = ioloop.IOLoop()
        self.ioloop.add_handler(self.socket, self._handle_events,
                                POLLIN|POLLOUT|POLLERR)
        self.ioloop.start()

    def stop(self):
        """ Stop the IOLoop, then stop the thread via the base class.
        """
        self.ioloop.stop()
        super(XReqSocketChannel, self).stop()

    def _handle_events(self, socket, events):
        """ IOLoop callback: dispatch on the poll flags set in 'events'.
        """
        # Turn on and off POLLOUT depending on if we have made a request
        if events & POLLERR:
            self._handle_err()
        if events & POLLOUT:
            self._handle_send()
        if events & POLLIN:
            self._handle_recv()

    def _handle_recv(self):
        """ Receive one JSON message and hand it to the handlers.
        """
        msg = self.socket.recv_json()
        self.call_handlers(msg)

    def _handle_send(self):
        """ Send the next queued request, if there is one.
        """
        try:
            msg = self.command_queue.get(False)
        except Empty:
            pass
        else:
            self.socket.send_json(msg)

    def _handle_err(self):
        # NOTE(review): pyzmq spells its exception class 'ZMQError'; the
        # previous 'zmq.ZmqError' looks like a typo -- confirm against the
        # installed pyzmq.
        raise zmq.ZMQError()

    def _queue_request(self, msg, callback):
        """ Queue 'msg' for sending together with the handler for its reply.

        The handler is resolved first, so a MissingHandlerError leaves both
        queues untouched.
        """
        handler = self._find_handler(msg['msg_type'], callback)
        self.handler_queue.put(handler)
        self.command_queue.put(msg)

    def execute(self, code, callback=None):
        """ Queue an 'execute_request' for 'code'; returns the msg_id.
        """
        # Create class for content/msg creation. Related to, but possibly
        # not in Session.
        content = dict(code=code)
        msg = self.session.msg('execute_request', content)
        self._queue_request(msg, callback)
        return msg['header']['msg_id']

    def complete(self, text, line, block=None, callback=None):
        """ Queue a 'complete_request' for 'text' and 'line'; returns the
        msg_id. The 'block' argument is accepted but not used here.
        """
        content = dict(text=text, line=line)
        msg = self.session.msg('complete_request', content)
        self._queue_request(msg, callback)
        return msg['header']['msg_id']

    def object_info(self, oname, callback=None):
        """ Queue an 'object_info_request' for 'oname'; returns the msg_id.
        """
        content = dict(oname=oname)
        msg = self.session.msg('object_info_request', content)
        self._queue_request(msg, callback)
        return msg['header']['msg_id']

    def _find_handler(self, name, callback):
        """ Return 'callback' if given, else the handler registered under
        'name'. Raises MissingHandlerError when neither exists.
        """
        if callback is not None:
            return callback
        handler = self.handlers.get(name)
        if handler is None:
            raise MissingHandlerError(
                'No handler defined for method: %s' % name)
        return handler

    def override_call_handler(self, func):
        """Permanently override the call_handler.

        The function func will be called as::

            func(handler, msg)

        And must call::

            handler(msg)

        in the main thread.
        """
        assert callable(func), "not a callable: %r" % func
        self._overriden_call_handler = func

    def call_handlers(self, msg):
        """ Pass 'msg' to the next queued handler, or print it if no
        handler is waiting.
        """
        try:
            handler = self.handler_queue.get(False)
        except Empty:
            # Single-argument call form prints the same text on Python 2.
            print("Message received with no handler!!!")
            print(msg)
        else:
            self.call_handler(handler, msg)

    def call_handler(self, handler, msg):
        """ Call 'handler' with 'msg' through the override installed with
        override_call_handler(), or else through a '_call_handler'
        attribute if one exists. Raises RuntimeError if neither is present.
        """
        if self._overriden_call_handler is not None:
            self._overriden_call_handler(handler, msg)
        elif hasattr(self, '_call_handler'):
            call_handler = getattr(self, '_call_handler')
            call_handler(handler, msg)
        else:
            raise RuntimeError('no handler!')
class RepSocketChannel(ZmqSocketChannel):
    """ Channel stub with a raw_input hook that is not implemented yet.
    """

    def on_raw_input(self):
        """ Hook for raw_input requests; currently does nothing.
        """
        return None
epatters
|
r2611 | |||
class KernelManager(HasTraits):
    """ Manages a kernel for a frontend.

    The SUB channel is for the frontend to receive messages published by the
    kernel.

    The REQ channel is for the frontend to make requests of the kernel.

    The REP channel is for the kernel to request stdin (raw_input) from the
    frontend.
    """

    # Whether the kernel manager is currently listening on its channels.
    is_listening = Bool(False)

    # The PyZMQ Context to use for communication with the kernel.
    context = Instance(zmq.Context, ())

    # The Session to use for communication with the kernel.
    session = Instance(Session, ())

    # The classes to use for the various channels.
    sub_channel_class = Type(SubSocketChannel)
    xreq_channel_class = Type(XReqSocketChannel)
    rep_channel_class = Type(RepSocketChannel)

    # Protected traits.
    _kernel = Instance(Popen)
    _sub_channel = Any
    _xreq_channel = Any
    _rep_channel = Any

    def __init__(self, **traits):
        """ Create the manager and assign each keyword argument as an
        attribute of the same name.
        """
        super(KernelManager, self).__init__()

        # FIXME: This should be the business of HasTraits. The convention is:
        #    HasTraits.__init__(self, **traits_to_be_initialized.)
        for name, value in traits.items():
            setattr(self, name, value)

    def start_listening(self):
        """Start listening on the specified ports. If already listening, raises
        a RuntimeError.
        """
        if self.is_listening:
            raise RuntimeError("Cannot start listening. Already listening!")
        self.is_listening = True
        self.sub_channel.start()
        self.xreq_channel.start()
        self.rep_channel.start()

    def stop_listening(self):
        """Stop listening. If not listening, does nothing. """
        if not self.is_listening:
            return
        self.is_listening = False
        self.sub_channel.stop()
        self.xreq_channel.stop()
        self.rep_channel.stop()

    def start_kernel(self):
        """Start a localhost kernel. If ports have been specified via the
        address attributes, use them. Otherwise, choose open ports at random.

        Raises RuntimeError if the XREQ or SUB address is not on localhost.
        """
        xreq, sub = self.xreq_address, self.sub_address
        if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
            # The first literal ends with a space so the two sentences do
            # not run together in the final message.
            raise RuntimeError("Can only launch a kernel on localhost. "
                               "Make sure that the '*_address' attributes are "
                               "configured properly.")

        self._kernel, xrep, pub = launch_kernel(xrep_port=xreq[1],
                                                pub_port=sub[1])
        # Record the ports returned by launch_kernel.
        self.xreq_address = (LOCALHOST, xrep)
        self.sub_address = (LOCALHOST, pub)

    def kill_kernel(self):
        """Kill the running kernel, if there is one.
        """
        if self._kernel is not None:
            self._kernel.kill()
            self._kernel = None

    @property
    def is_alive(self):
        """ Returns whether the kernel is alive. """
        # TODO: check if alive. For now this only reflects is_listening.
        return bool(self.is_listening)

    def signal_kernel(self, signum):
        """Send signum to the kernel."""
        # TODO: signal the kernel.

    #--------------------------------------------------------------------------
    # Channels used for communication with the kernel:
    #--------------------------------------------------------------------------

    @property
    def sub_channel(self):
        """Get the SUB socket channel object."""
        # Created lazily on first access.
        if self._sub_channel is None:
            self._sub_channel = self.sub_channel_class(self.context,
                                                       self.session)
        return self._sub_channel

    @property
    def xreq_channel(self):
        """Get the REQ socket channel object to make requests of the kernel."""
        # Created lazily on first access.
        if self._xreq_channel is None:
            self._xreq_channel = self.xreq_channel_class(self.context,
                                                         self.session)
        return self._xreq_channel

    @property
    def rep_channel(self):
        """Get the REP socket channel object to handle stdin (raw_input)."""
        # Created lazily on first access.
        if self._rep_channel is None:
            self._rep_channel = self.rep_channel_class(self.context,
                                                       self.session)
        return self._rep_channel

    #--------------------------------------------------------------------------
    # Channel address attributes:
    #--------------------------------------------------------------------------

    def get_sub_address(self):
        return self.sub_channel.address

    def set_sub_address(self, address):
        self.sub_channel.address = address

    sub_address = property(get_sub_address, set_sub_address,
                           doc="The address used by SUB socket channel.")

    def get_xreq_address(self):
        return self.xreq_channel.address

    def set_xreq_address(self, address):
        self.xreq_channel.address = address

    xreq_address = property(get_xreq_address, set_xreq_address,
                            doc="The address used by XREQ socket channel.")

    def get_rep_address(self):
        return self.rep_channel.address

    def set_rep_address(self, address):
        self.rep_channel.address = address

    rep_address = property(get_rep_address, set_rep_address,
                           doc="The address used by REP socket channel.")
epatters
|
r2632 | |||