utils.py
138 lines
| 3.8 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r2359 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import logging | ||||
import datetime | ||||
r3390 | import time | |||
r2359 | ||||
from functools import partial | ||||
r4927 | import configparser | |||
r2359 | from celery.result import AsyncResult | |||
import celery.loaders.base | ||||
import celery.schedules | ||||
r5065 | from rhodecode.lib.ext_json import sjson as json | |||
r2359 | log = logging.getLogger(__name__) | |||
def get_task_id(task):
    """
    Return the task id carried by ``task``.

    :param task: any object; only ``AsyncResult`` instances carry an id.
    :returns: ``task.task_id`` when ``task`` is an ``AsyncResult``,
        otherwise ``None``.
    """
    if not isinstance(task, AsyncResult):
        return None
    return task.task_id
def crontab(value):
    """
    Build a ``celery.schedules.crontab`` from a mapping of keyword arguments.

    :param value: mapping unpacked as keyword arguments of
        ``celery.schedules.crontab``.
    :returns: the constructed crontab schedule.
    """
    make_crontab = celery.schedules.crontab
    return make_crontab(**value)
def timedelta(value):
    """
    Build a ``datetime.timedelta`` from a mapping of keyword arguments.

    :param value: mapping unpacked as keyword arguments of
        ``datetime.timedelta`` (e.g. ``{'seconds': 30}``).
    :returns: the constructed ``datetime.timedelta``.
    """
    make_delta = datetime.timedelta
    return make_delta(**value)
def safe_json(get, section, key):
    """
    Read option ``key`` through ``get`` and decode it as JSON.

    :param get: callable taking the option name and returning its raw value.
    :param section: section name; used only to build the error message.
    :param key: name of the option to read and decode.
    :returns: the decoded JSON value.
    :raises ValueError: when reading or decoding raises ``ValueError``;
        the original error is attached as ``__cause__``.
    """
    # Stays empty in the error message if `get` itself is what failed.
    value = ''
    try:
        value = get(key)
        json_value = json.loads(value)
    except ValueError as err:
        msg = f'The {key}={value} is not valid json in section {section}'
        # Chain explicitly so the decoder's position details are not lost.
        raise ValueError(msg) from err
    return json_value
def raw_2_schedule(schedule_value, schedule_type):
    """
    Convert a raw schedule value into a schedule object.

    :param schedule_value: raw value handed to the converter selected by
        ``schedule_type``.
    :param schedule_type: one of ``'crontab'``, ``'timedelta'`` or
        ``'integer'``.
    :returns: the converted schedule, or ``None`` when the converter raised
        ``TypeError`` (the failure is logged).
    :raises ValueError: when ``schedule_type`` is not a known type.
    """
    converters = {
        'crontab': crontab,
        'timedelta': timedelta,
        'integer': int
    }
    if schedule_type not in converters:
        raise ValueError(f'schedule type {schedule_type} in section is invalid')

    convert = converters[schedule_type]
    try:
        return convert(schedule_value)
    except TypeError:
        # Only TypeError is treated as "cannot compose"; anything else propagates.
        log.exception('Failed to compose a schedule from value: %r', schedule_value)
        return None
def get_beat_config(parser, section):
    """
    Build a beat entry configuration dict from one section of ``parser``.

    Always reads the ``type``, ``schedule`` (JSON) and ``task`` options.
    ``args`` and ``kwargs`` are JSON-decoded when present, and
    ``force_update`` is copied as returned by ``parser.get`` when present.

    :param parser: object exposing ``get(section, option)`` and
        ``has_option(section, option)``.
    :param section: name of the section to read.
    :returns: dict with ``schedule_type``, ``schedule_value`` and ``task``,
        plus ``schedule`` (only when the converted schedule is truthy) and
        any of the optional keys found in the section.
    """
    read_option = partial(parser.get, section)
    option_exists = partial(parser.has_option, section)

    raw_type = read_option('type')
    raw_value = safe_json(read_option, section, 'schedule')

    beat_entry = {
        'schedule_type': raw_type,
        'schedule_value': raw_value,
        'task': read_option('task'),
    }

    parsed_schedule = raw_2_schedule(raw_value, raw_type)
    if parsed_schedule:
        beat_entry['schedule'] = parsed_schedule

    # Optional JSON-encoded options, checked in this order.
    for json_option in ('args', 'kwargs'):
        if option_exists(json_option):
            beat_entry[json_option] = safe_json(read_option, section, json_option)

    if option_exists('force_update'):
        beat_entry['force_update'] = read_option('force_update')

    return beat_entry
def parse_ini_vars(ini_vars):
    """
    Parse a comma separated list of ``key=value`` pairs into a dict.

    Only the first ``=`` of each pair separates the key from the value, so
    values may themselves contain ``=``. Empty segments (for example from a
    trailing comma) are skipped.

    :param ini_vars: string such as ``'a=1,b=2'``; ``None`` or an empty
        string yields an empty dict.
    :returns: dict mapping each key to its string value.
    :raises ValueError: when a non-empty segment contains no ``=``.
    """
    options = {}
    if not ini_vars:
        return options

    for pair in ini_vars.split(','):
        if not pair:
            continue
        # partition splits on the first '=' only, keeping any later ones in the value
        key, separator, value = pair.partition('=')
        if not separator:
            raise ValueError(f'ini var {pair!r} is not in key=value format')
        options[key] = value
    return options
r3390 | ||||
def ping_db():
    """
    Probe the database until a test query succeeds, retrying on failure.

    Looks up the ``default`` user up to 10 times, sleeping one second after
    each failed attempt. A failed attempt is swallowed and retried; when
    every attempt fails a warning is logged and the function returns
    normally. ``meta.Session.remove()`` is called after every attempt.
    """
    log.info('Testing DB connection...')

    from sqlalchemy import text
    from rhodecode.model import meta

    qry = text("SELECT user_id from users where username = :uname")
    max_attempts = 10
    retry = 1  # seconds to wait between attempts

    for _attempt in range(max_attempts):
        try:
            engine = meta.get_engine()
            with meta.SA_Session(engine) as session:
                result = session.execute(qry, {'uname': 'default'})
                user_id = result.first()[0]
                log.debug('DB PING user_id:%s', user_id)
            break
        except Exception:
            # Best-effort probe: any failure (including a missing row) only
            # triggers another attempt; keep the traceback for debugging.
            log.debug('DB not ready, next try in %ss', retry, exc_info=True)
            time.sleep(retry)
        finally:
            meta.Session.remove()
    else:
        # Reached only when no attempt hit `break`, i.e. all of them failed.
        log.warning('DB still not ready after %s attempts, giving up', max_attempts)