test_kernel.py
170 lines
| 5.8 KiB
| text/x-python
|
PythonLexer
MinRK
|
r9438 | """test the IPython Kernel""" | ||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2013 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
MinRK
|
r9449 | import sys | ||
MinRK
|
r9438 | |||
import nose.tools as nt | ||||
MinRK
|
r12357 | from IPython.testing import decorators as dec, tools as tt | ||
MinRK
|
r12414 | from IPython.utils import py3compat | ||
from .utils import new_kernel, kernel, TIMEOUT, assemble_output, execute, flush_channels | ||||
MinRK
|
r9438 | |||
#------------------------------------------------------------------------------- | ||||
# Tests | ||||
#------------------------------------------------------------------------------- | ||||
MinRK
|
def _check_mp_mode(kc, expected=False, stream="stdout"):
    """Assert that ``sys.<stream>._check_mp_mode()`` in the kernel equals `expected`.

    Parameters
    ----------
    kc : kernel client
        Passed to `execute` and `flush_channels`; its ``iopub_channel``
        attribute is read to collect the printed result.
    expected : object
        Value the kernel-side call is expected to produce (default False).
    stream : str
        Name of the ``sys`` attribute to query, e.g. "stdout" or "stderr".
    """
    # Function-scope import so this helper does not depend on the module header.
    import ast

    execute(kc=kc, code="import sys")
    # presumably discards messages left over from the import above, so that
    # only the print below is collected -- confirm against utils.flush_channels
    flush_channels(kc)
    execute(kc=kc, code="print (sys.%s._check_mp_mode())" % stream)
    stdout, stderr = assemble_output(kc.iopub_channel)
    # The printed text is expected to be a plain Python literal (the default
    # `expected` is False); parse it as a literal instead of running eval()
    # on text that came back from another process.
    nt.assert_equal(ast.literal_eval(stdout.strip()), expected)
MinRK
|
r12322 | # printing tests | ||
MinRK
|
def test_simple_print():
    """a plain print in the kernel reaches stdout and nothing reaches stderr"""
    with kernel() as client:
        channel = client.iopub_channel
        execute(kc=client, code="print ('hi')")
        captured_out, captured_err = assemble_output(channel)
        # stdout must carry exactly the printed line; stderr must stay empty
        for actual, wanted in ((captured_out, 'hi\n'), (captured_err, '')):
            nt.assert_equal(actual, wanted)
        _check_mp_mode(client, expected=False)
MinRK
|
r9438 | |||
MinRK
|
@dec.knownfailureif(sys.platform == 'win32', "subprocess prints fail on Windows")
def test_subprocess_print():
    """printing from forked mp.Process"""
    with new_kernel() as kc:
        iopub = kc.iopub_channel

        _check_mp_mode(kc, expected=False)
        flush_channels(kc)

        n_procs = 5
        code = '\n'.join([
            "from __future__ import print_function",
            "import multiprocessing as mp",
            "pool = [mp.Process(target=print, args=('hello', i,)) for i in range(%i)]" % n_procs,
            "for p in pool: p.start()",
            "for p in pool: p.join()"
        ])

        execute(kc=kc, code=code)
        stdout, stderr = assemble_output(iopub)

        # The child processes may print in any order, so count occurrences
        # instead of comparing against one fixed transcript: "hello" once per
        # process, and each process index exactly once.
        nt.assert_equal(stdout.count("hello"), n_procs, stdout)
        for n in range(n_procs):
            nt.assert_equal(stdout.count(str(n)), 1, stdout)
        nt.assert_equal(stderr, '')

        # mp mode must be reported as False on both streams afterwards
        _check_mp_mode(kc, expected=False)
        _check_mp_mode(kc, expected=False, stream="stderr")
