##// END OF EJS Templates
sigpipe-remote: verify the script is IO are unbuffered...
marmoute -
r48417:d5fc1b59 default
parent child Browse files
Show More
@@ -1,176 +1,181
1 1 #!/usr/bin/env python3
2 2 from __future__ import print_function
3 3
4 import io
4 5 import os
5 6 import subprocess
6 7 import sys
7 8 import threading
8 9 import time
9 10
10 11 # we cannot use mercurial.testing as long as python2 is not dropped as the test will only install the mercurial module for python2 in python2 run
11 12
13 if isinstance(sys.stdout.buffer, io.BufferedWriter):
14 print('SIGPIPE-HELPER: script need unbuffered output', file=sys.stderr)
15 sys.exit(255)
16
12 17 DEBUG_FILE = os.environ.get('SIGPIPE_REMOTE_DEBUG_FILE')
13 18 if DEBUG_FILE is None:
14 19 debug_stream = sys.stderr.buffer
15 20 else:
16 21 debug_stream = open(DEBUG_FILE, 'bw', buffering=0)
17 22
18 23 SYNCFILE1 = os.environ.get('SYNCFILE1')
19 24 SYNCFILE2 = os.environ.get('SYNCFILE2')
20 25 if SYNCFILE1 is None:
21 26 print('SIGPIPE-HELPER: missing variable $SYNCFILE1', file=sys.stderr)
22 27 sys.exit(255)
23 28 if SYNCFILE2 is None:
24 29 print('SIGPIPE-HELPER: missing variable $SYNCFILE2', file=sys.stderr)
25 30 sys.exit(255)
26 31
27 32
28 33 def _timeout_factor():
29 34 """return the current modification to timeout"""
30 35 default = int(os.environ.get('HGTEST_TIMEOUT_DEFAULT', 360))
31 36 current = int(os.environ.get('HGTEST_TIMEOUT', default))
32 37 if current == 0:
33 38 return 1
34 39 return current / float(default)
35 40
36 41
37 42 def wait_file(path, timeout=10):
38 43 timeout *= _timeout_factor()
39 44 start = time.time()
40 45 while not os.path.exists(path):
41 46 if (time.time() - start) > timeout:
42 47 raise RuntimeError(b"timed out waiting for file: %s" % path)
43 48 time.sleep(0.01)
44 49
45 50
46 51 def write_file(path, content=b''):
47 52 with open(path, 'wb') as f:
48 53 f.write(content)
49 54
50 55
51 56 # end of mercurial.testing content
52 57
53 58 if sys.version_info[0] < 3:
54 59 print('SIGPIPE-HELPER: script should run with Python 3', file=sys.stderr)
55 60 sys.exit(255)
56 61
57 62
58 63 def sysbytes(s):
59 64 return s.encode('utf-8')
60 65
61 66
62 67 def sysstr(s):
63 68 return s.decode('latin-1')
64 69
65 70
66 71 piped_stdout = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
67 72 piped_stderr = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
68 73
69 74 stdout_writer = os.fdopen(piped_stdout[1], "rb")
70 75 stdout_reader = os.fdopen(piped_stdout[0], "rb")
71 76 stderr_writer = os.fdopen(piped_stderr[1], "rb")
72 77 stderr_reader = os.fdopen(piped_stderr[0], "rb")
73 78
74 79 debug_stream.write(b'SIGPIPE-HELPER: Starting\n')
75 80
76 81 TESTLIB_DIR = os.path.dirname(sys.argv[0])
77 82 WAIT_SCRIPT = os.path.join(TESTLIB_DIR, 'wait-on-file')
78 83
79 84 hooks_cmd = '%s 10 %s %s'
80 85 hooks_cmd %= (
81 86 WAIT_SCRIPT,
82 87 SYNCFILE2,
83 88 SYNCFILE1,
84 89 )
85 90
86 91 cmd = ['hg']
87 92 cmd += sys.argv[1:]
88 93 sub = subprocess.Popen(
89 94 cmd,
90 95 bufsize=0,
91 96 close_fds=True,
92 97 stdin=sys.stdin,
93 98 stdout=stdout_writer,
94 99 stderr=stderr_writer,
95 100 )
96 101
97 102 debug_stream.write(b'SIGPIPE-HELPER: Mercurial started\n')
98 103
99 104
100 105 shut_down = threading.Event()
101 106
102 107 close_lock = threading.Lock()
103 108
104 109
105 110 def _read(stream):
106 111 try:
107 112 return stream.read()
108 113 except ValueError:
109 114 # read on closed file
110 115 return None
111 116
112 117
113 118 def forward_stdout():
114 119 while not shut_down.is_set():
115 120 c = _read(stdout_reader)
116 121 while c is not None:
117 122 sys.stdout.buffer.write(c)
118 123 c = _read(stdout_reader)
119 124 time.sleep(0.001)
120 125 with close_lock:
121 126 if not stdout_reader.closed:
122 127 stdout_reader.close()
123 128 debug_stream.write(b'SIGPIPE-HELPER: stdout closed\n')
124 129
125 130
126 131 def forward_stderr():
127 132 while not shut_down.is_set():
128 133 c = _read(stderr_reader)
129 134 if c is not None:
130 135 sys.stderr.buffer.write(c)
131 136 c = _read(stderr_reader)
132 137 time.sleep(0.001)
133 138 with close_lock:
134 139 if not stderr_reader.closed:
135 140 stderr_reader.close()
136 141 debug_stream.write(b'SIGPIPE-HELPER: stderr closed\n')
137 142
138 143
139 144 stdout_thread = threading.Thread(target=forward_stdout, daemon=True)
140 145 stderr_thread = threading.Thread(target=forward_stderr, daemon=True)
141 146
142 147 try:
143 148 stdout_thread.start()
144 149 stderr_thread.start()
145 150
146 151 debug_stream.write(b'SIGPIPE-HELPER: Redirection in place\n')
147 152
148 153 try:
149 154 wait_file(sysbytes(SYNCFILE1))
150 155 except RuntimeError as exc:
151 156 msg = sysbytes(str(exc))
152 157 debug_stream.write(b'SIGPIPE-HELPER: wait failed: %s\n' % msg)
153 158 else:
154 159 debug_stream.write(b'SIGPIPE-HELPER: SYNCFILE1 detected\n')
155 160 with close_lock:
156 161 if not stdout_reader.closed:
157 162 stdout_reader.close()
158 163 if not stderr_reader.closed:
159 164 stderr_reader.close()
160 165 sys.stdin.close()
161 166 debug_stream.write(b'SIGPIPE-HELPER: pipes closed\n')
162 167 debug_stream.write(b'SIGPIPE-HELPER: creating SYNCFILE2\n')
163 168 write_file(sysbytes(SYNCFILE2))
164 169 finally:
165 170 debug_stream.write(b'SIGPIPE-HELPER: Shutting down\n')
166 171 shut_down.set()
167 172 if not sys.stdin.closed:
168 173 sys.stdin.close()
169 174 try:
170 175 sub.wait(timeout=30)
171 176 except subprocess.TimeoutExpired:
172 177 msg = b'SIGPIPE-HELPER: Server process failed to terminate\n'
173 178 debug_stream.write(msg)
174 179 else:
175 180 debug_stream.write(b'SIGPIPE-HELPER: Server process terminated\n')
176 181 debug_stream.write(b'SIGPIPE-HELPER: Shut down\n')
General Comments 0
You need to be logged in to leave comments. Login now