diff --git a/IPython/kernel/inprocess/client.py b/IPython/kernel/inprocess/client.py index 2ec276e..6484a47 100644 --- a/IPython/kernel/inprocess/client.py +++ b/IPython/kernel/inprocess/client.py @@ -45,7 +45,7 @@ class InProcessKernelClient(KernelClient): stdin_channel_class = Type(InProcessStdInChannel) hb_channel_class = Type(InProcessHBChannel) - kernel = Instance('IPython.kernel.inprocess.ipkernel.Kernel') + kernel = Instance('IPython.kernel.inprocess.ipkernel.InProcessKernel') #-------------------------------------------------------------------------- # Channel management methods diff --git a/IPython/kernel/inprocess/ipkernel.py b/IPython/kernel/inprocess/ipkernel.py index 2b18e01..8086afa 100644 --- a/IPython/kernel/inprocess/ipkernel.py +++ b/IPython/kernel/inprocess/ipkernel.py @@ -10,7 +10,7 @@ import sys from IPython.core.interactiveshell import InteractiveShellABC from IPython.utils.jsonutil import json_clean from IPython.utils.traitlets import Any, Enum, Instance, List, Type -from IPython.kernel.zmq.ipkernel import Kernel +from IPython.kernel.zmq.ipkernel import IPythonKernel from IPython.kernel.zmq.zmqshell import ZMQInteractiveShell from .socket import DummySocket @@ -19,7 +19,7 @@ from .socket import DummySocket # Main kernel class #----------------------------------------------------------------------------- -class InProcessKernel(Kernel): +class InProcessKernel(IPythonKernel): #------------------------------------------------------------------------- # InProcessKernel interface diff --git a/IPython/kernel/zmq/ipkernel.py b/IPython/kernel/zmq/ipkernel.py old mode 100755 new mode 100644 index 09ce934..93915b5 --- a/IPython/kernel/zmq/ipkernel.py +++ b/IPython/kernel/zmq/ipkernel.py @@ -1,143 +1,41 @@ -"""An interactive kernel that talks to frontends over 0MQ.""" - -# Copyright (c) IPython Development Team. -# Distributed under the terms of the Modified BSD License. 
- -from __future__ import print_function +"""The IPython kernel implementation""" import getpass import sys -import time import traceback -import logging -import uuid - -from datetime import datetime -from signal import ( - signal, default_int_handler, SIGINT -) -import zmq -from zmq.eventloop import ioloop -from zmq.eventloop.zmqstream import ZMQStream - -from IPython.config.configurable import Configurable -from IPython.core.error import StdinNotImplementedError from IPython.core import release -from IPython.utils import py3compat -from IPython.utils.py3compat import builtin_mod, unicode_type, string_types -from IPython.utils.jsonutil import json_clean +from IPython.utils.py3compat import builtin_mod, PY3 from IPython.utils.tokenutil import token_at_cursor -from IPython.utils.traitlets import ( - Any, Instance, Float, Dict, List, Set, Integer, Unicode, - Type, Bool, -) +from IPython.utils.traitlets import Instance, Type, Any +from IPython.utils.decorators import undoc +from .kernelbase import Kernel as KernelBase from .serialize import serialize_object, unpack_apply_message -from .session import Session from .zmqshell import ZMQInteractiveShell - -#----------------------------------------------------------------------------- -# Main kernel class -#----------------------------------------------------------------------------- - -protocol_version = release.kernel_protocol_version -ipython_version = release.version -language_version = sys.version.split()[0] - - -class Kernel(Configurable): - - #--------------------------------------------------------------------------- - # Kernel interface - #--------------------------------------------------------------------------- - - # attribute to override with a GUI - eventloop = Any(None) - def _eventloop_changed(self, name, old, new): - """schedule call to eventloop from IOLoop""" - loop = ioloop.IOLoop.instance() - loop.add_callback(self.enter_eventloop) - +class IPythonKernel(KernelBase): shell = 
Instance('IPython.core.interactiveshell.InteractiveShellABC') shell_class = Type(ZMQInteractiveShell) - session = Instance(Session) - profile_dir = Instance('IPython.core.profiledir.ProfileDir') - shell_streams = List() - control_stream = Instance(ZMQStream) - iopub_socket = Instance(zmq.Socket) - stdin_socket = Instance(zmq.Socket) - log = Instance(logging.Logger) - user_module = Any() def _user_module_changed(self, name, old, new): if self.shell is not None: self.shell.user_module = new - + user_ns = Instance(dict, args=None, allow_none=True) def _user_ns_changed(self, name, old, new): if self.shell is not None: self.shell.user_ns = new self.shell.init_user_ns() - # identities: - int_id = Integer(-1) - ident = Unicode() - - def _ident_default(self): - return unicode_type(uuid.uuid4()) - - # Private interface - - _darwin_app_nap = Bool(True, config=True, - help="""Whether to use appnope for compatiblity with OS X App Nap. - - Only affects OS X >= 10.9. - """ - ) - - # track associations with current request - _allow_stdin = Bool(False) - _parent_header = Dict() - _parent_ident = Any(b'') - # Time to sleep after flushing the stdout/err buffers in each execute - # cycle. While this introduces a hard limit on the minimal latency of the - # execute cycle, it helps prevent output synchronization problems for - # clients. - # Units are in seconds. The minimum zmq latency on local host is probably - # ~150 microseconds, set this to 500us for now. We may need to increase it - # a little if it's not enough after more interactive testing. - _execute_sleep = Float(0.0005, config=True) - - # Frequency of the kernel's event loop. - # Units are in seconds, kernel subclasses for GUI toolkits may need to - # adapt to milliseconds. - _poll_interval = Float(0.05, config=True) - - # If the shutdown was requested over the network, we leave here the - # necessary reply message so it can be sent by our registered atexit - # handler. 
This ensures that the reply is only sent to clients truly at - # the end of our shutdown process (which happens after the underlying - # IPython shell's own shutdown). - _shutdown_message = None - - # This is a dict of port number that the kernel is listening on. It is set - # by record_ports and used by connect_request. - _recorded_ports = Dict() - # A reference to the Python builtin 'raw_input' function. # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3) _sys_raw_input = Any() _sys_eval_input = Any() - # set of aborted msg_ids - aborted = Set() - - def __init__(self, **kwargs): - super(Kernel, self).__init__(**kwargs) + super(IPythonKernel, self).__init__(**kwargs) # Initialize the InteractiveShell subclass self.shell = self.shell_class.instance(parent=self, @@ -157,197 +55,39 @@ class Kernel(Configurable): # TMP - hack while developing self.shell._reply_content = None - # Build dict of handlers for message types - msg_types = [ 'execute_request', 'complete_request', - 'inspect_request', 'history_request', - 'kernel_info_request', - 'connect_request', 'shutdown_request', - 'apply_request', - ] - self.shell_handlers = {} - for msg_type in msg_types: - self.shell_handlers[msg_type] = getattr(self, msg_type) - comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ] comm_manager = self.shell.comm_manager for msg_type in comm_msg_types: self.shell_handlers[msg_type] = getattr(comm_manager, msg_type) - - control_msg_types = msg_types + [ 'clear_request', 'abort_request' ] - self.control_handlers = {} - for msg_type in control_msg_types: - self.control_handlers[msg_type] = getattr(self, msg_type) - - - def dispatch_control(self, msg): - """dispatch control requests""" - idents,msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.unserialize(msg, content=True, copy=False) - except: - self.log.error("Invalid Control Message", exc_info=True) - return - - self.log.debug("Control received: %s", msg) - - header = 
msg['header'] - msg_id = header['msg_id'] - msg_type = header['msg_type'] - handler = self.control_handlers.get(msg_type, None) - if handler is None: - self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type) - else: - try: - handler(self.control_stream, idents, msg) - except Exception: - self.log.error("Exception in control handler:", exc_info=True) - - def dispatch_shell(self, stream, msg): - """dispatch shell requests""" - # flush control requests first - if self.control_stream: - self.control_stream.flush() - - idents,msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.unserialize(msg, content=True, copy=False) - except: - self.log.error("Invalid Message", exc_info=True) - return - - header = msg['header'] - msg_id = header['msg_id'] - msg_type = msg['header']['msg_type'] - - # Print some info about this message and leave a '--->' marker, so it's - # easier to trace visually the message chain when debugging. Each - # handler prints its message at the end. - self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type) - self.log.debug(' Content: %s\n --->\n ', msg['content']) - - if msg_id in self.aborted: - self.aborted.remove(msg_id) - # is it safe to assume a msg_id will not be resubmitted? 
- reply_type = msg_type.split('_')[0] + '_reply' - status = {'status' : 'aborted'} - md = {'engine' : self.ident} - md.update(status) - reply_msg = self.session.send(stream, reply_type, metadata=md, - content=status, parent=msg, ident=idents) - return - - handler = self.shell_handlers.get(msg_type, None) - if handler is None: - self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type) - else: - # ensure default_int_handler during handler call - sig = signal(SIGINT, default_int_handler) - self.log.debug("%s: %s", msg_type, msg) - try: - handler(stream, idents, msg) - except Exception: - self.log.error("Exception in message handler:", exc_info=True) - finally: - signal(SIGINT, sig) - - def enter_eventloop(self): - """enter eventloop""" - self.log.info("entering eventloop %s", self.eventloop) - for stream in self.shell_streams: - # flush any pending replies, - # which may be skipped by entering the eventloop - stream.flush(zmq.POLLOUT) - # restore default_int_handler - signal(SIGINT, default_int_handler) - while self.eventloop is not None: - try: - self.eventloop(self) - except KeyboardInterrupt: - # Ctrl-C shouldn't crash the kernel - self.log.error("KeyboardInterrupt caught in kernel") - continue - else: - # eventloop exited cleanly, this means we should stop (right?) 
- self.eventloop = None - break - self.log.info("exiting eventloop") + # Kernel info fields + implementation = 'ipython' + implementation_version = release.version + language = 'python' + language_version = sys.version.split()[0] + @property + def banner(self): + return self.shell.banner def start(self): - """register dispatchers for streams""" self.shell.exit_now = False - if self.control_stream: - self.control_stream.on_recv(self.dispatch_control, copy=False) - - def make_dispatcher(stream): - def dispatcher(msg): - return self.dispatch_shell(stream, msg) - return dispatcher - - for s in self.shell_streams: - s.on_recv(make_dispatcher(s), copy=False) - - # publish idle status - self._publish_status('starting') - - def do_one_iteration(self): - """step eventloop just once""" - if self.control_stream: - self.control_stream.flush() - for stream in self.shell_streams: - # handle at most one request per iteration - stream.flush(zmq.POLLIN, 1) - stream.flush(zmq.POLLOUT) - - - def record_ports(self, ports): - """Record the ports that this kernel is using. - - The creator of the Kernel instance must call this methods if they - want the :meth:`connect_request` method to return the port numbers. + super(IPythonKernel, self).start() + + def set_parent(self, ident, parent): + """Overridden from parent to tell the display hook and output streams + about the parent message. 
""" - self._recorded_ports = ports - - #--------------------------------------------------------------------------- - # Kernel request handlers - #--------------------------------------------------------------------------- - - def _make_metadata(self, other=None): - """init metadata dict, for execute/apply_reply""" - new_md = { - 'dependencies_met' : True, - 'engine' : self.ident, - 'started': datetime.now(), - } - if other: - new_md.update(other) - return new_md - - def _publish_execute_input(self, code, parent, execution_count): - """Publish the code request on the iopub stream.""" - - self.session.send(self.iopub_socket, u'execute_input', - {u'code':code, u'execution_count': execution_count}, - parent=parent, ident=self._topic('execute_input') - ) - - def _publish_status(self, status, parent=None): - """send status (busy/idle) on IOPub""" - self.session.send(self.iopub_socket, - u'status', - {u'execution_state': status}, - parent=parent, - ident=self._topic('status'), - ) - + super(IPythonKernel, self).set_parent(ident, parent) + self.shell.set_parent(parent) + def _forward_input(self, allow_stdin=False): """Forward raw_input and getpass to the current frontend. 
+ via input_request """ self._allow_stdin = allow_stdin - - if py3compat.PY3: + + if PY3: self._sys_raw_input = builtin_mod.input builtin_mod.input = self.raw_input else: @@ -357,57 +97,32 @@ class Kernel(Configurable): builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt)) self._save_getpass = getpass.getpass getpass.getpass = self.getpass - + def _restore_input(self): """Restore raw_input, getpass""" - if py3compat.PY3: + if PY3: builtin_mod.input = self._sys_raw_input else: builtin_mod.raw_input = self._sys_raw_input builtin_mod.input = self._sys_eval_input - + getpass.getpass = self._save_getpass - - def set_parent(self, ident, parent): - """Set the current parent_header - - Side effects (IOPub messages) and replies are associated with - the request that caused them via the parent_header. - - The parent identity is used to route input_request messages - on the stdin channel. - """ - self._parent_ident = ident - self._parent_header = parent - self.shell.set_parent(parent) - - def execute_request(self, stream, ident, parent): - """handle an execute_request""" - - self._publish_status(u'busy', parent) - - try: - content = parent[u'content'] - code = py3compat.cast_unicode_py2(content[u'code']) - silent = content[u'silent'] - store_history = content.get(u'store_history', not silent) - except: - self.log.error("Got bad msg: ") - self.log.error("%s", parent) - return - - md = self._make_metadata(parent['metadata']) - + + @property + def execution_count(self): + return self.shell.execution_count + + @execution_count.setter + def execution_count(self, value): + # Ignore the incrementing done by KernelBase, in favour of our shell's + # execution counter. + pass + + def do_execute(self, code, silent, store_history=True, + user_expressions=None, allow_stdin=False): shell = self.shell # we'll need this a lot here - - self._forward_input(content.get('allow_stdin', False)) - # Set the parent message of the display hook and out streams.
- self.set_parent(ident, parent) - - # Re-broadcast our input for the benefit of listening clients, and - # start computing output - if not silent: - self._publish_execute_input(code, parent, shell.execution_count) + + self._forward_input(allow_stdin) reply_content = {} # FIXME: the shell calls the exception handler itself. @@ -443,16 +158,16 @@ class Kernel(Configurable): reply_content['engine_info'] = e_info # reset after use shell._reply_content = None - + if 'traceback' in reply_content: self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback'])) - + # At this point, we can tell whether the main code execution succeeded # or not. If it did, we proceed to evaluate user_expressions if reply_content['status'] == 'ok': reply_content[u'user_expressions'] = \ - shell.user_expressions(content.get(u'user_expressions', {})) + shell.user_expressions(user_expressions or {}) else: # If there was an error, don't even try to compute expressions reply_content[u'user_expressions'] = {} @@ -465,56 +180,20 @@ class Kernel(Configurable): # it to sit in memory until the next execute_request comes in. shell.payload_manager.clear_payload() - # Flush output before sending the reply. - sys.stdout.flush() - sys.stderr.flush() - # FIXME: on rare occasions, the flush doesn't seem to make it to the - # clients... This seems to mitigate the problem, but we definitely need - # to better understand what's going on. - if self._execute_sleep: - time.sleep(self._execute_sleep) - - # Send the reply. 
- reply_content = json_clean(reply_content) - - md['status'] = reply_content['status'] - if reply_content['status'] == 'error' and \ - reply_content['ename'] == 'UnmetDependency': - md['dependencies_met'] = False - - reply_msg = self.session.send(stream, u'execute_reply', - reply_content, parent, metadata=md, - ident=ident) - - self.log.debug("%s", reply_msg) - - if not silent and reply_msg['content']['status'] == u'error': - self._abort_queues() - - self._publish_status(u'idle', parent) - - def complete_request(self, stream, ident, parent): - content = parent['content'] - code = content['code'] - cursor_pos = content['cursor_pos'] - + return reply_content + + def do_complete(self, code, cursor_pos): txt, matches = self.shell.complete('', code, cursor_pos) - matches = {'matches' : matches, - 'cursor_end' : cursor_pos, - 'cursor_start' : cursor_pos - len(txt), - 'metadata' : {}, - 'status' : 'ok'} - matches = json_clean(matches) - completion_msg = self.session.send(stream, 'complete_reply', - matches, parent, ident) - self.log.debug("%s", completion_msg) - - def inspect_request(self, stream, ident, parent): - content = parent['content'] - - name = token_at_cursor(content['code'], content['cursor_pos']) + return {'matches' : matches, + 'cursor_end' : cursor_pos, + 'cursor_start' : cursor_pos - len(txt), + 'metadata' : {}, + 'status' : 'ok'} + + def do_inspect(self, code, cursor_pos, detail_level=0): + name = token_at_cursor(code, cursor_pos) info = self.shell.object_inspect(name) - + reply_content = {'status' : 'ok'} reply_content['data'] = data = {} reply_content['metadata'] = {} @@ -522,106 +201,36 @@ class Kernel(Configurable): if info['found']: info_text = self.shell.object_inspect_text( name, - detail_level=content.get('detail_level', 0), + detail_level=detail_level, ) - reply_content['data']['text/plain'] = info_text - # Before we send this object over, we scrub it for JSON usage - reply_content = json_clean(reply_content) - msg = self.session.send(stream, 
'inspect_reply', - reply_content, parent, ident) - self.log.debug("%s", msg) - - def history_request(self, stream, ident, parent): - # We need to pull these out, as passing **kwargs doesn't work with - # unicode keys before Python 2.6.5. - hist_access_type = parent['content']['hist_access_type'] - raw = parent['content']['raw'] - output = parent['content']['output'] + data['text/plain'] = info_text + + return reply_content + + def do_history(self, hist_access_type, output, raw, session=None, start=None, + stop=None, n=None, pattern=None, unique=False): if hist_access_type == 'tail': - n = parent['content']['n'] hist = self.shell.history_manager.get_tail(n, raw=raw, output=output, include_latest=True) elif hist_access_type == 'range': - session = parent['content']['session'] - start = parent['content']['start'] - stop = parent['content']['stop'] hist = self.shell.history_manager.get_range(session, start, stop, raw=raw, output=output) elif hist_access_type == 'search': - n = parent['content'].get('n') - unique = parent['content'].get('unique', False) - pattern = parent['content']['pattern'] hist = self.shell.history_manager.search( pattern, raw=raw, output=output, n=n, unique=unique) - else: hist = [] - hist = list(hist) - content = {'history' : hist} - content = json_clean(content) - msg = self.session.send(stream, 'history_reply', - content, parent, ident) - self.log.debug("Sending history reply with %i entries", len(hist)) - - def connect_request(self, stream, ident, parent): - if self._recorded_ports is not None: - content = self._recorded_ports.copy() - else: - content = {} - msg = self.session.send(stream, 'connect_reply', - content, parent, ident) - self.log.debug("%s", msg) - - def kernel_info_request(self, stream, ident, parent): - vinfo = { - 'protocol_version': protocol_version, - 'implementation': 'ipython', - 'implementation_version': ipython_version, - 'language_version': language_version, - 'language': 'python', - 'banner': self.shell.banner, - } - msg 
= self.session.send(stream, 'kernel_info_reply', - vinfo, parent, ident) - self.log.debug("%s", msg) - - def shutdown_request(self, stream, ident, parent): - self.shell.exit_now = True - content = dict(status='ok') - content.update(parent['content']) - self.session.send(stream, u'shutdown_reply', content, parent, ident=ident) - # same content, but different msg_id for broadcasting on IOPub - self._shutdown_message = self.session.msg(u'shutdown_reply', - content, parent - ) - - self._at_shutdown() - # call sys.exit after a short delay - loop = ioloop.IOLoop.instance() - loop.add_timeout(time.time()+0.1, loop.stop) - #--------------------------------------------------------------------------- - # Engine methods - #--------------------------------------------------------------------------- + return {'history' : list(hist)} - def apply_request(self, stream, ident, parent): - try: - content = parent[u'content'] - bufs = parent[u'buffers'] - msg_id = parent['header']['msg_id'] - except: - self.log.error("Got bad msg: %s", parent, exc_info=True) - return - - self._publish_status(u'busy', parent) + def do_shutdown(self, restart): + self.shell.exit_now = True + return dict(status='ok', restart=restart) - # Set the parent message of the display hook and out streams. 
+ def do_apply(self, content, bufs, msg_id, reply_metadata): shell = self.shell - shell.set_parent(parent) - - md = self._make_metadata(parent['metadata']) try: working = shell.user_ns @@ -651,7 +260,7 @@ class Kernel(Configurable): buffer_threshold=self.session.buffer_threshold, item_threshold=self.session.item_threshold, ) - + except: # invoke IPython traceback formatting shell.showtraceback() @@ -664,191 +273,30 @@ class Kernel(Configurable): reply_content['engine_info'] = e_info # reset after use shell._reply_content = None - - self.session.send(self.iopub_socket, u'error', reply_content, parent=parent, + + self.send_response(self.iopub_socket, u'error', reply_content, ident=self._topic('error')) self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback'])) result_buf = [] if reply_content['ename'] == 'UnmetDependency': - md['dependencies_met'] = False + reply_metadata['dependencies_met'] = False else: reply_content = {'status' : 'ok'} - # put 'ok'/'error' status in header, for scheduler introspection: - md['status'] = reply_content['status'] - - # flush i/o - sys.stdout.flush() - sys.stderr.flush() - - reply_msg = self.session.send(stream, u'apply_reply', reply_content, - parent=parent, ident=ident,buffers=result_buf, metadata=md) - - self._publish_status(u'idle', parent) - - #--------------------------------------------------------------------------- - # Control messages - #--------------------------------------------------------------------------- - - def abort_request(self, stream, ident, parent): - """abort a specifig msg by id""" - msg_ids = parent['content'].get('msg_ids', None) - if isinstance(msg_ids, string_types): - msg_ids = [msg_ids] - if not msg_ids: - self.abort_queues() - for mid in msg_ids: - self.aborted.add(str(mid)) - - content = dict(status='ok') - reply_msg = self.session.send(stream, 'abort_reply', content=content, - parent=parent, ident=ident) - self.log.debug("%s", reply_msg) - - def clear_request(self, stream, 
idents, parent): - """Clear our namespace.""" + return reply_content, result_buf + + def do_clear(self): self.shell.reset(False) - msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, - content = dict(status='ok')) - - - #--------------------------------------------------------------------------- - # Protected interface - #--------------------------------------------------------------------------- - - def _wrap_exception(self, method=None): - # import here, because _wrap_exception is only used in parallel, - # and parallel has higher min pyzmq version - from IPython.parallel.error import wrap_exception - e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method) - content = wrap_exception(e_info) - return content - - def _topic(self, topic): - """prefixed topic for IOPub messages""" - if self.int_id >= 0: - base = "engine.%i" % self.int_id - else: - base = "kernel.%s" % self.ident - - return py3compat.cast_bytes("%s.%s" % (base, topic)) - - def _abort_queues(self): - for stream in self.shell_streams: - if stream: - self._abort_queue(stream) - - def _abort_queue(self, stream): - poller = zmq.Poller() - poller.register(stream.socket, zmq.POLLIN) - while True: - idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True) - if msg is None: - return - - self.log.info("Aborting:") - self.log.info("%s", msg) - msg_type = msg['header']['msg_type'] - reply_type = msg_type.split('_')[0] + '_reply' - - status = {'status' : 'aborted'} - md = {'engine' : self.ident} - md.update(status) - reply_msg = self.session.send(stream, reply_type, metadata=md, - content=status, parent=msg, ident=idents) - self.log.debug("%s", reply_msg) - # We need to wait a bit for requests to come in. This can probably - # be set shorter for true asynchronous clients. 
- poller.poll(50) - - - def _no_raw_input(self): - """Raise StdinNotImplentedError if active frontend doesn't support - stdin.""" - raise StdinNotImplementedError("raw_input was called, but this " - "frontend does not support stdin.") - - def getpass(self, prompt=''): - """Forward getpass to frontends - - Raises - ------ - StdinNotImplentedError if active frontend doesn't support stdin. - """ - if not self._allow_stdin: - raise StdinNotImplementedError( - "getpass was called, but this frontend does not support input requests." - ) - return self._input_request(prompt, - self._parent_ident, - self._parent_header, - password=True, - ) - - def raw_input(self, prompt=''): - """Forward raw_input to frontends - - Raises - ------ - StdinNotImplentedError if active frontend doesn't support stdin. - """ - if not self._allow_stdin: - raise StdinNotImplementedError( - "raw_input was called, but this frontend does not support input requests." - ) - return self._input_request(prompt, - self._parent_ident, - self._parent_header, - password=False, - ) - - def _input_request(self, prompt, ident, parent, password=False): - # Flush output before making the request. - sys.stderr.flush() - sys.stdout.flush() - # flush the stdin socket, to purge stale replies - while True: - try: - self.stdin_socket.recv_multipart(zmq.NOBLOCK) - except zmq.ZMQError as e: - if e.errno == zmq.EAGAIN: - break - else: - raise - - # Send the input request. - content = json_clean(dict(prompt=prompt, password=password)) - self.session.send(self.stdin_socket, u'input_request', content, parent, - ident=ident) - - # Await a response. 
- while True: - try: - ident, reply = self.session.recv(self.stdin_socket, 0) - except Exception: - self.log.warn("Invalid Message:", exc_info=True) - except KeyboardInterrupt: - # re-raise KeyboardInterrupt, to truncate traceback - raise KeyboardInterrupt - else: - break - try: - value = py3compat.unicode_to_str(reply['content']['value']) - except: - self.log.error("Bad input_reply: %s", parent) - value = '' - if value == '\x04': - # EOF - raise EOFError - return value - - def _at_shutdown(self): - """Actions taken at shutdown by the kernel, called by python's atexit. - """ - # io.rprint("Kernel at_shutdown") # dbg - if self._shutdown_message is not None: - self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown')) - self.log.debug("%s", self._shutdown_message) - [ s.flush(zmq.POLLOUT) for s in self.shell_streams ] + return dict(status='ok') + + +# This exists only for backwards compatibility - use IPythonKernel instead +@undoc +class Kernel(IPythonKernel): + def __init__(self, *args, **kwargs): + import warnings + warnings.warn('Kernel is a deprecated alias of IPython.kernel.zmq.ipkernel.IPythonKernel', + DeprecationWarning) + super(Kernel, self).__init__(*args, **kwargs) \ No newline at end of file diff --git a/IPython/kernel/zmq/kernelapp.py b/IPython/kernel/zmq/kernelapp.py index e53431b..8806cef 100644 --- a/IPython/kernel/zmq/kernelapp.py +++ b/IPython/kernel/zmq/kernelapp.py @@ -25,7 +25,7 @@ from IPython.core.shellapp import ( from IPython.utils import io from IPython.utils.path import filefind from IPython.utils.traitlets import ( - Any, Instance, Dict, Unicode, Integer, Bool, DottedObjectName, + Any, Instance, Dict, Unicode, Integer, Bool, DottedObjectName, Type, ) from IPython.utils.importstring import import_item from IPython.kernel import write_connection_file @@ -33,7 +33,7 @@ from IPython.kernel.connect import ConnectionFileMixin # local imports from .heartbeat import Heartbeat -from .ipkernel import Kernel +from 
.ipkernel import IPythonKernel from .parentpoller import ParentPollerUnix, ParentPollerWindows from .session import ( Session, session_flags, session_aliases, default_secure, @@ -100,9 +100,10 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp, name='ipkernel' aliases = Dict(kernel_aliases) flags = Dict(kernel_flags) - classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session] + classes = [IPythonKernel, ZMQInteractiveShell, ProfileDir, Session] # the kernel class, as an importstring - kernel_class = DottedObjectName('IPython.kernel.zmq.ipkernel.Kernel', config=True, + kernel_class = Type('IPython.kernel.zmq.ipkernel.IPythonKernel', config=True, + klass='IPython.kernel.zmq.kernelbase.Kernel', help="""The Kernel subclass to be used. This should allow easy re-use of the IPKernelApp entry point @@ -315,7 +316,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp, shell_stream = ZMQStream(self.shell_socket) control_stream = ZMQStream(self.control_socket) - kernel_factory = import_item(str(self.kernel_class)) + kernel_factory = self.kernel_class kernel = kernel_factory(parent=self, session=self.session, shell_streams=[shell_stream, control_stream], @@ -351,8 +352,9 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp, shell._showtraceback = _showtraceback def init_shell(self): - self.shell = self.kernel.shell - self.shell.configurables.append(self) + self.shell = getattr(self.kernel, 'shell', None) + if self.shell: + self.shell.configurables.append(self) @catch_config_error def initialize(self, argv=None): @@ -372,9 +374,10 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp, # shell init steps self.init_path() self.init_shell() - self.init_gui_pylab() - self.init_extensions() - self.init_code() + if self.shell: + self.init_gui_pylab() + self.init_extensions() + self.init_code() # flush stdout/stderr, so that anything written to these streams during # initialization do not get associated with the first execution 
request
         sys.stdout.flush()
