##// END OF EJS Templates
tests: generalize common test case code in test-stdio.py
Manuel Jacob -
r45589:35988468 default
parent child Browse files
Show More
@@ -1,104 +1,107 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
3 Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
4 """
4 """
5 from __future__ import absolute_import
5 from __future__ import absolute_import
6
6
7 import contextlib
7 import contextlib
8 import os
8 import os
9 import subprocess
9 import subprocess
10 import sys
10 import sys
11 import unittest
11 import unittest
12
12
13 from mercurial import pycompat
13 from mercurial import pycompat
14
14
15
15
16 CHILD_PROCESS = r'''
16 CHILD_PROCESS = r'''
17 import os
17 import os
18
18
19 from mercurial import dispatch
19 from mercurial import dispatch
20 from mercurial.utils import procutil
20 from mercurial.utils import procutil
21
21
22 dispatch.initstdio()
22 dispatch.initstdio()
23 procutil.stdout.write(b'aaa')
23 procutil.{stream}.write(b'aaa')
24 os.write(procutil.stdout.fileno(), b'[written aaa]')
24 os.write(procutil.{stream}.fileno(), b'[written aaa]')
25 procutil.stdout.write(b'bbb\n')
25 procutil.{stream}.write(b'bbb\n')
26 os.write(procutil.stdout.fileno(), b'[written bbb\\n]')
26 os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
27 '''
27 '''
28 UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
28 UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
29 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
29 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
30 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
30 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
31
31
32
32
33 @contextlib.contextmanager
33 @contextlib.contextmanager
34 def _closing(fds):
34 def _closing(fds):
35 try:
35 try:
36 yield
36 yield
37 finally:
37 finally:
38 for fd in fds:
38 for fd in fds:
39 try:
39 try:
40 os.close(fd)
40 os.close(fd)
41 except EnvironmentError:
41 except EnvironmentError:
42 pass
42 pass
43
43
44
44
45 @contextlib.contextmanager
45 @contextlib.contextmanager
46 def _pipes():
46 def _pipes():
47 rwpair = os.pipe()
47 rwpair = os.pipe()
48 with _closing(rwpair):
48 with _closing(rwpair):
49 yield rwpair
49 yield rwpair
50
50
51
51
52 @contextlib.contextmanager
52 @contextlib.contextmanager
53 def _ptys():
53 def _ptys():
54 if pycompat.iswindows:
54 if pycompat.iswindows:
55 raise unittest.SkipTest("PTYs are not supported on Windows")
55 raise unittest.SkipTest("PTYs are not supported on Windows")
56 import pty
56 import pty
57 import tty
57 import tty
58
58
59 rwpair = pty.openpty()
59 rwpair = pty.openpty()
60 with _closing(rwpair):
60 with _closing(rwpair):
61 tty.setraw(rwpair[0])
61 tty.setraw(rwpair[0])
62 yield rwpair
62 yield rwpair
63
63
64
64
65 class TestStdout(unittest.TestCase):
65 class TestStdio(unittest.TestCase):
66 def _test(self, rwpair_generator, expected_output, python_args=[]):
66 def _test(self, stream, rwpair_generator, expected_output, python_args=[]):
67 with rwpair_generator() as (stdout_receiver, child_stdout), open(
67 assert stream in ('stdout', 'stderr')
68 with rwpair_generator() as (stream_receiver, child_stream), open(
68 os.devnull, 'rb'
69 os.devnull, 'rb'
69 ) as child_stdin:
70 ) as child_stdin:
70 proc = subprocess.Popen(
71 proc = subprocess.Popen(
71 [sys.executable] + python_args + ['-c', CHILD_PROCESS],
72 [sys.executable]
73 + python_args
74 + ['-c', CHILD_PROCESS.format(stream=stream)],
72 stdin=child_stdin,
75 stdin=child_stdin,
73 stdout=child_stdout,
76 stdout=child_stream if stream == 'stdout' else None,
74 stderr=None,
77 stderr=child_stream if stream == 'stderr' else None,
75 )
78 )
76 retcode = proc.wait()
79 retcode = proc.wait()
77 self.assertEqual(retcode, 0)
80 self.assertEqual(retcode, 0)
78 self.assertEqual(os.read(stdout_receiver, 1024), expected_output)
81 self.assertEqual(os.read(stream_receiver, 1024), expected_output)
79
82
80 def test_stdout_pipes(self):
83 def test_stdout_pipes(self):
81 self._test(_pipes, FULLY_BUFFERED)
84 self._test('stdout', _pipes, FULLY_BUFFERED)
82
85
83 def test_stdout_ptys(self):
86 def test_stdout_ptys(self):
84 self._test(_ptys, LINE_BUFFERED)
87 self._test('stdout', _ptys, LINE_BUFFERED)
85
88
86 def test_stdout_pipes_unbuffered(self):
89 def test_stdout_pipes_unbuffered(self):
87 self._test(_pipes, UNBUFFERED, python_args=['-u'])
90 self._test('stdout', _pipes, UNBUFFERED, python_args=['-u'])
88
91
89 def test_stdout_ptys_unbuffered(self):
92 def test_stdout_ptys_unbuffered(self):
90 self._test(_ptys, UNBUFFERED, python_args=['-u'])
93 self._test('stdout', _ptys, UNBUFFERED, python_args=['-u'])
91
94
92 if not pycompat.ispy3 and not pycompat.iswindows:
95 if not pycompat.ispy3 and not pycompat.iswindows:
93 # On Python 2 on non-Windows, we manually open stdout in line-buffered
96 # On Python 2 on non-Windows, we manually open stdout in line-buffered
94 # mode if connected to a TTY. We should check if Python was configured
97 # mode if connected to a TTY. We should check if Python was configured
95 # to use unbuffered stdout, but it's hard to do that.
98 # to use unbuffered stdout, but it's hard to do that.
96 test_stdout_ptys_unbuffered = unittest.expectedFailure(
99 test_stdout_ptys_unbuffered = unittest.expectedFailure(
97 test_stdout_ptys_unbuffered
100 test_stdout_ptys_unbuffered
98 )
101 )
99
102
100
103
101 if __name__ == '__main__':
104 if __name__ == '__main__':
102 import silenttestrunner
105 import silenttestrunner
103
106
104 silenttestrunner.main(__name__)
107 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now