##// END OF EJS Templates
test: use a python script in `test-transaction-rollback-on-sigpipe.t`...
test: use a Python script in `test-transaction-rollback-on-sigpipe.t`. This still does not work on Windows, but at least this is a Python script now. Differential Revision: https://phab.mercurial-scm.org/D10947

File last commit:

r48352:ed81f2be default
r48352:ed81f2be default
Show More
sigpipe-remote.py
176 lines | 4.6 KiB | text/x-python | PythonLexer
#!/usr/bin/env python3
from __future__ import print_function
import os
import subprocess
import sys
import threading
import time
# We cannot import mercurial.testing as long as Python 2 support is not dropped:
# a Python 2 test run only installs the mercurial module for Python 2.
def _timeout_factor():
    """Return the factor by which timeouts should be scaled.

    The baseline is read from ``$HGTEST_TIMEOUT_DEFAULT`` (360 when unset) and
    the configured value from ``$HGTEST_TIMEOUT`` (the baseline when unset).
    A configured value of 0 yields a factor of 1; otherwise the result is
    ``configured / baseline`` as a float.
    """
    env = os.environ
    baseline = int(env.get('HGTEST_TIMEOUT_DEFAULT', 360))
    configured = int(env.get('HGTEST_TIMEOUT', baseline))
    if configured != 0:
        return configured / float(baseline)
    return 1
def wait_file(path, timeout=10):
    """Block until `path` exists, polling every 10ms.

    `path` may be given as ``str`` or ``bytes``. `timeout` is expressed in
    seconds and is multiplied by `_timeout_factor()` before being used.

    Raises RuntimeError when the file has still not appeared once the scaled
    timeout has elapsed.
    """
    timeout *= _timeout_factor()
    start = time.time()
    while not os.path.exists(path):
        if (time.time() - start) > timeout:
            # Build a text message whatever the type of `path`: formatting a
            # bytes template with a str path raises TypeError, and a bytes
            # message shows up as a "b'...'" repr once passed through str().
            if isinstance(path, bytes):
                shown = path.decode('utf-8', 'replace')
            else:
                shown = path
            raise RuntimeError("timed out waiting for file: %s" % shown)
        time.sleep(0.01)
def write_file(path, content=b''):
    """Create (or truncate) the file at `path` and write the bytes `content`.

    With the default `content` this simply leaves an empty file behind.
    """
    with open(path, 'wb') as handle:
        handle.write(content)
# end of mercurial.testing content
# Refuse to run under a Python older than 3: report on stderr and exit with
# status 255.
if sys.version_info < (3,):
    print('SIGPIPE-HELPER: script should run with Python 3', file=sys.stderr)
    sys.exit(255)
def sysbytes(s):
    """Return the text string `s` encoded to bytes as UTF-8."""
    encoded = s.encode(encoding='utf-8')
    return encoded
def sysstr(s):
    """Return the bytes `s` decoded to text as latin-1 (one char per byte)."""
    decoded = s.decode(encoding='latin-1')
    return decoded
# One non-blocking, close-on-exec pipe per output stream of the child process.
# os.pipe2() returns a (read_fd, write_fd) pair.
piped_stdout = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
piped_stderr = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
# Wrap each end in a binary file object. The write ends are opened for
# writing ("wb"): they used to be opened "rb", which only worked because they
# are consumed through their file descriptor and never written to from here.
stdout_writer = os.fdopen(piped_stdout[1], "wb")
stdout_reader = os.fdopen(piped_stdout[0], "rb")
stderr_writer = os.fdopen(piped_stderr[1], "wb")
stderr_reader = os.fdopen(piped_stderr[0], "rb")
# Debug messages go to the file named by $SIGPIPE_REMOTE_DEBUG_FILE (opened
# unbuffered, in binary mode) or, when that variable is unset, to the binary
# layer of stderr.
DEBUG_FILE = os.environ.get('SIGPIPE_REMOTE_DEBUG_FILE')
debug_stream = (
    sys.stderr.buffer
    if DEBUG_FILE is None
    else open(DEBUG_FILE, 'bw', buffering=0)
)
# Both synchronisation file paths must be provided through the environment;
# exit with status 255 as soon as one of them is found missing.
SYNCFILE1 = os.environ.get('SYNCFILE1')
if SYNCFILE1 is None:
    print('SIGPIPE-HELPER: missing variable $SYNCFILE1', file=sys.stderr)
    sys.exit(255)

SYNCFILE2 = os.environ.get('SYNCFILE2')
if SYNCFILE2 is None:
    print('SIGPIPE-HELPER: missing variable $SYNCFILE2', file=sys.stderr)
    sys.exit(255)

