##// END OF EJS Templates
tasks: improve logging on failed tasks
super-admin -
r4867:49f6563f default
parent child Browse files
Show More
@@ -1,313 +1,317 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 Celery loader, run with::
21 Celery loader, run with::
22
22
23 celery worker \
23 celery worker \
24 --beat \
24 --beat \
25 --app rhodecode.lib.celerylib.loader \
25 --app rhodecode.lib.celerylib.loader \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --loglevel DEBUG --ini=._dev/dev.ini
27 --loglevel DEBUG --ini=._dev/dev.ini
28 """
28 """
29 import os
29 import os
30 import logging
30 import logging
31 import importlib
31 import importlib
32
32
33 from celery import Celery
33 from celery import Celery
34 from celery import signals
34 from celery import signals
35 from celery import Task
35 from celery import Task
36 from celery import exceptions # pragma: no cover
36 from celery import exceptions # pragma: no cover
37 from kombu.serialization import register
37 from kombu.serialization import register
38 from pyramid.threadlocal import get_current_request
38 from pyramid.threadlocal import get_current_request
39
39
40 import rhodecode
40 import rhodecode
41
41
42 from rhodecode.lib.auth import AuthUser
42 from rhodecode.lib.auth import AuthUser
43 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
43 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
44 from rhodecode.lib.ext_json import json
44 from rhodecode.lib.ext_json import json
45 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
45 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
46 from rhodecode.lib.utils2 import str2bool
46 from rhodecode.lib.utils2 import str2bool
47 from rhodecode.model import meta
47 from rhodecode.model import meta
48
48
49
49
# Teach kombu/celery RhodeCode's extended JSON codec under the name
# 'json_ext' so task payloads can carry types plain JSON cannot.
register(
    'json_ext', json.dumps, json.loads,
    content_type='application/x-json-ext',
    content_encoding='utf-8')

log = logging.getLogger('celery.rhodecode.loader')
55
55
56
56
def add_preload_arguments(parser):
    """Register RhodeCode's ``--ini`` / ``--ini-var`` preload options.

    :param parser: argparse-style parser supplied by celery's preload hook.
    """
    for flag, help_text in (
            ('--ini', 'Path to ini configuration file.'),
            ('--ini-var', 'Comma separated list of key=value to pass to ini.'),
    ):
        parser.add_argument(flag, default=None, help=help_text)
66
66
67
67
def get_logger(obj):
    """Return the best logger for *obj* (typically a celery task).

    When celery is enabled, prefer the logger celery attaches to the task;
    otherwise (or if the task has no ``get_logger``) fall back to a
    per-class ``rhodecode.task.*`` logger.
    """
    fallback = logging.getLogger(
        'rhodecode.task.{}'.format(obj.__class__.__name__))

    if not rhodecode.CELERY_ENABLED:
        return fallback

    try:
        return obj.get_logger()
    except Exception:
        # obj may not be a real celery task; keep the fallback logger
        return fallback
79
79
80
80
# Task modules celery must import; the EE task module is appended only
# when the enterprise package is actually installed.
imports = ['rhodecode.lib.celerylib.tasks']

try:
    # probe for EE availability before registering its tasks
    importlib.import_module('rc_ee')
    imports.append('rc_ee.lib.celerylib.tasks')
except ImportError:
    pass
89
89
90
90
# Baseline celery configuration; per-deployment settings from the .ini
# file are layered on top in setup_celery_app().
base_celery_config = {
    'result_backend': 'rpc://',
    'result_expires': 60 * 60 * 24,  # keep results for one day
    'result_persistent': True,
    'imports': imports,
    'worker_max_tasks_per_child': 100,  # recycle workers to cap memory growth
    'accept_content': ['json_ext'],
    'task_serializer': 'json_ext',
    'result_serializer': 'json_ext',
    'worker_hijack_root_logger': False,  # logging is configured from the ini
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    }
}
# init main celery app
celery_app = Celery()
celery_app.user_options['preload'].add(add_preload_arguments)
# ini path captured by on_preload_parsed, read by setup_logging_callback
ini_file_glob = None
110
110
111
111
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    """Configure worker logging from the ini captured during preload."""
    setup_logging(ini_file_glob)
115
115
116
116
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """Bootstrap the RhodeCode pyramid app before the worker starts.

    Reads the ``--ini`` / ``--ini-var`` preload options, boots a full
    application environment and wires it into the celery app so tasks can
    run with application context. Exits the process when --ini is missing.
    """
    from rhodecode.config.middleware import get_celery_config

    ini_path = options['ini']
    raw_ini_vars = options['ini_var']
    celery_app.conf['INI_PYRAMID'] = options['ini']

    if ini_path is None:
        print('You must provide the paste --ini argument')
        exit(-1)

    bootstrap_options = None
    if raw_ini_vars is not None:
        bootstrap_options = parse_ini_vars(raw_ini_vars)

    # remember the ini path for the setup_logging signal handler
    global ini_file_glob
    ini_file_glob = ini_path

    log.debug('Bootstrapping RhodeCode application...')
    env = bootstrap(ini_path, options=bootstrap_options)

    celery_settings = get_celery_config(env['registry'].settings)
    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        celery_settings=celery_settings)

    # fix the global flag even if it's disabled via .ini file because this
    # is a worker code that doesn't need this to be disabled.
    rhodecode.CELERY_ENABLED = True
148
148
149
149
@signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
    """Ensure the DB connection is alive before the task body runs."""
    ping_db()
