##// END OF EJS Templates
tests: make names in test-stdio.py more distinctive...
Manuel Jacob -
r45630:bc05c13e default
parent child Browse files
Show More
@@ -1,145 +1,145 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
3 Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
4 """
4 """
5 from __future__ import absolute_import
5 from __future__ import absolute_import
6
6
7 import contextlib
7 import contextlib
8 import errno
8 import errno
9 import os
9 import os
10 import subprocess
10 import subprocess
11 import sys
11 import sys
12 import unittest
12 import unittest
13
13
14 from mercurial import pycompat
14 from mercurial import pycompat
15
15
16
16
17 CHILD_PROCESS = r'''
17 BUFFERING_CHILD_SCRIPT = r'''
18 import os
18 import os
19
19
20 from mercurial import dispatch
20 from mercurial import dispatch
21 from mercurial.utils import procutil
21 from mercurial.utils import procutil
22
22
23 dispatch.initstdio()
23 dispatch.initstdio()
24 procutil.{stream}.write(b'aaa')
24 procutil.{stream}.write(b'aaa')
25 os.write(procutil.{stream}.fileno(), b'[written aaa]')
25 os.write(procutil.{stream}.fileno(), b'[written aaa]')
26 procutil.{stream}.write(b'bbb\n')
26 procutil.{stream}.write(b'bbb\n')
27 os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
27 os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
28 '''
28 '''
29 UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
29 UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
30 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
30 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
31 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
31 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
32
32
33
33
34 @contextlib.contextmanager
34 @contextlib.contextmanager
35 def _closing(fds):
35 def _closing(fds):
36 try:
36 try:
37 yield
37 yield
38 finally:
38 finally:
39 for fd in fds:
39 for fd in fds:
40 try:
40 try:
41 os.close(fd)
41 os.close(fd)
42 except EnvironmentError:
42 except EnvironmentError:
43 pass
43 pass
44
44
45
45
46 @contextlib.contextmanager
46 @contextlib.contextmanager
47 def _pipes():
47 def _pipes():
48 rwpair = os.pipe()
48 rwpair = os.pipe()
49 with _closing(rwpair):
49 with _closing(rwpair):
50 yield rwpair
50 yield rwpair
51
51
52
52
53 @contextlib.contextmanager
53 @contextlib.contextmanager
54 def _ptys():
54 def _ptys():
55 if pycompat.iswindows:
55 if pycompat.iswindows:
56 raise unittest.SkipTest("PTYs are not supported on Windows")
56 raise unittest.SkipTest("PTYs are not supported on Windows")
57 import pty
57 import pty
58 import tty
58 import tty
59
59
60 rwpair = pty.openpty()
60 rwpair = pty.openpty()
61 with _closing(rwpair):
61 with _closing(rwpair):
62 tty.setraw(rwpair[0])
62 tty.setraw(rwpair[0])
63 yield rwpair
63 yield rwpair
64
64
65
65
66 def _readall(fd, buffer_size):
66 def _readall(fd, buffer_size):
67 buf = []
67 buf = []
68 while True:
68 while True:
69 try:
69 try:
70 s = os.read(fd, buffer_size)
70 s = os.read(fd, buffer_size)
71 except OSError as e:
71 except OSError as e:
72 if e.errno == errno.EIO:
72 if e.errno == errno.EIO:
73 # If the child-facing PTY got closed, reading from the
73 # If the child-facing PTY got closed, reading from the
74 # parent-facing PTY raises EIO.
74 # parent-facing PTY raises EIO.
75 break
75 break
76 raise
76 raise
77 if not s:
77 if not s:
78 break
78 break
79 buf.append(s)
79 buf.append(s)
80 return b''.join(buf)
80 return b''.join(buf)
81
81
82
82
83 class TestStdio(unittest.TestCase):
83 class TestStdio(unittest.TestCase):
84 def _test(self, stream, rwpair_generator, expected_output, python_args=[]):
84 def _test(self, stream, rwpair_generator, expected_output, python_args=[]):
85 assert stream in ('stdout', 'stderr')
85 assert stream in ('stdout', 'stderr')
86 with rwpair_generator() as (stream_receiver, child_stream), open(
86 with rwpair_generator() as (stream_receiver, child_stream), open(
87 os.devnull, 'rb'
87 os.devnull, 'rb'
88 ) as child_stdin:
88 ) as child_stdin:
89 proc = subprocess.Popen(
89 proc = subprocess.Popen(
90 [sys.executable]
90 [sys.executable]
91 + python_args
91 + python_args
92 + ['-c', CHILD_PROCESS.format(stream=stream)],
92 + ['-c', BUFFERING_CHILD_SCRIPT.format(stream=stream)],
93 stdin=child_stdin,
93 stdin=child_stdin,
94 stdout=child_stream if stream == 'stdout' else None,
94 stdout=child_stream if stream == 'stdout' else None,
95 stderr=child_stream if stream == 'stderr' else None,
95 stderr=child_stream if stream == 'stderr' else None,
96 )
96 )
97 try:
97 try:
98 os.close(child_stream)
98 os.close(child_stream)
99 self.assertEqual(
99 self.assertEqual(
100 _readall(stream_receiver, 1024), expected_output
100 _readall(stream_receiver, 1024), expected_output
101 )
101 )
102 except: # re-raises
102 except: # re-raises
103 proc.terminate()
103 proc.terminate()
104 raise
104 raise
105 finally:
105 finally:
106 retcode = proc.wait()
106 retcode = proc.wait()
107 self.assertEqual(retcode, 0)
107 self.assertEqual(retcode, 0)
108
108
109 def test_stdout_pipes(self):
109 def test_buffering_stdout_pipes(self):
110 self._test('stdout', _pipes, FULLY_BUFFERED)
110 self._test('stdout', _pipes, FULLY_BUFFERED)
111
111
112 def test_stdout_ptys(self):
112 def test_buffering_stdout_ptys(self):
113 self._test('stdout', _ptys, LINE_BUFFERED)
113 self._test('stdout', _ptys, LINE_BUFFERED)
114
114
115 def test_stdout_pipes_unbuffered(self):
115 def test_buffering_stdout_pipes_unbuffered(self):
116 self._test('stdout', _pipes, UNBUFFERED, python_args=['-u'])
116 self._test('stdout', _pipes, UNBUFFERED, python_args=['-u'])
117
117
118 def test_stdout_ptys_unbuffered(self):
118 def test_buffering_stdout_ptys_unbuffered(self):
119 self._test('stdout', _ptys, UNBUFFERED, python_args=['-u'])
119 self._test('stdout', _ptys, UNBUFFERED, python_args=['-u'])
120
120
121 if not pycompat.ispy3 and not pycompat.iswindows:
121 if not pycompat.ispy3 and not pycompat.iswindows:
122 # On Python 2 on non-Windows, we manually open stdout in line-buffered
122 # On Python 2 on non-Windows, we manually open stdout in line-buffered
123 # mode if connected to a TTY. We should check if Python was configured
123 # mode if connected to a TTY. We should check if Python was configured
124 # to use unbuffered stdout, but it's hard to do that.
124 # to use unbuffered stdout, but it's hard to do that.
125 test_stdout_ptys_unbuffered = unittest.expectedFailure(
125 test_buffering_stdout_ptys_unbuffered = unittest.expectedFailure(
126 test_stdout_ptys_unbuffered
126 test_buffering_stdout_ptys_unbuffered
127 )
127 )
128
128
129 def test_stderr_pipes(self):
129 def test_buffering_stderr_pipes(self):
130 self._test('stderr', _pipes, UNBUFFERED)
130 self._test('stderr', _pipes, UNBUFFERED)
131
131
132 def test_stderr_ptys(self):
132 def test_buffering_stderr_ptys(self):
133 self._test('stderr', _ptys, UNBUFFERED)
133 self._test('stderr', _ptys, UNBUFFERED)
134
134
135 def test_stderr_pipes_unbuffered(self):
135 def test_buffering_stderr_pipes_unbuffered(self):
136 self._test('stderr', _pipes, UNBUFFERED, python_args=['-u'])
136 self._test('stderr', _pipes, UNBUFFERED, python_args=['-u'])
137
137
138 def test_stderr_ptys_unbuffered(self):
138 def test_buffering_stderr_ptys_unbuffered(self):
139 self._test('stderr', _ptys, UNBUFFERED, python_args=['-u'])
139 self._test('stderr', _ptys, UNBUFFERED, python_args=['-u'])
140
140
141
141
142 if __name__ == '__main__':
142 if __name__ == '__main__':
143 import silenttestrunner
143 import silenttestrunner
144
144
145 silenttestrunner.main(__name__)
145 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now