##// END OF EJS Templates
Capture error messages while stopping IPython clusters using batch (qdel, bkill) commands. This helps diagnose issues during cluster shutdown, which are otherwise silently ignored.
Capture error messages while stopping IPython clusters using batch (qdel, bkill) commands. This helps diagnose issues during cluster shutdown, which are otherwise silently ignored.

File last commit:

r9449:912e9fe7
r11285:23ff2ea0
Show More
test_kernel.py
201 lines | 6.1 KiB | text/x-python | PythonLexer
"""test the IPython Kernel"""
#-------------------------------------------------------------------------------
# Copyright (C) 2013 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import os
import shutil
import sys
import tempfile
from Queue import Empty
from contextlib import contextmanager
from subprocess import PIPE
import nose.tools as nt
from IPython.zmq.blockingkernelmanager import BlockingKernelManager
from IPython.zmq.tests.test_message_spec import execute, flush_channels
from IPython.testing import decorators as dec
from IPython.utils import path, py3compat
#-------------------------------------------------------------------------------
# Tests
#-------------------------------------------------------------------------------
def setup():
    """Point IPython at a throwaway IPYTHONDIR for the duration of the tests.

    Creates a fresh temporary directory and stores it in the module global
    ``IPYTHONDIR``, snapshots the current environment (``save_env``) and the
    original ``path.get_ipython_dir`` (``save_get_ipython_dir``), then
    redirects both the ``IPYTHONDIR`` environment variable and
    ``path.get_ipython_dir`` to the temporary directory.
    """
    global IPYTHONDIR, save_env, save_get_ipython_dir

    IPYTHONDIR = tempfile.mkdtemp()

    # Remember what is about to be overridden so it can be put back later.
    save_env = os.environ.copy()
    save_get_ipython_dir = path.get_ipython_dir

    def _temporary_ipython_dir():
        # Looks the global up at call time, like the lambda it replaces.
        return IPYTHONDIR

    os.environ["IPYTHONDIR"] = IPYTHONDIR
    path.get_ipython_dir = _temporary_ipython_dir
def teardown():
    """Undo what ``setup`` changed.

    Puts the saved ``path.get_ipython_dir`` back, restores the environment
    variables captured in ``save_env`` and removes the temporary
    ``IPYTHONDIR`` directory (ignoring errors if it cannot be removed).
    """
    path.get_ipython_dir = save_get_ipython_dir

    # ``save_env`` is a plain dict (the result of ``os.environ.copy()``).
    # Rebinding ``os.environ`` to it would leave the real process environment
    # untouched (IPYTHONDIR would stay set for child processes) and later
    # assignments to ``os.environ`` would no longer reach the process
    # environment.  Restore the existing mapping in place instead.
    os.environ.clear()
    os.environ.update(save_env)

    try:
        shutil.rmtree(IPYTHONDIR)
    except (OSError, IOError):
        # best effort: e.g. no such file
        pass
@contextmanager
def new_kernel():
    """Start a kernel in a subprocess, and wait for it to be ready.

    Context manager: the kernel's channels are stopped and the kernel is
    shut down when the ``with`` block exits, and also when the startup
    handshake itself fails.

    Yields
    ------
    kernel_manager: connected BlockingKernelManager instance
    """
    KM = BlockingKernelManager()
    KM.start_kernel(stdout=PIPE, stderr=PIPE)
    # From here on a subprocess exists, so everything runs under try/finally:
    # an error while starting channels or waiting for the first reply must
    # not leave the kernel process behind.
    try:
        KM.start_channels()

        # wait for kernel to be ready: run a trivial statement and block
        # until its reply arrives (or get_msg gives up after 5 seconds)
        KM.shell_channel.execute("import sys")
        KM.shell_channel.get_msg(block=True, timeout=5)
        flush_channels(KM)

        yield KM
    finally:
        # NOTE(review): assumes stop_channels() tolerates channels that were
        # never started -- confirm.  The nested finally guarantees the kernel
        # is shut down even if stopping the channels raises.
        try:
            KM.stop_channels()
        finally:
            KM.shutdown_kernel()
def assemble_output(iopub):
    """Assemble stdout/err from an execution.

    Reads messages from *iopub* one at a time until a ``status`` message
    whose ``execution_state`` is ``idle`` arrives.  The ``data`` of every
    ``stream`` message read before that is appended to the text of the
    stream it names; every other message is ignored.

    Parameters
    ----------
    iopub : object
        Anything providing ``get_msg(block=True, timeout=1)`` that returns a
        message mapping with ``'msg_type'`` and ``'content'`` keys.

    Returns
    -------
    (stdout, stderr) : tuple
        The concatenated text written to each stream.

    Raises
    ------
    KeyError
        If a stream message names a stream other than stdout or stderr.

    Any exception raised by ``iopub.get_msg`` propagates to the caller.
    """
    # Collect the pieces and join once at the end instead of growing two
    # strings by repeated concatenation.
    chunks = {'stdout': [], 'stderr': []}

    while True:
        msg = iopub.get_msg(block=True, timeout=1)
        msg_type = msg['msg_type']
        content = msg['content']

        if msg_type == 'status' and content['execution_state'] == 'idle':
            # idle message signals end of output
            break
        if msg_type != 'stream':
            # other output, ignored
            continue

        name = content['name']
        if name not in chunks:
            raise KeyError("bad stream: %r" % name)
        chunks[name].append(content['data'])

    return ''.join(chunks['stdout']), ''.join(chunks['stderr'])
