##// END OF EJS Templates
Test for filling stderr buffer.
Itamar Turner-Trauring -
Show More
@@ -1,199 +1,210 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Tests for platutils.py
3 Tests for platutils.py
4 """
4 """
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2011 The IPython Development Team
7 # Copyright (C) 2008-2011 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import sys
17 import sys
18 import os
18 import os
19 import time
19 import time
20 from _thread import interrupt_main # Py 3
20 from _thread import interrupt_main # Py 3
21 import threading
21 import threading
22 from unittest import SkipTest
22 from unittest import SkipTest
23
23
24 import nose.tools as nt
24 import nose.tools as nt
25
25
26 from IPython.utils.process import (find_cmd, FindCmdError, arg_split,
26 from IPython.utils.process import (find_cmd, FindCmdError, arg_split,
27 system, getoutput, getoutputerror,
27 system, getoutput, getoutputerror,
28 get_output_error_code)
28 get_output_error_code)
29 from IPython.utils.capture import capture_output
29 from IPython.testing import decorators as dec
30 from IPython.testing import decorators as dec
30 from IPython.testing import tools as tt
31 from IPython.testing import tools as tt
31
32
32 python = os.path.basename(sys.executable)
33 python = os.path.basename(sys.executable)
33
34
34 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
35 # Tests
36 # Tests
36 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
37
38
38
39
39 @dec.skip_win32
40 @dec.skip_win32
40 def test_find_cmd_ls():
41 def test_find_cmd_ls():
41 """Make sure we can find the full path to ls."""
42 """Make sure we can find the full path to ls."""
42 path = find_cmd('ls')
43 path = find_cmd('ls')
43 nt.assert_true(path.endswith('ls'))
44 nt.assert_true(path.endswith('ls'))
44
45
45
46
46 def has_pywin32():
47 def has_pywin32():
47 try:
48 try:
48 import win32api
49 import win32api
49 except ImportError:
50 except ImportError:
50 return False
51 return False
51 return True
52 return True
52
53
53
54
54 @dec.onlyif(has_pywin32, "This test requires win32api to run")
55 @dec.onlyif(has_pywin32, "This test requires win32api to run")
55 def test_find_cmd_pythonw():
56 def test_find_cmd_pythonw():
56 """Try to find pythonw on Windows."""
57 """Try to find pythonw on Windows."""
57 path = find_cmd('pythonw')
58 path = find_cmd('pythonw')
58 assert path.lower().endswith('pythonw.exe'), path
59 assert path.lower().endswith('pythonw.exe'), path
59
60
60
61
61 @dec.onlyif(lambda : sys.platform != 'win32' or has_pywin32(),
62 @dec.onlyif(lambda : sys.platform != 'win32' or has_pywin32(),
62 "This test runs on posix or in win32 with win32api installed")
63 "This test runs on posix or in win32 with win32api installed")
63 def test_find_cmd_fail():
64 def test_find_cmd_fail():
64 """Make sure that FindCmdError is raised if we can't find the cmd."""
65 """Make sure that FindCmdError is raised if we can't find the cmd."""
65 nt.assert_raises(FindCmdError,find_cmd,'asdfasdf')
66 nt.assert_raises(FindCmdError,find_cmd,'asdfasdf')
66
67
67
68
68 @dec.skip_win32
69 @dec.skip_win32
69 def test_arg_split():
70 def test_arg_split():
70 """Ensure that argument lines are correctly split like in a shell."""
71 """Ensure that argument lines are correctly split like in a shell."""
71 tests = [['hi', ['hi']],
72 tests = [['hi', ['hi']],
72 [u'hi', [u'hi']],
73 [u'hi', [u'hi']],
73 ['hello there', ['hello', 'there']],
74 ['hello there', ['hello', 'there']],
74 # \u01ce == \N{LATIN SMALL LETTER A WITH CARON}
75 # \u01ce == \N{LATIN SMALL LETTER A WITH CARON}
75 # Do not use \N because the tests crash with syntax error in
76 # Do not use \N because the tests crash with syntax error in
76 # some cases, for example windows python2.6.
77 # some cases, for example windows python2.6.
77 [u'h\u01cello', [u'h\u01cello']],
78 [u'h\u01cello', [u'h\u01cello']],
78 ['something "with quotes"', ['something', '"with quotes"']],
79 ['something "with quotes"', ['something', '"with quotes"']],
79 ]
80 ]
80 for argstr, argv in tests:
81 for argstr, argv in tests:
81 nt.assert_equal(arg_split(argstr), argv)
82 nt.assert_equal(arg_split(argstr), argv)
82
83
83 @dec.skip_if_not_win32
84 @dec.skip_if_not_win32
84 def test_arg_split_win32():
85 def test_arg_split_win32():
85 """Ensure that argument lines are correctly split like in a shell."""
86 """Ensure that argument lines are correctly split like in a shell."""
86 tests = [['hi', ['hi']],
87 tests = [['hi', ['hi']],
87 [u'hi', [u'hi']],
88 [u'hi', [u'hi']],
88 ['hello there', ['hello', 'there']],
89 ['hello there', ['hello', 'there']],
89 [u'h\u01cello', [u'h\u01cello']],
90 [u'h\u01cello', [u'h\u01cello']],
90 ['something "with quotes"', ['something', 'with quotes']],
91 ['something "with quotes"', ['something', 'with quotes']],
91 ]
92 ]
92 for argstr, argv in tests:
93 for argstr, argv in tests:
93 nt.assert_equal(arg_split(argstr), argv)
94 nt.assert_equal(arg_split(argstr), argv)
94
95
95
96
96 class SubProcessTestCase(tt.TempFileMixin):
97 class SubProcessTestCase(tt.TempFileMixin):
97 def setUp(self):
98 def setUp(self):
98 """Make a valid python temp file."""
99 """Make a valid python temp file."""
99 lines = [ "import sys",
100 lines = [ "import sys",
100 "print('on stdout', end='', file=sys.stdout)",
101 "print('on stdout', end='', file=sys.stdout)",
101 "print('on stderr', end='', file=sys.stderr)",
102 "print('on stderr', end='', file=sys.stderr)",
102 "sys.stdout.flush()",
103 "sys.stdout.flush()",
103 "sys.stderr.flush()"]
104 "sys.stderr.flush()"]
104 self.mktmp('\n'.join(lines))
105 self.mktmp('\n'.join(lines))
105
106
106 def test_system(self):
107 def test_system(self):
107 status = system('%s "%s"' % (python, self.fname))
108 status = system('%s "%s"' % (python, self.fname))
108 self.assertEqual(status, 0)
109 self.assertEqual(status, 0)
109
110
110 def test_system_quotes(self):
111 def test_system_quotes(self):
111 status = system('%s -c "import sys"' % python)
112 status = system('%s -c "import sys"' % python)
112 self.assertEqual(status, 0)
113 self.assertEqual(status, 0)
113
114
114 def assert_interrupts(self, command):
115 def assert_interrupts(self, command):
115 """
116 """
116 Interrupt a subprocess after a second.
117 Interrupt a subprocess after a second.
117 """
118 """
118 if threading.main_thread() != threading.current_thread():
119 if threading.main_thread() != threading.current_thread():
119 raise nt.SkipTest("Can't run this test if not in main thread.")
120 raise nt.SkipTest("Can't run this test if not in main thread.")
120
121
121 def interrupt():
122 def interrupt():
122 # Wait for subprocess to start:
123 # Wait for subprocess to start:
123 time.sleep(0.5)
124 time.sleep(0.5)
124 interrupt_main()
125 interrupt_main()
125
126
126 threading.Thread(target=interrupt).start()
127 threading.Thread(target=interrupt).start()
127 start = time.time()
128 start = time.time()
128 try:
129 try:
129 result = command()
130 result = command()
130 except KeyboardInterrupt:
131 except KeyboardInterrupt:
131 # Success!
132 # Success!
132 return
133 return
133 end = time.time()
134 end = time.time()
134 self.assertTrue(
135 self.assertTrue(
135 end - start < 2, "Process didn't die quickly: %s" % (end - start)
136 end - start < 2, "Process didn't die quickly: %s" % (end - start)
136 )
137 )
137 return result
138 return result
138
139
139 def test_system_interrupt(self):
140 def test_system_interrupt(self):
140 """
141 """
141 When interrupted in the way ipykernel interrupts IPython, the
142 When interrupted in the way ipykernel interrupts IPython, the
142 subprocess is interrupted.
143 subprocess is interrupted.
143 """
144 """
144 def command():
145 def command():
145 return system('%s -c "import time; time.sleep(5)"' % python)
146 return system('%s -c "import time; time.sleep(5)"' % python)
146
147
147 status = self.assert_interrupts(command)
148 status = self.assert_interrupts(command)
148 self.assertNotEqual(
149 self.assertNotEqual(
149 status, 0, "The process wasn't interrupted. Status: %s" % (status,)
150 status, 0, "The process wasn't interrupted. Status: %s" % (status,)
150 )
151 )
151
152
153 def test_stderr_while_stdout_open(self):
154 """
155 If lots of data is written to stderr while stdout is still open, enough
156 data to fill the pipe buffer in fact, the process still exits (i.e.
157 there is no deadlock).
158 """
159 with capture_output(display=False):
160 system(("%s -c 'import sys\nfor i in range(2000): " +
161 "sys.stderr.write(\" \" * 100 + \"\\n\")'") % (python,))
162
152 def test_getoutput(self):
163 def test_getoutput(self):
153 out = getoutput('%s "%s"' % (python, self.fname))
164 out = getoutput('%s "%s"' % (python, self.fname))
154 # we can't rely on the order the line buffered streams are flushed
165 # we can't rely on the order the line buffered streams are flushed
155 try:
166 try:
156 self.assertEqual(out, 'on stderron stdout')
167 self.assertEqual(out, 'on stderron stdout')
157 except AssertionError:
168 except AssertionError:
158 self.assertEqual(out, 'on stdouton stderr')
169 self.assertEqual(out, 'on stdouton stderr')
159
170
160 def test_getoutput_interrupt(self):
171 def test_getoutput_interrupt(self):
161 """
172 """
162 When interrupted in the way ipykernel interrupts IPython, the
173 When interrupted in the way ipykernel interrupts IPython, the
163 subprocess is interrupted.
174 subprocess is interrupted.
164 """
175 """
165 raise SkipTest("This fails on POSIX too, revisit in future.")
176 raise SkipTest("This fails on POSIX too, revisit in future.")
166 def command():
177 def command():
167 return getoutput('%s -c "import time; time.sleep(5)"' % (python, ))
178 return getoutput('%s -c "import time; time.sleep(5)"' % (python, ))
168
179
169 self.assert_interrupts(command)
180 self.assert_interrupts(command)
170
181
171 def test_getoutput_quoted(self):
182 def test_getoutput_quoted(self):
172 out = getoutput('%s -c "print (1)"' % python)
183 out = getoutput('%s -c "print (1)"' % python)
173 self.assertEqual(out.strip(), '1')
184 self.assertEqual(out.strip(), '1')
174
185
175 #Invalid quoting on windows
186 #Invalid quoting on windows
176 @dec.skip_win32
187 @dec.skip_win32
177 def test_getoutput_quoted2(self):
188 def test_getoutput_quoted2(self):
178 out = getoutput("%s -c 'print (1)'" % python)
189 out = getoutput("%s -c 'print (1)'" % python)
179 self.assertEqual(out.strip(), '1')
190 self.assertEqual(out.strip(), '1')
180 out = getoutput("%s -c 'print (\"1\")'" % python)
191 out = getoutput("%s -c 'print (\"1\")'" % python)
181 self.assertEqual(out.strip(), '1')
192 self.assertEqual(out.strip(), '1')
182
193
183 def test_getoutput_error(self):
194 def test_getoutput_error(self):
184 out, err = getoutputerror('%s "%s"' % (python, self.fname))
195 out, err = getoutputerror('%s "%s"' % (python, self.fname))
185 self.assertEqual(out, 'on stdout')
196 self.assertEqual(out, 'on stdout')
186 self.assertEqual(err, 'on stderr')
197 self.assertEqual(err, 'on stderr')
187
198
188 def test_get_output_error_code(self):
199 def test_get_output_error_code(self):
189 quiet_exit = '%s -c "import sys; sys.exit(1)"' % python
200 quiet_exit = '%s -c "import sys; sys.exit(1)"' % python
190 out, err, code = get_output_error_code(quiet_exit)
201 out, err, code = get_output_error_code(quiet_exit)
191 self.assertEqual(out, '')
202 self.assertEqual(out, '')
192 self.assertEqual(err, '')
203 self.assertEqual(err, '')
193 self.assertEqual(code, 1)
204 self.assertEqual(code, 1)
194 out, err, code = get_output_error_code('%s "%s"' % (python, self.fname))
205 out, err, code = get_output_error_code('%s "%s"' % (python, self.fname))
195 self.assertEqual(out, 'on stdout')
206 self.assertEqual(out, 'on stdout')
196 self.assertEqual(err, 'on stderr')
207 self.assertEqual(err, 'on stderr')
197 self.assertEqual(code, 0)
208 self.assertEqual(code, 0)
198
209
199
210
General Comments 0
You need to be logged in to leave comments. Login now