##// END OF EJS Templates
fix(celery): don't use msgpack serializer due to problems with certain data types
super-admin -
r5407:ee39fc10 stable
parent child Browse files
Show More
@@ -1,371 +1,371 b''
1 # Copyright (C) 2010-2023 RhodeCode GmbH
1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 """
18 """
19 Celery loader, run with::
19 Celery loader, run with::
20
20
21 celery worker \
21 celery worker \
22 --task-events \
22 --task-events \
23 --beat \
23 --beat \
24 --autoscale=20,2 \
24 --autoscale=20,2 \
25 --max-tasks-per-child 1 \
25 --max-tasks-per-child 1 \
26 --app rhodecode.lib.celerylib.loader \
26 --app rhodecode.lib.celerylib.loader \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 --loglevel DEBUG --ini=.dev/dev.ini
28 --loglevel DEBUG --ini=.dev/dev.ini
29 """
29 """
30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 inspect_getargspec()
31 inspect_getargspec()
32 inspect_formatargspec()
32 inspect_formatargspec()
33 # python3.11 inspect patches for backward compat on `paste` code
33 # python3.11 inspect patches for backward compat on `paste` code
34
34
35 import logging
35 import logging
36 import importlib
36 import importlib
37
37
38 import click
38 import click
39 from celery import Celery
39 from celery import Celery
40 from celery import signals
40 from celery import signals
41 from celery import Task
41 from celery import Task
42 from celery import exceptions # noqa
42 from celery import exceptions # noqa
43
43
44 import rhodecode
44 import rhodecode
45
45
46 from rhodecode.lib.statsd_client import StatsdClient
46 from rhodecode.lib.statsd_client import StatsdClient
47 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
47 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
48 from rhodecode.lib.ext_json import json
48 from rhodecode.lib.ext_json import json
49 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
49 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
50 from rhodecode.lib.utils2 import str2bool
50 from rhodecode.lib.utils2 import str2bool
51 from rhodecode.model import meta
51 from rhodecode.model import meta
52
52
log = logging.getLogger('celery.rhodecode.loader')


# Task modules handed to celery through the `imports` setting.
imports = ['rhodecode.lib.celerylib.tasks']

# Register the EE task module as well, but only when the `rc_ee`
# package can actually be imported.
try:
    importlib.import_module('rc_ee')
except ImportError:
    pass
else:
    imports.append('rc_ee.lib.celerylib.tasks')
64
64
65
65
# Baseline celery settings; `setup_celery_app` layers the .ini derived
# settings on top of these before configuring `celery_app`.
base_celery_config = dict(
    result_backend='rpc://',
    result_expires=60 * 60 * 24,
    result_persistent=True,
    imports=imports,
    worker_max_tasks_per_child=100,
    worker_hijack_root_logger=False,
    worker_prefetch_multiplier=1,
    # Tasks and results are serialized as json. msgpack is not used as a
    # serializer due to problems with certain data types, but it is still
    # listed as accepted content.
    task_serializer='json',
    accept_content=['json', 'msgpack'],
    result_serializer='json',
    result_accept_content=['json', 'msgpack'],

    broker_connection_retry_on_startup=True,
    database_table_names=dict(
        task='beat_taskmeta',
        group='beat_groupmeta',
    ),
)
85
85
86
86
# Extra command line options; they are registered as celery preload options
# on `celery_app` and read back in `on_preload_parsed`.
preload_option_ini = click.Option(
    ('--ini',), help='Path to ini configuration file.')

preload_option_ini_var = click.Option(
    ('--ini-var',), help='Comma separated list of key=value to pass to ini.')
96
96
97
97
def get_logger(obj):
    """
    Return a logger for the given task object.

    The default is a `rhodecode.task.<ClassName>` logger. When celery is
    enabled, `obj.get_logger()` is tried first and the default is used
    only if that call fails.
    """
    fallback = logging.getLogger(f'rhodecode.task.{obj.__class__.__name__}')

    if not rhodecode.CELERY_ENABLED:
        return fallback

    try:
        return obj.get_logger()
    except Exception:
        # best effort: any failure here means we stay on the default logger
        return fallback
109
109
110
110
# The single celery application object used by this module; the custom
# --ini / --ini-var options are attached to it as preload options.
celery_app = Celery()
_preload_options = celery_app.user_options['preload']
_preload_options.add(preload_option_ini)
_preload_options.add(preload_option_ini_var)
115
115
116
116
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    """
    Configure logging from the .ini file remembered on the celery app.

    The path is taken from `celery_app.conf` when present there, otherwise
    from `celery_app.user_options`.
    """
    source = (
        celery_app.conf
        if 'RC_INI_FILE' in celery_app.conf
        else celery_app.user_options
    )
    setup_logging(source['RC_INI_FILE'])
