|
|
"""Generic testing tools.
|
|
|
|
|
|
Authors
|
|
|
-------
|
|
|
- Fernando Perez <Fernando.Perez@berkeley.edu>
|
|
|
"""
|
|
|
|
|
|
from __future__ import absolute_import
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Copyright (C) 2009 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Imports
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
import os
|
|
|
import re
|
|
|
import sys
|
|
|
import tempfile
|
|
|
|
|
|
from contextlib import contextmanager
|
|
|
from io import StringIO
|
|
|
from subprocess import Popen, PIPE
|
|
|
|
|
|
try:
|
|
|
# These tools are used by parts of the runtime, so we make the nose
|
|
|
# dependency optional at this point. Nose is a hard dependency to run the
|
|
|
# test suite, but NOT to use ipython itself.
|
|
|
import nose.tools as nt
|
|
|
has_nose = True
|
|
|
except ImportError:
|
|
|
has_nose = False
|
|
|
|
|
|
from IPython.config.loader import Config
|
|
|
from IPython.utils.process import get_output_error_code
|
|
|
from IPython.utils.text import list_strings
|
|
|
from IPython.utils.io import temp_pyfile, Tee
|
|
|
from IPython.utils import py3compat
|
|
|
from IPython.utils.encoding import DEFAULT_ENCODING
|
|
|
|
|
|
from . import decorators as dec
|
|
|
from . import skipdoctest
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Functions and classes
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
# The docstring for full_path doctests differently on win32 (different path
|
|
|
# separator) so just skip the doctest there. The example remains informative.
|
|
|
doctest_deco = skipdoctest.skip_doctest if sys.platform == 'win32' else dec.null_deco
|
|
|
|
|
|
@doctest_deco
|
|
|
def full_path(startPath,files):
|
|
|
"""Make full paths for all the listed files, based on startPath.
|
|
|
|
|
|
Only the base part of startPath is kept, since this routine is typically
|
|
|
used with a script's ``__file__`` variable as startPath. The base of startPath
|
|
|
is then prepended to all the listed files, forming the output list.
|
|
|
|
|
|
Parameters
|
|
|
----------
|
|
|
startPath : string
|
|
|
Initial path to use as the base for the results. This path is split
|
|
|
using os.path.split() and only its first component is kept.
|
|
|
|
|
|
files : string or list
|
|
|
One or more files.
|
|
|
|
|
|
Examples
|
|
|
--------
|
|
|
|
|
|
>>> full_path('/foo/bar.py',['a.txt','b.txt'])
|
|
|
['/foo/a.txt', '/foo/b.txt']
|
|
|
|
|
|
>>> full_path('/foo',['a.txt','b.txt'])
|
|
|
['/a.txt', '/b.txt']
|
|
|
|
|
|
If a single file is given, the output is still a list::
|
|
|
|
|
|
>>> full_path('/foo','a.txt')
|
|
|
['/a.txt']
|
|
|
"""
|
|
|
|
|
|
files = list_strings(files)
|
|
|
base = os.path.split(startPath)[0]
|
|
|
return [ os.path.join(base,f) for f in files ]
|
|
|
|
|
|
|
|
|
def parse_test_output(txt):
|
|
|
"""Parse the output of a test run and return errors, failures.
|
|
|
|
|
|
Parameters
|
|
|
----------
|
|
|
txt : str
|
|
|
Text output of a test run, assumed to contain a line of one of the
|
|
|
following forms::
|
|
|
|
|
|
'FAILED (errors=1)'
|
|
|
'FAILED (failures=1)'
|
|
|
'FAILED (errors=1, failures=1)'
|
|
|
|
|
|
Returns
|
|
|
-------
|
|
|
nerr, nfail
|
|
|
number of errors and failures.
|
|
|
"""
|
|
|
|
|
|
err_m = re.search(r'^FAILED \(errors=(\d+)\)', txt, re.MULTILINE)
|
|
|
if err_m:
|
|
|
nerr = int(err_m.group(1))
|
|
|
nfail = 0
|
|
|
return nerr, nfail
|
|
|
|
|
|
fail_m = re.search(r'^FAILED \(failures=(\d+)\)', txt, re.MULTILINE)
|
|
|
if fail_m:
|
|
|
nerr = 0
|
|
|
nfail = int(fail_m.group(1))
|
|
|
return nerr, nfail
|
|
|
|
|
|
both_m = re.search(r'^FAILED \(errors=(\d+), failures=(\d+)\)', txt,
|
|
|
re.MULTILINE)
|
|
|
if both_m:
|
|
|
nerr = int(both_m.group(1))
|
|
|
nfail = int(both_m.group(2))
|
|
|
return nerr, nfail
|
|
|
|
|
|
# If the input didn't match any of these forms, assume no error/failures
|
|
|
return 0, 0
|
|
|
|
|
|
|
|
|
# So nose doesn't think this is a test
|
|
|
parse_test_output.__test__ = False
|
|
|
|
|
|
|
|
|
def default_argv():
|
|
|
"""Return a valid default argv for creating testing instances of ipython"""
|
|
|
|
|
|
return ['--quick', # so no config file is loaded
|
|
|
# Other defaults to minimize side effects on stdout
|
|
|
'--colors=NoColor', '--no-term-title','--no-banner',
|
|
|
'--autocall=0']
|
|
|
|
|
|
|
|
|
def default_config():
|
|
|
"""Return a config object with good defaults for testing."""
|
|
|
config = Config()
|
|
|
config.TerminalInteractiveShell.colors = 'NoColor'
|
|
|
config.TerminalTerminalInteractiveShell.term_title = False,
|
|
|
config.TerminalInteractiveShell.autocall = 0
|
|
|
f = tempfile.NamedTemporaryFile(suffix=u'test_hist.sqlite', delete=False)
|
|
|
config.HistoryManager.hist_file = f.name
|
|
|
f.close()
|
|
|
config.HistoryManager.db_cache_size = 10000
|
|
|
return config
|
|
|
|
|
|
|
|
|
def get_ipython_cmd(as_string=False):
|
|
|
"""
|
|
|
Return appropriate IPython command line name. By default, this will return
|
|
|
a list that can be used with subprocess.Popen, for example, but passing
|
|
|
`as_string=True` allows for returning the IPython command as a string.
|
|
|
|
|
|
Parameters
|
|
|
----------
|
|
|
as_string: bool
|
|
|
Flag to allow to return the command as a string.
|
|
|
"""
|
|
|
ipython_cmd = [sys.executable, "-m", "IPython"]
|
|
|
|
|
|
if as_string:
|
|
|
ipython_cmd = " ".join(ipython_cmd)
|
|
|
|
|
|
return ipython_cmd
|
|
|
|
|
|
def ipexec(fname, options=None, commands=()):
|
|
|
"""Utility to call 'ipython filename'.
|
|
|
|
|
|
Starts IPython with a minimal and safe configuration to make startup as fast
|
|
|
as possible.
|
|
|
|
|
|
Note that this starts IPython in a subprocess!
|
|
|
|
|
|
Parameters
|
|
|
----------
|
|
|
fname : str
|
|
|
Name of file to be executed (should have .py or .ipy extension).
|
|
|
|
|
|
options : optional, list
|
|
|
Extra command-line flags to be passed to IPython.
|
|
|
|
|
|
commands : optional, list
|
|
|
Commands to send in on stdin
|
|
|
|
|
|
Returns
|
|
|
-------
|
|
|
(stdout, stderr) of ipython subprocess.
|
|
|
"""
|
|
|
if options is None: options = []
|
|
|
|
|
|
# For these subprocess calls, eliminate all prompt printing so we only see
|
|
|
# output from script execution
|
|
|
prompt_opts = [ '--PromptManager.in_template=""',
|
|
|
'--PromptManager.in2_template=""',
|
|
|
'--PromptManager.out_template=""'
|
|
|
]
|
|
|
cmdargs = default_argv() + prompt_opts + options
|
|
|
|
|
|
test_dir = os.path.dirname(__file__)
|
|
|
|
|
|
ipython_cmd = get_ipython_cmd()
|
|
|
# Absolute path for filename
|
|
|
full_fname = os.path.join(test_dir, fname)
|
|
|
full_cmd = ipython_cmd + cmdargs + [full_fname]
|
|
|
env = os.environ.copy()
|
|
|
env.pop('PYTHONWARNINGS', None) # Avoid extraneous warnings appearing on stderr
|
|
|
for k, v in env.items():
|
|
|
# Debug a bizarre failure we've seen on Windows:
|
|
|
# TypeError: environment can only contain strings
|
|
|
if not isinstance(v, str):
|
|
|
print(k, v)
|
|
|
p = Popen(full_cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE, env=env)
|
|
|
out, err = p.communicate(input=py3compat.str_to_bytes('\n'.join(commands)) or None)
|
|
|
out, err = py3compat.bytes_to_str(out), py3compat.bytes_to_str(err)
|
|
|
# `import readline` causes 'ESC[?1034h' to be output sometimes,
|
|
|
# so strip that out before doing comparisons
|
|
|
if out:
|
|
|
out = re.sub(r'\x1b\[[^h]+h', '', out)
|
|
|
return out, err
|
|
|
|
|
|
|
|
|
def ipexec_validate(fname, expected_out, expected_err='',
|
|
|
options=None, commands=()):
|
|
|
"""Utility to call 'ipython filename' and validate output/error.
|
|
|
|
|
|
This function raises an AssertionError if the validation fails.
|
|
|
|
|
|
Note that this starts IPython in a subprocess!
|
|
|
|
|
|
Parameters
|
|
|
----------
|
|
|
fname : str
|
|
|
Name of the file to be executed (should have .py or .ipy extension).
|
|
|
|
|
|
expected_out : str
|
|
|
Expected stdout of the process.
|
|
|
|
|
|
expected_err : optional, str
|
|
|
Expected stderr of the process.
|
|
|
|
|
|
options : optional, list
|
|
|
Extra command-line flags to be passed to IPython.
|
|
|
|
|
|
Returns
|
|
|
-------
|
|
|
None
|
|
|
"""
|
|
|
|
|
|
import nose.tools as nt
|
|
|
|
|
|
out, err = ipexec(fname, options, commands)
|
|
|
#print 'OUT', out # dbg
|
|
|
#print 'ERR', err # dbg
|
|
|
# If there are any errors, we must check those befor stdout, as they may be
|
|
|
# more informative than simply having an empty stdout.
|
|
|
if err:
|
|
|
if expected_err:
|
|
|
nt.assert_equal("\n".join(err.strip().splitlines()), "\n".join(expected_err.strip().splitlines()))
|
|
|
else:
|
|
|
raise ValueError('Running file %r produced error: %r' %
|
|
|
(fname, err))
|
|
|
# If no errors or output on stderr was expected, match stdout
|
|
|
nt.assert_equal("\n".join(out.strip().splitlines()), "\n".join(expected_out.strip().splitlines()))
|
|
|
|
|
|
|
|
|
class TempFileMixin(object):
|
|
|
"""Utility class to create temporary Python/IPython files.
|
|
|
|
|
|
Meant as a mixin class for test cases."""
|
|
|
|
|
|
def mktmp(self, src, ext='.py'):
|
|
|
"""Make a valid python temp file."""
|
|
|
fname, f = temp_pyfile(src, ext)
|
|
|
self.tmpfile = f
|
|
|
self.fname = fname
|
|
|
|
|
|
def tearDown(self):
|
|
|
if hasattr(self, 'tmpfile'):
|
|
|
# If the tmpfile wasn't made because of skipped tests, like in
|
|
|
# win32, there's nothing to cleanup.
|
|
|
self.tmpfile.close()
|
|
|
try:
|
|
|
os.unlink(self.fname)
|
|
|
except:
|
|
|
# On Windows, even though we close the file, we still can't
|
|
|
# delete it. I have no clue why
|
|
|
pass
|
|
|
|
|
|
pair_fail_msg = ("Testing {0}\n\n"
|
|
|
"In:\n"
|
|
|
" {1!r}\n"
|
|
|
"Expected:\n"
|
|
|
" {2!r}\n"
|
|
|
"Got:\n"
|
|
|
" {3!r}\n")
|
|
|
def check_pairs(func, pairs):
|
|
|
"""Utility function for the common case of checking a function with a
|
|
|
sequence of input/output pairs.
|
|
|
|
|
|
Parameters
|
|
|
----------
|
|
|
func : callable
|
|
|
The function to be tested. Should accept a single argument.
|
|
|
pairs : iterable
|
|
|
A list of (input, expected_output) tuples.
|
|
|
|
|
|
Returns
|
|
|
-------
|
|
|
None. Raises an AssertionError if any output does not match the expected
|
|
|
value.
|
|
|
"""
|
|
|
name = getattr(func, "func_name", getattr(func, "__name__", "<unknown>"))
|
|
|
for inp, expected in pairs:
|
|
|
out = func(inp)
|
|
|
assert out == expected, pair_fail_msg.format(name, inp, expected, out)
|
|
|
|
|
|
|
|
|
if py3compat.PY3:
|
|
|
MyStringIO = StringIO
|
|
|
else:
|
|
|
# In Python 2, stdout/stderr can have either bytes or unicode written to them,
|
|
|
# so we need a class that can handle both.
|
|
|
class MyStringIO(StringIO):
|
|
|
def write(self, s):
|
|
|
s = py3compat.cast_unicode(s, encoding=DEFAULT_ENCODING)
|
|
|
super(MyStringIO, self).write(s)
|
|
|
|
|
|
_re_type = type(re.compile(r''))
|
|
|
|
|
|
notprinted_msg = """Did not find {0!r} in printed output (on {1}):
|
|
|
-------
|
|
|
{2!s}
|
|
|
-------
|
|
|
"""
|
|
|
|
|
|
class AssertPrints(object):
|
|
|
"""Context manager for testing that code prints certain text.
|
|
|
|
|
|
Examples
|
|
|
--------
|
|
|
>>> with AssertPrints("abc", suppress=False):
|
|
|
... print("abcd")
|
|
|
... print("def")
|
|
|
...
|
|
|
abcd
|
|
|
def
|
|
|
"""
|
|
|
def __init__(self, s, channel='stdout', suppress=True):
|
|
|
self.s = s
|
|
|
if isinstance(self.s, (py3compat.string_types, _re_type)):
|
|
|
self.s = [self.s]
|
|
|
self.channel = channel
|
|
|
self.suppress = suppress
|
|
|
|
|
|
def __enter__(self):
|
|
|
self.orig_stream = getattr(sys, self.channel)
|
|
|
self.buffer = MyStringIO()
|
|
|
self.tee = Tee(self.buffer, channel=self.channel)
|
|
|
setattr(sys, self.channel, self.buffer if self.suppress else self.tee)
|
|
|
|
|
|
def __exit__(self, etype, value, traceback):
|
|
|
try:
|
|
|
if value is not None:
|
|
|
# If an error was raised, don't check anything else
|
|
|
return False
|
|
|
self.tee.flush()
|
|
|
setattr(sys, self.channel, self.orig_stream)
|
|
|
printed = self.buffer.getvalue()
|
|
|
for s in self.s:
|
|
|
if isinstance(s, _re_type):
|
|
|
assert s.search(printed), notprinted_msg.format(s.pattern, self.channel, printed)
|
|
|
else:
|
|
|
assert s in printed, notprinted_msg.format(s, self.channel, printed)
|
|
|
return False
|
|
|
finally:
|
|
|
self.tee.close()
|
|
|
|
|
|
printed_msg = """Found {0!r} in printed output (on {1}):
|
|
|
-------
|
|
|
{2!s}
|
|
|
-------
|
|
|
"""
|
|
|
|
|
|
class AssertNotPrints(AssertPrints):
|
|
|
"""Context manager for checking that certain output *isn't* produced.
|
|
|
|
|
|
Counterpart of AssertPrints"""
|
|
|
def __exit__(self, etype, value, traceback):
|
|
|
try:
|
|
|
if value is not None:
|
|
|
# If an error was raised, don't check anything else
|
|
|
self.tee.close()
|
|
|
return False
|
|
|
self.tee.flush()
|
|
|
setattr(sys, self.channel, self.orig_stream)
|
|
|
printed = self.buffer.getvalue()
|
|
|
for s in self.s:
|
|
|
if isinstance(s, _re_type):
|
|
|
assert not s.search(printed),printed_msg.format(
|
|
|
s.pattern, self.channel, printed)
|
|
|
else:
|
|
|
assert s not in printed, printed_msg.format(
|
|
|
s, self.channel, printed)
|
|
|
return False
|
|
|
finally:
|
|
|
self.tee.close()
|
|
|
|
|
|
@contextmanager
|
|
|
def mute_warn():
|
|
|
from IPython.utils import warn
|
|
|
save_warn = warn.warn
|
|
|
warn.warn = lambda *a, **kw: None
|
|
|
try:
|
|
|
yield
|
|
|
finally:
|
|
|
warn.warn = save_warn
|
|
|
|
|
|
@contextmanager
|
|
|
def make_tempfile(name):
|
|
|
""" Create an empty, named, temporary file for the duration of the context.
|
|
|
"""
|
|
|
f = open(name, 'w')
|
|
|
f.close()
|
|
|
try:
|
|
|
yield
|
|
|
finally:
|
|
|
os.unlink(name)
|
|
|
|
|
|
|
|
|
@contextmanager
|
|
|
def monkeypatch(obj, name, attr):
|
|
|
"""
|
|
|
Context manager to replace attribute named `name` in `obj` with `attr`.
|
|
|
"""
|
|
|
orig = getattr(obj, name)
|
|
|
setattr(obj, name, attr)
|
|
|
yield
|
|
|
setattr(obj, name, orig)
|
|
|
|
|
|
|
|
|
def help_output_test(subcommand=''):
|
|
|
"""test that `ipython [subcommand] -h` works"""
|
|
|
cmd = get_ipython_cmd() + [subcommand, '-h']
|
|
|
out, err, rc = get_output_error_code(cmd)
|
|
|
nt.assert_equal(rc, 0, err)
|
|
|
nt.assert_not_in("Traceback", err)
|
|
|
nt.assert_in("Options", out)
|
|
|
nt.assert_in("--help-all", out)
|
|
|
return out, err
|
|
|
|
|
|
|
|
|
def help_all_output_test(subcommand=''):
|
|
|
"""test that `ipython [subcommand] --help-all` works"""
|
|
|
cmd = get_ipython_cmd() + [subcommand, '--help-all']
|
|
|
out, err, rc = get_output_error_code(cmd)
|
|
|
nt.assert_equal(rc, 0, err)
|
|
|
nt.assert_not_in("Traceback", err)
|
|
|
nt.assert_in("Options", out)
|
|
|
nt.assert_in("Class parameters", out)
|
|
|
return out, err
|
|
|
|
|
|
def assert_big_text_equal(a, b, chunk_size=80):
|
|
|
"""assert that large strings are equal
|
|
|
|
|
|
Zooms in on first chunk that differs,
|
|
|
to give better info than vanilla assertEqual for large text blobs.
|
|
|
"""
|
|
|
for i in range(0, len(a), chunk_size):
|
|
|
chunk_a = a[i:i + chunk_size]
|
|
|
chunk_b = b[i:i + chunk_size]
|
|
|
nt.assert_equal(chunk_a, chunk_b, "[offset: %i]\n%r != \n%r" % (
|
|
|
i, chunk_a, chunk_b))
|
|
|
|
|
|
if len(a) > len(b):
|
|
|
nt.fail("Length doesn't match (%i > %i). Extra text:\n%r" % (
|
|
|
len(a), len(b), a[len(b):]
|
|
|
))
|
|
|
elif len(a) < len(b):
|
|
|
nt.fail("Length doesn't match (%i < %i). Extra text:\n%r" % (
|
|
|
len(a), len(b), b[len(a):]
|
|
|
))
|
|
|
|