utils.py
169 lines
| 4.5 KiB
| text/x-python
|
PythonLexer
r2359 | # -*- coding: utf-8 -*- | |||
r3363 | # Copyright (C) 2010-2019 RhodeCode GmbH | |||
r2359 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import os | ||||
import json | ||||
import logging | ||||
import datetime | ||||
from functools import partial | ||||
from pyramid.compat import configparser | ||||
from celery.result import AsyncResult | ||||
import celery.loaders.base | ||||
import celery.schedules | ||||
log = logging.getLogger(__name__) | ||||
def get_task_id(task): | ||||
task_id = None | ||||
if isinstance(task, AsyncResult): | ||||
task_id = task.task_id | ||||
return task_id | ||||
def crontab(value): | ||||
return celery.schedules.crontab(**value) | ||||
def timedelta(value): | ||||
return datetime.timedelta(**value) | ||||
def safe_json(get, section, key): | ||||
value = '' | ||||
try: | ||||
value = get(key) | ||||
json_value = json.loads(value) | ||||
except ValueError: | ||||
msg = 'The %s=%s is not valid json in section %s' % ( | ||||
key, value, section | ||||
) | ||||
raise ValueError(msg) | ||||
return json_value | ||||
r2406 | def raw_2_schedule(schedule_value, schedule_type): | |||
schedule_type_map = { | ||||
r2359 | 'crontab': crontab, | |||
'timedelta': timedelta, | ||||
'integer': int | ||||
} | ||||
r2406 | scheduler_cls = schedule_type_map.get(schedule_type) | |||
if scheduler_cls is None: | ||||
raise ValueError( | ||||
'schedule type %s in section is invalid' % ( | ||||
schedule_type, | ||||
) | ||||
) | ||||
try: | ||||
schedule = scheduler_cls(schedule_value) | ||||
except TypeError: | ||||
log.exception('Failed to compose a schedule from value: %r', schedule_value) | ||||
schedule = None | ||||
return schedule | ||||
def get_beat_config(parser, section): | ||||
r2359 | get = partial(parser.get, section) | |||
has_option = partial(parser.has_option, section) | ||||
schedule_type = get('type') | ||||
schedule_value = safe_json(get, section, 'schedule') | ||||
config = { | ||||
r2406 | 'schedule_type': schedule_type, | |||
'schedule_value': schedule_value, | ||||
r2359 | 'task': get('task'), | |||
} | ||||
r2406 | schedule = raw_2_schedule(schedule_value, schedule_type) | |||
if schedule: | ||||
config['schedule'] = schedule | ||||
r2359 | ||||
if has_option('args'): | ||||
config['args'] = safe_json(get, section, 'args') | ||||
if has_option('kwargs'): | ||||
config['kwargs'] = safe_json(get, section, 'kwargs') | ||||
r2406 | if has_option('force_update'): | |||
config['force_update'] = get('force_update') | ||||
r2359 | return config | |||
def get_ini_config(ini_location): | ||||
""" | ||||
Converts basic ini configuration into celery 4.X options | ||||
""" | ||||
def key_converter(key_name): | ||||
pref = 'celery.' | ||||
if key_name.startswith(pref): | ||||
return key_name[len(pref):].replace('.', '_').lower() | ||||
def type_converter(parsed_key, value): | ||||
# cast to int | ||||
if value.isdigit(): | ||||
return int(value) | ||||
# cast to bool | ||||
if value.lower() in ['true', 'false', 'True', 'False']: | ||||
return value.lower() == 'true' | ||||
return value | ||||
parser = configparser.SafeConfigParser( | ||||
defaults={'here': os.path.abspath(ini_location)}) | ||||
parser.read(ini_location) | ||||
ini_config = {} | ||||
for k, v in parser.items('app:main'): | ||||
pref = 'celery.' | ||||
if k.startswith(pref): | ||||
ini_config[key_converter(k)] = type_converter(key_converter(k), v) | ||||
beat_config = {} | ||||
for section in parser.sections(): | ||||
if section.startswith('celerybeat:'): | ||||
name = section.split(':', 1)[1] | ||||
beat_config[name] = get_beat_config(parser, section) | ||||
# final compose of settings | ||||
celery_settings = {} | ||||
if ini_config: | ||||
celery_settings.update(ini_config) | ||||
if beat_config: | ||||
celery_settings.update({'beat_schedule': beat_config}) | ||||
return celery_settings | ||||
def parse_ini_vars(ini_vars): | ||||
options = {} | ||||
for pairs in ini_vars.split(','): | ||||
key, value = pairs.split('=') | ||||
options[key] = value | ||||
return options | ||||