# Copyright (C) 2016-2023 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import os import sys import logging from rhodecode.lib.hook_daemon.base import prepare_callback_daemon from rhodecode.lib.ext_json import sjson as json from rhodecode.lib.vcs.conf import settings as vcs_settings from rhodecode.lib.api_utils import call_service_api log = logging.getLogger(__name__) class SshVcsServer(object): repo_user_agent = None # set in child classes _path = None # set executable path for hg/git/svn binary backend = None # set in child classes tunnel = None # subprocess handling tunnel settings = None # parsed settings module write_perms = ['repository.admin', 'repository.write'] read_perms = ['repository.read', 'repository.admin', 'repository.write'] def __init__(self, user, user_permissions, settings, env): self.user = user self.user_permissions = user_permissions self.settings = settings self.env = env self.stdin = sys.stdin self.repo_name = None self.repo_mode = None self.store = '' self.ini_path = '' self.hooks_protocol = None def _invalidate_cache(self, repo_name): """ Set's cache for this repository for invalidation on next access :param repo_name: full repo name, also a cache key """ # Todo: Leave only "celery" case after transition. match self.hooks_protocol: case 'http': from rhodecode.model.scm import ScmModel ScmModel().mark_for_invalidation(repo_name) case 'celery': call_service_api(self.settings, { "method": "service_mark_for_invalidation", "args": {"repo_name": repo_name} }) def has_write_perm(self): permission = self.user_permissions.get(self.repo_name) if permission in ['repository.write', 'repository.admin']: return True return False def _check_permissions(self, action): permission = self.user_permissions.get(self.repo_name) user_info = f'{self.user["user_id"]}:{self.user["username"]}' log.debug('permission for %s on %s are: %s', user_info, self.repo_name, permission) if not permission: log.error('user `%s` permissions to repo:%s are empty. Forbidding access.', user_info, self.repo_name) return -2 if action == 'pull': if permission in self.read_perms: log.info( 'READ Permissions for User "%s" detected to repo "%s"!', user_info, self.repo_name) return 0 else: if permission in self.write_perms: log.info( 'WRITE, or Higher Permissions for User "%s" detected to repo "%s"!', user_info, self.repo_name) return 0 log.error('Cannot properly fetch or verify user `%s` permissions. ' 'Permissions: %s, vcs action: %s', user_info, permission, action) return -2 def update_environment(self, action, extras=None): scm_data = { 'ip': os.environ['SSH_CLIENT'].split()[0], 'username': self.user.username, 'user_id': self.user.user_id, 'action': action, 'repository': self.repo_name, 'scm': self.backend, 'config': self.ini_path, 'repo_store': self.store, 'make_lock': None, 'locked_by': [None, None], 'server_url': None, 'user_agent': f'{self.repo_user_agent}/ssh-user-agent', 'hooks': ['push', 'pull'], 'hooks_module': 'rhodecode.lib.hook_daemon.hook_module', 'is_shadow_repo': False, 'detect_force_push': False, 'check_branch_perms': False, 'SSH': True, 'SSH_PERMISSIONS': self.user_permissions.get(self.repo_name), } if extras: scm_data.update(extras) os.putenv("RC_SCM_DATA", json.dumps(scm_data)) return scm_data def get_root_store(self): root_store = self.store if not root_store.endswith('/'): # always append trailing slash root_store = root_store + '/' return root_store def _handle_tunnel(self, extras): # pre-auth action = 'pull' exit_code = self._check_permissions(action) if exit_code: return exit_code, False req = self.env.get('request') if req: server_url = req.host_url + req.script_name extras['server_url'] = server_url log.debug('Using %s binaries from path %s', self.backend, self._path) exit_code = self.tunnel.run(extras) return exit_code, action == "push" def run(self, tunnel_extras=None): self.hooks_protocol = self.settings['vcs.hooks.protocol'] tunnel_extras = tunnel_extras or {} extras = {} extras.update(tunnel_extras) callback_daemon, extras = prepare_callback_daemon( extras, protocol=self.hooks_protocol, host=vcs_settings.HOOKS_HOST) with callback_daemon: try: return self._handle_tunnel(extras) finally: log.debug('Running cleanup with cache invalidation') if self.repo_name: self._invalidate_cache(self.repo_name)