##// END OF EJS Templates
celery: use safer events for execution of tasks
marcink -
r2464:d10039ef default
parent child Browse files
Show More
@@ -1,276 +1,284 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 Celery loader, run with::
21 Celery loader, run with::
22
22
23 celery worker \
23 celery worker \
24 --beat \
24 --beat \
25 --app rhodecode.lib.celerylib.loader \
25 --app rhodecode.lib.celerylib.loader \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --loglevel DEBUG --ini=._dev/dev.ini
27 --loglevel DEBUG --ini=._dev/dev.ini
28 """
28 """
29 import os
29 import os
30 import logging
30 import logging
31
31
32 from celery import Celery
32 from celery import Celery
33 from celery import signals
33 from celery import signals
34 from celery import Task
34 from celery import Task
35 from celery import exceptions # noqa
35 from celery import exceptions # noqa
36 from kombu.serialization import register
36 from kombu.serialization import register
37 from pyramid.threadlocal import get_current_request
37 from pyramid.threadlocal import get_current_request
38
38
39 import rhodecode
39 import rhodecode
40
40
41 from rhodecode.lib.auth import AuthUser
41 from rhodecode.lib.auth import AuthUser
42 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
42 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
43 from rhodecode.lib.ext_json import json
43 from rhodecode.lib.ext_json import json
44 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
44 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
45 from rhodecode.lib.utils2 import str2bool
45 from rhodecode.lib.utils2 import str2bool
46 from rhodecode.model import meta
46 from rhodecode.model import meta
47
47
48
48
49 register('json_ext', json.dumps, json.loads,
49 register('json_ext', json.dumps, json.loads,
50 content_type='application/x-json-ext',
50 content_type='application/x-json-ext',
51 content_encoding='utf-8')
51 content_encoding='utf-8')
52
52
53 log = logging.getLogger('celery.rhodecode.loader')
53 log = logging.getLogger('celery.rhodecode.loader')
54
54
55
55
def add_preload_arguments(parser):
    """Register the RhodeCode preload options on the celery CLI parser.

    Both options default to ``None``; they are consumed later by
    ``on_preload_parsed``.
    """
    for flag, description in (
            ('--ini', 'Path to ini configuration file.'),
            ('--ini-var', 'Comma separated list of key=value to pass to ini.')):
        parser.add_argument(flag, default=None, help=description)
65
65
66
66
def get_logger(obj):
    """Return the best logger for *obj*.

    When celery is enabled and the task exposes ``get_logger()``, use the
    task's own logger; in every other case fall back to a per-class
    ``rhodecode.task.*`` logger.
    """
    fallback = logging.getLogger(
        'rhodecode.task.{}'.format(obj.__class__.__name__))

    if not rhodecode.CELERY_ENABLED:
        return fallback

    try:
        return obj.get_logger()
    except Exception:
        # task objects without a logger silently fall back
        return fallback
78
78
79
79
# Default celery configuration; setup_celery_app() layers the values read
# from the .ini file on top of these defaults.
base_celery_config = {
    'result_backend': 'rpc://',
    'result_expires': 60 * 60 * 24,  # keep results for one day
    'result_persistent': True,
    'imports': [],
    'worker_max_tasks_per_child': 100,
    # custom serializer registered above via kombu register('json_ext', ...)
    'accept_content': ['json_ext'],
    'task_serializer': 'json_ext',
    'result_serializer': 'json_ext',
    'worker_hijack_root_logger': False,
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    }
}
# init main celery app
celery_app = Celery()
celery_app.user_options['preload'].add(add_preload_arguments)
# populated by on_preload_parsed() with the --ini path; read by the
# setup_logging signal handler
ini_file_glob = None
99
99
100
100
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    # configure worker logging from the .ini file captured during preload
    # (ini_file_glob is set by on_preload_parsed)
    setup_logging(ini_file_glob)
104
104
105
105
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """
    Bootstrap the pyramid application once celery has parsed the preload
    arguments (``--ini`` / ``--ini-var`` added by add_preload_arguments)
    and wire it into the module-level ``celery_app``.
    """
    ini_location = options['ini']
    ini_vars = options['ini_var']
    celery_app.conf['INI_PYRAMID'] = ini_location

    if ini_location is None:
        print('You must provide the paste --ini argument')
        # raise SystemExit(1) instead of the site-helper exit(-1): exit()
        # is not guaranteed outside interactive use, and -1 maps to the
        # less conventional exit status 255
        raise SystemExit(1)

    # keep the parsed key=value overrides in their own name instead of
    # rebinding (shadowing) the `options` parameter
    ini_options = None
    if ini_vars is not None:
        ini_options = parse_ini_vars(ini_vars)

    global ini_file_glob
    ini_file_glob = ini_location

    log.debug('Bootstrapping RhodeCode application...')
    env = bootstrap(ini_location, options=ini_options)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        ini_location=ini_location)

    # fix the global flag even if it's disabled via .ini file because this
    # is a worker code that doesn't need this to be disabled.
    rhodecode.CELERY_ENABLED = True
134
134
135
135
@signals.task_success.connect
def task_success_signal(result, **kwargs):
    # flush the task's DB work before the worker reuses the session
    meta.Session.commit()
    # PYRAMID_CLOSER is only present after setup_celery_app() ran; use
    # .get() so an early signal cannot raise KeyError, and skip when the
    # closer was registered as None (configure_celery path)
    closer = celery_app.conf.get('PYRAMID_CLOSER')
    if closer:
        closer()
