From 9014de0415120d8b0bf5cb612fbece145a2d2e6b 2013-01-16 21:58:28 From: Brian E. Granger Date: 2013-01-16 21:58:28 Subject: [PATCH] Adding tested ipc support to MultiKernelManager. --- diff --git a/IPython/frontend/html/notebook/kernelmanager.py b/IPython/frontend/html/notebook/kernelmanager.py index c456659..4376e4d 100644 --- a/IPython/frontend/html/notebook/kernelmanager.py +++ b/IPython/frontend/html/notebook/kernelmanager.py @@ -85,7 +85,9 @@ class MultiKernelManager(LoggingConfigurable): kernel_id = kwargs.pop('kernel_id', unicode(uuid.uuid4())) if kernel_id in self: raise DuplicateKernelError('Kernel already exists: %s' % kernel_id) - # use base KernelManager for each Kernel + # kernel_manager_factory is the constructor for the KernelManager + # subclass we are using. It can be configured as any Configurable, + # including things like its transport and ip. km = self.kernel_manager_factory(connection_file=os.path.join( self.connection_dir, "kernel-%s.json" % kernel_id), config=self.config, @@ -175,7 +177,7 @@ class MultiKernelManager(LoggingConfigurable): else: raise KeyError("Kernel with id not found: %s" % kernel_id) - def get_connection_data(self, kernel_id): + def get_connection_info(self, kernel_id): """Return a dictionary of connection data for a kernel. Parameters @@ -192,18 +194,28 @@ class MultiKernelManager(LoggingConfigurable): shell_port, hb_port). """ km = self.get_kernel(kernel_id) - return dict(ip=km.ip, + return dict(transport=km.transport, + ip=km.ip, shell_port=km.shell_port, iopub_port=km.iopub_port, stdin_port=km.stdin_port, hb_port=km.hb_port, ) - def create_connected_stream(self, ip, port, socket_type): + def _make_url(self, transport, ip, port): + """Make a ZeroMQ URL for a given transport, ip and port.""" + if transport == 'tcp': + return "tcp://%s:%i" % (ip, port) + else: + return "%s://%s-%s" % (transport, ip, port) + + def _create_connected_stream(self, kernel_id, socket_type): + """Create a connected ZMQStream for a kernel.""" + cinfo = self.get_connection_info(kernel_id) + url = self._make_url(cinfo['transport'], cinfo['ip'], cinfo['port']) sock = self.context.socket(socket_type) - addr = "tcp://%s:%i" % (ip, port) - self.log.info("Connecting to: %s" % addr) - sock.connect(addr) + self.log.info("Connecting to: %s" % url) + sock.connect(url) return ZMQStream(sock) def create_iopub_stream(self, kernel_id): @@ -218,10 +230,7 @@ class MultiKernelManager(LoggingConfigurable): ======= stream : ZMQStream """ - kdata = self.get_connection_data(kernel_id) - iopub_stream = self.create_connected_stream( - kdata['ip'], kdata['iopub_port'], zmq.SUB - ) + iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB) iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'') return iopub_stream @@ -237,10 +246,7 @@ class MultiKernelManager(LoggingConfigurable): ======= stream : ZMQStream """ - kdata = self.get_connection_data(kernel_id) - shell_stream = self.create_connected_stream( - kdata['ip'], kdata['shell_port'], zmq.DEALER - ) + shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER) return shell_stream def create_hb_stream(self, kernel_id): @@ -255,10 +261,7 @@ class MultiKernelManager(LoggingConfigurable): ======= stream : ZMQStream """ - kdata = self.get_connection_data(kernel_id) - hb_stream = self.create_connected_stream( - kdata['ip'], kdata['hb_port'], zmq.REQ - ) + hb_stream = self._create_connected_stream(kernel_id, zmq.REQ) return hb_stream diff --git a/IPython/frontend/html/notebook/tests/test_kernelsession.py b/IPython/frontend/html/notebook/tests/test_kernelsession.py index 50d2c70..3f216bb 100644 --- a/IPython/frontend/html/notebook/tests/test_kernelsession.py +++ b/IPython/frontend/html/notebook/tests/test_kernelsession.py @@ -2,12 +2,23 @@ from unittest import TestCase +from IPython.config.loader import Config from IPython.frontend.html.notebook.kernelmanager import MultiKernelManager +from IPython.zmq.kernelmanager import KernelManager class TestKernelManager(TestCase): - def test_km_lifecycle(self): - km = MultiKernelManager() + def _get_tcp_km(self): + return MultiKernelManager() + + def _get_ipc_km(self): + c = Config() + c.KernelManager.transport = 'ipc' + c.KernelManager.ip = 'test' + km = MultiKernelManager(config=c) + return km + + def _run_lifecycle(self, km): kid = km.start_kernel() self.assertTrue(kid in km) self.assertTrue(kid in km.list_kernel_ids()) @@ -15,17 +26,42 @@ class TestKernelManager(TestCase): km.restart_kernel(kid) self.assertTrue(kid in km.list_kernel_ids()) km.interrupt_kernel(kid) - km.kill_kernel(kid) + k = km.get_kernel(kid) + self.assertTrue(isinstance(k, KernelManager)) + km.shutdown_kernel(kid) self.assertTrue(not kid in km) + def test_km_tcp(self): + km = self._get_tcp_km() + self._run_lifecycle(km) + + def test_km_ipc(self): + km = self._get_ipc_km() + self._run_lifecycle(km) + + def test_tcp_cinfo(self): + km = self._get_tcp_km() kid = km.start_kernel() - cdata = km.get_connection_data(kid) - self.assertEqual('127.0.0.1', cdata['ip']) - self.assertTrue('stdin_port' in cdata) - self.assertTrue('iopub_port' in cdata) - self.assertTrue('shell_port' in cdata) - self.assertTrue('hb_port' in cdata) - km.get_kernel(kid) - km.kill_kernel(kid) + k = km.get_kernel(kid) + cinfo = km.get_connection_info(kid) + self.assertEqual('tcp', cinfo['transport']) + self.assertEqual('127.0.0.1', cinfo['ip']) + self.assertTrue('stdin_port' in cinfo) + self.assertTrue('iopub_port' in cinfo) + self.assertTrue('shell_port' in cinfo) + self.assertTrue('hb_port' in cinfo) + km.shutdown_kernel(kid) + def test_ipc_cinfo(self): + km = self._get_ipc_km() + kid = km.start_kernel() + k = km.get_kernel(kid) + cinfo = km.get_connection_info(kid) + self.assertEqual('ipc', cinfo['transport']) + self.assertEqual('test', cinfo['ip']) + self.assertTrue('stdin_port' in cinfo) + self.assertTrue('iopub_port' in cinfo) + self.assertTrue('shell_port' in cinfo) + self.assertTrue('hb_port' in cinfo) + km.shutdown_kernel(kid) diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index e4ad9c4..a30c0fc 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -35,6 +35,7 @@ from zmq.eventloop import ioloop, zmqstream # Local imports. from IPython.config.loader import Config +from IPython.config.configurable import Configurable from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS from IPython.utils.traitlets import ( HasTraits, Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum @@ -638,7 +639,7 @@ class HBSocketChannel(ZMQSocketChannel): # Main kernel manager class #----------------------------------------------------------------------------- -class KernelManager(HasTraits): +class KernelManager(Configurable): """ Manages a kernel for a frontend. The SUB channel is for the frontend to receive messages published by the @@ -649,9 +650,6 @@ class KernelManager(HasTraits): The REP channel is for the kernel to request stdin (raw_input) from the frontend. """ - # config object for passing to child configurables - config = Instance(Config) - # The PyZMQ Context to use for communication with the kernel. context = Instance(zmq.Context) def _context_default(self): @@ -668,10 +666,9 @@ class KernelManager(HasTraits): # The addresses for the communication channels. connection_file = Unicode('') - transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp') - - - ip = Unicode(LOCALHOST) + transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True) + + ip = Unicode(LOCALHOST, config=True) def _ip_changed(self, name, old, new): if new == '*': self.ip = '0.0.0.0' @@ -768,12 +765,12 @@ class KernelManager(HasTraits): os.remove(ipcfile) except (IOError, OSError): pass - + def load_connection_file(self): """load connection info from JSON dict in self.connection_file""" with open(self.connection_file) as f: cfg = json.loads(f.read()) - + from pprint import pprint pprint(cfg) self.transport = cfg.get('transport', 'tcp')