##// END OF EJS Templates
feat(git): added update_refs method that validates refs before setting a value
feat(git): added update_refs method that validates refs before setting a value

File last commit:

r1190:cb9d8354 default
r1191:3687e783 default
Show More
scm_app.py
255 lines | 8.5 KiB | text/x-python | PythonLexer
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2023 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
import os
import logging
import itertools
import mercurial
import mercurial.error
import mercurial.wireprotoserver
import mercurial.hgweb.common
import mercurial.hgweb.hgweb_mod
import webob.exc
from vcsserver import pygrack, exceptions, settings, git_lfs
from vcsserver.str_utils import ascii_bytes, safe_bytes
log = logging.getLogger(__name__)
# Names of the hgrc sections this module iterates over when it copies
# settings from a repository hgrc file into a ui object.
# The list is propagated from the mercurial documentation.
HG_UI_SECTIONS = [
    'alias',
    'auth',
    'decode/encode',
    'defaults',
    'diff',
    'email',
    'extensions',
    'format',
    'merge-patterns',
    'merge-tools',
    'hooks',
    'http_proxy',
    'smtp',
    'patch',
    'paths',
    'profiling',
    'server',
    'trusted',
    'ui',
    'web',
]
class HgWeb(mercurial.hgweb.hgweb_mod.hgweb):
    """hgweb subclass that stubs out the parts this server never uses."""

    def _get_view(self, repo):
        """Return ``repo`` as-is; repository views are not supported."""
        return repo

    def loadsubweb(self):
        """Return None; the result would only feed the unused templater."""
        return None

    def run(self):
        """Not used here; raise if it is ever called by accident."""
        raise NotImplementedError

    def templater(self, req):
        """Not reachable in practice; raise if it is ever called.

        Requests are guaranteed to correspond to a Mercurial command (see
        the is_hg method), so no user-visible url is ever rendered and the
        templating code path is never taken.
        """
        raise NotImplementedError

    def archivelist(self, nodeid):
        """Not used here; raise if it is ever called by accident."""
        raise NotImplementedError

    def __call__(self, environ, start_response):
        """WSGI entry point.

        May be invoked concurrently from several threads.

        The first chunk of the response is pulled eagerly before anything is
        returned (presumably so failures while producing it surface inside
        this call -- confirm), and is then stitched back in front of the
        remaining chunks.
        """
        from mercurial.hgweb import request as requestmod

        req = requestmod.parserequestfromenv(environ)
        res = requestmod.wsgiresponse(req, start_response)
        gen = self.run_wsgi(req, res)

        try:
            head = next(gen)
        except StopIteration:
            # nothing was produced at all; hand back the exhausted generator
            return gen
        return itertools.chain((head,), gen)

    def _runwsgi(self, req, res, repo):
        """Dispatch wire-protocol commands; answer anything else with 400."""
        cmd = req.qsparams.get(b'cmd', '')
        if mercurial.wireprotoserver.iscmd(cmd):
            return super()._runwsgi(req, res, repo)

        # NOTE(marcink): for unsupported commands, we return bad request
        # internally from HG
        log.warning('cmd: `%s` is not supported by the mercurial wireprotocol v1', cmd)
        bad_request = mercurial.hgweb.common.HTTP_BAD_REQUEST
        res.status = mercurial.hgweb.common.statusmessage(bad_request)
        res.setbodybytes(b'')
        return res.sendresponse()
def sanitize_hg_ui(baseui):
    """Strip the ``hgsubversion`` extension entry from every config layer.

    :param baseui: ui object exposing ``config()`` and the ``_ocfg``,
        ``_tcfg`` and ``_ucfg`` config containers; modified in place.
    """
    # NOTE(marcink): since python3 hgsubversion is deprecated.
    # From old installations we might still have this set enabled
    # we explicitly remove this now here to make sure it wont propagate further
    if baseui.config(b'extensions', b'hgsubversion') is None:
        return

    for layer in (baseui._ocfg, baseui._tcfg, baseui._ucfg):
        if b'extensions' in layer and b'hgsubversion' in layer[b'extensions']:
            del layer[b'extensions'][b'hgsubversion']
