##// END OF EJS Templates
fix(celery): don't use msgpack serializer due to problems with certain data types
super-admin -
r5406:f8c36854 default
parent child Browse files
Show More
@@ -1,372 +1,372 b''
1 # Copyright (C) 2010-2023 RhodeCode GmbH
1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 """
18 """
19 Celery loader, run with::
19 Celery loader, run with::
20
20
21 celery worker \
21 celery worker \
22 --task-events \
22 --task-events \
23 --beat \
23 --beat \
24 --autoscale=20,2 \
24 --autoscale=20,2 \
25 --max-tasks-per-child 1 \
25 --max-tasks-per-child 1 \
26 --app rhodecode.lib.celerylib.loader \
26 --app rhodecode.lib.celerylib.loader \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 --loglevel DEBUG --ini=.dev/dev.ini
28 --loglevel DEBUG --ini=.dev/dev.ini
29 """
29 """
30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 inspect_getargspec()
31 inspect_getargspec()
32 inspect_formatargspec()
32 inspect_formatargspec()
33 # python3.11 inspect patches for backward compat on `paste` code
33 # python3.11 inspect patches for backward compat on `paste` code
34
34
35 import sys
35 import sys
36 import logging
36 import logging
37 import importlib
37 import importlib
38
38
39 import click
39 import click
40 from celery import Celery
40 from celery import Celery
41 from celery import signals
41 from celery import signals
42 from celery import Task
42 from celery import Task
43 from celery import exceptions # noqa
43 from celery import exceptions # noqa
44
44
45 import rhodecode
45 import rhodecode
46
46
47 from rhodecode.lib.statsd_client import StatsdClient
47 from rhodecode.lib.statsd_client import StatsdClient
48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
49 from rhodecode.lib.ext_json import json
49 from rhodecode.lib.ext_json import json
50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
51 from rhodecode.lib.utils2 import str2bool
51 from rhodecode.lib.utils2 import str2bool
52 from rhodecode.model import meta
52 from rhodecode.model import meta
53
53
54 log = logging.getLogger('celery.rhodecode.loader')
54 log = logging.getLogger('celery.rhodecode.loader')
55
55
56
56
57 imports = ['rhodecode.lib.celerylib.tasks']
57 imports = ['rhodecode.lib.celerylib.tasks']
58
58
59 try:
59 try:
60 # try if we have EE tasks available
60 # try if we have EE tasks available
61 importlib.import_module('rc_ee')
61 importlib.import_module('rc_ee')
62 imports.append('rc_ee.lib.celerylib.tasks')
62 imports.append('rc_ee.lib.celerylib.tasks')
63 except ImportError:
63 except ImportError:
64 pass
64 pass
65
65
66
66
67 base_celery_config = {
67 base_celery_config = {
68 'result_backend': 'rpc://',
68 'result_backend': 'rpc://',
69 'result_expires': 60 * 60 * 24,
69 'result_expires': 60 * 60 * 24,
70 'result_persistent': True,
70 'result_persistent': True,
71 'imports': imports,
71 'imports': imports,
72 'worker_max_tasks_per_child': 100,
72 'worker_max_tasks_per_child': 100,
73 'worker_hijack_root_logger': False,
73 'worker_hijack_root_logger': False,
74 'worker_prefetch_multiplier': 1,
74 'worker_prefetch_multiplier': 1,
75 'task_serializer': 'msgpack',
75 'task_serializer': 'json',
76 'accept_content': ['json', 'msgpack'],
76 'accept_content': ['json', 'msgpack'],
77 'result_serializer': 'msgpack',
77 'result_serializer': 'json',
78 'result_accept_content': ['json', 'msgpack'],
78 'result_accept_content': ['json', 'msgpack'],
79
79
80 'broker_connection_retry_on_startup': True,
80 'broker_connection_retry_on_startup': True,
81 'database_table_names': {
81 'database_table_names': {
82 'task': 'beat_taskmeta',
82 'task': 'beat_taskmeta',
83 'group': 'beat_groupmeta',
83 'group': 'beat_groupmeta',
84 }
84 }
85 }
85 }
86
86
87
87
88 preload_option_ini = click.Option(
88 preload_option_ini = click.Option(
89 ('--ini',),
89 ('--ini',),
90 help='Path to ini configuration file.'
90 help='Path to ini configuration file.'
91 )
91 )
92
92
93 preload_option_ini_var = click.Option(
93 preload_option_ini_var = click.Option(
94 ('--ini-var',),
94 ('--ini-var',),
95 help='Comma separated list of key=value to pass to ini.'
95 help='Comma separated list of key=value to pass to ini.'
96 )
96 )
97
97
98
98
99 def get_logger(obj):
99 def get_logger(obj):
100 custom_log = logging.getLogger(
100 custom_log = logging.getLogger(
101 'rhodecode.task.{}'.format(obj.__class__.__name__))
101 'rhodecode.task.{}'.format(obj.__class__.__name__))
102
102
103 if rhodecode.CELERY_ENABLED:
103 if rhodecode.CELERY_ENABLED:
104 try:
104 try:
105 custom_log = obj.get_logger()
105 custom_log = obj.get_logger()
106 except Exception:
106 except Exception:
107 pass
107 pass
108
108
109 return custom_log
109 return custom_log
110
110
111
111
112 # init main celery app
112 # init main celery app
113 celery_app = Celery()
113 celery_app = Celery()
114 celery_app.user_options['preload'].add(preload_option_ini)
114 celery_app.user_options['preload'].add(preload_option_ini)
115 celery_app.user_options['preload'].add(preload_option_ini_var)
115 celery_app.user_options['preload'].add(preload_option_ini_var)
116
116
117
117
118 @signals.setup_logging.connect
118 @signals.setup_logging.connect
119 def setup_logging_callback(**kwargs):
119 def setup_logging_callback(**kwargs):
120
120
121 if 'RC_INI_FILE' in celery_app.conf:
121 if 'RC_INI_FILE' in celery_app.conf:
122 ini_file = celery_app.conf['RC_INI_FILE']
122 ini_file = celery_app.conf['RC_INI_FILE']
123 else:
123 else:
124 ini_file = celery_app.user_options['RC_INI_FILE']
124 ini_file = celery_app.user_options['RC_INI_FILE']
125
125
126 setup_logging(ini_file)
126 setup_logging(ini_file)
127
127
128
128
129 @signals.user_preload_options.connect
129 @signals.user_preload_options.connect
130 def on_preload_parsed(options, **kwargs):
130 def on_preload_parsed(options, **kwargs):
131
131
132 ini_file = options['ini']
132 ini_file = options['ini']
133 ini_vars = options['ini_var']
133 ini_vars = options['ini_var']
134
134
135 if ini_file is None:
135 if ini_file is None:
136 print('You must provide the --ini argument to start celery')
136 print('You must provide the --ini argument to start celery')
137 exit(-1)
137 exit(-1)
138
138
139 options = None
139 options = None
140 if ini_vars is not None:
140 if ini_vars is not None:
141 options = parse_ini_vars(ini_vars)
141 options = parse_ini_vars(ini_vars)
142
142
143 celery_app.conf['RC_INI_FILE'] = ini_file
143 celery_app.conf['RC_INI_FILE'] = ini_file
144 celery_app.user_options['RC_INI_FILE'] = ini_file
144 celery_app.user_options['RC_INI_FILE'] = ini_file
145
145
146 celery_app.conf['RC_INI_OPTIONS'] = options
146 celery_app.conf['RC_INI_OPTIONS'] = options
147 celery_app.user_options['RC_INI_OPTIONS'] = options
147 celery_app.user_options['RC_INI_OPTIONS'] = options
148
148
149 setup_logging(ini_file)
149 setup_logging(ini_file)
150
150
151
151
152 def _init_celery(app_type=''):
152 def _init_celery(app_type=''):
153 from rhodecode.config.middleware import get_celery_config
153 from rhodecode.config.middleware import get_celery_config
154
154
155 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
155 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
156
156
157 ini_file = celery_app.conf['RC_INI_FILE']
157 ini_file = celery_app.conf['RC_INI_FILE']
158 options = celery_app.conf['RC_INI_OPTIONS']
158 options = celery_app.conf['RC_INI_OPTIONS']
159
159
160 env = None
160 env = None
161 try:
161 try:
162 env = bootstrap(ini_file, options=options)
162 env = bootstrap(ini_file, options=options)
163 except Exception:
163 except Exception:
164 log.exception('Failed to bootstrap RhodeCode APP. '
164 log.exception('Failed to bootstrap RhodeCode APP. '
165 'Probably there is another error present that prevents from running pyramid app')
165 'Probably there is another error present that prevents from running pyramid app')
166
166
167 if not env:
167 if not env:
168 # we use sys.exit here since we need to signal app startup failure for docker to restart the container and re-try
168 # we use sys.exit here since we need to signal app startup failure for docker to restart the container and re-try
169 sys.exit(1)
169 sys.exit(1)
170
170
171 log.debug('Got Pyramid ENV: %s', env)
171 log.debug('Got Pyramid ENV: %s', env)
172
172
173 settings = env['registry'].settings
173 settings = env['registry'].settings
174 celery_settings = get_celery_config(settings)
174 celery_settings = get_celery_config(settings)
175
175
176 # init and bootstrap StatsdClient
176 # init and bootstrap StatsdClient
177 StatsdClient.setup(settings)
177 StatsdClient.setup(settings)
178
178
179 setup_celery_app(
179 setup_celery_app(
180 app=env['app'], root=env['root'], request=env['request'],
180 app=env['app'], root=env['root'], request=env['request'],
181 registry=env['registry'], closer=env['closer'],
181 registry=env['registry'], closer=env['closer'],
182 celery_settings=celery_settings)
182 celery_settings=celery_settings)
183
183
184
184
185 @signals.celeryd_init.connect
185 @signals.celeryd_init.connect
186 def on_celeryd_init(sender=None, conf=None, **kwargs):
186 def on_celeryd_init(sender=None, conf=None, **kwargs):
187 _init_celery('celery worker')
187 _init_celery('celery worker')
188
188
189 # fix the global flag even if it's disabled via .ini file because this
189 # fix the global flag even if it's disabled via .ini file because this
190 # is a worker code that doesn't need this to be disabled.
190 # is a worker code that doesn't need this to be disabled.
191 rhodecode.CELERY_ENABLED = True
191 rhodecode.CELERY_ENABLED = True
192
192
193
193
194 @signals.beat_init.connect
194 @signals.beat_init.connect
195 def on_beat_init(sender=None, conf=None, **kwargs):
195 def on_beat_init(sender=None, conf=None, **kwargs):
196 _init_celery('celery beat')
196 _init_celery('celery beat')
197
197
198
198
199 @signals.task_prerun.connect
199 @signals.task_prerun.connect
200 def task_prerun_signal(task_id, task, args, **kwargs):
200 def task_prerun_signal(task_id, task, args, **kwargs):
201 ping_db()
201 ping_db()
202 statsd = StatsdClient.statsd
202 statsd = StatsdClient.statsd
203
203
204 if statsd:
204 if statsd:
205 task_repr = getattr(task, 'name', task)
205 task_repr = getattr(task, 'name', task)
206 statsd.incr('rhodecode_celery_task_total', tags=[
206 statsd.incr('rhodecode_celery_task_total', tags=[
207 f'task:{task_repr}',
207 f'task:{task_repr}',
208 'mode:async'
208 'mode:async'
209 ])
209 ])
210
210
211
211
212 @signals.task_success.connect
212 @signals.task_success.connect
213 def task_success_signal(result, **kwargs):
213 def task_success_signal(result, **kwargs):
214 meta.Session.commit()
214 meta.Session.commit()
215 closer = celery_app.conf['PYRAMID_CLOSER']
215 closer = celery_app.conf['PYRAMID_CLOSER']
216 if closer:
216 if closer:
217 closer()
217 closer()
218
218
219
219
220 @signals.task_retry.connect
220 @signals.task_retry.connect
221 def task_retry_signal(
221 def task_retry_signal(
222 request, reason, einfo, **kwargs):
222 request, reason, einfo, **kwargs):
223 meta.Session.remove()
223 meta.Session.remove()
224 closer = celery_app.conf['PYRAMID_CLOSER']
224 closer = celery_app.conf['PYRAMID_CLOSER']
225 if closer:
225 if closer:
226 closer()
226 closer()
227
227
228
228
229 @signals.task_failure.connect
229 @signals.task_failure.connect
230 def task_failure_signal(
230 def task_failure_signal(
231 task_id, exception, args, kwargs, traceback, einfo, **kargs):
231 task_id, exception, args, kwargs, traceback, einfo, **kargs):
232
232
233 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
233 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
234 from rhodecode.lib.exc_tracking import store_exception
234 from rhodecode.lib.exc_tracking import store_exception
235 from rhodecode.lib.statsd_client import StatsdClient
235 from rhodecode.lib.statsd_client import StatsdClient
236
236
237 meta.Session.remove()
237 meta.Session.remove()
238
238
239 # simulate sys.exc_info()
239 # simulate sys.exc_info()
240 exc_info = (einfo.type, einfo.exception, einfo.tb)
240 exc_info = (einfo.type, einfo.exception, einfo.tb)
241 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
241 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
242 statsd = StatsdClient.statsd
242 statsd = StatsdClient.statsd
243 if statsd:
243 if statsd:
244 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
244 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
245 statsd.incr('rhodecode_exception_total',
245 statsd.incr('rhodecode_exception_total',
246 tags=["exc_source:celery", "type:{}".format(exc_type)])
246 tags=["exc_source:celery", "type:{}".format(exc_type)])
247
247
248 closer = celery_app.conf['PYRAMID_CLOSER']
248 closer = celery_app.conf['PYRAMID_CLOSER']
249 if closer:
249 if closer:
250 closer()
250 closer()
251
251
252
252
253 @signals.task_revoked.connect
253 @signals.task_revoked.connect
254 def task_revoked_signal(
254 def task_revoked_signal(
255 request, terminated, signum, expired, **kwargs):
255 request, terminated, signum, expired, **kwargs):
256 closer = celery_app.conf['PYRAMID_CLOSER']
256 closer = celery_app.conf['PYRAMID_CLOSER']
257 if closer:
257 if closer:
258 closer()
258 closer()
259
259
260
260
261 class UNSET(object):
261 class UNSET(object):
262 pass
262 pass
263
263
264
264
265 _unset = UNSET()
265 _unset = UNSET()
266
266
267
267
268 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
268 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
269
269
270 if request is not UNSET:
270 if request is not UNSET:
271 celery_app.conf.update({'PYRAMID_REQUEST': request})
271 celery_app.conf.update({'PYRAMID_REQUEST': request})
272
272
273 if registry is not UNSET:
273 if registry is not UNSET:
274 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
274 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
275
275
276
276
277 def setup_celery_app(app, root, request, registry, closer, celery_settings):
277 def setup_celery_app(app, root, request, registry, closer, celery_settings):
278 log.debug('Got custom celery conf: %s', celery_settings)
278 log.debug('Got custom celery conf: %s', celery_settings)
279 celery_config = base_celery_config
279 celery_config = base_celery_config
280 celery_config.update({
280 celery_config.update({
281 # store celerybeat scheduler db where the .ini file is
281 # store celerybeat scheduler db where the .ini file is
282 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
282 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
283 })
283 })
284
284
285 celery_config.update(celery_settings)
285 celery_config.update(celery_settings)
286 celery_app.config_from_object(celery_config)
286 celery_app.config_from_object(celery_config)
287
287
288 celery_app.conf.update({'PYRAMID_APP': app})
288 celery_app.conf.update({'PYRAMID_APP': app})
289 celery_app.conf.update({'PYRAMID_ROOT': root})
289 celery_app.conf.update({'PYRAMID_ROOT': root})
290 celery_app.conf.update({'PYRAMID_REQUEST': request})
290 celery_app.conf.update({'PYRAMID_REQUEST': request})
291 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
291 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
292 celery_app.conf.update({'PYRAMID_CLOSER': closer})
292 celery_app.conf.update({'PYRAMID_CLOSER': closer})
293
293
294
294
295 def configure_celery(config, celery_settings):
295 def configure_celery(config, celery_settings):
296 """
296 """
297 Helper that is called from our application creation logic. It gives
297 Helper that is called from our application creation logic. It gives
298 connection info into running webapp and allows execution of tasks from
298 connection info into running webapp and allows execution of tasks from
299 RhodeCode itself
299 RhodeCode itself
300 """
300 """
301 # store some globals into rhodecode
301 # store some globals into rhodecode
302 rhodecode.CELERY_ENABLED = str2bool(
302 rhodecode.CELERY_ENABLED = str2bool(
303 config.registry.settings.get('use_celery'))
303 config.registry.settings.get('use_celery'))
304 if rhodecode.CELERY_ENABLED:
304 if rhodecode.CELERY_ENABLED:
305 log.info('Configuring celery based on `%s` settings', celery_settings)
305 log.info('Configuring celery based on `%s` settings', celery_settings)
306 setup_celery_app(
306 setup_celery_app(
307 app=None, root=None, request=None, registry=config.registry,
307 app=None, root=None, request=None, registry=config.registry,
308 closer=None, celery_settings=celery_settings)
308 closer=None, celery_settings=celery_settings)
309
309
310
310
311 def maybe_prepare_env(req):
311 def maybe_prepare_env(req):
312 environ = {}
312 environ = {}
313 try:
313 try:
314 environ.update({
314 environ.update({
315 'PATH_INFO': req.environ['PATH_INFO'],
315 'PATH_INFO': req.environ['PATH_INFO'],
316 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
316 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
317 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
317 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
318 'SERVER_NAME': req.environ['SERVER_NAME'],
318 'SERVER_NAME': req.environ['SERVER_NAME'],
319 'SERVER_PORT': req.environ['SERVER_PORT'],
319 'SERVER_PORT': req.environ['SERVER_PORT'],
320 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
320 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
321 })
321 })
322 except Exception:
322 except Exception:
323 pass
323 pass
324
324
325 return environ
325 return environ
326
326
327
327
328 class RequestContextTask(Task):
328 class RequestContextTask(Task):
329 """
329 """
330 This is a celery task which will create a rhodecode app instance context
330 This is a celery task which will create a rhodecode app instance context
331 for the task, patch pyramid with the original request
331 for the task, patch pyramid with the original request
332 that created the task and also add the user to the context.
332 that created the task and also add the user to the context.
333 """
333 """
334
334
335 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
335 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
336 link=None, link_error=None, shadow=None, **options):
336 link=None, link_error=None, shadow=None, **options):
337 """ queue the job to run (we are in web request context here) """
337 """ queue the job to run (we are in web request context here) """
338 from rhodecode.lib.base import get_ip_addr
338 from rhodecode.lib.base import get_ip_addr
339
339
340 req = self.app.conf['PYRAMID_REQUEST']
340 req = self.app.conf['PYRAMID_REQUEST']
341 if not req:
341 if not req:
342 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
342 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
343
343
344 log.debug('Running Task with class: %s. Request Class: %s',
344 log.debug('Running Task with class: %s. Request Class: %s',
345 self.__class__, req.__class__)
345 self.__class__, req.__class__)
346
346
347 user_id = 0
347 user_id = 0
348
348
349 # web case
349 # web case
350 if hasattr(req, 'user'):
350 if hasattr(req, 'user'):
351 user_id = req.user.user_id
351 user_id = req.user.user_id
352
352
353 # api case
353 # api case
354 elif hasattr(req, 'rpc_user'):
354 elif hasattr(req, 'rpc_user'):
355 user_id = req.rpc_user.user_id
355 user_id = req.rpc_user.user_id
356
356
357 # we hook into kwargs since it is the only way to pass our data to
357 # we hook into kwargs since it is the only way to pass our data to
358 # the celery worker
358 # the celery worker
359 environ = maybe_prepare_env(req)
359 environ = maybe_prepare_env(req)
360 options['headers'] = options.get('headers', {})
360 options['headers'] = options.get('headers', {})
361 options['headers'].update({
361 options['headers'].update({
362 'rhodecode_proxy_data': {
362 'rhodecode_proxy_data': {
363 'environ': environ,
363 'environ': environ,
364 'auth_user': {
364 'auth_user': {
365 'ip_addr': get_ip_addr(req.environ),
365 'ip_addr': get_ip_addr(req.environ),
366 'user_id': user_id
366 'user_id': user_id
367 },
367 },
368 }
368 }
369 })
369 })
370
370
371 return super(RequestContextTask, self).apply_async(
371 return super(RequestContextTask, self).apply_async(
372 args, kwargs, task_id, producer, link, link_error, shadow, **options)
372 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now