##// END OF EJS Templates
sigpipe-remote: verify the script is IO are unbuffered...
marmoute -
r48417:d5fc1b59 default
parent child Browse files
Show More
@@ -1,176 +1,181
1 #!/usr/bin/env python3
1 #!/usr/bin/env python3
2 from __future__ import print_function
2 from __future__ import print_function
3
3
4 import io
4 import os
5 import os
5 import subprocess
6 import subprocess
6 import sys
7 import sys
7 import threading
8 import threading
8 import time
9 import time
9
10
10 # we cannot use mercurial.testing as long as python2 is not dropped as the test will only install the mercurial module for python2 in python2 run
11 # we cannot use mercurial.testing as long as python2 is not dropped as the test will only install the mercurial module for python2 in python2 run
11
12
13 if isinstance(sys.stdout.buffer, io.BufferedWriter):
14 print('SIGPIPE-HELPER: script need unbuffered output', file=sys.stderr)
15 sys.exit(255)
16
12 DEBUG_FILE = os.environ.get('SIGPIPE_REMOTE_DEBUG_FILE')
17 DEBUG_FILE = os.environ.get('SIGPIPE_REMOTE_DEBUG_FILE')
13 if DEBUG_FILE is None:
18 if DEBUG_FILE is None:
14 debug_stream = sys.stderr.buffer
19 debug_stream = sys.stderr.buffer
15 else:
20 else:
16 debug_stream = open(DEBUG_FILE, 'bw', buffering=0)
21 debug_stream = open(DEBUG_FILE, 'bw', buffering=0)
17
22
18 SYNCFILE1 = os.environ.get('SYNCFILE1')
23 SYNCFILE1 = os.environ.get('SYNCFILE1')
19 SYNCFILE2 = os.environ.get('SYNCFILE2')
24 SYNCFILE2 = os.environ.get('SYNCFILE2')
20 if SYNCFILE1 is None:
25 if SYNCFILE1 is None:
21 print('SIGPIPE-HELPER: missing variable $SYNCFILE1', file=sys.stderr)
26 print('SIGPIPE-HELPER: missing variable $SYNCFILE1', file=sys.stderr)
22 sys.exit(255)
27 sys.exit(255)
23 if SYNCFILE2 is None:
28 if SYNCFILE2 is None:
24 print('SIGPIPE-HELPER: missing variable $SYNCFILE2', file=sys.stderr)
29 print('SIGPIPE-HELPER: missing variable $SYNCFILE2', file=sys.stderr)
25 sys.exit(255)
30 sys.exit(255)
26
31
27
32
28 def _timeout_factor():
33 def _timeout_factor():
29 """return the current modification to timeout"""
34 """return the current modification to timeout"""
30 default = int(os.environ.get('HGTEST_TIMEOUT_DEFAULT', 360))
35 default = int(os.environ.get('HGTEST_TIMEOUT_DEFAULT', 360))
31 current = int(os.environ.get('HGTEST_TIMEOUT', default))
36 current = int(os.environ.get('HGTEST_TIMEOUT', default))
32 if current == 0:
37 if current == 0:
33 return 1
38 return 1
34 return current / float(default)
39 return current / float(default)
35
40
36
41
37 def wait_file(path, timeout=10):
42 def wait_file(path, timeout=10):
38 timeout *= _timeout_factor()
43 timeout *= _timeout_factor()
39 start = time.time()
44 start = time.time()
40 while not os.path.exists(path):
45 while not os.path.exists(path):
41 if (time.time() - start) > timeout:
46 if (time.time() - start) > timeout:
42 raise RuntimeError(b"timed out waiting for file: %s" % path)
47 raise RuntimeError(b"timed out waiting for file: %s" % path)
43 time.sleep(0.01)
48 time.sleep(0.01)
44
49
45
50
46 def write_file(path, content=b''):
51 def write_file(path, content=b''):
47 with open(path, 'wb') as f:
52 with open(path, 'wb') as f:
48 f.write(content)
53 f.write(content)
49
54
50
55
51 # end of mercurial.testing content
56 # end of mercurial.testing content
52
57
53 if sys.version_info[0] < 3:
58 if sys.version_info[0] < 3:
54 print('SIGPIPE-HELPER: script should run with Python 3', file=sys.stderr)
59 print('SIGPIPE-HELPER: script should run with Python 3', file=sys.stderr)
55 sys.exit(255)
60 sys.exit(255)
56
61
57
62
58 def sysbytes(s):
63 def sysbytes(s):
59 return s.encode('utf-8')
64 return s.encode('utf-8')
60
65
61
66
62 def sysstr(s):
67 def sysstr(s):
63 return s.decode('latin-1')
68 return s.decode('latin-1')
64
69
65
70
66 piped_stdout = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
71 piped_stdout = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
67 piped_stderr = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
72 piped_stderr = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
68
73
69 stdout_writer = os.fdopen(piped_stdout[1], "rb")
74 stdout_writer = os.fdopen(piped_stdout[1], "rb")
70 stdout_reader = os.fdopen(piped_stdout[0], "rb")
75 stdout_reader = os.fdopen(piped_stdout[0], "rb")
71 stderr_writer = os.fdopen(piped_stderr[1], "rb")
76 stderr_writer = os.fdopen(piped_stderr[1], "rb")
72 stderr_reader = os.fdopen(piped_stderr[0], "rb")
77 stderr_reader = os.fdopen(piped_stderr[0], "rb")
73
78
74 debug_stream.write(b'SIGPIPE-HELPER: Starting\n')
79 debug_stream.write(b'SIGPIPE-HELPER: Starting\n')
75
80
76 TESTLIB_DIR = os.path.dirname(sys.argv[0])
81 TESTLIB_DIR = os.path.dirname(sys.argv[0])
77 WAIT_SCRIPT = os.path.join(TESTLIB_DIR, 'wait-on-file')
82 WAIT_SCRIPT = os.path.join(TESTLIB_DIR, 'wait-on-file')
78
83
79 hooks_cmd = '%s 10 %s %s'
84 hooks_cmd = '%s 10 %s %s'
80 hooks_cmd %= (
85 hooks_cmd %= (
81 WAIT_SCRIPT,
86 WAIT_SCRIPT,
82 SYNCFILE2,
87 SYNCFILE2,
83 SYNCFILE1,
88 SYNCFILE1,
84 )
89 )
85
90
86 cmd = ['hg']
91 cmd = ['hg']
87 cmd += sys.argv[1:]
92 cmd += sys.argv[1:]
88 sub = subprocess.Popen(
93 sub = subprocess.Popen(
89 cmd,
94 cmd,
90 bufsize=0,
95 bufsize=0,
91 close_fds=True,
96 close_fds=True,
92 stdin=sys.stdin,
97 stdin=sys.stdin,
93 stdout=stdout_writer,
98 stdout=stdout_writer,
94 stderr=stderr_writer,
99 stderr=stderr_writer,
95 )
100 )
96
101
97 debug_stream.write(b'SIGPIPE-HELPER: Mercurial started\n')
102 debug_stream.write(b'SIGPIPE-HELPER: Mercurial started\n')
98
103
99
104
100 shut_down = threading.Event()
105 shut_down = threading.Event()
101
106
102 close_lock = threading.Lock()
107 close_lock = threading.Lock()
103
108
104
109
105 def _read(stream):
110 def _read(stream):
106 try:
111 try:
107 return stream.read()
112 return stream.read()
108 except ValueError:
113 except ValueError:
109 # read on closed file
114 # read on closed file
110 return None
115 return None
111
116
112
117
113 def forward_stdout():
118 def forward_stdout():
114 while not shut_down.is_set():
119 while not shut_down.is_set():
115 c = _read(stdout_reader)
120 c = _read(stdout_reader)
116 while c is not None:
121 while c is not None:
117 sys.stdout.buffer.write(c)
122 sys.stdout.buffer.write(c)
118 c = _read(stdout_reader)
123 c = _read(stdout_reader)
119 time.sleep(0.001)
124 time.sleep(0.001)
120 with close_lock:
125 with close_lock:
121 if not stdout_reader.closed:
126 if not stdout_reader.closed:
122 stdout_reader.close()
127 stdout_reader.close()
123 debug_stream.write(b'SIGPIPE-HELPER: stdout closed\n')
128 debug_stream.write(b'SIGPIPE-HELPER: stdout closed\n')
124
129
125
130
126 def forward_stderr():
131 def forward_stderr():
127 while not shut_down.is_set():
132 while not shut_down.is_set():
128 c = _read(stderr_reader)
133 c = _read(stderr_reader)
129 if c is not None:
134 if c is not None:
130 sys.stderr.buffer.write(c)
135 sys.stderr.buffer.write(c)
131 c = _read(stderr_reader)
136 c = _read(stderr_reader)
132 time.sleep(0.001)
137 time.sleep(0.001)
133 with close_lock:
138 with close_lock:
134 if not stderr_reader.closed:
139 if not stderr_reader.closed:
135 stderr_reader.close()
140 stderr_reader.close()
136 debug_stream.write(b'SIGPIPE-HELPER: stderr closed\n')
141 debug_stream.write(b'SIGPIPE-HELPER: stderr closed\n')
137
142
138
143
139 stdout_thread = threading.Thread(target=forward_stdout, daemon=True)
144 stdout_thread = threading.Thread(target=forward_stdout, daemon=True)
140 stderr_thread = threading.Thread(target=forward_stderr, daemon=True)
145 stderr_thread = threading.Thread(target=forward_stderr, daemon=True)
141
146
142 try:
147 try:
143 stdout_thread.start()
148 stdout_thread.start()
144 stderr_thread.start()
149 stderr_thread.start()
145
150
146 debug_stream.write(b'SIGPIPE-HELPER: Redirection in place\n')
151 debug_stream.write(b'SIGPIPE-HELPER: Redirection in place\n')
147
152
148 try:
153 try:
149 wait_file(sysbytes(SYNCFILE1))
154 wait_file(sysbytes(SYNCFILE1))
150 except RuntimeError as exc:
155 except RuntimeError as exc:
151 msg = sysbytes(str(exc))
156 msg = sysbytes(str(exc))
152 debug_stream.write(b'SIGPIPE-HELPER: wait failed: %s\n' % msg)
157 debug_stream.write(b'SIGPIPE-HELPER: wait failed: %s\n' % msg)
153 else:
158 else:
154 debug_stream.write(b'SIGPIPE-HELPER: SYNCFILE1 detected\n')
159 debug_stream.write(b'SIGPIPE-HELPER: SYNCFILE1 detected\n')
155 with close_lock:
160 with close_lock:
156 if not stdout_reader.closed:
161 if not stdout_reader.closed:
157 stdout_reader.close()
162 stdout_reader.close()
158 if not stderr_reader.closed:
163 if not stderr_reader.closed:
159 stderr_reader.close()
164 stderr_reader.close()
160 sys.stdin.close()
165 sys.stdin.close()
161 debug_stream.write(b'SIGPIPE-HELPER: pipes closed\n')
166 debug_stream.write(b'SIGPIPE-HELPER: pipes closed\n')
162 debug_stream.write(b'SIGPIPE-HELPER: creating SYNCFILE2\n')
167 debug_stream.write(b'SIGPIPE-HELPER: creating SYNCFILE2\n')
163 write_file(sysbytes(SYNCFILE2))
168 write_file(sysbytes(SYNCFILE2))
164 finally:
169 finally:
165 debug_stream.write(b'SIGPIPE-HELPER: Shutting down\n')
170 debug_stream.write(b'SIGPIPE-HELPER: Shutting down\n')
166 shut_down.set()
171 shut_down.set()
167 if not sys.stdin.closed:
172 if not sys.stdin.closed:
168 sys.stdin.close()
173 sys.stdin.close()
169 try:
174 try:
170 sub.wait(timeout=30)
175 sub.wait(timeout=30)
171 except subprocess.TimeoutExpired:
176 except subprocess.TimeoutExpired:
172 msg = b'SIGPIPE-HELPER: Server process failed to terminate\n'
177 msg = b'SIGPIPE-HELPER: Server process failed to terminate\n'
173 debug_stream.write(msg)
178 debug_stream.write(msg)
174 else:
179 else:
175 debug_stream.write(b'SIGPIPE-HELPER: Server process terminated\n')
180 debug_stream.write(b'SIGPIPE-HELPER: Server process terminated\n')
176 debug_stream.write(b'SIGPIPE-HELPER: Shut down\n')
181 debug_stream.write(b'SIGPIPE-HELPER: Shut down\n')
General Comments 0
You need to be logged in to leave comments. Login now