##// END OF EJS Templates
caches: use individual namespaces per user to prevent beaker caching problems....
r2591:36829a17 stable
Show More
user.py
914 lines | 33.4 KiB | text/x-python | PythonLexer
project: added all source files and assets
r1 # -*- coding: utf-8 -*-
release: update copyright year to 2018
r2487 # Copyright (C) 2010-2018 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
users model for RhodeCode
"""
import logging
import traceback
pylons: fixed code and test suite after removal of pylons.
r2358 import datetime
import ipaddress
project: added all source files and assets
r1
pylons: fixed code and test suite after removal of pylons.
r2358 from pyramid.threadlocal import get_current_request
project: added all source files and assets
r1 from sqlalchemy.exc import DatabaseError
dan
events: add event system for RepoEvents
r375 from rhodecode import events
admin-users: add audit page to allow showing user actions in RhodeCode....
r1559 from rhodecode.lib.user_log_filter import user_log_filter
project: added all source files and assets
r1 from rhodecode.lib.utils2 import (
safe_unicode, get_current_rhodecode_user, action_logger_generic,
repo-groups: implemented default personal repo groups logic....
r1094 AttributeDict, str2bool)
security: escape always the provided user data like firstname/lastname.
r1780 from rhodecode.lib.exceptions import (
DefaultUserException, UserOwnsReposException, UserOwnsRepoGroupsException,
UserOwnsUserGroupsException, NotAllowedToCreateUserError)
project: added all source files and assets
r1 from rhodecode.lib.caching_query import FromCache
from rhodecode.model import BaseModel
from rhodecode.model.auth_token import AuthTokenModel
security: escape always the provided user data like firstname/lastname.
r1780 from rhodecode.model.db import (
_hash_key, true, false, or_, joinedload, User, UserToPerm,
UserEmailMap, UserIpMap, UserLog)
project: added all source files and assets
r1 from rhodecode.model.meta import Session
from rhodecode.model.repo_group import RepoGroupModel
log = logging.getLogger(__name__)
class UserModel(BaseModel):
cls = User
def get(self, user_id, cache=False):
user = self.sa.query(User)
if cache:
caches: ensure we don't use non-ascii characters in cache keys....
r1749 user = user.options(
FromCache("sql_cache_short", "get_user_%s" % user_id))
project: added all source files and assets
r1 return user.get(user_id)
def get_user(self, user):
return self._get_user(user)
pull-request-reviewers: added option to add reviewers by picking an user group for pull requests....
r1678 def _serialize_user(self, user):
import rhodecode.lib.helpers as h
return {
'id': user.user_id,
security: use new safe escaped user attributes across the application....
r1815 'first_name': user.first_name,
'last_name': user.last_name,
pull-request-reviewers: added option to add reviewers by picking an user group for pull requests....
r1678 'username': user.username,
'email': user.email,
'icon_link': h.gravatar_url(user.email, 30),
default-reviewers: introduce new voting rule logic that allows...
r2484 'profile_link': h.link_to_user(user),
security: escape always the provided user data like firstname/lastname.
r1780 'value_display': h.escape(h.person(user)),
pull-request-reviewers: added option to add reviewers by picking an user group for pull requests....
r1678 'value': user.username,
'value_type': 'user',
'active': user.active,
}
users: moved get_users from RepoModel to UserModel.
r1677 def get_users(self, name_contains=None, limit=20, only_active=True):
query = self.sa.query(User)
if only_active:
query = query.filter(User.active == true())
if name_contains:
ilike_expression = u'%{}%'.format(safe_unicode(name_contains))
query = query.filter(
or_(
User.name.ilike(ilike_expression),
User.lastname.ilike(ilike_expression),
User.username.ilike(ilike_expression)
)
)
query = query.limit(limit)
users = query.all()
_users = [
pull-request-reviewers: added option to add reviewers by picking an user group for pull requests....
r1678 self._serialize_user(user) for user in users
users: moved get_users from RepoModel to UserModel.
r1677 ]
return _users
project: added all source files and assets
r1 def get_by_username(self, username, cache=False, case_insensitive=False):
if case_insensitive:
user = self.sa.query(User).filter(User.username.ilike(username))
else:
user = self.sa.query(User)\
.filter(User.username == username)
if cache:
caches: ensure we don't use non-ascii characters in cache keys....
r1749 name_key = _hash_key(username)
user = user.options(
FromCache("sql_cache_short", "get_user_%s" % name_key))
project: added all source files and assets
r1 return user.scalar()
def get_by_email(self, email, cache=False, case_insensitive=False):
return User.get_by_email(email, case_insensitive, cache)
def get_by_auth_token(self, auth_token, cache=False):
return User.get_by_auth_token(auth_token, cache)
def get_active_user_count(self, cache=False):
users: allow caching of active users count.
r2427 qry = User.query().filter(
User.active == true()).filter(
User.username != User.DEFAULT_USER)
if cache:
qry = qry.options(
FromCache("sql_cache_short", "get_active_users"))
return qry.count()
project: added all source files and assets
r1
def create(self, form_data, cur_user=None):
if not cur_user:
cur_user = getattr(get_current_rhodecode_user(), 'username', None)
user_data = {
'username': form_data['username'],
'password': form_data['password'],
'email': form_data['email'],
'firstname': form_data['firstname'],
'lastname': form_data['lastname'],
'active': form_data['active'],
'extern_type': form_data['extern_type'],
'extern_name': form_data['extern_name'],
'admin': False,
'cur_user': cur_user
}
repo-groups: implemented default personal repo groups logic....
r1094 if 'create_repo_group' in form_data:
user_data['create_repo_group'] = str2bool(
form_data.get('create_repo_group'))
project: added all source files and assets
r1 try:
if form_data.get('password_change'):
user_data['force_password_change'] = True
return UserModel().create_or_update(**user_data)
except Exception:
log.error(traceback.format_exc())
raise
def update_user(self, user, skip_attrs=None, **kwargs):
from rhodecode.lib.auth import get_crypt_password
user = self._get_user(user)
if user.username == User.DEFAULT_USER:
raise DefaultUserException(
pylons: fixed code and test suite after removal of pylons.
r2358 "You can't edit this user (`%(username)s`) since it's "
"crucial for entire application" % {
'username': user.username})
project: added all source files and assets
r1
# first store only defaults
user_attrs = {
'updating_user_id': user.user_id,
'username': user.username,
'password': user.password,
'email': user.email,
'firstname': user.name,
'lastname': user.lastname,
'active': user.active,
'admin': user.admin,
'extern_name': user.extern_name,
'extern_type': user.extern_type,
'language': user.user_data.get('language')
}
# in case there's new_password, that comes from form, use it to
# store password
if kwargs.get('new_password'):
kwargs['password'] = kwargs['new_password']
# cleanups, my_account password change form
kwargs.pop('current_password', None)
kwargs.pop('new_password', None)
# cleanups, user edit password change form
kwargs.pop('password_confirmation', None)
kwargs.pop('password_change', None)
# create repo group on user creation
kwargs.pop('create_repo_group', None)
# legacy forms send name, which is the firstname
firstname = kwargs.pop('name', None)
if firstname:
kwargs['firstname'] = firstname
for k, v in kwargs.items():
# skip if we don't want to update this
if skip_attrs and k in skip_attrs:
continue
user_attrs[k] = v
try:
return self.create_or_update(**user_attrs)
except Exception:
log.error(traceback.format_exc())
raise
def create_or_update(
self, username, password, email, firstname='', lastname='',
active=True, admin=False, extern_type=None, extern_name=None,
cur_user=None, plugin=None, force_password_change=False,
repo-groups: implemented default personal repo groups logic....
r1094 allow_to_create_user=True, create_repo_group=None,
project: added all source files and assets
r1 updating_user_id=None, language=None, strict_creation_check=True):
"""
Creates a new instance if not found, or updates current one
:param username:
:param password:
:param email:
:param firstname:
:param lastname:
:param active:
:param admin:
:param extern_type:
:param extern_name:
:param cur_user:
:param plugin: optional plugin this method was called from
:param force_password_change: toggles new or existing user flag
for password change
:param allow_to_create_user: Defines if the method can actually create
new users
:param create_repo_group: Defines if the method should also
create an repo group with user name, and owner
:param updating_user_id: if we set it up this is the user we want to
update this allows to editing username.
:param language: language of user from interface.
:returns: new User object with injected `is_new_user` attribute.
"""
pylons: fixed code and test suite after removal of pylons.
r2358
project: added all source files and assets
r1 if not cur_user:
cur_user = getattr(get_current_rhodecode_user(), 'username', None)
from rhodecode.lib.auth import (
get_crypt_password, check_password, generate_auth_token)
from rhodecode.lib.hooks_base import (
log_create_user, check_allowed_create_user)
def _password_change(new_user, password):
auth-rhodecode: don't fail on bcrypt if user password is set to None....
r2153 old_password = new_user.password or ''
project: added all source files and assets
r1 # empty password
auth-rhodecode: don't fail on bcrypt if user password is set to None....
r2153 if not old_password:
project: added all source files and assets
r1 return False
# password check is only needed for RhodeCode internal auth calls
# in case it's a plugin we don't care
if not plugin:
repo-groups: implemented default personal repo groups logic....
r1094 # first check if we gave crypted password back, and if it
# matches it's not password change
project: added all source files and assets
r1 if new_user.password == password:
return False
auth-rhodecode: don't fail on bcrypt if user password is set to None....
r2153 password_match = check_password(password, old_password)
project: added all source files and assets
r1 if not password_match:
return True
return False
repo-groups: implemented default personal repo groups logic....
r1094 # read settings on default personal repo group creation
if create_repo_group is None:
default_create_repo_group = RepoGroupModel()\
.get_default_create_personal_repo_group()
create_repo_group = default_create_repo_group
project: added all source files and assets
r1 user_data = {
'username': username,
'password': password,
'email': email,
'firstname': firstname,
'lastname': lastname,
'active': active,
'admin': admin
}
if updating_user_id:
log.debug('Checking for existing account in RhodeCode '
'database with user_id `%s` ' % (updating_user_id,))
user = User.get(updating_user_id)
else:
log.debug('Checking for existing account in RhodeCode '
'database with username `%s` ' % (username,))
user = User.get_by_username(username, case_insensitive=True)
if user is None:
# we check internal flag if this method is actually allowed to
# create new user
if not allow_to_create_user:
msg = ('Method wants to create new user, but it is not '
'allowed to do so')
log.warning(msg)
raise NotAllowedToCreateUserError(msg)
log.debug('Creating new user %s', username)
# only if we create user that is active
new_active_user = active
if new_active_user and strict_creation_check:
# raises UserCreationError if it's not allowed for any reason to
# create new active user, this also executes pre-create hooks
check_allowed_create_user(user_data, cur_user, strict_check=True)
dan
events: add event system for RepoEvents
r375 events.trigger(events.UserPreCreate(user_data))
project: added all source files and assets
r1 new_user = User()
edit = False
else:
log.debug('updating user %s', username)
dan
events: add event system for RepoEvents
r375 events.trigger(events.UserPreUpdate(user, user_data))
project: added all source files and assets
r1 new_user = user
edit = True
# we're not allowed to edit default user
if user.username == User.DEFAULT_USER:
raise DefaultUserException(
pylons: fixed code and test suite after removal of pylons.
r2358 "You can't edit this user (`%(username)s`) since it's "
"crucial for entire application"
% {'username': user.username})
project: added all source files and assets
r1
# inject special attribute that will tell us if User is new or old
new_user.is_new_user = not edit
# for users that didn's specify auth type, we use RhodeCode built in
from rhodecode.authentication.plugins import auth_rhodecode
extern_name = extern_name or auth_rhodecode.RhodeCodeAuthPlugin.name
extern_type = extern_type or auth_rhodecode.RhodeCodeAuthPlugin.name
try:
new_user.username = username
new_user.admin = admin
new_user.email = email
new_user.active = active
new_user.extern_name = safe_unicode(extern_name)
new_user.extern_type = safe_unicode(extern_type)
new_user.name = firstname
new_user.lastname = lastname
# set password only if creating an user or password is changed
if not edit or _password_change(new_user, password):
reason = 'new password' if edit else 'new user'
log.debug('Updating password reason=>%s', reason)
new_user.password = get_crypt_password(password) if password else None
if force_password_change:
new_user.update_userdata(force_password_change=True)
if language:
new_user.update_userdata(language=language)
notifications: store notification status in channelstream
r734 new_user.update_userdata(notification_status=True)
project: added all source files and assets
r1
self.sa.add(new_user)
if not edit and create_repo_group:
repo-groups: implemented default personal repo groups logic....
r1094 RepoGroupModel().create_personal_repo_group(
new_user, commit_early=False)
project: added all source files and assets
r1 if not edit:
# add the RSS token
AuthTokenModel().create(username,
db: use unicode for some of the defaults to reduce sqlalchemy warnings.
r1967 description=u'Generated feed token',
project: added all source files and assets
r1 role=AuthTokenModel.cls.ROLE_FEED)
user: deprecated usage of api_keys....
r1953 kwargs = new_user.get_dict()
# backward compat, require api_keys present
kwargs['api_keys'] = kwargs['auth_tokens']
log_create_user(created_by=cur_user, **kwargs)
repo-groups: implemented default personal repo groups logic....
r1094 events.trigger(events.UserPostCreate(user_data))
project: added all source files and assets
r1 return new_user
except (DatabaseError,):
log.error(traceback.format_exc())
raise
def create_registration(self, form_data):
from rhodecode.model.notification import NotificationModel
from rhodecode.model.notification import EmailNotificationModel
try:
form_data['admin'] = False
form_data['extern_name'] = 'rhodecode'
form_data['extern_type'] = 'rhodecode'
new_user = self.create(form_data)
self.sa.add(new_user)
self.sa.flush()
user_data = new_user.get_dict()
kwargs = {
# use SQLALCHEMY safe dump of user data
'user': AttributeDict(user_data),
'date': datetime.datetime.now()
}
notification_type = EmailNotificationModel.TYPE_REGISTRATION
# pre-generate the subject for notification itself
(subject,
_h, _e, # we don't care about those
body_plaintext) = EmailNotificationModel().render_email(
notification_type, **kwargs)
# create notification objects, and emails
NotificationModel().create(
created_by=new_user,
notification_subject=subject,
notification_body=body_plaintext,
notification_type=notification_type,
recipients=None, # all admins
email_kwargs=kwargs,
)
return new_user
except Exception:
log.error(traceback.format_exc())
raise
def _handle_user_repos(self, username, repositories, handle_mode=None):
refactor: renamed get_first_admin to get_first_super_admin which...
r278 _superadmin = self.cls.get_first_super_admin()
project: added all source files and assets
r1 left_overs = True
from rhodecode.model.repo import RepoModel
if handle_mode == 'detach':
for obj in repositories:
obj.user = _superadmin
# set description we know why we super admin now owns
# additional repositories that were orphaned !
obj.description += ' \n::detached repository from deleted user: %s' % (username,)
self.sa.add(obj)
left_overs = False
elif handle_mode == 'delete':
for obj in repositories:
RepoModel().delete(obj, forks='detach')
left_overs = False
# if nothing is done we have left overs left
return left_overs
def _handle_user_repo_groups(self, username, repository_groups,
handle_mode=None):
refactor: renamed get_first_admin to get_first_super_admin which...
r278 _superadmin = self.cls.get_first_super_admin()
project: added all source files and assets
r1 left_overs = True
from rhodecode.model.repo_group import RepoGroupModel
if handle_mode == 'detach':
for r in repository_groups:
r.user = _superadmin
# set description we know why we super admin now owns
# additional repositories that were orphaned !
r.group_description += ' \n::detached repository group from deleted user: %s' % (username,)
self.sa.add(r)
left_overs = False
elif handle_mode == 'delete':
for r in repository_groups:
RepoGroupModel().delete(r)
left_overs = False
# if nothing is done we have left overs left
return left_overs
def _handle_user_user_groups(self, username, user_groups, handle_mode=None):
refactor: renamed get_first_admin to get_first_super_admin which...
r278 _superadmin = self.cls.get_first_super_admin()
project: added all source files and assets
r1 left_overs = True
from rhodecode.model.user_group import UserGroupModel
if handle_mode == 'detach':
for r in user_groups:
for user_user_group_to_perm in r.user_user_group_to_perm:
if user_user_group_to_perm.user.username == username:
user_user_group_to_perm.user = _superadmin
r.user = _superadmin
# set description we know why we super admin now owns
# additional repositories that were orphaned !
r.user_group_description += ' \n::detached user group from deleted user: %s' % (username,)
self.sa.add(r)
left_overs = False
elif handle_mode == 'delete':
for r in user_groups:
UserGroupModel().delete(r)
left_overs = False
# if nothing is done we have left overs left
return left_overs
def delete(self, user, cur_user=None, handle_repos=None,
handle_repo_groups=None, handle_user_groups=None):
if not cur_user:
pylons: fixed code and test suite after removal of pylons.
r2358 cur_user = getattr(
get_current_rhodecode_user(), 'username', None)
project: added all source files and assets
r1 user = self._get_user(user)
try:
if user.username == User.DEFAULT_USER:
raise DefaultUserException(
pylons: fixed code and test suite after removal of pylons.
r2358 u"You can't remove this user since it's"
u" crucial for entire application")
project: added all source files and assets
r1
left_overs = self._handle_user_repos(
user.username, user.repositories, handle_repos)
if left_overs and user.repositories:
repos = [x.repo_name for x in user.repositories]
raise UserOwnsReposException(
pylons: fixed code and test suite after removal of pylons.
r2358 u'user "%(username)s" still owns %(len_repos)s repositories and cannot be '
u'removed. Switch owners or remove those repositories:%(list_repos)s'
% {'username': user.username, 'len_repos': len(repos),
'list_repos': ', '.join(repos)})
project: added all source files and assets
r1
left_overs = self._handle_user_repo_groups(
user.username, user.repository_groups, handle_repo_groups)
if left_overs and user.repository_groups:
repo_groups = [x.group_name for x in user.repository_groups]
raise UserOwnsRepoGroupsException(
pylons: fixed code and test suite after removal of pylons.
r2358 u'user "%(username)s" still owns %(len_repo_groups)s repository groups and cannot be '
u'removed. Switch owners or remove those repository groups:%(list_repo_groups)s'
% {'username': user.username, 'len_repo_groups': len(repo_groups),
'list_repo_groups': ', '.join(repo_groups)})
project: added all source files and assets
r1
left_overs = self._handle_user_user_groups(
user.username, user.user_groups, handle_user_groups)
if left_overs and user.user_groups:
user_groups = [x.users_group_name for x in user.user_groups]
raise UserOwnsUserGroupsException(
pylons: fixed code and test suite after removal of pylons.
r2358 u'user "%s" still owns %s user groups and cannot be '
u'removed. Switch owners or remove those user groups:%s'
project: added all source files and assets
r1 % (user.username, len(user_groups), ', '.join(user_groups)))
# we might change the user data with detach/delete, make sure
# the object is marked as expired before actually deleting !
self.sa.expire(user)
self.sa.delete(user)
from rhodecode.lib.hooks_base import log_delete_user
log_delete_user(deleted_by=cur_user, **user.get_dict())
except Exception:
log.error(traceback.format_exc())
raise
login: Fix password reset mail link....
r37 def reset_password_link(self, data, pwd_reset_url):
project: added all source files and assets
r1 from rhodecode.lib.celerylib import tasks, run_task
from rhodecode.model.notification import EmailNotificationModel
user_email = data['email']
try:
user = User.get_by_email(user_email)
if user:
log.debug('password reset user found %s', user)
email_kwargs = {
login: Fix password reset mail link....
r37 'password_reset_url': pwd_reset_url,
project: added all source files and assets
r1 'user': user,
'email': user_email,
'date': datetime.datetime.now()
}
(subject, headers, email_body,
email_body_plaintext) = EmailNotificationModel().render_email(
EmailNotificationModel.TYPE_PASSWORD_RESET, **email_kwargs)
recipients = [user_email]
action_logger_generic(
'sending password reset email to user: {}'.format(
user), namespace='security.password_reset')
run_task(tasks.send_email, recipients, subject,
email_body_plaintext, email_body)
else:
log.debug("password reset email %s not found", user_email)
except Exception:
log.error(traceback.format_exc())
return False
return True
password-reset: strengthten security on password reset logic....
r1471 def reset_password(self, data):
project: added all source files and assets
r1 from rhodecode.lib.celerylib import tasks, run_task
from rhodecode.model.notification import EmailNotificationModel
from rhodecode.lib import auth
user_email = data['email']
pre_db = True
try:
user = User.get_by_email(user_email)
new_passwd = auth.PasswordGenerator().gen_password(
12, auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
if user:
user.password = auth.get_crypt_password(new_passwd)
# also force this user to reset his password !
user.update_userdata(force_password_change=True)
Session().add(user)
password-reset: strengthten security on password reset logic....
r1471
# now delete the token in question
UserApiKeys = AuthTokenModel.cls
UserApiKeys().query().filter(
UserApiKeys.api_key == data['token']).delete()
project: added all source files and assets
r1 Session().commit()
logging: improve slightly password reset logs.
r1296 log.info('successfully reset password for `%s`', user_email)
password-reset: strengthten security on password reset logic....
r1471
project: added all source files and assets
r1 if new_passwd is None:
raise Exception('unable to generate new password')
pre_db = False
email_kwargs = {
'new_password': new_passwd,
'user': user,
'email': user_email,
'date': datetime.datetime.now()
}
(subject, headers, email_body,
email_body_plaintext) = EmailNotificationModel().render_email(
password-reset: strengthten security on password reset logic....
r1471 EmailNotificationModel.TYPE_PASSWORD_RESET_CONFIRMATION,
**email_kwargs)
project: added all source files and assets
r1
recipients = [user_email]
action_logger_generic(
'sent new password to user: {} with email: {}'.format(
user, user_email), namespace='security.password_reset')
run_task(tasks.send_email, recipients, subject,
email_body_plaintext, email_body)
except Exception:
log.error('Failed to update user password')
log.error(traceback.format_exc())
if pre_db:
# we rollback only if local db stuff fails. If it goes into
# run_task, we're pass rollback state this wouldn't work then
Session().rollback()
return True
def fill_data(self, auth_user, user_id=None, api_key=None, username=None):
"""
Fetches auth_user by user_id,or api_key if present.
Fills auth_user attributes with those taken from database.
Additionally set's is_authenitated if lookup fails
present in database
:param auth_user: instance of user to set attributes
:param user_id: user id to fetch by
:param api_key: api key to fetch by
:param username: username to fetch by
"""
if user_id is None and api_key is None and username is None:
raise Exception('You need to pass user_id, api_key or username')
log.debug(
auth: improve logging
r1955 'AuthUser: fill data execution based on: '
'user_id:%s api_key:%s username:%s', user_id, api_key, username)
project: added all source files and assets
r1 try:
dbuser = None
if user_id:
dbuser = self.get(user_id)
elif api_key:
dbuser = self.get_by_auth_token(api_key)
elif username:
dbuser = self.get_by_username(username)
if not dbuser:
log.warning(
'Unable to lookup user by id:%s api_key:%s username:%s',
user_id, api_key, username)
return False
if not dbuser.active:
users: personal repo-group shouldn't be available for default user.
r1690 log.debug('User `%s:%s` is inactive, skipping fill data',
username, user_id)
project: added all source files and assets
r1 return False
auth: improve logging
r1955 log.debug('AuthUser: filling found user:%s data', dbuser)
user: deprecated usage of api_keys....
r1953 user_data = dbuser.get_dict()
project: added all source files and assets
r1
security: use new safe escaped user attributes across the application....
r1815 user_data.update({
# set explicit the safe escaped values
'first_name': dbuser.first_name,
'last_name': dbuser.last_name,
})
project: added all source files and assets
r1
user: deprecated usage of api_keys....
r1953 for k, v in user_data.items():
project: added all source files and assets
r1 # properties of auth user we dont update
if k not in ['auth_tokens', 'permissions']:
setattr(auth_user, k, v)
except Exception:
log.error(traceback.format_exc())
auth_user.is_authenticated = False
return False
return True
def has_perm(self, user, perm):
perm = self._get_perm(perm)
user = self._get_user(user)
return UserToPerm.query().filter(UserToPerm.user == user)\
.filter(UserToPerm.permission == perm).scalar() is not None
def grant_perm(self, user, perm):
"""
Grant user global permissions
:param user:
:param perm:
"""
user = self._get_user(user)
perm = self._get_perm(perm)
# if this permission is already granted skip it
_perm = UserToPerm.query()\
.filter(UserToPerm.user == user)\
.filter(UserToPerm.permission == perm)\
.scalar()
if _perm:
return
new = UserToPerm()
new.user = user
new.permission = perm
self.sa.add(new)
return new
def revoke_perm(self, user, perm):
"""
Revoke users global permissions
:param user:
:param perm:
"""
user = self._get_user(user)
perm = self._get_perm(perm)
obj = UserToPerm.query()\
.filter(UserToPerm.user == user)\
.filter(UserToPerm.permission == perm)\
.scalar()
if obj:
self.sa.delete(obj)
def add_extra_email(self, user, email):
"""
Adds email address to UserEmailMap
:param user:
:param email:
"""
pylons: remove pylons as dependency...
r2351
project: added all source files and assets
r1 user = self._get_user(user)
obj = UserEmailMap()
obj.user = user
pylons: remove pylons as dependency...
r2351 obj.email = email
project: added all source files and assets
r1 self.sa.add(obj)
return obj
def delete_extra_email(self, user, email_id):
"""
Removes email address from UserEmailMap
:param user:
:param email_id:
"""
user = self._get_user(user)
obj = UserEmailMap.query().get(email_id)
audit-logs: added audit-logs on user actions....
r1801 if obj and obj.user_id == user.user_id:
project: added all source files and assets
r1 self.sa.delete(obj)
def parse_ip_range(self, ip_range):
ip_list = []
dependencies: bumped pyramid-debugtoolbar to 4.3.1
r1907
project: added all source files and assets
r1 def make_unique(value):
seen = []
return [c for c in value if not (c in seen or seen.append(c))]
# firsts split by commas
for ip_range in ip_range.split(','):
if not ip_range:
continue
ip_range = ip_range.strip()
if '-' in ip_range:
start_ip, end_ip = ip_range.split('-', 1)
dependencies: bumped pyramid-debugtoolbar to 4.3.1
r1907 start_ip = ipaddress.ip_address(safe_unicode(start_ip.strip()))
end_ip = ipaddress.ip_address(safe_unicode(end_ip.strip()))
project: added all source files and assets
r1 parsed_ip_range = []
for index in xrange(int(start_ip), int(end_ip) + 1):
new_ip = ipaddress.ip_address(index)
parsed_ip_range.append(str(new_ip))
ip_list.extend(parsed_ip_range)
else:
ip_list.append(ip_range)
return make_unique(ip_list)
def add_extra_ip(self, user, ip, description=None):
"""
Adds ip address to UserIpMap
:param user:
:param ip:
"""
pylons: remove pylons as dependency...
r2351
project: added all source files and assets
r1 user = self._get_user(user)
obj = UserIpMap()
obj.user = user
pylons: remove pylons as dependency...
r2351 obj.ip_addr = ip
project: added all source files and assets
r1 obj.description = description
self.sa.add(obj)
return obj
def delete_extra_ip(self, user, ip_id):
"""
Removes ip address from UserIpMap
:param user:
:param ip_id:
"""
user = self._get_user(user)
obj = UserIpMap.query().get(ip_id)
audit-logs: added audit-logs on user actions....
r1801 if obj and obj.user_id == user.user_id:
project: added all source files and assets
r1 self.sa.delete(obj)
def get_accounts_in_creation_order(self, current_user=None):
"""
Get accounts in order of creation for deactivation for license limits
pick currently logged in user, and append to the list in position 0
pick all super-admins in order of creation date and add it to the list
pick all other accounts in order of creation and add it to the list.
Based on that list, the last accounts can be disabled as they are
created at the end and don't include any of the super admins as well
as the current user.
:param current_user: optionally current user running this operation
"""
if not current_user:
current_user = get_current_rhodecode_user()
active_super_admins = [
x.user_id for x in User.query()
.filter(User.user_id != current_user.user_id)
.filter(User.active == true())
.filter(User.admin == true())
.order_by(User.created_on.asc())]
active_regular_users = [
x.user_id for x in User.query()
.filter(User.user_id != current_user.user_id)
.filter(User.active == true())
.filter(User.admin == false())
.order_by(User.created_on.asc())]
list_of_accounts = [current_user.user_id]
list_of_accounts += active_super_admins
list_of_accounts += active_regular_users
return list_of_accounts
users: made the function for account deactivation easier to use inside ishell.
r1946 def deactivate_last_users(self, expected_users, current_user=None):
project: added all source files and assets
r1 """
Deactivate accounts that are over the license limits.
Algorithm of which accounts to disabled is based on the formula:
Get current user, then super admins in creation order, then regular
active users in creation order.
Using that list we mark all accounts from the end of it as inactive.
This way we block only latest created accounts.
:param expected_users: list of users in special order, we deactivate
the end N ammoun of users from that list
"""
users: made the function for account deactivation easier to use inside ishell.
r1946 list_of_accounts = self.get_accounts_in_creation_order(
current_user=current_user)
project: added all source files and assets
r1
for acc_id in list_of_accounts[expected_users + 1:]:
user = User.get(acc_id)
log.info('Deactivating account %s for license unlock', user)
user.active = False
Session().add(user)
Session().commit()
return
admin-users: add audit page to allow showing user actions in RhodeCode....
r1559
def get_user_log(self, user, filter_term):
user_log = UserLog.query()\
.filter(or_(UserLog.user_id == user.user_id,
UserLog.username == user.username))\
.options(joinedload(UserLog.user))\
.options(joinedload(UserLog.repository))\
.order_by(UserLog.action_date.desc())
user_log = user_log_filter(user_log, filter_term)
return user_log