##// END OF EJS Templates
celery: fail with exception if bootstrap is failed
super-admin -
r4876:39926d5e default
parent child Browse files
Show More
@@ -1,323 +1,324 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 23 celery worker \
24 24 --task-events \
25 25 --beat \
26 26 --autoscale=20,2 \
27 27 --max-tasks-per-child 1 \
28 28 --app rhodecode.lib.celerylib.loader \
29 29 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
30 30 --loglevel DEBUG --ini=.dev/dev.ini
31 31 """
32 32 import os
33 33 import logging
34 34 import importlib
35 35
36 36 from celery import Celery
37 37 from celery import signals
38 38 from celery import Task
39 39 from celery import exceptions # pragma: no cover
40 40 from kombu.serialization import register
41 41 from pyramid.threadlocal import get_current_request
42 42
43 43 import rhodecode
44 44
45 45 from rhodecode.lib.auth import AuthUser
46 46 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
47 47 from rhodecode.lib.ext_json import json
48 48 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
49 49 from rhodecode.lib.utils2 import str2bool
50 50 from rhodecode.model import meta
51 51
52 52
53 53 register('json_ext', json.dumps, json.loads,
54 54 content_type='application/x-json-ext',
55 55 content_encoding='utf-8')
56 56
57 57 log = logging.getLogger('celery.rhodecode.loader')
58 58
59 59
60 60 def add_preload_arguments(parser):
61 61 parser.add_argument(
62 62 '--ini', default=None,
63 63 help='Path to ini configuration file.'
64 64 )
65 65 parser.add_argument(
66 66 '--ini-var', default=None,
67 67 help='Comma separated list of key=value to pass to ini.'
68 68 )
69 69
70 70
71 71 def get_logger(obj):
72 72 custom_log = logging.getLogger(
73 73 'rhodecode.task.{}'.format(obj.__class__.__name__))
74 74
75 75 if rhodecode.CELERY_ENABLED:
76 76 try:
77 77 custom_log = obj.get_logger()
78 78 except Exception:
79 79 pass
80 80
81 81 return custom_log
82 82
83 83
84 84 imports = ['rhodecode.lib.celerylib.tasks']
85 85
86 86 try:
87 87 # try if we have EE tasks available
88 88 importlib.import_module('rc_ee')
89 89 imports.append('rc_ee.lib.celerylib.tasks')
90 90 except ImportError:
91 91 pass
92 92
93 93
94 94 base_celery_config = {
95 95 'result_backend': 'rpc://',
96 96 'result_expires': 60 * 60 * 24,
97 97 'result_persistent': True,
98 98 'imports': imports,
99 99 'worker_max_tasks_per_child': 100,
100 100 'accept_content': ['json_ext'],
101 101 'task_serializer': 'json_ext',
102 102 'result_serializer': 'json_ext',
103 103 'worker_hijack_root_logger': False,
104 104 'database_table_names': {
105 105 'task': 'beat_taskmeta',
106 106 'group': 'beat_groupmeta',
107 107 }
108 108 }
109 109 # init main celery app
110 110 celery_app = Celery()
111 111 celery_app.user_options['preload'].add(add_preload_arguments)
112 112 ini_file_glob = None
113 113
114 114
115 115 @signals.setup_logging.connect
116 116 def setup_logging_callback(**kwargs):
117 117 setup_logging(ini_file_glob)
118 118
119 119
120 120 @signals.user_preload_options.connect
121 121 def on_preload_parsed(options, **kwargs):
122 122 from rhodecode.config.middleware import get_celery_config
123 123
124 124 ini_location = options['ini']
125 125 ini_vars = options['ini_var']
126 126 celery_app.conf['INI_PYRAMID'] = options['ini']
127 127
128 128 if ini_location is None:
129 129 print('You must provide the paste --ini argument')
130 130 exit(-1)
131 131
132 132 options = None
133 133 if ini_vars is not None:
134 134 options = parse_ini_vars(ini_vars)
135 135
136 136 global ini_file_glob
137 137 ini_file_glob = ini_location
138 138
139 139 log.debug('Bootstrapping RhodeCode application...')
140 140
141 env = {}
142 141 try:
143 142 env = bootstrap(ini_location, options=options)
144 143 except Exception:
145 144 log.exception('Failed to bootstrap RhodeCode APP')
145 raise
146 146
147 147 log.debug('Got Pyramid ENV: %s', env)
148
148 149 celery_settings = get_celery_config(env['registry'].settings)
149 150
150 151 setup_celery_app(
151 152 app=env['app'], root=env['root'], request=env['request'],
152 153 registry=env['registry'], closer=env['closer'],
153 154 celery_settings=celery_settings)
154 155
155 156 # fix the global flag even if it's disabled via .ini file because this
156 157 # is a worker code that doesn't need this to be disabled.
157 158 rhodecode.CELERY_ENABLED = True
158 159
159 160
160 161 @signals.task_prerun.connect
161 162 def task_prerun_signal(task_id, task, args, **kwargs):
162 163 ping_db()
163 164
164 165
165 166 @signals.task_success.connect
166 167 def task_success_signal(result, **kwargs):
167 168 meta.Session.commit()
168 169 closer = celery_app.conf['PYRAMID_CLOSER']
169 170 if closer:
170 171 closer()
171 172
172 173
173 174 @signals.task_retry.connect
174 175 def task_retry_signal(
175 176 request, reason, einfo, **kwargs):
176 177 meta.Session.remove()
177 178 closer = celery_app.conf['PYRAMID_CLOSER']
178 179 if closer:
179 180 closer()
180 181
181 182
182 183 @signals.task_failure.connect
183 184 def task_failure_signal(
184 185 task_id, exception, args, kwargs, traceback, einfo, **kargs):
185 186
186 187 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
187 188 from rhodecode.lib.exc_tracking import store_exception
188 189 from rhodecode.lib.statsd_client import StatsdClient
189 190
190 191 meta.Session.remove()
191 192
192 193 # simulate sys.exc_info()
193 194 exc_info = (einfo.type, einfo.exception, einfo.tb)
194 195 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
195 196 statsd = StatsdClient.statsd
196 197 if statsd:
197 198 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
198 199 statsd.incr('rhodecode_exception_total',
199 200 tags=["exc_source:celery", "type:{}".format(exc_type)])
200 201
201 202 closer = celery_app.conf['PYRAMID_CLOSER']
202 203 if closer:
203 204 closer()
204 205
205 206
206 207 @signals.task_revoked.connect
207 208 def task_revoked_signal(
208 209 request, terminated, signum, expired, **kwargs):
209 210 closer = celery_app.conf['PYRAMID_CLOSER']
210 211 if closer:
211 212 closer()
212 213
213 214
214 215 def setup_celery_app(app, root, request, registry, closer, celery_settings):
215 216 log.debug('Got custom celery conf: %s', celery_settings)
216 217 celery_config = base_celery_config
217 218 celery_config.update({
218 219 # store celerybeat scheduler db where the .ini file is
219 220 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
220 221 })
221 222
222 223 celery_config.update(celery_settings)
223 224 celery_app.config_from_object(celery_config)
224 225
225 226 celery_app.conf.update({'PYRAMID_APP': app})
226 227 celery_app.conf.update({'PYRAMID_ROOT': root})
227 228 celery_app.conf.update({'PYRAMID_REQUEST': request})
228 229 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
229 230 celery_app.conf.update({'PYRAMID_CLOSER': closer})
230 231
231 232
232 233 def configure_celery(config, celery_settings):
233 234 """
234 235 Helper that is called from our application creation logic. It gives
235 236 connection info into running webapp and allows execution of tasks from
236 237 RhodeCode itself
237 238 """
238 239 # store some globals into rhodecode
239 240 rhodecode.CELERY_ENABLED = str2bool(
240 241 config.registry.settings.get('use_celery'))
241 242 if rhodecode.CELERY_ENABLED:
242 243 log.info('Configuring celery based on `%s` settings', celery_settings)
243 244 setup_celery_app(
244 245 app=None, root=None, request=None, registry=config.registry,
245 246 closer=None, celery_settings=celery_settings)
246 247
247 248
248 249 def maybe_prepare_env(req):
249 250 environ = {}
250 251 try:
251 252 environ.update({
252 253 'PATH_INFO': req.environ['PATH_INFO'],
253 254 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
254 255 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
255 256 'SERVER_NAME': req.environ['SERVER_NAME'],
256 257 'SERVER_PORT': req.environ['SERVER_PORT'],
257 258 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
258 259 })
259 260 except Exception:
260 261 pass
261 262
262 263 return environ
263 264
264 265
265 266 class RequestContextTask(Task):
266 267 """
267 268 This is a celery task which will create a rhodecode app instance context
268 269 for the task, patch pyramid with the original request
269 270 that created the task and also add the user to the context.
270 271 """
271 272
272 273 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
273 274 link=None, link_error=None, shadow=None, **options):
274 275 """ queue the job to run (we are in web request context here) """
275 276
276 277 req = self.app.conf['PYRAMID_REQUEST'] or get_current_request()
277 278
278 279 log.debug('Running Task with class: %s. Request Class: %s',
279 280 self.__class__, req.__class__)
280 281
281 282 user_id = None
282 283 ip_addr = None
283 284
284 285 # web case
285 286 if hasattr(req, 'user'):
286 287 ip_addr = req.user.ip_addr
287 288 user_id = req.user.user_id
288 289
289 290 # api case
290 291 elif hasattr(req, 'rpc_user'):
291 292 ip_addr = req.rpc_user.ip_addr
292 293 user_id = req.rpc_user.user_id
293 294 else:
294 295 if user_id and ip_addr:
295 296 log.debug('Using data from celery proxy user')
296 297
297 298 else:
298 299 raise Exception(
299 300 'Unable to fetch required data from request: {}. \n'
300 301 'This task is required to be executed from context of '
301 302 'request in a webapp. Task: {}'.format(
302 303 repr(req),
303 304 self.__class__
304 305 )
305 306 )
306 307
307 308 if req:
308 309 # we hook into kwargs since it is the only way to pass our data to
309 310 # the celery worker
310 311 environ = maybe_prepare_env(req)
311 312 options['headers'] = options.get('headers', {})
312 313 options['headers'].update({
313 314 'rhodecode_proxy_data': {
314 315 'environ': environ,
315 316 'auth_user': {
316 317 'ip_addr': ip_addr,
317 318 'user_id': user_id
318 319 },
319 320 }
320 321 })
321 322
322 323 return super(RequestContextTask, self).apply_async(
323 324 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now