##// END OF EJS Templates
Merge pull request #12639 from jetpacktuxedo/buffering
Matthias Bussonnier -
r26204:3b094c83 merge
parent child Browse files
Show More
@@ -1,113 +1,105 b''
1 1 # http://travis-ci.org/#!/ipython/ipython
2 2 language: python
3 3 os: linux
4 4
5 5 addons:
6 6 apt:
7 7 packages:
8 8 - graphviz
9 9
10 10 python:
11 11 - 3.8
12 12
13 13 env:
14 14 global:
15 15 - PATH=$TRAVIS_BUILD_DIR/pandoc:$PATH
16 16
17 17 group: edge
18 18
19 19 before_install:
20 20 - |
21 21 # install Python on macOS
22 22 if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then
23 23 env | sort
24 24 if ! which python$TRAVIS_PYTHON_VERSION; then
25 25 HOMEBREW_NO_AUTO_UPDATE=1 brew tap minrk/homebrew-python-frameworks
26 26 HOMEBREW_NO_AUTO_UPDATE=1 brew cask install python-framework-${TRAVIS_PYTHON_VERSION/./}
27 27 fi
28 28 python3 -m pip install virtualenv
29 29 python3 -m virtualenv -p $(which python$TRAVIS_PYTHON_VERSION) ~/travis-env
30 30 source ~/travis-env/bin/activate
31 31 fi
32 32 - python --version
33 33
34 34 install:
35 35 - pip install pip --upgrade
36 36 - pip install setuptools --upgrade
37 - if [[ "$TRAVIS_PYTHON_VERSION" == "3.6" ]]; then
38 echo "for the time being still test on 3.6";
39 sed -ibkp s/7/6/g setup.py;
40 git diff;
41 fi
42 37 - pip install -e file://$PWD#egg=ipython[test] --upgrade
43 38 - pip install trio curio --upgrade --upgrade-strategy eager
44 39 - pip install 'pytest' 'matplotlib !=3.2.0'
45 40 - pip install codecov check-manifest pytest-cov --upgrade
46 41
47 42
48 43 script:
49 44 - check-manifest
50 45 - |
51 46 if [[ "$TRAVIS_PYTHON_VERSION" == "nightly" ]]; then
52 47 # on nightly fake parso known the grammar
53 48 cp /home/travis/virtualenv/python3.9-dev/lib/python3.9/site-packages/parso/python/grammar38.txt /home/travis/virtualenv/python3.9-dev/lib/python3.9/site-packages/parso/python/grammar39.txt
54 49 fi
55 50 - |
56 51 if [[ "$TRAVIS_PYTHON_VERSION" == "3.8" ]] && [[ "$TRAVIS_OS_NAME" == "linux" ]]; then
57 52 cd /tmp && iptest --coverage xml && cd -
58 53 fi
59 54 - pytest --maxfail=10 IPython
60 55 # On the latest Python (on Linux) only, make sure that the docs build.
61 56 - |
62 57 if [[ "$TRAVIS_PYTHON_VERSION" == "3.8" ]] && [[ "$TRAVIS_OS_NAME" == "linux" ]]; then
63 58 pip install -r docs/requirements.txt
64 59 python tools/fixup_whats_new_pr.py
65 60 make -C docs/ html SPHINXOPTS="-W"
66 61 fi
67 62
68 63 after_success:
69 64 - cp /tmp/ipy_coverage.xml ./
70 65 - cp /tmp/.coverage ./
71 66 - codecov
72 67
73 68 matrix:
74 69 include:
75 70 - arch: amd64
76 python: "3.6"
77 dist: xenial
78 - arch: amd64
79 71 python: "3.7"
80 72 dist: xenial
81 73 - arch: amd64
82 74 python: "3.8"
83 75 dist: xenial
84 76 - arch: amd64
85 77 python: "nightly"
86 78 dist: xenial
87 79 - arch: amd64
88 80 python: "3.9-dev"
89 81 - os: osx
90 82 language: generic
91 83 python: 3.7
92 84 env: TRAVIS_PYTHON_VERSION=3.7
93 85 allow_failures:
94 86 - python: nightly
95 87
96 88 before_deploy:
97 89 - rm -rf dist/
98 90 - python setup.py sdist
99 91 - python setup.py bdist_wheel
100 92
101 93 deploy:
102 94 provider: releases
103 95 api_key:
104 96 secure: Y/Ae9tYs5aoBU8bDjN2YrwGG6tCbezj/h3Lcmtx8HQavSbBgXnhnZVRb2snOKD7auqnqjfT/7QMm4ZyKvaOEgyggGktKqEKYHC8KOZ7yp8I5/UMDtk6j9TnXpSqqBxPiud4MDV76SfRYEQiaDoG4tGGvSfPJ9KcNjKrNvSyyxns=
105 97 file: dist/*
106 98 file_glob: true
107 99 cleanup: false
108 100 on:
109 101 repo: ipython/ipython
110 102 all_branches: true # Backports are released from e.g. 5.x branch
111 103 tags: true
112 104 python: 3.6 # Any version should work, but we only need one
113 105 condition: $TRAVIS_OS_NAME = "linux"
@@ -1,294 +1,315 b''
1 1 """Magic functions for running cells in various scripts."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import errno
7 7 import os
8 8 import sys
9 9 import signal
10 10 import time
11 from subprocess import Popen, PIPE, CalledProcessError
11 import asyncio
12 12 import atexit
13 13
14 14 from IPython.core import magic_arguments
15 15 from IPython.core.magic import (
16 16 Magics, magics_class, line_magic, cell_magic
17 17 )
18 18 from IPython.lib.backgroundjobs import BackgroundJobManager
19 19 from IPython.utils import py3compat
20 20 from IPython.utils.process import arg_split
21 21 from traitlets import List, Dict, default
22 22
23 23 #-----------------------------------------------------------------------------
24 24 # Magic implementation classes
25 25 #-----------------------------------------------------------------------------
26 26
27 27 def script_args(f):
28 28 """single decorator for adding script args"""
29 29 args = [
30 30 magic_arguments.argument(
31 31 '--out', type=str,
32 32 help="""The variable in which to store stdout from the script.
33 33 If the script is backgrounded, this will be the stdout *pipe*,
34 34 instead of the stderr text itself and will not be auto closed.
35 35 """
36 36 ),
37 37 magic_arguments.argument(
38 38 '--err', type=str,
39 39 help="""The variable in which to store stderr from the script.
40 40 If the script is backgrounded, this will be the stderr *pipe*,
41 41 instead of the stderr text itself and will not be autoclosed.
42 42 """
43 43 ),
44 44 magic_arguments.argument(
45 45 '--bg', action="store_true",
46 46 help="""Whether to run the script in the background.
47 47 If given, the only way to see the output of the command is
48 48 with --out/err.
49 49 """
50 50 ),
51 51 magic_arguments.argument(
52 52 '--proc', type=str,
53 53 help="""The variable in which to store Popen instance.
54 54 This is used only when --bg option is given.
55 55 """
56 56 ),
57 57 magic_arguments.argument(
58 58 '--no-raise-error', action="store_false", dest='raise_error',
59 59 help="""Whether you should raise an error message in addition to
60 60 a stream on stderr if you get a nonzero exit code.
61 61 """
62 62 )
63 63 ]
64 64 for arg in args:
65 65 f = arg(f)
66 66 return f
67 67
68 68 @magics_class
69 69 class ScriptMagics(Magics):
70 70 """Magics for talking to scripts
71 71
72 72 This defines a base `%%script` cell magic for running a cell
73 73 with a program in a subprocess, and registers a few top-level
74 74 magics that call %%script with common interpreters.
75 75 """
76 76 script_magics = List(
77 77 help="""Extra script cell magics to define
78 78
79 79 This generates simple wrappers of `%%script foo` as `%%foo`.
80 80
81 81 If you want to add script magics that aren't on your path,
82 82 specify them in script_paths
83 83 """,
84 84 ).tag(config=True)
85 85 @default('script_magics')
86 86 def _script_magics_default(self):
87 87 """default to a common list of programs"""
88 88
89 89 defaults = [
90 90 'sh',
91 91 'bash',
92 92 'perl',
93 93 'ruby',
94 94 'python',
95 95 'python2',
96 96 'python3',
97 97 'pypy',
98 98 ]
99 99 if os.name == 'nt':
100 100 defaults.extend([
101 101 'cmd',
102 102 ])
103 103
104 104 return defaults
105 105
106 106 script_paths = Dict(
107 107 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
108 108
109 109 Only necessary for items in script_magics where the default path will not
110 110 find the right interpreter.
111 111 """
112 112 ).tag(config=True)
113 113
114 114 def __init__(self, shell=None):
115 115 super(ScriptMagics, self).__init__(shell=shell)
116 116 self._generate_script_magics()
117 117 self.job_manager = BackgroundJobManager()
118 118 self.bg_processes = []
119 119 atexit.register(self.kill_bg_processes)
120 120
121 121 def __del__(self):
122 122 self.kill_bg_processes()
123 123
124 124 def _generate_script_magics(self):
125 125 cell_magics = self.magics['cell']
126 126 for name in self.script_magics:
127 127 cell_magics[name] = self._make_script_magic(name)
128 128
129 129 def _make_script_magic(self, name):
130 130 """make a named magic, that calls %%script with a particular program"""
131 131 # expand to explicit path if necessary:
132 132 script = self.script_paths.get(name, name)
133 133
134 134 @magic_arguments.magic_arguments()
135 135 @script_args
136 136 def named_script_magic(line, cell):
137 137 # if line, add it as cl-flags
138 138 if line:
139 139 line = "%s %s" % (script, line)
140 140 else:
141 141 line = script
142 142 return self.shebang(line, cell)
143 143
144 144 # write a basic docstring:
145 145 named_script_magic.__doc__ = \
146 146 """%%{name} script magic
147 147
148 148 Run cells with {script} in a subprocess.
149 149
150 150 This is a shortcut for `%%script {script}`
151 151 """.format(**locals())
152 152
153 153 return named_script_magic
154 154
155 155 @magic_arguments.magic_arguments()
156 156 @script_args
157 157 @cell_magic("script")
158 158 def shebang(self, line, cell):
159 159 """Run a cell via a shell command
160 160
161 161 The `%%script` line is like the #! line of script,
162 162 specifying a program (bash, perl, ruby, etc.) with which to run.
163 163
164 164 The rest of the cell is run by that program.
165 165
166 166 Examples
167 167 --------
168 168 ::
169 169
170 170 In [1]: %%script bash
171 171 ...: for i in 1 2 3; do
172 172 ...: echo $i
173 173 ...: done
174 174 1
175 175 2
176 176 3
177 177 """
178 argv = arg_split(line, posix = not sys.platform.startswith('win'))
178
179 async def _handle_stream(stream, stream_arg, file_object):
180 while True:
181 line = (await stream.readline()).decode("utf8")
182 if not line:
183 break
184 if stream_arg:
185 self.shell.user_ns[stream_arg] = line
186 else:
187 file_object.write(line)
188 file_object.flush()
189
190 async def _stream_communicate(process, cell):
191 process.stdin.write(cell)
192 process.stdin.close()
193 stdout_task = asyncio.create_task(
194 _handle_stream(process.stdout, args.out, sys.stdout)
195 )
196 stderr_task = asyncio.create_task(
197 _handle_stream(process.stderr, args.err, sys.stderr)
198 )
199 await asyncio.wait([stdout_task, stderr_task])
200
201 if sys.platform.startswith("win"):
202 asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
203 loop = asyncio.get_event_loop()
204
205 argv = arg_split(line, posix=not sys.platform.startswith("win"))
179 206 args, cmd = self.shebang.parser.parse_known_args(argv)
180
181 207 try:
182 p = Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE)
208 p = loop.run_until_complete(
209 asyncio.create_subprocess_exec(
210 *cmd,
211 stdout=asyncio.subprocess.PIPE,
212 stderr=asyncio.subprocess.PIPE,
213 stdin=asyncio.subprocess.PIPE
214 )
215 )
183 216 except OSError as e:
184 217 if e.errno == errno.ENOENT:
185 218 print("Couldn't find program: %r" % cmd[0])
186 219 return
187 220 else:
188 221 raise
189 222
190 223 if not cell.endswith('\n'):
191 224 cell += '\n'
192 225 cell = cell.encode('utf8', 'replace')
193 226 if args.bg:
194 227 self.bg_processes.append(p)
195 228 self._gc_bg_processes()
196 229 to_close = []
197 230 if args.out:
198 231 self.shell.user_ns[args.out] = p.stdout
199 232 else:
200 233 to_close.append(p.stdout)
201 234 if args.err:
202 235 self.shell.user_ns[args.err] = p.stderr
203 236 else:
204 237 to_close.append(p.stderr)
205 238 self.job_manager.new(self._run_script, p, cell, to_close, daemon=True)
206 239 if args.proc:
207 240 self.shell.user_ns[args.proc] = p
208 241 return
209 242
210 243 try:
211 out, err = p.communicate(cell)
244 loop.run_until_complete(_stream_communicate(p, cell))
212 245 except KeyboardInterrupt:
213 246 try:
214 247 p.send_signal(signal.SIGINT)
215 248 time.sleep(0.1)
216 if p.poll() is not None:
249 if p.returncode is not None:
217 250 print("Process is interrupted.")
218 251 return
219 252 p.terminate()
220 253 time.sleep(0.1)
221 if p.poll() is not None:
254 if p.returncode is not None:
222 255 print("Process is terminated.")
223 256 return
224 257 p.kill()
225 258 print("Process is killed.")
226 259 except OSError:
227 260 pass
228 261 except Exception as e:
229 262 print("Error while terminating subprocess (pid=%i): %s" \
230 263 % (p.pid, e))
231 264 return
232 out = py3compat.decode(out)
233 err = py3compat.decode(err)
234 if args.out:
235 self.shell.user_ns[args.out] = out
236 else:
237 sys.stdout.write(out)
238 sys.stdout.flush()
239 if args.err:
240 self.shell.user_ns[args.err] = err
241 else:
242 sys.stderr.write(err)
243 sys.stderr.flush()
244 265 if args.raise_error and p.returncode!=0:
245 raise CalledProcessError(p.returncode, cell, output=out, stderr=err)
266 raise CalledProcessError(p.returncode, cell)
246 267
247 268 def _run_script(self, p, cell, to_close):
248 269 """callback for running the script in the background"""
249 270 p.stdin.write(cell)
250 271 p.stdin.close()
251 272 for s in to_close:
252 273 s.close()
253 274 p.wait()
254 275
255 276 @line_magic("killbgscripts")
256 277 def killbgscripts(self, _nouse_=''):
257 278 """Kill all BG processes started by %%script and its family."""
258 279 self.kill_bg_processes()
259 280 print("All background processes were killed.")
260 281
261 282 def kill_bg_processes(self):
262 283 """Kill all BG processes which are still running."""
263 284 if not self.bg_processes:
264 285 return
265 286 for p in self.bg_processes:
266 if p.poll() is None:
287 if p.returncode is None:
267 288 try:
268 289 p.send_signal(signal.SIGINT)
269 290 except:
270 291 pass
271 292 time.sleep(0.1)
272 293 self._gc_bg_processes()
273 294 if not self.bg_processes:
274 295 return
275 296 for p in self.bg_processes:
276 if p.poll() is None:
297 if p.returncode is None:
277 298 try:
278 299 p.terminate()
279 300 except:
280 301 pass
281 302 time.sleep(0.1)
282 303 self._gc_bg_processes()
283 304 if not self.bg_processes:
284 305 return
285 306 for p in self.bg_processes:
286 if p.poll() is None:
307 if p.returncode is None:
287 308 try:
288 309 p.kill()
289 310 except:
290 311 pass
291 312 self._gc_bg_processes()
292 313
293 314 def _gc_bg_processes(self):
294 self.bg_processes = [p for p in self.bg_processes if p.poll() is None]
315 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]
@@ -1,1257 +1,1261 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for various magic functions.
3 3
4 4 Needs to be run by nose (to make ipython session available).
5 5 """
6 6
7 7 import io
8 8 import os
9 9 import re
10 10 import sys
11 11 import warnings
12 12 from textwrap import dedent
13 13 from unittest import TestCase
14 14 from unittest import mock
15 15 from importlib import invalidate_caches
16 16 from io import StringIO
17 17 from pathlib import Path
18 18
19 19 import nose.tools as nt
20 20
21 21 import shlex
22 22
23 23 from IPython import get_ipython
24 24 from IPython.core import magic
25 25 from IPython.core.error import UsageError
26 26 from IPython.core.magic import (Magics, magics_class, line_magic,
27 27 cell_magic,
28 28 register_line_magic, register_cell_magic)
29 29 from IPython.core.magics import execution, script, code, logging, osm
30 30 from IPython.testing import decorators as dec
31 31 from IPython.testing import tools as tt
32 32 from IPython.utils.io import capture_output
33 33 from IPython.utils.tempdir import (TemporaryDirectory,
34 34 TemporaryWorkingDirectory)
35 35 from IPython.utils.process import find_cmd
36 36 from .test_debugger import PdbTestInput
37 37
38 38
39 39 @magic.magics_class
40 40 class DummyMagics(magic.Magics): pass
41 41
42 42 def test_extract_code_ranges():
43 43 instr = "1 3 5-6 7-9 10:15 17: :10 10- -13 :"
44 44 expected = [(0, 1),
45 45 (2, 3),
46 46 (4, 6),
47 47 (6, 9),
48 48 (9, 14),
49 49 (16, None),
50 50 (None, 9),
51 51 (9, None),
52 52 (None, 13),
53 53 (None, None)]
54 54 actual = list(code.extract_code_ranges(instr))
55 55 nt.assert_equal(actual, expected)
56 56
57 57 def test_extract_symbols():
58 58 source = """import foo\na = 10\ndef b():\n return 42\n\n\nclass A: pass\n\n\n"""
59 59 symbols_args = ["a", "b", "A", "A,b", "A,a", "z"]
60 60 expected = [([], ['a']),
61 61 (["def b():\n return 42\n"], []),
62 62 (["class A: pass\n"], []),
63 63 (["class A: pass\n", "def b():\n return 42\n"], []),
64 64 (["class A: pass\n"], ['a']),
65 65 ([], ['z'])]
66 66 for symbols, exp in zip(symbols_args, expected):
67 67 nt.assert_equal(code.extract_symbols(source, symbols), exp)
68 68
69 69
70 70 def test_extract_symbols_raises_exception_with_non_python_code():
71 71 source = ("=begin A Ruby program :)=end\n"
72 72 "def hello\n"
73 73 "puts 'Hello world'\n"
74 74 "end")
75 75 with nt.assert_raises(SyntaxError):
76 76 code.extract_symbols(source, "hello")
77 77
78 78
79 79 def test_magic_not_found():
80 80 # magic not found raises UsageError
81 81 with nt.assert_raises(UsageError):
82 82 _ip.magic('doesntexist')
83 83
84 84 # ensure result isn't success when a magic isn't found
85 85 result = _ip.run_cell('%doesntexist')
86 86 assert isinstance(result.error_in_exec, UsageError)
87 87
88 88
89 89 def test_cell_magic_not_found():
90 90 # magic not found raises UsageError
91 91 with nt.assert_raises(UsageError):
92 92 _ip.run_cell_magic('doesntexist', 'line', 'cell')
93 93
94 94 # ensure result isn't success when a magic isn't found
95 95 result = _ip.run_cell('%%doesntexist')
96 96 assert isinstance(result.error_in_exec, UsageError)
97 97
98 98
99 99 def test_magic_error_status():
100 100 def fail(shell):
101 101 1/0
102 102 _ip.register_magic_function(fail)
103 103 result = _ip.run_cell('%fail')
104 104 assert isinstance(result.error_in_exec, ZeroDivisionError)
105 105
106 106
107 107 def test_config():
108 108 """ test that config magic does not raise
109 109 can happen if Configurable init is moved too early into
110 110 Magics.__init__ as then a Config object will be registered as a
111 111 magic.
112 112 """
113 113 ## should not raise.
114 114 _ip.magic('config')
115 115
116 116 def test_config_available_configs():
117 117 """ test that config magic prints available configs in unique and
118 118 sorted order. """
119 119 with capture_output() as captured:
120 120 _ip.magic('config')
121 121
122 122 stdout = captured.stdout
123 123 config_classes = stdout.strip().split('\n')[1:]
124 124 nt.assert_list_equal(config_classes, sorted(set(config_classes)))
125 125
126 126 def test_config_print_class():
127 127 """ test that config with a classname prints the class's options. """
128 128 with capture_output() as captured:
129 129 _ip.magic('config TerminalInteractiveShell')
130 130
131 131 stdout = captured.stdout
132 132 if not re.match("TerminalInteractiveShell.* options", stdout.splitlines()[0]):
133 133 print(stdout)
134 134 raise AssertionError("1st line of stdout not like "
135 135 "'TerminalInteractiveShell.* options'")
136 136
137 137 def test_rehashx():
138 138 # clear up everything
139 139 _ip.alias_manager.clear_aliases()
140 140 del _ip.db['syscmdlist']
141 141
142 142 _ip.magic('rehashx')
143 143 # Practically ALL ipython development systems will have more than 10 aliases
144 144
145 145 nt.assert_true(len(_ip.alias_manager.aliases) > 10)
146 146 for name, cmd in _ip.alias_manager.aliases:
147 147 # we must strip dots from alias names
148 148 nt.assert_not_in('.', name)
149 149
150 150 # rehashx must fill up syscmdlist
151 151 scoms = _ip.db['syscmdlist']
152 152 nt.assert_true(len(scoms) > 10)
153 153
154 154
155 155
156 156 def test_magic_parse_options():
157 157 """Test that we don't mangle paths when parsing magic options."""
158 158 ip = get_ipython()
159 159 path = 'c:\\x'
160 160 m = DummyMagics(ip)
161 161 opts = m.parse_options('-f %s' % path,'f:')[0]
162 162 # argv splitting is os-dependent
163 163 if os.name == 'posix':
164 164 expected = 'c:x'
165 165 else:
166 166 expected = path
167 167 nt.assert_equal(opts['f'], expected)
168 168
169 169 def test_magic_parse_long_options():
170 170 """Magic.parse_options can handle --foo=bar long options"""
171 171 ip = get_ipython()
172 172 m = DummyMagics(ip)
173 173 opts, _ = m.parse_options('--foo --bar=bubble', 'a', 'foo', 'bar=')
174 174 nt.assert_in('foo', opts)
175 175 nt.assert_in('bar', opts)
176 176 nt.assert_equal(opts['bar'], "bubble")
177 177
178 178
179 179 def doctest_hist_f():
180 180 """Test %hist -f with temporary filename.
181 181
182 182 In [9]: import tempfile
183 183
184 184 In [10]: tfile = tempfile.mktemp('.py','tmp-ipython-')
185 185
186 186 In [11]: %hist -nl -f $tfile 3
187 187
188 188 In [13]: import os; os.unlink(tfile)
189 189 """
190 190
191 191
192 192 def doctest_hist_op():
193 193 """Test %hist -op
194 194
195 195 In [1]: class b(float):
196 196 ...: pass
197 197 ...:
198 198
199 199 In [2]: class s(object):
200 200 ...: def __str__(self):
201 201 ...: return 's'
202 202 ...:
203 203
204 204 In [3]:
205 205
206 206 In [4]: class r(b):
207 207 ...: def __repr__(self):
208 208 ...: return 'r'
209 209 ...:
210 210
211 211 In [5]: class sr(s,r): pass
212 212 ...:
213 213
214 214 In [6]:
215 215
216 216 In [7]: bb=b()
217 217
218 218 In [8]: ss=s()
219 219
220 220 In [9]: rr=r()
221 221
222 222 In [10]: ssrr=sr()
223 223
224 224 In [11]: 4.5
225 225 Out[11]: 4.5
226 226
227 227 In [12]: str(ss)
228 228 Out[12]: 's'
229 229
230 230 In [13]:
231 231
232 232 In [14]: %hist -op
233 233 >>> class b:
234 234 ... pass
235 235 ...
236 236 >>> class s(b):
237 237 ... def __str__(self):
238 238 ... return 's'
239 239 ...
240 240 >>>
241 241 >>> class r(b):
242 242 ... def __repr__(self):
243 243 ... return 'r'
244 244 ...
245 245 >>> class sr(s,r): pass
246 246 >>>
247 247 >>> bb=b()
248 248 >>> ss=s()
249 249 >>> rr=r()
250 250 >>> ssrr=sr()
251 251 >>> 4.5
252 252 4.5
253 253 >>> str(ss)
254 254 's'
255 255 >>>
256 256 """
257 257
258 258 def test_hist_pof():
259 259 ip = get_ipython()
260 260 ip.run_cell(u"1+2", store_history=True)
261 261 #raise Exception(ip.history_manager.session_number)
262 262 #raise Exception(list(ip.history_manager._get_range_session()))
263 263 with TemporaryDirectory() as td:
264 264 tf = os.path.join(td, 'hist.py')
265 265 ip.run_line_magic('history', '-pof %s' % tf)
266 266 assert os.path.isfile(tf)
267 267
268 268
269 269 def test_macro():
270 270 ip = get_ipython()
271 271 ip.history_manager.reset() # Clear any existing history.
272 272 cmds = ["a=1", "def b():\n return a**2", "print(a,b())"]
273 273 for i, cmd in enumerate(cmds, start=1):
274 274 ip.history_manager.store_inputs(i, cmd)
275 275 ip.magic("macro test 1-3")
276 276 nt.assert_equal(ip.user_ns["test"].value, "\n".join(cmds)+"\n")
277 277
278 278 # List macros
279 279 nt.assert_in("test", ip.magic("macro"))
280 280
281 281
282 282 def test_macro_run():
283 283 """Test that we can run a multi-line macro successfully."""
284 284 ip = get_ipython()
285 285 ip.history_manager.reset()
286 286 cmds = ["a=10", "a+=1", "print(a)", "%macro test 2-3"]
287 287 for cmd in cmds:
288 288 ip.run_cell(cmd, store_history=True)
289 289 nt.assert_equal(ip.user_ns["test"].value, "a+=1\nprint(a)\n")
290 290 with tt.AssertPrints("12"):
291 291 ip.run_cell("test")
292 292 with tt.AssertPrints("13"):
293 293 ip.run_cell("test")
294 294
295 295
296 296 def test_magic_magic():
297 297 """Test %magic"""
298 298 ip = get_ipython()
299 299 with capture_output() as captured:
300 300 ip.magic("magic")
301 301
302 302 stdout = captured.stdout
303 303 nt.assert_in('%magic', stdout)
304 304 nt.assert_in('IPython', stdout)
305 305 nt.assert_in('Available', stdout)
306 306
307 307
308 308 @dec.skipif_not_numpy
309 309 def test_numpy_reset_array_undec():
310 310 "Test '%reset array' functionality"
311 311 _ip.ex('import numpy as np')
312 312 _ip.ex('a = np.empty(2)')
313 313 nt.assert_in('a', _ip.user_ns)
314 314 _ip.magic('reset -f array')
315 315 nt.assert_not_in('a', _ip.user_ns)
316 316
317 317 def test_reset_out():
318 318 "Test '%reset out' magic"
319 319 _ip.run_cell("parrot = 'dead'", store_history=True)
320 320 # test '%reset -f out', make an Out prompt
321 321 _ip.run_cell("parrot", store_history=True)
322 322 nt.assert_true('dead' in [_ip.user_ns[x] for x in ('_','__','___')])
323 323 _ip.magic('reset -f out')
324 324 nt.assert_false('dead' in [_ip.user_ns[x] for x in ('_','__','___')])
325 325 nt.assert_equal(len(_ip.user_ns['Out']), 0)
326 326
327 327 def test_reset_in():
328 328 "Test '%reset in' magic"
329 329 # test '%reset -f in'
330 330 _ip.run_cell("parrot", store_history=True)
331 331 nt.assert_true('parrot' in [_ip.user_ns[x] for x in ('_i','_ii','_iii')])
332 332 _ip.magic('%reset -f in')
333 333 nt.assert_false('parrot' in [_ip.user_ns[x] for x in ('_i','_ii','_iii')])
334 334 nt.assert_equal(len(set(_ip.user_ns['In'])), 1)
335 335
336 336 def test_reset_dhist():
337 337 "Test '%reset dhist' magic"
338 338 _ip.run_cell("tmp = [d for d in _dh]") # copy before clearing
339 339 _ip.magic('cd ' + os.path.dirname(nt.__file__))
340 340 _ip.magic('cd -')
341 341 nt.assert_true(len(_ip.user_ns['_dh']) > 0)
342 342 _ip.magic('reset -f dhist')
343 343 nt.assert_equal(len(_ip.user_ns['_dh']), 0)
344 344 _ip.run_cell("_dh = [d for d in tmp]") #restore
345 345
346 346 def test_reset_in_length():
347 347 "Test that '%reset in' preserves In[] length"
348 348 _ip.run_cell("print 'foo'")
349 349 _ip.run_cell("reset -f in")
350 350 nt.assert_equal(len(_ip.user_ns['In']), _ip.displayhook.prompt_count+1)
351 351
352 352 class TestResetErrors(TestCase):
353 353
354 354 def test_reset_redefine(self):
355 355
356 356 @magics_class
357 357 class KernelMagics(Magics):
358 358 @line_magic
359 359 def less(self, shell): pass
360 360
361 361 _ip.register_magics(KernelMagics)
362 362
363 363 with self.assertLogs() as cm:
364 364 # hack, we want to just capture logs, but assertLogs fails if not
365 365 # logs get produce.
366 366 # so log one things we ignore.
367 367 import logging as log_mod
368 368 log = log_mod.getLogger()
369 369 log.info('Nothing')
370 370 # end hack.
371 371 _ip.run_cell("reset -f")
372 372
373 373 assert len(cm.output) == 1
374 374 for out in cm.output:
375 375 assert "Invalid alias" not in out
376 376
377 377 def test_tb_syntaxerror():
378 378 """test %tb after a SyntaxError"""
379 379 ip = get_ipython()
380 380 ip.run_cell("for")
381 381
382 382 # trap and validate stdout
383 383 save_stdout = sys.stdout
384 384 try:
385 385 sys.stdout = StringIO()
386 386 ip.run_cell("%tb")
387 387 out = sys.stdout.getvalue()
388 388 finally:
389 389 sys.stdout = save_stdout
390 390 # trim output, and only check the last line
391 391 last_line = out.rstrip().splitlines()[-1].strip()
392 392 nt.assert_equal(last_line, "SyntaxError: invalid syntax")
393 393
394 394
395 395 def test_time():
396 396 ip = get_ipython()
397 397
398 398 with tt.AssertPrints("Wall time: "):
399 399 ip.run_cell("%time None")
400 400
401 401 ip.run_cell("def f(kmjy):\n"
402 402 " %time print (2*kmjy)")
403 403
404 404 with tt.AssertPrints("Wall time: "):
405 405 with tt.AssertPrints("hihi", suppress=False):
406 406 ip.run_cell("f('hi')")
407 407
408 408 def test_time_last_not_expression():
409 409 ip.run_cell("%%time\n"
410 410 "var_1 = 1\n"
411 411 "var_2 = 2\n")
412 412 assert ip.user_ns['var_1'] == 1
413 413 del ip.user_ns['var_1']
414 414 assert ip.user_ns['var_2'] == 2
415 415 del ip.user_ns['var_2']
416 416
417 417
418 418 @dec.skip_win32
419 419 def test_time2():
420 420 ip = get_ipython()
421 421
422 422 with tt.AssertPrints("CPU times: user "):
423 423 ip.run_cell("%time None")
424 424
425 425 def test_time3():
426 426 """Erroneous magic function calls, issue gh-3334"""
427 427 ip = get_ipython()
428 428 ip.user_ns.pop('run', None)
429 429
430 430 with tt.AssertNotPrints("not found", channel='stderr'):
431 431 ip.run_cell("%%time\n"
432 432 "run = 0\n"
433 433 "run += 1")
434 434
435 435 def test_multiline_time():
436 436 """Make sure last statement from time return a value."""
437 437 ip = get_ipython()
438 438 ip.user_ns.pop('run', None)
439 439
440 440 ip.run_cell(dedent("""\
441 441 %%time
442 442 a = "ho"
443 443 b = "hey"
444 444 a+b
445 445 """))
446 446 nt.assert_equal(ip.user_ns_hidden['_'], 'hohey')
447 447
448 448 def test_time_local_ns():
449 449 """
450 450 Test that local_ns is actually global_ns when running a cell magic
451 451 """
452 452 ip = get_ipython()
453 453 ip.run_cell("%%time\n"
454 454 "myvar = 1")
455 455 nt.assert_equal(ip.user_ns['myvar'], 1)
456 456 del ip.user_ns['myvar']
457 457
458 458 def test_doctest_mode():
459 459 "Toggle doctest_mode twice, it should be a no-op and run without error"
460 460 _ip.magic('doctest_mode')
461 461 _ip.magic('doctest_mode')
462 462
463 463
464 464 def test_parse_options():
465 465 """Tests for basic options parsing in magics."""
466 466 # These are only the most minimal of tests, more should be added later. At
467 467 # the very least we check that basic text/unicode calls work OK.
468 468 m = DummyMagics(_ip)
469 469 nt.assert_equal(m.parse_options('foo', '')[1], 'foo')
470 470 nt.assert_equal(m.parse_options(u'foo', '')[1], u'foo')
471 471
472 472
473 473 def test_dirops():
474 474 """Test various directory handling operations."""
475 475 # curpath = lambda :os.path.splitdrive(os.getcwd())[1].replace('\\','/')
476 476 curpath = os.getcwd
477 477 startdir = os.getcwd()
478 478 ipdir = os.path.realpath(_ip.ipython_dir)
479 479 try:
480 480 _ip.magic('cd "%s"' % ipdir)
481 481 nt.assert_equal(curpath(), ipdir)
482 482 _ip.magic('cd -')
483 483 nt.assert_equal(curpath(), startdir)
484 484 _ip.magic('pushd "%s"' % ipdir)
485 485 nt.assert_equal(curpath(), ipdir)
486 486 _ip.magic('popd')
487 487 nt.assert_equal(curpath(), startdir)
488 488 finally:
489 489 os.chdir(startdir)
490 490
491 491
492 492 def test_cd_force_quiet():
493 493 """Test OSMagics.cd_force_quiet option"""
494 494 _ip.config.OSMagics.cd_force_quiet = True
495 495 osmagics = osm.OSMagics(shell=_ip)
496 496
497 497 startdir = os.getcwd()
498 498 ipdir = os.path.realpath(_ip.ipython_dir)
499 499
500 500 try:
501 501 with tt.AssertNotPrints(ipdir):
502 502 osmagics.cd('"%s"' % ipdir)
503 503 with tt.AssertNotPrints(startdir):
504 504 osmagics.cd('-')
505 505 finally:
506 506 os.chdir(startdir)
507 507
508 508
509 509 def test_xmode():
510 510 # Calling xmode three times should be a no-op
511 511 xmode = _ip.InteractiveTB.mode
512 512 for i in range(4):
513 513 _ip.magic("xmode")
514 514 nt.assert_equal(_ip.InteractiveTB.mode, xmode)
515 515
516 516 def test_reset_hard():
517 517 monitor = []
518 518 class A(object):
519 519 def __del__(self):
520 520 monitor.append(1)
521 521 def __repr__(self):
522 522 return "<A instance>"
523 523
524 524 _ip.user_ns["a"] = A()
525 525 _ip.run_cell("a")
526 526
527 527 nt.assert_equal(monitor, [])
528 528 _ip.magic("reset -f")
529 529 nt.assert_equal(monitor, [1])
530 530
531 531 class TestXdel(tt.TempFileMixin):
532 532 def test_xdel(self):
533 533 """Test that references from %run are cleared by xdel."""
534 534 src = ("class A(object):\n"
535 535 " monitor = []\n"
536 536 " def __del__(self):\n"
537 537 " self.monitor.append(1)\n"
538 538 "a = A()\n")
539 539 self.mktmp(src)
540 540 # %run creates some hidden references...
541 541 _ip.magic("run %s" % self.fname)
542 542 # ... as does the displayhook.
543 543 _ip.run_cell("a")
544 544
545 545 monitor = _ip.user_ns["A"].monitor
546 546 nt.assert_equal(monitor, [])
547 547
548 548 _ip.magic("xdel a")
549 549
550 550 # Check that a's __del__ method has been called.
551 551 nt.assert_equal(monitor, [1])
552 552
553 553 def doctest_who():
554 554 """doctest for %who
555 555
556 556 In [1]: %reset -f
557 557
558 558 In [2]: alpha = 123
559 559
560 560 In [3]: beta = 'beta'
561 561
562 562 In [4]: %who int
563 563 alpha
564 564
565 565 In [5]: %who str
566 566 beta
567 567
568 568 In [6]: %whos
569 569 Variable Type Data/Info
570 570 ----------------------------
571 571 alpha int 123
572 572 beta str beta
573 573
574 574 In [7]: %who_ls
575 575 Out[7]: ['alpha', 'beta']
576 576 """
577 577
578 578 def test_whos():
579 579 """Check that whos is protected against objects where repr() fails."""
580 580 class A(object):
581 581 def __repr__(self):
582 582 raise Exception()
583 583 _ip.user_ns['a'] = A()
584 584 _ip.magic("whos")
585 585
586 586 def doctest_precision():
587 587 """doctest for %precision
588 588
589 589 In [1]: f = get_ipython().display_formatter.formatters['text/plain']
590 590
591 591 In [2]: %precision 5
592 592 Out[2]: '%.5f'
593 593
594 594 In [3]: f.float_format
595 595 Out[3]: '%.5f'
596 596
597 597 In [4]: %precision %e
598 598 Out[4]: '%e'
599 599
600 600 In [5]: f(3.1415927)
601 601 Out[5]: '3.141593e+00'
602 602 """
603 603
604 604 def test_debug_magic():
605 605 """Test debugging a small code with %debug
606 606
607 607 In [1]: with PdbTestInput(['c']):
608 608 ...: %debug print("a b") #doctest: +ELLIPSIS
609 609 ...:
610 610 ...
611 611 ipdb> c
612 612 a b
613 613 In [2]:
614 614 """
615 615
616 616 def test_psearch():
617 617 with tt.AssertPrints("dict.fromkeys"):
618 618 _ip.run_cell("dict.fr*?")
619 619 with tt.AssertPrints("Ο€.is_integer"):
620 620 _ip.run_cell("Ο€ = 3.14;\nΟ€.is_integ*?")
621 621
622 622 def test_timeit_shlex():
623 623 """test shlex issues with timeit (#1109)"""
624 624 _ip.ex("def f(*a,**kw): pass")
625 625 _ip.magic('timeit -n1 "this is a bug".count(" ")')
626 626 _ip.magic('timeit -r1 -n1 f(" ", 1)')
627 627 _ip.magic('timeit -r1 -n1 f(" ", 1, " ", 2, " ")')
628 628 _ip.magic('timeit -r1 -n1 ("a " + "b")')
629 629 _ip.magic('timeit -r1 -n1 f("a " + "b")')
630 630 _ip.magic('timeit -r1 -n1 f("a " + "b ")')
631 631
632 632
633 633 def test_timeit_special_syntax():
634 634 "Test %%timeit with IPython special syntax"
635 635 @register_line_magic
636 636 def lmagic(line):
637 637 ip = get_ipython()
638 638 ip.user_ns['lmagic_out'] = line
639 639
640 640 # line mode test
641 641 _ip.run_line_magic('timeit', '-n1 -r1 %lmagic my line')
642 642 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line')
643 643 # cell mode test
644 644 _ip.run_cell_magic('timeit', '-n1 -r1', '%lmagic my line2')
645 645 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line2')
646 646
647 647 def test_timeit_return():
648 648 """
649 649 test whether timeit -o return object
650 650 """
651 651
652 652 res = _ip.run_line_magic('timeit','-n10 -r10 -o 1')
653 653 assert(res is not None)
654 654
655 655 def test_timeit_quiet():
656 656 """
657 657 test quiet option of timeit magic
658 658 """
659 659 with tt.AssertNotPrints("loops"):
660 660 _ip.run_cell("%timeit -n1 -r1 -q 1")
661 661
662 662 def test_timeit_return_quiet():
663 663 with tt.AssertNotPrints("loops"):
664 664 res = _ip.run_line_magic('timeit', '-n1 -r1 -q -o 1')
665 665 assert (res is not None)
666 666
667 667 def test_timeit_invalid_return():
668 668 with nt.assert_raises_regex(SyntaxError, "outside function"):
669 669 _ip.run_line_magic('timeit', 'return')
670 670
671 671 @dec.skipif(execution.profile is None)
672 672 def test_prun_special_syntax():
673 673 "Test %%prun with IPython special syntax"
674 674 @register_line_magic
675 675 def lmagic(line):
676 676 ip = get_ipython()
677 677 ip.user_ns['lmagic_out'] = line
678 678
679 679 # line mode test
680 680 _ip.run_line_magic('prun', '-q %lmagic my line')
681 681 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line')
682 682 # cell mode test
683 683 _ip.run_cell_magic('prun', '-q', '%lmagic my line2')
684 684 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line2')
685 685
686 686 @dec.skipif(execution.profile is None)
687 687 def test_prun_quotes():
688 688 "Test that prun does not clobber string escapes (GH #1302)"
689 689 _ip.magic(r"prun -q x = '\t'")
690 690 nt.assert_equal(_ip.user_ns['x'], '\t')
691 691
692 692 def test_extension():
693 693 # Debugging information for failures of this test
694 694 print('sys.path:')
695 695 for p in sys.path:
696 696 print(' ', p)
697 697 print('CWD', os.getcwd())
698 698
699 699 nt.assert_raises(ImportError, _ip.magic, "load_ext daft_extension")
700 700 daft_path = os.path.join(os.path.dirname(__file__), "daft_extension")
701 701 sys.path.insert(0, daft_path)
702 702 try:
703 703 _ip.user_ns.pop('arq', None)
704 704 invalidate_caches() # Clear import caches
705 705 _ip.magic("load_ext daft_extension")
706 706 nt.assert_equal(_ip.user_ns['arq'], 185)
707 707 _ip.magic("unload_ext daft_extension")
708 708 assert 'arq' not in _ip.user_ns
709 709 finally:
710 710 sys.path.remove(daft_path)
711 711
712 712
713 713 def test_notebook_export_json():
714 714 _ip = get_ipython()
715 715 _ip.history_manager.reset() # Clear any existing history.
716 716 cmds = [u"a=1", u"def b():\n return a**2", u"print('noΓ«l, Γ©tΓ©', b())"]
717 717 for i, cmd in enumerate(cmds, start=1):
718 718 _ip.history_manager.store_inputs(i, cmd)
719 719 with TemporaryDirectory() as td:
720 720 outfile = os.path.join(td, "nb.ipynb")
721 721 _ip.magic("notebook -e %s" % outfile)
722 722
723 723
724 724 class TestEnv(TestCase):
725 725
726 726 def test_env(self):
727 727 env = _ip.magic("env")
728 728 self.assertTrue(isinstance(env, dict))
729 729
730 730 def test_env_secret(self):
731 731 env = _ip.magic("env")
732 732 hidden = "<hidden>"
733 733 with mock.patch.dict(
734 734 os.environ,
735 735 {
736 736 "API_KEY": "abc123",
737 737 "SECRET_THING": "ssshhh",
738 738 "JUPYTER_TOKEN": "",
739 739 "VAR": "abc"
740 740 }
741 741 ):
742 742 env = _ip.magic("env")
743 743 assert env["API_KEY"] == hidden
744 744 assert env["SECRET_THING"] == hidden
745 745 assert env["JUPYTER_TOKEN"] == hidden
746 746 assert env["VAR"] == "abc"
747 747
748 748 def test_env_get_set_simple(self):
749 749 env = _ip.magic("env var val1")
750 750 self.assertEqual(env, None)
751 751 self.assertEqual(os.environ['var'], 'val1')
752 752 self.assertEqual(_ip.magic("env var"), 'val1')
753 753 env = _ip.magic("env var=val2")
754 754 self.assertEqual(env, None)
755 755 self.assertEqual(os.environ['var'], 'val2')
756 756
757 757 def test_env_get_set_complex(self):
758 758 env = _ip.magic("env var 'val1 '' 'val2")
759 759 self.assertEqual(env, None)
760 760 self.assertEqual(os.environ['var'], "'val1 '' 'val2")
761 761 self.assertEqual(_ip.magic("env var"), "'val1 '' 'val2")
762 762 env = _ip.magic('env var=val2 val3="val4')
763 763 self.assertEqual(env, None)
764 764 self.assertEqual(os.environ['var'], 'val2 val3="val4')
765 765
766 766 def test_env_set_bad_input(self):
767 767 self.assertRaises(UsageError, lambda: _ip.magic("set_env var"))
768 768
769 769 def test_env_set_whitespace(self):
770 770 self.assertRaises(UsageError, lambda: _ip.magic("env var A=B"))
771 771
772 772
773 773 class CellMagicTestCase(TestCase):
774 774
775 775 def check_ident(self, magic):
776 776 # Manually called, we get the result
777 777 out = _ip.run_cell_magic(magic, 'a', 'b')
778 778 nt.assert_equal(out, ('a','b'))
779 779 # Via run_cell, it goes into the user's namespace via displayhook
780 780 _ip.run_cell('%%' + magic +' c\nd\n')
781 781 nt.assert_equal(_ip.user_ns['_'], ('c','d\n'))
782 782
783 783 def test_cell_magic_func_deco(self):
784 784 "Cell magic using simple decorator"
785 785 @register_cell_magic
786 786 def cellm(line, cell):
787 787 return line, cell
788 788
789 789 self.check_ident('cellm')
790 790
791 791 def test_cell_magic_reg(self):
792 792 "Cell magic manually registered"
793 793 def cellm(line, cell):
794 794 return line, cell
795 795
796 796 _ip.register_magic_function(cellm, 'cell', 'cellm2')
797 797 self.check_ident('cellm2')
798 798
799 799 def test_cell_magic_class(self):
800 800 "Cell magics declared via a class"
801 801 @magics_class
802 802 class MyMagics(Magics):
803 803
804 804 @cell_magic
805 805 def cellm3(self, line, cell):
806 806 return line, cell
807 807
808 808 _ip.register_magics(MyMagics)
809 809 self.check_ident('cellm3')
810 810
811 811 def test_cell_magic_class2(self):
812 812 "Cell magics declared via a class, #2"
813 813 @magics_class
814 814 class MyMagics2(Magics):
815 815
816 816 @cell_magic('cellm4')
817 817 def cellm33(self, line, cell):
818 818 return line, cell
819 819
820 820 _ip.register_magics(MyMagics2)
821 821 self.check_ident('cellm4')
822 822 # Check that nothing is registered as 'cellm33'
823 823 c33 = _ip.find_cell_magic('cellm33')
824 824 nt.assert_equal(c33, None)
825 825
826 826 def test_file():
827 827 """Basic %%writefile"""
828 828 ip = get_ipython()
829 829 with TemporaryDirectory() as td:
830 830 fname = os.path.join(td, 'file1')
831 831 ip.run_cell_magic("writefile", fname, u'\n'.join([
832 832 'line1',
833 833 'line2',
834 834 ]))
835 835 s = Path(fname).read_text()
836 836 nt.assert_in('line1\n', s)
837 837 nt.assert_in('line2', s)
838 838
839 839 @dec.skip_win32
840 840 def test_file_single_quote():
841 841 """Basic %%writefile with embedded single quotes"""
842 842 ip = get_ipython()
843 843 with TemporaryDirectory() as td:
844 844 fname = os.path.join(td, '\'file1\'')
845 845 ip.run_cell_magic("writefile", fname, u'\n'.join([
846 846 'line1',
847 847 'line2',
848 848 ]))
849 849 s = Path(fname).read_text()
850 850 nt.assert_in('line1\n', s)
851 851 nt.assert_in('line2', s)
852 852
853 853 @dec.skip_win32
854 854 def test_file_double_quote():
855 855 """Basic %%writefile with embedded double quotes"""
856 856 ip = get_ipython()
857 857 with TemporaryDirectory() as td:
858 858 fname = os.path.join(td, '"file1"')
859 859 ip.run_cell_magic("writefile", fname, u'\n'.join([
860 860 'line1',
861 861 'line2',
862 862 ]))
863 863 s = Path(fname).read_text()
864 864 nt.assert_in('line1\n', s)
865 865 nt.assert_in('line2', s)
866 866
867 867 def test_file_var_expand():
868 868 """%%writefile $filename"""
869 869 ip = get_ipython()
870 870 with TemporaryDirectory() as td:
871 871 fname = os.path.join(td, 'file1')
872 872 ip.user_ns['filename'] = fname
873 873 ip.run_cell_magic("writefile", '$filename', u'\n'.join([
874 874 'line1',
875 875 'line2',
876 876 ]))
877 877 s = Path(fname).read_text()
878 878 nt.assert_in('line1\n', s)
879 879 nt.assert_in('line2', s)
880 880
881 881 def test_file_unicode():
882 882 """%%writefile with unicode cell"""
883 883 ip = get_ipython()
884 884 with TemporaryDirectory() as td:
885 885 fname = os.path.join(td, 'file1')
886 886 ip.run_cell_magic("writefile", fname, u'\n'.join([
887 887 u'linΓ©1',
888 888 u'linΓ©2',
889 889 ]))
890 890 with io.open(fname, encoding='utf-8') as f:
891 891 s = f.read()
892 892 nt.assert_in(u'linΓ©1\n', s)
893 893 nt.assert_in(u'linΓ©2', s)
894 894
895 895 def test_file_amend():
896 896 """%%writefile -a amends files"""
897 897 ip = get_ipython()
898 898 with TemporaryDirectory() as td:
899 899 fname = os.path.join(td, 'file2')
900 900 ip.run_cell_magic("writefile", fname, u'\n'.join([
901 901 'line1',
902 902 'line2',
903 903 ]))
904 904 ip.run_cell_magic("writefile", "-a %s" % fname, u'\n'.join([
905 905 'line3',
906 906 'line4',
907 907 ]))
908 908 s = Path(fname).read_text()
909 909 nt.assert_in('line1\n', s)
910 910 nt.assert_in('line3\n', s)
911 911
912 912 def test_file_spaces():
913 913 """%%file with spaces in filename"""
914 914 ip = get_ipython()
915 915 with TemporaryWorkingDirectory() as td:
916 916 fname = "file name"
917 917 ip.run_cell_magic("file", '"%s"'%fname, u'\n'.join([
918 918 'line1',
919 919 'line2',
920 920 ]))
921 921 s = Path(fname).read_text()
922 922 nt.assert_in('line1\n', s)
923 923 nt.assert_in('line2', s)
924 924
925 925 def test_script_config():
926 926 ip = get_ipython()
927 927 ip.config.ScriptMagics.script_magics = ['whoda']
928 928 sm = script.ScriptMagics(shell=ip)
929 929 nt.assert_in('whoda', sm.magics['cell'])
930 930
931 931 @dec.skip_win32
932 932 def test_script_out():
933 933 ip = get_ipython()
934 934 ip.run_cell_magic("script", "--out output sh", "echo 'hi'")
935 935 nt.assert_equal(ip.user_ns['output'], 'hi\n')
936 936
937 937 @dec.skip_win32
938 938 def test_script_err():
939 939 ip = get_ipython()
940 940 ip.run_cell_magic("script", "--err error sh", "echo 'hello' >&2")
941 941 nt.assert_equal(ip.user_ns['error'], 'hello\n')
942 942
943 943 @dec.skip_win32
944 944 def test_script_out_err():
945 945 ip = get_ipython()
946 946 ip.run_cell_magic("script", "--out output --err error sh", "echo 'hi'\necho 'hello' >&2")
947 947 nt.assert_equal(ip.user_ns['output'], 'hi\n')
948 948 nt.assert_equal(ip.user_ns['error'], 'hello\n')
949 949
950 950 @dec.skip_win32
951 def test_script_bg_out():
951 async def test_script_bg_out():
952 952 ip = get_ipython()
953 953 ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'")
954
955 nt.assert_equal(ip.user_ns['output'].read(), b'hi\n')
954 nt.assert_equal((await ip.user_ns["output"].read()), b"hi\n")
956 955 ip.user_ns['output'].close()
957 956
957
958 958 @dec.skip_win32
959 def test_script_bg_err():
959 async def test_script_bg_err():
960 960 ip = get_ipython()
961 961 ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2")
962 nt.assert_equal(ip.user_ns['error'].read(), b'hello\n')
963 ip.user_ns['error'].close()
962 nt.assert_equal((await ip.user_ns["error"].read()), b"hello\n")
963 ip.user_ns["error"].close()
964
964 965
965 966 @dec.skip_win32
966 def test_script_bg_out_err():
967 async def test_script_bg_out_err():
967 968 ip = get_ipython()
968 ip.run_cell_magic("script", "--bg --out output --err error sh", "echo 'hi'\necho 'hello' >&2")
969 nt.assert_equal(ip.user_ns['output'].read(), b'hi\n')
970 nt.assert_equal(ip.user_ns['error'].read(), b'hello\n')
971 ip.user_ns['output'].close()
972 ip.user_ns['error'].close()
969 ip.run_cell_magic(
970 "script", "--bg --out output --err error sh", "echo 'hi'\necho 'hello' >&2"
971 )
972 nt.assert_equal((await ip.user_ns["output"].read()), b"hi\n")
973 nt.assert_equal((await ip.user_ns["error"].read()), b"hello\n")
974 ip.user_ns["output"].close()
975 ip.user_ns["error"].close()
976
973 977
974 978 def test_script_defaults():
975 979 ip = get_ipython()
976 980 for cmd in ['sh', 'bash', 'perl', 'ruby']:
977 981 try:
978 982 find_cmd(cmd)
979 983 except Exception:
980 984 pass
981 985 else:
982 986 nt.assert_in(cmd, ip.magics_manager.magics['cell'])
983 987
984 988
985 989 @magics_class
986 990 class FooFoo(Magics):
987 991 """class with both %foo and %%foo magics"""
988 992 @line_magic('foo')
989 993 def line_foo(self, line):
990 994 "I am line foo"
991 995 pass
992 996
993 997 @cell_magic("foo")
994 998 def cell_foo(self, line, cell):
995 999 "I am cell foo, not line foo"
996 1000 pass
997 1001
998 1002 def test_line_cell_info():
999 1003 """%%foo and %foo magics are distinguishable to inspect"""
1000 1004 ip = get_ipython()
1001 1005 ip.magics_manager.register(FooFoo)
1002 1006 oinfo = ip.object_inspect('foo')
1003 1007 nt.assert_true(oinfo['found'])
1004 1008 nt.assert_true(oinfo['ismagic'])
1005 1009
1006 1010 oinfo = ip.object_inspect('%%foo')
1007 1011 nt.assert_true(oinfo['found'])
1008 1012 nt.assert_true(oinfo['ismagic'])
1009 1013 nt.assert_equal(oinfo['docstring'], FooFoo.cell_foo.__doc__)
1010 1014
1011 1015 oinfo = ip.object_inspect('%foo')
1012 1016 nt.assert_true(oinfo['found'])
1013 1017 nt.assert_true(oinfo['ismagic'])
1014 1018 nt.assert_equal(oinfo['docstring'], FooFoo.line_foo.__doc__)
1015 1019
1016 1020 def test_multiple_magics():
1017 1021 ip = get_ipython()
1018 1022 foo1 = FooFoo(ip)
1019 1023 foo2 = FooFoo(ip)
1020 1024 mm = ip.magics_manager
1021 1025 mm.register(foo1)
1022 1026 nt.assert_true(mm.magics['line']['foo'].__self__ is foo1)
1023 1027 mm.register(foo2)
1024 1028 nt.assert_true(mm.magics['line']['foo'].__self__ is foo2)
1025 1029
1026 1030 def test_alias_magic():
1027 1031 """Test %alias_magic."""
1028 1032 ip = get_ipython()
1029 1033 mm = ip.magics_manager
1030 1034
1031 1035 # Basic operation: both cell and line magics are created, if possible.
1032 1036 ip.run_line_magic('alias_magic', 'timeit_alias timeit')
1033 1037 nt.assert_in('timeit_alias', mm.magics['line'])
1034 1038 nt.assert_in('timeit_alias', mm.magics['cell'])
1035 1039
1036 1040 # --cell is specified, line magic not created.
1037 1041 ip.run_line_magic('alias_magic', '--cell timeit_cell_alias timeit')
1038 1042 nt.assert_not_in('timeit_cell_alias', mm.magics['line'])
1039 1043 nt.assert_in('timeit_cell_alias', mm.magics['cell'])
1040 1044
1041 1045 # Test that line alias is created successfully.
1042 1046 ip.run_line_magic('alias_magic', '--line env_alias env')
1043 1047 nt.assert_equal(ip.run_line_magic('env', ''),
1044 1048 ip.run_line_magic('env_alias', ''))
1045 1049
1046 1050 # Test that line alias with parameters passed in is created successfully.
1047 1051 ip.run_line_magic('alias_magic', '--line history_alias history --params ' + shlex.quote('3'))
1048 1052 nt.assert_in('history_alias', mm.magics['line'])
1049 1053
1050 1054
1051 1055 def test_save():
1052 1056 """Test %save."""
1053 1057 ip = get_ipython()
1054 1058 ip.history_manager.reset() # Clear any existing history.
1055 1059 cmds = [u"a=1", u"def b():\n return a**2", u"print(a, b())"]
1056 1060 for i, cmd in enumerate(cmds, start=1):
1057 1061 ip.history_manager.store_inputs(i, cmd)
1058 1062 with TemporaryDirectory() as tmpdir:
1059 1063 file = os.path.join(tmpdir, "testsave.py")
1060 1064 ip.run_line_magic("save", "%s 1-10" % file)
1061 1065 content = Path(file).read_text()
1062 1066 nt.assert_equal(content.count(cmds[0]), 1)
1063 1067 nt.assert_in("coding: utf-8", content)
1064 1068 ip.run_line_magic("save", "-a %s 1-10" % file)
1065 1069 content = Path(file).read_text()
1066 1070 nt.assert_equal(content.count(cmds[0]), 2)
1067 1071 nt.assert_in("coding: utf-8", content)
1068 1072
1069 1073
1070 1074 def test_store():
1071 1075 """Test %store."""
1072 1076 ip = get_ipython()
1073 1077 ip.run_line_magic('load_ext', 'storemagic')
1074 1078
1075 1079 # make sure the storage is empty
1076 1080 ip.run_line_magic('store', '-z')
1077 1081 ip.user_ns['var'] = 42
1078 1082 ip.run_line_magic('store', 'var')
1079 1083 ip.user_ns['var'] = 39
1080 1084 ip.run_line_magic('store', '-r')
1081 1085 nt.assert_equal(ip.user_ns['var'], 42)
1082 1086
1083 1087 ip.run_line_magic('store', '-d var')
1084 1088 ip.user_ns['var'] = 39
1085 1089 ip.run_line_magic('store' , '-r')
1086 1090 nt.assert_equal(ip.user_ns['var'], 39)
1087 1091
1088 1092
1089 1093 def _run_edit_test(arg_s, exp_filename=None,
1090 1094 exp_lineno=-1,
1091 1095 exp_contents=None,
1092 1096 exp_is_temp=None):
1093 1097 ip = get_ipython()
1094 1098 M = code.CodeMagics(ip)
1095 1099 last_call = ['','']
1096 1100 opts,args = M.parse_options(arg_s,'prxn:')
1097 1101 filename, lineno, is_temp = M._find_edit_target(ip, args, opts, last_call)
1098 1102
1099 1103 if exp_filename is not None:
1100 1104 nt.assert_equal(exp_filename, filename)
1101 1105 if exp_contents is not None:
1102 1106 with io.open(filename, 'r', encoding='utf-8') as f:
1103 1107 contents = f.read()
1104 1108 nt.assert_equal(exp_contents, contents)
1105 1109 if exp_lineno != -1:
1106 1110 nt.assert_equal(exp_lineno, lineno)
1107 1111 if exp_is_temp is not None:
1108 1112 nt.assert_equal(exp_is_temp, is_temp)
1109 1113
1110 1114
1111 1115 def test_edit_interactive():
1112 1116 """%edit on interactively defined objects"""
1113 1117 ip = get_ipython()
1114 1118 n = ip.execution_count
1115 1119 ip.run_cell(u"def foo(): return 1", store_history=True)
1116 1120
1117 1121 try:
1118 1122 _run_edit_test("foo")
1119 1123 except code.InteractivelyDefined as e:
1120 1124 nt.assert_equal(e.index, n)
1121 1125 else:
1122 1126 raise AssertionError("Should have raised InteractivelyDefined")
1123 1127
1124 1128
1125 1129 def test_edit_cell():
1126 1130 """%edit [cell id]"""
1127 1131 ip = get_ipython()
1128 1132
1129 1133 ip.run_cell(u"def foo(): return 1", store_history=True)
1130 1134
1131 1135 # test
1132 1136 _run_edit_test("1", exp_contents=ip.user_ns['In'][1], exp_is_temp=True)
1133 1137
1134 1138 def test_edit_fname():
1135 1139 """%edit file"""
1136 1140 # test
1137 1141 _run_edit_test("test file.py", exp_filename="test file.py")
1138 1142
1139 1143 def test_bookmark():
1140 1144 ip = get_ipython()
1141 1145 ip.run_line_magic('bookmark', 'bmname')
1142 1146 with tt.AssertPrints('bmname'):
1143 1147 ip.run_line_magic('bookmark', '-l')
1144 1148 ip.run_line_magic('bookmark', '-d bmname')
1145 1149
1146 1150 def test_ls_magic():
1147 1151 ip = get_ipython()
1148 1152 json_formatter = ip.display_formatter.formatters['application/json']
1149 1153 json_formatter.enabled = True
1150 1154 lsmagic = ip.magic('lsmagic')
1151 1155 with warnings.catch_warnings(record=True) as w:
1152 1156 j = json_formatter(lsmagic)
1153 1157 nt.assert_equal(sorted(j), ['cell', 'line'])
1154 1158 nt.assert_equal(w, []) # no warnings
1155 1159
1156 1160 def test_strip_initial_indent():
1157 1161 def sii(s):
1158 1162 lines = s.splitlines()
1159 1163 return '\n'.join(code.strip_initial_indent(lines))
1160 1164
1161 1165 nt.assert_equal(sii(" a = 1\nb = 2"), "a = 1\nb = 2")
1162 1166 nt.assert_equal(sii(" a\n b\nc"), "a\n b\nc")
1163 1167 nt.assert_equal(sii("a\n b"), "a\n b")
1164 1168
1165 1169 def test_logging_magic_quiet_from_arg():
1166 1170 _ip.config.LoggingMagics.quiet = False
1167 1171 lm = logging.LoggingMagics(shell=_ip)
1168 1172 with TemporaryDirectory() as td:
1169 1173 try:
1170 1174 with tt.AssertNotPrints(re.compile("Activating.*")):
1171 1175 lm.logstart('-q {}'.format(
1172 1176 os.path.join(td, "quiet_from_arg.log")))
1173 1177 finally:
1174 1178 _ip.logger.logstop()
1175 1179
1176 1180 def test_logging_magic_quiet_from_config():
1177 1181 _ip.config.LoggingMagics.quiet = True
1178 1182 lm = logging.LoggingMagics(shell=_ip)
1179 1183 with TemporaryDirectory() as td:
1180 1184 try:
1181 1185 with tt.AssertNotPrints(re.compile("Activating.*")):
1182 1186 lm.logstart(os.path.join(td, "quiet_from_config.log"))
1183 1187 finally:
1184 1188 _ip.logger.logstop()
1185 1189
1186 1190
1187 1191 def test_logging_magic_not_quiet():
1188 1192 _ip.config.LoggingMagics.quiet = False
1189 1193 lm = logging.LoggingMagics(shell=_ip)
1190 1194 with TemporaryDirectory() as td:
1191 1195 try:
1192 1196 with tt.AssertPrints(re.compile("Activating.*")):
1193 1197 lm.logstart(os.path.join(td, "not_quiet.log"))
1194 1198 finally:
1195 1199 _ip.logger.logstop()
1196 1200
1197 1201
1198 1202 def test_time_no_var_expand():
1199 1203 _ip.user_ns['a'] = 5
1200 1204 _ip.user_ns['b'] = []
1201 1205 _ip.magic('time b.append("{a}")')
1202 1206 assert _ip.user_ns['b'] == ['{a}']
1203 1207
1204 1208
1205 1209 # this is slow, put at the end for local testing.
1206 1210 def test_timeit_arguments():
1207 1211 "Test valid timeit arguments, should not cause SyntaxError (GH #1269)"
1208 1212 if sys.version_info < (3,7):
1209 1213 _ip.magic("timeit -n1 -r1 ('#')")
1210 1214 else:
1211 1215 # 3.7 optimize no-op statement like above out, and complain there is
1212 1216 # nothing in the for loop.
1213 1217 _ip.magic("timeit -n1 -r1 a=('#')")
1214 1218
1215 1219
1216 1220 TEST_MODULE = """
1217 1221 print('Loaded my_tmp')
1218 1222 if __name__ == "__main__":
1219 1223 print('I just ran a script')
1220 1224 """
1221 1225
1222 1226
1223 1227 def test_run_module_from_import_hook():
1224 1228 "Test that a module can be loaded via an import hook"
1225 1229 with TemporaryDirectory() as tmpdir:
1226 1230 fullpath = os.path.join(tmpdir, 'my_tmp.py')
1227 1231 Path(fullpath).write_text(TEST_MODULE)
1228 1232
1229 1233 class MyTempImporter(object):
1230 1234 def __init__(self):
1231 1235 pass
1232 1236
1233 1237 def find_module(self, fullname, path=None):
1234 1238 if 'my_tmp' in fullname:
1235 1239 return self
1236 1240 return None
1237 1241
1238 1242 def load_module(self, name):
1239 1243 import imp
1240 1244 return imp.load_source('my_tmp', fullpath)
1241 1245
1242 1246 def get_code(self, fullname):
1243 1247 return compile(Path(fullpath).read_text(), "foo", "exec")
1244 1248
1245 1249 def is_package(self, __):
1246 1250 return False
1247 1251
1248 1252 sys.meta_path.insert(0, MyTempImporter())
1249 1253
1250 1254 with capture_output() as captured:
1251 1255 _ip.magic("run -m my_tmp")
1252 1256 _ip.run_cell("import my_tmp")
1253 1257
1254 1258 output = "Loaded my_tmp\nI just ran a script\nLoaded my_tmp\n"
1255 1259 nt.assert_equal(output, captured.stdout)
1256 1260
1257 1261 sys.meta_path.pop(0)
General Comments 0
You need to be logged in to leave comments. Login now