126
126
127
127
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """
    Handle the parsed preload options (`--ini` and `--ini-var`).

    Stores the ini file path and the parsed ini variables on both
    `celery_app.conf` and `celery_app.user_options`, then configures
    logging from the ini file.

    :param options: mapping of parsed preload options; the `ini` and
        `ini_var` keys are read.
    :raises SystemExit: with code -1 when no `--ini` value was given.
    """
    ini_file = options['ini']
    ini_vars = options['ini_var']

    if ini_file is None:
        print('You must provide the --ini argument to start celery')
        # raise SystemExit directly instead of calling the `exit()` helper,
        # which is injected by the `site` module for interactive use
        raise SystemExit(-1)

    # kept separate from the `options` argument so the signal payload is
    # not shadowed
    ini_options = None
    if ini_vars is not None:
        ini_options = parse_ini_vars(ini_vars)

    celery_app.conf['RC_INI_FILE'] = ini_file
    celery_app.user_options['RC_INI_FILE'] = ini_file

    celery_app.conf['RC_INI_OPTIONS'] = ini_options
    celery_app.user_options['RC_INI_OPTIONS'] = ini_options

    setup_logging(ini_file)
149
149
150
150
def _init_celery(app_type=''):
    """
    Bootstrap the RhodeCode pyramid application and configure celery from it.

    :param app_type: label used only in the debug log message.
    :raises EnvironmentError: when bootstrapping fails or yields no env.
    """
    from rhodecode.config.middleware import get_celery_config

    log.debug('Bootstrapping RhodeCode application for %s...', app_type)

    conf = celery_app.conf
    ini_file, options = conf['RC_INI_FILE'], conf['RC_INI_OPTIONS']

    try:
        env = bootstrap(ini_file, options=options)
    except Exception:
        # the traceback is logged here; the failure itself is reported below
        log.exception('Failed to bootstrap RhodeCode APP')
        env = None

    if not env:
        raise EnvironmentError(
            'Failed to load pyramid ENV. '
            'Probably there is another error present that prevents from running pyramid app')

    log.debug('Got Pyramid ENV: %s', env)

    registry = env['registry']
    settings = registry.settings
    celery_settings = get_celery_config(settings)

    # init and bootstrap StatsdClient
    StatsdClient.setup(settings)

    setup_celery_app(
        app=env['app'],
        root=env['root'],
        request=env['request'],
        registry=registry,
        closer=env['closer'],
        celery_settings=celery_settings,
    )
182
182
183
183
@signals.celeryd_init.connect
def on_celeryd_init(sender=None, conf=None, **kwargs):
    """Bootstrap the application when the `celeryd_init` signal fires."""
    _init_celery('celery worker')

    # Worker code needs celery regardless of what the .ini file says,
    # so the global flag is forced on after bootstrapping.
    rhodecode.CELERY_ENABLED = True
191
191
192
192
@signals.beat_init.connect
def on_beat_init(sender=None, conf=None, **kwargs):
    """Bootstrap the application when the `beat_init` signal fires."""
    _init_celery('celery beat')
196
196
197
197
@signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
    """
    Run before each task: ping the database and, when a statsd client is
    configured, count the task under `rhodecode_celery_task_total`.
    """
    ping_db()

    client = StatsdClient.statsd
    if not client:
        return

    # prefer the task's name, falling back to the task object itself
    task_name = getattr(task, 'name', task)
    client.incr(
        'rhodecode_celery_task_total',
        tags=[f'task:{task_name}', 'mode:async'])
209
209
210
210
@signals.task_success.connect
def task_success_signal(result, **kwargs):
    """Commit the DB session after a successful task and call the stored closer."""
    meta.Session.commit()

    if pyramid_closer := celery_app.conf['PYRAMID_CLOSER']:
        pyramid_closer()
217
217
218
218
@signals.task_retry.connect
def task_retry_signal(request, reason, einfo, **kwargs):
    """Remove the DB session when a task is retried and call the stored closer."""
    meta.Session.remove()

    if pyramid_closer := celery_app.conf['PYRAMID_CLOSER']:
        pyramid_closer()
226
226
227
227
@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    """
    Handle a failed task.

    Logs the failure, removes the DB session, stores the exception via
    `store_exception`, bumps the `rhodecode_exception_total` statsd counter
    (when a statsd client is configured) and finally calls the stored
    pyramid closer.

    :param task_id: id of the failed task, used in the log message.
    :param einfo: exception info object; its `type`, `exception` and `tb`
        attributes are read.
    """
    log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
    from rhodecode.lib.exc_tracking import store_exception
    from rhodecode.lib.statsd_client import StatsdClient

    meta.Session.remove()

    # simulate sys.exc_info()
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')

    statsd = StatsdClient.statsd
    if statsd:
        # Tag with the class of the raised exception. `einfo.__class__` is
        # the class of the info wrapper itself and would yield the same tag
        # for every failure.
        exc_cls = einfo.type
        exc_type = "{}.{}".format(exc_cls.__module__, exc_cls.__name__)
        statsd.incr('rhodecode_exception_total',
                    tags=["exc_source:celery", "type:{}".format(exc_type)])

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()
250
250
251
251
@signals.task_revoked.connect
def task_revoked_signal(request, terminated, signum, expired, **kwargs):
    """Call the stored pyramid closer, if any, when a task is revoked."""
    if pyramid_closer := celery_app.conf['PYRAMID_CLOSER']:
        pyramid_closer()
