##// END OF EJS Templates
chore(config): optimized celery configuration
super-admin -
r5299:2823fa9f default
parent child Browse files
Show More
@@ -1,369 +1,371 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18 """
19 19 Celery loader, run with::
20 20
21 21 celery worker \
22 22 --task-events \
23 23 --beat \
24 24 --autoscale=20,2 \
25 25 --max-tasks-per-child 1 \
26 26 --app rhodecode.lib.celerylib.loader \
27 27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 28 --loglevel DEBUG --ini=.dev/dev.ini
29 29 """
30 30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 31 inspect_getargspec()
32 32 inspect_formatargspec()
33 33 # python3.11 inspect patches for backward compat on `paste` code
34 34
35 35 import logging
36 36 import importlib
37 37
38 38 import click
39 39 from celery import Celery
40 40 from celery import signals
41 41 from celery import Task
42 42 from celery import exceptions # noqa
43 43
44 44 import rhodecode
45 45
46 46 from rhodecode.lib.statsd_client import StatsdClient
47 47 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
48 48 from rhodecode.lib.ext_json import json
49 49 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
50 50 from rhodecode.lib.utils2 import str2bool
51 51 from rhodecode.model import meta
52 52
53 53 log = logging.getLogger('celery.rhodecode.loader')
54 54
55 55
# task modules celery is told to import (see the 'imports' config key)
imports = ['rhodecode.lib.celerylib.tasks']

try:
    # probe for the optional `rc_ee` package
    importlib.import_module('rc_ee')
except ImportError:
    # package is not importable: keep only the default task module
    pass
else:
    imports.append('rc_ee.lib.celerylib.tasks')
64 64
65 65
# Baseline celery settings. `setup_celery_app` layers the beat schedule
# path and the custom celery settings on top of these values.
# Every key appears exactly once; the values are the post-change ones.
base_celery_config = {
    'result_backend': 'rpc://',
    'result_expires': 60 * 60 * 24,
    'result_persistent': True,
    'imports': imports,
    'worker_max_tasks_per_child': 100,
    'worker_hijack_root_logger': False,
    'worker_prefetch_multiplier': 1,
    'task_serializer': 'msgpack',
    'accept_content': ['json', 'msgpack'],
    'result_serializer': 'msgpack',
    'result_accept_content': ['json', 'msgpack'],

    'broker_connection_retry_on_startup': True,
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    }
}
83 85
84 86
# `--ini` command line option; registered on the celery app as a preload option
preload_option_ini = click.Option(
    ('--ini',), help='Path to ini configuration file.')

# `--ini-var` command line option; its value is later parsed by `parse_ini_vars`
preload_option_ini_var = click.Option(
    ('--ini-var',), help='Comma separated list of key=value to pass to ini.')
94 96
95 97
def get_logger(obj):
    """
    Return the logger to use for ``obj``.

    When celery is enabled the result of ``obj.get_logger()`` is preferred.
    If celery is disabled, or that call raises, a standard logger named
    ``rhodecode.task.<class name of obj>`` is returned instead.
    """
    fallback_log = logging.getLogger(
        f'rhodecode.task.{obj.__class__.__name__}')

    if not rhodecode.CELERY_ENABLED:
        return fallback_log

    try:
        return obj.get_logger()
    except Exception:
        # best effort: any failure falls back to the plain logger
        return fallback_log
107 109
108 110
# init main celery app and register both custom preload options on it
celery_app = Celery()
_preload_options = celery_app.user_options['preload']
_preload_options.add(preload_option_ini)
_preload_options.add(preload_option_ini_var)
113 115
114 116
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    """
    Handler for the celery ``setup_logging`` signal.

    Configures logging from the .ini path stored under ``RC_INI_FILE``;
    the value is taken from ``celery_app.conf`` when the key is present
    there, and from ``celery_app.user_options`` otherwise.
    """
    ini_holder = (
        celery_app.conf if 'RC_INI_FILE' in celery_app.conf
        else celery_app.user_options
    )
    setup_logging(ini_holder['RC_INI_FILE'])
124 126
125 127
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """
    Handler for the celery ``user_preload_options`` signal.

    Stores the ini file path and the parsed ini variables on the celery app
    (both in ``conf`` and in ``user_options``) and then sets up logging from
    the ini file.

    :param options: parsed preload options; the ``ini`` and ``ini_var``
        keys are read.
    :raises SystemExit: with code ``-1`` when no ``--ini`` value was given.
    """
    ini_file = options['ini']
    ini_vars = options['ini_var']

    if ini_file is None:
        print('You must provide the --ini argument to start celery')
        # raise directly instead of calling `exit()`: that helper is only
        # injected into builtins by the `site` module
        raise SystemExit(-1)

    # kept separate from the `options` argument so the parameter is not rebound
    ini_options = None
    if ini_vars is not None:
        ini_options = parse_ini_vars(ini_vars)

    celery_app.conf['RC_INI_FILE'] = ini_file
    celery_app.user_options['RC_INI_FILE'] = ini_file

    celery_app.conf['RC_INI_OPTIONS'] = ini_options
    celery_app.user_options['RC_INI_OPTIONS'] = ini_options

    setup_logging(ini_file)