diff --git a/IPython/kernel/zmq/kernelbase.py b/IPython/kernel/zmq/kernelbase.py
new file mode 100755
index 0000000..dfdcd96
--- /dev/null
+++ b/IPython/kernel/zmq/kernelbase.py
@@ -0,0 +1,712 @@
+"""Base class for a kernel that talks to frontends over 0MQ."""
+
+# Copyright (c) IPython Development Team.
+# Distributed under the terms of the Modified BSD License.
+
+from __future__ import print_function
+
+import sys
+import time
+import logging
+import uuid
+
+from datetime import datetime
+from signal import (
+    signal, default_int_handler, SIGINT
+)
+
+import zmq
+from zmq.eventloop import ioloop
+from zmq.eventloop.zmqstream import ZMQStream
+
+from IPython.config.configurable import Configurable
+from IPython.core.error import StdinNotImplementedError
+from IPython.core import release
+from IPython.utils import py3compat
+from IPython.utils.py3compat import unicode_type, string_types
+from IPython.utils.jsonutil import json_clean
+from IPython.utils.traitlets import (
+    Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
+)
+
+from .session import Session
+
+
+class Kernel(Configurable):
+    """Base class for kernels that talk to frontends over 0MQ.
+
+    The ``*_request`` methods unpack incoming messages and send the replies;
+    the ``do_*`` methods are the hooks that subclasses override.
+    ``do_execute``, ``do_apply`` and ``do_clear`` raise NotImplementedError
+    here. The attributes read by :attr:`kernel_info` (``implementation``,
+    ``implementation_version``, ``language``, ``language_version`` and
+    ``banner``) are not defined in this class, so subclasses are expected
+    to provide them.
+    """
+
+    #---------------------------------------------------------------------------
+    # Kernel interface
+    #---------------------------------------------------------------------------
+
+    # attribute to override with a GUI
+    eventloop = Any(None)
+    def _eventloop_changed(self, name, old, new):
+        """schedule call to eventloop from IOLoop"""
+        loop = ioloop.IOLoop.instance()
+        loop.add_callback(self.enter_eventloop)
+
+    session = Instance(Session)
+    profile_dir = Instance('IPython.core.profiledir.ProfileDir')
+    shell_streams = List()
+    control_stream = Instance(ZMQStream)
+    iopub_socket = Instance(zmq.Socket)
+    stdin_socket = Instance(zmq.Socket)
+    log = Instance(logging.Logger)
+
+    # identities:
+    int_id = Integer(-1)
+    ident = Unicode()
+
+    def _ident_default(self):
+        return unicode_type(uuid.uuid4())
+
+    # Private interface
+
+    _darwin_app_nap = Bool(True, config=True,
+        help="""Whether to use appnope for compatibility with OS X App Nap.
+
+        Only affects OS X >= 10.9.
+        """
+    )
+
+    # track associations with current request
+    _allow_stdin = Bool(False)
+    _parent_header = Dict()
+    _parent_ident = Any(b'')
+    # Time to sleep after flushing the stdout/err buffers in each execute
+    # cycle. While this introduces a hard limit on the minimal latency of the
+    # execute cycle, it helps prevent output synchronization problems for
+    # clients.
+    # Units are in seconds. The minimum zmq latency on local host is probably
+    # ~150 microseconds, set this to 500us for now. We may need to increase it
+    # a little if it's not enough after more interactive testing.
+    _execute_sleep = Float(0.0005, config=True)
+
+    # Frequency of the kernel's event loop.
+    # Units are in seconds, kernel subclasses for GUI toolkits may need to
+    # adapt to milliseconds.
+    _poll_interval = Float(0.05, config=True)
+
+    # If the shutdown was requested over the network, we leave here the
+    # necessary reply message so it can be sent by our registered atexit
+    # handler. This ensures that the reply is only sent to clients truly at
+    # the end of our shutdown process (which happens after the underlying
+    # IPython shell's own shutdown).
+    _shutdown_message = None
+
+    # This is a dict of port number that the kernel is listening on. It is set
+    # by record_ports and used by connect_request.
+    _recorded_ports = Dict()
+
+    # set of aborted msg_ids
+    aborted = Set()
+
+    # Track execution count here. For IPython, we override this to use the
+    # execution count we store in the shell.
+    execution_count = 0
+
+
+    def __init__(self, **kwargs):
+        """Create the kernel and build the message-type -> handler tables."""
+        super(Kernel, self).__init__(**kwargs)
+
+        # Build dict of handlers for message types
+        msg_types = [ 'execute_request', 'complete_request',
+                      'inspect_request', 'history_request',
+                      'kernel_info_request',
+                      'connect_request', 'shutdown_request',
+                      'apply_request',
+                    ]
+        self.shell_handlers = {}
+        for msg_type in msg_types:
+            self.shell_handlers[msg_type] = getattr(self, msg_type)
+
+        control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
+        self.control_handlers = {}
+        for msg_type in control_msg_types:
+            self.control_handlers[msg_type] = getattr(self, msg_type)
+
+
+    def dispatch_control(self, msg):
+        """dispatch control requests"""
+        idents,msg = self.session.feed_identities(msg, copy=False)
+        try:
+            msg = self.session.unserialize(msg, content=True, copy=False)
+        except:
+            self.log.error("Invalid Control Message", exc_info=True)
+            return
+
+        self.log.debug("Control received: %s", msg)
+
+        header = msg['header']
+        msg_type = header['msg_type']
+
+        handler = self.control_handlers.get(msg_type, None)
+        if handler is None:
+            self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
+        else:
+            try:
+                handler(self.control_stream, idents, msg)
+            except Exception:
+                self.log.error("Exception in control handler:", exc_info=True)
+
+    def dispatch_shell(self, stream, msg):
+        """dispatch shell requests"""
+        # flush control requests first
+        if self.control_stream:
+            self.control_stream.flush()
+
+        idents,msg = self.session.feed_identities(msg, copy=False)
+        try:
+            msg = self.session.unserialize(msg, content=True, copy=False)
+        except:
+            self.log.error("Invalid Message", exc_info=True)
+            return
+
+        header = msg['header']
+        msg_id = header['msg_id']
+        msg_type = msg['header']['msg_type']
+
+        # Print some info about this message and leave a '--->' marker, so it's
+        # easier to trace visually the message chain when debugging. Each
+        # handler prints its message at the end.
+        self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
+        self.log.debug(' Content: %s\n --->\n ', msg['content'])
+
+        if msg_id in self.aborted:
+            self.aborted.remove(msg_id)
+            # is it safe to assume a msg_id will not be resubmitted?
+            reply_type = msg_type.split('_')[0] + '_reply'
+            status = {'status' : 'aborted'}
+            md = {'engine' : self.ident}
+            md.update(status)
+            self.session.send(stream, reply_type, metadata=md,
+                        content=status, parent=msg, ident=idents)
+            return
+
+        handler = self.shell_handlers.get(msg_type, None)
+        if handler is None:
+            self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
+        else:
+            # ensure default_int_handler during handler call
+            sig = signal(SIGINT, default_int_handler)
+            self.log.debug("%s: %s", msg_type, msg)
+            try:
+                handler(stream, idents, msg)
+            except Exception:
+                self.log.error("Exception in message handler:", exc_info=True)
+            finally:
+                signal(SIGINT, sig)
+
+    def enter_eventloop(self):
+        """enter eventloop"""
+        self.log.info("entering eventloop %s", self.eventloop)
+        for stream in self.shell_streams:
+            # flush any pending replies,
+            # which may be skipped by entering the eventloop
+            stream.flush(zmq.POLLOUT)
+        # restore default_int_handler
+        signal(SIGINT, default_int_handler)
+        while self.eventloop is not None:
+            try:
+                self.eventloop(self)
+            except KeyboardInterrupt:
+                # Ctrl-C shouldn't crash the kernel
+                self.log.error("KeyboardInterrupt caught in kernel")
+                continue
+            else:
+                # eventloop exited cleanly, this means we should stop (right?)
+                self.eventloop = None
+                break
+        self.log.info("exiting eventloop")
+
+    def start(self):
+        """register dispatchers for streams"""
+        if self.control_stream:
+            self.control_stream.on_recv(self.dispatch_control, copy=False)
+
+        def make_dispatcher(stream):
+            def dispatcher(msg):
+                return self.dispatch_shell(stream, msg)
+            return dispatcher
+
+        for s in self.shell_streams:
+            s.on_recv(make_dispatcher(s), copy=False)
+
+        # announce on IOPub that the kernel is starting
+        self._publish_status('starting')
+
+    def do_one_iteration(self):
+        """step eventloop just once"""
+        if self.control_stream:
+            self.control_stream.flush()
+        for stream in self.shell_streams:
+            # handle at most one request per iteration
+            stream.flush(zmq.POLLIN, 1)
+            stream.flush(zmq.POLLOUT)
+
+
+    def record_ports(self, ports):
+        """Record the ports that this kernel is using.
+
+        The creator of the Kernel instance must call this method if they
+        want the :meth:`connect_request` method to return the port numbers.
+        """
+        self._recorded_ports = ports
+
+    #---------------------------------------------------------------------------
+    # Kernel request handlers
+    #---------------------------------------------------------------------------
+
+    def _make_metadata(self, other=None):
+        """init metadata dict, for execute/apply_reply"""
+        new_md = {
+            'dependencies_met' : True,
+            'engine' : self.ident,
+            'started': datetime.now(),
+        }
+        if other:
+            new_md.update(other)
+        return new_md
+
+    def _publish_execute_input(self, code, parent, execution_count):
+        """Publish the code request on the iopub stream."""
+
+        self.session.send(self.iopub_socket, u'execute_input',
+                            {u'code':code, u'execution_count': execution_count},
+                            parent=parent, ident=self._topic('execute_input')
+        )
+
+    def _publish_status(self, status, parent=None):
+        """send status (busy/idle) on IOPub"""
+        self.session.send(self.iopub_socket,
+                          u'status',
+                          {u'execution_state': status},
+                          parent=parent,
+                          ident=self._topic('status'),
+                          )
+
+    def set_parent(self, ident, parent):
+        """Set the current parent_header
+
+        Side effects (IOPub messages) and replies are associated with
+        the request that caused them via the parent_header.
+
+        The parent identity is used to route input_request messages
+        on the stdin channel.
+        """
+        self._parent_ident = ident
+        self._parent_header = parent
+
+    def send_response(self, stream, msg_or_type, content=None, ident=None,
+                      buffers=None, track=False, header=None, metadata=None):
+        """Send a response to the message we're currently processing.
+
+        This accepts all the parameters of :meth:`IPython.kernel.zmq.session.Session.send`
+        except ``parent``.
+
+        This relies on :meth:`set_parent` having been called for the current
+        message.
+        """
+        return self.session.send(stream, msg_or_type, content, self._parent_header,
+                                 ident, buffers, track, header, metadata)
+
+    def execute_request(self, stream, ident, parent):
+        """handle an execute_request"""
+
+        self._publish_status(u'busy', parent)
+
+        try:
+            content = parent[u'content']
+            code = py3compat.cast_unicode_py2(content[u'code'])
+            silent = content[u'silent']
+            store_history = content.get(u'store_history', not silent)
+            user_expressions = content.get('user_expressions', {})
+            allow_stdin = content.get('allow_stdin', False)
+        except:
+            self.log.error("Got bad msg: ")
+            self.log.error("%s", parent)
+            # 'busy' was published above; publish 'idle' too so clients
+            # are not left believing the kernel is still working.
+            self._publish_status(u'idle', parent)
+            return
+
+        md = self._make_metadata(parent['metadata'])
+
+        # Set the parent message of the display hook and out streams.
+        self.set_parent(ident, parent)
+
+        # Re-broadcast our input for the benefit of listening clients, and
+        # start computing output
+        if not silent:
+            self.execution_count += 1
+            self._publish_execute_input(code, parent, self.execution_count)
+
+        reply_content = self.do_execute(code, silent, store_history,
+                                        user_expressions, allow_stdin)
+
+        # Flush output before sending the reply.
+        sys.stdout.flush()
+        sys.stderr.flush()
+        # FIXME: on rare occasions, the flush doesn't seem to make it to the
+        # clients... This seems to mitigate the problem, but we definitely need
+        # to better understand what's going on.
+        if self._execute_sleep:
+            time.sleep(self._execute_sleep)
+
+        # Send the reply.
+        reply_content = json_clean(reply_content)
+
+        md['status'] = reply_content['status']
+        if reply_content['status'] == 'error' and \
+                        reply_content['ename'] == 'UnmetDependency':
+            md['dependencies_met'] = False
+
+        reply_msg = self.session.send(stream, u'execute_reply',
+                                      reply_content, parent, metadata=md,
+                                      ident=ident)
+
+        self.log.debug("%s", reply_msg)
+
+        if not silent and reply_msg['content']['status'] == u'error':
+            self._abort_queues()
+
+        self._publish_status(u'idle', parent)
+
+    def do_execute(self, code, silent, store_history=True,
+                   user_expressions=None, allow_stdin=False):
+        """Execute user code. Must be overridden by subclasses.
+
+        :meth:`execute_request` passes all five arguments positionally,
+        JSON-cleans the returned dict and sends it as the ``execute_reply``
+        content. It reads the ``status`` key of that dict, and ``ename``
+        when the status is ``'error'``.
+        """
+        raise NotImplementedError
+
+    def complete_request(self, stream, ident, parent):
+        """handle a complete_request"""
+        content = parent['content']
+        code = content['code']
+        cursor_pos = content['cursor_pos']
+
+        matches = self.do_complete(code, cursor_pos)
+        matches = json_clean(matches)
+        completion_msg = self.session.send(stream, 'complete_reply',
+                                           matches, parent, ident)
+        self.log.debug("%s", completion_msg)
+
+    def do_complete(self, code, cursor_pos):
+        """Override in subclasses to find completions.
+        """
+        return {'matches' : [],
+                'cursor_end' : cursor_pos,
+                'cursor_start' : cursor_pos,
+                'metadata' : {},
+                'status' : 'ok'}
+
+    def inspect_request(self, stream, ident, parent):
+        """handle an inspect_request"""
+        content = parent['content']
+
+        reply_content = self.do_inspect(content['code'], content['cursor_pos'],
+                                        content.get('detail_level', 0))
+        # Before we send this object over, we scrub it for JSON usage
+        reply_content = json_clean(reply_content)
+        msg = self.session.send(stream, 'inspect_reply',
+                                reply_content, parent, ident)
+        self.log.debug("%s", msg)
+
+    def do_inspect(self, code, cursor_pos, detail_level=0):
+        """Override in subclasses to allow introspection.
+        """
+        return {'status': 'ok', 'data':{}, 'metadata':{}, 'found':False}
+
+    def history_request(self, stream, ident, parent):
+        """handle a history_request"""
+        content = parent['content']
+
+        reply_content = self.do_history(**content)
+
+        reply_content = json_clean(reply_content)
+        msg = self.session.send(stream, 'history_reply',
+                                reply_content, parent, ident)
+        self.log.debug("%s", msg)
+
+    def do_history(self, hist_access_type, output, raw, session=None, start=None,
+                   stop=None, n=None, pattern=None, unique=False):
+        """Override in subclasses to access history.
+        """
+        return {'history': []}
+
+    def connect_request(self, stream, ident, parent):
+        """reply with a copy of the ports stored by record_ports"""
+        if self._recorded_ports is not None:
+            content = self._recorded_ports.copy()
+        else:
+            content = {}
+        msg = self.session.send(stream, 'connect_reply',
+                                content, parent, ident)
+        self.log.debug("%s", msg)
+
+    @property
+    def kernel_info(self):
+        """content of the kernel_info_reply message"""
+        return {
+            'protocol_version': release.kernel_protocol_version,
+            'implementation': self.implementation,
+            'implementation_version': self.implementation_version,
+            'language': self.language,
+            'language_version': self.language_version,
+            'banner': self.banner,
+        }
+
+    def kernel_info_request(self, stream, ident, parent):
+        """handle a kernel_info_request"""
+        msg = self.session.send(stream, 'kernel_info_reply',
+                                self.kernel_info, parent, ident)
+        self.log.debug("%s", msg)
+
+    def shutdown_request(self, stream, ident, parent):
+        """handle a shutdown_request, then stop the IOLoop"""
+        content = self.do_shutdown(parent['content']['restart'])
+        self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
+        # same content, but different msg_id for broadcasting on IOPub
+        self._shutdown_message = self.session.msg(u'shutdown_reply',
+                                                  content, parent
+        )
+
+        self._at_shutdown()
+        # stop the IOLoop after a short delay
+        loop = ioloop.IOLoop.instance()
+        loop.add_timeout(time.time()+0.1, loop.stop)
+
+    def do_shutdown(self, restart):
+        """Override in subclasses to do things when the frontend shuts down the
+        kernel.
+        """
+        return {'status': 'ok', 'restart': restart}
+
+    #---------------------------------------------------------------------------
+    # Engine methods
+    #---------------------------------------------------------------------------
+
+    def apply_request(self, stream, ident, parent):
+        """handle an apply_request"""
+        try:
+            content = parent[u'content']
+            bufs = parent[u'buffers']
+            msg_id = parent['header']['msg_id']
+        except:
+            self.log.error("Got bad msg: %s", parent, exc_info=True)
+            return
+
+        self._publish_status(u'busy', parent)
+
+        # Set the parent message of the display hook and out streams.
+        self.set_parent(ident, parent)
+
+        md = self._make_metadata(parent['metadata'])
+
+        reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)
+
+        # put 'ok'/'error' status in header, for scheduler introspection:
+        md['status'] = reply_content['status']
+
+        # flush i/o
+        sys.stdout.flush()
+        sys.stderr.flush()
+
+        self.session.send(stream, u'apply_reply', reply_content,
+                    parent=parent, ident=ident,buffers=result_buf, metadata=md)
+
+        self._publish_status(u'idle', parent)
+
+    def do_apply(self, content, bufs, msg_id, reply_metadata):
+        """Override in subclasses to support the IPython parallel framework.
+        """
+        raise NotImplementedError
+
+    #---------------------------------------------------------------------------
+    # Control messages
+    #---------------------------------------------------------------------------
+
+    def abort_request(self, stream, ident, parent):
+        """abort specific msgs by id, or all queued msgs if none given"""
+        msg_ids = parent['content'].get('msg_ids', None)
+        if isinstance(msg_ids, string_types):
+            msg_ids = [msg_ids]
+        if not msg_ids:
+            # no ids given: abort everything waiting on the shell streams
+            self._abort_queues()
+        else:
+            for mid in msg_ids:
+                self.aborted.add(str(mid))
+
+        content = dict(status='ok')
+        reply_msg = self.session.send(stream, 'abort_reply', content=content,
+                parent=parent, ident=ident)
+        self.log.debug("%s", reply_msg)
+
+    def clear_request(self, stream, idents, parent):
+        """Clear our namespace."""
+        content = self.do_clear()
+        self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
+                content = content)
+
+    def do_clear(self):
+        """Override in subclasses to clear the namespace
+
+        This is only required for IPython.parallel.
+        """
+        raise NotImplementedError
+
+    #---------------------------------------------------------------------------
+    # Protected interface
+    #---------------------------------------------------------------------------
+
+    def _topic(self, topic):
+        """prefixed topic for IOPub messages"""
+        if self.int_id >= 0:
+            base = "engine.%i" % self.int_id
+        else:
+            base = "kernel.%s" % self.ident
+
+        return py3compat.cast_bytes("%s.%s" % (base, topic))
+
+    def _abort_queues(self):
+        """call _abort_queue on every non-empty entry of shell_streams"""
+        for stream in self.shell_streams:
+            if stream:
+                self._abort_queue(stream)
+
+    def _abort_queue(self, stream):
+        """answer every request pending on `stream` with status 'aborted'"""
+        poller = zmq.Poller()
+        poller.register(stream.socket, zmq.POLLIN)
+        while True:
+            idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
+            if msg is None:
+                return
+
+            self.log.info("Aborting:")
+            self.log.info("%s", msg)
+            msg_type = msg['header']['msg_type']
+            reply_type = msg_type.split('_')[0] + '_reply'
+
+            status = {'status' : 'aborted'}
+            md = {'engine' : self.ident}
+            md.update(status)
+            reply_msg = self.session.send(stream, reply_type, metadata=md,
+                        content=status, parent=msg, ident=idents)
+            self.log.debug("%s", reply_msg)
+            # We need to wait a bit for requests to come in. This can probably
+            # be set shorter for true asynchronous clients.
+            poller.poll(50)
+
+
+    def _no_raw_input(self):
+        """Raise StdinNotImplementedError if active frontend doesn't support
+        stdin."""
+        raise StdinNotImplementedError("raw_input was called, but this "
+                                       "frontend does not support stdin.")
+
+    def getpass(self, prompt=''):
+        """Forward getpass to frontends
+
+        Raises
+        ------
+        StdinNotImplementedError if active frontend doesn't support stdin.
+        """
+        if not self._allow_stdin:
+            raise StdinNotImplementedError(
+                "getpass was called, but this frontend does not support input requests."
+            )
+        return self._input_request(prompt,
+            self._parent_ident,
+            self._parent_header,
+            password=True,
+        )
+
+    def raw_input(self, prompt=''):
+        """Forward raw_input to frontends
+
+        Raises
+        ------
+        StdinNotImplementedError if active frontend doesn't support stdin.
+        """
+        if not self._allow_stdin:
+            raise StdinNotImplementedError(
+                "raw_input was called, but this frontend does not support input requests."
+            )
+        return self._input_request(prompt,
+            self._parent_ident,
+            self._parent_header,
+            password=False,
+        )
+
+    def _input_request(self, prompt, ident, parent, password=False):
+        """Send an input_request on the stdin socket and return the reply value.
+
+        Raises EOFError if the reply value is the EOF character (Ctrl-D).
+        """
+        # Flush output before making the request.
+        sys.stderr.flush()
+        sys.stdout.flush()
+        # flush the stdin socket, to purge stale replies
+        while True:
+            try:
+                self.stdin_socket.recv_multipart(zmq.NOBLOCK)
+            except zmq.ZMQError as e:
+                if e.errno == zmq.EAGAIN:
+                    break
+                else:
+                    raise
+
+        # Send the input request.
+        content = json_clean(dict(prompt=prompt, password=password))
+        self.session.send(self.stdin_socket, u'input_request', content, parent,
+                          ident=ident)
+
+        # Await a response.
+        while True:
+            try:
+                ident, reply = self.session.recv(self.stdin_socket, 0)
+            except Exception:
+                self.log.warn("Invalid Message:", exc_info=True)
+            except KeyboardInterrupt:
+                # re-raise KeyboardInterrupt, to truncate traceback
+                raise KeyboardInterrupt
+            else:
+                break
+        try:
+            value = py3compat.unicode_to_str(reply['content']['value'])
+        except:
+            self.log.error("Bad input_reply: %s", reply)
+            value = ''
+        if value == '\x04':
+            # EOF
+            raise EOFError
+        return value
+
+    def _at_shutdown(self):
+        """Actions taken at shutdown by the kernel, called by python's atexit.
+
+        :meth:`shutdown_request` also calls this directly.
+        """
+        if self._shutdown_message is not None:
+            self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
+            self.log.debug("%s", self._shutdown_message)
+        for s in self.shell_streams:
+            s.flush(zmq.POLLOUT)
+
diff --git a/IPython/parallel/apps/ipengineapp.py b/IPython/parallel/apps/ipengineapp.py
index 26931ea..932a840 100755
--- a/IPython/parallel/apps/ipengineapp.py
+++ b/IPython/parallel/apps/ipengineapp.py
@@ -37,7 +37,7 @@ from IPython.parallel.apps.baseapp import (
     catch_config_error,
 )
 from IPython.kernel.zmq.log import EnginePUBHandler
