##// END OF EJS Templates
celery: don't allow subtasks within tasks
super-admin -
r4875:92d53da0 default
parent child Browse files
Show More
@@ -1,84 +1,96 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import socket
22 22 import logging
23 23
24 24 import rhodecode
25 25 from zope.cachedescriptors.property import Lazy as LazyProperty
26 26 from rhodecode.lib.celerylib.loader import (
27 27 celery_app, RequestContextTask, get_logger)
28 28 from rhodecode.lib.statsd_client import StatsdClient
29 29
30 30 async_task = celery_app.task
31 31
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35
class ResultWrapper(object):
    """
    Thin holder exposing a stored value through a ``result`` attribute.

    ``run_task`` wraps the return value of a synchronously executed task in
    this class - presumably so callers can read ``.result`` the same way they
    would on a celery async result (verify against callers).
    """

    def __init__(self, task):
        # NOTE: despite the name, `run_task` passes the *return value* of the
        # task call here, not the task object itself.
        self.task = task

    @LazyProperty
    def result(self):
        """Return the value this wrapper was created with."""
        return self.task
43 43
44 44
def run_task(task, *args, **kwargs):
    """
    Execute `task`, asynchronously through celery when possible.

    The task is called directly (synchronously) when celery is disabled,
    when queueing it fails, or when we are already running inside another
    celery task and sub-tasks were not explicitly allowed.

    :param task: task callable; it must also provide ``apply_async`` when
        celery is enabled
    :param args: positional arguments passed to the task
    :param kwargs: keyword arguments passed to the task. The special key
        ``allow_subtask`` (default False) is consumed here and never handed
        to the task; when true, async execution is permitted even from
        within a running celery task.
    :return: the object returned by ``task.apply_async`` in async mode,
        otherwise a :class:`ResultWrapper` around the task's return value
    :raises ValueError: if `task` is None
    """
    # function-level import, kept as in the original implementation
    import celery

    log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
    if task is None:
        raise ValueError('Got non-existing task for execution')

    # Always consume the control flag. Previously it was only popped when
    # already inside a task, so in every other case it leaked into the
    # kwargs handed to the task itself.
    allow_subtask = kwargs.pop('allow_subtask', False)

    exec_mode = 'sync'
    allow_async = True

    # if we're already in a celery task, don't allow async execution again
    # e.g. task within task
    in_task = celery.current_task
    if in_task:
        log.debug('This task is in context of another task: %s, '
                  'not allowing another async execution', in_task)
        allow_async = False
        if allow_subtask:
            log.debug('Forced async by allow_subtask=True flag')
            allow_async = True

    t = None
    if rhodecode.CELERY_ENABLED and allow_async:
        try:
            t = task.apply_async(args=args, kwargs=kwargs)
            log.debug('executing task %s:%s in async mode', t.task_id, task)
            exec_mode = 'async'
        except socket.error as e:
            # errno 111 is checked explicitly (connection refused on Linux)
            if isinstance(e, IOError) and e.errno == 111:
                log.error('Unable to connect to celeryd `%s`. Sync execution', e)
            else:
                log.exception("Exception while connecting to celeryd.")
        except KeyError as e:
            log.error('Unable to connect to celeryd `%s`. Sync execution', e)
        except Exception:
            log.exception(
                "Exception while trying to run task asynchronous. "
                "Fallback to sync execution.")

    else:
        log.debug('executing task %s:%s in sync mode', 'TASK', task)

    statsd = StatsdClient.statsd
    if statsd:
        task_repr = getattr(task, 'name', task)
        statsd.incr('rhodecode_celery_task_total', tags=[
            'task:{}'.format(task_repr),
            'mode:{}'.format(exec_mode)
        ])

    # we got async task, return it after statsd call
    if t is not None:
        return t
    return ResultWrapper(task(*args, **kwargs))
