# HG changeset patch # User Marcin Kuzminski # Date 2018-08-29 18:22:43 # Node ID 095dcb4bb929309c9a2a5977e1cfb0fdda49b869 # Parent 7cabaaedb485de2599d4515f0a082f4b5c29270f branch-permissions: handle vcs operations and branch permissions. - use new exceptions for branch permissions - detect if we need to check permissions - calculate super-admin the same way as regular users for override capabilities - added new tests for branch permissions diff --git a/rhodecode/lib/auth.py b/rhodecode/lib/auth.py --- a/rhodecode/lib/auth.py +++ b/rhodecode/lib/auth.py @@ -404,14 +404,14 @@ class PermissionCalculator(object): def __init__( self, user_id, scope, user_is_admin, user_inherit_default_permissions, explicit, algo, - calculate_super_admin=False): + calculate_super_admin_as_user=False): self.user_id = user_id self.user_is_admin = user_is_admin self.inherit_default_permissions = user_inherit_default_permissions self.explicit = explicit self.algo = algo - self.calculate_super_admin = calculate_super_admin + self.calculate_super_admin_as_user = calculate_super_admin_as_user scope = scope or {} self.scope_repo_id = scope.get('repo_id') @@ -440,8 +440,8 @@ class PermissionCalculator(object): self.default_user_id, self.scope_repo_id) def calculate(self): - if self.user_is_admin and not self.calculate_super_admin: - return self._admin_permissions() + if self.user_is_admin and not self.calculate_super_admin_as_user: + return self._calculate_admin_permissions() self._calculate_global_default_permissions() self._calculate_global_permissions() @@ -452,7 +452,7 @@ class PermissionCalculator(object): self._calculate_user_group_permissions() return self._permission_structure() - def _admin_permissions(self): + def _calculate_admin_permissions(self): """ admin user have all default rights for repositories and groups set to admin @@ -479,13 +479,11 @@ class PermissionCalculator(object): self.permissions_user_groups[u_k] = p, PermOrigin.SUPER_ADMIN # branch permissions - # TODO(marcink): 
validate this, especially - # how this should work using multiple patterns specified ?? - # looks ok, but still needs double check !! - for perm in self.default_branch_repo_perms: - r_k = perm.UserRepoToPerm.repository.repo_name - p = 'branch.push_force' - self.permissions_repository_branches[r_k] = '*', p, PermOrigin.SUPER_ADMIN + # since super-admin also can have custom rule permissions + # we *always* need to calculate those inherited from default, and also explicit + self._calculate_default_permissions_repository_branches( + user_inherit_object_permissions=False) + self._calculate_repository_branch_permissions() return self._permission_structure() @@ -571,27 +569,7 @@ class PermissionCalculator(object): for perm in user_perms: self.permissions_global.add(perm.permission.permission_name) - def _calculate_default_permissions(self): - """ - Set default user permissions for repositories, repository branches, - repository groups, user groups taken from the default user. - - Calculate inheritance of object permissions based on what we have now - in GLOBAL permissions. We check if .false is in GLOBAL since this is - explicitly set. Inherit is the opposite of .false being there. - - .. 
note:: - - the syntax is little bit odd but what we need to check here is - the opposite of .false permission being in the list so even for - inconsistent state when both .true/.false is there - .false is more important - - """ - user_inherit_object_permissions = not ('hg.inherit_default_perms.false' - in self.permissions_global) - - # default permissions for repositories, taken from `default` user permissions + def _calculate_default_permissions_repositories(self, user_inherit_object_permissions): for perm in self.default_repo_perms: r_k = perm.UserRepoToPerm.repository.repo_name p = perm.Permission.permission_name @@ -624,7 +602,7 @@ class PermissionCalculator(object): o = PermOrigin.SUPER_ADMIN self.permissions_repositories[r_k] = p, o - # default permissions branch for repositories, taken from `default` user permissions + def _calculate_default_permissions_repository_branches(self, user_inherit_object_permissions): for perm in self.default_branch_repo_perms: r_k = perm.UserRepoToPerm.repository.repo_name @@ -641,7 +619,7 @@ class PermissionCalculator(object): # special dict that aggregates entries self.permissions_repository_branches[r_k] = pattern, p, o - # default permissions for repository groups taken from `default` user permission + def _calculate_default_permissions_repository_groups(self, user_inherit_object_permissions): for perm in self.default_repo_groups_perms: rg_k = perm.UserRepoGroupToPerm.group.group_name p = perm.Permission.permission_name @@ -666,7 +644,7 @@ class PermissionCalculator(object): o = PermOrigin.SUPER_ADMIN self.permissions_repository_groups[rg_k] = p, o - # default permissions for user groups taken from `default` user permission + def _calculate_default_permissions_user_groups(self, user_inherit_object_permissions): for perm in self.default_user_group_perms: u_k = perm.UserUserGroupToPerm.user_group.users_group_name p = perm.Permission.permission_name @@ -691,6 +669,39 @@ class PermissionCalculator(object): o = 
PermOrigin.SUPER_ADMIN self.permissions_user_groups[u_k] = p, o + def _calculate_default_permissions(self): + """ + Set default user permissions for repositories, repository branches, + repository groups, user groups taken from the default user. + + Calculate inheritance of object permissions based on what we have now + in GLOBAL permissions. We check if .false is in GLOBAL since this is + explicitly set. Inherit is the opposite of .false being there. + + .. note:: + + the syntax is little bit odd but what we need to check here is + the opposite of .false permission being in the list so even for + inconsistent state when both .true/.false is there + .false is more important + + """ + user_inherit_object_permissions = not ('hg.inherit_default_perms.false' + in self.permissions_global) + + # default permissions inherited from `default` user permissions + self._calculate_default_permissions_repositories( + user_inherit_object_permissions) + + self._calculate_default_permissions_repository_branches( + user_inherit_object_permissions) + + self._calculate_default_permissions_repository_groups( + user_inherit_object_permissions) + + self._calculate_default_permissions_user_groups( + user_inherit_object_permissions) + def _calculate_repository_permissions(self): """ Repository permissions for the current user. @@ -783,6 +794,7 @@ class PermissionCalculator(object): # any specified by the group permission user_repo_branch_perms = Permission.get_default_repo_branch_perms( self.user_id, self.scope_repo_id) + for perm in user_repo_branch_perms: r_k = perm.UserRepoToPerm.repository.repo_name @@ -799,7 +811,6 @@ class PermissionCalculator(object): # special dict that aggregates entries self.permissions_repository_branches[r_k] = pattern, p, o - def _calculate_repository_group_permissions(self): """ Repository group permissions for the current user. 
@@ -1036,14 +1047,14 @@ class AuthUser(object): @LazyProperty def permissions(self): - return self.get_perms(user=self, cache=False) + return self.get_perms(user=self, cache=None) @LazyProperty def permissions_safe(self): """ Filtered permissions excluding not allowed repositories """ - perms = self.get_perms(user=self, cache=False) + perms = self.get_perms(user=self, cache=None) perms['repositories'] = { k: v for k, v in perms['repositories'].items() @@ -1062,7 +1073,7 @@ class AuthUser(object): @LazyProperty def permissions_full_details(self): return self.get_perms( - user=self, cache=False, calculate_super_admin=True) + user=self, cache=None, calculate_super_admin=True) def permissions_with_scope(self, scope): """ @@ -1088,7 +1099,7 @@ class AuthUser(object): # store in cache to mimic how the @LazyProperty works, # the difference here is that we use the unique key calculated # from params and values - return self.get_perms(user=self, cache=False, scope=_scope) + return self.get_perms(user=self, cache=None, scope=_scope) def get_instance(self): return User.get(self.user_id) @@ -1143,7 +1154,7 @@ class AuthUser(object): log.debug('AuthUser: propagated user is now %s', self) def get_perms(self, user, scope=None, explicit=True, algo='higherwin', - calculate_super_admin=False, cache=False): + calculate_super_admin=False, cache=None): """ Fills user permission attribute with permissions taken from database works for permissions given for repositories, and for permissions that @@ -1158,6 +1169,9 @@ class AuthUser(object): it's multiple defined, eg user in two different groups. 
It also decides if explicit flag is turned off how to specify the permission for case when user is in a group + have defined separate permission + :param calculate_super_admin: calculate permissions for super-admin in the + same way as for regular user without speedups + :param cache: Use caching for calculation, None = let the cache backend decide """ user_id = user.user_id user_is_admin = user.is_admin @@ -1168,7 +1182,12 @@ class AuthUser(object): cache_seconds = safe_int( rhodecode.CONFIG.get('rc_cache.cache_perms.expiration_time')) - cache_on = cache or cache_seconds > 0 + if cache is None: + # let the backend cache decide + cache_on = cache_seconds > 0 + else: + cache_on = cache + log.debug( 'Computing PERMISSION tree for user %s scope `%s` ' 'with caching: %s[TTL: %ss]' % (user, scope, cache_on, cache_seconds or 0)) @@ -1186,9 +1205,10 @@ class AuthUser(object): explicit, algo, calculate_super_admin) start = time.time() - result = compute_perm_tree('permissions', user_id, scope, user_is_admin, - user_inherit_default_permissions, explicit, algo, - calculate_super_admin) + result = compute_perm_tree( + 'permissions', user_id, scope, user_is_admin, + user_inherit_default_permissions, explicit, algo, + calculate_super_admin) result_repr = [] for k in result: @@ -1340,6 +1360,35 @@ class AuthUser(object): 'not in %s' % (ip_addr, user_id, allowed_ips)) return False + def get_branch_permissions(self, repo_name, perms=None): + perms = perms or self.permissions_with_scope({'repo_name': repo_name}) + branch_perms = perms.get('repository_branches') + return branch_perms + + def get_rule_and_branch_permission(self, repo_name, branch_name): + """ + Check if this AuthUser has defined any permissions for branches. 
If any of + the rules match in order, we return the matching permissions + """ + + rule = default_perm = '' + + branch_perms = self.get_branch_permissions(repo_name=repo_name) + if not branch_perms: + return rule, default_perm + + repo_branch_perms = branch_perms.get(repo_name) + if not repo_branch_perms: + return rule, default_perm + + # now calculate the permissions + for pattern, branch_perm in repo_branch_perms.items(): + if fnmatch.fnmatch(branch_name, pattern): + rule = '`{}`=>{}'.format(pattern, branch_perm) + return rule, branch_perm + + return rule, default_perm + def __repr__(self): return ""\ % (self.user_id, self.username, self.ip_addr, self.is_authenticated) @@ -2084,23 +2133,22 @@ class HasPermissionAnyMiddleware(object) def __init__(self, *perms): self.required_perms = set(perms) - def __call__(self, user, repo_name): + def __call__(self, auth_user, repo_name): # repo_name MUST be unicode, since we handle keys in permission # dict by unicode repo_name = safe_unicode(repo_name) - user = AuthUser(user.user_id) log.debug( 'Checking VCS protocol permissions %s for user:%s repo:`%s`', - self.required_perms, user, repo_name) - - if self.check_permissions(user, repo_name): + self.required_perms, auth_user, repo_name) + + if self.check_permissions(auth_user, repo_name): log.debug('Permission to repo:`%s` GRANTED for user:%s @ %s', - repo_name, user, 'PermissionMiddleware') + repo_name, auth_user, 'PermissionMiddleware') return True else: log.debug('Permission to repo:`%s` DENIED for user:%s @ %s', - repo_name, user, 'PermissionMiddleware') + repo_name, auth_user, 'PermissionMiddleware') return False def check_permissions(self, user, repo_name): diff --git a/rhodecode/lib/base.py b/rhodecode/lib/base.py --- a/rhodecode/lib/base.py +++ b/rhodecode/lib/base.py @@ -153,7 +153,7 @@ def get_user_agent(environ): def vcs_operation_context( environ, repo_name, username, action, scm, check_locking=True, - is_shadow_repo=False): + is_shadow_repo=False, 
check_branch_perms=False, detect_force_push=False): """ Generate the context for a vcs operation, e.g. push or pull. @@ -193,6 +193,8 @@ def vcs_operation_context( 'user_agent': get_user_agent(environ), 'hooks': get_enabled_hook_classes(ui_settings), 'is_shadow_repo': is_shadow_repo, + 'detect_force_push': detect_force_push, + 'check_branch_perms': check_branch_perms, } return extras diff --git a/rhodecode/lib/exceptions.py b/rhodecode/lib/exceptions.py --- a/rhodecode/lib/exceptions.py +++ b/rhodecode/lib/exceptions.py @@ -107,6 +107,21 @@ class HTTPLockedRC(HTTPClientError): self.args = (message, ) +class HTTPBranchProtected(HTTPClientError): + """ + Special Exception For Indicating that branch is protected in RhodeCode, the + return code can be overwritten by _code keyword argument passed into constructors + """ + code = 403 + title = explanation = 'Branch Protected' + reason = None + + def __init__(self, message, *args, **kwargs): + self.title = self.explanation = message + super(HTTPBranchProtected, self).__init__(*args, **kwargs) + self.args = (message, ) + + class IMCCommitError(Exception): pass diff --git a/rhodecode/lib/hooks_base.py b/rhodecode/lib/hooks_base.py --- a/rhodecode/lib/hooks_base.py +++ b/rhodecode/lib/hooks_base.py @@ -32,7 +32,8 @@ from rhodecode import events from rhodecode.lib import helpers as h from rhodecode.lib import audit_logger from rhodecode.lib.utils2 import safe_str -from rhodecode.lib.exceptions import HTTPLockedRC, UserCreationError +from rhodecode.lib.exceptions import ( + HTTPLockedRC, HTTPBranchProtected, UserCreationError) from rhodecode.model.db import Repository, User log = logging.getLogger(__name__) @@ -94,9 +95,9 @@ def pre_push(extras): It bans pushing when the repository is locked. 
""" - usr = User.get_by_username(extras.username) + user = User.get_by_username(extras.username) output = '' - if extras.locked_by[0] and usr.user_id != int(extras.locked_by[0]): + if extras.locked_by[0] and user.user_id != int(extras.locked_by[0]): locked_by = User.get(extras.locked_by[0]).username reason = extras.locked_by[2] # this exception is interpreted in git/hg middlewares and based @@ -109,9 +110,48 @@ def pre_push(extras): else: raise _http_ret - # Propagate to external components. This is done after checking the - # lock, for consistent behavior. if not is_shadow_repo(extras): + if extras.commit_ids and extras.check_branch_perms: + + auth_user = user.AuthUser() + repo = Repository.get_by_repo_name(extras.repository) + affected_branches = [] + if repo.repo_type == 'hg': + for entry in extras.commit_ids: + if entry['type'] == 'branch': + is_forced = bool(entry['multiple_heads']) + affected_branches.append([entry['name'], is_forced]) + elif repo.repo_type == 'git': + for entry in extras.commit_ids: + if entry['type'] == 'heads': + is_forced = bool(entry['pruned_sha']) + affected_branches.append([entry['name'], is_forced]) + + for branch_name, is_forced in affected_branches: + + rule, branch_perm = auth_user.get_rule_and_branch_permission( + extras.repository, branch_name) + if not branch_perm: + # no branch permission found for this branch, just keep checking + continue + + if branch_perm == 'branch.push_force': + continue + elif branch_perm == 'branch.push' and is_forced is False: + continue + elif branch_perm == 'branch.push' and is_forced is True: + halt_message = 'Branch `{}` changes rejected by rule {}. ' \ + 'FORCE PUSH FORBIDDEN.'.format(branch_name, rule) + else: + halt_message = 'Branch `{}` changes rejected by rule {}.'.format( + branch_name, rule) + + if halt_message: + _http_ret = HTTPBranchProtected(halt_message) + raise _http_ret + + # Propagate to external components. This is done after checking the + # lock, for consistent behavior. 
pre_push_extension(repo_store_path=Repository.base_path(), **extras) events.trigger(events.RepoPrePushEvent( repo_name=extras.repository, extras=extras)) diff --git a/rhodecode/lib/hooks_daemon.py b/rhodecode/lib/hooks_daemon.py --- a/rhodecode/lib/hooks_daemon.py +++ b/rhodecode/lib/hooks_daemon.py @@ -29,6 +29,7 @@ from BaseHTTPServer import BaseHTTPReque from SocketServer import TCPServer import rhodecode +from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected from rhodecode.model import meta from rhodecode.lib.base import bootstrap_request, bootstrap_config from rhodecode.lib import hooks_base @@ -285,9 +286,20 @@ class Hooks(object): try: result = hook(extras) - except Exception as error: - exc_tb = traceback.format_exc() - log.exception('Exception when handling hook %s', hook) + except HTTPBranchProtected as handled_error: + # Those special cases doesn't need error reporting. It's a case of + # locked repo or protected branch + result = AttributeDict({ + 'status': handled_error.code, + 'output': handled_error.explanation + }) + except (HTTPLockedRC, Exception) as error: + # locked needs different handling since we need to also + # handle PULL operations + exc_tb = '' + if not isinstance(error, HTTPLockedRC): + exc_tb = traceback.format_exc() + log.exception('Exception when handling hook %s', hook) error_args = error.args return { 'status': 128, diff --git a/rhodecode/lib/middleware/simplevcs.py b/rhodecode/lib/middleware/simplevcs.py --- a/rhodecode/lib/middleware/simplevcs.py +++ b/rhodecode/lib/middleware/simplevcs.py @@ -297,7 +297,7 @@ class SimpleVCS(object): def is_shadow_repo_dir(self): return os.path.isdir(self.vcs_repo_name) - def _check_permission(self, action, user, repo_name, ip_addr=None, + def _check_permission(self, action, user, auth_user, repo_name, ip_addr=None, plugin_id='', plugin_cache_active=False, cache_ttl=0): """ Checks permissions using action (push/pull) user and repository @@ -335,14 +335,14 @@ class 
SimpleVCS(object): if action == 'push': perms = ('repository.write', 'repository.admin') - if not HasPermissionAnyMiddleware(*perms)(user, repo_name): + if not HasPermissionAnyMiddleware(*perms)(auth_user, repo_name): return False else: # any other action need at least read permission perms = ( 'repository.read', 'repository.write', 'repository.admin') - if not HasPermissionAnyMiddleware(*perms)(user, repo_name): + if not HasPermissionAnyMiddleware(*perms)(auth_user, repo_name): return False return True @@ -441,14 +441,17 @@ class SimpleVCS(object): # ====================================================================== # CHECK ANONYMOUS PERMISSION # ====================================================================== + detect_force_push = False + check_branch_perms = False if action in ['pull', 'push']: - anonymous_user = User.get_default_user() + user_obj = anonymous_user = User.get_default_user() + auth_user = user_obj.AuthUser() username = anonymous_user.username if anonymous_user.active: plugin_cache_active, cache_ttl = self._get_default_cache_ttl() # ONLY check permissions if the user is activated anonymous_perm = self._check_permission( - action, anonymous_user, self.acl_repo_name, ip_addr, + action, anonymous_user, auth_user, self.acl_repo_name, ip_addr, plugin_id='anonymous_access', plugin_cache_active=plugin_cache_active, cache_ttl=cache_ttl, @@ -525,6 +528,7 @@ class SimpleVCS(object): # check user attributes for password change flag user_obj = user + auth_user = user_obj.AuthUser() if user_obj and user_obj.username != User.DEFAULT_USER and \ user_obj.user_data.get('force_password_change'): reason = 'password change required' @@ -533,19 +537,27 @@ class SimpleVCS(object): # check permissions for this repository perm = self._check_permission( - action, user, self.acl_repo_name, ip_addr, + action, user, auth_user, self.acl_repo_name, ip_addr, plugin, plugin_cache_active, cache_ttl) if not perm: return HTTPForbidden()(environ, start_response) 
environ['rc_auth_user_id'] = user_id + if action == 'push': + perms = auth_user.get_branch_permissions(self.acl_repo_name) + if perms: + check_branch_perms = True + detect_force_push = True + # extras are injected into UI object and later available # in hooks executed by RhodeCode check_locking = _should_check_locking(environ.get('QUERY_STRING')) + extras = vcs_operation_context( environ, repo_name=self.acl_repo_name, username=username, action=action, scm=self.SCM, check_locking=check_locking, - is_shadow_repo=self.is_shadow_repo + is_shadow_repo=self.is_shadow_repo, check_branch_perms=check_branch_perms, + detect_force_push=detect_force_push ) # ====================================================================== diff --git a/rhodecode/tests/lib/test_auth.py b/rhodecode/tests/lib/test_auth.py --- a/rhodecode/tests/lib/test_auth.py +++ b/rhodecode/tests/lib/test_auth.py @@ -419,7 +419,7 @@ def test_permission_calculator_admin_per calculator = auth.PermissionCalculator( user.user_id, {}, False, False, True, 'higherwin') - permissions = calculator._admin_permissions() + permissions = calculator._calculate_admin_permissions() assert permissions['repositories_groups'][repo_group.group_name] == \ 'group.admin' diff --git a/rhodecode/tests/vcs_operations/__init__.py b/rhodecode/tests/vcs_operations/__init__.py --- a/rhodecode/tests/vcs_operations/__init__.py +++ b/rhodecode/tests/vcs_operations/__init__.py @@ -77,22 +77,18 @@ class Command(object): assert self.process.returncode == 0 -def _add_files_and_push(vcs, dest, clone_url=None, tags=None, **kwargs): - """ - Generate some files, add it to DEST repo and push back - vcs is git or hg and defines what VCS we want to make those files for - """ - # commit some stuff into this repo +def _add_files(vcs, dest, clone_url=None, tags=None, target_branch=None, + new_branch=False, **kwargs): + git_ident = "git config user.name {} && git config user.email {}".format( + 'Marcin Kuźminski', 'me@email.com') + cwd = path = jn(dest) 
+ tags = tags or [] - cwd = path = jn(dest) added_file = jn(path, '%ssetup.py' % tempfile._RandomNameSequence().next()) Command(cwd).execute('touch %s' % added_file) Command(cwd).execute('%s add %s' % (vcs, added_file)) author_str = 'Marcin Kuźminski ' - git_ident = "git config user.name {} && git config user.email {}".format( - 'Marcin Kuźminski', 'me@email.com') - for i in range(kwargs.get('files_no', 3)): cmd = """echo 'added_line%s' >> %s""" % (i, added_file) Command(cwd).execute(cmd) @@ -107,30 +103,55 @@ def _add_files_and_push(vcs, dest, clone for tag in tags: if vcs == 'hg': - stdout, stderr = Command(cwd).execute( + Command(cwd).execute( 'hg tag', tag['name']) elif vcs == 'git': if tag['commit']: # annotated tag - stdout, stderr = Command(cwd).execute( + _stdout, _stderr = Command(cwd).execute( """%s && git tag -a %s -m "%s" """ % ( git_ident, tag['name'], tag['commit'])) else: # lightweight tag - stdout, stderr = Command(cwd).execute( + _stdout, _stderr = Command(cwd).execute( """%s && git tag %s""" % ( git_ident, tag['name'])) + +def _add_files_and_push(vcs, dest, clone_url=None, tags=None, target_branch=None, + new_branch=False, **kwargs): + """ + Generate some files, add it to DEST repo and push back + vcs is git or hg and defines what VCS we want to make those files for + """ + git_ident = "git config user.name {} && git config user.email {}".format( + 'Marcin Kuźminski', 'me@email.com') + cwd = path = jn(dest) + + # commit some stuff into this repo + _add_files(vcs, dest, clone_url, tags, target_branch, new_branch, **kwargs) + + default_target_branch = { + 'git': 'master', + 'hg': 'default' + }.get(vcs) + + target_branch = target_branch or default_target_branch + # PUSH it back stdout = stderr = None if vcs == 'hg': + maybe_new_branch = '' + if new_branch: + maybe_new_branch = '--new-branch' stdout, stderr = Command(cwd).execute( - 'hg push --verbose', clone_url) + 'hg push --verbose {} -r {} {}'.format(maybe_new_branch, target_branch, clone_url) + ) 
elif vcs == 'git': stdout, stderr = Command(cwd).execute( - """%s && - git push --verbose --tags %s master""" % ( - git_ident, clone_url)) + """{} && + git push --verbose --tags {} {}""".format(git_ident, clone_url, target_branch) + ) return stdout, stderr diff --git a/rhodecode/tests/vcs_operations/conftest.py b/rhodecode/tests/vcs_operations/conftest.py --- a/rhodecode/tests/vcs_operations/conftest.py +++ b/rhodecode/tests/vcs_operations/conftest.py @@ -33,7 +33,8 @@ import textwrap import pytest from rhodecode import events -from rhodecode.model.db import Integration +from rhodecode.model.db import Integration, UserRepoToPerm, Permission, \ + UserToRepoBranchPermission, User from rhodecode.model.integration import IntegrationModel from rhodecode.model.db import Repository from rhodecode.model.meta import Session @@ -267,3 +268,74 @@ def enable_webhook_push_integration(requ Session().delete(integration) Session().commit() + +@pytest.fixture +def branch_permission_setter(request): + """ + + def my_test(branch_permission_setter) + branch_permission_setter(repo_name, username, pattern='*', permission='branch.push') + + """ + + rule_id = None + write_perm_id = None + + def _branch_permissions_setter( + repo_name, username, pattern='*', permission='branch.push_force'): + global rule_id, write_perm_id + + repo = Repository.get_by_repo_name(repo_name) + repo_id = repo.repo_id + + user = User.get_by_username(username) + user_id = user.user_id + + rule_perm_obj = Permission.get_by_key(permission) + + write_perm = None + + # add new entry, based on existing perm entry + perm = UserRepoToPerm.query() \ + .filter(UserRepoToPerm.repository_id == repo_id) \ + .filter(UserRepoToPerm.user_id == user_id) \ + .first() + + if not perm: + # such user isn't defined in Permissions for repository + # we now on-the-fly add new permission + + write_perm = UserRepoToPerm() + write_perm.permission = Permission.get_by_key('repository.write') + write_perm.repository_id = repo_id + 
write_perm.user_id = user_id + Session().add(write_perm) + Session().flush() + + perm = write_perm + + rule = UserToRepoBranchPermission() + rule.rule_to_perm_id = perm.repo_to_perm_id + rule.branch_pattern = pattern + rule.rule_order = 10 + rule.permission = rule_perm_obj + rule.repository_id = repo_id + Session().add(rule) + Session().commit() + + global rule, write_perm + + return rule + + @request.addfinalizer + def cleanup(): + if rule: + Session().delete(rule) + Session().commit() + if write_perm: + Session().delete(write_perm) + Session().commit() + + return _branch_permissions_setter + + diff --git a/rhodecode/tests/vcs_operations/test_vcs_operations_branch_protection.py b/rhodecode/tests/vcs_operations/test_vcs_operations_branch_protection.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs_operations/test_vcs_operations_branch_protection.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2010-2018 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. 
If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ + +import os +import pytest + +from rhodecode.tests import ( + TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS, + TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) +from rhodecode.tests.vcs_operations import ( + Command, _check_proper_hg_push, _check_proper_git_push, _add_files_and_push) + + +@pytest.mark.usefixtures("disable_anonymous_user") +class TestVCSOperations(object): + + @pytest.mark.parametrize('username, password', [ + (TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS), + (TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS), + ]) + @pytest.mark.parametrize('branch_perm', [ + 'branch.none', + 'branch.merge', + 'branch.push', + 'branch.push_force', + ]) + def test_push_to_protected_branch_fails_with_message_hg( + self, rc_web_server, tmpdir, branch_perm, user_util, + branch_permission_setter, username, password): + repo = user_util.create_repo(repo_type='hg') + repo_name = repo.repo_name + branch_permission_setter(repo_name, username, permission=branch_perm) + + clone_url = rc_web_server.repo_clone_url( + repo.repo_name, user=username, passwd=password) + Command(os.path.dirname(tmpdir.strpath)).execute( + 'hg clone', clone_url, tmpdir.strpath) + + stdout, stderr = _add_files_and_push( + 'hg', tmpdir.strpath, clone_url=clone_url) + if branch_perm in ['branch.push', 'branch.push_force']: + _check_proper_hg_push(stdout, stderr) + else: + msg = "Branch `default` changes rejected by rule `*`=>{}".format(branch_perm) + assert msg in stdout + assert "transaction abort" in stdout + + @pytest.mark.parametrize('username, password', [ + (TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS), + (TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS), + ]) + @pytest.mark.parametrize('branch_perm', [ + 'branch.none', + 'branch.merge', + 'branch.push', + 'branch.push_force', + ]) + def 
test_push_to_protected_branch_fails_with_message_git( + self, rc_web_server, tmpdir, branch_perm, user_util, + branch_permission_setter, username, password): + repo = user_util.create_repo(repo_type='git') + repo_name = repo.repo_name + branch_permission_setter(repo_name, username, permission=branch_perm) + + clone_url = rc_web_server.repo_clone_url( + repo.repo_name, user=username, passwd=password) + Command(os.path.dirname(tmpdir.strpath)).execute( + 'git clone', clone_url, tmpdir.strpath) + + stdout, stderr = _add_files_and_push( + 'git', tmpdir.strpath, clone_url=clone_url) + if branch_perm in ['branch.push', 'branch.push_force']: + _check_proper_git_push(stdout, stderr) + else: + msg = "Branch `master` changes rejected by rule `*`=>{}".format(branch_perm) + assert msg in stderr + assert "(pre-receive hook declined)" in stderr diff --git a/rhodecode/tests/vcs_operations/test_vcs_operations_force_push.py b/rhodecode/tests/vcs_operations/test_vcs_operations_force_push.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs_operations/test_vcs_operations_force_push.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2010-2018 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. 
If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+
+import os
+import pytest
+
+from rhodecode.tests import TEST_USER_ADMIN_LOGIN
+from rhodecode.tests.vcs_operations import (
+    Command, _check_proper_hg_push, _check_proper_git_push,
+    _add_files, _add_files_and_push)
+
+
+@pytest.mark.usefixtures("disable_anonymous_user")
+class TestVCSOperations(object):
+    """Force-push scenarios for hg and git, with and without a branch rule."""
+
+    def test_push_force_hg(self, rc_web_server, tmpdir, user_util):
+        """Push to a new hg repo, start a new head and push again with -f."""
+        repo = user_util.create_repo(repo_type='hg')
+        clone_url = rc_web_server.repo_clone_url(repo.repo_name)
+        workdir = tmpdir.strpath
+        Command(os.path.dirname(workdir)).execute(
+            'hg clone', clone_url, workdir)
+
+        out, err = _add_files_and_push('hg', workdir, clone_url=clone_url)
+        _check_proper_hg_push(out, err)
+
+        # check out revision 1, commit a new head, then push it with force
+        new_head_cmd = 'hg checkout -r 1 && hg commit -m "starting new head"'
+        Command(workdir).execute(new_head_cmd)
+        _add_files('hg', workdir, clone_url=clone_url)
+        force_push_cmd = 'hg push --verbose -f {}'.format(clone_url)
+        out, err = Command(workdir).execute(force_push_cmd)
+        _check_proper_hg_push(out, err)
+
+    def test_push_force_git(self, rc_web_server, tmpdir, user_util):
+        """Reset a pushed git clone by two commits and force-push master."""
+        repo = user_util.create_repo(repo_type='git')
+        clone_url = rc_web_server.repo_clone_url(repo.repo_name)
+        workdir = tmpdir.strpath
+        Command(os.path.dirname(workdir)).execute(
+            'git clone', clone_url, workdir)
+
+        out, err = _add_files_and_push('git', workdir, clone_url=clone_url)
+        _check_proper_git_push(out, err)
+
+        # move the local branch two commits back, then push it with force
+        Command(workdir).execute('git reset --hard HEAD~2')
+        force_push_cmd = 'git push -f {} master'.format(clone_url)
+        out, err = Command(workdir).execute(force_push_cmd)
+
+        assert '(forced update)' in err
+
+    def test_push_force_hg_blocked_by_branch_permissions(
+            self, rc_web_server, tmpdir, user_util, branch_permission_setter):
+        """With a branch.push rule set, the forced hg push is rejected."""
+        repo = user_util.create_repo(repo_type='hg')
+        branch_permission_setter(
+            repo.repo_name, TEST_USER_ADMIN_LOGIN, permission='branch.push')
+
+        clone_url = rc_web_server.repo_clone_url(repo.repo_name)
+        workdir = tmpdir.strpath
+        Command(os.path.dirname(workdir)).execute(
+            'hg clone', clone_url, workdir)
+
+        out, err = _add_files_and_push('hg', workdir, clone_url=clone_url)
+        _check_proper_hg_push(out, err)
+
+        # check out revision 1, commit a new head, then push it with force
+        new_head_cmd = 'hg checkout -r 1 && hg commit -m "starting new head"'
+        Command(workdir).execute(new_head_cmd)
+        _add_files('hg', workdir, clone_url=clone_url)
+
+        force_push_cmd = 'hg push --verbose -f {}'.format(clone_url)
+        out, err = Command(workdir).execute(force_push_cmd)
+
+        rejection = "Branch `default` changes rejected by rule `*`=>branch.push"
+        for expected in (rejection, "FORCE PUSH FORBIDDEN", "transaction abort"):
+            assert expected in out
+
+    def test_push_force_git_blocked_by_branch_permissions(
+            self, rc_web_server, tmpdir, user_util, branch_permission_setter):
+        """With a branch.push rule set, the forced git push is rejected."""
+        repo = user_util.create_repo(repo_type='git')
+        branch_permission_setter(
+            repo.repo_name, TEST_USER_ADMIN_LOGIN, permission='branch.push')
+
+        clone_url = rc_web_server.repo_clone_url(repo.repo_name)
+        workdir = tmpdir.strpath
+        Command(os.path.dirname(workdir)).execute(
+            'git clone', clone_url, workdir)
+
+        out, err = _add_files_and_push('git', workdir, clone_url=clone_url)
+        _check_proper_git_push(out, err)
+
+        # move the local branch two commits back, then push it with force
+        Command(workdir).execute('git reset --hard HEAD~2')
+        force_push_cmd = 'git push -f {} master'.format(clone_url)
+        out, err = Command(workdir).execute(force_push_cmd)
+
+        rejection = "Branch `master` changes rejected by rule `*`=>branch.push"
+        hook_msg = "(pre-receive hook declined)"
+        for expected in (rejection, "FORCE PUSH FORBIDDEN", hook_msg):
+            assert expected in err
diff --git
a/rhodecode/tests/vcs_operations/test_vcs_operations_new_branch_push.py b/rhodecode/tests/vcs_operations/test_vcs_operations_new_branch_push.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/tests/vcs_operations/test_vcs_operations_new_branch_push.py
@@ -0,0 +1,115 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2010-2018 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+
+import os
+import pytest
+
+from rhodecode.tests import TEST_USER_ADMIN_LOGIN
+from rhodecode.tests.vcs_operations import (
+    Command, _check_proper_hg_push, _check_proper_git_push, _add_files_and_push)
+
+
+@pytest.mark.usefixtures("disable_anonymous_user")
+class TestVCSOperations(object):
+    """New-branch push scenarios for hg and git, with and without a rule."""
+
+    def test_push_new_branch_hg(self, rc_web_server, tmpdir, user_util):
+        """Push to a new hg repo, open branch dev and push it as new."""
+        repo = user_util.create_repo(repo_type='hg')
+        clone_url = rc_web_server.repo_clone_url(repo.repo_name)
+        workdir = tmpdir.strpath
+        Command(os.path.dirname(workdir)).execute(
+            'hg clone', clone_url, workdir)
+
+        out, err = _add_files_and_push('hg', workdir, clone_url=clone_url)
+        _check_proper_hg_push(out, err)
+
+        # open the dev branch locally, then push files into it
+        open_branch_cmd = 'hg branch dev && hg commit -m "starting dev branch"'
+        Command(workdir).execute(open_branch_cmd)
+        out, err = _add_files_and_push(
+            'hg', workdir, clone_url=clone_url, target_branch='dev',
+            new_branch=True)
+        _check_proper_hg_push(out, err)
+
+    def test_push_new_branch_git(self, rc_web_server, tmpdir, user_util):
+        """Push to a new git repo, check out dev and push it as new."""
+        repo = user_util.create_repo(repo_type='git')
+        clone_url = rc_web_server.repo_clone_url(repo.repo_name)
+        workdir = tmpdir.strpath
+        Command(os.path.dirname(workdir)).execute(
+            'git clone', clone_url, workdir)
+
+        out, err = _add_files_and_push('git', workdir, clone_url=clone_url)
+        _check_proper_git_push(out, err)
+
+        # open the dev branch locally, then push files into it
+        Command(workdir).execute('git checkout -b dev')
+        out, err = _add_files_and_push(
+            'git', workdir, clone_url=clone_url, target_branch='dev',
+            new_branch=True)
+        _check_proper_git_push(out, err, branch='dev')
+
+    def test_push_new_branch_hg_with_branch_permissions_no_force_push(
+            self, rc_web_server, tmpdir, user_util, branch_permission_setter):
+        """Same hg flow with a branch.push rule set; both pushes are checked."""
+        repo = user_util.create_repo(repo_type='hg')
+        branch_permission_setter(
+            repo.repo_name, TEST_USER_ADMIN_LOGIN, permission='branch.push')
+
+        clone_url = rc_web_server.repo_clone_url(repo.repo_name)
+        workdir = tmpdir.strpath
+        Command(os.path.dirname(workdir)).execute(
+            'hg clone', clone_url, workdir)
+
+        out, err = _add_files_and_push('hg', workdir, clone_url=clone_url)
+        _check_proper_hg_push(out, err)
+
+        # open the dev branch locally, then push files into it
+        open_branch_cmd = 'hg branch dev && hg commit -m "starting dev branch"'
+        Command(workdir).execute(open_branch_cmd)
+        out, err = _add_files_and_push(
+            'hg', workdir, clone_url=clone_url, target_branch='dev',
+            new_branch=True)
+        _check_proper_hg_push(out, err)
+
+    def test_push_new_branch_git_with_branch_permissions_no_force_push(
+            self, rc_web_server, tmpdir, user_util, branch_permission_setter):
+        """Same git flow with a branch.push rule set; both pushes are checked."""
+        repo = user_util.create_repo(repo_type='git')
+        branch_permission_setter(
+            repo.repo_name, TEST_USER_ADMIN_LOGIN, permission='branch.push')
+
+        clone_url = rc_web_server.repo_clone_url(repo.repo_name)
+        workdir = tmpdir.strpath
+        Command(os.path.dirname(workdir)).execute(
+            'git clone', clone_url, workdir)
+
+        out, err = _add_files_and_push('git', workdir, clone_url=clone_url)
+        _check_proper_git_push(out, err)
+
+        # open the dev branch locally, then push files into it
+        Command(workdir).execute('git checkout -b dev')
+        out, err = _add_files_and_push(
+            'git', workdir, clone_url=clone_url, target_branch='dev',
+            new_branch=True)
+
+        _check_proper_git_push(out, err, branch='dev')