##// END OF EJS Templates
core: code changes to handle branch-permissions / locking / incorrect client support
super-admin -
r1323:a1a9e846 default
parent child Browse files
Show More
@@ -1,131 +1,138 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2023 RhodeCode GmbH
2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 """
18 """
19 Special exception handling over the wire.
19 Special exception handling over the wire.
20
20
21 Since we cannot assume that our client is able to import our exception classes,
21 Since we cannot assume that our client is able to import our exception classes,
22 this module provides a "wrapping" mechanism to raise plain exceptions
22 this module provides a "wrapping" mechanism to raise plain exceptions
23 which contain an extra attribute `_vcs_kind` to allow a client to distinguish
23 which contain an extra attribute `_vcs_kind` to allow a client to distinguish
24 different error conditions.
24 different error conditions.
25 """
25 """
26
26
27 from pyramid.httpexceptions import HTTPLocked, HTTPForbidden
27 from pyramid.httpexceptions import HTTPLocked, HTTPForbidden
28
28
29
29
30 def _make_exception(kind, org_exc, *args):
30 def _make_exception(kind, org_exc, *args):
31 """
31 """
32 Prepares a base `Exception` instance to be sent over the wire.
32 Prepares a base `Exception` instance to be sent over the wire.
33
33
34 To give our caller a hint what this is about, it will attach an attribute
34 To give our caller a hint what this is about, it will attach an attribute
35 `_vcs_kind` to the exception.
35 `_vcs_kind` to the exception.
36 """
36 """
37 exc = Exception(*args)
37 exc = Exception(*args)
38 exc._vcs_kind = kind
38 exc._vcs_kind = kind
39 exc._org_exc = org_exc
39 exc._org_exc = org_exc
40 exc._org_exc_tb = getattr(org_exc, '_org_exc_tb', '')
40 exc._org_exc_tb = getattr(org_exc, '_org_exc_tb', '')
41 return exc
41 return exc
42
42
43
43
44 def AbortException(org_exc=None):
44 def AbortException(org_exc=None):
45 def _make_exception_wrapper(*args):
45 def _make_exception_wrapper(*args):
46 return _make_exception('abort', org_exc, *args)
46 return _make_exception('abort', org_exc, *args)
47 return _make_exception_wrapper
47 return _make_exception_wrapper
48
48
49
49
50 def ArchiveException(org_exc=None):
50 def ArchiveException(org_exc=None):
51 def _make_exception_wrapper(*args):
51 def _make_exception_wrapper(*args):
52 return _make_exception('archive', org_exc, *args)
52 return _make_exception('archive', org_exc, *args)
53 return _make_exception_wrapper
53 return _make_exception_wrapper
54
54
55
55
56 def ClientNotSupportedException(org_exc=None):
57 def _make_exception_wrapper(*args):
58 return _make_exception('client_not_supported', org_exc, *args)
59 return _make_exception_wrapper
60
61
62 def LookupException(org_exc=None):
56 def LookupException(org_exc=None):
63 def _make_exception_wrapper(*args):
57 def _make_exception_wrapper(*args):
64 return _make_exception('lookup', org_exc, *args)
58 return _make_exception('lookup', org_exc, *args)
65 return _make_exception_wrapper
59 return _make_exception_wrapper
66
60
67
61
68 def VcsException(org_exc=None):
62 def VcsException(org_exc=None):
69 def _make_exception_wrapper(*args):
63 def _make_exception_wrapper(*args):
70 return _make_exception('error', org_exc, *args)
64 return _make_exception('error', org_exc, *args)
71 return _make_exception_wrapper
65 return _make_exception_wrapper
72
66
73
67
74 def LockedRepoException(org_exc=None):
68 def LockedRepoException(org_exc=None):
75 def _make_exception_wrapper(*args):
69 def _make_exception_wrapper(*args):
76 return _make_exception('repo_locked', org_exc, *args)
70 return _make_exception('repo_locked', org_exc, *args)
77 return _make_exception_wrapper
71 return _make_exception_wrapper
78
72
79
73
80 def RepositoryBranchProtectedException(org_exc=None):
74 def RepositoryBranchProtectedException(org_exc=None):
81 def _make_exception_wrapper(*args):
75 def _make_exception_wrapper(*args):
82 return _make_exception('repo_branch_protected', org_exc, *args)
76 return _make_exception('repo_branch_protected', org_exc, *args)
83 return _make_exception_wrapper
77 return _make_exception_wrapper
84
78
79 def ClientNotSupportedException(org_exc=None):
80 def _make_exception_wrapper(*args):
81 return _make_exception('client_not_supported', org_exc, *args)
82 return _make_exception_wrapper
85
83
86 def RequirementException(org_exc=None):
84 def RequirementException(org_exc=None):
87 def _make_exception_wrapper(*args):
85 def _make_exception_wrapper(*args):
88 return _make_exception('requirement', org_exc, *args)
86 return _make_exception('requirement', org_exc, *args)
89 return _make_exception_wrapper
87 return _make_exception_wrapper
90
88
91
89
92 def UnhandledException(org_exc=None):
90 def UnhandledException(org_exc=None):
93 def _make_exception_wrapper(*args):
91 def _make_exception_wrapper(*args):
94 return _make_exception('unhandled', org_exc, *args)
92 return _make_exception('unhandled', org_exc, *args)
95 return _make_exception_wrapper
93 return _make_exception_wrapper
96
94
97
95
98 def URLError(org_exc=None):
96 def URLError(org_exc=None):
99 def _make_exception_wrapper(*args):
97 def _make_exception_wrapper(*args):
100 return _make_exception('url_error', org_exc, *args)
98 return _make_exception('url_error', org_exc, *args)
101 return _make_exception_wrapper
99 return _make_exception_wrapper
102
100
103
101
104 def SubrepoMergeException(org_exc=None):
102 def SubrepoMergeException(org_exc=None):
105 def _make_exception_wrapper(*args):
103 def _make_exception_wrapper(*args):
106 return _make_exception('subrepo_merge_error', org_exc, *args)
104 return _make_exception('subrepo_merge_error', org_exc, *args)
107 return _make_exception_wrapper
105 return _make_exception_wrapper
108
106
109
107
110 class HTTPRepoLocked(HTTPLocked):
108 class HTTPRepoLocked(HTTPLocked):
111 """
109 """
112 Subclass of HTTPLocked response that allows to set the title and status
110 Subclass of HTTPLocked response that allows to set the title and status
113 code via constructor arguments.
111 code via constructor arguments.
114 """
112 """
115 def __init__(self, title, status_code=None, **kwargs):
113 def __init__(self, title, status_code=None, **kwargs):
116 self.code = status_code or HTTPLocked.code
114 self.code = status_code or HTTPLocked.code
117 self.title = title
115 self.title = title
118 super().__init__(**kwargs)
116 super().__init__(**kwargs)
119
117
120
118
121 class HTTPRepoBranchProtected(HTTPForbidden):
119 class HTTPRepoBranchProtected(HTTPLocked):
122 def __init__(self, *args, **kwargs):
120 def __init__(self, title, status_code=None, **kwargs):
123 super(HTTPForbidden, self).__init__(*args, **kwargs)
121 self.code = status_code or HTTPLocked.code
122 self.title = title
123 super().__init__(**kwargs)
124
125
126 class HTTPClientNotSupported(HTTPLocked):
127 def __init__(self, title, status_code=None, **kwargs):
128 self.code = status_code or HTTPLocked.code
129 self.title = title
130 super().__init__(**kwargs)
124
131
125
132
126 class RefNotFoundException(KeyError):
133 class RefNotFoundException(KeyError):
127 pass
134 pass
128
135
129
136
130 class NoContentException(ValueError):
137 class NoContentException(ValueError):
131 pass
138 pass
@@ -1,780 +1,782 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2023 RhodeCode GmbH
2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import io
18 import io
19 import os
19 import os
20 import sys
20 import sys
21 import logging
21 import logging
22 import collections
22 import collections
23 import base64
23 import base64
24 import msgpack
24 import msgpack
25 import dataclasses
25 import dataclasses
26 import pygit2
26 import pygit2
27
27
28 import http.client
28 import http.client
29 from celery import Celery
29 from celery import Celery
30
30
31 import mercurial.scmutil
31 import mercurial.scmutil
32 import mercurial.node
32 import mercurial.node
33
33
34 from vcsserver import exceptions, subprocessio, settings
34 from vcsserver import exceptions, subprocessio, settings
35 from vcsserver.lib.ext_json import json
35 from vcsserver.lib.ext_json import json
36 from vcsserver.lib.str_utils import ascii_str, safe_str
36 from vcsserver.lib.str_utils import ascii_str, safe_str
37 from vcsserver.lib.svn_txn_utils import get_txn_id_from_store
37 from vcsserver.lib.svn_txn_utils import get_txn_id_from_store
38 from vcsserver.remote.git_remote import Repository
38 from vcsserver.remote.git_remote import Repository
39
39
40 celery_app = Celery('__vcsserver__')
40 celery_app = Celery('__vcsserver__')
41 log = logging.getLogger(__name__)
41 log = logging.getLogger(__name__)
42
42
43
43
44 class HooksCeleryClient:
44 class HooksCeleryClient:
45 TASK_TIMEOUT = 60 # time in seconds
45 TASK_TIMEOUT = 60 # time in seconds
46
46
47 def __init__(self, queue, backend):
47 def __init__(self, queue, backend):
48 celery_app.config_from_object({
48 celery_app.config_from_object({
49 'broker_url': queue, 'result_backend': backend,
49 'broker_url': queue, 'result_backend': backend,
50 'broker_connection_retry_on_startup': True,
50 'broker_connection_retry_on_startup': True,
51 'task_serializer': 'json',
51 'task_serializer': 'json',
52 'accept_content': ['json', 'msgpack'],
52 'accept_content': ['json', 'msgpack'],
53 'result_serializer': 'json',
53 'result_serializer': 'json',
54 'result_accept_content': ['json', 'msgpack']
54 'result_accept_content': ['json', 'msgpack']
55 })
55 })
56 self.celery_app = celery_app
56 self.celery_app = celery_app
57
57
58 def __call__(self, method, extras):
58 def __call__(self, method, extras):
59 # NOTE: exception handling for those tasks executed is in
60 # @adapt_for_celery decorator
61 # also see: _maybe_handle_exception which is handling exceptions
59 inquired_task = self.celery_app.signature(
62 inquired_task = self.celery_app.signature(
60 f'rhodecode.lib.celerylib.tasks.{method}'
63 f'rhodecode.lib.celerylib.tasks.{method}'
61 )
64 )
62 result = inquired_task.delay(extras).get(timeout=self.TASK_TIMEOUT)
65 result = inquired_task.delay(extras).get(timeout=self.TASK_TIMEOUT)
63
66
64 return result
67 return result
65
68
66
69
67 class HooksShadowRepoClient:
70 class HooksShadowRepoClient:
68
71
69 def __call__(self, hook_name, extras):
72 def __call__(self, hook_name, extras):
70 return {'output': '', 'status': 0}
73 return {'output': '', 'status': 0}
71
74
72
75
73 class RemoteMessageWriter:
76 class RemoteMessageWriter:
74 """Writer base class."""
77 """Writer base class."""
75 def write(self, message):
78 def write(self, message):
76 raise NotImplementedError()
79 raise NotImplementedError()
77
80
78
81
79 class HgMessageWriter(RemoteMessageWriter):
82 class HgMessageWriter(RemoteMessageWriter):
80 """Writer that knows how to send messages to mercurial clients."""
83 """Writer that knows how to send messages to mercurial clients."""
81
84
82 def __init__(self, ui):
85 def __init__(self, ui):
83 self.ui = ui
86 self.ui = ui
84
87
85 def write(self, message: str):
88 def write(self, message: str):
86 # TODO: Check why the quiet flag is set by default.
89 args = (message.encode('utf-8'),)
87 old = self.ui.quiet
90 self.ui._writemsg(self.ui._fmsgerr, type=b'status', *args)
88 self.ui.quiet = False
89 self.ui.status(message.encode('utf-8'))
90 self.ui.quiet = old
91
91
92
92
93 class GitMessageWriter(RemoteMessageWriter):
93 class GitMessageWriter(RemoteMessageWriter):
94 """Writer that knows how to send messages to git clients."""
94 """Writer that knows how to send messages to git clients."""
95
95
96 def __init__(self, stdout=None):
96 def __init__(self, stdout=None):
97 self.stdout = stdout or sys.stdout
97 self.stdout = stdout or sys.stdout
98
98
99 def write(self, message: str):
99 def write(self, message: str):
100 self.stdout.write(message)
100 self.stdout.write(message + "\n" if message else "")
101
101
102
102
103 class SvnMessageWriter(RemoteMessageWriter):
103 class SvnMessageWriter(RemoteMessageWriter):
104 """Writer that knows how to send messages to svn clients."""
104 """Writer that knows how to send messages to svn clients."""
105
105
106 def __init__(self, stderr=None):
106 def __init__(self, stderr=None):
107 # SVN needs data sent to stderr for back-to-client messaging
107 # SVN needs data sent to stderr for back-to-client messaging
108 self.stderr = stderr or sys.stderr
108 self.stderr = stderr or sys.stderr
109
109
110 def write(self, message):
110 def write(self, message):
111 self.stderr.write(message)
111 self.stderr.write(message)
112
112
113
113
114 def _maybe_handle_exception(result):
114 def _maybe_handle_exception(writer, result):
115
115 """
116 adopt_for_celery defines the exception/exception_traceback
117 Ths result is a direct output from a celery task
118 """
116
119
117 exception_class = result.get('exception')
120 exception_class = result.get('exception')
118 exception_traceback = result.get('exception_traceback')
121 exception_traceback = result.get('exception_traceback')
119 if not exception_class:
120 return
121
122
122 log.debug('Handling hook-call exception: %s', exception_class)
123 match exception_class:
123
124 # NOTE: the underlying exceptions are setting _vcs_kind special marker
124 if exception_traceback:
125 # which is later handled by `handle_vcs_exception` and translated into a special HTTP exception
125 log.error('Got traceback from remote call:%s', exception_traceback)
126 # propagated later to the client
126
127 case 'HTTPLockedRepo':
127 if exception_class == 'HTTPLockedRepo':
128 raise exceptions.LockedRepoException()(*result['exception_args'])
128 raise exceptions.LockedRepoException()(*result['exception_args'])
129 case 'ClientNotSupported':
129 elif exception_class == 'ClientNotSupportedError':
130 raise exceptions.ClientNotSupportedException()(*result['exception_args'])
130 raise exceptions.ClientNotSupportedException()(*result['exception_args'])
131 case 'HTTPBranchProtected':
131 elif exception_class == 'HTTPBranchProtected':
132 raise exceptions.RepositoryBranchProtectedException()(*result['exception_args'])
132 raise exceptions.RepositoryBranchProtectedException()(*result['exception_args'])
133 case 'RepositoryError':
133 elif exception_class == 'RepositoryError':
134 raise exceptions.VcsException()(*result['exception_args'])
134 raise exceptions.VcsException()(*result['exception_args'])
135 case _:
135 elif exception_class:
136 if exception_class:
136 raise Exception(
137 log.error('Handling hook-call exception. Got traceback from remote call:%s', exception_traceback)
137 f"""Got remote exception "{exception_class}" with args "{result['exception_args']}" """
138 raise Exception(
138 )
139 f"""Got remote exception "{exception_class}" with args "{result['exception_args']}" """
140 )
139
141
140
142
141 def _get_hooks_client(extras):
143 def _get_hooks_client(extras):
142 task_queue = extras.get('task_queue')
144 task_queue = extras.get('task_queue')
143 task_backend = extras.get('task_backend')
145 task_backend = extras.get('task_backend')
144 is_shadow_repo = extras.get('is_shadow_repo')
146 is_shadow_repo = extras.get('is_shadow_repo')
145
147
146 if task_queue and task_backend:
148 if task_queue and task_backend:
147 return HooksCeleryClient(task_queue, task_backend)
149 return HooksCeleryClient(task_queue, task_backend)
148 elif is_shadow_repo:
150 elif is_shadow_repo:
149 return HooksShadowRepoClient()
151 return HooksShadowRepoClient()
150 else:
152 else:
151 raise Exception("Hooks client not found!")
153 raise Exception("Hooks client not found!")
152
154
153
155
154 def _call_hook(hook_name, extras, writer):
156 def _call_hook(hook_name, extras, writer):
155 hooks_client = _get_hooks_client(extras)
157 hooks_client = _get_hooks_client(extras)
156 log.debug('Hooks, using client:%s', hooks_client)
158 log.debug('Hooks, using client:%s', hooks_client)
157 result = hooks_client(hook_name, extras)
159 result = hooks_client(hook_name, extras)
158 log.debug('Hooks got result: %s', result)
160 log.debug('Hooks got result: %s', result)
159 _maybe_handle_exception(result)
161 _maybe_handle_exception(writer, result)
160 writer.write(result['output'])
162 writer.write(result['output'])
161
163
162 return result['status']
164 return result['status']
163
165
164
166
165 def _extras_from_ui(ui):
167 def _extras_from_ui(ui):
166 hook_data = ui.config(b'rhodecode', b'RC_SCM_DATA')
168 hook_data = ui.config(b'rhodecode', b'RC_SCM_DATA')
167 if not hook_data:
169 if not hook_data:
168 # maybe it's inside environ ?
170 # maybe it's inside environ ?
169 env_hook_data = os.environ.get('RC_SCM_DATA')
171 env_hook_data = os.environ.get('RC_SCM_DATA')
170 if env_hook_data:
172 if env_hook_data:
171 hook_data = env_hook_data
173 hook_data = env_hook_data
172
174
173 extras = {}
175 extras = {}
174 if hook_data:
176 if hook_data:
175 extras = json.loads(hook_data)
177 extras = json.loads(hook_data)
176 return extras
178 return extras
177
179
178
180
179 def _rev_range_hash(repo, node, check_heads=False):
181 def _rev_range_hash(repo, node, check_heads=False):
180 from vcsserver.hgcompat import get_ctx
182 from vcsserver.hgcompat import get_ctx
181
183
182 commits = []
184 commits = []
183 revs = []
185 revs = []
184 start = get_ctx(repo, node).rev()
186 start = get_ctx(repo, node).rev()
185 end = len(repo)
187 end = len(repo)
186 for rev in range(start, end):
188 for rev in range(start, end):
187 revs.append(rev)
189 revs.append(rev)
188 ctx = get_ctx(repo, rev)
190 ctx = get_ctx(repo, rev)
189 commit_id = ascii_str(mercurial.node.hex(ctx.node()))
191 commit_id = ascii_str(mercurial.node.hex(ctx.node()))
190 branch = safe_str(ctx.branch())
192 branch = safe_str(ctx.branch())
191 commits.append((commit_id, branch))
193 commits.append((commit_id, branch))
192
194
193 parent_heads = []
195 parent_heads = []
194 if check_heads:
196 if check_heads:
195 parent_heads = _check_heads(repo, start, end, revs)
197 parent_heads = _check_heads(repo, start, end, revs)
196 return commits, parent_heads
198 return commits, parent_heads
197
199
198
200
199 def _check_heads(repo, start, end, commits):
201 def _check_heads(repo, start, end, commits):
200 from vcsserver.hgcompat import get_ctx
202 from vcsserver.hgcompat import get_ctx
201 changelog = repo.changelog
203 changelog = repo.changelog
202 parents = set()
204 parents = set()
203
205
204 for new_rev in commits:
206 for new_rev in commits:
205 for p in changelog.parentrevs(new_rev):
207 for p in changelog.parentrevs(new_rev):
206 if p == mercurial.node.nullrev:
208 if p == mercurial.node.nullrev:
207 continue
209 continue
208 if p < start:
210 if p < start:
209 parents.add(p)
211 parents.add(p)
210
212
211 for p in parents:
213 for p in parents:
212 branch = get_ctx(repo, p).branch()
214 branch = get_ctx(repo, p).branch()
213 # The heads descending from that parent, on the same branch
215 # The heads descending from that parent, on the same branch
214 parent_heads = {p}
216 parent_heads = {p}
215 reachable = {p}
217 reachable = {p}
216 for x in range(p + 1, end):
218 for x in range(p + 1, end):
217 if get_ctx(repo, x).branch() != branch:
219 if get_ctx(repo, x).branch() != branch:
218 continue
220 continue
219 for pp in changelog.parentrevs(x):
221 for pp in changelog.parentrevs(x):
220 if pp in reachable:
222 if pp in reachable:
221 reachable.add(x)
223 reachable.add(x)
222 parent_heads.discard(pp)
224 parent_heads.discard(pp)
223 parent_heads.add(x)
225 parent_heads.add(x)
224 # More than one head? Suggest merging
226 # More than one head? Suggest merging
225 if len(parent_heads) > 1:
227 if len(parent_heads) > 1:
226 return list(parent_heads)
228 return list(parent_heads)
227
229
228 return []
230 return []
229
231
230
232
231 def _get_git_env():
233 def _get_git_env():
232 env = {}
234 env = {}
233 for k, v in os.environ.items():
235 for k, v in os.environ.items():
234 if k.startswith('GIT'):
236 if k.startswith('GIT'):
235 env[k] = v
237 env[k] = v
236
238
237 # serialized version
239 # serialized version
238 return [(k, v) for k, v in env.items()]
240 return [(k, v) for k, v in env.items()]
239
241
240
242
241 def _get_hg_env(old_rev, new_rev, txnid, repo_path):
243 def _get_hg_env(old_rev, new_rev, txnid, repo_path):
242 env = {}
244 env = {}
243 for k, v in os.environ.items():
245 for k, v in os.environ.items():
244 if k.startswith('HG'):
246 if k.startswith('HG'):
245 env[k] = v
247 env[k] = v
246
248
247 env['HG_NODE'] = old_rev
249 env['HG_NODE'] = old_rev
248 env['HG_NODE_LAST'] = new_rev
250 env['HG_NODE_LAST'] = new_rev
249 env['HG_TXNID'] = txnid
251 env['HG_TXNID'] = txnid
250 env['HG_PENDING'] = repo_path
252 env['HG_PENDING'] = repo_path
251
253
252 return [(k, v) for k, v in env.items()]
254 return [(k, v) for k, v in env.items()]
253
255
254
256
255 def _get_ini_settings(ini_file):
257 def _get_ini_settings(ini_file):
256 from vcsserver.http_main import sanitize_settings_and_apply_defaults
258 from vcsserver.http_main import sanitize_settings_and_apply_defaults
257 from vcsserver.lib.config_utils import get_app_config_lightweight, configure_and_store_settings
259 from vcsserver.lib.config_utils import get_app_config_lightweight, configure_and_store_settings
258
260
259 global_config = {'__file__': ini_file}
261 global_config = {'__file__': ini_file}
260 ini_settings = get_app_config_lightweight(ini_file)
262 ini_settings = get_app_config_lightweight(ini_file)
261 sanitize_settings_and_apply_defaults(global_config, ini_settings)
263 sanitize_settings_and_apply_defaults(global_config, ini_settings)
262 configure_and_store_settings(global_config, ini_settings)
264 configure_and_store_settings(global_config, ini_settings)
263
265
264 return ini_settings
266 return ini_settings
265
267
266
268
267 def _fix_hooks_executables(ini_path=''):
269 def _fix_hooks_executables(ini_path=''):
268 """
270 """
269 This is a trick to set proper settings.EXECUTABLE paths for certain execution patterns
271 This is a trick to set proper settings.EXECUTABLE paths for certain execution patterns
270 especially for subversion where hooks strip entire env, and calling just 'svn' command will most likely fail
272 especially for subversion where hooks strip entire env, and calling just 'svn' command will most likely fail
271 because svn is not on PATH
273 because svn is not on PATH
272 """
274 """
273 # set defaults, in case we can't read from ini_file
275 # set defaults, in case we can't read from ini_file
274 core_binary_dir = settings.BINARY_DIR or '/usr/local/bin/rhodecode_bin/vcs_bin'
276 core_binary_dir = settings.BINARY_DIR or '/usr/local/bin/rhodecode_bin/vcs_bin'
275 if ini_path:
277 if ini_path:
276 ini_settings = _get_ini_settings(ini_path)
278 ini_settings = _get_ini_settings(ini_path)
277 core_binary_dir = ini_settings['core.binary_dir']
279 core_binary_dir = ini_settings['core.binary_dir']
278
280
279 settings.BINARY_DIR = core_binary_dir
281 settings.BINARY_DIR = core_binary_dir
280
282
281
283
282 def repo_size(ui, repo, **kwargs):
284 def repo_size(ui, repo, **kwargs):
283 extras = _extras_from_ui(ui)
285 extras = _extras_from_ui(ui)
284 return _call_hook('repo_size', extras, HgMessageWriter(ui))
286 return _call_hook('repo_size', extras, HgMessageWriter(ui))
285
287
286
288
287 def pre_pull(ui, repo, **kwargs):
289 def pre_pull(ui, repo, **kwargs):
288 extras = _extras_from_ui(ui)
290 extras = _extras_from_ui(ui)
289 return _call_hook('pre_pull', extras, HgMessageWriter(ui))
291 return _call_hook('pre_pull', extras, HgMessageWriter(ui))
290
292
291
293
292 def pre_pull_ssh(ui, repo, **kwargs):
294 def pre_pull_ssh(ui, repo, **kwargs):
293 extras = _extras_from_ui(ui)
295 extras = _extras_from_ui(ui)
294 if extras and extras.get('SSH'):
296 if extras and extras.get('SSH'):
295 return pre_pull(ui, repo, **kwargs)
297 return pre_pull(ui, repo, **kwargs)
296 return 0
298 return 0
297
299
298
300
299 def post_pull(ui, repo, **kwargs):
301 def post_pull(ui, repo, **kwargs):
300 extras = _extras_from_ui(ui)
302 extras = _extras_from_ui(ui)
301 return _call_hook('post_pull', extras, HgMessageWriter(ui))
303 return _call_hook('post_pull', extras, HgMessageWriter(ui))
302
304
303
305
304 def post_pull_ssh(ui, repo, **kwargs):
306 def post_pull_ssh(ui, repo, **kwargs):
305 extras = _extras_from_ui(ui)
307 extras = _extras_from_ui(ui)
306 if extras and extras.get('SSH'):
308 if extras and extras.get('SSH'):
307 return post_pull(ui, repo, **kwargs)
309 return post_pull(ui, repo, **kwargs)
308 return 0
310 return 0
309
311
310
312
311 def pre_push(ui, repo, node=None, **kwargs):
313 def pre_push(ui, repo, node=None, **kwargs):
312 """
314 """
313 Mercurial pre_push hook
315 Mercurial pre_push hook
314 """
316 """
315 extras = _extras_from_ui(ui)
317 extras = _extras_from_ui(ui)
316 detect_force_push = extras.get('detect_force_push')
318 detect_force_push = extras.get('detect_force_push')
317
319
318 rev_data = []
320 rev_data = []
319 hook_type: str = safe_str(kwargs.get('hooktype'))
321 hook_type: str = safe_str(kwargs.get('hooktype'))
320
322
321 if node and hook_type == 'pretxnchangegroup':
323 if node and hook_type == 'pretxnchangegroup':
322 branches = collections.defaultdict(list)
324 branches = collections.defaultdict(list)
323 commits, _heads = _rev_range_hash(repo, node, check_heads=detect_force_push)
325 commits, _heads = _rev_range_hash(repo, node, check_heads=detect_force_push)
324 for commit_id, branch in commits:
326 for commit_id, branch in commits:
325 branches[branch].append(commit_id)
327 branches[branch].append(commit_id)
326
328
327 for branch, commits in branches.items():
329 for branch, commits in branches.items():
328 old_rev = ascii_str(kwargs.get('node_last')) or commits[0]
330 old_rev = ascii_str(kwargs.get('node_last')) or commits[0]
329 rev_data.append({
331 rev_data.append({
330 'total_commits': len(commits),
332 'total_commits': len(commits),
331 'old_rev': old_rev,
333 'old_rev': old_rev,
332 'new_rev': commits[-1],
334 'new_rev': commits[-1],
333 'ref': '',
335 'ref': '',
334 'type': 'branch',
336 'type': 'branch',
335 'name': branch,
337 'name': branch,
336 })
338 })
337
339
338 for push_ref in rev_data:
340 for push_ref in rev_data:
339 push_ref['multiple_heads'] = _heads
341 push_ref['multiple_heads'] = _heads
340
342
341 repo_path = os.path.join(
343 repo_path = os.path.join(
342 extras.get('repo_store', ''), extras.get('repository', ''))
344 extras.get('repo_store', ''), extras.get('repository', ''))
343 push_ref['hg_env'] = _get_hg_env(
345 push_ref['hg_env'] = _get_hg_env(
344 old_rev=push_ref['old_rev'],
346 old_rev=push_ref['old_rev'],
345 new_rev=push_ref['new_rev'], txnid=ascii_str(kwargs.get('txnid')),
347 new_rev=push_ref['new_rev'], txnid=ascii_str(kwargs.get('txnid')),
346 repo_path=repo_path)
348 repo_path=repo_path)
347
349
348 extras['hook_type'] = hook_type or 'pre_push'
350 extras['hook_type'] = hook_type or 'pre_push'
349 extras['commit_ids'] = rev_data
351 extras['commit_ids'] = rev_data
350
352
351 return _call_hook('pre_push', extras, HgMessageWriter(ui))
353 return _call_hook('pre_push', extras, HgMessageWriter(ui))
352
354
353
355
354 def pre_push_ssh(ui, repo, node=None, **kwargs):
356 def pre_push_ssh(ui, repo, node=None, **kwargs):
355 extras = _extras_from_ui(ui)
357 extras = _extras_from_ui(ui)
356 if extras.get('SSH'):
358 if extras.get('SSH'):
357 return pre_push(ui, repo, node, **kwargs)
359 return pre_push(ui, repo, node, **kwargs)
358
360
359 return 0
361 return 0
360
362
361
363
362 def pre_push_ssh_auth(ui, repo, node=None, **kwargs):
364 def pre_push_ssh_auth(ui, repo, node=None, **kwargs):
363 """
365 """
364 Mercurial pre_push hook for SSH
366 Mercurial pre_push hook for SSH
365 """
367 """
366 extras = _extras_from_ui(ui)
368 extras = _extras_from_ui(ui)
367 if extras.get('SSH'):
369 if extras.get('SSH'):
368 permission = extras['SSH_PERMISSIONS']
370 permission = extras['SSH_PERMISSIONS']
369
371
370 if 'repository.write' == permission or 'repository.admin' == permission:
372 if 'repository.write' == permission or 'repository.admin' == permission:
371 return 0
373 return 0
372
374
373 # non-zero ret code
375 # non-zero ret code
374 return 1
376 return 1
375
377
376 return 0
378 return 0
377
379
378
380
379 def post_push(ui, repo, node, **kwargs):
381 def post_push(ui, repo, node, **kwargs):
380 """
382 """
381 Mercurial post_push hook
383 Mercurial post_push hook
382 """
384 """
383 extras = _extras_from_ui(ui)
385 extras = _extras_from_ui(ui)
384
386
385 commit_ids = []
387 commit_ids = []
386 branches = []
388 branches = []
387 bookmarks = []
389 bookmarks = []
388 tags = []
390 tags = []
389 hook_type: str = safe_str(kwargs.get('hooktype'))
391 hook_type: str = safe_str(kwargs.get('hooktype'))
390
392
391 commits, _heads = _rev_range_hash(repo, node)
393 commits, _heads = _rev_range_hash(repo, node)
392 for commit_id, branch in commits:
394 for commit_id, branch in commits:
393 commit_ids.append(commit_id)
395 commit_ids.append(commit_id)
394 if branch not in branches:
396 if branch not in branches:
395 branches.append(branch)
397 branches.append(branch)
396
398
397 if hasattr(ui, '_rc_pushkey_bookmarks'):
399 if hasattr(ui, '_rc_pushkey_bookmarks'):
398 bookmarks = ui._rc_pushkey_bookmarks
400 bookmarks = ui._rc_pushkey_bookmarks
399
401
400 extras['hook_type'] = hook_type or 'post_push'
402 extras['hook_type'] = hook_type or 'post_push'
401 extras['commit_ids'] = commit_ids
403 extras['commit_ids'] = commit_ids
402
404
403 extras['new_refs'] = {
405 extras['new_refs'] = {
404 'branches': branches,
406 'branches': branches,
405 'bookmarks': bookmarks,
407 'bookmarks': bookmarks,
406 'tags': tags
408 'tags': tags
407 }
409 }
408
410
409 return _call_hook('post_push', extras, HgMessageWriter(ui))
411 return _call_hook('post_push', extras, HgMessageWriter(ui))
410
412
411
413
412 def post_push_ssh(ui, repo, node, **kwargs):
414 def post_push_ssh(ui, repo, node, **kwargs):
413 """
415 """
414 Mercurial post_push hook for SSH
416 Mercurial post_push hook for SSH
415 """
417 """
416 if _extras_from_ui(ui).get('SSH'):
418 if _extras_from_ui(ui).get('SSH'):
417 return post_push(ui, repo, node, **kwargs)
419 return post_push(ui, repo, node, **kwargs)
418 return 0
420 return 0
419
421
420
422
421 def key_push(ui, repo, **kwargs):
423 def key_push(ui, repo, **kwargs):
422 from vcsserver.hgcompat import get_ctx
424 from vcsserver.hgcompat import get_ctx
423
425
424 if kwargs['new'] != b'0' and kwargs['namespace'] == b'bookmarks':
426 if kwargs['new'] != b'0' and kwargs['namespace'] == b'bookmarks':
425 # store new bookmarks in our UI object propagated later to post_push
427 # store new bookmarks in our UI object propagated later to post_push
426 ui._rc_pushkey_bookmarks = get_ctx(repo, kwargs['key']).bookmarks()
428 ui._rc_pushkey_bookmarks = get_ctx(repo, kwargs['key']).bookmarks()
427 return
429 return
428
430
429
431
430 # backward compat
432 # backward compat
431 log_pull_action = post_pull
433 log_pull_action = post_pull
432
434
433 # backward compat
435 # backward compat
434 log_push_action = post_push
436 log_push_action = post_push
435
437
436
438
437 def handle_git_pre_receive(unused_repo_path, unused_revs, unused_env):
439 def handle_git_pre_receive(unused_repo_path, unused_revs, unused_env):
438 """
440 """
439 Old hook name: keep here for backward compatibility.
441 Old hook name: keep here for backward compatibility.
440
442
441 This is only required when the installed git hooks are not upgraded.
443 This is only required when the installed git hooks are not upgraded.
442 """
444 """
443 pass
445 pass
444
446
445
447
446 def handle_git_post_receive(unused_repo_path, unused_revs, unused_env):
448 def handle_git_post_receive(unused_repo_path, unused_revs, unused_env):
447 """
449 """
448 Old hook name: keep here for backward compatibility.
450 Old hook name: keep here for backward compatibility.
449
451
450 This is only required when the installed git hooks are not upgraded.
452 This is only required when the installed git hooks are not upgraded.
451 """
453 """
452 pass
454 pass
453
455
454
456
455 @dataclasses.dataclass
457 @dataclasses.dataclass
456 class HookResponse:
458 class HookResponse:
457 status: int
459 status: int
458 output: str
460 output: str
459
461
460
462
461 def git_pre_pull(extras) -> HookResponse:
463 def git_pre_pull(extras) -> HookResponse:
462 """
464 """
463 Pre pull hook.
465 Pre pull hook.
464
466
465 :param extras: dictionary containing the keys defined in simplevcs
467 :param extras: dictionary containing the keys defined in simplevcs
466 :type extras: dict
468 :type extras: dict
467
469
468 :return: status code of the hook. 0 for success.
470 :return: status code of the hook. 0 for success.
469 :rtype: int
471 :rtype: int
470 """
472 """
471
473
472 if 'pull' not in extras['hooks']:
474 if 'pull' not in extras['hooks']:
473 return HookResponse(0, '')
475 return HookResponse(0, '')
474
476
475 stdout = io.StringIO()
477 stdout = io.StringIO()
476 try:
478 try:
477 status_code = _call_hook('pre_pull', extras, GitMessageWriter(stdout))
479 status_code = _call_hook('pre_pull', extras, GitMessageWriter(stdout))
478
480
479 except Exception as error:
481 except Exception as error:
480 log.exception('Failed to call pre_pull hook')
482 log.exception('Failed to call pre_pull hook')
481 status_code = 128
483 status_code = 128
482 stdout.write(f'ERROR: {error}\n')
484 stdout.write(f'ERROR: {error}\n')
483
485
484 return HookResponse(status_code, stdout.getvalue())
486 return HookResponse(status_code, stdout.getvalue())
485
487
486
488
487 def git_post_pull(extras) -> HookResponse:
489 def git_post_pull(extras) -> HookResponse:
488 """
490 """
489 Post pull hook.
491 Post pull hook.
490
492
491 :param extras: dictionary containing the keys defined in simplevcs
493 :param extras: dictionary containing the keys defined in simplevcs
492 :type extras: dict
494 :type extras: dict
493
495
494 :return: status code of the hook. 0 for success.
496 :return: status code of the hook. 0 for success.
495 :rtype: int
497 :rtype: int
496 """
498 """
497 if 'pull' not in extras['hooks']:
499 if 'pull' not in extras['hooks']:
498 return HookResponse(0, '')
500 return HookResponse(0, '')
499
501
500 stdout = io.StringIO()
502 stdout = io.StringIO()
501 try:
503 try:
502 status = _call_hook('post_pull', extras, GitMessageWriter(stdout))
504 status = _call_hook('post_pull', extras, GitMessageWriter(stdout))
503 except Exception as error:
505 except Exception as error:
504 status = 128
506 status = 128
505 stdout.write(f'ERROR: {error}\n')
507 stdout.write(f'ERROR: {error}\n')
506
508
507 return HookResponse(status, stdout.getvalue())
509 return HookResponse(status, stdout.getvalue())
508
510
509
511
510 def _parse_git_ref_lines(revision_lines):
512 def _parse_git_ref_lines(revision_lines):
511 rev_data = []
513 rev_data = []
512 for revision_line in revision_lines or []:
514 for revision_line in revision_lines or []:
513 old_rev, new_rev, ref = revision_line.strip().split(' ')
515 old_rev, new_rev, ref = revision_line.strip().split(' ')
514 ref_data = ref.split('/', 2)
516 ref_data = ref.split('/', 2)
515 if ref_data[1] in ('tags', 'heads'):
517 if ref_data[1] in ('tags', 'heads'):
516 rev_data.append({
518 rev_data.append({
517 # NOTE(marcink):
519 # NOTE(marcink):
518 # we're unable to tell total_commits for git at this point
520 # we're unable to tell total_commits for git at this point
519 # but we set the variable for consistency with GIT
521 # but we set the variable for consistency with GIT
520 'total_commits': -1,
522 'total_commits': -1,
521 'old_rev': old_rev,
523 'old_rev': old_rev,
522 'new_rev': new_rev,
524 'new_rev': new_rev,
523 'ref': ref,
525 'ref': ref,
524 'type': ref_data[1],
526 'type': ref_data[1],
525 'name': ref_data[2],
527 'name': ref_data[2],
526 })
528 })
527 return rev_data
529 return rev_data
528
530
529
531
530 def git_pre_receive(unused_repo_path, revision_lines, env) -> int:
532 def git_pre_receive(unused_repo_path, revision_lines, env) -> int:
531 """
533 """
532 Pre push hook.
534 Pre push hook.
533
535
534 :return: status code of the hook. 0 for success.
536 :return: status code of the hook. 0 for success.
535 """
537 """
536 extras = json.loads(env['RC_SCM_DATA'])
538 extras = json.loads(env['RC_SCM_DATA'])
537 rev_data = _parse_git_ref_lines(revision_lines)
539 rev_data = _parse_git_ref_lines(revision_lines)
538 if 'push' not in extras['hooks']:
540 if 'push' not in extras['hooks']:
539 return 0
541 return 0
540 _fix_hooks_executables(env.get('RC_INI_FILE'))
542 _fix_hooks_executables(env.get('RC_INI_FILE'))
541
543
542 empty_commit_id = '0' * 40
544 empty_commit_id = '0' * 40
543
545
544 detect_force_push = extras.get('detect_force_push')
546 detect_force_push = extras.get('detect_force_push')
545
547
546 for push_ref in rev_data:
548 for push_ref in rev_data:
547 # store our git-env which holds the temp store
549 # store our git-env which holds the temp store
548 push_ref['git_env'] = _get_git_env()
550 push_ref['git_env'] = _get_git_env()
549 push_ref['pruned_sha'] = ''
551 push_ref['pruned_sha'] = ''
550 if not detect_force_push:
552 if not detect_force_push:
551 # don't check for forced-push when we don't need to
553 # don't check for forced-push when we don't need to
552 continue
554 continue
553
555
554 type_ = push_ref['type']
556 type_ = push_ref['type']
555 new_branch = push_ref['old_rev'] == empty_commit_id
557 new_branch = push_ref['old_rev'] == empty_commit_id
556 delete_branch = push_ref['new_rev'] == empty_commit_id
558 delete_branch = push_ref['new_rev'] == empty_commit_id
557 if type_ == 'heads' and not (new_branch or delete_branch):
559 if type_ == 'heads' and not (new_branch or delete_branch):
558 old_rev = push_ref['old_rev']
560 old_rev = push_ref['old_rev']
559 new_rev = push_ref['new_rev']
561 new_rev = push_ref['new_rev']
560 cmd = [settings.GIT_EXECUTABLE(), 'rev-list', old_rev, f'^{new_rev}']
562 cmd = [settings.GIT_EXECUTABLE(), 'rev-list', old_rev, f'^{new_rev}']
561 stdout, stderr = subprocessio.run_command(
563 stdout, stderr = subprocessio.run_command(
562 cmd, env=os.environ.copy())
564 cmd, env=os.environ.copy())
563 # means we're having some non-reachable objects, this forced push was used
565 # means we're having some non-reachable objects, this forced push was used
564 if stdout:
566 if stdout:
565 push_ref['pruned_sha'] = stdout.splitlines()
567 push_ref['pruned_sha'] = stdout.splitlines()
566
568
567 extras['hook_type'] = 'pre_receive'
569 extras['hook_type'] = 'pre_receive'
568 extras['commit_ids'] = rev_data
570 extras['commit_ids'] = rev_data
569
571
570 stdout = sys.stdout
572 stdout = sys.stdout
571 status_code = _call_hook('pre_push', extras, GitMessageWriter(stdout))
573 status_code = _call_hook('pre_push', extras, GitMessageWriter(stdout))
572
574
573 return status_code
575 return status_code
574
576
575
577
576 def git_post_receive(unused_repo_path, revision_lines, env) -> int:
578 def git_post_receive(unused_repo_path, revision_lines, env) -> int:
577 """
579 """
578 Post push hook.
580 Post push hook.
579
581
580 :return: status code of the hook. 0 for success.
582 :return: status code of the hook. 0 for success.
581 """
583 """
582 extras = json.loads(env['RC_SCM_DATA'])
584 extras = json.loads(env['RC_SCM_DATA'])
583 if 'push' not in extras['hooks']:
585 if 'push' not in extras['hooks']:
584 return 0
586 return 0
585
587
586 _fix_hooks_executables(env.get('RC_INI_FILE'))
588 _fix_hooks_executables(env.get('RC_INI_FILE'))
587
589
588 rev_data = _parse_git_ref_lines(revision_lines)
590 rev_data = _parse_git_ref_lines(revision_lines)
589
591
590 git_revs = []
592 git_revs = []
591
593
592 # N.B.(skreft): it is ok to just call git, as git before calling a
594 # N.B.(skreft): it is ok to just call git, as git before calling a
593 # subcommand sets the PATH environment variable so that it point to the
595 # subcommand sets the PATH environment variable so that it point to the
594 # correct version of the git executable.
596 # correct version of the git executable.
595 empty_commit_id = '0' * 40
597 empty_commit_id = '0' * 40
596 branches = []
598 branches = []
597 tags = []
599 tags = []
598 for push_ref in rev_data:
600 for push_ref in rev_data:
599 type_ = push_ref['type']
601 type_ = push_ref['type']
600
602
601 if type_ == 'heads':
603 if type_ == 'heads':
602 # starting new branch case
604 # starting new branch case
603 if push_ref['old_rev'] == empty_commit_id:
605 if push_ref['old_rev'] == empty_commit_id:
604 push_ref_name = push_ref['name']
606 push_ref_name = push_ref['name']
605
607
606 if push_ref_name not in branches:
608 if push_ref_name not in branches:
607 branches.append(push_ref_name)
609 branches.append(push_ref_name)
608
610
609 need_head_set = ''
611 need_head_set = ''
610 with Repository(os.getcwd()) as repo:
612 with Repository(os.getcwd()) as repo:
611 try:
613 try:
612 repo.head
614 repo.head
613 except pygit2.GitError:
615 except pygit2.GitError:
614 need_head_set = f'refs/heads/{push_ref_name}'
616 need_head_set = f'refs/heads/{push_ref_name}'
615
617
616 if need_head_set:
618 if need_head_set:
617 repo.set_head(need_head_set)
619 repo.set_head(need_head_set)
618 print(f"Setting default branch to {push_ref_name}")
620 print(f"Setting default branch to {push_ref_name}")
619
621
620 cmd = [settings.GIT_EXECUTABLE(), 'for-each-ref', '--format=%(refname)', 'refs/heads/*']
622 cmd = [settings.GIT_EXECUTABLE(), 'for-each-ref', '--format=%(refname)', 'refs/heads/*']
621 stdout, stderr = subprocessio.run_command(
623 stdout, stderr = subprocessio.run_command(
622 cmd, env=os.environ.copy())
624 cmd, env=os.environ.copy())
623 heads = safe_str(stdout)
625 heads = safe_str(stdout)
624 heads = heads.replace(push_ref['ref'], '')
626 heads = heads.replace(push_ref['ref'], '')
625 heads = ' '.join(head for head
627 heads = ' '.join(head for head
626 in heads.splitlines() if head) or '.'
628 in heads.splitlines() if head) or '.'
627 cmd = [settings.GIT_EXECUTABLE(), 'log', '--reverse',
629 cmd = [settings.GIT_EXECUTABLE(), 'log', '--reverse',
628 '--pretty=format:%H', '--', push_ref['new_rev'],
630 '--pretty=format:%H', '--', push_ref['new_rev'],
629 '--not', heads]
631 '--not', heads]
630 stdout, stderr = subprocessio.run_command(
632 stdout, stderr = subprocessio.run_command(
631 cmd, env=os.environ.copy())
633 cmd, env=os.environ.copy())
632 git_revs.extend(list(map(ascii_str, stdout.splitlines())))
634 git_revs.extend(list(map(ascii_str, stdout.splitlines())))
633
635
634 # delete branch case
636 # delete branch case
635 elif push_ref['new_rev'] == empty_commit_id:
637 elif push_ref['new_rev'] == empty_commit_id:
636 git_revs.append(f'delete_branch=>{push_ref["name"]}')
638 git_revs.append(f'delete_branch=>{push_ref["name"]}')
637 else:
639 else:
638 if push_ref['name'] not in branches:
640 if push_ref['name'] not in branches:
639 branches.append(push_ref['name'])
641 branches.append(push_ref['name'])
640
642
641 cmd = [settings.GIT_EXECUTABLE(), 'log',
643 cmd = [settings.GIT_EXECUTABLE(), 'log',
642 f'{push_ref["old_rev"]}..{push_ref["new_rev"]}',
644 f'{push_ref["old_rev"]}..{push_ref["new_rev"]}',
643 '--reverse', '--pretty=format:%H']
645 '--reverse', '--pretty=format:%H']
644 stdout, stderr = subprocessio.run_command(
646 stdout, stderr = subprocessio.run_command(
645 cmd, env=os.environ.copy())
647 cmd, env=os.environ.copy())
646 # we get bytes from stdout, we need str to be consistent
648 # we get bytes from stdout, we need str to be consistent
647 log_revs = list(map(ascii_str, stdout.splitlines()))
649 log_revs = list(map(ascii_str, stdout.splitlines()))
648 git_revs.extend(log_revs)
650 git_revs.extend(log_revs)
649
651
650 # Pure pygit2 impl. but still 2-3x slower :/
652 # Pure pygit2 impl. but still 2-3x slower :/
651 # results = []
653 # results = []
652 #
654 #
653 # with Repository(os.getcwd()) as repo:
655 # with Repository(os.getcwd()) as repo:
654 # repo_new_rev = repo[push_ref['new_rev']]
656 # repo_new_rev = repo[push_ref['new_rev']]
655 # repo_old_rev = repo[push_ref['old_rev']]
657 # repo_old_rev = repo[push_ref['old_rev']]
656 # walker = repo.walk(repo_new_rev.id, pygit2.GIT_SORT_TOPOLOGICAL)
658 # walker = repo.walk(repo_new_rev.id, pygit2.GIT_SORT_TOPOLOGICAL)
657 #
659 #
658 # for commit in walker:
660 # for commit in walker:
659 # if commit.id == repo_old_rev.id:
661 # if commit.id == repo_old_rev.id:
660 # break
662 # break
661 # results.append(commit.id.hex)
663 # results.append(commit.id.hex)
662 # # reverse the order, can't use GIT_SORT_REVERSE
664 # # reverse the order, can't use GIT_SORT_REVERSE
663 # log_revs = results[::-1]
665 # log_revs = results[::-1]
664
666
665 elif type_ == 'tags':
667 elif type_ == 'tags':
666 if push_ref['name'] not in tags:
668 if push_ref['name'] not in tags:
667 tags.append(push_ref['name'])
669 tags.append(push_ref['name'])
668 git_revs.append(f'tag=>{push_ref["name"]}')
670 git_revs.append(f'tag=>{push_ref["name"]}')
669
671
670 extras['hook_type'] = 'post_receive'
672 extras['hook_type'] = 'post_receive'
671 extras['commit_ids'] = git_revs
673 extras['commit_ids'] = git_revs
672 extras['new_refs'] = {
674 extras['new_refs'] = {
673 'branches': branches,
675 'branches': branches,
674 'bookmarks': [],
676 'bookmarks': [],
675 'tags': tags,
677 'tags': tags,
676 }
678 }
677
679
678 stdout = sys.stdout
680 stdout = sys.stdout
679
681
680 if 'repo_size' in extras['hooks']:
682 if 'repo_size' in extras['hooks']:
681 try:
683 try:
682 _call_hook('repo_size', extras, GitMessageWriter(stdout))
684 _call_hook('repo_size', extras, GitMessageWriter(stdout))
683 except Exception:
685 except Exception:
684 pass
686 pass
685
687
686 status_code = _call_hook('post_push', extras, GitMessageWriter(stdout))
688 status_code = _call_hook('post_push', extras, GitMessageWriter(stdout))
687 return status_code
689 return status_code
688
690
689
691
690 def get_extras_from_txn_id(repo_path, txn_id):
692 def get_extras_from_txn_id(repo_path, txn_id):
691 extras = get_txn_id_from_store(repo_path, txn_id)
693 extras = get_txn_id_from_store(repo_path, txn_id)
692 return extras
694 return extras
693
695
694
696
695 def svn_pre_commit(repo_path, commit_data, env):
697 def svn_pre_commit(repo_path, commit_data, env):
696
698
697 path, txn_id = commit_data
699 path, txn_id = commit_data
698 branches = []
700 branches = []
699 tags = []
701 tags = []
700
702
701 if env.get('RC_SCM_DATA'):
703 if env.get('RC_SCM_DATA'):
702 extras = json.loads(env['RC_SCM_DATA'])
704 extras = json.loads(env['RC_SCM_DATA'])
703 else:
705 else:
704 ini_path = env.get('RC_INI_FILE')
706 ini_path = env.get('RC_INI_FILE')
705 if ini_path:
707 if ini_path:
706 _get_ini_settings(ini_path)
708 _get_ini_settings(ini_path)
707 # fallback method to read from TXN-ID stored data
709 # fallback method to read from TXN-ID stored data
708 extras = get_extras_from_txn_id(path, txn_id)
710 extras = get_extras_from_txn_id(path, txn_id)
709
711
710 if not extras:
712 if not extras:
711 raise ValueError('SVN-PRE-COMMIT: Failed to extract context data in called extras for hook execution')
713 raise ValueError('SVN-PRE-COMMIT: Failed to extract context data in called extras for hook execution')
712
714
713 if extras.get('rc_internal_commit'):
715 if extras.get('rc_internal_commit'):
714 # special marker for internal commit, we don't call hooks client
716 # special marker for internal commit, we don't call hooks client
715 return 0
717 return 0
716
718
717 extras['hook_type'] = 'pre_commit'
719 extras['hook_type'] = 'pre_commit'
718 extras['commit_ids'] = [txn_id]
720 extras['commit_ids'] = [txn_id]
719 extras['txn_id'] = txn_id
721 extras['txn_id'] = txn_id
720 extras['new_refs'] = {
722 extras['new_refs'] = {
721 'total_commits': 1,
723 'total_commits': 1,
722 'branches': branches,
724 'branches': branches,
723 'bookmarks': [],
725 'bookmarks': [],
724 'tags': tags,
726 'tags': tags,
725 }
727 }
726
728
727 return _call_hook('pre_push', extras, SvnMessageWriter())
729 return _call_hook('pre_push', extras, SvnMessageWriter())
728
730
729
731
730 def svn_post_commit(repo_path, commit_data, env):
732 def svn_post_commit(repo_path, commit_data, env):
731 """
733 """
732 commit_data is path, rev, txn_id
734 commit_data is path, rev, txn_id
733 """
735 """
734
736
735 if len(commit_data) == 3:
737 if len(commit_data) == 3:
736 path, commit_id, txn_id = commit_data
738 path, commit_id, txn_id = commit_data
737 elif len(commit_data) == 2:
739 elif len(commit_data) == 2:
738 log.error('Failed to extract txn_id from commit_data using legacy method. '
740 log.error('Failed to extract txn_id from commit_data using legacy method. '
739 'Some functionality might be limited')
741 'Some functionality might be limited')
740 path, commit_id = commit_data
742 path, commit_id = commit_data
741 txn_id = None
743 txn_id = None
742 else:
744 else:
743 return 0
745 return 0
744
746
745 branches = []
747 branches = []
746 tags = []
748 tags = []
747
749
748 if env.get('RC_SCM_DATA'):
750 if env.get('RC_SCM_DATA'):
749 extras = json.loads(env['RC_SCM_DATA'])
751 extras = json.loads(env['RC_SCM_DATA'])
750 else:
752 else:
751 ini_path = env.get('RC_INI_FILE')
753 ini_path = env.get('RC_INI_FILE')
752 if ini_path:
754 if ini_path:
753 _get_ini_settings(ini_path)
755 _get_ini_settings(ini_path)
754 # fallback method to read from TXN-ID stored data
756 # fallback method to read from TXN-ID stored data
755 extras = get_extras_from_txn_id(path, txn_id)
757 extras = get_extras_from_txn_id(path, txn_id)
756
758
757 if not extras and txn_id:
759 if not extras and txn_id:
758 raise ValueError('SVN-POST-COMMIT: Failed to extract context data in called extras for hook execution')
760 raise ValueError('SVN-POST-COMMIT: Failed to extract context data in called extras for hook execution')
759
761
760 if extras.get('rc_internal_commit'):
762 if extras.get('rc_internal_commit'):
761 # special marker for internal commit, we don't call hooks client
763 # special marker for internal commit, we don't call hooks client
762 return 0
764 return 0
763
765
764 extras['hook_type'] = 'post_commit'
766 extras['hook_type'] = 'post_commit'
765 extras['commit_ids'] = [commit_id]
767 extras['commit_ids'] = [commit_id]
766 extras['txn_id'] = txn_id
768 extras['txn_id'] = txn_id
767 extras['new_refs'] = {
769 extras['new_refs'] = {
768 'branches': branches,
770 'branches': branches,
769 'bookmarks': [],
771 'bookmarks': [],
770 'tags': tags,
772 'tags': tags,
771 'total_commits': 1,
773 'total_commits': 1,
772 }
774 }
773
775
774 if 'repo_size' in extras['hooks']:
776 if 'repo_size' in extras['hooks']:
775 try:
777 try:
776 _call_hook('repo_size', extras, SvnMessageWriter())
778 _call_hook('repo_size', extras, SvnMessageWriter())
777 except Exception:
779 except Exception:
778 pass
780 pass
779
781
780 return _call_hook('post_push', extras, SvnMessageWriter())
782 return _call_hook('post_push', extras, SvnMessageWriter())
@@ -1,763 +1,765 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2023 RhodeCode GmbH
2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import io
18 import io
19 import os
19 import os
20 import platform
20 import platform
21 import sys
21 import sys
22 import locale
22 import locale
23 import logging
23 import logging
24 import uuid
24 import uuid
25 import time
25 import time
26 import wsgiref.util
26 import wsgiref.util
27 import tempfile
27 import tempfile
28 import psutil
28 import psutil
29
29
30 from itertools import chain
30 from itertools import chain
31
31
32 import msgpack
32 import msgpack
33 import configparser
33 import configparser
34
34
35 from pyramid.config import Configurator
35 from pyramid.config import Configurator
36 from pyramid.wsgi import wsgiapp
36 from pyramid.wsgi import wsgiapp
37 from pyramid.response import Response
37 from pyramid.response import Response
38
38
39 from vcsserver.base import BytesEnvelope, BinaryEnvelope
39 from vcsserver.base import BytesEnvelope, BinaryEnvelope
40
40
41 from vcsserver.config.settings_maker import SettingsMaker
41 from vcsserver.config.settings_maker import SettingsMaker
42
42
43 from vcsserver.tweens.request_wrapper import get_headers_call_context
43 from vcsserver.tweens.request_wrapper import get_headers_call_context
44
44
45 from vcsserver import remote_wsgi, scm_app, hgpatches
45 from vcsserver import remote_wsgi, scm_app, hgpatches
46 from vcsserver.server import VcsServer
46 from vcsserver.server import VcsServer
47 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
47 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
48 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
48 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
49 from vcsserver.echo_stub.echo_app import EchoApp
49 from vcsserver.echo_stub.echo_app import EchoApp
50 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
50 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected, HTTPClientNotSupported
51 from vcsserver.lib.exc_tracking import store_exception, format_exc
51 from vcsserver.lib.exc_tracking import store_exception, format_exc
52 from vcsserver.lib.str_utils import safe_int
52 from vcsserver.lib.str_utils import safe_int
53 from vcsserver.lib.statsd_client import StatsdClient
53 from vcsserver.lib.statsd_client import StatsdClient
54 from vcsserver.lib.ext_json import json
54 from vcsserver.lib.ext_json import json
55 from vcsserver.lib.config_utils import configure_and_store_settings
55 from vcsserver.lib.config_utils import configure_and_store_settings
56
56
57
57
58 strict_vcs = True
58 strict_vcs = True
59
59
60 git_import_err = None
60 git_import_err = None
61 try:
61 try:
62 from vcsserver.remote.git_remote import GitFactory, GitRemote
62 from vcsserver.remote.git_remote import GitFactory, GitRemote
63 except ImportError as e:
63 except ImportError as e:
64 GitFactory = None
64 GitFactory = None
65 GitRemote = None
65 GitRemote = None
66 git_import_err = e
66 git_import_err = e
67 if strict_vcs:
67 if strict_vcs:
68 raise
68 raise
69
69
70
70
71 hg_import_err = None
71 hg_import_err = None
72 try:
72 try:
73 from vcsserver.remote.hg_remote import MercurialFactory, HgRemote
73 from vcsserver.remote.hg_remote import MercurialFactory, HgRemote
74 except ImportError as e:
74 except ImportError as e:
75 MercurialFactory = None
75 MercurialFactory = None
76 HgRemote = None
76 HgRemote = None
77 hg_import_err = e
77 hg_import_err = e
78 if strict_vcs:
78 if strict_vcs:
79 raise
79 raise
80
80
81
81
82 svn_import_err = None
82 svn_import_err = None
83 try:
83 try:
84 from vcsserver.remote.svn_remote import SubversionFactory, SvnRemote
84 from vcsserver.remote.svn_remote import SubversionFactory, SvnRemote
85 except ImportError as e:
85 except ImportError as e:
86 SubversionFactory = None
86 SubversionFactory = None
87 SvnRemote = None
87 SvnRemote = None
88 svn_import_err = e
88 svn_import_err = e
89 if strict_vcs:
89 if strict_vcs:
90 raise
90 raise
91
91
92 log = logging.getLogger(__name__)
92 log = logging.getLogger(__name__)
93
93
94 # due to Mercurial/glibc2.27 problems we need to detect if locale settings are
94 # due to Mercurial/glibc2.27 problems we need to detect if locale settings are
95 # causing problems and "fix" it in case they do and fallback to LC_ALL = C
95 # causing problems and "fix" it in case they do and fallback to LC_ALL = C
96
96
97 try:
97 try:
98 locale.setlocale(locale.LC_ALL, '')
98 locale.setlocale(locale.LC_ALL, '')
99 except locale.Error as e:
99 except locale.Error as e:
100 log.error('LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
100 log.error('LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
101 os.environ['LC_ALL'] = 'C'
101 os.environ['LC_ALL'] = 'C'
102
102
103
103
104 def _is_request_chunked(environ):
104 def _is_request_chunked(environ):
105 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
105 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
106 return stream
106 return stream
107
107
108
108
109 def log_max_fd():
109 def log_max_fd():
110 try:
110 try:
111 maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)[1]
111 maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)[1]
112 log.info('Max file descriptors value: %s', maxfd)
112 log.info('Max file descriptors value: %s', maxfd)
113 except Exception:
113 except Exception:
114 pass
114 pass
115
115
116
116
117 class VCS:
117 class VCS:
118 def __init__(self, locale_conf=None, cache_config=None):
118 def __init__(self, locale_conf=None, cache_config=None):
119 self.locale = locale_conf
119 self.locale = locale_conf
120 self.cache_config = cache_config
120 self.cache_config = cache_config
121 self._configure_locale()
121 self._configure_locale()
122
122
123 log_max_fd()
123 log_max_fd()
124
124
125 if GitFactory and GitRemote:
125 if GitFactory and GitRemote:
126 git_factory = GitFactory()
126 git_factory = GitFactory()
127 self._git_remote = GitRemote(git_factory)
127 self._git_remote = GitRemote(git_factory)
128 else:
128 else:
129 log.error("Git client import failed: %s", git_import_err)
129 log.error("Git client import failed: %s", git_import_err)
130
130
131 if MercurialFactory and HgRemote:
131 if MercurialFactory and HgRemote:
132 hg_factory = MercurialFactory()
132 hg_factory = MercurialFactory()
133 self._hg_remote = HgRemote(hg_factory)
133 self._hg_remote = HgRemote(hg_factory)
134 else:
134 else:
135 log.error("Mercurial client import failed: %s", hg_import_err)
135 log.error("Mercurial client import failed: %s", hg_import_err)
136
136
137 if SubversionFactory and SvnRemote:
137 if SubversionFactory and SvnRemote:
138 svn_factory = SubversionFactory()
138 svn_factory = SubversionFactory()
139
139
140 # hg factory is used for svn url validation
140 # hg factory is used for svn url validation
141 hg_factory = MercurialFactory()
141 hg_factory = MercurialFactory()
142 self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
142 self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
143 else:
143 else:
144 log.error("Subversion client import failed: %s", svn_import_err)
144 log.error("Subversion client import failed: %s", svn_import_err)
145
145
146 self._vcsserver = VcsServer()
146 self._vcsserver = VcsServer()
147
147
148 def _configure_locale(self):
148 def _configure_locale(self):
149 if self.locale:
149 if self.locale:
150 log.info('Settings locale: `LC_ALL` to %s', self.locale)
150 log.info('Settings locale: `LC_ALL` to %s', self.locale)
151 else:
151 else:
152 log.info('Configuring locale subsystem based on environment variables')
152 log.info('Configuring locale subsystem based on environment variables')
153 try:
153 try:
154 # If self.locale is the empty string, then the locale
154 # If self.locale is the empty string, then the locale
155 # module will use the environment variables. See the
155 # module will use the environment variables. See the
156 # documentation of the package `locale`.
156 # documentation of the package `locale`.
157 locale.setlocale(locale.LC_ALL, self.locale)
157 locale.setlocale(locale.LC_ALL, self.locale)
158
158
159 language_code, encoding = locale.getlocale()
159 language_code, encoding = locale.getlocale()
160 log.info(
160 log.info(
161 'Locale set to language code "%s" with encoding "%s".',
161 'Locale set to language code "%s" with encoding "%s".',
162 language_code, encoding)
162 language_code, encoding)
163 except locale.Error:
163 except locale.Error:
164 log.exception('Cannot set locale, not configuring the locale system')
164 log.exception('Cannot set locale, not configuring the locale system')
165
165
166
166
class WsgiProxy:
    """Adapter that exposes a remote-wsgi handler over a msgpack wire protocol.

    The request body is a msgpack map with keys ``environment``,
    ``input_data``, ``args`` and ``kwargs``; the response is a stream of
    msgpack-packed frames: error, status, headers, then the data chunks
    (in that exact order — the peer relies on it).
    """

    def __init__(self, wsgi):
        # `wsgi` must expose handle(environment, input_data, *args, **kwargs)
        # returning a (data, status, headers) triple.
        self.wsgi = wsgi

    def __call__(self, environ, start_response):
        input_data = environ['wsgi.input'].read()
        input_data = msgpack.unpackb(input_data)

        error = None
        try:
            data, status, headers = self.wsgi.handle(
                input_data['environment'], input_data['input_data'],
                *input_data['args'], **input_data['kwargs'])
        except Exception as e:
            # Deliberately broad: any failure is serialized into the error
            # frame instead of raised, so the caller can re-raise remotely.
            data, status, headers = [], None, None
            error = {
                'message': str(e),
                '_vcs_kind': getattr(e, '_vcs_kind', None)
            }

        # NOTE(review): status/headers here are ints/dicts rather than the
        # usual WSGI strings/lists — presumably the consuming server side of
        # this custom protocol tolerates that; confirm before changing.
        start_response(200, {})
        return self._iterator(error, status, headers, data)

    def _iterator(self, error, status, headers, data):
        # Frame order matters: error, status, headers first, then payload.
        initial_data = [
            error,
            status,
            headers,
        ]

        for d in chain(initial_data, data):
            yield msgpack.packb(d)
