diff --git a/IPython/kernel/blocking/channels.py b/IPython/kernel/blocking/channels.py index 0417e60..6d022e7 100644 --- a/IPython/kernel/blocking/channels.py +++ b/IPython/kernel/blocking/channels.py @@ -51,17 +51,17 @@ class ZMQSocketChannel(object): _exiting = False proxy_methods = [] - def __init__(self, socket, session): + def __init__(self, socket, session, loop=None): """Create a channel. Parameters ---------- - context : :class:`zmq.Context` - The ZMQ context to use. + socket : :class:`zmq.Socket` + The ZMQ socket to use. session : :class:`session.Session` The session to use. - address : zmq url - Standard (ip, port) tuple that the kernel is listening on. + loop + Unused here, for other implementations """ super(ZMQSocketChannel, self).__init__() diff --git a/IPython/kernel/client.py b/IPython/kernel/client.py index cd77320..11f85ce 100644 --- a/IPython/kernel/client.py +++ b/IPython/kernel/client.py @@ -124,6 +124,8 @@ class KernelClient(ConnectionFileMixin): return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or self.stdin_channel.is_alive() or self.hb_channel.is_alive()) + ioloop = None # Overridden in subclasses that use pyzmq event loop + @property def shell_channel(self): """Get the shell channel object for this kernel.""" @@ -132,7 +134,7 @@ class KernelClient(ConnectionFileMixin): self.log.debug("connecting shell channel to %s", url) socket = make_shell_socket(self.context, self.session.bsession, url) self._shell_channel = self.shell_channel_class( - socket, self.session + socket, self.session, self.ioloop ) return self._shell_channel @@ -144,7 +146,7 @@ class KernelClient(ConnectionFileMixin): self.log.debug("connecting iopub channel to %s", url) socket = make_iopub_socket(self.context, self.session.bsession, url) self._iopub_channel = self.iopub_channel_class( - socket, self.session + socket, self.session, self.ioloop ) return self._iopub_channel @@ -156,7 +158,7 @@ class KernelClient(ConnectionFileMixin): self.log.debug("connecting stdin channel to %s", url) socket = make_stdin_socket(self.context, self.session.bsession, url) self._stdin_channel = self.stdin_channel_class( - socket, self.session + socket, self.session, self.ioloop ) return self._stdin_channel diff --git a/IPython/qt/client.py b/IPython/qt/client.py index 0410191..b424d64 100644 --- a/IPython/qt/client.py +++ b/IPython/qt/client.py @@ -14,7 +14,7 @@ from zmq.eventloop import ioloop, zmqstream from IPython.external.qt import QtCore # Local imports -from IPython.utils.traitlets import Type +from IPython.utils.traitlets import Type, Instance from IPython.kernel.channels import HBChannel,\ make_shell_socket, make_iopub_socket, make_stdin_socket from IPython.kernel import KernelClient @@ -53,20 +53,12 @@ def validate_string_dict(dct): -class QtZMQSocketChannel(SuperQObject, Thread): +class QtZMQSocketChannel(SuperQObject): """The base class for the channels that use ZMQ sockets.""" session = None socket = None ioloop = None stream = None - _exiting = False - proxy_methods = [] - - # Emitted when the channel is started. - started = QtCore.Signal() - - # Emitted when the channel is stopped. - stopped = QtCore.Signal() message_received = QtCore.Signal(object) @@ -85,78 +77,38 @@ class QtZMQSocketChannel(SuperQObject, Thread): """ QtCore.QCoreApplication.instance().processEvents() - def __init__(self, socket, session): + def __init__(self, socket, session, loop): """Create a channel. Parameters ---------- - context : :class:`zmq.Context` - The ZMQ context to use. + socket : :class:`zmq.Socket` + The ZMQ socket to use. session : :class:`session.Session` The session to use. - address : zmq url - Standard (ip, port) tuple that the kernel is listening on. + loop + A pyzmq ioloop to connect the socket to using a ZMQStream """ super(QtZMQSocketChannel, self).__init__() - self.daemon = True self.socket = socket self.session = session - atexit.register(self._notice_exit) - self.ioloop = ioloop.IOLoop() + self.ioloop = loop - def _notice_exit(self): - self._exiting = True + self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) + self.stream.on_recv(self._handle_recv) - def _run_loop(self): - """Run my loop, ignoring EINTR events in the poller""" - while True: - try: - self.ioloop.start() - except ZMQError as e: - if e.errno == errno.EINTR: - continue - else: - raise - except Exception: - if self._exiting: - break - else: - raise - else: - break + _is_alive = False + def is_alive(self): + return self._is_alive def start(self): - """ Reimplemented to emit signal. - """ - super(QtZMQSocketChannel, self).start() - self.started.emit() - - def run(self): - """The thread's main activity. Call start() instead.""" - self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) - self.stream.on_recv(self._handle_recv) - self._run_loop() + self._is_alive = True def stop(self): - """Stop the channel's event loop and join its thread. - - This calls :meth:`~threading.Thread.join` and returns when the thread - terminates. :class:`RuntimeError` will be raised if - :meth:`~threading.Thread.start` is called again. - """ - if self.ioloop is not None: - self.ioloop.stop() - self.join() - self.close() - self.stopped.emit() + self._is_alive = False def close(self): - if self.ioloop is not None: - try: - self.ioloop.close(all_fds=True) - except Exception: - pass if self.socket is not None: try: self.socket.close(linger=0) @@ -238,12 +190,76 @@ class QtZMQSocketChannel(SuperQObject, Thread): self._flushed = True +class IOLoopThread(Thread): + """Run a pyzmq ioloop in a thread to send and receive messages + """ + def __init__(self, loop): + super(IOLoopThread, self).__init__() + self.daemon = True + atexit.register(self._notice_exit) + self.ioloop = loop or ioloop.IOLoop() + + def _notice_exit(self): + self._exiting = True + + def run(self): + """Run my loop, ignoring EINTR events in the poller""" + while True: + try: + self.ioloop.start() + except ZMQError as e: + if e.errno == errno.EINTR: + continue + else: + raise + except Exception: + if self._exiting: + break + else: + raise + else: + break + + def stop(self): + """Stop the channel's event loop and join its thread. + + This calls :meth:`~threading.Thread.join` and returns when the thread + terminates. :class:`RuntimeError` will be raised if + :meth:`~threading.Thread.start` is called again. + """ + if self.ioloop is not None: + self.ioloop.stop() + self.join() + self.close() + + def close(self): + if self.ioloop is not None: + try: + self.ioloop.close(all_fds=True) + except Exception: + pass + + class QtKernelClient(QtKernelClientMixin, KernelClient): """ A KernelClient that provides signals and slots. """ + + _ioloop = None + @property + def ioloop(self): + if self._ioloop is None: + self._ioloop = ioloop.IOLoop() + return self._ioloop + + ioloop_thread = Instance(IOLoopThread) + def start_channels(self, shell=True, iopub=True, stdin=True, hb=True): if shell: self.shell_channel.message_received.connect(self._check_kernel_info_reply) + + self.channel_listener_thread = IOLoopThread(self.ioloop) + self.channel_listener_thread.start() + super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb) def _check_kernel_info_reply(self, msg): @@ -251,6 +267,11 @@ class QtKernelClient(QtKernelClientMixin, KernelClient): self._handle_kernel_info_reply(msg) self.shell_channel.message_received.disconnect(self._check_kernel_info_reply) + def stop_channels(self): + super(QtKernelClient, self).stop_channels() + if self.ioloop_thread.is_alive(): + self.ioloop_thread.stop() + iopub_channel_class = Type(QtZMQSocketChannel) shell_channel_class = Type(QtZMQSocketChannel) stdin_channel_class = Type(QtZMQSocketChannel)