##// END OF EJS Templates
celery: use just one instance of UNSET() object
super-admin -
r5019:7f6920ce default
parent child Browse files
Show More
@@ -1,353 +1,356 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 23 celery worker \
24 24 --task-events \
25 25 --beat \
26 26 --autoscale=20,2 \
27 27 --max-tasks-per-child 1 \
28 28 --app rhodecode.lib.celerylib.loader \
29 29 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
30 30 --loglevel DEBUG --ini=.dev/dev.ini
31 31 """
32 32 import os
33 33 import logging
34 34 import importlib
35 35
36 36 from celery import Celery
37 37 from celery import signals
38 38 from celery import Task
39 39 from celery import exceptions # pragma: no cover
40 40 from kombu.serialization import register
41 41
42 42 import rhodecode
43 43
44 44 from rhodecode.lib.statsd_client import StatsdClient
45 45 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
46 46 from rhodecode.lib.ext_json import json
47 47 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
48 48 from rhodecode.lib.utils2 import str2bool
49 49 from rhodecode.model import meta
50 50
51 51
52 52 register('json_ext', json.dumps, json.loads,
53 53 content_type='application/x-json-ext',
54 54 content_encoding='utf-8')
55 55
56 56 log = logging.getLogger('celery.rhodecode.loader')
57 57
58 58
59 59 imports = ['rhodecode.lib.celerylib.tasks']
60 60
61 61 try:
62 62 # try if we have EE tasks available
63 63 importlib.import_module('rc_ee')
64 64 imports.append('rc_ee.lib.celerylib.tasks')
65 65 except ImportError:
66 66 pass
67 67
68 68
69 69 base_celery_config = {
70 70 'result_backend': 'rpc://',
71 71 'result_expires': 60 * 60 * 24,
72 72 'result_persistent': True,
73 73 'imports': imports,
74 74 'worker_max_tasks_per_child': 20,
75 75 'accept_content': ['json_ext', 'json'],
76 76 'task_serializer': 'json_ext',
77 77 'result_serializer': 'json_ext',
78 78 'worker_hijack_root_logger': False,
79 79 'database_table_names': {
80 80 'task': 'beat_taskmeta',
81 81 'group': 'beat_groupmeta',
82 82 }
83 83 }
84 84
85 85
86 86 def add_preload_arguments(parser):
87 87 parser.add_argument(
88 88 '--ini', default=None,
89 89 help='Path to ini configuration file.'
90 90 )
91 91 parser.add_argument(
92 92 '--ini-var', default=None,
93 93 help='Comma separated list of key=value to pass to ini.'
94 94 )
95 95
96 96
97 97 def get_logger(obj):
98 98 custom_log = logging.getLogger(
99 99 'rhodecode.task.{}'.format(obj.__class__.__name__))
100 100
101 101 if rhodecode.CELERY_ENABLED:
102 102 try:
103 103 custom_log = obj.get_logger()
104 104 except Exception:
105 105 pass
106 106
107 107 return custom_log
108 108
109 109
110 110 # init main celery app
111 111 celery_app = Celery()
112 112 celery_app.user_options['preload'].add(add_preload_arguments)
113 113
114 114
115 115 @signals.setup_logging.connect
116 116 def setup_logging_callback(**kwargs):
117 117 ini_file = celery_app.conf['RC_INI_FILE']
118 118 setup_logging(ini_file)
119 119
120 120
121 121 @signals.user_preload_options.connect
122 122 def on_preload_parsed(options, **kwargs):
123 123
124 124 ini_file = options['ini']
125 125 ini_vars = options['ini_var']
126 126
127 127 if ini_file is None:
128 128 print('You must provide the --ini argument to start celery')
129 129 exit(-1)
130 130
131 131 options = None
132 132 if ini_vars is not None:
133 133 options = parse_ini_vars(ini_vars)
134 134
135 135 celery_app.conf['RC_INI_FILE'] = ini_file
136 136 celery_app.conf['RC_INI_OPTIONS'] = options
137 137 setup_logging(ini_file)
138 138
139 139
140 140 def _init_celery(app_type=''):
141 141 from rhodecode.config.middleware import get_celery_config
142 142
143 143 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
144 144
145 145 ini_file = celery_app.conf['RC_INI_FILE']
146 146 options = celery_app.conf['RC_INI_OPTIONS']
147 147
148 148 env = None
149 149 try:
150 150 env = bootstrap(ini_file, options=options)
151 151 except Exception:
152 152 log.exception('Failed to bootstrap RhodeCode APP')
153 153
154 154 if not env:
155 155 raise EnvironmentError(
156 156 'Failed to load pyramid ENV. '
157 157 'Probably there is another error present that prevents from running pyramid app')
158 158
159 159 log.debug('Got Pyramid ENV: %s', env)
160 160
161 161 celery_settings = get_celery_config(env['registry'].settings)
162 162
163 163 setup_celery_app(
164 164 app=env['app'], root=env['root'], request=env['request'],
165 165 registry=env['registry'], closer=env['closer'],
166 166 celery_settings=celery_settings)
167 167
168 168
169 169 @signals.celeryd_init.connect
170 170 def on_celeryd_init(sender=None, conf=None, **kwargs):
171 171 _init_celery('celery worker')
172 172
173 173 # fix the global flag even if it's disabled via .ini file because this
174 174 # is a worker code that doesn't need this to be disabled.
175 175 rhodecode.CELERY_ENABLED = True
176 176
177 177
178 178 @signals.beat_init.connect
179 179 def on_beat_init(sender=None, conf=None, **kwargs):
180 180 _init_celery('celery beat')
181 181
182 182
183 183 @signals.task_prerun.connect
184 184 def task_prerun_signal(task_id, task, args, **kwargs):
185 185 ping_db()
186 186 statsd = StatsdClient.statsd
187 187 if statsd:
188 188 task_repr = getattr(task, 'name', task)
189 189 statsd.incr('rhodecode_celery_task_total', tags=[
190 190 'task:{}'.format(task_repr),
191 191 'mode:async'
192 192 ])
193 193
194 194
195 195 @signals.task_success.connect
196 196 def task_success_signal(result, **kwargs):
197 197 meta.Session.commit()
198 198 closer = celery_app.conf['PYRAMID_CLOSER']
199 199 if closer:
200 200 closer()
201 201
202 202
203 203
204 204 @signals.task_retry.connect
205 205 def task_retry_signal(
206 206 request, reason, einfo, **kwargs):
207 207 meta.Session.remove()
208 208 closer = celery_app.conf['PYRAMID_CLOSER']
209 209 if closer:
210 210 closer()
211 211
212 212
213 213 @signals.task_failure.connect
214 214 def task_failure_signal(
215 215 task_id, exception, args, kwargs, traceback, einfo, **kargs):
216 216
217 217 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
218 218 from rhodecode.lib.exc_tracking import store_exception
219 219 from rhodecode.lib.statsd_client import StatsdClient
220 220
221 221 meta.Session.remove()
222 222
223 223 # simulate sys.exc_info()
224 224 exc_info = (einfo.type, einfo.exception, einfo.tb)
225 225 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
226 226 statsd = StatsdClient.statsd
227 227 if statsd:
228 228 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
229 229 statsd.incr('rhodecode_exception_total',
230 230 tags=["exc_source:celery", "type:{}".format(exc_type)])
231 231
232 232 closer = celery_app.conf['PYRAMID_CLOSER']
233 233 if closer:
234 234 closer()
235 235
236 236
237 237 @signals.task_revoked.connect
238 238 def task_revoked_signal(
239 239 request, terminated, signum, expired, **kwargs):
240 240 closer = celery_app.conf['PYRAMID_CLOSER']
241 241 if closer:
242 242 closer()
243 243
244 244
245 245 class UNSET(object):
246 246 pass
247 247
248 248
249 def set_celery_conf(app=UNSET(), root=UNSET(), request=UNSET(), registry=UNSET(), closer=UNSET()):
249 _unset = UNSET()
250
251
252 def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
250 253
251 254 if request is not UNSET:
252 255 celery_app.conf.update({'PYRAMID_REQUEST': request})
253 256
254 257 if registry is not UNSET:
255 258 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
256 259
257 260
258 261 def setup_celery_app(app, root, request, registry, closer, celery_settings):
259 262 log.debug('Got custom celery conf: %s', celery_settings)
260 263 celery_config = base_celery_config
261 264 celery_config.update({
262 265 # store celerybeat scheduler db where the .ini file is
263 266 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
264 267 })
265 268
266 269 celery_config.update(celery_settings)
267 270 celery_app.config_from_object(celery_config)
268 271
269 272 celery_app.conf.update({'PYRAMID_APP': app})
270 273 celery_app.conf.update({'PYRAMID_ROOT': root})
271 274 celery_app.conf.update({'PYRAMID_REQUEST': request})
272 275 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
273 276 celery_app.conf.update({'PYRAMID_CLOSER': closer})
274 277
275 278
276 279 def configure_celery(config, celery_settings):
277 280 """
278 281 Helper that is called from our application creation logic. It gives
279 282 connection info into running webapp and allows execution of tasks from
280 283 RhodeCode itself
281 284 """
282 285 # store some globals into rhodecode
283 286 rhodecode.CELERY_ENABLED = str2bool(
284 287 config.registry.settings.get('use_celery'))
285 288 if rhodecode.CELERY_ENABLED:
286 289 log.info('Configuring celery based on `%s` settings', celery_settings)
287 290 setup_celery_app(
288 291 app=None, root=None, request=None, registry=config.registry,
289 292 closer=None, celery_settings=celery_settings)
290 293
291 294
292 295 def maybe_prepare_env(req):
293 296 environ = {}
294 297 try:
295 298 environ.update({
296 299 'PATH_INFO': req.environ['PATH_INFO'],
297 300 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
298 301 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
299 302 'SERVER_NAME': req.environ['SERVER_NAME'],
300 303 'SERVER_PORT': req.environ['SERVER_PORT'],
301 304 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
302 305 })
303 306 except Exception:
304 307 pass
305 308
306 309 return environ
307 310
308 311
309 312 class RequestContextTask(Task):
310 313 """
311 314 This is a celery task which will create a rhodecode app instance context
312 315 for the task, patch pyramid with the original request
313 316 that created the task and also add the user to the context.
314 317 """
315 318
316 319 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
317 320 link=None, link_error=None, shadow=None, **options):
318 321 """ queue the job to run (we are in web request context here) """
319 322 from rhodecode.lib.base import get_ip_addr
320 323
321 324 req = self.app.conf['PYRAMID_REQUEST']
322 325 if not req:
323 326 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
324 327
325 328 log.debug('Running Task with class: %s. Request Class: %s',
326 329 self.__class__, req.__class__)
327 330
328 331 user_id = 0
329 332
330 333 # web case
331 334 if hasattr(req, 'user'):
332 335 user_id = req.user.user_id
333 336
334 337 # api case
335 338 elif hasattr(req, 'rpc_user'):
336 339 user_id = req.rpc_user.user_id
337 340
338 341 # we hook into kwargs since it is the only way to pass our data to
339 342 # the celery worker
340 343 environ = maybe_prepare_env(req)
341 344 options['headers'] = options.get('headers', {})
342 345 options['headers'].update({
343 346 'rhodecode_proxy_data': {
344 347 'environ': environ,
345 348 'auth_user': {
346 349 'ip_addr': get_ip_addr(req.environ),
347 350 'user_id': user_id
348 351 },
349 352 }
350 353 })
351 354
352 355 return super(RequestContextTask, self).apply_async(
353 356 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now