##// END OF EJS Templates
core: code changes to handle branch-permissions / locking / incorrect client support
super-admin -
r1323:a1a9e846 default
parent child Browse files
Show More
@@ -1,131 +1,138 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 """
19 19 Special exception handling over the wire.
20 20
21 21 Since we cannot assume that our client is able to import our exception classes,
22 22 this module provides a "wrapping" mechanism to raise plain exceptions
23 23 which contain an extra attribute `_vcs_kind` to allow a client to distinguish
24 24 different error conditions.
25 25 """
26 26
27 27 from pyramid.httpexceptions import HTTPLocked, HTTPForbidden
28 28
29 29
30 30 def _make_exception(kind, org_exc, *args):
31 31 """
32 32 Prepares a base `Exception` instance to be sent over the wire.
33 33
34 34 To give our caller a hint what this is about, it will attach an attribute
35 35 `_vcs_kind` to the exception.
36 36 """
37 37 exc = Exception(*args)
38 38 exc._vcs_kind = kind
39 39 exc._org_exc = org_exc
40 40 exc._org_exc_tb = getattr(org_exc, '_org_exc_tb', '')
41 41 return exc
42 42
43 43
44 44 def AbortException(org_exc=None):
45 45 def _make_exception_wrapper(*args):
46 46 return _make_exception('abort', org_exc, *args)
47 47 return _make_exception_wrapper
48 48
49 49
50 50 def ArchiveException(org_exc=None):
51 51 def _make_exception_wrapper(*args):
52 52 return _make_exception('archive', org_exc, *args)
53 53 return _make_exception_wrapper
54 54
55 55
56 def ClientNotSupportedException(org_exc=None):
57 def _make_exception_wrapper(*args):
58 return _make_exception('client_not_supported', org_exc, *args)
59 return _make_exception_wrapper
60
61
62 56 def LookupException(org_exc=None):
63 57 def _make_exception_wrapper(*args):
64 58 return _make_exception('lookup', org_exc, *args)
65 59 return _make_exception_wrapper
66 60
67 61
68 62 def VcsException(org_exc=None):
69 63 def _make_exception_wrapper(*args):
70 64 return _make_exception('error', org_exc, *args)
71 65 return _make_exception_wrapper
72 66
73 67
74 68 def LockedRepoException(org_exc=None):
75 69 def _make_exception_wrapper(*args):
76 70 return _make_exception('repo_locked', org_exc, *args)
77 71 return _make_exception_wrapper
78 72
79 73
80 74 def RepositoryBranchProtectedException(org_exc=None):
81 75 def _make_exception_wrapper(*args):
82 76 return _make_exception('repo_branch_protected', org_exc, *args)
83 77 return _make_exception_wrapper
84 78
79 def ClientNotSupportedException(org_exc=None):
80 def _make_exception_wrapper(*args):
81 return _make_exception('client_not_supported', org_exc, *args)
82 return _make_exception_wrapper
85 83
86 84 def RequirementException(org_exc=None):
87 85 def _make_exception_wrapper(*args):
88 86 return _make_exception('requirement', org_exc, *args)
89 87 return _make_exception_wrapper
90 88
91 89
92 90 def UnhandledException(org_exc=None):
93 91 def _make_exception_wrapper(*args):
94 92 return _make_exception('unhandled', org_exc, *args)
95 93 return _make_exception_wrapper
96 94
97 95
98 96 def URLError(org_exc=None):
99 97 def _make_exception_wrapper(*args):
100 98 return _make_exception('url_error', org_exc, *args)
101 99 return _make_exception_wrapper
102 100
103 101
104 102 def SubrepoMergeException(org_exc=None):
105 103 def _make_exception_wrapper(*args):
106 104 return _make_exception('subrepo_merge_error', org_exc, *args)
107 105 return _make_exception_wrapper
108 106
109 107
110 108 class HTTPRepoLocked(HTTPLocked):
111 109 """
112 110 Subclass of HTTPLocked response that allows to set the title and status
113 111 code via constructor arguments.
114 112 """
115 113 def __init__(self, title, status_code=None, **kwargs):
116 114 self.code = status_code or HTTPLocked.code
117 115 self.title = title
118 116 super().__init__(**kwargs)
119 117
120 118
121 class HTTPRepoBranchProtected(HTTPForbidden):
122 def __init__(self, *args, **kwargs):
123 super(HTTPForbidden, self).__init__(*args, **kwargs)
119 class HTTPRepoBranchProtected(HTTPLocked):
120 def __init__(self, title, status_code=None, **kwargs):
121 self.code = status_code or HTTPLocked.code
122 self.title = title
123 super().__init__(**kwargs)
124
125
126 class HTTPClientNotSupported(HTTPLocked):
127 def __init__(self, title, status_code=None, **kwargs):
128 self.code = status_code or HTTPLocked.code
129 self.title = title
130 super().__init__(**kwargs)
124 131
125 132
126 133 class RefNotFoundException(KeyError):
127 134 pass
128 135
129 136
130 137 class NoContentException(ValueError):
131 138 pass
@@ -1,780 +1,782 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2023 RhodeCode GmbH
2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import io
19 19 import os
20 20 import sys
21 21 import logging
22 22 import collections
23 23 import base64
24 24 import msgpack
25 25 import dataclasses
26 26 import pygit2
27 27
28 28 import http.client
29 29 from celery import Celery
30 30
31 31 import mercurial.scmutil
32 32 import mercurial.node
33 33
34 34 from vcsserver import exceptions, subprocessio, settings
35 35 from vcsserver.lib.ext_json import json
36 36 from vcsserver.lib.str_utils import ascii_str, safe_str
37 37 from vcsserver.lib.svn_txn_utils import get_txn_id_from_store
38 38 from vcsserver.remote.git_remote import Repository
39 39
40 40 celery_app = Celery('__vcsserver__')
41 41 log = logging.getLogger(__name__)
42 42
43 43
44 44 class HooksCeleryClient:
45 45 TASK_TIMEOUT = 60 # time in seconds
46 46
47 47 def __init__(self, queue, backend):
48 48 celery_app.config_from_object({
49 49 'broker_url': queue, 'result_backend': backend,
50 50 'broker_connection_retry_on_startup': True,
51 51 'task_serializer': 'json',
52 52 'accept_content': ['json', 'msgpack'],
53 53 'result_serializer': 'json',
54 54 'result_accept_content': ['json', 'msgpack']
55 55 })
56 56 self.celery_app = celery_app
57 57
58 58 def __call__(self, method, extras):
59 # NOTE: exception handling for those tasks executed is in
60 # @adapt_for_celery decorator
61 # also see: _maybe_handle_exception which is handling exceptions
59 62 inquired_task = self.celery_app.signature(
60 63 f'rhodecode.lib.celerylib.tasks.{method}'
61 64 )
62 65 result = inquired_task.delay(extras).get(timeout=self.TASK_TIMEOUT)
63 66
64 67 return result
65 68
66 69
67 70 class HooksShadowRepoClient:
68 71
69 72 def __call__(self, hook_name, extras):
70 73 return {'output': '', 'status': 0}
71 74
72 75
73 76 class RemoteMessageWriter:
74 77 """Writer base class."""
75 78 def write(self, message):
76 79 raise NotImplementedError()
77 80
78 81
79 82 class HgMessageWriter(RemoteMessageWriter):
80 83 """Writer that knows how to send messages to mercurial clients."""
81 84
82 85 def __init__(self, ui):
83 86 self.ui = ui
84 87
85 88 def write(self, message: str):
86 # TODO: Check why the quiet flag is set by default.
87 old = self.ui.quiet
88 self.ui.quiet = False
89 self.ui.status(message.encode('utf-8'))
90 self.ui.quiet = old
89 args = (message.encode('utf-8'),)
90 self.ui._writemsg(self.ui._fmsgerr, type=b'status', *args)
91 91
92 92
93 93 class GitMessageWriter(RemoteMessageWriter):
94 94 """Writer that knows how to send messages to git clients."""
95 95
96 96 def __init__(self, stdout=None):
97 97 self.stdout = stdout or sys.stdout
98 98
99 99 def write(self, message: str):
100 self.stdout.write(message)
100 self.stdout.write(message + "\n" if message else "")
101 101
102 102
103 103 class SvnMessageWriter(RemoteMessageWriter):
104 104 """Writer that knows how to send messages to svn clients."""
105 105
106 106 def __init__(self, stderr=None):
107 107 # SVN needs data sent to stderr for back-to-client messaging
108 108 self.stderr = stderr or sys.stderr
109 109
110 110 def write(self, message):
111 111 self.stderr.write(message)
112 112
113 113
114 def _maybe_handle_exception(result):
115
114 def _maybe_handle_exception(writer, result):
115 """
116 adopt_for_celery defines the exception/exception_traceback
117 Ths result is a direct output from a celery task
118 """
116 119
117 120 exception_class = result.get('exception')
118 121 exception_traceback = result.get('exception_traceback')
119 if not exception_class:
120 return
121 122
122 log.debug('Handling hook-call exception: %s', exception_class)
123
124 if exception_traceback:
125 log.error('Got traceback from remote call:%s', exception_traceback)
126
127 if exception_class == 'HTTPLockedRepo':
128 raise exceptions.LockedRepoException()(*result['exception_args'])
129 elif exception_class == 'ClientNotSupportedError':
130 raise exceptions.ClientNotSupportedException()(*result['exception_args'])
131 elif exception_class == 'HTTPBranchProtected':
132 raise exceptions.RepositoryBranchProtectedException()(*result['exception_args'])
133 elif exception_class == 'RepositoryError':
134 raise exceptions.VcsException()(*result['exception_args'])
135 elif exception_class:
136 raise Exception(
137 f"""Got remote exception "{exception_class}" with args "{result['exception_args']}" """
138 )
123 match exception_class:
124 # NOTE: the underlying exceptions are setting _vcs_kind special marker
125 # which is later handled by `handle_vcs_exception` and translated into a special HTTP exception
126 # propagated later to the client
127 case 'HTTPLockedRepo':
128 raise exceptions.LockedRepoException()(*result['exception_args'])
129 case 'ClientNotSupported':
130 raise exceptions.ClientNotSupportedException()(*result['exception_args'])
131 case 'HTTPBranchProtected':
132 raise exceptions.RepositoryBranchProtectedException()(*result['exception_args'])
133 case 'RepositoryError':
134 raise exceptions.VcsException()(*result['exception_args'])
135 case _:
136 if exception_class:
137 log.error('Handling hook-call exception. Got traceback from remote call:%s', exception_traceback)
138 raise Exception(
139 f"""Got remote exception "{exception_class}" with args "{result['exception_args']}" """
140 )
139 141
140 142
141 143 def _get_hooks_client(extras):
142 144 task_queue = extras.get('task_queue')
143 145 task_backend = extras.get('task_backend')
144 146 is_shadow_repo = extras.get('is_shadow_repo')
145 147
146 148 if task_queue and task_backend:
147 149 return HooksCeleryClient(task_queue, task_backend)
148 150 elif is_shadow_repo:
149 151 return HooksShadowRepoClient()
150 152 else:
151 153 raise Exception("Hooks client not found!")
152 154
153 155
154 156 def _call_hook(hook_name, extras, writer):
155 157 hooks_client = _get_hooks_client(extras)
156 158 log.debug('Hooks, using client:%s', hooks_client)
157 159 result = hooks_client(hook_name, extras)
158 160 log.debug('Hooks got result: %s', result)
159 _maybe_handle_exception(result)
161 _maybe_handle_exception(writer, result)
160 162 writer.write(result['output'])
161 163
162 164 return result['status']
163 165
164 166
165 167 def _extras_from_ui(ui):
166 168 hook_data = ui.config(b'rhodecode', b'RC_SCM_DATA')
167 169 if not hook_data:
168 170 # maybe it's inside environ ?
169 171 env_hook_data = os.environ.get('RC_SCM_DATA')
170 172 if env_hook_data:
171 173 hook_data = env_hook_data
172 174
173 175 extras = {}
174 176 if hook_data:
175 177 extras = json.loads(hook_data)
176 178 return extras
177 179
178 180
179 181 def _rev_range_hash(repo, node, check_heads=False):
180 182 from vcsserver.hgcompat import get_ctx
181 183
182 184 commits = []
183 185 revs = []
184 186 start = get_ctx(repo, node).rev()
185 187 end = len(repo)
186 188 for rev in range(start, end):
187 189 revs.append(rev)
188 190 ctx = get_ctx(repo, rev)
189 191 commit_id = ascii_str(mercurial.node.hex(ctx.node()))
190 192 branch = safe_str(ctx.branch())
191 193 commits.append((commit_id, branch))
192 194
193 195 parent_heads = []
194 196 if check_heads:
195 197 parent_heads = _check_heads(repo, start, end, revs)
196 198 return commits, parent_heads
197 199
198 200
199 201 def _check_heads(repo, start, end, commits):
200 202 from vcsserver.hgcompat import get_ctx
201 203 changelog = repo.changelog
202 204 parents = set()
203 205
204 206 for new_rev in commits:
205 207 for p in changelog.parentrevs(new_rev):
206 208 if p == mercurial.node.nullrev:
207 209 continue
208 210 if p < start:
209 211 parents.add(p)
210 212
211 213 for p in parents:
212 214 branch = get_ctx(repo, p).branch()
213 215 # The heads descending from that parent, on the same branch
214 216 parent_heads = {p}
215 217 reachable = {p}
216 218 for x in range(p + 1, end):
217 219 if get_ctx(repo, x).branch() != branch:
218 220 continue
219 221 for pp in changelog.parentrevs(x):
220 222 if pp in reachable:
221 223 reachable.add(x)
222 224 parent_heads.discard(pp)
223 225 parent_heads.add(x)
224 226 # More than one head? Suggest merging
225 227 if len(parent_heads) > 1:
226 228 return list(parent_heads)
227 229
228 230 return []
229 231
230 232
231 233 def _get_git_env():
232 234 env = {}
233 235 for k, v in os.environ.items():
234 236 if k.startswith('GIT'):
235 237 env[k] = v
236 238
237 239 # serialized version
238 240 return [(k, v) for k, v in env.items()]
239 241
240 242
241 243 def _get_hg_env(old_rev, new_rev, txnid, repo_path):
242 244 env = {}
243 245 for k, v in os.environ.items():
244 246 if k.startswith('HG'):
245 247 env[k] = v
246 248
247 249 env['HG_NODE'] = old_rev
248 250 env['HG_NODE_LAST'] = new_rev
249 251 env['HG_TXNID'] = txnid
250 252 env['HG_PENDING'] = repo_path
251 253
252 254 return [(k, v) for k, v in env.items()]
253 255
254 256
255 257 def _get_ini_settings(ini_file):
256 258 from vcsserver.http_main import sanitize_settings_and_apply_defaults
257 259 from vcsserver.lib.config_utils import get_app_config_lightweight, configure_and_store_settings
258 260
259 261 global_config = {'__file__': ini_file}
260 262 ini_settings = get_app_config_lightweight(ini_file)
261 263 sanitize_settings_and_apply_defaults(global_config, ini_settings)
262 264 configure_and_store_settings(global_config, ini_settings)
263 265
264 266 return ini_settings
265 267
266 268
267 269 def _fix_hooks_executables(ini_path=''):
268 270 """
269 271 This is a trick to set proper settings.EXECUTABLE paths for certain execution patterns
270 272 especially for subversion where hooks strip entire env, and calling just 'svn' command will most likely fail
271 273 because svn is not on PATH
272 274 """
273 275 # set defaults, in case we can't read from ini_file
274 276 core_binary_dir = settings.BINARY_DIR or '/usr/local/bin/rhodecode_bin/vcs_bin'
275 277 if ini_path:
276 278 ini_settings = _get_ini_settings(ini_path)
277 279 core_binary_dir = ini_settings['core.binary_dir']
278 280
279 281 settings.BINARY_DIR = core_binary_dir
280 282
281 283
282 284 def repo_size(ui, repo, **kwargs):
283 285 extras = _extras_from_ui(ui)
284 286 return _call_hook('repo_size', extras, HgMessageWriter(ui))
285 287
286 288
287 289 def pre_pull(ui, repo, **kwargs):
288 290 extras = _extras_from_ui(ui)
289 291 return _call_hook('pre_pull', extras, HgMessageWriter(ui))
290 292
291 293
292 294 def pre_pull_ssh(ui, repo, **kwargs):
293 295 extras = _extras_from_ui(ui)
294 296 if extras and extras.get('SSH'):
295 297 return pre_pull(ui, repo, **kwargs)
296 298 return 0
297 299
298 300
299 301 def post_pull(ui, repo, **kwargs):
300 302 extras = _extras_from_ui(ui)
301 303 return _call_hook('post_pull', extras, HgMessageWriter(ui))
302 304
303 305
304 306 def post_pull_ssh(ui, repo, **kwargs):
305 307 extras = _extras_from_ui(ui)
306 308 if extras and extras.get('SSH'):
307 309 return post_pull(ui, repo, **kwargs)
308 310 return 0
309 311
310 312
311 313 def pre_push(ui, repo, node=None, **kwargs):
312 314 """
313 315 Mercurial pre_push hook
314 316 """
315 317 extras = _extras_from_ui(ui)
316 318 detect_force_push = extras.get('detect_force_push')
317 319
318 320 rev_data = []
319 321 hook_type: str = safe_str(kwargs.get('hooktype'))
320 322
321 323 if node and hook_type == 'pretxnchangegroup':
322 324 branches = collections.defaultdict(list)
323 325 commits, _heads = _rev_range_hash(repo, node, check_heads=detect_force_push)
324 326 for commit_id, branch in commits:
325 327 branches[branch].append(commit_id)
326 328
327 329 for branch, commits in branches.items():
328 330 old_rev = ascii_str(kwargs.get('node_last')) or commits[0]
329 331 rev_data.append({
330 332 'total_commits': len(commits),
331 333 'old_rev': old_rev,
332 334 'new_rev': commits[-1],
333 335 'ref': '',
334 336 'type': 'branch',
335 337 'name': branch,
336 338 })
337 339
338 340 for push_ref in rev_data:
339 341 push_ref['multiple_heads'] = _heads
340 342
341 343 repo_path = os.path.join(
342 344 extras.get('repo_store', ''), extras.get('repository', ''))
343 345 push_ref['hg_env'] = _get_hg_env(
344 346 old_rev=push_ref['old_rev'],
345 347 new_rev=push_ref['new_rev'], txnid=ascii_str(kwargs.get('txnid')),
346 348 repo_path=repo_path)
347 349
348 350 extras['hook_type'] = hook_type or 'pre_push'
349 351 extras['commit_ids'] = rev_data
350 352
351 353 return _call_hook('pre_push', extras, HgMessageWriter(ui))
352 354
353 355
354 356 def pre_push_ssh(ui, repo, node=None, **kwargs):
355 357 extras = _extras_from_ui(ui)
356 358 if extras.get('SSH'):
357 359 return pre_push(ui, repo, node, **kwargs)
358 360
359 361 return 0
360 362
361 363
362 364 def pre_push_ssh_auth(ui, repo, node=None, **kwargs):
363 365 """
364 366 Mercurial pre_push hook for SSH
365 367 """
366 368 extras = _extras_from_ui(ui)
367 369 if extras.get('SSH'):
368 370 permission = extras['SSH_PERMISSIONS']
369 371
370 372 if 'repository.write' == permission or 'repository.admin' == permission:
371 373 return 0
372 374
373 375 # non-zero ret code
374 376 return 1
375 377
376 378 return 0
377 379
378 380
379 381 def post_push(ui, repo, node, **kwargs):
380 382 """
381 383 Mercurial post_push hook
382 384 """
383 385 extras = _extras_from_ui(ui)
384 386
385 387 commit_ids = []
386 388 branches = []
387 389 bookmarks = []
388 390 tags = []
389 391 hook_type: str = safe_str(kwargs.get('hooktype'))
390 392
391 393 commits, _heads = _rev_range_hash(repo, node)
392 394 for commit_id, branch in commits:
393 395 commit_ids.append(commit_id)
394 396 if branch not in branches:
395 397 branches.append(branch)
396 398
397 399 if hasattr(ui, '_rc_pushkey_bookmarks'):
398 400 bookmarks = ui._rc_pushkey_bookmarks
399 401
400 402 extras['hook_type'] = hook_type or 'post_push'
401 403 extras['commit_ids'] = commit_ids
402 404
403 405 extras['new_refs'] = {
404 406 'branches': branches,
405 407 'bookmarks': bookmarks,
406 408 'tags': tags
407 409 }
408 410
409 411 return _call_hook('post_push', extras, HgMessageWriter(ui))
410 412
411 413
412 414 def post_push_ssh(ui, repo, node, **kwargs):
413 415 """
414 416 Mercurial post_push hook for SSH
415 417 """
416 418 if _extras_from_ui(ui).get('SSH'):
417 419 return post_push(ui, repo, node, **kwargs)
418 420 return 0
419 421
420 422
421 423 def key_push(ui, repo, **kwargs):
422 424 from vcsserver.hgcompat import get_ctx
423 425
424 426 if kwargs['new'] != b'0' and kwargs['namespace'] == b'bookmarks':
425 427 # store new bookmarks in our UI object propagated later to post_push
426 428 ui._rc_pushkey_bookmarks = get_ctx(repo, kwargs['key']).bookmarks()
427 429 return
428 430
429 431
430 432 # backward compat
431 433 log_pull_action = post_pull
432 434
433 435 # backward compat
434 436 log_push_action = post_push
435 437
436 438
437 439 def handle_git_pre_receive(unused_repo_path, unused_revs, unused_env):
438 440 """
439 441 Old hook name: keep here for backward compatibility.
440 442
441 443 This is only required when the installed git hooks are not upgraded.
442 444 """
443 445 pass
444 446
445 447
446 448 def handle_git_post_receive(unused_repo_path, unused_revs, unused_env):
447 449 """
448 450 Old hook name: keep here for backward compatibility.
449 451
450 452 This is only required when the installed git hooks are not upgraded.
451 453 """
452 454 pass
453 455
454 456
455 457 @dataclasses.dataclass
456 458 class HookResponse:
457 459 status: int
458 460 output: str
459 461
460 462
461 463 def git_pre_pull(extras) -> HookResponse:
462 464 """
463 465 Pre pull hook.
464 466
465 467 :param extras: dictionary containing the keys defined in simplevcs
466 468 :type extras: dict
467 469
468 470 :return: status code of the hook. 0 for success.
469 471 :rtype: int
470 472 """
471 473
472 474 if 'pull' not in extras['hooks']:
473 475 return HookResponse(0, '')
474 476
475 477 stdout = io.StringIO()
476 478 try:
477 479 status_code = _call_hook('pre_pull', extras, GitMessageWriter(stdout))
478 480
479 481 except Exception as error:
480 482 log.exception('Failed to call pre_pull hook')
481 483 status_code = 128
482 484 stdout.write(f'ERROR: {error}\n')
483 485
484 486 return HookResponse(status_code, stdout.getvalue())
485 487
486 488
487 489 def git_post_pull(extras) -> HookResponse:
488 490 """
489 491 Post pull hook.
490 492
491 493 :param extras: dictionary containing the keys defined in simplevcs
492 494 :type extras: dict
493 495
494 496 :return: status code of the hook. 0 for success.
495 497 :rtype: int
496 498 """
497 499 if 'pull' not in extras['hooks']:
498 500 return HookResponse(0, '')
499 501
500 502 stdout = io.StringIO()
501 503 try:
502 504 status = _call_hook('post_pull', extras, GitMessageWriter(stdout))
503 505 except Exception as error:
504 506 status = 128
505 507 stdout.write(f'ERROR: {error}\n')
506 508
507 509 return HookResponse(status, stdout.getvalue())
508 510
509 511
510 512 def _parse_git_ref_lines(revision_lines):
511 513 rev_data = []
512 514 for revision_line in revision_lines or []:
513 515 old_rev, new_rev, ref = revision_line.strip().split(' ')
514 516 ref_data = ref.split('/', 2)
515 517 if ref_data[1] in ('tags', 'heads'):
516 518 rev_data.append({
517 519 # NOTE(marcink):
518 520 # we're unable to tell total_commits for git at this point
519 521 # but we set the variable for consistency with GIT
520 522 'total_commits': -1,
521 523 'old_rev': old_rev,
522 524 'new_rev': new_rev,
523 525 'ref': ref,
524 526 'type': ref_data[1],
525 527 'name': ref_data[2],
526 528 })
527 529 return rev_data
528 530
529 531
530 532 def git_pre_receive(unused_repo_path, revision_lines, env) -> int:
531 533 """
532 534 Pre push hook.
533 535
534 536 :return: status code of the hook. 0 for success.
535 537 """
536 538 extras = json.loads(env['RC_SCM_DATA'])
537 539 rev_data = _parse_git_ref_lines(revision_lines)
538 540 if 'push' not in extras['hooks']:
539 541 return 0
540 542 _fix_hooks_executables(env.get('RC_INI_FILE'))
541 543
542 544 empty_commit_id = '0' * 40
543 545
544 546 detect_force_push = extras.get('detect_force_push')
545 547
546 548 for push_ref in rev_data:
547 549 # store our git-env which holds the temp store
548 550 push_ref['git_env'] = _get_git_env()
549 551 push_ref['pruned_sha'] = ''
550 552 if not detect_force_push:
551 553 # don't check for forced-push when we don't need to
552 554 continue
553 555
554 556 type_ = push_ref['type']
555 557 new_branch = push_ref['old_rev'] == empty_commit_id
556 558 delete_branch = push_ref['new_rev'] == empty_commit_id
557 559 if type_ == 'heads' and not (new_branch or delete_branch):
558 560 old_rev = push_ref['old_rev']
559 561 new_rev = push_ref['new_rev']
560 562 cmd = [settings.GIT_EXECUTABLE(), 'rev-list', old_rev, f'^{new_rev}']
561 563 stdout, stderr = subprocessio.run_command(
562 564 cmd, env=os.environ.copy())
563 565 # means we're having some non-reachable objects, this forced push was used
564 566 if stdout:
565 567 push_ref['pruned_sha'] = stdout.splitlines()
566 568
567 569 extras['hook_type'] = 'pre_receive'
568 570 extras['commit_ids'] = rev_data
569 571
570 572 stdout = sys.stdout
571 573 status_code = _call_hook('pre_push', extras, GitMessageWriter(stdout))
572 574
573 575 return status_code
574 576
575 577
576 578 def git_post_receive(unused_repo_path, revision_lines, env) -> int:
577 579 """
578 580 Post push hook.
579 581
580 582 :return: status code of the hook. 0 for success.
581 583 """
582 584 extras = json.loads(env['RC_SCM_DATA'])
583 585 if 'push' not in extras['hooks']:
584 586 return 0
585 587
586 588 _fix_hooks_executables(env.get('RC_INI_FILE'))
587 589
588 590 rev_data = _parse_git_ref_lines(revision_lines)
589 591
590 592 git_revs = []
591 593
592 594 # N.B.(skreft): it is ok to just call git, as git before calling a
593 595 # subcommand sets the PATH environment variable so that it point to the
594 596 # correct version of the git executable.
595 597 empty_commit_id = '0' * 40
596 598 branches = []
597 599 tags = []
598 600 for push_ref in rev_data:
599 601 type_ = push_ref['type']
600 602
601 603 if type_ == 'heads':
602 604 # starting new branch case
603 605 if push_ref['old_rev'] == empty_commit_id:
604 606 push_ref_name = push_ref['name']
605 607
606 608 if push_ref_name not in branches:
607 609 branches.append(push_ref_name)
608 610
609 611 need_head_set = ''
610 612 with Repository(os.getcwd()) as repo:
611 613 try:
612 614 repo.head
613 615 except pygit2.GitError:
614 616 need_head_set = f'refs/heads/{push_ref_name}'
615 617
616 618 if need_head_set:
617 619 repo.set_head(need_head_set)
618 620 print(f"Setting default branch to {push_ref_name}")
619 621
620 622 cmd = [settings.GIT_EXECUTABLE(), 'for-each-ref', '--format=%(refname)', 'refs/heads/*']
621 623 stdout, stderr = subprocessio.run_command(
622 624 cmd, env=os.environ.copy())
623 625 heads = safe_str(stdout)
624 626 heads = heads.replace(push_ref['ref'], '')
625 627 heads = ' '.join(head for head
626 628 in heads.splitlines() if head) or '.'
627 629 cmd = [settings.GIT_EXECUTABLE(), 'log', '--reverse',
628 630 '--pretty=format:%H', '--', push_ref['new_rev'],
629 631 '--not', heads]
630 632 stdout, stderr = subprocessio.run_command(
631 633 cmd, env=os.environ.copy())
632 634 git_revs.extend(list(map(ascii_str, stdout.splitlines())))
633 635
634 636 # delete branch case
635 637 elif push_ref['new_rev'] == empty_commit_id:
636 638 git_revs.append(f'delete_branch=>{push_ref["name"]}')
637 639 else:
638 640 if push_ref['name'] not in branches:
639 641 branches.append(push_ref['name'])
640 642
641 643 cmd = [settings.GIT_EXECUTABLE(), 'log',
642 644 f'{push_ref["old_rev"]}..{push_ref["new_rev"]}',
643 645 '--reverse', '--pretty=format:%H']
644 646 stdout, stderr = subprocessio.run_command(
645 647 cmd, env=os.environ.copy())
646 648 # we get bytes from stdout, we need str to be consistent
647 649 log_revs = list(map(ascii_str, stdout.splitlines()))
648 650 git_revs.extend(log_revs)
649 651
650 652 # Pure pygit2 impl. but still 2-3x slower :/
651 653 # results = []
652 654 #
653 655 # with Repository(os.getcwd()) as repo:
654 656 # repo_new_rev = repo[push_ref['new_rev']]
655 657 # repo_old_rev = repo[push_ref['old_rev']]
656 658 # walker = repo.walk(repo_new_rev.id, pygit2.GIT_SORT_TOPOLOGICAL)
657 659 #
658 660 # for commit in walker:
659 661 # if commit.id == repo_old_rev.id:
660 662 # break
661 663 # results.append(commit.id.hex)
662 664 # # reverse the order, can't use GIT_SORT_REVERSE
663 665 # log_revs = results[::-1]
664 666
665 667 elif type_ == 'tags':
666 668 if push_ref['name'] not in tags:
667 669 tags.append(push_ref['name'])
668 670 git_revs.append(f'tag=>{push_ref["name"]}')
669 671
670 672 extras['hook_type'] = 'post_receive'
671 673 extras['commit_ids'] = git_revs
672 674 extras['new_refs'] = {
673 675 'branches': branches,
674 676 'bookmarks': [],
675 677 'tags': tags,
676 678 }
677 679
678 680 stdout = sys.stdout
679 681
680 682 if 'repo_size' in extras['hooks']:
681 683 try:
682 684 _call_hook('repo_size', extras, GitMessageWriter(stdout))
683 685 except Exception:
684 686 pass
685 687
686 688 status_code = _call_hook('post_push', extras, GitMessageWriter(stdout))
687 689 return status_code
688 690
689 691
690 692 def get_extras_from_txn_id(repo_path, txn_id):
691 693 extras = get_txn_id_from_store(repo_path, txn_id)
692 694 return extras
693 695
694 696
695 697 def svn_pre_commit(repo_path, commit_data, env):
696 698
697 699 path, txn_id = commit_data
698 700 branches = []
699 701 tags = []
700 702
701 703 if env.get('RC_SCM_DATA'):
702 704 extras = json.loads(env['RC_SCM_DATA'])
703 705 else:
704 706 ini_path = env.get('RC_INI_FILE')
705 707 if ini_path:
706 708 _get_ini_settings(ini_path)
707 709 # fallback method to read from TXN-ID stored data
708 710 extras = get_extras_from_txn_id(path, txn_id)
709 711
710 712 if not extras:
711 713 raise ValueError('SVN-PRE-COMMIT: Failed to extract context data in called extras for hook execution')
712 714
713 715 if extras.get('rc_internal_commit'):
714 716 # special marker for internal commit, we don't call hooks client
715 717 return 0
716 718
717 719 extras['hook_type'] = 'pre_commit'
718 720 extras['commit_ids'] = [txn_id]
719 721 extras['txn_id'] = txn_id
720 722 extras['new_refs'] = {
721 723 'total_commits': 1,
722 724 'branches': branches,
723 725 'bookmarks': [],
724 726 'tags': tags,
725 727 }
726 728
727 729 return _call_hook('pre_push', extras, SvnMessageWriter())
728 730
729 731
730 732 def svn_post_commit(repo_path, commit_data, env):
731 733 """
732 734 commit_data is path, rev, txn_id
733 735 """
734 736
735 737 if len(commit_data) == 3:
736 738 path, commit_id, txn_id = commit_data
737 739 elif len(commit_data) == 2:
738 740 log.error('Failed to extract txn_id from commit_data using legacy method. '
739 741 'Some functionality might be limited')
740 742 path, commit_id = commit_data
741 743 txn_id = None
742 744 else:
743 745 return 0
744 746
745 747 branches = []
746 748 tags = []
747 749
748 750 if env.get('RC_SCM_DATA'):
749 751 extras = json.loads(env['RC_SCM_DATA'])
750 752 else:
751 753 ini_path = env.get('RC_INI_FILE')
752 754 if ini_path:
753 755 _get_ini_settings(ini_path)
754 756 # fallback method to read from TXN-ID stored data
755 757 extras = get_extras_from_txn_id(path, txn_id)
756 758
757 759 if not extras and txn_id:
758 760 raise ValueError('SVN-POST-COMMIT: Failed to extract context data in called extras for hook execution')
759 761
760 762 if extras.get('rc_internal_commit'):
761 763 # special marker for internal commit, we don't call hooks client
762 764 return 0
763 765
764 766 extras['hook_type'] = 'post_commit'
765 767 extras['commit_ids'] = [commit_id]
766 768 extras['txn_id'] = txn_id
767 769 extras['new_refs'] = {
768 770 'branches': branches,
769 771 'bookmarks': [],
770 772 'tags': tags,
771 773 'total_commits': 1,
772 774 }
773 775
774 776 if 'repo_size' in extras['hooks']:
775 777 try:
776 778 _call_hook('repo_size', extras, SvnMessageWriter())
777 779 except Exception:
778 780 pass
779 781
780 782 return _call_hook('post_push', extras, SvnMessageWriter())
@@ -1,763 +1,765 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import io
19 19 import os
20 20 import platform
21 21 import sys
22 22 import locale
23 23 import logging
24 24 import uuid
25 25 import time
26 26 import wsgiref.util
27 27 import tempfile
28 28 import psutil
29 29
30 30 from itertools import chain
31 31
32 32 import msgpack
33 33 import configparser
34 34
35 35 from pyramid.config import Configurator
36 36 from pyramid.wsgi import wsgiapp
37 37 from pyramid.response import Response
38 38
39 39 from vcsserver.base import BytesEnvelope, BinaryEnvelope
40 40
41 41 from vcsserver.config.settings_maker import SettingsMaker
42 42
43 43 from vcsserver.tweens.request_wrapper import get_headers_call_context
44 44
45 45 from vcsserver import remote_wsgi, scm_app, hgpatches
46 46 from vcsserver.server import VcsServer
47 47 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
48 48 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
49 49 from vcsserver.echo_stub.echo_app import EchoApp
50 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
50 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected, HTTPClientNotSupported
51 51 from vcsserver.lib.exc_tracking import store_exception, format_exc
52 52 from vcsserver.lib.str_utils import safe_int
53 53 from vcsserver.lib.statsd_client import StatsdClient
54 54 from vcsserver.lib.ext_json import json
55 55 from vcsserver.lib.config_utils import configure_and_store_settings
56 56
57 57
58 58 strict_vcs = True
59 59
60 60 git_import_err = None
61 61 try:
62 62 from vcsserver.remote.git_remote import GitFactory, GitRemote
63 63 except ImportError as e:
64 64 GitFactory = None
65 65 GitRemote = None
66 66 git_import_err = e
67 67 if strict_vcs:
68 68 raise
69 69
70 70
71 71 hg_import_err = None
72 72 try:
73 73 from vcsserver.remote.hg_remote import MercurialFactory, HgRemote
74 74 except ImportError as e:
75 75 MercurialFactory = None
76 76 HgRemote = None
77 77 hg_import_err = e
78 78 if strict_vcs:
79 79 raise
80 80
81 81
82 82 svn_import_err = None
83 83 try:
84 84 from vcsserver.remote.svn_remote import SubversionFactory, SvnRemote
85 85 except ImportError as e:
86 86 SubversionFactory = None
87 87 SvnRemote = None
88 88 svn_import_err = e
89 89 if strict_vcs:
90 90 raise
91 91
92 92 log = logging.getLogger(__name__)
93 93
94 94 # due to Mercurial/glibc2.27 problems we need to detect if locale settings are
95 95 # causing problems and "fix" it in case they do and fallback to LC_ALL = C
96 96
97 97 try:
98 98 locale.setlocale(locale.LC_ALL, '')
99 99 except locale.Error as e:
100 100 log.error('LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
101 101 os.environ['LC_ALL'] = 'C'
102 102
103 103
104 104 def _is_request_chunked(environ):
105 105 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
106 106 return stream
107 107
108 108
109 109 def log_max_fd():
110 110 try:
111 111 maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)[1]
112 112 log.info('Max file descriptors value: %s', maxfd)
113 113 except Exception:
114 114 pass
115 115
116 116
117 117 class VCS:
118 118 def __init__(self, locale_conf=None, cache_config=None):
119 119 self.locale = locale_conf
120 120 self.cache_config = cache_config
121 121 self._configure_locale()
122 122
123 123 log_max_fd()
124 124
125 125 if GitFactory and GitRemote:
126 126 git_factory = GitFactory()
127 127 self._git_remote = GitRemote(git_factory)
128 128 else:
129 129 log.error("Git client import failed: %s", git_import_err)
130 130
131 131 if MercurialFactory and HgRemote:
132 132 hg_factory = MercurialFactory()
133 133 self._hg_remote = HgRemote(hg_factory)
134 134 else:
135 135 log.error("Mercurial client import failed: %s", hg_import_err)
136 136
137 137 if SubversionFactory and SvnRemote:
138 138 svn_factory = SubversionFactory()
139 139
140 140 # hg factory is used for svn url validation
141 141 hg_factory = MercurialFactory()
142 142 self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
143 143 else:
144 144 log.error("Subversion client import failed: %s", svn_import_err)
145 145
146 146 self._vcsserver = VcsServer()
147 147
148 148 def _configure_locale(self):
149 149 if self.locale:
150 150 log.info('Settings locale: `LC_ALL` to %s', self.locale)
151 151 else:
152 152 log.info('Configuring locale subsystem based on environment variables')
153 153 try:
154 154 # If self.locale is the empty string, then the locale
155 155 # module will use the environment variables. See the
156 156 # documentation of the package `locale`.
157 157 locale.setlocale(locale.LC_ALL, self.locale)
158 158
159 159 language_code, encoding = locale.getlocale()
160 160 log.info(
161 161 'Locale set to language code "%s" with encoding "%s".',
162 162 language_code, encoding)
163 163 except locale.Error:
164 164 log.exception('Cannot set locale, not configuring the locale system')
165 165
166 166
167 167 class WsgiProxy:
168 168 def __init__(self, wsgi):
169 169 self.wsgi = wsgi
170 170
171 171 def __call__(self, environ, start_response):
172 172 input_data = environ['wsgi.input'].read()
173 173 input_data = msgpack.unpackb(input_data)
174 174
175 175 error = None
176 176 try:
177 177 data, status, headers = self.wsgi.handle(
178 178 input_data['environment'], input_data['input_data'],
179 179 *input_data['args'], **input_data['kwargs'])
180 180 except Exception as e:
181 181 data, status, headers = [], None, None
182 182 error = {
183 183 'message': str(e),
184 184 '_vcs_kind': getattr(e, '_vcs_kind', None)
185 185 }
186 186
187 187 start_response(200, {})
188 188 return self._iterator(error, status, headers, data)
189 189
190 190 def _iterator(self, error, status, headers, data):
191 191 initial_data = [
192 192 error,
193 193 status,
194 194 headers,
195 195 ]
196 196
197 197 for d in chain(initial_data, data):
198 198 yield msgpack.packb(d)
199 199
200 200
201 201 def not_found(request):
202 202 return {'status': '404 NOT FOUND'}
203 203
204 204
205 205 class VCSViewPredicate:
206 206 def __init__(self, val, config):
207 207 self.remotes = val
208 208
209 209 def text(self):
210 210 return f'vcs view method = {list(self.remotes.keys())}'
211 211
212 212 phash = text
213 213
214 214 def __call__(self, context, request):
215 215 """
216 216 View predicate that returns true if given backend is supported by
217 217 defined remotes.
218 218 """
219 219 backend = request.matchdict.get('backend')
220 220 return backend in self.remotes
221 221
222 222
223 223 class HTTPApplication:
224 224 ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')
225 225
226 226 remote_wsgi = remote_wsgi
227 227 _use_echo_app = False
228 228
229 229 def __init__(self, settings=None, global_config=None):
230 230
231 231 self.config = Configurator(settings=settings)
232 232 # Init our statsd at very start
233 233 self.config.registry.statsd = StatsdClient.statsd
234 234 self.config.registry.vcs_call_context = {}
235 235
236 236 self.global_config = global_config
237 237 self.config.include('vcsserver.lib.rc_cache')
238 238 self.config.include('vcsserver.lib.archive_cache')
239 239
240 240 settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
241 241 vcs = VCS(locale_conf=settings_locale, cache_config=settings)
242 242 self._remotes = {
243 243 'hg': vcs._hg_remote,
244 244 'git': vcs._git_remote,
245 245 'svn': vcs._svn_remote,
246 246 'server': vcs._vcsserver,
247 247 }
248 248 if settings.get('dev.use_echo_app', 'false').lower() == 'true':
249 249 self._use_echo_app = True
250 250 log.warning("Using EchoApp for VCS operations.")
251 251 self.remote_wsgi = remote_wsgi_stub
252 252
253 253 configure_and_store_settings(global_config, settings)
254 254
255 255 self._configure()
256 256
257 257 def _configure(self):
258 258 self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory)
259 259
260 260 self.config.add_route('service', '/_service')
261 261 self.config.add_route('status', '/status')
262 262 self.config.add_route('hg_proxy', '/proxy/hg')
263 263 self.config.add_route('git_proxy', '/proxy/git')
264 264
265 265 # rpc methods
266 266 self.config.add_route('vcs', '/{backend}')
267 267
268 268 # streaming rpc remote methods
269 269 self.config.add_route('vcs_stream', '/{backend}/stream')
270 270
271 271 # vcs operations clone/push as streaming
272 272 self.config.add_route('stream_git', '/stream/git/*repo_name')
273 273 self.config.add_route('stream_hg', '/stream/hg/*repo_name')
274 274
275 275 self.config.add_view(self.status_view, route_name='status', renderer='json')
276 276 self.config.add_view(self.service_view, route_name='service', renderer='msgpack')
277 277
278 278 self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
279 279 self.config.add_view(self.git_proxy(), route_name='git_proxy')
280 280 self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
281 281 vcs_view=self._remotes)
282 282 self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
283 283 vcs_view=self._remotes)
284 284
285 285 self.config.add_view(self.hg_stream(), route_name='stream_hg')
286 286 self.config.add_view(self.git_stream(), route_name='stream_git')
287 287
288 288 self.config.add_view_predicate('vcs_view', VCSViewPredicate)
289 289
290 290 self.config.add_notfound_view(not_found, renderer='json')
291 291
292 292 self.config.add_view(self.handle_vcs_exception, context=Exception)
293 293
294 294 self.config.add_tween(
295 295 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
296 296 )
297 297 self.config.add_request_method(
298 298 'vcsserver.lib.request_counter.get_request_counter',
299 299 'request_count')
300 300
301 301 def wsgi_app(self):
302 302 return self.config.make_wsgi_app()
303 303
304 304 def _vcs_view_params(self, request):
305 305 remote = self._remotes[request.matchdict['backend']]
306 306 payload = msgpack.unpackb(request.body, use_list=True)
307 307
308 308 method = payload.get('method')
309 309 params = payload['params']
310 310 wire = params.get('wire')
311 311 args = params.get('args')
312 312 kwargs = params.get('kwargs')
313 313 context_uid = None
314 314
315 315 request.registry.vcs_call_context = {
316 316 'method': method,
317 317 'repo_name': payload.get('_repo_name'),
318 318 }
319 319
320 320 if wire:
321 321 try:
322 322 wire['context'] = context_uid = uuid.UUID(wire['context'])
323 323 except KeyError:
324 324 pass
325 325 args.insert(0, wire)
326 326 repo_state_uid = wire.get('repo_state_uid') if wire else None
327 327
328 328 # NOTE(marcink): trading complexity for slight performance
329 329 if log.isEnabledFor(logging.DEBUG):
330 330 # also we SKIP printing out any of those methods args since they maybe excessive
331 331 just_args_methods = {
332 332 'commitctx': ('content', 'removed', 'updated'),
333 333 'commit': ('content', 'removed', 'updated')
334 334 }
335 335 if method in just_args_methods:
336 336 skip_args = just_args_methods[method]
337 337 call_args = ''
338 338 call_kwargs = {}
339 339 for k in kwargs:
340 340 if k in skip_args:
341 341 # replace our skip key with dummy
342 342 call_kwargs[k] = f'RemovedParam({k})'
343 343 else:
344 344 call_kwargs[k] = kwargs[k]
345 345 else:
346 346 call_args = args[1:]
347 347 call_kwargs = kwargs
348 348
349 349 log.debug('Method requested:`%s` with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
350 350 method, call_args, call_kwargs, context_uid, repo_state_uid)
351 351
352 352 statsd = request.registry.statsd
353 353 if statsd:
354 354 statsd.incr(
355 355 'vcsserver_method_total', tags=[
356 356 f"method:{method}",
357 357 ])
358 358 return payload, remote, method, args, kwargs
359 359
360 360 def vcs_view(self, request):
361 361
362 362 payload, remote, method, args, kwargs = self._vcs_view_params(request)
363 363 payload_id = payload.get('id')
364 364
365 365 try:
366 366 resp = getattr(remote, method)(*args, **kwargs)
367 367 except Exception as e:
368 368 exc_info = list(sys.exc_info())
369 369 exc_type, exc_value, exc_traceback = exc_info
370 370
371 371 org_exc = getattr(e, '_org_exc', None)
372 372 org_exc_name = None
373 373 org_exc_tb = ''
374 374 if org_exc:
375 375 org_exc_name = org_exc.__class__.__name__
376 376 org_exc_tb = getattr(e, '_org_exc_tb', '')
377 377 # replace our "faked" exception with our org
378 378 exc_info[0] = org_exc.__class__
379 379 exc_info[1] = org_exc
380 380
381 381 should_store_exc = True
382 382 if org_exc:
383 383 def get_exc_fqn(_exc_obj):
384 384 module_name = getattr(org_exc.__class__, '__module__', 'UNKNOWN')
385 385 return module_name + '.' + org_exc_name
386 386
387 387 exc_fqn = get_exc_fqn(org_exc)
388 388
389 389 if exc_fqn in ['mercurial.error.RepoLookupError',
390 390 'vcsserver.exceptions.RefNotFoundException']:
391 391 should_store_exc = False
392 392
393 393 if should_store_exc:
394 394 store_exception(id(exc_info), exc_info, request_path=request.path)
395 395
396 396 tb_info = format_exc(exc_info)
397 397
398 398 type_ = e.__class__.__name__
399 399 if type_ not in self.ALLOWED_EXCEPTIONS:
400 400 type_ = None
401 401
402 402 resp = {
403 403 'id': payload_id,
404 404 'error': {
405 405 'message': str(e),
406 406 'traceback': tb_info,
407 407 'org_exc': org_exc_name,
408 408 'org_exc_tb': org_exc_tb,
409 409 'type': type_
410 410 }
411 411 }
412 412
413 413 try:
414 414 resp['error']['_vcs_kind'] = getattr(e, '_vcs_kind', None)
415 415 except AttributeError:
416 416 pass
417 417 else:
418 418 resp = {
419 419 'id': payload_id,
420 420 'result': resp
421 421 }
422 422 log.debug('Serving data for method %s', method)
423 423 return resp
424 424
425 425 def vcs_stream_view(self, request):
426 426 payload, remote, method, args, kwargs = self._vcs_view_params(request)
427 427 # this method has a stream: marker we remove it here
428 428 method = method.split('stream:')[-1]
429 429 chunk_size = safe_int(payload.get('chunk_size')) or 4096
430 430
431 431 resp = getattr(remote, method)(*args, **kwargs)
432 432
433 433 def get_chunked_data(method_resp):
434 434 stream = io.BytesIO(method_resp)
435 435 while 1:
436 436 chunk = stream.read(chunk_size)
437 437 if not chunk:
438 438 break
439 439 yield chunk
440 440
441 441 response = Response(app_iter=get_chunked_data(resp))
442 442 response.content_type = 'application/octet-stream'
443 443
444 444 return response
445 445
446 446 def status_view(self, request):
447 447 import vcsserver
448 448 _platform_id = platform.uname()[1] or 'instance'
449 449
450 450 return {
451 451 "status": "OK",
452 452 "vcsserver_version": vcsserver.get_version(),
453 453 "platform": _platform_id,
454 454 "pid": os.getpid(),
455 455 }
456 456
457 457 def service_view(self, request):
458 458 import vcsserver
459 459
460 460 payload = msgpack.unpackb(request.body, use_list=True)
461 461 server_config, app_config = {}, {}
462 462
463 463 try:
464 464 path = self.global_config['__file__']
465 465 config = configparser.RawConfigParser()
466 466
467 467 config.read(path)
468 468
469 469 if config.has_section('server:main'):
470 470 server_config = dict(config.items('server:main'))
471 471 if config.has_section('app:main'):
472 472 app_config = dict(config.items('app:main'))
473 473
474 474 except Exception:
475 475 log.exception('Failed to read .ini file for display')
476 476
477 477 environ = list(os.environ.items())
478 478
479 479 resp = {
480 480 'id': payload.get('id'),
481 481 'result': dict(
482 482 version=vcsserver.get_version(),
483 483 config=server_config,
484 484 app_config=app_config,
485 485 environ=environ,
486 486 payload=payload,
487 487 )
488 488 }
489 489 return resp
490 490
491 491 def _msgpack_renderer_factory(self, info):
492 492
493 493 def _render(value, system):
494 494 bin_type = False
495 495 res = value.get('result')
496 496 if isinstance(res, BytesEnvelope):
497 497 log.debug('Result is wrapped in BytesEnvelope type')
498 498 bin_type = True
499 499 elif isinstance(res, BinaryEnvelope):
500 500 log.debug('Result is wrapped in BinaryEnvelope type')
501 501 value['result'] = res.val
502 502 bin_type = True
503 503
504 504 request = system.get('request')
505 505 if request is not None:
506 506 response = request.response
507 507 ct = response.content_type
508 508 if ct == response.default_content_type:
509 509 response.content_type = 'application/x-msgpack'
510 510 if bin_type:
511 511 response.content_type = 'application/x-msgpack-bin'
512 512
513 513 return msgpack.packb(value, use_bin_type=bin_type)
514 514 return _render
515 515
516 516 def set_env_from_config(self, environ, config):
517 517 dict_conf = {}
518 518 try:
519 519 for elem in config:
520 520 if elem[0] == 'rhodecode':
521 521 dict_conf = json.loads(elem[2])
522 522 break
523 523 except Exception:
524 524 log.exception('Failed to fetch SCM CONFIG')
525 525 return
526 526
527 527 username = dict_conf.get('username')
528 528 if username:
529 529 environ['REMOTE_USER'] = username
530 530 # mercurial specific, some extension api rely on this
531 531 environ['HGUSER'] = username
532 532
533 533 ip = dict_conf.get('ip')
534 534 if ip:
535 535 environ['REMOTE_HOST'] = ip
536 536
537 537 if _is_request_chunked(environ):
538 538 # set the compatibility flag for webob
539 539 environ['wsgi.input_terminated'] = True
540 540
541 541 def hg_proxy(self):
542 542 @wsgiapp
543 543 def _hg_proxy(environ, start_response):
544 544 app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
545 545 return app(environ, start_response)
546 546 return _hg_proxy
547 547
548 548 def git_proxy(self):
549 549 @wsgiapp
550 550 def _git_proxy(environ, start_response):
551 551 app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
552 552 return app(environ, start_response)
553 553 return _git_proxy
554 554
555 555 def hg_stream(self):
556 556 if self._use_echo_app:
557 557 @wsgiapp
558 558 def _hg_stream(environ, start_response):
559 559 app = EchoApp('fake_path', 'fake_name', None)
560 560 return app(environ, start_response)
561 561 return _hg_stream
562 562 else:
563 563 @wsgiapp
564 564 def _hg_stream(environ, start_response):
565 565 log.debug('http-app: handling hg stream')
566 566 call_context = get_headers_call_context(environ)
567 567
568 568 repo_path = call_context['repo_path']
569 569 repo_name = call_context['repo_name']
570 570 config = call_context['repo_config']
571 571
572 572 app = scm_app.create_hg_wsgi_app(
573 573 repo_path, repo_name, config)
574 574
575 575 # Consistent path information for hgweb
576 576 environ['PATH_INFO'] = call_context['path_info']
577 577 environ['REPO_NAME'] = repo_name
578 578 self.set_env_from_config(environ, config)
579 579
580 580 log.debug('http-app: starting app handler '
581 581 'with %s and process request', app)
582 582 return app(environ, ResponseFilter(start_response))
583 583 return _hg_stream
584 584
585 585 def git_stream(self):
586 586 if self._use_echo_app:
587 587 @wsgiapp
588 588 def _git_stream(environ, start_response):
589 589 app = EchoApp('fake_path', 'fake_name', None)
590 590 return app(environ, start_response)
591 591 return _git_stream
592 592 else:
593 593 @wsgiapp
594 594 def _git_stream(environ, start_response):
595 595 log.debug('http-app: handling git stream')
596 596
597 597 call_context = get_headers_call_context(environ)
598 598
599 599 repo_path = call_context['repo_path']
600 600 repo_name = call_context['repo_name']
601 601 config = call_context['repo_config']
602 602
603 603 environ['PATH_INFO'] = call_context['path_info']
604 604 self.set_env_from_config(environ, config)
605 605
606 606 content_type = environ.get('CONTENT_TYPE', '')
607 607
608 608 path = environ['PATH_INFO']
609 609 is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
610 610 log.debug(
611 611 'LFS: Detecting if request `%s` is LFS server path based '
612 612 'on content type:`%s`, is_lfs:%s',
613 613 path, content_type, is_lfs_request)
614 614
615 615 if not is_lfs_request:
616 616 # fallback detection by path
617 617 if GIT_LFS_PROTO_PAT.match(path):
618 618 is_lfs_request = True
619 619 log.debug(
620 620 'LFS: fallback detection by path of: `%s`, is_lfs:%s',
621 621 path, is_lfs_request)
622 622
623 623 if is_lfs_request:
624 624 app = scm_app.create_git_lfs_wsgi_app(
625 625 repo_path, repo_name, config)
626 626 else:
627 627 app = scm_app.create_git_wsgi_app(
628 628 repo_path, repo_name, config)
629 629
630 630 log.debug('http-app: starting app handler '
631 631 'with %s and process request', app)
632 632
633 633 return app(environ, start_response)
634 634
635 635 return _git_stream
636 636
637 637 def handle_vcs_exception(self, exception, request):
638 _vcs_kind = getattr(exception, '_vcs_kind', '')
639 638
640 if _vcs_kind == 'repo_locked':
641 headers_call_context = get_headers_call_context(request.environ)
642 status_code = safe_int(headers_call_context['locked_status_code'])
639 match _vcs_kind := getattr(exception, '_vcs_kind', ''):
640 case 'repo_locked':
641 headers_call_context = get_headers_call_context(request.environ)
642 status_code = safe_int(headers_call_context['locked_status_code'])
643 643
644 return HTTPRepoLocked(
645 title=str(exception), status_code=status_code, headers=[('X-Rc-Locked', '1')])
646
647 elif _vcs_kind == 'repo_branch_protected':
648 # Get custom repo-branch-protected status code if present.
649 return HTTPRepoBranchProtected(
650 title=str(exception), headers=[('X-Rc-Branch-Protection', '1')])
644 return HTTPRepoLocked(
645 title=str(exception), status_code=status_code, headers=[('X-Rc-Locked', '1')])
646 case 'repo_branch_protected':
647 # Get custom repo-branch-protected status code if present.
648 return HTTPRepoBranchProtected(
649 title=str(exception), headers=[('X-Rc-Branch-Protection', '1')])
650 case 'client_not_supported':
651 return HTTPClientNotSupported(
652 title=str(exception), headers=[('X-Rc-Client-Not-Supported', '1')])
651 653
652 654 exc_info = request.exc_info
653 655 store_exception(id(exc_info), exc_info)
654 656
655 657 traceback_info = 'unavailable'
656 658 if request.exc_info:
657 659 traceback_info = format_exc(request.exc_info)
658 660
659 661 log.error(
660 662 'error occurred handling this request for path: %s, \n%s',
661 663 request.path, traceback_info)
662 664
663 665 statsd = request.registry.statsd
664 666 if statsd:
665 667 exc_type = f"{exception.__class__.__module__}.{exception.__class__.__name__}"
666 668 statsd.incr('vcsserver_exception_total',
667 669 tags=[f"type:{exc_type}"])
668 670 raise exception
669 671
670 672
671 673 class ResponseFilter:
672 674
673 675 def __init__(self, start_response):
674 676 self._start_response = start_response
675 677
676 678 def __call__(self, status, response_headers, exc_info=None):
677 679 headers = tuple(
678 680 (h, v) for h, v in response_headers
679 681 if not wsgiref.util.is_hop_by_hop(h))
680 682 return self._start_response(status, headers, exc_info)
681 683
682 684
683 685 def sanitize_settings_and_apply_defaults(global_config, settings):
684 686 _global_settings_maker = SettingsMaker(global_config)
685 687 settings_maker = SettingsMaker(settings)
686 688
687 689 settings_maker.make_setting('logging.autoconfigure', False, parser='bool')
688 690
689 691 logging_conf = os.path.join(os.path.dirname(global_config.get('__file__')), 'logging.ini')
690 692 settings_maker.enable_logging(logging_conf)
691 693
692 694 # Default includes, possible to change as a user
693 695 pyramid_includes = settings_maker.make_setting('pyramid.includes', [], parser='list:newline')
694 696 log.debug("Using the following pyramid.includes: %s", pyramid_includes)
695 697
696 698 settings_maker.make_setting('__file__', global_config.get('__file__'))
697 699
698 700 settings_maker.make_setting('pyramid.default_locale_name', 'en')
699 701 settings_maker.make_setting('locale', 'en_US.UTF-8')
700 702
701 703 settings_maker.make_setting(
702 704 'core.binary_dir', '/usr/local/bin/rhodecode_bin/vcs_bin',
703 705 default_when_empty=True, parser='string:noquote')
704 706
705 707 settings_maker.make_setting('vcs.svn.redis_conn', 'redis://redis:6379/0')
706 708
707 709 temp_store = tempfile.gettempdir()
708 710 default_cache_dir = os.path.join(temp_store, 'rc_cache')
709 711 # save default, cache dir, and use it for all backends later.
710 712 default_cache_dir = settings_maker.make_setting(
711 713 'cache_dir',
712 714 default=default_cache_dir, default_when_empty=True,
713 715 parser='dir:ensured')
714 716
715 717 # exception store cache
716 718 settings_maker.make_setting(
717 719 'exception_tracker.store_path',
718 720 default=os.path.join(default_cache_dir, 'exc_store'), default_when_empty=True,
719 721 parser='dir:ensured'
720 722 )
721 723
722 724 # repo_object cache defaults
723 725 settings_maker.make_setting(
724 726 'rc_cache.repo_object.backend',
725 727 default='dogpile.cache.rc.file_namespace',
726 728 parser='string')
727 729 settings_maker.make_setting(
728 730 'rc_cache.repo_object.expiration_time',
729 731 default=30 * 24 * 60 * 60, # 30days
730 732 parser='int')
731 733 settings_maker.make_setting(
732 734 'rc_cache.repo_object.arguments.filename',
733 735 default=os.path.join(default_cache_dir, 'vcsserver_cache_repo_object.db'),
734 736 parser='string')
735 737
736 738 # statsd
737 739 settings_maker.make_setting('statsd.enabled', False, parser='bool')
738 740 settings_maker.make_setting('statsd.statsd_host', 'statsd-exporter', parser='string')
739 741 settings_maker.make_setting('statsd.statsd_port', 9125, parser='int')
740 742 settings_maker.make_setting('statsd.statsd_prefix', '')
741 743 settings_maker.make_setting('statsd.statsd_ipv6', False, parser='bool')
742 744
743 745 settings_maker.env_expand()
744 746
745 747
746 748 def main(global_config, **settings):
747 749 start_time = time.time()
748 750 log.info('Pyramid app config starting')
749 751
750 752 if MercurialFactory:
751 753 hgpatches.patch_largefiles_capabilities()
752 754 hgpatches.patch_subrepo_type_mapping()
753 755
754 756 # Fill in and sanitize the defaults & do ENV expansion
755 757 sanitize_settings_and_apply_defaults(global_config, settings)
756 758
757 759 # init and bootstrap StatsdClient
758 760 StatsdClient.setup(settings)
759 761
760 762 pyramid_app = HTTPApplication(settings=settings, global_config=global_config).wsgi_app()
761 763 total_time = time.time() - start_time
762 764 log.info('Pyramid app created and configured in %.2fs', total_time)
763 765 return pyramid_app
@@ -1,563 +1,564 b''
1 1 """
2 2 Module provides a class allowing to wrap communication over subprocess.Popen
3 3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 4 stream processor exposing the output data as an iterator fitting to be a
5 5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
25 25 import os
26 26 import collections
27 27 import logging
28 28 import subprocess
29 29 import threading
30 30
31 31 from vcsserver.lib.str_utils import safe_str
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35
36 36 class StreamFeeder(threading.Thread):
37 37 """
38 38 Normal writing into pipe-like is blocking once the buffer is filled.
39 39 This thread allows a thread to seep data from a file-like into a pipe
40 40 without blocking the main thread.
41 41 We close inpipe once the end of the source stream is reached.
42 42 """
43 43
44 44 def __init__(self, source):
45 45 super().__init__()
46 46 self.daemon = True
47 47 filelike = False
48 48 self.bytes = b''
49 49 if type(source) in (str, bytes, bytearray): # string-like
50 50 self.bytes = bytes(source)
51 51 else: # can be either file pointer or file-like
52 52 if isinstance(source, int): # file pointer it is
53 53 # converting file descriptor (int) stdin into file-like
54 54 source = os.fdopen(source, 'rb', 16384)
55 55 # let's see if source is file-like by now
56 56 filelike = hasattr(source, 'read')
57 57 if not filelike and not self.bytes:
58 58 raise TypeError("StreamFeeder's source object must be a readable "
59 59 "file-like, a file descriptor, or a string-like.")
60 60 self.source = source
61 61 self.readiface, self.writeiface = os.pipe()
62 62
63 63 def run(self):
64 64 writer = self.writeiface
65 65 try:
66 66 if self.bytes:
67 67 os.write(writer, self.bytes)
68 68 else:
69 69 s = self.source
70 70
71 71 while 1:
72 72 _bytes = s.read(4096)
73 73 if not _bytes:
74 74 break
75 75 os.write(writer, _bytes)
76 76
77 77 finally:
78 78 os.close(writer)
79 79
80 80 @property
81 81 def output(self):
82 82 return self.readiface
83 83
84 84
85 85 class InputStreamChunker(threading.Thread):
86 86 def __init__(self, source, target, buffer_size, chunk_size):
87 87
88 88 super().__init__()
89 89
90 90 self.daemon = True # die die die.
91 91
92 92 self.source = source
93 93 self.target = target
94 94 self.chunk_count_max = int(buffer_size / chunk_size) + 1
95 95 self.chunk_size = chunk_size
96 96
97 97 self.data_added = threading.Event()
98 98 self.data_added.clear()
99 99
100 100 self.keep_reading = threading.Event()
101 101 self.keep_reading.set()
102 102
103 103 self.EOF = threading.Event()
104 104 self.EOF.clear()
105 105
106 106 self.go = threading.Event()
107 107 self.go.set()
108 108
109 109 def stop(self):
110 110 self.go.clear()
111 111 self.EOF.set()
112 112 try:
113 113 # this is not proper, but is done to force the reader thread let
114 114 # go of the input because, if successful, .close() will send EOF
115 115 # down the pipe.
116 116 self.source.close()
117 117 except Exception:
118 118 pass
119 119
120 120 def run(self):
121 121 s = self.source
122 122 t = self.target
123 123 cs = self.chunk_size
124 124 chunk_count_max = self.chunk_count_max
125 125 keep_reading = self.keep_reading
126 126 da = self.data_added
127 127 go = self.go
128 128
129 129 try:
130 130 b = s.read(cs)
131 131 except ValueError:
132 132 b = ''
133 133
134 134 timeout_input = 20
135 135 while b and go.is_set():
136 136 if len(t) > chunk_count_max:
137 137 keep_reading.clear()
138 138 keep_reading.wait(timeout_input)
139 139 if len(t) > chunk_count_max + timeout_input:
140 140 log.error("Timed out while waiting for input from subprocess.")
141 141 os._exit(-1) # this will cause the worker to recycle itself
142 142
143 143 t.append(b)
144 144 da.set()
145 145
146 146 try:
147 147 b = s.read(cs)
148 148 except ValueError: # probably "I/O operation on closed file"
149 149 b = ''
150 150
151 151 self.EOF.set()
152 152 da.set() # for cases when done but there was no input.
153 153
154 154
155 155 class BufferedGenerator:
156 156 """
157 157 Class behaves as a non-blocking, buffered pipe reader.
158 158 Reads chunks of data (through a thread)
159 159 from a blocking pipe, and attaches these to an array (Deque) of chunks.
160 160 Reading is halted in the thread when max chunks is internally buffered.
161 161 The .next() may operate in blocking or non-blocking fashion by yielding
162 162 '' if no data is ready
163 163 to be sent or by not returning until there is some data to send
164 164 When we get EOF from underlying source pipe we raise the marker to raise
165 165 StopIteration after the last chunk of data is yielded.
166 166 """
167 167
168 168 def __init__(self, name, source, buffer_size=65536, chunk_size=4096,
169 169 starting_values=None, bottomless=False):
170 170 starting_values = starting_values or []
171 171 self.name = name
172 172 self.buffer_size = buffer_size
173 173 self.chunk_size = chunk_size
174 174
175 175 if bottomless:
176 176 maxlen = int(buffer_size / chunk_size)
177 177 else:
178 178 maxlen = None
179 179
180 180 self.data_queue = collections.deque(starting_values, maxlen)
181 181 self.worker = InputStreamChunker(source, self.data_queue, buffer_size, chunk_size)
182 182 if starting_values:
183 183 self.worker.data_added.set()
184 184 self.worker.start()
185 185
186 186 ####################
187 187 # Generator's methods
188 188 ####################
189 189 def __str__(self):
190 190 return f'BufferedGenerator(name={self.name} chunk: {self.chunk_size} on buffer: {self.buffer_size})'
191 191
192 192 def __iter__(self):
193 193 return self
194 194
195 195 def __next__(self):
196 196
197 197 while not self.length and not self.worker.EOF.is_set():
198 198 self.worker.data_added.clear()
199 199 self.worker.data_added.wait(0.2)
200 200
201 201 if self.length:
202 202 self.worker.keep_reading.set()
203 203 return bytes(self.data_queue.popleft())
204 204 elif self.worker.EOF.is_set():
205 205 raise StopIteration
206 206
207 207 def throw(self, exc_type, value=None, traceback=None):
208 208 if not self.worker.EOF.is_set():
209 209 raise exc_type(value)
210 210
211 211 def start(self):
212 212 self.worker.start()
213 213
214 214 def stop(self):
215 215 self.worker.stop()
216 216
217 217 def close(self):
218 218 try:
219 219 self.worker.stop()
220 220 self.throw(GeneratorExit)
221 221 except (GeneratorExit, StopIteration):
222 222 pass
223 223
224 224 ####################
225 225 # Threaded reader's infrastructure.
226 226 ####################
227 227 @property
228 228 def input(self):
229 229 return self.worker.w
230 230
231 231 @property
232 232 def data_added_event(self):
233 233 return self.worker.data_added
234 234
235 235 @property
236 236 def data_added(self):
237 237 return self.worker.data_added.is_set()
238 238
239 239 @property
240 240 def reading_paused(self):
241 241 return not self.worker.keep_reading.is_set()
242 242
243 243 @property
244 244 def done_reading_event(self):
245 245 """
246 246 Done_reding does not mean that the iterator's buffer is empty.
247 247 Iterator might have done reading from underlying source, but the read
248 248 chunks might still be available for serving through .next() method.
249 249
250 250 :returns: An Event class instance.
251 251 """
252 252 return self.worker.EOF
253 253
254 254 @property
255 255 def done_reading(self):
256 256 """
257 257 Done_reading does not mean that the iterator's buffer is empty.
258 258 Iterator might have done reading from underlying source, but the read
259 259 chunks might still be available for serving through .next() method.
260 260
261 261 :returns: An Bool value.
262 262 """
263 263 return self.worker.EOF.is_set()
264 264
265 265 @property
266 266 def length(self):
267 267 """
268 268 returns int.
269 269
270 270 This is the length of the queue of chunks, not the length of
271 271 the combined contents in those chunks.
272 272
273 273 __len__() cannot be meaningfully implemented because this
274 274 reader is just flying through a bottomless pit content and
275 275 can only know the length of what it already saw.
276 276
277 277 If __len__() on WSGI server per PEP 3333 returns a value,
278 278 the response's length will be set to that. In order not to
279 279 confuse WSGI PEP3333 servers, we will not implement __len__
280 280 at all.
281 281 """
282 282 return len(self.data_queue)
283 283
284 284 def prepend(self, x):
285 285 self.data_queue.appendleft(x)
286 286
287 287 def append(self, x):
288 288 self.data_queue.append(x)
289 289
290 290 def extend(self, o):
291 291 self.data_queue.extend(o)
292 292
293 293 def __getitem__(self, i):
294 294 return self.data_queue[i]
295 295
296 296
297 297 class SubprocessIOChunker:
298 298 """
299 299 Processor class wrapping handling of subprocess IO.
300 300
301 301 .. important::
302 302
303 303 Watch out for the method `__del__` on this class. If this object
304 304 is deleted, it will kill the subprocess, so avoid to
305 305 return the `output` attribute or usage of it like in the following
306 306 example::
307 307
308 308 # `args` expected to run a program that produces a lot of output
309 309 output = ''.join(SubprocessIOChunker(
310 310 args, shell=False, inputstream=inputstream, env=environ).output)
311 311
312 312 # `output` will not contain all the data, because the __del__ method
313 313 # has already killed the subprocess in this case before all output
314 314 # has been consumed.
315 315
316 316
317 317
318 318 In a way, this is a "communicate()" replacement with a twist.
319 319
320 320 - We are multithreaded. Writing in and reading out, err are all sep threads.
321 321 - We support concurrent (in and out) stream processing.
322 322 - The output is not a stream. It's a queue of read string (bytes, not str)
323 323 chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
324 324 - We are non-blocking in more respects than communicate()
325 325 (reading from subprocess out pauses when internal buffer is full, but
326 326 does not block the parent calling code. On the flip side, reading from
327 327 slow-yielding subprocess may block the iteration until data shows up. This
328 328 does not block the parallel inpipe reading occurring parallel thread.)
329 329
330 330 The purpose of the object is to allow us to wrap subprocess interactions into
331 331 an iterable that can be passed to a WSGI server as the application's return
332 332 value. Because of stream-processing-ability, WSGI does not have to read ALL
333 333 of the subprocess's output and buffer it, before handing it to WSGI server for
334 334 HTTP response. Instead, the class initializer reads just a bit of the stream
335 335 to figure out if error occurred or likely to occur and if not, just hands the
336 336 further iteration over subprocess output to the server for completion of HTTP
337 337 response.
338 338
339 339 The real or perceived subprocess error is trapped and raised as one of
340 340 OSError family of exceptions
341 341
342 342 Example usage:
343 343 # try:
344 344 # answer = SubprocessIOChunker(
345 345 # cmd,
346 346 # input,
347 347 # buffer_size = 65536,
348 348 # chunk_size = 4096
349 349 # )
350 350 # except (OSError) as e:
351 351 # print str(e)
352 352 # raise e
353 353 #
354 354 # return answer
355 355
356 356
357 357 """
358 358
359 359 # TODO: johbo: This is used to make sure that the open end of the PIPE
360 360 # is closed in the end. It would be way better to wrap this into an
361 361 # object, so that it is closed automatically once it is consumed or
362 362 # something similar.
363 363 _close_input_fd = None
364 364
365 365 _closed = False
366 366 _stdout = None
367 367 _stderr = None
368 368
369 369 def __init__(self, cmd, input_stream=None, buffer_size=65536,
370 370 chunk_size=4096, starting_values=None, fail_on_stderr=True,
371 371 fail_on_return_code=True, **kwargs):
372 372 """
373 373 Initializes SubprocessIOChunker
374 374
375 375 :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
376 376 :param input_stream: (Default: None) A file-like, string, or file pointer.
377 377 :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
378 378 :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
379 379 :param starting_values: (Default: []) An array of strings to put in front of output que.
380 380 :param fail_on_stderr: (Default: True) Whether to raise an exception in
381 381 case something is written to stderr.
382 382 :param fail_on_return_code: (Default: True) Whether to raise an
383 383 exception if the return code is not 0.
384 384 """
385 385
386 386 kwargs['shell'] = kwargs.get('shell', True)
387 387
388 388 starting_values = starting_values or []
389 389 if input_stream:
390 390 input_streamer = StreamFeeder(input_stream)
391 391 input_streamer.start()
392 392 input_stream = input_streamer.output
393 393 self._close_input_fd = input_stream
394 394
395 395 self._fail_on_stderr = fail_on_stderr
396 396 self._fail_on_return_code = fail_on_return_code
397 397 self.cmd = cmd
398 398
399 399 _p = subprocess.Popen(cmd, bufsize=-1, stdin=input_stream, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
400 400 **kwargs)
401 401 self.process = _p
402 402
403 403 bg_out = BufferedGenerator('stdout', _p.stdout, buffer_size, chunk_size, starting_values)
404 404 bg_err = BufferedGenerator('stderr', _p.stderr, 10240, 1, bottomless=True)
405 405
406 406 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
407 407 # doing this until we reach either end of file, or end of buffer.
408 408 bg_out.data_added_event.wait(0.2)
409 409 bg_out.data_added_event.clear()
410 410
411 411 # at this point it's still ambiguous if we are done reading or just full buffer.
412 412 # Either way, if error (returned by ended process, or implied based on
413 413 # presence of stuff in stderr output) we error out.
414 414 # Else, we are happy.
415 415 return_code = _p.poll()
416 416 ret_code_ok = return_code in [None, 0]
417 417 ret_code_fail = return_code is not None and return_code != 0
418
418 419 if (
419 420 (ret_code_fail and fail_on_return_code) or
420 421 (ret_code_ok and fail_on_stderr and bg_err.length)
421 422 ):
422 423
423 424 try:
424 425 _p.terminate()
425 426 except Exception:
426 427 pass
427 428
428 429 bg_out.stop()
429 430 out = b''.join(bg_out)
430 431 self._stdout = out
431 432
432 433 bg_err.stop()
433 434 err = b''.join(bg_err)
434 435 self._stderr = err
435 436
436 437 # code from https://github.com/schacon/grack/pull/7
437 438 if err.strip() == b'fatal: The remote end hung up unexpectedly' and out.startswith(b'0034shallow '):
438 439 bg_out = iter([out])
439 440 _p = None
440 441 elif err and fail_on_stderr:
441 442 text_err = err.decode()
442 443 raise OSError(
443 444 f"Subprocess exited due to an error:\n{text_err}")
444 445
445 446 if ret_code_fail and fail_on_return_code:
446 447 text_err = err.decode()
447 448 if not err:
448 449 # maybe get empty stderr, try stdout instead
449 450 # in many cases git reports the errors on stdout too
450 451 text_err = out.decode()
451 452 raise OSError(
452 453 f"Subprocess exited with non 0 ret code:{return_code}: stderr:{text_err}")
453 454
454 455 self.stdout = bg_out
455 456 self.stderr = bg_err
456 457 self.inputstream = input_stream
457 458
458 459 def __str__(self):
459 460 proc = getattr(self, 'process', 'NO_PROCESS')
460 461 return f'SubprocessIOChunker: {proc}'
461 462
462 463 def __iter__(self):
463 464 return self
464 465
465 466 def __next__(self):
466 467 # Note: mikhail: We need to be sure that we are checking the return
467 468 # code after the stdout stream is closed. Some processes, e.g. git
468 469 # are doing some magic in between closing stdout and terminating the
469 470 # process and, as a result, we are not getting return code on "slow"
470 471 # systems.
471 472 result = None
472 473 stop_iteration = None
473 474 try:
474 475 result = next(self.stdout)
475 476 except StopIteration as e:
476 477 stop_iteration = e
477 478
478 479 if self.process:
479 480 return_code = self.process.poll()
480 481 ret_code_fail = return_code is not None and return_code != 0
481 482 if ret_code_fail and self._fail_on_return_code:
482 483 self.stop_streams()
483 484 err = self.get_stderr()
484 485 raise OSError(
485 486 f"Subprocess exited (exit_code:{return_code}) due to an error during iteration:\n{err}")
486 487
487 488 if stop_iteration:
488 489 raise stop_iteration
489 490 return result
490 491
491 492 def throw(self, exc_type, value=None, traceback=None):
492 493 if self.stdout.length or not self.stdout.done_reading:
493 494 raise exc_type(value)
494 495
495 496 def close(self):
496 497 if self._closed:
497 498 return
498 499
499 500 try:
500 501 self.process.terminate()
501 502 except Exception:
502 503 pass
503 504 if self._close_input_fd:
504 505 os.close(self._close_input_fd)
505 506 try:
506 507 self.stdout.close()
507 508 except Exception:
508 509 pass
509 510 try:
510 511 self.stderr.close()
511 512 except Exception:
512 513 pass
513 514 try:
514 515 os.close(self.inputstream)
515 516 except Exception:
516 517 pass
517 518
518 519 self._closed = True
519 520
520 521 def stop_streams(self):
521 522 getattr(self.stdout, 'stop', lambda: None)()
522 523 getattr(self.stderr, 'stop', lambda: None)()
523 524
524 525 def get_stdout(self):
525 526 if self._stdout:
526 527 return self._stdout
527 528 else:
528 529 return b''.join(self.stdout)
529 530
530 531 def get_stderr(self):
531 532 if self._stderr:
532 533 return self._stderr
533 534 else:
534 535 return b''.join(self.stderr)
535 536
536 537
537 538 def run_command(arguments, env=None):
538 539 """
539 540 Run the specified command and return the stdout.
540 541
541 542 :param arguments: sequence of program arguments (including the program name)
542 543 :type arguments: list[str]
543 544 """
544 545
545 546 cmd = arguments
546 547 log.debug('Running subprocessio command %s', cmd)
547 548 proc = None
548 549 try:
549 550 _opts = {'shell': False, 'fail_on_stderr': False}
550 551 if env:
551 552 _opts.update({'env': env})
552 553 proc = SubprocessIOChunker(cmd, **_opts)
553 554 return b''.join(proc), b''.join(proc.stderr)
554 555 except OSError as err:
555 556 cmd = ' '.join(map(safe_str, cmd)) # human friendly CMD
556 557 tb_err = ("Couldn't run subprocessio command (%s).\n"
557 558 "Original error was:%s\n" % (cmd, err))
558 559 log.exception(tb_err)
559 560 raise Exception(tb_err)
560 561 finally:
561 562 if proc:
562 563 proc.close()
563 564
General Comments 0
You need to be logged in to leave comments. Login now