##// END OF EJS Templates
feat(configs): deprecated old hooks protocol and ssh wrapper....
feat(configs): deprecated old hooks protocol and ssh wrapper. New defaults are now set on v2 keys, so previous installations are automatically set to new keys. Fallback mode is still available.

File last commit:

r5092:d0d88608 default
r5496:cab50adf default
Show More
utils.py
456 lines | 12.9 KiB | text/x-python | PythonLexer
# Copyright (C) 2014-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
JSON RPC utils
"""
import collections
import logging
from rhodecode.api.exc import JSONRPCError
from rhodecode.lib.auth import (
HasPermissionAnyApi, HasRepoPermissionAnyApi, HasRepoGroupPermissionAnyApi)
from rhodecode.lib.str_utils import safe_str
from rhodecode.lib.vcs.exceptions import RepositoryError
from rhodecode.lib.view_utils import get_commit_from_ref_name
from rhodecode.lib.utils2 import str2bool
log = logging.getLogger(__name__)
class OAttr:
    """
    Marker naming another argument that an ``Optional`` may default to.

    Example::

        def test(apiuser, userid=Optional(OAttr('apiuser'))):
            user = Optional.extract(userid, evaluate_locals=locals())
            # when `userid` is passed in we get it back, otherwise the
            # value of the `apiuser` argument is used instead

    :param attr_name: name of the argument/attribute to fall back to
    """

    def __init__(self, attr_name):
        self.attr_name = attr_name

    def __call__(self):
        # calling the marker hands back the marker itself
        return self

    def __repr__(self):
        return '<OptionalAttr:%s>' % self.attr_name
class Optional(object):
    """
    Wrapper marking an API parameter as optional and carrying its default.

    The following two forms::

        param = param.getval() if isinstance(param, Optional) else param
        param = param() if isinstance(param, Optional) else param

    are equivalent to::

        param = Optional.extract(param)

    :param type_: the default value, or an ``OAttr`` naming another
        argument whose value should be used as the default
    """

    def __init__(self, type_):
        self.type_ = type_

    def __repr__(self):
        # %r goes through repr(), which (unlike calling ``__repr__`` on the
        # value directly) also works when the wrapped default is a class
        return '<Optional:%r>' % (self.type_,)

    def __call__(self):
        return self.getval()

    def getval(self, evaluate_locals=None):
        """
        Return the value carried by this Optional instance.

        :param evaluate_locals: mapping used to resolve an ``OAttr``
            default; when it is empty or not given, the attribute name
            itself is returned
        :return: the wrapped default, the value looked up in
            `evaluate_locals`, or the ``OAttr`` attribute name
        :raises KeyError: if `evaluate_locals` is given but does not contain
            the attribute name
        """
        if isinstance(self.type_, OAttr):
            param_name = self.type_.attr_name
            if evaluate_locals:
                return evaluate_locals[param_name]
            # no locals to resolve against: fall back to the param name
            return param_name
        return self.type_

    @classmethod
    def extract(cls, val, evaluate_locals=None, binary=None):
        """
        Extract the value from an Optional() instance.

        :param val: a plain value or an Optional instance
        :param evaluate_locals: passed on to :meth:`getval`
        :param binary: when truthy, the result is passed through ``str2bool``
        :return: the original value if it is not an Optional instance,
            else the value carried by the instance
        """
        if isinstance(val, cls):
            val = val.getval(evaluate_locals)
        if binary:
            val = str2bool(val)
        return val
def parse_args(cli_args, key_prefix=''):
    """
    Turn a ``key=value,key=value`` string into a mapping of prefixed keys.

    Both the ``,`` and the ``=`` splitting are delegated to ``escape_split``.
    Elements that do not split into exactly a key and a value are ignored.

    :param cli_args: the raw argument string
    :param key_prefix: string prepended to every key
    :return: a ``collections.defaultdict(dict)`` of prefixed key -> value
    """
    from rhodecode.lib.utils2 import escape_split

    parsed = collections.defaultdict(dict)
    for element in escape_split(cli_args, ','):
        pair = escape_split(element, '=', 1)
        if len(pair) != 2:
            # not a `key=value` element, nothing to store
            continue
        key, value = pair
        parsed[key_prefix + key] = value
    return parsed
def get_origin(obj):
    """
    Describe where a permission on `obj` originates from.

    :param obj: object optionally exposing `owner_row` / `admin_row` flags
    :return: ``'owner'`` when `owner_row` is truthy, ``'super-admin'`` when
        only `admin_row` is truthy, ``'permission'`` otherwise
    """
    if getattr(obj, 'owner_row', ''):
        # covers the combined admin + owner case too,
        # maybe we should use a dual string there ?
        return 'owner'
    if getattr(obj, 'admin_row', ''):
        return 'super-admin'
    return 'permission'
def store_update(updates, attr, name):
    """
    Record `attr` under `name` in `updates` unless it is an Optional.

    An Optional instance is skipped, so only the remaining values end up
    in the `updates` dict.

    :param updates: dict collecting the values to update (mutated in place)
    :param attr: the value passed in for the parameter
    :param name: key to store the value under
    """
    if isinstance(attr, Optional):
        return
    updates[name] = attr
def has_superadmin_permission(apiuser):
    """
    Tell whether `apiuser` holds the ``hg.admin`` permission.

    :param apiuser: the user the check is made for
    :return: True if the check passes, False otherwise
    """
    admin_check = HasPermissionAnyApi('hg.admin')
    return bool(admin_check(user=apiuser))
def validate_repo_permissions(apiuser, repoid, repo, perms):
    """
    Check that `apiuser` holds any of `perms` on `repo`.

    :param apiuser: the user the check is made for
    :param repoid: identifier used in the error message
    :param repo: repository object, its `repo_name` is checked
    :param perms: iterable of permission names, any of which is sufficient
    :return: True when the user is authorized
    :raises JSONRPCError: when the user is not authorized
    """
    has_any_perm = HasRepoPermissionAnyApi(*perms)
    if has_any_perm(user=apiuser, repo_name=repo.repo_name):
        return True
    # a failed check is reported with the "does not exist" message
    raise JSONRPCError('repository `%s` does not exist' % repoid)
def validate_repo_group_permissions(apiuser, repogroupid, repo_group, perms):
    """
    Check that `apiuser` holds any of `perms` on `repo_group`.

    :param apiuser: the user the check is made for
    :param repogroupid: just the id of repository group, used in the
        error message
    :param repo_group: instance of repo_group, its `group_name` is checked
    :param perms: iterable of permission names, any of which is sufficient
    :return: True when the user is authorized
    :raises JSONRPCError: when the user is not authorized
    """
    has_any_perm = HasRepoGroupPermissionAnyApi(*perms)
    if has_any_perm(user=apiuser, group_name=repo_group.group_name):
        return True
    # a failed check is reported with the "does not exist" message
    raise JSONRPCError(
        'repository group `%s` does not exist' % repogroupid)
def validate_set_owner_permissions(apiuser, owner):
    """
    Resolve the owner to use for an object created/updated by `apiuser`.

    :param apiuser: the calling user
    :param owner: an Optional instance when no owner was specified,
        otherwise a user id or username
    :return: the user object of the resolved owner
    :raises JSONRPCError: when a non super-admin specifies `owner`, or when
        the user cannot be found
    """
    if isinstance(owner, Optional):
        # nothing specified: the caller becomes the owner
        return get_user_or_error(apiuser.user_id)

    if not has_superadmin_permission(apiuser):
        # forbid setting owner for non-admins
        raise JSONRPCError(
            'Only RhodeCode super-admin can specify `owner` param')

    return get_user_or_error(owner)
def get_user_or_error(userid):
    """
    Fetch a user by id (int) or by username (anything else).

    :param userid: integer user id or username
    :return: the user object
    :raises JSONRPCError: when no such user is found
    """
    from rhodecode.model.user import UserModel

    model = UserModel()
    if not isinstance(userid, int):
        found = model.get_by_username(userid)
    else:
        try:
            found = model.get_user(userid)
        except ValueError:
            found = None

    if found is not None:
        return found
    raise JSONRPCError(
        'user `{}` does not exist'.format(userid))
def get_repo_or_error(repoid):
    """
    Fetch a repository by id (int) or by name (anything else).

    :param repoid: integer repository id or repository name
    :return: the repository object
    :raises JSONRPCError: when no such repository is found
    """
    from rhodecode.model.repo import RepoModel

    model = RepoModel()
    if not isinstance(repoid, int):
        found = model.get_by_repo_name(repoid)
    else:
        try:
            found = model.get_repo(repoid)
        except ValueError:
            found = None

    if found is not None:
        return found
    raise JSONRPCError(
        'repository `{}` does not exist'.format(repoid))
def get_repo_group_or_error(repogroupid):
    """
    Fetch a repository group by id (int) or by name (anything else).

    :param repogroupid: integer repository group id or group name
    :return: the repository group object
    :raises JSONRPCError: when no such repository group is found
    """
    from rhodecode.model.repo_group import RepoGroupModel

    model = RepoGroupModel()
    if not isinstance(repogroupid, int):
        found = model.get_by_group_name(repogroupid)
    else:
        try:
            found = model._get_repo_group(repogroupid)
        except ValueError:
            found = None

    if found is not None:
        return found
    raise JSONRPCError(
        'repository group `{}` does not exist'.format(repogroupid))
def get_user_group_or_error(usergroupid):
    """
    Fetch a user group by id (int) or by name (anything else).

    :param usergroupid: integer user group id or user group name
    :return: the user group object
    :raises JSONRPCError: when no such user group is found
    """
    from rhodecode.model.user_group import UserGroupModel

    model = UserGroupModel()
    if not isinstance(usergroupid, int):
        found = model.get_by_name(usergroupid)
    else:
        try:
            found = model.get_group(usergroupid)
        except ValueError:
            found = None

    if found is not None:
        return found
    raise JSONRPCError(
        'user group `{}` does not exist'.format(usergroupid))
def get_perm_or_error(permid, prefix=None):
    """
    Fetch a permission by its key, optionally enforcing a name prefix.

    :param permid: permission key to look up
    :param prefix: when given, the permission name must start with it
    :return: the permission object
    :raises JSONRPCError: when the permission is not found, or when its name
        does not start with `prefix`
    """
    from rhodecode.model.permission import PermissionModel

    found = PermissionModel.cls.get_by_key(permid)

    if found is None:
        message = f'permission `{permid}` does not exist.'
        if prefix:
            # hint at the expected naming when a prefix was requested
            message += f' Permission should start with prefix: `{prefix}`'
        raise JSONRPCError(message)

    if prefix and not found.permission_name.startswith(prefix):
        raise JSONRPCError('permission `%s` is invalid, '
                           'should start with %s' % (permid, prefix))

    return found
def get_gist_or_error(gistid):
    """
    Fetch a gist through ``get_by_access_id``.

    :param gistid: identifier passed to the access-id lookup
    :return: the gist object
    :raises JSONRPCError: when no such gist is found
    """
    from rhodecode.model.gist import GistModel

    found = GistModel.cls.get_by_access_id(gistid)
    if found is not None:
        return found
    raise JSONRPCError('gist `{}` does not exist'.format(gistid))
def get_pull_request_or_error(pullrequestid):
    """
    Get pull request by id or raise JSONRPCError if not found.

    :param pullrequestid: pull request id; anything ``int()`` accepts
    :return: the pull request object
    :raises JSONRPCError: when `pullrequestid` cannot be converted to an
        integer, or when no such pull request exists
    """
    from rhodecode.model.pull_request import PullRequestModel

    # int() raises ValueError for non-numeric strings, but TypeError for
    # values such as None, lists or dicts; both mean the id is unusable.
    # Only the conversion is guarded so errors from the lookup itself are
    # not misreported as a bad id.
    try:
        pull_request_id = int(pullrequestid)
    except (TypeError, ValueError):
        raise JSONRPCError('pullrequestid must be an integer')

    pull_request = PullRequestModel().get(pull_request_id)
    if not pull_request:
        raise JSONRPCError('pull request `{}` does not exist'.format(
            pullrequestid))
    return pull_request
def build_commit_data(rhodecode_vcs_repo, commit, detail_level):
    """
    Build the per-file change list of `commit` for the given detail level.

    :param rhodecode_vcs_repo: vcs repository, only used for ``'full'`` to
        obtain the diff between the commit and its first parent
    :param commit: the commit to describe
    :param detail_level: ``'extended'`` lists added/changed/removed paths,
        ``'full'`` additionally carries revisions, raw diff and stats; any
        other value yields an empty list
    :return: list of dicts as produced by ``_get_commit_dict``
    """
    target_commit = commit
    parent_commit = commit.first_parent
    entries = []

    if detail_level == 'extended':
        # operation code -> commit attribute holding the affected paths
        path_sources = (
            ('A', 'added_paths'),
            ('M', 'changed_paths'),
            ('D', 'removed_paths'),
        )
        for operation, attr_name in path_sources:
            for path in getattr(commit, attr_name):
                entries.append(_get_commit_dict(filename=path, op=operation))

    elif detail_level == 'full':
        from rhodecode.lib import diffs

        raw = rhodecode_vcs_repo.get_diff(parent_commit, target_commit)
        processor = diffs.DiffProcessor(
            raw, diff_format='newdiff', show_full_diff=True)
        for file_diff in processor.prepare():
            file_stats = file_diff['stats']
            # the `ops` entry is dropped from the reported stats
            del file_stats['ops']
            entries.append(_get_commit_dict(
                filename=file_diff['filename'],
                op=file_diff['operation'],
                new_revision=file_diff['new_revision'],
                old_revision=file_diff['old_revision'],
                raw_diff=file_diff['raw_diff'],
                stats=file_stats))

    return entries
def get_commit_or_error(ref, repo):
    """
    Resolve a ``type:name:hash`` reference to a commit of `repo`.

    Only the third (hash) component is used for the lookup.

    :param ref: reference string made of exactly three ``:`` separated parts
    :param repo: repository the commit is looked up in
    :return: the commit returned by ``get_commit_from_ref_name``
    :raises JSONRPCError: when `ref` is malformed or cannot be resolved
    """
    parts = ref.split(':')
    if len(parts) != 3:
        raise JSONRPCError(
            'Ref `{ref}` given in a wrong format. Please check the API'
            ' documentation for more details'.format(ref=ref))
    commit_ref = parts[2]

    try:
        # TODO: dan: refactor this to use repo.scm_instance().get_commit()
        # once get_commit supports ref_types
        commit = get_commit_from_ref_name(repo, commit_ref)
    except RepositoryError:
        raise JSONRPCError(f'Ref `{ref}` does not exist')
    return commit
def _get_ref_hash(repo, type_, name):
vcs_repo = repo.scm_instance()
if type_ in ['branch'] and vcs_repo.alias in ('hg', 'git'):
return vcs_repo.branches[name]
elif type_ in ['bookmark', 'book'] and vcs_repo.alias == 'hg':
return vcs_repo.bookmarks[name]
else:
raise ValueError()
def resolve_ref_or_error(ref, repo, allowed_ref_types=None):
    """
    Normalize a ``type:name[:hash]`` reference to ``type:name:hash``.

    When the hash part is missing or empty it is looked up with
    ``_get_ref_hash``.

    :param ref: reference string with two or three ``:`` separated parts
    :param repo: repository used to look up a missing hash
    :param allowed_ref_types: accepted reference types; when empty or not
        given, ``bookmark``, ``book``, ``tag`` and ``branch`` are accepted
    :return: the full ``type:name:hash`` string
    :raises JSONRPCError: when `ref` is malformed, its type is not allowed,
        or the hash cannot be looked up
    """
    if not allowed_ref_types:
        allowed_ref_types = ['bookmark', 'book', 'tag', 'branch']

    def _unpack(type_, name, hash_=None):
        # the call signature does the validation: too few or too many
        # parts make the call below raise TypeError
        return type_, name, hash_

    try:
        ref_type, ref_name, ref_hash = _unpack(*ref.split(':'))
    except TypeError:
        raise JSONRPCError(
            'Ref `{ref}` given in a wrong format. Please check the API'
            ' documentation for more details'.format(ref=ref))

    if ref_type not in allowed_ref_types:
        raise JSONRPCError(
            'Ref `{ref}` type is not allowed. '
            'Only:{allowed_refs} are possible.'.format(
                ref=ref, allowed_refs=allowed_ref_types))

    if not ref_hash:
        try:
            ref_hash = _get_ref_hash(repo, ref_type, ref_name)
        except (KeyError, ValueError):
            raise JSONRPCError(
                'The specified value:{type}:`{name}` does not exist, or is not allowed.'.format(
                    type=ref_type, name=ref_name))

    return ':'.join((ref_type, ref_name, ref_hash))
def _get_commit_dict(
        filename, op, new_revision=None, old_revision=None,
        raw_diff=None, stats=None):
    """
    Build the dict describing one changed file of a commit.

    :param filename: path of the file, passed through ``safe_str``
    :param op: operation code for the change
    :param new_revision: optional extra detail, stored as given
    :param old_revision: optional extra detail, stored as given
    :param raw_diff: optional extra detail, stored as given
    :param stats: stats dict; when None, a dict with ``added``, ``binary``
        and ``deleted`` all set to None is used
    :return: dict with keys ``filename``, ``op``, ``new_revision``,
        ``old_revision``, ``raw_diff`` and ``stats``
    """
    if stats is None:
        # placeholder stats with every counter unset
        stats = dict.fromkeys(("added", "binary", "deleted"))

    entry = {"filename": safe_str(filename), "op": op}
    # extra details
    entry["new_revision"] = new_revision
    entry["old_revision"] = old_revision
    entry["raw_diff"] = raw_diff
    entry["stats"] = stats
    return entry