utils.py
456 lines
| 12.9 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2014-2023 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
JSON RPC utils | ||||
""" | ||||
import collections | ||||
import logging | ||||
from rhodecode.api.exc import JSONRPCError | ||||
r1265 | from rhodecode.lib.auth import ( | |||
HasPermissionAnyApi, HasRepoPermissionAnyApi, HasRepoGroupPermissionAnyApi) | ||||
r5047 | from rhodecode.lib.str_utils import safe_str | |||
r1265 | from rhodecode.lib.vcs.exceptions import RepositoryError | |||
r3346 | from rhodecode.lib.view_utils import get_commit_from_ref_name | |||
r1265 | from rhodecode.lib.utils2 import str2bool | |||
r1 | ||||
log = logging.getLogger(__name__) | ||||
class OAttr:
    """
    Special Option that names another attribute, so an :class:`Optional`
    parameter can default to the value of that attribute.

    Example::

        def test(apiuser, userid=Optional(OAttr('apiuser'))):
            user = Optional.extract(userid, evaluate_locals=locals())
            # if we pass in userid, we get it, else it will default to
            # the apiuser attribute

    :param attr_name: name of the attribute this marker refers to
    """

    def __init__(self, attr_name):
        self.attr_name = attr_name

    def __repr__(self):
        # f-string instead of `%`: a tuple attr_name would otherwise be
        # unpacked as the format arguments and raise TypeError
        return f'<OptionalAttr:{self.attr_name}>'

    def __call__(self):
        # calling the marker is a no-op that hands back the marker itself
        return self
class Optional:
    """
    Defines an optional parameter::

        param = param.getval() if isinstance(param, Optional) else param
        param = param() if isinstance(param, Optional) else param

    is equivalent of::

        param = Optional.extract(param)

    :param type_: the default value wrapped by this instance, or an
        ``OAttr`` naming another parameter to default to
    """

    def __init__(self, type_):
        self.type_ = type_

    def __repr__(self):
        # `!r` goes through repr(); calling `self.type_.__repr__()` directly
        # raises TypeError when the wrapped default is itself a class
        return f'<Optional:{self.type_!r}>'

    def __call__(self):
        return self.getval()

    def getval(self, evaluate_locals=None):
        """
        Returns value from this Optional instance

        :param evaluate_locals: optional mapping used to resolve an
            ``OAttr`` default by its attribute name
        :return: the wrapped value; for an ``OAttr`` default, the entry of
            `evaluate_locals` under that name, or the name itself when no
            (or an empty) mapping is given
        """
        if not isinstance(self.type_, OAttr):
            return self.type_

        param_name = self.type_.attr_name
        if evaluate_locals:
            return evaluate_locals[param_name]
        # use params name
        return param_name

    @classmethod
    def extract(cls, val, evaluate_locals=None, binary=None):
        """
        Extracts value from Optional() instance

        :param val: either a plain value or an Optional instance
        :param evaluate_locals: passed through to :meth:`getval`
        :param binary: when truthy, the result is converted with `str2bool`
        :return: original value if it's not Optional instance else
            value of instance
        """
        if isinstance(val, cls):
            val = val.getval(evaluate_locals)
        if binary:
            val = str2bool(val)
        return val
def parse_args(cli_args, key_prefix=''):
    """
    Parse a comma separated list of ``key=value`` items into a mapping.

    Items that do not split into exactly a key and a value on the first
    ``=`` are ignored.

    :param cli_args: string with ``,`` separated ``key=value`` items
    :param key_prefix: string prepended to every key that gets stored
    :return: ``collections.defaultdict(dict)`` holding the parsed items
    """
    from rhodecode.lib.utils2 import (escape_split)

    parsed = collections.defaultdict(dict)
    for chunk in escape_split(cli_args, ','):
        pair = escape_split(chunk, '=', 1)
        if len(pair) != 2:
            # not a key=value item, skip it
            continue
        key, value = pair
        parsed[key_prefix + key] = value
    return parsed
def get_origin(obj):
    """
    Get origin of permission from object.

    :param obj: object that may carry `owner_row` / `admin_row` attributes
    :return: ``'owner'`` when `owner_row` is truthy, otherwise
        ``'super-admin'`` when `admin_row` is truthy, otherwise
        ``'permission'``
    """
    # owner wins over admin; the admin-and-owner case also reports 'owner'
    # (maybe we should use dual string ?)
    if getattr(obj, 'owner_row', ''):
        return 'owner'
    if getattr(obj, 'admin_row', ''):
        return 'super-admin'
    return 'permission'
def store_update(updates, attr, name):
    """
    Record `attr` under `name` in the `updates` dict, unless `attr` is an
    Optional instance (i.e. the caller did not pass a value for it).

    :param updates: dict collecting the values to update
    :param attr: value of the parameter as received
    :param name: key to store the value under
    """
    if isinstance(attr, Optional):
        return
    updates[name] = attr
def has_superadmin_permission(apiuser):
    """
    Tell whether apiuser passes the ``hg.admin`` permission check.

    :param apiuser: user the permission check runs against
    :return: True if the check passes, False otherwise
    """
    is_admin = HasPermissionAnyApi('hg.admin')(user=apiuser)
    return True if is_admin else False
def validate_repo_permissions(apiuser, repoid, repo, perms):
    """
    Return True when apiuser holds any of `perms` on the repository,
    raise JSONRPCError otherwise.

    :param apiuser: user the permission check runs against
    :param repoid: repository identifier, used only in the error message
    :param repo: repository object; its `repo_name` is what gets checked
    :param perms: iterable of permission names, any of which is sufficient
    """
    checker = HasRepoPermissionAnyApi(*perms)
    if checker(user=apiuser, repo_name=repo.repo_name):
        return True

    # a failed permission check is reported with the "does not exist" message
    raise JSONRPCError('repository `%s` does not exist' % repoid)
def validate_repo_group_permissions(apiuser, repogroupid, repo_group, perms):
    """
    Return True when apiuser holds any of `perms` on the repository group,
    raise JSONRPCError otherwise.

    :param apiuser: user the permission check runs against
    :param repogroupid: just the id of repository group, used only in the
        error message
    :param repo_group: instance of repo_group; its `group_name` is what
        gets checked
    :param perms: iterable of permission names, any of which is sufficient
    """
    checker = HasRepoGroupPermissionAnyApi(*perms)
    if checker(user=apiuser, group_name=repo_group.group_name):
        return True

    # a failed permission check is reported with the "does not exist" message
    raise JSONRPCError(
        'repository group `%s` does not exist' % repogroupid)
def validate_set_owner_permissions(apiuser, owner):
    """
    Resolve the user that should become the owner.

    :param apiuser: user performing the API call
    :param owner: requested owner, or an Optional instance when the caller
        did not specify one
    :return: user object for apiuser when `owner` was not specified,
        otherwise the user object for `owner`
    :raises JSONRPCError: when `owner` is specified by a caller without
        super-admin permission, or the user lookup fails
    """
    if isinstance(owner, Optional):
        # nothing requested explicitly, the calling user becomes the owner
        return get_user_or_error(apiuser.user_id)

    if not has_superadmin_permission(apiuser):
        # forbid setting owner for non-admins
        raise JSONRPCError(
            'Only RhodeCode super-admin can specify `owner` param')

    return get_user_or_error(owner)
def get_user_or_error(userid):
    """
    Get user by id or name or raise JSONRPCError if not found

    :param userid: an int is looked up as user id, anything else as username
    """
    from rhodecode.model.user import UserModel
    model = UserModel()

    if not isinstance(userid, int):
        found = model.get_by_username(userid)
    else:
        try:
            found = model.get_user(userid)
        except ValueError:
            found = None

    if found is not None:
        return found
    raise JSONRPCError(
        'user `{}` does not exist'.format(userid))
def get_repo_or_error(repoid):
    """
    Get repo by id or name or raise JSONRPCError if not found

    :param repoid: an int is looked up as repo id, anything else as
        repository name
    """
    from rhodecode.model.repo import RepoModel
    model = RepoModel()

    if not isinstance(repoid, int):
        found = model.get_by_repo_name(repoid)
    else:
        try:
            found = model.get_repo(repoid)
        except ValueError:
            found = None

    if found is not None:
        return found
    raise JSONRPCError(
        'repository `{}` does not exist'.format(repoid))
def get_repo_group_or_error(repogroupid):
    """
    Get repo group by id or name or raise JSONRPCError if not found

    :param repogroupid: an int is looked up as group id, anything else as
        group name
    """
    from rhodecode.model.repo_group import RepoGroupModel
    model = RepoGroupModel()

    if not isinstance(repogroupid, int):
        found = model.get_by_group_name(repogroupid)
    else:
        try:
            found = model._get_repo_group(repogroupid)
        except ValueError:
            found = None

    if found is not None:
        return found
    raise JSONRPCError(
        'repository group `{}` does not exist'.format(repogroupid))
def get_user_group_or_error(usergroupid):
    """
    Get user group by id or name or raise JSONRPCError if not found

    :param usergroupid: an int is looked up as group id, anything else as
        group name
    """
    from rhodecode.model.user_group import UserGroupModel
    model = UserGroupModel()

    if not isinstance(usergroupid, int):
        found = model.get_by_name(usergroupid)
    else:
        try:
            found = model.get_group(usergroupid)
        except ValueError:
            found = None

    if found is not None:
        return found
    raise JSONRPCError(
        'user group `{}` does not exist'.format(usergroupid))
def get_perm_or_error(permid, prefix=None):
    """
    Get permission by its key or raise JSONRPCError if not found

    :param permid: permission key to look up
    :param prefix: when given, the found permission's name must start
        with it, otherwise JSONRPCError is raised
    """
    from rhodecode.model.permission import PermissionModel

    found = PermissionModel.cls.get_by_key(permid)

    if found is None:
        parts = [f'permission `{permid}` does not exist.']
        if prefix:
            parts.append(f' Permission should start with prefix: `{prefix}`')
        raise JSONRPCError(''.join(parts))

    if prefix and not found.permission_name.startswith(prefix):
        raise JSONRPCError('permission `%s` is invalid, '
                           'should start with %s' % (permid, prefix))
    return found
def get_gist_or_error(gistid):
    """
    Get gist by its access id or raise JSONRPCError if not found

    :param gistid: value passed to the `get_by_access_id` lookup
    """
    from rhodecode.model.gist import GistModel

    found = GistModel.cls.get_by_access_id(gistid)
    if found is not None:
        return found
    raise JSONRPCError('gist `{}` does not exist'.format(gistid))
def get_pull_request_or_error(pullrequestid):
    """
    Get pull request by id or raise JSONRPCError if not found

    :param pullrequestid: pull request id; any value `int()` can convert
    :raises JSONRPCError: when `pullrequestid` cannot be converted to an
        integer, or no pull request is found for it
    """
    from rhodecode.model.pull_request import PullRequestModel

    # only the conversion is guarded, so errors from the model lookup are
    # not misreported as a malformed id
    try:
        pull_request_id = int(pullrequestid)
    except (TypeError, ValueError):
        # int() raises TypeError for None/list/dict input and ValueError
        # for non-numeric strings; both mean "not an integer" to the caller
        raise JSONRPCError('pullrequestid must be an integer')

    pull_request = PullRequestModel().get(pull_request_id)
    if not pull_request:
        raise JSONRPCError('pull request `{}` does not exist'.format(
            pullrequestid))
    return pull_request
def build_commit_data(rhodecode_vcs_repo, commit, detail_level):
    """
    Build the list of per-file change dicts for a commit.

    :param rhodecode_vcs_repo: vcs repository; its `get_diff` is used for
        the ``'full'`` detail level
    :param commit: commit to describe
    :param detail_level: ``'extended'`` lists added/changed/removed paths
        with their operation only; ``'full'`` adds revisions, raw diff and
        stats from the diff against the first parent; any other value
        yields an empty list
    :return: list of dicts as produced by `_get_commit_dict`
    """
    # looked up before branching, exactly like the diff base is needed or not
    parent_commit = commit.first_parent
    entries = []

    if detail_level == 'extended':
        entries.extend(
            _get_commit_dict(filename=path, op='A')
            for path in commit.added_paths)
        entries.extend(
            _get_commit_dict(filename=path, op='M')
            for path in commit.changed_paths)
        entries.extend(
            _get_commit_dict(filename=path, op='D')
            for path in commit.removed_paths)
        return entries

    if detail_level == 'full':
        from rhodecode.lib import diffs

        raw = rhodecode_vcs_repo.get_diff(parent_commit, commit,)
        processor = diffs.DiffProcessor(raw, diff_format='newdiff', show_full_diff=True)

        for file_info in processor.prepare():
            # the 'ops' entry is dropped from stats before they are returned
            del file_info['stats']['ops']
            file_stats = file_info['stats']
            entries.append(_get_commit_dict(
                filename=file_info['filename'],
                op=file_info['operation'],
                new_revision=file_info['new_revision'],
                old_revision=file_info['old_revision'],
                raw_diff=file_info['raw_diff'],
                stats=file_stats))

    return entries
def get_commit_or_error(ref, repo):
    """
    Get the commit a ref points to or raise JSONRPCError.

    :param ref: string made of exactly three ``:`` separated parts; only
        the last part is used for the lookup
    :param repo: repository passed to `get_commit_from_ref_name`
    :raises JSONRPCError: when `ref` does not split into three parts, or
        the lookup raises RepositoryError
    """
    try:
        _type, _name, commit_ref = ref.split(':')
    except ValueError:
        raise JSONRPCError(
            'Ref `{ref}` given in a wrong format. Please check the API'
            ' documentation for more details'.format(ref=ref))

    try:
        # TODO: dan: refactor this to use repo.scm_instance().get_commit()
        # once get_commit supports ref_types
        commit = get_commit_from_ref_name(repo, commit_ref)
    except RepositoryError:
        raise JSONRPCError(f'Ref `{ref}` does not exist')
    return commit
r1 | ||||
r3302 | def _get_ref_hash(repo, type_, name): | |||
vcs_repo = repo.scm_instance() | ||||
if type_ in ['branch'] and vcs_repo.alias in ('hg', 'git'): | ||||
return vcs_repo.branches[name] | ||||
elif type_ in ['bookmark', 'book'] and vcs_repo.alias == 'hg': | ||||
return vcs_repo.bookmarks[name] | ||||
else: | ||||
raise ValueError() | ||||
def resolve_ref_or_error(ref, repo, allowed_ref_types=None):
    """
    Normalize a ref string into its ``type:name:hash`` form.

    :param ref: ``type:name`` or ``type:name:hash`` string
    :param repo: repository used to look the hash up when `ref` carries
        none (or an empty one)
    :param allowed_ref_types: ref types to accept; defaults to bookmark,
        book, tag and branch
    :return: ``type:name:hash`` string
    :raises JSONRPCError: when `ref` is malformed, its type is not allowed,
        or the hash lookup fails
    """
    if not allowed_ref_types:
        allowed_ref_types = ['bookmark', 'book', 'tag', 'branch']

    # the call signature does the validation: fewer than two or more than
    # three parts end up as a TypeError
    def _split_parts(kind, label, commit_hash=None):
        return kind, label, commit_hash

    try:
        ref_type, ref_name, ref_hash = _split_parts(*ref.split(':'))
    except TypeError:
        raise JSONRPCError(
            'Ref `{ref}` given in a wrong format. Please check the API'
            ' documentation for more details'.format(ref=ref))

    if ref_type not in allowed_ref_types:
        raise JSONRPCError(
            'Ref `{ref}` type is not allowed. '
            'Only:{allowed_refs} are possible.'.format(
                ref=ref, allowed_refs=allowed_ref_types))

    if not ref_hash:
        # no usable hash in the ref itself, resolve it from the repository
        try:
            ref_hash = _get_ref_hash(repo, ref_type, ref_name)
        except (KeyError, ValueError):
            raise JSONRPCError(
                'The specified value:{type}:`{name}` does not exist, or is not allowed.'.format(
                    type=ref_type, name=ref_name))

    return ':'.join((ref_type, ref_name, ref_hash))
def _get_commit_dict(
        filename, op, new_revision=None, old_revision=None,
        raw_diff=None, stats=None):
    """
    Build the dict describing one changed file of a commit.

    :param filename: path of the file, passed through `safe_str`
    :param op: operation marker for the file
    :param new_revision: stored as-is under ``new_revision``
    :param old_revision: stored as-is under ``old_revision``
    :param raw_diff: stored as-is under ``raw_diff``
    :param stats: stats dict; when None, a dict with ``added``, ``binary``
        and ``deleted`` all set to None is used
    """
    empty_stats = {
        "added": None,
        "binary": None,
        "deleted": None
    }

    entry = {
        "filename": safe_str(filename),
        "op": op,
    }
    # extra details
    entry["new_revision"] = new_revision
    entry["old_revision"] = old_revision
    entry["raw_diff"] = raw_diff
    entry["stats"] = empty_stats if stats is None else stats
    return entry