##// END OF EJS Templates
[issue6883] catch keyboardinterrupt if generated during shell.system() calls
mvr -
Show More

The requested changes are too big and content was truncated. Show full diff

1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
@@ -1,859 +1,869 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for the key interactiveshell module.
3 3
4 4 Historically the main classes in interactiveshell have been under-tested. This
5 5 module should grow as many single-method tests as possible to trap many of the
6 6 recurring bugs we seem to encounter with high-level interaction.
7 7 """
8 8
9 9 # Copyright (c) IPython Development Team.
10 10 # Distributed under the terms of the Modified BSD License.
11 11
12 12 import ast
13 13 import os
14 14 import signal
15 15 import shutil
16 16 import sys
17 17 import tempfile
18 18 import unittest
19 19 try:
20 20 from unittest import mock
21 21 except ImportError:
22 22 import mock
23 23 from os.path import join
24 24
25 25 import nose.tools as nt
26 26
27 27 from IPython.core.error import InputRejected
28 28 from IPython.core.inputtransformer import InputTransformer
29 29 from IPython.testing.decorators import (
30 30 skipif, skip_win32, onlyif_unicode_paths, onlyif_cmds_exist,
31 31 )
32 32 from IPython.testing import tools as tt
33 33 from IPython.utils import io
34 34 from IPython.utils.process import find_cmd
35 35 from IPython.utils import py3compat
36 36 from IPython.utils.py3compat import unicode_type, PY3
37 37
38 38 if PY3:
39 39 from io import StringIO
40 40 else:
41 41 from StringIO import StringIO
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Globals
45 45 #-----------------------------------------------------------------------------
46 46 # This is used by every single test, no point repeating it ad nauseam
47 47 ip = get_ipython()
48 48
49 49 #-----------------------------------------------------------------------------
50 50 # Tests
51 51 #-----------------------------------------------------------------------------
52 52
53 53 class InteractiveShellTestCase(unittest.TestCase):
54 54 def test_naked_string_cells(self):
55 55 """Test that cells with only naked strings are fully executed"""
56 56 # First, single-line inputs
57 57 ip.run_cell('"a"\n')
58 58 self.assertEqual(ip.user_ns['_'], 'a')
59 59 # And also multi-line cells
60 60 ip.run_cell('"""a\nb"""\n')
61 61 self.assertEqual(ip.user_ns['_'], 'a\nb')
62 62
63 63 def test_run_empty_cell(self):
64 64 """Just make sure we don't get a horrible error with a blank
65 65 cell of input. Yes, I did overlook that."""
66 66 old_xc = ip.execution_count
67 67 ip.run_cell('')
68 68 self.assertEqual(ip.execution_count, old_xc)
69 69
70 70 def test_run_cell_multiline(self):
71 71 """Multi-block, multi-line cells must execute correctly.
72 72 """
73 73 src = '\n'.join(["x=1",
74 74 "y=2",
75 75 "if 1:",
76 76 " x += 1",
77 77 " y += 1",])
78 78 ip.run_cell(src)
79 79 self.assertEqual(ip.user_ns['x'], 2)
80 80 self.assertEqual(ip.user_ns['y'], 3)
81 81
82 82 def test_multiline_string_cells(self):
83 83 "Code sprinkled with multiline strings should execute (GH-306)"
84 84 ip.run_cell('tmp=0')
85 85 self.assertEqual(ip.user_ns['tmp'], 0)
86 86 ip.run_cell('tmp=1;"""a\nb"""\n')
87 87 self.assertEqual(ip.user_ns['tmp'], 1)
88 88
89 89 def test_dont_cache_with_semicolon(self):
90 90 "Ending a line with semicolon should not cache the returned object (GH-307)"
91 91 oldlen = len(ip.user_ns['Out'])
92 92 for cell in ['1;', '1;1;']:
93 93 ip.run_cell(cell, store_history=True)
94 94 newlen = len(ip.user_ns['Out'])
95 95 self.assertEqual(oldlen, newlen)
96 96 i = 0
97 97 #also test the default caching behavior
98 98 for cell in ['1', '1;1']:
99 99 ip.run_cell(cell, store_history=True)
100 100 newlen = len(ip.user_ns['Out'])
101 101 i += 1
102 102 self.assertEqual(oldlen+i, newlen)
103 103
104 104 def test_In_variable(self):
105 105 "Verify that In variable grows with user input (GH-284)"
106 106 oldlen = len(ip.user_ns['In'])
107 107 ip.run_cell('1;', store_history=True)
108 108 newlen = len(ip.user_ns['In'])
109 109 self.assertEqual(oldlen+1, newlen)
110 110 self.assertEqual(ip.user_ns['In'][-1],'1;')
111 111
112 112 def test_magic_names_in_string(self):
113 113 ip.run_cell('a = """\n%exit\n"""')
114 114 self.assertEqual(ip.user_ns['a'], '\n%exit\n')
115 115
116 116 def test_trailing_newline(self):
117 117 """test that running !(command) does not raise a SyntaxError"""
118 118 ip.run_cell('!(true)\n', False)
119 119 ip.run_cell('!(true)\n\n\n', False)
120 120
121 121 def test_gh_597(self):
122 122 """Pretty-printing lists of objects with non-ascii reprs may cause
123 123 problems."""
124 124 class Spam(object):
125 125 def __repr__(self):
126 126 return "\xe9"*50
127 127 import IPython.core.formatters
128 128 f = IPython.core.formatters.PlainTextFormatter()
129 129 f([Spam(),Spam()])
130 130
131 131
132 132 def test_future_flags(self):
133 133 """Check that future flags are used for parsing code (gh-777)"""
134 134 ip.run_cell('from __future__ import print_function')
135 135 try:
136 136 ip.run_cell('prfunc_return_val = print(1,2, sep=" ")')
137 137 assert 'prfunc_return_val' in ip.user_ns
138 138 finally:
139 139 # Reset compiler flags so we don't mess up other tests.
140 140 ip.compile.reset_compiler_flags()
141 141
142 142 def test_future_unicode(self):
143 143 """Check that unicode_literals is imported from __future__ (gh #786)"""
144 144 try:
145 145 ip.run_cell(u'byte_str = "a"')
146 146 assert isinstance(ip.user_ns['byte_str'], str) # string literals are byte strings by default
147 147 ip.run_cell('from __future__ import unicode_literals')
148 148 ip.run_cell(u'unicode_str = "a"')
149 149 assert isinstance(ip.user_ns['unicode_str'], unicode_type) # strings literals are now unicode
150 150 finally:
151 151 # Reset compiler flags so we don't mess up other tests.
152 152 ip.compile.reset_compiler_flags()
153 153
154 154 def test_can_pickle(self):
155 155 "Can we pickle objects defined interactively (GH-29)"
156 156 ip = get_ipython()
157 157 ip.reset()
158 158 ip.run_cell(("class Mylist(list):\n"
159 159 " def __init__(self,x=[]):\n"
160 160 " list.__init__(self,x)"))
161 161 ip.run_cell("w=Mylist([1,2,3])")
162 162
163 163 from pickle import dumps
164 164
165 165 # We need to swap in our main module - this is only necessary
166 166 # inside the test framework, because IPython puts the interactive module
167 167 # in place (but the test framework undoes this).
168 168 _main = sys.modules['__main__']
169 169 sys.modules['__main__'] = ip.user_module
170 170 try:
171 171 res = dumps(ip.user_ns["w"])
172 172 finally:
173 173 sys.modules['__main__'] = _main
174 174 self.assertTrue(isinstance(res, bytes))
175 175
176 176 def test_global_ns(self):
177 177 "Code in functions must be able to access variables outside them."
178 178 ip = get_ipython()
179 179 ip.run_cell("a = 10")
180 180 ip.run_cell(("def f(x):\n"
181 181 " return x + a"))
182 182 ip.run_cell("b = f(12)")
183 183 self.assertEqual(ip.user_ns["b"], 22)
184 184
185 185 def test_bad_custom_tb(self):
186 186 """Check that InteractiveShell is protected from bad custom exception handlers"""
187 187 from IPython.utils import io
188 188 save_stderr = io.stderr
189 189 try:
190 190 # capture stderr
191 191 io.stderr = StringIO()
192 192 ip.set_custom_exc((IOError,), lambda etype,value,tb: 1/0)
193 193 self.assertEqual(ip.custom_exceptions, (IOError,))
194 194 ip.run_cell(u'raise IOError("foo")')
195 195 self.assertEqual(ip.custom_exceptions, ())
196 196 self.assertTrue("Custom TB Handler failed" in io.stderr.getvalue())
197 197 finally:
198 198 io.stderr = save_stderr
199 199
200 200 def test_bad_custom_tb_return(self):
201 201 """Check that InteractiveShell is protected from bad return types in custom exception handlers"""
202 202 from IPython.utils import io
203 203 save_stderr = io.stderr
204 204 try:
205 205 # capture stderr
206 206 io.stderr = StringIO()
207 207 ip.set_custom_exc((NameError,),lambda etype,value,tb, tb_offset=None: 1)
208 208 self.assertEqual(ip.custom_exceptions, (NameError,))
209 209 ip.run_cell(u'a=abracadabra')
210 210 self.assertEqual(ip.custom_exceptions, ())
211 211 self.assertTrue("Custom TB Handler failed" in io.stderr.getvalue())
212 212 finally:
213 213 io.stderr = save_stderr
214 214
215 215 def test_drop_by_id(self):
216 216 myvars = {"a":object(), "b":object(), "c": object()}
217 217 ip.push(myvars, interactive=False)
218 218 for name in myvars:
219 219 assert name in ip.user_ns, name
220 220 assert name in ip.user_ns_hidden, name
221 221 ip.user_ns['b'] = 12
222 222 ip.drop_by_id(myvars)
223 223 for name in ["a", "c"]:
224 224 assert name not in ip.user_ns, name
225 225 assert name not in ip.user_ns_hidden, name
226 226 assert ip.user_ns['b'] == 12
227 227 ip.reset()
228 228
229 229 def test_var_expand(self):
230 230 ip.user_ns['f'] = u'Ca\xf1o'
231 231 self.assertEqual(ip.var_expand(u'echo $f'), u'echo Ca\xf1o')
232 232 self.assertEqual(ip.var_expand(u'echo {f}'), u'echo Ca\xf1o')
233 233 self.assertEqual(ip.var_expand(u'echo {f[:-1]}'), u'echo Ca\xf1')
234 234 self.assertEqual(ip.var_expand(u'echo {1*2}'), u'echo 2')
235 235
236 236 ip.user_ns['f'] = b'Ca\xc3\xb1o'
237 237 # This should not raise any exception:
238 238 ip.var_expand(u'echo $f')
239 239
240 240 def test_var_expand_local(self):
241 241 """Test local variable expansion in !system and %magic calls"""
242 242 # !system
243 243 ip.run_cell('def test():\n'
244 244 ' lvar = "ttt"\n'
245 245 ' ret = !echo {lvar}\n'
246 246 ' return ret[0]\n')
247 247 res = ip.user_ns['test']()
248 248 nt.assert_in('ttt', res)
249 249
250 250 # %magic
251 251 ip.run_cell('def makemacro():\n'
252 252 ' macroname = "macro_var_expand_locals"\n'
253 253 ' %macro {macroname} codestr\n')
254 254 ip.user_ns['codestr'] = "str(12)"
255 255 ip.run_cell('makemacro()')
256 256 nt.assert_in('macro_var_expand_locals', ip.user_ns)
257 257
258 258 def test_var_expand_self(self):
259 259 """Test variable expansion with the name 'self', which was failing.
260 260
261 261 See https://github.com/ipython/ipython/issues/1878#issuecomment-7698218
262 262 """
263 263 ip.run_cell('class cTest:\n'
264 264 ' classvar="see me"\n'
265 265 ' def test(self):\n'
266 266 ' res = !echo Variable: {self.classvar}\n'
267 267 ' return res[0]\n')
268 268 nt.assert_in('see me', ip.user_ns['cTest']().test())
269 269
270 270 def test_bad_var_expand(self):
271 271 """var_expand on invalid formats shouldn't raise"""
272 272 # SyntaxError
273 273 self.assertEqual(ip.var_expand(u"{'a':5}"), u"{'a':5}")
274 274 # NameError
275 275 self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}")
276 276 # ZeroDivisionError
277 277 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
278 278
279 279 def test_silent_postexec(self):
280 280 """run_cell(silent=True) doesn't invoke pre/post_run_cell callbacks"""
281 281 pre_explicit = mock.Mock()
282 282 pre_always = mock.Mock()
283 283 post_explicit = mock.Mock()
284 284 post_always = mock.Mock()
285 285
286 286 ip.events.register('pre_run_cell', pre_explicit)
287 287 ip.events.register('pre_execute', pre_always)
288 288 ip.events.register('post_run_cell', post_explicit)
289 289 ip.events.register('post_execute', post_always)
290 290
291 291 try:
292 292 ip.run_cell("1", silent=True)
293 293 assert pre_always.called
294 294 assert not pre_explicit.called
295 295 assert post_always.called
296 296 assert not post_explicit.called
297 297 # double-check that non-silent exec did what we expected
298 298 # silent to avoid
299 299 ip.run_cell("1")
300 300 assert pre_explicit.called
301 301 assert post_explicit.called
302 302 finally:
303 303 # remove post-exec
304 304 ip.events.unregister('pre_run_cell', pre_explicit)
305 305 ip.events.unregister('pre_execute', pre_always)
306 306 ip.events.unregister('post_run_cell', post_explicit)
307 307 ip.events.unregister('post_execute', post_always)
308 308
309 309 def test_silent_noadvance(self):
310 310 """run_cell(silent=True) doesn't advance execution_count"""
311 311 ec = ip.execution_count
312 312 # silent should force store_history=False
313 313 ip.run_cell("1", store_history=True, silent=True)
314 314
315 315 self.assertEqual(ec, ip.execution_count)
316 316 # double-check that non-silent exec did what we expected
317 317 # silent to avoid
318 318 ip.run_cell("1", store_history=True)
319 319 self.assertEqual(ec+1, ip.execution_count)
320 320
321 321 def test_silent_nodisplayhook(self):
322 322 """run_cell(silent=True) doesn't trigger displayhook"""
323 323 d = dict(called=False)
324 324
325 325 trap = ip.display_trap
326 326 save_hook = trap.hook
327 327
328 328 def failing_hook(*args, **kwargs):
329 329 d['called'] = True
330 330
331 331 try:
332 332 trap.hook = failing_hook
333 333 ip.run_cell("1", silent=True)
334 334 self.assertFalse(d['called'])
335 335 # double-check that non-silent exec did what we expected
336 336 # silent to avoid
337 337 ip.run_cell("1")
338 338 self.assertTrue(d['called'])
339 339 finally:
340 340 trap.hook = save_hook
341 341
342 342 @skipif(sys.version_info[0] >= 3, "softspace removed in py3")
343 343 def test_print_softspace(self):
344 344 """Verify that softspace is handled correctly when executing multiple
345 345 statements.
346 346
347 347 In [1]: print 1; print 2
348 348 1
349 349 2
350 350
351 351 In [2]: print 1,; print 2
352 352 1 2
353 353 """
354 354
355 355 def test_ofind_line_magic(self):
356 356 from IPython.core.magic import register_line_magic
357 357
358 358 @register_line_magic
359 359 def lmagic(line):
360 360 "A line magic"
361 361
362 362 # Get info on line magic
363 363 lfind = ip._ofind('lmagic')
364 364 info = dict(found=True, isalias=False, ismagic=True,
365 365 namespace = 'IPython internal', obj= lmagic.__wrapped__,
366 366 parent = None)
367 367 nt.assert_equal(lfind, info)
368 368
369 369 def test_ofind_cell_magic(self):
370 370 from IPython.core.magic import register_cell_magic
371 371
372 372 @register_cell_magic
373 373 def cmagic(line, cell):
374 374 "A cell magic"
375 375
376 376 # Get info on cell magic
377 377 find = ip._ofind('cmagic')
378 378 info = dict(found=True, isalias=False, ismagic=True,
379 379 namespace = 'IPython internal', obj= cmagic.__wrapped__,
380 380 parent = None)
381 381 nt.assert_equal(find, info)
382 382
383 383 def test_ofind_property_with_error(self):
384 384 class A(object):
385 385 @property
386 386 def foo(self):
387 387 raise NotImplementedError()
388 388 a = A()
389 389
390 390 found = ip._ofind('a.foo', [('locals', locals())])
391 391 info = dict(found=True, isalias=False, ismagic=False,
392 392 namespace='locals', obj=A.foo, parent=a)
393 393 nt.assert_equal(found, info)
394 394
395 395 def test_ofind_multiple_attribute_lookups(self):
396 396 class A(object):
397 397 @property
398 398 def foo(self):
399 399 raise NotImplementedError()
400 400
401 401 a = A()
402 402 a.a = A()
403 403 a.a.a = A()
404 404
405 405 found = ip._ofind('a.a.a.foo', [('locals', locals())])
406 406 info = dict(found=True, isalias=False, ismagic=False,
407 407 namespace='locals', obj=A.foo, parent=a.a.a)
408 408 nt.assert_equal(found, info)
409 409
410 410 def test_ofind_slotted_attributes(self):
411 411 class A(object):
412 412 __slots__ = ['foo']
413 413 def __init__(self):
414 414 self.foo = 'bar'
415 415
416 416 a = A()
417 417 found = ip._ofind('a.foo', [('locals', locals())])
418 418 info = dict(found=True, isalias=False, ismagic=False,
419 419 namespace='locals', obj=a.foo, parent=a)
420 420 nt.assert_equal(found, info)
421 421
422 422 found = ip._ofind('a.bar', [('locals', locals())])
423 423 info = dict(found=False, isalias=False, ismagic=False,
424 424 namespace=None, obj=None, parent=a)
425 425 nt.assert_equal(found, info)
426 426
427 427 def test_ofind_prefers_property_to_instance_level_attribute(self):
428 428 class A(object):
429 429 @property
430 430 def foo(self):
431 431 return 'bar'
432 432 a = A()
433 433 a.__dict__['foo'] = 'baz'
434 434 nt.assert_equal(a.foo, 'bar')
435 435 found = ip._ofind('a.foo', [('locals', locals())])
436 436 nt.assert_is(found['obj'], A.foo)
437 437
438 438 def test_custom_exception(self):
439 439 called = []
440 440 def my_handler(shell, etype, value, tb, tb_offset=None):
441 441 called.append(etype)
442 442 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
443 443
444 444 ip.set_custom_exc((ValueError,), my_handler)
445 445 try:
446 446 ip.run_cell("raise ValueError('test')")
447 447 # Check that this was called, and only once.
448 448 self.assertEqual(called, [ValueError])
449 449 finally:
450 450 # Reset the custom exception hook
451 451 ip.set_custom_exc((), None)
452 452
453 453 @skipif(sys.version_info[0] >= 3, "no differences with __future__ in py3")
454 454 def test_future_environment(self):
455 455 "Can we run code with & without the shell's __future__ imports?"
456 456 ip.run_cell("from __future__ import division")
457 457 ip.run_cell("a = 1/2", shell_futures=True)
458 458 self.assertEqual(ip.user_ns['a'], 0.5)
459 459 ip.run_cell("b = 1/2", shell_futures=False)
460 460 self.assertEqual(ip.user_ns['b'], 0)
461 461
462 462 ip.compile.reset_compiler_flags()
463 463 # This shouldn't leak to the shell's compiler
464 464 ip.run_cell("from __future__ import division \nc=1/2", shell_futures=False)
465 465 self.assertEqual(ip.user_ns['c'], 0.5)
466 466 ip.run_cell("d = 1/2", shell_futures=True)
467 467 self.assertEqual(ip.user_ns['d'], 0)
468 468
469 469 def test_mktempfile(self):
470 470 filename = ip.mktempfile()
471 471 # Check that we can open the file again on Windows
472 472 with open(filename, 'w') as f:
473 473 f.write('abc')
474 474
475 475 filename = ip.mktempfile(data='blah')
476 476 with open(filename, 'r') as f:
477 477 self.assertEqual(f.read(), 'blah')
478 478
479 479 def test_new_main_mod(self):
480 480 # Smoketest to check that this accepts a unicode module name
481 481 name = u'jiefmw'
482 482 mod = ip.new_main_mod(u'%s.py' % name, name)
483 483 self.assertEqual(mod.__name__, name)
484 484
485 485 class TestSafeExecfileNonAsciiPath(unittest.TestCase):
486 486
487 487 @onlyif_unicode_paths
488 488 def setUp(self):
489 489 self.BASETESTDIR = tempfile.mkdtemp()
490 490 self.TESTDIR = join(self.BASETESTDIR, u"åäö")
491 491 os.mkdir(self.TESTDIR)
492 492 with open(join(self.TESTDIR, u"åäötestscript.py"), "w") as sfile:
493 493 sfile.write("pass\n")
494 494 self.oldpath = py3compat.getcwd()
495 495 os.chdir(self.TESTDIR)
496 496 self.fname = u"åäötestscript.py"
497 497
498 498 def tearDown(self):
499 499 os.chdir(self.oldpath)
500 500 shutil.rmtree(self.BASETESTDIR)
501 501
502 502 @onlyif_unicode_paths
503 503 def test_1(self):
504 504 """Test safe_execfile with non-ascii path
505 505 """
506 506 ip.safe_execfile(self.fname, {}, raise_exceptions=True)
507 507
508 508 class ExitCodeChecks(tt.TempFileMixin):
509 509 def test_exit_code_ok(self):
510 510 self.system('exit 0')
511 511 self.assertEqual(ip.user_ns['_exit_code'], 0)
512 512
513 513 def test_exit_code_error(self):
514 514 self.system('exit 1')
515 515 self.assertEqual(ip.user_ns['_exit_code'], 1)
516 516
517 517 @skipif(not hasattr(signal, 'SIGALRM'))
518 518 def test_exit_code_signal(self):
519 519 self.mktmp("import signal, time\n"
520 520 "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
521 521 "time.sleep(1)\n")
522 522 self.system("%s %s" % (sys.executable, self.fname))
523 523 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
524 524
525 525 @onlyif_cmds_exist("csh")
526 526 def test_exit_code_signal_csh(self):
527 527 SHELL = os.environ.get('SHELL', None)
528 528 os.environ['SHELL'] = find_cmd("csh")
529 529 try:
530 530 self.test_exit_code_signal()
531 531 finally:
532 532 if SHELL is not None:
533 533 os.environ['SHELL'] = SHELL
534 534 else:
535 535 del os.environ['SHELL']
536 536
537 537 class TestSystemRaw(unittest.TestCase, ExitCodeChecks):
538 538 system = ip.system_raw
539 539
540 540 @onlyif_unicode_paths
541 541 def test_1(self):
542 542 """Test system_raw with non-ascii cmd
543 543 """
544 544 cmd = u'''python -c "'åäö'" '''
545 545 ip.system_raw(cmd)
546 546
547 @mock.patch('subprocess.call')
548 def test_control_c(self, call_mock):
549 call_mock.side_effect = KeyboardInterrupt()
550 try:
551 self.system("sleep 1 # wont happen")
552 except KeyboardInterrupt:
553 self.fail("system call should intercept "
554 "keyboard interrupt from subprocess.call")
555 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGINT)
556
547 557 # TODO: Exit codes are currently ignored on Windows.
548 558 class TestSystemPipedExitCode(unittest.TestCase, ExitCodeChecks):
549 559 system = ip.system_piped
550 560
551 561 @skip_win32
552 562 def test_exit_code_ok(self):
553 563 ExitCodeChecks.test_exit_code_ok(self)
554 564
555 565 @skip_win32
556 566 def test_exit_code_error(self):
557 567 ExitCodeChecks.test_exit_code_error(self)
558 568
559 569 @skip_win32
560 570 def test_exit_code_signal(self):
561 571 ExitCodeChecks.test_exit_code_signal(self)
562 572
563 573 class TestModules(unittest.TestCase, tt.TempFileMixin):
564 574 def test_extraneous_loads(self):
565 575 """Test we're not loading modules on startup that we shouldn't.
566 576 """
567 577 self.mktmp("import sys\n"
568 578 "print('numpy' in sys.modules)\n"
569 579 "print('IPython.parallel' in sys.modules)\n"
570 580 "print('IPython.kernel.zmq' in sys.modules)\n"
571 581 )
572 582 out = "False\nFalse\nFalse\n"
573 583 tt.ipexec_validate(self.fname, out)
574 584
575 585 class Negator(ast.NodeTransformer):
576 586 """Negates all number literals in an AST."""
577 587 def visit_Num(self, node):
578 588 node.n = -node.n
579 589 return node
580 590
581 591 class TestAstTransform(unittest.TestCase):
582 592 def setUp(self):
583 593 self.negator = Negator()
584 594 ip.ast_transformers.append(self.negator)
585 595
586 596 def tearDown(self):
587 597 ip.ast_transformers.remove(self.negator)
588 598
589 599 def test_run_cell(self):
590 600 with tt.AssertPrints('-34'):
591 601 ip.run_cell('print (12 + 22)')
592 602
593 603 # A named reference to a number shouldn't be transformed.
594 604 ip.user_ns['n'] = 55
595 605 with tt.AssertNotPrints('-55'):
596 606 ip.run_cell('print (n)')
597 607
598 608 def test_timeit(self):
599 609 called = set()
600 610 def f(x):
601 611 called.add(x)
602 612 ip.push({'f':f})
603 613
604 614 with tt.AssertPrints("best of "):
605 615 ip.run_line_magic("timeit", "-n1 f(1)")
606 616 self.assertEqual(called, set([-1]))
607 617 called.clear()
608 618
609 619 with tt.AssertPrints("best of "):
610 620 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
611 621 self.assertEqual(called, set([-2, -3]))
612 622
613 623 def test_time(self):
614 624 called = []
615 625 def f(x):
616 626 called.append(x)
617 627 ip.push({'f':f})
618 628
619 629 # Test with an expression
620 630 with tt.AssertPrints("Wall time: "):
621 631 ip.run_line_magic("time", "f(5+9)")
622 632 self.assertEqual(called, [-14])
623 633 called[:] = []
624 634
625 635 # Test with a statement (different code path)
626 636 with tt.AssertPrints("Wall time: "):
627 637 ip.run_line_magic("time", "a = f(-3 + -2)")
628 638 self.assertEqual(called, [5])
629 639
630 640 def test_macro(self):
631 641 ip.push({'a':10})
632 642 # The AST transformation makes this do a+=-1
633 643 ip.define_macro("amacro", "a+=1\nprint(a)")
634 644
635 645 with tt.AssertPrints("9"):
636 646 ip.run_cell("amacro")
637 647 with tt.AssertPrints("8"):
638 648 ip.run_cell("amacro")
639 649
640 650 class IntegerWrapper(ast.NodeTransformer):
641 651 """Wraps all integers in a call to Integer()"""
642 652 def visit_Num(self, node):
643 653 if isinstance(node.n, int):
644 654 return ast.Call(func=ast.Name(id='Integer', ctx=ast.Load()),
645 655 args=[node], keywords=[])
646 656 return node
647 657
648 658 class TestAstTransform2(unittest.TestCase):
649 659 def setUp(self):
650 660 self.intwrapper = IntegerWrapper()
651 661 ip.ast_transformers.append(self.intwrapper)
652 662
653 663 self.calls = []
654 664 def Integer(*args):
655 665 self.calls.append(args)
656 666 return args
657 667 ip.push({"Integer": Integer})
658 668
659 669 def tearDown(self):
660 670 ip.ast_transformers.remove(self.intwrapper)
661 671 del ip.user_ns['Integer']
662 672
663 673 def test_run_cell(self):
664 674 ip.run_cell("n = 2")
665 675 self.assertEqual(self.calls, [(2,)])
666 676
667 677 # This shouldn't throw an error
668 678 ip.run_cell("o = 2.0")
669 679 self.assertEqual(ip.user_ns['o'], 2.0)
670 680
671 681 def test_timeit(self):
672 682 called = set()
673 683 def f(x):
674 684 called.add(x)
675 685 ip.push({'f':f})
676 686
677 687 with tt.AssertPrints("best of "):
678 688 ip.run_line_magic("timeit", "-n1 f(1)")
679 689 self.assertEqual(called, set([(1,)]))
680 690 called.clear()
681 691
682 692 with tt.AssertPrints("best of "):
683 693 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
684 694 self.assertEqual(called, set([(2,), (3,)]))
685 695
686 696 class ErrorTransformer(ast.NodeTransformer):
687 697 """Throws an error when it sees a number."""
688 698 def visit_Num(self, node):
689 699 raise ValueError("test")
690 700
691 701 class TestAstTransformError(unittest.TestCase):
692 702 def test_unregistering(self):
693 703 err_transformer = ErrorTransformer()
694 704 ip.ast_transformers.append(err_transformer)
695 705
696 706 with tt.AssertPrints("unregister", channel='stderr'):
697 707 ip.run_cell("1 + 2")
698 708
699 709 # This should have been removed.
700 710 nt.assert_not_in(err_transformer, ip.ast_transformers)
701 711
702 712
703 713 class StringRejector(ast.NodeTransformer):
704 714 """Throws an InputRejected when it sees a string literal.
705 715
706 716 Used to verify that NodeTransformers can signal that a piece of code should
707 717 not be executed by throwing an InputRejected.
708 718 """
709 719
710 720 def visit_Str(self, node):
711 721 raise InputRejected("test")
712 722
713 723
714 724 class TestAstTransformInputRejection(unittest.TestCase):
715 725
716 726 def setUp(self):
717 727 self.transformer = StringRejector()
718 728 ip.ast_transformers.append(self.transformer)
719 729
720 730 def tearDown(self):
721 731 ip.ast_transformers.remove(self.transformer)
722 732
723 733 def test_input_rejection(self):
724 734 """Check that NodeTransformers can reject input."""
725 735
726 736 expect_exception_tb = tt.AssertPrints("InputRejected: test")
727 737 expect_no_cell_output = tt.AssertNotPrints("'unsafe'", suppress=False)
728 738
729 739 # Run the same check twice to verify that the transformer is not
730 740 # disabled after raising.
731 741 with expect_exception_tb, expect_no_cell_output:
732 742 ip.run_cell("'unsafe'")
733 743
734 744 with expect_exception_tb, expect_no_cell_output:
735 745 ip.run_cell("'unsafe'")
736 746
737 747 def test__IPYTHON__():
738 748 # This shouldn't raise a NameError, that's all
739 749 __IPYTHON__
740 750
741 751
742 752 class DummyRepr(object):
743 753 def __repr__(self):
744 754 return "DummyRepr"
745 755
746 756 def _repr_html_(self):
747 757 return "<b>dummy</b>"
748 758
749 759 def _repr_javascript_(self):
750 760 return "console.log('hi');", {'key': 'value'}
751 761
752 762
753 763 def test_user_variables():
754 764 # enable all formatters
755 765 ip.display_formatter.active_types = ip.display_formatter.format_types
756 766
757 767 ip.user_ns['dummy'] = d = DummyRepr()
758 768 keys = set(['dummy', 'doesnotexist'])
759 769 r = ip.user_expressions({ key:key for key in keys})
760 770
761 771 nt.assert_equal(keys, set(r.keys()))
762 772 dummy = r['dummy']
763 773 nt.assert_equal(set(['status', 'data', 'metadata']), set(dummy.keys()))
764 774 nt.assert_equal(dummy['status'], 'ok')
765 775 data = dummy['data']
766 776 metadata = dummy['metadata']
767 777 nt.assert_equal(data.get('text/html'), d._repr_html_())
768 778 js, jsmd = d._repr_javascript_()
769 779 nt.assert_equal(data.get('application/javascript'), js)
770 780 nt.assert_equal(metadata.get('application/javascript'), jsmd)
771 781
772 782 dne = r['doesnotexist']
773 783 nt.assert_equal(dne['status'], 'error')
774 784 nt.assert_equal(dne['ename'], 'NameError')
775 785
776 786 # back to text only
777 787 ip.display_formatter.active_types = ['text/plain']
778 788
779 789 def test_user_expression():
780 790 # enable all formatters
781 791 ip.display_formatter.active_types = ip.display_formatter.format_types
782 792 query = {
783 793 'a' : '1 + 2',
784 794 'b' : '1/0',
785 795 }
786 796 r = ip.user_expressions(query)
787 797 import pprint
788 798 pprint.pprint(r)
789 799 nt.assert_equal(set(r.keys()), set(query.keys()))
790 800 a = r['a']
791 801 nt.assert_equal(set(['status', 'data', 'metadata']), set(a.keys()))
792 802 nt.assert_equal(a['status'], 'ok')
793 803 data = a['data']
794 804 metadata = a['metadata']
795 805 nt.assert_equal(data.get('text/plain'), '3')
796 806
797 807 b = r['b']
798 808 nt.assert_equal(b['status'], 'error')
799 809 nt.assert_equal(b['ename'], 'ZeroDivisionError')
800 810
801 811 # back to text only
802 812 ip.display_formatter.active_types = ['text/plain']
803 813
804 814
805 815
806 816
807 817
808 818 class TestSyntaxErrorTransformer(unittest.TestCase):
809 819 """Check that SyntaxError raised by an input transformer is handled by run_cell()"""
810 820
811 821 class SyntaxErrorTransformer(InputTransformer):
812 822
813 823 def push(self, line):
814 824 pos = line.find('syntaxerror')
815 825 if pos >= 0:
816 826 e = SyntaxError('input contains "syntaxerror"')
817 827 e.text = line
818 828 e.offset = pos + 1
819 829 raise e
820 830 return line
821 831
822 832 def reset(self):
823 833 pass
824 834
825 835 def setUp(self):
826 836 self.transformer = TestSyntaxErrorTransformer.SyntaxErrorTransformer()
827 837 ip.input_splitter.python_line_transforms.append(self.transformer)
828 838 ip.input_transformer_manager.python_line_transforms.append(self.transformer)
829 839
830 840 def tearDown(self):
831 841 ip.input_splitter.python_line_transforms.remove(self.transformer)
832 842 ip.input_transformer_manager.python_line_transforms.remove(self.transformer)
833 843
834 844 def test_syntaxerror_input_transformer(self):
835 845 with tt.AssertPrints('1234'):
836 846 ip.run_cell('1234')
837 847 with tt.AssertPrints('SyntaxError: invalid syntax'):
838 848 ip.run_cell('1 2 3') # plain python syntax error
839 849 with tt.AssertPrints('SyntaxError: input contains "syntaxerror"'):
840 850 ip.run_cell('2345 # syntaxerror') # input transformer syntax error
841 851 with tt.AssertPrints('3456'):
842 852 ip.run_cell('3456')
843 853
844 854
845 855
846 856 def test_warning_suppression():
847 857 ip.run_cell("import warnings")
848 858 try:
849 859 with tt.AssertPrints("UserWarning: asdf", channel="stderr"):
850 860 ip.run_cell("warnings.warn('asdf')")
851 861 # Here's the real test -- if we run that again, we should get the
852 862 # warning again. Traditionally, each warning was only issued once per
853 863 # IPython session (approximately), even if the user typed in new and
854 864 # different code that should have also triggered the warning, leading
855 865 # to much confusion.
856 866 with tt.AssertPrints("UserWarning: asdf", channel="stderr"):
857 867 ip.run_cell("warnings.warn('asdf')")
858 868 finally:
859 869 ip.run_cell("del warnings")
General Comments 0
You need to be logged in to leave comments. Login now