##// END OF EJS Templates
celery: use safe environ extraction. In a few cases tasks are executed at non-pyramid level...
marcink -
r2417:51f63062 default
parent child Browse files
Show More
@@ -1,265 +1,276 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 Celery loader, run with::
21 Celery loader, run with::
22
22
23 celery worker \
23 celery worker \
24 --beat \
24 --beat \
25 --app rhodecode.lib.celerylib.loader \
25 --app rhodecode.lib.celerylib.loader \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --loglevel DEBUG --ini=._dev/dev.ini
27 --loglevel DEBUG --ini=._dev/dev.ini
28 """
28 """
29 import os
29 import os
30 import logging
30 import logging
31
31
32 from celery import Celery
32 from celery import Celery
33 from celery import signals
33 from celery import signals
34 from celery import Task
34 from celery import Task
35 from celery import exceptions # noqa
35 from celery import exceptions # noqa
36 from kombu.serialization import register
36 from kombu.serialization import register
37 from pyramid.threadlocal import get_current_request
37 from pyramid.threadlocal import get_current_request
38
38
39 import rhodecode
39 import rhodecode
40
40
41 from rhodecode.lib.auth import AuthUser
41 from rhodecode.lib.auth import AuthUser
42 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
42 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
43 from rhodecode.lib.ext_json import json
43 from rhodecode.lib.ext_json import json
44 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
44 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
45 from rhodecode.lib.utils2 import str2bool
45 from rhodecode.lib.utils2 import str2bool
46 from rhodecode.model import meta
46 from rhodecode.model import meta
47
47
48
48
# Register a custom 'json_ext' serializer with kombu so celery can
# (de)serialize RhodeCode's extended JSON types on the wire; matches the
# accept_content/task_serializer/result_serializer settings below.
register('json_ext', json.dumps, json.loads,
         content_type='application/x-json-ext',
         content_encoding='utf-8')

log = logging.getLogger('celery.rhodecode.loader')
54
54
55
55
def add_preload_arguments(parser):
    """
    Register RhodeCode-specific preload CLI options on the celery parser.

    ``--ini`` points at the paste .ini configuration file and ``--ini-var``
    carries optional key=value overrides for it; both default to None.
    """
    option_specs = (
        ('--ini', 'Path to ini configuration file.'),
        ('--ini-var', 'Comma separated list of key=value to pass to ini.'),
    )
    for flag, description in option_specs:
        parser.add_argument(flag, default=None, help=description)
65
65
66
66
def get_logger(obj):
    """
    Return the most specific logger available for ``obj``.

    When celery is enabled, prefer the task's own celery logger obtained via
    ``obj.get_logger()``; otherwise (or if that call fails for any reason)
    fall back to a per-class rhodecode logger.
    """
    fallback = logging.getLogger(
        'rhodecode.task.{}'.format(obj.__class__.__name__))

    if not rhodecode.CELERY_ENABLED:
        return fallback

    try:
        return obj.get_logger()
    except Exception:
        # best-effort: obj may not expose a celery logger at all
        return fallback
78
78
79
79
# Default celery configuration shared by the worker and the web process.
# Any of these keys can be overridden by celery settings read from the .ini
# file in setup_celery_app().
base_celery_config = {
    'result_backend': 'rpc://',
    'result_expires': 60 * 60 * 24,  # keep task results for one day
    'result_persistent': True,
    'imports': [],
    # recycle worker processes periodically to bound memory growth
    'worker_max_tasks_per_child': 100,
    # use the custom json_ext serializer registered above
    'accept_content': ['json_ext'],
    'task_serializer': 'json_ext',
    'result_serializer': 'json_ext',
    # keep the logging configuration coming from the .ini file
    'worker_hijack_root_logger': False,
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    }
}
# init main celery app
celery_app = Celery()
celery_app.user_options['preload'].add(add_preload_arguments)
# set from --ini in on_preload_parsed(); consumed by setup_logging_callback()
ini_file_glob = None
99
99
100
100
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    # configure logging from the .ini file instead of celery's own defaults;
    # ini_file_glob is populated earlier by on_preload_parsed()
    setup_logging(ini_file_glob)
104
104
105
105
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """
    Bootstrap the RhodeCode pyramid application once celery has parsed the
    preload options added by ``add_preload_arguments``.

    :param options: dict with the ``ini`` (path to .ini file, required) and
        ``ini_var`` (comma separated key=value overrides, optional) entries.

    Exits the process when no ``--ini`` file was supplied.
    """
    ini_location = options['ini']
    ini_vars = options['ini_var']
    celery_app.conf['INI_PYRAMID'] = options['ini']

    if ini_location is None:
        print('You must provide the paste --ini argument')
        exit(-1)

    # keep the parsed overrides in a dedicated name; the previous code
    # re-bound the ``options`` parameter which shadowed the signal payload
    ini_options = None
    if ini_vars is not None:
        ini_options = parse_ini_vars(ini_vars)

    global ini_file_glob
    ini_file_glob = ini_location

    log.debug('Bootstrapping RhodeCode application...')
    env = bootstrap(ini_location, options=ini_options)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        ini_location=ini_location)

    # fix the global flag even if it's disabled via .ini file because this
    # is a worker code that doesn't need this to be disabled.
    rhodecode.CELERY_ENABLED = True
134
134
135
135
@signals.task_success.connect
def task_success_signal(result, **kwargs):
    """Persist DB changes made by a successful task, then close the env."""
    meta.Session.commit()
    closer = celery_app.conf['PYRAMID_CLOSER']
    closer()
