|
|
# RhodeCode VCSServer provides access to different vcs backends via network.
|
|
|
# Copyright (C) 2014-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software; you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
# the Free Software Foundation; either version 3 of the License, or
|
|
|
# (at your option) any later version.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
# along with this program; if not, write to the Free Software Foundation,
|
|
|
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
import mercurial.ui
|
|
|
import mock
|
|
|
|
|
|
from vcsserver.lib.ext_json import json
|
|
|
from vcsserver import hooks
|
|
|
|
|
|
|
|
|
def get_hg_ui(extras=None):
    """Build a Mercurial ``ui`` object carrying a ``RC_SCM_DATA`` entry.

    The entry is stored under the ``rhodecode`` config section and holds the
    JSON-encoded extras. Every key listed below is always present (defaulting
    to an empty string); values passed in ``extras`` override the defaults
    and any additional keys are appended.
    """
    default_keys = (
        'username',
        'repository',
        'locked_by',
        'scm',
        'make_lock',
        'action',
        'ip',
    )
    # Defaults first, caller-supplied values layered on top.
    scm_data = {**dict.fromkeys(default_keys, ''), **(extras or {})}

    ui_obj = mercurial.ui.ui()
    ui_obj.setconfig(b'rhodecode', b'RC_SCM_DATA', json.dumps(scm_data))
    return ui_obj
|
|
|
|
|
|
|
|
|
def test_git_pre_receive_is_disabled():
    """Expect ``git_pre_receive`` to return 0 when only 'pull' is enabled."""
    env = {'RC_SCM_DATA': json.dumps({'hooks': ['pull']})}
    assert hooks.git_pre_receive(None, None, env) == 0
|
|
|
|
|
|
|
|
|
def test_git_post_receive_is_disabled():
    """Expect ``git_post_receive`` to return 0 when only 'pull' is enabled."""
    env = {'RC_SCM_DATA': json.dumps({'hooks': ['pull']})}
    assert hooks.git_post_receive(None, '', env) == 0
|
|
|
|
|
|
|
|
|
def test_git_post_receive_calls_repo_size():
    """With 'push' and 'repo_size' enabled, both hooks are called in order.

    ``_call_hook`` is expected to be invoked for ``repo_size`` first and
    ``post_push`` second, each time with the extras enriched by the
    post-receive bookkeeping keys.
    """
    extras = {'hooks': ['push', 'repo_size']}
    env = {'RC_SCM_DATA': json.dumps(extras)}

    with mock.patch.object(hooks, '_call_hook') as call_hook_mock:
        hooks.git_post_receive(None, '', env)

    # Extras as the hook is expected to receive them after processing.
    expected_extras = {
        **extras,
        'commit_ids': [],
        'hook_type': 'post_receive',
        'new_refs': {'bookmarks': [], 'branches': [], 'tags': []},
    }
    assert call_hook_mock.call_args_list == [
        mock.call('repo_size', expected_extras, mock.ANY),
        mock.call('post_push', expected_extras, mock.ANY),
    ]
|
|
|
|
|
|
|
|
|
def test_git_post_receive_does_not_call_disabled_repo_size():
    """With only 'push' enabled, ``post_push`` is the single hook call."""
    extras = {'hooks': ['push']}
    env = {'RC_SCM_DATA': json.dumps(extras)}

    with mock.patch.object(hooks, '_call_hook') as call_hook_mock:
        hooks.git_post_receive(None, '', env)

    # Extras as the hook is expected to receive them after processing.
    expected_extras = {
        **extras,
        'commit_ids': [],
        'hook_type': 'post_receive',
        'new_refs': {'bookmarks': [], 'branches': [], 'tags': []},
    }
    assert call_hook_mock.call_args_list == [
        mock.call('post_push', expected_extras, mock.ANY),
    ]
|
|
|
|
|
|
|
|
|
def test_repo_size_exception_does_not_affect_git_post_receive():
    """A failing ``repo_size`` hook call must not change the result.

    ``_call_hook`` is replaced by a fake that raises for ``repo_size`` and
    returns ``status`` for every other hook name; ``git_post_receive`` is
    expected to return that status.
    """
    extras = {'hooks': ['push', 'repo_size']}
    status = 0

    def fake_call_hook(name, *args, **kwargs):
        # Only the repo_size hook blows up; all other hooks report `status`.
        if name != 'repo_size':
            return status
        raise Exception('Fake exception')

    env = {'RC_SCM_DATA': json.dumps(extras)}
    with mock.patch.object(hooks, '_call_hook', side_effect=fake_call_hook):
        result = hooks.git_post_receive(None, '', env)

    assert result == status
|
|
|
|
|
|
|
|
|
def test_git_pre_pull_is_disabled():
    """Expect an empty success response when only 'push' is enabled."""
    response = hooks.git_pre_pull({'hooks': ['push']})
    assert response == hooks.HookResponse(0, '')
|
|
|
|
|
|
|
|
|
def test_git_post_pull_is_disabled():
    """Expect an empty success response when only 'push' is enabled."""
    response = hooks.git_post_pull({'hooks': ['push']})
    assert response == hooks.HookResponse(0, '')
|
|
|
|
|
|
|
|
|
class TestGetHooksClient:
    """Checks for ``hooks._get_hooks_client``."""

    def test_return_celery_client_when_queue_and_backend_provided(self):
        """The 'celery' protocol with broker/backend URLs yields a Celery client."""
        redis_url = 'redis://task_queue:0'
        hooks_config = {'broker_url': redis_url, 'result_backend': redis_url}

        client = hooks._get_hooks_client(
            {'hooks_protocol': 'celery', 'hooks_config': hooks_config})

        assert isinstance(client, hooks.HooksCeleryClient)
|
|
|
|
|
|
|
|
|
class TestHooksCeleryClient:
    """Construction checks for ``hooks.HooksCeleryClient``."""

    def test_hooks_http_client_init(self):
        """The queue URL passed in is exposed as the Celery app's broker URL."""
        queue = 'redis://redis:6379/0'
        backend = 'redis://redis:6379/0'
        client = hooks.HooksCeleryClient(queue, backend)
        assert client.celery_app.conf.broker_url == queue

    def test_hooks_http_client_init_with_extra_opts(self):
        """Options given via ``_celery_opts`` show up in the Celery app config."""
        queue = 'redis://redis:6379/0'
        backend = 'redis://redis:6379/0'
        client = hooks.HooksCeleryClient(
            queue, backend, _celery_opts={'task_always_eager': True})
        assert client.celery_app.conf.broker_url == queue
        # Identity check: the option must come back as the boolean True,
        # not merely a value that compares equal to it (such as 1).
        assert client.celery_app.conf.task_always_eager is True
|
|
|
|