##// END OF EJS Templates
MAINT: mock slowest test....
Matthias Bussonnier -
Show More
@@ -1,1112 +1,1129 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for the key interactiveshell module.
3 3
4 4 Historically the main classes in interactiveshell have been under-tested. This
5 5 module should grow as many single-method tests as possible to trap many of the
6 6 recurring bugs we seem to encounter with high-level interaction.
7 7 """
8 8
9 9 # Copyright (c) IPython Development Team.
10 10 # Distributed under the terms of the Modified BSD License.
11 11
12 12 import asyncio
13 13 import ast
14 14 import os
15 15 import signal
16 16 import shutil
17 17 import sys
18 18 import tempfile
19 19 import unittest
20 import pytest
20 21 from unittest import mock
21 22
22 23 from os.path import join
23 24
24 25 from IPython.core.error import InputRejected
25 26 from IPython.core.inputtransformer import InputTransformer
26 27 from IPython.core import interactiveshell
27 28 from IPython.testing.decorators import (
28 29 skipif, skip_win32, onlyif_unicode_paths, onlyif_cmds_exist,
29 30 )
30 31 from IPython.testing import tools as tt
31 32 from IPython.utils.process import find_cmd
32 33
33 34 #-----------------------------------------------------------------------------
34 35 # Globals
35 36 #-----------------------------------------------------------------------------
36 37 # This is used by every single test, no point repeating it ad nauseam
37 38
38 39 #-----------------------------------------------------------------------------
39 40 # Tests
40 41 #-----------------------------------------------------------------------------
41 42
42 43 class DerivedInterrupt(KeyboardInterrupt):
43 44 pass
44 45
45 46 class InteractiveShellTestCase(unittest.TestCase):
46 47 def test_naked_string_cells(self):
47 48 """Test that cells with only naked strings are fully executed"""
48 49 # First, single-line inputs
49 50 ip.run_cell('"a"\n')
50 51 self.assertEqual(ip.user_ns['_'], 'a')
51 52 # And also multi-line cells
52 53 ip.run_cell('"""a\nb"""\n')
53 54 self.assertEqual(ip.user_ns['_'], 'a\nb')
54 55
55 56 def test_run_empty_cell(self):
56 57 """Just make sure we don't get a horrible error with a blank
57 58 cell of input. Yes, I did overlook that."""
58 59 old_xc = ip.execution_count
59 60 res = ip.run_cell('')
60 61 self.assertEqual(ip.execution_count, old_xc)
61 62 self.assertEqual(res.execution_count, None)
62 63
63 64 def test_run_cell_multiline(self):
64 65 """Multi-block, multi-line cells must execute correctly.
65 66 """
66 67 src = '\n'.join(["x=1",
67 68 "y=2",
68 69 "if 1:",
69 70 " x += 1",
70 71 " y += 1",])
71 72 res = ip.run_cell(src)
72 73 self.assertEqual(ip.user_ns['x'], 2)
73 74 self.assertEqual(ip.user_ns['y'], 3)
74 75 self.assertEqual(res.success, True)
75 76 self.assertEqual(res.result, None)
76 77
77 78 def test_multiline_string_cells(self):
78 79 "Code sprinkled with multiline strings should execute (GH-306)"
79 80 ip.run_cell('tmp=0')
80 81 self.assertEqual(ip.user_ns['tmp'], 0)
81 82 res = ip.run_cell('tmp=1;"""a\nb"""\n')
82 83 self.assertEqual(ip.user_ns['tmp'], 1)
83 84 self.assertEqual(res.success, True)
84 85 self.assertEqual(res.result, "a\nb")
85 86
86 87 def test_dont_cache_with_semicolon(self):
87 88 "Ending a line with semicolon should not cache the returned object (GH-307)"
88 89 oldlen = len(ip.user_ns['Out'])
89 90 for cell in ['1;', '1;1;']:
90 91 res = ip.run_cell(cell, store_history=True)
91 92 newlen = len(ip.user_ns['Out'])
92 93 self.assertEqual(oldlen, newlen)
93 94 self.assertIsNone(res.result)
94 95 i = 0
95 96 #also test the default caching behavior
96 97 for cell in ['1', '1;1']:
97 98 ip.run_cell(cell, store_history=True)
98 99 newlen = len(ip.user_ns['Out'])
99 100 i += 1
100 101 self.assertEqual(oldlen+i, newlen)
101 102
102 103 def test_syntax_error(self):
103 104 res = ip.run_cell("raise = 3")
104 105 self.assertIsInstance(res.error_before_exec, SyntaxError)
105 106
106 107 def test_open_standard_input_stream(self):
107 108 res = ip.run_cell("open(0)")
108 109 self.assertIsInstance(res.error_in_exec, ValueError)
109 110
110 111 def test_open_standard_output_stream(self):
111 112 res = ip.run_cell("open(1)")
112 113 self.assertIsInstance(res.error_in_exec, ValueError)
113 114
114 115 def test_open_standard_error_stream(self):
115 116 res = ip.run_cell("open(2)")
116 117 self.assertIsInstance(res.error_in_exec, ValueError)
117 118
118 119 def test_In_variable(self):
119 120 "Verify that In variable grows with user input (GH-284)"
120 121 oldlen = len(ip.user_ns['In'])
121 122 ip.run_cell('1;', store_history=True)
122 123 newlen = len(ip.user_ns['In'])
123 124 self.assertEqual(oldlen+1, newlen)
124 125 self.assertEqual(ip.user_ns['In'][-1],'1;')
125 126
126 127 def test_magic_names_in_string(self):
127 128 ip.run_cell('a = """\n%exit\n"""')
128 129 self.assertEqual(ip.user_ns['a'], '\n%exit\n')
129 130
130 131 def test_trailing_newline(self):
131 132 """test that running !(command) does not raise a SyntaxError"""
132 133 ip.run_cell('!(true)\n', False)
133 134 ip.run_cell('!(true)\n\n\n', False)
134 135
135 136 def test_gh_597(self):
136 137 """Pretty-printing lists of objects with non-ascii reprs may cause
137 138 problems."""
138 139 class Spam(object):
139 140 def __repr__(self):
140 141 return "\xe9"*50
141 142 import IPython.core.formatters
142 143 f = IPython.core.formatters.PlainTextFormatter()
143 144 f([Spam(),Spam()])
144 145
145 146
146 147 def test_future_flags(self):
147 148 """Check that future flags are used for parsing code (gh-777)"""
148 149 ip.run_cell('from __future__ import barry_as_FLUFL')
149 150 try:
150 151 ip.run_cell('prfunc_return_val = 1 <> 2')
151 152 assert 'prfunc_return_val' in ip.user_ns
152 153 finally:
153 154 # Reset compiler flags so we don't mess up other tests.
154 155 ip.compile.reset_compiler_flags()
155 156
156 157 def test_can_pickle(self):
157 158 "Can we pickle objects defined interactively (GH-29)"
158 159 ip = get_ipython()
159 160 ip.reset()
160 161 ip.run_cell(("class Mylist(list):\n"
161 162 " def __init__(self,x=[]):\n"
162 163 " list.__init__(self,x)"))
163 164 ip.run_cell("w=Mylist([1,2,3])")
164 165
165 166 from pickle import dumps
166 167
167 168 # We need to swap in our main module - this is only necessary
168 169 # inside the test framework, because IPython puts the interactive module
169 170 # in place (but the test framework undoes this).
170 171 _main = sys.modules['__main__']
171 172 sys.modules['__main__'] = ip.user_module
172 173 try:
173 174 res = dumps(ip.user_ns["w"])
174 175 finally:
175 176 sys.modules['__main__'] = _main
176 177 self.assertTrue(isinstance(res, bytes))
177 178
178 179 def test_global_ns(self):
179 180 "Code in functions must be able to access variables outside them."
180 181 ip = get_ipython()
181 182 ip.run_cell("a = 10")
182 183 ip.run_cell(("def f(x):\n"
183 184 " return x + a"))
184 185 ip.run_cell("b = f(12)")
185 186 self.assertEqual(ip.user_ns["b"], 22)
186 187
187 188 def test_bad_custom_tb(self):
188 189 """Check that InteractiveShell is protected from bad custom exception handlers"""
189 190 ip.set_custom_exc((IOError,), lambda etype,value,tb: 1/0)
190 191 self.assertEqual(ip.custom_exceptions, (IOError,))
191 192 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
192 193 ip.run_cell(u'raise IOError("foo")')
193 194 self.assertEqual(ip.custom_exceptions, ())
194 195
195 196 def test_bad_custom_tb_return(self):
196 197 """Check that InteractiveShell is protected from bad return types in custom exception handlers"""
197 198 ip.set_custom_exc((NameError,),lambda etype,value,tb, tb_offset=None: 1)
198 199 self.assertEqual(ip.custom_exceptions, (NameError,))
199 200 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
200 201 ip.run_cell(u'a=abracadabra')
201 202 self.assertEqual(ip.custom_exceptions, ())
202 203
203 204 def test_drop_by_id(self):
204 205 myvars = {"a":object(), "b":object(), "c": object()}
205 206 ip.push(myvars, interactive=False)
206 207 for name in myvars:
207 208 assert name in ip.user_ns, name
208 209 assert name in ip.user_ns_hidden, name
209 210 ip.user_ns['b'] = 12
210 211 ip.drop_by_id(myvars)
211 212 for name in ["a", "c"]:
212 213 assert name not in ip.user_ns, name
213 214 assert name not in ip.user_ns_hidden, name
214 215 assert ip.user_ns['b'] == 12
215 216 ip.reset()
216 217
217 218 def test_var_expand(self):
218 219 ip.user_ns['f'] = u'Ca\xf1o'
219 220 self.assertEqual(ip.var_expand(u'echo $f'), u'echo Ca\xf1o')
220 221 self.assertEqual(ip.var_expand(u'echo {f}'), u'echo Ca\xf1o')
221 222 self.assertEqual(ip.var_expand(u'echo {f[:-1]}'), u'echo Ca\xf1')
222 223 self.assertEqual(ip.var_expand(u'echo {1*2}'), u'echo 2')
223 224
224 225 self.assertEqual(ip.var_expand(u"grep x | awk '{print $1}'"), u"grep x | awk '{print $1}'")
225 226
226 227 ip.user_ns['f'] = b'Ca\xc3\xb1o'
227 228 # This should not raise any exception:
228 229 ip.var_expand(u'echo $f')
229 230
230 231 def test_var_expand_local(self):
231 232 """Test local variable expansion in !system and %magic calls"""
232 233 # !system
233 234 ip.run_cell(
234 235 "def test():\n"
235 236 ' lvar = "ttt"\n'
236 237 " ret = !echo {lvar}\n"
237 238 " return ret[0]\n"
238 239 )
239 240 res = ip.user_ns["test"]()
240 241 self.assertIn("ttt", res)
241 242
242 243 # %magic
243 244 ip.run_cell(
244 245 "def makemacro():\n"
245 246 ' macroname = "macro_var_expand_locals"\n'
246 247 " %macro {macroname} codestr\n"
247 248 )
248 249 ip.user_ns["codestr"] = "str(12)"
249 250 ip.run_cell("makemacro()")
250 251 self.assertIn("macro_var_expand_locals", ip.user_ns)
251 252
252 253 def test_var_expand_self(self):
253 254 """Test variable expansion with the name 'self', which was failing.
254 255
255 256 See https://github.com/ipython/ipython/issues/1878#issuecomment-7698218
256 257 """
257 258 ip.run_cell(
258 259 "class cTest:\n"
259 260 ' classvar="see me"\n'
260 261 " def test(self):\n"
261 262 " res = !echo Variable: {self.classvar}\n"
262 263 " return res[0]\n"
263 264 )
264 265 self.assertIn("see me", ip.user_ns["cTest"]().test())
265 266
266 267 def test_bad_var_expand(self):
267 268 """var_expand on invalid formats shouldn't raise"""
268 269 # SyntaxError
269 270 self.assertEqual(ip.var_expand(u"{'a':5}"), u"{'a':5}")
270 271 # NameError
271 272 self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}")
272 273 # ZeroDivisionError
273 274 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
274 275
275 276 def test_silent_postexec(self):
276 277 """run_cell(silent=True) doesn't invoke pre/post_run_cell callbacks"""
277 278 pre_explicit = mock.Mock()
278 279 pre_always = mock.Mock()
279 280 post_explicit = mock.Mock()
280 281 post_always = mock.Mock()
281 282 all_mocks = [pre_explicit, pre_always, post_explicit, post_always]
282 283
283 284 ip.events.register('pre_run_cell', pre_explicit)
284 285 ip.events.register('pre_execute', pre_always)
285 286 ip.events.register('post_run_cell', post_explicit)
286 287 ip.events.register('post_execute', post_always)
287 288
288 289 try:
289 290 ip.run_cell("1", silent=True)
290 291 assert pre_always.called
291 292 assert not pre_explicit.called
292 293 assert post_always.called
293 294 assert not post_explicit.called
294 295 # double-check that non-silent exec did what we expected
295 296 # silent to avoid
296 297 ip.run_cell("1")
297 298 assert pre_explicit.called
298 299 assert post_explicit.called
299 300 info, = pre_explicit.call_args[0]
300 301 result, = post_explicit.call_args[0]
301 302 self.assertEqual(info, result.info)
302 303 # check that post hooks are always called
303 304 [m.reset_mock() for m in all_mocks]
304 305 ip.run_cell("syntax error")
305 306 assert pre_always.called
306 307 assert pre_explicit.called
307 308 assert post_always.called
308 309 assert post_explicit.called
309 310 info, = pre_explicit.call_args[0]
310 311 result, = post_explicit.call_args[0]
311 312 self.assertEqual(info, result.info)
312 313 finally:
313 314 # remove post-exec
314 315 ip.events.unregister('pre_run_cell', pre_explicit)
315 316 ip.events.unregister('pre_execute', pre_always)
316 317 ip.events.unregister('post_run_cell', post_explicit)
317 318 ip.events.unregister('post_execute', post_always)
318 319
319 320 def test_silent_noadvance(self):
320 321 """run_cell(silent=True) doesn't advance execution_count"""
321 322 ec = ip.execution_count
322 323 # silent should force store_history=False
323 324 ip.run_cell("1", store_history=True, silent=True)
324 325
325 326 self.assertEqual(ec, ip.execution_count)
326 327 # double-check that non-silent exec did what we expected
327 328 # silent to avoid
328 329 ip.run_cell("1", store_history=True)
329 330 self.assertEqual(ec+1, ip.execution_count)
330 331
331 332 def test_silent_nodisplayhook(self):
332 333 """run_cell(silent=True) doesn't trigger displayhook"""
333 334 d = dict(called=False)
334 335
335 336 trap = ip.display_trap
336 337 save_hook = trap.hook
337 338
338 339 def failing_hook(*args, **kwargs):
339 340 d['called'] = True
340 341
341 342 try:
342 343 trap.hook = failing_hook
343 344 res = ip.run_cell("1", silent=True)
344 345 self.assertFalse(d['called'])
345 346 self.assertIsNone(res.result)
346 347 # double-check that non-silent exec did what we expected
347 348 # silent to avoid
348 349 ip.run_cell("1")
349 350 self.assertTrue(d['called'])
350 351 finally:
351 352 trap.hook = save_hook
352 353
353 354 def test_ofind_line_magic(self):
354 355 from IPython.core.magic import register_line_magic
355 356
356 357 @register_line_magic
357 358 def lmagic(line):
358 359 "A line magic"
359 360
360 361 # Get info on line magic
361 362 lfind = ip._ofind("lmagic")
362 363 info = dict(
363 364 found=True,
364 365 isalias=False,
365 366 ismagic=True,
366 367 namespace="IPython internal",
367 368 obj=lmagic,
368 369 parent=None,
369 370 )
370 371 self.assertEqual(lfind, info)
371 372
372 373 def test_ofind_cell_magic(self):
373 374 from IPython.core.magic import register_cell_magic
374 375
375 376 @register_cell_magic
376 377 def cmagic(line, cell):
377 378 "A cell magic"
378 379
379 380 # Get info on cell magic
380 381 find = ip._ofind("cmagic")
381 382 info = dict(
382 383 found=True,
383 384 isalias=False,
384 385 ismagic=True,
385 386 namespace="IPython internal",
386 387 obj=cmagic,
387 388 parent=None,
388 389 )
389 390 self.assertEqual(find, info)
390 391
391 392 def test_ofind_property_with_error(self):
392 393 class A(object):
393 394 @property
394 395 def foo(self):
395 396 raise NotImplementedError() # pragma: no cover
396 397
397 398 a = A()
398 399
399 400 found = ip._ofind('a.foo', [('locals', locals())])
400 401 info = dict(found=True, isalias=False, ismagic=False,
401 402 namespace='locals', obj=A.foo, parent=a)
402 403 self.assertEqual(found, info)
403 404
404 405 def test_ofind_multiple_attribute_lookups(self):
405 406 class A(object):
406 407 @property
407 408 def foo(self):
408 409 raise NotImplementedError() # pragma: no cover
409 410
410 411 a = A()
411 412 a.a = A()
412 413 a.a.a = A()
413 414
414 415 found = ip._ofind('a.a.a.foo', [('locals', locals())])
415 416 info = dict(found=True, isalias=False, ismagic=False,
416 417 namespace='locals', obj=A.foo, parent=a.a.a)
417 418 self.assertEqual(found, info)
418 419
419 420 def test_ofind_slotted_attributes(self):
420 421 class A(object):
421 422 __slots__ = ['foo']
422 423 def __init__(self):
423 424 self.foo = 'bar'
424 425
425 426 a = A()
426 427 found = ip._ofind('a.foo', [('locals', locals())])
427 428 info = dict(found=True, isalias=False, ismagic=False,
428 429 namespace='locals', obj=a.foo, parent=a)
429 430 self.assertEqual(found, info)
430 431
431 432 found = ip._ofind('a.bar', [('locals', locals())])
432 433 info = dict(found=False, isalias=False, ismagic=False,
433 434 namespace=None, obj=None, parent=a)
434 435 self.assertEqual(found, info)
435 436
436 437 def test_ofind_prefers_property_to_instance_level_attribute(self):
437 438 class A(object):
438 439 @property
439 440 def foo(self):
440 441 return 'bar'
441 442 a = A()
442 443 a.__dict__["foo"] = "baz"
443 444 self.assertEqual(a.foo, "bar")
444 445 found = ip._ofind("a.foo", [("locals", locals())])
445 446 self.assertIs(found["obj"], A.foo)
446 447
447 448 def test_custom_syntaxerror_exception(self):
448 449 called = []
449 450 def my_handler(shell, etype, value, tb, tb_offset=None):
450 451 called.append(etype)
451 452 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
452 453
453 454 ip.set_custom_exc((SyntaxError,), my_handler)
454 455 try:
455 456 ip.run_cell("1f")
456 457 # Check that this was called, and only once.
457 458 self.assertEqual(called, [SyntaxError])
458 459 finally:
459 460 # Reset the custom exception hook
460 461 ip.set_custom_exc((), None)
461 462
462 463 def test_custom_exception(self):
463 464 called = []
464 465 def my_handler(shell, etype, value, tb, tb_offset=None):
465 466 called.append(etype)
466 467 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
467 468
468 469 ip.set_custom_exc((ValueError,), my_handler)
469 470 try:
470 471 res = ip.run_cell("raise ValueError('test')")
471 472 # Check that this was called, and only once.
472 473 self.assertEqual(called, [ValueError])
473 474 # Check that the error is on the result object
474 475 self.assertIsInstance(res.error_in_exec, ValueError)
475 476 finally:
476 477 # Reset the custom exception hook
477 478 ip.set_custom_exc((), None)
478 479
479 480 @mock.patch("builtins.print")
480 481 def test_showtraceback_with_surrogates(self, mocked_print):
481 482 values = []
482 483
483 484 def mock_print_func(value, sep=" ", end="\n", file=sys.stdout, flush=False):
484 485 values.append(value)
485 486 if value == chr(0xD8FF):
486 487 raise UnicodeEncodeError("utf-8", chr(0xD8FF), 0, 1, "")
487 488
488 489 # mock builtins.print
489 490 mocked_print.side_effect = mock_print_func
490 491
491 492 # ip._showtraceback() is replaced in globalipapp.py.
492 493 # Call original method to test.
493 494 interactiveshell.InteractiveShell._showtraceback(ip, None, None, chr(0xD8FF))
494 495
495 496 self.assertEqual(mocked_print.call_count, 2)
496 497 self.assertEqual(values, [chr(0xD8FF), "\\ud8ff"])
497 498
498 499 def test_mktempfile(self):
499 500 filename = ip.mktempfile()
500 501 # Check that we can open the file again on Windows
501 502 with open(filename, "w", encoding="utf-8") as f:
502 503 f.write("abc")
503 504
504 505 filename = ip.mktempfile(data="blah")
505 506 with open(filename, "r", encoding="utf-8") as f:
506 507 self.assertEqual(f.read(), "blah")
507 508
508 509 def test_new_main_mod(self):
509 510 # Smoketest to check that this accepts a unicode module name
510 511 name = u'jiefmw'
511 512 mod = ip.new_main_mod(u'%s.py' % name, name)
512 513 self.assertEqual(mod.__name__, name)
513 514
514 515 def test_get_exception_only(self):
515 516 try:
516 517 raise KeyboardInterrupt
517 518 except KeyboardInterrupt:
518 519 msg = ip.get_exception_only()
519 520 self.assertEqual(msg, 'KeyboardInterrupt\n')
520 521
521 522 try:
522 523 raise DerivedInterrupt("foo")
523 524 except KeyboardInterrupt:
524 525 msg = ip.get_exception_only()
525 526 self.assertEqual(msg, 'IPython.core.tests.test_interactiveshell.DerivedInterrupt: foo\n')
526 527
527 528 def test_inspect_text(self):
528 529 ip.run_cell('a = 5')
529 530 text = ip.object_inspect_text('a')
530 531 self.assertIsInstance(text, str)
531 532
532 533 def test_last_execution_result(self):
533 534 """ Check that last execution result gets set correctly (GH-10702) """
534 535 result = ip.run_cell('a = 5; a')
535 536 self.assertTrue(ip.last_execution_succeeded)
536 537 self.assertEqual(ip.last_execution_result.result, 5)
537 538
538 539 result = ip.run_cell('a = x_invalid_id_x')
539 540 self.assertFalse(ip.last_execution_succeeded)
540 541 self.assertFalse(ip.last_execution_result.success)
541 542 self.assertIsInstance(ip.last_execution_result.error_in_exec, NameError)
542 543
543 544 def test_reset_aliasing(self):
544 545 """ Check that standard posix aliases work after %reset. """
545 546 if os.name != 'posix':
546 547 return
547 548
548 549 ip.reset()
549 550 for cmd in ('clear', 'more', 'less', 'man'):
550 551 res = ip.run_cell('%' + cmd)
551 552 self.assertEqual(res.success, True)
552 553
553 554
554 555 class TestSafeExecfileNonAsciiPath(unittest.TestCase):
555 556
556 557 @onlyif_unicode_paths
557 558 def setUp(self):
558 559 self.BASETESTDIR = tempfile.mkdtemp()
559 560 self.TESTDIR = join(self.BASETESTDIR, u"Γ₯Àâ")
560 561 os.mkdir(self.TESTDIR)
561 562 with open(
562 563 join(self.TESTDIR, "Γ₯Àâtestscript.py"), "w", encoding="utf-8"
563 564 ) as sfile:
564 565 sfile.write("pass\n")
565 566 self.oldpath = os.getcwd()
566 567 os.chdir(self.TESTDIR)
567 568 self.fname = u"Γ₯Àâtestscript.py"
568 569
569 570 def tearDown(self):
570 571 os.chdir(self.oldpath)
571 572 shutil.rmtree(self.BASETESTDIR)
572 573
573 574 @onlyif_unicode_paths
574 575 def test_1(self):
575 576 """Test safe_execfile with non-ascii path
576 577 """
577 578 ip.safe_execfile(self.fname, {}, raise_exceptions=True)
578 579
579 580 class ExitCodeChecks(tt.TempFileMixin):
580 581
581 582 def setUp(self):
582 583 self.system = ip.system_raw
583 584
584 585 def test_exit_code_ok(self):
585 586 self.system('exit 0')
586 587 self.assertEqual(ip.user_ns['_exit_code'], 0)
587 588
588 589 def test_exit_code_error(self):
589 590 self.system('exit 1')
590 591 self.assertEqual(ip.user_ns['_exit_code'], 1)
591 592
592 593 @skipif(not hasattr(signal, 'SIGALRM'))
593 594 def test_exit_code_signal(self):
594 595 self.mktmp("import signal, time\n"
595 596 "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
596 597 "time.sleep(1)\n")
597 598 self.system("%s %s" % (sys.executable, self.fname))
598 599 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
599 600
600 601 @onlyif_cmds_exist("csh")
601 602 def test_exit_code_signal_csh(self): # pragma: no cover
602 603 SHELL = os.environ.get("SHELL", None)
603 604 os.environ["SHELL"] = find_cmd("csh")
604 605 try:
605 606 self.test_exit_code_signal()
606 607 finally:
607 608 if SHELL is not None:
608 609 os.environ['SHELL'] = SHELL
609 610 else:
610 611 del os.environ['SHELL']
611 612
612 613
613 614 class TestSystemRaw(ExitCodeChecks):
614 615
615 616 def setUp(self):
616 617 super().setUp()
617 618 self.system = ip.system_raw
618 619
619 620 @onlyif_unicode_paths
620 621 def test_1(self):
621 622 """Test system_raw with non-ascii cmd
622 623 """
623 624 cmd = u'''python -c "'Γ₯Àâ'" '''
624 625 ip.system_raw(cmd)
625 626
626 627 @mock.patch('subprocess.call', side_effect=KeyboardInterrupt)
627 628 @mock.patch('os.system', side_effect=KeyboardInterrupt)
628 629 def test_control_c(self, *mocks):
629 630 try:
630 631 self.system("sleep 1 # wont happen")
631 632 except KeyboardInterrupt: # pragma: no cove
632 633 self.fail(
633 634 "system call should intercept "
634 635 "keyboard interrupt from subprocess.call"
635 636 )
636 637 self.assertEqual(ip.user_ns["_exit_code"], -signal.SIGINT)
637 638
638 def test_magic_warnings(self):
639 for magic_cmd in ("pip", "conda", "cd"):
640 with self.assertWarnsRegex(Warning, "You executed the system command"):
641 ip.system_raw(magic_cmd)
639
640 @pytest.mark.parametrize("magic_cmd", ["pip", "conda", "cd"])
641 def test_magic_warnings(magic_cmd):
642 if sys.platform == "win32":
643 to_mock = "os.system"
644 expected_arg, expected_kwargs = magic_cmd, dict()
645 else:
646 to_mock = "subprocess.call"
647 expected_arg, expected_kwargs = magic_cmd, dict(
648 shell=True, executable=os.environ.get("SHELL", None)
649 )
650
651 with mock.patch(to_mock, return_value=0) as mock_sub:
652 with pytest.warns(Warning, match=r"You executed the system command"):
653 ip.system_raw(magic_cmd)
654 mock_sub.assert_called_once_with(expected_arg, **expected_kwargs)
655
642 656
643 657 # TODO: Exit codes are currently ignored on Windows.
644 658 class TestSystemPipedExitCode(ExitCodeChecks):
645 659
646 660 def setUp(self):
647 661 super().setUp()
648 662 self.system = ip.system_piped
649 663
650 664 @skip_win32
651 665 def test_exit_code_ok(self):
652 666 ExitCodeChecks.test_exit_code_ok(self)
653 667
654 668 @skip_win32
655 669 def test_exit_code_error(self):
656 670 ExitCodeChecks.test_exit_code_error(self)
657 671
658 672 @skip_win32
659 673 def test_exit_code_signal(self):
660 674 ExitCodeChecks.test_exit_code_signal(self)
661 675
662 676 class TestModules(tt.TempFileMixin):
663 677 def test_extraneous_loads(self):
664 678 """Test we're not loading modules on startup that we shouldn't.
665 679 """
666 680 self.mktmp("import sys\n"
667 681 "print('numpy' in sys.modules)\n"
668 682 "print('ipyparallel' in sys.modules)\n"
669 683 "print('ipykernel' in sys.modules)\n"
670 684 )
671 685 out = "False\nFalse\nFalse\n"
672 686 tt.ipexec_validate(self.fname, out)
673 687
674 688 class Negator(ast.NodeTransformer):
675 689 """Negates all number literals in an AST."""
676 690
677 691 # for python 3.7 and earlier
678 692 def visit_Num(self, node):
679 693 node.n = -node.n
680 694 return node
681 695
682 696 # for python 3.8+
683 697 def visit_Constant(self, node):
684 698 if isinstance(node.value, int):
685 699 return self.visit_Num(node)
686 700 return node
687 701
688 702 class TestAstTransform(unittest.TestCase):
689 703 def setUp(self):
690 704 self.negator = Negator()
691 705 ip.ast_transformers.append(self.negator)
692 706
693 707 def tearDown(self):
694 708 ip.ast_transformers.remove(self.negator)
695 709
696 710 def test_non_int_const(self):
697 711 with tt.AssertPrints("hello"):
698 712 ip.run_cell('print("hello")')
699 713
700 714 def test_run_cell(self):
701 715 with tt.AssertPrints("-34"):
702 716 ip.run_cell("print(12 + 22)")
703 717
704 718 # A named reference to a number shouldn't be transformed.
705 719 ip.user_ns["n"] = 55
706 720 with tt.AssertNotPrints("-55"):
707 721 ip.run_cell("print(n)")
708 722
709 723 def test_timeit(self):
710 724 called = set()
711 725 def f(x):
712 726 called.add(x)
713 727 ip.push({'f':f})
714 728
715 729 with tt.AssertPrints("std. dev. of"):
716 730 ip.run_line_magic("timeit", "-n1 f(1)")
717 731 self.assertEqual(called, {-1})
718 732 called.clear()
719 733
720 734 with tt.AssertPrints("std. dev. of"):
721 735 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
722 736 self.assertEqual(called, {-2, -3})
723 737
724 738 def test_time(self):
725 739 called = []
726 740 def f(x):
727 741 called.append(x)
728 742 ip.push({'f':f})
729 743
730 744 # Test with an expression
731 745 with tt.AssertPrints("Wall time: "):
732 746 ip.run_line_magic("time", "f(5+9)")
733 747 self.assertEqual(called, [-14])
734 748 called[:] = []
735 749
736 750 # Test with a statement (different code path)
737 751 with tt.AssertPrints("Wall time: "):
738 752 ip.run_line_magic("time", "a = f(-3 + -2)")
739 753 self.assertEqual(called, [5])
740 754
741 755 def test_macro(self):
742 756 ip.push({'a':10})
743 757 # The AST transformation makes this do a+=-1
744 758 ip.define_macro("amacro", "a+=1\nprint(a)")
745 759
746 760 with tt.AssertPrints("9"):
747 761 ip.run_cell("amacro")
748 762 with tt.AssertPrints("8"):
749 763 ip.run_cell("amacro")
750 764
751 765 class TestMiscTransform(unittest.TestCase):
752 766
753 767
754 768 def test_transform_only_once(self):
755 769 cleanup = 0
756 770 line_t = 0
757 771 def count_cleanup(lines):
758 772 nonlocal cleanup
759 773 cleanup += 1
760 774 return lines
761 775
762 776 def count_line_t(lines):
763 777 nonlocal line_t
764 778 line_t += 1
765 779 return lines
766 780
767 781 ip.input_transformer_manager.cleanup_transforms.append(count_cleanup)
768 782 ip.input_transformer_manager.line_transforms.append(count_line_t)
769 783
770 784 ip.run_cell('1')
771 785
772 786 assert cleanup == 1
773 787 assert line_t == 1
774 788
775 789 class IntegerWrapper(ast.NodeTransformer):
776 790 """Wraps all integers in a call to Integer()"""
777 791
778 792 # for Python 3.7 and earlier
779 793
780 794 # for Python 3.7 and earlier
781 795 def visit_Num(self, node):
782 796 if isinstance(node.n, int):
783 797 return ast.Call(func=ast.Name(id='Integer', ctx=ast.Load()),
784 798 args=[node], keywords=[])
785 799 return node
786 800
787 801 # For Python 3.8+
788 802 def visit_Constant(self, node):
789 803 if isinstance(node.value, int):
790 804 return self.visit_Num(node)
791 805 return node
792 806
793 807
794 808 class TestAstTransform2(unittest.TestCase):
795 809 def setUp(self):
796 810 self.intwrapper = IntegerWrapper()
797 811 ip.ast_transformers.append(self.intwrapper)
798 812
799 813 self.calls = []
800 814 def Integer(*args):
801 815 self.calls.append(args)
802 816 return args
803 817 ip.push({"Integer": Integer})
804 818
805 819 def tearDown(self):
806 820 ip.ast_transformers.remove(self.intwrapper)
807 821 del ip.user_ns['Integer']
808 822
809 823 def test_run_cell(self):
810 824 ip.run_cell("n = 2")
811 825 self.assertEqual(self.calls, [(2,)])
812 826
813 827 # This shouldn't throw an error
814 828 ip.run_cell("o = 2.0")
815 829 self.assertEqual(ip.user_ns['o'], 2.0)
816 830
817 831 def test_run_cell_non_int(self):
818 832 ip.run_cell("n = 'a'")
819 833 assert self.calls == []
820 834
821 835 def test_timeit(self):
822 836 called = set()
823 837 def f(x):
824 838 called.add(x)
825 839 ip.push({'f':f})
826 840
827 841 with tt.AssertPrints("std. dev. of"):
828 842 ip.run_line_magic("timeit", "-n1 f(1)")
829 843 self.assertEqual(called, {(1,)})
830 844 called.clear()
831 845
832 846 with tt.AssertPrints("std. dev. of"):
833 847 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
834 848 self.assertEqual(called, {(2,), (3,)})
835 849
836 850 class ErrorTransformer(ast.NodeTransformer):
837 851 """Throws an error when it sees a number."""
838 852
839 853 def visit_Constant(self, node):
840 854 if isinstance(node.value, int):
841 855 raise ValueError("test")
842 856 return node
843 857
844 858
845 859 class TestAstTransformError(unittest.TestCase):
846 860 def test_unregistering(self):
847 861 err_transformer = ErrorTransformer()
848 862 ip.ast_transformers.append(err_transformer)
849 863
850 864 with self.assertWarnsRegex(UserWarning, "It will be unregistered"):
851 865 ip.run_cell("1 + 2")
852 866
853 867 # This should have been removed.
854 868 self.assertNotIn(err_transformer, ip.ast_transformers)
855 869
856 870
857 871 class StringRejector(ast.NodeTransformer):
858 872 """Throws an InputRejected when it sees a string literal.
859 873
860 874 Used to verify that NodeTransformers can signal that a piece of code should
861 875 not be executed by throwing an InputRejected.
862 876 """
863 877
864 878 # 3.8 only
865 879 def visit_Constant(self, node):
866 880 if isinstance(node.value, str):
867 881 raise InputRejected("test")
868 882 return node
869 883
870 884
871 885 class TestAstTransformInputRejection(unittest.TestCase):
872 886
873 887 def setUp(self):
874 888 self.transformer = StringRejector()
875 889 ip.ast_transformers.append(self.transformer)
876 890
877 891 def tearDown(self):
878 892 ip.ast_transformers.remove(self.transformer)
879 893
880 894 def test_input_rejection(self):
881 895 """Check that NodeTransformers can reject input."""
882 896
883 897 expect_exception_tb = tt.AssertPrints("InputRejected: test")
884 898 expect_no_cell_output = tt.AssertNotPrints("'unsafe'", suppress=False)
885 899
886 900 # Run the same check twice to verify that the transformer is not
887 901 # disabled after raising.
888 902 with expect_exception_tb, expect_no_cell_output:
889 903 ip.run_cell("'unsafe'")
890 904
891 905 with expect_exception_tb, expect_no_cell_output:
892 906 res = ip.run_cell("'unsafe'")
893 907
894 908 self.assertIsInstance(res.error_before_exec, InputRejected)
895 909
896 910 def test__IPYTHON__():
897 911 # This shouldn't raise a NameError, that's all
898 912 __IPYTHON__
899 913
900 914
901 915 class DummyRepr(object):
902 916 def __repr__(self):
903 917 return "DummyRepr"
904 918
905 919 def _repr_html_(self):
906 920 return "<b>dummy</b>"
907 921
908 922 def _repr_javascript_(self):
909 923 return "console.log('hi');", {'key': 'value'}
910 924
911 925
912 926 def test_user_variables():
913 927 # enable all formatters
914 928 ip.display_formatter.active_types = ip.display_formatter.format_types
915 929
916 930 ip.user_ns['dummy'] = d = DummyRepr()
917 931 keys = {'dummy', 'doesnotexist'}
918 932 r = ip.user_expressions({ key:key for key in keys})
919 933
920 934 assert keys == set(r.keys())
921 935 dummy = r["dummy"]
922 936 assert {"status", "data", "metadata"} == set(dummy.keys())
923 937 assert dummy["status"] == "ok"
924 938 data = dummy["data"]
925 939 metadata = dummy["metadata"]
926 940 assert data.get("text/html") == d._repr_html_()
927 941 js, jsmd = d._repr_javascript_()
928 942 assert data.get("application/javascript") == js
929 943 assert metadata.get("application/javascript") == jsmd
930 944
931 945 dne = r["doesnotexist"]
932 946 assert dne["status"] == "error"
933 947 assert dne["ename"] == "NameError"
934 948
935 949 # back to text only
936 950 ip.display_formatter.active_types = ['text/plain']
937 951
938 952 def test_user_expression():
939 953 # enable all formatters
940 954 ip.display_formatter.active_types = ip.display_formatter.format_types
941 955 query = {
942 956 'a' : '1 + 2',
943 957 'b' : '1/0',
944 958 }
945 959 r = ip.user_expressions(query)
946 960 import pprint
947 961 pprint.pprint(r)
948 962 assert set(r.keys()) == set(query.keys())
949 963 a = r["a"]
950 964 assert {"status", "data", "metadata"} == set(a.keys())
951 965 assert a["status"] == "ok"
952 966 data = a["data"]
953 967 metadata = a["metadata"]
954 968 assert data.get("text/plain") == "3"
955 969
956 970 b = r["b"]
957 971 assert b["status"] == "error"
958 972 assert b["ename"] == "ZeroDivisionError"
959 973
960 974 # back to text only
961 975 ip.display_formatter.active_types = ['text/plain']
962 976
963 977
964 978 class TestSyntaxErrorTransformer(unittest.TestCase):
965 979 """Check that SyntaxError raised by an input transformer is handled by run_cell()"""
966 980
967 981 @staticmethod
968 982 def transformer(lines):
969 983 for line in lines:
970 984 pos = line.find('syntaxerror')
971 985 if pos >= 0:
972 986 e = SyntaxError('input contains "syntaxerror"')
973 987 e.text = line
974 988 e.offset = pos + 1
975 989 raise e
976 990 return lines
977 991
978 992 def setUp(self):
979 993 ip.input_transformers_post.append(self.transformer)
980 994
981 995 def tearDown(self):
982 996 ip.input_transformers_post.remove(self.transformer)
983 997
984 998 def test_syntaxerror_input_transformer(self):
985 999 with tt.AssertPrints('1234'):
986 1000 ip.run_cell('1234')
987 1001 with tt.AssertPrints('SyntaxError: invalid syntax'):
988 1002 ip.run_cell('1 2 3') # plain python syntax error
989 1003 with tt.AssertPrints('SyntaxError: input contains "syntaxerror"'):
990 1004 ip.run_cell('2345 # syntaxerror') # input transformer syntax error
991 1005 with tt.AssertPrints('3456'):
992 1006 ip.run_cell('3456')
993 1007
994 1008
995 1009 class TestWarningSuppression(unittest.TestCase):
996 1010 def test_warning_suppression(self):
997 1011 ip.run_cell("import warnings")
998 1012 try:
999 1013 with self.assertWarnsRegex(UserWarning, "asdf"):
1000 1014 ip.run_cell("warnings.warn('asdf')")
1001 1015 # Here's the real test -- if we run that again, we should get the
1002 1016 # warning again. Traditionally, each warning was only issued once per
1003 1017 # IPython session (approximately), even if the user typed in new and
1004 1018 # different code that should have also triggered the warning, leading
1005 1019 # to much confusion.
1006 1020 with self.assertWarnsRegex(UserWarning, "asdf"):
1007 1021 ip.run_cell("warnings.warn('asdf')")
1008 1022 finally:
1009 1023 ip.run_cell("del warnings")
1010 1024
1011 1025
1012 1026 def test_deprecation_warning(self):
1013 1027 ip.run_cell("""
1014 1028 import warnings
1015 1029 def wrn():
1016 1030 warnings.warn(
1017 1031 "I AM A WARNING",
1018 1032 DeprecationWarning
1019 1033 )
1020 1034 """)
1021 1035 try:
1022 1036 with self.assertWarnsRegex(DeprecationWarning, "I AM A WARNING"):
1023 1037 ip.run_cell("wrn()")
1024 1038 finally:
1025 1039 ip.run_cell("del warnings")
1026 1040 ip.run_cell("del wrn")
1027 1041
1028 1042
1029 1043 class TestImportNoDeprecate(tt.TempFileMixin):
1030 1044
1031 1045 def setUp(self):
1032 1046 """Make a valid python temp file."""
1033 1047 self.mktmp("""
1034 1048 import warnings
1035 1049 def wrn():
1036 1050 warnings.warn(
1037 1051 "I AM A WARNING",
1038 1052 DeprecationWarning
1039 1053 )
1040 1054 """)
1041 1055 super().setUp()
1042 1056
1043 1057 def test_no_dep(self):
1044 1058 """
1045 1059 No deprecation warning should be raised from imported functions
1046 1060 """
1047 1061 ip.run_cell("from {} import wrn".format(self.fname))
1048 1062
1049 1063 with tt.AssertNotPrints("I AM A WARNING"):
1050 1064 ip.run_cell("wrn()")
1051 1065 ip.run_cell("del wrn")
1052 1066
1053 1067
1054 1068 def test_custom_exc_count():
1055 1069 hook = mock.Mock(return_value=None)
1056 1070 ip.set_custom_exc((SyntaxError,), hook)
1057 1071 before = ip.execution_count
1058 1072 ip.run_cell("def foo()", store_history=True)
1059 1073 # restore default excepthook
1060 1074 ip.set_custom_exc((), None)
1061 1075 assert hook.call_count == 1
1062 1076 assert ip.execution_count == before + 1
1063 1077
1064 1078
1065 1079 def test_run_cell_async():
1066 1080 ip.run_cell("import asyncio")
1067 1081 coro = ip.run_cell_async("await asyncio.sleep(0.01)\n5")
1068 1082 assert asyncio.iscoroutine(coro)
1069 1083 loop = asyncio.new_event_loop()
1070 1084 result = loop.run_until_complete(coro)
1071 1085 assert isinstance(result, interactiveshell.ExecutionResult)
1072 1086 assert result.result == 5
1073 1087
1074 1088
1075 1089 def test_run_cell_await():
1076 1090 ip.run_cell("import asyncio")
1077 1091 result = ip.run_cell("await asyncio.sleep(0.01); 10")
1078 1092 assert ip.user_ns["_"] == 10
1079 1093
1080 1094
1081 1095 def test_run_cell_asyncio_run():
1082 1096 ip.run_cell("import asyncio")
1083 1097 result = ip.run_cell("await asyncio.sleep(0.01); 1")
1084 1098 assert ip.user_ns["_"] == 1
1085 1099 result = ip.run_cell("asyncio.run(asyncio.sleep(0.01)); 2")
1086 1100 assert ip.user_ns["_"] == 2
1087 1101 result = ip.run_cell("await asyncio.sleep(0.01); 3")
1088 1102 assert ip.user_ns["_"] == 3
1089 1103
1090 1104
1091 1105 def test_should_run_async():
1092 assert not ip.should_run_async("a = 5")
1093 assert ip.should_run_async("await x")
1094 assert ip.should_run_async("import asyncio; await asyncio.sleep(1)")
1106 assert not ip.should_run_async("a = 5", transformed_cell="a = 5")
1107 assert ip.should_run_async("await x", transformed_cell="await x")
1108 assert ip.should_run_async(
1109 "import asyncio; await asyncio.sleep(1)",
1110 transformed_cell="import asyncio; await asyncio.sleep(1)",
1111 )
1095 1112
1096 1113
1097 1114 def test_set_custom_completer():
1098 1115 num_completers = len(ip.Completer.matchers)
1099 1116
1100 1117 def foo(*args, **kwargs):
1101 1118 return "I'm a completer!"
1102 1119
1103 1120 ip.set_custom_completer(foo, 0)
1104 1121
1105 1122 # check that we've really added a new completer
1106 1123 assert len(ip.Completer.matchers) == num_completers + 1
1107 1124
1108 1125 # check that the first completer is the function we defined
1109 1126 assert ip.Completer.matchers[0]() == "I'm a completer!"
1110 1127
1111 1128 # clean up
1112 1129 ip.Completer.custom_matchers.pop()
General Comments 0
You need to be logged in to leave comments. Login now