##// END OF EJS Templates
celery: ensure celery workers are compatible with python3.11 and also fix CLI option for celery 5.X
super-admin -
r5102:4a1c6bb5 default
parent child Browse files
Show More
@@ -1,98 +1,160 b''
1 # Copyright (C) 2016-2023 RhodeCode GmbH
1 # Copyright (C) 2016-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18
19 """
19 """
20 Compatibility patches.
20 Compatibility patches.
21
21
22 Please keep the following principles in mind:
22 Please keep the following principles in mind:
23
23
24 * Keep imports local, so that importing this module does not cause too many
24 * Keep imports local, so that importing this module does not cause too many
25 side effects by itself.
25 side effects by itself.
26
26
27 * Try to make patches idempotent, calling them multiple times should not do
27 * Try to make patches idempotent, calling them multiple times should not do
28 harm. If that is not possible, ensure that the second call explodes.
28 harm. If that is not possible, ensure that the second call explodes.
29
29
30 """
30 """
31
31
32
32
33 def inspect_formatargspec():
34
35 import inspect
36 from inspect import formatannotation
37
38 def backport_inspect_formatargspec(
39 args, varargs=None, varkw=None, defaults=None,
40 kwonlyargs=(), kwonlydefaults={}, annotations={},
41 formatarg=str,
42 formatvarargs=lambda name: '*' + name,
43 formatvarkw=lambda name: '**' + name,
44 formatvalue=lambda value: '=' + repr(value),
45 formatreturns=lambda text: ' -> ' + text,
46 formatannotation=formatannotation):
47 """Copy formatargspec from python 3.7 standard library.
48 Python 3 has deprecated formatargspec and requested that Signature
49 be used instead, however this requires a full reimplementation
50 of formatargspec() in terms of creating Parameter objects and such.
51 Instead of introducing all the object-creation overhead and having
52 to reinvent from scratch, just copy their compatibility routine.
53 Utimately we would need to rewrite our "decorator" routine completely
54 which is not really worth it right now, until all Python 2.x support
55 is dropped.
56 """
57
58 def formatargandannotation(arg):
59 result = formatarg(arg)
60 if arg in annotations:
61 result += ': ' + formatannotation(annotations[arg])
62 return result
63
64 specs = []
65 if defaults:
66 firstdefault = len(args) - len(defaults)
67 for i, arg in enumerate(args):
68 spec = formatargandannotation(arg)
69 if defaults and i >= firstdefault:
70 spec = spec + formatvalue(defaults[i - firstdefault])
71 specs.append(spec)
72 if varargs is not None:
73 specs.append(formatvarargs(formatargandannotation(varargs)))
74 else:
75 if kwonlyargs:
76 specs.append('*')
77 if kwonlyargs:
78 for kwonlyarg in kwonlyargs:
79 spec = formatargandannotation(kwonlyarg)
80 if kwonlydefaults and kwonlyarg in kwonlydefaults:
81 spec += formatvalue(kwonlydefaults[kwonlyarg])
82 specs.append(spec)
83 if varkw is not None:
84 specs.append(formatvarkw(formatargandannotation(varkw)))
85 result = '(' + ', '.join(specs) + ')'
86 if 'return' in annotations:
87 result += formatreturns(formatannotation(annotations['return']))
88 return result
89
90 # NOTE: inject for python3.11
91 inspect.formatargspec = backport_inspect_formatargspec
92 return inspect
93
94
33 def inspect_getargspec():
95 def inspect_getargspec():
34 """
96 """
35 Pyramid rely on inspect.getargspec to lookup the signature of
97 Pyramid rely on inspect.getargspec to lookup the signature of
36 view functions. This is not compatible with cython, therefore we replace
98 view functions. This is not compatible with cython, therefore we replace
37 getargspec with a custom version.
99 getargspec with a custom version.
38 Code is inspired by the inspect module from Python-3.4
100 Code is inspired by the inspect module from Python-3.4
39 """
101 """
40 import inspect
102 import inspect
41
103
42 def _isCython(func):
104 def _isCython(func):
43 """
105 """
44 Private helper that checks if a function is a cython function.
106 Private helper that checks if a function is a cython function.
45 """
107 """
46 return func.__class__.__name__ == 'cython_function_or_method'
108 return func.__class__.__name__ == 'cython_function_or_method'
47
109
48 def unwrap(func):
110 def unwrap(func):
49 """
111 """
50 Get the object wrapped by *func*.
112 Get the object wrapped by *func*.
51
113
52 Follows the chain of :attr:`__wrapped__` attributes returning the last
114 Follows the chain of :attr:`__wrapped__` attributes returning the last
53 object in the chain.
115 object in the chain.
54
116
55 *stop* is an optional callback accepting an object in the wrapper chain
117 *stop* is an optional callback accepting an object in the wrapper chain
56 as its sole argument that allows the unwrapping to be terminated early
118 as its sole argument that allows the unwrapping to be terminated early
57 if the callback returns a true value. If the callback never returns a
119 if the callback returns a true value. If the callback never returns a
58 true value, the last object in the chain is returned as usual. For
120 true value, the last object in the chain is returned as usual. For
59 example, :func:`signature` uses this to stop unwrapping if any object
121 example, :func:`signature` uses this to stop unwrapping if any object
60 in the chain has a ``__signature__`` attribute defined.
122 in the chain has a ``__signature__`` attribute defined.
61
123
62 :exc:`ValueError` is raised if a cycle is encountered.
124 :exc:`ValueError` is raised if a cycle is encountered.
63 """
125 """
64 f = func # remember the original func for error reporting
126 f = func # remember the original func for error reporting
65 memo = {id(f)} # Memoise by id to tolerate non-hashable objects
127 memo = {id(f)} # Memoise by id to tolerate non-hashable objects
66 while hasattr(func, '__wrapped__'):
128 while hasattr(func, '__wrapped__'):
67 func = func.__wrapped__
129 func = func.__wrapped__
68 id_func = id(func)
130 id_func = id(func)
69 if id_func in memo:
131 if id_func in memo:
70 raise ValueError(f'wrapper loop when unwrapping {f!r}')
132 raise ValueError(f'wrapper loop when unwrapping {f!r}')
71 memo.add(id_func)
133 memo.add(id_func)
72 return func
134 return func
73
135
74 def custom_getargspec(func):
136 def custom_getargspec(func):
75 """
137 """
76 Get the names and default values of a function's arguments.
138 Get the names and default values of a function's arguments.
77
139
78 A tuple of four things is returned: (args, varargs, varkw, defaults).
140 A tuple of four things is returned: (args, varargs, varkw, defaults).
79 'args' is a list of the argument names (it may contain nested lists).
141 'args' is a list of the argument names (it may contain nested lists).
80 'varargs' and 'varkw' are the names of the * and ** arguments or None.
142 'varargs' and 'varkw' are the names of the * and ** arguments or None.
81 'defaults' is an n-tuple of the default values of the last n arguments.
143 'defaults' is an n-tuple of the default values of the last n arguments.
82 """
144 """
83
145
84 func = unwrap(func)
146 func = unwrap(func)
85
147
86 if inspect.ismethod(func):
148 if inspect.ismethod(func):
87 func = func.im_func
149 func = func.im_func
88 if not inspect.isfunction(func):
150 if not inspect.isfunction(func):
89 if not _isCython(func):
151 if not _isCython(func):
90 raise TypeError('{!r} is not a Python or Cython function'
152 raise TypeError('{!r} is not a Python or Cython function'
91 .format(func))
153 .format(func))
92 args, varargs, varkw = inspect.getargs(func.func_code)
154 args, varargs, varkw = inspect.getargs(func.func_code)
93 return inspect.ArgSpec(args, varargs, varkw, func.func_defaults)
155 return inspect.ArgSpec(args, varargs, varkw, func.func_defaults)
94
156
95 #TODO: fix it and test it on python3.11
157 # NOTE: inject for python3.11
96 inspect.getargspec = inspect.getfullargspec #custom_getargspec
158 inspect.getargspec = inspect.getfullargspec
97
159
98 return inspect
160 return inspect
@@ -1,355 +1,359 b''
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
1 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
2 #
4 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
7 #
6 #
8 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
10 # GNU General Public License for more details.
12 #
11 #
13 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
14 #
16 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 """
18 """
20 Celery loader, run with::
19 Celery loader, run with::
21
20
22 celery worker \
21 celery worker \
23 --task-events \
22 --task-events \
24 --beat \
23 --beat \
25 --autoscale=20,2 \
24 --autoscale=20,2 \
26 --max-tasks-per-child 1 \
25 --max-tasks-per-child 1 \
27 --app rhodecode.lib.celerylib.loader \
26 --app rhodecode.lib.celerylib.loader \
28 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
29 --loglevel DEBUG --ini=.dev/dev.ini
28 --loglevel DEBUG --ini=.dev/dev.ini
30 """
29 """
31 import os
30 from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec
31 inspect_getargspec()
32 inspect_formatargspec()
33 # python3.11 inspect patches for backward compat on `paste` code
34
32 import logging
35 import logging
33 import importlib
36 import importlib
34
37
38 import click
35 from celery import Celery
39 from celery import Celery
36 from celery import signals
40 from celery import signals
37 from celery import Task
41 from celery import Task
38 from celery import exceptions # pragma: no cover
42 from celery import exceptions # noqa
39 from kombu.serialization import register
43 from kombu.serialization import register
40
44
41 import rhodecode
45 import rhodecode
42
46
43 from rhodecode.lib.statsd_client import StatsdClient
47 from rhodecode.lib.statsd_client import StatsdClient
44 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
48 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
45 from rhodecode.lib.ext_json import json
49 from rhodecode.lib.ext_json import json
46 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
50 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
47 from rhodecode.lib.utils2 import str2bool
51 from rhodecode.lib.utils2 import str2bool
48 from rhodecode.model import meta
52 from rhodecode.model import meta
49
53
50
54
51 register('json_ext', json.dumps, json.loads,
55 register('json_ext', json.dumps, json.loads,
52 content_type='application/x-json-ext',
56 content_type='application/x-json-ext',
53 content_encoding='utf-8')
57 content_encoding='utf-8')
54
58
55 log = logging.getLogger('celery.rhodecode.loader')
59 log = logging.getLogger('celery.rhodecode.loader')
56
60
57
61
58 imports = ['rhodecode.lib.celerylib.tasks']
62 imports = ['rhodecode.lib.celerylib.tasks']
59
63
60 try:
64 try:
61 # try if we have EE tasks available
65 # try if we have EE tasks available
62 importlib.import_module('rc_ee')
66 importlib.import_module('rc_ee')
63 imports.append('rc_ee.lib.celerylib.tasks')
67 imports.append('rc_ee.lib.celerylib.tasks')
64 except ImportError:
68 except ImportError:
65 pass
69 pass
66
70
67
71
68 base_celery_config = {
72 base_celery_config = {
69 'result_backend': 'rpc://',
73 'result_backend': 'rpc://',
70 'result_expires': 60 * 60 * 24,
74 'result_expires': 60 * 60 * 24,
71 'result_persistent': True,
75 'result_persistent': True,
72 'imports': imports,
76 'imports': imports,
73 'worker_max_tasks_per_child': 20,
77 'worker_max_tasks_per_child': 20,
74 'accept_content': ['json_ext', 'json'],
78 'accept_content': ['json_ext', 'json'],
75 'task_serializer': 'json_ext',
79 'task_serializer': 'json_ext',
76 'result_serializer': 'json_ext',
80 'result_serializer': 'json_ext',
77 'worker_hijack_root_logger': False,
81 'worker_hijack_root_logger': False,
78 'database_table_names': {
82 'database_table_names': {
79 'task': 'beat_taskmeta',
83 'task': 'beat_taskmeta',
80 'group': 'beat_groupmeta',
84 'group': 'beat_groupmeta',
81 }
85 }
82 }
86 }
83
87
84
88
85 def add_preload_arguments(parser):
89 preload_option_ini = click.Option(
86 parser.add_argument(
90 ('--ini',),
87 '--ini', default=None,
91 help='Path to ini configuration file.'
88 help='Path to ini configuration file.'
92 )
89 )
93
90 parser.add_argument(
94 preload_option_ini_var = click.Option(
91 '--ini-var', default=None,
95 ('--ini-var',),
92 help='Comma separated list of key=value to pass to ini.'
96 help='Comma separated list of key=value to pass to ini.'
93 )
97 )
94
98
95
99
96 def get_logger(obj):
100 def get_logger(obj):
97 custom_log = logging.getLogger(
101 custom_log = logging.getLogger(
98 'rhodecode.task.{}'.format(obj.__class__.__name__))
102 'rhodecode.task.{}'.format(obj.__class__.__name__))
99
103
100 if rhodecode.CELERY_ENABLED:
104 if rhodecode.CELERY_ENABLED:
101 try:
105 try:
102 custom_log = obj.get_logger()
106 custom_log = obj.get_logger()
103 except Exception:
107 except Exception:
104 pass
108 pass
105
109
106 return custom_log
110 return custom_log
107
111
108
112
109 # init main celery app
113 # init main celery app
110 celery_app = Celery()
114 celery_app = Celery()
111 celery_app.user_options['preload'].add(add_preload_arguments)
115 celery_app.user_options['preload'].add(preload_option_ini)
116 celery_app.user_options['preload'].add(preload_option_ini_var)
112
117
113
118
114 @signals.setup_logging.connect
119 @signals.setup_logging.connect
115 def setup_logging_callback(**kwargs):
120 def setup_logging_callback(**kwargs):
116 ini_file = celery_app.conf['RC_INI_FILE']
121 ini_file = celery_app.conf['RC_INI_FILE']
117 setup_logging(ini_file)
122 setup_logging(ini_file)
118
123
119
124
120 @signals.user_preload_options.connect
125 @signals.user_preload_options.connect
121 def on_preload_parsed(options, **kwargs):
126 def on_preload_parsed(options, **kwargs):
122
127
123 ini_file = options['ini']
128 ini_file = options['ini']
124 ini_vars = options['ini_var']
129 ini_vars = options['ini_var']
125
130
126 if ini_file is None:
131 if ini_file is None:
127 print('You must provide the --ini argument to start celery')
132 print('You must provide the --ini argument to start celery')
128 exit(-1)
133 exit(-1)
129
134
130 options = None
135 options = None
131 if ini_vars is not None:
136 if ini_vars is not None:
132 options = parse_ini_vars(ini_vars)
137 options = parse_ini_vars(ini_vars)
133
138
134 celery_app.conf['RC_INI_FILE'] = ini_file
139 celery_app.conf['RC_INI_FILE'] = ini_file
135 celery_app.conf['RC_INI_OPTIONS'] = options
140 celery_app.conf['RC_INI_OPTIONS'] = options
136 setup_logging(ini_file)
141 setup_logging(ini_file)
137
142
138
143
139 def _init_celery(app_type=''):
144 def _init_celery(app_type=''):
140 from rhodecode.config.middleware import get_celery_config
145 from rhodecode.config.middleware import get_celery_config
141
146
142 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
147 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
143
148
144 ini_file = celery_app.conf['RC_INI_FILE']
149 ini_file = celery_app.conf['RC_INI_FILE']
145 options = celery_app.conf['RC_INI_OPTIONS']
150 options = celery_app.conf['RC_INI_OPTIONS']
146
151
147 env = None
152 env = None
148 try:
153 try:
149 env = bootstrap(ini_file, options=options)
154 env = bootstrap(ini_file, options=options)
150 except Exception:
155 except Exception:
151 log.exception('Failed to bootstrap RhodeCode APP')
156 log.exception('Failed to bootstrap RhodeCode APP')
152
157
153 if not env:
158 if not env:
154 raise EnvironmentError(
159 raise EnvironmentError(
155 'Failed to load pyramid ENV. '
160 'Failed to load pyramid ENV. '
156 'Probably there is another error present that prevents from running pyramid app')
161 'Probably there is another error present that prevents from running pyramid app')
157
162
158 log.debug('Got Pyramid ENV: %s', env)
163 log.debug('Got Pyramid ENV: %s', env)
159
164
160 celery_settings = get_celery_config(env['registry'].settings)
165 celery_settings = get_celery_config(env['registry'].settings)
161
166
162 setup_celery_app(
167 setup_celery_app(
163 app=env['app'], root=env['root'], request=env['request'],
168 app=env['app'], root=env['root'], request=env['request'],
164 registry=env['registry'], closer=env['closer'],
169 registry=env['registry'], closer=env['closer'],
165 celery_settings=celery_settings)
170 celery_settings=celery_settings)
166
171
167
172
168 @signals.celeryd_init.connect
173 @signals.celeryd_init.connect
169 def on_celeryd_init(sender=None, conf=None, **kwargs):
174 def on_celeryd_init(sender=None, conf=None, **kwargs):
170 _init_celery('celery worker')
175 _init_celery('celery worker')
171
176
172 # fix the global flag even if it's disabled via .ini file because this
177 # fix the global flag even if it's disabled via .ini file because this
173 # is a worker code that doesn't need this to be disabled.
178 # is a worker code that doesn't need this to be disabled.
174 rhodecode.CELERY_ENABLED = True
179 rhodecode.CELERY_ENABLED = True
175
180
176
181
177 @signals.beat_init.connect
182 @signals.beat_init.connect
178 def on_beat_init(sender=None, conf=None, **kwargs):
183 def on_beat_init(sender=None, conf=None, **kwargs):
179 _init_celery('celery beat')
184 _init_celery('celery beat')
180
185
181
186
182 @signals.task_prerun.connect
187 @signals.task_prerun.connect
183 def task_prerun_signal(task_id, task, args, **kwargs):
188 def task_prerun_signal(task_id, task, args, **kwargs):
184 ping_db()
189 ping_db()
185 statsd = StatsdClient.statsd
190 statsd = StatsdClient.statsd
186 if statsd:
191 if statsd:
187 task_repr = getattr(task, 'name', task)
192 task_repr = getattr(task, 'name', task)
188 statsd.incr('rhodecode_celery_task_total', tags=[
193 statsd.incr('rhodecode_celery_task_total', tags=[
189 'task:{}'.format(task_repr),
194 'task:{}'.format(task_repr),
190 'mode:async'
195 'mode:async'
191 ])
196 ])
192
197
193
198
194 @signals.task_success.connect
199 @signals.task_success.connect
195 def task_success_signal(result, **kwargs):
200 def task_success_signal(result, **kwargs):
196 meta.Session.commit()
201 meta.Session.commit()
197 closer = celery_app.conf['PYRAMID_CLOSER']
202 closer = celery_app.conf['PYRAMID_CLOSER']
198 if closer:
203 if closer:
199 closer()
204 closer()
200
205
201
206
202
203 @signals.task_retry.connect
207 @signals.task_retry.connect
204 def task_retry_signal(
208 def task_retry_signal(
205 request, reason, einfo, **kwargs):
209 request, reason, einfo, **kwargs):
206 meta.Session.remove()
210 meta.Session.remove()
207 closer = celery_app.conf['PYRAMID_CLOSER']
211 closer = celery_app.conf['PYRAMID_CLOSER']
208 if closer:
212 if closer:
209 closer()
213 closer()
210
214
211
215
212 @signals.task_failure.connect
216 @signals.task_failure.connect
213 def task_failure_signal(
217 def task_failure_signal(
214 task_id, exception, args, kwargs, traceback, einfo, **kargs):
218 task_id, exception, args, kwargs, traceback, einfo, **kargs):
215
219
216 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
220 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
217 from rhodecode.lib.exc_tracking import store_exception
221 from rhodecode.lib.exc_tracking import store_exception
218 from rhodecode.lib.statsd_client import StatsdClient
222 from rhodecode.lib.statsd_client import StatsdClient
219
223
220 meta.Session.remove()
224 meta.Session.remove()
221
225
222 # simulate sys.exc_info()
226 # simulate sys.exc_info()
223 exc_info = (einfo.type, einfo.exception, einfo.tb)
227 exc_info = (einfo.type, einfo.exception, einfo.tb)
224 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
228 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
225 statsd = StatsdClient.statsd
229 statsd = StatsdClient.statsd
226 if statsd:
230 if statsd:
227 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
231 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
228 statsd.incr('rhodecode_exception_total',
232 statsd.incr('rhodecode_exception_total',
229 tags=["exc_source:celery", "type:{}".format(exc_type)])
233 tags=["exc_source:celery", "type:{}".format(exc_type)])
230
234
231 closer = celery_app.conf['PYRAMID_CLOSER']
235 closer = celery_app.conf['PYRAMID_CLOSER']
232 if closer:
236 if closer:
233 closer()
237 closer()
234
238
235
239
236 @signals.task_revoked.connect
240 @signals.task_revoked.connect
237 def task_revoked_signal(
241 def task_revoked_signal(
238 request, terminated, signum, expired, **kwargs):
242 request, terminated, signum, expired, **kwargs):
239 closer = celery_app.conf['PYRAMID_CLOSER']
243 closer = celery_app.conf['PYRAMID_CLOSER']
240 if closer:
244 if closer:
241 closer()
245 closer()
242
246
243
247
244 class UNSET(object):
248 class UNSET(object):
245 pass
249 pass
246
250
247
251
248 _unset = UNSET()
252 _unset = UNSET()
249
253
250
254
251 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
255 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
252
256
253 if request is not UNSET:
257 if request is not UNSET:
254 celery_app.conf.update({'PYRAMID_REQUEST': request})
258 celery_app.conf.update({'PYRAMID_REQUEST': request})
255
259
256 if registry is not UNSET:
260 if registry is not UNSET:
257 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
261 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
258
262
259
263
260 def setup_celery_app(app, root, request, registry, closer, celery_settings):
264 def setup_celery_app(app, root, request, registry, closer, celery_settings):
261 log.debug('Got custom celery conf: %s', celery_settings)
265 log.debug('Got custom celery conf: %s', celery_settings)
262 celery_config = base_celery_config
266 celery_config = base_celery_config
263 celery_config.update({
267 celery_config.update({
264 # store celerybeat scheduler db where the .ini file is
268 # store celerybeat scheduler db where the .ini file is
265 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
269 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
266 })
270 })
267
271
268 celery_config.update(celery_settings)
272 celery_config.update(celery_settings)
269 celery_app.config_from_object(celery_config)
273 celery_app.config_from_object(celery_config)
270
274
271 celery_app.conf.update({'PYRAMID_APP': app})
275 celery_app.conf.update({'PYRAMID_APP': app})
272 celery_app.conf.update({'PYRAMID_ROOT': root})
276 celery_app.conf.update({'PYRAMID_ROOT': root})
273 celery_app.conf.update({'PYRAMID_REQUEST': request})
277 celery_app.conf.update({'PYRAMID_REQUEST': request})
274 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
278 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
275 celery_app.conf.update({'PYRAMID_CLOSER': closer})
279 celery_app.conf.update({'PYRAMID_CLOSER': closer})
276
280
277
281
278 def configure_celery(config, celery_settings):
282 def configure_celery(config, celery_settings):
279 """
283 """
280 Helper that is called from our application creation logic. It gives
284 Helper that is called from our application creation logic. It gives
281 connection info into running webapp and allows execution of tasks from
285 connection info into running webapp and allows execution of tasks from
282 RhodeCode itself
286 RhodeCode itself
283 """
287 """
284 # store some globals into rhodecode
288 # store some globals into rhodecode
285 rhodecode.CELERY_ENABLED = str2bool(
289 rhodecode.CELERY_ENABLED = str2bool(
286 config.registry.settings.get('use_celery'))
290 config.registry.settings.get('use_celery'))
287 if rhodecode.CELERY_ENABLED:
291 if rhodecode.CELERY_ENABLED:
288 log.info('Configuring celery based on `%s` settings', celery_settings)
292 log.info('Configuring celery based on `%s` settings', celery_settings)
289 setup_celery_app(
293 setup_celery_app(
290 app=None, root=None, request=None, registry=config.registry,
294 app=None, root=None, request=None, registry=config.registry,
291 closer=None, celery_settings=celery_settings)
295 closer=None, celery_settings=celery_settings)
292
296
293
297
294 def maybe_prepare_env(req):
298 def maybe_prepare_env(req):
295 environ = {}
299 environ = {}
296 try:
300 try:
297 environ.update({
301 environ.update({
298 'PATH_INFO': req.environ['PATH_INFO'],
302 'PATH_INFO': req.environ['PATH_INFO'],
299 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
303 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
300 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
304 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
301 'SERVER_NAME': req.environ['SERVER_NAME'],
305 'SERVER_NAME': req.environ['SERVER_NAME'],
302 'SERVER_PORT': req.environ['SERVER_PORT'],
306 'SERVER_PORT': req.environ['SERVER_PORT'],
303 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
307 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
304 })
308 })
305 except Exception:
309 except Exception:
306 pass
310 pass
307
311
308 return environ
312 return environ
309
313
310
314
311 class RequestContextTask(Task):
315 class RequestContextTask(Task):
312 """
316 """
313 This is a celery task which will create a rhodecode app instance context
317 This is a celery task which will create a rhodecode app instance context
314 for the task, patch pyramid with the original request
318 for the task, patch pyramid with the original request
315 that created the task and also add the user to the context.
319 that created the task and also add the user to the context.
316 """
320 """
317
321
318 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
322 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
319 link=None, link_error=None, shadow=None, **options):
323 link=None, link_error=None, shadow=None, **options):
320 """ queue the job to run (we are in web request context here) """
324 """ queue the job to run (we are in web request context here) """
321 from rhodecode.lib.base import get_ip_addr
325 from rhodecode.lib.base import get_ip_addr
322
326
323 req = self.app.conf['PYRAMID_REQUEST']
327 req = self.app.conf['PYRAMID_REQUEST']
324 if not req:
328 if not req:
325 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
329 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
326
330
327 log.debug('Running Task with class: %s. Request Class: %s',
331 log.debug('Running Task with class: %s. Request Class: %s',
328 self.__class__, req.__class__)
332 self.__class__, req.__class__)
329
333
330 user_id = 0
334 user_id = 0
331
335
332 # web case
336 # web case
333 if hasattr(req, 'user'):
337 if hasattr(req, 'user'):
334 user_id = req.user.user_id
338 user_id = req.user.user_id
335
339
336 # api case
340 # api case
337 elif hasattr(req, 'rpc_user'):
341 elif hasattr(req, 'rpc_user'):
338 user_id = req.rpc_user.user_id
342 user_id = req.rpc_user.user_id
339
343
340 # we hook into kwargs since it is the only way to pass our data to
344 # we hook into kwargs since it is the only way to pass our data to
341 # the celery worker
345 # the celery worker
342 environ = maybe_prepare_env(req)
346 environ = maybe_prepare_env(req)
343 options['headers'] = options.get('headers', {})
347 options['headers'] = options.get('headers', {})
344 options['headers'].update({
348 options['headers'].update({
345 'rhodecode_proxy_data': {
349 'rhodecode_proxy_data': {
346 'environ': environ,
350 'environ': environ,
347 'auth_user': {
351 'auth_user': {
348 'ip_addr': get_ip_addr(req.environ),
352 'ip_addr': get_ip_addr(req.environ),
349 'user_id': user_id
353 'user_id': user_id
350 },
354 },
351 }
355 }
352 })
356 })
353
357
354 return super(RequestContextTask, self).apply_async(
358 return super(RequestContextTask, self).apply_async(
355 args, kwargs, task_id, producer, link, link_error, shadow, **options)
359 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now