140
140
141
141
@signals.task_retry.connect
def task_retry_signal(
        request, reason, einfo, **kwargs):
    """Discard the DB session of a task being retried, then close the env."""
    meta.Session.remove()
    closer = celery_app.conf['PYRAMID_CLOSER']
    closer()
147
147
148
148
@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    """Discard the DB session of a failed task, then close the env."""
    meta.Session.remove()
    closer = celery_app.conf['PYRAMID_CLOSER']
    closer()
154
154
155
155
@signals.task_revoked.connect
def task_revoked_signal(
        request, terminated, signum, expired, **kwargs):
    """Close the pyramid env when a task is revoked; nothing to commit."""
    closer = celery_app.conf['PYRAMID_CLOSER']
    closer()
160
160
161
161
def setup_celery_app(app, root, request, registry, closer, ini_location):
    """
    Configure the global ``celery_app`` from defaults plus the .ini file, and
    stash the pyramid environment on its conf so tasks can later rebuild a
    request context (see ``RequestContextTask``).

    :param app: WSGI app, or None when called from the web process
    :param root: pyramid root object, or None
    :param request: pyramid request, or None
    :param registry: pyramid registry
    :param closer: callable releasing the bootstrapped env, or None
    :param ini_location: path of the .ini file celery settings are read from
    """
    ini_dir = os.path.dirname(os.path.abspath(ini_location))
    # work on a copy: mutating the shared ``base_celery_config`` in place
    # would leak per-call settings into every later call of this function
    celery_config = base_celery_config.copy()
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
    })
    ini_settings = get_ini_config(ini_location)
    log.debug('Got custom celery conf: %s', ini_settings)

    celery_config.update(ini_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({'PYRAMID_APP': app})
    celery_app.conf.update({'PYRAMID_ROOT': root})
    celery_app.conf.update({'PYRAMID_REQUEST': request})
    celery_app.conf.update({'PYRAMID_REGISTRY': registry})
    celery_app.conf.update({'PYRAMID_CLOSER': closer})
180
180
181
181
def configure_celery(config, ini_location):
    """
    Helper that is called from our application creation logic. It gives
    connection info into running webapp and allows execution of tasks from
    RhodeCode itself
    """
    # store some globals into rhodecode
    rhodecode.CELERY_ENABLED = str2bool(
        config.registry.settings.get('use_celery'))
    if not rhodecode.CELERY_ENABLED:
        return

    log.info('Configuring celery based on `%s` file', ini_location)
    setup_celery_app(
        app=None, root=None, request=None, registry=config.registry,
        closer=None, ini_location=ini_location)
196
196
197
197
def maybe_prepare_env(req):
    """
    Best-effort extraction of the WSGI environ keys needed to rebuild the
    request routing context on a celery worker.

    :param req: the (pyramid) request the task is queued from; may be None or
        an object without a usable ``environ`` when tasks are executed at
        non-pyramid level
    :return: dict with routing-related environ keys, or an empty dict when
        the environ cannot be extracted
    """
    environ = {}
    try:
        source = req.environ
        environ = {
            'PATH_INFO': source['PATH_INFO'],
            'SCRIPT_NAME': source['SCRIPT_NAME'],
            # fall back to SERVER_NAME when the client sent no Host header
            'HTTP_HOST': source.get('HTTP_HOST', source['SERVER_NAME']),
            'SERVER_NAME': source['SERVER_NAME'],
            'SERVER_PORT': source['SERVER_PORT'],
            'wsgi.url_scheme': source['wsgi.url_scheme'],
        }
    except (AttributeError, KeyError, TypeError):
        # no request / no environ attribute / incomplete environ: keep the
        # deliberate best-effort behaviour and return an empty dict, but
        # without swallowing unrelated exceptions via a bare Exception catch
        environ = {}

    return environ
214
215
class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request
    that created the task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """ queue the job to run (we are in web request context here) """

        req = get_current_request()

        # web case
        if hasattr(req, 'user'):
            ip_addr = req.user.ip_addr
            user_id = req.user.user_id

        # api case
        elif hasattr(req, 'rpc_user'):
            ip_addr = req.rpc_user.ip_addr
            user_id = req.rpc_user.user_id
        else:
            # queuing outside a web/api request context is unsupported: we
            # would have no user identity to propagate to the worker
            raise Exception(
                'Unable to fetch required data from request: {}. \n'
                'This task is required to be executed from context of '
                'request in a webapp'.format(repr(req)))

        if req:
            # we hook into kwargs since it is the only way to pass our data to
            # the celery worker
            # environ may be {} when req has no usable WSGI environ
            environ = maybe_prepare_env(req)
            options['headers'] = options.get('headers', {})
            options['headers'].update({
                'rhodecode_proxy_data': {
                    'environ': environ,
                    'auth_user': {
                        'ip_addr': ip_addr,
                        'user_id': user_id
                    },
                }
            })

        return super(RequestContextTask, self).apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)

    def __call__(self, *args, **kwargs):
        """ rebuild the context and then run task on celery worker """

        # rhodecode_proxy_data is carried over in the message headers set by
        # apply_async(); absent when the task was queued without a request
        proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
        if not proxy_data:
            return super(RequestContextTask, self).__call__(*args, **kwargs)

        log.debug('using celery proxy data to run task: %r', proxy_data)
        # re-inject and register threadlocals for proper routing support
        request = prepare_request(proxy_data['environ'])
        request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
                                ip_addr=proxy_data['auth_user']['ip_addr'])

        return super(RequestContextTask, self).__call__(*args, **kwargs)
265
276
General Comments 0
You need to be logged in to leave comments. Login now