conftest.py
136 lines
| 4.4 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r5087 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
py.test config for test suite for making push/pull operations. | ||||
.. important:: | ||||
You must have git >= 1.8.5 for tests to work fine. With 68b939b git started | ||||
to redirect things to stderr instead of stdout. | ||||
""" | ||||
import pytest | ||||
import logging | ||||
from rhodecode.authentication import AuthenticationPluginRegistry | ||||
from rhodecode.model.db import Permission, User | ||||
from rhodecode.model.meta import Session | ||||
from rhodecode.model.settings import SettingsModel | ||||
from rhodecode.model.user import UserModel | ||||
log = logging.getLogger(__name__) | ||||
r5153 | # Docker image running httpbin... | |||
HTTPBIN_DOMAIN = 'http://httpbin' | ||||
HTTPBIN_POST = HTTPBIN_DOMAIN + '/post' | ||||
r5087 | ||||
@pytest.fixture() | ||||
def enable_auth_plugins(request, baseapp, csrf_token): | ||||
""" | ||||
Return a factory object that when called, allows to control which | ||||
authentication plugins are enabled. | ||||
""" | ||||
class AuthPluginManager(object): | ||||
def cleanup(self): | ||||
self._enable_plugins(['egg:rhodecode-enterprise-ce#rhodecode']) | ||||
def enable(self, plugins_list, override=None): | ||||
return self._enable_plugins(plugins_list, override) | ||||
def _enable_plugins(self, plugins_list, override=None): | ||||
override = override or {} | ||||
params = { | ||||
'auth_plugins': ','.join(plugins_list), | ||||
} | ||||
# helper translate some names to others, to fix settings code | ||||
name_map = { | ||||
'token': 'authtoken' | ||||
} | ||||
log.debug('enable_auth_plugins: enabling following auth-plugins: %s', plugins_list) | ||||
for module in plugins_list: | ||||
plugin_name = module.partition('#')[-1] | ||||
if plugin_name in name_map: | ||||
plugin_name = name_map[plugin_name] | ||||
enabled_plugin = f'auth_{plugin_name}_enabled' | ||||
cache_ttl = f'auth_{plugin_name}_cache_ttl' | ||||
# default params that are needed for each plugin, | ||||
# `enabled` and `cache_ttl` | ||||
params.update({ | ||||
enabled_plugin: True, | ||||
cache_ttl: 0 | ||||
}) | ||||
if override.get: | ||||
params.update(override.get(module, {})) | ||||
validated_params = params | ||||
for k, v in validated_params.items(): | ||||
setting = SettingsModel().create_or_update_setting(k, v) | ||||
Session().add(setting) | ||||
Session().commit() | ||||
AuthenticationPluginRegistry.invalidate_auth_plugins_cache(hard=True) | ||||
enabled_plugins = SettingsModel().get_auth_plugins() | ||||
assert plugins_list == enabled_plugins | ||||
enabler = AuthPluginManager() | ||||
request.addfinalizer(enabler.cleanup) | ||||
return enabler | ||||
@pytest.fixture() | ||||
def test_user_factory(request, baseapp): | ||||
def user_factory(username='test_user', password='qweqwe', first_name='John', last_name='Testing', **kwargs): | ||||
usr = UserModel().create_or_update( | ||||
username=username, | ||||
password=password, | ||||
email=f'{username}@rhodecode.org', | ||||
firstname=first_name, lastname=last_name) | ||||
Session().commit() | ||||
for k, v in kwargs.items(): | ||||
setattr(usr, k, v) | ||||
Session().add(usr) | ||||
assert User.get_by_username(username) == usr | ||||
@request.addfinalizer | ||||
def cleanup(): | ||||
if UserModel().get_user(usr.user_id) is None: | ||||
return | ||||
perm = Permission.query().all() | ||||
for p in perm: | ||||
UserModel().revoke_perm(usr, p) | ||||
UserModel().delete(usr.user_id) | ||||
Session().commit() | ||||
return usr | ||||
return user_factory | ||||