# Copyright (C) 2010-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Celery loader, run with::

    celery worker \
        --task-events \
        --beat \
        --autoscale=20,2 \
        --max-tasks-per-child 1 \
        --app rhodecode.lib.celerylib.loader \
        --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
        --loglevel DEBUG --ini=.dev/dev.ini
"""
# NOTE: these patches are applied at import time, before the remaining
# imports of this module are executed.
from rhodecode.config import patches
# python3.11 inspect patches for backward compat on `paste` code
patches.inspect_getargspec()
patches.inspect_formatargspec()
# NOTE(review): presumably a line-feed related fix for `repoze.sendmail`;
# confirm against rhodecode.config.patches
patches.repoze_sendmail_lf_fix()

import sys
import logging
import importlib

import click
from celery import Celery
from celery import signals
from celery import Task
from celery import exceptions  # noqa

import rhodecode

from rhodecode.lib.statsd_client import StatsdClient
from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
from rhodecode.lib.ext_json import json
from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
from rhodecode.lib.utils2 import str2bool
from rhodecode.model import meta

# module level logger of the celery loader
log = logging.getLogger('celery.rhodecode.loader')


# task modules handed to celery through the `imports` setting
imports = ['rhodecode.lib.celerylib.tasks']

try:
    # probe for the optional EE package
    importlib.import_module('rc_ee')
except ImportError:
    # EE package is not importable, keep only the default task module
    pass
else:
    imports.append('rc_ee.lib.celerylib.tasks')


# Baseline celery settings. `setup_celery_app` adds the beat schedule file
# location and the settings read from the .ini file on top of these.
base_celery_config = dict(
    result_backend='rpc://',
    result_expires=60 * 60 * 24,  # 86400
    result_persistent=True,
    imports=imports,
    worker_max_tasks_per_child=100,
    worker_hijack_root_logger=False,
    worker_prefetch_multiplier=1,
    task_serializer='json',
    accept_content=['json', 'msgpack'],
    result_serializer='json',
    result_accept_content=['json', 'msgpack'],

    broker_connection_retry_on_startup=True,
    database_table_names=dict(
        task='beat_taskmeta',
        group='beat_groupmeta',
    ),
)


# `--ini`: location of the .ini file, read back in `on_preload_parsed`
preload_option_ini = click.Option(
    param_decls=('--ini',),
    help='Path to ini configuration file.',
)

# `--ini-var`: extra key=value pairs, parsed in `on_preload_parsed`
preload_option_ini_var = click.Option(
    param_decls=('--ini-var',),
    help='Comma separated list of key=value to pass to ini.',
)


def get_logger(obj):
    """
    Return a logger to be used by `obj`.

    A logger named ``rhodecode.task.<class name of obj>`` is the fallback.
    When celery is enabled, the result of ``obj.get_logger()`` is returned
    instead; if that call raises, the fallback logger is returned.

    :param obj: object (typically a task) to pick the logger for
    """
    fallback_log = logging.getLogger(
        f'rhodecode.task.{obj.__class__.__name__}')

    if not rhodecode.CELERY_ENABLED:
        return fallback_log

    try:
        return obj.get_logger()
    except Exception:
        # best effort only, any problem means we stay with the fallback
        return fallback_log


# init main celery app and register our custom preload command line options
celery_app = Celery()
_preload_options = celery_app.user_options['preload']
_preload_options.add(preload_option_ini)
_preload_options.add(preload_option_ini_var)


@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    """
    Configure logging from the .ini file when celery sends `setup_logging`.

    The path is read from ``celery_app.conf['RC_INI_FILE']`` when that key is
    present, otherwise from ``celery_app.user_options['RC_INI_FILE']``.
    """
    conf = celery_app.conf
    ini_source = conf if 'RC_INI_FILE' in conf else celery_app.user_options
    setup_logging(ini_source['RC_INI_FILE'])


@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """
    Handle the parsed preload options (`--ini` and `--ini-var`).

    Stores the .ini path and the parsed ini variables on both
    ``celery_app.conf`` and ``celery_app.user_options`` under the
    ``RC_INI_FILE`` / ``RC_INI_OPTIONS`` keys, then configures logging
    from the .ini file.

    :param options: mapping of parsed preload options; must contain the
        ``ini`` and ``ini_var`` keys
    :raises SystemExit: with code -1 when `--ini` was not given
    """
    ini_file = options['ini']
    ini_vars = options['ini_var']

    if ini_file is None:
        print('You must provide the --ini argument to start celery')
        # sys.exit instead of the `exit` helper, which is injected by the
        # `site` module for interactive use and is not always available
        sys.exit(-1)

    # keep the incoming `options` mapping intact, use a dedicated name
    ini_options = None
    if ini_vars is not None:
        ini_options = parse_ini_vars(ini_vars)

    celery_app.conf['RC_INI_FILE'] = ini_file
    celery_app.user_options['RC_INI_FILE'] = ini_file

    celery_app.conf['RC_INI_OPTIONS'] = ini_options
    celery_app.user_options['RC_INI_OPTIONS'] = ini_options

    setup_logging(ini_file)


def _init_celery(app_type=''):
    """
    Bootstrap the RhodeCode pyramid application and configure celery with it.

    Reads the .ini path and options stored on ``celery_app.conf``, calls
    `bootstrap`, sets up the statsd client and finally hands the resulting
    environment over to `setup_celery_app`.

    :param app_type: label used only in the debug log message
    :raises SystemExit: with code 1 when bootstrap fails or returns an
        empty environment
    """
    from rhodecode.config.middleware import get_celery_config

    log.debug('Bootstrapping RhodeCode application for %s...', app_type)

    conf = celery_app.conf
    ini_file, ini_options = conf['RC_INI_FILE'], conf['RC_INI_OPTIONS']

    try:
        env = bootstrap(ini_file, options=ini_options)
    except Exception:
        log.exception('Failed to bootstrap RhodeCode APP. '
                      'Probably there is another error present that prevents from running pyramid app')
        env = None

    if not env:
        # we use sys.exit here since we need to signal app startup failure for docker to restart the container and re-try
        sys.exit(1)

    log.debug('Got Pyramid ENV: %s', env)

    registry = env['registry']
    settings = registry.settings
    celery_settings = get_celery_config(settings)

    # init and bootstrap StatsdClient
    StatsdClient.setup(settings)

    setup_celery_app(
        app=env['app'],
        root=env['root'],
        request=env['request'],
        registry=registry,
        closer=env['closer'],
        celery_settings=celery_settings,
    )


@signals.celeryd_init.connect
def on_celeryd_init(sender=None, conf=None, **kwargs):
    """
    Bootstrap the application on the `celeryd_init` signal.
    """
    _init_celery(app_type='celery worker')

    # this is worker code, so the global flag is forced on afterwards,
    # regardless of what the .ini file says about celery usage
    rhodecode.CELERY_ENABLED = True


@signals.beat_init.connect
def on_beat_init(sender=None, conf=None, **kwargs):
    """
    Bootstrap the application on the `beat_init` signal.
    """
    _init_celery(app_type='celery beat')


@signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
    """
    Run before each task: ping the database and count the task in statsd.

    The counter is only incremented when a statsd client is configured.
    """
    ping_db()

    statsd = StatsdClient.statsd
    if not statsd:
        return

    # prefer the task name, fall back to the task object itself
    task_name = getattr(task, 'name', task)
    counter_tags = [f'task:{task_name}', 'mode:async']
    statsd.incr('rhodecode_celery_task_total', tags=counter_tags)


@signals.task_success.connect
def task_success_signal(result, **kwargs):
    """
    After a successful task: commit `meta.Session` and call the pyramid
    closer stored on the celery conf, if there is one.
    """
    meta.Session.commit()

    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if pyramid_closer:
        pyramid_closer()


@signals.task_retry.connect
def task_retry_signal(request, reason, einfo, **kwargs):
    """
    When a task is retried: remove `meta.Session` and call the pyramid
    closer stored on the celery conf, if there is one.
    """
    meta.Session.remove()

    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if pyramid_closer:
        pyramid_closer()


@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    """
    When a task fails: log it, remove `meta.Session`, store the exception
    via `store_exception`, count it in statsd (when configured) and call
    the pyramid closer stored on the celery conf, if there is one.

    :param task_id: id of the failed task, used in the log message
    :param einfo: exception info object; its ``type``, ``exception`` and
        ``tb`` attributes are used to rebuild a ``sys.exc_info()`` tuple
    """

    log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
    from rhodecode.lib.exc_tracking import store_exception
    from rhodecode.lib.statsd_client import StatsdClient

    meta.Session.remove()

    # simulate sys.exc_info()
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
    statsd = StatsdClient.statsd
    if statsd:
        # tag with the class of the raised exception; the class of `einfo`
        # itself is the exception-info wrapper and is the same for every
        # failure, which makes the `type:` tag useless
        exc_cls = einfo.type
        exc_type = "{}.{}".format(exc_cls.__module__, exc_cls.__name__)
        statsd.incr('rhodecode_exception_total',
                    tags=["exc_source:celery", "type:{}".format(exc_type)])

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_revoked.connect
def task_revoked_signal(request, terminated, signum, expired, **kwargs):
    """
    When a task is revoked: call the pyramid closer stored on the celery
    conf, if there is one.
    """
    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if pyramid_closer:
        pyramid_closer()


class UNSET:
    """
    Marker type for "no value given", so that `None` stays a usable value.
    """


# marker instance used as the default of the `set_celery_conf` parameters
_unset = UNSET()


def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
    """
    Update the pyramid objects stored on ``celery_app.conf``.

    Only `request` and `registry` are stored (as ``PYRAMID_REQUEST`` and
    ``PYRAMID_REGISTRY``), and only when they were actually passed in.
    `app`, `root` and `closer` are accepted but currently not used.

    :param request: new value for ``PYRAMID_REQUEST``; `None` is stored as-is
    :param registry: new value for ``PYRAMID_REGISTRY``; `None` is stored as-is
    """

    def _is_given(value):
        # The defaults are the `_unset` instance, so that is what has to be
        # compared against. The `UNSET` class is still treated as "not
        # given" for callers that pass the class itself.
        return value is not _unset and value is not UNSET

    if _is_given(request):
        celery_app.conf.update({'PYRAMID_REQUEST': request})

    if _is_given(registry):
        celery_app.conf.update({'PYRAMID_REGISTRY': registry})


def setup_celery_app(app, root, request, registry, closer, celery_settings):
    """
    Configure ``celery_app`` and store the pyramid objects on its conf.

    The celery configuration is `base_celery_config`, extended with the
    beat schedule file location from the registry settings and then with
    `celery_settings`.

    :param registry: pyramid registry; ``registry.settings`` must contain
        the ``celerybeat-schedule.path`` key
    :param celery_settings: mapping of celery settings applied last
    """
    log.debug('Got custom celery conf: %s', celery_settings)

    # NOTE: this is the module-level dict itself, not a copy, so the updates
    # below stay in `base_celery_config` after this call
    celery_config = base_celery_config
    # store celerybeat scheduler db where the .ini file is
    celery_config['beat_schedule_filename'] = \
        registry.settings['celerybeat-schedule.path']
    celery_config.update(celery_settings)

    celery_app.config_from_object(celery_config)

    celery_app.conf.update({
        'PYRAMID_APP': app,
        'PYRAMID_ROOT': root,
        'PYRAMID_REQUEST': request,
        'PYRAMID_REGISTRY': registry,
        'PYRAMID_CLOSER': closer,
    })


def configure_celery(config, celery_settings):
    """
    Entry point used by the application creation logic. Makes the running
    webapp aware of the celery connection settings, so RhodeCode itself is
    able to execute tasks.

    :param config: object exposing the pyramid registry as ``config.registry``
    :param celery_settings: celery settings passed on to `setup_celery_app`
    """
    registry = config.registry

    # store some globals into rhodecode
    rhodecode.CELERY_ENABLED = str2bool(registry.settings.get('use_celery'))
    if not rhodecode.CELERY_ENABLED:
        return

    log.info('Configuring celery based on `%s` settings', celery_settings)
    setup_celery_app(
        app=None,
        root=None,
        request=None,
        registry=registry,
        closer=None,
        celery_settings=celery_settings,
    )

def maybe_prepare_env(req):
    """
    Extract a small subset of the WSGI environ from `req`.

    The result is all-or-nothing: when `req` has no usable ``environ`` or
    any of the required keys is missing, an empty dict is returned.
    ``HTTP_HOST`` falls back to the value of ``SERVER_NAME`` when absent.

    :param req: object exposing a WSGI environ mapping as ``req.environ``
    :return: dict with the copied environ keys, or an empty dict
    """
    try:
        wsgi_env = req.environ
        server_name = wsgi_env['SERVER_NAME']
        return {
            'PATH_INFO': wsgi_env['PATH_INFO'],
            'SCRIPT_NAME': wsgi_env['SCRIPT_NAME'],
            'HTTP_HOST': wsgi_env.get('HTTP_HOST', server_name),
            'SERVER_NAME': server_name,
            'SERVER_PORT': wsgi_env['SERVER_PORT'],
            'wsgi.url_scheme': wsgi_env['wsgi.url_scheme'],
        }
    except Exception:
        # best effort only, a partial environ is never returned
        return {}


class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request
    that created the task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """
        Queue the job to run (we are in web request context here).

        Adds a ``rhodecode_proxy_data`` entry to the message headers, holding
        a subset of the request environ plus the ip address and id of the
        calling user, and then delegates to the parent ``apply_async``.

        :raises ValueError: when ``PYRAMID_REQUEST`` on the celery conf
            is empty
        """
        from rhodecode.lib.base import get_ip_addr

        req = self.app.conf['PYRAMID_REQUEST']
        if not req:
            raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')

        log.debug('Running Task with class: %s. Request Class: %s',
                  self.__class__, req.__class__)

        user_id = 0

        # web case
        if hasattr(req, 'user'):
            user_id = req.user.user_id

        # api case
        elif hasattr(req, 'rpc_user'):
            user_id = req.rpc_user.user_id

        # we hook into the message headers since it is the only way to pass
        # our data to the celery worker
        environ = maybe_prepare_env(req)

        # work on a copy so a headers dict supplied by the caller is not
        # modified in place; an explicit `headers=None` is treated as empty
        headers = dict(options.get('headers') or {})
        headers['rhodecode_proxy_data'] = {
            'environ': environ,
            'auth_user': {
                'ip_addr': get_ip_addr(req.environ),
                'user_id': user_id
            },
        }
        options['headers'] = headers

        return super().apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)