##// END OF EJS Templates
tests: check that procutil.std{out,err}.write() returns correct result...
Manuel Jacob -
r45657:dff20839 default
parent child Browse files
Show More
@@ -1,245 +1,277 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
4 4 """
5 5 from __future__ import absolute_import
6 6
7 7 import contextlib
8 8 import errno
9 9 import os
10 10 import signal
11 11 import subprocess
12 12 import sys
13 import tempfile
13 14 import unittest
14 15
15 16 from mercurial import pycompat
16 17
17 18
18 19 TEST_BUFFERING_CHILD_SCRIPT = r'''
19 20 import os
20 21
21 22 from mercurial import dispatch
22 23 from mercurial.utils import procutil
23 24
24 25 dispatch.initstdio()
25 26 procutil.{stream}.write(b'aaa')
26 27 os.write(procutil.{stream}.fileno(), b'[written aaa]')
27 28 procutil.{stream}.write(b'bbb\n')
28 29 os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
29 30 '''
30 31 UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
31 32 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
32 33 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
33 34
34 35
35 36 TEST_LARGE_WRITE_CHILD_SCRIPT = r'''
36 37 import signal
37 38 import sys
38 39
39 40 from mercurial import dispatch
40 41 from mercurial.utils import procutil
41 42
42 43 signal.signal(signal.SIGINT, lambda *x: None)
43 44 dispatch.initstdio()
44 procutil.{stream}.write(b'x' * 1048576)
45 write_result = procutil.{stream}.write(b'x' * 1048576)
46 with open({write_result_fn}, 'w') as write_result_f:
47 write_result_f.write(str(write_result))
45 48 '''
46 49
47 50
48 51 @contextlib.contextmanager
49 52 def _closing(fds):
50 53 try:
51 54 yield
52 55 finally:
53 56 for fd in fds:
54 57 try:
55 58 os.close(fd)
56 59 except EnvironmentError:
57 60 pass
58 61
59 62
60 63 @contextlib.contextmanager
61 64 def _devnull():
62 65 devnull = os.open(os.devnull, os.O_WRONLY)
63 66 with _closing([devnull]):
64 67 yield (None, devnull)
65 68
66 69
67 70 @contextlib.contextmanager
68 71 def _pipes():
69 72 rwpair = os.pipe()
70 73 with _closing(rwpair):
71 74 yield rwpair
72 75
73 76
74 77 @contextlib.contextmanager
75 78 def _ptys():
76 79 if pycompat.iswindows:
77 80 raise unittest.SkipTest("PTYs are not supported on Windows")
78 81 import pty
79 82 import tty
80 83
81 84 rwpair = pty.openpty()
82 85 with _closing(rwpair):
83 86 tty.setraw(rwpair[0])
84 87 yield rwpair
85 88
86 89
87 90 def _readall(fd, buffer_size, initial_buf=None):
88 91 buf = initial_buf or []
89 92 while True:
90 93 try:
91 94 s = os.read(fd, buffer_size)
92 95 except OSError as e:
93 96 if e.errno == errno.EIO:
94 97 # If the child-facing PTY got closed, reading from the
95 98 # parent-facing PTY raises EIO.
96 99 break
97 100 raise
98 101 if not s:
99 102 break
100 103 buf.append(s)
101 104 return b''.join(buf)
102 105
103 106
104 107 class TestStdio(unittest.TestCase):
105 108 def _test(
106 109 self,
107 110 child_script,
108 111 stream,
109 112 rwpair_generator,
110 113 check_output,
111 114 python_args=[],
115 post_child_check=None,
112 116 ):
113 117 assert stream in ('stdout', 'stderr')
114 118 with rwpair_generator() as (stream_receiver, child_stream), open(
115 119 os.devnull, 'rb'
116 120 ) as child_stdin:
117 121 proc = subprocess.Popen(
118 122 [sys.executable] + python_args + ['-c', child_script],
119 123 stdin=child_stdin,
120 124 stdout=child_stream if stream == 'stdout' else None,
121 125 stderr=child_stream if stream == 'stderr' else None,
122 126 )
123 127 try:
124 128 os.close(child_stream)
125 129 if stream_receiver is not None:
126 130 check_output(stream_receiver, proc)
127 131 except: # re-raises
128 132 proc.terminate()
129 133 raise
130 134 finally:
131 135 retcode = proc.wait()
132 136 self.assertEqual(retcode, 0)
137 if post_child_check is not None:
138 post_child_check()
133 139
134 140 def _test_buffering(
135 141 self, stream, rwpair_generator, expected_output, python_args=[]
136 142 ):
137 143 def check_output(stream_receiver, proc):
138 144 self.assertEqual(_readall(stream_receiver, 1024), expected_output)
139 145
140 146 self._test(
141 147 TEST_BUFFERING_CHILD_SCRIPT.format(stream=stream),
142 148 stream,
143 149 rwpair_generator,
144 150 check_output,
145 151 python_args,
146 152 )
147 153
148 154 def test_buffering_stdout_devnull(self):
149 155 self._test_buffering('stdout', _devnull, None)
150 156
151 157 def test_buffering_stdout_pipes(self):
152 158 self._test_buffering('stdout', _pipes, FULLY_BUFFERED)
153 159
154 160 def test_buffering_stdout_ptys(self):
155 161 self._test_buffering('stdout', _ptys, LINE_BUFFERED)
156 162
157 163 def test_buffering_stdout_devnull_unbuffered(self):
158 164 self._test_buffering('stdout', _devnull, None, python_args=['-u'])
159 165
160 166 def test_buffering_stdout_pipes_unbuffered(self):
161 167 self._test_buffering('stdout', _pipes, UNBUFFERED, python_args=['-u'])
162 168
163 169 def test_buffering_stdout_ptys_unbuffered(self):
164 170 self._test_buffering('stdout', _ptys, UNBUFFERED, python_args=['-u'])
165 171
166 172 if not pycompat.ispy3 and not pycompat.iswindows:
167 173 # On Python 2 on non-Windows, we manually open stdout in line-buffered
168 174 # mode if connected to a TTY. We should check if Python was configured
169 175 # to use unbuffered stdout, but it's hard to do that.
170 176 test_buffering_stdout_ptys_unbuffered = unittest.expectedFailure(
171 177 test_buffering_stdout_ptys_unbuffered
172 178 )
173 179
174 180 def _test_large_write(self, stream, rwpair_generator, python_args=[]):
175 181 if not pycompat.ispy3 and pycompat.isdarwin:
176 182 # Python 2 doesn't always retry on EINTR, but the libc might retry.
177 183 # So far, it was observed only on macOS that EINTR is raised at the
178 184 # Python level. As Python 2 support will be dropped soon-ish, we
179 185 # won't attempt to fix it.
180 186 raise unittest.SkipTest("raises EINTR on macOS")
181 187
182 188 def check_output(stream_receiver, proc):
183 189 if not pycompat.iswindows:
184 190 # On Unix, we can provoke a partial write() by interrupting it
185 191 # by a signal handler as soon as a bit of data was written.
186 192 # We test that write() is called until all data is written.
187 193 buf = [os.read(stream_receiver, 1)]
188 194 proc.send_signal(signal.SIGINT)
189 195 else:
190 196 # On Windows, there doesn't seem to be a way to cause partial
191 197 # writes.
192 198 buf = []
193 199 self.assertEqual(
194 200 _readall(stream_receiver, 131072, buf), b'x' * 1048576
195 201 )
196 202
197 self._test(
198 TEST_LARGE_WRITE_CHILD_SCRIPT.format(stream=stream),
199 stream,
200 rwpair_generator,
201 check_output,
202 python_args,
203 )
203 def post_child_check():
204 with open(write_result_fn, 'r') as write_result_f:
205 write_result_str = write_result_f.read()
206 if pycompat.ispy3:
207 # On Python 3, we test that the correct number of bytes is
208 # claimed to have been written.
209 expected_write_result_str = '1048576'
210 else:
211 # On Python 2, we only check that the large write does not
212 # crash.
213 expected_write_result_str = 'None'
214 self.assertEqual(write_result_str, expected_write_result_str)
215
216 try:
217 # tempfile.mktemp() is unsafe in general, as a malicious process
218 # could create the file before we do. But in tests, we're running
219 # in a controlled environment.
220 write_result_fn = tempfile.mktemp()
221 self._test(
222 TEST_LARGE_WRITE_CHILD_SCRIPT.format(
223 stream=stream, write_result_fn=repr(write_result_fn)
224 ),
225 stream,
226 rwpair_generator,
227 check_output,
228 python_args,
229 post_child_check=post_child_check,
230 )
231 finally:
232 try:
233 os.unlink(write_result_fn)
234 except OSError:
235 pass
204 236
205 237 def test_large_write_stdout_devnull(self):
206 238 self._test_large_write('stdout', _devnull)
207 239
208 240 def test_large_write_stdout_pipes(self):
209 241 self._test_large_write('stdout', _pipes)
210 242
211 243 def test_large_write_stdout_ptys(self):
212 244 self._test_large_write('stdout', _ptys)
213 245
214 246 def test_large_write_stdout_devnull_unbuffered(self):
215 247 self._test_large_write('stdout', _devnull, python_args=['-u'])
216 248
217 249 def test_large_write_stdout_pipes_unbuffered(self):
218 250 self._test_large_write('stdout', _pipes, python_args=['-u'])
219 251
220 252 def test_large_write_stdout_ptys_unbuffered(self):
221 253 self._test_large_write('stdout', _ptys, python_args=['-u'])
222 254
223 255 def test_large_write_stderr_devnull(self):
224 256 self._test_large_write('stderr', _devnull)
225 257
226 258 def test_large_write_stderr_pipes(self):
227 259 self._test_large_write('stderr', _pipes)
228 260
229 261 def test_large_write_stderr_ptys(self):
230 262 self._test_large_write('stderr', _ptys)
231 263
232 264 def test_large_write_stderr_devnull_unbuffered(self):
233 265 self._test_large_write('stderr', _devnull, python_args=['-u'])
234 266
235 267 def test_large_write_stderr_pipes_unbuffered(self):
236 268 self._test_large_write('stderr', _pipes, python_args=['-u'])
237 269
238 270 def test_large_write_stderr_ptys_unbuffered(self):
239 271 self._test_large_write('stderr', _ptys, python_args=['-u'])
240 272
241 273
242 274 if __name__ == '__main__':
243 275 import silenttestrunner
244 276
245 277 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now