Show More
@@ -43,29 +43,38 b' log = logging.getLogger(__name__)' | |||||
43 |
|
43 | |||
44 | class HooksCeleryClient: |
|
44 | class HooksCeleryClient: | |
45 | TASK_TIMEOUT = 60 # time in seconds |
|
45 | TASK_TIMEOUT = 60 # time in seconds | |
|
46 | custom_opts = {} | |||
46 |
|
47 | |||
47 |
def __init__(self, |
|
48 | def __init__(self, broker_url, result_backend, **kwargs): | |
48 | celery_app.config_from_object({ |
|
49 | _celery_opts = { | |
49 |
'broker_url': |
|
50 | 'broker_url': broker_url, | |
|
51 | 'result_backend': result_backend, | |||
50 | 'broker_connection_retry_on_startup': True, |
|
52 | 'broker_connection_retry_on_startup': True, | |
51 | 'task_serializer': 'json', |
|
53 | 'task_serializer': 'json', | |
52 | 'accept_content': ['json', 'msgpack'], |
|
54 | 'accept_content': ['json', 'msgpack'], | |
53 | 'result_serializer': 'json', |
|
55 | 'result_serializer': 'json', | |
54 | 'result_accept_content': ['json', 'msgpack'] |
|
56 | 'result_accept_content': ['json', 'msgpack'] | |
55 |
} |
|
57 | } | |
|
58 | if custom_opts := kwargs.pop('_celery_opts', {}): | |||
|
59 | _celery_opts.update(custom_opts) | |||
|
60 | self.custom_opts = custom_opts | |||
|
61 | celery_app.config_from_object(_celery_opts) | |||
|
62 | ||||
56 | self.celery_app = celery_app |
|
63 | self.celery_app = celery_app | |
57 |
|
64 | |||
58 | def __call__(self, method, extras): |
|
65 | def __call__(self, method, extras): | |
59 | # NOTE: exception handling for those tasks executed is in |
|
66 | # NOTE: exception handling for those tasks executed is in | |
60 | # @adapt_for_celery decorator |
|
67 | # @adapt_for_celery decorator | |
61 | # also see: _maybe_handle_exception which is handling exceptions |
|
68 | # also see: _maybe_handle_exception which is handling exceptions | |
62 | inquired_task = self.celery_app.signature( |
|
69 | ||
63 |
|
|
70 | inquired_task = self.celery_app.signature(f'rhodecode.lib.celerylib.tasks.{method}') | |
64 | ) |
|
71 | task_meta = inquired_task.apply_async(args=(extras,)) | |
65 |
result = |
|
72 | result = task_meta.get(timeout=self.TASK_TIMEOUT) | |
66 |
|
73 | |||
67 | return result |
|
74 | return result | |
68 |
|
75 | |||
|
76 | def __repr__(self): | |||
|
77 | return f'HooksCeleryClient(opts={self.custom_opts})' | |||
69 |
|
78 | |||
70 | class HooksShadowRepoClient: |
|
79 | class HooksShadowRepoClient: | |
71 |
|
80 | |||
@@ -141,12 +150,18 b' def _maybe_handle_exception(writer, resu' | |||||
141 |
|
150 | |||
142 |
|
151 | |||
143 | def _get_hooks_client(extras): |
|
152 | def _get_hooks_client(extras): | |
144 | task_queue = extras.get('task_queue') |
|
|||
145 | task_backend = extras.get('task_backend') |
|
|||
146 | is_shadow_repo = extras.get('is_shadow_repo') |
|
153 | is_shadow_repo = extras.get('is_shadow_repo') | |
|
154 | hooks_protocol = extras.get('hooks_protocol') | |||
147 |
|
155 | |||
148 | if task_queue and task_backend: |
|
156 | if hooks_protocol == 'celery': | |
149 | return HooksCeleryClient(task_queue, task_backend) |
|
157 | try: | |
|
158 | celery_config = extras['hooks_config'] | |||
|
159 | broker_url = celery_config['broker_url'] | |||
|
160 | result_backend = celery_config['result_backend'] | |||
|
161 | except Exception: | |||
|
162 | log.exception("Failed to get celery task queue and backend") | |||
|
163 | raise | |||
|
164 | return HooksCeleryClient(broker_url, result_backend, _celery_opts=celery_config) | |||
150 | elif is_shadow_repo: |
|
165 | elif is_shadow_repo: | |
151 | return HooksShadowRepoClient() |
|
166 | return HooksShadowRepoClient() | |
152 | else: |
|
167 | else: |
@@ -117,12 +117,11 b' def test_git_post_pull_is_disabled():' | |||||
117 | class TestGetHooksClient: |
|
117 | class TestGetHooksClient: | |
118 |
|
118 | |||
119 | def test_return_celery_client_when_queue_and_backend_provided(self): |
|
119 | def test_return_celery_client_when_queue_and_backend_provided(self): | |
120 | task_queue = 'redis://task_queue:0' |
|
120 | extras = { | |
121 | task_backend = task_queue |
|
121 | 'hooks_protocol': 'celery', | |
122 | result = hooks._get_hooks_client({ |
|
122 | 'hooks_config': {'broker_url': 'redis://task_queue:0', 'result_backend': 'redis://task_queue:0'} | |
123 | 'task_queue': task_queue, |
|
123 | } | |
124 | 'task_backend': task_backend |
|
124 | result = hooks._get_hooks_client(extras) | |
125 | }) |
|
|||
126 | assert isinstance(result, hooks.HooksCeleryClient) |
|
125 | assert isinstance(result, hooks.HooksCeleryClient) | |
127 |
|
126 | |||
128 |
|
127 | |||
@@ -133,3 +132,10 b' class TestHooksCeleryClient:' | |||||
133 | backend = 'redis://redis:6379/0' |
|
132 | backend = 'redis://redis:6379/0' | |
134 | client = hooks.HooksCeleryClient(queue, backend) |
|
133 | client = hooks.HooksCeleryClient(queue, backend) | |
135 | assert client.celery_app.conf.broker_url == queue |
|
134 | assert client.celery_app.conf.broker_url == queue | |
|
135 | ||||
|
136 | def test_hooks_http_client_init_with_extra_opts(self): | |||
|
137 | queue = 'redis://redis:6379/0' | |||
|
138 | backend = 'redis://redis:6379/0' | |||
|
139 | client = hooks.HooksCeleryClient(queue, backend, _celery_opts={'task_always_eager': True}) | |||
|
140 | assert client.celery_app.conf.broker_url == queue | |||
|
141 | assert client.celery_app.conf.task_always_eager == True |
General Comments 0
You need to be logged in to leave comments.
Login now