##// END OF EJS Templates
feat(configs): deprecated old hooks protocol and ssh wrapper....
feat(configs): deprecated old hooks protocol and ssh wrapper. New defaults are now set on v2 keys, so previous installations are automatically set to new keys. Fallback mode is still available.

File last commit:

r5178:30100c99 default
r5496:cab50adf default
Show More
fixture.py
428 lines | 14.4 KiB | text/x-python | PythonLexer
project: added all source files and assets
r1
copyrights: updated for 2023
r5088 # Copyright (C) 2010-2023 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Helpers for fixture generation
"""
import os
import time
import tempfile
import shutil
core: dropped usage of configobj entirely
r4975 import configparser
project: added all source files and assets
r1
Authentication: cache plugins for auth and their settings in the auth_registry....
r4220 from rhodecode.model.settings import SettingsModel
Bartłomiej Wołyńczyk
my-account: security change, added select filed with email from extra emails while editing user profile, now adding extra emails required type password. Task #5386
r2592 from rhodecode.model.db import Repository, User, RepoGroup, UserGroup, Gist, UserEmailMap
project: added all source files and assets
r1 from rhodecode.model.meta import Session
from rhodecode.model.repo import RepoModel
from rhodecode.model.user import UserModel
from rhodecode.model.repo_group import RepoGroupModel
from rhodecode.model.user_group import UserGroupModel
from rhodecode.model.gist import GistModel
auth-tokens: fixed tests
r1482 from rhodecode.model.auth_token import AuthTokenModel
repos: cleannup and fix landing-ref code....
r4852 from rhodecode.model.scm import ScmModel
authentication: introduce login restriction option for builtin rhodecode plugin.
r3387 from rhodecode.authentication.plugins.auth_rhodecode import \
RhodeCodeAuthPlugin
project: added all source files and assets
r1
code: import fix/pep8
r5178 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
project: added all source files and assets
# Short alias, used below to walk two directory levels up from this file.
dn = os.path.dirname

# Absolute path of the `tests/fixtures` directory, resolved relative to the
# grandparent directory of this module.
FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'tests', 'fixtures')
def error_function(*args, **kwargs):
    """
    Unconditionally raise ``Exception('Total Crash !')``.

    Accepts and ignores any positional and keyword arguments, so it can
    stand in for any callable that is expected to fail.
    """
    raise Exception('Total Crash !')
class TestINI(object):
    """
    Allows to create a new test.ini file as a copy of existing one with edited
    data. Example usage::

        with TestINI('test.ini', [{'section': {'key': 'val'}}]) as new_test_ini_path:
            print('paster server %s' % new_test_ini_path)

    `ini_params` is a list of ``{section: {key: value}}`` dicts which are
    applied, in order, on top of the values read from `ini_file_path`.
    Values are stored as ``str(value)``.
    """

    # The class name starts with "Test" but this is a helper, not a test
    # case; this keeps pytest from trying to collect it.
    __test__ = False

    def __init__(self, ini_file_path, ini_params, new_file_prefix='DEFAULT',
                 destroy=True, dir=None):
        self.ini_file_path = ini_file_path
        self.ini_params = ini_params
        # path of the generated file, set by `create()`
        self.new_path = None
        self.new_path_prefix = new_file_prefix
        self._destroy = destroy
        self._dir = dir

    def __enter__(self):
        return self.create()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.destroy()

    def create(self):
        """
        Write the edited copy of the ini file and return its path.

        The path is also stored in `self.new_path`.
        """
        parser = configparser.ConfigParser()
        parser.read(self.ini_file_path)

        for data in self.ini_params:
            for section, section_params in data.items():
                # sections missing from the source file are created instead
                # of failing with a KeyError
                if section not in parser:
                    parser.add_section(section)
                for key, val in section_params.items():
                    parser[section][key] = str(val)

        # delete=False: the file has to outlive this `with` block, removal
        # is handled by `destroy()`
        with tempfile.NamedTemporaryFile(
                mode='w',
                prefix=self.new_path_prefix, suffix='.ini', dir=self._dir,
                delete=False) as new_ini_file:
            parser.write(new_ini_file)
        self.new_path = new_ini_file.name

        return self.new_path

    def destroy(self):
        """
        Remove the generated file, unless the instance was created with
        ``destroy=False`` or no file was generated yet.
        """
        if self._destroy and self.new_path is not None:
            os.remove(self.new_path)
class Fixture(object):
    """
    Helpers creating and destroying database objects (repositories,
    repository groups, users, user groups, gists) for tests. Unless noted
    otherwise each helper commits the current `Session()`.
    """

    def anon_access(self, status):
        """
        Context process for disabling anonymous access. use like:

            fixture = Fixture()
            with fixture.anon_access(False):
                #tests

        Inside the block the `active` flag of the default user is `status`,
        after this block anon access will be set to `not status`
        """

        class context(object):
            def __enter__(self):
                anon = User.get_default_user()
                anon.active = status
                Session().add(anon)
                Session().commit()
                time.sleep(1.5)  # must sleep for cache (1s to expire)

            def __exit__(self, exc_type, exc_val, exc_tb):
                anon = User.get_default_user()
                anon.active = not status
                Session().add(anon)
                Session().commit()

        return context()

    def _rhodecode_plugin_setting(self, setting_name, value, reset_value):
        """
        Build a context manager which stores `value` under `setting_name`
        of the builtin rhodecode auth plugin on enter, and `reset_value` on
        exit. After each change the session is committed and the settings
        cache is invalidated.
        """

        class context(object):
            def _get_plugin(self):
                plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format(
                    RhodeCodeAuthPlugin.uid)
                return RhodeCodeAuthPlugin(plugin_id)

            def _store(self, new_value):
                plugin = self._get_plugin()
                plugin.create_or_update_setting(setting_name, new_value)
                Session().commit()
                SettingsModel().invalidate_settings_cache(hard=True)

            def __enter__(self):
                self._store(value)

            def __exit__(self, exc_type, exc_val, exc_tb):
                self._store(reset_value)

        return context()

    def auth_restriction(self, registry, auth_restriction):
        """
        Context process for changing the builtin rhodecode plugin auth restrictions.
        Use like:

            fixture = Fixture()
            with fixture.auth_restriction(registry, 'super_admin'):
                #tests

        after this block auth restriction will be taken off
        (set to `RhodeCodeAuthPlugin.AUTH_RESTRICTION_NONE`).

        `registry` is accepted for call compatibility, it is not used here.
        """
        return self._rhodecode_plugin_setting(
            'auth_restriction', auth_restriction,
            RhodeCodeAuthPlugin.AUTH_RESTRICTION_NONE)

    def scope_restriction(self, registry, scope_restriction):
        """
        Context process for changing the builtin rhodecode plugin scope restrictions.
        Use like:

            fixture = Fixture()
            with fixture.scope_restriction(registry, 'scope_http'):
                #tests

        after this block scope restriction will be taken off
        (set to `RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_ALL`).

        `registry` is accepted for call compatibility, it is not used here.
        """
        return self._rhodecode_plugin_setting(
            'scope_restriction', scope_restriction,
            RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_ALL)

    def _get_repo_create_params(self, **custom):
        """
        Return form data for creating a repository: defaults overridden by
        `custom`. `repo_type` falls back to 'hg' when not given (or empty).
        """
        repo_type = custom.get('repo_type') or 'hg'

        # only the ref is needed, the label is not used
        default_landing_ref, _landing_ref_lbl = ScmModel.backend_landing_ref(repo_type)

        defs = {
            'repo_name': None,
            'repo_type': repo_type,
            'clone_uri': '',
            'push_uri': '',
            'repo_group': '-1',
            'repo_description': 'DESC',
            'repo_private': False,
            'repo_landing_commit_ref': default_landing_ref,
            'repo_copy_permissions': False,
            'repo_state': Repository.STATE_CREATED,
        }
        defs.update(custom)
        if 'repo_name_full' not in custom:
            defs.update({'repo_name_full': defs['repo_name']})

        # fix the repo name if passed as repo_name_full
        if defs['repo_name']:
            defs['repo_name'] = defs['repo_name'].split('/')[-1]

        return defs

    def _get_group_create_params(self, **custom):
        """
        Return form data for creating a repository group: defaults
        overridden by `custom`.
        """
        defs = {
            'group_name': None,
            'group_description': 'DESC',
            'perm_updates': [],
            'perm_additions': [],
            'perm_deletions': [],
            'group_parent_id': -1,
            'enable_locking': False,
            'recursive': False,
        }
        defs.update(custom)

        return defs

    def _get_user_create_params(self, name, **custom):
        """
        Return form data for creating user `name`: defaults overridden by
        `custom`.
        """
        defs = {
            'username': name,
            'password': 'qweqwe',
            'email': '%s+test@rhodecode.org' % name,
            'firstname': 'TestUser',
            'lastname': 'Test',
            'description': 'test description',
            'active': True,
            'admin': False,
            'extern_type': 'rhodecode',
            'extern_name': None,
        }
        defs.update(custom)

        return defs

    def _get_user_group_create_params(self, name, **custom):
        """
        Return form data for creating user group `name`: defaults
        overridden by `custom`.
        """
        defs = {
            'users_group_name': name,
            'user_group_description': 'DESC',
            'users_group_active': True,
            'user_group_data': {},
        }
        defs.update(custom)

        return defs

    def create_repo(self, name, **kwargs):
        """
        Create repository `name` and return it.

        When `repo_group` is a `RepoGroup` instance, its id is passed on
        and the repo name is prefixed with the group name. When the
        `skip_if_exists` key is present (whatever its value) an already
        existing repository of that name is returned instead.
        """
        repo_group = kwargs.get('repo_group')
        if isinstance(repo_group, RepoGroup):
            kwargs['repo_group'] = repo_group.group_id
            name = name.split(Repository.NAME_SEP)[-1]
            name = Repository.NAME_SEP.join((repo_group.group_name, name))

        if 'skip_if_exists' in kwargs:
            del kwargs['skip_if_exists']
            r = Repository.get_by_repo_name(name)
            if r:
                return r

        form_data = self._get_repo_create_params(repo_name=name, **kwargs)
        cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
        RepoModel().create(form_data, cur_user)
        Session().commit()
        repo = Repository.get_by_repo_name(name)
        assert repo
        return repo

    def create_fork(self, repo_to_fork, fork_name, **kwargs):
        """
        Fork the repository named `repo_to_fork` as `fork_name` and return
        the fork.
        """
        repo_to_fork = Repository.get_by_repo_name(repo_to_fork)

        form_data = self._get_repo_create_params(
            repo_name=fork_name,
            fork_parent_id=repo_to_fork.repo_id,
            repo_type=repo_to_fork.repo_type,
            **kwargs)

        # TODO: fix it !!
        # the same values are duplicated under the un-prefixed keys
        form_data['description'] = form_data['repo_description']
        form_data['private'] = form_data['repo_private']
        form_data['landing_rev'] = form_data['repo_landing_commit_ref']

        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
        RepoModel().create_fork(form_data, cur_user=owner)
        Session().commit()
        r = Repository.get_by_repo_name(fork_name)
        assert r
        return r

    def destroy_repo(self, repo_name, **kwargs):
        """
        Delete repository `repo_name`, passing ``pull_requests='delete'``
        to the model.
        """
        RepoModel().delete(repo_name, pull_requests='delete', **kwargs)
        Session().commit()

    def destroy_repo_on_filesystem(self, repo_name):
        """
        Remove the directory of `repo_name` under `RepoModel().repos_path`,
        if it exists.
        """
        rm_path = os.path.join(RepoModel().repos_path, repo_name)
        if os.path.isdir(rm_path):
            shutil.rmtree(rm_path)

    def create_repo_group(self, name, **kwargs):
        """
        Create repository group `name` and return it.

        When the `skip_if_exists` key is present (whatever its value) an
        already existing group of that name is returned instead.
        """
        if 'skip_if_exists' in kwargs:
            del kwargs['skip_if_exists']
            gr = RepoGroup.get_by_group_name(group_name=name)
            if gr:
                return gr

        form_data = self._get_group_create_params(group_name=name, **kwargs)
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)

        # An explicitly passed description is honoured; without one the
        # group name is used as description, as it always was.
        if 'group_description' in kwargs:
            group_description = form_data['group_description']
        else:
            group_description = form_data['group_name']

        gr = RepoGroupModel().create(
            group_name=form_data['group_name'],
            group_description=group_description,
            owner=owner)
        Session().commit()
        gr = RepoGroup.get_by_group_name(gr.group_name)
        return gr

    def destroy_repo_group(self, repogroupid):
        """
        Delete the repository group `repogroupid`.
        """
        RepoGroupModel().delete(repogroupid)
        Session().commit()

    def create_user(self, name, **kwargs):
        """
        Create user `name` together with an auth token and return the user.

        When the `skip_if_exists` key is present (whatever its value) an
        already existing user of that name is returned instead.
        """
        if 'skip_if_exists' in kwargs:
            del kwargs['skip_if_exists']
            user = User.get_by_username(name)
            if user:
                return user

        form_data = self._get_user_create_params(name, **kwargs)
        user = UserModel().create(form_data)

        # create token for user
        AuthTokenModel().create(
            user=user, description=u'TEST_USER_TOKEN')

        Session().commit()
        user = User.get_by_username(user.username)
        return user

    def destroy_user(self, userid):
        """
        Delete the user `userid`.
        """
        UserModel().delete(userid)
        Session().commit()

    def create_additional_user_email(self, user, email):
        """
        Add a `UserEmailMap` for `user` and `email` to the session and
        return it. The session is not committed here.
        """
        uem = UserEmailMap()
        uem.user = user
        uem.email = email
        Session().add(uem)
        return uem

    def destroy_users(self, userid_iter):
        """
        Delete every user from `userid_iter` which can be found by
        `User.get_by_username`, then commit.
        """
        for user_id in userid_iter:
            if User.get_by_username(user_id):
                UserModel().delete(user_id)
        Session().commit()

    def create_user_group(self, name, **kwargs):
        """
        Create user group `name` and return it.

        When the `skip_if_exists` key is present (whatever its value) an
        already existing user group of that name is returned instead.
        """
        if 'skip_if_exists' in kwargs:
            del kwargs['skip_if_exists']
            gr = UserGroup.get_by_group_name(group_name=name)
            if gr:
                return gr

        # map active flag to the real attribute. For API consistency of fixtures
        if 'active' in kwargs:
            kwargs['users_group_active'] = kwargs.pop('active')

        form_data = self._get_user_group_create_params(name, **kwargs)
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
        user_group = UserGroupModel().create(
            name=form_data['users_group_name'],
            description=form_data['user_group_description'],
            owner=owner, active=form_data['users_group_active'],
            group_data=form_data['user_group_data'])
        Session().commit()
        user_group = UserGroup.get_by_group_name(user_group.users_group_name)
        return user_group

    def destroy_user_group(self, usergroupid):
        """
        Delete the user group `usergroupid`, passing ``force=True`` to the
        model.
        """
        UserGroupModel().delete(user_group=usergroupid, force=True)
        Session().commit()

    def create_gist(self, **kwargs):
        """
        Create a gist and return it. `kwargs` override the default form
        data (description, owner, gist_type, lifetime, acl_level,
        gist_mapping).
        """
        form_data = {
            'description': 'new-gist',
            'owner': TEST_USER_ADMIN_LOGIN,
            'gist_type': GistModel.cls.GIST_PUBLIC,
            'lifetime': -1,
            'acl_level': Gist.ACL_LEVEL_PUBLIC,
            'gist_mapping': {b'filename1.txt': {'content': b'hello world'}, }
        }
        form_data.update(kwargs)
        gist = GistModel().create(
            description=form_data['description'], owner=form_data['owner'],
            gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'],
            lifetime=form_data['lifetime'], gist_acl_level=form_data['acl_level']
        )
        Session().commit()
        return gist

    def destroy_gists(self, gistid=None):
        """
        Delete the gist whose `gist_access_id` equals `gistid`, or all
        gists when `gistid` is not given, then commit.
        """
        for g in GistModel.cls.get_all():
            if not gistid or gistid == g.gist_access_id:
                GistModel().delete(g)
        Session().commit()

    def load_resource(self, resource_name, strip=False):
        """
        Return the content of `resource_name` from the `FIXTURES` directory
        as bytes, optionally stripped of surrounding whitespace.
        """
        with open(os.path.join(FIXTURES, resource_name), 'rb') as f:
            source = f.read()
        if strip:
            source = source.strip()
        return source