auth.py
723 lines
| 28.3 KiB
| text/x-python
|
PythonLexer
Bradley M. Kuhn
|
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.lib.auth
~~~~~~~~~~~~~~~~~~

authentication and permission libraries

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Apr 4, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""
import itertools | ||||
Mads Kiilerich
|
r7718 | import logging | ||
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
r7716 | import ipaddr | ||
Mads Kiilerich
|
r7718 | from decorator import decorator | ||
from sqlalchemy.orm import joinedload | ||||
from sqlalchemy.orm.exc import ObjectDeletedError | ||||
Mads Kiilerich
|
r7719 | from tg import request | ||
Mads Kiilerich
|
r6508 | from tg.i18n import ugettext as _ | ||
Mads Kiilerich
|
r7719 | from webob.exc import HTTPForbidden, HTTPFound | ||
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
r8230 | import kallithea | ||
Mads Kiilerich
|
r8488 | from kallithea.lib import webutils | ||
Mads Kiilerich
|
r8214 | from kallithea.lib.utils import get_repo_group_slug, get_repo_slug, get_user_group_slug | ||
Mads Kiilerich
|
r8374 | from kallithea.lib.vcs.utils.lazy import LazyProperty | ||
Mads Kiilerich
|
r8445 | from kallithea.lib.webutils import url | ||
Mads Kiilerich
|
r8453 | from kallithea.model import db, meta | ||
Bradley M. Kuhn
|
r4187 | from kallithea.model.user import UserModel | ||
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Local alias for the permission-name -> weight mapping kept on db.Permission;
# used below to compare permission names by weight.
PERM_WEIGHTS = db.Permission.PERM_WEIGHTS
Mads Kiilerich
|
r8375 | |||
def bump_permission(permissions, key, new_perm):
    """Raise the permission stored for ``key`` to ``new_perm`` if it weighs more.

    ``permissions`` is a dict that must already contain ``key``. The stored
    permission is only replaced when ``new_perm`` has a strictly higher
    PERM_WEIGHTS weight; otherwise the dict is left untouched. The dict is
    modified in place and nothing is returned.
    """
    current = permissions[key]
    if PERM_WEIGHTS[new_perm] > PERM_WEIGHTS[current]:
        permissions[key] = new_perm