def make_hg_ui_from_config(repo_config):
    """Build a fresh Mercurial ui object populated from ``repo_config``.

    :param repo_config: iterable of ``(section, option, value)`` triples.
    :return: the configured ui object, with ``ui.quiet`` forced on.
    """
    hg_ui = mercurial.ui.ui()

    # clean the baseui object: start every config layer from an empty config
    for layer_name in ('_ocfg', '_ucfg', '_tcfg'):
        setattr(hg_ui, layer_name, mercurial.config.config())

    for section, option, value in repo_config:
        encoded = [
            ascii_bytes(item, allow_bytes=True)
            for item in (section, option, value)
        ]
        hg_ui.setconfig(*encoded)

    # make our hgweb quiet so it doesn't print output
    hg_ui.setconfig(b'ui', b'quiet', b'true')
    return hg_ui
def update_hg_ui_from_hgrc(baseui, repo_path):
    """Copy settings from the repository's ``.hg/hgrc`` into ``baseui``.

    Only the sections listed in ``HG_UI_SECTIONS`` are copied. When the
    hgrc file does not exist the function logs that and returns without
    touching ``baseui``.

    :param baseui: ui object to update in place via ``setconfig``.
    :param repo_path: filesystem path of the repository root.
    """
    path = os.path.join(repo_path, '.hg', 'hgrc')
    if not os.path.isfile(path):
        log.debug('hgrc file is not present at %s, skipping...', path)
        return

    log.debug('reading hgrc from %s', path)
    cfg = mercurial.config.config()
    # safe_bytes rather than ascii_bytes: the repository path is not
    # guaranteed to be pure ASCII, and the same path is converted with
    # safe_bytes when the hgweb application itself is created.
    cfg.read(safe_bytes(path))

    for section in HG_UI_SECTIONS:
        # The parsed config is addressed with bytes section names (as the
        # rest of this module does), while HG_UI_SECTIONS holds str names.
        # Convert before the lookup, otherwise no section ever matches.
        section_key = ascii_bytes(section, allow_bytes=True)
        for k, v in cfg.items(section_key):
            log.debug('settings ui from file: [%s] %s=%s', section, k, v)
            baseui.setconfig(
                section_key,
                ascii_bytes(k, allow_bytes=True),
                ascii_bytes(v, allow_bytes=True))
def create_hg_wsgi_app(repo_path, repo_name, config):
    """
    Build the WSGI application that serves Mercurial requests for a repo.

    :param repo_path: filesystem path of the repository.
    :param repo_name: name passed on to the hgweb application.
    :param config: is a list of 3-item tuples representing a ConfigObject
        (it is the serialized version of the config object).
    :raises: the wrapped requirement exception when Mercurial reports a
        ``RequirementError`` while the application is created.
    """
    log.debug("Creating Mercurial WSGI application")

    # ui is assembled in three steps: serialized config first, then the
    # repository hgrc on top, finally the cleanup pass.
    hg_ui = make_hg_ui_from_config(config)
    update_hg_ui_from_hgrc(hg_ui, repo_path)
    sanitize_hg_ui(hg_ui)

    try:
        wsgi_app = HgWeb(
            safe_bytes(repo_path),
            name=safe_bytes(repo_name),
            baseui=hg_ui,
        )
    except mercurial.error.RequirementError as e:
        raise exceptions.RequirementException(e)(e)
    return wsgi_app
