##// END OF EJS Templates
feat(archive-cache): added option to define a configurable top-level bucket for all shards
feat(archive-cache): added option to define a configurable top-level bucket for all shards

File last commit:

r5318:ad20d5fb default
r5445:fdcdfe77 default
Show More
permissions.py
477 lines | 16.5 KiB | text/x-python | PythonLexer
copyrights: updated for 2023
r5088 # Copyright (C) 2016-2023 RhodeCode GmbH
global-permissions: ported controller to pyramid view....
r1941 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
permissions: expose new view that lists all available views for usage in whitelist access.
r1943 import re
global-permissions: ported controller to pyramid view....
r1941 import logging
import formencode
admin: cleanup imports.
r2079 import formencode.htmlfill
ssh-keys: added admin panel for managing globally all SSH Keys....
r2042 import datetime
permissions: expose new view that lists all available views for usage in whitelist access.
r1943 from pyramid.interfaces import IRoutesMapper
global-permissions: ported controller to pyramid view....
r1941
from pyramid.httpexceptions import HTTPFound
from pyramid.renderers import render
from pyramid.response import Response
ssh-keys: added admin panel for managing globally all SSH Keys....
r2042 from rhodecode.apps._base import BaseAppView, DataGridAppView
chore(imports): optimize imports for ssh events
r5318 from rhodecode.apps.ssh_support.events import SshKeyFileChangeEvent
permissions: flush default user permissions on global app permission changes.
r3412 from rhodecode import events
global-permissions: ported controller to pyramid view....
r1941
from rhodecode.lib import helpers as h
from rhodecode.lib.auth import (
LoginRequired, HasPermissionAllDecorator, CSRFRequired)
core: multiple fixes to unicode vs str usage...
r5065 from rhodecode.lib.utils2 import aslist, safe_str
admin: cleanup imports.
r2079 from rhodecode.model.db import (
or_, coalesce, User, UserIpMap, UserSshKeys)
global-permissions: ported controller to pyramid view....
r1941 from rhodecode.model.forms import (
ApplicationPermissionsForm, ObjectPermissionsForm, UserPermissionsForm)
from rhodecode.model.meta import Session
from rhodecode.model.permission import PermissionModel
from rhodecode.model.settings import SettingsModel
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
ssh-keys: added admin panel for managing globally all SSH Keys....
class AdminPermissionsView(BaseAppView, DataGridAppView):
    """
    Admin views for the permission pages of the default user.

    Every public view requires a logged-in user holding ``hg.admin``;
    the views that change state are additionally CSRF protected.
    """

    # Mako template shared by all form based permission pages.
    _PERMISSIONS_TEMPLATE = \
        'rhodecode:templates/admin/permissions/permissions.mako'

    def load_default_context(self):
        """
        Create the local template context and let ``PermissionModel``
        attach the global permission choices to it.

        :return: the prepared template context
        """
        c = self._get_local_tmpl_context()
        translator = self.request.translate
        PermissionModel().set_global_permission_choices(
            c, gettext_translator=translator)
        return c

    def _render_permissions_page(self, c, defaults, invalid=None):
        """
        Render the permissions template and fill its form via htmlfill.

        :param c: template context handed to ``_get_template_context``
        :param defaults: values used by htmlfill to populate the form
        :param invalid: optional ``formencode.Invalid``; when given, its
            unpacked errors are rendered with ``prefix_error=False``
        :return: ``Response`` wrapping the filled HTML
        """
        page = render(
            self._PERMISSIONS_TEMPLATE,
            self._get_template_context(c), self.request)

        fill_options = {
            'defaults': defaults,
            'encoding': "UTF-8",
            'force_defaults': False,
        }
        if invalid is not None:
            # errors are unpacked only after the template got rendered
            fill_options['errors'] = invalid.unpack_errors() or {}
            fill_options['prefix_error'] = False

        return Response(formencode.htmlfill.render(page, **fill_options))

    def _default_user_perms_page(self, active):
        """
        Render the permissions page for section ``active`` with the form
        pre-filled from the default user's permissions.

        :param active: value stored as ``c.active``
        :return: ``Response`` with the rendered page
        """
        c = self.load_default_context()
        c.active = active
        c.user = User.get_default_user(refresh=True)
        defaults = dict(c.user.get_default_perms())
        return self._render_permissions_page(c, defaults)

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    def permissions_application(self):
        """
        Show the application permissions form, pre-filled with the default
        user's active flag, the register message and default permissions.
        """
        c = self.load_default_context()
        c.active = 'application'
        c.user = User.get_default_user(refresh=True)

        defaults = {
            'anonymous': c.user.active,
            'default_register_message': c.rc_config.get(
                'rhodecode_register_message')
        }
        defaults.update(c.user.get_default_perms())
        return self._render_permissions_page(c, defaults)

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @CSRFRequired()
    def permissions_application_update(self):
        """
        Validate and store the application permissions form.

        On validation errors the form is re-rendered with the errors;
        otherwise permissions of the default user are flushed and the
        request is redirected back to the application permissions page.

        :raises HTTPFound: redirect after a (successful or failed) update
        """
        _ = self.request.translate
        c = self.load_default_context()
        c.active = 'application'

        form = ApplicationPermissionsForm(
            self.request.translate,
            [choice[0] for choice in c.register_choices],
            [choice[0] for choice in c.password_reset_choices],
            [choice[0] for choice in c.extern_activate_choices])()

        try:
            form_result = form.to_python(dict(self.request.POST))
            form_result['perm_user_name'] = User.DEFAULT_USER
            PermissionModel().update_application_permissions(form_result)

            # (setting name, form key) pairs persisted as settings
            setting_keys = (
                ('register_message', 'default_register_message'),
            )
            for setting_name, form_key in setting_keys:
                stored = SettingsModel().create_or_update_setting(
                    setting_name, form_result[form_key])
                Session().add(stored)

            Session().commit()
            h.flash(_('Application permissions updated successfully'),
                    category='success')
        except formencode.Invalid as errors:
            return self._render_permissions_page(
                c, errors.value, invalid=errors)
        except Exception:
            log.exception("Exception during update of permissions")
            h.flash(_('Error occurred during update of permissions'),
                    category='error')

        # runs after success as well as after a logged generic failure
        default_user_ids = [User.get_default_user_id()]
        PermissionModel().trigger_permission_flush(default_user_ids)

        raise HTTPFound(h.route_path('admin_permissions_application'))

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    def permissions_objects(self):
        """Show the object permissions form of the default user."""
        return self._default_user_perms_page('objects')

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @CSRFRequired()
    def permissions_objects_update(self):
        """
        Validate and store the object permissions form.

        On validation errors the form is re-rendered with the errors;
        otherwise permissions of the default user are flushed and the
        request is redirected back to the object permissions page.

        :raises HTTPFound: redirect after a (successful or failed) update
        """
        _ = self.request.translate
        c = self.load_default_context()
        c.active = 'objects'

        form = ObjectPermissionsForm(
            self.request.translate,
            [choice[0] for choice in c.repo_perms_choices],
            [choice[0] for choice in c.group_perms_choices],
            [choice[0] for choice in c.user_group_perms_choices],
        )()

        try:
            form_result = form.to_python(dict(self.request.POST))
            form_result['perm_user_name'] = User.DEFAULT_USER
            PermissionModel().update_object_permissions(form_result)

            Session().commit()
            h.flash(_('Object permissions updated successfully'),
                    category='success')
        except formencode.Invalid as errors:
            return self._render_permissions_page(
                c, errors.value, invalid=errors)
        except Exception:
            log.exception("Exception during update of permissions")
            h.flash(_('Error occurred during update of permissions'),
                    category='error')

        # runs after success as well as after a logged generic failure
        default_user_ids = [User.get_default_user_id()]
        PermissionModel().trigger_permission_flush(default_user_ids)

        raise HTTPFound(h.route_path('admin_permissions_object'))

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    def permissions_branch(self):
        """Show the branch permissions form of the default user."""
        return self._default_user_perms_page('branch')

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    def permissions_global(self):
        """Show the global permissions form of the default user."""
        return self._default_user_perms_page('global')

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @CSRFRequired()
    def permissions_global_update(self):
        """
        Validate and store the global (user) permissions form.

        On validation errors the form is re-rendered with the errors;
        otherwise permissions of the default user are flushed and the
        request is redirected back to the global permissions page.

        :raises HTTPFound: redirect after a (successful or failed) update
        """
        _ = self.request.translate
        c = self.load_default_context()
        c.active = 'global'

        form = UserPermissionsForm(
            self.request.translate,
            [choice[0] for choice in c.repo_create_choices],
            [choice[0] for choice in c.repo_create_on_write_choices],
            [choice[0] for choice in c.repo_group_create_choices],
            [choice[0] for choice in c.user_group_create_choices],
            [choice[0] for choice in c.fork_choices],
            [choice[0] for choice in c.inherit_default_permission_choices])()

        try:
            form_result = form.to_python(dict(self.request.POST))
            form_result['perm_user_name'] = User.DEFAULT_USER
            PermissionModel().update_user_permissions(form_result)

            Session().commit()
            h.flash(_('Global permissions updated successfully'),
                    category='success')
        except formencode.Invalid as errors:
            return self._render_permissions_page(
                c, errors.value, invalid=errors)
        except Exception:
            log.exception("Exception during update of permissions")
            h.flash(_('Error occurred during update of permissions'),
                    category='error')

        # runs after success as well as after a logged generic failure
        default_user_ids = [User.get_default_user_id()]
        PermissionModel().trigger_permission_flush(default_user_ids)

        raise HTTPFound(h.route_path('admin_permissions_global'))

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    def permissions_ips(self):
        """
        Expose the ``UserIpMap`` entries of the default user.

        :return: template context with ``c.user_ip_map`` populated
        """
        c = self.load_default_context()
        c.active = 'ips'
        c.user = User.get_default_user(refresh=True)

        ip_query = UserIpMap.query().filter(UserIpMap.user == c.user)
        c.user_ip_map = ip_query.all()
        return self._get_template_context(c)

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    def permissions_overview(self):
        """
        Expose the default user together with its ``AuthUser`` object.

        :return: template context with ``c.perm_user`` populated
        """
        c = self.load_default_context()
        c.active = 'perms'
        c.user = User.get_default_user(refresh=True)
        c.perm_user = c.user.AuthUser()
        return self._get_template_context(c)

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    def auth_token_access(self):
        """
        List the registered routes together with the ``callable:attr`` name
        of their view and a flag telling if that name is part of the
        ``api_access_controllers_whitelist`` config value.

        :return: template context with ``c.view_data`` holding tuples of
            ``(route name, view name, route path, active)``
        """
        from rhodecode import CONFIG

        c = self.load_default_context()
        c.active = 'auth_token_access'
        c.user = User.get_default_user(refresh=True)
        c.perm_user = c.user.AuthUser()

        mapper = self.request.registry.queryUtility(IRoutesMapper)
        c.view_data = []

        _argument_prog = re.compile(r'\{(.*?)\}|:\((.*)\)')
        introspector = self.request.registry.introspector

        # route name -> "<callable name>:<attr>" of the view serving it
        view_intr = {}
        for view_item in introspector.get_category('views'):
            intr = view_item['introspectable']
            if 'route_name' not in intr or not intr['attr']:
                continue
            callable_name = str(intr['derived_callable'].__name__)
            view_intr[intr['route_name']] = '{}:{}'.format(
                callable_name, intr['attr'])

        c.whitelist_key = 'api_access_controllers_whitelist'
        c.whitelist_file = CONFIG.get('__file__')
        whitelist_views = aslist(
            CONFIG.get(c.whitelist_key), sep=',')

        def replace(matchobj):
            # ``{name:regex}`` placeholders keep just the name part,
            # the second regex alternative is used as captured
            braced = matchobj.group(1)
            if braced:
                return "{%s}" % braced.split(':')[0]
            return "{%s}" % matchobj.group(2)

        for route_info in mapper.get_routes():
            if route_info.name.startswith('__'):
                continue

            routepath = _argument_prog.sub(replace, route_info.pattern)
            if not routepath.startswith('/'):
                routepath = '/' + routepath

            view_fqn = view_intr.get(route_info.name, 'NOT AVAILABLE')
            active = view_fqn in whitelist_views
            c.view_data.append(
                (route_info.name, view_fqn, routepath, active))

        c.whitelist_views = whitelist_views
        return self._get_template_context(c)

    def ssh_enabled(self):
        """
        Return the ``ssh.generate_authorized_keyfile`` value read from the
        registry settings.
        """
        registry_settings = self.request.registry.settings
        return registry_settings.get('ssh.generate_authorized_keyfile')

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    def ssh_keys(self):
        """
        Prepare the context of the SSH keys admin page.

        :return: template context with ``c.ssh_enabled`` populated
        """
        c = self.load_default_context()
        c.active = 'ssh_keys'
        c.ssh_enabled = self.ssh_enabled()
        return self._get_template_context(c)

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    def ssh_keys_data(self):
        """
        Return one chunk of SSH keys for the data grid.

        Applies the optional search term (username or fingerprint), the
        requested ordering and the offset/limit of the request.

        :return: dict with ``draw``, ``data``, ``recordsTotal`` and
            ``recordsFiltered`` keys
        """
        _ = self.request.translate
        self.load_default_context()

        column_map = {
            'fingerprint': 'ssh_key_fingerprint',
            'username': User.username
        }
        draw, start, limit = self._extract_chunk(self.request)
        search_q, order_by, order_dir = self._extract_ordering(
            self.request, column_map=column_map)

        total_count = UserSshKeys.query().count()

        # json generate
        query = UserSshKeys.query().join(UserSshKeys.user)
        if search_q:
            like_expression = f'%{safe_str(search_q)}%'
            query = query.filter(or_(
                User.username.ilike(like_expression),
                UserSshKeys.ssh_key_fingerprint.ilike(like_expression),
            ))
        filtered_count = query.count()

        sort_col = self._get_order_col(order_by, UserSshKeys)
        if sort_col:
            ascending = order_dir == 'asc'
            # handle null values properly to order by NULL last
            if order_by in ['created_on']:
                null_substitute = (
                    datetime.date.max if ascending else datetime.date.min)
                sort_col = coalesce(sort_col, null_substitute)
            sort_col = sort_col.asc() if ascending else sort_col.desc()

        query = query.order_by(sort_col)
        query = query.offset(start).limit(limit)

        rows = [
            {
                "username": h.gravatar_with_user(
                    self.request, ssh_key.user.username),
                "fingerprint": ssh_key.ssh_key_fingerprint,
                "description": ssh_key.description,
                "created_on": h.format_date(ssh_key.created_on),
                "accessed_on": h.format_date(ssh_key.accessed_on),
                "action": h.link_to(
                    _('Edit'), h.route_path('edit_user_ssh_keys',
                                            user_id=ssh_key.user.user_id))
            }
            for ssh_key in query.all()
        ]

        return {
            'draw': draw,
            'data': rows,
            'recordsTotal': total_count,
            'recordsFiltered': filtered_count,
        }

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @CSRFRequired()
    def ssh_keys_update(self):
        """
        Trigger ``SshKeyFileChangeEvent`` when SSH support is enabled,
        otherwise flash a warning, then redirect to the SSH keys page.

        :raises HTTPFound: always, redirecting to the SSH keys admin page
        """
        _ = self.request.translate
        self.load_default_context()

        ssh_enabled = self.ssh_enabled()
        key_file = self.request.registry.settings.get(
            'ssh.authorized_keys_file_path')

        if not ssh_enabled:
            h.flash(_('SSH key support is disabled in .ini file'),
                    category='warning')
        else:
            events.trigger(SshKeyFileChangeEvent(), self.request.registry)
            h.flash(_('Updated SSH keys file: {}').format(key_file),
                    category='success')

        raise HTTPFound(h.route_path('admin_permissions_ssh_keys'))