def _check_mp_mode(km, expected=False, stream="stdout"):
    """Assert what the kernel reports for ``sys.<stream>._check_mp_mode()``.

    Has the kernel print the result of ``_check_mp_mode()`` for the given
    *stream* attribute of ``sys``, evaluates the printed text and compares
    it with *expected*.
    """
    # Import sys in the kernel first and discard whatever that produced, so
    # only the probe's own output is collected below.
    execute(km=km, code="import sys")
    flush_channels(km)

    probe = "print (sys.%s._check_mp_mode())" % stream
    _msg_id, _reply = execute(km=km, code=probe)
    printed, _errors = assemble_output(km.iopub_channel)

    reported = eval(printed.strip())
    nt.assert_equal(reported, expected)
def test_simple_print():
    """simple print statement in kernel

    Executes ``print ('hi')`` in a fresh kernel and checks that exactly
    ``'hi\\n'`` arrives on stdout, nothing arrives on stderr, and that
    ``sys.stdout._check_mp_mode()`` in the kernel still reports False.
    """
    with new_kernel() as km:
        iopub = km.iopub_channel
        execute(km=km, code="print ('hi')")
        stdout, stderr = assemble_output(iopub)
        nt.assert_equal(stdout, 'hi\n')
        nt.assert_equal(stderr, '')
        _check_mp_mode(km, expected=False)
@dec.knownfailureif(sys.platform == 'win32', "subprocess prints fail on Windows")
def test_subprocess_print():
    """printing from forked mp.Process

    Starts several ``multiprocessing.Process`` children in the kernel, each
    printing ``hello <i>``, and checks that every line shows up on stdout,
    that stderr stays empty and that ``_check_mp_mode()`` reports False for
    both streams afterwards.
    """
    with new_kernel() as km:
        iopub = km.iopub_channel

        _check_mp_mode(km, expected=False)
        flush_channels(km)

        n_procs = 5
        code = '\n'.join([
            "from __future__ import print_function",
            "import multiprocessing as mp",
            "pool = [mp.Process(target=print, args=('hello', i,)) for i in range(%i)]" % n_procs,
            "for p in pool: p.start()",
            "for p in pool: p.join()"
        ])

        execute(km=km, code=code)
        stdout, stderr = assemble_output(iopub)

        # Occurrences are counted instead of comparing against one exact
        # text -- presumably because the children's output order is not
        # deterministic.
        nt.assert_equal(stdout.count("hello"), n_procs, stdout)
        for n in range(n_procs):
            nt.assert_equal(stdout.count(str(n)), 1, stdout)
        nt.assert_equal(stderr, '')

        _check_mp_mode(km, expected=False)
        _check_mp_mode(km, expected=False, stream="stderr")
def test_subprocess_noprint():
    """mp.Process without print doesn't trigger iostream mp_mode

    Runs several ``multiprocessing.Process`` children that only call
    ``range`` and checks that nothing reaches stdout or stderr and that
    ``_check_mp_mode()`` reports False for both streams.
    """
    with new_kernel() as km:
        channel = km.iopub_channel

        worker_count = 5
        source_lines = [
            "import multiprocessing as mp",
            "pool = [mp.Process(target=range, args=(i,)) for i in range(%i)]" % worker_count,
            "for p in pool: p.start()",
            "for p in pool: p.join()",
        ]

        _msg_id, _reply = execute(km=km, code='\n'.join(source_lines))
        out_text, err_text = assemble_output(channel)

        nt.assert_equal(out_text, '')
        nt.assert_equal(err_text, '')

        # stdout first, then stderr
        for stream_name in ("stdout", "stderr"):
            _check_mp_mode(km, expected=False, stream=stream_name)
@dec.knownfailureif(sys.platform == 'win32', "subprocess prints fail on Windows")
def test_subprocess_error():
    """error in mp.Process doesn't crash

    Runs ``int('hi')`` in a ``multiprocessing.Process`` child and checks
    that stdout stays empty, that ``ValueError`` appears in the collected
    stderr text, and that ``_check_mp_mode()`` reports False for both
    streams.
    """
    with new_kernel() as km:
        channel = km.iopub_channel

        source_lines = [
            "import multiprocessing as mp",
            "p = mp.Process(target=int, args=('hi',))",
            "p.start()",
            "p.join()",
        ]

        _msg_id, _reply = execute(km=km, code='\n'.join(source_lines))
        out_text, err_text = assemble_output(channel)

        nt.assert_equal(out_text, '')
        nt.assert_true("ValueError" in err_text, err_text)

        # stdout first, then stderr
        for stream_name in ("stdout", "stderr"):
            _check_mp_mode(km, expected=False, stream=stream_name)