##// END OF EJS Templates
fix(celery): fixed celery logging error about the missing keys for ini options
super-admin -
r5244:5dc4258b default
parent child Browse files
Show More
@@ -1,364 +1,373 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18 """
19 19 Celery loader, run with::
20 20
21 21 celery worker \
22 22 --task-events \
23 23 --beat \
24 24 --autoscale=20,2 \
25 25 --max-tasks-per-child 1 \
26 26 --app rhodecode.lib.celerylib.loader \
27 27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 28 --loglevel DEBUG --ini=.dev/dev.ini
29 29 """
30 30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 31 inspect_getargspec()
32 32 inspect_formatargspec()
33 33 # python3.11 inspect patches for backward compat on `paste` code
34 34
35 35 import logging
36 36 import importlib
37 37
38 38 import click
39 39 from celery import Celery
40 40 from celery import signals
41 41 from celery import Task
42 42 from celery import exceptions # noqa
43 43 from kombu.serialization import register
44 44
45 45 import rhodecode
46 46
47 47 from rhodecode.lib.statsd_client import StatsdClient
48 48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
49 49 from rhodecode.lib.ext_json import json
50 50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
51 51 from rhodecode.lib.utils2 import str2bool
52 52 from rhodecode.model import meta
53 53
54 54
55 55 register('json_ext', json.dumps, json.loads,
56 56 content_type='application/x-json-ext',
57 57 content_encoding='utf-8')
58 58
59 59 log = logging.getLogger('celery.rhodecode.loader')
60 60
61 61
62 62 imports = ['rhodecode.lib.celerylib.tasks']
63 63
64 64 try:
65 65 # try if we have EE tasks available
66 66 importlib.import_module('rc_ee')
67 67 imports.append('rc_ee.lib.celerylib.tasks')
68 68 except ImportError:
69 69 pass
70 70
71 71
72 72 base_celery_config = {
73 73 'result_backend': 'rpc://',
74 74 'result_expires': 60 * 60 * 24,
75 75 'result_persistent': True,
76 76 'imports': imports,
77 77 'worker_max_tasks_per_child': 20,
78 78 'accept_content': ['json_ext', 'json'],
79 79 'task_serializer': 'json_ext',
80 80 'result_serializer': 'json_ext',
81 81 'worker_hijack_root_logger': False,
82 82 'database_table_names': {
83 83 'task': 'beat_taskmeta',
84 84 'group': 'beat_groupmeta',
85 85 }
86 86 }
87 87
88 88
89 89 preload_option_ini = click.Option(
90 90 ('--ini',),
91 91 help='Path to ini configuration file.'
92 92 )
93 93
94 94 preload_option_ini_var = click.Option(
95 95 ('--ini-var',),
96 96 help='Comma separated list of key=value to pass to ini.'
97 97 )
98 98
99 99
100 100 def get_logger(obj):
101 101 custom_log = logging.getLogger(
102 102 'rhodecode.task.{}'.format(obj.__class__.__name__))
103 103
104 104 if rhodecode.CELERY_ENABLED:
105 105 try:
106 106 custom_log = obj.get_logger()
107 107 except Exception:
108 108 pass
109 109
110 110 return custom_log
111 111
112 112
113 113 # init main celery app
114 114 celery_app = Celery()
115 115 celery_app.user_options['preload'].add(preload_option_ini)
116 116 celery_app.user_options['preload'].add(preload_option_ini_var)
117 117
118 118
119 119 @signals.setup_logging.connect
120 120 def setup_logging_callback(**kwargs):
121
122 if 'RC_INI_FILE' in celery_app.conf:
121 123 ini_file = celery_app.conf['RC_INI_FILE']
124 else:
125 ini_file = celery_app.user_options['RC_INI_FILE']
126
122 127 setup_logging(ini_file)
123 128
124 129
125 130 @signals.user_preload_options.connect
126 131 def on_preload_parsed(options, **kwargs):
127 132
128 133 ini_file = options['ini']
129 134 ini_vars = options['ini_var']
130 135
131 136 if ini_file is None:
132 137 print('You must provide the --ini argument to start celery')
133 138 exit(-1)
134 139
135 140 options = None
136 141 if ini_vars is not None:
137 142 options = parse_ini_vars(ini_vars)
138 143
139 144 celery_app.conf['RC_INI_FILE'] = ini_file
145 celery_app.user_options['RC_INI_FILE'] = ini_file
146
140 147 celery_app.conf['RC_INI_OPTIONS'] = options
148 celery_app.user_options['RC_INI_OPTIONS'] = options
149
141 150 setup_logging(ini_file)
142 151
143 152
144 153 def _init_celery(app_type=''):
145 154 from rhodecode.config.middleware import get_celery_config
146 155
147 156 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
148 157
149 158 ini_file = celery_app.conf['RC_INI_FILE']
150 159 options = celery_app.conf['RC_INI_OPTIONS']
151 160
152 161 env = None
153 162 try:
154 163 env = bootstrap(ini_file, options=options)
155 164 except Exception:
156 165 log.exception('Failed to bootstrap RhodeCode APP')
157 166
158 167 if not env:
159 168 raise EnvironmentError(
160 169 'Failed to load pyramid ENV. '
161 170 'Probably there is another error present that prevents from running pyramid app')
162 171
163 172 log.debug('Got Pyramid ENV: %s', env)
164 173
165 174 settings = env['registry'].settings
166 175 celery_settings = get_celery_config(settings)
167 176
168 177 # init and bootstrap StatsdClient
169 178 StatsdClient.setup(settings)
170 179
171 180 setup_celery_app(
172 181 app=env['app'], root=env['root'], request=env['request'],
173 182 registry=env['registry'], closer=env['closer'],
174 183 celery_settings=celery_settings)
175 184
176 185
177 186 @signals.celeryd_init.connect
178 187 def on_celeryd_init(sender=None, conf=None, **kwargs):
179 188 _init_celery('celery worker')
180 189
181 190 # fix the global flag even if it's disabled via .ini file because this
182 191 # is a worker code that doesn't need this to be disabled.
183 192 rhodecode.CELERY_ENABLED = True
184 193
185 194
186 195 @signals.beat_init.connect
187 196 def on_beat_init(sender=None, conf=None, **kwargs):
188 197 _init_celery('celery beat')
189 198
190 199
191 200 @signals.task_prerun.connect
192 201 def task_prerun_signal(task_id, task, args, **kwargs):
193 202 ping_db()
194 203 statsd = StatsdClient.statsd
195 204
196 205 if statsd:
197 206 task_repr = getattr(task, 'name', task)
198 207 statsd.incr('rhodecode_celery_task_total', tags=[
199 208 f'task:{task_repr}',
200 209 'mode:async'
201 210 ])
202 211
203 212
204 213 @signals.task_success.connect
205 214 def task_success_signal(result, **kwargs):
206 215 meta.Session.commit()
207 216 closer = celery_app.conf['PYRAMID_CLOSER']
208 217 if closer:
209 218 closer()
210 219
211 220
212 221 @signals.task_retry.connect
213 222 def task_retry_signal(
214 223 request, reason, einfo, **kwargs):
215 224 meta.Session.remove()
216 225 closer = celery_app.conf['PYRAMID_CLOSER']
217 226 if closer:
218 227 closer()
219 228
220 229
221 230 @signals.task_failure.connect
222 231 def task_failure_signal(
223 232 task_id, exception, args, kwargs, traceback, einfo, **kargs):
224 233
225 234 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
226 235 from rhodecode.lib.exc_tracking import store_exception
227 236 from rhodecode.lib.statsd_client import StatsdClient
228 237
229 238 meta.Session.remove()
230 239
231 240 # simulate sys.exc_info()
232 241 exc_info = (einfo.type, einfo.exception, einfo.tb)
233 242 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
234 243 statsd = StatsdClient.statsd
235 244 if statsd:
236 245 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
237 246 statsd.incr('rhodecode_exception_total',
238 247 tags=["exc_source:celery", "type:{}".format(exc_type)])
239 248
240 249 closer = celery_app.conf['PYRAMID_CLOSER']
241 250 if closer:
242 251 closer()
243 252
244 253
245 254 @signals.task_revoked.connect
246 255 def task_revoked_signal(
247 256 request, terminated, signum, expired, **kwargs):
248 257 closer = celery_app.conf['PYRAMID_CLOSER']
249 258 if closer:
250 259 closer()
251 260
252 261
253 262 class UNSET(object):
254 263 pass
255 264
256 265
257 266 _unset = UNSET()
258 267
259 268
260 269 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
261 270
262 271 if request is not UNSET:
263 272 celery_app.conf.update({'PYRAMID_REQUEST': request})
264 273
265 274 if registry is not UNSET:
266 275 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
267 276
268 277
269 278 def setup_celery_app(app, root, request, registry, closer, celery_settings):
270 279 log.debug('Got custom celery conf: %s', celery_settings)
271 280 celery_config = base_celery_config
272 281 celery_config.update({
273 282 # store celerybeat scheduler db where the .ini file is
274 283 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
275 284 })
276 285
277 286 celery_config.update(celery_settings)
278 287 celery_app.config_from_object(celery_config)
279 288
280 289 celery_app.conf.update({'PYRAMID_APP': app})
281 290 celery_app.conf.update({'PYRAMID_ROOT': root})
282 291 celery_app.conf.update({'PYRAMID_REQUEST': request})
283 292 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
284 293 celery_app.conf.update({'PYRAMID_CLOSER': closer})
285 294
286 295
287 296 def configure_celery(config, celery_settings):
288 297 """
289 298 Helper that is called from our application creation logic. It gives
290 299 connection info into running webapp and allows execution of tasks from
291 300 RhodeCode itself
292 301 """
293 302 # store some globals into rhodecode
294 303 rhodecode.CELERY_ENABLED = str2bool(
295 304 config.registry.settings.get('use_celery'))
296 305 if rhodecode.CELERY_ENABLED:
297 306 log.info('Configuring celery based on `%s` settings', celery_settings)
298 307 setup_celery_app(
299 308 app=None, root=None, request=None, registry=config.registry,
300 309 closer=None, celery_settings=celery_settings)
301 310
302 311
303 312 def maybe_prepare_env(req):
304 313 environ = {}
305 314 try:
306 315 environ.update({
307 316 'PATH_INFO': req.environ['PATH_INFO'],
308 317 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
309 318 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
310 319 'SERVER_NAME': req.environ['SERVER_NAME'],
311 320 'SERVER_PORT': req.environ['SERVER_PORT'],
312 321 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
313 322 })
314 323 except Exception:
315 324 pass
316 325
317 326 return environ
318 327
319 328
320 329 class RequestContextTask(Task):
321 330 """
322 331 This is a celery task which will create a rhodecode app instance context
323 332 for the task, patch pyramid with the original request
324 333 that created the task and also add the user to the context.
325 334 """
326 335
327 336 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
328 337 link=None, link_error=None, shadow=None, **options):
329 338 """ queue the job to run (we are in web request context here) """
330 339 from rhodecode.lib.base import get_ip_addr
331 340
332 341 req = self.app.conf['PYRAMID_REQUEST']
333 342 if not req:
334 343 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
335 344
336 345 log.debug('Running Task with class: %s. Request Class: %s',
337 346 self.__class__, req.__class__)
338 347
339 348 user_id = 0
340 349
341 350 # web case
342 351 if hasattr(req, 'user'):
343 352 user_id = req.user.user_id
344 353
345 354 # api case
346 355 elif hasattr(req, 'rpc_user'):
347 356 user_id = req.rpc_user.user_id
348 357
349 358 # we hook into kwargs since it is the only way to pass our data to
350 359 # the celery worker
351 360 environ = maybe_prepare_env(req)
352 361 options['headers'] = options.get('headers', {})
353 362 options['headers'].update({
354 363 'rhodecode_proxy_data': {
355 364 'environ': environ,
356 365 'auth_user': {
357 366 'ip_addr': get_ip_addr(req.environ),
358 367 'user_id': user_id
359 368 },
360 369 }
361 370 })
362 371
363 372 return super(RequestContextTask, self).apply_async(
364 373 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now