|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
# Copyright (C) 2014-2020 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
"""
|
|
|
JSON RPC utils
|
|
|
"""
|
|
|
|
|
|
import collections
|
|
|
import logging
|
|
|
|
|
|
from rhodecode.api.exc import JSONRPCError
|
|
|
from rhodecode.lib.auth import (
|
|
|
HasPermissionAnyApi, HasRepoPermissionAnyApi, HasRepoGroupPermissionAnyApi)
|
|
|
from rhodecode.lib.utils import safe_unicode
|
|
|
from rhodecode.lib.vcs.exceptions import RepositoryError
|
|
|
from rhodecode.lib.view_utils import get_commit_from_ref_name
|
|
|
from rhodecode.lib.utils2 import str2bool
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class OAttr(object):
|
|
|
"""
|
|
|
Special Option that defines other attribute, and can default to them
|
|
|
|
|
|
Example::
|
|
|
|
|
|
def test(apiuser, userid=Optional(OAttr('apiuser')):
|
|
|
user = Optional.extract(userid, evaluate_locals=local())
|
|
|
#if we pass in userid, we get it, else it will default to apiuser
|
|
|
#attribute
|
|
|
"""
|
|
|
|
|
|
def __init__(self, attr_name):
|
|
|
self.attr_name = attr_name
|
|
|
|
|
|
def __repr__(self):
|
|
|
return '<OptionalAttr:%s>' % self.attr_name
|
|
|
|
|
|
def __call__(self):
|
|
|
return self
|
|
|
|
|
|
|
|
|
class Optional(object):
|
|
|
"""
|
|
|
Defines an optional parameter::
|
|
|
|
|
|
param = param.getval() if isinstance(param, Optional) else param
|
|
|
param = param() if isinstance(param, Optional) else param
|
|
|
|
|
|
is equivalent of::
|
|
|
|
|
|
param = Optional.extract(param)
|
|
|
|
|
|
"""
|
|
|
|
|
|
def __init__(self, type_):
|
|
|
self.type_ = type_
|
|
|
|
|
|
def __repr__(self):
|
|
|
return '<Optional:%s>' % self.type_.__repr__()
|
|
|
|
|
|
def __call__(self):
|
|
|
return self.getval()
|
|
|
|
|
|
def getval(self, evaluate_locals=None):
|
|
|
"""
|
|
|
returns value from this Optional instance
|
|
|
"""
|
|
|
if isinstance(self.type_, OAttr):
|
|
|
param_name = self.type_.attr_name
|
|
|
if evaluate_locals:
|
|
|
return evaluate_locals[param_name]
|
|
|
# use params name
|
|
|
return param_name
|
|
|
return self.type_
|
|
|
|
|
|
@classmethod
|
|
|
def extract(cls, val, evaluate_locals=None, binary=None):
|
|
|
"""
|
|
|
Extracts value from Optional() instance
|
|
|
|
|
|
:param val:
|
|
|
:return: original value if it's not Optional instance else
|
|
|
value of instance
|
|
|
"""
|
|
|
if isinstance(val, cls):
|
|
|
val = val.getval(evaluate_locals)
|
|
|
|
|
|
if binary:
|
|
|
val = str2bool(val)
|
|
|
|
|
|
return val
|
|
|
|
|
|
|
|
|
def parse_args(cli_args, key_prefix=''):
|
|
|
from rhodecode.lib.utils2 import (escape_split)
|
|
|
kwargs = collections.defaultdict(dict)
|
|
|
for el in escape_split(cli_args, ','):
|
|
|
kv = escape_split(el, '=', 1)
|
|
|
if len(kv) == 2:
|
|
|
k, v = kv
|
|
|
kwargs[key_prefix + k] = v
|
|
|
return kwargs
|
|
|
|
|
|
|
|
|
def get_origin(obj):
|
|
|
"""
|
|
|
Get origin of permission from object.
|
|
|
|
|
|
:param obj:
|
|
|
"""
|
|
|
origin = 'permission'
|
|
|
|
|
|
if getattr(obj, 'owner_row', '') and getattr(obj, 'admin_row', ''):
|
|
|
# admin and owner case, maybe we should use dual string ?
|
|
|
origin = 'owner'
|
|
|
elif getattr(obj, 'owner_row', ''):
|
|
|
origin = 'owner'
|
|
|
elif getattr(obj, 'admin_row', ''):
|
|
|
origin = 'super-admin'
|
|
|
return origin
|
|
|
|
|
|
|
|
|
def store_update(updates, attr, name):
|
|
|
"""
|
|
|
Stores param in updates dict if it's not instance of Optional
|
|
|
allows easy updates of passed in params
|
|
|
"""
|
|
|
if not isinstance(attr, Optional):
|
|
|
updates[name] = attr
|
|
|
|
|
|
|
|
|
def has_superadmin_permission(apiuser):
|
|
|
"""
|
|
|
Return True if apiuser is admin or return False
|
|
|
|
|
|
:param apiuser:
|
|
|
"""
|
|
|
if HasPermissionAnyApi('hg.admin')(user=apiuser):
|
|
|
return True
|
|
|
return False
|
|
|
|
|
|
|
|
|
def validate_repo_permissions(apiuser, repoid, repo, perms):
|
|
|
"""
|
|
|
Raise JsonRPCError if apiuser is not authorized or return True
|
|
|
|
|
|
:param apiuser:
|
|
|
:param repoid:
|
|
|
:param repo:
|
|
|
:param perms:
|
|
|
"""
|
|
|
if not HasRepoPermissionAnyApi(*perms)(
|
|
|
user=apiuser, repo_name=repo.repo_name):
|
|
|
raise JSONRPCError('repository `%s` does not exist' % repoid)
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
def validate_repo_group_permissions(apiuser, repogroupid, repo_group, perms):
|
|
|
"""
|
|
|
Raise JsonRPCError if apiuser is not authorized or return True
|
|
|
|
|
|
:param apiuser:
|
|
|
:param repogroupid: just the id of repository group
|
|
|
:param repo_group: instance of repo_group
|
|
|
:param perms:
|
|
|
"""
|
|
|
if not HasRepoGroupPermissionAnyApi(*perms)(
|
|
|
user=apiuser, group_name=repo_group.group_name):
|
|
|
raise JSONRPCError(
|
|
|
'repository group `%s` does not exist' % repogroupid)
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
def validate_set_owner_permissions(apiuser, owner):
|
|
|
if isinstance(owner, Optional):
|
|
|
owner = get_user_or_error(apiuser.user_id)
|
|
|
else:
|
|
|
if has_superadmin_permission(apiuser):
|
|
|
owner = get_user_or_error(owner)
|
|
|
else:
|
|
|
# forbid setting owner for non-admins
|
|
|
raise JSONRPCError(
|
|
|
'Only RhodeCode super-admin can specify `owner` param')
|
|
|
return owner
|
|
|
|
|
|
|
|
|
def get_user_or_error(userid):
|
|
|
"""
|
|
|
Get user by id or name or return JsonRPCError if not found
|
|
|
|
|
|
:param userid:
|
|
|
"""
|
|
|
from rhodecode.model.user import UserModel
|
|
|
user_model = UserModel()
|
|
|
|
|
|
if isinstance(userid, int):
|
|
|
try:
|
|
|
user = user_model.get_user(userid)
|
|
|
except ValueError:
|
|
|
user = None
|
|
|
else:
|
|
|
user = user_model.get_by_username(userid)
|
|
|
|
|
|
if user is None:
|
|
|
raise JSONRPCError(
|
|
|
'user `%s` does not exist' % (userid,))
|
|
|
return user
|
|
|
|
|
|
|
|
|
def get_repo_or_error(repoid):
|
|
|
"""
|
|
|
Get repo by id or name or return JsonRPCError if not found
|
|
|
|
|
|
:param repoid:
|
|
|
"""
|
|
|
from rhodecode.model.repo import RepoModel
|
|
|
repo_model = RepoModel()
|
|
|
|
|
|
if isinstance(repoid, int):
|
|
|
try:
|
|
|
repo = repo_model.get_repo(repoid)
|
|
|
except ValueError:
|
|
|
repo = None
|
|
|
else:
|
|
|
repo = repo_model.get_by_repo_name(repoid)
|
|
|
|
|
|
if repo is None:
|
|
|
raise JSONRPCError(
|
|
|
'repository `%s` does not exist' % (repoid,))
|
|
|
return repo
|
|
|
|
|
|
|
|
|
def get_repo_group_or_error(repogroupid):
|
|
|
"""
|
|
|
Get repo group by id or name or return JsonRPCError if not found
|
|
|
|
|
|
:param repogroupid:
|
|
|
"""
|
|
|
from rhodecode.model.repo_group import RepoGroupModel
|
|
|
repo_group_model = RepoGroupModel()
|
|
|
|
|
|
if isinstance(repogroupid, int):
|
|
|
try:
|
|
|
repo_group = repo_group_model._get_repo_group(repogroupid)
|
|
|
except ValueError:
|
|
|
repo_group = None
|
|
|
else:
|
|
|
repo_group = repo_group_model.get_by_group_name(repogroupid)
|
|
|
|
|
|
if repo_group is None:
|
|
|
raise JSONRPCError(
|
|
|
'repository group `%s` does not exist' % (repogroupid,))
|
|
|
return repo_group
|
|
|
|
|
|
|
|
|
def get_user_group_or_error(usergroupid):
|
|
|
"""
|
|
|
Get user group by id or name or return JsonRPCError if not found
|
|
|
|
|
|
:param usergroupid:
|
|
|
"""
|
|
|
from rhodecode.model.user_group import UserGroupModel
|
|
|
user_group_model = UserGroupModel()
|
|
|
|
|
|
if isinstance(usergroupid, int):
|
|
|
try:
|
|
|
user_group = user_group_model.get_group(usergroupid)
|
|
|
except ValueError:
|
|
|
user_group = None
|
|
|
else:
|
|
|
user_group = user_group_model.get_by_name(usergroupid)
|
|
|
|
|
|
if user_group is None:
|
|
|
raise JSONRPCError(
|
|
|
'user group `%s` does not exist' % (usergroupid,))
|
|
|
return user_group
|
|
|
|
|
|
|
|
|
def get_perm_or_error(permid, prefix=None):
|
|
|
"""
|
|
|
Get permission by id or name or return JsonRPCError if not found
|
|
|
|
|
|
:param permid:
|
|
|
"""
|
|
|
from rhodecode.model.permission import PermissionModel
|
|
|
|
|
|
perm = PermissionModel.cls.get_by_key(permid)
|
|
|
if perm is None:
|
|
|
msg = 'permission `{}` does not exist.'.format(permid)
|
|
|
if prefix:
|
|
|
msg += ' Permission should start with prefix: `{}`'.format(prefix)
|
|
|
raise JSONRPCError(msg)
|
|
|
|
|
|
if prefix:
|
|
|
if not perm.permission_name.startswith(prefix):
|
|
|
raise JSONRPCError('permission `%s` is invalid, '
|
|
|
'should start with %s' % (permid, prefix))
|
|
|
return perm
|
|
|
|
|
|
|
|
|
def get_gist_or_error(gistid):
|
|
|
"""
|
|
|
Get gist by id or gist_access_id or return JsonRPCError if not found
|
|
|
|
|
|
:param gistid:
|
|
|
"""
|
|
|
from rhodecode.model.gist import GistModel
|
|
|
|
|
|
gist = GistModel.cls.get_by_access_id(gistid)
|
|
|
if gist is None:
|
|
|
raise JSONRPCError('gist `%s` does not exist' % (gistid,))
|
|
|
return gist
|
|
|
|
|
|
|
|
|
def get_pull_request_or_error(pullrequestid):
|
|
|
"""
|
|
|
Get pull request by id or return JsonRPCError if not found
|
|
|
|
|
|
:param pullrequestid:
|
|
|
"""
|
|
|
from rhodecode.model.pull_request import PullRequestModel
|
|
|
|
|
|
try:
|
|
|
pull_request = PullRequestModel().get(int(pullrequestid))
|
|
|
except ValueError:
|
|
|
raise JSONRPCError('pullrequestid must be an integer')
|
|
|
if not pull_request:
|
|
|
raise JSONRPCError('pull request `%s` does not exist' % (
|
|
|
pullrequestid,))
|
|
|
return pull_request
|
|
|
|
|
|
|
|
|
def build_commit_data(rhodecode_vcs_repo, commit, detail_level):
|
|
|
commit2 = commit
|
|
|
commit1 = commit.first_parent
|
|
|
|
|
|
parsed_diff = []
|
|
|
if detail_level == 'extended':
|
|
|
for f_path in commit.added_paths:
|
|
|
parsed_diff.append(_get_commit_dict(filename=f_path, op='A'))
|
|
|
for f_path in commit.changed_paths:
|
|
|
parsed_diff.append(_get_commit_dict(filename=f_path, op='M'))
|
|
|
for f_path in commit.removed_paths:
|
|
|
parsed_diff.append(_get_commit_dict(filename=f_path, op='D'))
|
|
|
|
|
|
elif detail_level == 'full':
|
|
|
from rhodecode.lib import diffs
|
|
|
|
|
|
_diff = rhodecode_vcs_repo.get_diff(commit1, commit2,)
|
|
|
diff_processor = diffs.DiffProcessor(_diff, format='newdiff', show_full_diff=True)
|
|
|
|
|
|
for dp in diff_processor.prepare():
|
|
|
del dp['stats']['ops']
|
|
|
_stats = dp['stats']
|
|
|
parsed_diff.append(_get_commit_dict(
|
|
|
filename=dp['filename'], op=dp['operation'],
|
|
|
new_revision=dp['new_revision'],
|
|
|
old_revision=dp['old_revision'],
|
|
|
raw_diff=dp['raw_diff'], stats=_stats))
|
|
|
|
|
|
return parsed_diff
|
|
|
|
|
|
|
|
|
def get_commit_or_error(ref, repo):
|
|
|
try:
|
|
|
ref_type, _, ref_hash = ref.split(':')
|
|
|
except ValueError:
|
|
|
raise JSONRPCError(
|
|
|
'Ref `{ref}` given in a wrong format. Please check the API'
|
|
|
' documentation for more details'.format(ref=ref))
|
|
|
try:
|
|
|
# TODO: dan: refactor this to use repo.scm_instance().get_commit()
|
|
|
# once get_commit supports ref_types
|
|
|
return get_commit_from_ref_name(repo, ref_hash)
|
|
|
except RepositoryError:
|
|
|
raise JSONRPCError('Ref `{ref}` does not exist'.format(ref=ref))
|
|
|
|
|
|
|
|
|
def _get_ref_hash(repo, type_, name):
|
|
|
vcs_repo = repo.scm_instance()
|
|
|
if type_ in ['branch'] and vcs_repo.alias in ('hg', 'git'):
|
|
|
return vcs_repo.branches[name]
|
|
|
elif type_ in ['bookmark', 'book'] and vcs_repo.alias == 'hg':
|
|
|
return vcs_repo.bookmarks[name]
|
|
|
else:
|
|
|
raise ValueError()
|
|
|
|
|
|
|
|
|
def resolve_ref_or_error(ref, repo, allowed_ref_types=None):
|
|
|
allowed_ref_types = allowed_ref_types or ['bookmark', 'book', 'tag', 'branch']
|
|
|
|
|
|
def _parse_ref(type_, name, hash_=None):
|
|
|
return type_, name, hash_
|
|
|
|
|
|
try:
|
|
|
ref_type, ref_name, ref_hash = _parse_ref(*ref.split(':'))
|
|
|
except TypeError:
|
|
|
raise JSONRPCError(
|
|
|
'Ref `{ref}` given in a wrong format. Please check the API'
|
|
|
' documentation for more details'.format(ref=ref))
|
|
|
|
|
|
if ref_type not in allowed_ref_types:
|
|
|
raise JSONRPCError(
|
|
|
'Ref `{ref}` type is not allowed. '
|
|
|
'Only:{allowed_refs} are possible.'.format(
|
|
|
ref=ref, allowed_refs=allowed_ref_types))
|
|
|
|
|
|
try:
|
|
|
ref_hash = ref_hash or _get_ref_hash(repo, ref_type, ref_name)
|
|
|
except (KeyError, ValueError):
|
|
|
raise JSONRPCError(
|
|
|
'The specified value:{type}:`{name}` does not exist, or is not allowed.'.format(
|
|
|
type=ref_type, name=ref_name))
|
|
|
|
|
|
return ':'.join([ref_type, ref_name, ref_hash])
|
|
|
|
|
|
|
|
|
def _get_commit_dict(
|
|
|
filename, op, new_revision=None, old_revision=None,
|
|
|
raw_diff=None, stats=None):
|
|
|
if stats is None:
|
|
|
stats = {
|
|
|
"added": None,
|
|
|
"binary": None,
|
|
|
"deleted": None
|
|
|
}
|
|
|
return {
|
|
|
"filename": safe_unicode(filename),
|
|
|
"op": op,
|
|
|
|
|
|
# extra details
|
|
|
"new_revision": new_revision,
|
|
|
"old_revision": old_revision,
|
|
|
|
|
|
"raw_diff": raw_diff,
|
|
|
"stats": stats
|
|
|
}
|
|
|
|