##// END OF EJS Templates
IOStream: Ignore missing attrs from `dir()`. #6386...
Christopher Welborn -
Show More
@@ -1,240 +1,245 b''
1 1 # encoding: utf-8
2 2 """
3 3 IO related utilities.
4 4 """
5 5
6 6 # Copyright (c) IPython Development Team.
7 7 # Distributed under the terms of the Modified BSD License.
8 8
9 9
10 10
11 11 import atexit
12 12 import os
13 13 import sys
14 14 import tempfile
15 15 import warnings
16 16 from warnings import warn
17 17
18 18 from IPython.utils.decorators import undoc
19 19 from .capture import CapturedIO, capture_output
20 20 from .py3compat import input
21 21
22 22 @undoc
23 23 class IOStream:
24 24
25 25 def __init__(self, stream, fallback=None):
26 26 warn('IOStream is deprecated since IPython 5.0, use sys.{stdin,stdout,stderr} instead',
27 27 DeprecationWarning, stacklevel=2)
28 28 if not hasattr(stream,'write') or not hasattr(stream,'flush'):
29 29 if fallback is not None:
30 30 stream = fallback
31 31 else:
32 32 raise ValueError("fallback required, but not specified")
33 33 self.stream = stream
34 34 self._swrite = stream.write
35 35
36 36 # clone all methods not overridden:
37 37 def clone(meth):
38 38 return not hasattr(self, meth) and not meth.startswith('_')
39 39 for meth in filter(clone, dir(stream)):
40 setattr(self, meth, getattr(stream, meth))
40 try:
41 val = getattr(stream, meth)
42 except AttributeError:
43 pass
44 else:
45 setattr(self, meth, val)
41 46
42 47 def __repr__(self):
43 48 cls = self.__class__
44 49 tpl = '{mod}.{cls}({args})'
45 50 return tpl.format(mod=cls.__module__, cls=cls.__name__, args=self.stream)
46 51
47 52 def write(self,data):
48 53 warn('IOStream is deprecated since IPython 5.0, use sys.{stdin,stdout,stderr} instead',
49 54 DeprecationWarning, stacklevel=2)
50 55 try:
51 56 self._swrite(data)
52 57 except:
53 58 try:
54 59 # print handles some unicode issues which may trip a plain
55 60 # write() call. Emulate write() by using an empty end
56 61 # argument.
57 62 print(data, end='', file=self.stream)
58 63 except:
59 64 # if we get here, something is seriously broken.
60 65 print('ERROR - failed to write data to stream:', self.stream,
61 66 file=sys.stderr)
62 67
63 68 def writelines(self, lines):
64 69 warn('IOStream is deprecated since IPython 5.0, use sys.{stdin,stdout,stderr} instead',
65 70 DeprecationWarning, stacklevel=2)
66 71 if isinstance(lines, str):
67 72 lines = [lines]
68 73 for line in lines:
69 74 self.write(line)
70 75
71 76 # This class used to have a writeln method, but regular files and streams
72 77 # in Python don't have this method. We need to keep this completely
73 78 # compatible so we removed it.
74 79
75 80 @property
76 81 def closed(self):
77 82 return self.stream.closed
78 83
79 84 def close(self):
80 85 pass
81 86
82 87 # setup stdin/stdout/stderr to sys.stdin/sys.stdout/sys.stderr
83 88 devnull = open(os.devnull, 'w')
84 89 atexit.register(devnull.close)
85 90
86 91 # io.std* are deprecated, but don't show our own deprecation warnings
87 92 # during initialization of the deprecated API.
88 93 with warnings.catch_warnings():
89 94 warnings.simplefilter('ignore', DeprecationWarning)
90 95 stdin = IOStream(sys.stdin, fallback=devnull)
91 96 stdout = IOStream(sys.stdout, fallback=devnull)
92 97 stderr = IOStream(sys.stderr, fallback=devnull)
93 98
94 99 class Tee(object):
95 100 """A class to duplicate an output stream to stdout/err.
96 101
97 102 This works in a manner very similar to the Unix 'tee' command.
98 103
99 104 When the object is closed or deleted, it closes the original file given to
100 105 it for duplication.
101 106 """
102 107 # Inspired by:
103 108 # http://mail.python.org/pipermail/python-list/2007-May/442737.html
104 109
105 110 def __init__(self, file_or_name, mode="w", channel='stdout'):
106 111 """Construct a new Tee object.
107 112
108 113 Parameters
109 114 ----------
110 115 file_or_name : filename or open filehandle (writable)
111 116 File that will be duplicated
112 117
113 118 mode : optional, valid mode for open().
114 119 If a filename was give, open with this mode.
115 120
116 121 channel : str, one of ['stdout', 'stderr']
117 122 """
118 123 if channel not in ['stdout', 'stderr']:
119 124 raise ValueError('Invalid channel spec %s' % channel)
120 125
121 126 if hasattr(file_or_name, 'write') and hasattr(file_or_name, 'seek'):
122 127 self.file = file_or_name
123 128 else:
124 129 self.file = open(file_or_name, mode)
125 130 self.channel = channel
126 131 self.ostream = getattr(sys, channel)
127 132 setattr(sys, channel, self)
128 133 self._closed = False
129 134
130 135 def close(self):
131 136 """Close the file and restore the channel."""
132 137 self.flush()
133 138 setattr(sys, self.channel, self.ostream)
134 139 self.file.close()
135 140 self._closed = True
136 141
137 142 def write(self, data):
138 143 """Write data to both channels."""
139 144 self.file.write(data)
140 145 self.ostream.write(data)
141 146 self.ostream.flush()
142 147
143 148 def flush(self):
144 149 """Flush both channels."""
145 150 self.file.flush()
146 151 self.ostream.flush()
147 152
148 153 def __del__(self):
149 154 if not self._closed:
150 155 self.close()
151 156
152 157
153 158 def ask_yes_no(prompt, default=None, interrupt=None):
154 159 """Asks a question and returns a boolean (y/n) answer.
155 160
156 161 If default is given (one of 'y','n'), it is used if the user input is
157 162 empty. If interrupt is given (one of 'y','n'), it is used if the user
158 163 presses Ctrl-C. Otherwise the question is repeated until an answer is
159 164 given.
160 165
161 166 An EOF is treated as the default answer. If there is no default, an
162 167 exception is raised to prevent infinite loops.
163 168
164 169 Valid answers are: y/yes/n/no (match is not case sensitive)."""
165 170
166 171 answers = {'y':True,'n':False,'yes':True,'no':False}
167 172 ans = None
168 173 while ans not in answers.keys():
169 174 try:
170 175 ans = input(prompt+' ').lower()
171 176 if not ans: # response was an empty string
172 177 ans = default
173 178 except KeyboardInterrupt:
174 179 if interrupt:
175 180 ans = interrupt
176 181 print("\r")
177 182 except EOFError:
178 183 if default in answers.keys():
179 184 ans = default
180 185 print()
181 186 else:
182 187 raise
183 188
184 189 return answers[ans]
185 190
186 191
187 192 def temp_pyfile(src, ext='.py'):
188 193 """Make a temporary python file, return filename and filehandle.
189 194
190 195 Parameters
191 196 ----------
192 197 src : string or list of strings (no need for ending newlines if list)
193 198 Source code to be written to the file.
194 199
195 200 ext : optional, string
196 201 Extension for the generated file.
197 202
198 203 Returns
199 204 -------
200 205 (filename, open filehandle)
201 206 It is the caller's responsibility to close the open file and unlink it.
202 207 """
203 208 fname = tempfile.mkstemp(ext)[1]
204 209 f = open(fname,'w')
205 210 f.write(src)
206 211 f.flush()
207 212 return fname, f
208 213
209 214 def atomic_writing(*args, **kwargs):
210 215 """DEPRECATED: moved to notebook.services.contents.fileio"""
211 216 warn("IPython.utils.io.atomic_writing has moved to notebook.services.contents.fileio", stacklevel=2)
212 217 from notebook.services.contents.fileio import atomic_writing
213 218 return atomic_writing(*args, **kwargs)
214 219
215 220 def raw_print(*args, **kw):
216 221 """Raw print to sys.__stdout__, otherwise identical interface to print()."""
217 222
218 223 print(*args, sep=kw.get('sep', ' '), end=kw.get('end', '\n'),
219 224 file=sys.__stdout__)
220 225 sys.__stdout__.flush()
221 226
222 227
223 228 def raw_print_err(*args, **kw):
224 229 """Raw print to sys.__stderr__, otherwise identical interface to print()."""
225 230
226 231 print(*args, sep=kw.get('sep', ' '), end=kw.get('end', '\n'),
227 232 file=sys.__stderr__)
228 233 sys.__stderr__.flush()
229 234
230 235
231 236 # Short aliases for quick debugging, do NOT use these in production code.
232 237 rprint = raw_print
233 238 rprinte = raw_print_err
234 239
235 240
236 241 def unicode_std_stream(stream='stdout'):
237 242 """DEPRECATED, moved to nbconvert.utils.io"""
238 243 warn("IPython.utils.io.unicode_std_stream has moved to nbconvert.utils.io", stacklevel=2)
239 244 from nbconvert.utils.io import unicode_std_stream
240 245 return unicode_std_stream(stream)
@@ -1,81 +1,94 b''
1 1 # encoding: utf-8
2 2 """Tests for io.py"""
3 3
4 4 # Copyright (c) IPython Development Team.
5 5 # Distributed under the terms of the Modified BSD License.
6 6
7 7
8 8 import io as stdlib_io
9 9 import os.path
10 10 import stat
11 11 import sys
12 12 from io import StringIO
13 13
14 14 from subprocess import Popen, PIPE
15 15 import unittest
16 16
17 17 import nose.tools as nt
18 18
19 19 from IPython.testing.decorators import skipif, skip_win32
20 from IPython.utils.io import Tee, capture_output
20 from IPython.utils.io import IOStream, Tee, capture_output
21 21 from IPython.utils.py3compat import doctest_refactor_print
22 22 from IPython.utils.tempdir import TemporaryDirectory
23 23
24 24
25 25 def test_tee_simple():
26 26 "Very simple check with stdout only"
27 27 chan = StringIO()
28 28 text = 'Hello'
29 29 tee = Tee(chan, channel='stdout')
30 30 print(text, file=chan)
31 31 nt.assert_equal(chan.getvalue(), text+"\n")
32 32
33 33
34 34 class TeeTestCase(unittest.TestCase):
35 35
36 36 def tchan(self, channel, check='close'):
37 37 trap = StringIO()
38 38 chan = StringIO()
39 39 text = 'Hello'
40 40
41 41 std_ori = getattr(sys, channel)
42 42 setattr(sys, channel, trap)
43 43
44 44 tee = Tee(chan, channel=channel)
45 45 print(text, end='', file=chan)
46 46 setattr(sys, channel, std_ori)
47 47 trap_val = trap.getvalue()
48 48 nt.assert_equal(chan.getvalue(), text)
49 49 if check=='close':
50 50 tee.close()
51 51 else:
52 52 del tee
53 53
54 54 def test(self):
55 55 for chan in ['stdout', 'stderr']:
56 56 for check in ['close', 'del']:
57 57 self.tchan(chan, check)
58 58
59 59 def test_io_init():
60 60 """Test that io.stdin/out/err exist at startup"""
61 61 for name in ('stdin', 'stdout', 'stderr'):
62 62 cmd = doctest_refactor_print("from IPython.utils import io;print io.%s.__class__"%name)
63 63 p = Popen([sys.executable, '-c', cmd],
64 64 stdout=PIPE)
65 65 p.wait()
66 66 classname = p.stdout.read().strip().decode('ascii')
67 67 # __class__ is a reference to the class object in Python 3, so we can't
68 68 # just test for string equality.
69 69 assert 'IPython.utils.io.IOStream' in classname, classname
70 70
71 def test_IOStream_init():
72 """IOStream initializes from a file-like object missing attributes. """
73 # Cause a failure from getattr and dir(). (Issue #6386)
74 class BadStringIO(StringIO):
75 def __dir__(self):
76 attrs = super(StringIO, self).__dir__()
77 attrs.append('name')
78 return attrs
79
80 iostream = IOStream(BadStringIO())
81 iostream.write('hi, bad iostream\n')
82 assert not hasattr(iostream, 'name')
83
71 84 def test_capture_output():
72 85 """capture_output() context works"""
73 86
74 87 with capture_output() as io:
75 88 print('hi, stdout')
76 89 print('hi, stderr', file=sys.stderr)
77 90
78 91 nt.assert_equal(io.stdout, 'hi, stdout\n')
79 92 nt.assert_equal(io.stderr, 'hi, stderr\n')
80 93
81 94
General Comments 0
You need to be logged in to leave comments. Login now