From 868cb7a0c28029e92a2de519810bffec56232a3c 2014-12-05 01:48:48 From: Thomas Kluyver Date: 2014-12-05 01:48:48 Subject: [PATCH] Factor out code to set up ZMQ sockets --- diff --git a/IPython/kernel/blocking/channels.py b/IPython/kernel/blocking/channels.py index 8911a62..805f465 100644 --- a/IPython/kernel/blocking/channels.py +++ b/IPython/kernel/blocking/channels.py @@ -14,8 +14,9 @@ try: except ImportError: from Queue import Queue, Empty # Py 2 -from IPython.kernel.channels import IOPubChannel, HBChannel, \ - ShellChannel, StdInChannel, InvalidPortNumber, major_protocol_version +from IPython.kernel.channels import HBChannel,\ + make_iopub_socket, make_shell_socket, make_stdin_socket,\ + InvalidPortNumber, major_protocol_version from IPython.utils.py3compat import string_types, iteritems # some utilities to validate message structure, these might get moved elsewhere @@ -154,10 +155,7 @@ class BlockingShellChannel(ZMQSocketChannel): ] def start(self): - self.socket = self.context.socket(zmq.DEALER) - self.socket.linger = 1000 - self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) - self.socket.connect(self.address) + self.socket = make_shell_socket(self.context, self.session.bsession, self.address) def execute(self, code, silent=False, store_history=True, user_expressions=None, allow_stdin=None): @@ -354,12 +352,7 @@ class BlockingIOPubChannel(ZMQSocketChannel): This channel is where all output is published to frontends. 
""" def start(self): - self.socket = self.context.socket(zmq.SUB) - self.socket.linger = 1000 - self.socket.setsockopt(zmq.SUBSCRIBE,b'') - self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) - self.socket.connect(self.address) - + self.socket = make_iopub_socket(self.context, self.session.bsession, self.address) class BlockingStdInChannel(ZMQSocketChannel): """The stdin channel to handle raw_input requests that the kernel makes.""" @@ -367,10 +360,7 @@ class BlockingStdInChannel(ZMQSocketChannel): proxy_methods = ['input'] def start(self): - self.socket = self.context.socket(zmq.DEALER) - self.socket.linger = 1000 - self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) - self.socket.connect(self.address) + self.socket = make_stdin_socket(self.context, self.session.bsession, self.address) def input(self, string): """Send a string of raw input to the kernel.""" diff --git a/IPython/kernel/channels.py b/IPython/kernel/channels.py index 62ccaf5..e6df05d 100644 --- a/IPython/kernel/channels.py +++ b/IPython/kernel/channels.py @@ -500,6 +500,27 @@ class StdInChannel(ZMQSocketChannel): msg = self.session.msg('input_reply', content) self._queue_send(msg) +def make_shell_socket(context, identity, address): + socket = context.socket(zmq.DEALER) + socket.linger = 1000 + socket.setsockopt(zmq.IDENTITY, identity) + socket.connect(address) + return socket + +def make_iopub_socket(context, identity, address): + socket = context.socket(zmq.SUB) + socket.linger = 1000 + socket.setsockopt(zmq.SUBSCRIBE,b'') + socket.setsockopt(zmq.IDENTITY, identity) + socket.connect(address) + return socket + +def make_stdin_socket(context, identity, address): + socket = context.socket(zmq.DEALER) + socket.linger = 1000 + socket.setsockopt(zmq.IDENTITY, identity) + socket.connect(address) + return socket class HBChannel(ZMQSocketChannel): """The heartbeat channel which monitors the kernel heartbeat. 
diff --git a/IPython/qt/client.py b/IPython/qt/client.py index c858041..f3af1fc 100644 --- a/IPython/qt/client.py +++ b/IPython/qt/client.py @@ -15,7 +15,8 @@ from IPython.external.qt import QtCore # Local imports from IPython.utils.traitlets import Type -from IPython.kernel.channels import HBChannel +from IPython.kernel.channels import HBChannel,\ + make_shell_socket, make_iopub_socket, make_stdin_socket from IPython.kernel import KernelClient from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin) @@ -39,16 +40,6 @@ class InvalidPortNumber(Exception): # some utilities to validate message structure, these might get moved elsewhere # if they prove to have more generic utility -def validate_string_list(lst): - """Validate that the input is a list of strings. - - Raises ValueError if not.""" - if not isinstance(lst, list): - raise ValueError('input %r must be a list' % lst) - for x in lst: - if not isinstance(x, string_types): - raise ValueError('element %r in list must be a string' % x) - def validate_string_dict(dct): """Validate that the input is a dict with string keys and values. @@ -246,10 +237,7 @@ class QtShellChannel(QtZMQSocketChannel): def run(self): """The thread's main activity. Call start() instead.""" - self.socket = self.context.socket(zmq.DEALER) - self.socket.linger = 1000 - self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) - self.socket.connect(self.address) + self.socket = make_shell_socket(self.context, self.session.bsession, self.address) self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) self.stream.on_recv(self._handle_recv) self._run_loop() @@ -482,11 +470,7 @@ class QtIOPubChannel(QtZMQSocketChannel): def run(self): """The thread's main activity. 
Call start() instead.""" - self.socket = self.context.socket(zmq.SUB) - self.socket.linger = 1000 - self.socket.setsockopt(zmq.SUBSCRIBE,b'') - self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) - self.socket.connect(self.address) + self.socket = make_iopub_socket(self.context, self.session.bsession, self.address) self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) self.stream.on_recv(self._handle_recv) self._run_loop() @@ -545,10 +529,7 @@ class QtStdInChannel(QtZMQSocketChannel): def run(self): """The thread's main activity. Call start() instead.""" - self.socket = self.context.socket(zmq.DEALER) - self.socket.linger = 1000 - self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) - self.socket.connect(self.address) + self.socket = make_stdin_socket(self.context, self.session.bsession, self.address) self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) self.stream.on_recv(self._handle_recv) self._run_loop()