diff --git a/IPython/frontend/qt/base_frontend_mixin.py b/IPython/frontend/qt/base_frontend_mixin.py index f558e66..7897723 100644 --- a/IPython/frontend/qt/base_frontend_mixin.py +++ b/IPython/frontend/qt/base_frontend_mixin.py @@ -30,7 +30,7 @@ class BaseFrontendMixin(object): old_manager.stopped_channels.disconnect(self._stopped_channels) # Disconnect the old kernel manager's channels. - old_manager.sub_channel.message_received.disconnect(self._dispatch) + old_manager.iopub_channel.message_received.disconnect(self._dispatch) old_manager.shell_channel.message_received.disconnect(self._dispatch) old_manager.stdin_channel.message_received.disconnect(self._dispatch) old_manager.hb_channel.kernel_died.disconnect( @@ -51,7 +51,7 @@ class BaseFrontendMixin(object): kernel_manager.stopped_channels.connect(self._stopped_channels) # Connect the new kernel manager's channels. - kernel_manager.sub_channel.message_received.connect(self._dispatch) + kernel_manager.iopub_channel.message_received.connect(self._dispatch) kernel_manager.shell_channel.message_received.connect(self._dispatch) kernel_manager.stdin_channel.message_received.connect(self._dispatch) kernel_manager.hb_channel.kernel_died.connect(self._handle_kernel_died) diff --git a/IPython/frontend/qt/base_kernelmanager.py b/IPython/frontend/qt/base_kernelmanager.py index 2851b79..0ca589f 100644 --- a/IPython/frontend/qt/base_kernelmanager.py +++ b/IPython/frontend/qt/base_kernelmanager.py @@ -69,7 +69,7 @@ class QtShellChannelMixin(ChannelQObject): _handlers_called = False #--------------------------------------------------------------------------- - # 'ShellSocketChannel' interface + # 'ShellChannel' interface #--------------------------------------------------------------------------- def call_handlers(self, msg): @@ -98,7 +98,7 @@ class QtShellChannelMixin(ChannelQObject): self._handlers_called = False -class QtSubChannelMixin(ChannelQObject): +class QtIOPubChannelMixin(ChannelQObject): # Emitted when any message is received. message_received = QtCore.Signal(object) @@ -126,7 +126,7 @@ class QtSubChannelMixin(ChannelQObject): shutdown_reply_received = QtCore.Signal(object) #--------------------------------------------------------------------------- - # 'SubSocketChannel' interface + # 'IOPubChannel' interface #--------------------------------------------------------------------------- def call_handlers(self, msg): @@ -145,7 +145,7 @@ class QtSubChannelMixin(ChannelQObject): def flush(self): """ Reimplemented to ensure that signals are dispatched immediately. """ - super(QtSubChannelMixin, self).flush() + super(QtIOPubChannelMixin, self).flush() QtCore.QCoreApplication.instance().processEvents() @@ -158,7 +158,7 @@ class QtStdInChannelMixin(ChannelQObject): input_requested = QtCore.Signal(object) #--------------------------------------------------------------------------- - # 'StdInSocketChannel' interface + # 'StdInChannel' interface #--------------------------------------------------------------------------- def call_handlers(self, msg): @@ -179,7 +179,7 @@ class QtHBChannelMixin(ChannelQObject): kernel_died = QtCore.Signal(object) #--------------------------------------------------------------------------- - # 'HBSocketChannel' interface + # 'HBChannel' interface #--------------------------------------------------------------------------- def call_handlers(self, since_last_heartbeat): @@ -205,7 +205,7 @@ class QtKernelManagerMixin(HasTraits, SuperQObject): stopped_channels = QtCore.Signal() # Use Qt-specific channel classes that emit signals. - sub_channel_class = Type(QtSubChannelMixin) + iopub_channel_class = Type(QtIOPubChannelMixin) shell_channel_class = Type(QtShellChannelMixin) stdin_channel_class = Type(QtStdInChannelMixin) hb_channel_class = Type(QtHBChannelMixin) diff --git a/IPython/frontend/qt/console/frontend_widget.py b/IPython/frontend/qt/console/frontend_widget.py index 9bbf140..b1476c5 100644 --- a/IPython/frontend/qt/console/frontend_widget.py +++ b/IPython/frontend/qt/console/frontend_widget.py @@ -396,7 +396,7 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): if info and info.kind == 'user' and not self._hidden: # Make sure that all output from the SUB channel has been processed # before writing a new prompt. - self.kernel_manager.sub_channel.flush() + self.kernel_manager.iopub_channel.flush() # Reset the ANSI style information to prevent bad text in stdout # from messing up our colors. We're not a true terminal so we're @@ -431,7 +431,7 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): # Make sure that all output from the SUB channel has been processed # before entering readline mode. - self.kernel_manager.sub_channel.flush() + self.kernel_manager.iopub_channel.flush() def callback(line): self.kernel_manager.stdin_channel.input(line) diff --git a/IPython/frontend/qt/inprocess_kernelmanager.py b/IPython/frontend/qt/inprocess_kernelmanager.py index 6cc1dad..3f70062 100644 --- a/IPython/frontend/qt/inprocess_kernelmanager.py +++ b/IPython/frontend/qt/inprocess_kernelmanager.py @@ -3,23 +3,23 @@ # Local imports. from IPython.inprocess.kernelmanager import \ - ShellInProcessChannel, SubInProcessChannel, StdInInProcessChannel, \ - HBInProcessChannel, InProcessKernelManager + InProcessShellChannel, InProcessIOPubChannel, InProcessStdInChannel, \ + InProcessHBChannel, InProcessKernelManager from IPython.utils.traitlets import Type -from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \ +from base_kernelmanager import QtShellChannelMixin, QtIOPubChannelMixin, \ QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin -class QtShellInProcessChannel(QtShellChannelMixin, ShellInProcessChannel): +class QtInProcessShellChannel(QtShellChannelMixin, InProcessShellChannel): pass -class QtSubInProcessChannel(QtSubChannelMixin, SubInProcessChannel): +class QtInProcessIOPubChannel(QtIOPubChannelMixin, InProcessIOPubChannel): pass -class QtStdInInProcessChannel(QtStdInChannelMixin, StdInInProcessChannel): +class QtInProcessStdInChannel(QtStdInChannelMixin, InProcessStdInChannel): pass -class QtHBInProcessChannel(QtHBChannelMixin, HBInProcessChannel): +class QtInProcessHBChannel(QtHBChannelMixin, InProcessHBChannel): pass @@ -27,7 +27,7 @@ class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager): """ An in-process KernelManager with signals and slots. """ - sub_channel_class = Type(QtSubInProcessChannel) - shell_channel_class = Type(QtShellInProcessChannel) - stdin_channel_class = Type(QtStdInInProcessChannel) - hb_channel_class = Type(QtHBInProcessChannel) + iopub_channel_class = Type(QtInProcessIOPubChannel) + shell_channel_class = Type(QtInProcessShellChannel) + stdin_channel_class = Type(QtInProcessStdInChannel) + hb_channel_class = Type(QtInProcessHBChannel) diff --git a/IPython/frontend/qt/kernelmanager.py b/IPython/frontend/qt/kernelmanager.py index bca2575..7a1086d 100644 --- a/IPython/frontend/qt/kernelmanager.py +++ b/IPython/frontend/qt/kernelmanager.py @@ -3,22 +3,22 @@ # Local imports. from IPython.utils.traitlets import Type -from IPython.zmq.kernelmanager import ShellSocketChannel, SubSocketChannel, \ - StdInSocketChannel, HBSocketChannel, KernelManager -from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \ +from IPython.zmq.kernelmanager import ShellChannel, IOPubChannel, \ + StdInChannel, HBChannel, KernelManager +from base_kernelmanager import QtShellChannelMixin, QtIOPubChannelMixin, \ QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin -class QtShellSocketChannel(QtShellChannelMixin, ShellSocketChannel): +class QtShellChannel(QtShellChannelMixin, ShellChannel): pass -class QtSubSocketChannel(QtSubChannelMixin, SubSocketChannel): +class QtIOPubChannel(QtIOPubChannelMixin, IOPubChannel): pass -class QtStdInSocketChannel(QtStdInChannelMixin, StdInSocketChannel): +class QtStdInChannel(QtStdInChannelMixin, StdInChannel): pass -class QtHBSocketChannel(QtHBChannelMixin, HBSocketChannel): +class QtHBChannel(QtHBChannelMixin, HBChannel): pass @@ -26,7 +26,7 @@ class QtKernelManager(QtKernelManagerMixin, KernelManager): """ A KernelManager that provides signals and slots. """ - sub_channel_class = Type(QtSubSocketChannel) - shell_channel_class = Type(QtShellSocketChannel) - stdin_channel_class = Type(QtStdInSocketChannel) - hb_channel_class = Type(QtHBSocketChannel) + iopub_channel_class = Type(QtIOPubChannel) + shell_channel_class = Type(QtShellChannel) + stdin_channel_class = Type(QtStdInChannel) + hb_channel_class = Type(QtHBChannel) diff --git a/IPython/frontend/terminal/console/interactiveshell.py b/IPython/frontend/terminal/console/interactiveshell.py index 983d154..743537a 100644 --- a/IPython/frontend/terminal/console/interactiveshell.py +++ b/IPython/frontend/terminal/console/interactiveshell.py @@ -211,8 +211,8 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell): sub_msg: message receive from kernel in the sub socket channel capture by kernel manager. """ - while self.km.sub_channel.msg_ready(): - sub_msg = self.km.sub_channel.get_msg() + while self.km.iopub_channel.msg_ready(): + sub_msg = self.km.iopub_channel.get_msg() msg_type = sub_msg['header']['msg_type'] parent = sub_msg["parent_header"] if (not parent) or self.session_id == parent['session']: diff --git a/IPython/inprocess/blockingkernelmanager.py b/IPython/inprocess/blockingkernelmanager.py index abaf645..e9dc184 100644 --- a/IPython/inprocess/blockingkernelmanager.py +++ b/IPython/inprocess/blockingkernelmanager.py @@ -21,51 +21,22 @@ from threading import Event # Local imports. from IPython.utils.io import raw_print from IPython.utils.traitlets import Type -from kernelmanager import InProcessKernelManager, ShellInProcessChannel, \ - SubInProcessChannel, StdInInProcessChannel +from kernelmanager import InProcessKernelManager, InProcessShellChannel, \ + InProcessIOPubChannel, InProcessStdInChannel +from IPython.zmq.blockingkernelmanager import BlockingChannelMixin -#----------------------------------------------------------------------------- -# Utility classes -#----------------------------------------------------------------------------- - -class BlockingChannelMixin(object): - - def __init__(self, *args, **kwds): - super(BlockingChannelMixin, self).__init__(*args, **kwds) - self._in_queue = Queue.Queue() - - def call_handlers(self, msg): - self._in_queue.put(msg) - - def get_msg(self, block=True, timeout=None): - """ Gets a message if there is one that is ready. """ - return self._in_queue.get(block, timeout) - - def get_msgs(self): - """ Get all messages that are currently ready. """ - msgs = [] - while True: - try: - msgs.append(self.get_msg(block=False)) - except Queue.Empty: - break - return msgs - - def msg_ready(self): - """ Is there a message that has been received? """ - return not self._in_queue.empty() #----------------------------------------------------------------------------- # Blocking kernel manager #----------------------------------------------------------------------------- -class BlockingShellInProcessChannel(BlockingChannelMixin, ShellInProcessChannel): +class BlockingInProcessShellChannel(BlockingChannelMixin, InProcessShellChannel): pass -class BlockingSubInProcessChannel(BlockingChannelMixin, SubInProcessChannel): +class BlockingInProcessIOPubChannel(BlockingChannelMixin, InProcessIOPubChannel): pass -class BlockingStdInInProcessChannel(BlockingChannelMixin, StdInInProcessChannel): +class BlockingInProcessStdInChannel(BlockingChannelMixin, InProcessStdInChannel): def call_handlers(self, msg): """ Overridden for the in-process channel. @@ -82,6 +53,6 @@ class BlockingStdInInProcessChannel(BlockingChannelMixin, StdInInProcessChannel) class BlockingInProcessKernelManager(InProcessKernelManager): # The classes to use for the various channels. - shell_channel_class = Type(BlockingShellInProcessChannel) - sub_channel_class = Type(BlockingSubInProcessChannel) - stdin_channel_class = Type(BlockingStdInInProcessChannel) + shell_channel_class = Type(BlockingInProcessShellChannel) + iopub_channel_class = Type(BlockingInProcessIOPubChannel) + stdin_channel_class = Type(BlockingInProcessStdInChannel) diff --git a/IPython/inprocess/ipkernel.py b/IPython/inprocess/ipkernel.py index f6d4e97..de60eb0 100644 --- a/IPython/inprocess/ipkernel.py +++ b/IPython/inprocess/ipkernel.py @@ -123,7 +123,7 @@ class InProcessKernel(Kernel): """ ident, msg = self.session.recv(self.iopub_socket, copy=False) for frontend in self.frontends: - frontend.sub_channel.call_handlers(msg) + frontend.iopub_channel.call_handlers(msg) #------ Trait initializers ----------------------------------------------- diff --git a/IPython/inprocess/kernelmanager.py b/IPython/inprocess/kernelmanager.py index bc03f2d..77b525f 100644 --- a/IPython/inprocess/kernelmanager.py +++ b/IPython/inprocess/kernelmanager.py @@ -71,7 +71,7 @@ class InProcessChannel(object): raise NotImplementedError -class ShellInProcessChannel(InProcessChannel): +class InProcessShellChannel(InProcessChannel): """The DEALER channel for issues request/replies to the kernel. """ @@ -240,7 +240,7 @@ class ShellInProcessChannel(InProcessChannel): self.call_handlers_later(reply_msg) -class SubInProcessChannel(InProcessChannel): +class InProcessIOPubChannel(InProcessChannel): """The SUB channel which listens for messages that the kernel publishes. """ @@ -252,7 +252,7 @@ class SubInProcessChannel(InProcessChannel): pass -class StdInInProcessChannel(InProcessChannel): +class InProcessStdInChannel(InProcessChannel): """ A reply channel to handle raw_input requests that the kernel makes. """ def input(self, string): @@ -264,13 +264,13 @@ class StdInInProcessChannel(InProcessChannel): kernel.raw_input_str = string -class HBInProcessChannel(InProcessChannel): +class InProcessHBChannel(InProcessChannel): """ A dummy heartbeat channel. """ time_to_dead = 3.0 def __init__(self, *args, **kwds): - super(HBInProcessChannel, self).__init__(*args, **kwds) + super(InProcessHBChannel, self).__init__(*args, **kwds) self._pause = True def pause(self): @@ -310,14 +310,14 @@ class InProcessKernelManager(HasTraits): kernel = Instance('IPython.inprocess.ipkernel.InProcessKernel') # The classes to use for the various channels. - shell_channel_class = Type(ShellInProcessChannel) - sub_channel_class = Type(SubInProcessChannel) - stdin_channel_class = Type(StdInInProcessChannel) - hb_channel_class = Type(HBInProcessChannel) + shell_channel_class = Type(InProcessShellChannel) + iopub_channel_class = Type(InProcessIOPubChannel) + stdin_channel_class = Type(InProcessStdInChannel) + hb_channel_class = Type(InProcessHBChannel) # Protected traits. _shell_channel = Any - _sub_channel = Any + _iopub_channel = Any _stdin_channel = Any _hb_channel = Any @@ -331,7 +331,7 @@ class InProcessKernelManager(HasTraits): if shell: self.shell_channel.start() if sub: - self.sub_channel.start() + self.iopub_channel.start() if stdin: self.stdin_channel.start() self.shell_channel.allow_stdin = True @@ -345,8 +345,8 @@ class InProcessKernelManager(HasTraits): """ if self.shell_channel.is_alive(): self.shell_channel.stop() - if self.sub_channel.is_alive(): - self.sub_channel.stop() + if self.iopub_channel.is_alive(): + self.iopub_channel.stop() if self.stdin_channel.is_alive(): self.stdin_channel.stop() if self.hb_channel.is_alive(): @@ -355,7 +355,7 @@ class InProcessKernelManager(HasTraits): @property def channels_running(self): """ Are any of the channels created and running? """ - return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or + return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or self.stdin_channel.is_alive() or self.hb_channel.is_alive()) #-------------------------------------------------------------------------- @@ -421,11 +421,11 @@ class InProcessKernelManager(HasTraits): return self._shell_channel @property - def sub_channel(self): + def iopub_channel(self): """Get the SUB socket channel object.""" - if self._sub_channel is None: - self._sub_channel = self.sub_channel_class(self) - return self._sub_channel + if self._iopub_channel is None: + self._iopub_channel = self.iopub_channel_class(self) + return self._iopub_channel @property def stdin_channel(self): diff --git a/IPython/inprocess/tests/test_kernel.py b/IPython/inprocess/tests/test_kernel.py index f366a5b..b40ed60 100644 --- a/IPython/inprocess/tests/test_kernel.py +++ b/IPython/inprocess/tests/test_kernel.py @@ -80,7 +80,7 @@ def get_stream_message(kernel_manager, timeout=5): """ Gets a single stream message synchronously from the sub channel. """ while True: - msg = kernel_manager.sub_channel.get_msg(timeout=timeout) + msg = kernel_manager.iopub_channel.get_msg(timeout=timeout) if msg['header']['msg_type'] == 'stream': return msg diff --git a/IPython/zmq/blockingkernelmanager.py b/IPython/zmq/blockingkernelmanager.py index aad1716..6b53697 100644 --- a/IPython/zmq/blockingkernelmanager.py +++ b/IPython/zmq/blockingkernelmanager.py @@ -13,26 +13,56 @@ Useful for test suites and blocking terminal interfaces. # Imports #----------------------------------------------------------------------------- -# Local imports. -from IPython.inprocess.blockingkernelmanager import BlockingChannelMixin from IPython.utils.traitlets import Type -from kernelmanager import KernelManager, SubSocketChannel, HBSocketChannel, \ - ShellSocketChannel, StdInSocketChannel +from kernelmanager import KernelManager, IOPubChannel, HBChannel, \ + ShellChannel, StdInChannel #----------------------------------------------------------------------------- # Blocking kernel manager #----------------------------------------------------------------------------- -class BlockingSubSocketChannel(BlockingChannelMixin, SubSocketChannel): + +class BlockingChannelMixin(object): + + def __init__(self, *args, **kwds): + super(BlockingChannelMixin, self).__init__(*args, **kwds) + self._in_queue = Queue.Queue() + + def call_handlers(self, msg): + self._in_queue.put(msg) + + def get_msg(self, block=True, timeout=None): + """ Gets a message if there is one that is ready. """ + return self._in_queue.get(block, timeout) + + def get_msgs(self): + """ Get all messages that are currently ready. """ + msgs = [] + while True: + try: + msgs.append(self.get_msg(block=False)) + except Queue.Empty: + break + return msgs + + def msg_ready(self): + """ Is there a message that has been received? """ + return not self._in_queue.empty() + + +class BlockingIOPubChannel(BlockingChannelMixin, IOPubChannel): pass -class BlockingShellSocketChannel(BlockingChannelMixin, ShellSocketChannel): + +class BlockingShellChannel(BlockingChannelMixin, ShellChannel): pass -class BlockingStdInSocketChannel(BlockingChannelMixin, StdInSocketChannel): + +class BlockingStdInChannel(BlockingChannelMixin, StdInChannel): pass -class BlockingHBSocketChannel(HBSocketChannel): + +class BlockingHBChannel(HBChannel): # This kernel needs quicker monitoring, shorten to 1 sec. # less than 0.5s is unreliable, and will get occasional @@ -43,11 +73,12 @@ class BlockingHBSocketChannel(HBSocketChannel): """ Pause beating on missed heartbeat. """ pass + class BlockingKernelManager(KernelManager): # The classes to use for the various channels. - shell_channel_class = Type(BlockingShellSocketChannel) - sub_channel_class = Type(BlockingSubSocketChannel) - stdin_channel_class = Type(BlockingStdInSocketChannel) - hb_channel_class = Type(BlockingHBSocketChannel) + shell_channel_class = Type(BlockingShellChannel) + iopub_channel_class = Type(BlockingIOPubChannel) + stdin_channel_class = Type(BlockingStdInChannel) + hb_channel_class = Type(BlockingHBChannel) diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index c79efeb..00d3ed0 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -180,7 +180,7 @@ class ZMQSocketChannel(Thread): -class ShellSocketChannel(ZMQSocketChannel): +class ShellChannel(ZMQSocketChannel): """The DEALER channel for issues request/replies to the kernel. """ @@ -189,7 +189,7 @@ class ShellSocketChannel(ZMQSocketChannel): allow_stdin = True def __init__(self, context, session, address): - super(ShellSocketChannel, self).__init__(context, session, address) + super(ShellChannel, self).__init__(context, session, address) self.ioloop = ioloop.IOLoop() def run(self): @@ -207,7 +207,7 @@ class ShellSocketChannel(ZMQSocketChannel): def stop(self): self.ioloop.stop() - super(ShellSocketChannel, self).stop() + super(ShellChannel, self).stop() def call_handlers(self, msg): """This method is called in the ioloop thread when a message arrives. @@ -389,12 +389,12 @@ class ShellSocketChannel(ZMQSocketChannel): -class SubSocketChannel(ZMQSocketChannel): +class IOPubChannel(ZMQSocketChannel): """The SUB channel which listens for messages that the kernel publishes. """ def __init__(self, context, session, address): - super(SubSocketChannel, self).__init__(context, session, address) + super(IOPubChannel, self).__init__(context, session, address) self.ioloop = ioloop.IOLoop() def run(self): @@ -413,7 +413,7 @@ class SubSocketChannel(ZMQSocketChannel): def stop(self): self.ioloop.stop() - super(SubSocketChannel, self).stop() + super(IOPubChannel, self).stop() def call_handlers(self, msg): """This method is called in the ioloop thread when a message arrives. @@ -455,13 +455,13 @@ class SubSocketChannel(ZMQSocketChannel): self._flushed = True -class StdInSocketChannel(ZMQSocketChannel): +class StdInChannel(ZMQSocketChannel): """A reply channel to handle raw_input requests that the kernel makes.""" msg_queue = None def __init__(self, context, session, address): - super(StdInSocketChannel, self).__init__(context, session, address) + super(StdInChannel, self).__init__(context, session, address) self.ioloop = ioloop.IOLoop() def run(self): @@ -480,7 +480,7 @@ class StdInSocketChannel(ZMQSocketChannel): def stop(self): self.ioloop.stop() - super(StdInSocketChannel, self).stop() + super(StdInChannel, self).stop() def call_handlers(self, msg): """This method is called in the ioloop thread when a message arrives. @@ -499,7 +499,7 @@ class StdInSocketChannel(ZMQSocketChannel): self._queue_send(msg) -class HBSocketChannel(ZMQSocketChannel): +class HBChannel(ZMQSocketChannel): """The heartbeat channel which monitors the kernel heartbeat. Note that the heartbeat channel is paused by default. As long as you start @@ -515,7 +515,7 @@ class HBSocketChannel(ZMQSocketChannel): _beating = None def __init__(self, context, session, address): - super(HBSocketChannel, self).__init__(context, session, address) + super(HBChannel, self).__init__(context, session, address) self._running = False self._pause =True self.poller = zmq.Poller() @@ -622,7 +622,7 @@ class HBSocketChannel(ZMQSocketChannel): def stop(self): self._running = False - super(HBSocketChannel, self).stop() + super(HBChannel, self).stop() def call_handlers(self, since_last_heartbeat): """This method is called in the ioloop thread when a message arrives. @@ -678,15 +678,15 @@ class KernelManager(Configurable): hb_port = Integer(0) # The classes to use for the various channels. - shell_channel_class = Type(ShellSocketChannel) - sub_channel_class = Type(SubSocketChannel) - stdin_channel_class = Type(StdInSocketChannel) - hb_channel_class = Type(HBSocketChannel) + shell_channel_class = Type(ShellChannel) + iopub_channel_class = Type(IOPubChannel) + stdin_channel_class = Type(StdInChannel) + hb_channel_class = Type(HBChannel) # Protected traits. _launch_args = Any _shell_channel = Any - _sub_channel = Any + _iopub_channel = Any _stdin_channel = Any _hb_channel = Any _connection_file_written=Bool(False) @@ -709,7 +709,7 @@ class KernelManager(Configurable): if shell: self.shell_channel.start() if sub: - self.sub_channel.start() + self.iopub_channel.start() if stdin: self.stdin_channel.start() self.shell_channel.allow_stdin = True @@ -723,8 +723,8 @@ class KernelManager(Configurable): """ if self.shell_channel.is_alive(): self.shell_channel.stop() - if self.sub_channel.is_alive(): - self.sub_channel.stop() + if self.iopub_channel.is_alive(): + self.iopub_channel.stop() if self.stdin_channel.is_alive(): self.stdin_channel.stop() if self.hb_channel.is_alive(): @@ -733,7 +733,7 @@ class KernelManager(Configurable): @property def channels_running(self): """Are any of the channels created and running?""" - return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or + return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or self.stdin_channel.is_alive() or self.hb_channel.is_alive()) #-------------------------------------------------------------------------- @@ -1007,14 +1007,14 @@ class KernelManager(Configurable): return self._shell_channel @property - def sub_channel(self): + def iopub_channel(self): """Get the SUB socket channel object.""" - if self._sub_channel is None: - self._sub_channel = self.sub_channel_class(self.context, + if self._iopub_channel is None: + self._iopub_channel = self.iopub_channel_class(self.context, self.session, self._make_url(self.iopub_port), ) - return self._sub_channel + return self._iopub_channel @property def stdin_channel(self): diff --git a/IPython/zmq/tests/test_message_spec.py b/IPython/zmq/tests/test_message_spec.py index 6f2e72c..896c5ff 100644 --- a/IPython/zmq/tests/test_message_spec.py +++ b/IPython/zmq/tests/test_message_spec.py @@ -48,7 +48,7 @@ def teardown(): def flush_channels(): """flush any messages waiting on the queue""" - for channel in (KM.shell_channel, KM.sub_channel): + for channel in (KM.shell_channel, KM.iopub_channel): while True: try: msg = channel.get_msg(block=True, timeout=0.1) @@ -61,7 +61,7 @@ def flush_channels(): def execute(code='', **kwargs): """wrapper for doing common steps for validating an execution request""" shell = KM.shell_channel - sub = KM.sub_channel + sub = KM.iopub_channel msg_id = shell.execute(code=code, **kwargs) reply = shell.get_msg(timeout=2) @@ -310,23 +310,23 @@ def test_execute_silent(): msg_id, reply = execute(code='x=1', silent=True) # flush status=idle - status = KM.sub_channel.get_msg(timeout=2) + status = KM.iopub_channel.get_msg(timeout=2) for tst in validate_message(status, 'status', msg_id): yield tst nt.assert_equal(status['content']['execution_state'], 'idle') - yield nt.assert_raises(Empty, KM.sub_channel.get_msg, timeout=0.1) + yield nt.assert_raises(Empty, KM.iopub_channel.get_msg, timeout=0.1) count = reply['execution_count'] msg_id, reply = execute(code='x=2', silent=True) # flush status=idle - status = KM.sub_channel.get_msg(timeout=2) + status = KM.iopub_channel.get_msg(timeout=2) for tst in validate_message(status, 'status', msg_id): yield tst yield nt.assert_equal(status['content']['execution_state'], 'idle') - yield nt.assert_raises(Empty, KM.sub_channel.get_msg, timeout=0.1) + yield nt.assert_raises(Empty, KM.iopub_channel.get_msg, timeout=0.1) count_2 = reply['execution_count'] yield nt.assert_equal(count_2, count) @@ -339,7 +339,7 @@ def test_execute_error(): yield nt.assert_equal(reply['status'], 'error') yield nt.assert_equal(reply['ename'], 'ZeroDivisionError') - pyerr = KM.sub_channel.get_msg(timeout=2) + pyerr = KM.iopub_channel.get_msg(timeout=2) for tst in validate_message(pyerr, 'pyerr', msg_id): yield tst @@ -475,7 +475,7 @@ def test_stream(): msg_id, reply = execute("print('hi')") - stdout = KM.sub_channel.get_msg(timeout=2) + stdout = KM.iopub_channel.get_msg(timeout=2) for tst in validate_message(stdout, 'stream', msg_id): yield tst content = stdout['content'] @@ -489,7 +489,7 @@ def test_display_data(): msg_id, reply = execute("from IPython.core.display import display; display(1)") - display = KM.sub_channel.get_msg(timeout=2) + display = KM.iopub_channel.get_msg(timeout=2) for tst in validate_message(display, 'display_data', parent=msg_id): yield tst data = display['content']['data']