Bradley M. Kuhn
|
class AuthUser(object):
    """
    Represents a Kallithea user, including various authentication and
    authorization information. Typically used to store the current user,
    but is also used as a generic user information data structure in
    parts of the code, e.g. user management.

    Constructed from a database `User` object, a user ID or cookie dict,
    it looks up the user (if needed) and copies all attributes to itself,
    adding various non-persistent data. If lookup fails but anonymous
    access to Kallithea is enabled, the default user is loaded instead.

    `AuthUser` does not by itself authenticate users. It's up to other parts of
    the code to check e.g. if a supplied password is correct, and if so, trust
    the AuthUser object as an authenticated user.

    However, `AuthUser` does refuse to load a user that is not `active`.

    Note that Kallithea distinguishes between the default user (an actual
    user in the database with username "default") and "no user" (no actual
    User object, AuthUser filled with blank values and username "None").

    If the default user is active, that will always be used instead of
    "no user". On the other hand, if the default user is disabled (and
    there is no login information), we instead get "no user"; this should
    only happen on the login page (as all other requests are redirected).

    `is_default_user` specifically checks if the AuthUser is the user named
    "default". Use `is_anonymous` to check for both "default" and "no user".
    """

    @classmethod
    def make(cls, dbuser=None, is_external_auth=False, ip_addr=None):
        """Create an AuthUser to be authenticated ... or return None if user for some reason can't be authenticated.

        Checks that a non-None dbuser is provided, is active, and that the IP address is ok.

        :param dbuser: database user object, or None
        :param is_external_auth: stored on the new AuthUser as-is
        :param ip_addr: source address to check against the allowed networks;
            must not be None
        """
        assert ip_addr is not None

        if dbuser is None:
            log.info('No db user for authentication')
            return None

        if not dbuser.active:
            log.info('Db user %s not active', dbuser.username)
            return None

        allowed_ips = AuthUser.get_allowed_ips(dbuser.user_id)
        if not check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
            log.info('Access for %s from %s forbidden - not in %s', dbuser.username, ip_addr, allowed_ips)
            return None

        return cls(dbuser=dbuser, is_external_auth=is_external_auth)

    def __init__(self, user_id=None, dbuser=None, is_external_auth=False):
        """Fill this object from a database user.

        Exactly one of ``user_id`` and ``dbuser`` must be given; with
        ``user_id`` the user is looked up through ``UserModel().get``.
        """
        self.is_external_auth = is_external_auth # container auth - don't show logout option

        # These attributes will be overridden below if the requested user is
        # found or anonymous access (using the default user) is enabled.
        self.user_id = None
        self.username = None
        self.api_key = None
        self.name = ''
        self.lastname = ''
        self.email = ''
        self.admin = False

        # Look up database user, if necessary.
        if user_id is not None:
            assert dbuser is None
            log.debug('Auth User lookup by USER ID %s', user_id)
            dbuser = UserModel().get(user_id)
            assert dbuser is not None
        else:
            assert dbuser is not None
            log.debug('Auth User lookup by database user %s', dbuser)

        log.debug('filling %s data', dbuser)
        self.is_anonymous = dbuser.is_default_user
        if dbuser.is_default_user and not dbuser.active:
            # Disabled default user: this is the "no user" case - keep the
            # blank values set above.
            self.username = 'None'
            self.is_default_user = False
        else:
            # copy non-confidential database fields from a `db.User` to this `AuthUser`.
            for k, v in dbuser.get_dict().items():
                assert k not in ['api_keys', 'permissions']
                setattr(self, k, v)
            self.is_default_user = dbuser.is_default_user
        log.debug('Auth User is now %s', self)

    @LazyProperty
    def global_permissions(self):
        """Set of global permission names that apply to this user.

        Admins get just ``{'hg.admin'}``. For everybody else, names are
        collected from the default user's rows, from active user groups this
        user is a member of, and from the user's own rows. For each kind of
        permission (the name up to its last '.') only the name with the
        highest PERM_WEIGHTS weight is kept.
        """
        log.debug('Getting global permissions for %s', self)

        if self.is_admin:
            return {'hg.admin'}

        global_permissions = set()

        # default global permissions from the default user
        default_global_perms = db.UserToPerm.query() \
            .filter(db.UserToPerm.user_id == kallithea.DEFAULT_USER_ID) \
            .options(joinedload(db.UserToPerm.permission))
        for perm in default_global_perms:
            global_permissions.add(perm.permission.permission_name)

        # user group global permissions
        user_perms_from_users_groups = meta.Session().query(db.UserGroupToPerm) \
            .options(joinedload(db.UserGroupToPerm.permission)) \
            .join((db.UserGroupMember, db.UserGroupToPerm.users_group_id ==
                   db.UserGroupMember.users_group_id)) \
            .filter(db.UserGroupMember.user_id == self.user_id) \
            .join((db.UserGroup, db.UserGroupMember.users_group_id ==
                   db.UserGroup.users_group_id)) \
            .filter(db.UserGroup.users_group_active == True) \
            .order_by(db.UserGroupToPerm.users_group_id) \
            .all()
        # All rows end up in the same set, so there is no need to group them
        # by user group first.
        for perm in user_perms_from_users_groups:
            global_permissions.add(perm.permission.permission_name)

        # user specific global permissions
        user_perms = meta.Session().query(db.UserToPerm) \
            .options(joinedload(db.UserToPerm.permission)) \
            .filter(db.UserToPerm.user_id == self.user_id).all()
        for perm in user_perms:
            global_permissions.add(perm.permission.permission_name)

        # for each kind of global permissions, only keep the one with highest weight
        # (sorted ascending, so the heaviest name of a kind is written last and wins;
        # names without a weight sort first)
        kind_max_perm = {}
        for perm in sorted(global_permissions, key=lambda n: PERM_WEIGHTS.get(n, -1)):
            kind = perm.rsplit('.', 1)[0]
            kind_max_perm[kind] = perm
        return set(kind_max_perm.values())

    @LazyProperty
    def repository_permissions(self):
        """Dict mapping repository name to this user's permission name for it.

        The repositories covered are those for which the default user has a
        permission row. Admins get 'repository.admin' everywhere. Otherwise
        the starting value per repository is: 'repository.admin' for the
        owner, 'repository.none' for private repositories, else the default
        user's permission. That value is then raised (never lowered) by
        permissions from active user groups and by the user's own permissions.
        """
        log.debug('Getting repository permissions for %s', self)
        default_repo_perms = db.Permission.get_default_perms(kallithea.DEFAULT_USER_ID)

        if self.is_admin:
            return {
                perm.repository.repo_name: 'repository.admin'
                for perm in default_repo_perms
            }

        repository_permissions = {}

        # defaults for repositories from default user
        for perm in default_repo_perms:
            r_k = perm.repository.repo_name
            if perm.repository.owner_id == self.user_id:
                p = 'repository.admin'
            elif perm.repository.private:
                p = 'repository.none'
            else:
                p = perm.permission.permission_name
            repository_permissions[r_k] = p

        # user group repository permissions
        user_repo_perms_from_users_groups = \
            meta.Session().query(db.UserGroupRepoToPerm) \
            .join((db.UserGroup, db.UserGroupRepoToPerm.users_group_id ==
                   db.UserGroup.users_group_id)) \
            .filter(db.UserGroup.users_group_active == True) \
            .join((db.UserGroupMember, db.UserGroupRepoToPerm.users_group_id ==
                   db.UserGroupMember.users_group_id)) \
            .filter(db.UserGroupMember.user_id == self.user_id) \
            .options(joinedload(db.UserGroupRepoToPerm.repository)) \
            .options(joinedload(db.UserGroupRepoToPerm.permission)) \
            .all()
        for perm in user_repo_perms_from_users_groups:
            bump_permission(repository_permissions,
                            perm.repository.repo_name,
                            perm.permission.permission_name)

        # user permissions for repositories
        user_repo_perms = db.Permission.get_default_perms(self.user_id)
        for perm in user_repo_perms:
            bump_permission(repository_permissions,
                            perm.repository.repo_name,
                            perm.permission.permission_name)

        return repository_permissions

    @LazyProperty
    def repository_group_permissions(self):
        """Dict mapping repository group name to this user's permission name.

        Admins get 'group.admin' for every group the default user has a row
        for. Otherwise the default user's permission is the starting value,
        raised (never lowered) by permissions from active user groups and by
        the user's own permissions.
        """
        log.debug('Getting repository group permissions for %s', self)
        default_repo_groups_perms = db.Permission.get_default_group_perms(kallithea.DEFAULT_USER_ID)

        if self.is_admin:
            return {
                perm.group.group_name: 'group.admin'
                for perm in default_repo_groups_perms
            }

        # defaults for repository groups taken from default user permission
        # on given group
        repository_group_permissions = {
            perm.group.group_name: perm.permission.permission_name
            for perm in default_repo_groups_perms
        }

        # user group for repo groups permissions
        user_repo_group_perms_from_users_groups = \
            meta.Session().query(db.UserGroupRepoGroupToPerm) \
            .join((db.UserGroup, db.UserGroupRepoGroupToPerm.users_group_id ==
                   db.UserGroup.users_group_id)) \
            .filter(db.UserGroup.users_group_active == True) \
            .join((db.UserGroupMember, db.UserGroupRepoGroupToPerm.users_group_id
                   == db.UserGroupMember.users_group_id)) \
            .filter(db.UserGroupMember.user_id == self.user_id) \
            .options(joinedload(db.UserGroupRepoGroupToPerm.permission)) \
            .all()
        for perm in user_repo_group_perms_from_users_groups:
            bump_permission(repository_group_permissions,
                            perm.group.group_name,
                            perm.permission.permission_name)

        # user explicit permissions for repository groups
        user_repo_groups_perms = db.Permission.get_default_group_perms(self.user_id)
        for perm in user_repo_groups_perms:
            bump_permission(repository_group_permissions,
                            perm.group.group_name,
                            perm.permission.permission_name)

        return repository_group_permissions

    @LazyProperty
    def user_group_permissions(self):
        """Dict mapping user group name to this user's permission name for it.

        Admins get 'usergroup.admin' for every user group the default user
        has a row for. Otherwise the default user's permission is the starting
        value, raised (never lowered) by permissions granted to active user
        groups this user is a member of and by the user's own permissions.
        """
        log.debug('Getting user group permissions for %s', self)
        default_user_group_perms = db.Permission.get_default_user_group_perms(kallithea.DEFAULT_USER_ID)

        if self.is_admin:
            return {
                perm.user_group.users_group_name: 'usergroup.admin'
                for perm in default_user_group_perms
            }

        # defaults for user groups taken from default user permission
        # on given user group
        user_group_permissions = {
            perm.user_group.users_group_name: perm.permission.permission_name
            for perm in default_user_group_perms
        }

        # user group for user group permissions
        user_group_user_groups_perms = \
            meta.Session().query(db.UserGroupUserGroupToPerm) \
            .join((db.UserGroup, db.UserGroupUserGroupToPerm.target_user_group_id
                   == db.UserGroup.users_group_id)) \
            .join((db.UserGroupMember, db.UserGroupUserGroupToPerm.user_group_id
                   == db.UserGroupMember.users_group_id)) \
            .filter(db.UserGroupMember.user_id == self.user_id) \
            .join((db.UserGroup, db.UserGroupMember.users_group_id ==
                   db.UserGroup.users_group_id), aliased=True, from_joinpoint=True) \
            .filter(db.UserGroup.users_group_active == True) \
            .options(joinedload(db.UserGroupUserGroupToPerm.permission)) \
            .all()
        for perm in user_group_user_groups_perms:
            bump_permission(user_group_permissions,
                            perm.target_user_group.users_group_name,
                            perm.permission.permission_name)

        # user explicit permission for user groups
        user_user_groups_perms = db.Permission.get_default_user_group_perms(self.user_id)
        for perm in user_user_groups_perms:
            bump_permission(user_group_permissions,
                            perm.user_group.users_group_name,
                            perm.permission.permission_name)

        return user_group_permissions

    @LazyProperty
    def permissions(self):
        """dict with all 4 kind of permissions - mainly for backwards compatibility"""
        return {
            'global': self.global_permissions,
            'repositories': self.repository_permissions,
            'repositories_groups': self.repository_group_permissions,
            'user_groups': self.user_group_permissions,
        }

    def has_repository_permission_level(self, repo_name, level, purpose=None):
        """Return True if this user has at least ``level`` on repository ``repo_name``.

        ``level`` must be 'read', 'write' or 'admin' (anything else raises
        KeyError). ``purpose`` is only used in the debug log message.
        """
        required_perms = {
            'read': ['repository.read', 'repository.write', 'repository.admin'],
            'write': ['repository.write', 'repository.admin'],
            'admin': ['repository.admin'],
        }[level]
        actual_perm = self.repository_permissions.get(repo_name)
        ok = actual_perm in required_perms
        log.debug('Checking if user %r can %r repo %r (%s): %s (has %r)',
            self.username, level, repo_name, purpose, ok, actual_perm)
        return ok

    def has_repository_group_permission_level(self, repo_group_name, level, purpose=None):
        """Return True if this user has at least ``level`` on repository group ``repo_group_name``.

        ``level`` must be 'read', 'write' or 'admin' (anything else raises
        KeyError). ``purpose`` is only used in the debug log message.
        """
        required_perms = {
            'read': ['group.read', 'group.write', 'group.admin'],
            'write': ['group.write', 'group.admin'],
            'admin': ['group.admin'],
        }[level]
        actual_perm = self.repository_group_permissions.get(repo_group_name)
        ok = actual_perm in required_perms
        log.debug('Checking if user %r can %r repo group %r (%s): %s (has %r)',
            self.username, level, repo_group_name, purpose, ok, actual_perm)
        return ok

    def has_user_group_permission_level(self, user_group_name, level, purpose=None):
        """Return True if this user has at least ``level`` on user group ``user_group_name``.

        ``level`` must be 'read', 'write' or 'admin' (anything else raises
        KeyError). ``purpose`` is only used in the debug log message.
        """
        required_perms = {
            'read': ['usergroup.read', 'usergroup.write', 'usergroup.admin'],
            'write': ['usergroup.write', 'usergroup.admin'],
            'admin': ['usergroup.admin'],
        }[level]
        actual_perm = self.user_group_permissions.get(user_group_name)
        ok = actual_perm in required_perms
        log.debug('Checking if user %r can %r user group %r (%s): %s (has %r)',
            self.username, level, user_group_name, purpose, ok, actual_perm)
        return ok

    @property
    def api_keys(self):
        """List of this user's API keys; see `_get_api_keys`."""
        return self._get_api_keys()

    def _get_api_keys(self):
        """Return ``self.api_key`` followed by the user's non-expired UserApiKeys keys."""
        api_keys = [self.api_key]
        extra_keys = db.UserApiKeys.query() \
            .filter_by(user_id=self.user_id, is_expired=False)
        api_keys.extend(row.api_key for row in extra_keys)
        return api_keys

    @property
    def is_admin(self):
        """Value of the ``admin`` attribute."""
        return self.admin

    @property
    def repositories_admin(self):
        """
        Returns list of repositories you're an admin of
        """
        return [name for name, perm in self.repository_permissions.items()
                if perm == 'repository.admin']

    @property
    def repository_groups_admin(self):
        """
        Returns list of repository groups you're an admin of
        """
        return [name for name, perm in self.repository_group_permissions.items()
                if perm == 'group.admin']

    @property
    def user_groups_admin(self):
        """
        Returns list of user groups you're an admin of
        """
        return [name for name, perm in self.user_group_permissions.items()
                if perm == 'usergroup.admin']

    def __repr__(self):
        return "<%s %s: %r>" % (self.__class__.__name__, self.user_id, self.username)

    def to_cookie(self):
        """ Serializes this login session to a cookie `dict`. """
        return {
            'user_id': self.user_id,
            'is_external_auth': self.is_external_auth,
        }

    @staticmethod
    def from_cookie(cookie, ip_addr):
        """
        Deserializes an `AuthUser` from a cookie `dict` ... or return None.

        The user is looked up from the cookie's 'user_id' and passed through
        `AuthUser.make`, which decides whether None is returned.
        """
        return AuthUser.make(
            dbuser=UserModel().get(cookie.get('user_id')),
            is_external_auth=cookie.get('is_external_auth', False),
            ip_addr=ip_addr,
        )

    @classmethod
    def get_allowed_ips(cls, user_id):
        """Return the set of allowed networks for ``user_id``.

        The set holds the ``ip_addr`` values of the UserIpMap rows of the
        default user plus those of the given user. If there are none at all,
        ``{'0.0.0.0/0', '::/0'}`` is returned instead.
        """
        allowed = set()
        # Default user's rows first, then the user's own rows.
        for uid in (kallithea.DEFAULT_USER_ID, user_id):
            for ip in db.UserIpMap.query().filter(db.UserIpMap.user_id == uid):
                try:
                    allowed.add(ip.ip_addr)
                except ObjectDeletedError:
                    # since we use heavy caching sometimes it happens that we get
                    # deleted objects here, we just skip them
                    pass
        return allowed or {'0.0.0.0/0', '::/0'}

    def get_all_user_repos(self):
        """
        Gets all repositories that user have at least read access

        Returns a query on db.Repository filtered on the matching repository names.
        """
        repos = [repo_name
            for repo_name, perm in self.repository_permissions.items()
            if perm in ['repository.read', 'repository.write', 'repository.admin']
            ]
        return db.Repository.query().filter(db.Repository.repo_name.in_(repos))
