test_hooks.py
245 lines
| 7.8 KiB
| text/x-python
|
PythonLexer
r130 | # RhodeCode VCSServer provides access to different vcs backends via network. | |||
r850 | # Copyright (C) 2014-2020 RhodeCode GmbH | |||
r130 | # | |||
# This program is free software; you can redistribute it and/or modify | ||||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation; either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU General Public License | ||||
# along with this program; if not, write to the Free Software Foundation, | ||||
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | ||||
import threading | ||||
r1048 | import msgpack | |||
r987 | from http.server import BaseHTTPRequestHandler | |||
from socketserver import TCPServer | ||||
r130 | ||||
import mercurial.ui | ||||
import mock | ||||
import pytest | ||||
r1048 | from vcsserver.lib.rc_json import json | |||
r130 | from vcsserver import hooks | |||
def get_hg_ui(extras=None): | ||||
"""Create a Config object with a valid RC_SCM_DATA entry.""" | ||||
extras = extras or {} | ||||
required_extras = { | ||||
'username': '', | ||||
'repository': '', | ||||
'locked_by': '', | ||||
'scm': '', | ||||
'make_lock': '', | ||||
'action': '', | ||||
'ip': '', | ||||
'hooks_uri': 'fake_hooks_uri', | ||||
} | ||||
required_extras.update(extras) | ||||
hg_ui = mercurial.ui.ui() | ||||
r1048 | hg_ui.setconfig(b'rhodecode', b'RC_SCM_DATA', json.dumps(required_extras)) | |||
r130 | ||||
return hg_ui | ||||
def test_git_pre_receive_is_disabled(): | ||||
extras = {'hooks': ['pull']} | ||||
response = hooks.git_pre_receive(None, None, | ||||
{'RC_SCM_DATA': json.dumps(extras)}) | ||||
assert response == 0 | ||||
def test_git_post_receive_is_disabled(): | ||||
extras = {'hooks': ['pull']} | ||||
response = hooks.git_post_receive(None, '', | ||||
{'RC_SCM_DATA': json.dumps(extras)}) | ||||
assert response == 0 | ||||
def test_git_post_receive_calls_repo_size(): | ||||
extras = {'hooks': ['push', 'repo_size']} | ||||
r1048 | ||||
r130 | with mock.patch.object(hooks, '_call_hook') as call_hook_mock: | |||
hooks.git_post_receive( | ||||
None, '', {'RC_SCM_DATA': json.dumps(extras)}) | ||||
r555 | extras.update({'commit_ids': [], 'hook_type': 'post_receive', | |||
r223 | 'new_refs': {'bookmarks': [], 'branches': [], 'tags': []}}) | |||
r130 | expected_calls = [ | |||
mock.call('repo_size', extras, mock.ANY), | ||||
mock.call('post_push', extras, mock.ANY), | ||||
] | ||||
assert call_hook_mock.call_args_list == expected_calls | ||||
def test_git_post_receive_does_not_call_disabled_repo_size(): | ||||
extras = {'hooks': ['push']} | ||||
r1048 | ||||
r130 | with mock.patch.object(hooks, '_call_hook') as call_hook_mock: | |||
hooks.git_post_receive( | ||||
None, '', {'RC_SCM_DATA': json.dumps(extras)}) | ||||
r555 | extras.update({'commit_ids': [], 'hook_type': 'post_receive', | |||
r223 | 'new_refs': {'bookmarks': [], 'branches': [], 'tags': []}}) | |||
r130 | expected_calls = [ | |||
mock.call('post_push', extras, mock.ANY) | ||||
] | ||||
assert call_hook_mock.call_args_list == expected_calls | ||||
def test_repo_size_exception_does_not_affect_git_post_receive(): | ||||
extras = {'hooks': ['push', 'repo_size']} | ||||
status = 0 | ||||
def side_effect(name, *args, **kwargs): | ||||
if name == 'repo_size': | ||||
raise Exception('Fake exception') | ||||
else: | ||||
return status | ||||
with mock.patch.object(hooks, '_call_hook') as call_hook_mock: | ||||
call_hook_mock.side_effect = side_effect | ||||
result = hooks.git_post_receive( | ||||
None, '', {'RC_SCM_DATA': json.dumps(extras)}) | ||||
assert result == status | ||||
def test_git_pre_pull_is_disabled(): | ||||
assert hooks.git_pre_pull({'hooks': ['push']}) == hooks.HookResponse(0, '') | ||||
def test_git_post_pull_is_disabled(): | ||||
assert ( | ||||
hooks.git_post_pull({'hooks': ['push']}) == hooks.HookResponse(0, '')) | ||||
class TestGetHooksClient(object): | ||||
def test_returns_http_client_when_protocol_matches(self): | ||||
hooks_uri = 'localhost:8000' | ||||
result = hooks._get_hooks_client({ | ||||
'hooks_uri': hooks_uri, | ||||
'hooks_protocol': 'http' | ||||
}) | ||||
assert isinstance(result, hooks.HooksHttpClient) | ||||
assert result.hooks_uri == hooks_uri | ||||
def test_returns_dummy_client_when_hooks_uri_not_specified(self): | ||||
fake_module = mock.Mock() | ||||
import_patcher = mock.patch.object( | ||||
hooks.importlib, 'import_module', return_value=fake_module) | ||||
fake_module_name = 'fake.module' | ||||
with import_patcher as import_mock: | ||||
result = hooks._get_hooks_client( | ||||
{'hooks_module': fake_module_name}) | ||||
import_mock.assert_called_once_with(fake_module_name) | ||||
assert isinstance(result, hooks.HooksDummyClient) | ||||
assert result._hooks_module == fake_module | ||||
class TestHooksHttpClient(object): | ||||
def test_init_sets_hooks_uri(self): | ||||
uri = 'localhost:3000' | ||||
client = hooks.HooksHttpClient(uri) | ||||
assert client.hooks_uri == uri | ||||
r1048 | def test_serialize_returns_serialized_string(self): | |||
r130 | client = hooks.HooksHttpClient('localhost:3000') | |||
hook_name = 'test' | ||||
extras = { | ||||
'first': 1, | ||||
'second': 'two' | ||||
} | ||||
r1048 | hooks_proto, result = client._serialize(hook_name, extras) | |||
expected_result = msgpack.packb({ | ||||
r130 | 'method': hook_name, | |||
r1048 | 'extras': extras, | |||
r130 | }) | |||
r1048 | assert hooks_proto == {'rc-hooks-protocol': 'msgpack.v1'} | |||
r130 | assert result == expected_result | |||
def test_call_queries_http_server(self, http_mirror): | ||||
client = hooks.HooksHttpClient(http_mirror.uri) | ||||
hook_name = 'test' | ||||
extras = { | ||||
'first': 1, | ||||
'second': 'two' | ||||
} | ||||
result = client(hook_name, extras) | ||||
r1048 | expected_result = msgpack.unpackb(msgpack.packb({ | |||
r130 | 'method': hook_name, | |||
'extras': extras | ||||
r1048 | }), raw=False) | |||
r130 | assert result == expected_result | |||
class TestHooksDummyClient(object): | ||||
def test_init_imports_hooks_module(self): | ||||
hooks_module_name = 'rhodecode.fake.module' | ||||
hooks_module = mock.MagicMock() | ||||
import_patcher = mock.patch.object( | ||||
hooks.importlib, 'import_module', return_value=hooks_module) | ||||
with import_patcher as import_mock: | ||||
client = hooks.HooksDummyClient(hooks_module_name) | ||||
import_mock.assert_called_once_with(hooks_module_name) | ||||
assert client._hooks_module == hooks_module | ||||
def test_call_returns_hook_result(self): | ||||
hooks_module_name = 'rhodecode.fake.module' | ||||
hooks_module = mock.MagicMock() | ||||
import_patcher = mock.patch.object( | ||||
hooks.importlib, 'import_module', return_value=hooks_module) | ||||
with import_patcher: | ||||
client = hooks.HooksDummyClient(hooks_module_name) | ||||
result = client('post_push', {}) | ||||
hooks_module.Hooks.assert_called_once_with() | ||||
assert result == hooks_module.Hooks().__enter__().post_push() | ||||
@pytest.fixture | ||||
def http_mirror(request): | ||||
server = MirrorHttpServer() | ||||
request.addfinalizer(server.stop) | ||||
return server | ||||
class MirrorHttpHandler(BaseHTTPRequestHandler): | ||||
r1048 | ||||
r130 | def do_POST(self): | |||
length = int(self.headers['Content-Length']) | ||||
r1048 | body = self.rfile.read(length) | |||
r130 | self.send_response(200) | |||
self.end_headers() | ||||
self.wfile.write(body) | ||||
class MirrorHttpServer(object): | ||||
ip_address = '127.0.0.1' | ||||
port = 0 | ||||
def __init__(self): | ||||
self._daemon = TCPServer((self.ip_address, 0), MirrorHttpHandler) | ||||
_, self.port = self._daemon.server_address | ||||
self._thread = threading.Thread(target=self._daemon.serve_forever) | ||||
self._thread.daemon = True | ||||
self._thread.start() | ||||
def stop(self): | ||||
self._daemon.shutdown() | ||||
self._thread.join() | ||||
self._daemon = None | ||||
self._thread = None | ||||
@property | ||||
def uri(self): | ||||
return '{}:{}'.format(self.ip_address, self.port) | ||||