Show More
@@ -1,175 +1,178 b'' | |||||
1 | # encoding: utf-8 |
|
1 | # encoding: utf-8 | |
2 | """Tests for io.py""" |
|
2 | """Tests for io.py""" | |
3 |
|
3 | |||
4 | #----------------------------------------------------------------------------- |
|
4 | #----------------------------------------------------------------------------- | |
5 | # Copyright (C) 2008-2011 The IPython Development Team |
|
5 | # Copyright (C) 2008-2011 The IPython Development Team | |
6 | # |
|
6 | # | |
7 | # Distributed under the terms of the BSD License. The full license is in |
|
7 | # Distributed under the terms of the BSD License. The full license is in | |
8 | # the file COPYING, distributed as part of this software. |
|
8 | # the file COPYING, distributed as part of this software. | |
9 | #----------------------------------------------------------------------------- |
|
9 | #----------------------------------------------------------------------------- | |
10 |
|
10 | |||
11 | #----------------------------------------------------------------------------- |
|
11 | #----------------------------------------------------------------------------- | |
12 | # Imports |
|
12 | # Imports | |
13 | #----------------------------------------------------------------------------- |
|
13 | #----------------------------------------------------------------------------- | |
14 | from __future__ import print_function |
|
14 | from __future__ import print_function | |
15 | from __future__ import absolute_import |
|
15 | from __future__ import absolute_import | |
16 |
|
16 | |||
17 | import io as stdlib_io |
|
17 | import io as stdlib_io | |
18 | import os.path |
|
18 | import os.path | |
19 | import stat |
|
19 | import stat | |
20 | import sys |
|
20 | import sys | |
21 |
|
21 | |||
22 | from subprocess import Popen, PIPE |
|
22 | from subprocess import Popen, PIPE | |
23 | import unittest |
|
23 | import unittest | |
24 |
|
24 | |||
25 | import nose.tools as nt |
|
25 | import nose.tools as nt | |
26 |
|
26 | |||
27 | from IPython.testing.decorators import skipif |
|
27 | from IPython.testing.decorators import skipif | |
28 | from IPython.utils.io import (Tee, capture_output, unicode_std_stream, |
|
28 | from IPython.utils.io import (Tee, capture_output, unicode_std_stream, | |
29 | atomic_writing, |
|
29 | atomic_writing, | |
30 | ) |
|
30 | ) | |
31 | from IPython.utils.py3compat import doctest_refactor_print, PY3 |
|
31 | from IPython.utils.py3compat import doctest_refactor_print, PY3 | |
32 | from IPython.utils.tempdir import TemporaryDirectory |
|
32 | from IPython.utils.tempdir import TemporaryDirectory | |
33 |
|
33 | |||
34 | if PY3: |
|
34 | if PY3: | |
35 | from io import StringIO |
|
35 | from io import StringIO | |
36 | else: |
|
36 | else: | |
37 | from StringIO import StringIO |
|
37 | from StringIO import StringIO | |
38 |
|
38 | |||
39 | #----------------------------------------------------------------------------- |
|
39 | #----------------------------------------------------------------------------- | |
40 | # Tests |
|
40 | # Tests | |
41 | #----------------------------------------------------------------------------- |
|
41 | #----------------------------------------------------------------------------- | |
42 |
|
42 | |||
43 |
|
43 | |||
44 | def test_tee_simple(): |
|
44 | def test_tee_simple(): | |
45 | "Very simple check with stdout only" |
|
45 | "Very simple check with stdout only" | |
46 | chan = StringIO() |
|
46 | chan = StringIO() | |
47 | text = 'Hello' |
|
47 | text = 'Hello' | |
48 | tee = Tee(chan, channel='stdout') |
|
48 | tee = Tee(chan, channel='stdout') | |
49 | print(text, file=chan) |
|
49 | print(text, file=chan) | |
50 | nt.assert_equal(chan.getvalue(), text+"\n") |
|
50 | nt.assert_equal(chan.getvalue(), text+"\n") | |
51 |
|
51 | |||
52 |
|
52 | |||
53 | class TeeTestCase(unittest.TestCase): |
|
53 | class TeeTestCase(unittest.TestCase): | |
54 |
|
54 | |||
55 | def tchan(self, channel, check='close'): |
|
55 | def tchan(self, channel, check='close'): | |
56 | trap = StringIO() |
|
56 | trap = StringIO() | |
57 | chan = StringIO() |
|
57 | chan = StringIO() | |
58 | text = 'Hello' |
|
58 | text = 'Hello' | |
59 |
|
59 | |||
60 | std_ori = getattr(sys, channel) |
|
60 | std_ori = getattr(sys, channel) | |
61 | setattr(sys, channel, trap) |
|
61 | setattr(sys, channel, trap) | |
62 |
|
62 | |||
63 | tee = Tee(chan, channel=channel) |
|
63 | tee = Tee(chan, channel=channel) | |
64 | print(text, end='', file=chan) |
|
64 | print(text, end='', file=chan) | |
65 | setattr(sys, channel, std_ori) |
|
65 | setattr(sys, channel, std_ori) | |
66 | trap_val = trap.getvalue() |
|
66 | trap_val = trap.getvalue() | |
67 | nt.assert_equal(chan.getvalue(), text) |
|
67 | nt.assert_equal(chan.getvalue(), text) | |
68 | if check=='close': |
|
68 | if check=='close': | |
69 | tee.close() |
|
69 | tee.close() | |
70 | else: |
|
70 | else: | |
71 | del tee |
|
71 | del tee | |
72 |
|
72 | |||
73 | def test(self): |
|
73 | def test(self): | |
74 | for chan in ['stdout', 'stderr']: |
|
74 | for chan in ['stdout', 'stderr']: | |
75 | for check in ['close', 'del']: |
|
75 | for check in ['close', 'del']: | |
76 | self.tchan(chan, check) |
|
76 | self.tchan(chan, check) | |
77 |
|
77 | |||
78 | def test_io_init(): |
|
78 | def test_io_init(): | |
79 | """Test that io.stdin/out/err exist at startup""" |
|
79 | """Test that io.stdin/out/err exist at startup""" | |
80 | for name in ('stdin', 'stdout', 'stderr'): |
|
80 | for name in ('stdin', 'stdout', 'stderr'): | |
81 | cmd = doctest_refactor_print("from IPython.utils import io;print io.%s.__class__"%name) |
|
81 | cmd = doctest_refactor_print("from IPython.utils import io;print io.%s.__class__"%name) | |
82 | p = Popen([sys.executable, '-c', cmd], |
|
82 | p = Popen([sys.executable, '-c', cmd], | |
83 | stdout=PIPE) |
|
83 | stdout=PIPE) | |
84 | p.wait() |
|
84 | p.wait() | |
85 | classname = p.stdout.read().strip().decode('ascii') |
|
85 | classname = p.stdout.read().strip().decode('ascii') | |
86 | # __class__ is a reference to the class object in Python 3, so we can't |
|
86 | # __class__ is a reference to the class object in Python 3, so we can't | |
87 | # just test for string equality. |
|
87 | # just test for string equality. | |
88 | assert 'IPython.utils.io.IOStream' in classname, classname |
|
88 | assert 'IPython.utils.io.IOStream' in classname, classname | |
89 |
|
89 | |||
90 | def test_capture_output(): |
|
90 | def test_capture_output(): | |
91 | """capture_output() context works""" |
|
91 | """capture_output() context works""" | |
92 |
|
92 | |||
93 | with capture_output() as io: |
|
93 | with capture_output() as io: | |
94 | print('hi, stdout') |
|
94 | print('hi, stdout') | |
95 | print('hi, stderr', file=sys.stderr) |
|
95 | print('hi, stderr', file=sys.stderr) | |
96 |
|
96 | |||
97 | nt.assert_equal(io.stdout, 'hi, stdout\n') |
|
97 | nt.assert_equal(io.stdout, 'hi, stdout\n') | |
98 | nt.assert_equal(io.stderr, 'hi, stderr\n') |
|
98 | nt.assert_equal(io.stderr, 'hi, stderr\n') | |
99 |
|
99 | |||
100 | def test_UnicodeStdStream(): |
|
100 | def test_UnicodeStdStream(): | |
101 | # Test wrapping a bytes-level stdout |
|
101 | # Test wrapping a bytes-level stdout | |
102 | if PY3: |
|
102 | if PY3: | |
103 | stdoutb = stdlib_io.BytesIO() |
|
103 | stdoutb = stdlib_io.BytesIO() | |
104 | stdout = stdlib_io.TextIOWrapper(stdoutb, encoding='ascii') |
|
104 | stdout = stdlib_io.TextIOWrapper(stdoutb, encoding='ascii') | |
105 | else: |
|
105 | else: | |
106 | stdout = stdoutb = stdlib_io.BytesIO() |
|
106 | stdout = stdoutb = stdlib_io.BytesIO() | |
107 |
|
107 | |||
108 | orig_stdout = sys.stdout |
|
108 | orig_stdout = sys.stdout | |
109 | sys.stdout = stdout |
|
109 | sys.stdout = stdout | |
110 | try: |
|
110 | try: | |
111 | sample = u"@ΕeΒΆΕ§β" |
|
111 | sample = u"@ΕeΒΆΕ§β" | |
112 | unicode_std_stream().write(sample) |
|
112 | unicode_std_stream().write(sample) | |
113 |
|
113 | |||
114 | output = stdoutb.getvalue().decode('utf-8') |
|
114 | output = stdoutb.getvalue().decode('utf-8') | |
115 | nt.assert_equal(output, sample) |
|
115 | nt.assert_equal(output, sample) | |
116 | assert not stdout.closed |
|
116 | assert not stdout.closed | |
117 | finally: |
|
117 | finally: | |
118 | sys.stdout = orig_stdout |
|
118 | sys.stdout = orig_stdout | |
119 |
|
119 | |||
120 | @skipif(not PY3, "Not applicable on Python 2") |
|
120 | @skipif(not PY3, "Not applicable on Python 2") | |
121 | def test_UnicodeStdStream_nowrap(): |
|
121 | def test_UnicodeStdStream_nowrap(): | |
122 | # If we replace stdout with a StringIO, it shouldn't get wrapped. |
|
122 | # If we replace stdout with a StringIO, it shouldn't get wrapped. | |
123 | orig_stdout = sys.stdout |
|
123 | orig_stdout = sys.stdout | |
124 | sys.stdout = StringIO() |
|
124 | sys.stdout = StringIO() | |
125 | try: |
|
125 | try: | |
126 | nt.assert_is(unicode_std_stream(), sys.stdout) |
|
126 | nt.assert_is(unicode_std_stream(), sys.stdout) | |
127 | assert not sys.stdout.closed |
|
127 | assert not sys.stdout.closed | |
128 | finally: |
|
128 | finally: | |
129 | sys.stdout = orig_stdout |
|
129 | sys.stdout = orig_stdout | |
130 |
|
130 | |||
131 | def test_atomic_writing(): |
|
131 | def test_atomic_writing(): | |
132 | class CustomExc(Exception): pass |
|
132 | class CustomExc(Exception): pass | |
133 |
|
133 | |||
134 | with TemporaryDirectory() as td: |
|
134 | with TemporaryDirectory() as td: | |
135 | f1 = os.path.join(td, 'penguin') |
|
135 | f1 = os.path.join(td, 'penguin') | |
136 | with stdlib_io.open(f1, 'w') as f: |
|
136 | with stdlib_io.open(f1, 'w') as f: | |
137 | f.write(u'Before') |
|
137 | f.write(u'Before') | |
138 |
|
138 | |||
139 | if os.name != 'nt': |
|
139 | if os.name != 'nt': | |
140 | os.chmod(f1, 0o701) |
|
140 | os.chmod(f1, 0o701) | |
141 | orig_mode = stat.S_IMODE(os.stat(f1).st_mode) |
|
141 | orig_mode = stat.S_IMODE(os.stat(f1).st_mode) | |
142 |
|
142 | |||
143 | f2 = os.path.join(td, 'flamingo') |
|
143 | f2 = os.path.join(td, 'flamingo') | |
144 | try: |
|
144 | try: | |
145 | os.symlink(f1, f2) |
|
145 | os.symlink(f1, f2) | |
146 | have_symlink = True |
|
146 | have_symlink = True | |
147 | except (AttributeError, NotImplementedError): |
|
147 | except (AttributeError, NotImplementedError, OSError): | |
|
148 | # AttributeError: Python doesn't support it | |||
|
149 | # NotImplementedError: The system doesn't support it | |||
|
150 | # OSError: The user lacks the privilege (Windows) | |||
148 | have_symlink = False |
|
151 | have_symlink = False | |
149 |
|
152 | |||
150 | with nt.assert_raises(CustomExc): |
|
153 | with nt.assert_raises(CustomExc): | |
151 | with atomic_writing(f1) as f: |
|
154 | with atomic_writing(f1) as f: | |
152 | f.write(u'Failing write') |
|
155 | f.write(u'Failing write') | |
153 | raise CustomExc |
|
156 | raise CustomExc | |
154 |
|
157 | |||
155 | # Because of the exception, the file should not have been modified |
|
158 | # Because of the exception, the file should not have been modified | |
156 | with stdlib_io.open(f1, 'r') as f: |
|
159 | with stdlib_io.open(f1, 'r') as f: | |
157 | nt.assert_equal(f.read(), u'Before') |
|
160 | nt.assert_equal(f.read(), u'Before') | |
158 |
|
161 | |||
159 | with atomic_writing(f1) as f: |
|
162 | with atomic_writing(f1) as f: | |
160 | f.write(u'Overwritten') |
|
163 | f.write(u'Overwritten') | |
161 |
|
164 | |||
162 | with stdlib_io.open(f1, 'r') as f: |
|
165 | with stdlib_io.open(f1, 'r') as f: | |
163 | nt.assert_equal(f.read(), u'Overwritten') |
|
166 | nt.assert_equal(f.read(), u'Overwritten') | |
164 |
|
167 | |||
165 | if os.name != 'nt': |
|
168 | if os.name != 'nt': | |
166 | mode = stat.S_IMODE(os.stat(f1).st_mode) |
|
169 | mode = stat.S_IMODE(os.stat(f1).st_mode) | |
167 | nt.assert_equal(mode, orig_mode) |
|
170 | nt.assert_equal(mode, orig_mode) | |
168 |
|
171 | |||
169 | if have_symlink: |
|
172 | if have_symlink: | |
170 | # Check that writing over a file preserves a symlink |
|
173 | # Check that writing over a file preserves a symlink | |
171 | with atomic_writing(f2) as f: |
|
174 | with atomic_writing(f2) as f: | |
172 | f.write(u'written from symlink') |
|
175 | f.write(u'written from symlink') | |
173 |
|
176 | |||
174 | with stdlib_io.open(f1, 'r') as f: |
|
177 | with stdlib_io.open(f1, 'r') as f: | |
175 | nt.assert_equal(f.read(), u'written from symlink') No newline at end of file |
|
178 | nt.assert_equal(f.read(), u'written from symlink') |
General Comments 0
You need to be logged in to leave comments.
Login now