##// END OF EJS Templates
sigpipe-remote: simply delegate pipe forwarding to subprocess we can kill...
marmoute -
r48418:27ff8154 default
parent child Browse files
Show More
@@ -0,0 +1,19
1 #!/usr/bin/env python3
2 #
3 # This is literally `cat` but in python, one char at a time.
4 #
5 # see sigpipe-remote.py for details.
6 from __future__ import print_function
7
8 import io
9 import os
10 import sys
11
12
13 if isinstance(sys.stdout.buffer, io.BufferedWriter):
14 print('SIGPIPE-WORKER: script need unbuffered output', file=sys.stderr)
15 sys.exit(255)
16
17 while True:
18 c = os.read(sys.stdin.fileno(), 1)
19 os.write(sys.stdout.fileno(), c)
@@ -26,37 +26,55 disconnecting. Then exit nonzero, to for
26 > [hooks]
26 > [hooks]
27 > pretxnchangegroup.00-break-things=sh "$RUNTESTDIR/testlib/wait-on-file" 10 "$SYNCFILE2" "$SYNCFILE1"
27 > pretxnchangegroup.00-break-things=sh "$RUNTESTDIR/testlib/wait-on-file" 10 "$SYNCFILE2" "$SYNCFILE1"
28 > pretxnchangegroup.01-output-things=echo "some remote output to be forward to the closed pipe"
28 > pretxnchangegroup.01-output-things=echo "some remote output to be forward to the closed pipe"
29 > pretxnchangegroup.02-output-things=echo "some more remote output"
29 > EOF
30 > EOF
30
31
31 $ hg --cwd ./remote tip -T '{node|short}\n'
32 $ hg --cwd ./remote tip -T '{node|short}\n'
32 000000000000
33 000000000000
33 $ cd local
34 $ cd local
34 $ echo foo > foo ; hg commit -qAm "commit"
35 $ echo foo > foo ; hg commit -qAm "commit"
35 $ hg push -e "\"$PYTHON\" \"$TESTDIR/dummyssh\"" --remotecmd "$remotecmd"
36
36 pushing to ssh://user@dummy/$TESTTMP/remote
37 (use quiet to avoid flacky output from the server)
37 searching for changes
38
38 remote: adding changesets (py3 !)
39 $ hg push --quiet -e "\"$PYTHON\" \"$TESTDIR/dummyssh\"" --remotecmd "$remotecmd"
39 remote: adding manifests (py3 !)
40 remote: adding file changes (py3 !)
41 remote: adding changesets (no-py3 no-chg !)
42 remote: adding manifests (no-py3 no-chg !)
43 remote: adding file changes (no-py3 no-chg !)
44 abort: stream ended unexpectedly (got 0 bytes, expected 4)
40 abort: stream ended unexpectedly (got 0 bytes, expected 4)
45 [255]
41 [255]
46 $ cat $SIGPIPE_REMOTE_DEBUG_FILE
42 $ cat $SIGPIPE_REMOTE_DEBUG_FILE
47 SIGPIPE-HELPER: Starting
43 SIGPIPE-HELPER: Starting
48 SIGPIPE-HELPER: Mercurial started
49 SIGPIPE-HELPER: Redirection in place
44 SIGPIPE-HELPER: Redirection in place
45 SIGPIPE-HELPER: pipes closed in main
50 SIGPIPE-HELPER: SYNCFILE1 detected
46 SIGPIPE-HELPER: SYNCFILE1 detected
51 SIGPIPE-HELPER: pipes closed
47 SIGPIPE-HELPER: worker killed
52 SIGPIPE-HELPER: creating SYNCFILE2
48 SIGPIPE-HELPER: creating SYNCFILE2
53 SIGPIPE-HELPER: Shutting down
49 SIGPIPE-HELPER: Shutting down
54 SIGPIPE-HELPER: Server process terminated
50 SIGPIPE-HELPER: Server process terminated with status 255 (no-windows !)
51 SIGPIPE-HELPER: Server process terminated with status 1 (windows !)
55 SIGPIPE-HELPER: Shut down
52 SIGPIPE-HELPER: Shut down
56
53
57 The remote should be left in a good state
54 The remote should be left in a good state
58 $ hg --cwd ../remote tip -T '{node|short}\n'
55 $ hg --cwd ../remote tip -T '{node|short}\n'
59 000000000000
56 000000000000
57
58 #if windows
59
60 XXX-Windows Broken behavior to be fixed
61
62 Behavior on Windows is broken and should be fixed. However this is a fairly
63 corner case situation and no data are being corrupted. This would affect
64 central repository being hosted on a Windows machine and accessed using ssh.
65
66 This was catch as we setup new CI for Windows. Making the test pass on Windows
67 was enough of a pain that fixing the behavior set aside for now. Dear and
68 honorable reader, feel free to fix it.
69
70 $ hg --cwd ../remote recover
71 rolling back interrupted transaction
72 (verify step skipped, run `hg verify` to check your repository content)
73
74 #else
75
60 $ hg --cwd ../remote recover
76 $ hg --cwd ../remote recover
61 no interrupted transaction available
77 no interrupted transaction available
62 [1]
78 [1]
79
80 #endif
@@ -5,7 +5,6 import io
5 import os
5 import os
6 import subprocess
6 import subprocess
7 import sys
7 import sys
8 import threading
9 import time
8 import time
10
9
11 # we cannot use mercurial.testing as long as python2 is not dropped as the test will only install the mercurial module for python2 in python2 run
10 # we cannot use mercurial.testing as long as python2 is not dropped as the test will only install the mercurial module for python2 in python2 run
@@ -68,14 +67,6 def sysstr(s):
68 return s.decode('latin-1')
67 return s.decode('latin-1')
69
68
70
69
71 piped_stdout = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
72 piped_stderr = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
73
74 stdout_writer = os.fdopen(piped_stdout[1], "rb")
75 stdout_reader = os.fdopen(piped_stdout[0], "rb")
76 stderr_writer = os.fdopen(piped_stderr[1], "rb")
77 stderr_reader = os.fdopen(piped_stderr[0], "rb")
78
79 debug_stream.write(b'SIGPIPE-HELPER: Starting\n')
70 debug_stream.write(b'SIGPIPE-HELPER: Starting\n')
80
71
81 TESTLIB_DIR = os.path.dirname(sys.argv[0])
72 TESTLIB_DIR = os.path.dirname(sys.argv[0])
@@ -88,67 +79,44 hooks_cmd %= (
88 SYNCFILE1,
79 SYNCFILE1,
89 )
80 )
90
81
91 cmd = ['hg']
82 try:
92 cmd += sys.argv[1:]
83 cmd = ['hg']
93 sub = subprocess.Popen(
84 cmd += sys.argv[1:]
94 cmd,
85 sub = subprocess.Popen(
95 bufsize=0,
86 cmd,
96 close_fds=True,
87 bufsize=0,
97 stdin=sys.stdin,
88 close_fds=True,
98 stdout=stdout_writer,
89 stdin=sys.stdin,
99 stderr=stderr_writer,
90 stdout=subprocess.PIPE,
100 )
91 stderr=subprocess.PIPE,
101
92 )
102 debug_stream.write(b'SIGPIPE-HELPER: Mercurial started\n')
103
93
104
94 basedir = os.path.dirname(sys.argv[0])
105 shut_down = threading.Event()
95 worker = os.path.join(basedir, 'sigpipe-worker.py')
106
107 close_lock = threading.Lock()
108
109
96
110 def _read(stream):
97 cmd = [sys.executable, worker]
111 try:
112 return stream.read()
113 except ValueError:
114 # read on closed file
115 return None
116
117
98
118 def forward_stdout():
99 stdout_worker = subprocess.Popen(
119 while not shut_down.is_set():
100 cmd,
120 c = _read(stdout_reader)
101 bufsize=0,
121 while c is not None:
102 close_fds=True,
122 sys.stdout.buffer.write(c)
103 stdin=sub.stdout,
123 c = _read(stdout_reader)
104 stdout=sys.stdout,
124 time.sleep(0.001)
105 stderr=sys.stderr,
125 with close_lock:
106 )
126 if not stdout_reader.closed:
127 stdout_reader.close()
128 debug_stream.write(b'SIGPIPE-HELPER: stdout closed\n')
129
130
107
131 def forward_stderr():
108 stderr_worker = subprocess.Popen(
132 while not shut_down.is_set():
109 cmd,
133 c = _read(stderr_reader)
110 bufsize=0,
134 if c is not None:
111 close_fds=True,
135 sys.stderr.buffer.write(c)
112 stdin=sub.stderr,
136 c = _read(stderr_reader)
113 stdout=sys.stderr,
137 time.sleep(0.001)
114 stderr=sys.stderr,
138 with close_lock:
115 )
139 if not stderr_reader.closed:
140 stderr_reader.close()
141 debug_stream.write(b'SIGPIPE-HELPER: stderr closed\n')
142
143
144 stdout_thread = threading.Thread(target=forward_stdout, daemon=True)
145 stderr_thread = threading.Thread(target=forward_stderr, daemon=True)
146
147 try:
148 stdout_thread.start()
149 stderr_thread.start()
150
151 debug_stream.write(b'SIGPIPE-HELPER: Redirection in place\n')
116 debug_stream.write(b'SIGPIPE-HELPER: Redirection in place\n')
117 os.close(sub.stdout.fileno())
118 os.close(sub.stderr.fileno())
119 debug_stream.write(b'SIGPIPE-HELPER: pipes closed in main\n')
152
120
153 try:
121 try:
154 wait_file(sysbytes(SYNCFILE1))
122 wait_file(sysbytes(SYNCFILE1))
@@ -157,18 +125,16 try:
157 debug_stream.write(b'SIGPIPE-HELPER: wait failed: %s\n' % msg)
125 debug_stream.write(b'SIGPIPE-HELPER: wait failed: %s\n' % msg)
158 else:
126 else:
159 debug_stream.write(b'SIGPIPE-HELPER: SYNCFILE1 detected\n')
127 debug_stream.write(b'SIGPIPE-HELPER: SYNCFILE1 detected\n')
160 with close_lock:
128 stdout_worker.kill()
161 if not stdout_reader.closed:
129 stderr_worker.kill()
162 stdout_reader.close()
130 stdout_worker.wait(10)
163 if not stderr_reader.closed:
131 stderr_worker.wait(10)
164 stderr_reader.close()
132 debug_stream.write(b'SIGPIPE-HELPER: worker killed\n')
165 sys.stdin.close()
133
166 debug_stream.write(b'SIGPIPE-HELPER: pipes closed\n')
167 debug_stream.write(b'SIGPIPE-HELPER: creating SYNCFILE2\n')
134 debug_stream.write(b'SIGPIPE-HELPER: creating SYNCFILE2\n')
168 write_file(sysbytes(SYNCFILE2))
135 write_file(sysbytes(SYNCFILE2))
169 finally:
136 finally:
170 debug_stream.write(b'SIGPIPE-HELPER: Shutting down\n')
137 debug_stream.write(b'SIGPIPE-HELPER: Shutting down\n')
171 shut_down.set()
172 if not sys.stdin.closed:
138 if not sys.stdin.closed:
173 sys.stdin.close()
139 sys.stdin.close()
174 try:
140 try:
@@ -176,6 +142,11 finally:
176 except subprocess.TimeoutExpired:
142 except subprocess.TimeoutExpired:
177 msg = b'SIGPIPE-HELPER: Server process failed to terminate\n'
143 msg = b'SIGPIPE-HELPER: Server process failed to terminate\n'
178 debug_stream.write(msg)
144 debug_stream.write(msg)
145 sub.kill()
146 sub.wait()
147 msg = b'SIGPIPE-HELPER: Server process killed\n'
179 else:
148 else:
180 debug_stream.write(b'SIGPIPE-HELPER: Server process terminated\n')
149 msg = b'SIGPIPE-HELPER: Server process terminated with status %d\n'
150 msg %= sub.returncode
151 debug_stream.write(msg)
181 debug_stream.write(b'SIGPIPE-HELPER: Shut down\n')
152 debug_stream.write(b'SIGPIPE-HELPER: Shut down\n')
General Comments 0
You need to be logged in to leave comments. Login now