##// END OF EJS Templates
tests: make subprocess handling reusable for different tests in test-stdio.py
Manuel Jacob -
r45638:a59aab60 default
parent child Browse files
Show More
@@ -1,133 +1,150 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
3 Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
4 """
4 """
5 from __future__ import absolute_import
5 from __future__ import absolute_import
6
6
7 import contextlib
7 import contextlib
8 import errno
8 import errno
9 import os
9 import os
10 import subprocess
10 import subprocess
11 import sys
11 import sys
12 import unittest
12 import unittest
13
13
14 from mercurial import pycompat
14 from mercurial import pycompat
15
15
16
16
17 BUFFERING_CHILD_SCRIPT = r'''
17 TEST_BUFFERING_CHILD_SCRIPT = r'''
18 import os
18 import os
19
19
20 from mercurial import dispatch
20 from mercurial import dispatch
21 from mercurial.utils import procutil
21 from mercurial.utils import procutil
22
22
23 dispatch.initstdio()
23 dispatch.initstdio()
24 procutil.{stream}.write(b'aaa')
24 procutil.{stream}.write(b'aaa')
25 os.write(procutil.{stream}.fileno(), b'[written aaa]')
25 os.write(procutil.{stream}.fileno(), b'[written aaa]')
26 procutil.{stream}.write(b'bbb\n')
26 procutil.{stream}.write(b'bbb\n')
27 os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
27 os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
28 '''
28 '''
29 UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
29 UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
30 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
30 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
31 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
31 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
32
32
33
33
34 @contextlib.contextmanager
34 @contextlib.contextmanager
35 def _closing(fds):
35 def _closing(fds):
36 try:
36 try:
37 yield
37 yield
38 finally:
38 finally:
39 for fd in fds:
39 for fd in fds:
40 try:
40 try:
41 os.close(fd)
41 os.close(fd)
42 except EnvironmentError:
42 except EnvironmentError:
43 pass
43 pass
44
44
45
45
46 @contextlib.contextmanager
46 @contextlib.contextmanager
47 def _pipes():
47 def _pipes():
48 rwpair = os.pipe()
48 rwpair = os.pipe()
49 with _closing(rwpair):
49 with _closing(rwpair):
50 yield rwpair
50 yield rwpair
51
51
52
52
53 @contextlib.contextmanager
53 @contextlib.contextmanager
54 def _ptys():
54 def _ptys():
55 if pycompat.iswindows:
55 if pycompat.iswindows:
56 raise unittest.SkipTest("PTYs are not supported on Windows")
56 raise unittest.SkipTest("PTYs are not supported on Windows")
57 import pty
57 import pty
58 import tty
58 import tty
59
59
60 rwpair = pty.openpty()
60 rwpair = pty.openpty()
61 with _closing(rwpair):
61 with _closing(rwpair):
62 tty.setraw(rwpair[0])
62 tty.setraw(rwpair[0])
63 yield rwpair
63 yield rwpair
64
64
65
65
66 def _readall(fd, buffer_size):
66 def _readall(fd, buffer_size):
67 buf = []
67 buf = []
68 while True:
68 while True:
69 try:
69 try:
70 s = os.read(fd, buffer_size)
70 s = os.read(fd, buffer_size)
71 except OSError as e:
71 except OSError as e:
72 if e.errno == errno.EIO:
72 if e.errno == errno.EIO:
73 # If the child-facing PTY got closed, reading from the
73 # If the child-facing PTY got closed, reading from the
74 # parent-facing PTY raises EIO.
74 # parent-facing PTY raises EIO.
75 break
75 break
76 raise
76 raise
77 if not s:
77 if not s:
78 break
78 break
79 buf.append(s)
79 buf.append(s)
80 return b''.join(buf)
80 return b''.join(buf)
81
81
82
82
83 class TestStdio(unittest.TestCase):
83 class TestStdio(unittest.TestCase):
84 def _test(self, stream, rwpair_generator, expected_output, python_args=[]):
84 def _test(
85 self,
86 child_script,
87 stream,
88 rwpair_generator,
89 check_output,
90 python_args=[],
91 ):
85 assert stream in ('stdout', 'stderr')
92 assert stream in ('stdout', 'stderr')
86 with rwpair_generator() as (stream_receiver, child_stream), open(
93 with rwpair_generator() as (stream_receiver, child_stream), open(
87 os.devnull, 'rb'
94 os.devnull, 'rb'
88 ) as child_stdin:
95 ) as child_stdin:
89 proc = subprocess.Popen(
96 proc = subprocess.Popen(
90 [sys.executable]
97 [sys.executable] + python_args + ['-c', child_script],
91 + python_args
92 + ['-c', BUFFERING_CHILD_SCRIPT.format(stream=stream)],
93 stdin=child_stdin,
98 stdin=child_stdin,
94 stdout=child_stream if stream == 'stdout' else None,
99 stdout=child_stream if stream == 'stdout' else None,
95 stderr=child_stream if stream == 'stderr' else None,
100 stderr=child_stream if stream == 'stderr' else None,
96 )
101 )
97 try:
102 try:
98 os.close(child_stream)
103 os.close(child_stream)
99 self.assertEqual(
104 check_output(stream_receiver)
100 _readall(stream_receiver, 1024), expected_output
101 )
102 except: # re-raises
105 except: # re-raises
103 proc.terminate()
106 proc.terminate()
104 raise
107 raise
105 finally:
108 finally:
106 retcode = proc.wait()
109 retcode = proc.wait()
107 self.assertEqual(retcode, 0)
110 self.assertEqual(retcode, 0)
108
111
112 def _test_buffering(
113 self, stream, rwpair_generator, expected_output, python_args=[]
114 ):
115 def check_output(stream_receiver):
116 self.assertEqual(_readall(stream_receiver, 1024), expected_output)
117
118 self._test(
119 TEST_BUFFERING_CHILD_SCRIPT.format(stream=stream),
120 stream,
121 rwpair_generator,
122 check_output,
123 python_args,
124 )
125
109 def test_buffering_stdout_pipes(self):
126 def test_buffering_stdout_pipes(self):
110 self._test('stdout', _pipes, FULLY_BUFFERED)
127 self._test_buffering('stdout', _pipes, FULLY_BUFFERED)
111
128
112 def test_buffering_stdout_ptys(self):
129 def test_buffering_stdout_ptys(self):
113 self._test('stdout', _ptys, LINE_BUFFERED)
130 self._test_buffering('stdout', _ptys, LINE_BUFFERED)
114
131
115 def test_buffering_stdout_pipes_unbuffered(self):
132 def test_buffering_stdout_pipes_unbuffered(self):
116 self._test('stdout', _pipes, UNBUFFERED, python_args=['-u'])
133 self._test_buffering('stdout', _pipes, UNBUFFERED, python_args=['-u'])
117
134
118 def test_buffering_stdout_ptys_unbuffered(self):
135 def test_buffering_stdout_ptys_unbuffered(self):
119 self._test('stdout', _ptys, UNBUFFERED, python_args=['-u'])
136 self._test_buffering('stdout', _ptys, UNBUFFERED, python_args=['-u'])
120
137
121 if not pycompat.ispy3 and not pycompat.iswindows:
138 if not pycompat.ispy3 and not pycompat.iswindows:
122 # On Python 2 on non-Windows, we manually open stdout in line-buffered
139 # On Python 2 on non-Windows, we manually open stdout in line-buffered
123 # mode if connected to a TTY. We should check if Python was configured
140 # mode if connected to a TTY. We should check if Python was configured
124 # to use unbuffered stdout, but it's hard to do that.
141 # to use unbuffered stdout, but it's hard to do that.
125 test_buffering_stdout_ptys_unbuffered = unittest.expectedFailure(
142 test_buffering_stdout_ptys_unbuffered = unittest.expectedFailure(
126 test_buffering_stdout_ptys_unbuffered
143 test_buffering_stdout_ptys_unbuffered
127 )
144 )
128
145
129
146
130 if __name__ == '__main__':
147 if __name__ == '__main__':
131 import silenttestrunner
148 import silenttestrunner
132
149
133 silenttestrunner.main(__name__)
150 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now