##// END OF EJS Templates
Merge pull request #13191 from Carreau/fix-loop...
Matthias Bussonnier -
r26858:e6aab7f5 merge
parent child Browse files
Show More
@@ -1,39 +1,40 b''
1 1 # This workflow will install Python dependencies, run tests and lint with a variety of Python versions
2 2 # For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
3 3
4 4 name: Python package
5 5
6 6 on:
7 7 push:
8 8 branches: [ master, 7.x ]
9 9 pull_request:
10 10 branches: [ master, 7.x ]
11 11
12 12 jobs:
13 13 build:
14 14
15 15 runs-on: ubuntu-latest
16 timeout-minutes: 10
16 17 strategy:
17 18 matrix:
18 19 python-version: [3.8]
19 20
20 21 steps:
21 22 - uses: actions/checkout@v2
22 23 with:
23 24 fetch-depth: 0
24 25 - name: Set up Python ${{ matrix.python-version }}
25 26 uses: actions/setup-python@v2
26 27 with:
27 28 python-version: ${{ matrix.python-version }}
28 29 - name: Install dependencies
29 30 run: |
30 31 python -m pip install --upgrade pip
31 32 pip install darker isort
32 33 - name: Lint with darker
33 34 run: |
34 35 darker -r 60625f241f298b5039cb2debc365db38aa7bb522 --check --diff . || (
35 36 echo "Changes need auto-formatting. Run:"
36 37 echo " darker -r 60625f241f298b5039cb2debc365db38aa7bb522"
37 38 echo "then commit and push changes to fix."
38 39 exit 1
39 40 )
@@ -1,50 +1,50 b''
1 1 name: Run tests
2 2
3 3 on:
4 4 push:
5 5 pull_request:
6 6 # Run weekly on Monday at 1:23 UTC
7 7 schedule:
8 8 - cron: '23 1 * * 1'
9 9 workflow_dispatch:
10 10
11 11
12 12 jobs:
13 13 test:
14 14 runs-on: ${{ matrix.os }}
15 15 strategy:
16 16 matrix:
17 17 os: [ubuntu-latest]
18 18 python-version: ["3.7", "3.8", "3.9"]
19 19 # Test all on ubuntu, test ends on macos
20 20 include:
21 21 - os: macos-latest
22 22 python-version: "3.7"
23 23 - os: macos-latest
24 24 python-version: "3.9"
25 25
26 26 steps:
27 27 - uses: actions/checkout@v2
28 28 - name: Set up Python ${{ matrix.python-version }}
29 29 uses: actions/setup-python@v2
30 30 with:
31 31 python-version: ${{ matrix.python-version }}
32 32 - name: Install and update Python dependencies
33 33 run: |
34 34 python -m pip install --upgrade pip setuptools wheel
35 35 python -m pip install --upgrade -e file://$PWD#egg=ipython[test]
36 36 python -m pip install --upgrade --upgrade-strategy eager trio curio
37 37 python -m pip install --upgrade pytest pytest-trio 'matplotlib!=3.2.0'
38 38 python -m pip install --upgrade check-manifest pytest-cov anyio
39 39 - name: Check manifest
40 40 run: check-manifest
41 41 - name: iptest
42 42 run: |
43 43 cd /tmp && iptest --coverage xml && cd -
44 44 cp /tmp/ipy_coverage.xml ./
45 45 cp /tmp/.coverage ./
46 46 - name: pytest
47 47 run: |
48 pytest
48 pytest -v
49 49 - name: Upload coverage to Codecov
50 50 uses: codecov/codecov-action@v2
@@ -1,321 +1,355 b''
1 1 """Magic functions for running cells in various scripts."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 import asyncio
7 import atexit
6 8 import errno
9 import functools
7 10 import os
8 import sys
9 11 import signal
12 import sys
10 13 import time
11 import asyncio
12 import atexit
13
14 from contextlib import contextmanager
14 15 from subprocess import CalledProcessError
15 16
17 from traitlets import Dict, List, default
18
16 19 from IPython.core import magic_arguments
17 from IPython.core.magic import (
18 Magics, magics_class, line_magic, cell_magic
19 )
20 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
20 21 from IPython.lib.backgroundjobs import BackgroundJobManager
21 22 from IPython.utils.process import arg_split
22 from traitlets import List, Dict, default
23
24 23
25 24 #-----------------------------------------------------------------------------
26 25 # Magic implementation classes
27 26 #-----------------------------------------------------------------------------
28 27
29 28 def script_args(f):
30 29 """single decorator for adding script args"""
31 30 args = [
32 31 magic_arguments.argument(
33 32 '--out', type=str,
34 33 help="""The variable in which to store stdout from the script.
35 34 If the script is backgrounded, this will be the stdout *pipe*,
36 35 instead of the stderr text itself and will not be auto closed.
37 36 """
38 37 ),
39 38 magic_arguments.argument(
40 39 '--err', type=str,
41 40 help="""The variable in which to store stderr from the script.
42 41 If the script is backgrounded, this will be the stderr *pipe*,
43 42 instead of the stderr text itself and will not be autoclosed.
44 43 """
45 44 ),
46 45 magic_arguments.argument(
47 46 '--bg', action="store_true",
48 47 help="""Whether to run the script in the background.
49 48 If given, the only way to see the output of the command is
50 49 with --out/err.
51 50 """
52 51 ),
53 52 magic_arguments.argument(
54 53 '--proc', type=str,
55 54 help="""The variable in which to store Popen instance.
56 55 This is used only when --bg option is given.
57 56 """
58 57 ),
59 58 magic_arguments.argument(
60 59 '--no-raise-error', action="store_false", dest='raise_error',
61 60 help="""Whether you should raise an error message in addition to
62 61 a stream on stderr if you get a nonzero exit code.
63 62 """
64 63 )
65 64 ]
66 65 for arg in args:
67 66 f = arg(f)
68 67 return f
69 68
69
70 @contextmanager
71 def safe_watcher():
72 if sys.platform == "win32":
73 yield
74 return
75
76 from asyncio import SafeChildWatcher
77
78 policy = asyncio.get_event_loop_policy()
79 old_watcher = policy.get_child_watcher()
80 if isinstance(old_watcher, SafeChildWatcher):
81 yield
82 return
83
84 loop = asyncio.get_event_loop()
85 try:
86 watcher = asyncio.SafeChildWatcher()
87 watcher.attach_loop(loop)
88 policy.set_child_watcher(watcher)
89 yield
90 finally:
91 watcher.close()
92 policy.set_child_watcher(old_watcher)
93
94
95 def dec_safe_watcher(fun):
96 @functools.wraps(fun)
97 def _inner(*args, **kwargs):
98 with safe_watcher():
99 return fun(*args, **kwargs)
100
101 return _inner
102
103
70 104 @magics_class
71 105 class ScriptMagics(Magics):
72 106 """Magics for talking to scripts
73 107
74 108 This defines a base `%%script` cell magic for running a cell
75 109 with a program in a subprocess, and registers a few top-level
76 110 magics that call %%script with common interpreters.
77 111 """
78 112 script_magics = List(
79 113 help="""Extra script cell magics to define
80 114
81 115 This generates simple wrappers of `%%script foo` as `%%foo`.
82 116
83 117 If you want to add script magics that aren't on your path,
84 118 specify them in script_paths
85 119 """,
86 120 ).tag(config=True)
87 121 @default('script_magics')
88 122 def _script_magics_default(self):
89 123 """default to a common list of programs"""
90 124
91 125 defaults = [
92 126 'sh',
93 127 'bash',
94 128 'perl',
95 129 'ruby',
96 130 'python',
97 131 'python2',
98 132 'python3',
99 133 'pypy',
100 134 ]
101 135 if os.name == 'nt':
102 136 defaults.extend([
103 137 'cmd',
104 138 ])
105 139
106 140 return defaults
107 141
108 142 script_paths = Dict(
109 143 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
110 144
111 145 Only necessary for items in script_magics where the default path will not
112 146 find the right interpreter.
113 147 """
114 148 ).tag(config=True)
115 149
116 150 def __init__(self, shell=None):
117 151 super(ScriptMagics, self).__init__(shell=shell)
118 152 self._generate_script_magics()
119 153 self.job_manager = BackgroundJobManager()
120 154 self.bg_processes = []
121 155 atexit.register(self.kill_bg_processes)
122 156
123 157 def __del__(self):
124 158 self.kill_bg_processes()
125 159
126 160 def _generate_script_magics(self):
127 161 cell_magics = self.magics['cell']
128 162 for name in self.script_magics:
129 163 cell_magics[name] = self._make_script_magic(name)
130 164
131 165 def _make_script_magic(self, name):
132 166 """make a named magic, that calls %%script with a particular program"""
133 167 # expand to explicit path if necessary:
134 168 script = self.script_paths.get(name, name)
135 169
136 170 @magic_arguments.magic_arguments()
137 171 @script_args
138 172 def named_script_magic(line, cell):
139 173 # if line, add it as cl-flags
140 174 if line:
141 175 line = "%s %s" % (script, line)
142 176 else:
143 177 line = script
144 178 return self.shebang(line, cell)
145 179
146 180 # write a basic docstring:
147 181 named_script_magic.__doc__ = \
148 182 """%%{name} script magic
149 183
150 184 Run cells with {script} in a subprocess.
151 185
152 186 This is a shortcut for `%%script {script}`
153 187 """.format(**locals())
154 188
155 189 return named_script_magic
156 190
157 191 @magic_arguments.magic_arguments()
158 192 @script_args
159 193 @cell_magic("script")
194 @dec_safe_watcher
160 195 def shebang(self, line, cell):
161 196 """Run a cell via a shell command
162 197
163 198 The `%%script` line is like the #! line of script,
164 199 specifying a program (bash, perl, ruby, etc.) with which to run.
165 200
166 201 The rest of the cell is run by that program.
167 202
168 203 Examples
169 204 --------
170 205 ::
171 206
172 207 In [1]: %%script bash
173 208 ...: for i in 1 2 3; do
174 209 ...: echo $i
175 210 ...: done
176 211 1
177 212 2
178 213 3
179 214 """
180 215
181 216 async def _handle_stream(stream, stream_arg, file_object):
182 217 while True:
183 218 line = (await stream.readline()).decode("utf8")
184 219 if not line:
185 220 break
186 221 if stream_arg:
187 222 self.shell.user_ns[stream_arg] = line
188 223 else:
189 224 file_object.write(line)
190 225 file_object.flush()
191 226
192 227 async def _stream_communicate(process, cell):
193 228 process.stdin.write(cell)
194 229 process.stdin.close()
195 230 stdout_task = asyncio.create_task(
196 231 _handle_stream(process.stdout, args.out, sys.stdout)
197 232 )
198 233 stderr_task = asyncio.create_task(
199 234 _handle_stream(process.stderr, args.err, sys.stderr)
200 235 )
201 236 await asyncio.wait([stdout_task, stderr_task])
202 237 await process.wait()
203 238
204 239 if sys.platform.startswith("win"):
205 240 asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
206 241 loop = asyncio.get_event_loop()
207
208 242 argv = arg_split(line, posix=not sys.platform.startswith("win"))
209 243 args, cmd = self.shebang.parser.parse_known_args(argv)
210 244 try:
211 245 p = loop.run_until_complete(
212 246 asyncio.create_subprocess_exec(
213 247 *cmd,
214 248 stdout=asyncio.subprocess.PIPE,
215 249 stderr=asyncio.subprocess.PIPE,
216 250 stdin=asyncio.subprocess.PIPE,
217 251 )
218 252 )
219 253 except OSError as e:
220 254 if e.errno == errno.ENOENT:
221 255 print("Couldn't find program: %r" % cmd[0])
222 256 return
223 257 else:
224 258 raise
225 259
226 260 if not cell.endswith('\n'):
227 261 cell += '\n'
228 262 cell = cell.encode('utf8', 'replace')
229 263 if args.bg:
230 264 self.bg_processes.append(p)
231 265 self._gc_bg_processes()
232 266 to_close = []
233 267 if args.out:
234 268 self.shell.user_ns[args.out] = p.stdout
235 269 else:
236 270 to_close.append(p.stdout)
237 271 if args.err:
238 272 self.shell.user_ns[args.err] = p.stderr
239 273 else:
240 274 to_close.append(p.stderr)
241 275 self.job_manager.new(self._run_script, p, cell, to_close, daemon=True)
242 276 if args.proc:
243 277 self.shell.user_ns[args.proc] = p
244 278 return
245 279
246 280 try:
247 281 loop.run_until_complete(_stream_communicate(p, cell))
248 282 except KeyboardInterrupt:
249 283 try:
250 284 p.send_signal(signal.SIGINT)
251 285 time.sleep(0.1)
252 286 if p.returncode is not None:
253 287 print("Process is interrupted.")
254 288 return
255 289 p.terminate()
256 290 time.sleep(0.1)
257 291 if p.returncode is not None:
258 292 print("Process is terminated.")
259 293 return
260 294 p.kill()
261 295 print("Process is killed.")
262 296 except OSError:
263 297 pass
264 298 except Exception as e:
265 299 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
266 300 return
267 301 if args.raise_error and p.returncode!=0:
268 302 # If we get here and p.returncode is still None, we must have
269 303 # killed it but not yet seen its return code. We don't wait for it,
270 304 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
271 305 rc = p.returncode or -9
272 306 raise CalledProcessError(rc, cell)
273 307
274 308 def _run_script(self, p, cell, to_close):
275 309 """callback for running the script in the background"""
276 310 p.stdin.write(cell)
277 311 p.stdin.close()
278 312 for s in to_close:
279 313 s.close()
280 314 p.wait()
281 315
282 316 @line_magic("killbgscripts")
283 317 def killbgscripts(self, _nouse_=''):
284 318 """Kill all BG processes started by %%script and its family."""
285 319 self.kill_bg_processes()
286 320 print("All background processes were killed.")
287 321
288 322 def kill_bg_processes(self):
289 323 """Kill all BG processes which are still running."""
290 324 if not self.bg_processes:
291 325 return
292 326 for p in self.bg_processes:
293 327 if p.returncode is None:
294 328 try:
295 329 p.send_signal(signal.SIGINT)
296 330 except:
297 331 pass
298 332 time.sleep(0.1)
299 333 self._gc_bg_processes()
300 334 if not self.bg_processes:
301 335 return
302 336 for p in self.bg_processes:
303 337 if p.returncode is None:
304 338 try:
305 339 p.terminate()
306 340 except:
307 341 pass
308 342 time.sleep(0.1)
309 343 self._gc_bg_processes()
310 344 if not self.bg_processes:
311 345 return
312 346 for p in self.bg_processes:
313 347 if p.returncode is None:
314 348 try:
315 349 p.kill()
316 350 except:
317 351 pass
318 352 self._gc_bg_processes()
319 353
320 354 def _gc_bg_processes(self):
321 355 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]
@@ -1,1302 +1,1341 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for various magic functions.
3 3
4 4 Needs to be run by nose (to make ipython session available).
5 5 """
6 6
7 import asyncio
7 8 import io
8 9 import os
9 10 import re
11 import shlex
10 12 import sys
11 13 import warnings
12 from textwrap import dedent
13 from unittest import TestCase
14 from unittest import mock
15 14 from importlib import invalidate_caches
16 15 from io import StringIO
17 16 from pathlib import Path
17 from textwrap import dedent
18 from unittest import TestCase, mock
18 19
19 20 import nose.tools as nt
20 21
21 import shlex
22 import pytest
22 23
23 24 from IPython import get_ipython
24 25 from IPython.core import magic
25 26 from IPython.core.error import UsageError
26 from IPython.core.magic import (Magics, magics_class, line_magic,
27 cell_magic,
28 register_line_magic, register_cell_magic)
29 from IPython.core.magics import execution, script, code, logging, osm
27 from IPython.core.magic import (
28 Magics,
29 cell_magic,
30 line_magic,
31 magics_class,
32 register_cell_magic,
33 register_line_magic,
34 )
35 from IPython.core.magics import code, execution, logging, osm, script
30 36 from IPython.testing import decorators as dec
31 37 from IPython.testing import tools as tt
32 38 from IPython.utils.io import capture_output
33 from IPython.utils.tempdir import (TemporaryDirectory,
34 TemporaryWorkingDirectory)
35 39 from IPython.utils.process import find_cmd
36 from .test_debugger import PdbTestInput
40 from IPython.utils.tempdir import TemporaryDirectory, TemporaryWorkingDirectory
37 41
38 import pytest
42 from .test_debugger import PdbTestInput
39 43
40 44
41 45 @magic.magics_class
42 46 class DummyMagics(magic.Magics): pass
43 47
44 48 def test_extract_code_ranges():
45 49 instr = "1 3 5-6 7-9 10:15 17: :10 10- -13 :"
46 50 expected = [(0, 1),
47 51 (2, 3),
48 52 (4, 6),
49 53 (6, 9),
50 54 (9, 14),
51 55 (16, None),
52 56 (None, 9),
53 57 (9, None),
54 58 (None, 13),
55 59 (None, None)]
56 60 actual = list(code.extract_code_ranges(instr))
57 61 nt.assert_equal(actual, expected)
58 62
59 63 def test_extract_symbols():
60 64 source = """import foo\na = 10\ndef b():\n return 42\n\n\nclass A: pass\n\n\n"""
61 65 symbols_args = ["a", "b", "A", "A,b", "A,a", "z"]
62 66 expected = [([], ['a']),
63 67 (["def b():\n return 42\n"], []),
64 68 (["class A: pass\n"], []),
65 69 (["class A: pass\n", "def b():\n return 42\n"], []),
66 70 (["class A: pass\n"], ['a']),
67 71 ([], ['z'])]
68 72 for symbols, exp in zip(symbols_args, expected):
69 73 nt.assert_equal(code.extract_symbols(source, symbols), exp)
70 74
71 75
72 76 def test_extract_symbols_raises_exception_with_non_python_code():
73 77 source = ("=begin A Ruby program :)=end\n"
74 78 "def hello\n"
75 79 "puts 'Hello world'\n"
76 80 "end")
77 81 with nt.assert_raises(SyntaxError):
78 82 code.extract_symbols(source, "hello")
79 83
80 84
81 85 def test_magic_not_found():
82 86 # magic not found raises UsageError
83 87 with nt.assert_raises(UsageError):
84 88 _ip.magic('doesntexist')
85 89
86 90 # ensure result isn't success when a magic isn't found
87 91 result = _ip.run_cell('%doesntexist')
88 92 assert isinstance(result.error_in_exec, UsageError)
89 93
90 94
91 95 def test_cell_magic_not_found():
92 96 # magic not found raises UsageError
93 97 with nt.assert_raises(UsageError):
94 98 _ip.run_cell_magic('doesntexist', 'line', 'cell')
95 99
96 100 # ensure result isn't success when a magic isn't found
97 101 result = _ip.run_cell('%%doesntexist')
98 102 assert isinstance(result.error_in_exec, UsageError)
99 103
100 104
101 105 def test_magic_error_status():
102 106 def fail(shell):
103 107 1/0
104 108 _ip.register_magic_function(fail)
105 109 result = _ip.run_cell('%fail')
106 110 assert isinstance(result.error_in_exec, ZeroDivisionError)
107 111
108 112
109 113 def test_config():
110 114 """ test that config magic does not raise
111 115 can happen if Configurable init is moved too early into
112 116 Magics.__init__ as then a Config object will be registered as a
113 117 magic.
114 118 """
115 119 ## should not raise.
116 120 _ip.magic('config')
117 121
118 122 def test_config_available_configs():
119 123 """ test that config magic prints available configs in unique and
120 124 sorted order. """
121 125 with capture_output() as captured:
122 126 _ip.magic('config')
123 127
124 128 stdout = captured.stdout
125 129 config_classes = stdout.strip().split('\n')[1:]
126 130 nt.assert_list_equal(config_classes, sorted(set(config_classes)))
127 131
128 132 def test_config_print_class():
129 133 """ test that config with a classname prints the class's options. """
130 134 with capture_output() as captured:
131 135 _ip.magic('config TerminalInteractiveShell')
132 136
133 137 stdout = captured.stdout
134 138 if not re.match("TerminalInteractiveShell.* options", stdout.splitlines()[0]):
135 139 print(stdout)
136 140 raise AssertionError("1st line of stdout not like "
137 141 "'TerminalInteractiveShell.* options'")
138 142
139 143 def test_rehashx():
140 144 # clear up everything
141 145 _ip.alias_manager.clear_aliases()
142 146 del _ip.db['syscmdlist']
143 147
144 148 _ip.magic('rehashx')
145 149 # Practically ALL ipython development systems will have more than 10 aliases
146 150
147 151 nt.assert_true(len(_ip.alias_manager.aliases) > 10)
148 152 for name, cmd in _ip.alias_manager.aliases:
149 153 # we must strip dots from alias names
150 154 nt.assert_not_in('.', name)
151 155
152 156 # rehashx must fill up syscmdlist
153 157 scoms = _ip.db['syscmdlist']
154 158 nt.assert_true(len(scoms) > 10)
155 159
156 160
157 161
158 162 def test_magic_parse_options():
159 163 """Test that we don't mangle paths when parsing magic options."""
160 164 ip = get_ipython()
161 165 path = 'c:\\x'
162 166 m = DummyMagics(ip)
163 167 opts = m.parse_options('-f %s' % path,'f:')[0]
164 168 # argv splitting is os-dependent
165 169 if os.name == 'posix':
166 170 expected = 'c:x'
167 171 else:
168 172 expected = path
169 173 nt.assert_equal(opts['f'], expected)
170 174
171 175 def test_magic_parse_long_options():
172 176 """Magic.parse_options can handle --foo=bar long options"""
173 177 ip = get_ipython()
174 178 m = DummyMagics(ip)
175 179 opts, _ = m.parse_options('--foo --bar=bubble', 'a', 'foo', 'bar=')
176 180 nt.assert_in('foo', opts)
177 181 nt.assert_in('bar', opts)
178 182 nt.assert_equal(opts['bar'], "bubble")
179 183
180 184
181 185 def doctest_hist_f():
182 186 """Test %hist -f with temporary filename.
183 187
184 188 In [9]: import tempfile
185 189
186 190 In [10]: tfile = tempfile.mktemp('.py','tmp-ipython-')
187 191
188 192 In [11]: %hist -nl -f $tfile 3
189 193
190 194 In [13]: import os; os.unlink(tfile)
191 195 """
192 196
193 197
194 198 def doctest_hist_op():
195 199 """Test %hist -op
196 200
197 201 In [1]: class b(float):
198 202 ...: pass
199 203 ...:
200 204
201 205 In [2]: class s(object):
202 206 ...: def __str__(self):
203 207 ...: return 's'
204 208 ...:
205 209
206 210 In [3]:
207 211
208 212 In [4]: class r(b):
209 213 ...: def __repr__(self):
210 214 ...: return 'r'
211 215 ...:
212 216
213 217 In [5]: class sr(s,r): pass
214 218 ...:
215 219
216 220 In [6]:
217 221
218 222 In [7]: bb=b()
219 223
220 224 In [8]: ss=s()
221 225
222 226 In [9]: rr=r()
223 227
224 228 In [10]: ssrr=sr()
225 229
226 230 In [11]: 4.5
227 231 Out[11]: 4.5
228 232
229 233 In [12]: str(ss)
230 234 Out[12]: 's'
231 235
232 236 In [13]:
233 237
234 238 In [14]: %hist -op
235 239 >>> class b:
236 240 ... pass
237 241 ...
238 242 >>> class s(b):
239 243 ... def __str__(self):
240 244 ... return 's'
241 245 ...
242 246 >>>
243 247 >>> class r(b):
244 248 ... def __repr__(self):
245 249 ... return 'r'
246 250 ...
247 251 >>> class sr(s,r): pass
248 252 >>>
249 253 >>> bb=b()
250 254 >>> ss=s()
251 255 >>> rr=r()
252 256 >>> ssrr=sr()
253 257 >>> 4.5
254 258 4.5
255 259 >>> str(ss)
256 260 's'
257 261 >>>
258 262 """
259 263
260 264 def test_hist_pof():
261 265 ip = get_ipython()
262 266 ip.run_cell(u"1+2", store_history=True)
263 267 #raise Exception(ip.history_manager.session_number)
264 268 #raise Exception(list(ip.history_manager._get_range_session()))
265 269 with TemporaryDirectory() as td:
266 270 tf = os.path.join(td, 'hist.py')
267 271 ip.run_line_magic('history', '-pof %s' % tf)
268 272 assert os.path.isfile(tf)
269 273
270 274
271 275 def test_macro():
272 276 ip = get_ipython()
273 277 ip.history_manager.reset() # Clear any existing history.
274 278 cmds = ["a=1", "def b():\n return a**2", "print(a,b())"]
275 279 for i, cmd in enumerate(cmds, start=1):
276 280 ip.history_manager.store_inputs(i, cmd)
277 281 ip.magic("macro test 1-3")
278 282 nt.assert_equal(ip.user_ns["test"].value, "\n".join(cmds)+"\n")
279 283
280 284 # List macros
281 285 nt.assert_in("test", ip.magic("macro"))
282 286
283 287
284 288 def test_macro_run():
285 289 """Test that we can run a multi-line macro successfully."""
286 290 ip = get_ipython()
287 291 ip.history_manager.reset()
288 292 cmds = ["a=10", "a+=1", "print(a)", "%macro test 2-3"]
289 293 for cmd in cmds:
290 294 ip.run_cell(cmd, store_history=True)
291 295 nt.assert_equal(ip.user_ns["test"].value, "a+=1\nprint(a)\n")
292 296 with tt.AssertPrints("12"):
293 297 ip.run_cell("test")
294 298 with tt.AssertPrints("13"):
295 299 ip.run_cell("test")
296 300
297 301
298 302 def test_magic_magic():
299 303 """Test %magic"""
300 304 ip = get_ipython()
301 305 with capture_output() as captured:
302 306 ip.magic("magic")
303 307
304 308 stdout = captured.stdout
305 309 nt.assert_in('%magic', stdout)
306 310 nt.assert_in('IPython', stdout)
307 311 nt.assert_in('Available', stdout)
308 312
309 313
310 314 @dec.skipif_not_numpy
311 315 def test_numpy_reset_array_undec():
312 316 "Test '%reset array' functionality"
313 317 _ip.ex('import numpy as np')
314 318 _ip.ex('a = np.empty(2)')
315 319 nt.assert_in('a', _ip.user_ns)
316 320 _ip.magic('reset -f array')
317 321 nt.assert_not_in('a', _ip.user_ns)
318 322
319 323 def test_reset_out():
320 324 "Test '%reset out' magic"
321 325 _ip.run_cell("parrot = 'dead'", store_history=True)
322 326 # test '%reset -f out', make an Out prompt
323 327 _ip.run_cell("parrot", store_history=True)
324 328 nt.assert_true('dead' in [_ip.user_ns[x] for x in ('_','__','___')])
325 329 _ip.magic('reset -f out')
326 330 nt.assert_false('dead' in [_ip.user_ns[x] for x in ('_','__','___')])
327 331 nt.assert_equal(len(_ip.user_ns['Out']), 0)
328 332
329 333 def test_reset_in():
330 334 "Test '%reset in' magic"
331 335 # test '%reset -f in'
332 336 _ip.run_cell("parrot", store_history=True)
333 337 nt.assert_true('parrot' in [_ip.user_ns[x] for x in ('_i','_ii','_iii')])
334 338 _ip.magic('%reset -f in')
335 339 nt.assert_false('parrot' in [_ip.user_ns[x] for x in ('_i','_ii','_iii')])
336 340 nt.assert_equal(len(set(_ip.user_ns['In'])), 1)
337 341
338 342 def test_reset_dhist():
339 343 "Test '%reset dhist' magic"
340 344 _ip.run_cell("tmp = [d for d in _dh]") # copy before clearing
341 345 _ip.magic('cd ' + os.path.dirname(nt.__file__))
342 346 _ip.magic('cd -')
343 347 nt.assert_true(len(_ip.user_ns['_dh']) > 0)
344 348 _ip.magic('reset -f dhist')
345 349 nt.assert_equal(len(_ip.user_ns['_dh']), 0)
346 350 _ip.run_cell("_dh = [d for d in tmp]") #restore
347 351
348 352 def test_reset_in_length():
349 353 "Test that '%reset in' preserves In[] length"
350 354 _ip.run_cell("print 'foo'")
351 355 _ip.run_cell("reset -f in")
352 356 nt.assert_equal(len(_ip.user_ns['In']), _ip.displayhook.prompt_count+1)
353 357
354 358 class TestResetErrors(TestCase):
355 359
356 360 def test_reset_redefine(self):
357 361
358 362 @magics_class
359 363 class KernelMagics(Magics):
360 364 @line_magic
361 365 def less(self, shell): pass
362 366
363 367 _ip.register_magics(KernelMagics)
364 368
365 369 with self.assertLogs() as cm:
366 370 # hack, we want to just capture logs, but assertLogs fails if not
367 371 # logs get produce.
368 372 # so log one things we ignore.
369 373 import logging as log_mod
370 374 log = log_mod.getLogger()
371 375 log.info('Nothing')
372 376 # end hack.
373 377 _ip.run_cell("reset -f")
374 378
375 379 assert len(cm.output) == 1
376 380 for out in cm.output:
377 381 assert "Invalid alias" not in out
378 382
379 383 def test_tb_syntaxerror():
380 384 """test %tb after a SyntaxError"""
381 385 ip = get_ipython()
382 386 ip.run_cell("for")
383 387
384 388 # trap and validate stdout
385 389 save_stdout = sys.stdout
386 390 try:
387 391 sys.stdout = StringIO()
388 392 ip.run_cell("%tb")
389 393 out = sys.stdout.getvalue()
390 394 finally:
391 395 sys.stdout = save_stdout
392 396 # trim output, and only check the last line
393 397 last_line = out.rstrip().splitlines()[-1].strip()
394 398 nt.assert_equal(last_line, "SyntaxError: invalid syntax")
395 399
396 400
397 401 def test_time():
398 402 ip = get_ipython()
399 403
400 404 with tt.AssertPrints("Wall time: "):
401 405 ip.run_cell("%time None")
402 406
403 407 ip.run_cell("def f(kmjy):\n"
404 408 " %time print (2*kmjy)")
405 409
406 410 with tt.AssertPrints("Wall time: "):
407 411 with tt.AssertPrints("hihi", suppress=False):
408 412 ip.run_cell("f('hi')")
409 413
410 414 def test_time_last_not_expression():
411 415 ip.run_cell("%%time\n"
412 416 "var_1 = 1\n"
413 417 "var_2 = 2\n")
414 418 assert ip.user_ns['var_1'] == 1
415 419 del ip.user_ns['var_1']
416 420 assert ip.user_ns['var_2'] == 2
417 421 del ip.user_ns['var_2']
418 422
419 423
420 424 @dec.skip_win32
421 425 def test_time2():
422 426 ip = get_ipython()
423 427
424 428 with tt.AssertPrints("CPU times: user "):
425 429 ip.run_cell("%time None")
426 430
427 431 def test_time3():
428 432 """Erroneous magic function calls, issue gh-3334"""
429 433 ip = get_ipython()
430 434 ip.user_ns.pop('run', None)
431 435
432 436 with tt.AssertNotPrints("not found", channel='stderr'):
433 437 ip.run_cell("%%time\n"
434 438 "run = 0\n"
435 439 "run += 1")
436 440
437 441 def test_multiline_time():
438 442 """Make sure last statement from time return a value."""
439 443 ip = get_ipython()
440 444 ip.user_ns.pop('run', None)
441 445
442 446 ip.run_cell(dedent("""\
443 447 %%time
444 448 a = "ho"
445 449 b = "hey"
446 450 a+b
447 451 """))
448 452 nt.assert_equal(ip.user_ns_hidden['_'], 'hohey')
449 453
450 454 def test_time_local_ns():
451 455 """
452 456 Test that local_ns is actually global_ns when running a cell magic
453 457 """
454 458 ip = get_ipython()
455 459 ip.run_cell("%%time\n"
456 460 "myvar = 1")
457 461 nt.assert_equal(ip.user_ns['myvar'], 1)
458 462 del ip.user_ns['myvar']
459 463
460 464 def test_doctest_mode():
461 465 "Toggle doctest_mode twice, it should be a no-op and run without error"
462 466 _ip.magic('doctest_mode')
463 467 _ip.magic('doctest_mode')
464 468
465 469
466 470 def test_parse_options():
467 471 """Tests for basic options parsing in magics."""
468 472 # These are only the most minimal of tests, more should be added later. At
469 473 # the very least we check that basic text/unicode calls work OK.
470 474 m = DummyMagics(_ip)
471 475 nt.assert_equal(m.parse_options('foo', '')[1], 'foo')
472 476 nt.assert_equal(m.parse_options(u'foo', '')[1], u'foo')
473 477
474 478
475 479 def test_parse_options_preserve_non_option_string():
476 480 """Test to assert preservation of non-option part of magic-block, while parsing magic options."""
477 481 m = DummyMagics(_ip)
478 482 opts, stmt = m.parse_options(
479 483 " -n1 -r 13 _ = 314 + foo", "n:r:", preserve_non_opts=True
480 484 )
481 485 nt.assert_equal(opts, {"n": "1", "r": "13"})
482 486 nt.assert_equal(stmt, "_ = 314 + foo")
483 487
484 488
485 489 def test_run_magic_preserve_code_block():
486 490 """Test to assert preservation of non-option part of magic-block, while running magic."""
487 491 _ip.user_ns["spaces"] = []
488 492 _ip.magic("timeit -n1 -r1 spaces.append([s.count(' ') for s in ['document']])")
489 493 assert _ip.user_ns["spaces"] == [[0]]
490 494
491 495
492 496 def test_dirops():
493 497 """Test various directory handling operations."""
494 498 # curpath = lambda :os.path.splitdrive(os.getcwd())[1].replace('\\','/')
495 499 curpath = os.getcwd
496 500 startdir = os.getcwd()
497 501 ipdir = os.path.realpath(_ip.ipython_dir)
498 502 try:
499 503 _ip.magic('cd "%s"' % ipdir)
500 504 nt.assert_equal(curpath(), ipdir)
501 505 _ip.magic('cd -')
502 506 nt.assert_equal(curpath(), startdir)
503 507 _ip.magic('pushd "%s"' % ipdir)
504 508 nt.assert_equal(curpath(), ipdir)
505 509 _ip.magic('popd')
506 510 nt.assert_equal(curpath(), startdir)
507 511 finally:
508 512 os.chdir(startdir)
509 513
510 514
511 515 def test_cd_force_quiet():
512 516 """Test OSMagics.cd_force_quiet option"""
513 517 _ip.config.OSMagics.cd_force_quiet = True
514 518 osmagics = osm.OSMagics(shell=_ip)
515 519
516 520 startdir = os.getcwd()
517 521 ipdir = os.path.realpath(_ip.ipython_dir)
518 522
519 523 try:
520 524 with tt.AssertNotPrints(ipdir):
521 525 osmagics.cd('"%s"' % ipdir)
522 526 with tt.AssertNotPrints(startdir):
523 527 osmagics.cd('-')
524 528 finally:
525 529 os.chdir(startdir)
526 530
527 531
528 532 def test_xmode():
529 533 # Calling xmode three times should be a no-op
530 534 xmode = _ip.InteractiveTB.mode
531 535 for i in range(4):
532 536 _ip.magic("xmode")
533 537 nt.assert_equal(_ip.InteractiveTB.mode, xmode)
534 538
535 539 def test_reset_hard():
536 540 monitor = []
537 541 class A(object):
538 542 def __del__(self):
539 543 monitor.append(1)
540 544 def __repr__(self):
541 545 return "<A instance>"
542 546
543 547 _ip.user_ns["a"] = A()
544 548 _ip.run_cell("a")
545 549
546 550 nt.assert_equal(monitor, [])
547 551 _ip.magic("reset -f")
548 552 nt.assert_equal(monitor, [1])
549 553
550 554 class TestXdel(tt.TempFileMixin):
551 555 def test_xdel(self):
552 556 """Test that references from %run are cleared by xdel."""
553 557 src = ("class A(object):\n"
554 558 " monitor = []\n"
555 559 " def __del__(self):\n"
556 560 " self.monitor.append(1)\n"
557 561 "a = A()\n")
558 562 self.mktmp(src)
559 563 # %run creates some hidden references...
560 564 _ip.magic("run %s" % self.fname)
561 565 # ... as does the displayhook.
562 566 _ip.run_cell("a")
563 567
564 568 monitor = _ip.user_ns["A"].monitor
565 569 nt.assert_equal(monitor, [])
566 570
567 571 _ip.magic("xdel a")
568 572
569 573 # Check that a's __del__ method has been called.
570 574 nt.assert_equal(monitor, [1])
571 575
572 576 def doctest_who():
573 577 """doctest for %who
574 578
575 579 In [1]: %reset -f
576 580
577 581 In [2]: alpha = 123
578 582
579 583 In [3]: beta = 'beta'
580 584
581 585 In [4]: %who int
582 586 alpha
583 587
584 588 In [5]: %who str
585 589 beta
586 590
587 591 In [6]: %whos
588 592 Variable Type Data/Info
589 593 ----------------------------
590 594 alpha int 123
591 595 beta str beta
592 596
593 597 In [7]: %who_ls
594 598 Out[7]: ['alpha', 'beta']
595 599 """
596 600
597 601 def test_whos():
598 602 """Check that whos is protected against objects where repr() fails."""
599 603 class A(object):
600 604 def __repr__(self):
601 605 raise Exception()
602 606 _ip.user_ns['a'] = A()
603 607 _ip.magic("whos")
604 608
605 609 def doctest_precision():
606 610 """doctest for %precision
607 611
608 612 In [1]: f = get_ipython().display_formatter.formatters['text/plain']
609 613
610 614 In [2]: %precision 5
611 615 Out[2]: '%.5f'
612 616
613 617 In [3]: f.float_format
614 618 Out[3]: '%.5f'
615 619
616 620 In [4]: %precision %e
617 621 Out[4]: '%e'
618 622
619 623 In [5]: f(3.1415927)
620 624 Out[5]: '3.141593e+00'
621 625 """
622 626
623 627 def test_debug_magic():
624 628 """Test debugging a small code with %debug
625 629
626 630 In [1]: with PdbTestInput(['c']):
627 631 ...: %debug print("a b") #doctest: +ELLIPSIS
628 632 ...:
629 633 ...
630 634 ipdb> c
631 635 a b
632 636 In [2]:
633 637 """
634 638
635 639 def test_psearch():
636 640 with tt.AssertPrints("dict.fromkeys"):
637 641 _ip.run_cell("dict.fr*?")
638 642 with tt.AssertPrints("Ο€.is_integer"):
639 643 _ip.run_cell("Ο€ = 3.14;\nΟ€.is_integ*?")
640 644
641 645 def test_timeit_shlex():
642 646 """test shlex issues with timeit (#1109)"""
643 647 _ip.ex("def f(*a,**kw): pass")
644 648 _ip.magic('timeit -n1 "this is a bug".count(" ")')
645 649 _ip.magic('timeit -r1 -n1 f(" ", 1)')
646 650 _ip.magic('timeit -r1 -n1 f(" ", 1, " ", 2, " ")')
647 651 _ip.magic('timeit -r1 -n1 ("a " + "b")')
648 652 _ip.magic('timeit -r1 -n1 f("a " + "b")')
649 653 _ip.magic('timeit -r1 -n1 f("a " + "b ")')
650 654
651 655
652 656 def test_timeit_special_syntax():
653 657 "Test %%timeit with IPython special syntax"
654 658 @register_line_magic
655 659 def lmagic(line):
656 660 ip = get_ipython()
657 661 ip.user_ns['lmagic_out'] = line
658 662
659 663 # line mode test
660 664 _ip.run_line_magic('timeit', '-n1 -r1 %lmagic my line')
661 665 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line')
662 666 # cell mode test
663 667 _ip.run_cell_magic('timeit', '-n1 -r1', '%lmagic my line2')
664 668 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line2')
665 669
666 670 def test_timeit_return():
667 671 """
668 672 test whether timeit -o return object
669 673 """
670 674
671 675 res = _ip.run_line_magic('timeit','-n10 -r10 -o 1')
672 676 assert(res is not None)
673 677
674 678 def test_timeit_quiet():
675 679 """
676 680 test quiet option of timeit magic
677 681 """
678 682 with tt.AssertNotPrints("loops"):
679 683 _ip.run_cell("%timeit -n1 -r1 -q 1")
680 684
681 685 def test_timeit_return_quiet():
682 686 with tt.AssertNotPrints("loops"):
683 687 res = _ip.run_line_magic('timeit', '-n1 -r1 -q -o 1')
684 688 assert (res is not None)
685 689
686 690 def test_timeit_invalid_return():
687 691 with nt.assert_raises_regex(SyntaxError, "outside function"):
688 692 _ip.run_line_magic('timeit', 'return')
689 693
690 694 @dec.skipif(execution.profile is None)
691 695 def test_prun_special_syntax():
692 696 "Test %%prun with IPython special syntax"
693 697 @register_line_magic
694 698 def lmagic(line):
695 699 ip = get_ipython()
696 700 ip.user_ns['lmagic_out'] = line
697 701
698 702 # line mode test
699 703 _ip.run_line_magic('prun', '-q %lmagic my line')
700 704 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line')
701 705 # cell mode test
702 706 _ip.run_cell_magic('prun', '-q', '%lmagic my line2')
703 707 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line2')
704 708
705 709 @dec.skipif(execution.profile is None)
706 710 def test_prun_quotes():
707 711 "Test that prun does not clobber string escapes (GH #1302)"
708 712 _ip.magic(r"prun -q x = '\t'")
709 713 nt.assert_equal(_ip.user_ns['x'], '\t')
710 714
711 715 def test_extension():
712 716 # Debugging information for failures of this test
713 717 print('sys.path:')
714 718 for p in sys.path:
715 719 print(' ', p)
716 720 print('CWD', os.getcwd())
717 721
718 722 nt.assert_raises(ImportError, _ip.magic, "load_ext daft_extension")
719 723 daft_path = os.path.join(os.path.dirname(__file__), "daft_extension")
720 724 sys.path.insert(0, daft_path)
721 725 try:
722 726 _ip.user_ns.pop('arq', None)
723 727 invalidate_caches() # Clear import caches
724 728 _ip.magic("load_ext daft_extension")
725 729 nt.assert_equal(_ip.user_ns['arq'], 185)
726 730 _ip.magic("unload_ext daft_extension")
727 731 assert 'arq' not in _ip.user_ns
728 732 finally:
729 733 sys.path.remove(daft_path)
730 734
731 735
732 736 def test_notebook_export_json():
733 737 _ip = get_ipython()
734 738 _ip.history_manager.reset() # Clear any existing history.
735 739 cmds = [u"a=1", u"def b():\n return a**2", u"print('noΓ«l, Γ©tΓ©', b())"]
736 740 for i, cmd in enumerate(cmds, start=1):
737 741 _ip.history_manager.store_inputs(i, cmd)
738 742 with TemporaryDirectory() as td:
739 743 outfile = os.path.join(td, "nb.ipynb")
740 744 _ip.magic("notebook -e %s" % outfile)
741 745
742 746
743 747 class TestEnv(TestCase):
744 748
745 749 def test_env(self):
746 750 env = _ip.magic("env")
747 751 self.assertTrue(isinstance(env, dict))
748 752
749 753 def test_env_secret(self):
750 754 env = _ip.magic("env")
751 755 hidden = "<hidden>"
752 756 with mock.patch.dict(
753 757 os.environ,
754 758 {
755 759 "API_KEY": "abc123",
756 760 "SECRET_THING": "ssshhh",
757 761 "JUPYTER_TOKEN": "",
758 762 "VAR": "abc"
759 763 }
760 764 ):
761 765 env = _ip.magic("env")
762 766 assert env["API_KEY"] == hidden
763 767 assert env["SECRET_THING"] == hidden
764 768 assert env["JUPYTER_TOKEN"] == hidden
765 769 assert env["VAR"] == "abc"
766 770
767 771 def test_env_get_set_simple(self):
768 772 env = _ip.magic("env var val1")
769 773 self.assertEqual(env, None)
770 774 self.assertEqual(os.environ['var'], 'val1')
771 775 self.assertEqual(_ip.magic("env var"), 'val1')
772 776 env = _ip.magic("env var=val2")
773 777 self.assertEqual(env, None)
774 778 self.assertEqual(os.environ['var'], 'val2')
775 779
776 780 def test_env_get_set_complex(self):
777 781 env = _ip.magic("env var 'val1 '' 'val2")
778 782 self.assertEqual(env, None)
779 783 self.assertEqual(os.environ['var'], "'val1 '' 'val2")
780 784 self.assertEqual(_ip.magic("env var"), "'val1 '' 'val2")
781 785 env = _ip.magic('env var=val2 val3="val4')
782 786 self.assertEqual(env, None)
783 787 self.assertEqual(os.environ['var'], 'val2 val3="val4')
784 788
785 789 def test_env_set_bad_input(self):
786 790 self.assertRaises(UsageError, lambda: _ip.magic("set_env var"))
787 791
788 792 def test_env_set_whitespace(self):
789 793 self.assertRaises(UsageError, lambda: _ip.magic("env var A=B"))
790 794
791 795
792 796 class CellMagicTestCase(TestCase):
793 797
794 798 def check_ident(self, magic):
795 799 # Manually called, we get the result
796 800 out = _ip.run_cell_magic(magic, 'a', 'b')
797 801 nt.assert_equal(out, ('a','b'))
798 802 # Via run_cell, it goes into the user's namespace via displayhook
799 803 _ip.run_cell('%%' + magic +' c\nd\n')
800 804 nt.assert_equal(_ip.user_ns['_'], ('c','d\n'))
801 805
802 806 def test_cell_magic_func_deco(self):
803 807 "Cell magic using simple decorator"
804 808 @register_cell_magic
805 809 def cellm(line, cell):
806 810 return line, cell
807 811
808 812 self.check_ident('cellm')
809 813
810 814 def test_cell_magic_reg(self):
811 815 "Cell magic manually registered"
812 816 def cellm(line, cell):
813 817 return line, cell
814 818
815 819 _ip.register_magic_function(cellm, 'cell', 'cellm2')
816 820 self.check_ident('cellm2')
817 821
818 822 def test_cell_magic_class(self):
819 823 "Cell magics declared via a class"
820 824 @magics_class
821 825 class MyMagics(Magics):
822 826
823 827 @cell_magic
824 828 def cellm3(self, line, cell):
825 829 return line, cell
826 830
827 831 _ip.register_magics(MyMagics)
828 832 self.check_ident('cellm3')
829 833
830 834 def test_cell_magic_class2(self):
831 835 "Cell magics declared via a class, #2"
832 836 @magics_class
833 837 class MyMagics2(Magics):
834 838
835 839 @cell_magic('cellm4')
836 840 def cellm33(self, line, cell):
837 841 return line, cell
838 842
839 843 _ip.register_magics(MyMagics2)
840 844 self.check_ident('cellm4')
841 845 # Check that nothing is registered as 'cellm33'
842 846 c33 = _ip.find_cell_magic('cellm33')
843 847 nt.assert_equal(c33, None)
844 848
845 849 def test_file():
846 850 """Basic %%writefile"""
847 851 ip = get_ipython()
848 852 with TemporaryDirectory() as td:
849 853 fname = os.path.join(td, 'file1')
850 854 ip.run_cell_magic("writefile", fname, u'\n'.join([
851 855 'line1',
852 856 'line2',
853 857 ]))
854 858 s = Path(fname).read_text()
855 859 nt.assert_in('line1\n', s)
856 860 nt.assert_in('line2', s)
857 861
858 862 @dec.skip_win32
859 863 def test_file_single_quote():
860 864 """Basic %%writefile with embedded single quotes"""
861 865 ip = get_ipython()
862 866 with TemporaryDirectory() as td:
863 867 fname = os.path.join(td, '\'file1\'')
864 868 ip.run_cell_magic("writefile", fname, u'\n'.join([
865 869 'line1',
866 870 'line2',
867 871 ]))
868 872 s = Path(fname).read_text()
869 873 nt.assert_in('line1\n', s)
870 874 nt.assert_in('line2', s)
871 875
872 876 @dec.skip_win32
873 877 def test_file_double_quote():
874 878 """Basic %%writefile with embedded double quotes"""
875 879 ip = get_ipython()
876 880 with TemporaryDirectory() as td:
877 881 fname = os.path.join(td, '"file1"')
878 882 ip.run_cell_magic("writefile", fname, u'\n'.join([
879 883 'line1',
880 884 'line2',
881 885 ]))
882 886 s = Path(fname).read_text()
883 887 nt.assert_in('line1\n', s)
884 888 nt.assert_in('line2', s)
885 889
886 890 def test_file_var_expand():
887 891 """%%writefile $filename"""
888 892 ip = get_ipython()
889 893 with TemporaryDirectory() as td:
890 894 fname = os.path.join(td, 'file1')
891 895 ip.user_ns['filename'] = fname
892 896 ip.run_cell_magic("writefile", '$filename', u'\n'.join([
893 897 'line1',
894 898 'line2',
895 899 ]))
896 900 s = Path(fname).read_text()
897 901 nt.assert_in('line1\n', s)
898 902 nt.assert_in('line2', s)
899 903
900 904 def test_file_unicode():
901 905 """%%writefile with unicode cell"""
902 906 ip = get_ipython()
903 907 with TemporaryDirectory() as td:
904 908 fname = os.path.join(td, 'file1')
905 909 ip.run_cell_magic("writefile", fname, u'\n'.join([
906 910 u'linΓ©1',
907 911 u'linΓ©2',
908 912 ]))
909 913 with io.open(fname, encoding='utf-8') as f:
910 914 s = f.read()
911 915 nt.assert_in(u'linΓ©1\n', s)
912 916 nt.assert_in(u'linΓ©2', s)
913 917
914 918 def test_file_amend():
915 919 """%%writefile -a amends files"""
916 920 ip = get_ipython()
917 921 with TemporaryDirectory() as td:
918 922 fname = os.path.join(td, 'file2')
919 923 ip.run_cell_magic("writefile", fname, u'\n'.join([
920 924 'line1',
921 925 'line2',
922 926 ]))
923 927 ip.run_cell_magic("writefile", "-a %s" % fname, u'\n'.join([
924 928 'line3',
925 929 'line4',
926 930 ]))
927 931 s = Path(fname).read_text()
928 932 nt.assert_in('line1\n', s)
929 933 nt.assert_in('line3\n', s)
930 934
931 935 def test_file_spaces():
932 936 """%%file with spaces in filename"""
933 937 ip = get_ipython()
934 938 with TemporaryWorkingDirectory() as td:
935 939 fname = "file name"
936 940 ip.run_cell_magic("file", '"%s"'%fname, u'\n'.join([
937 941 'line1',
938 942 'line2',
939 943 ]))
940 944 s = Path(fname).read_text()
941 945 nt.assert_in('line1\n', s)
942 946 nt.assert_in('line2', s)
943 947
944 948 def test_script_config():
945 949 ip = get_ipython()
946 950 ip.config.ScriptMagics.script_magics = ['whoda']
947 951 sm = script.ScriptMagics(shell=ip)
948 952 nt.assert_in('whoda', sm.magics['cell'])
949 953
954 @dec.skip_iptest_but_not_pytest
950 955 @dec.skip_win32
956 @pytest.mark.skipif(
957 sys.platform == "win32", reason="This test does not run under Windows"
958 )
951 959 def test_script_out():
960 assert asyncio.get_event_loop().is_running() is False
961
952 962 ip = get_ipython()
953 963 ip.run_cell_magic("script", "--out output sh", "echo 'hi'")
964 assert asyncio.get_event_loop().is_running() is False
954 965 nt.assert_equal(ip.user_ns['output'], 'hi\n')
955 966
967 @dec.skip_iptest_but_not_pytest
956 968 @dec.skip_win32
969 @pytest.mark.skipif(
970 sys.platform == "win32", reason="This test does not run under Windows"
971 )
957 972 def test_script_err():
958 973 ip = get_ipython()
974 assert asyncio.get_event_loop().is_running() is False
959 975 ip.run_cell_magic("script", "--err error sh", "echo 'hello' >&2")
976 assert asyncio.get_event_loop().is_running() is False
960 977 nt.assert_equal(ip.user_ns['error'], 'hello\n')
961 978
979
980 @dec.skip_iptest_but_not_pytest
962 981 @dec.skip_win32
982 @pytest.mark.skipif(
983 sys.platform == "win32", reason="This test does not run under Windows"
984 )
963 985 def test_script_out_err():
986
964 987 ip = get_ipython()
965 ip.run_cell_magic("script", "--out output --err error sh", "echo 'hi'\necho 'hello' >&2")
966 nt.assert_equal(ip.user_ns['output'], 'hi\n')
967 nt.assert_equal(ip.user_ns['error'], 'hello\n')
988 ip.run_cell_magic(
989 "script", "--out output --err error sh", "echo 'hi'\necho 'hello' >&2"
990 )
991 nt.assert_equal(ip.user_ns["output"], "hi\n")
992 nt.assert_equal(ip.user_ns["error"], "hello\n")
968 993
994
995 @dec.skip_iptest_but_not_pytest
969 996 @dec.skip_win32
997 @pytest.mark.skipif(
998 sys.platform == "win32", reason="This test does not run under Windows"
999 )
970 1000 async def test_script_bg_out():
971 1001 ip = get_ipython()
972 1002 ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'")
973 1003 nt.assert_equal((await ip.user_ns["output"].read()), b"hi\n")
974 ip.user_ns['output'].close()
1004 ip.user_ns["output"].close()
1005 asyncio.get_event_loop().stop()
975 1006
1007 @dec.skip_iptest_but_not_pytest
976 1008 @dec.skip_win32
1009 @pytest.mark.skipif(
1010 sys.platform == "win32", reason="This test does not run under Windows"
1011 )
977 1012 async def test_script_bg_err():
978 1013 ip = get_ipython()
979 1014 ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2")
980 1015 nt.assert_equal((await ip.user_ns["error"].read()), b"hello\n")
981 1016 ip.user_ns["error"].close()
982 1017
983 1018
1019 @dec.skip_iptest_but_not_pytest
984 1020 @dec.skip_win32
1021 @pytest.mark.skipif(
1022 sys.platform == "win32", reason="This test does not run under Windows"
1023 )
985 1024 async def test_script_bg_out_err():
986 1025 ip = get_ipython()
987 1026 ip.run_cell_magic(
988 1027 "script", "--bg --out output --err error sh", "echo 'hi'\necho 'hello' >&2"
989 1028 )
990 1029 nt.assert_equal((await ip.user_ns["output"].read()), b"hi\n")
991 1030 nt.assert_equal((await ip.user_ns["error"].read()), b"hello\n")
992 1031 ip.user_ns["output"].close()
993 1032 ip.user_ns["error"].close()
994 1033
995 1034
996 1035 def test_script_defaults():
997 1036 ip = get_ipython()
998 1037 for cmd in ['sh', 'bash', 'perl', 'ruby']:
999 1038 try:
1000 1039 find_cmd(cmd)
1001 1040 except Exception:
1002 1041 pass
1003 1042 else:
1004 1043 nt.assert_in(cmd, ip.magics_manager.magics['cell'])
1005 1044
1006 1045
1007 1046 @magics_class
1008 1047 class FooFoo(Magics):
1009 1048 """class with both %foo and %%foo magics"""
1010 1049 @line_magic('foo')
1011 1050 def line_foo(self, line):
1012 1051 "I am line foo"
1013 1052 pass
1014 1053
1015 1054 @cell_magic("foo")
1016 1055 def cell_foo(self, line, cell):
1017 1056 "I am cell foo, not line foo"
1018 1057 pass
1019 1058
1020 1059 def test_line_cell_info():
1021 1060 """%%foo and %foo magics are distinguishable to inspect"""
1022 1061 ip = get_ipython()
1023 1062 ip.magics_manager.register(FooFoo)
1024 1063 oinfo = ip.object_inspect('foo')
1025 1064 nt.assert_true(oinfo['found'])
1026 1065 nt.assert_true(oinfo['ismagic'])
1027 1066
1028 1067 oinfo = ip.object_inspect('%%foo')
1029 1068 nt.assert_true(oinfo['found'])
1030 1069 nt.assert_true(oinfo['ismagic'])
1031 1070 nt.assert_equal(oinfo['docstring'], FooFoo.cell_foo.__doc__)
1032 1071
1033 1072 oinfo = ip.object_inspect('%foo')
1034 1073 nt.assert_true(oinfo['found'])
1035 1074 nt.assert_true(oinfo['ismagic'])
1036 1075 nt.assert_equal(oinfo['docstring'], FooFoo.line_foo.__doc__)
1037 1076
1038 1077 def test_multiple_magics():
1039 1078 ip = get_ipython()
1040 1079 foo1 = FooFoo(ip)
1041 1080 foo2 = FooFoo(ip)
1042 1081 mm = ip.magics_manager
1043 1082 mm.register(foo1)
1044 1083 nt.assert_true(mm.magics['line']['foo'].__self__ is foo1)
1045 1084 mm.register(foo2)
1046 1085 nt.assert_true(mm.magics['line']['foo'].__self__ is foo2)
1047 1086
1048 1087 def test_alias_magic():
1049 1088 """Test %alias_magic."""
1050 1089 ip = get_ipython()
1051 1090 mm = ip.magics_manager
1052 1091
1053 1092 # Basic operation: both cell and line magics are created, if possible.
1054 1093 ip.run_line_magic('alias_magic', 'timeit_alias timeit')
1055 1094 nt.assert_in('timeit_alias', mm.magics['line'])
1056 1095 nt.assert_in('timeit_alias', mm.magics['cell'])
1057 1096
1058 1097 # --cell is specified, line magic not created.
1059 1098 ip.run_line_magic('alias_magic', '--cell timeit_cell_alias timeit')
1060 1099 nt.assert_not_in('timeit_cell_alias', mm.magics['line'])
1061 1100 nt.assert_in('timeit_cell_alias', mm.magics['cell'])
1062 1101
1063 1102 # Test that line alias is created successfully.
1064 1103 ip.run_line_magic('alias_magic', '--line env_alias env')
1065 1104 nt.assert_equal(ip.run_line_magic('env', ''),
1066 1105 ip.run_line_magic('env_alias', ''))
1067 1106
1068 1107 # Test that line alias with parameters passed in is created successfully.
1069 1108 ip.run_line_magic('alias_magic', '--line history_alias history --params ' + shlex.quote('3'))
1070 1109 nt.assert_in('history_alias', mm.magics['line'])
1071 1110
1072 1111
1073 1112 def test_save():
1074 1113 """Test %save."""
1075 1114 ip = get_ipython()
1076 1115 ip.history_manager.reset() # Clear any existing history.
1077 1116 cmds = [u"a=1", u"def b():\n return a**2", u"print(a, b())"]
1078 1117 for i, cmd in enumerate(cmds, start=1):
1079 1118 ip.history_manager.store_inputs(i, cmd)
1080 1119 with TemporaryDirectory() as tmpdir:
1081 1120 file = os.path.join(tmpdir, "testsave.py")
1082 1121 ip.run_line_magic("save", "%s 1-10" % file)
1083 1122 content = Path(file).read_text()
1084 1123 nt.assert_equal(content.count(cmds[0]), 1)
1085 1124 nt.assert_in("coding: utf-8", content)
1086 1125 ip.run_line_magic("save", "-a %s 1-10" % file)
1087 1126 content = Path(file).read_text()
1088 1127 nt.assert_equal(content.count(cmds[0]), 2)
1089 1128 nt.assert_in("coding: utf-8", content)
1090 1129
1091 1130
1092 1131 def test_save_with_no_args():
1093 1132 ip = get_ipython()
1094 1133 ip.history_manager.reset() # Clear any existing history.
1095 1134 cmds = [u"a=1", u"def b():\n return a**2", u"print(a, b())", "%save"]
1096 1135 for i, cmd in enumerate(cmds, start=1):
1097 1136 ip.history_manager.store_inputs(i, cmd)
1098 1137
1099 1138 with TemporaryDirectory() as tmpdir:
1100 1139 path = os.path.join(tmpdir, "testsave.py")
1101 1140 ip.run_line_magic("save", path)
1102 1141 content = Path(path).read_text()
1103 1142 expected_content = dedent(
1104 1143 """\
1105 1144 # coding: utf-8
1106 1145 a=1
1107 1146 def b():
1108 1147 return a**2
1109 1148 print(a, b())
1110 1149 """
1111 1150 )
1112 1151 nt.assert_equal(content, expected_content)
1113 1152
1114 1153
1115 1154 def test_store():
1116 1155 """Test %store."""
1117 1156 ip = get_ipython()
1118 1157 ip.run_line_magic('load_ext', 'storemagic')
1119 1158
1120 1159 # make sure the storage is empty
1121 1160 ip.run_line_magic('store', '-z')
1122 1161 ip.user_ns['var'] = 42
1123 1162 ip.run_line_magic('store', 'var')
1124 1163 ip.user_ns['var'] = 39
1125 1164 ip.run_line_magic('store', '-r')
1126 1165 nt.assert_equal(ip.user_ns['var'], 42)
1127 1166
1128 1167 ip.run_line_magic('store', '-d var')
1129 1168 ip.user_ns['var'] = 39
1130 1169 ip.run_line_magic('store' , '-r')
1131 1170 nt.assert_equal(ip.user_ns['var'], 39)
1132 1171
1133 1172
1134 1173 def _run_edit_test(arg_s, exp_filename=None,
1135 1174 exp_lineno=-1,
1136 1175 exp_contents=None,
1137 1176 exp_is_temp=None):
1138 1177 ip = get_ipython()
1139 1178 M = code.CodeMagics(ip)
1140 1179 last_call = ['','']
1141 1180 opts,args = M.parse_options(arg_s,'prxn:')
1142 1181 filename, lineno, is_temp = M._find_edit_target(ip, args, opts, last_call)
1143 1182
1144 1183 if exp_filename is not None:
1145 1184 nt.assert_equal(exp_filename, filename)
1146 1185 if exp_contents is not None:
1147 1186 with io.open(filename, 'r', encoding='utf-8') as f:
1148 1187 contents = f.read()
1149 1188 nt.assert_equal(exp_contents, contents)
1150 1189 if exp_lineno != -1:
1151 1190 nt.assert_equal(exp_lineno, lineno)
1152 1191 if exp_is_temp is not None:
1153 1192 nt.assert_equal(exp_is_temp, is_temp)
1154 1193
1155 1194
1156 1195 def test_edit_interactive():
1157 1196 """%edit on interactively defined objects"""
1158 1197 ip = get_ipython()
1159 1198 n = ip.execution_count
1160 1199 ip.run_cell(u"def foo(): return 1", store_history=True)
1161 1200
1162 1201 try:
1163 1202 _run_edit_test("foo")
1164 1203 except code.InteractivelyDefined as e:
1165 1204 nt.assert_equal(e.index, n)
1166 1205 else:
1167 1206 raise AssertionError("Should have raised InteractivelyDefined")
1168 1207
1169 1208
1170 1209 def test_edit_cell():
1171 1210 """%edit [cell id]"""
1172 1211 ip = get_ipython()
1173 1212
1174 1213 ip.run_cell(u"def foo(): return 1", store_history=True)
1175 1214
1176 1215 # test
1177 1216 _run_edit_test("1", exp_contents=ip.user_ns['In'][1], exp_is_temp=True)
1178 1217
1179 1218 def test_edit_fname():
1180 1219 """%edit file"""
1181 1220 # test
1182 1221 _run_edit_test("test file.py", exp_filename="test file.py")
1183 1222
1184 1223 def test_bookmark():
1185 1224 ip = get_ipython()
1186 1225 ip.run_line_magic('bookmark', 'bmname')
1187 1226 with tt.AssertPrints('bmname'):
1188 1227 ip.run_line_magic('bookmark', '-l')
1189 1228 ip.run_line_magic('bookmark', '-d bmname')
1190 1229
1191 1230 def test_ls_magic():
1192 1231 ip = get_ipython()
1193 1232 json_formatter = ip.display_formatter.formatters['application/json']
1194 1233 json_formatter.enabled = True
1195 1234 lsmagic = ip.magic('lsmagic')
1196 1235 with warnings.catch_warnings(record=True) as w:
1197 1236 j = json_formatter(lsmagic)
1198 1237 nt.assert_equal(sorted(j), ['cell', 'line'])
1199 1238 nt.assert_equal(w, []) # no warnings
1200 1239
1201 1240 def test_strip_initial_indent():
1202 1241 def sii(s):
1203 1242 lines = s.splitlines()
1204 1243 return '\n'.join(code.strip_initial_indent(lines))
1205 1244
1206 1245 nt.assert_equal(sii(" a = 1\nb = 2"), "a = 1\nb = 2")
1207 1246 nt.assert_equal(sii(" a\n b\nc"), "a\n b\nc")
1208 1247 nt.assert_equal(sii("a\n b"), "a\n b")
1209 1248
1210 1249 def test_logging_magic_quiet_from_arg():
1211 1250 _ip.config.LoggingMagics.quiet = False
1212 1251 lm = logging.LoggingMagics(shell=_ip)
1213 1252 with TemporaryDirectory() as td:
1214 1253 try:
1215 1254 with tt.AssertNotPrints(re.compile("Activating.*")):
1216 1255 lm.logstart('-q {}'.format(
1217 1256 os.path.join(td, "quiet_from_arg.log")))
1218 1257 finally:
1219 1258 _ip.logger.logstop()
1220 1259
1221 1260 def test_logging_magic_quiet_from_config():
1222 1261 _ip.config.LoggingMagics.quiet = True
1223 1262 lm = logging.LoggingMagics(shell=_ip)
1224 1263 with TemporaryDirectory() as td:
1225 1264 try:
1226 1265 with tt.AssertNotPrints(re.compile("Activating.*")):
1227 1266 lm.logstart(os.path.join(td, "quiet_from_config.log"))
1228 1267 finally:
1229 1268 _ip.logger.logstop()
1230 1269
1231 1270
1232 1271 def test_logging_magic_not_quiet():
1233 1272 _ip.config.LoggingMagics.quiet = False
1234 1273 lm = logging.LoggingMagics(shell=_ip)
1235 1274 with TemporaryDirectory() as td:
1236 1275 try:
1237 1276 with tt.AssertPrints(re.compile("Activating.*")):
1238 1277 lm.logstart(os.path.join(td, "not_quiet.log"))
1239 1278 finally:
1240 1279 _ip.logger.logstop()
1241 1280
1242 1281
1243 1282 def test_time_no_var_expand():
1244 1283 _ip.user_ns['a'] = 5
1245 1284 _ip.user_ns['b'] = []
1246 1285 _ip.magic('time b.append("{a}")')
1247 1286 assert _ip.user_ns['b'] == ['{a}']
1248 1287
1249 1288
1250 1289 # this is slow, put at the end for local testing.
1251 1290 def test_timeit_arguments():
1252 1291 "Test valid timeit arguments, should not cause SyntaxError (GH #1269)"
1253 1292 if sys.version_info < (3,7):
1254 1293 _ip.magic("timeit -n1 -r1 ('#')")
1255 1294 else:
1256 1295 # 3.7 optimize no-op statement like above out, and complain there is
1257 1296 # nothing in the for loop.
1258 1297 _ip.magic("timeit -n1 -r1 a=('#')")
1259 1298
1260 1299
1261 1300 TEST_MODULE = """
1262 1301 print('Loaded my_tmp')
1263 1302 if __name__ == "__main__":
1264 1303 print('I just ran a script')
1265 1304 """
1266 1305
1267 1306
1268 1307 def test_run_module_from_import_hook():
1269 1308 "Test that a module can be loaded via an import hook"
1270 1309 with TemporaryDirectory() as tmpdir:
1271 1310 fullpath = os.path.join(tmpdir, 'my_tmp.py')
1272 1311 Path(fullpath).write_text(TEST_MODULE)
1273 1312
1274 1313 class MyTempImporter(object):
1275 1314 def __init__(self):
1276 1315 pass
1277 1316
1278 1317 def find_module(self, fullname, path=None):
1279 1318 if 'my_tmp' in fullname:
1280 1319 return self
1281 1320 return None
1282 1321
1283 1322 def load_module(self, name):
1284 1323 import imp
1285 1324 return imp.load_source('my_tmp', fullpath)
1286 1325
1287 1326 def get_code(self, fullname):
1288 1327 return compile(Path(fullpath).read_text(), "foo", "exec")
1289 1328
1290 1329 def is_package(self, __):
1291 1330 return False
1292 1331
1293 1332 sys.meta_path.insert(0, MyTempImporter())
1294 1333
1295 1334 with capture_output() as captured:
1296 1335 _ip.magic("run -m my_tmp")
1297 1336 _ip.run_cell("import my_tmp")
1298 1337
1299 1338 output = "Loaded my_tmp\nI just ran a script\nLoaded my_tmp\n"
1300 1339 nt.assert_equal(output, captured.stdout)
1301 1340
1302 1341 sys.meta_path.pop(0)
General Comments 0
You need to be logged in to leave comments. Login now