##// END OF EJS Templates
celery: register EE tasks if we have EE enabled
marcink -
r2465:0e246bbe default
parent child Browse files
Show More
@@ -1,284 +1,295 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 23 celery worker \
24 24 --beat \
25 25 --app rhodecode.lib.celerylib.loader \
26 26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 27 --loglevel DEBUG --ini=._dev/dev.ini
28 28 """
29 29 import os
30 30 import logging
31 import importlib
31 32
32 33 from celery import Celery
33 34 from celery import signals
34 35 from celery import Task
35 36 from celery import exceptions # noqa
36 37 from kombu.serialization import register
37 38 from pyramid.threadlocal import get_current_request
38 39
39 40 import rhodecode
40 41
41 42 from rhodecode.lib.auth import AuthUser
42 43 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
43 44 from rhodecode.lib.ext_json import json
44 45 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
45 46 from rhodecode.lib.utils2 import str2bool
46 47 from rhodecode.model import meta
47 48
48 49
49 50 register('json_ext', json.dumps, json.loads,
50 51 content_type='application/x-json-ext',
51 52 content_encoding='utf-8')
52 53
53 54 log = logging.getLogger('celery.rhodecode.loader')
54 55
55 56
56 57 def add_preload_arguments(parser):
57 58 parser.add_argument(
58 59 '--ini', default=None,
59 60 help='Path to ini configuration file.'
60 61 )
61 62 parser.add_argument(
62 63 '--ini-var', default=None,
63 64 help='Comma separated list of key=value to pass to ini.'
64 65 )
65 66
66 67
67 68 def get_logger(obj):
68 69 custom_log = logging.getLogger(
69 70 'rhodecode.task.{}'.format(obj.__class__.__name__))
70 71
71 72 if rhodecode.CELERY_ENABLED:
72 73 try:
73 74 custom_log = obj.get_logger()
74 75 except Exception:
75 76 pass
76 77
77 78 return custom_log
78 79
79 80
81 imports = ['rhodecode.lib.celerylib.tasks']
82
83 try:
84 # try if we have EE tasks available
85 importlib.import_module('rc_ee')
86 imports.append('rc_ee.lib.celerylib.tasks')
87 except ImportError:
88 pass
89
90
80 91 base_celery_config = {
81 92 'result_backend': 'rpc://',
82 93 'result_expires': 60 * 60 * 24,
83 94 'result_persistent': True,
84 'imports': [],
95 'imports': imports,
85 96 'worker_max_tasks_per_child': 100,
86 97 'accept_content': ['json_ext'],
87 98 'task_serializer': 'json_ext',
88 99 'result_serializer': 'json_ext',
89 100 'worker_hijack_root_logger': False,
90 101 'database_table_names': {
91 102 'task': 'beat_taskmeta',
92 103 'group': 'beat_groupmeta',
93 104 }
94 105 }
95 106 # init main celery app
96 107 celery_app = Celery()
97 108 celery_app.user_options['preload'].add(add_preload_arguments)
98 109 ini_file_glob = None
99 110
100 111
101 112 @signals.setup_logging.connect
102 113 def setup_logging_callback(**kwargs):
103 114 setup_logging(ini_file_glob)
104 115
105 116
106 117 @signals.user_preload_options.connect
107 118 def on_preload_parsed(options, **kwargs):
108 119 ini_location = options['ini']
109 120 ini_vars = options['ini_var']
110 121 celery_app.conf['INI_PYRAMID'] = options['ini']
111 122
112 123 if ini_location is None:
113 124 print('You must provide the paste --ini argument')
114 125 exit(-1)
115 126
116 127 options = None
117 128 if ini_vars is not None:
118 129 options = parse_ini_vars(ini_vars)
119 130
120 131 global ini_file_glob
121 132 ini_file_glob = ini_location
122 133
123 134 log.debug('Bootstrapping RhodeCode application...')
124 135 env = bootstrap(ini_location, options=options)
125 136
126 137 setup_celery_app(
127 138 app=env['app'], root=env['root'], request=env['request'],
128 139 registry=env['registry'], closer=env['closer'],
129 140 ini_location=ini_location)
130 141
131 142 # fix the global flag even if it's disabled via .ini file because this
132 143 # is a worker code that doesn't need this to be disabled.
133 144 rhodecode.CELERY_ENABLED = True
134 145
135 146
136 147 @signals.task_success.connect
137 148 def task_success_signal(result, **kwargs):
138 149 meta.Session.commit()
139 150 closer = celery_app.conf['PYRAMID_CLOSER']
140 151 if closer:
141 152 closer()
142 153
143 154
144 155 @signals.task_retry.connect
145 156 def task_retry_signal(
146 157 request, reason, einfo, **kwargs):
147 158 meta.Session.remove()
148 159 closer = celery_app.conf['PYRAMID_CLOSER']
149 160 if closer:
150 161 closer()
151 162
152 163
153 164 @signals.task_failure.connect
154 165 def task_failure_signal(
155 166 task_id, exception, args, kwargs, traceback, einfo, **kargs):
156 167 meta.Session.remove()
157 168 closer = celery_app.conf['PYRAMID_CLOSER']
158 169 if closer:
159 170 closer()
160 171
161 172
162 173 @signals.task_revoked.connect
163 174 def task_revoked_signal(
164 175 request, terminated, signum, expired, **kwargs):
165 176 closer = celery_app.conf['PYRAMID_CLOSER']
166 177 if closer:
167 178 closer()
168 179
169 180
170 181 def setup_celery_app(app, root, request, registry, closer, ini_location):
171 182 ini_dir = os.path.dirname(os.path.abspath(ini_location))
172 183 celery_config = base_celery_config
173 184 celery_config.update({
174 185 # store celerybeat scheduler db where the .ini file is
175 186 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
176 187 })
177 188 ini_settings = get_ini_config(ini_location)
178 189 log.debug('Got custom celery conf: %s', ini_settings)
179 190
180 191 celery_config.update(ini_settings)
181 192 celery_app.config_from_object(celery_config)
182 193
183 194 celery_app.conf.update({'PYRAMID_APP': app})
184 195 celery_app.conf.update({'PYRAMID_ROOT': root})
185 196 celery_app.conf.update({'PYRAMID_REQUEST': request})
186 197 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
187 198 celery_app.conf.update({'PYRAMID_CLOSER': closer})
188 199
189 200
190 201 def configure_celery(config, ini_location):
191 202 """
192 203 Helper that is called from our application creation logic. It gives
193 204 connection info into running webapp and allows execution of tasks from
194 205 RhodeCode itself
195 206 """
196 207 # store some globals into rhodecode
197 208 rhodecode.CELERY_ENABLED = str2bool(
198 209 config.registry.settings.get('use_celery'))
199 210 if rhodecode.CELERY_ENABLED:
200 211 log.info('Configuring celery based on `%s` file', ini_location)
201 212 setup_celery_app(
202 213 app=None, root=None, request=None, registry=config.registry,
203 214 closer=None, ini_location=ini_location)
204 215
205 216
206 217 def maybe_prepare_env(req):
207 218 environ = {}
208 219 try:
209 220 environ.update({
210 221 'PATH_INFO': req.environ['PATH_INFO'],
211 222 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
212 223 'HTTP_HOST':
213 224 req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
214 225 'SERVER_NAME': req.environ['SERVER_NAME'],
215 226 'SERVER_PORT': req.environ['SERVER_PORT'],
216 227 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
217 228 })
218 229 except Exception:
219 230 pass
220 231
221 232 return environ
222 233
223 234
224 235 class RequestContextTask(Task):
225 236 """
226 237 This is a celery task which will create a rhodecode app instance context
227 238 for the task, patch pyramid with the original request
228 239 that created the task and also add the user to the context.
229 240 """
230 241
231 242 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
232 243 link=None, link_error=None, shadow=None, **options):
233 244 """ queue the job to run (we are in web request context here) """
234 245
235 246 req = get_current_request()
236 247
237 248 # web case
238 249 if hasattr(req, 'user'):
239 250 ip_addr = req.user.ip_addr
240 251 user_id = req.user.user_id
241 252
242 253 # api case
243 254 elif hasattr(req, 'rpc_user'):
244 255 ip_addr = req.rpc_user.ip_addr
245 256 user_id = req.rpc_user.user_id
246 257 else:
247 258 raise Exception(
248 259 'Unable to fetch required data from request: {}. \n'
249 260 'This task is required to be executed from context of '
250 261 'request in a webapp'.format(repr(req)))
251 262
252 263 if req:
253 264 # we hook into kwargs since it is the only way to pass our data to
254 265 # the celery worker
255 266 environ = maybe_prepare_env(req)
256 267 options['headers'] = options.get('headers', {})
257 268 options['headers'].update({
258 269 'rhodecode_proxy_data': {
259 270 'environ': environ,
260 271 'auth_user': {
261 272 'ip_addr': ip_addr,
262 273 'user_id': user_id
263 274 },
264 275 }
265 276 })
266 277
267 278 return super(RequestContextTask, self).apply_async(
268 279 args, kwargs, task_id, producer, link, link_error, shadow, **options)
269 280
270 281 def __call__(self, *args, **kwargs):
271 282 """ rebuild the context and then run task on celery worker """
272 283
273 284 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
274 285 if not proxy_data:
275 286 return super(RequestContextTask, self).__call__(*args, **kwargs)
276 287
277 288 log.debug('using celery proxy data to run task: %r', proxy_data)
278 289 # re-inject and register threadlocals for proper routing support
279 290 request = prepare_request(proxy_data['environ'])
280 291 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
281 292 ip_addr=proxy_data['auth_user']['ip_addr'])
282 293
283 294 return super(RequestContextTask, self).__call__(*args, **kwargs)
284 295
General Comments 0
You need to be logged in to leave comments. Login now