##// END OF EJS Templates
celery: register EE tasks if we have EE enabled
marcink -
r2465:0e246bbe default
parent child Browse files
Show More
@@ -1,284 +1,295 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 Celery loader, run with::
21 Celery loader, run with::
22
22
23 celery worker \
23 celery worker \
24 --beat \
24 --beat \
25 --app rhodecode.lib.celerylib.loader \
25 --app rhodecode.lib.celerylib.loader \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --loglevel DEBUG --ini=._dev/dev.ini
27 --loglevel DEBUG --ini=._dev/dev.ini
28 """
28 """
import os
import sys
import logging
import importlib

from celery import Celery
from celery import signals
from celery import Task
from celery import exceptions  # noqa
from kombu.serialization import register
from pyramid.threadlocal import get_current_request

import rhodecode

from rhodecode.lib.auth import AuthUser
from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
from rhodecode.lib.ext_json import json
from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
from rhodecode.lib.utils2 import str2bool
from rhodecode.model import meta
47
48
48
49
# register the custom serializer backed by rhodecode's ext_json, so task
# payloads round-trip through the same dumps/loads the web app uses; the
# name 'json_ext' must match accept_content/task_serializer below
register('json_ext', json.dumps, json.loads,
         content_type='application/x-json-ext',
         content_encoding='utf-8')

# dedicated logger for the celery loader itself
log = logging.getLogger('celery.rhodecode.loader')
54
55
55
56
def add_preload_arguments(parser):
    """
    Register the custom ``--ini`` / ``--ini-var`` preload options on the
    celery command-line parser.
    """
    option_specs = (
        ('--ini', 'Path to ini configuration file.'),
        ('--ini-var', 'Comma separated list of key=value to pass to ini.'),
    )
    for flag, description in option_specs:
        parser.add_argument(flag, default=None, help=description)
65
66
66
67
def get_logger(obj):
    """
    Return the most appropriate logger for *obj*: the task's own logger
    when celery is enabled (and obtainable), otherwise a per-class
    ``rhodecode.task.*`` logger.
    """
    fallback = logging.getLogger(
        'rhodecode.task.{}'.format(obj.__class__.__name__))

    if not rhodecode.CELERY_ENABLED:
        return fallback

    try:
        return obj.get_logger()
    except Exception:
        # task logger unavailable outside a worker context
        return fallback
78
79
79
80
# celery task modules loaded on worker start; the enterprise tasks are
# appended only when the rc_ee package is importable
imports = ['rhodecode.lib.celerylib.tasks']

try:
    importlib.import_module('rc_ee')
except ImportError:
    pass
else:
    imports.append('rc_ee.lib.celerylib.tasks')
89
90
# baseline celery configuration; ``setup_celery_app`` extends this with the
# beat-schedule location and any overrides parsed from the .ini file
base_celery_config = {
    'result_backend': 'rpc://',
    'result_expires': 60 * 60 * 24,  # keep task results for one day
    'result_persistent': True,
    'imports': imports,  # task modules collected above (CE + optional EE)
    'worker_max_tasks_per_child': 100,  # recycle worker processes after 100 tasks
    'accept_content': ['json_ext'],  # only our custom serializer is accepted
    'task_serializer': 'json_ext',
    'result_serializer': 'json_ext',
    'worker_hijack_root_logger': False,  # keep the .ini logging config intact
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    }
}
# init main celery app
celery_app = Celery()
# expose --ini/--ini-var on the worker command line (see add_preload_arguments)
celery_app.user_options['preload'].add(add_preload_arguments)
# resolved ini path; set in on_preload_parsed, read by setup_logging_callback
ini_file_glob = None
99
110
100
111
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    """Configure worker logging from the .ini file instead of celery's default."""
    setup_logging(ini_file_glob)
104
115
105
116
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """
    Bootstrap the RhodeCode pyramid application once the --ini/--ini-var
    preload options are parsed, then wire the resulting environment into
    the global ``celery_app``.

    Exits the process when --ini was not provided, since the worker cannot
    run without the application configuration.
    """
    ini_location = options['ini']
    ini_vars = options['ini_var']
    celery_app.conf['INI_PYRAMID'] = ini_location

    if ini_location is None:
        print('You must provide the paste --ini argument')
        # use sys.exit instead of the site-provided `exit` builtin, which
        # is not guaranteed to exist (e.g. when running with python -S)
        sys.exit(-1)

    # keep the extra ini overrides in their own name instead of rebinding
    # the `options` parameter
    ini_options = None
    if ini_vars is not None:
        ini_options = parse_ini_vars(ini_vars)

    global ini_file_glob
    ini_file_glob = ini_location

    log.debug('Bootstrapping RhodeCode application...')
    env = bootstrap(ini_location, options=ini_options)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        ini_location=ini_location)

    # fix the global flag even if it's disabled via .ini file because this
    # is a worker code that doesn't need this to be disabled.
    rhodecode.CELERY_ENABLED = True
134
145
135
146
@signals.task_success.connect
def task_success_signal(result, **kwargs):
    """Commit the DB session after a successful task, then release the pyramid env."""
    meta.Session.commit()
    release = celery_app.conf['PYRAMID_CLOSER']
    if release:
        release()
