##// END OF EJS Templates
celery: ensure celery workers are compatible with python3.11 and also fix CLI option for celery 5.X
super-admin -
r5102:4a1c6bb5 default
parent child Browse files
Show More
@@ -1,98 +1,160 b''
1 1 # Copyright (C) 2016-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 """
20 20 Compatibility patches.
21 21
22 22 Please keep the following principles in mind:
23 23
24 24 * Keep imports local, so that importing this module does not cause too many
25 25 side effects by itself.
26 26
27 27 * Try to make patches idempotent, calling them multiple times should not do
28 28 harm. If that is not possible, ensure that the second call explodes.
29 29
30 30 """
31 31
32 32
33 def inspect_formatargspec():
34
35 import inspect
36 from inspect import formatannotation
37
38 def backport_inspect_formatargspec(
39 args, varargs=None, varkw=None, defaults=None,
40 kwonlyargs=(), kwonlydefaults={}, annotations={},
41 formatarg=str,
42 formatvarargs=lambda name: '*' + name,
43 formatvarkw=lambda name: '**' + name,
44 formatvalue=lambda value: '=' + repr(value),
45 formatreturns=lambda text: ' -> ' + text,
46 formatannotation=formatannotation):
47 """Copy formatargspec from python 3.7 standard library.
48 Python 3 has deprecated formatargspec and requested that Signature
49 be used instead, however this requires a full reimplementation
50 of formatargspec() in terms of creating Parameter objects and such.
51 Instead of introducing all the object-creation overhead and having
52 to reinvent from scratch, just copy their compatibility routine.
53 Utimately we would need to rewrite our "decorator" routine completely
54 which is not really worth it right now, until all Python 2.x support
55 is dropped.
56 """
57
58 def formatargandannotation(arg):
59 result = formatarg(arg)
60 if arg in annotations:
61 result += ': ' + formatannotation(annotations[arg])
62 return result
63
64 specs = []
65 if defaults:
66 firstdefault = len(args) - len(defaults)
67 for i, arg in enumerate(args):
68 spec = formatargandannotation(arg)
69 if defaults and i >= firstdefault:
70 spec = spec + formatvalue(defaults[i - firstdefault])
71 specs.append(spec)
72 if varargs is not None:
73 specs.append(formatvarargs(formatargandannotation(varargs)))
74 else:
75 if kwonlyargs:
76 specs.append('*')
77 if kwonlyargs:
78 for kwonlyarg in kwonlyargs:
79 spec = formatargandannotation(kwonlyarg)
80 if kwonlydefaults and kwonlyarg in kwonlydefaults:
81 spec += formatvalue(kwonlydefaults[kwonlyarg])
82 specs.append(spec)
83 if varkw is not None:
84 specs.append(formatvarkw(formatargandannotation(varkw)))
85 result = '(' + ', '.join(specs) + ')'
86 if 'return' in annotations:
87 result += formatreturns(formatannotation(annotations['return']))
88 return result
89
90 # NOTE: inject for python3.11
91 inspect.formatargspec = backport_inspect_formatargspec
92 return inspect
93
94
33 95 def inspect_getargspec():
34 96 """
35 97 Pyramid rely on inspect.getargspec to lookup the signature of
36 98 view functions. This is not compatible with cython, therefore we replace
37 99 getargspec with a custom version.
38 100 Code is inspired by the inspect module from Python-3.4
39 101 """
40 102 import inspect
41 103
42 104 def _isCython(func):
43 105 """
44 106 Private helper that checks if a function is a cython function.
45 107 """
46 108 return func.__class__.__name__ == 'cython_function_or_method'
47 109
48 110 def unwrap(func):
49 111 """
50 112 Get the object wrapped by *func*.
51 113
52 114 Follows the chain of :attr:`__wrapped__` attributes returning the last
53 115 object in the chain.
54 116
55 117 *stop* is an optional callback accepting an object in the wrapper chain
56 118 as its sole argument that allows the unwrapping to be terminated early
57 119 if the callback returns a true value. If the callback never returns a
58 120 true value, the last object in the chain is returned as usual. For
59 121 example, :func:`signature` uses this to stop unwrapping if any object
60 122 in the chain has a ``__signature__`` attribute defined.
61 123
62 124 :exc:`ValueError` is raised if a cycle is encountered.
63 125 """
64 126 f = func # remember the original func for error reporting
65 127 memo = {id(f)} # Memoise by id to tolerate non-hashable objects
66 128 while hasattr(func, '__wrapped__'):
67 129 func = func.__wrapped__
68 130 id_func = id(func)
69 131 if id_func in memo:
70 132 raise ValueError(f'wrapper loop when unwrapping {f!r}')
71 133 memo.add(id_func)
72 134 return func
73 135
74 136 def custom_getargspec(func):
75 137 """
76 138 Get the names and default values of a function's arguments.
77 139
78 140 A tuple of four things is returned: (args, varargs, varkw, defaults).
79 141 'args' is a list of the argument names (it may contain nested lists).
80 142 'varargs' and 'varkw' are the names of the * and ** arguments or None.
81 143 'defaults' is an n-tuple of the default values of the last n arguments.
82 144 """
83 145
84 146 func = unwrap(func)
85 147
86 148 if inspect.ismethod(func):
87 149 func = func.im_func
88 150 if not inspect.isfunction(func):
89 151 if not _isCython(func):
90 152 raise TypeError('{!r} is not a Python or Cython function'
91 153 .format(func))
92 154 args, varargs, varkw = inspect.getargs(func.func_code)
93 155 return inspect.ArgSpec(args, varargs, varkw, func.func_defaults)
94 156
95 #TODO: fix it and test it on python3.11
96 inspect.getargspec = inspect.getfullargspec #custom_getargspec
157 # NOTE: inject for python3.11
158 inspect.getargspec = inspect.getfullargspec
97 159
98 160 return inspect
@@ -1,355 +1,359 b''
1
2 1 # Copyright (C) 2010-2023 RhodeCode GmbH
3 2 #
4 3 # This program is free software: you can redistribute it and/or modify
5 4 # it under the terms of the GNU Affero General Public License, version 3
6 5 # (only), as published by the Free Software Foundation.
7 6 #
8 7 # This program is distributed in the hope that it will be useful,
9 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 10 # GNU General Public License for more details.
12 11 #
13 12 # You should have received a copy of the GNU Affero General Public License
14 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 14 #
16 15 # This program is dual-licensed. If you wish to learn more about the
17 16 # RhodeCode Enterprise Edition, including its added features, Support services,
18 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 18 """
20 19 Celery loader, run with::
21 20
22 21 celery worker \
23 22 --task-events \
24 23 --beat \
25 24 --autoscale=20,2 \
26 25 --max-tasks-per-child 1 \
27 26 --app rhodecode.lib.celerylib.loader \
28 27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
29 28 --loglevel DEBUG --ini=.dev/dev.ini
30 29 """
31 import os
30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 inspect_getargspec()
32 inspect_formatargspec()
33 # python3.11 inspect patches for backward compat on `paste` code
34
32 35 import logging
33 36 import importlib
34 37
38 import click
35 39 from celery import Celery
36 40 from celery import signals
37 41 from celery import Task
38 from celery import exceptions # pragma: no cover
42 from celery import exceptions # noqa
39 43 from kombu.serialization import register
40 44
41 45 import rhodecode
42 46
43 47 from rhodecode.lib.statsd_client import StatsdClient
44 48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
45 49 from rhodecode.lib.ext_json import json
46 50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
47 51 from rhodecode.lib.utils2 import str2bool
48 52 from rhodecode.model import meta
49 53
50 54
51 55 register('json_ext', json.dumps, json.loads,
52 56 content_type='application/x-json-ext',
53 57 content_encoding='utf-8')
54 58
55 59 log = logging.getLogger('celery.rhodecode.loader')
56 60
57 61
58 62 imports = ['rhodecode.lib.celerylib.tasks']
59 63
60 64 try:
61 65 # try if we have EE tasks available
62 66 importlib.import_module('rc_ee')
63 67 imports.append('rc_ee.lib.celerylib.tasks')
64 68 except ImportError:
65 69 pass
66 70
67 71
68 72 base_celery_config = {
69 73 'result_backend': 'rpc://',
70 74 'result_expires': 60 * 60 * 24,
71 75 'result_persistent': True,
72 76 'imports': imports,
73 77 'worker_max_tasks_per_child': 20,
74 78 'accept_content': ['json_ext', 'json'],
75 79 'task_serializer': 'json_ext',
76 80 'result_serializer': 'json_ext',
77 81 'worker_hijack_root_logger': False,
78 82 'database_table_names': {
79 83 'task': 'beat_taskmeta',
80 84 'group': 'beat_groupmeta',
81 85 }
82 86 }
83 87
84 88
85 def add_preload_arguments(parser):
86 parser.add_argument(
87 '--ini', default=None,
89 preload_option_ini = click.Option(
90 ('--ini',),
88 91 help='Path to ini configuration file.'
89 92 )
90 parser.add_argument(
91 '--ini-var', default=None,
93
94 preload_option_ini_var = click.Option(
95 ('--ini-var',),
92 96 help='Comma separated list of key=value to pass to ini.'
93 97 )
94 98
95 99
96 100 def get_logger(obj):
97 101 custom_log = logging.getLogger(
98 102 'rhodecode.task.{}'.format(obj.__class__.__name__))
99 103
100 104 if rhodecode.CELERY_ENABLED:
101 105 try:
102 106 custom_log = obj.get_logger()
103 107 except Exception:
104 108 pass
105 109
106 110 return custom_log
107 111
108 112
109 113 # init main celery app
110 114 celery_app = Celery()
111 celery_app.user_options['preload'].add(add_preload_arguments)
115 celery_app.user_options['preload'].add(preload_option_ini)
116 celery_app.user_options['preload'].add(preload_option_ini_var)
112 117
113 118
114 119 @signals.setup_logging.connect
115 120 def setup_logging_callback(**kwargs):
116 121 ini_file = celery_app.conf['RC_INI_FILE']
117 122 setup_logging(ini_file)
118 123
119 124
120 125 @signals.user_preload_options.connect
121 126 def on_preload_parsed(options, **kwargs):
122 127
123 128 ini_file = options['ini']
124 129 ini_vars = options['ini_var']
125 130
126 131 if ini_file is None:
127 132 print('You must provide the --ini argument to start celery')
128 133 exit(-1)
129 134
130 135 options = None
131 136 if ini_vars is not None:
132 137 options = parse_ini_vars(ini_vars)
133 138
134 139 celery_app.conf['RC_INI_FILE'] = ini_file
135 140 celery_app.conf['RC_INI_OPTIONS'] = options
136 141 setup_logging(ini_file)
137 142
138 143
139 144 def _init_celery(app_type=''):
140 145 from rhodecode.config.middleware import get_celery_config
141 146
142 147 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
143 148
144 149 ini_file = celery_app.conf['RC_INI_FILE']
145 150 options = celery_app.conf['RC_INI_OPTIONS']
146 151
147 152 env = None
148 153 try:
149 154 env = bootstrap(ini_file, options=options)
150 155 except Exception:
151 156 log.exception('Failed to bootstrap RhodeCode APP')
152 157
153 158 if not env:
154 159 raise EnvironmentError(
155 160 'Failed to load pyramid ENV. '
156 161 'Probably there is another error present that prevents from running pyramid app')
157 162
158 163 log.debug('Got Pyramid ENV: %s', env)
159 164
160 165 celery_settings = get_celery_config(env['registry'].settings)
161 166
162 167 setup_celery_app(
163 168 app=env['app'], root=env['root'], request=env['request'],
164 169 registry=env['registry'], closer=env['closer'],
165 170 celery_settings=celery_settings)
166 171
167 172
168 173 @signals.celeryd_init.connect
169 174 def on_celeryd_init(sender=None, conf=None, **kwargs):
170 175 _init_celery('celery worker')
171 176
172 177 # fix the global flag even if it's disabled via .ini file because this
173 178 # is a worker code that doesn't need this to be disabled.
174 179 rhodecode.CELERY_ENABLED = True
175 180
176 181
177 182 @signals.beat_init.connect
178 183 def on_beat_init(sender=None, conf=None, **kwargs):
179 184 _init_celery('celery beat')
180 185
181 186
182 187 @signals.task_prerun.connect
183 188 def task_prerun_signal(task_id, task, args, **kwargs):
184 189 ping_db()
185 190 statsd = StatsdClient.statsd
186 191 if statsd:
187 192 task_repr = getattr(task, 'name', task)
188 193 statsd.incr('rhodecode_celery_task_total', tags=[
189 194 'task:{}'.format(task_repr),
190 195 'mode:async'
191 196 ])
192 197
193 198
194 199 @signals.task_success.connect
195 200 def task_success_signal(result, **kwargs):
196 201 meta.Session.commit()
197 202 closer = celery_app.conf['PYRAMID_CLOSER']
198 203 if closer:
199 204 closer()
200 205
201 206
202
203 207 @signals.task_retry.connect
204 208 def task_retry_signal(
205 209 request, reason, einfo, **kwargs):
206 210 meta.Session.remove()
207 211 closer = celery_app.conf['PYRAMID_CLOSER']
208 212 if closer:
209 213 closer()
210 214
211 215
212 216 @signals.task_failure.connect
213 217 def task_failure_signal(
214 218 task_id, exception, args, kwargs, traceback, einfo, **kargs):
215 219
216 220 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
217 221 from rhodecode.lib.exc_tracking import store_exception
218 222 from rhodecode.lib.statsd_client import StatsdClient
219 223
220 224 meta.Session.remove()
221 225
222 226 # simulate sys.exc_info()
223 227 exc_info = (einfo.type, einfo.exception, einfo.tb)
224 228 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
225 229 statsd = StatsdClient.statsd
226 230 if statsd:
227 231 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
228 232 statsd.incr('rhodecode_exception_total',
229 233 tags=["exc_source:celery", "type:{}".format(exc_type)])
230 234
231 235 closer = celery_app.conf['PYRAMID_CLOSER']
232 236 if closer:
233 237 closer()
234 238
235 239
236 240 @signals.task_revoked.connect
237 241 def task_revoked_signal(
238 242 request, terminated, signum, expired, **kwargs):
239 243 closer = celery_app.conf['PYRAMID_CLOSER']
240 244 if closer:
241 245 closer()
242 246
243 247
244 248 class UNSET(object):
245 249 pass
246 250
247 251
248 252 _unset = UNSET()
249 253
250 254
251 255 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
252 256
253 257 if request is not UNSET:
254 258 celery_app.conf.update({'PYRAMID_REQUEST': request})
255 259
256 260 if registry is not UNSET:
257 261 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
258 262
259 263
260 264 def setup_celery_app(app, root, request, registry, closer, celery_settings):
261 265 log.debug('Got custom celery conf: %s', celery_settings)
262 266 celery_config = base_celery_config
263 267 celery_config.update({
264 268 # store celerybeat scheduler db where the .ini file is
265 269 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
266 270 })
267 271
268 272 celery_config.update(celery_settings)
269 273 celery_app.config_from_object(celery_config)
270 274
271 275 celery_app.conf.update({'PYRAMID_APP': app})
272 276 celery_app.conf.update({'PYRAMID_ROOT': root})
273 277 celery_app.conf.update({'PYRAMID_REQUEST': request})
274 278 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
275 279 celery_app.conf.update({'PYRAMID_CLOSER': closer})
276 280
277 281
278 282 def configure_celery(config, celery_settings):
279 283 """
280 284 Helper that is called from our application creation logic. It gives
281 285 connection info into running webapp and allows execution of tasks from
282 286 RhodeCode itself
283 287 """
284 288 # store some globals into rhodecode
285 289 rhodecode.CELERY_ENABLED = str2bool(
286 290 config.registry.settings.get('use_celery'))
287 291 if rhodecode.CELERY_ENABLED:
288 292 log.info('Configuring celery based on `%s` settings', celery_settings)
289 293 setup_celery_app(
290 294 app=None, root=None, request=None, registry=config.registry,
291 295 closer=None, celery_settings=celery_settings)
292 296
293 297
294 298 def maybe_prepare_env(req):
295 299 environ = {}
296 300 try:
297 301 environ.update({
298 302 'PATH_INFO': req.environ['PATH_INFO'],
299 303 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
300 304 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
301 305 'SERVER_NAME': req.environ['SERVER_NAME'],
302 306 'SERVER_PORT': req.environ['SERVER_PORT'],
303 307 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
304 308 })
305 309 except Exception:
306 310 pass
307 311
308 312 return environ
309 313
310 314
311 315 class RequestContextTask(Task):
312 316 """
313 317 This is a celery task which will create a rhodecode app instance context
314 318 for the task, patch pyramid with the original request
315 319 that created the task and also add the user to the context.
316 320 """
317 321
318 322 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
319 323 link=None, link_error=None, shadow=None, **options):
320 324 """ queue the job to run (we are in web request context here) """
321 325 from rhodecode.lib.base import get_ip_addr
322 326
323 327 req = self.app.conf['PYRAMID_REQUEST']
324 328 if not req:
325 329 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
326 330
327 331 log.debug('Running Task with class: %s. Request Class: %s',
328 332 self.__class__, req.__class__)
329 333
330 334 user_id = 0
331 335
332 336 # web case
333 337 if hasattr(req, 'user'):
334 338 user_id = req.user.user_id
335 339
336 340 # api case
337 341 elif hasattr(req, 'rpc_user'):
338 342 user_id = req.rpc_user.user_id
339 343
340 344 # we hook into kwargs since it is the only way to pass our data to
341 345 # the celery worker
342 346 environ = maybe_prepare_env(req)
343 347 options['headers'] = options.get('headers', {})
344 348 options['headers'].update({
345 349 'rhodecode_proxy_data': {
346 350 'environ': environ,
347 351 'auth_user': {
348 352 'ip_addr': get_ip_addr(req.environ),
349 353 'user_id': user_id
350 354 },
351 355 }
352 356 })
353 357
354 358 return super(RequestContextTask, self).apply_async(
355 359 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now