|
|
# Copyright (C) 2010-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
"""
|
|
|
Helpers for fixture generation
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import time
|
|
|
import tempfile
|
|
|
import shutil
|
|
|
import configparser
|
|
|
|
|
|
from rhodecode.model.settings import SettingsModel
|
|
|
from rhodecode.model.db import Repository, User, RepoGroup, UserGroup, Gist, UserEmailMap
|
|
|
from rhodecode.model.meta import Session
|
|
|
from rhodecode.model.repo import RepoModel
|
|
|
from rhodecode.model.user import UserModel
|
|
|
from rhodecode.model.repo_group import RepoGroupModel
|
|
|
from rhodecode.model.user_group import UserGroupModel
|
|
|
from rhodecode.model.gist import GistModel
|
|
|
from rhodecode.model.auth_token import AuthTokenModel
|
|
|
from rhodecode.model.scm import ScmModel
|
|
|
from rhodecode.authentication.plugins.auth_rhodecode import RhodeCodeAuthPlugin
|
|
|
|
|
|
from rhodecode.tests import TEST_USER_ADMIN_LOGIN
|
|
|
|
|
|
# Directory with the diff fixture files; it sits next to this module.
FIXTURES = os.path.join(os.path.dirname(os.path.abspath(__file__)), "diff_fixtures")

# Short module-level alias for os.path.dirname.
dn = os.path.dirname
|
|
|
|
|
|
|
|
|
def error_function(*args, **kwargs):
    """
    Always fail: accept any positional/keyword arguments, ignore them and
    raise a plain ``Exception``.
    """
    message = "Total Crash !"
    raise Exception(message)
|
|
|
|
|
|
|
|
|
class TestINI(object):
    """
    Allows to create a new test.ini file as a copy of existing one with edited
    data. Example usage::

        with TestINI('test.ini', [{'section': {'key': 'val'}}]) as new_test_ini_path:
            print('paster server %s' % new_test_ini_path)

    :param ini_file_path: path of the base ini file the settings are read from.
    :param ini_params: iterable of ``{section: {key: value}}`` mappings that
        are applied, in order, on top of the base settings. Values are stored
        as ``str(value)``.
    :param new_file_prefix: file name prefix of the generated ini file.
    :param destroy: when true, :meth:`destroy` (and leaving the ``with`` block)
        removes the generated file.
    :param dir: directory to create the file in; ``None`` lets ``tempfile``
        pick its default location.
    """

    # This is a helper, not a test case: keep pytest from trying to collect it
    # just because the class name starts with "Test".
    __test__ = False

    def __init__(self, ini_file_path, ini_params, new_file_prefix="DEFAULT", destroy=True, dir=None):
        self.ini_file_path = ini_file_path
        self.ini_params = ini_params
        # path of the generated file; stays None until create() has run
        self.new_path = None
        self.new_path_prefix = new_file_prefix
        self._destroy = destroy
        self._dir = dir

    def __enter__(self):
        return self.create()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.destroy()

    def create(self):
        """
        Write the modified copy of the ini file and return its path.

        Every section of every mapping in ``ini_params`` is applied; sections
        missing from the base file are created instead of raising ``KeyError``.
        """
        parser = configparser.ConfigParser()
        parser.read(self.ini_file_path)

        for data in self.ini_params:
            for section, section_params in data.items():
                # the default section always exists and cannot be added explicitly
                if section != parser.default_section and not parser.has_section(section):
                    parser.add_section(section)

                for key, val in section_params.items():
                    parser[section][key] = str(val)

        # delete=False: the file has to outlive this handle, destroy() removes it
        with tempfile.NamedTemporaryFile(
            mode="w", prefix=self.new_path_prefix, suffix=".ini", dir=self._dir, delete=False
        ) as new_ini_file:
            parser.write(new_ini_file)
            self.new_path = new_ini_file.name

        return self.new_path

    def destroy(self):
        """
        Remove the generated file if this instance was built with
        ``destroy=True``. Safe to call before :meth:`create` and more than once.
        """
        if not self._destroy or self.new_path is None:
            return

        try:
            os.remove(self.new_path)
        except FileNotFoundError:
            # already removed (e.g. destroy() called twice) - nothing left to do
            pass
|
|
|
|
|
|
|
|
|
class Fixture(object):
    """
    Helpers used by tests to create and destroy database objects
    (repositories, repository groups, users, user groups, gists) and to
    temporarily change global settings.
    """

    def anon_access(self, status):
        """
        Context process for disabling anonymous access. use like:
        fixture = Fixture()
        with fixture.anon_access(False):
            #tests

        On entry the default user's ``active`` flag is set to `status`;
        after this block anon access will be set to `not status` (the value
        from before the block is not remembered).
        """

        class context(object):
            def __enter__(self):
                anon = User.get_default_user()
                anon.active = status
                Session().add(anon)
                Session().commit()
                time.sleep(1.5)  # must sleep for cache (1s to expire)

            def __exit__(self, exc_type, exc_val, exc_tb):
                anon = User.get_default_user()
                anon.active = not status
                Session().add(anon)
                Session().commit()

        return context()

    def _plugin_setting_override(self, setting_name, value, reset_value):
        """
        Return a context manager that stores `value` under `setting_name` on
        the builtin rhodecode auth plugin on entry and `reset_value` on exit.
        Each change is committed and followed by a hard invalidation of the
        settings cache.
        """

        class context(object):
            def _get_plugin(self):
                plugin_id = "egg:rhodecode-enterprise-ce#{}".format(RhodeCodeAuthPlugin.uid)
                plugin = RhodeCodeAuthPlugin(plugin_id)
                return plugin

            def _store(self, new_value):
                plugin = self._get_plugin()
                plugin.create_or_update_setting(setting_name, new_value)
                Session().commit()
                SettingsModel().invalidate_settings_cache(hard=True)

            def __enter__(self):
                self._store(value)

            def __exit__(self, exc_type, exc_val, exc_tb):
                self._store(reset_value)

        return context()

    def auth_restriction(self, registry, auth_restriction):
        """
        Context process for changing the builtin rhodecode plugin auth restrictions.
        Use like:
        fixture = Fixture()
        with fixture.auth_restriction(registry, 'super_admin'):
            #tests

        after this block auth restriction will be taken off
        (reset to ``RhodeCodeAuthPlugin.AUTH_RESTRICTION_NONE``).

        `registry` is part of the signature but is not used by this method.
        """
        return self._plugin_setting_override(
            "auth_restriction", auth_restriction, RhodeCodeAuthPlugin.AUTH_RESTRICTION_NONE
        )

    def scope_restriction(self, registry, scope_restriction):
        """
        Context process for changing the builtin rhodecode plugin scope restrictions.
        Use like:
        fixture = Fixture()
        with fixture.scope_restriction(registry, 'scope_http'):
            #tests

        after this block scope restriction will be taken off
        (reset to ``RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_ALL``).

        `registry` is part of the signature but is not used by this method.
        """
        return self._plugin_setting_override(
            "scope_restriction", scope_restriction, RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_ALL
        )

    def _get_repo_create_params(self, **custom):
        """
        Build the form data dict for creating a repository: defaults
        overridden by `custom`. A missing/empty ``repo_type`` means ``"hg"``.
        """
        repo_type = custom.get("repo_type") or "hg"

        default_landing_ref, _ = ScmModel.backend_landing_ref(repo_type)

        defs = {
            "repo_name": None,
            "repo_type": repo_type,
            "clone_uri": "",
            "push_uri": "",
            "repo_group": "-1",
            "repo_description": "DESC",
            "repo_private": False,
            "repo_landing_commit_ref": default_landing_ref,
            "repo_copy_permissions": False,
            "repo_state": Repository.STATE_CREATED,
        }
        defs.update(custom)
        # an explicit empty/None repo_type in `custom` must not undo the "hg"
        # fallback the landing ref above was computed for
        defs["repo_type"] = repo_type

        if "repo_name_full" not in custom:
            defs.update({"repo_name_full": defs["repo_name"]})

        # fix the repo name if passed as repo_name_full
        if defs["repo_name"]:
            defs["repo_name"] = defs["repo_name"].split("/")[-1]

        return defs

    def _get_group_create_params(self, **custom):
        """Build the form data dict for a repository group: defaults overridden by `custom`."""
        defs = {
            "group_name": None,
            "group_description": "DESC",
            "perm_updates": [],
            "perm_additions": [],
            "perm_deletions": [],
            "group_parent_id": -1,
            "enable_locking": False,
            "recursive": False,
        }
        defs.update(custom)

        return defs

    def _get_user_create_params(self, name, **custom):
        """Build the form data dict for user `name`: defaults overridden by `custom`."""
        defs = {
            "username": name,
            "password": "qweqwe",
            "email": "%s+test@rhodecode.org" % name,
            "firstname": "TestUser",
            "lastname": "Test",
            "description": "test description",
            "active": True,
            "admin": False,
            "extern_type": "rhodecode",
            "extern_name": None,
        }
        defs.update(custom)

        return defs

    def _get_user_group_create_params(self, name, **custom):
        """Build the form data dict for user group `name`: defaults overridden by `custom`."""
        defs = {
            "users_group_name": name,
            "user_group_description": "DESC",
            "users_group_active": True,
            "user_group_data": {},
        }
        defs.update(custom)

        return defs

    def create_repo(self, name, **kwargs):
        """
        Create repository `name` and return the ``Repository`` looked up by name.

        Recognised keyword arguments (everything else is passed on as form data):

        * ``repo_group`` - when a ``RepoGroup`` instance is given, it is
          replaced by its ``group_id`` and `name` is re-rooted under the
          group's ``group_name``.
        * ``skip_if_exists`` - when true, an already existing repository with
          that name is returned instead of creating a new one.
        * ``cur_user`` - user passed to ``RepoModel().create``; defaults to
          ``TEST_USER_ADMIN_LOGIN``.
        """
        repo_group = kwargs.get("repo_group")
        if isinstance(repo_group, RepoGroup):
            kwargs["repo_group"] = repo_group.group_id
            name = name.split(Repository.NAME_SEP)[-1]
            name = Repository.NAME_SEP.join((repo_group.group_name, name))

        # always strip the flag from the form data, but only skip when it is true
        if kwargs.pop("skip_if_exists", False):
            r = Repository.get_by_repo_name(name)
            if r:
                return r

        form_data = self._get_repo_create_params(repo_name=name, **kwargs)
        cur_user = kwargs.get("cur_user", TEST_USER_ADMIN_LOGIN)
        RepoModel().create(form_data, cur_user)
        Session().commit()
        repo = Repository.get_by_repo_name(name)
        assert repo
        return repo

    def create_fork(self, repo_to_fork, fork_name, **kwargs):
        """
        Fork the repository named `repo_to_fork` into `fork_name` and return
        the new ``Repository``. ``cur_user`` (default ``TEST_USER_ADMIN_LOGIN``)
        is passed to ``RepoModel().create_fork``.
        """
        repo_to_fork = Repository.get_by_repo_name(repo_to_fork)

        form_data = self._get_repo_create_params(
            repo_name=fork_name, fork_parent_id=repo_to_fork.repo_id, repo_type=repo_to_fork.repo_type, **kwargs
        )

        # TODO: fix it !!
        # the same values are duplicated under un-prefixed keys; presumably
        # those are the names RepoModel.create_fork reads - verify before removing
        form_data["description"] = form_data["repo_description"]
        form_data["private"] = form_data["repo_private"]
        form_data["landing_rev"] = form_data["repo_landing_commit_ref"]

        owner = kwargs.get("cur_user", TEST_USER_ADMIN_LOGIN)
        RepoModel().create_fork(form_data, cur_user=owner)
        Session().commit()
        r = Repository.get_by_repo_name(fork_name)
        assert r
        return r

    def destroy_repo(self, repo_name, **kwargs):
        """Delete repository `repo_name` together with its pull requests and artifacts."""
        RepoModel().delete(repo_name, pull_requests="delete", artifacts="delete", **kwargs)
        Session().commit()

    def destroy_repo_on_filesystem(self, repo_name):
        """Remove the directory of `repo_name` below ``RepoModel().repos_path`` if it exists."""
        rm_path = os.path.join(RepoModel().repos_path, repo_name)
        if os.path.isdir(rm_path):
            shutil.rmtree(rm_path)

    def create_repo_group(self, name, **kwargs):
        """
        Create repository group `name` and return the ``RepoGroup``.

        ``skip_if_exists`` (when true) returns an existing group of that name;
        ``group_description`` overrides the default description; ``cur_user``
        (default ``TEST_USER_ADMIN_LOGIN``) becomes the owner.
        """
        if kwargs.pop("skip_if_exists", False):
            gr = RepoGroup.get_by_group_name(group_name=name)
            if gr:
                return gr

        form_data = self._get_group_create_params(group_name=name, **kwargs)
        owner = kwargs.get("cur_user", TEST_USER_ADMIN_LOGIN)
        gr = RepoGroupModel().create(
            group_name=form_data["group_name"],
            # use the prepared description, not the group name
            group_description=form_data["group_description"],
            owner=owner,
        )
        Session().commit()
        gr = RepoGroup.get_by_group_name(gr.group_name)
        return gr

    def destroy_repo_group(self, repogroupid):
        """Delete the repository group identified by `repogroupid`."""
        RepoGroupModel().delete(repogroupid)
        Session().commit()

    def create_user(self, name, **kwargs):
        """
        Create user `name` plus an auth token described ``TEST_USER_TOKEN``
        and return the ``User``. ``skip_if_exists`` (when true) returns an
        existing user of that name; other keyword arguments override the
        default form data.
        """
        if kwargs.pop("skip_if_exists", False):
            user = User.get_by_username(name)
            if user:
                return user

        form_data = self._get_user_create_params(name, **kwargs)
        user = UserModel().create(form_data)

        # create token for user
        AuthTokenModel().create(user=user, description="TEST_USER_TOKEN")

        Session().commit()
        user = User.get_by_username(user.username)
        return user

    def destroy_user(self, userid):
        """Delete the user identified by `userid`."""
        UserModel().delete(userid)
        Session().commit()

    def create_additional_user_email(self, user, email):
        """
        Attach an extra `email` to `user` via a new ``UserEmailMap``. The
        object is added to the session but not committed here.
        """
        uem = UserEmailMap()
        uem.user = user
        uem.email = email
        Session().add(uem)
        return uem

    def destroy_users(self, userid_iter):
        """
        Delete every user from `userid_iter` that can be found. Existence is
        checked with ``User.get_by_username``, so the values are presumably
        usernames.
        """
        for user_id in userid_iter:
            if User.get_by_username(user_id):
                UserModel().delete(user_id)
        Session().commit()

    def create_user_group(self, name, **kwargs):
        """
        Create user group `name` and return the ``UserGroup``.

        ``skip_if_exists`` (when true) returns an existing group of that name;
        ``active`` is accepted as an alias of ``users_group_active``;
        ``cur_user`` (default ``TEST_USER_ADMIN_LOGIN``) becomes the owner.
        """
        if kwargs.pop("skip_if_exists", False):
            gr = UserGroup.get_by_group_name(group_name=name)
            if gr:
                return gr

        # map active flag to the real attribute. For API consistency of fixtures
        if "active" in kwargs:
            kwargs["users_group_active"] = kwargs.pop("active")

        form_data = self._get_user_group_create_params(name, **kwargs)
        owner = kwargs.get("cur_user", TEST_USER_ADMIN_LOGIN)
        user_group = UserGroupModel().create(
            name=form_data["users_group_name"],
            description=form_data["user_group_description"],
            owner=owner,
            active=form_data["users_group_active"],
            group_data=form_data["user_group_data"],
        )
        Session().commit()
        user_group = UserGroup.get_by_group_name(user_group.users_group_name)
        return user_group

    def destroy_user_group(self, usergroupid):
        """Force-delete the user group identified by `usergroupid`."""
        UserGroupModel().delete(user_group=usergroupid, force=True)
        Session().commit()

    def create_gist(self, **kwargs):
        """
        Create a gist and return what ``GistModel().create`` returns. Defaults
        (a public gist owned by ``TEST_USER_ADMIN_LOGIN`` with a single file
        ``filename1.txt``) can be overridden through keyword arguments.
        """
        form_data = {
            "description": "new-gist",
            "owner": TEST_USER_ADMIN_LOGIN,
            "gist_type": GistModel.cls.GIST_PUBLIC,
            "lifetime": -1,
            "acl_level": Gist.ACL_LEVEL_PUBLIC,
            "gist_mapping": {
                b"filename1.txt": {"content": b"hello world"},
            },
        }
        form_data.update(kwargs)
        gist = GistModel().create(
            description=form_data["description"],
            owner=form_data["owner"],
            gist_mapping=form_data["gist_mapping"],
            gist_type=form_data["gist_type"],
            lifetime=form_data["lifetime"],
            gist_acl_level=form_data["acl_level"],
        )
        Session().commit()
        return gist

    def destroy_gists(self, gistid=None):
        """
        Delete all gists, or only the one whose ``gist_access_id`` equals
        `gistid` when `gistid` is given.
        """
        for g in GistModel.cls.get_all():
            if gistid:
                if gistid == g.gist_access_id:
                    GistModel().delete(g)
            else:
                GistModel().delete(g)
        Session().commit()

    def load_resource(self, resource_name, strip=False):
        """
        Return the raw bytes of `resource_name` from the ``FIXTURES``
        directory, with surrounding whitespace removed when `strip` is true.
        """
        with open(os.path.join(FIXTURES, resource_name), "rb") as f:
            source = f.read()
            if strip:
                source = source.strip()

        return source
|
|
|
|