##// END OF EJS Templates
Interpret another error as missing symlink support on Windows...
Thomas Kluyver -
Show More
@@ -1,175 +1,178 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """Tests for io.py"""
2 """Tests for io.py"""
3
3
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2008-2011 The IPython Development Team
5 # Copyright (C) 2008-2011 The IPython Development Team
6 #
6 #
7 # Distributed under the terms of the BSD License. The full license is in
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Imports
12 # Imports
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 from __future__ import print_function
14 from __future__ import print_function
15 from __future__ import absolute_import
15 from __future__ import absolute_import
16
16
17 import io as stdlib_io
17 import io as stdlib_io
18 import os.path
18 import os.path
19 import stat
19 import stat
20 import sys
20 import sys
21
21
22 from subprocess import Popen, PIPE
22 from subprocess import Popen, PIPE
23 import unittest
23 import unittest
24
24
25 import nose.tools as nt
25 import nose.tools as nt
26
26
27 from IPython.testing.decorators import skipif
27 from IPython.testing.decorators import skipif
28 from IPython.utils.io import (Tee, capture_output, unicode_std_stream,
28 from IPython.utils.io import (Tee, capture_output, unicode_std_stream,
29 atomic_writing,
29 atomic_writing,
30 )
30 )
31 from IPython.utils.py3compat import doctest_refactor_print, PY3
31 from IPython.utils.py3compat import doctest_refactor_print, PY3
32 from IPython.utils.tempdir import TemporaryDirectory
32 from IPython.utils.tempdir import TemporaryDirectory
33
33
34 if PY3:
34 if PY3:
35 from io import StringIO
35 from io import StringIO
36 else:
36 else:
37 from StringIO import StringIO
37 from StringIO import StringIO
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Tests
40 # Tests
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43
43
44 def test_tee_simple():
44 def test_tee_simple():
45 "Very simple check with stdout only"
45 "Very simple check with stdout only"
46 chan = StringIO()
46 chan = StringIO()
47 text = 'Hello'
47 text = 'Hello'
48 tee = Tee(chan, channel='stdout')
48 tee = Tee(chan, channel='stdout')
49 print(text, file=chan)
49 print(text, file=chan)
50 nt.assert_equal(chan.getvalue(), text+"\n")
50 nt.assert_equal(chan.getvalue(), text+"\n")
51
51
52
52
53 class TeeTestCase(unittest.TestCase):
53 class TeeTestCase(unittest.TestCase):
54
54
55 def tchan(self, channel, check='close'):
55 def tchan(self, channel, check='close'):
56 trap = StringIO()
56 trap = StringIO()
57 chan = StringIO()
57 chan = StringIO()
58 text = 'Hello'
58 text = 'Hello'
59
59
60 std_ori = getattr(sys, channel)
60 std_ori = getattr(sys, channel)
61 setattr(sys, channel, trap)
61 setattr(sys, channel, trap)
62
62
63 tee = Tee(chan, channel=channel)
63 tee = Tee(chan, channel=channel)
64 print(text, end='', file=chan)
64 print(text, end='', file=chan)
65 setattr(sys, channel, std_ori)
65 setattr(sys, channel, std_ori)
66 trap_val = trap.getvalue()
66 trap_val = trap.getvalue()
67 nt.assert_equal(chan.getvalue(), text)
67 nt.assert_equal(chan.getvalue(), text)
68 if check=='close':
68 if check=='close':
69 tee.close()
69 tee.close()
70 else:
70 else:
71 del tee
71 del tee
72
72
73 def test(self):
73 def test(self):
74 for chan in ['stdout', 'stderr']:
74 for chan in ['stdout', 'stderr']:
75 for check in ['close', 'del']:
75 for check in ['close', 'del']:
76 self.tchan(chan, check)
76 self.tchan(chan, check)
77
77
78 def test_io_init():
78 def test_io_init():
79 """Test that io.stdin/out/err exist at startup"""
79 """Test that io.stdin/out/err exist at startup"""
80 for name in ('stdin', 'stdout', 'stderr'):
80 for name in ('stdin', 'stdout', 'stderr'):
81 cmd = doctest_refactor_print("from IPython.utils import io;print io.%s.__class__"%name)
81 cmd = doctest_refactor_print("from IPython.utils import io;print io.%s.__class__"%name)
82 p = Popen([sys.executable, '-c', cmd],
82 p = Popen([sys.executable, '-c', cmd],
83 stdout=PIPE)
83 stdout=PIPE)
84 p.wait()
84 p.wait()
85 classname = p.stdout.read().strip().decode('ascii')
85 classname = p.stdout.read().strip().decode('ascii')
86 # __class__ is a reference to the class object in Python 3, so we can't
86 # __class__ is a reference to the class object in Python 3, so we can't
87 # just test for string equality.
87 # just test for string equality.
88 assert 'IPython.utils.io.IOStream' in classname, classname
88 assert 'IPython.utils.io.IOStream' in classname, classname
89
89
90 def test_capture_output():
90 def test_capture_output():
91 """capture_output() context works"""
91 """capture_output() context works"""
92
92
93 with capture_output() as io:
93 with capture_output() as io:
94 print('hi, stdout')
94 print('hi, stdout')
95 print('hi, stderr', file=sys.stderr)
95 print('hi, stderr', file=sys.stderr)
96
96
97 nt.assert_equal(io.stdout, 'hi, stdout\n')
97 nt.assert_equal(io.stdout, 'hi, stdout\n')
98 nt.assert_equal(io.stderr, 'hi, stderr\n')
98 nt.assert_equal(io.stderr, 'hi, stderr\n')
99
99
100 def test_UnicodeStdStream():
100 def test_UnicodeStdStream():
101 # Test wrapping a bytes-level stdout
101 # Test wrapping a bytes-level stdout
102 if PY3:
102 if PY3:
103 stdoutb = stdlib_io.BytesIO()
103 stdoutb = stdlib_io.BytesIO()
104 stdout = stdlib_io.TextIOWrapper(stdoutb, encoding='ascii')
104 stdout = stdlib_io.TextIOWrapper(stdoutb, encoding='ascii')
105 else:
105 else:
106 stdout = stdoutb = stdlib_io.BytesIO()
106 stdout = stdoutb = stdlib_io.BytesIO()
107
107
108 orig_stdout = sys.stdout
108 orig_stdout = sys.stdout
109 sys.stdout = stdout
109 sys.stdout = stdout
110 try:
110 try:
111 sample = u"@Ε‚e¢ŧ←"
111 sample = u"@Ε‚e¢ŧ←"
112 unicode_std_stream().write(sample)
112 unicode_std_stream().write(sample)
113
113
114 output = stdoutb.getvalue().decode('utf-8')
114 output = stdoutb.getvalue().decode('utf-8')
115 nt.assert_equal(output, sample)
115 nt.assert_equal(output, sample)
116 assert not stdout.closed
116 assert not stdout.closed
117 finally:
117 finally:
118 sys.stdout = orig_stdout
118 sys.stdout = orig_stdout
119
119
120 @skipif(not PY3, "Not applicable on Python 2")
120 @skipif(not PY3, "Not applicable on Python 2")
121 def test_UnicodeStdStream_nowrap():
121 def test_UnicodeStdStream_nowrap():
122 # If we replace stdout with a StringIO, it shouldn't get wrapped.
122 # If we replace stdout with a StringIO, it shouldn't get wrapped.
123 orig_stdout = sys.stdout
123 orig_stdout = sys.stdout
124 sys.stdout = StringIO()
124 sys.stdout = StringIO()
125 try:
125 try:
126 nt.assert_is(unicode_std_stream(), sys.stdout)
126 nt.assert_is(unicode_std_stream(), sys.stdout)
127 assert not sys.stdout.closed
127 assert not sys.stdout.closed
128 finally:
128 finally:
129 sys.stdout = orig_stdout
129 sys.stdout = orig_stdout
130
130
131 def test_atomic_writing():
131 def test_atomic_writing():
132 class CustomExc(Exception): pass
132 class CustomExc(Exception): pass
133
133
134 with TemporaryDirectory() as td:
134 with TemporaryDirectory() as td:
135 f1 = os.path.join(td, 'penguin')
135 f1 = os.path.join(td, 'penguin')
136 with stdlib_io.open(f1, 'w') as f:
136 with stdlib_io.open(f1, 'w') as f:
137 f.write(u'Before')
137 f.write(u'Before')
138
138
139 if os.name != 'nt':
139 if os.name != 'nt':
140 os.chmod(f1, 0o701)
140 os.chmod(f1, 0o701)
141 orig_mode = stat.S_IMODE(os.stat(f1).st_mode)
141 orig_mode = stat.S_IMODE(os.stat(f1).st_mode)
142
142
143 f2 = os.path.join(td, 'flamingo')
143 f2 = os.path.join(td, 'flamingo')
144 try:
144 try:
145 os.symlink(f1, f2)
145 os.symlink(f1, f2)
146 have_symlink = True
146 have_symlink = True
147 except (AttributeError, NotImplementedError):
147 except (AttributeError, NotImplementedError, OSError):
148 # AttributeError: Python doesn't support it
149 # NotImplementedError: The system doesn't support it
150 # OSError: The user lacks the privilege (Windows)
148 have_symlink = False
151 have_symlink = False
149
152
150 with nt.assert_raises(CustomExc):
153 with nt.assert_raises(CustomExc):
151 with atomic_writing(f1) as f:
154 with atomic_writing(f1) as f:
152 f.write(u'Failing write')
155 f.write(u'Failing write')
153 raise CustomExc
156 raise CustomExc
154
157
155 # Because of the exception, the file should not have been modified
158 # Because of the exception, the file should not have been modified
156 with stdlib_io.open(f1, 'r') as f:
159 with stdlib_io.open(f1, 'r') as f:
157 nt.assert_equal(f.read(), u'Before')
160 nt.assert_equal(f.read(), u'Before')
158
161
159 with atomic_writing(f1) as f:
162 with atomic_writing(f1) as f:
160 f.write(u'Overwritten')
163 f.write(u'Overwritten')
161
164
162 with stdlib_io.open(f1, 'r') as f:
165 with stdlib_io.open(f1, 'r') as f:
163 nt.assert_equal(f.read(), u'Overwritten')
166 nt.assert_equal(f.read(), u'Overwritten')
164
167
165 if os.name != 'nt':
168 if os.name != 'nt':
166 mode = stat.S_IMODE(os.stat(f1).st_mode)
169 mode = stat.S_IMODE(os.stat(f1).st_mode)
167 nt.assert_equal(mode, orig_mode)
170 nt.assert_equal(mode, orig_mode)
168
171
169 if have_symlink:
172 if have_symlink:
170 # Check that writing over a file preserves a symlink
173 # Check that writing over a file preserves a symlink
171 with atomic_writing(f2) as f:
174 with atomic_writing(f2) as f:
172 f.write(u'written from symlink')
175 f.write(u'written from symlink')
173
176
174 with stdlib_io.open(f1, 'r') as f:
177 with stdlib_io.open(f1, 'r') as f:
175 nt.assert_equal(f.read(), u'written from symlink') No newline at end of file
178 nt.assert_equal(f.read(), u'written from symlink')
General Comments 0
You need to be logged in to leave comments. Login now