MinRK
|
r9438 | |||
def test_subprocess_noprint():
    """mp.Process children that never print leave iostream mp_mode off"""
    with kernel() as client:
        channel = client.iopub_channel

        count = 5
        source_lines = [
            "import multiprocessing as mp",
            "pool = [mp.Process(target=range, args=(i,)) for i in range(%i)]" % count,
            "for p in pool: p.start()",
            "for p in pool: p.join()"
        ]
        execute(kc=client, code='\n'.join(source_lines))

        out, err = assemble_output(channel)
        # nothing was printed, so both streams must be empty
        nt.assert_equal(out, '')
        nt.assert_equal(err, '')

        # stdout first (the helper's default stream), then stderr
        _check_mp_mode(client, expected=False)
        _check_mp_mode(client, expected=False, stream="stderr")
MinRK
|
r9438 | |||
MinRK
|
@dec.knownfailureif(sys.platform == 'win32', "subprocess prints fail on Windows")
def test_subprocess_error():
    """an exception raised inside an mp.Process shows up on stderr only"""
    with new_kernel() as client:
        channel = client.iopub_channel

        # int('hi') raises ValueError inside the child process
        source_lines = [
            "import multiprocessing as mp",
            "p = mp.Process(target=int, args=('hi',))",
            "p.start()",
            "p.join()",
        ]
        execute(kc=client, code='\n'.join(source_lines))

        out, err = assemble_output(channel)
        nt.assert_equal(out, '')
        nt.assert_true("ValueError" in err, err)

        # stdout first (the helper's default stream), then stderr
        _check_mp_mode(client, expected=False)
        _check_mp_mode(client, expected=False, stream="stderr")
MinRK
|
r9441 | |||
MinRK
|
r12322 | # raw_input tests | ||
def test_raw_input():
    """the kernel forwards [raw_]input prompts and prints the reply text"""
    with kernel() as client:
        channel = client.iopub_channel

        # input() on Python 3, raw_input() on Python 2
        if py3compat.PY3:
            input_f = "input"
        else:
            input_f = "raw_input"
        theprompt = "prompt> "
        code = 'print({input_f}("{theprompt}"))'.format(
            input_f=input_f, theprompt=theprompt
        )
        client.execute(code, allow_stdin=True)

        # the kernel should ask the frontend for input, using our prompt
        request = client.get_stdin_msg(block=True, timeout=TIMEOUT)
        nt.assert_equal(request['header']['msg_type'], u'input_request')
        nt.assert_equal(request['content']['prompt'], theprompt)

        answer = "some text"
        client.input(answer)

        shell_reply = client.get_shell_msg(block=True, timeout=TIMEOUT)
        nt.assert_equal(shell_reply['content']['status'], 'ok')

        out, err = assemble_output(channel)
        nt.assert_equal(out, answer + "\n")
@dec.skipif(py3compat.PY3)
def test_eval_input():
    """test input() on Python 2"""
    with kernel() as kc:
        iopub = kc.iopub_channel
        theprompt = "prompt> "
        # Only the prompt is interpolated; pass it explicitly rather than
        # through locals(), so no unused helper variable is needed.
        code = 'print(input("{theprompt}"))'.format(theprompt=theprompt)
        kc.execute(code, allow_stdin=True)

        # the kernel should ask the frontend for input, using our prompt
        msg = kc.get_stdin_msg(block=True, timeout=TIMEOUT)
        nt.assert_equal(msg['header']['msg_type'], u'input_request')
        content = msg['content']
        nt.assert_equal(content['prompt'], theprompt)

        # Python 2's input() evaluates the reply, so "1+1" should print "2"
        kc.input("1+1")
        reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
        nt.assert_equal(reply['content']['status'], 'ok')
        stdout, stderr = assemble_output(iopub)
        nt.assert_equal(stdout, "2\n")
MinRK
|
r12357 | |||
def test_help_output():
    """`ipython kernel --help-all` produces valid help output"""
    subcommand = 'kernel'
    tt.help_all_output_test(subcommand)