From fe5dd27522f584413caba7bbedff7bc8a410b641 2014-12-11 18:08:35 From: Min RK Date: 2014-12-11 18:08:35 Subject: [PATCH] Merge pull request #7108 from takluyver/digging-channels Refactor kernel channel classes --- diff --git a/IPython/kernel/blocking/channels.py b/IPython/kernel/blocking/channels.py index c10431d..b9b129e 100644 --- a/IPython/kernel/blocking/channels.py +++ b/IPython/kernel/blocking/channels.py @@ -2,46 +2,59 @@ Useful for test suites and blocking terminal interfaces. """ -#----------------------------------------------------------------------------- -# Copyright (C) 2013 The IPython Development Team -# -# Distributed under the terms of the BSD License. The full license is in -# the file COPYING.txt, distributed as part of this software. -#----------------------------------------------------------------------------- -#----------------------------------------------------------------------------- -# Imports -#----------------------------------------------------------------------------- +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. try: from queue import Queue, Empty # Py 3 except ImportError: from Queue import Queue, Empty # Py 2 -from IPython.kernel.channels import IOPubChannel, HBChannel, \ - ShellChannel, StdInChannel -#----------------------------------------------------------------------------- -# Blocking kernel manager -#----------------------------------------------------------------------------- - - -class BlockingChannelMixin(object): - - def __init__(self, *args, **kwds): - super(BlockingChannelMixin, self).__init__(*args, **kwds) - self._in_queue = Queue() - - def call_handlers(self, msg): - self._in_queue.put(msg) +class ZMQSocketChannel(object): + """A ZMQ socket in a simple blocking API""" + session = None + socket = None + stream = None + _exiting = False + proxy_methods = [] + + def __init__(self, socket, session, loop=None): + """Create a channel. + + Parameters + ---------- + socket : :class:`zmq.Socket` + The ZMQ socket to use. + session : :class:`session.Session` + The session to use. + loop + Unused here, for other implementations + """ + super(ZMQSocketChannel, self).__init__() + + self.socket = socket + self.session = session + + def _recv(self, **kwargs): + msg = self.socket.recv_multipart(**kwargs) + ident,smsg = self.session.feed_identities(msg) + return self.session.deserialize(smsg) def get_msg(self, block=True, timeout=None): """ Gets a message if there is one that is ready. """ - if timeout is None: - # Queue.get(timeout=None) has stupid uninteruptible - # behavior, so wait for a week instead - timeout = 604800 - return self._in_queue.get(block, timeout) + if block: + if timeout is not None: + timeout *= 1000 # seconds to ms + ready = self.socket.poll(timeout) + else: + ready = self.socket.poll(timeout=0) + + if ready: + return self._recv() + else: + raise Empty def get_msgs(self): """ Get all messages that are currently ready. """ @@ -55,31 +68,25 @@ class BlockingChannelMixin(object): def msg_ready(self): """ Is there a message that has been received? """ - return not self._in_queue.empty() - - -class BlockingIOPubChannel(BlockingChannelMixin, IOPubChannel): - pass - - -class BlockingShellChannel(BlockingChannelMixin, ShellChannel): - def call_handlers(self, msg): - if msg['msg_type'] == 'kernel_info_reply': - self._handle_kernel_info_reply(msg) - return super(BlockingShellChannel, self).call_handlers(msg) - - -class BlockingStdInChannel(BlockingChannelMixin, StdInChannel): - pass + return bool(self.socket.poll(timeout=0)) + def close(self): + if self.socket is not None: + try: + self.socket.close(linger=0) + except Exception: + pass + self.socket = None + stop = close -class BlockingHBChannel(HBChannel): + def is_alive(self): + return (self.socket is not None) - # This kernel needs quicker monitoring, shorten to 1 sec. - # less than 0.5s is unreliable, and will get occasional - # false reports of missed beats. - time_to_dead = 1. + def send(self, msg): + """Pass a message to the ZMQ socket to send + """ + self.session.send(self.socket, msg) - def call_handlers(self, since_last_heartbeat): - """ Pause beating on missed heartbeat. """ + def start(self): pass + diff --git a/IPython/kernel/blocking/client.py b/IPython/kernel/blocking/client.py index 971cf0c..054317a 100644 --- a/IPython/kernel/blocking/client.py +++ b/IPython/kernel/blocking/client.py @@ -2,32 +2,38 @@ Useful for test suites and blocking terminal interfaces. """ -#----------------------------------------------------------------------------- -# Copyright (C) 2013 The IPython Development Team -# -# Distributed under the terms of the BSD License. The full license is in -# the file COPYING.txt, distributed as part of this software. -#----------------------------------------------------------------------------- +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. -#----------------------------------------------------------------------------- -# Imports -#----------------------------------------------------------------------------- +try: + from queue import Empty # Python 3 +except ImportError: + from Queue import Empty # Python 2 from IPython.utils.traitlets import Type +from IPython.kernel.channels import HBChannel from IPython.kernel.client import KernelClient -from .channels import ( - BlockingIOPubChannel, BlockingHBChannel, - BlockingShellChannel, BlockingStdInChannel -) - -#----------------------------------------------------------------------------- -# Blocking kernel manager -#----------------------------------------------------------------------------- +from .channels import ZMQSocketChannel class BlockingKernelClient(KernelClient): + def wait_for_ready(self): + # Wait for kernel info reply on shell channel + while True: + msg = self.shell_channel.get_msg(block=True) + if msg['msg_type'] == 'kernel_info_reply': + self._handle_kernel_info_reply(msg) + break + + # Flush IOPub channel + while True: + try: + msg = self.iopub_channel.get_msg(block=True, timeout=0.2) + print(msg['msg_type']) + except Empty: + break # The classes to use for the various channels - shell_channel_class = Type(BlockingShellChannel) - iopub_channel_class = Type(BlockingIOPubChannel) - stdin_channel_class = Type(BlockingStdInChannel) - hb_channel_class = Type(BlockingHBChannel) + shell_channel_class = Type(ZMQSocketChannel) + iopub_channel_class = Type(ZMQSocketChannel) + stdin_channel_class = Type(ZMQSocketChannel) + hb_channel_class = Type(HBChannel) diff --git a/IPython/kernel/channels.py b/IPython/kernel/channels.py index 62ccaf5..b22cfba 100644 --- a/IPython/kernel/channels.py +++ b/IPython/kernel/channels.py @@ -14,15 +14,10 @@ import zmq # import ZMQError in top-level namespace, to avoid ugly attribute-error messages # during garbage collection of threads at exit: from zmq import ZMQError -from zmq.eventloop import ioloop, zmqstream from IPython.core.release import kernel_protocol_version_info -from .channelsabc import ( - ShellChannelABC, IOPubChannelABC, - HBChannelABC, StdInChannelABC, -) -from IPython.utils.py3compat import string_types, iteritems +from .channelsabc import HBChannelABC #----------------------------------------------------------------------------- # Constants and exceptions @@ -33,52 +28,27 @@ major_protocol_version = kernel_protocol_version_info[0] class InvalidPortNumber(Exception): pass -#----------------------------------------------------------------------------- -# Utility functions -#----------------------------------------------------------------------------- - -# some utilities to validate message structure, these might get moved elsewhere -# if they prove to have more generic utility - -def validate_string_list(lst): - """Validate that the input is a list of strings. - - Raises ValueError if not.""" - if not isinstance(lst, list): - raise ValueError('input %r must be a list' % lst) - for x in lst: - if not isinstance(x, string_types): - raise ValueError('element %r in list must be a string' % x) - - -def validate_string_dict(dct): - """Validate that the input is a dict with string keys and values. - - Raises ValueError if not.""" - for k,v in iteritems(dct): - if not isinstance(k, string_types): - raise ValueError('key %r in dict must be a string' % k) - if not isinstance(v, string_types): - raise ValueError('value %r in dict must be a string' % v) - - -#----------------------------------------------------------------------------- -# ZMQ Socket Channel classes -#----------------------------------------------------------------------------- +class HBChannel(Thread): + """The heartbeat channel which monitors the kernel heartbeat. -class ZMQSocketChannel(Thread): - """The base class for the channels that use ZMQ sockets.""" + Note that the heartbeat channel is paused by default. As long as you start + this channel, the kernel manager will ensure that it is paused and un-paused + as appropriate. + """ context = None session = None socket = None - ioloop = None - stream = None - _address = None + address = None _exiting = False - proxy_methods = [] + + time_to_dead = 1. + poller = None + _running = None + _pause = None + _beating = None def __init__(self, context, session, address): - """Create a channel. + """Create the heartbeat monitor thread. Parameters ---------- @@ -89,7 +59,7 @@ class ZMQSocketChannel(Thread): address : zmq url Standard (ip, port) tuple that the kernel is listening on. """ - super(ZMQSocketChannel, self).__init__() + super(HBChannel, self).__init__() self.daemon = True self.context = context @@ -99,429 +69,16 @@ class ZMQSocketChannel(Thread): message = 'The port number for a channel cannot be 0.' raise InvalidPortNumber(message) address = "tcp://%s:%i" % address - self._address = address + self.address = address atexit.register(self._notice_exit) - def _notice_exit(self): - self._exiting = True - - def _run_loop(self): - """Run my loop, ignoring EINTR events in the poller""" - while True: - try: - self.ioloop.start() - except ZMQError as e: - if e.errno == errno.EINTR: - continue - else: - raise - except Exception: - if self._exiting: - break - else: - raise - else: - break - - def stop(self): - """Stop the channel's event loop and join its thread. - - This calls :meth:`~threading.Thread.join` and returns when the thread - terminates. :class:`RuntimeError` will be raised if - :meth:`~threading.Thread.start` is called again. - """ - if self.ioloop is not None: - self.ioloop.stop() - self.join() - self.close() - - def close(self): - if self.ioloop is not None: - try: - self.ioloop.close(all_fds=True) - except Exception: - pass - if self.socket is not None: - try: - self.socket.close(linger=0) - except Exception: - pass - self.socket = None - - @property - def address(self): - """Get the channel's address as a zmq url string. - - These URLS have the form: 'tcp://127.0.0.1:5555'. - """ - return self._address - - def _queue_send(self, msg): - """Queue a message to be sent from the IOLoop's thread. - - Parameters - ---------- - msg : message to send - - This is threadsafe, as it uses IOLoop.add_callback to give the loop's - thread control of the action. - """ - def thread_send(): - self.session.send(self.stream, msg) - self.ioloop.add_callback(thread_send) - - def _handle_recv(self, msg): - """Callback for stream.on_recv. - - Unpacks message, and calls handlers with it. - """ - ident,smsg = self.session.feed_identities(msg) - msg = self.session.deserialize(smsg) - self.call_handlers(msg) - - - -class ShellChannel(ZMQSocketChannel): - """The shell channel for issuing request/replies to the kernel.""" - - command_queue = None - # flag for whether execute requests should be allowed to call raw_input: - allow_stdin = True - proxy_methods = [ - 'execute', - 'complete', - 'inspect', - 'history', - 'kernel_info', - 'shutdown', - 'is_complete', - ] - - def __init__(self, context, session, address): - super(ShellChannel, self).__init__(context, session, address) - self.ioloop = ioloop.IOLoop() - - def run(self): - """The thread's main activity. Call start() instead.""" - self.socket = self.context.socket(zmq.DEALER) - self.socket.linger = 1000 - self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) - self.socket.connect(self.address) - self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) - self.stream.on_recv(self._handle_recv) - self._run_loop() - - def call_handlers(self, msg): - """This method is called in the ioloop thread when a message arrives. - - Subclasses should override this method to handle incoming messages. - It is important to remember that this method is called in the thread - so that some logic must be done to ensure that the application level - handlers are called in the application thread. - """ - raise NotImplementedError('call_handlers must be defined in a subclass.') - - def execute(self, code, silent=False, store_history=True, - user_expressions=None, allow_stdin=None): - """Execute code in the kernel. - - Parameters - ---------- - code : str - A string of Python code. - - silent : bool, optional (default False) - If set, the kernel will execute the code as quietly possible, and - will force store_history to be False. - - store_history : bool, optional (default True) - If set, the kernel will store command history. This is forced - to be False if silent is True. - - user_expressions : dict, optional - A dict mapping names to expressions to be evaluated in the user's - dict. The expression values are returned as strings formatted using - :func:`repr`. - - allow_stdin : bool, optional (default self.allow_stdin) - Flag for whether the kernel can send stdin requests to frontends. - - Some frontends (e.g. the Notebook) do not support stdin requests. - If raw_input is called from code executed from such a frontend, a - StdinNotImplementedError will be raised. - - Returns - ------- - The msg_id of the message sent. - """ - if user_expressions is None: - user_expressions = {} - if allow_stdin is None: - allow_stdin = self.allow_stdin - - - # Don't waste network traffic if inputs are invalid - if not isinstance(code, string_types): - raise ValueError('code %r must be a string' % code) - validate_string_dict(user_expressions) - - # Create class for content/msg creation. Related to, but possibly - # not in Session. - content = dict(code=code, silent=silent, store_history=store_history, - user_expressions=user_expressions, - allow_stdin=allow_stdin, - ) - msg = self.session.msg('execute_request', content) - self._queue_send(msg) - return msg['header']['msg_id'] - - def complete(self, code, cursor_pos=None): - """Tab complete text in the kernel's namespace. - - Parameters - ---------- - code : str - The context in which completion is requested. - Can be anything between a variable name and an entire cell. - cursor_pos : int, optional - The position of the cursor in the block of code where the completion was requested. - Default: ``len(code)`` - - Returns - ------- - The msg_id of the message sent. - """ - if cursor_pos is None: - cursor_pos = len(code) - content = dict(code=code, cursor_pos=cursor_pos) - msg = self.session.msg('complete_request', content) - self._queue_send(msg) - return msg['header']['msg_id'] - - def inspect(self, code, cursor_pos=None, detail_level=0): - """Get metadata information about an object in the kernel's namespace. - - It is up to the kernel to determine the appropriate object to inspect. - - Parameters - ---------- - code : str - The context in which info is requested. - Can be anything between a variable name and an entire cell. - cursor_pos : int, optional - The position of the cursor in the block of code where the info was requested. - Default: ``len(code)`` - detail_level : int, optional - The level of detail for the introspection (0-2) - - Returns - ------- - The msg_id of the message sent. - """ - if cursor_pos is None: - cursor_pos = len(code) - content = dict(code=code, cursor_pos=cursor_pos, - detail_level=detail_level, - ) - msg = self.session.msg('inspect_request', content) - self._queue_send(msg) - return msg['header']['msg_id'] - - def history(self, raw=True, output=False, hist_access_type='range', **kwargs): - """Get entries from the kernel's history list. - - Parameters - ---------- - raw : bool - If True, return the raw input. - output : bool - If True, then return the output as well. - hist_access_type : str - 'range' (fill in session, start and stop params), 'tail' (fill in n) - or 'search' (fill in pattern param). - - session : int - For a range request, the session from which to get lines. Session - numbers are positive integers; negative ones count back from the - current session. - start : int - The first line number of a history range. - stop : int - The final (excluded) line number of a history range. - - n : int - The number of lines of history to get for a tail request. - - pattern : str - The glob-syntax pattern for a search request. - - Returns - ------- - The msg_id of the message sent. - """ - content = dict(raw=raw, output=output, hist_access_type=hist_access_type, - **kwargs) - msg = self.session.msg('history_request', content) - self._queue_send(msg) - return msg['header']['msg_id'] - - def kernel_info(self): - """Request kernel info.""" - msg = self.session.msg('kernel_info_request') - self._queue_send(msg) - return msg['header']['msg_id'] - - def _handle_kernel_info_reply(self, msg): - """handle kernel info reply - - sets protocol adaptation version - """ - adapt_version = int(msg['content']['protocol_version'].split('.')[0]) - if adapt_version != major_protocol_version: - self.session.adapt_version = adapt_version - - def shutdown(self, restart=False): - """Request an immediate kernel shutdown. - - Upon receipt of the (empty) reply, client code can safely assume that - the kernel has shut down and it's safe to forcefully terminate it if - it's still alive. - - The kernel will send the reply via a function registered with Python's - atexit module, ensuring it's truly done as the kernel is done with all - normal operation. - """ - # Send quit message to kernel. Once we implement kernel-side setattr, - # this should probably be done that way, but for now this will do. - msg = self.session.msg('shutdown_request', {'restart':restart}) - self._queue_send(msg) - return msg['header']['msg_id'] - - def is_complete(self, code): - msg = self.session.msg('is_complete_request', {'code': code}) - self._queue_send(msg) - return msg['header']['msg_id'] - - -class IOPubChannel(ZMQSocketChannel): - """The iopub channel which listens for messages that the kernel publishes. - - This channel is where all output is published to frontends. - """ - - def __init__(self, context, session, address): - super(IOPubChannel, self).__init__(context, session, address) - self.ioloop = ioloop.IOLoop() - - def run(self): - """The thread's main activity. Call start() instead.""" - self.socket = self.context.socket(zmq.SUB) - self.socket.linger = 1000 - self.socket.setsockopt(zmq.SUBSCRIBE,b'') - self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) - self.socket.connect(self.address) - self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) - self.stream.on_recv(self._handle_recv) - self._run_loop() - - def call_handlers(self, msg): - """This method is called in the ioloop thread when a message arrives. - - Subclasses should override this method to handle incoming messages. - It is important to remember that this method is called in the thread - so that some logic must be done to ensure that the application leve - handlers are called in the application thread. - """ - raise NotImplementedError('call_handlers must be defined in a subclass.') - - def flush(self, timeout=1.0): - """Immediately processes all pending messages on the iopub channel. - - Callers should use this method to ensure that :meth:`call_handlers` - has been called for all messages that have been received on the - 0MQ SUB socket of this channel. - - This method is thread safe. - - Parameters - ---------- - timeout : float, optional - The maximum amount of time to spend flushing, in seconds. The - default is one second. - """ - # We do the IOLoop callback process twice to ensure that the IOLoop - # gets to perform at least one full poll. - stop_time = time.time() + timeout - for i in range(2): - self._flushed = False - self.ioloop.add_callback(self._flush) - while not self._flushed and time.time() < stop_time: - time.sleep(0.01) - - def _flush(self): - """Callback for :method:`self.flush`.""" - self.stream.flush() - self._flushed = True - - -class StdInChannel(ZMQSocketChannel): - """The stdin channel to handle raw_input requests that the kernel makes.""" - - msg_queue = None - proxy_methods = ['input'] - - def __init__(self, context, session, address): - super(StdInChannel, self).__init__(context, session, address) - self.ioloop = ioloop.IOLoop() - - def run(self): - """The thread's main activity. Call start() instead.""" - self.socket = self.context.socket(zmq.DEALER) - self.socket.linger = 1000 - self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) - self.socket.connect(self.address) - self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) - self.stream.on_recv(self._handle_recv) - self._run_loop() - - def call_handlers(self, msg): - """This method is called in the ioloop thread when a message arrives. - - Subclasses should override this method to handle incoming messages. - It is important to remember that this method is called in the thread - so that some logic must be done to ensure that the application leve - handlers are called in the application thread. - """ - raise NotImplementedError('call_handlers must be defined in a subclass.') - - def input(self, string): - """Send a string of raw input to the kernel.""" - content = dict(value=string) - msg = self.session.msg('input_reply', content) - self._queue_send(msg) - - -class HBChannel(ZMQSocketChannel): - """The heartbeat channel which monitors the kernel heartbeat. - - Note that the heartbeat channel is paused by default. As long as you start - this channel, the kernel manager will ensure that it is paused and un-paused - as appropriate. - """ - - time_to_dead = 3.0 - socket = None - poller = None - _running = None - _pause = None - _beating = None - - def __init__(self, context, session, address): - super(HBChannel, self).__init__(context, session, address) self._running = False - self._pause =True + self._pause = True self.poller = zmq.Poller() + def _notice_exit(self): + self._exiting = True + def _create_socket(self): if self.socket is not None: # close previous socket, before opening a new one @@ -621,7 +178,16 @@ class HBChannel(ZMQSocketChannel): def stop(self): """Stop the channel's event loop and join its thread.""" self._running = False - super(HBChannel, self).stop() + self.join() + self.close() + + def close(self): + if self.socket is not None: + try: + self.socket.close(linger=0) + except Exception: + pass + self.socket = None def call_handlers(self, since_last_heartbeat): """This method is called in the ioloop thread when a message arrives. @@ -631,14 +197,7 @@ class HBChannel(ZMQSocketChannel): so that some logic must be done to ensure that the application level handlers are called in the application thread. """ - raise NotImplementedError('call_handlers must be defined in a subclass.') + pass -#---------------------------------------------------------------------#----------------------------------------------------------------------------- -# ABC Registration -#----------------------------------------------------------------------------- - -ShellChannelABC.register(ShellChannel) -IOPubChannelABC.register(IOPubChannel) HBChannelABC.register(HBChannel) -StdInChannelABC.register(StdInChannel) diff --git a/IPython/kernel/channelsabc.py b/IPython/kernel/channelsabc.py index dadf612..b4f5c3d 100644 --- a/IPython/kernel/channelsabc.py +++ b/IPython/kernel/channelsabc.py @@ -24,70 +24,6 @@ class ChannelABC(with_metaclass(abc.ABCMeta, object)): pass -class ShellChannelABC(ChannelABC): - """ShellChannel ABC. - - The docstrings for this class can be found in the base implementation: - - `IPython.kernel.channels.ShellChannel` - """ - - @abc.abstractproperty - def allow_stdin(self): - pass - - @abc.abstractmethod - def execute(self, code, silent=False, store_history=True, - user_expressions=None, allow_stdin=None): - pass - - @abc.abstractmethod - def complete(self, text, line, cursor_pos, block=None): - pass - - @abc.abstractmethod - def inspect(self, oname, detail_level=0): - pass - - @abc.abstractmethod - def history(self, raw=True, output=False, hist_access_type='range', **kwargs): - pass - - @abc.abstractmethod - def kernel_info(self): - pass - - @abc.abstractmethod - def shutdown(self, restart=False): - pass - - -class IOPubChannelABC(ChannelABC): - """IOPubChannel ABC. - - The docstrings for this class can be found in the base implementation: - - `IPython.kernel.channels.IOPubChannel` - """ - - @abc.abstractmethod - def flush(self, timeout=1.0): - pass - - -class StdInChannelABC(ChannelABC): - """StdInChannel ABC. - - The docstrings for this class can be found in the base implementation: - - `IPython.kernel.channels.StdInChannel` - """ - - @abc.abstractmethod - def input(self, string): - pass - - class HBChannelABC(ChannelABC): """HBChannel ABC. diff --git a/IPython/kernel/client.py b/IPython/kernel/client.py index e7f47cc..3040502 100644 --- a/IPython/kernel/client.py +++ b/IPython/kernel/client.py @@ -4,6 +4,8 @@ # Distributed under the terms of the Modified BSD License. from __future__ import absolute_import +from IPython.kernel.channels import major_protocol_version +from IPython.utils.py3compat import string_types, iteritems import zmq @@ -11,15 +13,25 @@ from IPython.utils.traitlets import ( Any, Instance, Type, ) -from .zmq.session import Session -from .channels import ( - ShellChannel, IOPubChannel, - HBChannel, StdInChannel, -) +from .channelsabc import (ChannelABC, HBChannelABC) from .clientabc import KernelClientABC from .connect import ConnectionFileMixin +# some utilities to validate message structure, these might get moved elsewhere +# if they prove to have more generic utility + +def validate_string_dict(dct): + """Validate that the input is a dict with string keys and values. + + Raises ValueError if not.""" + for k,v in iteritems(dct): + if not isinstance(k, string_types): + raise ValueError('key %r in dict must be a string' % k) + if not isinstance(v, string_types): + raise ValueError('value %r in dict must be a string' % v) + + class KernelClient(ConnectionFileMixin): """Communicates with a single kernel on any host via zmq channels. @@ -42,10 +54,10 @@ class KernelClient(ConnectionFileMixin): return zmq.Context.instance() # The classes to use for the various channels - shell_channel_class = Type(ShellChannel) - iopub_channel_class = Type(IOPubChannel) - stdin_channel_class = Type(StdInChannel) - hb_channel_class = Type(HBChannel) + shell_channel_class = Type(ChannelABC) + iopub_channel_class = Type(ChannelABC) + stdin_channel_class = Type(ChannelABC) + hb_channel_class = Type(HBChannelABC) # Protected traits _shell_channel = Any @@ -53,6 +65,9 @@ class KernelClient(ConnectionFileMixin): _stdin_channel = Any _hb_channel = Any + # flag for whether execute requests should be allowed to call raw_input: + allow_stdin = True + #-------------------------------------------------------------------------- # Channel proxy methods #-------------------------------------------------------------------------- @@ -87,19 +102,14 @@ class KernelClient(ConnectionFileMixin): """ if shell: self.shell_channel.start() - for method in self.shell_channel.proxy_methods: - setattr(self, method, getattr(self.shell_channel, method)) + self.kernel_info() if iopub: self.iopub_channel.start() - for method in self.iopub_channel.proxy_methods: - setattr(self, method, getattr(self.iopub_channel, method)) if stdin: self.stdin_channel.start() - for method in self.stdin_channel.proxy_methods: - setattr(self, method, getattr(self.stdin_channel, method)) - self.shell_channel.allow_stdin = True + self.allow_stdin = True else: - self.shell_channel.allow_stdin = False + self.allow_stdin = False if hb: self.hb_channel.start() @@ -123,14 +133,17 @@ class KernelClient(ConnectionFileMixin): return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or self.stdin_channel.is_alive() or self.hb_channel.is_alive()) + ioloop = None # Overridden in subclasses that use pyzmq event loop + @property def shell_channel(self): """Get the shell channel object for this kernel.""" if self._shell_channel is None: url = self._make_url('shell') self.log.debug("connecting shell channel to %s", url) + socket = self.connect_shell(identity=self.session.bsession) self._shell_channel = self.shell_channel_class( - self.context, self.session, url + socket, self.session, self.ioloop ) return self._shell_channel @@ -140,8 +153,9 @@ class KernelClient(ConnectionFileMixin): if self._iopub_channel is None: url = self._make_url('iopub') self.log.debug("connecting iopub channel to %s", url) + socket = self.connect_iopub() self._iopub_channel = self.iopub_channel_class( - self.context, self.session, url + socket, self.session, self.ioloop ) return self._iopub_channel @@ -151,8 +165,9 @@ class KernelClient(ConnectionFileMixin): if self._stdin_channel is None: url = self._make_url('stdin') self.log.debug("connecting stdin channel to %s", url) + socket = self.connect_stdin(identity=self.session.bsession) self._stdin_channel = self.stdin_channel_class( - self.context, self.session, url + socket, self.session, self.ioloop ) return self._stdin_channel @@ -179,8 +194,193 @@ class KernelClient(ConnectionFileMixin): return True -#----------------------------------------------------------------------------- -# ABC Registration -#----------------------------------------------------------------------------- + # Methods to send specific messages on channels + def execute(self, code, silent=False, store_history=True, + user_expressions=None, allow_stdin=None): + """Execute code in the kernel. + + Parameters + ---------- + code : str + A string of Python code. + + silent : bool, optional (default False) + If set, the kernel will execute the code as quietly possible, and + will force store_history to be False. + + store_history : bool, optional (default True) + If set, the kernel will store command history. This is forced + to be False if silent is True. + + user_expressions : dict, optional + A dict mapping names to expressions to be evaluated in the user's + dict. The expression values are returned as strings formatted using + :func:`repr`. + + allow_stdin : bool, optional (default self.allow_stdin) + Flag for whether the kernel can send stdin requests to frontends. + + Some frontends (e.g. the Notebook) do not support stdin requests. + If raw_input is called from code executed from such a frontend, a + StdinNotImplementedError will be raised. + + Returns + ------- + The msg_id of the message sent. + """ + if user_expressions is None: + user_expressions = {} + if allow_stdin is None: + allow_stdin = self.allow_stdin + + + # Don't waste network traffic if inputs are invalid + if not isinstance(code, string_types): + raise ValueError('code %r must be a string' % code) + validate_string_dict(user_expressions) + + # Create class for content/msg creation. Related to, but possibly + # not in Session. + content = dict(code=code, silent=silent, store_history=store_history, + user_expressions=user_expressions, + allow_stdin=allow_stdin, + ) + msg = self.session.msg('execute_request', content) + self.shell_channel.send(msg) + return msg['header']['msg_id'] + + def complete(self, code, cursor_pos=None): + """Tab complete text in the kernel's namespace. + + Parameters + ---------- + code : str + The context in which completion is requested. + Can be anything between a variable name and an entire cell. + cursor_pos : int, optional + The position of the cursor in the block of code where the completion was requested. + Default: ``len(code)`` + + Returns + ------- + The msg_id of the message sent. + """ + if cursor_pos is None: + cursor_pos = len(code) + content = dict(code=code, cursor_pos=cursor_pos) + msg = self.session.msg('complete_request', content) + self.shell_channel.send(msg) + return msg['header']['msg_id'] + + def inspect(self, code, cursor_pos=None, detail_level=0): + """Get metadata information about an object in the kernel's namespace. + + It is up to the kernel to determine the appropriate object to inspect. + + Parameters + ---------- + code : str + The context in which info is requested. + Can be anything between a variable name and an entire cell. + cursor_pos : int, optional + The position of the cursor in the block of code where the info was requested. + Default: ``len(code)`` + detail_level : int, optional + The level of detail for the introspection (0-2) + + Returns + ------- + The msg_id of the message sent. + """ + if cursor_pos is None: + cursor_pos = len(code) + content = dict(code=code, cursor_pos=cursor_pos, + detail_level=detail_level, + ) + msg = self.session.msg('inspect_request', content) + self.shell_channel.send(msg) + return msg['header']['msg_id'] + + def history(self, raw=True, output=False, hist_access_type='range', **kwargs): + """Get entries from the kernel's history list. + + Parameters + ---------- + raw : bool + If True, return the raw input. + output : bool + If True, then return the output as well. + hist_access_type : str + 'range' (fill in session, start and stop params), 'tail' (fill in n) + or 'search' (fill in pattern param). + + session : int + For a range request, the session from which to get lines. Session + numbers are positive integers; negative ones count back from the + current session. + start : int + The first line number of a history range. + stop : int + The final (excluded) line number of a history range. + + n : int + The number of lines of history to get for a tail request. + + pattern : str + The glob-syntax pattern for a search request. + + Returns + ------- + The msg_id of the message sent. + """ + content = dict(raw=raw, output=output, hist_access_type=hist_access_type, + **kwargs) + msg = self.session.msg('history_request', content) + self.shell_channel.send(msg) + return msg['header']['msg_id'] + + def kernel_info(self): + """Request kernel info.""" + msg = self.session.msg('kernel_info_request') + self.shell_channel.send(msg) + return msg['header']['msg_id'] + + def _handle_kernel_info_reply(self, msg): + """handle kernel info reply + + sets protocol adaptation version + """ + adapt_version = int(msg['content']['protocol_version'].split('.')[0]) + if adapt_version != major_protocol_version: + self.session.adapt_version = adapt_version + + def shutdown(self, restart=False): + """Request an immediate kernel shutdown. + + Upon receipt of the (empty) reply, client code can safely assume that + the kernel has shut down and it's safe to forcefully terminate it if + it's still alive. + + The kernel will send the reply via a function registered with Python's + atexit module, ensuring it's truly done as the kernel is done with all + normal operation. + """ + # Send quit message to kernel. Once we implement kernel-side setattr, + # this should probably be done that way, but for now this will do. + msg = self.session.msg('shutdown_request', {'restart':restart}) + self.shell_channel.send(msg) + return msg['header']['msg_id'] + + def is_complete(self, code): + msg = self.session.msg('is_complete_request', {'code': code}) + self.shell_channel.send(msg) + return msg['header']['msg_id'] + + def input(self, string): + """Send a string of raw input to the kernel.""" + content = dict(value=string) + msg = self.session.msg('input_reply', content) + self.stdin_channel.send(msg) + KernelClientABC.register(KernelClient) diff --git a/IPython/kernel/inprocess/__init__.py b/IPython/kernel/inprocess/__init__.py index 6070a7c..8da7561 100644 --- a/IPython/kernel/inprocess/__init__.py +++ b/IPython/kernel/inprocess/__init__.py @@ -1,7 +1,5 @@ from .channels import ( - InProcessShellChannel, - InProcessIOPubChannel, - InProcessStdInChannel, + InProcessChannel, InProcessHBChannel, ) diff --git a/IPython/kernel/inprocess/blocking.py b/IPython/kernel/inprocess/blocking.py index 91042c9..1d3a010 100644 --- a/IPython/kernel/inprocess/blocking.py +++ b/IPython/kernel/inprocess/blocking.py @@ -9,35 +9,55 @@ Useful for test suites and blocking terminal interfaces. # the file COPYING.txt, distributed as part of this software. #----------------------------------------------------------------------------- -#----------------------------------------------------------------------------- -# Imports -#----------------------------------------------------------------------------- +try: + from queue import Queue, Empty # Py 3 +except ImportError: + from Queue import Queue, Empty # Py 2 # IPython imports from IPython.utils.io import raw_print from IPython.utils.traitlets import Type -from IPython.kernel.blocking.channels import BlockingChannelMixin +#from IPython.kernel.blocking.channels import BlockingChannelMixin # Local imports from .channels import ( - InProcessShellChannel, - InProcessIOPubChannel, - InProcessStdInChannel, + InProcessChannel, ) from .client import InProcessKernelClient -#----------------------------------------------------------------------------- -# Blocking kernel manager -#----------------------------------------------------------------------------- +class BlockingInProcessChannel(InProcessChannel): + + def __init__(self, *args, **kwds): + super(BlockingInProcessChannel, self).__init__(*args, **kwds) + self._in_queue = Queue() -class BlockingInProcessShellChannel(BlockingChannelMixin, InProcessShellChannel): - pass + def call_handlers(self, msg): + self._in_queue.put(msg) + + def get_msg(self, block=True, timeout=None): + """ Gets a message if there is one that is ready. """ + if timeout is None: + # Queue.get(timeout=None) has stupid uninteruptible + # behavior, so wait for a week instead + timeout = 604800 + return self._in_queue.get(block, timeout) + + def get_msgs(self): + """ Get all messages that are currently ready. """ + msgs = [] + while True: + try: + msgs.append(self.get_msg(block=False)) + except Empty: + break + return msgs -class BlockingInProcessIOPubChannel(BlockingChannelMixin, InProcessIOPubChannel): - pass + def msg_ready(self): + """ Is there a message that has been received? """ + return not self._in_queue.empty() -class BlockingInProcessStdInChannel(BlockingChannelMixin, InProcessStdInChannel): +class BlockingInProcessStdInChannel(BlockingInProcessChannel): def call_handlers(self, msg): """ Overridden for the in-process channel. @@ -48,11 +68,27 @@ class BlockingInProcessStdInChannel(BlockingChannelMixin, InProcessStdInChannel) _raw_input = self.client.kernel._sys_raw_input prompt = msg['content']['prompt'] raw_print(prompt, end='') - self.input(_raw_input()) + self.client.input(_raw_input()) class BlockingInProcessKernelClient(InProcessKernelClient): # The classes to use for the various channels. - shell_channel_class = Type(BlockingInProcessShellChannel) - iopub_channel_class = Type(BlockingInProcessIOPubChannel) + shell_channel_class = Type(BlockingInProcessChannel) + iopub_channel_class = Type(BlockingInProcessChannel) stdin_channel_class = Type(BlockingInProcessStdInChannel) + + def wait_for_ready(self): + # Wait for kernel info reply on shell channel + while True: + msg = self.shell_channel.get_msg(block=True) + if msg['msg_type'] == 'kernel_info_reply': + self._handle_kernel_info_reply(msg) + break + + # Flush IOPub channel + while True: + try: + msg = self.iopub_channel.get_msg(block=True, timeout=0.2) + print(msg['msg_type']) + except Empty: + break diff --git a/IPython/kernel/inprocess/channels.py b/IPython/kernel/inprocess/channels.py index 25eb1a0..40b1011 100644 --- a/IPython/kernel/inprocess/channels.py +++ b/IPython/kernel/inprocess/channels.py @@ -3,10 +3,7 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. -from IPython.kernel.channelsabc import ( - ShellChannelABC, IOPubChannelABC, - HBChannelABC, StdInChannelABC, -) +from IPython.kernel.channelsabc import HBChannelABC from .socket import DummySocket @@ -23,10 +20,6 @@ class InProcessChannel(object): self.client = client self._is_alive = False - #-------------------------------------------------------------------------- - # Channel interface - #-------------------------------------------------------------------------- - def is_alive(self): return self._is_alive @@ -43,9 +36,9 @@ class InProcessChannel(object): """ raise NotImplementedError('call_handlers must be defined in a subclass.') - #-------------------------------------------------------------------------- - # InProcessChannel interface - #-------------------------------------------------------------------------- + def flush(self, timeout=1.0): + pass + def call_handlers_later(self, *args, **kwds): """ Call the message handlers later. @@ -65,117 +58,31 @@ class InProcessChannel(object): raise NotImplementedError -class InProcessShellChannel(InProcessChannel): - """See `IPython.kernel.channels.ShellChannel` for docstrings.""" - - # flag for whether execute requests should be allowed to call raw_input - allow_stdin = True - proxy_methods = [ - 'execute', - 'complete', - 'inspect', - 'history', - 'shutdown', - 'kernel_info', - ] - - #-------------------------------------------------------------------------- - # ShellChannel interface - #-------------------------------------------------------------------------- - - def execute(self, code, silent=False, store_history=True, - user_expressions={}, allow_stdin=None): - if allow_stdin is None: - allow_stdin = self.allow_stdin - content = dict(code=code, silent=silent, store_history=store_history, - user_expressions=user_expressions, - allow_stdin=allow_stdin) - msg = self.client.session.msg('execute_request', content) - self._dispatch_to_kernel(msg) - return msg['header']['msg_id'] - - def complete(self, code, cursor_pos=None): - if cursor_pos is None: - cursor_pos = len(code) - content = dict(code=code, cursor_pos=cursor_pos) - msg = self.client.session.msg('complete_request', content) - self._dispatch_to_kernel(msg) - return msg['header']['msg_id'] - - def inspect(self, code, cursor_pos=None, detail_level=0): - if cursor_pos is None: - cursor_pos = len(code) - content = dict(code=code, cursor_pos=cursor_pos, - detail_level=detail_level, - ) - msg = self.client.session.msg('inspect_request', content) - self._dispatch_to_kernel(msg) - return msg['header']['msg_id'] - - def history(self, raw=True, output=False, hist_access_type='range', **kwds): - content = dict(raw=raw, output=output, - hist_access_type=hist_access_type, **kwds) - msg = self.client.session.msg('history_request', content) - self._dispatch_to_kernel(msg) - return msg['header']['msg_id'] - - def shutdown(self, restart=False): - # FIXME: What to do here? - raise NotImplementedError('Cannot shutdown in-process kernel') - - def kernel_info(self): - """Request kernel info.""" - msg = self.client.session.msg('kernel_info_request') - self._dispatch_to_kernel(msg) - return msg['header']['msg_id'] - - #-------------------------------------------------------------------------- - # Protected interface - #-------------------------------------------------------------------------- - - def _dispatch_to_kernel(self, msg): - """ Send a message to the kernel and handle a reply. - """ - kernel = self.client.kernel - if kernel is None: - raise RuntimeError('Cannot send request. No kernel exists.') - - stream = DummySocket() - self.client.session.send(stream, msg) - msg_parts = stream.recv_multipart() - kernel.dispatch_shell(stream, msg_parts) - - idents, reply_msg = self.client.session.recv(stream, copy=False) - self.call_handlers_later(reply_msg) +class InProcessHBChannel(object): + """A dummy heartbeat channel interface for in-process kernels. -class InProcessIOPubChannel(InProcessChannel): - """See `IPython.kernel.channels.IOPubChannel` for docstrings.""" + Normally we use the heartbeat to check that the kernel process is alive. + When the kernel is in-process, that doesn't make sense, but clients still + expect this interface. + """ - def flush(self, timeout=1.0): - pass - - -class InProcessStdInChannel(InProcessChannel): - """See `IPython.kernel.channels.StdInChannel` for docstrings.""" - - proxy_methods = ['input'] - - def input(self, string): - kernel = self.client.kernel - if kernel is None: - raise RuntimeError('Cannot send input reply. No kernel exists.') - kernel.raw_input_str = string + time_to_dead = 3.0 + def __init__(self, client=None): + super(InProcessHBChannel, self).__init__() + self.client = client + self._is_alive = False + self._pause = True -class InProcessHBChannel(InProcessChannel): - """See `IPython.kernel.channels.HBChannel` for docstrings.""" + def is_alive(self): + return self._is_alive - time_to_dead = 3.0 + def start(self): + self._is_alive = True - def __init__(self, *args, **kwds): - super(InProcessHBChannel, self).__init__(*args, **kwds) - self._pause = True + def stop(self): + self._is_alive = False def pause(self): self._pause = True @@ -186,11 +93,5 @@ class InProcessHBChannel(InProcessChannel): def is_beating(self): return not self._pause -#----------------------------------------------------------------------------- -# ABC Registration -#----------------------------------------------------------------------------- -ShellChannelABC.register(InProcessShellChannel) -IOPubChannelABC.register(InProcessIOPubChannel) HBChannelABC.register(InProcessHBChannel) -StdInChannelABC.register(InProcessStdInChannel) diff --git a/IPython/kernel/inprocess/client.py b/IPython/kernel/inprocess/client.py index 6484a47..8b48f3b 100644 --- a/IPython/kernel/inprocess/client.py +++ b/IPython/kernel/inprocess/client.py @@ -12,16 +12,15 @@ #----------------------------------------------------------------------------- # IPython imports +from IPython.kernel.inprocess.socket import DummySocket from IPython.utils.traitlets import Type, Instance from IPython.kernel.clientabc import KernelClientABC from IPython.kernel.client import KernelClient # Local imports from .channels import ( - InProcessShellChannel, - InProcessIOPubChannel, + InProcessChannel, InProcessHBChannel, - InProcessStdInChannel, ) @@ -40,9 +39,9 @@ class InProcessKernelClient(KernelClient): """ # The classes to use for the various channels. - shell_channel_class = Type(InProcessShellChannel) - iopub_channel_class = Type(InProcessIOPubChannel) - stdin_channel_class = Type(InProcessStdInChannel) + shell_channel_class = Type(InProcessChannel) + iopub_channel_class = Type(InProcessChannel) + stdin_channel_class = Type(InProcessChannel) hb_channel_class = Type(InProcessHBChannel) kernel = Instance('IPython.kernel.inprocess.ipkernel.InProcessKernel') @@ -79,6 +78,76 @@ class InProcessKernelClient(KernelClient): self._hb_channel = self.hb_channel_class(self) return self._hb_channel + # Methods for sending specific messages + # ------------------------------------- + + def execute(self, code, silent=False, store_history=True, + user_expressions={}, allow_stdin=None): + if allow_stdin is None: + allow_stdin = self.allow_stdin + content = dict(code=code, silent=silent, store_history=store_history, + user_expressions=user_expressions, + allow_stdin=allow_stdin) + msg = self.session.msg('execute_request', content) + self._dispatch_to_kernel(msg) + return msg['header']['msg_id'] + + def complete(self, code, cursor_pos=None): + if cursor_pos is None: + cursor_pos = len(code) + content = dict(code=code, cursor_pos=cursor_pos) + msg = self.session.msg('complete_request', content) + self._dispatch_to_kernel(msg) + return msg['header']['msg_id'] + + def inspect(self, code, cursor_pos=None, detail_level=0): + if cursor_pos is None: + cursor_pos = len(code) + content = dict(code=code, cursor_pos=cursor_pos, + detail_level=detail_level, + ) + msg = self.session.msg('inspect_request', content) + self._dispatch_to_kernel(msg) + return msg['header']['msg_id'] + + def history(self, raw=True, output=False, hist_access_type='range', **kwds): + content = dict(raw=raw, output=output, + hist_access_type=hist_access_type, **kwds) + msg = self.session.msg('history_request', content) + self._dispatch_to_kernel(msg) + return msg['header']['msg_id'] + + def shutdown(self, restart=False): + # FIXME: What to do here? + raise NotImplementedError('Cannot shutdown in-process kernel') + + def kernel_info(self): + """Request kernel info.""" + msg = self.session.msg('kernel_info_request') + self._dispatch_to_kernel(msg) + return msg['header']['msg_id'] + + def input(self, string): + if self.kernel is None: + raise RuntimeError('Cannot send input reply. No kernel exists.') + self.kernel.raw_input_str = string + + + def _dispatch_to_kernel(self, msg): + """ Send a message to the kernel and handle a reply. + """ + kernel = self.kernel + if kernel is None: + raise RuntimeError('Cannot send request. No kernel exists.') + + stream = DummySocket() + self.session.send(stream, msg) + msg_parts = stream.recv_multipart() + kernel.dispatch_shell(stream, msg_parts) + + idents, reply_msg = self.session.recv(stream, copy=False) + self.shell_channel.call_handlers_later(reply_msg) + #----------------------------------------------------------------------------- # ABC Registration diff --git a/IPython/kernel/inprocess/tests/test_kernel.py b/IPython/kernel/inprocess/tests/test_kernel.py index e32b72e..41d20c8 100644 --- a/IPython/kernel/inprocess/tests/test_kernel.py +++ b/IPython/kernel/inprocess/tests/test_kernel.py @@ -26,6 +26,7 @@ class InProcessKernelTestCase(unittest.TestCase): self.km.start_kernel() self.kc = BlockingInProcessKernelClient(kernel=self.km.kernel) self.kc.start_channels() + self.kc.wait_for_ready() @skipif_not_matplotlib def test_pylab(self): @@ -61,7 +62,7 @@ class InProcessKernelTestCase(unittest.TestCase): kc = BlockingInProcessKernelClient(kernel=kernel) kernel.frontends.append(kc) - kc.shell_channel.execute('print("bar")') + kc.execute('print("bar")') msg = get_stream_message(kc) self.assertEqual(msg['content']['text'], 'bar\n') diff --git a/IPython/kernel/inprocess/tests/test_kernelmanager.py b/IPython/kernel/inprocess/tests/test_kernelmanager.py index 2c029ef..e191356 100644 --- a/IPython/kernel/inprocess/tests/test_kernelmanager.py +++ b/IPython/kernel/inprocess/tests/test_kernelmanager.py @@ -51,6 +51,7 @@ class InProcessKernelManagerTestCase(unittest.TestCase): km.start_kernel() kc = BlockingInProcessKernelClient(kernel=km.kernel) kc.start_channels() + kc.wait_for_ready() kc.execute('foo = 1') self.assertEquals(km.kernel.shell.user_ns['foo'], 1) @@ -61,6 +62,7 @@ class InProcessKernelManagerTestCase(unittest.TestCase): km.start_kernel() kc = BlockingInProcessKernelClient(kernel=km.kernel) kc.start_channels() + kc.wait_for_ready() km.kernel.shell.push({'my_bar': 0, 'my_baz': 1}) kc.complete('my_ba', 5) msg = kc.get_shell_msg() @@ -75,6 +77,7 @@ class InProcessKernelManagerTestCase(unittest.TestCase): km.start_kernel() kc = BlockingInProcessKernelClient(kernel=km.kernel) kc.start_channels() + kc.wait_for_ready() km.kernel.shell.user_ns['foo'] = 1 kc.inspect('foo') msg = kc.get_shell_msg() @@ -91,6 +94,7 @@ class InProcessKernelManagerTestCase(unittest.TestCase): km.start_kernel() kc = BlockingInProcessKernelClient(kernel=km.kernel) kc.start_channels() + kc.wait_for_ready() kc.execute('%who') kc.history(hist_access_type='tail', n=1) msg = kc.shell_channel.get_msgs()[-1] diff --git a/IPython/kernel/manager.py b/IPython/kernel/manager.py index ad55deb..a500814 100644 --- a/IPython/kernel/manager.py +++ b/IPython/kernel/manager.py @@ -420,17 +420,8 @@ def start_new_kernel(startup_timeout=60, kernel_name='python', **kwargs): km.start_kernel(**kwargs) kc = km.client() kc.start_channels() + kc.wait_for_ready() - kc.kernel_info() - kc.get_shell_msg(block=True, timeout=startup_timeout) - - # Flush channels - for channel in (kc.shell_channel, kc.iopub_channel): - while True: - try: - channel.get_msg(block=True, timeout=0.1) - except Empty: - break return km, kc @contextmanager diff --git a/IPython/kernel/zmq/tests/test_embed_kernel.py b/IPython/kernel/zmq/tests/test_embed_kernel.py index 45f543f..977c22c 100644 --- a/IPython/kernel/zmq/tests/test_embed_kernel.py +++ b/IPython/kernel/zmq/tests/test_embed_kernel.py @@ -92,6 +92,7 @@ def setup_kernel(cmd): client = BlockingKernelClient(connection_file=connection_file) client.load_connection_file() client.start_channels() + client.wait_for_ready() try: yield client diff --git a/IPython/nbconvert/preprocessors/execute.py b/IPython/nbconvert/preprocessors/execute.py index 4360b6b..d80dda3 100644 --- a/IPython/nbconvert/preprocessors/execute.py +++ b/IPython/nbconvert/preprocessors/execute.py @@ -46,7 +46,7 @@ class ExecutePreprocessor(Preprocessor): if cell.cell_type != 'code': return cell, resources try: - outputs = self.run_cell(self.kc.shell_channel, self.kc.iopub_channel, cell) + outputs = self.run_cell(cell) except Exception as e: self.log.error("failed to run cell: " + repr(e)) self.log.error(str(cell.source)) @@ -54,13 +54,13 @@ class ExecutePreprocessor(Preprocessor): cell.outputs = outputs return cell, resources - def run_cell(self, shell, iopub, cell): - msg_id = shell.execute(cell.source) + def run_cell(self, cell): + msg_id = self.kc.execute(cell.source) self.log.debug("Executing cell:\n%s", cell.source) # wait for finish, with timeout while True: try: - msg = shell.get_msg(timeout=self.timeout) + msg = self.kc.shell_channel.get_msg(timeout=self.timeout) except Empty: self.log.error("Timeout waiting for execute reply") raise @@ -74,7 +74,7 @@ class ExecutePreprocessor(Preprocessor): while True: try: - msg = iopub.get_msg(timeout=self.timeout) + msg = self.kc.iopub_channel.get_msg(timeout=self.timeout) except Empty: self.log.warn("Timeout waiting for IOPub output") break diff --git a/IPython/qt/client.py b/IPython/qt/client.py index 0aa927a..dc316f9 100644 --- a/IPython/qt/client.py +++ b/IPython/qt/client.py @@ -1,37 +1,249 @@ """ Defines a KernelClient that provides signals and slots. """ +import atexit +import errno +from threading import Thread +import time + +import zmq +# import ZMQError in top-level namespace, to avoid ugly attribute-error messages +# during garbage collection of threads at exit: +from zmq import ZMQError +from zmq.eventloop import ioloop, zmqstream + +from IPython.external.qt import QtCore # Local imports -from IPython.utils.traitlets import Type -from IPython.kernel.channels import ( - ShellChannel, IOPubChannel, StdInChannel, HBChannel -) +from IPython.utils.traitlets import Type, Instance +from IPython.kernel.channels import HBChannel from IPython.kernel import KernelClient -from .kernel_mixins import ( - QtShellChannelMixin, QtIOPubChannelMixin, - QtStdInChannelMixin, QtHBChannelMixin, - QtKernelClientMixin -) +from .kernel_mixins import QtKernelClientMixin +from .util import SuperQObject -class QtShellChannel(QtShellChannelMixin, ShellChannel): - pass +class QtHBChannel(SuperQObject, HBChannel): + # A longer timeout than the base class + time_to_dead = 3.0 -class QtIOPubChannel(QtIOPubChannelMixin, IOPubChannel): - pass + # Emitted when the kernel has died. + kernel_died = QtCore.Signal(object) -class QtStdInChannel(QtStdInChannelMixin, StdInChannel): - pass + def call_handlers(self, since_last_heartbeat): + """ Reimplemented to emit signals instead of making callbacks. + """ + # Emit the generic signal. + self.kernel_died.emit(since_last_heartbeat) + +from IPython.core.release import kernel_protocol_version_info -class QtHBChannel(QtHBChannelMixin, HBChannel): +major_protocol_version = kernel_protocol_version_info[0] + +class InvalidPortNumber(Exception): pass +class QtZMQSocketChannel(SuperQObject): + """A ZMQ socket emitting a Qt signal when a message is received.""" + session = None + socket = None + ioloop = None + stream = None + + message_received = QtCore.Signal(object) + + def process_events(self): + """ Process any pending GUI events. + """ + QtCore.QCoreApplication.instance().processEvents() + + def __init__(self, socket, session, loop): + """Create a channel. + + Parameters + ---------- + socket : :class:`zmq.Socket` + The ZMQ socket to use. + session : :class:`session.Session` + The session to use. + loop + A pyzmq ioloop to connect the socket to using a ZMQStream + """ + super(QtZMQSocketChannel, self).__init__() + + self.socket = socket + self.session = session + self.ioloop = loop + + self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) + self.stream.on_recv(self._handle_recv) + + _is_alive = False + def is_alive(self): + return self._is_alive + + def start(self): + self._is_alive = True + + def stop(self): + self._is_alive = False + + def close(self): + if self.socket is not None: + try: + self.socket.close(linger=0) + except Exception: + pass + self.socket = None + + def send(self, msg): + """Queue a message to be sent from the IOLoop's thread. + + Parameters + ---------- + msg : message to send + + This is threadsafe, as it uses IOLoop.add_callback to give the loop's + thread control of the action. + """ + def thread_send(): + self.session.send(self.stream, msg) + self.ioloop.add_callback(thread_send) + + def _handle_recv(self, msg): + """Callback for stream.on_recv. + + Unpacks message, and calls handlers with it. + """ + ident,smsg = self.session.feed_identities(msg) + msg = self.session.deserialize(smsg) + self.call_handlers(msg) + + def call_handlers(self, msg): + """This method is called in the ioloop thread when a message arrives. + + Subclasses should override this method to handle incoming messages. + It is important to remember that this method is called in the thread + so that some logic must be done to ensure that the application level + handlers are called in the application thread. + """ + # Emit the generic signal. + self.message_received.emit(msg) + + def flush(self, timeout=1.0): + """Immediately processes all pending messages on this channel. + + This is only used for the IOPub channel. + + Callers should use this method to ensure that :meth:`call_handlers` + has been called for all messages that have been received on the + 0MQ SUB socket of this channel. + + This method is thread safe. + + Parameters + ---------- + timeout : float, optional + The maximum amount of time to spend flushing, in seconds. The + default is one second. + """ + # We do the IOLoop callback process twice to ensure that the IOLoop + # gets to perform at least one full poll. + stop_time = time.time() + timeout + for i in range(2): + self._flushed = False + self.ioloop.add_callback(self._flush) + while not self._flushed and time.time() < stop_time: + time.sleep(0.01) + + def _flush(self): + """Callback for :method:`self.flush`.""" + self.stream.flush() + self._flushed = True + + +class IOLoopThread(Thread): + """Run a pyzmq ioloop in a thread to send and receive messages + """ + def __init__(self, loop): + super(IOLoopThread, self).__init__() + self.daemon = True + atexit.register(self._notice_exit) + self.ioloop = loop or ioloop.IOLoop() + + def _notice_exit(self): + self._exiting = True + + def run(self): + """Run my loop, ignoring EINTR events in the poller""" + while True: + try: + self.ioloop.start() + except ZMQError as e: + if e.errno == errno.EINTR: + continue + else: + raise + except Exception: + if self._exiting: + break + else: + raise + else: + break + + def stop(self): + """Stop the channel's event loop and join its thread. + + This calls :meth:`~threading.Thread.join` and returns when the thread + terminates. :class:`RuntimeError` will be raised if + :meth:`~threading.Thread.start` is called again. + """ + if self.ioloop is not None: + self.ioloop.stop() + self.join() + self.close() + + def close(self): + if self.ioloop is not None: + try: + self.ioloop.close(all_fds=True) + except Exception: + pass + + class QtKernelClient(QtKernelClientMixin, KernelClient): """ A KernelClient that provides signals and slots. """ - iopub_channel_class = Type(QtIOPubChannel) - shell_channel_class = Type(QtShellChannel) - stdin_channel_class = Type(QtStdInChannel) + _ioloop = None + @property + def ioloop(self): + if self._ioloop is None: + self._ioloop = ioloop.IOLoop() + return self._ioloop + + ioloop_thread = Instance(IOLoopThread) + + def start_channels(self, shell=True, iopub=True, stdin=True, hb=True): + if shell: + self.shell_channel.message_received.connect(self._check_kernel_info_reply) + + self.ioloop_thread = IOLoopThread(self.ioloop) + self.ioloop_thread.start() + + super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb) + + def _check_kernel_info_reply(self, msg): + if msg['msg_type'] == 'kernel_info_reply': + self._handle_kernel_info_reply(msg) + self.shell_channel.message_received.disconnect(self._check_kernel_info_reply) + + def stop_channels(self): + super(QtKernelClient, self).stop_channels() + if self.ioloop_thread.is_alive(): + self.ioloop_thread.stop() + + iopub_channel_class = Type(QtZMQSocketChannel) + shell_channel_class = Type(QtZMQSocketChannel) + stdin_channel_class = Type(QtZMQSocketChannel) hb_channel_class = Type(QtHBChannel) diff --git a/IPython/qt/console/frontend_widget.py b/IPython/qt/console/frontend_widget.py index 4387450..95a184b 100644 --- a/IPython/qt/console/frontend_widget.py +++ b/IPython/qt/console/frontend_widget.py @@ -82,6 +82,8 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): # The text to show when the kernel is (re)started. banner = Unicode(config=True) kernel_banner = Unicode() + # Whether to show the banner + _display_banner = Bool(False) # An option and corresponding signal for overriding the default kernel # interrupt behavior. @@ -464,7 +466,7 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): self.kernel_client.iopub_channel.flush() def callback(line): - self.kernel_client.stdin_channel.input(line) + self.kernel_client.input(line) if self._reading: self.log.debug("Got second input request, assuming first was interrupted.") self._reading = False diff --git a/IPython/qt/console/history_console_widget.py b/IPython/qt/console/history_console_widget.py index d2c5354..e4af363 100644 --- a/IPython/qt/console/history_console_widget.py +++ b/IPython/qt/console/history_console_widget.py @@ -225,7 +225,7 @@ class HistoryConsoleWidget(ConsoleWidget): return self._history[-n:] def _request_update_session_history_length(self): - msg_id = self.kernel_client.shell_channel.execute('', + msg_id = self.kernel_client.execute('', silent=True, user_expressions={ 'hlen':'len(get_ipython().history_manager.input_hist_raw)', diff --git a/IPython/qt/console/ipython_widget.py b/IPython/qt/console/ipython_widget.py index 6f3da94..c9cbe8c 100644 --- a/IPython/qt/console/ipython_widget.py +++ b/IPython/qt/console/ipython_widget.py @@ -203,7 +203,7 @@ class IPythonWidget(FrontendWidget): self._retrying_history_request = True # wait out the kernel's queue flush, which is currently timed at 0.1s time.sleep(0.25) - self.kernel_client.shell_channel.history(hist_access_type='tail',n=1000) + self.kernel_client.history(hist_access_type='tail',n=1000) else: self._retrying_history_request = False return @@ -296,12 +296,11 @@ class IPythonWidget(FrontendWidget): # The reply will trigger %guiref load provided language=='python' self.kernel_client.kernel_info() - self.kernel_client.shell_channel.history(hist_access_type='tail', - n=1000) + self.kernel_client.history(hist_access_type='tail', n=1000) def _load_guiref_magic(self): """Load %guiref magic.""" - self.kernel_client.shell_channel.execute('\n'.join([ + self.kernel_client.execute('\n'.join([ "try:", " _usage", "except:", @@ -385,7 +384,7 @@ class IPythonWidget(FrontendWidget): """ # If a number was not specified, make a prompt number request. if number is None: - msg_id = self.kernel_client.shell_channel.execute('', silent=True) + msg_id = self.kernel_client.execute('', silent=True) info = self._ExecutionRequest(msg_id, 'prompt') self._request_info['execute'][msg_id] = info return diff --git a/IPython/qt/inprocess.py b/IPython/qt/inprocess.py index 8de6051..e6efafc 100644 --- a/IPython/qt/inprocess.py +++ b/IPython/qt/inprocess.py @@ -2,38 +2,73 @@ """ # Local imports. +from IPython.external.qt import QtCore from IPython.kernel.inprocess import ( - InProcessShellChannel, InProcessIOPubChannel, InProcessStdInChannel, InProcessHBChannel, InProcessKernelClient, InProcessKernelManager, ) +from IPython.kernel.inprocess.channels import InProcessChannel from IPython.utils.traitlets import Type +from .util import SuperQObject from .kernel_mixins import ( - QtShellChannelMixin, QtIOPubChannelMixin, - QtStdInChannelMixin, QtHBChannelMixin, QtKernelClientMixin, - QtKernelManagerMixin, + QtKernelClientMixin, QtKernelManagerMixin, ) +class QtInProcessChannel(SuperQObject, InProcessChannel): + # Emitted when the channel is started. + started = QtCore.Signal() -class QtInProcessShellChannel(QtShellChannelMixin, InProcessShellChannel): - pass + # Emitted when the channel is stopped. + stopped = QtCore.Signal() -class QtInProcessIOPubChannel(QtIOPubChannelMixin, InProcessIOPubChannel): - pass + # Emitted when any message is received. + message_received = QtCore.Signal(object) -class QtInProcessStdInChannel(QtStdInChannelMixin, InProcessStdInChannel): - pass + def start(self): + """ Reimplemented to emit signal. + """ + super(QtInProcessChannel, self).start() + self.started.emit() + + def stop(self): + """ Reimplemented to emit signal. + """ + super(QtInProcessChannel, self).stop() + self.stopped.emit() + + def call_handlers_later(self, *args, **kwds): + """ Call the message handlers later. + """ + do_later = lambda: self.call_handlers(*args, **kwds) + QtCore.QTimer.singleShot(0, do_later) + + def call_handlers(self, msg): + self.message_received.emit(msg) + + def process_events(self): + """ Process any pending GUI events. + """ + QtCore.QCoreApplication.instance().processEvents() + + def flush(self, timeout=1.0): + """ Reimplemented to ensure that signals are dispatched immediately. + """ + super(QtInProcessChannel, self).flush() + self.process_events() + + +class QtInProcessHBChannel(SuperQObject, InProcessHBChannel): + # This signal will never be fired, but it needs to exist + kernel_died = QtCore.Signal() -class QtInProcessHBChannel(QtHBChannelMixin, InProcessHBChannel): - pass class QtInProcessKernelClient(QtKernelClientMixin, InProcessKernelClient): """ An in-process KernelManager with signals and slots. """ - iopub_channel_class = Type(QtInProcessIOPubChannel) - shell_channel_class = Type(QtInProcessShellChannel) - stdin_channel_class = Type(QtInProcessStdInChannel) + iopub_channel_class = Type(QtInProcessChannel) + shell_channel_class = Type(QtInProcessChannel) + stdin_channel_class = Type(QtInProcessChannel) hb_channel_class = Type(QtInProcessHBChannel) class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager): diff --git a/IPython/qt/kernel_mixins.py b/IPython/qt/kernel_mixins.py index 23d432b..9a63806 100644 --- a/IPython/qt/kernel_mixins.py +++ b/IPython/qt/kernel_mixins.py @@ -9,146 +9,6 @@ from IPython.utils.traitlets import HasTraits, Type from .util import MetaQObjectHasTraits, SuperQObject -class ChannelQObject(SuperQObject): - - # Emitted when the channel is started. - started = QtCore.Signal() - - # Emitted when the channel is stopped. - stopped = QtCore.Signal() - - def start(self): - """ Reimplemented to emit signal. - """ - super(ChannelQObject, self).start() - self.started.emit() - - def stop(self): - """ Reimplemented to emit signal. - """ - super(ChannelQObject, self).stop() - self.stopped.emit() - - #--------------------------------------------------------------------------- - # InProcessChannel interface - #--------------------------------------------------------------------------- - - def call_handlers_later(self, *args, **kwds): - """ Call the message handlers later. - """ - do_later = lambda: self.call_handlers(*args, **kwds) - QtCore.QTimer.singleShot(0, do_later) - - def process_events(self): - """ Process any pending GUI events. - """ - QtCore.QCoreApplication.instance().processEvents() - - -class QtShellChannelMixin(ChannelQObject): - - # Emitted when any message is received. - message_received = QtCore.Signal(object) - - # Emitted when a reply has been received for the corresponding request type. - execute_reply = QtCore.Signal(object) - complete_reply = QtCore.Signal(object) - inspect_reply = QtCore.Signal(object) - history_reply = QtCore.Signal(object) - kernel_info_reply = QtCore.Signal(object) - - def call_handlers(self, msg): - """ Reimplemented to emit signals instead of making callbacks. - """ - # Emit the generic signal. - self.message_received.emit(msg) - - # Emit signals for specialized message types. - msg_type = msg['header']['msg_type'] - if msg_type == 'kernel_info_reply': - self._handle_kernel_info_reply(msg) - - signal = getattr(self, msg_type, None) - if signal: - signal.emit(msg) - - -class QtIOPubChannelMixin(ChannelQObject): - - # Emitted when any message is received. - message_received = QtCore.Signal(object) - - # Emitted when a message of type 'stream' is received. - stream_received = QtCore.Signal(object) - - # Emitted when a message of type 'execute_input' is received. - execute_input_received = QtCore.Signal(object) - - # Emitted when a message of type 'execute_result' is received. - execute_result_received = QtCore.Signal(object) - - # Emitted when a message of type 'error' is received. - error_received = QtCore.Signal(object) - - # Emitted when a message of type 'display_data' is received - display_data_received = QtCore.Signal(object) - - # Emitted when a crash report message is received from the kernel's - # last-resort sys.excepthook. - crash_received = QtCore.Signal(object) - - # Emitted when a shutdown is noticed. - shutdown_reply_received = QtCore.Signal(object) - - def call_handlers(self, msg): - """ Reimplemented to emit signals instead of making callbacks. - """ - # Emit the generic signal. - self.message_received.emit(msg) - # Emit signals for specialized message types. - msg_type = msg['header']['msg_type'] - signal = getattr(self, msg_type + '_received', None) - if signal: - signal.emit(msg) - - def flush(self): - """ Reimplemented to ensure that signals are dispatched immediately. - """ - super(QtIOPubChannelMixin, self).flush() - QtCore.QCoreApplication.instance().processEvents() - - -class QtStdInChannelMixin(ChannelQObject): - - # Emitted when any message is received. - message_received = QtCore.Signal(object) - - # Emitted when an input request is received. - input_requested = QtCore.Signal(object) - - def call_handlers(self, msg): - """ Reimplemented to emit signals instead of making callbacks. - """ - # Emit the generic signal. - self.message_received.emit(msg) - - # Emit signals for specialized message types. - msg_type = msg['header']['msg_type'] - if msg_type == 'input_request': - self.input_requested.emit(msg) - - -class QtHBChannelMixin(ChannelQObject): - - # Emitted when the kernel has died. - kernel_died = QtCore.Signal(object) - - def call_handlers(self, since_last_heartbeat): - """ Reimplemented to emit signals instead of making callbacks. - """ - self.kernel_died.emit(since_last_heartbeat) - - class QtKernelRestarterMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})): _timer = None @@ -171,12 +31,6 @@ class QtKernelClientMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObje # Emitted when the kernel client has stopped listening. stopped_channels = QtCore.Signal() - # Use Qt-specific channel classes that emit signals. - iopub_channel_class = Type(QtIOPubChannelMixin) - shell_channel_class = Type(QtShellChannelMixin) - stdin_channel_class = Type(QtStdInChannelMixin) - hb_channel_class = Type(QtHBChannelMixin) - #--------------------------------------------------------------------------- # 'KernelClient' interface #--------------------------------------------------------------------------- diff --git a/IPython/terminal/console/completer.py b/IPython/terminal/console/completer.py index a85b6e5..40c2644 100644 --- a/IPython/terminal/console/completer.py +++ b/IPython/terminal/console/completer.py @@ -36,7 +36,7 @@ class ZMQCompleter(IPCompleter): # send completion request to kernel # Give the kernel up to 0.5s to respond - msg_id = self.client.shell_channel.complete( + msg_id = self.client.complete( code=line, cursor_pos=cursor_pos, ) diff --git a/IPython/terminal/console/interactiveshell.py b/IPython/terminal/console/interactiveshell.py index 1daba8e..ef97c87 100644 --- a/IPython/terminal/console/interactiveshell.py +++ b/IPython/terminal/console/interactiveshell.py @@ -157,8 +157,8 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell): # flush stale replies, which could have been ignored, due to missed heartbeats while self.client.shell_channel.msg_ready(): self.client.shell_channel.get_msg() - # shell_channel.execute takes 'hidden', which is the inverse of store_hist - msg_id = self.client.shell_channel.execute(cell, not store_history) + # execute takes 'hidden', which is the inverse of store_hist + msg_id = self.client.execute(cell, not store_history) # first thing is wait for any side effects (output, stdin, etc.) self._executing = True @@ -399,7 +399,7 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell): # only send stdin reply if there *was not* another request # or execution finished while we were reading. if not (self.client.stdin_channel.msg_ready() or self.client.shell_channel.msg_ready()): - self.client.stdin_channel.input(raw_data) + self.client.input(raw_data) def mainloop(self, display_banner=False): while True: @@ -414,7 +414,7 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell): # handling seems rather unpredictable... self.write("\nKeyboardInterrupt in interact()\n") - self.client.shell_channel.shutdown() + self.client.shutdown() def _banner1_default(self): return "IPython Console {version}\n".format(version=release.version)