|
|
# Copyright (C) 2016-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
import re
|
|
|
import logging
|
|
|
import formencode
|
|
|
import formencode.htmlfill
|
|
|
import datetime
|
|
|
from pyramid.interfaces import IRoutesMapper
|
|
|
|
|
|
from pyramid.httpexceptions import HTTPFound
|
|
|
from pyramid.renderers import render
|
|
|
from pyramid.response import Response
|
|
|
|
|
|
from rhodecode.apps._base import BaseAppView, DataGridAppView
|
|
|
from rhodecode.apps.ssh_support.events import SshKeyFileChangeEvent
|
|
|
from rhodecode import events
|
|
|
|
|
|
from rhodecode.lib import helpers as h
|
|
|
from rhodecode.lib.auth import (
|
|
|
LoginRequired, HasPermissionAllDecorator, CSRFRequired)
|
|
|
from rhodecode.lib.utils2 import aslist, safe_str
|
|
|
from rhodecode.model.db import (
|
|
|
or_, coalesce, User, UserIpMap, UserSshKeys)
|
|
|
from rhodecode.model.forms import (
|
|
|
ApplicationPermissionsForm, ObjectPermissionsForm, UserPermissionsForm)
|
|
|
from rhodecode.model.meta import Session
|
|
|
from rhodecode.model.permission import PermissionModel
|
|
|
from rhodecode.model.settings import SettingsModel
|
|
|
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class AdminPermissionsView(BaseAppView, DataGridAppView):
|
|
|
def load_default_context(self):
|
|
|
c = self._get_local_tmpl_context()
|
|
|
PermissionModel().set_global_permission_choices(
|
|
|
c, gettext_translator=self.request.translate)
|
|
|
return c
|
|
|
|
|
|
@LoginRequired()
|
|
|
@HasPermissionAllDecorator('hg.admin')
|
|
|
def permissions_application(self):
|
|
|
c = self.load_default_context()
|
|
|
c.active = 'application'
|
|
|
|
|
|
c.user = User.get_default_user(refresh=True)
|
|
|
|
|
|
app_settings = c.rc_config
|
|
|
|
|
|
defaults = {
|
|
|
'anonymous': c.user.active,
|
|
|
'default_register_message': app_settings.get(
|
|
|
'rhodecode_register_message')
|
|
|
}
|
|
|
defaults.update(c.user.get_default_perms())
|
|
|
|
|
|
data = render('rhodecode:templates/admin/permissions/permissions.mako',
|
|
|
self._get_template_context(c), self.request)
|
|
|
html = formencode.htmlfill.render(
|
|
|
data,
|
|
|
defaults=defaults,
|
|
|
encoding="UTF-8",
|
|
|
force_defaults=False
|
|
|
)
|
|
|
return Response(html)
|
|
|
|
|
|
@LoginRequired()
|
|
|
@HasPermissionAllDecorator('hg.admin')
|
|
|
@CSRFRequired()
|
|
|
def permissions_application_update(self):
|
|
|
_ = self.request.translate
|
|
|
c = self.load_default_context()
|
|
|
c.active = 'application'
|
|
|
|
|
|
_form = ApplicationPermissionsForm(
|
|
|
self.request.translate,
|
|
|
[x[0] for x in c.register_choices],
|
|
|
[x[0] for x in c.password_reset_choices],
|
|
|
[x[0] for x in c.extern_activate_choices])()
|
|
|
|
|
|
try:
|
|
|
form_result = _form.to_python(dict(self.request.POST))
|
|
|
form_result.update({'perm_user_name': User.DEFAULT_USER})
|
|
|
PermissionModel().update_application_permissions(form_result)
|
|
|
|
|
|
settings = [
|
|
|
('register_message', 'default_register_message'),
|
|
|
]
|
|
|
for setting, form_key in settings:
|
|
|
sett = SettingsModel().create_or_update_setting(
|
|
|
setting, form_result[form_key])
|
|
|
Session().add(sett)
|
|
|
|
|
|
Session().commit()
|
|
|
h.flash(_('Application permissions updated successfully'),
|
|
|
category='success')
|
|
|
|
|
|
except formencode.Invalid as errors:
|
|
|
defaults = errors.value
|
|
|
|
|
|
data = render(
|
|
|
'rhodecode:templates/admin/permissions/permissions.mako',
|
|
|
self._get_template_context(c), self.request)
|
|
|
html = formencode.htmlfill.render(
|
|
|
data,
|
|
|
defaults=defaults,
|
|
|
errors=errors.unpack_errors() or {},
|
|
|
prefix_error=False,
|
|
|
encoding="UTF-8",
|
|
|
force_defaults=False
|
|
|
)
|
|
|
return Response(html)
|
|
|
|
|
|
except Exception:
|
|
|
log.exception("Exception during update of permissions")
|
|
|
h.flash(_('Error occurred during update of permissions'),
|
|
|
category='error')
|
|
|
|
|
|
affected_user_ids = [User.get_default_user_id()]
|
|
|
PermissionModel().trigger_permission_flush(affected_user_ids)
|
|
|
|
|
|
raise HTTPFound(h.route_path('admin_permissions_application'))
|
|
|
|
|
|
@LoginRequired()
|
|
|
@HasPermissionAllDecorator('hg.admin')
|
|
|
def permissions_objects(self):
|
|
|
c = self.load_default_context()
|
|
|
c.active = 'objects'
|
|
|
|
|
|
c.user = User.get_default_user(refresh=True)
|
|
|
defaults = {}
|
|
|
defaults.update(c.user.get_default_perms())
|
|
|
|
|
|
data = render(
|
|
|
'rhodecode:templates/admin/permissions/permissions.mako',
|
|
|
self._get_template_context(c), self.request)
|
|
|
html = formencode.htmlfill.render(
|
|
|
data,
|
|
|
defaults=defaults,
|
|
|
encoding="UTF-8",
|
|
|
force_defaults=False
|
|
|
)
|
|
|
return Response(html)
|
|
|
|
|
|
@LoginRequired()
|
|
|
@HasPermissionAllDecorator('hg.admin')
|
|
|
@CSRFRequired()
|
|
|
def permissions_objects_update(self):
|
|
|
_ = self.request.translate
|
|
|
c = self.load_default_context()
|
|
|
c.active = 'objects'
|
|
|
|
|
|
_form = ObjectPermissionsForm(
|
|
|
self.request.translate,
|
|
|
[x[0] for x in c.repo_perms_choices],
|
|
|
[x[0] for x in c.group_perms_choices],
|
|
|
[x[0] for x in c.user_group_perms_choices],
|
|
|
)()
|
|
|
|
|
|
try:
|
|
|
form_result = _form.to_python(dict(self.request.POST))
|
|
|
form_result.update({'perm_user_name': User.DEFAULT_USER})
|
|
|
PermissionModel().update_object_permissions(form_result)
|
|
|
|
|
|
Session().commit()
|
|
|
h.flash(_('Object permissions updated successfully'),
|
|
|
category='success')
|
|
|
|
|
|
except formencode.Invalid as errors:
|
|
|
defaults = errors.value
|
|
|
|
|
|
data = render(
|
|
|
'rhodecode:templates/admin/permissions/permissions.mako',
|
|
|
self._get_template_context(c), self.request)
|
|
|
html = formencode.htmlfill.render(
|
|
|
data,
|
|
|
defaults=defaults,
|
|
|
errors=errors.unpack_errors() or {},
|
|
|
prefix_error=False,
|
|
|
encoding="UTF-8",
|
|
|
force_defaults=False
|
|
|
)
|
|
|
return Response(html)
|
|
|
except Exception:
|
|
|
log.exception("Exception during update of permissions")
|
|
|
h.flash(_('Error occurred during update of permissions'),
|
|
|
category='error')
|
|
|
|
|
|
affected_user_ids = [User.get_default_user_id()]
|
|
|
PermissionModel().trigger_permission_flush(affected_user_ids)
|
|
|
|
|
|
raise HTTPFound(h.route_path('admin_permissions_object'))
|
|
|
|
|
|
@LoginRequired()
|
|
|
@HasPermissionAllDecorator('hg.admin')
|
|
|
def permissions_branch(self):
|
|
|
c = self.load_default_context()
|
|
|
c.active = 'branch'
|
|
|
|
|
|
c.user = User.get_default_user(refresh=True)
|
|
|
defaults = {}
|
|
|
defaults.update(c.user.get_default_perms())
|
|
|
|
|
|
data = render(
|
|
|
'rhodecode:templates/admin/permissions/permissions.mako',
|
|
|
self._get_template_context(c), self.request)
|
|
|
html = formencode.htmlfill.render(
|
|
|
data,
|
|
|
defaults=defaults,
|
|
|
encoding="UTF-8",
|
|
|
force_defaults=False
|
|
|
)
|
|
|
return Response(html)
|
|
|
|
|
|
@LoginRequired()
|
|
|
@HasPermissionAllDecorator('hg.admin')
|
|
|
def permissions_global(self):
|
|
|
c = self.load_default_context()
|
|
|
c.active = 'global'
|
|
|
|
|
|
c.user = User.get_default_user(refresh=True)
|
|
|
defaults = {}
|
|
|
defaults.update(c.user.get_default_perms())
|
|
|
|
|
|
data = render(
|
|
|
'rhodecode:templates/admin/permissions/permissions.mako',
|
|
|
self._get_template_context(c), self.request)
|
|
|
html = formencode.htmlfill.render(
|
|
|
data,
|
|
|
defaults=defaults,
|
|
|
encoding="UTF-8",
|
|
|
force_defaults=False
|
|
|
)
|
|
|
return Response(html)
|
|
|
|
|
|
@LoginRequired()
|
|
|
@HasPermissionAllDecorator('hg.admin')
|
|
|
@CSRFRequired()
|
|
|
def permissions_global_update(self):
|
|
|
_ = self.request.translate
|
|
|
c = self.load_default_context()
|
|
|
c.active = 'global'
|
|
|
|
|
|
_form = UserPermissionsForm(
|
|
|
self.request.translate,
|
|
|
[x[0] for x in c.repo_create_choices],
|
|
|
[x[0] for x in c.repo_create_on_write_choices],
|
|
|
[x[0] for x in c.repo_group_create_choices],
|
|
|
[x[0] for x in c.user_group_create_choices],
|
|
|
[x[0] for x in c.fork_choices],
|
|
|
[x[0] for x in c.inherit_default_permission_choices])()
|
|
|
|
|
|
try:
|
|
|
form_result = _form.to_python(dict(self.request.POST))
|
|
|
form_result.update({'perm_user_name': User.DEFAULT_USER})
|
|
|
PermissionModel().update_user_permissions(form_result)
|
|
|
|
|
|
Session().commit()
|
|
|
h.flash(_('Global permissions updated successfully'),
|
|
|
category='success')
|
|
|
|
|
|
except formencode.Invalid as errors:
|
|
|
defaults = errors.value
|
|
|
|
|
|
data = render(
|
|
|
'rhodecode:templates/admin/permissions/permissions.mako',
|
|
|
self._get_template_context(c), self.request)
|
|
|
html = formencode.htmlfill.render(
|
|
|
data,
|
|
|
defaults=defaults,
|
|
|
errors=errors.unpack_errors() or {},
|
|
|
prefix_error=False,
|
|
|
encoding="UTF-8",
|
|
|
force_defaults=False
|
|
|
)
|
|
|
return Response(html)
|
|
|
except Exception:
|
|
|
log.exception("Exception during update of permissions")
|
|
|
h.flash(_('Error occurred during update of permissions'),
|
|
|
category='error')
|
|
|
|
|
|
affected_user_ids = [User.get_default_user_id()]
|
|
|
PermissionModel().trigger_permission_flush(affected_user_ids)
|
|
|
|
|
|
raise HTTPFound(h.route_path('admin_permissions_global'))
|
|
|
|
|
|
@LoginRequired()
|
|
|
@HasPermissionAllDecorator('hg.admin')
|
|
|
def permissions_ips(self):
|
|
|
c = self.load_default_context()
|
|
|
c.active = 'ips'
|
|
|
|
|
|
c.user = User.get_default_user(refresh=True)
|
|
|
c.user_ip_map = (
|
|
|
UserIpMap.query().filter(UserIpMap.user == c.user).all())
|
|
|
|
|
|
return self._get_template_context(c)
|
|
|
|
|
|
@LoginRequired()
|
|
|
@HasPermissionAllDecorator('hg.admin')
|
|
|
def permissions_overview(self):
|
|
|
c = self.load_default_context()
|
|
|
c.active = 'perms'
|
|
|
|
|
|
c.user = User.get_default_user(refresh=True)
|
|
|
c.perm_user = c.user.AuthUser()
|
|
|
return self._get_template_context(c)
|
|
|
|
|
|
@LoginRequired()
|
|
|
@HasPermissionAllDecorator('hg.admin')
|
|
|
def auth_token_access(self):
|
|
|
from rhodecode import CONFIG
|
|
|
|
|
|
c = self.load_default_context()
|
|
|
c.active = 'auth_token_access'
|
|
|
|
|
|
c.user = User.get_default_user(refresh=True)
|
|
|
c.perm_user = c.user.AuthUser()
|
|
|
|
|
|
mapper = self.request.registry.queryUtility(IRoutesMapper)
|
|
|
c.view_data = []
|
|
|
|
|
|
_argument_prog = re.compile(r'\{(.*?)\}|:\((.*)\)')
|
|
|
introspector = self.request.registry.introspector
|
|
|
|
|
|
view_intr = {}
|
|
|
for view_data in introspector.get_category('views'):
|
|
|
intr = view_data['introspectable']
|
|
|
|
|
|
if 'route_name' in intr and intr['attr']:
|
|
|
view_intr[intr['route_name']] = '{}:{}'.format(
|
|
|
str(intr['derived_callable'].__name__), intr['attr']
|
|
|
)
|
|
|
|
|
|
c.whitelist_key = 'api_access_controllers_whitelist'
|
|
|
c.whitelist_file = CONFIG.get('__file__')
|
|
|
whitelist_views = aslist(
|
|
|
CONFIG.get(c.whitelist_key), sep=',')
|
|
|
|
|
|
for route_info in mapper.get_routes():
|
|
|
if not route_info.name.startswith('__'):
|
|
|
routepath = route_info.pattern
|
|
|
|
|
|
def replace(matchobj):
|
|
|
if matchobj.group(1):
|
|
|
return "{%s}" % matchobj.group(1).split(':')[0]
|
|
|
else:
|
|
|
return "{%s}" % matchobj.group(2)
|
|
|
|
|
|
routepath = _argument_prog.sub(replace, routepath)
|
|
|
|
|
|
if not routepath.startswith('/'):
|
|
|
routepath = '/' + routepath
|
|
|
|
|
|
view_fqn = view_intr.get(route_info.name, 'NOT AVAILABLE')
|
|
|
active = view_fqn in whitelist_views
|
|
|
c.view_data.append((route_info.name, view_fqn, routepath, active))
|
|
|
|
|
|
c.whitelist_views = whitelist_views
|
|
|
return self._get_template_context(c)
|
|
|
|
|
|
def ssh_enabled(self):
|
|
|
return self.request.registry.settings.get(
|
|
|
'ssh.generate_authorized_keyfile')
|
|
|
|
|
|
@LoginRequired()
|
|
|
@HasPermissionAllDecorator('hg.admin')
|
|
|
def ssh_keys(self):
|
|
|
c = self.load_default_context()
|
|
|
c.active = 'ssh_keys'
|
|
|
c.ssh_enabled = self.ssh_enabled()
|
|
|
return self._get_template_context(c)
|
|
|
|
|
|
@LoginRequired()
|
|
|
@HasPermissionAllDecorator('hg.admin')
|
|
|
def ssh_keys_data(self):
|
|
|
_ = self.request.translate
|
|
|
self.load_default_context()
|
|
|
column_map = {
|
|
|
'fingerprint': 'ssh_key_fingerprint',
|
|
|
'username': User.username
|
|
|
}
|
|
|
draw, start, limit = self._extract_chunk(self.request)
|
|
|
search_q, order_by, order_dir = self._extract_ordering(
|
|
|
self.request, column_map=column_map)
|
|
|
|
|
|
ssh_keys_data_total_count = UserSshKeys.query()\
|
|
|
.count()
|
|
|
|
|
|
# json generate
|
|
|
base_q = UserSshKeys.query().join(UserSshKeys.user)
|
|
|
|
|
|
if search_q:
|
|
|
like_expression = f'%{safe_str(search_q)}%'
|
|
|
base_q = base_q.filter(or_(
|
|
|
User.username.ilike(like_expression),
|
|
|
UserSshKeys.ssh_key_fingerprint.ilike(like_expression),
|
|
|
))
|
|
|
|
|
|
users_data_total_filtered_count = base_q.count()
|
|
|
|
|
|
sort_col = self._get_order_col(order_by, UserSshKeys)
|
|
|
if sort_col:
|
|
|
if order_dir == 'asc':
|
|
|
# handle null values properly to order by NULL last
|
|
|
if order_by in ['created_on']:
|
|
|
sort_col = coalesce(sort_col, datetime.date.max)
|
|
|
sort_col = sort_col.asc()
|
|
|
else:
|
|
|
# handle null values properly to order by NULL last
|
|
|
if order_by in ['created_on']:
|
|
|
sort_col = coalesce(sort_col, datetime.date.min)
|
|
|
sort_col = sort_col.desc()
|
|
|
|
|
|
base_q = base_q.order_by(sort_col)
|
|
|
base_q = base_q.offset(start).limit(limit)
|
|
|
|
|
|
ssh_keys = base_q.all()
|
|
|
|
|
|
ssh_keys_data = []
|
|
|
for ssh_key in ssh_keys:
|
|
|
ssh_keys_data.append({
|
|
|
"username": h.gravatar_with_user(self.request, ssh_key.user.username),
|
|
|
"fingerprint": ssh_key.ssh_key_fingerprint,
|
|
|
"description": ssh_key.description,
|
|
|
"created_on": h.format_date(ssh_key.created_on),
|
|
|
"accessed_on": h.format_date(ssh_key.accessed_on),
|
|
|
"action": h.link_to(
|
|
|
_('Edit'), h.route_path('edit_user_ssh_keys',
|
|
|
user_id=ssh_key.user.user_id))
|
|
|
})
|
|
|
|
|
|
data = ({
|
|
|
'draw': draw,
|
|
|
'data': ssh_keys_data,
|
|
|
'recordsTotal': ssh_keys_data_total_count,
|
|
|
'recordsFiltered': users_data_total_filtered_count,
|
|
|
})
|
|
|
|
|
|
return data
|
|
|
|
|
|
@LoginRequired()
|
|
|
@HasPermissionAllDecorator('hg.admin')
|
|
|
@CSRFRequired()
|
|
|
def ssh_keys_update(self):
|
|
|
_ = self.request.translate
|
|
|
self.load_default_context()
|
|
|
|
|
|
ssh_enabled = self.ssh_enabled()
|
|
|
key_file = self.request.registry.settings.get(
|
|
|
'ssh.authorized_keys_file_path')
|
|
|
if ssh_enabled:
|
|
|
events.trigger(SshKeyFileChangeEvent(), self.request.registry)
|
|
|
h.flash(_('Updated SSH keys file: {}').format(key_file),
|
|
|
category='success')
|
|
|
else:
|
|
|
h.flash(_('SSH key support is disabled in .ini file'),
|
|
|
category='warning')
|
|
|
|
|
|
raise HTTPFound(h.route_path('admin_permissions_ssh_keys'))
|
|
|
|