@@ -1,327 +1,323 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 23 celery worker \
24 24 --task-events \
25 25 --beat \
26 26 --autoscale=20,2 \
27 27 --max-tasks-per-child 1 \
28 28 --app rhodecode.lib.celerylib.loader \
29 29 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
30 30 --loglevel DEBUG --ini=.dev/dev.ini
31 31 """
32 32 import os
33 33 import logging
34 34 import importlib
35 35
36 36 from celery import Celery
37 37 from celery import signals
38 38 from celery import Task
39 39 from celery import exceptions # pragma: no cover
40 40 from kombu.serialization import register
41 41 from pyramid.threadlocal import get_current_request
42 42
43 43 import rhodecode
44 44
45 45 from rhodecode.lib.auth import AuthUser
46 46 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
47 47 from rhodecode.lib.ext_json import json
48 48 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
49 49 from rhodecode.lib.utils2 import str2bool
50 50 from rhodecode.model import meta
51 51
52 52
53 53 register('json_ext', json.dumps, json.loads,
54 54 content_type='application/x-json-ext',
55 55 content_encoding='utf-8')
56 56
57 57 log = logging.getLogger('celery.rhodecode.loader')
58 58
59 59
def add_preload_arguments(parser):
    """
    Register the ``--ini`` and ``--ini-var`` options on `parser`.

    Both options default to None when not given on the command line.

    :param parser: object exposing an argparse-style ``add_argument``
    """
    option_help = (
        ('--ini', 'Path to ini configuration file.'),
        ('--ini-var', 'Comma separated list of key=value to pass to ini.'),
    )
    for flag, help_text in option_help:
        parser.add_argument(flag, default=None, help=help_text)
69 69
70 70
def get_logger(obj):
    """
    Return a logger for `obj`.

    When celery is enabled the logger comes from ``obj.get_logger()``; if
    that call fails for any reason, or celery is disabled, a stdlib logger
    named ``rhodecode.task.<ClassName>`` is returned instead.
    """
    fallback_name = 'rhodecode.task.{}'.format(obj.__class__.__name__)
    fallback_log = logging.getLogger(fallback_name)

    if not rhodecode.CELERY_ENABLED:
        return fallback_log

    try:
        return obj.get_logger()
    except Exception:
        # best-effort: any failure silently falls back to the stdlib logger
        return fallback_log
82 82
83 83
# modules listed under the celery `imports` setting below
imports = ['rhodecode.lib.celerylib.tasks']

try:
    # try if we have EE tasks available
    importlib.import_module('rc_ee')
except ImportError:
    pass
else:
    imports.append('rc_ee.lib.celerylib.tasks')


# defaults; `setup_celery_app` updates this dict in place with the settings
# it is given before loading it into the app
base_celery_config = {
    'result_backend': 'rpc://',
    # 24 hours, expressed in seconds
    'result_expires': 24 * 60 * 60,
    'result_persistent': True,
    'imports': imports,
    'worker_max_tasks_per_child': 100,
    # `json_ext` is the serializer registered with kombu in this module
    'accept_content': ['json_ext'],
    'task_serializer': 'json_ext',
    'result_serializer': 'json_ext',
    'worker_hijack_root_logger': False,
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    },
}

# init main celery app
celery_app = Celery()
celery_app.user_options['preload'].add(add_preload_arguments)

# path of the .ini file; filled in by `on_preload_parsed`
ini_file_glob = None
113 113
114 114
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    """
    Celery ``setup_logging`` receiver: configure logging from the module-level
    ``ini_file_glob`` path (None until ``on_preload_parsed`` has run).
    """
    ini_path = ini_file_glob
    setup_logging(ini_path)
118 118
119 119
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """
    Celery ``user_preload_options`` receiver: bootstrap the RhodeCode app.

    Reads the ``ini`` / ``ini_var`` preload options, bootstraps the pyramid
    environment from the .ini file and hands the resulting objects plus the
    celery settings to ``setup_celery_app``.

    :param options: mapping of parsed preload options; must contain the
        ``ini`` and ``ini_var`` keys
    :raises SystemExit: with code -1 when no ``--ini`` option was given
    :raises Exception: whatever ``bootstrap`` raised, after logging it
    """
    from rhodecode.config.middleware import get_celery_config

    global ini_file_glob

    ini_location = options['ini']
    ini_vars = options['ini_var']
    celery_app.conf['INI_PYRAMID'] = ini_location

    if ini_location is None:
        print('You must provide the paste --ini argument')
        # `exit()` is injected by the `site` module and is not guaranteed to
        # exist; raising SystemExit directly has the same effect.
        raise SystemExit(-1)

    # separate name so the `options` parameter is not shadowed
    ini_options = None
    if ini_vars is not None:
        ini_options = parse_ini_vars(ini_vars)

    ini_file_glob = ini_location

    log.debug('Bootstrapping RhodeCode application...')

    try:
        env = bootstrap(ini_location, options=ini_options)
    except Exception:
        log.exception('Failed to bootstrap RhodeCode APP')
        # Re-raise the real error. Continuing with an empty env would only
        # fail a few lines below with a misleading KeyError: 'registry'.
        raise

    log.debug('Got Pyramid ENV: %s', env)
    celery_settings = get_celery_config(env['registry'].settings)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        celery_settings=celery_settings)

    # fix the global flag even if it's disabled via .ini file because this
    # is a worker code that doesn't need this to be disabled.
    rhodecode.CELERY_ENABLED = True
158 158
159 159
@signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
    """
    Celery ``task_prerun`` receiver: call ``ping_db()``.

    The signal arguments are accepted but not used.
    """
    ping_db()
163 163
164 164
@signals.task_success.connect
def task_success_signal(result, **kwargs):
    """
    Celery ``task_success`` receiver: commit the DB session, then call the
    pyramid closer stored in the celery config, if one is set.
    """
    meta.Session.commit()

    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if not pyramid_closer:
        return
    pyramid_closer()
171 171
172 172
@signals.task_retry.connect
def task_retry_signal(request, reason, einfo, **kwargs):
    """
    Celery ``task_retry`` receiver: remove the DB session, then call the
    pyramid closer stored in the celery config, if one is set.
    """
    meta.Session.remove()

    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if not pyramid_closer:
        return
    pyramid_closer()
180 180
181 181
@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    """
    Celery ``task_failure`` receiver.

    Logs the failure, removes the DB session, stores the exception through
    the exception tracker, bumps the ``rhodecode_exception_total`` statsd
    counter and finally calls the pyramid closer, if one is set.

    :param task_id: id of the failed task (used for logging only)
    :param einfo: exception info object; its ``type``, ``exception`` and
        ``tb`` attributes are read here
    """
    log.error('Task: %s failed !! exc_info: %s', task_id, einfo)

    # function-level imports, kept as in the original implementation
    from rhodecode.lib.exc_tracking import store_exception
    from rhodecode.lib.statsd_client import StatsdClient

    try:
        meta.Session.remove()

        # simulate sys.exc_info()
        exc_info = (einfo.type, einfo.exception, einfo.tb)
        store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
        statsd = StatsdClient.statsd
        if statsd:
            # Tag with the class of the exception that was raised. Using
            # `einfo.__class__` would tag every failure with the class of
            # the exception-info wrapper instead.
            exc_cls = einfo.type
            exc_type = "{}.{}".format(exc_cls.__module__, exc_cls.__name__)
            statsd.incr('rhodecode_exception_total',
                        tags=["exc_source:celery", "type:{}".format(exc_type)])
    finally:
        # always run the closer, even if storing/reporting the error failed
        closer = celery_app.conf['PYRAMID_CLOSER']
        if closer:
            closer()
202 204
203 205
@signals.task_revoked.connect
def task_revoked_signal(request, terminated, signum, expired, **kwargs):
    """
    Celery ``task_revoked`` receiver: call the pyramid closer stored in the
    celery config, if one is set. The signal arguments are not used.
    """
    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if not pyramid_closer:
        return
    pyramid_closer()
210 212
211 213
def setup_celery_app(app, root, request, registry, closer, celery_settings):
    """
    Load the celery configuration into ``celery_app`` and store the pyramid
    objects in its config.

    :param app: stored under ``PYRAMID_APP``
    :param root: stored under ``PYRAMID_ROOT``
    :param request: stored under ``PYRAMID_REQUEST``
    :param registry: stored under ``PYRAMID_REGISTRY``; its settings must
        contain the ``celerybeat-schedule.path`` key
    :param closer: stored under ``PYRAMID_CLOSER``
    :param celery_settings: mapping of celery settings layered on top of
        ``base_celery_config``
    """
    log.debug('Got custom celery conf: %s', celery_settings)

    # NOTE: this is the module-level dict itself, not a copy - the updates
    # below modify `base_celery_config` in place.
    celery_config = base_celery_config

    # store celerybeat scheduler db where the .ini file is
    schedule_path = registry.settings['celerybeat-schedule.path']
    celery_config['beat_schedule_filename'] = schedule_path

    celery_config.update(celery_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({
        'PYRAMID_APP': app,
        'PYRAMID_ROOT': root,
        'PYRAMID_REQUEST': request,
        'PYRAMID_REGISTRY': registry,
        'PYRAMID_CLOSER': closer,
    })
228 230
229 231
def configure_celery(config, celery_settings):
    """
    Helper that is called from our application creation logic. It gives
    connection info into running webapp and allows execution of tasks from
    RhodeCode itself

    Sets the global ``rhodecode.CELERY_ENABLED`` flag from the ``use_celery``
    setting and, only when that flag is on, configures the celery app using
    the registry from `config`.
    """
    # store some globals into rhodecode
    use_celery = config.registry.settings.get('use_celery')
    rhodecode.CELERY_ENABLED = str2bool(use_celery)

    if not rhodecode.CELERY_ENABLED:
        return

    log.info('Configuring celery based on `%s` settings', celery_settings)
    setup_celery_app(
        app=None, root=None, request=None, registry=config.registry,
        closer=None, celery_settings=celery_settings)
244 246
245 247
def maybe_prepare_env(req):
    """
    Extract a fixed subset of WSGI environ keys from `req`.

    This is all-or-nothing: if `req` has no usable ``environ`` or any
    required key is missing, an empty dict is returned. ``HTTP_HOST`` falls
    back to ``SERVER_NAME`` when absent.

    :param req: object with an ``environ`` mapping
    :return: dict with the copied keys, or ``{}`` on any error
    """
    try:
        wsgi_env = req.environ
        return {
            'PATH_INFO': wsgi_env['PATH_INFO'],
            'SCRIPT_NAME': wsgi_env['SCRIPT_NAME'],
            'HTTP_HOST': wsgi_env.get('HTTP_HOST', wsgi_env['SERVER_NAME']),
            'SERVER_NAME': wsgi_env['SERVER_NAME'],
            'SERVER_PORT': wsgi_env['SERVER_PORT'],
            'wsgi.url_scheme': wsgi_env['wsgi.url_scheme'],
        }
    except Exception:
        # best-effort helper: never fail, just report nothing
        return {}
261 263
262 264
class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request
    that created the task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """
        Queue the job to run (we are in web request context here).

        The user id, the user's ip address and a subset of the request's
        WSGI environ are attached to the message under the
        ``rhodecode_proxy_data`` header before delegating to celery's
        ``Task.apply_async``.

        :raises Exception: when the request carries neither a ``user`` (web)
            nor an ``rpc_user`` (api) attribute
        """
        req = self.app.conf['PYRAMID_REQUEST'] or get_current_request()

        log.debug('Running Task with class: %s. Request Class: %s',
                  self.__class__, req.__class__)

        if hasattr(req, 'user'):
            # web case
            auth_user = req.user
        elif hasattr(req, 'rpc_user'):
            # api case
            auth_user = req.rpc_user
        else:
            # There is no other source for the user data, so the former
            # "use data from celery proxy user" fallback could never be taken
            # and has been dropped.
            raise Exception(
                'Unable to fetch required data from request: {}. \n'
                'This task is required to be executed from context of '
                'request in a webapp. Task: {}'.format(
                    repr(req),
                    self.__class__
                )
            )

        ip_addr = auth_user.ip_addr
        user_id = auth_user.user_id

        if req:
            # we hook into headers since it is the only way to pass our data
            # to the celery worker
            environ = maybe_prepare_env(req)
            # work on a copy so a headers dict supplied by the caller is not
            # modified; also tolerates an explicit `headers=None`
            headers = dict(options.get('headers') or {})
            headers['rhodecode_proxy_data'] = {
                'environ': environ,
                'auth_user': {
                    'ip_addr': ip_addr,
                    'user_id': user_id
                },
            }
            options['headers'] = headers

        return super(RequestContextTask, self).apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now