##// END OF EJS Templates
celery: use just one instance of UNSET() object
super-admin -
r5019:7f6920ce default
parent child Browse files
Show More
@@ -1,353 +1,356 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 Celery loader, run with::
21 Celery loader, run with::
22
22
23 celery worker \
23 celery worker \
24 --task-events \
24 --task-events \
25 --beat \
25 --beat \
26 --autoscale=20,2 \
26 --autoscale=20,2 \
27 --max-tasks-per-child 1 \
27 --max-tasks-per-child 1 \
28 --app rhodecode.lib.celerylib.loader \
28 --app rhodecode.lib.celerylib.loader \
29 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
29 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
30 --loglevel DEBUG --ini=.dev/dev.ini
30 --loglevel DEBUG --ini=.dev/dev.ini
31 """
31 """
32 import os
32 import os
33 import logging
33 import logging
34 import importlib
34 import importlib
35
35
36 from celery import Celery
36 from celery import Celery
37 from celery import signals
37 from celery import signals
38 from celery import Task
38 from celery import Task
39 from celery import exceptions # pragma: no cover
39 from celery import exceptions # pragma: no cover
40 from kombu.serialization import register
40 from kombu.serialization import register
41
41
42 import rhodecode
42 import rhodecode
43
43
44 from rhodecode.lib.statsd_client import StatsdClient
44 from rhodecode.lib.statsd_client import StatsdClient
45 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
45 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
46 from rhodecode.lib.ext_json import json
46 from rhodecode.lib.ext_json import json
47 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
47 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
48 from rhodecode.lib.utils2 import str2bool
48 from rhodecode.lib.utils2 import str2bool
49 from rhodecode.model import meta
49 from rhodecode.model import meta
50
50
51
51
# Register the project's extended JSON codec (rhodecode.lib.ext_json) with
# kombu under the name `json_ext`.
register(
    'json_ext', json.dumps, json.loads,
    content_type='application/x-json-ext',
    content_encoding='utf-8')

# module level logger of the celery loader
log = logging.getLogger('celery.rhodecode.loader')
57
57
58
58
# Dotted paths of the task modules; always contains the CE tasks.
imports = ['rhodecode.lib.celerylib.tasks']

try:
    # try if we have EE tasks available
    importlib.import_module('rc_ee')
except ImportError:
    # `rc_ee` is not importable, keep the CE tasks only
    pass
else:
    imports.append('rc_ee.lib.celerylib.tasks')
67
67
68
68
# Default celery configuration. `setup_celery_app` layers the beat schedule
# path and the custom celery settings on top of these values.
base_celery_config = {
    'result_backend': 'rpc://',
    # one day, expressed in seconds
    'result_expires': 60 * 60 * 24,
    'result_persistent': True,
    'imports': imports,
    'worker_max_tasks_per_child': 20,
    'accept_content': ['json_ext', 'json'],
    'task_serializer': 'json_ext',
    'result_serializer': 'json_ext',
    'worker_hijack_root_logger': False,
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    }
}
84
84
85
85
def add_preload_arguments(parser):
    """
    Add the RhodeCode specific options to the given argument parser.

    :param parser: parser object exposing an argparse style `add_argument`
    """
    parser.add_argument(
        '--ini', default=None,
        help='Path to ini configuration file.'
    )
    parser.add_argument(
        '--ini-var', default=None,
        help='Comma separated list of key=value to pass to ini.'
    )
95
95
96
96
def get_logger(obj):
    """
    Return a logger for the given object.

    The default is a stdlib logger named ``rhodecode.task.<ClassName>``.
    When `rhodecode.CELERY_ENABLED` is set, the result of `obj.get_logger()`
    is used instead; if that call fails the default logger is kept.

    :param obj: object to create the logger for
    """
    custom_log = logging.getLogger(
        'rhodecode.task.{}'.format(obj.__class__.__name__))

    if rhodecode.CELERY_ENABLED:
        try:
            custom_log = obj.get_logger()
        except Exception:
            # best effort, fall back to the stdlib logger created above
            pass

    return custom_log
108
108
109
109
# init main celery app
celery_app = Celery()
# register the callback that adds the --ini / --ini-var preload options
celery_app.user_options['preload'].add(add_preload_arguments)
113
113
114
114
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    """
    Handler of the celery `setup_logging` signal. Configures logging from the
    .ini file path stored under `RC_INI_FILE` in the celery app conf.
    """
    ini_file = celery_app.conf['RC_INI_FILE']
    setup_logging(ini_file)