Bradley M. Kuhn
|
r4187 | |||
#==============================================================================
# CHECK DECORATORS
#==============================================================================
Thomas De Schampheleire
|
r5115 | |||
Mads Kiilerich
|
def _redirect_to_login(message=None):
    """Build (but do not raise) the redirect-to-login exception.

    The caller must raise the returned HTTPFound. Its location is the
    'login_home' URL with the current request's path and query string passed
    as ``came_from``. If ``message`` is given, it is first queued as a
    'warning' flash message.
    """
    if message:
        webutils.flash(message, category='warning')
    came_from = request.path_qs
    log.debug('Redirecting to login page, origin: %s', came_from)
    login_url = url('login_home', came_from=came_from)
    return HTTPFound(location=login_url)
Søren Løvborg
|
r5511 | |||
Thomas De Schampheleire
|
r5115 | |||
Mads Kiilerich
|
r6413 | # Use as decorator | ||
Bradley M. Kuhn
|
class LoginRequired(object):
    """Decorator: only let the wrapped controller method run for a logged in user.

    ``request.authuser`` is inspected on every call. The default user is only
    let through when ``allow_default_user`` is true; "no user" is never let
    through. Everybody who is not let through is redirected to the login page.

    NOTE(review): the previous docstring also said the IP address is checked;
    no such check is visible in this class - presumably it happens where
    ``request.authuser`` is set up. Confirm.
    """

    def __init__(self, allow_default_user=False):
        self.allow_default_user = allow_default_user

    def __call__(self, func):
        return decorator(self.__wrapper, func)

    def __wrapper(self, func, *fargs, **fkwargs):
        # First positional argument of the wrapped method is the controller.
        controller = fargs[0]
        user = request.authuser
        loc = "%s:%s" % (controller.__class__.__name__, func.__name__)
        log.debug('Checking access for user %s @ %s', user, loc)

        if not user.is_default_user and not user.is_anonymous:
            # regular authentication
            log.info('user %s authenticated with regular auth @ %s', user, loc)
            return func(*fargs, **fkwargs)

        if user.is_default_user and self.allow_default_user:
            log.info('default user @ %s', loc)
            return func(*fargs, **fkwargs)

        if user.is_default_user:
            log.info('default user is redirected to login @ %s', loc)
        else:
            # default user is disabled and no proper authentication
            log.info('anonymous user is redirected to login @ %s', loc)
        raise _redirect_to_login()
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
r6413 | |||
# Use as decorator | ||||
Bradley M. Kuhn
|
class NotAnonymous(object):
    """Decorator: refuse the "default" user.

    If ``request.authuser`` is the default user, the client is redirected to
    the login page with a flash message; otherwise the wrapped method runs.
    Must be used together with LoginRequired.
    """

    def __call__(self, func):
        return decorator(self.__wrapper, func)

    def __wrapper(self, func, *fargs, **fkwargs):
        controller = fargs[0]
        user = request.authuser

        log.debug('Checking that user %s is not anonymous @%s', user.username, controller)

        if not user.is_default_user:
            return func(*fargs, **fkwargs)

        raise _redirect_to_login(_('You need to be a registered user to '
                                   'perform this action'))
