diff --git a/rhodecode/apps/ssh_support/lib/backends/__init__.py b/rhodecode/apps/ssh_support/lib/backends/__init__.py --- a/rhodecode/apps/ssh_support/lib/backends/__init__.py +++ b/rhodecode/apps/ssh_support/lib/backends/__init__.py @@ -178,6 +178,7 @@ class SshWrapper(object): extras = { 'detect_force_push': detect_force_push, 'check_branch_perms': check_branch_perms, + 'config': self.ini_path } if vcs == 'hg': diff --git a/rhodecode/apps/ssh_support/lib/backends/base.py b/rhodecode/apps/ssh_support/lib/backends/base.py --- a/rhodecode/apps/ssh_support/lib/backends/base.py +++ b/rhodecode/apps/ssh_support/lib/backends/base.py @@ -149,8 +149,7 @@ class VcsServer(object): callback_daemon, extras = prepare_callback_daemon( extras, protocol=vcs_settings.HOOKS_PROTOCOL, - host=vcs_settings.HOOKS_HOST, - use_direct_calls=False) + host=vcs_settings.HOOKS_HOST) with callback_daemon: try: diff --git a/rhodecode/config/utils.py b/rhodecode/config/utils.py --- a/rhodecode/config/utils.py +++ b/rhodecode/config/utils.py @@ -37,7 +37,6 @@ def configure_vcs(config): conf.settings.HOOKS_PROTOCOL = config['vcs.hooks.protocol'] conf.settings.HOOKS_HOST = config['vcs.hooks.host'] - conf.settings.HOOKS_DIRECT_CALLS = config['vcs.hooks.direct_calls'] conf.settings.DEFAULT_ENCODINGS = config['default_encoding'] conf.settings.ALIASES[:] = config['vcs.backends'] conf.settings.SVN_COMPATIBLE_VERSION = config['vcs.svn.compatible_version'] diff --git a/rhodecode/lib/celerylib/tasks.py b/rhodecode/lib/celerylib/tasks.py --- a/rhodecode/lib/celerylib/tasks.py +++ b/rhodecode/lib/celerylib/tasks.py @@ -32,6 +32,7 @@ import rhodecode from rhodecode.lib import audit_logger from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask, run_task from rhodecode.lib import hooks_base +from rhodecode.lib.utils import adopt_for_celery from rhodecode.lib.utils2 import safe_int, str2bool, aslist from rhodecode.lib.statsd_client import StatsdClient from rhodecode.model.db import ( @@ -410,3 +411,38 @@ def beat_check(*args, **kwargs): log = get_logger(beat_check) log.info('%r: Got args: %r and kwargs %r', beat_check, args, kwargs) return time.time() + + +@async_task +@adopt_for_celery +def repo_size(extras): + from rhodecode.lib.hooks_base import repo_size + return repo_size(extras) + + +@async_task +@adopt_for_celery +def pre_pull(extras): + from rhodecode.lib.hooks_base import pre_pull + return pre_pull(extras) + + +@async_task +@adopt_for_celery +def post_pull(extras): + from rhodecode.lib.hooks_base import post_pull + return post_pull(extras) + + +@async_task +@adopt_for_celery +def pre_push(extras): + from rhodecode.lib.hooks_base import pre_push + return pre_push(extras) + + +@async_task +@adopt_for_celery +def post_push(extras): + from rhodecode.lib.hooks_base import post_push + return post_push(extras) diff --git a/rhodecode/lib/hooks_base.py b/rhodecode/lib/hooks_base.py --- a/rhodecode/lib/hooks_base.py +++ b/rhodecode/lib/hooks_base.py @@ -53,6 +53,9 @@ class HookResponse(object): def __bool__(self): return self.status == 0 + def to_json(self): + return {'status': self.status, 'output': self.output} + def is_shadow_repo(extras): """ diff --git a/rhodecode/lib/hooks_daemon.py b/rhodecode/lib/hooks_daemon.py --- a/rhodecode/lib/hooks_daemon.py +++ b/rhodecode/lib/hooks_daemon.py @@ -34,6 +34,7 @@ from rhodecode.lib.exceptions import HTT from rhodecode.model import meta from rhodecode.lib import hooks_base from rhodecode.lib.utils2 import AttributeDict +from rhodecode.lib.pyramid_utils import get_config from rhodecode.lib.ext_json import json from rhodecode.lib import rc_cache @@ -135,9 +136,10 @@ class HooksHttpHandler(BaseHTTPRequestHa self.log_date_time_string(), message) -class DummyHooksCallbackDaemon(object): - hooks_uri = '' - +class BaseHooksCallbackDaemon: + """ + Basic context manager for actions that don't require some extra + """ def __init__(self): self.hooks_module = Hooks.__module__ @@ -149,6 +151,16 @@ class DummyHooksCallbackDaemon(object): log.debug('Exiting `%s` callback daemon', self.__class__.__name__) +class CeleryHooksCallbackDaemon(BaseHooksCallbackDaemon): + """ + Context manger for achieving a compatibility with celery backend + """ + + def __init__(self, config): + self.task_queue = config.get('app:main', 'celery.broker_url') + self.task_backend = config.get('app:main', 'celery.result_backend') + + class ThreadedHookCallbackDaemon(object): _callback_thread = None @@ -334,28 +346,31 @@ def get_txn_id_from_store(txn_id): return {} -def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None): +def prepare_callback_daemon(extras, protocol, host, txn_id=None): txn_details = get_txn_id_from_store(txn_id) port = txn_details.get('port', 0) - if use_direct_calls: - callback_daemon = DummyHooksCallbackDaemon() - extras['hooks_module'] = callback_daemon.hooks_module - else: - if protocol == 'http': + match protocol: + case 'http': callback_daemon = HttpHooksCallbackDaemon( txn_id=txn_id, host=host, port=port) - else: + case 'celery': + callback_daemon = CeleryHooksCallbackDaemon(get_config(extras['config'])) + case 'local': + callback_daemon = BaseHooksCallbackDaemon() + case _: log.error('Unsupported callback daemon protocol "%s"', protocol) raise Exception('Unsupported callback daemon protocol.') - extras['hooks_uri'] = callback_daemon.hooks_uri + extras['hooks_uri'] = getattr(callback_daemon, 'hooks_uri', '') + extras['task_queue'] = getattr(callback_daemon, 'task_queue', '') + extras['task_backend'] = getattr(callback_daemon, 'task_backend', '') extras['hooks_protocol'] = protocol extras['time'] = time.time() # register txn_id extras['txn_id'] = txn_id - log.debug('Prepared a callback daemon: %s at url `%s`', - callback_daemon.__class__.__name__, callback_daemon.hooks_uri) + log.debug('Prepared a callback daemon: %s', + callback_daemon.__class__.__name__) return callback_daemon, extras diff --git a/rhodecode/lib/middleware/simplevcs.py b/rhodecode/lib/middleware/simplevcs.py --- a/rhodecode/lib/middleware/simplevcs.py +++ b/rhodecode/lib/middleware/simplevcs.py @@ -683,14 +683,14 @@ class SimpleVCS(object): return True def _prepare_callback_daemon(self, extras, environ, action, txn_id=None): - direct_calls = vcs_settings.HOOKS_DIRECT_CALLS + protocol = vcs_settings.HOOKS_PROTOCOL if not self._should_use_callback_daemon(extras, environ, action): # disable callback daemon for actions that don't require it - direct_calls = True + protocol = 'local' return prepare_callback_daemon( - extras, protocol=vcs_settings.HOOKS_PROTOCOL, - host=vcs_settings.HOOKS_HOST, use_direct_calls=direct_calls, txn_id=txn_id) + extras, protocol=protocol, + host=vcs_settings.HOOKS_HOST, txn_id=txn_id) def _should_check_locking(query_string): diff --git a/rhodecode/lib/utils.py b/rhodecode/lib/utils.py --- a/rhodecode/lib/utils.py +++ b/rhodecode/lib/utils.py @@ -32,6 +32,7 @@ import tempfile import traceback import tarfile import warnings +from functools import wraps from os.path import join as jn import paste @@ -41,6 +42,7 @@ from webhelpers2.text import collapse, s from mako import exceptions from rhodecode.lib.hash_utils import sha256_safe, md5, sha1 +from rhodecode.lib.type_utils import AttributeDict from rhodecode.lib.str_utils import safe_bytes, safe_str from rhodecode.lib.vcs.backends.base import Config from rhodecode.lib.vcs.exceptions import VCSError @@ -73,6 +75,19 @@ SLUG_RE = re.compile(r'[^{}]+'.format(SL _license_cache = None +def adopt_for_celery(func): + """ + Decorator designed to adopt hooks (from rhodecode.lib.hooks_base) + for further usage as a celery tasks. + """ + @wraps(func) + def wrapper(extras): + extras = AttributeDict(extras) + # HooksResponse implements to_json method which must be used there. + return func(extras).to_json() + return wrapper + + def repo_name_slug(value): """ Return slug of name of repository diff --git a/rhodecode/lib/vcs/conf/settings.py b/rhodecode/lib/vcs/conf/settings.py --- a/rhodecode/lib/vcs/conf/settings.py +++ b/rhodecode/lib/vcs/conf/settings.py @@ -49,7 +49,6 @@ ARCHIVE_SPECS = [ ] HOOKS_PROTOCOL = None -HOOKS_DIRECT_CALLS = False HOOKS_HOST = '127.0.0.1' diff --git a/rhodecode/model/pull_request.py b/rhodecode/model/pull_request.py --- a/rhodecode/model/pull_request.py +++ b/rhodecode/model/pull_request.py @@ -982,8 +982,7 @@ class PullRequestModel(BaseModel): callback_daemon, extras = prepare_callback_daemon( extras, protocol=vcs_settings.HOOKS_PROTOCOL, - host=vcs_settings.HOOKS_HOST, - use_direct_calls=vcs_settings.HOOKS_DIRECT_CALLS) + host=vcs_settings.HOOKS_HOST) with callback_daemon: # TODO: johbo: Implement a clean way to run a config_override diff --git a/rhodecode/tests/lib/middleware/test_simplevcs.py b/rhodecode/tests/lib/middleware/test_simplevcs.py --- a/rhodecode/tests/lib/middleware/test_simplevcs.py +++ b/rhodecode/tests/lib/middleware/test_simplevcs.py @@ -25,7 +25,6 @@ from rhodecode.lib.utils2 import Attribu from rhodecode.tests.utils import CustomTestApp from rhodecode.lib.caching_query import FromCache -from rhodecode.lib.hooks_daemon import DummyHooksCallbackDaemon from rhodecode.lib.middleware import simplevcs from rhodecode.lib.middleware.https_fixup import HttpsFixup from rhodecode.lib.middleware.utils import scm_app_http @@ -397,18 +396,6 @@ class TestGenerateVcsResponse(object): with pytest.raises(Exception): list(result) - def test_prepare_callback_daemon_is_called(self): - def side_effect(extras, environ, action, txn_id=None): - return DummyHooksCallbackDaemon(), extras - - prepare_patcher = mock.patch.object( - StubVCSController, '_prepare_callback_daemon') - with prepare_patcher as prepare_mock: - prepare_mock.side_effect = side_effect - self.call_controller_with_response_body(iter(['a', 'b'])) - assert prepare_mock.called - assert prepare_mock.call_count == 1 - def call_controller_with_response_body(self, response_body): settings = { 'base_path': 'fake_base_path', @@ -462,26 +449,3 @@ class TestInitializeGenerator(object): def factory(self, iterable): for elem in iterable: yield elem - - -class TestPrepareHooksDaemon(object): - def test_calls_imported_prepare_callback_daemon(self, app_settings, request_stub): - expected_extras = {'extra1': 'value1'} - daemon = DummyHooksCallbackDaemon() - - controller = StubVCSController(app_settings, request_stub.registry) - prepare_patcher = mock.patch.object( - simplevcs, 'prepare_callback_daemon', - return_value=(daemon, expected_extras)) - with prepare_patcher as prepare_mock: - callback_daemon, extras = controller._prepare_callback_daemon( - expected_extras.copy(), {}, 'push') - prepare_mock.assert_called_once_with( - expected_extras, - protocol=app_settings['vcs.hooks.protocol'], - host=app_settings['vcs.hooks.host'], - txn_id=None, - use_direct_calls=app_settings['vcs.hooks.direct_calls']) - - assert callback_daemon == daemon - assert extras == extras diff --git a/rhodecode/tests/lib/test_hooks_daemon.py b/rhodecode/tests/lib/test_hooks_daemon.py --- a/rhodecode/tests/lib/test_hooks_daemon.py +++ b/rhodecode/tests/lib/test_hooks_daemon.py @@ -23,6 +23,7 @@ import io import mock import msgpack import pytest +import tempfile from rhodecode.lib import hooks_daemon from rhodecode.lib.str_utils import safe_bytes @@ -32,28 +33,6 @@ from rhodecode.lib.ext_json import json test_proto = hooks_daemon.HooksHttpHandler.MSGPACK_HOOKS_PROTO -class TestDummyHooksCallbackDaemon(object): - def test_hooks_module_path_set_properly(self): - daemon = hooks_daemon.DummyHooksCallbackDaemon() - assert daemon.hooks_module == 'rhodecode.lib.hooks_daemon' - - def test_logs_entering_the_hook(self): - daemon = hooks_daemon.DummyHooksCallbackDaemon() - with mock.patch.object(hooks_daemon.log, 'debug') as log_mock: - with daemon as return_value: - log_mock.assert_called_once_with( - 'Running `%s` callback daemon', 'DummyHooksCallbackDaemon') - assert return_value == daemon - - def test_logs_exiting_the_hook(self): - daemon = hooks_daemon.DummyHooksCallbackDaemon() - with mock.patch.object(hooks_daemon.log, 'debug') as log_mock: - with daemon: - pass - log_mock.assert_called_with( - 'Exiting `%s` callback daemon', 'DummyHooksCallbackDaemon') - - class TestHooks(object): def test_hooks_can_be_used_as_a_context_processor(self): hooks = hooks_daemon.Hooks() @@ -299,17 +278,17 @@ class TestHttpHooksCallbackDaemon(object class TestPrepareHooksDaemon(object): - @pytest.mark.parametrize('protocol', ('http',)) - def test_returns_dummy_hooks_callback_daemon_when_using_direct_calls( + @pytest.mark.parametrize('protocol', ('celery',)) + def test_returns_celery_hooks_callback_daemon_when_celery_protocol_specified( self, protocol): - expected_extras = {'extra1': 'value1'} - callback, extras = hooks_daemon.prepare_callback_daemon( - expected_extras.copy(), protocol=protocol, - host='127.0.0.1', use_direct_calls=True) - assert isinstance(callback, hooks_daemon.DummyHooksCallbackDaemon) - expected_extras['hooks_module'] = 'rhodecode.lib.hooks_daemon' - expected_extras['time'] = extras['time'] - assert 'extra1' in extras + with tempfile.NamedTemporaryFile(mode='w') as temp_file: + temp_file.write("[app:main]\ncelery.broker_url = redis://redis/0\n" + "celery.result_backend = redis://redis/0") + temp_file.flush() + expected_extras = {'config': temp_file.name} + callback, extras = hooks_daemon.prepare_callback_daemon( + expected_extras, protocol=protocol, host='') + assert isinstance(callback, hooks_daemon.CeleryHooksCallbackDaemon) @pytest.mark.parametrize('protocol, expected_class', ( ('http', hooks_daemon.HttpHooksCallbackDaemon), @@ -319,11 +298,12 @@ class TestPrepareHooksDaemon(object): expected_extras = { 'extra1': 'value1', 'txn_id': 'txnid2', - 'hooks_protocol': protocol.lower() + 'hooks_protocol': protocol.lower(), + 'task_backend': '', + 'task_queue': '' } callback, extras = hooks_daemon.prepare_callback_daemon( expected_extras.copy(), protocol=protocol, host='127.0.0.1', - use_direct_calls=False, txn_id='txnid2') assert isinstance(callback, expected_class) extras.pop('hooks_uri') @@ -343,8 +323,7 @@ class TestPrepareHooksDaemon(object): with pytest.raises(Exception): callback, extras = hooks_daemon.prepare_callback_daemon( expected_extras.copy(), - protocol=protocol, host='127.0.0.1', - use_direct_calls=False) + protocol=protocol, host='127.0.0.1') class MockRequest(object):