147 149
148 150
def _init_celery(app_type=''):
    """
    Bootstrap the RhodeCode pyramid application and configure the celery app.

    Reads the ini file path and ini options stored on ``celery_app.conf``,
    bootstraps the pyramid environment from them, sets up the statsd client
    and passes the resulting environment to ``setup_celery_app``.

    :param app_type: label used only in the debug log message.
    :raises EnvironmentError: when bootstrapping fails or yields no
        environment; a bootstrap exception is attached as the cause.
    """
    from rhodecode.config.middleware import get_celery_config

    log.debug('Bootstrapping RhodeCode application for %s...', app_type)

    ini_file = celery_app.conf['RC_INI_FILE']
    options = celery_app.conf['RC_INI_OPTIONS']

    env_error_msg = (
        'Failed to load pyramid ENV. '
        'Probably there is another error present that prevents from running pyramid app')

    try:
        env = bootstrap(ini_file, options=options)
    except Exception as exc:
        log.exception('Failed to bootstrap RhodeCode APP')
        # chain the original error so the root cause stays in the traceback
        raise EnvironmentError(env_error_msg) from exc

    if not env:
        raise EnvironmentError(env_error_msg)

    log.debug('Got Pyramid ENV: %s', env)

    settings = env['registry'].settings
    celery_settings = get_celery_config(settings)

    # init and bootstrap StatsdClient
    StatsdClient.setup(settings)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        celery_settings=celery_settings)
180 182
181 183
@signals.celeryd_init.connect
def on_celeryd_init(sender=None, conf=None, **kwargs):
    """
    Handler for the celery ``celeryd_init`` signal: bootstrap the
    application for a worker and mark celery as enabled.
    """
    _init_celery('celery worker')

    # This is worker code, so the global flag is forced on even when the
    # .ini file disables celery.
    rhodecode.CELERY_ENABLED = True
189 191
190 192
@signals.beat_init.connect
def on_beat_init(sender=None, conf=None, **kwargs):
    """
    Handler for the celery ``beat_init`` signal: bootstrap the application
    for the beat process.
    """
    _init_celery('celery beat')
194 196
195 197
@signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
    """
    Handler for the celery ``task_prerun`` signal.

    Calls ``ping_db()`` and, when a statsd client is available, increments
    the ``rhodecode_celery_task_total`` counter tagged with the task name.
    """
    ping_db()

    statsd = StatsdClient.statsd
    if not statsd:
        return

    # fall back to the task object itself when it has no `name` attribute
    task_label = getattr(task, 'name', task)
    statsd.incr(
        'rhodecode_celery_task_total',
        tags=[f'task:{task_label}', 'mode:async'])
207 209
208 210
@signals.task_success.connect
def task_success_signal(result, **kwargs):
    """
    Handler for the celery ``task_success`` signal: commit the session and
    call the stored pyramid closer, if one is set.
    """
    meta.Session.commit()

    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if pyramid_closer:
        pyramid_closer()
215 217
216 218
@signals.task_retry.connect
def task_retry_signal(request, reason, einfo, **kwargs):
    """
    Handler for the celery ``task_retry`` signal: remove the session and
    call the stored pyramid closer, if one is set.
    """
    meta.Session.remove()

    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if pyramid_closer:
        pyramid_closer()
224 226
225 227
@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    """
    Handler for the celery ``task_failure`` signal.

    Logs the failure, removes the session, stores the exception through
    ``store_exception``, increments the ``rhodecode_exception_total`` statsd
    counter (when a statsd client is available) and finally calls the stored
    pyramid closer, if one is set.
    """
    log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
    from rhodecode.lib.exc_tracking import store_exception

    meta.Session.remove()

    # simulate sys.exc_info()
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')

    statsd = StatsdClient.statsd
    if statsd:
        # Tag with the class of the raised exception. `einfo.__class__` is
        # the exception-info wrapper, which is the same for every failure.
        failed_cls = einfo.type
        exc_type = "{}.{}".format(failed_cls.__module__, failed_cls.__name__)
        statsd.incr('rhodecode_exception_total',
                    tags=["exc_source:celery", "type:{}".format(exc_type)])

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()
248 250
249 251
@signals.task_revoked.connect
def task_revoked_signal(request, terminated, signum, expired, **kwargs):
    """
    Handler for the celery ``task_revoked`` signal: call the stored pyramid
    closer, if one is set.
    """
    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if pyramid_closer:
        pyramid_closer()
