##// END OF EJS Templates
diffs: use whole chunk diff to calculate if it's oversized or not....
diffs: use whole chunk diff to calculate if it's oversized or not. - This fixes an issue if a file is added that has very large number of small lines. In this case the time to detect if the diff should be limited was very very long and CPU intensive.

File last commit:

r2047:863217cc default
r2070:7939c6bf default
Show More
permissions.py
479 lines | 17.3 KiB | text/x-python | PythonLexer
global-permissions: ported controller to pyramid view....
r1941 # -*- coding: utf-8 -*-
# Copyright (C) 2016-2017 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
permissions: expose new view that lists all available views for usage in whitelist access.
r1943 import re
global-permissions: ported controller to pyramid view....
r1941 import logging
import formencode
ssh-keys: added admin panel for managing globally all SSH Keys....
r2042 import datetime
permissions: expose new view that lists all available views for usage in whitelist access.
r1943 from pyramid.interfaces import IRoutesMapper
global-permissions: ported controller to pyramid view....
r1941
from pyramid.view import view_config
from pyramid.httpexceptions import HTTPFound
from pyramid.renderers import render
from pyramid.response import Response
ssh-keys: added admin panel for managing globally all SSH Keys....
r2042 from rhodecode.apps._base import BaseAppView, DataGridAppView
from rhodecode.apps.ssh_support import SshKeyFileChangeEvent
from rhodecode.events import trigger
global-permissions: ported controller to pyramid view....
r1941
from rhodecode.lib import helpers as h
from rhodecode.lib.auth import (
LoginRequired, HasPermissionAllDecorator, CSRFRequired)
ssh-keys: added admin panel for managing globally all SSH Keys....
r2042 from rhodecode.lib.utils2 import aslist, safe_unicode
from rhodecode.model.db import or_, joinedload, coalesce, User, UserIpMap, UserSshKeys
global-permissions: ported controller to pyramid view....
r1941 from rhodecode.model.forms import (
ApplicationPermissionsForm, ObjectPermissionsForm, UserPermissionsForm)
from rhodecode.model.meta import Session
from rhodecode.model.permission import PermissionModel
from rhodecode.model.settings import SettingsModel
log = logging.getLogger(__name__)
ssh-keys: added admin panel for managing globally all SSH Keys....
r2042 class AdminPermissionsView(BaseAppView, DataGridAppView):
global-permissions: ported controller to pyramid view....
r1941 def load_default_context(self):
c = self._get_local_tmpl_context()
self._register_global_c(c)
PermissionModel().set_global_permission_choices(
c, gettext_translator=self.request.translate)
return c
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='admin_permissions_application', request_method='GET',
renderer='rhodecode:templates/admin/permissions/permissions.mako')
def permissions_application(self):
c = self.load_default_context()
c.active = 'application'
c.user = User.get_default_user(refresh=True)
app_settings = SettingsModel().get_all_settings()
defaults = {
'anonymous': c.user.active,
'default_register_message': app_settings.get(
'rhodecode_register_message')
}
defaults.update(c.user.get_default_perms())
data = render('rhodecode:templates/admin/permissions/permissions.mako',
self._get_template_context(c), self.request)
html = formencode.htmlfill.render(
data,
defaults=defaults,
encoding="UTF-8",
force_defaults=False
)
return Response(html)
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@CSRFRequired()
@view_config(
route_name='admin_permissions_application_update', request_method='POST',
renderer='rhodecode:templates/admin/permissions/permissions.mako')
def permissions_application_update(self):
_ = self.request.translate
c = self.load_default_context()
c.active = 'application'
_form = ApplicationPermissionsForm(
[x[0] for x in c.register_choices],
[x[0] for x in c.password_reset_choices],
[x[0] for x in c.extern_activate_choices])()
try:
form_result = _form.to_python(dict(self.request.POST))
form_result.update({'perm_user_name': User.DEFAULT_USER})
PermissionModel().update_application_permissions(form_result)
settings = [
('register_message', 'default_register_message'),
]
for setting, form_key in settings:
sett = SettingsModel().create_or_update_setting(
setting, form_result[form_key])
Session().add(sett)
Session().commit()
h.flash(_('Application permissions updated successfully'),
category='success')
except formencode.Invalid as errors:
defaults = errors.value
data = render(
'rhodecode:templates/admin/permissions/permissions.mako',
self._get_template_context(c), self.request)
html = formencode.htmlfill.render(
data,
defaults=defaults,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8",
force_defaults=False
)
return Response(html)
except Exception:
log.exception("Exception during update of permissions")
h.flash(_('Error occurred during update of permissions'),
category='error')
raise HTTPFound(h.route_path('admin_permissions_application'))
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='admin_permissions_object', request_method='GET',
renderer='rhodecode:templates/admin/permissions/permissions.mako')
def permissions_objects(self):
c = self.load_default_context()
c.active = 'objects'
c.user = User.get_default_user(refresh=True)
defaults = {}
defaults.update(c.user.get_default_perms())
data = render(
'rhodecode:templates/admin/permissions/permissions.mako',
self._get_template_context(c), self.request)
html = formencode.htmlfill.render(
data,
defaults=defaults,
encoding="UTF-8",
force_defaults=False
)
return Response(html)
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@CSRFRequired()
@view_config(
route_name='admin_permissions_object_update', request_method='POST',
renderer='rhodecode:templates/admin/permissions/permissions.mako')
def permissions_objects_update(self):
_ = self.request.translate
c = self.load_default_context()
c.active = 'objects'
_form = ObjectPermissionsForm(
[x[0] for x in c.repo_perms_choices],
[x[0] for x in c.group_perms_choices],
[x[0] for x in c.user_group_perms_choices])()
try:
form_result = _form.to_python(dict(self.request.POST))
form_result.update({'perm_user_name': User.DEFAULT_USER})
PermissionModel().update_object_permissions(form_result)
Session().commit()
h.flash(_('Object permissions updated successfully'),
category='success')
except formencode.Invalid as errors:
defaults = errors.value
data = render(
'rhodecode:templates/admin/permissions/permissions.mako',
self._get_template_context(c), self.request)
html = formencode.htmlfill.render(
data,
defaults=defaults,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8",
force_defaults=False
)
return Response(html)
except Exception:
log.exception("Exception during update of permissions")
h.flash(_('Error occurred during update of permissions'),
category='error')
raise HTTPFound(h.route_path('admin_permissions_object'))
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='admin_permissions_global', request_method='GET',
renderer='rhodecode:templates/admin/permissions/permissions.mako')
def permissions_global(self):
c = self.load_default_context()
c.active = 'global'
c.user = User.get_default_user(refresh=True)
defaults = {}
defaults.update(c.user.get_default_perms())
data = render(
'rhodecode:templates/admin/permissions/permissions.mako',
self._get_template_context(c), self.request)
html = formencode.htmlfill.render(
data,
defaults=defaults,
encoding="UTF-8",
force_defaults=False
)
return Response(html)
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@CSRFRequired()
@view_config(
route_name='admin_permissions_global_update', request_method='POST',
renderer='rhodecode:templates/admin/permissions/permissions.mako')
def permissions_global_update(self):
_ = self.request.translate
c = self.load_default_context()
c.active = 'global'
_form = UserPermissionsForm(
[x[0] for x in c.repo_create_choices],
[x[0] for x in c.repo_create_on_write_choices],
[x[0] for x in c.repo_group_create_choices],
[x[0] for x in c.user_group_create_choices],
[x[0] for x in c.fork_choices],
[x[0] for x in c.inherit_default_permission_choices])()
try:
form_result = _form.to_python(dict(self.request.POST))
form_result.update({'perm_user_name': User.DEFAULT_USER})
PermissionModel().update_user_permissions(form_result)
Session().commit()
h.flash(_('Global permissions updated successfully'),
category='success')
except formencode.Invalid as errors:
defaults = errors.value
data = render(
'rhodecode:templates/admin/permissions/permissions.mako',
self._get_template_context(c), self.request)
html = formencode.htmlfill.render(
data,
defaults=defaults,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8",
force_defaults=False
)
return Response(html)
except Exception:
log.exception("Exception during update of permissions")
h.flash(_('Error occurred during update of permissions'),
category='error')
raise HTTPFound(h.route_path('admin_permissions_global'))
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='admin_permissions_ips', request_method='GET',
renderer='rhodecode:templates/admin/permissions/permissions.mako')
def permissions_ips(self):
c = self.load_default_context()
c.active = 'ips'
c.user = User.get_default_user(refresh=True)
c.user_ip_map = (
UserIpMap.query().filter(UserIpMap.user == c.user).all())
return self._get_template_context(c)
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='admin_permissions_overview', request_method='GET',
renderer='rhodecode:templates/admin/permissions/permissions.mako')
def permissions_overview(self):
c = self.load_default_context()
c.active = 'perms'
c.user = User.get_default_user(refresh=True)
users: make AuthUser propert a method, and allow override of params.
r1997 c.perm_user = c.user.AuthUser()
global-permissions: ported controller to pyramid view....
r1941 return self._get_template_context(c)
permissions: expose new view that lists all available views for usage in whitelist access.
r1943
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='admin_permissions_auth_token_access', request_method='GET',
renderer='rhodecode:templates/admin/permissions/permissions.mako')
def auth_token_access(self):
from rhodecode import CONFIG
c = self.load_default_context()
c.active = 'auth_token_access'
c.user = User.get_default_user(refresh=True)
users: make AuthUser propert a method, and allow override of params.
r1997 c.perm_user = c.user.AuthUser()
permissions: expose new view that lists all available views for usage in whitelist access.
r1943
mapper = self.request.registry.queryUtility(IRoutesMapper)
c.view_data = []
_argument_prog = re.compile('\{(.*?)\}|:\((.*)\)')
introspector = self.request.registry.introspector
view_intr = {}
for view_data in introspector.get_category('views'):
intr = view_data['introspectable']
if 'route_name' in intr and intr['attr']:
auth-token: fix way we generate entries inside the list of whitelist names....
r1950 view_intr[intr['route_name']] = '{}:{}'.format(
permissions: expose new view that lists all available views for usage in whitelist access.
r1943 str(intr['derived_callable'].func_name), intr['attr']
)
c.whitelist_key = 'api_access_controllers_whitelist'
c.whitelist_file = CONFIG.get('__file__')
whitelist_views = aslist(
CONFIG.get(c.whitelist_key), sep=',')
for route_info in mapper.get_routes():
if not route_info.name.startswith('__'):
routepath = route_info.pattern
def replace(matchobj):
if matchobj.group(1):
return "{%s}" % matchobj.group(1).split(':')[0]
else:
return "{%s}" % matchobj.group(2)
routepath = _argument_prog.sub(replace, routepath)
if not routepath.startswith('/'):
routepath = '/' + routepath
view_fqn = view_intr.get(route_info.name, 'NOT AVAILABLE')
active = view_fqn in whitelist_views
c.view_data.append((route_info.name, view_fqn, routepath, active))
c.whitelist_views = whitelist_views
return self._get_template_context(c)
ssh-keys: added admin panel for managing globally all SSH Keys....
r2042
ssh: fixed tests for disabled ssh support
r2047 def ssh_enabled(self):
return self.request.registry.settings.get(
'ssh.generate_authorized_keyfile')
ssh-keys: added admin panel for managing globally all SSH Keys....
r2042 @LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='admin_permissions_ssh_keys', request_method='GET',
renderer='rhodecode:templates/admin/permissions/permissions.mako')
def ssh_keys(self):
c = self.load_default_context()
c.active = 'ssh_keys'
ssh: fixed tests for disabled ssh support
r2047 c.ssh_enabled = self.ssh_enabled()
ssh-keys: added admin panel for managing globally all SSH Keys....
r2042 return self._get_template_context(c)
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='admin_permissions_ssh_keys_data', request_method='GET',
renderer='json_ext', xhr=True)
def ssh_keys_data(self):
_ = self.request.translate
column_map = {
'fingerprint': 'ssh_key_fingerprint',
'username': User.username
}
draw, start, limit = self._extract_chunk(self.request)
search_q, order_by, order_dir = self._extract_ordering(
self.request, column_map=column_map)
ssh_keys_data_total_count = UserSshKeys.query()\
.count()
# json generate
base_q = UserSshKeys.query().join(UserSshKeys.user)
if search_q:
like_expression = u'%{}%'.format(safe_unicode(search_q))
base_q = base_q.filter(or_(
User.username.ilike(like_expression),
UserSshKeys.ssh_key_fingerprint.ilike(like_expression),
))
users_data_total_filtered_count = base_q.count()
sort_col = self._get_order_col(order_by, UserSshKeys)
if sort_col:
if order_dir == 'asc':
# handle null values properly to order by NULL last
if order_by in ['created_on']:
sort_col = coalesce(sort_col, datetime.date.max)
sort_col = sort_col.asc()
else:
# handle null values properly to order by NULL last
if order_by in ['created_on']:
sort_col = coalesce(sort_col, datetime.date.min)
sort_col = sort_col.desc()
base_q = base_q.order_by(sort_col)
base_q = base_q.offset(start).limit(limit)
ssh_keys = base_q.all()
ssh_keys_data = []
for ssh_key in ssh_keys:
ssh_keys_data.append({
"username": h.gravatar_with_user(self.request, ssh_key.user.username),
"fingerprint": ssh_key.ssh_key_fingerprint,
"description": ssh_key.description,
"created_on": h.format_date(ssh_key.created_on),
"action": h.link_to(
_('Edit'), h.route_path('edit_user_ssh_keys',
user_id=ssh_key.user.user_id))
})
data = ({
'draw': draw,
'data': ssh_keys_data,
'recordsTotal': ssh_keys_data_total_count,
'recordsFiltered': users_data_total_filtered_count,
})
return data
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@CSRFRequired()
@view_config(
route_name='admin_permissions_ssh_keys_update', request_method='POST',
renderer='rhodecode:templates/admin/permissions/permissions.mako')
def ssh_keys_update(self):
_ = self.request.translate
self.load_default_context()
ssh: fixed tests for disabled ssh support
r2047 ssh_enabled = self.ssh_enabled()
ssh-keys: added admin panel for managing globally all SSH Keys....
r2042 key_file = self.request.registry.settings.get(
'ssh.authorized_keys_file_path')
if ssh_enabled:
trigger(SshKeyFileChangeEvent(), self.request.registry)
h.flash(_('Updated SSH keys file: {}').format(key_file),
category='success')
else:
h.flash(_('SSH key support is disabled in .ini file'),
category='warning')
raise HTTPFound(h.route_path('admin_permissions_ssh_keys'))