142
153
143
154
@signals.task_retry.connect
def task_retry_signal(request, reason, einfo, **kwargs):
    """Drop the DB session when a task is retried, then release the pyramid env."""
    meta.Session.remove()
    release = celery_app.conf['PYRAMID_CLOSER']
    if release:
        release()
151
162
152
163
@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    """Drop the DB session when a task fails, then release the pyramid env."""
    meta.Session.remove()
    release = celery_app.conf['PYRAMID_CLOSER']
    if release:
        release()
160
171
161
172
@signals.task_revoked.connect
def task_revoked_signal(
        request, terminated, signum, expired, **kwargs):
    """Release the pyramid env when a task is revoked."""
    release = celery_app.conf['PYRAMID_CLOSER']
    if release:
        release()
168
179
169
180
def setup_celery_app(app, root, request, registry, closer, ini_location):
    """
    Configure the global ``celery_app`` from ``base_celery_config`` plus the
    .ini settings, and attach the bootstrapped pyramid objects to its conf
    so tasks/signal handlers can reach them at runtime.

    :param app: pyramid WSGI app (or None when called from the webapp side)
    :param root: pyramid root factory result (or None)
    :param request: bootstrap request (or None)
    :param registry: pyramid registry
    :param closer: callable tearing down the bootstrap env (or None)
    :param ini_location: path to the .ini file driving the configuration
    """
    ini_dir = os.path.dirname(os.path.abspath(ini_location))
    # copy so repeated calls don't mutate the module-level defaults
    celery_config = dict(base_celery_config)
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
    })
    ini_settings = get_ini_config(ini_location)
    log.debug('Got custom celery conf: %s', ini_settings)

    celery_config.update(ini_settings)
    celery_app.config_from_object(celery_config)

    # stash pyramid objects for use by tasks and signal handlers
    celery_app.conf.update({
        'PYRAMID_APP': app,
        'PYRAMID_ROOT': root,
        'PYRAMID_REQUEST': request,
        'PYRAMID_REGISTRY': registry,
        'PYRAMID_CLOSER': closer,
    })
188
199
189
200
def configure_celery(config, ini_location):
    """
    Called from our application creation logic. Records whether celery is
    enabled and, when it is, configures the shared celery app with the
    webapp's registry so tasks can be executed from RhodeCode itself.
    """
    # store some globals into rhodecode
    enabled = str2bool(config.registry.settings.get('use_celery'))
    rhodecode.CELERY_ENABLED = enabled
    if not enabled:
        return

    log.info('Configuring celery based on `%s` file', ini_location)
    setup_celery_app(
        app=None, root=None, request=None, registry=config.registry,
        closer=None, ini_location=ini_location)
204
215
205
216
def maybe_prepare_env(req):
    """
    Best-effort extraction of the WSGI routing keys from *req*'s environ.

    Returns an empty dict when *req* lacks an environ or any required key,
    so callers can always forward the result.
    """
    try:
        prepared = {
            'PATH_INFO': req.environ['PATH_INFO'],
            'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
            'HTTP_HOST':
                req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
            'SERVER_NAME': req.environ['SERVER_NAME'],
            'SERVER_PORT': req.environ['SERVER_PORT'],
            'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
        }
    except Exception:
        prepared = {}

    return prepared
222
233
223
234
class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request
    that created the task and also add the user to the context.

    The web-side ``apply_async`` smuggles the originating request's environ
    and auth user through the message headers; the worker-side ``__call__``
    rebuilds pyramid threadlocals from that data before running the task.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """ queue the job to run (we are in web request context here) """

        # NOTE: req is None outside a pyramid request; hasattr(None, ...) is
        # False for both branches below, so we raise in that case too
        req = get_current_request()

        # web case
        if hasattr(req, 'user'):
            ip_addr = req.user.ip_addr
            user_id = req.user.user_id

        # api case
        elif hasattr(req, 'rpc_user'):
            ip_addr = req.rpc_user.ip_addr
            user_id = req.rpc_user.user_id
        else:
            raise Exception(
                'Unable to fetch required data from request: {}. \n'
                'This task is required to be executed from context of '
                'request in a webapp'.format(repr(req)))

        if req:
            # we hook into kwargs since it is the only way to pass our data to
            # the celery worker
            environ = maybe_prepare_env(req)
            options['headers'] = options.get('headers', {})
            options['headers'].update({
                'rhodecode_proxy_data': {
                    'environ': environ,
                    'auth_user': {
                        'ip_addr': ip_addr,
                        'user_id': user_id
                    },
                }
            })

        return super(RequestContextTask, self).apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)

    def __call__(self, *args, **kwargs):
        """ rebuild the context and then run task on celery worker """

        # header set by apply_async above; absent when the task was queued
        # without request context (e.g. from celerybeat)
        proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
        if not proxy_data:
            return super(RequestContextTask, self).__call__(*args, **kwargs)

        log.debug('using celery proxy data to run task: %r', proxy_data)
        # re-inject and register threadlocals for proper routing support
        request = prepare_request(proxy_data['environ'])
        request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
                                ip_addr=proxy_data['auth_user']['ip_addr'])

        return super(RequestContextTask, self).__call__(*args, **kwargs)
284
295
General Comments 0
You need to be logged in to leave comments. Login now