##// END OF EJS Templates
raise na OSERROR if repository data sent from git hook to hook handler is somehow invalid
raise na OSERROR if repository data sent from git hook to hook handler is somehow invalid

File last commit:

r2709:d2d35cf2 beta
r2818:3d0bd5f7 beta
Show More
users_group.py
192 lines | 6.4 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
"""
rhodecode.model.users_group
~~~~~~~~~~~~~~~~~~~~~~~~~~~
users group model for RhodeCode
:created_on: Oct 1, 2011
:author: nvinot
:copyright: (C) 2011-2011 Nicolas Vinot <aeris@imirhil.fr>
:copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
"""
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import traceback
from rhodecode.model import BaseModel
from rhodecode.model.db import UsersGroupMember, UsersGroup,\
UsersGroupRepoToPerm, Permission, UsersGroupToPerm, User
from rhodecode.lib.exceptions import UsersGroupsAssignedException
log = logging.getLogger(__name__)
class UsersGroupModel(BaseModel):
cls = UsersGroup
def __get_users_group(self, users_group):
return self._get_instance(UsersGroup, users_group,
callback=UsersGroup.get_by_group_name)
def get(self, users_group_id, cache=False):
return UsersGroup.get(users_group_id)
def get_group(self, users_group):
return self.__get_users_group(users_group)
def get_by_name(self, name, cache=False, case_insensitive=False):
return UsersGroup.get_by_group_name(name, cache, case_insensitive)
def create(self, name, active=True):
try:
new = UsersGroup()
new.users_group_name = name
new.users_group_active = active
self.sa.add(new)
return new
except:
log.error(traceback.format_exc())
raise
def update(self, users_group, form_data):
try:
users_group = self.__get_users_group(users_group)
for k, v in form_data.items():
if k == 'users_group_members':
users_group.members = []
self.sa.flush()
members_list = []
if v:
v = [v] if isinstance(v, basestring) else v
for u_id in set(v):
member = UsersGroupMember(users_group.users_group_id, u_id)
members_list.append(member)
setattr(users_group, 'members', members_list)
setattr(users_group, k, v)
self.sa.add(users_group)
except:
log.error(traceback.format_exc())
raise
def delete(self, users_group, force=False):
"""
Deletes repos group, unless force flag is used
raises exception if there are members in that group, else deletes
group and users
:param users_group:
:param force:
"""
try:
users_group = self.__get_users_group(users_group)
# check if this group is not assigned to repo
assigned_groups = UsersGroupRepoToPerm.query()\
.filter(UsersGroupRepoToPerm.users_group == users_group).all()
if assigned_groups and force is False:
raise UsersGroupsAssignedException('RepoGroup assigned to %s' %
assigned_groups)
self.sa.delete(users_group)
except:
log.error(traceback.format_exc())
raise
def add_user_to_group(self, users_group, user):
users_group = self.__get_users_group(users_group)
user = self._get_user(user)
for m in users_group.members:
u = m.user
if u.user_id == user.user_id:
return True
try:
users_group_member = UsersGroupMember()
users_group_member.user = user
users_group_member.users_group = users_group
users_group.members.append(users_group_member)
user.group_member.append(users_group_member)
self.sa.add(users_group_member)
return users_group_member
except:
log.error(traceback.format_exc())
raise
def remove_user_from_group(self, users_group, user):
users_group = self.__get_users_group(users_group)
user = self._get_user(user)
users_group_member = None
for m in users_group.members:
if m.user.user_id == user.user_id:
# Found this user's membership row
users_group_member = m
break
if users_group_member:
try:
self.sa.delete(users_group_member)
return True
except:
log.error(traceback.format_exc())
raise
else:
# User isn't in that group
return False
def has_perm(self, users_group, perm):
users_group = self.__get_users_group(users_group)
perm = self._get_perm(perm)
return UsersGroupToPerm.query()\
.filter(UsersGroupToPerm.users_group == users_group)\
.filter(UsersGroupToPerm.permission == perm).scalar() is not None
def grant_perm(self, users_group, perm):
users_group = self.__get_users_group(users_group)
perm = self._get_perm(perm)
# if this permission is already granted skip it
_perm = UsersGroupToPerm.query()\
.filter(UsersGroupToPerm.users_group == users_group)\
.filter(UsersGroupToPerm.permission == perm)\
.scalar()
if _perm:
return
new = UsersGroupToPerm()
new.users_group = users_group
new.permission = perm
self.sa.add(new)
def revoke_perm(self, users_group, perm):
users_group = self.__get_users_group(users_group)
perm = self._get_perm(perm)
obj = UsersGroupToPerm.query()\
.filter(UsersGroupToPerm.users_group == users_group)\
.filter(UsersGroupToPerm.permission == perm).scalar()
if obj:
self.sa.delete(obj)