tasks: improve logging on failed tasks
super-admin
r4867:49f6563f default
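In short, the exception raised by RequestContextTask.apply_async when no usable web/API request is available now also names the failing task, which makes failed-task logs easier to trace back to their origin. A minimal sketch of the resulting message, using hypothetical values for the request and the task name:

    # Illustrative only: approximates the error text after this change.
    req = None  # e.g. a request without 'user' or 'rpc_user'
    task = 'rhodecode.lib.celerylib.tasks.example_task'  # hypothetical name
    print(
        'Unable to fetch required data from request: {}. \n'
        'This task is required to be executed from context of '
        'request in a webapp. Task: {}'.format(repr(req), task)
    )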
@@ -1,313 +1,317 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 23 celery worker \
24 24 --beat \
25 25 --app rhodecode.lib.celerylib.loader \
26 26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 27 --loglevel DEBUG --ini=._dev/dev.ini
28 28 """
29 29 import os
30 30 import logging
31 31 import importlib
32 32
33 33 from celery import Celery
34 34 from celery import signals
35 35 from celery import Task
36 36 from celery import exceptions # pragma: no cover
37 37 from kombu.serialization import register
38 38 from pyramid.threadlocal import get_current_request
39 39
40 40 import rhodecode
41 41
42 42 from rhodecode.lib.auth import AuthUser
43 43 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
44 44 from rhodecode.lib.ext_json import json
45 45 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
46 46 from rhodecode.lib.utils2 import str2bool
47 47 from rhodecode.model import meta
48 48
49 49
50 50 register('json_ext', json.dumps, json.loads,
51 51 content_type='application/x-json-ext',
52 52 content_encoding='utf-8')
53 53
54 54 log = logging.getLogger('celery.rhodecode.loader')
55 55
56 56
57 57 def add_preload_arguments(parser):
58 58 parser.add_argument(
59 59 '--ini', default=None,
60 60 help='Path to ini configuration file.'
61 61 )
62 62 parser.add_argument(
63 63 '--ini-var', default=None,
64 64 help='Comma separated list of key=value to pass to ini.'
65 65 )
66 66
67 67
68 68 def get_logger(obj):
69 69 custom_log = logging.getLogger(
70 70 'rhodecode.task.{}'.format(obj.__class__.__name__))
71 71
72 72 if rhodecode.CELERY_ENABLED:
73 73 try:
74 74 custom_log = obj.get_logger()
75 75 except Exception:
76 76 pass
77 77
78 78 return custom_log
79 79
80 80
81 81 imports = ['rhodecode.lib.celerylib.tasks']
82 82
83 83 try:
84 84 # try if we have EE tasks available
85 85 importlib.import_module('rc_ee')
86 86 imports.append('rc_ee.lib.celerylib.tasks')
87 87 except ImportError:
88 88 pass
89 89
90 90
91 91 base_celery_config = {
92 92 'result_backend': 'rpc://',
93 93 'result_expires': 60 * 60 * 24,
94 94 'result_persistent': True,
95 95 'imports': imports,
96 96 'worker_max_tasks_per_child': 100,
97 97 'accept_content': ['json_ext'],
98 98 'task_serializer': 'json_ext',
99 99 'result_serializer': 'json_ext',
100 100 'worker_hijack_root_logger': False,
101 101 'database_table_names': {
102 102 'task': 'beat_taskmeta',
103 103 'group': 'beat_groupmeta',
104 104 }
105 105 }
106 106 # init main celery app
107 107 celery_app = Celery()
108 108 celery_app.user_options['preload'].add(add_preload_arguments)
109 109 ini_file_glob = None
110 110
111 111
112 112 @signals.setup_logging.connect
113 113 def setup_logging_callback(**kwargs):
114 114 setup_logging(ini_file_glob)
115 115
116 116
117 117 @signals.user_preload_options.connect
118 118 def on_preload_parsed(options, **kwargs):
119 119 from rhodecode.config.middleware import get_celery_config
120 120
121 121 ini_location = options['ini']
122 122 ini_vars = options['ini_var']
123 123 celery_app.conf['INI_PYRAMID'] = options['ini']
124 124
125 125 if ini_location is None:
126 126 print('You must provide the paste --ini argument')
127 127 exit(-1)
128 128
129 129 options = None
130 130 if ini_vars is not None:
131 131 options = parse_ini_vars(ini_vars)
132 132
133 133 global ini_file_glob
134 134 ini_file_glob = ini_location
135 135
136 136 log.debug('Bootstrapping RhodeCode application...')
137 137 env = bootstrap(ini_location, options=options)
138 138
139 139 celery_settings = get_celery_config(env['registry'].settings)
140 140 setup_celery_app(
141 141 app=env['app'], root=env['root'], request=env['request'],
142 142 registry=env['registry'], closer=env['closer'],
143 143 celery_settings=celery_settings)
144 144
145 145 # fix the global flag even if it's disabled via .ini file because this
146 146 # is a worker code that doesn't need this to be disabled.
147 147 rhodecode.CELERY_ENABLED = True
148 148
149 149
150 150 @signals.task_prerun.connect
151 151 def task_prerun_signal(task_id, task, args, **kwargs):
152 152 ping_db()
153 153
154 154
155 155 @signals.task_success.connect
156 156 def task_success_signal(result, **kwargs):
157 157 meta.Session.commit()
158 158 closer = celery_app.conf['PYRAMID_CLOSER']
159 159 if closer:
160 160 closer()
161 161
162 162
163 163 @signals.task_retry.connect
164 164 def task_retry_signal(
165 165 request, reason, einfo, **kwargs):
166 166 meta.Session.remove()
167 167 closer = celery_app.conf['PYRAMID_CLOSER']
168 168 if closer:
169 169 closer()
170 170
171 171
172 172 @signals.task_failure.connect
173 173 def task_failure_signal(
174 174 task_id, exception, args, kwargs, traceback, einfo, **kargs):
175 175 from rhodecode.lib.exc_tracking import store_exception
176 176 from rhodecode.lib.statsd_client import StatsdClient
177 177
178 178 meta.Session.remove()
179 179
180 180 # simulate sys.exc_info()
181 181 exc_info = (einfo.type, einfo.exception, einfo.tb)
182 182 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
183 183 statsd = StatsdClient.statsd
184 184 if statsd:
185 185 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
186 186 statsd.incr('rhodecode_exception_total',
187 187 tags=["exc_source:celery", "type:{}".format(exc_type)])
188 188
189 189 closer = celery_app.conf['PYRAMID_CLOSER']
190 190 if closer:
191 191 closer()
192 192
193 193
194 194 @signals.task_revoked.connect
195 195 def task_revoked_signal(
196 196 request, terminated, signum, expired, **kwargs):
197 197 closer = celery_app.conf['PYRAMID_CLOSER']
198 198 if closer:
199 199 closer()
200 200
201 201
202 202 def setup_celery_app(app, root, request, registry, closer, celery_settings):
203 203 log.debug('Got custom celery conf: %s', celery_settings)
204 204 celery_config = base_celery_config
205 205 celery_config.update({
206 206 # store celerybeat scheduler db where the .ini file is
207 207 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
208 208 })
209 209
210 210 celery_config.update(celery_settings)
211 211 celery_app.config_from_object(celery_config)
212 212
213 213 celery_app.conf.update({'PYRAMID_APP': app})
214 214 celery_app.conf.update({'PYRAMID_ROOT': root})
215 215 celery_app.conf.update({'PYRAMID_REQUEST': request})
216 216 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
217 217 celery_app.conf.update({'PYRAMID_CLOSER': closer})
218 218
219 219
220 220 def configure_celery(config, celery_settings):
221 221 """
222 222 Helper that is called from our application creation logic. It gives
223 223 connection info into running webapp and allows execution of tasks from
224 224 RhodeCode itself
225 225 """
226 226 # store some globals into rhodecode
227 227 rhodecode.CELERY_ENABLED = str2bool(
228 228 config.registry.settings.get('use_celery'))
229 229 if rhodecode.CELERY_ENABLED:
230 230 log.info('Configuring celery based on `%s` settings', celery_settings)
231 231 setup_celery_app(
232 232 app=None, root=None, request=None, registry=config.registry,
233 233 closer=None, celery_settings=celery_settings)
234 234
235 235
236 236 def maybe_prepare_env(req):
237 237 environ = {}
238 238 try:
239 239 environ.update({
240 240 'PATH_INFO': req.environ['PATH_INFO'],
241 241 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
242 242 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
243 243 'SERVER_NAME': req.environ['SERVER_NAME'],
244 244 'SERVER_PORT': req.environ['SERVER_PORT'],
245 245 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
246 246 })
247 247 except Exception:
248 248 pass
249 249
250 250 return environ
251 251
252 252
253 253 class RequestContextTask(Task):
254 254 """
255 255 This is a celery task which will create a rhodecode app instance context
256 256 for the task, patch pyramid with the original request
257 257 that created the task and also add the user to the context.
258 258 """
259 259
260 260 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
261 261 link=None, link_error=None, shadow=None, **options):
262 262 """ queue the job to run (we are in web request context here) """
263 263
264 264 req = get_current_request()
265 265
266 266 # web case
267 267 if hasattr(req, 'user'):
268 268 ip_addr = req.user.ip_addr
269 269 user_id = req.user.user_id
270 270
271 271 # api case
272 272 elif hasattr(req, 'rpc_user'):
273 273 ip_addr = req.rpc_user.ip_addr
274 274 user_id = req.rpc_user.user_id
275 275 else:
276 276 raise Exception(
277 277 'Unable to fetch required data from request: {}. \n'
278 278 'This task is required to be executed from context of '
279 'request in a webapp'.format(repr(req)))
279 'request in a webapp. Task: {}'.format(
280 repr(req),
281 self
282 )
283 )
280 284
281 285 if req:
282 286 # we hook into kwargs since it is the only way to pass our data to
283 287 # the celery worker
284 288 environ = maybe_prepare_env(req)
285 289 options['headers'] = options.get('headers', {})
286 290 options['headers'].update({
287 291 'rhodecode_proxy_data': {
288 292 'environ': environ,
289 293 'auth_user': {
290 294 'ip_addr': ip_addr,
291 295 'user_id': user_id
292 296 },
293 297 }
294 298 })
295 299
296 300 return super(RequestContextTask, self).apply_async(
297 301 args, kwargs, task_id, producer, link, link_error, shadow, **options)
298 302
299 303 def __call__(self, *args, **kwargs):
300 304 """ rebuild the context and then run task on celery worker """
301 305
302 306 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
303 307 if not proxy_data:
304 308 return super(RequestContextTask, self).__call__(*args, **kwargs)
305 309
306 310 log.debug('using celery proxy data to run task: %r', proxy_data)
307 311 # re-inject and register threadlocals for proper routing support
308 312 request = prepare_request(proxy_data['environ'])
309 313 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
310 314 ip_addr=proxy_data['auth_user']['ip_addr'])
311 315
312 316 return super(RequestContextTask, self).__call__(*args, **kwargs)
313 317
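For context, a hypothetical usage sketch (not part of this commit) of how a task built on RequestContextTask is typically declared and queued; the names example_notify and repo_id are illustrative assumptions, while celery_app and RequestContextTask come from the module shown above:

    from rhodecode.lib.celerylib.loader import celery_app, RequestContextTask

    @celery_app.task(base=RequestContextTask)
    def example_notify(repo_id):  # hypothetical task, for illustration only
        # Runs on the worker; __call__ above re-injects the original request
        # and user from the 'rhodecode_proxy_data' header before this executes.
        return repo_id

    # From a Pyramid view (web or API request context) it would be queued as:
    #   example_notify.apply_async(args=(42,))

Queued outside such a context, apply_async raises the exception shown in the diff, which after this change also identifies the task being scheduled.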