fix(celery): don't use msgpack serializer due to problems with certain data types
super-admin
r5406:f8c36854 default
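The change below swaps Celery's task and result serializers from msgpack to JSON. As a hedged illustration of the kind of data-type problem the commit message refers to (the exact offending types are not named in the commit), the sketch below shows that the msgpack packer has no default handling for values such as datetime objects that can end up in task arguments, while a JSON serializer can hand unknown types to a fallback encoder; the stdlib json module with default=str stands in here for the encoder Celery/kombu actually use, and the msgpack package is assumed to be installed:

    import datetime
    import json

    import msgpack  # assumed to be installed

    payload = {'repo_id': 42, 'created_on': datetime.datetime(2023, 1, 1, 12, 0)}

    try:
        # msgpack has no built-in encoding for datetime values and raises TypeError
        msgpack.packb(payload)
    except TypeError as exc:
        print(f'msgpack failed: {exc}')

    # a JSON serializer can delegate unknown types to a fallback encoder;
    # str() stands in here for the richer encoder used for Celery's 'json' serializer
    print(json.dumps(payload, default=str))

This only illustrates the failure mode; it is not taken from the RhodeCode code base or test suite.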
@@ -1,372 +1,372 @@
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18 """
19 19 Celery loader, run with::
20 20
21 21 celery worker \
22 22 --task-events \
23 23 --beat \
24 24 --autoscale=20,2 \
25 25 --max-tasks-per-child 1 \
26 26 --app rhodecode.lib.celerylib.loader \
27 27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 28 --loglevel DEBUG --ini=.dev/dev.ini
29 29 """
30 30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 31 inspect_getargspec()
32 32 inspect_formatargspec()
33 33 # python3.11 inspect patches for backward compat on `paste` code
34 34
35 35 import sys
36 36 import logging
37 37 import importlib
38 38
39 39 import click
40 40 from celery import Celery
41 41 from celery import signals
42 42 from celery import Task
43 43 from celery import exceptions # noqa
44 44
45 45 import rhodecode
46 46
47 47 from rhodecode.lib.statsd_client import StatsdClient
48 48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
49 49 from rhodecode.lib.ext_json import json
50 50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
51 51 from rhodecode.lib.utils2 import str2bool
52 52 from rhodecode.model import meta
53 53
54 54 log = logging.getLogger('celery.rhodecode.loader')
55 55
56 56
57 57 imports = ['rhodecode.lib.celerylib.tasks']
58 58
59 59 try:
60 60 # check whether EE (Enterprise Edition) tasks are available
61 61 importlib.import_module('rc_ee')
62 62 imports.append('rc_ee.lib.celerylib.tasks')
63 63 except ImportError:
64 64 pass
65 65
66 66
67 67 base_celery_config = {
68 68 'result_backend': 'rpc://',
69 69 'result_expires': 60 * 60 * 24,
70 70 'result_persistent': True,
71 71 'imports': imports,
72 72 'worker_max_tasks_per_child': 100,
73 73 'worker_hijack_root_logger': False,
74 74 'worker_prefetch_multiplier': 1,
75    - 'task_serializer': 'msgpack',
   75 + 'task_serializer': 'json',
76 76 'accept_content': ['json', 'msgpack'],
77    - 'result_serializer': 'msgpack',
   77 + 'result_serializer': 'json',
78 78 'result_accept_content': ['json', 'msgpack'],
79 79
80 80 'broker_connection_retry_on_startup': True,
81 81 'database_table_names': {
82 82 'task': 'beat_taskmeta',
83 83 'group': 'beat_groupmeta',
84 84 }
85 85 }
86 86
87 87
88 88 preload_option_ini = click.Option(
89 89 ('--ini',),
90 90 help='Path to ini configuration file.'
91 91 )
92 92
93 93 preload_option_ini_var = click.Option(
94 94 ('--ini-var',),
95 95 help='Comma separated list of key=value to pass to ini.'
96 96 )
97 97
98 98
99 99 def get_logger(obj):
100 100 custom_log = logging.getLogger(
101 101 'rhodecode.task.{}'.format(obj.__class__.__name__))
102 102
103 103 if rhodecode.CELERY_ENABLED:
104 104 try:
105 105 custom_log = obj.get_logger()
106 106 except Exception:
107 107 pass
108 108
109 109 return custom_log
110 110
111 111
112 112 # init main celery app
113 113 celery_app = Celery()
114 114 celery_app.user_options['preload'].add(preload_option_ini)
115 115 celery_app.user_options['preload'].add(preload_option_ini_var)
116 116
117 117
118 118 @signals.setup_logging.connect
119 119 def setup_logging_callback(**kwargs):
120 120
121 121 if 'RC_INI_FILE' in celery_app.conf:
122 122 ini_file = celery_app.conf['RC_INI_FILE']
123 123 else:
124 124 ini_file = celery_app.user_options['RC_INI_FILE']
125 125
126 126 setup_logging(ini_file)
127 127
128 128
129 129 @signals.user_preload_options.connect
130 130 def on_preload_parsed(options, **kwargs):
131 131
132 132 ini_file = options['ini']
133 133 ini_vars = options['ini_var']
134 134
135 135 if ini_file is None:
136 136 print('You must provide the --ini argument to start celery')
137 137 exit(-1)
138 138
139 139 options = None
140 140 if ini_vars is not None:
141 141 options = parse_ini_vars(ini_vars)
142 142
143 143 celery_app.conf['RC_INI_FILE'] = ini_file
144 144 celery_app.user_options['RC_INI_FILE'] = ini_file
145 145
146 146 celery_app.conf['RC_INI_OPTIONS'] = options
147 147 celery_app.user_options['RC_INI_OPTIONS'] = options
148 148
149 149 setup_logging(ini_file)
150 150
151 151
152 152 def _init_celery(app_type=''):
153 153 from rhodecode.config.middleware import get_celery_config
154 154
155 155 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
156 156
157 157 ini_file = celery_app.conf['RC_INI_FILE']
158 158 options = celery_app.conf['RC_INI_OPTIONS']
159 159
160 160 env = None
161 161 try:
162 162 env = bootstrap(ini_file, options=options)
163 163 except Exception:
164 164 log.exception('Failed to bootstrap RhodeCode APP. '
165 165 'There is probably another error that prevents the pyramid app from running')
166 166
167 167 if not env:
168 168 # we use sys.exit here since we need to signal app startup failure for docker to restart the container and re-try
169 169 sys.exit(1)
170 170
171 171 log.debug('Got Pyramid ENV: %s', env)
172 172
173 173 settings = env['registry'].settings
174 174 celery_settings = get_celery_config(settings)
175 175
176 176 # init and bootstrap StatsdClient
177 177 StatsdClient.setup(settings)
178 178
179 179 setup_celery_app(
180 180 app=env['app'], root=env['root'], request=env['request'],
181 181 registry=env['registry'], closer=env['closer'],
182 182 celery_settings=celery_settings)
183 183
184 184
185 185 @signals.celeryd_init.connect
186 186 def on_celeryd_init(sender=None, conf=None, **kwargs):
187 187 _init_celery('celery worker')
188 188
189 189 # force the global flag on even if it's disabled via the .ini file, because this
190 190 # is worker code that doesn't need it disabled.
191 191 rhodecode.CELERY_ENABLED = True
192 192
193 193
194 194 @signals.beat_init.connect
195 195 def on_beat_init(sender=None, conf=None, **kwargs):
196 196 _init_celery('celery beat')
197 197
198 198
199 199 @signals.task_prerun.connect
200 200 def task_prerun_signal(task_id, task, args, **kwargs):
201 201 ping_db()
202 202 statsd = StatsdClient.statsd
203 203
204 204 if statsd:
205 205 task_repr = getattr(task, 'name', task)
206 206 statsd.incr('rhodecode_celery_task_total', tags=[
207 207 f'task:{task_repr}',
208 208 'mode:async'
209 209 ])
210 210
211 211
212 212 @signals.task_success.connect
213 213 def task_success_signal(result, **kwargs):
214 214 meta.Session.commit()
215 215 closer = celery_app.conf['PYRAMID_CLOSER']
216 216 if closer:
217 217 closer()
218 218
219 219
220 220 @signals.task_retry.connect
221 221 def task_retry_signal(
222 222 request, reason, einfo, **kwargs):
223 223 meta.Session.remove()
224 224 closer = celery_app.conf['PYRAMID_CLOSER']
225 225 if closer:
226 226 closer()
227 227
228 228
229 229 @signals.task_failure.connect
230 230 def task_failure_signal(
231 231 task_id, exception, args, kwargs, traceback, einfo, **kargs):
232 232
233 233 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
234 234 from rhodecode.lib.exc_tracking import store_exception
235 235 from rhodecode.lib.statsd_client import StatsdClient
236 236
237 237 meta.Session.remove()
238 238
239 239 # simulate sys.exc_info()
240 240 exc_info = (einfo.type, einfo.exception, einfo.tb)
241 241 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
242 242 statsd = StatsdClient.statsd
243 243 if statsd:
244 244 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
245 245 statsd.incr('rhodecode_exception_total',
246 246 tags=["exc_source:celery", "type:{}".format(exc_type)])
247 247
248 248 closer = celery_app.conf['PYRAMID_CLOSER']
249 249 if closer:
250 250 closer()
251 251
252 252
253 253 @signals.task_revoked.connect
254 254 def task_revoked_signal(
255 255 request, terminated, signum, expired, **kwargs):
256 256 closer = celery_app.conf['PYRAMID_CLOSER']
257 257 if closer:
258 258 closer()
259 259
260 260
261 261 class UNSET(object):
262 262 pass
263 263
264 264
265 265 _unset = UNSET()
266 266
267 267
268 268 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
269 269
270 270 if request is not UNSET:
271 271 celery_app.conf.update({'PYRAMID_REQUEST': request})
272 272
273 273 if registry is not UNSET:
274 274 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
275 275
276 276
277 277 def setup_celery_app(app, root, request, registry, closer, celery_settings):
278 278 log.debug('Got custom celery conf: %s', celery_settings)
279 279 celery_config = base_celery_config
280 280 celery_config.update({
281 281 # store celerybeat scheduler db where the .ini file is
282 282 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
283 283 })
284 284
285 285 celery_config.update(celery_settings)
286 286 celery_app.config_from_object(celery_config)
287 287
288 288 celery_app.conf.update({'PYRAMID_APP': app})
289 289 celery_app.conf.update({'PYRAMID_ROOT': root})
290 290 celery_app.conf.update({'PYRAMID_REQUEST': request})
291 291 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
292 292 celery_app.conf.update({'PYRAMID_CLOSER': closer})
293 293
294 294
295 295 def configure_celery(config, celery_settings):
296 296 """
297 297 Helper called from our application creation logic. It provides connection
298 298 info to the running webapp and allows tasks to be executed from
299 299 RhodeCode itself.
300 300 """
301 301 # store some globals into rhodecode
302 302 rhodecode.CELERY_ENABLED = str2bool(
303 303 config.registry.settings.get('use_celery'))
304 304 if rhodecode.CELERY_ENABLED:
305 305 log.info('Configuring celery based on `%s` settings', celery_settings)
306 306 setup_celery_app(
307 307 app=None, root=None, request=None, registry=config.registry,
308 308 closer=None, celery_settings=celery_settings)
309 309
310 310
311 311 def maybe_prepare_env(req):
312 312 environ = {}
313 313 try:
314 314 environ.update({
315 315 'PATH_INFO': req.environ['PATH_INFO'],
316 316 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
317 317 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
318 318 'SERVER_NAME': req.environ['SERVER_NAME'],
319 319 'SERVER_PORT': req.environ['SERVER_PORT'],
320 320 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
321 321 })
322 322 except Exception:
323 323 pass
324 324
325 325 return environ
326 326
327 327
328 328 class RequestContextTask(Task):
329 329 """
330 330 This is a celery task which will create a rhodecode app instance context
331 331 for the task, patch pyramid with the original request
332 332 that created the task and also add the user to the context.
333 333 """
334 334
335 335 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
336 336 link=None, link_error=None, shadow=None, **options):
337 337 """ queue the job to run (we are in web request context here) """
338 338 from rhodecode.lib.base import get_ip_addr
339 339
340 340 req = self.app.conf['PYRAMID_REQUEST']
341 341 if not req:
342 342 raise ValueError('celery_app.conf has an empty PYRAMID_REQUEST key')
343 343
344 344 log.debug('Running Task with class: %s. Request Class: %s',
345 345 self.__class__, req.__class__)
346 346
347 347 user_id = 0
348 348
349 349 # web case
350 350 if hasattr(req, 'user'):
351 351 user_id = req.user.user_id
352 352
353 353 # api case
354 354 elif hasattr(req, 'rpc_user'):
355 355 user_id = req.rpc_user.user_id
356 356
357 357 # we hook into the message headers since it is the only way to pass our
358 358 # data to the celery worker
359 359 environ = maybe_prepare_env(req)
360 360 options['headers'] = options.get('headers', {})
361 361 options['headers'].update({
362 362 'rhodecode_proxy_data': {
363 363 'environ': environ,
364 364 'auth_user': {
365 365 'ip_addr': get_ip_addr(req.environ),
366 366 'user_id': user_id
367 367 },
368 368 }
369 369 })
370 370
371 371 return super(RequestContextTask, self).apply_async(
372 372 args, kwargs, task_id, producer, link, link_error, shadow, **options)
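The RequestContextTask class above passes the originating Pyramid request data to the worker through the task message headers. A minimal sketch of how a task might opt into it, assuming the loader module path given in the docstring at the top of the file; the task name and body are hypothetical:

    from rhodecode.lib.celerylib.loader import celery_app, RequestContextTask


    @celery_app.task(base=RequestContextTask, name='example.touch_repo')  # hypothetical task
    def touch_repo(repo_id):
        # by the time this runs on a worker, the 'rhodecode_proxy_data' header added
        # in apply_async() has travelled with the message (environ, ip_addr, user_id)
        return repo_id


    # Inside a web request, setup_celery_app() has stored the request under
    # PYRAMID_REQUEST, so apply_async() can attach the proxy data before publishing:
    # touch_repo.apply_async(args=(42,))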
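A closing note on the serializer policy itself: only the producer-side settings change in this commit, while accept_content and result_accept_content still list both formats. A hedged sketch of the resulting policy, expressed against a throwaway Celery app (the app name and in-memory broker are placeholders, not RhodeCode settings):

    from celery import Celery

    app = Celery('sketch', broker='memory://')
    app.conf.update(
        task_serializer='json',        # new task messages are published as JSON
        result_serializer='json',      # results are stored as JSON as well
        accept_content=['json', 'msgpack'],         # workers still decode msgpack payloads,
        result_accept_content=['json', 'msgpack'],  # e.g. ones queued before the switch
    )

Keeping msgpack in the accepted content types means any msgpack-encoded messages or results still in flight remain readable after workers restart on the new configuration.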