199
199
200
200
def not_found(request):
    """Pyramid not-found view: JSON-renderable 404 payload."""
    payload = {'status': '404 NOT FOUND'}
    return payload
203
203
204
204
class VCSViewPredicate:
    """Pyramid view predicate: match when the route's ``backend`` is served.

    Registered via ``add_view_predicate('vcs_view', ...)``; ``val`` is the
    mapping of backend name -> remote object the view was configured with.
    """

    def __init__(self, val, config):
        # `config` is required by Pyramid's predicate factory signature
        # but is not used here.
        self.remotes = val

    def text(self):
        known_backends = list(self.remotes.keys())
        return f'vcs view method = {known_backends}'

    # Pyramid uses phash() to discriminate predicate instances; the text
    # representation is unique enough for that purpose.
    phash = text

    def __call__(self, context, request):
        """Return True if the requested backend has a configured remote."""
        requested = request.matchdict.get('backend')
        return requested in self.remotes
221
221
222
222
class HTTPApplication:
    """Pyramid/WSGI application exposing VCS backends over HTTP.

    Wires msgpack RPC routes (``/{backend}``, ``/{backend}/stream``),
    streaming clone/push endpoints (``/stream/hg|git/*repo_name``),
    proxy endpoints and service/status views to the per-backend remote
    objects constructed by ``VCS``.
    """

    # Exception class names whose name is forwarded to the client as
    # error['type'] in vcs_view; all other types are anonymized to None.
    ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')

    # Module-level remote_wsgi by default; replaced with the echo stub
    # when dev.use_echo_app is enabled (see __init__).
    remote_wsgi = remote_wsgi
    _use_echo_app = False

    def __init__(self, settings=None, global_config=None):

        self.config = Configurator(settings=settings)
        # Init our statsd at very start
        self.config.registry.statsd = StatsdClient.statsd
        self.config.registry.vcs_call_context = {}

        self.global_config = global_config
        self.config.include('vcsserver.lib.rc_cache')
        self.config.include('vcsserver.lib.archive_cache')

        settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
        vcs = VCS(locale_conf=settings_locale, cache_config=settings)
        # Backend name -> remote object; also drives the vcs_view predicate.
        self._remotes = {
            'hg': vcs._hg_remote,
            'git': vcs._git_remote,
            'svn': vcs._svn_remote,
            'server': vcs._vcsserver,
        }
        if settings.get('dev.use_echo_app', 'false').lower() == 'true':
            self._use_echo_app = True
            log.warning("Using EchoApp for VCS operations.")
            self.remote_wsgi = remote_wsgi_stub

        configure_and_store_settings(global_config, settings)

        self._configure()

    def _configure(self):
        """Register renderers, routes, views, predicates and tweens."""
        self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory)

        self.config.add_route('service', '/_service')
        self.config.add_route('status', '/status')
        self.config.add_route('hg_proxy', '/proxy/hg')
        self.config.add_route('git_proxy', '/proxy/git')

        # rpc methods
        self.config.add_route('vcs', '/{backend}')

        # streaming rpc remote methods
        self.config.add_route('vcs_stream', '/{backend}/stream')

        # vcs operations clone/push as streaming
        self.config.add_route('stream_git', '/stream/git/*repo_name')
        self.config.add_route('stream_hg', '/stream/hg/*repo_name')

        self.config.add_view(self.status_view, route_name='status', renderer='json')
        self.config.add_view(self.service_view, route_name='service', renderer='msgpack')

        self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
        self.config.add_view(self.git_proxy(), route_name='git_proxy')
        # vcs_view= hands the remotes mapping to VCSViewPredicate so only
        # configured backends match these routes.
        self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
                             vcs_view=self._remotes)
        self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
                             vcs_view=self._remotes)

        self.config.add_view(self.hg_stream(), route_name='stream_hg')
        self.config.add_view(self.git_stream(), route_name='stream_git')

        self.config.add_view_predicate('vcs_view', VCSViewPredicate)

        self.config.add_notfound_view(not_found, renderer='json')

        # Catch-all exception view; translates known _vcs_kind errors into
        # dedicated HTTP responses (see handle_vcs_exception).
        self.config.add_view(self.handle_vcs_exception, context=Exception)

        self.config.add_tween(
            'vcsserver.tweens.request_wrapper.RequestWrapperTween',
        )
        self.config.add_request_method(
            'vcsserver.lib.request_counter.get_request_counter',
            'request_count')

    def wsgi_app(self):
        """Build and return the configured WSGI application."""
        return self.config.make_wsgi_app()

    def _vcs_view_params(self, request):
        """Decode a msgpack RPC request body into call parameters.

        Returns (payload, remote, method, args, kwargs). Also records the
        call context on the registry and emits a statsd counter per method.
        """
        remote = self._remotes[request.matchdict['backend']]
        payload = msgpack.unpackb(request.body, use_list=True)

        method = payload.get('method')
        params = payload['params']
        wire = params.get('wire')
        args = params.get('args')
        kwargs = params.get('kwargs')
        context_uid = None

        request.registry.vcs_call_context = {
            'method': method,
            'repo_name': payload.get('_repo_name'),
        }

        if wire:
            try:
                # Normalize the context id to a UUID object; missing key is
                # fine and leaves context_uid as None.
                wire['context'] = context_uid = uuid.UUID(wire['context'])
            except KeyError:
                pass
            # The wire dict is always the first positional argument of the
            # remote method call.
            args.insert(0, wire)
        repo_state_uid = wire.get('repo_state_uid') if wire else None

        # NOTE(marcink): trading complexity for slight performance
        if log.isEnabledFor(logging.DEBUG):
            # also we SKIP printing out any of those methods args since they maybe excessive
            just_args_methods = {
                'commitctx': ('content', 'removed', 'updated'),
                'commit': ('content', 'removed', 'updated')
            }
            if method in just_args_methods:
                skip_args = just_args_methods[method]
                call_args = ''
                call_kwargs = {}
                for k in kwargs:
                    if k in skip_args:
                        # replace our skip key with dummy
                        call_kwargs[k] = f'RemovedParam({k})'
                    else:
                        call_kwargs[k] = kwargs[k]
            else:
                call_args = args[1:]
                call_kwargs = kwargs

            log.debug('Method requested:`%s` with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
                      method, call_args, call_kwargs, context_uid, repo_state_uid)

        statsd = request.registry.statsd
        if statsd:
            statsd.incr(
                'vcsserver_method_total', tags=[
                    f"method:{method}",
                ])
        return payload, remote, method, args, kwargs

    def vcs_view(self, request):
        """Dispatch one RPC call to the selected remote.

        On failure, the exception (or its original `_org_exc`) is stored for
        later inspection — unless it is a known lookup-miss type — and a
        serialized error dict is returned instead of raising.
        """
        payload, remote, method, args, kwargs = self._vcs_view_params(request)
        payload_id = payload.get('id')

        try:
            resp = getattr(remote, method)(*args, **kwargs)
        except Exception as e:
            exc_info = list(sys.exc_info())
            exc_type, exc_value, exc_traceback = exc_info

            org_exc = getattr(e, '_org_exc', None)
            org_exc_name = None
            org_exc_tb = ''
            if org_exc:
                org_exc_name = org_exc.__class__.__name__
                org_exc_tb = getattr(e, '_org_exc_tb', '')
                # replace our "faked" exception with our org
                exc_info[0] = org_exc.__class__
                exc_info[1] = org_exc

            should_store_exc = True
            if org_exc:
                def get_exc_fqn(_exc_obj):
                    # NOTE(review): uses the closed-over org_exc, not
                    # _exc_obj — works, since only org_exc is passed in.
                    module_name = getattr(org_exc.__class__, '__module__', 'UNKNOWN')
                    return module_name + '.' + org_exc_name

                exc_fqn = get_exc_fqn(org_exc)

                # Expected "not found" style errors are not worth storing.
                if exc_fqn in ['mercurial.error.RepoLookupError',
                               'vcsserver.exceptions.RefNotFoundException']:
                    should_store_exc = False

            if should_store_exc:
                store_exception(id(exc_info), exc_info, request_path=request.path)

            tb_info = format_exc(exc_info)

            # Only whitelisted exception types keep their name in the
            # response; everything else is anonymized.
            type_ = e.__class__.__name__
            if type_ not in self.ALLOWED_EXCEPTIONS:
                type_ = None

            resp = {
                'id': payload_id,
                'error': {
                    'message': str(e),
                    'traceback': tb_info,
                    'org_exc': org_exc_name,
                    'org_exc_tb': org_exc_tb,
                    'type': type_
                }
            }

            try:
                resp['error']['_vcs_kind'] = getattr(e, '_vcs_kind', None)
            except AttributeError:
                pass
        else:
            resp = {
                'id': payload_id,
                'result': resp
            }
        log.debug('Serving data for method %s', method)
        return resp

    def vcs_stream_view(self, request):
        """Dispatch a streaming RPC call and chunk its result as octets."""
        payload, remote, method, args, kwargs = self._vcs_view_params(request)
        # this method has a stream: marker we remove it here
        method = method.split('stream:')[-1]
        chunk_size = safe_int(payload.get('chunk_size')) or 4096

        resp = getattr(remote, method)(*args, **kwargs)

        def get_chunked_data(method_resp):
            # method_resp is a bytes blob; re-slice into chunk_size pieces.
            stream = io.BytesIO(method_resp)
            while 1:
                chunk = stream.read(chunk_size)
                if not chunk:
                    break
                yield chunk

        response = Response(app_iter=get_chunked_data(resp))
        response.content_type = 'application/octet-stream'

        return response

    def status_view(self, request):
        """Health endpoint: version, host name and process id."""
        import vcsserver
        _platform_id = platform.uname()[1] or 'instance'

        return {
            "status": "OK",
            "vcsserver_version": vcsserver.get_version(),
            "platform": _platform_id,
            "pid": os.getpid(),
        }

    def service_view(self, request):
        """Diagnostics endpoint: .ini sections, environment and echo payload.

        NOTE: exposes the full process environment — intended for trusted
        internal callers only.
        """
        import vcsserver

        payload = msgpack.unpackb(request.body, use_list=True)
        server_config, app_config = {}, {}

        try:
            path = self.global_config['__file__']
            config = configparser.RawConfigParser()

            config.read(path)

            if config.has_section('server:main'):
                server_config = dict(config.items('server:main'))
            if config.has_section('app:main'):
                app_config = dict(config.items('app:main'))

        except Exception:
            # Best-effort: missing/broken ini just yields empty sections.
            log.exception('Failed to read .ini file for display')

        environ = list(os.environ.items())

        resp = {
            'id': payload.get('id'),
            'result': dict(
                version=vcsserver.get_version(),
                config=server_config,
                app_config=app_config,
                environ=environ,
                payload=payload,
            )
        }
        return resp

    def _msgpack_renderer_factory(self, info):
        """Pyramid renderer factory for msgpack responses.

        `info` is required by the renderer-factory contract but unused.
        """

        def _render(value, system):
            bin_type = False
            res = value.get('result')
            if isinstance(res, BytesEnvelope):
                # Bytes payloads are packed as-is; msgpack handles raw bytes.
                log.debug('Result is wrapped in BytesEnvelope type')
                bin_type = True
            elif isinstance(res, BinaryEnvelope):
                log.debug('Result is wrapped in BinaryEnvelope type')
                value['result'] = res.val
                bin_type = True

            request = system.get('request')
            if request is not None:
                response = request.response
                ct = response.content_type
                # Only override the content type if the view did not set one.
                if ct == response.default_content_type:
                    response.content_type = 'application/x-msgpack'
                if bin_type:
                    response.content_type = 'application/x-msgpack-bin'

            return msgpack.packb(value, use_bin_type=bin_type)
        return _render

    def set_env_from_config(self, environ, config):
        """Copy RhodeCode call metadata from `config` into the WSGI environ.

        `config` is assumed to be an iterable of (section, key, value)
        triples; the JSON value of the 'rhodecode' entry carries username/ip.
        """
        dict_conf = {}
        try:
            for elem in config:
                if elem[0] == 'rhodecode':
                    dict_conf = json.loads(elem[2])
                    break
        except Exception:
            log.exception('Failed to fetch SCM CONFIG')
            return

        username = dict_conf.get('username')
        if username:
            environ['REMOTE_USER'] = username
            # mercurial specific, some extension api rely on this
            environ['HGUSER'] = username

        ip = dict_conf.get('ip')
        if ip:
            environ['REMOTE_HOST'] = ip

        if _is_request_chunked(environ):
            # set the compatibility flag for webob
            environ['wsgi.input_terminated'] = True

    def hg_proxy(self):
        """Return the WSGI app proxying to the remote hg handler."""
        @wsgiapp
        def _hg_proxy(environ, start_response):
            app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
            return app(environ, start_response)
        return _hg_proxy

    def git_proxy(self):
        """Return the WSGI app proxying to the remote git handler."""
        @wsgiapp
        def _git_proxy(environ, start_response):
            app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
            return app(environ, start_response)
        return _git_proxy

    def hg_stream(self):
        """Return the WSGI app serving hg clone/push streaming traffic."""
        if self._use_echo_app:
            @wsgiapp
            def _hg_stream(environ, start_response):
                app = EchoApp('fake_path', 'fake_name', None)
                return app(environ, start_response)
            return _hg_stream
        else:
            @wsgiapp
            def _hg_stream(environ, start_response):
                log.debug('http-app: handling hg stream')
                call_context = get_headers_call_context(environ)

                repo_path = call_context['repo_path']
                repo_name = call_context['repo_name']
                config = call_context['repo_config']

                app = scm_app.create_hg_wsgi_app(
                    repo_path, repo_name, config)

                # Consistent path information for hgweb
                environ['PATH_INFO'] = call_context['path_info']
                environ['REPO_NAME'] = repo_name
                self.set_env_from_config(environ, config)

                log.debug('http-app: starting app handler '
                          'with %s and process request', app)
                # ResponseFilter strips hop-by-hop headers from hgweb output.
                return app(environ, ResponseFilter(start_response))
            return _hg_stream

    def git_stream(self):
        """Return the WSGI app serving git (and git-lfs) streaming traffic."""
        if self._use_echo_app:
            @wsgiapp
            def _git_stream(environ, start_response):
                app = EchoApp('fake_path', 'fake_name', None)
                return app(environ, start_response)
            return _git_stream
        else:
            @wsgiapp
            def _git_stream(environ, start_response):
                log.debug('http-app: handling git stream')

                call_context = get_headers_call_context(environ)

                repo_path = call_context['repo_path']
                repo_name = call_context['repo_name']
                config = call_context['repo_config']

                environ['PATH_INFO'] = call_context['path_info']
                self.set_env_from_config(environ, config)

                content_type = environ.get('CONTENT_TYPE', '')

                path = environ['PATH_INFO']
                # LFS requests are detected by content type first, then by
                # URL pattern as a fallback.
                is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
                log.debug(
                    'LFS: Detecting if request `%s` is LFS server path based '
                    'on content type:`%s`, is_lfs:%s',
                    path, content_type, is_lfs_request)

                if not is_lfs_request:
                    # fallback detection by path
                    if GIT_LFS_PROTO_PAT.match(path):
                        is_lfs_request = True
                    log.debug(
                        'LFS: fallback detection by path of: `%s`, is_lfs:%s',
                        path, is_lfs_request)

                if is_lfs_request:
                    app = scm_app.create_git_lfs_wsgi_app(
                        repo_path, repo_name, config)
                else:
                    app = scm_app.create_git_wsgi_app(
                        repo_path, repo_name, config)

                log.debug('http-app: starting app handler '
                          'with %s and process request', app)

                return app(environ, start_response)

            return _git_stream

    def handle_vcs_exception(self, exception, request):
        """Exception view: map known `_vcs_kind` errors to HTTP responses.

        Locked repos, protected branches and unsupported clients get
        dedicated responses with marker headers; anything else is stored,
        logged, counted in statsd and re-raised.
        """
        match _vcs_kind := getattr(exception, '_vcs_kind', ''):
            case 'repo_locked':
                # Status code for locked repos is supplied by the caller via
                # request headers.
                headers_call_context = get_headers_call_context(request.environ)
                status_code = safe_int(headers_call_context['locked_status_code'])

                return HTTPRepoLocked(
                    title=str(exception), status_code=status_code, headers=[('X-Rc-Locked', '1')])

            case 'repo_branch_protected':
                # Get custom repo-branch-protected status code if present.
                return HTTPRepoBranchProtected(
                    title=str(exception), headers=[('X-Rc-Branch-Protection', '1')])
            case 'client_not_supported':
                return HTTPClientNotSupported(
                    title=str(exception), headers=[('X-Rc-Client-Not-Supported', '1')])

        exc_info = request.exc_info
        store_exception(id(exc_info), exc_info)

        traceback_info = 'unavailable'
        if request.exc_info:
            traceback_info = format_exc(request.exc_info)

        log.error(
            'error occurred handling this request for path: %s, \n%s',
            request.path, traceback_info)

        statsd = request.registry.statsd
        if statsd:
            exc_type = f"{exception.__class__.__module__}.{exception.__class__.__name__}"
            statsd.incr('vcsserver_exception_total',
                        tags=[f"type:{exc_type}"])
        raise exception
