##// END OF EJS Templates
Merge pull request #13289 from Kojoley/deprecated-asyncio.get_event_loop...
Matthias Bussonnier -
r27089:120bcdcb merge
parent child Browse files
Show More
@@ -1,172 +1,172
1 1 """
2 2 Async helper function that are invalid syntax on Python 3.5 and below.
3 3
4 4 This code is best effort, and may have edge cases not behaving as expected. In
5 5 particular it contain a number of heuristics to detect whether code is
6 6 effectively async and need to run in an event loop or not.
7 7
8 8 Some constructs (like top-level `return`, or `yield`) are taken care of
9 9 explicitly to actually raise a SyntaxError and stay as close as possible to
10 10 Python semantics.
11 11 """
12 12
13 13
14 14 import ast
15 15 import sys
16 16 import inspect
17 17 from textwrap import dedent, indent
18 18
19 19
20 20 class _AsyncIORunner:
21 21
22 22 def __call__(self, coro):
23 23 """
24 24 Handler for asyncio autoawait
25 25 """
26 26 import asyncio
27 27
28 return asyncio.get_event_loop().run_until_complete(coro)
28 return asyncio.get_event_loop_policy().get_event_loop().run_until_complete(coro)
29 29
30 30 def __str__(self):
31 31 return 'asyncio'
32 32
33 33 _asyncio_runner = _AsyncIORunner()
34 34
35 35
36 36 def _curio_runner(coroutine):
37 37 """
38 38 handler for curio autoawait
39 39 """
40 40 import curio
41 41
42 42 return curio.run(coroutine)
43 43
44 44
45 45 def _trio_runner(async_fn):
46 46 import trio
47 47
48 48 async def loc(coro):
49 49 """
50 50 We need the dummy no-op async def to protect from
51 51 trio's internal. See https://github.com/python-trio/trio/issues/89
52 52 """
53 53 return await coro
54 54
55 55 return trio.run(loc, async_fn)
56 56
57 57
58 58 def _pseudo_sync_runner(coro):
59 59 """
60 60 A runner that does not really allow async execution, and just advance the coroutine.
61 61
62 62 See discussion in https://github.com/python-trio/trio/issues/608,
63 63
64 64 Credit to Nathaniel Smith
65 65 """
66 66 try:
67 67 coro.send(None)
68 68 except StopIteration as exc:
69 69 return exc.value
70 70 else:
71 71 # TODO: do not raise but return an execution result with the right info.
72 72 raise RuntimeError(
73 73 "{coro_name!r} needs a real async loop".format(coro_name=coro.__name__)
74 74 )
75 75
76 76
77 77 def _asyncify(code: str) -> str:
78 78 """wrap code in async def definition.
79 79
80 80 And setup a bit of context to run it later.
81 81 """
82 82 res = dedent(
83 83 """
84 84 async def __wrapper__():
85 85 try:
86 86 {usercode}
87 87 finally:
88 88 locals()
89 89 """
90 90 ).format(usercode=indent(code, " " * 8))
91 91 return res
92 92
93 93
94 94 class _AsyncSyntaxErrorVisitor(ast.NodeVisitor):
95 95 """
96 96 Find syntax errors that would be an error in an async repl, but because
97 97 the implementation involves wrapping the repl in an async function, it
98 98 is erroneously allowed (e.g. yield or return at the top level)
99 99 """
100 100 def __init__(self):
101 101 if sys.version_info >= (3,8):
102 102 raise ValueError('DEPRECATED in Python 3.8+')
103 103 self.depth = 0
104 104 super().__init__()
105 105
106 106 def generic_visit(self, node):
107 107 func_types = (ast.FunctionDef, ast.AsyncFunctionDef)
108 108 invalid_types_by_depth = {
109 109 0: (ast.Return, ast.Yield, ast.YieldFrom),
110 110 1: (ast.Nonlocal,)
111 111 }
112 112
113 113 should_traverse = self.depth < max(invalid_types_by_depth.keys())
114 114 if isinstance(node, func_types) and should_traverse:
115 115 self.depth += 1
116 116 super().generic_visit(node)
117 117 self.depth -= 1
118 118 elif isinstance(node, invalid_types_by_depth[self.depth]):
119 119 raise SyntaxError()
120 120 else:
121 121 super().generic_visit(node)
122 122
123 123
124 124 def _async_parse_cell(cell: str) -> ast.AST:
125 125 """
126 126 This is a compatibility shim for pre-3.7 when async outside of a function
127 127 is a syntax error at the parse stage.
128 128
129 129 It will return an abstract syntax tree parsed as if async and await outside
130 130 of a function were not a syntax error.
131 131 """
132 132 if sys.version_info < (3, 7):
133 133 # Prior to 3.7 you need to asyncify before parse
134 134 wrapped_parse_tree = ast.parse(_asyncify(cell))
135 135 return wrapped_parse_tree.body[0].body[0]
136 136 else:
137 137 return ast.parse(cell)
138 138
139 139
140 140 def _should_be_async(cell: str) -> bool:
141 141 """Detect if a block of code need to be wrapped in an `async def`
142 142
143 143 Attempt to parse the block of code, it it compile we're fine.
144 144 Otherwise we wrap if and try to compile.
145 145
146 146 If it works, assume it should be async. Otherwise Return False.
147 147
148 148 Not handled yet: If the block of code has a return statement as the top
149 149 level, it will be seen as async. This is a know limitation.
150 150 """
151 151 if sys.version_info > (3, 8):
152 152 try:
153 153 code = compile(cell, "<>", "exec", flags=getattr(ast,'PyCF_ALLOW_TOP_LEVEL_AWAIT', 0x0))
154 154 return inspect.CO_COROUTINE & code.co_flags == inspect.CO_COROUTINE
155 155 except (SyntaxError, MemoryError):
156 156 return False
157 157 try:
158 158 # we can't limit ourself to ast.parse, as it __accepts__ to parse on
159 159 # 3.7+, but just does not _compile_
160 160 code = compile(cell, "<>", "exec")
161 161 except (SyntaxError, MemoryError):
162 162 try:
163 163 parse_tree = _async_parse_cell(cell)
164 164
165 165 # Raise a SyntaxError if there are top-level return or yields
166 166 v = _AsyncSyntaxErrorVisitor()
167 167 v.visit(parse_tree)
168 168
169 169 except (SyntaxError, MemoryError):
170 170 return False
171 171 return True
172 172 return False
@@ -1,355 +1,355
1 1 """Magic functions for running cells in various scripts."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import asyncio
7 7 import atexit
8 8 import errno
9 9 import functools
10 10 import os
11 11 import signal
12 12 import sys
13 13 import time
14 14 from contextlib import contextmanager
15 15 from subprocess import CalledProcessError
16 16
17 17 from traitlets import Dict, List, default
18 18
19 19 from IPython.core import magic_arguments
20 20 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
21 21 from IPython.lib.backgroundjobs import BackgroundJobManager
22 22 from IPython.utils.process import arg_split
23 23
24 24 #-----------------------------------------------------------------------------
25 25 # Magic implementation classes
26 26 #-----------------------------------------------------------------------------
27 27
28 28 def script_args(f):
29 29 """single decorator for adding script args"""
30 30 args = [
31 31 magic_arguments.argument(
32 32 '--out', type=str,
33 33 help="""The variable in which to store stdout from the script.
34 34 If the script is backgrounded, this will be the stdout *pipe*,
35 35 instead of the stderr text itself and will not be auto closed.
36 36 """
37 37 ),
38 38 magic_arguments.argument(
39 39 '--err', type=str,
40 40 help="""The variable in which to store stderr from the script.
41 41 If the script is backgrounded, this will be the stderr *pipe*,
42 42 instead of the stderr text itself and will not be autoclosed.
43 43 """
44 44 ),
45 45 magic_arguments.argument(
46 46 '--bg', action="store_true",
47 47 help="""Whether to run the script in the background.
48 48 If given, the only way to see the output of the command is
49 49 with --out/err.
50 50 """
51 51 ),
52 52 magic_arguments.argument(
53 53 '--proc', type=str,
54 54 help="""The variable in which to store Popen instance.
55 55 This is used only when --bg option is given.
56 56 """
57 57 ),
58 58 magic_arguments.argument(
59 59 '--no-raise-error', action="store_false", dest='raise_error',
60 60 help="""Whether you should raise an error message in addition to
61 61 a stream on stderr if you get a nonzero exit code.
62 62 """
63 63 )
64 64 ]
65 65 for arg in args:
66 66 f = arg(f)
67 67 return f
68 68
69 69
70 70 @contextmanager
71 71 def safe_watcher():
72 72 if sys.platform == "win32":
73 73 yield
74 74 return
75 75
76 76 from asyncio import SafeChildWatcher
77 77
78 78 policy = asyncio.get_event_loop_policy()
79 79 old_watcher = policy.get_child_watcher()
80 80 if isinstance(old_watcher, SafeChildWatcher):
81 81 yield
82 82 return
83 83
84 loop = asyncio.get_event_loop()
84 loop = policy.get_event_loop()
85 85 try:
86 86 watcher = asyncio.SafeChildWatcher()
87 87 watcher.attach_loop(loop)
88 88 policy.set_child_watcher(watcher)
89 89 yield
90 90 finally:
91 91 watcher.close()
92 92 policy.set_child_watcher(old_watcher)
93 93
94 94
95 95 def dec_safe_watcher(fun):
96 96 @functools.wraps(fun)
97 97 def _inner(*args, **kwargs):
98 98 with safe_watcher():
99 99 return fun(*args, **kwargs)
100 100
101 101 return _inner
102 102
103 103
104 104 @magics_class
105 105 class ScriptMagics(Magics):
106 106 """Magics for talking to scripts
107 107
108 108 This defines a base `%%script` cell magic for running a cell
109 109 with a program in a subprocess, and registers a few top-level
110 110 magics that call %%script with common interpreters.
111 111 """
112 112 script_magics = List(
113 113 help="""Extra script cell magics to define
114 114
115 115 This generates simple wrappers of `%%script foo` as `%%foo`.
116 116
117 117 If you want to add script magics that aren't on your path,
118 118 specify them in script_paths
119 119 """,
120 120 ).tag(config=True)
121 121 @default('script_magics')
122 122 def _script_magics_default(self):
123 123 """default to a common list of programs"""
124 124
125 125 defaults = [
126 126 'sh',
127 127 'bash',
128 128 'perl',
129 129 'ruby',
130 130 'python',
131 131 'python2',
132 132 'python3',
133 133 'pypy',
134 134 ]
135 135 if os.name == 'nt':
136 136 defaults.extend([
137 137 'cmd',
138 138 ])
139 139
140 140 return defaults
141 141
142 142 script_paths = Dict(
143 143 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
144 144
145 145 Only necessary for items in script_magics where the default path will not
146 146 find the right interpreter.
147 147 """
148 148 ).tag(config=True)
149 149
150 150 def __init__(self, shell=None):
151 151 super(ScriptMagics, self).__init__(shell=shell)
152 152 self._generate_script_magics()
153 153 self.job_manager = BackgroundJobManager()
154 154 self.bg_processes = []
155 155 atexit.register(self.kill_bg_processes)
156 156
157 157 def __del__(self):
158 158 self.kill_bg_processes()
159 159
160 160 def _generate_script_magics(self):
161 161 cell_magics = self.magics['cell']
162 162 for name in self.script_magics:
163 163 cell_magics[name] = self._make_script_magic(name)
164 164
165 165 def _make_script_magic(self, name):
166 166 """make a named magic, that calls %%script with a particular program"""
167 167 # expand to explicit path if necessary:
168 168 script = self.script_paths.get(name, name)
169 169
170 170 @magic_arguments.magic_arguments()
171 171 @script_args
172 172 def named_script_magic(line, cell):
173 173 # if line, add it as cl-flags
174 174 if line:
175 175 line = "%s %s" % (script, line)
176 176 else:
177 177 line = script
178 178 return self.shebang(line, cell)
179 179
180 180 # write a basic docstring:
181 181 named_script_magic.__doc__ = \
182 182 """%%{name} script magic
183 183
184 184 Run cells with {script} in a subprocess.
185 185
186 186 This is a shortcut for `%%script {script}`
187 187 """.format(**locals())
188 188
189 189 return named_script_magic
190 190
191 191 @magic_arguments.magic_arguments()
192 192 @script_args
193 193 @cell_magic("script")
194 194 @dec_safe_watcher
195 195 def shebang(self, line, cell):
196 196 """Run a cell via a shell command
197 197
198 198 The `%%script` line is like the #! line of script,
199 199 specifying a program (bash, perl, ruby, etc.) with which to run.
200 200
201 201 The rest of the cell is run by that program.
202 202
203 203 Examples
204 204 --------
205 205 ::
206 206
207 207 In [1]: %%script bash
208 208 ...: for i in 1 2 3; do
209 209 ...: echo $i
210 210 ...: done
211 211 1
212 212 2
213 213 3
214 214 """
215 215
216 216 async def _handle_stream(stream, stream_arg, file_object):
217 217 while True:
218 218 line = (await stream.readline()).decode("utf8")
219 219 if not line:
220 220 break
221 221 if stream_arg:
222 222 self.shell.user_ns[stream_arg] = line
223 223 else:
224 224 file_object.write(line)
225 225 file_object.flush()
226 226
227 227 async def _stream_communicate(process, cell):
228 228 process.stdin.write(cell)
229 229 process.stdin.close()
230 230 stdout_task = asyncio.create_task(
231 231 _handle_stream(process.stdout, args.out, sys.stdout)
232 232 )
233 233 stderr_task = asyncio.create_task(
234 234 _handle_stream(process.stderr, args.err, sys.stderr)
235 235 )
236 236 await asyncio.wait([stdout_task, stderr_task])
237 237 await process.wait()
238 238
239 239 if sys.platform.startswith("win"):
240 240 asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
241 loop = asyncio.get_event_loop()
241 loop = asyncio.get_event_loop_policy().get_event_loop()
242 242 argv = arg_split(line, posix=not sys.platform.startswith("win"))
243 243 args, cmd = self.shebang.parser.parse_known_args(argv)
244 244 try:
245 245 p = loop.run_until_complete(
246 246 asyncio.create_subprocess_exec(
247 247 *cmd,
248 248 stdout=asyncio.subprocess.PIPE,
249 249 stderr=asyncio.subprocess.PIPE,
250 250 stdin=asyncio.subprocess.PIPE,
251 251 )
252 252 )
253 253 except OSError as e:
254 254 if e.errno == errno.ENOENT:
255 255 print("Couldn't find program: %r" % cmd[0])
256 256 return
257 257 else:
258 258 raise
259 259
260 260 if not cell.endswith('\n'):
261 261 cell += '\n'
262 262 cell = cell.encode('utf8', 'replace')
263 263 if args.bg:
264 264 self.bg_processes.append(p)
265 265 self._gc_bg_processes()
266 266 to_close = []
267 267 if args.out:
268 268 self.shell.user_ns[args.out] = p.stdout
269 269 else:
270 270 to_close.append(p.stdout)
271 271 if args.err:
272 272 self.shell.user_ns[args.err] = p.stderr
273 273 else:
274 274 to_close.append(p.stderr)
275 275 self.job_manager.new(self._run_script, p, cell, to_close, daemon=True)
276 276 if args.proc:
277 277 self.shell.user_ns[args.proc] = p
278 278 return
279 279
280 280 try:
281 281 loop.run_until_complete(_stream_communicate(p, cell))
282 282 except KeyboardInterrupt:
283 283 try:
284 284 p.send_signal(signal.SIGINT)
285 285 time.sleep(0.1)
286 286 if p.returncode is not None:
287 287 print("Process is interrupted.")
288 288 return
289 289 p.terminate()
290 290 time.sleep(0.1)
291 291 if p.returncode is not None:
292 292 print("Process is terminated.")
293 293 return
294 294 p.kill()
295 295 print("Process is killed.")
296 296 except OSError:
297 297 pass
298 298 except Exception as e:
299 299 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
300 300 return
301 301 if args.raise_error and p.returncode!=0:
302 302 # If we get here and p.returncode is still None, we must have
303 303 # killed it but not yet seen its return code. We don't wait for it,
304 304 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
305 305 rc = p.returncode or -9
306 306 raise CalledProcessError(rc, cell)
307 307
308 308 def _run_script(self, p, cell, to_close):
309 309 """callback for running the script in the background"""
310 310 p.stdin.write(cell)
311 311 p.stdin.close()
312 312 for s in to_close:
313 313 s.close()
314 314 p.wait()
315 315
316 316 @line_magic("killbgscripts")
317 317 def killbgscripts(self, _nouse_=''):
318 318 """Kill all BG processes started by %%script and its family."""
319 319 self.kill_bg_processes()
320 320 print("All background processes were killed.")
321 321
322 322 def kill_bg_processes(self):
323 323 """Kill all BG processes which are still running."""
324 324 if not self.bg_processes:
325 325 return
326 326 for p in self.bg_processes:
327 327 if p.returncode is None:
328 328 try:
329 329 p.send_signal(signal.SIGINT)
330 330 except:
331 331 pass
332 332 time.sleep(0.1)
333 333 self._gc_bg_processes()
334 334 if not self.bg_processes:
335 335 return
336 336 for p in self.bg_processes:
337 337 if p.returncode is None:
338 338 try:
339 339 p.terminate()
340 340 except:
341 341 pass
342 342 time.sleep(0.1)
343 343 self._gc_bg_processes()
344 344 if not self.bg_processes:
345 345 return
346 346 for p in self.bg_processes:
347 347 if p.returncode is None:
348 348 try:
349 349 p.kill()
350 350 except:
351 351 pass
352 352 self._gc_bg_processes()
353 353
354 354 def _gc_bg_processes(self):
355 355 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]
@@ -1,1072 +1,1072
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for the key interactiveshell module.
3 3
4 4 Historically the main classes in interactiveshell have been under-tested. This
5 5 module should grow as many single-method tests as possible to trap many of the
6 6 recurring bugs we seem to encounter with high-level interaction.
7 7 """
8 8
9 9 # Copyright (c) IPython Development Team.
10 10 # Distributed under the terms of the Modified BSD License.
11 11
12 12 import asyncio
13 13 import ast
14 14 import os
15 15 import signal
16 16 import shutil
17 17 import sys
18 18 import tempfile
19 19 import unittest
20 20 from unittest import mock
21 21
22 22 from os.path import join
23 23
24 24 from IPython.core.error import InputRejected
25 25 from IPython.core.inputtransformer import InputTransformer
26 26 from IPython.core import interactiveshell
27 27 from IPython.testing.decorators import (
28 28 skipif, skip_win32, onlyif_unicode_paths, onlyif_cmds_exist,
29 29 )
30 30 from IPython.testing import tools as tt
31 31 from IPython.utils.process import find_cmd
32 32
33 33 #-----------------------------------------------------------------------------
34 34 # Globals
35 35 #-----------------------------------------------------------------------------
36 36 # This is used by every single test, no point repeating it ad nauseam
37 37
38 38 #-----------------------------------------------------------------------------
39 39 # Tests
40 40 #-----------------------------------------------------------------------------
41 41
42 42 class DerivedInterrupt(KeyboardInterrupt):
43 43 pass
44 44
45 45 class InteractiveShellTestCase(unittest.TestCase):
46 46 def test_naked_string_cells(self):
47 47 """Test that cells with only naked strings are fully executed"""
48 48 # First, single-line inputs
49 49 ip.run_cell('"a"\n')
50 50 self.assertEqual(ip.user_ns['_'], 'a')
51 51 # And also multi-line cells
52 52 ip.run_cell('"""a\nb"""\n')
53 53 self.assertEqual(ip.user_ns['_'], 'a\nb')
54 54
55 55 def test_run_empty_cell(self):
56 56 """Just make sure we don't get a horrible error with a blank
57 57 cell of input. Yes, I did overlook that."""
58 58 old_xc = ip.execution_count
59 59 res = ip.run_cell('')
60 60 self.assertEqual(ip.execution_count, old_xc)
61 61 self.assertEqual(res.execution_count, None)
62 62
63 63 def test_run_cell_multiline(self):
64 64 """Multi-block, multi-line cells must execute correctly.
65 65 """
66 66 src = '\n'.join(["x=1",
67 67 "y=2",
68 68 "if 1:",
69 69 " x += 1",
70 70 " y += 1",])
71 71 res = ip.run_cell(src)
72 72 self.assertEqual(ip.user_ns['x'], 2)
73 73 self.assertEqual(ip.user_ns['y'], 3)
74 74 self.assertEqual(res.success, True)
75 75 self.assertEqual(res.result, None)
76 76
77 77 def test_multiline_string_cells(self):
78 78 "Code sprinkled with multiline strings should execute (GH-306)"
79 79 ip.run_cell('tmp=0')
80 80 self.assertEqual(ip.user_ns['tmp'], 0)
81 81 res = ip.run_cell('tmp=1;"""a\nb"""\n')
82 82 self.assertEqual(ip.user_ns['tmp'], 1)
83 83 self.assertEqual(res.success, True)
84 84 self.assertEqual(res.result, "a\nb")
85 85
86 86 def test_dont_cache_with_semicolon(self):
87 87 "Ending a line with semicolon should not cache the returned object (GH-307)"
88 88 oldlen = len(ip.user_ns['Out'])
89 89 for cell in ['1;', '1;1;']:
90 90 res = ip.run_cell(cell, store_history=True)
91 91 newlen = len(ip.user_ns['Out'])
92 92 self.assertEqual(oldlen, newlen)
93 93 self.assertIsNone(res.result)
94 94 i = 0
95 95 #also test the default caching behavior
96 96 for cell in ['1', '1;1']:
97 97 ip.run_cell(cell, store_history=True)
98 98 newlen = len(ip.user_ns['Out'])
99 99 i += 1
100 100 self.assertEqual(oldlen+i, newlen)
101 101
102 102 def test_syntax_error(self):
103 103 res = ip.run_cell("raise = 3")
104 104 self.assertIsInstance(res.error_before_exec, SyntaxError)
105 105
106 106 def test_In_variable(self):
107 107 "Verify that In variable grows with user input (GH-284)"
108 108 oldlen = len(ip.user_ns['In'])
109 109 ip.run_cell('1;', store_history=True)
110 110 newlen = len(ip.user_ns['In'])
111 111 self.assertEqual(oldlen+1, newlen)
112 112 self.assertEqual(ip.user_ns['In'][-1],'1;')
113 113
114 114 def test_magic_names_in_string(self):
115 115 ip.run_cell('a = """\n%exit\n"""')
116 116 self.assertEqual(ip.user_ns['a'], '\n%exit\n')
117 117
118 118 def test_trailing_newline(self):
119 119 """test that running !(command) does not raise a SyntaxError"""
120 120 ip.run_cell('!(true)\n', False)
121 121 ip.run_cell('!(true)\n\n\n', False)
122 122
123 123 def test_gh_597(self):
124 124 """Pretty-printing lists of objects with non-ascii reprs may cause
125 125 problems."""
126 126 class Spam(object):
127 127 def __repr__(self):
128 128 return "\xe9"*50
129 129 import IPython.core.formatters
130 130 f = IPython.core.formatters.PlainTextFormatter()
131 131 f([Spam(),Spam()])
132 132
133 133
134 134 def test_future_flags(self):
135 135 """Check that future flags are used for parsing code (gh-777)"""
136 136 ip.run_cell('from __future__ import barry_as_FLUFL')
137 137 try:
138 138 ip.run_cell('prfunc_return_val = 1 <> 2')
139 139 assert 'prfunc_return_val' in ip.user_ns
140 140 finally:
141 141 # Reset compiler flags so we don't mess up other tests.
142 142 ip.compile.reset_compiler_flags()
143 143
144 144 def test_can_pickle(self):
145 145 "Can we pickle objects defined interactively (GH-29)"
146 146 ip = get_ipython()
147 147 ip.reset()
148 148 ip.run_cell(("class Mylist(list):\n"
149 149 " def __init__(self,x=[]):\n"
150 150 " list.__init__(self,x)"))
151 151 ip.run_cell("w=Mylist([1,2,3])")
152 152
153 153 from pickle import dumps
154 154
155 155 # We need to swap in our main module - this is only necessary
156 156 # inside the test framework, because IPython puts the interactive module
157 157 # in place (but the test framework undoes this).
158 158 _main = sys.modules['__main__']
159 159 sys.modules['__main__'] = ip.user_module
160 160 try:
161 161 res = dumps(ip.user_ns["w"])
162 162 finally:
163 163 sys.modules['__main__'] = _main
164 164 self.assertTrue(isinstance(res, bytes))
165 165
166 166 def test_global_ns(self):
167 167 "Code in functions must be able to access variables outside them."
168 168 ip = get_ipython()
169 169 ip.run_cell("a = 10")
170 170 ip.run_cell(("def f(x):\n"
171 171 " return x + a"))
172 172 ip.run_cell("b = f(12)")
173 173 self.assertEqual(ip.user_ns["b"], 22)
174 174
175 175 def test_bad_custom_tb(self):
176 176 """Check that InteractiveShell is protected from bad custom exception handlers"""
177 177 ip.set_custom_exc((IOError,), lambda etype,value,tb: 1/0)
178 178 self.assertEqual(ip.custom_exceptions, (IOError,))
179 179 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
180 180 ip.run_cell(u'raise IOError("foo")')
181 181 self.assertEqual(ip.custom_exceptions, ())
182 182
183 183 def test_bad_custom_tb_return(self):
184 184 """Check that InteractiveShell is protected from bad return types in custom exception handlers"""
185 185 ip.set_custom_exc((NameError,),lambda etype,value,tb, tb_offset=None: 1)
186 186 self.assertEqual(ip.custom_exceptions, (NameError,))
187 187 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
188 188 ip.run_cell(u'a=abracadabra')
189 189 self.assertEqual(ip.custom_exceptions, ())
190 190
191 191 def test_drop_by_id(self):
192 192 myvars = {"a":object(), "b":object(), "c": object()}
193 193 ip.push(myvars, interactive=False)
194 194 for name in myvars:
195 195 assert name in ip.user_ns, name
196 196 assert name in ip.user_ns_hidden, name
197 197 ip.user_ns['b'] = 12
198 198 ip.drop_by_id(myvars)
199 199 for name in ["a", "c"]:
200 200 assert name not in ip.user_ns, name
201 201 assert name not in ip.user_ns_hidden, name
202 202 assert ip.user_ns['b'] == 12
203 203 ip.reset()
204 204
205 205 def test_var_expand(self):
206 206 ip.user_ns['f'] = u'Ca\xf1o'
207 207 self.assertEqual(ip.var_expand(u'echo $f'), u'echo Ca\xf1o')
208 208 self.assertEqual(ip.var_expand(u'echo {f}'), u'echo Ca\xf1o')
209 209 self.assertEqual(ip.var_expand(u'echo {f[:-1]}'), u'echo Ca\xf1')
210 210 self.assertEqual(ip.var_expand(u'echo {1*2}'), u'echo 2')
211 211
212 212 self.assertEqual(ip.var_expand(u"grep x | awk '{print $1}'"), u"grep x | awk '{print $1}'")
213 213
214 214 ip.user_ns['f'] = b'Ca\xc3\xb1o'
215 215 # This should not raise any exception:
216 216 ip.var_expand(u'echo $f')
217 217
218 218 def test_var_expand_local(self):
219 219 """Test local variable expansion in !system and %magic calls"""
220 220 # !system
221 221 ip.run_cell(
222 222 "def test():\n"
223 223 ' lvar = "ttt"\n'
224 224 " ret = !echo {lvar}\n"
225 225 " return ret[0]\n"
226 226 )
227 227 res = ip.user_ns["test"]()
228 228 self.assertIn("ttt", res)
229 229
230 230 # %magic
231 231 ip.run_cell(
232 232 "def makemacro():\n"
233 233 ' macroname = "macro_var_expand_locals"\n'
234 234 " %macro {macroname} codestr\n"
235 235 )
236 236 ip.user_ns["codestr"] = "str(12)"
237 237 ip.run_cell("makemacro()")
238 238 self.assertIn("macro_var_expand_locals", ip.user_ns)
239 239
240 240 def test_var_expand_self(self):
241 241 """Test variable expansion with the name 'self', which was failing.
242 242
243 243 See https://github.com/ipython/ipython/issues/1878#issuecomment-7698218
244 244 """
245 245 ip.run_cell(
246 246 "class cTest:\n"
247 247 ' classvar="see me"\n'
248 248 " def test(self):\n"
249 249 " res = !echo Variable: {self.classvar}\n"
250 250 " return res[0]\n"
251 251 )
252 252 self.assertIn("see me", ip.user_ns["cTest"]().test())
253 253
254 254 def test_bad_var_expand(self):
255 255 """var_expand on invalid formats shouldn't raise"""
256 256 # SyntaxError
257 257 self.assertEqual(ip.var_expand(u"{'a':5}"), u"{'a':5}")
258 258 # NameError
259 259 self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}")
260 260 # ZeroDivisionError
261 261 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
262 262
263 263 def test_silent_postexec(self):
264 264 """run_cell(silent=True) doesn't invoke pre/post_run_cell callbacks"""
265 265 pre_explicit = mock.Mock()
266 266 pre_always = mock.Mock()
267 267 post_explicit = mock.Mock()
268 268 post_always = mock.Mock()
269 269 all_mocks = [pre_explicit, pre_always, post_explicit, post_always]
270 270
271 271 ip.events.register('pre_run_cell', pre_explicit)
272 272 ip.events.register('pre_execute', pre_always)
273 273 ip.events.register('post_run_cell', post_explicit)
274 274 ip.events.register('post_execute', post_always)
275 275
276 276 try:
277 277 ip.run_cell("1", silent=True)
278 278 assert pre_always.called
279 279 assert not pre_explicit.called
280 280 assert post_always.called
281 281 assert not post_explicit.called
282 282 # double-check that non-silent exec did what we expected
283 283 # silent to avoid
284 284 ip.run_cell("1")
285 285 assert pre_explicit.called
286 286 assert post_explicit.called
287 287 info, = pre_explicit.call_args[0]
288 288 result, = post_explicit.call_args[0]
289 289 self.assertEqual(info, result.info)
290 290 # check that post hooks are always called
291 291 [m.reset_mock() for m in all_mocks]
292 292 ip.run_cell("syntax error")
293 293 assert pre_always.called
294 294 assert pre_explicit.called
295 295 assert post_always.called
296 296 assert post_explicit.called
297 297 info, = pre_explicit.call_args[0]
298 298 result, = post_explicit.call_args[0]
299 299 self.assertEqual(info, result.info)
300 300 finally:
301 301 # remove post-exec
302 302 ip.events.unregister('pre_run_cell', pre_explicit)
303 303 ip.events.unregister('pre_execute', pre_always)
304 304 ip.events.unregister('post_run_cell', post_explicit)
305 305 ip.events.unregister('post_execute', post_always)
306 306
307 307 def test_silent_noadvance(self):
308 308 """run_cell(silent=True) doesn't advance execution_count"""
309 309 ec = ip.execution_count
310 310 # silent should force store_history=False
311 311 ip.run_cell("1", store_history=True, silent=True)
312 312
313 313 self.assertEqual(ec, ip.execution_count)
314 314 # double-check that non-silent exec did what we expected
315 315 # silent to avoid
316 316 ip.run_cell("1", store_history=True)
317 317 self.assertEqual(ec+1, ip.execution_count)
318 318
319 319 def test_silent_nodisplayhook(self):
320 320 """run_cell(silent=True) doesn't trigger displayhook"""
321 321 d = dict(called=False)
322 322
323 323 trap = ip.display_trap
324 324 save_hook = trap.hook
325 325
326 326 def failing_hook(*args, **kwargs):
327 327 d['called'] = True
328 328
329 329 try:
330 330 trap.hook = failing_hook
331 331 res = ip.run_cell("1", silent=True)
332 332 self.assertFalse(d['called'])
333 333 self.assertIsNone(res.result)
334 334 # double-check that non-silent exec did what we expected
335 335 # silent to avoid
336 336 ip.run_cell("1")
337 337 self.assertTrue(d['called'])
338 338 finally:
339 339 trap.hook = save_hook
340 340
341 341 def test_ofind_line_magic(self):
342 342 from IPython.core.magic import register_line_magic
343 343
344 344 @register_line_magic
345 345 def lmagic(line):
346 346 "A line magic"
347 347
348 348 # Get info on line magic
349 349 lfind = ip._ofind('lmagic')
350 350 info = dict(found=True, isalias=False, ismagic=True,
351 351 namespace = 'IPython internal', obj= lmagic.__wrapped__,
352 352 parent = None)
353 353 self.assertEqual(lfind, info)
354 354
355 355 def test_ofind_cell_magic(self):
356 356 from IPython.core.magic import register_cell_magic
357 357
358 358 @register_cell_magic
359 359 def cmagic(line, cell):
360 360 "A cell magic"
361 361
362 362 # Get info on cell magic
363 363 find = ip._ofind('cmagic')
364 364 info = dict(found=True, isalias=False, ismagic=True,
365 365 namespace = 'IPython internal', obj= cmagic.__wrapped__,
366 366 parent = None)
367 367 self.assertEqual(find, info)
368 368
369 369 def test_ofind_property_with_error(self):
370 370 class A(object):
371 371 @property
372 372 def foo(self):
373 373 raise NotImplementedError()
374 374 a = A()
375 375
376 376 found = ip._ofind('a.foo', [('locals', locals())])
377 377 info = dict(found=True, isalias=False, ismagic=False,
378 378 namespace='locals', obj=A.foo, parent=a)
379 379 self.assertEqual(found, info)
380 380
381 381 def test_ofind_multiple_attribute_lookups(self):
382 382 class A(object):
383 383 @property
384 384 def foo(self):
385 385 raise NotImplementedError()
386 386
387 387 a = A()
388 388 a.a = A()
389 389 a.a.a = A()
390 390
391 391 found = ip._ofind('a.a.a.foo', [('locals', locals())])
392 392 info = dict(found=True, isalias=False, ismagic=False,
393 393 namespace='locals', obj=A.foo, parent=a.a.a)
394 394 self.assertEqual(found, info)
395 395
396 396 def test_ofind_slotted_attributes(self):
397 397 class A(object):
398 398 __slots__ = ['foo']
399 399 def __init__(self):
400 400 self.foo = 'bar'
401 401
402 402 a = A()
403 403 found = ip._ofind('a.foo', [('locals', locals())])
404 404 info = dict(found=True, isalias=False, ismagic=False,
405 405 namespace='locals', obj=a.foo, parent=a)
406 406 self.assertEqual(found, info)
407 407
408 408 found = ip._ofind('a.bar', [('locals', locals())])
409 409 info = dict(found=False, isalias=False, ismagic=False,
410 410 namespace=None, obj=None, parent=a)
411 411 self.assertEqual(found, info)
412 412
413 413 def test_ofind_prefers_property_to_instance_level_attribute(self):
414 414 class A(object):
415 415 @property
416 416 def foo(self):
417 417 return 'bar'
418 418 a = A()
419 419 a.__dict__["foo"] = "baz"
420 420 self.assertEqual(a.foo, "bar")
421 421 found = ip._ofind("a.foo", [("locals", locals())])
422 422 self.assertIs(found["obj"], A.foo)
423 423
424 424 def test_custom_syntaxerror_exception(self):
425 425 called = []
426 426 def my_handler(shell, etype, value, tb, tb_offset=None):
427 427 called.append(etype)
428 428 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
429 429
430 430 ip.set_custom_exc((SyntaxError,), my_handler)
431 431 try:
432 432 ip.run_cell("1f")
433 433 # Check that this was called, and only once.
434 434 self.assertEqual(called, [SyntaxError])
435 435 finally:
436 436 # Reset the custom exception hook
437 437 ip.set_custom_exc((), None)
438 438
439 439 def test_custom_exception(self):
440 440 called = []
441 441 def my_handler(shell, etype, value, tb, tb_offset=None):
442 442 called.append(etype)
443 443 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
444 444
445 445 ip.set_custom_exc((ValueError,), my_handler)
446 446 try:
447 447 res = ip.run_cell("raise ValueError('test')")
448 448 # Check that this was called, and only once.
449 449 self.assertEqual(called, [ValueError])
450 450 # Check that the error is on the result object
451 451 self.assertIsInstance(res.error_in_exec, ValueError)
452 452 finally:
453 453 # Reset the custom exception hook
454 454 ip.set_custom_exc((), None)
455 455
456 456 @mock.patch("builtins.print")
457 457 def test_showtraceback_with_surrogates(self, mocked_print):
458 458 values = []
459 459
460 460 def mock_print_func(value, sep=" ", end="\n", file=sys.stdout, flush=False):
461 461 values.append(value)
462 462 if value == chr(0xD8FF):
463 463 raise UnicodeEncodeError("utf-8", chr(0xD8FF), 0, 1, "")
464 464
465 465 # mock builtins.print
466 466 mocked_print.side_effect = mock_print_func
467 467
468 468 # ip._showtraceback() is replaced in globalipapp.py.
469 469 # Call original method to test.
470 470 interactiveshell.InteractiveShell._showtraceback(ip, None, None, chr(0xD8FF))
471 471
472 472 self.assertEqual(mocked_print.call_count, 2)
473 473 self.assertEqual(values, [chr(0xD8FF), "\\ud8ff"])
474 474
475 475 def test_mktempfile(self):
476 476 filename = ip.mktempfile()
477 477 # Check that we can open the file again on Windows
478 478 with open(filename, 'w') as f:
479 479 f.write('abc')
480 480
481 481 filename = ip.mktempfile(data='blah')
482 482 with open(filename, 'r') as f:
483 483 self.assertEqual(f.read(), 'blah')
484 484
485 485 def test_new_main_mod(self):
486 486 # Smoketest to check that this accepts a unicode module name
487 487 name = u'jiefmw'
488 488 mod = ip.new_main_mod(u'%s.py' % name, name)
489 489 self.assertEqual(mod.__name__, name)
490 490
491 491 def test_get_exception_only(self):
492 492 try:
493 493 raise KeyboardInterrupt
494 494 except KeyboardInterrupt:
495 495 msg = ip.get_exception_only()
496 496 self.assertEqual(msg, 'KeyboardInterrupt\n')
497 497
498 498 try:
499 499 raise DerivedInterrupt("foo")
500 500 except KeyboardInterrupt:
501 501 msg = ip.get_exception_only()
502 502 self.assertEqual(msg, 'IPython.core.tests.test_interactiveshell.DerivedInterrupt: foo\n')
503 503
504 504 def test_inspect_text(self):
505 505 ip.run_cell('a = 5')
506 506 text = ip.object_inspect_text('a')
507 507 self.assertIsInstance(text, str)
508 508
509 509 def test_last_execution_result(self):
510 510 """ Check that last execution result gets set correctly (GH-10702) """
511 511 result = ip.run_cell('a = 5; a')
512 512 self.assertTrue(ip.last_execution_succeeded)
513 513 self.assertEqual(ip.last_execution_result.result, 5)
514 514
515 515 result = ip.run_cell('a = x_invalid_id_x')
516 516 self.assertFalse(ip.last_execution_succeeded)
517 517 self.assertFalse(ip.last_execution_result.success)
518 518 self.assertIsInstance(ip.last_execution_result.error_in_exec, NameError)
519 519
520 520 def test_reset_aliasing(self):
521 521 """ Check that standard posix aliases work after %reset. """
522 522 if os.name != 'posix':
523 523 return
524 524
525 525 ip.reset()
526 526 for cmd in ('clear', 'more', 'less', 'man'):
527 527 res = ip.run_cell('%' + cmd)
528 528 self.assertEqual(res.success, True)
529 529
530 530
531 531 class TestSafeExecfileNonAsciiPath(unittest.TestCase):
532 532
533 533 @onlyif_unicode_paths
534 534 def setUp(self):
535 535 self.BASETESTDIR = tempfile.mkdtemp()
536 536 self.TESTDIR = join(self.BASETESTDIR, u"Γ₯Àâ")
537 537 os.mkdir(self.TESTDIR)
538 538 with open(join(self.TESTDIR, u"Γ₯Àâtestscript.py"), "w") as sfile:
539 539 sfile.write("pass\n")
540 540 self.oldpath = os.getcwd()
541 541 os.chdir(self.TESTDIR)
542 542 self.fname = u"Γ₯Àâtestscript.py"
543 543
544 544 def tearDown(self):
545 545 os.chdir(self.oldpath)
546 546 shutil.rmtree(self.BASETESTDIR)
547 547
548 548 @onlyif_unicode_paths
549 549 def test_1(self):
550 550 """Test safe_execfile with non-ascii path
551 551 """
552 552 ip.safe_execfile(self.fname, {}, raise_exceptions=True)
553 553
554 554 class ExitCodeChecks(tt.TempFileMixin):
555 555
556 556 def setUp(self):
557 557 self.system = ip.system_raw
558 558
559 559 def test_exit_code_ok(self):
560 560 self.system('exit 0')
561 561 self.assertEqual(ip.user_ns['_exit_code'], 0)
562 562
563 563 def test_exit_code_error(self):
564 564 self.system('exit 1')
565 565 self.assertEqual(ip.user_ns['_exit_code'], 1)
566 566
567 567 @skipif(not hasattr(signal, 'SIGALRM'))
568 568 def test_exit_code_signal(self):
569 569 self.mktmp("import signal, time\n"
570 570 "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
571 571 "time.sleep(1)\n")
572 572 self.system("%s %s" % (sys.executable, self.fname))
573 573 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
574 574
575 575 @onlyif_cmds_exist("csh")
576 576 def test_exit_code_signal_csh(self):
577 577 SHELL = os.environ.get('SHELL', None)
578 578 os.environ['SHELL'] = find_cmd("csh")
579 579 try:
580 580 self.test_exit_code_signal()
581 581 finally:
582 582 if SHELL is not None:
583 583 os.environ['SHELL'] = SHELL
584 584 else:
585 585 del os.environ['SHELL']
586 586
587 587
588 588 class TestSystemRaw(ExitCodeChecks):
589 589
590 590 def setUp(self):
591 591 super().setUp()
592 592 self.system = ip.system_raw
593 593
594 594 @onlyif_unicode_paths
595 595 def test_1(self):
596 596 """Test system_raw with non-ascii cmd
597 597 """
598 598 cmd = u'''python -c "'Γ₯Àâ'" '''
599 599 ip.system_raw(cmd)
600 600
601 601 @mock.patch('subprocess.call', side_effect=KeyboardInterrupt)
602 602 @mock.patch('os.system', side_effect=KeyboardInterrupt)
603 603 def test_control_c(self, *mocks):
604 604 try:
605 605 self.system("sleep 1 # wont happen")
606 606 except KeyboardInterrupt:
607 607 self.fail(
608 608 "system call should intercept "
609 609 "keyboard interrupt from subprocess.call"
610 610 )
611 611 self.assertEqual(ip.user_ns["_exit_code"], -signal.SIGINT)
612 612
613 613 def test_magic_warnings(self):
614 614 for magic_cmd in ("ls", "pip", "conda", "cd"):
615 615 with self.assertWarnsRegex(Warning, "You executed the system command"):
616 616 ip.system_raw(magic_cmd)
617 617
618 618 # TODO: Exit codes are currently ignored on Windows.
619 619 class TestSystemPipedExitCode(ExitCodeChecks):
620 620
621 621 def setUp(self):
622 622 super().setUp()
623 623 self.system = ip.system_piped
624 624
625 625 @skip_win32
626 626 def test_exit_code_ok(self):
627 627 ExitCodeChecks.test_exit_code_ok(self)
628 628
629 629 @skip_win32
630 630 def test_exit_code_error(self):
631 631 ExitCodeChecks.test_exit_code_error(self)
632 632
633 633 @skip_win32
634 634 def test_exit_code_signal(self):
635 635 ExitCodeChecks.test_exit_code_signal(self)
636 636
637 637 class TestModules(tt.TempFileMixin):
638 638 def test_extraneous_loads(self):
639 639 """Test we're not loading modules on startup that we shouldn't.
640 640 """
641 641 self.mktmp("import sys\n"
642 642 "print('numpy' in sys.modules)\n"
643 643 "print('ipyparallel' in sys.modules)\n"
644 644 "print('ipykernel' in sys.modules)\n"
645 645 )
646 646 out = "False\nFalse\nFalse\n"
647 647 tt.ipexec_validate(self.fname, out)
648 648
649 649 class Negator(ast.NodeTransformer):
650 650 """Negates all number literals in an AST."""
651 651
652 652 # for python 3.7 and earlier
653 653 def visit_Num(self, node):
654 654 node.n = -node.n
655 655 return node
656 656
657 657 # for python 3.8+
658 658 def visit_Constant(self, node):
659 659 if isinstance(node.value, int):
660 660 return self.visit_Num(node)
661 661 return node
662 662
663 663 class TestAstTransform(unittest.TestCase):
664 664 def setUp(self):
665 665 self.negator = Negator()
666 666 ip.ast_transformers.append(self.negator)
667 667
668 668 def tearDown(self):
669 669 ip.ast_transformers.remove(self.negator)
670 670
671 671 def test_run_cell(self):
672 672 with tt.AssertPrints('-34'):
673 673 ip.run_cell('print (12 + 22)')
674 674
675 675 # A named reference to a number shouldn't be transformed.
676 676 ip.user_ns['n'] = 55
677 677 with tt.AssertNotPrints('-55'):
678 678 ip.run_cell('print (n)')
679 679
680 680 def test_timeit(self):
681 681 called = set()
682 682 def f(x):
683 683 called.add(x)
684 684 ip.push({'f':f})
685 685
686 686 with tt.AssertPrints("std. dev. of"):
687 687 ip.run_line_magic("timeit", "-n1 f(1)")
688 688 self.assertEqual(called, {-1})
689 689 called.clear()
690 690
691 691 with tt.AssertPrints("std. dev. of"):
692 692 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
693 693 self.assertEqual(called, {-2, -3})
694 694
695 695 def test_time(self):
696 696 called = []
697 697 def f(x):
698 698 called.append(x)
699 699 ip.push({'f':f})
700 700
701 701 # Test with an expression
702 702 with tt.AssertPrints("Wall time: "):
703 703 ip.run_line_magic("time", "f(5+9)")
704 704 self.assertEqual(called, [-14])
705 705 called[:] = []
706 706
707 707 # Test with a statement (different code path)
708 708 with tt.AssertPrints("Wall time: "):
709 709 ip.run_line_magic("time", "a = f(-3 + -2)")
710 710 self.assertEqual(called, [5])
711 711
712 712 def test_macro(self):
713 713 ip.push({'a':10})
714 714 # The AST transformation makes this do a+=-1
715 715 ip.define_macro("amacro", "a+=1\nprint(a)")
716 716
717 717 with tt.AssertPrints("9"):
718 718 ip.run_cell("amacro")
719 719 with tt.AssertPrints("8"):
720 720 ip.run_cell("amacro")
721 721
722 722 class TestMiscTransform(unittest.TestCase):
723 723
724 724
725 725 def test_transform_only_once(self):
726 726 cleanup = 0
727 727 line_t = 0
728 728 def count_cleanup(lines):
729 729 nonlocal cleanup
730 730 cleanup += 1
731 731 return lines
732 732
733 733 def count_line_t(lines):
734 734 nonlocal line_t
735 735 line_t += 1
736 736 return lines
737 737
738 738 ip.input_transformer_manager.cleanup_transforms.append(count_cleanup)
739 739 ip.input_transformer_manager.line_transforms.append(count_line_t)
740 740
741 741 ip.run_cell('1')
742 742
743 743 assert cleanup == 1
744 744 assert line_t == 1
745 745
746 746 class IntegerWrapper(ast.NodeTransformer):
747 747 """Wraps all integers in a call to Integer()"""
748 748
749 749 # for Python 3.7 and earlier
750 750
751 751 # for Python 3.7 and earlier
752 752 def visit_Num(self, node):
753 753 if isinstance(node.n, int):
754 754 return ast.Call(func=ast.Name(id='Integer', ctx=ast.Load()),
755 755 args=[node], keywords=[])
756 756 return node
757 757
758 758 # For Python 3.8+
759 759 def visit_Constant(self, node):
760 760 if isinstance(node.value, int):
761 761 return self.visit_Num(node)
762 762 return node
763 763
764 764
765 765 class TestAstTransform2(unittest.TestCase):
766 766 def setUp(self):
767 767 self.intwrapper = IntegerWrapper()
768 768 ip.ast_transformers.append(self.intwrapper)
769 769
770 770 self.calls = []
771 771 def Integer(*args):
772 772 self.calls.append(args)
773 773 return args
774 774 ip.push({"Integer": Integer})
775 775
776 776 def tearDown(self):
777 777 ip.ast_transformers.remove(self.intwrapper)
778 778 del ip.user_ns['Integer']
779 779
780 780 def test_run_cell(self):
781 781 ip.run_cell("n = 2")
782 782 self.assertEqual(self.calls, [(2,)])
783 783
784 784 # This shouldn't throw an error
785 785 ip.run_cell("o = 2.0")
786 786 self.assertEqual(ip.user_ns['o'], 2.0)
787 787
788 788 def test_timeit(self):
789 789 called = set()
790 790 def f(x):
791 791 called.add(x)
792 792 ip.push({'f':f})
793 793
794 794 with tt.AssertPrints("std. dev. of"):
795 795 ip.run_line_magic("timeit", "-n1 f(1)")
796 796 self.assertEqual(called, {(1,)})
797 797 called.clear()
798 798
799 799 with tt.AssertPrints("std. dev. of"):
800 800 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
801 801 self.assertEqual(called, {(2,), (3,)})
802 802
803 803 class ErrorTransformer(ast.NodeTransformer):
804 804 """Throws an error when it sees a number."""
805 805
806 806 # for Python 3.7 and earlier
807 807 def visit_Num(self, node):
808 808 raise ValueError("test")
809 809
810 810 # for Python 3.8+
811 811 def visit_Constant(self, node):
812 812 if isinstance(node.value, int):
813 813 return self.visit_Num(node)
814 814 return node
815 815
816 816
817 817 class TestAstTransformError(unittest.TestCase):
818 818 def test_unregistering(self):
819 819 err_transformer = ErrorTransformer()
820 820 ip.ast_transformers.append(err_transformer)
821 821
822 822 with self.assertWarnsRegex(UserWarning, "It will be unregistered"):
823 823 ip.run_cell("1 + 2")
824 824
825 825 # This should have been removed.
826 826 self.assertNotIn(err_transformer, ip.ast_transformers)
827 827
828 828
829 829 class StringRejector(ast.NodeTransformer):
830 830 """Throws an InputRejected when it sees a string literal.
831 831
832 832 Used to verify that NodeTransformers can signal that a piece of code should
833 833 not be executed by throwing an InputRejected.
834 834 """
835 835
836 836 #for python 3.7 and earlier
837 837 def visit_Str(self, node):
838 838 raise InputRejected("test")
839 839
840 840 # 3.8 only
841 841 def visit_Constant(self, node):
842 842 if isinstance(node.value, str):
843 843 raise InputRejected("test")
844 844 return node
845 845
846 846
847 847 class TestAstTransformInputRejection(unittest.TestCase):
848 848
849 849 def setUp(self):
850 850 self.transformer = StringRejector()
851 851 ip.ast_transformers.append(self.transformer)
852 852
853 853 def tearDown(self):
854 854 ip.ast_transformers.remove(self.transformer)
855 855
856 856 def test_input_rejection(self):
857 857 """Check that NodeTransformers can reject input."""
858 858
859 859 expect_exception_tb = tt.AssertPrints("InputRejected: test")
860 860 expect_no_cell_output = tt.AssertNotPrints("'unsafe'", suppress=False)
861 861
862 862 # Run the same check twice to verify that the transformer is not
863 863 # disabled after raising.
864 864 with expect_exception_tb, expect_no_cell_output:
865 865 ip.run_cell("'unsafe'")
866 866
867 867 with expect_exception_tb, expect_no_cell_output:
868 868 res = ip.run_cell("'unsafe'")
869 869
870 870 self.assertIsInstance(res.error_before_exec, InputRejected)
871 871
872 872 def test__IPYTHON__():
873 873 # This shouldn't raise a NameError, that's all
874 874 __IPYTHON__
875 875
876 876
877 877 class DummyRepr(object):
878 878 def __repr__(self):
879 879 return "DummyRepr"
880 880
881 881 def _repr_html_(self):
882 882 return "<b>dummy</b>"
883 883
884 884 def _repr_javascript_(self):
885 885 return "console.log('hi');", {'key': 'value'}
886 886
887 887
888 888 def test_user_variables():
889 889 # enable all formatters
890 890 ip.display_formatter.active_types = ip.display_formatter.format_types
891 891
892 892 ip.user_ns['dummy'] = d = DummyRepr()
893 893 keys = {'dummy', 'doesnotexist'}
894 894 r = ip.user_expressions({ key:key for key in keys})
895 895
896 896 assert keys == set(r.keys())
897 897 dummy = r["dummy"]
898 898 assert {"status", "data", "metadata"} == set(dummy.keys())
899 899 assert dummy["status"] == "ok"
900 900 data = dummy["data"]
901 901 metadata = dummy["metadata"]
902 902 assert data.get("text/html") == d._repr_html_()
903 903 js, jsmd = d._repr_javascript_()
904 904 assert data.get("application/javascript") == js
905 905 assert metadata.get("application/javascript") == jsmd
906 906
907 907 dne = r["doesnotexist"]
908 908 assert dne["status"] == "error"
909 909 assert dne["ename"] == "NameError"
910 910
911 911 # back to text only
912 912 ip.display_formatter.active_types = ['text/plain']
913 913
914 914 def test_user_expression():
915 915 # enable all formatters
916 916 ip.display_formatter.active_types = ip.display_formatter.format_types
917 917 query = {
918 918 'a' : '1 + 2',
919 919 'b' : '1/0',
920 920 }
921 921 r = ip.user_expressions(query)
922 922 import pprint
923 923 pprint.pprint(r)
924 924 assert set(r.keys()) == set(query.keys())
925 925 a = r["a"]
926 926 assert {"status", "data", "metadata"} == set(a.keys())
927 927 assert a["status"] == "ok"
928 928 data = a["data"]
929 929 metadata = a["metadata"]
930 930 assert data.get("text/plain") == "3"
931 931
932 932 b = r["b"]
933 933 assert b["status"] == "error"
934 934 assert b["ename"] == "ZeroDivisionError"
935 935
936 936 # back to text only
937 937 ip.display_formatter.active_types = ['text/plain']
938 938
939 939
940 940 class TestSyntaxErrorTransformer(unittest.TestCase):
941 941 """Check that SyntaxError raised by an input transformer is handled by run_cell()"""
942 942
943 943 @staticmethod
944 944 def transformer(lines):
945 945 for line in lines:
946 946 pos = line.find('syntaxerror')
947 947 if pos >= 0:
948 948 e = SyntaxError('input contains "syntaxerror"')
949 949 e.text = line
950 950 e.offset = pos + 1
951 951 raise e
952 952 return lines
953 953
954 954 def setUp(self):
955 955 ip.input_transformers_post.append(self.transformer)
956 956
957 957 def tearDown(self):
958 958 ip.input_transformers_post.remove(self.transformer)
959 959
960 960 def test_syntaxerror_input_transformer(self):
961 961 with tt.AssertPrints('1234'):
962 962 ip.run_cell('1234')
963 963 with tt.AssertPrints('SyntaxError: invalid syntax'):
964 964 ip.run_cell('1 2 3') # plain python syntax error
965 965 with tt.AssertPrints('SyntaxError: input contains "syntaxerror"'):
966 966 ip.run_cell('2345 # syntaxerror') # input transformer syntax error
967 967 with tt.AssertPrints('3456'):
968 968 ip.run_cell('3456')
969 969
970 970
971 971 class TestWarningSuppression(unittest.TestCase):
972 972 def test_warning_suppression(self):
973 973 ip.run_cell("import warnings")
974 974 try:
975 975 with self.assertWarnsRegex(UserWarning, "asdf"):
976 976 ip.run_cell("warnings.warn('asdf')")
977 977 # Here's the real test -- if we run that again, we should get the
978 978 # warning again. Traditionally, each warning was only issued once per
979 979 # IPython session (approximately), even if the user typed in new and
980 980 # different code that should have also triggered the warning, leading
981 981 # to much confusion.
982 982 with self.assertWarnsRegex(UserWarning, "asdf"):
983 983 ip.run_cell("warnings.warn('asdf')")
984 984 finally:
985 985 ip.run_cell("del warnings")
986 986
987 987
988 988 def test_deprecation_warning(self):
989 989 ip.run_cell("""
990 990 import warnings
991 991 def wrn():
992 992 warnings.warn(
993 993 "I AM A WARNING",
994 994 DeprecationWarning
995 995 )
996 996 """)
997 997 try:
998 998 with self.assertWarnsRegex(DeprecationWarning, "I AM A WARNING"):
999 999 ip.run_cell("wrn()")
1000 1000 finally:
1001 1001 ip.run_cell("del warnings")
1002 1002 ip.run_cell("del wrn")
1003 1003
1004 1004
1005 1005 class TestImportNoDeprecate(tt.TempFileMixin):
1006 1006
1007 1007 def setUp(self):
1008 1008 """Make a valid python temp file."""
1009 1009 self.mktmp("""
1010 1010 import warnings
1011 1011 def wrn():
1012 1012 warnings.warn(
1013 1013 "I AM A WARNING",
1014 1014 DeprecationWarning
1015 1015 )
1016 1016 """)
1017 1017 super().setUp()
1018 1018
1019 1019 def test_no_dep(self):
1020 1020 """
1021 1021 No deprecation warning should be raised from imported functions
1022 1022 """
1023 1023 ip.run_cell("from {} import wrn".format(self.fname))
1024 1024
1025 1025 with tt.AssertNotPrints("I AM A WARNING"):
1026 1026 ip.run_cell("wrn()")
1027 1027 ip.run_cell("del wrn")
1028 1028
1029 1029
1030 1030 def test_custom_exc_count():
1031 1031 hook = mock.Mock(return_value=None)
1032 1032 ip.set_custom_exc((SyntaxError,), hook)
1033 1033 before = ip.execution_count
1034 1034 ip.run_cell("def foo()", store_history=True)
1035 1035 # restore default excepthook
1036 1036 ip.set_custom_exc((), None)
1037 1037 assert hook.call_count == 1
1038 1038 assert ip.execution_count == before + 1
1039 1039
1040 1040
1041 1041 def test_run_cell_async():
1042 loop = asyncio.get_event_loop()
1042 loop = asyncio.get_event_loop_policy().get_event_loop()
1043 1043 ip.run_cell("import asyncio")
1044 1044 coro = ip.run_cell_async("await asyncio.sleep(0.01)\n5")
1045 1045 assert asyncio.iscoroutine(coro)
1046 1046 result = loop.run_until_complete(coro)
1047 1047 assert isinstance(result, interactiveshell.ExecutionResult)
1048 1048 assert result.result == 5
1049 1049
1050 1050
1051 1051 def test_should_run_async():
1052 1052 assert not ip.should_run_async("a = 5")
1053 1053 assert ip.should_run_async("await x")
1054 1054 assert ip.should_run_async("import asyncio; await asyncio.sleep(1)")
1055 1055
1056 1056
1057 1057 def test_set_custom_completer():
1058 1058 num_completers = len(ip.Completer.matchers)
1059 1059
1060 1060 def foo(*args, **kwargs):
1061 1061 return "I'm a completer!"
1062 1062
1063 1063 ip.set_custom_completer(foo, 0)
1064 1064
1065 1065 # check that we've really added a new completer
1066 1066 assert len(ip.Completer.matchers) == num_completers + 1
1067 1067
1068 1068 # check that the first completer is the function we defined
1069 1069 assert ip.Completer.matchers[0]() == "I'm a completer!"
1070 1070
1071 1071 # clean up
1072 1072 ip.Completer.custom_matchers.pop()
@@ -1,1363 +1,1368
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for various magic functions."""
3 3
4 4 import asyncio
5 5 import io
6 6 import os
7 7 import re
8 8 import shlex
9 9 import sys
10 10 import warnings
11 11 from importlib import invalidate_caches
12 12 from io import StringIO
13 13 from pathlib import Path
14 14 from textwrap import dedent
15 15 from unittest import TestCase, mock
16 16
17 17 import pytest
18 18
19 19 from IPython import get_ipython
20 20 from IPython.core import magic
21 21 from IPython.core.error import UsageError
22 22 from IPython.core.magic import (
23 23 Magics,
24 24 cell_magic,
25 25 line_magic,
26 26 magics_class,
27 27 register_cell_magic,
28 28 register_line_magic,
29 29 )
30 30 from IPython.core.magics import code, execution, logging, osm, script
31 31 from IPython.testing import decorators as dec
32 32 from IPython.testing import tools as tt
33 33 from IPython.utils.io import capture_output
34 34 from IPython.utils.process import find_cmd
35 35 from IPython.utils.tempdir import TemporaryDirectory, TemporaryWorkingDirectory
36 36
37 37 from .test_debugger import PdbTestInput
38 38
39 39
40 40 @magic.magics_class
41 41 class DummyMagics(magic.Magics): pass
42 42
43 43 def test_extract_code_ranges():
44 44 instr = "1 3 5-6 7-9 10:15 17: :10 10- -13 :"
45 45 expected = [
46 46 (0, 1),
47 47 (2, 3),
48 48 (4, 6),
49 49 (6, 9),
50 50 (9, 14),
51 51 (16, None),
52 52 (None, 9),
53 53 (9, None),
54 54 (None, 13),
55 55 (None, None),
56 56 ]
57 57 actual = list(code.extract_code_ranges(instr))
58 58 assert actual == expected
59 59
60 60 def test_extract_symbols():
61 61 source = """import foo\na = 10\ndef b():\n return 42\n\n\nclass A: pass\n\n\n"""
62 62 symbols_args = ["a", "b", "A", "A,b", "A,a", "z"]
63 63 expected = [([], ['a']),
64 64 (["def b():\n return 42\n"], []),
65 65 (["class A: pass\n"], []),
66 66 (["class A: pass\n", "def b():\n return 42\n"], []),
67 67 (["class A: pass\n"], ['a']),
68 68 ([], ['z'])]
69 69 for symbols, exp in zip(symbols_args, expected):
70 70 assert code.extract_symbols(source, symbols) == exp
71 71
72 72
73 73 def test_extract_symbols_raises_exception_with_non_python_code():
74 74 source = ("=begin A Ruby program :)=end\n"
75 75 "def hello\n"
76 76 "puts 'Hello world'\n"
77 77 "end")
78 78 with pytest.raises(SyntaxError):
79 79 code.extract_symbols(source, "hello")
80 80
81 81
82 82 def test_magic_not_found():
83 83 # magic not found raises UsageError
84 84 with pytest.raises(UsageError):
85 85 _ip.magic('doesntexist')
86 86
87 87 # ensure result isn't success when a magic isn't found
88 88 result = _ip.run_cell('%doesntexist')
89 89 assert isinstance(result.error_in_exec, UsageError)
90 90
91 91
92 92 def test_cell_magic_not_found():
93 93 # magic not found raises UsageError
94 94 with pytest.raises(UsageError):
95 95 _ip.run_cell_magic('doesntexist', 'line', 'cell')
96 96
97 97 # ensure result isn't success when a magic isn't found
98 98 result = _ip.run_cell('%%doesntexist')
99 99 assert isinstance(result.error_in_exec, UsageError)
100 100
101 101
102 102 def test_magic_error_status():
103 103 def fail(shell):
104 104 1/0
105 105 _ip.register_magic_function(fail)
106 106 result = _ip.run_cell('%fail')
107 107 assert isinstance(result.error_in_exec, ZeroDivisionError)
108 108
109 109
110 110 def test_config():
111 111 """ test that config magic does not raise
112 112 can happen if Configurable init is moved too early into
113 113 Magics.__init__ as then a Config object will be registered as a
114 114 magic.
115 115 """
116 116 ## should not raise.
117 117 _ip.magic('config')
118 118
119 119 def test_config_available_configs():
120 120 """ test that config magic prints available configs in unique and
121 121 sorted order. """
122 122 with capture_output() as captured:
123 123 _ip.magic('config')
124 124
125 125 stdout = captured.stdout
126 126 config_classes = stdout.strip().split('\n')[1:]
127 127 assert config_classes == sorted(set(config_classes))
128 128
129 129 def test_config_print_class():
130 130 """ test that config with a classname prints the class's options. """
131 131 with capture_output() as captured:
132 132 _ip.magic('config TerminalInteractiveShell')
133 133
134 134 stdout = captured.stdout
135 135 if not re.match("TerminalInteractiveShell.* options", stdout.splitlines()[0]):
136 136 print(stdout)
137 137 raise AssertionError("1st line of stdout not like "
138 138 "'TerminalInteractiveShell.* options'")
139 139
140 140 def test_rehashx():
141 141 # clear up everything
142 142 _ip.alias_manager.clear_aliases()
143 143 del _ip.db['syscmdlist']
144 144
145 145 _ip.magic('rehashx')
146 146 # Practically ALL ipython development systems will have more than 10 aliases
147 147
148 148 assert len(_ip.alias_manager.aliases) > 10
149 149 for name, cmd in _ip.alias_manager.aliases:
150 150 # we must strip dots from alias names
151 151 assert "." not in name
152 152
153 153 # rehashx must fill up syscmdlist
154 154 scoms = _ip.db['syscmdlist']
155 155 assert len(scoms) > 10
156 156
157 157
158 158 def test_magic_parse_options():
159 159 """Test that we don't mangle paths when parsing magic options."""
160 160 ip = get_ipython()
161 161 path = 'c:\\x'
162 162 m = DummyMagics(ip)
163 163 opts = m.parse_options('-f %s' % path,'f:')[0]
164 164 # argv splitting is os-dependent
165 165 if os.name == 'posix':
166 166 expected = 'c:x'
167 167 else:
168 168 expected = path
169 169 assert opts["f"] == expected
170 170
171 171
172 172 def test_magic_parse_long_options():
173 173 """Magic.parse_options can handle --foo=bar long options"""
174 174 ip = get_ipython()
175 175 m = DummyMagics(ip)
176 176 opts, _ = m.parse_options("--foo --bar=bubble", "a", "foo", "bar=")
177 177 assert "foo" in opts
178 178 assert "bar" in opts
179 179 assert opts["bar"] == "bubble"
180 180
181 181
182 182 def doctest_hist_f():
183 183 """Test %hist -f with temporary filename.
184 184
185 185 In [9]: import tempfile
186 186
187 187 In [10]: tfile = tempfile.mktemp('.py','tmp-ipython-')
188 188
189 189 In [11]: %hist -nl -f $tfile 3
190 190
191 191 In [13]: import os; os.unlink(tfile)
192 192 """
193 193
194 194
195 195 def doctest_hist_op():
196 196 """Test %hist -op
197 197
198 198 In [1]: class b(float):
199 199 ...: pass
200 200 ...:
201 201
202 202 In [2]: class s(object):
203 203 ...: def __str__(self):
204 204 ...: return 's'
205 205 ...:
206 206
207 207 In [3]:
208 208
209 209 In [4]: class r(b):
210 210 ...: def __repr__(self):
211 211 ...: return 'r'
212 212 ...:
213 213
214 214 In [5]: class sr(s,r): pass
215 215 ...:
216 216
217 217 In [6]:
218 218
219 219 In [7]: bb=b()
220 220
221 221 In [8]: ss=s()
222 222
223 223 In [9]: rr=r()
224 224
225 225 In [10]: ssrr=sr()
226 226
227 227 In [11]: 4.5
228 228 Out[11]: 4.5
229 229
230 230 In [12]: str(ss)
231 231 Out[12]: 's'
232 232
233 233 In [13]:
234 234
235 235 In [14]: %hist -op
236 236 >>> class b:
237 237 ... pass
238 238 ...
239 239 >>> class s(b):
240 240 ... def __str__(self):
241 241 ... return 's'
242 242 ...
243 243 >>>
244 244 >>> class r(b):
245 245 ... def __repr__(self):
246 246 ... return 'r'
247 247 ...
248 248 >>> class sr(s,r): pass
249 249 >>>
250 250 >>> bb=b()
251 251 >>> ss=s()
252 252 >>> rr=r()
253 253 >>> ssrr=sr()
254 254 >>> 4.5
255 255 4.5
256 256 >>> str(ss)
257 257 's'
258 258 >>>
259 259 """
260 260
261 261 def test_hist_pof():
262 262 ip = get_ipython()
263 263 ip.run_cell("1+2", store_history=True)
264 264 #raise Exception(ip.history_manager.session_number)
265 265 #raise Exception(list(ip.history_manager._get_range_session()))
266 266 with TemporaryDirectory() as td:
267 267 tf = os.path.join(td, 'hist.py')
268 268 ip.run_line_magic('history', '-pof %s' % tf)
269 269 assert os.path.isfile(tf)
270 270
271 271
272 272 def test_macro():
273 273 ip = get_ipython()
274 274 ip.history_manager.reset() # Clear any existing history.
275 275 cmds = ["a=1", "def b():\n return a**2", "print(a,b())"]
276 276 for i, cmd in enumerate(cmds, start=1):
277 277 ip.history_manager.store_inputs(i, cmd)
278 278 ip.magic("macro test 1-3")
279 279 assert ip.user_ns["test"].value == "\n".join(cmds) + "\n"
280 280
281 281 # List macros
282 282 assert "test" in ip.magic("macro")
283 283
284 284
285 285 def test_macro_run():
286 286 """Test that we can run a multi-line macro successfully."""
287 287 ip = get_ipython()
288 288 ip.history_manager.reset()
289 289 cmds = ["a=10", "a+=1", "print(a)", "%macro test 2-3"]
290 290 for cmd in cmds:
291 291 ip.run_cell(cmd, store_history=True)
292 292 assert ip.user_ns["test"].value == "a+=1\nprint(a)\n"
293 293 with tt.AssertPrints("12"):
294 294 ip.run_cell("test")
295 295 with tt.AssertPrints("13"):
296 296 ip.run_cell("test")
297 297
298 298
299 299 def test_magic_magic():
300 300 """Test %magic"""
301 301 ip = get_ipython()
302 302 with capture_output() as captured:
303 303 ip.magic("magic")
304 304
305 305 stdout = captured.stdout
306 306 assert "%magic" in stdout
307 307 assert "IPython" in stdout
308 308 assert "Available" in stdout
309 309
310 310
311 311 @dec.skipif_not_numpy
312 312 def test_numpy_reset_array_undec():
313 313 "Test '%reset array' functionality"
314 314 _ip.ex("import numpy as np")
315 315 _ip.ex("a = np.empty(2)")
316 316 assert "a" in _ip.user_ns
317 317 _ip.magic("reset -f array")
318 318 assert "a" not in _ip.user_ns
319 319
320 320
321 321 def test_reset_out():
322 322 "Test '%reset out' magic"
323 323 _ip.run_cell("parrot = 'dead'", store_history=True)
324 324 # test '%reset -f out', make an Out prompt
325 325 _ip.run_cell("parrot", store_history=True)
326 326 assert "dead" in [_ip.user_ns[x] for x in ("_", "__", "___")]
327 327 _ip.magic("reset -f out")
328 328 assert "dead" not in [_ip.user_ns[x] for x in ("_", "__", "___")]
329 329 assert len(_ip.user_ns["Out"]) == 0
330 330
331 331
332 332 def test_reset_in():
333 333 "Test '%reset in' magic"
334 334 # test '%reset -f in'
335 335 _ip.run_cell("parrot", store_history=True)
336 336 assert "parrot" in [_ip.user_ns[x] for x in ("_i", "_ii", "_iii")]
337 337 _ip.magic("%reset -f in")
338 338 assert "parrot" not in [_ip.user_ns[x] for x in ("_i", "_ii", "_iii")]
339 339 assert len(set(_ip.user_ns["In"])) == 1
340 340
341 341
342 342 def test_reset_dhist():
343 343 "Test '%reset dhist' magic"
344 344 _ip.run_cell("tmp = [d for d in _dh]") # copy before clearing
345 345 _ip.magic("cd " + os.path.dirname(pytest.__file__))
346 346 _ip.magic("cd -")
347 347 assert len(_ip.user_ns["_dh"]) > 0
348 348 _ip.magic("reset -f dhist")
349 349 assert len(_ip.user_ns["_dh"]) == 0
350 350 _ip.run_cell("_dh = [d for d in tmp]") # restore
351 351
352 352
353 353 def test_reset_in_length():
354 354 "Test that '%reset in' preserves In[] length"
355 355 _ip.run_cell("print 'foo'")
356 356 _ip.run_cell("reset -f in")
357 357 assert len(_ip.user_ns["In"]) == _ip.displayhook.prompt_count + 1
358 358
359 359
360 360 class TestResetErrors(TestCase):
361 361
362 362 def test_reset_redefine(self):
363 363
364 364 @magics_class
365 365 class KernelMagics(Magics):
366 366 @line_magic
367 367 def less(self, shell): pass
368 368
369 369 _ip.register_magics(KernelMagics)
370 370
371 371 with self.assertLogs() as cm:
372 372 # hack, we want to just capture logs, but assertLogs fails if not
373 373 # logs get produce.
374 374 # so log one things we ignore.
375 375 import logging as log_mod
376 376 log = log_mod.getLogger()
377 377 log.info('Nothing')
378 378 # end hack.
379 379 _ip.run_cell("reset -f")
380 380
381 381 assert len(cm.output) == 1
382 382 for out in cm.output:
383 383 assert "Invalid alias" not in out
384 384
385 385 def test_tb_syntaxerror():
386 386 """test %tb after a SyntaxError"""
387 387 ip = get_ipython()
388 388 ip.run_cell("for")
389 389
390 390 # trap and validate stdout
391 391 save_stdout = sys.stdout
392 392 try:
393 393 sys.stdout = StringIO()
394 394 ip.run_cell("%tb")
395 395 out = sys.stdout.getvalue()
396 396 finally:
397 397 sys.stdout = save_stdout
398 398 # trim output, and only check the last line
399 399 last_line = out.rstrip().splitlines()[-1].strip()
400 400 assert last_line == "SyntaxError: invalid syntax"
401 401
402 402
403 403 def test_time():
404 404 ip = get_ipython()
405 405
406 406 with tt.AssertPrints("Wall time: "):
407 407 ip.run_cell("%time None")
408 408
409 409 ip.run_cell("def f(kmjy):\n"
410 410 " %time print (2*kmjy)")
411 411
412 412 with tt.AssertPrints("Wall time: "):
413 413 with tt.AssertPrints("hihi", suppress=False):
414 414 ip.run_cell("f('hi')")
415 415
416 416 def test_time_last_not_expression():
417 417 ip.run_cell("%%time\n"
418 418 "var_1 = 1\n"
419 419 "var_2 = 2\n")
420 420 assert ip.user_ns['var_1'] == 1
421 421 del ip.user_ns['var_1']
422 422 assert ip.user_ns['var_2'] == 2
423 423 del ip.user_ns['var_2']
424 424
425 425
426 426 @dec.skip_win32
427 427 def test_time2():
428 428 ip = get_ipython()
429 429
430 430 with tt.AssertPrints("CPU times: user "):
431 431 ip.run_cell("%time None")
432 432
433 433 def test_time3():
434 434 """Erroneous magic function calls, issue gh-3334"""
435 435 ip = get_ipython()
436 436 ip.user_ns.pop('run', None)
437 437
438 438 with tt.AssertNotPrints("not found", channel='stderr'):
439 439 ip.run_cell("%%time\n"
440 440 "run = 0\n"
441 441 "run += 1")
442 442
443 443 def test_multiline_time():
444 444 """Make sure last statement from time return a value."""
445 445 ip = get_ipython()
446 446 ip.user_ns.pop('run', None)
447 447
448 448 ip.run_cell(dedent("""\
449 449 %%time
450 450 a = "ho"
451 451 b = "hey"
452 452 a+b
453 453 """
454 454 )
455 455 )
456 456 assert ip.user_ns_hidden["_"] == "hohey"
457 457
458 458
459 459 def test_time_local_ns():
460 460 """
461 461 Test that local_ns is actually global_ns when running a cell magic
462 462 """
463 463 ip = get_ipython()
464 464 ip.run_cell("%%time\n" "myvar = 1")
465 465 assert ip.user_ns["myvar"] == 1
466 466 del ip.user_ns["myvar"]
467 467
468 468
469 469 def test_doctest_mode():
470 470 "Toggle doctest_mode twice, it should be a no-op and run without error"
471 471 _ip.magic('doctest_mode')
472 472 _ip.magic('doctest_mode')
473 473
474 474
475 475 def test_parse_options():
476 476 """Tests for basic options parsing in magics."""
477 477 # These are only the most minimal of tests, more should be added later. At
478 478 # the very least we check that basic text/unicode calls work OK.
479 479 m = DummyMagics(_ip)
480 480 assert m.parse_options("foo", "")[1] == "foo"
481 481 assert m.parse_options("foo", "")[1] == "foo"
482 482
483 483
484 484 def test_parse_options_preserve_non_option_string():
485 485 """Test to assert preservation of non-option part of magic-block, while parsing magic options."""
486 486 m = DummyMagics(_ip)
487 487 opts, stmt = m.parse_options(
488 488 " -n1 -r 13 _ = 314 + foo", "n:r:", preserve_non_opts=True
489 489 )
490 490 assert opts == {"n": "1", "r": "13"}
491 491 assert stmt == "_ = 314 + foo"
492 492
493 493
494 494 def test_run_magic_preserve_code_block():
495 495 """Test to assert preservation of non-option part of magic-block, while running magic."""
496 496 _ip.user_ns["spaces"] = []
497 497 _ip.magic("timeit -n1 -r1 spaces.append([s.count(' ') for s in ['document']])")
498 498 assert _ip.user_ns["spaces"] == [[0]]
499 499
500 500
501 501 def test_dirops():
502 502 """Test various directory handling operations."""
503 503 # curpath = lambda :os.path.splitdrive(os.getcwd())[1].replace('\\','/')
504 504 curpath = os.getcwd
505 505 startdir = os.getcwd()
506 506 ipdir = os.path.realpath(_ip.ipython_dir)
507 507 try:
508 508 _ip.magic('cd "%s"' % ipdir)
509 509 assert curpath() == ipdir
510 510 _ip.magic('cd -')
511 511 assert curpath() == startdir
512 512 _ip.magic('pushd "%s"' % ipdir)
513 513 assert curpath() == ipdir
514 514 _ip.magic('popd')
515 515 assert curpath() == startdir
516 516 finally:
517 517 os.chdir(startdir)
518 518
519 519
520 520 def test_cd_force_quiet():
521 521 """Test OSMagics.cd_force_quiet option"""
522 522 _ip.config.OSMagics.cd_force_quiet = True
523 523 osmagics = osm.OSMagics(shell=_ip)
524 524
525 525 startdir = os.getcwd()
526 526 ipdir = os.path.realpath(_ip.ipython_dir)
527 527
528 528 try:
529 529 with tt.AssertNotPrints(ipdir):
530 530 osmagics.cd('"%s"' % ipdir)
531 531 with tt.AssertNotPrints(startdir):
532 532 osmagics.cd('-')
533 533 finally:
534 534 os.chdir(startdir)
535 535
536 536
537 537 def test_xmode():
538 538 # Calling xmode three times should be a no-op
539 539 xmode = _ip.InteractiveTB.mode
540 540 for i in range(4):
541 541 _ip.magic("xmode")
542 542 assert _ip.InteractiveTB.mode == xmode
543 543
544 544 def test_reset_hard():
545 545 monitor = []
546 546 class A(object):
547 547 def __del__(self):
548 548 monitor.append(1)
549 549 def __repr__(self):
550 550 return "<A instance>"
551 551
552 552 _ip.user_ns["a"] = A()
553 553 _ip.run_cell("a")
554 554
555 555 assert monitor == []
556 556 _ip.magic("reset -f")
557 557 assert monitor == [1]
558 558
559 559 class TestXdel(tt.TempFileMixin):
560 560 def test_xdel(self):
561 561 """Test that references from %run are cleared by xdel."""
562 562 src = ("class A(object):\n"
563 563 " monitor = []\n"
564 564 " def __del__(self):\n"
565 565 " self.monitor.append(1)\n"
566 566 "a = A()\n")
567 567 self.mktmp(src)
568 568 # %run creates some hidden references...
569 569 _ip.magic("run %s" % self.fname)
570 570 # ... as does the displayhook.
571 571 _ip.run_cell("a")
572 572
573 573 monitor = _ip.user_ns["A"].monitor
574 574 assert monitor == []
575 575
576 576 _ip.magic("xdel a")
577 577
578 578 # Check that a's __del__ method has been called.
579 579 assert monitor == [1]
580 580
581 581 def doctest_who():
582 582 """doctest for %who
583 583
584 584 In [1]: %reset -sf
585 585
586 586 In [2]: alpha = 123
587 587
588 588 In [3]: beta = 'beta'
589 589
590 590 In [4]: %who int
591 591 alpha
592 592
593 593 In [5]: %who str
594 594 beta
595 595
596 596 In [6]: %whos
597 597 Variable Type Data/Info
598 598 ----------------------------
599 599 alpha int 123
600 600 beta str beta
601 601
602 602 In [7]: %who_ls
603 603 Out[7]: ['alpha', 'beta']
604 604 """
605 605
606 606 def test_whos():
607 607 """Check that whos is protected against objects where repr() fails."""
608 608 class A(object):
609 609 def __repr__(self):
610 610 raise Exception()
611 611 _ip.user_ns['a'] = A()
612 612 _ip.magic("whos")
613 613
614 614 def doctest_precision():
615 615 """doctest for %precision
616 616
617 617 In [1]: f = get_ipython().display_formatter.formatters['text/plain']
618 618
619 619 In [2]: %precision 5
620 620 Out[2]: '%.5f'
621 621
622 622 In [3]: f.float_format
623 623 Out[3]: '%.5f'
624 624
625 625 In [4]: %precision %e
626 626 Out[4]: '%e'
627 627
628 628 In [5]: f(3.1415927)
629 629 Out[5]: '3.141593e+00'
630 630 """
631 631
632 632 def test_debug_magic():
633 633 """Test debugging a small code with %debug
634 634
635 635 In [1]: with PdbTestInput(['c']):
636 636 ...: %debug print("a b") #doctest: +ELLIPSIS
637 637 ...:
638 638 ...
639 639 ipdb> c
640 640 a b
641 641 In [2]:
642 642 """
643 643
644 644 def test_psearch():
645 645 with tt.AssertPrints("dict.fromkeys"):
646 646 _ip.run_cell("dict.fr*?")
647 647 with tt.AssertPrints("Ο€.is_integer"):
648 648 _ip.run_cell("Ο€ = 3.14;\nΟ€.is_integ*?")
649 649
650 650 def test_timeit_shlex():
651 651 """test shlex issues with timeit (#1109)"""
652 652 _ip.ex("def f(*a,**kw): pass")
653 653 _ip.magic('timeit -n1 "this is a bug".count(" ")')
654 654 _ip.magic('timeit -r1 -n1 f(" ", 1)')
655 655 _ip.magic('timeit -r1 -n1 f(" ", 1, " ", 2, " ")')
656 656 _ip.magic('timeit -r1 -n1 ("a " + "b")')
657 657 _ip.magic('timeit -r1 -n1 f("a " + "b")')
658 658 _ip.magic('timeit -r1 -n1 f("a " + "b ")')
659 659
660 660
661 661 def test_timeit_special_syntax():
662 662 "Test %%timeit with IPython special syntax"
663 663 @register_line_magic
664 664 def lmagic(line):
665 665 ip = get_ipython()
666 666 ip.user_ns['lmagic_out'] = line
667 667
668 668 # line mode test
669 669 _ip.run_line_magic("timeit", "-n1 -r1 %lmagic my line")
670 670 assert _ip.user_ns["lmagic_out"] == "my line"
671 671 # cell mode test
672 672 _ip.run_cell_magic("timeit", "-n1 -r1", "%lmagic my line2")
673 673 assert _ip.user_ns["lmagic_out"] == "my line2"
674 674
675 675
676 676 def test_timeit_return():
677 677 """
678 678 test whether timeit -o return object
679 679 """
680 680
681 681 res = _ip.run_line_magic('timeit','-n10 -r10 -o 1')
682 682 assert(res is not None)
683 683
684 684 def test_timeit_quiet():
685 685 """
686 686 test quiet option of timeit magic
687 687 """
688 688 with tt.AssertNotPrints("loops"):
689 689 _ip.run_cell("%timeit -n1 -r1 -q 1")
690 690
691 691 def test_timeit_return_quiet():
692 692 with tt.AssertNotPrints("loops"):
693 693 res = _ip.run_line_magic('timeit', '-n1 -r1 -q -o 1')
694 694 assert (res is not None)
695 695
696 696 def test_timeit_invalid_return():
697 697 with pytest.raises(SyntaxError):
698 698 _ip.run_line_magic('timeit', 'return')
699 699
700 700 @dec.skipif(execution.profile is None)
701 701 def test_prun_special_syntax():
702 702 "Test %%prun with IPython special syntax"
703 703 @register_line_magic
704 704 def lmagic(line):
705 705 ip = get_ipython()
706 706 ip.user_ns['lmagic_out'] = line
707 707
708 708 # line mode test
709 709 _ip.run_line_magic("prun", "-q %lmagic my line")
710 710 assert _ip.user_ns["lmagic_out"] == "my line"
711 711 # cell mode test
712 712 _ip.run_cell_magic("prun", "-q", "%lmagic my line2")
713 713 assert _ip.user_ns["lmagic_out"] == "my line2"
714 714
715 715
716 716 @dec.skipif(execution.profile is None)
717 717 def test_prun_quotes():
718 718 "Test that prun does not clobber string escapes (GH #1302)"
719 719 _ip.magic(r"prun -q x = '\t'")
720 720 assert _ip.user_ns["x"] == "\t"
721 721
722 722
723 723 def test_extension():
724 724 # Debugging information for failures of this test
725 725 print('sys.path:')
726 726 for p in sys.path:
727 727 print(' ', p)
728 728 print('CWD', os.getcwd())
729 729
730 730 pytest.raises(ImportError, _ip.magic, "load_ext daft_extension")
731 731 daft_path = os.path.join(os.path.dirname(__file__), "daft_extension")
732 732 sys.path.insert(0, daft_path)
733 733 try:
734 734 _ip.user_ns.pop('arq', None)
735 735 invalidate_caches() # Clear import caches
736 736 _ip.magic("load_ext daft_extension")
737 737 assert _ip.user_ns["arq"] == 185
738 738 _ip.magic("unload_ext daft_extension")
739 739 assert 'arq' not in _ip.user_ns
740 740 finally:
741 741 sys.path.remove(daft_path)
742 742
743 743
744 744 def test_notebook_export_json():
745 745 _ip = get_ipython()
746 746 _ip.history_manager.reset() # Clear any existing history.
747 747 cmds = ["a=1", "def b():\n return a**2", "print('noΓ«l, Γ©tΓ©', b())"]
748 748 for i, cmd in enumerate(cmds, start=1):
749 749 _ip.history_manager.store_inputs(i, cmd)
750 750 with TemporaryDirectory() as td:
751 751 outfile = os.path.join(td, "nb.ipynb")
752 752 _ip.magic("notebook -e %s" % outfile)
753 753
754 754
755 755 class TestEnv(TestCase):
756 756
757 757 def test_env(self):
758 758 env = _ip.magic("env")
759 759 self.assertTrue(isinstance(env, dict))
760 760
761 761 def test_env_secret(self):
762 762 env = _ip.magic("env")
763 763 hidden = "<hidden>"
764 764 with mock.patch.dict(
765 765 os.environ,
766 766 {
767 767 "API_KEY": "abc123",
768 768 "SECRET_THING": "ssshhh",
769 769 "JUPYTER_TOKEN": "",
770 770 "VAR": "abc"
771 771 }
772 772 ):
773 773 env = _ip.magic("env")
774 774 assert env["API_KEY"] == hidden
775 775 assert env["SECRET_THING"] == hidden
776 776 assert env["JUPYTER_TOKEN"] == hidden
777 777 assert env["VAR"] == "abc"
778 778
779 779 def test_env_get_set_simple(self):
780 780 env = _ip.magic("env var val1")
781 781 self.assertEqual(env, None)
782 782 self.assertEqual(os.environ['var'], 'val1')
783 783 self.assertEqual(_ip.magic("env var"), 'val1')
784 784 env = _ip.magic("env var=val2")
785 785 self.assertEqual(env, None)
786 786 self.assertEqual(os.environ['var'], 'val2')
787 787
788 788 def test_env_get_set_complex(self):
789 789 env = _ip.magic("env var 'val1 '' 'val2")
790 790 self.assertEqual(env, None)
791 791 self.assertEqual(os.environ['var'], "'val1 '' 'val2")
792 792 self.assertEqual(_ip.magic("env var"), "'val1 '' 'val2")
793 793 env = _ip.magic('env var=val2 val3="val4')
794 794 self.assertEqual(env, None)
795 795 self.assertEqual(os.environ['var'], 'val2 val3="val4')
796 796
797 797 def test_env_set_bad_input(self):
798 798 self.assertRaises(UsageError, lambda: _ip.magic("set_env var"))
799 799
800 800 def test_env_set_whitespace(self):
801 801 self.assertRaises(UsageError, lambda: _ip.magic("env var A=B"))
802 802
803 803
804 804 class CellMagicTestCase(TestCase):
805 805
806 806 def check_ident(self, magic):
807 807 # Manually called, we get the result
808 808 out = _ip.run_cell_magic(magic, "a", "b")
809 809 assert out == ("a", "b")
810 810 # Via run_cell, it goes into the user's namespace via displayhook
811 811 _ip.run_cell("%%" + magic + " c\nd\n")
812 812 assert _ip.user_ns["_"] == ("c", "d\n")
813 813
814 814 def test_cell_magic_func_deco(self):
815 815 "Cell magic using simple decorator"
816 816 @register_cell_magic
817 817 def cellm(line, cell):
818 818 return line, cell
819 819
820 820 self.check_ident('cellm')
821 821
822 822 def test_cell_magic_reg(self):
823 823 "Cell magic manually registered"
824 824 def cellm(line, cell):
825 825 return line, cell
826 826
827 827 _ip.register_magic_function(cellm, 'cell', 'cellm2')
828 828 self.check_ident('cellm2')
829 829
830 830 def test_cell_magic_class(self):
831 831 "Cell magics declared via a class"
832 832 @magics_class
833 833 class MyMagics(Magics):
834 834
835 835 @cell_magic
836 836 def cellm3(self, line, cell):
837 837 return line, cell
838 838
839 839 _ip.register_magics(MyMagics)
840 840 self.check_ident('cellm3')
841 841
842 842 def test_cell_magic_class2(self):
843 843 "Cell magics declared via a class, #2"
844 844 @magics_class
845 845 class MyMagics2(Magics):
846 846
847 847 @cell_magic('cellm4')
848 848 def cellm33(self, line, cell):
849 849 return line, cell
850 850
851 851 _ip.register_magics(MyMagics2)
852 852 self.check_ident('cellm4')
853 853 # Check that nothing is registered as 'cellm33'
854 854 c33 = _ip.find_cell_magic('cellm33')
855 855 assert c33 == None
856 856
857 857 def test_file():
858 858 """Basic %%writefile"""
859 859 ip = get_ipython()
860 860 with TemporaryDirectory() as td:
861 861 fname = os.path.join(td, 'file1')
862 862 ip.run_cell_magic("writefile", fname, u'\n'.join([
863 863 'line1',
864 864 'line2',
865 865 ]))
866 866 s = Path(fname).read_text()
867 867 assert "line1\n" in s
868 868 assert "line2" in s
869 869
870 870
871 871 @dec.skip_win32
872 872 def test_file_single_quote():
873 873 """Basic %%writefile with embedded single quotes"""
874 874 ip = get_ipython()
875 875 with TemporaryDirectory() as td:
876 876 fname = os.path.join(td, '\'file1\'')
877 877 ip.run_cell_magic("writefile", fname, u'\n'.join([
878 878 'line1',
879 879 'line2',
880 880 ]))
881 881 s = Path(fname).read_text()
882 882 assert "line1\n" in s
883 883 assert "line2" in s
884 884
885 885
886 886 @dec.skip_win32
887 887 def test_file_double_quote():
888 888 """Basic %%writefile with embedded double quotes"""
889 889 ip = get_ipython()
890 890 with TemporaryDirectory() as td:
891 891 fname = os.path.join(td, '"file1"')
892 892 ip.run_cell_magic("writefile", fname, u'\n'.join([
893 893 'line1',
894 894 'line2',
895 895 ]))
896 896 s = Path(fname).read_text()
897 897 assert "line1\n" in s
898 898 assert "line2" in s
899 899
900 900
901 901 def test_file_var_expand():
902 902 """%%writefile $filename"""
903 903 ip = get_ipython()
904 904 with TemporaryDirectory() as td:
905 905 fname = os.path.join(td, 'file1')
906 906 ip.user_ns['filename'] = fname
907 907 ip.run_cell_magic("writefile", '$filename', u'\n'.join([
908 908 'line1',
909 909 'line2',
910 910 ]))
911 911 s = Path(fname).read_text()
912 912 assert "line1\n" in s
913 913 assert "line2" in s
914 914
915 915
916 916 def test_file_unicode():
917 917 """%%writefile with unicode cell"""
918 918 ip = get_ipython()
919 919 with TemporaryDirectory() as td:
920 920 fname = os.path.join(td, 'file1')
921 921 ip.run_cell_magic("writefile", fname, u'\n'.join([
922 922 u'linΓ©1',
923 923 u'linΓ©2',
924 924 ]))
925 925 with io.open(fname, encoding='utf-8') as f:
926 926 s = f.read()
927 927 assert "linΓ©1\n" in s
928 928 assert "linΓ©2" in s
929 929
930 930
931 931 def test_file_amend():
932 932 """%%writefile -a amends files"""
933 933 ip = get_ipython()
934 934 with TemporaryDirectory() as td:
935 935 fname = os.path.join(td, 'file2')
936 936 ip.run_cell_magic("writefile", fname, u'\n'.join([
937 937 'line1',
938 938 'line2',
939 939 ]))
940 940 ip.run_cell_magic("writefile", "-a %s" % fname, u'\n'.join([
941 941 'line3',
942 942 'line4',
943 943 ]))
944 944 s = Path(fname).read_text()
945 945 assert "line1\n" in s
946 946 assert "line3\n" in s
947 947
948 948
949 949 def test_file_spaces():
950 950 """%%file with spaces in filename"""
951 951 ip = get_ipython()
952 952 with TemporaryWorkingDirectory() as td:
953 953 fname = "file name"
954 954 ip.run_cell_magic("file", '"%s"'%fname, u'\n'.join([
955 955 'line1',
956 956 'line2',
957 957 ]))
958 958 s = Path(fname).read_text()
959 959 assert "line1\n" in s
960 960 assert "line2" in s
961 961
962 962
963 963 def test_script_config():
964 964 ip = get_ipython()
965 965 ip.config.ScriptMagics.script_magics = ['whoda']
966 966 sm = script.ScriptMagics(shell=ip)
967 967 assert "whoda" in sm.magics["cell"]
968 968
969 969
970 @pytest.fixture
971 def event_loop():
972 yield asyncio.get_event_loop_policy().get_event_loop()
973
974
970 975 @dec.skip_iptest_but_not_pytest
971 976 @dec.skip_win32
972 977 @pytest.mark.skipif(
973 978 sys.platform == "win32", reason="This test does not run under Windows"
974 979 )
975 def test_script_out():
976 assert asyncio.get_event_loop().is_running() is False
980 def test_script_out(event_loop):
981 assert event_loop.is_running() is False
977 982
978 983 ip = get_ipython()
979 984 ip.run_cell_magic("script", "--out output sh", "echo 'hi'")
980 assert asyncio.get_event_loop().is_running() is False
985 assert event_loop.is_running() is False
981 986 assert ip.user_ns["output"] == "hi\n"
982 987
983 988
984 989 @dec.skip_iptest_but_not_pytest
985 990 @dec.skip_win32
986 991 @pytest.mark.skipif(
987 992 sys.platform == "win32", reason="This test does not run under Windows"
988 993 )
989 def test_script_err():
994 def test_script_err(event_loop):
990 995 ip = get_ipython()
991 assert asyncio.get_event_loop().is_running() is False
996 assert event_loop.is_running() is False
992 997 ip.run_cell_magic("script", "--err error sh", "echo 'hello' >&2")
993 assert asyncio.get_event_loop().is_running() is False
998 assert event_loop.is_running() is False
994 999 assert ip.user_ns["error"] == "hello\n"
995 1000
996 1001
997 1002 @dec.skip_iptest_but_not_pytest
998 1003 @dec.skip_win32
999 1004 @pytest.mark.skipif(
1000 1005 sys.platform == "win32", reason="This test does not run under Windows"
1001 1006 )
1002 1007 def test_script_out_err():
1003 1008
1004 1009 ip = get_ipython()
1005 1010 ip.run_cell_magic(
1006 1011 "script", "--out output --err error sh", "echo 'hi'\necho 'hello' >&2"
1007 1012 )
1008 1013 assert ip.user_ns["output"] == "hi\n"
1009 1014 assert ip.user_ns["error"] == "hello\n"
1010 1015
1011 1016
1012 1017 @dec.skip_iptest_but_not_pytest
1013 1018 @dec.skip_win32
1014 1019 @pytest.mark.skipif(
1015 1020 sys.platform == "win32", reason="This test does not run under Windows"
1016 1021 )
1017 async def test_script_bg_out():
1022 async def test_script_bg_out(event_loop):
1018 1023 ip = get_ipython()
1019 1024 ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'")
1020 1025 assert (await ip.user_ns["output"].read()) == b"hi\n"
1021 1026 ip.user_ns["output"].close()
1022 asyncio.get_event_loop().stop()
1027 event_loop.stop()
1023 1028
1024 1029
1025 1030 @dec.skip_iptest_but_not_pytest
1026 1031 @dec.skip_win32
1027 1032 @pytest.mark.skipif(
1028 1033 sys.platform == "win32", reason="This test does not run under Windows"
1029 1034 )
1030 1035 async def test_script_bg_err():
1031 1036 ip = get_ipython()
1032 1037 ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2")
1033 1038 assert (await ip.user_ns["error"].read()) == b"hello\n"
1034 1039 ip.user_ns["error"].close()
1035 1040
1036 1041
1037 1042 @dec.skip_iptest_but_not_pytest
1038 1043 @dec.skip_win32
1039 1044 @pytest.mark.skipif(
1040 1045 sys.platform == "win32", reason="This test does not run under Windows"
1041 1046 )
1042 1047 async def test_script_bg_out_err():
1043 1048 ip = get_ipython()
1044 1049 ip.run_cell_magic(
1045 1050 "script", "--bg --out output --err error sh", "echo 'hi'\necho 'hello' >&2"
1046 1051 )
1047 1052 assert (await ip.user_ns["output"].read()) == b"hi\n"
1048 1053 assert (await ip.user_ns["error"].read()) == b"hello\n"
1049 1054 ip.user_ns["output"].close()
1050 1055 ip.user_ns["error"].close()
1051 1056
1052 1057
1053 1058 def test_script_defaults():
1054 1059 ip = get_ipython()
1055 1060 for cmd in ['sh', 'bash', 'perl', 'ruby']:
1056 1061 try:
1057 1062 find_cmd(cmd)
1058 1063 except Exception:
1059 1064 pass
1060 1065 else:
1061 1066 assert cmd in ip.magics_manager.magics["cell"]
1062 1067
1063 1068
1064 1069 @magics_class
1065 1070 class FooFoo(Magics):
1066 1071 """class with both %foo and %%foo magics"""
1067 1072 @line_magic('foo')
1068 1073 def line_foo(self, line):
1069 1074 "I am line foo"
1070 1075 pass
1071 1076
1072 1077 @cell_magic("foo")
1073 1078 def cell_foo(self, line, cell):
1074 1079 "I am cell foo, not line foo"
1075 1080 pass
1076 1081
1077 1082 def test_line_cell_info():
1078 1083 """%%foo and %foo magics are distinguishable to inspect"""
1079 1084 ip = get_ipython()
1080 1085 ip.magics_manager.register(FooFoo)
1081 1086 oinfo = ip.object_inspect("foo")
1082 1087 assert oinfo["found"] is True
1083 1088 assert oinfo["ismagic"] is True
1084 1089
1085 1090 oinfo = ip.object_inspect("%%foo")
1086 1091 assert oinfo["found"] is True
1087 1092 assert oinfo["ismagic"] is True
1088 1093 assert oinfo["docstring"] == FooFoo.cell_foo.__doc__
1089 1094
1090 1095 oinfo = ip.object_inspect("%foo")
1091 1096 assert oinfo["found"] is True
1092 1097 assert oinfo["ismagic"] is True
1093 1098 assert oinfo["docstring"] == FooFoo.line_foo.__doc__
1094 1099
1095 1100
1096 1101 def test_multiple_magics():
1097 1102 ip = get_ipython()
1098 1103 foo1 = FooFoo(ip)
1099 1104 foo2 = FooFoo(ip)
1100 1105 mm = ip.magics_manager
1101 1106 mm.register(foo1)
1102 1107 assert mm.magics["line"]["foo"].__self__ is foo1
1103 1108 mm.register(foo2)
1104 1109 assert mm.magics["line"]["foo"].__self__ is foo2
1105 1110
1106 1111
1107 1112 def test_alias_magic():
1108 1113 """Test %alias_magic."""
1109 1114 ip = get_ipython()
1110 1115 mm = ip.magics_manager
1111 1116
1112 1117 # Basic operation: both cell and line magics are created, if possible.
1113 1118 ip.run_line_magic("alias_magic", "timeit_alias timeit")
1114 1119 assert "timeit_alias" in mm.magics["line"]
1115 1120 assert "timeit_alias" in mm.magics["cell"]
1116 1121
1117 1122 # --cell is specified, line magic not created.
1118 1123 ip.run_line_magic("alias_magic", "--cell timeit_cell_alias timeit")
1119 1124 assert "timeit_cell_alias" not in mm.magics["line"]
1120 1125 assert "timeit_cell_alias" in mm.magics["cell"]
1121 1126
1122 1127 # Test that line alias is created successfully.
1123 1128 ip.run_line_magic("alias_magic", "--line env_alias env")
1124 1129 assert ip.run_line_magic("env", "") == ip.run_line_magic("env_alias", "")
1125 1130
1126 1131 # Test that line alias with parameters passed in is created successfully.
1127 1132 ip.run_line_magic(
1128 1133 "alias_magic", "--line history_alias history --params " + shlex.quote("3")
1129 1134 )
1130 1135 assert "history_alias" in mm.magics["line"]
1131 1136
1132 1137
1133 1138 def test_save():
1134 1139 """Test %save."""
1135 1140 ip = get_ipython()
1136 1141 ip.history_manager.reset() # Clear any existing history.
1137 1142 cmds = ["a=1", "def b():\n return a**2", "print(a, b())"]
1138 1143 for i, cmd in enumerate(cmds, start=1):
1139 1144 ip.history_manager.store_inputs(i, cmd)
1140 1145 with TemporaryDirectory() as tmpdir:
1141 1146 file = os.path.join(tmpdir, "testsave.py")
1142 1147 ip.run_line_magic("save", "%s 1-10" % file)
1143 1148 content = Path(file).read_text()
1144 1149 assert content.count(cmds[0]) == 1
1145 1150 assert "coding: utf-8" in content
1146 1151 ip.run_line_magic("save", "-a %s 1-10" % file)
1147 1152 content = Path(file).read_text()
1148 1153 assert content.count(cmds[0]) == 2
1149 1154 assert "coding: utf-8" in content
1150 1155
1151 1156
1152 1157 def test_save_with_no_args():
1153 1158 ip = get_ipython()
1154 1159 ip.history_manager.reset() # Clear any existing history.
1155 1160 cmds = ["a=1", "def b():\n return a**2", "print(a, b())", "%save"]
1156 1161 for i, cmd in enumerate(cmds, start=1):
1157 1162 ip.history_manager.store_inputs(i, cmd)
1158 1163
1159 1164 with TemporaryDirectory() as tmpdir:
1160 1165 path = os.path.join(tmpdir, "testsave.py")
1161 1166 ip.run_line_magic("save", path)
1162 1167 content = Path(path).read_text()
1163 1168 expected_content = dedent(
1164 1169 """\
1165 1170 # coding: utf-8
1166 1171 a=1
1167 1172 def b():
1168 1173 return a**2
1169 1174 print(a, b())
1170 1175 """
1171 1176 )
1172 1177 assert content == expected_content
1173 1178
1174 1179
1175 1180 def test_store():
1176 1181 """Test %store."""
1177 1182 ip = get_ipython()
1178 1183 ip.run_line_magic('load_ext', 'storemagic')
1179 1184
1180 1185 # make sure the storage is empty
1181 1186 ip.run_line_magic("store", "-z")
1182 1187 ip.user_ns["var"] = 42
1183 1188 ip.run_line_magic("store", "var")
1184 1189 ip.user_ns["var"] = 39
1185 1190 ip.run_line_magic("store", "-r")
1186 1191 assert ip.user_ns["var"] == 42
1187 1192
1188 1193 ip.run_line_magic("store", "-d var")
1189 1194 ip.user_ns["var"] = 39
1190 1195 ip.run_line_magic("store", "-r")
1191 1196 assert ip.user_ns["var"] == 39
1192 1197
1193 1198
1194 1199 def _run_edit_test(arg_s, exp_filename=None,
1195 1200 exp_lineno=-1,
1196 1201 exp_contents=None,
1197 1202 exp_is_temp=None):
1198 1203 ip = get_ipython()
1199 1204 M = code.CodeMagics(ip)
1200 1205 last_call = ['','']
1201 1206 opts,args = M.parse_options(arg_s,'prxn:')
1202 1207 filename, lineno, is_temp = M._find_edit_target(ip, args, opts, last_call)
1203 1208
1204 1209 if exp_filename is not None:
1205 1210 assert exp_filename == filename
1206 1211 if exp_contents is not None:
1207 1212 with io.open(filename, 'r', encoding='utf-8') as f:
1208 1213 contents = f.read()
1209 1214 assert exp_contents == contents
1210 1215 if exp_lineno != -1:
1211 1216 assert exp_lineno == lineno
1212 1217 if exp_is_temp is not None:
1213 1218 assert exp_is_temp == is_temp
1214 1219
1215 1220
1216 1221 def test_edit_interactive():
1217 1222 """%edit on interactively defined objects"""
1218 1223 ip = get_ipython()
1219 1224 n = ip.execution_count
1220 1225 ip.run_cell("def foo(): return 1", store_history=True)
1221 1226
1222 1227 try:
1223 1228 _run_edit_test("foo")
1224 1229 except code.InteractivelyDefined as e:
1225 1230 assert e.index == n
1226 1231 else:
1227 1232 raise AssertionError("Should have raised InteractivelyDefined")
1228 1233
1229 1234
1230 1235 def test_edit_cell():
1231 1236 """%edit [cell id]"""
1232 1237 ip = get_ipython()
1233 1238
1234 1239 ip.run_cell("def foo(): return 1", store_history=True)
1235 1240
1236 1241 # test
1237 1242 _run_edit_test("1", exp_contents=ip.user_ns['In'][1], exp_is_temp=True)
1238 1243
1239 1244 def test_edit_fname():
1240 1245 """%edit file"""
1241 1246 # test
1242 1247 _run_edit_test("test file.py", exp_filename="test file.py")
1243 1248
1244 1249 def test_bookmark():
1245 1250 ip = get_ipython()
1246 1251 ip.run_line_magic('bookmark', 'bmname')
1247 1252 with tt.AssertPrints('bmname'):
1248 1253 ip.run_line_magic('bookmark', '-l')
1249 1254 ip.run_line_magic('bookmark', '-d bmname')
1250 1255
1251 1256 def test_ls_magic():
1252 1257 ip = get_ipython()
1253 1258 json_formatter = ip.display_formatter.formatters['application/json']
1254 1259 json_formatter.enabled = True
1255 1260 lsmagic = ip.magic('lsmagic')
1256 1261 with warnings.catch_warnings(record=True) as w:
1257 1262 j = json_formatter(lsmagic)
1258 1263 assert sorted(j) == ["cell", "line"]
1259 1264 assert w == [] # no warnings
1260 1265
1261 1266
1262 1267 def test_strip_initial_indent():
1263 1268 def sii(s):
1264 1269 lines = s.splitlines()
1265 1270 return '\n'.join(code.strip_initial_indent(lines))
1266 1271
1267 1272 assert sii(" a = 1\nb = 2") == "a = 1\nb = 2"
1268 1273 assert sii(" a\n b\nc") == "a\n b\nc"
1269 1274 assert sii("a\n b") == "a\n b"
1270 1275
1271 1276 def test_logging_magic_quiet_from_arg():
1272 1277 _ip.config.LoggingMagics.quiet = False
1273 1278 lm = logging.LoggingMagics(shell=_ip)
1274 1279 with TemporaryDirectory() as td:
1275 1280 try:
1276 1281 with tt.AssertNotPrints(re.compile("Activating.*")):
1277 1282 lm.logstart('-q {}'.format(
1278 1283 os.path.join(td, "quiet_from_arg.log")))
1279 1284 finally:
1280 1285 _ip.logger.logstop()
1281 1286
1282 1287 def test_logging_magic_quiet_from_config():
1283 1288 _ip.config.LoggingMagics.quiet = True
1284 1289 lm = logging.LoggingMagics(shell=_ip)
1285 1290 with TemporaryDirectory() as td:
1286 1291 try:
1287 1292 with tt.AssertNotPrints(re.compile("Activating.*")):
1288 1293 lm.logstart(os.path.join(td, "quiet_from_config.log"))
1289 1294 finally:
1290 1295 _ip.logger.logstop()
1291 1296
1292 1297
1293 1298 def test_logging_magic_not_quiet():
1294 1299 _ip.config.LoggingMagics.quiet = False
1295 1300 lm = logging.LoggingMagics(shell=_ip)
1296 1301 with TemporaryDirectory() as td:
1297 1302 try:
1298 1303 with tt.AssertPrints(re.compile("Activating.*")):
1299 1304 lm.logstart(os.path.join(td, "not_quiet.log"))
1300 1305 finally:
1301 1306 _ip.logger.logstop()
1302 1307
1303 1308
1304 1309 def test_time_no_var_expand():
1305 1310 _ip.user_ns['a'] = 5
1306 1311 _ip.user_ns['b'] = []
1307 1312 _ip.magic('time b.append("{a}")')
1308 1313 assert _ip.user_ns['b'] == ['{a}']
1309 1314
1310 1315
1311 1316 # this is slow, put at the end for local testing.
1312 1317 def test_timeit_arguments():
1313 1318 "Test valid timeit arguments, should not cause SyntaxError (GH #1269)"
1314 1319 if sys.version_info < (3,7):
1315 1320 _ip.magic("timeit -n1 -r1 ('#')")
1316 1321 else:
1317 1322 # 3.7 optimize no-op statement like above out, and complain there is
1318 1323 # nothing in the for loop.
1319 1324 _ip.magic("timeit -n1 -r1 a=('#')")
1320 1325
1321 1326
1322 1327 TEST_MODULE = """
1323 1328 print('Loaded my_tmp')
1324 1329 if __name__ == "__main__":
1325 1330 print('I just ran a script')
1326 1331 """
1327 1332
1328 1333
1329 1334 def test_run_module_from_import_hook():
1330 1335 "Test that a module can be loaded via an import hook"
1331 1336 with TemporaryDirectory() as tmpdir:
1332 1337 fullpath = os.path.join(tmpdir, 'my_tmp.py')
1333 1338 Path(fullpath).write_text(TEST_MODULE)
1334 1339
1335 1340 class MyTempImporter(object):
1336 1341 def __init__(self):
1337 1342 pass
1338 1343
1339 1344 def find_module(self, fullname, path=None):
1340 1345 if 'my_tmp' in fullname:
1341 1346 return self
1342 1347 return None
1343 1348
1344 1349 def load_module(self, name):
1345 1350 import imp
1346 1351 return imp.load_source('my_tmp', fullpath)
1347 1352
1348 1353 def get_code(self, fullname):
1349 1354 return compile(Path(fullpath).read_text(), "foo", "exec")
1350 1355
1351 1356 def is_package(self, __):
1352 1357 return False
1353 1358
1354 1359 sys.meta_path.insert(0, MyTempImporter())
1355 1360
1356 1361 with capture_output() as captured:
1357 1362 _ip.magic("run -m my_tmp")
1358 1363 _ip.run_cell("import my_tmp")
1359 1364
1360 1365 output = "Loaded my_tmp\nI just ran a script\nLoaded my_tmp\n"
1361 1366 assert output == captured.stdout
1362 1367
1363 1368 sys.meta_path.pop(0)
@@ -1,695 +1,695
1 1 """IPython terminal interface using prompt_toolkit"""
2 2
3 3 import asyncio
4 4 import os
5 5 import sys
6 6 import warnings
7 7 from warnings import warn
8 8
9 9 from IPython.core.interactiveshell import InteractiveShell, InteractiveShellABC
10 10 from IPython.utils import io
11 11 from IPython.utils.py3compat import input
12 12 from IPython.utils.terminal import toggle_set_term_title, set_term_title, restore_term_title
13 13 from IPython.utils.process import abbrev_cwd
14 14 from traitlets import (
15 15 Bool,
16 16 Unicode,
17 17 Dict,
18 18 Integer,
19 19 observe,
20 20 Instance,
21 21 Type,
22 22 default,
23 23 Enum,
24 24 Union,
25 25 Any,
26 26 validate,
27 27 Float,
28 28 )
29 29
30 30 from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
31 31 from prompt_toolkit.enums import DEFAULT_BUFFER, EditingMode
32 32 from prompt_toolkit.filters import (HasFocus, Condition, IsDone)
33 33 from prompt_toolkit.formatted_text import PygmentsTokens
34 34 from prompt_toolkit.history import InMemoryHistory
35 35 from prompt_toolkit.layout.processors import ConditionalProcessor, HighlightMatchingBracketProcessor
36 36 from prompt_toolkit.output import ColorDepth
37 37 from prompt_toolkit.patch_stdout import patch_stdout
38 38 from prompt_toolkit.shortcuts import PromptSession, CompleteStyle, print_formatted_text
39 39 from prompt_toolkit.styles import DynamicStyle, merge_styles
40 40 from prompt_toolkit.styles.pygments import style_from_pygments_cls, style_from_pygments_dict
41 41 from prompt_toolkit import __version__ as ptk_version
42 42
43 43 from pygments.styles import get_style_by_name
44 44 from pygments.style import Style
45 45 from pygments.token import Token
46 46
47 47 from .debugger import TerminalPdb, Pdb
48 48 from .magics import TerminalMagics
49 49 from .pt_inputhooks import get_inputhook_name_and_func
50 50 from .prompts import Prompts, ClassicPrompts, RichPromptDisplayHook
51 51 from .ptutils import IPythonPTCompleter, IPythonPTLexer
52 52 from .shortcuts import create_ipython_shortcuts
53 53
54 54 DISPLAY_BANNER_DEPRECATED = object()
55 55 PTK3 = ptk_version.startswith('3.')
56 56
57 57
58 58 class _NoStyle(Style): pass
59 59
60 60
61 61
62 62 _style_overrides_light_bg = {
63 63 Token.Prompt: '#ansibrightblue',
64 64 Token.PromptNum: '#ansiblue bold',
65 65 Token.OutPrompt: '#ansibrightred',
66 66 Token.OutPromptNum: '#ansired bold',
67 67 }
68 68
69 69 _style_overrides_linux = {
70 70 Token.Prompt: '#ansibrightgreen',
71 71 Token.PromptNum: '#ansigreen bold',
72 72 Token.OutPrompt: '#ansibrightred',
73 73 Token.OutPromptNum: '#ansired bold',
74 74 }
75 75
76 76 def get_default_editor():
77 77 try:
78 78 return os.environ['EDITOR']
79 79 except KeyError:
80 80 pass
81 81 except UnicodeError:
82 82 warn("$EDITOR environment variable is not pure ASCII. Using platform "
83 83 "default editor.")
84 84
85 85 if os.name == 'posix':
86 86 return 'vi' # the only one guaranteed to be there!
87 87 else:
88 88 return 'notepad' # same in Windows!
89 89
90 90 # conservatively check for tty
91 91 # overridden streams can result in things like:
92 92 # - sys.stdin = None
93 93 # - no isatty method
94 94 for _name in ('stdin', 'stdout', 'stderr'):
95 95 _stream = getattr(sys, _name)
96 96 if not _stream or not hasattr(_stream, 'isatty') or not _stream.isatty():
97 97 _is_tty = False
98 98 break
99 99 else:
100 100 _is_tty = True
101 101
102 102
103 103 _use_simple_prompt = ('IPY_TEST_SIMPLE_PROMPT' in os.environ) or (not _is_tty)
104 104
105 105 def black_reformat_handler(text_before_cursor):
106 106 import black
107 107 formatted_text = black.format_str(text_before_cursor, mode=black.FileMode())
108 108 if not text_before_cursor.endswith('\n') and formatted_text.endswith('\n'):
109 109 formatted_text = formatted_text[:-1]
110 110 return formatted_text
111 111
112 112
113 113 class TerminalInteractiveShell(InteractiveShell):
114 114 mime_renderers = Dict().tag(config=True)
115 115
116 116 space_for_menu = Integer(6, help='Number of line at the bottom of the screen '
117 117 'to reserve for the tab completion menu, '
118 118 'search history, ...etc, the height of '
119 119 'these menus will at most this value. '
120 120 'Increase it is you prefer long and skinny '
121 121 'menus, decrease for short and wide.'
122 122 ).tag(config=True)
123 123
124 124 pt_app = None
125 125 debugger_history = None
126 126
127 127 debugger_history_file = Unicode(
128 128 "~/.pdbhistory", help="File in which to store and read history"
129 129 ).tag(config=True)
130 130
131 131 simple_prompt = Bool(_use_simple_prompt,
132 132 help="""Use `raw_input` for the REPL, without completion and prompt colors.
133 133
134 134 Useful when controlling IPython as a subprocess, and piping STDIN/OUT/ERR. Known usage are:
135 135 IPython own testing machinery, and emacs inferior-shell integration through elpy.
136 136
137 137 This mode default to `True` if the `IPY_TEST_SIMPLE_PROMPT`
138 138 environment variable is set, or the current terminal is not a tty."""
139 139 ).tag(config=True)
140 140
141 141 @property
142 142 def debugger_cls(self):
143 143 return Pdb if self.simple_prompt else TerminalPdb
144 144
145 145 confirm_exit = Bool(True,
146 146 help="""
147 147 Set to confirm when you try to exit IPython with an EOF (Control-D
148 148 in Unix, Control-Z/Enter in Windows). By typing 'exit' or 'quit',
149 149 you can force a direct exit without any confirmation.""",
150 150 ).tag(config=True)
151 151
152 152 editing_mode = Unicode('emacs',
153 153 help="Shortcut style to use at the prompt. 'vi' or 'emacs'.",
154 154 ).tag(config=True)
155 155
156 156 emacs_bindings_in_vi_insert_mode = Bool(
157 157 True,
158 158 help="Add shortcuts from 'emacs' insert mode to 'vi' insert mode.",
159 159 ).tag(config=True)
160 160
161 161 modal_cursor = Bool(
162 162 True,
163 163 help="""
164 164 Cursor shape changes depending on vi mode: beam in vi insert mode,
165 165 block in nav mode, underscore in replace mode.""",
166 166 ).tag(config=True)
167 167
168 168 ttimeoutlen = Float(
169 169 0.01,
170 170 help="""The time in milliseconds that is waited for a key code
171 171 to complete.""",
172 172 ).tag(config=True)
173 173
174 174 timeoutlen = Float(
175 175 0.5,
176 176 help="""The time in milliseconds that is waited for a mapped key
177 177 sequence to complete.""",
178 178 ).tag(config=True)
179 179
180 180 autoformatter = Unicode(None,
181 181 help="Autoformatter to reformat Terminal code. Can be `'black'` or `None`",
182 182 allow_none=True
183 183 ).tag(config=True)
184 184
185 185 mouse_support = Bool(False,
186 186 help="Enable mouse support in the prompt\n(Note: prevents selecting text with the mouse)"
187 187 ).tag(config=True)
188 188
189 189 # We don't load the list of styles for the help string, because loading
190 190 # Pygments plugins takes time and can cause unexpected errors.
191 191 highlighting_style = Union([Unicode('legacy'), Type(klass=Style)],
192 192 help="""The name or class of a Pygments style to use for syntax
193 193 highlighting. To see available styles, run `pygmentize -L styles`."""
194 194 ).tag(config=True)
195 195
196 196 @validate('editing_mode')
197 197 def _validate_editing_mode(self, proposal):
198 198 if proposal['value'].lower() == 'vim':
199 199 proposal['value']= 'vi'
200 200 elif proposal['value'].lower() == 'default':
201 201 proposal['value']= 'emacs'
202 202
203 203 if hasattr(EditingMode, proposal['value'].upper()):
204 204 return proposal['value'].lower()
205 205
206 206 return self.editing_mode
207 207
208 208
209 209 @observe('editing_mode')
210 210 def _editing_mode(self, change):
211 211 if self.pt_app:
212 212 self.pt_app.editing_mode = getattr(EditingMode, change.new.upper())
213 213
214 214 @observe('autoformatter')
215 215 def _autoformatter_changed(self, change):
216 216 formatter = change.new
217 217 if formatter is None:
218 218 self.reformat_handler = lambda x:x
219 219 elif formatter == 'black':
220 220 self.reformat_handler = black_reformat_handler
221 221 else:
222 222 raise ValueError
223 223
224 224 @observe('highlighting_style')
225 225 @observe('colors')
226 226 def _highlighting_style_changed(self, change):
227 227 self.refresh_style()
228 228
229 229 def refresh_style(self):
230 230 self._style = self._make_style_from_name_or_cls(self.highlighting_style)
231 231
232 232
233 233 highlighting_style_overrides = Dict(
234 234 help="Override highlighting format for specific tokens"
235 235 ).tag(config=True)
236 236
237 237 true_color = Bool(False,
238 238 help="""Use 24bit colors instead of 256 colors in prompt highlighting.
239 239 If your terminal supports true color, the following command should
240 240 print ``TRUECOLOR`` in orange::
241 241
242 242 printf \"\\x1b[38;2;255;100;0mTRUECOLOR\\x1b[0m\\n\"
243 243 """,
244 244 ).tag(config=True)
245 245
246 246 editor = Unicode(get_default_editor(),
247 247 help="Set the editor used by IPython (default to $EDITOR/vi/notepad)."
248 248 ).tag(config=True)
249 249
250 250 prompts_class = Type(Prompts, help='Class used to generate Prompt token for prompt_toolkit').tag(config=True)
251 251
252 252 prompts = Instance(Prompts)
253 253
254 254 @default('prompts')
255 255 def _prompts_default(self):
256 256 return self.prompts_class(self)
257 257
258 258 # @observe('prompts')
259 259 # def _(self, change):
260 260 # self._update_layout()
261 261
262 262 @default('displayhook_class')
263 263 def _displayhook_class_default(self):
264 264 return RichPromptDisplayHook
265 265
266 266 term_title = Bool(True,
267 267 help="Automatically set the terminal title"
268 268 ).tag(config=True)
269 269
270 270 term_title_format = Unicode("IPython: {cwd}",
271 271 help="Customize the terminal title format. This is a python format string. " +
272 272 "Available substitutions are: {cwd}."
273 273 ).tag(config=True)
274 274
275 275 display_completions = Enum(('column', 'multicolumn','readlinelike'),
276 276 help= ( "Options for displaying tab completions, 'column', 'multicolumn', and "
277 277 "'readlinelike'. These options are for `prompt_toolkit`, see "
278 278 "`prompt_toolkit` documentation for more information."
279 279 ),
280 280 default_value='multicolumn').tag(config=True)
281 281
282 282 highlight_matching_brackets = Bool(True,
283 283 help="Highlight matching brackets.",
284 284 ).tag(config=True)
285 285
286 286 extra_open_editor_shortcuts = Bool(False,
287 287 help="Enable vi (v) or Emacs (C-X C-E) shortcuts to open an external editor. "
288 288 "This is in addition to the F2 binding, which is always enabled."
289 289 ).tag(config=True)
290 290
291 291 handle_return = Any(None,
292 292 help="Provide an alternative handler to be called when the user presses "
293 293 "Return. This is an advanced option intended for debugging, which "
294 294 "may be changed or removed in later releases."
295 295 ).tag(config=True)
296 296
297 297 enable_history_search = Bool(True,
298 298 help="Allows to enable/disable the prompt toolkit history search"
299 299 ).tag(config=True)
300 300
301 301 prompt_includes_vi_mode = Bool(True,
302 302 help="Display the current vi mode (when using vi editing mode)."
303 303 ).tag(config=True)
304 304
305 305 @observe('term_title')
306 306 def init_term_title(self, change=None):
307 307 # Enable or disable the terminal title.
308 308 if self.term_title:
309 309 toggle_set_term_title(True)
310 310 set_term_title(self.term_title_format.format(cwd=abbrev_cwd()))
311 311 else:
312 312 toggle_set_term_title(False)
313 313
314 314 def restore_term_title(self):
315 315 if self.term_title:
316 316 restore_term_title()
317 317
318 318 def init_display_formatter(self):
319 319 super(TerminalInteractiveShell, self).init_display_formatter()
320 320 # terminal only supports plain text
321 321 self.display_formatter.active_types = ['text/plain']
322 322 # disable `_ipython_display_`
323 323 self.display_formatter.ipython_display_formatter.enabled = False
324 324
325 325 def init_prompt_toolkit_cli(self):
326 326 if self.simple_prompt:
327 327 # Fall back to plain non-interactive output for tests.
328 328 # This is very limited.
329 329 def prompt():
330 330 prompt_text = "".join(x[1] for x in self.prompts.in_prompt_tokens())
331 331 lines = [input(prompt_text)]
332 332 prompt_continuation = "".join(x[1] for x in self.prompts.continuation_prompt_tokens())
333 333 while self.check_complete('\n'.join(lines))[0] == 'incomplete':
334 334 lines.append( input(prompt_continuation) )
335 335 return '\n'.join(lines)
336 336 self.prompt_for_code = prompt
337 337 return
338 338
339 339 # Set up keyboard shortcuts
340 340 key_bindings = create_ipython_shortcuts(self)
341 341
342 342 # Pre-populate history from IPython's history database
343 343 history = InMemoryHistory()
344 344 last_cell = u""
345 345 for __, ___, cell in self.history_manager.get_tail(self.history_load_length,
346 346 include_latest=True):
347 347 # Ignore blank lines and consecutive duplicates
348 348 cell = cell.rstrip()
349 349 if cell and (cell != last_cell):
350 350 history.append_string(cell)
351 351 last_cell = cell
352 352
353 353 self._style = self._make_style_from_name_or_cls(self.highlighting_style)
354 354 self.style = DynamicStyle(lambda: self._style)
355 355
356 356 editing_mode = getattr(EditingMode, self.editing_mode.upper())
357 357
358 358 self.pt_loop = asyncio.new_event_loop()
359 359 self.pt_app = PromptSession(
360 360 auto_suggest=AutoSuggestFromHistory(),
361 361 editing_mode=editing_mode,
362 362 key_bindings=key_bindings,
363 363 history=history,
364 364 completer=IPythonPTCompleter(shell=self),
365 365 enable_history_search=self.enable_history_search,
366 366 style=self.style,
367 367 include_default_pygments_style=False,
368 368 mouse_support=self.mouse_support,
369 369 enable_open_in_editor=self.extra_open_editor_shortcuts,
370 370 color_depth=self.color_depth,
371 371 tempfile_suffix=".py",
372 372 **self._extra_prompt_options()
373 373 )
374 374
375 375 def _make_style_from_name_or_cls(self, name_or_cls):
376 376 """
377 377 Small wrapper that make an IPython compatible style from a style name
378 378
379 379 We need that to add style for prompt ... etc.
380 380 """
381 381 style_overrides = {}
382 382 if name_or_cls == 'legacy':
383 383 legacy = self.colors.lower()
384 384 if legacy == 'linux':
385 385 style_cls = get_style_by_name('monokai')
386 386 style_overrides = _style_overrides_linux
387 387 elif legacy == 'lightbg':
388 388 style_overrides = _style_overrides_light_bg
389 389 style_cls = get_style_by_name('pastie')
390 390 elif legacy == 'neutral':
391 391 # The default theme needs to be visible on both a dark background
392 392 # and a light background, because we can't tell what the terminal
393 393 # looks like. These tweaks to the default theme help with that.
394 394 style_cls = get_style_by_name('default')
395 395 style_overrides.update({
396 396 Token.Number: '#ansigreen',
397 397 Token.Operator: 'noinherit',
398 398 Token.String: '#ansiyellow',
399 399 Token.Name.Function: '#ansiblue',
400 400 Token.Name.Class: 'bold #ansiblue',
401 401 Token.Name.Namespace: 'bold #ansiblue',
402 402 Token.Name.Variable.Magic: '#ansiblue',
403 403 Token.Prompt: '#ansigreen',
404 404 Token.PromptNum: '#ansibrightgreen bold',
405 405 Token.OutPrompt: '#ansired',
406 406 Token.OutPromptNum: '#ansibrightred bold',
407 407 })
408 408
409 409 # Hack: Due to limited color support on the Windows console
410 410 # the prompt colors will be wrong without this
411 411 if os.name == 'nt':
412 412 style_overrides.update({
413 413 Token.Prompt: '#ansidarkgreen',
414 414 Token.PromptNum: '#ansigreen bold',
415 415 Token.OutPrompt: '#ansidarkred',
416 416 Token.OutPromptNum: '#ansired bold',
417 417 })
418 418 elif legacy =='nocolor':
419 419 style_cls=_NoStyle
420 420 style_overrides = {}
421 421 else :
422 422 raise ValueError('Got unknown colors: ', legacy)
423 423 else :
424 424 if isinstance(name_or_cls, str):
425 425 style_cls = get_style_by_name(name_or_cls)
426 426 else:
427 427 style_cls = name_or_cls
428 428 style_overrides = {
429 429 Token.Prompt: '#ansigreen',
430 430 Token.PromptNum: '#ansibrightgreen bold',
431 431 Token.OutPrompt: '#ansired',
432 432 Token.OutPromptNum: '#ansibrightred bold',
433 433 }
434 434 style_overrides.update(self.highlighting_style_overrides)
435 435 style = merge_styles([
436 436 style_from_pygments_cls(style_cls),
437 437 style_from_pygments_dict(style_overrides),
438 438 ])
439 439
440 440 return style
441 441
442 442 @property
443 443 def pt_complete_style(self):
444 444 return {
445 445 'multicolumn': CompleteStyle.MULTI_COLUMN,
446 446 'column': CompleteStyle.COLUMN,
447 447 'readlinelike': CompleteStyle.READLINE_LIKE,
448 448 }[self.display_completions]
449 449
450 450 @property
451 451 def color_depth(self):
452 452 return (ColorDepth.TRUE_COLOR if self.true_color else None)
453 453
454 454 def _extra_prompt_options(self):
455 455 """
456 456 Return the current layout option for the current Terminal InteractiveShell
457 457 """
458 458 def get_message():
459 459 return PygmentsTokens(self.prompts.in_prompt_tokens())
460 460
461 461 if self.editing_mode == 'emacs':
462 462 # with emacs mode the prompt is (usually) static, so we call only
463 463 # the function once. With VI mode it can toggle between [ins] and
464 464 # [nor] so we can't precompute.
465 465 # here I'm going to favor the default keybinding which almost
466 466 # everybody uses to decrease CPU usage.
467 467 # if we have issues with users with custom Prompts we can see how to
468 468 # work around this.
469 469 get_message = get_message()
470 470
471 471 options = {
472 472 'complete_in_thread': False,
473 473 'lexer':IPythonPTLexer(),
474 474 'reserve_space_for_menu':self.space_for_menu,
475 475 'message': get_message,
476 476 'prompt_continuation': (
477 477 lambda width, lineno, is_soft_wrap:
478 478 PygmentsTokens(self.prompts.continuation_prompt_tokens(width))),
479 479 'multiline': True,
480 480 'complete_style': self.pt_complete_style,
481 481
482 482 # Highlight matching brackets, but only when this setting is
483 483 # enabled, and only when the DEFAULT_BUFFER has the focus.
484 484 'input_processors': [ConditionalProcessor(
485 485 processor=HighlightMatchingBracketProcessor(chars='[](){}'),
486 486 filter=HasFocus(DEFAULT_BUFFER) & ~IsDone() &
487 487 Condition(lambda: self.highlight_matching_brackets))],
488 488 }
489 489 if not PTK3:
490 490 options['inputhook'] = self.inputhook
491 491
492 492 return options
493 493
494 494 def prompt_for_code(self):
495 495 if self.rl_next_input:
496 496 default = self.rl_next_input
497 497 self.rl_next_input = None
498 498 else:
499 499 default = ''
500 500
501 501 # In order to make sure that asyncio code written in the
502 502 # interactive shell doesn't interfere with the prompt, we run the
503 503 # prompt in a different event loop.
504 504 # If we don't do this, people could spawn coroutine with a
505 505 # while/true inside which will freeze the prompt.
506 506
507 507 try:
508 508 old_loop = asyncio.get_running_loop()
509 509 except RuntimeError:
510 510 # This happens when the user used `asyncio.run()`.
511 511 old_loop = None
512 512
513 513 asyncio.set_event_loop(self.pt_loop)
514 514 try:
515 515 with patch_stdout(raw=True):
516 516 text = self.pt_app.prompt(
517 517 default=default,
518 518 **self._extra_prompt_options())
519 519 finally:
520 520 # Restore the original event loop.
521 521 asyncio.set_event_loop(old_loop)
522 522
523 523 return text
524 524
525 525 def enable_win_unicode_console(self):
526 526 # Since IPython 7.10 doesn't support python < 3.6 and PEP 528, Python uses the unicode APIs for the Windows
527 527 # console by default, so WUC shouldn't be needed.
528 528 from warnings import warn
529 529 warn("`enable_win_unicode_console` is deprecated since IPython 7.10, does not do anything and will be removed in the future",
530 530 DeprecationWarning,
531 531 stacklevel=2)
532 532
533 533 def init_io(self):
534 534 if sys.platform not in {'win32', 'cli'}:
535 535 return
536 536
537 537 import colorama
538 538 colorama.init()
539 539
540 540 # For some reason we make these wrappers around stdout/stderr.
541 541 # For now, we need to reset them so all output gets coloured.
542 542 # https://github.com/ipython/ipython/issues/8669
543 543 # io.std* are deprecated, but don't show our own deprecation warnings
544 544 # during initialization of the deprecated API.
545 545 with warnings.catch_warnings():
546 546 warnings.simplefilter('ignore', DeprecationWarning)
547 547 io.stdout = io.IOStream(sys.stdout)
548 548 io.stderr = io.IOStream(sys.stderr)
549 549
550 550 def init_magics(self):
551 551 super(TerminalInteractiveShell, self).init_magics()
552 552 self.register_magics(TerminalMagics)
553 553
554 554 def init_alias(self):
555 555 # The parent class defines aliases that can be safely used with any
556 556 # frontend.
557 557 super(TerminalInteractiveShell, self).init_alias()
558 558
559 559 # Now define aliases that only make sense on the terminal, because they
560 560 # need direct access to the console in a way that we can't emulate in
561 561 # GUI or web frontend
562 562 if os.name == 'posix':
563 563 for cmd in ('clear', 'more', 'less', 'man'):
564 564 self.alias_manager.soft_define_alias(cmd, cmd)
565 565
566 566
567 567 def __init__(self, *args, **kwargs):
568 568 super(TerminalInteractiveShell, self).__init__(*args, **kwargs)
569 569 self.init_prompt_toolkit_cli()
570 570 self.init_term_title()
571 571 self.keep_running = True
572 572
573 573
574 574 def ask_exit(self):
575 575 self.keep_running = False
576 576
577 577 rl_next_input = None
578 578
579 579 def interact(self, display_banner=DISPLAY_BANNER_DEPRECATED):
580 580
581 581 if display_banner is not DISPLAY_BANNER_DEPRECATED:
582 582 warn('interact `display_banner` argument is deprecated since IPython 5.0. Call `show_banner()` if needed.', DeprecationWarning, stacklevel=2)
583 583
584 584 self.keep_running = True
585 585 while self.keep_running:
586 586 print(self.separate_in, end='')
587 587
588 588 try:
589 589 code = self.prompt_for_code()
590 590 except EOFError:
591 591 if (not self.confirm_exit) \
592 592 or self.ask_yes_no('Do you really want to exit ([y]/n)?','y','n'):
593 593 self.ask_exit()
594 594
595 595 else:
596 596 if code:
597 597 self.run_cell(code, store_history=True)
598 598
599 599 def mainloop(self, display_banner=DISPLAY_BANNER_DEPRECATED):
600 600 # An extra layer of protection in case someone mashing Ctrl-C breaks
601 601 # out of our internal code.
602 602 if display_banner is not DISPLAY_BANNER_DEPRECATED:
603 603 warn('mainloop `display_banner` argument is deprecated since IPython 5.0. Call `show_banner()` if needed.', DeprecationWarning, stacklevel=2)
604 604 while True:
605 605 try:
606 606 self.interact()
607 607 break
608 608 except KeyboardInterrupt as e:
609 609 print("\n%s escaped interact()\n" % type(e).__name__)
610 610 finally:
611 611 # An interrupt during the eventloop will mess up the
612 612 # internal state of the prompt_toolkit library.
613 613 # Stopping the eventloop fixes this, see
614 614 # https://github.com/ipython/ipython/pull/9867
615 615 if hasattr(self, '_eventloop'):
616 616 self._eventloop.stop()
617 617
618 618 self.restore_term_title()
619 619
620 620 # try to call some at-exit operation optimistically as some things can't
621 621 # be done during interpreter shutdown. this is technically inaccurate as
622 622 # this make mainlool not re-callable, but that should be a rare if not
623 623 # in existent use case.
624 624
625 625 self._atexit_once()
626 626
627 627
628 628 _inputhook = None
629 629 def inputhook(self, context):
630 630 if self._inputhook is not None:
631 631 self._inputhook(context)
632 632
633 633 active_eventloop = None
634 634 def enable_gui(self, gui=None):
635 635 if gui and (gui != 'inline') :
636 636 self.active_eventloop, self._inputhook =\
637 637 get_inputhook_name_and_func(gui)
638 638 else:
639 639 self.active_eventloop = self._inputhook = None
640 640
641 641 # For prompt_toolkit 3.0. We have to create an asyncio event loop with
642 642 # this inputhook.
643 643 if PTK3:
644 644 import asyncio
645 645 from prompt_toolkit.eventloop import new_eventloop_with_inputhook
646 646
647 647 if gui == 'asyncio':
648 648 # When we integrate the asyncio event loop, run the UI in the
649 649 # same event loop as the rest of the code. don't use an actual
650 650 # input hook. (Asyncio is not made for nesting event loops.)
651 self.pt_loop = asyncio.get_event_loop()
651 self.pt_loop = asyncio.get_event_loop_policy().get_event_loop()
652 652
653 653 elif self._inputhook:
654 654 # If an inputhook was set, create a new asyncio event loop with
655 655 # this inputhook for the prompt.
656 656 self.pt_loop = new_eventloop_with_inputhook(self._inputhook)
657 657 else:
658 658 # When there's no inputhook, run the prompt in a separate
659 659 # asyncio event loop.
660 660 self.pt_loop = asyncio.new_event_loop()
661 661
662 662 # Run !system commands directly, not through pipes, so terminal programs
663 663 # work correctly.
664 664 system = InteractiveShell.system_raw
665 665
666 666 def auto_rewrite_input(self, cmd):
667 667 """Overridden from the parent class to use fancy rewriting prompt"""
668 668 if not self.show_rewritten_input:
669 669 return
670 670
671 671 tokens = self.prompts.rewrite_prompt_tokens()
672 672 if self.pt_app:
673 673 print_formatted_text(PygmentsTokens(tokens), end='',
674 674 style=self.pt_app.app.style)
675 675 print(cmd)
676 676 else:
677 677 prompt = ''.join(s for t, s in tokens)
678 678 print(prompt, cmd, sep='')
679 679
680 680 _prompts_before = None
681 681 def switch_doctest_mode(self, mode):
682 682 """Switch prompts to classic for %doctest_mode"""
683 683 if mode:
684 684 self._prompts_before = self.prompts
685 685 self.prompts = ClassicPrompts(self)
686 686 elif self._prompts_before:
687 687 self.prompts = self._prompts_before
688 688 self._prompts_before = None
689 689 # self._update_layout()
690 690
691 691
692 692 InteractiveShellABC.register(TerminalInteractiveShell)
693 693
694 694 if __name__ == '__main__':
695 695 TerminalInteractiveShell.instance().interact()
@@ -1,64 +1,64
1 1 """
2 2 Inputhook for running the original asyncio event loop while we're waiting for
3 3 input.
4 4
5 5 By default, in IPython, we run the prompt with a different asyncio event loop,
6 6 because otherwise we risk that people are freezing the prompt by scheduling bad
7 7 coroutines. E.g., a coroutine that does a while/true and never yield back
8 8 control to the loop. We can't cancel that.
9 9
10 10 However, sometimes we want the asyncio loop to keep running while waiting for
11 11 a prompt.
12 12
13 13 The following example will print the numbers from 1 to 10 above the prompt,
14 14 while we are waiting for input. (This works also because we use
15 15 prompt_toolkit`s `patch_stdout`)::
16 16
17 17 In [1]: import asyncio
18 18
19 19 In [2]: %gui asyncio
20 20
21 21 In [3]: async def f():
22 22 ...: for i in range(10):
23 23 ...: await asyncio.sleep(1)
24 24 ...: print(i)
25 25
26 26
27 27 In [4]: asyncio.ensure_future(f())
28 28
29 29 """
30 30 import asyncio
31 31 from prompt_toolkit import __version__ as ptk_version
32 32
33 33 PTK3 = ptk_version.startswith('3.')
34 34
35 35
36 36 # Keep reference to the original asyncio loop, because getting the event loop
37 37 # within the input hook would return the other loop.
38 loop = asyncio.get_event_loop()
38 loop = asyncio.get_event_loop_policy().get_event_loop()
39 39
40 40
41 41 def inputhook(context):
42 42 """
43 43 Inputhook for asyncio event loop integration.
44 44 """
45 45 # For prompt_toolkit 3.0, this input hook literally doesn't do anything.
46 46 # The event loop integration here is implemented in `interactiveshell.py`
47 47 # by running the prompt itself in the current asyncio loop. The main reason
48 48 # for this is that nesting asyncio event loops is unreliable.
49 49 if PTK3:
50 50 return
51 51
52 52 # For prompt_toolkit 2.0, we can run the current asyncio event loop,
53 53 # because prompt_toolkit 2.0 uses a different event loop internally.
54 54
55 55 def stop():
56 56 loop.stop()
57 57
58 58 fileno = context.fileno()
59 59 loop.add_reader(fileno, stop)
60 60 try:
61 61 loop.run_forever()
62 62 finally:
63 63 loop.remove_reader(fileno)
64 64
General Comments 0
You need to be logged in to leave comments. Login now