##// END OF EJS Templates
Added diff option into git and hg changeset objects, representing git formated patch against parent1
Added diff option into git and hg changeset objects, representing git formated patch against parent1

File last commit:

r2379:7ac09514 beta
r2384:5563af83 beta
Show More
api.py
706 lines | 22.8 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
"""
rhodecode.controllers.api
~~~~~~~~~~~~~~~~~~~~~~~~~
API controller for RhodeCode
:created_on: Aug 20, 2011
:author: marcink
:copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
"""
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your opinion) any later version of the license.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
import traceback
import logging
from rhodecode.controllers.api import JSONRPCController, JSONRPCError
from rhodecode.lib.auth import HasPermissionAllDecorator, \
HasPermissionAnyDecorator, PasswordGenerator, AuthUser
from rhodecode.model.meta import Session
from rhodecode.model.scm import ScmModel
from rhodecode.model.db import User, UsersGroup, Repository
from rhodecode.model.repo import RepoModel
from rhodecode.model.user import UserModel
from rhodecode.model.users_group import UsersGroupModel
from rhodecode.lib.utils import map_groups
log = logging.getLogger(__name__)
class ApiController(JSONRPCController):
"""
API Controller
Each method needs to have USER as argument this is then based on given
API_KEY propagated as instance of user object
Preferably this should be first argument also
Each function should also **raise** JSONRPCError for any
errors that happens
"""
@HasPermissionAllDecorator('hg.admin')
def pull(self, apiuser, repo_name):
"""
Dispatch pull action on given repo
:param user:
:param repo_name:
"""
if Repository.is_valid(repo_name) is False:
raise JSONRPCError('Unknown repo "%s"' % repo_name)
try:
ScmModel().pull_changes(repo_name, self.rhodecode_user.username)
return 'Pulled from %s' % repo_name
except Exception:
raise JSONRPCError('Unable to pull changes from "%s"' % repo_name)
@HasPermissionAllDecorator('hg.admin')
def get_user(self, apiuser, userid):
""""
Get a user by username
:param apiuser:
:param username:
"""
user = UserModel().get_user(userid)
if user is None:
return user
return dict(
id=user.user_id,
username=user.username,
firstname=user.name,
lastname=user.lastname,
email=user.email,
active=user.active,
admin=user.admin,
ldap_dn=user.ldap_dn,
last_login=user.last_login,
permissions=AuthUser(user_id=user.user_id).permissions
)
@HasPermissionAllDecorator('hg.admin')
def get_users(self, apiuser):
""""
Get all users
:param apiuser:
"""
result = []
for user in User.getAll():
result.append(
dict(
id=user.user_id,
username=user.username,
firstname=user.name,
lastname=user.lastname,
email=user.email,
active=user.active,
admin=user.admin,
ldap_dn=user.ldap_dn,
last_login=user.last_login,
)
)
return result
@HasPermissionAllDecorator('hg.admin')
def create_user(self, apiuser, username, email, password, firstname=None,
lastname=None, active=True, admin=False, ldap_dn=None):
"""
Create new user
:param apiuser:
:param username:
:param password:
:param email:
:param name:
:param lastname:
:param active:
:param admin:
:param ldap_dn:
"""
if User.get_by_username(username):
raise JSONRPCError("user %s already exist" % username)
if User.get_by_email(email, case_insensitive=True):
raise JSONRPCError("email %s already exist" % email)
if ldap_dn:
# generate temporary password if ldap_dn
password = PasswordGenerator().gen_password(length=8)
try:
user = UserModel().create_or_update(
username, password, email, firstname,
lastname, active, admin, ldap_dn
)
Session.commit()
return dict(
id=user.user_id,
msg='created new user %s' % username,
user=dict(
id=user.user_id,
username=user.username,
firstname=user.name,
lastname=user.lastname,
email=user.email,
active=user.active,
admin=user.admin,
ldap_dn=user.ldap_dn,
last_login=user.last_login,
)
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to create user %s' % username)
@HasPermissionAllDecorator('hg.admin')
def update_user(self, apiuser, userid, username, password, email,
firstname, lastname, active, admin, ldap_dn):
"""
Updates given user
:param apiuser:
:param username:
:param password:
:param email:
:param name:
:param lastname:
:param active:
:param admin:
:param ldap_dn:
"""
usr = UserModel().get_user(userid)
if not usr:
raise JSONRPCError("user ID:%s does not exist" % userid)
try:
usr = UserModel().create_or_update(
username, password, email, firstname,
lastname, active, admin, ldap_dn
)
Session.commit()
return dict(
id=usr.user_id,
msg='updated user ID:%s %s' % (usr.user_id, usr.username)
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to update user %s' % userid)
@HasPermissionAllDecorator('hg.admin')
def delete_user(self, apiuser, userid):
""""
Deletes an user
:param apiuser:
"""
usr = UserModel().get_user(userid)
if not usr:
raise JSONRPCError("user ID:%s does not exist" % userid)
try:
UserModel().delete(userid)
Session.commit()
return dict(
id=usr.user_id,
msg='deleted user ID:%s %s' % (usr.user_id, usr.username)
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to delete ID:%s %s' % (usr.user_id,
usr.username))
@HasPermissionAllDecorator('hg.admin')
def get_users_group(self, apiuser, group_name):
""""
Get users group by name
:param apiuser:
:param group_name:
"""
users_group = UsersGroup.get_by_group_name(group_name)
if not users_group:
return None
members = []
for user in users_group.members:
user = user.user
members.append(dict(id=user.user_id,
username=user.username,
firstname=user.name,
lastname=user.lastname,
email=user.email,
active=user.active,
admin=user.admin,
ldap=user.ldap_dn))
return dict(id=users_group.users_group_id,
group_name=users_group.users_group_name,
active=users_group.users_group_active,
members=members)
@HasPermissionAllDecorator('hg.admin')
def get_users_groups(self, apiuser):
""""
Get all users groups
:param apiuser:
"""
result = []
for users_group in UsersGroup.getAll():
members = []
for user in users_group.members:
user = user.user
members.append(dict(id=user.user_id,
username=user.username,
firstname=user.name,
lastname=user.lastname,
email=user.email,
active=user.active,
admin=user.admin,
ldap=user.ldap_dn))
result.append(dict(id=users_group.users_group_id,
group_name=users_group.users_group_name,
active=users_group.users_group_active,
members=members))
return result
@HasPermissionAllDecorator('hg.admin')
def create_users_group(self, apiuser, group_name, active=True):
"""
Creates an new usergroup
:param group_name:
:param active:
"""
if self.get_users_group(apiuser, group_name):
raise JSONRPCError("users group %s already exist" % group_name)
try:
ug = UsersGroupModel().create(name=group_name, active=active)
Session.commit()
return dict(id=ug.users_group_id,
msg='created new users group %s' % group_name)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to create group %s' % group_name)
@HasPermissionAllDecorator('hg.admin')
def add_user_to_users_group(self, apiuser, group_name, username):
""""
Add a user to a users group
:param apiuser:
:param group_name:
:param username:
"""
try:
users_group = UsersGroup.get_by_group_name(group_name)
if not users_group:
raise JSONRPCError('unknown users group %s' % group_name)
user = User.get_by_username(username)
if user is None:
raise JSONRPCError('unknown user %s' % username)
ugm = UsersGroupModel().add_user_to_group(users_group, user)
success = True if ugm != True else False
msg = 'added member %s to users group %s' % (username, group_name)
msg = msg if success else 'User is already in that group'
Session.commit()
return dict(
id=ugm.users_group_member_id if ugm != True else None,
success=success,
msg=msg
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to add users group member')
@HasPermissionAllDecorator('hg.admin')
def remove_user_from_users_group(self, apiuser, group_name, username):
"""
Remove user from a group
:param apiuser
:param group_name
:param username
"""
try:
users_group = UsersGroup.get_by_group_name(group_name)
if not users_group:
raise JSONRPCError('unknown users group %s' % group_name)
user = User.get_by_username(username)
if user is None:
raise JSONRPCError('unknown user %s' % username)
success = UsersGroupModel().remove_user_from_group(users_group, user)
msg = 'removed member %s from users group %s' % (username, group_name)
msg = msg if success else "User wasn't in group"
Session.commit()
return dict(success=success, msg=msg)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to remove user from group')
@HasPermissionAnyDecorator('hg.admin')
def get_repo(self, apiuser, repoid):
""""
Get repository by name
:param apiuser:
:param repo_name:
"""
repo = RepoModel().get_repo(repoid)
if repo is None:
raise JSONRPCError('unknown repository "%s"' % (repo or repoid))
members = []
for user in repo.repo_to_perm:
perm = user.permission.permission_name
user = user.user
members.append(
dict(
type="user",
id=user.user_id,
username=user.username,
firstname=user.name,
lastname=user.lastname,
email=user.email,
active=user.active,
admin=user.admin,
ldap=user.ldap_dn,
permission=perm
)
)
for users_group in repo.users_group_to_perm:
perm = users_group.permission.permission_name
users_group = users_group.users_group
members.append(
dict(
type="users_group",
id=users_group.users_group_id,
name=users_group.users_group_name,
active=users_group.users_group_active,
permission=perm
)
)
return dict(
id=repo.repo_id,
repo_name=repo.repo_name,
type=repo.repo_type,
clone_uri=repo.clone_uri,
private=repo.private,
created_on=repo.created_on,
description=repo.description,
members=members
)
@HasPermissionAnyDecorator('hg.admin')
def get_repos(self, apiuser):
""""
Get all repositories
:param apiuser:
"""
result = []
for repo in Repository.getAll():
result.append(
dict(
id=repo.repo_id,
repo_name=repo.repo_name,
type=repo.repo_type,
clone_uri=repo.clone_uri,
private=repo.private,
created_on=repo.created_on,
description=repo.description,
)
)
return result
@HasPermissionAnyDecorator('hg.admin')
def get_repo_nodes(self, apiuser, repo_name, revision, root_path,
ret_type='all'):
"""
returns a list of nodes and it's children
for a given path at given revision. It's possible to specify ret_type
to show only files or dirs
:param apiuser:
:param repo_name: name of repository
:param revision: revision for which listing should be done
:param root_path: path from which start displaying
:param ret_type: return type 'all|files|dirs' nodes
"""
try:
_d, _f = ScmModel().get_nodes(repo_name, revision, root_path,
flat=False)
_map = {
'all': _d + _f,
'files': _f,
'dirs': _d,
}
return _map[ret_type]
except KeyError:
raise JSONRPCError('ret_type must be one of %s' % _map.keys())
except Exception, e:
raise JSONRPCError(e)
@HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
def create_repo(self, apiuser, repo_name, owner_name, description='',
repo_type='hg', private=False, clone_uri=None):
"""
Create repository, if clone_url is given it makes a remote clone
:param apiuser:
:param repo_name:
:param owner_name:
:param description:
:param repo_type:
:param private:
:param clone_uri:
"""
try:
owner = User.get_by_username(owner_name)
if owner is None:
raise JSONRPCError('unknown user %s' % owner_name)
if Repository.get_by_repo_name(repo_name):
raise JSONRPCError("repo %s already exist" % repo_name)
groups = repo_name.split(Repository.url_sep())
real_name = groups[-1]
# create structure of groups
group = map_groups(repo_name)
repo = RepoModel().create(
dict(
repo_name=real_name,
repo_name_full=repo_name,
description=description,
private=private,
repo_type=repo_type,
repo_group=group.group_id if group else None,
clone_uri=clone_uri
)
)
Session.commit()
return dict(
id=repo.repo_id,
msg="Created new repository %s" % (repo.repo_name),
repo=dict(
id=repo.repo_id,
repo_name=repo.repo_name,
type=repo.repo_type,
clone_uri=repo.clone_uri,
private=repo.private,
created_on=repo.created_on,
description=repo.description,
)
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to create repository %s' % repo_name)
@HasPermissionAnyDecorator('hg.admin')
def delete_repo(self, apiuser, repo_name):
"""
Deletes a given repository
:param repo_name:
"""
if not Repository.get_by_repo_name(repo_name):
raise JSONRPCError("repo %s does not exist" % repo_name)
try:
RepoModel().delete(repo_name)
Session.commit()
return dict(
msg='Deleted repository %s' % repo_name
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to delete repository %s' % repo_name)
@HasPermissionAnyDecorator('hg.admin')
def grant_user_permission(self, apiuser, repo_name, username, perm):
"""
Grant permission for user on given repository, or update existing one
if found
:param repo_name:
:param username:
:param perm:
"""
try:
repo = Repository.get_by_repo_name(repo_name)
if repo is None:
raise JSONRPCError('unknown repository %s' % repo)
user = User.get_by_username(username)
if user is None:
raise JSONRPCError('unknown user %s' % username)
RepoModel().grant_user_permission(repo=repo, user=user, perm=perm)
Session.commit()
return dict(
msg='Granted perm: %s for user: %s in repo: %s' % (
perm, username, repo_name
)
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to edit permission %(repo)s for %(user)s' % dict(
user=username, repo=repo_name
)
)
@HasPermissionAnyDecorator('hg.admin')
def revoke_user_permission(self, apiuser, repo_name, username):
"""
Revoke permission for user on given repository
:param repo_name:
:param username:
"""
try:
repo = Repository.get_by_repo_name(repo_name)
if repo is None:
raise JSONRPCError('unknown repository %s' % repo)
user = User.get_by_username(username)
if user is None:
raise JSONRPCError('unknown user %s' % username)
RepoModel().revoke_user_permission(repo=repo_name, user=username)
Session.commit()
return dict(
msg='Revoked perm for user: %s in repo: %s' % (
username, repo_name
)
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to edit permission %(repo)s for %(user)s' % dict(
user=username, repo=repo_name
)
)
@HasPermissionAnyDecorator('hg.admin')
def grant_users_group_permission(self, apiuser, repo_name, group_name, perm):
"""
Grant permission for users group on given repository, or update
existing one if found
:param repo_name:
:param group_name:
:param perm:
"""
try:
repo = Repository.get_by_repo_name(repo_name)
if repo is None:
raise JSONRPCError('unknown repository %s' % repo)
user_group = UsersGroup.get_by_group_name(group_name)
if user_group is None:
raise JSONRPCError('unknown users group %s' % user_group)
RepoModel().grant_users_group_permission(repo=repo_name,
group_name=group_name,
perm=perm)
Session.commit()
return dict(
msg='Granted perm: %s for group: %s in repo: %s' % (
perm, group_name, repo_name
)
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to edit permission %(repo)s for %(usersgr)s' % dict(
usersgr=group_name, repo=repo_name
)
)
@HasPermissionAnyDecorator('hg.admin')
def revoke_users_group_permission(self, apiuser, repo_name, group_name):
"""
Revoke permission for users group on given repository
:param repo_name:
:param group_name:
"""
try:
repo = Repository.get_by_repo_name(repo_name)
if repo is None:
raise JSONRPCError('unknown repository %s' % repo)
user_group = UsersGroup.get_by_group_name(group_name)
if user_group is None:
raise JSONRPCError('unknown users group %s' % user_group)
RepoModel().revoke_users_group_permission(repo=repo_name,
group_name=group_name)
Session.commit()
return dict(
msg='Revoked perm for group: %s in repo: %s' % (
group_name, repo_name
)
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to edit permission %(repo)s for %(usersgr)s' % dict(
usersgr=group_name, repo=repo_name
)
)