669
671
670
672
class ResponseFilter:
    """``start_response`` wrapper that strips hop-by-hop headers.

    Hop-by-hop headers (Connection, Keep-Alive, ...) must not be forwarded
    by a gateway (RFC 2616 section 13.5.1), so they are filtered out before
    delegating to the wrapped WSGI ``start_response`` callable.
    """

    def __init__(self, start_response):
        self._start_response = start_response

    def __call__(self, status, response_headers, exc_info=None):
        kept = []
        for name, value in response_headers:
            if wsgiref.util.is_hop_by_hop(name):
                continue
            kept.append((name, value))
        return self._start_response(status, tuple(kept), exc_info)
681
683
682
684
def sanitize_settings_and_apply_defaults(global_config, settings):
    """
    Normalize the ``settings`` dict in-place: fill in defaults, parse raw
    string values into typed ones and ensure cache directories exist, then
    expand environment-variable references.
    """
    # NOTE(review): this instance is never used after construction — confirm
    # SettingsMaker() has no required side effects before removing it.
    _global_settings_maker = SettingsMaker(global_config)
    settings_maker = SettingsMaker(settings)

    settings_maker.make_setting('logging.autoconfigure', False, parser='bool')

    # logging.ini is expected next to the main ini file
    ini_dir = os.path.dirname(global_config.get('__file__'))
    logging_conf = os.path.join(ini_dir, 'logging.ini')
    settings_maker.enable_logging(logging_conf)

    # Default includes, possible to change as a user
    pyramid_includes = settings_maker.make_setting('pyramid.includes', [], parser='list:newline')
    log.debug("Using the following pyramid.includes: %s", pyramid_includes)

    settings_maker.make_setting('__file__', global_config.get('__file__'))

    settings_maker.make_setting('pyramid.default_locale_name', 'en')
    settings_maker.make_setting('locale', 'en_US.UTF-8')

    settings_maker.make_setting(
        'core.binary_dir', '/usr/local/bin/rhodecode_bin/vcs_bin',
        default_when_empty=True, parser='string:noquote')

    settings_maker.make_setting('vcs.svn.redis_conn', 'redis://redis:6379/0')

    # save default cache dir and reuse it for all cache backends below
    default_cache_dir = settings_maker.make_setting(
        'cache_dir',
        default=os.path.join(tempfile.gettempdir(), 'rc_cache'),
        default_when_empty=True,
        parser='dir:ensured')

    # exception store cache
    settings_maker.make_setting(
        'exception_tracker.store_path',
        default=os.path.join(default_cache_dir, 'exc_store'),
        default_when_empty=True,
        parser='dir:ensured')

    # repo_object cache defaults
    settings_maker.make_setting(
        'rc_cache.repo_object.backend',
        default='dogpile.cache.rc.file_namespace',
        parser='string')
    settings_maker.make_setting(
        'rc_cache.repo_object.expiration_time',
        default=30 * 24 * 60 * 60,  # 30days
        parser='int')
    settings_maker.make_setting(
        'rc_cache.repo_object.arguments.filename',
        default=os.path.join(default_cache_dir, 'vcsserver_cache_repo_object.db'),
        parser='string')

    # statsd
    settings_maker.make_setting('statsd.enabled', False, parser='bool')
    settings_maker.make_setting('statsd.statsd_host', 'statsd-exporter', parser='string')
    settings_maker.make_setting('statsd.statsd_port', 9125, parser='int')
    settings_maker.make_setting('statsd.statsd_prefix', '')
    settings_maker.make_setting('statsd.statsd_ipv6', False, parser='bool')

    settings_maker.env_expand()
