|
|
|
|
|
# Copyright (C) 2010-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
"""
|
|
|
Helpers for fixture generation
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import time
|
|
|
import tempfile
|
|
|
import shutil
|
|
|
import configparser
|
|
|
|
|
|
from rhodecode.model.settings import SettingsModel
|
|
|
from rhodecode.model.db import Repository, User, RepoGroup, UserGroup, Gist, UserEmailMap
|
|
|
from rhodecode.model.meta import Session
|
|
|
from rhodecode.model.repo import RepoModel
|
|
|
from rhodecode.model.user import UserModel
|
|
|
from rhodecode.model.repo_group import RepoGroupModel
|
|
|
from rhodecode.model.user_group import UserGroupModel
|
|
|
from rhodecode.model.gist import GistModel
|
|
|
from rhodecode.model.auth_token import AuthTokenModel
|
|
|
from rhodecode.model.scm import ScmModel
|
|
|
from rhodecode.authentication.plugins.auth_rhodecode import \
|
|
|
RhodeCodeAuthPlugin
|
|
|
|
|
|
from rhodecode.tests import TEST_USER_ADMIN_LOGIN
|
|
|
|
|
|
dn = os.path.dirname
|
|
|
FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'tests', 'fixtures')
|
|
|
|
|
|
|
|
|
def error_function(*args, **kwargs):
|
|
|
raise Exception('Total Crash !')
|
|
|
|
|
|
|
|
|
class TestINI(object):
|
|
|
"""
|
|
|
Allows to create a new test.ini file as a copy of existing one with edited
|
|
|
data. Example usage::
|
|
|
|
|
|
with TestINI('test.ini', [{'section':{'key':val'}]) as new_test_ini_path:
|
|
|
print('paster server %s' % new_test_ini)
|
|
|
"""
|
|
|
|
|
|
def __init__(self, ini_file_path, ini_params, new_file_prefix='DEFAULT',
|
|
|
destroy=True, dir=None):
|
|
|
self.ini_file_path = ini_file_path
|
|
|
self.ini_params = ini_params
|
|
|
self.new_path = None
|
|
|
self.new_path_prefix = new_file_prefix
|
|
|
self._destroy = destroy
|
|
|
self._dir = dir
|
|
|
|
|
|
def __enter__(self):
|
|
|
return self.create()
|
|
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
|
self.destroy()
|
|
|
|
|
|
def create(self):
|
|
|
parser = configparser.ConfigParser()
|
|
|
parser.read(self.ini_file_path)
|
|
|
|
|
|
for data in self.ini_params:
|
|
|
section, ini_params = list(data.items())[0]
|
|
|
|
|
|
for key, val in ini_params.items():
|
|
|
parser[section][key] = str(val)
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(
|
|
|
mode='w',
|
|
|
prefix=self.new_path_prefix, suffix='.ini', dir=self._dir,
|
|
|
delete=False) as new_ini_file:
|
|
|
parser.write(new_ini_file)
|
|
|
self.new_path = new_ini_file.name
|
|
|
|
|
|
return self.new_path
|
|
|
|
|
|
def destroy(self):
|
|
|
if self._destroy:
|
|
|
os.remove(self.new_path)
|
|
|
|
|
|
|
|
|
class Fixture(object):
|
|
|
|
|
|
def anon_access(self, status):
|
|
|
"""
|
|
|
Context process for disabling anonymous access. use like:
|
|
|
fixture = Fixture()
|
|
|
with fixture.anon_access(False):
|
|
|
#tests
|
|
|
|
|
|
after this block anon access will be set to `not status`
|
|
|
"""
|
|
|
|
|
|
class context(object):
|
|
|
def __enter__(self):
|
|
|
anon = User.get_default_user()
|
|
|
anon.active = status
|
|
|
Session().add(anon)
|
|
|
Session().commit()
|
|
|
time.sleep(1.5) # must sleep for cache (1s to expire)
|
|
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
|
anon = User.get_default_user()
|
|
|
anon.active = not status
|
|
|
Session().add(anon)
|
|
|
Session().commit()
|
|
|
|
|
|
return context()
|
|
|
|
|
|
def auth_restriction(self, registry, auth_restriction):
|
|
|
"""
|
|
|
Context process for changing the builtin rhodecode plugin auth restrictions.
|
|
|
Use like:
|
|
|
fixture = Fixture()
|
|
|
with fixture.auth_restriction('super_admin'):
|
|
|
#tests
|
|
|
|
|
|
after this block auth restriction will be taken off
|
|
|
"""
|
|
|
|
|
|
class context(object):
|
|
|
def _get_plugin(self):
|
|
|
plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format(RhodeCodeAuthPlugin.uid)
|
|
|
plugin = RhodeCodeAuthPlugin(plugin_id)
|
|
|
return plugin
|
|
|
|
|
|
def __enter__(self):
|
|
|
|
|
|
plugin = self._get_plugin()
|
|
|
plugin.create_or_update_setting('auth_restriction', auth_restriction)
|
|
|
Session().commit()
|
|
|
SettingsModel().invalidate_settings_cache(hard=True)
|
|
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
|
|
|
|
plugin = self._get_plugin()
|
|
|
plugin.create_or_update_setting(
|
|
|
'auth_restriction', RhodeCodeAuthPlugin.AUTH_RESTRICTION_NONE)
|
|
|
Session().commit()
|
|
|
SettingsModel().invalidate_settings_cache(hard=True)
|
|
|
|
|
|
return context()
|
|
|
|
|
|
def scope_restriction(self, registry, scope_restriction):
|
|
|
"""
|
|
|
Context process for changing the builtin rhodecode plugin scope restrictions.
|
|
|
Use like:
|
|
|
fixture = Fixture()
|
|
|
with fixture.scope_restriction('scope_http'):
|
|
|
#tests
|
|
|
|
|
|
after this block scope restriction will be taken off
|
|
|
"""
|
|
|
|
|
|
class context(object):
|
|
|
def _get_plugin(self):
|
|
|
plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format(RhodeCodeAuthPlugin.uid)
|
|
|
plugin = RhodeCodeAuthPlugin(plugin_id)
|
|
|
return plugin
|
|
|
|
|
|
def __enter__(self):
|
|
|
plugin = self._get_plugin()
|
|
|
plugin.create_or_update_setting('scope_restriction', scope_restriction)
|
|
|
Session().commit()
|
|
|
SettingsModel().invalidate_settings_cache(hard=True)
|
|
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
|
plugin = self._get_plugin()
|
|
|
plugin.create_or_update_setting(
|
|
|
'scope_restriction', RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_ALL)
|
|
|
Session().commit()
|
|
|
SettingsModel().invalidate_settings_cache(hard=True)
|
|
|
|
|
|
return context()
|
|
|
|
|
|
def _get_repo_create_params(self, **custom):
|
|
|
repo_type = custom.get('repo_type') or 'hg'
|
|
|
|
|
|
default_landing_ref, landing_ref_lbl = ScmModel.backend_landing_ref(repo_type)
|
|
|
|
|
|
defs = {
|
|
|
'repo_name': None,
|
|
|
'repo_type': repo_type,
|
|
|
'clone_uri': '',
|
|
|
'push_uri': '',
|
|
|
'repo_group': '-1',
|
|
|
'repo_description': 'DESC',
|
|
|
'repo_private': False,
|
|
|
'repo_landing_commit_ref': default_landing_ref,
|
|
|
'repo_copy_permissions': False,
|
|
|
'repo_state': Repository.STATE_CREATED,
|
|
|
}
|
|
|
defs.update(custom)
|
|
|
if 'repo_name_full' not in custom:
|
|
|
defs.update({'repo_name_full': defs['repo_name']})
|
|
|
|
|
|
# fix the repo name if passed as repo_name_full
|
|
|
if defs['repo_name']:
|
|
|
defs['repo_name'] = defs['repo_name'].split('/')[-1]
|
|
|
|
|
|
return defs
|
|
|
|
|
|
def _get_group_create_params(self, **custom):
|
|
|
defs = {
|
|
|
'group_name': None,
|
|
|
'group_description': 'DESC',
|
|
|
'perm_updates': [],
|
|
|
'perm_additions': [],
|
|
|
'perm_deletions': [],
|
|
|
'group_parent_id': -1,
|
|
|
'enable_locking': False,
|
|
|
'recursive': False,
|
|
|
}
|
|
|
defs.update(custom)
|
|
|
|
|
|
return defs
|
|
|
|
|
|
def _get_user_create_params(self, name, **custom):
|
|
|
defs = {
|
|
|
'username': name,
|
|
|
'password': 'qweqwe',
|
|
|
'email': '%s+test@rhodecode.org' % name,
|
|
|
'firstname': 'TestUser',
|
|
|
'lastname': 'Test',
|
|
|
'description': 'test description',
|
|
|
'active': True,
|
|
|
'admin': False,
|
|
|
'extern_type': 'rhodecode',
|
|
|
'extern_name': None,
|
|
|
}
|
|
|
defs.update(custom)
|
|
|
|
|
|
return defs
|
|
|
|
|
|
def _get_user_group_create_params(self, name, **custom):
|
|
|
defs = {
|
|
|
'users_group_name': name,
|
|
|
'user_group_description': 'DESC',
|
|
|
'users_group_active': True,
|
|
|
'user_group_data': {},
|
|
|
}
|
|
|
defs.update(custom)
|
|
|
|
|
|
return defs
|
|
|
|
|
|
def create_repo(self, name, **kwargs):
|
|
|
repo_group = kwargs.get('repo_group')
|
|
|
if isinstance(repo_group, RepoGroup):
|
|
|
kwargs['repo_group'] = repo_group.group_id
|
|
|
name = name.split(Repository.NAME_SEP)[-1]
|
|
|
name = Repository.NAME_SEP.join((repo_group.group_name, name))
|
|
|
|
|
|
if 'skip_if_exists' in kwargs:
|
|
|
del kwargs['skip_if_exists']
|
|
|
r = Repository.get_by_repo_name(name)
|
|
|
if r:
|
|
|
return r
|
|
|
|
|
|
form_data = self._get_repo_create_params(repo_name=name, **kwargs)
|
|
|
cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
|
|
|
RepoModel().create(form_data, cur_user)
|
|
|
Session().commit()
|
|
|
repo = Repository.get_by_repo_name(name)
|
|
|
assert repo
|
|
|
return repo
|
|
|
|
|
|
def create_fork(self, repo_to_fork, fork_name, **kwargs):
|
|
|
repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
|
|
|
|
|
|
form_data = self._get_repo_create_params(
|
|
|
repo_name=fork_name,
|
|
|
fork_parent_id=repo_to_fork.repo_id,
|
|
|
repo_type=repo_to_fork.repo_type,
|
|
|
**kwargs)
|
|
|
|
|
|
# TODO: fix it !!
|
|
|
form_data['description'] = form_data['repo_description']
|
|
|
form_data['private'] = form_data['repo_private']
|
|
|
form_data['landing_rev'] = form_data['repo_landing_commit_ref']
|
|
|
|
|
|
owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
|
|
|
RepoModel().create_fork(form_data, cur_user=owner)
|
|
|
Session().commit()
|
|
|
r = Repository.get_by_repo_name(fork_name)
|
|
|
assert r
|
|
|
return r
|
|
|
|
|
|
def destroy_repo(self, repo_name, **kwargs):
|
|
|
RepoModel().delete(repo_name, pull_requests='delete', artifacts='delete', **kwargs)
|
|
|
Session().commit()
|
|
|
|
|
|
def destroy_repo_on_filesystem(self, repo_name):
|
|
|
rm_path = os.path.join(RepoModel().repos_path, repo_name)
|
|
|
if os.path.isdir(rm_path):
|
|
|
shutil.rmtree(rm_path)
|
|
|
|
|
|
def create_repo_group(self, name, **kwargs):
|
|
|
if 'skip_if_exists' in kwargs:
|
|
|
del kwargs['skip_if_exists']
|
|
|
gr = RepoGroup.get_by_group_name(group_name=name)
|
|
|
if gr:
|
|
|
return gr
|
|
|
form_data = self._get_group_create_params(group_name=name, **kwargs)
|
|
|
owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
|
|
|
gr = RepoGroupModel().create(
|
|
|
group_name=form_data['group_name'],
|
|
|
group_description=form_data['group_name'],
|
|
|
owner=owner)
|
|
|
Session().commit()
|
|
|
gr = RepoGroup.get_by_group_name(gr.group_name)
|
|
|
return gr
|
|
|
|
|
|
def destroy_repo_group(self, repogroupid):
|
|
|
RepoGroupModel().delete(repogroupid)
|
|
|
Session().commit()
|
|
|
|
|
|
def create_user(self, name, **kwargs):
|
|
|
if 'skip_if_exists' in kwargs:
|
|
|
del kwargs['skip_if_exists']
|
|
|
user = User.get_by_username(name)
|
|
|
if user:
|
|
|
return user
|
|
|
form_data = self._get_user_create_params(name, **kwargs)
|
|
|
user = UserModel().create(form_data)
|
|
|
|
|
|
# create token for user
|
|
|
AuthTokenModel().create(
|
|
|
user=user, description=u'TEST_USER_TOKEN')
|
|
|
|
|
|
Session().commit()
|
|
|
user = User.get_by_username(user.username)
|
|
|
return user
|
|
|
|
|
|
def destroy_user(self, userid):
|
|
|
UserModel().delete(userid)
|
|
|
Session().commit()
|
|
|
|
|
|
def create_additional_user_email(self, user, email):
|
|
|
uem = UserEmailMap()
|
|
|
uem.user = user
|
|
|
uem.email = email
|
|
|
Session().add(uem)
|
|
|
return uem
|
|
|
|
|
|
def destroy_users(self, userid_iter):
|
|
|
for user_id in userid_iter:
|
|
|
if User.get_by_username(user_id):
|
|
|
UserModel().delete(user_id)
|
|
|
Session().commit()
|
|
|
|
|
|
def create_user_group(self, name, **kwargs):
|
|
|
if 'skip_if_exists' in kwargs:
|
|
|
del kwargs['skip_if_exists']
|
|
|
gr = UserGroup.get_by_group_name(group_name=name)
|
|
|
if gr:
|
|
|
return gr
|
|
|
# map active flag to the real attribute. For API consistency of fixtures
|
|
|
if 'active' in kwargs:
|
|
|
kwargs['users_group_active'] = kwargs['active']
|
|
|
del kwargs['active']
|
|
|
form_data = self._get_user_group_create_params(name, **kwargs)
|
|
|
owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
|
|
|
user_group = UserGroupModel().create(
|
|
|
name=form_data['users_group_name'],
|
|
|
description=form_data['user_group_description'],
|
|
|
owner=owner, active=form_data['users_group_active'],
|
|
|
group_data=form_data['user_group_data'])
|
|
|
Session().commit()
|
|
|
user_group = UserGroup.get_by_group_name(user_group.users_group_name)
|
|
|
return user_group
|
|
|
|
|
|
def destroy_user_group(self, usergroupid):
|
|
|
UserGroupModel().delete(user_group=usergroupid, force=True)
|
|
|
Session().commit()
|
|
|
|
|
|
def create_gist(self, **kwargs):
|
|
|
form_data = {
|
|
|
'description': 'new-gist',
|
|
|
'owner': TEST_USER_ADMIN_LOGIN,
|
|
|
'gist_type': GistModel.cls.GIST_PUBLIC,
|
|
|
'lifetime': -1,
|
|
|
'acl_level': Gist.ACL_LEVEL_PUBLIC,
|
|
|
'gist_mapping': {b'filename1.txt': {'content': b'hello world'},}
|
|
|
}
|
|
|
form_data.update(kwargs)
|
|
|
gist = GistModel().create(
|
|
|
description=form_data['description'], owner=form_data['owner'],
|
|
|
gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'],
|
|
|
lifetime=form_data['lifetime'], gist_acl_level=form_data['acl_level']
|
|
|
)
|
|
|
Session().commit()
|
|
|
return gist
|
|
|
|
|
|
def destroy_gists(self, gistid=None):
|
|
|
for g in GistModel.cls.get_all():
|
|
|
if gistid:
|
|
|
if gistid == g.gist_access_id:
|
|
|
GistModel().delete(g)
|
|
|
else:
|
|
|
GistModel().delete(g)
|
|
|
Session().commit()
|
|
|
|
|
|
def load_resource(self, resource_name, strip=False):
|
|
|
with open(os.path.join(FIXTURES, resource_name), 'rb') as f:
|
|
|
source = f.read()
|
|
|
if strip:
|
|
|
source = source.strip()
|
|
|
|
|
|
return source
|
|
|
|