feat(celery): set msgpack as default serializer and drop json_ext as it's actually slower
super-admin
r5296:121552d9 default
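The speed claim in the commit message can be sanity-checked with a quick serializer round-trip timing. The snippet below is a minimal sketch, not part of this commit: it assumes the msgpack package is installed (required once 'msgpack' is the task serializer), uses stdlib json as a stand-in for the dropped json_ext codec, and times an arbitrary task-shaped payload, so absolute numbers will vary with the data being serialized.

import json
import timeit

import msgpack  # assumed installed; the msgpack serializer needs it anyway

# arbitrary payload, roughly shaped like serialized task args/kwargs
payload = {'args': [1, 2, 3], 'kwargs': {'repo_id': 42, 'name': 'example'}, 'retries': 0}

json_rt = timeit.timeit(lambda: json.loads(json.dumps(payload)), number=100_000)
msgpack_rt = timeit.timeit(lambda: msgpack.unpackb(msgpack.packb(payload)), number=100_000)

print(f'json round-trip:    {json_rt:.3f}s')
print(f'msgpack round-trip: {msgpack_rt:.3f}s')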
@@ -1,374 +1,369 @@
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Celery loader, run with::

    celery worker \
    --task-events \
    --beat \
    --autoscale=20,2 \
    --max-tasks-per-child 1 \
    --app rhodecode.lib.celerylib.loader \
    --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
    --loglevel DEBUG --ini=.dev/dev.ini
"""
from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
inspect_getargspec()
inspect_formatargspec()
# python3.11 inspect patches for backward compat on `paste` code

import logging
import importlib

import click
from celery import Celery
from celery import signals
from celery import Task
from celery import exceptions  # noqa
-from kombu.serialization import register

import rhodecode

from rhodecode.lib.statsd_client import StatsdClient
from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
from rhodecode.lib.ext_json import json
from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
from rhodecode.lib.utils2 import str2bool
from rhodecode.model import meta

-
-register('json_ext', json.dumps, json.loads,
-         content_type='application/x-json-ext',
-         content_encoding='utf-8')
-
log = logging.getLogger('celery.rhodecode.loader')


imports = ['rhodecode.lib.celerylib.tasks']

try:
    # try if we have EE tasks available
    importlib.import_module('rc_ee')
    imports.append('rc_ee.lib.celerylib.tasks')
except ImportError:
    pass


base_celery_config = {
    'result_backend': 'rpc://',
    'result_expires': 60 * 60 * 24,
    'result_persistent': True,
    'imports': imports,
    'worker_max_tasks_per_child': 20,
-    'accept_content': ['json_ext', 'json'],
-    'task_serializer': 'json_ext',
-    'result_serializer': 'json_ext',
+    'task_serializer': 'msgpack',
+    'accept_content': ['json', 'msgpack'],
+    'result_serializer': 'msgpack',
+    'result_accept_content': ['json', 'msgpack'],
    'worker_hijack_root_logger': False,
    'broker_connection_retry_on_startup': True,
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    }
}


preload_option_ini = click.Option(
    ('--ini',),
    help='Path to ini configuration file.'
)

preload_option_ini_var = click.Option(
    ('--ini-var',),
    help='Comma separated list of key=value to pass to ini.'
)


def get_logger(obj):
    custom_log = logging.getLogger(
        'rhodecode.task.{}'.format(obj.__class__.__name__))

    if rhodecode.CELERY_ENABLED:
        try:
            custom_log = obj.get_logger()
        except Exception:
            pass

    return custom_log


# init main celery app
celery_app = Celery()
celery_app.user_options['preload'].add(preload_option_ini)
celery_app.user_options['preload'].add(preload_option_ini_var)


@signals.setup_logging.connect
def setup_logging_callback(**kwargs):

    if 'RC_INI_FILE' in celery_app.conf:
        ini_file = celery_app.conf['RC_INI_FILE']
    else:
        ini_file = celery_app.user_options['RC_INI_FILE']

    setup_logging(ini_file)


@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):

    ini_file = options['ini']
    ini_vars = options['ini_var']

    if ini_file is None:
        print('You must provide the --ini argument to start celery')
        exit(-1)

    options = None
    if ini_vars is not None:
        options = parse_ini_vars(ini_vars)

    celery_app.conf['RC_INI_FILE'] = ini_file
    celery_app.user_options['RC_INI_FILE'] = ini_file

    celery_app.conf['RC_INI_OPTIONS'] = options
    celery_app.user_options['RC_INI_OPTIONS'] = options

    setup_logging(ini_file)


def _init_celery(app_type=''):
    from rhodecode.config.middleware import get_celery_config

    log.debug('Bootstrapping RhodeCode application for %s...', app_type)

    ini_file = celery_app.conf['RC_INI_FILE']
    options = celery_app.conf['RC_INI_OPTIONS']

    env = None
    try:
        env = bootstrap(ini_file, options=options)
    except Exception:
        log.exception('Failed to bootstrap RhodeCode APP')

    if not env:
        raise EnvironmentError(
            'Failed to load pyramid ENV. '
            'Probably there is another error present that prevents from running pyramid app')

    log.debug('Got Pyramid ENV: %s', env)

    settings = env['registry'].settings
    celery_settings = get_celery_config(settings)

    # init and bootstrap StatsdClient
    StatsdClient.setup(settings)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        celery_settings=celery_settings)


@signals.celeryd_init.connect
def on_celeryd_init(sender=None, conf=None, **kwargs):
    _init_celery('celery worker')

    # fix the global flag even if it's disabled via .ini file because this
    # is a worker code that doesn't need this to be disabled.
    rhodecode.CELERY_ENABLED = True


@signals.beat_init.connect
def on_beat_init(sender=None, conf=None, **kwargs):
    _init_celery('celery beat')


@signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
    ping_db()
    statsd = StatsdClient.statsd

    if statsd:
        task_repr = getattr(task, 'name', task)
        statsd.incr('rhodecode_celery_task_total', tags=[
            f'task:{task_repr}',
            'mode:async'
        ])


@signals.task_success.connect
def task_success_signal(result, **kwargs):
    meta.Session.commit()
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_retry.connect
def task_retry_signal(
        request, reason, einfo, **kwargs):
    meta.Session.remove()
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):

    log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
    from rhodecode.lib.exc_tracking import store_exception
    from rhodecode.lib.statsd_client import StatsdClient

    meta.Session.remove()

    # simulate sys.exc_info()
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
    statsd = StatsdClient.statsd
    if statsd:
        exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
        statsd.incr('rhodecode_exception_total',
                    tags=["exc_source:celery", "type:{}".format(exc_type)])

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_revoked.connect
def task_revoked_signal(
        request, terminated, signum, expired, **kwargs):
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


class UNSET(object):
    pass


_unset = UNSET()


def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):

    if request is not UNSET:
        celery_app.conf.update({'PYRAMID_REQUEST': request})

    if registry is not UNSET:
        celery_app.conf.update({'PYRAMID_REGISTRY': registry})


def setup_celery_app(app, root, request, registry, closer, celery_settings):
    log.debug('Got custom celery conf: %s', celery_settings)
    celery_config = base_celery_config
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
    })

    celery_config.update(celery_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({'PYRAMID_APP': app})
    celery_app.conf.update({'PYRAMID_ROOT': root})
    celery_app.conf.update({'PYRAMID_REQUEST': request})
    celery_app.conf.update({'PYRAMID_REGISTRY': registry})
    celery_app.conf.update({'PYRAMID_CLOSER': closer})


def configure_celery(config, celery_settings):
    """
    Helper that is called from our application creation logic. It gives
    connection info into running webapp and allows execution of tasks from
    RhodeCode itself
    """
    # store some globals into rhodecode
    rhodecode.CELERY_ENABLED = str2bool(
        config.registry.settings.get('use_celery'))
    if rhodecode.CELERY_ENABLED:
        log.info('Configuring celery based on `%s` settings', celery_settings)
        setup_celery_app(
            app=None, root=None, request=None, registry=config.registry,
            closer=None, celery_settings=celery_settings)


def maybe_prepare_env(req):
    environ = {}
    try:
        environ.update({
            'PATH_INFO': req.environ['PATH_INFO'],
            'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
            'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
            'SERVER_NAME': req.environ['SERVER_NAME'],
            'SERVER_PORT': req.environ['SERVER_PORT'],
            'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
        })
    except Exception:
        pass

    return environ


class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request
    that created the task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """ queue the job to run (we are in web request context here) """
        from rhodecode.lib.base import get_ip_addr

        req = self.app.conf['PYRAMID_REQUEST']
        if not req:
            raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')

        log.debug('Running Task with class: %s. Request Class: %s',
                  self.__class__, req.__class__)

        user_id = 0

        # web case
        if hasattr(req, 'user'):
            user_id = req.user.user_id

        # api case
        elif hasattr(req, 'rpc_user'):
            user_id = req.rpc_user.user_id

        # we hook into kwargs since it is the only way to pass our data to
        # the celery worker
        environ = maybe_prepare_env(req)
        options['headers'] = options.get('headers', {})
        options['headers'].update({
            'rhodecode_proxy_data': {
                'environ': environ,
                'auth_user': {
                    'ip_addr': get_ip_addr(req.environ),
                    'user_id': user_id
                },
            }
        })

        return super(RequestContextTask, self).apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)
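For context on how the class above is consumed: a task opts into the request-forwarding behaviour by declaring RequestContextTask as its base class, and with this commit its payload is serialized with msgpack by default. A minimal sketch; the task name and body are illustrative, not taken from this codebase:

from rhodecode.lib.celerylib.loader import celery_app, RequestContextTask

@celery_app.task(base=RequestContextTask, name='example.ping')
def ping(value):
    # executes on the worker; the 'rhodecode_proxy_data' header added in
    # apply_async() above carries the originating request environ and user id
    return value

# called from web-request context, where PYRAMID_REQUEST has been set:
# ping.apply_async(args=('pong',))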
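One operational note on dropping the json_ext codec: messages that an older producer already queued as application/x-json-ext can no longer be decoded by a worker running this revision, since that content type is neither registered nor listed in accept_content, and kombu rejects content types outside that list. If old and new nodes have to coexist briefly (for example during a rolling upgrade), the removed registration can be kept as a transitional measure. A sketch based on the lines removed above, not part of this commit:

from kombu.serialization import register
from rhodecode.lib.ext_json import json

# transitional: keep decoding legacy json_ext payloads while new ones use msgpack
register('json_ext', json.dumps, json.loads,
         content_type='application/x-json-ext',
         content_encoding='utf-8')

base_celery_config['accept_content'] = ['json', 'msgpack', 'json_ext']
base_celery_config['result_accept_content'] = ['json', 'msgpack', 'json_ext']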