##// END OF EJS Templates
Try to fix some of current failures....
Matthias Bussonnier -
Show More
@@ -1,39 +1,40 b''
1 1 # This workflow will install Python dependencies, run tests and lint with a variety of Python versions
2 2 # For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
3 3
4 4 name: Python package
5 5
6 6 on:
7 7 push:
8 8 branches: [ master, 7.x ]
9 9 pull_request:
10 10 branches: [ master, 7.x ]
11 11
12 12 jobs:
13 13 build:
14 14
15 15 runs-on: ubuntu-latest
16 timeout-minutes: 10
16 17 strategy:
17 18 matrix:
18 19 python-version: [3.8]
19 20
20 21 steps:
21 22 - uses: actions/checkout@v2
22 23 with:
23 24 fetch-depth: 0
24 25 - name: Set up Python ${{ matrix.python-version }}
25 26 uses: actions/setup-python@v2
26 27 with:
27 28 python-version: ${{ matrix.python-version }}
28 29 - name: Install dependencies
29 30 run: |
30 31 python -m pip install --upgrade pip
31 32 pip install darker isort
32 33 - name: Lint with darker
33 34 run: |
34 35 darker -r 60625f241f298b5039cb2debc365db38aa7bb522 --check --diff . || (
35 36 echo "Changes need auto-formatting. Run:"
36 37 echo " darker -r 60625f241f298b5039cb2debc365db38aa7bb522"
37 38 echo "then commit and push changes to fix."
38 39 exit 1
39 40 )
@@ -1,50 +1,50 b''
1 1 name: Run tests
2 2
3 3 on:
4 4 push:
5 5 pull_request:
6 6 # Run weekly on Monday at 1:23 UTC
7 7 schedule:
8 8 - cron: '23 1 * * 1'
9 9 workflow_dispatch:
10 10
11 11
12 12 jobs:
13 13 test:
14 14 runs-on: ${{ matrix.os }}
15 15 strategy:
16 16 matrix:
17 17 os: [ubuntu-latest]
18 18 python-version: ["3.7", "3.8", "3.9"]
19 19 # Test all on ubuntu, test ends on macos
20 20 include:
21 21 - os: macos-latest
22 22 python-version: "3.7"
23 23 - os: macos-latest
24 24 python-version: "3.9"
25 25
26 26 steps:
27 27 - uses: actions/checkout@v2
28 28 - name: Set up Python ${{ matrix.python-version }}
29 29 uses: actions/setup-python@v2
30 30 with:
31 31 python-version: ${{ matrix.python-version }}
32 32 - name: Install and update Python dependencies
33 33 run: |
34 34 python -m pip install --upgrade pip setuptools wheel
35 35 python -m pip install --upgrade -e file://$PWD#egg=ipython[test]
36 36 python -m pip install --upgrade --upgrade-strategy eager trio curio
37 37 python -m pip install --upgrade pytest pytest-trio 'matplotlib!=3.2.0'
38 38 python -m pip install --upgrade check-manifest pytest-cov anyio
39 39 - name: Check manifest
40 40 run: check-manifest
41 41 - name: iptest
42 42 run: |
43 43 cd /tmp && iptest --coverage xml && cd -
44 44 cp /tmp/ipy_coverage.xml ./
45 45 cp /tmp/.coverage ./
46 46 - name: pytest
47 47 run: |
48 pytest
48 pytest -v
49 49 - name: Upload coverage to Codecov
50 50 uses: codecov/codecov-action@v1
@@ -1,321 +1,354 b''
1 1 """Magic functions for running cells in various scripts."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 import asyncio
7 import atexit
6 8 import errno
9 import functools
7 10 import os
8 import sys
9 11 import signal
12 import sys
10 13 import time
11 import asyncio
12 import atexit
13
14 from asyncio import SafeChildWatcher
15 from contextlib import contextmanager
14 16 from subprocess import CalledProcessError
15 17
18 from traitlets import Dict, List, default
19
16 20 from IPython.core import magic_arguments
17 from IPython.core.magic import (
18 Magics, magics_class, line_magic, cell_magic
19 )
21 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
20 22 from IPython.lib.backgroundjobs import BackgroundJobManager
21 23 from IPython.utils.process import arg_split
22 from traitlets import List, Dict, default
23
24 24
25 25 #-----------------------------------------------------------------------------
26 26 # Magic implementation classes
27 27 #-----------------------------------------------------------------------------
28 28
29 29 def script_args(f):
30 30 """single decorator for adding script args"""
31 31 args = [
32 32 magic_arguments.argument(
33 33 '--out', type=str,
34 34 help="""The variable in which to store stdout from the script.
35 35 If the script is backgrounded, this will be the stdout *pipe*,
36 36 instead of the stderr text itself and will not be auto closed.
37 37 """
38 38 ),
39 39 magic_arguments.argument(
40 40 '--err', type=str,
41 41 help="""The variable in which to store stderr from the script.
42 42 If the script is backgrounded, this will be the stderr *pipe*,
43 43 instead of the stderr text itself and will not be autoclosed.
44 44 """
45 45 ),
46 46 magic_arguments.argument(
47 47 '--bg', action="store_true",
48 48 help="""Whether to run the script in the background.
49 49 If given, the only way to see the output of the command is
50 50 with --out/err.
51 51 """
52 52 ),
53 53 magic_arguments.argument(
54 54 '--proc', type=str,
55 55 help="""The variable in which to store Popen instance.
56 56 This is used only when --bg option is given.
57 57 """
58 58 ),
59 59 magic_arguments.argument(
60 60 '--no-raise-error', action="store_false", dest='raise_error',
61 61 help="""Whether you should raise an error message in addition to
62 62 a stream on stderr if you get a nonzero exit code.
63 63 """
64 64 )
65 65 ]
66 66 for arg in args:
67 67 f = arg(f)
68 68 return f
69 69
70
71 @contextmanager
72 def safe_watcher():
73 if sys.platform == "win32":
74 yield
75 return
76
77 policy = asyncio.get_event_loop_policy()
78 old_watcher = policy.get_child_watcher()
79 if isinstance(old_watcher, SafeChildWatcher):
80 yield
81 return
82
83 loop = asyncio.get_event_loop()
84 try:
85 watcher = asyncio.SafeChildWatcher()
86 watcher.attach_loop(loop)
87 policy.set_child_watcher(watcher)
88 yield
89 finally:
90 watcher.close()
91 policy.set_child_watcher(old_watcher)
92
93
94 def dec_safe_watcher(fun):
95 @functools.wraps(fun)
96 def _inner(*args, **kwargs):
97 with safe_watcher():
98 return fun(*args, **kwargs)
99
100 return _inner
101
102
70 103 @magics_class
71 104 class ScriptMagics(Magics):
72 105 """Magics for talking to scripts
73 106
74 107 This defines a base `%%script` cell magic for running a cell
75 108 with a program in a subprocess, and registers a few top-level
76 109 magics that call %%script with common interpreters.
77 110 """
78 111 script_magics = List(
79 112 help="""Extra script cell magics to define
80 113
81 114 This generates simple wrappers of `%%script foo` as `%%foo`.
82 115
83 116 If you want to add script magics that aren't on your path,
84 117 specify them in script_paths
85 118 """,
86 119 ).tag(config=True)
87 120 @default('script_magics')
88 121 def _script_magics_default(self):
89 122 """default to a common list of programs"""
90 123
91 124 defaults = [
92 125 'sh',
93 126 'bash',
94 127 'perl',
95 128 'ruby',
96 129 'python',
97 130 'python2',
98 131 'python3',
99 132 'pypy',
100 133 ]
101 134 if os.name == 'nt':
102 135 defaults.extend([
103 136 'cmd',
104 137 ])
105 138
106 139 return defaults
107 140
108 141 script_paths = Dict(
109 142 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
110 143
111 144 Only necessary for items in script_magics where the default path will not
112 145 find the right interpreter.
113 146 """
114 147 ).tag(config=True)
115 148
116 149 def __init__(self, shell=None):
117 150 super(ScriptMagics, self).__init__(shell=shell)
118 151 self._generate_script_magics()
119 152 self.job_manager = BackgroundJobManager()
120 153 self.bg_processes = []
121 154 atexit.register(self.kill_bg_processes)
122 155
123 156 def __del__(self):
124 157 self.kill_bg_processes()
125 158
126 159 def _generate_script_magics(self):
127 160 cell_magics = self.magics['cell']
128 161 for name in self.script_magics:
129 162 cell_magics[name] = self._make_script_magic(name)
130 163
131 164 def _make_script_magic(self, name):
132 165 """make a named magic, that calls %%script with a particular program"""
133 166 # expand to explicit path if necessary:
134 167 script = self.script_paths.get(name, name)
135 168
136 169 @magic_arguments.magic_arguments()
137 170 @script_args
138 171 def named_script_magic(line, cell):
139 172 # if line, add it as cl-flags
140 173 if line:
141 174 line = "%s %s" % (script, line)
142 175 else:
143 176 line = script
144 177 return self.shebang(line, cell)
145 178
146 179 # write a basic docstring:
147 180 named_script_magic.__doc__ = \
148 181 """%%{name} script magic
149 182
150 183 Run cells with {script} in a subprocess.
151 184
152 185 This is a shortcut for `%%script {script}`
153 186 """.format(**locals())
154 187
155 188 return named_script_magic
156 189
157 190 @magic_arguments.magic_arguments()
158 191 @script_args
159 192 @cell_magic("script")
193 @dec_safe_watcher
160 194 def shebang(self, line, cell):
161 195 """Run a cell via a shell command
162 196
163 197 The `%%script` line is like the #! line of script,
164 198 specifying a program (bash, perl, ruby, etc.) with which to run.
165 199
166 200 The rest of the cell is run by that program.
167 201
168 202 Examples
169 203 --------
170 204 ::
171 205
172 206 In [1]: %%script bash
173 207 ...: for i in 1 2 3; do
174 208 ...: echo $i
175 209 ...: done
176 210 1
177 211 2
178 212 3
179 213 """
180 214
181 215 async def _handle_stream(stream, stream_arg, file_object):
182 216 while True:
183 217 line = (await stream.readline()).decode("utf8")
184 218 if not line:
185 219 break
186 220 if stream_arg:
187 221 self.shell.user_ns[stream_arg] = line
188 222 else:
189 223 file_object.write(line)
190 224 file_object.flush()
191 225
192 226 async def _stream_communicate(process, cell):
193 227 process.stdin.write(cell)
194 228 process.stdin.close()
195 229 stdout_task = asyncio.create_task(
196 230 _handle_stream(process.stdout, args.out, sys.stdout)
197 231 )
198 232 stderr_task = asyncio.create_task(
199 233 _handle_stream(process.stderr, args.err, sys.stderr)
200 234 )
201 235 await asyncio.wait([stdout_task, stderr_task])
202 236 await process.wait()
203 237
204 238 if sys.platform.startswith("win"):
205 239 asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
206 240 loop = asyncio.get_event_loop()
207
208 241 argv = arg_split(line, posix=not sys.platform.startswith("win"))
209 242 args, cmd = self.shebang.parser.parse_known_args(argv)
210 243 try:
211 244 p = loop.run_until_complete(
212 245 asyncio.create_subprocess_exec(
213 246 *cmd,
214 247 stdout=asyncio.subprocess.PIPE,
215 248 stderr=asyncio.subprocess.PIPE,
216 249 stdin=asyncio.subprocess.PIPE,
217 250 )
218 251 )
219 252 except OSError as e:
220 253 if e.errno == errno.ENOENT:
221 254 print("Couldn't find program: %r" % cmd[0])
222 255 return
223 256 else:
224 257 raise
225 258
226 259 if not cell.endswith('\n'):
227 260 cell += '\n'
228 261 cell = cell.encode('utf8', 'replace')
229 262 if args.bg:
230 263 self.bg_processes.append(p)
231 264 self._gc_bg_processes()
232 265 to_close = []
233 266 if args.out:
234 267 self.shell.user_ns[args.out] = p.stdout
235 268 else:
236 269 to_close.append(p.stdout)
237 270 if args.err:
238 271 self.shell.user_ns[args.err] = p.stderr
239 272 else:
240 273 to_close.append(p.stderr)
241 274 self.job_manager.new(self._run_script, p, cell, to_close, daemon=True)
242 275 if args.proc:
243 276 self.shell.user_ns[args.proc] = p
244 277 return
245 278
246 279 try:
247 280 loop.run_until_complete(_stream_communicate(p, cell))
248 281 except KeyboardInterrupt:
249 282 try:
250 283 p.send_signal(signal.SIGINT)
251 284 time.sleep(0.1)
252 285 if p.returncode is not None:
253 286 print("Process is interrupted.")
254 287 return
255 288 p.terminate()
256 289 time.sleep(0.1)
257 290 if p.returncode is not None:
258 291 print("Process is terminated.")
259 292 return
260 293 p.kill()
261 294 print("Process is killed.")
262 295 except OSError:
263 296 pass
264 297 except Exception as e:
265 298 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
266 299 return
267 300 if args.raise_error and p.returncode!=0:
268 301 # If we get here and p.returncode is still None, we must have
269 302 # killed it but not yet seen its return code. We don't wait for it,
270 303 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
271 304 rc = p.returncode or -9
272 305 raise CalledProcessError(rc, cell)
273 306
274 307 def _run_script(self, p, cell, to_close):
275 308 """callback for running the script in the background"""
276 309 p.stdin.write(cell)
277 310 p.stdin.close()
278 311 for s in to_close:
279 312 s.close()
280 313 p.wait()
281 314
282 315 @line_magic("killbgscripts")
283 316 def killbgscripts(self, _nouse_=''):
284 317 """Kill all BG processes started by %%script and its family."""
285 318 self.kill_bg_processes()
286 319 print("All background processes were killed.")
287 320
288 321 def kill_bg_processes(self):
289 322 """Kill all BG processes which are still running."""
290 323 if not self.bg_processes:
291 324 return
292 325 for p in self.bg_processes:
293 326 if p.returncode is None:
294 327 try:
295 328 p.send_signal(signal.SIGINT)
296 329 except:
297 330 pass
298 331 time.sleep(0.1)
299 332 self._gc_bg_processes()
300 333 if not self.bg_processes:
301 334 return
302 335 for p in self.bg_processes:
303 336 if p.returncode is None:
304 337 try:
305 338 p.terminate()
306 339 except:
307 340 pass
308 341 time.sleep(0.1)
309 342 self._gc_bg_processes()
310 343 if not self.bg_processes:
311 344 return
312 345 for p in self.bg_processes:
313 346 if p.returncode is None:
314 347 try:
315 348 p.kill()
316 349 except:
317 350 pass
318 351 self._gc_bg_processes()
319 352
320 353 def _gc_bg_processes(self):
321 354 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]
@@ -1,1302 +1,1321 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for various magic functions.
3 3
4 4 Needs to be run by nose (to make ipython session available).
5 5 """
6 6
7 import asyncio
7 8 import io
8 9 import os
9 10 import re
11 import shlex
10 12 import sys
11 13 import warnings
12 from textwrap import dedent
13 from unittest import TestCase
14 from unittest import mock
15 14 from importlib import invalidate_caches
16 15 from io import StringIO
17 16 from pathlib import Path
17 from textwrap import dedent
18 from unittest import TestCase, mock
18 19
19 20 import nose.tools as nt
20 21
21 import shlex
22
23 22 from IPython import get_ipython
24 23 from IPython.core import magic
25 24 from IPython.core.error import UsageError
26 from IPython.core.magic import (Magics, magics_class, line_magic,
27 cell_magic,
28 register_line_magic, register_cell_magic)
29 from IPython.core.magics import execution, script, code, logging, osm
25 from IPython.core.magic import (
26 Magics,
27 cell_magic,
28 line_magic,
29 magics_class,
30 register_cell_magic,
31 register_line_magic,
32 )
33 from IPython.core.magics import code, execution, logging, osm, script
30 34 from IPython.testing import decorators as dec
31 35 from IPython.testing import tools as tt
32 36 from IPython.utils.io import capture_output
33 from IPython.utils.tempdir import (TemporaryDirectory,
34 TemporaryWorkingDirectory)
35 37 from IPython.utils.process import find_cmd
36 from .test_debugger import PdbTestInput
38 from IPython.utils.tempdir import TemporaryDirectory, TemporaryWorkingDirectory
37 39
38 import pytest
40 from .test_debugger import PdbTestInput
39 41
40 42
41 43 @magic.magics_class
42 44 class DummyMagics(magic.Magics): pass
43 45
44 46 def test_extract_code_ranges():
45 47 instr = "1 3 5-6 7-9 10:15 17: :10 10- -13 :"
46 48 expected = [(0, 1),
47 49 (2, 3),
48 50 (4, 6),
49 51 (6, 9),
50 52 (9, 14),
51 53 (16, None),
52 54 (None, 9),
53 55 (9, None),
54 56 (None, 13),
55 57 (None, None)]
56 58 actual = list(code.extract_code_ranges(instr))
57 59 nt.assert_equal(actual, expected)
58 60
59 61 def test_extract_symbols():
60 62 source = """import foo\na = 10\ndef b():\n return 42\n\n\nclass A: pass\n\n\n"""
61 63 symbols_args = ["a", "b", "A", "A,b", "A,a", "z"]
62 64 expected = [([], ['a']),
63 65 (["def b():\n return 42\n"], []),
64 66 (["class A: pass\n"], []),
65 67 (["class A: pass\n", "def b():\n return 42\n"], []),
66 68 (["class A: pass\n"], ['a']),
67 69 ([], ['z'])]
68 70 for symbols, exp in zip(symbols_args, expected):
69 71 nt.assert_equal(code.extract_symbols(source, symbols), exp)
70 72
71 73
72 74 def test_extract_symbols_raises_exception_with_non_python_code():
73 75 source = ("=begin A Ruby program :)=end\n"
74 76 "def hello\n"
75 77 "puts 'Hello world'\n"
76 78 "end")
77 79 with nt.assert_raises(SyntaxError):
78 80 code.extract_symbols(source, "hello")
79 81
80 82
81 83 def test_magic_not_found():
82 84 # magic not found raises UsageError
83 85 with nt.assert_raises(UsageError):
84 86 _ip.magic('doesntexist')
85 87
86 88 # ensure result isn't success when a magic isn't found
87 89 result = _ip.run_cell('%doesntexist')
88 90 assert isinstance(result.error_in_exec, UsageError)
89 91
90 92
91 93 def test_cell_magic_not_found():
92 94 # magic not found raises UsageError
93 95 with nt.assert_raises(UsageError):
94 96 _ip.run_cell_magic('doesntexist', 'line', 'cell')
95 97
96 98 # ensure result isn't success when a magic isn't found
97 99 result = _ip.run_cell('%%doesntexist')
98 100 assert isinstance(result.error_in_exec, UsageError)
99 101
100 102
101 103 def test_magic_error_status():
102 104 def fail(shell):
103 105 1/0
104 106 _ip.register_magic_function(fail)
105 107 result = _ip.run_cell('%fail')
106 108 assert isinstance(result.error_in_exec, ZeroDivisionError)
107 109
108 110
109 111 def test_config():
110 112 """ test that config magic does not raise
111 113 can happen if Configurable init is moved too early into
112 114 Magics.__init__ as then a Config object will be registered as a
113 115 magic.
114 116 """
115 117 ## should not raise.
116 118 _ip.magic('config')
117 119
118 120 def test_config_available_configs():
119 121 """ test that config magic prints available configs in unique and
120 122 sorted order. """
121 123 with capture_output() as captured:
122 124 _ip.magic('config')
123 125
124 126 stdout = captured.stdout
125 127 config_classes = stdout.strip().split('\n')[1:]
126 128 nt.assert_list_equal(config_classes, sorted(set(config_classes)))
127 129
128 130 def test_config_print_class():
129 131 """ test that config with a classname prints the class's options. """
130 132 with capture_output() as captured:
131 133 _ip.magic('config TerminalInteractiveShell')
132 134
133 135 stdout = captured.stdout
134 136 if not re.match("TerminalInteractiveShell.* options", stdout.splitlines()[0]):
135 137 print(stdout)
136 138 raise AssertionError("1st line of stdout not like "
137 139 "'TerminalInteractiveShell.* options'")
138 140
139 141 def test_rehashx():
140 142 # clear up everything
141 143 _ip.alias_manager.clear_aliases()
142 144 del _ip.db['syscmdlist']
143 145
144 146 _ip.magic('rehashx')
145 147 # Practically ALL ipython development systems will have more than 10 aliases
146 148
147 149 nt.assert_true(len(_ip.alias_manager.aliases) > 10)
148 150 for name, cmd in _ip.alias_manager.aliases:
149 151 # we must strip dots from alias names
150 152 nt.assert_not_in('.', name)
151 153
152 154 # rehashx must fill up syscmdlist
153 155 scoms = _ip.db['syscmdlist']
154 156 nt.assert_true(len(scoms) > 10)
155 157
156 158
157 159
158 160 def test_magic_parse_options():
159 161 """Test that we don't mangle paths when parsing magic options."""
160 162 ip = get_ipython()
161 163 path = 'c:\\x'
162 164 m = DummyMagics(ip)
163 165 opts = m.parse_options('-f %s' % path,'f:')[0]
164 166 # argv splitting is os-dependent
165 167 if os.name == 'posix':
166 168 expected = 'c:x'
167 169 else:
168 170 expected = path
169 171 nt.assert_equal(opts['f'], expected)
170 172
171 173 def test_magic_parse_long_options():
172 174 """Magic.parse_options can handle --foo=bar long options"""
173 175 ip = get_ipython()
174 176 m = DummyMagics(ip)
175 177 opts, _ = m.parse_options('--foo --bar=bubble', 'a', 'foo', 'bar=')
176 178 nt.assert_in('foo', opts)
177 179 nt.assert_in('bar', opts)
178 180 nt.assert_equal(opts['bar'], "bubble")
179 181
180 182
181 183 def doctest_hist_f():
182 184 """Test %hist -f with temporary filename.
183 185
184 186 In [9]: import tempfile
185 187
186 188 In [10]: tfile = tempfile.mktemp('.py','tmp-ipython-')
187 189
188 190 In [11]: %hist -nl -f $tfile 3
189 191
190 192 In [13]: import os; os.unlink(tfile)
191 193 """
192 194
193 195
194 196 def doctest_hist_op():
195 197 """Test %hist -op
196 198
197 199 In [1]: class b(float):
198 200 ...: pass
199 201 ...:
200 202
201 203 In [2]: class s(object):
202 204 ...: def __str__(self):
203 205 ...: return 's'
204 206 ...:
205 207
206 208 In [3]:
207 209
208 210 In [4]: class r(b):
209 211 ...: def __repr__(self):
210 212 ...: return 'r'
211 213 ...:
212 214
213 215 In [5]: class sr(s,r): pass
214 216 ...:
215 217
216 218 In [6]:
217 219
218 220 In [7]: bb=b()
219 221
220 222 In [8]: ss=s()
221 223
222 224 In [9]: rr=r()
223 225
224 226 In [10]: ssrr=sr()
225 227
226 228 In [11]: 4.5
227 229 Out[11]: 4.5
228 230
229 231 In [12]: str(ss)
230 232 Out[12]: 's'
231 233
232 234 In [13]:
233 235
234 236 In [14]: %hist -op
235 237 >>> class b:
236 238 ... pass
237 239 ...
238 240 >>> class s(b):
239 241 ... def __str__(self):
240 242 ... return 's'
241 243 ...
242 244 >>>
243 245 >>> class r(b):
244 246 ... def __repr__(self):
245 247 ... return 'r'
246 248 ...
247 249 >>> class sr(s,r): pass
248 250 >>>
249 251 >>> bb=b()
250 252 >>> ss=s()
251 253 >>> rr=r()
252 254 >>> ssrr=sr()
253 255 >>> 4.5
254 256 4.5
255 257 >>> str(ss)
256 258 's'
257 259 >>>
258 260 """
259 261
260 262 def test_hist_pof():
261 263 ip = get_ipython()
262 264 ip.run_cell(u"1+2", store_history=True)
263 265 #raise Exception(ip.history_manager.session_number)
264 266 #raise Exception(list(ip.history_manager._get_range_session()))
265 267 with TemporaryDirectory() as td:
266 268 tf = os.path.join(td, 'hist.py')
267 269 ip.run_line_magic('history', '-pof %s' % tf)
268 270 assert os.path.isfile(tf)
269 271
270 272
271 273 def test_macro():
272 274 ip = get_ipython()
273 275 ip.history_manager.reset() # Clear any existing history.
274 276 cmds = ["a=1", "def b():\n return a**2", "print(a,b())"]
275 277 for i, cmd in enumerate(cmds, start=1):
276 278 ip.history_manager.store_inputs(i, cmd)
277 279 ip.magic("macro test 1-3")
278 280 nt.assert_equal(ip.user_ns["test"].value, "\n".join(cmds)+"\n")
279 281
280 282 # List macros
281 283 nt.assert_in("test", ip.magic("macro"))
282 284
283 285
284 286 def test_macro_run():
285 287 """Test that we can run a multi-line macro successfully."""
286 288 ip = get_ipython()
287 289 ip.history_manager.reset()
288 290 cmds = ["a=10", "a+=1", "print(a)", "%macro test 2-3"]
289 291 for cmd in cmds:
290 292 ip.run_cell(cmd, store_history=True)
291 293 nt.assert_equal(ip.user_ns["test"].value, "a+=1\nprint(a)\n")
292 294 with tt.AssertPrints("12"):
293 295 ip.run_cell("test")
294 296 with tt.AssertPrints("13"):
295 297 ip.run_cell("test")
296 298
297 299
298 300 def test_magic_magic():
299 301 """Test %magic"""
300 302 ip = get_ipython()
301 303 with capture_output() as captured:
302 304 ip.magic("magic")
303 305
304 306 stdout = captured.stdout
305 307 nt.assert_in('%magic', stdout)
306 308 nt.assert_in('IPython', stdout)
307 309 nt.assert_in('Available', stdout)
308 310
309 311
310 312 @dec.skipif_not_numpy
311 313 def test_numpy_reset_array_undec():
312 314 "Test '%reset array' functionality"
313 315 _ip.ex('import numpy as np')
314 316 _ip.ex('a = np.empty(2)')
315 317 nt.assert_in('a', _ip.user_ns)
316 318 _ip.magic('reset -f array')
317 319 nt.assert_not_in('a', _ip.user_ns)
318 320
319 321 def test_reset_out():
320 322 "Test '%reset out' magic"
321 323 _ip.run_cell("parrot = 'dead'", store_history=True)
322 324 # test '%reset -f out', make an Out prompt
323 325 _ip.run_cell("parrot", store_history=True)
324 326 nt.assert_true('dead' in [_ip.user_ns[x] for x in ('_','__','___')])
325 327 _ip.magic('reset -f out')
326 328 nt.assert_false('dead' in [_ip.user_ns[x] for x in ('_','__','___')])
327 329 nt.assert_equal(len(_ip.user_ns['Out']), 0)
328 330
329 331 def test_reset_in():
330 332 "Test '%reset in' magic"
331 333 # test '%reset -f in'
332 334 _ip.run_cell("parrot", store_history=True)
333 335 nt.assert_true('parrot' in [_ip.user_ns[x] for x in ('_i','_ii','_iii')])
334 336 _ip.magic('%reset -f in')
335 337 nt.assert_false('parrot' in [_ip.user_ns[x] for x in ('_i','_ii','_iii')])
336 338 nt.assert_equal(len(set(_ip.user_ns['In'])), 1)
337 339
338 340 def test_reset_dhist():
339 341 "Test '%reset dhist' magic"
340 342 _ip.run_cell("tmp = [d for d in _dh]") # copy before clearing
341 343 _ip.magic('cd ' + os.path.dirname(nt.__file__))
342 344 _ip.magic('cd -')
343 345 nt.assert_true(len(_ip.user_ns['_dh']) > 0)
344 346 _ip.magic('reset -f dhist')
345 347 nt.assert_equal(len(_ip.user_ns['_dh']), 0)
346 348 _ip.run_cell("_dh = [d for d in tmp]") #restore
347 349
348 350 def test_reset_in_length():
349 351 "Test that '%reset in' preserves In[] length"
350 352 _ip.run_cell("print 'foo'")
351 353 _ip.run_cell("reset -f in")
352 354 nt.assert_equal(len(_ip.user_ns['In']), _ip.displayhook.prompt_count+1)
353 355
354 356 class TestResetErrors(TestCase):
355 357
356 358 def test_reset_redefine(self):
357 359
358 360 @magics_class
359 361 class KernelMagics(Magics):
360 362 @line_magic
361 363 def less(self, shell): pass
362 364
363 365 _ip.register_magics(KernelMagics)
364 366
365 367 with self.assertLogs() as cm:
366 368 # hack, we want to just capture logs, but assertLogs fails if not
367 369 # logs get produce.
368 370 # so log one things we ignore.
369 371 import logging as log_mod
370 372 log = log_mod.getLogger()
371 373 log.info('Nothing')
372 374 # end hack.
373 375 _ip.run_cell("reset -f")
374 376
375 377 assert len(cm.output) == 1
376 378 for out in cm.output:
377 379 assert "Invalid alias" not in out
378 380
379 381 def test_tb_syntaxerror():
380 382 """test %tb after a SyntaxError"""
381 383 ip = get_ipython()
382 384 ip.run_cell("for")
383 385
384 386 # trap and validate stdout
385 387 save_stdout = sys.stdout
386 388 try:
387 389 sys.stdout = StringIO()
388 390 ip.run_cell("%tb")
389 391 out = sys.stdout.getvalue()
390 392 finally:
391 393 sys.stdout = save_stdout
392 394 # trim output, and only check the last line
393 395 last_line = out.rstrip().splitlines()[-1].strip()
394 396 nt.assert_equal(last_line, "SyntaxError: invalid syntax")
395 397
396 398
397 399 def test_time():
398 400 ip = get_ipython()
399 401
400 402 with tt.AssertPrints("Wall time: "):
401 403 ip.run_cell("%time None")
402 404
403 405 ip.run_cell("def f(kmjy):\n"
404 406 " %time print (2*kmjy)")
405 407
406 408 with tt.AssertPrints("Wall time: "):
407 409 with tt.AssertPrints("hihi", suppress=False):
408 410 ip.run_cell("f('hi')")
409 411
410 412 def test_time_last_not_expression():
411 413 ip.run_cell("%%time\n"
412 414 "var_1 = 1\n"
413 415 "var_2 = 2\n")
414 416 assert ip.user_ns['var_1'] == 1
415 417 del ip.user_ns['var_1']
416 418 assert ip.user_ns['var_2'] == 2
417 419 del ip.user_ns['var_2']
418 420
419 421
420 422 @dec.skip_win32
421 423 def test_time2():
422 424 ip = get_ipython()
423 425
424 426 with tt.AssertPrints("CPU times: user "):
425 427 ip.run_cell("%time None")
426 428
427 429 def test_time3():
428 430 """Erroneous magic function calls, issue gh-3334"""
429 431 ip = get_ipython()
430 432 ip.user_ns.pop('run', None)
431 433
432 434 with tt.AssertNotPrints("not found", channel='stderr'):
433 435 ip.run_cell("%%time\n"
434 436 "run = 0\n"
435 437 "run += 1")
436 438
437 439 def test_multiline_time():
438 440 """Make sure last statement from time return a value."""
439 441 ip = get_ipython()
440 442 ip.user_ns.pop('run', None)
441 443
442 444 ip.run_cell(dedent("""\
443 445 %%time
444 446 a = "ho"
445 447 b = "hey"
446 448 a+b
447 449 """))
448 450 nt.assert_equal(ip.user_ns_hidden['_'], 'hohey')
449 451
450 452 def test_time_local_ns():
451 453 """
452 454 Test that local_ns is actually global_ns when running a cell magic
453 455 """
454 456 ip = get_ipython()
455 457 ip.run_cell("%%time\n"
456 458 "myvar = 1")
457 459 nt.assert_equal(ip.user_ns['myvar'], 1)
458 460 del ip.user_ns['myvar']
459 461
460 462 def test_doctest_mode():
461 463 "Toggle doctest_mode twice, it should be a no-op and run without error"
462 464 _ip.magic('doctest_mode')
463 465 _ip.magic('doctest_mode')
464 466
465 467
466 468 def test_parse_options():
467 469 """Tests for basic options parsing in magics."""
468 470 # These are only the most minimal of tests, more should be added later. At
469 471 # the very least we check that basic text/unicode calls work OK.
470 472 m = DummyMagics(_ip)
471 473 nt.assert_equal(m.parse_options('foo', '')[1], 'foo')
472 474 nt.assert_equal(m.parse_options(u'foo', '')[1], u'foo')
473 475
474 476
475 477 def test_parse_options_preserve_non_option_string():
476 478 """Test to assert preservation of non-option part of magic-block, while parsing magic options."""
477 479 m = DummyMagics(_ip)
478 480 opts, stmt = m.parse_options(
479 481 " -n1 -r 13 _ = 314 + foo", "n:r:", preserve_non_opts=True
480 482 )
481 483 nt.assert_equal(opts, {"n": "1", "r": "13"})
482 484 nt.assert_equal(stmt, "_ = 314 + foo")
483 485
484 486
485 487 def test_run_magic_preserve_code_block():
486 488 """Test to assert preservation of non-option part of magic-block, while running magic."""
487 489 _ip.user_ns["spaces"] = []
488 490 _ip.magic("timeit -n1 -r1 spaces.append([s.count(' ') for s in ['document']])")
489 491 assert _ip.user_ns["spaces"] == [[0]]
490 492
491 493
492 494 def test_dirops():
493 495 """Test various directory handling operations."""
494 496 # curpath = lambda :os.path.splitdrive(os.getcwd())[1].replace('\\','/')
495 497 curpath = os.getcwd
496 498 startdir = os.getcwd()
497 499 ipdir = os.path.realpath(_ip.ipython_dir)
498 500 try:
499 501 _ip.magic('cd "%s"' % ipdir)
500 502 nt.assert_equal(curpath(), ipdir)
501 503 _ip.magic('cd -')
502 504 nt.assert_equal(curpath(), startdir)
503 505 _ip.magic('pushd "%s"' % ipdir)
504 506 nt.assert_equal(curpath(), ipdir)
505 507 _ip.magic('popd')
506 508 nt.assert_equal(curpath(), startdir)
507 509 finally:
508 510 os.chdir(startdir)
509 511
510 512
511 513 def test_cd_force_quiet():
512 514 """Test OSMagics.cd_force_quiet option"""
513 515 _ip.config.OSMagics.cd_force_quiet = True
514 516 osmagics = osm.OSMagics(shell=_ip)
515 517
516 518 startdir = os.getcwd()
517 519 ipdir = os.path.realpath(_ip.ipython_dir)
518 520
519 521 try:
520 522 with tt.AssertNotPrints(ipdir):
521 523 osmagics.cd('"%s"' % ipdir)
522 524 with tt.AssertNotPrints(startdir):
523 525 osmagics.cd('-')
524 526 finally:
525 527 os.chdir(startdir)
526 528
527 529
528 530 def test_xmode():
529 531 # Calling xmode three times should be a no-op
530 532 xmode = _ip.InteractiveTB.mode
531 533 for i in range(4):
532 534 _ip.magic("xmode")
533 535 nt.assert_equal(_ip.InteractiveTB.mode, xmode)
534 536
535 537 def test_reset_hard():
536 538 monitor = []
537 539 class A(object):
538 540 def __del__(self):
539 541 monitor.append(1)
540 542 def __repr__(self):
541 543 return "<A instance>"
542 544
543 545 _ip.user_ns["a"] = A()
544 546 _ip.run_cell("a")
545 547
546 548 nt.assert_equal(monitor, [])
547 549 _ip.magic("reset -f")
548 550 nt.assert_equal(monitor, [1])
549 551
550 552 class TestXdel(tt.TempFileMixin):
551 553 def test_xdel(self):
552 554 """Test that references from %run are cleared by xdel."""
553 555 src = ("class A(object):\n"
554 556 " monitor = []\n"
555 557 " def __del__(self):\n"
556 558 " self.monitor.append(1)\n"
557 559 "a = A()\n")
558 560 self.mktmp(src)
559 561 # %run creates some hidden references...
560 562 _ip.magic("run %s" % self.fname)
561 563 # ... as does the displayhook.
562 564 _ip.run_cell("a")
563 565
564 566 monitor = _ip.user_ns["A"].monitor
565 567 nt.assert_equal(monitor, [])
566 568
567 569 _ip.magic("xdel a")
568 570
569 571 # Check that a's __del__ method has been called.
570 572 nt.assert_equal(monitor, [1])
571 573
572 574 def doctest_who():
573 575 """doctest for %who
574 576
575 577 In [1]: %reset -f
576 578
577 579 In [2]: alpha = 123
578 580
579 581 In [3]: beta = 'beta'
580 582
581 583 In [4]: %who int
582 584 alpha
583 585
584 586 In [5]: %who str
585 587 beta
586 588
587 589 In [6]: %whos
588 590 Variable Type Data/Info
589 591 ----------------------------
590 592 alpha int 123
591 593 beta str beta
592 594
593 595 In [7]: %who_ls
594 596 Out[7]: ['alpha', 'beta']
595 597 """
596 598
597 599 def test_whos():
598 600 """Check that whos is protected against objects where repr() fails."""
599 601 class A(object):
600 602 def __repr__(self):
601 603 raise Exception()
602 604 _ip.user_ns['a'] = A()
603 605 _ip.magic("whos")
604 606
605 607 def doctest_precision():
606 608 """doctest for %precision
607 609
608 610 In [1]: f = get_ipython().display_formatter.formatters['text/plain']
609 611
610 612 In [2]: %precision 5
611 613 Out[2]: '%.5f'
612 614
613 615 In [3]: f.float_format
614 616 Out[3]: '%.5f'
615 617
616 618 In [4]: %precision %e
617 619 Out[4]: '%e'
618 620
619 621 In [5]: f(3.1415927)
620 622 Out[5]: '3.141593e+00'
621 623 """
622 624
623 625 def test_debug_magic():
624 626 """Test debugging a small code with %debug
625 627
626 628 In [1]: with PdbTestInput(['c']):
627 629 ...: %debug print("a b") #doctest: +ELLIPSIS
628 630 ...:
629 631 ...
630 632 ipdb> c
631 633 a b
632 634 In [2]:
633 635 """
634 636
635 637 def test_psearch():
636 638 with tt.AssertPrints("dict.fromkeys"):
637 639 _ip.run_cell("dict.fr*?")
638 640 with tt.AssertPrints("Ο€.is_integer"):
639 641 _ip.run_cell("Ο€ = 3.14;\nΟ€.is_integ*?")
640 642
641 643 def test_timeit_shlex():
642 644 """test shlex issues with timeit (#1109)"""
643 645 _ip.ex("def f(*a,**kw): pass")
644 646 _ip.magic('timeit -n1 "this is a bug".count(" ")')
645 647 _ip.magic('timeit -r1 -n1 f(" ", 1)')
646 648 _ip.magic('timeit -r1 -n1 f(" ", 1, " ", 2, " ")')
647 649 _ip.magic('timeit -r1 -n1 ("a " + "b")')
648 650 _ip.magic('timeit -r1 -n1 f("a " + "b")')
649 651 _ip.magic('timeit -r1 -n1 f("a " + "b ")')
650 652
651 653
652 654 def test_timeit_special_syntax():
653 655 "Test %%timeit with IPython special syntax"
654 656 @register_line_magic
655 657 def lmagic(line):
656 658 ip = get_ipython()
657 659 ip.user_ns['lmagic_out'] = line
658 660
659 661 # line mode test
660 662 _ip.run_line_magic('timeit', '-n1 -r1 %lmagic my line')
661 663 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line')
662 664 # cell mode test
663 665 _ip.run_cell_magic('timeit', '-n1 -r1', '%lmagic my line2')
664 666 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line2')
665 667
666 668 def test_timeit_return():
667 669 """
668 670 test whether timeit -o return object
669 671 """
670 672
671 673 res = _ip.run_line_magic('timeit','-n10 -r10 -o 1')
672 674 assert(res is not None)
673 675
674 676 def test_timeit_quiet():
675 677 """
676 678 test quiet option of timeit magic
677 679 """
678 680 with tt.AssertNotPrints("loops"):
679 681 _ip.run_cell("%timeit -n1 -r1 -q 1")
680 682
681 683 def test_timeit_return_quiet():
682 684 with tt.AssertNotPrints("loops"):
683 685 res = _ip.run_line_magic('timeit', '-n1 -r1 -q -o 1')
684 686 assert (res is not None)
685 687
686 688 def test_timeit_invalid_return():
687 689 with nt.assert_raises_regex(SyntaxError, "outside function"):
688 690 _ip.run_line_magic('timeit', 'return')
689 691
690 692 @dec.skipif(execution.profile is None)
691 693 def test_prun_special_syntax():
692 694 "Test %%prun with IPython special syntax"
693 695 @register_line_magic
694 696 def lmagic(line):
695 697 ip = get_ipython()
696 698 ip.user_ns['lmagic_out'] = line
697 699
698 700 # line mode test
699 701 _ip.run_line_magic('prun', '-q %lmagic my line')
700 702 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line')
701 703 # cell mode test
702 704 _ip.run_cell_magic('prun', '-q', '%lmagic my line2')
703 705 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line2')
704 706
705 707 @dec.skipif(execution.profile is None)
706 708 def test_prun_quotes():
707 709 "Test that prun does not clobber string escapes (GH #1302)"
708 710 _ip.magic(r"prun -q x = '\t'")
709 711 nt.assert_equal(_ip.user_ns['x'], '\t')
710 712
711 713 def test_extension():
712 714 # Debugging information for failures of this test
713 715 print('sys.path:')
714 716 for p in sys.path:
715 717 print(' ', p)
716 718 print('CWD', os.getcwd())
717 719
718 720 nt.assert_raises(ImportError, _ip.magic, "load_ext daft_extension")
719 721 daft_path = os.path.join(os.path.dirname(__file__), "daft_extension")
720 722 sys.path.insert(0, daft_path)
721 723 try:
722 724 _ip.user_ns.pop('arq', None)
723 725 invalidate_caches() # Clear import caches
724 726 _ip.magic("load_ext daft_extension")
725 727 nt.assert_equal(_ip.user_ns['arq'], 185)
726 728 _ip.magic("unload_ext daft_extension")
727 729 assert 'arq' not in _ip.user_ns
728 730 finally:
729 731 sys.path.remove(daft_path)
730 732
731 733
732 734 def test_notebook_export_json():
733 735 _ip = get_ipython()
734 736 _ip.history_manager.reset() # Clear any existing history.
735 737 cmds = [u"a=1", u"def b():\n return a**2", u"print('noΓ«l, Γ©tΓ©', b())"]
736 738 for i, cmd in enumerate(cmds, start=1):
737 739 _ip.history_manager.store_inputs(i, cmd)
738 740 with TemporaryDirectory() as td:
739 741 outfile = os.path.join(td, "nb.ipynb")
740 742 _ip.magic("notebook -e %s" % outfile)
741 743
742 744
743 745 class TestEnv(TestCase):
744 746
745 747 def test_env(self):
746 748 env = _ip.magic("env")
747 749 self.assertTrue(isinstance(env, dict))
748 750
749 751 def test_env_secret(self):
750 752 env = _ip.magic("env")
751 753 hidden = "<hidden>"
752 754 with mock.patch.dict(
753 755 os.environ,
754 756 {
755 757 "API_KEY": "abc123",
756 758 "SECRET_THING": "ssshhh",
757 759 "JUPYTER_TOKEN": "",
758 760 "VAR": "abc"
759 761 }
760 762 ):
761 763 env = _ip.magic("env")
762 764 assert env["API_KEY"] == hidden
763 765 assert env["SECRET_THING"] == hidden
764 766 assert env["JUPYTER_TOKEN"] == hidden
765 767 assert env["VAR"] == "abc"
766 768
767 769 def test_env_get_set_simple(self):
768 770 env = _ip.magic("env var val1")
769 771 self.assertEqual(env, None)
770 772 self.assertEqual(os.environ['var'], 'val1')
771 773 self.assertEqual(_ip.magic("env var"), 'val1')
772 774 env = _ip.magic("env var=val2")
773 775 self.assertEqual(env, None)
774 776 self.assertEqual(os.environ['var'], 'val2')
775 777
776 778 def test_env_get_set_complex(self):
777 779 env = _ip.magic("env var 'val1 '' 'val2")
778 780 self.assertEqual(env, None)
779 781 self.assertEqual(os.environ['var'], "'val1 '' 'val2")
780 782 self.assertEqual(_ip.magic("env var"), "'val1 '' 'val2")
781 783 env = _ip.magic('env var=val2 val3="val4')
782 784 self.assertEqual(env, None)
783 785 self.assertEqual(os.environ['var'], 'val2 val3="val4')
784 786
785 787 def test_env_set_bad_input(self):
786 788 self.assertRaises(UsageError, lambda: _ip.magic("set_env var"))
787 789
788 790 def test_env_set_whitespace(self):
789 791 self.assertRaises(UsageError, lambda: _ip.magic("env var A=B"))
790 792
791 793
792 794 class CellMagicTestCase(TestCase):
793 795
794 796 def check_ident(self, magic):
795 797 # Manually called, we get the result
796 798 out = _ip.run_cell_magic(magic, 'a', 'b')
797 799 nt.assert_equal(out, ('a','b'))
798 800 # Via run_cell, it goes into the user's namespace via displayhook
799 801 _ip.run_cell('%%' + magic +' c\nd\n')
800 802 nt.assert_equal(_ip.user_ns['_'], ('c','d\n'))
801 803
802 804 def test_cell_magic_func_deco(self):
803 805 "Cell magic using simple decorator"
804 806 @register_cell_magic
805 807 def cellm(line, cell):
806 808 return line, cell
807 809
808 810 self.check_ident('cellm')
809 811
810 812 def test_cell_magic_reg(self):
811 813 "Cell magic manually registered"
812 814 def cellm(line, cell):
813 815 return line, cell
814 816
815 817 _ip.register_magic_function(cellm, 'cell', 'cellm2')
816 818 self.check_ident('cellm2')
817 819
818 820 def test_cell_magic_class(self):
819 821 "Cell magics declared via a class"
820 822 @magics_class
821 823 class MyMagics(Magics):
822 824
823 825 @cell_magic
824 826 def cellm3(self, line, cell):
825 827 return line, cell
826 828
827 829 _ip.register_magics(MyMagics)
828 830 self.check_ident('cellm3')
829 831
830 832 def test_cell_magic_class2(self):
831 833 "Cell magics declared via a class, #2"
832 834 @magics_class
833 835 class MyMagics2(Magics):
834 836
835 837 @cell_magic('cellm4')
836 838 def cellm33(self, line, cell):
837 839 return line, cell
838 840
839 841 _ip.register_magics(MyMagics2)
840 842 self.check_ident('cellm4')
841 843 # Check that nothing is registered as 'cellm33'
842 844 c33 = _ip.find_cell_magic('cellm33')
843 845 nt.assert_equal(c33, None)
844 846
845 847 def test_file():
846 848 """Basic %%writefile"""
847 849 ip = get_ipython()
848 850 with TemporaryDirectory() as td:
849 851 fname = os.path.join(td, 'file1')
850 852 ip.run_cell_magic("writefile", fname, u'\n'.join([
851 853 'line1',
852 854 'line2',
853 855 ]))
854 856 s = Path(fname).read_text()
855 857 nt.assert_in('line1\n', s)
856 858 nt.assert_in('line2', s)
857 859
858 860 @dec.skip_win32
859 861 def test_file_single_quote():
860 862 """Basic %%writefile with embedded single quotes"""
861 863 ip = get_ipython()
862 864 with TemporaryDirectory() as td:
863 865 fname = os.path.join(td, '\'file1\'')
864 866 ip.run_cell_magic("writefile", fname, u'\n'.join([
865 867 'line1',
866 868 'line2',
867 869 ]))
868 870 s = Path(fname).read_text()
869 871 nt.assert_in('line1\n', s)
870 872 nt.assert_in('line2', s)
871 873
872 874 @dec.skip_win32
873 875 def test_file_double_quote():
874 876 """Basic %%writefile with embedded double quotes"""
875 877 ip = get_ipython()
876 878 with TemporaryDirectory() as td:
877 879 fname = os.path.join(td, '"file1"')
878 880 ip.run_cell_magic("writefile", fname, u'\n'.join([
879 881 'line1',
880 882 'line2',
881 883 ]))
882 884 s = Path(fname).read_text()
883 885 nt.assert_in('line1\n', s)
884 886 nt.assert_in('line2', s)
885 887
886 888 def test_file_var_expand():
887 889 """%%writefile $filename"""
888 890 ip = get_ipython()
889 891 with TemporaryDirectory() as td:
890 892 fname = os.path.join(td, 'file1')
891 893 ip.user_ns['filename'] = fname
892 894 ip.run_cell_magic("writefile", '$filename', u'\n'.join([
893 895 'line1',
894 896 'line2',
895 897 ]))
896 898 s = Path(fname).read_text()
897 899 nt.assert_in('line1\n', s)
898 900 nt.assert_in('line2', s)
899 901
900 902 def test_file_unicode():
901 903 """%%writefile with unicode cell"""
902 904 ip = get_ipython()
903 905 with TemporaryDirectory() as td:
904 906 fname = os.path.join(td, 'file1')
905 907 ip.run_cell_magic("writefile", fname, u'\n'.join([
906 908 u'linΓ©1',
907 909 u'linΓ©2',
908 910 ]))
909 911 with io.open(fname, encoding='utf-8') as f:
910 912 s = f.read()
911 913 nt.assert_in(u'linΓ©1\n', s)
912 914 nt.assert_in(u'linΓ©2', s)
913 915
914 916 def test_file_amend():
915 917 """%%writefile -a amends files"""
916 918 ip = get_ipython()
917 919 with TemporaryDirectory() as td:
918 920 fname = os.path.join(td, 'file2')
919 921 ip.run_cell_magic("writefile", fname, u'\n'.join([
920 922 'line1',
921 923 'line2',
922 924 ]))
923 925 ip.run_cell_magic("writefile", "-a %s" % fname, u'\n'.join([
924 926 'line3',
925 927 'line4',
926 928 ]))
927 929 s = Path(fname).read_text()
928 930 nt.assert_in('line1\n', s)
929 931 nt.assert_in('line3\n', s)
930 932
931 933 def test_file_spaces():
932 934 """%%file with spaces in filename"""
933 935 ip = get_ipython()
934 936 with TemporaryWorkingDirectory() as td:
935 937 fname = "file name"
936 938 ip.run_cell_magic("file", '"%s"'%fname, u'\n'.join([
937 939 'line1',
938 940 'line2',
939 941 ]))
940 942 s = Path(fname).read_text()
941 943 nt.assert_in('line1\n', s)
942 944 nt.assert_in('line2', s)
943 945
944 946 def test_script_config():
945 947 ip = get_ipython()
946 948 ip.config.ScriptMagics.script_magics = ['whoda']
947 949 sm = script.ScriptMagics(shell=ip)
948 950 nt.assert_in('whoda', sm.magics['cell'])
949 951
952 @dec.skip_iptest_but_not_pytest
950 953 @dec.skip_win32
951 954 def test_script_out():
955 assert asyncio.get_event_loop().is_running() is False
956
952 957 ip = get_ipython()
953 958 ip.run_cell_magic("script", "--out output sh", "echo 'hi'")
959 assert asyncio.get_event_loop().is_running() is False
954 960 nt.assert_equal(ip.user_ns['output'], 'hi\n')
955 961
962 @dec.skip_iptest_but_not_pytest
956 963 @dec.skip_win32
957 964 def test_script_err():
958 965 ip = get_ipython()
966 assert asyncio.get_event_loop().is_running() is False
959 967 ip.run_cell_magic("script", "--err error sh", "echo 'hello' >&2")
968 assert asyncio.get_event_loop().is_running() is False
960 969 nt.assert_equal(ip.user_ns['error'], 'hello\n')
961 970
971
972 @dec.skip_iptest_but_not_pytest
962 973 @dec.skip_win32
963 974 def test_script_out_err():
975
964 976 ip = get_ipython()
965 ip.run_cell_magic("script", "--out output --err error sh", "echo 'hi'\necho 'hello' >&2")
966 nt.assert_equal(ip.user_ns['output'], 'hi\n')
967 nt.assert_equal(ip.user_ns['error'], 'hello\n')
977 ip.run_cell_magic(
978 "script", "--out output --err error sh", "echo 'hi'\necho 'hello' >&2"
979 )
980 nt.assert_equal(ip.user_ns["output"], "hi\n")
981 nt.assert_equal(ip.user_ns["error"], "hello\n")
968 982
983
984 @dec.skip_iptest_but_not_pytest
969 985 @dec.skip_win32
970 986 async def test_script_bg_out():
971 987 ip = get_ipython()
972 988 ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'")
973 989 nt.assert_equal((await ip.user_ns["output"].read()), b"hi\n")
974 ip.user_ns['output'].close()
990 ip.user_ns["output"].close()
991 asyncio.get_event_loop().stop()
975 992
993 @dec.skip_iptest_but_not_pytest
976 994 @dec.skip_win32
977 995 async def test_script_bg_err():
978 996 ip = get_ipython()
979 997 ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2")
980 998 nt.assert_equal((await ip.user_ns["error"].read()), b"hello\n")
981 999 ip.user_ns["error"].close()
982 1000
983 1001
1002 @dec.skip_iptest_but_not_pytest
984 1003 @dec.skip_win32
985 1004 async def test_script_bg_out_err():
986 1005 ip = get_ipython()
987 1006 ip.run_cell_magic(
988 1007 "script", "--bg --out output --err error sh", "echo 'hi'\necho 'hello' >&2"
989 1008 )
990 1009 nt.assert_equal((await ip.user_ns["output"].read()), b"hi\n")
991 1010 nt.assert_equal((await ip.user_ns["error"].read()), b"hello\n")
992 1011 ip.user_ns["output"].close()
993 1012 ip.user_ns["error"].close()
994 1013
995 1014
996 1015 def test_script_defaults():
997 1016 ip = get_ipython()
998 1017 for cmd in ['sh', 'bash', 'perl', 'ruby']:
999 1018 try:
1000 1019 find_cmd(cmd)
1001 1020 except Exception:
1002 1021 pass
1003 1022 else:
1004 1023 nt.assert_in(cmd, ip.magics_manager.magics['cell'])
1005 1024
1006 1025
1007 1026 @magics_class
1008 1027 class FooFoo(Magics):
1009 1028 """class with both %foo and %%foo magics"""
1010 1029 @line_magic('foo')
1011 1030 def line_foo(self, line):
1012 1031 "I am line foo"
1013 1032 pass
1014 1033
1015 1034 @cell_magic("foo")
1016 1035 def cell_foo(self, line, cell):
1017 1036 "I am cell foo, not line foo"
1018 1037 pass
1019 1038
1020 1039 def test_line_cell_info():
1021 1040 """%%foo and %foo magics are distinguishable to inspect"""
1022 1041 ip = get_ipython()
1023 1042 ip.magics_manager.register(FooFoo)
1024 1043 oinfo = ip.object_inspect('foo')
1025 1044 nt.assert_true(oinfo['found'])
1026 1045 nt.assert_true(oinfo['ismagic'])
1027 1046
1028 1047 oinfo = ip.object_inspect('%%foo')
1029 1048 nt.assert_true(oinfo['found'])
1030 1049 nt.assert_true(oinfo['ismagic'])
1031 1050 nt.assert_equal(oinfo['docstring'], FooFoo.cell_foo.__doc__)
1032 1051
1033 1052 oinfo = ip.object_inspect('%foo')
1034 1053 nt.assert_true(oinfo['found'])
1035 1054 nt.assert_true(oinfo['ismagic'])
1036 1055 nt.assert_equal(oinfo['docstring'], FooFoo.line_foo.__doc__)
1037 1056
1038 1057 def test_multiple_magics():
1039 1058 ip = get_ipython()
1040 1059 foo1 = FooFoo(ip)
1041 1060 foo2 = FooFoo(ip)
1042 1061 mm = ip.magics_manager
1043 1062 mm.register(foo1)
1044 1063 nt.assert_true(mm.magics['line']['foo'].__self__ is foo1)
1045 1064 mm.register(foo2)
1046 1065 nt.assert_true(mm.magics['line']['foo'].__self__ is foo2)
1047 1066
1048 1067 def test_alias_magic():
1049 1068 """Test %alias_magic."""
1050 1069 ip = get_ipython()
1051 1070 mm = ip.magics_manager
1052 1071
1053 1072 # Basic operation: both cell and line magics are created, if possible.
1054 1073 ip.run_line_magic('alias_magic', 'timeit_alias timeit')
1055 1074 nt.assert_in('timeit_alias', mm.magics['line'])
1056 1075 nt.assert_in('timeit_alias', mm.magics['cell'])
1057 1076
1058 1077 # --cell is specified, line magic not created.
1059 1078 ip.run_line_magic('alias_magic', '--cell timeit_cell_alias timeit')
1060 1079 nt.assert_not_in('timeit_cell_alias', mm.magics['line'])
1061 1080 nt.assert_in('timeit_cell_alias', mm.magics['cell'])
1062 1081
1063 1082 # Test that line alias is created successfully.
1064 1083 ip.run_line_magic('alias_magic', '--line env_alias env')
1065 1084 nt.assert_equal(ip.run_line_magic('env', ''),
1066 1085 ip.run_line_magic('env_alias', ''))
1067 1086
1068 1087 # Test that line alias with parameters passed in is created successfully.
1069 1088 ip.run_line_magic('alias_magic', '--line history_alias history --params ' + shlex.quote('3'))
1070 1089 nt.assert_in('history_alias', mm.magics['line'])
1071 1090
1072 1091
1073 1092 def test_save():
1074 1093 """Test %save."""
1075 1094 ip = get_ipython()
1076 1095 ip.history_manager.reset() # Clear any existing history.
1077 1096 cmds = [u"a=1", u"def b():\n return a**2", u"print(a, b())"]
1078 1097 for i, cmd in enumerate(cmds, start=1):
1079 1098 ip.history_manager.store_inputs(i, cmd)
1080 1099 with TemporaryDirectory() as tmpdir:
1081 1100 file = os.path.join(tmpdir, "testsave.py")
1082 1101 ip.run_line_magic("save", "%s 1-10" % file)
1083 1102 content = Path(file).read_text()
1084 1103 nt.assert_equal(content.count(cmds[0]), 1)
1085 1104 nt.assert_in("coding: utf-8", content)
1086 1105 ip.run_line_magic("save", "-a %s 1-10" % file)
1087 1106 content = Path(file).read_text()
1088 1107 nt.assert_equal(content.count(cmds[0]), 2)
1089 1108 nt.assert_in("coding: utf-8", content)
1090 1109
1091 1110
1092 1111 def test_save_with_no_args():
1093 1112 ip = get_ipython()
1094 1113 ip.history_manager.reset() # Clear any existing history.
1095 1114 cmds = [u"a=1", u"def b():\n return a**2", u"print(a, b())", "%save"]
1096 1115 for i, cmd in enumerate(cmds, start=1):
1097 1116 ip.history_manager.store_inputs(i, cmd)
1098 1117
1099 1118 with TemporaryDirectory() as tmpdir:
1100 1119 path = os.path.join(tmpdir, "testsave.py")
1101 1120 ip.run_line_magic("save", path)
1102 1121 content = Path(path).read_text()
1103 1122 expected_content = dedent(
1104 1123 """\
1105 1124 # coding: utf-8
1106 1125 a=1
1107 1126 def b():
1108 1127 return a**2
1109 1128 print(a, b())
1110 1129 """
1111 1130 )
1112 1131 nt.assert_equal(content, expected_content)
1113 1132
1114 1133
1115 1134 def test_store():
1116 1135 """Test %store."""
1117 1136 ip = get_ipython()
1118 1137 ip.run_line_magic('load_ext', 'storemagic')
1119 1138
1120 1139 # make sure the storage is empty
1121 1140 ip.run_line_magic('store', '-z')
1122 1141 ip.user_ns['var'] = 42
1123 1142 ip.run_line_magic('store', 'var')
1124 1143 ip.user_ns['var'] = 39
1125 1144 ip.run_line_magic('store', '-r')
1126 1145 nt.assert_equal(ip.user_ns['var'], 42)
1127 1146
1128 1147 ip.run_line_magic('store', '-d var')
1129 1148 ip.user_ns['var'] = 39
1130 1149 ip.run_line_magic('store' , '-r')
1131 1150 nt.assert_equal(ip.user_ns['var'], 39)
1132 1151
1133 1152
1134 1153 def _run_edit_test(arg_s, exp_filename=None,
1135 1154 exp_lineno=-1,
1136 1155 exp_contents=None,
1137 1156 exp_is_temp=None):
1138 1157 ip = get_ipython()
1139 1158 M = code.CodeMagics(ip)
1140 1159 last_call = ['','']
1141 1160 opts,args = M.parse_options(arg_s,'prxn:')
1142 1161 filename, lineno, is_temp = M._find_edit_target(ip, args, opts, last_call)
1143 1162
1144 1163 if exp_filename is not None:
1145 1164 nt.assert_equal(exp_filename, filename)
1146 1165 if exp_contents is not None:
1147 1166 with io.open(filename, 'r', encoding='utf-8') as f:
1148 1167 contents = f.read()
1149 1168 nt.assert_equal(exp_contents, contents)
1150 1169 if exp_lineno != -1:
1151 1170 nt.assert_equal(exp_lineno, lineno)
1152 1171 if exp_is_temp is not None:
1153 1172 nt.assert_equal(exp_is_temp, is_temp)
1154 1173
1155 1174
1156 1175 def test_edit_interactive():
1157 1176 """%edit on interactively defined objects"""
1158 1177 ip = get_ipython()
1159 1178 n = ip.execution_count
1160 1179 ip.run_cell(u"def foo(): return 1", store_history=True)
1161 1180
1162 1181 try:
1163 1182 _run_edit_test("foo")
1164 1183 except code.InteractivelyDefined as e:
1165 1184 nt.assert_equal(e.index, n)
1166 1185 else:
1167 1186 raise AssertionError("Should have raised InteractivelyDefined")
1168 1187
1169 1188
1170 1189 def test_edit_cell():
1171 1190 """%edit [cell id]"""
1172 1191 ip = get_ipython()
1173 1192
1174 1193 ip.run_cell(u"def foo(): return 1", store_history=True)
1175 1194
1176 1195 # test
1177 1196 _run_edit_test("1", exp_contents=ip.user_ns['In'][1], exp_is_temp=True)
1178 1197
1179 1198 def test_edit_fname():
1180 1199 """%edit file"""
1181 1200 # test
1182 1201 _run_edit_test("test file.py", exp_filename="test file.py")
1183 1202
1184 1203 def test_bookmark():
1185 1204 ip = get_ipython()
1186 1205 ip.run_line_magic('bookmark', 'bmname')
1187 1206 with tt.AssertPrints('bmname'):
1188 1207 ip.run_line_magic('bookmark', '-l')
1189 1208 ip.run_line_magic('bookmark', '-d bmname')
1190 1209
1191 1210 def test_ls_magic():
1192 1211 ip = get_ipython()
1193 1212 json_formatter = ip.display_formatter.formatters['application/json']
1194 1213 json_formatter.enabled = True
1195 1214 lsmagic = ip.magic('lsmagic')
1196 1215 with warnings.catch_warnings(record=True) as w:
1197 1216 j = json_formatter(lsmagic)
1198 1217 nt.assert_equal(sorted(j), ['cell', 'line'])
1199 1218 nt.assert_equal(w, []) # no warnings
1200 1219
1201 1220 def test_strip_initial_indent():
1202 1221 def sii(s):
1203 1222 lines = s.splitlines()
1204 1223 return '\n'.join(code.strip_initial_indent(lines))
1205 1224
1206 1225 nt.assert_equal(sii(" a = 1\nb = 2"), "a = 1\nb = 2")
1207 1226 nt.assert_equal(sii(" a\n b\nc"), "a\n b\nc")
1208 1227 nt.assert_equal(sii("a\n b"), "a\n b")
1209 1228
1210 1229 def test_logging_magic_quiet_from_arg():
1211 1230 _ip.config.LoggingMagics.quiet = False
1212 1231 lm = logging.LoggingMagics(shell=_ip)
1213 1232 with TemporaryDirectory() as td:
1214 1233 try:
1215 1234 with tt.AssertNotPrints(re.compile("Activating.*")):
1216 1235 lm.logstart('-q {}'.format(
1217 1236 os.path.join(td, "quiet_from_arg.log")))
1218 1237 finally:
1219 1238 _ip.logger.logstop()
1220 1239
1221 1240 def test_logging_magic_quiet_from_config():
1222 1241 _ip.config.LoggingMagics.quiet = True
1223 1242 lm = logging.LoggingMagics(shell=_ip)
1224 1243 with TemporaryDirectory() as td:
1225 1244 try:
1226 1245 with tt.AssertNotPrints(re.compile("Activating.*")):
1227 1246 lm.logstart(os.path.join(td, "quiet_from_config.log"))
1228 1247 finally:
1229 1248 _ip.logger.logstop()
1230 1249
1231 1250
1232 1251 def test_logging_magic_not_quiet():
1233 1252 _ip.config.LoggingMagics.quiet = False
1234 1253 lm = logging.LoggingMagics(shell=_ip)
1235 1254 with TemporaryDirectory() as td:
1236 1255 try:
1237 1256 with tt.AssertPrints(re.compile("Activating.*")):
1238 1257 lm.logstart(os.path.join(td, "not_quiet.log"))
1239 1258 finally:
1240 1259 _ip.logger.logstop()
1241 1260
1242 1261
1243 1262 def test_time_no_var_expand():
1244 1263 _ip.user_ns['a'] = 5
1245 1264 _ip.user_ns['b'] = []
1246 1265 _ip.magic('time b.append("{a}")')
1247 1266 assert _ip.user_ns['b'] == ['{a}']
1248 1267
1249 1268
1250 1269 # this is slow, put at the end for local testing.
1251 1270 def test_timeit_arguments():
1252 1271 "Test valid timeit arguments, should not cause SyntaxError (GH #1269)"
1253 1272 if sys.version_info < (3,7):
1254 1273 _ip.magic("timeit -n1 -r1 ('#')")
1255 1274 else:
1256 1275 # 3.7 optimize no-op statement like above out, and complain there is
1257 1276 # nothing in the for loop.
1258 1277 _ip.magic("timeit -n1 -r1 a=('#')")
1259 1278
1260 1279
1261 1280 TEST_MODULE = """
1262 1281 print('Loaded my_tmp')
1263 1282 if __name__ == "__main__":
1264 1283 print('I just ran a script')
1265 1284 """
1266 1285
1267 1286
1268 1287 def test_run_module_from_import_hook():
1269 1288 "Test that a module can be loaded via an import hook"
1270 1289 with TemporaryDirectory() as tmpdir:
1271 1290 fullpath = os.path.join(tmpdir, 'my_tmp.py')
1272 1291 Path(fullpath).write_text(TEST_MODULE)
1273 1292
1274 1293 class MyTempImporter(object):
1275 1294 def __init__(self):
1276 1295 pass
1277 1296
1278 1297 def find_module(self, fullname, path=None):
1279 1298 if 'my_tmp' in fullname:
1280 1299 return self
1281 1300 return None
1282 1301
1283 1302 def load_module(self, name):
1284 1303 import imp
1285 1304 return imp.load_source('my_tmp', fullpath)
1286 1305
1287 1306 def get_code(self, fullname):
1288 1307 return compile(Path(fullpath).read_text(), "foo", "exec")
1289 1308
1290 1309 def is_package(self, __):
1291 1310 return False
1292 1311
1293 1312 sys.meta_path.insert(0, MyTempImporter())
1294 1313
1295 1314 with capture_output() as captured:
1296 1315 _ip.magic("run -m my_tmp")
1297 1316 _ip.run_cell("import my_tmp")
1298 1317
1299 1318 output = "Loaded my_tmp\nI just ran a script\nLoaded my_tmp\n"
1300 1319 nt.assert_equal(output, captured.stdout)
1301 1320
1302 1321 sys.meta_path.pop(0)
General Comments 0
You need to be logged in to leave comments. Login now