##// END OF EJS Templates
Merge pull request #4438 from takluyver/1codebase...
Merge pull request #4438 from takluyver/1codebase Single codebase Python 3 support (again)

File last commit:

r13354:5d199385
r13404:dd39c908 merge
Show More
utils.py
176 lines | 5.0 KiB | text/x-python | PythonLexer
"""utilities for testing IPython kernels"""
#-------------------------------------------------------------------------------
# Copyright (C) 2013 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import atexit
from contextlib import contextmanager
from subprocess import PIPE, STDOUT
try:
from queue import Empty # Py 3
except ImportError:
from Queue import Empty # Py 2
import nose
import nose.tools as nt
from IPython.kernel import KernelManager
#-------------------------------------------------------------------------------
# Globals
#-------------------------------------------------------------------------------
# Timeout handed to the first shell-reply fetch in start_new_kernel()
# (presumably seconds -- confirm against the kernel client API).
STARTUP_TIMEOUT = 60
# Timeout handed to every shell/iopub fetch performed by execute().
TIMEOUT = 15
# Manager and client of the shared global kernel.  Both stay None until
# start_global_kernel() assigns them; stop_global_kernel() resets them.
KM = None
KC = None
#-------------------------------------------------------------------------------
# code
#-------------------------------------------------------------------------------
def start_new_kernel(argv=None):
    """start a new kernel, and return its Manager and Client

    Parameters
    ----------
    argv : list, optional
        Extra arguments, forwarded to the kernel as ``extra_arguments``.
        Nothing is forwarded when it is empty or None.

    Returns
    -------
    (km, kc) : the manager owning the new kernel and a client whose
        channels have been started and flushed.

    Raises
    ------
    Any exception raised while the kernel is being brought up is re-raised
    unchanged, after the client channels have been stopped and the kernel
    has been shut down so that no subprocess is left behind.
    """
    km = KernelManager()
    # kernel output goes through a pipe so the nose stream capturer can read it
    kwargs = dict(stdout=PIPE, stderr=STDOUT)
    if argv:
        kwargs['extra_arguments'] = argv
    km.start_kernel(**kwargs)

    kc = None
    try:
        nose.ipy_stream_capturer.add_stream(km.kernel.stdout.fileno())
        nose.ipy_stream_capturer.ensure_started()
        kc = km.client()
        kc.start_channels()

        # the reply to a kernel_info request is the sign that the kernel is up
        kc.kernel_info()
        kc.get_shell_msg(block=True, timeout=STARTUP_TIMEOUT)
        flush_channels(kc)
    except BaseException:
        # The caller never receives km/kc on failure, so nobody else could
        # clean up; tear down in the same order new_kernel() uses.
        if kc is not None:
            kc.stop_channels()
        km.shutdown_kernel(now=True)
        raise
    return km, kc
def flush_channels(kc=None):
    """drain and validate every message pending on the shell and iopub channels

    Parameters
    ----------
    kc : kernel client, optional
        Client whose channels are drained; the global client ``KC`` is used
        when it is None.
    """
    from .test_message_spec import validate_message

    client = KC if kc is None else kc
    for chan in (client.shell_channel, client.iopub_channel):
        while True:
            try:
                pending = chan.get_msg(block=True, timeout=0.1)
            except Empty:
                # nothing arrived within the short wait: channel is drained
                break
            # validated outside the try so only the fetch is guarded
            validate_message(pending)
def execute(code='', kc=None, **kwargs):
    """send an execute request and validate the messages that follow it

    Parameters
    ----------
    code : str
        Source passed to the client's ``execute``.
    kc : kernel client, optional
        Client to use; the global client ``KC`` is used when it is None.
    **kwargs
        Forwarded to the client's ``execute``.  A truthy ``silent`` skips
        the check of the ``pyin`` message.

    Returns
    -------
    (msg_id, content) : id of the request and the ``content`` of its reply.
    """
    from .test_message_spec import validate_message

    client = KC if kc is None else kc
    msg_id = client.execute(code=code, **kwargs)

    # shell channel: the reply to our request
    reply = client.get_shell_msg(timeout=TIMEOUT)
    validate_message(reply, 'execute_reply', msg_id)

    # iopub channel: first a 'busy' status ...
    status_msg = client.get_iopub_msg(timeout=TIMEOUT)
    validate_message(status_msg, 'status', msg_id)
    nt.assert_equal(status_msg['content']['execution_state'], 'busy')

    # ... then, unless silent, the echoed input
    silent = kwargs.get('silent')
    if not silent:
        echoed = client.get_iopub_msg(timeout=TIMEOUT)
        validate_message(echoed, 'pyin', msg_id)
        nt.assert_equal(echoed['content']['code'], code)

    content = reply['content']
    return msg_id, content
