##// END OF EJS Templates
celery: fixed bootstrap of beat workers
super-admin -
r4889:609be9c6 default
parent child Browse files
Show More
@@ -1,329 +1,344 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 Celery loader, run with::
21 Celery loader, run with::
22
22
23 celery worker \
23 celery worker \
24 --task-events \
24 --task-events \
25 --beat \
25 --beat \
26 --autoscale=20,2 \
26 --autoscale=20,2 \
27 --max-tasks-per-child 1 \
27 --max-tasks-per-child 1 \
28 --app rhodecode.lib.celerylib.loader \
28 --app rhodecode.lib.celerylib.loader \
29 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
29 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
30 --loglevel DEBUG --ini=.dev/dev.ini
30 --loglevel DEBUG --ini=.dev/dev.ini
31 """
31 """
32 import os
32 import os
33 import logging
33 import logging
34 import importlib
34 import importlib
35
35
36 from celery import Celery
36 from celery import Celery
37 from celery import signals
37 from celery import signals
38 from celery import Task
38 from celery import Task
39 from celery import exceptions # pragma: no cover
39 from celery import exceptions # pragma: no cover
40 from kombu.serialization import register
40 from kombu.serialization import register
41
41
42 import rhodecode
42 import rhodecode
43
43
44 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
44 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
45 from rhodecode.lib.ext_json import json
45 from rhodecode.lib.ext_json import json
46 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
46 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
47 from rhodecode.lib.utils2 import str2bool
47 from rhodecode.lib.utils2 import str2bool
48 from rhodecode.model import meta
48 from rhodecode.model import meta
49
49
50
50
51 register('json_ext', json.dumps, json.loads,
51 register('json_ext', json.dumps, json.loads,
52 content_type='application/x-json-ext',
52 content_type='application/x-json-ext',
53 content_encoding='utf-8')
53 content_encoding='utf-8')
54
54
55 log = logging.getLogger('celery.rhodecode.loader')
55 log = logging.getLogger('celery.rhodecode.loader')
56
56
57
57
58 imports = ['rhodecode.lib.celerylib.tasks']
58 imports = ['rhodecode.lib.celerylib.tasks']
59
59
60 try:
60 try:
61 # try if we have EE tasks available
61 # try if we have EE tasks available
62 importlib.import_module('rc_ee')
62 importlib.import_module('rc_ee')
63 imports.append('rc_ee.lib.celerylib.tasks')
63 imports.append('rc_ee.lib.celerylib.tasks')
64 except ImportError:
64 except ImportError:
65 pass
65 pass
66
66
67
67
68 base_celery_config = {
68 base_celery_config = {
69 'result_backend': 'rpc://',
69 'result_backend': 'rpc://',
70 'result_expires': 60 * 60 * 24,
70 'result_expires': 60 * 60 * 24,
71 'result_persistent': True,
71 'result_persistent': True,
72 'imports': imports,
72 'imports': imports,
73 'worker_max_tasks_per_child': 100,
73 'worker_max_tasks_per_child': 100,
74 'accept_content': ['json_ext', 'json'],
74 'accept_content': ['json_ext', 'json'],
75 'task_serializer': 'json_ext',
75 'task_serializer': 'json_ext',
76 'result_serializer': 'json_ext',
76 'result_serializer': 'json_ext',
77 'worker_hijack_root_logger': False,
77 'worker_hijack_root_logger': False,
78 'database_table_names': {
78 'database_table_names': {
79 'task': 'beat_taskmeta',
79 'task': 'beat_taskmeta',
80 'group': 'beat_groupmeta',
80 'group': 'beat_groupmeta',
81 }
81 }
82 }
82 }
83
83
84
84
85 def add_preload_arguments(parser):
85 def add_preload_arguments(parser):
86 parser.add_argument(
86 parser.add_argument(
87 '--ini', default=None,
87 '--ini', default=None,
88 help='Path to ini configuration file.'
88 help='Path to ini configuration file.'
89 )
89 )
90 parser.add_argument(
90 parser.add_argument(
91 '--ini-var', default=None,
91 '--ini-var', default=None,
92 help='Comma separated list of key=value to pass to ini.'
92 help='Comma separated list of key=value to pass to ini.'
93 )
93 )
94
94
95
95
96 def get_logger(obj):
96 def get_logger(obj):
97 custom_log = logging.getLogger(
97 custom_log = logging.getLogger(
98 'rhodecode.task.{}'.format(obj.__class__.__name__))
98 'rhodecode.task.{}'.format(obj.__class__.__name__))
99
99
100 if rhodecode.CELERY_ENABLED:
100 if rhodecode.CELERY_ENABLED:
101 try:
101 try:
102 custom_log = obj.get_logger()
102 custom_log = obj.get_logger()
103 except Exception:
103 except Exception:
104 pass
104 pass
105
105
106 return custom_log
106 return custom_log
107
107
108
108
109 # init main celery app
109 # init main celery app
110 celery_app = Celery()
110 celery_app = Celery()
111 celery_app.user_options['preload'].add(add_preload_arguments)
111 celery_app.user_options['preload'].add(add_preload_arguments)
112
112
113
113
114 @signals.setup_logging.connect
114 @signals.setup_logging.connect
115 def setup_logging_callback(**kwargs):
115 def setup_logging_callback(**kwargs):
116 ini_file = celery_app.conf['RC_INI_FILE']
116 ini_file = celery_app.conf['RC_INI_FILE']
117 setup_logging(ini_file)
117 setup_logging(ini_file)
118
118
119
119
120 @signals.user_preload_options.connect
120 @signals.user_preload_options.connect
121 def on_preload_parsed(options, **kwargs):
121 def on_preload_parsed(options, **kwargs):
122
122
123 ini_file = options['ini']
123 ini_file = options['ini']
124 ini_vars = options['ini_var']
124 ini_vars = options['ini_var']
125
125
126 if ini_file is None:
126 if ini_file is None:
127 print('You must provide the paste --ini argument')
127 print('You must provide the --ini argument to start celery')
128 exit(-1)
128 exit(-1)
129
129
130 options = None
130 options = None
131 if ini_vars is not None:
131 if ini_vars is not None:
132 options = parse_ini_vars(ini_vars)
132 options = parse_ini_vars(ini_vars)
133
133
134 celery_app.conf['RC_INI_FILE'] = ini_file
134 celery_app.conf['RC_INI_FILE'] = ini_file
135 celery_app.conf['RC_INI_OPTIONS'] = options
135 celery_app.conf['RC_INI_OPTIONS'] = options
136 setup_logging(ini_file)
136 setup_logging(ini_file)
137
137
138
138
139 @signals.celeryd_init.connect
139 def _init_celery(app_type=''):
140 def on_celeryd_init(sender=None, conf=None, **kwargs):
141 from rhodecode.config.middleware import get_celery_config
140 from rhodecode.config.middleware import get_celery_config
142
141
143 log.debug('Bootstrapping RhodeCode application...')
142 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
144
143
145 ini_file = conf['RC_INI_FILE']
144 ini_file = celery_app.conf['RC_INI_FILE']
146 options = conf['RC_INI_OPTIONS']
145 options = celery_app.conf['RC_INI_OPTIONS']
147
146
147 env = None
148 try:
148 try:
149 env = bootstrap(ini_file, options=options)
149 env = bootstrap(ini_file, options=options)
150 except Exception:
150 except Exception:
151 log.exception('Failed to bootstrap RhodeCode APP')
151 log.exception('Failed to bootstrap RhodeCode APP')
152
152
153 if not env:
154 raise EnvironmentError(
155 'Failed to load pyramid ENV. '
156 'Probably there is another error present that prevents from running pyramid app')
157
153 log.debug('Got Pyramid ENV: %s', env)
158 log.debug('Got Pyramid ENV: %s', env)
154
159
155 celery_settings = get_celery_config(env['registry'].settings)
160 celery_settings = get_celery_config(env['registry'].settings)
156
161
157 setup_celery_app(
162 setup_celery_app(
158 app=env['app'], root=env['root'], request=env['request'],
163 app=env['app'], root=env['root'], request=env['request'],
159 registry=env['registry'], closer=env['closer'],
164 registry=env['registry'], closer=env['closer'],
160 celery_settings=celery_settings)
165 celery_settings=celery_settings)
161
166
167
168 @signals.celeryd_init.connect
169 def on_celeryd_init(sender=None, conf=None, **kwargs):
170 _init_celery('celery worker')
171
162 # fix the global flag even if it's disabled via .ini file because this
172 # fix the global flag even if it's disabled via .ini file because this
163 # is a worker code that doesn't need this to be disabled.
173 # is a worker code that doesn't need this to be disabled.
164 rhodecode.CELERY_ENABLED = True
174 rhodecode.CELERY_ENABLED = True
165
175
166
176
177 @signals.beat_init.connect
178 def on_beat_init(sender=None, conf=None, **kwargs):
179 _init_celery('celery beat')
180
181
167 @signals.task_prerun.connect
182 @signals.task_prerun.connect
168 def task_prerun_signal(task_id, task, args, **kwargs):
183 def task_prerun_signal(task_id, task, args, **kwargs):
169 ping_db()
184 ping_db()
170
185
171
186
172 @signals.task_success.connect
187 @signals.task_success.connect
173 def task_success_signal(result, **kwargs):
188 def task_success_signal(result, **kwargs):
174 meta.Session.commit()
189 meta.Session.commit()
175 closer = celery_app.conf['PYRAMID_CLOSER']
190 closer = celery_app.conf['PYRAMID_CLOSER']
176 if closer:
191 if closer:
177 closer()
192 closer()
178
193
179
194
180 @signals.task_retry.connect
195 @signals.task_retry.connect
181 def task_retry_signal(
196 def task_retry_signal(
182 request, reason, einfo, **kwargs):
197 request, reason, einfo, **kwargs):
183 meta.Session.remove()
198 meta.Session.remove()
184 closer = celery_app.conf['PYRAMID_CLOSER']
199 closer = celery_app.conf['PYRAMID_CLOSER']
185 if closer:
200 if closer:
186 closer()
201 closer()
187
202
188
203
189 @signals.task_failure.connect
204 @signals.task_failure.connect
190 def task_failure_signal(
205 def task_failure_signal(
191 task_id, exception, args, kwargs, traceback, einfo, **kargs):
206 task_id, exception, args, kwargs, traceback, einfo, **kargs):
192
207
193 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
208 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
194 from rhodecode.lib.exc_tracking import store_exception
209 from rhodecode.lib.exc_tracking import store_exception
195 from rhodecode.lib.statsd_client import StatsdClient
210 from rhodecode.lib.statsd_client import StatsdClient
196
211
197 meta.Session.remove()
212 meta.Session.remove()
198
213
199 # simulate sys.exc_info()
214 # simulate sys.exc_info()
200 exc_info = (einfo.type, einfo.exception, einfo.tb)
215 exc_info = (einfo.type, einfo.exception, einfo.tb)
201 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
216 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
202 statsd = StatsdClient.statsd
217 statsd = StatsdClient.statsd
203 if statsd:
218 if statsd:
204 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
219 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
205 statsd.incr('rhodecode_exception_total',
220 statsd.incr('rhodecode_exception_total',
206 tags=["exc_source:celery", "type:{}".format(exc_type)])
221 tags=["exc_source:celery", "type:{}".format(exc_type)])
207
222
208 closer = celery_app.conf['PYRAMID_CLOSER']
223 closer = celery_app.conf['PYRAMID_CLOSER']
209 if closer:
224 if closer:
210 closer()
225 closer()
211
226
212
227
213 @signals.task_revoked.connect
228 @signals.task_revoked.connect
214 def task_revoked_signal(
229 def task_revoked_signal(
215 request, terminated, signum, expired, **kwargs):
230 request, terminated, signum, expired, **kwargs):
216 closer = celery_app.conf['PYRAMID_CLOSER']
231 closer = celery_app.conf['PYRAMID_CLOSER']
217 if closer:
232 if closer:
218 closer()
233 closer()
219
234
220
235
221 class UNSET(object):
236 class UNSET(object):
222 pass
237 pass
223
238
224
239
225 def set_celery_conf(app=UNSET(), root=UNSET(), request=UNSET(), registry=UNSET(), closer=UNSET()):
240 def set_celery_conf(app=UNSET(), root=UNSET(), request=UNSET(), registry=UNSET(), closer=UNSET()):
226
241
227 if request is not UNSET:
242 if request is not UNSET:
228 celery_app.conf.update({'PYRAMID_REQUEST': request})
243 celery_app.conf.update({'PYRAMID_REQUEST': request})
229
244
230 if registry is not UNSET:
245 if registry is not UNSET:
231 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
246 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
232
247
233
248
234 def setup_celery_app(app, root, request, registry, closer, celery_settings):
249 def setup_celery_app(app, root, request, registry, closer, celery_settings):
235 log.debug('Got custom celery conf: %s', celery_settings)
250 log.debug('Got custom celery conf: %s', celery_settings)
236 celery_config = base_celery_config
251 celery_config = base_celery_config
237 celery_config.update({
252 celery_config.update({
238 # store celerybeat scheduler db where the .ini file is
253 # store celerybeat scheduler db where the .ini file is
239 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
254 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
240 })
255 })
241
256
242 celery_config.update(celery_settings)
257 celery_config.update(celery_settings)
243 celery_app.config_from_object(celery_config)
258 celery_app.config_from_object(celery_config)
244
259
245 celery_app.conf.update({'PYRAMID_APP': app})
260 celery_app.conf.update({'PYRAMID_APP': app})
246 celery_app.conf.update({'PYRAMID_ROOT': root})
261 celery_app.conf.update({'PYRAMID_ROOT': root})
247 celery_app.conf.update({'PYRAMID_REQUEST': request})
262 celery_app.conf.update({'PYRAMID_REQUEST': request})
248 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
263 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
249 celery_app.conf.update({'PYRAMID_CLOSER': closer})
264 celery_app.conf.update({'PYRAMID_CLOSER': closer})
250
265
251
266
252 def configure_celery(config, celery_settings):
267 def configure_celery(config, celery_settings):
253 """
268 """
254 Helper that is called from our application creation logic. It gives
269 Helper that is called from our application creation logic. It gives
255 connection info into running webapp and allows execution of tasks from
270 connection info into running webapp and allows execution of tasks from
256 RhodeCode itself
271 RhodeCode itself
257 """
272 """
258 # store some globals into rhodecode
273 # store some globals into rhodecode
259 rhodecode.CELERY_ENABLED = str2bool(
274 rhodecode.CELERY_ENABLED = str2bool(
260 config.registry.settings.get('use_celery'))
275 config.registry.settings.get('use_celery'))
261 if rhodecode.CELERY_ENABLED:
276 if rhodecode.CELERY_ENABLED:
262 log.info('Configuring celery based on `%s` settings', celery_settings)
277 log.info('Configuring celery based on `%s` settings', celery_settings)
263 setup_celery_app(
278 setup_celery_app(
264 app=None, root=None, request=None, registry=config.registry,
279 app=None, root=None, request=None, registry=config.registry,
265 closer=None, celery_settings=celery_settings)
280 closer=None, celery_settings=celery_settings)
266
281
267
282
268 def maybe_prepare_env(req):
283 def maybe_prepare_env(req):
269 environ = {}
284 environ = {}
270 try:
285 try:
271 environ.update({
286 environ.update({
272 'PATH_INFO': req.environ['PATH_INFO'],
287 'PATH_INFO': req.environ['PATH_INFO'],
273 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
288 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
274 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
289 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
275 'SERVER_NAME': req.environ['SERVER_NAME'],
290 'SERVER_NAME': req.environ['SERVER_NAME'],
276 'SERVER_PORT': req.environ['SERVER_PORT'],
291 'SERVER_PORT': req.environ['SERVER_PORT'],
277 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
292 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
278 })
293 })
279 except Exception:
294 except Exception:
280 pass
295 pass
281
296
282 return environ
297 return environ
283
298
284
299
285 class RequestContextTask(Task):
300 class RequestContextTask(Task):
286 """
301 """
287 This is a celery task which will create a rhodecode app instance context
302 This is a celery task which will create a rhodecode app instance context
288 for the task, patch pyramid with the original request
303 for the task, patch pyramid with the original request
289 that created the task and also add the user to the context.
304 that created the task and also add the user to the context.
290 """
305 """
291
306
292 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
307 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
293 link=None, link_error=None, shadow=None, **options):
308 link=None, link_error=None, shadow=None, **options):
294 """ queue the job to run (we are in web request context here) """
309 """ queue the job to run (we are in web request context here) """
295 from rhodecode.lib.base import get_ip_addr
310 from rhodecode.lib.base import get_ip_addr
296
311
297 req = self.app.conf['PYRAMID_REQUEST']
312 req = self.app.conf['PYRAMID_REQUEST']
298 if not req:
313 if not req:
299 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
314 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
300
315
301 log.debug('Running Task with class: %s. Request Class: %s',
316 log.debug('Running Task with class: %s. Request Class: %s',
302 self.__class__, req.__class__)
317 self.__class__, req.__class__)
303
318
304 user_id = 0
319 user_id = 0
305
320
306 # web case
321 # web case
307 if hasattr(req, 'user'):
322 if hasattr(req, 'user'):
308 user_id = req.user.user_id
323 user_id = req.user.user_id
309
324
310 # api case
325 # api case
311 elif hasattr(req, 'rpc_user'):
326 elif hasattr(req, 'rpc_user'):
312 user_id = req.rpc_user.user_id
327 user_id = req.rpc_user.user_id
313
328
314 # we hook into kwargs since it is the only way to pass our data to
329 # we hook into kwargs since it is the only way to pass our data to
315 # the celery worker
330 # the celery worker
316 environ = maybe_prepare_env(req)
331 environ = maybe_prepare_env(req)
317 options['headers'] = options.get('headers', {})
332 options['headers'] = options.get('headers', {})
318 options['headers'].update({
333 options['headers'].update({
319 'rhodecode_proxy_data': {
334 'rhodecode_proxy_data': {
320 'environ': environ,
335 'environ': environ,
321 'auth_user': {
336 'auth_user': {
322 'ip_addr': get_ip_addr(req.environ),
337 'ip_addr': get_ip_addr(req.environ),
323 'user_id': user_id
338 'user_id': user_id
324 },
339 },
325 }
340 }
326 })
341 })
327
342
328 return super(RequestContextTask, self).apply_async(
343 return super(RequestContextTask, self).apply_async(
329 args, kwargs, task_id, producer, link, link_error, shadow, **options)
344 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now