# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import logging
import datetime
import time

from functools import partial

import configparser
from celery.result import AsyncResult
import celery.loaders.base
import celery.schedules

from rhodecode.lib.ext_json import sjson as json

log = logging.getLogger(__name__)


def get_task_id(task):
    """Return ``task.task_id`` for an ``AsyncResult``, otherwise ``None``."""
    task_id = None
    if isinstance(task, AsyncResult):
        task_id = task.task_id
    return task_id


def crontab(value):
    """Build a celery ``crontab`` schedule from a mapping of keyword arguments."""
    return celery.schedules.crontab(**value)


def timedelta(value):
    """Build a ``datetime.timedelta`` from a mapping of keyword arguments."""
    return datetime.timedelta(**value)


def safe_json(get, section, key):
    """
    Read option ``key`` through ``get`` and decode it as JSON.

    :param get: callable taking the option name and returning its raw value
    :param section: section name, used only for the error message
    :param key: option name to read
    :returns: the decoded JSON value
    :raises ValueError: when the raw value is not valid JSON; the original
        decode error is attached as the cause
    """
    value = ''
    try:
        value = get(key)
        json_value = json.loads(value)
    except ValueError as err:
        msg = f'The {key}={value} is not valid json in section {section}'
        # chain the cause so the decoder's position info is not lost
        raise ValueError(msg) from err

    return json_value


def raw_2_schedule(schedule_value, schedule_type):
    """
    Convert a raw schedule value into a schedule object.

    :param schedule_value: decoded value; a mapping of keyword arguments for
        ``crontab``/``timedelta``, or something ``int()`` accepts for ``integer``
    :param schedule_type: one of ``crontab``, ``timedelta``, ``integer``
    :returns: the composed schedule, or ``None`` when composing it raised
        ``TypeError`` (the failure is logged)
    :raises ValueError: when ``schedule_type`` is not a known type
    """
    schedule_type_map = {
        'crontab': crontab,
        'timedelta': timedelta,
        'integer': int
    }
    scheduler_cls = schedule_type_map.get(schedule_type)

    if scheduler_cls is None:
        raise ValueError(f'schedule type {schedule_type} in section is invalid')

    try:
        schedule = scheduler_cls(schedule_value)
    except TypeError:
        # only TypeError is swallowed; any other error propagates to the caller
        log.exception('Failed to compose a schedule from value: %r', schedule_value)
        schedule = None
    return schedule


def get_beat_config(parser, section):
    """
    Build a beat entry configuration dict from one section of ``parser``.

    The ``type``, ``schedule`` and ``task`` options are always read; ``args``,
    ``kwargs`` and ``force_update`` are read only when present.

    :param parser: object exposing ``get(section, option)`` and
        ``has_option(section, option)``
    :param section: name of the section to read
    :returns: dict with ``schedule_type``, ``schedule_value`` and ``task``,
        plus ``schedule`` (only when a truthy schedule could be composed) and
        the optional ``args``, ``kwargs``, ``force_update`` entries
    """
    get = partial(parser.get, section)
    has_option = partial(parser.has_option, section)

    schedule_type = get('type')
    schedule_value = safe_json(get, section, 'schedule')

    config = {
        'schedule_type': schedule_type,
        'schedule_value': schedule_value,
        'task': get('task'),
    }
    schedule = raw_2_schedule(schedule_value, schedule_type)
    if schedule:
        config['schedule'] = schedule

    if has_option('args'):
        config['args'] = safe_json(get, section, 'args')

    if has_option('kwargs'):
        config['kwargs'] = safe_json(get, section, 'kwargs')

    if has_option('force_update'):
        # stored exactly as returned by the parser, no type conversion is done
        config['force_update'] = get('force_update')

    return config


def parse_ini_vars(ini_vars):
    """
    Parse a ``key=value,key2=value2`` string into a dict.

    Only the first ``=`` of each pair separates key from value, so values may
    themselves contain ``=``. Empty segments (empty input, doubled or trailing
    commas) are skipped.

    :param ini_vars: comma separated ``key=value`` pairs
    :returns: dict mapping each key to its value, both as strings
    :raises ValueError: when a non-empty segment contains no ``=``
    """
    options = {}
    for pair in ini_vars.split(','):
        if not pair:
            continue
        key, sep, value = pair.partition('=')
        if not sep:
            raise ValueError(f'Expected key=value pair, got: {pair!r}')
        options[key] = value
    return options


def ping_db():
    """
    Probe the database until a simple query succeeds, at most 10 times.

    Each attempt looks up the ``user_id`` of the ``default`` user. Any exception
    during an attempt counts as "not ready": the function sleeps and retries.
    The scoped session is removed after every attempt. When all attempts fail,
    an error is logged and the function returns without raising.
    """
    log.info('Testing DB connection...')
    from sqlalchemy import text
    from rhodecode.model import meta

    qry = text("SELECT user_id from users where username = :uname")
    attempts = 10
    retry = 1  # seconds to wait between attempts

    for _ in range(attempts):
        try:
            engine = meta.get_engine()
            with meta.SA_Session(engine) as session:
                result = session.execute(qry, {'uname': 'default'})
                user_id = result.first()[0]
                log.debug('DB PING user_id:%s', user_id)
            break
        except Exception:
            log.debug('DB not ready, next try in %ss', retry)
            time.sleep(retry)
        finally:
            meta.Session.remove()
    else:
        # reached only when no attempt hit ``break``
        log.error('DB connection test failed after %s attempts', attempts)