celery: ping db connection before task execution to recycle db connections.
marcink - r3390:02f7713a default
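
The change hooks Celery's task_prerun signal so a cheap database round-trip runs right before every task, letting SQLAlchemy detect and recycle stale pooled connections (for example ones dropped by a server-side idle timeout) before the task body touches the database. A minimal standalone sketch of that pattern follows; the engine, app and function names are illustrative assumptions, not RhodeCode's actual wiring.

from celery import Celery, signals
from sqlalchemy import create_engine, text

app = Celery('demo', broker='memory://')     # illustrative app, any broker works
engine = create_engine('sqlite://')          # stand-in for the configured engine

@signals.task_prerun.connect
def ping_db_before_task(task_id, task, args, **kwargs):
    # Issue a trivial query before the task body runs; if the ping fails we
    # dispose of the pool so the next checkout opens a fresh connection
    # instead of the task failing on a dead one.
    try:
        with engine.connect() as conn:
            conn.execute(text('SELECT 1'))
    except Exception:
        engine.dispose()
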
@@ -1,302 +1,306 @@ rhodecode/lib/celerylib/loader.py
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 23 celery worker \
24 24 --beat \
25 25 --app rhodecode.lib.celerylib.loader \
26 26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 27 --loglevel DEBUG --ini=._dev/dev.ini
28 28 """
29 29 import os
30 30 import logging
31 31 import importlib
32 32
33 33 from celery import Celery
34 34 from celery import signals
35 35 from celery import Task
36 36 from celery import exceptions # pragma: no cover
37 37 from kombu.serialization import register
38 38 from pyramid.threadlocal import get_current_request
39 39
40 40 import rhodecode
41 41
42 42 from rhodecode.lib.auth import AuthUser
43 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
43 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars, ping_db
44 44 from rhodecode.lib.ext_json import json
45 45 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
46 46 from rhodecode.lib.utils2 import str2bool
47 47 from rhodecode.model import meta
48 48
49 49
50 50 register('json_ext', json.dumps, json.loads,
51 51 content_type='application/x-json-ext',
52 52 content_encoding='utf-8')
53 53
54 54 log = logging.getLogger('celery.rhodecode.loader')
55 55
56 56
57 57 def add_preload_arguments(parser):
58 58 parser.add_argument(
59 59 '--ini', default=None,
60 60 help='Path to ini configuration file.'
61 61 )
62 62 parser.add_argument(
63 63 '--ini-var', default=None,
64 64 help='Comma separated list of key=value to pass to ini.'
65 65 )
66 66
67 67
68 68 def get_logger(obj):
69 69 custom_log = logging.getLogger(
70 70 'rhodecode.task.{}'.format(obj.__class__.__name__))
71 71
72 72 if rhodecode.CELERY_ENABLED:
73 73 try:
74 74 custom_log = obj.get_logger()
75 75 except Exception:
76 76 pass
77 77
78 78 return custom_log
79 79
80 80
81 81 imports = ['rhodecode.lib.celerylib.tasks']
82 82
83 83 try:
84 84 # try if we have EE tasks available
85 85 importlib.import_module('rc_ee')
86 86 imports.append('rc_ee.lib.celerylib.tasks')
87 87 except ImportError:
88 88 pass
89 89
90 90
91 91 base_celery_config = {
92 92 'result_backend': 'rpc://',
93 93 'result_expires': 60 * 60 * 24,
94 94 'result_persistent': True,
95 95 'imports': imports,
96 96 'worker_max_tasks_per_child': 100,
97 97 'accept_content': ['json_ext'],
98 98 'task_serializer': 'json_ext',
99 99 'result_serializer': 'json_ext',
100 100 'worker_hijack_root_logger': False,
101 101 'database_table_names': {
102 102 'task': 'beat_taskmeta',
103 103 'group': 'beat_groupmeta',
104 104 }
105 105 }
106 106 # init main celery app
107 107 celery_app = Celery()
108 108 celery_app.user_options['preload'].add(add_preload_arguments)
109 109 ini_file_glob = None
110 110
111 111
112 112 @signals.setup_logging.connect
113 113 def setup_logging_callback(**kwargs):
114 114 setup_logging(ini_file_glob)
115 115
116 116
117 117 @signals.user_preload_options.connect
118 118 def on_preload_parsed(options, **kwargs):
119 119 ini_location = options['ini']
120 120 ini_vars = options['ini_var']
121 121 celery_app.conf['INI_PYRAMID'] = options['ini']
122 122
123 123 if ini_location is None:
124 124 print('You must provide the paste --ini argument')
125 125 exit(-1)
126 126
127 127 options = None
128 128 if ini_vars is not None:
129 129 options = parse_ini_vars(ini_vars)
130 130
131 131 global ini_file_glob
132 132 ini_file_glob = ini_location
133 133
134 134 log.debug('Bootstrapping RhodeCode application...')
135 135 env = bootstrap(ini_location, options=options)
136 136
137 137 setup_celery_app(
138 138 app=env['app'], root=env['root'], request=env['request'],
139 139 registry=env['registry'], closer=env['closer'],
140 140 ini_location=ini_location)
141 141
142 142 # force the global flag on even if it's disabled via the .ini file, because
143 143 # this is worker code, which never needs celery disabled.
144 144 rhodecode.CELERY_ENABLED = True
145 145
146 146
147 @signals.task_prerun.connect
148 def task_prerun_signal(task_id, task, args, **kwargs):
149 ping_db()
150
151
147 152 @signals.task_success.connect
148 153 def task_success_signal(result, **kwargs):
149 154 meta.Session.commit()
150 155 closer = celery_app.conf['PYRAMID_CLOSER']
151 156 if closer:
152 157 closer()
153 158
154 159
155 160 @signals.task_retry.connect
156 161 def task_retry_signal(
157 162 request, reason, einfo, **kwargs):
158 163 meta.Session.remove()
159 164 closer = celery_app.conf['PYRAMID_CLOSER']
160 165 if closer:
161 166 closer()
162 167
163 168
164 169 @signals.task_failure.connect
165 170 def task_failure_signal(
166 171 task_id, exception, args, kwargs, traceback, einfo, **kargs):
167 172 from rhodecode.lib.exc_tracking import store_exception
168 173
169 174 meta.Session.remove()
170 175
171 176 # simulate sys.exc_info()
172 177 exc_info = (einfo.type, einfo.exception, einfo.tb)
173 178 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
174 179
175 180 closer = celery_app.conf['PYRAMID_CLOSER']
176 181 if closer:
177 182 closer()
178 183
179 184
180 185 @signals.task_revoked.connect
181 186 def task_revoked_signal(
182 187 request, terminated, signum, expired, **kwargs):
183 188 closer = celery_app.conf['PYRAMID_CLOSER']
184 189 if closer:
185 190 closer()
186 191
187 192
188 193 def setup_celery_app(app, root, request, registry, closer, ini_location):
189 194 ini_dir = os.path.dirname(os.path.abspath(ini_location))
190 195 celery_config = base_celery_config
191 196 celery_config.update({
192 197 # store celerybeat scheduler db where the .ini file is
193 198 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
194 199 })
195 200 ini_settings = get_ini_config(ini_location)
196 201 log.debug('Got custom celery conf: %s', ini_settings)
197 202
198 203 celery_config.update(ini_settings)
199 204 celery_app.config_from_object(celery_config)
200 205
201 206 celery_app.conf.update({'PYRAMID_APP': app})
202 207 celery_app.conf.update({'PYRAMID_ROOT': root})
203 208 celery_app.conf.update({'PYRAMID_REQUEST': request})
204 209 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
205 210 celery_app.conf.update({'PYRAMID_CLOSER': closer})
206 211
207 212
208 213 def configure_celery(config, ini_location):
209 214 """
210 215 Helper called from our application creation logic. It feeds the
211 216 running webapp's connection info into celery and allows tasks to be
212 217 executed from RhodeCode itself
213 218 """
214 219 # store some globals into rhodecode
215 220 rhodecode.CELERY_ENABLED = str2bool(
216 221 config.registry.settings.get('use_celery'))
217 222 if rhodecode.CELERY_ENABLED:
218 223 log.info('Configuring celery based on `%s` file', ini_location)
219 224 setup_celery_app(
220 225 app=None, root=None, request=None, registry=config.registry,
221 226 closer=None, ini_location=ini_location)
222 227
223 228
224 229 def maybe_prepare_env(req):
225 230 environ = {}
226 231 try:
227 232 environ.update({
228 233 'PATH_INFO': req.environ['PATH_INFO'],
229 234 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
230 'HTTP_HOST':
231 req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
235 'HTTP_HOST':req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
232 236 'SERVER_NAME': req.environ['SERVER_NAME'],
233 237 'SERVER_PORT': req.environ['SERVER_PORT'],
234 238 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
235 239 })
236 240 except Exception:
237 241 pass
238 242
239 243 return environ
240 244
241 245
242 246 class RequestContextTask(Task):
243 247 """
244 248 This is a celery task which will create a rhodecode app instance context
245 249 for the task, patch pyramid with the original request
246 250 that created the task and also add the user to the context.
247 251 """
248 252
249 253 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
250 254 link=None, link_error=None, shadow=None, **options):
251 255 """ queue the job to run (we are in web request context here) """
252 256
253 257 req = get_current_request()
254 258
255 259 # web case
256 260 if hasattr(req, 'user'):
257 261 ip_addr = req.user.ip_addr
258 262 user_id = req.user.user_id
259 263
260 264 # api case
261 265 elif hasattr(req, 'rpc_user'):
262 266 ip_addr = req.rpc_user.ip_addr
263 267 user_id = req.rpc_user.user_id
264 268 else:
265 269 raise Exception(
266 270 'Unable to fetch required data from request: {}. \n'
267 271 'This task is required to be executed from context of '
268 272 'request in a webapp'.format(repr(req)))
269 273
270 274 if req:
271 275 # we hook into kwargs since it is the only way to pass our data to
272 276 # the celery worker
273 277 environ = maybe_prepare_env(req)
274 278 options['headers'] = options.get('headers', {})
275 279 options['headers'].update({
276 280 'rhodecode_proxy_data': {
277 281 'environ': environ,
278 282 'auth_user': {
279 283 'ip_addr': ip_addr,
280 284 'user_id': user_id
281 285 },
282 286 }
283 287 })
284 288
285 289 return super(RequestContextTask, self).apply_async(
286 290 args, kwargs, task_id, producer, link, link_error, shadow, **options)
287 291
288 292 def __call__(self, *args, **kwargs):
289 293 """ rebuild the context and then run task on celery worker """
290 294
291 295 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
292 296 if not proxy_data:
293 297 return super(RequestContextTask, self).__call__(*args, **kwargs)
294 298
295 299 log.debug('using celery proxy data to run task: %r', proxy_data)
296 300 # re-inject and register threadlocals for proper routing support
297 301 request = prepare_request(proxy_data['environ'])
298 302 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
299 303 ip_addr=proxy_data['auth_user']['ip_addr'])
300 304
301 305 return super(RequestContextTask, self).__call__(*args, **kwargs)
302 306
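
RequestContextTask above rides on the task message headers: apply_async() runs in the web process and packs the originating request's environ, user id and IP into a rhodecode_proxy_data header, while __call__() runs on the worker and rebuilds a request and AuthUser from that header before executing the task body. A simplified, illustrative sketch of the same header round-trip, using assumed names rather than RhodeCode's API:

from celery import Celery, Task

app = Celery('demo', broker='memory://')

class ContextTask(Task):
    def apply_async(self, args=None, kwargs=None, **options):
        # Web-process side: smuggle per-request data into the message headers.
        options['headers'] = options.get('headers', {})
        options['headers']['proxy_data'] = {'user_id': 1, 'ip_addr': '127.0.0.1'}
        return super(ContextTask, self).apply_async(args, kwargs, **options)

    def __call__(self, *args, **kwargs):
        # Worker side: custom headers are exposed as attributes on
        # self.request, which is what the code above relies on.
        proxy_data = getattr(self.request, 'proxy_data', None)
        if proxy_data:
            print('task runs as user %(user_id)s from %(ip_addr)s' % proxy_data)
        return super(ContextTask, self).__call__(*args, **kwargs)

@app.task(base=ContextTask)
def example():
    return 'done'
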
@@ -1,169 +1,187 @@ rhodecode/lib/celerylib/utils.py
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import json
23 23 import logging
24 24 import datetime
25 import time
25 26
26 27 from functools import partial
27 28
28 29 from pyramid.compat import configparser
29 30 from celery.result import AsyncResult
30 31 import celery.loaders.base
31 32 import celery.schedules
32 33
33
34 34 log = logging.getLogger(__name__)
35 35
36 36
37 37 def get_task_id(task):
38 38 task_id = None
39 39 if isinstance(task, AsyncResult):
40 40 task_id = task.task_id
41 41
42 42 return task_id
43 43
44 44
45 45 def crontab(value):
46 46 return celery.schedules.crontab(**value)
47 47
48 48
49 49 def timedelta(value):
50 50 return datetime.timedelta(**value)
51 51
52 52
53 53 def safe_json(get, section, key):
54 54 value = ''
55 55 try:
56 56 value = get(key)
57 57 json_value = json.loads(value)
58 58 except ValueError:
59 59 msg = 'The %s=%s is not valid json in section %s' % (
60 60 key, value, section
61 61 )
62 62 raise ValueError(msg)
63 63
64 64 return json_value
65 65
66 66
67 67 def raw_2_schedule(schedule_value, schedule_type):
68 68 schedule_type_map = {
69 69 'crontab': crontab,
70 70 'timedelta': timedelta,
71 71 'integer': int
72 72 }
73 73 scheduler_cls = schedule_type_map.get(schedule_type)
74 74
75 75 if scheduler_cls is None:
76 76 raise ValueError(
77 77 'schedule type %s in section is invalid' % (
78 78 schedule_type,
79 79 )
80 80 )
81 81 try:
82 82 schedule = scheduler_cls(schedule_value)
83 83 except TypeError:
84 84 log.exception('Failed to compose a schedule from value: %r', schedule_value)
85 85 schedule = None
86 86 return schedule
87 87
88 88
89 89 def get_beat_config(parser, section):
90 90
91 91 get = partial(parser.get, section)
92 92 has_option = partial(parser.has_option, section)
93 93
94 94 schedule_type = get('type')
95 95 schedule_value = safe_json(get, section, 'schedule')
96 96
97 97 config = {
98 98 'schedule_type': schedule_type,
99 99 'schedule_value': schedule_value,
100 100 'task': get('task'),
101 101 }
102 102 schedule = raw_2_schedule(schedule_value, schedule_type)
103 103 if schedule:
104 104 config['schedule'] = schedule
105 105
106 106 if has_option('args'):
107 107 config['args'] = safe_json(get, section, 'args')
108 108
109 109 if has_option('kwargs'):
110 110 config['kwargs'] = safe_json(get, section, 'kwargs')
111 111
112 112 if has_option('force_update'):
113 113 config['force_update'] = get('force_update')
114 114
115 115 return config
116 116
117 117
118 118 def get_ini_config(ini_location):
119 119 """
120 120 Converts basic ini configuration into celery 4.X options
121 121 """
122 122 def key_converter(key_name):
123 123 pref = 'celery.'
124 124 if key_name.startswith(pref):
125 125 return key_name[len(pref):].replace('.', '_').lower()
126 126
127 127 def type_converter(parsed_key, value):
128 128 # cast to int
129 129 if value.isdigit():
130 130 return int(value)
131 131
132 132 # cast to bool
133 133 if value.lower() in ['true', 'false', 'True', 'False']:
134 134 return value.lower() == 'true'
135 135 return value
136 136
137 137 parser = configparser.SafeConfigParser(
138 138 defaults={'here': os.path.abspath(ini_location)})
139 139 parser.read(ini_location)
140 140
141 141 ini_config = {}
142 142 for k, v in parser.items('app:main'):
143 143 pref = 'celery.'
144 144 if k.startswith(pref):
145 145 ini_config[key_converter(k)] = type_converter(key_converter(k), v)
146 146
147 147 beat_config = {}
148 148 for section in parser.sections():
149 149 if section.startswith('celerybeat:'):
150 150 name = section.split(':', 1)[1]
151 151 beat_config[name] = get_beat_config(parser, section)
152 152
153 153 # final compose of settings
154 154 celery_settings = {}
155 155
156 156 if ini_config:
157 157 celery_settings.update(ini_config)
158 158 if beat_config:
159 159 celery_settings.update({'beat_schedule': beat_config})
160 160
161 161 return celery_settings
162 162
163 163
164 164 def parse_ini_vars(ini_vars):
165 165 options = {}
166 166 for pairs in ini_vars.split(','):
167 167 key, value = pairs.split('=')
168 168 options[key] = value
169 169 return options
170
171
172 def ping_db():
173 from rhodecode.model import meta
174 from rhodecode.model.db import DbMigrateVersion
175 log.info('Testing DB connection...')
176
177 for test in range(10):
178 try:
179 scalar = DbMigrateVersion.query().scalar()
180 log.debug('DB PING %s@%s', scalar, scalar.version)
181 break
182 except Exception:
183 retry = 1
184 log.debug('DB not ready, next try in %ss', retry)
185 time.sleep(retry)
186 finally:
187 meta.Session.remove()
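
The new ping_db() above fires one trivial query, retries up to ten times with a one-second pause while the database comes up, and always calls meta.Session.remove() so the connection goes back to the pool before any task uses it. A hedged, generic sketch of the same idea with plain SQLAlchemy; the session setup and names are assumptions, not RhodeCode's code:

import logging
import time

from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

log = logging.getLogger(__name__)

engine = create_engine('sqlite://')  # stand-in for the configured engine
Session = scoped_session(sessionmaker(bind=engine))

def ping_db(retries=10, delay=1):
    for attempt in range(retries):
        try:
            Session.execute(text('SELECT 1'))  # trivial round-trip
            log.debug('DB ping OK on attempt %s', attempt + 1)
            break
        except Exception:
            log.debug('DB not ready, next try in %ss', delay)
            time.sleep(delay)
        finally:
            # Always drop the scoped session so the connection returns to the
            # pool and the next task starts from a fresh checkout.
            Session.remove()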