class GitHandler:
    """
    WSGI callable serving Git operations (push/pull etc.) for one repository
    """

    def __init__(self, repo_location, repo_name, git_path, update_server_info,
                 extras):
        """Store the handler settings.

        :raises OSError: when ``repo_location`` is not an existing directory.
        """
        if not os.path.isdir(repo_location):
            raise OSError(repo_location)

        self.repo_name = repo_name
        self.repo_location = repo_location
        self.content_path = repo_location
        self.git_path = git_path
        self.update_server_info = update_server_info
        self.extras = extras

    def __call__(self, environ, start_response):
        """Serve the request from the first location that opens as a repo.

        The repository directory itself is tried first, then its ``.git``
        sub-directory. If both raise ``OSError`` the request is answered by
        an HTTP 404 application.
        """
        wsgi_app = webob.exc.HTTPNotFound()
        locations = (
            self.content_path,
            os.path.join(self.content_path, '.git'),
        )
        for location in locations:
            try:
                repository = pygrack.GitRepository(
                    self.repo_name, location, self.git_path,
                    self.update_server_info, self.extras)
            except OSError:
                continue
            else:
                wsgi_app = repository
                break
        return wsgi_app(environ, start_response)
def create_git_wsgi_app(repo_path, repo_name, config):
    """
    Build the WSGI application that serves Git requests for a repository.

    :param config: is a dictionary holding the extras. The
        ``git_update_server_info`` entry is popped from it (the dictionary
        is modified in place) and the remainder is handed to the handler.
    """
    git_executable = settings.GIT_EXECUTABLE
    return GitHandler(
        repo_location=repo_path,
        repo_name=repo_name,
        git_path=git_executable,
        update_server_info=config.pop('git_update_server_info'),
        extras=config,
    )
class GitLFSHandler:
    """
    Factory-style handler for Git LFS operations on one repository
    """

    def __init__(self, repo_location, repo_name, git_path, update_server_info,
                 extras):
        """Store the handler settings.

        :raises OSError: when ``repo_location`` is not an existing directory.
        """
        if not os.path.isdir(repo_location):
            raise OSError(repo_location)

        self.repo_name = repo_name
        self.repo_location = repo_location
        self.content_path = repo_location
        self.git_path = git_path
        self.update_server_info = update_server_info
        self.extras = extras

    def get_app(self, git_lfs_enabled, git_lfs_store_path, git_lfs_http_scheme):
        """Return the LFS WSGI app built by ``git_lfs.create_app``.

        The three arguments are forwarded unchanged; none of the attributes
        stored on the handler are consulted here.
        """
        return git_lfs.create_app(
            git_lfs_enabled, git_lfs_store_path, git_lfs_http_scheme)
def _pop_bytes_or_str_key(config, key, *default):
    """Pop ``key`` from ``config`` whether it is stored as bytes or as str.

    The bytes spelling is tried first, then the str spelling.

    :param config: dictionary to pop from (modified in place).
    :param key: option name as ASCII ``str``.
    :param default: optional single fallback returned when neither
        spelling of the key is present.
    :raises KeyError: when the key is absent and no default was given.
    """
    for candidate in (key.encode('ascii'), key):
        if candidate in config:
            return config.pop(candidate)
    if default:
        return default[0]
    raise KeyError(key)


def create_git_lfs_wsgi_app(repo_path, repo_name, config):
    """
    Creates a WSGI application to handle Git LFS requests.

    :param repo_path: filesystem path of the repository.
    :param repo_name: name of the repository.
    :param config: is a dictionary holding the extras. The LFS related
        entries (``git_update_server_info``, ``git_lfs_enabled``,
        ``git_lfs_store_path`` and the optional ``git_lfs_http_scheme``,
        defaulting to ``'http'``) are popped from it in place.
    :raises KeyError: when a required entry is missing from ``config``.
    """
    git_path = settings.GIT_EXECUTABLE

    # The plain Git factory in this module reads its option with a str key
    # while this one historically used bytes keys. Accept both spellings so
    # the same extras dictionary works for either factory.
    update_server_info = _pop_bytes_or_str_key(config, 'git_update_server_info')
    git_lfs_enabled = _pop_bytes_or_str_key(config, 'git_lfs_enabled')
    git_lfs_store_path = _pop_bytes_or_str_key(config, 'git_lfs_store_path')
    git_lfs_http_scheme = _pop_bytes_or_str_key(
        config, 'git_lfs_http_scheme', 'http')

    app = GitLFSHandler(
        repo_path, repo_name, git_path, update_server_info, config)
    return app.get_app(git_lfs_enabled, git_lfs_store_path, git_lfs_http_scheme)