Mads Kiilerich
|
r6413 | class _PermsDecorator(object): | ||
Mads Kiilerich
|
r6576 | """Base class for controller decorators with multiple permissions""" | ||
Bradley M. Kuhn
|
r4187 | |||
def __init__(self, *required_perms): | ||||
Mads Kiilerich
|
r6413 | self.required_perms = required_perms # usually very short - a list is thus fine | ||
Bradley M. Kuhn
|
r4187 | |||
def __call__(self, func): | ||||
return decorator(self.__wrapper, func) | ||||
def __wrapper(self, func, *fargs, **fkwargs): | ||||
cls = fargs[0] | ||||
Mads Kiilerich
|
r6413 | user = request.authuser | ||
Bradley M. Kuhn
|
r4187 | log.debug('checking %s permissions %s for %s %s', | ||
Mads Kiilerich
|
r6413 | self.__class__.__name__, self.required_perms, cls, user) | ||
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
r6413 | if self.check_permissions(user): | ||
log.debug('Permission granted for %s %s', cls, user) | ||||
Bradley M. Kuhn
|
r4187 | return func(*fargs, **fkwargs) | ||
else: | ||||
Thomas De Schampheleire
|
r7207 | log.info('Permission denied for %s %s', cls, user) | ||
Mads Kiilerich
|
r6413 | if user.is_default_user: | ||
Mads Kiilerich
|
r5583 | raise _redirect_to_login(_('You need to be signed in to view this page')) | ||
Bradley M. Kuhn
|
r4187 | else: | ||
Søren Løvborg
|
r5542 | raise HTTPForbidden() | ||
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
r6413 | def check_permissions(self, user): | ||
Søren Løvborg
|
r6348 | raise NotImplementedError() | ||
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
class HasPermissionAnyDecorator(_PermsDecorator):
    """
    Controller decorator: the user must have at least one of the given
    global permissions.
    """

    def check_permissions(self, user):
        granted = user.global_permissions
        return any(perm in granted for perm in self.required_perms)
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
class _PermDecorator(_PermsDecorator):
    """Base class for controller decorators with a single permission

    The permission is available as ``required_perm`` and also as the
    one-element tuple ``required_perms`` kept by the base class.
    """

    def __init__(self, required_perm):
        # The base __init__ takes *required_perms, so the permission must be
        # passed as a plain positional argument. Wrapping it in a list would
        # make required_perms a tuple containing a list.
        _PermsDecorator.__init__(self, required_perm)
        self.required_perm = required_perm
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
class HasRepoPermissionLevelDecorator(_PermDecorator):
    """
    Controller decorator: the user must have at least the given permission
    level on the repository named by the current request.
    """

    def check_permissions(self, user):
        return user.has_repository_permission_level(get_repo_slug(request), self.required_perm)
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
class HasRepoGroupPermissionLevelDecorator(_PermDecorator):
    """
    Controller decorator: the user must have at least the given permission
    level on the repository group named by the current request.
    """

    def check_permissions(self, user):
        return user.has_repository_group_permission_level(get_repo_group_slug(request), self.required_perm)
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
class HasUserGroupPermissionLevelDecorator(_PermDecorator):
    """
    Controller decorator: the user must have at least the given permission
    level on the user group named by the current request.
    """

    def check_permissions(self, user):
        return user.has_user_group_permission_level(get_user_group_slug(request), self.required_perm)
