##// END OF EJS Templates
celery: use exc_tracker to store tasks exceptions for easier debugging.
marcink -
r3020:f1460979 stable
parent child Browse files
Show More
@@ -1,295 +1,302 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 23 celery worker \
24 24 --beat \
25 25 --app rhodecode.lib.celerylib.loader \
26 26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 27 --loglevel DEBUG --ini=._dev/dev.ini
28 28 """
29 29 import os
30 30 import logging
31 31 import importlib
32 32
33 33 from celery import Celery
34 34 from celery import signals
35 35 from celery import Task
36 36 from celery import exceptions # noqa
37 37 from kombu.serialization import register
38 38 from pyramid.threadlocal import get_current_request
39 39
40 40 import rhodecode
41 41
42 42 from rhodecode.lib.auth import AuthUser
43 43 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
44 44 from rhodecode.lib.ext_json import json
45 45 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
46 46 from rhodecode.lib.utils2 import str2bool
47 47 from rhodecode.model import meta
48 48
49 49
50 50 register('json_ext', json.dumps, json.loads,
51 51 content_type='application/x-json-ext',
52 52 content_encoding='utf-8')
53 53
54 54 log = logging.getLogger('celery.rhodecode.loader')
55 55
56 56
57 57 def add_preload_arguments(parser):
58 58 parser.add_argument(
59 59 '--ini', default=None,
60 60 help='Path to ini configuration file.'
61 61 )
62 62 parser.add_argument(
63 63 '--ini-var', default=None,
64 64 help='Comma separated list of key=value to pass to ini.'
65 65 )
66 66
67 67
68 68 def get_logger(obj):
69 69 custom_log = logging.getLogger(
70 70 'rhodecode.task.{}'.format(obj.__class__.__name__))
71 71
72 72 if rhodecode.CELERY_ENABLED:
73 73 try:
74 74 custom_log = obj.get_logger()
75 75 except Exception:
76 76 pass
77 77
78 78 return custom_log
79 79
80 80
81 81 imports = ['rhodecode.lib.celerylib.tasks']
82 82
83 83 try:
84 84 # try if we have EE tasks available
85 85 importlib.import_module('rc_ee')
86 86 imports.append('rc_ee.lib.celerylib.tasks')
87 87 except ImportError:
88 88 pass
89 89
90 90
91 91 base_celery_config = {
92 92 'result_backend': 'rpc://',
93 93 'result_expires': 60 * 60 * 24,
94 94 'result_persistent': True,
95 95 'imports': imports,
96 96 'worker_max_tasks_per_child': 100,
97 97 'accept_content': ['json_ext'],
98 98 'task_serializer': 'json_ext',
99 99 'result_serializer': 'json_ext',
100 100 'worker_hijack_root_logger': False,
101 101 'database_table_names': {
102 102 'task': 'beat_taskmeta',
103 103 'group': 'beat_groupmeta',
104 104 }
105 105 }
106 106 # init main celery app
107 107 celery_app = Celery()
108 108 celery_app.user_options['preload'].add(add_preload_arguments)
109 109 ini_file_glob = None
110 110
111 111
112 112 @signals.setup_logging.connect
113 113 def setup_logging_callback(**kwargs):
114 114 setup_logging(ini_file_glob)
115 115
116 116
117 117 @signals.user_preload_options.connect
118 118 def on_preload_parsed(options, **kwargs):
119 119 ini_location = options['ini']
120 120 ini_vars = options['ini_var']
121 121 celery_app.conf['INI_PYRAMID'] = options['ini']
122 122
123 123 if ini_location is None:
124 124 print('You must provide the paste --ini argument')
125 125 exit(-1)
126 126
127 127 options = None
128 128 if ini_vars is not None:
129 129 options = parse_ini_vars(ini_vars)
130 130
131 131 global ini_file_glob
132 132 ini_file_glob = ini_location
133 133
134 134 log.debug('Bootstrapping RhodeCode application...')
135 135 env = bootstrap(ini_location, options=options)
136 136
137 137 setup_celery_app(
138 138 app=env['app'], root=env['root'], request=env['request'],
139 139 registry=env['registry'], closer=env['closer'],
140 140 ini_location=ini_location)
141 141
142 142 # fix the global flag even if it's disabled via .ini file because this
143 143 # is a worker code that doesn't need this to be disabled.
144 144 rhodecode.CELERY_ENABLED = True
145 145
146 146
147 147 @signals.task_success.connect
148 148 def task_success_signal(result, **kwargs):
149 149 meta.Session.commit()
150 150 closer = celery_app.conf['PYRAMID_CLOSER']
151 151 if closer:
152 152 closer()
153 153
154 154
155 155 @signals.task_retry.connect
156 156 def task_retry_signal(
157 157 request, reason, einfo, **kwargs):
158 158 meta.Session.remove()
159 159 closer = celery_app.conf['PYRAMID_CLOSER']
160 160 if closer:
161 161 closer()
162 162
163 163
164 164 @signals.task_failure.connect
165 165 def task_failure_signal(
166 166 task_id, exception, args, kwargs, traceback, einfo, **kargs):
167 from rhodecode.lib.exc_tracking import store_exception
168
167 169 meta.Session.remove()
170
171 # simulate sys.exc_info()
172 exc_info = (einfo.type, einfo.exception, einfo.tb)
173 store_exception(id(exc_info), exc_info, prefix='celery_rhodecode')
174
168 175 closer = celery_app.conf['PYRAMID_CLOSER']
169 176 if closer:
170 177 closer()
171 178
172 179
173 180 @signals.task_revoked.connect
174 181 def task_revoked_signal(
175 182 request, terminated, signum, expired, **kwargs):
176 183 closer = celery_app.conf['PYRAMID_CLOSER']
177 184 if closer:
178 185 closer()
179 186
180 187
181 188 def setup_celery_app(app, root, request, registry, closer, ini_location):
182 189 ini_dir = os.path.dirname(os.path.abspath(ini_location))
183 190 celery_config = base_celery_config
184 191 celery_config.update({
185 192 # store celerybeat scheduler db where the .ini file is
186 193 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
187 194 })
188 195 ini_settings = get_ini_config(ini_location)
189 196 log.debug('Got custom celery conf: %s', ini_settings)
190 197
191 198 celery_config.update(ini_settings)
192 199 celery_app.config_from_object(celery_config)
193 200
194 201 celery_app.conf.update({'PYRAMID_APP': app})
195 202 celery_app.conf.update({'PYRAMID_ROOT': root})
196 203 celery_app.conf.update({'PYRAMID_REQUEST': request})
197 204 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
198 205 celery_app.conf.update({'PYRAMID_CLOSER': closer})
199 206
200 207
201 208 def configure_celery(config, ini_location):
202 209 """
203 210 Helper that is called from our application creation logic. It gives
204 211 connection info into running webapp and allows execution of tasks from
205 212 RhodeCode itself
206 213 """
207 214 # store some globals into rhodecode
208 215 rhodecode.CELERY_ENABLED = str2bool(
209 216 config.registry.settings.get('use_celery'))
210 217 if rhodecode.CELERY_ENABLED:
211 218 log.info('Configuring celery based on `%s` file', ini_location)
212 219 setup_celery_app(
213 220 app=None, root=None, request=None, registry=config.registry,
214 221 closer=None, ini_location=ini_location)
215 222
216 223
217 224 def maybe_prepare_env(req):
218 225 environ = {}
219 226 try:
220 227 environ.update({
221 228 'PATH_INFO': req.environ['PATH_INFO'],
222 229 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
223 230 'HTTP_HOST':
224 231 req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
225 232 'SERVER_NAME': req.environ['SERVER_NAME'],
226 233 'SERVER_PORT': req.environ['SERVER_PORT'],
227 234 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
228 235 })
229 236 except Exception:
230 237 pass
231 238
232 239 return environ
233 240
234 241
235 242 class RequestContextTask(Task):
236 243 """
237 244 This is a celery task which will create a rhodecode app instance context
238 245 for the task, patch pyramid with the original request
239 246 that created the task and also add the user to the context.
240 247 """
241 248
242 249 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
243 250 link=None, link_error=None, shadow=None, **options):
244 251 """ queue the job to run (we are in web request context here) """
245 252
246 253 req = get_current_request()
247 254
248 255 # web case
249 256 if hasattr(req, 'user'):
250 257 ip_addr = req.user.ip_addr
251 258 user_id = req.user.user_id
252 259
253 260 # api case
254 261 elif hasattr(req, 'rpc_user'):
255 262 ip_addr = req.rpc_user.ip_addr
256 263 user_id = req.rpc_user.user_id
257 264 else:
258 265 raise Exception(
259 266 'Unable to fetch required data from request: {}. \n'
260 267 'This task is required to be executed from context of '
261 268 'request in a webapp'.format(repr(req)))
262 269
263 270 if req:
264 271 # we hook into kwargs since it is the only way to pass our data to
265 272 # the celery worker
266 273 environ = maybe_prepare_env(req)
267 274 options['headers'] = options.get('headers', {})
268 275 options['headers'].update({
269 276 'rhodecode_proxy_data': {
270 277 'environ': environ,
271 278 'auth_user': {
272 279 'ip_addr': ip_addr,
273 280 'user_id': user_id
274 281 },
275 282 }
276 283 })
277 284
278 285 return super(RequestContextTask, self).apply_async(
279 286 args, kwargs, task_id, producer, link, link_error, shadow, **options)
280 287
281 288 def __call__(self, *args, **kwargs):
282 289 """ rebuild the context and then run task on celery worker """
283 290
284 291 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
285 292 if not proxy_data:
286 293 return super(RequestContextTask, self).__call__(*args, **kwargs)
287 294
288 295 log.debug('using celery proxy data to run task: %r', proxy_data)
289 296 # re-inject and register threadlocals for proper routing support
290 297 request = prepare_request(proxy_data['environ'])
291 298 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
292 299 ip_addr=proxy_data['auth_user']['ip_addr'])
293 300
294 301 return super(RequestContextTask, self).__call__(*args, **kwargs)
295 302
General Comments 0
You need to be logged in to leave comments. Login now