celery: fail with an exception if bootstrap fails
super-admin - r4876:39926d5e default
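The change itself is small but behavioural. Previously on_preload_parsed pre-seeded env = {}, logged any exception raised by bootstrap() and kept going, so a broken or missing .ini only surfaced a few lines later as a confusing KeyError on env['registry']. The new code drops the placeholder and re-raises after logging, so the Celery worker aborts startup and reports the original bootstrap error. A minimal, self-contained sketch of the two behaviours (illustrative names only, not RhodeCode code):

# Minimal, self-contained sketch (illustrative names, not RhodeCode code) of why
# re-raising matters: the old "log and continue" path dies later on a misleading
# KeyError, while the new "log and re-raise" path surfaces the real error at once.
import logging

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('bootstrap-sketch')


def broken_bootstrap(ini_location):
    # stand-in for pyramid's bootstrap() failing on a bad .ini file
    raise RuntimeError('cannot parse {}'.format(ini_location))


def old_style(ini_location):
    env = {}
    try:
        env = broken_bootstrap(ini_location)
    except Exception:
        log.exception('Failed to bootstrap RhodeCode APP')
    # the root cause is swallowed; the next line raises KeyError: 'registry'
    return env['registry']


def new_style(ini_location):
    try:
        env = broken_bootstrap(ini_location)
    except Exception:
        log.exception('Failed to bootstrap RhodeCode APP')
        raise  # worker startup now aborts with the original RuntimeError
    return env['registry']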
@@ -1,323 +1,324 @@
 # -*- coding: utf-8 -*-

 # Copyright (C) 2010-2020 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 """
 Celery loader, run with::

     celery worker \
         --task-events \
         --beat \
         --autoscale=20,2 \
         --max-tasks-per-child 1 \
         --app rhodecode.lib.celerylib.loader \
         --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
         --loglevel DEBUG --ini=.dev/dev.ini
 """
 import os
 import logging
 import importlib

 from celery import Celery
 from celery import signals
 from celery import Task
 from celery import exceptions  # pragma: no cover
 from kombu.serialization import register
 from pyramid.threadlocal import get_current_request

 import rhodecode

 from rhodecode.lib.auth import AuthUser
 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
 from rhodecode.lib.ext_json import json
 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
 from rhodecode.lib.utils2 import str2bool
 from rhodecode.model import meta


 register('json_ext', json.dumps, json.loads,
          content_type='application/x-json-ext',
          content_encoding='utf-8')

 log = logging.getLogger('celery.rhodecode.loader')


 def add_preload_arguments(parser):
     parser.add_argument(
         '--ini', default=None,
         help='Path to ini configuration file.'
     )
     parser.add_argument(
         '--ini-var', default=None,
         help='Comma separated list of key=value to pass to ini.'
     )


 def get_logger(obj):
     custom_log = logging.getLogger(
         'rhodecode.task.{}'.format(obj.__class__.__name__))

     if rhodecode.CELERY_ENABLED:
         try:
             custom_log = obj.get_logger()
         except Exception:
             pass

     return custom_log


 imports = ['rhodecode.lib.celerylib.tasks']

 try:
     # try if we have EE tasks available
     importlib.import_module('rc_ee')
     imports.append('rc_ee.lib.celerylib.tasks')
 except ImportError:
     pass


 base_celery_config = {
     'result_backend': 'rpc://',
     'result_expires': 60 * 60 * 24,
     'result_persistent': True,
     'imports': imports,
     'worker_max_tasks_per_child': 100,
     'accept_content': ['json_ext'],
     'task_serializer': 'json_ext',
     'result_serializer': 'json_ext',
     'worker_hijack_root_logger': False,
     'database_table_names': {
         'task': 'beat_taskmeta',
         'group': 'beat_groupmeta',
     }
 }
 # init main celery app
 celery_app = Celery()
 celery_app.user_options['preload'].add(add_preload_arguments)
 ini_file_glob = None


 @signals.setup_logging.connect
 def setup_logging_callback(**kwargs):
     setup_logging(ini_file_glob)


 @signals.user_preload_options.connect
 def on_preload_parsed(options, **kwargs):
     from rhodecode.config.middleware import get_celery_config

     ini_location = options['ini']
     ini_vars = options['ini_var']
     celery_app.conf['INI_PYRAMID'] = options['ini']

     if ini_location is None:
         print('You must provide the paste --ini argument')
         exit(-1)

     options = None
     if ini_vars is not None:
         options = parse_ini_vars(ini_vars)

     global ini_file_glob
     ini_file_glob = ini_location

     log.debug('Bootstrapping RhodeCode application...')

-    env = {}
     try:
         env = bootstrap(ini_location, options=options)
     except Exception:
         log.exception('Failed to bootstrap RhodeCode APP')
+        raise

     log.debug('Got Pyramid ENV: %s', env)
-
     celery_settings = get_celery_config(env['registry'].settings)
+
+
     setup_celery_app(
         app=env['app'], root=env['root'], request=env['request'],
         registry=env['registry'], closer=env['closer'],
         celery_settings=celery_settings)

     # fix the global flag even if it's disabled via .ini file because this
     # is a worker code that doesn't need this to be disabled.
     rhodecode.CELERY_ENABLED = True


 @signals.task_prerun.connect
 def task_prerun_signal(task_id, task, args, **kwargs):
     ping_db()


 @signals.task_success.connect
 def task_success_signal(result, **kwargs):
     meta.Session.commit()
     closer = celery_app.conf['PYRAMID_CLOSER']
     if closer:
         closer()


 @signals.task_retry.connect
 def task_retry_signal(
         request, reason, einfo, **kwargs):
     meta.Session.remove()
     closer = celery_app.conf['PYRAMID_CLOSER']
     if closer:
         closer()


 @signals.task_failure.connect
 def task_failure_signal(
         task_id, exception, args, kwargs, traceback, einfo, **kargs):

     log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
     from rhodecode.lib.exc_tracking import store_exception
     from rhodecode.lib.statsd_client import StatsdClient

     meta.Session.remove()

     # simulate sys.exc_info()
     exc_info = (einfo.type, einfo.exception, einfo.tb)
     store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
     statsd = StatsdClient.statsd
     if statsd:
         exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
         statsd.incr('rhodecode_exception_total',
                     tags=["exc_source:celery", "type:{}".format(exc_type)])

     closer = celery_app.conf['PYRAMID_CLOSER']
     if closer:
         closer()


 @signals.task_revoked.connect
 def task_revoked_signal(
         request, terminated, signum, expired, **kwargs):
     closer = celery_app.conf['PYRAMID_CLOSER']
     if closer:
         closer()


 def setup_celery_app(app, root, request, registry, closer, celery_settings):
     log.debug('Got custom celery conf: %s', celery_settings)
     celery_config = base_celery_config
     celery_config.update({
         # store celerybeat scheduler db where the .ini file is
         'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
     })

     celery_config.update(celery_settings)
     celery_app.config_from_object(celery_config)

     celery_app.conf.update({'PYRAMID_APP': app})
     celery_app.conf.update({'PYRAMID_ROOT': root})
     celery_app.conf.update({'PYRAMID_REQUEST': request})
     celery_app.conf.update({'PYRAMID_REGISTRY': registry})
     celery_app.conf.update({'PYRAMID_CLOSER': closer})


 def configure_celery(config, celery_settings):
     """
     Helper that is called from our application creation logic. It gives
     connection info into running webapp and allows execution of tasks from
     RhodeCode itself
     """
     # store some globals into rhodecode
     rhodecode.CELERY_ENABLED = str2bool(
         config.registry.settings.get('use_celery'))
     if rhodecode.CELERY_ENABLED:
         log.info('Configuring celery based on `%s` settings', celery_settings)
         setup_celery_app(
             app=None, root=None, request=None, registry=config.registry,
             closer=None, celery_settings=celery_settings)


 def maybe_prepare_env(req):
     environ = {}
     try:
         environ.update({
             'PATH_INFO': req.environ['PATH_INFO'],
             'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
             'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
             'SERVER_NAME': req.environ['SERVER_NAME'],
             'SERVER_PORT': req.environ['SERVER_PORT'],
             'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
         })
     except Exception:
         pass

     return environ


 class RequestContextTask(Task):
     """
     This is a celery task which will create a rhodecode app instance context
     for the task, patch pyramid with the original request
     that created the task and also add the user to the context.
     """

     def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                     link=None, link_error=None, shadow=None, **options):
         """ queue the job to run (we are in web request context here) """

         req = self.app.conf['PYRAMID_REQUEST'] or get_current_request()

         log.debug('Running Task with class: %s. Request Class: %s',
                   self.__class__, req.__class__)

         user_id = None
         ip_addr = None

         # web case
         if hasattr(req, 'user'):
             ip_addr = req.user.ip_addr
             user_id = req.user.user_id

         # api case
         elif hasattr(req, 'rpc_user'):
             ip_addr = req.rpc_user.ip_addr
             user_id = req.rpc_user.user_id
         else:
             if user_id and ip_addr:
                 log.debug('Using data from celery proxy user')

             else:
                 raise Exception(
                     'Unable to fetch required data from request: {}. \n'
                     'This task is required to be executed from context of '
                     'request in a webapp. Task: {}'.format(
                         repr(req),
                         self.__class__
                     )
                 )

         if req:
             # we hook into kwargs since it is the only way to pass our data to
             # the celery worker
             environ = maybe_prepare_env(req)
             options['headers'] = options.get('headers', {})
             options['headers'].update({
                 'rhodecode_proxy_data': {
                     'environ': environ,
                     'auth_user': {
                         'ip_addr': ip_addr,
                         'user_id': user_id
                     },
                 }
             })

         return super(RequestContextTask, self).apply_async(
             args, kwargs, task_id, producer, link, link_error, shadow, **options)
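For context on the unchanged part of the file: RequestContextTask is meant to be used as the base class of tasks scheduled from a live web or API request. Its apply_async reads the Pyramid request from the app config (or the thread-local request), extracts the user id, IP address and a minimal WSGI environ, and ships them to the worker inside the message headers under rhodecode_proxy_data. A hedged usage sketch follows; the task name and module are hypothetical and not part of this changeset (RhodeCode's real tasks live in rhodecode.lib.celerylib.tasks):

# Hypothetical wiring of a task on top of RequestContextTask; only celery_app,
# RequestContextTask and get_logger come from the module shown above.
from rhodecode.lib.celerylib.loader import celery_app, RequestContextTask, get_logger


@celery_app.task(base=RequestContextTask, bind=True)
def example_notify(self, repo_name):
    # runs on the worker; the originating user/IP/environ arrive via the
    # 'rhodecode_proxy_data' header added by RequestContextTask.apply_async()
    log = get_logger(self)
    log.debug('notify task running for repo %s', repo_name)


# From inside a web request (so PYRAMID_REQUEST or the thread-local request is
# available), queueing the task attaches the request context automatically:
# example_notify.apply_async(args=('some-repo',))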