256 258
257 259
258 260 class UNSET(object):
259 261 pass
260 262
261 263
262 264 _unset = UNSET()
263 265
264 266
def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
    """
    Update the pyramid request and/or registry stored on ``celery_app.conf``.

    Only ``request`` and ``registry`` are applied; ``app``, ``root`` and
    ``closer`` are accepted but not used by this function. An argument that
    is omitted leaves the corresponding conf key untouched.

    :param request: stored under ``PYRAMID_REQUEST`` when supplied.
    :param registry: stored under ``PYRAMID_REGISTRY`` when supplied.
    """

    def _was_given(value):
        # Defaults are the `_unset` instance, so that is what marks an
        # omitted argument. The `UNSET` class is also treated as "not
        # given", as the original check did.
        return value is not _unset and value is not UNSET

    if _was_given(request):
        celery_app.conf.update({'PYRAMID_REQUEST': request})

    if _was_given(registry):
        celery_app.conf.update({'PYRAMID_REGISTRY': registry})
273 275
def setup_celery_app(app, root, request, registry, closer, celery_settings):
    """
    Configure the global ``celery_app`` and attach the pyramid environment.

    The effective configuration is ``base_celery_config`` overlaid with the
    beat schedule path from the registry settings and then with
    ``celery_settings``.

    :param app: stored under ``PYRAMID_APP``.
    :param root: stored under ``PYRAMID_ROOT``.
    :param request: stored under ``PYRAMID_REQUEST``.
    :param registry: stored under ``PYRAMID_REGISTRY``; its ``settings`` must
        contain the ``celerybeat-schedule.path`` key.
    :param closer: stored under ``PYRAMID_CLOSER``.
    :param celery_settings: custom settings that override the base config.
    """
    log.debug('Got custom celery conf: %s', celery_settings)

    # work on a copy so the module-level defaults are never mutated
    celery_config = dict(base_celery_config)
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
    })

    celery_config.update(celery_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({
        'PYRAMID_APP': app,
        'PYRAMID_ROOT': root,
        'PYRAMID_REQUEST': request,
        'PYRAMID_REGISTRY': registry,
        'PYRAMID_CLOSER': closer,
    })
290 292
291 293
def configure_celery(config, celery_settings):
    """
    Helper called from the application creation logic. It gives connection
    info to the running webapp and allows execution of tasks from RhodeCode
    itself.

    Sets ``rhodecode.CELERY_ENABLED`` from the ``use_celery`` setting and,
    when enabled, configures the celery app with ``celery_settings``.
    """
    # store some globals into rhodecode
    use_celery = config.registry.settings.get('use_celery')
    rhodecode.CELERY_ENABLED = str2bool(use_celery)

    if not rhodecode.CELERY_ENABLED:
        return

    log.info('Configuring celery based on `%s` settings', celery_settings)
    setup_celery_app(
        app=None, root=None, request=None, registry=config.registry,
        closer=None, celery_settings=celery_settings)
306 308
307 309
def maybe_prepare_env(req):
    """
    Extract a small subset of the WSGI environ from ``req``.

    Returns a dict with the path, script name, host, server name/port and
    URL scheme. ``HTTP_HOST`` falls back to ``SERVER_NAME`` when absent.
    If anything goes wrong while reading ``req.environ`` (for example a
    required key is missing), an empty dict is returned.
    """
    try:
        wsgi_env = req.environ
        server_name = wsgi_env['SERVER_NAME']
        return {
            'PATH_INFO': wsgi_env['PATH_INFO'],
            'SCRIPT_NAME': wsgi_env['SCRIPT_NAME'],
            'HTTP_HOST': wsgi_env.get('HTTP_HOST', server_name),
            'SERVER_NAME': server_name,
            'SERVER_PORT': wsgi_env['SERVER_PORT'],
            'wsgi.url_scheme': wsgi_env['wsgi.url_scheme'],
        }
    except Exception:
        # all-or-nothing: a partial environ is never returned
        return {}
323 325
324 326
class RequestContextTask(Task):
    """
    Celery task that creates a rhodecode app instance context for the task,
    patches pyramid with the original request that created the task and
    adds the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """
        Queue the job to run (we are in web request context here).

        Adds a ``rhodecode_proxy_data`` entry to the message headers that
        carries a reduced WSGI environ plus the id and IP address of the
        requesting user, then delegates to the parent ``apply_async``.

        :raises ValueError: when no pyramid request is stored on the celery
            app configuration.
        """
        from rhodecode.lib.base import get_ip_addr

        req = self.app.conf['PYRAMID_REQUEST']
        if not req:
            raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')

        log.debug('Running Task with class: %s. Request Class: %s',
                  self.__class__, req.__class__)

        if hasattr(req, 'user'):
            # web case
            user_id = req.user.user_id
        elif hasattr(req, 'rpc_user'):
            # api case
            user_id = req.rpc_user.user_id
        else:
            user_id = 0

        # message headers are the only way to pass our data to the
        # celery worker
        environ = maybe_prepare_env(req)
        headers = options.get('headers', {})
        options['headers'] = headers
        headers.update({
            'rhodecode_proxy_data': {
                'environ': environ,
                'auth_user': {
                    'ip_addr': get_ip_addr(req.environ),
                    'user_id': user_id
                },
            }
        })

        return super().apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now