##// END OF EJS Templates
exercise calling run_cell_async as a coroutine
Min RK -
Show More
@@ -1,930 +1,948
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for the key interactiveshell module.
3 3
4 4 Historically the main classes in interactiveshell have been under-tested. This
5 5 module should grow as many single-method tests as possible to trap many of the
6 6 recurring bugs we seem to encounter with high-level interaction.
7 7 """
8 8
9 9 # Copyright (c) IPython Development Team.
10 10 # Distributed under the terms of the Modified BSD License.
11 11
12 import asyncio
12 13 import ast
13 14 import os
14 15 import signal
15 16 import shutil
16 17 import sys
17 18 import tempfile
18 19 import unittest
19 20 from unittest import mock
20 21
21 22 from os.path import join
22 23
23 24 import nose.tools as nt
24 25
25 26 from IPython.core.error import InputRejected
26 27 from IPython.core.inputtransformer import InputTransformer
28 from IPython.core import interactiveshell
27 29 from IPython.testing.decorators import (
28 30 skipif, skip_win32, onlyif_unicode_paths, onlyif_cmds_exist,
29 31 )
30 32 from IPython.testing import tools as tt
31 33 from IPython.utils.process import find_cmd
32 34
33 35 #-----------------------------------------------------------------------------
34 36 # Globals
35 37 #-----------------------------------------------------------------------------
36 38 # This is used by every single test, no point repeating it ad nauseam
37 39 ip = get_ipython()
38 40
39 41 #-----------------------------------------------------------------------------
40 42 # Tests
41 43 #-----------------------------------------------------------------------------
42 44
43 45 class DerivedInterrupt(KeyboardInterrupt):
44 46 pass
45 47
46 48 class InteractiveShellTestCase(unittest.TestCase):
47 49 def test_naked_string_cells(self):
48 50 """Test that cells with only naked strings are fully executed"""
49 51 # First, single-line inputs
50 52 ip.run_cell('"a"\n')
51 53 self.assertEqual(ip.user_ns['_'], 'a')
52 54 # And also multi-line cells
53 55 ip.run_cell('"""a\nb"""\n')
54 56 self.assertEqual(ip.user_ns['_'], 'a\nb')
55 57
56 58 def test_run_empty_cell(self):
57 59 """Just make sure we don't get a horrible error with a blank
58 60 cell of input. Yes, I did overlook that."""
59 61 old_xc = ip.execution_count
60 62 res = ip.run_cell('')
61 63 self.assertEqual(ip.execution_count, old_xc)
62 64 self.assertEqual(res.execution_count, None)
63 65
64 66 def test_run_cell_multiline(self):
65 67 """Multi-block, multi-line cells must execute correctly.
66 68 """
67 69 src = '\n'.join(["x=1",
68 70 "y=2",
69 71 "if 1:",
70 72 " x += 1",
71 73 " y += 1",])
72 74 res = ip.run_cell(src)
73 75 self.assertEqual(ip.user_ns['x'], 2)
74 76 self.assertEqual(ip.user_ns['y'], 3)
75 77 self.assertEqual(res.success, True)
76 78 self.assertEqual(res.result, None)
77 79
78 80 def test_multiline_string_cells(self):
79 81 "Code sprinkled with multiline strings should execute (GH-306)"
80 82 ip.run_cell('tmp=0')
81 83 self.assertEqual(ip.user_ns['tmp'], 0)
82 84 res = ip.run_cell('tmp=1;"""a\nb"""\n')
83 85 self.assertEqual(ip.user_ns['tmp'], 1)
84 86 self.assertEqual(res.success, True)
85 87 self.assertEqual(res.result, "a\nb")
86 88
87 89 def test_dont_cache_with_semicolon(self):
88 90 "Ending a line with semicolon should not cache the returned object (GH-307)"
89 91 oldlen = len(ip.user_ns['Out'])
90 92 for cell in ['1;', '1;1;']:
91 93 res = ip.run_cell(cell, store_history=True)
92 94 newlen = len(ip.user_ns['Out'])
93 95 self.assertEqual(oldlen, newlen)
94 96 self.assertIsNone(res.result)
95 97 i = 0
96 98 #also test the default caching behavior
97 99 for cell in ['1', '1;1']:
98 100 ip.run_cell(cell, store_history=True)
99 101 newlen = len(ip.user_ns['Out'])
100 102 i += 1
101 103 self.assertEqual(oldlen+i, newlen)
102 104
103 105 def test_syntax_error(self):
104 106 res = ip.run_cell("raise = 3")
105 107 self.assertIsInstance(res.error_before_exec, SyntaxError)
106 108
107 109 def test_In_variable(self):
108 110 "Verify that In variable grows with user input (GH-284)"
109 111 oldlen = len(ip.user_ns['In'])
110 112 ip.run_cell('1;', store_history=True)
111 113 newlen = len(ip.user_ns['In'])
112 114 self.assertEqual(oldlen+1, newlen)
113 115 self.assertEqual(ip.user_ns['In'][-1],'1;')
114 116
115 117 def test_magic_names_in_string(self):
116 118 ip.run_cell('a = """\n%exit\n"""')
117 119 self.assertEqual(ip.user_ns['a'], '\n%exit\n')
118 120
119 121 def test_trailing_newline(self):
120 122 """test that running !(command) does not raise a SyntaxError"""
121 123 ip.run_cell('!(true)\n', False)
122 124 ip.run_cell('!(true)\n\n\n', False)
123 125
124 126 def test_gh_597(self):
125 127 """Pretty-printing lists of objects with non-ascii reprs may cause
126 128 problems."""
127 129 class Spam(object):
128 130 def __repr__(self):
129 131 return "\xe9"*50
130 132 import IPython.core.formatters
131 133 f = IPython.core.formatters.PlainTextFormatter()
132 134 f([Spam(),Spam()])
133 135
134 136
135 137 def test_future_flags(self):
136 138 """Check that future flags are used for parsing code (gh-777)"""
137 139 ip.run_cell('from __future__ import barry_as_FLUFL')
138 140 try:
139 141 ip.run_cell('prfunc_return_val = 1 <> 2')
140 142 assert 'prfunc_return_val' in ip.user_ns
141 143 finally:
142 144 # Reset compiler flags so we don't mess up other tests.
143 145 ip.compile.reset_compiler_flags()
144 146
145 147 def test_can_pickle(self):
146 148 "Can we pickle objects defined interactively (GH-29)"
147 149 ip = get_ipython()
148 150 ip.reset()
149 151 ip.run_cell(("class Mylist(list):\n"
150 152 " def __init__(self,x=[]):\n"
151 153 " list.__init__(self,x)"))
152 154 ip.run_cell("w=Mylist([1,2,3])")
153 155
154 156 from pickle import dumps
155 157
156 158 # We need to swap in our main module - this is only necessary
157 159 # inside the test framework, because IPython puts the interactive module
158 160 # in place (but the test framework undoes this).
159 161 _main = sys.modules['__main__']
160 162 sys.modules['__main__'] = ip.user_module
161 163 try:
162 164 res = dumps(ip.user_ns["w"])
163 165 finally:
164 166 sys.modules['__main__'] = _main
165 167 self.assertTrue(isinstance(res, bytes))
166 168
167 169 def test_global_ns(self):
168 170 "Code in functions must be able to access variables outside them."
169 171 ip = get_ipython()
170 172 ip.run_cell("a = 10")
171 173 ip.run_cell(("def f(x):\n"
172 174 " return x + a"))
173 175 ip.run_cell("b = f(12)")
174 176 self.assertEqual(ip.user_ns["b"], 22)
175 177
176 178 def test_bad_custom_tb(self):
177 179 """Check that InteractiveShell is protected from bad custom exception handlers"""
178 180 ip.set_custom_exc((IOError,), lambda etype,value,tb: 1/0)
179 181 self.assertEqual(ip.custom_exceptions, (IOError,))
180 182 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
181 183 ip.run_cell(u'raise IOError("foo")')
182 184 self.assertEqual(ip.custom_exceptions, ())
183 185
184 186 def test_bad_custom_tb_return(self):
185 187 """Check that InteractiveShell is protected from bad return types in custom exception handlers"""
186 188 ip.set_custom_exc((NameError,),lambda etype,value,tb, tb_offset=None: 1)
187 189 self.assertEqual(ip.custom_exceptions, (NameError,))
188 190 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
189 191 ip.run_cell(u'a=abracadabra')
190 192 self.assertEqual(ip.custom_exceptions, ())
191 193
192 194 def test_drop_by_id(self):
193 195 myvars = {"a":object(), "b":object(), "c": object()}
194 196 ip.push(myvars, interactive=False)
195 197 for name in myvars:
196 198 assert name in ip.user_ns, name
197 199 assert name in ip.user_ns_hidden, name
198 200 ip.user_ns['b'] = 12
199 201 ip.drop_by_id(myvars)
200 202 for name in ["a", "c"]:
201 203 assert name not in ip.user_ns, name
202 204 assert name not in ip.user_ns_hidden, name
203 205 assert ip.user_ns['b'] == 12
204 206 ip.reset()
205 207
206 208 def test_var_expand(self):
207 209 ip.user_ns['f'] = u'Ca\xf1o'
208 210 self.assertEqual(ip.var_expand(u'echo $f'), u'echo Ca\xf1o')
209 211 self.assertEqual(ip.var_expand(u'echo {f}'), u'echo Ca\xf1o')
210 212 self.assertEqual(ip.var_expand(u'echo {f[:-1]}'), u'echo Ca\xf1')
211 213 self.assertEqual(ip.var_expand(u'echo {1*2}'), u'echo 2')
212 214
213 215 self.assertEqual(ip.var_expand(u"grep x | awk '{print $1}'"), u"grep x | awk '{print $1}'")
214 216
215 217 ip.user_ns['f'] = b'Ca\xc3\xb1o'
216 218 # This should not raise any exception:
217 219 ip.var_expand(u'echo $f')
218 220
219 221 def test_var_expand_local(self):
220 222 """Test local variable expansion in !system and %magic calls"""
221 223 # !system
222 224 ip.run_cell('def test():\n'
223 225 ' lvar = "ttt"\n'
224 226 ' ret = !echo {lvar}\n'
225 227 ' return ret[0]\n')
226 228 res = ip.user_ns['test']()
227 229 nt.assert_in('ttt', res)
228 230
229 231 # %magic
230 232 ip.run_cell('def makemacro():\n'
231 233 ' macroname = "macro_var_expand_locals"\n'
232 234 ' %macro {macroname} codestr\n')
233 235 ip.user_ns['codestr'] = "str(12)"
234 236 ip.run_cell('makemacro()')
235 237 nt.assert_in('macro_var_expand_locals', ip.user_ns)
236 238
237 239 def test_var_expand_self(self):
238 240 """Test variable expansion with the name 'self', which was failing.
239 241
240 242 See https://github.com/ipython/ipython/issues/1878#issuecomment-7698218
241 243 """
242 244 ip.run_cell('class cTest:\n'
243 245 ' classvar="see me"\n'
244 246 ' def test(self):\n'
245 247 ' res = !echo Variable: {self.classvar}\n'
246 248 ' return res[0]\n')
247 249 nt.assert_in('see me', ip.user_ns['cTest']().test())
248 250
249 251 def test_bad_var_expand(self):
250 252 """var_expand on invalid formats shouldn't raise"""
251 253 # SyntaxError
252 254 self.assertEqual(ip.var_expand(u"{'a':5}"), u"{'a':5}")
253 255 # NameError
254 256 self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}")
255 257 # ZeroDivisionError
256 258 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
257 259
258 260 def test_silent_postexec(self):
259 261 """run_cell(silent=True) doesn't invoke pre/post_run_cell callbacks"""
260 262 pre_explicit = mock.Mock()
261 263 pre_always = mock.Mock()
262 264 post_explicit = mock.Mock()
263 265 post_always = mock.Mock()
264 266 all_mocks = [pre_explicit, pre_always, post_explicit, post_always]
265 267
266 268 ip.events.register('pre_run_cell', pre_explicit)
267 269 ip.events.register('pre_execute', pre_always)
268 270 ip.events.register('post_run_cell', post_explicit)
269 271 ip.events.register('post_execute', post_always)
270 272
271 273 try:
272 274 ip.run_cell("1", silent=True)
273 275 assert pre_always.called
274 276 assert not pre_explicit.called
275 277 assert post_always.called
276 278 assert not post_explicit.called
277 279 # double-check that non-silent exec did what we expected
278 280 # silent to avoid
279 281 ip.run_cell("1")
280 282 assert pre_explicit.called
281 283 assert post_explicit.called
282 284 info, = pre_explicit.call_args[0]
283 285 result, = post_explicit.call_args[0]
284 286 self.assertEqual(info, result.info)
285 287 # check that post hooks are always called
286 288 [m.reset_mock() for m in all_mocks]
287 289 ip.run_cell("syntax error")
288 290 assert pre_always.called
289 291 assert pre_explicit.called
290 292 assert post_always.called
291 293 assert post_explicit.called
292 294 info, = pre_explicit.call_args[0]
293 295 result, = post_explicit.call_args[0]
294 296 self.assertEqual(info, result.info)
295 297 finally:
296 298 # remove post-exec
297 299 ip.events.unregister('pre_run_cell', pre_explicit)
298 300 ip.events.unregister('pre_execute', pre_always)
299 301 ip.events.unregister('post_run_cell', post_explicit)
300 302 ip.events.unregister('post_execute', post_always)
301 303
302 304 def test_silent_noadvance(self):
303 305 """run_cell(silent=True) doesn't advance execution_count"""
304 306 ec = ip.execution_count
305 307 # silent should force store_history=False
306 308 ip.run_cell("1", store_history=True, silent=True)
307 309
308 310 self.assertEqual(ec, ip.execution_count)
309 311 # double-check that non-silent exec did what we expected
310 312 # silent to avoid
311 313 ip.run_cell("1", store_history=True)
312 314 self.assertEqual(ec+1, ip.execution_count)
313 315
314 316 def test_silent_nodisplayhook(self):
315 317 """run_cell(silent=True) doesn't trigger displayhook"""
316 318 d = dict(called=False)
317 319
318 320 trap = ip.display_trap
319 321 save_hook = trap.hook
320 322
321 323 def failing_hook(*args, **kwargs):
322 324 d['called'] = True
323 325
324 326 try:
325 327 trap.hook = failing_hook
326 328 res = ip.run_cell("1", silent=True)
327 329 self.assertFalse(d['called'])
328 330 self.assertIsNone(res.result)
329 331 # double-check that non-silent exec did what we expected
330 332 # silent to avoid
331 333 ip.run_cell("1")
332 334 self.assertTrue(d['called'])
333 335 finally:
334 336 trap.hook = save_hook
335 337
336 338 def test_ofind_line_magic(self):
337 339 from IPython.core.magic import register_line_magic
338 340
339 341 @register_line_magic
340 342 def lmagic(line):
341 343 "A line magic"
342 344
343 345 # Get info on line magic
344 346 lfind = ip._ofind('lmagic')
345 347 info = dict(found=True, isalias=False, ismagic=True,
346 348 namespace = 'IPython internal', obj= lmagic.__wrapped__,
347 349 parent = None)
348 350 nt.assert_equal(lfind, info)
349 351
350 352 def test_ofind_cell_magic(self):
351 353 from IPython.core.magic import register_cell_magic
352 354
353 355 @register_cell_magic
354 356 def cmagic(line, cell):
355 357 "A cell magic"
356 358
357 359 # Get info on cell magic
358 360 find = ip._ofind('cmagic')
359 361 info = dict(found=True, isalias=False, ismagic=True,
360 362 namespace = 'IPython internal', obj= cmagic.__wrapped__,
361 363 parent = None)
362 364 nt.assert_equal(find, info)
363 365
364 366 def test_ofind_property_with_error(self):
365 367 class A(object):
366 368 @property
367 369 def foo(self):
368 370 raise NotImplementedError()
369 371 a = A()
370 372
371 373 found = ip._ofind('a.foo', [('locals', locals())])
372 374 info = dict(found=True, isalias=False, ismagic=False,
373 375 namespace='locals', obj=A.foo, parent=a)
374 376 nt.assert_equal(found, info)
375 377
376 378 def test_ofind_multiple_attribute_lookups(self):
377 379 class A(object):
378 380 @property
379 381 def foo(self):
380 382 raise NotImplementedError()
381 383
382 384 a = A()
383 385 a.a = A()
384 386 a.a.a = A()
385 387
386 388 found = ip._ofind('a.a.a.foo', [('locals', locals())])
387 389 info = dict(found=True, isalias=False, ismagic=False,
388 390 namespace='locals', obj=A.foo, parent=a.a.a)
389 391 nt.assert_equal(found, info)
390 392
391 393 def test_ofind_slotted_attributes(self):
392 394 class A(object):
393 395 __slots__ = ['foo']
394 396 def __init__(self):
395 397 self.foo = 'bar'
396 398
397 399 a = A()
398 400 found = ip._ofind('a.foo', [('locals', locals())])
399 401 info = dict(found=True, isalias=False, ismagic=False,
400 402 namespace='locals', obj=a.foo, parent=a)
401 403 nt.assert_equal(found, info)
402 404
403 405 found = ip._ofind('a.bar', [('locals', locals())])
404 406 info = dict(found=False, isalias=False, ismagic=False,
405 407 namespace=None, obj=None, parent=a)
406 408 nt.assert_equal(found, info)
407 409
408 410 def test_ofind_prefers_property_to_instance_level_attribute(self):
409 411 class A(object):
410 412 @property
411 413 def foo(self):
412 414 return 'bar'
413 415 a = A()
414 416 a.__dict__['foo'] = 'baz'
415 417 nt.assert_equal(a.foo, 'bar')
416 418 found = ip._ofind('a.foo', [('locals', locals())])
417 419 nt.assert_is(found['obj'], A.foo)
418 420
419 421 def test_custom_syntaxerror_exception(self):
420 422 called = []
421 423 def my_handler(shell, etype, value, tb, tb_offset=None):
422 424 called.append(etype)
423 425 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
424 426
425 427 ip.set_custom_exc((SyntaxError,), my_handler)
426 428 try:
427 429 ip.run_cell("1f")
428 430 # Check that this was called, and only once.
429 431 self.assertEqual(called, [SyntaxError])
430 432 finally:
431 433 # Reset the custom exception hook
432 434 ip.set_custom_exc((), None)
433 435
434 436 def test_custom_exception(self):
435 437 called = []
436 438 def my_handler(shell, etype, value, tb, tb_offset=None):
437 439 called.append(etype)
438 440 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
439 441
440 442 ip.set_custom_exc((ValueError,), my_handler)
441 443 try:
442 444 res = ip.run_cell("raise ValueError('test')")
443 445 # Check that this was called, and only once.
444 446 self.assertEqual(called, [ValueError])
445 447 # Check that the error is on the result object
446 448 self.assertIsInstance(res.error_in_exec, ValueError)
447 449 finally:
448 450 # Reset the custom exception hook
449 451 ip.set_custom_exc((), None)
450 452
451 453 def test_mktempfile(self):
452 454 filename = ip.mktempfile()
453 455 # Check that we can open the file again on Windows
454 456 with open(filename, 'w') as f:
455 457 f.write('abc')
456 458
457 459 filename = ip.mktempfile(data='blah')
458 460 with open(filename, 'r') as f:
459 461 self.assertEqual(f.read(), 'blah')
460 462
461 463 def test_new_main_mod(self):
462 464 # Smoketest to check that this accepts a unicode module name
463 465 name = u'jiefmw'
464 466 mod = ip.new_main_mod(u'%s.py' % name, name)
465 467 self.assertEqual(mod.__name__, name)
466 468
467 469 def test_get_exception_only(self):
468 470 try:
469 471 raise KeyboardInterrupt
470 472 except KeyboardInterrupt:
471 473 msg = ip.get_exception_only()
472 474 self.assertEqual(msg, 'KeyboardInterrupt\n')
473 475
474 476 try:
475 477 raise DerivedInterrupt("foo")
476 478 except KeyboardInterrupt:
477 479 msg = ip.get_exception_only()
478 480 self.assertEqual(msg, 'IPython.core.tests.test_interactiveshell.DerivedInterrupt: foo\n')
479 481
480 482 def test_inspect_text(self):
481 483 ip.run_cell('a = 5')
482 484 text = ip.object_inspect_text('a')
483 485 self.assertIsInstance(text, str)
484 486
485 487 def test_last_execution_result(self):
486 488 """ Check that last execution result gets set correctly (GH-10702) """
487 489 result = ip.run_cell('a = 5; a')
488 490 self.assertTrue(ip.last_execution_succeeded)
489 491 self.assertEqual(ip.last_execution_result.result, 5)
490 492
491 493 result = ip.run_cell('a = x_invalid_id_x')
492 494 self.assertFalse(ip.last_execution_succeeded)
493 495 self.assertFalse(ip.last_execution_result.success)
494 496 self.assertIsInstance(ip.last_execution_result.error_in_exec, NameError)
495 497
496 498
497 499 class TestSafeExecfileNonAsciiPath(unittest.TestCase):
498 500
499 501 @onlyif_unicode_paths
500 502 def setUp(self):
501 503 self.BASETESTDIR = tempfile.mkdtemp()
502 504 self.TESTDIR = join(self.BASETESTDIR, u"åäö")
503 505 os.mkdir(self.TESTDIR)
504 506 with open(join(self.TESTDIR, u"åäötestscript.py"), "w") as sfile:
505 507 sfile.write("pass\n")
506 508 self.oldpath = os.getcwd()
507 509 os.chdir(self.TESTDIR)
508 510 self.fname = u"åäötestscript.py"
509 511
510 512 def tearDown(self):
511 513 os.chdir(self.oldpath)
512 514 shutil.rmtree(self.BASETESTDIR)
513 515
514 516 @onlyif_unicode_paths
515 517 def test_1(self):
516 518 """Test safe_execfile with non-ascii path
517 519 """
518 520 ip.safe_execfile(self.fname, {}, raise_exceptions=True)
519 521
520 522 class ExitCodeChecks(tt.TempFileMixin):
521 523 def test_exit_code_ok(self):
522 524 self.system('exit 0')
523 525 self.assertEqual(ip.user_ns['_exit_code'], 0)
524 526
525 527 def test_exit_code_error(self):
526 528 self.system('exit 1')
527 529 self.assertEqual(ip.user_ns['_exit_code'], 1)
528 530
529 531 @skipif(not hasattr(signal, 'SIGALRM'))
530 532 def test_exit_code_signal(self):
531 533 self.mktmp("import signal, time\n"
532 534 "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
533 535 "time.sleep(1)\n")
534 536 self.system("%s %s" % (sys.executable, self.fname))
535 537 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
536 538
537 539 @onlyif_cmds_exist("csh")
538 540 def test_exit_code_signal_csh(self):
539 541 SHELL = os.environ.get('SHELL', None)
540 542 os.environ['SHELL'] = find_cmd("csh")
541 543 try:
542 544 self.test_exit_code_signal()
543 545 finally:
544 546 if SHELL is not None:
545 547 os.environ['SHELL'] = SHELL
546 548 else:
547 549 del os.environ['SHELL']
548 550
549 551
550 552 class TestSystemRaw(ExitCodeChecks, unittest.TestCase):
551 553 system = ip.system_raw
552 554
553 555 @onlyif_unicode_paths
554 556 def test_1(self):
555 557 """Test system_raw with non-ascii cmd
556 558 """
557 559 cmd = u'''python -c "'åäö'" '''
558 560 ip.system_raw(cmd)
559 561
560 562 @mock.patch('subprocess.call', side_effect=KeyboardInterrupt)
561 563 @mock.patch('os.system', side_effect=KeyboardInterrupt)
562 564 def test_control_c(self, *mocks):
563 565 try:
564 566 self.system("sleep 1 # wont happen")
565 567 except KeyboardInterrupt:
566 568 self.fail("system call should intercept "
567 569 "keyboard interrupt from subprocess.call")
568 570 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGINT)
569 571
570 572 # TODO: Exit codes are currently ignored on Windows.
571 573 class TestSystemPipedExitCode(ExitCodeChecks, unittest.TestCase):
572 574 system = ip.system_piped
573 575
574 576 @skip_win32
575 577 def test_exit_code_ok(self):
576 578 ExitCodeChecks.test_exit_code_ok(self)
577 579
578 580 @skip_win32
579 581 def test_exit_code_error(self):
580 582 ExitCodeChecks.test_exit_code_error(self)
581 583
582 584 @skip_win32
583 585 def test_exit_code_signal(self):
584 586 ExitCodeChecks.test_exit_code_signal(self)
585 587
586 588 class TestModules(tt.TempFileMixin, unittest.TestCase):
587 589 def test_extraneous_loads(self):
588 590 """Test we're not loading modules on startup that we shouldn't.
589 591 """
590 592 self.mktmp("import sys\n"
591 593 "print('numpy' in sys.modules)\n"
592 594 "print('ipyparallel' in sys.modules)\n"
593 595 "print('ipykernel' in sys.modules)\n"
594 596 )
595 597 out = "False\nFalse\nFalse\n"
596 598 tt.ipexec_validate(self.fname, out)
597 599
598 600 class Negator(ast.NodeTransformer):
599 601 """Negates all number literals in an AST."""
600 602 def visit_Num(self, node):
601 603 node.n = -node.n
602 604 return node
603 605
604 606 class TestAstTransform(unittest.TestCase):
605 607 def setUp(self):
606 608 self.negator = Negator()
607 609 ip.ast_transformers.append(self.negator)
608 610
609 611 def tearDown(self):
610 612 ip.ast_transformers.remove(self.negator)
611 613
612 614 def test_run_cell(self):
613 615 with tt.AssertPrints('-34'):
614 616 ip.run_cell('print (12 + 22)')
615 617
616 618 # A named reference to a number shouldn't be transformed.
617 619 ip.user_ns['n'] = 55
618 620 with tt.AssertNotPrints('-55'):
619 621 ip.run_cell('print (n)')
620 622
621 623 def test_timeit(self):
622 624 called = set()
623 625 def f(x):
624 626 called.add(x)
625 627 ip.push({'f':f})
626 628
627 629 with tt.AssertPrints("std. dev. of"):
628 630 ip.run_line_magic("timeit", "-n1 f(1)")
629 631 self.assertEqual(called, {-1})
630 632 called.clear()
631 633
632 634 with tt.AssertPrints("std. dev. of"):
633 635 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
634 636 self.assertEqual(called, {-2, -3})
635 637
636 638 def test_time(self):
637 639 called = []
638 640 def f(x):
639 641 called.append(x)
640 642 ip.push({'f':f})
641 643
642 644 # Test with an expression
643 645 with tt.AssertPrints("Wall time: "):
644 646 ip.run_line_magic("time", "f(5+9)")
645 647 self.assertEqual(called, [-14])
646 648 called[:] = []
647 649
648 650 # Test with a statement (different code path)
649 651 with tt.AssertPrints("Wall time: "):
650 652 ip.run_line_magic("time", "a = f(-3 + -2)")
651 653 self.assertEqual(called, [5])
652 654
653 655 def test_macro(self):
654 656 ip.push({'a':10})
655 657 # The AST transformation makes this do a+=-1
656 658 ip.define_macro("amacro", "a+=1\nprint(a)")
657 659
658 660 with tt.AssertPrints("9"):
659 661 ip.run_cell("amacro")
660 662 with tt.AssertPrints("8"):
661 663 ip.run_cell("amacro")
662 664
663 665 class IntegerWrapper(ast.NodeTransformer):
664 666 """Wraps all integers in a call to Integer()"""
665 667 def visit_Num(self, node):
666 668 if isinstance(node.n, int):
667 669 return ast.Call(func=ast.Name(id='Integer', ctx=ast.Load()),
668 670 args=[node], keywords=[])
669 671 return node
670 672
671 673 class TestAstTransform2(unittest.TestCase):
672 674 def setUp(self):
673 675 self.intwrapper = IntegerWrapper()
674 676 ip.ast_transformers.append(self.intwrapper)
675 677
676 678 self.calls = []
677 679 def Integer(*args):
678 680 self.calls.append(args)
679 681 return args
680 682 ip.push({"Integer": Integer})
681 683
682 684 def tearDown(self):
683 685 ip.ast_transformers.remove(self.intwrapper)
684 686 del ip.user_ns['Integer']
685 687
686 688 def test_run_cell(self):
687 689 ip.run_cell("n = 2")
688 690 self.assertEqual(self.calls, [(2,)])
689 691
690 692 # This shouldn't throw an error
691 693 ip.run_cell("o = 2.0")
692 694 self.assertEqual(ip.user_ns['o'], 2.0)
693 695
694 696 def test_timeit(self):
695 697 called = set()
696 698 def f(x):
697 699 called.add(x)
698 700 ip.push({'f':f})
699 701
700 702 with tt.AssertPrints("std. dev. of"):
701 703 ip.run_line_magic("timeit", "-n1 f(1)")
702 704 self.assertEqual(called, {(1,)})
703 705 called.clear()
704 706
705 707 with tt.AssertPrints("std. dev. of"):
706 708 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
707 709 self.assertEqual(called, {(2,), (3,)})
708 710
709 711 class ErrorTransformer(ast.NodeTransformer):
710 712 """Throws an error when it sees a number."""
711 713 def visit_Num(self, node):
712 714 raise ValueError("test")
713 715
714 716 class TestAstTransformError(unittest.TestCase):
715 717 def test_unregistering(self):
716 718 err_transformer = ErrorTransformer()
717 719 ip.ast_transformers.append(err_transformer)
718 720
719 721 with tt.AssertPrints("unregister", channel='stderr'):
720 722 ip.run_cell("1 + 2")
721 723
722 724 # This should have been removed.
723 725 nt.assert_not_in(err_transformer, ip.ast_transformers)
724 726
725 727
726 728 class StringRejector(ast.NodeTransformer):
727 729 """Throws an InputRejected when it sees a string literal.
728 730
729 731 Used to verify that NodeTransformers can signal that a piece of code should
730 732 not be executed by throwing an InputRejected.
731 733 """
732 734
733 735 def visit_Str(self, node):
734 736 raise InputRejected("test")
735 737
736 738
737 739 class TestAstTransformInputRejection(unittest.TestCase):
738 740
739 741 def setUp(self):
740 742 self.transformer = StringRejector()
741 743 ip.ast_transformers.append(self.transformer)
742 744
743 745 def tearDown(self):
744 746 ip.ast_transformers.remove(self.transformer)
745 747
746 748 def test_input_rejection(self):
747 749 """Check that NodeTransformers can reject input."""
748 750
749 751 expect_exception_tb = tt.AssertPrints("InputRejected: test")
750 752 expect_no_cell_output = tt.AssertNotPrints("'unsafe'", suppress=False)
751 753
752 754 # Run the same check twice to verify that the transformer is not
753 755 # disabled after raising.
754 756 with expect_exception_tb, expect_no_cell_output:
755 757 ip.run_cell("'unsafe'")
756 758
757 759 with expect_exception_tb, expect_no_cell_output:
758 760 res = ip.run_cell("'unsafe'")
759 761
760 762 self.assertIsInstance(res.error_before_exec, InputRejected)
761 763
762 764 def test__IPYTHON__():
763 765 # This shouldn't raise a NameError, that's all
764 766 __IPYTHON__
765 767
766 768
767 769 class DummyRepr(object):
768 770 def __repr__(self):
769 771 return "DummyRepr"
770 772
771 773 def _repr_html_(self):
772 774 return "<b>dummy</b>"
773 775
774 776 def _repr_javascript_(self):
775 777 return "console.log('hi');", {'key': 'value'}
776 778
777 779
778 780 def test_user_variables():
779 781 # enable all formatters
780 782 ip.display_formatter.active_types = ip.display_formatter.format_types
781 783
782 784 ip.user_ns['dummy'] = d = DummyRepr()
783 785 keys = {'dummy', 'doesnotexist'}
784 786 r = ip.user_expressions({ key:key for key in keys})
785 787
786 788 nt.assert_equal(keys, set(r.keys()))
787 789 dummy = r['dummy']
788 790 nt.assert_equal({'status', 'data', 'metadata'}, set(dummy.keys()))
789 791 nt.assert_equal(dummy['status'], 'ok')
790 792 data = dummy['data']
791 793 metadata = dummy['metadata']
792 794 nt.assert_equal(data.get('text/html'), d._repr_html_())
793 795 js, jsmd = d._repr_javascript_()
794 796 nt.assert_equal(data.get('application/javascript'), js)
795 797 nt.assert_equal(metadata.get('application/javascript'), jsmd)
796 798
797 799 dne = r['doesnotexist']
798 800 nt.assert_equal(dne['status'], 'error')
799 801 nt.assert_equal(dne['ename'], 'NameError')
800 802
801 803 # back to text only
802 804 ip.display_formatter.active_types = ['text/plain']
803 805
804 806 def test_user_expression():
805 807 # enable all formatters
806 808 ip.display_formatter.active_types = ip.display_formatter.format_types
807 809 query = {
808 810 'a' : '1 + 2',
809 811 'b' : '1/0',
810 812 }
811 813 r = ip.user_expressions(query)
812 814 import pprint
813 815 pprint.pprint(r)
814 816 nt.assert_equal(set(r.keys()), set(query.keys()))
815 817 a = r['a']
816 818 nt.assert_equal({'status', 'data', 'metadata'}, set(a.keys()))
817 819 nt.assert_equal(a['status'], 'ok')
818 820 data = a['data']
819 821 metadata = a['metadata']
820 822 nt.assert_equal(data.get('text/plain'), '3')
821 823
822 824 b = r['b']
823 825 nt.assert_equal(b['status'], 'error')
824 826 nt.assert_equal(b['ename'], 'ZeroDivisionError')
825 827
826 828 # back to text only
827 829 ip.display_formatter.active_types = ['text/plain']
828 830
829 831
830 832
831 833
832 834
833 835 class TestSyntaxErrorTransformer(unittest.TestCase):
834 836 """Check that SyntaxError raised by an input transformer is handled by run_cell()"""
835 837
836 838 @staticmethod
837 839 def transformer(lines):
838 840 for line in lines:
839 841 pos = line.find('syntaxerror')
840 842 if pos >= 0:
841 843 e = SyntaxError('input contains "syntaxerror"')
842 844 e.text = line
843 845 e.offset = pos + 1
844 846 raise e
845 847 return lines
846 848
847 849 def setUp(self):
848 850 ip.input_transformers_post.append(self.transformer)
849 851
850 852 def tearDown(self):
851 853 ip.input_transformers_post.remove(self.transformer)
852 854
853 855 def test_syntaxerror_input_transformer(self):
854 856 with tt.AssertPrints('1234'):
855 857 ip.run_cell('1234')
856 858 with tt.AssertPrints('SyntaxError: invalid syntax'):
857 859 ip.run_cell('1 2 3') # plain python syntax error
858 860 with tt.AssertPrints('SyntaxError: input contains "syntaxerror"'):
859 861 ip.run_cell('2345 # syntaxerror') # input transformer syntax error
860 862 with tt.AssertPrints('3456'):
861 863 ip.run_cell('3456')
862 864
863 865
864 866
865 867 def test_warning_suppression():
866 868 ip.run_cell("import warnings")
867 869 try:
868 870 with tt.AssertPrints("UserWarning: asdf", channel="stderr"):
869 871 ip.run_cell("warnings.warn('asdf')")
870 872 # Here's the real test -- if we run that again, we should get the
871 873 # warning again. Traditionally, each warning was only issued once per
872 874 # IPython session (approximately), even if the user typed in new and
873 875 # different code that should have also triggered the warning, leading
874 876 # to much confusion.
875 877 with tt.AssertPrints("UserWarning: asdf", channel="stderr"):
876 878 ip.run_cell("warnings.warn('asdf')")
877 879 finally:
878 880 ip.run_cell("del warnings")
879 881
880 882
881 883 def test_deprecation_warning():
882 884 ip.run_cell("""
883 885 import warnings
884 886 def wrn():
885 887 warnings.warn(
886 888 "I AM A WARNING",
887 889 DeprecationWarning
888 890 )
889 891 """)
890 892 try:
891 893 with tt.AssertPrints("I AM A WARNING", channel="stderr"):
892 894 ip.run_cell("wrn()")
893 895 finally:
894 896 ip.run_cell("del warnings")
895 897 ip.run_cell("del wrn")
896 898
897 899
898 900 class TestImportNoDeprecate(tt.TempFileMixin):
899 901
900 902 def setup(self):
901 903 """Make a valid python temp file."""
902 904 self.mktmp("""
903 905 import warnings
904 906 def wrn():
905 907 warnings.warn(
906 908 "I AM A WARNING",
907 909 DeprecationWarning
908 910 )
909 911 """)
910 912
911 913 def test_no_dep(self):
912 914 """
913 915 No deprecation warning should be raised from imported functions
914 916 """
915 917 ip.run_cell("from {} import wrn".format(self.fname))
916 918
917 919 with tt.AssertNotPrints("I AM A WARNING"):
918 920 ip.run_cell("wrn()")
919 921 ip.run_cell("del wrn")
920 922
921 923
922 924 def test_custom_exc_count():
923 925 hook = mock.Mock(return_value=None)
924 926 ip.set_custom_exc((SyntaxError,), hook)
925 927 before = ip.execution_count
926 928 ip.run_cell("def foo()", store_history=True)
927 929 # restore default excepthook
928 930 ip.set_custom_exc((), None)
929 931 nt.assert_equal(hook.call_count, 1)
930 932 nt.assert_equal(ip.execution_count, before + 1)
933
934
935 def test_run_cell_async():
936 loop = asyncio.get_event_loop()
937 ip.run_cell("import asyncio")
938 coro = ip.run_cell_async("await asyncio.sleep(0.01)\n5")
939 assert asyncio.iscoroutine(coro)
940 result = loop.run_until_complete(coro)
941 assert isinstance(result, interactiveshell.ExecutionResult)
942 assert result.result == 5
943
944
945 def test_should_run_async():
946 assert not ip.should_run_async("a = 5")
947 assert ip.should_run_async("await x")
948 assert ip.should_run_async("import asyncio; await asyncio.sleep(1)")
General Comments 0
You need to be logged in to leave comments. Login now