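# -*- coding: utf-8 -*-
"""
    rhodecode.model.permission
    ~~~~~~~~~~~~~~~~~~~~~~~~~~

    permissions model for RhodeCode

    :created_on: Aug 20, 2010
    :author: marcink
    :copyright: (C) 2010-2012 Marcin Kuzminski
    :license: GPLv3, see COPYING for more details.
"""
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import logging
import traceback

from sqlalchemy.exc import DatabaseError

from rhodecode.lib.caching_query import FromCache

from rhodecode.model import BaseModel
from rhodecode.model.db import User, Permission, UserToPerm, UserRepoToPerm,\
    UserRepoGroupToPerm

log = logging.getLogger(__name__)


class PermissionModel(BaseModel):
    """
    Permissions model for RhodeCode
    """

    cls = Permission

    # Permission-name prefix of each default permission family, paired with
    # the form field that carries its replacement. The prefixes are mutually
    # exclusive, so the order only documents the original branch order.
    _DEFAULT_PERM_FORM_KEYS = (
        ('repository.', 'default_repo_perm'),
        ('group.', 'default_group_perm'),
        ('hg.register.', 'default_register'),
        ('hg.create.', 'default_create'),
        ('hg.fork.', 'default_fork'),
    )

    def get_permission(self, permission_id, cache=False):
        """
        Gets a permission by id

        :param permission_id: id of permission to get from database
        :param cache: use Cache for this query
        """
        perm = self.sa.query(Permission)
        if cache:
            perm = perm.options(FromCache("sql_cache_short",
                                          "get_permission_%s" % permission_id))
        return perm.get(permission_id)

    def get_permission_by_name(self, name, cache=False):
        """
        Gets a permission by given name

        :param name: name to fetch
        :param cache: Use cache for this query
        :returns: the matching Permission, or None when no row matches
        """
        perm = self.sa.query(Permission)\
            .filter(Permission.permission_name == name)
        if cache:
            # NOTE(review): this shares the "get_permission_%s" key format
            # with get_permission(); an id and a name that render to the same
            # string would land in the same cache slot - confirm permission
            # names can never be purely numeric.
            perm = perm.options(FromCache("sql_cache_short",
                                          "get_permission_%s" % name))
        return perm.scalar()

    def _get_default_permission(self, name, prefix):
        """
        Resolve a permission name taken from the defaults form and check that
        it belongs to the permission family it is about to replace.

        The default user's permissions are the baseline for every account, so
        a name that does not resolve, or that resolves into another family,
        is rejected here instead of being written to the session.

        :param name: permission name as submitted in the form
        :param prefix: permission-name prefix of the family being replaced
        :raises Exception: when the name is unknown or outside the family
        """
        perm = self.get_permission_by_name(name)
        if perm is None:
            raise Exception('Permission `%s` is not defined' % (name,))
        if not perm.permission_name.startswith(prefix):
            raise Exception('Permission `%s` cannot be used as a `%s*` '
                            'default' % (perm.permission_name, prefix))
        return perm

    def update(self, form_result):
        """
        Apply the default-permissions form to the user named in it.

        Stage 1 swaps that user's global defaults, stage 2 optionally pushes
        the new repository / group default onto the existing per-repository
        and per-group entries, stage 3 toggles anonymous access. Changes are
        only added to the session; nothing is committed in this method, so a
        caller that receives an exception is expected to discard the session
        state (NOTE(review): confirm the caller rolls back).

        :param form_result: dict with the keys `perm_user_name`,
            `default_repo_perm`, `default_group_perm`, `default_register`,
            `default_create`, `default_fork`, `overwrite_default_repo`,
            `overwrite_default_group` and `anonymous`
        :raises Exception: when the user is missing, the stored permission
            set is inconsistent, or a submitted permission is not acceptable
        """
        perm_user = self.sa.query(User)\
            .filter(User.username == form_result['perm_user_name']).scalar()
        if perm_user is None:
            # without this the query below would compare against NULL and
            # the failure would surface as a misleading count mismatch
            raise Exception('User `%s` does not exist'
                            % (form_result['perm_user_name'],))

        u2p = self.sa.query(UserToPerm)\
            .filter(UserToPerm.user == perm_user).all()
        if len(u2p) != len(User.DEFAULT_PERMISSIONS):
            raise Exception('Defined: %s should be %s  permissions for default'
                            ' user. This should not happen please verify'
                            ' your database' % (len(u2p),
                                                len(User.DEFAULT_PERMISSIONS)))

        try:
            # stage 1 change defaults
            for p in u2p:
                current_name = p.permission.permission_name
                for prefix, form_key in self._DEFAULT_PERM_FORM_KEYS:
                    if current_name.startswith(prefix):
                        # the replacement must stay in the same family,
                        # otherwise this entry would stop matching any prefix
                        # on the next update and could grant an unrelated
                        # permission to the default user
                        p.permission = self._get_default_permission(
                            form_result[form_key], prefix)
                        self.sa.add(p)
                        break

            # stage 2 update all default permissions for repos if checked
            if form_result['overwrite_default_repo'] == True:
                _def_name = form_result['default_repo_perm']\
                    .split('repository.')[-1]
                _def = self._get_default_permission(
                    'repository.' + _def_name, 'repository.')
                # repos
                for r2p in self.sa.query(UserRepoToPerm)\
                        .filter(UserRepoToPerm.user == perm_user)\
                        .all():
                    # don't reset PRIVATE repositories; the identity test
                    # also leaves rows whose `private` value is NULL untouched
                    if r2p.repository.private is False:
                        r2p.permission = _def
                        self.sa.add(r2p)

            if form_result['overwrite_default_group'] == True:
                _def_name = form_result['default_group_perm']\
                    .split('group.')[-1]
                # groups
                _def = self._get_default_permission(
                    'group.' + _def_name, 'group.')
                for g2p in self.sa.query(UserRepoGroupToPerm)\
                        .filter(UserRepoGroupToPerm.user == perm_user)\
                        .all():
                    g2p.permission = _def
                    self.sa.add(g2p)

            # stage 3 set anonymous access
            if perm_user.username == 'default':
                perm_user.active = bool(form_result['anonymous'])
                self.sa.add(perm_user)

        except (DatabaseError,):
            log.error(traceback.format_exc())
            raise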