test_kernel.py
223 lines
| 8.0 KiB
| text/x-python
|
PythonLexer
Thomas Kluyver
|
r13892 | # coding: utf-8 | |
MinRK
|
r9438 | """test the IPython Kernel""" | |
#------------------------------------------------------------------------------- | |||
# Copyright (C) 2013 The IPython Development Team | |||
# | |||
# Distributed under the terms of the BSD License. The full license is in | |||
# the file COPYING, distributed as part of this software. | |||
#------------------------------------------------------------------------------- | |||
#------------------------------------------------------------------------------- | |||
# Imports | |||
#------------------------------------------------------------------------------- | |||
Thomas Kluyver
|
r13892 | import io | |
import os.path | |||
MinRK
|
r9449 | import sys | |
MinRK
|
r9438 | ||
import nose.tools as nt | |||
MinRK
|
r12357 | from IPython.testing import decorators as dec, tools as tt | |
MinRK
|
r12414 | from IPython.utils import py3compat | |
MinRK
|
r12853 | from IPython.utils.path import locate_profile | |
Thomas Kluyver
|
r13892 | from IPython.utils.tempdir import TemporaryDirectory | |
MinRK
|
r12414 | ||
Thomas Kluyver
|
r13892 | from .utils import (new_kernel, kernel, TIMEOUT, assemble_output, execute, | |
flush_channels, wait_for_idle) | |||
MinRK
|
r9438 | ||
#------------------------------------------------------------------------------- | |||
# Tests | |||
#------------------------------------------------------------------------------- | |||
MinRK
|
r12318 | def _check_mp_mode(kc, expected=False, stream="stdout"): | |
execute(kc=kc, code="import sys") | |||
flush_channels(kc) | |||
msg_id, content = execute(kc=kc, code="print (sys.%s._check_mp_mode())" % stream) | |||
stdout, stderr = assemble_output(kc.iopub_channel) | |||
MinRK
|
r9438 | nt.assert_equal(eval(stdout.strip()), expected) | |
MinRK
|
r12322 | # printing tests | |
MinRK
|
r9438 | def test_simple_print(): | |
"""simple print statement in kernel""" | |||
MinRK
|
r12414 | with kernel() as kc: | |
MinRK
|
r12318 | iopub = kc.iopub_channel | |
msg_id, content = execute(kc=kc, code="print ('hi')") | |||
MinRK
|
r9438 | stdout, stderr = assemble_output(iopub) | |
MinRK
|
r9442 | nt.assert_equal(stdout, 'hi\n') | |
nt.assert_equal(stderr, '') | |||
MinRK
|
r12318 | _check_mp_mode(kc, expected=False) | |
MinRK
|
r9438 | ||
MinRK
|
r12853 | def test_sys_path(): | |
"""test that sys.path doesn't get messed up by default""" | |||
with kernel() as kc: | |||
msg_id, content = execute(kc=kc, code="import sys; print (repr(sys.path[0]))") | |||
stdout, stderr = assemble_output(kc.iopub_channel) | |||
nt.assert_equal(stdout, "''\n") | |||
def test_sys_path_profile_dir(): | |||
"""test that sys.path doesn't get messed up when `--profile-dir` is specified""" | |||
with new_kernel(['--profile-dir', locate_profile('default')]) as kc: | |||
msg_id, content = execute(kc=kc, code="import sys; print (repr(sys.path[0]))") | |||
stdout, stderr = assemble_output(kc.iopub_channel) | |||
nt.assert_equal(stdout, "''\n") | |||
MinRK
|
r9449 | @dec.knownfailureif(sys.platform == 'win32', "subprocess prints fail on Windows") | |
MinRK
|
r9438 | def test_subprocess_print(): | |
"""printing from forked mp.Process""" | |||
MinRK
|
r12318 | with new_kernel() as kc: | |
iopub = kc.iopub_channel | |||
MinRK
|
r9438 | ||
MinRK
|
r12318 | _check_mp_mode(kc, expected=False) | |
flush_channels(kc) | |||
MinRK
|
r9438 | np = 5 | |
code = '\n'.join([ | |||
MinRK
|
r9446 | "from __future__ import print_function", | |
MinRK
|
r9438 | "import multiprocessing as mp", | |
MinRK
|
r9446 | "pool = [mp.Process(target=print, args=('hello', i,)) for i in range(%i)]" % np, | |
MinRK
|
r9438 | "for p in pool: p.start()", | |
"for p in pool: p.join()" | |||
]) | |||
expected = '\n'.join([ | |||
"hello %s" % i for i in range(np) | |||
]) + '\n' | |||
MinRK
|
r12318 | msg_id, content = execute(kc=kc, code=code) | |
MinRK
|
r9438 | stdout, stderr = assemble_output(iopub) | |
MinRK
|
r9442 | nt.assert_equal(stdout.count("hello"), np, stdout) | |
MinRK
|
r9438 | for n in range(np): | |
MinRK
|
r9442 | nt.assert_equal(stdout.count(str(n)), 1, stdout) | |
nt.assert_equal(stderr, '') | |||
MinRK
|
r12318 | _check_mp_mode(kc, expected=False) | |
_check_mp_mode(kc, expected=False, stream="stderr") | |||
MinRK
|
r9438 | ||
def test_subprocess_noprint(): | |||
"""mp.Process without print doesn't trigger iostream mp_mode""" | |||
MinRK
|
r12414 | with kernel() as kc: | |
MinRK
|
r12318 | iopub = kc.iopub_channel | |
MinRK
|
r9438 | ||
np = 5 | |||
code = '\n'.join([ | |||
"import multiprocessing as mp", | |||
MinRK
|
r9449 | "pool = [mp.Process(target=range, args=(i,)) for i in range(%i)]" % np, | |
MinRK
|
r9438 | "for p in pool: p.start()", | |
"for p in pool: p.join()" | |||
]) | |||
MinRK
|
r12318 | msg_id, content = execute(kc=kc, code=code) | |
MinRK
|
r9438 | stdout, stderr = assemble_output(iopub) | |
MinRK
|
r9442 | nt.assert_equal(stdout, '') | |
nt.assert_equal(stderr, '') | |||
MinRK
|
r12318 | _check_mp_mode(kc, expected=False) | |
_check_mp_mode(kc, expected=False, stream="stderr") | |||
MinRK
|
r9438 | ||
MinRK
|
r9449 | @dec.knownfailureif(sys.platform == 'win32', "subprocess prints fail on Windows") | |
MinRK
|
r9441 | def test_subprocess_error(): | |
"""error in mp.Process doesn't crash""" | |||
MinRK
|
r12318 | with new_kernel() as kc: | |
iopub = kc.iopub_channel | |||
MinRK
|
r9441 | ||
code = '\n'.join([ | |||
"import multiprocessing as mp", | |||
MinRK
|
r9446 | "p = mp.Process(target=int, args=('hi',))", | |
MinRK
|
r9441 | "p.start()", | |
"p.join()", | |||
]) | |||
MinRK
|
r12318 | msg_id, content = execute(kc=kc, code=code) | |
MinRK
|
r9441 | stdout, stderr = assemble_output(iopub) | |
MinRK
|
r9442 | nt.assert_equal(stdout, '') | |
MinRK
|
r9446 | nt.assert_true("ValueError" in stderr, stderr) | |
MinRK
|
r9441 | ||
MinRK
|
r12318 | _check_mp_mode(kc, expected=False) | |
_check_mp_mode(kc, expected=False, stream="stderr") | |||
MinRK
|
r9441 | ||
MinRK
|
r12322 | # raw_input tests | |
def test_raw_input(): | |||
"""test [raw_]input""" | |||
MinRK
|
r12414 | with kernel() as kc: | |
MinRK
|
r12322 | iopub = kc.iopub_channel | |
input_f = "input" if py3compat.PY3 else "raw_input" | |||
theprompt = "prompt> " | |||
code = 'print({input_f}("{theprompt}"))'.format(**locals()) | |||
msg_id = kc.execute(code, allow_stdin=True) | |||
msg = kc.get_stdin_msg(block=True, timeout=TIMEOUT) | |||
nt.assert_equal(msg['header']['msg_type'], u'input_request') | |||
content = msg['content'] | |||
nt.assert_equal(content['prompt'], theprompt) | |||
text = "some text" | |||
kc.input(text) | |||
reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) | |||
nt.assert_equal(reply['content']['status'], 'ok') | |||
stdout, stderr = assemble_output(iopub) | |||
nt.assert_equal(stdout, text + "\n") | |||
@dec.skipif(py3compat.PY3) | |||
def test_eval_input(): | |||
"""test input() on Python 2""" | |||
MinRK
|
r12414 | with kernel() as kc: | |
MinRK
|
r12322 | iopub = kc.iopub_channel | |
input_f = "input" if py3compat.PY3 else "raw_input" | |||
theprompt = "prompt> " | |||
code = 'print(input("{theprompt}"))'.format(**locals()) | |||
msg_id = kc.execute(code, allow_stdin=True) | |||
msg = kc.get_stdin_msg(block=True, timeout=TIMEOUT) | |||
nt.assert_equal(msg['header']['msg_type'], u'input_request') | |||
content = msg['content'] | |||
nt.assert_equal(content['prompt'], theprompt) | |||
kc.input("1+1") | |||
reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) | |||
nt.assert_equal(reply['content']['status'], 'ok') | |||
stdout, stderr = assemble_output(iopub) | |||
nt.assert_equal(stdout, "2\n") | |||
MinRK
|
r12357 | ||
Thomas Kluyver
|
r13892 | def test_save_history(): | |
# Saving history from the kernel with %hist -f was failing because of | |||
# unicode problems on Python 2. | |||
with kernel() as kc, TemporaryDirectory() as td: | |||
file = os.path.join(td, 'hist.out') | |||
execute(u'a=1', kc=kc) | |||
wait_for_idle(kc) | |||
execute(u'b=u"abcþ"', kc=kc) | |||
wait_for_idle(kc) | |||
_, reply = execute("%hist -f " + file, kc=kc) | |||
nt.assert_equal(reply['status'], 'ok') | |||
with io.open(file, encoding='utf-8') as f: | |||
content = f.read() | |||
nt.assert_in(u'a=1', content) | |||
nt.assert_in(u'b=u"abcþ"', content) | |||
MinRK
|
r12357 | def test_help_output(): | |
"""ipython kernel --help-all works""" | |||
tt.help_all_output_test('kernel') | |||
Thomas Kluyver
|
r17624 | def test_is_complete(): | |
with kernel() as kc: | |||
# There are more test cases for this in core - here we just check | |||
# that the kernel exposes the interface correctly. | |||
kc.is_complete('2+2') | |||
reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) | |||
Thomas Kluyver
|
r17804 | assert reply['content']['status'] == 'complete' | |
Thomas Kluyver
|
r17624 | ||
# SyntaxError should mean it's complete | |||
kc.is_complete('raise = 2') | |||
reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) | |||
Thomas Kluyver
|
r17804 | assert reply['content']['status'] == 'invalid' | |
Thomas Kluyver
|
r17624 | ||
kc.is_complete('a = [1,\n2,') | |||
reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) | |||
Thomas Kluyver
|
r17804 | assert reply['content']['status'] == 'incomplete' | |
assert reply['content']['indent'] == '' |