140
142
141
143
@signals.task_retry.connect
def task_retry_signal(
        request, reason, einfo, **kwargs):
    # drop the (possibly dirty) session before the task is retried
    meta.Session.remove()
    # .get() guards against the signal firing before setup_celery_app()
    # registered PYRAMID_CLOSER (or registered it as None)
    closer = celery_app.conf.get('PYRAMID_CLOSER')
    if closer:
        closer()
147
151
148
152
@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    # discard the failed task's session state
    meta.Session.remove()
    # .get() guards against the signal firing before setup_celery_app()
    # registered PYRAMID_CLOSER (or registered it as None)
    closer = celery_app.conf.get('PYRAMID_CLOSER')
    if closer:
        closer()
154
160
155
161
@signals.task_revoked.connect
def task_revoked_signal(
        request, terminated, signum, expired, **kwargs):
    # .get() guards against the signal firing before setup_celery_app()
    # registered PYRAMID_CLOSER (or registered it as None)
    closer = celery_app.conf.get('PYRAMID_CLOSER')
    if closer:
        closer()
160
168
161
169
def setup_celery_app(app, root, request, registry, closer, ini_location):
    """
    Configure the module-level ``celery_app`` from the defaults plus the
    .ini file, and stash the pyramid environment objects on its conf for
    the signal handlers and tasks to use.
    """
    ini_dir = os.path.dirname(os.path.abspath(ini_location))
    # copy() so repeated calls cannot mutate the shared module-level
    # base_celery_config dict with per-call/.ini-specific settings
    celery_config = base_celery_config.copy()
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
    })
    ini_settings = get_ini_config(ini_location)
    log.debug('Got custom celery conf: %s', ini_settings)

    celery_config.update(ini_settings)
    celery_app.config_from_object(celery_config)

    # single conf.update with all pyramid objects instead of five calls
    celery_app.conf.update({
        'PYRAMID_APP': app,
        'PYRAMID_ROOT': root,
        'PYRAMID_REQUEST': request,
        'PYRAMID_REGISTRY': registry,
        'PYRAMID_CLOSER': closer,
    })
180
188
181
189
def configure_celery(config, ini_location):
    """
    Helper that is called from our application creation logic. It gives
    connection info into running webapp and allows execution of tasks from
    RhodeCode itself
    """
    # store some globals into rhodecode
    enabled = str2bool(config.registry.settings.get('use_celery'))
    rhodecode.CELERY_ENABLED = enabled
    if not enabled:
        return

    log.info('Configuring celery based on `%s` file', ini_location)
    setup_celery_app(
        app=None, root=None, request=None, registry=config.registry,
        closer=None, ini_location=ini_location)
196
204
197
205
def maybe_prepare_env(req):
    """
    Best-effort capture of the WSGI routing keys from *req* so a worker
    can later rebuild an equivalent request.  Returns ``{}`` whenever the
    request or any required key is unavailable.
    """
    environ = {}
    try:
        source = req.environ
        environ.update({
            'PATH_INFO': source['PATH_INFO'],
            'SCRIPT_NAME': source['SCRIPT_NAME'],
            'HTTP_HOST': source.get('HTTP_HOST', source['SERVER_NAME']),
            'SERVER_NAME': source['SERVER_NAME'],
            'SERVER_PORT': source['SERVER_PORT'],
            'wsgi.url_scheme': source['wsgi.url_scheme'],
        })
    except Exception:
        # no request / incomplete environ: fall through with empty dict
        pass

    return environ
214
222
215
223
class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request
    that created the task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """ queue the job to run (we are in web request context here) """

        req = get_current_request()

        if hasattr(req, 'user'):
            # web case
            ip_addr, user_id = req.user.ip_addr, req.user.user_id
        elif hasattr(req, 'rpc_user'):
            # api case
            ip_addr, user_id = req.rpc_user.ip_addr, req.rpc_user.user_id
        else:
            raise Exception(
                'Unable to fetch required data from request: {}. \n'
                'This task is required to be executed from context of '
                'request in a webapp'.format(repr(req)))

        if req:
            # we hook into kwargs since it is the only way to pass our data to
            # the celery worker
            environ = maybe_prepare_env(req)
            headers = options.setdefault('headers', {})
            headers.update({
                'rhodecode_proxy_data': {
                    'environ': environ,
                    'auth_user': {
                        'ip_addr': ip_addr,
                        'user_id': user_id
                    },
                }
            })

        return super(RequestContextTask, self).apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)

    def __call__(self, *args, **kwargs):
        """ rebuild the context and then run task on celery worker """

        proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
        if proxy_data:
            log.debug('using celery proxy data to run task: %r', proxy_data)
            # re-inject and register threadlocals for proper routing support
            auth = proxy_data['auth_user']
            request = prepare_request(proxy_data['environ'])
            request.user = AuthUser(user_id=auth['user_id'],
                                    ip_addr=auth['ip_addr'])

        return super(RequestContextTask, self).__call__(*args, **kwargs)
276
284
General Comments 0
You need to be logged in to leave comments. Login now