##// END OF EJS Templates
fix(celery): signal startup failure with sys.exit to recycle celery worker on errors
super-admin -
r5403:64d7f92c default
parent child Browse files
Show More
@@ -1,371 +1,372 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18 """
19 19 Celery loader, run with::
20 20
21 21 celery worker \
22 22 --task-events \
23 23 --beat \
24 24 --autoscale=20,2 \
25 25 --max-tasks-per-child 1 \
26 26 --app rhodecode.lib.celerylib.loader \
27 27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 28 --loglevel DEBUG --ini=.dev/dev.ini
29 29 """
30 30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 31 inspect_getargspec()
32 32 inspect_formatargspec()
33 33 # python3.11 inspect patches for backward compat on `paste` code
34 34
35 import sys
35 36 import logging
36 37 import importlib
37 38
38 39 import click
39 40 from celery import Celery
40 41 from celery import signals
41 42 from celery import Task
42 43 from celery import exceptions # noqa
43 44
44 45 import rhodecode
45 46
46 47 from rhodecode.lib.statsd_client import StatsdClient
47 48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
48 49 from rhodecode.lib.ext_json import json
49 50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
50 51 from rhodecode.lib.utils2 import str2bool
51 52 from rhodecode.model import meta
52 53
53 54 log = logging.getLogger('celery.rhodecode.loader')
54 55
55 56
56 57 imports = ['rhodecode.lib.celerylib.tasks']
57 58
58 59 try:
59 60 # try if we have EE tasks available
60 61 importlib.import_module('rc_ee')
61 62 imports.append('rc_ee.lib.celerylib.tasks')
62 63 except ImportError:
63 64 pass
64 65
65 66
66 67 base_celery_config = {
67 68 'result_backend': 'rpc://',
68 69 'result_expires': 60 * 60 * 24,
69 70 'result_persistent': True,
70 71 'imports': imports,
71 72 'worker_max_tasks_per_child': 100,
72 73 'worker_hijack_root_logger': False,
73 74 'worker_prefetch_multiplier': 1,
74 75 'task_serializer': 'msgpack',
75 76 'accept_content': ['json', 'msgpack'],
76 77 'result_serializer': 'msgpack',
77 78 'result_accept_content': ['json', 'msgpack'],
78 79
79 80 'broker_connection_retry_on_startup': True,
80 81 'database_table_names': {
81 82 'task': 'beat_taskmeta',
82 83 'group': 'beat_groupmeta',
83 84 }
84 85 }
85 86
86 87
87 88 preload_option_ini = click.Option(
88 89 ('--ini',),
89 90 help='Path to ini configuration file.'
90 91 )
91 92
92 93 preload_option_ini_var = click.Option(
93 94 ('--ini-var',),
94 95 help='Comma separated list of key=value to pass to ini.'
95 96 )
96 97
97 98
98 99 def get_logger(obj):
99 100 custom_log = logging.getLogger(
100 101 'rhodecode.task.{}'.format(obj.__class__.__name__))
101 102
102 103 if rhodecode.CELERY_ENABLED:
103 104 try:
104 105 custom_log = obj.get_logger()
105 106 except Exception:
106 107 pass
107 108
108 109 return custom_log
109 110
110 111
111 112 # init main celery app
112 113 celery_app = Celery()
113 114 celery_app.user_options['preload'].add(preload_option_ini)
114 115 celery_app.user_options['preload'].add(preload_option_ini_var)
115 116
116 117
117 118 @signals.setup_logging.connect
118 119 def setup_logging_callback(**kwargs):
119 120
120 121 if 'RC_INI_FILE' in celery_app.conf:
121 122 ini_file = celery_app.conf['RC_INI_FILE']
122 123 else:
123 124 ini_file = celery_app.user_options['RC_INI_FILE']
124 125
125 126 setup_logging(ini_file)
126 127
127 128
128 129 @signals.user_preload_options.connect
129 130 def on_preload_parsed(options, **kwargs):
130 131
131 132 ini_file = options['ini']
132 133 ini_vars = options['ini_var']
133 134
134 135 if ini_file is None:
135 136 print('You must provide the --ini argument to start celery')
136 137 exit(-1)
137 138
138 139 options = None
139 140 if ini_vars is not None:
140 141 options = parse_ini_vars(ini_vars)
141 142
142 143 celery_app.conf['RC_INI_FILE'] = ini_file
143 144 celery_app.user_options['RC_INI_FILE'] = ini_file
144 145
145 146 celery_app.conf['RC_INI_OPTIONS'] = options
146 147 celery_app.user_options['RC_INI_OPTIONS'] = options
147 148
148 149 setup_logging(ini_file)
149 150
150 151
151 152 def _init_celery(app_type=''):
152 153 from rhodecode.config.middleware import get_celery_config
153 154
154 155 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
155 156
156 157 ini_file = celery_app.conf['RC_INI_FILE']
157 158 options = celery_app.conf['RC_INI_OPTIONS']
158 159
159 160 env = None
160 161 try:
161 162 env = bootstrap(ini_file, options=options)
162 163 except Exception:
163 log.exception('Failed to bootstrap RhodeCode APP')
164 log.exception('Failed to bootstrap RhodeCode APP. '
165 'Probably there is another error present that prevents from running pyramid app')
164 166
165 167 if not env:
166 raise EnvironmentError(
167 'Failed to load pyramid ENV. '
168 'Probably there is another error present that prevents from running pyramid app')
168 # we use sys.exit here since we need to signal app startup failure for docker to restart the container and re-try
169 sys.exit(1)
169 170
170 171 log.debug('Got Pyramid ENV: %s', env)
171 172
172 173 settings = env['registry'].settings
173 174 celery_settings = get_celery_config(settings)
174 175
175 176 # init and bootstrap StatsdClient
176 177 StatsdClient.setup(settings)
177 178
178 179 setup_celery_app(
179 180 app=env['app'], root=env['root'], request=env['request'],
180 181 registry=env['registry'], closer=env['closer'],
181 182 celery_settings=celery_settings)
182 183
183 184
184 185 @signals.celeryd_init.connect
185 186 def on_celeryd_init(sender=None, conf=None, **kwargs):
186 187 _init_celery('celery worker')
187 188
188 189 # fix the global flag even if it's disabled via .ini file because this
189 190 # is a worker code that doesn't need this to be disabled.
190 191 rhodecode.CELERY_ENABLED = True
191 192
192 193
193 194 @signals.beat_init.connect
194 195 def on_beat_init(sender=None, conf=None, **kwargs):
195 196 _init_celery('celery beat')
196 197
197 198
198 199 @signals.task_prerun.connect
199 200 def task_prerun_signal(task_id, task, args, **kwargs):
200 201 ping_db()
201 202 statsd = StatsdClient.statsd
202 203
203 204 if statsd:
204 205 task_repr = getattr(task, 'name', task)
205 206 statsd.incr('rhodecode_celery_task_total', tags=[
206 207 f'task:{task_repr}',
207 208 'mode:async'
208 209 ])
209 210
210 211
211 212 @signals.task_success.connect
212 213 def task_success_signal(result, **kwargs):
213 214 meta.Session.commit()
214 215 closer = celery_app.conf['PYRAMID_CLOSER']
215 216 if closer:
216 217 closer()
217 218
218 219
219 220 @signals.task_retry.connect
220 221 def task_retry_signal(
221 222 request, reason, einfo, **kwargs):
222 223 meta.Session.remove()
223 224 closer = celery_app.conf['PYRAMID_CLOSER']
224 225 if closer:
225 226 closer()
226 227
227 228
228 229 @signals.task_failure.connect
229 230 def task_failure_signal(
230 231 task_id, exception, args, kwargs, traceback, einfo, **kargs):
231 232
232 233 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
233 234 from rhodecode.lib.exc_tracking import store_exception
234 235 from rhodecode.lib.statsd_client import StatsdClient
235 236
236 237 meta.Session.remove()
237 238
238 239 # simulate sys.exc_info()
239 240 exc_info = (einfo.type, einfo.exception, einfo.tb)
240 241 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
241 242 statsd = StatsdClient.statsd
242 243 if statsd:
243 244 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
244 245 statsd.incr('rhodecode_exception_total',
245 246 tags=["exc_source:celery", "type:{}".format(exc_type)])
246 247
247 248 closer = celery_app.conf['PYRAMID_CLOSER']
248 249 if closer:
249 250 closer()
250 251
251 252
252 253 @signals.task_revoked.connect
253 254 def task_revoked_signal(
254 255 request, terminated, signum, expired, **kwargs):
255 256 closer = celery_app.conf['PYRAMID_CLOSER']
256 257 if closer:
257 258 closer()
258 259
259 260
260 261 class UNSET(object):
261 262 pass
262 263
263 264
264 265 _unset = UNSET()
265 266
266 267
267 268 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
268 269
269 270 if request is not UNSET:
270 271 celery_app.conf.update({'PYRAMID_REQUEST': request})
271 272
272 273 if registry is not UNSET:
273 274 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
274 275
275 276
276 277 def setup_celery_app(app, root, request, registry, closer, celery_settings):
277 278 log.debug('Got custom celery conf: %s', celery_settings)
278 279 celery_config = base_celery_config
279 280 celery_config.update({
280 281 # store celerybeat scheduler db where the .ini file is
281 282 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
282 283 })
283 284
284 285 celery_config.update(celery_settings)
285 286 celery_app.config_from_object(celery_config)
286 287
287 288 celery_app.conf.update({'PYRAMID_APP': app})
288 289 celery_app.conf.update({'PYRAMID_ROOT': root})
289 290 celery_app.conf.update({'PYRAMID_REQUEST': request})
290 291 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
291 292 celery_app.conf.update({'PYRAMID_CLOSER': closer})
292 293
293 294
294 295 def configure_celery(config, celery_settings):
295 296 """
296 297 Helper that is called from our application creation logic. It gives
297 298 connection info into running webapp and allows execution of tasks from
298 299 RhodeCode itself
299 300 """
300 301 # store some globals into rhodecode
301 302 rhodecode.CELERY_ENABLED = str2bool(
302 303 config.registry.settings.get('use_celery'))
303 304 if rhodecode.CELERY_ENABLED:
304 305 log.info('Configuring celery based on `%s` settings', celery_settings)
305 306 setup_celery_app(
306 307 app=None, root=None, request=None, registry=config.registry,
307 308 closer=None, celery_settings=celery_settings)
308 309
309 310
310 311 def maybe_prepare_env(req):
311 312 environ = {}
312 313 try:
313 314 environ.update({
314 315 'PATH_INFO': req.environ['PATH_INFO'],
315 316 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
316 317 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
317 318 'SERVER_NAME': req.environ['SERVER_NAME'],
318 319 'SERVER_PORT': req.environ['SERVER_PORT'],
319 320 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
320 321 })
321 322 except Exception:
322 323 pass
323 324
324 325 return environ
325 326
326 327
327 328 class RequestContextTask(Task):
328 329 """
329 330 This is a celery task which will create a rhodecode app instance context
330 331 for the task, patch pyramid with the original request
331 332 that created the task and also add the user to the context.
332 333 """
333 334
334 335 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
335 336 link=None, link_error=None, shadow=None, **options):
336 337 """ queue the job to run (we are in web request context here) """
337 338 from rhodecode.lib.base import get_ip_addr
338 339
339 340 req = self.app.conf['PYRAMID_REQUEST']
340 341 if not req:
341 342 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
342 343
343 344 log.debug('Running Task with class: %s. Request Class: %s',
344 345 self.__class__, req.__class__)
345 346
346 347 user_id = 0
347 348
348 349 # web case
349 350 if hasattr(req, 'user'):
350 351 user_id = req.user.user_id
351 352
352 353 # api case
353 354 elif hasattr(req, 'rpc_user'):
354 355 user_id = req.rpc_user.user_id
355 356
356 357 # we hook into kwargs since it is the only way to pass our data to
357 358 # the celery worker
358 359 environ = maybe_prepare_env(req)
359 360 options['headers'] = options.get('headers', {})
360 361 options['headers'].update({
361 362 'rhodecode_proxy_data': {
362 363 'environ': environ,
363 364 'auth_user': {
364 365 'ip_addr': get_ip_addr(req.environ),
365 366 'user_id': user_id
366 367 },
367 368 }
368 369 })
369 370
370 371 return super(RequestContextTask, self).apply_async(
371 372 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now