From 6cb2026f76d00422b9daa55ed590c9ab16f71ec6 2012-05-25 23:43:27
From: Brian E. Granger
Date: 2012-05-25 23:43:27
Subject: [PATCH] Merge pull request #1630 from minrk/mergekernel

Merge divergent Kernel implementations

---

diff --git a/IPython/config/application.py b/IPython/config/application.py
index ba26b57..f9e94ee 100644
--- a/IPython/config/application.py
+++ b/IPython/config/application.py
@@ -132,6 +132,30 @@ class Application(SingletonConfigurable):
         new = getattr(logging, new)
         self.log_level = new
         self.log.setLevel(new)
+
+    log_format = Unicode("[%(name)s] %(message)s", config=True,
+        help="The Logging format template",
+    )
+    log = Instance(logging.Logger)
+    def _log_default(self):
+        """Start logging for this application.
+
+        The default is to log to stdout using a StreamHandler. The log level
+        starts at logging.WARN, but this can be adjusted by setting the
+        ``log_level`` attribute.
+        """
+        log = logging.getLogger(self.__class__.__name__)
+        log.setLevel(self.log_level)
+        if sys.executable.endswith('pythonw.exe'):
+            # this should really go to a file, but file-logging is only
+            # hooked up in parallel applications
+            _log_handler = logging.StreamHandler(open(os.devnull, 'w'))
+        else:
+            _log_handler = logging.StreamHandler()
+        _log_formatter = logging.Formatter(self.log_format)
+        _log_handler.setFormatter(_log_formatter)
+        log.addHandler(_log_handler)
+        return log
 
     # the alias map for configurables
     aliases = Dict({'log-level' : 'Application.log_level'})
@@ -169,32 +193,11 @@ class Application(SingletonConfigurable):
         if self.__class__ not in self.classes:
             self.classes.insert(0, self.__class__)
 
-        self.init_logging()
-
     def _config_changed(self, name, old, new):
         SingletonConfigurable._config_changed(self, name, old, new)
         self.log.debug('Config changed:')
         self.log.debug(repr(new))
 
-    def init_logging(self):
-        """Start logging for this application.
-
-        The default is to log to stdout using a StreamHandler. The log level
-        starts at logging.WARN, but this can be adjusted by setting the
-        ``log_level`` attribute.
-        """
-        self.log = logging.getLogger(self.__class__.__name__)
-        self.log.setLevel(self.log_level)
-        if sys.executable.endswith('pythonw.exe'):
-            # this should really go to a file, but file-logging is only
-            # hooked up in parallel applications
-            self._log_handler = logging.StreamHandler(open(os.devnull, 'w'))
-        else:
-            self._log_handler = logging.StreamHandler()
-        self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
-        self._log_handler.setFormatter(self._log_formatter)
-        self.log.addHandler(self._log_handler)
-
     @catch_config_error
     def initialize(self, argv=None):
        """Do the basic steps to configure me.
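The hunk above replaces the eager `init_logging()` call with a lazy `_log_default`, and promotes the hard-coded format string to a configurable `log_format` trait. A minimal sketch of exercising the new trait from a profile's `ipython_config.py`; the format and level values here are illustrative, not part of this patch:

    # ipython_config.py -- illustrative values
    c = get_config()
    # rendered by logging.Formatter, so any LogRecord attribute works:
    c.Application.log_format = "%(asctime)s [%(name)s] %(levelname)s %(message)s"
    c.Application.log_level = 10    # i.e. logging.DEBUG
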
diff --git a/IPython/core/history.py b/IPython/core/history.py
index df981e9..6c51f73 100644
--- a/IPython/core/history.py
+++ b/IPython/core/history.py
@@ -403,8 +403,9 @@ class HistoryManager(HistoryAccessor):
         self.save_flag = threading.Event()
         self.db_input_cache_lock = threading.Lock()
         self.db_output_cache_lock = threading.Lock()
-        self.save_thread = HistorySavingThread(self)
-        self.save_thread.start()
+        if self.hist_file != ':memory:':
+            self.save_thread = HistorySavingThread(self)
+            self.save_thread.start()
 
         self.new_session()
 
diff --git a/IPython/core/interactiveshell.py b/IPython/core/interactiveshell.py
index 29d5e6c..a2ccbe8 100644
--- a/IPython/core/interactiveshell.py
+++ b/IPython/core/interactiveshell.py
@@ -712,6 +712,7 @@ class InteractiveShell(SingletonConfigurable, Magic):
         self._orig_sys_module_state['stderr'] = sys.stderr
         self._orig_sys_module_state['excepthook'] = sys.excepthook
         self._orig_sys_modules_main_name = self.user_module.__name__
+        self._orig_sys_modules_main_mod = sys.modules.get(self.user_module.__name__)
 
     def restore_sys_module_state(self):
         """Restore the state of the sys module."""
@@ -721,7 +722,8 @@ class InteractiveShell(SingletonConfigurable, Magic):
             except AttributeError:
                 pass
         # Reset what was done in self.init_sys_modules
-        sys.modules[self.user_module.__name__] = self._orig_sys_modules_main_name
+        if self._orig_sys_modules_main_mod is not None:
+            sys.modules[self._orig_sys_modules_main_name] = self._orig_sys_modules_main_mod
 
     #-------------------------------------------------------------------------
     # Things related to hooks
@@ -2424,7 +2426,7 @@ class InteractiveShell(SingletonConfigurable, Magic):
                 self.showtraceback()
                 warn('Unknown failure executing module: <%s>' % mod_name)
 
-    def run_cell(self, raw_cell, store_history=False):
+    def run_cell(self, raw_cell, store_history=False, silent=False):
         """Run a complete IPython cell.
 
         Parameters
@@ -2435,9 +2437,15 @@
           If True, the raw and translated cell will be stored in IPython's
           history. For user code calling back into IPython's machinery, this
           should be set to False.
+        silent : bool
+          If True, avoid side-effects, such as implicit displayhooks, history,
+          and logging. silent=True forces store_history=False.
         """
         if (not raw_cell) or raw_cell.isspace():
             return
+        
+        if silent:
+            store_history = False
 
         for line in raw_cell.splitlines():
             self.input_splitter.push(line)
@@ -2462,8 +2470,8 @@
 
         if store_history:
             self.history_manager.store_inputs(self.execution_count, cell, raw_cell)
-
-        self.logger.log(cell, raw_cell)
+        if not silent:
+            self.logger.log(cell, raw_cell)
 
         if not prefilter_failed:
             # don't run if prefilter failed
@@ -2483,12 +2491,16 @@
                     if store_history:
                         self.execution_count += 1
                     return None
-
+                
+                interactivity = "none" if silent else "last_expr"
                 self.run_ast_nodes(code_ast.body, cell_name,
-                                   interactivity="last_expr")
-
+                                   interactivity=interactivity)
+                
                 # Execute any registered post-execution functions.
- for func, status in self._post_execute.iteritems(): + # unless we are silent + post_exec = [] if silent else self._post_execute.iteritems() + + for func, status in post_exec: if self.disable_failing_post_execute and not status: continue try: diff --git a/IPython/core/tests/test_interactiveshell.py b/IPython/core/tests/test_interactiveshell.py index aee855d..21008e7 100644 --- a/IPython/core/tests/test_interactiveshell.py +++ b/IPython/core/tests/test_interactiveshell.py @@ -254,6 +254,61 @@ class InteractiveShellTestCase(unittest.TestCase): self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}") # ZeroDivisionError self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}") + + def test_silent_nopostexec(self): + """run_cell(silent=True) doesn't invoke post-exec funcs""" + ip = get_ipython() + + d = dict(called=False) + def set_called(): + d['called'] = True + + ip.register_post_execute(set_called) + ip.run_cell("1", silent=True) + self.assertFalse(d['called']) + # double-check that non-silent exec did what we expected + # silent to avoid + ip.run_cell("1") + self.assertTrue(d['called']) + # remove post-exec + ip._post_execute.pop(set_called) + + def test_silent_noadvance(self): + """run_cell(silent=True) doesn't advance execution_count""" + ip = get_ipython() + + ec = ip.execution_count + # silent should force store_history=False + ip.run_cell("1", store_history=True, silent=True) + + self.assertEquals(ec, ip.execution_count) + # double-check that non-silent exec did what we expected + # silent to avoid + ip.run_cell("1", store_history=True) + self.assertEquals(ec+1, ip.execution_count) + + def test_silent_nodisplayhook(self): + """run_cell(silent=True) doesn't trigger displayhook""" + ip = get_ipython() + + d = dict(called=False) + + trap = ip.display_trap + save_hook = trap.hook + + def failing_hook(*args, **kwargs): + d['called'] = True + + try: + trap.hook = failing_hook + ip.run_cell("1", silent=True) + self.assertFalse(d['called']) + # double-check that non-silent exec did what we expected + # silent to avoid + ip.run_cell("1") + self.assertTrue(d['called']) + finally: + trap.hook = save_hook @skipif(sys.version_info[0] >= 3, "softspace removed in py3") def test_print_softspace(self): diff --git a/IPython/extensions/parallelmagic.py b/IPython/extensions/parallelmagic.py index 45bdb1e..2677d27 100644 --- a/IPython/extensions/parallelmagic.py +++ b/IPython/extensions/parallelmagic.py @@ -211,7 +211,7 @@ class ParalleMagic(Plugin): print '[stdout:%i]'%eid, stdout - def pxrun_cell(self, raw_cell, store_history=True): + def pxrun_cell(self, raw_cell, store_history=False, silent=False): """drop-in replacement for InteractiveShell.run_cell. This executes code remotely, instead of in the local namespace. 
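The tests above pin down what the new `silent` flag must guarantee: no history advance, no displayhook, no post-execute callbacks. A short usage sketch, assuming an interactive session where `get_ipython()` is available:

    ip = get_ipython()
    ip.run_cell("x = 1")                # normal: logged, displayhook may fire
    ip.run_cell("x = 2", silent=True)   # no logging, displayhook, or post-exec;
                                        # also forces store_history=False
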
@@ -258,7 +258,7 @@ class ParalleMagic(Plugin):
                 return False
         else:
             try:
-                result = self.active_view.execute(cell, block=False)
+                result = self.active_view.execute(cell, silent=False, block=False)
             except:
                 ipself.showtraceback()
                 return True
diff --git a/IPython/frontend/consoleapp.py b/IPython/frontend/consoleapp.py
index 2776eef..16ae58d 100644
--- a/IPython/frontend/consoleapp.py
+++ b/IPython/frontend/consoleapp.py
@@ -140,8 +140,6 @@ class IPythonConsoleApp(Configurable):
     frontend_flags = Any(app_flags)
     frontend_aliases = Any(app_aliases)
 
-    pure = CBool(False, config=True,
-        help="Use a pure Python kernel instead of an IPython kernel.")
     # create requested profiles by default, if they don't exist:
     auto_create = CBool(True)
     # connection info:
@@ -330,9 +328,7 @@
         )
         # start the kernel
         if not self.existing:
-            kwargs = dict(ipython=not self.pure)
-            kwargs['extra_arguments'] = self.kernel_argv
-            self.kernel_manager.start_kernel(**kwargs)
+            self.kernel_manager.start_kernel(extra_arguments=self.kernel_argv)
         elif self.sshserver:
             # ssh, write new connection file
             self.kernel_manager.write_connection_file()
diff --git a/IPython/frontend/html/notebook/clustermanager.py b/IPython/frontend/html/notebook/clustermanager.py
index 27e1e9b..7e4f428 100644
--- a/IPython/frontend/html/notebook/clustermanager.py
+++ b/IPython/frontend/html/notebook/clustermanager.py
@@ -45,8 +45,6 @@ class DummyIPClusterStart(IPClusterStart):
     def init_signal(self):
         pass
-    def init_logging(self):
-        pass
     def reinit_logging(self):
         pass
 
diff --git a/IPython/frontend/html/notebook/handlers.py b/IPython/frontend/html/notebook/handlers.py
index 62b3b82..92752a5 100644
--- a/IPython/frontend/html/notebook/handlers.py
+++ b/IPython/frontend/html/notebook/handlers.py
@@ -30,6 +30,7 @@ from zmq.utils import jsonapi
 from IPython.external.decorator import decorator
 from IPython.zmq.session import Session
 from IPython.lib.security import passwd_check
+from IPython.utils.jsonutil import date_default
 
 try:
     from docutils.core import publish_string
@@ -385,13 +386,13 @@ class ZMQStreamHandler(websocket.WebSocketHandler):
             except KeyError:
                 pass
         msg.pop('buffers')
-        return jsonapi.dumps(msg)
+        return jsonapi.dumps(msg, default=date_default)
 
     def _on_zmq_reply(self, msg_list):
         try:
             msg = self._reserialize_reply(msg_list)
-        except:
-            self.application.log.critical("Malformed message: %r" % msg_list)
+        except Exception:
+            self.application.log.critical("Malformed message: %r" % msg_list, exc_info=True)
         else:
             self.write_message(msg)
 
diff --git a/IPython/frontend/html/notebook/notebookapp.py b/IPython/frontend/html/notebook/notebookapp.py
index 8e9b813..9addd3b 100644
--- a/IPython/frontend/html/notebook/notebookapp.py
+++ b/IPython/frontend/html/notebook/notebookapp.py
@@ -397,7 +397,6 @@ class NotebookApp(BaseIPythonApplication):
             self.cluster_manager.update_profiles()
 
     def init_logging(self):
-        super(NotebookApp, self).init_logging()
         # This prevents double log messages because tornado uses a root logger that
        # self.log is a child of. The logging module dispatches log messages to a
        # logger and all of its ancestors until propagate is set to False.
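Passing `default=date_default` above is what lets message headers carrying `datetime` objects survive reserialization on their way to the browser; the stdlib `json` encoder would otherwise raise `TypeError`. A sketch of the difference (the message dict is illustrative):

    from datetime import datetime
    from zmq.utils import jsonapi
    from IPython.utils.jsonutil import date_default

    msg = {'header': {'date': datetime.now()}}
    # jsonapi.dumps(msg) -> TypeError: datetime is not JSON serializable
    jsonapi.dumps(msg, default=date_default)   # date encoded as ISO8601 text
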
@@ -500,6 +499,7 @@ class NotebookApp(BaseIPythonApplication): @catch_config_error def initialize(self, argv=None): + self.init_logging() super(NotebookApp, self).initialize(argv) self.init_configurables() self.init_webapp() diff --git a/IPython/frontend/qt/console/qtconsoleapp.py b/IPython/frontend/qt/console/qtconsoleapp.py index 89f0aa8..80ccbd8 100644 --- a/IPython/frontend/qt/console/qtconsoleapp.py +++ b/IPython/frontend/qt/console/qtconsoleapp.py @@ -101,9 +101,7 @@ ipython qtconsole --pylab=inline # start with pylab in inline plotting mode # start with copy of flags flags = dict(flags) qt_flags = { - 'pure' : ({'IPythonQtConsoleApp' : {'pure' : True}}, - "Use a pure Python kernel instead of an IPython kernel."), - 'plain' : ({'ConsoleWidget' : {'kind' : 'plain'}}, + 'plain' : ({'IPythonQtConsoleApp' : {'plain' : True}}, "Disable rich text support."), } qt_flags.update(boolean_flag( @@ -180,18 +178,14 @@ class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp): plain = CBool(False, config=True, help="Use a plaintext widget instead of rich text (plain can't print/save).") - def _pure_changed(self, name, old, new): - kind = 'plain' if self.plain else 'rich' + def _plain_changed(self, name, old, new): + kind = 'plain' if new else 'rich' self.config.ConsoleWidget.kind = kind - if self.pure: - self.widget_factory = FrontendWidget - elif self.plain: + if new: self.widget_factory = IPythonWidget else: self.widget_factory = RichIPythonWidget - _plain_changed = _pure_changed - # the factory for creating a widget widget_factory = Any(RichIPythonWidget) @@ -210,7 +204,7 @@ class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp): config=self.config, ) # start the kernel - kwargs = dict(ipython=not self.pure) + kwargs = dict() kwargs['extra_arguments'] = self.kernel_argv kernel_manager.start_kernel(**kwargs) kernel_manager.start_channels() @@ -273,17 +267,13 @@ class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp): self.window.add_tab_with_frontend(self.widget) self.window.init_menu_bar() - self.window.setWindowTitle('Python' if self.pure else 'IPython') + self.window.setWindowTitle('IPython') def init_colors(self, widget): """Configure the coloring of the widget""" # Note: This will be dramatically simplified when colors # are removed from the backend. 
- if self.pure: - # only IPythonWidget supports styling - return - # parse the colors arg down to current known labels try: colors = self.config.ZMQInteractiveShell.colors diff --git a/IPython/lib/kernel.py b/IPython/lib/kernel.py index 345ecee..b7144c9 100644 --- a/IPython/lib/kernel.py +++ b/IPython/lib/kernel.py @@ -46,11 +46,11 @@ def get_connection_file(app=None): If unspecified, the currently running app will be used """ if app is None: - from IPython.zmq.kernelapp import KernelApp - if not KernelApp.initialized(): + from IPython.zmq.ipkernel import IPKernelApp + if not IPKernelApp.initialized(): raise RuntimeError("app not specified, and not in a running Kernel") - app = KernelApp.instance() + app = IPKernelApp.instance() return filefind(app.connection_file, ['.', app.profile_dir.security_dir]) def find_connection_file(filename, profile=None): diff --git a/IPython/parallel/__init__.py b/IPython/parallel/__init__.py index 4cb507d..d265596 100644 --- a/IPython/parallel/__init__.py +++ b/IPython/parallel/__init__.py @@ -35,6 +35,28 @@ from .client.asyncresult import * from .client.client import Client from .client.remotefunction import * from .client.view import * +from .util import interactive from .controller.dependency import * +#----------------------------------------------------------------------------- +# Functions +#----------------------------------------------------------------------------- + + +def bind_kernel(**kwargs): + """Bind an Engine's Kernel to be used as a full IPython kernel. + + This allows a running Engine to be used simultaneously as a full IPython kernel + with the QtConsole or other frontends. + + This function returns immediately. + """ + from IPython.parallel.apps.ipengineapp import IPEngineApp + if IPEngineApp.initialized(): + app = IPEngineApp.instance() + else: + raise RuntimeError("Must be called from an IPEngineApp instance") + + return app.bind_kernel(**kwargs) + diff --git a/IPython/parallel/apps/baseapp.py b/IPython/parallel/apps/baseapp.py index 6d6382a..071a06d 100644 --- a/IPython/parallel/apps/baseapp.py +++ b/IPython/parallel/apps/baseapp.py @@ -102,6 +102,10 @@ class BaseParallelApplication(BaseIPythonApplication): def _log_level_default(self): # temporarily override default_log_level to INFO return logging.INFO + + def _log_format_default(self): + """override default log format to include time""" + return u"%(asctime)s.%(msecs).03d [%(name)s] %(message)s" work_dir = Unicode(os.getcwdu(), config=True, help='Set the working dir for the process.' 
@@ -175,11 +179,14 @@ class BaseParallelApplication(BaseIPythonApplication): else: open_log_file = None if open_log_file is not None: - self.log.removeHandler(self._log_handler) + while self.log.handlers: + self.log.removeHandler(self.log.handlers[0]) self._log_handler = logging.StreamHandler(open_log_file) self.log.addHandler(self._log_handler) + else: + self._log_handler = self.log.handlers[0] # Add timestamps to log format: - self._log_formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s", + self._log_formatter = logging.Formatter(self.log_format, datefmt="%Y-%m-%d %H:%M:%S") self._log_handler.setFormatter(self._log_formatter) # do not propagate log messages to root logger diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index ead40a7..da9070a 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -434,11 +434,9 @@ class IPControllerApp(BaseParallelApplication): lsock = context.socket(zmq.PUB) lsock.connect(self.log_url) handler = PUBHandler(lsock) - self.log.removeHandler(self._log_handler) handler.root_topic = 'controller' handler.setLevel(self.log_level) self.log.addHandler(handler) - self._log_handler = handler @catch_config_error def initialize(self, argv=None): diff --git a/IPython/parallel/apps/ipengineapp.py b/IPython/parallel/apps/ipengineapp.py index 494bbd3..d24f574 100755 --- a/IPython/parallel/apps/ipengineapp.py +++ b/IPython/parallel/apps/ipengineapp.py @@ -37,6 +37,7 @@ from IPython.parallel.apps.baseapp import ( catch_config_error, ) from IPython.zmq.log import EnginePUBHandler +from IPython.zmq.ipkernel import Kernel, IPKernelApp from IPython.zmq.session import ( Session, session_aliases, session_flags ) @@ -44,11 +45,11 @@ from IPython.zmq.session import ( from IPython.config.configurable import Configurable from IPython.parallel.engine.engine import EngineFactory -from IPython.parallel.engine.streamkernel import Kernel -from IPython.parallel.util import disambiguate_url, asbytes +from IPython.parallel.util import disambiguate_url from IPython.utils.importstring import import_item -from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float +from IPython.utils.py3compat import cast_bytes +from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance #----------------------------------------------------------------------------- @@ -173,9 +174,17 @@ class IPEngineApp(BaseParallelApplication): log_url = Unicode('', config=True, help="""The URL for the iploggerapp instance, for forwarding logging to a central location.""") + + # an IPKernelApp instance, used to setup listening for shell frontends + kernel_app = Instance(IPKernelApp) aliases = Dict(aliases) flags = Dict(flags) + + @property + def kernel(self): + """allow access to the Kernel object, so I look like IPKernelApp""" + return self.engine.kernel def find_url_file(self): """Set the url file. 
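`bind_kernel()` (exported from IPython.parallel earlier in this patch) promotes a running engine into a kernel that regular frontends can attach to. A hedged sketch of driving it from a client; the engine index and the connection-file name are illustrative:

    from IPython.parallel import Client
    rc = Client()

    def start_listening():
        # runs on the engine, where IPEngineApp.initialized() is True
        from IPython.parallel import bind_kernel
        bind_kernel()

    rc[0].apply_sync(start_listening)
    # the engine logs its ports and writes a connection file, so e.g.:
    #   ipython qtconsole --existing kernel-<pid>.json
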
@@ -203,7 +212,7 @@ class IPEngineApp(BaseParallelApplication): d = json.loads(f.read()) if 'exec_key' in d: - config.Session.key = asbytes(d['exec_key']) + config.Session.key = cast_bytes(d['exec_key']) try: config.EngineFactory.location @@ -220,7 +229,47 @@ class IPEngineApp(BaseParallelApplication): config.EngineFactory.sshserver except AttributeError: config.EngineFactory.sshserver = d['ssh'] + + def bind_kernel(self, **kwargs): + """Promote engine to listening kernel, accessible to frontends.""" + if self.kernel_app is not None: + return + + self.log.info("Opening ports for direct connections as an IPython kernel") + + kernel = self.kernel + + kwargs.setdefault('config', self.config) + kwargs.setdefault('log', self.log) + kwargs.setdefault('profile_dir', self.profile_dir) + kwargs.setdefault('session', self.engine.session) + + app = self.kernel_app = IPKernelApp(**kwargs) + + # allow IPKernelApp.instance(): + IPKernelApp._instance = app + app.init_connection_file() + # relevant contents of init_sockets: + + app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port) + app.log.debug("shell ROUTER Channel on port: %i", app.shell_port) + + app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port) + app.log.debug("iopub PUB Channel on port: %i", app.iopub_port) + + kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER) + app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port) + app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port) + + # start the heartbeat, and log connection info: + + app.init_heartbeat() + + app.log_connection_info() + app.write_connection_file() + + def init_engine(self): # This is the working dir by now. sys.path.insert(0, '') @@ -282,11 +331,9 @@ class IPEngineApp(BaseParallelApplication): context = self.engine.context lsock = context.socket(zmq.PUB) lsock.connect(self.log_url) - self.log.removeHandler(self._log_handler) handler = EnginePUBHandler(self.engine, lsock) handler.setLevel(self.log_level) self.log.addHandler(handler) - self._log_handler = handler def init_mpi(self): global mpi diff --git a/IPython/parallel/apps/launcher.py b/IPython/parallel/apps/launcher.py index f3309c5..e752d2a 100644 --- a/IPython/parallel/apps/launcher.py +++ b/IPython/parallel/apps/launcher.py @@ -23,6 +23,7 @@ import copy import logging import os import stat +import sys import time # signal imports, handling various platforms, versions @@ -59,8 +60,8 @@ from IPython.utils.text import EvalFormatter from IPython.utils.traitlets import ( Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp ) -from IPython.utils.path import get_ipython_module_path, get_home_dir -from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError +from IPython.utils.path import get_home_dir +from IPython.utils.process import find_cmd, FindCmdError from .win32support import forward_read_events @@ -72,18 +73,13 @@ WINDOWS = os.name == 'nt' # Paths to the kernel apps #----------------------------------------------------------------------------- +cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()" -ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path( - 'IPython.parallel.apps.ipclusterapp' -)) +ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"] -ipengine_cmd_argv = pycmd2argv(get_ipython_module_path( - 'IPython.parallel.apps.ipengineapp' -)) +ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"] -ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path( - 
'IPython.parallel.apps.ipcontrollerapp'
-))
+ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
 
 #-----------------------------------------------------------------------------
 # Base launchers and errors
diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py
index ef122fb..7770087 100644
--- a/IPython/parallel/client/client.py
+++ b/IPython/parallel/client/client.py
@@ -36,6 +37,7 @@ from IPython.core.application import BaseIPythonApplication
 from IPython.utils.jsonutil import rekey
 from IPython.utils.localinterfaces import LOCAL_IPS
 from IPython.utils.path import get_ipython_dir
+from IPython.utils.py3compat import cast_bytes
 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
                                     Dict, List, Bool, Set, Any)
 from IPython.external.decorator import decorator
@@ -71,6 +72,60 @@ def spin_first(f, self, *args, **kwargs):
 # Classes
 #--------------------------------------------------------------------------
 
+
+class ExecuteReply(object):
+    """wrapper for finished Execute results"""
+    def __init__(self, msg_id, content, metadata):
+        self.msg_id = msg_id
+        self._content = content
+        self.execution_count = content['execution_count']
+        self.metadata = metadata
+    
+    def __getitem__(self, key):
+        return self.metadata[key]
+    
+    def __getattr__(self, key):
+        if key not in self.metadata:
+            raise AttributeError(key)
+        return self.metadata[key]
+    
+    def __repr__(self):
+        pyout = self.metadata['pyout'] or {}
+        text_out = pyout.get('data', {}).get('text/plain', '')
+        if len(text_out) > 32:
+            text_out = text_out[:29] + '...'
+        
+        return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
+    
+    def _repr_html_(self):
+        pyout = self.metadata['pyout'] or {'data':{}}
+        return pyout['data'].get("text/html")
+    
+    def _repr_latex_(self):
+        pyout = self.metadata['pyout'] or {'data':{}}
+        return pyout['data'].get("text/latex")
+    
+    def _repr_json_(self):
+        pyout = self.metadata['pyout'] or {'data':{}}
+        return pyout['data'].get("application/json")
+    
+    def _repr_javascript_(self):
+        pyout = self.metadata['pyout'] or {'data':{}}
+        return pyout['data'].get("application/javascript")
+    
+    def _repr_png_(self):
+        pyout = self.metadata['pyout'] or {'data':{}}
+        return pyout['data'].get("image/png")
+    
+    def _repr_jpeg_(self):
+        pyout = self.metadata['pyout'] or {'data':{}}
+        return pyout['data'].get("image/jpeg")
+    
+    def _repr_svg_(self):
+        pyout = self.metadata['pyout'] or {'data':{}}
+        return pyout['data'].get("image/svg+xml")
+
+
 class Metadata(dict):
     """Subclass of dict for initializing metadata values.
@@ -97,6 +152,7 @@ class Metadata(dict):
               'pyerr' : None,
               'stdout' : '',
               'stderr' : '',
+              'outputs' : [],
             }
         self.update(md)
         self.update(dict(*args, **kwargs))
@@ -308,15 +364,19 @@ class Client(HasTraits):
         if self._cd is not None:
             if url_or_file is None:
                 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
-        assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
-            " Please specify at least one of url_or_file or profile."
+        if url_or_file is None:
+            raise ValueError(
+                "I can't find enough information to connect to a hub!"
+                " Please specify at least one of url_or_file or profile."
+ ) if not util.is_url(url_or_file): # it's not a url, try for a file if not os.path.exists(url_or_file): if self._cd: url_or_file = os.path.join(self._cd.security_dir, url_or_file) - assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file + if not os.path.exists(url_or_file): + raise IOError("Connection file not found: %r" % url_or_file) with open(url_or_file) as f: cfg = json.loads(f.read()) else: @@ -369,7 +429,7 @@ class Client(HasTraits): if os.path.isfile(exec_key): extra_args['keyfile'] = exec_key else: - exec_key = util.asbytes(exec_key) + exec_key = cast_bytes(exec_key) extra_args['key'] = exec_key self.session = Session(**extra_args) @@ -467,7 +527,7 @@ class Client(HasTraits): if not isinstance(targets, (tuple, list, xrange)): raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) - return [util.asbytes(self._engines[t]) for t in targets], list(targets) + return [cast_bytes(self._engines[t]) for t in targets], list(targets) def _connect(self, sshserver, ssh_kwargs, timeout): """setup all our socket connections to the cluster. This is called from @@ -628,7 +688,30 @@ class Client(HasTraits): print ("got unknown result: %s"%msg_id) else: self.outstanding.remove(msg_id) - self.results[msg_id] = self._unwrap_exception(msg['content']) + + content = msg['content'] + header = msg['header'] + + # construct metadata: + md = self.metadata[msg_id] + md.update(self._extract_metadata(header, parent, content)) + # is this redundant? + self.metadata[msg_id] = md + + e_outstanding = self._outstanding_dict[md['engine_uuid']] + if msg_id in e_outstanding: + e_outstanding.remove(msg_id) + + # construct result: + if content['status'] == 'ok': + self.results[msg_id] = ExecuteReply(msg_id, content, md) + elif content['status'] == 'aborted': + self.results[msg_id] = error.TaskAborted(msg_id) + elif content['status'] == 'resubmitted': + # TODO: handle resubmission + pass + else: + self.results[msg_id] = self._unwrap_exception(content) def _handle_apply_reply(self, msg): """Save the reply to an apply_request into our results.""" @@ -750,8 +833,13 @@ class Client(HasTraits): md.update({'pyerr' : self._unwrap_exception(content)}) elif msg_type == 'pyin': md.update({'pyin' : content['code']}) + elif msg_type == 'display_data': + md['outputs'].append(content) + elif msg_type == 'pyout': + md['pyout'] = content else: - md.update({msg_type : content.get('data', '')}) + # unhandled msg_type (status, etc.) + pass # reduntant? self.metadata[msg_id] = md @@ -848,14 +936,14 @@ class Client(HasTraits): """ if self._notification_socket: self._flush_notifications() + if self._iopub_socket: + self._flush_iopub(self._iopub_socket) if self._mux_socket: self._flush_results(self._mux_socket) if self._task_socket: self._flush_results(self._task_socket) if self._control_socket: self._flush_control(self._control_socket) - if self._iopub_socket: - self._flush_iopub(self._iopub_socket) if self._query_socket: self._flush_ignored_hub_replies() @@ -1024,14 +1112,16 @@ class Client(HasTraits): return result - def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False, + def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False, ident=None): """construct and send an apply message via a socket. This is the principal method with which all engine execution is performed by views. """ - assert not self._closed, "cannot use me anymore, I'm closed!" 
+ if self._closed: + raise RuntimeError("Client cannot be used after its sockets have been closed") + # defaults: args = args if args is not None else [] kwargs = kwargs if kwargs is not None else {} @@ -1066,6 +1156,43 @@ class Client(HasTraits): return msg + def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None): + """construct and send an execute request via a socket. + + """ + + if self._closed: + raise RuntimeError("Client cannot be used after its sockets have been closed") + + # defaults: + subheader = subheader if subheader is not None else {} + + # validate arguments + if not isinstance(code, basestring): + raise TypeError("code must be text, not %s" % type(code)) + if not isinstance(subheader, dict): + raise TypeError("subheader must be dict, not %s" % type(subheader)) + + content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={}) + + + msg = self.session.send(socket, "execute_request", content=content, ident=ident, + subheader=subheader) + + msg_id = msg['header']['msg_id'] + self.outstanding.add(msg_id) + if ident: + # possibly routed to a specific engine + if isinstance(ident, list): + ident = ident[-1] + if ident in self._engines.values(): + # save for later, in case of engine death + self._outstanding_dict[ident].add(msg_id) + self.history.append(msg_id) + self.metadata[msg_id]['submitted'] = datetime.now() + + return msg + #-------------------------------------------------------------------------- # construct a View object #-------------------------------------------------------------------------- @@ -1221,12 +1348,6 @@ class Client(HasTraits): raise TypeError("indices must be str or int, not %r"%id) theids.append(id) - for msg_id in theids: - self.outstanding.discard(msg_id) - if msg_id in self.history: - self.history.remove(msg_id) - self.results.pop(msg_id, None) - self.metadata.pop(msg_id, None) content = dict(msg_ids = theids) self.session.send(self._query_socket, 'resubmit_request', content) @@ -1238,8 +1359,10 @@ class Client(HasTraits): content = msg['content'] if content['status'] != 'ok': raise self._unwrap_exception(content) + mapping = content['resubmitted'] + new_ids = [ mapping[msg_id] for msg_id in theids ] - ar = AsyncHubResult(self, msg_ids=theids) + ar = AsyncHubResult(self, msg_ids=new_ids) if block: ar.wait() diff --git a/IPython/parallel/client/view.py b/IPython/parallel/client/view.py index dc1a078..ca29e84 100644 --- a/IPython/parallel/client/view.py +++ b/IPython/parallel/client/view.py @@ -128,13 +128,20 @@ class View(HasTraits): assert not self.__class__ is View, "Don't use base View objects, use subclasses" - def __repr__(self): strtargets = str(self.targets) if len(strtargets) > 16: strtargets = strtargets[:12]+'...]' return "<%s %s>"%(self.__class__.__name__, strtargets) - + + def __len__(self): + if isinstance(self.targets, list): + return len(self.targets) + elif isinstance(self.targets, int): + return 1 + else: + return len(self.client) + def set_flags(self, **kwargs): """set my attribute flags by keyword. 
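With `send_execute_request` in place, `DirectView.execute` returns results that are `ExecuteReply` instances instead of plain `None`, so displayhook output and rich reprs travel back to the client. A rough sketch against a running cluster (Python 2, matching the codebase):

    from IPython.parallel import Client
    rc = Client()
    view = rc[:]

    ar = view.execute("a = 5\na", silent=False, block=True)
    for reply in ar.get():             # one ExecuteReply per engine
        print reply.execution_count    # attribute access falls through to metadata
        print reply['pyout']           # captured displayhook content, or None
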
@@ -195,7 +202,7 @@ class View(HasTraits): @sync_results @save_ids def _really_apply(self, f, args, kwargs, block=None, **options): - """wrapper for client.send_apply_message""" + """wrapper for client.send_apply_request""" raise NotImplementedError("Implement in subclasses") def apply(self, f, *args, **kwargs): @@ -533,7 +540,7 @@ class DirectView(View): msg_ids = [] trackers = [] for ident in _idents: - msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track, + msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track, ident=ident) if track: trackers.append(msg['tracker']) @@ -547,6 +554,7 @@ class DirectView(View): pass return ar + @spin_after def map(self, f, *sequences, **kwargs): """view.map(f, *sequences, block=self.block) => list|AsyncMapResult @@ -590,7 +598,9 @@ class DirectView(View): pf = ParallelFunction(self, f, block=block, **kwargs) return pf.map(*sequences) - def execute(self, code, targets=None, block=None): + @sync_results + @save_ids + def execute(self, code, silent=True, targets=None, block=None): """Executes `code` on `targets` in blocking or nonblocking manner. ``execute`` is always `bound` (affects engine namespace) @@ -604,7 +614,22 @@ class DirectView(View): whether or not to wait until done to return default: self.block """ - return self._really_apply(util._execute, args=(code,), block=block, targets=targets) + block = self.block if block is None else block + targets = self.targets if targets is None else targets + + _idents = self.client._build_targets(targets)[0] + msg_ids = [] + trackers = [] + for ident in _idents: + msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident) + msg_ids.append(msg['header']['msg_id']) + ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets) + if block: + try: + ar.get() + except KeyboardInterrupt: + pass + return ar def run(self, filename, targets=None, block=None): """Execute contents of `filename` on my engine(s). @@ -996,7 +1021,7 @@ class LoadBalancedView(View): follow = self._render_dependency(follow) subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries) - msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track, + msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track, subheader=subheader) tracker = None if track is False else msg['tracker'] diff --git a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py index 83d967d..927af4c 100755 --- a/IPython/parallel/controller/heartmonitor.py +++ b/IPython/parallel/controller/heartmonitor.py @@ -23,9 +23,10 @@ from zmq.devices import ThreadDevice from zmq.eventloop import ioloop, zmqstream from IPython.config.configurable import LoggingConfigurable +from IPython.utils.py3compat import str_to_bytes from IPython.utils.traitlets import Set, Instance, CFloat, Integer -from IPython.parallel.util import asbytes, log_errors +from IPython.parallel.util import log_errors class Heart(object): """A basic heart object for responding to a HeartMonitor. 
@@ -123,7 +124,7 @@ class HeartMonitor(LoggingConfigurable): self.responses = set() # print self.on_probation, self.hearts # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) - self.pingstream.send(asbytes(str(self.lifetime))) + self.pingstream.send(str_to_bytes(str(self.lifetime))) # flush stream to force immediate socket send self.pingstream.flush() @@ -151,8 +152,8 @@ class HeartMonitor(LoggingConfigurable): @log_errors def handle_pong(self, msg): "a heart just beat" - current = asbytes(str(self.lifetime)) - last = asbytes(str(self.last_ping)) + current = str_to_bytes(str(self.lifetime)) + last = str_to_bytes(str(self.last_ping)) if msg[1] == current: delta = time.time()-self.tic # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index c6e51a8..892e5e5 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -28,6 +28,7 @@ from zmq.eventloop.zmqstream import ZMQStream # internal: from IPython.utils.importstring import import_item +from IPython.utils.py3compat import cast_bytes from IPython.utils.traitlets import ( HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName ) @@ -441,7 +442,7 @@ class Hub(SessionFactory): for t in targets: # map raw identities to ids if isinstance(t, (str,unicode)): - t = self.by_ident.get(t, t) + t = self.by_ident.get(cast_bytes(t), t) _targets.append(t) targets = _targets bad_targets = [ t for t in targets if t not in self.ids ] @@ -467,13 +468,13 @@ class Hub(SessionFactory): except ValueError: idents=[] if not idents: - self.log.error("Bad Monitor Message: %r", msg) + self.log.error("Monitor message without topic: %r", msg) return handler = self.monitor_handlers.get(switch, None) if handler is not None: handler(idents, msg) else: - self.log.error("Invalid monitor topic: %r", switch) + self.log.error("Unrecognized monitor topic: %r", switch) @util.log_errors @@ -719,15 +720,18 @@ class Hub(SessionFactory): self.unassigned.remove(msg_id) header = msg['header'] - engine_uuid = header.get('engine', None) - eid = self.by_ident.get(engine_uuid, None) + engine_uuid = header.get('engine', u'') + eid = self.by_ident.get(cast_bytes(engine_uuid), None) + + status = header.get('status', None) if msg_id in self.pending: self.log.info("task::task %r finished on %s", msg_id, eid) self.pending.remove(msg_id) self.all_completed.add(msg_id) if eid is not None: - self.completed[eid].append(msg_id) + if status != 'aborted': + self.completed[eid].append(msg_id) if msg_id in self.tasks[eid]: self.tasks[eid].remove(msg_id) completed = header['date'] @@ -760,7 +764,7 @@ class Hub(SessionFactory): # print (content) msg_id = content['msg_id'] engine_uuid = content['engine_id'] - eid = self.by_ident[util.asbytes(engine_uuid)] + eid = self.by_ident[cast_bytes(engine_uuid)] self.log.info("task::task %r arrived on %r", msg_id, eid) if msg_id in self.unassigned: @@ -796,7 +800,7 @@ class Hub(SessionFactory): parent = msg['parent_header'] if not parent: - self.log.error("iopub::invalid IOPub message: %r", msg) + self.log.warn("iopub::IOPub message lacks parent: %r", msg) return msg_id = parent['msg_id'] msg_type = msg['header']['msg_type'] @@ -850,13 +854,13 @@ class Hub(SessionFactory): """Register a new engine.""" content = msg['content'] try: - queue = util.asbytes(content['queue']) + queue = cast_bytes(content['queue']) except KeyError: 
self.log.error("registration::queue not specified", exc_info=True) return heart = content.get('heartbeat', None) if heart: - heart = util.asbytes(heart) + heart = cast_bytes(heart) """register a new engine, and create the socket(s) necessary""" eid = self._next_id # print (eid, queue, reg, heart) @@ -1130,7 +1134,7 @@ class Hub(SessionFactory): # validate msg_ids found_ids = [ rec['msg_id'] for rec in records ] - invalid_ids = filter(lambda m: m in self.pending, found_ids) + pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ] if len(records) > len(msg_ids): try: raise RuntimeError("DB appears to be in an inconsistent state." @@ -1143,40 +1147,46 @@ class Hub(SessionFactory): raise KeyError("No such msg(s): %r" % missing) except KeyError: return finish(error.wrap_exception()) - elif invalid_ids: - msg_id = invalid_ids[0] + elif pending_ids: + pass + # no need to raise on resubmit of pending task, now that we + # resubmit under new ID, but do we want to raise anyway? + # msg_id = invalid_ids[0] + # try: + # raise ValueError("Task(s) %r appears to be inflight" % ) + # except Exception: + # return finish(error.wrap_exception()) + + # mapping of original IDs to resubmitted IDs + resubmitted = {} + + # send the messages + for rec in records: + header = rec['header'] + msg = self.session.msg(header['msg_type']) + msg_id = msg['msg_id'] + msg['content'] = rec['content'] + header.update(msg['header']) + msg['header'] = header + + self.session.send(self.resubmit, msg, buffers=rec['buffers']) + + resubmitted[rec['msg_id']] = msg_id + self.pending.add(msg_id) + msg['buffers'] = [] try: - raise ValueError("Task %r appears to be inflight" % msg_id) + self.db.add_record(msg_id, init_record(msg)) except Exception: - return finish(error.wrap_exception()) + self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) - # clear the existing records - now = datetime.now() - rec = empty_record() - map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted']) - rec['resubmitted'] = now - rec['queue'] = 'task' - rec['client_uuid'] = client_id[0] - try: - for msg_id in msg_ids: - self.all_completed.discard(msg_id) - self.db.update_record(msg_id, rec) - except Exception: - self.log.error('db::db error upating record', exc_info=True) - reply = error.wrap_exception() - else: - # send the messages - for rec in records: - header = rec['header'] - # include resubmitted in header to prevent digest collision - header['resubmitted'] = now - msg = self.session.msg(header['msg_type']) - msg['content'] = rec['content'] - msg['header'] = header - msg['header']['msg_id'] = rec['msg_id'] - self.session.send(self.resubmit, msg, buffers=rec['buffers']) - - finish(dict(status='ok')) + finish(dict(status='ok', resubmitted=resubmitted)) + + # store the new IDs in the Task DB + for msg_id, resubmit_id in resubmitted.iteritems(): + try: + self.db.update_record(msg_id, {'resubmitted' : resubmit_id}) + except Exception: + self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) def _extract_record(self, rec): diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 25c4a2c..eea1043 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -42,10 +42,11 @@ from IPython.external.decorator import decorator from IPython.config.application import Application from IPython.config.loader import Config from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes +from 
IPython.utils.py3compat import cast_bytes from IPython.parallel import error, util from IPython.parallel.factory import SessionFactory -from IPython.parallel.util import connect_logger, local_logger, asbytes +from IPython.parallel.util import connect_logger, local_logger from .dependency import Dependency @@ -262,7 +263,7 @@ class TaskScheduler(SessionFactory): self.log.error("Unhandled message type: %r"%msg_type) else: try: - handler(asbytes(msg['content']['queue'])) + handler(cast_bytes(msg['content']['queue'])) except Exception: self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) @@ -316,7 +317,7 @@ class TaskScheduler(SessionFactory): # prevent double-handling of messages continue - raw_msg = lost[msg_id][0] + raw_msg = lost[msg_id].raw_msg idents,msg = self.session.feed_identities(raw_msg, copy=False) parent = self.session.unpack(msg[1].bytes) idents = [engine, idents[0]] @@ -370,7 +371,7 @@ class TaskScheduler(SessionFactory): # get targets as a set of bytes objects # from a list of unicode objects targets = header.get('targets', []) - targets = map(asbytes, targets) + targets = map(cast_bytes, targets) targets = set(targets) retries = header.get('retries', 0) diff --git a/IPython/parallel/controller/sqlitedb.py b/IPython/parallel/controller/sqlitedb.py index 0a85517..2b017d4 100644 --- a/IPython/parallel/controller/sqlitedb.py +++ b/IPython/parallel/controller/sqlitedb.py @@ -138,7 +138,7 @@ class SQLiteDB(BaseDB): 'engine_uuid' : 'text', 'started' : 'timestamp', 'completed' : 'timestamp', - 'resubmitted' : 'timestamp', + 'resubmitted' : 'text', 'received' : 'timestamp', 'result_header' : 'dict text', 'result_content' : 'dict text', @@ -226,15 +226,16 @@ class SQLiteDB(BaseDB): # isolation_level = None)#, cached_statements=64) # print dir(self._db) - first_table = self.table + first_table = previous_table = self.table i=0 while not self._check_table(): i+=1 self.table = first_table+'_%i'%i self.log.warn( "Table %s exists and doesn't match db format, trying %s"% - (first_table,self.table) + (previous_table, self.table) ) + previous_table = self.table self._db.execute("""CREATE TABLE IF NOT EXISTS %s (msg_id text PRIMARY KEY, @@ -246,7 +247,7 @@ class SQLiteDB(BaseDB): engine_uuid text, started timestamp, completed timestamp, - resubmitted timestamp, + resubmitted text, received timestamp, result_header dict text, result_content dict text, diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index fa0f8d4..136af66 100644 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -27,15 +27,14 @@ from IPython.external.ssh import tunnel from IPython.utils.traitlets import ( Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool ) -from IPython.utils import py3compat +from IPython.utils.py3compat import cast_bytes from IPython.parallel.controller.heartmonitor import Heart from IPython.parallel.factory import RegistrationFactory -from IPython.parallel.util import disambiguate_url, asbytes +from IPython.parallel.util import disambiguate_url from IPython.zmq.session import Message - -from .streamkernel import Kernel +from IPython.zmq.ipkernel import Kernel class EngineFactory(RegistrationFactory): """IPython engine""" @@ -70,7 +69,7 @@ class EngineFactory(RegistrationFactory): bident = CBytes() ident = Unicode() def _ident_changed(self, name, old, new): - self.bident = asbytes(new) + self.bident = cast_bytes(new) using_ssh=Bool(False) @@ -185,23 +184,27 @@ class EngineFactory(RegistrationFactory): # create 
iopub stream: iopub_addr = msg.content.iopub - iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop) - iopub_stream.setsockopt(zmq.IDENTITY, identity) - connect(iopub_stream, iopub_addr) - - # # Redirect input streams and set a display hook. + iopub_socket = ctx.socket(zmq.PUB) + iopub_socket.setsockopt(zmq.IDENTITY, identity) + connect(iopub_socket, iopub_addr) + + # disable history: + self.config.HistoryManager.hist_file = ':memory:' + + # Redirect input streams and set a display hook. if self.out_stream_factory: - sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout') - sys.stdout.topic = py3compat.cast_bytes('engine.%i.stdout' % self.id) - sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr') - sys.stderr.topic = py3compat.cast_bytes('engine.%i.stderr' % self.id) + sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout') + sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id) + sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr') + sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id) if self.display_hook_factory: - sys.displayhook = self.display_hook_factory(self.session, iopub_stream) - sys.displayhook.topic = py3compat.cast_bytes('engine.%i.pyout' % self.id) + sys.displayhook = self.display_hook_factory(self.session, iopub_socket) + sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id) self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session, - control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream, - loop=loop, user_ns = self.user_ns, log=self.log) + control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket, + loop=loop, user_ns=self.user_ns, log=self.log) + self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id) self.kernel.start() diff --git a/IPython/parallel/engine/streamkernel.py b/IPython/parallel/engine/streamkernel.py deleted file mode 100644 index 91fe845..0000000 --- a/IPython/parallel/engine/streamkernel.py +++ /dev/null @@ -1,414 +0,0 @@ -""" -Kernel adapted from kernel.py to use ZMQ Streams - -Authors: - -* Min RK -* Brian Granger -* Fernando Perez -* Evan Patterson -""" -#----------------------------------------------------------------------------- -# Copyright (C) 2010-2011 The IPython Development Team -# -# Distributed under the terms of the BSD License. The full license is in -# the file COPYING, distributed as part of this software. -#----------------------------------------------------------------------------- - -#----------------------------------------------------------------------------- -# Imports -#----------------------------------------------------------------------------- - -# Standard library imports. -from __future__ import print_function - -import sys -import time - -from code import CommandCompiler -from datetime import datetime -from pprint import pprint - -# System library imports. -import zmq -from zmq.eventloop import ioloop, zmqstream - -# Local imports. 
-from IPython.utils.traitlets import Instance, List, Integer, Dict, Set, Unicode, CBytes -from IPython.zmq.completer import KernelCompleter - -from IPython.parallel.error import wrap_exception -from IPython.parallel.factory import SessionFactory -from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes - -def printer(*args): - pprint(args, stream=sys.__stdout__) - - -class _Passer(zmqstream.ZMQStream): - """Empty class that implements `send()` that does nothing. - - Subclass ZMQStream for Session typechecking - - """ - def __init__(self, *args, **kwargs): - pass - - def send(self, *args, **kwargs): - pass - send_multipart = send - - -#----------------------------------------------------------------------------- -# Main kernel class -#----------------------------------------------------------------------------- - -class Kernel(SessionFactory): - - #--------------------------------------------------------------------------- - # Kernel interface - #--------------------------------------------------------------------------- - - # kwargs: - exec_lines = List(Unicode, config=True, - help="List of lines to execute") - - # identities: - int_id = Integer(-1) - bident = CBytes() - ident = Unicode() - def _ident_changed(self, name, old, new): - self.bident = asbytes(new) - - user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""") - - control_stream = Instance(zmqstream.ZMQStream) - task_stream = Instance(zmqstream.ZMQStream) - iopub_stream = Instance(zmqstream.ZMQStream) - client = Instance('IPython.parallel.Client') - - # internals - shell_streams = List() - compiler = Instance(CommandCompiler, (), {}) - completer = Instance(KernelCompleter) - - aborted = Set() - shell_handlers = Dict() - control_handlers = Dict() - - def _set_prefix(self): - self.prefix = "engine.%s"%self.int_id - - def _connect_completer(self): - self.completer = KernelCompleter(self.user_ns) - - def __init__(self, **kwargs): - super(Kernel, self).__init__(**kwargs) - self._set_prefix() - self._connect_completer() - - self.on_trait_change(self._set_prefix, 'id') - self.on_trait_change(self._connect_completer, 'user_ns') - - # Build dict of handlers for message types - for msg_type in ['execute_request', 'complete_request', 'apply_request', - 'clear_request']: - self.shell_handlers[msg_type] = getattr(self, msg_type) - - for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys(): - self.control_handlers[msg_type] = getattr(self, msg_type) - - self._initial_exec_lines() - - def _wrap_exception(self, method=None): - e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method) - content=wrap_exception(e_info) - return content - - def _initial_exec_lines(self): - s = _Passer() - content = dict(silent=True, user_variable=[],user_expressions=[]) - for line in self.exec_lines: - self.log.debug("executing initialization: %s"%line) - content.update({'code':line}) - msg = self.session.msg('execute_request', content) - self.execute_request(s, [], msg) - - - #-------------------- control handlers ----------------------------- - def abort_queues(self): - for stream in self.shell_streams: - if stream: - self.abort_queue(stream) - - def abort_queue(self, stream): - while True: - idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True) - if msg is None: - return - - self.log.info("Aborting:") - self.log.info(str(msg)) - msg_type = msg['header']['msg_type'] - reply_type = msg_type.split('_')[0] + '_reply' - # reply_msg = self.session.msg(reply_type, 
{'status' : 'aborted'}, msg) - # self.reply_socket.send(ident,zmq.SNDMORE) - # self.reply_socket.send_json(reply_msg) - reply_msg = self.session.send(stream, reply_type, - content={'status' : 'aborted'}, parent=msg, ident=idents) - self.log.debug(str(reply_msg)) - # We need to wait a bit for requests to come in. This can probably - # be set shorter for true asynchronous clients. - time.sleep(0.05) - - def abort_request(self, stream, ident, parent): - """abort a specifig msg by id""" - msg_ids = parent['content'].get('msg_ids', None) - if isinstance(msg_ids, basestring): - msg_ids = [msg_ids] - if not msg_ids: - self.abort_queues() - for mid in msg_ids: - self.aborted.add(str(mid)) - - content = dict(status='ok') - reply_msg = self.session.send(stream, 'abort_reply', content=content, - parent=parent, ident=ident) - self.log.debug(str(reply_msg)) - - def shutdown_request(self, stream, ident, parent): - """kill ourself. This should really be handled in an external process""" - try: - self.abort_queues() - except: - content = self._wrap_exception('shutdown') - else: - content = dict(parent['content']) - content['status'] = 'ok' - msg = self.session.send(stream, 'shutdown_reply', - content=content, parent=parent, ident=ident) - self.log.debug(str(msg)) - dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop) - dc.start() - - def dispatch_control(self, msg): - idents,msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.unserialize(msg, content=True, copy=False) - except: - self.log.error("Invalid Message", exc_info=True) - return - else: - self.log.debug("Control received, %s", msg) - - header = msg['header'] - msg_id = header['msg_id'] - msg_type = header['msg_type'] - - handler = self.control_handlers.get(msg_type, None) - if handler is None: - self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg_type) - else: - handler(self.control_stream, idents, msg) - - - #-------------------- queue helpers ------------------------------ - - def check_dependencies(self, dependencies): - if not dependencies: - return True - if len(dependencies) == 2 and dependencies[0] in 'any all'.split(): - anyorall = dependencies[0] - dependencies = dependencies[1] - else: - anyorall = 'all' - results = self.client.get_results(dependencies,status_only=True) - if results['status'] != 'ok': - return False - - if anyorall == 'any': - if not results['completed']: - return False - else: - if results['pending']: - return False - - return True - - def check_aborted(self, msg_id): - return msg_id in self.aborted - - #-------------------- queue handlers ----------------------------- - - def clear_request(self, stream, idents, parent): - """Clear our namespace.""" - self.user_ns = {} - msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, - content = dict(status='ok')) - self._initial_exec_lines() - - def execute_request(self, stream, ident, parent): - self.log.debug('execute request %s'%parent) - try: - code = parent[u'content'][u'code'] - except: - self.log.error("Got bad msg: %s"%parent, exc_info=True) - return - self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, - ident=asbytes('%s.pyin'%self.prefix)) - started = datetime.now() - try: - comp_code = self.compiler(code, '') - # allow for not overriding displayhook - if hasattr(sys.displayhook, 'set_parent'): - sys.displayhook.set_parent(parent) - sys.stdout.set_parent(parent) - sys.stderr.set_parent(parent) - exec comp_code in self.user_ns, self.user_ns - except: - exc_content = 
self._wrap_exception('execute') - # exc_msg = self.session.msg(u'pyerr', exc_content, parent) - self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, - ident=asbytes('%s.pyerr'%self.prefix)) - reply_content = exc_content - else: - reply_content = {'status' : 'ok'} - - reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, - ident=ident, subheader = dict(started=started)) - self.log.debug(str(reply_msg)) - if reply_msg['content']['status'] == u'error': - self.abort_queues() - - def complete_request(self, stream, ident, parent): - matches = {'matches' : self.complete(parent), - 'status' : 'ok'} - completion_msg = self.session.send(stream, 'complete_reply', - matches, parent, ident) - # print >> sys.__stdout__, completion_msg - - def complete(self, msg): - return self.completer.complete(msg.content.line, msg.content.text) - - def apply_request(self, stream, ident, parent): - # flush previous reply, so this request won't block it - stream.flush(zmq.POLLOUT) - try: - content = parent[u'content'] - bufs = parent[u'buffers'] - msg_id = parent['header']['msg_id'] - # bound = parent['header'].get('bound', False) - except: - self.log.error("Got bad msg: %s"%parent, exc_info=True) - return - # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) - # self.iopub_stream.send(pyin_msg) - # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) - sub = {'dependencies_met' : True, 'engine' : self.ident, - 'started': datetime.now()} - try: - # allow for not overriding displayhook - if hasattr(sys.displayhook, 'set_parent'): - sys.displayhook.set_parent(parent) - sys.stdout.set_parent(parent) - sys.stderr.set_parent(parent) - # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns - working = self.user_ns - # suffix = - prefix = "_"+str(msg_id).replace("-","")+"_" - - f,args,kwargs = unpack_apply_message(bufs, working, copy=False) - # if bound: - # bound_ns = Namespace(working) - # args = [bound_ns]+list(args) - - fname = getattr(f, '__name__', 'f') - - fname = prefix+"f" - argname = prefix+"args" - kwargname = prefix+"kwargs" - resultname = prefix+"result" - - ns = { fname : f, argname : args, kwargname : kwargs , resultname : None } - # print ns - working.update(ns) - code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) - try: - exec code in working,working - result = working.get(resultname) - finally: - for key in ns.iterkeys(): - working.pop(key) - # if bound: - # working.update(bound_ns) - - packed_result,buf = serialize_object(result) - result_buf = [packed_result]+buf - except: - exc_content = self._wrap_exception('apply') - # exc_msg = self.session.msg(u'pyerr', exc_content, parent) - self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, - ident=asbytes('%s.pyerr'%self.prefix)) - reply_content = exc_content - result_buf = [] - - if exc_content['ename'] == 'UnmetDependency': - sub['dependencies_met'] = False - else: - reply_content = {'status' : 'ok'} - - # put 'ok'/'error' status in header, for scheduler introspection: - sub['status'] = reply_content['status'] - - reply_msg = self.session.send(stream, u'apply_reply', reply_content, - parent=parent, ident=ident,buffers=result_buf, subheader=sub) - - # flush i/o - # should this be before reply_msg is sent, like in the single-kernel code, - # or should nothing get in the way of real results? 
- sys.stdout.flush() - sys.stderr.flush() - - def dispatch_queue(self, stream, msg): - self.control_stream.flush() - idents,msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.unserialize(msg, content=True, copy=False) - except: - self.log.error("Invalid Message", exc_info=True) - return - else: - self.log.debug("Message received, %s", msg) - - - header = msg['header'] - msg_id = header['msg_id'] - msg_type = msg['header']['msg_type'] - if self.check_aborted(msg_id): - self.aborted.remove(msg_id) - # is it safe to assume a msg_id will not be resubmitted? - reply_type = msg_type.split('_')[0] + '_reply' - status = {'status' : 'aborted'} - reply_msg = self.session.send(stream, reply_type, subheader=status, - content=status, parent=msg, ident=idents) - return - handler = self.shell_handlers.get(msg_type, None) - if handler is None: - self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg_type) - else: - handler(stream, idents, msg) - - def start(self): - #### stream mode: - if self.control_stream: - self.control_stream.on_recv(self.dispatch_control, copy=False) - - def make_dispatcher(stream): - def dispatcher(msg): - return self.dispatch_queue(stream, msg) - return dispatcher - - for s in self.shell_streams: - s.on_recv(make_dispatcher(s), copy=False) - - diff --git a/IPython/parallel/tests/__init__.py b/IPython/parallel/tests/__init__.py index 70d88c6..6e03b8c 100644 --- a/IPython/parallel/tests/__init__.py +++ b/IPython/parallel/tests/__init__.py @@ -65,7 +65,7 @@ def setup(): if cp.poll() is not None: print cp.poll() raise RuntimeError("The test controller failed to start.") - elif time.time()-tic > 10: + elif time.time()-tic > 15: raise RuntimeError("Timeout waiting for the test controller to start.") time.sleep(0.1) add_engines(1) @@ -93,7 +93,7 @@ def add_engines(n=1, profile='iptest', total=False): while len(rc) < base+n: if any([ ep.poll() is not None for ep in eps ]): raise RuntimeError("A test engine failed to start.") - elif time.time()-tic > 10: + elif time.time()-tic > 15: raise RuntimeError("Timeout waiting for engines to connect.") time.sleep(.1) rc.spin() diff --git a/IPython/parallel/tests/test_asyncresult.py b/IPython/parallel/tests/test_asyncresult.py index dfc3d36..2a831e1 100644 --- a/IPython/parallel/tests/test_asyncresult.py +++ b/IPython/parallel/tests/test_asyncresult.py @@ -154,7 +154,7 @@ class AsyncResultTest(ClusterTestCase): ar = v.apply_async(time.sleep, 0.25) self.assertRaises(TimeoutError, getattr, ar, 'serial_time') ar.get(2) - self.assertTrue(ar.serial_time < 0.5) + self.assertTrue(ar.serial_time < 1.) 
self.assertTrue(ar.serial_time > 0.2) def test_serial_time_multi(self): @@ -171,8 +171,8 @@ class AsyncResultTest(ClusterTestCase): ar = v.apply_async(time.sleep, 0.25) while not ar.ready(): time.sleep(0.01) - self.assertTrue(ar.elapsed < 0.3) - self.assertTrue(ar.elapsed < 0.3) + self.assertTrue(ar.elapsed < 1) + self.assertTrue(ar.elapsed < 1) ar.get(2) def test_elapsed_multi(self): @@ -180,8 +180,8 @@ class AsyncResultTest(ClusterTestCase): ar = v.apply_async(time.sleep, 0.25) while not ar.ready(): time.sleep(0.01) - self.assertTrue(ar.elapsed < 0.3) - self.assertTrue(ar.elapsed < 0.3) + self.assertTrue(ar.elapsed < 1) + self.assertTrue(ar.elapsed < 1) ar.get(2) def test_hubresult_timestamps(self): diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py index 48ba9e9..99a0060 100644 --- a/IPython/parallel/tests/test_client.py +++ b/IPython/parallel/tests/test_client.py @@ -280,6 +280,25 @@ class TestClient(ClusterTestCase): time.sleep(0.25) self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids) + def _wait_for_idle(self): + """wait for an engine to become idle, according to the Hub""" + rc = self.client + + # timeout 2s, polling every 100ms + for i in range(20): + qs = rc.queue_status() + if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids): + time.sleep(0.1) + else: + break + + # ensure Hub up to date: + qs = rc.queue_status() + self.assertEquals(qs['unassigned'], 0) + for eid in rc.ids: + self.assertEquals(qs[eid]['tasks'], 0) + + def test_resubmit(self): def f(): import random @@ -288,19 +307,38 @@ class TestClient(ClusterTestCase): ar = v.apply_async(f) r1 = ar.get(1) # give the Hub a chance to notice: - time.sleep(0.5) + self._wait_for_idle() ahr = self.client.resubmit(ar.msg_ids) r2 = ahr.get(1) self.assertFalse(r1 == r2) + def test_resubmit_aborted(self): + def f(): + import random + return random.random() + v = self.client.load_balanced_view() + # restrict to one engine, so we can put a sleep + # ahead of the task, so it will get aborted + eid = self.client.ids[-1] + v.targets = [eid] + sleep = v.apply_async(time.sleep, 0.5) + ar = v.apply_async(f) + ar.abort() + self.assertRaises(error.TaskAborted, ar.get) + # Give the Hub a chance to get up to date: + self._wait_for_idle() + ahr = self.client.resubmit(ar.msg_ids) + r2 = ahr.get(1) + def test_resubmit_inflight(self): - """ensure ValueError on resubmit of inflight task""" + """resubmit of inflight task""" v = self.client.load_balanced_view() ar = v.apply_async(time.sleep,1) # give the message a chance to arrive time.sleep(0.2) - self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids) + ahr = self.client.resubmit(ar.msg_ids) ar.get(2) + ahr.get(2) def test_resubmit_badkey(self): """ensure KeyError on resubmit of nonexistant task""" diff --git a/IPython/parallel/tests/test_db.py b/IPython/parallel/tests/test_db.py index 90a3902..01cc523 100644 --- a/IPython/parallel/tests/test_db.py +++ b/IPython/parallel/tests/test_db.py @@ -18,6 +18,7 @@ Authors: from __future__ import division +import logging import os import tempfile import time @@ -226,7 +227,9 @@ class TestSQLiteBackend(TestDictBackend): @dec.skip_without('sqlite3') def create_db(self): location, fname = os.path.split(temp_db) - return SQLiteDB(location=location, fname=fname) + log = logging.getLogger('test') + log.setLevel(logging.CRITICAL) + return SQLiteDB(location=location, fname=fname, log=log) def tearDown(self): self.db._db.close() diff --git a/IPython/parallel/tests/test_view.py 
b/IPython/parallel/tests/test_view.py index 922019b..6c1a7a0 100644 --- a/IPython/parallel/tests/test_view.py +++ b/IPython/parallel/tests/test_view.py @@ -25,6 +25,7 @@ import zmq from nose import SkipTest from IPython.testing import decorators as dec +from IPython.testing.ipunittest import ParametricTestCase from IPython import parallel as pmod from IPython.parallel import error @@ -39,7 +40,7 @@ from .clienttest import ClusterTestCase, crash, wait, skip_without def setup(): add_engines(3, total=True) -class TestView(ClusterTestCase): +class TestView(ClusterTestCase, ParametricTestCase): def test_z_crash_mux(self): """test graceful handling of engine death (direct)""" @@ -421,16 +422,16 @@ class TestView(ClusterTestCase): sys.stdout = sio ip.magic_autopx() ip.run_cell('\n'.join(('a=5','b=10','c=0'))) - ip.run_cell('print b') + ip.run_cell('b*=2') + ip.run_cell('print (b)') ip.run_cell("b/c") - ip.run_code(compile('b*=2', '', 'single')) ip.magic_autopx() sys.stdout = savestdout output = sio.getvalue().strip() self.assertTrue(output.startswith('%autopx enabled')) self.assertTrue(output.endswith('%autopx disabled')) self.assertTrue('RemoteError: ZeroDivisionError' in output) - ar = v.get_result(-2) + ar = v.get_result(-1) self.assertEquals(v['a'], 5) self.assertEquals(v['b'], 20) self.assertRaisesRemote(ZeroDivisionError, ar.get) @@ -446,9 +447,10 @@ class TestView(ClusterTestCase): sys.stdout = sio ip.magic_autopx() ip.run_cell('\n'.join(('a=5','b=10','c=0'))) - ip.run_cell('print b') + ip.run_cell('print (b)') + ip.run_cell('import time; time.sleep(0.1)') ip.run_cell("b/c") - ip.run_code(compile('b*=2', '', 'single')) + ip.run_cell('b*=2') ip.magic_autopx() sys.stdout = savestdout output = sio.getvalue().strip() @@ -456,9 +458,12 @@ class TestView(ClusterTestCase): self.assertTrue(output.endswith('%autopx disabled')) self.assertFalse('ZeroDivisionError' in output) ar = v.get_result(-2) - self.assertEquals(v['a'], 5) - self.assertEquals(v['b'], 20) self.assertRaisesRemote(ZeroDivisionError, ar.get) + # prevent TaskAborted on pulls, due to ZeroDivisionError + time.sleep(0.5) + self.assertEquals(v['a'], 5) + # b*=2 will not fire, due to abort + self.assertEquals(v['b'], 10) def test_magic_result(self): ip = get_ipython() @@ -550,4 +555,140 @@ class TestView(ClusterTestCase): check = [ -1*i for i in r ] result = e0.map_sync(lambda x: -1*x, r) self.assertEquals(result, check) + + def test_len(self): + """len(view) makes sense""" + e0 = self.client[self.client.ids[0]] + yield self.assertEquals(len(e0), 1) + v = self.client[:] + yield self.assertEquals(len(v), len(self.client.ids)) + v = self.client.direct_view('all') + yield self.assertEquals(len(v), len(self.client.ids)) + v = self.client[:2] + yield self.assertEquals(len(v), 2) + v = self.client[:1] + yield self.assertEquals(len(v), 1) + v = self.client.load_balanced_view() + yield self.assertEquals(len(v), len(self.client.ids)) + # parametric tests seem to require manual closing? 
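The yield-style assertions in test_len above come from IPython.testing.ipunittest.ParametricTestCase, which lets a test method be a generator: each yielded assertion runs as one step of the test. A toy example of the pattern (ToyParametric and its checks are made up):

    from IPython.testing.ipunittest import ParametricTestCase

    class ToyParametric(ParametricTestCase):
        def test_lengths(self):
            # each yield executes one assertion step
            for n in range(3):
                yield self.assertEquals(len('x' * n), n)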
+ self.client.close() + + + # begin execute tests + def _wait_for(self, f, timeout=10): + tic = time.time() + while time.time() <= tic + timeout: + if f(): + return + time.sleep(0.1) + self.client.spin() + if not f(): + print "Warning: Awaited condition never arrived" + + + def test_execute_reply(self): + e0 = self.client[self.client.ids[0]] + e0.block = True + ar = e0.execute("5", silent=False) + er = ar.get() + self._wait_for(lambda : bool(er.pyout)) + self.assertEquals(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count) + self.assertEquals(er.pyout['data']['text/plain'], '5') + + def test_execute_reply_stdout(self): + e0 = self.client[self.client.ids[0]] + e0.block = True + ar = e0.execute("print (5)", silent=False) + er = ar.get() + self._wait_for(lambda : bool(er.stdout)) + self.assertEquals(er.stdout.strip(), '5') + + def test_execute_pyout(self): + """execute triggers pyout with silent=False""" + view = self.client[:] + ar = view.execute("5", silent=False, block=True) + self._wait_for(lambda : all(ar.pyout)) + + expected = [{'text/plain' : '5'}] * len(view) + mimes = [ out['data'] for out in ar.pyout ] + self.assertEquals(mimes, expected) + + def test_execute_silent(self): + """execute does not trigger pyout with silent=True""" + view = self.client[:] + ar = view.execute("5", block=True) + expected = [None] * len(view) + self.assertEquals(ar.pyout, expected) + + def test_execute_magic(self): + """execute accepts IPython commands""" + view = self.client[:] + view.execute("a = 5") + ar = view.execute("%whos", block=True) + # this will raise, if that failed + ar.get(5) + self._wait_for(lambda : all(ar.stdout)) + for stdout in ar.stdout: + lines = stdout.splitlines() + self.assertEquals(lines[0].split(), ['Variable', 'Type', 'Data/Info']) + found = False + for line in lines[2:]: + split = line.split() + if split == ['a', 'int', '5']: + found = True + break + self.assertTrue(found, "whos output wrong: %s" % stdout) + + def test_execute_displaypub(self): + """execute tracks display_pub output""" + view = self.client[:] + view.execute("from IPython.core.display import *") + ar = view.execute("[ display(i) for i in range(5) ]", block=True) + + self._wait_for(lambda : all(len(er.outputs) >= 5 for er in ar)) + expected = [ {u'text/plain' : unicode(j)} for j in range(5) ] + for outputs in ar.outputs: + mimes = [ out['data'] for out in outputs ] + self.assertEquals(mimes, expected) + + def test_apply_displaypub(self): + """apply tracks display_pub output""" + view = self.client[:] + view.execute("from IPython.core.display import *") + + @interactive + def publish(): + [ display(i) for i in range(5) ] + + ar = view.apply_async(publish) + ar.get(5) + self._wait_for(lambda : all(len(out) >= 5 for out in ar.outputs)) + expected = [ {u'text/plain' : unicode(j)} for j in range(5) ] + for outputs in ar.outputs: + mimes = [ out['data'] for out in outputs ] + self.assertEquals(mimes, expected) + + def test_execute_raises(self): + """exceptions in execute requests raise appropriately""" + view = self.client[-1] + ar = view.execute("1/0") + self.assertRaisesRemote(ZeroDivisionError, ar.get, 2) + + @dec.skipif_not_matplotlib + def test_magic_pylab(self): + """%pylab works on engines""" + view = self.client[-1] + ar = view.execute("%pylab inline") + # at least check if this raised: + reply = ar.get(5) + # include imports, in case user config + ar = view.execute("plot(rand(100))", silent=False) + reply = ar.get(5) + self._wait_for(lambda : all(ar.outputs)) + self.assertEquals(len(reply.outputs), 1) + output = reply.outputs[0] 
+ self.assertTrue("data" in output) + data = output['data'] + self.assertTrue("image/png" in data) + diff --git a/IPython/parallel/util.py b/IPython/parallel/util.py index f96704a..5b3e5a6 100644 --- a/IPython/parallel/util.py +++ b/IPython/parallel/util.py @@ -47,6 +47,9 @@ from IPython.utils import py3compat from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence from IPython.utils.newserialized import serialize, unserialize from IPython.zmq.log import EnginePUBHandler +from IPython.zmq.serialize import ( + unserialize_object, serialize_object, pack_apply_message, unpack_apply_message +) if py3compat.PY3: buffer = memoryview @@ -121,12 +124,6 @@ def log_errors(f, self, *args, **kwargs): self.log.error("Uncaught exception in %r" % f, exc_info=True) -def asbytes(s): - """ensure that an object is ascii bytes""" - if isinstance(s, unicode): - s = s.encode('ascii') - return s - def is_url(url): """boolean check for whether a string is a zmq url""" if '://' not in url: @@ -222,140 +219,6 @@ def disambiguate_url(url, location=None): return "%s://%s:%s"%(proto,ip,port) -def serialize_object(obj, threshold=64e-6): - """Serialize an object into a list of sendable buffers. - - Parameters - ---------- - - obj : object - The object to be serialized - threshold : float - The threshold for not double-pickling the content. - - - Returns - ------- - ('pmd', [bufs]) : - where pmd is the pickled metadata wrapper, - bufs is a list of data buffers - """ - databuffers = [] - if isinstance(obj, (list, tuple)): - clist = canSequence(obj) - slist = map(serialize, clist) - for s in slist: - if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: - databuffers.append(s.getData()) - s.data = None - return pickle.dumps(slist,-1), databuffers - elif isinstance(obj, dict): - sobj = {} - for k in sorted(obj.iterkeys()): - s = serialize(can(obj[k])) - if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: - databuffers.append(s.getData()) - s.data = None - sobj[k] = s - return pickle.dumps(sobj,-1),databuffers - else: - s = serialize(can(obj)) - if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: - databuffers.append(s.getData()) - s.data = None - return pickle.dumps(s,-1),databuffers - - -def unserialize_object(bufs): - """reconstruct an object serialized by serialize_object from data buffers.""" - bufs = list(bufs) - sobj = pickle.loads(bufs.pop(0)) - if isinstance(sobj, (list, tuple)): - for s in sobj: - if s.data is None: - s.data = bufs.pop(0) - return uncanSequence(map(unserialize, sobj)), bufs - elif isinstance(sobj, dict): - newobj = {} - for k in sorted(sobj.iterkeys()): - s = sobj[k] - if s.data is None: - s.data = bufs.pop(0) - newobj[k] = uncan(unserialize(s)) - return newobj, bufs - else: - if sobj.data is None: - sobj.data = bufs.pop(0) - return uncan(unserialize(sobj)), bufs - -def pack_apply_message(f, args, kwargs, threshold=64e-6): - """pack up a function, args, and kwargs to be sent over the wire - as a series of buffers. 
Any object whose data is larger than `threshold` - will not have their data copied (currently only numpy arrays support zero-copy)""" - msg = [pickle.dumps(can(f),-1)] - databuffers = [] # for large objects - sargs, bufs = serialize_object(args,threshold) - msg.append(sargs) - databuffers.extend(bufs) - skwargs, bufs = serialize_object(kwargs,threshold) - msg.append(skwargs) - databuffers.extend(bufs) - msg.extend(databuffers) - return msg - -def unpack_apply_message(bufs, g=None, copy=True): - """unpack f,args,kwargs from buffers packed by pack_apply_message() - Returns: original f,args,kwargs""" - bufs = list(bufs) # allow us to pop - assert len(bufs) >= 3, "not enough buffers!" - if not copy: - for i in range(3): - bufs[i] = bufs[i].bytes - cf = pickle.loads(bufs.pop(0)) - sargs = list(pickle.loads(bufs.pop(0))) - skwargs = dict(pickle.loads(bufs.pop(0))) - # print sargs, skwargs - f = uncan(cf, g) - for sa in sargs: - if sa.data is None: - m = bufs.pop(0) - if sa.getTypeDescriptor() in ('buffer', 'ndarray'): - # always use a buffer, until memoryviews get sorted out - sa.data = buffer(m) - # disable memoryview support - # if copy: - # sa.data = buffer(m) - # else: - # sa.data = m.buffer - else: - if copy: - sa.data = m - else: - sa.data = m.bytes - - args = uncanSequence(map(unserialize, sargs), g) - kwargs = {} - for k in sorted(skwargs.iterkeys()): - sa = skwargs[k] - if sa.data is None: - m = bufs.pop(0) - if sa.getTypeDescriptor() in ('buffer', 'ndarray'): - # always use a buffer, until memoryviews get sorted out - sa.data = buffer(m) - # disable memoryview support - # if copy: - # sa.data = buffer(m) - # else: - # sa.data = m.buffer - else: - if copy: - sa.data = m - else: - sa.data = m.bytes - - kwargs[k] = uncan(unserialize(sa), g) - - return f,args,kwargs #-------------------------------------------------------------------------- # helpers for implementing old MEC API via view.apply diff --git a/IPython/zmq/displayhook.py b/IPython/zmq/displayhook.py index 358c979..a3f76ab 100644 --- a/IPython/zmq/displayhook.py +++ b/IPython/zmq/displayhook.py @@ -43,6 +43,7 @@ class ZMQShellDisplayHook(DisplayHook): """A displayhook subclass that publishes data using ZeroMQ. This is intended to work with an InteractiveShell instance. It sends a dict of different representations of the object.""" + topic=None session = Instance(Session) pub_socket = Instance('zmq.Socket') @@ -67,6 +68,6 @@ class ZMQShellDisplayHook(DisplayHook): """Finish up all displayhook activities.""" sys.stdout.flush() sys.stderr.flush() - self.session.send(self.pub_socket, self.msg) + self.session.send(self.pub_socket, self.msg, ident=self.topic) self.msg = None diff --git a/IPython/zmq/eventloops.py b/IPython/zmq/eventloops.py index 3b8b895..4df3b26 100644 --- a/IPython/zmq/eventloops.py +++ b/IPython/zmq/eventloops.py @@ -20,8 +20,10 @@ import sys import zmq # Local imports. 
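Giving ZMQShellDisplayHook a topic (the displayhook.py hunk above) means pyout messages now leave the IOPub PUB socket with an identity prefix, matching the engine.%i.%s / kernel.%s.%s scheme that Kernel._topic introduces further down. Frontends can then filter with plain zmq subscriptions; a sketch, with the IOPub address as an assumption:

    import zmq

    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.connect("tcp://127.0.0.1:5555")                # hypothetical IOPub port
    sub.setsockopt(zmq.SUBSCRIBE, b"engine.0.pyout")   # only engine 0's pyout
    # an empty prefix would subscribe to every IOPub message:
    # sub.setsockopt(zmq.SUBSCRIBE, b"")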
+from IPython.config.application import Application from IPython.utils import io + #------------------------------------------------------------------------------ # Eventloops for integrating the Kernel into different GUIs #------------------------------------------------------------------------------ @@ -164,7 +166,10 @@ def loop_cocoa(kernel): # but still need a Poller for when there are no active windows, # during which time mainloop() returns immediately poller = zmq.Poller() - poller.register(kernel.shell_socket, zmq.POLLIN) + if kernel.control_stream: + poller.register(kernel.control_stream.socket, zmq.POLLIN) + for stream in kernel.shell_streams: + poller.register(stream.socket, zmq.POLLIN) while True: try: @@ -203,11 +208,15 @@ loop_map = { def enable_gui(gui, kernel=None): """Enable integration with a given GUI""" - if kernel is None: - from .ipkernel import IPKernelApp - kernel = IPKernelApp.instance().kernel if gui not in loop_map: raise ValueError("GUI %r not supported" % gui) + if kernel is None: + if Application.initialized(): + kernel = getattr(Application.instance(), 'kernel', None) + if kernel is None: + raise RuntimeError("You didn't specify a kernel," + " and no IPython Application with a kernel appears to be running." + ) loop = loop_map[gui] if kernel.eventloop is not None and kernel.eventloop is not loop: raise RuntimeError("Cannot activate multiple GUI eventloops") diff --git a/IPython/zmq/ipkernel.py b/IPython/zmq/ipkernel.py index d57103a..2775d54 100755 --- a/IPython/zmq/ipkernel.py +++ b/IPython/zmq/ipkernel.py @@ -15,20 +15,26 @@ Things to do: #----------------------------------------------------------------------------- from __future__ import print_function -# Standard library imports. +# Standard library imports import __builtin__ import atexit import sys import time import traceback import logging +import uuid + +from datetime import datetime from signal import ( - signal, default_int_handler, SIGINT, SIG_IGN + signal, getsignal, default_int_handler, SIGINT, SIG_IGN ) -# System library imports. + +# System library imports import zmq +from zmq.eventloop import ioloop +from zmq.eventloop.zmqstream import ZMQStream -# Local imports. 
+# Local imports from IPython.core import pylabtools from IPython.config.configurable import Configurable from IPython.config.application import boolean_flag, catch_config_error @@ -42,11 +48,12 @@ from IPython.utils import py3compat from IPython.utils.frame import extract_module_locals from IPython.utils.jsonutil import json_clean from IPython.utils.traitlets import ( - Any, Instance, Float, Dict, CaselessStrEnum + Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode ) from entry_point import base_launch_kernel from kernelapp import KernelApp, kernel_flags, kernel_aliases +from serialize import serialize_object, unpack_apply_message from session import Session, Message from zmqshell import ZMQInteractiveShell @@ -63,16 +70,21 @@ class Kernel(Configurable): # attribute to override with a GUI eventloop = Any(None) + def _eventloop_changed(self, name, old, new): + """schedule call to eventloop from IOLoop""" + loop = ioloop.IOLoop.instance() + loop.add_timeout(time.time()+0.1, self.enter_eventloop) shell = Instance('IPython.core.interactiveshell.InteractiveShellABC') session = Instance(Session) profile_dir = Instance('IPython.core.profiledir.ProfileDir') - shell_socket = Instance('zmq.Socket') - iopub_socket = Instance('zmq.Socket') - stdin_socket = Instance('zmq.Socket') + shell_streams = List() + control_stream = Instance(ZMQStream) + iopub_socket = Instance(zmq.Socket) + stdin_socket = Instance(zmq.Socket) log = Instance(logging.Logger) - user_module = Instance('types.ModuleType') + user_module = Any() def _user_module_changed(self, name, old, new): if self.shell is not None: self.shell.user_module = new @@ -83,8 +95,16 @@ class Kernel(Configurable): self.shell.user_ns = new self.shell.init_user_ns() - # Private interface + # identities: + int_id = Integer(-1) + ident = Unicode() + + def _ident_default(self): + return unicode(uuid.uuid4()) + + # Private interface + # Time to sleep after flushing the stdout/err buffers in each execute # cycle. While this introduces a hard limit on the minimal latency of the # execute cycle, it helps prevent output synchronization problems for @@ -109,16 +129,14 @@ class Kernel(Configurable): # This is a dict of port number that the kernel is listening on. It is set # by record_ports and used by connect_request. _recorded_ports = Dict() - + + # set of aborted msg_ids + aborted = Set() def __init__(self, **kwargs): super(Kernel, self).__init__(**kwargs) - # Before we even start up the shell, register *first* our exit handlers - # so they come before the shell's - atexit.register(self._at_shutdown) - # Initialize the InteractiveShell subclass self.shell = ZMQInteractiveShell.instance(config=self.config, profile_dir = self.profile_dir, @@ -127,6 +145,7 @@ class Kernel(Configurable): ) self.shell.displayhook.session = self.session self.shell.displayhook.pub_socket = self.iopub_socket + self.shell.displayhook.topic = self._topic('pyout') self.shell.display_pub.session = self.session self.shell.display_pub.pub_socket = self.iopub_socket @@ -136,96 +155,131 @@ class Kernel(Configurable): # Build dict of handlers for message types msg_types = [ 'execute_request', 'complete_request', 'object_info_request', 'history_request', - 'connect_request', 'shutdown_request'] - self.handlers = {} + 'connect_request', 'shutdown_request', + 'apply_request', + ] + self.shell_handlers = {} for msg_type in msg_types: - self.handlers[msg_type] = getattr(self, msg_type) - - def do_one_iteration(self): - """Do one iteration of the kernel's evaluation loop. 
- """ + self.shell_handlers[msg_type] = getattr(self, msg_type) + + control_msg_types = msg_types + [ 'clear_request', 'abort_request' ] + self.control_handlers = {} + for msg_type in control_msg_types: + self.control_handlers[msg_type] = getattr(self, msg_type) + + def dispatch_control(self, msg): + """dispatch control requests""" + idents,msg = self.session.feed_identities(msg, copy=False) try: - ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK) - except Exception: - self.log.warn("Invalid Message:", exc_info=True) - return - if msg is None: + msg = self.session.unserialize(msg, content=True, copy=False) + except: + self.log.error("Invalid Control Message", exc_info=True) return - msg_type = msg['header']['msg_type'] + self.log.debug("Control received: %s", msg) + + header = msg['header'] + msg_id = header['msg_id'] + msg_type = header['msg_type'] - # This assert will raise in versions of zeromq 2.0.7 and lesser. - # We now require 2.0.8 or above, so we can uncomment for safety. - # print(ident,msg, file=sys.__stdout__) - assert ident is not None, "Missing message part." + handler = self.control_handlers.get(msg_type, None) + if handler is None: + self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type) + else: + try: + handler(self.control_stream, idents, msg) + except Exception: + self.log.error("Exception in control handler:", exc_info=True) + + def dispatch_shell(self, stream, msg): + """dispatch shell requests""" + # flush control requests first + if self.control_stream: + self.control_stream.flush() + + idents,msg = self.session.feed_identities(msg, copy=False) + try: + msg = self.session.unserialize(msg, content=True, copy=False) + except: + self.log.error("Invalid Message", exc_info=True) + return + header = msg['header'] + msg_id = header['msg_id'] + msg_type = msg['header']['msg_type'] + # Print some info about this message and leave a '--->' marker, so it's # easier to trace visually the message chain when debugging. Each # handler prints its message at the end. - self.log.debug('\n*** MESSAGE TYPE:'+str(msg_type)+'***') - self.log.debug(' Content: '+str(msg['content'])+'\n --->\n ') + self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type) + self.log.debug(' Content: %s\n --->\n ', msg['content']) - # Find and call actual handler for message - handler = self.handlers.get(msg_type, None) + if msg_id in self.aborted: + self.aborted.remove(msg_id) + # is it safe to assume a msg_id will not be resubmitted? + reply_type = msg_type.split('_')[0] + '_reply' + status = {'status' : 'aborted'} + sub = {'engine' : self.ident} + sub.update(status) + reply_msg = self.session.send(stream, reply_type, subheader=sub, + content=status, parent=msg, ident=idents) + return + + handler = self.shell_handlers.get(msg_type, None) if handler is None: - self.log.error("UNKNOWN MESSAGE TYPE:" +str(msg)) + self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type) else: - handler(ident, msg) - - # Check whether we should exit, in case the incoming message set the - # exit flag on - if self.shell.exit_now: - self.log.debug('\nExiting IPython kernel...') - # We do a normal, clean exit, which allows any actions registered - # via atexit (such as history saving) to take place. - sys.exit(0) - - - def start(self): - """ Start the kernel main loop. 
- """ - # a KeyboardInterrupt (SIGINT) can occur on any python statement, so - # let's ignore (SIG_IGN) them until we're in a place to handle them properly - signal(SIGINT,SIG_IGN) - poller = zmq.Poller() - poller.register(self.shell_socket, zmq.POLLIN) - # loop while self.eventloop has not been overridden - while self.eventloop is None: + # ensure default_int_handler during handler call + sig = signal(SIGINT, default_int_handler) try: - # scale by extra factor of 10, because there is no - # reason for this to be anything less than ~ 0.1s - # since it is a real poller and will respond - # to events immediately - - # double nested try/except, to properly catch KeyboardInterrupt - # due to pyzmq Issue #130 - try: - poller.poll(10*1000*self._poll_interval) - # restore raising of KeyboardInterrupt - signal(SIGINT, default_int_handler) - self.do_one_iteration() - except: - raise - finally: - # prevent raising of KeyboardInterrupt - signal(SIGINT,SIG_IGN) - except KeyboardInterrupt: - # Ctrl-C shouldn't crash the kernel - io.raw_print("KeyboardInterrupt caught in kernel") - # stop ignoring sigint, now that we are out of our own loop, - # we don't want to prevent future code from handling it + handler(stream, idents, msg) + except Exception: + self.log.error("Exception in message handler:", exc_info=True) + finally: + signal(SIGINT, sig) + + def enter_eventloop(self): + """enter eventloop""" + self.log.info("entering eventloop") + # restore default_int_handler signal(SIGINT, default_int_handler) while self.eventloop is not None: try: self.eventloop(self) except KeyboardInterrupt: # Ctrl-C shouldn't crash the kernel - io.raw_print("KeyboardInterrupt caught in kernel") + self.log.error("KeyboardInterrupt caught in kernel") continue else: # eventloop exited cleanly, this means we should stop (right?) 
self.eventloop = None break + self.log.info("exiting eventloop") + # if eventloop exits, IOLoop should stop + ioloop.IOLoop.instance().stop() + + def start(self): + """register dispatchers for streams""" + self.shell.exit_now = False + if self.control_stream: + self.control_stream.on_recv(self.dispatch_control, copy=False) + + def make_dispatcher(stream): + def dispatcher(msg): + return self.dispatch_shell(stream, msg) + return dispatcher + + for s in self.shell_streams: + s.on_recv(make_dispatcher(s), copy=False) + + def do_one_iteration(self): + """step eventloop just once""" + if self.control_stream: + self.control_stream.flush() + for stream in self.shell_streams: + # handle at most one request per iteration + stream.flush(zmq.POLLIN, 1) + stream.flush(zmq.POLLOUT) def record_ports(self, ports): @@ -239,19 +293,31 @@ class Kernel(Configurable): #--------------------------------------------------------------------------- # Kernel request handlers #--------------------------------------------------------------------------- - + + def _make_subheader(self): + """init subheader dict, for execute/apply_reply""" + return { + 'dependencies_met' : True, + 'engine' : self.ident, + 'started': datetime.now(), + } + def _publish_pyin(self, code, parent, execution_count): """Publish the code request on the pyin stream.""" - self.session.send(self.iopub_socket, u'pyin', {u'code':code, - u'execution_count': execution_count}, parent=parent) + self.session.send(self.iopub_socket, u'pyin', + {u'code':code, u'execution_count': execution_count}, + parent=parent, ident=self._topic('pyin') + ) - def execute_request(self, ident, parent): + def execute_request(self, stream, ident, parent): self.session.send(self.iopub_socket, u'status', {u'execution_state':u'busy'}, - parent=parent ) + parent=parent, + ident=self._topic('status'), + ) try: content = parent[u'content'] @@ -259,8 +325,10 @@ class Kernel(Configurable): silent = content[u'silent'] except: self.log.error("Got bad msg: ") - self.log.error(str(Message(parent))) + self.log.error("%s", parent) return + + sub = self._make_subheader() shell = self.shell # we'll need this a lot here @@ -289,14 +357,8 @@ class Kernel(Configurable): reply_content = {} try: - if silent: - # run_code uses 'exec' mode, so no displayhook will fire, and it - # doesn't call logging or history manipulations. Print - # statements in that code will obviously still execute. - shell.run_code(code) - else: - # FIXME: the shell calls the exception handler itself. - shell.run_cell(code, store_history=True) + # FIXME: the shell calls the exception handler itself. + shell.run_cell(code, store_history=not silent, silent=silent) except: status = u'error' # FIXME: this code right now isn't being used yet by default, @@ -314,7 +376,7 @@ class Kernel(Configurable): reply_content[u'status'] = status # Return the execution counter so clients can display prompts - reply_content['execution_count'] = shell.execution_count -1 + reply_content['execution_count'] = shell.execution_count - 1 # FIXME - fish exception info out of shell, possibly left there by # runlines. We'll need to clean up this logic later. @@ -327,9 +389,9 @@ class Kernel(Configurable): # or not. 
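make_dispatcher in start() above exists because ZMQStream.on_recv callbacks receive only the message: the closure captures which stream the message arrived on, so the reply can go back out the same socket, and it avoids the classic late-binding bug of closing over the loop variable s directly. The same binding can be written with functools.partial; a sketch (register_dispatchers is just a rephrasing for illustration, not IPython API):

    from functools import partial

    def register_dispatchers(kernel):
        """bind each shell stream to dispatch_shell, keeping the stream"""
        for s in kernel.shell_streams:
            s.on_recv(partial(kernel.dispatch_shell, s), copy=False)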
If it did, we proceed to evaluate user_variables/expressions if reply_content['status'] == 'ok': reply_content[u'user_variables'] = \ - shell.user_variables(content[u'user_variables']) + shell.user_variables(content.get(u'user_variables', [])) reply_content[u'user_expressions'] = \ - shell.user_expressions(content[u'user_expressions']) + shell.user_expressions(content.get(u'user_expressions', {})) else: # If there was an error, don't even try to compute variables or # expressions @@ -355,40 +417,49 @@ class Kernel(Configurable): # Send the reply. reply_content = json_clean(reply_content) - reply_msg = self.session.send(self.shell_socket, u'execute_reply', - reply_content, parent, ident=ident) - self.log.debug(str(reply_msg)) + + sub['status'] = reply_content['status'] + if reply_content['status'] == 'error' and \ + reply_content['ename'] == 'UnmetDependency': + sub['dependencies_met'] = False + + reply_msg = self.session.send(stream, u'execute_reply', + reply_content, parent, subheader=sub, + ident=ident) + + self.log.debug("%s", reply_msg) - if reply_msg['content']['status'] == u'error': - self._abort_queue() + if not silent and reply_msg['content']['status'] == u'error': + self._abort_queues() self.session.send(self.iopub_socket, u'status', {u'execution_state':u'idle'}, - parent=parent ) + parent=parent, + ident=self._topic('status')) - def complete_request(self, ident, parent): + def complete_request(self, stream, ident, parent): txt, matches = self._complete(parent) matches = {'matches' : matches, 'matched_text' : txt, 'status' : 'ok'} matches = json_clean(matches) - completion_msg = self.session.send(self.shell_socket, 'complete_reply', + completion_msg = self.session.send(stream, 'complete_reply', matches, parent, ident) - self.log.debug(str(completion_msg)) + self.log.debug("%s", completion_msg) - def object_info_request(self, ident, parent): + def object_info_request(self, stream, ident, parent): content = parent['content'] object_info = self.shell.object_inspect(content['oname'], detail_level = content.get('detail_level', 0) ) # Before we send this object over, we scrub it for JSON usage oinfo = json_clean(object_info) - msg = self.session.send(self.shell_socket, 'object_info_reply', + msg = self.session.send(stream, 'object_info_reply', oinfo, parent, ident) - self.log.debug(msg) + self.log.debug("%s", msg) - def history_request(self, ident, parent): + def history_request(self, stream, ident, parent): # We need to pull these out, as passing **kwargs doesn't work with # unicode keys before Python 2.6.5. 
hist_access_type = parent['content']['hist_access_type'] @@ -416,51 +487,183 @@ class Kernel(Configurable): hist = list(hist) content = {'history' : hist} content = json_clean(content) - msg = self.session.send(self.shell_socket, 'history_reply', + msg = self.session.send(stream, 'history_reply', content, parent, ident) self.log.debug("Sending history reply with %i entries", len(hist)) - def connect_request(self, ident, parent): + def connect_request(self, stream, ident, parent): if self._recorded_ports is not None: content = self._recorded_ports.copy() else: content = {} - msg = self.session.send(self.shell_socket, 'connect_reply', + msg = self.session.send(stream, 'connect_reply', content, parent, ident) - self.log.debug(msg) + self.log.debug("%s", msg) - def shutdown_request(self, ident, parent): + def shutdown_request(self, stream, ident, parent): self.shell.exit_now = True + content = dict(status='ok') + content.update(parent['content']) + self.session.send(stream, u'shutdown_reply', content, parent, ident=ident) + # same content, but different msg_id for broadcasting on IOPub self._shutdown_message = self.session.msg(u'shutdown_reply', - parent['content'], parent) - sys.exit(0) + content, parent + ) + + self._at_shutdown() + # call sys.exit after a short delay + loop = ioloop.IOLoop.instance() + loop.add_timeout(time.time()+0.1, loop.stop) + + #--------------------------------------------------------------------------- + # Engine methods + #--------------------------------------------------------------------------- + + def apply_request(self, stream, ident, parent): + try: + content = parent[u'content'] + bufs = parent[u'buffers'] + msg_id = parent['header']['msg_id'] + except: + self.log.error("Got bad msg: %s", parent, exc_info=True) + return + + # Set the parent message of the display hook and out streams. 
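The "Set the parent message" step that follows is what ties engine output back to the request that produced it: every message published while the handler runs carries the request's header as its parent_header, so clients can correlate stdout, pyerr, and replies with the originating call. Schematically, with made-up ids:

    request_header = {'msg_id': 'req-1', 'session': 'client-a'}
    output_msg = {
        'header': {'msg_id': 'out-7', 'msg_type': 'stream'},
        'parent_header': request_header,    # lets the client match the output
        'content': {'name': 'stdout', 'data': '5\n'},
    }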
+ self.shell.displayhook.set_parent(parent) + self.shell.display_pub.set_parent(parent) + sys.stdout.set_parent(parent) + sys.stderr.set_parent(parent) + + # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) + # self.iopub_socket.send(pyin_msg) + # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent) + sub = self._make_subheader() + try: + working = self.shell.user_ns + + prefix = "_"+str(msg_id).replace("-","")+"_" + + f,args,kwargs = unpack_apply_message(bufs, working, copy=False) + + fname = getattr(f, '__name__', 'f') + + fname = prefix+"f" + argname = prefix+"args" + kwargname = prefix+"kwargs" + resultname = prefix+"result" + + ns = { fname : f, argname : args, kwargname : kwargs , resultname : None } + # print ns + working.update(ns) + code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname) + try: + exec code in self.shell.user_global_ns, self.shell.user_ns + result = working.get(resultname) + finally: + for key in ns.iterkeys(): + working.pop(key) + + packed_result,buf = serialize_object(result) + result_buf = [packed_result]+buf + except: + exc_content = self._wrap_exception('apply') + # exc_msg = self.session.msg(u'pyerr', exc_content, parent) + self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent, + ident=self._topic('pyerr')) + reply_content = exc_content + result_buf = [] + + if exc_content['ename'] == 'UnmetDependency': + sub['dependencies_met'] = False + else: + reply_content = {'status' : 'ok'} + + # put 'ok'/'error' status in header, for scheduler introspection: + sub['status'] = reply_content['status'] + + # flush i/o + sys.stdout.flush() + sys.stderr.flush() + + reply_msg = self.session.send(stream, u'apply_reply', reply_content, + parent=parent, ident=ident,buffers=result_buf, subheader=sub) + + #--------------------------------------------------------------------------- + # Control messages + #--------------------------------------------------------------------------- + + def abort_request(self, stream, ident, parent): + """abort a specific msg by id""" + msg_ids = parent['content'].get('msg_ids', None) + if isinstance(msg_ids, basestring): + msg_ids = [msg_ids] + if not msg_ids: + self._abort_queues() + for mid in msg_ids: + self.aborted.add(str(mid)) + + content = dict(status='ok') + reply_msg = self.session.send(stream, 'abort_reply', content=content, + parent=parent, ident=ident) + self.log.debug("%s", reply_msg) + + def clear_request(self, stream, idents, parent): + """Clear our namespace.""" + self.shell.reset(False) + msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, + content = dict(status='ok')) + #--------------------------------------------------------------------------- + # Protected interface + #--------------------------------------------------------------------------- - def _abort_queue(self): + + def _wrap_exception(self, method=None): + # import here, because _wrap_exception is only used in parallel, + # and parallel has higher min pyzmq version + from IPython.parallel.error import wrap_exception + e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method) + content = wrap_exception(e_info) + return content + + def _topic(self, topic): + """prefixed topic for IOPub messages""" + if self.int_id >= 0: + base = "engine.%i" % self.int_id + else: + base = "kernel.%s" % self.ident + + return py3compat.cast_bytes("%s.%s" % (base, topic)) + + def _abort_queues(self): + for stream in self.shell_streams: + if stream: + self._abort_queue(stream) + + def 
_abort_queue(self, stream): + poller = zmq.Poller() + poller.register(stream.socket, zmq.POLLIN) while True: - try: - ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK) - except Exception: - self.log.warn("Invalid Message:", exc_info=True) - continue + idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True) if msg is None: - break - else: - assert ident is not None, \ - "Unexpected missing message part." + return - self.log.debug("Aborting:\n"+str(Message(msg))) + self.log.info("Aborting:") + self.log.info("%s", msg) msg_type = msg['header']['msg_type'] reply_type = msg_type.split('_')[0] + '_reply' - reply_msg = self.session.send(self.shell_socket, reply_type, - {'status' : 'aborted'}, msg, ident=ident) - self.log.debug(reply_msg) + + status = {'status' : 'aborted'} + sub = {'engine' : self.ident} + sub.update(status) + reply_msg = self.session.send(stream, reply_type, subheader=sub, + content=status, parent=msg, ident=idents) + self.log.debug("%s", reply_msg) # We need to wait a bit for requests to come in. This can probably # be set shorter for true asynchronous clients. - time.sleep(0.1) + poller.poll(50) + def _no_raw_input(self): """Raise StdinNotImplentedError if active frontend doesn't support @@ -490,7 +693,7 @@ class Kernel(Configurable): value = reply['content']['value'] except: self.log.error("Got bad raw_input reply: ") - self.log.error(str(Message(parent))) + self.log.error("%s", parent) value = '' if value == '\x04': # EOF @@ -545,12 +748,9 @@ class Kernel(Configurable): """ # io.rprint("Kernel at_shutdown") # dbg if self._shutdown_message is not None: - self.session.send(self.shell_socket, self._shutdown_message) - self.session.send(self.iopub_socket, self._shutdown_message) - self.log.debug(str(self._shutdown_message)) - # A very short sleep to give zmq time to flush its message buffers - # before Python truly shuts down. - time.sleep(0.01) + self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown')) + self.log.debug("%s", self._shutdown_message) + [ s.flush(zmq.POLLOUT) for s in self.shell_streams ] #----------------------------------------------------------------------------- # Aliases and Flags for the IPKernelApp @@ -603,9 +803,11 @@ class IPKernelApp(KernelApp, InteractiveShellApp): self.init_code() def init_kernel(self): + + shell_stream = ZMQStream(self.shell_socket) kernel = Kernel(config=self.config, session=self.session, - shell_socket=self.shell_socket, + shell_streams=[shell_stream], iopub_socket=self.iopub_socket, stdin_socket=self.stdin_socket, log=self.log, @@ -687,6 +889,13 @@ def embed_kernel(module=None, local_ns=None, **kwargs): else: app = IPKernelApp.instance(**kwargs) app.initialize([]) + # Undo unnecessary sys module mangling from init_sys_modules. + # This would not be necessary if we could prevent it + # in the first place by using a different InteractiveShell + # subclass, as in the regular embed case. + main = app.kernel.shell._orig_sys_modules_main_mod + if main is not None: + sys.modules[app.kernel.shell._orig_sys_modules_main_name] = main # load the calling scope if not given (caller_module, caller_locals) = extract_module_locals(1) diff --git a/IPython/zmq/kernelapp.py b/IPython/zmq/kernelapp.py index e7eec92..feec06c 100644 --- a/IPython/zmq/kernelapp.py +++ b/IPython/zmq/kernelapp.py @@ -15,15 +15,18 @@ Authors # Imports #----------------------------------------------------------------------------- -# Standard library imports. 
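Note the IPKernelApp.init_kernel change above: the raw shell socket is wrapped in a ZMQStream before the Kernel sees it, which is what lets a single Kernel class serve both the single-kernel case (one stream) and the parallel engine (several). A minimal sketch of that wrapping, with the socket setup as an assumption:

    import zmq
    from zmq.eventloop.zmqstream import ZMQStream

    ctx = zmq.Context.instance()
    shell_socket = ctx.socket(zmq.ROUTER)
    shell_socket.bind("tcp://127.0.0.1:5656")    # port chosen arbitrarily
    shell_stream = ZMQStream(shell_socket)       # registers with the IOLoop
    # kernel = Kernel(shell_streams=[shell_stream], ...)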
+# Standard library imports +import atexit import json import os import sys +import signal -# System library imports. +# System library imports import zmq +from zmq.eventloop import ioloop -# IPython imports. +# IPython imports from IPython.core.ultratb import FormattedTB from IPython.core.application import ( BaseIPythonApplication, base_flags, base_aliases, catch_config_error @@ -82,17 +85,18 @@ kernel_flags.update(session_flags) #----------------------------------------------------------------------------- class KernelApp(BaseIPythonApplication): - name='pykernel' + name='ipkernel' aliases = Dict(kernel_aliases) flags = Dict(kernel_flags) classes = [Session] # the kernel class, as an importstring - kernel_class = DottedObjectName('IPython.zmq.pykernel.Kernel') + kernel_class = DottedObjectName('IPython.zmq.ipkernel.Kernel') kernel = Any() poller = Any() # don't restrict this even though current pollers are all Threads heartbeat = Instance(Heartbeat) session = Instance('IPython.zmq.session.Session') ports = Dict() + _full_connection_file = Unicode() # inherit config file name from parent: parent_appname = Unicode(config=True) @@ -163,6 +167,8 @@ class KernelApp(BaseIPythonApplication): fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir]) except IOError: self.log.debug("Connection file not found: %s", self.connection_file) + # This means I own it, so I will clean it up: + atexit.register(self.cleanup_connection_file) return self.log.debug(u"Loading connection file %s", fname) with open(fname) as f: @@ -188,6 +194,16 @@ class KernelApp(BaseIPythonApplication): write_connection_file(cf, ip=self.ip, key=self.session.key, shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port, iopub_port=self.iopub_port) + + self._full_connection_file = cf + + def cleanup_connection_file(self): + cf = self._full_connection_file + self.log.debug("cleaning up connection file: %r", cf) + try: + os.remove(cf) + except (IOError, OSError): + pass def init_connection_file(self): if not self.connection_file: @@ -216,18 +232,23 @@ class KernelApp(BaseIPythonApplication): self.stdin_socket = context.socket(zmq.ROUTER) self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port) self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port) - + + def init_heartbeat(self): + """start the heart beating""" # heartbeat doesn't share context, because it mustn't be blocked # by the GIL, which is accessed by libzmq when freeing zero-copy messages hb_ctx = zmq.Context() self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port)) self.hb_port = self.heartbeat.port self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port) + self.heartbeat.start() # Helper to make it easier to connect to an existing kernel. 
# set log-level to critical, to make sure it is output self.log.critical("To connect another client to this kernel, use:") - + + def log_connection_info(self): + """display connection info, and store ports""" basename = os.path.basename(self.connection_file) if basename == self.connection_file or \ os.path.dirname(self.connection_file) == self.profile_dir.security_dir: @@ -267,6 +288,9 @@ class KernelApp(BaseIPythonApplication): displayhook_factory = import_item(str(self.displayhook_class)) sys.displayhook = displayhook_factory(self.session, self.iopub_socket) + def init_signal(self): + signal.signal(signal.SIGINT, signal.SIG_IGN) + def init_kernel(self): """Create the Kernel object itself""" kernel_factory = import_item(str(self.kernel_class)) @@ -286,9 +310,12 @@ class KernelApp(BaseIPythonApplication): self.init_session() self.init_poller() self.init_sockets() - # writing connection file must be *after* init_sockets + self.init_heartbeat() + # writing/displaying connection info must be *after* init_sockets/heartbeat + self.log_connection_info() self.write_connection_file() self.init_io() + self.init_signal() self.init_kernel() # flush stdout/stderr, so that anything written to these streams during # initialization do not get associated with the first execution request @@ -296,11 +323,11 @@ class KernelApp(BaseIPythonApplication): sys.stderr.flush() def start(self): - self.heartbeat.start() if self.poller is not None: self.poller.start() + self.kernel.start() try: - self.kernel.start() + ioloop.IOLoop.instance().start() except KeyboardInterrupt: pass diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index a350866..b175af5 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -781,9 +781,6 @@ class KernelManager(HasTraits): Parameters: ----------- - ipython : bool, optional (default True) - Whether to use an IPython kernel instead of a plain Python kernel. - launcher : callable, optional (default None) A custom function for launching the kernel process (generally a wrapper around ``entry_point.base_launch_kernel``). In most cases, @@ -805,10 +802,7 @@ class KernelManager(HasTraits): self._launch_args = kw.copy() launch_kernel = kw.pop('launcher', None) if launch_kernel is None: - if kw.pop('ipython', True): - from ipkernel import launch_kernel - else: - from pykernel import launch_kernel + from ipkernel import launch_kernel self.kernel = launch_kernel(fname=self.connection_file, **kw) def shutdown_kernel(self, restart=False): diff --git a/IPython/zmq/pykernel.py b/IPython/zmq/pykernel.py deleted file mode 100755 index 9bdaa9e..0000000 --- a/IPython/zmq/pykernel.py +++ /dev/null @@ -1,282 +0,0 @@ -#!/usr/bin/env python -"""A simple interactive kernel that talks to a frontend over 0MQ. - -Things to do: - -* Implement `set_parent` logic. Right before doing exec, the Kernel should - call set_parent on all the PUB objects with the message about to be executed. -* Implement random port and security key logic. -* Implement control messages. -* Implement event loop and poll version. -""" - -#----------------------------------------------------------------------------- -# Imports -#----------------------------------------------------------------------------- - -# Standard library imports. -import __builtin__ -from code import CommandCompiler -import sys -import time -import traceback - -# System library imports. -import zmq - -# Local imports. 
-from IPython.utils import py3compat -from IPython.utils.traitlets import HasTraits, Instance, Dict, Float -from completer import KernelCompleter -from entry_point import base_launch_kernel -from session import Session, Message -from kernelapp import KernelApp - -#----------------------------------------------------------------------------- -# Main kernel class -#----------------------------------------------------------------------------- - -class Kernel(HasTraits): - - # Private interface - - # Time to sleep after flushing the stdout/err buffers in each execute - # cycle. While this introduces a hard limit on the minimal latency of the - # execute cycle, it helps prevent output synchronization problems for - # clients. - # Units are in seconds. The minimum zmq latency on local host is probably - # ~150 microseconds, set this to 500us for now. We may need to increase it - # a little if it's not enough after more interactive testing. - _execute_sleep = Float(0.0005, config=True) - - # This is a dict of port number that the kernel is listening on. It is set - # by record_ports and used by connect_request. - _recorded_ports = Dict() - - #--------------------------------------------------------------------------- - # Kernel interface - #--------------------------------------------------------------------------- - - session = Instance(Session) - shell_socket = Instance('zmq.Socket') - iopub_socket = Instance('zmq.Socket') - stdin_socket = Instance('zmq.Socket') - log = Instance('logging.Logger') - - def __init__(self, **kwargs): - super(Kernel, self).__init__(**kwargs) - self.user_ns = {} - self.history = [] - self.compiler = CommandCompiler() - self.completer = KernelCompleter(self.user_ns) - - # Build dict of handlers for message types - msg_types = [ 'execute_request', 'complete_request', - 'object_info_request', 'shutdown_request' ] - self.handlers = {} - for msg_type in msg_types: - self.handlers[msg_type] = getattr(self, msg_type) - - def start(self): - """ Start the kernel main loop. - """ - while True: - ident,msg = self.session.recv(self.shell_socket,0) - assert ident is not None, "Missing message part." - omsg = Message(msg) - self.log.debug(str(omsg)) - handler = self.handlers.get(omsg.msg_type, None) - if handler is None: - self.log.error("UNKNOWN MESSAGE TYPE: %s"%omsg) - else: - handler(ident, omsg) - - def record_ports(self, ports): - """Record the ports that this kernel is using. - - The creator of the Kernel instance must call this methods if they - want the :meth:`connect_request` method to return the port numbers. - """ - self._recorded_ports = ports - - #--------------------------------------------------------------------------- - # Kernel request handlers - #--------------------------------------------------------------------------- - - def execute_request(self, ident, parent): - try: - code = parent[u'content'][u'code'] - except: - self.log.error("Got bad msg: %s"%Message(parent)) - return - pyin_msg = self.session.send(self.iopub_socket, u'pyin',{u'code':code}, parent=parent) - - try: - comp_code = self.compiler(code, '') - - # Replace raw_input. Note that is not sufficient to replace - # raw_input in the user namespace. - raw_input = lambda prompt='': self._raw_input(prompt, ident, parent) - if py3compat.PY3: - __builtin__.input = raw_input - else: - __builtin__.raw_input = raw_input - - # Set the parent message of the display hook and out streams. 
- sys.displayhook.set_parent(parent) - sys.stdout.set_parent(parent) - sys.stderr.set_parent(parent) - - exec comp_code in self.user_ns, self.user_ns - except: - etype, evalue, tb = sys.exc_info() - tb = traceback.format_exception(etype, evalue, tb) - exc_content = { - u'status' : u'error', - u'traceback' : tb, - u'ename' : unicode(etype.__name__), - u'evalue' : unicode(evalue) - } - exc_msg = self.session.send(self.iopub_socket, u'pyerr', exc_content, parent) - reply_content = exc_content - else: - reply_content = { 'status' : 'ok', 'payload' : {} } - - # Flush output before sending the reply. - sys.stderr.flush() - sys.stdout.flush() - # FIXME: on rare occasions, the flush doesn't seem to make it to the - # clients... This seems to mitigate the problem, but we definitely need - # to better understand what's going on. - if self._execute_sleep: - time.sleep(self._execute_sleep) - - # Send the reply. - reply_msg = self.session.send(self.shell_socket, u'execute_reply', reply_content, parent, ident=ident) - self.log.debug(Message(reply_msg)) - if reply_msg['content']['status'] == u'error': - self._abort_queue() - - def complete_request(self, ident, parent): - matches = {'matches' : self._complete(parent), - 'status' : 'ok'} - completion_msg = self.session.send(self.shell_socket, 'complete_reply', - matches, parent, ident) - self.log.debug(completion_msg) - - def object_info_request(self, ident, parent): - context = parent['content']['oname'].split('.') - object_info = self._object_info(context) - msg = self.session.send(self.shell_socket, 'object_info_reply', - object_info, parent, ident) - self.log.debug(msg) - - def shutdown_request(self, ident, parent): - content = dict(parent['content']) - msg = self.session.send(self.shell_socket, 'shutdown_reply', - content, parent, ident) - msg = self.session.send(self.iopub_socket, 'shutdown_reply', - content, parent, ident) - self.log.debug(msg) - time.sleep(0.1) - sys.exit(0) - - #--------------------------------------------------------------------------- - # Protected interface - #--------------------------------------------------------------------------- - - def _abort_queue(self): - while True: - ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK) - if msg is None: - # msg=None on EAGAIN - break - else: - assert ident is not None, "Missing message part." - self.log.debug("Aborting: %s"%Message(msg)) - msg_type = msg['header']['msg_type'] - reply_type = msg_type.split('_')[0] + '_reply' - reply_msg = self.session.send(self.shell_socket, reply_type, {'status':'aborted'}, msg, ident=ident) - self.log.debug(Message(reply_msg)) - # We need to wait a bit for requests to come in. This can probably - # be set shorter for true asynchronous clients. - time.sleep(0.1) - - def _raw_input(self, prompt, ident, parent): - # Flush output before making the request. - sys.stderr.flush() - sys.stdout.flush() - - # Send the input request. - content = dict(prompt=prompt) - msg = self.session.send(self.stdin_socket, u'input_request', content, parent, ident=ident) - - # Await a response. 
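The awaited response above is the stdin round trip that the merged kernel keeps: send an input_request on the stdin ROUTER socket, then block on recv until the frontend answers with an input_reply. Rephrased as a standalone helper (ask_frontend is illustrative; session, stdin_socket, parent, and ident are assumed to be set up as in the kernel code):

    def ask_frontend(session, stdin_socket, parent, ident, prompt=''):
        """one raw_input round trip over the stdin channel"""
        session.send(stdin_socket, u'input_request',
                     dict(prompt=prompt), parent, ident=ident)
        # block until the frontend sends input_reply
        ident, reply = session.recv(stdin_socket, 0)
        try:
            return reply['content']['value']
        except Exception:
            return ''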
-        ident,reply = self.session.recv(self.stdin_socket, 0)
-        try:
-            value = reply['content']['value']
-        except:
-            self.log.error("Got bad raw_input reply: %s"%Message(parent))
-            value = ''
-        return value
-
-    def _complete(self, msg):
-        return self.completer.complete(msg.content.line, msg.content.text)
-
-    def _object_info(self, context):
-        symbol, leftover = self._symbol_from_context(context)
-        if symbol is not None and not leftover:
-            doc = getattr(symbol, '__doc__', '')
-        else:
-            doc = ''
-        object_info = dict(docstring = doc)
-        return object_info
-
-    def _symbol_from_context(self, context):
-        if not context:
-            return None, context
-
-        base_symbol_string = context[0]
-        symbol = self.user_ns.get(base_symbol_string, None)
-        if symbol is None:
-            symbol = __builtin__.__dict__.get(base_symbol_string, None)
-        if symbol is None:
-            return None, context
-
-        context = context[1:]
-        for i, name in enumerate(context):
-            new_symbol = getattr(symbol, name, None)
-            if new_symbol is None:
-                return symbol, context[i:]
-            else:
-                symbol = new_symbol
-
-        return symbol, []
-
-#-----------------------------------------------------------------------------
-# Kernel main and launch functions
-#-----------------------------------------------------------------------------
-
-def launch_kernel(*args, **kwargs):
-    """ Launches a simple Python kernel, binding to the specified ports.
-
-    This function simply calls entry_point.base_launch_kernel with the right
-    first command to start a pykernel.  See base_launch_kernel for arguments.
-
-    Returns
-    -------
-    A tuple of form:
-        (kernel_process, xrep_port, pub_port, req_port, hb_port)
-    where kernel_process is a Popen object and the ports are integers.
-    """
-    return base_launch_kernel('from IPython.zmq.pykernel import main; main()',
-                              *args, **kwargs)
-
-def main():
-    """Run a PyKernel as an application"""
-    app = KernelApp.instance()
-    app.initialize()
-    app.start()
-
-if __name__ == '__main__':
-    main()
diff --git a/IPython/zmq/serialize.py b/IPython/zmq/serialize.py
new file mode 100644
index 0000000..efff2d6
--- /dev/null
+++ b/IPython/zmq/serialize.py
@@ -0,0 +1,179 @@
+"""serialization utilities for apply messages
+
+Authors:
+
+* Min RK
+"""
+#-----------------------------------------------------------------------------
+#  Copyright (C) 2010-2011  The IPython Development Team
+#
+#  Distributed under the terms of the BSD License.  The full license is in
+#  the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+# Standard library imports
+import logging
+import os
+import re
+import socket
+import sys
+
+try:
+    import cPickle
+    pickle = cPickle
+except:
+    cPickle = None
+    import pickle
+
+
+# IPython imports
+from IPython.utils import py3compat
+from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
+from IPython.utils.newserialized import serialize, unserialize
+
+if py3compat.PY3:
+    buffer = memoryview
+
+#-----------------------------------------------------------------------------
+# Serialization Functions
+#-----------------------------------------------------------------------------
+
+def serialize_object(obj, threshold=64e-6):
+    """Serialize an object into a list of sendable buffers.
+
+    Parameters
+    ----------
+
+    obj : object
+        The object to be serialized
+    threshold : float
+        The size threshold above which content buffers are sent as
+        separate message parts rather than pickled inline.
+
+    Returns
+    -------
+    (pmd, bufs) :
+        where pmd is the pickled metadata wrapper
+        and bufs is a list of data buffers
+    """
+    databuffers = []
+    if isinstance(obj, (list, tuple)):
+        clist = canSequence(obj)
+        slist = map(serialize, clist)
+        for s in slist:
+            if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
+                databuffers.append(s.getData())
+                s.data = None
+        return pickle.dumps(slist,-1), databuffers
+    elif isinstance(obj, dict):
+        sobj = {}
+        for k in sorted(obj.iterkeys()):
+            s = serialize(can(obj[k]))
+            if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
+                databuffers.append(s.getData())
+                s.data = None
+            sobj[k] = s
+        return pickle.dumps(sobj,-1), databuffers
+    else:
+        s = serialize(can(obj))
+        if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
+            databuffers.append(s.getData())
+            s.data = None
+        return pickle.dumps(s,-1), databuffers
+
+
+def unserialize_object(bufs):
+    """Reconstruct an object serialized by serialize_object from data buffers."""
+    bufs = list(bufs)
+    sobj = pickle.loads(bufs.pop(0))
+    if isinstance(sobj, (list, tuple)):
+        for s in sobj:
+            if s.data is None:
+                s.data = bufs.pop(0)
+        return uncanSequence(map(unserialize, sobj)), bufs
+    elif isinstance(sobj, dict):
+        newobj = {}
+        for k in sorted(sobj.iterkeys()):
+            s = sobj[k]
+            if s.data is None:
+                s.data = bufs.pop(0)
+            newobj[k] = uncan(unserialize(s))
+        return newobj, bufs
+    else:
+        if sobj.data is None:
+            sobj.data = bufs.pop(0)
+        return uncan(unserialize(sobj)), bufs
+
+def pack_apply_message(f, args, kwargs, threshold=64e-6):
+    """Pack up a function, args, and kwargs to be sent over the wire
+    as a series of buffers. Any object whose data is larger than `threshold`
+    will not have its data copied (currently only numpy arrays support zero-copy)."""
+    msg = [pickle.dumps(can(f),-1)]
+    databuffers = [] # for large objects
+    sargs, bufs = serialize_object(args,threshold)
+    msg.append(sargs)
+    databuffers.extend(bufs)
+    skwargs, bufs = serialize_object(kwargs,threshold)
+    msg.append(skwargs)
+    databuffers.extend(bufs)
+    msg.extend(databuffers)
+    return msg
+
+def unpack_apply_message(bufs, g=None, copy=True):
+    """Unpack f, args, and kwargs from buffers packed by pack_apply_message().
+    Returns: original f, args, kwargs."""
+    bufs = list(bufs) # allow us to pop
+    assert len(bufs) >= 3, "not enough buffers!"
+    if not copy:
+        for i in range(3):
+            bufs[i] = bufs[i].bytes
+    cf = pickle.loads(bufs.pop(0))
+    sargs = list(pickle.loads(bufs.pop(0)))
+    skwargs = dict(pickle.loads(bufs.pop(0)))
+    # print sargs, skwargs
+    f = uncan(cf, g)
+    for sa in sargs:
+        if sa.data is None:
+            m = bufs.pop(0)
+            if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
+                # always use a buffer, until memoryviews get sorted out
+                sa.data = buffer(m)
+                # disable memoryview support
+                # if copy:
+                #     sa.data = buffer(m)
+                # else:
+                #     sa.data = m.buffer
+            else:
+                if copy:
+                    sa.data = m
+                else:
+                    sa.data = m.bytes
+
+    args = uncanSequence(map(unserialize, sargs), g)
+    kwargs = {}
+    for k in sorted(skwargs.iterkeys()):
+        sa = skwargs[k]
+        if sa.data is None:
+            m = bufs.pop(0)
+            if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
+                # always use a buffer, until memoryviews get sorted out
+                sa.data = buffer(m)
+                # disable memoryview support
+                # if copy:
+                #     sa.data = buffer(m)
+                # else:
+                #     sa.data = m.buffer
+            else:
+                if copy:
+                    sa.data = m
+                else:
+                    sa.data = m.bytes
+
+        kwargs[k] = uncan(unserialize(sa), g)
+
+    return f,args,kwargs
+
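For reviewers trying out the new IPython/zmq/serialize.py above, here is a minimal
round-trip sketch. It is not part of the patch; it assumes Python 2 with this branch
on sys.path, and that IPython.utils.codeutil has been imported so plain functions
(code objects) are picklable, as IPython.parallel arranges:

    from IPython.utils import codeutil  # registers pickling support for code objects
    from IPython.zmq.serialize import (serialize_object, unserialize_object,
                                       pack_apply_message, unpack_apply_message)

    def add(a, b=1):
        return a + b

    # containers round-trip; buffers over the size threshold travel as
    # separate message parts instead of being pickled inline
    pmd, bufs = serialize_object(['x' * 1024, 42])
    obj, remaining = unserialize_object([pmd] + bufs)
    assert obj == ['x' * 1024, 42] and remaining == []

    # pack a call the way apply requests are sent over the wire
    msg_list = pack_apply_message(add, (5,), {'b': 2})
    f, args, kwargs = unpack_apply_message(msg_list)
    assert f(*args, **kwargs) == 7
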
diff --git a/IPython/zmq/session.py b/IPython/zmq/session.py
index 46eedd7..04d70d2 100644
--- a/IPython/zmq/session.py
+++ b/IPython/zmq/session.py
@@ -629,7 +629,7 @@ class Session(Configurable):
         if isinstance(socket, ZMQStream):
             socket = socket.socket
         try:
-            msg_list = socket.recv_multipart(mode)
+            msg_list = socket.recv_multipart(mode, copy=copy)
         except zmq.ZMQError as e:
             if e.errno == zmq.EAGAIN:
                 # We can convert EAGAIN to None as we know in this case
diff --git a/IPython/zmq/tests/test_embed_kernel.py b/IPython/zmq/tests/test_embed_kernel.py
index 48c9555..4cc7f89 100644
--- a/IPython/zmq/tests/test_embed_kernel.py
+++ b/IPython/zmq/tests/test_embed_kernel.py
@@ -23,7 +23,7 @@ from subprocess import Popen, PIPE
 import nose.tools as nt
 
 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
-from IPython.utils import path
+from IPython.utils import path, py3compat
 
 
 #-------------------------------------------------------------------------------
@@ -68,17 +68,19 @@ def setup_kernel(cmd):
     )
-    # wait for connection file to exist, timeout after 5s
+    # wait for connection file to exist, timeout after 10s
     tic = time.time()
-    while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 5:
+    while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
         time.sleep(0.1)
     
+    if kernel.poll() is not None:
+        o,e = kernel.communicate()
+        e = py3compat.cast_unicode(e)
+        raise IOError("Kernel failed to start:\n%s" % e)
+    
     if not os.path.exists(connection_file):
         if kernel.poll() is None:
             kernel.terminate()
         raise IOError("Connection file %r never arrived" % connection_file)
-    
-    if kernel.poll() is not None:
-        raise IOError("Kernel failed to start")
-    
     km = BlockingKernelManager(connection_file=connection_file)
     km.load_connection_file()
     km.start_channels()
@@ -157,3 +159,33 @@ def test_embed_kernel_namespace():
         content = msg['content']
         nt.assert_false(content['found'])
 
+def test_embed_kernel_reentrant():
+    """IPython.embed_kernel() can be called multiple times"""
+    cmd = '\n'.join([
+        'from IPython import embed_kernel',
+        'count = 0',
+        'def go():',
+        '    global count',
+        '    embed_kernel()',
+        '    count = count + 1',
+        '',
+        'while True:',
+        '    go()',
+        '',
+    ])
+    
+    with setup_kernel(cmd) as km:
+        shell = km.shell_channel
+        for i in range(5):
+            msg_id = shell.object_info('count')
+            msg = shell.get_msg(block=True, timeout=2)
+            content = msg['content']
+            nt.assert_true(content['found'])
+            nt.assert_equals(content['string_form'], unicode(i))
+            
+            # exit from embed_kernel
+            shell.execute("get_ipython().exit_now = True")
+            msg = shell.get_msg(block=True, timeout=2)
+            time.sleep(0.2)
+
+
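The reentrant test above exercises, at the wire level, a pattern like the following
sketch (not part of the patch; `monitor` and `state` are illustrative names). It
relies on the zmqshell change below, where setting `exit_now` stops the kernel's
eventloop instead of killing the process:

    from IPython import embed_kernel

    def monitor(state):
        # blocks while serving a kernel; a connected frontend can inspect
        # `state`, then run `get_ipython().exit_now = True` to return here
        embed_kernel(local_ns={'state': state})

    for step in range(3):
        monitor({'step': step})
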
diff --git a/IPython/zmq/zmqshell.py b/IPython/zmq/zmqshell.py
index 56d777f..b8d420f 100644
--- a/IPython/zmq/zmqshell.py
+++ b/IPython/zmq/zmqshell.py
@@ -19,8 +19,12 @@ from __future__ import print_function
 import inspect
 import os
 import sys
+import time
 from subprocess import Popen, PIPE
 
+# System library imports
+from zmq.eventloop import ioloop
+
 # Our own
 from IPython.core.interactiveshell import (
     InteractiveShell, InteractiveShellABC
@@ -39,7 +43,7 @@ from IPython.utils import io
 from IPython.utils.jsonutil import json_clean
 from IPython.utils.path import get_py_filename
 from IPython.utils.process import arg_split
-from IPython.utils.traitlets import Instance, Type, Dict, CBool
+from IPython.utils.traitlets import Instance, Type, Dict, CBool, CBytes
 from IPython.utils.warn import warn, error
 from IPython.zmq.displayhook import ZMQShellDisplayHook, _encode_binary
 from IPython.zmq.session import extract_header
@@ -56,6 +60,7 @@ class ZMQDisplayPublisher(DisplayPublisher):
     session = Instance(Session)
     pub_socket = Instance('zmq.Socket')
     parent_header = Dict({})
+    topic = CBytes(b'displaypub')
 
     def set_parent(self, parent):
         """Set the parent for outbound messages."""
@@ -78,7 +83,7 @@ class ZMQDisplayPublisher(DisplayPublisher):
             content['metadata'] = metadata
         self.session.send(
             self.pub_socket, u'display_data', json_clean(content),
-            parent=self.parent_header
+            parent=self.parent_header, ident=self.topic,
         )
 
     def clear_output(self, stdout=True, stderr=True, other=True):
@@ -93,7 +98,7 @@ class ZMQDisplayPublisher(DisplayPublisher):
 
         self.session.send(
             self.pub_socket, u'clear_output', content,
-            parent=self.parent_header
+            parent=self.parent_header, ident=self.topic,
         )
 
 class ZMQInteractiveShell(InteractiveShell):
@@ -114,6 +119,12 @@ class ZMQInteractiveShell(InteractiveShell):
     exiter = Instance(ZMQExitAutocall)
     def _exiter_default(self):
        return ZMQExitAutocall(self)
+
+    def _exit_now_changed(self, name, old, new):
+        """Stop the eventloop when exit_now is set."""
+        if new:
+            loop = ioloop.IOLoop.instance()
+            loop.add_timeout(time.time()+0.1, loop.stop)
 
     keepkernel_on_exit = None
 
@@ -154,6 +165,7 @@ class ZMQInteractiveShell(InteractiveShell):
 
     def ask_exit(self):
         """Engage the exit actions."""
+        self.exit_now = True
         payload = dict(
             source='IPython.zmq.zmqshell.ZMQInteractiveShell.ask_exit',
             exit=True,
@@ -172,7 +184,11 @@ class ZMQInteractiveShell(InteractiveShell):
         dh = self.displayhook
         # Send exception info over pub socket for other clients than the caller
         # to pick up
-        exc_msg = dh.session.send(dh.pub_socket, u'pyerr', json_clean(exc_content), dh.parent_header)
+        topic = None
+        if dh.topic:
+            topic = dh.topic.replace(b'pyout', b'pyerr')
+
+        exc_msg = dh.session.send(dh.pub_socket, u'pyerr', json_clean(exc_content), dh.parent_header, ident=topic)
 
         # FIXME - Hack: store exception info in shell object.  Right now, the
         # caller is reading this info after the fact, we need to fix this logic
diff --git a/docs/source/parallel/parallel_db.txt b/docs/source/parallel/parallel_db.txt
index 648223f..a742f57 100644
--- a/docs/source/parallel/parallel_db.txt
+++ b/docs/source/parallel/parallel_db.txt
@@ -34,7 +34,7 @@ TaskRecord keys:
 =============== =============== =============
 Key             Type            Description
 =============== =============== =============
-msg_id          uuid(bytes)     The msg ID
+msg_id          uuid(ascii)     The msg ID
 header          dict            The request header
 content         dict            The request content (likely empty)
 buffers         list(bytes)     buffers containing serialized request objects
@@ -43,7 +43,7 @@ client_uuid     uuid(bytes)     IDENT of client's socket
 engine_uuid     uuid(bytes)     IDENT of engine's socket
 started         datetime        time task began execution on engine
 completed       datetime        time task finished execution (success or failure) on engine
-resubmitted     datetime        time of resubmission (if applicable)
+resubmitted     uuid(ascii)     msg_id of resubmitted task (if applicable)
 result_header   dict            header for result
 result_content  dict            content for result
 result_buffers  list(bytes)     buffers containing serialized request objects
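The revised `resubmitted` semantics in the table above can be observed from a
client. A hypothetical sketch, assuming a running ipcluster at this revision
(`db_query` and `resubmit` are existing Client methods; the task and timing
details are illustrative):

    import time
    from IPython.parallel import Client

    rc = Client()
    ar = rc[:].apply_async(time.sleep, 0)
    ar.get()

    # resubmit the task; the original record's 'resubmitted' key now holds
    # the msg_id of the new task, where it previously held a datetime
    msg_id = ar.msg_ids[0]
    rc.resubmit(msg_id)
    rec = rc.db_query({'msg_id': msg_id}, keys=['msg_id', 'resubmitted'])[0]
    print rec['resubmitted']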