##// END OF EJS Templates
fix(celery): remove warning about retry connection flag
super-admin -
r5291:c778c694 default
parent child Browse files
Show More
@@ -1,373 +1,374 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18 """
19 19 Celery loader, run with::
20 20
21 21 celery worker \
22 22 --task-events \
23 23 --beat \
24 24 --autoscale=20,2 \
25 25 --max-tasks-per-child 1 \
26 26 --app rhodecode.lib.celerylib.loader \
27 27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 28 --loglevel DEBUG --ini=.dev/dev.ini
29 29 """
30 30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 31 inspect_getargspec()
32 32 inspect_formatargspec()
33 33 # python3.11 inspect patches for backward compat on `paste` code
34 34
35 35 import logging
36 36 import importlib
37 37
38 38 import click
39 39 from celery import Celery
40 40 from celery import signals
41 41 from celery import Task
42 42 from celery import exceptions # noqa
43 43 from kombu.serialization import register
44 44
45 45 import rhodecode
46 46
47 47 from rhodecode.lib.statsd_client import StatsdClient
48 48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
49 49 from rhodecode.lib.ext_json import json
50 50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
51 51 from rhodecode.lib.utils2 import str2bool
52 52 from rhodecode.model import meta
53 53
54 54
55 55 register('json_ext', json.dumps, json.loads,
56 56 content_type='application/x-json-ext',
57 57 content_encoding='utf-8')
58 58
59 59 log = logging.getLogger('celery.rhodecode.loader')
60 60
61 61
62 62 imports = ['rhodecode.lib.celerylib.tasks']
63 63
64 64 try:
65 65 # try if we have EE tasks available
66 66 importlib.import_module('rc_ee')
67 67 imports.append('rc_ee.lib.celerylib.tasks')
68 68 except ImportError:
69 69 pass
70 70
71 71
72 72 base_celery_config = {
73 73 'result_backend': 'rpc://',
74 74 'result_expires': 60 * 60 * 24,
75 75 'result_persistent': True,
76 76 'imports': imports,
77 77 'worker_max_tasks_per_child': 20,
78 78 'accept_content': ['json_ext', 'json'],
79 79 'task_serializer': 'json_ext',
80 80 'result_serializer': 'json_ext',
81 81 'worker_hijack_root_logger': False,
82 'broker_connection_retry_on_startup': True,
82 83 'database_table_names': {
83 84 'task': 'beat_taskmeta',
84 85 'group': 'beat_groupmeta',
85 86 }
86 87 }
87 88
88 89
89 90 preload_option_ini = click.Option(
90 91 ('--ini',),
91 92 help='Path to ini configuration file.'
92 93 )
93 94
94 95 preload_option_ini_var = click.Option(
95 96 ('--ini-var',),
96 97 help='Comma separated list of key=value to pass to ini.'
97 98 )
98 99
99 100
100 101 def get_logger(obj):
101 102 custom_log = logging.getLogger(
102 103 'rhodecode.task.{}'.format(obj.__class__.__name__))
103 104
104 105 if rhodecode.CELERY_ENABLED:
105 106 try:
106 107 custom_log = obj.get_logger()
107 108 except Exception:
108 109 pass
109 110
110 111 return custom_log
111 112
112 113
113 114 # init main celery app
114 115 celery_app = Celery()
115 116 celery_app.user_options['preload'].add(preload_option_ini)
116 117 celery_app.user_options['preload'].add(preload_option_ini_var)
117 118
118 119
119 120 @signals.setup_logging.connect
120 121 def setup_logging_callback(**kwargs):
121 122
122 123 if 'RC_INI_FILE' in celery_app.conf:
123 124 ini_file = celery_app.conf['RC_INI_FILE']
124 125 else:
125 126 ini_file = celery_app.user_options['RC_INI_FILE']
126 127
127 128 setup_logging(ini_file)
128 129
129 130
130 131 @signals.user_preload_options.connect
131 132 def on_preload_parsed(options, **kwargs):
132 133
133 134 ini_file = options['ini']
134 135 ini_vars = options['ini_var']
135 136
136 137 if ini_file is None:
137 138 print('You must provide the --ini argument to start celery')
138 139 exit(-1)
139 140
140 141 options = None
141 142 if ini_vars is not None:
142 143 options = parse_ini_vars(ini_vars)
143 144
144 145 celery_app.conf['RC_INI_FILE'] = ini_file
145 146 celery_app.user_options['RC_INI_FILE'] = ini_file
146 147
147 148 celery_app.conf['RC_INI_OPTIONS'] = options
148 149 celery_app.user_options['RC_INI_OPTIONS'] = options
149 150
150 151 setup_logging(ini_file)
151 152
152 153
153 154 def _init_celery(app_type=''):
154 155 from rhodecode.config.middleware import get_celery_config
155 156
156 157 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
157 158
158 159 ini_file = celery_app.conf['RC_INI_FILE']
159 160 options = celery_app.conf['RC_INI_OPTIONS']
160 161
161 162 env = None
162 163 try:
163 164 env = bootstrap(ini_file, options=options)
164 165 except Exception:
165 166 log.exception('Failed to bootstrap RhodeCode APP')
166 167
167 168 if not env:
168 169 raise EnvironmentError(
169 170 'Failed to load pyramid ENV. '
170 171 'Probably there is another error present that prevents from running pyramid app')
171 172
172 173 log.debug('Got Pyramid ENV: %s', env)
173 174
174 175 settings = env['registry'].settings
175 176 celery_settings = get_celery_config(settings)
176 177
177 178 # init and bootstrap StatsdClient
178 179 StatsdClient.setup(settings)
179 180
180 181 setup_celery_app(
181 182 app=env['app'], root=env['root'], request=env['request'],
182 183 registry=env['registry'], closer=env['closer'],
183 184 celery_settings=celery_settings)
184 185
185 186
186 187 @signals.celeryd_init.connect
187 188 def on_celeryd_init(sender=None, conf=None, **kwargs):
188 189 _init_celery('celery worker')
189 190
190 191 # fix the global flag even if it's disabled via .ini file because this
191 192 # is a worker code that doesn't need this to be disabled.
192 193 rhodecode.CELERY_ENABLED = True
193 194
194 195
195 196 @signals.beat_init.connect
196 197 def on_beat_init(sender=None, conf=None, **kwargs):
197 198 _init_celery('celery beat')
198 199
199 200
200 201 @signals.task_prerun.connect
201 202 def task_prerun_signal(task_id, task, args, **kwargs):
202 203 ping_db()
203 204 statsd = StatsdClient.statsd
204 205
205 206 if statsd:
206 207 task_repr = getattr(task, 'name', task)
207 208 statsd.incr('rhodecode_celery_task_total', tags=[
208 209 f'task:{task_repr}',
209 210 'mode:async'
210 211 ])
211 212
212 213
213 214 @signals.task_success.connect
214 215 def task_success_signal(result, **kwargs):
215 216 meta.Session.commit()
216 217 closer = celery_app.conf['PYRAMID_CLOSER']
217 218 if closer:
218 219 closer()
219 220
220 221
221 222 @signals.task_retry.connect
222 223 def task_retry_signal(
223 224 request, reason, einfo, **kwargs):
224 225 meta.Session.remove()
225 226 closer = celery_app.conf['PYRAMID_CLOSER']
226 227 if closer:
227 228 closer()
228 229
229 230
230 231 @signals.task_failure.connect
231 232 def task_failure_signal(
232 233 task_id, exception, args, kwargs, traceback, einfo, **kargs):
233 234
234 235 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
235 236 from rhodecode.lib.exc_tracking import store_exception
236 237 from rhodecode.lib.statsd_client import StatsdClient
237 238
238 239 meta.Session.remove()
239 240
240 241 # simulate sys.exc_info()
241 242 exc_info = (einfo.type, einfo.exception, einfo.tb)
242 243 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
243 244 statsd = StatsdClient.statsd
244 245 if statsd:
245 246 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
246 247 statsd.incr('rhodecode_exception_total',
247 248 tags=["exc_source:celery", "type:{}".format(exc_type)])
248 249
249 250 closer = celery_app.conf['PYRAMID_CLOSER']
250 251 if closer:
251 252 closer()
252 253
253 254
254 255 @signals.task_revoked.connect
255 256 def task_revoked_signal(
256 257 request, terminated, signum, expired, **kwargs):
257 258 closer = celery_app.conf['PYRAMID_CLOSER']
258 259 if closer:
259 260 closer()
260 261
261 262
262 263 class UNSET(object):
263 264 pass
264 265
265 266
266 267 _unset = UNSET()
267 268
268 269
269 270 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
270 271
271 272 if request is not UNSET:
272 273 celery_app.conf.update({'PYRAMID_REQUEST': request})
273 274
274 275 if registry is not UNSET:
275 276 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
276 277
277 278
278 279 def setup_celery_app(app, root, request, registry, closer, celery_settings):
279 280 log.debug('Got custom celery conf: %s', celery_settings)
280 281 celery_config = base_celery_config
281 282 celery_config.update({
282 283 # store celerybeat scheduler db where the .ini file is
283 284 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
284 285 })
285 286
286 287 celery_config.update(celery_settings)
287 288 celery_app.config_from_object(celery_config)
288 289
289 290 celery_app.conf.update({'PYRAMID_APP': app})
290 291 celery_app.conf.update({'PYRAMID_ROOT': root})
291 292 celery_app.conf.update({'PYRAMID_REQUEST': request})
292 293 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
293 294 celery_app.conf.update({'PYRAMID_CLOSER': closer})
294 295
295 296
296 297 def configure_celery(config, celery_settings):
297 298 """
298 299 Helper that is called from our application creation logic. It gives
299 300 connection info into running webapp and allows execution of tasks from
300 301 RhodeCode itself
301 302 """
302 303 # store some globals into rhodecode
303 304 rhodecode.CELERY_ENABLED = str2bool(
304 305 config.registry.settings.get('use_celery'))
305 306 if rhodecode.CELERY_ENABLED:
306 307 log.info('Configuring celery based on `%s` settings', celery_settings)
307 308 setup_celery_app(
308 309 app=None, root=None, request=None, registry=config.registry,
309 310 closer=None, celery_settings=celery_settings)
310 311
311 312
312 313 def maybe_prepare_env(req):
313 314 environ = {}
314 315 try:
315 316 environ.update({
316 317 'PATH_INFO': req.environ['PATH_INFO'],
317 318 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
318 319 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
319 320 'SERVER_NAME': req.environ['SERVER_NAME'],
320 321 'SERVER_PORT': req.environ['SERVER_PORT'],
321 322 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
322 323 })
323 324 except Exception:
324 325 pass
325 326
326 327 return environ
327 328
328 329
329 330 class RequestContextTask(Task):
330 331 """
331 332 This is a celery task which will create a rhodecode app instance context
332 333 for the task, patch pyramid with the original request
333 334 that created the task and also add the user to the context.
334 335 """
335 336
336 337 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
337 338 link=None, link_error=None, shadow=None, **options):
338 339 """ queue the job to run (we are in web request context here) """
339 340 from rhodecode.lib.base import get_ip_addr
340 341
341 342 req = self.app.conf['PYRAMID_REQUEST']
342 343 if not req:
343 344 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
344 345
345 346 log.debug('Running Task with class: %s. Request Class: %s',
346 347 self.__class__, req.__class__)
347 348
348 349 user_id = 0
349 350
350 351 # web case
351 352 if hasattr(req, 'user'):
352 353 user_id = req.user.user_id
353 354
354 355 # api case
355 356 elif hasattr(req, 'rpc_user'):
356 357 user_id = req.rpc_user.user_id
357 358
358 359 # we hook into kwargs since it is the only way to pass our data to
359 360 # the celery worker
360 361 environ = maybe_prepare_env(req)
361 362 options['headers'] = options.get('headers', {})
362 363 options['headers'].update({
363 364 'rhodecode_proxy_data': {
364 365 'environ': environ,
365 366 'auth_user': {
366 367 'ip_addr': get_ip_addr(req.environ),
367 368 'user_id': user_id
368 369 },
369 370 }
370 371 })
371 372
372 373 return super(RequestContextTask, self).apply_async(
373 374 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now