celery: ping db connection before task execution to recycle db connections.
marcink
r3390:02f7713a default
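In short, this commit hooks Celery's task_prerun signal up to a new ping_db() helper so every worker validates (and, if stale, recycles) its pooled database connection before running a task. Below is a condensed sketch of the pattern, with the query, retry loop and session handling taken from the diff that follows; it is an illustration, not a drop-in replacement for the full modules:

    import time

    from celery import signals

    from rhodecode.model import meta
    from rhodecode.model.db import DbMigrateVersion


    def ping_db():
        # a cheap query validates the pooled connection; retry while the DB
        # comes up, and always hand the session back to the scoped registry
        for _ in range(10):
            try:
                DbMigrateVersion.query().scalar()
                break
            except Exception:
                time.sleep(1)
            finally:
                meta.Session.remove()


    @signals.task_prerun.connect
    def task_prerun_signal(task_id, task, args, **kwargs):
        # runs in the worker right before each task body executes
        ping_db()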
@@ -1,302 +1,306 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2010-2019 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Celery loader, run with::

    celery worker \
        --beat \
        --app rhodecode.lib.celerylib.loader \
        --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
        --loglevel DEBUG --ini=._dev/dev.ini
"""
import os
import logging
import importlib

from celery import Celery
from celery import signals
from celery import Task
from celery import exceptions  # pragma: no cover
from kombu.serialization import register
from pyramid.threadlocal import get_current_request

import rhodecode

from rhodecode.lib.auth import AuthUser
-from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
+from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars, ping_db
from rhodecode.lib.ext_json import json
from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
from rhodecode.lib.utils2 import str2bool
from rhodecode.model import meta


register('json_ext', json.dumps, json.loads,
         content_type='application/x-json-ext',
         content_encoding='utf-8')

log = logging.getLogger('celery.rhodecode.loader')


def add_preload_arguments(parser):
    parser.add_argument(
        '--ini', default=None,
        help='Path to ini configuration file.'
    )
    parser.add_argument(
        '--ini-var', default=None,
        help='Comma separated list of key=value to pass to ini.'
    )


def get_logger(obj):
    custom_log = logging.getLogger(
        'rhodecode.task.{}'.format(obj.__class__.__name__))

    if rhodecode.CELERY_ENABLED:
        try:
            custom_log = obj.get_logger()
        except Exception:
            pass

    return custom_log


imports = ['rhodecode.lib.celerylib.tasks']

try:
    # try if we have EE tasks available
    importlib.import_module('rc_ee')
    imports.append('rc_ee.lib.celerylib.tasks')
except ImportError:
    pass


base_celery_config = {
    'result_backend': 'rpc://',
    'result_expires': 60 * 60 * 24,
    'result_persistent': True,
    'imports': imports,
    'worker_max_tasks_per_child': 100,
    'accept_content': ['json_ext'],
    'task_serializer': 'json_ext',
    'result_serializer': 'json_ext',
    'worker_hijack_root_logger': False,
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    }
}
# init main celery app
celery_app = Celery()
celery_app.user_options['preload'].add(add_preload_arguments)
ini_file_glob = None


@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    setup_logging(ini_file_glob)


@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    ini_location = options['ini']
    ini_vars = options['ini_var']
    celery_app.conf['INI_PYRAMID'] = options['ini']

    if ini_location is None:
        print('You must provide the paste --ini argument')
        exit(-1)

    options = None
    if ini_vars is not None:
        options = parse_ini_vars(ini_vars)

    global ini_file_glob
    ini_file_glob = ini_location

    log.debug('Bootstrapping RhodeCode application...')
    env = bootstrap(ini_location, options=options)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        ini_location=ini_location)

    # fix the global flag even if it's disabled via .ini file because this
    # is a worker code that doesn't need this to be disabled.
    rhodecode.CELERY_ENABLED = True


+@signals.task_prerun.connect
+def task_prerun_signal(task_id, task, args, **kwargs):
+    ping_db()
+
+
@signals.task_success.connect
def task_success_signal(result, **kwargs):
    meta.Session.commit()
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_retry.connect
def task_retry_signal(
        request, reason, einfo, **kwargs):
    meta.Session.remove()
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    from rhodecode.lib.exc_tracking import store_exception

    meta.Session.remove()

    # simulate sys.exc_info()
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_revoked.connect
def task_revoked_signal(
        request, terminated, signum, expired, **kwargs):
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


def setup_celery_app(app, root, request, registry, closer, ini_location):
    ini_dir = os.path.dirname(os.path.abspath(ini_location))
    celery_config = base_celery_config
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
    })
    ini_settings = get_ini_config(ini_location)
    log.debug('Got custom celery conf: %s', ini_settings)

    celery_config.update(ini_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({'PYRAMID_APP': app})
    celery_app.conf.update({'PYRAMID_ROOT': root})
    celery_app.conf.update({'PYRAMID_REQUEST': request})
    celery_app.conf.update({'PYRAMID_REGISTRY': registry})
    celery_app.conf.update({'PYRAMID_CLOSER': closer})


def configure_celery(config, ini_location):
    """
    Helper that is called from our application creation logic. It gives
    connection info into running webapp and allows execution of tasks from
    RhodeCode itself
    """
    # store some globals into rhodecode
    rhodecode.CELERY_ENABLED = str2bool(
        config.registry.settings.get('use_celery'))
    if rhodecode.CELERY_ENABLED:
        log.info('Configuring celery based on `%s` file', ini_location)
        setup_celery_app(
            app=None, root=None, request=None, registry=config.registry,
            closer=None, ini_location=ini_location)


def maybe_prepare_env(req):
    environ = {}
    try:
        environ.update({
            'PATH_INFO': req.environ['PATH_INFO'],
            'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
-            'HTTP_HOST':
-                req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
+            'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
            'SERVER_NAME': req.environ['SERVER_NAME'],
            'SERVER_PORT': req.environ['SERVER_PORT'],
            'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
        })
    except Exception:
        pass

    return environ


class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request
    that created the task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """ queue the job to run (we are in web request context here) """

        req = get_current_request()

        # web case
        if hasattr(req, 'user'):
            ip_addr = req.user.ip_addr
            user_id = req.user.user_id

        # api case
        elif hasattr(req, 'rpc_user'):
            ip_addr = req.rpc_user.ip_addr
            user_id = req.rpc_user.user_id
        else:
            raise Exception(
                'Unable to fetch required data from request: {}. \n'
                'This task is required to be executed from context of '
                'request in a webapp'.format(repr(req)))

        if req:
            # we hook into kwargs since it is the only way to pass our data to
            # the celery worker
            environ = maybe_prepare_env(req)
            options['headers'] = options.get('headers', {})
            options['headers'].update({
                'rhodecode_proxy_data': {
                    'environ': environ,
                    'auth_user': {
                        'ip_addr': ip_addr,
                        'user_id': user_id
                    },
                }
            })

        return super(RequestContextTask, self).apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)

    def __call__(self, *args, **kwargs):
        """ rebuild the context and then run task on celery worker """

        proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
        if not proxy_data:
            return super(RequestContextTask, self).__call__(*args, **kwargs)

        log.debug('using celery proxy data to run task: %r', proxy_data)
        # re-inject and register threadlocals for proper routing support
        request = prepare_request(proxy_data['environ'])
        request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
                                ip_addr=proxy_data['auth_user']['ip_addr'])

        return super(RequestContextTask, self).__call__(*args, **kwargs)
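For orientation (not part of this change): a task that wants the originating web request rebuilt on the worker would use RequestContextTask above as its Celery base class. The task name and body below are hypothetical; base= and bind=True are standard Celery task options:

    from rhodecode.lib.celerylib.loader import celery_app, RequestContextTask


    @celery_app.task(base=RequestContextTask, bind=True)
    def example_task(self, repo_name):
        # by the time the body runs, RequestContextTask.__call__ has already
        # restored the pyramid threadlocal request and AuthUser from the
        # 'rhodecode_proxy_data' header set in apply_async()
        return repo_name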
@@ -1,169 +1,187 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2010-2019 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import os
import json
import logging
import datetime
+import time

from functools import partial

from pyramid.compat import configparser
from celery.result import AsyncResult
import celery.loaders.base
import celery.schedules

-
log = logging.getLogger(__name__)


def get_task_id(task):
    task_id = None
    if isinstance(task, AsyncResult):
        task_id = task.task_id

    return task_id


def crontab(value):
    return celery.schedules.crontab(**value)


def timedelta(value):
    return datetime.timedelta(**value)


def safe_json(get, section, key):
    value = ''
    try:
        value = get(key)
        json_value = json.loads(value)
    except ValueError:
        msg = 'The %s=%s is not valid json in section %s' % (
            key, value, section
        )
        raise ValueError(msg)

    return json_value


def raw_2_schedule(schedule_value, schedule_type):
    schedule_type_map = {
        'crontab': crontab,
        'timedelta': timedelta,
        'integer': int
    }
    scheduler_cls = schedule_type_map.get(schedule_type)

    if scheduler_cls is None:
        raise ValueError(
            'schedule type %s in section is invalid' % (
                schedule_type,
            )
        )
    try:
        schedule = scheduler_cls(schedule_value)
    except TypeError:
        log.exception('Failed to compose a schedule from value: %r', schedule_value)
        schedule = None
    return schedule


def get_beat_config(parser, section):

    get = partial(parser.get, section)
    has_option = partial(parser.has_option, section)

    schedule_type = get('type')
    schedule_value = safe_json(get, section, 'schedule')

    config = {
        'schedule_type': schedule_type,
        'schedule_value': schedule_value,
        'task': get('task'),
    }
    schedule = raw_2_schedule(schedule_value, schedule_type)
    if schedule:
        config['schedule'] = schedule

    if has_option('args'):
        config['args'] = safe_json(get, section, 'args')

    if has_option('kwargs'):
        config['kwargs'] = safe_json(get, section, 'kwargs')

    if has_option('force_update'):
        config['force_update'] = get('force_update')

    return config


def get_ini_config(ini_location):
    """
    Converts basic ini configuration into celery 4.X options
    """
    def key_converter(key_name):
        pref = 'celery.'
        if key_name.startswith(pref):
            return key_name[len(pref):].replace('.', '_').lower()

    def type_converter(parsed_key, value):
        # cast to int
        if value.isdigit():
            return int(value)

        # cast to bool
        if value.lower() in ['true', 'false', 'True', 'False']:
            return value.lower() == 'true'
        return value

    parser = configparser.SafeConfigParser(
        defaults={'here': os.path.abspath(ini_location)})
    parser.read(ini_location)

    ini_config = {}
    for k, v in parser.items('app:main'):
        pref = 'celery.'
        if k.startswith(pref):
            ini_config[key_converter(k)] = type_converter(key_converter(k), v)

    beat_config = {}
    for section in parser.sections():
        if section.startswith('celerybeat:'):
            name = section.split(':', 1)[1]
            beat_config[name] = get_beat_config(parser, section)

    # final compose of settings
    celery_settings = {}

    if ini_config:
        celery_settings.update(ini_config)
    if beat_config:
        celery_settings.update({'beat_schedule': beat_config})

    return celery_settings


def parse_ini_vars(ini_vars):
    options = {}
    for pairs in ini_vars.split(','):
        key, value = pairs.split('=')
        options[key] = value
    return options
+
+
+def ping_db():
+    from rhodecode.model import meta
+    from rhodecode.model.db import DbMigrateVersion
+    log.info('Testing DB connection...')
+
+    for test in range(10):
+        try:
+            scalar = DbMigrateVersion.query().scalar()
+            log.debug('DB PING %s@%s', scalar, scalar.version)
+            break
+        except Exception:
+            retry = 1
+            log.debug('DB not ready, next try in %ss', retry)
+            time.sleep(retry)
+        finally:
+            meta.Session.remove()
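As a rough illustration of the mapping performed by get_ini_config() above, here is a hypothetical ini fragment and the settings dict it would yield. The celery.* option values and the celerybeat:check_repos section are invented for the example; the resulting key names and type casts follow the parsing code in this file:

    # [app:main]
    # celery.broker_url = amqp://rabbitmq:5672/rhodecode
    # celery.worker_max_tasks_per_child = 100
    #
    # [celerybeat:check_repos]
    # type = timedelta
    # schedule = {"minutes": 60}
    # task = rhodecode.lib.celerylib.tasks.check_repos

    import datetime

    expected_settings = {
        'broker_url': 'amqp://rabbitmq:5672/rhodecode',
        'worker_max_tasks_per_child': 100,  # isdigit() -> cast to int
        'beat_schedule': {
            'check_repos': {
                'schedule_type': 'timedelta',
                'schedule_value': {'minutes': 60},
                'task': 'rhodecode.lib.celerylib.tasks.check_repos',
                'schedule': datetime.timedelta(minutes=60),
            },
        },
    }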