##// END OF EJS Templates
fix(celery): don't use msgpack serializer due to problems with certain data types
super-admin -
r5407:ee39fc10 stable
parent child Browse files
Show More
@@ -1,371 +1,371 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18 """
19 19 Celery loader, run with::
20 20
21 21 celery worker \
22 22 --task-events \
23 23 --beat \
24 24 --autoscale=20,2 \
25 25 --max-tasks-per-child 1 \
26 26 --app rhodecode.lib.celerylib.loader \
27 27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 28 --loglevel DEBUG --ini=.dev/dev.ini
29 29 """
30 30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 31 inspect_getargspec()
32 32 inspect_formatargspec()
33 33 # python3.11 inspect patches for backward compat on `paste` code
34 34
35 35 import logging
36 36 import importlib
37 37
38 38 import click
39 39 from celery import Celery
40 40 from celery import signals
41 41 from celery import Task
42 42 from celery import exceptions # noqa
43 43
44 44 import rhodecode
45 45
46 46 from rhodecode.lib.statsd_client import StatsdClient
47 47 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
48 48 from rhodecode.lib.ext_json import json
49 49 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
50 50 from rhodecode.lib.utils2 import str2bool
51 51 from rhodecode.model import meta
52 52
53 53 log = logging.getLogger('celery.rhodecode.loader')
54 54
55 55
56 56 imports = ['rhodecode.lib.celerylib.tasks']
57 57
58 58 try:
59 59 # try if we have EE tasks available
60 60 importlib.import_module('rc_ee')
61 61 imports.append('rc_ee.lib.celerylib.tasks')
62 62 except ImportError:
63 63 pass
64 64
65 65
66 66 base_celery_config = {
67 67 'result_backend': 'rpc://',
68 68 'result_expires': 60 * 60 * 24,
69 69 'result_persistent': True,
70 70 'imports': imports,
71 71 'worker_max_tasks_per_child': 100,
72 72 'worker_hijack_root_logger': False,
73 73 'worker_prefetch_multiplier': 1,
74 'task_serializer': 'msgpack',
74 'task_serializer': 'json',
75 75 'accept_content': ['json', 'msgpack'],
76 'result_serializer': 'msgpack',
76 'result_serializer': 'json',
77 77 'result_accept_content': ['json', 'msgpack'],
78 78
79 79 'broker_connection_retry_on_startup': True,
80 80 'database_table_names': {
81 81 'task': 'beat_taskmeta',
82 82 'group': 'beat_groupmeta',
83 83 }
84 84 }
85 85
86 86
87 87 preload_option_ini = click.Option(
88 88 ('--ini',),
89 89 help='Path to ini configuration file.'
90 90 )
91 91
92 92 preload_option_ini_var = click.Option(
93 93 ('--ini-var',),
94 94 help='Comma separated list of key=value to pass to ini.'
95 95 )
96 96
97 97
98 98 def get_logger(obj):
99 99 custom_log = logging.getLogger(
100 100 'rhodecode.task.{}'.format(obj.__class__.__name__))
101 101
102 102 if rhodecode.CELERY_ENABLED:
103 103 try:
104 104 custom_log = obj.get_logger()
105 105 except Exception:
106 106 pass
107 107
108 108 return custom_log
109 109
110 110
111 111 # init main celery app
112 112 celery_app = Celery()
113 113 celery_app.user_options['preload'].add(preload_option_ini)
114 114 celery_app.user_options['preload'].add(preload_option_ini_var)
115 115
116 116
117 117 @signals.setup_logging.connect
118 118 def setup_logging_callback(**kwargs):
119 119
120 120 if 'RC_INI_FILE' in celery_app.conf:
121 121 ini_file = celery_app.conf['RC_INI_FILE']
122 122 else:
123 123 ini_file = celery_app.user_options['RC_INI_FILE']
124 124
125 125 setup_logging(ini_file)
126 126
127 127
128 128 @signals.user_preload_options.connect
129 129 def on_preload_parsed(options, **kwargs):
130 130
131 131 ini_file = options['ini']
132 132 ini_vars = options['ini_var']
133 133
134 134 if ini_file is None:
135 135 print('You must provide the --ini argument to start celery')
136 136 exit(-1)
137 137
138 138 options = None
139 139 if ini_vars is not None:
140 140 options = parse_ini_vars(ini_vars)
141 141
142 142 celery_app.conf['RC_INI_FILE'] = ini_file
143 143 celery_app.user_options['RC_INI_FILE'] = ini_file
144 144
145 145 celery_app.conf['RC_INI_OPTIONS'] = options
146 146 celery_app.user_options['RC_INI_OPTIONS'] = options
147 147
148 148 setup_logging(ini_file)
149 149
150 150
151 151 def _init_celery(app_type=''):
152 152 from rhodecode.config.middleware import get_celery_config
153 153
154 154 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
155 155
156 156 ini_file = celery_app.conf['RC_INI_FILE']
157 157 options = celery_app.conf['RC_INI_OPTIONS']
158 158
159 159 env = None
160 160 try:
161 161 env = bootstrap(ini_file, options=options)
162 162 except Exception:
163 163 log.exception('Failed to bootstrap RhodeCode APP')
164 164
165 165 if not env:
166 166 raise EnvironmentError(
167 167 'Failed to load pyramid ENV. '
168 168 'Probably there is another error present that prevents from running pyramid app')
169 169
170 170 log.debug('Got Pyramid ENV: %s', env)
171 171
172 172 settings = env['registry'].settings
173 173 celery_settings = get_celery_config(settings)
174 174
175 175 # init and bootstrap StatsdClient
176 176 StatsdClient.setup(settings)
177 177
178 178 setup_celery_app(
179 179 app=env['app'], root=env['root'], request=env['request'],
180 180 registry=env['registry'], closer=env['closer'],
181 181 celery_settings=celery_settings)
182 182
183 183
184 184 @signals.celeryd_init.connect
185 185 def on_celeryd_init(sender=None, conf=None, **kwargs):
186 186 _init_celery('celery worker')
187 187
188 188 # fix the global flag even if it's disabled via .ini file because this
189 189 # is a worker code that doesn't need this to be disabled.
190 190 rhodecode.CELERY_ENABLED = True
191 191
192 192
193 193 @signals.beat_init.connect
194 194 def on_beat_init(sender=None, conf=None, **kwargs):
195 195 _init_celery('celery beat')
196 196
197 197
198 198 @signals.task_prerun.connect
199 199 def task_prerun_signal(task_id, task, args, **kwargs):
200 200 ping_db()
201 201 statsd = StatsdClient.statsd
202 202
203 203 if statsd:
204 204 task_repr = getattr(task, 'name', task)
205 205 statsd.incr('rhodecode_celery_task_total', tags=[
206 206 f'task:{task_repr}',
207 207 'mode:async'
208 208 ])
209 209
210 210
211 211 @signals.task_success.connect
212 212 def task_success_signal(result, **kwargs):
213 213 meta.Session.commit()
214 214 closer = celery_app.conf['PYRAMID_CLOSER']
215 215 if closer:
216 216 closer()
217 217
218 218
219 219 @signals.task_retry.connect
220 220 def task_retry_signal(
221 221 request, reason, einfo, **kwargs):
222 222 meta.Session.remove()
223 223 closer = celery_app.conf['PYRAMID_CLOSER']
224 224 if closer:
225 225 closer()
226 226
227 227
228 228 @signals.task_failure.connect
229 229 def task_failure_signal(
230 230 task_id, exception, args, kwargs, traceback, einfo, **kargs):
231 231
232 232 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
233 233 from rhodecode.lib.exc_tracking import store_exception
234 234 from rhodecode.lib.statsd_client import StatsdClient
235 235
236 236 meta.Session.remove()
237 237
238 238 # simulate sys.exc_info()
239 239 exc_info = (einfo.type, einfo.exception, einfo.tb)
240 240 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
241 241 statsd = StatsdClient.statsd
242 242 if statsd:
243 243 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
244 244 statsd.incr('rhodecode_exception_total',
245 245 tags=["exc_source:celery", "type:{}".format(exc_type)])
246 246
247 247 closer = celery_app.conf['PYRAMID_CLOSER']
248 248 if closer:
249 249 closer()
250 250
251 251
252 252 @signals.task_revoked.connect
253 253 def task_revoked_signal(
254 254 request, terminated, signum, expired, **kwargs):
255 255 closer = celery_app.conf['PYRAMID_CLOSER']
256 256 if closer:
257 257 closer()
258 258
259 259
260 260 class UNSET(object):
261 261 pass
262 262
263 263
264 264 _unset = UNSET()
265 265
266 266
267 267 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
268 268
269 269 if request is not UNSET:
270 270 celery_app.conf.update({'PYRAMID_REQUEST': request})
271 271
272 272 if registry is not UNSET:
273 273 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
274 274
275 275
276 276 def setup_celery_app(app, root, request, registry, closer, celery_settings):
277 277 log.debug('Got custom celery conf: %s', celery_settings)
278 278 celery_config = base_celery_config
279 279 celery_config.update({
280 280 # store celerybeat scheduler db where the .ini file is
281 281 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
282 282 })
283 283
284 284 celery_config.update(celery_settings)
285 285 celery_app.config_from_object(celery_config)
286 286
287 287 celery_app.conf.update({'PYRAMID_APP': app})
288 288 celery_app.conf.update({'PYRAMID_ROOT': root})
289 289 celery_app.conf.update({'PYRAMID_REQUEST': request})
290 290 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
291 291 celery_app.conf.update({'PYRAMID_CLOSER': closer})
292 292
293 293
294 294 def configure_celery(config, celery_settings):
295 295 """
296 296 Helper that is called from our application creation logic. It gives
297 297 connection info into running webapp and allows execution of tasks from
298 298 RhodeCode itself
299 299 """
300 300 # store some globals into rhodecode
301 301 rhodecode.CELERY_ENABLED = str2bool(
302 302 config.registry.settings.get('use_celery'))
303 303 if rhodecode.CELERY_ENABLED:
304 304 log.info('Configuring celery based on `%s` settings', celery_settings)
305 305 setup_celery_app(
306 306 app=None, root=None, request=None, registry=config.registry,
307 307 closer=None, celery_settings=celery_settings)
308 308
309 309
310 310 def maybe_prepare_env(req):
311 311 environ = {}
312 312 try:
313 313 environ.update({
314 314 'PATH_INFO': req.environ['PATH_INFO'],
315 315 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
316 316 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
317 317 'SERVER_NAME': req.environ['SERVER_NAME'],
318 318 'SERVER_PORT': req.environ['SERVER_PORT'],
319 319 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
320 320 })
321 321 except Exception:
322 322 pass
323 323
324 324 return environ
325 325
326 326
327 327 class RequestContextTask(Task):
328 328 """
329 329 This is a celery task which will create a rhodecode app instance context
330 330 for the task, patch pyramid with the original request
331 331 that created the task and also add the user to the context.
332 332 """
333 333
334 334 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
335 335 link=None, link_error=None, shadow=None, **options):
336 336 """ queue the job to run (we are in web request context here) """
337 337 from rhodecode.lib.base import get_ip_addr
338 338
339 339 req = self.app.conf['PYRAMID_REQUEST']
340 340 if not req:
341 341 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
342 342
343 343 log.debug('Running Task with class: %s. Request Class: %s',
344 344 self.__class__, req.__class__)
345 345
346 346 user_id = 0
347 347
348 348 # web case
349 349 if hasattr(req, 'user'):
350 350 user_id = req.user.user_id
351 351
352 352 # api case
353 353 elif hasattr(req, 'rpc_user'):
354 354 user_id = req.rpc_user.user_id
355 355
356 356 # we hook into kwargs since it is the only way to pass our data to
357 357 # the celery worker
358 358 environ = maybe_prepare_env(req)
359 359 options['headers'] = options.get('headers', {})
360 360 options['headers'].update({
361 361 'rhodecode_proxy_data': {
362 362 'environ': environ,
363 363 'auth_user': {
364 364 'ip_addr': get_ip_addr(req.environ),
365 365 'user_id': user_id
366 366 },
367 367 }
368 368 })
369 369
370 370 return super(RequestContextTask, self).apply_async(
371 371 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now