##// END OF EJS Templates
fix(celery): use a lightweight method of db ping to speed up tasks executions
super-admin -
r5290:2ffa4031 default
parent child Browse files
Show More
@@ -1,133 +1,138 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import logging
20 20 import datetime
21 21 import time
22 22
23 23 from functools import partial
24 24
25 25 import configparser
26 26 from celery.result import AsyncResult
27 27 import celery.loaders.base
28 28 import celery.schedules
29 29
30 30 from rhodecode.lib.ext_json import sjson as json
31 31
32 32 log = logging.getLogger(__name__)
33 33
34 34
35 35 def get_task_id(task):
36 36 task_id = None
37 37 if isinstance(task, AsyncResult):
38 38 task_id = task.task_id
39 39
40 40 return task_id
41 41
42 42
43 43 def crontab(value):
44 44 return celery.schedules.crontab(**value)
45 45
46 46
47 47 def timedelta(value):
48 48 return datetime.timedelta(**value)
49 49
50 50
51 51 def safe_json(get, section, key):
52 52 value = ''
53 53 try:
54 54 value = get(key)
55 55 json_value = json.loads(value)
56 56 except ValueError:
57 57 msg = f'The {key}={value} is not valid json in section {section}'
58 58 raise ValueError(msg)
59 59
60 60 return json_value
61 61
62 62
63 63 def raw_2_schedule(schedule_value, schedule_type):
64 64 schedule_type_map = {
65 65 'crontab': crontab,
66 66 'timedelta': timedelta,
67 67 'integer': int
68 68 }
69 69 scheduler_cls = schedule_type_map.get(schedule_type)
70 70
71 71 if scheduler_cls is None:
72 72 raise ValueError(f'schedule type {schedule_type} in section is invalid')
73 73 try:
74 74 schedule = scheduler_cls(schedule_value)
75 75 except TypeError:
76 76 log.exception('Failed to compose a schedule from value: %r', schedule_value)
77 77 schedule = None
78 78 return schedule
79 79
80 80
81 81 def get_beat_config(parser, section):
82 82
83 83 get = partial(parser.get, section)
84 84 has_option = partial(parser.has_option, section)
85 85
86 86 schedule_type = get('type')
87 87 schedule_value = safe_json(get, section, 'schedule')
88 88
89 89 config = {
90 90 'schedule_type': schedule_type,
91 91 'schedule_value': schedule_value,
92 92 'task': get('task'),
93 93 }
94 94 schedule = raw_2_schedule(schedule_value, schedule_type)
95 95 if schedule:
96 96 config['schedule'] = schedule
97 97
98 98 if has_option('args'):
99 99 config['args'] = safe_json(get, section, 'args')
100 100
101 101 if has_option('kwargs'):
102 102 config['kwargs'] = safe_json(get, section, 'kwargs')
103 103
104 104 if has_option('force_update'):
105 105 config['force_update'] = get('force_update')
106 106
107 107 return config
108 108
109 109
110 110 def parse_ini_vars(ini_vars):
111 111 options = {}
112 112 for pairs in ini_vars.split(','):
113 113 key, value = pairs.split('=')
114 114 options[key] = value
115 115 return options
116 116
117 117
118 118 def ping_db():
119 from rhodecode.model import meta
120 from rhodecode.model.db import DbMigrateVersion
121 119 log.info('Testing DB connection...')
122 120
121 from sqlalchemy import text
122 from rhodecode.model import meta
123 qry = text("SELECT user_id from users where username = :uname")
123 124 for test in range(10):
124 125 try:
125 scalar = DbMigrateVersion.query().scalar()
126 log.debug('DB PING %s@%s', scalar, scalar.version)
126 engine = meta.get_engine()
127 with meta.SA_Session(engine) as session:
128 result = session.execute(qry, {'uname': 'default'})
129 user_id = result.first()[0]
130
131 log.debug('DB PING user_id:%s', user_id)
127 132 break
128 133 except Exception:
129 134 retry = 1
130 135 log.debug('DB not ready, next try in %ss', retry)
131 136 time.sleep(retry)
132 137 finally:
133 138 meta.Session.remove()
General Comments 0
You need to be logged in to leave comments. Login now