Show More
@@ -0,0 +1,105 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | """ | |||
|
3 | Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`. | |||
|
4 | """ | |||
|
5 | from __future__ import absolute_import | |||
|
6 | ||||
|
7 | import contextlib | |||
|
8 | import os | |||
|
9 | import subprocess | |||
|
10 | import sys | |||
|
11 | import unittest | |||
|
12 | ||||
|
13 | from mercurial import pycompat | |||
|
14 | ||||
|
15 | ||||
|
16 | CHILD_PROCESS = r''' | |||
|
17 | import os | |||
|
18 | ||||
|
19 | from mercurial import dispatch | |||
|
20 | from mercurial.utils import procutil | |||
|
21 | ||||
|
22 | dispatch.initstdio() | |||
|
23 | procutil.stdout.write(b'aaa') | |||
|
24 | os.write(procutil.stdout.fileno(), b'[written aaa]') | |||
|
25 | procutil.stdout.write(b'bbb\n') | |||
|
26 | os.write(procutil.stdout.fileno(), b'[written bbb\\n]') | |||
|
27 | ''' | |||
|
28 | UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]' | |||
|
29 | LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]' | |||
|
30 | FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n' | |||
|
31 | ||||
|
32 | ||||
|
33 | @contextlib.contextmanager | |||
|
34 | def _closing(fds): | |||
|
35 | try: | |||
|
36 | yield | |||
|
37 | finally: | |||
|
38 | for fd in fds: | |||
|
39 | try: | |||
|
40 | os.close(fd) | |||
|
41 | except EnvironmentError: | |||
|
42 | pass | |||
|
43 | ||||
|
44 | ||||
|
45 | @contextlib.contextmanager | |||
|
46 | def _pipes(): | |||
|
47 | rwpair = os.pipe() | |||
|
48 | with _closing(rwpair): | |||
|
49 | yield rwpair | |||
|
50 | ||||
|
51 | ||||
|
52 | @contextlib.contextmanager | |||
|
53 | def _ptys(): | |||
|
54 | if pycompat.iswindows: | |||
|
55 | raise unittest.SkipTest("PTYs are not supported on Windows") | |||
|
56 | import pty | |||
|
57 | import tty | |||
|
58 | ||||
|
59 | rwpair = pty.openpty() | |||
|
60 | with _closing(rwpair): | |||
|
61 | tty.setraw(rwpair[0]) | |||
|
62 | yield rwpair | |||
|
63 | ||||
|
64 | ||||
|
65 | class TestStdout(unittest.TestCase): | |||
|
66 | def _test(self, rwpair_generator, expected_output, python_args=[]): | |||
|
67 | with rwpair_generator() as (stdout_receiver, child_stdout), open( | |||
|
68 | os.devnull, 'rb' | |||
|
69 | ) as child_stdin: | |||
|
70 | proc = subprocess.Popen( | |||
|
71 | [sys.executable] + python_args + ['-c', CHILD_PROCESS], | |||
|
72 | stdin=child_stdin, | |||
|
73 | stdout=child_stdout, | |||
|
74 | stderr=None, | |||
|
75 | ) | |||
|
76 | retcode = proc.wait() | |||
|
77 | self.assertEqual(retcode, 0) | |||
|
78 | self.assertEqual(os.read(stdout_receiver, 1024), expected_output) | |||
|
79 | ||||
|
80 | def test_stdout_pipes(self): | |||
|
81 | self._test(_pipes, FULLY_BUFFERED) | |||
|
82 | ||||
|
83 | def test_stdout_ptys(self): | |||
|
84 | self._test(_ptys, LINE_BUFFERED) | |||
|
85 | ||||
|
86 | def test_stdout_pipes_unbuffered(self): | |||
|
87 | self._test(_pipes, UNBUFFERED, python_args=['-u']) | |||
|
88 | ||||
|
89 | def test_stdout_ptys_unbuffered(self): | |||
|
90 | self._test(_ptys, UNBUFFERED, python_args=['-u']) | |||
|
91 | ||||
|
92 | # On Windows, test_stdout_ptys wouldn't pass, but it's skipped anyway. | |||
|
93 | if not pycompat.ispy3 and not pycompat.iswindows: | |||
|
94 | # On Python 2 on non-Windows, we manually open stdout in line-buffered | |||
|
95 | # mode if connected to a TTY. We should check if Python was configured | |||
|
96 | # to use unbuffered stdout, but it's hard to do that. | |||
|
97 | test_stdout_ptys_unbuffered = unittest.expectedFailure( | |||
|
98 | test_stdout_ptys_unbuffered | |||
|
99 | ) | |||
|
100 | ||||
|
101 | ||||
|
102 | if __name__ == '__main__': | |||
|
103 | import silenttestrunner | |||
|
104 | ||||
|
105 | silenttestrunner.main(__name__) |
General Comments 0
You need to be logged in to leave comments.
Login now