utils.py
163 lines
| 4.7 KiB
| text/x-python
|
PythonLexer
MinRK
|
r12414 | """utilities for testing IPython kernels""" | ||
MinRK
|
r16567 | # Copyright (c) IPython Development Team. | ||
# Distributed under the terms of the Modified BSD License. | ||||
MinRK
|
r12414 | |||
import atexit | ||||
Min RK
|
r20955 | import os | ||
MinRK
|
r12414 | |||
from contextlib import contextmanager | ||||
Thomas Kluyver
|
r13156 | from subprocess import PIPE, STDOUT | ||
Thomas Kluyver
|
r13354 | try: | ||
from queue import Empty # Py 3 | ||||
except ImportError: | ||||
from Queue import Empty # Py 2 | ||||
MinRK
|
r12414 | |||
Thomas Kluyver
|
r13156 | import nose | ||
MinRK
|
r12414 | import nose.tools as nt | ||
Min RK
|
r20955 | from jupyter_client import manager | ||
MinRK
|
r12414 | |||
#------------------------------------------------------------------------------- | ||||
# Globals | ||||
#------------------------------------------------------------------------------- | ||||
# Passed as ``startup_timeout`` when launching a kernel through
# ``manager.start_new_kernel`` / ``manager.run_kernel``
# (presumably seconds -- confirm against jupyter_client).
STARTUP_TIMEOUT = 60
# Timeout used by execute() for each shell / iopub message it waits on
# (presumably seconds -- confirm against jupyter_client).
TIMEOUT = 15
# Manager and client of the shared global kernel.  Both stay None until
# start_global_kernel() launches a kernel; stop_global_kernel() resets them.
KM = None
KC = None
#------------------------------------------------------------------------------- | ||||
# code | ||||
#------------------------------------------------------------------------------- | ||||
Thomas Kluyver
|
def start_new_kernel(**kwargs):
    """Launch a fresh kernel and return what ``manager.start_new_kernel`` returns.

    Any ``stdout`` / ``stderr`` entries supplied by the caller are replaced:
    stdout is pointed at the file descriptor reported by
    ``nose.iptest_stdstreams_fileno()`` and stderr is merged into it, so the
    kernel's output goes through the test suite's output capturing.

    All remaining keyword arguments are forwarded unchanged, together with
    ``startup_timeout=STARTUP_TIMEOUT``.
    """
    kwargs['stdout'] = nose.iptest_stdstreams_fileno()
    kwargs['stderr'] = STDOUT
    return manager.start_new_kernel(startup_timeout=STARTUP_TIMEOUT, **kwargs)
MinRK
|
r12414 | |||
def flush_channels(kc=None):
    """Drain and validate every message pending on the shell and iopub channels.

    Parameters
    ----------
    kc : kernel client, optional
        Client whose channels are drained.  Falls back to the global ``KC``
        when omitted.

    Each channel is read until ``get_msg`` raises ``Empty``; every message
    retrieved on the way is passed to ``validate_message``.
    """
    from .test_message_spec import validate_message

    client = KC if kc is None else kc
    for chan in (client.shell_channel, client.iopub_channel):
        drained = False
        while not drained:
            try:
                pending = chan.get_msg(block=True, timeout=0.1)
            except Empty:
                # nothing left on this channel; move on to the next one
                drained = True
            else:
                validate_message(pending)
def execute(code='', kc=None, **kwargs):
    """Run ``code`` on a kernel and validate the standard leading messages.

    Parameters
    ----------
    code : str
        Source sent to the kernel.
    kc : kernel client, optional
        Client used for the request.  Falls back to the global ``KC``.
    **kwargs
        Forwarded to ``kc.execute``.  A truthy ``silent`` entry skips the
        ``execute_input`` check.

    Returns
    -------
    (msg_id, content) : the id returned by ``kc.execute`` and the
        ``content`` of the shell reply.

    The shell reply is validated as ``execute_reply``, the first iopub
    message must be a ``status`` message reporting ``busy``, and (unless
    silent) the next iopub message must be an ``execute_input`` echoing
    ``code``.
    """
    from .test_message_spec import validate_message

    client = KC if kc is None else kc
    msg_id = client.execute(code=code, **kwargs)

    reply = client.get_shell_msg(timeout=TIMEOUT)
    validate_message(reply, 'execute_reply', msg_id)

    status_msg = client.get_iopub_msg(timeout=TIMEOUT)
    validate_message(status_msg, 'status', msg_id)
    nt.assert_equal(status_msg['content']['execution_state'], 'busy')

    if kwargs.get('silent'):
        # silent requests are not followed by an execute_input check
        return msg_id, reply['content']

    input_msg = client.get_iopub_msg(timeout=TIMEOUT)
    validate_message(input_msg, 'execute_input', msg_id)
    nt.assert_equal(input_msg['content']['code'], code)

    return msg_id, reply['content']
