# HG changeset patch # User Marcin Kuzminski # Date 2017-02-28 21:21:24 # Node ID b18f6dcff1a0ed81f15169958b9baee197c965fc # Parent a7de7554e74c7072c1f812949b019e35f917135d auth: make the perms decorators and function pyramid compatible. diff --git a/rhodecode/lib/auth.py b/rhodecode/lib/auth.py --- a/rhodecode/lib/auth.py +++ b/rhodecode/lib/auth.py @@ -22,24 +22,22 @@ authentication and permission libraries """ +import os import inspect import collections import fnmatch import hashlib import itertools import logging -import os import random -import time import traceback from functools import wraps import ipaddress -from pyramid.httpexceptions import HTTPForbidden +from pyramid.httpexceptions import HTTPForbidden, HTTPFound from pylons import url, request from pylons.controllers.util import abort, redirect from pylons.i18n.translation import _ -from sqlalchemy import or_ from sqlalchemy.orm.exc import ObjectDeletedError from sqlalchemy.orm import joinedload from zope.cachedescriptors.property import Lazy as LazyProperty @@ -1256,7 +1254,6 @@ class LoginRequired(object): auth_token_access_valid)) # we preserve the get PARAM came_from = request.path_qs - log.debug('redirecting to login page with %s' % (came_from,)) return redirect( h.route_path('login', _query={'came_from': came_from})) @@ -1348,6 +1345,20 @@ class PermsDecorator(object): def __call__(self, func): return get_cython_compat_decorator(self.__wrapper, func) + def _get_request(self): + from pyramid.threadlocal import get_current_request + pyramid_request = get_current_request() + if not pyramid_request: + # return global request of pylons incase pyramid one isn't available + return request + return pyramid_request + + def _get_came_from(self): + _request = self._get_request() + + # both pylons/pyramid has this attribute + return _request.path_qs + def __wrapper(self, func, *fargs, **fkwargs): cls = fargs[0] _user = cls._rhodecode_user @@ -1364,17 +1375,16 @@ class PermsDecorator(object): anonymous = _user.username == User.DEFAULT_USER if anonymous: - came_from = request.path_qs - import rhodecode.lib.helpers as h + came_from = self._get_came_from() h.flash(_('You need to be signed in to view this page'), category='warning') - return redirect( + raise HTTPFound( h.route_path('login', _query={'came_from': came_from})) else: # redirect with forbidden ret code - return abort(403) + raise HTTPForbidden() def check_permissions(self, user): """Dummy function for overriding""" @@ -1413,10 +1423,13 @@ class HasRepoPermissionAllDecorator(Perm Checks for access permission for all given predicates for specific repository. All of them have to be meet in order to fulfill the request """ + def _get_repo_name(self): + _request = self._get_request() + return get_repo_slug(_request) def check_permissions(self, user): perms = user.permissions - repo_name = get_repo_slug(request) + repo_name = self._get_repo_name() try: user_perms = set([perms['repositories'][repo_name]]) except KeyError: @@ -1431,10 +1444,13 @@ class HasRepoPermissionAnyDecorator(Perm Checks for access permission for any of given predicates for specific repository. In order to fulfill the request any of predicates must be meet """ + def _get_repo_name(self): + _request = self._get_request() + return get_repo_slug(_request) def check_permissions(self, user): perms = user.permissions - repo_name = get_repo_slug(request) + repo_name = self._get_repo_name() try: user_perms = set([perms['repositories'][repo_name]]) except KeyError: @@ -1451,10 +1467,13 @@ class HasRepoGroupPermissionAllDecorator repository group. All of them have to be meet in order to fulfill the request """ + def _get_repo_group_name(self): + _request = self._get_request() + return get_repo_group_slug(_request) def check_permissions(self, user): perms = user.permissions - group_name = get_repo_group_slug(request) + group_name = self._get_repo_group_name() try: user_perms = set([perms['repositories_groups'][group_name]]) except KeyError: @@ -1471,10 +1490,13 @@ class HasRepoGroupPermissionAnyDecorator repository group. In order to fulfill the request any of predicates must be met """ + def _get_repo_group_name(self): + _request = self._get_request() + return get_repo_group_slug(_request) def check_permissions(self, user): perms = user.permissions - group_name = get_repo_group_slug(request) + group_name = self._get_repo_group_name() try: user_perms = set([perms['repositories_groups'][group_name]]) except KeyError: @@ -1490,10 +1512,13 @@ class HasUserGroupPermissionAllDecorator Checks for access permission for all given predicates for specific user group. All of them have to be meet in order to fulfill the request """ + def _get_user_group_name(self): + _request = self._get_request() + return get_user_group_slug(_request) def check_permissions(self, user): perms = user.permissions - group_name = get_user_group_slug(request) + group_name = self._get_user_group_name() try: user_perms = set([perms['user_groups'][group_name]]) except KeyError: @@ -1509,10 +1534,13 @@ class HasUserGroupPermissionAnyDecorator Checks for access permission for any of given predicates for specific user group. In order to fulfill the request any of predicates must be meet """ + def _get_user_group_name(self): + _request = self._get_request() + return get_user_group_slug(_request) def check_permissions(self, user): perms = user.permissions - group_name = get_user_group_slug(request) + group_name = self._get_user_group_name() try: user_perms = set([perms['user_groups'][group_name]]) except KeyError: @@ -1575,6 +1603,14 @@ class PermsFunction(object): check_scope, user, check_location) return False + def _get_request(self): + from pyramid.threadlocal import get_current_request + pyramid_request = get_current_request() + if not pyramid_request: + # return global request of pylons incase pyramid one isn't available + return request + return pyramid_request + def _get_check_scope(self, cls_name): return { 'HasPermissionAll': 'GLOBAL', @@ -1613,10 +1649,14 @@ class HasRepoPermissionAll(PermsFunction self.repo_name = repo_name return super(HasRepoPermissionAll, self).__call__(check_location, user) - def check_permissions(self, user): + def _get_repo_name(self): if not self.repo_name: - self.repo_name = get_repo_slug(request) + _request = self._get_request() + self.repo_name = get_repo_slug(_request) + return self.repo_name + def check_permissions(self, user): + self.repo_name = self._get_repo_name() perms = user.permissions try: user_perms = set([perms['repositories'][self.repo_name]]) @@ -1632,10 +1672,13 @@ class HasRepoPermissionAny(PermsFunction self.repo_name = repo_name return super(HasRepoPermissionAny, self).__call__(check_location, user) - def check_permissions(self, user): + def _get_repo_name(self): if not self.repo_name: self.repo_name = get_repo_slug(request) + return self.repo_name + def check_permissions(self, user): + self.repo_name = self._get_repo_name() perms = user.permissions try: user_perms = set([perms['repositories'][self.repo_name]]) diff --git a/rhodecode/lib/utils.py b/rhodecode/lib/utils.py --- a/rhodecode/lib/utils.py +++ b/rhodecode/lib/utils.py @@ -42,6 +42,7 @@ from paste.script.command import Command from webhelpers.text import collapse, remove_formatting, strip_tags from mako import exceptions from pyramid.threadlocal import get_current_registry +from pyramid.request import Request from rhodecode.lib.fakemod import create_module from rhodecode.lib.vcs.backends.base import Config @@ -95,28 +96,43 @@ def repo_name_slug(value): # PERM DECORATOR HELPERS FOR EXTRACTING NAMES FOR PERM CHECKS #============================================================================== def get_repo_slug(request): - _repo = request.environ['pylons.routes_dict'].get('repo_name') + if isinstance(request, Request) and getattr(request, 'matchdict', None): + # pyramid + _repo = request.matchdict.get('repo_name') + else: + _repo = request.environ['pylons.routes_dict'].get('repo_name') + if _repo: _repo = _repo.rstrip('/') return _repo def get_repo_group_slug(request): - _group = request.environ['pylons.routes_dict'].get('group_name') + if isinstance(request, Request) and getattr(request, 'matchdict', None): + # pyramid + _group = request.matchdict.get('group_name') + else: + _group = request.environ['pylons.routes_dict'].get('group_name') + if _group: _group = _group.rstrip('/') return _group def get_user_group_slug(request): - _group = request.environ['pylons.routes_dict'].get('user_group_id') + if isinstance(request, Request) and getattr(request, 'matchdict', None): + # pyramid + _group = request.matchdict.get('user_group_id') + else: + _group = request.environ['pylons.routes_dict'].get('user_group_id') + try: _group = UserGroup.get(_group) if _group: _group = _group.users_group_name except Exception: log.debug(traceback.format_exc()) - #catch all failures here + # catch all failures here pass return _group diff --git a/rhodecode/tests/functional/test_admin_users.py b/rhodecode/tests/functional/test_admin_users.py --- a/rhodecode/tests/functional/test_admin_users.py +++ b/rhodecode/tests/functional/test_admin_users.py @@ -418,9 +418,6 @@ class TestAdminUsersController(TestContr msg = 'Deleted 1 user groups' assert_session_flash(response, msg) - def test_show(self): - self.app.get(url('user', user_id=1)) - def test_edit(self): self.log_user() user = User.get_by_username(TEST_USER_ADMIN_LOGIN)