test_hooks.py
141 lines
| 4.7 KiB
| text/x-python
|
PythonLexer
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2023 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
r1315 | import pytest | |||
r130 | ||||
import mercurial.ui | ||||
import mock | ||||
r1243 | from vcsserver.lib.ext_json import json | |||
r130 | from vcsserver import hooks | |||
def get_hg_ui(extras=None):
    """Build a Mercurial ``ui`` object carrying an ``RC_SCM_DATA`` entry.

    The entry is written to the ``rhodecode`` config section and holds the
    JSON-serialized extras. Each baseline key starts out as an empty string;
    values supplied through ``extras`` override or extend those defaults.
    """
    scm_data = dict.fromkeys(
        ('username', 'repository', 'locked_by', 'scm',
         'make_lock', 'action', 'ip'),
        '',
    )
    if extras:
        scm_data.update(extras)

    ui = mercurial.ui.ui()
    ui.setconfig(b'rhodecode', b'RC_SCM_DATA', json.dumps(scm_data))
    return ui
def test_git_pre_receive_is_disabled():
    """``git_pre_receive`` returns 0 when the hooks list only names 'pull'."""
    env = {'RC_SCM_DATA': json.dumps({'hooks': ['pull']})}
    status = hooks.git_pre_receive(None, None, env)
    assert status == 0
def test_git_post_receive_is_disabled():
    """``git_post_receive`` returns 0 when the hooks list only names 'pull'."""
    env = {'RC_SCM_DATA': json.dumps({'hooks': ['pull']})}
    status = hooks.git_post_receive(None, '', env)
    assert status == 0
def test_git_post_receive_calls_repo_size():
    """With 'push' and 'repo_size' enabled, both hooks are dispatched.

    ``_call_hook`` is patched out; the test checks that it is invoked first
    for 'repo_size' and then for 'post_push', each time with the original
    extras augmented by the post-receive fields.
    """
    enabled = {'hooks': ['push', 'repo_size']}
    env = {'RC_SCM_DATA': json.dumps(enabled)}

    with mock.patch.object(hooks, '_call_hook') as call_hook_mock:
        hooks.git_post_receive(None, '', env)

    # Extras as they are expected to reach ``_call_hook``.
    hook_extras = dict(enabled)
    hook_extras['commit_ids'] = []
    hook_extras['hook_type'] = 'post_receive'
    hook_extras['new_refs'] = {'bookmarks': [], 'branches': [], 'tags': []}

    assert call_hook_mock.call_args_list == [
        mock.call('repo_size', hook_extras, mock.ANY),
        mock.call('post_push', hook_extras, mock.ANY),
    ]
def test_git_post_receive_does_not_call_disabled_repo_size():
    """With only 'push' enabled, 'post_push' is the sole hook dispatched.

    ``_call_hook`` is patched out; the recorded calls must contain exactly one
    entry, for 'post_push', carrying the extras plus the post-receive fields.
    """
    enabled = {'hooks': ['push']}
    env = {'RC_SCM_DATA': json.dumps(enabled)}

    with mock.patch.object(hooks, '_call_hook') as call_hook_mock:
        hooks.git_post_receive(None, '', env)

    # Extras as they are expected to reach ``_call_hook``.
    hook_extras = dict(enabled)
    hook_extras['commit_ids'] = []
    hook_extras['hook_type'] = 'post_receive'
    hook_extras['new_refs'] = {'bookmarks': [], 'branches': [], 'tags': []}

    assert call_hook_mock.call_args_list == [
        mock.call('post_push', hook_extras, mock.ANY),
    ]
def test_repo_size_exception_does_not_affect_git_post_receive():
    """An exception raised by the 'repo_size' hook must not change the result.

    The patched ``_call_hook`` raises for 'repo_size' and returns the expected
    status for every other hook name; ``git_post_receive`` is still expected
    to return that status.
    """
    expected_status = 0

    def fake_call_hook(name, *args, **kwargs):
        # Only the repo_size hook blows up; everything else succeeds.
        if name != 'repo_size':
            return expected_status
        raise Exception('Fake exception')

    env = {'RC_SCM_DATA': json.dumps({'hooks': ['push', 'repo_size']})}
    with mock.patch.object(hooks, '_call_hook', side_effect=fake_call_hook):
        outcome = hooks.git_post_receive(None, '', env)

    assert outcome == expected_status
def test_git_pre_pull_is_disabled():
    """``git_pre_pull`` yields ``HookResponse(0, '')`` when only 'push' is listed."""
    response = hooks.git_pre_pull({'hooks': ['push']})
    assert response == hooks.HookResponse(0, '')
def test_git_post_pull_is_disabled():
    """``git_post_pull`` yields ``HookResponse(0, '')`` when only 'push' is listed."""
    response = hooks.git_post_pull({'hooks': ['push']})
    assert response == hooks.HookResponse(0, '')
class TestGetHooksClient:
    """Checks for ``hooks._get_hooks_client``."""

    def test_return_celery_client_when_queue_and_backend_provided(self):
        """A 'celery' protocol with broker and backend URLs gives a celery client."""
        redis_url = 'redis://task_queue:0'
        hooks_config = {'broker_url': redis_url, 'result_backend': redis_url}

        client = hooks._get_hooks_client(
            {'hooks_protocol': 'celery', 'hooks_config': hooks_config})

        assert isinstance(client, hooks.HooksCeleryClient)
r130 | ||||
class TestHooksCeleryClient:
    """Construction checks for ``hooks.HooksCeleryClient``.

    NOTE: the method names still say "http_client" although they exercise the
    celery client; they are kept unchanged so existing test IDs stay stable.
    """

    def test_hooks_http_client_init(self):
        """The broker URL passed to the client ends up in the celery app config."""
        queue = 'redis://redis:6379/0'
        backend = 'redis://redis:6379/0'

        client = hooks.HooksCeleryClient(queue, backend)

        assert client.celery_app.conf.broker_url == queue

    def test_hooks_http_client_init_with_extra_opts(self):
        """Options given via ``_celery_opts`` are visible on the celery app config."""
        queue = 'redis://redis:6379/0'
        backend = 'redis://redis:6379/0'

        client = hooks.HooksCeleryClient(
            queue, backend, _celery_opts={'task_always_eager': True})

        assert client.celery_app.conf.broker_url == queue
        # Identity check instead of ``== True``: pins the exact boolean that
        # was passed in and avoids the E712 comparison-to-True anti-pattern.
        assert client.celery_app.conf.task_always_eager is True