##// END OF EJS Templates
release: added 2 ee-only features to release notes
release: added 2 ee-only features to release notes

File last commit:

r5608:6d33e504 default
r5661:2ba2d4b2 default
Show More
utils.py
138 lines | 3.8 KiB | text/x-python | PythonLexer
core: updated copyright to 2024
r5608 # Copyright (C) 2010-2024 RhodeCode GmbH
celery: celery 4.X support. Fixes #4169...
r2359 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import logging
import datetime
celery: ping db connection before task execution to recycle db connections.
r3390 import time
celery: celery 4.X support. Fixes #4169...
r2359
from functools import partial
python3: fix import configparser
r4927 import configparser
celery: celery 4.X support. Fixes #4169...
r2359 from celery.result import AsyncResult
import celery.loaders.base
import celery.schedules
core: multiple fixes to unicode vs str usage...
r5065 from rhodecode.lib.ext_json import sjson as json
celery: celery 4.X support. Fixes #4169...
r2359 log = logging.getLogger(__name__)
def get_task_id(task):
    """
    Return the task id carried by *task*.

    :param task: any object; only ``AsyncResult`` instances carry an id.
    :returns: ``task.task_id`` for an ``AsyncResult``, otherwise ``None``.
    """
    if not isinstance(task, AsyncResult):
        return None
    return task.task_id
def crontab(value):
    """
    Build a celery crontab schedule from a mapping of keyword arguments.

    :param value: mapping unpacked as keyword arguments into
        ``celery.schedules.crontab``.
    :returns: the object returned by ``celery.schedules.crontab``.
    """
    schedule_factory = celery.schedules.crontab
    return schedule_factory(**value)
def timedelta(value):
    """
    Build a ``datetime.timedelta`` from a mapping of keyword arguments.

    :param value: mapping unpacked as keyword arguments into
        ``datetime.timedelta`` (e.g. ``{'seconds': 30}``).
    :returns: the resulting ``datetime.timedelta``.
    """
    delta_factory = datetime.timedelta
    return delta_factory(**value)
def safe_json(get, section, key):
    """
    Read option *key* through *get* and decode its value as JSON.

    :param get: callable taking the option name and returning its raw value.
    :param section: section name; used only to build the error message.
    :param key: name of the option to read and decode.
    :returns: the decoded JSON value.
    :raises ValueError: when reading or decoding raises ``ValueError``; the
        original error is attached as ``__cause__``.
    """
    # Pre-set so the error message can be formatted even if `get` itself
    # is what raised the ValueError.
    value = ''
    try:
        value = get(key)
        json_value = json.loads(value)
    except ValueError as exc:
        msg = f'The {key}={value} is not valid json in section {section}'
        # Chain explicitly so the decoder's position/detail is kept as the cause.
        raise ValueError(msg) from exc
    return json_value
scheduler: added DB models and db parsers for the RhodeCode scheduler....
def raw_2_schedule(schedule_value, schedule_type):
    """
    Turn a raw schedule value into a schedule object of the requested type.

    :param schedule_value: raw value handed to the selected factory.
    :param schedule_type: one of ``'crontab'``, ``'timedelta'`` or
        ``'integer'``.
    :returns: the object built by the factory, or ``None`` when the factory
        raised ``TypeError`` (the failure is logged with its traceback).
    :raises ValueError: when *schedule_type* is not one of the known types.
    """
    factories = {
        'crontab': crontab,
        'timedelta': timedelta,
        'integer': int
    }
    if schedule_type not in factories:
        raise ValueError(f'schedule type {schedule_type} in section is invalid')

    build_schedule = factories[schedule_type]
    try:
        return build_schedule(schedule_value)
    except TypeError:
        # Only TypeError is treated as "cannot compose"; anything else propagates.
        log.exception('Failed to compose a schedule from value: %r', schedule_value)
        return None
def get_beat_config(parser, section):
    """
    Collect the beat configuration stored in one section of *parser*.

    :param parser: object exposing ``get(section, option)`` and
        ``has_option(section, option)``.
    :param section: name of the section to read.
    :returns: dict with ``schedule_type``, ``schedule_value`` and ``task``;
        ``schedule`` is added when ``raw_2_schedule`` returns a truthy value,
        and ``args``, ``kwargs`` (JSON decoded) and ``force_update`` (raw
        value) are added when the section defines them.
    """
    read_option = partial(parser.get, section)
    option_defined = partial(parser.has_option, section)

    kind = read_option('type')
    raw_value = safe_json(read_option, section, 'schedule')
    config = {
        'schedule_type': kind,
        'schedule_value': raw_value,
        'task': read_option('task'),
    }

    composed = raw_2_schedule(raw_value, kind)
    if composed:
        config['schedule'] = composed

    # Optional JSON-encoded call arguments.
    for option in ('args', 'kwargs'):
        if option_defined(option):
            config[option] = safe_json(read_option, section, option)

    # Stored exactly as read from the parser, without conversion.
    if option_defined('force_update'):
        config['force_update'] = read_option('force_update')
    return config
def parse_ini_vars(ini_vars):
    """
    Parse a comma separated list of ``key=value`` pairs into a dict.

    Each pair is split on its first ``=`` only, so values may themselves
    contain ``=``. Empty segments (empty input, stray or trailing commas)
    are skipped. Keys and values are stored as written, without stripping.

    :param ini_vars: string such as ``'a=1,b=2'``.
    :returns: dict mapping each key to its value; later duplicates win.
    :raises ValueError: when a non-empty segment contains no ``=``.
    """
    options = {}
    for pair in ini_vars.split(','):
        if not pair.strip():
            # ''.split(',') yields [''], and 'a=1,' yields a trailing '':
            # there is nothing to record for those.
            continue
        key, separator, value = pair.partition('=')
        if not separator:
            raise ValueError(f'ini var {pair!r} is not in key=value format')
        options[key] = value
    return options
celery: ping db connection before task execution to recycle db connections.
r3390
def ping_db():
    """
    Run a small query against the database, retrying while it fails.

    Looks up the ``user_id`` of the ``default`` user, making up to 10
    attempts with a 1 second pause after each failure. Failures are never
    raised to the caller: each one is logged at debug level with its
    traceback, and a warning is logged if every attempt failed.
    ``meta.Session.remove()`` is called after every attempt.

    :returns: ``None``.
    """
    log.info('Testing DB connection...')
    from sqlalchemy import text
    from rhodecode.model import meta

    qry = text("SELECT user_id from users where username = :uname")
    attempts = 10
    retry = 1  # seconds to wait after a failed attempt

    for _attempt in range(attempts):
        try:
            engine = meta.get_engine()
            with meta.SA_Session(engine) as session:
                result = session.execute(qry, {'uname': 'default'})
                user_id = result.first()[0]
                log.debug('DB PING user_id:%s', user_id)
            break
        except Exception:
            # NOTE(review): a missing 'default' user presumably also lands
            # here (first() returning None makes the [0] lookup fail), so the
            # traceback is kept to tell that apart from a connection problem.
            log.debug('DB not ready, next try in %ss', retry, exc_info=True)
            time.sleep(retry)
        finally:
            meta.Session.remove()
    else:
        # Loop finished without `break`: no attempt succeeded.
        log.warning('DB ping failed after %s attempts, giving up', attempts)