def start_global_kernel():
    """Return the client of the shared global kernel, starting it on first use.

    When a global kernel manager already exists, its client's channels are
    flushed before the client is handed back.  Otherwise a new kernel is
    started, stored in the ``KM`` / ``KC`` globals, and
    ``stop_global_kernel`` is registered to run at interpreter exit.
    """
    global KM, KC
    if KM is not None:
        # kernel already running: discard messages left by earlier users
        flush_channels(KC)
        return KC

    KM, KC = start_new_kernel()
    atexit.register(stop_global_kernel)
    return KC
@contextmanager
def kernel():
    """Context manager yielding the client of the global kernel instance.

    This is the entry point most kernel tests should use.

    Yields
    ------
    kernel_client : the client returned by ``start_global_kernel()``

    Nothing is done on exit from the ``with`` block.
    """
    client = start_global_kernel()
    yield client
def uses_kernel(test_f):
    """Decorate a test so that it receives the global kernel's client.

    ``test_f`` must accept the kernel client as its single argument.  The
    returned zero-argument wrapper opens ``kernel()``, calls ``test_f`` with
    the client, and carries over ``test_f``'s ``__doc__`` and ``__name__``.
    The wrapper discards whatever ``test_f`` returns.
    """
    def wrapped_test():
        with kernel() as client:
            test_f(client)

    # copy the identifying metadata so the wrapper shows up under the
    # original test's name and description
    for attr in ('__doc__', '__name__'):
        setattr(wrapped_test, attr, getattr(test_f, attr))
    return wrapped_test
def stop_global_kernel():
    """Stop the global shared kernel instance, if it exists.

    Stops the channels of the global client ``KC`` and shuts down the
    global manager ``KM`` with ``now=True``, then resets both globals to
    None.  Safe to call when no global kernel was ever started, and safe
    to call more than once: a missing client or manager is simply skipped.
    """
    global KM, KC
    # Guard the client as well as the manager: calling this without a
    # running global kernel must be a no-op rather than an AttributeError
    # on None.
    if KC is not None:
        KC.stop_channels()
        KC = None
    if KM is None:
        return
    KM.shutdown_kernel(now=True)
    KM = None
MinRK
|
def new_kernel(argv=None):
    """Context manager for a brand-new kernel in a subprocess.

    Use this only for tests where the kernel must not be shared with other
    tests.

    Parameters
    ----------
    argv : optional
        When given, passed to ``manager.run_kernel`` as ``extra_arguments``.

    Returns
    -------
    The result of ``manager.run_kernel(...)``; used as a context manager it
    provides a connected KernelClient instance.
    """
    launch_args = {
        'stdout': nose.iptest_stdstreams_fileno(),
        'stderr': STDOUT,
        'startup_timeout': STARTUP_TIMEOUT,
    }
    if argv is not None:
        launch_args['extra_arguments'] = argv
    return manager.run_kernel(**launch_args)
MinRK
|
r12414 | |||
def assemble_output(iopub):
    """Collect the stdout / stderr text produced by one execution.

    Messages are read from ``iopub`` (via ``get_msg(block=True, timeout=1)``)
    until a ``status`` message reporting ``idle`` arrives.  The ``text`` of
    every ``stream`` message seen before that is appended to the matching
    buffer; all other message types are ignored.

    Returns
    -------
    (stdout, stderr) : the concatenated text of each stream

    Raises
    ------
    KeyError
        If a ``stream`` message names a stream other than ``stdout`` or
        ``stderr``.
    """
    captured = {'stdout': '', 'stderr': ''}
    while True:
        message = iopub.get_msg(block=True, timeout=1)
        kind = message['msg_type']
        body = message['content']

        if kind == 'status' and body['execution_state'] == 'idle':
            # the idle status marks the end of this execution's output
            break
        if kind != 'stream':
            # non-stream output is not collected
            continue

        stream_name = body['name']
        if stream_name not in ('stdout', 'stderr'):
            raise KeyError("bad stream: %r" % stream_name)
        captured[stream_name] += body['text']

    return captured['stdout'], captured['stderr']
Thomas Kluyver
|
def wait_for_idle(kc):
    """Consume iopub messages from ``kc`` until the kernel reports idle.

    Messages are read from ``kc.iopub_channel`` with
    ``get_msg(block=True, timeout=1)`` and discarded; the loop ends at the
    first ``status`` message whose ``execution_state`` is ``idle``.
    Returns None.
    """
    idle = False
    while not idle:
        message = kc.iopub_channel.get_msg(block=True, timeout=1)
        kind, body = message['msg_type'], message['content']
        idle = kind == 'status' and body['execution_state'] == 'idle'