permissions.py
477 lines
| 16.5 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2016-2023 RhodeCode GmbH | |||
r1941 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
r1943 | import re | |||
r1941 | import logging | |||
import formencode | ||||
r2079 | import formencode.htmlfill | |||
r2042 | import datetime | |||
r1943 | from pyramid.interfaces import IRoutesMapper | |||
r1941 | ||||
from pyramid.httpexceptions import HTTPFound | ||||
from pyramid.renderers import render | ||||
from pyramid.response import Response | ||||
r2042 | from rhodecode.apps._base import BaseAppView, DataGridAppView | |||
r5318 | from rhodecode.apps.ssh_support.events import SshKeyFileChangeEvent | |||
r3412 | from rhodecode import events | |||
r1941 | ||||
from rhodecode.lib import helpers as h | ||||
from rhodecode.lib.auth import ( | ||||
LoginRequired, HasPermissionAllDecorator, CSRFRequired) | ||||
r5065 | from rhodecode.lib.utils2 import aslist, safe_str | |||
r2079 | from rhodecode.model.db import ( | |||
or_, coalesce, User, UserIpMap, UserSshKeys) | ||||
r1941 | from rhodecode.model.forms import ( | |||
ApplicationPermissionsForm, ObjectPermissionsForm, UserPermissionsForm) | ||||
from rhodecode.model.meta import Session | ||||
from rhodecode.model.permission import PermissionModel | ||||
from rhodecode.model.settings import SettingsModel | ||||
log = logging.getLogger(__name__) | ||||
r2042 | class AdminPermissionsView(BaseAppView, DataGridAppView): | |||
r1941 | def load_default_context(self): | |||
c = self._get_local_tmpl_context() | ||||
PermissionModel().set_global_permission_choices( | ||||
c, gettext_translator=self.request.translate) | ||||
return c | ||||
@LoginRequired() | ||||
@HasPermissionAllDecorator('hg.admin') | ||||
def permissions_application(self): | ||||
c = self.load_default_context() | ||||
c.active = 'application' | ||||
c.user = User.get_default_user(refresh=True) | ||||
r3855 | app_settings = c.rc_config | |||
r1941 | defaults = { | |||
'anonymous': c.user.active, | ||||
'default_register_message': app_settings.get( | ||||
'rhodecode_register_message') | ||||
} | ||||
defaults.update(c.user.get_default_perms()) | ||||
data = render('rhodecode:templates/admin/permissions/permissions.mako', | ||||
self._get_template_context(c), self.request) | ||||
html = formencode.htmlfill.render( | ||||
data, | ||||
defaults=defaults, | ||||
encoding="UTF-8", | ||||
force_defaults=False | ||||
) | ||||
return Response(html) | ||||
@LoginRequired() | ||||
@HasPermissionAllDecorator('hg.admin') | ||||
@CSRFRequired() | ||||
def permissions_application_update(self): | ||||
_ = self.request.translate | ||||
c = self.load_default_context() | ||||
c.active = 'application' | ||||
_form = ApplicationPermissionsForm( | ||||
r2351 | self.request.translate, | |||
r1941 | [x[0] for x in c.register_choices], | |||
[x[0] for x in c.password_reset_choices], | ||||
[x[0] for x in c.extern_activate_choices])() | ||||
try: | ||||
form_result = _form.to_python(dict(self.request.POST)) | ||||
form_result.update({'perm_user_name': User.DEFAULT_USER}) | ||||
PermissionModel().update_application_permissions(form_result) | ||||
settings = [ | ||||
('register_message', 'default_register_message'), | ||||
] | ||||
for setting, form_key in settings: | ||||
sett = SettingsModel().create_or_update_setting( | ||||
setting, form_result[form_key]) | ||||
Session().add(sett) | ||||
Session().commit() | ||||
h.flash(_('Application permissions updated successfully'), | ||||
category='success') | ||||
except formencode.Invalid as errors: | ||||
defaults = errors.value | ||||
data = render( | ||||
'rhodecode:templates/admin/permissions/permissions.mako', | ||||
self._get_template_context(c), self.request) | ||||
html = formencode.htmlfill.render( | ||||
data, | ||||
defaults=defaults, | ||||
r5018 | errors=errors.unpack_errors() or {}, | |||
r1941 | prefix_error=False, | |||
encoding="UTF-8", | ||||
force_defaults=False | ||||
) | ||||
return Response(html) | ||||
except Exception: | ||||
log.exception("Exception during update of permissions") | ||||
h.flash(_('Error occurred during update of permissions'), | ||||
category='error') | ||||
r4332 | affected_user_ids = [User.get_default_user_id()] | |||
r3824 | PermissionModel().trigger_permission_flush(affected_user_ids) | |||
r3412 | ||||
r1941 | raise HTTPFound(h.route_path('admin_permissions_application')) | |||
@LoginRequired() | ||||
@HasPermissionAllDecorator('hg.admin') | ||||
def permissions_objects(self): | ||||
c = self.load_default_context() | ||||
c.active = 'objects' | ||||
c.user = User.get_default_user(refresh=True) | ||||
defaults = {} | ||||
defaults.update(c.user.get_default_perms()) | ||||
data = render( | ||||
'rhodecode:templates/admin/permissions/permissions.mako', | ||||
self._get_template_context(c), self.request) | ||||
html = formencode.htmlfill.render( | ||||
data, | ||||
defaults=defaults, | ||||
encoding="UTF-8", | ||||
force_defaults=False | ||||
) | ||||
return Response(html) | ||||
@LoginRequired() | ||||
@HasPermissionAllDecorator('hg.admin') | ||||
@CSRFRequired() | ||||
def permissions_objects_update(self): | ||||
_ = self.request.translate | ||||
c = self.load_default_context() | ||||
c.active = 'objects' | ||||
_form = ObjectPermissionsForm( | ||||
r2351 | self.request.translate, | |||
r1941 | [x[0] for x in c.repo_perms_choices], | |||
[x[0] for x in c.group_perms_choices], | ||||
r2975 | [x[0] for x in c.user_group_perms_choices], | |||
)() | ||||
r1941 | ||||
try: | ||||
form_result = _form.to_python(dict(self.request.POST)) | ||||
form_result.update({'perm_user_name': User.DEFAULT_USER}) | ||||
PermissionModel().update_object_permissions(form_result) | ||||
Session().commit() | ||||
h.flash(_('Object permissions updated successfully'), | ||||
category='success') | ||||
except formencode.Invalid as errors: | ||||
defaults = errors.value | ||||
data = render( | ||||
'rhodecode:templates/admin/permissions/permissions.mako', | ||||
self._get_template_context(c), self.request) | ||||
html = formencode.htmlfill.render( | ||||
data, | ||||
defaults=defaults, | ||||
r5018 | errors=errors.unpack_errors() or {}, | |||
r1941 | prefix_error=False, | |||
encoding="UTF-8", | ||||
force_defaults=False | ||||
) | ||||
return Response(html) | ||||
except Exception: | ||||
log.exception("Exception during update of permissions") | ||||
h.flash(_('Error occurred during update of permissions'), | ||||
category='error') | ||||
r4332 | affected_user_ids = [User.get_default_user_id()] | |||
r3824 | PermissionModel().trigger_permission_flush(affected_user_ids) | |||
r3412 | ||||
r1941 | raise HTTPFound(h.route_path('admin_permissions_object')) | |||
@LoginRequired() | ||||
@HasPermissionAllDecorator('hg.admin') | ||||
r2975 | def permissions_branch(self): | |||
c = self.load_default_context() | ||||
c.active = 'branch' | ||||
c.user = User.get_default_user(refresh=True) | ||||
defaults = {} | ||||
defaults.update(c.user.get_default_perms()) | ||||
data = render( | ||||
'rhodecode:templates/admin/permissions/permissions.mako', | ||||
self._get_template_context(c), self.request) | ||||
html = formencode.htmlfill.render( | ||||
data, | ||||
defaults=defaults, | ||||
encoding="UTF-8", | ||||
force_defaults=False | ||||
) | ||||
return Response(html) | ||||
@LoginRequired() | ||||
@HasPermissionAllDecorator('hg.admin') | ||||
r1941 | def permissions_global(self): | |||
c = self.load_default_context() | ||||
c.active = 'global' | ||||
c.user = User.get_default_user(refresh=True) | ||||
defaults = {} | ||||
defaults.update(c.user.get_default_perms()) | ||||
data = render( | ||||
'rhodecode:templates/admin/permissions/permissions.mako', | ||||
self._get_template_context(c), self.request) | ||||
html = formencode.htmlfill.render( | ||||
data, | ||||
defaults=defaults, | ||||
encoding="UTF-8", | ||||
force_defaults=False | ||||
) | ||||
return Response(html) | ||||
@LoginRequired() | ||||
@HasPermissionAllDecorator('hg.admin') | ||||
@CSRFRequired() | ||||
def permissions_global_update(self): | ||||
_ = self.request.translate | ||||
c = self.load_default_context() | ||||
c.active = 'global' | ||||
_form = UserPermissionsForm( | ||||
r2351 | self.request.translate, | |||
r1941 | [x[0] for x in c.repo_create_choices], | |||
[x[0] for x in c.repo_create_on_write_choices], | ||||
[x[0] for x in c.repo_group_create_choices], | ||||
[x[0] for x in c.user_group_create_choices], | ||||
[x[0] for x in c.fork_choices], | ||||
[x[0] for x in c.inherit_default_permission_choices])() | ||||
try: | ||||
form_result = _form.to_python(dict(self.request.POST)) | ||||
form_result.update({'perm_user_name': User.DEFAULT_USER}) | ||||
PermissionModel().update_user_permissions(form_result) | ||||
Session().commit() | ||||
h.flash(_('Global permissions updated successfully'), | ||||
category='success') | ||||
except formencode.Invalid as errors: | ||||
defaults = errors.value | ||||
data = render( | ||||
'rhodecode:templates/admin/permissions/permissions.mako', | ||||
self._get_template_context(c), self.request) | ||||
html = formencode.htmlfill.render( | ||||
data, | ||||
defaults=defaults, | ||||
r5018 | errors=errors.unpack_errors() or {}, | |||
r1941 | prefix_error=False, | |||
encoding="UTF-8", | ||||
force_defaults=False | ||||
) | ||||
return Response(html) | ||||
except Exception: | ||||
log.exception("Exception during update of permissions") | ||||
h.flash(_('Error occurred during update of permissions'), | ||||
category='error') | ||||
r4332 | affected_user_ids = [User.get_default_user_id()] | |||
r3824 | PermissionModel().trigger_permission_flush(affected_user_ids) | |||
r3412 | ||||
r1941 | raise HTTPFound(h.route_path('admin_permissions_global')) | |||
@LoginRequired() | ||||
@HasPermissionAllDecorator('hg.admin') | ||||
def permissions_ips(self): | ||||
c = self.load_default_context() | ||||
c.active = 'ips' | ||||
c.user = User.get_default_user(refresh=True) | ||||
c.user_ip_map = ( | ||||
UserIpMap.query().filter(UserIpMap.user == c.user).all()) | ||||
return self._get_template_context(c) | ||||
@LoginRequired() | ||||
@HasPermissionAllDecorator('hg.admin') | ||||
def permissions_overview(self): | ||||
c = self.load_default_context() | ||||
c.active = 'perms' | ||||
c.user = User.get_default_user(refresh=True) | ||||
r1997 | c.perm_user = c.user.AuthUser() | |||
r1941 | return self._get_template_context(c) | |||
r1943 | ||||
@LoginRequired() | ||||
@HasPermissionAllDecorator('hg.admin') | ||||
def auth_token_access(self): | ||||
from rhodecode import CONFIG | ||||
c = self.load_default_context() | ||||
c.active = 'auth_token_access' | ||||
c.user = User.get_default_user(refresh=True) | ||||
r1997 | c.perm_user = c.user.AuthUser() | |||
r1943 | ||||
mapper = self.request.registry.queryUtility(IRoutesMapper) | ||||
c.view_data = [] | ||||
r4973 | _argument_prog = re.compile(r'\{(.*?)\}|:\((.*)\)') | |||
r1943 | introspector = self.request.registry.introspector | |||
view_intr = {} | ||||
for view_data in introspector.get_category('views'): | ||||
intr = view_data['introspectable'] | ||||
if 'route_name' in intr and intr['attr']: | ||||
r1950 | view_intr[intr['route_name']] = '{}:{}'.format( | |||
r4936 | str(intr['derived_callable'].__name__), intr['attr'] | |||
r1943 | ) | |||
c.whitelist_key = 'api_access_controllers_whitelist' | ||||
c.whitelist_file = CONFIG.get('__file__') | ||||
whitelist_views = aslist( | ||||
CONFIG.get(c.whitelist_key), sep=',') | ||||
for route_info in mapper.get_routes(): | ||||
if not route_info.name.startswith('__'): | ||||
routepath = route_info.pattern | ||||
def replace(matchobj): | ||||
if matchobj.group(1): | ||||
return "{%s}" % matchobj.group(1).split(':')[0] | ||||
else: | ||||
return "{%s}" % matchobj.group(2) | ||||
routepath = _argument_prog.sub(replace, routepath) | ||||
if not routepath.startswith('/'): | ||||
routepath = '/' + routepath | ||||
view_fqn = view_intr.get(route_info.name, 'NOT AVAILABLE') | ||||
active = view_fqn in whitelist_views | ||||
c.view_data.append((route_info.name, view_fqn, routepath, active)) | ||||
c.whitelist_views = whitelist_views | ||||
return self._get_template_context(c) | ||||
r2042 | ||||
r2047 | def ssh_enabled(self): | |||
return self.request.registry.settings.get( | ||||
'ssh.generate_authorized_keyfile') | ||||
r2042 | @LoginRequired() | |||
@HasPermissionAllDecorator('hg.admin') | ||||
def ssh_keys(self): | ||||
c = self.load_default_context() | ||||
c.active = 'ssh_keys' | ||||
r2047 | c.ssh_enabled = self.ssh_enabled() | |||
r2042 | return self._get_template_context(c) | |||
@LoginRequired() | ||||
@HasPermissionAllDecorator('hg.admin') | ||||
def ssh_keys_data(self): | ||||
_ = self.request.translate | ||||
r2351 | self.load_default_context() | |||
r2042 | column_map = { | |||
'fingerprint': 'ssh_key_fingerprint', | ||||
'username': User.username | ||||
} | ||||
draw, start, limit = self._extract_chunk(self.request) | ||||
search_q, order_by, order_dir = self._extract_ordering( | ||||
self.request, column_map=column_map) | ||||
ssh_keys_data_total_count = UserSshKeys.query()\ | ||||
.count() | ||||
# json generate | ||||
base_q = UserSshKeys.query().join(UserSshKeys.user) | ||||
if search_q: | ||||
r5093 | like_expression = f'%{safe_str(search_q)}%' | |||
r2042 | base_q = base_q.filter(or_( | |||
User.username.ilike(like_expression), | ||||
UserSshKeys.ssh_key_fingerprint.ilike(like_expression), | ||||
)) | ||||
users_data_total_filtered_count = base_q.count() | ||||
sort_col = self._get_order_col(order_by, UserSshKeys) | ||||
if sort_col: | ||||
if order_dir == 'asc': | ||||
# handle null values properly to order by NULL last | ||||
if order_by in ['created_on']: | ||||
sort_col = coalesce(sort_col, datetime.date.max) | ||||
sort_col = sort_col.asc() | ||||
else: | ||||
# handle null values properly to order by NULL last | ||||
if order_by in ['created_on']: | ||||
sort_col = coalesce(sort_col, datetime.date.min) | ||||
sort_col = sort_col.desc() | ||||
base_q = base_q.order_by(sort_col) | ||||
base_q = base_q.offset(start).limit(limit) | ||||
ssh_keys = base_q.all() | ||||
ssh_keys_data = [] | ||||
for ssh_key in ssh_keys: | ||||
ssh_keys_data.append({ | ||||
"username": h.gravatar_with_user(self.request, ssh_key.user.username), | ||||
"fingerprint": ssh_key.ssh_key_fingerprint, | ||||
"description": ssh_key.description, | ||||
"created_on": h.format_date(ssh_key.created_on), | ||||
r2134 | "accessed_on": h.format_date(ssh_key.accessed_on), | |||
r2042 | "action": h.link_to( | |||
_('Edit'), h.route_path('edit_user_ssh_keys', | ||||
user_id=ssh_key.user.user_id)) | ||||
}) | ||||
data = ({ | ||||
'draw': draw, | ||||
'data': ssh_keys_data, | ||||
'recordsTotal': ssh_keys_data_total_count, | ||||
'recordsFiltered': users_data_total_filtered_count, | ||||
}) | ||||
return data | ||||
@LoginRequired() | ||||
@HasPermissionAllDecorator('hg.admin') | ||||
@CSRFRequired() | ||||
def ssh_keys_update(self): | ||||
_ = self.request.translate | ||||
self.load_default_context() | ||||
r2047 | ssh_enabled = self.ssh_enabled() | |||
r2042 | key_file = self.request.registry.settings.get( | |||
'ssh.authorized_keys_file_path') | ||||
if ssh_enabled: | ||||
r3412 | events.trigger(SshKeyFileChangeEvent(), self.request.registry) | |||
r2042 | h.flash(_('Updated SSH keys file: {}').format(key_file), | |||
category='success') | ||||
else: | ||||
h.flash(_('SSH key support is disabled in .ini file'), | ||||
category='warning') | ||||
raise HTTPFound(h.route_path('admin_permissions_ssh_keys')) | ||||