debug_stream.write(b'SIGPIPE-HELPER: Starting\n')
# Command line invoking the `wait-on-file` script that sits next to this
# helper, with the arguments `10 $SYNCFILE2 $SYNCFILE1`.
# NOTE(review): `hooks_cmd` is not referenced anywhere else in the visible
# part of this script; confirm whether it is still needed.
TESTLIB_DIR = os.path.dirname(sys.argv[0])
WAIT_SCRIPT = os.path.join(TESTLIB_DIR, 'wait-on-file')
hooks_cmd = '%s 10 %s %s' % (WAIT_SCRIPT, SYNCFILE2, SYNCFILE1)
# Run `hg` with the arguments this helper received. The child inherits our
# stdin, while its stdout/stderr are attached to the write ends of the pipes
# so that the forwarding threads can relay them.
cmd = ['hg'] + sys.argv[1:]
sub = subprocess.Popen(
    cmd,
    stdin=sys.stdin,
    stdout=stdout_writer,
    stderr=stderr_writer,
    bufsize=0,
    close_fds=True,
)
debug_stream.write(b'SIGPIPE-HELPER: Mercurial started\n')

# Set once the helper is shutting down; polled by the forwarding threads.
shut_down = threading.Event()
# Held while checking and closing the pipe readers, which the main thread and
# the forwarding threads may all attempt.
close_lock = threading.Lock()
def _read(stream):
    """Return whatever `stream.read()` yields, or None if the read is refused.

    A ValueError (raised when reading from a closed file) is swallowed and
    reported as None.
    """
    try:
        data = stream.read()
    except ValueError:
        # read on closed file
        data = None
    return data
def forward_stdout():
    """Relay the child's stdout to our own stdout until shutdown is requested.

    Loops until `shut_down` is set. Each round forwards every chunk `_read`
    returns from `stdout_reader` until it returns None, then sleeps 1ms.
    On exit the reader is closed under `close_lock` (unless it already is) and
    a debug message is written.
    """
    while not shut_down.is_set():
        while True:
            chunk = _read(stdout_reader)
            if chunk is None:
                # nothing (more) to forward for now, or the reader is closed
                break
            sys.stdout.buffer.write(chunk)
        time.sleep(0.001)
    with close_lock:
        if not stdout_reader.closed:
            stdout_reader.close()
    debug_stream.write(b'SIGPIPE-HELPER: stdout closed\n')
def forward_stderr():
    """Relay the child's stderr to our own stderr until shutdown is requested.

    Loops until `shut_down` is set. Each round forwards every chunk `_read`
    returns from `stderr_reader` until it returns None, then sleeps 1ms.
    On exit the reader is closed under `close_lock` (unless it already is) and
    a debug message is written.
    """
    while not shut_down.is_set():
        c = _read(stderr_reader)
        # Forward every chunk that was read, exactly as forward_stdout() does.
        # The previous code issued a second read per round whose result was
        # overwritten before being written out, silently dropping that data.
        while c is not None:
            sys.stderr.buffer.write(c)
            c = _read(stderr_reader)
        time.sleep(0.001)
    with close_lock:
        if not stderr_reader.closed:
            stderr_reader.close()
    debug_stream.write(b'SIGPIPE-HELPER: stderr closed\n')
# Forwarding runs in daemon threads: the interpreter does not wait for them
# when the script exits.
stdout_thread = threading.Thread(target=forward_stdout, daemon=True)
stderr_thread = threading.Thread(target=forward_stderr, daemon=True)

try:
    for forwarder in (stdout_thread, stderr_thread):
        forwarder.start()
    debug_stream.write(b'SIGPIPE-HELPER: Redirection in place\n')

    # Wait for SYNCFILE1 to show up. A timeout is only logged; the sequence
    # below runs either way.
    try:
        wait_file(sysbytes(SYNCFILE1))
    except RuntimeError as exc:
        msg = sysbytes(str(exc))
        debug_stream.write(b'SIGPIPE-HELPER: wait failed: %s\n' % msg)
    else:
        debug_stream.write(b'SIGPIPE-HELPER: SYNCFILE1 detected\n')

    # Close our read ends of both pipes, then stdin, all under the lock shared
    # with the forwarding threads.
    with close_lock:
        for reader in (stdout_reader, stderr_reader):
            if not reader.closed:
                reader.close()
        sys.stdin.close()
    debug_stream.write(b'SIGPIPE-HELPER: pipes closed\n')

    # Only once the pipes are closed do we create SYNCFILE2.
    debug_stream.write(b'SIGPIPE-HELPER: creating SYNCFILE2\n')
    write_file(sysbytes(SYNCFILE2))
finally:
    debug_stream.write(b'SIGPIPE-HELPER: Shutting down\n')
    shut_down.set()
    if not sys.stdin.closed:
        sys.stdin.close()
    # Give the child up to 30 seconds to exit and log the outcome.
    # NOTE(review): on timeout the child is only reported, not killed.
    try:
        sub.wait(timeout=30)
    except subprocess.TimeoutExpired:
        msg = b'SIGPIPE-HELPER: Server process failed to terminate\n'
    else:
        msg = b'SIGPIPE-HELPER: Server process terminated\n'
    debug_stream.write(msg)
    debug_stream.write(b'SIGPIPE-HELPER: Shut down\n')