# Copyright (C) 2016-2023 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import logging import datetime import formencode import formencode.htmlfill from pyramid.httpexceptions import HTTPFound from pyramid.renderers import render from pyramid.response import Response from rhodecode import events from rhodecode.apps._base import BaseAppView, DataGridAppView, UserAppView from rhodecode.apps.ssh_support import SshKeyFileChangeEvent from rhodecode.authentication.base import get_authn_registry, RhodeCodeExternalAuthPlugin from rhodecode.authentication.plugins import auth_rhodecode from rhodecode.events import trigger from rhodecode.model.db import true, UserNotice from rhodecode.lib import audit_logger, rc_cache, auth from rhodecode.lib.exceptions import ( UserCreationError, UserOwnsReposException, UserOwnsRepoGroupsException, UserOwnsUserGroupsException, UserOwnsPullRequestsException, UserOwnsArtifactsException, DefaultUserException) from rhodecode.lib import ext_json from rhodecode.lib.auth import ( LoginRequired, HasPermissionAllDecorator, CSRFRequired) from rhodecode.lib import helpers as h from rhodecode.lib.helpers import SqlPage from rhodecode.lib.utils2 import safe_int, safe_str, AttributeDict from rhodecode.model.auth_token import AuthTokenModel from rhodecode.model.forms import ( UserForm, UserIndividualPermissionsForm, UserPermissionsForm, UserExtraEmailForm, UserExtraIpForm) from rhodecode.model.permission import PermissionModel from rhodecode.model.repo_group import RepoGroupModel from rhodecode.model.ssh_key import SshKeyModel from rhodecode.model.user import UserModel from rhodecode.model.user_group import UserGroupModel from rhodecode.model.db import ( or_, coalesce,IntegrityError, User, UserGroup, UserIpMap, UserEmailMap, UserApiKeys, UserSshKeys, RepoGroup) from rhodecode.model.meta import Session log = logging.getLogger(__name__) class AdminUsersView(BaseAppView, DataGridAppView): def load_default_context(self): c = self._get_local_tmpl_context() return c @LoginRequired() @HasPermissionAllDecorator('hg.admin') def users_list(self): c = self.load_default_context() return self._get_template_context(c) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def users_list_data(self): self.load_default_context() column_map = { 'first_name': 'name', 'last_name': 'lastname', } draw, start, limit = self._extract_chunk(self.request) search_q, order_by, order_dir = self._extract_ordering( self.request, column_map=column_map) _render = self.request.get_partial_renderer( 'rhodecode:templates/data_table/_dt_elements.mako') def user_actions(user_id, username): return _render("user_actions", user_id, username) users_data_total_count = User.query()\ .filter(User.username != User.DEFAULT_USER) \ .count() users_data_total_inactive_count = User.query()\ .filter(User.username != User.DEFAULT_USER) \ .filter(User.active != true())\ .count() # json generate base_q = User.query().filter(User.username != User.DEFAULT_USER) base_inactive_q = base_q.filter(User.active != true()) if search_q: like_expression = '%{}%'.format(safe_str(search_q)) base_q = base_q.filter(or_( User.username.ilike(like_expression), User._email.ilike(like_expression), User.name.ilike(like_expression), User.lastname.ilike(like_expression), )) base_inactive_q = base_q.filter(User.active != true()) users_data_total_filtered_count = base_q.count() users_data_total_filtered_inactive_count = base_inactive_q.count() sort_col = getattr(User, order_by, None) if sort_col: if order_dir == 'asc': # handle null values properly to order by NULL last if order_by in ['last_activity']: sort_col = coalesce(sort_col, datetime.date.max) sort_col = sort_col.asc() else: # handle null values properly to order by NULL last if order_by in ['last_activity']: sort_col = coalesce(sort_col, datetime.date.min) sort_col = sort_col.desc() base_q = base_q.order_by(sort_col) base_q = base_q.offset(start).limit(limit) users_list = base_q.all() users_data = [] for user in users_list: users_data.append({ "username": h.gravatar_with_user(self.request, user.username), "email": user.email, "first_name": user.first_name, "last_name": user.last_name, "last_login": h.format_date(user.last_login), "last_activity": h.format_date(user.last_activity), "active": h.bool2icon(user.active), "active_raw": user.active, "admin": h.bool2icon(user.admin), "extern_type": user.extern_type, "extern_name": user.extern_name, "action": user_actions(user.user_id, user.username), }) data = ({ 'draw': draw, 'data': users_data, 'recordsTotal': users_data_total_count, 'recordsFiltered': users_data_total_filtered_count, 'recordsTotalInactive': users_data_total_inactive_count, 'recordsFilteredInactive': users_data_total_filtered_inactive_count }) return data def _set_personal_repo_group_template_vars(self, c_obj): DummyUser = AttributeDict({ 'username': '${username}', 'user_id': '${user_id}', }) c_obj.default_create_repo_group = RepoGroupModel() \ .get_default_create_personal_repo_group() c_obj.personal_repo_group_name = RepoGroupModel() \ .get_personal_group_name(DummyUser) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def users_new(self): _ = self.request.translate c = self.load_default_context() c.default_extern_type = auth_rhodecode.RhodeCodeAuthPlugin.uid self._set_personal_repo_group_template_vars(c) return self._get_template_context(c) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def users_create(self): _ = self.request.translate c = self.load_default_context() c.default_extern_type = auth_rhodecode.RhodeCodeAuthPlugin.uid user_model = UserModel() user_form = UserForm(self.request.translate)() try: form_result = user_form.to_python(dict(self.request.POST)) user = user_model.create(form_result) Session().flush() creation_data = user.get_api_data() username = form_result['username'] audit_logger.store_web( 'user.create', action_data={'data': creation_data}, user=c.rhodecode_user) user_link = h.link_to( h.escape(username), h.route_path('user_edit', user_id=user.user_id)) h.flash(h.literal(_('Created user %(user_link)s') % {'user_link': user_link}), category='success') Session().commit() except formencode.Invalid as errors: self._set_personal_repo_group_template_vars(c) data = render( 'rhodecode:templates/admin/users/user_add.mako', self._get_template_context(c), self.request) html = formencode.htmlfill.render( data, defaults=errors.value, errors=errors.unpack_errors() or {}, prefix_error=False, encoding="UTF-8", force_defaults=False ) return Response(html) except UserCreationError as e: h.flash(safe_str(e), 'error') except Exception: log.exception("Exception creation of user") h.flash(_('Error occurred during creation of user %s') % self.request.POST.get('username'), category='error') raise HTTPFound(h.route_path('users')) class UsersView(UserAppView): ALLOW_SCOPED_TOKENS = False """ This view has alternative version inside EE, if modified please take a look in there as well. """ def get_auth_plugins(self): valid_plugins = [] authn_registry = get_authn_registry(self.request.registry) for plugin in authn_registry.get_plugins_for_authentication(): if isinstance(plugin, RhodeCodeExternalAuthPlugin): valid_plugins.append(plugin) elif plugin.name == 'rhodecode': valid_plugins.append(plugin) # extend our choices if user has set a bound plugin which isn't enabled at the # moment extern_type = self.db_user.extern_type if extern_type not in [x.uid for x in valid_plugins]: try: plugin = authn_registry.get_plugin_by_uid(extern_type) if plugin: valid_plugins.append(plugin) except Exception: log.exception( f'Could not extend user plugins with `{extern_type}`') return valid_plugins def load_default_context(self): req = self.request c = self._get_local_tmpl_context() c.allow_scoped_tokens = self.ALLOW_SCOPED_TOKENS c.allowed_languages = [ ('en', 'English (en)'), ('de', 'German (de)'), ('fr', 'French (fr)'), ('it', 'Italian (it)'), ('ja', 'Japanese (ja)'), ('pl', 'Polish (pl)'), ('pt', 'Portuguese (pt)'), ('ru', 'Russian (ru)'), ('zh', 'Chinese (zh)'), ] c.allowed_extern_types = [ (x.uid, x.get_display_name()) for x in self.get_auth_plugins() ] perms = req.registry.settings.get('available_permissions') if not perms: # inject info about available permissions auth.set_available_permissions(req.registry.settings) c.available_permissions = req.registry.settings['available_permissions'] PermissionModel().set_global_permission_choices( c, gettext_translator=req.translate) return c @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def user_update(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user c.active = 'profile' c.extern_type = c.user.extern_type c.extern_name = c.user.extern_name c.perm_user = c.user.AuthUser(ip_addr=self.request.remote_addr) available_languages = [x[0] for x in c.allowed_languages] _form = UserForm(self.request.translate, edit=True, available_languages=available_languages, old_data={'user_id': user_id, 'email': c.user.email})() c.edit_mode = self.request.POST.get('edit') == '1' form_result = {} old_values = c.user.get_api_data() try: form_result = _form.to_python(dict(self.request.POST)) skip_attrs = ['extern_name'] # TODO: plugin should define if username can be updated if c.extern_type != "rhodecode" and not c.edit_mode: # forbid updating username for external accounts skip_attrs.append('username') UserModel().update_user( user_id, skip_attrs=skip_attrs, **form_result) audit_logger.store_web( 'user.edit', action_data={'old_data': old_values}, user=c.rhodecode_user) Session().commit() h.flash(_('User updated successfully'), category='success') except formencode.Invalid as errors: data = render( 'rhodecode:templates/admin/users/user_edit.mako', self._get_template_context(c), self.request) html = formencode.htmlfill.render( data, defaults=errors.value, errors=errors.unpack_errors() or {}, prefix_error=False, encoding="UTF-8", force_defaults=False ) return Response(html) except UserCreationError as e: h.flash(safe_str(e), 'error') except Exception: log.exception("Exception updating user") h.flash(_('Error occurred during update of user %s') % form_result.get('username'), category='error') raise HTTPFound(h.route_path('user_edit', user_id=user_id)) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def user_delete(self): _ = self.request.translate c = self.load_default_context() c.user = self.db_user _repos = len(c.user.repositories) _repo_groups = len(c.user.repository_groups) _user_groups = len(c.user.user_groups) _pull_requests = len(c.user.user_pull_requests) _artifacts = len(c.user.artifacts) handle_repos = None handle_repo_groups = None handle_user_groups = None handle_pull_requests = None handle_artifacts = None # calls for flash of handle based on handle case detach or delete def set_handle_flash_repos(): handle = handle_repos if handle == 'detach': h.flash(_('Detached %s repositories') % _repos, category='success') elif handle == 'delete': h.flash(_('Deleted %s repositories') % _repos, category='success') def set_handle_flash_repo_groups(): handle = handle_repo_groups if handle == 'detach': h.flash(_('Detached %s repository groups') % _repo_groups, category='success') elif handle == 'delete': h.flash(_('Deleted %s repository groups') % _repo_groups, category='success') def set_handle_flash_user_groups(): handle = handle_user_groups if handle == 'detach': h.flash(_('Detached %s user groups') % _user_groups, category='success') elif handle == 'delete': h.flash(_('Deleted %s user groups') % _user_groups, category='success') def set_handle_flash_pull_requests(): handle = handle_pull_requests if handle == 'detach': h.flash(_('Detached %s pull requests') % _pull_requests, category='success') elif handle == 'delete': h.flash(_('Deleted %s pull requests') % _pull_requests, category='success') def set_handle_flash_artifacts(): handle = handle_artifacts if handle == 'detach': h.flash(_('Detached %s artifacts') % _artifacts, category='success') elif handle == 'delete': h.flash(_('Deleted %s artifacts') % _artifacts, category='success') handle_user = User.get_first_super_admin() handle_user_id = safe_int(self.request.POST.get('detach_user_id')) if handle_user_id: # NOTE(marcink): we get new owner for objects... handle_user = User.get_or_404(handle_user_id) if _repos and self.request.POST.get('user_repos'): handle_repos = self.request.POST['user_repos'] if _repo_groups and self.request.POST.get('user_repo_groups'): handle_repo_groups = self.request.POST['user_repo_groups'] if _user_groups and self.request.POST.get('user_user_groups'): handle_user_groups = self.request.POST['user_user_groups'] if _pull_requests and self.request.POST.get('user_pull_requests'): handle_pull_requests = self.request.POST['user_pull_requests'] if _artifacts and self.request.POST.get('user_artifacts'): handle_artifacts = self.request.POST['user_artifacts'] old_values = c.user.get_api_data() try: UserModel().delete( c.user, handle_repos=handle_repos, handle_repo_groups=handle_repo_groups, handle_user_groups=handle_user_groups, handle_pull_requests=handle_pull_requests, handle_artifacts=handle_artifacts, handle_new_owner=handle_user ) audit_logger.store_web( 'user.delete', action_data={'old_data': old_values}, user=c.rhodecode_user) Session().commit() set_handle_flash_repos() set_handle_flash_repo_groups() set_handle_flash_user_groups() set_handle_flash_pull_requests() set_handle_flash_artifacts() username = h.escape(old_values['username']) h.flash(_('Successfully deleted user `{}`').format(username), category='success') except (UserOwnsReposException, UserOwnsRepoGroupsException, UserOwnsUserGroupsException, UserOwnsPullRequestsException, UserOwnsArtifactsException, DefaultUserException) as e: h.flash(safe_str(e), category='warning') except Exception: log.exception("Exception during deletion of user") h.flash(_('An error occurred during deletion of user'), category='error') raise HTTPFound(h.route_path('users')) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def user_edit(self): _ = self.request.translate c = self.load_default_context() c.user = self.db_user c.active = 'profile' c.extern_type = c.user.extern_type c.extern_name = c.user.extern_name c.perm_user = c.user.AuthUser(ip_addr=self.request.remote_addr) c.edit_mode = self.request.GET.get('edit') == '1' defaults = c.user.get_dict() defaults.update({'language': c.user.user_data.get('language')}) data = render( 'rhodecode:templates/admin/users/user_edit.mako', self._get_template_context(c), self.request) html = formencode.htmlfill.render( data, defaults=defaults, encoding="UTF-8", force_defaults=False ) return Response(html) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def user_edit_advanced(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user c.detach_user = User.get_first_super_admin() detach_user_id = safe_int(self.request.GET.get('detach_user_id')) if detach_user_id: c.detach_user = User.get_or_404(detach_user_id) c.active = 'advanced' c.personal_repo_group = RepoGroup.get_user_personal_repo_group(user_id) c.personal_repo_group_name = RepoGroupModel()\ .get_personal_group_name(c.user) c.user_to_review_rules = sorted( (x.user for x in c.user.user_review_rules), key=lambda u: u.username.lower()) defaults = c.user.get_dict() # Interim workaround if the user participated on any pull requests as a # reviewer. has_review = len(c.user.reviewer_pull_requests) c.can_delete_user = not has_review c.can_delete_user_message = '' inactive_link = h.link_to( 'inactive', h.route_path('user_edit', user_id=user_id, _anchor='active')) if has_review == 1: c.can_delete_user_message = h.literal(_( 'The user participates as reviewer in {} pull request and ' 'cannot be deleted. \nYou can set the user to ' '"{}" instead of deleting it.').format( has_review, inactive_link)) elif has_review: c.can_delete_user_message = h.literal(_( 'The user participates as reviewer in {} pull requests and ' 'cannot be deleted. \nYou can set the user to ' '"{}" instead of deleting it.').format( has_review, inactive_link)) data = render( 'rhodecode:templates/admin/users/user_edit.mako', self._get_template_context(c), self.request) html = formencode.htmlfill.render( data, defaults=defaults, encoding="UTF-8", force_defaults=False ) return Response(html) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def user_edit_global_perms(self): _ = self.request.translate c = self.load_default_context() c.user = self.db_user c.active = 'global_perms' c.default_user = User.get_default_user() defaults = c.user.get_dict() defaults.update(c.default_user.get_default_perms(suffix='_inherited')) defaults.update(c.default_user.get_default_perms()) defaults.update(c.user.get_default_perms()) data = render( 'rhodecode:templates/admin/users/user_edit.mako', self._get_template_context(c), self.request) html = formencode.htmlfill.render( data, defaults=defaults, encoding="UTF-8", force_defaults=False ) return Response(html) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def user_edit_global_perms_update(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user c.active = 'global_perms' try: # first stage that verifies the checkbox _form = UserIndividualPermissionsForm(self.request.translate) form_result = _form.to_python(dict(self.request.POST)) inherit_perms = form_result['inherit_default_permissions'] c.user.inherit_default_permissions = inherit_perms Session().add(c.user) if not inherit_perms: # only update the individual ones if we un check the flag _form = UserPermissionsForm( self.request.translate, [x[0] for x in c.repo_create_choices], [x[0] for x in c.repo_create_on_write_choices], [x[0] for x in c.repo_group_create_choices], [x[0] for x in c.user_group_create_choices], [x[0] for x in c.fork_choices], [x[0] for x in c.inherit_default_permission_choices])() form_result = _form.to_python(dict(self.request.POST)) form_result.update({'perm_user_id': c.user.user_id}) PermissionModel().update_user_permissions(form_result) # TODO(marcink): implement global permissions # audit_log.store_web('user.edit.permissions') Session().commit() h.flash(_('User global permissions updated successfully'), category='success') except formencode.Invalid as errors: data = render( 'rhodecode:templates/admin/users/user_edit.mako', self._get_template_context(c), self.request) html = formencode.htmlfill.render( data, defaults=errors.value, errors=errors.unpack_errors() or {}, prefix_error=False, encoding="UTF-8", force_defaults=False ) return Response(html) except Exception: log.exception("Exception during permissions saving") h.flash(_('An error occurred during permissions saving'), category='error') affected_user_ids = [user_id] PermissionModel().trigger_permission_flush(affected_user_ids) raise HTTPFound(h.route_path('user_edit_global_perms', user_id=user_id)) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def user_enable_force_password_reset(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user try: c.user.update_userdata(force_password_change=True) msg = _('Force password change enabled for user') audit_logger.store_web('user.edit.password_reset.enabled', user=c.rhodecode_user) Session().commit() h.flash(msg, category='success') except Exception: log.exception("Exception during password reset for user") h.flash(_('An error occurred during password reset for user'), category='error') raise HTTPFound(h.route_path('user_edit_advanced', user_id=user_id)) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def user_disable_force_password_reset(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user try: c.user.update_userdata(force_password_change=False) msg = _('Force password change disabled for user') audit_logger.store_web( 'user.edit.password_reset.disabled', user=c.rhodecode_user) Session().commit() h.flash(msg, category='success') except Exception: log.exception("Exception during password reset for user") h.flash(_('An error occurred during password reset for user'), category='error') raise HTTPFound(h.route_path('user_edit_advanced', user_id=user_id)) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def user_notice_dismiss(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user user_notice_id = safe_int(self.request.POST.get('notice_id')) notice = UserNotice().query()\ .filter(UserNotice.user_id == user_id)\ .filter(UserNotice.user_notice_id == user_notice_id)\ .scalar() read = False if notice: notice.notice_read = True Session().add(notice) Session().commit() read = True return {'notice': user_notice_id, 'read': read} @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def user_create_personal_repo_group(self): """ Create personal repository group for this user """ from rhodecode.model.repo_group import RepoGroupModel _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user personal_repo_group = RepoGroup.get_user_personal_repo_group( c.user.user_id) if personal_repo_group: raise HTTPFound(h.route_path('user_edit_advanced', user_id=user_id)) personal_repo_group_name = RepoGroupModel().get_personal_group_name(c.user) named_personal_group = RepoGroup.get_by_group_name( personal_repo_group_name) try: if named_personal_group and named_personal_group.user_id == c.user.user_id: # migrate the same named group, and mark it as personal named_personal_group.personal = True Session().add(named_personal_group) Session().commit() msg = _('Linked repository group `{}` as personal'.format( personal_repo_group_name)) h.flash(msg, category='success') elif not named_personal_group: RepoGroupModel().create_personal_repo_group(c.user) msg = _('Created repository group `{}`'.format( personal_repo_group_name)) h.flash(msg, category='success') else: msg = _('Repository group `{}` is already taken'.format( personal_repo_group_name)) h.flash(msg, category='warning') except Exception: log.exception("Exception during repository group creation") msg = _( 'An error occurred during repository group creation for user') h.flash(msg, category='error') Session().rollback() raise HTTPFound(h.route_path('user_edit_advanced', user_id=user_id)) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def auth_tokens(self): _ = self.request.translate c = self.load_default_context() c.user = self.db_user c.active = 'auth_tokens' c.lifetime_values = AuthTokenModel.get_lifetime_values(translator=_) c.role_values = [ (x, AuthTokenModel.cls._get_role_name(x)) for x in AuthTokenModel.cls.ROLES] c.role_options = [(c.role_values, _("Role"))] c.user_auth_tokens = AuthTokenModel().get_auth_tokens( c.user.user_id, show_expired=True) c.role_vcs = AuthTokenModel.cls.ROLE_VCS return self._get_template_context(c) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def auth_tokens_view(self): _ = self.request.translate c = self.load_default_context() c.user = self.db_user auth_token_id = self.request.POST.get('auth_token_id') if auth_token_id: token = UserApiKeys.get_or_404(auth_token_id) return { 'auth_token': token.api_key } def maybe_attach_token_scope(self, token): # implemented in EE edition pass @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def auth_tokens_add(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user user_data = c.user.get_api_data() lifetime = safe_int(self.request.POST.get('lifetime'), -1) description = self.request.POST.get('description') role = self.request.POST.get('role') token = UserModel().add_auth_token( user=c.user.user_id, lifetime_minutes=lifetime, role=role, description=description, scope_callback=self.maybe_attach_token_scope) token_data = token.get_api_data() audit_logger.store_web( 'user.edit.token.add', action_data={ 'data': {'token': token_data, 'user': user_data}}, user=self._rhodecode_user, ) Session().commit() h.flash(_("Auth token successfully created"), category='success') return HTTPFound(h.route_path('edit_user_auth_tokens', user_id=user_id)) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def auth_tokens_delete(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user user_data = c.user.get_api_data() del_auth_token = self.request.POST.get('del_auth_token') if del_auth_token: token = UserApiKeys.get_or_404(del_auth_token) token_data = token.get_api_data() AuthTokenModel().delete(del_auth_token, c.user.user_id) audit_logger.store_web( 'user.edit.token.delete', action_data={ 'data': {'token': token_data, 'user': user_data}}, user=self._rhodecode_user,) Session().commit() h.flash(_("Auth token successfully deleted"), category='success') return HTTPFound(h.route_path('edit_user_auth_tokens', user_id=user_id)) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def ssh_keys(self): _ = self.request.translate c = self.load_default_context() c.user = self.db_user c.active = 'ssh_keys' c.default_key = self.request.GET.get('default_key') c.user_ssh_keys = SshKeyModel().get_ssh_keys(c.user.user_id) return self._get_template_context(c) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def ssh_keys_generate_keypair(self): _ = self.request.translate c = self.load_default_context() c.user = self.db_user c.active = 'ssh_keys_generate' comment = 'RhodeCode-SSH {}'.format(c.user.email or '') private_format = self.request.GET.get('private_format') \ or SshKeyModel.DEFAULT_PRIVATE_KEY_FORMAT c.private, c.public = SshKeyModel().generate_keypair( comment=comment, private_format=private_format) return self._get_template_context(c) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def ssh_keys_add(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user user_data = c.user.get_api_data() key_data = self.request.POST.get('key_data') description = self.request.POST.get('description') fingerprint = 'unknown' try: if not key_data: raise ValueError('Please add a valid public key') key = SshKeyModel().parse_key(key_data.strip()) fingerprint = key.hash_md5() ssh_key = SshKeyModel().create( c.user.user_id, fingerprint, key.keydata, description) ssh_key_data = ssh_key.get_api_data() audit_logger.store_web( 'user.edit.ssh_key.add', action_data={ 'data': {'ssh_key': ssh_key_data, 'user': user_data}}, user=self._rhodecode_user, ) Session().commit() # Trigger an event on change of keys. trigger(SshKeyFileChangeEvent(), self.request.registry) h.flash(_("Ssh Key successfully created"), category='success') except IntegrityError: log.exception("Exception during ssh key saving") err = 'Such key with fingerprint `{}` already exists, ' \ 'please use a different one'.format(fingerprint) h.flash(_('An error occurred during ssh key saving: {}').format(err), category='error') except Exception as e: log.exception("Exception during ssh key saving") h.flash(_('An error occurred during ssh key saving: {}').format(e), category='error') return HTTPFound( h.route_path('edit_user_ssh_keys', user_id=user_id)) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def ssh_keys_delete(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user user_data = c.user.get_api_data() del_ssh_key = self.request.POST.get('del_ssh_key') if del_ssh_key: ssh_key = UserSshKeys.get_or_404(del_ssh_key) ssh_key_data = ssh_key.get_api_data() SshKeyModel().delete(del_ssh_key, c.user.user_id) audit_logger.store_web( 'user.edit.ssh_key.delete', action_data={ 'data': {'ssh_key': ssh_key_data, 'user': user_data}}, user=self._rhodecode_user,) Session().commit() # Trigger an event on change of keys. trigger(SshKeyFileChangeEvent(), self.request.registry) h.flash(_("Ssh key successfully deleted"), category='success') return HTTPFound(h.route_path('edit_user_ssh_keys', user_id=user_id)) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def emails(self): _ = self.request.translate c = self.load_default_context() c.user = self.db_user c.active = 'emails' c.user_email_map = UserEmailMap.query() \ .filter(UserEmailMap.user == c.user).all() return self._get_template_context(c) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def emails_add(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user email = self.request.POST.get('new_email') user_data = c.user.get_api_data() try: form = UserExtraEmailForm(self.request.translate)() data = form.to_python({'email': email}) email = data['email'] UserModel().add_extra_email(c.user.user_id, email) audit_logger.store_web( 'user.edit.email.add', action_data={'email': email, 'user': user_data}, user=self._rhodecode_user) Session().commit() h.flash(_("Added new email address `%s` for user account") % email, category='success') except formencode.Invalid as error: msg = error.unpack_errors()['email'] h.flash(h.escape(msg), category='error') except IntegrityError: log.warning("Email %s already exists", email) h.flash(_('Email `{}` is already registered for another user.').format(email), category='error') except Exception: log.exception("Exception during email saving") h.flash(_('An error occurred during email saving'), category='error') raise HTTPFound(h.route_path('edit_user_emails', user_id=user_id)) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def emails_delete(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user email_id = self.request.POST.get('del_email_id') user_model = UserModel() email = UserEmailMap.query().get(email_id).email user_data = c.user.get_api_data() user_model.delete_extra_email(c.user.user_id, email_id) audit_logger.store_web( 'user.edit.email.delete', action_data={'email': email, 'user': user_data}, user=self._rhodecode_user) Session().commit() h.flash(_("Removed email address from user account"), category='success') raise HTTPFound(h.route_path('edit_user_emails', user_id=user_id)) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def ips(self): _ = self.request.translate c = self.load_default_context() c.user = self.db_user c.active = 'ips' c.user_ip_map = UserIpMap.query() \ .filter(UserIpMap.user == c.user).all() c.inherit_default_ips = c.user.inherit_default_permissions c.default_user_ip_map = UserIpMap.query() \ .filter(UserIpMap.user == User.get_default_user()).all() return self._get_template_context(c) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() # NOTE(marcink): this view is allowed for default users, as we can # edit their IP white list def ips_add(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user user_model = UserModel() desc = self.request.POST.get('description') try: ip_list = user_model.parse_ip_range( self.request.POST.get('new_ip')) except Exception as e: ip_list = [] log.exception("Exception during ip saving") h.flash(_('An error occurred during ip saving:%s' % (e,)), category='error') added = [] user_data = c.user.get_api_data() for ip in ip_list: try: form = UserExtraIpForm(self.request.translate)() data = form.to_python({'ip': ip}) ip = data['ip'] user_model.add_extra_ip(c.user.user_id, ip, desc) audit_logger.store_web( 'user.edit.ip.add', action_data={'ip': ip, 'user': user_data}, user=self._rhodecode_user) Session().commit() added.append(ip) except formencode.Invalid as error: msg = error.unpack_errors()['ip'] h.flash(msg, category='error') except Exception: log.exception("Exception during ip saving") h.flash(_('An error occurred during ip saving'), category='error') if added: h.flash( _("Added ips %s to user whitelist") % (', '.join(ip_list), ), category='success') if 'default_user' in self.request.POST: # case for editing global IP list we do it for 'DEFAULT' user raise HTTPFound(h.route_path('admin_permissions_ips')) raise HTTPFound(h.route_path('edit_user_ips', user_id=user_id)) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() # NOTE(marcink): this view is allowed for default users, as we can # edit their IP white list def ips_delete(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user ip_id = self.request.POST.get('del_ip_id') user_model = UserModel() user_data = c.user.get_api_data() ip = UserIpMap.query().get(ip_id).ip_addr user_model.delete_extra_ip(c.user.user_id, ip_id) audit_logger.store_web( 'user.edit.ip.delete', action_data={'ip': ip, 'user': user_data}, user=self._rhodecode_user) Session().commit() h.flash(_("Removed ip address from user whitelist"), category='success') if 'default_user' in self.request.POST: # case for editing global IP list we do it for 'DEFAULT' user raise HTTPFound(h.route_path('admin_permissions_ips')) raise HTTPFound(h.route_path('edit_user_ips', user_id=user_id)) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def groups_management(self): c = self.load_default_context() c.user = self.db_user c.data = c.user.group_member groups = [UserGroupModel.get_user_groups_as_dict(group.users_group) for group in c.user.group_member] c.groups = ext_json.str_json(groups) c.active = 'groups' return self._get_template_context(c) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def groups_management_updates(self): _ = self.request.translate c = self.load_default_context() user_id = self.db_user_id c.user = self.db_user user_groups = set(self.request.POST.getall('users_group_id')) user_groups_objects = [] for ugid in user_groups: user_groups_objects.append( UserGroupModel().get_group(safe_int(ugid))) user_group_model = UserGroupModel() added_to_groups, removed_from_groups = \ user_group_model.change_groups(c.user, user_groups_objects) user_data = c.user.get_api_data() for user_group_id in added_to_groups: user_group = UserGroup.get(user_group_id) old_values = user_group.get_api_data() audit_logger.store_web( 'user_group.edit.member.add', action_data={'user': user_data, 'old_data': old_values}, user=self._rhodecode_user) for user_group_id in removed_from_groups: user_group = UserGroup.get(user_group_id) old_values = user_group.get_api_data() audit_logger.store_web( 'user_group.edit.member.delete', action_data={'user': user_data, 'old_data': old_values}, user=self._rhodecode_user) Session().commit() c.active = 'user_groups_management' h.flash(_("Groups successfully changed"), category='success') return HTTPFound(h.route_path( 'edit_user_groups_management', user_id=user_id)) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def user_audit_logs(self): _ = self.request.translate c = self.load_default_context() c.user = self.db_user c.active = 'audit' p = safe_int(self.request.GET.get('page', 1), 1) filter_term = self.request.GET.get('filter') user_log = UserModel().get_user_log(c.user, filter_term) def url_generator(page_num): query_params = { 'page': page_num } if filter_term: query_params['filter'] = filter_term return self.request.current_route_path(_query=query_params) c.audit_logs = SqlPage( user_log, page=p, items_per_page=10, url_maker=url_generator) c.filter_term = filter_term return self._get_template_context(c) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def user_audit_logs_download(self): _ = self.request.translate c = self.load_default_context() c.user = self.db_user user_log = UserModel().get_user_log(c.user, filter_term=None) audit_log_data = {} for entry in user_log: audit_log_data[entry.user_log_id] = entry.get_dict() response = Response(ext_json.formatted_str_json(audit_log_data)) response.content_disposition = f'attachment; filename=user_{c.user.user_id}_audit_logs.json' response.content_type = 'application/json' return response @LoginRequired() @HasPermissionAllDecorator('hg.admin') def user_perms_summary(self): _ = self.request.translate c = self.load_default_context() c.user = self.db_user c.active = 'perms_summary' c.perm_user = c.user.AuthUser(ip_addr=self.request.remote_addr) return self._get_template_context(c) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def user_perms_summary_json(self): self.load_default_context() perm_user = self.db_user.AuthUser(ip_addr=self.request.remote_addr) return perm_user.permissions @LoginRequired() @HasPermissionAllDecorator('hg.admin') def user_caches(self): _ = self.request.translate c = self.load_default_context() c.user = self.db_user c.active = 'caches' c.perm_user = c.user.AuthUser(ip_addr=self.request.remote_addr) cache_namespace_uid = f'cache_user_auth.{self.db_user.user_id}' c.region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid) c.backend = c.region.backend c.user_keys = sorted(c.region.backend.list_keys(prefix=cache_namespace_uid)) return self._get_template_context(c) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def user_caches_update(self): _ = self.request.translate c = self.load_default_context() c.user = self.db_user c.active = 'caches' c.perm_user = c.user.AuthUser(ip_addr=self.request.remote_addr) cache_namespace_uid = f'cache_user_auth.{self.db_user.user_id}' del_keys = rc_cache.clear_cache_namespace('cache_perms', cache_namespace_uid) h.flash(_("Deleted {} cache keys").format(del_keys), category='success') return HTTPFound(h.route_path( 'edit_user_caches', user_id=c.user.user_id))