##// END OF EJS Templates
Catch KeyboardInterrupt for !commands on Windows...
Thomas Kluyver -
Show More

The requested changes are too big and content was truncated. Show full diff

1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
@@ -1,869 +1,869 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for the key interactiveshell module.
3 3
4 4 Historically the main classes in interactiveshell have been under-tested. This
5 5 module should grow as many single-method tests as possible to trap many of the
6 6 recurring bugs we seem to encounter with high-level interaction.
7 7 """
8 8
9 9 # Copyright (c) IPython Development Team.
10 10 # Distributed under the terms of the Modified BSD License.
11 11
12 12 import ast
13 13 import os
14 14 import signal
15 15 import shutil
16 16 import sys
17 17 import tempfile
18 18 import unittest
19 19 try:
20 20 from unittest import mock
21 21 except ImportError:
22 22 import mock
23 23 from os.path import join
24 24
25 25 import nose.tools as nt
26 26
27 27 from IPython.core.error import InputRejected
28 28 from IPython.core.inputtransformer import InputTransformer
29 29 from IPython.testing.decorators import (
30 30 skipif, skip_win32, onlyif_unicode_paths, onlyif_cmds_exist,
31 31 )
32 32 from IPython.testing import tools as tt
33 33 from IPython.utils import io
34 34 from IPython.utils.process import find_cmd
35 35 from IPython.utils import py3compat
36 36 from IPython.utils.py3compat import unicode_type, PY3
37 37
38 38 if PY3:
39 39 from io import StringIO
40 40 else:
41 41 from StringIO import StringIO
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Globals
45 45 #-----------------------------------------------------------------------------
46 46 # This is used by every single test, no point repeating it ad nauseam
47 47 ip = get_ipython()
48 48
49 49 #-----------------------------------------------------------------------------
50 50 # Tests
51 51 #-----------------------------------------------------------------------------
52 52
53 53 class InteractiveShellTestCase(unittest.TestCase):
54 54 def test_naked_string_cells(self):
55 55 """Test that cells with only naked strings are fully executed"""
56 56 # First, single-line inputs
57 57 ip.run_cell('"a"\n')
58 58 self.assertEqual(ip.user_ns['_'], 'a')
59 59 # And also multi-line cells
60 60 ip.run_cell('"""a\nb"""\n')
61 61 self.assertEqual(ip.user_ns['_'], 'a\nb')
62 62
63 63 def test_run_empty_cell(self):
64 64 """Just make sure we don't get a horrible error with a blank
65 65 cell of input. Yes, I did overlook that."""
66 66 old_xc = ip.execution_count
67 67 ip.run_cell('')
68 68 self.assertEqual(ip.execution_count, old_xc)
69 69
70 70 def test_run_cell_multiline(self):
71 71 """Multi-block, multi-line cells must execute correctly.
72 72 """
73 73 src = '\n'.join(["x=1",
74 74 "y=2",
75 75 "if 1:",
76 76 " x += 1",
77 77 " y += 1",])
78 78 ip.run_cell(src)
79 79 self.assertEqual(ip.user_ns['x'], 2)
80 80 self.assertEqual(ip.user_ns['y'], 3)
81 81
82 82 def test_multiline_string_cells(self):
83 83 "Code sprinkled with multiline strings should execute (GH-306)"
84 84 ip.run_cell('tmp=0')
85 85 self.assertEqual(ip.user_ns['tmp'], 0)
86 86 ip.run_cell('tmp=1;"""a\nb"""\n')
87 87 self.assertEqual(ip.user_ns['tmp'], 1)
88 88
89 89 def test_dont_cache_with_semicolon(self):
90 90 "Ending a line with semicolon should not cache the returned object (GH-307)"
91 91 oldlen = len(ip.user_ns['Out'])
92 92 for cell in ['1;', '1;1;']:
93 93 ip.run_cell(cell, store_history=True)
94 94 newlen = len(ip.user_ns['Out'])
95 95 self.assertEqual(oldlen, newlen)
96 96 i = 0
97 97 #also test the default caching behavior
98 98 for cell in ['1', '1;1']:
99 99 ip.run_cell(cell, store_history=True)
100 100 newlen = len(ip.user_ns['Out'])
101 101 i += 1
102 102 self.assertEqual(oldlen+i, newlen)
103 103
104 104 def test_In_variable(self):
105 105 "Verify that In variable grows with user input (GH-284)"
106 106 oldlen = len(ip.user_ns['In'])
107 107 ip.run_cell('1;', store_history=True)
108 108 newlen = len(ip.user_ns['In'])
109 109 self.assertEqual(oldlen+1, newlen)
110 110 self.assertEqual(ip.user_ns['In'][-1],'1;')
111 111
112 112 def test_magic_names_in_string(self):
113 113 ip.run_cell('a = """\n%exit\n"""')
114 114 self.assertEqual(ip.user_ns['a'], '\n%exit\n')
115 115
116 116 def test_trailing_newline(self):
117 117 """test that running !(command) does not raise a SyntaxError"""
118 118 ip.run_cell('!(true)\n', False)
119 119 ip.run_cell('!(true)\n\n\n', False)
120 120
121 121 def test_gh_597(self):
122 122 """Pretty-printing lists of objects with non-ascii reprs may cause
123 123 problems."""
124 124 class Spam(object):
125 125 def __repr__(self):
126 126 return "\xe9"*50
127 127 import IPython.core.formatters
128 128 f = IPython.core.formatters.PlainTextFormatter()
129 129 f([Spam(),Spam()])
130 130
131 131
132 132 def test_future_flags(self):
133 133 """Check that future flags are used for parsing code (gh-777)"""
134 134 ip.run_cell('from __future__ import print_function')
135 135 try:
136 136 ip.run_cell('prfunc_return_val = print(1,2, sep=" ")')
137 137 assert 'prfunc_return_val' in ip.user_ns
138 138 finally:
139 139 # Reset compiler flags so we don't mess up other tests.
140 140 ip.compile.reset_compiler_flags()
141 141
142 142 def test_future_unicode(self):
143 143 """Check that unicode_literals is imported from __future__ (gh #786)"""
144 144 try:
145 145 ip.run_cell(u'byte_str = "a"')
146 146 assert isinstance(ip.user_ns['byte_str'], str) # string literals are byte strings by default
147 147 ip.run_cell('from __future__ import unicode_literals')
148 148 ip.run_cell(u'unicode_str = "a"')
149 149 assert isinstance(ip.user_ns['unicode_str'], unicode_type) # strings literals are now unicode
150 150 finally:
151 151 # Reset compiler flags so we don't mess up other tests.
152 152 ip.compile.reset_compiler_flags()
153 153
154 154 def test_can_pickle(self):
155 155 "Can we pickle objects defined interactively (GH-29)"
156 156 ip = get_ipython()
157 157 ip.reset()
158 158 ip.run_cell(("class Mylist(list):\n"
159 159 " def __init__(self,x=[]):\n"
160 160 " list.__init__(self,x)"))
161 161 ip.run_cell("w=Mylist([1,2,3])")
162 162
163 163 from pickle import dumps
164 164
165 165 # We need to swap in our main module - this is only necessary
166 166 # inside the test framework, because IPython puts the interactive module
167 167 # in place (but the test framework undoes this).
168 168 _main = sys.modules['__main__']
169 169 sys.modules['__main__'] = ip.user_module
170 170 try:
171 171 res = dumps(ip.user_ns["w"])
172 172 finally:
173 173 sys.modules['__main__'] = _main
174 174 self.assertTrue(isinstance(res, bytes))
175 175
176 176 def test_global_ns(self):
177 177 "Code in functions must be able to access variables outside them."
178 178 ip = get_ipython()
179 179 ip.run_cell("a = 10")
180 180 ip.run_cell(("def f(x):\n"
181 181 " return x + a"))
182 182 ip.run_cell("b = f(12)")
183 183 self.assertEqual(ip.user_ns["b"], 22)
184 184
185 185 def test_bad_custom_tb(self):
186 186 """Check that InteractiveShell is protected from bad custom exception handlers"""
187 187 from IPython.utils import io
188 188 save_stderr = io.stderr
189 189 try:
190 190 # capture stderr
191 191 io.stderr = StringIO()
192 192 ip.set_custom_exc((IOError,), lambda etype,value,tb: 1/0)
193 193 self.assertEqual(ip.custom_exceptions, (IOError,))
194 194 ip.run_cell(u'raise IOError("foo")')
195 195 self.assertEqual(ip.custom_exceptions, ())
196 196 self.assertTrue("Custom TB Handler failed" in io.stderr.getvalue())
197 197 finally:
198 198 io.stderr = save_stderr
199 199
200 200 def test_bad_custom_tb_return(self):
201 201 """Check that InteractiveShell is protected from bad return types in custom exception handlers"""
202 202 from IPython.utils import io
203 203 save_stderr = io.stderr
204 204 try:
205 205 # capture stderr
206 206 io.stderr = StringIO()
207 207 ip.set_custom_exc((NameError,),lambda etype,value,tb, tb_offset=None: 1)
208 208 self.assertEqual(ip.custom_exceptions, (NameError,))
209 209 ip.run_cell(u'a=abracadabra')
210 210 self.assertEqual(ip.custom_exceptions, ())
211 211 self.assertTrue("Custom TB Handler failed" in io.stderr.getvalue())
212 212 finally:
213 213 io.stderr = save_stderr
214 214
215 215 def test_drop_by_id(self):
216 216 myvars = {"a":object(), "b":object(), "c": object()}
217 217 ip.push(myvars, interactive=False)
218 218 for name in myvars:
219 219 assert name in ip.user_ns, name
220 220 assert name in ip.user_ns_hidden, name
221 221 ip.user_ns['b'] = 12
222 222 ip.drop_by_id(myvars)
223 223 for name in ["a", "c"]:
224 224 assert name not in ip.user_ns, name
225 225 assert name not in ip.user_ns_hidden, name
226 226 assert ip.user_ns['b'] == 12
227 227 ip.reset()
228 228
229 229 def test_var_expand(self):
230 230 ip.user_ns['f'] = u'Ca\xf1o'
231 231 self.assertEqual(ip.var_expand(u'echo $f'), u'echo Ca\xf1o')
232 232 self.assertEqual(ip.var_expand(u'echo {f}'), u'echo Ca\xf1o')
233 233 self.assertEqual(ip.var_expand(u'echo {f[:-1]}'), u'echo Ca\xf1')
234 234 self.assertEqual(ip.var_expand(u'echo {1*2}'), u'echo 2')
235 235
236 236 ip.user_ns['f'] = b'Ca\xc3\xb1o'
237 237 # This should not raise any exception:
238 238 ip.var_expand(u'echo $f')
239 239
240 240 def test_var_expand_local(self):
241 241 """Test local variable expansion in !system and %magic calls"""
242 242 # !system
243 243 ip.run_cell('def test():\n'
244 244 ' lvar = "ttt"\n'
245 245 ' ret = !echo {lvar}\n'
246 246 ' return ret[0]\n')
247 247 res = ip.user_ns['test']()
248 248 nt.assert_in('ttt', res)
249 249
250 250 # %magic
251 251 ip.run_cell('def makemacro():\n'
252 252 ' macroname = "macro_var_expand_locals"\n'
253 253 ' %macro {macroname} codestr\n')
254 254 ip.user_ns['codestr'] = "str(12)"
255 255 ip.run_cell('makemacro()')
256 256 nt.assert_in('macro_var_expand_locals', ip.user_ns)
257 257
258 258 def test_var_expand_self(self):
259 259 """Test variable expansion with the name 'self', which was failing.
260 260
261 261 See https://github.com/ipython/ipython/issues/1878#issuecomment-7698218
262 262 """
263 263 ip.run_cell('class cTest:\n'
264 264 ' classvar="see me"\n'
265 265 ' def test(self):\n'
266 266 ' res = !echo Variable: {self.classvar}\n'
267 267 ' return res[0]\n')
268 268 nt.assert_in('see me', ip.user_ns['cTest']().test())
269 269
270 270 def test_bad_var_expand(self):
271 271 """var_expand on invalid formats shouldn't raise"""
272 272 # SyntaxError
273 273 self.assertEqual(ip.var_expand(u"{'a':5}"), u"{'a':5}")
274 274 # NameError
275 275 self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}")
276 276 # ZeroDivisionError
277 277 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
278 278
279 279 def test_silent_postexec(self):
280 280 """run_cell(silent=True) doesn't invoke pre/post_run_cell callbacks"""
281 281 pre_explicit = mock.Mock()
282 282 pre_always = mock.Mock()
283 283 post_explicit = mock.Mock()
284 284 post_always = mock.Mock()
285 285
286 286 ip.events.register('pre_run_cell', pre_explicit)
287 287 ip.events.register('pre_execute', pre_always)
288 288 ip.events.register('post_run_cell', post_explicit)
289 289 ip.events.register('post_execute', post_always)
290 290
291 291 try:
292 292 ip.run_cell("1", silent=True)
293 293 assert pre_always.called
294 294 assert not pre_explicit.called
295 295 assert post_always.called
296 296 assert not post_explicit.called
297 297 # double-check that non-silent exec did what we expected
298 298 # silent to avoid
299 299 ip.run_cell("1")
300 300 assert pre_explicit.called
301 301 assert post_explicit.called
302 302 finally:
303 303 # remove post-exec
304 304 ip.events.unregister('pre_run_cell', pre_explicit)
305 305 ip.events.unregister('pre_execute', pre_always)
306 306 ip.events.unregister('post_run_cell', post_explicit)
307 307 ip.events.unregister('post_execute', post_always)
308 308
309 309 def test_silent_noadvance(self):
310 310 """run_cell(silent=True) doesn't advance execution_count"""
311 311 ec = ip.execution_count
312 312 # silent should force store_history=False
313 313 ip.run_cell("1", store_history=True, silent=True)
314 314
315 315 self.assertEqual(ec, ip.execution_count)
316 316 # double-check that non-silent exec did what we expected
317 317 # silent to avoid
318 318 ip.run_cell("1", store_history=True)
319 319 self.assertEqual(ec+1, ip.execution_count)
320 320
321 321 def test_silent_nodisplayhook(self):
322 322 """run_cell(silent=True) doesn't trigger displayhook"""
323 323 d = dict(called=False)
324 324
325 325 trap = ip.display_trap
326 326 save_hook = trap.hook
327 327
328 328 def failing_hook(*args, **kwargs):
329 329 d['called'] = True
330 330
331 331 try:
332 332 trap.hook = failing_hook
333 333 ip.run_cell("1", silent=True)
334 334 self.assertFalse(d['called'])
335 335 # double-check that non-silent exec did what we expected
336 336 # silent to avoid
337 337 ip.run_cell("1")
338 338 self.assertTrue(d['called'])
339 339 finally:
340 340 trap.hook = save_hook
341 341
342 342 @skipif(sys.version_info[0] >= 3, "softspace removed in py3")
343 343 def test_print_softspace(self):
344 344 """Verify that softspace is handled correctly when executing multiple
345 345 statements.
346 346
347 347 In [1]: print 1; print 2
348 348 1
349 349 2
350 350
351 351 In [2]: print 1,; print 2
352 352 1 2
353 353 """
354 354
355 355 def test_ofind_line_magic(self):
356 356 from IPython.core.magic import register_line_magic
357 357
358 358 @register_line_magic
359 359 def lmagic(line):
360 360 "A line magic"
361 361
362 362 # Get info on line magic
363 363 lfind = ip._ofind('lmagic')
364 364 info = dict(found=True, isalias=False, ismagic=True,
365 365 namespace = 'IPython internal', obj= lmagic.__wrapped__,
366 366 parent = None)
367 367 nt.assert_equal(lfind, info)
368 368
369 369 def test_ofind_cell_magic(self):
370 370 from IPython.core.magic import register_cell_magic
371 371
372 372 @register_cell_magic
373 373 def cmagic(line, cell):
374 374 "A cell magic"
375 375
376 376 # Get info on cell magic
377 377 find = ip._ofind('cmagic')
378 378 info = dict(found=True, isalias=False, ismagic=True,
379 379 namespace = 'IPython internal', obj= cmagic.__wrapped__,
380 380 parent = None)
381 381 nt.assert_equal(find, info)
382 382
383 383 def test_ofind_property_with_error(self):
384 384 class A(object):
385 385 @property
386 386 def foo(self):
387 387 raise NotImplementedError()
388 388 a = A()
389 389
390 390 found = ip._ofind('a.foo', [('locals', locals())])
391 391 info = dict(found=True, isalias=False, ismagic=False,
392 392 namespace='locals', obj=A.foo, parent=a)
393 393 nt.assert_equal(found, info)
394 394
395 395 def test_ofind_multiple_attribute_lookups(self):
396 396 class A(object):
397 397 @property
398 398 def foo(self):
399 399 raise NotImplementedError()
400 400
401 401 a = A()
402 402 a.a = A()
403 403 a.a.a = A()
404 404
405 405 found = ip._ofind('a.a.a.foo', [('locals', locals())])
406 406 info = dict(found=True, isalias=False, ismagic=False,
407 407 namespace='locals', obj=A.foo, parent=a.a.a)
408 408 nt.assert_equal(found, info)
409 409
410 410 def test_ofind_slotted_attributes(self):
411 411 class A(object):
412 412 __slots__ = ['foo']
413 413 def __init__(self):
414 414 self.foo = 'bar'
415 415
416 416 a = A()
417 417 found = ip._ofind('a.foo', [('locals', locals())])
418 418 info = dict(found=True, isalias=False, ismagic=False,
419 419 namespace='locals', obj=a.foo, parent=a)
420 420 nt.assert_equal(found, info)
421 421
422 422 found = ip._ofind('a.bar', [('locals', locals())])
423 423 info = dict(found=False, isalias=False, ismagic=False,
424 424 namespace=None, obj=None, parent=a)
425 425 nt.assert_equal(found, info)
426 426
427 427 def test_ofind_prefers_property_to_instance_level_attribute(self):
428 428 class A(object):
429 429 @property
430 430 def foo(self):
431 431 return 'bar'
432 432 a = A()
433 433 a.__dict__['foo'] = 'baz'
434 434 nt.assert_equal(a.foo, 'bar')
435 435 found = ip._ofind('a.foo', [('locals', locals())])
436 436 nt.assert_is(found['obj'], A.foo)
437 437
438 438 def test_custom_exception(self):
439 439 called = []
440 440 def my_handler(shell, etype, value, tb, tb_offset=None):
441 441 called.append(etype)
442 442 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
443 443
444 444 ip.set_custom_exc((ValueError,), my_handler)
445 445 try:
446 446 ip.run_cell("raise ValueError('test')")
447 447 # Check that this was called, and only once.
448 448 self.assertEqual(called, [ValueError])
449 449 finally:
450 450 # Reset the custom exception hook
451 451 ip.set_custom_exc((), None)
452 452
453 453 @skipif(sys.version_info[0] >= 3, "no differences with __future__ in py3")
454 454 def test_future_environment(self):
455 455 "Can we run code with & without the shell's __future__ imports?"
456 456 ip.run_cell("from __future__ import division")
457 457 ip.run_cell("a = 1/2", shell_futures=True)
458 458 self.assertEqual(ip.user_ns['a'], 0.5)
459 459 ip.run_cell("b = 1/2", shell_futures=False)
460 460 self.assertEqual(ip.user_ns['b'], 0)
461 461
462 462 ip.compile.reset_compiler_flags()
463 463 # This shouldn't leak to the shell's compiler
464 464 ip.run_cell("from __future__ import division \nc=1/2", shell_futures=False)
465 465 self.assertEqual(ip.user_ns['c'], 0.5)
466 466 ip.run_cell("d = 1/2", shell_futures=True)
467 467 self.assertEqual(ip.user_ns['d'], 0)
468 468
469 469 def test_mktempfile(self):
470 470 filename = ip.mktempfile()
471 471 # Check that we can open the file again on Windows
472 472 with open(filename, 'w') as f:
473 473 f.write('abc')
474 474
475 475 filename = ip.mktempfile(data='blah')
476 476 with open(filename, 'r') as f:
477 477 self.assertEqual(f.read(), 'blah')
478 478
479 479 def test_new_main_mod(self):
480 480 # Smoketest to check that this accepts a unicode module name
481 481 name = u'jiefmw'
482 482 mod = ip.new_main_mod(u'%s.py' % name, name)
483 483 self.assertEqual(mod.__name__, name)
484 484
485 485 class TestSafeExecfileNonAsciiPath(unittest.TestCase):
486 486
487 487 @onlyif_unicode_paths
488 488 def setUp(self):
489 489 self.BASETESTDIR = tempfile.mkdtemp()
490 490 self.TESTDIR = join(self.BASETESTDIR, u"åäö")
491 491 os.mkdir(self.TESTDIR)
492 492 with open(join(self.TESTDIR, u"åäötestscript.py"), "w") as sfile:
493 493 sfile.write("pass\n")
494 494 self.oldpath = py3compat.getcwd()
495 495 os.chdir(self.TESTDIR)
496 496 self.fname = u"åäötestscript.py"
497 497
498 498 def tearDown(self):
499 499 os.chdir(self.oldpath)
500 500 shutil.rmtree(self.BASETESTDIR)
501 501
502 502 @onlyif_unicode_paths
503 503 def test_1(self):
504 504 """Test safe_execfile with non-ascii path
505 505 """
506 506 ip.safe_execfile(self.fname, {}, raise_exceptions=True)
507 507
508 508 class ExitCodeChecks(tt.TempFileMixin):
509 509 def test_exit_code_ok(self):
510 510 self.system('exit 0')
511 511 self.assertEqual(ip.user_ns['_exit_code'], 0)
512 512
513 513 def test_exit_code_error(self):
514 514 self.system('exit 1')
515 515 self.assertEqual(ip.user_ns['_exit_code'], 1)
516 516
517 517 @skipif(not hasattr(signal, 'SIGALRM'))
518 518 def test_exit_code_signal(self):
519 519 self.mktmp("import signal, time\n"
520 520 "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
521 521 "time.sleep(1)\n")
522 522 self.system("%s %s" % (sys.executable, self.fname))
523 523 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
524 524
525 525 @onlyif_cmds_exist("csh")
526 526 def test_exit_code_signal_csh(self):
527 527 SHELL = os.environ.get('SHELL', None)
528 528 os.environ['SHELL'] = find_cmd("csh")
529 529 try:
530 530 self.test_exit_code_signal()
531 531 finally:
532 532 if SHELL is not None:
533 533 os.environ['SHELL'] = SHELL
534 534 else:
535 535 del os.environ['SHELL']
536 536
537 537 class TestSystemRaw(unittest.TestCase, ExitCodeChecks):
538 538 system = ip.system_raw
539 539
540 540 @onlyif_unicode_paths
541 541 def test_1(self):
542 542 """Test system_raw with non-ascii cmd
543 543 """
544 544 cmd = u'''python -c "'åäö'" '''
545 545 ip.system_raw(cmd)
546 546
547 @mock.patch('subprocess.call')
548 def test_control_c(self, call_mock):
549 call_mock.side_effect = KeyboardInterrupt()
547 @mock.patch('subprocess.call', side_effect=KeyboardInterrupt)
548 @mock.patch('os.system', side_effect=KeyboardInterrupt)
549 def test_control_c(self, *mocks):
550 550 try:
551 551 self.system("sleep 1 # wont happen")
552 552 except KeyboardInterrupt:
553 553 self.fail("system call should intercept "
554 554 "keyboard interrupt from subprocess.call")
555 555 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGINT)
556 556
557 557 # TODO: Exit codes are currently ignored on Windows.
558 558 class TestSystemPipedExitCode(unittest.TestCase, ExitCodeChecks):
559 559 system = ip.system_piped
560 560
561 561 @skip_win32
562 562 def test_exit_code_ok(self):
563 563 ExitCodeChecks.test_exit_code_ok(self)
564 564
565 565 @skip_win32
566 566 def test_exit_code_error(self):
567 567 ExitCodeChecks.test_exit_code_error(self)
568 568
569 569 @skip_win32
570 570 def test_exit_code_signal(self):
571 571 ExitCodeChecks.test_exit_code_signal(self)
572 572
573 573 class TestModules(unittest.TestCase, tt.TempFileMixin):
574 574 def test_extraneous_loads(self):
575 575 """Test we're not loading modules on startup that we shouldn't.
576 576 """
577 577 self.mktmp("import sys\n"
578 578 "print('numpy' in sys.modules)\n"
579 579 "print('IPython.parallel' in sys.modules)\n"
580 580 "print('IPython.kernel.zmq' in sys.modules)\n"
581 581 )
582 582 out = "False\nFalse\nFalse\n"
583 583 tt.ipexec_validate(self.fname, out)
584 584
585 585 class Negator(ast.NodeTransformer):
586 586 """Negates all number literals in an AST."""
587 587 def visit_Num(self, node):
588 588 node.n = -node.n
589 589 return node
590 590
591 591 class TestAstTransform(unittest.TestCase):
592 592 def setUp(self):
593 593 self.negator = Negator()
594 594 ip.ast_transformers.append(self.negator)
595 595
596 596 def tearDown(self):
597 597 ip.ast_transformers.remove(self.negator)
598 598
599 599 def test_run_cell(self):
600 600 with tt.AssertPrints('-34'):
601 601 ip.run_cell('print (12 + 22)')
602 602
603 603 # A named reference to a number shouldn't be transformed.
604 604 ip.user_ns['n'] = 55
605 605 with tt.AssertNotPrints('-55'):
606 606 ip.run_cell('print (n)')
607 607
608 608 def test_timeit(self):
609 609 called = set()
610 610 def f(x):
611 611 called.add(x)
612 612 ip.push({'f':f})
613 613
614 614 with tt.AssertPrints("best of "):
615 615 ip.run_line_magic("timeit", "-n1 f(1)")
616 616 self.assertEqual(called, set([-1]))
617 617 called.clear()
618 618
619 619 with tt.AssertPrints("best of "):
620 620 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
621 621 self.assertEqual(called, set([-2, -3]))
622 622
623 623 def test_time(self):
624 624 called = []
625 625 def f(x):
626 626 called.append(x)
627 627 ip.push({'f':f})
628 628
629 629 # Test with an expression
630 630 with tt.AssertPrints("Wall time: "):
631 631 ip.run_line_magic("time", "f(5+9)")
632 632 self.assertEqual(called, [-14])
633 633 called[:] = []
634 634
635 635 # Test with a statement (different code path)
636 636 with tt.AssertPrints("Wall time: "):
637 637 ip.run_line_magic("time", "a = f(-3 + -2)")
638 638 self.assertEqual(called, [5])
639 639
640 640 def test_macro(self):
641 641 ip.push({'a':10})
642 642 # The AST transformation makes this do a+=-1
643 643 ip.define_macro("amacro", "a+=1\nprint(a)")
644 644
645 645 with tt.AssertPrints("9"):
646 646 ip.run_cell("amacro")
647 647 with tt.AssertPrints("8"):
648 648 ip.run_cell("amacro")
649 649
650 650 class IntegerWrapper(ast.NodeTransformer):
651 651 """Wraps all integers in a call to Integer()"""
652 652 def visit_Num(self, node):
653 653 if isinstance(node.n, int):
654 654 return ast.Call(func=ast.Name(id='Integer', ctx=ast.Load()),
655 655 args=[node], keywords=[])
656 656 return node
657 657
658 658 class TestAstTransform2(unittest.TestCase):
659 659 def setUp(self):
660 660 self.intwrapper = IntegerWrapper()
661 661 ip.ast_transformers.append(self.intwrapper)
662 662
663 663 self.calls = []
664 664 def Integer(*args):
665 665 self.calls.append(args)
666 666 return args
667 667 ip.push({"Integer": Integer})
668 668
669 669 def tearDown(self):
670 670 ip.ast_transformers.remove(self.intwrapper)
671 671 del ip.user_ns['Integer']
672 672
673 673 def test_run_cell(self):
674 674 ip.run_cell("n = 2")
675 675 self.assertEqual(self.calls, [(2,)])
676 676
677 677 # This shouldn't throw an error
678 678 ip.run_cell("o = 2.0")
679 679 self.assertEqual(ip.user_ns['o'], 2.0)
680 680
681 681 def test_timeit(self):
682 682 called = set()
683 683 def f(x):
684 684 called.add(x)
685 685 ip.push({'f':f})
686 686
687 687 with tt.AssertPrints("best of "):
688 688 ip.run_line_magic("timeit", "-n1 f(1)")
689 689 self.assertEqual(called, set([(1,)]))
690 690 called.clear()
691 691
692 692 with tt.AssertPrints("best of "):
693 693 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
694 694 self.assertEqual(called, set([(2,), (3,)]))
695 695
696 696 class ErrorTransformer(ast.NodeTransformer):
697 697 """Throws an error when it sees a number."""
698 698 def visit_Num(self, node):
699 699 raise ValueError("test")
700 700
701 701 class TestAstTransformError(unittest.TestCase):
702 702 def test_unregistering(self):
703 703 err_transformer = ErrorTransformer()
704 704 ip.ast_transformers.append(err_transformer)
705 705
706 706 with tt.AssertPrints("unregister", channel='stderr'):
707 707 ip.run_cell("1 + 2")
708 708
709 709 # This should have been removed.
710 710 nt.assert_not_in(err_transformer, ip.ast_transformers)
711 711
712 712
713 713 class StringRejector(ast.NodeTransformer):
714 714 """Throws an InputRejected when it sees a string literal.
715 715
716 716 Used to verify that NodeTransformers can signal that a piece of code should
717 717 not be executed by throwing an InputRejected.
718 718 """
719 719
720 720 def visit_Str(self, node):
721 721 raise InputRejected("test")
722 722
723 723
724 724 class TestAstTransformInputRejection(unittest.TestCase):
725 725
726 726 def setUp(self):
727 727 self.transformer = StringRejector()
728 728 ip.ast_transformers.append(self.transformer)
729 729
730 730 def tearDown(self):
731 731 ip.ast_transformers.remove(self.transformer)
732 732
733 733 def test_input_rejection(self):
734 734 """Check that NodeTransformers can reject input."""
735 735
736 736 expect_exception_tb = tt.AssertPrints("InputRejected: test")
737 737 expect_no_cell_output = tt.AssertNotPrints("'unsafe'", suppress=False)
738 738
739 739 # Run the same check twice to verify that the transformer is not
740 740 # disabled after raising.
741 741 with expect_exception_tb, expect_no_cell_output:
742 742 ip.run_cell("'unsafe'")
743 743
744 744 with expect_exception_tb, expect_no_cell_output:
745 745 ip.run_cell("'unsafe'")
746 746
747 747 def test__IPYTHON__():
748 748 # This shouldn't raise a NameError, that's all
749 749 __IPYTHON__
750 750
751 751
752 752 class DummyRepr(object):
753 753 def __repr__(self):
754 754 return "DummyRepr"
755 755
756 756 def _repr_html_(self):
757 757 return "<b>dummy</b>"
758 758
759 759 def _repr_javascript_(self):
760 760 return "console.log('hi');", {'key': 'value'}
761 761
762 762
763 763 def test_user_variables():
764 764 # enable all formatters
765 765 ip.display_formatter.active_types = ip.display_formatter.format_types
766 766
767 767 ip.user_ns['dummy'] = d = DummyRepr()
768 768 keys = set(['dummy', 'doesnotexist'])
769 769 r = ip.user_expressions({ key:key for key in keys})
770 770
771 771 nt.assert_equal(keys, set(r.keys()))
772 772 dummy = r['dummy']
773 773 nt.assert_equal(set(['status', 'data', 'metadata']), set(dummy.keys()))
774 774 nt.assert_equal(dummy['status'], 'ok')
775 775 data = dummy['data']
776 776 metadata = dummy['metadata']
777 777 nt.assert_equal(data.get('text/html'), d._repr_html_())
778 778 js, jsmd = d._repr_javascript_()
779 779 nt.assert_equal(data.get('application/javascript'), js)
780 780 nt.assert_equal(metadata.get('application/javascript'), jsmd)
781 781
782 782 dne = r['doesnotexist']
783 783 nt.assert_equal(dne['status'], 'error')
784 784 nt.assert_equal(dne['ename'], 'NameError')
785 785
786 786 # back to text only
787 787 ip.display_formatter.active_types = ['text/plain']
788 788
789 789 def test_user_expression():
790 790 # enable all formatters
791 791 ip.display_formatter.active_types = ip.display_formatter.format_types
792 792 query = {
793 793 'a' : '1 + 2',
794 794 'b' : '1/0',
795 795 }
796 796 r = ip.user_expressions(query)
797 797 import pprint
798 798 pprint.pprint(r)
799 799 nt.assert_equal(set(r.keys()), set(query.keys()))
800 800 a = r['a']
801 801 nt.assert_equal(set(['status', 'data', 'metadata']), set(a.keys()))
802 802 nt.assert_equal(a['status'], 'ok')
803 803 data = a['data']
804 804 metadata = a['metadata']
805 805 nt.assert_equal(data.get('text/plain'), '3')
806 806
807 807 b = r['b']
808 808 nt.assert_equal(b['status'], 'error')
809 809 nt.assert_equal(b['ename'], 'ZeroDivisionError')
810 810
811 811 # back to text only
812 812 ip.display_formatter.active_types = ['text/plain']
813 813
814 814
815 815
816 816
817 817
818 818 class TestSyntaxErrorTransformer(unittest.TestCase):
819 819 """Check that SyntaxError raised by an input transformer is handled by run_cell()"""
820 820
821 821 class SyntaxErrorTransformer(InputTransformer):
822 822
823 823 def push(self, line):
824 824 pos = line.find('syntaxerror')
825 825 if pos >= 0:
826 826 e = SyntaxError('input contains "syntaxerror"')
827 827 e.text = line
828 828 e.offset = pos + 1
829 829 raise e
830 830 return line
831 831
832 832 def reset(self):
833 833 pass
834 834
835 835 def setUp(self):
836 836 self.transformer = TestSyntaxErrorTransformer.SyntaxErrorTransformer()
837 837 ip.input_splitter.python_line_transforms.append(self.transformer)
838 838 ip.input_transformer_manager.python_line_transforms.append(self.transformer)
839 839
840 840 def tearDown(self):
841 841 ip.input_splitter.python_line_transforms.remove(self.transformer)
842 842 ip.input_transformer_manager.python_line_transforms.remove(self.transformer)
843 843
844 844 def test_syntaxerror_input_transformer(self):
845 845 with tt.AssertPrints('1234'):
846 846 ip.run_cell('1234')
847 847 with tt.AssertPrints('SyntaxError: invalid syntax'):
848 848 ip.run_cell('1 2 3') # plain python syntax error
849 849 with tt.AssertPrints('SyntaxError: input contains "syntaxerror"'):
850 850 ip.run_cell('2345 # syntaxerror') # input transformer syntax error
851 851 with tt.AssertPrints('3456'):
852 852 ip.run_cell('3456')
853 853
854 854
855 855
856 856 def test_warning_suppression():
857 857 ip.run_cell("import warnings")
858 858 try:
859 859 with tt.AssertPrints("UserWarning: asdf", channel="stderr"):
860 860 ip.run_cell("warnings.warn('asdf')")
861 861 # Here's the real test -- if we run that again, we should get the
862 862 # warning again. Traditionally, each warning was only issued once per
863 863 # IPython session (approximately), even if the user typed in new and
864 864 # different code that should have also triggered the warning, leading
865 865 # to much confusion.
866 866 with tt.AssertPrints("UserWarning: asdf", channel="stderr"):
867 867 ip.run_cell("warnings.warn('asdf')")
868 868 finally:
869 869 ip.run_cell("del warnings")
General Comments 0
You need to be logged in to leave comments. Login now