##// END OF EJS Templates
Merge pull request #13552 from Carreau/tc...
Matthias Bussonnier -
r27563:c71cde55 merge
parent child Browse files
Show More
@@ -1,1100 +1,1100 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for the key interactiveshell module.
3 3
4 4 Historically the main classes in interactiveshell have been under-tested. This
5 5 module should grow as many single-method tests as possible to trap many of the
6 6 recurring bugs we seem to encounter with high-level interaction.
7 7 """
8 8
9 9 # Copyright (c) IPython Development Team.
10 10 # Distributed under the terms of the Modified BSD License.
11 11
12 12 import asyncio
13 13 import ast
14 14 import os
15 15 import signal
16 16 import shutil
17 17 import sys
18 18 import tempfile
19 19 import unittest
20 20 from unittest import mock
21 21
22 22 from os.path import join
23 23
24 24 from IPython.core.error import InputRejected
25 25 from IPython.core.inputtransformer import InputTransformer
26 26 from IPython.core import interactiveshell
27 27 from IPython.testing.decorators import (
28 28 skipif, skip_win32, onlyif_unicode_paths, onlyif_cmds_exist,
29 29 )
30 30 from IPython.testing import tools as tt
31 31 from IPython.utils.process import find_cmd
32 32
33 33 #-----------------------------------------------------------------------------
34 34 # Globals
35 35 #-----------------------------------------------------------------------------
36 36 # This is used by every single test, no point repeating it ad nauseam
37 37
38 38 #-----------------------------------------------------------------------------
39 39 # Tests
40 40 #-----------------------------------------------------------------------------
41 41
42 42 class DerivedInterrupt(KeyboardInterrupt):
43 43 pass
44 44
45 45 class InteractiveShellTestCase(unittest.TestCase):
46 46 def test_naked_string_cells(self):
47 47 """Test that cells with only naked strings are fully executed"""
48 48 # First, single-line inputs
49 49 ip.run_cell('"a"\n')
50 50 self.assertEqual(ip.user_ns['_'], 'a')
51 51 # And also multi-line cells
52 52 ip.run_cell('"""a\nb"""\n')
53 53 self.assertEqual(ip.user_ns['_'], 'a\nb')
54 54
55 55 def test_run_empty_cell(self):
56 56 """Just make sure we don't get a horrible error with a blank
57 57 cell of input. Yes, I did overlook that."""
58 58 old_xc = ip.execution_count
59 59 res = ip.run_cell('')
60 60 self.assertEqual(ip.execution_count, old_xc)
61 61 self.assertEqual(res.execution_count, None)
62 62
63 63 def test_run_cell_multiline(self):
64 64 """Multi-block, multi-line cells must execute correctly.
65 65 """
66 66 src = '\n'.join(["x=1",
67 67 "y=2",
68 68 "if 1:",
69 69 " x += 1",
70 70 " y += 1",])
71 71 res = ip.run_cell(src)
72 72 self.assertEqual(ip.user_ns['x'], 2)
73 73 self.assertEqual(ip.user_ns['y'], 3)
74 74 self.assertEqual(res.success, True)
75 75 self.assertEqual(res.result, None)
76 76
77 77 def test_multiline_string_cells(self):
78 78 "Code sprinkled with multiline strings should execute (GH-306)"
79 79 ip.run_cell('tmp=0')
80 80 self.assertEqual(ip.user_ns['tmp'], 0)
81 81 res = ip.run_cell('tmp=1;"""a\nb"""\n')
82 82 self.assertEqual(ip.user_ns['tmp'], 1)
83 83 self.assertEqual(res.success, True)
84 84 self.assertEqual(res.result, "a\nb")
85 85
86 86 def test_dont_cache_with_semicolon(self):
87 87 "Ending a line with semicolon should not cache the returned object (GH-307)"
88 88 oldlen = len(ip.user_ns['Out'])
89 89 for cell in ['1;', '1;1;']:
90 90 res = ip.run_cell(cell, store_history=True)
91 91 newlen = len(ip.user_ns['Out'])
92 92 self.assertEqual(oldlen, newlen)
93 93 self.assertIsNone(res.result)
94 94 i = 0
95 95 #also test the default caching behavior
96 96 for cell in ['1', '1;1']:
97 97 ip.run_cell(cell, store_history=True)
98 98 newlen = len(ip.user_ns['Out'])
99 99 i += 1
100 100 self.assertEqual(oldlen+i, newlen)
101 101
102 102 def test_syntax_error(self):
103 103 res = ip.run_cell("raise = 3")
104 104 self.assertIsInstance(res.error_before_exec, SyntaxError)
105 105
106 106 def test_In_variable(self):
107 107 "Verify that In variable grows with user input (GH-284)"
108 108 oldlen = len(ip.user_ns['In'])
109 109 ip.run_cell('1;', store_history=True)
110 110 newlen = len(ip.user_ns['In'])
111 111 self.assertEqual(oldlen+1, newlen)
112 112 self.assertEqual(ip.user_ns['In'][-1],'1;')
113 113
114 114 def test_magic_names_in_string(self):
115 115 ip.run_cell('a = """\n%exit\n"""')
116 116 self.assertEqual(ip.user_ns['a'], '\n%exit\n')
117 117
118 118 def test_trailing_newline(self):
119 119 """test that running !(command) does not raise a SyntaxError"""
120 120 ip.run_cell('!(true)\n', False)
121 121 ip.run_cell('!(true)\n\n\n', False)
122 122
123 123 def test_gh_597(self):
124 124 """Pretty-printing lists of objects with non-ascii reprs may cause
125 125 problems."""
126 126 class Spam(object):
127 127 def __repr__(self):
128 128 return "\xe9"*50
129 129 import IPython.core.formatters
130 130 f = IPython.core.formatters.PlainTextFormatter()
131 131 f([Spam(),Spam()])
132 132
133 133
134 134 def test_future_flags(self):
135 135 """Check that future flags are used for parsing code (gh-777)"""
136 136 ip.run_cell('from __future__ import barry_as_FLUFL')
137 137 try:
138 138 ip.run_cell('prfunc_return_val = 1 <> 2')
139 139 assert 'prfunc_return_val' in ip.user_ns
140 140 finally:
141 141 # Reset compiler flags so we don't mess up other tests.
142 142 ip.compile.reset_compiler_flags()
143 143
144 144 def test_can_pickle(self):
145 145 "Can we pickle objects defined interactively (GH-29)"
146 146 ip = get_ipython()
147 147 ip.reset()
148 148 ip.run_cell(("class Mylist(list):\n"
149 149 " def __init__(self,x=[]):\n"
150 150 " list.__init__(self,x)"))
151 151 ip.run_cell("w=Mylist([1,2,3])")
152 152
153 153 from pickle import dumps
154 154
155 155 # We need to swap in our main module - this is only necessary
156 156 # inside the test framework, because IPython puts the interactive module
157 157 # in place (but the test framework undoes this).
158 158 _main = sys.modules['__main__']
159 159 sys.modules['__main__'] = ip.user_module
160 160 try:
161 161 res = dumps(ip.user_ns["w"])
162 162 finally:
163 163 sys.modules['__main__'] = _main
164 164 self.assertTrue(isinstance(res, bytes))
165 165
166 166 def test_global_ns(self):
167 167 "Code in functions must be able to access variables outside them."
168 168 ip = get_ipython()
169 169 ip.run_cell("a = 10")
170 170 ip.run_cell(("def f(x):\n"
171 171 " return x + a"))
172 172 ip.run_cell("b = f(12)")
173 173 self.assertEqual(ip.user_ns["b"], 22)
174 174
175 175 def test_bad_custom_tb(self):
176 176 """Check that InteractiveShell is protected from bad custom exception handlers"""
177 177 ip.set_custom_exc((IOError,), lambda etype,value,tb: 1/0)
178 178 self.assertEqual(ip.custom_exceptions, (IOError,))
179 179 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
180 180 ip.run_cell(u'raise IOError("foo")')
181 181 self.assertEqual(ip.custom_exceptions, ())
182 182
183 183 def test_bad_custom_tb_return(self):
184 184 """Check that InteractiveShell is protected from bad return types in custom exception handlers"""
185 185 ip.set_custom_exc((NameError,),lambda etype,value,tb, tb_offset=None: 1)
186 186 self.assertEqual(ip.custom_exceptions, (NameError,))
187 187 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
188 188 ip.run_cell(u'a=abracadabra')
189 189 self.assertEqual(ip.custom_exceptions, ())
190 190
191 191 def test_drop_by_id(self):
192 192 myvars = {"a":object(), "b":object(), "c": object()}
193 193 ip.push(myvars, interactive=False)
194 194 for name in myvars:
195 195 assert name in ip.user_ns, name
196 196 assert name in ip.user_ns_hidden, name
197 197 ip.user_ns['b'] = 12
198 198 ip.drop_by_id(myvars)
199 199 for name in ["a", "c"]:
200 200 assert name not in ip.user_ns, name
201 201 assert name not in ip.user_ns_hidden, name
202 202 assert ip.user_ns['b'] == 12
203 203 ip.reset()
204 204
205 205 def test_var_expand(self):
206 206 ip.user_ns['f'] = u'Ca\xf1o'
207 207 self.assertEqual(ip.var_expand(u'echo $f'), u'echo Ca\xf1o')
208 208 self.assertEqual(ip.var_expand(u'echo {f}'), u'echo Ca\xf1o')
209 209 self.assertEqual(ip.var_expand(u'echo {f[:-1]}'), u'echo Ca\xf1')
210 210 self.assertEqual(ip.var_expand(u'echo {1*2}'), u'echo 2')
211 211
212 212 self.assertEqual(ip.var_expand(u"grep x | awk '{print $1}'"), u"grep x | awk '{print $1}'")
213 213
214 214 ip.user_ns['f'] = b'Ca\xc3\xb1o'
215 215 # This should not raise any exception:
216 216 ip.var_expand(u'echo $f')
217 217
218 218 def test_var_expand_local(self):
219 219 """Test local variable expansion in !system and %magic calls"""
220 220 # !system
221 221 ip.run_cell(
222 222 "def test():\n"
223 223 ' lvar = "ttt"\n'
224 224 " ret = !echo {lvar}\n"
225 225 " return ret[0]\n"
226 226 )
227 227 res = ip.user_ns["test"]()
228 228 self.assertIn("ttt", res)
229 229
230 230 # %magic
231 231 ip.run_cell(
232 232 "def makemacro():\n"
233 233 ' macroname = "macro_var_expand_locals"\n'
234 234 " %macro {macroname} codestr\n"
235 235 )
236 236 ip.user_ns["codestr"] = "str(12)"
237 237 ip.run_cell("makemacro()")
238 238 self.assertIn("macro_var_expand_locals", ip.user_ns)
239 239
240 240 def test_var_expand_self(self):
241 241 """Test variable expansion with the name 'self', which was failing.
242 242
243 243 See https://github.com/ipython/ipython/issues/1878#issuecomment-7698218
244 244 """
245 245 ip.run_cell(
246 246 "class cTest:\n"
247 247 ' classvar="see me"\n'
248 248 " def test(self):\n"
249 249 " res = !echo Variable: {self.classvar}\n"
250 250 " return res[0]\n"
251 251 )
252 252 self.assertIn("see me", ip.user_ns["cTest"]().test())
253 253
254 254 def test_bad_var_expand(self):
255 255 """var_expand on invalid formats shouldn't raise"""
256 256 # SyntaxError
257 257 self.assertEqual(ip.var_expand(u"{'a':5}"), u"{'a':5}")
258 258 # NameError
259 259 self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}")
260 260 # ZeroDivisionError
261 261 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
262 262
263 263 def test_silent_postexec(self):
264 264 """run_cell(silent=True) doesn't invoke pre/post_run_cell callbacks"""
265 265 pre_explicit = mock.Mock()
266 266 pre_always = mock.Mock()
267 267 post_explicit = mock.Mock()
268 268 post_always = mock.Mock()
269 269 all_mocks = [pre_explicit, pre_always, post_explicit, post_always]
270 270
271 271 ip.events.register('pre_run_cell', pre_explicit)
272 272 ip.events.register('pre_execute', pre_always)
273 273 ip.events.register('post_run_cell', post_explicit)
274 274 ip.events.register('post_execute', post_always)
275 275
276 276 try:
277 277 ip.run_cell("1", silent=True)
278 278 assert pre_always.called
279 279 assert not pre_explicit.called
280 280 assert post_always.called
281 281 assert not post_explicit.called
282 282 # double-check that non-silent exec did what we expected
283 283 # silent to avoid
284 284 ip.run_cell("1")
285 285 assert pre_explicit.called
286 286 assert post_explicit.called
287 287 info, = pre_explicit.call_args[0]
288 288 result, = post_explicit.call_args[0]
289 289 self.assertEqual(info, result.info)
290 290 # check that post hooks are always called
291 291 [m.reset_mock() for m in all_mocks]
292 292 ip.run_cell("syntax error")
293 293 assert pre_always.called
294 294 assert pre_explicit.called
295 295 assert post_always.called
296 296 assert post_explicit.called
297 297 info, = pre_explicit.call_args[0]
298 298 result, = post_explicit.call_args[0]
299 299 self.assertEqual(info, result.info)
300 300 finally:
301 301 # remove post-exec
302 302 ip.events.unregister('pre_run_cell', pre_explicit)
303 303 ip.events.unregister('pre_execute', pre_always)
304 304 ip.events.unregister('post_run_cell', post_explicit)
305 305 ip.events.unregister('post_execute', post_always)
306 306
307 307 def test_silent_noadvance(self):
308 308 """run_cell(silent=True) doesn't advance execution_count"""
309 309 ec = ip.execution_count
310 310 # silent should force store_history=False
311 311 ip.run_cell("1", store_history=True, silent=True)
312 312
313 313 self.assertEqual(ec, ip.execution_count)
314 314 # double-check that non-silent exec did what we expected
315 315 # silent to avoid
316 316 ip.run_cell("1", store_history=True)
317 317 self.assertEqual(ec+1, ip.execution_count)
318 318
319 319 def test_silent_nodisplayhook(self):
320 320 """run_cell(silent=True) doesn't trigger displayhook"""
321 321 d = dict(called=False)
322 322
323 323 trap = ip.display_trap
324 324 save_hook = trap.hook
325 325
326 326 def failing_hook(*args, **kwargs):
327 327 d['called'] = True
328 328
329 329 try:
330 330 trap.hook = failing_hook
331 331 res = ip.run_cell("1", silent=True)
332 332 self.assertFalse(d['called'])
333 333 self.assertIsNone(res.result)
334 334 # double-check that non-silent exec did what we expected
335 335 # silent to avoid
336 336 ip.run_cell("1")
337 337 self.assertTrue(d['called'])
338 338 finally:
339 339 trap.hook = save_hook
340 340
341 341 def test_ofind_line_magic(self):
342 342 from IPython.core.magic import register_line_magic
343 343
344 344 @register_line_magic
345 345 def lmagic(line):
346 346 "A line magic"
347 347
348 348 # Get info on line magic
349 349 lfind = ip._ofind("lmagic")
350 350 info = dict(
351 351 found=True,
352 352 isalias=False,
353 353 ismagic=True,
354 354 namespace="IPython internal",
355 355 obj=lmagic,
356 356 parent=None,
357 357 )
358 358 self.assertEqual(lfind, info)
359 359
360 360 def test_ofind_cell_magic(self):
361 361 from IPython.core.magic import register_cell_magic
362 362
363 363 @register_cell_magic
364 364 def cmagic(line, cell):
365 365 "A cell magic"
366 366
367 367 # Get info on cell magic
368 368 find = ip._ofind("cmagic")
369 369 info = dict(
370 370 found=True,
371 371 isalias=False,
372 372 ismagic=True,
373 373 namespace="IPython internal",
374 374 obj=cmagic,
375 375 parent=None,
376 376 )
377 377 self.assertEqual(find, info)
378 378
379 379 def test_ofind_property_with_error(self):
380 380 class A(object):
381 381 @property
382 382 def foo(self):
383 raise NotImplementedError()
383 raise NotImplementedError() # pragma: no cover
384
384 385 a = A()
385 386
386 387 found = ip._ofind('a.foo', [('locals', locals())])
387 388 info = dict(found=True, isalias=False, ismagic=False,
388 389 namespace='locals', obj=A.foo, parent=a)
389 390 self.assertEqual(found, info)
390 391
391 392 def test_ofind_multiple_attribute_lookups(self):
392 393 class A(object):
393 394 @property
394 395 def foo(self):
395 raise NotImplementedError()
396 raise NotImplementedError() # pragma: no cover
396 397
397 398 a = A()
398 399 a.a = A()
399 400 a.a.a = A()
400 401
401 402 found = ip._ofind('a.a.a.foo', [('locals', locals())])
402 403 info = dict(found=True, isalias=False, ismagic=False,
403 404 namespace='locals', obj=A.foo, parent=a.a.a)
404 405 self.assertEqual(found, info)
405 406
406 407 def test_ofind_slotted_attributes(self):
407 408 class A(object):
408 409 __slots__ = ['foo']
409 410 def __init__(self):
410 411 self.foo = 'bar'
411 412
412 413 a = A()
413 414 found = ip._ofind('a.foo', [('locals', locals())])
414 415 info = dict(found=True, isalias=False, ismagic=False,
415 416 namespace='locals', obj=a.foo, parent=a)
416 417 self.assertEqual(found, info)
417 418
418 419 found = ip._ofind('a.bar', [('locals', locals())])
419 420 info = dict(found=False, isalias=False, ismagic=False,
420 421 namespace=None, obj=None, parent=a)
421 422 self.assertEqual(found, info)
422 423
423 424 def test_ofind_prefers_property_to_instance_level_attribute(self):
424 425 class A(object):
425 426 @property
426 427 def foo(self):
427 428 return 'bar'
428 429 a = A()
429 430 a.__dict__["foo"] = "baz"
430 431 self.assertEqual(a.foo, "bar")
431 432 found = ip._ofind("a.foo", [("locals", locals())])
432 433 self.assertIs(found["obj"], A.foo)
433 434
434 435 def test_custom_syntaxerror_exception(self):
435 436 called = []
436 437 def my_handler(shell, etype, value, tb, tb_offset=None):
437 438 called.append(etype)
438 439 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
439 440
440 441 ip.set_custom_exc((SyntaxError,), my_handler)
441 442 try:
442 443 ip.run_cell("1f")
443 444 # Check that this was called, and only once.
444 445 self.assertEqual(called, [SyntaxError])
445 446 finally:
446 447 # Reset the custom exception hook
447 448 ip.set_custom_exc((), None)
448 449
449 450 def test_custom_exception(self):
450 451 called = []
451 452 def my_handler(shell, etype, value, tb, tb_offset=None):
452 453 called.append(etype)
453 454 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
454 455
455 456 ip.set_custom_exc((ValueError,), my_handler)
456 457 try:
457 458 res = ip.run_cell("raise ValueError('test')")
458 459 # Check that this was called, and only once.
459 460 self.assertEqual(called, [ValueError])
460 461 # Check that the error is on the result object
461 462 self.assertIsInstance(res.error_in_exec, ValueError)
462 463 finally:
463 464 # Reset the custom exception hook
464 465 ip.set_custom_exc((), None)
465 466
466 467 @mock.patch("builtins.print")
467 468 def test_showtraceback_with_surrogates(self, mocked_print):
468 469 values = []
469 470
470 471 def mock_print_func(value, sep=" ", end="\n", file=sys.stdout, flush=False):
471 472 values.append(value)
472 473 if value == chr(0xD8FF):
473 474 raise UnicodeEncodeError("utf-8", chr(0xD8FF), 0, 1, "")
474 475
475 476 # mock builtins.print
476 477 mocked_print.side_effect = mock_print_func
477 478
478 479 # ip._showtraceback() is replaced in globalipapp.py.
479 480 # Call original method to test.
480 481 interactiveshell.InteractiveShell._showtraceback(ip, None, None, chr(0xD8FF))
481 482
482 483 self.assertEqual(mocked_print.call_count, 2)
483 484 self.assertEqual(values, [chr(0xD8FF), "\\ud8ff"])
484 485
485 486 def test_mktempfile(self):
486 487 filename = ip.mktempfile()
487 488 # Check that we can open the file again on Windows
488 489 with open(filename, "w", encoding="utf-8") as f:
489 490 f.write("abc")
490 491
491 492 filename = ip.mktempfile(data="blah")
492 493 with open(filename, "r", encoding="utf-8") as f:
493 494 self.assertEqual(f.read(), "blah")
494 495
495 496 def test_new_main_mod(self):
496 497 # Smoketest to check that this accepts a unicode module name
497 498 name = u'jiefmw'
498 499 mod = ip.new_main_mod(u'%s.py' % name, name)
499 500 self.assertEqual(mod.__name__, name)
500 501
501 502 def test_get_exception_only(self):
502 503 try:
503 504 raise KeyboardInterrupt
504 505 except KeyboardInterrupt:
505 506 msg = ip.get_exception_only()
506 507 self.assertEqual(msg, 'KeyboardInterrupt\n')
507 508
508 509 try:
509 510 raise DerivedInterrupt("foo")
510 511 except KeyboardInterrupt:
511 512 msg = ip.get_exception_only()
512 513 self.assertEqual(msg, 'IPython.core.tests.test_interactiveshell.DerivedInterrupt: foo\n')
513 514
514 515 def test_inspect_text(self):
515 516 ip.run_cell('a = 5')
516 517 text = ip.object_inspect_text('a')
517 518 self.assertIsInstance(text, str)
518 519
519 520 def test_last_execution_result(self):
520 521 """ Check that last execution result gets set correctly (GH-10702) """
521 522 result = ip.run_cell('a = 5; a')
522 523 self.assertTrue(ip.last_execution_succeeded)
523 524 self.assertEqual(ip.last_execution_result.result, 5)
524 525
525 526 result = ip.run_cell('a = x_invalid_id_x')
526 527 self.assertFalse(ip.last_execution_succeeded)
527 528 self.assertFalse(ip.last_execution_result.success)
528 529 self.assertIsInstance(ip.last_execution_result.error_in_exec, NameError)
529 530
530 531 def test_reset_aliasing(self):
531 532 """ Check that standard posix aliases work after %reset. """
532 533 if os.name != 'posix':
533 534 return
534 535
535 536 ip.reset()
536 537 for cmd in ('clear', 'more', 'less', 'man'):
537 538 res = ip.run_cell('%' + cmd)
538 539 self.assertEqual(res.success, True)
539 540
540 541
541 542 class TestSafeExecfileNonAsciiPath(unittest.TestCase):
542 543
543 544 @onlyif_unicode_paths
544 545 def setUp(self):
545 546 self.BASETESTDIR = tempfile.mkdtemp()
546 547 self.TESTDIR = join(self.BASETESTDIR, u"Γ₯Àâ")
547 548 os.mkdir(self.TESTDIR)
548 549 with open(
549 550 join(self.TESTDIR, u"Γ₯Àâtestscript.py"), "w", encoding="utf-8"
550 551 ) as sfile:
551 552 sfile.write("pass\n")
552 553 self.oldpath = os.getcwd()
553 554 os.chdir(self.TESTDIR)
554 555 self.fname = u"Γ₯Àâtestscript.py"
555 556
556 557 def tearDown(self):
557 558 os.chdir(self.oldpath)
558 559 shutil.rmtree(self.BASETESTDIR)
559 560
560 561 @onlyif_unicode_paths
561 562 def test_1(self):
562 563 """Test safe_execfile with non-ascii path
563 564 """
564 565 ip.safe_execfile(self.fname, {}, raise_exceptions=True)
565 566
566 567 class ExitCodeChecks(tt.TempFileMixin):
567 568
568 569 def setUp(self):
569 570 self.system = ip.system_raw
570 571
571 572 def test_exit_code_ok(self):
572 573 self.system('exit 0')
573 574 self.assertEqual(ip.user_ns['_exit_code'], 0)
574 575
575 576 def test_exit_code_error(self):
576 577 self.system('exit 1')
577 578 self.assertEqual(ip.user_ns['_exit_code'], 1)
578 579
579 580 @skipif(not hasattr(signal, 'SIGALRM'))
580 581 def test_exit_code_signal(self):
581 582 self.mktmp("import signal, time\n"
582 583 "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
583 584 "time.sleep(1)\n")
584 585 self.system("%s %s" % (sys.executable, self.fname))
585 586 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
586 587
587 588 @onlyif_cmds_exist("csh")
588 def test_exit_code_signal_csh(self):
589 SHELL = os.environ.get('SHELL', None)
590 os.environ['SHELL'] = find_cmd("csh")
589 def test_exit_code_signal_csh(self): # pragma: no cover
590 SHELL = os.environ.get("SHELL", None)
591 os.environ["SHELL"] = find_cmd("csh")
591 592 try:
592 593 self.test_exit_code_signal()
593 594 finally:
594 595 if SHELL is not None:
595 596 os.environ['SHELL'] = SHELL
596 597 else:
597 598 del os.environ['SHELL']
598 599
599 600
600 601 class TestSystemRaw(ExitCodeChecks):
601 602
602 603 def setUp(self):
603 604 super().setUp()
604 605 self.system = ip.system_raw
605 606
606 607 @onlyif_unicode_paths
607 608 def test_1(self):
608 609 """Test system_raw with non-ascii cmd
609 610 """
610 611 cmd = u'''python -c "'Γ₯Àâ'" '''
611 612 ip.system_raw(cmd)
612 613
613 614 @mock.patch('subprocess.call', side_effect=KeyboardInterrupt)
614 615 @mock.patch('os.system', side_effect=KeyboardInterrupt)
615 616 def test_control_c(self, *mocks):
616 617 try:
617 618 self.system("sleep 1 # wont happen")
618 except KeyboardInterrupt:
619 except KeyboardInterrupt: # pragma: no cove
619 620 self.fail(
620 621 "system call should intercept "
621 622 "keyboard interrupt from subprocess.call"
622 623 )
623 624 self.assertEqual(ip.user_ns["_exit_code"], -signal.SIGINT)
624 625
625 626 def test_magic_warnings(self):
626 627 for magic_cmd in ("pip", "conda", "cd"):
627 628 with self.assertWarnsRegex(Warning, "You executed the system command"):
628 629 ip.system_raw(magic_cmd)
629 630
630 631 # TODO: Exit codes are currently ignored on Windows.
631 632 class TestSystemPipedExitCode(ExitCodeChecks):
632 633
633 634 def setUp(self):
634 635 super().setUp()
635 636 self.system = ip.system_piped
636 637
637 638 @skip_win32
638 639 def test_exit_code_ok(self):
639 640 ExitCodeChecks.test_exit_code_ok(self)
640 641
641 642 @skip_win32
642 643 def test_exit_code_error(self):
643 644 ExitCodeChecks.test_exit_code_error(self)
644 645
645 646 @skip_win32
646 647 def test_exit_code_signal(self):
647 648 ExitCodeChecks.test_exit_code_signal(self)
648 649
649 650 class TestModules(tt.TempFileMixin):
650 651 def test_extraneous_loads(self):
651 652 """Test we're not loading modules on startup that we shouldn't.
652 653 """
653 654 self.mktmp("import sys\n"
654 655 "print('numpy' in sys.modules)\n"
655 656 "print('ipyparallel' in sys.modules)\n"
656 657 "print('ipykernel' in sys.modules)\n"
657 658 )
658 659 out = "False\nFalse\nFalse\n"
659 660 tt.ipexec_validate(self.fname, out)
660 661
661 662 class Negator(ast.NodeTransformer):
662 663 """Negates all number literals in an AST."""
663 664
664 665 # for python 3.7 and earlier
665 666 def visit_Num(self, node):
666 667 node.n = -node.n
667 668 return node
668 669
669 670 # for python 3.8+
670 671 def visit_Constant(self, node):
671 672 if isinstance(node.value, int):
672 673 return self.visit_Num(node)
673 674 return node
674 675
675 676 class TestAstTransform(unittest.TestCase):
676 677 def setUp(self):
677 678 self.negator = Negator()
678 679 ip.ast_transformers.append(self.negator)
679 680
680 681 def tearDown(self):
681 682 ip.ast_transformers.remove(self.negator)
682
683
684 def test_non_int_const(self):
685 with tt.AssertPrints("hello"):
686 ip.run_cell('print("hello")')
687
683 688 def test_run_cell(self):
684 with tt.AssertPrints('-34'):
685 ip.run_cell('print (12 + 22)')
686
689 with tt.AssertPrints("-34"):
690 ip.run_cell("print(12 + 22)")
691
687 692 # A named reference to a number shouldn't be transformed.
688 ip.user_ns['n'] = 55
689 with tt.AssertNotPrints('-55'):
690 ip.run_cell('print (n)')
691
693 ip.user_ns["n"] = 55
694 with tt.AssertNotPrints("-55"):
695 ip.run_cell("print(n)")
696
692 697 def test_timeit(self):
693 698 called = set()
694 699 def f(x):
695 700 called.add(x)
696 701 ip.push({'f':f})
697 702
698 703 with tt.AssertPrints("std. dev. of"):
699 704 ip.run_line_magic("timeit", "-n1 f(1)")
700 705 self.assertEqual(called, {-1})
701 706 called.clear()
702 707
703 708 with tt.AssertPrints("std. dev. of"):
704 709 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
705 710 self.assertEqual(called, {-2, -3})
706 711
707 712 def test_time(self):
708 713 called = []
709 714 def f(x):
710 715 called.append(x)
711 716 ip.push({'f':f})
712 717
713 718 # Test with an expression
714 719 with tt.AssertPrints("Wall time: "):
715 720 ip.run_line_magic("time", "f(5+9)")
716 721 self.assertEqual(called, [-14])
717 722 called[:] = []
718 723
719 724 # Test with a statement (different code path)
720 725 with tt.AssertPrints("Wall time: "):
721 726 ip.run_line_magic("time", "a = f(-3 + -2)")
722 727 self.assertEqual(called, [5])
723 728
724 729 def test_macro(self):
725 730 ip.push({'a':10})
726 731 # The AST transformation makes this do a+=-1
727 732 ip.define_macro("amacro", "a+=1\nprint(a)")
728 733
729 734 with tt.AssertPrints("9"):
730 735 ip.run_cell("amacro")
731 736 with tt.AssertPrints("8"):
732 737 ip.run_cell("amacro")
733 738
734 739 class TestMiscTransform(unittest.TestCase):
735 740
736 741
737 742 def test_transform_only_once(self):
738 743 cleanup = 0
739 744 line_t = 0
740 745 def count_cleanup(lines):
741 746 nonlocal cleanup
742 747 cleanup += 1
743 748 return lines
744 749
745 750 def count_line_t(lines):
746 751 nonlocal line_t
747 752 line_t += 1
748 753 return lines
749 754
750 755 ip.input_transformer_manager.cleanup_transforms.append(count_cleanup)
751 756 ip.input_transformer_manager.line_transforms.append(count_line_t)
752 757
753 758 ip.run_cell('1')
754 759
755 760 assert cleanup == 1
756 761 assert line_t == 1
757 762
758 763 class IntegerWrapper(ast.NodeTransformer):
759 764 """Wraps all integers in a call to Integer()"""
760 765
761 766 # for Python 3.7 and earlier
762 767
763 768 # for Python 3.7 and earlier
764 769 def visit_Num(self, node):
765 770 if isinstance(node.n, int):
766 771 return ast.Call(func=ast.Name(id='Integer', ctx=ast.Load()),
767 772 args=[node], keywords=[])
768 773 return node
769 774
770 775 # For Python 3.8+
771 776 def visit_Constant(self, node):
772 777 if isinstance(node.value, int):
773 778 return self.visit_Num(node)
774 779 return node
775 780
776 781
777 782 class TestAstTransform2(unittest.TestCase):
778 783 def setUp(self):
779 784 self.intwrapper = IntegerWrapper()
780 785 ip.ast_transformers.append(self.intwrapper)
781 786
782 787 self.calls = []
783 788 def Integer(*args):
784 789 self.calls.append(args)
785 790 return args
786 791 ip.push({"Integer": Integer})
787 792
788 793 def tearDown(self):
789 794 ip.ast_transformers.remove(self.intwrapper)
790 795 del ip.user_ns['Integer']
791 796
792 797 def test_run_cell(self):
793 798 ip.run_cell("n = 2")
794 799 self.assertEqual(self.calls, [(2,)])
795 800
796 801 # This shouldn't throw an error
797 802 ip.run_cell("o = 2.0")
798 803 self.assertEqual(ip.user_ns['o'], 2.0)
799
804
805 def test_run_cell_non_int(self):
806 ip.run_cell("n = 'a'")
807 assert self.calls == []
808
800 809 def test_timeit(self):
801 810 called = set()
802 811 def f(x):
803 812 called.add(x)
804 813 ip.push({'f':f})
805 814
806 815 with tt.AssertPrints("std. dev. of"):
807 816 ip.run_line_magic("timeit", "-n1 f(1)")
808 817 self.assertEqual(called, {(1,)})
809 818 called.clear()
810 819
811 820 with tt.AssertPrints("std. dev. of"):
812 821 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
813 822 self.assertEqual(called, {(2,), (3,)})
814 823
815 824 class ErrorTransformer(ast.NodeTransformer):
816 825 """Throws an error when it sees a number."""
817 826
818 # for Python 3.7 and earlier
819 def visit_Num(self, node):
820 raise ValueError("test")
821
822 # for Python 3.8+
823 827 def visit_Constant(self, node):
824 828 if isinstance(node.value, int):
825 return self.visit_Num(node)
829 raise ValueError("test")
826 830 return node
827 831
828 832
829 833 class TestAstTransformError(unittest.TestCase):
830 834 def test_unregistering(self):
831 835 err_transformer = ErrorTransformer()
832 836 ip.ast_transformers.append(err_transformer)
833 837
834 838 with self.assertWarnsRegex(UserWarning, "It will be unregistered"):
835 839 ip.run_cell("1 + 2")
836 840
837 841 # This should have been removed.
838 842 self.assertNotIn(err_transformer, ip.ast_transformers)
839 843
840 844
841 845 class StringRejector(ast.NodeTransformer):
842 846 """Throws an InputRejected when it sees a string literal.
843 847
844 848 Used to verify that NodeTransformers can signal that a piece of code should
845 849 not be executed by throwing an InputRejected.
846 850 """
847 851
848 #for python 3.7 and earlier
849 def visit_Str(self, node):
850 raise InputRejected("test")
851
852 852 # 3.8 only
853 853 def visit_Constant(self, node):
854 854 if isinstance(node.value, str):
855 855 raise InputRejected("test")
856 856 return node
857 857
858 858
859 859 class TestAstTransformInputRejection(unittest.TestCase):
860 860
861 861 def setUp(self):
862 862 self.transformer = StringRejector()
863 863 ip.ast_transformers.append(self.transformer)
864 864
865 865 def tearDown(self):
866 866 ip.ast_transformers.remove(self.transformer)
867 867
868 868 def test_input_rejection(self):
869 869 """Check that NodeTransformers can reject input."""
870 870
871 871 expect_exception_tb = tt.AssertPrints("InputRejected: test")
872 872 expect_no_cell_output = tt.AssertNotPrints("'unsafe'", suppress=False)
873 873
874 874 # Run the same check twice to verify that the transformer is not
875 875 # disabled after raising.
876 876 with expect_exception_tb, expect_no_cell_output:
877 877 ip.run_cell("'unsafe'")
878 878
879 879 with expect_exception_tb, expect_no_cell_output:
880 880 res = ip.run_cell("'unsafe'")
881 881
882 882 self.assertIsInstance(res.error_before_exec, InputRejected)
883 883
884 884 def test__IPYTHON__():
885 885 # This shouldn't raise a NameError, that's all
886 886 __IPYTHON__
887 887
888 888
889 889 class DummyRepr(object):
890 890 def __repr__(self):
891 891 return "DummyRepr"
892 892
893 893 def _repr_html_(self):
894 894 return "<b>dummy</b>"
895 895
896 896 def _repr_javascript_(self):
897 897 return "console.log('hi');", {'key': 'value'}
898 898
899 899
900 900 def test_user_variables():
901 901 # enable all formatters
902 902 ip.display_formatter.active_types = ip.display_formatter.format_types
903 903
904 904 ip.user_ns['dummy'] = d = DummyRepr()
905 905 keys = {'dummy', 'doesnotexist'}
906 906 r = ip.user_expressions({ key:key for key in keys})
907 907
908 908 assert keys == set(r.keys())
909 909 dummy = r["dummy"]
910 910 assert {"status", "data", "metadata"} == set(dummy.keys())
911 911 assert dummy["status"] == "ok"
912 912 data = dummy["data"]
913 913 metadata = dummy["metadata"]
914 914 assert data.get("text/html") == d._repr_html_()
915 915 js, jsmd = d._repr_javascript_()
916 916 assert data.get("application/javascript") == js
917 917 assert metadata.get("application/javascript") == jsmd
918 918
919 919 dne = r["doesnotexist"]
920 920 assert dne["status"] == "error"
921 921 assert dne["ename"] == "NameError"
922 922
923 923 # back to text only
924 924 ip.display_formatter.active_types = ['text/plain']
925 925
926 926 def test_user_expression():
927 927 # enable all formatters
928 928 ip.display_formatter.active_types = ip.display_formatter.format_types
929 929 query = {
930 930 'a' : '1 + 2',
931 931 'b' : '1/0',
932 932 }
933 933 r = ip.user_expressions(query)
934 934 import pprint
935 935 pprint.pprint(r)
936 936 assert set(r.keys()) == set(query.keys())
937 937 a = r["a"]
938 938 assert {"status", "data", "metadata"} == set(a.keys())
939 939 assert a["status"] == "ok"
940 940 data = a["data"]
941 941 metadata = a["metadata"]
942 942 assert data.get("text/plain") == "3"
943 943
944 944 b = r["b"]
945 945 assert b["status"] == "error"
946 946 assert b["ename"] == "ZeroDivisionError"
947 947
948 948 # back to text only
949 949 ip.display_formatter.active_types = ['text/plain']
950 950
951 951
952 952 class TestSyntaxErrorTransformer(unittest.TestCase):
953 953 """Check that SyntaxError raised by an input transformer is handled by run_cell()"""
954 954
955 955 @staticmethod
956 956 def transformer(lines):
957 957 for line in lines:
958 958 pos = line.find('syntaxerror')
959 959 if pos >= 0:
960 960 e = SyntaxError('input contains "syntaxerror"')
961 961 e.text = line
962 962 e.offset = pos + 1
963 963 raise e
964 964 return lines
965 965
966 966 def setUp(self):
967 967 ip.input_transformers_post.append(self.transformer)
968 968
969 969 def tearDown(self):
970 970 ip.input_transformers_post.remove(self.transformer)
971 971
972 972 def test_syntaxerror_input_transformer(self):
973 973 with tt.AssertPrints('1234'):
974 974 ip.run_cell('1234')
975 975 with tt.AssertPrints('SyntaxError: invalid syntax'):
976 976 ip.run_cell('1 2 3') # plain python syntax error
977 977 with tt.AssertPrints('SyntaxError: input contains "syntaxerror"'):
978 978 ip.run_cell('2345 # syntaxerror') # input transformer syntax error
979 979 with tt.AssertPrints('3456'):
980 980 ip.run_cell('3456')
981 981
982 982
983 983 class TestWarningSuppression(unittest.TestCase):
984 984 def test_warning_suppression(self):
985 985 ip.run_cell("import warnings")
986 986 try:
987 987 with self.assertWarnsRegex(UserWarning, "asdf"):
988 988 ip.run_cell("warnings.warn('asdf')")
989 989 # Here's the real test -- if we run that again, we should get the
990 990 # warning again. Traditionally, each warning was only issued once per
991 991 # IPython session (approximately), even if the user typed in new and
992 992 # different code that should have also triggered the warning, leading
993 993 # to much confusion.
994 994 with self.assertWarnsRegex(UserWarning, "asdf"):
995 995 ip.run_cell("warnings.warn('asdf')")
996 996 finally:
997 997 ip.run_cell("del warnings")
998 998
999 999
1000 1000 def test_deprecation_warning(self):
1001 1001 ip.run_cell("""
1002 1002 import warnings
1003 1003 def wrn():
1004 1004 warnings.warn(
1005 1005 "I AM A WARNING",
1006 1006 DeprecationWarning
1007 1007 )
1008 1008 """)
1009 1009 try:
1010 1010 with self.assertWarnsRegex(DeprecationWarning, "I AM A WARNING"):
1011 1011 ip.run_cell("wrn()")
1012 1012 finally:
1013 1013 ip.run_cell("del warnings")
1014 1014 ip.run_cell("del wrn")
1015 1015
1016 1016
1017 1017 class TestImportNoDeprecate(tt.TempFileMixin):
1018 1018
1019 1019 def setUp(self):
1020 1020 """Make a valid python temp file."""
1021 1021 self.mktmp("""
1022 1022 import warnings
1023 1023 def wrn():
1024 1024 warnings.warn(
1025 1025 "I AM A WARNING",
1026 1026 DeprecationWarning
1027 1027 )
1028 1028 """)
1029 1029 super().setUp()
1030 1030
1031 1031 def test_no_dep(self):
1032 1032 """
1033 1033 No deprecation warning should be raised from imported functions
1034 1034 """
1035 1035 ip.run_cell("from {} import wrn".format(self.fname))
1036 1036
1037 1037 with tt.AssertNotPrints("I AM A WARNING"):
1038 1038 ip.run_cell("wrn()")
1039 1039 ip.run_cell("del wrn")
1040 1040
1041 1041
1042 1042 def test_custom_exc_count():
1043 1043 hook = mock.Mock(return_value=None)
1044 1044 ip.set_custom_exc((SyntaxError,), hook)
1045 1045 before = ip.execution_count
1046 1046 ip.run_cell("def foo()", store_history=True)
1047 1047 # restore default excepthook
1048 1048 ip.set_custom_exc((), None)
1049 1049 assert hook.call_count == 1
1050 1050 assert ip.execution_count == before + 1
1051 1051
1052 1052
1053 1053 def test_run_cell_async():
1054 1054 ip.run_cell("import asyncio")
1055 1055 coro = ip.run_cell_async("await asyncio.sleep(0.01)\n5")
1056 1056 assert asyncio.iscoroutine(coro)
1057 1057 loop = asyncio.new_event_loop()
1058 1058 result = loop.run_until_complete(coro)
1059 1059 assert isinstance(result, interactiveshell.ExecutionResult)
1060 1060 assert result.result == 5
1061 1061
1062 1062
1063 1063 def test_run_cell_await():
1064 1064 ip.run_cell("import asyncio")
1065 1065 result = ip.run_cell("await asyncio.sleep(0.01); 10")
1066 1066 assert ip.user_ns["_"] == 10
1067 1067
1068 1068
1069 1069 def test_run_cell_asyncio_run():
1070 1070 ip.run_cell("import asyncio")
1071 1071 result = ip.run_cell("await asyncio.sleep(0.01); 1")
1072 1072 assert ip.user_ns["_"] == 1
1073 1073 result = ip.run_cell("asyncio.run(asyncio.sleep(0.01)); 2")
1074 1074 assert ip.user_ns["_"] == 2
1075 1075 result = ip.run_cell("await asyncio.sleep(0.01); 3")
1076 1076 assert ip.user_ns["_"] == 3
1077 1077
1078 1078
1079 1079 def test_should_run_async():
1080 1080 assert not ip.should_run_async("a = 5")
1081 1081 assert ip.should_run_async("await x")
1082 1082 assert ip.should_run_async("import asyncio; await asyncio.sleep(1)")
1083 1083
1084 1084
1085 1085 def test_set_custom_completer():
1086 1086 num_completers = len(ip.Completer.matchers)
1087 1087
1088 1088 def foo(*args, **kwargs):
1089 1089 return "I'm a completer!"
1090 1090
1091 1091 ip.set_custom_completer(foo, 0)
1092 1092
1093 1093 # check that we've really added a new completer
1094 1094 assert len(ip.Completer.matchers) == num_completers + 1
1095 1095
1096 1096 # check that the first completer is the function we defined
1097 1097 assert ip.Completer.matchers[0]() == "I'm a completer!"
1098 1098
1099 1099 # clean up
1100 1100 ip.Completer.custom_matchers.pop()
General Comments 0
You need to be logged in to leave comments. Login now