##// END OF EJS Templates
Do not return filehandle as unused
Matthias Bussonnier -
Show More
@@ -1,136 +1,135 b''
1 1 # encoding: utf-8
2 2 """
3 3 Tests for testing.tools
4 4 """
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008-2011 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import unittest
19 19
20 20 import nose.tools as nt
21 21
22 22 from IPython.testing import decorators as dec
23 23 from IPython.testing import tools as tt
24 24
25 25 #-----------------------------------------------------------------------------
26 26 # Tests
27 27 #-----------------------------------------------------------------------------
28 28
29 29 @dec.skip_win32
30 30 def test_full_path_posix():
31 31 spath = '/foo/bar.py'
32 32 result = tt.full_path(spath,['a.txt','b.txt'])
33 33 nt.assert_equal(result, ['/foo/a.txt', '/foo/b.txt'])
34 34 spath = '/foo'
35 35 result = tt.full_path(spath,['a.txt','b.txt'])
36 36 nt.assert_equal(result, ['/a.txt', '/b.txt'])
37 37 result = tt.full_path(spath,'a.txt')
38 38 nt.assert_equal(result, ['/a.txt'])
39 39
40 40
41 41 @dec.skip_if_not_win32
42 42 def test_full_path_win32():
43 43 spath = 'c:\\foo\\bar.py'
44 44 result = tt.full_path(spath,['a.txt','b.txt'])
45 45 nt.assert_equal(result, ['c:\\foo\\a.txt', 'c:\\foo\\b.txt'])
46 46 spath = 'c:\\foo'
47 47 result = tt.full_path(spath,['a.txt','b.txt'])
48 48 nt.assert_equal(result, ['c:\\a.txt', 'c:\\b.txt'])
49 49 result = tt.full_path(spath,'a.txt')
50 50 nt.assert_equal(result, ['c:\\a.txt'])
51 51
52 52
53 53 def test_parser():
54 54 err = ("FAILED (errors=1)", 1, 0)
55 55 fail = ("FAILED (failures=1)", 0, 1)
56 56 both = ("FAILED (errors=1, failures=1)", 1, 1)
57 57 for txt, nerr, nfail in [err, fail, both]:
58 58 nerr1, nfail1 = tt.parse_test_output(txt)
59 59 nt.assert_equal(nerr, nerr1)
60 60 nt.assert_equal(nfail, nfail1)
61 61
62 62
63 63 def test_temp_pyfile():
64 64 src = 'pass\n'
65 fname, fh = tt.temp_pyfile(src)
65 fname = tt.temp_pyfile(src)
66 66 assert os.path.isfile(fname)
67 fh.close()
68 67 with open(fname) as fh2:
69 68 src2 = fh2.read()
70 69 nt.assert_equal(src2, src)
71 70
72 71 class TestAssertPrints(unittest.TestCase):
73 72 def test_passing(self):
74 73 with tt.AssertPrints("abc"):
75 74 print("abcd")
76 75 print("def")
77 76 print(b"ghi")
78 77
79 78 def test_failing(self):
80 79 def func():
81 80 with tt.AssertPrints("abc"):
82 81 print("acd")
83 82 print("def")
84 83 print(b"ghi")
85 84
86 85 self.assertRaises(AssertionError, func)
87 86
88 87
89 88 class Test_ipexec_validate(unittest.TestCase, tt.TempFileMixin):
90 89 def test_main_path(self):
91 90 """Test with only stdout results.
92 91 """
93 92 self.mktmp("print('A')\n"
94 93 "print('B')\n"
95 94 )
96 95 out = "A\nB"
97 96 tt.ipexec_validate(self.fname, out)
98 97
99 98 def test_main_path2(self):
100 99 """Test with only stdout results, expecting windows line endings.
101 100 """
102 101 self.mktmp("print('A')\n"
103 102 "print('B')\n"
104 103 )
105 104 out = "A\r\nB"
106 105 tt.ipexec_validate(self.fname, out)
107 106
108 107 def test_exception_path(self):
109 108 """Test exception path in exception_validate.
110 109 """
111 110 self.mktmp("import sys\n"
112 111 "print('A')\n"
113 112 "print('B')\n"
114 113 "print('C', file=sys.stderr)\n"
115 114 "print('D', file=sys.stderr)\n"
116 115 )
117 116 out = "A\nB"
118 117 tt.ipexec_validate(self.fname, expected_out=out, expected_err="C\nD")
119 118
120 119 def test_exception_path2(self):
121 120 """Test exception path in exception_validate, expecting windows line endings.
122 121 """
123 122 self.mktmp("import sys\n"
124 123 "print('A')\n"
125 124 "print('B')\n"
126 125 "print('C', file=sys.stderr)\n"
127 126 "print('D', file=sys.stderr)\n"
128 127 )
129 128 out = "A\r\nB"
130 129 tt.ipexec_validate(self.fname, expected_out=out, expected_err="C\r\nD")
131 130
132 131
133 132 def tearDown(self):
134 133 # tear down correctly the mixin,
135 134 # unittest.TestCase.tearDown does nothing
136 135 tt.TempFileMixin.tearDown(self)
@@ -1,471 +1,470 b''
1 1 """Generic testing tools.
2 2
3 3 Authors
4 4 -------
5 5 - Fernando Perez <Fernando.Perez@berkeley.edu>
6 6 """
7 7
8 8
9 9 # Copyright (c) IPython Development Team.
10 10 # Distributed under the terms of the Modified BSD License.
11 11
12 12 import os
13 13 import re
14 14 import sys
15 15 import tempfile
16 16
17 17 from contextlib import contextmanager
18 18 from io import StringIO
19 19 from subprocess import Popen, PIPE
20 20 from unittest.mock import patch
21 21
22 22 try:
23 23 # These tools are used by parts of the runtime, so we make the nose
24 24 # dependency optional at this point. Nose is a hard dependency to run the
25 25 # test suite, but NOT to use ipython itself.
26 26 import nose.tools as nt
27 27 has_nose = True
28 28 except ImportError:
29 29 has_nose = False
30 30
31 31 from traitlets.config.loader import Config
32 32 from IPython.utils.process import get_output_error_code
33 33 from IPython.utils.text import list_strings
34 34 from IPython.utils.io import temp_pyfile, Tee
35 35 from IPython.utils import py3compat
36 36
37 37 from . import decorators as dec
38 38 from . import skipdoctest
39 39
40 40
41 41 # The docstring for full_path doctests differently on win32 (different path
42 42 # separator) so just skip the doctest there. The example remains informative.
43 43 doctest_deco = skipdoctest.skip_doctest if sys.platform == 'win32' else dec.null_deco
44 44
45 45 @doctest_deco
46 46 def full_path(startPath,files):
47 47 """Make full paths for all the listed files, based on startPath.
48 48
49 49 Only the base part of startPath is kept, since this routine is typically
50 50 used with a script's ``__file__`` variable as startPath. The base of startPath
51 51 is then prepended to all the listed files, forming the output list.
52 52
53 53 Parameters
54 54 ----------
55 55 startPath : string
56 56 Initial path to use as the base for the results. This path is split
57 57 using os.path.split() and only its first component is kept.
58 58
59 59 files : string or list
60 60 One or more files.
61 61
62 62 Examples
63 63 --------
64 64
65 65 >>> full_path('/foo/bar.py',['a.txt','b.txt'])
66 66 ['/foo/a.txt', '/foo/b.txt']
67 67
68 68 >>> full_path('/foo',['a.txt','b.txt'])
69 69 ['/a.txt', '/b.txt']
70 70
71 71 If a single file is given, the output is still a list::
72 72
73 73 >>> full_path('/foo','a.txt')
74 74 ['/a.txt']
75 75 """
76 76
77 77 files = list_strings(files)
78 78 base = os.path.split(startPath)[0]
79 79 return [ os.path.join(base,f) for f in files ]
80 80
81 81
82 82 def parse_test_output(txt):
83 83 """Parse the output of a test run and return errors, failures.
84 84
85 85 Parameters
86 86 ----------
87 87 txt : str
88 88 Text output of a test run, assumed to contain a line of one of the
89 89 following forms::
90 90
91 91 'FAILED (errors=1)'
92 92 'FAILED (failures=1)'
93 93 'FAILED (errors=1, failures=1)'
94 94
95 95 Returns
96 96 -------
97 97 nerr, nfail
98 98 number of errors and failures.
99 99 """
100 100
101 101 err_m = re.search(r'^FAILED \(errors=(\d+)\)', txt, re.MULTILINE)
102 102 if err_m:
103 103 nerr = int(err_m.group(1))
104 104 nfail = 0
105 105 return nerr, nfail
106 106
107 107 fail_m = re.search(r'^FAILED \(failures=(\d+)\)', txt, re.MULTILINE)
108 108 if fail_m:
109 109 nerr = 0
110 110 nfail = int(fail_m.group(1))
111 111 return nerr, nfail
112 112
113 113 both_m = re.search(r'^FAILED \(errors=(\d+), failures=(\d+)\)', txt,
114 114 re.MULTILINE)
115 115 if both_m:
116 116 nerr = int(both_m.group(1))
117 117 nfail = int(both_m.group(2))
118 118 return nerr, nfail
119 119
120 120 # If the input didn't match any of these forms, assume no error/failures
121 121 return 0, 0
122 122
123 123
124 124 # So nose doesn't think this is a test
125 125 parse_test_output.__test__ = False
126 126
127 127
128 128 def default_argv():
129 129 """Return a valid default argv for creating testing instances of ipython"""
130 130
131 131 return ['--quick', # so no config file is loaded
132 132 # Other defaults to minimize side effects on stdout
133 133 '--colors=NoColor', '--no-term-title','--no-banner',
134 134 '--autocall=0']
135 135
136 136
137 137 def default_config():
138 138 """Return a config object with good defaults for testing."""
139 139 config = Config()
140 140 config.TerminalInteractiveShell.colors = 'NoColor'
141 141 config.TerminalTerminalInteractiveShell.term_title = False,
142 142 config.TerminalInteractiveShell.autocall = 0
143 143 f = tempfile.NamedTemporaryFile(suffix=u'test_hist.sqlite', delete=False)
144 144 config.HistoryManager.hist_file = f.name
145 145 f.close()
146 146 config.HistoryManager.db_cache_size = 10000
147 147 return config
148 148
149 149
150 150 def get_ipython_cmd(as_string=False):
151 151 """
152 152 Return appropriate IPython command line name. By default, this will return
153 153 a list that can be used with subprocess.Popen, for example, but passing
154 154 `as_string=True` allows for returning the IPython command as a string.
155 155
156 156 Parameters
157 157 ----------
158 158 as_string: bool
159 159 Flag to allow to return the command as a string.
160 160 """
161 161 ipython_cmd = [sys.executable, "-m", "IPython"]
162 162
163 163 if as_string:
164 164 ipython_cmd = " ".join(ipython_cmd)
165 165
166 166 return ipython_cmd
167 167
168 168 def ipexec(fname, options=None, commands=()):
169 169 """Utility to call 'ipython filename'.
170 170
171 171 Starts IPython with a minimal and safe configuration to make startup as fast
172 172 as possible.
173 173
174 174 Note that this starts IPython in a subprocess!
175 175
176 176 Parameters
177 177 ----------
178 178 fname : str
179 179 Name of file to be executed (should have .py or .ipy extension).
180 180
181 181 options : optional, list
182 182 Extra command-line flags to be passed to IPython.
183 183
184 184 commands : optional, list
185 185 Commands to send in on stdin
186 186
187 187 Returns
188 188 -------
189 189 ``(stdout, stderr)`` of ipython subprocess.
190 190 """
191 191 if options is None: options = []
192 192
193 193 cmdargs = default_argv() + options
194 194
195 195 test_dir = os.path.dirname(__file__)
196 196
197 197 ipython_cmd = get_ipython_cmd()
198 198 # Absolute path for filename
199 199 full_fname = os.path.join(test_dir, fname)
200 200 full_cmd = ipython_cmd + cmdargs + [full_fname]
201 201 env = os.environ.copy()
202 202 # FIXME: ignore all warnings in ipexec while we have shims
203 203 # should we keep suppressing warnings here, even after removing shims?
204 204 env['PYTHONWARNINGS'] = 'ignore'
205 205 # env.pop('PYTHONWARNINGS', None) # Avoid extraneous warnings appearing on stderr
206 206 for k, v in env.items():
207 207 # Debug a bizarre failure we've seen on Windows:
208 208 # TypeError: environment can only contain strings
209 209 if not isinstance(v, str):
210 210 print(k, v)
211 211 p = Popen(full_cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE, env=env)
212 212 out, err = p.communicate(input=py3compat.encode('\n'.join(commands)) or None)
213 213 out, err = py3compat.decode(out), py3compat.decode(err)
214 214 # `import readline` causes 'ESC[?1034h' to be output sometimes,
215 215 # so strip that out before doing comparisons
216 216 if out:
217 217 out = re.sub(r'\x1b\[[^h]+h', '', out)
218 218 return out, err
219 219
220 220
221 221 def ipexec_validate(fname, expected_out, expected_err='',
222 222 options=None, commands=()):
223 223 """Utility to call 'ipython filename' and validate output/error.
224 224
225 225 This function raises an AssertionError if the validation fails.
226 226
227 227 Note that this starts IPython in a subprocess!
228 228
229 229 Parameters
230 230 ----------
231 231 fname : str
232 232 Name of the file to be executed (should have .py or .ipy extension).
233 233
234 234 expected_out : str
235 235 Expected stdout of the process.
236 236
237 237 expected_err : optional, str
238 238 Expected stderr of the process.
239 239
240 240 options : optional, list
241 241 Extra command-line flags to be passed to IPython.
242 242
243 243 Returns
244 244 -------
245 245 None
246 246 """
247 247
248 248 import nose.tools as nt
249 249
250 250 out, err = ipexec(fname, options, commands)
251 251 #print 'OUT', out # dbg
252 252 #print 'ERR', err # dbg
253 253 # If there are any errors, we must check those before stdout, as they may be
254 254 # more informative than simply having an empty stdout.
255 255 if err:
256 256 if expected_err:
257 257 nt.assert_equal("\n".join(err.strip().splitlines()), "\n".join(expected_err.strip().splitlines()))
258 258 else:
259 259 raise ValueError('Running file %r produced error: %r' %
260 260 (fname, err))
261 261 # If no errors or output on stderr was expected, match stdout
262 262 nt.assert_equal("\n".join(out.strip().splitlines()), "\n".join(expected_out.strip().splitlines()))
263 263
264 264
265 265 class TempFileMixin(object):
266 266 """Utility class to create temporary Python/IPython files.
267 267
268 268 Meant as a mixin class for test cases."""
269 269
270 270 def mktmp(self, src, ext='.py'):
271 271 """Make a valid python temp file."""
272 fname, f = temp_pyfile(src, ext)
272 fname = temp_pyfile(src, ext)
273 273 if not hasattr(self, 'tmps'):
274 274 self.tmps=[]
275 self.tmps.append((f, fname))
275 self.tmps.append(fname)
276 276 self.fname = fname
277 277
278 278 def tearDown(self):
279 279 # If the tmpfile wasn't made because of skipped tests, like in
280 280 # win32, there's nothing to cleanup.
281 281 if hasattr(self, 'tmps'):
282 for f,fname in self.tmps:
282 for fname in self.tmps:
283 283 # If the tmpfile wasn't made because of skipped tests, like in
284 284 # win32, there's nothing to cleanup.
285 f.close()
286 285 try:
287 286 os.unlink(fname)
288 287 except:
289 288 # On Windows, even though we close the file, we still can't
290 289 # delete it. I have no clue why
291 290 pass
292 291
293 292 def __enter__(self):
294 293 return self
295 294
296 295 def __exit__(self, exc_type, exc_value, traceback):
297 296 self.tearDown()
298 297
299 298
300 299 pair_fail_msg = ("Testing {0}\n\n"
301 300 "In:\n"
302 301 " {1!r}\n"
303 302 "Expected:\n"
304 303 " {2!r}\n"
305 304 "Got:\n"
306 305 " {3!r}\n")
307 306 def check_pairs(func, pairs):
308 307 """Utility function for the common case of checking a function with a
309 308 sequence of input/output pairs.
310 309
311 310 Parameters
312 311 ----------
313 312 func : callable
314 313 The function to be tested. Should accept a single argument.
315 314 pairs : iterable
316 315 A list of (input, expected_output) tuples.
317 316
318 317 Returns
319 318 -------
320 319 None. Raises an AssertionError if any output does not match the expected
321 320 value.
322 321 """
323 322 name = getattr(func, "func_name", getattr(func, "__name__", "<unknown>"))
324 323 for inp, expected in pairs:
325 324 out = func(inp)
326 325 assert out == expected, pair_fail_msg.format(name, inp, expected, out)
327 326
328 327
329 328 MyStringIO = StringIO
330 329
331 330 _re_type = type(re.compile(r''))
332 331
333 332 notprinted_msg = """Did not find {0!r} in printed output (on {1}):
334 333 -------
335 334 {2!s}
336 335 -------
337 336 """
338 337
339 338 class AssertPrints(object):
340 339 """Context manager for testing that code prints certain text.
341 340
342 341 Examples
343 342 --------
344 343 >>> with AssertPrints("abc", suppress=False):
345 344 ... print("abcd")
346 345 ... print("def")
347 346 ...
348 347 abcd
349 348 def
350 349 """
351 350 def __init__(self, s, channel='stdout', suppress=True):
352 351 self.s = s
353 352 if isinstance(self.s, (str, _re_type)):
354 353 self.s = [self.s]
355 354 self.channel = channel
356 355 self.suppress = suppress
357 356
358 357 def __enter__(self):
359 358 self.orig_stream = getattr(sys, self.channel)
360 359 self.buffer = MyStringIO()
361 360 self.tee = Tee(self.buffer, channel=self.channel)
362 361 setattr(sys, self.channel, self.buffer if self.suppress else self.tee)
363 362
364 363 def __exit__(self, etype, value, traceback):
365 364 try:
366 365 if value is not None:
367 366 # If an error was raised, don't check anything else
368 367 return False
369 368 self.tee.flush()
370 369 setattr(sys, self.channel, self.orig_stream)
371 370 printed = self.buffer.getvalue()
372 371 for s in self.s:
373 372 if isinstance(s, _re_type):
374 373 assert s.search(printed), notprinted_msg.format(s.pattern, self.channel, printed)
375 374 else:
376 375 assert s in printed, notprinted_msg.format(s, self.channel, printed)
377 376 return False
378 377 finally:
379 378 self.tee.close()
380 379
381 380 printed_msg = """Found {0!r} in printed output (on {1}):
382 381 -------
383 382 {2!s}
384 383 -------
385 384 """
386 385
387 386 class AssertNotPrints(AssertPrints):
388 387 """Context manager for checking that certain output *isn't* produced.
389 388
390 389 Counterpart of AssertPrints"""
391 390 def __exit__(self, etype, value, traceback):
392 391 try:
393 392 if value is not None:
394 393 # If an error was raised, don't check anything else
395 394 self.tee.close()
396 395 return False
397 396 self.tee.flush()
398 397 setattr(sys, self.channel, self.orig_stream)
399 398 printed = self.buffer.getvalue()
400 399 for s in self.s:
401 400 if isinstance(s, _re_type):
402 401 assert not s.search(printed),printed_msg.format(
403 402 s.pattern, self.channel, printed)
404 403 else:
405 404 assert s not in printed, printed_msg.format(
406 405 s, self.channel, printed)
407 406 return False
408 407 finally:
409 408 self.tee.close()
410 409
411 410 @contextmanager
412 411 def mute_warn():
413 412 from IPython.utils import warn
414 413 save_warn = warn.warn
415 414 warn.warn = lambda *a, **kw: None
416 415 try:
417 416 yield
418 417 finally:
419 418 warn.warn = save_warn
420 419
421 420 @contextmanager
422 421 def make_tempfile(name):
423 422 """ Create an empty, named, temporary file for the duration of the context.
424 423 """
425 424 open(name, 'w').close()
426 425 try:
427 426 yield
428 427 finally:
429 428 os.unlink(name)
430 429
431 430 def fake_input(inputs):
432 431 """Temporarily replace the input() function to return the given values
433 432
434 433 Use as a context manager:
435 434
436 435 with fake_input(['result1', 'result2']):
437 436 ...
438 437
439 438 Values are returned in order. If input() is called again after the last value
440 439 was used, EOFError is raised.
441 440 """
442 441 it = iter(inputs)
443 442 def mock_input(prompt=''):
444 443 try:
445 444 return next(it)
446 445 except StopIteration:
447 446 raise EOFError('No more inputs given')
448 447
449 448 return patch('builtins.input', mock_input)
450 449
451 450 def help_output_test(subcommand=''):
452 451 """test that `ipython [subcommand] -h` works"""
453 452 cmd = get_ipython_cmd() + [subcommand, '-h']
454 453 out, err, rc = get_output_error_code(cmd)
455 454 nt.assert_equal(rc, 0, err)
456 455 nt.assert_not_in("Traceback", err)
457 456 nt.assert_in("Options", out)
458 457 nt.assert_in("--help-all", out)
459 458 return out, err
460 459
461 460
462 461 def help_all_output_test(subcommand=''):
463 462 """test that `ipython [subcommand] --help-all` works"""
464 463 cmd = get_ipython_cmd() + [subcommand, '--help-all']
465 464 out, err, rc = get_output_error_code(cmd)
466 465 nt.assert_equal(rc, 0, err)
467 466 nt.assert_not_in("Traceback", err)
468 467 nt.assert_in("Options", out)
469 468 nt.assert_in("Class", out)
470 469 return out, err
471 470
@@ -1,248 +1,248 b''
1 1 # encoding: utf-8
2 2 """
3 3 IO related utilities.
4 4 """
5 5
6 6 # Copyright (c) IPython Development Team.
7 7 # Distributed under the terms of the Modified BSD License.
8 8
9 9
10 10
11 11 import atexit
12 12 import os
13 13 import sys
14 14 import tempfile
15 15 import warnings
16 16 from warnings import warn
17 17
18 18 from IPython.utils.decorators import undoc
19 19 from .capture import CapturedIO, capture_output
20 20
21 21 @undoc
22 22 class IOStream:
23 23
24 24 def __init__(self, stream, fallback=None):
25 25 warn('IOStream is deprecated since IPython 5.0, use sys.{stdin,stdout,stderr} instead',
26 26 DeprecationWarning, stacklevel=2)
27 27 if not hasattr(stream,'write') or not hasattr(stream,'flush'):
28 28 if fallback is not None:
29 29 stream = fallback
30 30 else:
31 31 raise ValueError("fallback required, but not specified")
32 32 self.stream = stream
33 33 self._swrite = stream.write
34 34
35 35 # clone all methods not overridden:
36 36 def clone(meth):
37 37 return not hasattr(self, meth) and not meth.startswith('_')
38 38 for meth in filter(clone, dir(stream)):
39 39 try:
40 40 val = getattr(stream, meth)
41 41 except AttributeError:
42 42 pass
43 43 else:
44 44 setattr(self, meth, val)
45 45
46 46 def __repr__(self):
47 47 cls = self.__class__
48 48 tpl = '{mod}.{cls}({args})'
49 49 return tpl.format(mod=cls.__module__, cls=cls.__name__, args=self.stream)
50 50
51 51 def write(self,data):
52 52 warn('IOStream is deprecated since IPython 5.0, use sys.{stdin,stdout,stderr} instead',
53 53 DeprecationWarning, stacklevel=2)
54 54 try:
55 55 self._swrite(data)
56 56 except:
57 57 try:
58 58 # print handles some unicode issues which may trip a plain
59 59 # write() call. Emulate write() by using an empty end
60 60 # argument.
61 61 print(data, end='', file=self.stream)
62 62 except:
63 63 # if we get here, something is seriously broken.
64 64 print('ERROR - failed to write data to stream:', self.stream,
65 65 file=sys.stderr)
66 66
67 67 def writelines(self, lines):
68 68 warn('IOStream is deprecated since IPython 5.0, use sys.{stdin,stdout,stderr} instead',
69 69 DeprecationWarning, stacklevel=2)
70 70 if isinstance(lines, str):
71 71 lines = [lines]
72 72 for line in lines:
73 73 self.write(line)
74 74
75 75 # This class used to have a writeln method, but regular files and streams
76 76 # in Python don't have this method. We need to keep this completely
77 77 # compatible so we removed it.
78 78
79 79 @property
80 80 def closed(self):
81 81 return self.stream.closed
82 82
83 83 def close(self):
84 84 pass
85 85
86 86 # setup stdin/stdout/stderr to sys.stdin/sys.stdout/sys.stderr
87 87 devnull = open(os.devnull, 'w')
88 88 atexit.register(devnull.close)
89 89
90 90 # io.std* are deprecated, but don't show our own deprecation warnings
91 91 # during initialization of the deprecated API.
92 92 with warnings.catch_warnings():
93 93 warnings.simplefilter('ignore', DeprecationWarning)
94 94 stdin = IOStream(sys.stdin, fallback=devnull)
95 95 stdout = IOStream(sys.stdout, fallback=devnull)
96 96 stderr = IOStream(sys.stderr, fallback=devnull)
97 97
98 98 class Tee(object):
99 99 """A class to duplicate an output stream to stdout/err.
100 100
101 101 This works in a manner very similar to the Unix 'tee' command.
102 102
103 103 When the object is closed or deleted, it closes the original file given to
104 104 it for duplication.
105 105 """
106 106 # Inspired by:
107 107 # http://mail.python.org/pipermail/python-list/2007-May/442737.html
108 108
109 109 def __init__(self, file_or_name, mode="w", channel='stdout'):
110 110 """Construct a new Tee object.
111 111
112 112 Parameters
113 113 ----------
114 114 file_or_name : filename or open filehandle (writable)
115 115 File that will be duplicated
116 116
117 117 mode : optional, valid mode for open().
118 118 If a filename was give, open with this mode.
119 119
120 120 channel : str, one of ['stdout', 'stderr']
121 121 """
122 122 if channel not in ['stdout', 'stderr']:
123 123 raise ValueError('Invalid channel spec %s' % channel)
124 124
125 125 if hasattr(file_or_name, 'write') and hasattr(file_or_name, 'seek'):
126 126 self.file = file_or_name
127 127 else:
128 128 self.file = open(file_or_name, mode)
129 129 self.channel = channel
130 130 self.ostream = getattr(sys, channel)
131 131 setattr(sys, channel, self)
132 132 self._closed = False
133 133
134 134 def close(self):
135 135 """Close the file and restore the channel."""
136 136 self.flush()
137 137 setattr(sys, self.channel, self.ostream)
138 138 self.file.close()
139 139 self._closed = True
140 140
141 141 def write(self, data):
142 142 """Write data to both channels."""
143 143 self.file.write(data)
144 144 self.ostream.write(data)
145 145 self.ostream.flush()
146 146
147 147 def flush(self):
148 148 """Flush both channels."""
149 149 self.file.flush()
150 150 self.ostream.flush()
151 151
152 152 def __del__(self):
153 153 if not self._closed:
154 154 self.close()
155 155
156 156
157 157 def ask_yes_no(prompt, default=None, interrupt=None):
158 158 """Asks a question and returns a boolean (y/n) answer.
159 159
160 160 If default is given (one of 'y','n'), it is used if the user input is
161 161 empty. If interrupt is given (one of 'y','n'), it is used if the user
162 162 presses Ctrl-C. Otherwise the question is repeated until an answer is
163 163 given.
164 164
165 165 An EOF is treated as the default answer. If there is no default, an
166 166 exception is raised to prevent infinite loops.
167 167
168 168 Valid answers are: y/yes/n/no (match is not case sensitive)."""
169 169
170 170 answers = {'y':True,'n':False,'yes':True,'no':False}
171 171 ans = None
172 172 while ans not in answers.keys():
173 173 try:
174 174 ans = input(prompt+' ').lower()
175 175 if not ans: # response was an empty string
176 176 ans = default
177 177 except KeyboardInterrupt:
178 178 if interrupt:
179 179 ans = interrupt
180 180 print("\r")
181 181 except EOFError:
182 182 if default in answers.keys():
183 183 ans = default
184 184 print()
185 185 else:
186 186 raise
187 187
188 188 return answers[ans]
189 189
190 190
191 191 def temp_pyfile(src, ext='.py'):
192 192 """Make a temporary python file, return filename and filehandle.
193 193
194 194 Parameters
195 195 ----------
196 196 src : string or list of strings (no need for ending newlines if list)
197 197 Source code to be written to the file.
198 198
199 199 ext : optional, string
200 200 Extension for the generated file.
201 201
202 202 Returns
203 203 -------
204 204 (filename, open filehandle)
205 205 It is the caller's responsibility to close the open file and unlink it.
206 206 """
207 207 fname = tempfile.mkstemp(ext)[1]
208 f = open(fname,'w')
209 f.write(src)
210 f.flush()
211 return fname, f
208 with open(fname,'w') as f:
209 f.write(src)
210 f.flush()
211 return fname
212 212
213 213 @undoc
214 214 def atomic_writing(*args, **kwargs):
215 215 """DEPRECATED: moved to notebook.services.contents.fileio"""
216 216 warn("IPython.utils.io.atomic_writing has moved to notebook.services.contents.fileio since IPython 4.0", DeprecationWarning, stacklevel=2)
217 217 from notebook.services.contents.fileio import atomic_writing
218 218 return atomic_writing(*args, **kwargs)
219 219
220 220 @undoc
221 221 def raw_print(*args, **kw):
222 222 """DEPRECATED: Raw print to sys.__stdout__, otherwise identical interface to print()."""
223 223 warn("IPython.utils.io.raw_print has been deprecated since IPython 7.0", DeprecationWarning, stacklevel=2)
224 224
225 225 print(*args, sep=kw.get('sep', ' '), end=kw.get('end', '\n'),
226 226 file=sys.__stdout__)
227 227 sys.__stdout__.flush()
228 228
229 229 @undoc
230 230 def raw_print_err(*args, **kw):
231 231 """DEPRECATED: Raw print to sys.__stderr__, otherwise identical interface to print()."""
232 232 warn("IPython.utils.io.raw_print_err has been deprecated since IPython 7.0", DeprecationWarning, stacklevel=2)
233 233
234 234 print(*args, sep=kw.get('sep', ' '), end=kw.get('end', '\n'),
235 235 file=sys.__stderr__)
236 236 sys.__stderr__.flush()
237 237
238 238 # used by IPykernel <- 4.9. Removed during IPython 7-dev period and re-added
239 239 # Keep for a version or two then should remove
240 240 rprint = raw_print
241 241 rprinte = raw_print_err
242 242
243 243 @undoc
244 244 def unicode_std_stream(stream='stdout'):
245 245 """DEPRECATED, moved to nbconvert.utils.io"""
246 246 warn("IPython.utils.io.unicode_std_stream has moved to nbconvert.utils.io since IPython 4.0", DeprecationWarning, stacklevel=2)
247 247 from nbconvert.utils.io import unicode_std_stream
248 248 return unicode_std_stream(stream)
General Comments 0
You need to be logged in to leave comments. Login now