diff --git a/IPython/kernel/zmq/ipkernel.py b/IPython/kernel/zmq/ipkernel.py index fc54ea1..31bad97 100644 --- a/IPython/kernel/zmq/ipkernel.py +++ b/IPython/kernel/zmq/ipkernel.py @@ -1,4 +1,286 @@ +"""The IPython kernel implementation""" + +import getpass +import sys +import traceback + +from IPython.core import release +from IPython.utils.py3compat import builtin_mod, PY3 +from IPython.utils.tokenutil import token_at_cursor +from IPython.utils.traitlets import Instance, Type, Any + from .kernelbase import KernelBase +from .serialize import serialize_object, unpack_apply_message +from .zmqshell import ZMQInteractiveShell class Kernel(KernelBase): - pass \ No newline at end of file + shell = Instance('IPython.core.interactiveshell.InteractiveShellABC') + shell_class = Type(ZMQInteractiveShell) + + user_module = Any() + def _user_module_changed(self, name, old, new): + if self.shell is not None: + self.shell.user_module = new + + user_ns = Instance(dict, args=None, allow_none=True) + def _user_ns_changed(self, name, old, new): + if self.shell is not None: + self.shell.user_ns = new + self.shell.init_user_ns() + + # A reference to the Python builtin 'raw_input' function. + # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3) + _sys_raw_input = Any() + _sys_eval_input = Any() + + def __init__(self, **kwargs): + super(Kernel, self).__init__(**kwargs) + + # Initialize the InteractiveShell subclass + self.shell = self.shell_class.instance(parent=self, + profile_dir = self.profile_dir, + user_module = self.user_module, + user_ns = self.user_ns, + kernel = self, + ) + self.shell.displayhook.session = self.session + self.shell.displayhook.pub_socket = self.iopub_socket + self.shell.displayhook.topic = self._topic('execute_result') + self.shell.display_pub.session = self.session + self.shell.display_pub.pub_socket = self.iopub_socket + self.shell.data_pub.session = self.session + self.shell.data_pub.pub_socket = self.iopub_socket + + # TMP - hack while developing + self.shell._reply_content = None + + comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ] + comm_manager = self.shell.comm_manager + for msg_type in comm_msg_types: + self.shell_handlers[msg_type] = getattr(comm_manager, msg_type) + + # Kernel info fields + implementation = 'ipython' + implementation_version = release.version + language = 'python' + language_version = sys.version.split()[0] + @property + def banner(self): + return self.shell.banner + + def set_parent(self, ident, parent): + """Overridden from parent to tell the display hook and output streams + about the parent message. + """ + super(Kernel, self).set_parent(ident, parent) + self.shell.set_parent(parent) + + def _forward_input(self, allow_stdin=False): + """Forward raw_input and getpass to the current frontend. + + via input_request + """ + self._allow_stdin = allow_stdin + + if PY3: + self._sys_raw_input = builtin_mod.input + builtin_mod.input = self.raw_input + else: + self._sys_raw_input = builtin_mod.raw_input + self._sys_eval_input = builtin_mod.input + builtin_mod.raw_input = self.raw_input + builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt)) + self._save_getpass = getpass.getpass + getpass.getpass = self.getpass + + def _restore_input(self): + """Restore raw_input, getpass""" + if PY3: + builtin_mod.input = self._sys_raw_input + else: + builtin_mod.raw_input = self._sys_raw_input + builtin_mod.input = self._sys_eval_input + + getpass.getpass = self._save_getpass + + @property + def execution_count(self): + return self.shell.execution_count + + @execution_count.setter + def execution_count(self, value): + # Ignore the incrememnting done by KernelBase, in favour of our shell's + # execution counter. + pass + + def do_execute(self, code, silent, store_history=True, + user_expressions=None, allow_stdin=False): + shell = self.shell # we'll need this a lot here + + self._forward_input(allow_stdin) + + reply_content = {} + # FIXME: the shell calls the exception handler itself. + shell._reply_content = None + try: + shell.run_cell(code, store_history=store_history, silent=silent) + except: + status = u'error' + # FIXME: this code right now isn't being used yet by default, + # because the run_cell() call above directly fires off exception + # reporting. This code, therefore, is only active in the scenario + # where runlines itself has an unhandled exception. We need to + # uniformize this, for all exception construction to come from a + # single location in the codbase. + etype, evalue, tb = sys.exc_info() + tb_list = traceback.format_exception(etype, evalue, tb) + reply_content.update(shell._showtraceback(etype, evalue, tb_list)) + else: + status = u'ok' + finally: + self._restore_input() + + reply_content[u'status'] = status + + # Return the execution counter so clients can display prompts + reply_content['execution_count'] = shell.execution_count - 1 + + # FIXME - fish exception info out of shell, possibly left there by + # runlines. We'll need to clean up this logic later. + if shell._reply_content is not None: + reply_content.update(shell._reply_content) + e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute') + reply_content['engine_info'] = e_info + # reset after use + shell._reply_content = None + + if 'traceback' in reply_content: + self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback'])) + + + # At this point, we can tell whether the main code execution succeeded + # or not. If it did, we proceed to evaluate user_expressions + if reply_content['status'] == 'ok': + reply_content[u'user_expressions'] = \ + shell.user_expressions(user_expressions or {}) + else: + # If there was an error, don't even try to compute expressions + reply_content[u'user_expressions'] = {} + + # Payloads should be retrieved regardless of outcome, so we can both + # recover partial output (that could have been generated early in a + # block, before an error) and clear the payload system always. + reply_content[u'payload'] = shell.payload_manager.read_payload() + # Be agressive about clearing the payload because we don't want + # it to sit in memory until the next execute_request comes in. + shell.payload_manager.clear_payload() + + return reply_content + + def do_complete(self, code, cursor_pos): + txt, matches = self.shell.complete('', code, cursor_pos) + return {'matches' : matches, + 'cursor_end' : cursor_pos, + 'cursor_start' : cursor_pos - len(txt), + 'metadata' : {}, + 'status' : 'ok'} + + def do_inspect(self, code, cursor_pos, detail_level=0): + name = token_at_cursor(code, cursor_pos) + info = self.shell.object_inspect(name) + + reply_content = {'status' : 'ok'} + reply_content['data'] = data = {} + reply_content['metadata'] = {} + reply_content['found'] = info['found'] + if info['found']: + info_text = self.shell.object_inspect_text( + name, + detail_level=detail_level, + ) + data['text/plain'] = info_text + + return reply_content + + def do_history(self, hist_access_type, output, raw, session=None, start=None, + stop=None, n=None, pattern=None, unique=False): + if hist_access_type == 'tail': + hist = self.shell.history_manager.get_tail(n, raw=raw, output=output, + include_latest=True) + + elif hist_access_type == 'range': + hist = self.shell.history_manager.get_range(session, start, stop, + raw=raw, output=output) + + elif hist_access_type == 'search': + hist = self.shell.history_manager.search( + pattern, raw=raw, output=output, n=n, unique=unique) + else: + hist = [] + + return {'history' : list(hist)} + + def do_shutdown(self, restart): + self.shell.exit_now = True + return dict(status='ok', restart=restart) + + def do_apply(self, content, bufs, msg_id, reply_metadata): + shell = self.shell + try: + working = shell.user_ns + + prefix = "_"+str(msg_id).replace("-","")+"_" + + f,args,kwargs = unpack_apply_message(bufs, working, copy=False) + + fname = getattr(f, '__name__', 'f') + + fname = prefix+"f" + argname = prefix+"args" + kwargname = prefix+"kwargs" + resultname = prefix+"result" + + ns = { fname : f, argname : args, kwargname : kwargs , resultname : None } + # print ns + working.update(ns) + code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname) + try: + exec(code, shell.user_global_ns, shell.user_ns) + result = working.get(resultname) + finally: + for key in ns: + working.pop(key) + + result_buf = serialize_object(result, + buffer_threshold=self.session.buffer_threshold, + item_threshold=self.session.item_threshold, + ) + + except: + # invoke IPython traceback formatting + shell.showtraceback() + # FIXME - fish exception info out of shell, possibly left there by + # run_code. We'll need to clean up this logic later. + reply_content = {} + if shell._reply_content is not None: + reply_content.update(shell._reply_content) + e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply') + reply_content['engine_info'] = e_info + # reset after use + shell._reply_content = None + + self.send_response(self.iopub_socket, u'error', reply_content, + ident=self._topic('error')) + self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback'])) + result_buf = [] + + if reply_content['ename'] == 'UnmetDependency': + reply_metadata['dependencies_met'] = False + else: + reply_content = {'status' : 'ok'} + + return reply_content, result_buf + + def do_clear(self): + self.shell.reset(False) + return dict(status='ok') diff --git a/IPython/kernel/zmq/kernelbase.py b/IPython/kernel/zmq/kernelbase.py index 9eee2a2..5a42774 100755 --- a/IPython/kernel/zmq/kernelbase.py +++ b/IPython/kernel/zmq/kernelbase.py @@ -1,14 +1,12 @@ -"""An interactive kernel that talks to frontends over 0MQ.""" +"""Base class for a kernel that talks to frontends over 0MQ.""" # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. from __future__ import print_function -import getpass import sys import time -import traceback import logging import uuid @@ -25,28 +23,19 @@ from IPython.config.configurable import Configurable from IPython.core.error import StdinNotImplementedError from IPython.core import release from IPython.utils import py3compat -from IPython.utils.py3compat import builtin_mod, unicode_type, string_types +from IPython.utils.py3compat import unicode_type, string_types from IPython.utils.jsonutil import json_clean -from IPython.utils.tokenutil import token_at_cursor from IPython.utils.traitlets import ( - Any, Instance, Float, Dict, List, Set, Integer, Unicode, - Type, Bool, + Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool, ) -from .serialize import serialize_object, unpack_apply_message from .session import Session -from .zmqshell import ZMQInteractiveShell #----------------------------------------------------------------------------- # Main kernel class #----------------------------------------------------------------------------- -protocol_version = release.kernel_protocol_version -ipython_version = release.version -language_version = sys.version.split()[0] - - class KernelBase(Configurable): #--------------------------------------------------------------------------- @@ -60,9 +49,6 @@ class KernelBase(Configurable): loop = ioloop.IOLoop.instance() loop.add_callback(self.enter_eventloop) - shell = Instance('IPython.core.interactiveshell.InteractiveShellABC') - shell_class = Type(ZMQInteractiveShell) - session = Instance(Session) profile_dir = Instance('IPython.core.profiledir.ProfileDir') shell_streams = List() @@ -70,17 +56,6 @@ class KernelBase(Configurable): iopub_socket = Instance(zmq.Socket) stdin_socket = Instance(zmq.Socket) log = Instance(logging.Logger) - - user_module = Any() - def _user_module_changed(self, name, old, new): - if self.shell is not None: - self.shell.user_module = new - - user_ns = Instance(dict, args=None, allow_none=True) - def _user_ns_changed(self, name, old, new): - if self.shell is not None: - self.shell.user_ns = new - self.shell.init_user_ns() # identities: int_id = Integer(-1) @@ -127,36 +102,17 @@ class KernelBase(Configurable): # by record_ports and used by connect_request. _recorded_ports = Dict() - # A reference to the Python builtin 'raw_input' function. - # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3) - _sys_raw_input = Any() - _sys_eval_input = Any() - # set of aborted msg_ids aborted = Set() + # Track execution count here. For IPython, we override this to use the + # execution count we store in the shell. + execution_count = 0 + def __init__(self, **kwargs): super(KernelBase, self).__init__(**kwargs) - # Initialize the InteractiveShell subclass - self.shell = self.shell_class.instance(parent=self, - profile_dir = self.profile_dir, - user_module = self.user_module, - user_ns = self.user_ns, - kernel = self, - ) - self.shell.displayhook.session = self.session - self.shell.displayhook.pub_socket = self.iopub_socket - self.shell.displayhook.topic = self._topic('execute_result') - self.shell.display_pub.session = self.session - self.shell.display_pub.pub_socket = self.iopub_socket - self.shell.data_pub.session = self.session - self.shell.data_pub.pub_socket = self.iopub_socket - - # TMP - hack while developing - self.shell._reply_content = None - # Build dict of handlers for message types msg_types = [ 'execute_request', 'complete_request', 'inspect_request', 'history_request', @@ -168,11 +124,6 @@ class KernelBase(Configurable): for msg_type in msg_types: self.shell_handlers[msg_type] = getattr(self, msg_type) - comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ] - comm_manager = self.shell.comm_manager - for msg_type in comm_msg_types: - self.shell_handlers[msg_type] = getattr(comm_manager, msg_type) - control_msg_types = msg_types + [ 'clear_request', 'abort_request' ] self.control_handlers = {} for msg_type in control_msg_types: @@ -191,7 +142,6 @@ class KernelBase(Configurable): self.log.debug("Control received: %s", msg) header = msg['header'] - msg_id = header['msg_id'] msg_type = header['msg_type'] handler = self.control_handlers.get(msg_type, None) @@ -233,7 +183,7 @@ class KernelBase(Configurable): status = {'status' : 'aborted'} md = {'engine' : self.ident} md.update(status) - reply_msg = self.session.send(stream, reply_type, metadata=md, + self.session.send(stream, reply_type, metadata=md, content=status, parent=msg, ident=idents) return @@ -340,34 +290,6 @@ class KernelBase(Configurable): ident=self._topic('status'), ) - def _forward_input(self, allow_stdin=False): - """Forward raw_input and getpass to the current frontend. - - via input_request - """ - self._allow_stdin = allow_stdin - - if py3compat.PY3: - self._sys_raw_input = builtin_mod.input - builtin_mod.input = self.raw_input - else: - self._sys_raw_input = builtin_mod.raw_input - self._sys_eval_input = builtin_mod.input - builtin_mod.raw_input = self.raw_input - builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt)) - self._save_getpass = getpass.getpass - getpass.getpass = self.getpass - - def _restore_input(self): - """Restore raw_input, getpass""" - if py3compat.PY3: - builtin_mod.input = self._sys_raw_input - else: - builtin_mod.raw_input = self._sys_raw_input - builtin_mod.input = self._sys_eval_input - - getpass.getpass = self._save_getpass - def set_parent(self, ident, parent): """Set the current parent_header @@ -379,7 +301,19 @@ class KernelBase(Configurable): """ self._parent_ident = ident self._parent_header = parent - self.shell.set_parent(parent) + + def send_response(self, stream, msg_or_type, content=None, ident=None, + buffers=None, track=False, header=None, metadata=None): + """Send a response to the message we're currently processing. + + This accepts all the parameters of :meth:`IPython.kernel.zmq.session.Session.send` + except ``parent``. + + This relies on :meth:`set_parent` having been called for the current + message. + """ + return self.session.send(stream, msg_or_type, content, self._parent_header, + ident, buffers, track, header, metadata) def execute_request(self, stream, ident, parent): """handle an execute_request""" @@ -391,6 +325,8 @@ class KernelBase(Configurable): code = py3compat.cast_unicode_py2(content[u'code']) silent = content[u'silent'] store_history = content.get(u'store_history', not silent) + user_expressions = content.get('user_expressions', {}) + allow_stdin = content.get('allow_stdin', False) except: self.log.error("Got bad msg: ") self.log.error("%s", parent) @@ -398,72 +334,17 @@ class KernelBase(Configurable): md = self._make_metadata(parent['metadata']) - shell = self.shell # we'll need this a lot here - - self._forward_input(content.get('allow_stdin', False)) # Set the parent message of the display hook and out streams. self.set_parent(ident, parent) # Re-broadcast our input for the benefit of listening clients, and # start computing output if not silent: - self._publish_execute_input(code, parent, shell.execution_count) - - reply_content = {} - # FIXME: the shell calls the exception handler itself. - shell._reply_content = None - try: - shell.run_cell(code, store_history=store_history, silent=silent) - except: - status = u'error' - # FIXME: this code right now isn't being used yet by default, - # because the run_cell() call above directly fires off exception - # reporting. This code, therefore, is only active in the scenario - # where runlines itself has an unhandled exception. We need to - # uniformize this, for all exception construction to come from a - # single location in the codbase. - etype, evalue, tb = sys.exc_info() - tb_list = traceback.format_exception(etype, evalue, tb) - reply_content.update(shell._showtraceback(etype, evalue, tb_list)) - else: - status = u'ok' - finally: - self._restore_input() - - reply_content[u'status'] = status - - # Return the execution counter so clients can display prompts - reply_content['execution_count'] = shell.execution_count - 1 - - # FIXME - fish exception info out of shell, possibly left there by - # runlines. We'll need to clean up this logic later. - if shell._reply_content is not None: - reply_content.update(shell._reply_content) - e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute') - reply_content['engine_info'] = e_info - # reset after use - shell._reply_content = None - - if 'traceback' in reply_content: - self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback'])) + self.execution_count += 1 + self._publish_execute_input(code, parent, self.execution_count) - - # At this point, we can tell whether the main code execution succeeded - # or not. If it did, we proceed to evaluate user_expressions - if reply_content['status'] == 'ok': - reply_content[u'user_expressions'] = \ - shell.user_expressions(content.get(u'user_expressions', {})) - else: - # If there was an error, don't even try to compute expressions - reply_content[u'user_expressions'] = {} - - # Payloads should be retrieved regardless of outcome, so we can both - # recover partial output (that could have been generated early in a - # block, before an error) and clear the payload system always. - reply_content[u'payload'] = shell.payload_manager.read_payload() - # Be agressive about clearing the payload because we don't want - # it to sit in memory until the next execute_request comes in. - shell.payload_manager.clear_payload() + reply_content = self.do_execute(code, silent, store_history, + user_expressions, allow_stdin) # Flush output before sending the reply. sys.stdout.flush() @@ -498,12 +379,7 @@ class KernelBase(Configurable): code = content['code'] cursor_pos = content['cursor_pos'] - txt, matches = self.shell.complete('', code, cursor_pos) - matches = {'matches' : matches, - 'cursor_end' : cursor_pos, - 'cursor_start' : cursor_pos - len(txt), - 'metadata' : {}, - 'status' : 'ok'} + matches = self.do_complete(code, cursor_pos) matches = json_clean(matches) completion_msg = self.session.send(stream, 'complete_reply', matches, parent, ident) @@ -512,19 +388,8 @@ class KernelBase(Configurable): def inspect_request(self, stream, ident, parent): content = parent['content'] - name = token_at_cursor(content['code'], content['cursor_pos']) - info = self.shell.object_inspect(name) - - reply_content = {'status' : 'ok'} - reply_content['data'] = data = {} - reply_content['metadata'] = {} - reply_content['found'] = info['found'] - if info['found']: - info_text = self.shell.object_inspect_text( - name, - detail_level=content.get('detail_level', 0), - ) - reply_content['data']['text/plain'] = info_text + reply_content = self.do_inspect(content['code'], content['cursor_pos'], + content.get('detail_level', 0)) # Before we send this object over, we scrub it for JSON usage reply_content = json_clean(reply_content) msg = self.session.send(stream, 'inspect_reply', @@ -534,36 +399,22 @@ class KernelBase(Configurable): def history_request(self, stream, ident, parent): # We need to pull these out, as passing **kwargs doesn't work with # unicode keys before Python 2.6.5. - hist_access_type = parent['content']['hist_access_type'] - raw = parent['content']['raw'] - output = parent['content']['output'] - if hist_access_type == 'tail': - n = parent['content']['n'] - hist = self.shell.history_manager.get_tail(n, raw=raw, output=output, - include_latest=True) - - elif hist_access_type == 'range': - session = parent['content']['session'] - start = parent['content']['start'] - stop = parent['content']['stop'] - hist = self.shell.history_manager.get_range(session, start, stop, - raw=raw, output=output) - - elif hist_access_type == 'search': - n = parent['content'].get('n') - unique = parent['content'].get('unique', False) - pattern = parent['content']['pattern'] - hist = self.shell.history_manager.search( - pattern, raw=raw, output=output, n=n, unique=unique) + content = parent['content'] - else: - hist = [] - hist = list(hist) - content = {'history' : hist} - content = json_clean(content) + reply_content = self.do_history(content['hist_access_type'], + content['output'], content['raw'], + content.get('session', None), + content.get('start', None), + content.get('stop', None), + content.get('n', None), + content.get('pattern', None), + content.get('unique', False), + ) + + reply_content = json_clean(reply_content) msg = self.session.send(stream, 'history_reply', - content, parent, ident) - self.log.debug("Sending history reply with %i entries", len(hist)) + reply_content, parent, ident) + self.log.debug("%s", msg) def connect_request(self, stream, ident, parent): if self._recorded_ports is not None: @@ -574,23 +425,24 @@ class KernelBase(Configurable): content, parent, ident) self.log.debug("%s", msg) - def kernel_info_request(self, stream, ident, parent): - vinfo = { - 'protocol_version': protocol_version, - 'implementation': 'ipython', - 'implementation_version': ipython_version, - 'language_version': language_version, - 'language': 'python', - 'banner': self.shell.banner, + @property + def kernel_info(self): + return { + 'protocol_version': release.kernel_protocol_version, + 'implementation': self.implementation, + 'implementation_version': self.implementation_version, + 'language': self.language, + 'language_version': self.language_version, + 'banner': self.banner, } + + def kernel_info_request(self, stream, ident, parent): msg = self.session.send(stream, 'kernel_info_reply', - vinfo, parent, ident) + self.kernel_info, parent, ident) self.log.debug("%s", msg) def shutdown_request(self, stream, ident, parent): - self.shell.exit_now = True - content = dict(status='ok') - content.update(parent['content']) + content = self.do_shutdown(parent['content']['restart']) self.session.send(stream, u'shutdown_reply', content, parent, ident=ident) # same content, but different msg_id for broadcasting on IOPub self._shutdown_message = self.session.msg(u'shutdown_reply', @@ -618,62 +470,11 @@ class KernelBase(Configurable): self._publish_status(u'busy', parent) # Set the parent message of the display hook and out streams. - shell = self.shell - shell.set_parent(parent) + self.set_parent(ident, parent) md = self._make_metadata(parent['metadata']) - try: - working = shell.user_ns - - prefix = "_"+str(msg_id).replace("-","")+"_" - - f,args,kwargs = unpack_apply_message(bufs, working, copy=False) - - fname = getattr(f, '__name__', 'f') - fname = prefix+"f" - argname = prefix+"args" - kwargname = prefix+"kwargs" - resultname = prefix+"result" - - ns = { fname : f, argname : args, kwargname : kwargs , resultname : None } - # print ns - working.update(ns) - code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname) - try: - exec(code, shell.user_global_ns, shell.user_ns) - result = working.get(resultname) - finally: - for key in ns: - working.pop(key) - - result_buf = serialize_object(result, - buffer_threshold=self.session.buffer_threshold, - item_threshold=self.session.item_threshold, - ) - - except: - # invoke IPython traceback formatting - shell.showtraceback() - # FIXME - fish exception info out of shell, possibly left there by - # run_code. We'll need to clean up this logic later. - reply_content = {} - if shell._reply_content is not None: - reply_content.update(shell._reply_content) - e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply') - reply_content['engine_info'] = e_info - # reset after use - shell._reply_content = None - - self.session.send(self.iopub_socket, u'error', reply_content, parent=parent, - ident=self._topic('error')) - self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback'])) - result_buf = [] - - if reply_content['ename'] == 'UnmetDependency': - md['dependencies_met'] = False - else: - reply_content = {'status' : 'ok'} + reply_content, result_buf = self.do_apply(content, bufs, msg_id, md) # put 'ok'/'error' status in header, for scheduler introspection: md['status'] = reply_content['status'] @@ -682,7 +483,7 @@ class KernelBase(Configurable): sys.stdout.flush() sys.stderr.flush() - reply_msg = self.session.send(stream, u'apply_reply', reply_content, + self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf, metadata=md) self._publish_status(u'idle', parent) @@ -708,23 +509,14 @@ class KernelBase(Configurable): def clear_request(self, stream, idents, parent): """Clear our namespace.""" - self.shell.reset(False) - msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, - content = dict(status='ok')) - + content = self.do_clear() + self.session.send(stream, 'clear_reply', ident=idents, parent=parent, + content = content) #--------------------------------------------------------------------------- # Protected interface #--------------------------------------------------------------------------- - def _wrap_exception(self, method=None): - # import here, because _wrap_exception is only used in parallel, - # and parallel has higher min pyzmq version - from IPython.parallel.error import wrap_exception - e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method) - content = wrap_exception(e_info) - return content - def _topic(self, topic): """prefixed topic for IOPub messages""" if self.int_id >= 0: diff --git a/IPython/parallel/tests/test_asyncresult.py b/IPython/parallel/tests/test_asyncresult.py index ddb458d..80fdf3c 100644 --- a/IPython/parallel/tests/test_asyncresult.py +++ b/IPython/parallel/tests/test_asyncresult.py @@ -188,8 +188,8 @@ class AsyncResultTest(ClusterTestCase): ar = v.apply_async(time.sleep, 0.25) while not ar.ready(): time.sleep(0.01) - self.assertTrue(ar.elapsed < 1) - self.assertTrue(ar.elapsed < 1) + self.assertLess(ar.elapsed, 1) + self.assertLess(ar.elapsed, 1) ar.get(2) def test_hubresult_timestamps(self):