258
258
259
259
class UNSET(object):
    """Marker type whose single instance `_unset` means "argument not given"."""
    pass


_unset = UNSET()


def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
    """
    Update the pyramid request and/or registry stored on `celery_app.conf`.

    Only arguments that were explicitly passed are written; omitted ones
    leave the existing configuration untouched. `app`, `root` and `closer`
    are accepted but currently not stored by this function.
    """
    # Compare against the sentinel *instance*. Comparing against the UNSET
    # class is always true and would store the sentinel object itself in
    # the configuration for every omitted argument.
    if request is not _unset:
        celery_app.conf.update({'PYRAMID_REQUEST': request})

    if registry is not _unset:
        celery_app.conf.update({'PYRAMID_REGISTRY': registry})
274
274
275
275
def setup_celery_app(app, root, request, registry, closer, celery_settings):
    """
    Configure `celery_app` and attach the pyramid objects to its conf.

    The effective configuration is the base config, plus the celerybeat
    schedule path read from the registry settings, plus `celery_settings`
    (applied last, so they win on conflicts).
    """
    log.debug('Got custom celery conf: %s', celery_settings)

    # NOTE: this is the module-level dict itself, not a copy, so the
    # updates below are applied to `base_celery_config` in place.
    celery_config = base_celery_config

    # celerybeat scheduler db location comes from the registry settings
    schedule_path = registry.settings['celerybeat-schedule.path']
    celery_config['beat_schedule_filename'] = schedule_path

    celery_config.update(celery_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({
        'PYRAMID_APP': app,
        'PYRAMID_ROOT': root,
        'PYRAMID_REQUEST': request,
        'PYRAMID_REGISTRY': registry,
        'PYRAMID_CLOSER': closer,
    })
292
292
293
293
def configure_celery(config, celery_settings):
    """
    Hook called from the application creation logic.

    Sets the global `rhodecode.CELERY_ENABLED` flag from the `use_celery`
    setting and, when enabled, configures the celery app so the running
    webapp can execute tasks from RhodeCode itself.
    """
    registry = config.registry

    # store some globals into rhodecode
    rhodecode.CELERY_ENABLED = str2bool(registry.settings.get('use_celery'))
    if not rhodecode.CELERY_ENABLED:
        return

    log.info('Configuring celery based on `%s` settings', celery_settings)
    # only the registry is passed here; the other pyramid objects are None
    setup_celery_app(
        app=None,
        root=None,
        request=None,
        registry=registry,
        closer=None,
        celery_settings=celery_settings,
    )
308
308
309
309
def maybe_prepare_env(req):
    """
    Extract a small subset of the WSGI environ from `req`.

    Returns a dict with the path, script name, host, server name/port and
    url scheme. `HTTP_HOST` falls back to `SERVER_NAME` when absent. If
    anything goes wrong while reading the request (for example a required
    key is missing), an empty dict is returned instead.
    """
    try:
        return {
            'PATH_INFO': req.environ['PATH_INFO'],
            'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
            'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
            'SERVER_NAME': req.environ['SERVER_NAME'],
            'SERVER_PORT': req.environ['SERVER_PORT'],
            'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
        }
    except Exception:
        # all-or-nothing: a partial environ is never returned
        return {}
325
325
326
326
class RequestContextTask(Task):
    """
    Celery task that carries request context over to the worker.

    When the task is queued, a subset of the originating request's environ
    plus the calling user's id and ip address are attached to the message
    headers under `rhodecode_proxy_data`.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """ queue the job to run (we are in web request context here) """
        from rhodecode.lib.base import get_ip_addr

        req = self.app.conf['PYRAMID_REQUEST']
        if not req:
            raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')

        log.debug('Running Task with class: %s. Request Class: %s',
                  self.__class__, req.__class__)

        # `user` is set in the web case, `rpc_user` in the api case;
        # the first one present wins, otherwise user_id stays 0
        user_id = 0
        for user_attr in ('user', 'rpc_user'):
            if hasattr(req, user_attr):
                user_id = getattr(req, user_attr).user_id
                break

        # message headers are the only channel we have to hand our data
        # over to the celery worker
        environ = maybe_prepare_env(req)
        headers = options.setdefault('headers', {})
        proxy_data = {
            'environ': environ,
            'auth_user': {
                'ip_addr': get_ip_addr(req.environ),
                'user_id': user_id
            },
        }
        headers.update({'rhodecode_proxy_data': proxy_data})

        return super().apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now