##// END OF EJS Templates
fix(celery): setup statsd on celery
super-admin -
r5243:adbd81da default
parent child Browse files
Show More
@@ -1,359 +1,364 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18 """
19 19 Celery loader, run with::
20 20
21 21 celery worker \
22 22 --task-events \
23 23 --beat \
24 24 --autoscale=20,2 \
25 25 --max-tasks-per-child 1 \
26 26 --app rhodecode.lib.celerylib.loader \
27 27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 28 --loglevel DEBUG --ini=.dev/dev.ini
29 29 """
30 30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 31 inspect_getargspec()
32 32 inspect_formatargspec()
33 33 # python3.11 inspect patches for backward compat on `paste` code
34 34
35 35 import logging
36 36 import importlib
37 37
38 38 import click
39 39 from celery import Celery
40 40 from celery import signals
41 41 from celery import Task
42 42 from celery import exceptions # noqa
43 43 from kombu.serialization import register
44 44
45 45 import rhodecode
46 46
47 47 from rhodecode.lib.statsd_client import StatsdClient
48 48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
49 49 from rhodecode.lib.ext_json import json
50 50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
51 51 from rhodecode.lib.utils2 import str2bool
52 52 from rhodecode.model import meta
53 53
54 54
# make the extended JSON codec available to kombu under the `json_ext` name
register(
    'json_ext', json.dumps, json.loads,
    content_type='application/x-json-ext', content_encoding='utf-8',
)

log = logging.getLogger('celery.rhodecode.loader')


# task modules handed to celery through `base_celery_config['imports']`
imports = ['rhodecode.lib.celerylib.tasks']

try:
    # probe for the optional EE package
    importlib.import_module('rc_ee')
except ImportError:
    pass
else:
    # EE is importable, so register its tasks module as well
    imports.append('rc_ee.lib.celerylib.tasks')
70 70
71 71
# Defaults for the celery app; `setup_celery_app` layers the .ini derived
# settings on top of these before loading them into `celery_app`.
base_celery_config = {
    'result_backend': 'rpc://',
    'result_expires': 24 * 60 * 60,
    'result_persistent': True,
    'imports': imports,
    'worker_max_tasks_per_child': 20,
    # both the custom `json_ext` codec registered in this module and plain json
    'accept_content': ['json_ext', 'json'],
    'task_serializer': 'json_ext',
    'result_serializer': 'json_ext',
    'worker_hijack_root_logger': False,
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    },
}
87 87
88 88
# extra command line options; they are attached to the celery app as
# `preload` user options and read back in `on_preload_parsed`
preload_option_ini = click.Option(
    ['--ini'],
    help='Path to ini configuration file.',
)

preload_option_ini_var = click.Option(
    ['--ini-var'],
    help='Comma separated list of key=value to pass to ini.',
)
98 98
99 99
def get_logger(obj):
    """
    Return the logger to use for `obj`.

    When celery is enabled the object's own `get_logger()` is used; if celery
    is disabled, or that call fails for any reason, a standard logger named
    after the object's class (`rhodecode.task.<ClassName>`) is returned.
    """
    fallback_log = logging.getLogger(
        f'rhodecode.task.{obj.__class__.__name__}')

    if not rhodecode.CELERY_ENABLED:
        return fallback_log

    try:
        return obj.get_logger()
    except Exception:
        # best effort: any failure falls back to the plain logger
        return fallback_log
111 111
112 112
113 113 # init main celery app
celery_app = Celery()
# register --ini / --ini-var as preload options of the celery app
for _preload_option in (preload_option_ini, preload_option_ini_var):
    celery_app.user_options['preload'].add(_preload_option)
117 117
118 118
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    """Configure logging from the .ini file stored in the celery conf."""
    setup_logging(celery_app.conf['RC_INI_FILE'])
123 123
124 124
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """
    Handle the parsed preload options (`--ini`, `--ini-var`).

    Stores the ini file path and the parsed ini variables in the celery conf
    under `RC_INI_FILE` / `RC_INI_OPTIONS` and sets up logging from the ini
    file.

    :param options: mapping with the `ini` and `ini_var` preload option values
    :raises SystemExit: with code -1 when `--ini` was not given
    """
    ini_file = options['ini']
    ini_vars = options['ini_var']

    if ini_file is None:
        print('You must provide the --ini argument to start celery')
        # raise directly instead of calling `exit()`: that helper is injected
        # by the `site` module and is not guaranteed to exist
        raise SystemExit(-1)

    # kept separate from the `options` argument so the input is not shadowed
    ini_options = None
    if ini_vars is not None:
        ini_options = parse_ini_vars(ini_vars)

    celery_app.conf['RC_INI_FILE'] = ini_file
    celery_app.conf['RC_INI_OPTIONS'] = ini_options
    setup_logging(ini_file)
