utils.py
141 lines
| 3.7 KiB
| text/x-python
|
PythonLexer
r2359 | # -*- coding: utf-8 -*- | |||
r4306 | # Copyright (C) 2010-2020 RhodeCode GmbH | |||
r2359 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import os | ||||
import json | ||||
import logging | ||||
import datetime | ||||
r3390 | import time | |||
r2359 | ||||
from functools import partial | ||||
from pyramid.compat import configparser | ||||
from celery.result import AsyncResult | ||||
import celery.loaders.base | ||||
import celery.schedules | ||||
log = logging.getLogger(__name__) | ||||
def get_task_id(task): | ||||
task_id = None | ||||
if isinstance(task, AsyncResult): | ||||
task_id = task.task_id | ||||
return task_id | ||||
def crontab(value): | ||||
return celery.schedules.crontab(**value) | ||||
def timedelta(value): | ||||
return datetime.timedelta(**value) | ||||
def safe_json(get, section, key): | ||||
value = '' | ||||
try: | ||||
value = get(key) | ||||
json_value = json.loads(value) | ||||
except ValueError: | ||||
msg = 'The %s=%s is not valid json in section %s' % ( | ||||
key, value, section | ||||
) | ||||
raise ValueError(msg) | ||||
return json_value | ||||
r2406 | def raw_2_schedule(schedule_value, schedule_type): | |||
schedule_type_map = { | ||||
r2359 | 'crontab': crontab, | |||
'timedelta': timedelta, | ||||
'integer': int | ||||
} | ||||
r2406 | scheduler_cls = schedule_type_map.get(schedule_type) | |||
if scheduler_cls is None: | ||||
raise ValueError( | ||||
'schedule type %s in section is invalid' % ( | ||||
schedule_type, | ||||
) | ||||
) | ||||
try: | ||||
schedule = scheduler_cls(schedule_value) | ||||
except TypeError: | ||||
log.exception('Failed to compose a schedule from value: %r', schedule_value) | ||||
schedule = None | ||||
return schedule | ||||
def get_beat_config(parser, section): | ||||
r2359 | get = partial(parser.get, section) | |||
has_option = partial(parser.has_option, section) | ||||
schedule_type = get('type') | ||||
schedule_value = safe_json(get, section, 'schedule') | ||||
config = { | ||||
r2406 | 'schedule_type': schedule_type, | |||
'schedule_value': schedule_value, | ||||
r2359 | 'task': get('task'), | |||
} | ||||
r2406 | schedule = raw_2_schedule(schedule_value, schedule_type) | |||
if schedule: | ||||
config['schedule'] = schedule | ||||
r2359 | ||||
if has_option('args'): | ||||
config['args'] = safe_json(get, section, 'args') | ||||
if has_option('kwargs'): | ||||
config['kwargs'] = safe_json(get, section, 'kwargs') | ||||
r2406 | if has_option('force_update'): | |||
config['force_update'] = get('force_update') | ||||
r2359 | return config | |||
def parse_ini_vars(ini_vars): | ||||
options = {} | ||||
for pairs in ini_vars.split(','): | ||||
key, value = pairs.split('=') | ||||
options[key] = value | ||||
return options | ||||
r3390 | ||||
def ping_db(): | ||||
from rhodecode.model import meta | ||||
from rhodecode.model.db import DbMigrateVersion | ||||
log.info('Testing DB connection...') | ||||
for test in range(10): | ||||
try: | ||||
scalar = DbMigrateVersion.query().scalar() | ||||
log.debug('DB PING %s@%s', scalar, scalar.version) | ||||
break | ||||
except Exception: | ||||
retry = 1 | ||||
log.debug('DB not ready, next try in %ss', retry) | ||||
time.sleep(retry) | ||||
finally: | ||||
meta.Session.remove() | ||||