##// END OF EJS Templates
tests: terminate subprocess in test-stdio.py in case of exception...
Manuel Jacob -
r45629:9172fd51 default
parent child Browse files
Show More
@@ -1,142 +1,145 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
3 Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
4 """
4 """
5 from __future__ import absolute_import
5 from __future__ import absolute_import
6
6
7 import contextlib
7 import contextlib
8 import errno
8 import errno
9 import os
9 import os
10 import subprocess
10 import subprocess
11 import sys
11 import sys
12 import unittest
12 import unittest
13
13
14 from mercurial import pycompat
14 from mercurial import pycompat
15
15
16
16
17 CHILD_PROCESS = r'''
17 CHILD_PROCESS = r'''
18 import os
18 import os
19
19
20 from mercurial import dispatch
20 from mercurial import dispatch
21 from mercurial.utils import procutil
21 from mercurial.utils import procutil
22
22
23 dispatch.initstdio()
23 dispatch.initstdio()
24 procutil.{stream}.write(b'aaa')
24 procutil.{stream}.write(b'aaa')
25 os.write(procutil.{stream}.fileno(), b'[written aaa]')
25 os.write(procutil.{stream}.fileno(), b'[written aaa]')
26 procutil.{stream}.write(b'bbb\n')
26 procutil.{stream}.write(b'bbb\n')
27 os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
27 os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
28 '''
28 '''
29 UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
29 UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
30 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
30 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
31 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
31 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
32
32
33
33
34 @contextlib.contextmanager
34 @contextlib.contextmanager
35 def _closing(fds):
35 def _closing(fds):
36 try:
36 try:
37 yield
37 yield
38 finally:
38 finally:
39 for fd in fds:
39 for fd in fds:
40 try:
40 try:
41 os.close(fd)
41 os.close(fd)
42 except EnvironmentError:
42 except EnvironmentError:
43 pass
43 pass
44
44
45
45
46 @contextlib.contextmanager
46 @contextlib.contextmanager
47 def _pipes():
47 def _pipes():
48 rwpair = os.pipe()
48 rwpair = os.pipe()
49 with _closing(rwpair):
49 with _closing(rwpair):
50 yield rwpair
50 yield rwpair
51
51
52
52
53 @contextlib.contextmanager
53 @contextlib.contextmanager
54 def _ptys():
54 def _ptys():
55 if pycompat.iswindows:
55 if pycompat.iswindows:
56 raise unittest.SkipTest("PTYs are not supported on Windows")
56 raise unittest.SkipTest("PTYs are not supported on Windows")
57 import pty
57 import pty
58 import tty
58 import tty
59
59
60 rwpair = pty.openpty()
60 rwpair = pty.openpty()
61 with _closing(rwpair):
61 with _closing(rwpair):
62 tty.setraw(rwpair[0])
62 tty.setraw(rwpair[0])
63 yield rwpair
63 yield rwpair
64
64
65
65
66 def _readall(fd, buffer_size):
66 def _readall(fd, buffer_size):
67 buf = []
67 buf = []
68 while True:
68 while True:
69 try:
69 try:
70 s = os.read(fd, buffer_size)
70 s = os.read(fd, buffer_size)
71 except OSError as e:
71 except OSError as e:
72 if e.errno == errno.EIO:
72 if e.errno == errno.EIO:
73 # If the child-facing PTY got closed, reading from the
73 # If the child-facing PTY got closed, reading from the
74 # parent-facing PTY raises EIO.
74 # parent-facing PTY raises EIO.
75 break
75 break
76 raise
76 raise
77 if not s:
77 if not s:
78 break
78 break
79 buf.append(s)
79 buf.append(s)
80 return b''.join(buf)
80 return b''.join(buf)
81
81
82
82
83 class TestStdio(unittest.TestCase):
83 class TestStdio(unittest.TestCase):
84 def _test(self, stream, rwpair_generator, expected_output, python_args=[]):
84 def _test(self, stream, rwpair_generator, expected_output, python_args=[]):
85 assert stream in ('stdout', 'stderr')
85 assert stream in ('stdout', 'stderr')
86 with rwpair_generator() as (stream_receiver, child_stream), open(
86 with rwpair_generator() as (stream_receiver, child_stream), open(
87 os.devnull, 'rb'
87 os.devnull, 'rb'
88 ) as child_stdin:
88 ) as child_stdin:
89 proc = subprocess.Popen(
89 proc = subprocess.Popen(
90 [sys.executable]
90 [sys.executable]
91 + python_args
91 + python_args
92 + ['-c', CHILD_PROCESS.format(stream=stream)],
92 + ['-c', CHILD_PROCESS.format(stream=stream)],
93 stdin=child_stdin,
93 stdin=child_stdin,
94 stdout=child_stream if stream == 'stdout' else None,
94 stdout=child_stream if stream == 'stdout' else None,
95 stderr=child_stream if stream == 'stderr' else None,
95 stderr=child_stream if stream == 'stderr' else None,
96 )
96 )
97 try:
97 try:
98 os.close(child_stream)
98 os.close(child_stream)
99 self.assertEqual(
99 self.assertEqual(
100 _readall(stream_receiver, 1024), expected_output
100 _readall(stream_receiver, 1024), expected_output
101 )
101 )
102 except: # re-raises
103 proc.terminate()
104 raise
102 finally:
105 finally:
103 retcode = proc.wait()
106 retcode = proc.wait()
104 self.assertEqual(retcode, 0)
107 self.assertEqual(retcode, 0)
105
108
106 def test_stdout_pipes(self):
109 def test_stdout_pipes(self):
107 self._test('stdout', _pipes, FULLY_BUFFERED)
110 self._test('stdout', _pipes, FULLY_BUFFERED)
108
111
109 def test_stdout_ptys(self):
112 def test_stdout_ptys(self):
110 self._test('stdout', _ptys, LINE_BUFFERED)
113 self._test('stdout', _ptys, LINE_BUFFERED)
111
114
112 def test_stdout_pipes_unbuffered(self):
115 def test_stdout_pipes_unbuffered(self):
113 self._test('stdout', _pipes, UNBUFFERED, python_args=['-u'])
116 self._test('stdout', _pipes, UNBUFFERED, python_args=['-u'])
114
117
115 def test_stdout_ptys_unbuffered(self):
118 def test_stdout_ptys_unbuffered(self):
116 self._test('stdout', _ptys, UNBUFFERED, python_args=['-u'])
119 self._test('stdout', _ptys, UNBUFFERED, python_args=['-u'])
117
120
118 if not pycompat.ispy3 and not pycompat.iswindows:
121 if not pycompat.ispy3 and not pycompat.iswindows:
119 # On Python 2 on non-Windows, we manually open stdout in line-buffered
122 # On Python 2 on non-Windows, we manually open stdout in line-buffered
120 # mode if connected to a TTY. We should check if Python was configured
123 # mode if connected to a TTY. We should check if Python was configured
121 # to use unbuffered stdout, but it's hard to do that.
124 # to use unbuffered stdout, but it's hard to do that.
122 test_stdout_ptys_unbuffered = unittest.expectedFailure(
125 test_stdout_ptys_unbuffered = unittest.expectedFailure(
123 test_stdout_ptys_unbuffered
126 test_stdout_ptys_unbuffered
124 )
127 )
125
128
126 def test_stderr_pipes(self):
129 def test_stderr_pipes(self):
127 self._test('stderr', _pipes, UNBUFFERED)
130 self._test('stderr', _pipes, UNBUFFERED)
128
131
129 def test_stderr_ptys(self):
132 def test_stderr_ptys(self):
130 self._test('stderr', _ptys, UNBUFFERED)
133 self._test('stderr', _ptys, UNBUFFERED)
131
134
132 def test_stderr_pipes_unbuffered(self):
135 def test_stderr_pipes_unbuffered(self):
133 self._test('stderr', _pipes, UNBUFFERED, python_args=['-u'])
136 self._test('stderr', _pipes, UNBUFFERED, python_args=['-u'])
134
137
135 def test_stderr_ptys_unbuffered(self):
138 def test_stderr_ptys_unbuffered(self):
136 self._test('stderr', _ptys, UNBUFFERED, python_args=['-u'])
139 self._test('stderr', _ptys, UNBUFFERED, python_args=['-u'])
137
140
138
141
139 if __name__ == '__main__':
142 if __name__ == '__main__':
140 import silenttestrunner
143 import silenttestrunner
141
144
142 silenttestrunner.main(__name__)
145 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now