142 142
143 143
def _init_celery(app_type=''):
    """
    Bootstrap the Pyramid application and configure the celery app from it.

    Reads the ini file and ini options previously stored in the celery conf,
    bootstraps the Pyramid env, initializes the StatsdClient from the registry
    settings and finally hands the env over to `setup_celery_app`.

    :param app_type: label used only in the debug log message
    :raises EnvironmentError: when bootstrapping fails or yields an empty env
    """
    from rhodecode.config.middleware import get_celery_config

    log.debug('Bootstrapping RhodeCode application for %s...', app_type)

    ini_file = celery_app.conf['RC_INI_FILE']
    options = celery_app.conf['RC_INI_OPTIONS']

    env = None
    try:
        env = bootstrap(ini_file, options=options)
    except Exception:
        # logged here; the empty env is turned into EnvironmentError below
        log.exception('Failed to bootstrap RhodeCode APP')

    if not env:
        raise EnvironmentError(
            'Failed to load pyramid ENV. '
            'Probably there is another error present that prevents from running pyramid app')

    log.debug('Got Pyramid ENV: %s', env)

    # compute the celery settings once, from the same settings object that
    # is used to set up statsd
    settings = env['registry'].settings
    celery_settings = get_celery_config(settings)

    # init and bootstrap StatsdClient
    StatsdClient.setup(settings)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        celery_settings=celery_settings)
171 175
172 176
@signals.celeryd_init.connect
def on_celeryd_init(sender=None, conf=None, **kwargs):
    """Bootstrap the application when the celery worker initializes."""
    _init_celery(app_type='celery worker')

    # fix the global flag even if it's disabled via .ini file because this
    # is a worker code that doesn't need this to be disabled.
    rhodecode.CELERY_ENABLED = True
180 184
181 185
@signals.beat_init.connect
def on_beat_init(sender=None, conf=None, **kwargs):
    """Bootstrap the application when celery beat initializes."""
    _init_celery(app_type='celery beat')
185 189
186 190
@signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
    """
    Run before each task: ping the database and count the task in statsd.

    The counter `rhodecode_celery_task_total` is tagged with the task name
    (or the task object itself when it has no `name`) and `mode:async`.
    """
    ping_db()
    statsd = StatsdClient.statsd

    if statsd:
        task_repr = getattr(task, 'name', task)
        # exactly one `task:` tag per increment
        statsd.incr('rhodecode_celery_task_total', tags=[
            f'task:{task_repr}',
            'mode:async'
        ])
197 202
198 203
@signals.task_success.connect
def task_success_signal(result, **kwargs):
    """Commit the DB session and call the Pyramid closer, if one is set."""
    meta.Session.commit()
    if pyramid_closer := celery_app.conf['PYRAMID_CLOSER']:
        pyramid_closer()
205 210
206 211
@signals.task_retry.connect
def task_retry_signal(request, reason, einfo, **kwargs):
    """Remove the DB session and call the Pyramid closer, if one is set."""
    meta.Session.remove()
    if pyramid_closer := celery_app.conf['PYRAMID_CLOSER']:
        pyramid_closer()
214 219
215 220
@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    """
    Handle a failed task.

    Logs the failure, removes the DB session, stores the exception via
    `store_exception`, increments the `rhodecode_exception_total` statsd
    counter and finally calls the Pyramid closer, if one is set.

    :param task_id: id of the failed task (used in the log message)
    :param einfo: exception info object; its `type`, `exception` and `tb`
        attributes are used to rebuild a `sys.exc_info()` style tuple
    """
    log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
    from rhodecode.lib.exc_tracking import store_exception

    meta.Session.remove()

    # simulate sys.exc_info()
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')

    statsd = StatsdClient.statsd
    if statsd:
        # tag with the class of the raised exception; `einfo.__class__` is the
        # info wrapper itself and would give the same tag for every failure
        exc_cls = einfo.type
        exc_type = f"{exc_cls.__module__}.{exc_cls.__name__}"
        statsd.incr('rhodecode_exception_total',
                    tags=["exc_source:celery", f"type:{exc_type}"])

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()
238 243
239 244
@signals.task_revoked.connect
def task_revoked_signal(request, terminated, signum, expired, **kwargs):
    """Call the Pyramid closer, if one is set, when a task is revoked."""
    if pyramid_closer := celery_app.conf['PYRAMID_CLOSER']:
        pyramid_closer()
246 251
247 252
class UNSET:
    """Marker type; its instance `_unset` is the "not passed" default."""


