test_io.py
177 lines
| 5.5 KiB
| text/x-python
|
PythonLexer
Brian Granger
|
r2498 | # encoding: utf-8 | |
"""Tests for io.py""" | |||
#----------------------------------------------------------------------------- | |||
Matthias BUSSONNIER
|
r5390 | # Copyright (C) 2008-2011 The IPython Development Team | |
Brian Granger
|
r2498 | # | |
# Distributed under the terms of the BSD License. The full license is in | |||
# the file COPYING, distributed as part of this software. | |||
#----------------------------------------------------------------------------- | |||
#----------------------------------------------------------------------------- | |||
# Imports | |||
#----------------------------------------------------------------------------- | |||
Matthias BUSSONNIER
|
r7817 | from __future__ import print_function | |
Thomas Kluyver
|
r13690 | from __future__ import absolute_import | |
Brian Granger
|
r2498 | ||
Thomas Kluyver
|
r13690 | import io as stdlib_io | |
Thomas Kluyver
|
r17570 | import os.path | |
Thomas Kluyver
|
r17831 | import stat | |
Brian Granger
|
r2498 | import sys | |
MinRK
|
r3800 | from subprocess import Popen, PIPE | |
Thomas Kluyver
|
r12372 | import unittest | |
Brian Granger
|
r2498 | ||
import nose.tools as nt | |||
Thomas Kluyver
|
r13690 | from IPython.testing.decorators import skipif | |
Thomas Kluyver
|
r17570 | from IPython.utils.io import (Tee, capture_output, unicode_std_stream, | |
atomic_writing, | |||
) | |||
Thomas Kluyver
|
r13366 | from IPython.utils.py3compat import doctest_refactor_print, PY3 | |
Thomas Kluyver
|
r17570 | from IPython.utils.tempdir import TemporaryDirectory | |
Thomas Kluyver
|
r13366 | ||
if PY3: | |||
from io import StringIO | |||
else: | |||
from StringIO import StringIO | |||
Brian Granger
|
r2498 | ||
#----------------------------------------------------------------------------- | |||
# Tests | |||
#----------------------------------------------------------------------------- | |||
def test_tee_simple():
    """Create a Tee on the stdout channel and check its backing buffer.

    The text is printed straight into the buffer that backs the Tee and the
    buffer is then expected to hold that text followed by a newline.
    """
    message = 'Hello'
    buf = StringIO()
    # Bound to a name so the Tee object stays alive until the function returns.
    tee = Tee(buf, channel='stdout')
    print(message, file=buf)
    expected = message + "\n"
    nt.assert_equal(buf.getvalue(), expected)
Brian Granger
|
r2498 | ||
Thomas Kluyver
|
class TeeTestCase(unittest.TestCase):
    """Exercise Tee on each standard channel with both teardown styles."""

    def tchan(self, channel, check='close'):
        """Build a Tee on ``channel`` and dispose of it according to ``check``.

        ``channel`` is the name of the ``sys`` attribute to tee (``'stdout'``
        or ``'stderr'``).  When ``check`` is ``'close'`` the Tee is closed
        explicitly; for any other value the reference is dropped with ``del``.
        """
        trap = StringIO()
        chan = StringIO()
        text = 'Hello'
        std_ori = getattr(sys, channel)
        # Swap in a throwaway stream while the Tee is being created.
        setattr(sys, channel, trap)
        try:
            tee = Tee(chan, channel=channel)
            print(text, end='', file=chan)
        finally:
            # Put the real stream back even if Tee() or print() raised, so a
            # failure here cannot leave sys.<channel> replaced for later tests.
            setattr(sys, channel, std_ori)
        nt.assert_equal(chan.getvalue(), text)
        if check == 'close':
            tee.close()
        else:
            del tee

    def test(self):
        """Run ``tchan`` for every channel / teardown combination."""
        for chan in ['stdout', 'stderr']:
            for check in ['close', 'del']:
                self.tchan(chan, check)
