fix(svn-hooks): fix problem with svn subprocess execution, fixes RCCE-62
super-admin
r1220:99aaec7b v5.0.0 stable
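The change below works around mod_dav stripping the hook environment: install_svn_hooks now renders a new _OS_EXPAND_ placeholder into the generated pre-commit/post-commit hooks as a literal list of (name, path) tuples for the RhodeCode binaries, the hooks re-apply those values to os.environ on startup, and hooks.py gains _fix_hooks_executables() to pick them up before spawning svn/svnlook subprocesses. A minimal, standalone sketch of the substitution step; the paths are hypothetical examples, not RhodeCode defaults:

    # Render the _OS_EXPAND_ placeholder the same way install_svn_hooks does,
    # but against a toy template and made-up paths.
    hook_template = b"\n".join([
        b"#!/usr/bin/env python3",
        b"import os",
        b"for env_k, env_v in _OS_EXPAND_:",
        b"    os.environ[env_k] = env_v",
    ])

    env_expand = str([
        ('RC_CORE_BINARY_DIR', '/opt/rc/bin'),
        ('RC_SVN_EXECUTABLE', '/opt/rc/bin/svn'),
        ('RC_SVNLOOK_EXECUTABLE', '/opt/rc/bin/svnlook'),
    ])

    rendered = hook_template.replace(b'_OS_EXPAND_', env_expand.encode('utf-8'))
    print(rendered.decode('utf-8'))
    # The written hook now carries a literal list of tuples, so it can restore
    # those variables even when it is started with an empty environment.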
@@ -1,220 +1,230 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18
18 19 import re
19 20 import os
20 21 import sys
21 22 import datetime
22 23 import logging
23 24 import pkg_resources
24 25
25 26 import vcsserver
27 import vcsserver.settings
26 28 from vcsserver.str_utils import safe_bytes
27 29
28 30 log = logging.getLogger(__name__)
29 31
30 32 HOOKS_DIR_MODE = 0o755
31 33 HOOKS_FILE_MODE = 0o755
32 34
33 35
34 36 def set_permissions_if_needed(path_to_check, perms: oct):
35 37 # Get current permissions
36 38 current_permissions = os.stat(path_to_check).st_mode & 0o777 # Extract permission bits
37 39
38 40 # Check if current permissions are lower than required
39 41 if current_permissions < int(perms):
40 42 # Change the permissions if they are lower than required
41 43 os.chmod(path_to_check, perms)
42 44
43 45
44 46 def get_git_hooks_path(repo_path, bare):
45 47 hooks_path = os.path.join(repo_path, 'hooks')
46 48 if not bare:
47 49 hooks_path = os.path.join(repo_path, '.git', 'hooks')
48 50
49 51 return hooks_path
50 52
51 53
52 54 def install_git_hooks(repo_path, bare, executable=None, force_create=False):
53 55 """
54 56 Creates a RhodeCode hook inside a git repository
55 57
56 58 :param repo_path: path to repository
57 59 :param bare: defines if repository is considered a bare git repo
58 60 :param executable: binary executable to put in the hooks
59 61 :param force_create: Creates even if the same name hook exists
60 62 """
61 63 executable = executable or sys.executable
62 64 hooks_path = get_git_hooks_path(repo_path, bare)
63 65
64 66 # we always call it to ensure dir exists and it has a proper mode
65 67 if not os.path.exists(hooks_path):
66 68 # If it doesn't exist, create a new directory with the specified mode
67 69 os.makedirs(hooks_path, mode=HOOKS_DIR_MODE, exist_ok=True)
68 70 # If it exists, change the directory's mode to the specified mode
69 71 set_permissions_if_needed(hooks_path, perms=HOOKS_DIR_MODE)
70 72
71 73 tmpl_post = pkg_resources.resource_string(
72 74 'vcsserver', '/'.join(
73 75 ('hook_utils', 'hook_templates', 'git_post_receive.py.tmpl')))
74 76 tmpl_pre = pkg_resources.resource_string(
75 77 'vcsserver', '/'.join(
76 78 ('hook_utils', 'hook_templates', 'git_pre_receive.py.tmpl')))
77 79
78 80 path = '' # not used for now
79 81 timestamp = datetime.datetime.utcnow().isoformat()
80 82
81 83 for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]:
82 84 log.debug('Installing git hook in repo %s', repo_path)
83 85 _hook_file = os.path.join(hooks_path, f'{h_type}-receive')
84 86 _rhodecode_hook = check_rhodecode_hook(_hook_file)
85 87
86 88 if _rhodecode_hook or force_create:
87 89 log.debug('writing git %s hook file at %s !', h_type, _hook_file)
88 90 try:
89 91 with open(_hook_file, 'wb') as f:
90 92 template = template.replace(b'_TMPL_', safe_bytes(vcsserver.get_version()))
91 93 template = template.replace(b'_DATE_', safe_bytes(timestamp))
92 94 template = template.replace(b'_ENV_', safe_bytes(executable))
93 95 template = template.replace(b'_PATH_', safe_bytes(path))
94 96 f.write(template)
95 97 set_permissions_if_needed(_hook_file, perms=HOOKS_FILE_MODE)
96 98 except OSError:
97 99 log.exception('error writing hook file %s', _hook_file)
98 100 else:
99 101 log.debug('skipping writing hook file')
100 102
101 103 return True
102 104
103 105
104 106 def get_svn_hooks_path(repo_path):
105 107 hooks_path = os.path.join(repo_path, 'hooks')
106 108
107 109 return hooks_path
108 110
109 111
110 112 def install_svn_hooks(repo_path, executable=None, force_create=False):
111 113 """
112 114 Creates RhodeCode hooks inside a svn repository
113 115
114 116 :param repo_path: path to repository
115 117 :param executable: binary executable to put in the hooks
116 118 :param force_create: Create even if same name hook exists
117 119 """
118 120 executable = executable or sys.executable
119 121 hooks_path = get_svn_hooks_path(repo_path)
120 122 if not os.path.isdir(hooks_path):
121 123 os.makedirs(hooks_path, mode=0o777, exist_ok=True)
122 124
123 125 tmpl_post = pkg_resources.resource_string(
124 126 'vcsserver', '/'.join(
125 127 ('hook_utils', 'hook_templates', 'svn_post_commit_hook.py.tmpl')))
126 128 tmpl_pre = pkg_resources.resource_string(
127 129 'vcsserver', '/'.join(
128 130 ('hook_utils', 'hook_templates', 'svn_pre_commit_hook.py.tmpl')))
129 131
130 132 path = '' # not used for now
131 133 timestamp = datetime.datetime.utcnow().isoformat()
132 134
133 135 for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]:
134 136 log.debug('Installing svn hook in repo %s', repo_path)
135 137 _hook_file = os.path.join(hooks_path, f'{h_type}-commit')
136 138 _rhodecode_hook = check_rhodecode_hook(_hook_file)
137 139
138 140 if _rhodecode_hook or force_create:
139 141 log.debug('writing svn %s hook file at %s !', h_type, _hook_file)
140 142
143 env_expand = str([
144 ('RC_CORE_BINARY_DIR', vcsserver.settings.BINARY_DIR),
145 ('RC_GIT_EXECUTABLE', vcsserver.settings.GIT_EXECUTABLE),
146 ('RC_SVN_EXECUTABLE', vcsserver.settings.SVN_EXECUTABLE),
147 ('RC_SVNLOOK_EXECUTABLE', vcsserver.settings.SVNLOOK_EXECUTABLE),
148
149 ])
141 150 try:
142 151 with open(_hook_file, 'wb') as f:
143 152 template = template.replace(b'_TMPL_', safe_bytes(vcsserver.get_version()))
144 153 template = template.replace(b'_DATE_', safe_bytes(timestamp))
154 template = template.replace(b'_OS_EXPAND_', safe_bytes(env_expand))
145 155 template = template.replace(b'_ENV_', safe_bytes(executable))
146 156 template = template.replace(b'_PATH_', safe_bytes(path))
147 157
148 158 f.write(template)
149 159 os.chmod(_hook_file, 0o755)
150 160 except OSError:
151 161 log.exception('error writing hook file %s', _hook_file)
152 162 else:
153 163 log.debug('skipping writing hook file')
154 164
155 165 return True
156 166
157 167
158 168 def get_version_from_hook(hook_path):
159 169 version = b''
160 170 hook_content = read_hook_content(hook_path)
161 171 matches = re.search(rb'RC_HOOK_VER\s*=\s*(.*)', hook_content)
162 172 if matches:
163 173 try:
164 174 version = matches.groups()[0]
165 175 log.debug('got version %s from hooks.', version)
166 176 except Exception:
167 177 log.exception("Exception while reading the hook version.")
168 178 return version.replace(b"'", b"")
169 179
170 180
171 181 def check_rhodecode_hook(hook_path):
172 182 """
173 183 Check if the hook was created by RhodeCode
174 184 """
175 185 if not os.path.exists(hook_path):
176 186 return True
177 187
178 188 log.debug('hook exists, checking if it is from RhodeCode')
179 189
180 190 version = get_version_from_hook(hook_path)
181 191 if version:
182 192 return True
183 193
184 194 return False
185 195
186 196
187 197 def read_hook_content(hook_path) -> bytes:
188 198 content = b''
189 199 if os.path.isfile(hook_path):
190 200 with open(hook_path, 'rb') as f:
191 201 content = f.read()
192 202 return content
193 203
194 204
195 205 def get_git_pre_hook_version(repo_path, bare):
196 206 hooks_path = get_git_hooks_path(repo_path, bare)
197 207 _hook_file = os.path.join(hooks_path, 'pre-receive')
198 208 version = get_version_from_hook(_hook_file)
199 209 return version
200 210
201 211
202 212 def get_git_post_hook_version(repo_path, bare):
203 213 hooks_path = get_git_hooks_path(repo_path, bare)
204 214 _hook_file = os.path.join(hooks_path, 'post-receive')
205 215 version = get_version_from_hook(_hook_file)
206 216 return version
207 217
208 218
209 219 def get_svn_pre_hook_version(repo_path):
210 220 hooks_path = get_svn_hooks_path(repo_path)
211 221 _hook_file = os.path.join(hooks_path, 'pre-commit')
212 222 version = get_version_from_hook(_hook_file)
213 223 return version
214 224
215 225
216 226 def get_svn_post_hook_version(repo_path):
217 227 hooks_path = get_svn_hooks_path(repo_path)
218 228 _hook_file = os.path.join(hooks_path, 'post-commit')
219 229 version = get_version_from_hook(_hook_file)
220 230 return version
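Both installers only overwrite an existing hook when check_rhodecode_hook confirms it was generated by RhodeCode (or when force_create is set), and that check boils down to finding the RC_HOOK_VER marker embedded by the templates. A standalone illustration of the detection, using a hypothetical hook body:

    import re

    # Hypothetical hook body carrying the RC_HOOK_VER marker the templates embed.
    hook_content = b"#!/usr/bin/env python3\n# TIMESTAMP: 2024-01-01T00:00:00\nRC_HOOK_VER = '5.0.0'\n"

    match = re.search(rb"RC_HOOK_VER\s*=\s*(.*)", hook_content)
    version = match.groups()[0].replace(b"'", b"") if match else b''
    print(version)  # b'5.0.0' -> non-empty, so the file is ours and safe to rewrite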
@@ -1,50 +1,54 b''
1 1 #!_ENV_
2 2
3 3 import os
4 4 import sys
5 5 path_adjust = [_PATH_]
6 6
7 7 if path_adjust:
8 8 sys.path = path_adjust
9 9
10 10 try:
11 11 from vcsserver import hooks
12 12 except ImportError:
13 13 if os.environ.get('RC_DEBUG_SVN_HOOK'):
14 14 import traceback
15 15 print(traceback.format_exc())
16 16 hooks = None
17 17
18 18
19 19 # TIMESTAMP: _DATE_
20 20 RC_HOOK_VER = '_TMPL_'
21 21
22 22
23 # special trick to pass in some information from rc to hooks
24 # mod_dav strips ALL env vars and we can't even access things like PATH
25 for env_k, env_v in _OS_EXPAND_:
26 os.environ[env_k] = env_v
27
23 28 def main():
24 29 if hooks is None:
25 30 # exit with success if we cannot import vcsserver.hooks !!
26 31 # this allows simply push to this repo even without rhodecode
27 32 sys.exit(0)
28 33
29 34 if os.environ.get('RC_SKIP_HOOKS') or os.environ.get('RC_SKIP_SVN_HOOKS'):
30 35 sys.exit(0)
31 36 repo_path = os.getcwd()
32 37 push_data = sys.argv[1:]
33 38
34 39 os.environ['RC_HOOK_VER'] = RC_HOOK_VER
35 40
36 41 try:
37 42 result = hooks.svn_post_commit(repo_path, push_data, os.environ)
38 43 sys.exit(result)
39 44 except Exception as error:
40 45 # TODO: johbo: Improve handling of this special case
41 46 if not getattr(error, '_vcs_kind', None) == 'repo_locked':
42 47 raise
43 48 print(f'ERROR: {error}')
44 49 sys.exit(1)
45 50 sys.exit(0)
46 51
47 52
48
49 53 if __name__ == '__main__':
50 54 main()
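At runtime the rendered template re-applies the baked-in values before anything can shell out, which is the other half of the fix. A sketch of that restoration step, simulating the stripped environment mod_dav hands to hooks (paths are hypothetical):

    import os

    # Simulate the empty environment mod_dav gives to hooks, then restore the
    # baked-in values the way the rendered template does.
    os.environ.pop('RC_SVN_EXECUTABLE', None)

    baked_in = [  # stands in for the literal list substituted for _OS_EXPAND_
        ('RC_CORE_BINARY_DIR', '/opt/rc/bin'),
        ('RC_SVN_EXECUTABLE', '/opt/rc/bin/svn'),
        ('RC_SVNLOOK_EXECUTABLE', '/opt/rc/bin/svnlook'),
    ]
    for env_k, env_v in baked_in:
        os.environ[env_k] = env_v

    print(os.environ['RC_SVN_EXECUTABLE'])  # /opt/rc/bin/svn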
@@ -1,52 +1,58 b''
1 1 #!_ENV_
2 2
3 3 import os
4 4 import sys
5 5 path_adjust = [_PATH_]
6 6
7 7 if path_adjust:
8 8 sys.path = path_adjust
9 9
10 10 try:
11 11 from vcsserver import hooks
12 12 except ImportError:
13 13 if os.environ.get('RC_DEBUG_SVN_HOOK'):
14 14 import traceback
15 15 print(traceback.format_exc())
16 16 hooks = None
17 17
18 18
19 19 # TIMESTAMP: _DATE_
20 20 RC_HOOK_VER = '_TMPL_'
21 21
22 22
23 # special trick to pass in some information from rc to hooks
24 # mod_dav strips ALL env vars and we can't even access things like PATH
25 for env_k, env_v in _OS_EXPAND_:
26 os.environ[env_k] = env_v
27
23 28 def main():
24 29 if os.environ.get('SSH_READ_ONLY') == '1':
25 30 sys.stderr.write('Only read-only access is allowed')
26 31 sys.exit(1)
27 32
28 33 if hooks is None:
29 34 # exit with success if we cannot import vcsserver.hooks !!
30 35 # this allows simply push to this repo even without rhodecode
31 36 sys.exit(0)
37
32 38 if os.environ.get('RC_SKIP_HOOKS') or os.environ.get('RC_SKIP_SVN_HOOKS'):
33 39 sys.exit(0)
34 40 repo_path = os.getcwd()
35 41 push_data = sys.argv[1:]
36 42
37 43 os.environ['RC_HOOK_VER'] = RC_HOOK_VER
38 44
39 45 try:
40 46 result = hooks.svn_pre_commit(repo_path, push_data, os.environ)
41 47 sys.exit(result)
42 48 except Exception as error:
43 49 # TODO: johbo: Improve handling of this special case
44 50 if not getattr(error, '_vcs_kind', None) == 'repo_locked':
45 51 raise
46 52 print(f'ERROR: {error}')
47 53 sys.exit(1)
48 54 sys.exit(0)
49 55
50 56
51 57 if __name__ == '__main__':
52 58 main()
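Both templates also honor a few environment switches: RC_SKIP_HOOKS or RC_SKIP_SVN_HOOKS exits with success before calling RhodeCode, RC_DEBUG_SVN_HOOK makes a failed vcsserver import print its traceback, and the pre-commit hook additionally rejects pushes when SSH_READ_ONLY is set. A compact, standalone restatement of that guard logic; run_hook stands in for hooks.svn_pre_commit / hooks.svn_post_commit:

    import os
    import sys

    def guarded_main(run_hook, read_only_guard=False):
        # Sketch of the guards the SVN hook templates apply before delegating
        # to vcsserver.hooks.
        if read_only_guard and os.environ.get('SSH_READ_ONLY') == '1':
            sys.stderr.write('Only read-only access is allowed')
            sys.exit(1)
        if os.environ.get('RC_SKIP_HOOKS') or os.environ.get('RC_SKIP_SVN_HOOKS'):
            sys.exit(0)  # explicit opt-out, treated as success
        sys.exit(run_hook(os.getcwd(), sys.argv[1:], os.environ))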
@@ -1,795 +1,818 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import io
19 19 import os
20 20 import sys
21 21 import logging
22 22 import collections
23 23 import base64
24 24 import msgpack
25 25 import dataclasses
26 26 import pygit2
27 27
28 28 import http.client
29 29 from celery import Celery
30 30
31 31 import mercurial.scmutil
32 32 import mercurial.node
33 33
34 import vcsserver.settings
34 35 from vcsserver.lib.rc_json import json
35 36 from vcsserver import exceptions, subprocessio, settings
36 37 from vcsserver.str_utils import ascii_str, safe_str
37 38 from vcsserver.remote.git_remote import Repository
38 39
39 40 celery_app = Celery('__vcsserver__')
40 41 log = logging.getLogger(__name__)
41 42
42 43
43 44 class HooksHttpClient:
44 45 proto = 'msgpack.v1'
45 46 connection = None
46 47
47 48 def __init__(self, hooks_uri):
48 49 self.hooks_uri = hooks_uri
49 50
50 51 def __repr__(self):
51 52 return f'{self.__class__}(hook_uri={self.hooks_uri}, proto={self.proto})'
52 53
53 54 def __call__(self, method, extras):
54 55 connection = http.client.HTTPConnection(self.hooks_uri)
55 56 # binary msgpack body
56 57 headers, body = self._serialize(method, extras)
57 58 log.debug('Doing a new hooks call using HTTPConnection to %s', self.hooks_uri)
58 59
59 60 try:
60 61 try:
61 62 connection.request('POST', '/', body, headers)
62 63 except Exception as error:
63 64 log.error('Hooks calling Connection failed on %s, org error: %s', connection.__dict__, error)
64 65 raise
65 66
66 67 response = connection.getresponse()
67 68 try:
68 69 return msgpack.load(response)
69 70 except Exception:
70 71 response_data = response.read()
71 72 log.exception('Failed to decode hook response json data. '
72 73 'response_code:%s, raw_data:%s',
73 74 response.status, response_data)
74 75 raise
75 76 finally:
76 77 connection.close()
77 78
78 79 @classmethod
79 80 def _serialize(cls, hook_name, extras):
80 81 data = {
81 82 'method': hook_name,
82 83 'extras': extras
83 84 }
84 85 headers = {
85 86 "rc-hooks-protocol": cls.proto,
86 87 "Connection": "keep-alive"
87 88 }
88 89 return headers, msgpack.packb(data)
89 90
90 91
91 92 class HooksCeleryClient:
92 93 TASK_TIMEOUT = 60 # time in seconds
93 94
94 95 def __init__(self, queue, backend):
95 96 celery_app.config_from_object({
96 97 'broker_url': queue, 'result_backend': backend,
97 98 'broker_connection_retry_on_startup': True,
98 99 'task_serializer': 'msgpack',
99 100 'accept_content': ['json', 'msgpack'],
100 101 'result_serializer': 'msgpack',
101 102 'result_accept_content': ['json', 'msgpack']
102 103 })
103 104 self.celery_app = celery_app
104 105
105 106 def __call__(self, method, extras):
106 107 inquired_task = self.celery_app.signature(
107 108 f'rhodecode.lib.celerylib.tasks.{method}'
108 109 )
109 110 return inquired_task.delay(extras).get(timeout=self.TASK_TIMEOUT)
110 111
111 112
112 113 class HooksShadowRepoClient:
113 114
114 115 def __call__(self, hook_name, extras):
115 116 return {'output': '', 'status': 0}
116 117
117 118
118 119 class RemoteMessageWriter:
119 120 """Writer base class."""
120 121 def write(self, message):
121 122 raise NotImplementedError()
122 123
123 124
124 125 class HgMessageWriter(RemoteMessageWriter):
125 126 """Writer that knows how to send messages to mercurial clients."""
126 127
127 128 def __init__(self, ui):
128 129 self.ui = ui
129 130
130 131 def write(self, message: str):
131 132 # TODO: Check why the quiet flag is set by default.
132 133 old = self.ui.quiet
133 134 self.ui.quiet = False
134 135 self.ui.status(message.encode('utf-8'))
135 136 self.ui.quiet = old
136 137
137 138
138 139 class GitMessageWriter(RemoteMessageWriter):
139 140 """Writer that knows how to send messages to git clients."""
140 141
141 142 def __init__(self, stdout=None):
142 143 self.stdout = stdout or sys.stdout
143 144
144 145 def write(self, message: str):
145 146 self.stdout.write(message)
146 147
147 148
148 149 class SvnMessageWriter(RemoteMessageWriter):
149 150 """Writer that knows how to send messages to svn clients."""
150 151
151 152 def __init__(self, stderr=None):
152 153 # SVN needs data sent to stderr for back-to-client messaging
153 154 self.stderr = stderr or sys.stderr
154 155
155 156 def write(self, message):
156 self.stderr.write(message.encode('utf-8'))
157 self.stderr.write(message)
157 158
158 159
159 160 def _handle_exception(result):
160 161 exception_class = result.get('exception')
161 162 exception_traceback = result.get('exception_traceback')
162 163 log.debug('Handling hook-call exception: %s', exception_class)
163 164
164 165 if exception_traceback:
165 166 log.error('Got traceback from remote call:%s', exception_traceback)
166 167
167 168 if exception_class == 'HTTPLockedRC':
168 169 raise exceptions.RepositoryLockedException()(*result['exception_args'])
169 170 elif exception_class == 'HTTPBranchProtected':
170 171 raise exceptions.RepositoryBranchProtectedException()(*result['exception_args'])
171 172 elif exception_class == 'RepositoryError':
172 173 raise exceptions.VcsException()(*result['exception_args'])
173 174 elif exception_class:
174 175 raise Exception(
175 176 f"""Got remote exception "{exception_class}" with args "{result['exception_args']}" """
176 177 )
177 178
178 179
179 180 def _get_hooks_client(extras):
180 181 hooks_uri = extras.get('hooks_uri')
181 182 task_queue = extras.get('task_queue')
182 183 task_backend = extras.get('task_backend')
183 184 is_shadow_repo = extras.get('is_shadow_repo')
184 185
185 186 if hooks_uri:
186 187 return HooksHttpClient(hooks_uri)
187 188 elif task_queue and task_backend:
188 189 return HooksCeleryClient(task_queue, task_backend)
189 190 elif is_shadow_repo:
190 191 return HooksShadowRepoClient()
191 192 else:
192 193 raise Exception("Hooks client not found!")
193 194
194 195
195 196 def _call_hook(hook_name, extras, writer):
196 197 hooks_client = _get_hooks_client(extras)
197 198 log.debug('Hooks, using client:%s', hooks_client)
198 199 result = hooks_client(hook_name, extras)
199 200 log.debug('Hooks got result: %s', result)
200 201 _handle_exception(result)
201 202 writer.write(result['output'])
202 203
203 204 return result['status']
204 205
205 206
206 207 def _extras_from_ui(ui):
207 208 hook_data = ui.config(b'rhodecode', b'RC_SCM_DATA')
208 209 if not hook_data:
209 210 # maybe it's inside environ ?
210 211 env_hook_data = os.environ.get('RC_SCM_DATA')
211 212 if env_hook_data:
212 213 hook_data = env_hook_data
213 214
214 215 extras = {}
215 216 if hook_data:
216 217 extras = json.loads(hook_data)
217 218 return extras
218 219
219 220
220 221 def _rev_range_hash(repo, node, check_heads=False):
221 222 from vcsserver.hgcompat import get_ctx
222 223
223 224 commits = []
224 225 revs = []
225 226 start = get_ctx(repo, node).rev()
226 227 end = len(repo)
227 228 for rev in range(start, end):
228 229 revs.append(rev)
229 230 ctx = get_ctx(repo, rev)
230 231 commit_id = ascii_str(mercurial.node.hex(ctx.node()))
231 232 branch = safe_str(ctx.branch())
232 233 commits.append((commit_id, branch))
233 234
234 235 parent_heads = []
235 236 if check_heads:
236 237 parent_heads = _check_heads(repo, start, end, revs)
237 238 return commits, parent_heads
238 239
239 240
240 241 def _check_heads(repo, start, end, commits):
241 242 from vcsserver.hgcompat import get_ctx
242 243 changelog = repo.changelog
243 244 parents = set()
244 245
245 246 for new_rev in commits:
246 247 for p in changelog.parentrevs(new_rev):
247 248 if p == mercurial.node.nullrev:
248 249 continue
249 250 if p < start:
250 251 parents.add(p)
251 252
252 253 for p in parents:
253 254 branch = get_ctx(repo, p).branch()
254 255 # The heads descending from that parent, on the same branch
255 256 parent_heads = {p}
256 257 reachable = {p}
257 258 for x in range(p + 1, end):
258 259 if get_ctx(repo, x).branch() != branch:
259 260 continue
260 261 for pp in changelog.parentrevs(x):
261 262 if pp in reachable:
262 263 reachable.add(x)
263 264 parent_heads.discard(pp)
264 265 parent_heads.add(x)
265 266 # More than one head? Suggest merging
266 267 if len(parent_heads) > 1:
267 268 return list(parent_heads)
268 269
269 270 return []
270 271
271 272
272 273 def _get_git_env():
273 274 env = {}
274 275 for k, v in os.environ.items():
275 276 if k.startswith('GIT'):
276 277 env[k] = v
277 278
278 279 # serialized version
279 280 return [(k, v) for k, v in env.items()]
280 281
281 282
282 283 def _get_hg_env(old_rev, new_rev, txnid, repo_path):
283 284 env = {}
284 285 for k, v in os.environ.items():
285 286 if k.startswith('HG'):
286 287 env[k] = v
287 288
288 289 env['HG_NODE'] = old_rev
289 290 env['HG_NODE_LAST'] = new_rev
290 291 env['HG_TXNID'] = txnid
291 292 env['HG_PENDING'] = repo_path
292 293
293 294 return [(k, v) for k, v in env.items()]
294 295
295 296
297 def _fix_hooks_executables():
298 """
299 This is a trick to set proper settings.EXECUTABLE paths for certain execution patterns
300 especially for subversion where hooks strip entire env, and calling just 'svn' command will most likely fail
301 because svn is not on PATH
302 """
303 vcsserver.settings.BINARY_DIR = (
304 os.environ.get('RC_BINARY_DIR') or vcsserver.settings.BINARY_DIR)
305 vcsserver.settings.GIT_EXECUTABLE = (
306 os.environ.get('RC_GIT_EXECUTABLE') or vcsserver.settings.GIT_EXECUTABLE)
307 vcsserver.settings.SVN_EXECUTABLE = (
308 os.environ.get('RC_SVN_EXECUTABLE') or vcsserver.settings.SVN_EXECUTABLE)
309 vcsserver.settings.SVNLOOK_EXECUTABLE = (
310 os.environ.get('RC_SVNLOOK_EXECUTABLE') or vcsserver.settings.SVNLOOK_EXECUTABLE)
311
312
296 313 def repo_size(ui, repo, **kwargs):
297 314 extras = _extras_from_ui(ui)
298 315 return _call_hook('repo_size', extras, HgMessageWriter(ui))
299 316
300 317
301 318 def pre_pull(ui, repo, **kwargs):
302 319 extras = _extras_from_ui(ui)
303 320 return _call_hook('pre_pull', extras, HgMessageWriter(ui))
304 321
305 322
306 323 def pre_pull_ssh(ui, repo, **kwargs):
307 324 extras = _extras_from_ui(ui)
308 325 if extras and extras.get('SSH'):
309 326 return pre_pull(ui, repo, **kwargs)
310 327 return 0
311 328
312 329
313 330 def post_pull(ui, repo, **kwargs):
314 331 extras = _extras_from_ui(ui)
315 332 return _call_hook('post_pull', extras, HgMessageWriter(ui))
316 333
317 334
318 335 def post_pull_ssh(ui, repo, **kwargs):
319 336 extras = _extras_from_ui(ui)
320 337 if extras and extras.get('SSH'):
321 338 return post_pull(ui, repo, **kwargs)
322 339 return 0
323 340
324 341
325 342 def pre_push(ui, repo, node=None, **kwargs):
326 343 """
327 344 Mercurial pre_push hook
328 345 """
329 346 extras = _extras_from_ui(ui)
330 347 detect_force_push = extras.get('detect_force_push')
331 348
332 349 rev_data = []
333 350 hook_type: str = safe_str(kwargs.get('hooktype'))
334 351
335 352 if node and hook_type == 'pretxnchangegroup':
336 353 branches = collections.defaultdict(list)
337 354 commits, _heads = _rev_range_hash(repo, node, check_heads=detect_force_push)
338 355 for commit_id, branch in commits:
339 356 branches[branch].append(commit_id)
340 357
341 358 for branch, commits in branches.items():
342 359 old_rev = ascii_str(kwargs.get('node_last')) or commits[0]
343 360 rev_data.append({
344 361 'total_commits': len(commits),
345 362 'old_rev': old_rev,
346 363 'new_rev': commits[-1],
347 364 'ref': '',
348 365 'type': 'branch',
349 366 'name': branch,
350 367 })
351 368
352 369 for push_ref in rev_data:
353 370 push_ref['multiple_heads'] = _heads
354 371
355 372 repo_path = os.path.join(
356 373 extras.get('repo_store', ''), extras.get('repository', ''))
357 374 push_ref['hg_env'] = _get_hg_env(
358 375 old_rev=push_ref['old_rev'],
359 376 new_rev=push_ref['new_rev'], txnid=ascii_str(kwargs.get('txnid')),
360 377 repo_path=repo_path)
361 378
362 379 extras['hook_type'] = hook_type or 'pre_push'
363 380 extras['commit_ids'] = rev_data
364 381
365 382 return _call_hook('pre_push', extras, HgMessageWriter(ui))
366 383
367 384
368 385 def pre_push_ssh(ui, repo, node=None, **kwargs):
369 386 extras = _extras_from_ui(ui)
370 387 if extras.get('SSH'):
371 388 return pre_push(ui, repo, node, **kwargs)
372 389
373 390 return 0
374 391
375 392
376 393 def pre_push_ssh_auth(ui, repo, node=None, **kwargs):
377 394 """
378 395 Mercurial pre_push hook for SSH
379 396 """
380 397 extras = _extras_from_ui(ui)
381 398 if extras.get('SSH'):
382 399 permission = extras['SSH_PERMISSIONS']
383 400
384 401 if 'repository.write' == permission or 'repository.admin' == permission:
385 402 return 0
386 403
387 404 # non-zero ret code
388 405 return 1
389 406
390 407 return 0
391 408
392 409
393 410 def post_push(ui, repo, node, **kwargs):
394 411 """
395 412 Mercurial post_push hook
396 413 """
397 414 extras = _extras_from_ui(ui)
398 415
399 416 commit_ids = []
400 417 branches = []
401 418 bookmarks = []
402 419 tags = []
403 420 hook_type: str = safe_str(kwargs.get('hooktype'))
404 421
405 422 commits, _heads = _rev_range_hash(repo, node)
406 423 for commit_id, branch in commits:
407 424 commit_ids.append(commit_id)
408 425 if branch not in branches:
409 426 branches.append(branch)
410 427
411 428 if hasattr(ui, '_rc_pushkey_bookmarks'):
412 429 bookmarks = ui._rc_pushkey_bookmarks
413 430
414 431 extras['hook_type'] = hook_type or 'post_push'
415 432 extras['commit_ids'] = commit_ids
416 433
417 434 extras['new_refs'] = {
418 435 'branches': branches,
419 436 'bookmarks': bookmarks,
420 437 'tags': tags
421 438 }
422 439
423 440 return _call_hook('post_push', extras, HgMessageWriter(ui))
424 441
425 442
426 443 def post_push_ssh(ui, repo, node, **kwargs):
427 444 """
428 445 Mercurial post_push hook for SSH
429 446 """
430 447 if _extras_from_ui(ui).get('SSH'):
431 448 return post_push(ui, repo, node, **kwargs)
432 449 return 0
433 450
434 451
435 452 def key_push(ui, repo, **kwargs):
436 453 from vcsserver.hgcompat import get_ctx
437 454
438 455 if kwargs['new'] != b'0' and kwargs['namespace'] == b'bookmarks':
439 456 # store new bookmarks in our UI object propagated later to post_push
440 457 ui._rc_pushkey_bookmarks = get_ctx(repo, kwargs['key']).bookmarks()
441 458 return
442 459
443 460
444 461 # backward compat
445 462 log_pull_action = post_pull
446 463
447 464 # backward compat
448 465 log_push_action = post_push
449 466
450 467
451 468 def handle_git_pre_receive(unused_repo_path, unused_revs, unused_env):
452 469 """
453 470 Old hook name: keep here for backward compatibility.
454 471
455 472 This is only required when the installed git hooks are not upgraded.
456 473 """
457 474 pass
458 475
459 476
460 477 def handle_git_post_receive(unused_repo_path, unused_revs, unused_env):
461 478 """
462 479 Old hook name: keep here for backward compatibility.
463 480
464 481 This is only required when the installed git hooks are not upgraded.
465 482 """
466 483 pass
467 484
468 485
469 486 @dataclasses.dataclass
470 487 class HookResponse:
471 488 status: int
472 489 output: str
473 490
474 491
475 492 def git_pre_pull(extras) -> HookResponse:
476 493 """
477 494 Pre pull hook.
478 495
479 496 :param extras: dictionary containing the keys defined in simplevcs
480 497 :type extras: dict
481 498
482 499 :return: status code of the hook. 0 for success.
483 500 :rtype: int
484 501 """
485 502
486 503 if 'pull' not in extras['hooks']:
487 504 return HookResponse(0, '')
488 505
489 506 stdout = io.StringIO()
490 507 try:
491 508 status_code = _call_hook('pre_pull', extras, GitMessageWriter(stdout))
492 509
493 510 except Exception as error:
494 511 log.exception('Failed to call pre_pull hook')
495 512 status_code = 128
496 513 stdout.write(f'ERROR: {error}\n')
497 514
498 515 return HookResponse(status_code, stdout.getvalue())
499 516
500 517
501 518 def git_post_pull(extras) -> HookResponse:
502 519 """
503 520 Post pull hook.
504 521
505 522 :param extras: dictionary containing the keys defined in simplevcs
506 523 :type extras: dict
507 524
508 525 :return: status code of the hook. 0 for success.
509 526 :rtype: int
510 527 """
511 528 if 'pull' not in extras['hooks']:
512 529 return HookResponse(0, '')
513 530
514 531 stdout = io.StringIO()
515 532 try:
516 533 status = _call_hook('post_pull', extras, GitMessageWriter(stdout))
517 534 except Exception as error:
518 535 status = 128
519 536 stdout.write(f'ERROR: {error}\n')
520 537
521 538 return HookResponse(status, stdout.getvalue())
522 539
523 540
524 541 def _parse_git_ref_lines(revision_lines):
525 542 rev_data = []
526 543 for revision_line in revision_lines or []:
527 544 old_rev, new_rev, ref = revision_line.strip().split(' ')
528 545 ref_data = ref.split('/', 2)
529 546 if ref_data[1] in ('tags', 'heads'):
530 547 rev_data.append({
531 548 # NOTE(marcink):
532 549 # we're unable to tell total_commits for git at this point
533 550 # but we set the variable for consistency with GIT
534 551 'total_commits': -1,
535 552 'old_rev': old_rev,
536 553 'new_rev': new_rev,
537 554 'ref': ref,
538 555 'type': ref_data[1],
539 556 'name': ref_data[2],
540 557 })
541 558 return rev_data
542 559
543 560
544 561 def git_pre_receive(unused_repo_path, revision_lines, env) -> int:
545 562 """
546 563 Pre push hook.
547 564
548 565 :return: status code of the hook. 0 for success.
549 566 """
550 567 extras = json.loads(env['RC_SCM_DATA'])
551 568 rev_data = _parse_git_ref_lines(revision_lines)
552 569 if 'push' not in extras['hooks']:
553 570 return 0
554 571 empty_commit_id = '0' * 40
555 572
556 573 detect_force_push = extras.get('detect_force_push')
557
574 _fix_hooks_executables()
558 575 for push_ref in rev_data:
559 576 # store our git-env which holds the temp store
560 577 push_ref['git_env'] = _get_git_env()
561 578 push_ref['pruned_sha'] = ''
562 579 if not detect_force_push:
563 580 # don't check for forced-push when we don't need to
564 581 continue
565 582
566 583 type_ = push_ref['type']
567 584 new_branch = push_ref['old_rev'] == empty_commit_id
568 585 delete_branch = push_ref['new_rev'] == empty_commit_id
569 586 if type_ == 'heads' and not (new_branch or delete_branch):
570 587 old_rev = push_ref['old_rev']
571 588 new_rev = push_ref['new_rev']
572 589 cmd = [settings.GIT_EXECUTABLE, 'rev-list', old_rev, f'^{new_rev}']
573 590 stdout, stderr = subprocessio.run_command(
574 591 cmd, env=os.environ.copy())
575 592 # means we're having some non-reachable objects, this forced push was used
576 593 if stdout:
577 594 push_ref['pruned_sha'] = stdout.splitlines()
578 595
579 596 extras['hook_type'] = 'pre_receive'
580 597 extras['commit_ids'] = rev_data
581 598
582 599 stdout = sys.stdout
583 600 status_code = _call_hook('pre_push', extras, GitMessageWriter(stdout))
584 601
585 602 return status_code
586 603
587 604
588 605 def git_post_receive(unused_repo_path, revision_lines, env) -> int:
589 606 """
590 607 Post push hook.
591 608
592 609 :return: status code of the hook. 0 for success.
593 610 """
594 611 extras = json.loads(env['RC_SCM_DATA'])
595 612 if 'push' not in extras['hooks']:
596 613 return 0
614 _fix_hooks_executables()
597 615
598 616 rev_data = _parse_git_ref_lines(revision_lines)
599 617
600 618 git_revs = []
601 619
602 620 # N.B.(skreft): it is ok to just call git, as git before calling a
603 621 # subcommand sets the PATH environment variable so that it point to the
604 622 # correct version of the git executable.
605 623 empty_commit_id = '0' * 40
606 624 branches = []
607 625 tags = []
608 626 for push_ref in rev_data:
609 627 type_ = push_ref['type']
610 628
611 629 if type_ == 'heads':
612 630 # starting new branch case
613 631 if push_ref['old_rev'] == empty_commit_id:
614 632 push_ref_name = push_ref['name']
615 633
616 634 if push_ref_name not in branches:
617 635 branches.append(push_ref_name)
618 636
619 637 need_head_set = ''
620 638 with Repository(os.getcwd()) as repo:
621 639 try:
622 640 repo.head
623 641 except pygit2.GitError:
624 642 need_head_set = f'refs/heads/{push_ref_name}'
625 643
626 644 if need_head_set:
627 645 repo.set_head(need_head_set)
628 646 print(f"Setting default branch to {push_ref_name}")
629 647
630 648 cmd = [settings.GIT_EXECUTABLE, 'for-each-ref', '--format=%(refname)', 'refs/heads/*']
631 649 stdout, stderr = subprocessio.run_command(
632 650 cmd, env=os.environ.copy())
633 651 heads = safe_str(stdout)
634 652 heads = heads.replace(push_ref['ref'], '')
635 653 heads = ' '.join(head for head
636 654 in heads.splitlines() if head) or '.'
637 655 cmd = [settings.GIT_EXECUTABLE, 'log', '--reverse',
638 656 '--pretty=format:%H', '--', push_ref['new_rev'],
639 657 '--not', heads]
640 658 stdout, stderr = subprocessio.run_command(
641 659 cmd, env=os.environ.copy())
642 660 git_revs.extend(list(map(ascii_str, stdout.splitlines())))
643 661
644 662 # delete branch case
645 663 elif push_ref['new_rev'] == empty_commit_id:
646 664 git_revs.append(f'delete_branch=>{push_ref["name"]}')
647 665 else:
648 666 if push_ref['name'] not in branches:
649 667 branches.append(push_ref['name'])
650 668
651 669 cmd = [settings.GIT_EXECUTABLE, 'log',
652 670 f'{push_ref["old_rev"]}..{push_ref["new_rev"]}',
653 671 '--reverse', '--pretty=format:%H']
654 672 stdout, stderr = subprocessio.run_command(
655 673 cmd, env=os.environ.copy())
656 674 # we get bytes from stdout, we need str to be consistent
657 675 log_revs = list(map(ascii_str, stdout.splitlines()))
658 676 git_revs.extend(log_revs)
659 677
660 678 # Pure pygit2 impl. but still 2-3x slower :/
661 679 # results = []
662 680 #
663 681 # with Repository(os.getcwd()) as repo:
664 682 # repo_new_rev = repo[push_ref['new_rev']]
665 683 # repo_old_rev = repo[push_ref['old_rev']]
666 684 # walker = repo.walk(repo_new_rev.id, pygit2.GIT_SORT_TOPOLOGICAL)
667 685 #
668 686 # for commit in walker:
669 687 # if commit.id == repo_old_rev.id:
670 688 # break
671 689 # results.append(commit.id.hex)
672 690 # # reverse the order, can't use GIT_SORT_REVERSE
673 691 # log_revs = results[::-1]
674 692
675 693 elif type_ == 'tags':
676 694 if push_ref['name'] not in tags:
677 695 tags.append(push_ref['name'])
678 696 git_revs.append(f'tag=>{push_ref["name"]}')
679 697
680 698 extras['hook_type'] = 'post_receive'
681 699 extras['commit_ids'] = git_revs
682 700 extras['new_refs'] = {
683 701 'branches': branches,
684 702 'bookmarks': [],
685 703 'tags': tags,
686 704 }
687 705
688 706 stdout = sys.stdout
689 707
690 708 if 'repo_size' in extras['hooks']:
691 709 try:
692 710 _call_hook('repo_size', extras, GitMessageWriter(stdout))
693 711 except Exception:
694 712 pass
695 713
696 714 status_code = _call_hook('post_push', extras, GitMessageWriter(stdout))
697 715 return status_code
698 716
699 717
700 718 def _get_extras_from_txn_id(path, txn_id):
701 719 extras = {}
702 720 try:
703 721 cmd = [settings.SVNLOOK_EXECUTABLE, 'pget',
704 722 '-t', txn_id,
705 723 '--revprop', path, 'rc-scm-extras']
706 724 stdout, stderr = subprocessio.run_command(
707 725 cmd, env=os.environ.copy())
708 726 extras = json.loads(base64.urlsafe_b64decode(stdout))
709 727 except Exception:
710 728 log.exception('Failed to extract extras info from txn_id')
711 729
712 730 return extras
713 731
714 732
715 733 def _get_extras_from_commit_id(commit_id, path):
716 734 extras = {}
717 735 try:
718 736 cmd = [settings.SVNLOOK_EXECUTABLE, 'pget',
719 737 '-r', commit_id,
720 738 '--revprop', path, 'rc-scm-extras']
721 739 stdout, stderr = subprocessio.run_command(
722 740 cmd, env=os.environ.copy())
723 741 extras = json.loads(base64.urlsafe_b64decode(stdout))
724 742 except Exception:
725 743 log.exception('Failed to extract extras info from commit_id')
726 744
727 745 return extras
728 746
729 747
730 748 def svn_pre_commit(repo_path, commit_data, env):
731 749 path, txn_id = commit_data
732 750 branches = []
733 751 tags = []
734 752
753 _fix_hooks_executables()
735 754 if env.get('RC_SCM_DATA'):
736 755 extras = json.loads(env['RC_SCM_DATA'])
737 756 else:
738 757 # fallback method to read from TXN-ID stored data
739 758 extras = _get_extras_from_txn_id(path, txn_id)
740 759 if not extras:
741 760 return 0
742 761
743 762 extras['hook_type'] = 'pre_commit'
744 763 extras['commit_ids'] = [txn_id]
745 764 extras['txn_id'] = txn_id
746 765 extras['new_refs'] = {
747 766 'total_commits': 1,
748 767 'branches': branches,
749 768 'bookmarks': [],
750 769 'tags': tags,
751 770 }
752 771
753 772 return _call_hook('pre_push', extras, SvnMessageWriter())
754 773
755 774
756 775 def svn_post_commit(repo_path, commit_data, env):
757 776 """
758 777 commit_data is path, rev, txn_id
759 778 """
779
760 780 if len(commit_data) == 3:
761 781 path, commit_id, txn_id = commit_data
762 782 elif len(commit_data) == 2:
763 783 log.error('Failed to extract txn_id from commit_data using legacy method. '
764 784 'Some functionality might be limited')
765 785 path, commit_id = commit_data
766 786 txn_id = None
787 else:
788 return 0
767 789
768 790 branches = []
769 791 tags = []
770 792
793 _fix_hooks_executables()
771 794 if env.get('RC_SCM_DATA'):
772 795 extras = json.loads(env['RC_SCM_DATA'])
773 796 else:
774 797 # fallback method to read from TXN-ID stored data
775 798 extras = _get_extras_from_commit_id(commit_id, path)
776 799 if not extras:
777 800 return 0
778 801
779 802 extras['hook_type'] = 'post_commit'
780 803 extras['commit_ids'] = [commit_id]
781 804 extras['txn_id'] = txn_id
782 805 extras['new_refs'] = {
783 806 'branches': branches,
784 807 'bookmarks': [],
785 808 'tags': tags,
786 809 'total_commits': 1,
787 810 }
788 811
789 812 if 'repo_size' in extras['hooks']:
790 813 try:
791 814 _call_hook('repo_size', extras, SvnMessageWriter())
792 815 except Exception:
793 816 pass
794 817
795 818 return _call_hook('post_push', extras, SvnMessageWriter())
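On the hooks.py side, _fix_hooks_executables() is now called from the git and svn entry points before any subprocess is spawned; it simply prefers the RC_*_EXECUTABLE values injected by the hook templates over the module defaults, so the code never has to look up a bare 'svn' on an empty PATH. A sketch of that resolution order with hypothetical stand-ins for the settings module:

    import os

    # Hypothetical defaults standing in for vcsserver.settings values.
    DEFAULT_SVN_EXECUTABLE = 'svn'
    DEFAULT_SVNLOOK_EXECUTABLE = 'svnlook'

    def resolve_executables():
        # Prefer what the hook template injected into the environment and fall
        # back to the defaults when running outside an SVN hook.
        svn = os.environ.get('RC_SVN_EXECUTABLE') or DEFAULT_SVN_EXECUTABLE
        svnlook = os.environ.get('RC_SVNLOOK_EXECUTABLE') or DEFAULT_SVNLOOK_EXECUTABLE
        return svn, svnlook

    print(resolve_executables())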
@@ -1,775 +1,777 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import io
19 19 import os
20 20 import platform
21 21 import sys
22 22 import locale
23 23 import logging
24 24 import uuid
25 25 import time
26 26 import wsgiref.util
27 27 import tempfile
28 28 import psutil
29 29
30 30 from itertools import chain
31 31
32 32 import msgpack
33 33 import configparser
34 34
35 35 from pyramid.config import Configurator
36 36 from pyramid.wsgi import wsgiapp
37 37 from pyramid.response import Response
38 38
39 39 from vcsserver.base import BytesEnvelope, BinaryEnvelope
40 40 from vcsserver.lib.rc_json import json
41 41 from vcsserver.config.settings_maker import SettingsMaker
42 42 from vcsserver.str_utils import safe_int
43 43 from vcsserver.lib.statsd_client import StatsdClient
44 44 from vcsserver.tweens.request_wrapper import get_headers_call_context
45 45
46 46 import vcsserver
47 47 from vcsserver import remote_wsgi, scm_app, settings, hgpatches
48 48 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
49 49 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
50 50 from vcsserver.echo_stub.echo_app import EchoApp
51 51 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
52 52 from vcsserver.lib.exc_tracking import store_exception, format_exc
53 53 from vcsserver.server import VcsServer
54 54
55 55 strict_vcs = True
56 56
57 57 git_import_err = None
58 58 try:
59 59 from vcsserver.remote.git_remote import GitFactory, GitRemote
60 60 except ImportError as e:
61 61 GitFactory = None
62 62 GitRemote = None
63 63 git_import_err = e
64 64 if strict_vcs:
65 65 raise
66 66
67 67
68 68 hg_import_err = None
69 69 try:
70 70 from vcsserver.remote.hg_remote import MercurialFactory, HgRemote
71 71 except ImportError as e:
72 72 MercurialFactory = None
73 73 HgRemote = None
74 74 hg_import_err = e
75 75 if strict_vcs:
76 76 raise
77 77
78 78
79 79 svn_import_err = None
80 80 try:
81 81 from vcsserver.remote.svn_remote import SubversionFactory, SvnRemote
82 82 except ImportError as e:
83 83 SubversionFactory = None
84 84 SvnRemote = None
85 85 svn_import_err = e
86 86 if strict_vcs:
87 87 raise
88 88
89 89 log = logging.getLogger(__name__)
90 90
91 91 # due to Mercurial/glibc2.27 problems we need to detect if locale settings are
92 92 # causing problems and "fix" it in case they do and fallback to LC_ALL = C
93 93
94 94 try:
95 95 locale.setlocale(locale.LC_ALL, '')
96 96 except locale.Error as e:
97 97 log.error(
98 98 'LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
99 99 os.environ['LC_ALL'] = 'C'
100 100
101 101
102 102 def _is_request_chunked(environ):
103 103 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
104 104 return stream
105 105
106 106
107 107 def log_max_fd():
108 108 try:
109 109 maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)[1]
110 110 log.info('Max file descriptors value: %s', maxfd)
111 111 except Exception:
112 112 pass
113 113
114 114
115 115 class VCS:
116 116 def __init__(self, locale_conf=None, cache_config=None):
117 117 self.locale = locale_conf
118 118 self.cache_config = cache_config
119 119 self._configure_locale()
120 120
121 121 log_max_fd()
122 122
123 123 if GitFactory and GitRemote:
124 124 git_factory = GitFactory()
125 125 self._git_remote = GitRemote(git_factory)
126 126 else:
127 127 log.error("Git client import failed: %s", git_import_err)
128 128
129 129 if MercurialFactory and HgRemote:
130 130 hg_factory = MercurialFactory()
131 131 self._hg_remote = HgRemote(hg_factory)
132 132 else:
133 133 log.error("Mercurial client import failed: %s", hg_import_err)
134 134
135 135 if SubversionFactory and SvnRemote:
136 136 svn_factory = SubversionFactory()
137 137
138 138 # hg factory is used for svn url validation
139 139 hg_factory = MercurialFactory()
140 140 self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
141 141 else:
142 142 log.error("Subversion client import failed: %s", svn_import_err)
143 143
144 144 self._vcsserver = VcsServer()
145 145
146 146 def _configure_locale(self):
147 147 if self.locale:
148 148 log.info('Settings locale: `LC_ALL` to %s', self.locale)
149 149 else:
150 150 log.info('Configuring locale subsystem based on environment variables')
151 151 try:
152 152 # If self.locale is the empty string, then the locale
153 153 # module will use the environment variables. See the
154 154 # documentation of the package `locale`.
155 155 locale.setlocale(locale.LC_ALL, self.locale)
156 156
157 157 language_code, encoding = locale.getlocale()
158 158 log.info(
159 159 'Locale set to language code "%s" with encoding "%s".',
160 160 language_code, encoding)
161 161 except locale.Error:
162 162 log.exception('Cannot set locale, not configuring the locale system')
163 163
164 164
165 165 class WsgiProxy:
166 166 def __init__(self, wsgi):
167 167 self.wsgi = wsgi
168 168
169 169 def __call__(self, environ, start_response):
170 170 input_data = environ['wsgi.input'].read()
171 171 input_data = msgpack.unpackb(input_data)
172 172
173 173 error = None
174 174 try:
175 175 data, status, headers = self.wsgi.handle(
176 176 input_data['environment'], input_data['input_data'],
177 177 *input_data['args'], **input_data['kwargs'])
178 178 except Exception as e:
179 179 data, status, headers = [], None, None
180 180 error = {
181 181 'message': str(e),
182 182 '_vcs_kind': getattr(e, '_vcs_kind', None)
183 183 }
184 184
185 185 start_response(200, {})
186 186 return self._iterator(error, status, headers, data)
187 187
188 188 def _iterator(self, error, status, headers, data):
189 189 initial_data = [
190 190 error,
191 191 status,
192 192 headers,
193 193 ]
194 194
195 195 for d in chain(initial_data, data):
196 196 yield msgpack.packb(d)
197 197
198 198
199 199 def not_found(request):
200 200 return {'status': '404 NOT FOUND'}
201 201
202 202
203 203 class VCSViewPredicate:
204 204 def __init__(self, val, config):
205 205 self.remotes = val
206 206
207 207 def text(self):
208 208 return f'vcs view method = {list(self.remotes.keys())}'
209 209
210 210 phash = text
211 211
212 212 def __call__(self, context, request):
213 213 """
214 214 View predicate that returns true if given backend is supported by
215 215 defined remotes.
216 216 """
217 217 backend = request.matchdict.get('backend')
218 218 return backend in self.remotes
219 219
220 220
221 221 class HTTPApplication:
222 222 ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')
223 223
224 224 remote_wsgi = remote_wsgi
225 225 _use_echo_app = False
226 226
227 227 def __init__(self, settings=None, global_config=None):
228 228
229 229 self.config = Configurator(settings=settings)
230 230 # Init our statsd at very start
231 231 self.config.registry.statsd = StatsdClient.statsd
232 232 self.config.registry.vcs_call_context = {}
233 233
234 234 self.global_config = global_config
235 235 self.config.include('vcsserver.lib.rc_cache')
236 236 self.config.include('vcsserver.lib.rc_cache.archive_cache')
237 237
238 238 settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
239 239 vcs = VCS(locale_conf=settings_locale, cache_config=settings)
240 240 self._remotes = {
241 241 'hg': vcs._hg_remote,
242 242 'git': vcs._git_remote,
243 243 'svn': vcs._svn_remote,
244 244 'server': vcs._vcsserver,
245 245 }
246 246 if settings.get('dev.use_echo_app', 'false').lower() == 'true':
247 247 self._use_echo_app = True
248 248 log.warning("Using EchoApp for VCS operations.")
249 249 self.remote_wsgi = remote_wsgi_stub
250 250
251 251 self._configure_settings(global_config, settings)
252 252
253 253 self._configure()
254 254
255 255 def _configure_settings(self, global_config, app_settings):
256 256 """
257 257 Configure the settings module.
258 258 """
259 259 settings_merged = global_config.copy()
260 260 settings_merged.update(app_settings)
261 261
262 git_path = app_settings.get('git_path', None)
263 if git_path:
264 settings.GIT_EXECUTABLE = git_path
265 binary_dir = app_settings.get('core.binary_dir', None)
266 if binary_dir:
267 settings.BINARY_DIR = binary_dir
262 binary_dir = app_settings['core.binary_dir']
263
264 settings.BINARY_DIR = binary_dir
265
266 # from core.binary dir we set executable paths
267 settings.GIT_EXECUTABLE = os.path.join(binary_dir, settings.GIT_EXECUTABLE)
268 settings.SVN_EXECUTABLE = os.path.join(binary_dir, settings.SVN_EXECUTABLE)
269 settings.SVNLOOK_EXECUTABLE = os.path.join(binary_dir, settings.SVNLOOK_EXECUTABLE)
268 270
269 271 # Store the settings to make them available to other modules.
270 272 vcsserver.PYRAMID_SETTINGS = settings_merged
271 273 vcsserver.CONFIG = settings_merged
272 274
273 275 def _configure(self):
274 276 self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory)
275 277
276 278 self.config.add_route('service', '/_service')
277 279 self.config.add_route('status', '/status')
278 280 self.config.add_route('hg_proxy', '/proxy/hg')
279 281 self.config.add_route('git_proxy', '/proxy/git')
280 282
281 283 # rpc methods
282 284 self.config.add_route('vcs', '/{backend}')
283 285
284 286 # streaming rpc remote methods
285 287 self.config.add_route('vcs_stream', '/{backend}/stream')
286 288
287 289 # vcs operations clone/push as streaming
288 290 self.config.add_route('stream_git', '/stream/git/*repo_name')
289 291 self.config.add_route('stream_hg', '/stream/hg/*repo_name')
290 292
291 293 self.config.add_view(self.status_view, route_name='status', renderer='json')
292 294 self.config.add_view(self.service_view, route_name='service', renderer='msgpack')
293 295
294 296 self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
295 297 self.config.add_view(self.git_proxy(), route_name='git_proxy')
296 298 self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
297 299 vcs_view=self._remotes)
298 300 self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
299 301 vcs_view=self._remotes)
300 302
301 303 self.config.add_view(self.hg_stream(), route_name='stream_hg')
302 304 self.config.add_view(self.git_stream(), route_name='stream_git')
303 305
304 306 self.config.add_view_predicate('vcs_view', VCSViewPredicate)
305 307
306 308 self.config.add_notfound_view(not_found, renderer='json')
307 309
308 310 self.config.add_view(self.handle_vcs_exception, context=Exception)
309 311
310 312 self.config.add_tween(
311 313 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
312 314 )
313 315 self.config.add_request_method(
314 316 'vcsserver.lib.request_counter.get_request_counter',
315 317 'request_count')
316 318
317 319 def wsgi_app(self):
318 320 return self.config.make_wsgi_app()
319 321
320 322 def _vcs_view_params(self, request):
321 323 remote = self._remotes[request.matchdict['backend']]
322 324 payload = msgpack.unpackb(request.body, use_list=True)
323 325
324 326 method = payload.get('method')
325 327 params = payload['params']
326 328 wire = params.get('wire')
327 329 args = params.get('args')
328 330 kwargs = params.get('kwargs')
329 331 context_uid = None
330 332
331 333 request.registry.vcs_call_context = {
332 334 'method': method,
333 335 'repo_name': payload.get('_repo_name'),
334 336 }
335 337
336 338 if wire:
337 339 try:
338 340 wire['context'] = context_uid = uuid.UUID(wire['context'])
339 341 except KeyError:
340 342 pass
341 343 args.insert(0, wire)
342 344 repo_state_uid = wire.get('repo_state_uid') if wire else None
343 345
344 346 # NOTE(marcink): trading complexity for slight performance
345 347 if log.isEnabledFor(logging.DEBUG):
346 348 # also we SKIP printing out any of those methods args since they maybe excessive
347 349 just_args_methods = {
348 350 'commitctx': ('content', 'removed', 'updated'),
349 351 'commit': ('content', 'removed', 'updated')
350 352 }
351 353 if method in just_args_methods:
352 354 skip_args = just_args_methods[method]
353 355 call_args = ''
354 356 call_kwargs = {}
355 357 for k in kwargs:
356 358 if k in skip_args:
357 359 # replace our skip key with dummy
358 360 call_kwargs[k] = f'RemovedParam({k})'
359 361 else:
360 362 call_kwargs[k] = kwargs[k]
361 363 else:
362 364 call_args = args[1:]
363 365 call_kwargs = kwargs
364 366
365 367 log.debug('Method requested:`%s` with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
366 368 method, call_args, call_kwargs, context_uid, repo_state_uid)
367 369
368 370 statsd = request.registry.statsd
369 371 if statsd:
370 372 statsd.incr(
371 373 'vcsserver_method_total', tags=[
372 374 f"method:{method}",
373 375 ])
374 376 return payload, remote, method, args, kwargs
375 377
376 378 def vcs_view(self, request):
377 379
378 380 payload, remote, method, args, kwargs = self._vcs_view_params(request)
379 381 payload_id = payload.get('id')
380 382
381 383 try:
382 384 resp = getattr(remote, method)(*args, **kwargs)
383 385 except Exception as e:
384 386 exc_info = list(sys.exc_info())
385 387 exc_type, exc_value, exc_traceback = exc_info
386 388
387 389 org_exc = getattr(e, '_org_exc', None)
388 390 org_exc_name = None
389 391 org_exc_tb = ''
390 392 if org_exc:
391 393 org_exc_name = org_exc.__class__.__name__
392 394 org_exc_tb = getattr(e, '_org_exc_tb', '')
393 395 # replace our "faked" exception with our org
394 396 exc_info[0] = org_exc.__class__
395 397 exc_info[1] = org_exc
396 398
397 399 should_store_exc = True
398 400 if org_exc:
399 401 def get_exc_fqn(_exc_obj):
400 402 module_name = getattr(org_exc.__class__, '__module__', 'UNKNOWN')
401 403 return module_name + '.' + org_exc_name
402 404
403 405 exc_fqn = get_exc_fqn(org_exc)
404 406
405 407 if exc_fqn in ['mercurial.error.RepoLookupError',
406 408 'vcsserver.exceptions.RefNotFoundException']:
407 409 should_store_exc = False
408 410
409 411 if should_store_exc:
410 412 store_exception(id(exc_info), exc_info, request_path=request.path)
411 413
412 414 tb_info = format_exc(exc_info)
413 415
414 416 type_ = e.__class__.__name__
415 417 if type_ not in self.ALLOWED_EXCEPTIONS:
416 418 type_ = None
417 419
418 420 resp = {
419 421 'id': payload_id,
420 422 'error': {
421 423 'message': str(e),
422 424 'traceback': tb_info,
423 425 'org_exc': org_exc_name,
424 426 'org_exc_tb': org_exc_tb,
425 427 'type': type_
426 428 }
427 429 }
428 430
429 431 try:
430 432 resp['error']['_vcs_kind'] = getattr(e, '_vcs_kind', None)
431 433 except AttributeError:
432 434 pass
433 435 else:
434 436 resp = {
435 437 'id': payload_id,
436 438 'result': resp
437 439 }
438 440 log.debug('Serving data for method %s', method)
439 441 return resp
440 442
441 443 def vcs_stream_view(self, request):
442 444 payload, remote, method, args, kwargs = self._vcs_view_params(request)
443 445 # this method has a stream: marker we remove it here
444 446 method = method.split('stream:')[-1]
445 447 chunk_size = safe_int(payload.get('chunk_size')) or 4096
446 448
447 449 resp = getattr(remote, method)(*args, **kwargs)
448 450
449 451 def get_chunked_data(method_resp):
450 452 stream = io.BytesIO(method_resp)
451 453 while 1:
452 454 chunk = stream.read(chunk_size)
453 455 if not chunk:
454 456 break
455 457 yield chunk
456 458
457 459 response = Response(app_iter=get_chunked_data(resp))
458 460 response.content_type = 'application/octet-stream'
459 461
460 462 return response
461 463
462 464 def status_view(self, request):
463 465 import vcsserver
464 466 _platform_id = platform.uname()[1] or 'instance'
465 467
466 468 return {
467 469 "status": "OK",
468 470 "vcsserver_version": vcsserver.get_version(),
469 471 "platform": _platform_id,
470 472 "pid": os.getpid(),
471 473 }
472 474
473 475 def service_view(self, request):
474 476 import vcsserver
475 477
476 478 payload = msgpack.unpackb(request.body, use_list=True)
477 479 server_config, app_config = {}, {}
478 480
479 481 try:
480 482 path = self.global_config['__file__']
481 483 config = configparser.RawConfigParser()
482 484
483 485 config.read(path)
484 486
485 487 if config.has_section('server:main'):
486 488 server_config = dict(config.items('server:main'))
487 489 if config.has_section('app:main'):
488 490 app_config = dict(config.items('app:main'))
489 491
490 492 except Exception:
491 493 log.exception('Failed to read .ini file for display')
492 494
493 495 environ = list(os.environ.items())
494 496
495 497 resp = {
496 498 'id': payload.get('id'),
497 499 'result': dict(
498 500 version=vcsserver.get_version(),
499 501 config=server_config,
500 502 app_config=app_config,
501 503 environ=environ,
502 504 payload=payload,
503 505 )
504 506 }
505 507 return resp
506 508
507 509 def _msgpack_renderer_factory(self, info):
508 510
509 511 def _render(value, system):
510 512 bin_type = False
511 513 res = value.get('result')
512 514 if isinstance(res, BytesEnvelope):
513 515 log.debug('Result is wrapped in BytesEnvelope type')
514 516 bin_type = True
515 517 elif isinstance(res, BinaryEnvelope):
516 518 log.debug('Result is wrapped in BinaryEnvelope type')
517 519 value['result'] = res.val
518 520 bin_type = True
519 521
520 522 request = system.get('request')
521 523 if request is not None:
522 524 response = request.response
523 525 ct = response.content_type
524 526 if ct == response.default_content_type:
525 527 response.content_type = 'application/x-msgpack'
526 528 if bin_type:
527 529 response.content_type = 'application/x-msgpack-bin'
528 530
529 531 return msgpack.packb(value, use_bin_type=bin_type)
530 532 return _render
531 533
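
The renderer above ultimately calls msgpack.packb with use_bin_type toggled by the envelope type. A minimal sketch of that round trip, independent of Pyramid:

import msgpack

payload = {'id': 1, 'result': b'\x00\x01 raw bytes'}
packed = msgpack.packb(payload, use_bin_type=True)   # bytes are encoded as the msgpack bin type
restored = msgpack.unpackb(packed, raw=False)        # str keys come back as str, bin as bytes
assert restored['result'] == payload['result']
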
532 534 def set_env_from_config(self, environ, config):
533 535 dict_conf = {}
534 536 try:
535 537 for elem in config:
536 538 if elem[0] == 'rhodecode':
537 539 dict_conf = json.loads(elem[2])
538 540 break
539 541 except Exception:
540 542 log.exception('Failed to fetch SCM CONFIG')
541 543 return
542 544
543 545 username = dict_conf.get('username')
544 546 if username:
545 547 environ['REMOTE_USER'] = username
546 548 # mercurial specific, some extension api rely on this
547 549 environ['HGUSER'] = username
548 550
549 551 ip = dict_conf.get('ip')
550 552 if ip:
551 553 environ['REMOTE_HOST'] = ip
552 554
553 555 if _is_request_chunked(environ):
554 556 # set the compatibility flag for webob
555 557 environ['wsgi.input_terminated'] = True
556 558
557 559 def hg_proxy(self):
558 560 @wsgiapp
559 561 def _hg_proxy(environ, start_response):
560 562 app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
561 563 return app(environ, start_response)
562 564 return _hg_proxy
563 565
564 566 def git_proxy(self):
565 567 @wsgiapp
566 568 def _git_proxy(environ, start_response):
567 569 app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
568 570 return app(environ, start_response)
569 571 return _git_proxy
570 572
571 573 def hg_stream(self):
572 574 if self._use_echo_app:
573 575 @wsgiapp
574 576 def _hg_stream(environ, start_response):
575 577 app = EchoApp('fake_path', 'fake_name', None)
576 578 return app(environ, start_response)
577 579 return _hg_stream
578 580 else:
579 581 @wsgiapp
580 582 def _hg_stream(environ, start_response):
581 583 log.debug('http-app: handling hg stream')
582 584 call_context = get_headers_call_context(environ)
583 585
584 586 repo_path = call_context['repo_path']
585 587 repo_name = call_context['repo_name']
586 588 config = call_context['repo_config']
587 589
588 590 app = scm_app.create_hg_wsgi_app(
589 591 repo_path, repo_name, config)
590 592
591 593 # Consistent path information for hgweb
592 594 environ['PATH_INFO'] = call_context['path_info']
593 595 environ['REPO_NAME'] = repo_name
594 596 self.set_env_from_config(environ, config)
595 597
596 598 log.debug('http-app: starting app handler '
597 599 'with %s and process request', app)
598 600 return app(environ, ResponseFilter(start_response))
599 601 return _hg_stream
600 602
601 603 def git_stream(self):
602 604 if self._use_echo_app:
603 605 @wsgiapp
604 606 def _git_stream(environ, start_response):
605 607 app = EchoApp('fake_path', 'fake_name', None)
606 608 return app(environ, start_response)
607 609 return _git_stream
608 610 else:
609 611 @wsgiapp
610 612 def _git_stream(environ, start_response):
611 613 log.debug('http-app: handling git stream')
612 614
613 615 call_context = get_headers_call_context(environ)
614 616
615 617 repo_path = call_context['repo_path']
616 618 repo_name = call_context['repo_name']
617 619 config = call_context['repo_config']
618 620
619 621 environ['PATH_INFO'] = call_context['path_info']
620 622 self.set_env_from_config(environ, config)
621 623
622 624 content_type = environ.get('CONTENT_TYPE', '')
623 625
624 626 path = environ['PATH_INFO']
625 627 is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
626 628 log.debug(
627 629 'LFS: Detecting if request `%s` is LFS server path based '
628 630 'on content type:`%s`, is_lfs:%s',
629 631 path, content_type, is_lfs_request)
630 632
631 633 if not is_lfs_request:
632 634 # fallback detection by path
633 635 if GIT_LFS_PROTO_PAT.match(path):
634 636 is_lfs_request = True
635 637 log.debug(
636 638 'LFS: fallback detection by path of: `%s`, is_lfs:%s',
637 639 path, is_lfs_request)
638 640
639 641 if is_lfs_request:
640 642 app = scm_app.create_git_lfs_wsgi_app(
641 643 repo_path, repo_name, config)
642 644 else:
643 645 app = scm_app.create_git_wsgi_app(
644 646 repo_path, repo_name, config)
645 647
646 648 log.debug('http-app: starting app handler '
647 649 'with %s and process request', app)
648 650
649 651 return app(environ, start_response)
650 652
651 653 return _git_stream
652 654
653 655 def handle_vcs_exception(self, exception, request):
654 656 _vcs_kind = getattr(exception, '_vcs_kind', '')
655 657
656 658 if _vcs_kind == 'repo_locked':
657 659 headers_call_context = get_headers_call_context(request.environ)
658 660 status_code = safe_int(headers_call_context['locked_status_code'])
659 661
660 662 return HTTPRepoLocked(
661 663 title=str(exception), status_code=status_code, headers=[('X-Rc-Locked', '1')])
662 664
663 665 elif _vcs_kind == 'repo_branch_protected':
664 666 # Get custom repo-branch-protected status code if present.
665 667 return HTTPRepoBranchProtected(
666 668 title=str(exception), headers=[('X-Rc-Branch-Protection', '1')])
667 669
668 670 exc_info = request.exc_info
669 671 store_exception(id(exc_info), exc_info)
670 672
671 673 traceback_info = 'unavailable'
672 674 if request.exc_info:
673 675 traceback_info = format_exc(request.exc_info)
674 676
675 677 log.error(
676 678 'error occurred handling this request for path: %s, \n%s',
677 679 request.path, traceback_info)
678 680
679 681 statsd = request.registry.statsd
680 682 if statsd:
681 683 exc_type = f"{exception.__class__.__module__}.{exception.__class__.__name__}"
682 684 statsd.incr('vcsserver_exception_total',
683 685 tags=[f"type:{exc_type}"])
684 686 raise exception
685 687
686 688
687 689 class ResponseFilter:
688 690
689 691 def __init__(self, start_response):
690 692 self._start_response = start_response
691 693
692 694 def __call__(self, status, response_headers, exc_info=None):
693 695 headers = tuple(
694 696 (h, v) for h, v in response_headers
695 697 if not wsgiref.util.is_hop_by_hop(h))
696 698 return self._start_response(status, headers, exc_info)
697 699
698 700
699 701 def sanitize_settings_and_apply_defaults(global_config, settings):
700 702 _global_settings_maker = SettingsMaker(global_config)
701 703 settings_maker = SettingsMaker(settings)
702 704
703 705 settings_maker.make_setting('logging.autoconfigure', False, parser='bool')
704 706
705 707 logging_conf = os.path.join(os.path.dirname(global_config.get('__file__')), 'logging.ini')
706 708 settings_maker.enable_logging(logging_conf)
707 709
708 710 # Default includes; a user may change these
709 711 pyramid_includes = settings_maker.make_setting('pyramid.includes', [], parser='list:newline')
710 712 log.debug("Using the following pyramid.includes: %s", pyramid_includes)
711 713
712 714 settings_maker.make_setting('__file__', global_config.get('__file__'))
713 715
714 716 settings_maker.make_setting('pyramid.default_locale_name', 'en')
715 717 settings_maker.make_setting('locale', 'en_US.UTF-8')
716 718
717 settings_maker.make_setting('core.binary_dir', '')
719 settings_maker.make_setting('core.binary_dir', '/usr/local/bin/rhodecode_bin/vcs_bin')
718 720
719 721 temp_store = tempfile.gettempdir()
720 722 default_cache_dir = os.path.join(temp_store, 'rc_cache')
721 723 # save the default cache dir and use it for all backends later.
722 724 default_cache_dir = settings_maker.make_setting(
723 725 'cache_dir',
724 726 default=default_cache_dir, default_when_empty=True,
725 727 parser='dir:ensured')
726 728
727 729 # exception store cache
728 730 settings_maker.make_setting(
729 731 'exception_tracker.store_path',
730 732 default=os.path.join(default_cache_dir, 'exc_store'), default_when_empty=True,
731 733 parser='dir:ensured'
732 734 )
733 735
734 736 # repo_object cache defaults
735 737 settings_maker.make_setting(
736 738 'rc_cache.repo_object.backend',
737 739 default='dogpile.cache.rc.file_namespace',
738 740 parser='string')
739 741 settings_maker.make_setting(
740 742 'rc_cache.repo_object.expiration_time',
741 743 default=30 * 24 * 60 * 60, # 30days
742 744 parser='int')
743 745 settings_maker.make_setting(
744 746 'rc_cache.repo_object.arguments.filename',
745 747 default=os.path.join(default_cache_dir, 'vcsserver_cache_repo_object.db'),
746 748 parser='string')
747 749
748 750 # statsd
749 751 settings_maker.make_setting('statsd.enabled', False, parser='bool')
750 752 settings_maker.make_setting('statsd.statsd_host', 'statsd-exporter', parser='string')
751 753 settings_maker.make_setting('statsd.statsd_port', 9125, parser='int')
752 754 settings_maker.make_setting('statsd.statsd_prefix', '')
753 755 settings_maker.make_setting('statsd.statsd_ipv6', False, parser='bool')
754 756
755 757 settings_maker.env_expand()
756 758
757 759
758 760 def main(global_config, **settings):
759 761 start_time = time.time()
760 762 log.info('Pyramid app config starting')
761 763
762 764 if MercurialFactory:
763 765 hgpatches.patch_largefiles_capabilities()
764 766 hgpatches.patch_subrepo_type_mapping()
765 767
766 768 # Fill in and sanitize the defaults & do ENV expansion
767 769 sanitize_settings_and_apply_defaults(global_config, settings)
768 770
769 771 # init and bootstrap StatsdClient
770 772 StatsdClient.setup(settings)
771 773
772 774 pyramid_app = HTTPApplication(settings=settings, global_config=global_config).wsgi_app()
773 775 total_time = time.time() - start_time
774 776 log.info('Pyramid app created and configured in %.2fs', total_time)
775 777 return pyramid_app
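
main() returns a plain WSGI application, so any WSGI server can host it. Below is a minimal sketch, assuming waitress is installed and a valid vcsserver .ini (with an adjacent logging.ini) exists at the hypothetical path; in production the app is normally served via the configured server:main section instead.

from waitress import serve  # assumption: any WSGI server would do

ini_path = '/etc/rhodecode/vcsserver.ini'      # hypothetical path
wsgi_app = main({'__file__': ini_path})
serve(wsgi_app, host='127.0.0.1', port=9900)   # host/port are example values
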
@@ -1,563 +1,563 b''
1 1 """
2 2 Module provides a class allowing to wrap communication over subprocess.Popen
3 3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 4 stream processor exposing the output data as an iterator fitting to be a
5 5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
25 25 import os
26 26 import collections
27 27 import logging
28 28 import subprocess
29 29 import threading
30 30
31 31 from vcsserver.str_utils import safe_str
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35
36 36 class StreamFeeder(threading.Thread):
37 37 """
38 38 Normal writing into a pipe-like object blocks once the buffer is filled.
39 39 This thread feeds data from a file-like source into a pipe
40 40 without blocking the main thread.
41 41 We close the write end of the pipe once the end of the source stream is reached.
42 42 """
43 43
44 44 def __init__(self, source):
45 45 super().__init__()
46 46 self.daemon = True
47 47 filelike = False
48 48 self.bytes = b''
49 49 if type(source) in (str, bytes, bytearray): # string-like
50 50 self.bytes = bytes(source)
51 51 else: # can be either file pointer or file-like
52 52 if isinstance(source, int): # file pointer it is
53 53 # converting file descriptor (int) stdin into file-like
54 54 source = os.fdopen(source, 'rb', 16384)
55 55 # let's see if source is file-like by now
56 56 filelike = hasattr(source, 'read')
57 57 if not filelike and not self.bytes:
58 58 raise TypeError("StreamFeeder's source object must be a readable "
59 59 "file-like, a file descriptor, or a string-like.")
60 60 self.source = source
61 61 self.readiface, self.writeiface = os.pipe()
62 62
63 63 def run(self):
64 64 writer = self.writeiface
65 65 try:
66 66 if self.bytes:
67 67 os.write(writer, self.bytes)
68 68 else:
69 69 s = self.source
70 70
71 71 while 1:
72 72 _bytes = s.read(4096)
73 73 if not _bytes:
74 74 break
75 75 os.write(writer, _bytes)
76 76
77 77 finally:
78 78 os.close(writer)
79 79
80 80 @property
81 81 def output(self):
82 82 return self.readiface
83 83
84 84
85 85 class InputStreamChunker(threading.Thread):
86 86 def __init__(self, source, target, buffer_size, chunk_size):
87 87
88 88 super().__init__()
89 89
90 90 self.daemon = True # die die die.
91 91
92 92 self.source = source
93 93 self.target = target
94 94 self.chunk_count_max = int(buffer_size / chunk_size) + 1
95 95 self.chunk_size = chunk_size
96 96
97 97 self.data_added = threading.Event()
98 98 self.data_added.clear()
99 99
100 100 self.keep_reading = threading.Event()
101 101 self.keep_reading.set()
102 102
103 103 self.EOF = threading.Event()
104 104 self.EOF.clear()
105 105
106 106 self.go = threading.Event()
107 107 self.go.set()
108 108
109 109 def stop(self):
110 110 self.go.clear()
111 111 self.EOF.set()
112 112 try:
113 113 # this is not proper, but is done to force the reader thread to let
114 114 # go of the input because, if successful, .close() will send EOF
115 115 # down the pipe.
116 116 self.source.close()
117 117 except Exception:
118 118 pass
119 119
120 120 def run(self):
121 121 s = self.source
122 122 t = self.target
123 123 cs = self.chunk_size
124 124 chunk_count_max = self.chunk_count_max
125 125 keep_reading = self.keep_reading
126 126 da = self.data_added
127 127 go = self.go
128 128
129 129 try:
130 130 b = s.read(cs)
131 131 except ValueError:
132 132 b = ''
133 133
134 134 timeout_input = 20
135 135 while b and go.is_set():
136 136 if len(t) > chunk_count_max:
137 137 keep_reading.clear()
138 138 keep_reading.wait(timeout_input)
139 139 if len(t) > chunk_count_max + timeout_input:
140 140 log.error("Timed out while waiting for input from subprocess.")
141 141 os._exit(-1) # this will cause the worker to recycle itself
142 142
143 143 t.append(b)
144 144 da.set()
145 145
146 146 try:
147 147 b = s.read(cs)
148 148 except ValueError: # probably "I/O operation on closed file"
149 149 b = ''
150 150
151 151 self.EOF.set()
152 152 da.set() # for cases when done but there was no input.
153 153
154 154
155 155 class BufferedGenerator:
156 156 """
157 157 The class behaves as a non-blocking, buffered pipe reader.
158 158 It reads chunks of data (through a thread)
159 159 from a blocking pipe and appends them to a deque of chunks.
160 160 Reading is halted in the thread when the maximum number of chunks is buffered.
161 161 .next() may operate in blocking or non-blocking fashion by yielding
162 162 '' if no data is ready
163 163 to be sent, or by not returning until there is some data to send.
164 164 When we get EOF from the underlying source pipe, we raise a marker so that
165 165 StopIteration is raised after the last chunk of data is yielded.
166 166 """
167 167
168 168 def __init__(self, name, source, buffer_size=65536, chunk_size=4096,
169 169 starting_values=None, bottomless=False):
170 170 starting_values = starting_values or []
171 171 self.name = name
172 172 self.buffer_size = buffer_size
173 173 self.chunk_size = chunk_size
174 174
175 175 if bottomless:
176 176 maxlen = int(buffer_size / chunk_size)
177 177 else:
178 178 maxlen = None
179 179
180 180 self.data_queue = collections.deque(starting_values, maxlen)
181 181 self.worker = InputStreamChunker(source, self.data_queue, buffer_size, chunk_size)
182 182 if starting_values:
183 183 self.worker.data_added.set()
184 184 self.worker.start()
185 185
186 186 ####################
187 187 # Generator's methods
188 188 ####################
189 189 def __str__(self):
190 190 return f'BufferedGenerator(name={self.name} chunk: {self.chunk_size} on buffer: {self.buffer_size})'
191 191
192 192 def __iter__(self):
193 193 return self
194 194
195 195 def __next__(self):
196 196
197 197 while not self.length and not self.worker.EOF.is_set():
198 198 self.worker.data_added.clear()
199 199 self.worker.data_added.wait(0.2)
200 200
201 201 if self.length:
202 202 self.worker.keep_reading.set()
203 203 return bytes(self.data_queue.popleft())
204 204 elif self.worker.EOF.is_set():
205 205 raise StopIteration
206 206
207 207 def throw(self, exc_type, value=None, traceback=None):
208 208 if not self.worker.EOF.is_set():
209 209 raise exc_type(value)
210 210
211 211 def start(self):
212 212 self.worker.start()
213 213
214 214 def stop(self):
215 215 self.worker.stop()
216 216
217 217 def close(self):
218 218 try:
219 219 self.worker.stop()
220 220 self.throw(GeneratorExit)
221 221 except (GeneratorExit, StopIteration):
222 222 pass
223 223
224 224 ####################
225 225 # Threaded reader's infrastructure.
226 226 ####################
227 227 @property
228 228 def input(self):
229 229 return self.worker.w
230 230
231 231 @property
232 232 def data_added_event(self):
233 233 return self.worker.data_added
234 234
235 235 @property
236 236 def data_added(self):
237 237 return self.worker.data_added.is_set()
238 238
239 239 @property
240 240 def reading_paused(self):
241 241 return not self.worker.keep_reading.is_set()
242 242
243 243 @property
244 244 def done_reading_event(self):
245 245 """
246 246 Done_reading does not mean that the iterator's buffer is empty.
247 247 The iterator might have finished reading from the underlying source, but the read
248 248 chunks might still be available for serving through the .next() method.
249 249
250 250 :returns: An Event class instance.
251 251 """
252 252 return self.worker.EOF
253 253
254 254 @property
255 255 def done_reading(self):
256 256 """
257 257 Done_reading does not mean that the iterator's buffer is empty.
258 258 The iterator might have finished reading from the underlying source, but the read
259 259 chunks might still be available for serving through the .next() method.
260 260
261 261 :returns: A bool value.
262 262 """
263 263 return self.worker.EOF.is_set()
264 264
265 265 @property
266 266 def length(self):
267 267 """
268 268 returns int.
269 269
270 270 This is the length of the queue of chunks, not the length of
271 271 the combined contents in those chunks.
272 272
273 273 __len__() cannot be meaningfully implemented because this
274 274 reader is just flying through bottomless content and
275 275 can only know the length of what it has already seen.
276 276
277 277 If __len__() returns a value, a WSGI server (per PEP 3333)
278 278 will set the response's length to it. In order not to
279 279 confuse WSGI PEP 3333 servers, we do not implement __len__
280 280 at all.
281 281 """
282 282 return len(self.data_queue)
283 283
284 284 def prepend(self, x):
285 285 self.data_queue.appendleft(x)
286 286
287 287 def append(self, x):
288 288 self.data_queue.append(x)
289 289
290 290 def extend(self, o):
291 291 self.data_queue.extend(o)
292 292
293 293 def __getitem__(self, i):
294 294 return self.data_queue[i]
295 295
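
A BufferedGenerator can also be driven directly against a real pipe. The sketch below, assuming a Unix-like system where `echo` is available, wraps a subprocess's stdout and drains it by iteration:

import subprocess

proc = subprocess.Popen(['echo', 'hello'], stdout=subprocess.PIPE)
gen = BufferedGenerator('stdout', proc.stdout, buffer_size=65536, chunk_size=4096)
output = b''.join(gen)     # iterates until the child closes its stdout (EOF)
proc.wait()
assert output == b'hello\n'
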
296 296
297 297 class SubprocessIOChunker:
298 298 """
299 299 Processor class wrapping handling of subprocess IO.
300 300
301 301 .. important::
302 302
303 303 Watch out for the method `__del__` on this class. If this object
304 304 is deleted, it will kill the subprocess, so avoid returning
305 305 the `output` attribute or using it like in the following
306 306 example::
307 307
308 308 # `args` expected to run a program that produces a lot of output
309 309 output = ''.join(SubprocessIOChunker(
310 310 args, shell=False, inputstream=inputstream, env=environ).output)
311 311
312 312 # `output` will not contain all the data, because the __del__ method
313 313 # has already killed the subprocess in this case before all output
314 314 # has been consumed.
315 315
316 316
317 317
318 318 In a way, this is a "communicate()" replacement with a twist.
319 319
320 320 - We are multithreaded. Writing in and reading out/err all happen in separate threads.
321 321 - We support concurrent (in and out) stream processing.
322 322 - The output is not a stream. It's a queue of read string (bytes, not str)
323 323 chunks. The object behaves as an iterable: you can write "for chunk in obj:".
324 324 - We are non-blocking in more respects than communicate()
325 325 (reading from subprocess out pauses when internal buffer is full, but
326 326 does not block the parent calling code. On the flip side, reading from
327 327 slow-yielding subprocess may block the iteration until data shows up. This
328 328 does not block the parallel inpipe reading occurring parallel thread.)
329 329
330 330 The purpose of the object is to allow us to wrap subprocess interactions into
331 331 an iterable that can be passed to a WSGI server as the application's return
332 332 value. Because of stream-processing-ability, WSGI does not have to read ALL
333 333 of the subprocess's output and buffer it, before handing it to WSGI server for
334 334 HTTP response. Instead, the class initializer reads just a bit of the stream
335 335 to figure out if an error occurred or is likely to occur and, if not, just hands the
336 336 further iteration over subprocess output to the server for completion of HTTP
337 337 response.
338 338
339 339 The real or perceived subprocess error is trapped and raised as one of
340 340 the OSError family of exceptions.
341 341
342 342 Example usage:
343 343 # try:
344 344 # answer = SubprocessIOChunker(
345 345 # cmd,
346 346 # input,
347 347 # buffer_size = 65536,
348 348 # chunk_size = 4096
349 349 # )
350 350 # except (OSError) as e:
351 351 # print(str(e))
352 352 # raise e
353 353 #
354 354 # return answer
355 355
356 356
357 357 """
358 358
359 359 # TODO: johbo: This is used to make sure that the open end of the PIPE
360 360 # is closed in the end. It would be way better to wrap this into an
361 361 # object, so that it is closed automatically once it is consumed or
362 362 # something similar.
363 363 _close_input_fd = None
364 364
365 365 _closed = False
366 366 _stdout = None
367 367 _stderr = None
368 368
369 369 def __init__(self, cmd, input_stream=None, buffer_size=65536,
370 370 chunk_size=4096, starting_values=None, fail_on_stderr=True,
371 371 fail_on_return_code=True, **kwargs):
372 372 """
373 373 Initializes SubprocessIOChunker
374 374
375 375 :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
376 376 :param input_stream: (Default: None) A file-like, string, or file pointer.
377 377 :param buffer_size: (Default: 65536) The size of the total buffer per stream, in bytes.
378 378 :param chunk_size: (Default: 4096) The max size of a chunk. The actual chunk may be smaller.
379 379 :param starting_values: (Default: []) An array of strings to put in front of the output queue.
380 380 :param fail_on_stderr: (Default: True) Whether to raise an exception in
381 381 case something is written to stderr.
382 382 :param fail_on_return_code: (Default: True) Whether to raise an
383 383 exception if the return code is not 0.
384 384 """
385 385
386 386 kwargs['shell'] = kwargs.get('shell', True)
387 387
388 388 starting_values = starting_values or []
389 389 if input_stream:
390 390 input_streamer = StreamFeeder(input_stream)
391 391 input_streamer.start()
392 392 input_stream = input_streamer.output
393 393 self._close_input_fd = input_stream
394 394
395 395 self._fail_on_stderr = fail_on_stderr
396 396 self._fail_on_return_code = fail_on_return_code
397 397 self.cmd = cmd
398 398
399 399 _p = subprocess.Popen(cmd, bufsize=-1, stdin=input_stream, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
400 400 **kwargs)
401 401 self.process = _p
402 402
403 403 bg_out = BufferedGenerator('stdout', _p.stdout, buffer_size, chunk_size, starting_values)
404 404 bg_err = BufferedGenerator('stderr', _p.stderr, 10240, 1, bottomless=True)
405 405
406 406 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
407 407 # doing this until we reach either end of file, or end of buffer.
408 408 bg_out.data_added_event.wait(0.2)
409 409 bg_out.data_added_event.clear()
410 410
411 411 # at this point it's still ambiguous whether we are done reading or just have a full buffer.
412 412 # Either way, if there is an error (returned by the ended process, or implied by
413 413 # the presence of output on stderr), we error out.
414 414 # Else, we are happy.
415 415 return_code = _p.poll()
416 416 ret_code_ok = return_code in [None, 0]
417 417 ret_code_fail = return_code is not None and return_code != 0
418 418 if (
419 419 (ret_code_fail and fail_on_return_code) or
420 420 (ret_code_ok and fail_on_stderr and bg_err.length)
421 421 ):
422 422
423 423 try:
424 424 _p.terminate()
425 425 except Exception:
426 426 pass
427 427
428 428 bg_out.stop()
429 429 out = b''.join(bg_out)
430 430 self._stdout = out
431 431
432 432 bg_err.stop()
433 433 err = b''.join(bg_err)
434 434 self._stderr = err
435 435
436 436 # code from https://github.com/schacon/grack/pull/7
437 437 if err.strip() == b'fatal: The remote end hung up unexpectedly' and out.startswith(b'0034shallow '):
438 438 bg_out = iter([out])
439 439 _p = None
440 440 elif err and fail_on_stderr:
441 441 text_err = err.decode()
442 442 raise OSError(
443 443 f"Subprocess exited due to an error:\n{text_err}")
444 444
445 445 if ret_code_fail and fail_on_return_code:
446 446 text_err = err.decode()
447 447 if not err:
448 448 # maybe get empty stderr, try stdout instead
449 449 # in many cases git reports the errors on stdout too
450 450 text_err = out.decode()
451 451 raise OSError(
452 452 f"Subprocess exited with non 0 ret code:{return_code}: stderr:{text_err}")
453 453
454 454 self.stdout = bg_out
455 455 self.stderr = bg_err
456 456 self.inputstream = input_stream
457 457
458 458 def __str__(self):
459 459 proc = getattr(self, 'process', 'NO_PROCESS')
460 460 return f'SubprocessIOChunker: {proc}'
461 461
462 462 def __iter__(self):
463 463 return self
464 464
465 465 def __next__(self):
466 466 # Note: mikhail: We need to be sure that we are checking the return
467 467 # code after the stdout stream is closed. Some processes, e.g. git
468 468 # are doing some magic in between closing stdout and terminating the
469 469 # process and, as a result, we are not getting return code on "slow"
470 470 # systems.
471 471 result = None
472 472 stop_iteration = None
473 473 try:
474 474 result = next(self.stdout)
475 475 except StopIteration as e:
476 476 stop_iteration = e
477 477
478 478 if self.process:
479 479 return_code = self.process.poll()
480 480 ret_code_fail = return_code is not None and return_code != 0
481 481 if ret_code_fail and self._fail_on_return_code:
482 482 self.stop_streams()
483 483 err = self.get_stderr()
484 484 raise OSError(
485 485 f"Subprocess exited (exit_code:{return_code}) due to an error during iteration:\n{err}")
486 486
487 487 if stop_iteration:
488 488 raise stop_iteration
489 489 return result
490 490
491 491 def throw(self, exc_type, value=None, traceback=None):
492 492 if self.stdout.length or not self.stdout.done_reading:
493 493 raise exc_type(value)
494 494
495 495 def close(self):
496 496 if self._closed:
497 497 return
498 498
499 499 try:
500 500 self.process.terminate()
501 501 except Exception:
502 502 pass
503 503 if self._close_input_fd:
504 504 os.close(self._close_input_fd)
505 505 try:
506 506 self.stdout.close()
507 507 except Exception:
508 508 pass
509 509 try:
510 510 self.stderr.close()
511 511 except Exception:
512 512 pass
513 513 try:
514 514 os.close(self.inputstream)
515 515 except Exception:
516 516 pass
517 517
518 518 self._closed = True
519 519
520 520 def stop_streams(self):
521 521 getattr(self.stdout, 'stop', lambda: None)()
522 522 getattr(self.stderr, 'stop', lambda: None)()
523 523
524 524 def get_stdout(self):
525 525 if self._stdout:
526 526 return self._stdout
527 527 else:
528 528 return b''.join(self.stdout)
529 529
530 530 def get_stderr(self):
531 531 if self._stderr:
532 532 return self._stderr
533 533 else:
534 534 return b''.join(self.stderr)
535 535
536 536
537 537 def run_command(arguments, env=None):
538 538 """
539 539 Run the specified command and return the stdout.
540 540
541 541 :param arguments: sequence of program arguments (including the program name)
542 542 :type arguments: list[str]
543 543 """
544 544
545 545 cmd = arguments
546 546 log.debug('Running subprocessio command %s', cmd)
547 547 proc = None
548 548 try:
549 549 _opts = {'shell': False, 'fail_on_stderr': False}
550 550 if env:
551 551 _opts.update({'env': env})
552 552 proc = SubprocessIOChunker(cmd, **_opts)
553 553 return b''.join(proc), b''.join(proc.stderr)
554 554 except OSError as err:
555 cmd = ' '.join(map(safe_str, cmd)) # human friendly CMD
555 cmd = ' '.join(map(safe_str, cmd)) # human friendly CMD
556 556 tb_err = ("Couldn't run subprocessio command (%s).\n"
557 557 "Original error was:%s\n" % (cmd, err))
558 558 log.exception(tb_err)
559 559 raise Exception(tb_err)
560 560 finally:
561 561 if proc:
562 562 proc.close()
563 563
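
Usage of run_command is straightforward; a minimal sketch, assuming the `git` binary is on PATH:

stdout, stderr = run_command(['git', '--version'])
print(stdout.decode().strip())     # prints something like 'git version <x.y.z>'
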