Bradley M. Kuhn
|
r4187 | |||
#==============================================================================
# CHECK FUNCTIONS
#==============================================================================
Mads Kiilerich
|
r6413 | class _PermsFunction(object): | ||
Mads Kiilerich
|
r6576 | """Base function for other check functions with multiple permissions""" | ||
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
r6413 | def __init__(self, *required_perms): | ||
self.required_perms = required_perms # usually very short - a list is thus fine | ||||
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
r8055 | def __bool__(self): | ||
Søren Løvborg
|
r5839 | """ Defend against accidentally forgetting to call the object | ||
and instead evaluating it directly in a boolean context, | ||||
which could have security implications. | ||||
""" | ||||
raise AssertionError(self.__class__.__name__ + ' is not a bool and must be called!') | ||||
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
r6413 | def __call__(self, *a, **b): | ||
Søren Løvborg
|
r6348 | raise NotImplementedError() | ||
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
class HasPermissionAny(_PermsFunction):
    """Check that the current request's user has any of the given global permissions."""

    def __call__(self, purpose=None):
        """Return True if ``request.authuser`` has at least one of the permissions.

        ``purpose`` is only used in the debug log message.
        """
        user = request.authuser
        ok = any(perm in user.global_permissions for perm in self.required_perms)

        log.debug('Check %s for global %s (%s): %s',
                  user.username, self.required_perms, purpose, ok)
        return ok
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
class _PermFunction(_PermsFunction):
    """Base function for other check functions with a single permission

    The permission is available as ``required_perm`` and also as the
    one-element tuple ``required_perms`` kept by the base class.
    """

    def __init__(self, required_perm):
        # The base __init__ takes *required_perms, so the permission must be
        # passed as a plain positional argument. Wrapping it in a list would
        # make required_perms a tuple containing a list.
        _PermsFunction.__init__(self, required_perm)
        self.required_perm = required_perm
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
class HasRepoPermissionLevel(_PermFunction):
    """Check the current request's user against a repository permission level."""

    def __call__(self, repo_name, purpose=None):
        """Return True if ``request.authuser`` has at least the level on ``repo_name``."""
        user = request.authuser
        return user.has_repository_permission_level(repo_name, self.required_perm, purpose)
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
class HasRepoGroupPermissionLevel(_PermFunction):
    """Check the current request's user against a repository group permission level."""

    def __call__(self, group_name, purpose=None):
        """Return True if ``request.authuser`` has at least the level on ``group_name``."""
        user = request.authuser
        return user.has_repository_group_permission_level(group_name, self.required_perm, purpose)
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
class HasUserGroupPermissionLevel(_PermFunction):
    """Check the current request's user against a user group permission level."""

    def __call__(self, user_group_name, purpose=None):
        """Return True if ``request.authuser`` has at least the level on ``user_group_name``."""
        user = request.authuser
        return user.has_user_group_permission_level(user_group_name, self.required_perm, purpose)
