##// END OF EJS Templates
Skip test on problematic PyPy versions...
Matthias Bussonnier -
Show More
@@ -1,1197 +1,1201 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for the key interactiveshell module.
3 3
4 4 Historically the main classes in interactiveshell have been under-tested. This
5 5 module should grow as many single-method tests as possible to trap many of the
6 6 recurring bugs we seem to encounter with high-level interaction.
7 7 """
8 8
9 9 # Copyright (c) IPython Development Team.
10 10 # Distributed under the terms of the Modified BSD License.
11 11
12 12 import asyncio
13 13 import ast
14 14 import os
15 15 import signal
16 16 import shutil
17 17 import sys
18 18 import tempfile
19 19 import unittest
20 20 import pytest
21 21 from unittest import mock
22 22
23 23 from os.path import join
24 24
25 25 from IPython.core.error import InputRejected
26 26 from IPython.core.inputtransformer import InputTransformer
27 27 from IPython.core import interactiveshell
28 28 from IPython.core.oinspect import OInfo
29 29 from IPython.testing.decorators import (
30 30 skipif, skip_win32, onlyif_unicode_paths, onlyif_cmds_exist,
31 31 )
32 32 from IPython.testing import tools as tt
33 33 from IPython.utils.process import find_cmd
34 34
35 35 #-----------------------------------------------------------------------------
36 36 # Globals
37 37 #-----------------------------------------------------------------------------
38 38 # This is used by every single test, no point repeating it ad nauseam
39 39
40 40 #-----------------------------------------------------------------------------
41 41 # Tests
42 42 #-----------------------------------------------------------------------------
43 43
44 44 class DerivedInterrupt(KeyboardInterrupt):
45 45 pass
46 46
47 47 class InteractiveShellTestCase(unittest.TestCase):
48 48 def test_naked_string_cells(self):
49 49 """Test that cells with only naked strings are fully executed"""
50 50 # First, single-line inputs
51 51 ip.run_cell('"a"\n')
52 52 self.assertEqual(ip.user_ns['_'], 'a')
53 53 # And also multi-line cells
54 54 ip.run_cell('"""a\nb"""\n')
55 55 self.assertEqual(ip.user_ns['_'], 'a\nb')
56 56
57 57 def test_run_empty_cell(self):
58 58 """Just make sure we don't get a horrible error with a blank
59 59 cell of input. Yes, I did overlook that."""
60 60 old_xc = ip.execution_count
61 61 res = ip.run_cell('')
62 62 self.assertEqual(ip.execution_count, old_xc)
63 63 self.assertEqual(res.execution_count, None)
64 64
65 65 def test_run_cell_multiline(self):
66 66 """Multi-block, multi-line cells must execute correctly.
67 67 """
68 68 src = '\n'.join(["x=1",
69 69 "y=2",
70 70 "if 1:",
71 71 " x += 1",
72 72 " y += 1",])
73 73 res = ip.run_cell(src)
74 74 self.assertEqual(ip.user_ns['x'], 2)
75 75 self.assertEqual(ip.user_ns['y'], 3)
76 76 self.assertEqual(res.success, True)
77 77 self.assertEqual(res.result, None)
78 78
79 79 def test_multiline_string_cells(self):
80 80 "Code sprinkled with multiline strings should execute (GH-306)"
81 81 ip.run_cell('tmp=0')
82 82 self.assertEqual(ip.user_ns['tmp'], 0)
83 83 res = ip.run_cell('tmp=1;"""a\nb"""\n')
84 84 self.assertEqual(ip.user_ns['tmp'], 1)
85 85 self.assertEqual(res.success, True)
86 86 self.assertEqual(res.result, "a\nb")
87 87
88 88 def test_dont_cache_with_semicolon(self):
89 89 "Ending a line with semicolon should not cache the returned object (GH-307)"
90 90 oldlen = len(ip.user_ns['Out'])
91 91 for cell in ['1;', '1;1;']:
92 92 res = ip.run_cell(cell, store_history=True)
93 93 newlen = len(ip.user_ns['Out'])
94 94 self.assertEqual(oldlen, newlen)
95 95 self.assertIsNone(res.result)
96 96 i = 0
97 97 #also test the default caching behavior
98 98 for cell in ['1', '1;1']:
99 99 ip.run_cell(cell, store_history=True)
100 100 newlen = len(ip.user_ns['Out'])
101 101 i += 1
102 102 self.assertEqual(oldlen+i, newlen)
103 103
104 104 def test_syntax_error(self):
105 105 res = ip.run_cell("raise = 3")
106 106 self.assertIsInstance(res.error_before_exec, SyntaxError)
107 107
108 108 def test_open_standard_input_stream(self):
109 109 res = ip.run_cell("open(0)")
110 110 self.assertIsInstance(res.error_in_exec, ValueError)
111 111
112 112 def test_open_standard_output_stream(self):
113 113 res = ip.run_cell("open(1)")
114 114 self.assertIsInstance(res.error_in_exec, ValueError)
115 115
116 116 def test_open_standard_error_stream(self):
117 117 res = ip.run_cell("open(2)")
118 118 self.assertIsInstance(res.error_in_exec, ValueError)
119 119
120 120 def test_In_variable(self):
121 121 "Verify that In variable grows with user input (GH-284)"
122 122 oldlen = len(ip.user_ns['In'])
123 123 ip.run_cell('1;', store_history=True)
124 124 newlen = len(ip.user_ns['In'])
125 125 self.assertEqual(oldlen+1, newlen)
126 126 self.assertEqual(ip.user_ns['In'][-1],'1;')
127 127
128 128 def test_magic_names_in_string(self):
129 129 ip.run_cell('a = """\n%exit\n"""')
130 130 self.assertEqual(ip.user_ns['a'], '\n%exit\n')
131 131
132 132 def test_trailing_newline(self):
133 133 """test that running !(command) does not raise a SyntaxError"""
134 134 ip.run_cell('!(true)\n', False)
135 135 ip.run_cell('!(true)\n\n\n', False)
136 136
137 137 def test_gh_597(self):
138 138 """Pretty-printing lists of objects with non-ascii reprs may cause
139 139 problems."""
140 140 class Spam(object):
141 141 def __repr__(self):
142 142 return "\xe9"*50
143 143 import IPython.core.formatters
144 144 f = IPython.core.formatters.PlainTextFormatter()
145 145 f([Spam(),Spam()])
146 146
147 147
148 148 def test_future_flags(self):
149 149 """Check that future flags are used for parsing code (gh-777)"""
150 150 ip.run_cell('from __future__ import barry_as_FLUFL')
151 151 try:
152 152 ip.run_cell('prfunc_return_val = 1 <> 2')
153 153 assert 'prfunc_return_val' in ip.user_ns
154 154 finally:
155 155 # Reset compiler flags so we don't mess up other tests.
156 156 ip.compile.reset_compiler_flags()
157 157
158 158 def test_can_pickle(self):
159 159 "Can we pickle objects defined interactively (GH-29)"
160 160 ip = get_ipython()
161 161 ip.reset()
162 162 ip.run_cell(("class Mylist(list):\n"
163 163 " def __init__(self,x=[]):\n"
164 164 " list.__init__(self,x)"))
165 165 ip.run_cell("w=Mylist([1,2,3])")
166 166
167 167 from pickle import dumps
168 168
169 169 # We need to swap in our main module - this is only necessary
170 170 # inside the test framework, because IPython puts the interactive module
171 171 # in place (but the test framework undoes this).
172 172 _main = sys.modules['__main__']
173 173 sys.modules['__main__'] = ip.user_module
174 174 try:
175 175 res = dumps(ip.user_ns["w"])
176 176 finally:
177 177 sys.modules['__main__'] = _main
178 178 self.assertTrue(isinstance(res, bytes))
179 179
180 180 def test_global_ns(self):
181 181 "Code in functions must be able to access variables outside them."
182 182 ip = get_ipython()
183 183 ip.run_cell("a = 10")
184 184 ip.run_cell(("def f(x):\n"
185 185 " return x + a"))
186 186 ip.run_cell("b = f(12)")
187 187 self.assertEqual(ip.user_ns["b"], 22)
188 188
189 189 def test_bad_custom_tb(self):
190 190 """Check that InteractiveShell is protected from bad custom exception handlers"""
191 191 ip.set_custom_exc((IOError,), lambda etype,value,tb: 1/0)
192 192 self.assertEqual(ip.custom_exceptions, (IOError,))
193 193 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
194 194 ip.run_cell(u'raise IOError("foo")')
195 195 self.assertEqual(ip.custom_exceptions, ())
196 196
197 197 def test_bad_custom_tb_return(self):
198 198 """Check that InteractiveShell is protected from bad return types in custom exception handlers"""
199 199 ip.set_custom_exc((NameError,),lambda etype,value,tb, tb_offset=None: 1)
200 200 self.assertEqual(ip.custom_exceptions, (NameError,))
201 201 with tt.AssertPrints("Custom TB Handler failed", channel='stderr'):
202 202 ip.run_cell(u'a=abracadabra')
203 203 self.assertEqual(ip.custom_exceptions, ())
204 204
205 205 def test_drop_by_id(self):
206 206 myvars = {"a":object(), "b":object(), "c": object()}
207 207 ip.push(myvars, interactive=False)
208 208 for name in myvars:
209 209 assert name in ip.user_ns, name
210 210 assert name in ip.user_ns_hidden, name
211 211 ip.user_ns['b'] = 12
212 212 ip.drop_by_id(myvars)
213 213 for name in ["a", "c"]:
214 214 assert name not in ip.user_ns, name
215 215 assert name not in ip.user_ns_hidden, name
216 216 assert ip.user_ns['b'] == 12
217 217 ip.reset()
218 218
219 219 def test_var_expand(self):
220 220 ip.user_ns['f'] = u'Ca\xf1o'
221 221 self.assertEqual(ip.var_expand(u'echo $f'), u'echo Ca\xf1o')
222 222 self.assertEqual(ip.var_expand(u'echo {f}'), u'echo Ca\xf1o')
223 223 self.assertEqual(ip.var_expand(u'echo {f[:-1]}'), u'echo Ca\xf1')
224 224 self.assertEqual(ip.var_expand(u'echo {1*2}'), u'echo 2')
225 225
226 226 self.assertEqual(ip.var_expand(u"grep x | awk '{print $1}'"), u"grep x | awk '{print $1}'")
227 227
228 228 ip.user_ns['f'] = b'Ca\xc3\xb1o'
229 229 # This should not raise any exception:
230 230 ip.var_expand(u'echo $f')
231 231
232 232 def test_var_expand_local(self):
233 233 """Test local variable expansion in !system and %magic calls"""
234 234 # !system
235 235 ip.run_cell(
236 236 "def test():\n"
237 237 ' lvar = "ttt"\n'
238 238 " ret = !echo {lvar}\n"
239 239 " return ret[0]\n"
240 240 )
241 241 res = ip.user_ns["test"]()
242 242 self.assertIn("ttt", res)
243 243
244 244 # %magic
245 245 ip.run_cell(
246 246 "def makemacro():\n"
247 247 ' macroname = "macro_var_expand_locals"\n'
248 248 " %macro {macroname} codestr\n"
249 249 )
250 250 ip.user_ns["codestr"] = "str(12)"
251 251 ip.run_cell("makemacro()")
252 252 self.assertIn("macro_var_expand_locals", ip.user_ns)
253 253
254 254 def test_var_expand_self(self):
255 255 """Test variable expansion with the name 'self', which was failing.
256 256
257 257 See https://github.com/ipython/ipython/issues/1878#issuecomment-7698218
258 258 """
259 259 ip.run_cell(
260 260 "class cTest:\n"
261 261 ' classvar="see me"\n'
262 262 " def test(self):\n"
263 263 " res = !echo Variable: {self.classvar}\n"
264 264 " return res[0]\n"
265 265 )
266 266 self.assertIn("see me", ip.user_ns["cTest"]().test())
267 267
268 268 def test_bad_var_expand(self):
269 269 """var_expand on invalid formats shouldn't raise"""
270 270 # SyntaxError
271 271 self.assertEqual(ip.var_expand(u"{'a':5}"), u"{'a':5}")
272 272 # NameError
273 273 self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}")
274 274 # ZeroDivisionError
275 275 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
276 276
277 277 def test_silent_postexec(self):
278 278 """run_cell(silent=True) doesn't invoke pre/post_run_cell callbacks"""
279 279 pre_explicit = mock.Mock()
280 280 pre_always = mock.Mock()
281 281 post_explicit = mock.Mock()
282 282 post_always = mock.Mock()
283 283 all_mocks = [pre_explicit, pre_always, post_explicit, post_always]
284 284
285 285 ip.events.register('pre_run_cell', pre_explicit)
286 286 ip.events.register('pre_execute', pre_always)
287 287 ip.events.register('post_run_cell', post_explicit)
288 288 ip.events.register('post_execute', post_always)
289 289
290 290 try:
291 291 ip.run_cell("1", silent=True)
292 292 assert pre_always.called
293 293 assert not pre_explicit.called
294 294 assert post_always.called
295 295 assert not post_explicit.called
296 296 # double-check that non-silent exec did what we expected
297 297 # silent to avoid
298 298 ip.run_cell("1")
299 299 assert pre_explicit.called
300 300 assert post_explicit.called
301 301 info, = pre_explicit.call_args[0]
302 302 result, = post_explicit.call_args[0]
303 303 self.assertEqual(info, result.info)
304 304 # check that post hooks are always called
305 305 [m.reset_mock() for m in all_mocks]
306 306 ip.run_cell("syntax error")
307 307 assert pre_always.called
308 308 assert pre_explicit.called
309 309 assert post_always.called
310 310 assert post_explicit.called
311 311 info, = pre_explicit.call_args[0]
312 312 result, = post_explicit.call_args[0]
313 313 self.assertEqual(info, result.info)
314 314 finally:
315 315 # remove post-exec
316 316 ip.events.unregister('pre_run_cell', pre_explicit)
317 317 ip.events.unregister('pre_execute', pre_always)
318 318 ip.events.unregister('post_run_cell', post_explicit)
319 319 ip.events.unregister('post_execute', post_always)
320 320
321 321 def test_silent_noadvance(self):
322 322 """run_cell(silent=True) doesn't advance execution_count"""
323 323 ec = ip.execution_count
324 324 # silent should force store_history=False
325 325 ip.run_cell("1", store_history=True, silent=True)
326 326
327 327 self.assertEqual(ec, ip.execution_count)
328 328 # double-check that non-silent exec did what we expected
329 329 # silent to avoid
330 330 ip.run_cell("1", store_history=True)
331 331 self.assertEqual(ec+1, ip.execution_count)
332 332
333 333 def test_silent_nodisplayhook(self):
334 334 """run_cell(silent=True) doesn't trigger displayhook"""
335 335 d = dict(called=False)
336 336
337 337 trap = ip.display_trap
338 338 save_hook = trap.hook
339 339
340 340 def failing_hook(*args, **kwargs):
341 341 d['called'] = True
342 342
343 343 try:
344 344 trap.hook = failing_hook
345 345 res = ip.run_cell("1", silent=True)
346 346 self.assertFalse(d['called'])
347 347 self.assertIsNone(res.result)
348 348 # double-check that non-silent exec did what we expected
349 349 # silent to avoid
350 350 ip.run_cell("1")
351 351 self.assertTrue(d['called'])
352 352 finally:
353 353 trap.hook = save_hook
354 354
355 355 def test_ofind_line_magic(self):
356 356 from IPython.core.magic import register_line_magic
357 357
358 358 @register_line_magic
359 359 def lmagic(line):
360 360 "A line magic"
361 361
362 362 # Get info on line magic
363 363 lfind = ip._ofind("lmagic")
364 364 info = OInfo(
365 365 found=True,
366 366 isalias=False,
367 367 ismagic=True,
368 368 namespace="IPython internal",
369 369 obj=lmagic,
370 370 parent=None,
371 371 )
372 372 self.assertEqual(lfind, info)
373 373
374 374 def test_ofind_cell_magic(self):
375 375 from IPython.core.magic import register_cell_magic
376 376
377 377 @register_cell_magic
378 378 def cmagic(line, cell):
379 379 "A cell magic"
380 380
381 381 # Get info on cell magic
382 382 find = ip._ofind("cmagic")
383 383 info = OInfo(
384 384 found=True,
385 385 isalias=False,
386 386 ismagic=True,
387 387 namespace="IPython internal",
388 388 obj=cmagic,
389 389 parent=None,
390 390 )
391 391 self.assertEqual(find, info)
392 392
393 393 def test_ofind_property_with_error(self):
394 394 class A(object):
395 395 @property
396 396 def foo(self):
397 397 raise NotImplementedError() # pragma: no cover
398 398
399 399 a = A()
400 400
401 401 found = ip._ofind("a.foo", [("locals", locals())])
402 402 info = OInfo(
403 403 found=True,
404 404 isalias=False,
405 405 ismagic=False,
406 406 namespace="locals",
407 407 obj=A.foo,
408 408 parent=a,
409 409 )
410 410 self.assertEqual(found, info)
411 411
412 412 def test_ofind_multiple_attribute_lookups(self):
413 413 class A(object):
414 414 @property
415 415 def foo(self):
416 416 raise NotImplementedError() # pragma: no cover
417 417
418 418 a = A()
419 419 a.a = A()
420 420 a.a.a = A()
421 421
422 422 found = ip._ofind("a.a.a.foo", [("locals", locals())])
423 423 info = OInfo(
424 424 found=True,
425 425 isalias=False,
426 426 ismagic=False,
427 427 namespace="locals",
428 428 obj=A.foo,
429 429 parent=a.a.a,
430 430 )
431 431 self.assertEqual(found, info)
432 432
433 433 def test_ofind_slotted_attributes(self):
434 434 class A(object):
435 435 __slots__ = ['foo']
436 436 def __init__(self):
437 437 self.foo = 'bar'
438 438
439 439 a = A()
440 440 found = ip._ofind("a.foo", [("locals", locals())])
441 441 info = OInfo(
442 442 found=True,
443 443 isalias=False,
444 444 ismagic=False,
445 445 namespace="locals",
446 446 obj=a.foo,
447 447 parent=a,
448 448 )
449 449 self.assertEqual(found, info)
450 450
451 451 found = ip._ofind("a.bar", [("locals", locals())])
452 452 expected = OInfo(
453 453 found=False,
454 454 isalias=False,
455 455 ismagic=False,
456 456 namespace=None,
457 457 obj=None,
458 458 parent=a,
459 459 )
460 460 assert found == expected
461 461
462 462 def test_ofind_prefers_property_to_instance_level_attribute(self):
463 463 class A(object):
464 464 @property
465 465 def foo(self):
466 466 return 'bar'
467 467 a = A()
468 468 a.__dict__["foo"] = "baz"
469 469 self.assertEqual(a.foo, "bar")
470 470 found = ip._ofind("a.foo", [("locals", locals())])
471 471 self.assertIs(found.obj, A.foo)
472 472
473 473 def test_custom_syntaxerror_exception(self):
474 474 called = []
475 475 def my_handler(shell, etype, value, tb, tb_offset=None):
476 476 called.append(etype)
477 477 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
478 478
479 479 ip.set_custom_exc((SyntaxError,), my_handler)
480 480 try:
481 481 ip.run_cell("1f")
482 482 # Check that this was called, and only once.
483 483 self.assertEqual(called, [SyntaxError])
484 484 finally:
485 485 # Reset the custom exception hook
486 486 ip.set_custom_exc((), None)
487 487
488 488 def test_custom_exception(self):
489 489 called = []
490 490 def my_handler(shell, etype, value, tb, tb_offset=None):
491 491 called.append(etype)
492 492 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
493 493
494 494 ip.set_custom_exc((ValueError,), my_handler)
495 495 try:
496 496 res = ip.run_cell("raise ValueError('test')")
497 497 # Check that this was called, and only once.
498 498 self.assertEqual(called, [ValueError])
499 499 # Check that the error is on the result object
500 500 self.assertIsInstance(res.error_in_exec, ValueError)
501 501 finally:
502 502 # Reset the custom exception hook
503 503 ip.set_custom_exc((), None)
504 504
505 505 @mock.patch("builtins.print")
506 506 def test_showtraceback_with_surrogates(self, mocked_print):
507 507 values = []
508 508
509 509 def mock_print_func(value, sep=" ", end="\n", file=sys.stdout, flush=False):
510 510 values.append(value)
511 511 if value == chr(0xD8FF):
512 512 raise UnicodeEncodeError("utf-8", chr(0xD8FF), 0, 1, "")
513 513
514 514 # mock builtins.print
515 515 mocked_print.side_effect = mock_print_func
516 516
517 517 # ip._showtraceback() is replaced in globalipapp.py.
518 518 # Call original method to test.
519 519 interactiveshell.InteractiveShell._showtraceback(ip, None, None, chr(0xD8FF))
520 520
521 521 self.assertEqual(mocked_print.call_count, 2)
522 522 self.assertEqual(values, [chr(0xD8FF), "\\ud8ff"])
523 523
524 524 def test_mktempfile(self):
525 525 filename = ip.mktempfile()
526 526 # Check that we can open the file again on Windows
527 527 with open(filename, "w", encoding="utf-8") as f:
528 528 f.write("abc")
529 529
530 530 filename = ip.mktempfile(data="blah")
531 531 with open(filename, "r", encoding="utf-8") as f:
532 532 self.assertEqual(f.read(), "blah")
533 533
534 534 def test_new_main_mod(self):
535 535 # Smoketest to check that this accepts a unicode module name
536 536 name = u'jiefmw'
537 537 mod = ip.new_main_mod(u'%s.py' % name, name)
538 538 self.assertEqual(mod.__name__, name)
539 539
540 540 def test_get_exception_only(self):
541 541 try:
542 542 raise KeyboardInterrupt
543 543 except KeyboardInterrupt:
544 544 msg = ip.get_exception_only()
545 545 self.assertEqual(msg, 'KeyboardInterrupt\n')
546 546
547 547 try:
548 548 raise DerivedInterrupt("foo")
549 549 except KeyboardInterrupt:
550 550 msg = ip.get_exception_only()
551 551 self.assertEqual(msg, 'IPython.core.tests.test_interactiveshell.DerivedInterrupt: foo\n')
552 552
553 553 def test_inspect_text(self):
554 554 ip.run_cell('a = 5')
555 555 text = ip.object_inspect_text('a')
556 556 self.assertIsInstance(text, str)
557 557
558 558 def test_last_execution_result(self):
559 559 """ Check that last execution result gets set correctly (GH-10702) """
560 560 result = ip.run_cell('a = 5; a')
561 561 self.assertTrue(ip.last_execution_succeeded)
562 562 self.assertEqual(ip.last_execution_result.result, 5)
563 563
564 564 result = ip.run_cell('a = x_invalid_id_x')
565 565 self.assertFalse(ip.last_execution_succeeded)
566 566 self.assertFalse(ip.last_execution_result.success)
567 567 self.assertIsInstance(ip.last_execution_result.error_in_exec, NameError)
568 568
569 569 def test_reset_aliasing(self):
570 570 """ Check that standard posix aliases work after %reset. """
571 571 if os.name != 'posix':
572 572 return
573 573
574 574 ip.reset()
575 575 for cmd in ('clear', 'more', 'less', 'man'):
576 576 res = ip.run_cell('%' + cmd)
577 577 self.assertEqual(res.success, True)
578 578
579 579
580 @pytest.mark.skipif(
581 sys.implementation.name == "pypy"
582 and ((7, 3, 13) < sys.implementation.version < (7, 3, 16)),
583 reason="Unicode issues with scandir on PyPy, see https://github.com/pypy/pypy/issues/4860",
584 )
580 585 class TestSafeExecfileNonAsciiPath(unittest.TestCase):
581
582 586 @onlyif_unicode_paths
583 587 def setUp(self):
584 588 self.BASETESTDIR = tempfile.mkdtemp()
585 589 self.TESTDIR = join(self.BASETESTDIR, u"Γ₯Àâ")
586 590 os.mkdir(self.TESTDIR)
587 591 with open(
588 592 join(self.TESTDIR, "Γ₯Àâtestscript.py"), "w", encoding="utf-8"
589 593 ) as sfile:
590 594 sfile.write("pass\n")
591 595 self.oldpath = os.getcwd()
592 596 os.chdir(self.TESTDIR)
593 597 self.fname = u"Γ₯Àâtestscript.py"
594 598
595 599 def tearDown(self):
596 600 os.chdir(self.oldpath)
597 601 shutil.rmtree(self.BASETESTDIR)
598 602
599 603 @onlyif_unicode_paths
600 604 def test_1(self):
601 605 """Test safe_execfile with non-ascii path
602 606 """
603 607 ip.safe_execfile(self.fname, {}, raise_exceptions=True)
604 608
605 609 class ExitCodeChecks(tt.TempFileMixin):
606 610
607 611 def setUp(self):
608 612 self.system = ip.system_raw
609 613
610 614 def test_exit_code_ok(self):
611 615 self.system('exit 0')
612 616 self.assertEqual(ip.user_ns['_exit_code'], 0)
613 617
614 618 def test_exit_code_error(self):
615 619 self.system('exit 1')
616 620 self.assertEqual(ip.user_ns['_exit_code'], 1)
617 621
618 622 @skipif(not hasattr(signal, 'SIGALRM'))
619 623 def test_exit_code_signal(self):
620 624 self.mktmp("import signal, time\n"
621 625 "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
622 626 "time.sleep(1)\n")
623 627 self.system("%s %s" % (sys.executable, self.fname))
624 628 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
625 629
626 630 @onlyif_cmds_exist("csh")
627 631 def test_exit_code_signal_csh(self): # pragma: no cover
628 632 SHELL = os.environ.get("SHELL", None)
629 633 os.environ["SHELL"] = find_cmd("csh")
630 634 try:
631 635 self.test_exit_code_signal()
632 636 finally:
633 637 if SHELL is not None:
634 638 os.environ['SHELL'] = SHELL
635 639 else:
636 640 del os.environ['SHELL']
637 641
638 642
639 643 class TestSystemRaw(ExitCodeChecks):
640 644
641 645 def setUp(self):
642 646 super().setUp()
643 647 self.system = ip.system_raw
644 648
645 649 @onlyif_unicode_paths
646 650 def test_1(self):
647 651 """Test system_raw with non-ascii cmd
648 652 """
649 653 cmd = u'''python -c "'Γ₯Àâ'" '''
650 654 ip.system_raw(cmd)
651 655
652 656 @mock.patch('subprocess.call', side_effect=KeyboardInterrupt)
653 657 @mock.patch('os.system', side_effect=KeyboardInterrupt)
654 658 def test_control_c(self, *mocks):
655 659 try:
656 660 self.system("sleep 1 # wont happen")
657 661 except KeyboardInterrupt: # pragma: no cove
658 662 self.fail(
659 663 "system call should intercept "
660 664 "keyboard interrupt from subprocess.call"
661 665 )
662 666 self.assertEqual(ip.user_ns["_exit_code"], -signal.SIGINT)
663 667
664 668
665 669 @pytest.mark.parametrize("magic_cmd", ["pip", "conda", "cd"])
666 670 def test_magic_warnings(magic_cmd):
667 671 if sys.platform == "win32":
668 672 to_mock = "os.system"
669 673 expected_arg, expected_kwargs = magic_cmd, dict()
670 674 else:
671 675 to_mock = "subprocess.call"
672 676 expected_arg, expected_kwargs = magic_cmd, dict(
673 677 shell=True, executable=os.environ.get("SHELL", None)
674 678 )
675 679
676 680 with mock.patch(to_mock, return_value=0) as mock_sub:
677 681 with pytest.warns(Warning, match=r"You executed the system command"):
678 682 ip.system_raw(magic_cmd)
679 683 mock_sub.assert_called_once_with(expected_arg, **expected_kwargs)
680 684
681 685
682 686 # TODO: Exit codes are currently ignored on Windows.
683 687 class TestSystemPipedExitCode(ExitCodeChecks):
684 688
685 689 def setUp(self):
686 690 super().setUp()
687 691 self.system = ip.system_piped
688 692
689 693 @skip_win32
690 694 def test_exit_code_ok(self):
691 695 ExitCodeChecks.test_exit_code_ok(self)
692 696
693 697 @skip_win32
694 698 def test_exit_code_error(self):
695 699 ExitCodeChecks.test_exit_code_error(self)
696 700
697 701 @skip_win32
698 702 def test_exit_code_signal(self):
699 703 ExitCodeChecks.test_exit_code_signal(self)
700 704
701 705 class TestModules(tt.TempFileMixin):
702 706 def test_extraneous_loads(self):
703 707 """Test we're not loading modules on startup that we shouldn't.
704 708 """
705 709 self.mktmp("import sys\n"
706 710 "print('numpy' in sys.modules)\n"
707 711 "print('ipyparallel' in sys.modules)\n"
708 712 "print('ipykernel' in sys.modules)\n"
709 713 )
710 714 out = "False\nFalse\nFalse\n"
711 715 tt.ipexec_validate(self.fname, out)
712 716
713 717 class Negator(ast.NodeTransformer):
714 718 """Negates all number literals in an AST."""
715 719
716 720 def visit_Num(self, node):
717 721 node.n = -node.n
718 722 return node
719 723
720 724 def visit_Constant(self, node):
721 725 if isinstance(node.value, int):
722 726 return self.visit_Num(node)
723 727 return node
724 728
725 729 class TestAstTransform(unittest.TestCase):
726 730 def setUp(self):
727 731 self.negator = Negator()
728 732 ip.ast_transformers.append(self.negator)
729 733
730 734 def tearDown(self):
731 735 ip.ast_transformers.remove(self.negator)
732 736
733 737 def test_non_int_const(self):
734 738 with tt.AssertPrints("hello"):
735 739 ip.run_cell('print("hello")')
736 740
737 741 def test_run_cell(self):
738 742 with tt.AssertPrints("-34"):
739 743 ip.run_cell("print(12 + 22)")
740 744
741 745 # A named reference to a number shouldn't be transformed.
742 746 ip.user_ns["n"] = 55
743 747 with tt.AssertNotPrints("-55"):
744 748 ip.run_cell("print(n)")
745 749
746 750 def test_timeit(self):
747 751 called = set()
748 752 def f(x):
749 753 called.add(x)
750 754 ip.push({'f':f})
751 755
752 756 with tt.AssertPrints("std. dev. of"):
753 757 ip.run_line_magic("timeit", "-n1 f(1)")
754 758 self.assertEqual(called, {-1})
755 759 called.clear()
756 760
757 761 with tt.AssertPrints("std. dev. of"):
758 762 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
759 763 self.assertEqual(called, {-2, -3})
760 764
761 765 def test_time(self):
762 766 called = []
763 767 def f(x):
764 768 called.append(x)
765 769 ip.push({'f':f})
766 770
767 771 # Test with an expression
768 772 with tt.AssertPrints("Wall time: "):
769 773 ip.run_line_magic("time", "f(5+9)")
770 774 self.assertEqual(called, [-14])
771 775 called[:] = []
772 776
773 777 # Test with a statement (different code path)
774 778 with tt.AssertPrints("Wall time: "):
775 779 ip.run_line_magic("time", "a = f(-3 + -2)")
776 780 self.assertEqual(called, [5])
777 781
778 782 def test_macro(self):
779 783 ip.push({'a':10})
780 784 # The AST transformation makes this do a+=-1
781 785 ip.define_macro("amacro", "a+=1\nprint(a)")
782 786
783 787 with tt.AssertPrints("9"):
784 788 ip.run_cell("amacro")
785 789 with tt.AssertPrints("8"):
786 790 ip.run_cell("amacro")
787 791
788 792 class TestMiscTransform(unittest.TestCase):
789 793
790 794
791 795 def test_transform_only_once(self):
792 796 cleanup = 0
793 797 line_t = 0
794 798 def count_cleanup(lines):
795 799 nonlocal cleanup
796 800 cleanup += 1
797 801 return lines
798 802
799 803 def count_line_t(lines):
800 804 nonlocal line_t
801 805 line_t += 1
802 806 return lines
803 807
804 808 ip.input_transformer_manager.cleanup_transforms.append(count_cleanup)
805 809 ip.input_transformer_manager.line_transforms.append(count_line_t)
806 810
807 811 ip.run_cell('1')
808 812
809 813 assert cleanup == 1
810 814 assert line_t == 1
811 815
812 816 class IntegerWrapper(ast.NodeTransformer):
813 817 """Wraps all integers in a call to Integer()"""
814 818
815 819 # for Python 3.7 and earlier
816 820
817 821 # for Python 3.7 and earlier
818 822 def visit_Num(self, node):
819 823 if isinstance(node.n, int):
820 824 return ast.Call(func=ast.Name(id='Integer', ctx=ast.Load()),
821 825 args=[node], keywords=[])
822 826 return node
823 827
824 828 # For Python 3.8+
825 829 def visit_Constant(self, node):
826 830 if isinstance(node.value, int):
827 831 return self.visit_Num(node)
828 832 return node
829 833
830 834
831 835 class TestAstTransform2(unittest.TestCase):
832 836 def setUp(self):
833 837 self.intwrapper = IntegerWrapper()
834 838 ip.ast_transformers.append(self.intwrapper)
835 839
836 840 self.calls = []
837 841 def Integer(*args):
838 842 self.calls.append(args)
839 843 return args
840 844 ip.push({"Integer": Integer})
841 845
842 846 def tearDown(self):
843 847 ip.ast_transformers.remove(self.intwrapper)
844 848 del ip.user_ns['Integer']
845 849
846 850 def test_run_cell(self):
847 851 ip.run_cell("n = 2")
848 852 self.assertEqual(self.calls, [(2,)])
849 853
850 854 # This shouldn't throw an error
851 855 ip.run_cell("o = 2.0")
852 856 self.assertEqual(ip.user_ns['o'], 2.0)
853 857
854 858 def test_run_cell_non_int(self):
855 859 ip.run_cell("n = 'a'")
856 860 assert self.calls == []
857 861
858 862 def test_timeit(self):
859 863 called = set()
860 864 def f(x):
861 865 called.add(x)
862 866 ip.push({'f':f})
863 867
864 868 with tt.AssertPrints("std. dev. of"):
865 869 ip.run_line_magic("timeit", "-n1 f(1)")
866 870 self.assertEqual(called, {(1,)})
867 871 called.clear()
868 872
869 873 with tt.AssertPrints("std. dev. of"):
870 874 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
871 875 self.assertEqual(called, {(2,), (3,)})
872 876
873 877 class ErrorTransformer(ast.NodeTransformer):
874 878 """Throws an error when it sees a number."""
875 879
876 880 def visit_Constant(self, node):
877 881 if isinstance(node.value, int):
878 882 raise ValueError("test")
879 883 return node
880 884
881 885
882 886 class TestAstTransformError(unittest.TestCase):
883 887 def test_unregistering(self):
884 888 err_transformer = ErrorTransformer()
885 889 ip.ast_transformers.append(err_transformer)
886 890
887 891 with self.assertWarnsRegex(UserWarning, "It will be unregistered"):
888 892 ip.run_cell("1 + 2")
889 893
890 894 # This should have been removed.
891 895 self.assertNotIn(err_transformer, ip.ast_transformers)
892 896
893 897
894 898 class StringRejector(ast.NodeTransformer):
895 899 """Throws an InputRejected when it sees a string literal.
896 900
897 901 Used to verify that NodeTransformers can signal that a piece of code should
898 902 not be executed by throwing an InputRejected.
899 903 """
900 904
901 905 def visit_Constant(self, node):
902 906 if isinstance(node.value, str):
903 907 raise InputRejected("test")
904 908 return node
905 909
906 910
907 911 class TestAstTransformInputRejection(unittest.TestCase):
908 912
909 913 def setUp(self):
910 914 self.transformer = StringRejector()
911 915 ip.ast_transformers.append(self.transformer)
912 916
913 917 def tearDown(self):
914 918 ip.ast_transformers.remove(self.transformer)
915 919
916 920 def test_input_rejection(self):
917 921 """Check that NodeTransformers can reject input."""
918 922
919 923 expect_exception_tb = tt.AssertPrints("InputRejected: test")
920 924 expect_no_cell_output = tt.AssertNotPrints("'unsafe'", suppress=False)
921 925
922 926 # Run the same check twice to verify that the transformer is not
923 927 # disabled after raising.
924 928 with expect_exception_tb, expect_no_cell_output:
925 929 ip.run_cell("'unsafe'")
926 930
927 931 with expect_exception_tb, expect_no_cell_output:
928 932 res = ip.run_cell("'unsafe'")
929 933
930 934 self.assertIsInstance(res.error_before_exec, InputRejected)
931 935
932 936 def test__IPYTHON__():
933 937 # This shouldn't raise a NameError, that's all
934 938 __IPYTHON__
935 939
936 940
937 941 class DummyRepr(object):
938 942 def __repr__(self):
939 943 return "DummyRepr"
940 944
941 945 def _repr_html_(self):
942 946 return "<b>dummy</b>"
943 947
944 948 def _repr_javascript_(self):
945 949 return "console.log('hi');", {'key': 'value'}
946 950
947 951
948 952 def test_user_variables():
949 953 # enable all formatters
950 954 ip.display_formatter.active_types = ip.display_formatter.format_types
951 955
952 956 ip.user_ns['dummy'] = d = DummyRepr()
953 957 keys = {'dummy', 'doesnotexist'}
954 958 r = ip.user_expressions({ key:key for key in keys})
955 959
956 960 assert keys == set(r.keys())
957 961 dummy = r["dummy"]
958 962 assert {"status", "data", "metadata"} == set(dummy.keys())
959 963 assert dummy["status"] == "ok"
960 964 data = dummy["data"]
961 965 metadata = dummy["metadata"]
962 966 assert data.get("text/html") == d._repr_html_()
963 967 js, jsmd = d._repr_javascript_()
964 968 assert data.get("application/javascript") == js
965 969 assert metadata.get("application/javascript") == jsmd
966 970
967 971 dne = r["doesnotexist"]
968 972 assert dne["status"] == "error"
969 973 assert dne["ename"] == "NameError"
970 974
971 975 # back to text only
972 976 ip.display_formatter.active_types = ['text/plain']
973 977
974 978 def test_user_expression():
975 979 # enable all formatters
976 980 ip.display_formatter.active_types = ip.display_formatter.format_types
977 981 query = {
978 982 'a' : '1 + 2',
979 983 'b' : '1/0',
980 984 }
981 985 r = ip.user_expressions(query)
982 986 import pprint
983 987 pprint.pprint(r)
984 988 assert set(r.keys()) == set(query.keys())
985 989 a = r["a"]
986 990 assert {"status", "data", "metadata"} == set(a.keys())
987 991 assert a["status"] == "ok"
988 992 data = a["data"]
989 993 metadata = a["metadata"]
990 994 assert data.get("text/plain") == "3"
991 995
992 996 b = r["b"]
993 997 assert b["status"] == "error"
994 998 assert b["ename"] == "ZeroDivisionError"
995 999
996 1000 # back to text only
997 1001 ip.display_formatter.active_types = ['text/plain']
998 1002
999 1003
1000 1004 class TestSyntaxErrorTransformer(unittest.TestCase):
1001 1005 """Check that SyntaxError raised by an input transformer is handled by run_cell()"""
1002 1006
1003 1007 @staticmethod
1004 1008 def transformer(lines):
1005 1009 for line in lines:
1006 1010 pos = line.find('syntaxerror')
1007 1011 if pos >= 0:
1008 1012 e = SyntaxError('input contains "syntaxerror"')
1009 1013 e.text = line
1010 1014 e.offset = pos + 1
1011 1015 raise e
1012 1016 return lines
1013 1017
1014 1018 def setUp(self):
1015 1019 ip.input_transformers_post.append(self.transformer)
1016 1020
1017 1021 def tearDown(self):
1018 1022 ip.input_transformers_post.remove(self.transformer)
1019 1023
1020 1024 def test_syntaxerror_input_transformer(self):
1021 1025 with tt.AssertPrints('1234'):
1022 1026 ip.run_cell('1234')
1023 1027 with tt.AssertPrints('SyntaxError: invalid syntax'):
1024 1028 ip.run_cell('1 2 3') # plain python syntax error
1025 1029 with tt.AssertPrints('SyntaxError: input contains "syntaxerror"'):
1026 1030 ip.run_cell('2345 # syntaxerror') # input transformer syntax error
1027 1031 with tt.AssertPrints('3456'):
1028 1032 ip.run_cell('3456')
1029 1033
1030 1034
1031 1035 class TestWarningSuppression(unittest.TestCase):
1032 1036 def test_warning_suppression(self):
1033 1037 ip.run_cell("import warnings")
1034 1038 try:
1035 1039 with self.assertWarnsRegex(UserWarning, "asdf"):
1036 1040 ip.run_cell("warnings.warn('asdf')")
1037 1041 # Here's the real test -- if we run that again, we should get the
1038 1042 # warning again. Traditionally, each warning was only issued once per
1039 1043 # IPython session (approximately), even if the user typed in new and
1040 1044 # different code that should have also triggered the warning, leading
1041 1045 # to much confusion.
1042 1046 with self.assertWarnsRegex(UserWarning, "asdf"):
1043 1047 ip.run_cell("warnings.warn('asdf')")
1044 1048 finally:
1045 1049 ip.run_cell("del warnings")
1046 1050
1047 1051
1048 1052 def test_deprecation_warning(self):
1049 1053 ip.run_cell("""
1050 1054 import warnings
1051 1055 def wrn():
1052 1056 warnings.warn(
1053 1057 "I AM A WARNING",
1054 1058 DeprecationWarning
1055 1059 )
1056 1060 """)
1057 1061 try:
1058 1062 with self.assertWarnsRegex(DeprecationWarning, "I AM A WARNING"):
1059 1063 ip.run_cell("wrn()")
1060 1064 finally:
1061 1065 ip.run_cell("del warnings")
1062 1066 ip.run_cell("del wrn")
1063 1067
1064 1068
1065 1069 class TestImportNoDeprecate(tt.TempFileMixin):
1066 1070
1067 1071 def setUp(self):
1068 1072 """Make a valid python temp file."""
1069 1073 self.mktmp("""
1070 1074 import warnings
1071 1075 def wrn():
1072 1076 warnings.warn(
1073 1077 "I AM A WARNING",
1074 1078 DeprecationWarning
1075 1079 )
1076 1080 """)
1077 1081 super().setUp()
1078 1082
1079 1083 def test_no_dep(self):
1080 1084 """
1081 1085 No deprecation warning should be raised from imported functions
1082 1086 """
1083 1087 ip.run_cell("from {} import wrn".format(self.fname))
1084 1088
1085 1089 with tt.AssertNotPrints("I AM A WARNING"):
1086 1090 ip.run_cell("wrn()")
1087 1091 ip.run_cell("del wrn")
1088 1092
1089 1093
1090 1094 def test_custom_exc_count():
1091 1095 hook = mock.Mock(return_value=None)
1092 1096 ip.set_custom_exc((SyntaxError,), hook)
1093 1097 before = ip.execution_count
1094 1098 ip.run_cell("def foo()", store_history=True)
1095 1099 # restore default excepthook
1096 1100 ip.set_custom_exc((), None)
1097 1101 assert hook.call_count == 1
1098 1102 assert ip.execution_count == before + 1
1099 1103
1100 1104
1101 1105 def test_run_cell_async():
1102 1106 ip.run_cell("import asyncio")
1103 1107 coro = ip.run_cell_async("await asyncio.sleep(0.01)\n5")
1104 1108 assert asyncio.iscoroutine(coro)
1105 1109 loop = asyncio.new_event_loop()
1106 1110 result = loop.run_until_complete(coro)
1107 1111 assert isinstance(result, interactiveshell.ExecutionResult)
1108 1112 assert result.result == 5
1109 1113
1110 1114
1111 1115 def test_run_cell_await():
1112 1116 ip.run_cell("import asyncio")
1113 1117 result = ip.run_cell("await asyncio.sleep(0.01); 10")
1114 1118 assert ip.user_ns["_"] == 10
1115 1119
1116 1120
1117 1121 def test_run_cell_asyncio_run():
1118 1122 ip.run_cell("import asyncio")
1119 1123 result = ip.run_cell("await asyncio.sleep(0.01); 1")
1120 1124 assert ip.user_ns["_"] == 1
1121 1125 result = ip.run_cell("asyncio.run(asyncio.sleep(0.01)); 2")
1122 1126 assert ip.user_ns["_"] == 2
1123 1127 result = ip.run_cell("await asyncio.sleep(0.01); 3")
1124 1128 assert ip.user_ns["_"] == 3
1125 1129
1126 1130
1127 1131 def test_should_run_async():
1128 1132 assert not ip.should_run_async("a = 5", transformed_cell="a = 5")
1129 1133 assert ip.should_run_async("await x", transformed_cell="await x")
1130 1134 assert ip.should_run_async(
1131 1135 "import asyncio; await asyncio.sleep(1)",
1132 1136 transformed_cell="import asyncio; await asyncio.sleep(1)",
1133 1137 )
1134 1138
1135 1139
1136 1140 def test_set_custom_completer():
1137 1141 num_completers = len(ip.Completer.matchers)
1138 1142
1139 1143 def foo(*args, **kwargs):
1140 1144 return "I'm a completer!"
1141 1145
1142 1146 ip.set_custom_completer(foo, 0)
1143 1147
1144 1148 # check that we've really added a new completer
1145 1149 assert len(ip.Completer.matchers) == num_completers + 1
1146 1150
1147 1151 # check that the first completer is the function we defined
1148 1152 assert ip.Completer.matchers[0]() == "I'm a completer!"
1149 1153
1150 1154 # clean up
1151 1155 ip.Completer.custom_matchers.pop()
1152 1156
1153 1157
1154 1158 class TestShowTracebackAttack(unittest.TestCase):
1155 1159 """Test that the interactive shell is resilient against the client attack of
1156 1160 manipulating the showtracebacks method. These attacks shouldn't result in an
1157 1161 unhandled exception in the kernel."""
1158 1162
1159 1163 def setUp(self):
1160 1164 self.orig_showtraceback = interactiveshell.InteractiveShell.showtraceback
1161 1165
1162 1166 def tearDown(self):
1163 1167 interactiveshell.InteractiveShell.showtraceback = self.orig_showtraceback
1164 1168
1165 1169 def test_set_show_tracebacks_none(self):
1166 1170 """Test the case of the client setting showtracebacks to None"""
1167 1171
1168 1172 result = ip.run_cell(
1169 1173 """
1170 1174 import IPython.core.interactiveshell
1171 1175 IPython.core.interactiveshell.InteractiveShell.showtraceback = None
1172 1176
1173 1177 assert False, "This should not raise an exception"
1174 1178 """
1175 1179 )
1176 1180 print(result)
1177 1181
1178 1182 assert result.result is None
1179 1183 assert isinstance(result.error_in_exec, TypeError)
1180 1184 assert str(result.error_in_exec) == "'NoneType' object is not callable"
1181 1185
1182 1186 def test_set_show_tracebacks_noop(self):
1183 1187 """Test the case of the client setting showtracebacks to a no op lambda"""
1184 1188
1185 1189 result = ip.run_cell(
1186 1190 """
1187 1191 import IPython.core.interactiveshell
1188 1192 IPython.core.interactiveshell.InteractiveShell.showtraceback = lambda *args, **kwargs: None
1189 1193
1190 1194 assert False, "This should not raise an exception"
1191 1195 """
1192 1196 )
1193 1197 print(result)
1194 1198
1195 1199 assert result.result is None
1196 1200 assert isinstance(result.error_in_exec, AssertionError)
1197 1201 assert str(result.error_in_exec) == "This should not raise an exception"
@@ -1,155 +1,161 b''
1 1 # coding: utf-8
2 2 """Tests for profile-related functions.
3 3
4 4 Currently only the startup-dir functionality is tested, but more tests should
5 5 be added for:
6 6
7 7 * ipython profile create
8 8 * ipython profile list
9 9 * ipython profile create --parallel
10 10 * security dir permissions
11 11
12 12 Authors
13 13 -------
14 14
15 15 * MinRK
16 16
17 17 """
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Imports
21 21 #-----------------------------------------------------------------------------
22 22
23 23 import shutil
24 24 import sys
25 25 import tempfile
26 26 from pathlib import Path
27 from tempfile import TemporaryDirectory
27 28 from unittest import TestCase
28 29
29 from tempfile import TemporaryDirectory
30 import pytest
30 31
31 32 from IPython.core.profileapp import list_bundled_profiles, list_profiles_in
32 33 from IPython.core.profiledir import ProfileDir
33 34 from IPython.testing import decorators as dec
34 35 from IPython.testing import tools as tt
35 36 from IPython.utils.process import getoutput
36 37
37 38 #-----------------------------------------------------------------------------
38 39 # Globals
39 40 #-----------------------------------------------------------------------------
40 41 TMP_TEST_DIR = Path(tempfile.mkdtemp())
41 42 HOME_TEST_DIR = TMP_TEST_DIR / "home_test_dir"
42 43 IP_TEST_DIR = HOME_TEST_DIR / ".ipython"
43 44
44 45 #
45 46 # Setup/teardown functions/decorators
46 47 #
47 48
48 49 def setup_module():
49 50 """Setup test environment for the module:
50 51
51 52 - Adds dummy home dir tree
52 53 """
53 54 # Do not mask exceptions here. In particular, catching WindowsError is a
54 55 # problem because that exception is only defined on Windows...
55 56 (Path.cwd() / IP_TEST_DIR).mkdir(parents=True)
56 57
57 58
58 59 def teardown_module():
59 60 """Teardown test environment for the module:
60 61
61 62 - Remove dummy home dir tree
62 63 """
63 64 # Note: we remove the parent test dir, which is the root of all test
64 65 # subdirs we may have created. Use shutil instead of os.removedirs, so
65 66 # that non-empty directories are all recursively removed.
66 67 shutil.rmtree(TMP_TEST_DIR)
67 68
68 69
69 70 #-----------------------------------------------------------------------------
70 71 # Test functions
71 72 #-----------------------------------------------------------------------------
72 73 class ProfileStartupTest(TestCase):
73 74 def setUp(self):
74 75 # create profile dir
75 76 self.pd = ProfileDir.create_profile_dir_by_name(IP_TEST_DIR, "test")
76 77 self.options = ["--ipython-dir", IP_TEST_DIR, "--profile", "test"]
77 78 self.fname = TMP_TEST_DIR / "test.py"
78 79
79 80 def tearDown(self):
80 81 # We must remove this profile right away so its presence doesn't
81 82 # confuse other tests.
82 83 shutil.rmtree(self.pd.location)
83 84
84 85 def init(self, startup_file, startup, test):
85 86 # write startup python file
86 87 with open(Path(self.pd.startup_dir) / startup_file, "w", encoding="utf-8") as f:
87 88 f.write(startup)
88 89 # write simple test file, to check that the startup file was run
89 90 with open(self.fname, "w", encoding="utf-8") as f:
90 91 f.write(test)
91 92
92 93 def validate(self, output):
93 94 tt.ipexec_validate(self.fname, output, "", options=self.options)
94 95
95 96 def test_startup_py(self):
96 97 self.init('00-start.py', 'zzz=123\n', 'print(zzz)\n')
97 98 self.validate('123')
98 99
99 100 def test_startup_ipy(self):
100 101 self.init('00-start.ipy', '%xmode plain\n', '')
101 102 self.validate('Exception reporting mode: Plain')
102 103
103 104
105 @pytest.mark.skipif(
106 sys.implementation.name == "pypy"
107 and ((7, 3, 13) < sys.implementation.version < (7, 3, 16)),
108 reason="Unicode issues with scandir on PyPy, see https://github.com/pypy/pypy/issues/4860",
109 )
104 110 def test_list_profiles_in():
105 111 # No need to remove these directories and files, as they will get nuked in
106 112 # the module-level teardown.
107 113 td = Path(tempfile.mkdtemp(dir=TMP_TEST_DIR))
108 114 for name in ("profile_foo", "profile_hello", "not_a_profile"):
109 115 Path(td / name).mkdir(parents=True)
110 116 if dec.unicode_paths:
111 117 Path(td / "profile_ΓΌnicode").mkdir(parents=True)
112 118
113 119 with open(td / "profile_file", "w", encoding="utf-8") as f:
114 120 f.write("I am not a profile directory")
115 121 profiles = list_profiles_in(td)
116 122
117 123 # unicode normalization can turn u'ΓΌnicode' into u'u\0308nicode',
118 124 # so only check for *nicode, and that creating a ProfileDir from the
119 125 # name remains valid
120 126 found_unicode = False
121 127 for p in list(profiles):
122 128 if p.endswith('nicode'):
123 129 pd = ProfileDir.find_profile_dir_by_name(td, p)
124 130 profiles.remove(p)
125 131 found_unicode = True
126 132 break
127 133 if dec.unicode_paths:
128 134 assert found_unicode is True
129 135 assert set(profiles) == {"foo", "hello"}
130 136
131 137
132 138 def test_list_bundled_profiles():
133 139 # This variable will need to be updated when a new profile gets bundled
134 140 bundled = sorted(list_bundled_profiles())
135 141 assert bundled == []
136 142
137 143
138 144 def test_profile_create_ipython_dir():
139 145 """ipython profile create respects --ipython-dir"""
140 146 with TemporaryDirectory() as td:
141 147 getoutput(
142 148 [
143 149 sys.executable,
144 150 "-m",
145 151 "IPython",
146 152 "profile",
147 153 "create",
148 154 "foo",
149 155 "--ipython-dir=%s" % td,
150 156 ]
151 157 )
152 158 profile_dir = Path(td) / "profile_foo"
153 159 assert Path(profile_dir).exists()
154 160 ipython_config = profile_dir / "ipython_config.py"
155 161 assert Path(ipython_config).exists()
@@ -1,60 +1,61 b''
1 1 # encoding: utf-8
2 2 """Tests for io.py"""
3 3
4 4 # Copyright (c) IPython Development Team.
5 5 # Distributed under the terms of the Modified BSD License.
6 6
7 7
8 8 import sys
9 9 from io import StringIO
10 10
11 11 import unittest
12 12
13 13 from IPython.utils.io import Tee, capture_output
14 14
15 15
16 16 def test_tee_simple():
17 17 "Very simple check with stdout only"
18 18 chan = StringIO()
19 19 text = 'Hello'
20 20 tee = Tee(chan, channel='stdout')
21 21 print(text, file=chan)
22 22 assert chan.getvalue() == text + "\n"
23 tee.close()
23 24
24 25
25 26 class TeeTestCase(unittest.TestCase):
26 27
27 28 def tchan(self, channel):
28 29 trap = StringIO()
29 30 chan = StringIO()
30 31 text = 'Hello'
31 32
32 33 std_ori = getattr(sys, channel)
33 34 setattr(sys, channel, trap)
34 35
35 36 tee = Tee(chan, channel=channel)
36 37
37 38 print(text, end='', file=chan)
38 39 trap_val = trap.getvalue()
39 40 self.assertEqual(chan.getvalue(), text)
40 41
41 42 tee.close()
42 43
43 44 setattr(sys, channel, std_ori)
44 45 assert getattr(sys, channel) == std_ori
45 46
46 47 def test(self):
47 48 for chan in ['stdout', 'stderr']:
48 49 self.tchan(chan)
49 50
50 51 class TestIOStream(unittest.TestCase):
51 52
52 53 def test_capture_output(self):
53 54 """capture_output() context works"""
54 55
55 56 with capture_output() as io:
56 57 print("hi, stdout")
57 58 print("hi, stderr", file=sys.stderr)
58 59
59 60 self.assertEqual(io.stdout, "hi, stdout\n")
60 61 self.assertEqual(io.stderr, "hi, stderr\n")
General Comments 0
You need to be logged in to leave comments. Login now