# Copyright (C) 2010-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Celery loader, run with::

    celery worker \
        --task-events \
        --beat \
        --autoscale=20,2 \
        --max-tasks-per-child 1 \
        --app rhodecode.lib.celerylib.loader \
        --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
        --loglevel DEBUG --ini=.dev/dev.ini
"""
from rhodecode.config import patches
# python3.11 inspect patches for backward compat on `paste` code
patches.inspect_getargspec()
patches.inspect_formatargspec()
patches.repoze_sendmail_lf_fix()

import sys
import logging
import importlib

import click
from celery import Celery
from celery import signals
from celery import Task
from celery import exceptions  # noqa

import rhodecode

from rhodecode.lib.statsd_client import StatsdClient
from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
from rhodecode.lib.ext_json import json
from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
from rhodecode.lib.utils2 import str2bool
from rhodecode.model import meta

log = logging.getLogger('celery.rhodecode.loader')

# task modules that are handed to celery via the `imports` config key
imports = ['rhodecode.lib.celerylib.tasks']

try:
    # try if we have EE tasks available
    importlib.import_module('rc_ee')
    imports.append('rc_ee.lib.celerylib.tasks')
except ImportError:
    pass


# defaults for the celery app; `setup_celery_app` layers the .ini derived
# settings on top of a copy of this dict
base_celery_config = {
    'result_backend': 'rpc://',
    'result_expires': 60 * 60 * 24,
    'result_persistent': True,
    'imports': imports,
    'worker_max_tasks_per_child': 100,
    'worker_hijack_root_logger': False,
    'worker_prefetch_multiplier': 1,
    'task_serializer': 'json',
    'accept_content': ['json', 'msgpack'],
    'result_serializer': 'json',
    'result_accept_content': ['json', 'msgpack'],

    'broker_connection_retry_on_startup': True,
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    }
}


# extra command line options registered on the celery app below
preload_option_ini = click.Option(
    ('--ini',),
    help='Path to ini configuration file.'
)

preload_option_ini_var = click.Option(
    ('--ini-var',),
    help='Comma separated list of key=value to pass to ini.'
)


def get_logger(obj):
    """
    Return a logger for the given task-like object.

    Defaults to a stdlib logger named ``rhodecode.task.<ClassName>``. When
    celery is enabled, ``obj.get_logger()`` is preferred; if that call fails
    for any reason the stdlib logger is returned instead.
    """
    custom_log = logging.getLogger(
        'rhodecode.task.{}'.format(obj.__class__.__name__))

    if rhodecode.CELERY_ENABLED:
        try:
            custom_log = obj.get_logger()
        except Exception:
            # deliberate best-effort: fall back to the stdlib logger
            pass

    return custom_log


# init main celery app
celery_app = Celery()
celery_app.user_options['preload'].add(preload_option_ini)
celery_app.user_options['preload'].add(preload_option_ini_var)


@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    """
    Configure logging from the .ini file remembered by `on_preload_parsed`.

    The path is read from ``celery_app.conf`` when present there, otherwise
    from ``celery_app.user_options``.
    """
    if 'RC_INI_FILE' in celery_app.conf:
        ini_file = celery_app.conf['RC_INI_FILE']
    else:
        ini_file = celery_app.user_options['RC_INI_FILE']

    setup_logging(ini_file)


@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """
    Store the parsed ``--ini`` / ``--ini-var`` options on the celery app.

    Exits the process with status -1 when ``--ini`` was not given. The ini
    path and the parsed ini variables (or None) are stored both in
    ``celery_app.conf`` and ``celery_app.user_options``, then logging is
    configured from the ini file.
    """
    ini_file = options['ini']
    ini_vars = options['ini_var']

    if ini_file is None:
        print('You must provide the --ini argument to start celery')
        # sys.exit instead of the `site` provided builtin `exit`
        sys.exit(-1)

    ini_options = None
    if ini_vars is not None:
        ini_options = parse_ini_vars(ini_vars)

    celery_app.conf['RC_INI_FILE'] = ini_file
    celery_app.user_options['RC_INI_FILE'] = ini_file

    celery_app.conf['RC_INI_OPTIONS'] = ini_options
    celery_app.user_options['RC_INI_OPTIONS'] = ini_options

    setup_logging(ini_file)


def _init_celery(app_type=''):
    """
    Bootstrap the RhodeCode pyramid app and configure the celery app from it.

    :param app_type: free-form label, only used in the debug log message.

    Calls ``sys.exit(1)`` when bootstrapping fails or returns an empty env.
    """
    from rhodecode.config.middleware import get_celery_config

    log.debug('Bootstrapping RhodeCode application for %s...', app_type)

    ini_file = celery_app.conf['RC_INI_FILE']
    options = celery_app.conf['RC_INI_OPTIONS']

    env = None
    try:
        env = bootstrap(ini_file, options=options)
    except Exception:
        log.exception('Failed to bootstrap RhodeCode APP. '
                      'Probably there is another error present that prevents from running pyramid app')

    if not env:
        # we use sys.exit here since we need to signal app startup failure for docker to restart the container and re-try
        sys.exit(1)

    log.debug('Got Pyramid ENV: %s', env)

    settings = env['registry'].settings
    celery_settings = get_celery_config(settings)

    # init and bootstrap StatsdClient
    StatsdClient.setup(settings)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        celery_settings=celery_settings)


@signals.celeryd_init.connect
def on_celeryd_init(sender=None, conf=None, **kwargs):
    """Bootstrap the application when a celery worker initializes."""
    _init_celery('celery worker')

    # fix the global flag even if it's disabled via .ini file because this
    # is a worker code that doesn't need this to be disabled.
    rhodecode.CELERY_ENABLED = True


@signals.beat_init.connect
def on_beat_init(sender=None, conf=None, **kwargs):
    """Bootstrap the application when celery beat initializes."""
    _init_celery('celery beat')


@signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
    """Call `ping_db` and count the task in statsd before it runs."""
    ping_db()
    statsd = StatsdClient.statsd

    if statsd:
        # fall back to the task object itself when it has no `name`
        task_repr = getattr(task, 'name', task)
        statsd.incr('rhodecode_celery_task_total', tags=[
            f'task:{task_repr}',
            'mode:async'
        ])


@signals.task_success.connect
def task_success_signal(result, **kwargs):
    """Commit the DB session and run the stored pyramid closer, if any."""
    meta.Session.commit()
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_retry.connect
def task_retry_signal(
        request, reason, einfo, **kwargs):
    """Remove the DB session and run the stored pyramid closer, if any."""
    meta.Session.remove()
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    """
    Handle a failed task.

    Logs the failure, removes the DB session, stores the exception through
    `store_exception`, increments the statsd exception counter tagged with
    the exception type, and finally runs the stored pyramid closer, if any.
    """
    log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
    from rhodecode.lib.exc_tracking import store_exception

    meta.Session.remove()

    # simulate sys.exc_info()
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')

    statsd = StatsdClient.statsd
    if statsd:
        # tag with the class of the raised exception (`einfo.type`), not the
        # class of the `einfo` wrapper object, so the tag tells errors apart
        exc_cls = einfo.type
        exc_type = "{}.{}".format(exc_cls.__module__, exc_cls.__name__)
        statsd.incr('rhodecode_exception_total',
                    tags=["exc_source:celery", "type:{}".format(exc_type)])

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_revoked.connect
def task_revoked_signal(
        request, terminated, signum, expired, **kwargs):
    """Run the stored pyramid closer, if any, when a task is revoked."""
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


class UNSET(object):
    """Marker type for "argument not provided" (None is a valid value)."""
    pass


_unset = UNSET()


def _is_provided(value):
    """Return True unless `value` is the `_unset` default or the `UNSET` class."""
    return value is not _unset and value is not UNSET


def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
    """
    Update the pyramid request and/or registry stored in the celery app conf.

    Only `request` and `registry` are applied; `app`, `root` and `closer`
    are accepted but currently ignored. Arguments left at their default are
    not written, so an explicit None can be used to clear a value.
    """
    # compare against the `_unset` default instance; the `UNSET` class is
    # also treated as "not provided" for callers that pass it explicitly
    if _is_provided(request):
        celery_app.conf.update({'PYRAMID_REQUEST': request})

    if _is_provided(registry):
        celery_app.conf.update({'PYRAMID_REGISTRY': registry})


def setup_celery_app(app, root, request, registry, closer, celery_settings):
    """
    Configure the global `celery_app`.

    The effective config is `base_celery_config`, plus the celerybeat
    schedule path taken from the registry settings, plus `celery_settings`
    (later entries win). The pyramid objects are then stored in the celery
    conf under the ``PYRAMID_*`` keys.
    """
    log.debug('Got custom celery conf: %s', celery_settings)

    # work on a copy so the module level defaults are never mutated and
    # settings from one call cannot leak into a later one
    celery_config = dict(base_celery_config)
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
    })

    celery_config.update(celery_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({'PYRAMID_APP': app})
    celery_app.conf.update({'PYRAMID_ROOT': root})
    celery_app.conf.update({'PYRAMID_REQUEST': request})
    celery_app.conf.update({'PYRAMID_REGISTRY': registry})
    celery_app.conf.update({'PYRAMID_CLOSER': closer})


def configure_celery(config, celery_settings):
    """
    Helper that is called from our application creation logic. It gives
    connection info into running webapp and allows execution of tasks from
    RhodeCode itself
    """
    # store some globals into rhodecode
    rhodecode.CELERY_ENABLED = str2bool(
        config.registry.settings.get('use_celery'))

    if rhodecode.CELERY_ENABLED:
        log.info('Configuring celery based on `%s` settings', celery_settings)
        setup_celery_app(
            app=None, root=None, request=None, registry=config.registry,
            closer=None, celery_settings=celery_settings)


def maybe_prepare_env(req):
    """
    Extract a minimal WSGI environ from `req` to send along with a task.

    Returns a dict with the path, script name, host, server name/port and
    url scheme taken from ``req.environ``, or an empty dict when any of
    them cannot be read.
    """
    environ = {}
    try:
        environ.update({
            'PATH_INFO': req.environ['PATH_INFO'],
            'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
            'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
            'SERVER_NAME': req.environ['SERVER_NAME'],
            'SERVER_PORT': req.environ['SERVER_PORT'],
            'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
        })
    except Exception:
        # deliberate best-effort: a partial/missing environ yields {}
        pass

    return environ


class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request that created the
    task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """
        queue the job to run (we are in web request context here)

        Adds a ``rhodecode_proxy_data`` entry (request environ, caller ip
        address and user id) to the message headers before delegating to
        the parent ``apply_async``.

        :raises ValueError: when no pyramid request is stored in the app conf.
        """
        from rhodecode.lib.base import get_ip_addr

        req = self.app.conf['PYRAMID_REQUEST']
        if not req:
            raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')

        log.debug('Running Task with class: %s. Request Class: %s',
                  self.__class__, req.__class__)

        user_id = 0

        # web case
        if hasattr(req, 'user'):
            user_id = req.user.user_id

        # api case
        elif hasattr(req, 'rpc_user'):
            user_id = req.rpc_user.user_id

        # we hook into kwargs since it is the only way to pass our data to
        # the celery worker
        environ = maybe_prepare_env(req)
        options['headers'] = options.get('headers', {})
        options['headers'].update({
            'rhodecode_proxy_data': {
                'environ': environ,
                'auth_user': {
                    'ip_addr': get_ip_addr(req.environ),
                    'user_id': user_id
                },
            }
        })

        return super(RequestContextTask, self).apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)