|
|
# RhodeCode VCSServer provides access to different vcs backends via network.
|
|
|
# Copyright (C) 2014-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software; you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
# the Free Software Foundation; either version 3 of the License, or
|
|
|
# (at your option) any later version.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
# along with this program; if not, write to the Free Software Foundation,
|
|
|
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
|
|
|
|
import os
|
|
|
import logging
|
|
|
import itertools
|
|
|
|
|
|
import mercurial
|
|
|
import mercurial.error
|
|
|
import mercurial.wireprotoserver
|
|
|
import mercurial.hgweb.common
|
|
|
import mercurial.hgweb.hgweb_mod
|
|
|
import webob.exc
|
|
|
|
|
|
from vcsserver import pygrack, exceptions, settings, git_lfs
|
|
|
from vcsserver.str_utils import ascii_bytes, safe_bytes
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
# propagated from mercurial documentation
|
|
|
HG_UI_SECTIONS = [
|
|
|
'alias', 'auth', 'decode/encode', 'defaults', 'diff', 'email', 'extensions',
|
|
|
'format', 'merge-patterns', 'merge-tools', 'hooks', 'http_proxy', 'smtp',
|
|
|
'patch', 'paths', 'profiling', 'server', 'trusted', 'ui', 'web',
|
|
|
]
|
|
|
|
|
|
|
|
|
class HgWeb(mercurial.hgweb.hgweb_mod.hgweb):
|
|
|
"""Extension of hgweb that simplifies some functions."""
|
|
|
|
|
|
def _get_view(self, repo):
|
|
|
"""Views are not supported."""
|
|
|
return repo
|
|
|
|
|
|
def loadsubweb(self):
|
|
|
"""The result is only used in the templater method which is not used."""
|
|
|
return None
|
|
|
|
|
|
def run(self):
|
|
|
"""Unused function so raise an exception if accidentally called."""
|
|
|
raise NotImplementedError
|
|
|
|
|
|
def templater(self, req):
|
|
|
"""Function used in an unreachable code path.
|
|
|
|
|
|
This code is unreachable because we guarantee that the HTTP request,
|
|
|
corresponds to a Mercurial command. See the is_hg method. So, we are
|
|
|
never going to get a user-visible url.
|
|
|
"""
|
|
|
raise NotImplementedError
|
|
|
|
|
|
def archivelist(self, nodeid):
|
|
|
"""Unused function so raise an exception if accidentally called."""
|
|
|
raise NotImplementedError
|
|
|
|
|
|
def __call__(self, environ, start_response):
|
|
|
"""Run the WSGI application.
|
|
|
|
|
|
This may be called by multiple threads.
|
|
|
"""
|
|
|
from mercurial.hgweb import request as requestmod
|
|
|
req = requestmod.parserequestfromenv(environ)
|
|
|
res = requestmod.wsgiresponse(req, start_response)
|
|
|
gen = self.run_wsgi(req, res)
|
|
|
|
|
|
first_chunk = None
|
|
|
|
|
|
try:
|
|
|
data = next(gen)
|
|
|
|
|
|
def first_chunk():
|
|
|
yield data
|
|
|
except StopIteration:
|
|
|
pass
|
|
|
|
|
|
if first_chunk:
|
|
|
return itertools.chain(first_chunk(), gen)
|
|
|
return gen
|
|
|
|
|
|
def _runwsgi(self, req, res, repo):
|
|
|
|
|
|
cmd = req.qsparams.get(b'cmd', '')
|
|
|
if not mercurial.wireprotoserver.iscmd(cmd):
|
|
|
# NOTE(marcink): for unsupported commands, we return bad request
|
|
|
# internally from HG
|
|
|
log.warning('cmd: `%s` is not supported by the mercurial wireprotocol v1', cmd)
|
|
|
from mercurial.hgweb.common import statusmessage
|
|
|
res.status = statusmessage(mercurial.hgweb.common.HTTP_BAD_REQUEST)
|
|
|
res.setbodybytes(b'')
|
|
|
return res.sendresponse()
|
|
|
|
|
|
return super()._runwsgi(req, res, repo)
|
|
|
|
|
|
|
|
|
def sanitize_hg_ui(baseui):
|
|
|
# NOTE(marcink): since python3 hgsubversion is deprecated.
|
|
|
# From old installations we might still have this set enabled
|
|
|
# we explicitly remove this now here to make sure it wont propagate further
|
|
|
|
|
|
if baseui.config(b'extensions', b'hgsubversion') is not None:
|
|
|
for cfg in (baseui._ocfg, baseui._tcfg, baseui._ucfg):
|
|
|
if b'extensions' in cfg:
|
|
|
if b'hgsubversion' in cfg[b'extensions']:
|
|
|
del cfg[b'extensions'][b'hgsubversion']
|
|
|
|
|
|
|
|
|
def make_hg_ui_from_config(repo_config):
|
|
|
baseui = mercurial.ui.ui()
|
|
|
|
|
|
# clean the baseui object
|
|
|
baseui._ocfg = mercurial.config.config()
|
|
|
baseui._ucfg = mercurial.config.config()
|
|
|
baseui._tcfg = mercurial.config.config()
|
|
|
|
|
|
for section, option, value in repo_config:
|
|
|
baseui.setconfig(
|
|
|
ascii_bytes(section, allow_bytes=True),
|
|
|
ascii_bytes(option, allow_bytes=True),
|
|
|
ascii_bytes(value, allow_bytes=True))
|
|
|
|
|
|
# make our hgweb quiet so it doesn't print output
|
|
|
baseui.setconfig(b'ui', b'quiet', b'true')
|
|
|
|
|
|
return baseui
|
|
|
|
|
|
|
|
|
def update_hg_ui_from_hgrc(baseui, repo_path):
|
|
|
path = os.path.join(repo_path, '.hg', 'hgrc')
|
|
|
|
|
|
if not os.path.isfile(path):
|
|
|
log.debug('hgrc file is not present at %s, skipping...', path)
|
|
|
return
|
|
|
log.debug('reading hgrc from %s', path)
|
|
|
cfg = mercurial.config.config()
|
|
|
cfg.read(ascii_bytes(path))
|
|
|
for section in HG_UI_SECTIONS:
|
|
|
for k, v in cfg.items(section):
|
|
|
log.debug('settings ui from file: [%s] %s=%s', section, k, v)
|
|
|
baseui.setconfig(
|
|
|
ascii_bytes(section, allow_bytes=True),
|
|
|
ascii_bytes(k, allow_bytes=True),
|
|
|
ascii_bytes(v, allow_bytes=True))
|
|
|
|
|
|
|
|
|
def create_hg_wsgi_app(repo_path, repo_name, config):
|
|
|
"""
|
|
|
Prepares a WSGI application to handle Mercurial requests.
|
|
|
|
|
|
:param config: is a list of 3-item tuples representing a ConfigObject
|
|
|
(it is the serialized version of the config object).
|
|
|
"""
|
|
|
log.debug("Creating Mercurial WSGI application")
|
|
|
|
|
|
baseui = make_hg_ui_from_config(config)
|
|
|
update_hg_ui_from_hgrc(baseui, repo_path)
|
|
|
sanitize_hg_ui(baseui)
|
|
|
|
|
|
try:
|
|
|
return HgWeb(safe_bytes(repo_path), name=safe_bytes(repo_name), baseui=baseui)
|
|
|
except mercurial.error.RequirementError as e:
|
|
|
raise exceptions.RequirementException(e)(e)
|
|
|
|
|
|
|
|
|
class GitHandler:
|
|
|
"""
|
|
|
Handler for Git operations like push/pull etc
|
|
|
"""
|
|
|
def __init__(self, repo_location, repo_name, git_path, update_server_info,
|
|
|
extras):
|
|
|
if not os.path.isdir(repo_location):
|
|
|
raise OSError(repo_location)
|
|
|
self.content_path = repo_location
|
|
|
self.repo_name = repo_name
|
|
|
self.repo_location = repo_location
|
|
|
self.extras = extras
|
|
|
self.git_path = git_path
|
|
|
self.update_server_info = update_server_info
|
|
|
|
|
|
def __call__(self, environ, start_response):
|
|
|
app = webob.exc.HTTPNotFound()
|
|
|
candidate_paths = (
|
|
|
self.content_path, os.path.join(self.content_path, '.git'))
|
|
|
|
|
|
for content_path in candidate_paths:
|
|
|
try:
|
|
|
app = pygrack.GitRepository(
|
|
|
self.repo_name, content_path, self.git_path,
|
|
|
self.update_server_info, self.extras)
|
|
|
break
|
|
|
except OSError:
|
|
|
continue
|
|
|
|
|
|
return app(environ, start_response)
|
|
|
|
|
|
|
|
|
def create_git_wsgi_app(repo_path, repo_name, config):
|
|
|
"""
|
|
|
Creates a WSGI application to handle Git requests.
|
|
|
|
|
|
:param config: is a dictionary holding the extras.
|
|
|
"""
|
|
|
git_path = settings.GIT_EXECUTABLE
|
|
|
update_server_info = config.pop('git_update_server_info')
|
|
|
app = GitHandler(
|
|
|
repo_path, repo_name, git_path, update_server_info, config)
|
|
|
|
|
|
return app
|
|
|
|
|
|
|
|
|
class GitLFSHandler:
|
|
|
"""
|
|
|
Handler for Git LFS operations
|
|
|
"""
|
|
|
|
|
|
def __init__(self, repo_location, repo_name, git_path, update_server_info,
|
|
|
extras):
|
|
|
if not os.path.isdir(repo_location):
|
|
|
raise OSError(repo_location)
|
|
|
self.content_path = repo_location
|
|
|
self.repo_name = repo_name
|
|
|
self.repo_location = repo_location
|
|
|
self.extras = extras
|
|
|
self.git_path = git_path
|
|
|
self.update_server_info = update_server_info
|
|
|
|
|
|
def get_app(self, git_lfs_enabled, git_lfs_store_path, git_lfs_http_scheme):
|
|
|
app = git_lfs.create_app(git_lfs_enabled, git_lfs_store_path, git_lfs_http_scheme)
|
|
|
return app
|
|
|
|
|
|
|
|
|
def create_git_lfs_wsgi_app(repo_path, repo_name, config):
|
|
|
git_path = settings.GIT_EXECUTABLE
|
|
|
update_server_info = config.pop('git_update_server_info')
|
|
|
git_lfs_enabled = config.pop('git_lfs_enabled')
|
|
|
git_lfs_store_path = config.pop('git_lfs_store_path')
|
|
|
git_lfs_http_scheme = config.pop('git_lfs_http_scheme', 'http')
|
|
|
app = GitLFSHandler(
|
|
|
repo_path, repo_name, git_path, update_server_info, config)
|
|
|
|
|
|
return app.get_app(git_lfs_enabled, git_lfs_store_path, git_lfs_http_scheme)
|
|
|
|