MinRK
|
r3800 | ||
def test_io_init():
    """Test that io.stdin/out/err exist at startup"""
    for name in ('stdin', 'stdout', 'stderr'):
        cmd = doctest_refactor_print("from IPython.utils import io;print io.%s.__class__"%name)
        p = Popen([sys.executable, '-c', cmd], stdout=PIPE)
        # communicate() reads the pipe while waiting for the child and closes
        # it afterwards; wait() followed by read() can block if the child
        # fills the pipe buffer before exiting.
        out, _ = p.communicate()
        classname = out.strip().decode('ascii')
        # __class__ is a reference to the class object in Python 3, so we can't
        # just test for string equality.
        assert 'IPython.utils.io.IOStream' in classname, classname
MinRK
|
r7324 | ||
def test_capture_output():
    """Check that capture_output() records text printed to stdout and stderr."""
    with capture_output() as captured:
        print('hi, stdout')
        print('hi, stderr', file=sys.stderr)

    # Each stream's captured text must match exactly what was printed to it.
    expectations = (
        ('stdout', 'hi, stdout\n'),
        ('stderr', 'hi, stderr\n'),
    )
    for attr, expected in expectations:
        nt.assert_equal(getattr(captured, attr), expected)
Thomas Kluyver
|
r13690 | ||
def test_UnicodeStdStream():
    """Check unicode_std_stream() against a bytes-level stdout.

    Text written through the returned stream must come out of the underlying
    byte buffer as UTF-8, and the stream installed as stdout must stay open.
    """
    raw = stdlib_io.BytesIO()
    # On Python 3 an ASCII text layer is put over the byte buffer; on Python 2
    # the byte buffer itself is installed as stdout.
    fake_stdout = stdlib_io.TextIOWrapper(raw, encoding='ascii') if PY3 else raw

    saved = sys.stdout
    sys.stdout = fake_stdout
    try:
        sample = u"@łe¶ŧ←"
        unicode_std_stream().write(sample)
        written = raw.getvalue().decode('utf-8')
        nt.assert_equal(written, sample)
        assert not fake_stdout.closed
    finally:
        # Restore the real stdout whatever happened above.
        sys.stdout = saved
@skipif(not PY3, "Not applicable on Python 2")
def test_UnicodeStdStream_nowrap():
    """A StringIO installed as sys.stdout must be handed back unwrapped."""
    saved = sys.stdout
    sys.stdout = StringIO()
    try:
        stream = unicode_std_stream()
        # The very same object must come back, and it must still be usable.
        nt.assert_is(stream, sys.stdout)
        assert not sys.stdout.closed
    finally:
        sys.stdout = saved
def test_atomic_writing():
    """Check atomic_writing() on failure, success, file mode and symlinks.

    The test asserts that a write aborted by an exception leaves the file
    content unchanged, that a completed write replaces it, that the permission
    bits are the same afterwards (skipped on Windows), and that writing via a
    symlink changes the content read from the link target.
    """
    class CustomExc(Exception):
        pass

    def read_text(path):
        # Return the whole text content of ``path``.
        with stdlib_io.open(path, 'r') as fh:
            return fh.read()

    check_mode = os.name != 'nt'

    with TemporaryDirectory() as td:
        target = os.path.join(td, 'penguin')
        with stdlib_io.open(target, 'w') as fh:
            fh.write(u'Before')

        if check_mode:
            os.chmod(target, 0o701)
            orig_mode = stat.S_IMODE(os.stat(target).st_mode)

        link = os.path.join(td, 'flamingo')
        try:
            os.symlink(target, link)
        except (AttributeError, NotImplementedError, OSError):
            # AttributeError: Python doesn't support it
            # NotImplementedError: The system doesn't support it
            # OSError: The user lacks the privilege (Windows)
            have_symlink = False
        else:
            have_symlink = True

        with nt.assert_raises(CustomExc):
            with atomic_writing(target) as fh:
                fh.write(u'Failing write')
                raise CustomExc

        # Because of the exception, the file should not have been modified
        nt.assert_equal(read_text(target), u'Before')

        with atomic_writing(target) as fh:
            fh.write(u'Overwritten')

        nt.assert_equal(read_text(target), u'Overwritten')

        if check_mode:
            current_mode = stat.S_IMODE(os.stat(target).st_mode)
            nt.assert_equal(current_mode, orig_mode)

        if have_symlink:
            # Check that writing over a file preserves a symlink
            with atomic_writing(link) as fh:
                fh.write(u'written from symlink')

            nt.assert_equal(read_text(target), u'written from symlink')