Bradley M. Kuhn
|
r4187 | |||
#==============================================================================
# SPECIAL VERSION TO HANDLE MIDDLEWARE AUTH
#==============================================================================
Mads Kiilerich
|
r6413 | |||
Bradley M. Kuhn
|
class HasPermissionAnyMiddleware(object):
    """Repository permission check that takes the user as an explicit argument.

    Unlike the check functions above, this does not read ``request.authuser``.
    """

    def __init__(self, *perms):
        self.required_perms = set(perms)

    def __call__(self, authuser, repo_name, purpose=None):
        """Return True if ``authuser``'s permission on ``repo_name`` is one of the required ones.

        A repository missing from ``authuser.repository_permissions`` gives
        False. ``purpose`` is only used in the debug log message.
        """
        repo_perms = authuser.repository_permissions
        ok = repo_name in repo_perms and repo_perms[repo_name] in self.required_perms

        log.debug('Middleware check %s for %s for repo %s (%s): %s', authuser.username, self.required_perms, repo_name, purpose, ok)
        return ok
Bradley M. Kuhn
|
r4187 | |||
def check_ip_access(source_ip, allowed_ips=None):
    """
    Checks if source_ip is inside any of the networks in allowed_ips.

    :param source_ip: address as a string; anything from the first '%' on is
        dropped before the check (presumably an IPv6 zone id - confirm)
    :param allowed_ips: tuple, list or set of allowed ips together with mask;
        any other value (including the default None) gives False
    """
    source_ip = source_ip.split('%', 1)[0]
    log.debug('checking if ip:%s is subnet of %s', source_ip, allowed_ips)

    if not isinstance(allowed_ips, (tuple, list, set)):
        return False

    for network in allowed_ips:
        if ipaddr.IPAddress(source_ip) in ipaddr.IPNetwork(network):
            log.debug('IP %s is network %s',
                      ipaddr.IPAddress(source_ip), ipaddr.IPNetwork(network))
            return True
    return False