##// END OF EJS Templates
celery: use safe environ extraction. In a few cases, tasks are executed at a non-pyramid level....
marcink -
r2417:51f63062 default
parent child Browse files
Show More
@@ -1,265 +1,276 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 23 celery worker \
24 24 --beat \
25 25 --app rhodecode.lib.celerylib.loader \
26 26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 27 --loglevel DEBUG --ini=._dev/dev.ini
28 28 """
import logging
import os
import sys

from celery import Celery
from celery import Task
from celery import exceptions  # noqa
from celery import signals
from kombu.serialization import register
from pyramid.threadlocal import get_current_request

import rhodecode
from rhodecode.lib.auth import AuthUser
from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
from rhodecode.lib.ext_json import json
from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
from rhodecode.lib.utils2 import str2bool
from rhodecode.model import meta
47 47
48 48
# Teach kombu the extended JSON codec used for task payloads, so values
# serialized by rhodecode.lib.ext_json survive the broker round-trip.
register('json_ext', json.dumps, json.loads,
         content_type='application/x-json-ext',
         content_encoding='utf-8')

log = logging.getLogger('celery.rhodecode.loader')
55 55
def add_preload_arguments(parser):
    """Register the RhodeCode-specific preload CLI options on *parser*."""
    extra_options = (
        ('--ini', 'Path to ini configuration file.'),
        ('--ini-var', 'Comma separated list of key=value to pass to ini.'),
    )
    for flag, help_text in extra_options:
        parser.add_argument(flag, default=None, help=help_text)
66 66
def get_logger(obj):
    """
    Return a logger for the given task object.

    Prefers celery's per-task logger when celery is enabled; otherwise (or if
    obtaining it fails) falls back to a named stdlib logger.
    """
    task_logger = logging.getLogger(
        'rhodecode.task.{}'.format(obj.__class__.__name__))

    if not rhodecode.CELERY_ENABLED:
        return task_logger

    try:
        task_logger = obj.get_logger()
    except Exception:
        # best-effort: keep the stdlib fallback logger
        pass

    return task_logger
78 78
79 79
# Baseline celery configuration; values from the .ini file are layered on
# top of a copy of this in setup_celery_app().
base_celery_config = {
    'result_backend': 'rpc://',
    'result_expires': 60 * 60 * 24,  # keep task results for 24 hours
    'result_persistent': True,
    'imports': [],
    'worker_max_tasks_per_child': 100,  # recycle workers to contain leaks
    'accept_content': ['json_ext'],
    'task_serializer': 'json_ext',
    'result_serializer': 'json_ext',
    'worker_hijack_root_logger': False,  # keep the .ini logging setup intact
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    }
}
# init main celery app
celery_app = Celery()
celery_app.user_options['preload'].add(add_preload_arguments)
# path of the --ini file; set by on_preload_parsed(), read by the
# setup_logging signal callback below.
ini_file_glob = None
99 99
100 100
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    """Configure logging from the .ini file captured at worker preload."""
    ini_path = ini_file_glob
    setup_logging(ini_path)
104 104
105 105
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """
    Bootstrap the pyramid application from the ``--ini`` file given on the
    worker command line and wire its environment into the celery app.

    Exits the process when no ``--ini`` file was provided, since the worker
    cannot run without the application configuration.
    """
    ini_location = options['ini']
    ini_vars = options['ini_var']
    celery_app.conf['INI_PYRAMID'] = ini_location

    if ini_location is None:
        print('You must provide the paste --ini argument')
        # use sys.exit: the `exit` builtin is a site.py convenience and is
        # not guaranteed to exist (e.g. under `python -S`)
        sys.exit(-1)

    # keep the parsed ini vars in their own name instead of shadowing the
    # `options` signal argument
    ini_params = None
    if ini_vars is not None:
        ini_params = parse_ini_vars(ini_vars)

    global ini_file_glob
    ini_file_glob = ini_location

    log.debug('Bootstrapping RhodeCode application...')
    env = bootstrap(ini_location, options=ini_params)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        ini_location=ini_location)

    # fix the global flag even if it's disabled via .ini file because this
    # is a worker code that doesn't need this to be disabled.
    rhodecode.CELERY_ENABLED = True
134 134
135 135
@signals.task_success.connect
def task_success_signal(result, **kwargs):
    """On task success: persist DB changes, then close the pyramid env."""
    meta.Session.commit()
    closer = celery_app.conf['PYRAMID_CLOSER']
    closer()
140 140
141 141
@signals.task_retry.connect
def task_retry_signal(request, reason, einfo, **kwargs):
    """On task retry: discard session state, then close the pyramid env."""
    meta.Session.remove()
    closer = celery_app.conf['PYRAMID_CLOSER']
    closer()
147 147
148 148
@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    """On task failure: drop the (possibly dirty) session, close pyramid env."""
    meta.Session.remove()
    closer = celery_app.conf['PYRAMID_CLOSER']
    closer()
154 154
155 155
@signals.task_revoked.connect
def task_revoked_signal(request, terminated, signum, expired, **kwargs):
    """On task revocation: only close the pyramid env (no DB work done)."""
    closer = celery_app.conf['PYRAMID_CLOSER']
    closer()
160 160
161 161
def setup_celery_app(app, root, request, registry, closer, ini_location):
    """
    Apply the base + .ini celery configuration to the global celery app and
    stash the pyramid environment pieces (app/root/request/registry/closer)
    on its conf for use by tasks and signal handlers.
    """
    ini_dir = os.path.dirname(os.path.abspath(ini_location))
    # work on a copy: updating the module-level base_celery_config in place
    # would leak .ini-specific settings into the shared defaults
    celery_config = dict(base_celery_config)
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
    })
    ini_settings = get_ini_config(ini_location)
    log.debug('Got custom celery conf: %s', ini_settings)

    celery_config.update(ini_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({
        'PYRAMID_APP': app,
        'PYRAMID_ROOT': root,
        'PYRAMID_REQUEST': request,
        'PYRAMID_REGISTRY': registry,
        'PYRAMID_CLOSER': closer,
    })
180 180
181 181
def configure_celery(config, ini_location):
    """
    Helper that is called from our application creation logic. It gives
    connection info into running webapp and allows execution of tasks from
    RhodeCode itself
    """
    # store some globals into rhodecode
    use_celery = str2bool(config.registry.settings.get('use_celery'))
    rhodecode.CELERY_ENABLED = use_celery

    if not use_celery:
        return

    log.info('Configuring celery based on `%s` file', ini_location)
    setup_celery_app(
        app=None, root=None, request=None, registry=config.registry,
        closer=None, ini_location=ini_location)
196 196
197 197
def maybe_prepare_env(req):
    """
    Best-effort extraction of the WSGI routing keys from a request.

    Returns a dict with the environ keys needed to rebuild a pyramid request
    on the celery worker side, or an empty dict when the request has no
    usable WSGI environ (e.g. tasks triggered outside of a pyramid request
    context). All-or-nothing: a partially populated dict is never returned.
    """
    environ = {}
    try:
        source = req.environ
        environ.update({
            'PATH_INFO': source['PATH_INFO'],
            'SCRIPT_NAME': source['SCRIPT_NAME'],
            'HTTP_HOST': source.get('HTTP_HOST', source['SERVER_NAME']),
            'SERVER_NAME': source['SERVER_NAME'],
            'SERVER_PORT': source['SERVER_PORT'],
            'wsgi.url_scheme': source['wsgi.url_scheme'],
        })
    except (AttributeError, KeyError, TypeError):
        # narrow best-effort: missing request/environ/keys means "no routing
        # info available" — never blow up the caller, but don't swallow
        # arbitrary programming errors either (was a bare `except Exception`)
        pass

    return environ
214
215
class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request
    that created the task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """ queue the job to run (we are in web request context here) """

        req = get_current_request()

        if hasattr(req, 'user'):
            # web case
            auth_source = req.user
        elif hasattr(req, 'rpc_user'):
            # api case
            auth_source = req.rpc_user
        else:
            raise Exception(
                'Unable to fetch required data from request: {}. \n'
                'This task is required to be executed from context of '
                'request in a webapp'.format(repr(req)))

        ip_addr = auth_source.ip_addr
        user_id = auth_source.user_id

        if req:
            # we hook into kwargs since it is the only way to pass our data to
            # the celery worker
            headers = options.get('headers', {})
            options['headers'] = headers
            headers.update({
                'rhodecode_proxy_data': {
                    'environ': maybe_prepare_env(req),
                    'auth_user': {
                        'ip_addr': ip_addr,
                        'user_id': user_id
                    },
                }
            })

        return super(RequestContextTask, self).apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)

    def __call__(self, *args, **kwargs):
        """ rebuild the context and then run task on celery worker """

        proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
        if not proxy_data:
            # no proxy data attached: run as a plain celery task
            return super(RequestContextTask, self).__call__(*args, **kwargs)

        log.debug('using celery proxy data to run task: %r', proxy_data)
        # re-inject and register threadlocals for proper routing support
        request = prepare_request(proxy_data['environ'])
        request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
                                ip_addr=proxy_data['auth_user']['ip_addr'])

        return super(RequestContextTask, self).__call__(*args, **kwargs)
265 276
General Comments 0
You need to be logged in to leave comments. Login now