hg.py
157 lines
| 5.5 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2016-2023 RhodeCode GmbH | |||
r2187 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import os | ||||
import sys | ||||
import logging | ||||
import tempfile | ||||
import textwrap | ||||
r3626 | import collections | |||
r5325 | ||||
r5326 | from .base import SshVcsServer | |||
r5325 | ||||
from rhodecode.lib.api_utils import call_service_api | ||||
r2187 | ||||
log = logging.getLogger(__name__) | ||||
class MercurialTunnelWrapper(object): | ||||
process = None | ||||
def __init__(self, server): | ||||
self.server = server | ||||
self.stdin = sys.stdin | ||||
self.stdout = sys.stdout | ||||
r3626 | self.hooks_env_fd, self.hooks_env_path = tempfile.mkstemp(prefix='hgrc_rhodecode_') | |||
r2187 | ||||
def create_hooks_env(self): | ||||
r3626 | repo_name = self.server.repo_name | |||
r3660 | hg_flags = self.server.config_to_hgrc(repo_name) | |||
r2187 | ||||
content = textwrap.dedent( | ||||
''' | ||||
r3637 | # RhodeCode SSH hooks version=2.0.0 | |||
r3626 | {custom} | |||
r2187 | ''' | |||
r3626 | ).format(custom='\n'.join(hg_flags)) | |||
root = self.server.get_root_store() | ||||
hgrc_custom = os.path.join(root, repo_name, '.hg', 'hgrc_rhodecode') | ||||
hgrc_main = os.path.join(root, repo_name, '.hg', 'hgrc') | ||||
r2187 | ||||
r3626 | # cleanup custom hgrc file | |||
if os.path.isfile(hgrc_custom): | ||||
with open(hgrc_custom, 'wb') as f: | ||||
r5325 | f.write(b'') | |||
r3626 | log.debug('Cleanup custom hgrc file under %s', hgrc_custom) | |||
# write temp | ||||
r2187 | with os.fdopen(self.hooks_env_fd, 'w') as hooks_env_file: | |||
hooks_env_file.write(content) | ||||
r3626 | return self.hooks_env_path | |||
r2187 | ||||
r3626 | def remove_configs(self): | |||
os.remove(self.hooks_env_path) | ||||
r2187 | ||||
r3626 | def command(self, hgrc_path): | |||
r2187 | root = self.server.get_root_store() | |||
command = ( | ||||
r3626 | "cd {root}; HGRCPATH={hgrc} {hg_path} -R {root}{repo_name} " | |||
r2187 | "serve --stdio".format( | |||
root=root, hg_path=self.server.hg_path, | ||||
r3626 | repo_name=self.server.repo_name, hgrc=hgrc_path)) | |||
r2187 | log.debug("Final CMD: %s", command) | |||
return command | ||||
def run(self, extras): | ||||
# at this point we cannot tell, we do further ACL checks | ||||
# inside the hooks | ||||
action = '?' | ||||
# permissions are check via `pre_push_ssh_auth` hook | ||||
self.server.update_environment(action=action, extras=extras) | ||||
r3626 | custom_hgrc_file = self.create_hooks_env() | |||
try: | ||||
return os.system(self.command(custom_hgrc_file)) | ||||
finally: | ||||
self.remove_configs() | ||||
r2187 | ||||
r5326 | class MercurialServer(SshVcsServer): | |||
r2187 | backend = 'hg' | |||
r4858 | repo_user_agent = 'mercurial' | |||
r3637 | cli_flags = ['phases', 'largefiles', 'extensions', 'experimental', 'hooks'] | |||
r2187 | ||||
r5325 | def __init__(self, store, ini_path, repo_name, user, user_permissions, settings, env): | |||
super().__init__(user, user_permissions, settings, env) | ||||
r2187 | ||||
self.store = store | ||||
self.ini_path = ini_path | ||||
self.repo_name = repo_name | ||||
r5325 | self._path = self.hg_path = settings['ssh.executable.hg'] | |||
r3626 | self.tunnel = MercurialTunnelWrapper(server=self) | |||
def config_to_hgrc(self, repo_name): | ||||
r5314 | # Todo: once transition is done only call to service api should exist | |||
if self.hooks_protocol == 'celery': | ||||
r5326 | data = call_service_api(self.settings, { | |||
r5314 | "method": "service_config_to_hgrc", | |||
"args": {"cli_flags": self.cli_flags, "repo_name": repo_name} | ||||
}) | ||||
return data['flags'] | ||||
r5325 | else: | |||
from rhodecode.model.db import RhodeCodeUi | ||||
from rhodecode.model.settings import VcsSettingsModel | ||||
ui_sections = collections.defaultdict(list) | ||||
ui = VcsSettingsModel(repo=repo_name).get_ui_settings(section=None, key=None) | ||||
r2187 | ||||
r5325 | # write default hooks | |||
default_hooks = [ | ||||
('pretxnchangegroup.ssh_auth', 'python:vcsserver.hooks.pre_push_ssh_auth'), | ||||
('pretxnchangegroup.ssh', 'python:vcsserver.hooks.pre_push_ssh'), | ||||
('changegroup.ssh', 'python:vcsserver.hooks.post_push_ssh'), | ||||
r3637 | ||||
r5325 | ('preoutgoing.ssh', 'python:vcsserver.hooks.pre_pull_ssh'), | |||
('outgoing.ssh', 'python:vcsserver.hooks.post_pull_ssh'), | ||||
] | ||||
r3637 | ||||
r5325 | for k, v in default_hooks: | |||
ui_sections['hooks'].append((k, v)) | ||||
r3637 | ||||
r5325 | for entry in ui: | |||
if not entry.active: | ||||
r3637 | continue | |||
r5325 | sec = entry.section | |||
key = entry.key | ||||
r3637 | ||||
r5325 | if sec in self.cli_flags: | |||
# we want only custom hooks, so we skip builtins | ||||
if sec == 'hooks' and key in RhodeCodeUi.HOOKS_BUILTIN: | ||||
continue | ||||
r3626 | ||||
r5325 | ui_sections[sec].append([key, entry.value]) | |||
flags = [] | ||||
for _sec, key_val in ui_sections.items(): | ||||
flags.append(' ') | ||||
flags.append(f'[{_sec}]') | ||||
for key, val in key_val: | ||||
flags.append(f'{key}= {val}') | ||||
return flags | ||||