##// END OF EJS Templates
tests: skip reading Git system and global configuration in test_vcs_operations...
tests: skip reading Git system and global configuration in test_vcs_operations The parent changeset reduced the dependency on global configuration and made it possible to run tests without any global git configuration. But it is still unfortunate to even look at the global configuration when running tests. Global configuration is already disabled for Mercurial by setting HGRCPATH. Now do something similar for Git. According to the git man page, GIT_CONFIG_GLOBAL and GIT_CONFIG_SYSTEM set to /dev/null will make Git skip reading the configuration files on all platforms. Note that the GIT_CONFIG variables were introduced in Git 2.32.0, so this will not work with all the Git versions supported by Kallithea.

File last commit:

r8695:98cbebff stable
r8769:511b20a6 stable
Show More
auth.py
723 lines | 28.3 KiB | text/x-python | PythonLexer
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 # -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.lib.auth
~~~~~~~~~~~~~~~~~~
authentication and permission libraries
Bradley M. Kuhn
RhodeCode GmbH is not the sole author of this work
r4211 This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 :created_on: Apr 4, 2010
:author: marcink
Bradley M. Kuhn
RhodeCode GmbH is not the sole author of this work
r4211 :copyright: (c) 2013 RhodeCode GmbH, and others.
Bradley M. Kuhn
Correct licensing information in individual files....
r4208 :license: GPLv3, see LICENSE.md for more details.
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 """
import itertools
Mads Kiilerich
scripts: initial run of import cleanup using isort
r7718 import logging
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
lib: use ipaddr from pip instead of vendoring it
r7716 import ipaddr
Mads Kiilerich
scripts: initial run of import cleanup using isort
r7718 from decorator import decorator
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.exc import ObjectDeletedError
Mads Kiilerich
flake8: fix some F401 '...' imported but unused
r7719 from tg import request
Mads Kiilerich
tg: minimize future diff by some mocking and replacing some pylons imports with tg...
r6508 from tg.i18n import ugettext as _
Mads Kiilerich
flake8: fix some F401 '...' imported but unused
r7719 from webob.exc import HTTPForbidden, HTTPFound
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
db: introduce kallithea.DEFAULT_USER_ID to avoid repeated get_default_user()
r8230 import kallithea
Mads Kiilerich
lib: move webhelpers2 and friends to webutils...
r8488 from kallithea.lib import webutils
Mads Kiilerich
auth: simplify AuthUser.permissions and how it actually never use cache
r8214 from kallithea.lib.utils import get_repo_group_slug, get_repo_slug, get_user_group_slug
Mads Kiilerich
auth: compute AuthUser.permissions lazily...
r8374 from kallithea.lib.vcs.utils.lazy import LazyProperty
Mads Kiilerich
routing: separate url handling from routing - move it to webutils...
r8445 from kallithea.lib.webutils import url
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 from kallithea.model import db, meta
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 from kallithea.model.user import UserModel
log = logging.getLogger(__name__)
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 PERM_WEIGHTS = db.Permission.PERM_WEIGHTS
Mads Kiilerich
auth: extract private parts from get_user_permissions...
r8375
def bump_permission(permissions, key, new_perm):
"""Add a new permission for key to permissions.
Assuming the permissions are comparable, set the new permission if it
has higher weight, else drop it and keep the old permission.
"""
cur_perm = permissions[key]
new_perm_val = PERM_WEIGHTS[new_perm]
cur_perm_val = PERM_WEIGHTS[cur_perm]
if new_perm_val > cur_perm_val:
permissions[key] = new_perm
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 class AuthUser(object):
"""
Søren Løvborg
AuthUser: update docstring
r5253 Represents a Kallithea user, including various authentication and
authorization information. Typically used to store the current user,
but is also used as a generic user information data structure in
parts of the code, e.g. user management.
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Søren Løvborg
auth: construct AuthUser from either user_id or db.User object...
r5326 Constructed from a database `User` object, a user ID or cookie dict,
it looks up the user (if needed) and copies all attributes to itself,
Søren Løvborg
AuthUser: update docstring
r5253 adding various non-persistent data. If lookup fails but anonymous
access to Kallithea is enabled, the default user is loaded instead.
Mads Kiilerich
auth: drop unused AuthUser.is_authenticated...
r7600 `AuthUser` does not by itself authenticate users. It's up to other parts of
the code to check e.g. if a supplied password is correct, and if so, trust
the AuthUser object as an authenticated user.
Søren Løvborg
auth: miscellaneous improvements and typo fixes
r5316
However, `AuthUser` does refuse to load a user that is not `active`.
Søren Løvborg
auth: add AuthUser.is_anonymous, along with some exposition...
r6345
Note that Kallithea distinguishes between the default user (an actual
user in the database with username "default") and "no user" (no actual
User object, AuthUser filled with blank values and username "None").
If the default user is active, that will always be used instead of
"no user". On the other hand, if the default user is disabled (and
there is no login information), we instead get "no user"; this should
only happen on the login page (as all other requests are redirected).
`is_default_user` specifically checks if the AuthUser is the user named
"default". Use `is_anonymous` to check for both "default" and "no user".
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 """
Mads Kiilerich
auth: introduce AuthUser.make factory which can return None if user can't be authenticated
r7602 @classmethod
Mads Kiilerich
auth: drop authenticating_api_key from AuthUser...
r7610 def make(cls, dbuser=None, is_external_auth=False, ip_addr=None):
Mads Kiilerich
auth: introduce AuthUser.make factory which can return None if user can't be authenticated
r7602 """Create an AuthUser to be authenticated ... or return None if user for some reason can't be authenticated.
Mads Kiilerich
auth: move IP check to AuthUser.make - it is more about accepting authentication than about permissions after authentication
r7603 Checks that a non-None dbuser is provided, is active, and that the IP address is ok.
Mads Kiilerich
auth: introduce AuthUser.make factory which can return None if user can't be authenticated
r7602 """
Mads Kiilerich
auth: move IP check to AuthUser.make - it is more about accepting authentication than about permissions after authentication
r7603 assert ip_addr is not None
Mads Kiilerich
auth: introduce AuthUser.make factory which can return None if user can't be authenticated
r7602 if dbuser is None:
log.info('No db user for authentication')
return None
if not dbuser.active:
log.info('Db user %s not active', dbuser.username)
return None
Mads Kiilerich
db: drop SA caching_query and FromCache, and thus sql_cache_short beaker cache...
r8212 allowed_ips = AuthUser.get_allowed_ips(dbuser.user_id)
Mads Kiilerich
auth: move IP check to AuthUser.make - it is more about accepting authentication than about permissions after authentication
r7603 if not check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
log.info('Access for %s from %s forbidden - not in %s', dbuser.username, ip_addr, allowed_ips)
return None
Mads Kiilerich
auth: drop authenticating_api_key from AuthUser...
r7610 return cls(dbuser=dbuser, is_external_auth=is_external_auth)
Mads Kiilerich
auth: introduce AuthUser.make factory which can return None if user can't be authenticated
r7602
Mads Kiilerich
auth: drop authenticating_api_key from AuthUser...
r7610 def __init__(self, user_id=None, dbuser=None, is_external_auth=False):
Mads Kiilerich
auth: remove AuthUser __init__ magic for fallback to default user instead of the requested user...
r7601 self.is_external_auth = is_external_auth # container auth - don't show logout option
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: fix bad comment reference to fill_data
r8146 # These attributes will be overridden below if the requested user is
# found or anonymous access (using the default user) is enabled.
Søren Løvborg
auth: fold AuthUser._propagate_data into constructor...
r5325 self.user_id = None
self.username = None
self.api_key = None
self.name = ''
self.lastname = ''
self.email = ''
self.admin = False
Søren Løvborg
auth: construct AuthUser from either user_id or db.User object...
r5326 # Look up database user, if necessary.
Søren Løvborg
auth: fold AuthUser._propagate_data into constructor...
r5325 if user_id is not None:
Mads Kiilerich
auth: remove AuthUser __init__ magic for fallback to default user instead of the requested user...
r7601 assert dbuser is None
Mads Kiilerich
cleanup: pass log strings unformatted - avoid unnecessary % formatting when not logging
r5375 log.debug('Auth User lookup by USER ID %s', user_id)
Mads Kiilerich
auth: let AuthUser fetch default user on demand
r7404 dbuser = UserModel().get(user_id)
Mads Kiilerich
auth: remove AuthUser __init__ magic for fallback to default user instead of the requested user...
r7601 assert dbuser is not None
Søren Løvborg
auth: construct AuthUser from either user_id or db.User object...
r5326 else:
Mads Kiilerich
auth: remove AuthUser __init__ magic for fallback to default user instead of the requested user...
r7601 assert dbuser is not None
Søren Løvborg
auth: construct AuthUser from either user_id or db.User object...
r5326 log.debug('Auth User lookup by database user %s', dbuser)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: fix failure when editing inactive users...
r7638 log.debug('filling %s data', dbuser)
self.is_anonymous = dbuser.is_default_user
if dbuser.is_default_user and not dbuser.active:
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 self.username = 'None'
Mads Kiilerich
auth: remove AuthUser __init__ magic for fallback to default user instead of the requested user...
r7601 self.is_default_user = False
Mads Kiilerich
auth: fix failure when editing inactive users...
r7638 else:
# copy non-confidential database fields from a `db.User` to this `AuthUser`.
Mads Kiilerich
py3: trivial renaming of .iteritems() to .items()...
r8059 for k, v in dbuser.get_dict().items():
Søren Løvborg
auth: turn dead AuthUser code into assertion...
r5328 assert k not in ['api_keys', 'permissions']
setattr(self, k, v)
Mads Kiilerich
auth: fix failure when editing inactive users...
r7638 self.is_default_user = dbuser.is_default_user
log.debug('Auth User is now %s', self)
Søren Løvborg
auth: move UserModel.fill_data to AuthUser...
r5327
Mads Kiilerich
auth: compute AuthUser.permissions lazily...
r8374 @LazyProperty
Mads Kiilerich
auth: compute AuthUser.global_permissions lazily
r8376 def global_permissions(self):
log.debug('Getting global permissions for %s', self)
if self.is_admin:
return set(['hg.admin'])
global_permissions = set()
# default global permissions from the default user
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 default_global_perms = db.UserToPerm.query() \
.filter(db.UserToPerm.user_id == kallithea.DEFAULT_USER_ID) \
.options(joinedload(db.UserToPerm.permission))
Mads Kiilerich
auth: compute AuthUser.global_permissions lazily
r8376 for perm in default_global_perms:
global_permissions.add(perm.permission.permission_name)
# user group global permissions
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 user_perms_from_users_groups = meta.Session().query(db.UserGroupToPerm) \
.options(joinedload(db.UserGroupToPerm.permission)) \
.join((db.UserGroupMember, db.UserGroupToPerm.users_group_id ==
db.UserGroupMember.users_group_id)) \
.filter(db.UserGroupMember.user_id == self.user_id) \
.join((db.UserGroup, db.UserGroupMember.users_group_id ==
db.UserGroup.users_group_id)) \
.filter(db.UserGroup.users_group_active == True) \
.order_by(db.UserGroupToPerm.users_group_id) \
Mads Kiilerich
auth: compute AuthUser.global_permissions lazily
r8376 .all()
# need to group here by groups since user can be in more than
# one group
_grouped = [[x, list(y)] for x, y in
itertools.groupby(user_perms_from_users_groups,
lambda x:x.users_group)]
for gr, perms in _grouped:
for perm in perms:
global_permissions.add(perm.permission.permission_name)
# user specific global permissions
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 user_perms = meta.Session().query(db.UserToPerm) \
.options(joinedload(db.UserToPerm.permission)) \
.filter(db.UserToPerm.user_id == self.user_id).all()
Mads Kiilerich
auth: compute AuthUser.global_permissions lazily
r8376 for perm in user_perms:
global_permissions.add(perm.permission.permission_name)
# for each kind of global permissions, only keep the one with heighest weight
kind_max_perm = {}
for perm in sorted(global_permissions, key=lambda n: PERM_WEIGHTS.get(n, -1)):
kind = perm.rsplit('.', 1)[0]
kind_max_perm[kind] = perm
return set(kind_max_perm.values())
@LazyProperty
Mads Kiilerich
auth: compute AuthUser.repository_permissions lazily
r8377 def repository_permissions(self):
log.debug('Getting repository permissions for %s', self)
repository_permissions = {}
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 default_repo_perms = db.Permission.get_default_perms(kallithea.DEFAULT_USER_ID)
Mads Kiilerich
auth: compute AuthUser.repository_permissions lazily
r8377
if self.is_admin:
for perm in default_repo_perms:
r_k = perm.repository.repo_name
p = 'repository.admin'
repository_permissions[r_k] = p
else:
# defaults for repositories from default user
for perm in default_repo_perms:
r_k = perm.repository.repo_name
if perm.repository.owner_id == self.user_id:
p = 'repository.admin'
elif perm.repository.private:
p = 'repository.none'
else:
p = perm.permission.permission_name
repository_permissions[r_k] = p
# user group repository permissions
user_repo_perms_from_users_groups = \
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 meta.Session().query(db.UserGroupRepoToPerm) \
.join((db.UserGroup, db.UserGroupRepoToPerm.users_group_id ==
db.UserGroup.users_group_id)) \
.filter(db.UserGroup.users_group_active == True) \
.join((db.UserGroupMember, db.UserGroupRepoToPerm.users_group_id ==
db.UserGroupMember.users_group_id)) \
.filter(db.UserGroupMember.user_id == self.user_id) \
.options(joinedload(db.UserGroupRepoToPerm.repository)) \
.options(joinedload(db.UserGroupRepoToPerm.permission)) \
Mads Kiilerich
auth: compute AuthUser.repository_permissions lazily
r8377 .all()
for perm in user_repo_perms_from_users_groups:
bump_permission(repository_permissions,
perm.repository.repo_name,
perm.permission.permission_name)
# user permissions for repositories
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 user_repo_perms = db.Permission.get_default_perms(self.user_id)
Mads Kiilerich
auth: compute AuthUser.repository_permissions lazily
r8377 for perm in user_repo_perms:
bump_permission(repository_permissions,
perm.repository.repo_name,
perm.permission.permission_name)
return repository_permissions
@LazyProperty
Mads Kiilerich
auth: compute AuthUser.repository_group_permissions lazily
r8378 def repository_group_permissions(self):
log.debug('Getting repository group permissions for %s', self)
repository_group_permissions = {}
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 default_repo_groups_perms = db.Permission.get_default_group_perms(kallithea.DEFAULT_USER_ID)
Mads Kiilerich
auth: compute AuthUser.repository_group_permissions lazily
r8378
if self.is_admin:
for perm in default_repo_groups_perms:
rg_k = perm.group.group_name
p = 'group.admin'
repository_group_permissions[rg_k] = p
else:
# defaults for repository groups taken from default user permission
# on given group
for perm in default_repo_groups_perms:
rg_k = perm.group.group_name
p = perm.permission.permission_name
repository_group_permissions[rg_k] = p
# user group for repo groups permissions
user_repo_group_perms_from_users_groups = \
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 meta.Session().query(db.UserGroupRepoGroupToPerm) \
.join((db.UserGroup, db.UserGroupRepoGroupToPerm.users_group_id ==
db.UserGroup.users_group_id)) \
.filter(db.UserGroup.users_group_active == True) \
.join((db.UserGroupMember, db.UserGroupRepoGroupToPerm.users_group_id
== db.UserGroupMember.users_group_id)) \
.filter(db.UserGroupMember.user_id == self.user_id) \
.options(joinedload(db.UserGroupRepoGroupToPerm.permission)) \
Mads Kiilerich
auth: compute AuthUser.repository_group_permissions lazily
r8378 .all()
for perm in user_repo_group_perms_from_users_groups:
bump_permission(repository_group_permissions,
perm.group.group_name,
perm.permission.permission_name)
# user explicit permissions for repository groups
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 user_repo_groups_perms = db.Permission.get_default_group_perms(self.user_id)
Mads Kiilerich
auth: compute AuthUser.repository_group_permissions lazily
r8378 for perm in user_repo_groups_perms:
bump_permission(repository_group_permissions,
perm.group.group_name,
perm.permission.permission_name)
return repository_group_permissions
@LazyProperty
Mads Kiilerich
auth: compute AuthUser.user_group_permissions lazily
r8379 def user_group_permissions(self):
log.debug('Getting user group permissions for %s', self)
user_group_permissions = {}
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 default_user_group_perms = db.Permission.get_default_user_group_perms(kallithea.DEFAULT_USER_ID)
Mads Kiilerich
auth: compute AuthUser.user_group_permissions lazily
r8379
if self.is_admin:
for perm in default_user_group_perms:
u_k = perm.user_group.users_group_name
p = 'usergroup.admin'
user_group_permissions[u_k] = p
else:
# defaults for user groups taken from default user permission
# on given user group
for perm in default_user_group_perms:
u_k = perm.user_group.users_group_name
p = perm.permission.permission_name
user_group_permissions[u_k] = p
# user group for user group permissions
user_group_user_groups_perms = \
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 meta.Session().query(db.UserGroupUserGroupToPerm) \
.join((db.UserGroup, db.UserGroupUserGroupToPerm.target_user_group_id
== db.UserGroup.users_group_id)) \
.join((db.UserGroupMember, db.UserGroupUserGroupToPerm.user_group_id
== db.UserGroupMember.users_group_id)) \
.filter(db.UserGroupMember.user_id == self.user_id) \
.join((db.UserGroup, db.UserGroupMember.users_group_id ==
db.UserGroup.users_group_id), aliased=True, from_joinpoint=True) \
.filter(db.UserGroup.users_group_active == True) \
.options(joinedload(db.UserGroupUserGroupToPerm.permission)) \
Mads Kiilerich
auth: compute AuthUser.user_group_permissions lazily
r8379 .all()
for perm in user_group_user_groups_perms:
bump_permission(user_group_permissions,
perm.target_user_group.users_group_name,
perm.permission.permission_name)
# user explicit permission for user groups
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 user_user_groups_perms = db.Permission.get_default_user_group_perms(self.user_id)
Mads Kiilerich
auth: compute AuthUser.user_group_permissions lazily
r8379 for perm in user_user_groups_perms:
bump_permission(user_group_permissions,
perm.user_group.users_group_name,
perm.permission.permission_name)
return user_group_permissions
@LazyProperty
Mads Kiilerich
auth: compute AuthUser.permissions lazily...
r8374 def permissions(self):
"""dict with all 4 kind of permissions - mainly for backwards compatibility"""
return {
Mads Kiilerich
auth: refactor permissions...
r8368 'global': self.global_permissions,
'repositories': self.repository_permissions,
'repositories_groups': self.repository_group_permissions,
'user_groups': self.user_group_permissions,
Mads Kiilerich
auth: compute AuthUser.permissions lazily...
r8374 }
Søren Løvborg
auth: fold AuthUser._propagate_data into constructor...
r5325
Søren Løvborg
auth: simplify repository permission checks...
r6471 def has_repository_permission_level(self, repo_name, level, purpose=None):
required_perms = {
'read': ['repository.read', 'repository.write', 'repository.admin'],
'write': ['repository.write', 'repository.admin'],
'admin': ['repository.admin'],
}[level]
Mads Kiilerich
auth: refactor permissions...
r8368 actual_perm = self.repository_permissions.get(repo_name)
Søren Løvborg
auth: simplify repository permission checks...
r6471 ok = actual_perm in required_perms
log.debug('Checking if user %r can %r repo %r (%s): %s (has %r)',
self.username, level, repo_name, purpose, ok, actual_perm)
return ok
Søren Løvborg
auth: simplify repository group permission checks...
r6472 def has_repository_group_permission_level(self, repo_group_name, level, purpose=None):
required_perms = {
'read': ['group.read', 'group.write', 'group.admin'],
'write': ['group.write', 'group.admin'],
'admin': ['group.admin'],
}[level]
Mads Kiilerich
auth: refactor permissions...
r8368 actual_perm = self.repository_group_permissions.get(repo_group_name)
Søren Løvborg
auth: simplify repository group permission checks...
r6472 ok = actual_perm in required_perms
log.debug('Checking if user %r can %r repo group %r (%s): %s (has %r)',
self.username, level, repo_group_name, purpose, ok, actual_perm)
return ok
Søren Løvborg
auth: simplify user group permission checks...
r6473 def has_user_group_permission_level(self, user_group_name, level, purpose=None):
required_perms = {
'read': ['usergroup.read', 'usergroup.write', 'usergroup.admin'],
'write': ['usergroup.write', 'usergroup.admin'],
'admin': ['usergroup.admin'],
}[level]
Mads Kiilerich
auth: refactor permissions...
r8368 actual_perm = self.user_group_permissions.get(user_group_name)
Søren Løvborg
auth: simplify user group permission checks...
r6473 ok = actual_perm in required_perms
log.debug('Checking if user %r can %r user group %r (%s): %s (has %r)',
self.username, level, user_group_name, purpose, ok, actual_perm)
return ok
Søren Løvborg
auth: fold AuthUser._propagate_data into constructor...
r5325 @property
def api_keys(self):
return self._get_api_keys()
Søren Løvborg
auth: make internal AuthUser methods private...
r5317 def _get_api_keys(self):
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 api_keys = [self.api_key]
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 for api_key in db.UserApiKeys.query() \
Søren Løvborg
api: simplify API key expiration checks
r6474 .filter_by(user_id=self.user_id, is_expired=False):
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 api_keys.append(api_key.api_key)
return api_keys
@property
def is_admin(self):
return self.admin
@property
def repositories_admin(self):
"""
Returns list of repositories you're an admin of
"""
Mads Kiilerich
auth: refactor permissions...
r8368 return [x[0] for x in self.repository_permissions.items()
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 if x[1] == 'repository.admin']
@property
def repository_groups_admin(self):
"""
Returns list of repository groups you're an admin of
"""
Mads Kiilerich
auth: refactor permissions...
r8368 return [x[0] for x in self.repository_group_permissions.items()
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 if x[1] == 'group.admin']
@property
def user_groups_admin(self):
"""
Returns list of user groups you're an admin of
"""
Mads Kiilerich
auth: refactor permissions...
r8368 return [x[0] for x in self.user_group_permissions.items()
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 if x[1] == 'usergroup.admin']
def __repr__(self):
Mads Kiilerich
py3: drop __unicode__ ... and generally move towards just providing a good __repr__...
r7957 return "<%s %s: %r>" % (self.__class__.__name__, self.user_id, self.username)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Søren Løvborg
AuthUser: refactor AuthUser cookie/session serialization...
r5265 def to_cookie(self):
""" Serializes this login session to a cookie `dict`. """
return {
'user_id': self.user_id,
Søren Løvborg
BaseController: hide "Log out" link for external login sessions...
r5266 'is_external_auth': self.is_external_auth,
Søren Løvborg
AuthUser: refactor AuthUser cookie/session serialization...
r5265 }
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Søren Løvborg
AuthUser: refactor AuthUser cookie/session serialization...
r5265 @staticmethod
Mads Kiilerich
auth: move IP check to AuthUser.make - it is more about accepting authentication than about permissions after authentication
r7603 def from_cookie(cookie, ip_addr):
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 """
Mads Kiilerich
auth: introduce AuthUser.make factory which can return None if user can't be authenticated
r7602 Deserializes an `AuthUser` from a cookie `dict` ... or return None.
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 """
Mads Kiilerich
auth: introduce AuthUser.make factory which can return None if user can't be authenticated
r7602 return AuthUser.make(
dbuser=UserModel().get(cookie.get('user_id')),
Søren Løvborg
BaseController: hide "Log out" link for external login sessions...
r5266 is_external_auth=cookie.get('is_external_auth', False),
Mads Kiilerich
auth: move IP check to AuthUser.make - it is more about accepting authentication than about permissions after authentication
r7603 ip_addr=ip_addr,
Søren Løvborg
AuthUser: refactor AuthUser cookie/session serialization...
r5265 )
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
@classmethod
Mads Kiilerich
db: drop SA caching_query and FromCache, and thus sql_cache_short beaker cache...
r8212 def get_allowed_ips(cls, user_id):
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 _set = set()
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 default_ips = db.UserIpMap.query().filter(db.UserIpMap.user_id == kallithea.DEFAULT_USER_ID)
Mads Kiilerich
auth: global permissions given to the default user are the bare minimum and should apply to *all* other users too...
r7592 for ip in default_ips:
try:
_set.add(ip.ip_addr)
except ObjectDeletedError:
# since we use heavy caching sometimes it happens that we get
# deleted objects here, we just skip them
pass
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
model: always import the whole db module - drop "from" imports
r8453 user_ips = db.UserIpMap.query().filter(db.UserIpMap.user_id == user_id)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 for ip in user_ips:
try:
_set.add(ip.ip_addr)
except ObjectDeletedError:
# since we use heavy caching sometimes it happens that we get
# deleted objects here, we just skip them
pass
return _set or set(['0.0.0.0/0', '::/0'])
Mads Kiilerich
lib: move get_all_user_repos to AuthUser
r8501 def get_all_user_repos(self):
"""
Gets all repositories that user have at least read access
"""
repos = [repo_name
for repo_name, perm in self.repository_permissions.items()
if perm in ['repository.read', 'repository.write', 'repository.admin']
]
return db.Repository.query().filter(db.Repository.repo_name.in_(repos))
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
#==============================================================================
# CHECK DECORATORS
#==============================================================================
Thomas De Schampheleire
auth: return early in LoginRequired on invalid IP...
r5115
Mads Kiilerich
auth: let login helper function return exception to raise instead of raising it self...
r5583 def _redirect_to_login(message=None):
"""Return an exception that must be raised. It will redirect to the login
page which will redirect back to the current URL after authentication.
The optional message will be shown in a flash message."""
Mads Kiilerich
auth: avoid flash message with 'None' on login redirect...
r5141 if message:
Mads Kiilerich
lib: move webhelpers2 and friends to webutils...
r8488 webutils.flash(message, category='warning')
Mads Kiilerich
auth: let login helper function return exception to raise instead of raising it self...
r5583 p = request.path_qs
Mads Kiilerich
cleanup: pass log strings unformatted - avoid unnecessary % formatting when not logging
r5375 log.debug('Redirecting to login page, origin: %s', p)
Mads Kiilerich
auth: let login helper function return exception to raise instead of raising it self...
r5583 return HTTPFound(location=url('login_home', came_from=p))
Søren Løvborg
login: include query parameters in came_from...
r5511
Thomas De Schampheleire
auth: return early in LoginRequired on invalid IP...
r5115
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 # Use as decorator
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 class LoginRequired(object):
Mads Kiilerich
auth: refactor to introduce @LoginRequired(allow_default_user=True) and deprecate @NotAnonymous()...
r7019 """Client must be logged in as a valid User, or we'll redirect to the login
page.
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: refactor to introduce @LoginRequired(allow_default_user=True) and deprecate @NotAnonymous()...
r7019 If the "default" user is enabled and allow_default_user is true, that is
considered valid too.
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: drop api_access_controllers_whitelist and give API key auth same access as other kinds of auth...
r7598 Also checks that IP address is allowed.
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 """
Mads Kiilerich
auth: drop api_access_controllers_whitelist and give API key auth same access as other kinds of auth...
r7598 def __init__(self, allow_default_user=False):
Mads Kiilerich
auth: refactor to introduce @LoginRequired(allow_default_user=True) and deprecate @NotAnonymous()...
r7019 self.allow_default_user = allow_default_user
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
def __call__(self, func):
return decorator(self.__wrapper, func)
def __wrapper(self, func, *fargs, **fkwargs):
Søren Løvborg
AuthUser: Drop ip_addr field...
r5211 controller = fargs[0]
Mads Kiilerich
controllers: avoid setting request state in controller instances - set it in the thread global request variable...
r6412 user = request.authuser
Søren Løvborg
AuthUser: Drop ip_addr field...
r5211 loc = "%s:%s" % (controller.__class__.__name__, func.__name__)
Mads Kiilerich
cleanup: pass log strings unformatted - avoid unnecessary % formatting when not logging
r5375 log.debug('Checking access for user %s @ %s', user, loc)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Thomas De Schampheleire
auth: simplify logging of regular authentication in LoginRequired
r5142 # regular user authentication
Mads Kiilerich
auth: use other and better checks than is_authenticated...
r7599 if user.is_default_user:
Mads Kiilerich
auth: refactor to introduce @LoginRequired(allow_default_user=True) and deprecate @NotAnonymous()...
r7019 if self.allow_default_user:
log.info('default user @ %s', loc)
return func(*fargs, **fkwargs)
Mads Kiilerich
auth: tweak log statements when redirecting unauthenticated users to login...
r8695 log.info('default user is redirected to login @ %s', loc)
Mads Kiilerich
auth: use other and better checks than is_authenticated...
r7599 elif user.is_anonymous: # default user is disabled and no proper authentication
Mads Kiilerich
auth: tweak log statements when redirecting unauthenticated users to login...
r8695 log.info('anonymous user is redirected to login @ %s', loc)
Mads Kiilerich
auth: use other and better checks than is_authenticated...
r7599 else: # regular authentication
log.info('user %s authenticated with regular auth @ %s', user, loc)
return func(*fargs, **fkwargs)
Mads Kiilerich
auth: refactor to introduce @LoginRequired(allow_default_user=True) and deprecate @NotAnonymous()...
r7019 raise _redirect_to_login()
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413
# Use as decorator
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 class NotAnonymous(object):
Søren Løvborg
auth: add AuthUser.is_anonymous, along with some exposition...
r6345 """Ensures that client is not logged in as the "default" user, and
redirects to the login page otherwise. Must be used together with
LoginRequired."""
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
def __call__(self, func):
return decorator(self.__wrapper, func)
def __wrapper(self, func, *fargs, **fkwargs):
cls = fargs[0]
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 user = request.authuser
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 log.debug('Checking that user %s is not anonymous @%s', user.username, cls)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 if user.is_default_user:
Mads Kiilerich
auth: let login helper function return exception to raise instead of raising it self...
r5583 raise _redirect_to_login(_('You need to be a registered user to '
'perform this action'))
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 else:
return func(*fargs, **fkwargs)
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 class _PermsDecorator(object):
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 """Base class for controller decorators with multiple permissions"""
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
def __init__(self, *required_perms):
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 self.required_perms = required_perms # usually very short - a list is thus fine
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
def __call__(self, func):
return decorator(self.__wrapper, func)
def __wrapper(self, func, *fargs, **fkwargs):
cls = fargs[0]
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 user = request.authuser
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 log.debug('checking %s permissions %s for %s %s',
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 self.__class__.__name__, self.required_perms, cls, user)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 if self.check_permissions(user):
log.debug('Permission granted for %s %s', cls, user)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 return func(*fargs, **fkwargs)
else:
Thomas De Schampheleire
auth: raise log level of 'permission denied' from DEBUG to INFO (issue #243)...
r7207 log.info('Permission denied for %s %s', cls, user)
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 if user.is_default_user:
Mads Kiilerich
auth: let login helper function return exception to raise instead of raising it self...
r5583 raise _redirect_to_login(_('You need to be signed in to view this page'))
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 else:
Søren Løvborg
cleanup: replace abort with WebOb exceptions...
r5542 raise HTTPForbidden()
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 def check_permissions(self, user):
Søren Løvborg
cleanup: use standard NotImplementedError for abstract methods...
r6348 raise NotImplementedError()
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 class HasPermissionAnyDecorator(_PermsDecorator):
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 """
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 Checks the user has any of the given global permissions.
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 """
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 def check_permissions(self, user):
Mads Kiilerich
auth: refactor permissions...
r8368 return any(p in user.global_permissions for p in self.required_perms)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 class _PermDecorator(_PermsDecorator):
"""Base class for controller decorators with a single permission"""
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 def __init__(self, required_perm):
_PermsDecorator.__init__(self, [required_perm])
self.required_perm = required_perm
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 class HasRepoPermissionLevelDecorator(_PermDecorator):
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 """
Søren Løvborg
auth: simplify repository permission checks...
r6471 Checks the user has at least the specified permission level for the requested repository.
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 """
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 def check_permissions(self, user):
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 repo_name = get_repo_slug(request)
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 return user.has_repository_permission_level(repo_name, self.required_perm)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 class HasRepoGroupPermissionLevelDecorator(_PermDecorator):
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 """
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 Checks the user has any of given permissions for the requested repository group.
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 """
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 def check_permissions(self, user):
repo_group_name = get_repo_group_slug(request)
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 return user.has_repository_group_permission_level(repo_group_name, self.required_perm)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 class HasUserGroupPermissionLevelDecorator(_PermDecorator):
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 """
Checks for access permission for any of given predicates for specific
user group. In order to fulfill the request any of predicates must be meet
"""
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 def check_permissions(self, user):
user_group_name = get_user_group_slug(request)
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 return user.has_user_group_permission_level(user_group_name, self.required_perm)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
#==============================================================================
# CHECK FUNCTIONS
#==============================================================================
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 class _PermsFunction(object):
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 """Base function for other check functions with multiple permissions"""
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 def __init__(self, *required_perms):
self.required_perms = required_perms # usually very short - a list is thus fine
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
py3: support __bool__...
r8055 def __bool__(self):
Søren Løvborg
auth: prevent misuse of PermFunction in bool context...
r5839 """ Defend against accidentally forgetting to call the object
and instead evaluating it directly in a boolean context,
which could have security implications.
"""
raise AssertionError(self.__class__.__name__ + ' is not a bool and must be called!')
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 def __call__(self, *a, **b):
Søren Løvborg
cleanup: use standard NotImplementedError for abstract methods...
r6348 raise NotImplementedError()
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 class HasPermissionAny(_PermsFunction):
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 def __call__(self, purpose=None):
Mads Kiilerich
auth: refactor permissions...
r8368 ok = any(p in request.authuser.global_permissions for p in self.required_perms)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
scripts: clean up and run the old scripts/logformat.py script
r7572 log.debug('Check %s for global %s (%s): %s',
request.authuser.username, self.required_perms, purpose, ok)
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 return ok
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 class _PermFunction(_PermsFunction):
"""Base function for other check functions with a single permission"""
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 def __init__(self, required_perm):
_PermsFunction.__init__(self, [required_perm])
self.required_perm = required_perm
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 class HasRepoPermissionLevel(_PermFunction):
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 def __call__(self, repo_name, purpose=None):
Mads Kiilerich
auth: consistently use request.authuser - drop request.user...
r7405 return request.authuser.has_repository_permission_level(repo_name, self.required_perm, purpose)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 class HasRepoGroupPermissionLevel(_PermFunction):
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 def __call__(self, group_name, purpose=None):
Mads Kiilerich
auth: consistently use request.authuser - drop request.user...
r7405 return request.authuser.has_repository_group_permission_level(group_name, self.required_perm, purpose)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: refactor to make it explicit in the function profile when they only takes one permission
r6576 class HasUserGroupPermissionLevel(_PermFunction):
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 def __call__(self, user_group_name, purpose=None):
Mads Kiilerich
auth: consistently use request.authuser - drop request.user...
r7405 return request.authuser.has_user_group_permission_level(user_group_name, self.required_perm, purpose)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
#==============================================================================
# SPECIAL VERSION TO HANDLE MIDDLEWARE AUTH
#==============================================================================
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 class HasPermissionAnyMiddleware(object):
def __init__(self, *perms):
self.required_perms = set(perms)
Mads Kiilerich
auth: introduce AuthUser.make factory which can return None if user can't be authenticated
r7602 def __call__(self, authuser, repo_name, purpose=None):
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 try:
Mads Kiilerich
auth: refactor permissions...
r8368 ok = authuser.repository_permissions[repo_name] in self.required_perms
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 except KeyError:
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 ok = False
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
Mads Kiilerich
auth: introduce AuthUser.make factory which can return None if user can't be authenticated
r7602 log.debug('Middleware check %s for %s for repo %s (%s): %s', authuser.username, self.required_perms, repo_name, purpose, ok)
Mads Kiilerich
auth: simplify the double invoked auth classes used for permission checking...
r6413 return ok
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187
def check_ip_access(source_ip, allowed_ips=None):
"""
Checks if source_ip is a subnet of any of allowed_ips.
:param source_ip:
:param allowed_ips: list of allowed ips together with mask
"""
Mads Kiilerich
auth: strip RFC4007 zone identifiers from IPv6 addresses before doing access control...
r7289 source_ip = source_ip.split('%', 1)[0]
Mads Kiilerich
cleanup: pass log strings unformatted - avoid unnecessary % formatting when not logging
r5375 log.debug('checking if ip:%s is subnet of %s', source_ip, allowed_ips)
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 if isinstance(allowed_ips, (tuple, list, set)):
for ip in allowed_ips:
Mads Kiilerich
cleanup: avoid some 'except Exception' catching - catch specific exceptions or log it and show what happened...
r4733 if ipaddr.IPAddress(source_ip) in ipaddr.IPNetwork(ip):
Mads Kiilerich
cleanup: pass log strings unformatted - avoid unnecessary % formatting when not logging
r5375 log.debug('IP %s is network %s',
ipaddr.IPAddress(source_ip), ipaddr.IPNetwork(ip))
Mads Kiilerich
cleanup: avoid some 'except Exception' catching - catch specific exceptions or log it and show what happened...
r4733 return True
Bradley M. Kuhn
Second step in two-part process to rename directories....
r4187 return False