utils.py
175 lines
| 5.0 KiB
| text/x-python
|
PythonLexer
MinRK
|
r12414 | """utilities for testing IPython kernels""" | ||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2013 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
import atexit | ||||
from contextlib import contextmanager | ||||
Thomas Kluyver
|
r13156 | from subprocess import PIPE, STDOUT | ||
Thomas Kluyver
|
r13354 | try: | ||
from queue import Empty # Py 3 | ||||
except ImportError: | ||||
from Queue import Empty # Py 2 | ||||
MinRK
|
r12414 | |||
Thomas Kluyver
|
r13156 | import nose | ||
MinRK
|
r12414 | import nose.tools as nt | ||
from IPython.kernel import KernelManager | ||||
#------------------------------------------------------------------------------- | ||||
# Globals | ||||
#------------------------------------------------------------------------------- | ||||
# seconds to wait for the kernel_info reply of a freshly started kernel
STARTUP_TIMEOUT = 60
# seconds to wait for each shell / iopub message in execute()
TIMEOUT = 15

# shared kernel manager / client; populated by start_global_kernel()
# and reset to None by stop_global_kernel()
KM = KC = None
#------------------------------------------------------------------------------- | ||||
# code | ||||
#------------------------------------------------------------------------------- | ||||
MinRK
|
def start_new_kernel(argv=None):
    """start a new kernel, and return its Manager and Client

    Parameters
    ----------
    argv : list, optional
        extra arguments, passed to ``KernelManager.start_kernel`` as
        ``extra_arguments`` when non-empty.

    Returns
    -------
    (km, kc) : the KernelManager and a client whose channels are started.

    If anything fails after the kernel process has been launched (for
    example the kernel_info reply does not arrive within STARTUP_TIMEOUT),
    the client channels are stopped and the kernel is shut down before the
    exception is re-raised, so a failed startup does not leak a subprocess.
    """
    km = KernelManager()
    # kernel stdout goes to the capturer's write fd; stderr is merged into it
    # NOTE(review): ipy_stream_capturer is presumably attached to the nose
    # module by IPython's test runner -- confirm.
    kwargs = dict(stdout=nose.ipy_stream_capturer.writefd, stderr=STDOUT)
    if argv:
        kwargs['extra_arguments'] = argv
    km.start_kernel(**kwargs)

    kc = None
    try:
        nose.ipy_stream_capturer.ensure_started()
        kc = km.client()
        kc.start_channels()

        # block until the kernel answers a kernel_info request
        kc.kernel_info()
        kc.get_shell_msg(block=True, timeout=STARTUP_TIMEOUT)
        flush_channels(kc)
    except BaseException:
        # undo the partial startup, then let the original error propagate
        if kc is not None:
            kc.stop_channels()
        km.shutdown_kernel(now=True)
        raise
    return km, kc
def flush_channels(kc=None):
    """drain and validate every message pending on the shell and iopub channels

    Parameters
    ----------
    kc : kernel client, optional
        client whose channels are drained; the global KC is used when omitted.
    """
    from .test_message_spec import validate_message

    client = KC if kc is None else kc
    for chan in (client.shell_channel, client.iopub_channel):
        while True:
            try:
                pending = chan.get_msg(block=True, timeout=0.1)
            except Empty:
                # nothing arrived within the poll window: channel is drained
                break
            validate_message(pending)
def execute(code='', kc=None, **kwargs):
    """send an execute request and validate the messages it produces

    Parameters
    ----------
    code : str
        source to execute.
    kc : kernel client, optional
        client to use; the global KC is used when omitted.
    **kwargs
        forwarded to ``kc.execute``; a truthy ``silent`` skips the pyin check.

    Returns
    -------
    (msg_id, content) : the request id and the content of the execute_reply.
    """
    from .test_message_spec import validate_message

    client = KC if kc is None else kc
    msg_id = client.execute(code=code, **kwargs)

    # the reply on the shell channel
    reply = client.get_shell_msg(timeout=TIMEOUT)
    validate_message(reply, 'execute_reply', msg_id)

    # first iopub message must be the 'busy' status
    busy = client.get_iopub_msg(timeout=TIMEOUT)
    validate_message(busy, 'status', msg_id)
    nt.assert_equal(busy['content']['execution_state'], 'busy')

    if kwargs.get('silent'):
        return msg_id, reply['content']

    # non-silent requests also echo the code back as a 'pyin' message
    pyin = client.get_iopub_msg(timeout=TIMEOUT)
    validate_message(pyin, 'pyin', msg_id)
    nt.assert_equal(pyin['content']['code'], code)
    return msg_id, reply['content']
def start_global_kernel():
    """return the client of the shared kernel, starting the kernel on first use"""
    global KM, KC
    if KM is not None:
        # already running: reuse the existing client
        return KC
    KM, KC = start_new_kernel()
    # make sure the shared kernel is torn down when the interpreter exits
    atexit.register(stop_global_kernel)
    return KC
@contextmanager
def kernel():
    """Context manager yielding the client of the global kernel instance

    Should be used for most kernel tests; the kernel is started on first
    use and is not stopped when the context exits.

    Returns
    -------
    kernel_client: connected KernelClient instance
    """
    client = start_global_kernel()
    yield client
def uses_kernel(test_f):
    """Decorator for tests that use the global kernel

    The decorated test takes no arguments; when run, it calls ``test_f``
    with the global kernel's client. Only ``__doc__`` and ``__name__`` are
    copied from ``test_f`` onto the wrapper.
    """
    def wrapped_test():
        with kernel() as client:
            test_f(client)

    for attr in ('__doc__', '__name__'):
        setattr(wrapped_test, attr, getattr(test_f, attr))
    return wrapped_test
def stop_global_kernel():
    """Stop the global shared kernel instance, if it exists

    Stops the channels of the global client and shuts down the global
    kernel manager, resetting both KC and KM to None. Safe to call when no
    global kernel was started (or after it was already stopped): each
    global is only touched if it is not None.
    """
    global KM, KC
    if KC is not None:
        KC.stop_channels()
        KC = None
    if KM is not None:
        KM.shutdown_kernel(now=True)
        KM = None
@contextmanager
def new_kernel(argv=None):
    """Context manager for a fresh kernel running in its own subprocess

    Should only be used for tests where the kernel must not be re-used.
    On exit the client channels are stopped and the kernel is shut down.

    Parameters
    ----------
    argv : list, optional
        passed through to start_new_kernel

    Returns
    -------
    kernel_client: connected KernelClient instance
    """
    manager, client = start_new_kernel(argv)
    try:
        yield client
    finally:
        client.stop_channels()
        manager.shutdown_kernel(now=True)
MinRK
|
r12414 | |||
def assemble_output(iopub):
    """collect the stdout/stderr text produced by one execution

    Reads messages from ``iopub`` (each read waits up to 1 second) until a
    'status' message with execution_state 'idle' arrives.

    Returns
    -------
    (stdout, stderr) : the concatenated 'data' of the stream messages.

    Raises
    ------
    KeyError
        if a stream message names anything other than stdout or stderr.
    """
    out, err = '', ''
    while True:
        msg = iopub.get_msg(block=True, timeout=1)
        kind, content = msg['msg_type'], msg['content']

        if kind == 'status' and content['execution_state'] == 'idle':
            # idle message signals end of output
            return out, err
        if kind != 'stream':
            # other output, ignored
            continue

        stream = content['name']
        if stream == 'stdout':
            out += content['data']
        elif stream == 'stderr':
            err += content['data']
        else:
            raise KeyError("bad stream: %r" % stream)