##// END OF EJS Templates
fix(celery): signal startup failure with sys.exit to recycle celery worker on errors
super-admin -
r5408:fcfc4234 v5.0.1 stable
parent child Browse files
Show More
@@ -1,371 +1,372 b''
1 # Copyright (C) 2010-2023 RhodeCode GmbH
1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 """
18 """
19 Celery loader, run with::
19 Celery loader, run with::
20
20
21 celery worker \
21 celery worker \
22 --task-events \
22 --task-events \
23 --beat \
23 --beat \
24 --autoscale=20,2 \
24 --autoscale=20,2 \
25 --max-tasks-per-child 1 \
25 --max-tasks-per-child 1 \
26 --app rhodecode.lib.celerylib.loader \
26 --app rhodecode.lib.celerylib.loader \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 --loglevel DEBUG --ini=.dev/dev.ini
28 --loglevel DEBUG --ini=.dev/dev.ini
29 """
29 """
30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 inspect_getargspec()
31 inspect_getargspec()
32 inspect_formatargspec()
32 inspect_formatargspec()
33 # python3.11 inspect patches for backward compat on `paste` code
33 # python3.11 inspect patches for backward compat on `paste` code
34
34
35 import sys
35 import logging
36 import logging
36 import importlib
37 import importlib
37
38
38 import click
39 import click
39 from celery import Celery
40 from celery import Celery
40 from celery import signals
41 from celery import signals
41 from celery import Task
42 from celery import Task
42 from celery import exceptions # noqa
43 from celery import exceptions # noqa
43
44
44 import rhodecode
45 import rhodecode
45
46
46 from rhodecode.lib.statsd_client import StatsdClient
47 from rhodecode.lib.statsd_client import StatsdClient
47 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
48 from rhodecode.lib.ext_json import json
49 from rhodecode.lib.ext_json import json
49 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
50 from rhodecode.lib.utils2 import str2bool
51 from rhodecode.lib.utils2 import str2bool
51 from rhodecode.model import meta
52 from rhodecode.model import meta
52
53
53 log = logging.getLogger('celery.rhodecode.loader')
54 log = logging.getLogger('celery.rhodecode.loader')
54
55
55
56
56 imports = ['rhodecode.lib.celerylib.tasks']
57 imports = ['rhodecode.lib.celerylib.tasks']
57
58
58 try:
59 try:
59 # try if we have EE tasks available
60 # try if we have EE tasks available
60 importlib.import_module('rc_ee')
61 importlib.import_module('rc_ee')
61 imports.append('rc_ee.lib.celerylib.tasks')
62 imports.append('rc_ee.lib.celerylib.tasks')
62 except ImportError:
63 except ImportError:
63 pass
64 pass
64
65
65
66
66 base_celery_config = {
67 base_celery_config = {
67 'result_backend': 'rpc://',
68 'result_backend': 'rpc://',
68 'result_expires': 60 * 60 * 24,
69 'result_expires': 60 * 60 * 24,
69 'result_persistent': True,
70 'result_persistent': True,
70 'imports': imports,
71 'imports': imports,
71 'worker_max_tasks_per_child': 100,
72 'worker_max_tasks_per_child': 100,
72 'worker_hijack_root_logger': False,
73 'worker_hijack_root_logger': False,
73 'worker_prefetch_multiplier': 1,
74 'worker_prefetch_multiplier': 1,
74 'task_serializer': 'json',
75 'task_serializer': 'json',
75 'accept_content': ['json', 'msgpack'],
76 'accept_content': ['json', 'msgpack'],
76 'result_serializer': 'json',
77 'result_serializer': 'json',
77 'result_accept_content': ['json', 'msgpack'],
78 'result_accept_content': ['json', 'msgpack'],
78
79
79 'broker_connection_retry_on_startup': True,
80 'broker_connection_retry_on_startup': True,
80 'database_table_names': {
81 'database_table_names': {
81 'task': 'beat_taskmeta',
82 'task': 'beat_taskmeta',
82 'group': 'beat_groupmeta',
83 'group': 'beat_groupmeta',
83 }
84 }
84 }
85 }
85
86
86
87
87 preload_option_ini = click.Option(
88 preload_option_ini = click.Option(
88 ('--ini',),
89 ('--ini',),
89 help='Path to ini configuration file.'
90 help='Path to ini configuration file.'
90 )
91 )
91
92
92 preload_option_ini_var = click.Option(
93 preload_option_ini_var = click.Option(
93 ('--ini-var',),
94 ('--ini-var',),
94 help='Comma separated list of key=value to pass to ini.'
95 help='Comma separated list of key=value to pass to ini.'
95 )
96 )
96
97
97
98
98 def get_logger(obj):
99 def get_logger(obj):
99 custom_log = logging.getLogger(
100 custom_log = logging.getLogger(
100 'rhodecode.task.{}'.format(obj.__class__.__name__))
101 'rhodecode.task.{}'.format(obj.__class__.__name__))
101
102
102 if rhodecode.CELERY_ENABLED:
103 if rhodecode.CELERY_ENABLED:
103 try:
104 try:
104 custom_log = obj.get_logger()
105 custom_log = obj.get_logger()
105 except Exception:
106 except Exception:
106 pass
107 pass
107
108
108 return custom_log
109 return custom_log
109
110
110
111
111 # init main celery app
112 # init main celery app
112 celery_app = Celery()
113 celery_app = Celery()
113 celery_app.user_options['preload'].add(preload_option_ini)
114 celery_app.user_options['preload'].add(preload_option_ini)
114 celery_app.user_options['preload'].add(preload_option_ini_var)
115 celery_app.user_options['preload'].add(preload_option_ini_var)
115
116
116
117
117 @signals.setup_logging.connect
118 @signals.setup_logging.connect
118 def setup_logging_callback(**kwargs):
119 def setup_logging_callback(**kwargs):
119
120
120 if 'RC_INI_FILE' in celery_app.conf:
121 if 'RC_INI_FILE' in celery_app.conf:
121 ini_file = celery_app.conf['RC_INI_FILE']
122 ini_file = celery_app.conf['RC_INI_FILE']
122 else:
123 else:
123 ini_file = celery_app.user_options['RC_INI_FILE']
124 ini_file = celery_app.user_options['RC_INI_FILE']
124
125
125 setup_logging(ini_file)
126 setup_logging(ini_file)
126
127
127
128
128 @signals.user_preload_options.connect
129 @signals.user_preload_options.connect
129 def on_preload_parsed(options, **kwargs):
130 def on_preload_parsed(options, **kwargs):
130
131
131 ini_file = options['ini']
132 ini_file = options['ini']
132 ini_vars = options['ini_var']
133 ini_vars = options['ini_var']
133
134
134 if ini_file is None:
135 if ini_file is None:
135 print('You must provide the --ini argument to start celery')
136 print('You must provide the --ini argument to start celery')
136 exit(-1)
137 exit(-1)
137
138
138 options = None
139 options = None
139 if ini_vars is not None:
140 if ini_vars is not None:
140 options = parse_ini_vars(ini_vars)
141 options = parse_ini_vars(ini_vars)
141
142
142 celery_app.conf['RC_INI_FILE'] = ini_file
143 celery_app.conf['RC_INI_FILE'] = ini_file
143 celery_app.user_options['RC_INI_FILE'] = ini_file
144 celery_app.user_options['RC_INI_FILE'] = ini_file
144
145
145 celery_app.conf['RC_INI_OPTIONS'] = options
146 celery_app.conf['RC_INI_OPTIONS'] = options
146 celery_app.user_options['RC_INI_OPTIONS'] = options
147 celery_app.user_options['RC_INI_OPTIONS'] = options
147
148
148 setup_logging(ini_file)
149 setup_logging(ini_file)
149
150
150
151
151 def _init_celery(app_type=''):
152 def _init_celery(app_type=''):
152 from rhodecode.config.middleware import get_celery_config
153 from rhodecode.config.middleware import get_celery_config
153
154
154 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
155 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
155
156
156 ini_file = celery_app.conf['RC_INI_FILE']
157 ini_file = celery_app.conf['RC_INI_FILE']
157 options = celery_app.conf['RC_INI_OPTIONS']
158 options = celery_app.conf['RC_INI_OPTIONS']
158
159
159 env = None
160 env = None
160 try:
161 try:
161 env = bootstrap(ini_file, options=options)
162 env = bootstrap(ini_file, options=options)
162 except Exception:
163 except Exception:
163 log.exception('Failed to bootstrap RhodeCode APP')
164 log.exception('Failed to bootstrap RhodeCode APP. '
165 'Probably there is another error present that prevents from running pyramid app')
164
166
165 if not env:
167 if not env:
166 raise EnvironmentError(
168 # we use sys.exit here since we need to signal app startup failure for docker to restart the container and re-try
167 'Failed to load pyramid ENV. '
169 sys.exit(1)
168 'Probably there is another error present that prevents from running pyramid app')
169
170
170 log.debug('Got Pyramid ENV: %s', env)
171 log.debug('Got Pyramid ENV: %s', env)
171
172
172 settings = env['registry'].settings
173 settings = env['registry'].settings
173 celery_settings = get_celery_config(settings)
174 celery_settings = get_celery_config(settings)
174
175
175 # init and bootstrap StatsdClient
176 # init and bootstrap StatsdClient
176 StatsdClient.setup(settings)
177 StatsdClient.setup(settings)
177
178
178 setup_celery_app(
179 setup_celery_app(
179 app=env['app'], root=env['root'], request=env['request'],
180 app=env['app'], root=env['root'], request=env['request'],
180 registry=env['registry'], closer=env['closer'],
181 registry=env['registry'], closer=env['closer'],
181 celery_settings=celery_settings)
182 celery_settings=celery_settings)
182
183
183
184
184 @signals.celeryd_init.connect
185 @signals.celeryd_init.connect
185 def on_celeryd_init(sender=None, conf=None, **kwargs):
186 def on_celeryd_init(sender=None, conf=None, **kwargs):
186 _init_celery('celery worker')
187 _init_celery('celery worker')
187
188
188 # fix the global flag even if it's disabled via .ini file because this
189 # fix the global flag even if it's disabled via .ini file because this
189 # is a worker code that doesn't need this to be disabled.
190 # is a worker code that doesn't need this to be disabled.
190 rhodecode.CELERY_ENABLED = True
191 rhodecode.CELERY_ENABLED = True
191
192
192
193
193 @signals.beat_init.connect
194 @signals.beat_init.connect
194 def on_beat_init(sender=None, conf=None, **kwargs):
195 def on_beat_init(sender=None, conf=None, **kwargs):
195 _init_celery('celery beat')
196 _init_celery('celery beat')
196
197
197
198
198 @signals.task_prerun.connect
199 @signals.task_prerun.connect
199 def task_prerun_signal(task_id, task, args, **kwargs):
200 def task_prerun_signal(task_id, task, args, **kwargs):
200 ping_db()
201 ping_db()
201 statsd = StatsdClient.statsd
202 statsd = StatsdClient.statsd
202
203
203 if statsd:
204 if statsd:
204 task_repr = getattr(task, 'name', task)
205 task_repr = getattr(task, 'name', task)
205 statsd.incr('rhodecode_celery_task_total', tags=[
206 statsd.incr('rhodecode_celery_task_total', tags=[
206 f'task:{task_repr}',
207 f'task:{task_repr}',
207 'mode:async'
208 'mode:async'
208 ])
209 ])
209
210
210
211
211 @signals.task_success.connect
212 @signals.task_success.connect
212 def task_success_signal(result, **kwargs):
213 def task_success_signal(result, **kwargs):
213 meta.Session.commit()
214 meta.Session.commit()
214 closer = celery_app.conf['PYRAMID_CLOSER']
215 closer = celery_app.conf['PYRAMID_CLOSER']
215 if closer:
216 if closer:
216 closer()
217 closer()
217
218
218
219
219 @signals.task_retry.connect
220 @signals.task_retry.connect
220 def task_retry_signal(
221 def task_retry_signal(
221 request, reason, einfo, **kwargs):
222 request, reason, einfo, **kwargs):
222 meta.Session.remove()
223 meta.Session.remove()
223 closer = celery_app.conf['PYRAMID_CLOSER']
224 closer = celery_app.conf['PYRAMID_CLOSER']
224 if closer:
225 if closer:
225 closer()
226 closer()
226
227
227
228
228 @signals.task_failure.connect
229 @signals.task_failure.connect
229 def task_failure_signal(
230 def task_failure_signal(
230 task_id, exception, args, kwargs, traceback, einfo, **kargs):
231 task_id, exception, args, kwargs, traceback, einfo, **kargs):
231
232
232 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
233 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
233 from rhodecode.lib.exc_tracking import store_exception
234 from rhodecode.lib.exc_tracking import store_exception
234 from rhodecode.lib.statsd_client import StatsdClient
235 from rhodecode.lib.statsd_client import StatsdClient
235
236
236 meta.Session.remove()
237 meta.Session.remove()
237
238
238 # simulate sys.exc_info()
239 # simulate sys.exc_info()
239 exc_info = (einfo.type, einfo.exception, einfo.tb)
240 exc_info = (einfo.type, einfo.exception, einfo.tb)
240 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
241 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
241 statsd = StatsdClient.statsd
242 statsd = StatsdClient.statsd
242 if statsd:
243 if statsd:
243 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
244 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
244 statsd.incr('rhodecode_exception_total',
245 statsd.incr('rhodecode_exception_total',
245 tags=["exc_source:celery", "type:{}".format(exc_type)])
246 tags=["exc_source:celery", "type:{}".format(exc_type)])
246
247
247 closer = celery_app.conf['PYRAMID_CLOSER']
248 closer = celery_app.conf['PYRAMID_CLOSER']
248 if closer:
249 if closer:
249 closer()
250 closer()
250
251
251
252
252 @signals.task_revoked.connect
253 @signals.task_revoked.connect
253 def task_revoked_signal(
254 def task_revoked_signal(
254 request, terminated, signum, expired, **kwargs):
255 request, terminated, signum, expired, **kwargs):
255 closer = celery_app.conf['PYRAMID_CLOSER']
256 closer = celery_app.conf['PYRAMID_CLOSER']
256 if closer:
257 if closer:
257 closer()
258 closer()
258
259
259
260
260 class UNSET(object):
261 class UNSET(object):
261 pass
262 pass
262
263
263
264
264 _unset = UNSET()
265 _unset = UNSET()
265
266
266
267
267 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
268 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
268
269
269 if request is not UNSET:
270 if request is not UNSET:
270 celery_app.conf.update({'PYRAMID_REQUEST': request})
271 celery_app.conf.update({'PYRAMID_REQUEST': request})
271
272
272 if registry is not UNSET:
273 if registry is not UNSET:
273 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
274 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
274
275
275
276
276 def setup_celery_app(app, root, request, registry, closer, celery_settings):
277 def setup_celery_app(app, root, request, registry, closer, celery_settings):
277 log.debug('Got custom celery conf: %s', celery_settings)
278 log.debug('Got custom celery conf: %s', celery_settings)
278 celery_config = base_celery_config
279 celery_config = base_celery_config
279 celery_config.update({
280 celery_config.update({
280 # store celerybeat scheduler db where the .ini file is
281 # store celerybeat scheduler db where the .ini file is
281 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
282 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
282 })
283 })
283
284
284 celery_config.update(celery_settings)
285 celery_config.update(celery_settings)
285 celery_app.config_from_object(celery_config)
286 celery_app.config_from_object(celery_config)
286
287
287 celery_app.conf.update({'PYRAMID_APP': app})
288 celery_app.conf.update({'PYRAMID_APP': app})
288 celery_app.conf.update({'PYRAMID_ROOT': root})
289 celery_app.conf.update({'PYRAMID_ROOT': root})
289 celery_app.conf.update({'PYRAMID_REQUEST': request})
290 celery_app.conf.update({'PYRAMID_REQUEST': request})
290 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
291 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
291 celery_app.conf.update({'PYRAMID_CLOSER': closer})
292 celery_app.conf.update({'PYRAMID_CLOSER': closer})
292
293
293
294
294 def configure_celery(config, celery_settings):
295 def configure_celery(config, celery_settings):
295 """
296 """
296 Helper that is called from our application creation logic. It gives
297 Helper that is called from our application creation logic. It gives
297 connection info into running webapp and allows execution of tasks from
298 connection info into running webapp and allows execution of tasks from
298 RhodeCode itself
299 RhodeCode itself
299 """
300 """
300 # store some globals into rhodecode
301 # store some globals into rhodecode
301 rhodecode.CELERY_ENABLED = str2bool(
302 rhodecode.CELERY_ENABLED = str2bool(
302 config.registry.settings.get('use_celery'))
303 config.registry.settings.get('use_celery'))
303 if rhodecode.CELERY_ENABLED:
304 if rhodecode.CELERY_ENABLED:
304 log.info('Configuring celery based on `%s` settings', celery_settings)
305 log.info('Configuring celery based on `%s` settings', celery_settings)
305 setup_celery_app(
306 setup_celery_app(
306 app=None, root=None, request=None, registry=config.registry,
307 app=None, root=None, request=None, registry=config.registry,
307 closer=None, celery_settings=celery_settings)
308 closer=None, celery_settings=celery_settings)
308
309
309
310
310 def maybe_prepare_env(req):
311 def maybe_prepare_env(req):
311 environ = {}
312 environ = {}
312 try:
313 try:
313 environ.update({
314 environ.update({
314 'PATH_INFO': req.environ['PATH_INFO'],
315 'PATH_INFO': req.environ['PATH_INFO'],
315 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
316 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
316 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
317 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
317 'SERVER_NAME': req.environ['SERVER_NAME'],
318 'SERVER_NAME': req.environ['SERVER_NAME'],
318 'SERVER_PORT': req.environ['SERVER_PORT'],
319 'SERVER_PORT': req.environ['SERVER_PORT'],
319 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
320 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
320 })
321 })
321 except Exception:
322 except Exception:
322 pass
323 pass
323
324
324 return environ
325 return environ
325
326
326
327
327 class RequestContextTask(Task):
328 class RequestContextTask(Task):
328 """
329 """
329 This is a celery task which will create a rhodecode app instance context
330 This is a celery task which will create a rhodecode app instance context
330 for the task, patch pyramid with the original request
331 for the task, patch pyramid with the original request
331 that created the task and also add the user to the context.
332 that created the task and also add the user to the context.
332 """
333 """
333
334
334 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
335 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
335 link=None, link_error=None, shadow=None, **options):
336 link=None, link_error=None, shadow=None, **options):
336 """ queue the job to run (we are in web request context here) """
337 """ queue the job to run (we are in web request context here) """
337 from rhodecode.lib.base import get_ip_addr
338 from rhodecode.lib.base import get_ip_addr
338
339
339 req = self.app.conf['PYRAMID_REQUEST']
340 req = self.app.conf['PYRAMID_REQUEST']
340 if not req:
341 if not req:
341 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
342 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
342
343
343 log.debug('Running Task with class: %s. Request Class: %s',
344 log.debug('Running Task with class: %s. Request Class: %s',
344 self.__class__, req.__class__)
345 self.__class__, req.__class__)
345
346
346 user_id = 0
347 user_id = 0
347
348
348 # web case
349 # web case
349 if hasattr(req, 'user'):
350 if hasattr(req, 'user'):
350 user_id = req.user.user_id
351 user_id = req.user.user_id
351
352
352 # api case
353 # api case
353 elif hasattr(req, 'rpc_user'):
354 elif hasattr(req, 'rpc_user'):
354 user_id = req.rpc_user.user_id
355 user_id = req.rpc_user.user_id
355
356
356 # we hook into kwargs since it is the only way to pass our data to
357 # we hook into kwargs since it is the only way to pass our data to
357 # the celery worker
358 # the celery worker
358 environ = maybe_prepare_env(req)
359 environ = maybe_prepare_env(req)
359 options['headers'] = options.get('headers', {})
360 options['headers'] = options.get('headers', {})
360 options['headers'].update({
361 options['headers'].update({
361 'rhodecode_proxy_data': {
362 'rhodecode_proxy_data': {
362 'environ': environ,
363 'environ': environ,
363 'auth_user': {
364 'auth_user': {
364 'ip_addr': get_ip_addr(req.environ),
365 'ip_addr': get_ip_addr(req.environ),
365 'user_id': user_id
366 'user_id': user_id
366 },
367 },
367 }
368 }
368 })
369 })
369
370
370 return super(RequestContextTask, self).apply_async(
371 return super(RequestContextTask, self).apply_async(
371 args, kwargs, task_id, producer, link, link_error, shadow, **options)
372 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now