diff --git a/vcsserver/hooks.py b/vcsserver/hooks.py --- a/vcsserver/hooks.py +++ b/vcsserver/hooks.py @@ -43,29 +43,38 @@ log = logging.getLogger(__name__) class HooksCeleryClient: TASK_TIMEOUT = 60 # time in seconds + custom_opts = {} - def __init__(self, queue, backend): - celery_app.config_from_object({ - 'broker_url': queue, 'result_backend': backend, + def __init__(self, broker_url, result_backend, **kwargs): + _celery_opts = { + 'broker_url': broker_url, + 'result_backend': result_backend, 'broker_connection_retry_on_startup': True, 'task_serializer': 'json', 'accept_content': ['json', 'msgpack'], 'result_serializer': 'json', 'result_accept_content': ['json', 'msgpack'] - }) + } + if custom_opts := kwargs.pop('_celery_opts', {}): + _celery_opts.update(custom_opts) + self.custom_opts = custom_opts + celery_app.config_from_object(_celery_opts) + self.celery_app = celery_app def __call__(self, method, extras): # NOTE: exception handling for those tasks executed is in # @adapt_for_celery decorator # also see: _maybe_handle_exception which is handling exceptions - inquired_task = self.celery_app.signature( - f'rhodecode.lib.celerylib.tasks.{method}' - ) - result = inquired_task.delay(extras).get(timeout=self.TASK_TIMEOUT) + + inquired_task = self.celery_app.signature(f'rhodecode.lib.celerylib.tasks.{method}') + task_meta = inquired_task.apply_async(args=(extras,)) + result = task_meta.get(timeout=self.TASK_TIMEOUT) return result + def __repr__(self): + return f'HooksCeleryClient(opts={self.custom_opts})' class HooksShadowRepoClient: @@ -141,12 +150,18 @@ def _maybe_handle_exception(writer, resu def _get_hooks_client(extras): - task_queue = extras.get('task_queue') - task_backend = extras.get('task_backend') is_shadow_repo = extras.get('is_shadow_repo') + hooks_protocol = extras.get('hooks_protocol') - if task_queue and task_backend: - return HooksCeleryClient(task_queue, task_backend) + if hooks_protocol == 'celery': + try: + celery_config = extras['hooks_config'] + broker_url = celery_config['broker_url'] + result_backend = celery_config['result_backend'] + except Exception: + log.exception("Failed to get celery task queue and backend") + raise + return HooksCeleryClient(broker_url, result_backend, _celery_opts=celery_config) elif is_shadow_repo: return HooksShadowRepoClient() else: diff --git a/vcsserver/tests/test_hooks.py b/vcsserver/tests/test_hooks.py --- a/vcsserver/tests/test_hooks.py +++ b/vcsserver/tests/test_hooks.py @@ -117,12 +117,11 @@ def test_git_post_pull_is_disabled(): class TestGetHooksClient: def test_return_celery_client_when_queue_and_backend_provided(self): - task_queue = 'redis://task_queue:0' - task_backend = task_queue - result = hooks._get_hooks_client({ - 'task_queue': task_queue, - 'task_backend': task_backend - }) + extras = { + 'hooks_protocol': 'celery', + 'hooks_config': {'broker_url': 'redis://task_queue:0', 'result_backend': 'redis://task_queue:0'} + } + result = hooks._get_hooks_client(extras) assert isinstance(result, hooks.HooksCeleryClient) @@ -133,3 +132,10 @@ class TestHooksCeleryClient: backend = 'redis://redis:6379/0' client = hooks.HooksCeleryClient(queue, backend) assert client.celery_app.conf.broker_url == queue + + def test_hooks_http_client_init_with_extra_opts(self): + queue = 'redis://redis:6379/0' + backend = 'redis://redis:6379/0' + client = hooks.HooksCeleryClient(queue, backend, _celery_opts={'task_always_eager': True}) + assert client.celery_app.conf.broker_url == queue + assert client.celery_app.conf.task_always_eager == True