##// END OF EJS Templates
deps: bumped rhodecode-tools==4.0.0
deps: bumped rhodecode-tools==4.0.0

File last commit:

r5314:585ee450 default
r5592:2f9a9e11 default
Show More
service_api.py
125 lines | 4.1 KiB | text/x-python | PythonLexer
fix(ssh): Added alternative SshWrapper and changes needed to support it + service api. Fixes: RCCE-6
# Copyright (C) 2011-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import logging
import datetime
from collections import defaultdict
from sqlalchemy import Table
from rhodecode.api import jsonrpc_method, SERVICE_API_IDENTIFIER
log = logging.getLogger(__name__)
@jsonrpc_method()
def service_get_data_for_ssh_wrapper(request, apiuser, user_id, repo_name, key_id=None):
    """
    Return user and permission data for the given user and repository.

    When ``key_id`` is truthy, the ``accessed_on`` column of the matching
    ``user_ssh_keys`` row is first set to the current UTC time.

    :param request: request object passed in by the JSON-RPC layer (not used here).
    :param apiuser: calling API user passed in by the JSON-RPC layer (not used here).
    :param user_id: id handed to ``User.get`` to look up the user.
    :param repo_name: repository name used to resolve branch permissions.
    :param key_id: optional ``ssh_key_id`` whose access time should be updated.
    :returns: ``None`` when ``User.get`` finds no user, otherwise a dict with
        the keys ``user_id``, ``username``, ``repo_permissions``,
        ``branch_permissions`` and ``repos_path``.
    """
    from rhodecode.model.db import User
    from rhodecode.model.scm import ScmModel
    from rhodecode.model.meta import raw_query_executor, Base

    if key_id:
        table = Table('user_ssh_keys', Base.metadata, autoload=False)
        # datetime.utcnow() is deprecated since Python 3.12; derive the same
        # naive UTC timestamp from a timezone-aware "now" instead.
        atime = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        stmt = (
            table.update()
            .where(table.c.ssh_key_id == key_id)
            .values(accessed_on=atime)
        )

        with raw_query_executor() as session:
            updated_rows = session.execute(stmt).rowcount

        # only log when the UPDATE actually matched a key row
        if updated_rows:
            log.debug('Update key id:%s access time', key_id)

    db_user = User.get(user_id)
    if not db_user:
        return None
    auth_user = db_user.AuthUser()

    return {
        'user_id': db_user.user_id,
        'username': db_user.username,
        'repo_permissions': auth_user.permissions['repositories'],
        "branch_permissions": auth_user.get_branch_permissions(repo_name),
        "repos_path": ScmModel().repos_path
    }
@jsonrpc_method()
def service_get_repo_name_by_id(request, apiuser, repo_id):
    """
    Look up a repository through ``RepoModel().get_repo_by_id`` and return its name.

    :param request: request object passed in by the JSON-RPC layer (not used here).
    :param apiuser: calling API user passed in by the JSON-RPC layer (not used here).
    :param repo_id: identifier handed to ``get_repo_by_id``.
    :returns: ``{'repo_name': <name>}`` when a match is found, else ``None``.
    """
    from rhodecode.model.repo import RepoModel

    repo = RepoModel().get_repo_by_id(repo_id)
    if not repo:
        return None
    return {'repo_name': repo.repo_name}
@jsonrpc_method()
def service_mark_for_invalidation(request, apiuser, repo_name):
    """
    Call ``ScmModel().mark_for_invalidation`` for the given repository name.

    :param request: request object passed in by the JSON-RPC layer (not used here).
    :param apiuser: calling API user passed in by the JSON-RPC layer (not used here).
    :param repo_name: repository name forwarded to ``mark_for_invalidation``.
    :returns: the fixed acknowledgement ``{'msg': 'Applied'}``.
    """
    from rhodecode.model.scm import ScmModel

    scm = ScmModel()
    scm.mark_for_invalidation(repo_name)
    return dict(msg="Applied")
@jsonrpc_method()
def service_config_to_hgrc(request, apiuser, cli_flags, repo_name):
    """
    Render the repository's active ui settings as a flat list of config lines.

    The ``hooks`` section always starts with the fixed set of SSH hooks.
    Active settings whose section is listed in ``cli_flags`` are then added,
    except hooks whose key is in ``RhodeCodeUi.HOOKS_BUILTIN``.

    :param request: request object passed in by the JSON-RPC layer (not used here).
    :param apiuser: calling API user passed in by the JSON-RPC layer (not used here).
    :param cli_flags: container of section names that should be included.
    :param repo_name: repository name handed to ``VcsSettingsModel``.
    :returns: ``{'flags': [...]}`` where each section contributes a ``' '``
        separator, a ``[section]`` header and one ``key= value`` line per entry.
    """
    from rhodecode.model.db import RhodeCodeUi
    from rhodecode.model.settings import VcsSettingsModel

    ssh_hooks = [
        ('pretxnchangegroup.ssh_auth', 'python:vcsserver.hooks.pre_push_ssh_auth'),
        ('pretxnchangegroup.ssh', 'python:vcsserver.hooks.pre_push_ssh'),
        ('changegroup.ssh', 'python:vcsserver.hooks.post_push_ssh'),
        ('preoutgoing.ssh', 'python:vcsserver.hooks.pre_pull_ssh'),
        ('outgoing.ssh', 'python:vcsserver.hooks.post_pull_ssh'),
    ]

    settings = VcsSettingsModel(repo=repo_name).get_ui_settings(section=None, key=None)

    # seed the hooks section first so the SSH hooks precede any custom ones
    sections = defaultdict(list)
    sections['hooks'].extend(ssh_hooks)

    for setting in settings:
        if not setting.active:
            continue

        section, name = setting.section, setting.key
        if section not in cli_flags:
            continue

        # we want only custom hooks, so we skip builtins
        if section == 'hooks' and name in RhodeCodeUi.HOOKS_BUILTIN:
            continue

        sections[section].append([name, setting.value])

    lines = []
    for section, pairs in sections.items():
        lines += [' ', f'[{section}]']
        lines += [f'{name}= {value}' for name, value in pairs]

    return {'flags': lines}