##// END OF EJS Templates
fix(tests): added ability for test to get an up-to-date version mercurial.
fix(tests): added ability for test to get an up-to-date version mercurial.

File last commit:

r5607:39b20522 default
r5653:11c8ab5c default
Show More
test_hooks_daemon.py
98 lines | 3.9 KiB | text/x-python | PythonLexer
/ rhodecode / tests / lib / test_hooks_daemon.py
tests: fixed test suite for celery adoption
r5607 # Copyright (C) 2010-2024 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import logging
import mock
import pytest
feat(celery-hooks): added all needed changes to support new celery backend, removed DummyHooksCallbackDaemon, updated tests. Fixes: RCCE-55
r5298 import tempfile
project: added all source files and assets
r1
feat(ssh-wrapper-speedup): major rewrite of code to address imports problem with ssh-wrapper-v2...
r5325 from rhodecode.lib.hook_daemon import celery_hooks_deamon
tests: fixed test suite for celery adoption
r5607 from rhodecode.lib.hook_daemon import utils as hooks_utils
feat(ssh-wrapper-speedup): major rewrite of code to address imports problem with ssh-wrapper-v2...
r5325 from rhodecode.lib.hook_daemon import base as hook_base
tests: fixed test suite for celery adoption
r5607
project: added all source files and assets
r1 from rhodecode.tests.utils import assert_message_in_log
class TestHooks(object):
def test_hooks_can_be_used_as_a_context_processor(self):
tests: fixed test suite for celery adoption
r5607 hooks = hook_base.Hooks()
project: added all source files and assets
r1 with hooks as return_value:
pass
assert hooks == return_value
class TestPrepareHooksDaemon(object):
feat(ssh-wrapper-speedup): major rewrite of code to address imports problem with ssh-wrapper-v2...
r5325
feat(celery-hooks): added all needed changes to support new celery backend, removed DummyHooksCallbackDaemon, updated tests. Fixes: RCCE-55
r5298 @pytest.mark.parametrize('protocol', ('celery',))
tests: fixed test suite for celery adoption
r5607 def test_returns_celery_hooks_callback_daemon_when_celery_protocol_specified(self, protocol):
feat(celery-hooks): added all needed changes to support new celery backend, removed DummyHooksCallbackDaemon, updated tests. Fixes: RCCE-55
r5298 with tempfile.NamedTemporaryFile(mode='w') as temp_file:
tests: fixed test suite for celery adoption
r5607 temp_file.write(
"[app:main]\n"
"celery.broker_url = redis://redis/0\n"
"celery.result_backend = redis://redis/0\n"
)
feat(celery-hooks): added all needed changes to support new celery backend, removed DummyHooksCallbackDaemon, updated tests. Fixes: RCCE-55
r5298 temp_file.flush()
expected_extras = {'config': temp_file.name}
tests: fixed test suite for celery adoption
r5607 callback, extras = hooks_utils.prepare_callback_daemon(expected_extras, protocol=protocol)
feat(ssh-wrapper-speedup): major rewrite of code to address imports problem with ssh-wrapper-v2...
r5325 assert isinstance(callback, celery_hooks_deamon.CeleryHooksCallbackDaemon)
project: added all source files and assets
r1
@pytest.mark.parametrize('protocol, expected_class', (
tests: fixed test suite for celery adoption
r5607 ('celery', celery_hooks_deamon.CeleryHooksCallbackDaemon),
project: added all source files and assets
r1 ))
tests: fixed test suite for celery adoption
r5607 def test_returns_real_hooks_callback_daemon_when_protocol_is_specified(self, protocol, expected_class):
with tempfile.NamedTemporaryFile(mode='w') as temp_file:
temp_file.write(
"[app:main]\n"
"celery.broker_url = redis://redis:6379/0\n"
"celery.result_backend = redis://redis:6379/0\n"
)
temp_file.flush()
expected_extras = {
'extra1': 'value1',
'txn_id': 'txnid2',
'hooks_protocol': protocol.lower(),
'hooks_config': {
'broker_url': 'redis://redis:6379/0',
'result_backend': 'redis://redis:6379/0',
},
'repo_store': '/var/opt/rhodecode_repo_store',
'repository': 'rhodecode',
'config': temp_file.name
}
from rhodecode import CONFIG
CONFIG['vcs.svn.redis_conn'] = 'redis://redis:6379/0'
callback, extras = hooks_utils.prepare_callback_daemon(expected_extras.copy(), protocol=protocol,txn_id='txnid2')
assert isinstance(callback, expected_class)
expected_extras['time'] = extras['time']
assert extras == expected_extras
project: added all source files and assets
r1
Martin Bornhold
tests: Fix tests that depend on the default value of 'pyro4' as backend.
r968 @pytest.mark.parametrize('protocol', (
'invalid',
'Http',
'HTTP',
fix(tests): fixed tests for PR celery hooks deamon
r5589 'celerY'
Martin Bornhold
tests: Fix tests that depend on the default value of 'pyro4' as backend.
r968 ))
def test_raises_on_invalid_protocol(self, protocol):
expected_extras = {
'extra1': 'value1',
'hooks_protocol': protocol.lower()
}
with pytest.raises(Exception):
tests: fixed test suite for celery adoption
r5607 callback, extras = hooks_utils.prepare_callback_daemon(expected_extras.copy(), protocol=protocol)