##// END OF EJS Templates
Merge pull request #4536 from minrk/msgspec5...
Merge pull request #4536 from minrk/msgspec5 Updating the message spec (finish IPEP 13, 24)

File last commit:

r13892:3cd52168
r16692:a2329cd4 merge
Show More
test_kernel.py
207 lines | 7.2 KiB | text/x-python | PythonLexer
# coding: utf-8
"""test the IPython Kernel"""
#-------------------------------------------------------------------------------
# Copyright (C) 2013 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import io
import os.path
import sys
import nose.tools as nt
from IPython.testing import decorators as dec, tools as tt
from IPython.utils import py3compat
from IPython.utils.path import locate_profile
from IPython.utils.tempdir import TemporaryDirectory
from .utils import (new_kernel, kernel, TIMEOUT, assemble_output, execute,
flush_channels, wait_for_idle)
#-------------------------------------------------------------------------------
# Tests
#-------------------------------------------------------------------------------
def _check_mp_mode(kc, expected=False, stream="stdout"):
    """Assert what ``sys.<stream>._check_mp_mode()`` prints inside the kernel.

    Parameters
    ----------
    kc : kernel client
        Passed to ``execute`` and ``flush_channels``; its ``iopub_channel``
        is read to collect the printed result.
    expected : object
        Value the parsed printed result is compared against (default False).
    stream : str
        Name of the ``sys`` attribute queried in the kernel, e.g. "stdout"
        or "stderr".
    """
    # Parse the printed literal instead of eval()-ing text that came back
    # from another process.
    import ast

    execute(kc=kc, code="import sys")
    # NOTE(review): presumably discards messages produced by the import so
    # that only the output of the print below is assembled -- confirm in utils.
    flush_channels(kc)
    _msg_id, _reply = execute(
        kc=kc, code="print (sys.%s._check_mp_mode())" % stream
    )
    stdout, _stderr = assemble_output(kc.iopub_channel)
    nt.assert_equal(ast.literal_eval(stdout.strip()), expected)
# printing tests
def test_simple_print():
    """a plain print in the kernel arrives on stdout only"""
    with kernel() as kc:
        channel = kc.iopub_channel
        _msg_id, _reply = execute(kc=kc, code="print ('hi')")
        out, err = assemble_output(channel)
        # Exactly the printed text on stdout, nothing on stderr.
        nt.assert_equal(out, 'hi\n')
        nt.assert_equal(err, '')
        _check_mp_mode(kc, expected=False)
def test_sys_path():
    """sys.path[0] is the empty string in a default kernel"""
    probe = "import sys; print (repr(sys.path[0]))"
    with kernel() as kc:
        _msg_id, _reply = execute(kc=kc, code=probe)
        out, _err = assemble_output(kc.iopub_channel)
        # repr('') followed by the newline added by print.
        nt.assert_equal(out, "''\n")
def test_sys_path_profile_dir():
    """sys.path[0] is still the empty string when `--profile-dir` is given"""
    kernel_args = ['--profile-dir', locate_profile('default')]
    probe = "import sys; print (repr(sys.path[0]))"
    with new_kernel(kernel_args) as kc:
        _msg_id, _reply = execute(kc=kc, code=probe)
        out, _err = assemble_output(kc.iopub_channel)
        # repr('') followed by the newline added by print.
        nt.assert_equal(out, "''\n")
@dec.knownfailureif(sys.platform == 'win32', "subprocess prints fail on Windows")
def test_subprocess_print():
    """printing from forked mp.Process"""
    with new_kernel() as kc:
        iopub = kc.iopub_channel

        _check_mp_mode(kc, expected=False)
        flush_channels(kc)

        n_procs = 5
        code = '\n'.join([
            "from __future__ import print_function",
            "import multiprocessing as mp",
            "pool = [mp.Process(target=print, args=('hello', i,)) for i in range(%i)]" % n_procs,
            "for p in pool: p.start()",
            "for p in pool: p.join()"
        ])

        _msg_id, _reply = execute(kc=kc, code=code)
        stdout, stderr = assemble_output(iopub)

        # Occurrences are counted rather than comparing the whole text, so
        # the check does not depend on the order in which children print.
        nt.assert_equal(stdout.count("hello"), n_procs, stdout)
        for n in range(n_procs):
            nt.assert_equal(stdout.count(str(n)), 1, stdout)
        nt.assert_equal(stderr, '')

        _check_mp_mode(kc, expected=False)
        _check_mp_mode(kc, expected=False, stream="stderr")
