test_process.py
198 lines
| 6.6 KiB
| text/x-python
|
PythonLexer
Brian Granger
|
r1976 | # encoding: utf-8 | ||
""" | ||||
Tests for platutils.py | ||||
""" | ||||
#----------------------------------------------------------------------------- | ||||
Matthias BUSSONNIER
|
r5390 | # Copyright (C) 2008-2011 The IPython Development Team | ||
Brian Granger
|
r1976 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
import sys | ||||
Itamar Turner-Trauring
|
r25514 | import signal | ||
Julian Taylor
|
r10607 | import os | ||
Itamar Turner-Trauring
|
r25497 | import time | ||
from _thread import interrupt_main # Py 3 | ||||
import threading | ||||
Itamar Turner-Trauring
|
r25502 | from unittest import SkipTest | ||
Brian Granger
|
r1976 | |||
import nose.tools as nt | ||||
Fernando Perez
|
r2908 | from IPython.utils.process import (find_cmd, FindCmdError, arg_split, | ||
Paul Ivanov
|
r11826 | system, getoutput, getoutputerror, | ||
get_output_error_code) | ||||
Itamar Turner-Trauring
|
r25507 | from IPython.utils.capture import capture_output | ||
Brian Granger
|
r1976 | from IPython.testing import decorators as dec | ||
Fernando Perez
|
r2908 | from IPython.testing import tools as tt | ||
Brian Granger
|
r1976 | |||
Julian Taylor
|
r10607 | python = os.path.basename(sys.executable) | ||
Brian Granger
|
r1976 | #----------------------------------------------------------------------------- | ||
# Tests | ||||
#----------------------------------------------------------------------------- | ||||
@dec.skip_win32 | ||||
Fernando Perez
|
r2453 | def test_find_cmd_ls(): | ||
Brian Granger
|
r1976 | """Make sure we can find the full path to ls.""" | ||
path = find_cmd('ls') | ||||
Thomas Kluyver
|
r4898 | nt.assert_true(path.endswith('ls')) | ||
Brian Granger
|
r1976 | |||
Fernando Perez
|
r2453 | |||
def has_pywin32(): | ||||
try: | ||||
import win32api | ||||
except ImportError: | ||||
return False | ||||
return True | ||||
@dec.onlyif(has_pywin32, "This test requires win32api to run") | ||||
def test_find_cmd_pythonw(): | ||||
Brian Granger
|
r1976 | """Try to find pythonw on Windows.""" | ||
path = find_cmd('pythonw') | ||||
Thomas Kluyver
|
r21123 | assert path.lower().endswith('pythonw.exe'), path | ||
Brian Granger
|
r1976 | |||
Fernando Perez
|
r2453 | |||
@dec.onlyif(lambda : sys.platform != 'win32' or has_pywin32(), | ||||
"This test runs on posix or in win32 with win32api installed") | ||||
Brian Granger
|
r1976 | def test_find_cmd_fail(): | ||
"""Make sure that FindCmdError is raised if we can't find the cmd.""" | ||||
nt.assert_raises(FindCmdError,find_cmd,'asdfasdf') | ||||
Administrator
|
r1986 | |||
Matthias Bussonnier
|
r26717 | |||
# TODO: move to pytest.mark.parametrize once nose gone | ||||
Jörgen Stenarson
|
r5512 | @dec.skip_win32 | ||
Fernando Perez
|
r2650 | def test_arg_split(): | ||
"""Ensure that argument lines are correctly split like in a shell.""" | ||||
tests = [['hi', ['hi']], | ||||
[u'hi', [u'hi']], | ||||
Robert Kern
|
r3291 | ['hello there', ['hello', 'there']], | ||
Jörgen Stenarson
|
r5517 | # \u01ce == \N{LATIN SMALL LETTER A WITH CARON} | ||
# Do not use \N because the tests crash with syntax error in | ||||
# some cases, for example windows python2.6. | ||||
[u'h\u01cello', [u'h\u01cello']], | ||||
Jörgen Stenarson
|
r5512 | ['something "with quotes"', ['something', '"with quotes"']], | ||
] | ||||
for argstr, argv in tests: | ||||
Matthias Bussonnier
|
r26717 | assert arg_split(argstr) == argv | ||
# TODO: move to pytest.mark.parametrize once nose gone | ||||
Jörgen Stenarson
|
r5513 | @dec.skip_if_not_win32 | ||
Jörgen Stenarson
|
r5512 | def test_arg_split_win32(): | ||
"""Ensure that argument lines are correctly split like in a shell.""" | ||||
tests = [['hi', ['hi']], | ||||
[u'hi', [u'hi']], | ||||
['hello there', ['hello', 'there']], | ||||
Jörgen Stenarson
|
r5517 | [u'h\u01cello', [u'h\u01cello']], | ||
Jörgen Stenarson
|
r5513 | ['something "with quotes"', ['something', 'with quotes']], | ||
Fernando Perez
|
r2650 | ] | ||
for argstr, argv in tests: | ||||
Matthias Bussonnier
|
r26717 | assert arg_split(argstr) == argv | ||
Fernando Perez
|
r2908 | |||
Matthias Bussonnier
|
r25113 | class SubProcessTestCase(tt.TempFileMixin): | ||
Fernando Perez
|
r2908 | def setUp(self): | ||
"""Make a valid python temp file.""" | ||||
Paul Ivanov
|
r22963 | lines = [ "import sys", | ||
Fernando Perez
|
r2908 | "print('on stdout', end='', file=sys.stdout)", | ||
"print('on stderr', end='', file=sys.stderr)", | ||||
"sys.stdout.flush()", | ||||
"sys.stderr.flush()"] | ||||
self.mktmp('\n'.join(lines)) | ||||
def test_system(self): | ||||
Julian Taylor
|
r10607 | status = system('%s "%s"' % (python, self.fname)) | ||
Bradley M. Froehle
|
r7874 | self.assertEqual(status, 0) | ||
MinRK
|
r5296 | |||
def test_system_quotes(self): | ||||
Julian Taylor
|
r10607 | status = system('%s -c "import sys"' % python) | ||
Bradley M. Froehle
|
r7874 | self.assertEqual(status, 0) | ||
Fernando Perez
|
r2908 | |||
Itamar Turner-Trauring
|
r25502 | def assert_interrupts(self, command): | ||
Itamar Turner-Trauring
|
r25497 | """ | ||
Itamar Turner-Trauring
|
r25502 | Interrupt a subprocess after a second. | ||
Itamar Turner-Trauring
|
r25497 | """ | ||
if threading.main_thread() != threading.current_thread(): | ||||
raise nt.SkipTest("Can't run this test if not in main thread.") | ||||
Itamar Turner-Trauring
|
r25514 | # Some tests can overwrite SIGINT handler (by using pdb for example), | ||
# which then breaks this test, so just make sure it's operating | ||||
# normally. | ||||
signal.signal(signal.SIGINT, signal.default_int_handler) | ||||
Itamar Turner-Trauring
|
r25497 | def interrupt(): | ||
# Wait for subprocess to start: | ||||
time.sleep(0.5) | ||||
interrupt_main() | ||||
threading.Thread(target=interrupt).start() | ||||
Itamar Turner-Trauring
|
r25501 | start = time.time() | ||
Itamar Turner-Trauring
|
r25497 | try: | ||
Itamar Turner-Trauring
|
r25502 | result = command() | ||
Itamar Turner-Trauring
|
r25497 | except KeyboardInterrupt: | ||
# Success! | ||||
Itamar Turner-Trauring
|
r25514 | pass | ||
Itamar Turner-Trauring
|
r25501 | end = time.time() | ||
self.assertTrue( | ||||
end - start < 2, "Process didn't die quickly: %s" % (end - start) | ||||
) | ||||
Itamar Turner-Trauring
|
r25502 | return result | ||
def test_system_interrupt(self): | ||||
""" | ||||
When interrupted in the way ipykernel interrupts IPython, the | ||||
subprocess is interrupted. | ||||
""" | ||||
def command(): | ||||
return system('%s -c "import time; time.sleep(5)"' % python) | ||||
status = self.assert_interrupts(command) | ||||
self.assertNotEqual( | ||||
status, 0, "The process wasn't interrupted. Status: %s" % (status,) | ||||
) | ||||
Itamar Turner-Trauring
|
r25497 | |||
Fernando Perez
|
r2908 | def test_getoutput(self): | ||
Julian Taylor
|
r10607 | out = getoutput('%s "%s"' % (python, self.fname)) | ||
Julian Taylor
|
r10606 | # we can't rely on the order the line buffered streams are flushed | ||
try: | ||||
self.assertEqual(out, 'on stderron stdout') | ||||
except AssertionError: | ||||
self.assertEqual(out, 'on stdouton stderr') | ||||
Fernando Perez
|
r2908 | |||
MinRK
|
r5296 | def test_getoutput_quoted(self): | ||
Julian Taylor
|
r10607 | out = getoutput('%s -c "print (1)"' % python) | ||
Bradley M. Froehle
|
r7874 | self.assertEqual(out.strip(), '1') | ||
Jörgen Stenarson
|
r5514 | |||
#Invalid quoting on windows | ||||
@dec.skip_win32 | ||||
def test_getoutput_quoted2(self): | ||||
Julian Taylor
|
r10607 | out = getoutput("%s -c 'print (1)'" % python) | ||
Bradley M. Froehle
|
r7874 | self.assertEqual(out.strip(), '1') | ||
Julian Taylor
|
r10607 | out = getoutput("%s -c 'print (\"1\")'" % python) | ||
Bradley M. Froehle
|
r7874 | self.assertEqual(out.strip(), '1') | ||
MinRK
|
r5296 | |||
Julian Taylor
|
r10606 | def test_getoutput_error(self): | ||
Julian Taylor
|
r10607 | out, err = getoutputerror('%s "%s"' % (python, self.fname)) | ||
Bradley M. Froehle
|
r7874 | self.assertEqual(out, 'on stdout') | ||
self.assertEqual(err, 'on stderr') | ||||
Itamar Turner-Trauring
|
r25497 | |||
Paul Ivanov
|
r11826 | def test_get_output_error_code(self): | ||
quiet_exit = '%s -c "import sys; sys.exit(1)"' % python | ||||
out, err, code = get_output_error_code(quiet_exit) | ||||
self.assertEqual(out, '') | ||||
self.assertEqual(err, '') | ||||
self.assertEqual(code, 1) | ||||
out, err, code = get_output_error_code('%s "%s"' % (python, self.fname)) | ||||
self.assertEqual(out, 'on stdout') | ||||
self.assertEqual(err, 'on stderr') | ||||
self.assertEqual(code, 0) | ||||
Itamar Turner-Trauring
|
r25497 | |||