##// END OF EJS Templates
added some failing tests for compare using cherry pick changesets, to be fixed later
added some failing tests for compare using cherry pick changesets, to be fixed later

File last commit:

r3235:d6029dac beta
r3331:0379e15f beta
Show More
api.py
958 lines | 31.7 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
"""
rhodecode.controllers.api
~~~~~~~~~~~~~~~~~~~~~~~~~
API controller for RhodeCode
:created_on: Aug 20, 2011
:author: marcink
:copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
"""
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your opinion) any later version of the license.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
import traceback
import logging
from pylons.controllers.util import abort
from rhodecode.controllers.api import JSONRPCController, JSONRPCError
from rhodecode.lib.auth import PasswordGenerator, AuthUser, \
HasPermissionAllDecorator, HasPermissionAnyDecorator, \
HasPermissionAnyApi, HasRepoPermissionAnyApi
from rhodecode.lib.utils import map_groups, repo2db_mapper
from rhodecode.model.meta import Session
from rhodecode.model.scm import ScmModel
from rhodecode.model.repo import RepoModel
from rhodecode.model.user import UserModel
from rhodecode.model.users_group import UsersGroupModel
from rhodecode.model.permission import PermissionModel
from rhodecode.model.db import Repository, RhodeCodeSetting, UserIpMap
log = logging.getLogger(__name__)
class OptionalAttr(object):
"""
Special Optional Option that defines other attribute
"""
def __init__(self, attr_name):
self.attr_name = attr_name
def __repr__(self):
return '<OptionalAttr:%s>' % self.attr_name
def __call__(self):
return self
#alias
OAttr = OptionalAttr
class Optional(object):
"""
Defines an optional parameter::
param = param.getval() if isinstance(param, Optional) else param
param = param() if isinstance(param, Optional) else param
is equivalent of::
param = Optional.extract(param)
"""
def __init__(self, type_):
self.type_ = type_
def __repr__(self):
return '<Optional:%s>' % self.type_.__repr__()
def __call__(self):
return self.getval()
def getval(self):
"""
returns value from this Optional instance
"""
return self.type_
@classmethod
def extract(cls, val):
if isinstance(val, cls):
return val.getval()
return val
def get_user_or_error(userid):
"""
Get user by id or name or return JsonRPCError if not found
:param userid:
"""
user = UserModel().get_user(userid)
if user is None:
raise JSONRPCError("user `%s` does not exist" % userid)
return user
def get_repo_or_error(repoid):
"""
Get repo by id or name or return JsonRPCError if not found
:param userid:
"""
repo = RepoModel().get_repo(repoid)
if repo is None:
raise JSONRPCError('repository `%s` does not exist' % (repoid))
return repo
def get_users_group_or_error(usersgroupid):
"""
Get users group by id or name or return JsonRPCError if not found
:param userid:
"""
users_group = UsersGroupModel().get_group(usersgroupid)
if users_group is None:
raise JSONRPCError('users group `%s` does not exist' % usersgroupid)
return users_group
def get_perm_or_error(permid):
"""
Get permission by id or name or return JsonRPCError if not found
:param userid:
"""
perm = PermissionModel().get_permission_by_name(permid)
if perm is None:
raise JSONRPCError('permission `%s` does not exist' % (permid))
return perm
class ApiController(JSONRPCController):
"""
API Controller
Each method needs to have USER as argument this is then based on given
API_KEY propagated as instance of user object
Preferably this should be first argument also
Each function should also **raise** JSONRPCError for any
errors that happens
"""
@HasPermissionAllDecorator('hg.admin')
def pull(self, apiuser, repoid):
"""
Dispatch pull action on given repo
:param apiuser:
:param repoid:
"""
repo = get_repo_or_error(repoid)
try:
ScmModel().pull_changes(repo.repo_name,
self.rhodecode_user.username)
return 'Pulled from `%s`' % repo.repo_name
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'Unable to pull changes from `%s`' % repo.repo_name
)
@HasPermissionAllDecorator('hg.admin')
def rescan_repos(self, apiuser, remove_obsolete=Optional(False)):
"""
Dispatch rescan repositories action. If remove_obsolete is set
than also delete repos that are in database but not in the filesystem.
aka "clean zombies"
:param apiuser:
:param remove_obsolete:
"""
try:
rm_obsolete = Optional.extract(remove_obsolete)
added, removed = repo2db_mapper(ScmModel().repo_scan(),
remove_obsolete=rm_obsolete)
return {'added': added, 'removed': removed}
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'Error occurred during rescan repositories action'
)
def invalidate_cache(self, apiuser, repoid):
"""
Dispatch cache invalidation action on given repo
:param apiuser:
:param repoid:
"""
repo = get_repo_or_error(repoid)
if HasPermissionAnyApi('hg.admin')(user=apiuser) is False:
# check if we have admin permission for this repo !
if HasRepoPermissionAnyApi('repository.admin',
'repository.write')(user=apiuser,
repo_name=repo.repo_name) is False:
raise JSONRPCError('repository `%s` does not exist' % (repoid))
try:
invalidated_keys = ScmModel().mark_for_invalidation(repo.repo_name)
Session().commit()
return ('Cache for repository `%s` was invalidated: '
'invalidated cache keys: %s' % (repoid, invalidated_keys))
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'Error occurred during cache invalidation action'
)
def lock(self, apiuser, repoid, locked, userid=Optional(OAttr('apiuser'))):
"""
Set locking state on particular repository by given user, if
this command is runned by non-admin account userid is set to user
who is calling this method
:param apiuser:
:param repoid:
:param userid:
:param locked:
"""
repo = get_repo_or_error(repoid)
if HasPermissionAnyApi('hg.admin')(user=apiuser):
pass
elif HasRepoPermissionAnyApi('repository.admin',
'repository.write')(user=apiuser,
repo_name=repo.repo_name):
#make sure normal user does not pass someone else userid,
#he is not allowed to do that
if not isinstance(userid, Optional) and userid != apiuser.user_id:
raise JSONRPCError(
'userid is not the same as your user'
)
else:
raise JSONRPCError('repository `%s` does not exist' % (repoid))
if isinstance(userid, Optional):
userid = apiuser.user_id
user = get_user_or_error(userid)
locked = bool(locked)
try:
if locked:
Repository.lock(repo, user.user_id)
else:
Repository.unlock(repo)
return ('User `%s` set lock state for repo `%s` to `%s`'
% (user.username, repo.repo_name, locked))
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'Error occurred locking repository `%s`' % repo.repo_name
)
@HasPermissionAllDecorator('hg.admin')
def show_ip(self, apiuser, userid):
"""
Shows IP address as seen from RhodeCode server, together with all
defined IP addresses for given user
:param apiuser:
:param userid:
"""
user = get_user_or_error(userid)
ips = UserIpMap.query().filter(UserIpMap.user == user).all()
return dict(
ip_addr_server=self.ip_addr,
user_ips=ips
)
def get_user(self, apiuser, userid=Optional(OAttr('apiuser'))):
""""
Get a user by username, or userid, if userid is given
:param apiuser:
:param userid:
"""
if HasPermissionAnyApi('hg.admin')(user=apiuser) is False:
#make sure normal user does not pass someone else userid,
#he is not allowed to do that
if not isinstance(userid, Optional) and userid != apiuser.user_id:
raise JSONRPCError(
'userid is not the same as your user'
)
if isinstance(userid, Optional):
userid = apiuser.user_id
user = get_user_or_error(userid)
data = user.get_api_data()
data['permissions'] = AuthUser(user_id=user.user_id).permissions
return data
@HasPermissionAllDecorator('hg.admin')
def get_users(self, apiuser):
""""
Get all users
:param apiuser:
"""
result = []
for user in UserModel().get_all():
result.append(user.get_api_data())
return result
@HasPermissionAllDecorator('hg.admin')
def create_user(self, apiuser, username, email, password,
firstname=Optional(None), lastname=Optional(None),
active=Optional(True), admin=Optional(False),
ldap_dn=Optional(None)):
"""
Create new user
:param apiuser:
:param username:
:param email:
:param password:
:param firstname:
:param lastname:
:param active:
:param admin:
:param ldap_dn:
"""
if UserModel().get_by_username(username):
raise JSONRPCError("user `%s` already exist" % username)
if UserModel().get_by_email(email, case_insensitive=True):
raise JSONRPCError("email `%s` already exist" % email)
if Optional.extract(ldap_dn):
# generate temporary password if ldap_dn
password = PasswordGenerator().gen_password(length=8)
try:
user = UserModel().create_or_update(
username=Optional.extract(username),
password=Optional.extract(password),
email=Optional.extract(email),
firstname=Optional.extract(firstname),
lastname=Optional.extract(lastname),
active=Optional.extract(active),
admin=Optional.extract(admin),
ldap_dn=Optional.extract(ldap_dn)
)
Session().commit()
return dict(
msg='created new user `%s`' % username,
user=user.get_api_data()
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to create user `%s`' % username)
@HasPermissionAllDecorator('hg.admin')
def update_user(self, apiuser, userid, username=Optional(None),
email=Optional(None), firstname=Optional(None),
lastname=Optional(None), active=Optional(None),
admin=Optional(None), ldap_dn=Optional(None),
password=Optional(None)):
"""
Updates given user
:param apiuser:
:param userid:
:param username:
:param email:
:param firstname:
:param lastname:
:param active:
:param admin:
:param ldap_dn:
:param password:
"""
user = get_user_or_error(userid)
# call function and store only updated arguments
updates = {}
def store_update(attr, name):
if not isinstance(attr, Optional):
updates[name] = attr
try:
store_update(username, 'username')
store_update(password, 'password')
store_update(email, 'email')
store_update(firstname, 'name')
store_update(lastname, 'lastname')
store_update(active, 'active')
store_update(admin, 'admin')
store_update(ldap_dn, 'ldap_dn')
user = UserModel().update_user(user, **updates)
Session().commit()
return dict(
msg='updated user ID:%s %s' % (user.user_id, user.username),
user=user.get_api_data()
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to update user `%s`' % userid)
@HasPermissionAllDecorator('hg.admin')
def delete_user(self, apiuser, userid):
""""
Deletes an user
:param apiuser:
:param userid:
"""
user = get_user_or_error(userid)
try:
UserModel().delete(userid)
Session().commit()
return dict(
msg='deleted user ID:%s %s' % (user.user_id, user.username),
user=None
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to delete ID:%s %s' % (user.user_id,
user.username))
@HasPermissionAllDecorator('hg.admin')
def get_users_group(self, apiuser, usersgroupid):
""""
Get users group by name or id
:param apiuser:
:param usersgroupid:
"""
users_group = get_users_group_or_error(usersgroupid)
data = users_group.get_api_data()
members = []
for user in users_group.members:
user = user.user
members.append(user.get_api_data())
data['members'] = members
return data
@HasPermissionAllDecorator('hg.admin')
def get_users_groups(self, apiuser):
""""
Get all users groups
:param apiuser:
"""
result = []
for users_group in UsersGroupModel().get_all():
result.append(users_group.get_api_data())
return result
@HasPermissionAllDecorator('hg.admin')
def create_users_group(self, apiuser, group_name, active=Optional(True)):
"""
Creates an new usergroup
:param apiuser:
:param group_name:
:param active:
"""
if UsersGroupModel().get_by_name(group_name):
raise JSONRPCError("users group `%s` already exist" % group_name)
try:
active = Optional.extract(active)
ug = UsersGroupModel().create(name=group_name, active=active)
Session().commit()
return dict(
msg='created new users group `%s`' % group_name,
users_group=ug.get_api_data()
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to create group `%s`' % group_name)
@HasPermissionAllDecorator('hg.admin')
def add_user_to_users_group(self, apiuser, usersgroupid, userid):
""""
Add a user to a users group
:param apiuser:
:param usersgroupid:
:param userid:
"""
user = get_user_or_error(userid)
users_group = get_users_group_or_error(usersgroupid)
try:
ugm = UsersGroupModel().add_user_to_group(users_group, user)
success = True if ugm != True else False
msg = 'added member `%s` to users group `%s`' % (
user.username, users_group.users_group_name
)
msg = msg if success else 'User is already in that group'
Session().commit()
return dict(
success=success,
msg=msg
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to add member to users group `%s`' % (
users_group.users_group_name
)
)
@HasPermissionAllDecorator('hg.admin')
def remove_user_from_users_group(self, apiuser, usersgroupid, userid):
"""
Remove user from a group
:param apiuser:
:param usersgroupid:
:param userid:
"""
user = get_user_or_error(userid)
users_group = get_users_group_or_error(usersgroupid)
try:
success = UsersGroupModel().remove_user_from_group(users_group,
user)
msg = 'removed member `%s` from users group `%s`' % (
user.username, users_group.users_group_name
)
msg = msg if success else "User wasn't in group"
Session().commit()
return dict(success=success, msg=msg)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to remove member from users group `%s`' % (
users_group.users_group_name
)
)
def get_repo(self, apiuser, repoid):
""""
Get repository by name
:param apiuser:
:param repoid:
"""
repo = get_repo_or_error(repoid)
if HasPermissionAnyApi('hg.admin')(user=apiuser) is False:
# check if we have admin permission for this repo !
if HasRepoPermissionAnyApi('repository.admin')(user=apiuser,
repo_name=repo.repo_name) is False:
raise JSONRPCError('repository `%s` does not exist' % (repoid))
members = []
followers = []
for user in repo.repo_to_perm:
perm = user.permission.permission_name
user = user.user
user_data = user.get_api_data()
user_data['type'] = "user"
user_data['permission'] = perm
members.append(user_data)
for users_group in repo.users_group_to_perm:
perm = users_group.permission.permission_name
users_group = users_group.users_group
users_group_data = users_group.get_api_data()
users_group_data['type'] = "users_group"
users_group_data['permission'] = perm
members.append(users_group_data)
for user in repo.followers:
followers.append(user.user.get_api_data())
data = repo.get_api_data()
data['members'] = members
data['followers'] = followers
return data
def get_repos(self, apiuser):
""""
Get all repositories
:param apiuser:
"""
result = []
if HasPermissionAnyApi('hg.admin')(user=apiuser) is False:
repos = RepoModel().get_all_user_repos(user=apiuser)
else:
repos = RepoModel().get_all()
for repo in repos:
result.append(repo.get_api_data())
return result
@HasPermissionAllDecorator('hg.admin')
def get_repo_nodes(self, apiuser, repoid, revision, root_path,
ret_type='all'):
"""
returns a list of nodes and it's children
for a given path at given revision. It's possible to specify ret_type
to show only files or dirs
:param apiuser:
:param repoid: name or id of repository
:param revision: revision for which listing should be done
:param root_path: path from which start displaying
:param ret_type: return type 'all|files|dirs' nodes
"""
repo = get_repo_or_error(repoid)
try:
_d, _f = ScmModel().get_nodes(repo, revision, root_path,
flat=False)
_map = {
'all': _d + _f,
'files': _f,
'dirs': _d,
}
return _map[ret_type]
except KeyError:
raise JSONRPCError('ret_type must be one of %s' % _map.keys())
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to get repo: `%s` nodes' % repo.repo_name
)
@HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
def create_repo(self, apiuser, repo_name, owner=Optional(OAttr('apiuser')),
repo_type=Optional('hg'),
description=Optional(''), private=Optional(False),
clone_uri=Optional(None), landing_rev=Optional('tip'),
enable_statistics=Optional(False),
enable_locking=Optional(False),
enable_downloads=Optional(False)):
"""
Create repository, if clone_url is given it makes a remote clone
if repo_name is within a group name the groups will be created
automatically if they aren't present
:param apiuser:
:param repo_name:
:param onwer:
:param repo_type:
:param description:
:param private:
:param clone_uri:
:param landing_rev:
"""
if HasPermissionAnyApi('hg.admin')(user=apiuser) is False:
if not isinstance(owner, Optional):
#forbid setting owner for non-admins
raise JSONRPCError(
'Only RhodeCode admin can specify `owner` param'
)
if isinstance(owner, Optional):
owner = apiuser.user_id
owner = get_user_or_error(owner)
if RepoModel().get_by_repo_name(repo_name):
raise JSONRPCError("repo `%s` already exist" % repo_name)
defs = RhodeCodeSetting.get_default_repo_settings(strip_prefix=True)
if isinstance(private, Optional):
private = defs.get('repo_private') or Optional.extract(private)
if isinstance(repo_type, Optional):
repo_type = defs.get('repo_type')
if isinstance(enable_statistics, Optional):
enable_statistics = defs.get('repo_enable_statistics')
if isinstance(enable_locking, Optional):
enable_locking = defs.get('repo_enable_locking')
if isinstance(enable_downloads, Optional):
enable_downloads = defs.get('repo_enable_downloads')
clone_uri = Optional.extract(clone_uri)
description = Optional.extract(description)
landing_rev = Optional.extract(landing_rev)
try:
# create structure of groups and return the last group
group = map_groups(repo_name)
repo = RepoModel().create_repo(
repo_name=repo_name,
repo_type=repo_type,
description=description,
owner=owner,
private=private,
clone_uri=clone_uri,
repos_group=group,
landing_rev=landing_rev,
enable_statistics=enable_statistics,
enable_downloads=enable_downloads,
enable_locking=enable_locking
)
Session().commit()
return dict(
msg="Created new repository `%s`" % (repo.repo_name),
repo=repo.get_api_data()
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to create repository `%s`' % repo_name)
@HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
def fork_repo(self, apiuser, repoid, fork_name, owner=Optional(OAttr('apiuser')),
description=Optional(''), copy_permissions=Optional(False),
private=Optional(False), landing_rev=Optional('tip')):
repo = get_repo_or_error(repoid)
repo_name = repo.repo_name
_repo = RepoModel().get_by_repo_name(fork_name)
if _repo:
type_ = 'fork' if _repo.fork else 'repo'
raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
if HasPermissionAnyApi('hg.admin')(user=apiuser):
pass
elif HasRepoPermissionAnyApi('repository.admin',
'repository.write',
'repository.read')(user=apiuser,
repo_name=repo.repo_name):
if not isinstance(owner, Optional):
#forbid setting owner for non-admins
raise JSONRPCError(
'Only RhodeCode admin can specify `owner` param'
)
else:
raise JSONRPCError('repository `%s` does not exist' % (repoid))
if isinstance(owner, Optional):
owner = apiuser.user_id
owner = get_user_or_error(owner)
try:
# create structure of groups and return the last group
group = map_groups(fork_name)
form_data = dict(
repo_name=fork_name,
repo_name_full=fork_name,
repo_group=group,
repo_type=repo.repo_type,
description=Optional.extract(description),
private=Optional.extract(private),
copy_permissions=Optional.extract(copy_permissions),
landing_rev=Optional.extract(landing_rev),
update_after_clone=False,
fork_parent_id=repo.repo_id,
)
RepoModel().create_fork(form_data, cur_user=owner)
return dict(
msg='Created fork of `%s` as `%s`' % (repo.repo_name,
fork_name),
success=True # cannot return the repo data here since fork
# cann be done async
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to fork repository `%s` as `%s`' % (repo_name,
fork_name)
)
def delete_repo(self, apiuser, repoid):
"""
Deletes a given repository
:param apiuser:
:param repoid:
"""
repo = get_repo_or_error(repoid)
if HasPermissionAnyApi('hg.admin')(user=apiuser) is False:
# check if we have admin permission for this repo !
if HasRepoPermissionAnyApi('repository.admin')(user=apiuser,
repo_name=repo.repo_name) is False:
raise JSONRPCError('repository `%s` does not exist' % (repoid))
try:
RepoModel().delete(repo)
Session().commit()
return dict(
msg='Deleted repository `%s`' % repo.repo_name,
success=True
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to delete repository `%s`' % repo.repo_name
)
@HasPermissionAllDecorator('hg.admin')
def grant_user_permission(self, apiuser, repoid, userid, perm):
"""
Grant permission for user on given repository, or update existing one
if found
:param repoid:
:param userid:
:param perm:
"""
repo = get_repo_or_error(repoid)
user = get_user_or_error(userid)
perm = get_perm_or_error(perm)
try:
RepoModel().grant_user_permission(repo=repo, user=user, perm=perm)
Session().commit()
return dict(
msg='Granted perm: `%s` for user: `%s` in repo: `%s`' % (
perm.permission_name, user.username, repo.repo_name
),
success=True
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to edit permission for user: `%s` in repo: `%s`' % (
userid, repoid
)
)
@HasPermissionAllDecorator('hg.admin')
def revoke_user_permission(self, apiuser, repoid, userid):
"""
Revoke permission for user on given repository
:param apiuser:
:param repoid:
:param userid:
"""
repo = get_repo_or_error(repoid)
user = get_user_or_error(userid)
try:
RepoModel().revoke_user_permission(repo=repo, user=user)
Session().commit()
return dict(
msg='Revoked perm for user: `%s` in repo: `%s`' % (
user.username, repo.repo_name
),
success=True
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to edit permission for user: `%s` in repo: `%s`' % (
userid, repoid
)
)
@HasPermissionAllDecorator('hg.admin')
def grant_users_group_permission(self, apiuser, repoid, usersgroupid,
perm):
"""
Grant permission for users group on given repository, or update
existing one if found
:param apiuser:
:param repoid:
:param usersgroupid:
:param perm:
"""
repo = get_repo_or_error(repoid)
perm = get_perm_or_error(perm)
users_group = get_users_group_or_error(usersgroupid)
try:
RepoModel().grant_users_group_permission(repo=repo,
group_name=users_group,
perm=perm)
Session().commit()
return dict(
msg='Granted perm: `%s` for users group: `%s` in '
'repo: `%s`' % (
perm.permission_name, users_group.users_group_name,
repo.repo_name
),
success=True
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to edit permission for users group: `%s` in '
'repo: `%s`' % (
usersgroupid, repo.repo_name
)
)
@HasPermissionAllDecorator('hg.admin')
def revoke_users_group_permission(self, apiuser, repoid, usersgroupid):
"""
Revoke permission for users group on given repository
:param apiuser:
:param repoid:
:param usersgroupid:
"""
repo = get_repo_or_error(repoid)
users_group = get_users_group_or_error(usersgroupid)
try:
RepoModel().revoke_users_group_permission(repo=repo,
group_name=users_group)
Session().commit()
return dict(
msg='Revoked perm for users group: `%s` in repo: `%s`' % (
users_group.users_group_name, repo.repo_name
),
success=True
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to edit permission for users group: `%s` in '
'repo: `%s`' % (
users_group.users_group_name, repo.repo_name
)
)