# RhodeCode VCSServer provides access to different vcs backends via network. # Copyright (C) 2014-2023 RhodeCode GmbH # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA import pytest import mercurial.ui import mock from vcsserver.lib.ext_json import json from vcsserver import hooks def get_hg_ui(extras=None): """Create a Config object with a valid RC_SCM_DATA entry.""" extras = extras or {} required_extras = { 'username': '', 'repository': '', 'locked_by': '', 'scm': '', 'make_lock': '', 'action': '', 'ip': '', } required_extras.update(extras) hg_ui = mercurial.ui.ui() hg_ui.setconfig(b'rhodecode', b'RC_SCM_DATA', json.dumps(required_extras)) return hg_ui def test_git_pre_receive_is_disabled(): extras = {'hooks': ['pull']} response = hooks.git_pre_receive(None, None, {'RC_SCM_DATA': json.dumps(extras)}) assert response == 0 def test_git_post_receive_is_disabled(): extras = {'hooks': ['pull']} response = hooks.git_post_receive(None, '', {'RC_SCM_DATA': json.dumps(extras)}) assert response == 0 def test_git_post_receive_calls_repo_size(): extras = {'hooks': ['push', 'repo_size']} with mock.patch.object(hooks, '_call_hook') as call_hook_mock: hooks.git_post_receive( None, '', {'RC_SCM_DATA': json.dumps(extras)}) extras.update({'commit_ids': [], 'hook_type': 'post_receive', 'new_refs': {'bookmarks': [], 'branches': [], 'tags': []}}) expected_calls = [ mock.call('repo_size', extras, mock.ANY), mock.call('post_push', extras, mock.ANY), ] assert call_hook_mock.call_args_list == expected_calls def test_git_post_receive_does_not_call_disabled_repo_size(): extras = {'hooks': ['push']} with mock.patch.object(hooks, '_call_hook') as call_hook_mock: hooks.git_post_receive( None, '', {'RC_SCM_DATA': json.dumps(extras)}) extras.update({'commit_ids': [], 'hook_type': 'post_receive', 'new_refs': {'bookmarks': [], 'branches': [], 'tags': []}}) expected_calls = [ mock.call('post_push', extras, mock.ANY) ] assert call_hook_mock.call_args_list == expected_calls def test_repo_size_exception_does_not_affect_git_post_receive(): extras = {'hooks': ['push', 'repo_size']} status = 0 def side_effect(name, *args, **kwargs): if name == 'repo_size': raise Exception('Fake exception') else: return status with mock.patch.object(hooks, '_call_hook') as call_hook_mock: call_hook_mock.side_effect = side_effect result = hooks.git_post_receive( None, '', {'RC_SCM_DATA': json.dumps(extras)}) assert result == status def test_git_pre_pull_is_disabled(): assert hooks.git_pre_pull({'hooks': ['push']}) == hooks.HookResponse(0, '') def test_git_post_pull_is_disabled(): assert ( hooks.git_post_pull({'hooks': ['push']}) == hooks.HookResponse(0, '')) class TestGetHooksClient: def test_return_celery_client_when_queue_and_backend_provided(self): extras = { 'hooks_protocol': 'celery', 'hooks_config': {'broker_url': 'redis://task_queue:0', 'result_backend': 'redis://task_queue:0'} } result = hooks._get_hooks_client(extras) assert isinstance(result, hooks.HooksCeleryClient) class TestHooksCeleryClient: def test_hooks_http_client_init(self): queue = 'redis://redis:6379/0' backend = 'redis://redis:6379/0' client = hooks.HooksCeleryClient(queue, backend) assert client.celery_app.conf.broker_url == queue def test_hooks_http_client_init_with_extra_opts(self): queue = 'redis://redis:6379/0' backend = 'redis://redis:6379/0' client = hooks.HooksCeleryClient(queue, backend, _celery_opts={'task_always_eager': True}) assert client.celery_app.conf.broker_url == queue assert client.celery_app.conf.task_always_eager == True