##// END OF EJS Templates
fix(celery): use a lightweight method of db ping to speed up tasks executions
super-admin -
r5290:2ffa4031 default
parent child Browse files
Show More
@@ -1,133 +1,138 b''
1 # Copyright (C) 2010-2023 RhodeCode GmbH
1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18
19 import logging
19 import logging
20 import datetime
20 import datetime
21 import time
21 import time
22
22
23 from functools import partial
23 from functools import partial
24
24
25 import configparser
25 import configparser
26 from celery.result import AsyncResult
26 from celery.result import AsyncResult
27 import celery.loaders.base
27 import celery.loaders.base
28 import celery.schedules
28 import celery.schedules
29
29
30 from rhodecode.lib.ext_json import sjson as json
30 from rhodecode.lib.ext_json import sjson as json
31
31
32 log = logging.getLogger(__name__)
32 log = logging.getLogger(__name__)
33
33
34
34
35 def get_task_id(task):
35 def get_task_id(task):
36 task_id = None
36 task_id = None
37 if isinstance(task, AsyncResult):
37 if isinstance(task, AsyncResult):
38 task_id = task.task_id
38 task_id = task.task_id
39
39
40 return task_id
40 return task_id
41
41
42
42
43 def crontab(value):
43 def crontab(value):
44 return celery.schedules.crontab(**value)
44 return celery.schedules.crontab(**value)
45
45
46
46
47 def timedelta(value):
47 def timedelta(value):
48 return datetime.timedelta(**value)
48 return datetime.timedelta(**value)
49
49
50
50
51 def safe_json(get, section, key):
51 def safe_json(get, section, key):
52 value = ''
52 value = ''
53 try:
53 try:
54 value = get(key)
54 value = get(key)
55 json_value = json.loads(value)
55 json_value = json.loads(value)
56 except ValueError:
56 except ValueError:
57 msg = f'The {key}={value} is not valid json in section {section}'
57 msg = f'The {key}={value} is not valid json in section {section}'
58 raise ValueError(msg)
58 raise ValueError(msg)
59
59
60 return json_value
60 return json_value
61
61
62
62
63 def raw_2_schedule(schedule_value, schedule_type):
63 def raw_2_schedule(schedule_value, schedule_type):
64 schedule_type_map = {
64 schedule_type_map = {
65 'crontab': crontab,
65 'crontab': crontab,
66 'timedelta': timedelta,
66 'timedelta': timedelta,
67 'integer': int
67 'integer': int
68 }
68 }
69 scheduler_cls = schedule_type_map.get(schedule_type)
69 scheduler_cls = schedule_type_map.get(schedule_type)
70
70
71 if scheduler_cls is None:
71 if scheduler_cls is None:
72 raise ValueError(f'schedule type {schedule_type} in section is invalid')
72 raise ValueError(f'schedule type {schedule_type} in section is invalid')
73 try:
73 try:
74 schedule = scheduler_cls(schedule_value)
74 schedule = scheduler_cls(schedule_value)
75 except TypeError:
75 except TypeError:
76 log.exception('Failed to compose a schedule from value: %r', schedule_value)
76 log.exception('Failed to compose a schedule from value: %r', schedule_value)
77 schedule = None
77 schedule = None
78 return schedule
78 return schedule
79
79
80
80
81 def get_beat_config(parser, section):
81 def get_beat_config(parser, section):
82
82
83 get = partial(parser.get, section)
83 get = partial(parser.get, section)
84 has_option = partial(parser.has_option, section)
84 has_option = partial(parser.has_option, section)
85
85
86 schedule_type = get('type')
86 schedule_type = get('type')
87 schedule_value = safe_json(get, section, 'schedule')
87 schedule_value = safe_json(get, section, 'schedule')
88
88
89 config = {
89 config = {
90 'schedule_type': schedule_type,
90 'schedule_type': schedule_type,
91 'schedule_value': schedule_value,
91 'schedule_value': schedule_value,
92 'task': get('task'),
92 'task': get('task'),
93 }
93 }
94 schedule = raw_2_schedule(schedule_value, schedule_type)
94 schedule = raw_2_schedule(schedule_value, schedule_type)
95 if schedule:
95 if schedule:
96 config['schedule'] = schedule
96 config['schedule'] = schedule
97
97
98 if has_option('args'):
98 if has_option('args'):
99 config['args'] = safe_json(get, section, 'args')
99 config['args'] = safe_json(get, section, 'args')
100
100
101 if has_option('kwargs'):
101 if has_option('kwargs'):
102 config['kwargs'] = safe_json(get, section, 'kwargs')
102 config['kwargs'] = safe_json(get, section, 'kwargs')
103
103
104 if has_option('force_update'):
104 if has_option('force_update'):
105 config['force_update'] = get('force_update')
105 config['force_update'] = get('force_update')
106
106
107 return config
107 return config
108
108
109
109
110 def parse_ini_vars(ini_vars):
110 def parse_ini_vars(ini_vars):
111 options = {}
111 options = {}
112 for pairs in ini_vars.split(','):
112 for pairs in ini_vars.split(','):
113 key, value = pairs.split('=')
113 key, value = pairs.split('=')
114 options[key] = value
114 options[key] = value
115 return options
115 return options
116
116
117
117
118 def ping_db():
118 def ping_db():
119 from rhodecode.model import meta
120 from rhodecode.model.db import DbMigrateVersion
121 log.info('Testing DB connection...')
119 log.info('Testing DB connection...')
122
120
121 from sqlalchemy import text
122 from rhodecode.model import meta
123 qry = text("SELECT user_id from users where username = :uname")
123 for test in range(10):
124 for test in range(10):
124 try:
125 try:
125 scalar = DbMigrateVersion.query().scalar()
126 engine = meta.get_engine()
126 log.debug('DB PING %s@%s', scalar, scalar.version)
127 with meta.SA_Session(engine) as session:
128 result = session.execute(qry, {'uname': 'default'})
129 user_id = result.first()[0]
130
131 log.debug('DB PING user_id:%s', user_id)
127 break
132 break
128 except Exception:
133 except Exception:
129 retry = 1
134 retry = 1
130 log.debug('DB not ready, next try in %ss', retry)
135 log.debug('DB not ready, next try in %ss', retry)
131 time.sleep(retry)
136 time.sleep(retry)
132 finally:
137 finally:
133 meta.Session.remove()
138 meta.Session.remove()
General Comments 0
You need to be logged in to leave comments. Login now