##// END OF EJS Templates
fix(celery): remove warning about retry connection flag
super-admin -
r5291:c778c694 default
parent child Browse files
Show More
@@ -1,373 +1,374 b''
1 # Copyright (C) 2010-2023 RhodeCode GmbH
1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 """
18 """
19 Celery loader, run with::
19 Celery loader, run with::
20
20
21 celery worker \
21 celery worker \
22 --task-events \
22 --task-events \
23 --beat \
23 --beat \
24 --autoscale=20,2 \
24 --autoscale=20,2 \
25 --max-tasks-per-child 1 \
25 --max-tasks-per-child 1 \
26 --app rhodecode.lib.celerylib.loader \
26 --app rhodecode.lib.celerylib.loader \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
28 --loglevel DEBUG --ini=.dev/dev.ini
28 --loglevel DEBUG --ini=.dev/dev.ini
29 """
29 """
30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 inspect_getargspec()
31 inspect_getargspec()
32 inspect_formatargspec()
32 inspect_formatargspec()
33 # python3.11 inspect patches for backward compat on `paste` code
33 # python3.11 inspect patches for backward compat on `paste` code
34
34
35 import logging
35 import logging
36 import importlib
36 import importlib
37
37
38 import click
38 import click
39 from celery import Celery
39 from celery import Celery
40 from celery import signals
40 from celery import signals
41 from celery import Task
41 from celery import Task
42 from celery import exceptions # noqa
42 from celery import exceptions # noqa
43 from kombu.serialization import register
43 from kombu.serialization import register
44
44
45 import rhodecode
45 import rhodecode
46
46
47 from rhodecode.lib.statsd_client import StatsdClient
47 from rhodecode.lib.statsd_client import StatsdClient
48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
49 from rhodecode.lib.ext_json import json
49 from rhodecode.lib.ext_json import json
50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
51 from rhodecode.lib.utils2 import str2bool
51 from rhodecode.lib.utils2 import str2bool
52 from rhodecode.model import meta
52 from rhodecode.model import meta
53
53
54
54
55 register('json_ext', json.dumps, json.loads,
55 register('json_ext', json.dumps, json.loads,
56 content_type='application/x-json-ext',
56 content_type='application/x-json-ext',
57 content_encoding='utf-8')
57 content_encoding='utf-8')
58
58
59 log = logging.getLogger('celery.rhodecode.loader')
59 log = logging.getLogger('celery.rhodecode.loader')
60
60
61
61
62 imports = ['rhodecode.lib.celerylib.tasks']
62 imports = ['rhodecode.lib.celerylib.tasks']
63
63
64 try:
64 try:
65 # try if we have EE tasks available
65 # try if we have EE tasks available
66 importlib.import_module('rc_ee')
66 importlib.import_module('rc_ee')
67 imports.append('rc_ee.lib.celerylib.tasks')
67 imports.append('rc_ee.lib.celerylib.tasks')
68 except ImportError:
68 except ImportError:
69 pass
69 pass
70
70
71
71
72 base_celery_config = {
72 base_celery_config = {
73 'result_backend': 'rpc://',
73 'result_backend': 'rpc://',
74 'result_expires': 60 * 60 * 24,
74 'result_expires': 60 * 60 * 24,
75 'result_persistent': True,
75 'result_persistent': True,
76 'imports': imports,
76 'imports': imports,
77 'worker_max_tasks_per_child': 20,
77 'worker_max_tasks_per_child': 20,
78 'accept_content': ['json_ext', 'json'],
78 'accept_content': ['json_ext', 'json'],
79 'task_serializer': 'json_ext',
79 'task_serializer': 'json_ext',
80 'result_serializer': 'json_ext',
80 'result_serializer': 'json_ext',
81 'worker_hijack_root_logger': False,
81 'worker_hijack_root_logger': False,
82 'broker_connection_retry_on_startup': True,
82 'database_table_names': {
83 'database_table_names': {
83 'task': 'beat_taskmeta',
84 'task': 'beat_taskmeta',
84 'group': 'beat_groupmeta',
85 'group': 'beat_groupmeta',
85 }
86 }
86 }
87 }
87
88
88
89
89 preload_option_ini = click.Option(
90 preload_option_ini = click.Option(
90 ('--ini',),
91 ('--ini',),
91 help='Path to ini configuration file.'
92 help='Path to ini configuration file.'
92 )
93 )
93
94
94 preload_option_ini_var = click.Option(
95 preload_option_ini_var = click.Option(
95 ('--ini-var',),
96 ('--ini-var',),
96 help='Comma separated list of key=value to pass to ini.'
97 help='Comma separated list of key=value to pass to ini.'
97 )
98 )
98
99
99
100
100 def get_logger(obj):
101 def get_logger(obj):
101 custom_log = logging.getLogger(
102 custom_log = logging.getLogger(
102 'rhodecode.task.{}'.format(obj.__class__.__name__))
103 'rhodecode.task.{}'.format(obj.__class__.__name__))
103
104
104 if rhodecode.CELERY_ENABLED:
105 if rhodecode.CELERY_ENABLED:
105 try:
106 try:
106 custom_log = obj.get_logger()
107 custom_log = obj.get_logger()
107 except Exception:
108 except Exception:
108 pass
109 pass
109
110
110 return custom_log
111 return custom_log
111
112
112
113
113 # init main celery app
114 # init main celery app
114 celery_app = Celery()
115 celery_app = Celery()
115 celery_app.user_options['preload'].add(preload_option_ini)
116 celery_app.user_options['preload'].add(preload_option_ini)
116 celery_app.user_options['preload'].add(preload_option_ini_var)
117 celery_app.user_options['preload'].add(preload_option_ini_var)
117
118
118
119
119 @signals.setup_logging.connect
120 @signals.setup_logging.connect
120 def setup_logging_callback(**kwargs):
121 def setup_logging_callback(**kwargs):
121
122
122 if 'RC_INI_FILE' in celery_app.conf:
123 if 'RC_INI_FILE' in celery_app.conf:
123 ini_file = celery_app.conf['RC_INI_FILE']
124 ini_file = celery_app.conf['RC_INI_FILE']
124 else:
125 else:
125 ini_file = celery_app.user_options['RC_INI_FILE']
126 ini_file = celery_app.user_options['RC_INI_FILE']
126
127
127 setup_logging(ini_file)
128 setup_logging(ini_file)
128
129
129
130
130 @signals.user_preload_options.connect
131 @signals.user_preload_options.connect
131 def on_preload_parsed(options, **kwargs):
132 def on_preload_parsed(options, **kwargs):
132
133
133 ini_file = options['ini']
134 ini_file = options['ini']
134 ini_vars = options['ini_var']
135 ini_vars = options['ini_var']
135
136
136 if ini_file is None:
137 if ini_file is None:
137 print('You must provide the --ini argument to start celery')
138 print('You must provide the --ini argument to start celery')
138 exit(-1)
139 exit(-1)
139
140
140 options = None
141 options = None
141 if ini_vars is not None:
142 if ini_vars is not None:
142 options = parse_ini_vars(ini_vars)
143 options = parse_ini_vars(ini_vars)
143
144
144 celery_app.conf['RC_INI_FILE'] = ini_file
145 celery_app.conf['RC_INI_FILE'] = ini_file
145 celery_app.user_options['RC_INI_FILE'] = ini_file
146 celery_app.user_options['RC_INI_FILE'] = ini_file
146
147
147 celery_app.conf['RC_INI_OPTIONS'] = options
148 celery_app.conf['RC_INI_OPTIONS'] = options
148 celery_app.user_options['RC_INI_OPTIONS'] = options
149 celery_app.user_options['RC_INI_OPTIONS'] = options
149
150
150 setup_logging(ini_file)
151 setup_logging(ini_file)
151
152
152
153
153 def _init_celery(app_type=''):
154 def _init_celery(app_type=''):
154 from rhodecode.config.middleware import get_celery_config
155 from rhodecode.config.middleware import get_celery_config
155
156
156 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
157 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
157
158
158 ini_file = celery_app.conf['RC_INI_FILE']
159 ini_file = celery_app.conf['RC_INI_FILE']
159 options = celery_app.conf['RC_INI_OPTIONS']
160 options = celery_app.conf['RC_INI_OPTIONS']
160
161
161 env = None
162 env = None
162 try:
163 try:
163 env = bootstrap(ini_file, options=options)
164 env = bootstrap(ini_file, options=options)
164 except Exception:
165 except Exception:
165 log.exception('Failed to bootstrap RhodeCode APP')
166 log.exception('Failed to bootstrap RhodeCode APP')
166
167
167 if not env:
168 if not env:
168 raise EnvironmentError(
169 raise EnvironmentError(
169 'Failed to load pyramid ENV. '
170 'Failed to load pyramid ENV. '
170 'Probably there is another error present that prevents from running pyramid app')
171 'Probably there is another error present that prevents from running pyramid app')
171
172
172 log.debug('Got Pyramid ENV: %s', env)
173 log.debug('Got Pyramid ENV: %s', env)
173
174
174 settings = env['registry'].settings
175 settings = env['registry'].settings
175 celery_settings = get_celery_config(settings)
176 celery_settings = get_celery_config(settings)
176
177
177 # init and bootstrap StatsdClient
178 # init and bootstrap StatsdClient
178 StatsdClient.setup(settings)
179 StatsdClient.setup(settings)
179
180
180 setup_celery_app(
181 setup_celery_app(
181 app=env['app'], root=env['root'], request=env['request'],
182 app=env['app'], root=env['root'], request=env['request'],
182 registry=env['registry'], closer=env['closer'],
183 registry=env['registry'], closer=env['closer'],
183 celery_settings=celery_settings)
184 celery_settings=celery_settings)
184
185
185
186
186 @signals.celeryd_init.connect
187 @signals.celeryd_init.connect
187 def on_celeryd_init(sender=None, conf=None, **kwargs):
188 def on_celeryd_init(sender=None, conf=None, **kwargs):
188 _init_celery('celery worker')
189 _init_celery('celery worker')
189
190
190 # fix the global flag even if it's disabled via .ini file because this
191 # fix the global flag even if it's disabled via .ini file because this
191 # is a worker code that doesn't need this to be disabled.
192 # is a worker code that doesn't need this to be disabled.
192 rhodecode.CELERY_ENABLED = True
193 rhodecode.CELERY_ENABLED = True
193
194
194
195
195 @signals.beat_init.connect
196 @signals.beat_init.connect
196 def on_beat_init(sender=None, conf=None, **kwargs):
197 def on_beat_init(sender=None, conf=None, **kwargs):
197 _init_celery('celery beat')
198 _init_celery('celery beat')
198
199
199
200
200 @signals.task_prerun.connect
201 @signals.task_prerun.connect
201 def task_prerun_signal(task_id, task, args, **kwargs):
202 def task_prerun_signal(task_id, task, args, **kwargs):
202 ping_db()
203 ping_db()
203 statsd = StatsdClient.statsd
204 statsd = StatsdClient.statsd
204
205
205 if statsd:
206 if statsd:
206 task_repr = getattr(task, 'name', task)
207 task_repr = getattr(task, 'name', task)
207 statsd.incr('rhodecode_celery_task_total', tags=[
208 statsd.incr('rhodecode_celery_task_total', tags=[
208 f'task:{task_repr}',
209 f'task:{task_repr}',
209 'mode:async'
210 'mode:async'
210 ])
211 ])
211
212
212
213
213 @signals.task_success.connect
214 @signals.task_success.connect
214 def task_success_signal(result, **kwargs):
215 def task_success_signal(result, **kwargs):
215 meta.Session.commit()
216 meta.Session.commit()
216 closer = celery_app.conf['PYRAMID_CLOSER']
217 closer = celery_app.conf['PYRAMID_CLOSER']
217 if closer:
218 if closer:
218 closer()
219 closer()
219
220
220
221
221 @signals.task_retry.connect
222 @signals.task_retry.connect
222 def task_retry_signal(
223 def task_retry_signal(
223 request, reason, einfo, **kwargs):
224 request, reason, einfo, **kwargs):
224 meta.Session.remove()
225 meta.Session.remove()
225 closer = celery_app.conf['PYRAMID_CLOSER']
226 closer = celery_app.conf['PYRAMID_CLOSER']
226 if closer:
227 if closer:
227 closer()
228 closer()
228
229
229
230
230 @signals.task_failure.connect
231 @signals.task_failure.connect
231 def task_failure_signal(
232 def task_failure_signal(
232 task_id, exception, args, kwargs, traceback, einfo, **kargs):
233 task_id, exception, args, kwargs, traceback, einfo, **kargs):
233
234
234 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
235 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
235 from rhodecode.lib.exc_tracking import store_exception
236 from rhodecode.lib.exc_tracking import store_exception
236 from rhodecode.lib.statsd_client import StatsdClient
237 from rhodecode.lib.statsd_client import StatsdClient
237
238
238 meta.Session.remove()
239 meta.Session.remove()
239
240
240 # simulate sys.exc_info()
241 # simulate sys.exc_info()
241 exc_info = (einfo.type, einfo.exception, einfo.tb)
242 exc_info = (einfo.type, einfo.exception, einfo.tb)
242 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
243 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
243 statsd = StatsdClient.statsd
244 statsd = StatsdClient.statsd
244 if statsd:
245 if statsd:
245 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
246 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
246 statsd.incr('rhodecode_exception_total',
247 statsd.incr('rhodecode_exception_total',
247 tags=["exc_source:celery", "type:{}".format(exc_type)])
248 tags=["exc_source:celery", "type:{}".format(exc_type)])
248
249
249 closer = celery_app.conf['PYRAMID_CLOSER']
250 closer = celery_app.conf['PYRAMID_CLOSER']
250 if closer:
251 if closer:
251 closer()
252 closer()
252
253
253
254
254 @signals.task_revoked.connect
255 @signals.task_revoked.connect
255 def task_revoked_signal(
256 def task_revoked_signal(
256 request, terminated, signum, expired, **kwargs):
257 request, terminated, signum, expired, **kwargs):
257 closer = celery_app.conf['PYRAMID_CLOSER']
258 closer = celery_app.conf['PYRAMID_CLOSER']
258 if closer:
259 if closer:
259 closer()
260 closer()
260
261
261
262
262 class UNSET(object):
263 class UNSET(object):
263 pass
264 pass
264
265
265
266
266 _unset = UNSET()
267 _unset = UNSET()
267
268
268
269
269 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
270 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
270
271
271 if request is not UNSET:
272 if request is not UNSET:
272 celery_app.conf.update({'PYRAMID_REQUEST': request})
273 celery_app.conf.update({'PYRAMID_REQUEST': request})
273
274
274 if registry is not UNSET:
275 if registry is not UNSET:
275 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
276 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
276
277
277
278
278 def setup_celery_app(app, root, request, registry, closer, celery_settings):
279 def setup_celery_app(app, root, request, registry, closer, celery_settings):
279 log.debug('Got custom celery conf: %s', celery_settings)
280 log.debug('Got custom celery conf: %s', celery_settings)
280 celery_config = base_celery_config
281 celery_config = base_celery_config
281 celery_config.update({
282 celery_config.update({
282 # store celerybeat scheduler db where the .ini file is
283 # store celerybeat scheduler db where the .ini file is
283 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
284 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
284 })
285 })
285
286
286 celery_config.update(celery_settings)
287 celery_config.update(celery_settings)
287 celery_app.config_from_object(celery_config)
288 celery_app.config_from_object(celery_config)
288
289
289 celery_app.conf.update({'PYRAMID_APP': app})
290 celery_app.conf.update({'PYRAMID_APP': app})
290 celery_app.conf.update({'PYRAMID_ROOT': root})
291 celery_app.conf.update({'PYRAMID_ROOT': root})
291 celery_app.conf.update({'PYRAMID_REQUEST': request})
292 celery_app.conf.update({'PYRAMID_REQUEST': request})
292 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
293 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
293 celery_app.conf.update({'PYRAMID_CLOSER': closer})
294 celery_app.conf.update({'PYRAMID_CLOSER': closer})
294
295
295
296
296 def configure_celery(config, celery_settings):
297 def configure_celery(config, celery_settings):
297 """
298 """
298 Helper that is called from our application creation logic. It gives
299 Helper that is called from our application creation logic. It gives
299 connection info into running webapp and allows execution of tasks from
300 connection info into running webapp and allows execution of tasks from
300 RhodeCode itself
301 RhodeCode itself
301 """
302 """
302 # store some globals into rhodecode
303 # store some globals into rhodecode
303 rhodecode.CELERY_ENABLED = str2bool(
304 rhodecode.CELERY_ENABLED = str2bool(
304 config.registry.settings.get('use_celery'))
305 config.registry.settings.get('use_celery'))
305 if rhodecode.CELERY_ENABLED:
306 if rhodecode.CELERY_ENABLED:
306 log.info('Configuring celery based on `%s` settings', celery_settings)
307 log.info('Configuring celery based on `%s` settings', celery_settings)
307 setup_celery_app(
308 setup_celery_app(
308 app=None, root=None, request=None, registry=config.registry,
309 app=None, root=None, request=None, registry=config.registry,
309 closer=None, celery_settings=celery_settings)
310 closer=None, celery_settings=celery_settings)
310
311
311
312
312 def maybe_prepare_env(req):
313 def maybe_prepare_env(req):
313 environ = {}
314 environ = {}
314 try:
315 try:
315 environ.update({
316 environ.update({
316 'PATH_INFO': req.environ['PATH_INFO'],
317 'PATH_INFO': req.environ['PATH_INFO'],
317 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
318 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
318 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
319 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
319 'SERVER_NAME': req.environ['SERVER_NAME'],
320 'SERVER_NAME': req.environ['SERVER_NAME'],
320 'SERVER_PORT': req.environ['SERVER_PORT'],
321 'SERVER_PORT': req.environ['SERVER_PORT'],
321 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
322 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
322 })
323 })
323 except Exception:
324 except Exception:
324 pass
325 pass
325
326
326 return environ
327 return environ
327
328
328
329
329 class RequestContextTask(Task):
330 class RequestContextTask(Task):
330 """
331 """
331 This is a celery task which will create a rhodecode app instance context
332 This is a celery task which will create a rhodecode app instance context
332 for the task, patch pyramid with the original request
333 for the task, patch pyramid with the original request
333 that created the task and also add the user to the context.
334 that created the task and also add the user to the context.
334 """
335 """
335
336
336 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
337 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
337 link=None, link_error=None, shadow=None, **options):
338 link=None, link_error=None, shadow=None, **options):
338 """ queue the job to run (we are in web request context here) """
339 """ queue the job to run (we are in web request context here) """
339 from rhodecode.lib.base import get_ip_addr
340 from rhodecode.lib.base import get_ip_addr
340
341
341 req = self.app.conf['PYRAMID_REQUEST']
342 req = self.app.conf['PYRAMID_REQUEST']
342 if not req:
343 if not req:
343 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
344 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
344
345
345 log.debug('Running Task with class: %s. Request Class: %s',
346 log.debug('Running Task with class: %s. Request Class: %s',
346 self.__class__, req.__class__)
347 self.__class__, req.__class__)
347
348
348 user_id = 0
349 user_id = 0
349
350
350 # web case
351 # web case
351 if hasattr(req, 'user'):
352 if hasattr(req, 'user'):
352 user_id = req.user.user_id
353 user_id = req.user.user_id
353
354
354 # api case
355 # api case
355 elif hasattr(req, 'rpc_user'):
356 elif hasattr(req, 'rpc_user'):
356 user_id = req.rpc_user.user_id
357 user_id = req.rpc_user.user_id
357
358
358 # we hook into kwargs since it is the only way to pass our data to
359 # we hook into kwargs since it is the only way to pass our data to
359 # the celery worker
360 # the celery worker
360 environ = maybe_prepare_env(req)
361 environ = maybe_prepare_env(req)
361 options['headers'] = options.get('headers', {})
362 options['headers'] = options.get('headers', {})
362 options['headers'].update({
363 options['headers'].update({
363 'rhodecode_proxy_data': {
364 'rhodecode_proxy_data': {
364 'environ': environ,
365 'environ': environ,
365 'auth_user': {
366 'auth_user': {
366 'ip_addr': get_ip_addr(req.environ),
367 'ip_addr': get_ip_addr(req.environ),
367 'user_id': user_id
368 'user_id': user_id
368 },
369 },
369 }
370 }
370 })
371 })
371
372
372 return super(RequestContextTask, self).apply_async(
373 return super(RequestContextTask, self).apply_async(
373 args, kwargs, task_id, producer, link, link_error, shadow, **options)
374 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now