##// END OF EJS Templates
fix(celery): setup statsd on celery
super-admin -
r5243:adbd81da default
parent child Browse files
Show More
@@ -1,359 +1,364 b''
1 # Copyright (C) 2010-2023 RhodeCode GmbH
1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 """
18 """
19 Celery loader, run with::
19 Celery loader, run with::
20
20
21 celery worker \
21 celery worker \
22 --task-events \
22 --task-events \
23 --beat \
23 --beat \
24 --autoscale=20,2 \
24 --autoscale=20,2 \
25 --max-tasks-per-child 1 \
25 --max-tasks-per-child 1 \
26 --app rhodecode.lib.celerylib.loader \
26 --app rhodecode.lib.celerylib.loader \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 --loglevel DEBUG --ini=.dev/dev.ini
28 --loglevel DEBUG --ini=.dev/dev.ini
29 """
29 """
# python3.11 inspect patches for backward compat on `paste` code.
# They are applied here, ahead of the remaining module imports.
from rhodecode.config.patches import inspect_formatargspec, inspect_getargspec

inspect_getargspec()
inspect_formatargspec()
34
34
35 import logging
35 import logging
36 import importlib
36 import importlib
37
37
38 import click
38 import click
39 from celery import Celery
39 from celery import Celery
40 from celery import signals
40 from celery import signals
41 from celery import Task
41 from celery import Task
42 from celery import exceptions # noqa
42 from celery import exceptions # noqa
43 from kombu.serialization import register
43 from kombu.serialization import register
44
44
45 import rhodecode
45 import rhodecode
46
46
47 from rhodecode.lib.statsd_client import StatsdClient
47 from rhodecode.lib.statsd_client import StatsdClient
48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
49 from rhodecode.lib.ext_json import json
49 from rhodecode.lib.ext_json import json
50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
51 from rhodecode.lib.utils2 import str2bool
51 from rhodecode.lib.utils2 import str2bool
52 from rhodecode.model import meta
52 from rhodecode.model import meta
53
53
54
54
# Register the `json_ext` serializer (backed by rhodecode's ext_json
# dumps/loads) with kombu so celery can use it by name.
register(
    'json_ext',
    json.dumps,
    json.loads,
    content_type='application/x-json-ext',
    content_encoding='utf-8',
)

log = logging.getLogger('celery.rhodecode.loader')


# Task modules handed to celery via the `imports` config key.
imports = ['rhodecode.lib.celerylib.tasks']

try:
    # try if we have EE tasks available
    importlib.import_module('rc_ee')
except ImportError:
    pass
else:
    # `rc_ee` is importable, so its task module is listed as well
    imports.append('rc_ee.lib.celerylib.tasks')
70
70
71
71
# Baseline celery configuration. `setup_celery_app` layers the beat schedule
# path and the custom celery settings on top of these values.
base_celery_config = dict(
    result_backend='rpc://',
    result_expires=60 * 60 * 24,
    result_persistent=True,
    imports=imports,
    worker_max_tasks_per_child=20,
    accept_content=['json_ext', 'json'],
    task_serializer='json_ext',
    result_serializer='json_ext',
    worker_hijack_root_logger=False,
    database_table_names=dict(
        task='beat_taskmeta',
        group='beat_groupmeta',
    ),
)
87
87
88
88
# Extra command line options; they are registered below as celery "preload"
# options and read back in `on_preload_parsed`.
preload_option_ini = click.Option(
    ('--ini',), help='Path to ini configuration file.')

preload_option_ini_var = click.Option(
    ('--ini-var',), help='Comma separated list of key=value to pass to ini.')
98
98
99
99
def get_logger(obj):
    """
    Return a logger for the given task-like object.

    The fallback is a standard logger named ``rhodecode.task.<ClassName>``.
    When celery is enabled, ``obj.get_logger()`` is tried first and the
    fallback is used only if that call raises.
    """
    fallback_log = logging.getLogger(
        f'rhodecode.task.{obj.__class__.__name__}')

    if not rhodecode.CELERY_ENABLED:
        return fallback_log

    try:
        return obj.get_logger()
    except Exception:
        # best effort: any failure means we stick with the plain logger
        return fallback_log
111
111
112
112
# init main celery app and register both custom preload options on it
celery_app = Celery()
celery_app.user_options['preload'].update(
    (preload_option_ini, preload_option_ini_var))
117
117
118
118
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    """
    Handler for celery's ``setup_logging`` signal.

    Configures logging from the .ini file path stored under ``RC_INI_FILE``
    in the celery app config.
    """
    setup_logging(celery_app.conf['RC_INI_FILE'])
