celery: use exc_tracker to store tasks exceptions for easier debugging.
marcink · r3020:f1460979 (stable)
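In short: the task_failure_signal handler now rebuilds a sys.exc_info()-style tuple from Celery's einfo argument and hands it to rhodecode.lib.exc_tracking.store_exception, so failed tasks leave a persisted trace for later debugging. A trimmed sketch of the new handler (session and Pyramid closer handling omitted; the full hunk follows below):

from celery import signals
from rhodecode.lib.exc_tracking import store_exception


@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    # simulate sys.exc_info() from celery's ExceptionInfo object and store it
    # under a key derived from id(exc_info), tagged with a celery prefix
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='celery_rhodecode')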
@@ -1,295 +1,302 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2010-2018 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Celery loader, run with::

    celery worker \
        --beat \
        --app rhodecode.lib.celerylib.loader \
        --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
        --loglevel DEBUG --ini=._dev/dev.ini
"""
import os
import logging
import importlib

from celery import Celery
from celery import signals
from celery import Task
from celery import exceptions  # noqa
from kombu.serialization import register
from pyramid.threadlocal import get_current_request

import rhodecode

from rhodecode.lib.auth import AuthUser
from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
from rhodecode.lib.ext_json import json
from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
from rhodecode.lib.utils2 import str2bool
from rhodecode.model import meta


register('json_ext', json.dumps, json.loads,
         content_type='application/x-json-ext',
         content_encoding='utf-8')

log = logging.getLogger('celery.rhodecode.loader')


def add_preload_arguments(parser):
    parser.add_argument(
        '--ini', default=None,
        help='Path to ini configuration file.'
    )
    parser.add_argument(
        '--ini-var', default=None,
        help='Comma separated list of key=value to pass to ini.'
    )


def get_logger(obj):
    custom_log = logging.getLogger(
        'rhodecode.task.{}'.format(obj.__class__.__name__))

    if rhodecode.CELERY_ENABLED:
        try:
            custom_log = obj.get_logger()
        except Exception:
            pass

    return custom_log


imports = ['rhodecode.lib.celerylib.tasks']

try:
    # try if we have EE tasks available
    importlib.import_module('rc_ee')
    imports.append('rc_ee.lib.celerylib.tasks')
except ImportError:
    pass

base_celery_config = {
    'result_backend': 'rpc://',
    'result_expires': 60 * 60 * 24,
    'result_persistent': True,
    'imports': imports,
    'worker_max_tasks_per_child': 100,
    'accept_content': ['json_ext'],
    'task_serializer': 'json_ext',
    'result_serializer': 'json_ext',
    'worker_hijack_root_logger': False,
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    }
}
# init main celery app
celery_app = Celery()
celery_app.user_options['preload'].add(add_preload_arguments)
ini_file_glob = None


@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    setup_logging(ini_file_glob)


@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    ini_location = options['ini']
    ini_vars = options['ini_var']
    celery_app.conf['INI_PYRAMID'] = options['ini']

    if ini_location is None:
        print('You must provide the paste --ini argument')
        exit(-1)

    options = None
    if ini_vars is not None:
        options = parse_ini_vars(ini_vars)

    global ini_file_glob
    ini_file_glob = ini_location

    log.debug('Bootstrapping RhodeCode application...')
    env = bootstrap(ini_location, options=options)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        ini_location=ini_location)

    # fix the global flag even if it's disabled via .ini file because this
    # is a worker code that doesn't need this to be disabled.
    rhodecode.CELERY_ENABLED = True

@signals.task_success.connect
def task_success_signal(result, **kwargs):
    meta.Session.commit()
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_retry.connect
def task_retry_signal(
        request, reason, einfo, **kwargs):
    meta.Session.remove()
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    from rhodecode.lib.exc_tracking import store_exception

    meta.Session.remove()

    # simulate sys.exc_info()
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='celery_rhodecode')

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_revoked.connect
def task_revoked_signal(
        request, terminated, signum, expired, **kwargs):
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()

def setup_celery_app(app, root, request, registry, closer, ini_location):
    ini_dir = os.path.dirname(os.path.abspath(ini_location))
    celery_config = base_celery_config
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
    })
    ini_settings = get_ini_config(ini_location)
    log.debug('Got custom celery conf: %s', ini_settings)

    celery_config.update(ini_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({'PYRAMID_APP': app})
    celery_app.conf.update({'PYRAMID_ROOT': root})
    celery_app.conf.update({'PYRAMID_REQUEST': request})
    celery_app.conf.update({'PYRAMID_REGISTRY': registry})
    celery_app.conf.update({'PYRAMID_CLOSER': closer})


def configure_celery(config, ini_location):
    """
    Helper called from our application creation logic. It passes the running
    webapp's connection info into the celery app and allows execution of
    tasks from RhodeCode itself.
    """
    # store some globals into rhodecode
    rhodecode.CELERY_ENABLED = str2bool(
        config.registry.settings.get('use_celery'))
    if rhodecode.CELERY_ENABLED:
        log.info('Configuring celery based on `%s` file', ini_location)
        setup_celery_app(
            app=None, root=None, request=None, registry=config.registry,
            closer=None, ini_location=ini_location)


def maybe_prepare_env(req):
    environ = {}
    try:
        environ.update({
            'PATH_INFO': req.environ['PATH_INFO'],
            'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
            'HTTP_HOST':
                req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
            'SERVER_NAME': req.environ['SERVER_NAME'],
            'SERVER_PORT': req.environ['SERVER_PORT'],
            'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
        })
    except Exception:
        pass

    return environ

class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request
    that created the task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """ queue the job to run (we are in web request context here) """

        req = get_current_request()

        # web case
        if hasattr(req, 'user'):
            ip_addr = req.user.ip_addr
            user_id = req.user.user_id

        # api case
        elif hasattr(req, 'rpc_user'):
            ip_addr = req.rpc_user.ip_addr
            user_id = req.rpc_user.user_id
        else:
            raise Exception(
                'Unable to fetch required data from request: {}. \n'
                'This task is required to be executed from context of '
                'request in a webapp'.format(repr(req)))

        if req:
            # we hook into kwargs since it is the only way to pass our data to
            # the celery worker
            environ = maybe_prepare_env(req)
            options['headers'] = options.get('headers', {})
            options['headers'].update({
                'rhodecode_proxy_data': {
                    'environ': environ,
                    'auth_user': {
                        'ip_addr': ip_addr,
                        'user_id': user_id
                    },
                }
            })

        return super(RequestContextTask, self).apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)

    def __call__(self, *args, **kwargs):
        """ rebuild the context and then run task on celery worker """

        proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
        if not proxy_data:
            return super(RequestContextTask, self).__call__(*args, **kwargs)

        log.debug('using celery proxy data to run task: %r', proxy_data)
        # re-inject and register threadlocals for proper routing support
        request = prepare_request(proxy_data['environ'])
        request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
                                ip_addr=proxy_data['auth_user']['ip_addr'])

        return super(RequestContextTask, self).__call__(*args, **kwargs)
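For context, a minimal sketch of how a task might opt into the RequestContextTask behaviour described above. The task name send_notification is hypothetical and not part of this changeset (real tasks live in rhodecode.lib.celerylib.tasks), but base= and bind= are standard Celery task options:

from rhodecode.lib.celerylib.loader import celery_app, get_logger, RequestContextTask


@celery_app.task(base=RequestContextTask, bind=True)
def send_notification(self, user_id, message):
    # by the time this runs on a worker, RequestContextTask.__call__ has read
    # the 'rhodecode_proxy_data' header, re-registered pyramid threadlocals
    # from the forwarded environ and attached an AuthUser built from the
    # original user_id/ip_addr, so the task behaves much like code running
    # inside the web request that queued it.
    log = get_logger(self)
    log.debug('notifying user %s: %s', user_id, message)


# queued from web or API code while a pyramid request is active, e.g.:
# send_notification.apply_async(args=(user_id, 'hello'))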