|
|
# Copyright (C) 2011-2024 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
import logging
|
|
|
import datetime
|
|
|
from collections import defaultdict
|
|
|
|
|
|
from sqlalchemy import Table
|
|
|
from rhodecode.api import jsonrpc_method, SERVICE_API_IDENTIFIER
|
|
|
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
def service_get_data_for_ssh_wrapper(request, apiuser, user_id, repo_name, key_id=None):
    """
    Collect the user data needed to serve a repository operation over SSH.

    When `key_id` is given, the `accessed_on` column of the matching row in
    the `user_ssh_keys` table is first set to the current naive UTC time.

    :param request: request object handed in by the JSON-RPC layer (not used here)
    :param apiuser: calling API user (not used here)
    :param user_id: id used to look the user up via `User.get`
    :param repo_name: repository name the branch permissions are fetched for
    :param key_id: optional SSH key id whose access time should be refreshed
    :return: `None` when no user matches `user_id`, otherwise a dict with the
        keys `user_id`, `username`, `repo_permissions`, `branch_permissions`
        and `repos_path`
    """
    from rhodecode.model.db import User
    from rhodecode.model.scm import ScmModel
    from rhodecode.model.meta import raw_query_executor, Base

    if key_id:
        ssh_keys = Table('user_ssh_keys', Base.metadata, autoload=False)
        now = datetime.datetime.utcnow()
        touch_key = ssh_keys.update().where(
            ssh_keys.c.ssh_key_id == key_id
        ).values(accessed_on=now)

        # stays falsy unless the UPDATE reports affected rows
        rows_touched = None
        with raw_query_executor() as session:
            outcome = session.execute(touch_key)
            if outcome.rowcount:
                rows_touched = outcome.rowcount

        if rows_touched:
            log.debug(f'Update key id:{key_id} access time')

    db_user = User.get(user_id)
    if not db_user:
        return None

    auth_user = db_user.AuthUser()
    payload = {}
    payload['user_id'] = db_user.user_id
    payload['username'] = db_user.username
    payload['repo_permissions'] = auth_user.permissions['repositories']
    payload["branch_permissions"] = auth_user.get_branch_permissions(repo_name)
    payload["repos_path"] = ScmModel().repos_path
    return payload
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
def service_get_repo_name_by_id(request, apiuser, repo_id):
    """
    Resolve a repository id to its repository name.

    :param request: request object handed in by the JSON-RPC layer (not used here)
    :param apiuser: calling API user (not used here)
    :param repo_id: value passed to `RepoModel().get_repo_by_id`
    :return: `{'repo_name': <name>}` when the lookup yields a repository,
        otherwise `None`
    """
    from rhodecode.model.repo import RepoModel

    repo = RepoModel().get_repo_by_id(repo_id)
    if not repo:
        return None
    return {'repo_name': repo.repo_name}
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
def service_mark_for_invalidation(request, apiuser, repo_name):
    """
    Ask `ScmModel` to mark the given repository for invalidation.

    :param request: request object handed in by the JSON-RPC layer (not used here)
    :param apiuser: calling API user (not used here)
    :param repo_name: repository name forwarded to `ScmModel.mark_for_invalidation`
    :return: the fixed confirmation dict `{'msg': 'Applied'}`
    """
    from rhodecode.model.scm import ScmModel

    scm = ScmModel()
    scm.mark_for_invalidation(repo_name)

    confirmation = {'msg': "Applied"}
    return confirmation
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
def service_config_to_hgrc(request, apiuser, cli_flags, repo_name):
    """
    Render the repository's ui settings as a list of hgrc-style lines.

    The `hooks` section always starts with the fixed SSH push/pull hooks.
    Active ui entries are then added for every section named in `cli_flags`,
    except hook entries whose key is listed in `RhodeCodeUi.HOOKS_BUILTIN`.

    :param request: request object handed in by the JSON-RPC layer (not used here)
    :param apiuser: calling API user (not used here)
    :param cli_flags: container of section names whose entries should be included
    :param repo_name: repository the ui settings are read for
    :return: `{'flags': [...]}` where each section contributes a `' '`
        separator, a `[section]` header and one `key= value` line per entry
    """
    from rhodecode.model.db import RhodeCodeUi
    from rhodecode.model.settings import VcsSettingsModel

    sections = defaultdict(list)
    ui_entries = VcsSettingsModel(repo=repo_name).get_ui_settings(section=None, key=None)

    # SSH hooks that are always present, independent of the stored settings
    sections['hooks'].extend((
        ('pretxnchangegroup.ssh_auth', 'python:vcsserver.hooks.pre_push_ssh_auth'),
        ('pretxnchangegroup.ssh', 'python:vcsserver.hooks.pre_push_ssh'),
        ('changegroup.ssh', 'python:vcsserver.hooks.post_push_ssh'),

        ('preoutgoing.ssh', 'python:vcsserver.hooks.pre_pull_ssh'),
        ('outgoing.ssh', 'python:vcsserver.hooks.post_pull_ssh'),
    ))

    for entry in ui_entries:
        if not entry.active:
            continue

        section, name = entry.section, entry.key
        if section not in cli_flags:
            continue

        # only custom hooks are wanted here, builtin ones are left out
        if section == 'hooks' and name in RhodeCodeUi.HOOKS_BUILTIN:
            continue

        sections[section].append([name, entry.value])

    lines = []
    for section, pairs in sections.items():
        lines += [' ', f'[{section}]']
        lines.extend(f'{name}= {value}' for name, value in pairs)

    return {'flags': lines}
|
|
|
|