# Copyright (C) 2011-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see http://www.gnu.org/licenses/.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import logging
import datetime
from collections import defaultdict

from sqlalchemy import Table

from rhodecode.api import jsonrpc_method, SERVICE_API_IDENTIFIER

log = logging.getLogger(__name__)


@jsonrpc_method()
def service_get_data_for_ssh_wrapper(request, apiuser, user_id, repo_name, key_id=None):
    """
    Return user, permission and storage data for a user/repository pair.

    When ``key_id`` is truthy, the ``accessed_on`` column of the
    ``user_ssh_keys`` row whose ``ssh_key_id`` equals ``key_id`` is first set
    to the current naive UTC time; a debug message is logged if the update
    reports a truthy row count.

    :param request: not used by this function.
    :param apiuser: not used by this function.
    :param user_id: identifier passed to ``User.get``.
    :param repo_name: repository name passed to
        ``AuthUser.get_branch_permissions``.
    :param key_id: optional ssh key id whose access time should be refreshed.
    :returns: ``None`` when ``User.get(user_id)`` yields nothing; otherwise a
        dict with the keys ``user_id``, ``username``, ``repo_permissions``,
        ``branch_permissions`` and ``repos_path``.
    """
    from rhodecode.model.db import User
    from rhodecode.model.scm import ScmModel
    from rhodecode.model.meta import raw_query_executor, Base

    if key_id:
        ssh_keys = Table('user_ssh_keys', Base.metadata, autoload=False)
        accessed_at = datetime.datetime.utcnow()
        update_stmt = (
            ssh_keys.update()
            .where(ssh_keys.c.ssh_key_id == key_id)
            .values(accessed_on=accessed_at)
        )

        # the row count is read while the session is still open
        with raw_query_executor() as session:
            updated_rows = session.execute(update_stmt).rowcount

        if updated_rows:
            log.debug(f'Update key id:{key_id} access time')

    user = User.get(user_id)
    if not user:
        return None

    auth_user = user.AuthUser()
    return {
        'user_id': user.user_id,
        'username': user.username,
        'repo_permissions': auth_user.permissions['repositories'],
        "branch_permissions": auth_user.get_branch_permissions(repo_name),
        "repos_path": ScmModel().repos_path
    }


@jsonrpc_method()
def service_get_repo_name_by_id(request, apiuser, repo_id):
    """
    Look up a repository via ``RepoModel.get_repo_by_id`` and return its name.

    :param request: not used by this function.
    :param apiuser: not used by this function.
    :param repo_id: value passed to ``RepoModel().get_repo_by_id``.
    :returns: ``{'repo_name': <name>}`` when the lookup result is truthy,
        otherwise ``None``.
    """
    from rhodecode.model.repo import RepoModel

    repo = RepoModel().get_repo_by_id(repo_id)
    if not repo:
        return None
    return {'repo_name': repo.repo_name}


@jsonrpc_method()
def service_mark_for_invalidation(request, apiuser, repo_name):
    """
    Call ``ScmModel.mark_for_invalidation`` for the given repository.

    :param request: not used by this function.
    :param apiuser: not used by this function.
    :param repo_name: repository name passed to ``mark_for_invalidation``.
    :returns: the constant dict ``{'msg': "Applied"}``.
    """
    from rhodecode.model.scm import ScmModel

    scm = ScmModel()
    scm.mark_for_invalidation(repo_name)
    return {'msg': "Applied"}


@jsonrpc_method()
def service_config_to_hgrc(request, apiuser, cli_flags, repo_name):
    """
    Render the ui settings of a repository as a flat list of config lines.

    The ``hooks`` section always comes first and always starts with the five
    ssh hook entries defined below. Active ui entries are added only when
    their section is contained in ``cli_flags``; ``hooks`` entries whose key
    is listed in ``RhodeCodeUi.HOOKS_BUILTIN`` are left out.

    :param request: not used by this function.
    :param apiuser: not used by this function.
    :param cli_flags: container of section names; membership is tested
        with ``in``.
    :param repo_name: repository name handed to ``VcsSettingsModel``.
    :returns: ``{'flags': [...]}`` where every section contributes the
        strings ``' '`` and ``'[<section>]'`` followed by one
        ``'<key>= <value>'`` string per entry.
    """
    from rhodecode.model.db import RhodeCodeUi
    from rhodecode.model.settings import VcsSettingsModel

    sections = defaultdict(list)
    ui_entries = VcsSettingsModel(repo=repo_name).get_ui_settings(section=None, key=None)

    # ssh hooks are emitted unconditionally, ahead of any stored setting
    sections['hooks'].extend([
        ('pretxnchangegroup.ssh_auth', 'python:vcsserver.hooks.pre_push_ssh_auth'),
        ('pretxnchangegroup.ssh', 'python:vcsserver.hooks.pre_push_ssh'),
        ('changegroup.ssh', 'python:vcsserver.hooks.post_push_ssh'),
        ('preoutgoing.ssh', 'python:vcsserver.hooks.pre_pull_ssh'),
        ('outgoing.ssh', 'python:vcsserver.hooks.post_pull_ssh'),
    ])

    for entry in ui_entries:
        if not entry.active:
            continue

        section, option = entry.section, entry.key
        if section not in cli_flags:
            continue

        # only custom hooks are wanted, so builtin ones are skipped
        if section == 'hooks' and option in RhodeCodeUi.HOOKS_BUILTIN:
            continue

        sections[section].append([option, entry.value])

    flags = []
    for section, options in sections.items():
        flags += [' ', f'[{section}]']
        flags += [f'{key}= {val}' for key, val in options]

    return {'flags': flags}