def test_subprocess_noprint():
    """silent mp.Process children leave both streams empty and mp_mode off"""
    with kernel() as kc:
        channel = kc.iopub_channel

        n_procs = 5
        source_lines = (
            "import multiprocessing as mp",
            "pool = [mp.Process(target=range, args=(i,)) for i in range(%i)]" % n_procs,
            "for p in pool: p.start()",
            "for p in pool: p.join()",
        )
        code = '\n'.join(source_lines)

        _msg_id, _reply = execute(kc=kc, code=code)
        out, err = assemble_output(channel)
        nt.assert_equal(out, '')
        nt.assert_equal(err, '')

        # Check stdout first, then stderr, same as after the print tests.
        _check_mp_mode(kc, expected=False)
        _check_mp_mode(kc, expected=False, stream="stderr")
@dec.knownfailureif(sys.platform == 'win32', "subprocess prints fail on Windows")
def test_subprocess_error():
    """a failing mp.Process reports its ValueError on stderr"""
    with new_kernel() as kc:
        channel = kc.iopub_channel

        # int('hi') raises ValueError inside the child process.
        source_lines = (
            "import multiprocessing as mp",
            "p = mp.Process(target=int, args=('hi',))",
            "p.start()",
            "p.join()",
        )
        code = '\n'.join(source_lines)

        _msg_id, _reply = execute(kc=kc, code=code)
        out, err = assemble_output(channel)
        nt.assert_equal(out, '')
        nt.assert_true("ValueError" in err, err)

        _check_mp_mode(kc, expected=False)
        _check_mp_mode(kc, expected=False, stream="stderr")
# raw_input tests
def test_raw_input():
    """stdin round trip through input (Python 3) / raw_input (Python 2)"""
    with kernel() as kc:
        channel = kc.iopub_channel

        # Pick the line-reading builtin for the running Python version.
        if py3compat.PY3:
            reader = "input"
        else:
            reader = "raw_input"
        prompt = "prompt> "
        code = 'print({input_f}("{theprompt}"))'.format(
            input_f=reader, theprompt=prompt
        )
        _msg_id = kc.execute(code, allow_stdin=True)

        # The kernel must ask for input with the prompt we passed.
        request = kc.get_stdin_msg(block=True, timeout=TIMEOUT)
        nt.assert_equal(request['header']['msg_type'], u'input_request')
        nt.assert_equal(request['content']['prompt'], prompt)

        answer = "some text"
        kc.input(answer)
        reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
        nt.assert_equal(reply['content']['status'], 'ok')

        # The supplied text is echoed back by the print call.
        out, _err = assemble_output(channel)
        nt.assert_equal(out, answer + "\n")
@dec.skipif(py3compat.PY3)
def test_eval_input():
    """test input() on Python 2"""
    with kernel() as kc:
        iopub = kc.iopub_channel

        theprompt = "prompt> "
        # Explicit keyword instead of **locals(): only the prompt is needed.
        code = 'print(input("{theprompt}"))'.format(theprompt=theprompt)
        _msg_id = kc.execute(code, allow_stdin=True)

        msg = kc.get_stdin_msg(block=True, timeout=TIMEOUT)
        nt.assert_equal(msg['header']['msg_type'], u'input_request')
        content = msg['content']
        nt.assert_equal(content['prompt'], theprompt)

        # The reply is an expression; the test expects its evaluated value
        # ("2") to be printed, not the raw text.
        kc.input("1+1")
        reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
        nt.assert_equal(reply['content']['status'], 'ok')

        stdout, _stderr = assemble_output(iopub)
        nt.assert_equal(stdout, "2\n")
def test_save_history():
    """%hist -f writes executed cells, including non-ASCII text, to a file"""
    # Regression check: saving history from the kernel with %hist -f used to
    # fail because of unicode problems on Python 2.
    with kernel() as kc:
        with TemporaryDirectory() as td:
            hist_path = os.path.join(td, 'hist.out')

            execute(u'a=1', kc=kc)
            wait_for_idle(kc)
            execute(u'b=u"abcþ"', kc=kc)
            wait_for_idle(kc)

            _, reply = execute("%hist -f " + hist_path, kc=kc)
            nt.assert_equal(reply['status'], 'ok')

            # Read the file back as UTF-8 and look for both cells.
            with io.open(hist_path, encoding='utf-8') as f:
                saved = f.read()
            nt.assert_in(u'a=1', saved)
            nt.assert_in(u'b=u"abcþ"', saved)
def test_help_output():
    """`ipython kernel --help-all` passes the shared help-output check"""
    subcommand = 'kernel'
    tt.help_all_output_test(subcommand)