# default value for the `set_celery_conf` arguments
_unset = UNSET()
253 258
254 259
def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
    """
    Update selected Pyramid objects stored in the celery conf.

    Only `request` and `registry` are written (as `PYRAMID_REQUEST` and
    `PYRAMID_REGISTRY`), and only when they were actually passed; the other
    arguments are accepted but not used here.
    """
    # compare with the sentinel *instance* used as the default; comparing with
    # the `UNSET` class is always true and would store the sentinel itself
    if request is not _unset:
        celery_app.conf.update({'PYRAMID_REQUEST': request})

    if registry is not _unset:
        celery_app.conf.update({'PYRAMID_REGISTRY': registry})
262 267
263 268
def setup_celery_app(app, root, request, registry, closer, celery_settings):
    """
    Load the final configuration into `celery_app`.

    The configuration is `base_celery_config` plus the celerybeat schedule
    path from the registry settings, overridden by `celery_settings`. The
    given Pyramid objects are then stored in the celery conf under the
    `PYRAMID_*` keys.

    :param registry: Pyramid registry; its settings must contain
        `celerybeat-schedule.path`
    :param celery_settings: mapping of celery options that take precedence
        over the base configuration
    """
    log.debug('Got custom celery conf: %s', celery_settings)

    # work on a copy so the module-level defaults are never mutated and
    # settings from one call cannot leak into the next one
    celery_config = dict(base_celery_config)
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
    })

    celery_config.update(celery_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({
        'PYRAMID_APP': app,
        'PYRAMID_ROOT': root,
        'PYRAMID_REQUEST': request,
        'PYRAMID_REGISTRY': registry,
        'PYRAMID_CLOSER': closer,
    })
280 285
281 286
def configure_celery(config, celery_settings):
    """
    Helper that is called from our application creation logic. It gives
    connection info into running webapp and allows execution of tasks from
    RhodeCode itself
    """
    registry = config.registry

    # store some globals into rhodecode
    rhodecode.CELERY_ENABLED = str2bool(registry.settings.get('use_celery'))
    if not rhodecode.CELERY_ENABLED:
        return

    log.info('Configuring celery based on `%s` settings', celery_settings)
    setup_celery_app(
        app=None, root=None, request=None, registry=registry,
        closer=None, celery_settings=celery_settings)
296 301
297 302
def maybe_prepare_env(req):
    """
    Extract a minimal WSGI environ from `req`.

    Returns a dict with PATH_INFO, SCRIPT_NAME, HTTP_HOST (falling back to
    SERVER_NAME), SERVER_NAME, SERVER_PORT and wsgi.url_scheme. It is all or
    nothing: if anything goes wrong while reading `req.environ`, an empty
    dict is returned instead.
    """
    try:
        wsgi_env = req.environ
        return {
            'PATH_INFO': wsgi_env['PATH_INFO'],
            'SCRIPT_NAME': wsgi_env['SCRIPT_NAME'],
            # SERVER_NAME is looked up eagerly, so it is required even when
            # HTTP_HOST is present
            'HTTP_HOST': wsgi_env.get('HTTP_HOST', wsgi_env['SERVER_NAME']),
            'SERVER_NAME': wsgi_env['SERVER_NAME'],
            'SERVER_PORT': wsgi_env['SERVER_PORT'],
            'wsgi.url_scheme': wsgi_env['wsgi.url_scheme'],
        }
    except Exception:
        return {}
313 318
314 319
class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request
    that created the task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """
        Queue the job to run (we are in web request context here).

        Adds a `rhodecode_proxy_data` entry to the message headers holding a
        minimal request environ plus the ip address and id of the calling
        user, then delegates to the parent `apply_async`.

        :raises ValueError: when the celery conf holds no PYRAMID_REQUEST
        """
        from rhodecode.lib.base import get_ip_addr

        req = self.app.conf['PYRAMID_REQUEST']
        if not req:
            raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')

        log.debug('Running Task with class: %s. Request Class: %s',
                  self.__class__, req.__class__)

        user_id = 0

        # web case
        if hasattr(req, 'user'):
            user_id = req.user.user_id

        # api case
        elif hasattr(req, 'rpc_user'):
            user_id = req.rpc_user.user_id

        # message headers are used to pass our data to the celery worker
        environ = maybe_prepare_env(req)

        # copy the headers: tolerates an explicit `headers=None` and keeps the
        # caller's dict untouched
        headers = dict(options.get('headers') or {})
        headers.update({
            'rhodecode_proxy_data': {
                'environ': environ,
                'auth_user': {
                    'ip_addr': get_ip_addr(req.environ),
                    'user_id': user_id
                },
            }
        })
        options['headers'] = headers

        return super().apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now