119
119
120
120
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """
    Handler of the celery `user_preload_options` signal.

    Stores the .ini file path and the parsed ``--ini-var`` values in the
    celery app conf (`RC_INI_FILE`, `RC_INI_OPTIONS`) and sets up logging
    from that .ini file.

    :param options: mapping of parsed preload options, read for the keys
        `ini` and `ini_var`
    :raises SystemExit: with code -1 when no ``--ini`` argument was given
    """
    ini_file = options['ini']
    ini_vars = options['ini_var']

    if ini_file is None:
        print('You must provide the --ini argument to start celery')
        # raise directly instead of calling `exit()`, which is only a helper
        # injected by the `site` module
        raise SystemExit(-1)

    ini_options = None
    if ini_vars is not None:
        ini_options = parse_ini_vars(ini_vars)

    celery_app.conf['RC_INI_FILE'] = ini_file
    celery_app.conf['RC_INI_OPTIONS'] = ini_options
    setup_logging(ini_file)
138
138
139
139
def _init_celery(app_type=''):
    """
    Bootstrap the RhodeCode pyramid application and configure the celery app
    with the resulting environment.

    Reads `RC_INI_FILE` and `RC_INI_OPTIONS` from the celery app conf.

    :param app_type: label used only in the debug log message
    :raises EnvironmentError: when bootstrapping fails or returns no env
    """
    # imported here and not at module level; presumably avoids an import
    # cycle with rhodecode.config.middleware -- TODO confirm
    from rhodecode.config.middleware import get_celery_config

    log.debug('Bootstrapping RhodeCode application for %s...', app_type)

    ini_file = celery_app.conf['RC_INI_FILE']
    options = celery_app.conf['RC_INI_OPTIONS']

    env = None
    try:
        env = bootstrap(ini_file, options=options)
    except Exception:
        # the traceback is logged here, the failure itself is reported by
        # the `not env` check below
        log.exception('Failed to bootstrap RhodeCode APP')

    if not env:
        raise EnvironmentError(
            'Failed to load pyramid ENV. '
            'Probably there is another error present that prevents from running pyramid app')

    log.debug('Got Pyramid ENV: %s', env)

    celery_settings = get_celery_config(env['registry'].settings)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        celery_settings=celery_settings)
167
167
168
168
@signals.celeryd_init.connect
def on_celeryd_init(sender=None, conf=None, **kwargs):
    """
    Handler of the celery `celeryd_init` signal, bootstraps the application
    for a worker and force-enables the global celery flag.
    """
    _init_celery('celery worker')

    # fix the global flag even if it's disabled via .ini file because this
    # is a worker code that doesn't need this to be disabled.
    rhodecode.CELERY_ENABLED = True
176
176
177
177
@signals.beat_init.connect
def on_beat_init(sender=None, conf=None, **kwargs):
    """
    Handler of the celery `beat_init` signal, bootstraps the application
    for the beat process.
    """
    _init_celery('celery beat')
181
181
182
182
@signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
    """
    Handler of the celery `task_prerun` signal.

    Calls `ping_db()` and, when a statsd client is configured, increments
    the `rhodecode_celery_task_total` counter tagged with the task name.
    """
    ping_db()
    statsd = StatsdClient.statsd
    if statsd:
        # use the `name` attribute of the task, or the task object itself
        # when it has no such attribute
        task_repr = getattr(task, 'name', task)
        statsd.incr('rhodecode_celery_task_total', tags=[
            'task:{}'.format(task_repr),
            'mode:async'
        ])
193
193
194
194
@signals.task_success.connect
def task_success_signal(result, **kwargs):
    """
    Handler of the celery `task_success` signal. Commits the database
    session and calls the stored pyramid closer, if there is one.
    """
    meta.Session.commit()
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()
201
201
202
202
203
203
@signals.task_retry.connect
def task_retry_signal(
        request, reason, einfo, **kwargs):
    """
    Handler of the celery `task_retry` signal. Removes the database session
    and calls the stored pyramid closer, if there is one.
    """
    meta.Session.remove()
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()
211
211
212
212
@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    """
    Handler of the celery `task_failure` signal.

    Logs the failure, removes the database session, stores the exception
    via `store_exception`, increments the `rhodecode_exception_total`
    statsd counter when a statsd client is configured, and finally calls the
    stored pyramid closer, if there is one.

    :param task_id: id of the failed task, used in the log message
    :param einfo: exception info object, read for `type`, `exception`
        and `tb`
    """
    log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
    from rhodecode.lib.exc_tracking import store_exception

    meta.Session.remove()

    # simulate sys.exc_info()
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
    statsd = StatsdClient.statsd
    if statsd:
        # tag with the class of the raised exception (`einfo.type`), not
        # with the class of the `einfo` wrapper object itself
        exc_type = "{}.{}".format(einfo.type.__module__, einfo.type.__name__)
        statsd.incr('rhodecode_exception_total',
                    tags=["exc_source:celery", "type:{}".format(exc_type)])

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()
235
235
236
236
@signals.task_revoked.connect
def task_revoked_signal(
        request, terminated, signum, expired, **kwargs):
    """
    Handler of the celery `task_revoked` signal. Calls the stored pyramid
    closer, if there is one.
    """
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()
243
243
244
244
class UNSET(object):
    """
    Marker type for arguments of `set_celery_conf` that were not passed.
    """


