##// END OF EJS Templates
raise an error when user tries to open a standard stream...
Osher De Paz -
Show More

The requested changes are too big and content was truncated. Show full diff

1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
@@ -1,1100 +1,1115 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for the key interactiveshell module.
3 3
4 4 Historically the main classes in interactiveshell have been under-tested. This
5 5 module should grow as many single-method tests as possible to trap many of the
6 6 recurring bugs we seem to encounter with high-level interaction.
7 7 """
8 8
9 9 # Copyright (c) IPython Development Team.
10 10 # Distributed under the terms of the Modified BSD License.
11 11
12 12 import asyncio
13 13 import ast
14 14 import os
15 15 import signal
16 16 import shutil
17 17 import sys
18 18 import tempfile
19 19 import unittest
20 20 from unittest import mock
21 21
22 22 from os.path import join
23 23
24 24 from IPython.core.error import InputRejected
25 25 from IPython.core.inputtransformer import InputTransformer
26 26 from IPython.core import interactiveshell
27 27 from IPython.testing.decorators import (
28 28 skipif, skip_win32, onlyif_unicode_paths, onlyif_cmds_exist,
29 29 )
30 30 from IPython.testing import tools as tt
31 31 from IPython.utils.process import find_cmd
32 32
33 33 #-----------------------------------------------------------------------------
34 34 # Globals
35 35 #-----------------------------------------------------------------------------
36 36 # This is used by every single test, no point repeating it ad nauseam
37 37
38 38 #-----------------------------------------------------------------------------
39 39 # Tests
40 40 #-----------------------------------------------------------------------------
41 41
42 42 class DerivedInterrupt(KeyboardInterrupt):
43 43 pass
44 44
45 45 class InteractiveShellTestCase(unittest.TestCase):
46 46 def test_naked_string_cells(self):
47 47 """Test that cells with only naked strings are fully executed"""
48 48 # First, single-line inputs
49 49 ip.run_cell('"a"\n')
50 50 self.assertEqual(ip.user_ns['_'], 'a')
51 51 # And also multi-line cells
52 52 ip.run_cell('"""a\nb"""\n')
53 53 self.assertEqual(ip.user_ns['_'], 'a\nb')
54 54
55 55 def test_run_empty_cell(self):
56 56 """Just make sure we don't get a horrible error with a blank
57 57 cell of input. Yes, I did overlook that."""
58 58 old_xc = ip.execution_count
59 59 res = ip.run_cell('')
60 60 self.assertEqual(ip.execution_count, old_xc)
61 61 self.assertEqual(res.execution_count, None)
62 62
63 63 def test_run_cell_multiline(self):
64 64 """Multi-block, multi-line cells must execute correctly.
65 65 """
66 66 src = '\n'.join(["x=1",
67 67 "y=2",
68 68 "if 1:",
69 69 " x += 1",
70 70 " y += 1",])
71 71 res = ip.run_cell(src)
72 72 self.assertEqual(ip.user_ns['x'], 2)
73 73 self.assertEqual(ip.user_ns['y'], 3)
74 74 self.assertEqual(res.success, True)
75 75 self.assertEqual(res.result, None)
76 76
77 77 def test_multiline_string_cells(self):
78 78 "Code sprinkled with multiline strings should execute (GH-306)"
79 79 ip.run_cell('tmp=0')
80 80 self.assertEqual(ip.user_ns['tmp'], 0)
81 81 res = ip.run_cell('tmp=1;"""a\nb"""\n')
82 82 self.assertEqual(ip.user_ns['tmp'], 1)
83 83 self.assertEqual(res.success, True)
84 84 self.assertEqual(res.result, "a\nb")
85 85
86 86 def test_dont_cache_with_semicolon(self):
87 87 "Ending a line with semicolon should not cache the returned object (GH-307)"
88 88 oldlen = len(ip.user_ns['Out'])
89 89 for cell in ['1;', '1;1;']:
90 90 res = ip.run_cell(cell, store_history=True)
91 91 newlen = len(ip.user_ns['Out'])
92 92 self.assertEqual(oldlen, newlen)
93 93 self.assertIsNone(res.result)
94 94 i = 0
95 95 #also test the default caching behavior
96 96 for cell in ['1', '1;1']:
97 97 ip.run_cell(cell, store_history=True)
98 98 newlen = len(ip.user_ns['Out'])
99 99 i += 1
100 100 self.assertEqual(oldlen+i, newlen)
101 101
102 102 def test_syntax_error(self):
103 103 res = ip.run_cell("raise = 3")
104 104 self.assertIsInstance(res.error_before_exec, SyntaxError)
105 105
106 def test_open_standard_input_stream(self):
107 ip.init_create_namespaces()
108 res = ip.run_cell("open(0)")
109 self.assertIsInstance(res.error_in_exec, ValueError)
110
111 def test_open_standard_output_stream(self):
112 ip.init_create_namespaces()
113 res = ip.run_cell("open(1)")
114 self.assertIsInstance(res.error_in_exec, ValueError)
115
116 def test_open_standard_error_stream(self):
117 ip.init_create_namespaces()
118 res = ip.run_cell("open(2)")
119 self.assertIsInstance(res.error_in_exec, ValueError)
120
106 121 def test_In_variable(self):
107 122 "Verify that In variable grows with user input (GH-284)"
108 123 oldlen = len(ip.user_ns['In'])
109 124 ip.run_cell('1;', store_history=True)
110 125 newlen = len(ip.user_ns['In'])
111 126 self.assertEqual(oldlen+1, newlen)
112 127 self.assertEqual(ip.user_ns['In'][-1],'1;')
113 128
114 129 def test_magic_names_in_string(self):
115 130 ip.run_cell('a = """\n%exit\n"""')
116 131 self.assertEqual(ip.user_ns['a'], '\n%exit\n')
117 132
118 133 def test_trailing_newline(self):
119 134 """test that running !(command) does not raise a SyntaxError"""
120 135 ip.run_cell('!(true)\n', False)
121 136 ip.run_cell('!(true)\n\n\n', False)
122 137
123 138 def test_gh_597(self):
124 139 """Pretty-printing lists of objects with non-ascii reprs may cause
125 140 problems."""
126 141 class Spam(object):
127 142 def __repr__(self):
128 143 return "\xe9"*50
129 144 import IPython.core.formatters
130 145 f = IPython.core.formatters.PlainTextFormatter()
131 146 f([Spam(),Spam()])
132 147
133 148
134 149 def test_future_flags(self):
135 150 """Check that future flags are used for parsing code (gh-777)"""
136 151 ip.run_cell('from __future__ import barry_as_FLUFL')
137 152 try:
138 153 ip.run_cell('prfunc_return_val = 1 <> 2')
139 154 assert 'prfunc_return_val' in ip.user_ns
140 155 finally:
141 156 # Reset compiler flags so we don't mess up other tests.
142 157 ip.compile.reset_compiler_flags()
143 158
144 159 def test_can_pickle(self):
145 160 "Can we pickle objects defined interactively (GH-29)"
146 161 ip = get_ipython()
147 162 ip.reset()
148 163 ip.run_cell(("class Mylist(list):\n"
149 164 " def __init__(self,x=[]):\n"
150 165 " list.__init__(self,x)"))
151 166 ip.run_cell("w=Mylist([1,2,3])")
152 167
153 168 from pickle import dumps
154 169
155 170 # We need to swap in our main module - this is only necessary
156 171 # inside the test framework, because IPython puts the interactive module
157 172 # in place (but the test framework undoes this).
158 173 _main = sys.modules['__main__']
159 174 sys.modules['__main__'] = ip.user_module
160 175 try:
161 176 res = dumps(ip.user_ns["w"])
162 177 finally:
163 178 sys.modules['__main__'] = _main
164 179 self.assertTrue(isinstance(res, bytes))
165 180
166 181 def test_global_ns(self):
167 182 "Code in functions must be able to access variables outside them."
168 183 ip = get_ipython()
169 184 ip.run_cell("a = 10")
170 185 ip.run_cell(("def f(x):\n"
171 186 " return x + a"))
172 187 ip.run_cell("b = f(12)")
173 188 self.assertEqual(ip.user_ns["b"], 22)
174 189
175 190 def test_bad_custom_tb(self):
176 191 """Check that InteractiveShell is protected from bad custom exception handlers"""
177 192 ip.set_custom_exc((IOError,), lambda etype,value,tb: 1/0)
178 193 self.assertEqual(ip.custom_exceptions, (IOError,))
179 194 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
180 195 ip.run_cell(u'raise IOError("foo")')
181 196 self.assertEqual(ip.custom_exceptions, ())
182 197
183 198 def test_bad_custom_tb_return(self):
184 199 """Check that InteractiveShell is protected from bad return types in custom exception handlers"""
185 200 ip.set_custom_exc((NameError,),lambda etype,value,tb, tb_offset=None: 1)
186 201 self.assertEqual(ip.custom_exceptions, (NameError,))
187 202 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
188 203 ip.run_cell(u'a=abracadabra')
189 204 self.assertEqual(ip.custom_exceptions, ())
190 205
191 206 def test_drop_by_id(self):
192 207 myvars = {"a":object(), "b":object(), "c": object()}
193 208 ip.push(myvars, interactive=False)
194 209 for name in myvars:
195 210 assert name in ip.user_ns, name
196 211 assert name in ip.user_ns_hidden, name
197 212 ip.user_ns['b'] = 12
198 213 ip.drop_by_id(myvars)
199 214 for name in ["a", "c"]:
200 215 assert name not in ip.user_ns, name
201 216 assert name not in ip.user_ns_hidden, name
202 217 assert ip.user_ns['b'] == 12
203 218 ip.reset()
204 219
205 220 def test_var_expand(self):
206 221 ip.user_ns['f'] = u'Ca\xf1o'
207 222 self.assertEqual(ip.var_expand(u'echo $f'), u'echo Ca\xf1o')
208 223 self.assertEqual(ip.var_expand(u'echo {f}'), u'echo Ca\xf1o')
209 224 self.assertEqual(ip.var_expand(u'echo {f[:-1]}'), u'echo Ca\xf1')
210 225 self.assertEqual(ip.var_expand(u'echo {1*2}'), u'echo 2')
211 226
212 227 self.assertEqual(ip.var_expand(u"grep x | awk '{print $1}'"), u"grep x | awk '{print $1}'")
213 228
214 229 ip.user_ns['f'] = b'Ca\xc3\xb1o'
215 230 # This should not raise any exception:
216 231 ip.var_expand(u'echo $f')
217 232
218 233 def test_var_expand_local(self):
219 234 """Test local variable expansion in !system and %magic calls"""
220 235 # !system
221 236 ip.run_cell(
222 237 "def test():\n"
223 238 ' lvar = "ttt"\n'
224 239 " ret = !echo {lvar}\n"
225 240 " return ret[0]\n"
226 241 )
227 242 res = ip.user_ns["test"]()
228 243 self.assertIn("ttt", res)
229 244
230 245 # %magic
231 246 ip.run_cell(
232 247 "def makemacro():\n"
233 248 ' macroname = "macro_var_expand_locals"\n'
234 249 " %macro {macroname} codestr\n"
235 250 )
236 251 ip.user_ns["codestr"] = "str(12)"
237 252 ip.run_cell("makemacro()")
238 253 self.assertIn("macro_var_expand_locals", ip.user_ns)
239 254
240 255 def test_var_expand_self(self):
241 256 """Test variable expansion with the name 'self', which was failing.
242 257
243 258 See https://github.com/ipython/ipython/issues/1878#issuecomment-7698218
244 259 """
245 260 ip.run_cell(
246 261 "class cTest:\n"
247 262 ' classvar="see me"\n'
248 263 " def test(self):\n"
249 264 " res = !echo Variable: {self.classvar}\n"
250 265 " return res[0]\n"
251 266 )
252 267 self.assertIn("see me", ip.user_ns["cTest"]().test())
253 268
254 269 def test_bad_var_expand(self):
255 270 """var_expand on invalid formats shouldn't raise"""
256 271 # SyntaxError
257 272 self.assertEqual(ip.var_expand(u"{'a':5}"), u"{'a':5}")
258 273 # NameError
259 274 self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}")
260 275 # ZeroDivisionError
261 276 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
262 277
263 278 def test_silent_postexec(self):
264 279 """run_cell(silent=True) doesn't invoke pre/post_run_cell callbacks"""
265 280 pre_explicit = mock.Mock()
266 281 pre_always = mock.Mock()
267 282 post_explicit = mock.Mock()
268 283 post_always = mock.Mock()
269 284 all_mocks = [pre_explicit, pre_always, post_explicit, post_always]
270 285
271 286 ip.events.register('pre_run_cell', pre_explicit)
272 287 ip.events.register('pre_execute', pre_always)
273 288 ip.events.register('post_run_cell', post_explicit)
274 289 ip.events.register('post_execute', post_always)
275 290
276 291 try:
277 292 ip.run_cell("1", silent=True)
278 293 assert pre_always.called
279 294 assert not pre_explicit.called
280 295 assert post_always.called
281 296 assert not post_explicit.called
282 297 # double-check that non-silent exec did what we expected
283 298 # silent to avoid
284 299 ip.run_cell("1")
285 300 assert pre_explicit.called
286 301 assert post_explicit.called
287 302 info, = pre_explicit.call_args[0]
288 303 result, = post_explicit.call_args[0]
289 304 self.assertEqual(info, result.info)
290 305 # check that post hooks are always called
291 306 [m.reset_mock() for m in all_mocks]
292 307 ip.run_cell("syntax error")
293 308 assert pre_always.called
294 309 assert pre_explicit.called
295 310 assert post_always.called
296 311 assert post_explicit.called
297 312 info, = pre_explicit.call_args[0]
298 313 result, = post_explicit.call_args[0]
299 314 self.assertEqual(info, result.info)
300 315 finally:
301 316 # remove post-exec
302 317 ip.events.unregister('pre_run_cell', pre_explicit)
303 318 ip.events.unregister('pre_execute', pre_always)
304 319 ip.events.unregister('post_run_cell', post_explicit)
305 320 ip.events.unregister('post_execute', post_always)
306 321
307 322 def test_silent_noadvance(self):
308 323 """run_cell(silent=True) doesn't advance execution_count"""
309 324 ec = ip.execution_count
310 325 # silent should force store_history=False
311 326 ip.run_cell("1", store_history=True, silent=True)
312 327
313 328 self.assertEqual(ec, ip.execution_count)
314 329 # double-check that non-silent exec did what we expected
315 330 # silent to avoid
316 331 ip.run_cell("1", store_history=True)
317 332 self.assertEqual(ec+1, ip.execution_count)
318 333
319 334 def test_silent_nodisplayhook(self):
320 335 """run_cell(silent=True) doesn't trigger displayhook"""
321 336 d = dict(called=False)
322 337
323 338 trap = ip.display_trap
324 339 save_hook = trap.hook
325 340
326 341 def failing_hook(*args, **kwargs):
327 342 d['called'] = True
328 343
329 344 try:
330 345 trap.hook = failing_hook
331 346 res = ip.run_cell("1", silent=True)
332 347 self.assertFalse(d['called'])
333 348 self.assertIsNone(res.result)
334 349 # double-check that non-silent exec did what we expected
335 350 # silent to avoid
336 351 ip.run_cell("1")
337 352 self.assertTrue(d['called'])
338 353 finally:
339 354 trap.hook = save_hook
340 355
341 356 def test_ofind_line_magic(self):
342 357 from IPython.core.magic import register_line_magic
343 358
344 359 @register_line_magic
345 360 def lmagic(line):
346 361 "A line magic"
347 362
348 363 # Get info on line magic
349 364 lfind = ip._ofind("lmagic")
350 365 info = dict(
351 366 found=True,
352 367 isalias=False,
353 368 ismagic=True,
354 369 namespace="IPython internal",
355 370 obj=lmagic,
356 371 parent=None,
357 372 )
358 373 self.assertEqual(lfind, info)
359 374
360 375 def test_ofind_cell_magic(self):
361 376 from IPython.core.magic import register_cell_magic
362 377
363 378 @register_cell_magic
364 379 def cmagic(line, cell):
365 380 "A cell magic"
366 381
367 382 # Get info on cell magic
368 383 find = ip._ofind("cmagic")
369 384 info = dict(
370 385 found=True,
371 386 isalias=False,
372 387 ismagic=True,
373 388 namespace="IPython internal",
374 389 obj=cmagic,
375 390 parent=None,
376 391 )
377 392 self.assertEqual(find, info)
378 393
379 394 def test_ofind_property_with_error(self):
380 395 class A(object):
381 396 @property
382 397 def foo(self):
383 398 raise NotImplementedError() # pragma: no cover
384 399
385 400 a = A()
386 401
387 402 found = ip._ofind('a.foo', [('locals', locals())])
388 403 info = dict(found=True, isalias=False, ismagic=False,
389 404 namespace='locals', obj=A.foo, parent=a)
390 405 self.assertEqual(found, info)
391 406
392 407 def test_ofind_multiple_attribute_lookups(self):
393 408 class A(object):
394 409 @property
395 410 def foo(self):
396 411 raise NotImplementedError() # pragma: no cover
397 412
398 413 a = A()
399 414 a.a = A()
400 415 a.a.a = A()
401 416
402 417 found = ip._ofind('a.a.a.foo', [('locals', locals())])
403 418 info = dict(found=True, isalias=False, ismagic=False,
404 419 namespace='locals', obj=A.foo, parent=a.a.a)
405 420 self.assertEqual(found, info)
406 421
407 422 def test_ofind_slotted_attributes(self):
408 423 class A(object):
409 424 __slots__ = ['foo']
410 425 def __init__(self):
411 426 self.foo = 'bar'
412 427
413 428 a = A()
414 429 found = ip._ofind('a.foo', [('locals', locals())])
415 430 info = dict(found=True, isalias=False, ismagic=False,
416 431 namespace='locals', obj=a.foo, parent=a)
417 432 self.assertEqual(found, info)
418 433
419 434 found = ip._ofind('a.bar', [('locals', locals())])
420 435 info = dict(found=False, isalias=False, ismagic=False,
421 436 namespace=None, obj=None, parent=a)
422 437 self.assertEqual(found, info)
423 438
424 439 def test_ofind_prefers_property_to_instance_level_attribute(self):
425 440 class A(object):
426 441 @property
427 442 def foo(self):
428 443 return 'bar'
429 444 a = A()
430 445 a.__dict__["foo"] = "baz"
431 446 self.assertEqual(a.foo, "bar")
432 447 found = ip._ofind("a.foo", [("locals", locals())])
433 448 self.assertIs(found["obj"], A.foo)
434 449
435 450 def test_custom_syntaxerror_exception(self):
436 451 called = []
437 452 def my_handler(shell, etype, value, tb, tb_offset=None):
438 453 called.append(etype)
439 454 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
440 455
441 456 ip.set_custom_exc((SyntaxError,), my_handler)
442 457 try:
443 458 ip.run_cell("1f")
444 459 # Check that this was called, and only once.
445 460 self.assertEqual(called, [SyntaxError])
446 461 finally:
447 462 # Reset the custom exception hook
448 463 ip.set_custom_exc((), None)
449 464
450 465 def test_custom_exception(self):
451 466 called = []
452 467 def my_handler(shell, etype, value, tb, tb_offset=None):
453 468 called.append(etype)
454 469 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
455 470
456 471 ip.set_custom_exc((ValueError,), my_handler)
457 472 try:
458 473 res = ip.run_cell("raise ValueError('test')")
459 474 # Check that this was called, and only once.
460 475 self.assertEqual(called, [ValueError])
461 476 # Check that the error is on the result object
462 477 self.assertIsInstance(res.error_in_exec, ValueError)
463 478 finally:
464 479 # Reset the custom exception hook
465 480 ip.set_custom_exc((), None)
466 481
467 482 @mock.patch("builtins.print")
468 483 def test_showtraceback_with_surrogates(self, mocked_print):
469 484 values = []
470 485
471 486 def mock_print_func(value, sep=" ", end="\n", file=sys.stdout, flush=False):
472 487 values.append(value)
473 488 if value == chr(0xD8FF):
474 489 raise UnicodeEncodeError("utf-8", chr(0xD8FF), 0, 1, "")
475 490
476 491 # mock builtins.print
477 492 mocked_print.side_effect = mock_print_func
478 493
479 494 # ip._showtraceback() is replaced in globalipapp.py.
480 495 # Call original method to test.
481 496 interactiveshell.InteractiveShell._showtraceback(ip, None, None, chr(0xD8FF))
482 497
483 498 self.assertEqual(mocked_print.call_count, 2)
484 499 self.assertEqual(values, [chr(0xD8FF), "\\ud8ff"])
485 500
486 501 def test_mktempfile(self):
487 502 filename = ip.mktempfile()
488 503 # Check that we can open the file again on Windows
489 504 with open(filename, "w", encoding="utf-8") as f:
490 505 f.write("abc")
491 506
492 507 filename = ip.mktempfile(data="blah")
493 508 with open(filename, "r", encoding="utf-8") as f:
494 509 self.assertEqual(f.read(), "blah")
495 510
496 511 def test_new_main_mod(self):
497 512 # Smoketest to check that this accepts a unicode module name
498 513 name = u'jiefmw'
499 514 mod = ip.new_main_mod(u'%s.py' % name, name)
500 515 self.assertEqual(mod.__name__, name)
501 516
502 517 def test_get_exception_only(self):
503 518 try:
504 519 raise KeyboardInterrupt
505 520 except KeyboardInterrupt:
506 521 msg = ip.get_exception_only()
507 522 self.assertEqual(msg, 'KeyboardInterrupt\n')
508 523
509 524 try:
510 525 raise DerivedInterrupt("foo")
511 526 except KeyboardInterrupt:
512 527 msg = ip.get_exception_only()
513 528 self.assertEqual(msg, 'IPython.core.tests.test_interactiveshell.DerivedInterrupt: foo\n')
514 529
515 530 def test_inspect_text(self):
516 531 ip.run_cell('a = 5')
517 532 text = ip.object_inspect_text('a')
518 533 self.assertIsInstance(text, str)
519 534
520 535 def test_last_execution_result(self):
521 536 """ Check that last execution result gets set correctly (GH-10702) """
522 537 result = ip.run_cell('a = 5; a')
523 538 self.assertTrue(ip.last_execution_succeeded)
524 539 self.assertEqual(ip.last_execution_result.result, 5)
525 540
526 541 result = ip.run_cell('a = x_invalid_id_x')
527 542 self.assertFalse(ip.last_execution_succeeded)
528 543 self.assertFalse(ip.last_execution_result.success)
529 544 self.assertIsInstance(ip.last_execution_result.error_in_exec, NameError)
530 545
531 546 def test_reset_aliasing(self):
532 547 """ Check that standard posix aliases work after %reset. """
533 548 if os.name != 'posix':
534 549 return
535 550
536 551 ip.reset()
537 552 for cmd in ('clear', 'more', 'less', 'man'):
538 553 res = ip.run_cell('%' + cmd)
539 554 self.assertEqual(res.success, True)
540 555
541 556
542 557 class TestSafeExecfileNonAsciiPath(unittest.TestCase):
543 558
544 559 @onlyif_unicode_paths
545 560 def setUp(self):
546 561 self.BASETESTDIR = tempfile.mkdtemp()
547 562 self.TESTDIR = join(self.BASETESTDIR, u"åäö")
548 563 os.mkdir(self.TESTDIR)
549 564 with open(
550 565 join(self.TESTDIR, "åäötestscript.py"), "w", encoding="utf-8"
551 566 ) as sfile:
552 567 sfile.write("pass\n")
553 568 self.oldpath = os.getcwd()
554 569 os.chdir(self.TESTDIR)
555 570 self.fname = u"åäötestscript.py"
556 571
557 572 def tearDown(self):
558 573 os.chdir(self.oldpath)
559 574 shutil.rmtree(self.BASETESTDIR)
560 575
561 576 @onlyif_unicode_paths
562 577 def test_1(self):
563 578 """Test safe_execfile with non-ascii path
564 579 """
565 580 ip.safe_execfile(self.fname, {}, raise_exceptions=True)
566 581
567 582 class ExitCodeChecks(tt.TempFileMixin):
568 583
569 584 def setUp(self):
570 585 self.system = ip.system_raw
571 586
572 587 def test_exit_code_ok(self):
573 588 self.system('exit 0')
574 589 self.assertEqual(ip.user_ns['_exit_code'], 0)
575 590
576 591 def test_exit_code_error(self):
577 592 self.system('exit 1')
578 593 self.assertEqual(ip.user_ns['_exit_code'], 1)
579 594
580 595 @skipif(not hasattr(signal, 'SIGALRM'))
581 596 def test_exit_code_signal(self):
582 597 self.mktmp("import signal, time\n"
583 598 "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
584 599 "time.sleep(1)\n")
585 600 self.system("%s %s" % (sys.executable, self.fname))
586 601 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
587 602
588 603 @onlyif_cmds_exist("csh")
589 604 def test_exit_code_signal_csh(self): # pragma: no cover
590 605 SHELL = os.environ.get("SHELL", None)
591 606 os.environ["SHELL"] = find_cmd("csh")
592 607 try:
593 608 self.test_exit_code_signal()
594 609 finally:
595 610 if SHELL is not None:
596 611 os.environ['SHELL'] = SHELL
597 612 else:
598 613 del os.environ['SHELL']
599 614
600 615
601 616 class TestSystemRaw(ExitCodeChecks):
602 617
603 618 def setUp(self):
604 619 super().setUp()
605 620 self.system = ip.system_raw
606 621
607 622 @onlyif_unicode_paths
608 623 def test_1(self):
609 624 """Test system_raw with non-ascii cmd
610 625 """
611 626 cmd = u'''python -c "'åäö'" '''
612 627 ip.system_raw(cmd)
613 628
614 629 @mock.patch('subprocess.call', side_effect=KeyboardInterrupt)
615 630 @mock.patch('os.system', side_effect=KeyboardInterrupt)
616 631 def test_control_c(self, *mocks):
617 632 try:
618 633 self.system("sleep 1 # wont happen")
619 634 except KeyboardInterrupt: # pragma: no cove
620 635 self.fail(
621 636 "system call should intercept "
622 637 "keyboard interrupt from subprocess.call"
623 638 )
624 639 self.assertEqual(ip.user_ns["_exit_code"], -signal.SIGINT)
625 640
626 641 def test_magic_warnings(self):
627 642 for magic_cmd in ("pip", "conda", "cd"):
628 643 with self.assertWarnsRegex(Warning, "You executed the system command"):
629 644 ip.system_raw(magic_cmd)
630 645
631 646 # TODO: Exit codes are currently ignored on Windows.
632 647 class TestSystemPipedExitCode(ExitCodeChecks):
633 648
634 649 def setUp(self):
635 650 super().setUp()
636 651 self.system = ip.system_piped
637 652
638 653 @skip_win32
639 654 def test_exit_code_ok(self):
640 655 ExitCodeChecks.test_exit_code_ok(self)
641 656
642 657 @skip_win32
643 658 def test_exit_code_error(self):
644 659 ExitCodeChecks.test_exit_code_error(self)
645 660
646 661 @skip_win32
647 662 def test_exit_code_signal(self):
648 663 ExitCodeChecks.test_exit_code_signal(self)
649 664
650 665 class TestModules(tt.TempFileMixin):
651 666 def test_extraneous_loads(self):
652 667 """Test we're not loading modules on startup that we shouldn't.
653 668 """
654 669 self.mktmp("import sys\n"
655 670 "print('numpy' in sys.modules)\n"
656 671 "print('ipyparallel' in sys.modules)\n"
657 672 "print('ipykernel' in sys.modules)\n"
658 673 )
659 674 out = "False\nFalse\nFalse\n"
660 675 tt.ipexec_validate(self.fname, out)
661 676
662 677 class Negator(ast.NodeTransformer):
663 678 """Negates all number literals in an AST."""
664 679
665 680 # for python 3.7 and earlier
666 681 def visit_Num(self, node):
667 682 node.n = -node.n
668 683 return node
669 684
670 685 # for python 3.8+
671 686 def visit_Constant(self, node):
672 687 if isinstance(node.value, int):
673 688 return self.visit_Num(node)
674 689 return node
675 690
676 691 class TestAstTransform(unittest.TestCase):
677 692 def setUp(self):
678 693 self.negator = Negator()
679 694 ip.ast_transformers.append(self.negator)
680 695
681 696 def tearDown(self):
682 697 ip.ast_transformers.remove(self.negator)
683 698
684 699 def test_non_int_const(self):
685 700 with tt.AssertPrints("hello"):
686 701 ip.run_cell('print("hello")')
687 702
688 703 def test_run_cell(self):
689 704 with tt.AssertPrints("-34"):
690 705 ip.run_cell("print(12 + 22)")
691 706
692 707 # A named reference to a number shouldn't be transformed.
693 708 ip.user_ns["n"] = 55
694 709 with tt.AssertNotPrints("-55"):
695 710 ip.run_cell("print(n)")
696 711
697 712 def test_timeit(self):
698 713 called = set()
699 714 def f(x):
700 715 called.add(x)
701 716 ip.push({'f':f})
702 717
703 718 with tt.AssertPrints("std. dev. of"):
704 719 ip.run_line_magic("timeit", "-n1 f(1)")
705 720 self.assertEqual(called, {-1})
706 721 called.clear()
707 722
708 723 with tt.AssertPrints("std. dev. of"):
709 724 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
710 725 self.assertEqual(called, {-2, -3})
711 726
712 727 def test_time(self):
713 728 called = []
714 729 def f(x):
715 730 called.append(x)
716 731 ip.push({'f':f})
717 732
718 733 # Test with an expression
719 734 with tt.AssertPrints("Wall time: "):
720 735 ip.run_line_magic("time", "f(5+9)")
721 736 self.assertEqual(called, [-14])
722 737 called[:] = []
723 738
724 739 # Test with a statement (different code path)
725 740 with tt.AssertPrints("Wall time: "):
726 741 ip.run_line_magic("time", "a = f(-3 + -2)")
727 742 self.assertEqual(called, [5])
728 743
729 744 def test_macro(self):
730 745 ip.push({'a':10})
731 746 # The AST transformation makes this do a+=-1
732 747 ip.define_macro("amacro", "a+=1\nprint(a)")
733 748
734 749 with tt.AssertPrints("9"):
735 750 ip.run_cell("amacro")
736 751 with tt.AssertPrints("8"):
737 752 ip.run_cell("amacro")
738 753
739 754 class TestMiscTransform(unittest.TestCase):
740 755
741 756
742 757 def test_transform_only_once(self):
743 758 cleanup = 0
744 759 line_t = 0
745 760 def count_cleanup(lines):
746 761 nonlocal cleanup
747 762 cleanup += 1
748 763 return lines
749 764
750 765 def count_line_t(lines):
751 766 nonlocal line_t
752 767 line_t += 1
753 768 return lines
754 769
755 770 ip.input_transformer_manager.cleanup_transforms.append(count_cleanup)
756 771 ip.input_transformer_manager.line_transforms.append(count_line_t)
757 772
758 773 ip.run_cell('1')
759 774
760 775 assert cleanup == 1
761 776 assert line_t == 1
762 777
763 778 class IntegerWrapper(ast.NodeTransformer):
764 779 """Wraps all integers in a call to Integer()"""
765 780
766 781 # for Python 3.7 and earlier
767 782
768 783 # for Python 3.7 and earlier
769 784 def visit_Num(self, node):
770 785 if isinstance(node.n, int):
771 786 return ast.Call(func=ast.Name(id='Integer', ctx=ast.Load()),
772 787 args=[node], keywords=[])
773 788 return node
774 789
775 790 # For Python 3.8+
776 791 def visit_Constant(self, node):
777 792 if isinstance(node.value, int):
778 793 return self.visit_Num(node)
779 794 return node
780 795
781 796
782 797 class TestAstTransform2(unittest.TestCase):
783 798 def setUp(self):
784 799 self.intwrapper = IntegerWrapper()
785 800 ip.ast_transformers.append(self.intwrapper)
786 801
787 802 self.calls = []
788 803 def Integer(*args):
789 804 self.calls.append(args)
790 805 return args
791 806 ip.push({"Integer": Integer})
792 807
793 808 def tearDown(self):
794 809 ip.ast_transformers.remove(self.intwrapper)
795 810 del ip.user_ns['Integer']
796 811
797 812 def test_run_cell(self):
798 813 ip.run_cell("n = 2")
799 814 self.assertEqual(self.calls, [(2,)])
800 815
801 816 # This shouldn't throw an error
802 817 ip.run_cell("o = 2.0")
803 818 self.assertEqual(ip.user_ns['o'], 2.0)
804 819
805 820 def test_run_cell_non_int(self):
806 821 ip.run_cell("n = 'a'")
807 822 assert self.calls == []
808 823
809 824 def test_timeit(self):
810 825 called = set()
811 826 def f(x):
812 827 called.add(x)
813 828 ip.push({'f':f})
814 829
815 830 with tt.AssertPrints("std. dev. of"):
816 831 ip.run_line_magic("timeit", "-n1 f(1)")
817 832 self.assertEqual(called, {(1,)})
818 833 called.clear()
819 834
820 835 with tt.AssertPrints("std. dev. of"):
821 836 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
822 837 self.assertEqual(called, {(2,), (3,)})
823 838
824 839 class ErrorTransformer(ast.NodeTransformer):
825 840 """Throws an error when it sees a number."""
826 841
827 842 def visit_Constant(self, node):
828 843 if isinstance(node.value, int):
829 844 raise ValueError("test")
830 845 return node
831 846
832 847
833 848 class TestAstTransformError(unittest.TestCase):
834 849 def test_unregistering(self):
835 850 err_transformer = ErrorTransformer()
836 851 ip.ast_transformers.append(err_transformer)
837 852
838 853 with self.assertWarnsRegex(UserWarning, "It will be unregistered"):
839 854 ip.run_cell("1 + 2")
840 855
841 856 # This should have been removed.
842 857 self.assertNotIn(err_transformer, ip.ast_transformers)
843 858
844 859
845 860 class StringRejector(ast.NodeTransformer):
846 861 """Throws an InputRejected when it sees a string literal.
847 862
848 863 Used to verify that NodeTransformers can signal that a piece of code should
849 864 not be executed by throwing an InputRejected.
850 865 """
851 866
852 867 # 3.8 only
853 868 def visit_Constant(self, node):
854 869 if isinstance(node.value, str):
855 870 raise InputRejected("test")
856 871 return node
857 872
858 873
859 874 class TestAstTransformInputRejection(unittest.TestCase):
860 875
861 876 def setUp(self):
862 877 self.transformer = StringRejector()
863 878 ip.ast_transformers.append(self.transformer)
864 879
865 880 def tearDown(self):
866 881 ip.ast_transformers.remove(self.transformer)
867 882
868 883 def test_input_rejection(self):
869 884 """Check that NodeTransformers can reject input."""
870 885
871 886 expect_exception_tb = tt.AssertPrints("InputRejected: test")
872 887 expect_no_cell_output = tt.AssertNotPrints("'unsafe'", suppress=False)
873 888
874 889 # Run the same check twice to verify that the transformer is not
875 890 # disabled after raising.
876 891 with expect_exception_tb, expect_no_cell_output:
877 892 ip.run_cell("'unsafe'")
878 893
879 894 with expect_exception_tb, expect_no_cell_output:
880 895 res = ip.run_cell("'unsafe'")
881 896
882 897 self.assertIsInstance(res.error_before_exec, InputRejected)
883 898
884 899 def test__IPYTHON__():
885 900 # This shouldn't raise a NameError, that's all
886 901 __IPYTHON__
887 902
888 903
889 904 class DummyRepr(object):
890 905 def __repr__(self):
891 906 return "DummyRepr"
892 907
893 908 def _repr_html_(self):
894 909 return "<b>dummy</b>"
895 910
896 911 def _repr_javascript_(self):
897 912 return "console.log('hi');", {'key': 'value'}
898 913
899 914
900 915 def test_user_variables():
901 916 # enable all formatters
902 917 ip.display_formatter.active_types = ip.display_formatter.format_types
903 918
904 919 ip.user_ns['dummy'] = d = DummyRepr()
905 920 keys = {'dummy', 'doesnotexist'}
906 921 r = ip.user_expressions({ key:key for key in keys})
907 922
908 923 assert keys == set(r.keys())
909 924 dummy = r["dummy"]
910 925 assert {"status", "data", "metadata"} == set(dummy.keys())
911 926 assert dummy["status"] == "ok"
912 927 data = dummy["data"]
913 928 metadata = dummy["metadata"]
914 929 assert data.get("text/html") == d._repr_html_()
915 930 js, jsmd = d._repr_javascript_()
916 931 assert data.get("application/javascript") == js
917 932 assert metadata.get("application/javascript") == jsmd
918 933
919 934 dne = r["doesnotexist"]
920 935 assert dne["status"] == "error"
921 936 assert dne["ename"] == "NameError"
922 937
923 938 # back to text only
924 939 ip.display_formatter.active_types = ['text/plain']
925 940
926 941 def test_user_expression():
927 942 # enable all formatters
928 943 ip.display_formatter.active_types = ip.display_formatter.format_types
929 944 query = {
930 945 'a' : '1 + 2',
931 946 'b' : '1/0',
932 947 }
933 948 r = ip.user_expressions(query)
934 949 import pprint
935 950 pprint.pprint(r)
936 951 assert set(r.keys()) == set(query.keys())
937 952 a = r["a"]
938 953 assert {"status", "data", "metadata"} == set(a.keys())
939 954 assert a["status"] == "ok"
940 955 data = a["data"]
941 956 metadata = a["metadata"]
942 957 assert data.get("text/plain") == "3"
943 958
944 959 b = r["b"]
945 960 assert b["status"] == "error"
946 961 assert b["ename"] == "ZeroDivisionError"
947 962
948 963 # back to text only
949 964 ip.display_formatter.active_types = ['text/plain']
950 965
951 966
952 967 class TestSyntaxErrorTransformer(unittest.TestCase):
953 968 """Check that SyntaxError raised by an input transformer is handled by run_cell()"""
954 969
955 970 @staticmethod
956 971 def transformer(lines):
957 972 for line in lines:
958 973 pos = line.find('syntaxerror')
959 974 if pos >= 0:
960 975 e = SyntaxError('input contains "syntaxerror"')
961 976 e.text = line
962 977 e.offset = pos + 1
963 978 raise e
964 979 return lines
965 980
966 981 def setUp(self):
967 982 ip.input_transformers_post.append(self.transformer)
968 983
969 984 def tearDown(self):
970 985 ip.input_transformers_post.remove(self.transformer)
971 986
972 987 def test_syntaxerror_input_transformer(self):
973 988 with tt.AssertPrints('1234'):
974 989 ip.run_cell('1234')
975 990 with tt.AssertPrints('SyntaxError: invalid syntax'):
976 991 ip.run_cell('1 2 3') # plain python syntax error
977 992 with tt.AssertPrints('SyntaxError: input contains "syntaxerror"'):
978 993 ip.run_cell('2345 # syntaxerror') # input transformer syntax error
979 994 with tt.AssertPrints('3456'):
980 995 ip.run_cell('3456')
981 996
982 997
983 998 class TestWarningSuppression(unittest.TestCase):
984 999 def test_warning_suppression(self):
985 1000 ip.run_cell("import warnings")
986 1001 try:
987 1002 with self.assertWarnsRegex(UserWarning, "asdf"):
988 1003 ip.run_cell("warnings.warn('asdf')")
989 1004 # Here's the real test -- if we run that again, we should get the
990 1005 # warning again. Traditionally, each warning was only issued once per
991 1006 # IPython session (approximately), even if the user typed in new and
992 1007 # different code that should have also triggered the warning, leading
993 1008 # to much confusion.
994 1009 with self.assertWarnsRegex(UserWarning, "asdf"):
995 1010 ip.run_cell("warnings.warn('asdf')")
996 1011 finally:
997 1012 ip.run_cell("del warnings")
998 1013
999 1014
1000 1015 def test_deprecation_warning(self):
1001 1016 ip.run_cell("""
1002 1017 import warnings
1003 1018 def wrn():
1004 1019 warnings.warn(
1005 1020 "I AM A WARNING",
1006 1021 DeprecationWarning
1007 1022 )
1008 1023 """)
1009 1024 try:
1010 1025 with self.assertWarnsRegex(DeprecationWarning, "I AM A WARNING"):
1011 1026 ip.run_cell("wrn()")
1012 1027 finally:
1013 1028 ip.run_cell("del warnings")
1014 1029 ip.run_cell("del wrn")
1015 1030
1016 1031
1017 1032 class TestImportNoDeprecate(tt.TempFileMixin):
1018 1033
1019 1034 def setUp(self):
1020 1035 """Make a valid python temp file."""
1021 1036 self.mktmp("""
1022 1037 import warnings
1023 1038 def wrn():
1024 1039 warnings.warn(
1025 1040 "I AM A WARNING",
1026 1041 DeprecationWarning
1027 1042 )
1028 1043 """)
1029 1044 super().setUp()
1030 1045
1031 1046 def test_no_dep(self):
1032 1047 """
1033 1048 No deprecation warning should be raised from imported functions
1034 1049 """
1035 1050 ip.run_cell("from {} import wrn".format(self.fname))
1036 1051
1037 1052 with tt.AssertNotPrints("I AM A WARNING"):
1038 1053 ip.run_cell("wrn()")
1039 1054 ip.run_cell("del wrn")
1040 1055
1041 1056
1042 1057 def test_custom_exc_count():
1043 1058 hook = mock.Mock(return_value=None)
1044 1059 ip.set_custom_exc((SyntaxError,), hook)
1045 1060 before = ip.execution_count
1046 1061 ip.run_cell("def foo()", store_history=True)
1047 1062 # restore default excepthook
1048 1063 ip.set_custom_exc((), None)
1049 1064 assert hook.call_count == 1
1050 1065 assert ip.execution_count == before + 1
1051 1066
1052 1067
1053 1068 def test_run_cell_async():
1054 1069 ip.run_cell("import asyncio")
1055 1070 coro = ip.run_cell_async("await asyncio.sleep(0.01)\n5")
1056 1071 assert asyncio.iscoroutine(coro)
1057 1072 loop = asyncio.new_event_loop()
1058 1073 result = loop.run_until_complete(coro)
1059 1074 assert isinstance(result, interactiveshell.ExecutionResult)
1060 1075 assert result.result == 5
1061 1076
1062 1077
1063 1078 def test_run_cell_await():
1064 1079 ip.run_cell("import asyncio")
1065 1080 result = ip.run_cell("await asyncio.sleep(0.01); 10")
1066 1081 assert ip.user_ns["_"] == 10
1067 1082
1068 1083
1069 1084 def test_run_cell_asyncio_run():
1070 1085 ip.run_cell("import asyncio")
1071 1086 result = ip.run_cell("await asyncio.sleep(0.01); 1")
1072 1087 assert ip.user_ns["_"] == 1
1073 1088 result = ip.run_cell("asyncio.run(asyncio.sleep(0.01)); 2")
1074 1089 assert ip.user_ns["_"] == 2
1075 1090 result = ip.run_cell("await asyncio.sleep(0.01); 3")
1076 1091 assert ip.user_ns["_"] == 3
1077 1092
1078 1093
1079 1094 def test_should_run_async():
1080 1095 assert not ip.should_run_async("a = 5")
1081 1096 assert ip.should_run_async("await x")
1082 1097 assert ip.should_run_async("import asyncio; await asyncio.sleep(1)")
1083 1098
1084 1099
1085 1100 def test_set_custom_completer():
1086 1101 num_completers = len(ip.Completer.matchers)
1087 1102
1088 1103 def foo(*args, **kwargs):
1089 1104 return "I'm a completer!"
1090 1105
1091 1106 ip.set_custom_completer(foo, 0)
1092 1107
1093 1108 # check that we've really added a new completer
1094 1109 assert len(ip.Completer.matchers) == num_completers + 1
1095 1110
1096 1111 # check that the first completer is the function we defined
1097 1112 assert ip.Completer.matchers[0]() == "I'm a completer!"
1098 1113
1099 1114 # clean up
1100 1115 ip.Completer.custom_matchers.pop()
General Comments 0
You need to be logged in to leave comments. Login now