hg.py
146 lines
| 5.0 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2016-2023 RhodeCode GmbH | |||
r2187 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import os | ||||
import sys | ||||
import logging | ||||
import tempfile | ||||
import textwrap | ||||
r3626 | import collections | |||
r2187 | from .base import VcsServer | |||
r3637 | from rhodecode.model.db import RhodeCodeUi | |||
r3626 | from rhodecode.model.settings import VcsSettingsModel | |||
r2187 | ||||
log = logging.getLogger(__name__) | ||||
class MercurialTunnelWrapper(object):
    """Serve one Mercurial repository over stdio using a temporary hgrc.

    The wrapper asks its server for the hgrc lines to use, writes them to a
    temporary file created in ``__init__``, runs ``hg serve --stdio`` with
    ``HGRCPATH`` pointing at that file, and removes the file afterwards.
    """
    # Not assigned anywhere in this class.
    process = None

    def __init__(self, server):
        """Remember *server* and reserve the temporary hgrc file.

        :param server: object providing ``repo_name``, ``hg_path``,
            ``config_to_hgrc()``, ``get_root_store()`` and
            ``update_environment()``.
        """
        self.server = server
        self.stdin = sys.stdin
        self.stdout = sys.stdout
        # mkstemp returns an already-open descriptor plus the path; the
        # descriptor is consumed (and closed) by create_hooks_env().
        self.hooks_env_fd, self.hooks_env_path = tempfile.mkstemp(prefix='hgrc_rhodecode_')

    def create_hooks_env(self):
        """Write the per-session hgrc and return its path.

        Also truncates ``<root>/<repo>/.hg/hgrc_rhodecode`` when that file
        exists. Because this closes the descriptor opened in ``__init__``,
        it can only be called once per instance.

        :return: path of the temporary hgrc file.
        """
        repo_name = self.server.repo_name
        hg_flags = self.server.config_to_hgrc(repo_name)

        content = textwrap.dedent(
            '''
            # RhodeCode SSH hooks version=2.0.0
            {custom}
            '''
        ).format(custom='\n'.join(hg_flags))

        root = self.server.get_root_store()
        hgrc_custom = os.path.join(root, repo_name, '.hg', 'hgrc_rhodecode')

        # cleanup custom hgrc file
        if os.path.isfile(hgrc_custom):
            # The file is opened in binary mode, so the payload must be
            # bytes; writing a str here raises TypeError on Python 3.
            with open(hgrc_custom, 'wb') as f:
                f.write(b'')
            log.debug('Cleanup custom hgrc file under %s', hgrc_custom)

        # write temp
        with os.fdopen(self.hooks_env_fd, 'w') as hooks_env_file:
            hooks_env_file.write(content)

        return self.hooks_env_path

    def remove_configs(self):
        """Delete the temporary hgrc file created in ``__init__``."""
        os.remove(self.hooks_env_path)

    def command(self, hgrc_path):
        """Build the shell command line that serves the repository.

        :param hgrc_path: path of the hgrc file exported as ``HGRCPATH``.
        :return: command string suitable for ``os.system``.
        """
        import shlex

        root = self.server.get_root_store()
        # The string is interpreted by a shell, so every path taken from
        # runtime data is escaped. shlex.quote() returns plain paths
        # unchanged, so the command is identical for ordinary names.
        # hg_path is left as configured so that a value relying on shell
        # expansion (e.g. a leading ``~``) keeps working.
        command = (
            "cd {root}; HGRCPATH={hgrc} {hg_path} -R {repo_path} "
            "serve --stdio".format(
                root=shlex.quote(root),
                hgrc=shlex.quote(hgrc_path),
                hg_path=self.server.hg_path,
                repo_path=shlex.quote(root + self.server.repo_name)))
        log.debug("Final CMD: %s", command)
        return command

    def run(self, extras):
        """Serve the repository and return the ``os.system`` status.

        :param extras: passed through to ``server.update_environment``.
        :return: value returned by ``os.system`` for the hg command.
        """
        # at this point we cannot tell, we do further ACL checks
        # inside the hooks
        action = '?'
        # permissions are checked via `pre_push_ssh_auth` hook
        self.server.update_environment(action=action, extras=extras)
        custom_hgrc_file = self.create_hooks_env()

        try:
            return os.system(self.command(custom_hgrc_file))
        finally:
            # always drop the temporary hgrc, even if the command failed
            self.remove_configs()
r2187 | ||||
class MercurialServer(VcsServer):
    """SSH server implementation for the ``hg`` backend.

    Holds the repository location and hg executable for one request and
    renders the hgrc lines that the tunnel wrapper writes to disk.
    """
    backend = 'hg'
    repo_user_agent = 'mercurial'
    # ui sections that are copied from the stored settings into the hgrc
    cli_flags = ['phases', 'largefiles', 'extensions', 'experimental', 'hooks']

    def __init__(self, store, ini_path, repo_name, user, user_permissions, config, env):
        """Store request details and create the tunnel wrapper.

        The hg executable is read from ``ssh.executable.hg`` in the
        ``app:main`` section of *config*.
        """
        super().__init__(user, user_permissions, config, env)

        self.store = store
        self.ini_path = ini_path
        self.repo_name = repo_name

        hg_executable = config.get('app:main', 'ssh.executable.hg')
        self._path = hg_executable
        self.hg_path = hg_executable

        self.tunnel = MercurialTunnelWrapper(server=self)

    def config_to_hgrc(self, repo_name):
        """Return the hgrc content for *repo_name* as a list of lines.

        The ``[hooks]`` section always starts with the default SSH hooks.
        Active ui settings from the sections in ``cli_flags`` follow,
        except hooks whose key is in ``RhodeCodeUi.HOOKS_BUILTIN``. Each
        section is rendered as a ``' '`` separator line, a ``[section]``
        header and one ``key= value`` line per entry.
        """
        stored_settings = VcsSettingsModel(repo=repo_name).get_ui_settings(
            section=None, key=None)

        sections = collections.defaultdict(list)

        # write default hooks
        sections['hooks'].extend([
            ('pretxnchangegroup.ssh_auth', 'python:vcsserver.hooks.pre_push_ssh_auth'),
            ('pretxnchangegroup.ssh', 'python:vcsserver.hooks.pre_push_ssh'),
            ('changegroup.ssh', 'python:vcsserver.hooks.post_push_ssh'),
            ('preoutgoing.ssh', 'python:vcsserver.hooks.pre_pull_ssh'),
            ('outgoing.ssh', 'python:vcsserver.hooks.post_pull_ssh'),
        ])

        for setting in stored_settings:
            if not setting.active:
                continue

            section_name = setting.section
            option = setting.key

            if section_name not in self.cli_flags:
                continue
            # we want only custom hooks, so we skip builtins
            if section_name == 'hooks' and option in RhodeCodeUi.HOOKS_BUILTIN:
                continue

            sections[section_name].append([option, setting.value])

        lines = []
        for section_name, options in sections.items():
            lines.append(' ')
            lines.append(f'[{section_name}]')
            lines.extend(f'{option}= {value}' for option, value in options)

        return lines