##// END OF EJS Templates
feat(celery): set msgpack as default serializer and drop json_ext as it's actually slower
super-admin -
r5296:121552d9 default
parent child Browse files
Show More
@@ -1,374 +1,369 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18 """
19 19 Celery loader, run with::
20 20
21 21 celery worker \
22 22 --task-events \
23 23 --beat \
24 24 --autoscale=20,2 \
25 25 --max-tasks-per-child 1 \
26 26 --app rhodecode.lib.celerylib.loader \
27 27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 28 --loglevel DEBUG --ini=.dev/dev.ini
29 29 """
30 30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 31 inspect_getargspec()
32 32 inspect_formatargspec()
33 33 # python3.11 inspect patches for backward compat on `paste` code
34 34
35 35 import logging
36 36 import importlib
37 37
38 38 import click
39 39 from celery import Celery
40 40 from celery import signals
41 41 from celery import Task
42 42 from celery import exceptions # noqa
43 from kombu.serialization import register
44 43
45 44 import rhodecode
46 45
47 46 from rhodecode.lib.statsd_client import StatsdClient
48 47 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
49 48 from rhodecode.lib.ext_json import json
50 49 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
51 50 from rhodecode.lib.utils2 import str2bool
52 51 from rhodecode.model import meta
53 52
54
55 register('json_ext', json.dumps, json.loads,
56 content_type='application/x-json-ext',
57 content_encoding='utf-8')
58
59 53 log = logging.getLogger('celery.rhodecode.loader')
60 54
61 55
62 56 imports = ['rhodecode.lib.celerylib.tasks']
63 57
64 58 try:
65 59 # try if we have EE tasks available
66 60 importlib.import_module('rc_ee')
67 61 imports.append('rc_ee.lib.celerylib.tasks')
68 62 except ImportError:
69 63 pass
70 64
71 65
72 66 base_celery_config = {
73 67 'result_backend': 'rpc://',
74 68 'result_expires': 60 * 60 * 24,
75 69 'result_persistent': True,
76 70 'imports': imports,
77 71 'worker_max_tasks_per_child': 20,
78 'accept_content': ['json_ext', 'json'],
79 'task_serializer': 'json_ext',
80 'result_serializer': 'json_ext',
72 'task_serializer': 'msgpack',
73 'accept_content': ['json', 'msgpack'],
74 'result_serializer': 'msgpack',
75 'result_accept_content': ['json', 'msgpack'],
81 76 'worker_hijack_root_logger': False,
82 77 'broker_connection_retry_on_startup': True,
83 78 'database_table_names': {
84 79 'task': 'beat_taskmeta',
85 80 'group': 'beat_groupmeta',
86 81 }
87 82 }
88 83
89 84
90 85 preload_option_ini = click.Option(
91 86 ('--ini',),
92 87 help='Path to ini configuration file.'
93 88 )
94 89
95 90 preload_option_ini_var = click.Option(
96 91 ('--ini-var',),
97 92 help='Comma separated list of key=value to pass to ini.'
98 93 )
99 94
100 95
101 96 def get_logger(obj):
102 97 custom_log = logging.getLogger(
103 98 'rhodecode.task.{}'.format(obj.__class__.__name__))
104 99
105 100 if rhodecode.CELERY_ENABLED:
106 101 try:
107 102 custom_log = obj.get_logger()
108 103 except Exception:
109 104 pass
110 105
111 106 return custom_log
112 107
113 108
114 109 # init main celery app
115 110 celery_app = Celery()
116 111 celery_app.user_options['preload'].add(preload_option_ini)
117 112 celery_app.user_options['preload'].add(preload_option_ini_var)
118 113
119 114
120 115 @signals.setup_logging.connect
121 116 def setup_logging_callback(**kwargs):
122 117
123 118 if 'RC_INI_FILE' in celery_app.conf:
124 119 ini_file = celery_app.conf['RC_INI_FILE']
125 120 else:
126 121 ini_file = celery_app.user_options['RC_INI_FILE']
127 122
128 123 setup_logging(ini_file)
129 124
130 125
131 126 @signals.user_preload_options.connect
132 127 def on_preload_parsed(options, **kwargs):
133 128
134 129 ini_file = options['ini']
135 130 ini_vars = options['ini_var']
136 131
137 132 if ini_file is None:
138 133 print('You must provide the --ini argument to start celery')
139 134 exit(-1)
140 135
141 136 options = None
142 137 if ini_vars is not None:
143 138 options = parse_ini_vars(ini_vars)
144 139
145 140 celery_app.conf['RC_INI_FILE'] = ini_file
146 141 celery_app.user_options['RC_INI_FILE'] = ini_file
147 142
148 143 celery_app.conf['RC_INI_OPTIONS'] = options
149 144 celery_app.user_options['RC_INI_OPTIONS'] = options
150 145
151 146 setup_logging(ini_file)
152 147
153 148
154 149 def _init_celery(app_type=''):
155 150 from rhodecode.config.middleware import get_celery_config
156 151
157 152 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
158 153
159 154 ini_file = celery_app.conf['RC_INI_FILE']
160 155 options = celery_app.conf['RC_INI_OPTIONS']
161 156
162 157 env = None
163 158 try:
164 159 env = bootstrap(ini_file, options=options)
165 160 except Exception:
166 161 log.exception('Failed to bootstrap RhodeCode APP')
167 162
168 163 if not env:
169 164 raise EnvironmentError(
170 165 'Failed to load pyramid ENV. '
171 166 'Probably there is another error present that prevents from running pyramid app')
172 167
173 168 log.debug('Got Pyramid ENV: %s', env)
174 169
175 170 settings = env['registry'].settings
176 171 celery_settings = get_celery_config(settings)
177 172
178 173 # init and bootstrap StatsdClient
179 174 StatsdClient.setup(settings)
180 175
181 176 setup_celery_app(
182 177 app=env['app'], root=env['root'], request=env['request'],
183 178 registry=env['registry'], closer=env['closer'],
184 179 celery_settings=celery_settings)
185 180
186 181
187 182 @signals.celeryd_init.connect
188 183 def on_celeryd_init(sender=None, conf=None, **kwargs):
189 184 _init_celery('celery worker')
190 185
191 186 # fix the global flag even if it's disabled via .ini file because this
192 187 # is a worker code that doesn't need this to be disabled.
193 188 rhodecode.CELERY_ENABLED = True
194 189
195 190
196 191 @signals.beat_init.connect
197 192 def on_beat_init(sender=None, conf=None, **kwargs):
198 193 _init_celery('celery beat')
199 194
200 195
201 196 @signals.task_prerun.connect
202 197 def task_prerun_signal(task_id, task, args, **kwargs):
203 198 ping_db()
204 199 statsd = StatsdClient.statsd
205 200
206 201 if statsd:
207 202 task_repr = getattr(task, 'name', task)
208 203 statsd.incr('rhodecode_celery_task_total', tags=[
209 204 f'task:{task_repr}',
210 205 'mode:async'
211 206 ])
212 207
213 208
214 209 @signals.task_success.connect
215 210 def task_success_signal(result, **kwargs):
216 211 meta.Session.commit()
217 212 closer = celery_app.conf['PYRAMID_CLOSER']
218 213 if closer:
219 214 closer()
220 215
221 216
222 217 @signals.task_retry.connect
223 218 def task_retry_signal(
224 219 request, reason, einfo, **kwargs):
225 220 meta.Session.remove()
226 221 closer = celery_app.conf['PYRAMID_CLOSER']
227 222 if closer:
228 223 closer()
229 224
230 225
231 226 @signals.task_failure.connect
232 227 def task_failure_signal(
233 228 task_id, exception, args, kwargs, traceback, einfo, **kargs):
234 229
235 230 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
236 231 from rhodecode.lib.exc_tracking import store_exception
237 232 from rhodecode.lib.statsd_client import StatsdClient
238 233
239 234 meta.Session.remove()
240 235
241 236 # simulate sys.exc_info()
242 237 exc_info = (einfo.type, einfo.exception, einfo.tb)
243 238 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
244 239 statsd = StatsdClient.statsd
245 240 if statsd:
246 241 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
247 242 statsd.incr('rhodecode_exception_total',
248 243 tags=["exc_source:celery", "type:{}".format(exc_type)])
249 244
250 245 closer = celery_app.conf['PYRAMID_CLOSER']
251 246 if closer:
252 247 closer()
253 248
254 249
255 250 @signals.task_revoked.connect
256 251 def task_revoked_signal(
257 252 request, terminated, signum, expired, **kwargs):
258 253 closer = celery_app.conf['PYRAMID_CLOSER']
259 254 if closer:
260 255 closer()
261 256
262 257
263 258 class UNSET(object):
264 259 pass
265 260
266 261
267 262 _unset = UNSET()
268 263
269 264
270 265 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
271 266
272 267 if request is not UNSET:
273 268 celery_app.conf.update({'PYRAMID_REQUEST': request})
274 269
275 270 if registry is not UNSET:
276 271 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
277 272
278 273
279 274 def setup_celery_app(app, root, request, registry, closer, celery_settings):
280 275 log.debug('Got custom celery conf: %s', celery_settings)
281 276 celery_config = base_celery_config
282 277 celery_config.update({
283 278 # store celerybeat scheduler db where the .ini file is
284 279 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
285 280 })
286 281
287 282 celery_config.update(celery_settings)
288 283 celery_app.config_from_object(celery_config)
289 284
290 285 celery_app.conf.update({'PYRAMID_APP': app})
291 286 celery_app.conf.update({'PYRAMID_ROOT': root})
292 287 celery_app.conf.update({'PYRAMID_REQUEST': request})
293 288 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
294 289 celery_app.conf.update({'PYRAMID_CLOSER': closer})
295 290
296 291
297 292 def configure_celery(config, celery_settings):
298 293 """
299 294 Helper that is called from our application creation logic. It gives
300 295 connection info into running webapp and allows execution of tasks from
301 296 RhodeCode itself
302 297 """
303 298 # store some globals into rhodecode
304 299 rhodecode.CELERY_ENABLED = str2bool(
305 300 config.registry.settings.get('use_celery'))
306 301 if rhodecode.CELERY_ENABLED:
307 302 log.info('Configuring celery based on `%s` settings', celery_settings)
308 303 setup_celery_app(
309 304 app=None, root=None, request=None, registry=config.registry,
310 305 closer=None, celery_settings=celery_settings)
311 306
312 307
313 308 def maybe_prepare_env(req):
314 309 environ = {}
315 310 try:
316 311 environ.update({
317 312 'PATH_INFO': req.environ['PATH_INFO'],
318 313 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
319 314 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
320 315 'SERVER_NAME': req.environ['SERVER_NAME'],
321 316 'SERVER_PORT': req.environ['SERVER_PORT'],
322 317 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
323 318 })
324 319 except Exception:
325 320 pass
326 321
327 322 return environ
328 323
329 324
330 325 class RequestContextTask(Task):
331 326 """
332 327 This is a celery task which will create a rhodecode app instance context
333 328 for the task, patch pyramid with the original request
334 329 that created the task and also add the user to the context.
335 330 """
336 331
337 332 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
338 333 link=None, link_error=None, shadow=None, **options):
339 334 """ queue the job to run (we are in web request context here) """
340 335 from rhodecode.lib.base import get_ip_addr
341 336
342 337 req = self.app.conf['PYRAMID_REQUEST']
343 338 if not req:
344 339 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
345 340
346 341 log.debug('Running Task with class: %s. Request Class: %s',
347 342 self.__class__, req.__class__)
348 343
349 344 user_id = 0
350 345
351 346 # web case
352 347 if hasattr(req, 'user'):
353 348 user_id = req.user.user_id
354 349
355 350 # api case
356 351 elif hasattr(req, 'rpc_user'):
357 352 user_id = req.rpc_user.user_id
358 353
359 354 # we hook into kwargs since it is the only way to pass our data to
360 355 # the celery worker
361 356 environ = maybe_prepare_env(req)
362 357 options['headers'] = options.get('headers', {})
363 358 options['headers'].update({
364 359 'rhodecode_proxy_data': {
365 360 'environ': environ,
366 361 'auth_user': {
367 362 'ip_addr': get_ip_addr(req.environ),
368 363 'user_id': user_id
369 364 },
370 365 }
371 366 })
372 367
373 368 return super(RequestContextTask, self).apply_async(
374 369 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now