##// END OF EJS Templates
hooks: cleanups hooks protocol code, and ensure we can propagate options from CE into VCS
super-admin -
r1324:a8f7c36d default
parent child Browse files
Show More
@@ -43,29 +43,38 b' log = logging.getLogger(__name__)'
43
43
44 class HooksCeleryClient:
44 class HooksCeleryClient:
45 TASK_TIMEOUT = 60 # time in seconds
45 TASK_TIMEOUT = 60 # time in seconds
46 custom_opts = {}
46
47
47 def __init__(self, queue, backend):
48 def __init__(self, broker_url, result_backend, **kwargs):
48 celery_app.config_from_object({
49 _celery_opts = {
49 'broker_url': queue, 'result_backend': backend,
50 'broker_url': broker_url,
51 'result_backend': result_backend,
50 'broker_connection_retry_on_startup': True,
52 'broker_connection_retry_on_startup': True,
51 'task_serializer': 'json',
53 'task_serializer': 'json',
52 'accept_content': ['json', 'msgpack'],
54 'accept_content': ['json', 'msgpack'],
53 'result_serializer': 'json',
55 'result_serializer': 'json',
54 'result_accept_content': ['json', 'msgpack']
56 'result_accept_content': ['json', 'msgpack']
55 })
57 }
58 if custom_opts := kwargs.pop('_celery_opts', {}):
59 _celery_opts.update(custom_opts)
60 self.custom_opts = custom_opts
61 celery_app.config_from_object(_celery_opts)
62
56 self.celery_app = celery_app
63 self.celery_app = celery_app
57
64
58 def __call__(self, method, extras):
65 def __call__(self, method, extras):
59 # NOTE: exception handling for those tasks executed is in
66 # NOTE: exception handling for those tasks executed is in
60 # @adapt_for_celery decorator
67 # @adapt_for_celery decorator
61 # also see: _maybe_handle_exception which is handling exceptions
68 # also see: _maybe_handle_exception which is handling exceptions
62 inquired_task = self.celery_app.signature(
69
63 f'rhodecode.lib.celerylib.tasks.{method}'
70 inquired_task = self.celery_app.signature(f'rhodecode.lib.celerylib.tasks.{method}')
64 )
71 task_meta = inquired_task.apply_async(args=(extras,))
65 result = inquired_task.delay(extras).get(timeout=self.TASK_TIMEOUT)
72 result = task_meta.get(timeout=self.TASK_TIMEOUT)
66
73
67 return result
74 return result
68
75
76 def __repr__(self):
77 return f'HooksCeleryClient(opts={self.custom_opts})'
69
78
70 class HooksShadowRepoClient:
79 class HooksShadowRepoClient:
71
80
@@ -141,12 +150,18 b' def _maybe_handle_exception(writer, resu'
141
150
142
151
143 def _get_hooks_client(extras):
152 def _get_hooks_client(extras):
144 task_queue = extras.get('task_queue')
145 task_backend = extras.get('task_backend')
146 is_shadow_repo = extras.get('is_shadow_repo')
153 is_shadow_repo = extras.get('is_shadow_repo')
154 hooks_protocol = extras.get('hooks_protocol')
147
155
148 if task_queue and task_backend:
156 if hooks_protocol == 'celery':
149 return HooksCeleryClient(task_queue, task_backend)
157 try:
158 celery_config = extras['hooks_config']
159 broker_url = celery_config['broker_url']
160 result_backend = celery_config['result_backend']
161 except Exception:
162 log.exception("Failed to get celery task queue and backend")
163 raise
164 return HooksCeleryClient(broker_url, result_backend, _celery_opts=celery_config)
150 elif is_shadow_repo:
165 elif is_shadow_repo:
151 return HooksShadowRepoClient()
166 return HooksShadowRepoClient()
152 else:
167 else:
@@ -117,12 +117,11 b' def test_git_post_pull_is_disabled():'
117 class TestGetHooksClient:
117 class TestGetHooksClient:
118
118
119 def test_return_celery_client_when_queue_and_backend_provided(self):
119 def test_return_celery_client_when_queue_and_backend_provided(self):
120 task_queue = 'redis://task_queue:0'
120 extras = {
121 task_backend = task_queue
121 'hooks_protocol': 'celery',
122 result = hooks._get_hooks_client({
122 'hooks_config': {'broker_url': 'redis://task_queue:0', 'result_backend': 'redis://task_queue:0'}
123 'task_queue': task_queue,
123 }
124 'task_backend': task_backend
124 result = hooks._get_hooks_client(extras)
125 })
126 assert isinstance(result, hooks.HooksCeleryClient)
125 assert isinstance(result, hooks.HooksCeleryClient)
127
126
128
127
@@ -133,3 +132,10 b' class TestHooksCeleryClient:'
133 backend = 'redis://redis:6379/0'
132 backend = 'redis://redis:6379/0'
134 client = hooks.HooksCeleryClient(queue, backend)
133 client = hooks.HooksCeleryClient(queue, backend)
135 assert client.celery_app.conf.broker_url == queue
134 assert client.celery_app.conf.broker_url == queue
135
136 def test_hooks_http_client_init_with_extra_opts(self):
137 queue = 'redis://redis:6379/0'
138 backend = 'redis://redis:6379/0'
139 client = hooks.HooksCeleryClient(queue, backend, _celery_opts={'task_always_eager': True})
140 assert client.celery_app.conf.broker_url == queue
141 assert client.celery_app.conf.task_always_eager == True
General Comments 0
You need to be logged in to leave comments. Login now