##// END OF EJS Templates
avoid deprecated get_event_loop...
Min RK -
Show More
@@ -1,103 +1,156 b''
1 1 """
2 2 Async helper function that are invalid syntax on Python 3.5 and below.
3 3
4 4 This code is best effort, and may have edge cases not behaving as expected. In
5 5 particular it contain a number of heuristics to detect whether code is
6 6 effectively async and need to run in an event loop or not.
7 7
8 8 Some constructs (like top-level `return`, or `yield`) are taken care of
9 9 explicitly to actually raise a SyntaxError and stay as close as possible to
10 10 Python semantics.
11 11 """
12 12
13 13
14 14 import ast
15 15 import asyncio
16 16 import inspect
17 from functools import wraps
17 18
19 _asyncio_event_loop = None
18 20
19 class _AsyncIORunner:
20 def __init__(self):
21 self._loop = None
22
23 @property
24 def loop(self):
25 """Always returns a non-closed event loop"""
26 if self._loop is None or self._loop.is_closed():
27 policy = asyncio.get_event_loop_policy()
28 self._loop = policy.new_event_loop()
29 policy.set_event_loop(self._loop)
30 return self._loop
31 21
22 def get_asyncio_loop():
23 """asyncio has deprecated get_event_loop
24
25 Replicate it here, with our desired semantics:
26
27 - always returns a valid, not-closed loop
28 - not thread-local like asyncio's,
29 because we only want one loop for IPython
30 - if called from inside a coroutine (e.g. in ipykernel),
31 return the running loop
32
33 .. versionadded:: 8.0
34 """
35 try:
36 return asyncio.get_running_loop()
37 except RuntimeError:
38 # not inside a coroutine,
39 # track our own global
40 pass
41
42 # not thread-local like asyncio's,
43 # because we only track one event loop to run for IPython itself,
44 # always in the main thread.
45 global _asyncio_event_loop
46 if _asyncio_event_loop is None or _asyncio_event_loop.is_closed():
47 _asyncio_event_loop = asyncio.new_event_loop()
48 return _asyncio_event_loop
49
50
51 class _AsyncIORunner:
32 52 def __call__(self, coro):
33 53 """
34 54 Handler for asyncio autoawait
35 55 """
36 return self.loop.run_until_complete(coro)
56 return get_asyncio_loop().run_until_complete(coro)
37 57
38 58 def __str__(self):
39 59 return "asyncio"
40 60
41 61
42 62 _asyncio_runner = _AsyncIORunner()
43 63
44 64
65 class _AsyncIOProxy:
66 """Proxy-object for an asyncio
67
68 Any coroutine methods will be wrapped in event_loop.run_
69 """
70
71 def __init__(self, obj, event_loop):
72 self._obj = obj
73 self._event_loop = event_loop
74
75 def __repr__(self):
76 return f"<_AsyncIOProxy({self._obj!r})>"
77
78 def __getattr__(self, key):
79 attr = getattr(self._obj, key)
80 if inspect.iscoroutinefunction(attr):
81 # if it's a coroutine method,
82 # return a threadsafe wrapper onto the _current_ asyncio loop
83 @wraps(attr)
84 def _wrapped(*args, **kwargs):
85 concurrent_future = asyncio.run_coroutine_threadsafe(
86 attr(*args, **kwargs), self._event_loop
87 )
88 return asyncio.wrap_future(concurrent_future)
89
90 return _wrapped
91 else:
92 return attr
93
94 def __dir__(self):
95 return dir(self._obj)
96
97
45 98 def _curio_runner(coroutine):
46 99 """
47 100 handler for curio autoawait
48 101 """
49 102 import curio
50 103
51 104 return curio.run(coroutine)
52 105
53 106
54 107 def _trio_runner(async_fn):
55 108 import trio
56 109
57 110 async def loc(coro):
58 111 """
59 112 We need the dummy no-op async def to protect from
60 113 trio's internal. See https://github.com/python-trio/trio/issues/89
61 114 """
62 115 return await coro
63 116
64 117 return trio.run(loc, async_fn)
65 118
66 119
67 120 def _pseudo_sync_runner(coro):
68 121 """
69 122 A runner that does not really allow async execution, and just advance the coroutine.
70 123
71 124 See discussion in https://github.com/python-trio/trio/issues/608,
72 125
73 126 Credit to Nathaniel Smith
74 127 """
75 128 try:
76 129 coro.send(None)
77 130 except StopIteration as exc:
78 131 return exc.value
79 132 else:
80 133 # TODO: do not raise but return an execution result with the right info.
81 134 raise RuntimeError(
82 135 "{coro_name!r} needs a real async loop".format(coro_name=coro.__name__)
83 136 )
84 137
85 138
86 139 def _should_be_async(cell: str) -> bool:
87 140 """Detect if a block of code need to be wrapped in an `async def`
88 141
89 142 Attempt to parse the block of code, it it compile we're fine.
90 143 Otherwise we wrap if and try to compile.
91 144
92 145 If it works, assume it should be async. Otherwise Return False.
93 146
94 147 Not handled yet: If the block of code has a return statement as the top
95 148 level, it will be seen as async. This is a know limitation.
96 149 """
97 150 try:
98 151 code = compile(
99 152 cell, "<>", "exec", flags=getattr(ast, "PyCF_ALLOW_TOP_LEVEL_AWAIT", 0x0)
100 153 )
101 154 return inspect.CO_COROUTINE & code.co_flags == inspect.CO_COROUTINE
102 155 except (SyntaxError, MemoryError):
103 156 return False
@@ -1,375 +1,362 b''
1 1 """Magic functions for running cells in various scripts."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import asyncio
7 7 import atexit
8 8 import errno
9 import functools
10 9 import os
11 10 import signal
12 11 import sys
13 12 import time
14 from contextlib import contextmanager
15 13 from subprocess import CalledProcessError
14 from threading import Thread
16 15
17 from traitlets import Dict, List, default
16 from traitlets import Any, Dict, List, default
18 17
19 18 from IPython.core import magic_arguments
19 from IPython.core.async_helpers import _AsyncIOProxy
20 20 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
21 from IPython.lib.backgroundjobs import BackgroundJobManager
22 21 from IPython.utils.process import arg_split
23 22
24 23 #-----------------------------------------------------------------------------
25 24 # Magic implementation classes
26 25 #-----------------------------------------------------------------------------
27 26
28 27 def script_args(f):
29 28 """single decorator for adding script args"""
30 29 args = [
31 30 magic_arguments.argument(
32 31 '--out', type=str,
33 32 help="""The variable in which to store stdout from the script.
34 33 If the script is backgrounded, this will be the stdout *pipe*,
35 34 instead of the stderr text itself and will not be auto closed.
36 35 """
37 36 ),
38 37 magic_arguments.argument(
39 38 '--err', type=str,
40 39 help="""The variable in which to store stderr from the script.
41 40 If the script is backgrounded, this will be the stderr *pipe*,
42 41 instead of the stderr text itself and will not be autoclosed.
43 42 """
44 43 ),
45 44 magic_arguments.argument(
46 45 '--bg', action="store_true",
47 46 help="""Whether to run the script in the background.
48 47 If given, the only way to see the output of the command is
49 48 with --out/err.
50 49 """
51 50 ),
52 51 magic_arguments.argument(
53 52 '--proc', type=str,
54 53 help="""The variable in which to store Popen instance.
55 54 This is used only when --bg option is given.
56 55 """
57 56 ),
58 57 magic_arguments.argument(
59 58 '--no-raise-error', action="store_false", dest='raise_error',
60 help="""Whether you should raise an error message in addition to
59 help="""Whether you should raise an error message in addition to
61 60 a stream on stderr if you get a nonzero exit code.
62 61 """
63 62 )
64 63 ]
65 64 for arg in args:
66 65 f = arg(f)
67 66 return f
68 67
69 68
70 @contextmanager
71 def safe_watcher():
72 if sys.platform == "win32":
73 yield
74 return
75
76 from asyncio import SafeChildWatcher
77
78 policy = asyncio.get_event_loop_policy()
79 old_watcher = policy.get_child_watcher()
80 if isinstance(old_watcher, SafeChildWatcher):
81 yield
82 return
83
84 try:
85 loop = policy.get_event_loop()
86 if loop.is_closed():
87 raise RuntimeError("open a new one")
88 except RuntimeError:
89 # closed loop, make a new one
90 loop = policy.new_event_loop()
91 policy.set_event_loop(loop)
92
93 try:
94 watcher = asyncio.SafeChildWatcher()
95 watcher.attach_loop(loop)
96 policy.set_child_watcher(watcher)
97 yield
98 finally:
99 watcher.close()
100 policy.set_child_watcher(old_watcher)
101
102
103 def dec_safe_watcher(fun):
104 @functools.wraps(fun)
105 def _inner(*args, **kwargs):
106 with safe_watcher():
107 return fun(*args, **kwargs)
108
109 return _inner
110
111
112 69 @magics_class
113 70 class ScriptMagics(Magics):
114 71 """Magics for talking to scripts
115 72
116 73 This defines a base `%%script` cell magic for running a cell
117 74 with a program in a subprocess, and registers a few top-level
118 75 magics that call %%script with common interpreters.
119 76 """
77
78 event_loop = Any(
79 help="""
80 The event loop on which to run subprocesses
81
82 Not the main event loop,
83 because we want to be able to make blocking calls
84 and have certain requirements we don't want to impose on the main loop.
85 """
86 )
87
120 88 script_magics = List(
121 89 help="""Extra script cell magics to define
122 90
123 91 This generates simple wrappers of `%%script foo` as `%%foo`.
124 92
125 93 If you want to add script magics that aren't on your path,
126 94 specify them in script_paths
127 95 """,
128 96 ).tag(config=True)
129 97 @default('script_magics')
130 98 def _script_magics_default(self):
131 99 """default to a common list of programs"""
132 100
133 101 defaults = [
134 102 'sh',
135 103 'bash',
136 104 'perl',
137 105 'ruby',
138 106 'python',
139 107 'python2',
140 108 'python3',
141 109 'pypy',
142 110 ]
143 111 if os.name == 'nt':
144 112 defaults.extend([
145 113 'cmd',
146 114 ])
147 115
148 116 return defaults
149 117
150 118 script_paths = Dict(
151 119 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
152 120
153 121 Only necessary for items in script_magics where the default path will not
154 122 find the right interpreter.
155 123 """
156 124 ).tag(config=True)
157 125
158 126 def __init__(self, shell=None):
159 127 super(ScriptMagics, self).__init__(shell=shell)
160 128 self._generate_script_magics()
161 self.job_manager = BackgroundJobManager()
162 129 self.bg_processes = []
163 130 atexit.register(self.kill_bg_processes)
164 131
165 132 def __del__(self):
166 133 self.kill_bg_processes()
167 134
168 135 def _generate_script_magics(self):
169 136 cell_magics = self.magics['cell']
170 137 for name in self.script_magics:
171 138 cell_magics[name] = self._make_script_magic(name)
172 139
173 140 def _make_script_magic(self, name):
174 141 """make a named magic, that calls %%script with a particular program"""
175 142 # expand to explicit path if necessary:
176 143 script = self.script_paths.get(name, name)
177 144
178 145 @magic_arguments.magic_arguments()
179 146 @script_args
180 147 def named_script_magic(line, cell):
181 148 # if line, add it as cl-flags
182 149 if line:
183 150 line = "%s %s" % (script, line)
184 151 else:
185 152 line = script
186 153 return self.shebang(line, cell)
187 154
188 155 # write a basic docstring:
189 156 named_script_magic.__doc__ = \
190 157 """%%{name} script magic
191 158
192 159 Run cells with {script} in a subprocess.
193 160
194 161 This is a shortcut for `%%script {script}`
195 162 """.format(**locals())
196 163
197 164 return named_script_magic
198 165
199 166 @magic_arguments.magic_arguments()
200 167 @script_args
201 168 @cell_magic("script")
202 @dec_safe_watcher
203 169 def shebang(self, line, cell):
204 170 """Run a cell via a shell command
205 171
206 172 The `%%script` line is like the #! line of script,
207 173 specifying a program (bash, perl, ruby, etc.) with which to run.
208 174
209 175 The rest of the cell is run by that program.
210 176
211 177 Examples
212 178 --------
213 179 ::
214 180
215 181 In [1]: %%script bash
216 182 ...: for i in 1 2 3; do
217 183 ...: echo $i
218 184 ...: done
219 185 1
220 186 2
221 187 3
222 188 """
223 189
190 # Create the event loop in which to run script magics
191 # this operates on a background thread
192 if self.event_loop is None:
193 if sys.platform == "win32":
194 # don't override the current policy,
195 # just create an event loop
196 event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
197 else:
198 event_loop = asyncio.new_event_loop()
199 self.event_loop = event_loop
200
201 # start the loop in a background thread
202 asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
203 asyncio_thread.start()
204 else:
205 event_loop = self.event_loop
206
207 def in_thread(coro):
208 """Call a coroutine on the asyncio thread"""
209 return asyncio.run_coroutine_threadsafe(coro, event_loop).result()
210
224 211 async def _handle_stream(stream, stream_arg, file_object):
225 212 while True:
226 213 line = (await stream.readline()).decode("utf8")
227 214 if not line:
228 215 break
229 216 if stream_arg:
230 217 self.shell.user_ns[stream_arg] = line
231 218 else:
232 219 file_object.write(line)
233 220 file_object.flush()
234 221
235 222 async def _stream_communicate(process, cell):
236 223 process.stdin.write(cell)
237 224 process.stdin.close()
238 225 stdout_task = asyncio.create_task(
239 226 _handle_stream(process.stdout, args.out, sys.stdout)
240 227 )
241 228 stderr_task = asyncio.create_task(
242 229 _handle_stream(process.stderr, args.err, sys.stderr)
243 230 )
244 231 await asyncio.wait([stdout_task, stderr_task])
245 232 await process.wait()
246 233
247 policy = asyncio.get_event_loop_policy()
248 if sys.platform.startswith("win") and not isinstance(
249 policy, asyncio.WindowsProactorEventLoopPolicy
250 ):
251 # _do not_ overwrite the current policy
252 policy = asyncio.WindowsProactorEventLoopPolicy()
253
254 try:
255 loop = policy.get_event_loop()
256 except RuntimeError:
257 # closed loop, make a new one
258 loop = policy.new_event_loop()
259 policy.set_event_loop(loop)
260 234 argv = arg_split(line, posix=not sys.platform.startswith("win"))
261 235 args, cmd = self.shebang.parser.parse_known_args(argv)
236
262 237 try:
263 p = loop.run_until_complete(
238 p = in_thread(
264 239 asyncio.create_subprocess_exec(
265 240 *cmd,
266 241 stdout=asyncio.subprocess.PIPE,
267 242 stderr=asyncio.subprocess.PIPE,
268 243 stdin=asyncio.subprocess.PIPE,
269 244 )
270 245 )
271 246 except OSError as e:
272 247 if e.errno == errno.ENOENT:
273 248 print("Couldn't find program: %r" % cmd[0])
274 249 return
275 250 else:
276 251 raise
277
252
278 253 if not cell.endswith('\n'):
279 254 cell += '\n'
280 255 cell = cell.encode('utf8', 'replace')
281 256 if args.bg:
282 257 self.bg_processes.append(p)
283 258 self._gc_bg_processes()
284 259 to_close = []
285 260 if args.out:
286 self.shell.user_ns[args.out] = p.stdout
261 self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
287 262 else:
288 263 to_close.append(p.stdout)
289 264 if args.err:
290 self.shell.user_ns[args.err] = p.stderr
265 self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
291 266 else:
292 267 to_close.append(p.stderr)
293 self.job_manager.new(self._run_script, p, cell, to_close, daemon=True)
268 event_loop.call_soon_threadsafe(
269 lambda: asyncio.Task(self._run_script(p, cell, to_close))
270 )
294 271 if args.proc:
295 self.shell.user_ns[args.proc] = p
272 proc_proxy = _AsyncIOProxy(p, event_loop)
273 proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
274 proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
275 self.shell.user_ns[args.proc] = proc_proxy
296 276 return
297
277
298 278 try:
299 loop.run_until_complete(_stream_communicate(p, cell))
279 in_thread(_stream_communicate(p, cell))
300 280 except KeyboardInterrupt:
301 281 try:
302 282 p.send_signal(signal.SIGINT)
303 time.sleep(0.1)
283 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
304 284 if p.returncode is not None:
305 285 print("Process is interrupted.")
306 286 return
307 287 p.terminate()
308 time.sleep(0.1)
288 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
309 289 if p.returncode is not None:
310 290 print("Process is terminated.")
311 291 return
312 292 p.kill()
313 293 print("Process is killed.")
314 294 except OSError:
315 295 pass
316 296 except Exception as e:
317 297 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
318 298 return
319 if args.raise_error and p.returncode!=0:
299
300 if args.raise_error and p.returncode != 0:
320 301 # If we get here and p.returncode is still None, we must have
321 302 # killed it but not yet seen its return code. We don't wait for it,
322 303 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
323 304 rc = p.returncode or -9
324 305 raise CalledProcessError(rc, cell)
325 306
326 307 shebang.__skip_doctest__ = os.name != "posix"
327
328 def _run_script(self, p, cell, to_close):
308
309 async def _run_script(self, p, cell, to_close):
329 310 """callback for running the script in the background"""
311
330 312 p.stdin.write(cell)
313 await p.stdin.drain()
331 314 p.stdin.close()
315 await p.stdin.wait_closed()
316 await p.wait()
317 # asyncio read pipes have no close
318 # but we should drain the data anyway
332 319 for s in to_close:
333 s.close()
334 p.wait()
320 await s.read()
321 self._gc_bg_processes()
335 322
336 323 @line_magic("killbgscripts")
337 324 def killbgscripts(self, _nouse_=''):
338 325 """Kill all BG processes started by %%script and its family."""
339 326 self.kill_bg_processes()
340 327 print("All background processes were killed.")
341 328
342 329 def kill_bg_processes(self):
343 330 """Kill all BG processes which are still running."""
344 331 if not self.bg_processes:
345 332 return
346 333 for p in self.bg_processes:
347 334 if p.returncode is None:
348 335 try:
349 336 p.send_signal(signal.SIGINT)
350 337 except:
351 338 pass
352 339 time.sleep(0.1)
353 340 self._gc_bg_processes()
354 341 if not self.bg_processes:
355 342 return
356 343 for p in self.bg_processes:
357 344 if p.returncode is None:
358 345 try:
359 346 p.terminate()
360 347 except:
361 348 pass
362 349 time.sleep(0.1)
363 350 self._gc_bg_processes()
364 351 if not self.bg_processes:
365 352 return
366 353 for p in self.bg_processes:
367 354 if p.returncode is None:
368 355 try:
369 356 p.kill()
370 357 except:
371 358 pass
372 359 self._gc_bg_processes()
373 360
374 361 def _gc_bg_processes(self):
375 362 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]
@@ -1,1098 +1,1098 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for the key interactiveshell module.
3 3
4 4 Historically the main classes in interactiveshell have been under-tested. This
5 5 module should grow as many single-method tests as possible to trap many of the
6 6 recurring bugs we seem to encounter with high-level interaction.
7 7 """
8 8
9 9 # Copyright (c) IPython Development Team.
10 10 # Distributed under the terms of the Modified BSD License.
11 11
12 12 import asyncio
13 13 import ast
14 14 import os
15 15 import signal
16 16 import shutil
17 17 import sys
18 18 import tempfile
19 19 import unittest
20 20 from unittest import mock
21 21
22 22 from os.path import join
23 23
24 24 from IPython.core.error import InputRejected
25 25 from IPython.core.inputtransformer import InputTransformer
26 26 from IPython.core import interactiveshell
27 27 from IPython.testing.decorators import (
28 28 skipif, skip_win32, onlyif_unicode_paths, onlyif_cmds_exist,
29 29 )
30 30 from IPython.testing import tools as tt
31 31 from IPython.utils.process import find_cmd
32 32
33 33 #-----------------------------------------------------------------------------
34 34 # Globals
35 35 #-----------------------------------------------------------------------------
36 36 # This is used by every single test, no point repeating it ad nauseam
37 37
38 38 #-----------------------------------------------------------------------------
39 39 # Tests
40 40 #-----------------------------------------------------------------------------
41 41
42 42 class DerivedInterrupt(KeyboardInterrupt):
43 43 pass
44 44
45 45 class InteractiveShellTestCase(unittest.TestCase):
46 46 def test_naked_string_cells(self):
47 47 """Test that cells with only naked strings are fully executed"""
48 48 # First, single-line inputs
49 49 ip.run_cell('"a"\n')
50 50 self.assertEqual(ip.user_ns['_'], 'a')
51 51 # And also multi-line cells
52 52 ip.run_cell('"""a\nb"""\n')
53 53 self.assertEqual(ip.user_ns['_'], 'a\nb')
54 54
55 55 def test_run_empty_cell(self):
56 56 """Just make sure we don't get a horrible error with a blank
57 57 cell of input. Yes, I did overlook that."""
58 58 old_xc = ip.execution_count
59 59 res = ip.run_cell('')
60 60 self.assertEqual(ip.execution_count, old_xc)
61 61 self.assertEqual(res.execution_count, None)
62 62
63 63 def test_run_cell_multiline(self):
64 64 """Multi-block, multi-line cells must execute correctly.
65 65 """
66 66 src = '\n'.join(["x=1",
67 67 "y=2",
68 68 "if 1:",
69 69 " x += 1",
70 70 " y += 1",])
71 71 res = ip.run_cell(src)
72 72 self.assertEqual(ip.user_ns['x'], 2)
73 73 self.assertEqual(ip.user_ns['y'], 3)
74 74 self.assertEqual(res.success, True)
75 75 self.assertEqual(res.result, None)
76 76
77 77 def test_multiline_string_cells(self):
78 78 "Code sprinkled with multiline strings should execute (GH-306)"
79 79 ip.run_cell('tmp=0')
80 80 self.assertEqual(ip.user_ns['tmp'], 0)
81 81 res = ip.run_cell('tmp=1;"""a\nb"""\n')
82 82 self.assertEqual(ip.user_ns['tmp'], 1)
83 83 self.assertEqual(res.success, True)
84 84 self.assertEqual(res.result, "a\nb")
85 85
86 86 def test_dont_cache_with_semicolon(self):
87 87 "Ending a line with semicolon should not cache the returned object (GH-307)"
88 88 oldlen = len(ip.user_ns['Out'])
89 89 for cell in ['1;', '1;1;']:
90 90 res = ip.run_cell(cell, store_history=True)
91 91 newlen = len(ip.user_ns['Out'])
92 92 self.assertEqual(oldlen, newlen)
93 93 self.assertIsNone(res.result)
94 94 i = 0
95 95 #also test the default caching behavior
96 96 for cell in ['1', '1;1']:
97 97 ip.run_cell(cell, store_history=True)
98 98 newlen = len(ip.user_ns['Out'])
99 99 i += 1
100 100 self.assertEqual(oldlen+i, newlen)
101 101
102 102 def test_syntax_error(self):
103 103 res = ip.run_cell("raise = 3")
104 104 self.assertIsInstance(res.error_before_exec, SyntaxError)
105 105
106 106 def test_In_variable(self):
107 107 "Verify that In variable grows with user input (GH-284)"
108 108 oldlen = len(ip.user_ns['In'])
109 109 ip.run_cell('1;', store_history=True)
110 110 newlen = len(ip.user_ns['In'])
111 111 self.assertEqual(oldlen+1, newlen)
112 112 self.assertEqual(ip.user_ns['In'][-1],'1;')
113 113
114 114 def test_magic_names_in_string(self):
115 115 ip.run_cell('a = """\n%exit\n"""')
116 116 self.assertEqual(ip.user_ns['a'], '\n%exit\n')
117 117
118 118 def test_trailing_newline(self):
119 119 """test that running !(command) does not raise a SyntaxError"""
120 120 ip.run_cell('!(true)\n', False)
121 121 ip.run_cell('!(true)\n\n\n', False)
122 122
123 123 def test_gh_597(self):
124 124 """Pretty-printing lists of objects with non-ascii reprs may cause
125 125 problems."""
126 126 class Spam(object):
127 127 def __repr__(self):
128 128 return "\xe9"*50
129 129 import IPython.core.formatters
130 130 f = IPython.core.formatters.PlainTextFormatter()
131 131 f([Spam(),Spam()])
132 132
133 133
134 134 def test_future_flags(self):
135 135 """Check that future flags are used for parsing code (gh-777)"""
136 136 ip.run_cell('from __future__ import barry_as_FLUFL')
137 137 try:
138 138 ip.run_cell('prfunc_return_val = 1 <> 2')
139 139 assert 'prfunc_return_val' in ip.user_ns
140 140 finally:
141 141 # Reset compiler flags so we don't mess up other tests.
142 142 ip.compile.reset_compiler_flags()
143 143
144 144 def test_can_pickle(self):
145 145 "Can we pickle objects defined interactively (GH-29)"
146 146 ip = get_ipython()
147 147 ip.reset()
148 148 ip.run_cell(("class Mylist(list):\n"
149 149 " def __init__(self,x=[]):\n"
150 150 " list.__init__(self,x)"))
151 151 ip.run_cell("w=Mylist([1,2,3])")
152 152
153 153 from pickle import dumps
154 154
155 155 # We need to swap in our main module - this is only necessary
156 156 # inside the test framework, because IPython puts the interactive module
157 157 # in place (but the test framework undoes this).
158 158 _main = sys.modules['__main__']
159 159 sys.modules['__main__'] = ip.user_module
160 160 try:
161 161 res = dumps(ip.user_ns["w"])
162 162 finally:
163 163 sys.modules['__main__'] = _main
164 164 self.assertTrue(isinstance(res, bytes))
165 165
166 166 def test_global_ns(self):
167 167 "Code in functions must be able to access variables outside them."
168 168 ip = get_ipython()
169 169 ip.run_cell("a = 10")
170 170 ip.run_cell(("def f(x):\n"
171 171 " return x + a"))
172 172 ip.run_cell("b = f(12)")
173 173 self.assertEqual(ip.user_ns["b"], 22)
174 174
175 175 def test_bad_custom_tb(self):
176 176 """Check that InteractiveShell is protected from bad custom exception handlers"""
177 177 ip.set_custom_exc((IOError,), lambda etype,value,tb: 1/0)
178 178 self.assertEqual(ip.custom_exceptions, (IOError,))
179 179 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
180 180 ip.run_cell(u'raise IOError("foo")')
181 181 self.assertEqual(ip.custom_exceptions, ())
182 182
183 183 def test_bad_custom_tb_return(self):
184 184 """Check that InteractiveShell is protected from bad return types in custom exception handlers"""
185 185 ip.set_custom_exc((NameError,),lambda etype,value,tb, tb_offset=None: 1)
186 186 self.assertEqual(ip.custom_exceptions, (NameError,))
187 187 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
188 188 ip.run_cell(u'a=abracadabra')
189 189 self.assertEqual(ip.custom_exceptions, ())
190 190
191 191 def test_drop_by_id(self):
192 192 myvars = {"a":object(), "b":object(), "c": object()}
193 193 ip.push(myvars, interactive=False)
194 194 for name in myvars:
195 195 assert name in ip.user_ns, name
196 196 assert name in ip.user_ns_hidden, name
197 197 ip.user_ns['b'] = 12
198 198 ip.drop_by_id(myvars)
199 199 for name in ["a", "c"]:
200 200 assert name not in ip.user_ns, name
201 201 assert name not in ip.user_ns_hidden, name
202 202 assert ip.user_ns['b'] == 12
203 203 ip.reset()
204 204
205 205 def test_var_expand(self):
206 206 ip.user_ns['f'] = u'Ca\xf1o'
207 207 self.assertEqual(ip.var_expand(u'echo $f'), u'echo Ca\xf1o')
208 208 self.assertEqual(ip.var_expand(u'echo {f}'), u'echo Ca\xf1o')
209 209 self.assertEqual(ip.var_expand(u'echo {f[:-1]}'), u'echo Ca\xf1')
210 210 self.assertEqual(ip.var_expand(u'echo {1*2}'), u'echo 2')
211 211
212 212 self.assertEqual(ip.var_expand(u"grep x | awk '{print $1}'"), u"grep x | awk '{print $1}'")
213 213
214 214 ip.user_ns['f'] = b'Ca\xc3\xb1o'
215 215 # This should not raise any exception:
216 216 ip.var_expand(u'echo $f')
217 217
218 218 def test_var_expand_local(self):
219 219 """Test local variable expansion in !system and %magic calls"""
220 220 # !system
221 221 ip.run_cell(
222 222 "def test():\n"
223 223 ' lvar = "ttt"\n'
224 224 " ret = !echo {lvar}\n"
225 225 " return ret[0]\n"
226 226 )
227 227 res = ip.user_ns["test"]()
228 228 self.assertIn("ttt", res)
229 229
230 230 # %magic
231 231 ip.run_cell(
232 232 "def makemacro():\n"
233 233 ' macroname = "macro_var_expand_locals"\n'
234 234 " %macro {macroname} codestr\n"
235 235 )
236 236 ip.user_ns["codestr"] = "str(12)"
237 237 ip.run_cell("makemacro()")
238 238 self.assertIn("macro_var_expand_locals", ip.user_ns)
239 239
240 240 def test_var_expand_self(self):
241 241 """Test variable expansion with the name 'self', which was failing.
242 242
243 243 See https://github.com/ipython/ipython/issues/1878#issuecomment-7698218
244 244 """
245 245 ip.run_cell(
246 246 "class cTest:\n"
247 247 ' classvar="see me"\n'
248 248 " def test(self):\n"
249 249 " res = !echo Variable: {self.classvar}\n"
250 250 " return res[0]\n"
251 251 )
252 252 self.assertIn("see me", ip.user_ns["cTest"]().test())
253 253
254 254 def test_bad_var_expand(self):
255 255 """var_expand on invalid formats shouldn't raise"""
256 256 # SyntaxError
257 257 self.assertEqual(ip.var_expand(u"{'a':5}"), u"{'a':5}")
258 258 # NameError
259 259 self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}")
260 260 # ZeroDivisionError
261 261 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
262 262
263 263 def test_silent_postexec(self):
264 264 """run_cell(silent=True) doesn't invoke pre/post_run_cell callbacks"""
265 265 pre_explicit = mock.Mock()
266 266 pre_always = mock.Mock()
267 267 post_explicit = mock.Mock()
268 268 post_always = mock.Mock()
269 269 all_mocks = [pre_explicit, pre_always, post_explicit, post_always]
270 270
271 271 ip.events.register('pre_run_cell', pre_explicit)
272 272 ip.events.register('pre_execute', pre_always)
273 273 ip.events.register('post_run_cell', post_explicit)
274 274 ip.events.register('post_execute', post_always)
275 275
276 276 try:
277 277 ip.run_cell("1", silent=True)
278 278 assert pre_always.called
279 279 assert not pre_explicit.called
280 280 assert post_always.called
281 281 assert not post_explicit.called
282 282 # double-check that non-silent exec did what we expected
283 283 # silent to avoid
284 284 ip.run_cell("1")
285 285 assert pre_explicit.called
286 286 assert post_explicit.called
287 287 info, = pre_explicit.call_args[0]
288 288 result, = post_explicit.call_args[0]
289 289 self.assertEqual(info, result.info)
290 290 # check that post hooks are always called
291 291 [m.reset_mock() for m in all_mocks]
292 292 ip.run_cell("syntax error")
293 293 assert pre_always.called
294 294 assert pre_explicit.called
295 295 assert post_always.called
296 296 assert post_explicit.called
297 297 info, = pre_explicit.call_args[0]
298 298 result, = post_explicit.call_args[0]
299 299 self.assertEqual(info, result.info)
300 300 finally:
301 301 # remove post-exec
302 302 ip.events.unregister('pre_run_cell', pre_explicit)
303 303 ip.events.unregister('pre_execute', pre_always)
304 304 ip.events.unregister('post_run_cell', post_explicit)
305 305 ip.events.unregister('post_execute', post_always)
306 306
307 307 def test_silent_noadvance(self):
308 308 """run_cell(silent=True) doesn't advance execution_count"""
309 309 ec = ip.execution_count
310 310 # silent should force store_history=False
311 311 ip.run_cell("1", store_history=True, silent=True)
312 312
313 313 self.assertEqual(ec, ip.execution_count)
314 314 # double-check that non-silent exec did what we expected
315 315 # silent to avoid
316 316 ip.run_cell("1", store_history=True)
317 317 self.assertEqual(ec+1, ip.execution_count)
318 318
319 319 def test_silent_nodisplayhook(self):
320 320 """run_cell(silent=True) doesn't trigger displayhook"""
321 321 d = dict(called=False)
322 322
323 323 trap = ip.display_trap
324 324 save_hook = trap.hook
325 325
326 326 def failing_hook(*args, **kwargs):
327 327 d['called'] = True
328 328
329 329 try:
330 330 trap.hook = failing_hook
331 331 res = ip.run_cell("1", silent=True)
332 332 self.assertFalse(d['called'])
333 333 self.assertIsNone(res.result)
334 334 # double-check that non-silent exec did what we expected
335 335 # silent to avoid
336 336 ip.run_cell("1")
337 337 self.assertTrue(d['called'])
338 338 finally:
339 339 trap.hook = save_hook
340 340
341 341 def test_ofind_line_magic(self):
342 342 from IPython.core.magic import register_line_magic
343 343
344 344 @register_line_magic
345 345 def lmagic(line):
346 346 "A line magic"
347 347
348 348 # Get info on line magic
349 349 lfind = ip._ofind("lmagic")
350 350 info = dict(
351 351 found=True,
352 352 isalias=False,
353 353 ismagic=True,
354 354 namespace="IPython internal",
355 355 obj=lmagic,
356 356 parent=None,
357 357 )
358 358 self.assertEqual(lfind, info)
359 359
360 360 def test_ofind_cell_magic(self):
361 361 from IPython.core.magic import register_cell_magic
362 362
363 363 @register_cell_magic
364 364 def cmagic(line, cell):
365 365 "A cell magic"
366 366
367 367 # Get info on cell magic
368 368 find = ip._ofind("cmagic")
369 369 info = dict(
370 370 found=True,
371 371 isalias=False,
372 372 ismagic=True,
373 373 namespace="IPython internal",
374 374 obj=cmagic,
375 375 parent=None,
376 376 )
377 377 self.assertEqual(find, info)
378 378
379 379 def test_ofind_property_with_error(self):
380 380 class A(object):
381 381 @property
382 382 def foo(self):
383 383 raise NotImplementedError()
384 384 a = A()
385 385
386 386 found = ip._ofind('a.foo', [('locals', locals())])
387 387 info = dict(found=True, isalias=False, ismagic=False,
388 388 namespace='locals', obj=A.foo, parent=a)
389 389 self.assertEqual(found, info)
390 390
391 391 def test_ofind_multiple_attribute_lookups(self):
392 392 class A(object):
393 393 @property
394 394 def foo(self):
395 395 raise NotImplementedError()
396 396
397 397 a = A()
398 398 a.a = A()
399 399 a.a.a = A()
400 400
401 401 found = ip._ofind('a.a.a.foo', [('locals', locals())])
402 402 info = dict(found=True, isalias=False, ismagic=False,
403 403 namespace='locals', obj=A.foo, parent=a.a.a)
404 404 self.assertEqual(found, info)
405 405
406 406 def test_ofind_slotted_attributes(self):
407 407 class A(object):
408 408 __slots__ = ['foo']
409 409 def __init__(self):
410 410 self.foo = 'bar'
411 411
412 412 a = A()
413 413 found = ip._ofind('a.foo', [('locals', locals())])
414 414 info = dict(found=True, isalias=False, ismagic=False,
415 415 namespace='locals', obj=a.foo, parent=a)
416 416 self.assertEqual(found, info)
417 417
418 418 found = ip._ofind('a.bar', [('locals', locals())])
419 419 info = dict(found=False, isalias=False, ismagic=False,
420 420 namespace=None, obj=None, parent=a)
421 421 self.assertEqual(found, info)
422 422
423 423 def test_ofind_prefers_property_to_instance_level_attribute(self):
424 424 class A(object):
425 425 @property
426 426 def foo(self):
427 427 return 'bar'
428 428 a = A()
429 429 a.__dict__["foo"] = "baz"
430 430 self.assertEqual(a.foo, "bar")
431 431 found = ip._ofind("a.foo", [("locals", locals())])
432 432 self.assertIs(found["obj"], A.foo)
433 433
434 434 def test_custom_syntaxerror_exception(self):
435 435 called = []
436 436 def my_handler(shell, etype, value, tb, tb_offset=None):
437 437 called.append(etype)
438 438 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
439 439
440 440 ip.set_custom_exc((SyntaxError,), my_handler)
441 441 try:
442 442 ip.run_cell("1f")
443 443 # Check that this was called, and only once.
444 444 self.assertEqual(called, [SyntaxError])
445 445 finally:
446 446 # Reset the custom exception hook
447 447 ip.set_custom_exc((), None)
448 448
449 449 def test_custom_exception(self):
450 450 called = []
451 451 def my_handler(shell, etype, value, tb, tb_offset=None):
452 452 called.append(etype)
453 453 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
454 454
455 455 ip.set_custom_exc((ValueError,), my_handler)
456 456 try:
457 457 res = ip.run_cell("raise ValueError('test')")
458 458 # Check that this was called, and only once.
459 459 self.assertEqual(called, [ValueError])
460 460 # Check that the error is on the result object
461 461 self.assertIsInstance(res.error_in_exec, ValueError)
462 462 finally:
463 463 # Reset the custom exception hook
464 464 ip.set_custom_exc((), None)
465 465
466 466 @mock.patch("builtins.print")
467 467 def test_showtraceback_with_surrogates(self, mocked_print):
468 468 values = []
469 469
470 470 def mock_print_func(value, sep=" ", end="\n", file=sys.stdout, flush=False):
471 471 values.append(value)
472 472 if value == chr(0xD8FF):
473 473 raise UnicodeEncodeError("utf-8", chr(0xD8FF), 0, 1, "")
474 474
475 475 # mock builtins.print
476 476 mocked_print.side_effect = mock_print_func
477 477
478 478 # ip._showtraceback() is replaced in globalipapp.py.
479 479 # Call original method to test.
480 480 interactiveshell.InteractiveShell._showtraceback(ip, None, None, chr(0xD8FF))
481 481
482 482 self.assertEqual(mocked_print.call_count, 2)
483 483 self.assertEqual(values, [chr(0xD8FF), "\\ud8ff"])
484 484
485 485 def test_mktempfile(self):
486 486 filename = ip.mktempfile()
487 487 # Check that we can open the file again on Windows
488 488 with open(filename, 'w') as f:
489 489 f.write('abc')
490 490
491 491 filename = ip.mktempfile(data='blah')
492 492 with open(filename, 'r') as f:
493 493 self.assertEqual(f.read(), 'blah')
494 494
495 495 def test_new_main_mod(self):
496 496 # Smoketest to check that this accepts a unicode module name
497 497 name = u'jiefmw'
498 498 mod = ip.new_main_mod(u'%s.py' % name, name)
499 499 self.assertEqual(mod.__name__, name)
500 500
501 501 def test_get_exception_only(self):
502 502 try:
503 503 raise KeyboardInterrupt
504 504 except KeyboardInterrupt:
505 505 msg = ip.get_exception_only()
506 506 self.assertEqual(msg, 'KeyboardInterrupt\n')
507 507
508 508 try:
509 509 raise DerivedInterrupt("foo")
510 510 except KeyboardInterrupt:
511 511 msg = ip.get_exception_only()
512 512 self.assertEqual(msg, 'IPython.core.tests.test_interactiveshell.DerivedInterrupt: foo\n')
513 513
514 514 def test_inspect_text(self):
515 515 ip.run_cell('a = 5')
516 516 text = ip.object_inspect_text('a')
517 517 self.assertIsInstance(text, str)
518 518
519 519 def test_last_execution_result(self):
520 520 """ Check that last execution result gets set correctly (GH-10702) """
521 521 result = ip.run_cell('a = 5; a')
522 522 self.assertTrue(ip.last_execution_succeeded)
523 523 self.assertEqual(ip.last_execution_result.result, 5)
524 524
525 525 result = ip.run_cell('a = x_invalid_id_x')
526 526 self.assertFalse(ip.last_execution_succeeded)
527 527 self.assertFalse(ip.last_execution_result.success)
528 528 self.assertIsInstance(ip.last_execution_result.error_in_exec, NameError)
529 529
530 530 def test_reset_aliasing(self):
531 531 """ Check that standard posix aliases work after %reset. """
532 532 if os.name != 'posix':
533 533 return
534 534
535 535 ip.reset()
536 536 for cmd in ('clear', 'more', 'less', 'man'):
537 537 res = ip.run_cell('%' + cmd)
538 538 self.assertEqual(res.success, True)
539 539
540 540
541 541 class TestSafeExecfileNonAsciiPath(unittest.TestCase):
542 542
543 543 @onlyif_unicode_paths
544 544 def setUp(self):
545 545 self.BASETESTDIR = tempfile.mkdtemp()
546 546 self.TESTDIR = join(self.BASETESTDIR, u"Γ₯Àâ")
547 547 os.mkdir(self.TESTDIR)
548 548 with open(join(self.TESTDIR, u"Γ₯Àâtestscript.py"), "w") as sfile:
549 549 sfile.write("pass\n")
550 550 self.oldpath = os.getcwd()
551 551 os.chdir(self.TESTDIR)
552 552 self.fname = u"Γ₯Àâtestscript.py"
553 553
554 554 def tearDown(self):
555 555 os.chdir(self.oldpath)
556 556 shutil.rmtree(self.BASETESTDIR)
557 557
558 558 @onlyif_unicode_paths
559 559 def test_1(self):
560 560 """Test safe_execfile with non-ascii path
561 561 """
562 562 ip.safe_execfile(self.fname, {}, raise_exceptions=True)
563 563
564 564 class ExitCodeChecks(tt.TempFileMixin):
565 565
566 566 def setUp(self):
567 567 self.system = ip.system_raw
568 568
569 569 def test_exit_code_ok(self):
570 570 self.system('exit 0')
571 571 self.assertEqual(ip.user_ns['_exit_code'], 0)
572 572
573 573 def test_exit_code_error(self):
574 574 self.system('exit 1')
575 575 self.assertEqual(ip.user_ns['_exit_code'], 1)
576 576
577 577 @skipif(not hasattr(signal, 'SIGALRM'))
578 578 def test_exit_code_signal(self):
579 579 self.mktmp("import signal, time\n"
580 580 "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
581 581 "time.sleep(1)\n")
582 582 self.system("%s %s" % (sys.executable, self.fname))
583 583 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
584 584
585 585 @onlyif_cmds_exist("csh")
586 586 def test_exit_code_signal_csh(self):
587 587 SHELL = os.environ.get('SHELL', None)
588 588 os.environ['SHELL'] = find_cmd("csh")
589 589 try:
590 590 self.test_exit_code_signal()
591 591 finally:
592 592 if SHELL is not None:
593 593 os.environ['SHELL'] = SHELL
594 594 else:
595 595 del os.environ['SHELL']
596 596
597 597
598 598 class TestSystemRaw(ExitCodeChecks):
599 599
600 600 def setUp(self):
601 601 super().setUp()
602 602 self.system = ip.system_raw
603 603
604 604 @onlyif_unicode_paths
605 605 def test_1(self):
606 606 """Test system_raw with non-ascii cmd
607 607 """
608 608 cmd = u'''python -c "'Γ₯Àâ'" '''
609 609 ip.system_raw(cmd)
610 610
611 611 @mock.patch('subprocess.call', side_effect=KeyboardInterrupt)
612 612 @mock.patch('os.system', side_effect=KeyboardInterrupt)
613 613 def test_control_c(self, *mocks):
614 614 try:
615 615 self.system("sleep 1 # wont happen")
616 616 except KeyboardInterrupt:
617 617 self.fail(
618 618 "system call should intercept "
619 619 "keyboard interrupt from subprocess.call"
620 620 )
621 621 self.assertEqual(ip.user_ns["_exit_code"], -signal.SIGINT)
622 622
623 623 def test_magic_warnings(self):
624 624 for magic_cmd in ("ls", "pip", "conda", "cd"):
625 625 with self.assertWarnsRegex(Warning, "You executed the system command"):
626 626 ip.system_raw(magic_cmd)
627 627
628 628 # TODO: Exit codes are currently ignored on Windows.
629 629 class TestSystemPipedExitCode(ExitCodeChecks):
630 630
631 631 def setUp(self):
632 632 super().setUp()
633 633 self.system = ip.system_piped
634 634
635 635 @skip_win32
636 636 def test_exit_code_ok(self):
637 637 ExitCodeChecks.test_exit_code_ok(self)
638 638
639 639 @skip_win32
640 640 def test_exit_code_error(self):
641 641 ExitCodeChecks.test_exit_code_error(self)
642 642
643 643 @skip_win32
644 644 def test_exit_code_signal(self):
645 645 ExitCodeChecks.test_exit_code_signal(self)
646 646
647 647 class TestModules(tt.TempFileMixin):
648 648 def test_extraneous_loads(self):
649 649 """Test we're not loading modules on startup that we shouldn't.
650 650 """
651 651 self.mktmp("import sys\n"
652 652 "print('numpy' in sys.modules)\n"
653 653 "print('ipyparallel' in sys.modules)\n"
654 654 "print('ipykernel' in sys.modules)\n"
655 655 )
656 656 out = "False\nFalse\nFalse\n"
657 657 tt.ipexec_validate(self.fname, out)
658 658
659 659 class Negator(ast.NodeTransformer):
660 660 """Negates all number literals in an AST."""
661 661
662 662 # for python 3.7 and earlier
663 663 def visit_Num(self, node):
664 664 node.n = -node.n
665 665 return node
666 666
667 667 # for python 3.8+
668 668 def visit_Constant(self, node):
669 669 if isinstance(node.value, int):
670 670 return self.visit_Num(node)
671 671 return node
672 672
673 673 class TestAstTransform(unittest.TestCase):
674 674 def setUp(self):
675 675 self.negator = Negator()
676 676 ip.ast_transformers.append(self.negator)
677 677
678 678 def tearDown(self):
679 679 ip.ast_transformers.remove(self.negator)
680 680
681 681 def test_run_cell(self):
682 682 with tt.AssertPrints('-34'):
683 683 ip.run_cell('print (12 + 22)')
684 684
685 685 # A named reference to a number shouldn't be transformed.
686 686 ip.user_ns['n'] = 55
687 687 with tt.AssertNotPrints('-55'):
688 688 ip.run_cell('print (n)')
689 689
690 690 def test_timeit(self):
691 691 called = set()
692 692 def f(x):
693 693 called.add(x)
694 694 ip.push({'f':f})
695 695
696 696 with tt.AssertPrints("std. dev. of"):
697 697 ip.run_line_magic("timeit", "-n1 f(1)")
698 698 self.assertEqual(called, {-1})
699 699 called.clear()
700 700
701 701 with tt.AssertPrints("std. dev. of"):
702 702 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
703 703 self.assertEqual(called, {-2, -3})
704 704
705 705 def test_time(self):
706 706 called = []
707 707 def f(x):
708 708 called.append(x)
709 709 ip.push({'f':f})
710 710
711 711 # Test with an expression
712 712 with tt.AssertPrints("Wall time: "):
713 713 ip.run_line_magic("time", "f(5+9)")
714 714 self.assertEqual(called, [-14])
715 715 called[:] = []
716 716
717 717 # Test with a statement (different code path)
718 718 with tt.AssertPrints("Wall time: "):
719 719 ip.run_line_magic("time", "a = f(-3 + -2)")
720 720 self.assertEqual(called, [5])
721 721
722 722 def test_macro(self):
723 723 ip.push({'a':10})
724 724 # The AST transformation makes this do a+=-1
725 725 ip.define_macro("amacro", "a+=1\nprint(a)")
726 726
727 727 with tt.AssertPrints("9"):
728 728 ip.run_cell("amacro")
729 729 with tt.AssertPrints("8"):
730 730 ip.run_cell("amacro")
731 731
732 732 class TestMiscTransform(unittest.TestCase):
733 733
734 734
735 735 def test_transform_only_once(self):
736 736 cleanup = 0
737 737 line_t = 0
738 738 def count_cleanup(lines):
739 739 nonlocal cleanup
740 740 cleanup += 1
741 741 return lines
742 742
743 743 def count_line_t(lines):
744 744 nonlocal line_t
745 745 line_t += 1
746 746 return lines
747 747
748 748 ip.input_transformer_manager.cleanup_transforms.append(count_cleanup)
749 749 ip.input_transformer_manager.line_transforms.append(count_line_t)
750 750
751 751 ip.run_cell('1')
752 752
753 753 assert cleanup == 1
754 754 assert line_t == 1
755 755
756 756 class IntegerWrapper(ast.NodeTransformer):
757 757 """Wraps all integers in a call to Integer()"""
758 758
759 759 # for Python 3.7 and earlier
760 760
761 761 # for Python 3.7 and earlier
762 762 def visit_Num(self, node):
763 763 if isinstance(node.n, int):
764 764 return ast.Call(func=ast.Name(id='Integer', ctx=ast.Load()),
765 765 args=[node], keywords=[])
766 766 return node
767 767
768 768 # For Python 3.8+
769 769 def visit_Constant(self, node):
770 770 if isinstance(node.value, int):
771 771 return self.visit_Num(node)
772 772 return node
773 773
774 774
775 775 class TestAstTransform2(unittest.TestCase):
776 776 def setUp(self):
777 777 self.intwrapper = IntegerWrapper()
778 778 ip.ast_transformers.append(self.intwrapper)
779 779
780 780 self.calls = []
781 781 def Integer(*args):
782 782 self.calls.append(args)
783 783 return args
784 784 ip.push({"Integer": Integer})
785 785
786 786 def tearDown(self):
787 787 ip.ast_transformers.remove(self.intwrapper)
788 788 del ip.user_ns['Integer']
789 789
790 790 def test_run_cell(self):
791 791 ip.run_cell("n = 2")
792 792 self.assertEqual(self.calls, [(2,)])
793 793
794 794 # This shouldn't throw an error
795 795 ip.run_cell("o = 2.0")
796 796 self.assertEqual(ip.user_ns['o'], 2.0)
797 797
798 798 def test_timeit(self):
799 799 called = set()
800 800 def f(x):
801 801 called.add(x)
802 802 ip.push({'f':f})
803 803
804 804 with tt.AssertPrints("std. dev. of"):
805 805 ip.run_line_magic("timeit", "-n1 f(1)")
806 806 self.assertEqual(called, {(1,)})
807 807 called.clear()
808 808
809 809 with tt.AssertPrints("std. dev. of"):
810 810 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
811 811 self.assertEqual(called, {(2,), (3,)})
812 812
813 813 class ErrorTransformer(ast.NodeTransformer):
814 814 """Throws an error when it sees a number."""
815 815
816 816 # for Python 3.7 and earlier
817 817 def visit_Num(self, node):
818 818 raise ValueError("test")
819 819
820 820 # for Python 3.8+
821 821 def visit_Constant(self, node):
822 822 if isinstance(node.value, int):
823 823 return self.visit_Num(node)
824 824 return node
825 825
826 826
827 827 class TestAstTransformError(unittest.TestCase):
828 828 def test_unregistering(self):
829 829 err_transformer = ErrorTransformer()
830 830 ip.ast_transformers.append(err_transformer)
831 831
832 832 with self.assertWarnsRegex(UserWarning, "It will be unregistered"):
833 833 ip.run_cell("1 + 2")
834 834
835 835 # This should have been removed.
836 836 self.assertNotIn(err_transformer, ip.ast_transformers)
837 837
838 838
839 839 class StringRejector(ast.NodeTransformer):
840 840 """Throws an InputRejected when it sees a string literal.
841 841
842 842 Used to verify that NodeTransformers can signal that a piece of code should
843 843 not be executed by throwing an InputRejected.
844 844 """
845 845
846 846 #for python 3.7 and earlier
847 847 def visit_Str(self, node):
848 848 raise InputRejected("test")
849 849
850 850 # 3.8 only
851 851 def visit_Constant(self, node):
852 852 if isinstance(node.value, str):
853 853 raise InputRejected("test")
854 854 return node
855 855
856 856
857 857 class TestAstTransformInputRejection(unittest.TestCase):
858 858
859 859 def setUp(self):
860 860 self.transformer = StringRejector()
861 861 ip.ast_transformers.append(self.transformer)
862 862
863 863 def tearDown(self):
864 864 ip.ast_transformers.remove(self.transformer)
865 865
866 866 def test_input_rejection(self):
867 867 """Check that NodeTransformers can reject input."""
868 868
869 869 expect_exception_tb = tt.AssertPrints("InputRejected: test")
870 870 expect_no_cell_output = tt.AssertNotPrints("'unsafe'", suppress=False)
871 871
872 872 # Run the same check twice to verify that the transformer is not
873 873 # disabled after raising.
874 874 with expect_exception_tb, expect_no_cell_output:
875 875 ip.run_cell("'unsafe'")
876 876
877 877 with expect_exception_tb, expect_no_cell_output:
878 878 res = ip.run_cell("'unsafe'")
879 879
880 880 self.assertIsInstance(res.error_before_exec, InputRejected)
881 881
882 882 def test__IPYTHON__():
883 883 # This shouldn't raise a NameError, that's all
884 884 __IPYTHON__
885 885
886 886
887 887 class DummyRepr(object):
888 888 def __repr__(self):
889 889 return "DummyRepr"
890 890
891 891 def _repr_html_(self):
892 892 return "<b>dummy</b>"
893 893
894 894 def _repr_javascript_(self):
895 895 return "console.log('hi');", {'key': 'value'}
896 896
897 897
898 898 def test_user_variables():
899 899 # enable all formatters
900 900 ip.display_formatter.active_types = ip.display_formatter.format_types
901 901
902 902 ip.user_ns['dummy'] = d = DummyRepr()
903 903 keys = {'dummy', 'doesnotexist'}
904 904 r = ip.user_expressions({ key:key for key in keys})
905 905
906 906 assert keys == set(r.keys())
907 907 dummy = r["dummy"]
908 908 assert {"status", "data", "metadata"} == set(dummy.keys())
909 909 assert dummy["status"] == "ok"
910 910 data = dummy["data"]
911 911 metadata = dummy["metadata"]
912 912 assert data.get("text/html") == d._repr_html_()
913 913 js, jsmd = d._repr_javascript_()
914 914 assert data.get("application/javascript") == js
915 915 assert metadata.get("application/javascript") == jsmd
916 916
917 917 dne = r["doesnotexist"]
918 918 assert dne["status"] == "error"
919 919 assert dne["ename"] == "NameError"
920 920
921 921 # back to text only
922 922 ip.display_formatter.active_types = ['text/plain']
923 923
924 924 def test_user_expression():
925 925 # enable all formatters
926 926 ip.display_formatter.active_types = ip.display_formatter.format_types
927 927 query = {
928 928 'a' : '1 + 2',
929 929 'b' : '1/0',
930 930 }
931 931 r = ip.user_expressions(query)
932 932 import pprint
933 933 pprint.pprint(r)
934 934 assert set(r.keys()) == set(query.keys())
935 935 a = r["a"]
936 936 assert {"status", "data", "metadata"} == set(a.keys())
937 937 assert a["status"] == "ok"
938 938 data = a["data"]
939 939 metadata = a["metadata"]
940 940 assert data.get("text/plain") == "3"
941 941
942 942 b = r["b"]
943 943 assert b["status"] == "error"
944 944 assert b["ename"] == "ZeroDivisionError"
945 945
946 946 # back to text only
947 947 ip.display_formatter.active_types = ['text/plain']
948 948
949 949
950 950 class TestSyntaxErrorTransformer(unittest.TestCase):
951 951 """Check that SyntaxError raised by an input transformer is handled by run_cell()"""
952 952
953 953 @staticmethod
954 954 def transformer(lines):
955 955 for line in lines:
956 956 pos = line.find('syntaxerror')
957 957 if pos >= 0:
958 958 e = SyntaxError('input contains "syntaxerror"')
959 959 e.text = line
960 960 e.offset = pos + 1
961 961 raise e
962 962 return lines
963 963
964 964 def setUp(self):
965 965 ip.input_transformers_post.append(self.transformer)
966 966
967 967 def tearDown(self):
968 968 ip.input_transformers_post.remove(self.transformer)
969 969
970 970 def test_syntaxerror_input_transformer(self):
971 971 with tt.AssertPrints('1234'):
972 972 ip.run_cell('1234')
973 973 with tt.AssertPrints('SyntaxError: invalid syntax'):
974 974 ip.run_cell('1 2 3') # plain python syntax error
975 975 with tt.AssertPrints('SyntaxError: input contains "syntaxerror"'):
976 976 ip.run_cell('2345 # syntaxerror') # input transformer syntax error
977 977 with tt.AssertPrints('3456'):
978 978 ip.run_cell('3456')
979 979
980 980
981 981 class TestWarningSuppression(unittest.TestCase):
982 982 def test_warning_suppression(self):
983 983 ip.run_cell("import warnings")
984 984 try:
985 985 with self.assertWarnsRegex(UserWarning, "asdf"):
986 986 ip.run_cell("warnings.warn('asdf')")
987 987 # Here's the real test -- if we run that again, we should get the
988 988 # warning again. Traditionally, each warning was only issued once per
989 989 # IPython session (approximately), even if the user typed in new and
990 990 # different code that should have also triggered the warning, leading
991 991 # to much confusion.
992 992 with self.assertWarnsRegex(UserWarning, "asdf"):
993 993 ip.run_cell("warnings.warn('asdf')")
994 994 finally:
995 995 ip.run_cell("del warnings")
996 996
997 997
998 998 def test_deprecation_warning(self):
999 999 ip.run_cell("""
1000 1000 import warnings
1001 1001 def wrn():
1002 1002 warnings.warn(
1003 1003 "I AM A WARNING",
1004 1004 DeprecationWarning
1005 1005 )
1006 1006 """)
1007 1007 try:
1008 1008 with self.assertWarnsRegex(DeprecationWarning, "I AM A WARNING"):
1009 1009 ip.run_cell("wrn()")
1010 1010 finally:
1011 1011 ip.run_cell("del warnings")
1012 1012 ip.run_cell("del wrn")
1013 1013
1014 1014
1015 1015 class TestImportNoDeprecate(tt.TempFileMixin):
1016 1016
1017 1017 def setUp(self):
1018 1018 """Make a valid python temp file."""
1019 1019 self.mktmp("""
1020 1020 import warnings
1021 1021 def wrn():
1022 1022 warnings.warn(
1023 1023 "I AM A WARNING",
1024 1024 DeprecationWarning
1025 1025 )
1026 1026 """)
1027 1027 super().setUp()
1028 1028
1029 1029 def test_no_dep(self):
1030 1030 """
1031 1031 No deprecation warning should be raised from imported functions
1032 1032 """
1033 1033 ip.run_cell("from {} import wrn".format(self.fname))
1034 1034
1035 1035 with tt.AssertNotPrints("I AM A WARNING"):
1036 1036 ip.run_cell("wrn()")
1037 1037 ip.run_cell("del wrn")
1038 1038
1039 1039
1040 1040 def test_custom_exc_count():
1041 1041 hook = mock.Mock(return_value=None)
1042 1042 ip.set_custom_exc((SyntaxError,), hook)
1043 1043 before = ip.execution_count
1044 1044 ip.run_cell("def foo()", store_history=True)
1045 1045 # restore default excepthook
1046 1046 ip.set_custom_exc((), None)
1047 1047 assert hook.call_count == 1
1048 1048 assert ip.execution_count == before + 1
1049 1049
1050 1050
1051 1051 def test_run_cell_async():
1052 loop = asyncio.get_event_loop_policy().get_event_loop()
1053 1052 ip.run_cell("import asyncio")
1054 1053 coro = ip.run_cell_async("await asyncio.sleep(0.01)\n5")
1055 1054 assert asyncio.iscoroutine(coro)
1055 loop = asyncio.new_event_loop()
1056 1056 result = loop.run_until_complete(coro)
1057 1057 assert isinstance(result, interactiveshell.ExecutionResult)
1058 1058 assert result.result == 5
1059 1059
1060 1060
1061 1061 def test_run_cell_await():
1062 1062 ip.run_cell("import asyncio")
1063 1063 result = ip.run_cell("await asyncio.sleep(0.01); 10")
1064 1064 assert ip.user_ns["_"] == 10
1065 1065
1066 1066
1067 1067 def test_run_cell_asyncio_run():
1068 1068 ip.run_cell("import asyncio")
1069 1069 result = ip.run_cell("await asyncio.sleep(0.01); 1")
1070 1070 assert ip.user_ns["_"] == 1
1071 1071 result = ip.run_cell("asyncio.run(asyncio.sleep(0.01)); 2")
1072 1072 assert ip.user_ns["_"] == 2
1073 1073 result = ip.run_cell("await asyncio.sleep(0.01); 3")
1074 1074 assert ip.user_ns["_"] == 3
1075 1075
1076 1076
1077 1077 def test_should_run_async():
1078 1078 assert not ip.should_run_async("a = 5")
1079 1079 assert ip.should_run_async("await x")
1080 1080 assert ip.should_run_async("import asyncio; await asyncio.sleep(1)")
1081 1081
1082 1082
1083 1083 def test_set_custom_completer():
1084 1084 num_completers = len(ip.Completer.matchers)
1085 1085
1086 1086 def foo(*args, **kwargs):
1087 1087 return "I'm a completer!"
1088 1088
1089 1089 ip.set_custom_completer(foo, 0)
1090 1090
1091 1091 # check that we've really added a new completer
1092 1092 assert len(ip.Completer.matchers) == num_completers + 1
1093 1093
1094 1094 # check that the first completer is the function we defined
1095 1095 assert ip.Completer.matchers[0]() == "I'm a completer!"
1096 1096
1097 1097 # clean up
1098 1098 ip.Completer.custom_matchers.pop()
@@ -1,1358 +1,1375 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for various magic functions."""
3 3
4 4 import asyncio
5 5 import gc
6 6 import io
7 7 import os
8 8 import re
9 9 import shlex
10 10 import sys
11 11 import warnings
12 12 from importlib import invalidate_caches
13 13 from io import StringIO
14 14 from pathlib import Path
15 15 from textwrap import dedent
16 16 from unittest import TestCase, mock
17 17
18 18 import pytest
19 19
20 20 from IPython import get_ipython
21 21 from IPython.core import magic
22 22 from IPython.core.error import UsageError
23 23 from IPython.core.magic import (
24 24 Magics,
25 25 cell_magic,
26 26 line_magic,
27 27 magics_class,
28 28 register_cell_magic,
29 29 register_line_magic,
30 30 )
31 31 from IPython.core.magics import code, execution, logging, osm, script
32 32 from IPython.testing import decorators as dec
33 33 from IPython.testing import tools as tt
34 34 from IPython.utils.io import capture_output
35 35 from IPython.utils.process import find_cmd
36 36 from IPython.utils.tempdir import TemporaryDirectory, TemporaryWorkingDirectory
37 37
38 38 from .test_debugger import PdbTestInput
39 39
40 40
41 41 @magic.magics_class
42 42 class DummyMagics(magic.Magics): pass
43 43
44 44 def test_extract_code_ranges():
45 45 instr = "1 3 5-6 7-9 10:15 17: :10 10- -13 :"
46 46 expected = [
47 47 (0, 1),
48 48 (2, 3),
49 49 (4, 6),
50 50 (6, 9),
51 51 (9, 14),
52 52 (16, None),
53 53 (None, 9),
54 54 (9, None),
55 55 (None, 13),
56 56 (None, None),
57 57 ]
58 58 actual = list(code.extract_code_ranges(instr))
59 59 assert actual == expected
60 60
61 61 def test_extract_symbols():
62 62 source = """import foo\na = 10\ndef b():\n return 42\n\n\nclass A: pass\n\n\n"""
63 63 symbols_args = ["a", "b", "A", "A,b", "A,a", "z"]
64 64 expected = [([], ['a']),
65 65 (["def b():\n return 42\n"], []),
66 66 (["class A: pass\n"], []),
67 67 (["class A: pass\n", "def b():\n return 42\n"], []),
68 68 (["class A: pass\n"], ['a']),
69 69 ([], ['z'])]
70 70 for symbols, exp in zip(symbols_args, expected):
71 71 assert code.extract_symbols(source, symbols) == exp
72 72
73 73
74 74 def test_extract_symbols_raises_exception_with_non_python_code():
75 75 source = ("=begin A Ruby program :)=end\n"
76 76 "def hello\n"
77 77 "puts 'Hello world'\n"
78 78 "end")
79 79 with pytest.raises(SyntaxError):
80 80 code.extract_symbols(source, "hello")
81 81
82 82
83 83 def test_magic_not_found():
84 84 # magic not found raises UsageError
85 85 with pytest.raises(UsageError):
86 86 _ip.magic('doesntexist')
87 87
88 88 # ensure result isn't success when a magic isn't found
89 89 result = _ip.run_cell('%doesntexist')
90 90 assert isinstance(result.error_in_exec, UsageError)
91 91
92 92
93 93 def test_cell_magic_not_found():
94 94 # magic not found raises UsageError
95 95 with pytest.raises(UsageError):
96 96 _ip.run_cell_magic('doesntexist', 'line', 'cell')
97 97
98 98 # ensure result isn't success when a magic isn't found
99 99 result = _ip.run_cell('%%doesntexist')
100 100 assert isinstance(result.error_in_exec, UsageError)
101 101
102 102
103 103 def test_magic_error_status():
104 104 def fail(shell):
105 105 1/0
106 106 _ip.register_magic_function(fail)
107 107 result = _ip.run_cell('%fail')
108 108 assert isinstance(result.error_in_exec, ZeroDivisionError)
109 109
110 110
111 111 def test_config():
112 112 """ test that config magic does not raise
113 113 can happen if Configurable init is moved too early into
114 114 Magics.__init__ as then a Config object will be registered as a
115 115 magic.
116 116 """
117 117 ## should not raise.
118 118 _ip.magic('config')
119 119
120 120 def test_config_available_configs():
121 121 """ test that config magic prints available configs in unique and
122 122 sorted order. """
123 123 with capture_output() as captured:
124 124 _ip.magic('config')
125 125
126 126 stdout = captured.stdout
127 127 config_classes = stdout.strip().split('\n')[1:]
128 128 assert config_classes == sorted(set(config_classes))
129 129
130 130 def test_config_print_class():
131 131 """ test that config with a classname prints the class's options. """
132 132 with capture_output() as captured:
133 133 _ip.magic('config TerminalInteractiveShell')
134 134
135 135 stdout = captured.stdout
136 136 assert re.match(
137 137 "TerminalInteractiveShell.* options", stdout.splitlines()[0]
138 138 ), f"{stdout}\n\n1st line of stdout not like 'TerminalInteractiveShell.* options'"
139 139
140 140
141 141 def test_rehashx():
142 142 # clear up everything
143 143 _ip.alias_manager.clear_aliases()
144 144 del _ip.db['syscmdlist']
145 145
146 146 _ip.magic('rehashx')
147 147 # Practically ALL ipython development systems will have more than 10 aliases
148 148
149 149 assert len(_ip.alias_manager.aliases) > 10
150 150 for name, cmd in _ip.alias_manager.aliases:
151 151 # we must strip dots from alias names
152 152 assert "." not in name
153 153
154 154 # rehashx must fill up syscmdlist
155 155 scoms = _ip.db['syscmdlist']
156 156 assert len(scoms) > 10
157 157
158 158
159 159 def test_magic_parse_options():
160 160 """Test that we don't mangle paths when parsing magic options."""
161 161 ip = get_ipython()
162 162 path = 'c:\\x'
163 163 m = DummyMagics(ip)
164 164 opts = m.parse_options('-f %s' % path,'f:')[0]
165 165 # argv splitting is os-dependent
166 166 if os.name == 'posix':
167 167 expected = 'c:x'
168 168 else:
169 169 expected = path
170 170 assert opts["f"] == expected
171 171
172 172
173 173 def test_magic_parse_long_options():
174 174 """Magic.parse_options can handle --foo=bar long options"""
175 175 ip = get_ipython()
176 176 m = DummyMagics(ip)
177 177 opts, _ = m.parse_options("--foo --bar=bubble", "a", "foo", "bar=")
178 178 assert "foo" in opts
179 179 assert "bar" in opts
180 180 assert opts["bar"] == "bubble"
181 181
182 182
183 183 def doctest_hist_f():
184 184 """Test %hist -f with temporary filename.
185 185
186 186 In [9]: import tempfile
187 187
188 188 In [10]: tfile = tempfile.mktemp('.py','tmp-ipython-')
189 189
190 190 In [11]: %hist -nl -f $tfile 3
191 191
192 192 In [13]: import os; os.unlink(tfile)
193 193 """
194 194
195 195
196 196 def doctest_hist_op():
197 197 """Test %hist -op
198 198
199 199 In [1]: class b(float):
200 200 ...: pass
201 201 ...:
202 202
203 203 In [2]: class s(object):
204 204 ...: def __str__(self):
205 205 ...: return 's'
206 206 ...:
207 207
208 208 In [3]:
209 209
210 210 In [4]: class r(b):
211 211 ...: def __repr__(self):
212 212 ...: return 'r'
213 213 ...:
214 214
215 215 In [5]: class sr(s,r): pass
216 216 ...:
217 217
218 218 In [6]:
219 219
220 220 In [7]: bb=b()
221 221
222 222 In [8]: ss=s()
223 223
224 224 In [9]: rr=r()
225 225
226 226 In [10]: ssrr=sr()
227 227
228 228 In [11]: 4.5
229 229 Out[11]: 4.5
230 230
231 231 In [12]: str(ss)
232 232 Out[12]: 's'
233 233
234 234 In [13]:
235 235
236 236 In [14]: %hist -op
237 237 >>> class b:
238 238 ... pass
239 239 ...
240 240 >>> class s(b):
241 241 ... def __str__(self):
242 242 ... return 's'
243 243 ...
244 244 >>>
245 245 >>> class r(b):
246 246 ... def __repr__(self):
247 247 ... return 'r'
248 248 ...
249 249 >>> class sr(s,r): pass
250 250 >>>
251 251 >>> bb=b()
252 252 >>> ss=s()
253 253 >>> rr=r()
254 254 >>> ssrr=sr()
255 255 >>> 4.5
256 256 4.5
257 257 >>> str(ss)
258 258 's'
259 259 >>>
260 260 """
261 261
262 262 def test_hist_pof():
263 263 ip = get_ipython()
264 264 ip.run_cell("1+2", store_history=True)
265 265 #raise Exception(ip.history_manager.session_number)
266 266 #raise Exception(list(ip.history_manager._get_range_session()))
267 267 with TemporaryDirectory() as td:
268 268 tf = os.path.join(td, 'hist.py')
269 269 ip.run_line_magic('history', '-pof %s' % tf)
270 270 assert os.path.isfile(tf)
271 271
272 272
273 273 def test_macro():
274 274 ip = get_ipython()
275 275 ip.history_manager.reset() # Clear any existing history.
276 276 cmds = ["a=1", "def b():\n return a**2", "print(a,b())"]
277 277 for i, cmd in enumerate(cmds, start=1):
278 278 ip.history_manager.store_inputs(i, cmd)
279 279 ip.magic("macro test 1-3")
280 280 assert ip.user_ns["test"].value == "\n".join(cmds) + "\n"
281 281
282 282 # List macros
283 283 assert "test" in ip.magic("macro")
284 284
285 285
286 286 def test_macro_run():
287 287 """Test that we can run a multi-line macro successfully."""
288 288 ip = get_ipython()
289 289 ip.history_manager.reset()
290 290 cmds = ["a=10", "a+=1", "print(a)", "%macro test 2-3"]
291 291 for cmd in cmds:
292 292 ip.run_cell(cmd, store_history=True)
293 293 assert ip.user_ns["test"].value == "a+=1\nprint(a)\n"
294 294 with tt.AssertPrints("12"):
295 295 ip.run_cell("test")
296 296 with tt.AssertPrints("13"):
297 297 ip.run_cell("test")
298 298
299 299
300 300 def test_magic_magic():
301 301 """Test %magic"""
302 302 ip = get_ipython()
303 303 with capture_output() as captured:
304 304 ip.magic("magic")
305 305
306 306 stdout = captured.stdout
307 307 assert "%magic" in stdout
308 308 assert "IPython" in stdout
309 309 assert "Available" in stdout
310 310
311 311
312 312 @dec.skipif_not_numpy
313 313 def test_numpy_reset_array_undec():
314 314 "Test '%reset array' functionality"
315 315 _ip.ex("import numpy as np")
316 316 _ip.ex("a = np.empty(2)")
317 317 assert "a" in _ip.user_ns
318 318 _ip.magic("reset -f array")
319 319 assert "a" not in _ip.user_ns
320 320
321 321
322 322 def test_reset_out():
323 323 "Test '%reset out' magic"
324 324 _ip.run_cell("parrot = 'dead'", store_history=True)
325 325 # test '%reset -f out', make an Out prompt
326 326 _ip.run_cell("parrot", store_history=True)
327 327 assert "dead" in [_ip.user_ns[x] for x in ("_", "__", "___")]
328 328 _ip.magic("reset -f out")
329 329 assert "dead" not in [_ip.user_ns[x] for x in ("_", "__", "___")]
330 330 assert len(_ip.user_ns["Out"]) == 0
331 331
332 332
333 333 def test_reset_in():
334 334 "Test '%reset in' magic"
335 335 # test '%reset -f in'
336 336 _ip.run_cell("parrot", store_history=True)
337 337 assert "parrot" in [_ip.user_ns[x] for x in ("_i", "_ii", "_iii")]
338 338 _ip.magic("%reset -f in")
339 339 assert "parrot" not in [_ip.user_ns[x] for x in ("_i", "_ii", "_iii")]
340 340 assert len(set(_ip.user_ns["In"])) == 1
341 341
342 342
343 343 def test_reset_dhist():
344 344 "Test '%reset dhist' magic"
345 345 _ip.run_cell("tmp = [d for d in _dh]") # copy before clearing
346 346 _ip.magic("cd " + os.path.dirname(pytest.__file__))
347 347 _ip.magic("cd -")
348 348 assert len(_ip.user_ns["_dh"]) > 0
349 349 _ip.magic("reset -f dhist")
350 350 assert len(_ip.user_ns["_dh"]) == 0
351 351 _ip.run_cell("_dh = [d for d in tmp]") # restore
352 352
353 353
354 354 def test_reset_in_length():
355 355 "Test that '%reset in' preserves In[] length"
356 356 _ip.run_cell("print 'foo'")
357 357 _ip.run_cell("reset -f in")
358 358 assert len(_ip.user_ns["In"]) == _ip.displayhook.prompt_count + 1
359 359
360 360
361 361 class TestResetErrors(TestCase):
362 362
363 363 def test_reset_redefine(self):
364 364
365 365 @magics_class
366 366 class KernelMagics(Magics):
367 367 @line_magic
368 368 def less(self, shell): pass
369 369
370 370 _ip.register_magics(KernelMagics)
371 371
372 372 with self.assertLogs() as cm:
373 373 # hack, we want to just capture logs, but assertLogs fails if not
374 374 # logs get produce.
375 375 # so log one things we ignore.
376 376 import logging as log_mod
377 377 log = log_mod.getLogger()
378 378 log.info('Nothing')
379 379 # end hack.
380 380 _ip.run_cell("reset -f")
381 381
382 382 assert len(cm.output) == 1
383 383 for out in cm.output:
384 384 assert "Invalid alias" not in out
385 385
386 386 def test_tb_syntaxerror():
387 387 """test %tb after a SyntaxError"""
388 388 ip = get_ipython()
389 389 ip.run_cell("for")
390 390
391 391 # trap and validate stdout
392 392 save_stdout = sys.stdout
393 393 try:
394 394 sys.stdout = StringIO()
395 395 ip.run_cell("%tb")
396 396 out = sys.stdout.getvalue()
397 397 finally:
398 398 sys.stdout = save_stdout
399 399 # trim output, and only check the last line
400 400 last_line = out.rstrip().splitlines()[-1].strip()
401 401 assert last_line == "SyntaxError: invalid syntax"
402 402
403 403
404 404 def test_time():
405 405 ip = get_ipython()
406 406
407 407 with tt.AssertPrints("Wall time: "):
408 408 ip.run_cell("%time None")
409 409
410 410 ip.run_cell("def f(kmjy):\n"
411 411 " %time print (2*kmjy)")
412 412
413 413 with tt.AssertPrints("Wall time: "):
414 414 with tt.AssertPrints("hihi", suppress=False):
415 415 ip.run_cell("f('hi')")
416 416
417 417 def test_time_last_not_expression():
418 418 ip.run_cell("%%time\n"
419 419 "var_1 = 1\n"
420 420 "var_2 = 2\n")
421 421 assert ip.user_ns['var_1'] == 1
422 422 del ip.user_ns['var_1']
423 423 assert ip.user_ns['var_2'] == 2
424 424 del ip.user_ns['var_2']
425 425
426 426
427 427 @dec.skip_win32
428 428 def test_time2():
429 429 ip = get_ipython()
430 430
431 431 with tt.AssertPrints("CPU times: user "):
432 432 ip.run_cell("%time None")
433 433
434 434 def test_time3():
435 435 """Erroneous magic function calls, issue gh-3334"""
436 436 ip = get_ipython()
437 437 ip.user_ns.pop('run', None)
438 438
439 439 with tt.AssertNotPrints("not found", channel='stderr'):
440 440 ip.run_cell("%%time\n"
441 441 "run = 0\n"
442 442 "run += 1")
443 443
444 444 def test_multiline_time():
445 445 """Make sure last statement from time return a value."""
446 446 ip = get_ipython()
447 447 ip.user_ns.pop('run', None)
448 448
449 449 ip.run_cell(dedent("""\
450 450 %%time
451 451 a = "ho"
452 452 b = "hey"
453 453 a+b
454 454 """
455 455 )
456 456 )
457 457 assert ip.user_ns_hidden["_"] == "hohey"
458 458
459 459
460 460 def test_time_local_ns():
461 461 """
462 462 Test that local_ns is actually global_ns when running a cell magic
463 463 """
464 464 ip = get_ipython()
465 465 ip.run_cell("%%time\n" "myvar = 1")
466 466 assert ip.user_ns["myvar"] == 1
467 467 del ip.user_ns["myvar"]
468 468
469 469
470 470 def test_doctest_mode():
471 471 "Toggle doctest_mode twice, it should be a no-op and run without error"
472 472 _ip.magic('doctest_mode')
473 473 _ip.magic('doctest_mode')
474 474
475 475
476 476 def test_parse_options():
477 477 """Tests for basic options parsing in magics."""
478 478 # These are only the most minimal of tests, more should be added later. At
479 479 # the very least we check that basic text/unicode calls work OK.
480 480 m = DummyMagics(_ip)
481 481 assert m.parse_options("foo", "")[1] == "foo"
482 482 assert m.parse_options("foo", "")[1] == "foo"
483 483
484 484
485 485 def test_parse_options_preserve_non_option_string():
486 486 """Test to assert preservation of non-option part of magic-block, while parsing magic options."""
487 487 m = DummyMagics(_ip)
488 488 opts, stmt = m.parse_options(
489 489 " -n1 -r 13 _ = 314 + foo", "n:r:", preserve_non_opts=True
490 490 )
491 491 assert opts == {"n": "1", "r": "13"}
492 492 assert stmt == "_ = 314 + foo"
493 493
494 494
495 495 def test_run_magic_preserve_code_block():
496 496 """Test to assert preservation of non-option part of magic-block, while running magic."""
497 497 _ip.user_ns["spaces"] = []
498 498 _ip.magic("timeit -n1 -r1 spaces.append([s.count(' ') for s in ['document']])")
499 499 assert _ip.user_ns["spaces"] == [[0]]
500 500
501 501
502 502 def test_dirops():
503 503 """Test various directory handling operations."""
504 504 # curpath = lambda :os.path.splitdrive(os.getcwd())[1].replace('\\','/')
505 505 curpath = os.getcwd
506 506 startdir = os.getcwd()
507 507 ipdir = os.path.realpath(_ip.ipython_dir)
508 508 try:
509 509 _ip.magic('cd "%s"' % ipdir)
510 510 assert curpath() == ipdir
511 511 _ip.magic('cd -')
512 512 assert curpath() == startdir
513 513 _ip.magic('pushd "%s"' % ipdir)
514 514 assert curpath() == ipdir
515 515 _ip.magic('popd')
516 516 assert curpath() == startdir
517 517 finally:
518 518 os.chdir(startdir)
519 519
520 520
521 521 def test_cd_force_quiet():
522 522 """Test OSMagics.cd_force_quiet option"""
523 523 _ip.config.OSMagics.cd_force_quiet = True
524 524 osmagics = osm.OSMagics(shell=_ip)
525 525
526 526 startdir = os.getcwd()
527 527 ipdir = os.path.realpath(_ip.ipython_dir)
528 528
529 529 try:
530 530 with tt.AssertNotPrints(ipdir):
531 531 osmagics.cd('"%s"' % ipdir)
532 532 with tt.AssertNotPrints(startdir):
533 533 osmagics.cd('-')
534 534 finally:
535 535 os.chdir(startdir)
536 536
537 537
538 538 def test_xmode():
539 539 # Calling xmode three times should be a no-op
540 540 xmode = _ip.InteractiveTB.mode
541 541 for i in range(4):
542 542 _ip.magic("xmode")
543 543 assert _ip.InteractiveTB.mode == xmode
544 544
545 545 def test_reset_hard():
546 546 monitor = []
547 547 class A(object):
548 548 def __del__(self):
549 549 monitor.append(1)
550 550 def __repr__(self):
551 551 return "<A instance>"
552 552
553 553 _ip.user_ns["a"] = A()
554 554 _ip.run_cell("a")
555 555
556 556 assert monitor == []
557 557 _ip.magic("reset -f")
558 558 assert monitor == [1]
559 559
560 560 class TestXdel(tt.TempFileMixin):
561 561 def test_xdel(self):
562 562 """Test that references from %run are cleared by xdel."""
563 563 src = ("class A(object):\n"
564 564 " monitor = []\n"
565 565 " def __del__(self):\n"
566 566 " self.monitor.append(1)\n"
567 567 "a = A()\n")
568 568 self.mktmp(src)
569 569 # %run creates some hidden references...
570 570 _ip.magic("run %s" % self.fname)
571 571 # ... as does the displayhook.
572 572 _ip.run_cell("a")
573 573
574 574 monitor = _ip.user_ns["A"].monitor
575 575 assert monitor == []
576 576
577 577 _ip.magic("xdel a")
578 578
579 579 # Check that a's __del__ method has been called.
580 580 gc.collect(0)
581 581 assert monitor == [1]
582 582
583 583 def doctest_who():
584 584 """doctest for %who
585 585
586 586 In [1]: %reset -sf
587 587
588 588 In [2]: alpha = 123
589 589
590 590 In [3]: beta = 'beta'
591 591
592 592 In [4]: %who int
593 593 alpha
594 594
595 595 In [5]: %who str
596 596 beta
597 597
598 598 In [6]: %whos
599 599 Variable Type Data/Info
600 600 ----------------------------
601 601 alpha int 123
602 602 beta str beta
603 603
604 604 In [7]: %who_ls
605 605 Out[7]: ['alpha', 'beta']
606 606 """
607 607
608 608 def test_whos():
609 609 """Check that whos is protected against objects where repr() fails."""
610 610 class A(object):
611 611 def __repr__(self):
612 612 raise Exception()
613 613 _ip.user_ns['a'] = A()
614 614 _ip.magic("whos")
615 615
616 616 def doctest_precision():
617 617 """doctest for %precision
618 618
619 619 In [1]: f = get_ipython().display_formatter.formatters['text/plain']
620 620
621 621 In [2]: %precision 5
622 622 Out[2]: '%.5f'
623 623
624 624 In [3]: f.float_format
625 625 Out[3]: '%.5f'
626 626
627 627 In [4]: %precision %e
628 628 Out[4]: '%e'
629 629
630 630 In [5]: f(3.1415927)
631 631 Out[5]: '3.141593e+00'
632 632 """
633 633
634 634 def test_debug_magic():
635 635 """Test debugging a small code with %debug
636 636
637 637 In [1]: with PdbTestInput(['c']):
638 638 ...: %debug print("a b") #doctest: +ELLIPSIS
639 639 ...:
640 640 ...
641 641 ipdb> c
642 642 a b
643 643 In [2]:
644 644 """
645 645
646 646 def test_psearch():
647 647 with tt.AssertPrints("dict.fromkeys"):
648 648 _ip.run_cell("dict.fr*?")
649 649 with tt.AssertPrints("Ο€.is_integer"):
650 650 _ip.run_cell("Ο€ = 3.14;\nΟ€.is_integ*?")
651 651
652 652 def test_timeit_shlex():
653 653 """test shlex issues with timeit (#1109)"""
654 654 _ip.ex("def f(*a,**kw): pass")
655 655 _ip.magic('timeit -n1 "this is a bug".count(" ")')
656 656 _ip.magic('timeit -r1 -n1 f(" ", 1)')
657 657 _ip.magic('timeit -r1 -n1 f(" ", 1, " ", 2, " ")')
658 658 _ip.magic('timeit -r1 -n1 ("a " + "b")')
659 659 _ip.magic('timeit -r1 -n1 f("a " + "b")')
660 660 _ip.magic('timeit -r1 -n1 f("a " + "b ")')
661 661
662 662
663 663 def test_timeit_special_syntax():
664 664 "Test %%timeit with IPython special syntax"
665 665 @register_line_magic
666 666 def lmagic(line):
667 667 ip = get_ipython()
668 668 ip.user_ns['lmagic_out'] = line
669 669
670 670 # line mode test
671 671 _ip.run_line_magic("timeit", "-n1 -r1 %lmagic my line")
672 672 assert _ip.user_ns["lmagic_out"] == "my line"
673 673 # cell mode test
674 674 _ip.run_cell_magic("timeit", "-n1 -r1", "%lmagic my line2")
675 675 assert _ip.user_ns["lmagic_out"] == "my line2"
676 676
677 677
678 678 def test_timeit_return():
679 679 """
680 680 test whether timeit -o return object
681 681 """
682 682
683 683 res = _ip.run_line_magic('timeit','-n10 -r10 -o 1')
684 684 assert(res is not None)
685 685
686 686 def test_timeit_quiet():
687 687 """
688 688 test quiet option of timeit magic
689 689 """
690 690 with tt.AssertNotPrints("loops"):
691 691 _ip.run_cell("%timeit -n1 -r1 -q 1")
692 692
693 693 def test_timeit_return_quiet():
694 694 with tt.AssertNotPrints("loops"):
695 695 res = _ip.run_line_magic('timeit', '-n1 -r1 -q -o 1')
696 696 assert (res is not None)
697 697
698 698 def test_timeit_invalid_return():
699 699 with pytest.raises(SyntaxError):
700 700 _ip.run_line_magic('timeit', 'return')
701 701
702 702 @dec.skipif(execution.profile is None)
703 703 def test_prun_special_syntax():
704 704 "Test %%prun with IPython special syntax"
705 705 @register_line_magic
706 706 def lmagic(line):
707 707 ip = get_ipython()
708 708 ip.user_ns['lmagic_out'] = line
709 709
710 710 # line mode test
711 711 _ip.run_line_magic("prun", "-q %lmagic my line")
712 712 assert _ip.user_ns["lmagic_out"] == "my line"
713 713 # cell mode test
714 714 _ip.run_cell_magic("prun", "-q", "%lmagic my line2")
715 715 assert _ip.user_ns["lmagic_out"] == "my line2"
716 716
717 717
718 718 @dec.skipif(execution.profile is None)
719 719 def test_prun_quotes():
720 720 "Test that prun does not clobber string escapes (GH #1302)"
721 721 _ip.magic(r"prun -q x = '\t'")
722 722 assert _ip.user_ns["x"] == "\t"
723 723
724 724
725 725 def test_extension():
726 726 # Debugging information for failures of this test
727 727 print('sys.path:')
728 728 for p in sys.path:
729 729 print(' ', p)
730 730 print('CWD', os.getcwd())
731 731
732 732 pytest.raises(ImportError, _ip.magic, "load_ext daft_extension")
733 733 daft_path = os.path.join(os.path.dirname(__file__), "daft_extension")
734 734 sys.path.insert(0, daft_path)
735 735 try:
736 736 _ip.user_ns.pop('arq', None)
737 737 invalidate_caches() # Clear import caches
738 738 _ip.magic("load_ext daft_extension")
739 739 assert _ip.user_ns["arq"] == 185
740 740 _ip.magic("unload_ext daft_extension")
741 741 assert 'arq' not in _ip.user_ns
742 742 finally:
743 743 sys.path.remove(daft_path)
744 744
745 745
746 746 def test_notebook_export_json():
747 747 pytest.importorskip("nbformat")
748 748 _ip = get_ipython()
749 749 _ip.history_manager.reset() # Clear any existing history.
750 750 cmds = ["a=1", "def b():\n return a**2", "print('noΓ«l, Γ©tΓ©', b())"]
751 751 for i, cmd in enumerate(cmds, start=1):
752 752 _ip.history_manager.store_inputs(i, cmd)
753 753 with TemporaryDirectory() as td:
754 754 outfile = os.path.join(td, "nb.ipynb")
755 755 _ip.magic("notebook %s" % outfile)
756 756
757 757
758 758 class TestEnv(TestCase):
759 759
760 760 def test_env(self):
761 761 env = _ip.magic("env")
762 762 self.assertTrue(isinstance(env, dict))
763 763
764 764 def test_env_secret(self):
765 765 env = _ip.magic("env")
766 766 hidden = "<hidden>"
767 767 with mock.patch.dict(
768 768 os.environ,
769 769 {
770 770 "API_KEY": "abc123",
771 771 "SECRET_THING": "ssshhh",
772 772 "JUPYTER_TOKEN": "",
773 773 "VAR": "abc"
774 774 }
775 775 ):
776 776 env = _ip.magic("env")
777 777 assert env["API_KEY"] == hidden
778 778 assert env["SECRET_THING"] == hidden
779 779 assert env["JUPYTER_TOKEN"] == hidden
780 780 assert env["VAR"] == "abc"
781 781
782 782 def test_env_get_set_simple(self):
783 783 env = _ip.magic("env var val1")
784 784 self.assertEqual(env, None)
785 785 self.assertEqual(os.environ['var'], 'val1')
786 786 self.assertEqual(_ip.magic("env var"), 'val1')
787 787 env = _ip.magic("env var=val2")
788 788 self.assertEqual(env, None)
789 789 self.assertEqual(os.environ['var'], 'val2')
790 790
791 791 def test_env_get_set_complex(self):
792 792 env = _ip.magic("env var 'val1 '' 'val2")
793 793 self.assertEqual(env, None)
794 794 self.assertEqual(os.environ['var'], "'val1 '' 'val2")
795 795 self.assertEqual(_ip.magic("env var"), "'val1 '' 'val2")
796 796 env = _ip.magic('env var=val2 val3="val4')
797 797 self.assertEqual(env, None)
798 798 self.assertEqual(os.environ['var'], 'val2 val3="val4')
799 799
800 800 def test_env_set_bad_input(self):
801 801 self.assertRaises(UsageError, lambda: _ip.magic("set_env var"))
802 802
803 803 def test_env_set_whitespace(self):
804 804 self.assertRaises(UsageError, lambda: _ip.magic("env var A=B"))
805 805
806 806
807 807 class CellMagicTestCase(TestCase):
808 808
809 809 def check_ident(self, magic):
810 810 # Manually called, we get the result
811 811 out = _ip.run_cell_magic(magic, "a", "b")
812 812 assert out == ("a", "b")
813 813 # Via run_cell, it goes into the user's namespace via displayhook
814 814 _ip.run_cell("%%" + magic + " c\nd\n")
815 815 assert _ip.user_ns["_"] == ("c", "d\n")
816 816
817 817 def test_cell_magic_func_deco(self):
818 818 "Cell magic using simple decorator"
819 819 @register_cell_magic
820 820 def cellm(line, cell):
821 821 return line, cell
822 822
823 823 self.check_ident('cellm')
824 824
825 825 def test_cell_magic_reg(self):
826 826 "Cell magic manually registered"
827 827 def cellm(line, cell):
828 828 return line, cell
829 829
830 830 _ip.register_magic_function(cellm, 'cell', 'cellm2')
831 831 self.check_ident('cellm2')
832 832
833 833 def test_cell_magic_class(self):
834 834 "Cell magics declared via a class"
835 835 @magics_class
836 836 class MyMagics(Magics):
837 837
838 838 @cell_magic
839 839 def cellm3(self, line, cell):
840 840 return line, cell
841 841
842 842 _ip.register_magics(MyMagics)
843 843 self.check_ident('cellm3')
844 844
845 845 def test_cell_magic_class2(self):
846 846 "Cell magics declared via a class, #2"
847 847 @magics_class
848 848 class MyMagics2(Magics):
849 849
850 850 @cell_magic('cellm4')
851 851 def cellm33(self, line, cell):
852 852 return line, cell
853 853
854 854 _ip.register_magics(MyMagics2)
855 855 self.check_ident('cellm4')
856 856 # Check that nothing is registered as 'cellm33'
857 857 c33 = _ip.find_cell_magic('cellm33')
858 858 assert c33 == None
859 859
860 860 def test_file():
861 861 """Basic %%writefile"""
862 862 ip = get_ipython()
863 863 with TemporaryDirectory() as td:
864 864 fname = os.path.join(td, 'file1')
865 865 ip.run_cell_magic("writefile", fname, u'\n'.join([
866 866 'line1',
867 867 'line2',
868 868 ]))
869 869 s = Path(fname).read_text()
870 870 assert "line1\n" in s
871 871 assert "line2" in s
872 872
873 873
874 874 @dec.skip_win32
875 875 def test_file_single_quote():
876 876 """Basic %%writefile with embedded single quotes"""
877 877 ip = get_ipython()
878 878 with TemporaryDirectory() as td:
879 879 fname = os.path.join(td, '\'file1\'')
880 880 ip.run_cell_magic("writefile", fname, u'\n'.join([
881 881 'line1',
882 882 'line2',
883 883 ]))
884 884 s = Path(fname).read_text()
885 885 assert "line1\n" in s
886 886 assert "line2" in s
887 887
888 888
889 889 @dec.skip_win32
890 890 def test_file_double_quote():
891 891 """Basic %%writefile with embedded double quotes"""
892 892 ip = get_ipython()
893 893 with TemporaryDirectory() as td:
894 894 fname = os.path.join(td, '"file1"')
895 895 ip.run_cell_magic("writefile", fname, u'\n'.join([
896 896 'line1',
897 897 'line2',
898 898 ]))
899 899 s = Path(fname).read_text()
900 900 assert "line1\n" in s
901 901 assert "line2" in s
902 902
903 903
904 904 def test_file_var_expand():
905 905 """%%writefile $filename"""
906 906 ip = get_ipython()
907 907 with TemporaryDirectory() as td:
908 908 fname = os.path.join(td, 'file1')
909 909 ip.user_ns['filename'] = fname
910 910 ip.run_cell_magic("writefile", '$filename', u'\n'.join([
911 911 'line1',
912 912 'line2',
913 913 ]))
914 914 s = Path(fname).read_text()
915 915 assert "line1\n" in s
916 916 assert "line2" in s
917 917
918 918
919 919 def test_file_unicode():
920 920 """%%writefile with unicode cell"""
921 921 ip = get_ipython()
922 922 with TemporaryDirectory() as td:
923 923 fname = os.path.join(td, 'file1')
924 924 ip.run_cell_magic("writefile", fname, u'\n'.join([
925 925 u'linΓ©1',
926 926 u'linΓ©2',
927 927 ]))
928 928 with io.open(fname, encoding='utf-8') as f:
929 929 s = f.read()
930 930 assert "linΓ©1\n" in s
931 931 assert "linΓ©2" in s
932 932
933 933
934 934 def test_file_amend():
935 935 """%%writefile -a amends files"""
936 936 ip = get_ipython()
937 937 with TemporaryDirectory() as td:
938 938 fname = os.path.join(td, 'file2')
939 939 ip.run_cell_magic("writefile", fname, u'\n'.join([
940 940 'line1',
941 941 'line2',
942 942 ]))
943 943 ip.run_cell_magic("writefile", "-a %s" % fname, u'\n'.join([
944 944 'line3',
945 945 'line4',
946 946 ]))
947 947 s = Path(fname).read_text()
948 948 assert "line1\n" in s
949 949 assert "line3\n" in s
950 950
951 951
952 952 def test_file_spaces():
953 953 """%%file with spaces in filename"""
954 954 ip = get_ipython()
955 955 with TemporaryWorkingDirectory() as td:
956 956 fname = "file name"
957 957 ip.run_cell_magic("file", '"%s"'%fname, u'\n'.join([
958 958 'line1',
959 959 'line2',
960 960 ]))
961 961 s = Path(fname).read_text()
962 962 assert "line1\n" in s
963 963 assert "line2" in s
964 964
965 965
966 966 def test_script_config():
967 967 ip = get_ipython()
968 968 ip.config.ScriptMagics.script_magics = ['whoda']
969 969 sm = script.ScriptMagics(shell=ip)
970 970 assert "whoda" in sm.magics["cell"]
971 971
972 972
973 973 @pytest.fixture
974 974 def event_loop():
975 975 policy = asyncio.get_event_loop_policy()
976 976 loop = policy.new_event_loop()
977 977 policy.set_event_loop(loop)
978 978 yield loop
979 979 loop.close()
980 980
981 981
982 982 @dec.skip_win32
983 983 @pytest.mark.skipif(
984 984 sys.platform == "win32", reason="This test does not run under Windows"
985 985 )
986 986 def test_script_out(event_loop):
987 987 assert event_loop.is_running() is False
988 988
989 989 ip = get_ipython()
990 990 ip.run_cell_magic("script", "--out output sh", "echo 'hi'")
991 assert event_loop.is_running() is False
991 assert not event_loop.is_running()
992 992 assert ip.user_ns["output"] == "hi\n"
993 993
994 994
995 995 @dec.skip_win32
996 996 @pytest.mark.skipif(
997 997 sys.platform == "win32", reason="This test does not run under Windows"
998 998 )
999 999 def test_script_err(event_loop):
1000 1000 ip = get_ipython()
1001 assert event_loop.is_running() is False
1001 assert not event_loop.is_running()
1002 1002 ip.run_cell_magic("script", "--err error sh", "echo 'hello' >&2")
1003 assert event_loop.is_running() is False
1003 assert not event_loop.is_running()
1004 1004 assert ip.user_ns["error"] == "hello\n"
1005 1005
1006 1006
1007 1007 @dec.skip_win32
1008 1008 @pytest.mark.skipif(
1009 1009 sys.platform == "win32", reason="This test does not run under Windows"
1010 1010 )
1011 1011 def test_script_out_err():
1012 1012
1013 1013 ip = get_ipython()
1014 1014 ip.run_cell_magic(
1015 1015 "script", "--out output --err error sh", "echo 'hi'\necho 'hello' >&2"
1016 1016 )
1017 1017 assert ip.user_ns["output"] == "hi\n"
1018 1018 assert ip.user_ns["error"] == "hello\n"
1019 1019
1020 1020
1021 1021 @dec.skip_win32
1022 1022 @pytest.mark.skipif(
1023 1023 sys.platform == "win32", reason="This test does not run under Windows"
1024 1024 )
1025 async def test_script_bg_out(event_loop):
1025 async def test_script_bg_out():
1026 1026 ip = get_ipython()
1027 1027 ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'")
1028 1028 assert (await ip.user_ns["output"].read()) == b"hi\n"
1029 ip.user_ns["output"].close()
1030 event_loop.stop()
1031
1029 assert ip.user_ns["output"].at_eof()
1032 1030
1033 1031 @dec.skip_win32
1034 1032 @pytest.mark.skipif(
1035 1033 sys.platform == "win32", reason="This test does not run under Windows"
1036 1034 )
1037 1035 async def test_script_bg_err():
1038 1036 ip = get_ipython()
1039 1037 ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2")
1040 1038 assert (await ip.user_ns["error"].read()) == b"hello\n"
1041 ip.user_ns["error"].close()
1039 assert ip.user_ns["error"].at_eof()
1042 1040
1043 1041
1044 1042 @dec.skip_win32
1045 1043 @pytest.mark.skipif(
1046 1044 sys.platform == "win32", reason="This test does not run under Windows"
1047 1045 )
1048 1046 async def test_script_bg_out_err():
1049 1047 ip = get_ipython()
1050 1048 ip.run_cell_magic(
1051 1049 "script", "--bg --out output --err error sh", "echo 'hi'\necho 'hello' >&2"
1052 1050 )
1053 1051 assert (await ip.user_ns["output"].read()) == b"hi\n"
1054 1052 assert (await ip.user_ns["error"].read()) == b"hello\n"
1055 ip.user_ns["output"].close()
1056 ip.user_ns["error"].close()
1053 assert ip.user_ns["output"].at_eof()
1054 assert ip.user_ns["error"].at_eof()
1055
1056
1057 @dec.skip_win32
1058 @pytest.mark.skipif(
1059 sys.platform == "win32", reason="This test does not run under Windows"
1060 )
1061 async def test_script_bg_proc():
1062 ip = get_ipython()
1063 ip.run_cell_magic(
1064 "script", "--bg --proc p sh --out out", "echo 'hi'\necho 'hello' >&2"
1065 )
1066 p = ip.user_ns["p"]
1067 await p.wait()
1068 assert p.returncode == 0
1069 assert (await p.stdout.read()) == b"hi\n"
1070 # not captured, so empty
1071 assert (await p.stderr.read()) == b""
1072 assert p.stdout.at_eof()
1073 assert p.stderr.at_eof()
1057 1074
1058 1075
1059 1076 def test_script_defaults():
1060 1077 ip = get_ipython()
1061 1078 for cmd in ['sh', 'bash', 'perl', 'ruby']:
1062 1079 try:
1063 1080 find_cmd(cmd)
1064 1081 except Exception:
1065 1082 pass
1066 1083 else:
1067 1084 assert cmd in ip.magics_manager.magics["cell"]
1068 1085
1069 1086
1070 1087 @magics_class
1071 1088 class FooFoo(Magics):
1072 1089 """class with both %foo and %%foo magics"""
1073 1090 @line_magic('foo')
1074 1091 def line_foo(self, line):
1075 1092 "I am line foo"
1076 1093 pass
1077 1094
1078 1095 @cell_magic("foo")
1079 1096 def cell_foo(self, line, cell):
1080 1097 "I am cell foo, not line foo"
1081 1098 pass
1082 1099
1083 1100 def test_line_cell_info():
1084 1101 """%%foo and %foo magics are distinguishable to inspect"""
1085 1102 ip = get_ipython()
1086 1103 ip.magics_manager.register(FooFoo)
1087 1104 oinfo = ip.object_inspect("foo")
1088 1105 assert oinfo["found"] is True
1089 1106 assert oinfo["ismagic"] is True
1090 1107
1091 1108 oinfo = ip.object_inspect("%%foo")
1092 1109 assert oinfo["found"] is True
1093 1110 assert oinfo["ismagic"] is True
1094 1111 assert oinfo["docstring"] == FooFoo.cell_foo.__doc__
1095 1112
1096 1113 oinfo = ip.object_inspect("%foo")
1097 1114 assert oinfo["found"] is True
1098 1115 assert oinfo["ismagic"] is True
1099 1116 assert oinfo["docstring"] == FooFoo.line_foo.__doc__
1100 1117
1101 1118
1102 1119 def test_multiple_magics():
1103 1120 ip = get_ipython()
1104 1121 foo1 = FooFoo(ip)
1105 1122 foo2 = FooFoo(ip)
1106 1123 mm = ip.magics_manager
1107 1124 mm.register(foo1)
1108 1125 assert mm.magics["line"]["foo"].__self__ is foo1
1109 1126 mm.register(foo2)
1110 1127 assert mm.magics["line"]["foo"].__self__ is foo2
1111 1128
1112 1129
1113 1130 def test_alias_magic():
1114 1131 """Test %alias_magic."""
1115 1132 ip = get_ipython()
1116 1133 mm = ip.magics_manager
1117 1134
1118 1135 # Basic operation: both cell and line magics are created, if possible.
1119 1136 ip.run_line_magic("alias_magic", "timeit_alias timeit")
1120 1137 assert "timeit_alias" in mm.magics["line"]
1121 1138 assert "timeit_alias" in mm.magics["cell"]
1122 1139
1123 1140 # --cell is specified, line magic not created.
1124 1141 ip.run_line_magic("alias_magic", "--cell timeit_cell_alias timeit")
1125 1142 assert "timeit_cell_alias" not in mm.magics["line"]
1126 1143 assert "timeit_cell_alias" in mm.magics["cell"]
1127 1144
1128 1145 # Test that line alias is created successfully.
1129 1146 ip.run_line_magic("alias_magic", "--line env_alias env")
1130 1147 assert ip.run_line_magic("env", "") == ip.run_line_magic("env_alias", "")
1131 1148
1132 1149 # Test that line alias with parameters passed in is created successfully.
1133 1150 ip.run_line_magic(
1134 1151 "alias_magic", "--line history_alias history --params " + shlex.quote("3")
1135 1152 )
1136 1153 assert "history_alias" in mm.magics["line"]
1137 1154
1138 1155
1139 1156 def test_save():
1140 1157 """Test %save."""
1141 1158 ip = get_ipython()
1142 1159 ip.history_manager.reset() # Clear any existing history.
1143 1160 cmds = ["a=1", "def b():\n return a**2", "print(a, b())"]
1144 1161 for i, cmd in enumerate(cmds, start=1):
1145 1162 ip.history_manager.store_inputs(i, cmd)
1146 1163 with TemporaryDirectory() as tmpdir:
1147 1164 file = os.path.join(tmpdir, "testsave.py")
1148 1165 ip.run_line_magic("save", "%s 1-10" % file)
1149 1166 content = Path(file).read_text()
1150 1167 assert content.count(cmds[0]) == 1
1151 1168 assert "coding: utf-8" in content
1152 1169 ip.run_line_magic("save", "-a %s 1-10" % file)
1153 1170 content = Path(file).read_text()
1154 1171 assert content.count(cmds[0]) == 2
1155 1172 assert "coding: utf-8" in content
1156 1173
1157 1174
1158 1175 def test_save_with_no_args():
1159 1176 ip = get_ipython()
1160 1177 ip.history_manager.reset() # Clear any existing history.
1161 1178 cmds = ["a=1", "def b():\n return a**2", "print(a, b())", "%save"]
1162 1179 for i, cmd in enumerate(cmds, start=1):
1163 1180 ip.history_manager.store_inputs(i, cmd)
1164 1181
1165 1182 with TemporaryDirectory() as tmpdir:
1166 1183 path = os.path.join(tmpdir, "testsave.py")
1167 1184 ip.run_line_magic("save", path)
1168 1185 content = Path(path).read_text()
1169 1186 expected_content = dedent(
1170 1187 """\
1171 1188 # coding: utf-8
1172 1189 a=1
1173 1190 def b():
1174 1191 return a**2
1175 1192 print(a, b())
1176 1193 """
1177 1194 )
1178 1195 assert content == expected_content
1179 1196
1180 1197
1181 1198 def test_store():
1182 1199 """Test %store."""
1183 1200 ip = get_ipython()
1184 1201 ip.run_line_magic('load_ext', 'storemagic')
1185 1202
1186 1203 # make sure the storage is empty
1187 1204 ip.run_line_magic("store", "-z")
1188 1205 ip.user_ns["var"] = 42
1189 1206 ip.run_line_magic("store", "var")
1190 1207 ip.user_ns["var"] = 39
1191 1208 ip.run_line_magic("store", "-r")
1192 1209 assert ip.user_ns["var"] == 42
1193 1210
1194 1211 ip.run_line_magic("store", "-d var")
1195 1212 ip.user_ns["var"] = 39
1196 1213 ip.run_line_magic("store", "-r")
1197 1214 assert ip.user_ns["var"] == 39
1198 1215
1199 1216
1200 1217 def _run_edit_test(arg_s, exp_filename=None,
1201 1218 exp_lineno=-1,
1202 1219 exp_contents=None,
1203 1220 exp_is_temp=None):
1204 1221 ip = get_ipython()
1205 1222 M = code.CodeMagics(ip)
1206 1223 last_call = ['','']
1207 1224 opts,args = M.parse_options(arg_s,'prxn:')
1208 1225 filename, lineno, is_temp = M._find_edit_target(ip, args, opts, last_call)
1209 1226
1210 1227 if exp_filename is not None:
1211 1228 assert exp_filename == filename
1212 1229 if exp_contents is not None:
1213 1230 with io.open(filename, 'r', encoding='utf-8') as f:
1214 1231 contents = f.read()
1215 1232 assert exp_contents == contents
1216 1233 if exp_lineno != -1:
1217 1234 assert exp_lineno == lineno
1218 1235 if exp_is_temp is not None:
1219 1236 assert exp_is_temp == is_temp
1220 1237
1221 1238
1222 1239 def test_edit_interactive():
1223 1240 """%edit on interactively defined objects"""
1224 1241 ip = get_ipython()
1225 1242 n = ip.execution_count
1226 1243 ip.run_cell("def foo(): return 1", store_history=True)
1227 1244
1228 1245 with pytest.raises(code.InteractivelyDefined) as e:
1229 1246 _run_edit_test("foo")
1230 1247 assert e.value.index == n
1231 1248
1232 1249
1233 1250 def test_edit_cell():
1234 1251 """%edit [cell id]"""
1235 1252 ip = get_ipython()
1236 1253
1237 1254 ip.run_cell("def foo(): return 1", store_history=True)
1238 1255
1239 1256 # test
1240 1257 _run_edit_test("1", exp_contents=ip.user_ns['In'][1], exp_is_temp=True)
1241 1258
1242 1259 def test_edit_fname():
1243 1260 """%edit file"""
1244 1261 # test
1245 1262 _run_edit_test("test file.py", exp_filename="test file.py")
1246 1263
1247 1264 def test_bookmark():
1248 1265 ip = get_ipython()
1249 1266 ip.run_line_magic('bookmark', 'bmname')
1250 1267 with tt.AssertPrints('bmname'):
1251 1268 ip.run_line_magic('bookmark', '-l')
1252 1269 ip.run_line_magic('bookmark', '-d bmname')
1253 1270
1254 1271 def test_ls_magic():
1255 1272 ip = get_ipython()
1256 1273 json_formatter = ip.display_formatter.formatters['application/json']
1257 1274 json_formatter.enabled = True
1258 1275 lsmagic = ip.magic('lsmagic')
1259 1276 with warnings.catch_warnings(record=True) as w:
1260 1277 j = json_formatter(lsmagic)
1261 1278 assert sorted(j) == ["cell", "line"]
1262 1279 assert w == [] # no warnings
1263 1280
1264 1281
1265 1282 def test_strip_initial_indent():
1266 1283 def sii(s):
1267 1284 lines = s.splitlines()
1268 1285 return '\n'.join(code.strip_initial_indent(lines))
1269 1286
1270 1287 assert sii(" a = 1\nb = 2") == "a = 1\nb = 2"
1271 1288 assert sii(" a\n b\nc") == "a\n b\nc"
1272 1289 assert sii("a\n b") == "a\n b"
1273 1290
1274 1291 def test_logging_magic_quiet_from_arg():
1275 1292 _ip.config.LoggingMagics.quiet = False
1276 1293 lm = logging.LoggingMagics(shell=_ip)
1277 1294 with TemporaryDirectory() as td:
1278 1295 try:
1279 1296 with tt.AssertNotPrints(re.compile("Activating.*")):
1280 1297 lm.logstart('-q {}'.format(
1281 1298 os.path.join(td, "quiet_from_arg.log")))
1282 1299 finally:
1283 1300 _ip.logger.logstop()
1284 1301
1285 1302 def test_logging_magic_quiet_from_config():
1286 1303 _ip.config.LoggingMagics.quiet = True
1287 1304 lm = logging.LoggingMagics(shell=_ip)
1288 1305 with TemporaryDirectory() as td:
1289 1306 try:
1290 1307 with tt.AssertNotPrints(re.compile("Activating.*")):
1291 1308 lm.logstart(os.path.join(td, "quiet_from_config.log"))
1292 1309 finally:
1293 1310 _ip.logger.logstop()
1294 1311
1295 1312
1296 1313 def test_logging_magic_not_quiet():
1297 1314 _ip.config.LoggingMagics.quiet = False
1298 1315 lm = logging.LoggingMagics(shell=_ip)
1299 1316 with TemporaryDirectory() as td:
1300 1317 try:
1301 1318 with tt.AssertPrints(re.compile("Activating.*")):
1302 1319 lm.logstart(os.path.join(td, "not_quiet.log"))
1303 1320 finally:
1304 1321 _ip.logger.logstop()
1305 1322
1306 1323
1307 1324 def test_time_no_var_expand():
1308 1325 _ip.user_ns['a'] = 5
1309 1326 _ip.user_ns['b'] = []
1310 1327 _ip.magic('time b.append("{a}")')
1311 1328 assert _ip.user_ns['b'] == ['{a}']
1312 1329
1313 1330
1314 1331 # this is slow, put at the end for local testing.
1315 1332 def test_timeit_arguments():
1316 1333 "Test valid timeit arguments, should not cause SyntaxError (GH #1269)"
1317 1334 _ip.magic("timeit -n1 -r1 a=('#')")
1318 1335
1319 1336
1320 1337 TEST_MODULE = """
1321 1338 print('Loaded my_tmp')
1322 1339 if __name__ == "__main__":
1323 1340 print('I just ran a script')
1324 1341 """
1325 1342
1326 1343
1327 1344 def test_run_module_from_import_hook():
1328 1345 "Test that a module can be loaded via an import hook"
1329 1346 with TemporaryDirectory() as tmpdir:
1330 1347 fullpath = os.path.join(tmpdir, 'my_tmp.py')
1331 1348 Path(fullpath).write_text(TEST_MODULE)
1332 1349
1333 1350 import importlib.abc
1334 1351 import importlib.util
1335 1352
1336 1353 class MyTempImporter(importlib.abc.MetaPathFinder, importlib.abc.SourceLoader):
1337 1354 def find_spec(self, fullname, path, target=None):
1338 1355 if fullname == "my_tmp":
1339 1356 return importlib.util.spec_from_loader(fullname, self)
1340 1357
1341 1358 def get_filename(self, fullname):
1342 1359 assert fullname == "my_tmp"
1343 1360 return fullpath
1344 1361
1345 1362 def get_data(self, path):
1346 1363 assert Path(path).samefile(fullpath)
1347 1364 return Path(fullpath).read_text()
1348 1365
1349 1366 sys.meta_path.insert(0, MyTempImporter())
1350 1367
1351 1368 with capture_output() as captured:
1352 1369 _ip.magic("run -m my_tmp")
1353 1370 _ip.run_cell("import my_tmp")
1354 1371
1355 1372 output = "Loaded my_tmp\nI just ran a script\nLoaded my_tmp\n"
1356 1373 assert output == captured.stdout
1357 1374
1358 1375 sys.meta_path.pop(0)
@@ -1,699 +1,700 b''
1 1 """IPython terminal interface using prompt_toolkit"""
2 2
3 3 import asyncio
4 4 import os
5 5 import sys
6 6 import warnings
7 7 from warnings import warn
8 8
9 from IPython.core.async_helpers import get_asyncio_loop
9 10 from IPython.core.interactiveshell import InteractiveShell, InteractiveShellABC
10 11 from IPython.utils import io
11 12 from IPython.utils.py3compat import input
12 13 from IPython.utils.terminal import toggle_set_term_title, set_term_title, restore_term_title
13 14 from IPython.utils.process import abbrev_cwd
14 15 from traitlets import (
15 16 Bool,
16 17 Unicode,
17 18 Dict,
18 19 Integer,
19 20 observe,
20 21 Instance,
21 22 Type,
22 23 default,
23 24 Enum,
24 25 Union,
25 26 Any,
26 27 validate,
27 28 Float,
28 29 )
29 30
30 31 from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
31 32 from prompt_toolkit.enums import DEFAULT_BUFFER, EditingMode
32 33 from prompt_toolkit.filters import (HasFocus, Condition, IsDone)
33 34 from prompt_toolkit.formatted_text import PygmentsTokens
34 35 from prompt_toolkit.history import InMemoryHistory
35 36 from prompt_toolkit.layout.processors import ConditionalProcessor, HighlightMatchingBracketProcessor
36 37 from prompt_toolkit.output import ColorDepth
37 38 from prompt_toolkit.patch_stdout import patch_stdout
38 39 from prompt_toolkit.shortcuts import PromptSession, CompleteStyle, print_formatted_text
39 40 from prompt_toolkit.styles import DynamicStyle, merge_styles
40 41 from prompt_toolkit.styles.pygments import style_from_pygments_cls, style_from_pygments_dict
41 42 from prompt_toolkit import __version__ as ptk_version
42 43
43 44 from pygments.styles import get_style_by_name
44 45 from pygments.style import Style
45 46 from pygments.token import Token
46 47
47 48 from .debugger import TerminalPdb, Pdb
48 49 from .magics import TerminalMagics
49 50 from .pt_inputhooks import get_inputhook_name_and_func
50 51 from .prompts import Prompts, ClassicPrompts, RichPromptDisplayHook
51 52 from .ptutils import IPythonPTCompleter, IPythonPTLexer
52 53 from .shortcuts import create_ipython_shortcuts
53 54
54 55 PTK3 = ptk_version.startswith('3.')
55 56
56 57
57 58 class _NoStyle(Style): pass
58 59
59 60
60 61
61 62 _style_overrides_light_bg = {
62 63 Token.Prompt: '#ansibrightblue',
63 64 Token.PromptNum: '#ansiblue bold',
64 65 Token.OutPrompt: '#ansibrightred',
65 66 Token.OutPromptNum: '#ansired bold',
66 67 }
67 68
68 69 _style_overrides_linux = {
69 70 Token.Prompt: '#ansibrightgreen',
70 71 Token.PromptNum: '#ansigreen bold',
71 72 Token.OutPrompt: '#ansibrightred',
72 73 Token.OutPromptNum: '#ansired bold',
73 74 }
74 75
75 76 def get_default_editor():
76 77 try:
77 78 return os.environ['EDITOR']
78 79 except KeyError:
79 80 pass
80 81 except UnicodeError:
81 82 warn("$EDITOR environment variable is not pure ASCII. Using platform "
82 83 "default editor.")
83 84
84 85 if os.name == 'posix':
85 86 return 'vi' # the only one guaranteed to be there!
86 87 else:
87 88 return 'notepad' # same in Windows!
88 89
89 90 # conservatively check for tty
90 91 # overridden streams can result in things like:
91 92 # - sys.stdin = None
92 93 # - no isatty method
93 94 for _name in ('stdin', 'stdout', 'stderr'):
94 95 _stream = getattr(sys, _name)
95 96 if not _stream or not hasattr(_stream, 'isatty') or not _stream.isatty():
96 97 _is_tty = False
97 98 break
98 99 else:
99 100 _is_tty = True
100 101
101 102
102 103 _use_simple_prompt = ('IPY_TEST_SIMPLE_PROMPT' in os.environ) or (not _is_tty)
103 104
104 105 def black_reformat_handler(text_before_cursor):
105 106 """
106 107 We do not need to protect against error,
107 108 this is taken care at a higher level where any reformat error is ignored.
108 109 Indeed we may call reformatting on incomplete code.
109 110 """
110 111 import black
111 112
112 113 formatted_text = black.format_str(text_before_cursor, mode=black.FileMode())
113 114 if not text_before_cursor.endswith("\n") and formatted_text.endswith("\n"):
114 115 formatted_text = formatted_text[:-1]
115 116 return formatted_text
116 117
117 118
118 119 class TerminalInteractiveShell(InteractiveShell):
119 120 mime_renderers = Dict().tag(config=True)
120 121
121 122 space_for_menu = Integer(6, help='Number of line at the bottom of the screen '
122 123 'to reserve for the tab completion menu, '
123 124 'search history, ...etc, the height of '
124 125 'these menus will at most this value. '
125 126 'Increase it is you prefer long and skinny '
126 127 'menus, decrease for short and wide.'
127 128 ).tag(config=True)
128 129
129 130 pt_app = None
130 131 debugger_history = None
131 132
132 133 debugger_history_file = Unicode(
133 134 "~/.pdbhistory", help="File in which to store and read history"
134 135 ).tag(config=True)
135 136
136 137 simple_prompt = Bool(_use_simple_prompt,
137 138 help="""Use `raw_input` for the REPL, without completion and prompt colors.
138 139
139 140 Useful when controlling IPython as a subprocess, and piping STDIN/OUT/ERR. Known usage are:
140 141 IPython own testing machinery, and emacs inferior-shell integration through elpy.
141 142
142 143 This mode default to `True` if the `IPY_TEST_SIMPLE_PROMPT`
143 144 environment variable is set, or the current terminal is not a tty."""
144 145 ).tag(config=True)
145 146
146 147 @property
147 148 def debugger_cls(self):
148 149 return Pdb if self.simple_prompt else TerminalPdb
149 150
150 151 confirm_exit = Bool(True,
151 152 help="""
152 153 Set to confirm when you try to exit IPython with an EOF (Control-D
153 154 in Unix, Control-Z/Enter in Windows). By typing 'exit' or 'quit',
154 155 you can force a direct exit without any confirmation.""",
155 156 ).tag(config=True)
156 157
157 158 editing_mode = Unicode('emacs',
158 159 help="Shortcut style to use at the prompt. 'vi' or 'emacs'.",
159 160 ).tag(config=True)
160 161
161 162 emacs_bindings_in_vi_insert_mode = Bool(
162 163 True,
163 164 help="Add shortcuts from 'emacs' insert mode to 'vi' insert mode.",
164 165 ).tag(config=True)
165 166
166 167 modal_cursor = Bool(
167 168 True,
168 169 help="""
169 170 Cursor shape changes depending on vi mode: beam in vi insert mode,
170 171 block in nav mode, underscore in replace mode.""",
171 172 ).tag(config=True)
172 173
173 174 ttimeoutlen = Float(
174 175 0.01,
175 176 help="""The time in milliseconds that is waited for a key code
176 177 to complete.""",
177 178 ).tag(config=True)
178 179
179 180 timeoutlen = Float(
180 181 0.5,
181 182 help="""The time in milliseconds that is waited for a mapped key
182 183 sequence to complete.""",
183 184 ).tag(config=True)
184 185
185 186 autoformatter = Unicode(
186 187 "black",
187 188 help="Autoformatter to reformat Terminal code. Can be `'black'` or `None`",
188 189 allow_none=True
189 190 ).tag(config=True)
190 191
191 192 auto_match = Bool(
192 193 False,
193 194 help="""
194 195 Automatically add/delete closing bracket or quote when opening bracket or quote is entered/deleted.
195 196 Brackets: (), [], {}
196 197 Quotes: '', \"\"
197 198 """,
198 199 ).tag(config=True)
199 200
200 201 mouse_support = Bool(False,
201 202 help="Enable mouse support in the prompt\n(Note: prevents selecting text with the mouse)"
202 203 ).tag(config=True)
203 204
204 205 # We don't load the list of styles for the help string, because loading
205 206 # Pygments plugins takes time and can cause unexpected errors.
206 207 highlighting_style = Union([Unicode('legacy'), Type(klass=Style)],
207 208 help="""The name or class of a Pygments style to use for syntax
208 209 highlighting. To see available styles, run `pygmentize -L styles`."""
209 210 ).tag(config=True)
210 211
211 212 @validate('editing_mode')
212 213 def _validate_editing_mode(self, proposal):
213 214 if proposal['value'].lower() == 'vim':
214 215 proposal['value']= 'vi'
215 216 elif proposal['value'].lower() == 'default':
216 217 proposal['value']= 'emacs'
217 218
218 219 if hasattr(EditingMode, proposal['value'].upper()):
219 220 return proposal['value'].lower()
220 221
221 222 return self.editing_mode
222 223
223 224
224 225 @observe('editing_mode')
225 226 def _editing_mode(self, change):
226 227 if self.pt_app:
227 228 self.pt_app.editing_mode = getattr(EditingMode, change.new.upper())
228 229
229 230 def _set_formatter(self, formatter):
230 231 if formatter is None:
231 232 self.reformat_handler = lambda x:x
232 233 elif formatter == 'black':
233 234 self.reformat_handler = black_reformat_handler
234 235 else:
235 236 raise ValueError
236 237
237 238 @observe("autoformatter")
238 239 def _autoformatter_changed(self, change):
239 240 formatter = change.new
240 241 self._set_formatter(formatter)
241 242
242 243 @observe('highlighting_style')
243 244 @observe('colors')
244 245 def _highlighting_style_changed(self, change):
245 246 self.refresh_style()
246 247
247 248 def refresh_style(self):
248 249 self._style = self._make_style_from_name_or_cls(self.highlighting_style)
249 250
250 251
251 252 highlighting_style_overrides = Dict(
252 253 help="Override highlighting format for specific tokens"
253 254 ).tag(config=True)
254 255
255 256 true_color = Bool(False,
256 257 help="""Use 24bit colors instead of 256 colors in prompt highlighting.
257 258 If your terminal supports true color, the following command should
258 259 print ``TRUECOLOR`` in orange::
259 260
260 261 printf \"\\x1b[38;2;255;100;0mTRUECOLOR\\x1b[0m\\n\"
261 262 """,
262 263 ).tag(config=True)
263 264
264 265 editor = Unicode(get_default_editor(),
265 266 help="Set the editor used by IPython (default to $EDITOR/vi/notepad)."
266 267 ).tag(config=True)
267 268
268 269 prompts_class = Type(Prompts, help='Class used to generate Prompt token for prompt_toolkit').tag(config=True)
269 270
270 271 prompts = Instance(Prompts)
271 272
272 273 @default('prompts')
273 274 def _prompts_default(self):
274 275 return self.prompts_class(self)
275 276
276 277 # @observe('prompts')
277 278 # def _(self, change):
278 279 # self._update_layout()
279 280
280 281 @default('displayhook_class')
281 282 def _displayhook_class_default(self):
282 283 return RichPromptDisplayHook
283 284
284 285 term_title = Bool(True,
285 286 help="Automatically set the terminal title"
286 287 ).tag(config=True)
287 288
288 289 term_title_format = Unicode("IPython: {cwd}",
289 290 help="Customize the terminal title format. This is a python format string. " +
290 291 "Available substitutions are: {cwd}."
291 292 ).tag(config=True)
292 293
293 294 display_completions = Enum(('column', 'multicolumn','readlinelike'),
294 295 help= ( "Options for displaying tab completions, 'column', 'multicolumn', and "
295 296 "'readlinelike'. These options are for `prompt_toolkit`, see "
296 297 "`prompt_toolkit` documentation for more information."
297 298 ),
298 299 default_value='multicolumn').tag(config=True)
299 300
300 301 highlight_matching_brackets = Bool(True,
301 302 help="Highlight matching brackets.",
302 303 ).tag(config=True)
303 304
304 305 extra_open_editor_shortcuts = Bool(False,
305 306 help="Enable vi (v) or Emacs (C-X C-E) shortcuts to open an external editor. "
306 307 "This is in addition to the F2 binding, which is always enabled."
307 308 ).tag(config=True)
308 309
309 310 handle_return = Any(None,
310 311 help="Provide an alternative handler to be called when the user presses "
311 312 "Return. This is an advanced option intended for debugging, which "
312 313 "may be changed or removed in later releases."
313 314 ).tag(config=True)
314 315
315 316 enable_history_search = Bool(True,
316 317 help="Allows to enable/disable the prompt toolkit history search"
317 318 ).tag(config=True)
318 319
319 320 prompt_includes_vi_mode = Bool(True,
320 321 help="Display the current vi mode (when using vi editing mode)."
321 322 ).tag(config=True)
322 323
323 324 @observe('term_title')
324 325 def init_term_title(self, change=None):
325 326 # Enable or disable the terminal title.
326 327 if self.term_title:
327 328 toggle_set_term_title(True)
328 329 set_term_title(self.term_title_format.format(cwd=abbrev_cwd()))
329 330 else:
330 331 toggle_set_term_title(False)
331 332
332 333 def restore_term_title(self):
333 334 if self.term_title:
334 335 restore_term_title()
335 336
336 337 def init_display_formatter(self):
337 338 super(TerminalInteractiveShell, self).init_display_formatter()
338 339 # terminal only supports plain text
339 340 self.display_formatter.active_types = ["text/plain"]
340 341
341 342 def init_prompt_toolkit_cli(self):
342 343 if self.simple_prompt:
343 344 # Fall back to plain non-interactive output for tests.
344 345 # This is very limited.
345 346 def prompt():
346 347 prompt_text = "".join(x[1] for x in self.prompts.in_prompt_tokens())
347 348 lines = [input(prompt_text)]
348 349 prompt_continuation = "".join(x[1] for x in self.prompts.continuation_prompt_tokens())
349 350 while self.check_complete('\n'.join(lines))[0] == 'incomplete':
350 351 lines.append( input(prompt_continuation) )
351 352 return '\n'.join(lines)
352 353 self.prompt_for_code = prompt
353 354 return
354 355
355 356 # Set up keyboard shortcuts
356 357 key_bindings = create_ipython_shortcuts(self)
357 358
358 359 # Pre-populate history from IPython's history database
359 360 history = InMemoryHistory()
360 361 last_cell = u""
361 362 for __, ___, cell in self.history_manager.get_tail(self.history_load_length,
362 363 include_latest=True):
363 364 # Ignore blank lines and consecutive duplicates
364 365 cell = cell.rstrip()
365 366 if cell and (cell != last_cell):
366 367 history.append_string(cell)
367 368 last_cell = cell
368 369
369 370 self._style = self._make_style_from_name_or_cls(self.highlighting_style)
370 371 self.style = DynamicStyle(lambda: self._style)
371 372
372 373 editing_mode = getattr(EditingMode, self.editing_mode.upper())
373 374
374 375 self.pt_loop = asyncio.new_event_loop()
375 376 self.pt_app = PromptSession(
376 377 auto_suggest=AutoSuggestFromHistory(),
377 378 editing_mode=editing_mode,
378 379 key_bindings=key_bindings,
379 380 history=history,
380 381 completer=IPythonPTCompleter(shell=self),
381 382 enable_history_search=self.enable_history_search,
382 383 style=self.style,
383 384 include_default_pygments_style=False,
384 385 mouse_support=self.mouse_support,
385 386 enable_open_in_editor=self.extra_open_editor_shortcuts,
386 387 color_depth=self.color_depth,
387 388 tempfile_suffix=".py",
388 389 **self._extra_prompt_options()
389 390 )
390 391
391 392 def _make_style_from_name_or_cls(self, name_or_cls):
392 393 """
393 394 Small wrapper that make an IPython compatible style from a style name
394 395
395 396 We need that to add style for prompt ... etc.
396 397 """
397 398 style_overrides = {}
398 399 if name_or_cls == 'legacy':
399 400 legacy = self.colors.lower()
400 401 if legacy == 'linux':
401 402 style_cls = get_style_by_name('monokai')
402 403 style_overrides = _style_overrides_linux
403 404 elif legacy == 'lightbg':
404 405 style_overrides = _style_overrides_light_bg
405 406 style_cls = get_style_by_name('pastie')
406 407 elif legacy == 'neutral':
407 408 # The default theme needs to be visible on both a dark background
408 409 # and a light background, because we can't tell what the terminal
409 410 # looks like. These tweaks to the default theme help with that.
410 411 style_cls = get_style_by_name('default')
411 412 style_overrides.update({
412 413 Token.Number: '#ansigreen',
413 414 Token.Operator: 'noinherit',
414 415 Token.String: '#ansiyellow',
415 416 Token.Name.Function: '#ansiblue',
416 417 Token.Name.Class: 'bold #ansiblue',
417 418 Token.Name.Namespace: 'bold #ansiblue',
418 419 Token.Name.Variable.Magic: '#ansiblue',
419 420 Token.Prompt: '#ansigreen',
420 421 Token.PromptNum: '#ansibrightgreen bold',
421 422 Token.OutPrompt: '#ansired',
422 423 Token.OutPromptNum: '#ansibrightred bold',
423 424 })
424 425
425 426 # Hack: Due to limited color support on the Windows console
426 427 # the prompt colors will be wrong without this
427 428 if os.name == 'nt':
428 429 style_overrides.update({
429 430 Token.Prompt: '#ansidarkgreen',
430 431 Token.PromptNum: '#ansigreen bold',
431 432 Token.OutPrompt: '#ansidarkred',
432 433 Token.OutPromptNum: '#ansired bold',
433 434 })
434 435 elif legacy =='nocolor':
435 436 style_cls=_NoStyle
436 437 style_overrides = {}
437 438 else :
438 439 raise ValueError('Got unknown colors: ', legacy)
439 440 else :
440 441 if isinstance(name_or_cls, str):
441 442 style_cls = get_style_by_name(name_or_cls)
442 443 else:
443 444 style_cls = name_or_cls
444 445 style_overrides = {
445 446 Token.Prompt: '#ansigreen',
446 447 Token.PromptNum: '#ansibrightgreen bold',
447 448 Token.OutPrompt: '#ansired',
448 449 Token.OutPromptNum: '#ansibrightred bold',
449 450 }
450 451 style_overrides.update(self.highlighting_style_overrides)
451 452 style = merge_styles([
452 453 style_from_pygments_cls(style_cls),
453 454 style_from_pygments_dict(style_overrides),
454 455 ])
455 456
456 457 return style
457 458
458 459 @property
459 460 def pt_complete_style(self):
460 461 return {
461 462 'multicolumn': CompleteStyle.MULTI_COLUMN,
462 463 'column': CompleteStyle.COLUMN,
463 464 'readlinelike': CompleteStyle.READLINE_LIKE,
464 465 }[self.display_completions]
465 466
466 467 @property
467 468 def color_depth(self):
468 469 return (ColorDepth.TRUE_COLOR if self.true_color else None)
469 470
470 471 def _extra_prompt_options(self):
471 472 """
472 473 Return the current layout option for the current Terminal InteractiveShell
473 474 """
474 475 def get_message():
475 476 return PygmentsTokens(self.prompts.in_prompt_tokens())
476 477
477 478 if self.editing_mode == 'emacs':
478 479 # with emacs mode the prompt is (usually) static, so we call only
479 480 # the function once. With VI mode it can toggle between [ins] and
480 481 # [nor] so we can't precompute.
481 482 # here I'm going to favor the default keybinding which almost
482 483 # everybody uses to decrease CPU usage.
483 484 # if we have issues with users with custom Prompts we can see how to
484 485 # work around this.
485 486 get_message = get_message()
486 487
487 488 options = {
488 489 'complete_in_thread': False,
489 490 'lexer':IPythonPTLexer(),
490 491 'reserve_space_for_menu':self.space_for_menu,
491 492 'message': get_message,
492 493 'prompt_continuation': (
493 494 lambda width, lineno, is_soft_wrap:
494 495 PygmentsTokens(self.prompts.continuation_prompt_tokens(width))),
495 496 'multiline': True,
496 497 'complete_style': self.pt_complete_style,
497 498
498 499 # Highlight matching brackets, but only when this setting is
499 500 # enabled, and only when the DEFAULT_BUFFER has the focus.
500 501 'input_processors': [ConditionalProcessor(
501 502 processor=HighlightMatchingBracketProcessor(chars='[](){}'),
502 503 filter=HasFocus(DEFAULT_BUFFER) & ~IsDone() &
503 504 Condition(lambda: self.highlight_matching_brackets))],
504 505 }
505 506 if not PTK3:
506 507 options['inputhook'] = self.inputhook
507 508
508 509 return options
509 510
510 511 def prompt_for_code(self):
511 512 if self.rl_next_input:
512 513 default = self.rl_next_input
513 514 self.rl_next_input = None
514 515 else:
515 516 default = ''
516 517
517 518 # In order to make sure that asyncio code written in the
518 519 # interactive shell doesn't interfere with the prompt, we run the
519 520 # prompt in a different event loop.
520 521 # If we don't do this, people could spawn coroutine with a
521 522 # while/true inside which will freeze the prompt.
522 523
523 524 policy = asyncio.get_event_loop_policy()
524 try:
525 old_loop = policy.get_event_loop()
526 except RuntimeError:
527 # This happens when the the event loop is closed,
528 # e.g. by calling `asyncio.run()`.
529 old_loop = None
530
531 policy.set_event_loop(self.pt_loop)
525 old_loop = get_asyncio_loop()
526
527 # FIXME: prompt_toolkit is using the deprecated `asyncio.get_event_loop`
528 # to get the current event loop.
529 # This will probably be replaced by an attribute or input argument,
530 # at which point we can stop calling the soon-to-be-deprecated `set_event_loop` here.
531 if old_loop is not self.pt_loop:
532 policy.set_event_loop(self.pt_loop)
532 533 try:
533 534 with patch_stdout(raw=True):
534 535 text = self.pt_app.prompt(
535 536 default=default,
536 537 **self._extra_prompt_options())
537 538 finally:
538 539 # Restore the original event loop.
539 if old_loop is not None:
540 if old_loop is not None and old_loop is not self.pt_loop:
540 541 policy.set_event_loop(old_loop)
541 542
542 543 return text
543 544
544 545 def enable_win_unicode_console(self):
545 546 # Since IPython 7.10 doesn't support python < 3.6 and PEP 528, Python uses the unicode APIs for the Windows
546 547 # console by default, so WUC shouldn't be needed.
547 548 from warnings import warn
548 549 warn("`enable_win_unicode_console` is deprecated since IPython 7.10, does not do anything and will be removed in the future",
549 550 DeprecationWarning,
550 551 stacklevel=2)
551 552
552 553 def init_io(self):
553 554 if sys.platform not in {'win32', 'cli'}:
554 555 return
555 556
556 557 import colorama
557 558 colorama.init()
558 559
559 560 def init_magics(self):
560 561 super(TerminalInteractiveShell, self).init_magics()
561 562 self.register_magics(TerminalMagics)
562 563
563 564 def init_alias(self):
564 565 # The parent class defines aliases that can be safely used with any
565 566 # frontend.
566 567 super(TerminalInteractiveShell, self).init_alias()
567 568
568 569 # Now define aliases that only make sense on the terminal, because they
569 570 # need direct access to the console in a way that we can't emulate in
570 571 # GUI or web frontend
571 572 if os.name == 'posix':
572 573 for cmd in ('clear', 'more', 'less', 'man'):
573 574 self.alias_manager.soft_define_alias(cmd, cmd)
574 575
575 576
576 577 def __init__(self, *args, **kwargs):
577 578 super(TerminalInteractiveShell, self).__init__(*args, **kwargs)
578 579 self.init_prompt_toolkit_cli()
579 580 self.init_term_title()
580 581 self.keep_running = True
581 582 self._set_formatter(self.autoformatter)
582 583
583 584
584 585 def ask_exit(self):
585 586 self.keep_running = False
586 587
587 588 rl_next_input = None
588 589
589 590 def interact(self):
590 591 self.keep_running = True
591 592 while self.keep_running:
592 593 print(self.separate_in, end='')
593 594
594 595 try:
595 596 code = self.prompt_for_code()
596 597 except EOFError:
597 598 if (not self.confirm_exit) \
598 599 or self.ask_yes_no('Do you really want to exit ([y]/n)?','y','n'):
599 600 self.ask_exit()
600 601
601 602 else:
602 603 if code:
603 604 self.run_cell(code, store_history=True)
604 605
605 606 def mainloop(self):
606 607 # An extra layer of protection in case someone mashing Ctrl-C breaks
607 608 # out of our internal code.
608 609 while True:
609 610 try:
610 611 self.interact()
611 612 break
612 613 except KeyboardInterrupt as e:
613 614 print("\n%s escaped interact()\n" % type(e).__name__)
614 615 finally:
615 616 # An interrupt during the eventloop will mess up the
616 617 # internal state of the prompt_toolkit library.
617 618 # Stopping the eventloop fixes this, see
618 619 # https://github.com/ipython/ipython/pull/9867
619 620 if hasattr(self, '_eventloop'):
620 621 self._eventloop.stop()
621 622
622 623 self.restore_term_title()
623 624
624 625 # try to call some at-exit operation optimistically as some things can't
625 626 # be done during interpreter shutdown. this is technically inaccurate as
626 627 # this make mainlool not re-callable, but that should be a rare if not
627 628 # in existent use case.
628 629
629 630 self._atexit_once()
630 631
631 632
632 633 _inputhook = None
633 634 def inputhook(self, context):
634 635 if self._inputhook is not None:
635 636 self._inputhook(context)
636 637
637 638 active_eventloop = None
638 639 def enable_gui(self, gui=None):
639 640 if gui and (gui != 'inline') :
640 641 self.active_eventloop, self._inputhook =\
641 642 get_inputhook_name_and_func(gui)
642 643 else:
643 644 self.active_eventloop = self._inputhook = None
644 645
645 646 # For prompt_toolkit 3.0. We have to create an asyncio event loop with
646 647 # this inputhook.
647 648 if PTK3:
648 649 import asyncio
649 650 from prompt_toolkit.eventloop import new_eventloop_with_inputhook
650 651
651 652 if gui == 'asyncio':
652 653 # When we integrate the asyncio event loop, run the UI in the
653 654 # same event loop as the rest of the code. don't use an actual
654 655 # input hook. (Asyncio is not made for nesting event loops.)
655 self.pt_loop = asyncio.get_event_loop_policy().get_event_loop()
656 self.pt_loop = get_asyncio_loop()
656 657
657 658 elif self._inputhook:
658 659 # If an inputhook was set, create a new asyncio event loop with
659 660 # this inputhook for the prompt.
660 661 self.pt_loop = new_eventloop_with_inputhook(self._inputhook)
661 662 else:
662 663 # When there's no inputhook, run the prompt in a separate
663 664 # asyncio event loop.
664 665 self.pt_loop = asyncio.new_event_loop()
665 666
666 667 # Run !system commands directly, not through pipes, so terminal programs
667 668 # work correctly.
668 669 system = InteractiveShell.system_raw
669 670
670 671 def auto_rewrite_input(self, cmd):
671 672 """Overridden from the parent class to use fancy rewriting prompt"""
672 673 if not self.show_rewritten_input:
673 674 return
674 675
675 676 tokens = self.prompts.rewrite_prompt_tokens()
676 677 if self.pt_app:
677 678 print_formatted_text(PygmentsTokens(tokens), end='',
678 679 style=self.pt_app.app.style)
679 680 print(cmd)
680 681 else:
681 682 prompt = ''.join(s for t, s in tokens)
682 683 print(prompt, cmd, sep='')
683 684
684 685 _prompts_before = None
685 686 def switch_doctest_mode(self, mode):
686 687 """Switch prompts to classic for %doctest_mode"""
687 688 if mode:
688 689 self._prompts_before = self.prompts
689 690 self.prompts = ClassicPrompts(self)
690 691 elif self._prompts_before:
691 692 self.prompts = self._prompts_before
692 693 self._prompts_before = None
693 694 # self._update_layout()
694 695
695 696
696 697 InteractiveShellABC.register(TerminalInteractiveShell)
697 698
698 699 if __name__ == '__main__':
699 700 TerminalInteractiveShell.instance().interact()
@@ -1,64 +1,63 b''
1 1 """
2 2 Inputhook for running the original asyncio event loop while we're waiting for
3 3 input.
4 4
5 5 By default, in IPython, we run the prompt with a different asyncio event loop,
6 6 because otherwise we risk that people are freezing the prompt by scheduling bad
7 7 coroutines. E.g., a coroutine that does a while/true and never yield back
8 8 control to the loop. We can't cancel that.
9 9
10 10 However, sometimes we want the asyncio loop to keep running while waiting for
11 11 a prompt.
12 12
13 13 The following example will print the numbers from 1 to 10 above the prompt,
14 14 while we are waiting for input. (This works also because we use
15 15 prompt_toolkit`s `patch_stdout`)::
16 16
17 17 In [1]: import asyncio
18 18
19 19 In [2]: %gui asyncio
20 20
21 21 In [3]: async def f():
22 22 ...: for i in range(10):
23 23 ...: await asyncio.sleep(1)
24 24 ...: print(i)
25 25
26 26
27 27 In [4]: asyncio.ensure_future(f())
28 28
29 29 """
30 import asyncio
31 30 from prompt_toolkit import __version__ as ptk_version
32 31
33 PTK3 = ptk_version.startswith('3.')
32 from IPython.core.async_helpers import get_asyncio_loop
34 33
34 PTK3 = ptk_version.startswith('3.')
35 35
36 # Keep reference to the original asyncio loop, because getting the event loop
37 # within the input hook would return the other loop.
38 loop = asyncio.get_event_loop_policy().get_event_loop()
39 36
40 37
41 38 def inputhook(context):
42 39 """
43 40 Inputhook for asyncio event loop integration.
44 41 """
45 42 # For prompt_toolkit 3.0, this input hook literally doesn't do anything.
46 43 # The event loop integration here is implemented in `interactiveshell.py`
47 44 # by running the prompt itself in the current asyncio loop. The main reason
48 45 # for this is that nesting asyncio event loops is unreliable.
49 46 if PTK3:
50 47 return
51 48
52 49 # For prompt_toolkit 2.0, we can run the current asyncio event loop,
53 50 # because prompt_toolkit 2.0 uses a different event loop internally.
54 51
52 # get the persistent asyncio event loop
53 loop = get_asyncio_loop()
54
55 55 def stop():
56 56 loop.stop()
57 57
58 58 fileno = context.fileno()
59 59 loop.add_reader(fileno, stop)
60 60 try:
61 61 loop.run_forever()
62 62 finally:
63 63 loop.remove_reader(fileno)
64
General Comments 0
You need to be logged in to leave comments. Login now