##// END OF EJS Templates
Test that the system() subprocess can be interrupted.
Itamar Turner-Trauring -
Show More
@@ -1,144 +1,172 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Tests for platutils.py
3 Tests for platutils.py
4 """
4 """
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2011 The IPython Development Team
7 # Copyright (C) 2008-2011 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import sys
17 import sys
18 import os
18 import os
19 import time
20 from _thread import interrupt_main # Py 3
21 import threading
19
22
20 import nose.tools as nt
23 import nose.tools as nt
21
24
22 from IPython.utils.process import (find_cmd, FindCmdError, arg_split,
25 from IPython.utils.process import (find_cmd, FindCmdError, arg_split,
23 system, getoutput, getoutputerror,
26 system, getoutput, getoutputerror,
24 get_output_error_code)
27 get_output_error_code)
25 from IPython.testing import decorators as dec
28 from IPython.testing import decorators as dec
26 from IPython.testing import tools as tt
29 from IPython.testing import tools as tt
27
30
28 python = os.path.basename(sys.executable)
31 python = os.path.basename(sys.executable)
29
32
30 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
31 # Tests
34 # Tests
32 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
33
36
34
37
35 @dec.skip_win32
38 @dec.skip_win32
36 def test_find_cmd_ls():
39 def test_find_cmd_ls():
37 """Make sure we can find the full path to ls."""
40 """Make sure we can find the full path to ls."""
38 path = find_cmd('ls')
41 path = find_cmd('ls')
39 nt.assert_true(path.endswith('ls'))
42 nt.assert_true(path.endswith('ls'))
40
43
41
44
42 def has_pywin32():
45 def has_pywin32():
43 try:
46 try:
44 import win32api
47 import win32api
45 except ImportError:
48 except ImportError:
46 return False
49 return False
47 return True
50 return True
48
51
49
52
50 @dec.onlyif(has_pywin32, "This test requires win32api to run")
53 @dec.onlyif(has_pywin32, "This test requires win32api to run")
51 def test_find_cmd_pythonw():
54 def test_find_cmd_pythonw():
52 """Try to find pythonw on Windows."""
55 """Try to find pythonw on Windows."""
53 path = find_cmd('pythonw')
56 path = find_cmd('pythonw')
54 assert path.lower().endswith('pythonw.exe'), path
57 assert path.lower().endswith('pythonw.exe'), path
55
58
56
59
57 @dec.onlyif(lambda : sys.platform != 'win32' or has_pywin32(),
60 @dec.onlyif(lambda : sys.platform != 'win32' or has_pywin32(),
58 "This test runs on posix or in win32 with win32api installed")
61 "This test runs on posix or in win32 with win32api installed")
59 def test_find_cmd_fail():
62 def test_find_cmd_fail():
60 """Make sure that FindCmdError is raised if we can't find the cmd."""
63 """Make sure that FindCmdError is raised if we can't find the cmd."""
61 nt.assert_raises(FindCmdError,find_cmd,'asdfasdf')
64 nt.assert_raises(FindCmdError,find_cmd,'asdfasdf')
62
65
63
66
64 @dec.skip_win32
67 @dec.skip_win32
65 def test_arg_split():
68 def test_arg_split():
66 """Ensure that argument lines are correctly split like in a shell."""
69 """Ensure that argument lines are correctly split like in a shell."""
67 tests = [['hi', ['hi']],
70 tests = [['hi', ['hi']],
68 [u'hi', [u'hi']],
71 [u'hi', [u'hi']],
69 ['hello there', ['hello', 'there']],
72 ['hello there', ['hello', 'there']],
70 # \u01ce == \N{LATIN SMALL LETTER A WITH CARON}
73 # \u01ce == \N{LATIN SMALL LETTER A WITH CARON}
71 # Do not use \N because the tests crash with syntax error in
74 # Do not use \N because the tests crash with syntax error in
72 # some cases, for example windows python2.6.
75 # some cases, for example windows python2.6.
73 [u'h\u01cello', [u'h\u01cello']],
76 [u'h\u01cello', [u'h\u01cello']],
74 ['something "with quotes"', ['something', '"with quotes"']],
77 ['something "with quotes"', ['something', '"with quotes"']],
75 ]
78 ]
76 for argstr, argv in tests:
79 for argstr, argv in tests:
77 nt.assert_equal(arg_split(argstr), argv)
80 nt.assert_equal(arg_split(argstr), argv)
78
81
79 @dec.skip_if_not_win32
82 @dec.skip_if_not_win32
80 def test_arg_split_win32():
83 def test_arg_split_win32():
81 """Ensure that argument lines are correctly split like in a shell."""
84 """Ensure that argument lines are correctly split like in a shell."""
82 tests = [['hi', ['hi']],
85 tests = [['hi', ['hi']],
83 [u'hi', [u'hi']],
86 [u'hi', [u'hi']],
84 ['hello there', ['hello', 'there']],
87 ['hello there', ['hello', 'there']],
85 [u'h\u01cello', [u'h\u01cello']],
88 [u'h\u01cello', [u'h\u01cello']],
86 ['something "with quotes"', ['something', 'with quotes']],
89 ['something "with quotes"', ['something', 'with quotes']],
87 ]
90 ]
88 for argstr, argv in tests:
91 for argstr, argv in tests:
89 nt.assert_equal(arg_split(argstr), argv)
92 nt.assert_equal(arg_split(argstr), argv)
90
93
91
94
92 class SubProcessTestCase(tt.TempFileMixin):
95 class SubProcessTestCase(tt.TempFileMixin):
93 def setUp(self):
96 def setUp(self):
94 """Make a valid python temp file."""
97 """Make a valid python temp file."""
95 lines = [ "import sys",
98 lines = [ "import sys",
96 "print('on stdout', end='', file=sys.stdout)",
99 "print('on stdout', end='', file=sys.stdout)",
97 "print('on stderr', end='', file=sys.stderr)",
100 "print('on stderr', end='', file=sys.stderr)",
98 "sys.stdout.flush()",
101 "sys.stdout.flush()",
99 "sys.stderr.flush()"]
102 "sys.stderr.flush()"]
100 self.mktmp('\n'.join(lines))
103 self.mktmp('\n'.join(lines))
101
104
102 def test_system(self):
105 def test_system(self):
103 status = system('%s "%s"' % (python, self.fname))
106 status = system('%s "%s"' % (python, self.fname))
104 self.assertEqual(status, 0)
107 self.assertEqual(status, 0)
105
108
106 def test_system_quotes(self):
109 def test_system_quotes(self):
107 status = system('%s -c "import sys"' % python)
110 status = system('%s -c "import sys"' % python)
108 self.assertEqual(status, 0)
111 self.assertEqual(status, 0)
109
112
113 def test_system_interrupt(self):
114 """
115 When interrupted in the way ipykernel interrupts IPython, the
116 subprocess is interrupted.
117 """
118 if threading.main_thread() != threading.current_thread():
119 raise nt.SkipTest("Can't run this test if not in main thread.")
120
121 def interrupt():
122 # Wait for subprocess to start:
123 time.sleep(0.5)
124 interrupt_main()
125
126 threading.Thread(target=interrupt).start()
127 try:
128 status = system('%s -c "import time; time.sleep(5)"' % python)
129 except KeyboardInterrupt:
130 # Success!
131 return
132 self.assertNotEqual(
133 status, 0, "The process wasn't interrupted. Status: %s" % (status,)
134 )
135
110 def test_getoutput(self):
136 def test_getoutput(self):
111 out = getoutput('%s "%s"' % (python, self.fname))
137 out = getoutput('%s "%s"' % (python, self.fname))
112 # we can't rely on the order the line buffered streams are flushed
138 # we can't rely on the order the line buffered streams are flushed
113 try:
139 try:
114 self.assertEqual(out, 'on stderron stdout')
140 self.assertEqual(out, 'on stderron stdout')
115 except AssertionError:
141 except AssertionError:
116 self.assertEqual(out, 'on stdouton stderr')
142 self.assertEqual(out, 'on stdouton stderr')
117
143
118 def test_getoutput_quoted(self):
144 def test_getoutput_quoted(self):
119 out = getoutput('%s -c "print (1)"' % python)
145 out = getoutput('%s -c "print (1)"' % python)
120 self.assertEqual(out.strip(), '1')
146 self.assertEqual(out.strip(), '1')
121
147
122 #Invalid quoting on windows
148 #Invalid quoting on windows
123 @dec.skip_win32
149 @dec.skip_win32
124 def test_getoutput_quoted2(self):
150 def test_getoutput_quoted2(self):
125 out = getoutput("%s -c 'print (1)'" % python)
151 out = getoutput("%s -c 'print (1)'" % python)
126 self.assertEqual(out.strip(), '1')
152 self.assertEqual(out.strip(), '1')
127 out = getoutput("%s -c 'print (\"1\")'" % python)
153 out = getoutput("%s -c 'print (\"1\")'" % python)
128 self.assertEqual(out.strip(), '1')
154 self.assertEqual(out.strip(), '1')
129
155
130 def test_getoutput_error(self):
156 def test_getoutput_error(self):
131 out, err = getoutputerror('%s "%s"' % (python, self.fname))
157 out, err = getoutputerror('%s "%s"' % (python, self.fname))
132 self.assertEqual(out, 'on stdout')
158 self.assertEqual(out, 'on stdout')
133 self.assertEqual(err, 'on stderr')
159 self.assertEqual(err, 'on stderr')
134
160
135 def test_get_output_error_code(self):
161 def test_get_output_error_code(self):
136 quiet_exit = '%s -c "import sys; sys.exit(1)"' % python
162 quiet_exit = '%s -c "import sys; sys.exit(1)"' % python
137 out, err, code = get_output_error_code(quiet_exit)
163 out, err, code = get_output_error_code(quiet_exit)
138 self.assertEqual(out, '')
164 self.assertEqual(out, '')
139 self.assertEqual(err, '')
165 self.assertEqual(err, '')
140 self.assertEqual(code, 1)
166 self.assertEqual(code, 1)
141 out, err, code = get_output_error_code('%s "%s"' % (python, self.fname))
167 out, err, code = get_output_error_code('%s "%s"' % (python, self.fname))
142 self.assertEqual(out, 'on stdout')
168 self.assertEqual(out, 'on stdout')
143 self.assertEqual(err, 'on stderr')
169 self.assertEqual(err, 'on stderr')
144 self.assertEqual(code, 0)
170 self.assertEqual(code, 0)
171
172
General Comments 0
You need to be logged in to leave comments. Login now