##// END OF EJS Templates
Mechanism for testing terminal interact loop in-process....
Thomas Kluyver -
Show More
@@ -1,203 +1,295 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for the key interactiveshell module.
3 3
4 4 Authors
5 5 -------
6 6 * Julian Taylor
7 7 """
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18 # stdlib
19 19 import sys
20 import types
20 21 import unittest
21 22
23 from IPython.core.inputtransformer import InputTransformer
22 24 from IPython.testing.decorators import skipif
23 25 from IPython.utils import py3compat
24 26 from IPython.testing import tools as tt
25 27
28 # Decorator for interaction loop tests -----------------------------------------
29
30 class mock_input_helper(object):
31 """Machinery for tests of the main interact loop.
32
33 Used by the mock_input decorator.
34 """
35 def __init__(self, testgen):
36 self.testgen = testgen
37 self.exception = None
38 self.ip = get_ipython()
39
40 def __enter__(self):
41 self.orig_raw_input = self.ip.raw_input
42 self.ip.raw_input = self.fake_input
43 return self
44
45 def __exit__(self, etype, value, tb):
46 self.ip.raw_input = self.orig_raw_input
47
48 def fake_input(self, prompt):
49 try:
50 return next(self.testgen)
51 except StopIteration:
52 self.ip.exit_now = True
53 return u''
54 except:
55 self.exception = sys.exc_info()
56 self.ip.exit_now = True
57 return u''
58
59 def mock_input(testfunc):
60 """Decorator for tests of the main interact loop.
61
62 Write the test as a generator, yield-ing the input strings, which IPython
63 will see as if they were typed in at the prompt.
64 """
65 def test_method(self):
66 testgen = testfunc(self)
67 with mock_input_helper(testgen) as mih:
68 mih.ip.interact(display_banner=False)
69
70 if mih.exception is not None:
71 # Re-raise captured exception
72 etype, value, tb = mih.exception
73 import traceback
74 traceback.print_tb(tb, file=sys.stdout)
75 del tb # Avoid reference loop
76 raise value
77
78 return test_method
79
80 # Test classes -----------------------------------------------------------------
81
26 82 class InteractiveShellTestCase(unittest.TestCase):
27 83 def rl_hist_entries(self, rl, n):
28 84 """Get last n readline history entries as a list"""
29 85 return [rl.get_history_item(rl.get_current_history_length() - x)
30 86 for x in range(n - 1, -1, -1)]
31 87
32 88 def test_runs_without_rl(self):
33 89 """Test that function does not throw without readline"""
34 90 ip = get_ipython()
35 91 ip.has_readline = False
36 92 ip.readline = None
37 93 ip._replace_rlhist_multiline(u'source', 0)
38 94
39 95 @skipif(not get_ipython().has_readline, 'no readline')
40 96 def test_runs_without_remove_history_item(self):
41 97 """Test that function does not throw on windows without
42 98 remove_history_item"""
43 99 ip = get_ipython()
44 100 if hasattr(ip.readline, 'remove_history_item'):
45 101 del ip.readline.remove_history_item
46 102 ip._replace_rlhist_multiline(u'source', 0)
47 103
48 104 @skipif(not get_ipython().has_readline, 'no readline')
49 105 @skipif(not hasattr(get_ipython().readline, 'remove_history_item'),
50 106 'no remove_history_item')
51 107 def test_replace_multiline_hist_disabled(self):
52 108 """Test that multiline replace does nothing if disabled"""
53 109 ip = get_ipython()
54 110 ip.multiline_history = False
55 111
56 112 ghist = [u'line1', u'line2']
57 113 for h in ghist:
58 114 ip.readline.add_history(h)
59 115 hlen_b4_cell = ip.readline.get_current_history_length()
60 116 hlen_b4_cell = ip._replace_rlhist_multiline(u'sourc€\nsource2',
61 117 hlen_b4_cell)
62 118
63 119 self.assertEqual(ip.readline.get_current_history_length(),
64 120 hlen_b4_cell)
65 121 hist = self.rl_hist_entries(ip.readline, 2)
66 122 self.assertEqual(hist, ghist)
67 123
68 124 @skipif(not get_ipython().has_readline, 'no readline')
69 125 @skipif(not hasattr(get_ipython().readline, 'remove_history_item'),
70 126 'no remove_history_item')
71 127 def test_replace_multiline_hist_adds(self):
72 128 """Test that multiline replace function adds history"""
73 129 ip = get_ipython()
74 130
75 131 hlen_b4_cell = ip.readline.get_current_history_length()
76 132 hlen_b4_cell = ip._replace_rlhist_multiline(u'sourc€', hlen_b4_cell)
77 133
78 134 self.assertEqual(hlen_b4_cell,
79 135 ip.readline.get_current_history_length())
80 136
81 137 @skipif(not get_ipython().has_readline, 'no readline')
82 138 @skipif(not hasattr(get_ipython().readline, 'remove_history_item'),
83 139 'no remove_history_item')
84 140 def test_replace_multiline_hist_keeps_history(self):
85 141 """Test that multiline replace does not delete history"""
86 142 ip = get_ipython()
87 143 ip.multiline_history = True
88 144
89 145 ghist = [u'line1', u'line2']
90 146 for h in ghist:
91 147 ip.readline.add_history(h)
92 148
93 149 #start cell
94 150 hlen_b4_cell = ip.readline.get_current_history_length()
95 151 # nothing added to rl history, should do nothing
96 152 hlen_b4_cell = ip._replace_rlhist_multiline(u'sourc€\nsource2',
97 153 hlen_b4_cell)
98 154
99 155 self.assertEqual(ip.readline.get_current_history_length(),
100 156 hlen_b4_cell)
101 157 hist = self.rl_hist_entries(ip.readline, 2)
102 158 self.assertEqual(hist, ghist)
103 159
104 160
105 161 @skipif(not get_ipython().has_readline, 'no readline')
106 162 @skipif(not hasattr(get_ipython().readline, 'remove_history_item'),
107 163 'no remove_history_item')
108 164 def test_replace_multiline_hist_replaces_twice(self):
109 165 """Test that multiline entries are replaced twice"""
110 166 ip = get_ipython()
111 167 ip.multiline_history = True
112 168
113 169 ip.readline.add_history(u'line0')
114 170 #start cell
115 171 hlen_b4_cell = ip.readline.get_current_history_length()
116 172 ip.readline.add_history('l€ne1')
117 173 ip.readline.add_history('line2')
118 174 #replace cell with single line
119 175 hlen_b4_cell = ip._replace_rlhist_multiline(u'l€ne1\nline2',
120 176 hlen_b4_cell)
121 177 ip.readline.add_history('l€ne3')
122 178 ip.readline.add_history('line4')
123 179 #replace cell with single line
124 180 hlen_b4_cell = ip._replace_rlhist_multiline(u'l€ne3\nline4',
125 181 hlen_b4_cell)
126 182
127 183 self.assertEqual(ip.readline.get_current_history_length(),
128 184 hlen_b4_cell)
129 185 hist = self.rl_hist_entries(ip.readline, 3)
130 186 expected = [u'line0', u'l€ne1\nline2', u'l€ne3\nline4']
131 187 # perform encoding, in case of casting due to ASCII locale
132 188 enc = sys.stdin.encoding or "utf-8"
133 189 expected = [ py3compat.unicode_to_str(e, enc) for e in expected ]
134 190 self.assertEqual(hist, expected)
135 191
136 192
137 193 @skipif(not get_ipython().has_readline, 'no readline')
138 194 @skipif(not hasattr(get_ipython().readline, 'remove_history_item'),
139 195 'no remove_history_item')
140 196 def test_replace_multiline_hist_replaces_empty_line(self):
141 197 """Test that multiline history skips empty line cells"""
142 198 ip = get_ipython()
143 199 ip.multiline_history = True
144 200
145 201 ip.readline.add_history(u'line0')
146 202 #start cell
147 203 hlen_b4_cell = ip.readline.get_current_history_length()
148 204 ip.readline.add_history('l€ne1')
149 205 ip.readline.add_history('line2')
150 206 hlen_b4_cell = ip._replace_rlhist_multiline(u'l€ne1\nline2',
151 207 hlen_b4_cell)
152 208 ip.readline.add_history('')
153 209 hlen_b4_cell = ip._replace_rlhist_multiline(u'', hlen_b4_cell)
154 210 ip.readline.add_history('l€ne3')
155 211 hlen_b4_cell = ip._replace_rlhist_multiline(u'l€ne3', hlen_b4_cell)
156 212 ip.readline.add_history(' ')
157 213 hlen_b4_cell = ip._replace_rlhist_multiline(' ', hlen_b4_cell)
158 214 ip.readline.add_history('\t')
159 215 ip.readline.add_history('\t ')
160 216 hlen_b4_cell = ip._replace_rlhist_multiline('\t', hlen_b4_cell)
161 217 ip.readline.add_history('line4')
162 218 hlen_b4_cell = ip._replace_rlhist_multiline(u'line4', hlen_b4_cell)
163 219
164 220 self.assertEqual(ip.readline.get_current_history_length(),
165 221 hlen_b4_cell)
166 222 hist = self.rl_hist_entries(ip.readline, 4)
167 223 # expect no empty cells in history
168 224 expected = [u'line0', u'l€ne1\nline2', u'l€ne3', u'line4']
169 225 # perform encoding, in case of casting due to ASCII locale
170 226 enc = sys.stdin.encoding or "utf-8"
171 227 expected = [ py3compat.unicode_to_str(e, enc) for e in expected ]
172 228 self.assertEqual(hist, expected)
173 229
230 @mock_input
231 def test_inputtransformer_syntaxerror(self):
232 ip = get_ipython()
233 transformer = SyntaxErrorTransformer()
234 ip.input_splitter.python_line_transforms.append(transformer)
235 ip.input_transformer_manager.python_line_transforms.append(transformer)
236
237 try:
238 #raise Exception
239 with tt.AssertPrints('4', suppress=False):
240 yield u'print(2*2)'
241
242 with tt.AssertPrints('SyntaxError: input contains', suppress=False):
243 yield u'print(2345) # syntaxerror'
244
245 with tt.AssertPrints('16', suppress=False):
246 yield u'print(4*4)'
247
248 finally:
249 ip.input_splitter.python_line_transforms.remove(transformer)
250 ip.input_transformer_manager.python_line_transforms.remove(transformer)
251
252
253 class SyntaxErrorTransformer(InputTransformer):
254 def push(self, line):
255 pos = line.find('syntaxerror')
256 if pos >= 0:
257 e = SyntaxError('input contains "syntaxerror"')
258 e.text = line
259 e.offset = pos + 1
260 raise e
261 return line
262
263 def reset(self):
264 pass
265
174 266 class TerminalMagicsTestCase(unittest.TestCase):
175 267 def test_paste_magics_message(self):
176 268 """Test that an IndentationError while using paste magics doesn't
177 269 trigger a message about paste magics and also the opposite."""
178 270
179 271 ip = get_ipython()
180 272 s = ('for a in range(5):\n'
181 273 'print(a)')
182 274
183 275 tm = ip.magics_manager.registry['TerminalMagics']
184 276 with tt.AssertPrints("If you want to paste code into IPython, try the "
185 277 "%paste and %cpaste magic functions."):
186 278 ip.run_cell(s)
187 279
188 280 with tt.AssertNotPrints("If you want to paste code into IPython, try the "
189 281 "%paste and %cpaste magic functions."):
190 282 tm.store_or_execute(s, name=None)
191 283
192 284 def test_paste_magics_blankline(self):
193 285 """Test that code with a blank line doesn't get split (gh-3246)."""
194 286 ip = get_ipython()
195 287 s = ('def pasted_func(a):\n'
196 288 ' b = a+1\n'
197 289 '\n'
198 290 ' return b')
199 291
200 292 tm = ip.magics_manager.registry['TerminalMagics']
201 293 tm.store_or_execute(s, name=None)
202 294
203 295 self.assertEqual(ip.user_ns['pasted_func'](54), 55)
@@ -1,451 +1,444 b''
1 1 """Generic testing tools.
2 2
3 3 Authors
4 4 -------
5 5 - Fernando Perez <Fernando.Perez@berkeley.edu>
6 6 """
7 7
8 8 from __future__ import absolute_import
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Copyright (C) 2009 The IPython Development Team
12 12 #
13 13 # Distributed under the terms of the BSD License. The full license is in
14 14 # the file COPYING, distributed as part of this software.
15 15 #-----------------------------------------------------------------------------
16 16
17 17 #-----------------------------------------------------------------------------
18 18 # Imports
19 19 #-----------------------------------------------------------------------------
20 20
21 21 import os
22 22 import re
23 23 import sys
24 24 import tempfile
25 25
26 26 from contextlib import contextmanager
27 27 from io import StringIO
28 28 from subprocess import Popen, PIPE
29 29
30 30 try:
31 31 # These tools are used by parts of the runtime, so we make the nose
32 32 # dependency optional at this point. Nose is a hard dependency to run the
33 33 # test suite, but NOT to use ipython itself.
34 34 import nose.tools as nt
35 35 has_nose = True
36 36 except ImportError:
37 37 has_nose = False
38 38
39 39 from IPython.config.loader import Config
40 40 from IPython.utils.process import get_output_error_code
41 41 from IPython.utils.text import list_strings
42 42 from IPython.utils.io import temp_pyfile, Tee
43 43 from IPython.utils import py3compat
44 44 from IPython.utils.encoding import DEFAULT_ENCODING
45 45
46 46 from . import decorators as dec
47 47 from . import skipdoctest
48 48
49 49 #-----------------------------------------------------------------------------
50 50 # Functions and classes
51 51 #-----------------------------------------------------------------------------
52 52
53 53 # The docstring for full_path doctests differently on win32 (different path
54 54 # separator) so just skip the doctest there. The example remains informative.
55 55 doctest_deco = skipdoctest.skip_doctest if sys.platform == 'win32' else dec.null_deco
56 56
57 57 @doctest_deco
58 58 def full_path(startPath,files):
59 59 """Make full paths for all the listed files, based on startPath.
60 60
61 61 Only the base part of startPath is kept, since this routine is typically
62 62 used with a script's __file__ variable as startPath. The base of startPath
63 63 is then prepended to all the listed files, forming the output list.
64 64
65 65 Parameters
66 66 ----------
67 67 startPath : string
68 68 Initial path to use as the base for the results. This path is split
69 69 using os.path.split() and only its first component is kept.
70 70
71 71 files : string or list
72 72 One or more files.
73 73
74 74 Examples
75 75 --------
76 76
77 77 >>> full_path('/foo/bar.py',['a.txt','b.txt'])
78 78 ['/foo/a.txt', '/foo/b.txt']
79 79
80 80 >>> full_path('/foo',['a.txt','b.txt'])
81 81 ['/a.txt', '/b.txt']
82 82
83 83 If a single file is given, the output is still a list:
84 84 >>> full_path('/foo','a.txt')
85 85 ['/a.txt']
86 86 """
87 87
88 88 files = list_strings(files)
89 89 base = os.path.split(startPath)[0]
90 90 return [ os.path.join(base,f) for f in files ]
91 91
92 92
93 93 def parse_test_output(txt):
94 94 """Parse the output of a test run and return errors, failures.
95 95
96 96 Parameters
97 97 ----------
98 98 txt : str
99 99 Text output of a test run, assumed to contain a line of one of the
100 100 following forms::
101 101
102 102 'FAILED (errors=1)'
103 103 'FAILED (failures=1)'
104 104 'FAILED (errors=1, failures=1)'
105 105
106 106 Returns
107 107 -------
108 108 nerr, nfail: number of errors and failures.
109 109 """
110 110
111 111 err_m = re.search(r'^FAILED \(errors=(\d+)\)', txt, re.MULTILINE)
112 112 if err_m:
113 113 nerr = int(err_m.group(1))
114 114 nfail = 0
115 115 return nerr, nfail
116 116
117 117 fail_m = re.search(r'^FAILED \(failures=(\d+)\)', txt, re.MULTILINE)
118 118 if fail_m:
119 119 nerr = 0
120 120 nfail = int(fail_m.group(1))
121 121 return nerr, nfail
122 122
123 123 both_m = re.search(r'^FAILED \(errors=(\d+), failures=(\d+)\)', txt,
124 124 re.MULTILINE)
125 125 if both_m:
126 126 nerr = int(both_m.group(1))
127 127 nfail = int(both_m.group(2))
128 128 return nerr, nfail
129 129
130 130 # If the input didn't match any of these forms, assume no error/failures
131 131 return 0, 0
132 132
133 133
134 134 # So nose doesn't think this is a test
135 135 parse_test_output.__test__ = False
136 136
137 137
138 138 def default_argv():
139 139 """Return a valid default argv for creating testing instances of ipython"""
140 140
141 141 return ['--quick', # so no config file is loaded
142 142 # Other defaults to minimize side effects on stdout
143 143 '--colors=NoColor', '--no-term-title','--no-banner',
144 144 '--autocall=0']
145 145
146 146
147 147 def default_config():
148 148 """Return a config object with good defaults for testing."""
149 149 config = Config()
150 150 config.TerminalInteractiveShell.colors = 'NoColor'
151 151 config.TerminalTerminalInteractiveShell.term_title = False,
152 152 config.TerminalInteractiveShell.autocall = 0
153 153 config.HistoryManager.hist_file = tempfile.mktemp(u'test_hist.sqlite')
154 154 config.HistoryManager.db_cache_size = 10000
155 155 return config
156 156
157 157
158 158 def get_ipython_cmd(as_string=False):
159 159 """
160 160 Return appropriate IPython command line name. By default, this will return
161 161 a list that can be used with subprocess.Popen, for example, but passing
162 162 `as_string=True` allows for returning the IPython command as a string.
163 163
164 164 Parameters
165 165 ----------
166 166 as_string: bool
167 167 Flag to allow to return the command as a string.
168 168 """
169 169 ipython_cmd = [sys.executable, "-m", "IPython"]
170 170
171 171 if as_string:
172 172 ipython_cmd = " ".join(ipython_cmd)
173 173
174 174 return ipython_cmd
175 175
176 def ipexec(fname, options=None, pipe=False):
176 def ipexec(fname, options=None):
177 177 """Utility to call 'ipython filename'.
178 178
179 179 Starts IPython with a minimal and safe configuration to make startup as fast
180 180 as possible.
181 181
182 182 Note that this starts IPython in a subprocess!
183 183
184 184 Parameters
185 185 ----------
186 186 fname : str
187 187 Name of file to be executed (should have .py or .ipy extension).
188 188
189 189 options : optional, list
190 190 Extra command-line flags to be passed to IPython.
191 191
192 pipe : optional, boolean
193 Pipe fname into IPython as stdin instead of calling it as external file
194
195 192 Returns
196 193 -------
197 194 (stdout, stderr) of ipython subprocess.
198 195 """
199 196 if options is None: options = []
200 197
201 198 # For these subprocess calls, eliminate all prompt printing so we only see
202 199 # output from script execution
203 200 prompt_opts = [ '--PromptManager.in_template=""',
204 201 '--PromptManager.in2_template=""',
205 202 '--PromptManager.out_template=""'
206 203 ]
207 204 cmdargs = default_argv() + prompt_opts + options
208 205
209 206 test_dir = os.path.dirname(__file__)
210 207
211 208 ipython_cmd = get_ipython_cmd()
212 209 # Absolute path for filename
213 210 full_fname = os.path.join(test_dir, fname)
214 if pipe:
215 full_cmd = ipython_cmd + cmdargs
216 p = Popen(full_cmd, stdin=open(full_fname), stdout=PIPE, stderr=PIPE)
217 else:
218 211 full_cmd = ipython_cmd + cmdargs + [full_fname]
219 212 p = Popen(full_cmd, stdout=PIPE, stderr=PIPE)
220 213 out, err = p.communicate()
221 214 out, err = py3compat.bytes_to_str(out), py3compat.bytes_to_str(err)
222 215 # `import readline` causes 'ESC[?1034h' to be output sometimes,
223 216 # so strip that out before doing comparisons
224 217 if out:
225 218 out = re.sub(r'\x1b\[[^h]+h', '', out)
226 219 return out, err
227 220
228 221
229 222 def ipexec_validate(fname, expected_out, expected_err='',
230 223 options=None):
231 224 """Utility to call 'ipython filename' and validate output/error.
232 225
233 226 This function raises an AssertionError if the validation fails.
234 227
235 228 Note that this starts IPython in a subprocess!
236 229
237 230 Parameters
238 231 ----------
239 232 fname : str
240 233 Name of the file to be executed (should have .py or .ipy extension).
241 234
242 235 expected_out : str
243 236 Expected stdout of the process.
244 237
245 238 expected_err : optional, str
246 239 Expected stderr of the process.
247 240
248 241 options : optional, list
249 242 Extra command-line flags to be passed to IPython.
250 243
251 244 Returns
252 245 -------
253 246 None
254 247 """
255 248
256 249 import nose.tools as nt
257 250
258 251 out, err = ipexec(fname, options)
259 252 #print 'OUT', out # dbg
260 253 #print 'ERR', err # dbg
261 254 # If there are any errors, we must check those befor stdout, as they may be
262 255 # more informative than simply having an empty stdout.
263 256 if err:
264 257 if expected_err:
265 258 nt.assert_equal("\n".join(err.strip().splitlines()), "\n".join(expected_err.strip().splitlines()))
266 259 else:
267 260 raise ValueError('Running file %r produced error: %r' %
268 261 (fname, err))
269 262 # If no errors or output on stderr was expected, match stdout
270 263 nt.assert_equal("\n".join(out.strip().splitlines()), "\n".join(expected_out.strip().splitlines()))
271 264
272 265
273 266 class TempFileMixin(object):
274 267 """Utility class to create temporary Python/IPython files.
275 268
276 269 Meant as a mixin class for test cases."""
277 270
278 271 def mktmp(self, src, ext='.py'):
279 272 """Make a valid python temp file."""
280 273 fname, f = temp_pyfile(src, ext)
281 274 self.tmpfile = f
282 275 self.fname = fname
283 276
284 277 def tearDown(self):
285 278 if hasattr(self, 'tmpfile'):
286 279 # If the tmpfile wasn't made because of skipped tests, like in
287 280 # win32, there's nothing to cleanup.
288 281 self.tmpfile.close()
289 282 try:
290 283 os.unlink(self.fname)
291 284 except:
292 285 # On Windows, even though we close the file, we still can't
293 286 # delete it. I have no clue why
294 287 pass
295 288
296 289 pair_fail_msg = ("Testing {0}\n\n"
297 290 "In:\n"
298 291 " {1!r}\n"
299 292 "Expected:\n"
300 293 " {2!r}\n"
301 294 "Got:\n"
302 295 " {3!r}\n")
303 296 def check_pairs(func, pairs):
304 297 """Utility function for the common case of checking a function with a
305 298 sequence of input/output pairs.
306 299
307 300 Parameters
308 301 ----------
309 302 func : callable
310 303 The function to be tested. Should accept a single argument.
311 304 pairs : iterable
312 305 A list of (input, expected_output) tuples.
313 306
314 307 Returns
315 308 -------
316 309 None. Raises an AssertionError if any output does not match the expected
317 310 value.
318 311 """
319 312 name = getattr(func, "func_name", getattr(func, "__name__", "<unknown>"))
320 313 for inp, expected in pairs:
321 314 out = func(inp)
322 315 assert out == expected, pair_fail_msg.format(name, inp, expected, out)
323 316
324 317
325 318 if py3compat.PY3:
326 319 MyStringIO = StringIO
327 320 else:
328 321 # In Python 2, stdout/stderr can have either bytes or unicode written to them,
329 322 # so we need a class that can handle both.
330 323 class MyStringIO(StringIO):
331 324 def write(self, s):
332 325 s = py3compat.cast_unicode(s, encoding=DEFAULT_ENCODING)
333 326 super(MyStringIO, self).write(s)
334 327
335 328 notprinted_msg = """Did not find {0!r} in printed output (on {1}):
336 329 -------
337 330 {2!s}
338 331 -------
339 332 """
340 333
341 334 class AssertPrints(object):
342 335 """Context manager for testing that code prints certain text.
343 336
344 337 Examples
345 338 --------
346 339 >>> with AssertPrints("abc", suppress=False):
347 340 ... print("abcd")
348 341 ... print("def")
349 342 ...
350 343 abcd
351 344 def
352 345 """
353 346 def __init__(self, s, channel='stdout', suppress=True):
354 347 self.s = s
355 348 if isinstance(self.s, py3compat.string_types):
356 349 self.s = [self.s]
357 350 self.channel = channel
358 351 self.suppress = suppress
359 352
360 353 def __enter__(self):
361 354 self.orig_stream = getattr(sys, self.channel)
362 355 self.buffer = MyStringIO()
363 356 self.tee = Tee(self.buffer, channel=self.channel)
364 357 setattr(sys, self.channel, self.buffer if self.suppress else self.tee)
365 358
366 359 def __exit__(self, etype, value, traceback):
367 360 if value is not None:
368 361 # If an error was raised, don't check anything else
369 362 return False
370 363 self.tee.flush()
371 364 setattr(sys, self.channel, self.orig_stream)
372 365 printed = self.buffer.getvalue()
373 366 for s in self.s:
374 367 assert s in printed, notprinted_msg.format(s, self.channel, printed)
375 368 return False
376 369
377 370 printed_msg = """Found {0!r} in printed output (on {1}):
378 371 -------
379 372 {2!s}
380 373 -------
381 374 """
382 375
383 376 class AssertNotPrints(AssertPrints):
384 377 """Context manager for checking that certain output *isn't* produced.
385 378
386 379 Counterpart of AssertPrints"""
387 380 def __exit__(self, etype, value, traceback):
388 381 if value is not None:
389 382 # If an error was raised, don't check anything else
390 383 return False
391 384 self.tee.flush()
392 385 setattr(sys, self.channel, self.orig_stream)
393 386 printed = self.buffer.getvalue()
394 387 for s in self.s:
395 388 assert s not in printed, printed_msg.format(s, self.channel, printed)
396 389 return False
397 390
398 391 @contextmanager
399 392 def mute_warn():
400 393 from IPython.utils import warn
401 394 save_warn = warn.warn
402 395 warn.warn = lambda *a, **kw: None
403 396 try:
404 397 yield
405 398 finally:
406 399 warn.warn = save_warn
407 400
408 401 @contextmanager
409 402 def make_tempfile(name):
410 403 """ Create an empty, named, temporary file for the duration of the context.
411 404 """
412 405 f = open(name, 'w')
413 406 f.close()
414 407 try:
415 408 yield
416 409 finally:
417 410 os.unlink(name)
418 411
419 412
420 413 @contextmanager
421 414 def monkeypatch(obj, name, attr):
422 415 """
423 416 Context manager to replace attribute named `name` in `obj` with `attr`.
424 417 """
425 418 orig = getattr(obj, name)
426 419 setattr(obj, name, attr)
427 420 yield
428 421 setattr(obj, name, orig)
429 422
430 423
431 424 def help_output_test(subcommand=''):
432 425 """test that `ipython [subcommand] -h` works"""
433 426 cmd = ' '.join(get_ipython_cmd() + [subcommand, '-h'])
434 427 out, err, rc = get_output_error_code(cmd)
435 428 nt.assert_equal(rc, 0, err)
436 429 nt.assert_not_in("Traceback", err)
437 430 nt.assert_in("Options", out)
438 431 nt.assert_in("--help-all", out)
439 432 return out, err
440 433
441 434
442 435 def help_all_output_test(subcommand=''):
443 436 """test that `ipython [subcommand] --help-all` works"""
444 437 cmd = ' '.join(get_ipython_cmd() + [subcommand, '--help-all'])
445 438 out, err, rc = get_output_error_code(cmd)
446 439 nt.assert_equal(rc, 0, err)
447 440 nt.assert_not_in("Traceback", err)
448 441 nt.assert_in("Options", out)
449 442 nt.assert_in("Class parameters", out)
450 443 return out, err
451 444
@@ -1,229 +1,234 b''
1 1 # encoding: utf-8
2 2 """
3 3 IO related utilities.
4 4 """
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008-2011 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12 from __future__ import print_function
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17 import os
18 18 import sys
19 19 import tempfile
20 20 from .capture import CapturedIO, capture_output
21 21 from .py3compat import string_types, input
22 22
23 23 #-----------------------------------------------------------------------------
24 24 # Code
25 25 #-----------------------------------------------------------------------------
26 26
27 27
28 28 class IOStream:
29 29
30 30 def __init__(self,stream, fallback=None):
31 31 if not hasattr(stream,'write') or not hasattr(stream,'flush'):
32 32 if fallback is not None:
33 33 stream = fallback
34 34 else:
35 35 raise ValueError("fallback required, but not specified")
36 36 self.stream = stream
37 37 self._swrite = stream.write
38 38
39 39 # clone all methods not overridden:
40 40 def clone(meth):
41 41 return not hasattr(self, meth) and not meth.startswith('_')
42 42 for meth in filter(clone, dir(stream)):
43 43 setattr(self, meth, getattr(stream, meth))
44 44
45 def __repr__(self):
46 cls = self.__class__
47 tpl = '{mod}.{cls}({args})'
48 return tpl.format(mod=cls.__module__, cls=cls.__name__, args=self.stream)
49
45 50 def write(self,data):
46 51 try:
47 52 self._swrite(data)
48 53 except:
49 54 try:
50 55 # print handles some unicode issues which may trip a plain
51 56 # write() call. Emulate write() by using an empty end
52 57 # argument.
53 58 print(data, end='', file=self.stream)
54 59 except:
55 60 # if we get here, something is seriously broken.
56 61 print('ERROR - failed to write data to stream:', self.stream,
57 62 file=sys.stderr)
58 63
59 64 def writelines(self, lines):
60 65 if isinstance(lines, string_types):
61 66 lines = [lines]
62 67 for line in lines:
63 68 self.write(line)
64 69
65 70 # This class used to have a writeln method, but regular files and streams
66 71 # in Python don't have this method. We need to keep this completely
67 72 # compatible so we removed it.
68 73
69 74 @property
70 75 def closed(self):
71 76 return self.stream.closed
72 77
73 78 def close(self):
74 79 pass
75 80
76 81 # setup stdin/stdout/stderr to sys.stdin/sys.stdout/sys.stderr
77 82 devnull = open(os.devnull, 'a')
78 83 stdin = IOStream(sys.stdin, fallback=devnull)
79 84 stdout = IOStream(sys.stdout, fallback=devnull)
80 85 stderr = IOStream(sys.stderr, fallback=devnull)
81 86
82 87 class IOTerm:
83 88 """ Term holds the file or file-like objects for handling I/O operations.
84 89
85 90 These are normally just sys.stdin, sys.stdout and sys.stderr but for
86 91 Windows they can can replaced to allow editing the strings before they are
87 92 displayed."""
88 93
89 94 # In the future, having IPython channel all its I/O operations through
90 95 # this class will make it easier to embed it into other environments which
91 96 # are not a normal terminal (such as a GUI-based shell)
92 97 def __init__(self, stdin=None, stdout=None, stderr=None):
93 98 mymodule = sys.modules[__name__]
94 99 self.stdin = IOStream(stdin, mymodule.stdin)
95 100 self.stdout = IOStream(stdout, mymodule.stdout)
96 101 self.stderr = IOStream(stderr, mymodule.stderr)
97 102
98 103
99 104 class Tee(object):
100 105 """A class to duplicate an output stream to stdout/err.
101 106
102 107 This works in a manner very similar to the Unix 'tee' command.
103 108
104 109 When the object is closed or deleted, it closes the original file given to
105 110 it for duplication.
106 111 """
107 112 # Inspired by:
108 113 # http://mail.python.org/pipermail/python-list/2007-May/442737.html
109 114
110 115 def __init__(self, file_or_name, mode="w", channel='stdout'):
111 116 """Construct a new Tee object.
112 117
113 118 Parameters
114 119 ----------
115 120 file_or_name : filename or open filehandle (writable)
116 121 File that will be duplicated
117 122
118 123 mode : optional, valid mode for open().
119 124 If a filename was give, open with this mode.
120 125
121 126 channel : str, one of ['stdout', 'stderr']
122 127 """
123 128 if channel not in ['stdout', 'stderr']:
124 129 raise ValueError('Invalid channel spec %s' % channel)
125 130
126 131 if hasattr(file_or_name, 'write') and hasattr(file_or_name, 'seek'):
127 132 self.file = file_or_name
128 133 else:
129 134 self.file = open(file_or_name, mode)
130 135 self.channel = channel
131 136 self.ostream = getattr(sys, channel)
132 137 setattr(sys, channel, self)
133 138 self._closed = False
134 139
135 140 def close(self):
136 141 """Close the file and restore the channel."""
137 142 self.flush()
138 143 setattr(sys, self.channel, self.ostream)
139 144 self.file.close()
140 145 self._closed = True
141 146
142 147 def write(self, data):
143 148 """Write data to both channels."""
144 149 self.file.write(data)
145 150 self.ostream.write(data)
146 151 self.ostream.flush()
147 152
148 153 def flush(self):
149 154 """Flush both channels."""
150 155 self.file.flush()
151 156 self.ostream.flush()
152 157
153 158 def __del__(self):
154 159 if not self._closed:
155 160 self.close()
156 161
157 162
158 163 def ask_yes_no(prompt,default=None):
159 164 """Asks a question and returns a boolean (y/n) answer.
160 165
161 166 If default is given (one of 'y','n'), it is used if the user input is
162 167 empty. Otherwise the question is repeated until an answer is given.
163 168
164 169 An EOF is treated as the default answer. If there is no default, an
165 170 exception is raised to prevent infinite loops.
166 171
167 172 Valid answers are: y/yes/n/no (match is not case sensitive)."""
168 173
169 174 answers = {'y':True,'n':False,'yes':True,'no':False}
170 175 ans = None
171 176 while ans not in answers.keys():
172 177 try:
173 178 ans = input(prompt+' ').lower()
174 179 if not ans: # response was an empty string
175 180 ans = default
176 181 except KeyboardInterrupt:
177 182 pass
178 183 except EOFError:
179 184 if default in answers.keys():
180 185 ans = default
181 186 print()
182 187 else:
183 188 raise
184 189
185 190 return answers[ans]
186 191
187 192
188 193 def temp_pyfile(src, ext='.py'):
189 194 """Make a temporary python file, return filename and filehandle.
190 195
191 196 Parameters
192 197 ----------
193 198 src : string or list of strings (no need for ending newlines if list)
194 199 Source code to be written to the file.
195 200
196 201 ext : optional, string
197 202 Extension for the generated file.
198 203
199 204 Returns
200 205 -------
201 206 (filename, open filehandle)
202 207 It is the caller's responsibility to close the open file and unlink it.
203 208 """
204 209 fname = tempfile.mkstemp(ext)[1]
205 210 f = open(fname,'w')
206 211 f.write(src)
207 212 f.flush()
208 213 return fname, f
209 214
210 215
211 216 def raw_print(*args, **kw):
212 217 """Raw print to sys.__stdout__, otherwise identical interface to print()."""
213 218
214 219 print(*args, sep=kw.get('sep', ' '), end=kw.get('end', '\n'),
215 220 file=sys.__stdout__)
216 221 sys.__stdout__.flush()
217 222
218 223
219 224 def raw_print_err(*args, **kw):
220 225 """Raw print to sys.__stderr__, otherwise identical interface to print()."""
221 226
222 227 print(*args, sep=kw.get('sep', ' '), end=kw.get('end', '\n'),
223 228 file=sys.__stderr__)
224 229 sys.__stderr__.flush()
225 230
226 231
227 232 # Short aliases for quick debugging, do NOT use these in production code.
228 233 rprint = raw_print
229 234 rprinte = raw_print_err
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now