##// END OF EJS Templates
tests: proof test-stdio.py against buffer fill-up...
Manuel Jacob -
r45628:8cd18aba default
parent child Browse files
Show More
@@ -1,119 +1,142 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
3 Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
4 """
4 """
5 from __future__ import absolute_import
5 from __future__ import absolute_import
6
6
7 import contextlib
7 import contextlib
8 import errno
8 import os
9 import os
9 import subprocess
10 import subprocess
10 import sys
11 import sys
11 import unittest
12 import unittest
12
13
13 from mercurial import pycompat
14 from mercurial import pycompat
14
15
15
16
16 CHILD_PROCESS = r'''
17 CHILD_PROCESS = r'''
17 import os
18 import os
18
19
19 from mercurial import dispatch
20 from mercurial import dispatch
20 from mercurial.utils import procutil
21 from mercurial.utils import procutil
21
22
22 dispatch.initstdio()
23 dispatch.initstdio()
23 procutil.{stream}.write(b'aaa')
24 procutil.{stream}.write(b'aaa')
24 os.write(procutil.{stream}.fileno(), b'[written aaa]')
25 os.write(procutil.{stream}.fileno(), b'[written aaa]')
25 procutil.{stream}.write(b'bbb\n')
26 procutil.{stream}.write(b'bbb\n')
26 os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
27 os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
27 '''
28 '''
28 UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
29 UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
29 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
30 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
30 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
31 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
31
32
32
33
33 @contextlib.contextmanager
34 @contextlib.contextmanager
34 def _closing(fds):
35 def _closing(fds):
35 try:
36 try:
36 yield
37 yield
37 finally:
38 finally:
38 for fd in fds:
39 for fd in fds:
39 try:
40 try:
40 os.close(fd)
41 os.close(fd)
41 except EnvironmentError:
42 except EnvironmentError:
42 pass
43 pass
43
44
44
45
45 @contextlib.contextmanager
46 @contextlib.contextmanager
46 def _pipes():
47 def _pipes():
47 rwpair = os.pipe()
48 rwpair = os.pipe()
48 with _closing(rwpair):
49 with _closing(rwpair):
49 yield rwpair
50 yield rwpair
50
51
51
52
52 @contextlib.contextmanager
53 @contextlib.contextmanager
53 def _ptys():
54 def _ptys():
54 if pycompat.iswindows:
55 if pycompat.iswindows:
55 raise unittest.SkipTest("PTYs are not supported on Windows")
56 raise unittest.SkipTest("PTYs are not supported on Windows")
56 import pty
57 import pty
57 import tty
58 import tty
58
59
59 rwpair = pty.openpty()
60 rwpair = pty.openpty()
60 with _closing(rwpair):
61 with _closing(rwpair):
61 tty.setraw(rwpair[0])
62 tty.setraw(rwpair[0])
62 yield rwpair
63 yield rwpair
63
64
64
65
66 def _readall(fd, buffer_size):
67 buf = []
68 while True:
69 try:
70 s = os.read(fd, buffer_size)
71 except OSError as e:
72 if e.errno == errno.EIO:
73 # If the child-facing PTY got closed, reading from the
74 # parent-facing PTY raises EIO.
75 break
76 raise
77 if not s:
78 break
79 buf.append(s)
80 return b''.join(buf)
81
82
65 class TestStdio(unittest.TestCase):
83 class TestStdio(unittest.TestCase):
66 def _test(self, stream, rwpair_generator, expected_output, python_args=[]):
84 def _test(self, stream, rwpair_generator, expected_output, python_args=[]):
67 assert stream in ('stdout', 'stderr')
85 assert stream in ('stdout', 'stderr')
68 with rwpair_generator() as (stream_receiver, child_stream), open(
86 with rwpair_generator() as (stream_receiver, child_stream), open(
69 os.devnull, 'rb'
87 os.devnull, 'rb'
70 ) as child_stdin:
88 ) as child_stdin:
71 proc = subprocess.Popen(
89 proc = subprocess.Popen(
72 [sys.executable]
90 [sys.executable]
73 + python_args
91 + python_args
74 + ['-c', CHILD_PROCESS.format(stream=stream)],
92 + ['-c', CHILD_PROCESS.format(stream=stream)],
75 stdin=child_stdin,
93 stdin=child_stdin,
76 stdout=child_stream if stream == 'stdout' else None,
94 stdout=child_stream if stream == 'stdout' else None,
77 stderr=child_stream if stream == 'stderr' else None,
95 stderr=child_stream if stream == 'stderr' else None,
78 )
96 )
79 retcode = proc.wait()
97 try:
98 os.close(child_stream)
99 self.assertEqual(
100 _readall(stream_receiver, 1024), expected_output
101 )
102 finally:
103 retcode = proc.wait()
80 self.assertEqual(retcode, 0)
104 self.assertEqual(retcode, 0)
81 self.assertEqual(os.read(stream_receiver, 1024), expected_output)
82
105
83 def test_stdout_pipes(self):
106 def test_stdout_pipes(self):
84 self._test('stdout', _pipes, FULLY_BUFFERED)
107 self._test('stdout', _pipes, FULLY_BUFFERED)
85
108
86 def test_stdout_ptys(self):
109 def test_stdout_ptys(self):
87 self._test('stdout', _ptys, LINE_BUFFERED)
110 self._test('stdout', _ptys, LINE_BUFFERED)
88
111
89 def test_stdout_pipes_unbuffered(self):
112 def test_stdout_pipes_unbuffered(self):
90 self._test('stdout', _pipes, UNBUFFERED, python_args=['-u'])
113 self._test('stdout', _pipes, UNBUFFERED, python_args=['-u'])
91
114
92 def test_stdout_ptys_unbuffered(self):
115 def test_stdout_ptys_unbuffered(self):
93 self._test('stdout', _ptys, UNBUFFERED, python_args=['-u'])
116 self._test('stdout', _ptys, UNBUFFERED, python_args=['-u'])
94
117
95 if not pycompat.ispy3 and not pycompat.iswindows:
118 if not pycompat.ispy3 and not pycompat.iswindows:
96 # On Python 2 on non-Windows, we manually open stdout in line-buffered
119 # On Python 2 on non-Windows, we manually open stdout in line-buffered
97 # mode if connected to a TTY. We should check if Python was configured
120 # mode if connected to a TTY. We should check if Python was configured
98 # to use unbuffered stdout, but it's hard to do that.
121 # to use unbuffered stdout, but it's hard to do that.
99 test_stdout_ptys_unbuffered = unittest.expectedFailure(
122 test_stdout_ptys_unbuffered = unittest.expectedFailure(
100 test_stdout_ptys_unbuffered
123 test_stdout_ptys_unbuffered
101 )
124 )
102
125
103 def test_stderr_pipes(self):
126 def test_stderr_pipes(self):
104 self._test('stderr', _pipes, UNBUFFERED)
127 self._test('stderr', _pipes, UNBUFFERED)
105
128
106 def test_stderr_ptys(self):
129 def test_stderr_ptys(self):
107 self._test('stderr', _ptys, UNBUFFERED)
130 self._test('stderr', _ptys, UNBUFFERED)
108
131
109 def test_stderr_pipes_unbuffered(self):
132 def test_stderr_pipes_unbuffered(self):
110 self._test('stderr', _pipes, UNBUFFERED, python_args=['-u'])
133 self._test('stderr', _pipes, UNBUFFERED, python_args=['-u'])
111
134
112 def test_stderr_ptys_unbuffered(self):
135 def test_stderr_ptys_unbuffered(self):
113 self._test('stderr', _ptys, UNBUFFERED, python_args=['-u'])
136 self._test('stderr', _ptys, UNBUFFERED, python_args=['-u'])
114
137
115
138
116 if __name__ == '__main__':
139 if __name__ == '__main__':
117 import silenttestrunner
140 import silenttestrunner
118
141
119 silenttestrunner.main(__name__)
142 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now