##// END OF EJS Templates
Increase write size.
Itamar Turner-Trauring -
Show More
@@ -1,210 +1,210 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Tests for platutils.py
3 Tests for platutils.py
4 """
4 """
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2011 The IPython Development Team
7 # Copyright (C) 2008-2011 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import sys
17 import sys
18 import os
18 import os
19 import time
19 import time
20 from _thread import interrupt_main # Py 3
20 from _thread import interrupt_main # Py 3
21 import threading
21 import threading
22 from unittest import SkipTest
22 from unittest import SkipTest
23
23
24 import nose.tools as nt
24 import nose.tools as nt
25
25
26 from IPython.utils.process import (find_cmd, FindCmdError, arg_split,
26 from IPython.utils.process import (find_cmd, FindCmdError, arg_split,
27 system, getoutput, getoutputerror,
27 system, getoutput, getoutputerror,
28 get_output_error_code)
28 get_output_error_code)
29 from IPython.utils.capture import capture_output
29 from IPython.utils.capture import capture_output
30 from IPython.testing import decorators as dec
30 from IPython.testing import decorators as dec
31 from IPython.testing import tools as tt
31 from IPython.testing import tools as tt
32
32
33 python = os.path.basename(sys.executable)
33 python = os.path.basename(sys.executable)
34
34
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36 # Tests
36 # Tests
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38
38
39
39
40 @dec.skip_win32
40 @dec.skip_win32
41 def test_find_cmd_ls():
41 def test_find_cmd_ls():
42 """Make sure we can find the full path to ls."""
42 """Make sure we can find the full path to ls."""
43 path = find_cmd('ls')
43 path = find_cmd('ls')
44 nt.assert_true(path.endswith('ls'))
44 nt.assert_true(path.endswith('ls'))
45
45
46
46
47 def has_pywin32():
47 def has_pywin32():
48 try:
48 try:
49 import win32api
49 import win32api
50 except ImportError:
50 except ImportError:
51 return False
51 return False
52 return True
52 return True
53
53
54
54
55 @dec.onlyif(has_pywin32, "This test requires win32api to run")
55 @dec.onlyif(has_pywin32, "This test requires win32api to run")
56 def test_find_cmd_pythonw():
56 def test_find_cmd_pythonw():
57 """Try to find pythonw on Windows."""
57 """Try to find pythonw on Windows."""
58 path = find_cmd('pythonw')
58 path = find_cmd('pythonw')
59 assert path.lower().endswith('pythonw.exe'), path
59 assert path.lower().endswith('pythonw.exe'), path
60
60
61
61
62 @dec.onlyif(lambda : sys.platform != 'win32' or has_pywin32(),
62 @dec.onlyif(lambda : sys.platform != 'win32' or has_pywin32(),
63 "This test runs on posix or in win32 with win32api installed")
63 "This test runs on posix or in win32 with win32api installed")
64 def test_find_cmd_fail():
64 def test_find_cmd_fail():
65 """Make sure that FindCmdError is raised if we can't find the cmd."""
65 """Make sure that FindCmdError is raised if we can't find the cmd."""
66 nt.assert_raises(FindCmdError,find_cmd,'asdfasdf')
66 nt.assert_raises(FindCmdError,find_cmd,'asdfasdf')
67
67
68
68
69 @dec.skip_win32
69 @dec.skip_win32
70 def test_arg_split():
70 def test_arg_split():
71 """Ensure that argument lines are correctly split like in a shell."""
71 """Ensure that argument lines are correctly split like in a shell."""
72 tests = [['hi', ['hi']],
72 tests = [['hi', ['hi']],
73 [u'hi', [u'hi']],
73 [u'hi', [u'hi']],
74 ['hello there', ['hello', 'there']],
74 ['hello there', ['hello', 'there']],
75 # \u01ce == \N{LATIN SMALL LETTER A WITH CARON}
75 # \u01ce == \N{LATIN SMALL LETTER A WITH CARON}
76 # Do not use \N because the tests crash with syntax error in
76 # Do not use \N because the tests crash with syntax error in
77 # some cases, for example windows python2.6.
77 # some cases, for example windows python2.6.
78 [u'h\u01cello', [u'h\u01cello']],
78 [u'h\u01cello', [u'h\u01cello']],
79 ['something "with quotes"', ['something', '"with quotes"']],
79 ['something "with quotes"', ['something', '"with quotes"']],
80 ]
80 ]
81 for argstr, argv in tests:
81 for argstr, argv in tests:
82 nt.assert_equal(arg_split(argstr), argv)
82 nt.assert_equal(arg_split(argstr), argv)
83
83
84 @dec.skip_if_not_win32
84 @dec.skip_if_not_win32
85 def test_arg_split_win32():
85 def test_arg_split_win32():
86 """Ensure that argument lines are correctly split like in a shell."""
86 """Ensure that argument lines are correctly split like in a shell."""
87 tests = [['hi', ['hi']],
87 tests = [['hi', ['hi']],
88 [u'hi', [u'hi']],
88 [u'hi', [u'hi']],
89 ['hello there', ['hello', 'there']],
89 ['hello there', ['hello', 'there']],
90 [u'h\u01cello', [u'h\u01cello']],
90 [u'h\u01cello', [u'h\u01cello']],
91 ['something "with quotes"', ['something', 'with quotes']],
91 ['something "with quotes"', ['something', 'with quotes']],
92 ]
92 ]
93 for argstr, argv in tests:
93 for argstr, argv in tests:
94 nt.assert_equal(arg_split(argstr), argv)
94 nt.assert_equal(arg_split(argstr), argv)
95
95
96
96
97 class SubProcessTestCase(tt.TempFileMixin):
97 class SubProcessTestCase(tt.TempFileMixin):
98 def setUp(self):
98 def setUp(self):
99 """Make a valid python temp file."""
99 """Make a valid python temp file."""
100 lines = [ "import sys",
100 lines = [ "import sys",
101 "print('on stdout', end='', file=sys.stdout)",
101 "print('on stdout', end='', file=sys.stdout)",
102 "print('on stderr', end='', file=sys.stderr)",
102 "print('on stderr', end='', file=sys.stderr)",
103 "sys.stdout.flush()",
103 "sys.stdout.flush()",
104 "sys.stderr.flush()"]
104 "sys.stderr.flush()"]
105 self.mktmp('\n'.join(lines))
105 self.mktmp('\n'.join(lines))
106
106
107 def test_system(self):
107 def test_system(self):
108 status = system('%s "%s"' % (python, self.fname))
108 status = system('%s "%s"' % (python, self.fname))
109 self.assertEqual(status, 0)
109 self.assertEqual(status, 0)
110
110
111 def test_system_quotes(self):
111 def test_system_quotes(self):
112 status = system('%s -c "import sys"' % python)
112 status = system('%s -c "import sys"' % python)
113 self.assertEqual(status, 0)
113 self.assertEqual(status, 0)
114
114
115 def assert_interrupts(self, command):
115 def assert_interrupts(self, command):
116 """
116 """
117 Interrupt a subprocess after a second.
117 Interrupt a subprocess after a second.
118 """
118 """
119 if threading.main_thread() != threading.current_thread():
119 if threading.main_thread() != threading.current_thread():
120 raise nt.SkipTest("Can't run this test if not in main thread.")
120 raise nt.SkipTest("Can't run this test if not in main thread.")
121
121
122 def interrupt():
122 def interrupt():
123 # Wait for subprocess to start:
123 # Wait for subprocess to start:
124 time.sleep(0.5)
124 time.sleep(0.5)
125 interrupt_main()
125 interrupt_main()
126
126
127 threading.Thread(target=interrupt).start()
127 threading.Thread(target=interrupt).start()
128 start = time.time()
128 start = time.time()
129 try:
129 try:
130 result = command()
130 result = command()
131 except KeyboardInterrupt:
131 except KeyboardInterrupt:
132 # Success!
132 # Success!
133 return
133 return
134 end = time.time()
134 end = time.time()
135 self.assertTrue(
135 self.assertTrue(
136 end - start < 2, "Process didn't die quickly: %s" % (end - start)
136 end - start < 2, "Process didn't die quickly: %s" % (end - start)
137 )
137 )
138 return result
138 return result
139
139
140 def test_system_interrupt(self):
140 def test_system_interrupt(self):
141 """
141 """
142 When interrupted in the way ipykernel interrupts IPython, the
142 When interrupted in the way ipykernel interrupts IPython, the
143 subprocess is interrupted.
143 subprocess is interrupted.
144 """
144 """
145 def command():
145 def command():
146 return system('%s -c "import time; time.sleep(5)"' % python)
146 return system('%s -c "import time; time.sleep(5)"' % python)
147
147
148 status = self.assert_interrupts(command)
148 status = self.assert_interrupts(command)
149 self.assertNotEqual(
149 self.assertNotEqual(
150 status, 0, "The process wasn't interrupted. Status: %s" % (status,)
150 status, 0, "The process wasn't interrupted. Status: %s" % (status,)
151 )
151 )
152
152
153 def test_stderr_while_stdout_open(self):
153 def test_stderr_while_stdout_open(self):
154 """
154 """
155 If lots of data is written to stderr while stdout is still open, enough
155 If lots of data is written to stderr while stdout is still open, enough
156 data to fill the pipe buffer in fact, the process still exits (i.e.
156 data to fill the pipe buffer in fact, the process still exits (i.e.
157 there is no deadlock).
157 there is no deadlock).
158 """
158 """
159 with capture_output(display=False):
159 with capture_output(display=False):
160 system(("%s -c 'import sys\nfor i in range(2000): " +
160 system(("%s -c 'import sys\nfor i in range(20000): " +
161 "sys.stderr.write(\" \" * 100 + \"\\n\")'") % (python,))
161 "sys.stderr.write(\" \" * 100 + \"\\n\")'") % (python,))
162
162
163 def test_getoutput(self):
163 def test_getoutput(self):
164 out = getoutput('%s "%s"' % (python, self.fname))
164 out = getoutput('%s "%s"' % (python, self.fname))
165 # we can't rely on the order the line buffered streams are flushed
165 # we can't rely on the order the line buffered streams are flushed
166 try:
166 try:
167 self.assertEqual(out, 'on stderron stdout')
167 self.assertEqual(out, 'on stderron stdout')
168 except AssertionError:
168 except AssertionError:
169 self.assertEqual(out, 'on stdouton stderr')
169 self.assertEqual(out, 'on stdouton stderr')
170
170
171 def test_getoutput_interrupt(self):
171 def test_getoutput_interrupt(self):
172 """
172 """
173 When interrupted in the way ipykernel interrupts IPython, the
173 When interrupted in the way ipykernel interrupts IPython, the
174 subprocess is interrupted.
174 subprocess is interrupted.
175 """
175 """
176 raise SkipTest("This fails on POSIX too, revisit in future.")
176 raise SkipTest("This fails on POSIX too, revisit in future.")
177 def command():
177 def command():
178 return getoutput('%s -c "import time; time.sleep(5)"' % (python, ))
178 return getoutput('%s -c "import time; time.sleep(5)"' % (python, ))
179
179
180 self.assert_interrupts(command)
180 self.assert_interrupts(command)
181
181
182 def test_getoutput_quoted(self):
182 def test_getoutput_quoted(self):
183 out = getoutput('%s -c "print (1)"' % python)
183 out = getoutput('%s -c "print (1)"' % python)
184 self.assertEqual(out.strip(), '1')
184 self.assertEqual(out.strip(), '1')
185
185
186 #Invalid quoting on windows
186 #Invalid quoting on windows
187 @dec.skip_win32
187 @dec.skip_win32
188 def test_getoutput_quoted2(self):
188 def test_getoutput_quoted2(self):
189 out = getoutput("%s -c 'print (1)'" % python)
189 out = getoutput("%s -c 'print (1)'" % python)
190 self.assertEqual(out.strip(), '1')
190 self.assertEqual(out.strip(), '1')
191 out = getoutput("%s -c 'print (\"1\")'" % python)
191 out = getoutput("%s -c 'print (\"1\")'" % python)
192 self.assertEqual(out.strip(), '1')
192 self.assertEqual(out.strip(), '1')
193
193
194 def test_getoutput_error(self):
194 def test_getoutput_error(self):
195 out, err = getoutputerror('%s "%s"' % (python, self.fname))
195 out, err = getoutputerror('%s "%s"' % (python, self.fname))
196 self.assertEqual(out, 'on stdout')
196 self.assertEqual(out, 'on stdout')
197 self.assertEqual(err, 'on stderr')
197 self.assertEqual(err, 'on stderr')
198
198
199 def test_get_output_error_code(self):
199 def test_get_output_error_code(self):
200 quiet_exit = '%s -c "import sys; sys.exit(1)"' % python
200 quiet_exit = '%s -c "import sys; sys.exit(1)"' % python
201 out, err, code = get_output_error_code(quiet_exit)
201 out, err, code = get_output_error_code(quiet_exit)
202 self.assertEqual(out, '')
202 self.assertEqual(out, '')
203 self.assertEqual(err, '')
203 self.assertEqual(err, '')
204 self.assertEqual(code, 1)
204 self.assertEqual(code, 1)
205 out, err, code = get_output_error_code('%s "%s"' % (python, self.fname))
205 out, err, code = get_output_error_code('%s "%s"' % (python, self.fname))
206 self.assertEqual(out, 'on stdout')
206 self.assertEqual(out, 'on stdout')
207 self.assertEqual(err, 'on stderr')
207 self.assertEqual(err, 'on stderr')
208 self.assertEqual(code, 0)
208 self.assertEqual(code, 0)
209
209
210
210
General Comments 0
You need to be logged in to leave comments. Login now