744
746
745
747
def main(global_config, **settings):
    """
    Pyramid application factory.

    Applies mercurial monkey-patches (when hg support is available),
    sanitizes the settings, bootstraps the statsd client and returns the
    configured WSGI application.
    """
    t_start = time.time()
    log.info('Pyramid app config starting')

    if MercurialFactory:
        # hg backend present — patch before any repo operations happen
        hgpatches.patch_largefiles_capabilities()
        hgpatches.patch_subrepo_type_mapping()

    # Fill in and sanitize the defaults & do ENV expansion
    sanitize_settings_and_apply_defaults(global_config, settings)

    # init and bootstrap StatsdClient
    StatsdClient.setup(settings)

    http_app = HTTPApplication(settings=settings, global_config=global_config)
    pyramid_app = http_app.wsgi_app()
    log.info('Pyramid app created and configured in %.2fs', time.time() - t_start)
    return pyramid_app
@@ -1,563 +1,564 b''
1 """
1 """
2 Module provides a class allowing to wrap communication over subprocess.Popen
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningful, non-blocking, concurrent
3 input, output, error streams into a meaningful, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 """
24 """
25 import os
25 import os
26 import collections
26 import collections
27 import logging
27 import logging
28 import subprocess
28 import subprocess
29 import threading
29 import threading
30
30
31 from vcsserver.lib.str_utils import safe_str
31 from vcsserver.lib.str_utils import safe_str
32
32
33 log = logging.getLogger(__name__)
33 log = logging.getLogger(__name__)
34
34
35
35
class StreamFeeder(threading.Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.
    """

    def __init__(self, source):
        """
        :param source: ``bytes``/``bytearray``/``str`` payload, a readable
            file-like object, or an int file descriptor to feed into the pipe.
        :raises TypeError: if ``source`` is neither string-like nor readable.
        """
        super().__init__()
        self.daemon = True
        filelike = False
        self.bytes = b''
        if isinstance(source, (str, bytes, bytearray)):  # string-like
            # BUGFIX: bytes(str) without an encoding raises TypeError, even
            # though str is explicitly accepted here — encode text first.
            if isinstance(source, str):
                self.bytes = source.encode('utf-8')
            else:
                self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            if isinstance(source, int):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                source = os.fdopen(source, 'rb', 16384)
            # let's see if source is file-like by now
            filelike = hasattr(source, 'read')
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        """Pump the source into the pipe; always close the write end at EOF."""
        writer = self.writeiface
        try:
            if self.bytes:
                os.write(writer, self.bytes)
            else:
                s = self.source
                while 1:
                    _bytes = s.read(4096)
                    if not _bytes:
                        break
                    os.write(writer, _bytes)
        finally:
            # closing the writer delivers EOF to readers of self.readiface
            os.close(writer)

    @property
    def output(self):
        # read end of the pipe; consumers read until EOF
        return self.readiface
83
83
84
84
class InputStreamChunker(threading.Thread):
    """
    Daemon thread draining ``source`` into the ``target`` deque in
    ``chunk_size`` pieces, parking itself while the buffer holds more than
    ``buffer_size / chunk_size`` chunks.

    Coordination events:
      - ``data_added``: pulsed after each appended chunk (and once at EOF)
      - ``keep_reading``: cleared here on a full buffer; the consumer sets
        it again after draining chunks
      - ``EOF``: set once the source is exhausted (or via ``stop()``)
      - ``go``: cleared by ``stop()`` to abort the read loop
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        super().__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        # Event() objects start cleared; only the "running" flags need set()
        self.data_added = threading.Event()
        self.keep_reading = threading.Event()
        self.keep_reading.set()
        self.EOF = threading.Event()
        self.go = threading.Event()
        self.go.set()

    def stop(self):
        self.go.clear()
        self.EOF.set()
        try:
            # not pretty, but closing the source forces a blocked read() in
            # run() to return and, on success, sends EOF down the pipe
            self.source.close()
        except Exception:
            pass

    def run(self):
        queue = self.target
        limit = self.chunk_count_max
        wait_timeout = 20

        def _next_chunk():
            try:
                return self.source.read(self.chunk_size)
            except ValueError:  # probably "I/O operation on closed file"
                return ''

        chunk = _next_chunk()
        while chunk and self.go.is_set():
            if len(queue) > limit:
                # buffer full: park until the consumer drains some chunks
                self.keep_reading.clear()
                self.keep_reading.wait(wait_timeout)
                if len(queue) > limit + wait_timeout:
                    log.error("Timed out while waiting for input from subprocess.")
                    os._exit(-1)  # this will cause the worker to recycle itself

            queue.append(chunk)
            self.data_added.set()
            chunk = _next_chunk()

        self.EOF.set()
        self.data_added.set()  # for cases when done but there was no input.
153
153
154
154
class BufferedGenerator:
    """
    Non-blocking, buffered reader over a blocking pipe.

    A background :class:`InputStreamChunker` thread appends chunks of the
    source stream to an internal deque; this object exposes those chunks as
    an iterator of ``bytes``. ``__next__`` blocks (in 0.2s slices) while the
    buffer is empty and the reader has not hit EOF, and raises
    ``StopIteration`` once the buffer drains after EOF.
    """

    def __init__(self, name, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        starting_values = starting_values or []
        self.name = name
        self.buffer_size = buffer_size
        self.chunk_size = chunk_size

        # bottomless mode caps the deque, silently dropping the oldest chunks
        maxlen = int(buffer_size / chunk_size) if bottomless else None

        self.data_queue = collections.deque(starting_values, maxlen)
        self.worker = InputStreamChunker(source, self.data_queue, buffer_size, chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################
    def __str__(self):
        return f'BufferedGenerator(name={self.name} chunk: {self.chunk_size} on buffer: {self.buffer_size})'

    def __iter__(self):
        return self

    def __next__(self):
        # poll until a chunk is buffered or the reader signals EOF
        while not self.length and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)

        if self.length:
            # a slot was freed — let a possibly-paused reader continue
            self.worker.keep_reading.set()
            return bytes(self.data_queue.popleft())
        if self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        # mirrors generator.throw(); a finished stream swallows the exception
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute ``w`` — this
        # property raises AttributeError when accessed; looks stale, confirm
        # intended target before relying on it.
        return self.worker.w

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        # True while the reader thread is parked on a full buffer
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Event set once the underlying source is fully read. Already-buffered
        chunks may still be pending in the queue after this fires.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Whether the underlying source is fully read. Already-buffered chunks
        may still be served through ``__next__`` afterwards.

        :returns: An Bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        Number of chunks currently buffered — NOT the combined byte length.

        ``__len__`` is deliberately not implemented: per PEP 3333 a WSGI
        server may use ``__len__`` as the response length, which cannot be
        known for a stream that is still being produced.
        """
        return len(self.data_queue)

    def prepend(self, x):
        self.data_queue.appendleft(x)

    def append(self, x):
        self.data_queue.append(x)

    def extend(self, o):
        self.data_queue.extend(o)

    def __getitem__(self, i):
        return self.data_queue[i]
295
295
296
296
297 class SubprocessIOChunker:
297 class SubprocessIOChunker:
298 """
298 """
299 Processor class wrapping handling of subprocess IO.
299 Processor class wrapping handling of subprocess IO.
300
300
301 .. important::
301 .. important::
302
302
303 Watch out for the method `__del__` on this class. If this object
303 Watch out for the method `__del__` on this class. If this object
304 is deleted, it will kill the subprocess, so avoid to
304 is deleted, it will kill the subprocess, so avoid to
305 return the `output` attribute or usage of it like in the following
305 return the `output` attribute or usage of it like in the following
306 example::
306 example::
307
307
308 # `args` expected to run a program that produces a lot of output
308 # `args` expected to run a program that produces a lot of output
309 output = ''.join(SubprocessIOChunker(
309 output = ''.join(SubprocessIOChunker(
310 args, shell=False, inputstream=inputstream, env=environ).output)
310 args, shell=False, inputstream=inputstream, env=environ).output)
311
311
312 # `output` will not contain all the data, because the __del__ method
312 # `output` will not contain all the data, because the __del__ method
313 # has already killed the subprocess in this case before all output
313 # has already killed the subprocess in this case before all output
314 # has been consumed.
314 # has been consumed.
315
315
316
316
317
317
318 In a way, this is a "communicate()" replacement with a twist.
318 In a way, this is a "communicate()" replacement with a twist.
319
319
320 - We are multithreaded. Writing in and reading out, err are all sep threads.
320 - We are multithreaded. Writing in and reading out, err are all sep threads.
321 - We support concurrent (in and out) stream processing.
321 - We support concurrent (in and out) stream processing.
322 - The output is not a stream. It's a queue of read string (bytes, not str)
322 - The output is not a stream. It's a queue of read string (bytes, not str)
323 chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
323 chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
324 - We are non-blocking in more respects than communicate()
324 - We are non-blocking in more respects than communicate()
325 (reading from subprocess out pauses when internal buffer is full, but
325 (reading from subprocess out pauses when internal buffer is full, but
326 does not block the parent calling code. On the flip side, reading from
326 does not block the parent calling code. On the flip side, reading from
327 slow-yielding subprocess may block the iteration until data shows up. This
327 slow-yielding subprocess may block the iteration until data shows up. This
328 does not block the parallel inpipe reading occurring parallel thread.)
328 does not block the parallel inpipe reading occurring parallel thread.)
329
329
330 The purpose of the object is to allow us to wrap subprocess interactions into
330 The purpose of the object is to allow us to wrap subprocess interactions into
331 an iterable that can be passed to a WSGI server as the application's return
331 an iterable that can be passed to a WSGI server as the application's return
332 value. Because of stream-processing-ability, WSGI does not have to read ALL
332 value. Because of stream-processing-ability, WSGI does not have to read ALL
333 of the subprocess's output and buffer it, before handing it to WSGI server for
333 of the subprocess's output and buffer it, before handing it to WSGI server for
334 HTTP response. Instead, the class initializer reads just a bit of the stream
334 HTTP response. Instead, the class initializer reads just a bit of the stream
335 to figure out if error occurred or likely to occur and if not, just hands the
335 to figure out if error occurred or likely to occur and if not, just hands the
336 further iteration over subprocess output to the server for completion of HTTP
336 further iteration over subprocess output to the server for completion of HTTP
337 response.
337 response.
338
338
339 The real or perceived subprocess error is trapped and raised as one of
339 The real or perceived subprocess error is trapped and raised as one of
340 OSError family of exceptions
340 OSError family of exceptions
341
341
342 Example usage:
342 Example usage:
343 # try:
343 # try:
344 # answer = SubprocessIOChunker(
344 # answer = SubprocessIOChunker(
345 # cmd,
345 # cmd,
346 # input,
346 # input,
347 # buffer_size = 65536,
347 # buffer_size = 65536,
348 # chunk_size = 4096
348 # chunk_size = 4096
349 # )
349 # )
350 # except (OSError) as e:
350 # except (OSError) as e:
351 # print str(e)
351 # print str(e)
352 # raise e
352 # raise e
353 #
353 #
354 # return answer
354 # return answer
355
355
356
356
357 """
357 """
358
358
359 # TODO: johbo: This is used to make sure that the open end of the PIPE
359 # TODO: johbo: This is used to make sure that the open end of the PIPE
360 # is closed in the end. It would be way better to wrap this into an
360 # is closed in the end. It would be way better to wrap this into an
361 # object, so that it is closed automatically once it is consumed or
361 # object, so that it is closed automatically once it is consumed or
362 # something similar.
362 # something similar.
363 _close_input_fd = None
363 _close_input_fd = None
364
364
365 _closed = False
365 _closed = False
366 _stdout = None
366 _stdout = None
367 _stderr = None
367 _stderr = None
368
368
369 def __init__(self, cmd, input_stream=None, buffer_size=65536,
369 def __init__(self, cmd, input_stream=None, buffer_size=65536,
370 chunk_size=4096, starting_values=None, fail_on_stderr=True,
370 chunk_size=4096, starting_values=None, fail_on_stderr=True,
371 fail_on_return_code=True, **kwargs):
371 fail_on_return_code=True, **kwargs):
372 """
372 """
373 Initializes SubprocessIOChunker
373 Initializes SubprocessIOChunker
374
374
375 :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
375 :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
376 :param input_stream: (Default: None) A file-like, string, or file pointer.
376 :param input_stream: (Default: None) A file-like, string, or file pointer.
377 :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
377 :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
378 :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
378 :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
379 :param starting_values: (Default: []) An array of strings to put in front of output que.
379 :param starting_values: (Default: []) An array of strings to put in front of output que.
380 :param fail_on_stderr: (Default: True) Whether to raise an exception in
380 :param fail_on_stderr: (Default: True) Whether to raise an exception in
381 case something is written to stderr.
381 case something is written to stderr.
382 :param fail_on_return_code: (Default: True) Whether to raise an
382 :param fail_on_return_code: (Default: True) Whether to raise an
383 exception if the return code is not 0.
383 exception if the return code is not 0.
384 """
384 """
385
385
386 kwargs['shell'] = kwargs.get('shell', True)
386 kwargs['shell'] = kwargs.get('shell', True)
387
387
388 starting_values = starting_values or []
388 starting_values = starting_values or []
389 if input_stream:
389 if input_stream:
390 input_streamer = StreamFeeder(input_stream)
390 input_streamer = StreamFeeder(input_stream)
391 input_streamer.start()
391 input_streamer.start()
392 input_stream = input_streamer.output
392 input_stream = input_streamer.output
393 self._close_input_fd = input_stream
393 self._close_input_fd = input_stream
394
394
395 self._fail_on_stderr = fail_on_stderr
395 self._fail_on_stderr = fail_on_stderr
396 self._fail_on_return_code = fail_on_return_code
396 self._fail_on_return_code = fail_on_return_code
397 self.cmd = cmd
397 self.cmd = cmd
398
398
399 _p = subprocess.Popen(cmd, bufsize=-1, stdin=input_stream, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
399 _p = subprocess.Popen(cmd, bufsize=-1, stdin=input_stream, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
400 **kwargs)
400 **kwargs)
401 self.process = _p
401 self.process = _p
402
402
403 bg_out = BufferedGenerator('stdout', _p.stdout, buffer_size, chunk_size, starting_values)
403 bg_out = BufferedGenerator('stdout', _p.stdout, buffer_size, chunk_size, starting_values)
404 bg_err = BufferedGenerator('stderr', _p.stderr, 10240, 1, bottomless=True)
404 bg_err = BufferedGenerator('stderr', _p.stderr, 10240, 1, bottomless=True)
405
405
406 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
406 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
407 # doing this until we reach either end of file, or end of buffer.
407 # doing this until we reach either end of file, or end of buffer.
408 bg_out.data_added_event.wait(0.2)
408 bg_out.data_added_event.wait(0.2)
409 bg_out.data_added_event.clear()
409 bg_out.data_added_event.clear()
410
410
411 # at this point it's still ambiguous if we are done reading or just full buffer.
411 # at this point it's still ambiguous if we are done reading or just full buffer.
412 # Either way, if error (returned by ended process, or implied based on
412 # Either way, if error (returned by ended process, or implied based on
413 # presence of stuff in stderr output) we error out.
413 # presence of stuff in stderr output) we error out.
414 # Else, we are happy.
414 # Else, we are happy.
415 return_code = _p.poll()
415 return_code = _p.poll()
416 ret_code_ok = return_code in [None, 0]
416 ret_code_ok = return_code in [None, 0]
417 ret_code_fail = return_code is not None and return_code != 0
417 ret_code_fail = return_code is not None and return_code != 0
418
418 if (
419 if (
419 (ret_code_fail and fail_on_return_code) or
420 (ret_code_fail and fail_on_return_code) or
420 (ret_code_ok and fail_on_stderr and bg_err.length)
421 (ret_code_ok and fail_on_stderr and bg_err.length)
421 ):
422 ):
422
423
423 try:
424 try:
424 _p.terminate()
425 _p.terminate()
425 except Exception:
426 except Exception:
426 pass
427 pass
427
428
428 bg_out.stop()
429 bg_out.stop()
429 out = b''.join(bg_out)
430 out = b''.join(bg_out)
430 self._stdout = out
431 self._stdout = out
431
432
432 bg_err.stop()
433 bg_err.stop()
433 err = b''.join(bg_err)
434 err = b''.join(bg_err)
434 self._stderr = err
435 self._stderr = err
435
436
436 # code from https://github.com/schacon/grack/pull/7
437 # code from https://github.com/schacon/grack/pull/7
437 if err.strip() == b'fatal: The remote end hung up unexpectedly' and out.startswith(b'0034shallow '):
438 if err.strip() == b'fatal: The remote end hung up unexpectedly' and out.startswith(b'0034shallow '):
438 bg_out = iter([out])
439 bg_out = iter([out])
439 _p = None
440 _p = None
440 elif err and fail_on_stderr:
441 elif err and fail_on_stderr:
441 text_err = err.decode()
442 text_err = err.decode()
442 raise OSError(
443 raise OSError(
443 f"Subprocess exited due to an error:\n{text_err}")
444 f"Subprocess exited due to an error:\n{text_err}")
444
445
445 if ret_code_fail and fail_on_return_code:
446 if ret_code_fail and fail_on_return_code:
446 text_err = err.decode()
447 text_err = err.decode()
447 if not err:
448 if not err:
448 # maybe get empty stderr, try stdout instead
449 # maybe get empty stderr, try stdout instead
449 # in many cases git reports the errors on stdout too
450 # in many cases git reports the errors on stdout too
450 text_err = out.decode()
451 text_err = out.decode()
451 raise OSError(
452 raise OSError(
452 f"Subprocess exited with non 0 ret code:{return_code}: stderr:{text_err}")
453 f"Subprocess exited with non 0 ret code:{return_code}: stderr:{text_err}")
453
454
454 self.stdout = bg_out
455 self.stdout = bg_out
455 self.stderr = bg_err
456 self.stderr = bg_err
456 self.inputstream = input_stream
457 self.inputstream = input_stream
457
458
def __str__(self):
    """Human-readable description naming the wrapped Popen handle."""
    # ``process`` may never have been assigned if __init__ bailed out early,
    # hence the defensive getattr with a placeholder.
    return 'SubprocessIOChunker: {}'.format(getattr(self, 'process', 'NO_PROCESS'))
461
462
def __iter__(self):
    """The chunker is its own iterator, yielding stdout chunks via __next__."""
    return self
464
465
def __next__(self):
    # Note: mikhail: the return code must only be inspected AFTER the stdout
    # stream has been read. Some processes (git among them) do extra work
    # between closing stdout and actually terminating, so on "slow" systems
    # polling too early yields no return code at all.
    chunk = None
    deferred_stop = None
    try:
        chunk = next(self.stdout)
    except StopIteration as stop:
        # Defer re-raising until after the exit-code check below.
        deferred_stop = stop

    if self.process:
        exit_code = self.process.poll()
        failed = exit_code is not None and exit_code != 0
        if failed and self._fail_on_return_code:
            self.stop_streams()
            err = self.get_stderr()
            raise OSError(
                f"Subprocess exited (exit_code:{exit_code}) due to an error during iteration:\n{err}")

    if deferred_stop:
        raise deferred_stop
    return chunk
490
491
def throw(self, exc_type, value=None, traceback=None):
    """Raise ``exc_type(value)`` unless stdout has already been fully drained.

    Mirrors generator ``throw`` semantics: once the stream is exhausted and
    empty, the injected exception is silently swallowed.
    """
    still_pending = self.stdout.length or not self.stdout.done_reading
    if still_pending:
        raise exc_type(value)
494
495
def close(self):
    """Terminate the child process and release all attached streams.

    Idempotent: calls after the first are no-ops (guarded by ``_closed``).
    Every individual cleanup step is best-effort so that one failing step
    cannot prevent the remaining resources from being released.
    """
    if self._closed:
        return

    try:
        self.process.terminate()
    except Exception:
        pass
    if self._close_input_fd:
        try:
            # Fix: this close was previously unguarded — an OSError here
            # (e.g. the fd already closed by the input feeder) aborted
            # close() before stdout/stderr below were released, and left
            # _closed unset. Make it best-effort like every other step.
            os.close(self._close_input_fd)
        except Exception:
            pass
    try:
        self.stdout.close()
    except Exception:
        pass
    try:
        self.stderr.close()
    except Exception:
        pass
    try:
        os.close(self.inputstream)
    except Exception:
        pass

    self._closed = True
519
520
def stop_streams(self):
    """Ask both buffered output readers to stop, if they support it.

    Streams without a ``stop`` attribute (e.g. plain iterators after the
    shallow-clone workaround) are silently skipped.
    """
    for stream in (self.stdout, self.stderr):
        stop = getattr(stream, 'stop', None)
        if stop is not None:
            stop()
523
524
def get_stdout(self):
    """Return complete stdout: the cached bytes if already collected,
    otherwise drain the buffered stdout generator."""
    return self._stdout or b''.join(self.stdout)
529
530
def get_stderr(self):
    """Return complete stderr: the cached bytes if already collected,
    otherwise drain the buffered stderr generator."""
    return self._stderr or b''.join(self.stderr)
535
536
536
537
def run_command(arguments, env=None):
    """
    Run the specified command and return its output.

    :param arguments: sequence of program arguments (including the program name)
    :type arguments: list[str]
    :param env: optional mapping of environment variables for the subprocess
    :return: tuple of ``(stdout_bytes, stderr_bytes)``
    :raises Exception: wrapping the original :class:`OSError` when the
        command cannot be run or exits with an error
    """
    cmd = arguments
    log.debug('Running subprocessio command %s', cmd)
    proc = None
    try:
        # shell=False: cmd is a list, never a shell string (no injection risk)
        _opts = {'shell': False, 'fail_on_stderr': False}
        if env:
            _opts.update({'env': env})
        proc = SubprocessIOChunker(cmd, **_opts)
        return b''.join(proc), b''.join(proc.stderr)
    except OSError as err:
        cmd = ' '.join(map(safe_str, cmd))  # human friendly CMD
        tb_err = ("Couldn't run subprocessio command (%s).\n"
                  "Original error was:%s\n" % (cmd, err))
        log.exception(tb_err)
        # Fix: chain the original OSError so the real cause is preserved in
        # tracebacks instead of being flattened into the message text only.
        raise Exception(tb_err) from err
    finally:
        # always release the chunker's process/streams, even on success
        if proc:
            proc.close()
563
564
General Comments 0
You need to be logged in to leave comments. Login now