|
|
"""
|
|
|
Tests for platutils.py
|
|
|
"""
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Copyright (C) 2008-2011 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Imports
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
import sys
|
|
|
import signal
|
|
|
import os
|
|
|
import time
|
|
|
from _thread import interrupt_main # Py 3
|
|
|
import threading
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
from IPython.utils.process import (find_cmd, FindCmdError, arg_split,
|
|
|
system, getoutput, getoutputerror,
|
|
|
get_output_error_code)
|
|
|
from IPython.utils.capture import capture_output
|
|
|
from IPython.testing import decorators as dec
|
|
|
from IPython.testing import tools as tt
|
|
|
|
|
|
python = os.path.basename(sys.executable)
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Tests
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
@dec.skip_win32
|
|
|
def test_find_cmd_ls():
|
|
|
"""Make sure we can find the full path to ls."""
|
|
|
path = find_cmd("ls")
|
|
|
assert path.endswith("ls")
|
|
|
|
|
|
|
|
|
@dec.skip_if_not_win32
|
|
|
def test_find_cmd_pythonw():
|
|
|
"""Try to find pythonw on Windows."""
|
|
|
path = find_cmd('pythonw')
|
|
|
assert path.lower().endswith('pythonw.exe'), path
|
|
|
|
|
|
|
|
|
def test_find_cmd_fail():
|
|
|
"""Make sure that FindCmdError is raised if we can't find the cmd."""
|
|
|
pytest.raises(FindCmdError, find_cmd, "asdfasdf")
|
|
|
|
|
|
|
|
|
@dec.skip_win32
|
|
|
@pytest.mark.parametrize(
|
|
|
"argstr, argv",
|
|
|
[
|
|
|
("hi", ["hi"]),
|
|
|
("hello there", ["hello", "there"]),
|
|
|
# \u01ce == \N{LATIN SMALL LETTER A WITH CARON}
|
|
|
# Do not use \N because the tests crash with syntax error in
|
|
|
# some cases, for example windows python2.6.
|
|
|
("h\u01cello", ["h\u01cello"]),
|
|
|
('something "with quotes"', ["something", '"with quotes"']),
|
|
|
],
|
|
|
)
|
|
|
def test_arg_split(argstr, argv):
|
|
|
"""Ensure that argument lines are correctly split like in a shell."""
|
|
|
assert arg_split(argstr) == argv
|
|
|
|
|
|
|
|
|
@dec.skip_if_not_win32
|
|
|
@pytest.mark.parametrize(
|
|
|
"argstr,argv",
|
|
|
[
|
|
|
("hi", ["hi"]),
|
|
|
("hello there", ["hello", "there"]),
|
|
|
("h\u01cello", ["h\u01cello"]),
|
|
|
('something "with quotes"', ["something", "with quotes"]),
|
|
|
],
|
|
|
)
|
|
|
def test_arg_split_win32(argstr, argv):
|
|
|
"""Ensure that argument lines are correctly split like in a shell."""
|
|
|
assert arg_split(argstr) == argv
|
|
|
|
|
|
|
|
|
class SubProcessTestCase(tt.TempFileMixin):
|
|
|
def setUp(self):
|
|
|
"""Make a valid python temp file."""
|
|
|
lines = [ "import sys",
|
|
|
"print('on stdout', end='', file=sys.stdout)",
|
|
|
"print('on stderr', end='', file=sys.stderr)",
|
|
|
"sys.stdout.flush()",
|
|
|
"sys.stderr.flush()"]
|
|
|
self.mktmp('\n'.join(lines))
|
|
|
|
|
|
def test_system(self):
|
|
|
status = system(f'{python} "{self.fname}"')
|
|
|
self.assertEqual(status, 0)
|
|
|
|
|
|
def test_system_quotes(self):
|
|
|
status = system('%s -c "import sys"' % python)
|
|
|
self.assertEqual(status, 0)
|
|
|
|
|
|
def assert_interrupts(self, command):
|
|
|
"""
|
|
|
Interrupt a subprocess after a second.
|
|
|
"""
|
|
|
if threading.main_thread() != threading.current_thread():
|
|
|
raise pytest.skip("Can't run this test if not in main thread.")
|
|
|
|
|
|
# Some tests can overwrite SIGINT handler (by using pdb for example),
|
|
|
# which then breaks this test, so just make sure it's operating
|
|
|
# normally.
|
|
|
signal.signal(signal.SIGINT, signal.default_int_handler)
|
|
|
|
|
|
def interrupt():
|
|
|
# Wait for subprocess to start:
|
|
|
time.sleep(0.5)
|
|
|
interrupt_main()
|
|
|
|
|
|
threading.Thread(target=interrupt).start()
|
|
|
start = time.time()
|
|
|
try:
|
|
|
result = command()
|
|
|
except KeyboardInterrupt:
|
|
|
# Success!
|
|
|
pass
|
|
|
end = time.time()
|
|
|
self.assertTrue(
|
|
|
end - start < 2, "Process didn't die quickly: %s" % (end - start)
|
|
|
)
|
|
|
return result
|
|
|
|
|
|
def test_system_interrupt(self):
|
|
|
"""
|
|
|
When interrupted in the way ipykernel interrupts IPython, the
|
|
|
subprocess is interrupted.
|
|
|
"""
|
|
|
def command():
|
|
|
return system('%s -c "import time; time.sleep(5)"' % python)
|
|
|
|
|
|
status = self.assert_interrupts(command)
|
|
|
self.assertNotEqual(
|
|
|
status, 0, f"The process wasn't interrupted. Status: {status}"
|
|
|
)
|
|
|
|
|
|
def test_getoutput(self):
|
|
|
out = getoutput(f'{python} "{self.fname}"')
|
|
|
# we can't rely on the order the line buffered streams are flushed
|
|
|
try:
|
|
|
self.assertEqual(out, 'on stderron stdout')
|
|
|
except AssertionError:
|
|
|
self.assertEqual(out, 'on stdouton stderr')
|
|
|
|
|
|
def test_getoutput_quoted(self):
|
|
|
out = getoutput('%s -c "print (1)"' % python)
|
|
|
self.assertEqual(out.strip(), '1')
|
|
|
|
|
|
#Invalid quoting on windows
|
|
|
@dec.skip_win32
|
|
|
def test_getoutput_quoted2(self):
|
|
|
out = getoutput("%s -c 'print (1)'" % python)
|
|
|
self.assertEqual(out.strip(), '1')
|
|
|
out = getoutput("%s -c 'print (\"1\")'" % python)
|
|
|
self.assertEqual(out.strip(), '1')
|
|
|
|
|
|
def test_getoutput_error(self):
|
|
|
out, err = getoutputerror(f'{python} "{self.fname}"')
|
|
|
self.assertEqual(out, 'on stdout')
|
|
|
self.assertEqual(err, 'on stderr')
|
|
|
|
|
|
def test_get_output_error_code(self):
|
|
|
quiet_exit = '%s -c "import sys; sys.exit(1)"' % python
|
|
|
out, err, code = get_output_error_code(quiet_exit)
|
|
|
self.assertEqual(out, '')
|
|
|
self.assertEqual(err, '')
|
|
|
self.assertEqual(code, 1)
|
|
|
out, err, code = get_output_error_code(f'{python} "{self.fname}"')
|
|
|
self.assertEqual(out, 'on stdout')
|
|
|
self.assertEqual(err, 'on stderr')
|
|
|
self.assertEqual(code, 0)
|
|
|
|
|
|
|
|
|
|