diff --git a/IPython/kernel/multikernelmanager.py b/IPython/kernel/multikernelmanager.py index e3c2d41..a665573 100644 --- a/IPython/kernel/multikernelmanager.py +++ b/IPython/kernel/multikernelmanager.py @@ -171,7 +171,7 @@ class MultiKernelManager(LoggingConfigurable): self.log.info("Signaled Kernel %s with %s" % (kernel_id, signum)) @kernel_method - def restart_kernel(self, kernel_id): + def restart_kernel(self, kernel_id, now=False): """Restart a kernel by its uuid, keeping the same ports. Parameters diff --git a/IPython/kernel/tests/test_kernel.py b/IPython/kernel/tests/test_kernel.py index 6859eda..0e834c4 100644 --- a/IPython/kernel/tests/test_kernel.py +++ b/IPython/kernel/tests/test_kernel.py @@ -11,104 +11,18 @@ # Imports #------------------------------------------------------------------------------- -import os -import shutil import sys -import tempfile - -from contextlib import contextmanager -from subprocess import PIPE import nose.tools as nt -from IPython.kernel import KernelManager -from IPython.kernel.tests.test_message_spec import execute, flush_channels from IPython.testing import decorators as dec, tools as tt -from IPython.utils import path, py3compat +from IPython.utils import py3compat + +from .utils import new_kernel, kernel, TIMEOUT, assemble_output, execute, flush_channels #------------------------------------------------------------------------------- # Tests #------------------------------------------------------------------------------- -IPYTHONDIR = None -save_env = None -save_get_ipython_dir = None - -STARTUP_TIMEOUT = 60 -TIMEOUT = 15 - -def setup(): - """setup temporary IPYTHONDIR for tests""" - global IPYTHONDIR - global save_env - global save_get_ipython_dir - - IPYTHONDIR = tempfile.mkdtemp() - - save_env = os.environ.copy() - os.environ["IPYTHONDIR"] = IPYTHONDIR - - save_get_ipython_dir = path.get_ipython_dir - path.get_ipython_dir = lambda : IPYTHONDIR - - -def teardown(): - path.get_ipython_dir = save_get_ipython_dir - os.environ = save_env - - try: - shutil.rmtree(IPYTHONDIR) - except (OSError, IOError): - # no such file - pass - - -@contextmanager -def new_kernel(): - """start a kernel in a subprocess, and wait for it to be ready - - Returns - ------- - kernel_manager: connected KernelManager instance - """ - KM = KernelManager() - - KM.start_kernel(stdout=PIPE, stderr=PIPE) - KC = KM.client() - KC.start_channels() - - # wait for kernel to be ready - KC.shell_channel.execute("import sys") - KC.shell_channel.get_msg(block=True, timeout=STARTUP_TIMEOUT) - flush_channels(KC) - try: - yield KC - finally: - KC.stop_channels() - KM.shutdown_kernel() - - -def assemble_output(iopub): - """assemble stdout/err from an execution""" - stdout = '' - stderr = '' - while True: - msg = iopub.get_msg(block=True, timeout=1) - msg_type = msg['msg_type'] - content = msg['content'] - if msg_type == 'status' and content['execution_state'] == 'idle': - # idle message signals end of output - break - elif msg['msg_type'] == 'stream': - if content['name'] == 'stdout': - stdout = stdout + content['data'] - elif content['name'] == 'stderr': - stderr = stderr + content['data'] - else: - raise KeyError("bad stream: %r" % content['name']) - else: - # other output, ignored - pass - return stdout, stderr def _check_mp_mode(kc, expected=False, stream="stdout"): @@ -123,7 +37,7 @@ def _check_mp_mode(kc, expected=False, stream="stdout"): def test_simple_print(): """simple print statement in kernel""" - with new_kernel() as kc: + with kernel() as kc: iopub = kc.iopub_channel msg_id, content = execute(kc=kc, code="print ('hi')") stdout, stderr = assemble_output(iopub) @@ -165,7 +79,7 @@ def test_subprocess_print(): def test_subprocess_noprint(): """mp.Process without print doesn't trigger iostream mp_mode""" - with new_kernel() as kc: + with kernel() as kc: iopub = kc.iopub_channel np = 5 @@ -210,7 +124,7 @@ def test_subprocess_error(): def test_raw_input(): """test [raw_]input""" - with new_kernel() as kc: + with kernel() as kc: iopub = kc.iopub_channel input_f = "input" if py3compat.PY3 else "raw_input" @@ -232,7 +146,7 @@ def test_raw_input(): @dec.skipif(py3compat.PY3) def test_eval_input(): """test input() on Python 2""" - with new_kernel() as kc: + with kernel() as kc: iopub = kc.iopub_channel input_f = "input" if py3compat.PY3 else "raw_input" diff --git a/IPython/kernel/tests/test_kernelmanager.py b/IPython/kernel/tests/test_kernelmanager.py index ad6fef4..fdefe2d 100644 --- a/IPython/kernel/tests/test_kernelmanager.py +++ b/IPython/kernel/tests/test_kernelmanager.py @@ -26,18 +26,11 @@ class TestKernelManager(TestCase): def _run_lifecycle(self, km): km.start_kernel(stdout=PIPE, stderr=PIPE) self.assertTrue(km.is_alive()) - km.restart_kernel() + km.restart_kernel(now=True) self.assertTrue(km.is_alive()) - # We need a delay here to give the restarting kernel a chance to - # restart. Otherwise, the interrupt will kill it, causing the test - # suite to hang. The reason it *hangs* is that the shutdown - # message for the restart sometimes hasn't been sent to the kernel. - # Because linger is oo on the shell channel, the context can't - # close until the message is sent to the kernel, which is not dead. - time.sleep(1.0) km.interrupt_kernel() self.assertTrue(isinstance(km, KernelManager)) - km.shutdown_kernel() + km.shutdown_kernel(now=True) def test_tcp_lifecycle(self): km = self._get_tcp_km() diff --git a/IPython/kernel/tests/test_message_spec.py b/IPython/kernel/tests/test_message_spec.py index b676498..29c56cb 100644 --- a/IPython/kernel/tests/test_message_spec.py +++ b/IPython/kernel/tests/test_message_spec.py @@ -1,7 +1,7 @@ -"""Test suite for our zeromq-based messaging specification. +"""Test suite for our zeromq-based message specification. """ #----------------------------------------------------------------------------- -# Copyright (C) 2010-2011 The IPython Development Team +# Copyright (C) 2010 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING.txt, distributed as part of this software. @@ -19,72 +19,21 @@ from IPython.utils.traitlets import ( HasTraits, TraitError, Bool, Unicode, Dict, Integer, List, Enum, Any, ) +from .utils import TIMEOUT, start_global_kernel, flush_channels, execute + #----------------------------------------------------------------------------- -# Global setup and utilities +# Globals #----------------------------------------------------------------------------- - -STARTUP_TIMEOUT = 60 -TIMEOUT = 15 +KC = None def setup(): - global KM, KC - KM = KernelManager() - KM.start_kernel(stdout=PIPE, stderr=PIPE) - KC = KM.client() - KC.start_channels() - - # wait for kernel to be ready - try: - msg = KC.iopub_channel.get_msg(block=True, timeout=STARTUP_TIMEOUT) - except Empty: - pass - msg_id = KC.kernel_info() - KC.get_shell_msg(block=True, timeout=STARTUP_TIMEOUT) - flush_channels() - - -def teardown(): - KC.stop_channels() - KM.shutdown_kernel() - - -def flush_channels(kc=None): - """flush any messages waiting on the queue""" - if kc is None: - kc = KC - for channel in (kc.shell_channel, kc.iopub_channel): - while True: - try: - msg = channel.get_msg(block=True, timeout=0.1) - except Empty: - break - else: - validate_message(msg) - - -def execute(code='', kc=None, **kwargs): - """wrapper for doing common steps for validating an execution request""" - if kc is None: - kc = KC - msg_id = kc.execute(code=code, **kwargs) - reply = kc.get_shell_msg(timeout=TIMEOUT) - validate_message(reply, 'execute_reply', msg_id) - busy = kc.get_iopub_msg(timeout=TIMEOUT) - validate_message(busy, 'status', msg_id) - nt.assert_equal(busy['content']['execution_state'], 'busy') - - if not kwargs.get('silent'): - pyin = kc.get_iopub_msg(timeout=TIMEOUT) - validate_message(pyin, 'pyin', msg_id) - nt.assert_equal(pyin['content']['code'], code) - - return msg_id, reply['content'] + global KC + KC = start_global_kernel() #----------------------------------------------------------------------------- -# MSG Spec References +# Message Spec References #----------------------------------------------------------------------------- - class Reference(HasTraits): """ diff --git a/IPython/kernel/tests/test_multikernelmanager.py b/IPython/kernel/tests/test_multikernelmanager.py index 0c6ffe5..1136af2 100644 --- a/IPython/kernel/tests/test_multikernelmanager.py +++ b/IPython/kernel/tests/test_multikernelmanager.py @@ -31,20 +31,13 @@ class TestKernelManager(TestCase): self.assertTrue(kid in km) self.assertTrue(kid in km.list_kernel_ids()) self.assertEqual(len(km),1) - km.restart_kernel(kid) + km.restart_kernel(kid, now=True) self.assertTrue(km.is_alive(kid)) self.assertTrue(kid in km.list_kernel_ids()) - # We need a delay here to give the restarting kernel a chance to - # restart. Otherwise, the interrupt will kill it, causing the test - # suite to hang. The reason it *hangs* is that the shutdown - # message for the restart sometimes hasn't been sent to the kernel. - # Because linger is oo on the shell channel, the context can't - # close until the message is sent to the kernel, which is not dead. - time.sleep(1.0) km.interrupt_kernel(kid) k = km.get_kernel(kid) self.assertTrue(isinstance(k, KernelManager)) - km.shutdown_kernel(kid) + km.shutdown_kernel(kid, now=True) self.assertTrue(not kid in km) def _run_cinfo(self, km, transport, ip): @@ -63,7 +56,7 @@ class TestKernelManager(TestCase): self.assertTrue('hb_port' in cinfo) stream = km.connect_hb(kid) stream.close() - km.shutdown_kernel(kid) + km.shutdown_kernel(kid, now=True) def test_tcp_lifecycle(self): km = self._get_tcp_km() diff --git a/IPython/kernel/tests/utils.py b/IPython/kernel/tests/utils.py new file mode 100644 index 0000000..a3cd92f --- /dev/null +++ b/IPython/kernel/tests/utils.py @@ -0,0 +1,167 @@ +"""utilities for testing IPython kernels""" + +#------------------------------------------------------------------------------- +# Copyright (C) 2013 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#------------------------------------------------------------------------------- + +#------------------------------------------------------------------------------- +# Imports +#------------------------------------------------------------------------------- + +import atexit + +from contextlib import contextmanager +from subprocess import PIPE +from Queue import Empty + +import nose.tools as nt + +from IPython.kernel import KernelManager + +#------------------------------------------------------------------------------- +# Globals +#------------------------------------------------------------------------------- + +STARTUP_TIMEOUT = 60 +TIMEOUT = 15 + +KM = None +KC = None + +#------------------------------------------------------------------------------- +# code +#------------------------------------------------------------------------------- + + +def start_new_kernel(): + """start a new kernel, and return its Manager and Client""" + km = KernelManager() + km.start_kernel(stdout=PIPE, stderr=PIPE) + kc = km.client() + kc.start_channels() + + msg_id = kc.kernel_info() + kc.get_shell_msg(block=True, timeout=STARTUP_TIMEOUT) + flush_channels(kc) + return km, kc + +def flush_channels(kc=None): + """flush any messages waiting on the queue""" + from .test_message_spec import validate_message + + if kc is None: + kc = KC + for channel in (kc.shell_channel, kc.iopub_channel): + while True: + try: + msg = channel.get_msg(block=True, timeout=0.1) + except Empty: + break + else: + validate_message(msg) + + +def execute(code='', kc=None, **kwargs): + """wrapper for doing common steps for validating an execution request""" + from .test_message_spec import validate_message + if kc is None: + kc = KC + msg_id = kc.execute(code=code, **kwargs) + reply = kc.get_shell_msg(timeout=TIMEOUT) + validate_message(reply, 'execute_reply', msg_id) + busy = kc.get_iopub_msg(timeout=TIMEOUT) + validate_message(busy, 'status', msg_id) + nt.assert_equal(busy['content']['execution_state'], 'busy') + + if not kwargs.get('silent'): + pyin = kc.get_iopub_msg(timeout=TIMEOUT) + validate_message(pyin, 'pyin', msg_id) + nt.assert_equal(pyin['content']['code'], code) + + return msg_id, reply['content'] + +def start_global_kernel(): + """start the global kernel (if it isn't running) and return its client""" + global KM, KC + if KM is None: + KM, KC = start_new_kernel() + atexit.register(stop_global_kernel) + return KC + +@contextmanager +def kernel(): + """Context manager for the global kernel instance + + Should be used for most kernel tests + + Returns + ------- + kernel_client: connected KernelClient instance + """ + yield start_global_kernel() + +def uses_kernel(test_f): + """Decorator for tests that use the global kernel""" + def wrapped_test(): + with kernel() as kc: + test_f(kc) + wrapped_test.__doc__ = test_f.__doc__ + wrapped_test.__name__ = test_f.__name__ + return wrapped_test + +def stop_global_kernel(): + """Stop the global shared kernel instance, if it exists""" + global KM, KC + KC.stop_channels() + KC = None + if KM is None: + return + KM.shutdown_kernel(now=True) + KM = None + +@contextmanager +def new_kernel(): + """Context manager for a new kernel in a subprocess + + Should only be used for tests where the kernel must not be re-used. + + Returns + ------- + kernel_client: connected KernelClient instance + """ + km, kc = start_new_kernel() + try: + yield kc + finally: + kc.stop_channels() + km.shutdown_kernel(now=True) + + +def assemble_output(iopub): + """assemble stdout/err from an execution""" + stdout = '' + stderr = '' + while True: + msg = iopub.get_msg(block=True, timeout=1) + msg_type = msg['msg_type'] + content = msg['content'] + if msg_type == 'status' and content['execution_state'] == 'idle': + # idle message signals end of output + break + elif msg['msg_type'] == 'stream': + if content['name'] == 'stdout': + stdout += content['data'] + elif content['name'] == 'stderr': + stderr += content['data'] + else: + raise KeyError("bad stream: %r" % content['name']) + else: + # other output, ignored + pass + return stdout, stderr + + +