123
123
124
124
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """
    Handler for celery's ``user_preload_options`` signal.

    Reads the custom ``--ini`` / ``--ini-var`` values from ``options``,
    stores them in the celery app config as ``RC_INI_FILE`` and
    ``RC_INI_OPTIONS``, and configures logging from the .ini file.

    :param options: mapping of parsed preload options; must contain the
        ``ini`` and ``ini_var`` keys
    :raises SystemExit: with code ``-1`` when ``--ini`` was not provided
    """
    ini_file = options['ini']
    ini_vars = options['ini_var']

    if ini_file is None:
        print('You must provide the --ini argument to start celery')
        # raise SystemExit directly; the builtin `exit()` helper is injected
        # by the `site` module and is not guaranteed to be available
        raise SystemExit(-1)

    # kept under a separate name so the `options` parameter is not shadowed
    ini_options = None
    if ini_vars is not None:
        ini_options = parse_ini_vars(ini_vars)

    celery_app.conf['RC_INI_FILE'] = ini_file
    celery_app.conf['RC_INI_OPTIONS'] = ini_options
    setup_logging(ini_file)
142
142
143
143
def _init_celery(app_type=''):
    """
    Bootstrap the RhodeCode pyramid application and configure celery from it.

    Uses the ``RC_INI_FILE`` / ``RC_INI_OPTIONS`` values stored in the celery
    app config, sets up the StatsdClient from the registry settings and then
    calls `setup_celery_app` with the bootstrapped environment.

    :param app_type: label used only in the debug log message
    :raises EnvironmentError: when bootstrapping raises or yields an empty env
    """
    from rhodecode.config.middleware import get_celery_config

    log.debug('Bootstrapping RhodeCode application for %s...', app_type)

    ini_file = celery_app.conf['RC_INI_FILE']
    options = celery_app.conf['RC_INI_OPTIONS']

    env_error_msg = (
        'Failed to load pyramid ENV. '
        'Probably there is another error present that prevents from running pyramid app')

    try:
        env = bootstrap(ini_file, options=options)
    except Exception as exc:
        log.exception('Failed to bootstrap RhodeCode APP')
        # chain the original error so the root cause stays attached
        raise EnvironmentError(env_error_msg) from exc

    if not env:
        raise EnvironmentError(env_error_msg)

    log.debug('Got Pyramid ENV: %s', env)

    settings = env['registry'].settings
    celery_settings = get_celery_config(settings)

    # init and bootstrap StatsdClient
    StatsdClient.setup(settings)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        celery_settings=celery_settings)
171
175
172
176
@signals.celeryd_init.connect
def on_celeryd_init(sender=None, conf=None, **kwargs):
    """
    Handler for celery's ``celeryd_init`` signal: bootstrap the worker.
    """
    _init_celery(app_type='celery worker')

    # fix the global flag even if it's disabled via .ini file because this
    # is a worker code that doesn't need this to be disabled.
    rhodecode.CELERY_ENABLED = True
180
184
181
185
@signals.beat_init.connect
def on_beat_init(sender=None, conf=None, **kwargs):
    """
    Handler for celery's ``beat_init`` signal: bootstrap the beat process.
    """
    _init_celery(app_type='celery beat')
185
189
186
190
@signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
    """
    Handler for celery's ``task_prerun`` signal.

    Calls `ping_db` and, when a statsd client is configured, increments the
    ``rhodecode_celery_task_total`` counter tagged with the task name.
    """
    ping_db()

    statsd = StatsdClient.statsd
    if not statsd:
        return

    # fall back to the task object itself when it has no `name` attribute
    task_name = getattr(task, 'name', task)
    metric_tags = [f'task:{task_name}', 'mode:async']
    statsd.incr('rhodecode_celery_task_total', tags=metric_tags)
197
202
198
203
@signals.task_success.connect
def task_success_signal(result, **kwargs):
    """
    Handler for celery's ``task_success`` signal.

    Commits `meta.Session`, then invokes the ``PYRAMID_CLOSER`` callable from
    the celery app config when one is set.
    """
    meta.Session.commit()

    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if not pyramid_closer:
        return
    pyramid_closer()
205
210
206
211
@signals.task_retry.connect
def task_retry_signal(request, reason, einfo, **kwargs):
    """
    Handler for celery's ``task_retry`` signal.

    Removes `meta.Session`, then invokes the ``PYRAMID_CLOSER`` callable from
    the celery app config when one is set.
    """
    meta.Session.remove()

    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if not pyramid_closer:
        return
    pyramid_closer()
214
219
215
220
@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    """
    Handler for celery's ``task_failure`` signal.

    Logs the failure, removes `meta.Session`, stores the exception through
    `store_exception`, increments the ``rhodecode_exception_total`` statsd
    counter (when statsd is configured) and finally invokes the
    ``PYRAMID_CLOSER`` callable from the celery app config when one is set.

    :param task_id: id of the failed task, used in the log message
    :param einfo: exception info object; its ``type``, ``exception`` and
        ``tb`` attributes are read here
    """
    log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
    from rhodecode.lib.exc_tracking import store_exception

    meta.Session.remove()

    # simulate sys.exc_info()
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')

    statsd = StatsdClient.statsd
    if statsd:
        # tag with the class of the raised exception; `einfo.__class__` would
        # be the exception-info wrapper and give the same tag for every failure
        exc_cls = einfo.type
        exc_type = "{}.{}".format(exc_cls.__module__, exc_cls.__name__)
        statsd.incr('rhodecode_exception_total',
                    tags=["exc_source:celery", "type:{}".format(exc_type)])

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()
238
243
239
244
@signals.task_revoked.connect
def task_revoked_signal(request, terminated, signum, expired, **kwargs):
    """
    Handler for celery's ``task_revoked`` signal.

    Invokes the ``PYRAMID_CLOSER`` callable from the celery app config when
    one is set.
    """
    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if not pyramid_closer:
        return
    pyramid_closer()
