##// END OF EJS Templates
celery: use safer events for execution of tasks
marcink -
r2464:d10039ef default
parent child Browse files
Show More
@@ -1,276 +1,284 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 23 celery worker \
24 24 --beat \
25 25 --app rhodecode.lib.celerylib.loader \
26 26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 27 --loglevel DEBUG --ini=._dev/dev.ini
28 28 """
29 29 import os
30 30 import logging
31 31
32 32 from celery import Celery
33 33 from celery import signals
34 34 from celery import Task
35 35 from celery import exceptions # noqa
36 36 from kombu.serialization import register
37 37 from pyramid.threadlocal import get_current_request
38 38
39 39 import rhodecode
40 40
41 41 from rhodecode.lib.auth import AuthUser
42 42 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
43 43 from rhodecode.lib.ext_json import json
44 44 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
45 45 from rhodecode.lib.utils2 import str2bool
46 46 from rhodecode.model import meta
47 47
48 48
# Teach kombu to (de)serialize our extended json payload format;
# 'json_ext' is referenced by accept_content/task_serializer in the
# base celery config below.
register('json_ext', json.dumps, json.loads,
         content_type='application/x-json-ext',
         content_encoding='utf-8')

log = logging.getLogger('celery.rhodecode.loader')
54 54
55 55
def add_preload_arguments(parser):
    """Register the RhodeCode ``--ini`` / ``--ini-var`` worker CLI flags."""
    for flag, help_text in (
            ('--ini', 'Path to ini configuration file.'),
            ('--ini-var', 'Comma separated list of key=value to pass to ini.')):
        parser.add_argument(flag, default=None, help=help_text)
65 65
66 66
def get_logger(obj):
    """
    Return a logger for the given task object.

    Prefers the task's own ``get_logger()`` when celery is enabled,
    falling back to a per-class ``rhodecode.task.*`` logger otherwise
    (or when the task cannot provide one).
    """
    fallback_log = logging.getLogger(
        'rhodecode.task.{}'.format(obj.__class__.__name__))

    if not rhodecode.CELERY_ENABLED:
        return fallback_log

    try:
        return obj.get_logger()
    except Exception:
        # best-effort: keep the fallback logger if the task can't supply one
        return fallback_log
78 78
79 79
# Defaults applied to every worker; extended with per-.ini settings in
# `setup_celery_app`.
base_celery_config = {
    'result_backend': 'rpc://',
    'result_expires': 60 * 60 * 24,  # keep task results for one day
    'result_persistent': True,
    'imports': [],
    'worker_max_tasks_per_child': 100,  # recycle workers to bound memory use
    'accept_content': ['json_ext'],  # registered via kombu `register` above
    'task_serializer': 'json_ext',
    'result_serializer': 'json_ext',
    'worker_hijack_root_logger': False,
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    }
}
# init main celery app
celery_app = Celery()
celery_app.user_options['preload'].add(add_preload_arguments)
# path of the .ini file; set by `on_preload_parsed`, read by logging setup
ini_file_glob = None
99 99
100 100
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    """Celery ``setup_logging`` hook: configure logging from the preloaded ini."""
    setup_logging(ini_file_glob)
104 104
105 105
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """
    Celery preload hook: bootstrap the pyramid application from the
    ``--ini`` / ``--ini-var`` options and wire its components into the
    celery app configuration.

    Aborts the worker when no ini file was supplied, since the app
    cannot be bootstrapped without one.
    """
    ini_location = options['ini']
    ini_vars = options['ini_var']
    celery_app.conf['INI_PYRAMID'] = ini_location

    if ini_location is None:
        print('You must provide the paste --ini argument')
        # `exit()` is injected by the `site` module and may be missing
        # (e.g. under `python -S`); SystemExit(1) is always available and
        # yields a conventional non-zero exit status (-1 maps to 255).
        raise SystemExit(1)

    # keep the signal's `options` dict intact; these are extra ini overrides
    ini_overrides = None
    if ini_vars is not None:
        ini_overrides = parse_ini_vars(ini_vars)

    global ini_file_glob
    ini_file_glob = ini_location

    log.debug('Bootstrapping RhodeCode application...')
    env = bootstrap(ini_location, options=ini_overrides)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        ini_location=ini_location)

    # fix the global flag even if it's disabled via .ini file because this
    # is a worker code that doesn't need this to be disabled.
    rhodecode.CELERY_ENABLED = True
134 134
135 135
@signals.task_success.connect
def task_success_signal(result, **kwargs):
    """On task success: commit the DB session, then release the pyramid env."""
    meta.Session.commit()
    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if pyramid_closer:
        pyramid_closer()
140 142
141 143
@signals.task_retry.connect
def task_retry_signal(
        request, reason, einfo, **kwargs):
    """On task retry: discard the DB session, then release the pyramid env."""
    meta.Session.remove()
    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if pyramid_closer:
        pyramid_closer()
147 151
148 152
@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    """On task failure: discard the DB session, then release the pyramid env."""
    meta.Session.remove()
    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if pyramid_closer:
        pyramid_closer()
154 160
155 161
@signals.task_revoked.connect
def task_revoked_signal(
        request, terminated, signum, expired, **kwargs):
    """On task revocation: release the pyramid env (no DB work required)."""
    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if pyramid_closer:
        pyramid_closer()
160 168
161 169
def setup_celery_app(app, root, request, registry, closer, ini_location):
    """
    Configure the global ``celery_app`` from the given .ini file and stash
    the pyramid components (app/root/request/registry/closer) in its conf
    so signal handlers and tasks can retrieve them later.
    """
    ini_dir = os.path.dirname(os.path.abspath(ini_location))
    # copy so repeated calls never accumulate state inside the shared
    # module-level `base_celery_config` dict
    celery_config = base_celery_config.copy()
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
    })
    ini_settings = get_ini_config(ini_location)
    log.debug('Got custom celery conf: %s', ini_settings)

    celery_config.update(ini_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({
        'PYRAMID_APP': app,
        'PYRAMID_ROOT': root,
        'PYRAMID_REQUEST': request,
        'PYRAMID_REGISTRY': registry,
        'PYRAMID_CLOSER': closer,
    })
180 188
181 189
def configure_celery(config, ini_location):
    """
    Helper that is called from our application creation logic. It gives
    connection info into running webapp and allows execution of tasks from
    RhodeCode itself
    """
    # store some globals into rhodecode
    use_celery = str2bool(config.registry.settings.get('use_celery'))
    rhodecode.CELERY_ENABLED = use_celery
    if not use_celery:
        return

    log.info('Configuring celery based on `%s` file', ini_location)
    setup_celery_app(
        app=None, root=None, request=None, registry=config.registry,
        closer=None, ini_location=ini_location)
196 204
197 205
def maybe_prepare_env(req):
    """
    Best-effort extraction of the WSGI routing keys from *req*.

    Returns a dict with the environ entries needed to rebuild a request
    on the worker side, or an empty dict when the request carries no
    usable environ (all-or-nothing: any missing key yields ``{}``).
    """
    try:
        source = req.environ
        env = {key: source[key] for key in (
            'PATH_INFO', 'SCRIPT_NAME', 'SERVER_NAME',
            'SERVER_PORT', 'wsgi.url_scheme')}
        env['HTTP_HOST'] = source.get('HTTP_HOST', source['SERVER_NAME'])
        return env
    except Exception:
        # deliberate best-effort: fall back to an empty environ
        return {}
214 222
215 223
class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request
    that created the task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """ queue the job to run (we are in web request context here) """

        req = get_current_request()

        # web case
        if hasattr(req, 'user'):
            ip_addr, user_id = req.user.ip_addr, req.user.user_id

        # api case
        elif hasattr(req, 'rpc_user'):
            ip_addr, user_id = req.rpc_user.ip_addr, req.rpc_user.user_id
        else:
            raise Exception(
                'Unable to fetch required data from request: {}. \n'
                'This task is required to be executed from context of '
                'request in a webapp'.format(repr(req)))

        if req:
            # smuggle the web context through the celery message headers
            # (kwargs/headers are the only channel to the worker side),
            # where __call__ rebuilds it
            headers = options.get('headers', {})
            headers['rhodecode_proxy_data'] = {
                'environ': maybe_prepare_env(req),
                'auth_user': {
                    'ip_addr': ip_addr,
                    'user_id': user_id
                },
            }
            options['headers'] = headers

        return super(RequestContextTask, self).apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow,
            **options)

    def __call__(self, *args, **kwargs):
        """ rebuild the context and then run task on celery worker """

        proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)

        if proxy_data:
            log.debug('using celery proxy data to run task: %r', proxy_data)
            # re-inject and register threadlocals for proper routing support
            auth_user = proxy_data['auth_user']
            request = prepare_request(proxy_data['environ'])
            request.user = AuthUser(user_id=auth_user['user_id'],
                                    ip_addr=auth_user['ip_addr'])

        return super(RequestContextTask, self).__call__(*args, **kwargs)
276 284
General Comments 0
You need to be logged in to leave comments. Login now