##// END OF EJS Templates
avoid deprecated asyncio.get_event_loop...
Min RK -
Show More
@@ -1,92 +1,103 b''
1 1 """
2 2 Async helper function that are invalid syntax on Python 3.5 and below.
3 3
4 4 This code is best effort, and may have edge cases not behaving as expected. In
5 5 particular it contain a number of heuristics to detect whether code is
6 6 effectively async and need to run in an event loop or not.
7 7
8 8 Some constructs (like top-level `return`, or `yield`) are taken care of
9 9 explicitly to actually raise a SyntaxError and stay as close as possible to
10 10 Python semantics.
11 11 """
12 12
13 13
14 14 import ast
15 import asyncio
15 16 import inspect
16 17
17 18
18 19 class _AsyncIORunner:
20 def __init__(self):
21 self._loop = None
22
23 @property
24 def loop(self):
25 """Always returns a non-closed event loop"""
26 if self._loop is None or self._loop.is_closed():
27 policy = asyncio.get_event_loop_policy()
28 self._loop = policy.new_event_loop()
29 policy.set_event_loop(self._loop)
30 return self._loop
31
19 32 def __call__(self, coro):
20 33 """
21 34 Handler for asyncio autoawait
22 35 """
23 import asyncio
24
25 return asyncio.get_event_loop_policy().get_event_loop().run_until_complete(coro)
36 return self.loop.run_until_complete(coro)
26 37
27 38 def __str__(self):
28 39 return "asyncio"
29 40
30 41
31 42 _asyncio_runner = _AsyncIORunner()
32 43
33 44
34 45 def _curio_runner(coroutine):
35 46 """
36 47 handler for curio autoawait
37 48 """
38 49 import curio
39 50
40 51 return curio.run(coroutine)
41 52
42 53
43 54 def _trio_runner(async_fn):
44 55 import trio
45 56
46 57 async def loc(coro):
47 58 """
48 59 We need the dummy no-op async def to protect from
49 60 trio's internal. See https://github.com/python-trio/trio/issues/89
50 61 """
51 62 return await coro
52 63
53 64 return trio.run(loc, async_fn)
54 65
55 66
56 67 def _pseudo_sync_runner(coro):
57 68 """
58 69 A runner that does not really allow async execution, and just advance the coroutine.
59 70
60 71 See discussion in https://github.com/python-trio/trio/issues/608,
61 72
62 73 Credit to Nathaniel Smith
63 74 """
64 75 try:
65 76 coro.send(None)
66 77 except StopIteration as exc:
67 78 return exc.value
68 79 else:
69 80 # TODO: do not raise but return an execution result with the right info.
70 81 raise RuntimeError(
71 82 "{coro_name!r} needs a real async loop".format(coro_name=coro.__name__)
72 83 )
73 84
74 85
75 86 def _should_be_async(cell: str) -> bool:
76 87 """Detect if a block of code need to be wrapped in an `async def`
77 88
78 89 Attempt to parse the block of code, it it compile we're fine.
79 90 Otherwise we wrap if and try to compile.
80 91
81 92 If it works, assume it should be async. Otherwise Return False.
82 93
83 94 Not handled yet: If the block of code has a return statement as the top
84 95 level, it will be seen as async. This is a know limitation.
85 96 """
86 97 try:
87 98 code = compile(
88 99 cell, "<>", "exec", flags=getattr(ast, "PyCF_ALLOW_TOP_LEVEL_AWAIT", 0x0)
89 100 )
90 101 return inspect.CO_COROUTINE & code.co_flags == inspect.CO_COROUTINE
91 102 except (SyntaxError, MemoryError):
92 103 return False
@@ -1,1082 +1,1098 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for the key interactiveshell module.
3 3
4 4 Historically the main classes in interactiveshell have been under-tested. This
5 5 module should grow as many single-method tests as possible to trap many of the
6 6 recurring bugs we seem to encounter with high-level interaction.
7 7 """
8 8
9 9 # Copyright (c) IPython Development Team.
10 10 # Distributed under the terms of the Modified BSD License.
11 11
12 12 import asyncio
13 13 import ast
14 14 import os
15 15 import signal
16 16 import shutil
17 17 import sys
18 18 import tempfile
19 19 import unittest
20 20 from unittest import mock
21 21
22 22 from os.path import join
23 23
24 24 from IPython.core.error import InputRejected
25 25 from IPython.core.inputtransformer import InputTransformer
26 26 from IPython.core import interactiveshell
27 27 from IPython.testing.decorators import (
28 28 skipif, skip_win32, onlyif_unicode_paths, onlyif_cmds_exist,
29 29 )
30 30 from IPython.testing import tools as tt
31 31 from IPython.utils.process import find_cmd
32 32
33 33 #-----------------------------------------------------------------------------
34 34 # Globals
35 35 #-----------------------------------------------------------------------------
36 36 # This is used by every single test, no point repeating it ad nauseam
37 37
38 38 #-----------------------------------------------------------------------------
39 39 # Tests
40 40 #-----------------------------------------------------------------------------
41 41
42 42 class DerivedInterrupt(KeyboardInterrupt):
43 43 pass
44 44
45 45 class InteractiveShellTestCase(unittest.TestCase):
46 46 def test_naked_string_cells(self):
47 47 """Test that cells with only naked strings are fully executed"""
48 48 # First, single-line inputs
49 49 ip.run_cell('"a"\n')
50 50 self.assertEqual(ip.user_ns['_'], 'a')
51 51 # And also multi-line cells
52 52 ip.run_cell('"""a\nb"""\n')
53 53 self.assertEqual(ip.user_ns['_'], 'a\nb')
54 54
55 55 def test_run_empty_cell(self):
56 56 """Just make sure we don't get a horrible error with a blank
57 57 cell of input. Yes, I did overlook that."""
58 58 old_xc = ip.execution_count
59 59 res = ip.run_cell('')
60 60 self.assertEqual(ip.execution_count, old_xc)
61 61 self.assertEqual(res.execution_count, None)
62 62
63 63 def test_run_cell_multiline(self):
64 64 """Multi-block, multi-line cells must execute correctly.
65 65 """
66 66 src = '\n'.join(["x=1",
67 67 "y=2",
68 68 "if 1:",
69 69 " x += 1",
70 70 " y += 1",])
71 71 res = ip.run_cell(src)
72 72 self.assertEqual(ip.user_ns['x'], 2)
73 73 self.assertEqual(ip.user_ns['y'], 3)
74 74 self.assertEqual(res.success, True)
75 75 self.assertEqual(res.result, None)
76 76
77 77 def test_multiline_string_cells(self):
78 78 "Code sprinkled with multiline strings should execute (GH-306)"
79 79 ip.run_cell('tmp=0')
80 80 self.assertEqual(ip.user_ns['tmp'], 0)
81 81 res = ip.run_cell('tmp=1;"""a\nb"""\n')
82 82 self.assertEqual(ip.user_ns['tmp'], 1)
83 83 self.assertEqual(res.success, True)
84 84 self.assertEqual(res.result, "a\nb")
85 85
86 86 def test_dont_cache_with_semicolon(self):
87 87 "Ending a line with semicolon should not cache the returned object (GH-307)"
88 88 oldlen = len(ip.user_ns['Out'])
89 89 for cell in ['1;', '1;1;']:
90 90 res = ip.run_cell(cell, store_history=True)
91 91 newlen = len(ip.user_ns['Out'])
92 92 self.assertEqual(oldlen, newlen)
93 93 self.assertIsNone(res.result)
94 94 i = 0
95 95 #also test the default caching behavior
96 96 for cell in ['1', '1;1']:
97 97 ip.run_cell(cell, store_history=True)
98 98 newlen = len(ip.user_ns['Out'])
99 99 i += 1
100 100 self.assertEqual(oldlen+i, newlen)
101 101
102 102 def test_syntax_error(self):
103 103 res = ip.run_cell("raise = 3")
104 104 self.assertIsInstance(res.error_before_exec, SyntaxError)
105 105
106 106 def test_In_variable(self):
107 107 "Verify that In variable grows with user input (GH-284)"
108 108 oldlen = len(ip.user_ns['In'])
109 109 ip.run_cell('1;', store_history=True)
110 110 newlen = len(ip.user_ns['In'])
111 111 self.assertEqual(oldlen+1, newlen)
112 112 self.assertEqual(ip.user_ns['In'][-1],'1;')
113 113
114 114 def test_magic_names_in_string(self):
115 115 ip.run_cell('a = """\n%exit\n"""')
116 116 self.assertEqual(ip.user_ns['a'], '\n%exit\n')
117 117
118 118 def test_trailing_newline(self):
119 119 """test that running !(command) does not raise a SyntaxError"""
120 120 ip.run_cell('!(true)\n', False)
121 121 ip.run_cell('!(true)\n\n\n', False)
122 122
123 123 def test_gh_597(self):
124 124 """Pretty-printing lists of objects with non-ascii reprs may cause
125 125 problems."""
126 126 class Spam(object):
127 127 def __repr__(self):
128 128 return "\xe9"*50
129 129 import IPython.core.formatters
130 130 f = IPython.core.formatters.PlainTextFormatter()
131 131 f([Spam(),Spam()])
132 132
133 133
134 134 def test_future_flags(self):
135 135 """Check that future flags are used for parsing code (gh-777)"""
136 136 ip.run_cell('from __future__ import barry_as_FLUFL')
137 137 try:
138 138 ip.run_cell('prfunc_return_val = 1 <> 2')
139 139 assert 'prfunc_return_val' in ip.user_ns
140 140 finally:
141 141 # Reset compiler flags so we don't mess up other tests.
142 142 ip.compile.reset_compiler_flags()
143 143
144 144 def test_can_pickle(self):
145 145 "Can we pickle objects defined interactively (GH-29)"
146 146 ip = get_ipython()
147 147 ip.reset()
148 148 ip.run_cell(("class Mylist(list):\n"
149 149 " def __init__(self,x=[]):\n"
150 150 " list.__init__(self,x)"))
151 151 ip.run_cell("w=Mylist([1,2,3])")
152 152
153 153 from pickle import dumps
154 154
155 155 # We need to swap in our main module - this is only necessary
156 156 # inside the test framework, because IPython puts the interactive module
157 157 # in place (but the test framework undoes this).
158 158 _main = sys.modules['__main__']
159 159 sys.modules['__main__'] = ip.user_module
160 160 try:
161 161 res = dumps(ip.user_ns["w"])
162 162 finally:
163 163 sys.modules['__main__'] = _main
164 164 self.assertTrue(isinstance(res, bytes))
165 165
166 166 def test_global_ns(self):
167 167 "Code in functions must be able to access variables outside them."
168 168 ip = get_ipython()
169 169 ip.run_cell("a = 10")
170 170 ip.run_cell(("def f(x):\n"
171 171 " return x + a"))
172 172 ip.run_cell("b = f(12)")
173 173 self.assertEqual(ip.user_ns["b"], 22)
174 174
175 175 def test_bad_custom_tb(self):
176 176 """Check that InteractiveShell is protected from bad custom exception handlers"""
177 177 ip.set_custom_exc((IOError,), lambda etype,value,tb: 1/0)
178 178 self.assertEqual(ip.custom_exceptions, (IOError,))
179 179 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
180 180 ip.run_cell(u'raise IOError("foo")')
181 181 self.assertEqual(ip.custom_exceptions, ())
182 182
183 183 def test_bad_custom_tb_return(self):
184 184 """Check that InteractiveShell is protected from bad return types in custom exception handlers"""
185 185 ip.set_custom_exc((NameError,),lambda etype,value,tb, tb_offset=None: 1)
186 186 self.assertEqual(ip.custom_exceptions, (NameError,))
187 187 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
188 188 ip.run_cell(u'a=abracadabra')
189 189 self.assertEqual(ip.custom_exceptions, ())
190 190
191 191 def test_drop_by_id(self):
192 192 myvars = {"a":object(), "b":object(), "c": object()}
193 193 ip.push(myvars, interactive=False)
194 194 for name in myvars:
195 195 assert name in ip.user_ns, name
196 196 assert name in ip.user_ns_hidden, name
197 197 ip.user_ns['b'] = 12
198 198 ip.drop_by_id(myvars)
199 199 for name in ["a", "c"]:
200 200 assert name not in ip.user_ns, name
201 201 assert name not in ip.user_ns_hidden, name
202 202 assert ip.user_ns['b'] == 12
203 203 ip.reset()
204 204
205 205 def test_var_expand(self):
206 206 ip.user_ns['f'] = u'Ca\xf1o'
207 207 self.assertEqual(ip.var_expand(u'echo $f'), u'echo Ca\xf1o')
208 208 self.assertEqual(ip.var_expand(u'echo {f}'), u'echo Ca\xf1o')
209 209 self.assertEqual(ip.var_expand(u'echo {f[:-1]}'), u'echo Ca\xf1')
210 210 self.assertEqual(ip.var_expand(u'echo {1*2}'), u'echo 2')
211 211
212 212 self.assertEqual(ip.var_expand(u"grep x | awk '{print $1}'"), u"grep x | awk '{print $1}'")
213 213
214 214 ip.user_ns['f'] = b'Ca\xc3\xb1o'
215 215 # This should not raise any exception:
216 216 ip.var_expand(u'echo $f')
217 217
218 218 def test_var_expand_local(self):
219 219 """Test local variable expansion in !system and %magic calls"""
220 220 # !system
221 221 ip.run_cell(
222 222 "def test():\n"
223 223 ' lvar = "ttt"\n'
224 224 " ret = !echo {lvar}\n"
225 225 " return ret[0]\n"
226 226 )
227 227 res = ip.user_ns["test"]()
228 228 self.assertIn("ttt", res)
229 229
230 230 # %magic
231 231 ip.run_cell(
232 232 "def makemacro():\n"
233 233 ' macroname = "macro_var_expand_locals"\n'
234 234 " %macro {macroname} codestr\n"
235 235 )
236 236 ip.user_ns["codestr"] = "str(12)"
237 237 ip.run_cell("makemacro()")
238 238 self.assertIn("macro_var_expand_locals", ip.user_ns)
239 239
240 240 def test_var_expand_self(self):
241 241 """Test variable expansion with the name 'self', which was failing.
242 242
243 243 See https://github.com/ipython/ipython/issues/1878#issuecomment-7698218
244 244 """
245 245 ip.run_cell(
246 246 "class cTest:\n"
247 247 ' classvar="see me"\n'
248 248 " def test(self):\n"
249 249 " res = !echo Variable: {self.classvar}\n"
250 250 " return res[0]\n"
251 251 )
252 252 self.assertIn("see me", ip.user_ns["cTest"]().test())
253 253
254 254 def test_bad_var_expand(self):
255 255 """var_expand on invalid formats shouldn't raise"""
256 256 # SyntaxError
257 257 self.assertEqual(ip.var_expand(u"{'a':5}"), u"{'a':5}")
258 258 # NameError
259 259 self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}")
260 260 # ZeroDivisionError
261 261 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
262 262
263 263 def test_silent_postexec(self):
264 264 """run_cell(silent=True) doesn't invoke pre/post_run_cell callbacks"""
265 265 pre_explicit = mock.Mock()
266 266 pre_always = mock.Mock()
267 267 post_explicit = mock.Mock()
268 268 post_always = mock.Mock()
269 269 all_mocks = [pre_explicit, pre_always, post_explicit, post_always]
270 270
271 271 ip.events.register('pre_run_cell', pre_explicit)
272 272 ip.events.register('pre_execute', pre_always)
273 273 ip.events.register('post_run_cell', post_explicit)
274 274 ip.events.register('post_execute', post_always)
275 275
276 276 try:
277 277 ip.run_cell("1", silent=True)
278 278 assert pre_always.called
279 279 assert not pre_explicit.called
280 280 assert post_always.called
281 281 assert not post_explicit.called
282 282 # double-check that non-silent exec did what we expected
283 283 # silent to avoid
284 284 ip.run_cell("1")
285 285 assert pre_explicit.called
286 286 assert post_explicit.called
287 287 info, = pre_explicit.call_args[0]
288 288 result, = post_explicit.call_args[0]
289 289 self.assertEqual(info, result.info)
290 290 # check that post hooks are always called
291 291 [m.reset_mock() for m in all_mocks]
292 292 ip.run_cell("syntax error")
293 293 assert pre_always.called
294 294 assert pre_explicit.called
295 295 assert post_always.called
296 296 assert post_explicit.called
297 297 info, = pre_explicit.call_args[0]
298 298 result, = post_explicit.call_args[0]
299 299 self.assertEqual(info, result.info)
300 300 finally:
301 301 # remove post-exec
302 302 ip.events.unregister('pre_run_cell', pre_explicit)
303 303 ip.events.unregister('pre_execute', pre_always)
304 304 ip.events.unregister('post_run_cell', post_explicit)
305 305 ip.events.unregister('post_execute', post_always)
306 306
307 307 def test_silent_noadvance(self):
308 308 """run_cell(silent=True) doesn't advance execution_count"""
309 309 ec = ip.execution_count
310 310 # silent should force store_history=False
311 311 ip.run_cell("1", store_history=True, silent=True)
312 312
313 313 self.assertEqual(ec, ip.execution_count)
314 314 # double-check that non-silent exec did what we expected
315 315 # silent to avoid
316 316 ip.run_cell("1", store_history=True)
317 317 self.assertEqual(ec+1, ip.execution_count)
318 318
319 319 def test_silent_nodisplayhook(self):
320 320 """run_cell(silent=True) doesn't trigger displayhook"""
321 321 d = dict(called=False)
322 322
323 323 trap = ip.display_trap
324 324 save_hook = trap.hook
325 325
326 326 def failing_hook(*args, **kwargs):
327 327 d['called'] = True
328 328
329 329 try:
330 330 trap.hook = failing_hook
331 331 res = ip.run_cell("1", silent=True)
332 332 self.assertFalse(d['called'])
333 333 self.assertIsNone(res.result)
334 334 # double-check that non-silent exec did what we expected
335 335 # silent to avoid
336 336 ip.run_cell("1")
337 337 self.assertTrue(d['called'])
338 338 finally:
339 339 trap.hook = save_hook
340 340
341 341 def test_ofind_line_magic(self):
342 342 from IPython.core.magic import register_line_magic
343 343
344 344 @register_line_magic
345 345 def lmagic(line):
346 346 "A line magic"
347 347
348 348 # Get info on line magic
349 349 lfind = ip._ofind("lmagic")
350 350 info = dict(
351 351 found=True,
352 352 isalias=False,
353 353 ismagic=True,
354 354 namespace="IPython internal",
355 355 obj=lmagic,
356 356 parent=None,
357 357 )
358 358 self.assertEqual(lfind, info)
359 359
360 360 def test_ofind_cell_magic(self):
361 361 from IPython.core.magic import register_cell_magic
362 362
363 363 @register_cell_magic
364 364 def cmagic(line, cell):
365 365 "A cell magic"
366 366
367 367 # Get info on cell magic
368 368 find = ip._ofind("cmagic")
369 369 info = dict(
370 370 found=True,
371 371 isalias=False,
372 372 ismagic=True,
373 373 namespace="IPython internal",
374 374 obj=cmagic,
375 375 parent=None,
376 376 )
377 377 self.assertEqual(find, info)
378 378
379 379 def test_ofind_property_with_error(self):
380 380 class A(object):
381 381 @property
382 382 def foo(self):
383 383 raise NotImplementedError()
384 384 a = A()
385 385
386 386 found = ip._ofind('a.foo', [('locals', locals())])
387 387 info = dict(found=True, isalias=False, ismagic=False,
388 388 namespace='locals', obj=A.foo, parent=a)
389 389 self.assertEqual(found, info)
390 390
391 391 def test_ofind_multiple_attribute_lookups(self):
392 392 class A(object):
393 393 @property
394 394 def foo(self):
395 395 raise NotImplementedError()
396 396
397 397 a = A()
398 398 a.a = A()
399 399 a.a.a = A()
400 400
401 401 found = ip._ofind('a.a.a.foo', [('locals', locals())])
402 402 info = dict(found=True, isalias=False, ismagic=False,
403 403 namespace='locals', obj=A.foo, parent=a.a.a)
404 404 self.assertEqual(found, info)
405 405
406 406 def test_ofind_slotted_attributes(self):
407 407 class A(object):
408 408 __slots__ = ['foo']
409 409 def __init__(self):
410 410 self.foo = 'bar'
411 411
412 412 a = A()
413 413 found = ip._ofind('a.foo', [('locals', locals())])
414 414 info = dict(found=True, isalias=False, ismagic=False,
415 415 namespace='locals', obj=a.foo, parent=a)
416 416 self.assertEqual(found, info)
417 417
418 418 found = ip._ofind('a.bar', [('locals', locals())])
419 419 info = dict(found=False, isalias=False, ismagic=False,
420 420 namespace=None, obj=None, parent=a)
421 421 self.assertEqual(found, info)
422 422
423 423 def test_ofind_prefers_property_to_instance_level_attribute(self):
424 424 class A(object):
425 425 @property
426 426 def foo(self):
427 427 return 'bar'
428 428 a = A()
429 429 a.__dict__["foo"] = "baz"
430 430 self.assertEqual(a.foo, "bar")
431 431 found = ip._ofind("a.foo", [("locals", locals())])
432 432 self.assertIs(found["obj"], A.foo)
433 433
434 434 def test_custom_syntaxerror_exception(self):
435 435 called = []
436 436 def my_handler(shell, etype, value, tb, tb_offset=None):
437 437 called.append(etype)
438 438 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
439 439
440 440 ip.set_custom_exc((SyntaxError,), my_handler)
441 441 try:
442 442 ip.run_cell("1f")
443 443 # Check that this was called, and only once.
444 444 self.assertEqual(called, [SyntaxError])
445 445 finally:
446 446 # Reset the custom exception hook
447 447 ip.set_custom_exc((), None)
448 448
449 449 def test_custom_exception(self):
450 450 called = []
451 451 def my_handler(shell, etype, value, tb, tb_offset=None):
452 452 called.append(etype)
453 453 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
454 454
455 455 ip.set_custom_exc((ValueError,), my_handler)
456 456 try:
457 457 res = ip.run_cell("raise ValueError('test')")
458 458 # Check that this was called, and only once.
459 459 self.assertEqual(called, [ValueError])
460 460 # Check that the error is on the result object
461 461 self.assertIsInstance(res.error_in_exec, ValueError)
462 462 finally:
463 463 # Reset the custom exception hook
464 464 ip.set_custom_exc((), None)
465 465
466 466 @mock.patch("builtins.print")
467 467 def test_showtraceback_with_surrogates(self, mocked_print):
468 468 values = []
469 469
470 470 def mock_print_func(value, sep=" ", end="\n", file=sys.stdout, flush=False):
471 471 values.append(value)
472 472 if value == chr(0xD8FF):
473 473 raise UnicodeEncodeError("utf-8", chr(0xD8FF), 0, 1, "")
474 474
475 475 # mock builtins.print
476 476 mocked_print.side_effect = mock_print_func
477 477
478 478 # ip._showtraceback() is replaced in globalipapp.py.
479 479 # Call original method to test.
480 480 interactiveshell.InteractiveShell._showtraceback(ip, None, None, chr(0xD8FF))
481 481
482 482 self.assertEqual(mocked_print.call_count, 2)
483 483 self.assertEqual(values, [chr(0xD8FF), "\\ud8ff"])
484 484
485 485 def test_mktempfile(self):
486 486 filename = ip.mktempfile()
487 487 # Check that we can open the file again on Windows
488 488 with open(filename, 'w') as f:
489 489 f.write('abc')
490 490
491 491 filename = ip.mktempfile(data='blah')
492 492 with open(filename, 'r') as f:
493 493 self.assertEqual(f.read(), 'blah')
494 494
495 495 def test_new_main_mod(self):
496 496 # Smoketest to check that this accepts a unicode module name
497 497 name = u'jiefmw'
498 498 mod = ip.new_main_mod(u'%s.py' % name, name)
499 499 self.assertEqual(mod.__name__, name)
500 500
501 501 def test_get_exception_only(self):
502 502 try:
503 503 raise KeyboardInterrupt
504 504 except KeyboardInterrupt:
505 505 msg = ip.get_exception_only()
506 506 self.assertEqual(msg, 'KeyboardInterrupt\n')
507 507
508 508 try:
509 509 raise DerivedInterrupt("foo")
510 510 except KeyboardInterrupt:
511 511 msg = ip.get_exception_only()
512 512 self.assertEqual(msg, 'IPython.core.tests.test_interactiveshell.DerivedInterrupt: foo\n')
513 513
514 514 def test_inspect_text(self):
515 515 ip.run_cell('a = 5')
516 516 text = ip.object_inspect_text('a')
517 517 self.assertIsInstance(text, str)
518 518
519 519 def test_last_execution_result(self):
520 520 """ Check that last execution result gets set correctly (GH-10702) """
521 521 result = ip.run_cell('a = 5; a')
522 522 self.assertTrue(ip.last_execution_succeeded)
523 523 self.assertEqual(ip.last_execution_result.result, 5)
524 524
525 525 result = ip.run_cell('a = x_invalid_id_x')
526 526 self.assertFalse(ip.last_execution_succeeded)
527 527 self.assertFalse(ip.last_execution_result.success)
528 528 self.assertIsInstance(ip.last_execution_result.error_in_exec, NameError)
529 529
530 530 def test_reset_aliasing(self):
531 531 """ Check that standard posix aliases work after %reset. """
532 532 if os.name != 'posix':
533 533 return
534 534
535 535 ip.reset()
536 536 for cmd in ('clear', 'more', 'less', 'man'):
537 537 res = ip.run_cell('%' + cmd)
538 538 self.assertEqual(res.success, True)
539 539
540 540
541 541 class TestSafeExecfileNonAsciiPath(unittest.TestCase):
542 542
543 543 @onlyif_unicode_paths
544 544 def setUp(self):
545 545 self.BASETESTDIR = tempfile.mkdtemp()
546 546 self.TESTDIR = join(self.BASETESTDIR, u"Γ₯Àâ")
547 547 os.mkdir(self.TESTDIR)
548 548 with open(join(self.TESTDIR, u"Γ₯Àâtestscript.py"), "w") as sfile:
549 549 sfile.write("pass\n")
550 550 self.oldpath = os.getcwd()
551 551 os.chdir(self.TESTDIR)
552 552 self.fname = u"Γ₯Àâtestscript.py"
553 553
554 554 def tearDown(self):
555 555 os.chdir(self.oldpath)
556 556 shutil.rmtree(self.BASETESTDIR)
557 557
558 558 @onlyif_unicode_paths
559 559 def test_1(self):
560 560 """Test safe_execfile with non-ascii path
561 561 """
562 562 ip.safe_execfile(self.fname, {}, raise_exceptions=True)
563 563
564 564 class ExitCodeChecks(tt.TempFileMixin):
565 565
566 566 def setUp(self):
567 567 self.system = ip.system_raw
568 568
569 569 def test_exit_code_ok(self):
570 570 self.system('exit 0')
571 571 self.assertEqual(ip.user_ns['_exit_code'], 0)
572 572
573 573 def test_exit_code_error(self):
574 574 self.system('exit 1')
575 575 self.assertEqual(ip.user_ns['_exit_code'], 1)
576 576
577 577 @skipif(not hasattr(signal, 'SIGALRM'))
578 578 def test_exit_code_signal(self):
579 579 self.mktmp("import signal, time\n"
580 580 "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
581 581 "time.sleep(1)\n")
582 582 self.system("%s %s" % (sys.executable, self.fname))
583 583 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
584 584
585 585 @onlyif_cmds_exist("csh")
586 586 def test_exit_code_signal_csh(self):
587 587 SHELL = os.environ.get('SHELL', None)
588 588 os.environ['SHELL'] = find_cmd("csh")
589 589 try:
590 590 self.test_exit_code_signal()
591 591 finally:
592 592 if SHELL is not None:
593 593 os.environ['SHELL'] = SHELL
594 594 else:
595 595 del os.environ['SHELL']
596 596
597 597
598 598 class TestSystemRaw(ExitCodeChecks):
599 599
600 600 def setUp(self):
601 601 super().setUp()
602 602 self.system = ip.system_raw
603 603
604 604 @onlyif_unicode_paths
605 605 def test_1(self):
606 606 """Test system_raw with non-ascii cmd
607 607 """
608 608 cmd = u'''python -c "'Γ₯Àâ'" '''
609 609 ip.system_raw(cmd)
610 610
611 611 @mock.patch('subprocess.call', side_effect=KeyboardInterrupt)
612 612 @mock.patch('os.system', side_effect=KeyboardInterrupt)
613 613 def test_control_c(self, *mocks):
614 614 try:
615 615 self.system("sleep 1 # wont happen")
616 616 except KeyboardInterrupt:
617 617 self.fail(
618 618 "system call should intercept "
619 619 "keyboard interrupt from subprocess.call"
620 620 )
621 621 self.assertEqual(ip.user_ns["_exit_code"], -signal.SIGINT)
622 622
623 623 def test_magic_warnings(self):
624 624 for magic_cmd in ("ls", "pip", "conda", "cd"):
625 625 with self.assertWarnsRegex(Warning, "You executed the system command"):
626 626 ip.system_raw(magic_cmd)
627 627
628 628 # TODO: Exit codes are currently ignored on Windows.
629 629 class TestSystemPipedExitCode(ExitCodeChecks):
630 630
631 631 def setUp(self):
632 632 super().setUp()
633 633 self.system = ip.system_piped
634 634
635 635 @skip_win32
636 636 def test_exit_code_ok(self):
637 637 ExitCodeChecks.test_exit_code_ok(self)
638 638
639 639 @skip_win32
640 640 def test_exit_code_error(self):
641 641 ExitCodeChecks.test_exit_code_error(self)
642 642
643 643 @skip_win32
644 644 def test_exit_code_signal(self):
645 645 ExitCodeChecks.test_exit_code_signal(self)
646 646
647 647 class TestModules(tt.TempFileMixin):
648 648 def test_extraneous_loads(self):
649 649 """Test we're not loading modules on startup that we shouldn't.
650 650 """
651 651 self.mktmp("import sys\n"
652 652 "print('numpy' in sys.modules)\n"
653 653 "print('ipyparallel' in sys.modules)\n"
654 654 "print('ipykernel' in sys.modules)\n"
655 655 )
656 656 out = "False\nFalse\nFalse\n"
657 657 tt.ipexec_validate(self.fname, out)
658 658
659 659 class Negator(ast.NodeTransformer):
660 660 """Negates all number literals in an AST."""
661 661
662 662 # for python 3.7 and earlier
663 663 def visit_Num(self, node):
664 664 node.n = -node.n
665 665 return node
666 666
667 667 # for python 3.8+
668 668 def visit_Constant(self, node):
669 669 if isinstance(node.value, int):
670 670 return self.visit_Num(node)
671 671 return node
672 672
673 673 class TestAstTransform(unittest.TestCase):
674 674 def setUp(self):
675 675 self.negator = Negator()
676 676 ip.ast_transformers.append(self.negator)
677 677
678 678 def tearDown(self):
679 679 ip.ast_transformers.remove(self.negator)
680 680
681 681 def test_run_cell(self):
682 682 with tt.AssertPrints('-34'):
683 683 ip.run_cell('print (12 + 22)')
684 684
685 685 # A named reference to a number shouldn't be transformed.
686 686 ip.user_ns['n'] = 55
687 687 with tt.AssertNotPrints('-55'):
688 688 ip.run_cell('print (n)')
689 689
690 690 def test_timeit(self):
691 691 called = set()
692 692 def f(x):
693 693 called.add(x)
694 694 ip.push({'f':f})
695 695
696 696 with tt.AssertPrints("std. dev. of"):
697 697 ip.run_line_magic("timeit", "-n1 f(1)")
698 698 self.assertEqual(called, {-1})
699 699 called.clear()
700 700
701 701 with tt.AssertPrints("std. dev. of"):
702 702 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
703 703 self.assertEqual(called, {-2, -3})
704 704
705 705 def test_time(self):
706 706 called = []
707 707 def f(x):
708 708 called.append(x)
709 709 ip.push({'f':f})
710 710
711 711 # Test with an expression
712 712 with tt.AssertPrints("Wall time: "):
713 713 ip.run_line_magic("time", "f(5+9)")
714 714 self.assertEqual(called, [-14])
715 715 called[:] = []
716 716
717 717 # Test with a statement (different code path)
718 718 with tt.AssertPrints("Wall time: "):
719 719 ip.run_line_magic("time", "a = f(-3 + -2)")
720 720 self.assertEqual(called, [5])
721 721
722 722 def test_macro(self):
723 723 ip.push({'a':10})
724 724 # The AST transformation makes this do a+=-1
725 725 ip.define_macro("amacro", "a+=1\nprint(a)")
726 726
727 727 with tt.AssertPrints("9"):
728 728 ip.run_cell("amacro")
729 729 with tt.AssertPrints("8"):
730 730 ip.run_cell("amacro")
731 731
732 732 class TestMiscTransform(unittest.TestCase):
733 733
734 734
735 735 def test_transform_only_once(self):
736 736 cleanup = 0
737 737 line_t = 0
738 738 def count_cleanup(lines):
739 739 nonlocal cleanup
740 740 cleanup += 1
741 741 return lines
742 742
743 743 def count_line_t(lines):
744 744 nonlocal line_t
745 745 line_t += 1
746 746 return lines
747 747
748 748 ip.input_transformer_manager.cleanup_transforms.append(count_cleanup)
749 749 ip.input_transformer_manager.line_transforms.append(count_line_t)
750 750
751 751 ip.run_cell('1')
752 752
753 753 assert cleanup == 1
754 754 assert line_t == 1
755 755
756 756 class IntegerWrapper(ast.NodeTransformer):
757 757 """Wraps all integers in a call to Integer()"""
758 758
759 759 # for Python 3.7 and earlier
760 760
761 761 # for Python 3.7 and earlier
762 762 def visit_Num(self, node):
763 763 if isinstance(node.n, int):
764 764 return ast.Call(func=ast.Name(id='Integer', ctx=ast.Load()),
765 765 args=[node], keywords=[])
766 766 return node
767 767
768 768 # For Python 3.8+
769 769 def visit_Constant(self, node):
770 770 if isinstance(node.value, int):
771 771 return self.visit_Num(node)
772 772 return node
773 773
774 774
775 775 class TestAstTransform2(unittest.TestCase):
776 776 def setUp(self):
777 777 self.intwrapper = IntegerWrapper()
778 778 ip.ast_transformers.append(self.intwrapper)
779 779
780 780 self.calls = []
781 781 def Integer(*args):
782 782 self.calls.append(args)
783 783 return args
784 784 ip.push({"Integer": Integer})
785 785
786 786 def tearDown(self):
787 787 ip.ast_transformers.remove(self.intwrapper)
788 788 del ip.user_ns['Integer']
789 789
790 790 def test_run_cell(self):
791 791 ip.run_cell("n = 2")
792 792 self.assertEqual(self.calls, [(2,)])
793 793
794 794 # This shouldn't throw an error
795 795 ip.run_cell("o = 2.0")
796 796 self.assertEqual(ip.user_ns['o'], 2.0)
797 797
798 798 def test_timeit(self):
799 799 called = set()
800 800 def f(x):
801 801 called.add(x)
802 802 ip.push({'f':f})
803 803
804 804 with tt.AssertPrints("std. dev. of"):
805 805 ip.run_line_magic("timeit", "-n1 f(1)")
806 806 self.assertEqual(called, {(1,)})
807 807 called.clear()
808 808
809 809 with tt.AssertPrints("std. dev. of"):
810 810 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
811 811 self.assertEqual(called, {(2,), (3,)})
812 812
813 813 class ErrorTransformer(ast.NodeTransformer):
814 814 """Throws an error when it sees a number."""
815 815
816 816 # for Python 3.7 and earlier
817 817 def visit_Num(self, node):
818 818 raise ValueError("test")
819 819
820 820 # for Python 3.8+
821 821 def visit_Constant(self, node):
822 822 if isinstance(node.value, int):
823 823 return self.visit_Num(node)
824 824 return node
825 825
826 826
827 827 class TestAstTransformError(unittest.TestCase):
828 828 def test_unregistering(self):
829 829 err_transformer = ErrorTransformer()
830 830 ip.ast_transformers.append(err_transformer)
831 831
832 832 with self.assertWarnsRegex(UserWarning, "It will be unregistered"):
833 833 ip.run_cell("1 + 2")
834 834
835 835 # This should have been removed.
836 836 self.assertNotIn(err_transformer, ip.ast_transformers)
837 837
838 838
839 839 class StringRejector(ast.NodeTransformer):
840 840 """Throws an InputRejected when it sees a string literal.
841 841
842 842 Used to verify that NodeTransformers can signal that a piece of code should
843 843 not be executed by throwing an InputRejected.
844 844 """
845 845
846 846 #for python 3.7 and earlier
847 847 def visit_Str(self, node):
848 848 raise InputRejected("test")
849 849
850 850 # 3.8 only
851 851 def visit_Constant(self, node):
852 852 if isinstance(node.value, str):
853 853 raise InputRejected("test")
854 854 return node
855 855
856 856
857 857 class TestAstTransformInputRejection(unittest.TestCase):
858 858
859 859 def setUp(self):
860 860 self.transformer = StringRejector()
861 861 ip.ast_transformers.append(self.transformer)
862 862
863 863 def tearDown(self):
864 864 ip.ast_transformers.remove(self.transformer)
865 865
866 866 def test_input_rejection(self):
867 867 """Check that NodeTransformers can reject input."""
868 868
869 869 expect_exception_tb = tt.AssertPrints("InputRejected: test")
870 870 expect_no_cell_output = tt.AssertNotPrints("'unsafe'", suppress=False)
871 871
872 872 # Run the same check twice to verify that the transformer is not
873 873 # disabled after raising.
874 874 with expect_exception_tb, expect_no_cell_output:
875 875 ip.run_cell("'unsafe'")
876 876
877 877 with expect_exception_tb, expect_no_cell_output:
878 878 res = ip.run_cell("'unsafe'")
879 879
880 880 self.assertIsInstance(res.error_before_exec, InputRejected)
881 881
882 882 def test__IPYTHON__():
883 883 # This shouldn't raise a NameError, that's all
884 884 __IPYTHON__
885 885
886 886
887 887 class DummyRepr(object):
888 888 def __repr__(self):
889 889 return "DummyRepr"
890 890
891 891 def _repr_html_(self):
892 892 return "<b>dummy</b>"
893 893
894 894 def _repr_javascript_(self):
895 895 return "console.log('hi');", {'key': 'value'}
896 896
897 897
898 898 def test_user_variables():
899 899 # enable all formatters
900 900 ip.display_formatter.active_types = ip.display_formatter.format_types
901 901
902 902 ip.user_ns['dummy'] = d = DummyRepr()
903 903 keys = {'dummy', 'doesnotexist'}
904 904 r = ip.user_expressions({ key:key for key in keys})
905 905
906 906 assert keys == set(r.keys())
907 907 dummy = r["dummy"]
908 908 assert {"status", "data", "metadata"} == set(dummy.keys())
909 909 assert dummy["status"] == "ok"
910 910 data = dummy["data"]
911 911 metadata = dummy["metadata"]
912 912 assert data.get("text/html") == d._repr_html_()
913 913 js, jsmd = d._repr_javascript_()
914 914 assert data.get("application/javascript") == js
915 915 assert metadata.get("application/javascript") == jsmd
916 916
917 917 dne = r["doesnotexist"]
918 918 assert dne["status"] == "error"
919 919 assert dne["ename"] == "NameError"
920 920
921 921 # back to text only
922 922 ip.display_formatter.active_types = ['text/plain']
923 923
924 924 def test_user_expression():
925 925 # enable all formatters
926 926 ip.display_formatter.active_types = ip.display_formatter.format_types
927 927 query = {
928 928 'a' : '1 + 2',
929 929 'b' : '1/0',
930 930 }
931 931 r = ip.user_expressions(query)
932 932 import pprint
933 933 pprint.pprint(r)
934 934 assert set(r.keys()) == set(query.keys())
935 935 a = r["a"]
936 936 assert {"status", "data", "metadata"} == set(a.keys())
937 937 assert a["status"] == "ok"
938 938 data = a["data"]
939 939 metadata = a["metadata"]
940 940 assert data.get("text/plain") == "3"
941 941
942 942 b = r["b"]
943 943 assert b["status"] == "error"
944 944 assert b["ename"] == "ZeroDivisionError"
945 945
946 946 # back to text only
947 947 ip.display_formatter.active_types = ['text/plain']
948 948
949 949
950 950 class TestSyntaxErrorTransformer(unittest.TestCase):
951 951 """Check that SyntaxError raised by an input transformer is handled by run_cell()"""
952 952
953 953 @staticmethod
954 954 def transformer(lines):
955 955 for line in lines:
956 956 pos = line.find('syntaxerror')
957 957 if pos >= 0:
958 958 e = SyntaxError('input contains "syntaxerror"')
959 959 e.text = line
960 960 e.offset = pos + 1
961 961 raise e
962 962 return lines
963 963
964 964 def setUp(self):
965 965 ip.input_transformers_post.append(self.transformer)
966 966
967 967 def tearDown(self):
968 968 ip.input_transformers_post.remove(self.transformer)
969 969
970 970 def test_syntaxerror_input_transformer(self):
971 971 with tt.AssertPrints('1234'):
972 972 ip.run_cell('1234')
973 973 with tt.AssertPrints('SyntaxError: invalid syntax'):
974 974 ip.run_cell('1 2 3') # plain python syntax error
975 975 with tt.AssertPrints('SyntaxError: input contains "syntaxerror"'):
976 976 ip.run_cell('2345 # syntaxerror') # input transformer syntax error
977 977 with tt.AssertPrints('3456'):
978 978 ip.run_cell('3456')
979 979
980 980
981 981 class TestWarningSuppression(unittest.TestCase):
982 982 def test_warning_suppression(self):
983 983 ip.run_cell("import warnings")
984 984 try:
985 985 with self.assertWarnsRegex(UserWarning, "asdf"):
986 986 ip.run_cell("warnings.warn('asdf')")
987 987 # Here's the real test -- if we run that again, we should get the
988 988 # warning again. Traditionally, each warning was only issued once per
989 989 # IPython session (approximately), even if the user typed in new and
990 990 # different code that should have also triggered the warning, leading
991 991 # to much confusion.
992 992 with self.assertWarnsRegex(UserWarning, "asdf"):
993 993 ip.run_cell("warnings.warn('asdf')")
994 994 finally:
995 995 ip.run_cell("del warnings")
996 996
997 997
998 998 def test_deprecation_warning(self):
999 999 ip.run_cell("""
1000 1000 import warnings
1001 1001 def wrn():
1002 1002 warnings.warn(
1003 1003 "I AM A WARNING",
1004 1004 DeprecationWarning
1005 1005 )
1006 1006 """)
1007 1007 try:
1008 1008 with self.assertWarnsRegex(DeprecationWarning, "I AM A WARNING"):
1009 1009 ip.run_cell("wrn()")
1010 1010 finally:
1011 1011 ip.run_cell("del warnings")
1012 1012 ip.run_cell("del wrn")
1013 1013
1014 1014
1015 1015 class TestImportNoDeprecate(tt.TempFileMixin):
1016 1016
1017 1017 def setUp(self):
1018 1018 """Make a valid python temp file."""
1019 1019 self.mktmp("""
1020 1020 import warnings
1021 1021 def wrn():
1022 1022 warnings.warn(
1023 1023 "I AM A WARNING",
1024 1024 DeprecationWarning
1025 1025 )
1026 1026 """)
1027 1027 super().setUp()
1028 1028
1029 1029 def test_no_dep(self):
1030 1030 """
1031 1031 No deprecation warning should be raised from imported functions
1032 1032 """
1033 1033 ip.run_cell("from {} import wrn".format(self.fname))
1034 1034
1035 1035 with tt.AssertNotPrints("I AM A WARNING"):
1036 1036 ip.run_cell("wrn()")
1037 1037 ip.run_cell("del wrn")
1038 1038
1039 1039
1040 1040 def test_custom_exc_count():
1041 1041 hook = mock.Mock(return_value=None)
1042 1042 ip.set_custom_exc((SyntaxError,), hook)
1043 1043 before = ip.execution_count
1044 1044 ip.run_cell("def foo()", store_history=True)
1045 1045 # restore default excepthook
1046 1046 ip.set_custom_exc((), None)
1047 1047 assert hook.call_count == 1
1048 1048 assert ip.execution_count == before + 1
1049 1049
1050 1050
1051 1051 def test_run_cell_async():
1052 1052 loop = asyncio.get_event_loop_policy().get_event_loop()
1053 1053 ip.run_cell("import asyncio")
1054 1054 coro = ip.run_cell_async("await asyncio.sleep(0.01)\n5")
1055 1055 assert asyncio.iscoroutine(coro)
1056 1056 result = loop.run_until_complete(coro)
1057 1057 assert isinstance(result, interactiveshell.ExecutionResult)
1058 1058 assert result.result == 5
1059 1059
1060 1060
1061 def test_run_cell_await():
1062 ip.run_cell("import asyncio")
1063 result = ip.run_cell("await asyncio.sleep(0.01); 10")
1064 assert ip.user_ns["_"] == 10
1065
1066
1067 def test_run_cell_asyncio_run():
1068 ip.run_cell("import asyncio")
1069 result = ip.run_cell("await asyncio.sleep(0.01); 1")
1070 assert ip.user_ns["_"] == 1
1071 result = ip.run_cell("asyncio.run(asyncio.sleep(0.01)); 2")
1072 assert ip.user_ns["_"] == 2
1073 result = ip.run_cell("await asyncio.sleep(0.01); 3")
1074 assert ip.user_ns["_"] == 3
1075
1076
1061 1077 def test_should_run_async():
1062 1078 assert not ip.should_run_async("a = 5")
1063 1079 assert ip.should_run_async("await x")
1064 1080 assert ip.should_run_async("import asyncio; await asyncio.sleep(1)")
1065 1081
1066 1082
1067 1083 def test_set_custom_completer():
1068 1084 num_completers = len(ip.Completer.matchers)
1069 1085
1070 1086 def foo(*args, **kwargs):
1071 1087 return "I'm a completer!"
1072 1088
1073 1089 ip.set_custom_completer(foo, 0)
1074 1090
1075 1091 # check that we've really added a new completer
1076 1092 assert len(ip.Completer.matchers) == num_completers + 1
1077 1093
1078 1094 # check that the first completer is the function we defined
1079 1095 assert ip.Completer.matchers[0]() == "I'm a completer!"
1080 1096
1081 1097 # clean up
1082 1098 ip.Completer.custom_matchers.pop()
@@ -1,1354 +1,1358 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for various magic functions."""
3 3
4 4 import asyncio
5 5 import io
6 6 import os
7 7 import re
8 8 import shlex
9 9 import sys
10 10 import warnings
11 11 from importlib import invalidate_caches
12 12 from io import StringIO
13 13 from pathlib import Path
14 14 from textwrap import dedent
15 15 from unittest import TestCase, mock
16 16
17 17 import pytest
18 18
19 19 from IPython import get_ipython
20 20 from IPython.core import magic
21 21 from IPython.core.error import UsageError
22 22 from IPython.core.magic import (
23 23 Magics,
24 24 cell_magic,
25 25 line_magic,
26 26 magics_class,
27 27 register_cell_magic,
28 28 register_line_magic,
29 29 )
30 30 from IPython.core.magics import code, execution, logging, osm, script
31 31 from IPython.testing import decorators as dec
32 32 from IPython.testing import tools as tt
33 33 from IPython.utils.io import capture_output
34 34 from IPython.utils.process import find_cmd
35 35 from IPython.utils.tempdir import TemporaryDirectory, TemporaryWorkingDirectory
36 36
37 37 from .test_debugger import PdbTestInput
38 38
39 39
40 40 @magic.magics_class
41 41 class DummyMagics(magic.Magics): pass
42 42
43 43 def test_extract_code_ranges():
44 44 instr = "1 3 5-6 7-9 10:15 17: :10 10- -13 :"
45 45 expected = [
46 46 (0, 1),
47 47 (2, 3),
48 48 (4, 6),
49 49 (6, 9),
50 50 (9, 14),
51 51 (16, None),
52 52 (None, 9),
53 53 (9, None),
54 54 (None, 13),
55 55 (None, None),
56 56 ]
57 57 actual = list(code.extract_code_ranges(instr))
58 58 assert actual == expected
59 59
60 60 def test_extract_symbols():
61 61 source = """import foo\na = 10\ndef b():\n return 42\n\n\nclass A: pass\n\n\n"""
62 62 symbols_args = ["a", "b", "A", "A,b", "A,a", "z"]
63 63 expected = [([], ['a']),
64 64 (["def b():\n return 42\n"], []),
65 65 (["class A: pass\n"], []),
66 66 (["class A: pass\n", "def b():\n return 42\n"], []),
67 67 (["class A: pass\n"], ['a']),
68 68 ([], ['z'])]
69 69 for symbols, exp in zip(symbols_args, expected):
70 70 assert code.extract_symbols(source, symbols) == exp
71 71
72 72
73 73 def test_extract_symbols_raises_exception_with_non_python_code():
74 74 source = ("=begin A Ruby program :)=end\n"
75 75 "def hello\n"
76 76 "puts 'Hello world'\n"
77 77 "end")
78 78 with pytest.raises(SyntaxError):
79 79 code.extract_symbols(source, "hello")
80 80
81 81
82 82 def test_magic_not_found():
83 83 # magic not found raises UsageError
84 84 with pytest.raises(UsageError):
85 85 _ip.magic('doesntexist')
86 86
87 87 # ensure result isn't success when a magic isn't found
88 88 result = _ip.run_cell('%doesntexist')
89 89 assert isinstance(result.error_in_exec, UsageError)
90 90
91 91
92 92 def test_cell_magic_not_found():
93 93 # magic not found raises UsageError
94 94 with pytest.raises(UsageError):
95 95 _ip.run_cell_magic('doesntexist', 'line', 'cell')
96 96
97 97 # ensure result isn't success when a magic isn't found
98 98 result = _ip.run_cell('%%doesntexist')
99 99 assert isinstance(result.error_in_exec, UsageError)
100 100
101 101
102 102 def test_magic_error_status():
103 103 def fail(shell):
104 104 1/0
105 105 _ip.register_magic_function(fail)
106 106 result = _ip.run_cell('%fail')
107 107 assert isinstance(result.error_in_exec, ZeroDivisionError)
108 108
109 109
110 110 def test_config():
111 111 """ test that config magic does not raise
112 112 can happen if Configurable init is moved too early into
113 113 Magics.__init__ as then a Config object will be registered as a
114 114 magic.
115 115 """
116 116 ## should not raise.
117 117 _ip.magic('config')
118 118
119 119 def test_config_available_configs():
120 120 """ test that config magic prints available configs in unique and
121 121 sorted order. """
122 122 with capture_output() as captured:
123 123 _ip.magic('config')
124 124
125 125 stdout = captured.stdout
126 126 config_classes = stdout.strip().split('\n')[1:]
127 127 assert config_classes == sorted(set(config_classes))
128 128
129 129 def test_config_print_class():
130 130 """ test that config with a classname prints the class's options. """
131 131 with capture_output() as captured:
132 132 _ip.magic('config TerminalInteractiveShell')
133 133
134 134 stdout = captured.stdout
135 135 assert re.match(
136 136 "TerminalInteractiveShell.* options", stdout.splitlines()[0]
137 137 ), f"{stdout}\n\n1st line of stdout not like 'TerminalInteractiveShell.* options'"
138 138
139 139
140 140 def test_rehashx():
141 141 # clear up everything
142 142 _ip.alias_manager.clear_aliases()
143 143 del _ip.db['syscmdlist']
144 144
145 145 _ip.magic('rehashx')
146 146 # Practically ALL ipython development systems will have more than 10 aliases
147 147
148 148 assert len(_ip.alias_manager.aliases) > 10
149 149 for name, cmd in _ip.alias_manager.aliases:
150 150 # we must strip dots from alias names
151 151 assert "." not in name
152 152
153 153 # rehashx must fill up syscmdlist
154 154 scoms = _ip.db['syscmdlist']
155 155 assert len(scoms) > 10
156 156
157 157
158 158 def test_magic_parse_options():
159 159 """Test that we don't mangle paths when parsing magic options."""
160 160 ip = get_ipython()
161 161 path = 'c:\\x'
162 162 m = DummyMagics(ip)
163 163 opts = m.parse_options('-f %s' % path,'f:')[0]
164 164 # argv splitting is os-dependent
165 165 if os.name == 'posix':
166 166 expected = 'c:x'
167 167 else:
168 168 expected = path
169 169 assert opts["f"] == expected
170 170
171 171
172 172 def test_magic_parse_long_options():
173 173 """Magic.parse_options can handle --foo=bar long options"""
174 174 ip = get_ipython()
175 175 m = DummyMagics(ip)
176 176 opts, _ = m.parse_options("--foo --bar=bubble", "a", "foo", "bar=")
177 177 assert "foo" in opts
178 178 assert "bar" in opts
179 179 assert opts["bar"] == "bubble"
180 180
181 181
182 182 def doctest_hist_f():
183 183 """Test %hist -f with temporary filename.
184 184
185 185 In [9]: import tempfile
186 186
187 187 In [10]: tfile = tempfile.mktemp('.py','tmp-ipython-')
188 188
189 189 In [11]: %hist -nl -f $tfile 3
190 190
191 191 In [13]: import os; os.unlink(tfile)
192 192 """
193 193
194 194
195 195 def doctest_hist_op():
196 196 """Test %hist -op
197 197
198 198 In [1]: class b(float):
199 199 ...: pass
200 200 ...:
201 201
202 202 In [2]: class s(object):
203 203 ...: def __str__(self):
204 204 ...: return 's'
205 205 ...:
206 206
207 207 In [3]:
208 208
209 209 In [4]: class r(b):
210 210 ...: def __repr__(self):
211 211 ...: return 'r'
212 212 ...:
213 213
214 214 In [5]: class sr(s,r): pass
215 215 ...:
216 216
217 217 In [6]:
218 218
219 219 In [7]: bb=b()
220 220
221 221 In [8]: ss=s()
222 222
223 223 In [9]: rr=r()
224 224
225 225 In [10]: ssrr=sr()
226 226
227 227 In [11]: 4.5
228 228 Out[11]: 4.5
229 229
230 230 In [12]: str(ss)
231 231 Out[12]: 's'
232 232
233 233 In [13]:
234 234
235 235 In [14]: %hist -op
236 236 >>> class b:
237 237 ... pass
238 238 ...
239 239 >>> class s(b):
240 240 ... def __str__(self):
241 241 ... return 's'
242 242 ...
243 243 >>>
244 244 >>> class r(b):
245 245 ... def __repr__(self):
246 246 ... return 'r'
247 247 ...
248 248 >>> class sr(s,r): pass
249 249 >>>
250 250 >>> bb=b()
251 251 >>> ss=s()
252 252 >>> rr=r()
253 253 >>> ssrr=sr()
254 254 >>> 4.5
255 255 4.5
256 256 >>> str(ss)
257 257 's'
258 258 >>>
259 259 """
260 260
261 261 def test_hist_pof():
262 262 ip = get_ipython()
263 263 ip.run_cell("1+2", store_history=True)
264 264 #raise Exception(ip.history_manager.session_number)
265 265 #raise Exception(list(ip.history_manager._get_range_session()))
266 266 with TemporaryDirectory() as td:
267 267 tf = os.path.join(td, 'hist.py')
268 268 ip.run_line_magic('history', '-pof %s' % tf)
269 269 assert os.path.isfile(tf)
270 270
271 271
272 272 def test_macro():
273 273 ip = get_ipython()
274 274 ip.history_manager.reset() # Clear any existing history.
275 275 cmds = ["a=1", "def b():\n return a**2", "print(a,b())"]
276 276 for i, cmd in enumerate(cmds, start=1):
277 277 ip.history_manager.store_inputs(i, cmd)
278 278 ip.magic("macro test 1-3")
279 279 assert ip.user_ns["test"].value == "\n".join(cmds) + "\n"
280 280
281 281 # List macros
282 282 assert "test" in ip.magic("macro")
283 283
284 284
285 285 def test_macro_run():
286 286 """Test that we can run a multi-line macro successfully."""
287 287 ip = get_ipython()
288 288 ip.history_manager.reset()
289 289 cmds = ["a=10", "a+=1", "print(a)", "%macro test 2-3"]
290 290 for cmd in cmds:
291 291 ip.run_cell(cmd, store_history=True)
292 292 assert ip.user_ns["test"].value == "a+=1\nprint(a)\n"
293 293 with tt.AssertPrints("12"):
294 294 ip.run_cell("test")
295 295 with tt.AssertPrints("13"):
296 296 ip.run_cell("test")
297 297
298 298
299 299 def test_magic_magic():
300 300 """Test %magic"""
301 301 ip = get_ipython()
302 302 with capture_output() as captured:
303 303 ip.magic("magic")
304 304
305 305 stdout = captured.stdout
306 306 assert "%magic" in stdout
307 307 assert "IPython" in stdout
308 308 assert "Available" in stdout
309 309
310 310
311 311 @dec.skipif_not_numpy
312 312 def test_numpy_reset_array_undec():
313 313 "Test '%reset array' functionality"
314 314 _ip.ex("import numpy as np")
315 315 _ip.ex("a = np.empty(2)")
316 316 assert "a" in _ip.user_ns
317 317 _ip.magic("reset -f array")
318 318 assert "a" not in _ip.user_ns
319 319
320 320
321 321 def test_reset_out():
322 322 "Test '%reset out' magic"
323 323 _ip.run_cell("parrot = 'dead'", store_history=True)
324 324 # test '%reset -f out', make an Out prompt
325 325 _ip.run_cell("parrot", store_history=True)
326 326 assert "dead" in [_ip.user_ns[x] for x in ("_", "__", "___")]
327 327 _ip.magic("reset -f out")
328 328 assert "dead" not in [_ip.user_ns[x] for x in ("_", "__", "___")]
329 329 assert len(_ip.user_ns["Out"]) == 0
330 330
331 331
332 332 def test_reset_in():
333 333 "Test '%reset in' magic"
334 334 # test '%reset -f in'
335 335 _ip.run_cell("parrot", store_history=True)
336 336 assert "parrot" in [_ip.user_ns[x] for x in ("_i", "_ii", "_iii")]
337 337 _ip.magic("%reset -f in")
338 338 assert "parrot" not in [_ip.user_ns[x] for x in ("_i", "_ii", "_iii")]
339 339 assert len(set(_ip.user_ns["In"])) == 1
340 340
341 341
342 342 def test_reset_dhist():
343 343 "Test '%reset dhist' magic"
344 344 _ip.run_cell("tmp = [d for d in _dh]") # copy before clearing
345 345 _ip.magic("cd " + os.path.dirname(pytest.__file__))
346 346 _ip.magic("cd -")
347 347 assert len(_ip.user_ns["_dh"]) > 0
348 348 _ip.magic("reset -f dhist")
349 349 assert len(_ip.user_ns["_dh"]) == 0
350 350 _ip.run_cell("_dh = [d for d in tmp]") # restore
351 351
352 352
353 353 def test_reset_in_length():
354 354 "Test that '%reset in' preserves In[] length"
355 355 _ip.run_cell("print 'foo'")
356 356 _ip.run_cell("reset -f in")
357 357 assert len(_ip.user_ns["In"]) == _ip.displayhook.prompt_count + 1
358 358
359 359
360 360 class TestResetErrors(TestCase):
361 361
362 362 def test_reset_redefine(self):
363 363
364 364 @magics_class
365 365 class KernelMagics(Magics):
366 366 @line_magic
367 367 def less(self, shell): pass
368 368
369 369 _ip.register_magics(KernelMagics)
370 370
371 371 with self.assertLogs() as cm:
372 372 # hack, we want to just capture logs, but assertLogs fails if not
373 373 # logs get produce.
374 374 # so log one things we ignore.
375 375 import logging as log_mod
376 376 log = log_mod.getLogger()
377 377 log.info('Nothing')
378 378 # end hack.
379 379 _ip.run_cell("reset -f")
380 380
381 381 assert len(cm.output) == 1
382 382 for out in cm.output:
383 383 assert "Invalid alias" not in out
384 384
385 385 def test_tb_syntaxerror():
386 386 """test %tb after a SyntaxError"""
387 387 ip = get_ipython()
388 388 ip.run_cell("for")
389 389
390 390 # trap and validate stdout
391 391 save_stdout = sys.stdout
392 392 try:
393 393 sys.stdout = StringIO()
394 394 ip.run_cell("%tb")
395 395 out = sys.stdout.getvalue()
396 396 finally:
397 397 sys.stdout = save_stdout
398 398 # trim output, and only check the last line
399 399 last_line = out.rstrip().splitlines()[-1].strip()
400 400 assert last_line == "SyntaxError: invalid syntax"
401 401
402 402
403 403 def test_time():
404 404 ip = get_ipython()
405 405
406 406 with tt.AssertPrints("Wall time: "):
407 407 ip.run_cell("%time None")
408 408
409 409 ip.run_cell("def f(kmjy):\n"
410 410 " %time print (2*kmjy)")
411 411
412 412 with tt.AssertPrints("Wall time: "):
413 413 with tt.AssertPrints("hihi", suppress=False):
414 414 ip.run_cell("f('hi')")
415 415
416 416 def test_time_last_not_expression():
417 417 ip.run_cell("%%time\n"
418 418 "var_1 = 1\n"
419 419 "var_2 = 2\n")
420 420 assert ip.user_ns['var_1'] == 1
421 421 del ip.user_ns['var_1']
422 422 assert ip.user_ns['var_2'] == 2
423 423 del ip.user_ns['var_2']
424 424
425 425
426 426 @dec.skip_win32
427 427 def test_time2():
428 428 ip = get_ipython()
429 429
430 430 with tt.AssertPrints("CPU times: user "):
431 431 ip.run_cell("%time None")
432 432
433 433 def test_time3():
434 434 """Erroneous magic function calls, issue gh-3334"""
435 435 ip = get_ipython()
436 436 ip.user_ns.pop('run', None)
437 437
438 438 with tt.AssertNotPrints("not found", channel='stderr'):
439 439 ip.run_cell("%%time\n"
440 440 "run = 0\n"
441 441 "run += 1")
442 442
443 443 def test_multiline_time():
444 444 """Make sure last statement from time return a value."""
445 445 ip = get_ipython()
446 446 ip.user_ns.pop('run', None)
447 447
448 448 ip.run_cell(dedent("""\
449 449 %%time
450 450 a = "ho"
451 451 b = "hey"
452 452 a+b
453 453 """
454 454 )
455 455 )
456 456 assert ip.user_ns_hidden["_"] == "hohey"
457 457
458 458
459 459 def test_time_local_ns():
460 460 """
461 461 Test that local_ns is actually global_ns when running a cell magic
462 462 """
463 463 ip = get_ipython()
464 464 ip.run_cell("%%time\n" "myvar = 1")
465 465 assert ip.user_ns["myvar"] == 1
466 466 del ip.user_ns["myvar"]
467 467
468 468
469 469 def test_doctest_mode():
470 470 "Toggle doctest_mode twice, it should be a no-op and run without error"
471 471 _ip.magic('doctest_mode')
472 472 _ip.magic('doctest_mode')
473 473
474 474
475 475 def test_parse_options():
476 476 """Tests for basic options parsing in magics."""
477 477 # These are only the most minimal of tests, more should be added later. At
478 478 # the very least we check that basic text/unicode calls work OK.
479 479 m = DummyMagics(_ip)
480 480 assert m.parse_options("foo", "")[1] == "foo"
481 481 assert m.parse_options("foo", "")[1] == "foo"
482 482
483 483
484 484 def test_parse_options_preserve_non_option_string():
485 485 """Test to assert preservation of non-option part of magic-block, while parsing magic options."""
486 486 m = DummyMagics(_ip)
487 487 opts, stmt = m.parse_options(
488 488 " -n1 -r 13 _ = 314 + foo", "n:r:", preserve_non_opts=True
489 489 )
490 490 assert opts == {"n": "1", "r": "13"}
491 491 assert stmt == "_ = 314 + foo"
492 492
493 493
494 494 def test_run_magic_preserve_code_block():
495 495 """Test to assert preservation of non-option part of magic-block, while running magic."""
496 496 _ip.user_ns["spaces"] = []
497 497 _ip.magic("timeit -n1 -r1 spaces.append([s.count(' ') for s in ['document']])")
498 498 assert _ip.user_ns["spaces"] == [[0]]
499 499
500 500
501 501 def test_dirops():
502 502 """Test various directory handling operations."""
503 503 # curpath = lambda :os.path.splitdrive(os.getcwd())[1].replace('\\','/')
504 504 curpath = os.getcwd
505 505 startdir = os.getcwd()
506 506 ipdir = os.path.realpath(_ip.ipython_dir)
507 507 try:
508 508 _ip.magic('cd "%s"' % ipdir)
509 509 assert curpath() == ipdir
510 510 _ip.magic('cd -')
511 511 assert curpath() == startdir
512 512 _ip.magic('pushd "%s"' % ipdir)
513 513 assert curpath() == ipdir
514 514 _ip.magic('popd')
515 515 assert curpath() == startdir
516 516 finally:
517 517 os.chdir(startdir)
518 518
519 519
520 520 def test_cd_force_quiet():
521 521 """Test OSMagics.cd_force_quiet option"""
522 522 _ip.config.OSMagics.cd_force_quiet = True
523 523 osmagics = osm.OSMagics(shell=_ip)
524 524
525 525 startdir = os.getcwd()
526 526 ipdir = os.path.realpath(_ip.ipython_dir)
527 527
528 528 try:
529 529 with tt.AssertNotPrints(ipdir):
530 530 osmagics.cd('"%s"' % ipdir)
531 531 with tt.AssertNotPrints(startdir):
532 532 osmagics.cd('-')
533 533 finally:
534 534 os.chdir(startdir)
535 535
536 536
537 537 def test_xmode():
538 538 # Calling xmode three times should be a no-op
539 539 xmode = _ip.InteractiveTB.mode
540 540 for i in range(4):
541 541 _ip.magic("xmode")
542 542 assert _ip.InteractiveTB.mode == xmode
543 543
544 544 def test_reset_hard():
545 545 monitor = []
546 546 class A(object):
547 547 def __del__(self):
548 548 monitor.append(1)
549 549 def __repr__(self):
550 550 return "<A instance>"
551 551
552 552 _ip.user_ns["a"] = A()
553 553 _ip.run_cell("a")
554 554
555 555 assert monitor == []
556 556 _ip.magic("reset -f")
557 557 assert monitor == [1]
558 558
559 559 class TestXdel(tt.TempFileMixin):
560 560 def test_xdel(self):
561 561 """Test that references from %run are cleared by xdel."""
562 562 src = ("class A(object):\n"
563 563 " monitor = []\n"
564 564 " def __del__(self):\n"
565 565 " self.monitor.append(1)\n"
566 566 "a = A()\n")
567 567 self.mktmp(src)
568 568 # %run creates some hidden references...
569 569 _ip.magic("run %s" % self.fname)
570 570 # ... as does the displayhook.
571 571 _ip.run_cell("a")
572 572
573 573 monitor = _ip.user_ns["A"].monitor
574 574 assert monitor == []
575 575
576 576 _ip.magic("xdel a")
577 577
578 578 # Check that a's __del__ method has been called.
579 579 assert monitor == [1]
580 580
581 581 def doctest_who():
582 582 """doctest for %who
583 583
584 584 In [1]: %reset -sf
585 585
586 586 In [2]: alpha = 123
587 587
588 588 In [3]: beta = 'beta'
589 589
590 590 In [4]: %who int
591 591 alpha
592 592
593 593 In [5]: %who str
594 594 beta
595 595
596 596 In [6]: %whos
597 597 Variable Type Data/Info
598 598 ----------------------------
599 599 alpha int 123
600 600 beta str beta
601 601
602 602 In [7]: %who_ls
603 603 Out[7]: ['alpha', 'beta']
604 604 """
605 605
606 606 def test_whos():
607 607 """Check that whos is protected against objects where repr() fails."""
608 608 class A(object):
609 609 def __repr__(self):
610 610 raise Exception()
611 611 _ip.user_ns['a'] = A()
612 612 _ip.magic("whos")
613 613
614 614 def doctest_precision():
615 615 """doctest for %precision
616 616
617 617 In [1]: f = get_ipython().display_formatter.formatters['text/plain']
618 618
619 619 In [2]: %precision 5
620 620 Out[2]: '%.5f'
621 621
622 622 In [3]: f.float_format
623 623 Out[3]: '%.5f'
624 624
625 625 In [4]: %precision %e
626 626 Out[4]: '%e'
627 627
628 628 In [5]: f(3.1415927)
629 629 Out[5]: '3.141593e+00'
630 630 """
631 631
632 632 def test_debug_magic():
633 633 """Test debugging a small code with %debug
634 634
635 635 In [1]: with PdbTestInput(['c']):
636 636 ...: %debug print("a b") #doctest: +ELLIPSIS
637 637 ...:
638 638 ...
639 639 ipdb> c
640 640 a b
641 641 In [2]:
642 642 """
643 643
644 644 def test_psearch():
645 645 with tt.AssertPrints("dict.fromkeys"):
646 646 _ip.run_cell("dict.fr*?")
647 647 with tt.AssertPrints("Ο€.is_integer"):
648 648 _ip.run_cell("Ο€ = 3.14;\nΟ€.is_integ*?")
649 649
650 650 def test_timeit_shlex():
651 651 """test shlex issues with timeit (#1109)"""
652 652 _ip.ex("def f(*a,**kw): pass")
653 653 _ip.magic('timeit -n1 "this is a bug".count(" ")')
654 654 _ip.magic('timeit -r1 -n1 f(" ", 1)')
655 655 _ip.magic('timeit -r1 -n1 f(" ", 1, " ", 2, " ")')
656 656 _ip.magic('timeit -r1 -n1 ("a " + "b")')
657 657 _ip.magic('timeit -r1 -n1 f("a " + "b")')
658 658 _ip.magic('timeit -r1 -n1 f("a " + "b ")')
659 659
660 660
661 661 def test_timeit_special_syntax():
662 662 "Test %%timeit with IPython special syntax"
663 663 @register_line_magic
664 664 def lmagic(line):
665 665 ip = get_ipython()
666 666 ip.user_ns['lmagic_out'] = line
667 667
668 668 # line mode test
669 669 _ip.run_line_magic("timeit", "-n1 -r1 %lmagic my line")
670 670 assert _ip.user_ns["lmagic_out"] == "my line"
671 671 # cell mode test
672 672 _ip.run_cell_magic("timeit", "-n1 -r1", "%lmagic my line2")
673 673 assert _ip.user_ns["lmagic_out"] == "my line2"
674 674
675 675
676 676 def test_timeit_return():
677 677 """
678 678 test whether timeit -o return object
679 679 """
680 680
681 681 res = _ip.run_line_magic('timeit','-n10 -r10 -o 1')
682 682 assert(res is not None)
683 683
684 684 def test_timeit_quiet():
685 685 """
686 686 test quiet option of timeit magic
687 687 """
688 688 with tt.AssertNotPrints("loops"):
689 689 _ip.run_cell("%timeit -n1 -r1 -q 1")
690 690
691 691 def test_timeit_return_quiet():
692 692 with tt.AssertNotPrints("loops"):
693 693 res = _ip.run_line_magic('timeit', '-n1 -r1 -q -o 1')
694 694 assert (res is not None)
695 695
696 696 def test_timeit_invalid_return():
697 697 with pytest.raises(SyntaxError):
698 698 _ip.run_line_magic('timeit', 'return')
699 699
700 700 @dec.skipif(execution.profile is None)
701 701 def test_prun_special_syntax():
702 702 "Test %%prun with IPython special syntax"
703 703 @register_line_magic
704 704 def lmagic(line):
705 705 ip = get_ipython()
706 706 ip.user_ns['lmagic_out'] = line
707 707
708 708 # line mode test
709 709 _ip.run_line_magic("prun", "-q %lmagic my line")
710 710 assert _ip.user_ns["lmagic_out"] == "my line"
711 711 # cell mode test
712 712 _ip.run_cell_magic("prun", "-q", "%lmagic my line2")
713 713 assert _ip.user_ns["lmagic_out"] == "my line2"
714 714
715 715
716 716 @dec.skipif(execution.profile is None)
717 717 def test_prun_quotes():
718 718 "Test that prun does not clobber string escapes (GH #1302)"
719 719 _ip.magic(r"prun -q x = '\t'")
720 720 assert _ip.user_ns["x"] == "\t"
721 721
722 722
723 723 def test_extension():
724 724 # Debugging information for failures of this test
725 725 print('sys.path:')
726 726 for p in sys.path:
727 727 print(' ', p)
728 728 print('CWD', os.getcwd())
729 729
730 730 pytest.raises(ImportError, _ip.magic, "load_ext daft_extension")
731 731 daft_path = os.path.join(os.path.dirname(__file__), "daft_extension")
732 732 sys.path.insert(0, daft_path)
733 733 try:
734 734 _ip.user_ns.pop('arq', None)
735 735 invalidate_caches() # Clear import caches
736 736 _ip.magic("load_ext daft_extension")
737 737 assert _ip.user_ns["arq"] == 185
738 738 _ip.magic("unload_ext daft_extension")
739 739 assert 'arq' not in _ip.user_ns
740 740 finally:
741 741 sys.path.remove(daft_path)
742 742
743 743
744 744 def test_notebook_export_json():
745 745 pytest.importorskip("nbformat")
746 746 _ip = get_ipython()
747 747 _ip.history_manager.reset() # Clear any existing history.
748 748 cmds = ["a=1", "def b():\n return a**2", "print('noΓ«l, Γ©tΓ©', b())"]
749 749 for i, cmd in enumerate(cmds, start=1):
750 750 _ip.history_manager.store_inputs(i, cmd)
751 751 with TemporaryDirectory() as td:
752 752 outfile = os.path.join(td, "nb.ipynb")
753 753 _ip.magic("notebook -e %s" % outfile)
754 754
755 755
756 756 class TestEnv(TestCase):
757 757
758 758 def test_env(self):
759 759 env = _ip.magic("env")
760 760 self.assertTrue(isinstance(env, dict))
761 761
762 762 def test_env_secret(self):
763 763 env = _ip.magic("env")
764 764 hidden = "<hidden>"
765 765 with mock.patch.dict(
766 766 os.environ,
767 767 {
768 768 "API_KEY": "abc123",
769 769 "SECRET_THING": "ssshhh",
770 770 "JUPYTER_TOKEN": "",
771 771 "VAR": "abc"
772 772 }
773 773 ):
774 774 env = _ip.magic("env")
775 775 assert env["API_KEY"] == hidden
776 776 assert env["SECRET_THING"] == hidden
777 777 assert env["JUPYTER_TOKEN"] == hidden
778 778 assert env["VAR"] == "abc"
779 779
780 780 def test_env_get_set_simple(self):
781 781 env = _ip.magic("env var val1")
782 782 self.assertEqual(env, None)
783 783 self.assertEqual(os.environ['var'], 'val1')
784 784 self.assertEqual(_ip.magic("env var"), 'val1')
785 785 env = _ip.magic("env var=val2")
786 786 self.assertEqual(env, None)
787 787 self.assertEqual(os.environ['var'], 'val2')
788 788
789 789 def test_env_get_set_complex(self):
790 790 env = _ip.magic("env var 'val1 '' 'val2")
791 791 self.assertEqual(env, None)
792 792 self.assertEqual(os.environ['var'], "'val1 '' 'val2")
793 793 self.assertEqual(_ip.magic("env var"), "'val1 '' 'val2")
794 794 env = _ip.magic('env var=val2 val3="val4')
795 795 self.assertEqual(env, None)
796 796 self.assertEqual(os.environ['var'], 'val2 val3="val4')
797 797
798 798 def test_env_set_bad_input(self):
799 799 self.assertRaises(UsageError, lambda: _ip.magic("set_env var"))
800 800
801 801 def test_env_set_whitespace(self):
802 802 self.assertRaises(UsageError, lambda: _ip.magic("env var A=B"))
803 803
804 804
805 805 class CellMagicTestCase(TestCase):
806 806
807 807 def check_ident(self, magic):
808 808 # Manually called, we get the result
809 809 out = _ip.run_cell_magic(magic, "a", "b")
810 810 assert out == ("a", "b")
811 811 # Via run_cell, it goes into the user's namespace via displayhook
812 812 _ip.run_cell("%%" + magic + " c\nd\n")
813 813 assert _ip.user_ns["_"] == ("c", "d\n")
814 814
815 815 def test_cell_magic_func_deco(self):
816 816 "Cell magic using simple decorator"
817 817 @register_cell_magic
818 818 def cellm(line, cell):
819 819 return line, cell
820 820
821 821 self.check_ident('cellm')
822 822
823 823 def test_cell_magic_reg(self):
824 824 "Cell magic manually registered"
825 825 def cellm(line, cell):
826 826 return line, cell
827 827
828 828 _ip.register_magic_function(cellm, 'cell', 'cellm2')
829 829 self.check_ident('cellm2')
830 830
831 831 def test_cell_magic_class(self):
832 832 "Cell magics declared via a class"
833 833 @magics_class
834 834 class MyMagics(Magics):
835 835
836 836 @cell_magic
837 837 def cellm3(self, line, cell):
838 838 return line, cell
839 839
840 840 _ip.register_magics(MyMagics)
841 841 self.check_ident('cellm3')
842 842
843 843 def test_cell_magic_class2(self):
844 844 "Cell magics declared via a class, #2"
845 845 @magics_class
846 846 class MyMagics2(Magics):
847 847
848 848 @cell_magic('cellm4')
849 849 def cellm33(self, line, cell):
850 850 return line, cell
851 851
852 852 _ip.register_magics(MyMagics2)
853 853 self.check_ident('cellm4')
854 854 # Check that nothing is registered as 'cellm33'
855 855 c33 = _ip.find_cell_magic('cellm33')
856 856 assert c33 == None
857 857
858 858 def test_file():
859 859 """Basic %%writefile"""
860 860 ip = get_ipython()
861 861 with TemporaryDirectory() as td:
862 862 fname = os.path.join(td, 'file1')
863 863 ip.run_cell_magic("writefile", fname, u'\n'.join([
864 864 'line1',
865 865 'line2',
866 866 ]))
867 867 s = Path(fname).read_text()
868 868 assert "line1\n" in s
869 869 assert "line2" in s
870 870
871 871
872 872 @dec.skip_win32
873 873 def test_file_single_quote():
874 874 """Basic %%writefile with embedded single quotes"""
875 875 ip = get_ipython()
876 876 with TemporaryDirectory() as td:
877 877 fname = os.path.join(td, '\'file1\'')
878 878 ip.run_cell_magic("writefile", fname, u'\n'.join([
879 879 'line1',
880 880 'line2',
881 881 ]))
882 882 s = Path(fname).read_text()
883 883 assert "line1\n" in s
884 884 assert "line2" in s
885 885
886 886
887 887 @dec.skip_win32
888 888 def test_file_double_quote():
889 889 """Basic %%writefile with embedded double quotes"""
890 890 ip = get_ipython()
891 891 with TemporaryDirectory() as td:
892 892 fname = os.path.join(td, '"file1"')
893 893 ip.run_cell_magic("writefile", fname, u'\n'.join([
894 894 'line1',
895 895 'line2',
896 896 ]))
897 897 s = Path(fname).read_text()
898 898 assert "line1\n" in s
899 899 assert "line2" in s
900 900
901 901
902 902 def test_file_var_expand():
903 903 """%%writefile $filename"""
904 904 ip = get_ipython()
905 905 with TemporaryDirectory() as td:
906 906 fname = os.path.join(td, 'file1')
907 907 ip.user_ns['filename'] = fname
908 908 ip.run_cell_magic("writefile", '$filename', u'\n'.join([
909 909 'line1',
910 910 'line2',
911 911 ]))
912 912 s = Path(fname).read_text()
913 913 assert "line1\n" in s
914 914 assert "line2" in s
915 915
916 916
917 917 def test_file_unicode():
918 918 """%%writefile with unicode cell"""
919 919 ip = get_ipython()
920 920 with TemporaryDirectory() as td:
921 921 fname = os.path.join(td, 'file1')
922 922 ip.run_cell_magic("writefile", fname, u'\n'.join([
923 923 u'linΓ©1',
924 924 u'linΓ©2',
925 925 ]))
926 926 with io.open(fname, encoding='utf-8') as f:
927 927 s = f.read()
928 928 assert "linΓ©1\n" in s
929 929 assert "linΓ©2" in s
930 930
931 931
932 932 def test_file_amend():
933 933 """%%writefile -a amends files"""
934 934 ip = get_ipython()
935 935 with TemporaryDirectory() as td:
936 936 fname = os.path.join(td, 'file2')
937 937 ip.run_cell_magic("writefile", fname, u'\n'.join([
938 938 'line1',
939 939 'line2',
940 940 ]))
941 941 ip.run_cell_magic("writefile", "-a %s" % fname, u'\n'.join([
942 942 'line3',
943 943 'line4',
944 944 ]))
945 945 s = Path(fname).read_text()
946 946 assert "line1\n" in s
947 947 assert "line3\n" in s
948 948
949 949
950 950 def test_file_spaces():
951 951 """%%file with spaces in filename"""
952 952 ip = get_ipython()
953 953 with TemporaryWorkingDirectory() as td:
954 954 fname = "file name"
955 955 ip.run_cell_magic("file", '"%s"'%fname, u'\n'.join([
956 956 'line1',
957 957 'line2',
958 958 ]))
959 959 s = Path(fname).read_text()
960 960 assert "line1\n" in s
961 961 assert "line2" in s
962 962
963 963
964 964 def test_script_config():
965 965 ip = get_ipython()
966 966 ip.config.ScriptMagics.script_magics = ['whoda']
967 967 sm = script.ScriptMagics(shell=ip)
968 968 assert "whoda" in sm.magics["cell"]
969 969
970 970
971 971 @pytest.fixture
972 972 def event_loop():
973 yield asyncio.get_event_loop_policy().get_event_loop()
973 policy = asyncio.get_event_loop_policy()
974 loop = policy.new_event_loop()
975 policy.set_event_loop(loop)
976 yield loop
977 loop.close()
974 978
975 979
976 980 @dec.skip_win32
977 981 @pytest.mark.skipif(
978 982 sys.platform == "win32", reason="This test does not run under Windows"
979 983 )
980 984 def test_script_out(event_loop):
981 985 assert event_loop.is_running() is False
982 986
983 987 ip = get_ipython()
984 988 ip.run_cell_magic("script", "--out output sh", "echo 'hi'")
985 989 assert event_loop.is_running() is False
986 990 assert ip.user_ns["output"] == "hi\n"
987 991
988 992
989 993 @dec.skip_win32
990 994 @pytest.mark.skipif(
991 995 sys.platform == "win32", reason="This test does not run under Windows"
992 996 )
993 997 def test_script_err(event_loop):
994 998 ip = get_ipython()
995 999 assert event_loop.is_running() is False
996 1000 ip.run_cell_magic("script", "--err error sh", "echo 'hello' >&2")
997 1001 assert event_loop.is_running() is False
998 1002 assert ip.user_ns["error"] == "hello\n"
999 1003
1000 1004
1001 1005 @dec.skip_win32
1002 1006 @pytest.mark.skipif(
1003 1007 sys.platform == "win32", reason="This test does not run under Windows"
1004 1008 )
1005 1009 def test_script_out_err():
1006 1010
1007 1011 ip = get_ipython()
1008 1012 ip.run_cell_magic(
1009 1013 "script", "--out output --err error sh", "echo 'hi'\necho 'hello' >&2"
1010 1014 )
1011 1015 assert ip.user_ns["output"] == "hi\n"
1012 1016 assert ip.user_ns["error"] == "hello\n"
1013 1017
1014 1018
1015 1019 @dec.skip_win32
1016 1020 @pytest.mark.skipif(
1017 1021 sys.platform == "win32", reason="This test does not run under Windows"
1018 1022 )
1019 1023 async def test_script_bg_out(event_loop):
1020 1024 ip = get_ipython()
1021 1025 ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'")
1022 1026 assert (await ip.user_ns["output"].read()) == b"hi\n"
1023 1027 ip.user_ns["output"].close()
1024 1028 event_loop.stop()
1025 1029
1026 1030
1027 1031 @dec.skip_win32
1028 1032 @pytest.mark.skipif(
1029 1033 sys.platform == "win32", reason="This test does not run under Windows"
1030 1034 )
1031 1035 async def test_script_bg_err():
1032 1036 ip = get_ipython()
1033 1037 ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2")
1034 1038 assert (await ip.user_ns["error"].read()) == b"hello\n"
1035 1039 ip.user_ns["error"].close()
1036 1040
1037 1041
1038 1042 @dec.skip_win32
1039 1043 @pytest.mark.skipif(
1040 1044 sys.platform == "win32", reason="This test does not run under Windows"
1041 1045 )
1042 1046 async def test_script_bg_out_err():
1043 1047 ip = get_ipython()
1044 1048 ip.run_cell_magic(
1045 1049 "script", "--bg --out output --err error sh", "echo 'hi'\necho 'hello' >&2"
1046 1050 )
1047 1051 assert (await ip.user_ns["output"].read()) == b"hi\n"
1048 1052 assert (await ip.user_ns["error"].read()) == b"hello\n"
1049 1053 ip.user_ns["output"].close()
1050 1054 ip.user_ns["error"].close()
1051 1055
1052 1056
1053 1057 def test_script_defaults():
1054 1058 ip = get_ipython()
1055 1059 for cmd in ['sh', 'bash', 'perl', 'ruby']:
1056 1060 try:
1057 1061 find_cmd(cmd)
1058 1062 except Exception:
1059 1063 pass
1060 1064 else:
1061 1065 assert cmd in ip.magics_manager.magics["cell"]
1062 1066
1063 1067
1064 1068 @magics_class
1065 1069 class FooFoo(Magics):
1066 1070 """class with both %foo and %%foo magics"""
1067 1071 @line_magic('foo')
1068 1072 def line_foo(self, line):
1069 1073 "I am line foo"
1070 1074 pass
1071 1075
1072 1076 @cell_magic("foo")
1073 1077 def cell_foo(self, line, cell):
1074 1078 "I am cell foo, not line foo"
1075 1079 pass
1076 1080
1077 1081 def test_line_cell_info():
1078 1082 """%%foo and %foo magics are distinguishable to inspect"""
1079 1083 ip = get_ipython()
1080 1084 ip.magics_manager.register(FooFoo)
1081 1085 oinfo = ip.object_inspect("foo")
1082 1086 assert oinfo["found"] is True
1083 1087 assert oinfo["ismagic"] is True
1084 1088
1085 1089 oinfo = ip.object_inspect("%%foo")
1086 1090 assert oinfo["found"] is True
1087 1091 assert oinfo["ismagic"] is True
1088 1092 assert oinfo["docstring"] == FooFoo.cell_foo.__doc__
1089 1093
1090 1094 oinfo = ip.object_inspect("%foo")
1091 1095 assert oinfo["found"] is True
1092 1096 assert oinfo["ismagic"] is True
1093 1097 assert oinfo["docstring"] == FooFoo.line_foo.__doc__
1094 1098
1095 1099
1096 1100 def test_multiple_magics():
1097 1101 ip = get_ipython()
1098 1102 foo1 = FooFoo(ip)
1099 1103 foo2 = FooFoo(ip)
1100 1104 mm = ip.magics_manager
1101 1105 mm.register(foo1)
1102 1106 assert mm.magics["line"]["foo"].__self__ is foo1
1103 1107 mm.register(foo2)
1104 1108 assert mm.magics["line"]["foo"].__self__ is foo2
1105 1109
1106 1110
1107 1111 def test_alias_magic():
1108 1112 """Test %alias_magic."""
1109 1113 ip = get_ipython()
1110 1114 mm = ip.magics_manager
1111 1115
1112 1116 # Basic operation: both cell and line magics are created, if possible.
1113 1117 ip.run_line_magic("alias_magic", "timeit_alias timeit")
1114 1118 assert "timeit_alias" in mm.magics["line"]
1115 1119 assert "timeit_alias" in mm.magics["cell"]
1116 1120
1117 1121 # --cell is specified, line magic not created.
1118 1122 ip.run_line_magic("alias_magic", "--cell timeit_cell_alias timeit")
1119 1123 assert "timeit_cell_alias" not in mm.magics["line"]
1120 1124 assert "timeit_cell_alias" in mm.magics["cell"]
1121 1125
1122 1126 # Test that line alias is created successfully.
1123 1127 ip.run_line_magic("alias_magic", "--line env_alias env")
1124 1128 assert ip.run_line_magic("env", "") == ip.run_line_magic("env_alias", "")
1125 1129
1126 1130 # Test that line alias with parameters passed in is created successfully.
1127 1131 ip.run_line_magic(
1128 1132 "alias_magic", "--line history_alias history --params " + shlex.quote("3")
1129 1133 )
1130 1134 assert "history_alias" in mm.magics["line"]
1131 1135
1132 1136
1133 1137 def test_save():
1134 1138 """Test %save."""
1135 1139 ip = get_ipython()
1136 1140 ip.history_manager.reset() # Clear any existing history.
1137 1141 cmds = ["a=1", "def b():\n return a**2", "print(a, b())"]
1138 1142 for i, cmd in enumerate(cmds, start=1):
1139 1143 ip.history_manager.store_inputs(i, cmd)
1140 1144 with TemporaryDirectory() as tmpdir:
1141 1145 file = os.path.join(tmpdir, "testsave.py")
1142 1146 ip.run_line_magic("save", "%s 1-10" % file)
1143 1147 content = Path(file).read_text()
1144 1148 assert content.count(cmds[0]) == 1
1145 1149 assert "coding: utf-8" in content
1146 1150 ip.run_line_magic("save", "-a %s 1-10" % file)
1147 1151 content = Path(file).read_text()
1148 1152 assert content.count(cmds[0]) == 2
1149 1153 assert "coding: utf-8" in content
1150 1154
1151 1155
1152 1156 def test_save_with_no_args():
1153 1157 ip = get_ipython()
1154 1158 ip.history_manager.reset() # Clear any existing history.
1155 1159 cmds = ["a=1", "def b():\n return a**2", "print(a, b())", "%save"]
1156 1160 for i, cmd in enumerate(cmds, start=1):
1157 1161 ip.history_manager.store_inputs(i, cmd)
1158 1162
1159 1163 with TemporaryDirectory() as tmpdir:
1160 1164 path = os.path.join(tmpdir, "testsave.py")
1161 1165 ip.run_line_magic("save", path)
1162 1166 content = Path(path).read_text()
1163 1167 expected_content = dedent(
1164 1168 """\
1165 1169 # coding: utf-8
1166 1170 a=1
1167 1171 def b():
1168 1172 return a**2
1169 1173 print(a, b())
1170 1174 """
1171 1175 )
1172 1176 assert content == expected_content
1173 1177
1174 1178
1175 1179 def test_store():
1176 1180 """Test %store."""
1177 1181 ip = get_ipython()
1178 1182 ip.run_line_magic('load_ext', 'storemagic')
1179 1183
1180 1184 # make sure the storage is empty
1181 1185 ip.run_line_magic("store", "-z")
1182 1186 ip.user_ns["var"] = 42
1183 1187 ip.run_line_magic("store", "var")
1184 1188 ip.user_ns["var"] = 39
1185 1189 ip.run_line_magic("store", "-r")
1186 1190 assert ip.user_ns["var"] == 42
1187 1191
1188 1192 ip.run_line_magic("store", "-d var")
1189 1193 ip.user_ns["var"] = 39
1190 1194 ip.run_line_magic("store", "-r")
1191 1195 assert ip.user_ns["var"] == 39
1192 1196
1193 1197
1194 1198 def _run_edit_test(arg_s, exp_filename=None,
1195 1199 exp_lineno=-1,
1196 1200 exp_contents=None,
1197 1201 exp_is_temp=None):
1198 1202 ip = get_ipython()
1199 1203 M = code.CodeMagics(ip)
1200 1204 last_call = ['','']
1201 1205 opts,args = M.parse_options(arg_s,'prxn:')
1202 1206 filename, lineno, is_temp = M._find_edit_target(ip, args, opts, last_call)
1203 1207
1204 1208 if exp_filename is not None:
1205 1209 assert exp_filename == filename
1206 1210 if exp_contents is not None:
1207 1211 with io.open(filename, 'r', encoding='utf-8') as f:
1208 1212 contents = f.read()
1209 1213 assert exp_contents == contents
1210 1214 if exp_lineno != -1:
1211 1215 assert exp_lineno == lineno
1212 1216 if exp_is_temp is not None:
1213 1217 assert exp_is_temp == is_temp
1214 1218
1215 1219
1216 1220 def test_edit_interactive():
1217 1221 """%edit on interactively defined objects"""
1218 1222 ip = get_ipython()
1219 1223 n = ip.execution_count
1220 1224 ip.run_cell("def foo(): return 1", store_history=True)
1221 1225
1222 1226 with pytest.raises(code.InteractivelyDefined) as e:
1223 1227 _run_edit_test("foo")
1224 1228 assert e.value.index == n
1225 1229
1226 1230
1227 1231 def test_edit_cell():
1228 1232 """%edit [cell id]"""
1229 1233 ip = get_ipython()
1230 1234
1231 1235 ip.run_cell("def foo(): return 1", store_history=True)
1232 1236
1233 1237 # test
1234 1238 _run_edit_test("1", exp_contents=ip.user_ns['In'][1], exp_is_temp=True)
1235 1239
1236 1240 def test_edit_fname():
1237 1241 """%edit file"""
1238 1242 # test
1239 1243 _run_edit_test("test file.py", exp_filename="test file.py")
1240 1244
1241 1245 def test_bookmark():
1242 1246 ip = get_ipython()
1243 1247 ip.run_line_magic('bookmark', 'bmname')
1244 1248 with tt.AssertPrints('bmname'):
1245 1249 ip.run_line_magic('bookmark', '-l')
1246 1250 ip.run_line_magic('bookmark', '-d bmname')
1247 1251
1248 1252 def test_ls_magic():
1249 1253 ip = get_ipython()
1250 1254 json_formatter = ip.display_formatter.formatters['application/json']
1251 1255 json_formatter.enabled = True
1252 1256 lsmagic = ip.magic('lsmagic')
1253 1257 with warnings.catch_warnings(record=True) as w:
1254 1258 j = json_formatter(lsmagic)
1255 1259 assert sorted(j) == ["cell", "line"]
1256 1260 assert w == [] # no warnings
1257 1261
1258 1262
1259 1263 def test_strip_initial_indent():
1260 1264 def sii(s):
1261 1265 lines = s.splitlines()
1262 1266 return '\n'.join(code.strip_initial_indent(lines))
1263 1267
1264 1268 assert sii(" a = 1\nb = 2") == "a = 1\nb = 2"
1265 1269 assert sii(" a\n b\nc") == "a\n b\nc"
1266 1270 assert sii("a\n b") == "a\n b"
1267 1271
1268 1272 def test_logging_magic_quiet_from_arg():
1269 1273 _ip.config.LoggingMagics.quiet = False
1270 1274 lm = logging.LoggingMagics(shell=_ip)
1271 1275 with TemporaryDirectory() as td:
1272 1276 try:
1273 1277 with tt.AssertNotPrints(re.compile("Activating.*")):
1274 1278 lm.logstart('-q {}'.format(
1275 1279 os.path.join(td, "quiet_from_arg.log")))
1276 1280 finally:
1277 1281 _ip.logger.logstop()
1278 1282
1279 1283 def test_logging_magic_quiet_from_config():
1280 1284 _ip.config.LoggingMagics.quiet = True
1281 1285 lm = logging.LoggingMagics(shell=_ip)
1282 1286 with TemporaryDirectory() as td:
1283 1287 try:
1284 1288 with tt.AssertNotPrints(re.compile("Activating.*")):
1285 1289 lm.logstart(os.path.join(td, "quiet_from_config.log"))
1286 1290 finally:
1287 1291 _ip.logger.logstop()
1288 1292
1289 1293
1290 1294 def test_logging_magic_not_quiet():
1291 1295 _ip.config.LoggingMagics.quiet = False
1292 1296 lm = logging.LoggingMagics(shell=_ip)
1293 1297 with TemporaryDirectory() as td:
1294 1298 try:
1295 1299 with tt.AssertPrints(re.compile("Activating.*")):
1296 1300 lm.logstart(os.path.join(td, "not_quiet.log"))
1297 1301 finally:
1298 1302 _ip.logger.logstop()
1299 1303
1300 1304
1301 1305 def test_time_no_var_expand():
1302 1306 _ip.user_ns['a'] = 5
1303 1307 _ip.user_ns['b'] = []
1304 1308 _ip.magic('time b.append("{a}")')
1305 1309 assert _ip.user_ns['b'] == ['{a}']
1306 1310
1307 1311
1308 1312 # this is slow, put at the end for local testing.
1309 1313 def test_timeit_arguments():
1310 1314 "Test valid timeit arguments, should not cause SyntaxError (GH #1269)"
1311 1315 _ip.magic("timeit -n1 -r1 a=('#')")
1312 1316
1313 1317
1314 1318 TEST_MODULE = """
1315 1319 print('Loaded my_tmp')
1316 1320 if __name__ == "__main__":
1317 1321 print('I just ran a script')
1318 1322 """
1319 1323
1320 1324
1321 1325 def test_run_module_from_import_hook():
1322 1326 "Test that a module can be loaded via an import hook"
1323 1327 with TemporaryDirectory() as tmpdir:
1324 1328 fullpath = os.path.join(tmpdir, 'my_tmp.py')
1325 1329 Path(fullpath).write_text(TEST_MODULE)
1326 1330
1327 1331 import importlib.abc
1328 1332 import importlib.util
1329 1333
1330 1334 class MyTempImporter(importlib.abc.MetaPathFinder, importlib.abc.SourceLoader):
1331 1335 def find_spec(self, fullname, path, target=None):
1332 1336 if fullname == "my_tmp":
1333 1337 return importlib.util.spec_from_loader(fullname, self)
1334 1338
1335 1339 def get_filename(self, fullname):
1336 1340 if fullname != "my_tmp":
1337 1341 raise ImportError(f"unexpected module name '{fullname}'")
1338 1342 return fullpath
1339 1343
1340 1344 def get_data(self, path):
1341 1345 if not Path(path).samefile(fullpath):
1342 1346 raise OSError(f"expected path '{fullpath}', got '{path}'")
1343 1347 return Path(fullpath).read_text()
1344 1348
1345 1349 sys.meta_path.insert(0, MyTempImporter())
1346 1350
1347 1351 with capture_output() as captured:
1348 1352 _ip.magic("run -m my_tmp")
1349 1353 _ip.run_cell("import my_tmp")
1350 1354
1351 1355 output = "Loaded my_tmp\nI just ran a script\nLoaded my_tmp\n"
1352 1356 assert output == captured.stdout
1353 1357
1354 1358 sys.meta_path.pop(0)
@@ -1,695 +1,698 b''
1 1 """IPython terminal interface using prompt_toolkit"""
2 2
3 3 import asyncio
4 4 import os
5 5 import sys
6 6 import warnings
7 7 from warnings import warn
8 8
9 9 from IPython.core.interactiveshell import InteractiveShell, InteractiveShellABC
10 10 from IPython.utils import io
11 11 from IPython.utils.py3compat import input
12 12 from IPython.utils.terminal import toggle_set_term_title, set_term_title, restore_term_title
13 13 from IPython.utils.process import abbrev_cwd
14 14 from traitlets import (
15 15 Bool,
16 16 Unicode,
17 17 Dict,
18 18 Integer,
19 19 observe,
20 20 Instance,
21 21 Type,
22 22 default,
23 23 Enum,
24 24 Union,
25 25 Any,
26 26 validate,
27 27 Float,
28 28 )
29 29
30 30 from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
31 31 from prompt_toolkit.enums import DEFAULT_BUFFER, EditingMode
32 32 from prompt_toolkit.filters import (HasFocus, Condition, IsDone)
33 33 from prompt_toolkit.formatted_text import PygmentsTokens
34 34 from prompt_toolkit.history import InMemoryHistory
35 35 from prompt_toolkit.layout.processors import ConditionalProcessor, HighlightMatchingBracketProcessor
36 36 from prompt_toolkit.output import ColorDepth
37 37 from prompt_toolkit.patch_stdout import patch_stdout
38 38 from prompt_toolkit.shortcuts import PromptSession, CompleteStyle, print_formatted_text
39 39 from prompt_toolkit.styles import DynamicStyle, merge_styles
40 40 from prompt_toolkit.styles.pygments import style_from_pygments_cls, style_from_pygments_dict
41 41 from prompt_toolkit import __version__ as ptk_version
42 42
43 43 from pygments.styles import get_style_by_name
44 44 from pygments.style import Style
45 45 from pygments.token import Token
46 46
47 47 from .debugger import TerminalPdb, Pdb
48 48 from .magics import TerminalMagics
49 49 from .pt_inputhooks import get_inputhook_name_and_func
50 50 from .prompts import Prompts, ClassicPrompts, RichPromptDisplayHook
51 51 from .ptutils import IPythonPTCompleter, IPythonPTLexer
52 52 from .shortcuts import create_ipython_shortcuts
53 53
54 54 DISPLAY_BANNER_DEPRECATED = object()
55 55 PTK3 = ptk_version.startswith('3.')
56 56
57 57
58 58 class _NoStyle(Style): pass
59 59
60 60
61 61
62 62 _style_overrides_light_bg = {
63 63 Token.Prompt: '#ansibrightblue',
64 64 Token.PromptNum: '#ansiblue bold',
65 65 Token.OutPrompt: '#ansibrightred',
66 66 Token.OutPromptNum: '#ansired bold',
67 67 }
68 68
69 69 _style_overrides_linux = {
70 70 Token.Prompt: '#ansibrightgreen',
71 71 Token.PromptNum: '#ansigreen bold',
72 72 Token.OutPrompt: '#ansibrightred',
73 73 Token.OutPromptNum: '#ansired bold',
74 74 }
75 75
76 76 def get_default_editor():
77 77 try:
78 78 return os.environ['EDITOR']
79 79 except KeyError:
80 80 pass
81 81 except UnicodeError:
82 82 warn("$EDITOR environment variable is not pure ASCII. Using platform "
83 83 "default editor.")
84 84
85 85 if os.name == 'posix':
86 86 return 'vi' # the only one guaranteed to be there!
87 87 else:
88 88 return 'notepad' # same in Windows!
89 89
90 90 # conservatively check for tty
91 91 # overridden streams can result in things like:
92 92 # - sys.stdin = None
93 93 # - no isatty method
94 94 for _name in ('stdin', 'stdout', 'stderr'):
95 95 _stream = getattr(sys, _name)
96 96 if not _stream or not hasattr(_stream, 'isatty') or not _stream.isatty():
97 97 _is_tty = False
98 98 break
99 99 else:
100 100 _is_tty = True
101 101
102 102
103 103 _use_simple_prompt = ('IPY_TEST_SIMPLE_PROMPT' in os.environ) or (not _is_tty)
104 104
105 105 def black_reformat_handler(text_before_cursor):
106 106 import black
107 107 formatted_text = black.format_str(text_before_cursor, mode=black.FileMode())
108 108 if not text_before_cursor.endswith('\n') and formatted_text.endswith('\n'):
109 109 formatted_text = formatted_text[:-1]
110 110 return formatted_text
111 111
112 112
113 113 class TerminalInteractiveShell(InteractiveShell):
114 114 mime_renderers = Dict().tag(config=True)
115 115
116 116 space_for_menu = Integer(6, help='Number of line at the bottom of the screen '
117 117 'to reserve for the tab completion menu, '
118 118 'search history, ...etc, the height of '
119 119 'these menus will at most this value. '
120 120 'Increase it is you prefer long and skinny '
121 121 'menus, decrease for short and wide.'
122 122 ).tag(config=True)
123 123
124 124 pt_app = None
125 125 debugger_history = None
126 126
127 127 debugger_history_file = Unicode(
128 128 "~/.pdbhistory", help="File in which to store and read history"
129 129 ).tag(config=True)
130 130
131 131 simple_prompt = Bool(_use_simple_prompt,
132 132 help="""Use `raw_input` for the REPL, without completion and prompt colors.
133 133
134 134 Useful when controlling IPython as a subprocess, and piping STDIN/OUT/ERR. Known usage are:
135 135 IPython own testing machinery, and emacs inferior-shell integration through elpy.
136 136
137 137 This mode default to `True` if the `IPY_TEST_SIMPLE_PROMPT`
138 138 environment variable is set, or the current terminal is not a tty."""
139 139 ).tag(config=True)
140 140
141 141 @property
142 142 def debugger_cls(self):
143 143 return Pdb if self.simple_prompt else TerminalPdb
144 144
145 145 confirm_exit = Bool(True,
146 146 help="""
147 147 Set to confirm when you try to exit IPython with an EOF (Control-D
148 148 in Unix, Control-Z/Enter in Windows). By typing 'exit' or 'quit',
149 149 you can force a direct exit without any confirmation.""",
150 150 ).tag(config=True)
151 151
152 152 editing_mode = Unicode('emacs',
153 153 help="Shortcut style to use at the prompt. 'vi' or 'emacs'.",
154 154 ).tag(config=True)
155 155
156 156 emacs_bindings_in_vi_insert_mode = Bool(
157 157 True,
158 158 help="Add shortcuts from 'emacs' insert mode to 'vi' insert mode.",
159 159 ).tag(config=True)
160 160
161 161 modal_cursor = Bool(
162 162 True,
163 163 help="""
164 164 Cursor shape changes depending on vi mode: beam in vi insert mode,
165 165 block in nav mode, underscore in replace mode.""",
166 166 ).tag(config=True)
167 167
168 168 ttimeoutlen = Float(
169 169 0.01,
170 170 help="""The time in milliseconds that is waited for a key code
171 171 to complete.""",
172 172 ).tag(config=True)
173 173
174 174 timeoutlen = Float(
175 175 0.5,
176 176 help="""The time in milliseconds that is waited for a mapped key
177 177 sequence to complete.""",
178 178 ).tag(config=True)
179 179
180 180 autoformatter = Unicode(None,
181 181 help="Autoformatter to reformat Terminal code. Can be `'black'` or `None`",
182 182 allow_none=True
183 183 ).tag(config=True)
184 184
185 185 mouse_support = Bool(False,
186 186 help="Enable mouse support in the prompt\n(Note: prevents selecting text with the mouse)"
187 187 ).tag(config=True)
188 188
189 189 # We don't load the list of styles for the help string, because loading
190 190 # Pygments plugins takes time and can cause unexpected errors.
191 191 highlighting_style = Union([Unicode('legacy'), Type(klass=Style)],
192 192 help="""The name or class of a Pygments style to use for syntax
193 193 highlighting. To see available styles, run `pygmentize -L styles`."""
194 194 ).tag(config=True)
195 195
196 196 @validate('editing_mode')
197 197 def _validate_editing_mode(self, proposal):
198 198 if proposal['value'].lower() == 'vim':
199 199 proposal['value']= 'vi'
200 200 elif proposal['value'].lower() == 'default':
201 201 proposal['value']= 'emacs'
202 202
203 203 if hasattr(EditingMode, proposal['value'].upper()):
204 204 return proposal['value'].lower()
205 205
206 206 return self.editing_mode
207 207
208 208
209 209 @observe('editing_mode')
210 210 def _editing_mode(self, change):
211 211 if self.pt_app:
212 212 self.pt_app.editing_mode = getattr(EditingMode, change.new.upper())
213 213
214 214 @observe('autoformatter')
215 215 def _autoformatter_changed(self, change):
216 216 formatter = change.new
217 217 if formatter is None:
218 218 self.reformat_handler = lambda x:x
219 219 elif formatter == 'black':
220 220 self.reformat_handler = black_reformat_handler
221 221 else:
222 222 raise ValueError
223 223
224 224 @observe('highlighting_style')
225 225 @observe('colors')
226 226 def _highlighting_style_changed(self, change):
227 227 self.refresh_style()
228 228
229 229 def refresh_style(self):
230 230 self._style = self._make_style_from_name_or_cls(self.highlighting_style)
231 231
232 232
233 233 highlighting_style_overrides = Dict(
234 234 help="Override highlighting format for specific tokens"
235 235 ).tag(config=True)
236 236
237 237 true_color = Bool(False,
238 238 help="""Use 24bit colors instead of 256 colors in prompt highlighting.
239 239 If your terminal supports true color, the following command should
240 240 print ``TRUECOLOR`` in orange::
241 241
242 242 printf \"\\x1b[38;2;255;100;0mTRUECOLOR\\x1b[0m\\n\"
243 243 """,
244 244 ).tag(config=True)
245 245
246 246 editor = Unicode(get_default_editor(),
247 247 help="Set the editor used by IPython (default to $EDITOR/vi/notepad)."
248 248 ).tag(config=True)
249 249
250 250 prompts_class = Type(Prompts, help='Class used to generate Prompt token for prompt_toolkit').tag(config=True)
251 251
252 252 prompts = Instance(Prompts)
253 253
254 254 @default('prompts')
255 255 def _prompts_default(self):
256 256 return self.prompts_class(self)
257 257
258 258 # @observe('prompts')
259 259 # def _(self, change):
260 260 # self._update_layout()
261 261
262 262 @default('displayhook_class')
263 263 def _displayhook_class_default(self):
264 264 return RichPromptDisplayHook
265 265
266 266 term_title = Bool(True,
267 267 help="Automatically set the terminal title"
268 268 ).tag(config=True)
269 269
270 270 term_title_format = Unicode("IPython: {cwd}",
271 271 help="Customize the terminal title format. This is a python format string. " +
272 272 "Available substitutions are: {cwd}."
273 273 ).tag(config=True)
274 274
275 275 display_completions = Enum(('column', 'multicolumn','readlinelike'),
276 276 help= ( "Options for displaying tab completions, 'column', 'multicolumn', and "
277 277 "'readlinelike'. These options are for `prompt_toolkit`, see "
278 278 "`prompt_toolkit` documentation for more information."
279 279 ),
280 280 default_value='multicolumn').tag(config=True)
281 281
282 282 highlight_matching_brackets = Bool(True,
283 283 help="Highlight matching brackets.",
284 284 ).tag(config=True)
285 285
286 286 extra_open_editor_shortcuts = Bool(False,
287 287 help="Enable vi (v) or Emacs (C-X C-E) shortcuts to open an external editor. "
288 288 "This is in addition to the F2 binding, which is always enabled."
289 289 ).tag(config=True)
290 290
291 291 handle_return = Any(None,
292 292 help="Provide an alternative handler to be called when the user presses "
293 293 "Return. This is an advanced option intended for debugging, which "
294 294 "may be changed or removed in later releases."
295 295 ).tag(config=True)
296 296
297 297 enable_history_search = Bool(True,
298 298 help="Allows to enable/disable the prompt toolkit history search"
299 299 ).tag(config=True)
300 300
301 301 prompt_includes_vi_mode = Bool(True,
302 302 help="Display the current vi mode (when using vi editing mode)."
303 303 ).tag(config=True)
304 304
305 305 @observe('term_title')
306 306 def init_term_title(self, change=None):
307 307 # Enable or disable the terminal title.
308 308 if self.term_title:
309 309 toggle_set_term_title(True)
310 310 set_term_title(self.term_title_format.format(cwd=abbrev_cwd()))
311 311 else:
312 312 toggle_set_term_title(False)
313 313
314 314 def restore_term_title(self):
315 315 if self.term_title:
316 316 restore_term_title()
317 317
318 318 def init_display_formatter(self):
319 319 super(TerminalInteractiveShell, self).init_display_formatter()
320 320 # terminal only supports plain text
321 321 self.display_formatter.active_types = ['text/plain']
322 322 # disable `_ipython_display_`
323 323 self.display_formatter.ipython_display_formatter.enabled = False
324 324
325 325 def init_prompt_toolkit_cli(self):
326 326 if self.simple_prompt:
327 327 # Fall back to plain non-interactive output for tests.
328 328 # This is very limited.
329 329 def prompt():
330 330 prompt_text = "".join(x[1] for x in self.prompts.in_prompt_tokens())
331 331 lines = [input(prompt_text)]
332 332 prompt_continuation = "".join(x[1] for x in self.prompts.continuation_prompt_tokens())
333 333 while self.check_complete('\n'.join(lines))[0] == 'incomplete':
334 334 lines.append( input(prompt_continuation) )
335 335 return '\n'.join(lines)
336 336 self.prompt_for_code = prompt
337 337 return
338 338
339 339 # Set up keyboard shortcuts
340 340 key_bindings = create_ipython_shortcuts(self)
341 341
342 342 # Pre-populate history from IPython's history database
343 343 history = InMemoryHistory()
344 344 last_cell = u""
345 345 for __, ___, cell in self.history_manager.get_tail(self.history_load_length,
346 346 include_latest=True):
347 347 # Ignore blank lines and consecutive duplicates
348 348 cell = cell.rstrip()
349 349 if cell and (cell != last_cell):
350 350 history.append_string(cell)
351 351 last_cell = cell
352 352
353 353 self._style = self._make_style_from_name_or_cls(self.highlighting_style)
354 354 self.style = DynamicStyle(lambda: self._style)
355 355
356 356 editing_mode = getattr(EditingMode, self.editing_mode.upper())
357 357
358 358 self.pt_loop = asyncio.new_event_loop()
359 359 self.pt_app = PromptSession(
360 360 auto_suggest=AutoSuggestFromHistory(),
361 361 editing_mode=editing_mode,
362 362 key_bindings=key_bindings,
363 363 history=history,
364 364 completer=IPythonPTCompleter(shell=self),
365 365 enable_history_search=self.enable_history_search,
366 366 style=self.style,
367 367 include_default_pygments_style=False,
368 368 mouse_support=self.mouse_support,
369 369 enable_open_in_editor=self.extra_open_editor_shortcuts,
370 370 color_depth=self.color_depth,
371 371 tempfile_suffix=".py",
372 372 **self._extra_prompt_options()
373 373 )
374 374
375 375 def _make_style_from_name_or_cls(self, name_or_cls):
376 376 """
377 377 Small wrapper that make an IPython compatible style from a style name
378 378
379 379 We need that to add style for prompt ... etc.
380 380 """
381 381 style_overrides = {}
382 382 if name_or_cls == 'legacy':
383 383 legacy = self.colors.lower()
384 384 if legacy == 'linux':
385 385 style_cls = get_style_by_name('monokai')
386 386 style_overrides = _style_overrides_linux
387 387 elif legacy == 'lightbg':
388 388 style_overrides = _style_overrides_light_bg
389 389 style_cls = get_style_by_name('pastie')
390 390 elif legacy == 'neutral':
391 391 # The default theme needs to be visible on both a dark background
392 392 # and a light background, because we can't tell what the terminal
393 393 # looks like. These tweaks to the default theme help with that.
394 394 style_cls = get_style_by_name('default')
395 395 style_overrides.update({
396 396 Token.Number: '#ansigreen',
397 397 Token.Operator: 'noinherit',
398 398 Token.String: '#ansiyellow',
399 399 Token.Name.Function: '#ansiblue',
400 400 Token.Name.Class: 'bold #ansiblue',
401 401 Token.Name.Namespace: 'bold #ansiblue',
402 402 Token.Name.Variable.Magic: '#ansiblue',
403 403 Token.Prompt: '#ansigreen',
404 404 Token.PromptNum: '#ansibrightgreen bold',
405 405 Token.OutPrompt: '#ansired',
406 406 Token.OutPromptNum: '#ansibrightred bold',
407 407 })
408 408
409 409 # Hack: Due to limited color support on the Windows console
410 410 # the prompt colors will be wrong without this
411 411 if os.name == 'nt':
412 412 style_overrides.update({
413 413 Token.Prompt: '#ansidarkgreen',
414 414 Token.PromptNum: '#ansigreen bold',
415 415 Token.OutPrompt: '#ansidarkred',
416 416 Token.OutPromptNum: '#ansired bold',
417 417 })
418 418 elif legacy =='nocolor':
419 419 style_cls=_NoStyle
420 420 style_overrides = {}
421 421 else :
422 422 raise ValueError('Got unknown colors: ', legacy)
423 423 else :
424 424 if isinstance(name_or_cls, str):
425 425 style_cls = get_style_by_name(name_or_cls)
426 426 else:
427 427 style_cls = name_or_cls
428 428 style_overrides = {
429 429 Token.Prompt: '#ansigreen',
430 430 Token.PromptNum: '#ansibrightgreen bold',
431 431 Token.OutPrompt: '#ansired',
432 432 Token.OutPromptNum: '#ansibrightred bold',
433 433 }
434 434 style_overrides.update(self.highlighting_style_overrides)
435 435 style = merge_styles([
436 436 style_from_pygments_cls(style_cls),
437 437 style_from_pygments_dict(style_overrides),
438 438 ])
439 439
440 440 return style
441 441
442 442 @property
443 443 def pt_complete_style(self):
444 444 return {
445 445 'multicolumn': CompleteStyle.MULTI_COLUMN,
446 446 'column': CompleteStyle.COLUMN,
447 447 'readlinelike': CompleteStyle.READLINE_LIKE,
448 448 }[self.display_completions]
449 449
450 450 @property
451 451 def color_depth(self):
452 452 return (ColorDepth.TRUE_COLOR if self.true_color else None)
453 453
454 454 def _extra_prompt_options(self):
455 455 """
456 456 Return the current layout option for the current Terminal InteractiveShell
457 457 """
458 458 def get_message():
459 459 return PygmentsTokens(self.prompts.in_prompt_tokens())
460 460
461 461 if self.editing_mode == 'emacs':
462 462 # with emacs mode the prompt is (usually) static, so we call only
463 463 # the function once. With VI mode it can toggle between [ins] and
464 464 # [nor] so we can't precompute.
465 465 # here I'm going to favor the default keybinding which almost
466 466 # everybody uses to decrease CPU usage.
467 467 # if we have issues with users with custom Prompts we can see how to
468 468 # work around this.
469 469 get_message = get_message()
470 470
471 471 options = {
472 472 'complete_in_thread': False,
473 473 'lexer':IPythonPTLexer(),
474 474 'reserve_space_for_menu':self.space_for_menu,
475 475 'message': get_message,
476 476 'prompt_continuation': (
477 477 lambda width, lineno, is_soft_wrap:
478 478 PygmentsTokens(self.prompts.continuation_prompt_tokens(width))),
479 479 'multiline': True,
480 480 'complete_style': self.pt_complete_style,
481 481
482 482 # Highlight matching brackets, but only when this setting is
483 483 # enabled, and only when the DEFAULT_BUFFER has the focus.
484 484 'input_processors': [ConditionalProcessor(
485 485 processor=HighlightMatchingBracketProcessor(chars='[](){}'),
486 486 filter=HasFocus(DEFAULT_BUFFER) & ~IsDone() &
487 487 Condition(lambda: self.highlight_matching_brackets))],
488 488 }
489 489 if not PTK3:
490 490 options['inputhook'] = self.inputhook
491 491
492 492 return options
493 493
494 494 def prompt_for_code(self):
495 495 if self.rl_next_input:
496 496 default = self.rl_next_input
497 497 self.rl_next_input = None
498 498 else:
499 499 default = ''
500 500
501 501 # In order to make sure that asyncio code written in the
502 502 # interactive shell doesn't interfere with the prompt, we run the
503 503 # prompt in a different event loop.
504 504 # If we don't do this, people could spawn coroutine with a
505 505 # while/true inside which will freeze the prompt.
506 506
507 policy = asyncio.get_event_loop_policy()
507 508 try:
508 old_loop = asyncio.get_running_loop()
509 old_loop = policy.get_event_loop()
509 510 except RuntimeError:
510 # This happens when the user used `asyncio.run()`.
511 # This happens when the the event loop is closed,
512 # e.g. by calling `asyncio.run()`.
511 513 old_loop = None
512 514
513 asyncio.set_event_loop(self.pt_loop)
515 policy.set_event_loop(self.pt_loop)
514 516 try:
515 517 with patch_stdout(raw=True):
516 518 text = self.pt_app.prompt(
517 519 default=default,
518 520 **self._extra_prompt_options())
519 521 finally:
520 522 # Restore the original event loop.
521 asyncio.set_event_loop(old_loop)
523 if old_loop is not None:
524 policy.set_event_loop(old_loop)
522 525
523 526 return text
524 527
525 528 def enable_win_unicode_console(self):
526 529 # Since IPython 7.10 doesn't support python < 3.6 and PEP 528, Python uses the unicode APIs for the Windows
527 530 # console by default, so WUC shouldn't be needed.
528 531 from warnings import warn
529 532 warn("`enable_win_unicode_console` is deprecated since IPython 7.10, does not do anything and will be removed in the future",
530 533 DeprecationWarning,
531 534 stacklevel=2)
532 535
533 536 def init_io(self):
534 537 if sys.platform not in {'win32', 'cli'}:
535 538 return
536 539
537 540 import colorama
538 541 colorama.init()
539 542
540 543 # For some reason we make these wrappers around stdout/stderr.
541 544 # For now, we need to reset them so all output gets coloured.
542 545 # https://github.com/ipython/ipython/issues/8669
543 546 # io.std* are deprecated, but don't show our own deprecation warnings
544 547 # during initialization of the deprecated API.
545 548 with warnings.catch_warnings():
546 549 warnings.simplefilter('ignore', DeprecationWarning)
547 550 io.stdout = io.IOStream(sys.stdout)
548 551 io.stderr = io.IOStream(sys.stderr)
549 552
550 553 def init_magics(self):
551 554 super(TerminalInteractiveShell, self).init_magics()
552 555 self.register_magics(TerminalMagics)
553 556
554 557 def init_alias(self):
555 558 # The parent class defines aliases that can be safely used with any
556 559 # frontend.
557 560 super(TerminalInteractiveShell, self).init_alias()
558 561
559 562 # Now define aliases that only make sense on the terminal, because they
560 563 # need direct access to the console in a way that we can't emulate in
561 564 # GUI or web frontend
562 565 if os.name == 'posix':
563 566 for cmd in ('clear', 'more', 'less', 'man'):
564 567 self.alias_manager.soft_define_alias(cmd, cmd)
565 568
566 569
567 570 def __init__(self, *args, **kwargs):
568 571 super(TerminalInteractiveShell, self).__init__(*args, **kwargs)
569 572 self.init_prompt_toolkit_cli()
570 573 self.init_term_title()
571 574 self.keep_running = True
572 575
573 576
574 577 def ask_exit(self):
575 578 self.keep_running = False
576 579
577 580 rl_next_input = None
578 581
579 582 def interact(self, display_banner=DISPLAY_BANNER_DEPRECATED):
580 583
581 584 if display_banner is not DISPLAY_BANNER_DEPRECATED:
582 585 warn('interact `display_banner` argument is deprecated since IPython 5.0. Call `show_banner()` if needed.', DeprecationWarning, stacklevel=2)
583 586
584 587 self.keep_running = True
585 588 while self.keep_running:
586 589 print(self.separate_in, end='')
587 590
588 591 try:
589 592 code = self.prompt_for_code()
590 593 except EOFError:
591 594 if (not self.confirm_exit) \
592 595 or self.ask_yes_no('Do you really want to exit ([y]/n)?','y','n'):
593 596 self.ask_exit()
594 597
595 598 else:
596 599 if code:
597 600 self.run_cell(code, store_history=True)
598 601
599 602 def mainloop(self, display_banner=DISPLAY_BANNER_DEPRECATED):
600 603 # An extra layer of protection in case someone mashing Ctrl-C breaks
601 604 # out of our internal code.
602 605 if display_banner is not DISPLAY_BANNER_DEPRECATED:
603 606 warn('mainloop `display_banner` argument is deprecated since IPython 5.0. Call `show_banner()` if needed.', DeprecationWarning, stacklevel=2)
604 607 while True:
605 608 try:
606 609 self.interact()
607 610 break
608 611 except KeyboardInterrupt as e:
609 612 print("\n%s escaped interact()\n" % type(e).__name__)
610 613 finally:
611 614 # An interrupt during the eventloop will mess up the
612 615 # internal state of the prompt_toolkit library.
613 616 # Stopping the eventloop fixes this, see
614 617 # https://github.com/ipython/ipython/pull/9867
615 618 if hasattr(self, '_eventloop'):
616 619 self._eventloop.stop()
617 620
618 621 self.restore_term_title()
619 622
620 623 # try to call some at-exit operation optimistically as some things can't
621 624 # be done during interpreter shutdown. this is technically inaccurate as
622 625 # this make mainlool not re-callable, but that should be a rare if not
623 626 # in existent use case.
624 627
625 628 self._atexit_once()
626 629
627 630
628 631 _inputhook = None
629 632 def inputhook(self, context):
630 633 if self._inputhook is not None:
631 634 self._inputhook(context)
632 635
633 636 active_eventloop = None
634 637 def enable_gui(self, gui=None):
635 638 if gui and (gui != 'inline') :
636 639 self.active_eventloop, self._inputhook =\
637 640 get_inputhook_name_and_func(gui)
638 641 else:
639 642 self.active_eventloop = self._inputhook = None
640 643
641 644 # For prompt_toolkit 3.0. We have to create an asyncio event loop with
642 645 # this inputhook.
643 646 if PTK3:
644 647 import asyncio
645 648 from prompt_toolkit.eventloop import new_eventloop_with_inputhook
646 649
647 650 if gui == 'asyncio':
648 651 # When we integrate the asyncio event loop, run the UI in the
649 652 # same event loop as the rest of the code. don't use an actual
650 653 # input hook. (Asyncio is not made for nesting event loops.)
651 654 self.pt_loop = asyncio.get_event_loop_policy().get_event_loop()
652 655
653 656 elif self._inputhook:
654 657 # If an inputhook was set, create a new asyncio event loop with
655 658 # this inputhook for the prompt.
656 659 self.pt_loop = new_eventloop_with_inputhook(self._inputhook)
657 660 else:
658 661 # When there's no inputhook, run the prompt in a separate
659 662 # asyncio event loop.
660 663 self.pt_loop = asyncio.new_event_loop()
661 664
662 665 # Run !system commands directly, not through pipes, so terminal programs
663 666 # work correctly.
664 667 system = InteractiveShell.system_raw
665 668
666 669 def auto_rewrite_input(self, cmd):
667 670 """Overridden from the parent class to use fancy rewriting prompt"""
668 671 if not self.show_rewritten_input:
669 672 return
670 673
671 674 tokens = self.prompts.rewrite_prompt_tokens()
672 675 if self.pt_app:
673 676 print_formatted_text(PygmentsTokens(tokens), end='',
674 677 style=self.pt_app.app.style)
675 678 print(cmd)
676 679 else:
677 680 prompt = ''.join(s for t, s in tokens)
678 681 print(prompt, cmd, sep='')
679 682
680 683 _prompts_before = None
681 684 def switch_doctest_mode(self, mode):
682 685 """Switch prompts to classic for %doctest_mode"""
683 686 if mode:
684 687 self._prompts_before = self.prompts
685 688 self.prompts = ClassicPrompts(self)
686 689 elif self._prompts_before:
687 690 self.prompts = self._prompts_before
688 691 self._prompts_before = None
689 692 # self._update_layout()
690 693
691 694
692 695 InteractiveShellABC.register(TerminalInteractiveShell)
693 696
694 697 if __name__ == '__main__':
695 698 TerminalInteractiveShell.instance().interact()
General Comments 0
You need to be logged in to leave comments. Login now