# the one shared marker instance used as default argument value
_unset = UNSET()


def _is_unset(value):
    # accepts the shared marker, any other UNSET instance and the UNSET
    # class itself, so every way of spelling `not passed` is recognised
    return value is UNSET or isinstance(value, UNSET)


def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
    """
    Update the pyramid related keys of the celery app conf.

    Only `request` (stored as `PYRAMID_REQUEST`) and `registry` (stored as
    `PYRAMID_REGISTRY`) are written, and only when they were passed. `None`
    counts as a passed value. `app`, `root` and `closer` are accepted but
    not stored by this function.
    """
    # compare against the marker; checking `is not UNSET` against the class
    # was always true and wrote the marker default into the conf
    if not _is_unset(request):
        celery_app.conf.update({'PYRAMID_REQUEST': request})

    if not _is_unset(registry):
        celery_app.conf.update({'PYRAMID_REGISTRY': registry})
256
259
257
260
def setup_celery_app(app, root, request, registry, closer, celery_settings):
    """
    Configure the global celery app.

    The effective configuration is `base_celery_config`, plus the beat
    schedule file name taken from the registry settings, plus
    `celery_settings`, applied in that order. The pyramid objects are then
    stored in the celery app conf under the `PYRAMID_*` keys.

    :param registry: object whose `settings` mapping contains the
        `celerybeat-schedule.path` key
    :param celery_settings: mapping of custom celery settings
    """
    log.debug('Got custom celery conf: %s', celery_settings)
    # work on a copy, so the module level defaults are not modified
    celery_config = dict(base_celery_config)
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
    })

    celery_config.update(celery_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({
        'PYRAMID_APP': app,
        'PYRAMID_ROOT': root,
        'PYRAMID_REQUEST': request,
        'PYRAMID_REGISTRY': registry,
        'PYRAMID_CLOSER': closer,
    })
274
277
275
278
def configure_celery(config, celery_settings):
    """
    Helper that is called from our application creation logic. It gives
    connection info into running webapp and allows execution of tasks from
    RhodeCode itself

    :param config: object exposing `registry.settings`, read for the
        `use_celery` key
    :param celery_settings: mapping of custom celery settings
    """
    # store some globals into rhodecode
    rhodecode.CELERY_ENABLED = str2bool(
        config.registry.settings.get('use_celery'))
    if rhodecode.CELERY_ENABLED:
        log.info('Configuring celery based on `%s` settings', celery_settings)
        # only the registry is known at this point, the remaining pyramid
        # objects are stored as None
        setup_celery_app(
            app=None, root=None, request=None, registry=config.registry,
            closer=None, celery_settings=celery_settings)
290
293
291
294
def maybe_prepare_env(req):
    """
    Extract a fixed set of WSGI environ keys from the given request.

    `HTTP_HOST` falls back to the value of `SERVER_NAME` when it is missing.
    The extraction is all-or-nothing: if `req` has no usable `environ` or
    any other key is missing, an empty dict is returned.

    :param req: request object with an `environ` mapping
    :return: dict with the extracted keys, or an empty dict
    """
    environ = {}
    try:
        environ.update({
            'PATH_INFO': req.environ['PATH_INFO'],
            'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
            'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
            'SERVER_NAME': req.environ['SERVER_NAME'],
            'SERVER_PORT': req.environ['SERVER_PORT'],
            'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
        })
    except Exception:
        # deliberate best effort, the caller gets an empty environ
        pass

    return environ
307
310
308
311
class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request
    that created the task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """
        Queue the job to run (we are in web request context here).

        Adds a `rhodecode_proxy_data` entry to the message headers, holding
        the environ extracted from the current request together with the ip
        address and id of the calling user, then delegates to the parent
        `apply_async`.

        :raises ValueError: when the celery app conf holds no
            `PYRAMID_REQUEST`
        """
        from rhodecode.lib.base import get_ip_addr

        req = self.app.conf['PYRAMID_REQUEST']
        if not req:
            raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')

        log.debug('Running Task with class: %s. Request Class: %s',
                  self.__class__, req.__class__)

        # stays 0 when the request carries neither `user` nor `rpc_user`
        user_id = 0

        # web case
        if hasattr(req, 'user'):
            user_id = req.user.user_id

        # api case
        elif hasattr(req, 'rpc_user'):
            user_id = req.rpc_user.user_id

        # we hook into kwargs since it is the only way to pass our data to
        # the celery worker
        environ = maybe_prepare_env(req)
        headers = options.get('headers')
        if headers is None:
            # also covers an explicit `headers=None` passed by the caller
            headers = {}
        options['headers'] = headers
        headers.update({
            'rhodecode_proxy_data': {
                'environ': environ,
                'auth_user': {
                    'ip_addr': get_ip_addr(req.environ),
                    'user_id': user_id
                },
            }
        })

        return super(RequestContextTask, self).apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now