##// END OF EJS Templates
chore(config): optimized celery configuration
super-admin -
r5299:2823fa9f default
parent child Browse files
Show More
@@ -1,369 +1,371 b''
1 # Copyright (C) 2010-2023 RhodeCode GmbH
1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 """
18 """
19 Celery loader, run with::
19 Celery loader, run with::
20
20
21 celery worker \
21 celery worker \
22 --task-events \
22 --task-events \
23 --beat \
23 --beat \
24 --autoscale=20,2 \
24 --autoscale=20,2 \
25 --max-tasks-per-child 1 \
25 --max-tasks-per-child 1 \
26 --app rhodecode.lib.celerylib.loader \
26 --app rhodecode.lib.celerylib.loader \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 --loglevel DEBUG --ini=.dev/dev.ini
28 --loglevel DEBUG --ini=.dev/dev.ini
29 """
29 """
30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 inspect_getargspec()
31 inspect_getargspec()
32 inspect_formatargspec()
32 inspect_formatargspec()
33 # python3.11 inspect patches for backward compat on `paste` code
33 # python3.11 inspect patches for backward compat on `paste` code
34
34
35 import logging
35 import logging
36 import importlib
36 import importlib
37
37
38 import click
38 import click
39 from celery import Celery
39 from celery import Celery
40 from celery import signals
40 from celery import signals
41 from celery import Task
41 from celery import Task
42 from celery import exceptions # noqa
42 from celery import exceptions # noqa
43
43
44 import rhodecode
44 import rhodecode
45
45
46 from rhodecode.lib.statsd_client import StatsdClient
46 from rhodecode.lib.statsd_client import StatsdClient
47 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
47 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
48 from rhodecode.lib.ext_json import json
48 from rhodecode.lib.ext_json import json
49 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
49 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
50 from rhodecode.lib.utils2 import str2bool
50 from rhodecode.lib.utils2 import str2bool
51 from rhodecode.model import meta
51 from rhodecode.model import meta
52
52
53 log = logging.getLogger('celery.rhodecode.loader')
53 log = logging.getLogger('celery.rhodecode.loader')
54
54
55
55
56 imports = ['rhodecode.lib.celerylib.tasks']
56 imports = ['rhodecode.lib.celerylib.tasks']
57
57
58 try:
58 try:
59 # try if we have EE tasks available
59 # try if we have EE tasks available
60 importlib.import_module('rc_ee')
60 importlib.import_module('rc_ee')
61 imports.append('rc_ee.lib.celerylib.tasks')
61 imports.append('rc_ee.lib.celerylib.tasks')
62 except ImportError:
62 except ImportError:
63 pass
63 pass
64
64
65
65
66 base_celery_config = {
66 base_celery_config = {
67 'result_backend': 'rpc://',
67 'result_backend': 'rpc://',
68 'result_expires': 60 * 60 * 24,
68 'result_expires': 60 * 60 * 24,
69 'result_persistent': True,
69 'result_persistent': True,
70 'imports': imports,
70 'imports': imports,
71 'worker_max_tasks_per_child': 20,
71 'worker_max_tasks_per_child': 100,
72 'worker_hijack_root_logger': False,
73 'worker_prefetch_multiplier': 1,
72 'task_serializer': 'msgpack',
74 'task_serializer': 'msgpack',
73 'accept_content': ['json', 'msgpack'],
75 'accept_content': ['json', 'msgpack'],
74 'result_serializer': 'msgpack',
76 'result_serializer': 'msgpack',
75 'result_accept_content': ['json', 'msgpack'],
77 'result_accept_content': ['json', 'msgpack'],
76 'worker_hijack_root_logger': False,
78
77 'broker_connection_retry_on_startup': True,
79 'broker_connection_retry_on_startup': True,
78 'database_table_names': {
80 'database_table_names': {
79 'task': 'beat_taskmeta',
81 'task': 'beat_taskmeta',
80 'group': 'beat_groupmeta',
82 'group': 'beat_groupmeta',
81 }
83 }
82 }
84 }
83
85
84
86
85 preload_option_ini = click.Option(
87 preload_option_ini = click.Option(
86 ('--ini',),
88 ('--ini',),
87 help='Path to ini configuration file.'
89 help='Path to ini configuration file.'
88 )
90 )
89
91
90 preload_option_ini_var = click.Option(
92 preload_option_ini_var = click.Option(
91 ('--ini-var',),
93 ('--ini-var',),
92 help='Comma separated list of key=value to pass to ini.'
94 help='Comma separated list of key=value to pass to ini.'
93 )
95 )
94
96
95
97
96 def get_logger(obj):
98 def get_logger(obj):
97 custom_log = logging.getLogger(
99 custom_log = logging.getLogger(
98 'rhodecode.task.{}'.format(obj.__class__.__name__))
100 'rhodecode.task.{}'.format(obj.__class__.__name__))
99
101
100 if rhodecode.CELERY_ENABLED:
102 if rhodecode.CELERY_ENABLED:
101 try:
103 try:
102 custom_log = obj.get_logger()
104 custom_log = obj.get_logger()
103 except Exception:
105 except Exception:
104 pass
106 pass
105
107
106 return custom_log
108 return custom_log
107
109
108
110
109 # init main celery app
111 # init main celery app
110 celery_app = Celery()
112 celery_app = Celery()
111 celery_app.user_options['preload'].add(preload_option_ini)
113 celery_app.user_options['preload'].add(preload_option_ini)
112 celery_app.user_options['preload'].add(preload_option_ini_var)
114 celery_app.user_options['preload'].add(preload_option_ini_var)
113
115
114
116
115 @signals.setup_logging.connect
117 @signals.setup_logging.connect
116 def setup_logging_callback(**kwargs):
118 def setup_logging_callback(**kwargs):
117
119
118 if 'RC_INI_FILE' in celery_app.conf:
120 if 'RC_INI_FILE' in celery_app.conf:
119 ini_file = celery_app.conf['RC_INI_FILE']
121 ini_file = celery_app.conf['RC_INI_FILE']
120 else:
122 else:
121 ini_file = celery_app.user_options['RC_INI_FILE']
123 ini_file = celery_app.user_options['RC_INI_FILE']
122
124
123 setup_logging(ini_file)
125 setup_logging(ini_file)
124
126
125
127
126 @signals.user_preload_options.connect
128 @signals.user_preload_options.connect
127 def on_preload_parsed(options, **kwargs):
129 def on_preload_parsed(options, **kwargs):
128
130
129 ini_file = options['ini']
131 ini_file = options['ini']
130 ini_vars = options['ini_var']
132 ini_vars = options['ini_var']
131
133
132 if ini_file is None:
134 if ini_file is None:
133 print('You must provide the --ini argument to start celery')
135 print('You must provide the --ini argument to start celery')
134 exit(-1)
136 exit(-1)
135
137
136 options = None
138 options = None
137 if ini_vars is not None:
139 if ini_vars is not None:
138 options = parse_ini_vars(ini_vars)
140 options = parse_ini_vars(ini_vars)
139
141
140 celery_app.conf['RC_INI_FILE'] = ini_file
142 celery_app.conf['RC_INI_FILE'] = ini_file
141 celery_app.user_options['RC_INI_FILE'] = ini_file
143 celery_app.user_options['RC_INI_FILE'] = ini_file
142
144
143 celery_app.conf['RC_INI_OPTIONS'] = options
145 celery_app.conf['RC_INI_OPTIONS'] = options
144 celery_app.user_options['RC_INI_OPTIONS'] = options
146 celery_app.user_options['RC_INI_OPTIONS'] = options
145
147
146 setup_logging(ini_file)
148 setup_logging(ini_file)
147
149
148
150
149 def _init_celery(app_type=''):
151 def _init_celery(app_type=''):
150 from rhodecode.config.middleware import get_celery_config
152 from rhodecode.config.middleware import get_celery_config
151
153
152 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
154 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
153
155
154 ini_file = celery_app.conf['RC_INI_FILE']
156 ini_file = celery_app.conf['RC_INI_FILE']
155 options = celery_app.conf['RC_INI_OPTIONS']
157 options = celery_app.conf['RC_INI_OPTIONS']
156
158
157 env = None
159 env = None
158 try:
160 try:
159 env = bootstrap(ini_file, options=options)
161 env = bootstrap(ini_file, options=options)
160 except Exception:
162 except Exception:
161 log.exception('Failed to bootstrap RhodeCode APP')
163 log.exception('Failed to bootstrap RhodeCode APP')
162
164
163 if not env:
165 if not env:
164 raise EnvironmentError(
166 raise EnvironmentError(
165 'Failed to load pyramid ENV. '
167 'Failed to load pyramid ENV. '
166 'Probably there is another error present that prevents from running pyramid app')
168 'Probably there is another error present that prevents from running pyramid app')
167
169
168 log.debug('Got Pyramid ENV: %s', env)
170 log.debug('Got Pyramid ENV: %s', env)
169
171
170 settings = env['registry'].settings
172 settings = env['registry'].settings
171 celery_settings = get_celery_config(settings)
173 celery_settings = get_celery_config(settings)
172
174
173 # init and bootstrap StatsdClient
175 # init and bootstrap StatsdClient
174 StatsdClient.setup(settings)
176 StatsdClient.setup(settings)
175
177
176 setup_celery_app(
178 setup_celery_app(
177 app=env['app'], root=env['root'], request=env['request'],
179 app=env['app'], root=env['root'], request=env['request'],
178 registry=env['registry'], closer=env['closer'],
180 registry=env['registry'], closer=env['closer'],
179 celery_settings=celery_settings)
181 celery_settings=celery_settings)
180
182
181
183
182 @signals.celeryd_init.connect
184 @signals.celeryd_init.connect
183 def on_celeryd_init(sender=None, conf=None, **kwargs):
185 def on_celeryd_init(sender=None, conf=None, **kwargs):
184 _init_celery('celery worker')
186 _init_celery('celery worker')
185
187
186 # fix the global flag even if it's disabled via .ini file because this
188 # fix the global flag even if it's disabled via .ini file because this
187 # is a worker code that doesn't need this to be disabled.
189 # is a worker code that doesn't need this to be disabled.
188 rhodecode.CELERY_ENABLED = True
190 rhodecode.CELERY_ENABLED = True
189
191
190
192
191 @signals.beat_init.connect
193 @signals.beat_init.connect
192 def on_beat_init(sender=None, conf=None, **kwargs):
194 def on_beat_init(sender=None, conf=None, **kwargs):
193 _init_celery('celery beat')
195 _init_celery('celery beat')
194
196
195
197
196 @signals.task_prerun.connect
198 @signals.task_prerun.connect
197 def task_prerun_signal(task_id, task, args, **kwargs):
199 def task_prerun_signal(task_id, task, args, **kwargs):
198 ping_db()
200 ping_db()
199 statsd = StatsdClient.statsd
201 statsd = StatsdClient.statsd
200
202
201 if statsd:
203 if statsd:
202 task_repr = getattr(task, 'name', task)
204 task_repr = getattr(task, 'name', task)
203 statsd.incr('rhodecode_celery_task_total', tags=[
205 statsd.incr('rhodecode_celery_task_total', tags=[
204 f'task:{task_repr}',
206 f'task:{task_repr}',
205 'mode:async'
207 'mode:async'
206 ])
208 ])
207
209
208
210
209 @signals.task_success.connect
211 @signals.task_success.connect
210 def task_success_signal(result, **kwargs):
212 def task_success_signal(result, **kwargs):
211 meta.Session.commit()
213 meta.Session.commit()
212 closer = celery_app.conf['PYRAMID_CLOSER']
214 closer = celery_app.conf['PYRAMID_CLOSER']
213 if closer:
215 if closer:
214 closer()
216 closer()
215
217
216
218
217 @signals.task_retry.connect
219 @signals.task_retry.connect
218 def task_retry_signal(
220 def task_retry_signal(
219 request, reason, einfo, **kwargs):
221 request, reason, einfo, **kwargs):
220 meta.Session.remove()
222 meta.Session.remove()
221 closer = celery_app.conf['PYRAMID_CLOSER']
223 closer = celery_app.conf['PYRAMID_CLOSER']
222 if closer:
224 if closer:
223 closer()
225 closer()
224
226
225
227
226 @signals.task_failure.connect
228 @signals.task_failure.connect
227 def task_failure_signal(
229 def task_failure_signal(
228 task_id, exception, args, kwargs, traceback, einfo, **kargs):
230 task_id, exception, args, kwargs, traceback, einfo, **kargs):
229
231
230 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
232 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
231 from rhodecode.lib.exc_tracking import store_exception
233 from rhodecode.lib.exc_tracking import store_exception
232 from rhodecode.lib.statsd_client import StatsdClient
234 from rhodecode.lib.statsd_client import StatsdClient
233
235
234 meta.Session.remove()
236 meta.Session.remove()
235
237
236 # simulate sys.exc_info()
238 # simulate sys.exc_info()
237 exc_info = (einfo.type, einfo.exception, einfo.tb)
239 exc_info = (einfo.type, einfo.exception, einfo.tb)
238 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
240 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
239 statsd = StatsdClient.statsd
241 statsd = StatsdClient.statsd
240 if statsd:
242 if statsd:
241 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
243 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
242 statsd.incr('rhodecode_exception_total',
244 statsd.incr('rhodecode_exception_total',
243 tags=["exc_source:celery", "type:{}".format(exc_type)])
245 tags=["exc_source:celery", "type:{}".format(exc_type)])
244
246
245 closer = celery_app.conf['PYRAMID_CLOSER']
247 closer = celery_app.conf['PYRAMID_CLOSER']
246 if closer:
248 if closer:
247 closer()
249 closer()
248
250
249
251
250 @signals.task_revoked.connect
252 @signals.task_revoked.connect
251 def task_revoked_signal(
253 def task_revoked_signal(
252 request, terminated, signum, expired, **kwargs):
254 request, terminated, signum, expired, **kwargs):
253 closer = celery_app.conf['PYRAMID_CLOSER']
255 closer = celery_app.conf['PYRAMID_CLOSER']
254 if closer:
256 if closer:
255 closer()
257 closer()
256
258
257
259
258 class UNSET(object):
260 class UNSET(object):
259 pass
261 pass
260
262
261
263
262 _unset = UNSET()
264 _unset = UNSET()
263
265
264
266
265 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
267 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
266
268
267 if request is not UNSET:
269 if request is not UNSET:
268 celery_app.conf.update({'PYRAMID_REQUEST': request})
270 celery_app.conf.update({'PYRAMID_REQUEST': request})
269
271
270 if registry is not UNSET:
272 if registry is not UNSET:
271 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
273 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
272
274
273
275
274 def setup_celery_app(app, root, request, registry, closer, celery_settings):
276 def setup_celery_app(app, root, request, registry, closer, celery_settings):
275 log.debug('Got custom celery conf: %s', celery_settings)
277 log.debug('Got custom celery conf: %s', celery_settings)
276 celery_config = base_celery_config
278 celery_config = base_celery_config
277 celery_config.update({
279 celery_config.update({
278 # store celerybeat scheduler db where the .ini file is
280 # store celerybeat scheduler db where the .ini file is
279 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
281 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
280 })
282 })
281
283
282 celery_config.update(celery_settings)
284 celery_config.update(celery_settings)
283 celery_app.config_from_object(celery_config)
285 celery_app.config_from_object(celery_config)
284
286
285 celery_app.conf.update({'PYRAMID_APP': app})
287 celery_app.conf.update({'PYRAMID_APP': app})
286 celery_app.conf.update({'PYRAMID_ROOT': root})
288 celery_app.conf.update({'PYRAMID_ROOT': root})
287 celery_app.conf.update({'PYRAMID_REQUEST': request})
289 celery_app.conf.update({'PYRAMID_REQUEST': request})
288 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
290 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
289 celery_app.conf.update({'PYRAMID_CLOSER': closer})
291 celery_app.conf.update({'PYRAMID_CLOSER': closer})
290
292
291
293
292 def configure_celery(config, celery_settings):
294 def configure_celery(config, celery_settings):
293 """
295 """
294 Helper that is called from our application creation logic. It gives
296 Helper that is called from our application creation logic. It gives
295 connection info into running webapp and allows execution of tasks from
297 connection info into running webapp and allows execution of tasks from
296 RhodeCode itself
298 RhodeCode itself
297 """
299 """
298 # store some globals into rhodecode
300 # store some globals into rhodecode
299 rhodecode.CELERY_ENABLED = str2bool(
301 rhodecode.CELERY_ENABLED = str2bool(
300 config.registry.settings.get('use_celery'))
302 config.registry.settings.get('use_celery'))
301 if rhodecode.CELERY_ENABLED:
303 if rhodecode.CELERY_ENABLED:
302 log.info('Configuring celery based on `%s` settings', celery_settings)
304 log.info('Configuring celery based on `%s` settings', celery_settings)
303 setup_celery_app(
305 setup_celery_app(
304 app=None, root=None, request=None, registry=config.registry,
306 app=None, root=None, request=None, registry=config.registry,
305 closer=None, celery_settings=celery_settings)
307 closer=None, celery_settings=celery_settings)
306
308
307
309
308 def maybe_prepare_env(req):
310 def maybe_prepare_env(req):
309 environ = {}
311 environ = {}
310 try:
312 try:
311 environ.update({
313 environ.update({
312 'PATH_INFO': req.environ['PATH_INFO'],
314 'PATH_INFO': req.environ['PATH_INFO'],
313 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
315 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
314 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
316 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
315 'SERVER_NAME': req.environ['SERVER_NAME'],
317 'SERVER_NAME': req.environ['SERVER_NAME'],
316 'SERVER_PORT': req.environ['SERVER_PORT'],
318 'SERVER_PORT': req.environ['SERVER_PORT'],
317 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
319 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
318 })
320 })
319 except Exception:
321 except Exception:
320 pass
322 pass
321
323
322 return environ
324 return environ
323
325
324
326
325 class RequestContextTask(Task):
327 class RequestContextTask(Task):
326 """
328 """
327 This is a celery task which will create a rhodecode app instance context
329 This is a celery task which will create a rhodecode app instance context
328 for the task, patch pyramid with the original request
330 for the task, patch pyramid with the original request
329 that created the task and also add the user to the context.
331 that created the task and also add the user to the context.
330 """
332 """
331
333
332 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
334 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
333 link=None, link_error=None, shadow=None, **options):
335 link=None, link_error=None, shadow=None, **options):
334 """ queue the job to run (we are in web request context here) """
336 """ queue the job to run (we are in web request context here) """
335 from rhodecode.lib.base import get_ip_addr
337 from rhodecode.lib.base import get_ip_addr
336
338
337 req = self.app.conf['PYRAMID_REQUEST']
339 req = self.app.conf['PYRAMID_REQUEST']
338 if not req:
340 if not req:
339 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
341 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
340
342
341 log.debug('Running Task with class: %s. Request Class: %s',
343 log.debug('Running Task with class: %s. Request Class: %s',
342 self.__class__, req.__class__)
344 self.__class__, req.__class__)
343
345
344 user_id = 0
346 user_id = 0
345
347
346 # web case
348 # web case
347 if hasattr(req, 'user'):
349 if hasattr(req, 'user'):
348 user_id = req.user.user_id
350 user_id = req.user.user_id
349
351
350 # api case
352 # api case
351 elif hasattr(req, 'rpc_user'):
353 elif hasattr(req, 'rpc_user'):
352 user_id = req.rpc_user.user_id
354 user_id = req.rpc_user.user_id
353
355
354 # we hook into kwargs since it is the only way to pass our data to
356 # we hook into kwargs since it is the only way to pass our data to
355 # the celery worker
357 # the celery worker
356 environ = maybe_prepare_env(req)
358 environ = maybe_prepare_env(req)
357 options['headers'] = options.get('headers', {})
359 options['headers'] = options.get('headers', {})
358 options['headers'].update({
360 options['headers'].update({
359 'rhodecode_proxy_data': {
361 'rhodecode_proxy_data': {
360 'environ': environ,
362 'environ': environ,
361 'auth_user': {
363 'auth_user': {
362 'ip_addr': get_ip_addr(req.environ),
364 'ip_addr': get_ip_addr(req.environ),
363 'user_id': user_id
365 'user_id': user_id
364 },
366 },
365 }
367 }
366 })
368 })
367
369
368 return super(RequestContextTask, self).apply_async(
370 return super(RequestContextTask, self).apply_async(
369 args, kwargs, task_id, producer, link, link_error, shadow, **options)
371 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now