Show More
@@ -1,175 +1,178 b'' | |||
|
1 | 1 | # encoding: utf-8 |
|
2 | 2 | """Tests for io.py""" |
|
3 | 3 | |
|
4 | 4 | #----------------------------------------------------------------------------- |
|
5 | 5 | # Copyright (C) 2008-2011 The IPython Development Team |
|
6 | 6 | # |
|
7 | 7 | # Distributed under the terms of the BSD License. The full license is in |
|
8 | 8 | # the file COPYING, distributed as part of this software. |
|
9 | 9 | #----------------------------------------------------------------------------- |
|
10 | 10 | |
|
11 | 11 | #----------------------------------------------------------------------------- |
|
12 | 12 | # Imports |
|
13 | 13 | #----------------------------------------------------------------------------- |
|
14 | 14 | from __future__ import print_function |
|
15 | 15 | from __future__ import absolute_import |
|
16 | 16 | |
|
17 | 17 | import io as stdlib_io |
|
18 | 18 | import os.path |
|
19 | 19 | import stat |
|
20 | 20 | import sys |
|
21 | 21 | |
|
22 | 22 | from subprocess import Popen, PIPE |
|
23 | 23 | import unittest |
|
24 | 24 | |
|
25 | 25 | import nose.tools as nt |
|
26 | 26 | |
|
27 | 27 | from IPython.testing.decorators import skipif |
|
28 | 28 | from IPython.utils.io import (Tee, capture_output, unicode_std_stream, |
|
29 | 29 | atomic_writing, |
|
30 | 30 | ) |
|
31 | 31 | from IPython.utils.py3compat import doctest_refactor_print, PY3 |
|
32 | 32 | from IPython.utils.tempdir import TemporaryDirectory |
|
33 | 33 | |
|
34 | 34 | if PY3: |
|
35 | 35 | from io import StringIO |
|
36 | 36 | else: |
|
37 | 37 | from StringIO import StringIO |
|
38 | 38 | |
|
39 | 39 | #----------------------------------------------------------------------------- |
|
40 | 40 | # Tests |
|
41 | 41 | #----------------------------------------------------------------------------- |
|
42 | 42 | |
|
43 | 43 | |
|
44 | 44 | def test_tee_simple(): |
|
45 | 45 | "Very simple check with stdout only" |
|
46 | 46 | chan = StringIO() |
|
47 | 47 | text = 'Hello' |
|
48 | 48 | tee = Tee(chan, channel='stdout') |
|
49 | 49 | print(text, file=chan) |
|
50 | 50 | nt.assert_equal(chan.getvalue(), text+"\n") |
|
51 | 51 | |
|
52 | 52 | |
|
53 | 53 | class TeeTestCase(unittest.TestCase): |
|
54 | 54 | |
|
55 | 55 | def tchan(self, channel, check='close'): |
|
56 | 56 | trap = StringIO() |
|
57 | 57 | chan = StringIO() |
|
58 | 58 | text = 'Hello' |
|
59 | 59 | |
|
60 | 60 | std_ori = getattr(sys, channel) |
|
61 | 61 | setattr(sys, channel, trap) |
|
62 | 62 | |
|
63 | 63 | tee = Tee(chan, channel=channel) |
|
64 | 64 | print(text, end='', file=chan) |
|
65 | 65 | setattr(sys, channel, std_ori) |
|
66 | 66 | trap_val = trap.getvalue() |
|
67 | 67 | nt.assert_equal(chan.getvalue(), text) |
|
68 | 68 | if check=='close': |
|
69 | 69 | tee.close() |
|
70 | 70 | else: |
|
71 | 71 | del tee |
|
72 | 72 | |
|
73 | 73 | def test(self): |
|
74 | 74 | for chan in ['stdout', 'stderr']: |
|
75 | 75 | for check in ['close', 'del']: |
|
76 | 76 | self.tchan(chan, check) |
|
77 | 77 | |
|
78 | 78 | def test_io_init(): |
|
79 | 79 | """Test that io.stdin/out/err exist at startup""" |
|
80 | 80 | for name in ('stdin', 'stdout', 'stderr'): |
|
81 | 81 | cmd = doctest_refactor_print("from IPython.utils import io;print io.%s.__class__"%name) |
|
82 | 82 | p = Popen([sys.executable, '-c', cmd], |
|
83 | 83 | stdout=PIPE) |
|
84 | 84 | p.wait() |
|
85 | 85 | classname = p.stdout.read().strip().decode('ascii') |
|
86 | 86 | # __class__ is a reference to the class object in Python 3, so we can't |
|
87 | 87 | # just test for string equality. |
|
88 | 88 | assert 'IPython.utils.io.IOStream' in classname, classname |
|
89 | 89 | |
|
90 | 90 | def test_capture_output(): |
|
91 | 91 | """capture_output() context works""" |
|
92 | 92 | |
|
93 | 93 | with capture_output() as io: |
|
94 | 94 | print('hi, stdout') |
|
95 | 95 | print('hi, stderr', file=sys.stderr) |
|
96 | 96 | |
|
97 | 97 | nt.assert_equal(io.stdout, 'hi, stdout\n') |
|
98 | 98 | nt.assert_equal(io.stderr, 'hi, stderr\n') |
|
99 | 99 | |
|
100 | 100 | def test_UnicodeStdStream(): |
|
101 | 101 | # Test wrapping a bytes-level stdout |
|
102 | 102 | if PY3: |
|
103 | 103 | stdoutb = stdlib_io.BytesIO() |
|
104 | 104 | stdout = stdlib_io.TextIOWrapper(stdoutb, encoding='ascii') |
|
105 | 105 | else: |
|
106 | 106 | stdout = stdoutb = stdlib_io.BytesIO() |
|
107 | 107 | |
|
108 | 108 | orig_stdout = sys.stdout |
|
109 | 109 | sys.stdout = stdout |
|
110 | 110 | try: |
|
111 | 111 | sample = u"@ΕeΒΆΕ§β" |
|
112 | 112 | unicode_std_stream().write(sample) |
|
113 | 113 | |
|
114 | 114 | output = stdoutb.getvalue().decode('utf-8') |
|
115 | 115 | nt.assert_equal(output, sample) |
|
116 | 116 | assert not stdout.closed |
|
117 | 117 | finally: |
|
118 | 118 | sys.stdout = orig_stdout |
|
119 | 119 | |
|
120 | 120 | @skipif(not PY3, "Not applicable on Python 2") |
|
121 | 121 | def test_UnicodeStdStream_nowrap(): |
|
122 | 122 | # If we replace stdout with a StringIO, it shouldn't get wrapped. |
|
123 | 123 | orig_stdout = sys.stdout |
|
124 | 124 | sys.stdout = StringIO() |
|
125 | 125 | try: |
|
126 | 126 | nt.assert_is(unicode_std_stream(), sys.stdout) |
|
127 | 127 | assert not sys.stdout.closed |
|
128 | 128 | finally: |
|
129 | 129 | sys.stdout = orig_stdout |
|
130 | 130 | |
|
131 | 131 | def test_atomic_writing(): |
|
132 | 132 | class CustomExc(Exception): pass |
|
133 | 133 | |
|
134 | 134 | with TemporaryDirectory() as td: |
|
135 | 135 | f1 = os.path.join(td, 'penguin') |
|
136 | 136 | with stdlib_io.open(f1, 'w') as f: |
|
137 | 137 | f.write(u'Before') |
|
138 | 138 | |
|
139 | 139 | if os.name != 'nt': |
|
140 | 140 | os.chmod(f1, 0o701) |
|
141 | 141 | orig_mode = stat.S_IMODE(os.stat(f1).st_mode) |
|
142 | 142 | |
|
143 | 143 | f2 = os.path.join(td, 'flamingo') |
|
144 | 144 | try: |
|
145 | 145 | os.symlink(f1, f2) |
|
146 | 146 | have_symlink = True |
|
147 | except (AttributeError, NotImplementedError): | |
|
147 | except (AttributeError, NotImplementedError, OSError): | |
|
148 | # AttributeError: Python doesn't support it | |
|
149 | # NotImplementedError: The system doesn't support it | |
|
150 | # OSError: The user lacks the privilege (Windows) | |
|
148 | 151 | have_symlink = False |
|
149 | 152 | |
|
150 | 153 | with nt.assert_raises(CustomExc): |
|
151 | 154 | with atomic_writing(f1) as f: |
|
152 | 155 | f.write(u'Failing write') |
|
153 | 156 | raise CustomExc |
|
154 | 157 | |
|
155 | 158 | # Because of the exception, the file should not have been modified |
|
156 | 159 | with stdlib_io.open(f1, 'r') as f: |
|
157 | 160 | nt.assert_equal(f.read(), u'Before') |
|
158 | 161 | |
|
159 | 162 | with atomic_writing(f1) as f: |
|
160 | 163 | f.write(u'Overwritten') |
|
161 | 164 | |
|
162 | 165 | with stdlib_io.open(f1, 'r') as f: |
|
163 | 166 | nt.assert_equal(f.read(), u'Overwritten') |
|
164 | 167 | |
|
165 | 168 | if os.name != 'nt': |
|
166 | 169 | mode = stat.S_IMODE(os.stat(f1).st_mode) |
|
167 | 170 | nt.assert_equal(mode, orig_mode) |
|
168 | 171 | |
|
169 | 172 | if have_symlink: |
|
170 | 173 | # Check that writing over a file preserves a symlink |
|
171 | 174 | with atomic_writing(f2) as f: |
|
172 | 175 | f.write(u'written from symlink') |
|
173 | 176 | |
|
174 | 177 | with stdlib_io.open(f1, 'r') as f: |
|
175 | 178 | nt.assert_equal(f.read(), u'written from symlink') No newline at end of file |
General Comments 0
You need to be logged in to leave comments.
Login now