##// END OF EJS Templates
Test process exit codes with terminating signal
Thomas Kluyver -
Show More
@@ -1,659 +1,682 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for the key interactiveshell module.
3 3
4 4 Historically the main classes in interactiveshell have been under-tested. This
5 5 module should grow as many single-method tests as possible to trap many of the
6 6 recurring bugs we seem to encounter with high-level interaction.
7 7
8 8 Authors
9 9 -------
10 10 * Fernando Perez
11 11 """
12 12 #-----------------------------------------------------------------------------
13 13 # Copyright (C) 2011 The IPython Development Team
14 14 #
15 15 # Distributed under the terms of the BSD License. The full license is in
16 16 # the file COPYING, distributed as part of this software.
17 17 #-----------------------------------------------------------------------------
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Imports
21 21 #-----------------------------------------------------------------------------
22 22 # stdlib
23 23 import ast
24 24 import os
25 import signal
25 26 import shutil
26 27 import sys
27 28 import tempfile
28 29 import unittest
29 30 from os.path import join
30 31 from StringIO import StringIO
31 32
32 33 # third-party
33 34 import nose.tools as nt
34 35
35 36 # Our own
36 37 from IPython.testing.decorators import skipif, skip_win32, onlyif_unicode_paths
37 38 from IPython.testing import tools as tt
38 39 from IPython.utils import io
39 40
40 41 #-----------------------------------------------------------------------------
41 42 # Globals
42 43 #-----------------------------------------------------------------------------
43 44 # This is used by every single test, no point repeating it ad nauseam
44 45 ip = get_ipython()
45 46
46 47 #-----------------------------------------------------------------------------
47 48 # Tests
48 49 #-----------------------------------------------------------------------------
49 50
50 51 class InteractiveShellTestCase(unittest.TestCase):
51 52 def test_naked_string_cells(self):
52 53 """Test that cells with only naked strings are fully executed"""
53 54 # First, single-line inputs
54 55 ip.run_cell('"a"\n')
55 56 self.assertEqual(ip.user_ns['_'], 'a')
56 57 # And also multi-line cells
57 58 ip.run_cell('"""a\nb"""\n')
58 59 self.assertEqual(ip.user_ns['_'], 'a\nb')
59 60
60 61 def test_run_empty_cell(self):
61 62 """Just make sure we don't get a horrible error with a blank
62 63 cell of input. Yes, I did overlook that."""
63 64 old_xc = ip.execution_count
64 65 ip.run_cell('')
65 66 self.assertEqual(ip.execution_count, old_xc)
66 67
67 68 def test_run_cell_multiline(self):
68 69 """Multi-block, multi-line cells must execute correctly.
69 70 """
70 71 src = '\n'.join(["x=1",
71 72 "y=2",
72 73 "if 1:",
73 74 " x += 1",
74 75 " y += 1",])
75 76 ip.run_cell(src)
76 77 self.assertEqual(ip.user_ns['x'], 2)
77 78 self.assertEqual(ip.user_ns['y'], 3)
78 79
79 80 def test_multiline_string_cells(self):
80 81 "Code sprinkled with multiline strings should execute (GH-306)"
81 82 ip.run_cell('tmp=0')
82 83 self.assertEqual(ip.user_ns['tmp'], 0)
83 84 ip.run_cell('tmp=1;"""a\nb"""\n')
84 85 self.assertEqual(ip.user_ns['tmp'], 1)
85 86
86 87 def test_dont_cache_with_semicolon(self):
87 88 "Ending a line with semicolon should not cache the returned object (GH-307)"
88 89 oldlen = len(ip.user_ns['Out'])
89 90 a = ip.run_cell('1;', store_history=True)
90 91 newlen = len(ip.user_ns['Out'])
91 92 self.assertEqual(oldlen, newlen)
92 93 #also test the default caching behavior
93 94 ip.run_cell('1', store_history=True)
94 95 newlen = len(ip.user_ns['Out'])
95 96 self.assertEqual(oldlen+1, newlen)
96 97
97 98 def test_In_variable(self):
98 99 "Verify that In variable grows with user input (GH-284)"
99 100 oldlen = len(ip.user_ns['In'])
100 101 ip.run_cell('1;', store_history=True)
101 102 newlen = len(ip.user_ns['In'])
102 103 self.assertEqual(oldlen+1, newlen)
103 104 self.assertEqual(ip.user_ns['In'][-1],'1;')
104 105
105 106 def test_magic_names_in_string(self):
106 107 ip.run_cell('a = """\n%exit\n"""')
107 108 self.assertEqual(ip.user_ns['a'], '\n%exit\n')
108 109
109 110 def test_alias_crash(self):
110 111 """Errors in prefilter can't crash IPython"""
111 112 ip.run_cell('%alias parts echo first %s second %s')
112 113 # capture stderr:
113 114 save_err = io.stderr
114 115 io.stderr = StringIO()
115 116 ip.run_cell('parts 1')
116 117 err = io.stderr.getvalue()
117 118 io.stderr = save_err
118 119 self.assertEqual(err.split(':')[0], 'ERROR')
119 120
120 121 def test_trailing_newline(self):
121 122 """test that running !(command) does not raise a SyntaxError"""
122 123 ip.run_cell('!(true)\n', False)
123 124 ip.run_cell('!(true)\n\n\n', False)
124 125
125 126 def test_gh_597(self):
126 127 """Pretty-printing lists of objects with non-ascii reprs may cause
127 128 problems."""
128 129 class Spam(object):
129 130 def __repr__(self):
130 131 return "\xe9"*50
131 132 import IPython.core.formatters
132 133 f = IPython.core.formatters.PlainTextFormatter()
133 134 f([Spam(),Spam()])
134 135
135 136
136 137 def test_future_flags(self):
137 138 """Check that future flags are used for parsing code (gh-777)"""
138 139 ip.run_cell('from __future__ import print_function')
139 140 try:
140 141 ip.run_cell('prfunc_return_val = print(1,2, sep=" ")')
141 142 assert 'prfunc_return_val' in ip.user_ns
142 143 finally:
143 144 # Reset compiler flags so we don't mess up other tests.
144 145 ip.compile.reset_compiler_flags()
145 146
146 147 def test_future_unicode(self):
147 148 """Check that unicode_literals is imported from __future__ (gh #786)"""
148 149 try:
149 150 ip.run_cell(u'byte_str = "a"')
150 151 assert isinstance(ip.user_ns['byte_str'], str) # string literals are byte strings by default
151 152 ip.run_cell('from __future__ import unicode_literals')
152 153 ip.run_cell(u'unicode_str = "a"')
153 154 assert isinstance(ip.user_ns['unicode_str'], unicode) # strings literals are now unicode
154 155 finally:
155 156 # Reset compiler flags so we don't mess up other tests.
156 157 ip.compile.reset_compiler_flags()
157 158
158 159 def test_can_pickle(self):
159 160 "Can we pickle objects defined interactively (GH-29)"
160 161 ip = get_ipython()
161 162 ip.reset()
162 163 ip.run_cell(("class Mylist(list):\n"
163 164 " def __init__(self,x=[]):\n"
164 165 " list.__init__(self,x)"))
165 166 ip.run_cell("w=Mylist([1,2,3])")
166 167
167 168 from cPickle import dumps
168 169
169 170 # We need to swap in our main module - this is only necessary
170 171 # inside the test framework, because IPython puts the interactive module
171 172 # in place (but the test framework undoes this).
172 173 _main = sys.modules['__main__']
173 174 sys.modules['__main__'] = ip.user_module
174 175 try:
175 176 res = dumps(ip.user_ns["w"])
176 177 finally:
177 178 sys.modules['__main__'] = _main
178 179 self.assertTrue(isinstance(res, bytes))
179 180
180 181 def test_global_ns(self):
181 182 "Code in functions must be able to access variables outside them."
182 183 ip = get_ipython()
183 184 ip.run_cell("a = 10")
184 185 ip.run_cell(("def f(x):\n"
185 186 " return x + a"))
186 187 ip.run_cell("b = f(12)")
187 188 self.assertEqual(ip.user_ns["b"], 22)
188 189
189 190 def test_bad_custom_tb(self):
190 191 """Check that InteractiveShell is protected from bad custom exception handlers"""
191 192 from IPython.utils import io
192 193 save_stderr = io.stderr
193 194 try:
194 195 # capture stderr
195 196 io.stderr = StringIO()
196 197 ip.set_custom_exc((IOError,), lambda etype,value,tb: 1/0)
197 198 self.assertEqual(ip.custom_exceptions, (IOError,))
198 199 ip.run_cell(u'raise IOError("foo")')
199 200 self.assertEqual(ip.custom_exceptions, ())
200 201 self.assertTrue("Custom TB Handler failed" in io.stderr.getvalue())
201 202 finally:
202 203 io.stderr = save_stderr
203 204
204 205 def test_bad_custom_tb_return(self):
205 206 """Check that InteractiveShell is protected from bad return types in custom exception handlers"""
206 207 from IPython.utils import io
207 208 save_stderr = io.stderr
208 209 try:
209 210 # capture stderr
210 211 io.stderr = StringIO()
211 212 ip.set_custom_exc((NameError,),lambda etype,value,tb, tb_offset=None: 1)
212 213 self.assertEqual(ip.custom_exceptions, (NameError,))
213 214 ip.run_cell(u'a=abracadabra')
214 215 self.assertEqual(ip.custom_exceptions, ())
215 216 self.assertTrue("Custom TB Handler failed" in io.stderr.getvalue())
216 217 finally:
217 218 io.stderr = save_stderr
218 219
219 220 def test_drop_by_id(self):
220 221 myvars = {"a":object(), "b":object(), "c": object()}
221 222 ip.push(myvars, interactive=False)
222 223 for name in myvars:
223 224 assert name in ip.user_ns, name
224 225 assert name in ip.user_ns_hidden, name
225 226 ip.user_ns['b'] = 12
226 227 ip.drop_by_id(myvars)
227 228 for name in ["a", "c"]:
228 229 assert name not in ip.user_ns, name
229 230 assert name not in ip.user_ns_hidden, name
230 231 assert ip.user_ns['b'] == 12
231 232 ip.reset()
232 233
233 234 def test_var_expand(self):
234 235 ip.user_ns['f'] = u'Ca\xf1o'
235 236 self.assertEqual(ip.var_expand(u'echo $f'), u'echo Ca\xf1o')
236 237 self.assertEqual(ip.var_expand(u'echo {f}'), u'echo Ca\xf1o')
237 238 self.assertEqual(ip.var_expand(u'echo {f[:-1]}'), u'echo Ca\xf1')
238 239 self.assertEqual(ip.var_expand(u'echo {1*2}'), u'echo 2')
239 240
240 241 ip.user_ns['f'] = b'Ca\xc3\xb1o'
241 242 # This should not raise any exception:
242 243 ip.var_expand(u'echo $f')
243 244
244 245 def test_var_expand_local(self):
245 246 """Test local variable expansion in !system and %magic calls"""
246 247 # !system
247 248 ip.run_cell('def test():\n'
248 249 ' lvar = "ttt"\n'
249 250 ' ret = !echo {lvar}\n'
250 251 ' return ret[0]\n')
251 252 res = ip.user_ns['test']()
252 253 nt.assert_in('ttt', res)
253 254
254 255 # %magic
255 256 ip.run_cell('def makemacro():\n'
256 257 ' macroname = "macro_var_expand_locals"\n'
257 258 ' %macro {macroname} codestr\n')
258 259 ip.user_ns['codestr'] = "str(12)"
259 260 ip.run_cell('makemacro()')
260 261 nt.assert_in('macro_var_expand_locals', ip.user_ns)
261 262
262 263 def test_var_expand_self(self):
263 264 """Test variable expansion with the name 'self', which was failing.
264 265
265 266 See https://github.com/ipython/ipython/issues/1878#issuecomment-7698218
266 267 """
267 268 ip.run_cell('class cTest:\n'
268 269 ' classvar="see me"\n'
269 270 ' def test(self):\n'
270 271 ' res = !echo Variable: {self.classvar}\n'
271 272 ' return res[0]\n')
272 273 nt.assert_in('see me', ip.user_ns['cTest']().test())
273 274
274 275 def test_bad_var_expand(self):
275 276 """var_expand on invalid formats shouldn't raise"""
276 277 # SyntaxError
277 278 self.assertEqual(ip.var_expand(u"{'a':5}"), u"{'a':5}")
278 279 # NameError
279 280 self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}")
280 281 # ZeroDivisionError
281 282 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
282 283
283 284 def test_silent_nopostexec(self):
284 285 """run_cell(silent=True) doesn't invoke post-exec funcs"""
285 286 d = dict(called=False)
286 287 def set_called():
287 288 d['called'] = True
288 289
289 290 ip.register_post_execute(set_called)
290 291 ip.run_cell("1", silent=True)
291 292 self.assertFalse(d['called'])
292 293 # double-check that non-silent exec did what we expected
293 294 # silent to avoid
294 295 ip.run_cell("1")
295 296 self.assertTrue(d['called'])
296 297 # remove post-exec
297 298 ip._post_execute.pop(set_called)
298 299
299 300 def test_silent_noadvance(self):
300 301 """run_cell(silent=True) doesn't advance execution_count"""
301 302 ec = ip.execution_count
302 303 # silent should force store_history=False
303 304 ip.run_cell("1", store_history=True, silent=True)
304 305
305 306 self.assertEqual(ec, ip.execution_count)
306 307 # double-check that non-silent exec did what we expected
307 308 # silent to avoid
308 309 ip.run_cell("1", store_history=True)
309 310 self.assertEqual(ec+1, ip.execution_count)
310 311
311 312 def test_silent_nodisplayhook(self):
312 313 """run_cell(silent=True) doesn't trigger displayhook"""
313 314 d = dict(called=False)
314 315
315 316 trap = ip.display_trap
316 317 save_hook = trap.hook
317 318
318 319 def failing_hook(*args, **kwargs):
319 320 d['called'] = True
320 321
321 322 try:
322 323 trap.hook = failing_hook
323 324 ip.run_cell("1", silent=True)
324 325 self.assertFalse(d['called'])
325 326 # double-check that non-silent exec did what we expected
326 327 # silent to avoid
327 328 ip.run_cell("1")
328 329 self.assertTrue(d['called'])
329 330 finally:
330 331 trap.hook = save_hook
331 332
332 333 @skipif(sys.version_info[0] >= 3, "softspace removed in py3")
333 334 def test_print_softspace(self):
334 335 """Verify that softspace is handled correctly when executing multiple
335 336 statements.
336 337
337 338 In [1]: print 1; print 2
338 339 1
339 340 2
340 341
341 342 In [2]: print 1,; print 2
342 343 1 2
343 344 """
344 345
345 346 def test_ofind_line_magic(self):
346 347 from IPython.core.magic import register_line_magic
347 348
348 349 @register_line_magic
349 350 def lmagic(line):
350 351 "A line magic"
351 352
352 353 # Get info on line magic
353 354 lfind = ip._ofind('lmagic')
354 355 info = dict(found=True, isalias=False, ismagic=True,
355 356 namespace = 'IPython internal', obj= lmagic.__wrapped__,
356 357 parent = None)
357 358 nt.assert_equal(lfind, info)
358 359
359 360 def test_ofind_cell_magic(self):
360 361 from IPython.core.magic import register_cell_magic
361 362
362 363 @register_cell_magic
363 364 def cmagic(line, cell):
364 365 "A cell magic"
365 366
366 367 # Get info on cell magic
367 368 find = ip._ofind('cmagic')
368 369 info = dict(found=True, isalias=False, ismagic=True,
369 370 namespace = 'IPython internal', obj= cmagic.__wrapped__,
370 371 parent = None)
371 372 nt.assert_equal(find, info)
372 373
373 374 def test_custom_exception(self):
374 375 called = []
375 376 def my_handler(shell, etype, value, tb, tb_offset=None):
376 377 called.append(etype)
377 378 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
378 379
379 380 ip.set_custom_exc((ValueError,), my_handler)
380 381 try:
381 382 ip.run_cell("raise ValueError('test')")
382 383 # Check that this was called, and only once.
383 384 self.assertEqual(called, [ValueError])
384 385 finally:
385 386 # Reset the custom exception hook
386 387 ip.set_custom_exc((), None)
387 388
388 389 @skipif(sys.version_info[0] >= 3, "no differences with __future__ in py3")
389 390 def test_future_environment(self):
390 391 "Can we run code with & without the shell's __future__ imports?"
391 392 ip.run_cell("from __future__ import division")
392 393 ip.run_cell("a = 1/2", shell_futures=True)
393 394 self.assertEqual(ip.user_ns['a'], 0.5)
394 395 ip.run_cell("b = 1/2", shell_futures=False)
395 396 self.assertEqual(ip.user_ns['b'], 0)
396 397
397 398 ip.compile.reset_compiler_flags()
398 399 # This shouldn't leak to the shell's compiler
399 400 ip.run_cell("from __future__ import division \nc=1/2", shell_futures=False)
400 401 self.assertEqual(ip.user_ns['c'], 0.5)
401 402 ip.run_cell("d = 1/2", shell_futures=True)
402 403 self.assertEqual(ip.user_ns['d'], 0)
403 404
404 405
405 406 class TestSafeExecfileNonAsciiPath(unittest.TestCase):
406 407
407 408 @onlyif_unicode_paths
408 409 def setUp(self):
409 410 self.BASETESTDIR = tempfile.mkdtemp()
410 411 self.TESTDIR = join(self.BASETESTDIR, u"Γ₯Àâ")
411 412 os.mkdir(self.TESTDIR)
412 413 with open(join(self.TESTDIR, u"Γ₯Àâtestscript.py"), "w") as sfile:
413 414 sfile.write("pass\n")
414 415 self.oldpath = os.getcwdu()
415 416 os.chdir(self.TESTDIR)
416 417 self.fname = u"Γ₯Àâtestscript.py"
417 418
418 419 def tearDown(self):
419 420 os.chdir(self.oldpath)
420 421 shutil.rmtree(self.BASETESTDIR)
421 422
422 423 @onlyif_unicode_paths
423 424 def test_1(self):
424 425 """Test safe_execfile with non-ascii path
425 426 """
426 427 ip.safe_execfile(self.fname, {}, raise_exceptions=True)
427 428
429 class ExitCodeChecks(tt.TempFileMixin):
430 def test_exit_code_ok(self):
431 self.system('exit 0')
432 self.assertEqual(ip.user_ns['_exit_code'], 0)
433
434 def test_exit_code_error(self):
435 self.system('exit 1')
436 self.assertEqual(ip.user_ns['_exit_code'], 1)
437
438 @skipif(not hasattr(signal, 'SIGALRM'))
439 def test_exit_code_signal(self):
440 self.mktmp("import signal, time\n"
441 "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
442 "time.sleep(1)\n")
443 self.system("%s %s" % (sys.executable, self.fname))
444 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
445
446 class TestSystemRaw(unittest.TestCase, ExitCodeChecks):
447 system = ip.system_raw
428 448
429 class TestSystemRaw(unittest.TestCase):
430 449 @onlyif_unicode_paths
431 450 def test_1(self):
432 451 """Test system_raw with non-ascii cmd
433 452 """
434 cmd = ur'''python -c "'Γ₯Àâ'" '''
453 cmd = u'''python -c "'Γ₯Àâ'" '''
435 454 ip.system_raw(cmd)
436 455
437 def test_exit_code(self):
438 """Test that the exit code is parsed correctly."""
439 ip.system_raw('exit 1')
440 self.assertEqual(ip.user_ns['_exit_code'], 1)
441
442 class TestSystemPiped(unittest.TestCase):
443 456 # TODO: Exit codes are currently ignored on Windows.
457 class TestSystemPipedExitCode(unittest.TestCase, ExitCodeChecks):
458 system = ip.system_piped
459
444 460 @skip_win32
445 def test_exit_code(self):
446 ip.system_piped('exit 1')
447 self.assertEqual(ip.user_ns['_exit_code'], 1)
461 def test_exit_code_ok(self):
462 ExitCodeChecks.test_exit_code_ok(self)
463
464 @skip_win32
465 def test_exit_code_error(self):
466 ExitCodeChecks.test_exit_code_error(self)
467
468 @skip_win32
469 def test_exit_code_signal(self):
470 ExitCodeChecks.test_exit_code_signal(self)
448 471
449 472 class TestModules(unittest.TestCase, tt.TempFileMixin):
450 473 def test_extraneous_loads(self):
451 474 """Test we're not loading modules on startup that we shouldn't.
452 475 """
453 476 self.mktmp("import sys\n"
454 477 "print('numpy' in sys.modules)\n"
455 478 "print('IPython.parallel' in sys.modules)\n"
456 479 "print('IPython.kernel.zmq' in sys.modules)\n"
457 480 )
458 481 out = "False\nFalse\nFalse\n"
459 482 tt.ipexec_validate(self.fname, out)
460 483
461 484 class Negator(ast.NodeTransformer):
462 485 """Negates all number literals in an AST."""
463 486 def visit_Num(self, node):
464 487 node.n = -node.n
465 488 return node
466 489
467 490 class TestAstTransform(unittest.TestCase):
468 491 def setUp(self):
469 492 self.negator = Negator()
470 493 ip.ast_transformers.append(self.negator)
471 494
472 495 def tearDown(self):
473 496 ip.ast_transformers.remove(self.negator)
474 497
475 498 def test_run_cell(self):
476 499 with tt.AssertPrints('-34'):
477 500 ip.run_cell('print (12 + 22)')
478 501
479 502 # A named reference to a number shouldn't be transformed.
480 503 ip.user_ns['n'] = 55
481 504 with tt.AssertNotPrints('-55'):
482 505 ip.run_cell('print (n)')
483 506
484 507 def test_timeit(self):
485 508 called = set()
486 509 def f(x):
487 510 called.add(x)
488 511 ip.push({'f':f})
489 512
490 513 with tt.AssertPrints("best of "):
491 514 ip.run_line_magic("timeit", "-n1 f(1)")
492 515 self.assertEqual(called, set([-1]))
493 516 called.clear()
494 517
495 518 with tt.AssertPrints("best of "):
496 519 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
497 520 self.assertEqual(called, set([-2, -3]))
498 521
499 522 def test_time(self):
500 523 called = []
501 524 def f(x):
502 525 called.append(x)
503 526 ip.push({'f':f})
504 527
505 528 # Test with an expression
506 529 with tt.AssertPrints("Wall time: "):
507 530 ip.run_line_magic("time", "f(5+9)")
508 531 self.assertEqual(called, [-14])
509 532 called[:] = []
510 533
511 534 # Test with a statement (different code path)
512 535 with tt.AssertPrints("Wall time: "):
513 536 ip.run_line_magic("time", "a = f(-3 + -2)")
514 537 self.assertEqual(called, [5])
515 538
516 539 def test_macro(self):
517 540 ip.push({'a':10})
518 541 # The AST transformation makes this do a+=-1
519 542 ip.define_macro("amacro", "a+=1\nprint(a)")
520 543
521 544 with tt.AssertPrints("9"):
522 545 ip.run_cell("amacro")
523 546 with tt.AssertPrints("8"):
524 547 ip.run_cell("amacro")
525 548
526 549 class IntegerWrapper(ast.NodeTransformer):
527 550 """Wraps all integers in a call to Integer()"""
528 551 def visit_Num(self, node):
529 552 if isinstance(node.n, int):
530 553 return ast.Call(func=ast.Name(id='Integer', ctx=ast.Load()),
531 554 args=[node], keywords=[])
532 555 return node
533 556
534 557 class TestAstTransform2(unittest.TestCase):
535 558 def setUp(self):
536 559 self.intwrapper = IntegerWrapper()
537 560 ip.ast_transformers.append(self.intwrapper)
538 561
539 562 self.calls = []
540 563 def Integer(*args):
541 564 self.calls.append(args)
542 565 return args
543 566 ip.push({"Integer": Integer})
544 567
545 568 def tearDown(self):
546 569 ip.ast_transformers.remove(self.intwrapper)
547 570 del ip.user_ns['Integer']
548 571
549 572 def test_run_cell(self):
550 573 ip.run_cell("n = 2")
551 574 self.assertEqual(self.calls, [(2,)])
552 575
553 576 # This shouldn't throw an error
554 577 ip.run_cell("o = 2.0")
555 578 self.assertEqual(ip.user_ns['o'], 2.0)
556 579
557 580 def test_timeit(self):
558 581 called = set()
559 582 def f(x):
560 583 called.add(x)
561 584 ip.push({'f':f})
562 585
563 586 with tt.AssertPrints("best of "):
564 587 ip.run_line_magic("timeit", "-n1 f(1)")
565 588 self.assertEqual(called, set([(1,)]))
566 589 called.clear()
567 590
568 591 with tt.AssertPrints("best of "):
569 592 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
570 593 self.assertEqual(called, set([(2,), (3,)]))
571 594
572 595 class ErrorTransformer(ast.NodeTransformer):
573 596 """Throws an error when it sees a number."""
574 597 def visit_Num(self):
575 598 raise ValueError("test")
576 599
577 600 class TestAstTransformError(unittest.TestCase):
578 601 def test_unregistering(self):
579 602 err_transformer = ErrorTransformer()
580 603 ip.ast_transformers.append(err_transformer)
581 604
582 605 with tt.AssertPrints("unregister", channel='stderr'):
583 606 ip.run_cell("1 + 2")
584 607
585 608 # This should have been removed.
586 609 nt.assert_not_in(err_transformer, ip.ast_transformers)
587 610
588 611 def test__IPYTHON__():
589 612 # This shouldn't raise a NameError, that's all
590 613 __IPYTHON__
591 614
592 615
593 616 class DummyRepr(object):
594 617 def __repr__(self):
595 618 return "DummyRepr"
596 619
597 620 def _repr_html_(self):
598 621 return "<b>dummy</b>"
599 622
600 623 def _repr_javascript_(self):
601 624 return "console.log('hi');", {'key': 'value'}
602 625
603 626
604 627 def test_user_variables():
605 628 # enable all formatters
606 629 ip.display_formatter.active_types = ip.display_formatter.format_types
607 630
608 631 ip.user_ns['dummy'] = d = DummyRepr()
609 632 keys = set(['dummy', 'doesnotexist'])
610 633 r = ip.user_variables(keys)
611 634
612 635 nt.assert_equal(keys, set(r.keys()))
613 636 dummy = r['dummy']
614 637 nt.assert_equal(set(['status', 'data', 'metadata']), set(dummy.keys()))
615 638 nt.assert_equal(dummy['status'], 'ok')
616 639 data = dummy['data']
617 640 metadata = dummy['metadata']
618 641 nt.assert_equal(data.get('text/html'), d._repr_html_())
619 642 js, jsmd = d._repr_javascript_()
620 643 nt.assert_equal(data.get('application/javascript'), js)
621 644 nt.assert_equal(metadata.get('application/javascript'), jsmd)
622 645
623 646 dne = r['doesnotexist']
624 647 nt.assert_equal(dne['status'], 'error')
625 648 nt.assert_equal(dne['ename'], 'KeyError')
626 649
627 650 # back to text only
628 651 ip.display_formatter.active_types = ['text/plain']
629 652
630 653 def test_user_expression():
631 654 # enable all formatters
632 655 ip.display_formatter.active_types = ip.display_formatter.format_types
633 656 query = {
634 657 'a' : '1 + 2',
635 658 'b' : '1/0',
636 659 }
637 660 r = ip.user_expressions(query)
638 661 import pprint
639 662 pprint.pprint(r)
640 663 nt.assert_equal(r.keys(), query.keys())
641 664 a = r['a']
642 665 nt.assert_equal(set(['status', 'data', 'metadata']), set(a.keys()))
643 666 nt.assert_equal(a['status'], 'ok')
644 667 data = a['data']
645 668 metadata = a['metadata']
646 669 nt.assert_equal(data.get('text/plain'), '3')
647 670
648 671 b = r['b']
649 672 nt.assert_equal(b['status'], 'error')
650 673 nt.assert_equal(b['ename'], 'ZeroDivisionError')
651 674
652 675 # back to text only
653 676 ip.display_formatter.active_types = ['text/plain']
654 677
655 678
656 679
657 680
658 681
659 682
@@ -1,204 +1,204 b''
1 1 """Posix-specific implementation of process utilities.
2 2
3 3 This file is only meant to be imported by process.py, not by end-users.
4 4 """
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010-2011 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Stdlib
19 19 import subprocess as sp
20 20 import sys
21 21
22 22 from IPython.external import pexpect
23 23
24 24 # Our own
25 25 from ._process_common import getoutput, arg_split
26 26 from IPython.utils import py3compat
27 27 from IPython.utils.encoding import DEFAULT_ENCODING
28 28
29 29 #-----------------------------------------------------------------------------
30 30 # Function definitions
31 31 #-----------------------------------------------------------------------------
32 32
33 33 def _find_cmd(cmd):
34 34 """Find the full path to a command using which."""
35 35
36 36 path = sp.Popen(['/usr/bin/env', 'which', cmd],
37 37 stdout=sp.PIPE, stderr=sp.PIPE).communicate()[0]
38 38 return py3compat.bytes_to_str(path)
39 39
40 40
41 41 class ProcessHandler(object):
42 42 """Execute subprocesses under the control of pexpect.
43 43 """
44 44 # Timeout in seconds to wait on each reading of the subprocess' output.
45 45 # This should not be set too low to avoid cpu overusage from our side,
46 46 # since we read in a loop whose period is controlled by this timeout.
47 47 read_timeout = 0.05
48 48
49 49 # Timeout to give a process if we receive SIGINT, between sending the
50 50 # SIGINT to the process and forcefully terminating it.
51 51 terminate_timeout = 0.2
52 52
53 53 # File object where stdout and stderr of the subprocess will be written
54 54 logfile = None
55 55
56 56 # Shell to call for subprocesses to execute
57 57 _sh = None
58 58
59 59 @property
60 60 def sh(self):
61 61 if self._sh is None:
62 62 self._sh = pexpect.which('sh')
63 63 if self._sh is None:
64 64 raise OSError('"sh" shell not found')
65 65
66 66 return self._sh
67 67
68 68 def __init__(self, logfile=None, read_timeout=None, terminate_timeout=None):
69 69 """Arguments are used for pexpect calls."""
70 70 self.read_timeout = (ProcessHandler.read_timeout if read_timeout is
71 71 None else read_timeout)
72 72 self.terminate_timeout = (ProcessHandler.terminate_timeout if
73 73 terminate_timeout is None else
74 74 terminate_timeout)
75 75 self.logfile = sys.stdout if logfile is None else logfile
76 76
77 77 def getoutput(self, cmd):
78 78 """Run a command and return its stdout/stderr as a string.
79 79
80 80 Parameters
81 81 ----------
82 82 cmd : str
83 83 A command to be executed in the system shell.
84 84
85 85 Returns
86 86 -------
87 87 output : str
88 88 A string containing the combination of stdout and stderr from the
89 89 subprocess, in whatever order the subprocess originally wrote to its
90 90 file descriptors (so the order of the information in this string is the
91 91 correct order as would be seen if running the command in a terminal).
92 92 """
93 93 try:
94 94 return pexpect.run(self.sh, args=['-c', cmd]).replace('\r\n', '\n')
95 95 except KeyboardInterrupt:
96 96 print('^C', file=sys.stderr, end='')
97 97
98 98 def getoutput_pexpect(self, cmd):
99 99 """Run a command and return its stdout/stderr as a string.
100 100
101 101 Parameters
102 102 ----------
103 103 cmd : str
104 104 A command to be executed in the system shell.
105 105
106 106 Returns
107 107 -------
108 108 output : str
109 109 A string containing the combination of stdout and stderr from the
110 110 subprocess, in whatever order the subprocess originally wrote to its
111 111 file descriptors (so the order of the information in this string is the
112 112 correct order as would be seen if running the command in a terminal).
113 113 """
114 114 try:
115 115 return pexpect.run(self.sh, args=['-c', cmd]).replace('\r\n', '\n')
116 116 except KeyboardInterrupt:
117 117 print('^C', file=sys.stderr, end='')
118 118
119 119 def system(self, cmd):
120 120 """Execute a command in a subshell.
121 121
122 122 Parameters
123 123 ----------
124 124 cmd : str
125 125 A command to be executed in the system shell.
126 126
127 127 Returns
128 128 -------
129 129 int : child's exitstatus
130 130 """
131 131 # Get likely encoding for the output.
132 132 enc = DEFAULT_ENCODING
133 133
134 134 # Patterns to match on the output, for pexpect. We read input and
135 135 # allow either a short timeout or EOF
136 136 patterns = [pexpect.TIMEOUT, pexpect.EOF]
137 137 # the index of the EOF pattern in the list.
138 138 # even though we know it's 1, this call means we don't have to worry if
139 139 # we change the above list, and forget to change this value:
140 140 EOF_index = patterns.index(pexpect.EOF)
141 141 # The size of the output stored so far in the process output buffer.
142 142 # Since pexpect only appends to this buffer, each time we print we
143 143 # record how far we've printed, so that next time we only print *new*
144 144 # content from the buffer.
145 145 out_size = 0
146 146 try:
147 147 # Since we're not really searching the buffer for text patterns, we
148 148 # can set pexpect's search window to be tiny and it won't matter.
149 149 # We only search for the 'patterns' timeout or EOF, which aren't in
150 150 # the text itself.
151 151 #child = pexpect.spawn(pcmd, searchwindowsize=1)
152 152 if hasattr(pexpect, 'spawnb'):
153 153 child = pexpect.spawnb(self.sh, args=['-c', cmd]) # Pexpect-U
154 154 else:
155 155 child = pexpect.spawn(self.sh, args=['-c', cmd]) # Vanilla Pexpect
156 156 flush = sys.stdout.flush
157 157 while True:
158 158 # res is the index of the pattern that caused the match, so we
159 159 # know whether we've finished (if we matched EOF) or not
160 160 res_idx = child.expect_list(patterns, self.read_timeout)
161 161 print(child.before[out_size:].decode(enc, 'replace'), end='')
162 162 flush()
163 163 if res_idx==EOF_index:
164 164 break
165 165 # Update the pointer to what we've already printed
166 166 out_size = len(child.before)
167 167 except KeyboardInterrupt:
168 168 # We need to send ^C to the process. The ascii code for '^C' is 3
169 169 # (the character is known as ETX for 'End of Text', see
170 170 # curses.ascii.ETX).
171 171 child.sendline(chr(3))
172 172 # Read and print any more output the program might produce on its
173 173 # way out.
174 174 try:
175 175 out_size = len(child.before)
176 176 child.expect_list(patterns, self.terminate_timeout)
177 177 print(child.before[out_size:].decode(enc, 'replace'), end='')
178 178 sys.stdout.flush()
179 179 except KeyboardInterrupt:
180 180 # Impatient users tend to type it multiple times
181 181 pass
182 182 finally:
183 183 # Ensure the subprocess really is terminated
184 184 child.terminate(force=True)
185 185 # add isalive check, to ensure exitstatus is set:
186 186 child.isalive()
187 187
188 188 # We follow the subprocess pattern, returning either the exit status
189 189 # as a positive number, or the terminating signal as a negative
190 # number
191 if child.signalstatus is not None:
192 return -child.signalstatus
190 # number. sh returns 128+n for signals
191 if child.exitstatus > 128:
192 return -(child.exitstatus - 128)
193 193 return child.exitstatus
194 194
195 195
196 196 # Make system() with a functional interface for outside use. Note that we use
197 197 # getoutput() from the _common utils, which is built on top of popen(). Using
198 198 # pexpect to get subprocess output produces difficult to parse output, since
199 199 # programs think they are talking to a tty and produce highly formatted output
200 200 # (ls is a good example) that makes them hard.
201 201 system = ProcessHandler().system
202 202
203 203
204 204
General Comments 0
You need to be logged in to leave comments. Login now