def start_global_kernel():
    """return the client of the global kernel, starting the kernel on first use

    The first call stores the new manager and client in ``KM`` and ``KC``
    and registers stop_global_kernel to run at interpreter exit; later
    calls just hand back the existing client.
    """
    global KM, KC
    if KM is not None:
        # already running: reuse it
        return KC

    KM, KC = start_new_kernel()
    atexit.register(stop_global_kernel)
    return KC
@contextmanager
def kernel():
    """Context manager giving access to the global kernel instance

    Should be used for most kernel tests.  Nothing is torn down on exit;
    the value bound by ``with`` is whatever start_global_kernel() returns.

    Returns
    -------
    kernel_client: connected KernelClient instance
    """
    shared_client = start_global_kernel()
    yield shared_client
def uses_kernel(test_f):
    """Decorator for tests that use the global kernel

    The returned function takes no arguments; when called it enters the
    ``kernel()`` context and passes the client to ``test_f``.  It carries
    the docstring and name of ``test_f``.
    """
    def wrapped_test():
        with kernel() as client:
            test_f(client)

    # copy just these two attributes by hand
    for attr in ('__doc__', '__name__'):
        setattr(wrapped_test, attr, getattr(test_f, attr))
    return wrapped_test
def stop_global_kernel():
    """Stop the global shared kernel instance, if it exists

    Stops the channels of the global client and shuts the global kernel
    down immediately (``now=True``), then resets ``KC`` and ``KM`` to None.
    Calling it when nothing is running, or calling it twice, is a no-op.
    """
    global KM, KC
    # Each global is checked on its own: unconditionally calling
    # KC.stop_channels() raised AttributeError when no kernel had been
    # started or when this function had already run.
    if KC is not None:
        KC.stop_channels()
        KC = None
    if KM is not None:
        KM.shutdown_kernel(now=True)
        KM = None
@contextmanager
def new_kernel(argv=None):
    """Context manager running a fresh kernel for the duration of the block

    Should only be used for tests where the kernel must not be re-used.
    On exit, whether or not the block raised, the client channels are
    stopped and the kernel is shut down immediately.

    Parameters
    ----------
    argv : list, optional
        Passed through to start_new_kernel.

    Returns
    -------
    kernel_client: connected KernelClient instance
    """
    manager, client = start_new_kernel(argv)
    try:
        yield client
    finally:
        # channels first, then the kernel itself
        client.stop_channels()
        manager.shutdown_kernel(now=True)
def assemble_output(iopub):
    """collect the stdout/stderr text published during one execution

    Messages are read from ``iopub`` (one second timeout per fetch) until
    a ``status`` message with execution state ``idle`` is seen.  The
    ``data`` of each ``stream`` message is appended to the text of the
    stream it names; every other message is ignored.

    Returns
    -------
    (stdout, stderr) : the accumulated text of each stream.

    Raises
    ------
    KeyError
        if a stream message names anything but 'stdout' or 'stderr'.
    """
    captured = {'stdout': '', 'stderr': ''}
    while True:
        msg = iopub.get_msg(block=True, timeout=1)
        msg_type = msg['msg_type']
        content = msg['content']

        if msg_type == 'status':
            if content['execution_state'] == 'idle':
                # idle message signals end of output
                break
            continue
        if msg_type != 'stream':
            # other output, ignored
            continue

        stream_name = content['name']
        if stream_name not in ('stdout', 'stderr'):
            raise KeyError("bad stream: %r" % content['name'])
        captured[stream_name] += content['data']
    return captured['stdout'], captured['stderr']