Show More
@@ -43,29 +43,38 b' log = logging.getLogger(__name__)' | |||
|
43 | 43 | |
|
44 | 44 | class HooksCeleryClient: |
|
45 | 45 | TASK_TIMEOUT = 60 # time in seconds |
|
46 | custom_opts = {} | |
|
46 | 47 | |
|
47 |
def __init__(self, |
|
|
48 | celery_app.config_from_object({ | |
|
49 |
'broker_url': |
|
|
48 | def __init__(self, broker_url, result_backend, **kwargs): | |
|
49 | _celery_opts = { | |
|
50 | 'broker_url': broker_url, | |
|
51 | 'result_backend': result_backend, | |
|
50 | 52 | 'broker_connection_retry_on_startup': True, |
|
51 | 53 | 'task_serializer': 'json', |
|
52 | 54 | 'accept_content': ['json', 'msgpack'], |
|
53 | 55 | 'result_serializer': 'json', |
|
54 | 56 | 'result_accept_content': ['json', 'msgpack'] |
|
55 |
} |
|
|
57 | } | |
|
58 | if custom_opts := kwargs.pop('_celery_opts', {}): | |
|
59 | _celery_opts.update(custom_opts) | |
|
60 | self.custom_opts = custom_opts | |
|
61 | celery_app.config_from_object(_celery_opts) | |
|
62 | ||
|
56 | 63 | self.celery_app = celery_app |
|
57 | 64 | |
|
58 | 65 | def __call__(self, method, extras): |
|
59 | 66 | # NOTE: exception handling for those tasks executed is in |
|
60 | 67 | # @adapt_for_celery decorator |
|
61 | 68 | # also see: _maybe_handle_exception which is handling exceptions |
|
62 | inquired_task = self.celery_app.signature( | |
|
63 |
|
|
|
64 | ) | |
|
65 |
result = |
|
|
69 | ||
|
70 | inquired_task = self.celery_app.signature(f'rhodecode.lib.celerylib.tasks.{method}') | |
|
71 | task_meta = inquired_task.apply_async(args=(extras,)) | |
|
72 | result = task_meta.get(timeout=self.TASK_TIMEOUT) | |
|
66 | 73 | |
|
67 | 74 | return result |
|
68 | 75 | |
|
76 | def __repr__(self): | |
|
77 | return f'HooksCeleryClient(opts={self.custom_opts})' | |
|
69 | 78 | |
|
70 | 79 | class HooksShadowRepoClient: |
|
71 | 80 | |
@@ -141,12 +150,18 b' def _maybe_handle_exception(writer, resu' | |||
|
141 | 150 | |
|
142 | 151 | |
|
143 | 152 | def _get_hooks_client(extras): |
|
144 | task_queue = extras.get('task_queue') | |
|
145 | task_backend = extras.get('task_backend') | |
|
146 | 153 | is_shadow_repo = extras.get('is_shadow_repo') |
|
154 | hooks_protocol = extras.get('hooks_protocol') | |
|
147 | 155 | |
|
148 | if task_queue and task_backend: | |
|
149 | return HooksCeleryClient(task_queue, task_backend) | |
|
156 | if hooks_protocol == 'celery': | |
|
157 | try: | |
|
158 | celery_config = extras['hooks_config'] | |
|
159 | broker_url = celery_config['broker_url'] | |
|
160 | result_backend = celery_config['result_backend'] | |
|
161 | except Exception: | |
|
162 | log.exception("Failed to get celery task queue and backend") | |
|
163 | raise | |
|
164 | return HooksCeleryClient(broker_url, result_backend, _celery_opts=celery_config) | |
|
150 | 165 | elif is_shadow_repo: |
|
151 | 166 | return HooksShadowRepoClient() |
|
152 | 167 | else: |
@@ -117,12 +117,11 b' def test_git_post_pull_is_disabled():' | |||
|
117 | 117 | class TestGetHooksClient: |
|
118 | 118 | |
|
119 | 119 | def test_return_celery_client_when_queue_and_backend_provided(self): |
|
120 | task_queue = 'redis://task_queue:0' | |
|
121 | task_backend = task_queue | |
|
122 | result = hooks._get_hooks_client({ | |
|
123 | 'task_queue': task_queue, | |
|
124 | 'task_backend': task_backend | |
|
125 | }) | |
|
120 | extras = { | |
|
121 | 'hooks_protocol': 'celery', | |
|
122 | 'hooks_config': {'broker_url': 'redis://task_queue:0', 'result_backend': 'redis://task_queue:0'} | |
|
123 | } | |
|
124 | result = hooks._get_hooks_client(extras) | |
|
126 | 125 | assert isinstance(result, hooks.HooksCeleryClient) |
|
127 | 126 | |
|
128 | 127 | |
@@ -133,3 +132,10 b' class TestHooksCeleryClient:' | |||
|
133 | 132 | backend = 'redis://redis:6379/0' |
|
134 | 133 | client = hooks.HooksCeleryClient(queue, backend) |
|
135 | 134 | assert client.celery_app.conf.broker_url == queue |
|
135 | ||
|
136 | def test_hooks_http_client_init_with_extra_opts(self): | |
|
137 | queue = 'redis://redis:6379/0' | |
|
138 | backend = 'redis://redis:6379/0' | |
|
139 | client = hooks.HooksCeleryClient(queue, backend, _celery_opts={'task_always_eager': True}) | |
|
140 | assert client.celery_app.conf.broker_url == queue | |
|
141 | assert client.celery_app.conf.task_always_eager == True |
General Comments 0
You need to be logged in to leave comments.
Login now