base.py
115 lines
| 3.9 KiB
| text/x-python
|
PythonLexer
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os | ||||
import time | ||||
import logging | ||||
import tempfile | ||||
from rhodecode.lib.config_utils import get_config | ||||
from rhodecode.lib.ext_json import json | ||||
log = logging.getLogger(__name__) | ||||
class BaseHooksCallbackDaemon:
    """
    Minimal callback daemon usable as a context manager.

    Entering and leaving the context only emits a debug log message; the
    class itself starts nothing and cleans nothing up.
    """

    def __init__(self):
        pass

    def __enter__(self):
        """Log that the daemon is running and return the daemon itself."""
        daemon_name = self.__class__.__name__
        log.debug('Running `%s` callback daemon', daemon_name)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Log that the daemon is exiting.

        Returns ``None``, so an exception raised inside the ``with`` body is
        not suppressed.
        """
        daemon_name = self.__class__.__name__
        log.debug('Exiting `%s` callback daemon', daemon_name)
class HooksModuleCallbackDaemon(BaseHooksCallbackDaemon):
    """
    Callback daemon that only remembers which hooks module to use.
    """

    def __init__(self, module):
        """
        :param module: value kept unchanged on ``self.hooks_module``;
            ``prepare_callback_daemon`` passes ``Hooks.__module__`` here.
        """
        self.hooks_module = module
        super().__init__()
def get_txn_id_data_path(txn_id):
    """
    Return the path of the file that holds metadata for ``txn_id``.

    The file lives in an ``svn_txn_id`` directory below the configured
    ``cache_dir``; when that setting is missing or empty the system temporary
    directory is used instead. The directory is created if it does not exist.

    :param txn_id: transaction identifier, interpolated into the file name.
    :return: ``<root>/svn_txn_id/rc_txn_id_<txn_id>``
    :raises OSError: if the directory cannot be created.
    """
    import rhodecode

    root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
    final_dir = os.path.join(root, 'svn_txn_id')
    # `exist_ok=True` replaces a separate isdir() check: with check-then-create
    # another process can create the directory in between and make makedirs()
    # fail with FileExistsError.
    os.makedirs(final_dir, exist_ok=True)
    return os.path.join(final_dir, f'rc_txn_id_{txn_id}')
def store_txn_id_data(txn_id, data_dict):
    """
    Write ``data_dict`` as JSON into the metadata file of ``txn_id``.

    With an empty ``txn_id`` nothing is written and a warning is logged.
    Failures while opening, serializing or writing are logged with their
    traceback and otherwise ignored, so this function is best-effort.

    :param txn_id: transaction identifier selecting the target file.
    :param data_dict: data passed to ``json.dumps``.
    """
    if txn_id:
        target = get_txn_id_data_path(txn_id)
        try:
            # NOTE(review): the file is opened in binary mode, so the project's
            # `json.dumps` presumably returns bytes - confirm against ext_json.
            with open(target, 'wb') as stream:
                serialized = json.dumps(data_dict)
                stream.write(serialized)
        except Exception:
            log.exception('Failed to write txn_id metadata')
    else:
        log.warning('Cannot store txn_id because it is empty')
def get_txn_id_from_store(txn_id):
    """
    Reads txn_id from store and if present returns the data for callback manager

    This lookup is best-effort: it never raises because of a missing or
    unreadable metadata file.

    :param txn_id: transaction identifier whose metadata should be loaded.
    :return: the decoded JSON content of the metadata file, or an empty dict
        when ``txn_id`` is empty, no file exists, or the file cannot be read
        or decoded.
    """
    if not txn_id:
        # `store_txn_id_data` never writes data for an empty id, so there is
        # nothing to find; skip creating the directory and probing the disk.
        return {}

    path = get_txn_id_data_path(txn_id)
    try:
        with open(path, 'rb') as f:
            return json.loads(f.read())
    except FileNotFoundError:
        # Expected case: nothing was stored for this transaction.
        return {}
    except Exception:
        # Unreadable or corrupt metadata: still return the empty default, but
        # leave a trace instead of hiding the problem completely.
        log.exception('Failed to read txn_id metadata')
        return {}
def prepare_callback_daemon(extras, protocol, host, txn_id=None):
    """
    Create the callback daemon for ``protocol`` and describe it in ``extras``.

    :param extras: mapping that is updated in place with ``hooks_uri``,
        ``task_queue``, ``task_backend``, ``hooks_protocol``, ``time`` and
        ``txn_id``; for the ``'celery'`` protocol its ``'config'`` entry is
        read and passed to ``get_config``.
    :param protocol: one of ``'http'``, ``'celery'`` or ``'local'``.
    :param host: host handed to the HTTP callback daemon.
    :param txn_id: optional transaction id; stored metadata for it provides
        the ``port`` used by the HTTP callback daemon (``0`` when absent).
    :return: tuple ``(callback_daemon, extras)``.
    :raises Exception: if ``protocol`` is not one of the supported values.
    """
    stored_txn = get_txn_id_from_store(txn_id)
    port = stored_txn.get('port', 0)

    # Daemon implementations are imported lazily, only for the chosen protocol.
    if protocol == 'http':
        from rhodecode.lib.hook_daemon.http_hooks_deamon import HttpHooksCallbackDaemon
        callback_daemon = HttpHooksCallbackDaemon(
            txn_id=txn_id, host=host, port=port)
    elif protocol == 'celery':
        from rhodecode.lib.hook_daemon.celery_hooks_deamon import CeleryHooksCallbackDaemon
        celery_config = get_config(extras['config'])
        callback_daemon = CeleryHooksCallbackDaemon(celery_config)
    elif protocol == 'local':
        from rhodecode.lib.hook_daemon.hook_module import Hooks
        callback_daemon = HooksModuleCallbackDaemon(Hooks.__module__)
    else:
        log.error('Unsupported callback daemon protocol "%s"', protocol)
        raise Exception('Unsupported callback daemon protocol.')

    # Copy the optional daemon attributes; an attribute the daemon does not
    # define is recorded as an empty string.
    for attr_name in ('hooks_uri', 'task_queue', 'task_backend'):
        extras[attr_name] = getattr(callback_daemon, attr_name, '')
    extras['hooks_protocol'] = protocol
    extras['time'] = time.time()

    # register txn_id
    extras['txn_id'] = txn_id

    daemon_name = callback_daemon.__class__.__name__
    log.debug('Prepared a callback daemon: %s', daemon_name)
    return callback_daemon, extras