##// END OF EJS Templates
Return current node instead of None
Matthias Bussonnier -
Show More
@@ -1,981 +1,985 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for the key interactiveshell module.
3 3
4 4 Historically the main classes in interactiveshell have been under-tested. This
5 5 module should grow as many single-method tests as possible to trap many of the
6 6 recurring bugs we seem to encounter with high-level interaction.
7 7 """
8 8
9 9 # Copyright (c) IPython Development Team.
10 10 # Distributed under the terms of the Modified BSD License.
11 11
12 12 import asyncio
13 13 import ast
14 14 import os
15 15 import signal
16 16 import shutil
17 17 import sys
18 18 import tempfile
19 19 import unittest
20 20 from unittest import mock
21 21
22 22 from os.path import join
23 23
24 24 import nose.tools as nt
25 25
26 26 from IPython.core.error import InputRejected
27 27 from IPython.core.inputtransformer import InputTransformer
28 28 from IPython.core import interactiveshell
29 29 from IPython.testing.decorators import (
30 30 skipif, skip_win32, onlyif_unicode_paths, onlyif_cmds_exist,
31 31 )
32 32 from IPython.testing import tools as tt
33 33 from IPython.utils.process import find_cmd
34 34
35 35 #-----------------------------------------------------------------------------
36 36 # Globals
37 37 #-----------------------------------------------------------------------------
38 38 # This is used by every single test, no point repeating it ad nauseam
39 39 ip = get_ipython()
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Tests
43 43 #-----------------------------------------------------------------------------
44 44
45 45 class DerivedInterrupt(KeyboardInterrupt):
46 46 pass
47 47
48 48 class InteractiveShellTestCase(unittest.TestCase):
49 49 def test_naked_string_cells(self):
50 50 """Test that cells with only naked strings are fully executed"""
51 51 # First, single-line inputs
52 52 ip.run_cell('"a"\n')
53 53 self.assertEqual(ip.user_ns['_'], 'a')
54 54 # And also multi-line cells
55 55 ip.run_cell('"""a\nb"""\n')
56 56 self.assertEqual(ip.user_ns['_'], 'a\nb')
57 57
58 58 def test_run_empty_cell(self):
59 59 """Just make sure we don't get a horrible error with a blank
60 60 cell of input. Yes, I did overlook that."""
61 61 old_xc = ip.execution_count
62 62 res = ip.run_cell('')
63 63 self.assertEqual(ip.execution_count, old_xc)
64 64 self.assertEqual(res.execution_count, None)
65 65
66 66 def test_run_cell_multiline(self):
67 67 """Multi-block, multi-line cells must execute correctly.
68 68 """
69 69 src = '\n'.join(["x=1",
70 70 "y=2",
71 71 "if 1:",
72 72 " x += 1",
73 73 " y += 1",])
74 74 res = ip.run_cell(src)
75 75 self.assertEqual(ip.user_ns['x'], 2)
76 76 self.assertEqual(ip.user_ns['y'], 3)
77 77 self.assertEqual(res.success, True)
78 78 self.assertEqual(res.result, None)
79 79
80 80 def test_multiline_string_cells(self):
81 81 "Code sprinkled with multiline strings should execute (GH-306)"
82 82 ip.run_cell('tmp=0')
83 83 self.assertEqual(ip.user_ns['tmp'], 0)
84 84 res = ip.run_cell('tmp=1;"""a\nb"""\n')
85 85 self.assertEqual(ip.user_ns['tmp'], 1)
86 86 self.assertEqual(res.success, True)
87 87 self.assertEqual(res.result, "a\nb")
88 88
89 89 def test_dont_cache_with_semicolon(self):
90 90 "Ending a line with semicolon should not cache the returned object (GH-307)"
91 91 oldlen = len(ip.user_ns['Out'])
92 92 for cell in ['1;', '1;1;']:
93 93 res = ip.run_cell(cell, store_history=True)
94 94 newlen = len(ip.user_ns['Out'])
95 95 self.assertEqual(oldlen, newlen)
96 96 self.assertIsNone(res.result)
97 97 i = 0
98 98 #also test the default caching behavior
99 99 for cell in ['1', '1;1']:
100 100 ip.run_cell(cell, store_history=True)
101 101 newlen = len(ip.user_ns['Out'])
102 102 i += 1
103 103 self.assertEqual(oldlen+i, newlen)
104 104
105 105 def test_syntax_error(self):
106 106 res = ip.run_cell("raise = 3")
107 107 self.assertIsInstance(res.error_before_exec, SyntaxError)
108 108
109 109 def test_In_variable(self):
110 110 "Verify that In variable grows with user input (GH-284)"
111 111 oldlen = len(ip.user_ns['In'])
112 112 ip.run_cell('1;', store_history=True)
113 113 newlen = len(ip.user_ns['In'])
114 114 self.assertEqual(oldlen+1, newlen)
115 115 self.assertEqual(ip.user_ns['In'][-1],'1;')
116 116
117 117 def test_magic_names_in_string(self):
118 118 ip.run_cell('a = """\n%exit\n"""')
119 119 self.assertEqual(ip.user_ns['a'], '\n%exit\n')
120 120
121 121 def test_trailing_newline(self):
122 122 """test that running !(command) does not raise a SyntaxError"""
123 123 ip.run_cell('!(true)\n', False)
124 124 ip.run_cell('!(true)\n\n\n', False)
125 125
126 126 def test_gh_597(self):
127 127 """Pretty-printing lists of objects with non-ascii reprs may cause
128 128 problems."""
129 129 class Spam(object):
130 130 def __repr__(self):
131 131 return "\xe9"*50
132 132 import IPython.core.formatters
133 133 f = IPython.core.formatters.PlainTextFormatter()
134 134 f([Spam(),Spam()])
135 135
136 136
137 137 def test_future_flags(self):
138 138 """Check that future flags are used for parsing code (gh-777)"""
139 139 ip.run_cell('from __future__ import barry_as_FLUFL')
140 140 try:
141 141 ip.run_cell('prfunc_return_val = 1 <> 2')
142 142 assert 'prfunc_return_val' in ip.user_ns
143 143 finally:
144 144 # Reset compiler flags so we don't mess up other tests.
145 145 ip.compile.reset_compiler_flags()
146 146
147 147 def test_can_pickle(self):
148 148 "Can we pickle objects defined interactively (GH-29)"
149 149 ip = get_ipython()
150 150 ip.reset()
151 151 ip.run_cell(("class Mylist(list):\n"
152 152 " def __init__(self,x=[]):\n"
153 153 " list.__init__(self,x)"))
154 154 ip.run_cell("w=Mylist([1,2,3])")
155 155
156 156 from pickle import dumps
157 157
158 158 # We need to swap in our main module - this is only necessary
159 159 # inside the test framework, because IPython puts the interactive module
160 160 # in place (but the test framework undoes this).
161 161 _main = sys.modules['__main__']
162 162 sys.modules['__main__'] = ip.user_module
163 163 try:
164 164 res = dumps(ip.user_ns["w"])
165 165 finally:
166 166 sys.modules['__main__'] = _main
167 167 self.assertTrue(isinstance(res, bytes))
168 168
169 169 def test_global_ns(self):
170 170 "Code in functions must be able to access variables outside them."
171 171 ip = get_ipython()
172 172 ip.run_cell("a = 10")
173 173 ip.run_cell(("def f(x):\n"
174 174 " return x + a"))
175 175 ip.run_cell("b = f(12)")
176 176 self.assertEqual(ip.user_ns["b"], 22)
177 177
178 178 def test_bad_custom_tb(self):
179 179 """Check that InteractiveShell is protected from bad custom exception handlers"""
180 180 ip.set_custom_exc((IOError,), lambda etype,value,tb: 1/0)
181 181 self.assertEqual(ip.custom_exceptions, (IOError,))
182 182 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
183 183 ip.run_cell(u'raise IOError("foo")')
184 184 self.assertEqual(ip.custom_exceptions, ())
185 185
186 186 def test_bad_custom_tb_return(self):
187 187 """Check that InteractiveShell is protected from bad return types in custom exception handlers"""
188 188 ip.set_custom_exc((NameError,),lambda etype,value,tb, tb_offset=None: 1)
189 189 self.assertEqual(ip.custom_exceptions, (NameError,))
190 190 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
191 191 ip.run_cell(u'a=abracadabra')
192 192 self.assertEqual(ip.custom_exceptions, ())
193 193
194 194 def test_drop_by_id(self):
195 195 myvars = {"a":object(), "b":object(), "c": object()}
196 196 ip.push(myvars, interactive=False)
197 197 for name in myvars:
198 198 assert name in ip.user_ns, name
199 199 assert name in ip.user_ns_hidden, name
200 200 ip.user_ns['b'] = 12
201 201 ip.drop_by_id(myvars)
202 202 for name in ["a", "c"]:
203 203 assert name not in ip.user_ns, name
204 204 assert name not in ip.user_ns_hidden, name
205 205 assert ip.user_ns['b'] == 12
206 206 ip.reset()
207 207
208 208 def test_var_expand(self):
209 209 ip.user_ns['f'] = u'Ca\xf1o'
210 210 self.assertEqual(ip.var_expand(u'echo $f'), u'echo Ca\xf1o')
211 211 self.assertEqual(ip.var_expand(u'echo {f}'), u'echo Ca\xf1o')
212 212 self.assertEqual(ip.var_expand(u'echo {f[:-1]}'), u'echo Ca\xf1')
213 213 self.assertEqual(ip.var_expand(u'echo {1*2}'), u'echo 2')
214 214
215 215 self.assertEqual(ip.var_expand(u"grep x | awk '{print $1}'"), u"grep x | awk '{print $1}'")
216 216
217 217 ip.user_ns['f'] = b'Ca\xc3\xb1o'
218 218 # This should not raise any exception:
219 219 ip.var_expand(u'echo $f')
220 220
221 221 def test_var_expand_local(self):
222 222 """Test local variable expansion in !system and %magic calls"""
223 223 # !system
224 224 ip.run_cell('def test():\n'
225 225 ' lvar = "ttt"\n'
226 226 ' ret = !echo {lvar}\n'
227 227 ' return ret[0]\n')
228 228 res = ip.user_ns['test']()
229 229 nt.assert_in('ttt', res)
230 230
231 231 # %magic
232 232 ip.run_cell('def makemacro():\n'
233 233 ' macroname = "macro_var_expand_locals"\n'
234 234 ' %macro {macroname} codestr\n')
235 235 ip.user_ns['codestr'] = "str(12)"
236 236 ip.run_cell('makemacro()')
237 237 nt.assert_in('macro_var_expand_locals', ip.user_ns)
238 238
239 239 def test_var_expand_self(self):
240 240 """Test variable expansion with the name 'self', which was failing.
241 241
242 242 See https://github.com/ipython/ipython/issues/1878#issuecomment-7698218
243 243 """
244 244 ip.run_cell('class cTest:\n'
245 245 ' classvar="see me"\n'
246 246 ' def test(self):\n'
247 247 ' res = !echo Variable: {self.classvar}\n'
248 248 ' return res[0]\n')
249 249 nt.assert_in('see me', ip.user_ns['cTest']().test())
250 250
251 251 def test_bad_var_expand(self):
252 252 """var_expand on invalid formats shouldn't raise"""
253 253 # SyntaxError
254 254 self.assertEqual(ip.var_expand(u"{'a':5}"), u"{'a':5}")
255 255 # NameError
256 256 self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}")
257 257 # ZeroDivisionError
258 258 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
259 259
260 260 def test_silent_postexec(self):
261 261 """run_cell(silent=True) doesn't invoke pre/post_run_cell callbacks"""
262 262 pre_explicit = mock.Mock()
263 263 pre_always = mock.Mock()
264 264 post_explicit = mock.Mock()
265 265 post_always = mock.Mock()
266 266 all_mocks = [pre_explicit, pre_always, post_explicit, post_always]
267 267
268 268 ip.events.register('pre_run_cell', pre_explicit)
269 269 ip.events.register('pre_execute', pre_always)
270 270 ip.events.register('post_run_cell', post_explicit)
271 271 ip.events.register('post_execute', post_always)
272 272
273 273 try:
274 274 ip.run_cell("1", silent=True)
275 275 assert pre_always.called
276 276 assert not pre_explicit.called
277 277 assert post_always.called
278 278 assert not post_explicit.called
279 279 # double-check that non-silent exec did what we expected
280 280 # silent to avoid
281 281 ip.run_cell("1")
282 282 assert pre_explicit.called
283 283 assert post_explicit.called
284 284 info, = pre_explicit.call_args[0]
285 285 result, = post_explicit.call_args[0]
286 286 self.assertEqual(info, result.info)
287 287 # check that post hooks are always called
288 288 [m.reset_mock() for m in all_mocks]
289 289 ip.run_cell("syntax error")
290 290 assert pre_always.called
291 291 assert pre_explicit.called
292 292 assert post_always.called
293 293 assert post_explicit.called
294 294 info, = pre_explicit.call_args[0]
295 295 result, = post_explicit.call_args[0]
296 296 self.assertEqual(info, result.info)
297 297 finally:
298 298 # remove post-exec
299 299 ip.events.unregister('pre_run_cell', pre_explicit)
300 300 ip.events.unregister('pre_execute', pre_always)
301 301 ip.events.unregister('post_run_cell', post_explicit)
302 302 ip.events.unregister('post_execute', post_always)
303 303
304 304 def test_silent_noadvance(self):
305 305 """run_cell(silent=True) doesn't advance execution_count"""
306 306 ec = ip.execution_count
307 307 # silent should force store_history=False
308 308 ip.run_cell("1", store_history=True, silent=True)
309 309
310 310 self.assertEqual(ec, ip.execution_count)
311 311 # double-check that non-silent exec did what we expected
312 312 # silent to avoid
313 313 ip.run_cell("1", store_history=True)
314 314 self.assertEqual(ec+1, ip.execution_count)
315 315
316 316 def test_silent_nodisplayhook(self):
317 317 """run_cell(silent=True) doesn't trigger displayhook"""
318 318 d = dict(called=False)
319 319
320 320 trap = ip.display_trap
321 321 save_hook = trap.hook
322 322
323 323 def failing_hook(*args, **kwargs):
324 324 d['called'] = True
325 325
326 326 try:
327 327 trap.hook = failing_hook
328 328 res = ip.run_cell("1", silent=True)
329 329 self.assertFalse(d['called'])
330 330 self.assertIsNone(res.result)
331 331 # double-check that non-silent exec did what we expected
332 332 # silent to avoid
333 333 ip.run_cell("1")
334 334 self.assertTrue(d['called'])
335 335 finally:
336 336 trap.hook = save_hook
337 337
338 338 def test_ofind_line_magic(self):
339 339 from IPython.core.magic import register_line_magic
340 340
341 341 @register_line_magic
342 342 def lmagic(line):
343 343 "A line magic"
344 344
345 345 # Get info on line magic
346 346 lfind = ip._ofind('lmagic')
347 347 info = dict(found=True, isalias=False, ismagic=True,
348 348 namespace = 'IPython internal', obj= lmagic.__wrapped__,
349 349 parent = None)
350 350 nt.assert_equal(lfind, info)
351 351
352 352 def test_ofind_cell_magic(self):
353 353 from IPython.core.magic import register_cell_magic
354 354
355 355 @register_cell_magic
356 356 def cmagic(line, cell):
357 357 "A cell magic"
358 358
359 359 # Get info on cell magic
360 360 find = ip._ofind('cmagic')
361 361 info = dict(found=True, isalias=False, ismagic=True,
362 362 namespace = 'IPython internal', obj= cmagic.__wrapped__,
363 363 parent = None)
364 364 nt.assert_equal(find, info)
365 365
366 366 def test_ofind_property_with_error(self):
367 367 class A(object):
368 368 @property
369 369 def foo(self):
370 370 raise NotImplementedError()
371 371 a = A()
372 372
373 373 found = ip._ofind('a.foo', [('locals', locals())])
374 374 info = dict(found=True, isalias=False, ismagic=False,
375 375 namespace='locals', obj=A.foo, parent=a)
376 376 nt.assert_equal(found, info)
377 377
378 378 def test_ofind_multiple_attribute_lookups(self):
379 379 class A(object):
380 380 @property
381 381 def foo(self):
382 382 raise NotImplementedError()
383 383
384 384 a = A()
385 385 a.a = A()
386 386 a.a.a = A()
387 387
388 388 found = ip._ofind('a.a.a.foo', [('locals', locals())])
389 389 info = dict(found=True, isalias=False, ismagic=False,
390 390 namespace='locals', obj=A.foo, parent=a.a.a)
391 391 nt.assert_equal(found, info)
392 392
393 393 def test_ofind_slotted_attributes(self):
394 394 class A(object):
395 395 __slots__ = ['foo']
396 396 def __init__(self):
397 397 self.foo = 'bar'
398 398
399 399 a = A()
400 400 found = ip._ofind('a.foo', [('locals', locals())])
401 401 info = dict(found=True, isalias=False, ismagic=False,
402 402 namespace='locals', obj=a.foo, parent=a)
403 403 nt.assert_equal(found, info)
404 404
405 405 found = ip._ofind('a.bar', [('locals', locals())])
406 406 info = dict(found=False, isalias=False, ismagic=False,
407 407 namespace=None, obj=None, parent=a)
408 408 nt.assert_equal(found, info)
409 409
410 410 def test_ofind_prefers_property_to_instance_level_attribute(self):
411 411 class A(object):
412 412 @property
413 413 def foo(self):
414 414 return 'bar'
415 415 a = A()
416 416 a.__dict__['foo'] = 'baz'
417 417 nt.assert_equal(a.foo, 'bar')
418 418 found = ip._ofind('a.foo', [('locals', locals())])
419 419 nt.assert_is(found['obj'], A.foo)
420 420
421 421 def test_custom_syntaxerror_exception(self):
422 422 called = []
423 423 def my_handler(shell, etype, value, tb, tb_offset=None):
424 424 called.append(etype)
425 425 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
426 426
427 427 ip.set_custom_exc((SyntaxError,), my_handler)
428 428 try:
429 429 ip.run_cell("1f")
430 430 # Check that this was called, and only once.
431 431 self.assertEqual(called, [SyntaxError])
432 432 finally:
433 433 # Reset the custom exception hook
434 434 ip.set_custom_exc((), None)
435 435
436 436 def test_custom_exception(self):
437 437 called = []
438 438 def my_handler(shell, etype, value, tb, tb_offset=None):
439 439 called.append(etype)
440 440 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
441 441
442 442 ip.set_custom_exc((ValueError,), my_handler)
443 443 try:
444 444 res = ip.run_cell("raise ValueError('test')")
445 445 # Check that this was called, and only once.
446 446 self.assertEqual(called, [ValueError])
447 447 # Check that the error is on the result object
448 448 self.assertIsInstance(res.error_in_exec, ValueError)
449 449 finally:
450 450 # Reset the custom exception hook
451 451 ip.set_custom_exc((), None)
452 452
453 453 def test_mktempfile(self):
454 454 filename = ip.mktempfile()
455 455 # Check that we can open the file again on Windows
456 456 with open(filename, 'w') as f:
457 457 f.write('abc')
458 458
459 459 filename = ip.mktempfile(data='blah')
460 460 with open(filename, 'r') as f:
461 461 self.assertEqual(f.read(), 'blah')
462 462
463 463 def test_new_main_mod(self):
464 464 # Smoketest to check that this accepts a unicode module name
465 465 name = u'jiefmw'
466 466 mod = ip.new_main_mod(u'%s.py' % name, name)
467 467 self.assertEqual(mod.__name__, name)
468 468
469 469 def test_get_exception_only(self):
470 470 try:
471 471 raise KeyboardInterrupt
472 472 except KeyboardInterrupt:
473 473 msg = ip.get_exception_only()
474 474 self.assertEqual(msg, 'KeyboardInterrupt\n')
475 475
476 476 try:
477 477 raise DerivedInterrupt("foo")
478 478 except KeyboardInterrupt:
479 479 msg = ip.get_exception_only()
480 480 self.assertEqual(msg, 'IPython.core.tests.test_interactiveshell.DerivedInterrupt: foo\n')
481 481
482 482 def test_inspect_text(self):
483 483 ip.run_cell('a = 5')
484 484 text = ip.object_inspect_text('a')
485 485 self.assertIsInstance(text, str)
486 486
487 487 def test_last_execution_result(self):
488 488 """ Check that last execution result gets set correctly (GH-10702) """
489 489 result = ip.run_cell('a = 5; a')
490 490 self.assertTrue(ip.last_execution_succeeded)
491 491 self.assertEqual(ip.last_execution_result.result, 5)
492 492
493 493 result = ip.run_cell('a = x_invalid_id_x')
494 494 self.assertFalse(ip.last_execution_succeeded)
495 495 self.assertFalse(ip.last_execution_result.success)
496 496 self.assertIsInstance(ip.last_execution_result.error_in_exec, NameError)
497 497
498 498 def test_reset_aliasing(self):
499 499 """ Check that standard posix aliases work after %reset. """
500 500 if os.name != 'posix':
501 501 return
502 502
503 503 ip.reset()
504 504 for cmd in ('clear', 'more', 'less', 'man'):
505 505 res = ip.run_cell('%' + cmd)
506 506 self.assertEqual(res.success, True)
507 507
508 508
509 509 class TestSafeExecfileNonAsciiPath(unittest.TestCase):
510 510
511 511 @onlyif_unicode_paths
512 512 def setUp(self):
513 513 self.BASETESTDIR = tempfile.mkdtemp()
514 514 self.TESTDIR = join(self.BASETESTDIR, u"Γ₯Àâ")
515 515 os.mkdir(self.TESTDIR)
516 516 with open(join(self.TESTDIR, u"Γ₯Àâtestscript.py"), "w") as sfile:
517 517 sfile.write("pass\n")
518 518 self.oldpath = os.getcwd()
519 519 os.chdir(self.TESTDIR)
520 520 self.fname = u"Γ₯Àâtestscript.py"
521 521
522 522 def tearDown(self):
523 523 os.chdir(self.oldpath)
524 524 shutil.rmtree(self.BASETESTDIR)
525 525
526 526 @onlyif_unicode_paths
527 527 def test_1(self):
528 528 """Test safe_execfile with non-ascii path
529 529 """
530 530 ip.safe_execfile(self.fname, {}, raise_exceptions=True)
531 531
532 532 class ExitCodeChecks(tt.TempFileMixin):
533 533 def test_exit_code_ok(self):
534 534 self.system('exit 0')
535 535 self.assertEqual(ip.user_ns['_exit_code'], 0)
536 536
537 537 def test_exit_code_error(self):
538 538 self.system('exit 1')
539 539 self.assertEqual(ip.user_ns['_exit_code'], 1)
540 540
541 541 @skipif(not hasattr(signal, 'SIGALRM'))
542 542 def test_exit_code_signal(self):
543 543 self.mktmp("import signal, time\n"
544 544 "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
545 545 "time.sleep(1)\n")
546 546 self.system("%s %s" % (sys.executable, self.fname))
547 547 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
548 548
549 549 @onlyif_cmds_exist("csh")
550 550 def test_exit_code_signal_csh(self):
551 551 SHELL = os.environ.get('SHELL', None)
552 552 os.environ['SHELL'] = find_cmd("csh")
553 553 try:
554 554 self.test_exit_code_signal()
555 555 finally:
556 556 if SHELL is not None:
557 557 os.environ['SHELL'] = SHELL
558 558 else:
559 559 del os.environ['SHELL']
560 560
561 561
562 562 class TestSystemRaw(ExitCodeChecks, unittest.TestCase):
563 563 system = ip.system_raw
564 564
565 565 @onlyif_unicode_paths
566 566 def test_1(self):
567 567 """Test system_raw with non-ascii cmd
568 568 """
569 569 cmd = u'''python -c "'Γ₯Àâ'" '''
570 570 ip.system_raw(cmd)
571 571
572 572 @mock.patch('subprocess.call', side_effect=KeyboardInterrupt)
573 573 @mock.patch('os.system', side_effect=KeyboardInterrupt)
574 574 def test_control_c(self, *mocks):
575 575 try:
576 576 self.system("sleep 1 # wont happen")
577 577 except KeyboardInterrupt:
578 578 self.fail("system call should intercept "
579 579 "keyboard interrupt from subprocess.call")
580 580 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGINT)
581 581
582 582 # TODO: Exit codes are currently ignored on Windows.
583 583 class TestSystemPipedExitCode(ExitCodeChecks, unittest.TestCase):
584 584 system = ip.system_piped
585 585
586 586 @skip_win32
587 587 def test_exit_code_ok(self):
588 588 ExitCodeChecks.test_exit_code_ok(self)
589 589
590 590 @skip_win32
591 591 def test_exit_code_error(self):
592 592 ExitCodeChecks.test_exit_code_error(self)
593 593
594 594 @skip_win32
595 595 def test_exit_code_signal(self):
596 596 ExitCodeChecks.test_exit_code_signal(self)
597 597
598 598 class TestModules(tt.TempFileMixin, unittest.TestCase):
599 599 def test_extraneous_loads(self):
600 600 """Test we're not loading modules on startup that we shouldn't.
601 601 """
602 602 self.mktmp("import sys\n"
603 603 "print('numpy' in sys.modules)\n"
604 604 "print('ipyparallel' in sys.modules)\n"
605 605 "print('ipykernel' in sys.modules)\n"
606 606 )
607 607 out = "False\nFalse\nFalse\n"
608 608 tt.ipexec_validate(self.fname, out)
609 609
610 610 class Negator(ast.NodeTransformer):
611 611 """Negates all number literals in an AST."""
612 612
613 613 def visit_Num(self, node):
614 614 node.n = -node.n
615 615 return node
616 616
617 617 if sys.version_info > (3,8):
618 618 def visit_Constant(self, node):
619 619 if isinstance(node.value, int):
620 620 return self.visit_Num(node)
621 return node
621 622
622 623 class TestAstTransform(unittest.TestCase):
623 624 def setUp(self):
624 625 self.negator = Negator()
625 626 ip.ast_transformers.append(self.negator)
626 627
627 628 def tearDown(self):
628 629 ip.ast_transformers.remove(self.negator)
629 630
630 631 def test_run_cell(self):
631 632 with tt.AssertPrints('-34'):
632 633 ip.run_cell('print (12 + 22)')
633 634
634 635 # A named reference to a number shouldn't be transformed.
635 636 ip.user_ns['n'] = 55
636 637 with tt.AssertNotPrints('-55'):
637 638 ip.run_cell('print (n)')
638 639
639 640 def test_timeit(self):
640 641 called = set()
641 642 def f(x):
642 643 called.add(x)
643 644 ip.push({'f':f})
644 645
645 646 with tt.AssertPrints("std. dev. of"):
646 647 ip.run_line_magic("timeit", "-n1 f(1)")
647 648 self.assertEqual(called, {-1})
648 649 called.clear()
649 650
650 651 with tt.AssertPrints("std. dev. of"):
651 652 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
652 653 self.assertEqual(called, {-2, -3})
653 654
654 655 def test_time(self):
655 656 called = []
656 657 def f(x):
657 658 called.append(x)
658 659 ip.push({'f':f})
659 660
660 661 # Test with an expression
661 662 with tt.AssertPrints("Wall time: "):
662 663 ip.run_line_magic("time", "f(5+9)")
663 664 self.assertEqual(called, [-14])
664 665 called[:] = []
665 666
666 667 # Test with a statement (different code path)
667 668 with tt.AssertPrints("Wall time: "):
668 669 ip.run_line_magic("time", "a = f(-3 + -2)")
669 670 self.assertEqual(called, [5])
670 671
671 672 def test_macro(self):
672 673 ip.push({'a':10})
673 674 # The AST transformation makes this do a+=-1
674 675 ip.define_macro("amacro", "a+=1\nprint(a)")
675 676
676 677 with tt.AssertPrints("9"):
677 678 ip.run_cell("amacro")
678 679 with tt.AssertPrints("8"):
679 680 ip.run_cell("amacro")
680 681
681 682 class IntegerWrapper(ast.NodeTransformer):
682 683 """Wraps all integers in a call to Integer()"""
683 684 def visit_Num(self, node):
684 685 if isinstance(node.n, int):
685 686 return ast.Call(func=ast.Name(id='Integer', ctx=ast.Load()),
686 687 args=[node], keywords=[])
687 688 return node
688 689
689 690 if sys.version_info > (3,8):
690 691 def visit_Constant(self, node):
691 692 if isinstance(node.value, int):
692 693 return self.visit_Num(node)
694 return node
693 695
694 696
695 697 class TestAstTransform2(unittest.TestCase):
696 698 def setUp(self):
697 699 self.intwrapper = IntegerWrapper()
698 700 ip.ast_transformers.append(self.intwrapper)
699 701
700 702 self.calls = []
701 703 def Integer(*args):
702 704 self.calls.append(args)
703 705 return args
704 706 ip.push({"Integer": Integer})
705 707
706 708 def tearDown(self):
707 709 ip.ast_transformers.remove(self.intwrapper)
708 710 del ip.user_ns['Integer']
709 711
710 712 def test_run_cell(self):
711 713 ip.run_cell("n = 2")
712 714 self.assertEqual(self.calls, [(2,)])
713 715
714 716 # This shouldn't throw an error
715 717 ip.run_cell("o = 2.0")
716 718 self.assertEqual(ip.user_ns['o'], 2.0)
717 719
718 720 def test_timeit(self):
719 721 called = set()
720 722 def f(x):
721 723 called.add(x)
722 724 ip.push({'f':f})
723 725
724 726 with tt.AssertPrints("std. dev. of"):
725 727 ip.run_line_magic("timeit", "-n1 f(1)")
726 728 self.assertEqual(called, {(1,)})
727 729 called.clear()
728 730
729 731 with tt.AssertPrints("std. dev. of"):
730 732 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
731 733 self.assertEqual(called, {(2,), (3,)})
732 734
733 735 class ErrorTransformer(ast.NodeTransformer):
734 736 """Throws an error when it sees a number."""
735 737 def visit_Num(self, node):
736 738 raise ValueError("test")
737 739
738 740 if sys.version_info > (3,8):
739 741 def visit_Constant(self, node):
740 742 if isinstance(node.value, int):
741 743 return self.visit_Num(node)
744 return node
742 745
743 746
744 747 class TestAstTransformError(unittest.TestCase):
745 748 def test_unregistering(self):
746 749 err_transformer = ErrorTransformer()
747 750 ip.ast_transformers.append(err_transformer)
748 751
749 752 with tt.AssertPrints("unregister", channel='stderr'):
750 753 ip.run_cell("1 + 2")
751 754
752 755 # This should have been removed.
753 756 nt.assert_not_in(err_transformer, ip.ast_transformers)
754 757
755 758
756 759 class StringRejector(ast.NodeTransformer):
757 760 """Throws an InputRejected when it sees a string literal.
758 761
759 762 Used to verify that NodeTransformers can signal that a piece of code should
760 763 not be executed by throwing an InputRejected.
761 764 """
762 765
763 766 def visit_Str(self, node):
764 767 raise InputRejected("test")
765 768
766 769 # 3.8 only
767 770 def visit_Constant(self, node):
768 771 if isinstance(node.value, str):
769 772 raise InputRejected("test")
773 return node
770 774
771 775
772 776 class TestAstTransformInputRejection(unittest.TestCase):
773 777
774 778 def setUp(self):
775 779 self.transformer = StringRejector()
776 780 ip.ast_transformers.append(self.transformer)
777 781
778 782 def tearDown(self):
779 783 ip.ast_transformers.remove(self.transformer)
780 784
781 785 def test_input_rejection(self):
782 786 """Check that NodeTransformers can reject input."""
783 787
784 788 expect_exception_tb = tt.AssertPrints("InputRejected: test")
785 789 expect_no_cell_output = tt.AssertNotPrints("'unsafe'", suppress=False)
786 790
787 791 # Run the same check twice to verify that the transformer is not
788 792 # disabled after raising.
789 793 with expect_exception_tb, expect_no_cell_output:
790 794 ip.run_cell("'unsafe'")
791 795
792 796 with expect_exception_tb, expect_no_cell_output:
793 797 res = ip.run_cell("'unsafe'")
794 798
795 799 self.assertIsInstance(res.error_before_exec, InputRejected)
796 800
797 801 def test__IPYTHON__():
798 802 # This shouldn't raise a NameError, that's all
799 803 __IPYTHON__
800 804
801 805
802 806 class DummyRepr(object):
803 807 def __repr__(self):
804 808 return "DummyRepr"
805 809
806 810 def _repr_html_(self):
807 811 return "<b>dummy</b>"
808 812
809 813 def _repr_javascript_(self):
810 814 return "console.log('hi');", {'key': 'value'}
811 815
812 816
813 817 def test_user_variables():
814 818 # enable all formatters
815 819 ip.display_formatter.active_types = ip.display_formatter.format_types
816 820
817 821 ip.user_ns['dummy'] = d = DummyRepr()
818 822 keys = {'dummy', 'doesnotexist'}
819 823 r = ip.user_expressions({ key:key for key in keys})
820 824
821 825 nt.assert_equal(keys, set(r.keys()))
822 826 dummy = r['dummy']
823 827 nt.assert_equal({'status', 'data', 'metadata'}, set(dummy.keys()))
824 828 nt.assert_equal(dummy['status'], 'ok')
825 829 data = dummy['data']
826 830 metadata = dummy['metadata']
827 831 nt.assert_equal(data.get('text/html'), d._repr_html_())
828 832 js, jsmd = d._repr_javascript_()
829 833 nt.assert_equal(data.get('application/javascript'), js)
830 834 nt.assert_equal(metadata.get('application/javascript'), jsmd)
831 835
832 836 dne = r['doesnotexist']
833 837 nt.assert_equal(dne['status'], 'error')
834 838 nt.assert_equal(dne['ename'], 'NameError')
835 839
836 840 # back to text only
837 841 ip.display_formatter.active_types = ['text/plain']
838 842
839 843 def test_user_expression():
840 844 # enable all formatters
841 845 ip.display_formatter.active_types = ip.display_formatter.format_types
842 846 query = {
843 847 'a' : '1 + 2',
844 848 'b' : '1/0',
845 849 }
846 850 r = ip.user_expressions(query)
847 851 import pprint
848 852 pprint.pprint(r)
849 853 nt.assert_equal(set(r.keys()), set(query.keys()))
850 854 a = r['a']
851 855 nt.assert_equal({'status', 'data', 'metadata'}, set(a.keys()))
852 856 nt.assert_equal(a['status'], 'ok')
853 857 data = a['data']
854 858 metadata = a['metadata']
855 859 nt.assert_equal(data.get('text/plain'), '3')
856 860
857 861 b = r['b']
858 862 nt.assert_equal(b['status'], 'error')
859 863 nt.assert_equal(b['ename'], 'ZeroDivisionError')
860 864
861 865 # back to text only
862 866 ip.display_formatter.active_types = ['text/plain']
863 867
864 868
865 869
866 870
867 871
868 872 class TestSyntaxErrorTransformer(unittest.TestCase):
869 873 """Check that SyntaxError raised by an input transformer is handled by run_cell()"""
870 874
871 875 @staticmethod
872 876 def transformer(lines):
873 877 for line in lines:
874 878 pos = line.find('syntaxerror')
875 879 if pos >= 0:
876 880 e = SyntaxError('input contains "syntaxerror"')
877 881 e.text = line
878 882 e.offset = pos + 1
879 883 raise e
880 884 return lines
881 885
882 886 def setUp(self):
883 887 ip.input_transformers_post.append(self.transformer)
884 888
885 889 def tearDown(self):
886 890 ip.input_transformers_post.remove(self.transformer)
887 891
888 892 def test_syntaxerror_input_transformer(self):
889 893 with tt.AssertPrints('1234'):
890 894 ip.run_cell('1234')
891 895 with tt.AssertPrints('SyntaxError: invalid syntax'):
892 896 ip.run_cell('1 2 3') # plain python syntax error
893 897 with tt.AssertPrints('SyntaxError: input contains "syntaxerror"'):
894 898 ip.run_cell('2345 # syntaxerror') # input transformer syntax error
895 899 with tt.AssertPrints('3456'):
896 900 ip.run_cell('3456')
897 901
898 902
899 903
900 904 def test_warning_suppression():
901 905 ip.run_cell("import warnings")
902 906 try:
903 907 with tt.AssertPrints("UserWarning: asdf", channel="stderr"):
904 908 ip.run_cell("warnings.warn('asdf')")
905 909 # Here's the real test -- if we run that again, we should get the
906 910 # warning again. Traditionally, each warning was only issued once per
907 911 # IPython session (approximately), even if the user typed in new and
908 912 # different code that should have also triggered the warning, leading
909 913 # to much confusion.
910 914 with tt.AssertPrints("UserWarning: asdf", channel="stderr"):
911 915 ip.run_cell("warnings.warn('asdf')")
912 916 finally:
913 917 ip.run_cell("del warnings")
914 918
915 919
916 920 def test_deprecation_warning():
917 921 ip.run_cell("""
918 922 import warnings
919 923 def wrn():
920 924 warnings.warn(
921 925 "I AM A WARNING",
922 926 DeprecationWarning
923 927 )
924 928 """)
925 929 try:
926 930 with tt.AssertPrints("I AM A WARNING", channel="stderr"):
927 931 ip.run_cell("wrn()")
928 932 finally:
929 933 ip.run_cell("del warnings")
930 934 ip.run_cell("del wrn")
931 935
932 936
933 937 class TestImportNoDeprecate(tt.TempFileMixin):
934 938
935 939 def setup(self):
936 940 """Make a valid python temp file."""
937 941 self.mktmp("""
938 942 import warnings
939 943 def wrn():
940 944 warnings.warn(
941 945 "I AM A WARNING",
942 946 DeprecationWarning
943 947 )
944 948 """)
945 949
946 950 def test_no_dep(self):
947 951 """
948 952 No deprecation warning should be raised from imported functions
949 953 """
950 954 ip.run_cell("from {} import wrn".format(self.fname))
951 955
952 956 with tt.AssertNotPrints("I AM A WARNING"):
953 957 ip.run_cell("wrn()")
954 958 ip.run_cell("del wrn")
955 959
956 960
957 961 def test_custom_exc_count():
958 962 hook = mock.Mock(return_value=None)
959 963 ip.set_custom_exc((SyntaxError,), hook)
960 964 before = ip.execution_count
961 965 ip.run_cell("def foo()", store_history=True)
962 966 # restore default excepthook
963 967 ip.set_custom_exc((), None)
964 968 nt.assert_equal(hook.call_count, 1)
965 969 nt.assert_equal(ip.execution_count, before + 1)
966 970
967 971
968 972 def test_run_cell_async():
969 973 loop = asyncio.get_event_loop()
970 974 ip.run_cell("import asyncio")
971 975 coro = ip.run_cell_async("await asyncio.sleep(0.01)\n5")
972 976 assert asyncio.iscoroutine(coro)
973 977 result = loop.run_until_complete(coro)
974 978 assert isinstance(result, interactiveshell.ExecutionResult)
975 979 assert result.result == 5
976 980
977 981
978 982 def test_should_run_async():
979 983 assert not ip.should_run_async("a = 5")
980 984 assert ip.should_run_async("await x")
981 985 assert ip.should_run_async("import asyncio; await asyncio.sleep(1)")
General Comments 0
You need to be logged in to leave comments. Login now