##// END OF EJS Templates
fix(celery): fixed celery logging error about the missing keys for ini options
super-admin -
r5244:5dc4258b default
parent child Browse files
Show More
@@ -1,364 +1,373 b''
1 # Copyright (C) 2010-2023 RhodeCode GmbH
1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 """
18 """
19 Celery loader, run with::
19 Celery loader, run with::
20
20
21 celery worker \
21 celery worker \
22 --task-events \
22 --task-events \
23 --beat \
23 --beat \
24 --autoscale=20,2 \
24 --autoscale=20,2 \
25 --max-tasks-per-child 1 \
25 --max-tasks-per-child 1 \
26 --app rhodecode.lib.celerylib.loader \
26 --app rhodecode.lib.celerylib.loader \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 --loglevel DEBUG --ini=.dev/dev.ini
28 --loglevel DEBUG --ini=.dev/dev.ini
29 """
29 """
30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 inspect_getargspec()
31 inspect_getargspec()
32 inspect_formatargspec()
32 inspect_formatargspec()
33 # python3.11 inspect patches for backward compat on `paste` code
33 # python3.11 inspect patches for backward compat on `paste` code
34
34
35 import logging
35 import logging
36 import importlib
36 import importlib
37
37
38 import click
38 import click
39 from celery import Celery
39 from celery import Celery
40 from celery import signals
40 from celery import signals
41 from celery import Task
41 from celery import Task
42 from celery import exceptions # noqa
42 from celery import exceptions # noqa
43 from kombu.serialization import register
43 from kombu.serialization import register
44
44
45 import rhodecode
45 import rhodecode
46
46
47 from rhodecode.lib.statsd_client import StatsdClient
47 from rhodecode.lib.statsd_client import StatsdClient
48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
49 from rhodecode.lib.ext_json import json
49 from rhodecode.lib.ext_json import json
50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
51 from rhodecode.lib.utils2 import str2bool
51 from rhodecode.lib.utils2 import str2bool
52 from rhodecode.model import meta
52 from rhodecode.model import meta
53
53
54
54
55 register('json_ext', json.dumps, json.loads,
55 register('json_ext', json.dumps, json.loads,
56 content_type='application/x-json-ext',
56 content_type='application/x-json-ext',
57 content_encoding='utf-8')
57 content_encoding='utf-8')
58
58
59 log = logging.getLogger('celery.rhodecode.loader')
59 log = logging.getLogger('celery.rhodecode.loader')
60
60
61
61
62 imports = ['rhodecode.lib.celerylib.tasks']
62 imports = ['rhodecode.lib.celerylib.tasks']
63
63
64 try:
64 try:
65 # try if we have EE tasks available
65 # try if we have EE tasks available
66 importlib.import_module('rc_ee')
66 importlib.import_module('rc_ee')
67 imports.append('rc_ee.lib.celerylib.tasks')
67 imports.append('rc_ee.lib.celerylib.tasks')
68 except ImportError:
68 except ImportError:
69 pass
69 pass
70
70
71
71
72 base_celery_config = {
72 base_celery_config = {
73 'result_backend': 'rpc://',
73 'result_backend': 'rpc://',
74 'result_expires': 60 * 60 * 24,
74 'result_expires': 60 * 60 * 24,
75 'result_persistent': True,
75 'result_persistent': True,
76 'imports': imports,
76 'imports': imports,
77 'worker_max_tasks_per_child': 20,
77 'worker_max_tasks_per_child': 20,
78 'accept_content': ['json_ext', 'json'],
78 'accept_content': ['json_ext', 'json'],
79 'task_serializer': 'json_ext',
79 'task_serializer': 'json_ext',
80 'result_serializer': 'json_ext',
80 'result_serializer': 'json_ext',
81 'worker_hijack_root_logger': False,
81 'worker_hijack_root_logger': False,
82 'database_table_names': {
82 'database_table_names': {
83 'task': 'beat_taskmeta',
83 'task': 'beat_taskmeta',
84 'group': 'beat_groupmeta',
84 'group': 'beat_groupmeta',
85 }
85 }
86 }
86 }
87
87
88
88
89 preload_option_ini = click.Option(
89 preload_option_ini = click.Option(
90 ('--ini',),
90 ('--ini',),
91 help='Path to ini configuration file.'
91 help='Path to ini configuration file.'
92 )
92 )
93
93
94 preload_option_ini_var = click.Option(
94 preload_option_ini_var = click.Option(
95 ('--ini-var',),
95 ('--ini-var',),
96 help='Comma separated list of key=value to pass to ini.'
96 help='Comma separated list of key=value to pass to ini.'
97 )
97 )
98
98
99
99
100 def get_logger(obj):
100 def get_logger(obj):
101 custom_log = logging.getLogger(
101 custom_log = logging.getLogger(
102 'rhodecode.task.{}'.format(obj.__class__.__name__))
102 'rhodecode.task.{}'.format(obj.__class__.__name__))
103
103
104 if rhodecode.CELERY_ENABLED:
104 if rhodecode.CELERY_ENABLED:
105 try:
105 try:
106 custom_log = obj.get_logger()
106 custom_log = obj.get_logger()
107 except Exception:
107 except Exception:
108 pass
108 pass
109
109
110 return custom_log
110 return custom_log
111
111
112
112
113 # init main celery app
113 # init main celery app
114 celery_app = Celery()
114 celery_app = Celery()
115 celery_app.user_options['preload'].add(preload_option_ini)
115 celery_app.user_options['preload'].add(preload_option_ini)
116 celery_app.user_options['preload'].add(preload_option_ini_var)
116 celery_app.user_options['preload'].add(preload_option_ini_var)
117
117
118
118
119 @signals.setup_logging.connect
119 @signals.setup_logging.connect
120 def setup_logging_callback(**kwargs):
120 def setup_logging_callback(**kwargs):
121 ini_file = celery_app.conf['RC_INI_FILE']
121
122 if 'RC_INI_FILE' in celery_app.conf:
123 ini_file = celery_app.conf['RC_INI_FILE']
124 else:
125 ini_file = celery_app.user_options['RC_INI_FILE']
126
122 setup_logging(ini_file)
127 setup_logging(ini_file)
123
128
124
129
125 @signals.user_preload_options.connect
130 @signals.user_preload_options.connect
126 def on_preload_parsed(options, **kwargs):
131 def on_preload_parsed(options, **kwargs):
127
132
128 ini_file = options['ini']
133 ini_file = options['ini']
129 ini_vars = options['ini_var']
134 ini_vars = options['ini_var']
130
135
131 if ini_file is None:
136 if ini_file is None:
132 print('You must provide the --ini argument to start celery')
137 print('You must provide the --ini argument to start celery')
133 exit(-1)
138 exit(-1)
134
139
135 options = None
140 options = None
136 if ini_vars is not None:
141 if ini_vars is not None:
137 options = parse_ini_vars(ini_vars)
142 options = parse_ini_vars(ini_vars)
138
143
139 celery_app.conf['RC_INI_FILE'] = ini_file
144 celery_app.conf['RC_INI_FILE'] = ini_file
145 celery_app.user_options['RC_INI_FILE'] = ini_file
146
140 celery_app.conf['RC_INI_OPTIONS'] = options
147 celery_app.conf['RC_INI_OPTIONS'] = options
148 celery_app.user_options['RC_INI_OPTIONS'] = options
149
141 setup_logging(ini_file)
150 setup_logging(ini_file)
142
151
143
152
144 def _init_celery(app_type=''):
153 def _init_celery(app_type=''):
145 from rhodecode.config.middleware import get_celery_config
154 from rhodecode.config.middleware import get_celery_config
146
155
147 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
156 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
148
157
149 ini_file = celery_app.conf['RC_INI_FILE']
158 ini_file = celery_app.conf['RC_INI_FILE']
150 options = celery_app.conf['RC_INI_OPTIONS']
159 options = celery_app.conf['RC_INI_OPTIONS']
151
160
152 env = None
161 env = None
153 try:
162 try:
154 env = bootstrap(ini_file, options=options)
163 env = bootstrap(ini_file, options=options)
155 except Exception:
164 except Exception:
156 log.exception('Failed to bootstrap RhodeCode APP')
165 log.exception('Failed to bootstrap RhodeCode APP')
157
166
158 if not env:
167 if not env:
159 raise EnvironmentError(
168 raise EnvironmentError(
160 'Failed to load pyramid ENV. '
169 'Failed to load pyramid ENV. '
161 'Probably there is another error present that prevents from running pyramid app')
170 'Probably there is another error present that prevents from running pyramid app')
162
171
163 log.debug('Got Pyramid ENV: %s', env)
172 log.debug('Got Pyramid ENV: %s', env)
164
173
165 settings = env['registry'].settings
174 settings = env['registry'].settings
166 celery_settings = get_celery_config(settings)
175 celery_settings = get_celery_config(settings)
167
176
168 # init and bootstrap StatsdClient
177 # init and bootstrap StatsdClient
169 StatsdClient.setup(settings)
178 StatsdClient.setup(settings)
170
179
171 setup_celery_app(
180 setup_celery_app(
172 app=env['app'], root=env['root'], request=env['request'],
181 app=env['app'], root=env['root'], request=env['request'],
173 registry=env['registry'], closer=env['closer'],
182 registry=env['registry'], closer=env['closer'],
174 celery_settings=celery_settings)
183 celery_settings=celery_settings)
175
184
176
185
177 @signals.celeryd_init.connect
186 @signals.celeryd_init.connect
178 def on_celeryd_init(sender=None, conf=None, **kwargs):
187 def on_celeryd_init(sender=None, conf=None, **kwargs):
179 _init_celery('celery worker')
188 _init_celery('celery worker')
180
189
181 # fix the global flag even if it's disabled via .ini file because this
190 # fix the global flag even if it's disabled via .ini file because this
182 # is a worker code that doesn't need this to be disabled.
191 # is a worker code that doesn't need this to be disabled.
183 rhodecode.CELERY_ENABLED = True
192 rhodecode.CELERY_ENABLED = True
184
193
185
194
186 @signals.beat_init.connect
195 @signals.beat_init.connect
187 def on_beat_init(sender=None, conf=None, **kwargs):
196 def on_beat_init(sender=None, conf=None, **kwargs):
188 _init_celery('celery beat')
197 _init_celery('celery beat')
189
198
190
199
191 @signals.task_prerun.connect
200 @signals.task_prerun.connect
192 def task_prerun_signal(task_id, task, args, **kwargs):
201 def task_prerun_signal(task_id, task, args, **kwargs):
193 ping_db()
202 ping_db()
194 statsd = StatsdClient.statsd
203 statsd = StatsdClient.statsd
195
204
196 if statsd:
205 if statsd:
197 task_repr = getattr(task, 'name', task)
206 task_repr = getattr(task, 'name', task)
198 statsd.incr('rhodecode_celery_task_total', tags=[
207 statsd.incr('rhodecode_celery_task_total', tags=[
199 f'task:{task_repr}',
208 f'task:{task_repr}',
200 'mode:async'
209 'mode:async'
201 ])
210 ])
202
211
203
212
204 @signals.task_success.connect
213 @signals.task_success.connect
205 def task_success_signal(result, **kwargs):
214 def task_success_signal(result, **kwargs):
206 meta.Session.commit()
215 meta.Session.commit()
207 closer = celery_app.conf['PYRAMID_CLOSER']
216 closer = celery_app.conf['PYRAMID_CLOSER']
208 if closer:
217 if closer:
209 closer()
218 closer()
210
219
211
220
212 @signals.task_retry.connect
221 @signals.task_retry.connect
213 def task_retry_signal(
222 def task_retry_signal(
214 request, reason, einfo, **kwargs):
223 request, reason, einfo, **kwargs):
215 meta.Session.remove()
224 meta.Session.remove()
216 closer = celery_app.conf['PYRAMID_CLOSER']
225 closer = celery_app.conf['PYRAMID_CLOSER']
217 if closer:
226 if closer:
218 closer()
227 closer()
219
228
220
229
221 @signals.task_failure.connect
230 @signals.task_failure.connect
222 def task_failure_signal(
231 def task_failure_signal(
223 task_id, exception, args, kwargs, traceback, einfo, **kargs):
232 task_id, exception, args, kwargs, traceback, einfo, **kargs):
224
233
225 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
234 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
226 from rhodecode.lib.exc_tracking import store_exception
235 from rhodecode.lib.exc_tracking import store_exception
227 from rhodecode.lib.statsd_client import StatsdClient
236 from rhodecode.lib.statsd_client import StatsdClient
228
237
229 meta.Session.remove()
238 meta.Session.remove()
230
239
231 # simulate sys.exc_info()
240 # simulate sys.exc_info()
232 exc_info = (einfo.type, einfo.exception, einfo.tb)
241 exc_info = (einfo.type, einfo.exception, einfo.tb)
233 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
242 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
234 statsd = StatsdClient.statsd
243 statsd = StatsdClient.statsd
235 if statsd:
244 if statsd:
236 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
245 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
237 statsd.incr('rhodecode_exception_total',
246 statsd.incr('rhodecode_exception_total',
238 tags=["exc_source:celery", "type:{}".format(exc_type)])
247 tags=["exc_source:celery", "type:{}".format(exc_type)])
239
248
240 closer = celery_app.conf['PYRAMID_CLOSER']
249 closer = celery_app.conf['PYRAMID_CLOSER']
241 if closer:
250 if closer:
242 closer()
251 closer()
243
252
244
253
245 @signals.task_revoked.connect
254 @signals.task_revoked.connect
246 def task_revoked_signal(
255 def task_revoked_signal(
247 request, terminated, signum, expired, **kwargs):
256 request, terminated, signum, expired, **kwargs):
248 closer = celery_app.conf['PYRAMID_CLOSER']
257 closer = celery_app.conf['PYRAMID_CLOSER']
249 if closer:
258 if closer:
250 closer()
259 closer()
251
260
252
261
253 class UNSET(object):
262 class UNSET(object):
254 pass
263 pass
255
264
256
265
257 _unset = UNSET()
266 _unset = UNSET()
258
267
259
268
260 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
269 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
261
270
262 if request is not UNSET:
271 if request is not UNSET:
263 celery_app.conf.update({'PYRAMID_REQUEST': request})
272 celery_app.conf.update({'PYRAMID_REQUEST': request})
264
273
265 if registry is not UNSET:
274 if registry is not UNSET:
266 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
275 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
267
276
268
277
269 def setup_celery_app(app, root, request, registry, closer, celery_settings):
278 def setup_celery_app(app, root, request, registry, closer, celery_settings):
270 log.debug('Got custom celery conf: %s', celery_settings)
279 log.debug('Got custom celery conf: %s', celery_settings)
271 celery_config = base_celery_config
280 celery_config = base_celery_config
272 celery_config.update({
281 celery_config.update({
273 # store celerybeat scheduler db where the .ini file is
282 # store celerybeat scheduler db where the .ini file is
274 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
283 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
275 })
284 })
276
285
277 celery_config.update(celery_settings)
286 celery_config.update(celery_settings)
278 celery_app.config_from_object(celery_config)
287 celery_app.config_from_object(celery_config)
279
288
280 celery_app.conf.update({'PYRAMID_APP': app})
289 celery_app.conf.update({'PYRAMID_APP': app})
281 celery_app.conf.update({'PYRAMID_ROOT': root})
290 celery_app.conf.update({'PYRAMID_ROOT': root})
282 celery_app.conf.update({'PYRAMID_REQUEST': request})
291 celery_app.conf.update({'PYRAMID_REQUEST': request})
283 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
292 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
284 celery_app.conf.update({'PYRAMID_CLOSER': closer})
293 celery_app.conf.update({'PYRAMID_CLOSER': closer})
285
294
286
295
287 def configure_celery(config, celery_settings):
296 def configure_celery(config, celery_settings):
288 """
297 """
289 Helper that is called from our application creation logic. It gives
298 Helper that is called from our application creation logic. It gives
290 connection info into running webapp and allows execution of tasks from
299 connection info into running webapp and allows execution of tasks from
291 RhodeCode itself
300 RhodeCode itself
292 """
301 """
293 # store some globals into rhodecode
302 # store some globals into rhodecode
294 rhodecode.CELERY_ENABLED = str2bool(
303 rhodecode.CELERY_ENABLED = str2bool(
295 config.registry.settings.get('use_celery'))
304 config.registry.settings.get('use_celery'))
296 if rhodecode.CELERY_ENABLED:
305 if rhodecode.CELERY_ENABLED:
297 log.info('Configuring celery based on `%s` settings', celery_settings)
306 log.info('Configuring celery based on `%s` settings', celery_settings)
298 setup_celery_app(
307 setup_celery_app(
299 app=None, root=None, request=None, registry=config.registry,
308 app=None, root=None, request=None, registry=config.registry,
300 closer=None, celery_settings=celery_settings)
309 closer=None, celery_settings=celery_settings)
301
310
302
311
303 def maybe_prepare_env(req):
312 def maybe_prepare_env(req):
304 environ = {}
313 environ = {}
305 try:
314 try:
306 environ.update({
315 environ.update({
307 'PATH_INFO': req.environ['PATH_INFO'],
316 'PATH_INFO': req.environ['PATH_INFO'],
308 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
317 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
309 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
318 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
310 'SERVER_NAME': req.environ['SERVER_NAME'],
319 'SERVER_NAME': req.environ['SERVER_NAME'],
311 'SERVER_PORT': req.environ['SERVER_PORT'],
320 'SERVER_PORT': req.environ['SERVER_PORT'],
312 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
321 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
313 })
322 })
314 except Exception:
323 except Exception:
315 pass
324 pass
316
325
317 return environ
326 return environ
318
327
319
328
320 class RequestContextTask(Task):
329 class RequestContextTask(Task):
321 """
330 """
322 This is a celery task which will create a rhodecode app instance context
331 This is a celery task which will create a rhodecode app instance context
323 for the task, patch pyramid with the original request
332 for the task, patch pyramid with the original request
324 that created the task and also add the user to the context.
333 that created the task and also add the user to the context.
325 """
334 """
326
335
327 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
336 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
328 link=None, link_error=None, shadow=None, **options):
337 link=None, link_error=None, shadow=None, **options):
329 """ queue the job to run (we are in web request context here) """
338 """ queue the job to run (we are in web request context here) """
330 from rhodecode.lib.base import get_ip_addr
339 from rhodecode.lib.base import get_ip_addr
331
340
332 req = self.app.conf['PYRAMID_REQUEST']
341 req = self.app.conf['PYRAMID_REQUEST']
333 if not req:
342 if not req:
334 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
343 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
335
344
336 log.debug('Running Task with class: %s. Request Class: %s',
345 log.debug('Running Task with class: %s. Request Class: %s',
337 self.__class__, req.__class__)
346 self.__class__, req.__class__)
338
347
339 user_id = 0
348 user_id = 0
340
349
341 # web case
350 # web case
342 if hasattr(req, 'user'):
351 if hasattr(req, 'user'):
343 user_id = req.user.user_id
352 user_id = req.user.user_id
344
353
345 # api case
354 # api case
346 elif hasattr(req, 'rpc_user'):
355 elif hasattr(req, 'rpc_user'):
347 user_id = req.rpc_user.user_id
356 user_id = req.rpc_user.user_id
348
357
349 # we hook into kwargs since it is the only way to pass our data to
358 # we hook into kwargs since it is the only way to pass our data to
350 # the celery worker
359 # the celery worker
351 environ = maybe_prepare_env(req)
360 environ = maybe_prepare_env(req)
352 options['headers'] = options.get('headers', {})
361 options['headers'] = options.get('headers', {})
353 options['headers'].update({
362 options['headers'].update({
354 'rhodecode_proxy_data': {
363 'rhodecode_proxy_data': {
355 'environ': environ,
364 'environ': environ,
356 'auth_user': {
365 'auth_user': {
357 'ip_addr': get_ip_addr(req.environ),
366 'ip_addr': get_ip_addr(req.environ),
358 'user_id': user_id
367 'user_id': user_id
359 },
368 },
360 }
369 }
361 })
370 })
362
371
363 return super(RequestContextTask, self).apply_async(
372 return super(RequestContextTask, self).apply_async(
364 args, kwargs, task_id, producer, link, link_error, shadow, **options)
373 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now