246
251
247
252
248 class UNSET(object):
253 class UNSET(object):
249 pass
254 pass
250
255
251
256
252 _unset = UNSET()
257 _unset = UNSET()
253
258
254
259
def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
    """
    Update the pyramid request and/or registry stored in the celery app config.

    Only arguments that were actually passed are written: ``request`` goes to
    ``PYRAMID_REQUEST`` and ``registry`` to ``PYRAMID_REGISTRY``. The ``app``,
    ``root`` and ``closer`` arguments are accepted but not stored here.
    """
    # The defaults are the `_unset` *instance*, so that is what we must compare
    # against. Comparing with the `UNSET` class is always true for omitted
    # arguments and overwrote the config with the sentinel. The class itself
    # is still treated as "not provided" for backward compatibility.
    if request is not _unset and request is not UNSET:
        celery_app.conf.update({'PYRAMID_REQUEST': request})

    if registry is not _unset and registry is not UNSET:
        celery_app.conf.update({'PYRAMID_REGISTRY': registry})
262
267
263
268
def setup_celery_app(app, root, request, registry, closer, celery_settings):
    """
    Configure the module-level ``celery_app`` and attach the pyramid context.

    The effective config is `base_celery_config`, plus the beat schedule path
    from ``registry.settings['celerybeat-schedule.path']``, plus
    ``celery_settings`` (applied last, so it wins on conflicts). The pyramid
    objects are stored under the ``PYRAMID_*`` keys of ``celery_app.conf``.

    :param registry: pyramid registry; its ``settings`` must contain the
        ``celerybeat-schedule.path`` key
    :param celery_settings: mapping of custom celery options
    """
    log.debug('Got custom celery conf: %s', celery_settings)

    # work on a copy so the module-level defaults are not mutated by a call
    celery_config = dict(base_celery_config)
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
    })

    celery_config.update(celery_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({
        'PYRAMID_APP': app,
        'PYRAMID_ROOT': root,
        'PYRAMID_REQUEST': request,
        'PYRAMID_REGISTRY': registry,
        'PYRAMID_CLOSER': closer,
    })
280
285
281
286
def configure_celery(config, celery_settings):
    """
    Helper that is called from our application creation logic. It gives
    connection info into running webapp and allows execution of tasks from
    RhodeCode itself
    """
    # store some globals into rhodecode
    use_celery = config.registry.settings.get('use_celery')
    rhodecode.CELERY_ENABLED = str2bool(use_celery)

    if not rhodecode.CELERY_ENABLED:
        return

    log.info('Configuring celery based on `%s` settings', celery_settings)
    # only the registry is passed here; the other pyramid objects are None
    setup_celery_app(
        app=None, root=None, request=None, registry=config.registry,
        closer=None, celery_settings=celery_settings)
296
301
297
302
def maybe_prepare_env(req):
    """
    Extract a small subset of WSGI environ keys from ``req.environ``.

    Best effort: returns a dict with all six keys, or an empty dict when
    anything goes wrong (e.g. a required key is missing). ``HTTP_HOST``
    falls back to ``SERVER_NAME`` when absent.
    """
    try:
        wsgi_env = req.environ
        return {
            'PATH_INFO': wsgi_env['PATH_INFO'],
            'SCRIPT_NAME': wsgi_env['SCRIPT_NAME'],
            'HTTP_HOST': wsgi_env.get('HTTP_HOST', wsgi_env['SERVER_NAME']),
            'SERVER_NAME': wsgi_env['SERVER_NAME'],
            'SERVER_PORT': wsgi_env['SERVER_PORT'],
            'wsgi.url_scheme': wsgi_env['wsgi.url_scheme'],
        }
    except Exception:
        # all-or-nothing: a partial environ is never returned
        return {}
313
318
314
319
class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request
    that created the task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """ queue the job to run (we are in web request context here) """
        from rhodecode.lib.base import get_ip_addr

        req = self.app.conf['PYRAMID_REQUEST']
        if not req:
            raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')

        log.debug('Running Task with class: %s. Request Class: %s',
                  self.__class__, req.__class__)

        if hasattr(req, 'user'):
            # web case
            user_id = req.user.user_id
        elif hasattr(req, 'rpc_user'):
            # api case
            user_id = req.rpc_user.user_id
        else:
            # neither attribute is present on the request
            user_id = 0

        # we hook into the message headers since it is the only way to pass
        # our data to the celery worker
        environ = maybe_prepare_env(req)
        headers = options.setdefault('headers', {})
        headers['rhodecode_proxy_data'] = {
            'environ': environ,
            'auth_user': {
                'ip_addr': get_ip_addr(req.environ),
                'user_id': user_id
            },
        }

        return super().apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now