-from IPython.kernel.zmq.ipkernel import Kernel
+from IPython.kernel.zmq.ipkernel import IPythonKernel as Kernel
 from IPython.kernel.zmq.kernelapp import IPKernelApp
 from IPython.kernel.zmq.session import (
     Session, session_aliases, session_flags
diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py
index bc0486b..326a690 100644
--- a/IPython/parallel/engine/engine.py
+++ b/IPython/parallel/engine/engine.py
@@ -26,7 +26,7 @@ from IPython.parallel.factory import RegistrationFactory
 from IPython.parallel.util import disambiguate_url
 
 from IPython.kernel.zmq.session import Message
-from IPython.kernel.zmq.ipkernel import Kernel
+from IPython.kernel.zmq.ipkernel import IPythonKernel as Kernel
 from IPython.kernel.zmq.kernelapp import IPKernelApp
 
 class
EngineFactory(RegistrationFactory): diff --git a/IPython/parallel/tests/test_asyncresult.py b/IPython/parallel/tests/test_asyncresult.py index ddb458d..80fdf3c 100644 --- a/IPython/parallel/tests/test_asyncresult.py +++ b/IPython/parallel/tests/test_asyncresult.py @@ -188,8 +188,8 @@ class AsyncResultTest(ClusterTestCase): ar = v.apply_async(time.sleep, 0.25) while not ar.ready(): time.sleep(0.01) - self.assertTrue(ar.elapsed < 1) - self.assertTrue(ar.elapsed < 1) + self.assertLess(ar.elapsed, 1) + self.assertLess(ar.elapsed, 1) ar.get(2) def test_hubresult_timestamps(self): diff --git a/IPython/utils/tests/test_traitlets.py b/IPython/utils/tests/test_traitlets.py index e8da006..9dbeb91 100644 --- a/IPython/utils/tests/test_traitlets.py +++ b/IPython/utils/tests/test_traitlets.py @@ -531,6 +531,15 @@ class TestType(TestCase): self.assertRaises(TraitError, setattr, a, 'klass', 10) + def test_set_str_klass(self): + + class A(HasTraits): + klass = Type() + + a = A(klass='IPython.utils.ipstruct.Struct') + from IPython.utils.ipstruct import Struct + self.assertEqual(a.klass, Struct) + class TestInstance(TestCase): def test_basic(self): diff --git a/IPython/utils/traitlets.py b/IPython/utils/traitlets.py index 69d7b64..5234713 100644 --- a/IPython/utils/traitlets.py +++ b/IPython/utils/traitlets.py @@ -755,6 +755,12 @@ class Type(ClassBasedTraitType): def validate(self, obj, value): """Validates that the value is a valid object instance.""" + if isinstance(value, py3compat.string_types): + try: + value = import_item(value) + except ImportError: + raise TraitError("The '%s' trait of %s instance must be a type, but " + "%r could not be imported" % (self.name, obj, value)) try: if issubclass(value, self.klass): return value diff --git a/docs/source/development/index.rst b/docs/source/development/index.rst index f8c9386..cc1cc5a 100644 --- a/docs/source/development/index.rst +++ b/docs/source/development/index.rst @@ -20,6 +20,7 @@ on the IPython GitHub wiki. 
:maxdepth: 1 messaging + wrapperkernels execution parallel_messages parallel_connections diff --git a/docs/source/development/messaging.rst b/docs/source/development/messaging.rst index 6c145c4..7d95e05 100644 --- a/docs/source/development/messaging.rst +++ b/docs/source/development/messaging.rst @@ -409,6 +409,7 @@ When status is 'error', the following extra fields are present:: When status is 'abort', there are for now no additional data fields. This happens when the kernel was interrupted by a signal. +.. _msging_inspection: Introspection ------------- @@ -465,6 +466,8 @@ Message type: ``inspect_reply``:: Reply is changed from structured data to a mime bundle, allowing formatting decisions to be made by the kernel. +.. _msging_completion: + Completion ---------- @@ -512,6 +515,7 @@ Message type: ``complete_reply``:: - ``matched_text`` is removed in favor of ``cursor_start`` and ``cursor_end``. - ``metadata`` is added for extended information. +.. _msging_history: History ------- @@ -590,6 +594,7 @@ Message type: ``connect_reply``:: 'hb_port' : int, # The port the heartbeat socket is listening on. } +.. _msging_kernel_info: Kernel info ----------- @@ -650,6 +655,7 @@ Message type: ``kernel_info_reply``:: ``implementation``, ``implementation_version``, and ``banner`` keys are added. +.. _msging_shutdown: Kernel shutdown --------------- diff --git a/docs/source/development/wrapperkernels.rst b/docs/source/development/wrapperkernels.rst new file mode 100644 index 0000000..5efcbef --- /dev/null +++ b/docs/source/development/wrapperkernels.rst @@ -0,0 +1,148 @@ +Making simple Python wrapper kernels +==================================== + +.. versionadded:: 3.0 + +You can now re-use the kernel machinery in IPython to easily make new kernels. +This is useful for languages that have Python bindings, such as `Octave +<http://www.gnu.org/software/octave/>`_ (via +`Oct2Py <http://blink1073.github.io/oct2py/docs/index.html>`_), or languages +where the REPL can be controlled in a tty using `pexpect <http://pexpect.readthedocs.org/en/latest/>`_, +such as bash.
+ +Required steps +-------------- + +Subclass :class:`IPython.kernel.zmq.kernelbase.Kernel`, and implement the +following methods and attributes: + +.. class:: MyKernel + + .. attribute:: implementation + implementation_version + language + language_version + banner + + Information for :ref:`msging_kernel_info` replies. 'Implementation' refers + to the kernel (e.g. IPython), and 'language' refers to the language it + interprets (e.g. Python). The 'banner' is displayed to the user in console + UIs before the first prompt. All of these values are strings. + + .. method:: do_execute(code, silent, store_history=True, user_expressions=None, allow_stdin=False) + + Execute user code. + + :param str code: The code to be executed. + :param bool silent: Whether to display output. + :param bool store_history: Whether to record this code in history and + increase the execution count. If silent is True, this is implicitly + False. + :param dict user_expressions: Mapping of names to expressions to evaluate + after the code has run. You can ignore this if you need to. + :param bool allow_stdin: Whether the frontend can provide input on request + (e.g. for Python's :func:`raw_input`). + + Your method should return a dict containing the fields described in + :ref:`execution_results`. To display output, it can send messages + using :meth:`~IPython.kernel.zmq.kernelbase.Kernel.send_response`. + See :doc:`messaging` for details of the different message types. 
+ +To launch your kernel, add this at the end of your module:: + + if __name__ == '__main__': + from IPython.kernel.zmq.kernelapp import IPKernelApp + IPKernelApp.launch_instance(kernel_class=MyKernel) + +Example +------- + +``echokernel.py`` will simply echo any input it's given to stdout:: + + from IPython.kernel.zmq.kernelbase import Kernel + + class EchoKernel(Kernel): + implementation = 'Echo' + implementation_version = '1.0' + language = 'no-op' + language_version = '0.1' + banner = "Echo kernel - as useful as a parrot" + + def do_execute(self, code, silent, store_history=True, user_expressions=None, + allow_stdin=False): + if not silent: + stream_content = {'name': 'stdout', 'data':code} + self.send_response(self.iopub_socket, 'stream', stream_content) + + return {'status': 'ok', + # The base class increments the execution count + 'execution_count': self.execution_count, + 'payload': [], + 'user_expressions': {}, + } + + if __name__ == '__main__': + from IPython.kernel.zmq.kernelapp import IPKernelApp + IPKernelApp.launch_instance(kernel_class=EchoKernel) + +Here's the Kernel spec ``kernel.json`` file for this:: + + {"argv":["python","-m","echokernel", "-f", "{connection_file}"], + "display_name":"Echo", + "language":"no-op" + } + + +Optional steps +-------------- + +You can override a number of other methods to improve the functionality of your +kernel. All of these methods should return a dictionary as described in the +relevant section of the :doc:`messaging spec <messaging>`. + +.. class:: MyKernel + + .. method:: do_complete(code, cursor_pos) + + Code completion + + :param str code: The code already present + :param int cursor_pos: The position in the code where completion is requested + + .. seealso:: + + :ref:`msging_completion` messages + + ..
method:: do_inspect(code, cursor_pos, detail_level=0) + + Object introspection + + :param str code: The code + :param int cursor_pos: The position in the code where introspection is requested + :param int detail_level: 0 or 1; 1 requests more detail. In IPython, 1 gets + the source code. + + .. seealso:: + + :ref:`msging_inspection` messages + + .. method:: do_history(hist_access_type, output, raw, session=None, start=None, stop=None, n=None, pattern=None, unique=False) + + History access. Only the relevant parameters for the type of history + request concerned will be passed, so your method definition must have defaults + for all the arguments shown with defaults here. + + .. seealso:: + + :ref:`msging_history` messages + + .. method:: do_shutdown(restart) + + Shut down the kernel. You only need to handle your own clean up - the kernel + machinery will take care of cleaning up its own things before stopping. + + :param bool restart: Whether the kernel will be started again afterwards + + .. seealso:: + + :ref:`msging_shutdown` messages