##// END OF EJS Templates
procutil: avoid use of deprecated tempfile.mktemp()...
Manuel Jacob -
r45664:eb26a9cf default
parent child Browse files
Show More
@@ -1,277 +1,271 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
4 4 """
5 5 from __future__ import absolute_import
6 6
7 7 import contextlib
8 8 import errno
9 9 import os
10 10 import signal
11 11 import subprocess
12 12 import sys
13 13 import tempfile
14 14 import unittest
15 15
16 16 from mercurial import pycompat
17 17
18 18
19 19 TEST_BUFFERING_CHILD_SCRIPT = r'''
20 20 import os
21 21
22 22 from mercurial import dispatch
23 23 from mercurial.utils import procutil
24 24
25 25 dispatch.initstdio()
26 26 procutil.{stream}.write(b'aaa')
27 27 os.write(procutil.{stream}.fileno(), b'[written aaa]')
28 28 procutil.{stream}.write(b'bbb\n')
29 29 os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
30 30 '''
31 31 UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
32 32 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
33 33 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
34 34
35 35
36 36 TEST_LARGE_WRITE_CHILD_SCRIPT = r'''
37 import os
37 38 import signal
38 39 import sys
39 40
40 41 from mercurial import dispatch
41 42 from mercurial.utils import procutil
42 43
43 44 signal.signal(signal.SIGINT, lambda *x: None)
44 45 dispatch.initstdio()
45 46 write_result = procutil.{stream}.write(b'x' * 1048576)
46 with open({write_result_fn}, 'w') as write_result_f:
47 with os.fdopen(
48 os.open({write_result_fn!r}, os.O_WRONLY | getattr(os, 'O_TEMPORARY', 0)),
49 'w',
50 ) as write_result_f:
47 51 write_result_f.write(str(write_result))
48 52 '''
49 53
50 54
51 55 @contextlib.contextmanager
52 56 def _closing(fds):
53 57 try:
54 58 yield
55 59 finally:
56 60 for fd in fds:
57 61 try:
58 62 os.close(fd)
59 63 except EnvironmentError:
60 64 pass
61 65
62 66
63 67 @contextlib.contextmanager
64 68 def _devnull():
65 69 devnull = os.open(os.devnull, os.O_WRONLY)
66 70 with _closing([devnull]):
67 71 yield (None, devnull)
68 72
69 73
70 74 @contextlib.contextmanager
71 75 def _pipes():
72 76 rwpair = os.pipe()
73 77 with _closing(rwpair):
74 78 yield rwpair
75 79
76 80
77 81 @contextlib.contextmanager
78 82 def _ptys():
79 83 if pycompat.iswindows:
80 84 raise unittest.SkipTest("PTYs are not supported on Windows")
81 85 import pty
82 86 import tty
83 87
84 88 rwpair = pty.openpty()
85 89 with _closing(rwpair):
86 90 tty.setraw(rwpair[0])
87 91 yield rwpair
88 92
89 93
90 94 def _readall(fd, buffer_size, initial_buf=None):
91 95 buf = initial_buf or []
92 96 while True:
93 97 try:
94 98 s = os.read(fd, buffer_size)
95 99 except OSError as e:
96 100 if e.errno == errno.EIO:
97 101 # If the child-facing PTY got closed, reading from the
98 102 # parent-facing PTY raises EIO.
99 103 break
100 104 raise
101 105 if not s:
102 106 break
103 107 buf.append(s)
104 108 return b''.join(buf)
105 109
106 110
107 111 class TestStdio(unittest.TestCase):
108 112 def _test(
109 113 self,
110 114 child_script,
111 115 stream,
112 116 rwpair_generator,
113 117 check_output,
114 118 python_args=[],
115 119 post_child_check=None,
116 120 ):
117 121 assert stream in ('stdout', 'stderr')
118 122 with rwpair_generator() as (stream_receiver, child_stream), open(
119 123 os.devnull, 'rb'
120 124 ) as child_stdin:
121 125 proc = subprocess.Popen(
122 126 [sys.executable] + python_args + ['-c', child_script],
123 127 stdin=child_stdin,
124 128 stdout=child_stream if stream == 'stdout' else None,
125 129 stderr=child_stream if stream == 'stderr' else None,
126 130 )
127 131 try:
128 132 os.close(child_stream)
129 133 if stream_receiver is not None:
130 134 check_output(stream_receiver, proc)
131 135 except: # re-raises
132 136 proc.terminate()
133 137 raise
134 138 finally:
135 139 retcode = proc.wait()
136 140 self.assertEqual(retcode, 0)
137 141 if post_child_check is not None:
138 142 post_child_check()
139 143
140 144 def _test_buffering(
141 145 self, stream, rwpair_generator, expected_output, python_args=[]
142 146 ):
143 147 def check_output(stream_receiver, proc):
144 148 self.assertEqual(_readall(stream_receiver, 1024), expected_output)
145 149
146 150 self._test(
147 151 TEST_BUFFERING_CHILD_SCRIPT.format(stream=stream),
148 152 stream,
149 153 rwpair_generator,
150 154 check_output,
151 155 python_args,
152 156 )
153 157
154 158 def test_buffering_stdout_devnull(self):
155 159 self._test_buffering('stdout', _devnull, None)
156 160
157 161 def test_buffering_stdout_pipes(self):
158 162 self._test_buffering('stdout', _pipes, FULLY_BUFFERED)
159 163
160 164 def test_buffering_stdout_ptys(self):
161 165 self._test_buffering('stdout', _ptys, LINE_BUFFERED)
162 166
163 167 def test_buffering_stdout_devnull_unbuffered(self):
164 168 self._test_buffering('stdout', _devnull, None, python_args=['-u'])
165 169
166 170 def test_buffering_stdout_pipes_unbuffered(self):
167 171 self._test_buffering('stdout', _pipes, UNBUFFERED, python_args=['-u'])
168 172
169 173 def test_buffering_stdout_ptys_unbuffered(self):
170 174 self._test_buffering('stdout', _ptys, UNBUFFERED, python_args=['-u'])
171 175
172 176 if not pycompat.ispy3 and not pycompat.iswindows:
173 177 # On Python 2 on non-Windows, we manually open stdout in line-buffered
174 178 # mode if connected to a TTY. We should check if Python was configured
175 179 # to use unbuffered stdout, but it's hard to do that.
176 180 test_buffering_stdout_ptys_unbuffered = unittest.expectedFailure(
177 181 test_buffering_stdout_ptys_unbuffered
178 182 )
179 183
180 184 def _test_large_write(self, stream, rwpair_generator, python_args=[]):
181 185 if not pycompat.ispy3 and pycompat.isdarwin:
182 186 # Python 2 doesn't always retry on EINTR, but the libc might retry.
183 187 # So far, it was observed only on macOS that EINTR is raised at the
184 188 # Python level. As Python 2 support will be dropped soon-ish, we
185 189 # won't attempt to fix it.
186 190 raise unittest.SkipTest("raises EINTR on macOS")
187 191
188 192 def check_output(stream_receiver, proc):
189 193 if not pycompat.iswindows:
190 194 # On Unix, we can provoke a partial write() by interrupting it
191 195 # by a signal handler as soon as a bit of data was written.
192 196 # We test that write() is called until all data is written.
193 197 buf = [os.read(stream_receiver, 1)]
194 198 proc.send_signal(signal.SIGINT)
195 199 else:
196 200 # On Windows, there doesn't seem to be a way to cause partial
197 201 # writes.
198 202 buf = []
199 203 self.assertEqual(
200 204 _readall(stream_receiver, 131072, buf), b'x' * 1048576
201 205 )
202 206
203 207 def post_child_check():
204 with open(write_result_fn, 'r') as write_result_f:
205 write_result_str = write_result_f.read()
208 write_result_str = write_result_f.read()
206 209 if pycompat.ispy3:
207 210 # On Python 3, we test that the correct number of bytes is
208 211 # claimed to have been written.
209 212 expected_write_result_str = '1048576'
210 213 else:
211 214 # On Python 2, we only check that the large write does not
212 215 # crash.
213 216 expected_write_result_str = 'None'
214 217 self.assertEqual(write_result_str, expected_write_result_str)
215 218
216 try:
217 # tempfile.mktemp() is unsafe in general, as a malicious process
218 # could create the file before we do. But in tests, we're running
219 # in a controlled environment.
220 write_result_fn = tempfile.mktemp()
219 with tempfile.NamedTemporaryFile('r') as write_result_f:
221 220 self._test(
222 221 TEST_LARGE_WRITE_CHILD_SCRIPT.format(
223 stream=stream, write_result_fn=repr(write_result_fn)
222 stream=stream, write_result_fn=write_result_f.name
224 223 ),
225 224 stream,
226 225 rwpair_generator,
227 226 check_output,
228 227 python_args,
229 228 post_child_check=post_child_check,
230 229 )
231 finally:
232 try:
233 os.unlink(write_result_fn)
234 except OSError:
235 pass
236 230
237 231 def test_large_write_stdout_devnull(self):
238 232 self._test_large_write('stdout', _devnull)
239 233
240 234 def test_large_write_stdout_pipes(self):
241 235 self._test_large_write('stdout', _pipes)
242 236
243 237 def test_large_write_stdout_ptys(self):
244 238 self._test_large_write('stdout', _ptys)
245 239
246 240 def test_large_write_stdout_devnull_unbuffered(self):
247 241 self._test_large_write('stdout', _devnull, python_args=['-u'])
248 242
249 243 def test_large_write_stdout_pipes_unbuffered(self):
250 244 self._test_large_write('stdout', _pipes, python_args=['-u'])
251 245
252 246 def test_large_write_stdout_ptys_unbuffered(self):
253 247 self._test_large_write('stdout', _ptys, python_args=['-u'])
254 248
255 249 def test_large_write_stderr_devnull(self):
256 250 self._test_large_write('stderr', _devnull)
257 251
258 252 def test_large_write_stderr_pipes(self):
259 253 self._test_large_write('stderr', _pipes)
260 254
261 255 def test_large_write_stderr_ptys(self):
262 256 self._test_large_write('stderr', _ptys)
263 257
264 258 def test_large_write_stderr_devnull_unbuffered(self):
265 259 self._test_large_write('stderr', _devnull, python_args=['-u'])
266 260
267 261 def test_large_write_stderr_pipes_unbuffered(self):
268 262 self._test_large_write('stderr', _pipes, python_args=['-u'])
269 263
270 264 def test_large_write_stderr_ptys_unbuffered(self):
271 265 self._test_large_write('stderr', _ptys, python_args=['-u'])
272 266
273 267
274 268 if __name__ == '__main__':
275 269 import silenttestrunner
276 270
277 271 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now