##// END OF EJS Templates
statsd: better task execution reporting on celery
super-admin -
r4892:22facf19 default
parent child Browse files
Show More
@@ -1,95 +1,95 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import socket
22 22 import logging
23 23
24 24 import rhodecode
25 25 from zope.cachedescriptors.property import Lazy as LazyProperty
26 26 from rhodecode.lib.celerylib.loader import (
27 27 celery_app, RequestContextTask, get_logger)
28 28 from rhodecode.lib.statsd_client import StatsdClient
29 29
30 30 async_task = celery_app.task
31 31
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35
36 36 class ResultWrapper(object):
37 37 def __init__(self, task):
38 38 self.task = task
39 39
40 40 @LazyProperty
41 41 def result(self):
42 42 return self.task
43 43
44 44
45 45 def run_task(task, *args, **kwargs):
46 46 import celery
47 47 log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
48 48 if task is None:
49 49 raise ValueError('Got non-existing task for execution')
50 50
51 51 exec_mode = 'sync'
52 52 allow_async = True
53 53
54 54 # if we're already in a celery task, don't allow async execution again
55 55 # e.g task within task
56 56 in_task = celery.current_task
57 57 if in_task:
58 58 log.debug('This task in in context of another task: %s, not allowing another async execution', in_task)
59 59 allow_async = False
60 60 if kwargs.pop('allow_subtask', False):
61 61 log.debug('Forced async by allow_async=True flag')
62 62 allow_async = True
63 63
64 64 t = None
65 65 if rhodecode.CELERY_ENABLED and allow_async:
66 66
67 statsd = StatsdClient.statsd
68 if statsd:
69 task_repr = getattr(task, 'name', task)
70 statsd.incr('rhodecode_celery_task_total', tags=[
71 'task:{}'.format(task_repr)
72 ])
73
74 67 try:
75 68 t = task.apply_async(args=args, kwargs=kwargs)
76 69 log.debug('executing task %s:%s in async mode', t.task_id, task)
77 70 except socket.error as e:
78 71 if isinstance(e, IOError) and e.errno == 111:
79 72 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
80 73 else:
81 74 log.exception("Exception while connecting to celeryd.")
82 75 except KeyError as e:
83 76 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
84 77 except Exception as e:
85 78 log.exception(
86 79 "Exception while trying to run task asynchronous. "
87 80 "Fallback to sync execution.")
88 81
89 82 else:
90 83 log.debug('executing task %s:%s in sync mode', 'TASK', task)
84 statsd = StatsdClient.statsd
85 if statsd:
86 task_repr = getattr(task, 'name', task)
87 statsd.incr('rhodecode_celery_task_total', tags=[
88 'task:{}'.format(task_repr),
89 'mode:sync'
90 ])
91 91
92 92 # we got async task, return it after statsd call
93 93 if t:
94 94 return t
95 95 return ResultWrapper(task(*args, **kwargs))
@@ -1,344 +1,353 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 23 celery worker \
24 24 --task-events \
25 25 --beat \
26 26 --autoscale=20,2 \
27 27 --max-tasks-per-child 1 \
28 28 --app rhodecode.lib.celerylib.loader \
29 29 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
30 30 --loglevel DEBUG --ini=.dev/dev.ini
31 31 """
32 32 import os
33 33 import logging
34 34 import importlib
35 35
36 36 from celery import Celery
37 37 from celery import signals
38 38 from celery import Task
39 39 from celery import exceptions # pragma: no cover
40 40 from kombu.serialization import register
41 41
42 42 import rhodecode
43 43
44 from rhodecode.lib.statsd_client import StatsdClient
44 45 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
45 46 from rhodecode.lib.ext_json import json
46 47 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
47 48 from rhodecode.lib.utils2 import str2bool
48 49 from rhodecode.model import meta
49 50
50 51
51 52 register('json_ext', json.dumps, json.loads,
52 53 content_type='application/x-json-ext',
53 54 content_encoding='utf-8')
54 55
55 56 log = logging.getLogger('celery.rhodecode.loader')
56 57
57 58
58 59 imports = ['rhodecode.lib.celerylib.tasks']
59 60
60 61 try:
61 62 # try if we have EE tasks available
62 63 importlib.import_module('rc_ee')
63 64 imports.append('rc_ee.lib.celerylib.tasks')
64 65 except ImportError:
65 66 pass
66 67
67 68
68 69 base_celery_config = {
69 70 'result_backend': 'rpc://',
70 71 'result_expires': 60 * 60 * 24,
71 72 'result_persistent': True,
72 73 'imports': imports,
73 74 'worker_max_tasks_per_child': 20,
74 75 'accept_content': ['json_ext', 'json'],
75 76 'task_serializer': 'json_ext',
76 77 'result_serializer': 'json_ext',
77 78 'worker_hijack_root_logger': False,
78 79 'database_table_names': {
79 80 'task': 'beat_taskmeta',
80 81 'group': 'beat_groupmeta',
81 82 }
82 83 }
83 84
84 85
85 86 def add_preload_arguments(parser):
86 87 parser.add_argument(
87 88 '--ini', default=None,
88 89 help='Path to ini configuration file.'
89 90 )
90 91 parser.add_argument(
91 92 '--ini-var', default=None,
92 93 help='Comma separated list of key=value to pass to ini.'
93 94 )
94 95
95 96
96 97 def get_logger(obj):
97 98 custom_log = logging.getLogger(
98 99 'rhodecode.task.{}'.format(obj.__class__.__name__))
99 100
100 101 if rhodecode.CELERY_ENABLED:
101 102 try:
102 103 custom_log = obj.get_logger()
103 104 except Exception:
104 105 pass
105 106
106 107 return custom_log
107 108
108 109
109 110 # init main celery app
110 111 celery_app = Celery()
111 112 celery_app.user_options['preload'].add(add_preload_arguments)
112 113
113 114
114 115 @signals.setup_logging.connect
115 116 def setup_logging_callback(**kwargs):
116 117 ini_file = celery_app.conf['RC_INI_FILE']
117 118 setup_logging(ini_file)
118 119
119 120
120 121 @signals.user_preload_options.connect
121 122 def on_preload_parsed(options, **kwargs):
122 123
123 124 ini_file = options['ini']
124 125 ini_vars = options['ini_var']
125 126
126 127 if ini_file is None:
127 128 print('You must provide the --ini argument to start celery')
128 129 exit(-1)
129 130
130 131 options = None
131 132 if ini_vars is not None:
132 133 options = parse_ini_vars(ini_vars)
133 134
134 135 celery_app.conf['RC_INI_FILE'] = ini_file
135 136 celery_app.conf['RC_INI_OPTIONS'] = options
136 137 setup_logging(ini_file)
137 138
138 139
139 140 def _init_celery(app_type=''):
140 141 from rhodecode.config.middleware import get_celery_config
141 142
142 143 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
143 144
144 145 ini_file = celery_app.conf['RC_INI_FILE']
145 146 options = celery_app.conf['RC_INI_OPTIONS']
146 147
147 148 env = None
148 149 try:
149 150 env = bootstrap(ini_file, options=options)
150 151 except Exception:
151 152 log.exception('Failed to bootstrap RhodeCode APP')
152 153
153 154 if not env:
154 155 raise EnvironmentError(
155 156 'Failed to load pyramid ENV. '
156 157 'Probably there is another error present that prevents from running pyramid app')
157 158
158 159 log.debug('Got Pyramid ENV: %s', env)
159 160
160 161 celery_settings = get_celery_config(env['registry'].settings)
161 162
162 163 setup_celery_app(
163 164 app=env['app'], root=env['root'], request=env['request'],
164 165 registry=env['registry'], closer=env['closer'],
165 166 celery_settings=celery_settings)
166 167
167 168
168 169 @signals.celeryd_init.connect
169 170 def on_celeryd_init(sender=None, conf=None, **kwargs):
170 171 _init_celery('celery worker')
171 172
172 173 # fix the global flag even if it's disabled via .ini file because this
173 174 # is a worker code that doesn't need this to be disabled.
174 175 rhodecode.CELERY_ENABLED = True
175 176
176 177
177 178 @signals.beat_init.connect
178 179 def on_beat_init(sender=None, conf=None, **kwargs):
179 180 _init_celery('celery beat')
180 181
181 182
182 183 @signals.task_prerun.connect
183 184 def task_prerun_signal(task_id, task, args, **kwargs):
184 185 ping_db()
186 statsd = StatsdClient.statsd
187 if statsd:
188 task_repr = getattr(task, 'name', task)
189 statsd.incr('rhodecode_celery_task_total', tags=[
190 'task:{}'.format(task_repr),
191 'mode:async'
192 ])
185 193
186 194
187 195 @signals.task_success.connect
188 196 def task_success_signal(result, **kwargs):
189 197 meta.Session.commit()
190 198 closer = celery_app.conf['PYRAMID_CLOSER']
191 199 if closer:
192 200 closer()
193 201
194 202
203
195 204 @signals.task_retry.connect
196 205 def task_retry_signal(
197 206 request, reason, einfo, **kwargs):
198 207 meta.Session.remove()
199 208 closer = celery_app.conf['PYRAMID_CLOSER']
200 209 if closer:
201 210 closer()
202 211
203 212
204 213 @signals.task_failure.connect
205 214 def task_failure_signal(
206 215 task_id, exception, args, kwargs, traceback, einfo, **kargs):
207 216
208 217 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
209 218 from rhodecode.lib.exc_tracking import store_exception
210 219 from rhodecode.lib.statsd_client import StatsdClient
211 220
212 221 meta.Session.remove()
213 222
214 223 # simulate sys.exc_info()
215 224 exc_info = (einfo.type, einfo.exception, einfo.tb)
216 225 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
217 226 statsd = StatsdClient.statsd
218 227 if statsd:
219 228 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
220 229 statsd.incr('rhodecode_exception_total',
221 230 tags=["exc_source:celery", "type:{}".format(exc_type)])
222 231
223 232 closer = celery_app.conf['PYRAMID_CLOSER']
224 233 if closer:
225 234 closer()
226 235
227 236
228 237 @signals.task_revoked.connect
229 238 def task_revoked_signal(
230 239 request, terminated, signum, expired, **kwargs):
231 240 closer = celery_app.conf['PYRAMID_CLOSER']
232 241 if closer:
233 242 closer()
234 243
235 244
236 245 class UNSET(object):
237 246 pass
238 247
239 248
240 249 def set_celery_conf(app=UNSET(), root=UNSET(), request=UNSET(), registry=UNSET(), closer=UNSET()):
241 250
242 251 if request is not UNSET:
243 252 celery_app.conf.update({'PYRAMID_REQUEST': request})
244 253
245 254 if registry is not UNSET:
246 255 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
247 256
248 257
249 258 def setup_celery_app(app, root, request, registry, closer, celery_settings):
250 259 log.debug('Got custom celery conf: %s', celery_settings)
251 260 celery_config = base_celery_config
252 261 celery_config.update({
253 262 # store celerybeat scheduler db where the .ini file is
254 263 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
255 264 })
256 265
257 266 celery_config.update(celery_settings)
258 267 celery_app.config_from_object(celery_config)
259 268
260 269 celery_app.conf.update({'PYRAMID_APP': app})
261 270 celery_app.conf.update({'PYRAMID_ROOT': root})
262 271 celery_app.conf.update({'PYRAMID_REQUEST': request})
263 272 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
264 273 celery_app.conf.update({'PYRAMID_CLOSER': closer})
265 274
266 275
267 276 def configure_celery(config, celery_settings):
268 277 """
269 278 Helper that is called from our application creation logic. It gives
270 279 connection info into running webapp and allows execution of tasks from
271 280 RhodeCode itself
272 281 """
273 282 # store some globals into rhodecode
274 283 rhodecode.CELERY_ENABLED = str2bool(
275 284 config.registry.settings.get('use_celery'))
276 285 if rhodecode.CELERY_ENABLED:
277 286 log.info('Configuring celery based on `%s` settings', celery_settings)
278 287 setup_celery_app(
279 288 app=None, root=None, request=None, registry=config.registry,
280 289 closer=None, celery_settings=celery_settings)
281 290
282 291
283 292 def maybe_prepare_env(req):
284 293 environ = {}
285 294 try:
286 295 environ.update({
287 296 'PATH_INFO': req.environ['PATH_INFO'],
288 297 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
289 298 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
290 299 'SERVER_NAME': req.environ['SERVER_NAME'],
291 300 'SERVER_PORT': req.environ['SERVER_PORT'],
292 301 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
293 302 })
294 303 except Exception:
295 304 pass
296 305
297 306 return environ
298 307
299 308
300 309 class RequestContextTask(Task):
301 310 """
302 311 This is a celery task which will create a rhodecode app instance context
303 312 for the task, patch pyramid with the original request
304 313 that created the task and also add the user to the context.
305 314 """
306 315
307 316 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
308 317 link=None, link_error=None, shadow=None, **options):
309 318 """ queue the job to run (we are in web request context here) """
310 319 from rhodecode.lib.base import get_ip_addr
311 320
312 321 req = self.app.conf['PYRAMID_REQUEST']
313 322 if not req:
314 323 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
315 324
316 325 log.debug('Running Task with class: %s. Request Class: %s',
317 326 self.__class__, req.__class__)
318 327
319 328 user_id = 0
320 329
321 330 # web case
322 331 if hasattr(req, 'user'):
323 332 user_id = req.user.user_id
324 333
325 334 # api case
326 335 elif hasattr(req, 'rpc_user'):
327 336 user_id = req.rpc_user.user_id
328 337
329 338 # we hook into kwargs since it is the only way to pass our data to
330 339 # the celery worker
331 340 environ = maybe_prepare_env(req)
332 341 options['headers'] = options.get('headers', {})
333 342 options['headers'].update({
334 343 'rhodecode_proxy_data': {
335 344 'environ': environ,
336 345 'auth_user': {
337 346 'ip_addr': get_ip_addr(req.environ),
338 347 'user_id': user_id
339 348 },
340 349 }
341 350 })
342 351
343 352 return super(RequestContextTask, self).apply_async(
344 353 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now