##// END OF EJS Templates
Line-buffered output for %%magic scripts
Ethan Madden -
Show More
@@ -1,294 +1,316 b''
1 1 """Magic functions for running cells in various scripts."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import errno
7 7 import os
8 8 import sys
9 9 import signal
10 10 import time
11 from subprocess import Popen, PIPE, CalledProcessError
11 import asyncio
12 12 import atexit
13 13
14 14 from IPython.core import magic_arguments
15 15 from IPython.core.magic import (
16 16 Magics, magics_class, line_magic, cell_magic
17 17 )
18 18 from IPython.lib.backgroundjobs import BackgroundJobManager
19 19 from IPython.utils import py3compat
20 20 from IPython.utils.process import arg_split
21 21 from traitlets import List, Dict, default
22 22
23 23 #-----------------------------------------------------------------------------
24 24 # Magic implementation classes
25 25 #-----------------------------------------------------------------------------
26 26
27 27 def script_args(f):
28 28 """single decorator for adding script args"""
29 29 args = [
30 30 magic_arguments.argument(
31 31 '--out', type=str,
32 32 help="""The variable in which to store stdout from the script.
33 33 If the script is backgrounded, this will be the stdout *pipe*,
34 34 instead of the stderr text itself and will not be auto closed.
35 35 """
36 36 ),
37 37 magic_arguments.argument(
38 38 '--err', type=str,
39 39 help="""The variable in which to store stderr from the script.
40 40 If the script is backgrounded, this will be the stderr *pipe*,
41 41 instead of the stderr text itself and will not be autoclosed.
42 42 """
43 43 ),
44 44 magic_arguments.argument(
45 45 '--bg', action="store_true",
46 46 help="""Whether to run the script in the background.
47 47 If given, the only way to see the output of the command is
48 48 with --out/err.
49 49 """
50 50 ),
51 51 magic_arguments.argument(
52 52 '--proc', type=str,
53 53 help="""The variable in which to store Popen instance.
54 54 This is used only when --bg option is given.
55 55 """
56 56 ),
57 57 magic_arguments.argument(
58 58 '--no-raise-error', action="store_false", dest='raise_error',
59 59 help="""Whether you should raise an error message in addition to
60 60 a stream on stderr if you get a nonzero exit code.
61 61 """
62 62 )
63 63 ]
64 64 for arg in args:
65 65 f = arg(f)
66 66 return f
67 67
68 68 @magics_class
69 69 class ScriptMagics(Magics):
70 70 """Magics for talking to scripts
71 71
72 72 This defines a base `%%script` cell magic for running a cell
73 73 with a program in a subprocess, and registers a few top-level
74 74 magics that call %%script with common interpreters.
75 75 """
76 76 script_magics = List(
77 77 help="""Extra script cell magics to define
78 78
79 79 This generates simple wrappers of `%%script foo` as `%%foo`.
80 80
81 81 If you want to add script magics that aren't on your path,
82 82 specify them in script_paths
83 83 """,
84 84 ).tag(config=True)
85 85 @default('script_magics')
86 86 def _script_magics_default(self):
87 87 """default to a common list of programs"""
88 88
89 89 defaults = [
90 90 'sh',
91 91 'bash',
92 92 'perl',
93 93 'ruby',
94 94 'python',
95 95 'python2',
96 96 'python3',
97 97 'pypy',
98 98 ]
99 99 if os.name == 'nt':
100 100 defaults.extend([
101 101 'cmd',
102 102 ])
103 103
104 104 return defaults
105 105
106 106 script_paths = Dict(
107 107 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
108 108
109 109 Only necessary for items in script_magics where the default path will not
110 110 find the right interpreter.
111 111 """
112 112 ).tag(config=True)
113 113
114 114 def __init__(self, shell=None):
115 115 super(ScriptMagics, self).__init__(shell=shell)
116 116 self._generate_script_magics()
117 117 self.job_manager = BackgroundJobManager()
118 118 self.bg_processes = []
119 119 atexit.register(self.kill_bg_processes)
120 120
121 121 def __del__(self):
122 122 self.kill_bg_processes()
123 123
124 124 def _generate_script_magics(self):
125 125 cell_magics = self.magics['cell']
126 126 for name in self.script_magics:
127 127 cell_magics[name] = self._make_script_magic(name)
128 128
129 129 def _make_script_magic(self, name):
130 130 """make a named magic, that calls %%script with a particular program"""
131 131 # expand to explicit path if necessary:
132 132 script = self.script_paths.get(name, name)
133 133
134 134 @magic_arguments.magic_arguments()
135 135 @script_args
136 136 def named_script_magic(line, cell):
137 137 # if line, add it as cl-flags
138 138 if line:
139 139 line = "%s %s" % (script, line)
140 140 else:
141 141 line = script
142 142 return self.shebang(line, cell)
143 143
144 144 # write a basic docstring:
145 145 named_script_magic.__doc__ = \
146 146 """%%{name} script magic
147 147
148 148 Run cells with {script} in a subprocess.
149 149
150 150 This is a shortcut for `%%script {script}`
151 151 """.format(**locals())
152 152
153 153 return named_script_magic
154 154
155 155 @magic_arguments.magic_arguments()
156 156 @script_args
157 157 @cell_magic("script")
158 158 def shebang(self, line, cell):
159 159 """Run a cell via a shell command
160 160
161 161 The `%%script` line is like the #! line of script,
162 162 specifying a program (bash, perl, ruby, etc.) with which to run.
163 163
164 164 The rest of the cell is run by that program.
165 165
166 166 Examples
167 167 --------
168 168 ::
169 169
170 170 In [1]: %%script bash
171 171 ...: for i in 1 2 3; do
172 172 ...: echo $i
173 173 ...: done
174 174 1
175 175 2
176 176 3
177 177 """
178 argv = arg_split(line, posix = not sys.platform.startswith('win'))
178
179 async def _handle_stream(stream, stream_arg, file_object):
180 while True:
181 line = (await stream.readline()).decode("utf8")
182 if not line:
183 break
184 if stream_arg:
185 self.shell.user_ns[stream_arg] = line
186 else:
187 file_object.write(line)
188 file_object.flush()
189
190 async def _stream_communicate(process, cell):
191 process.stdin.write(cell)
192 process.stdin.close()
193 stdout_task = asyncio.create_task(
194 _handle_stream(process.stdout, args.out, sys.stdout)
195 )
196 stderr_task = asyncio.create_task(
197 _handle_stream(process.stderr, args.err, sys.stderr)
198 )
199 await asyncio.wait([stdout_task, stderr_task])
200
201 if sys.platform.startswith("win"):
202 asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
203 loop = asyncio.get_event_loop()
204
205 argv = arg_split(line, posix=not sys.platform.startswith('win'))
179 206 args, cmd = self.shebang.parser.parse_known_args(argv)
180
181 207 try:
182 p = Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE)
208 p = loop.run_until_complete(
209 asyncio.create_subprocess_exec(
210 *cmd,
211 stdout=asyncio.subprocess.PIPE,
212 stderr=asyncio.subprocess.PIPE,
213 stdin=asyncio.subprocess.PIPE
214 )
215 )
183 216 except OSError as e:
184 217 if e.errno == errno.ENOENT:
185 218 print("Couldn't find program: %r" % cmd[0])
186 219 return
187 220 else:
188 221 raise
189 222
190 223 if not cell.endswith('\n'):
191 224 cell += '\n'
192 225 cell = cell.encode('utf8', 'replace')
193 226 if args.bg:
194 227 self.bg_processes.append(p)
195 228 self._gc_bg_processes()
196 229 to_close = []
197 230 if args.out:
198 231 self.shell.user_ns[args.out] = p.stdout
199 232 else:
200 233 to_close.append(p.stdout)
201 234 if args.err:
202 235 self.shell.user_ns[args.err] = p.stderr
203 236 else:
204 237 to_close.append(p.stderr)
205 238 self.job_manager.new(self._run_script, p, cell, to_close, daemon=True)
206 239 if args.proc:
207 240 self.shell.user_ns[args.proc] = p
208 241 return
209 242
210 243 try:
211 out, err = p.communicate(cell)
244 loop.run_until_complete(_stream_communicate(p, cell))
212 245 except KeyboardInterrupt:
213 246 try:
214 247 p.send_signal(signal.SIGINT)
215 248 time.sleep(0.1)
216 if p.poll() is not None:
249 if p.returncode is not None:
217 250 print("Process is interrupted.")
218 251 return
219 252 p.terminate()
220 253 time.sleep(0.1)
221 if p.poll() is not None:
254 if p.returncode is not None:
222 255 print("Process is terminated.")
223 256 return
224 257 p.kill()
225 258 print("Process is killed.")
226 259 except OSError:
227 260 pass
228 261 except Exception as e:
229 262 print("Error while terminating subprocess (pid=%i): %s" \
230 263 % (p.pid, e))
231 264 return
232 out = py3compat.decode(out)
233 err = py3compat.decode(err)
234 if args.out:
235 self.shell.user_ns[args.out] = out
236 else:
237 sys.stdout.write(out)
238 sys.stdout.flush()
239 if args.err:
240 self.shell.user_ns[args.err] = err
241 else:
242 sys.stderr.write(err)
243 sys.stderr.flush()
244 265 if args.raise_error and p.returncode!=0:
245 raise CalledProcessError(p.returncode, cell, output=out, stderr=err)
266 print(p.returncode)
267 raise CalledProcessError(p.returncode, cell)
246 268
247 269 def _run_script(self, p, cell, to_close):
248 270 """callback for running the script in the background"""
249 271 p.stdin.write(cell)
250 272 p.stdin.close()
251 273 for s in to_close:
252 274 s.close()
253 275 p.wait()
254 276
255 277 @line_magic("killbgscripts")
256 278 def killbgscripts(self, _nouse_=''):
257 279 """Kill all BG processes started by %%script and its family."""
258 280 self.kill_bg_processes()
259 281 print("All background processes were killed.")
260 282
261 283 def kill_bg_processes(self):
262 284 """Kill all BG processes which are still running."""
263 285 if not self.bg_processes:
264 286 return
265 287 for p in self.bg_processes:
266 if p.poll() is None:
288 if p.returncode is None:
267 289 try:
268 290 p.send_signal(signal.SIGINT)
269 291 except:
270 292 pass
271 293 time.sleep(0.1)
272 294 self._gc_bg_processes()
273 295 if not self.bg_processes:
274 296 return
275 297 for p in self.bg_processes:
276 if p.poll() is None:
298 if p.returncode is None:
277 299 try:
278 300 p.terminate()
279 301 except:
280 302 pass
281 303 time.sleep(0.1)
282 304 self._gc_bg_processes()
283 305 if not self.bg_processes:
284 306 return
285 307 for p in self.bg_processes:
286 if p.poll() is None:
308 if p.returncode is None:
287 309 try:
288 310 p.kill()
289 311 except:
290 312 pass
291 313 self._gc_bg_processes()
292 314
293 315 def _gc_bg_processes(self):
294 self.bg_processes = [p for p in self.bg_processes if p.poll() is None]
316 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]
@@ -1,1257 +1,1257 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for various magic functions.
3 3
4 4 Needs to be run by nose (to make ipython session available).
5 5 """
6 6
7 7 import io
8 8 import os
9 9 import re
10 10 import sys
11 11 import warnings
12 12 from textwrap import dedent
13 13 from unittest import TestCase
14 14 from unittest import mock
15 15 from importlib import invalidate_caches
16 16 from io import StringIO
17 17 from pathlib import Path
18 18
19 19 import nose.tools as nt
20 20
21 21 import shlex
22 22
23 23 from IPython import get_ipython
24 24 from IPython.core import magic
25 25 from IPython.core.error import UsageError
26 26 from IPython.core.magic import (Magics, magics_class, line_magic,
27 27 cell_magic,
28 28 register_line_magic, register_cell_magic)
29 29 from IPython.core.magics import execution, script, code, logging, osm
30 30 from IPython.testing import decorators as dec
31 31 from IPython.testing import tools as tt
32 32 from IPython.utils.io import capture_output
33 33 from IPython.utils.tempdir import (TemporaryDirectory,
34 34 TemporaryWorkingDirectory)
35 35 from IPython.utils.process import find_cmd
36 36 from .test_debugger import PdbTestInput
37 37
38 38
39 39 @magic.magics_class
40 40 class DummyMagics(magic.Magics): pass
41 41
42 42 def test_extract_code_ranges():
43 43 instr = "1 3 5-6 7-9 10:15 17: :10 10- -13 :"
44 44 expected = [(0, 1),
45 45 (2, 3),
46 46 (4, 6),
47 47 (6, 9),
48 48 (9, 14),
49 49 (16, None),
50 50 (None, 9),
51 51 (9, None),
52 52 (None, 13),
53 53 (None, None)]
54 54 actual = list(code.extract_code_ranges(instr))
55 55 nt.assert_equal(actual, expected)
56 56
57 57 def test_extract_symbols():
58 58 source = """import foo\na = 10\ndef b():\n return 42\n\n\nclass A: pass\n\n\n"""
59 59 symbols_args = ["a", "b", "A", "A,b", "A,a", "z"]
60 60 expected = [([], ['a']),
61 61 (["def b():\n return 42\n"], []),
62 62 (["class A: pass\n"], []),
63 63 (["class A: pass\n", "def b():\n return 42\n"], []),
64 64 (["class A: pass\n"], ['a']),
65 65 ([], ['z'])]
66 66 for symbols, exp in zip(symbols_args, expected):
67 67 nt.assert_equal(code.extract_symbols(source, symbols), exp)
68 68
69 69
70 70 def test_extract_symbols_raises_exception_with_non_python_code():
71 71 source = ("=begin A Ruby program :)=end\n"
72 72 "def hello\n"
73 73 "puts 'Hello world'\n"
74 74 "end")
75 75 with nt.assert_raises(SyntaxError):
76 76 code.extract_symbols(source, "hello")
77 77
78 78
79 79 def test_magic_not_found():
80 80 # magic not found raises UsageError
81 81 with nt.assert_raises(UsageError):
82 82 _ip.magic('doesntexist')
83 83
84 84 # ensure result isn't success when a magic isn't found
85 85 result = _ip.run_cell('%doesntexist')
86 86 assert isinstance(result.error_in_exec, UsageError)
87 87
88 88
89 89 def test_cell_magic_not_found():
90 90 # magic not found raises UsageError
91 91 with nt.assert_raises(UsageError):
92 92 _ip.run_cell_magic('doesntexist', 'line', 'cell')
93 93
94 94 # ensure result isn't success when a magic isn't found
95 95 result = _ip.run_cell('%%doesntexist')
96 96 assert isinstance(result.error_in_exec, UsageError)
97 97
98 98
99 99 def test_magic_error_status():
100 100 def fail(shell):
101 101 1/0
102 102 _ip.register_magic_function(fail)
103 103 result = _ip.run_cell('%fail')
104 104 assert isinstance(result.error_in_exec, ZeroDivisionError)
105 105
106 106
107 107 def test_config():
108 108 """ test that config magic does not raise
109 109 can happen if Configurable init is moved too early into
110 110 Magics.__init__ as then a Config object will be registered as a
111 111 magic.
112 112 """
113 113 ## should not raise.
114 114 _ip.magic('config')
115 115
116 116 def test_config_available_configs():
117 117 """ test that config magic prints available configs in unique and
118 118 sorted order. """
119 119 with capture_output() as captured:
120 120 _ip.magic('config')
121 121
122 122 stdout = captured.stdout
123 123 config_classes = stdout.strip().split('\n')[1:]
124 124 nt.assert_list_equal(config_classes, sorted(set(config_classes)))
125 125
126 126 def test_config_print_class():
127 127 """ test that config with a classname prints the class's options. """
128 128 with capture_output() as captured:
129 129 _ip.magic('config TerminalInteractiveShell')
130 130
131 131 stdout = captured.stdout
132 132 if not re.match("TerminalInteractiveShell.* options", stdout.splitlines()[0]):
133 133 print(stdout)
134 134 raise AssertionError("1st line of stdout not like "
135 135 "'TerminalInteractiveShell.* options'")
136 136
137 137 def test_rehashx():
138 138 # clear up everything
139 139 _ip.alias_manager.clear_aliases()
140 140 del _ip.db['syscmdlist']
141 141
142 142 _ip.magic('rehashx')
143 143 # Practically ALL ipython development systems will have more than 10 aliases
144 144
145 145 nt.assert_true(len(_ip.alias_manager.aliases) > 10)
146 146 for name, cmd in _ip.alias_manager.aliases:
147 147 # we must strip dots from alias names
148 148 nt.assert_not_in('.', name)
149 149
150 150 # rehashx must fill up syscmdlist
151 151 scoms = _ip.db['syscmdlist']
152 152 nt.assert_true(len(scoms) > 10)
153 153
154 154
155 155
156 156 def test_magic_parse_options():
157 157 """Test that we don't mangle paths when parsing magic options."""
158 158 ip = get_ipython()
159 159 path = 'c:\\x'
160 160 m = DummyMagics(ip)
161 161 opts = m.parse_options('-f %s' % path,'f:')[0]
162 162 # argv splitting is os-dependent
163 163 if os.name == 'posix':
164 164 expected = 'c:x'
165 165 else:
166 166 expected = path
167 167 nt.assert_equal(opts['f'], expected)
168 168
169 169 def test_magic_parse_long_options():
170 170 """Magic.parse_options can handle --foo=bar long options"""
171 171 ip = get_ipython()
172 172 m = DummyMagics(ip)
173 173 opts, _ = m.parse_options('--foo --bar=bubble', 'a', 'foo', 'bar=')
174 174 nt.assert_in('foo', opts)
175 175 nt.assert_in('bar', opts)
176 176 nt.assert_equal(opts['bar'], "bubble")
177 177
178 178
179 179 def doctest_hist_f():
180 180 """Test %hist -f with temporary filename.
181 181
182 182 In [9]: import tempfile
183 183
184 184 In [10]: tfile = tempfile.mktemp('.py','tmp-ipython-')
185 185
186 186 In [11]: %hist -nl -f $tfile 3
187 187
188 188 In [13]: import os; os.unlink(tfile)
189 189 """
190 190
191 191
192 192 def doctest_hist_op():
193 193 """Test %hist -op
194 194
195 195 In [1]: class b(float):
196 196 ...: pass
197 197 ...:
198 198
199 199 In [2]: class s(object):
200 200 ...: def __str__(self):
201 201 ...: return 's'
202 202 ...:
203 203
204 204 In [3]:
205 205
206 206 In [4]: class r(b):
207 207 ...: def __repr__(self):
208 208 ...: return 'r'
209 209 ...:
210 210
211 211 In [5]: class sr(s,r): pass
212 212 ...:
213 213
214 214 In [6]:
215 215
216 216 In [7]: bb=b()
217 217
218 218 In [8]: ss=s()
219 219
220 220 In [9]: rr=r()
221 221
222 222 In [10]: ssrr=sr()
223 223
224 224 In [11]: 4.5
225 225 Out[11]: 4.5
226 226
227 227 In [12]: str(ss)
228 228 Out[12]: 's'
229 229
230 230 In [13]:
231 231
232 232 In [14]: %hist -op
233 233 >>> class b:
234 234 ... pass
235 235 ...
236 236 >>> class s(b):
237 237 ... def __str__(self):
238 238 ... return 's'
239 239 ...
240 240 >>>
241 241 >>> class r(b):
242 242 ... def __repr__(self):
243 243 ... return 'r'
244 244 ...
245 245 >>> class sr(s,r): pass
246 246 >>>
247 247 >>> bb=b()
248 248 >>> ss=s()
249 249 >>> rr=r()
250 250 >>> ssrr=sr()
251 251 >>> 4.5
252 252 4.5
253 253 >>> str(ss)
254 254 's'
255 255 >>>
256 256 """
257 257
258 258 def test_hist_pof():
259 259 ip = get_ipython()
260 260 ip.run_cell(u"1+2", store_history=True)
261 261 #raise Exception(ip.history_manager.session_number)
262 262 #raise Exception(list(ip.history_manager._get_range_session()))
263 263 with TemporaryDirectory() as td:
264 264 tf = os.path.join(td, 'hist.py')
265 265 ip.run_line_magic('history', '-pof %s' % tf)
266 266 assert os.path.isfile(tf)
267 267
268 268
269 269 def test_macro():
270 270 ip = get_ipython()
271 271 ip.history_manager.reset() # Clear any existing history.
272 272 cmds = ["a=1", "def b():\n return a**2", "print(a,b())"]
273 273 for i, cmd in enumerate(cmds, start=1):
274 274 ip.history_manager.store_inputs(i, cmd)
275 275 ip.magic("macro test 1-3")
276 276 nt.assert_equal(ip.user_ns["test"].value, "\n".join(cmds)+"\n")
277 277
278 278 # List macros
279 279 nt.assert_in("test", ip.magic("macro"))
280 280
281 281
282 282 def test_macro_run():
283 283 """Test that we can run a multi-line macro successfully."""
284 284 ip = get_ipython()
285 285 ip.history_manager.reset()
286 286 cmds = ["a=10", "a+=1", "print(a)", "%macro test 2-3"]
287 287 for cmd in cmds:
288 288 ip.run_cell(cmd, store_history=True)
289 289 nt.assert_equal(ip.user_ns["test"].value, "a+=1\nprint(a)\n")
290 290 with tt.AssertPrints("12"):
291 291 ip.run_cell("test")
292 292 with tt.AssertPrints("13"):
293 293 ip.run_cell("test")
294 294
295 295
296 296 def test_magic_magic():
297 297 """Test %magic"""
298 298 ip = get_ipython()
299 299 with capture_output() as captured:
300 300 ip.magic("magic")
301 301
302 302 stdout = captured.stdout
303 303 nt.assert_in('%magic', stdout)
304 304 nt.assert_in('IPython', stdout)
305 305 nt.assert_in('Available', stdout)
306 306
307 307
308 308 @dec.skipif_not_numpy
309 309 def test_numpy_reset_array_undec():
310 310 "Test '%reset array' functionality"
311 311 _ip.ex('import numpy as np')
312 312 _ip.ex('a = np.empty(2)')
313 313 nt.assert_in('a', _ip.user_ns)
314 314 _ip.magic('reset -f array')
315 315 nt.assert_not_in('a', _ip.user_ns)
316 316
317 317 def test_reset_out():
318 318 "Test '%reset out' magic"
319 319 _ip.run_cell("parrot = 'dead'", store_history=True)
320 320 # test '%reset -f out', make an Out prompt
321 321 _ip.run_cell("parrot", store_history=True)
322 322 nt.assert_true('dead' in [_ip.user_ns[x] for x in ('_','__','___')])
323 323 _ip.magic('reset -f out')
324 324 nt.assert_false('dead' in [_ip.user_ns[x] for x in ('_','__','___')])
325 325 nt.assert_equal(len(_ip.user_ns['Out']), 0)
326 326
327 327 def test_reset_in():
328 328 "Test '%reset in' magic"
329 329 # test '%reset -f in'
330 330 _ip.run_cell("parrot", store_history=True)
331 331 nt.assert_true('parrot' in [_ip.user_ns[x] for x in ('_i','_ii','_iii')])
332 332 _ip.magic('%reset -f in')
333 333 nt.assert_false('parrot' in [_ip.user_ns[x] for x in ('_i','_ii','_iii')])
334 334 nt.assert_equal(len(set(_ip.user_ns['In'])), 1)
335 335
336 336 def test_reset_dhist():
337 337 "Test '%reset dhist' magic"
338 338 _ip.run_cell("tmp = [d for d in _dh]") # copy before clearing
339 339 _ip.magic('cd ' + os.path.dirname(nt.__file__))
340 340 _ip.magic('cd -')
341 341 nt.assert_true(len(_ip.user_ns['_dh']) > 0)
342 342 _ip.magic('reset -f dhist')
343 343 nt.assert_equal(len(_ip.user_ns['_dh']), 0)
344 344 _ip.run_cell("_dh = [d for d in tmp]") #restore
345 345
346 346 def test_reset_in_length():
347 347 "Test that '%reset in' preserves In[] length"
348 348 _ip.run_cell("print 'foo'")
349 349 _ip.run_cell("reset -f in")
350 350 nt.assert_equal(len(_ip.user_ns['In']), _ip.displayhook.prompt_count+1)
351 351
352 352 class TestResetErrors(TestCase):
353 353
354 354 def test_reset_redefine(self):
355 355
356 356 @magics_class
357 357 class KernelMagics(Magics):
358 358 @line_magic
359 359 def less(self, shell): pass
360 360
361 361 _ip.register_magics(KernelMagics)
362 362
363 363 with self.assertLogs() as cm:
364 364 # hack, we want to just capture logs, but assertLogs fails if not
365 365 # logs get produce.
366 366 # so log one things we ignore.
367 367 import logging as log_mod
368 368 log = log_mod.getLogger()
369 369 log.info('Nothing')
370 370 # end hack.
371 371 _ip.run_cell("reset -f")
372 372
373 373 assert len(cm.output) == 1
374 374 for out in cm.output:
375 375 assert "Invalid alias" not in out
376 376
377 377 def test_tb_syntaxerror():
378 378 """test %tb after a SyntaxError"""
379 379 ip = get_ipython()
380 380 ip.run_cell("for")
381 381
382 382 # trap and validate stdout
383 383 save_stdout = sys.stdout
384 384 try:
385 385 sys.stdout = StringIO()
386 386 ip.run_cell("%tb")
387 387 out = sys.stdout.getvalue()
388 388 finally:
389 389 sys.stdout = save_stdout
390 390 # trim output, and only check the last line
391 391 last_line = out.rstrip().splitlines()[-1].strip()
392 392 nt.assert_equal(last_line, "SyntaxError: invalid syntax")
393 393
394 394
395 395 def test_time():
396 396 ip = get_ipython()
397 397
398 398 with tt.AssertPrints("Wall time: "):
399 399 ip.run_cell("%time None")
400 400
401 401 ip.run_cell("def f(kmjy):\n"
402 402 " %time print (2*kmjy)")
403 403
404 404 with tt.AssertPrints("Wall time: "):
405 405 with tt.AssertPrints("hihi", suppress=False):
406 406 ip.run_cell("f('hi')")
407 407
408 408 def test_time_last_not_expression():
409 409 ip.run_cell("%%time\n"
410 410 "var_1 = 1\n"
411 411 "var_2 = 2\n")
412 412 assert ip.user_ns['var_1'] == 1
413 413 del ip.user_ns['var_1']
414 414 assert ip.user_ns['var_2'] == 2
415 415 del ip.user_ns['var_2']
416 416
417 417
418 418 @dec.skip_win32
419 419 def test_time2():
420 420 ip = get_ipython()
421 421
422 422 with tt.AssertPrints("CPU times: user "):
423 423 ip.run_cell("%time None")
424 424
425 425 def test_time3():
426 426 """Erroneous magic function calls, issue gh-3334"""
427 427 ip = get_ipython()
428 428 ip.user_ns.pop('run', None)
429 429
430 430 with tt.AssertNotPrints("not found", channel='stderr'):
431 431 ip.run_cell("%%time\n"
432 432 "run = 0\n"
433 433 "run += 1")
434 434
435 435 def test_multiline_time():
436 436 """Make sure last statement from time return a value."""
437 437 ip = get_ipython()
438 438 ip.user_ns.pop('run', None)
439 439
440 440 ip.run_cell(dedent("""\
441 441 %%time
442 442 a = "ho"
443 443 b = "hey"
444 444 a+b
445 445 """))
446 446 nt.assert_equal(ip.user_ns_hidden['_'], 'hohey')
447 447
448 448 def test_time_local_ns():
449 449 """
450 450 Test that local_ns is actually global_ns when running a cell magic
451 451 """
452 452 ip = get_ipython()
453 453 ip.run_cell("%%time\n"
454 454 "myvar = 1")
455 455 nt.assert_equal(ip.user_ns['myvar'], 1)
456 456 del ip.user_ns['myvar']
457 457
458 458 def test_doctest_mode():
459 459 "Toggle doctest_mode twice, it should be a no-op and run without error"
460 460 _ip.magic('doctest_mode')
461 461 _ip.magic('doctest_mode')
462 462
463 463
464 464 def test_parse_options():
465 465 """Tests for basic options parsing in magics."""
466 466 # These are only the most minimal of tests, more should be added later. At
467 467 # the very least we check that basic text/unicode calls work OK.
468 468 m = DummyMagics(_ip)
469 469 nt.assert_equal(m.parse_options('foo', '')[1], 'foo')
470 470 nt.assert_equal(m.parse_options(u'foo', '')[1], u'foo')
471 471
472 472
473 473 def test_dirops():
474 474 """Test various directory handling operations."""
475 475 # curpath = lambda :os.path.splitdrive(os.getcwd())[1].replace('\\','/')
476 476 curpath = os.getcwd
477 477 startdir = os.getcwd()
478 478 ipdir = os.path.realpath(_ip.ipython_dir)
479 479 try:
480 480 _ip.magic('cd "%s"' % ipdir)
481 481 nt.assert_equal(curpath(), ipdir)
482 482 _ip.magic('cd -')
483 483 nt.assert_equal(curpath(), startdir)
484 484 _ip.magic('pushd "%s"' % ipdir)
485 485 nt.assert_equal(curpath(), ipdir)
486 486 _ip.magic('popd')
487 487 nt.assert_equal(curpath(), startdir)
488 488 finally:
489 489 os.chdir(startdir)
490 490
491 491
492 492 def test_cd_force_quiet():
493 493 """Test OSMagics.cd_force_quiet option"""
494 494 _ip.config.OSMagics.cd_force_quiet = True
495 495 osmagics = osm.OSMagics(shell=_ip)
496 496
497 497 startdir = os.getcwd()
498 498 ipdir = os.path.realpath(_ip.ipython_dir)
499 499
500 500 try:
501 501 with tt.AssertNotPrints(ipdir):
502 502 osmagics.cd('"%s"' % ipdir)
503 503 with tt.AssertNotPrints(startdir):
504 504 osmagics.cd('-')
505 505 finally:
506 506 os.chdir(startdir)
507 507
508 508
509 509 def test_xmode():
510 510 # Calling xmode three times should be a no-op
511 511 xmode = _ip.InteractiveTB.mode
512 512 for i in range(4):
513 513 _ip.magic("xmode")
514 514 nt.assert_equal(_ip.InteractiveTB.mode, xmode)
515 515
516 516 def test_reset_hard():
517 517 monitor = []
518 518 class A(object):
519 519 def __del__(self):
520 520 monitor.append(1)
521 521 def __repr__(self):
522 522 return "<A instance>"
523 523
524 524 _ip.user_ns["a"] = A()
525 525 _ip.run_cell("a")
526 526
527 527 nt.assert_equal(monitor, [])
528 528 _ip.magic("reset -f")
529 529 nt.assert_equal(monitor, [1])
530 530
531 531 class TestXdel(tt.TempFileMixin):
532 532 def test_xdel(self):
533 533 """Test that references from %run are cleared by xdel."""
534 534 src = ("class A(object):\n"
535 535 " monitor = []\n"
536 536 " def __del__(self):\n"
537 537 " self.monitor.append(1)\n"
538 538 "a = A()\n")
539 539 self.mktmp(src)
540 540 # %run creates some hidden references...
541 541 _ip.magic("run %s" % self.fname)
542 542 # ... as does the displayhook.
543 543 _ip.run_cell("a")
544 544
545 545 monitor = _ip.user_ns["A"].monitor
546 546 nt.assert_equal(monitor, [])
547 547
548 548 _ip.magic("xdel a")
549 549
550 550 # Check that a's __del__ method has been called.
551 551 nt.assert_equal(monitor, [1])
552 552
553 553 def doctest_who():
554 554 """doctest for %who
555 555
556 556 In [1]: %reset -f
557 557
558 558 In [2]: alpha = 123
559 559
560 560 In [3]: beta = 'beta'
561 561
562 562 In [4]: %who int
563 563 alpha
564 564
565 565 In [5]: %who str
566 566 beta
567 567
568 568 In [6]: %whos
569 569 Variable Type Data/Info
570 570 ----------------------------
571 571 alpha int 123
572 572 beta str beta
573 573
574 574 In [7]: %who_ls
575 575 Out[7]: ['alpha', 'beta']
576 576 """
577 577
578 578 def test_whos():
579 579 """Check that whos is protected against objects where repr() fails."""
580 580 class A(object):
581 581 def __repr__(self):
582 582 raise Exception()
583 583 _ip.user_ns['a'] = A()
584 584 _ip.magic("whos")
585 585
586 586 def doctest_precision():
587 587 """doctest for %precision
588 588
589 589 In [1]: f = get_ipython().display_formatter.formatters['text/plain']
590 590
591 591 In [2]: %precision 5
592 592 Out[2]: '%.5f'
593 593
594 594 In [3]: f.float_format
595 595 Out[3]: '%.5f'
596 596
597 597 In [4]: %precision %e
598 598 Out[4]: '%e'
599 599
600 600 In [5]: f(3.1415927)
601 601 Out[5]: '3.141593e+00'
602 602 """
603 603
604 604 def test_debug_magic():
605 605 """Test debugging a small code with %debug
606 606
607 607 In [1]: with PdbTestInput(['c']):
608 608 ...: %debug print("a b") #doctest: +ELLIPSIS
609 609 ...:
610 610 ...
611 611 ipdb> c
612 612 a b
613 613 In [2]:
614 614 """
615 615
616 616 def test_psearch():
617 617 with tt.AssertPrints("dict.fromkeys"):
618 618 _ip.run_cell("dict.fr*?")
619 619 with tt.AssertPrints("Ο€.is_integer"):
620 620 _ip.run_cell("Ο€ = 3.14;\nΟ€.is_integ*?")
621 621
622 622 def test_timeit_shlex():
623 623 """test shlex issues with timeit (#1109)"""
624 624 _ip.ex("def f(*a,**kw): pass")
625 625 _ip.magic('timeit -n1 "this is a bug".count(" ")')
626 626 _ip.magic('timeit -r1 -n1 f(" ", 1)')
627 627 _ip.magic('timeit -r1 -n1 f(" ", 1, " ", 2, " ")')
628 628 _ip.magic('timeit -r1 -n1 ("a " + "b")')
629 629 _ip.magic('timeit -r1 -n1 f("a " + "b")')
630 630 _ip.magic('timeit -r1 -n1 f("a " + "b ")')
631 631
632 632
633 633 def test_timeit_special_syntax():
634 634 "Test %%timeit with IPython special syntax"
635 635 @register_line_magic
636 636 def lmagic(line):
637 637 ip = get_ipython()
638 638 ip.user_ns['lmagic_out'] = line
639 639
640 640 # line mode test
641 641 _ip.run_line_magic('timeit', '-n1 -r1 %lmagic my line')
642 642 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line')
643 643 # cell mode test
644 644 _ip.run_cell_magic('timeit', '-n1 -r1', '%lmagic my line2')
645 645 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line2')
646 646
647 647 def test_timeit_return():
648 648 """
649 649 test whether timeit -o return object
650 650 """
651 651
652 652 res = _ip.run_line_magic('timeit','-n10 -r10 -o 1')
653 653 assert(res is not None)
654 654
655 655 def test_timeit_quiet():
656 656 """
657 657 test quiet option of timeit magic
658 658 """
659 659 with tt.AssertNotPrints("loops"):
660 660 _ip.run_cell("%timeit -n1 -r1 -q 1")
661 661
662 662 def test_timeit_return_quiet():
663 663 with tt.AssertNotPrints("loops"):
664 664 res = _ip.run_line_magic('timeit', '-n1 -r1 -q -o 1')
665 665 assert (res is not None)
666 666
667 667 def test_timeit_invalid_return():
668 668 with nt.assert_raises_regex(SyntaxError, "outside function"):
669 669 _ip.run_line_magic('timeit', 'return')
670 670
671 671 @dec.skipif(execution.profile is None)
672 672 def test_prun_special_syntax():
673 673 "Test %%prun with IPython special syntax"
674 674 @register_line_magic
675 675 def lmagic(line):
676 676 ip = get_ipython()
677 677 ip.user_ns['lmagic_out'] = line
678 678
679 679 # line mode test
680 680 _ip.run_line_magic('prun', '-q %lmagic my line')
681 681 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line')
682 682 # cell mode test
683 683 _ip.run_cell_magic('prun', '-q', '%lmagic my line2')
684 684 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line2')
685 685
686 686 @dec.skipif(execution.profile is None)
687 687 def test_prun_quotes():
688 688 "Test that prun does not clobber string escapes (GH #1302)"
689 689 _ip.magic(r"prun -q x = '\t'")
690 690 nt.assert_equal(_ip.user_ns['x'], '\t')
691 691
692 692 def test_extension():
693 693 # Debugging information for failures of this test
694 694 print('sys.path:')
695 695 for p in sys.path:
696 696 print(' ', p)
697 697 print('CWD', os.getcwd())
698 698
699 699 nt.assert_raises(ImportError, _ip.magic, "load_ext daft_extension")
700 700 daft_path = os.path.join(os.path.dirname(__file__), "daft_extension")
701 701 sys.path.insert(0, daft_path)
702 702 try:
703 703 _ip.user_ns.pop('arq', None)
704 704 invalidate_caches() # Clear import caches
705 705 _ip.magic("load_ext daft_extension")
706 706 nt.assert_equal(_ip.user_ns['arq'], 185)
707 707 _ip.magic("unload_ext daft_extension")
708 708 assert 'arq' not in _ip.user_ns
709 709 finally:
710 710 sys.path.remove(daft_path)
711 711
712 712
713 713 def test_notebook_export_json():
714 714 _ip = get_ipython()
715 715 _ip.history_manager.reset() # Clear any existing history.
716 716 cmds = [u"a=1", u"def b():\n return a**2", u"print('noΓ«l, Γ©tΓ©', b())"]
717 717 for i, cmd in enumerate(cmds, start=1):
718 718 _ip.history_manager.store_inputs(i, cmd)
719 719 with TemporaryDirectory() as td:
720 720 outfile = os.path.join(td, "nb.ipynb")
721 721 _ip.magic("notebook -e %s" % outfile)
722 722
723 723
724 724 class TestEnv(TestCase):
725 725
726 726 def test_env(self):
727 727 env = _ip.magic("env")
728 728 self.assertTrue(isinstance(env, dict))
729 729
730 730 def test_env_secret(self):
731 731 env = _ip.magic("env")
732 732 hidden = "<hidden>"
733 733 with mock.patch.dict(
734 734 os.environ,
735 735 {
736 736 "API_KEY": "abc123",
737 737 "SECRET_THING": "ssshhh",
738 738 "JUPYTER_TOKEN": "",
739 739 "VAR": "abc"
740 740 }
741 741 ):
742 742 env = _ip.magic("env")
743 743 assert env["API_KEY"] == hidden
744 744 assert env["SECRET_THING"] == hidden
745 745 assert env["JUPYTER_TOKEN"] == hidden
746 746 assert env["VAR"] == "abc"
747 747
748 748 def test_env_get_set_simple(self):
749 749 env = _ip.magic("env var val1")
750 750 self.assertEqual(env, None)
751 751 self.assertEqual(os.environ['var'], 'val1')
752 752 self.assertEqual(_ip.magic("env var"), 'val1')
753 753 env = _ip.magic("env var=val2")
754 754 self.assertEqual(env, None)
755 755 self.assertEqual(os.environ['var'], 'val2')
756 756
757 757 def test_env_get_set_complex(self):
758 758 env = _ip.magic("env var 'val1 '' 'val2")
759 759 self.assertEqual(env, None)
760 760 self.assertEqual(os.environ['var'], "'val1 '' 'val2")
761 761 self.assertEqual(_ip.magic("env var"), "'val1 '' 'val2")
762 762 env = _ip.magic('env var=val2 val3="val4')
763 763 self.assertEqual(env, None)
764 764 self.assertEqual(os.environ['var'], 'val2 val3="val4')
765 765
766 766 def test_env_set_bad_input(self):
767 767 self.assertRaises(UsageError, lambda: _ip.magic("set_env var"))
768 768
769 769 def test_env_set_whitespace(self):
770 770 self.assertRaises(UsageError, lambda: _ip.magic("env var A=B"))
771 771
772 772
773 773 class CellMagicTestCase(TestCase):
774 774
775 775 def check_ident(self, magic):
776 776 # Manually called, we get the result
777 777 out = _ip.run_cell_magic(magic, 'a', 'b')
778 778 nt.assert_equal(out, ('a','b'))
779 779 # Via run_cell, it goes into the user's namespace via displayhook
780 780 _ip.run_cell('%%' + magic +' c\nd\n')
781 781 nt.assert_equal(_ip.user_ns['_'], ('c','d\n'))
782 782
783 783 def test_cell_magic_func_deco(self):
784 784 "Cell magic using simple decorator"
785 785 @register_cell_magic
786 786 def cellm(line, cell):
787 787 return line, cell
788 788
789 789 self.check_ident('cellm')
790 790
791 791 def test_cell_magic_reg(self):
792 792 "Cell magic manually registered"
793 793 def cellm(line, cell):
794 794 return line, cell
795 795
796 796 _ip.register_magic_function(cellm, 'cell', 'cellm2')
797 797 self.check_ident('cellm2')
798 798
799 799 def test_cell_magic_class(self):
800 800 "Cell magics declared via a class"
801 801 @magics_class
802 802 class MyMagics(Magics):
803 803
804 804 @cell_magic
805 805 def cellm3(self, line, cell):
806 806 return line, cell
807 807
808 808 _ip.register_magics(MyMagics)
809 809 self.check_ident('cellm3')
810 810
811 811 def test_cell_magic_class2(self):
812 812 "Cell magics declared via a class, #2"
813 813 @magics_class
814 814 class MyMagics2(Magics):
815 815
816 816 @cell_magic('cellm4')
817 817 def cellm33(self, line, cell):
818 818 return line, cell
819 819
820 820 _ip.register_magics(MyMagics2)
821 821 self.check_ident('cellm4')
822 822 # Check that nothing is registered as 'cellm33'
823 823 c33 = _ip.find_cell_magic('cellm33')
824 824 nt.assert_equal(c33, None)
825 825
826 826 def test_file():
827 827 """Basic %%writefile"""
828 828 ip = get_ipython()
829 829 with TemporaryDirectory() as td:
830 830 fname = os.path.join(td, 'file1')
831 831 ip.run_cell_magic("writefile", fname, u'\n'.join([
832 832 'line1',
833 833 'line2',
834 834 ]))
835 835 s = Path(fname).read_text()
836 836 nt.assert_in('line1\n', s)
837 837 nt.assert_in('line2', s)
838 838
839 839 @dec.skip_win32
840 840 def test_file_single_quote():
841 841 """Basic %%writefile with embedded single quotes"""
842 842 ip = get_ipython()
843 843 with TemporaryDirectory() as td:
844 844 fname = os.path.join(td, '\'file1\'')
845 845 ip.run_cell_magic("writefile", fname, u'\n'.join([
846 846 'line1',
847 847 'line2',
848 848 ]))
849 849 s = Path(fname).read_text()
850 850 nt.assert_in('line1\n', s)
851 851 nt.assert_in('line2', s)
852 852
853 853 @dec.skip_win32
854 854 def test_file_double_quote():
855 855 """Basic %%writefile with embedded double quotes"""
856 856 ip = get_ipython()
857 857 with TemporaryDirectory() as td:
858 858 fname = os.path.join(td, '"file1"')
859 859 ip.run_cell_magic("writefile", fname, u'\n'.join([
860 860 'line1',
861 861 'line2',
862 862 ]))
863 863 s = Path(fname).read_text()
864 864 nt.assert_in('line1\n', s)
865 865 nt.assert_in('line2', s)
866 866
867 867 def test_file_var_expand():
868 868 """%%writefile $filename"""
869 869 ip = get_ipython()
870 870 with TemporaryDirectory() as td:
871 871 fname = os.path.join(td, 'file1')
872 872 ip.user_ns['filename'] = fname
873 873 ip.run_cell_magic("writefile", '$filename', u'\n'.join([
874 874 'line1',
875 875 'line2',
876 876 ]))
877 877 s = Path(fname).read_text()
878 878 nt.assert_in('line1\n', s)
879 879 nt.assert_in('line2', s)
880 880
881 881 def test_file_unicode():
882 882 """%%writefile with unicode cell"""
883 883 ip = get_ipython()
884 884 with TemporaryDirectory() as td:
885 885 fname = os.path.join(td, 'file1')
886 886 ip.run_cell_magic("writefile", fname, u'\n'.join([
887 887 u'linΓ©1',
888 888 u'linΓ©2',
889 889 ]))
890 890 with io.open(fname, encoding='utf-8') as f:
891 891 s = f.read()
892 892 nt.assert_in(u'linΓ©1\n', s)
893 893 nt.assert_in(u'linΓ©2', s)
894 894
895 895 def test_file_amend():
896 896 """%%writefile -a amends files"""
897 897 ip = get_ipython()
898 898 with TemporaryDirectory() as td:
899 899 fname = os.path.join(td, 'file2')
900 900 ip.run_cell_magic("writefile", fname, u'\n'.join([
901 901 'line1',
902 902 'line2',
903 903 ]))
904 904 ip.run_cell_magic("writefile", "-a %s" % fname, u'\n'.join([
905 905 'line3',
906 906 'line4',
907 907 ]))
908 908 s = Path(fname).read_text()
909 909 nt.assert_in('line1\n', s)
910 910 nt.assert_in('line3\n', s)
911 911
912 912 def test_file_spaces():
913 913 """%%file with spaces in filename"""
914 914 ip = get_ipython()
915 915 with TemporaryWorkingDirectory() as td:
916 916 fname = "file name"
917 917 ip.run_cell_magic("file", '"%s"'%fname, u'\n'.join([
918 918 'line1',
919 919 'line2',
920 920 ]))
921 921 s = Path(fname).read_text()
922 922 nt.assert_in('line1\n', s)
923 923 nt.assert_in('line2', s)
924 924
925 925 def test_script_config():
926 926 ip = get_ipython()
927 927 ip.config.ScriptMagics.script_magics = ['whoda']
928 928 sm = script.ScriptMagics(shell=ip)
929 929 nt.assert_in('whoda', sm.magics['cell'])
930 930
931 931 @dec.skip_win32
932 932 def test_script_out():
933 933 ip = get_ipython()
934 934 ip.run_cell_magic("script", "--out output sh", "echo 'hi'")
935 935 nt.assert_equal(ip.user_ns['output'], 'hi\n')
936 936
937 937 @dec.skip_win32
938 938 def test_script_err():
939 939 ip = get_ipython()
940 940 ip.run_cell_magic("script", "--err error sh", "echo 'hello' >&2")
941 941 nt.assert_equal(ip.user_ns['error'], 'hello\n')
942 942
943 943 @dec.skip_win32
944 944 def test_script_out_err():
945 945 ip = get_ipython()
946 946 ip.run_cell_magic("script", "--out output --err error sh", "echo 'hi'\necho 'hello' >&2")
947 947 nt.assert_equal(ip.user_ns['output'], 'hi\n')
948 948 nt.assert_equal(ip.user_ns['error'], 'hello\n')
949 949
950 950 @dec.skip_win32
951 def test_script_bg_out():
951 async def test_script_bg_out():
952 952 ip = get_ipython()
953 953 ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'")
954 954
955 nt.assert_equal(ip.user_ns['output'].read(), b'hi\n')
955 nt.assert_equal((await ip.user_ns['output'].read()), b'hi\n')
956 956 ip.user_ns['output'].close()
957 957
958 958 @dec.skip_win32
959 def test_script_bg_err():
959 async def test_script_bg_err():
960 960 ip = get_ipython()
961 961 ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2")
962 nt.assert_equal(ip.user_ns['error'].read(), b'hello\n')
962 nt.assert_equal((await ip.user_ns['error'].read()), b'hello\n')
963 963 ip.user_ns['error'].close()
964 964
965 965 @dec.skip_win32
966 def test_script_bg_out_err():
966 async def test_script_bg_out_err():
967 967 ip = get_ipython()
968 968 ip.run_cell_magic("script", "--bg --out output --err error sh", "echo 'hi'\necho 'hello' >&2")
969 nt.assert_equal(ip.user_ns['output'].read(), b'hi\n')
970 nt.assert_equal(ip.user_ns['error'].read(), b'hello\n')
969 nt.assert_equal((await ip.user_ns['output'].read()), b'hi\n')
970 nt.assert_equal((await ip.user_ns['error'].read()), b'hello\n')
971 971 ip.user_ns['output'].close()
972 972 ip.user_ns['error'].close()
973 973
974 974 def test_script_defaults():
975 975 ip = get_ipython()
976 976 for cmd in ['sh', 'bash', 'perl', 'ruby']:
977 977 try:
978 978 find_cmd(cmd)
979 979 except Exception:
980 980 pass
981 981 else:
982 982 nt.assert_in(cmd, ip.magics_manager.magics['cell'])
983 983
984 984
985 985 @magics_class
986 986 class FooFoo(Magics):
987 987 """class with both %foo and %%foo magics"""
988 988 @line_magic('foo')
989 989 def line_foo(self, line):
990 990 "I am line foo"
991 991 pass
992 992
993 993 @cell_magic("foo")
994 994 def cell_foo(self, line, cell):
995 995 "I am cell foo, not line foo"
996 996 pass
997 997
998 998 def test_line_cell_info():
999 999 """%%foo and %foo magics are distinguishable to inspect"""
1000 1000 ip = get_ipython()
1001 1001 ip.magics_manager.register(FooFoo)
1002 1002 oinfo = ip.object_inspect('foo')
1003 1003 nt.assert_true(oinfo['found'])
1004 1004 nt.assert_true(oinfo['ismagic'])
1005 1005
1006 1006 oinfo = ip.object_inspect('%%foo')
1007 1007 nt.assert_true(oinfo['found'])
1008 1008 nt.assert_true(oinfo['ismagic'])
1009 1009 nt.assert_equal(oinfo['docstring'], FooFoo.cell_foo.__doc__)
1010 1010
1011 1011 oinfo = ip.object_inspect('%foo')
1012 1012 nt.assert_true(oinfo['found'])
1013 1013 nt.assert_true(oinfo['ismagic'])
1014 1014 nt.assert_equal(oinfo['docstring'], FooFoo.line_foo.__doc__)
1015 1015
1016 1016 def test_multiple_magics():
1017 1017 ip = get_ipython()
1018 1018 foo1 = FooFoo(ip)
1019 1019 foo2 = FooFoo(ip)
1020 1020 mm = ip.magics_manager
1021 1021 mm.register(foo1)
1022 1022 nt.assert_true(mm.magics['line']['foo'].__self__ is foo1)
1023 1023 mm.register(foo2)
1024 1024 nt.assert_true(mm.magics['line']['foo'].__self__ is foo2)
1025 1025
1026 1026 def test_alias_magic():
1027 1027 """Test %alias_magic."""
1028 1028 ip = get_ipython()
1029 1029 mm = ip.magics_manager
1030 1030
1031 1031 # Basic operation: both cell and line magics are created, if possible.
1032 1032 ip.run_line_magic('alias_magic', 'timeit_alias timeit')
1033 1033 nt.assert_in('timeit_alias', mm.magics['line'])
1034 1034 nt.assert_in('timeit_alias', mm.magics['cell'])
1035 1035
1036 1036 # --cell is specified, line magic not created.
1037 1037 ip.run_line_magic('alias_magic', '--cell timeit_cell_alias timeit')
1038 1038 nt.assert_not_in('timeit_cell_alias', mm.magics['line'])
1039 1039 nt.assert_in('timeit_cell_alias', mm.magics['cell'])
1040 1040
1041 1041 # Test that line alias is created successfully.
1042 1042 ip.run_line_magic('alias_magic', '--line env_alias env')
1043 1043 nt.assert_equal(ip.run_line_magic('env', ''),
1044 1044 ip.run_line_magic('env_alias', ''))
1045 1045
1046 1046 # Test that line alias with parameters passed in is created successfully.
1047 1047 ip.run_line_magic('alias_magic', '--line history_alias history --params ' + shlex.quote('3'))
1048 1048 nt.assert_in('history_alias', mm.magics['line'])
1049 1049
1050 1050
1051 1051 def test_save():
1052 1052 """Test %save."""
1053 1053 ip = get_ipython()
1054 1054 ip.history_manager.reset() # Clear any existing history.
1055 1055 cmds = [u"a=1", u"def b():\n return a**2", u"print(a, b())"]
1056 1056 for i, cmd in enumerate(cmds, start=1):
1057 1057 ip.history_manager.store_inputs(i, cmd)
1058 1058 with TemporaryDirectory() as tmpdir:
1059 1059 file = os.path.join(tmpdir, "testsave.py")
1060 1060 ip.run_line_magic("save", "%s 1-10" % file)
1061 1061 content = Path(file).read_text()
1062 1062 nt.assert_equal(content.count(cmds[0]), 1)
1063 1063 nt.assert_in("coding: utf-8", content)
1064 1064 ip.run_line_magic("save", "-a %s 1-10" % file)
1065 1065 content = Path(file).read_text()
1066 1066 nt.assert_equal(content.count(cmds[0]), 2)
1067 1067 nt.assert_in("coding: utf-8", content)
1068 1068
1069 1069
1070 1070 def test_store():
1071 1071 """Test %store."""
1072 1072 ip = get_ipython()
1073 1073 ip.run_line_magic('load_ext', 'storemagic')
1074 1074
1075 1075 # make sure the storage is empty
1076 1076 ip.run_line_magic('store', '-z')
1077 1077 ip.user_ns['var'] = 42
1078 1078 ip.run_line_magic('store', 'var')
1079 1079 ip.user_ns['var'] = 39
1080 1080 ip.run_line_magic('store', '-r')
1081 1081 nt.assert_equal(ip.user_ns['var'], 42)
1082 1082
1083 1083 ip.run_line_magic('store', '-d var')
1084 1084 ip.user_ns['var'] = 39
1085 1085 ip.run_line_magic('store' , '-r')
1086 1086 nt.assert_equal(ip.user_ns['var'], 39)
1087 1087
1088 1088
1089 1089 def _run_edit_test(arg_s, exp_filename=None,
1090 1090 exp_lineno=-1,
1091 1091 exp_contents=None,
1092 1092 exp_is_temp=None):
1093 1093 ip = get_ipython()
1094 1094 M = code.CodeMagics(ip)
1095 1095 last_call = ['','']
1096 1096 opts,args = M.parse_options(arg_s,'prxn:')
1097 1097 filename, lineno, is_temp = M._find_edit_target(ip, args, opts, last_call)
1098 1098
1099 1099 if exp_filename is not None:
1100 1100 nt.assert_equal(exp_filename, filename)
1101 1101 if exp_contents is not None:
1102 1102 with io.open(filename, 'r', encoding='utf-8') as f:
1103 1103 contents = f.read()
1104 1104 nt.assert_equal(exp_contents, contents)
1105 1105 if exp_lineno != -1:
1106 1106 nt.assert_equal(exp_lineno, lineno)
1107 1107 if exp_is_temp is not None:
1108 1108 nt.assert_equal(exp_is_temp, is_temp)
1109 1109
1110 1110
1111 1111 def test_edit_interactive():
1112 1112 """%edit on interactively defined objects"""
1113 1113 ip = get_ipython()
1114 1114 n = ip.execution_count
1115 1115 ip.run_cell(u"def foo(): return 1", store_history=True)
1116 1116
1117 1117 try:
1118 1118 _run_edit_test("foo")
1119 1119 except code.InteractivelyDefined as e:
1120 1120 nt.assert_equal(e.index, n)
1121 1121 else:
1122 1122 raise AssertionError("Should have raised InteractivelyDefined")
1123 1123
1124 1124
1125 1125 def test_edit_cell():
1126 1126 """%edit [cell id]"""
1127 1127 ip = get_ipython()
1128 1128
1129 1129 ip.run_cell(u"def foo(): return 1", store_history=True)
1130 1130
1131 1131 # test
1132 1132 _run_edit_test("1", exp_contents=ip.user_ns['In'][1], exp_is_temp=True)
1133 1133
1134 1134 def test_edit_fname():
1135 1135 """%edit file"""
1136 1136 # test
1137 1137 _run_edit_test("test file.py", exp_filename="test file.py")
1138 1138
1139 1139 def test_bookmark():
1140 1140 ip = get_ipython()
1141 1141 ip.run_line_magic('bookmark', 'bmname')
1142 1142 with tt.AssertPrints('bmname'):
1143 1143 ip.run_line_magic('bookmark', '-l')
1144 1144 ip.run_line_magic('bookmark', '-d bmname')
1145 1145
1146 1146 def test_ls_magic():
1147 1147 ip = get_ipython()
1148 1148 json_formatter = ip.display_formatter.formatters['application/json']
1149 1149 json_formatter.enabled = True
1150 1150 lsmagic = ip.magic('lsmagic')
1151 1151 with warnings.catch_warnings(record=True) as w:
1152 1152 j = json_formatter(lsmagic)
1153 1153 nt.assert_equal(sorted(j), ['cell', 'line'])
1154 1154 nt.assert_equal(w, []) # no warnings
1155 1155
1156 1156 def test_strip_initial_indent():
1157 1157 def sii(s):
1158 1158 lines = s.splitlines()
1159 1159 return '\n'.join(code.strip_initial_indent(lines))
1160 1160
1161 1161 nt.assert_equal(sii(" a = 1\nb = 2"), "a = 1\nb = 2")
1162 1162 nt.assert_equal(sii(" a\n b\nc"), "a\n b\nc")
1163 1163 nt.assert_equal(sii("a\n b"), "a\n b")
1164 1164
1165 1165 def test_logging_magic_quiet_from_arg():
1166 1166 _ip.config.LoggingMagics.quiet = False
1167 1167 lm = logging.LoggingMagics(shell=_ip)
1168 1168 with TemporaryDirectory() as td:
1169 1169 try:
1170 1170 with tt.AssertNotPrints(re.compile("Activating.*")):
1171 1171 lm.logstart('-q {}'.format(
1172 1172 os.path.join(td, "quiet_from_arg.log")))
1173 1173 finally:
1174 1174 _ip.logger.logstop()
1175 1175
1176 1176 def test_logging_magic_quiet_from_config():
1177 1177 _ip.config.LoggingMagics.quiet = True
1178 1178 lm = logging.LoggingMagics(shell=_ip)
1179 1179 with TemporaryDirectory() as td:
1180 1180 try:
1181 1181 with tt.AssertNotPrints(re.compile("Activating.*")):
1182 1182 lm.logstart(os.path.join(td, "quiet_from_config.log"))
1183 1183 finally:
1184 1184 _ip.logger.logstop()
1185 1185
1186 1186
1187 1187 def test_logging_magic_not_quiet():
1188 1188 _ip.config.LoggingMagics.quiet = False
1189 1189 lm = logging.LoggingMagics(shell=_ip)
1190 1190 with TemporaryDirectory() as td:
1191 1191 try:
1192 1192 with tt.AssertPrints(re.compile("Activating.*")):
1193 1193 lm.logstart(os.path.join(td, "not_quiet.log"))
1194 1194 finally:
1195 1195 _ip.logger.logstop()
1196 1196
1197 1197
1198 1198 def test_time_no_var_expand():
1199 1199 _ip.user_ns['a'] = 5
1200 1200 _ip.user_ns['b'] = []
1201 1201 _ip.magic('time b.append("{a}")')
1202 1202 assert _ip.user_ns['b'] == ['{a}']
1203 1203
1204 1204
1205 1205 # this is slow, put at the end for local testing.
1206 1206 def test_timeit_arguments():
1207 1207 "Test valid timeit arguments, should not cause SyntaxError (GH #1269)"
1208 1208 if sys.version_info < (3,7):
1209 1209 _ip.magic("timeit -n1 -r1 ('#')")
1210 1210 else:
1211 1211 # 3.7 optimize no-op statement like above out, and complain there is
1212 1212 # nothing in the for loop.
1213 1213 _ip.magic("timeit -n1 -r1 a=('#')")
1214 1214
1215 1215
1216 1216 TEST_MODULE = """
1217 1217 print('Loaded my_tmp')
1218 1218 if __name__ == "__main__":
1219 1219 print('I just ran a script')
1220 1220 """
1221 1221
1222 1222
1223 1223 def test_run_module_from_import_hook():
1224 1224 "Test that a module can be loaded via an import hook"
1225 1225 with TemporaryDirectory() as tmpdir:
1226 1226 fullpath = os.path.join(tmpdir, 'my_tmp.py')
1227 1227 Path(fullpath).write_text(TEST_MODULE)
1228 1228
1229 1229 class MyTempImporter(object):
1230 1230 def __init__(self):
1231 1231 pass
1232 1232
1233 1233 def find_module(self, fullname, path=None):
1234 1234 if 'my_tmp' in fullname:
1235 1235 return self
1236 1236 return None
1237 1237
1238 1238 def load_module(self, name):
1239 1239 import imp
1240 1240 return imp.load_source('my_tmp', fullpath)
1241 1241
1242 1242 def get_code(self, fullname):
1243 1243 return compile(Path(fullpath).read_text(), "foo", "exec")
1244 1244
1245 1245 def is_package(self, __):
1246 1246 return False
1247 1247
1248 1248 sys.meta_path.insert(0, MyTempImporter())
1249 1249
1250 1250 with capture_output() as captured:
1251 1251 _ip.magic("run -m my_tmp")
1252 1252 _ip.run_cell("import my_tmp")
1253 1253
1254 1254 output = "Loaded my_tmp\nI just ran a script\nLoaded my_tmp\n"
1255 1255 nt.assert_equal(output, captured.stdout)
1256 1256
1257 1257 sys.meta_path.pop(0)
General Comments 0
You need to be logged in to leave comments. Login now