153
153
154
154
@signals.task_success.connect
def task_success_signal(result, **kwargs):
    """Commit the DB session and release pyramid resources on success."""
    meta.Session.commit()
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()
161
161
162
162
@signals.task_retry.connect
def task_retry_signal(
        request, reason, einfo, **kwargs):
    """Discard the DB session and release pyramid resources on retry."""
    meta.Session.remove()
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()
170
170
171
171
@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    """Record a failed task: store the exception for tracking, bump the
    statsd failure counter, then drop the DB session and release pyramid
    resources.
    """
    from rhodecode.lib.exc_tracking import store_exception
    from rhodecode.lib.statsd_client import StatsdClient

    meta.Session.remove()

    # simulate sys.exc_info()
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
    statsd = StatsdClient.statsd
    if statsd:
        # BUGFIX: previously built from einfo.__class__, which is always the
        # ExceptionInfo wrapper class, so every failure got the same tag.
        # Tag with the real exception class (einfo.type) instead.
        exc_type = "{}.{}".format(
            einfo.type.__module__, einfo.type.__name__)
        statsd.incr('rhodecode_exception_total',
                    tags=["exc_source:celery", "type:{}".format(exc_type)])

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()
192
192
193
193
@signals.task_revoked.connect
def task_revoked_signal(
        request, terminated, signum, expired, **kwargs):
    """Release pyramid resources when a task is revoked."""
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()
200
200
201
201
def setup_celery_app(app, root, request, registry, closer, celery_settings):
    """Apply the final celery configuration and expose the pyramid
    environment to running tasks via the celery app conf.

    :param app: pyramid WSGI app, or None when configured from the webapp
    :param root: pyramid root factory result (may be None)
    :param request: bootstrap request (may be None)
    :param registry: pyramid registry holding the .ini settings
    :param closer: callable releasing the bootstrap env (may be None)
    :param celery_settings: dict of celery options parsed from the .ini
    """
    log.debug('Got custom celery conf: %s', celery_settings)
    # BUGFIX: work on a copy — the original aliased and mutated the
    # module-level base_celery_config on every call, so settings from one
    # call leaked into the next.
    celery_config = dict(base_celery_config)
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
    })

    celery_config.update(celery_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({'PYRAMID_APP': app})
    celery_app.conf.update({'PYRAMID_ROOT': root})
    celery_app.conf.update({'PYRAMID_REQUEST': request})
    celery_app.conf.update({'PYRAMID_REGISTRY': registry})
    celery_app.conf.update({'PYRAMID_CLOSER': closer})
218
218
219
219
def configure_celery(config, celery_settings):
    """
    Helper that is called from our application creation logic. It gives
    connection info into running webapp and allows execution of tasks from
    RhodeCode itself
    """
    # store some globals into rhodecode
    celery_is_on = str2bool(config.registry.settings.get('use_celery'))
    rhodecode.CELERY_ENABLED = celery_is_on

    if not celery_is_on:
        return

    log.info('Configuring celery based on `%s` settings', celery_settings)
    setup_celery_app(
        app=None, root=None, request=None, registry=config.registry,
        closer=None, celery_settings=celery_settings)
234
234
235
235
def maybe_prepare_env(req):
    """Best-effort capture of the WSGI environ keys needed to rebuild the
    originating request inside a celery worker.

    Returns an empty dict when *req* lacks a usable environ (e.g. a task
    scheduled outside of any web request).
    """
    captured = {}
    try:
        source = req.environ
        captured.update({
            'PATH_INFO': source['PATH_INFO'],
            'SCRIPT_NAME': source['SCRIPT_NAME'],
            'HTTP_HOST': source.get('HTTP_HOST', source['SERVER_NAME']),
            'SERVER_NAME': source['SERVER_NAME'],
            'SERVER_PORT': source['SERVER_PORT'],
            'wsgi.url_scheme': source['wsgi.url_scheme'],
        })
    except Exception:
        # deliberately best-effort: a missing environ/key yields no data
        pass

    return captured
251
251
252
252
class RequestContextTask(Task):
    """
    Celery task that captures the originating pyramid request when queued
    and rebuilds that context (request threadlocals plus authenticated
    user) on the worker before executing.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """ queue the job to run (we are in web request context here) """

        req = get_current_request()

        # web case
        if hasattr(req, 'user'):
            auth_source = req.user
        # api case
        elif hasattr(req, 'rpc_user'):
            auth_source = req.rpc_user
        else:
            raise Exception(
                'Unable to fetch required data from request: {}. \n'
                'This task is required to be executed from context of '
                'request in a webapp. Task: {}'.format(
                    repr(req),
                    self
                )
            )

        ip_addr = auth_source.ip_addr
        user_id = auth_source.user_id

        if req:
            # we hook into kwargs since it is the only way to pass our data to
            # the celery worker
            environ = maybe_prepare_env(req)
            headers = options.setdefault('headers', {})
            headers.update({
                'rhodecode_proxy_data': {
                    'environ': environ,
                    'auth_user': {
                        'ip_addr': ip_addr,
                        'user_id': user_id
                    },
                }
            })

        return super(RequestContextTask, self).apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)

    def __call__(self, *args, **kwargs):
        """ rebuild the context and then run task on celery worker """

        proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
        if not proxy_data:
            return super(RequestContextTask, self).__call__(*args, **kwargs)

        log.debug('using celery proxy data to run task: %r', proxy_data)
        # re-inject and register threadlocals for proper routing support
        request = prepare_request(proxy_data['environ'])
        auth_data = proxy_data['auth_user']
        request.user = AuthUser(user_id=auth_data['user_id'],
                                ip_addr=auth_data['ip_addr'])

        return super(RequestContextTask, self).__call__(*args, **kwargs)
313
317
General Comments 0
You need to be logged in to leave comments. Login now