loader.py
372 lines
| 11.0 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r2359 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
"""
Celery loader, run with::

    celery worker \
        --task-events \
        --beat \
        --autoscale=20,2 \
        --max-tasks-per-child 1 \
        --app rhodecode.lib.celerylib.loader \
        --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
        --loglevel DEBUG --ini=.dev/dev.ini
"""
r5102 | from rhodecode.config.patches import inspect_getargspec, inspect_formatargspec | |||
inspect_getargspec() | ||||
inspect_formatargspec() | ||||
# python3.11 inspect patches for backward compat on `paste` code | ||||
r5403 | import sys | |||
r2359 | import logging | |||
r2465 | import importlib | |||
r2359 | ||||
r5102 | import click | |||
r2359 | from celery import Celery | |||
from celery import signals | ||||
from celery import Task | ||||
r5102 | from celery import exceptions # noqa | |||
r2359 | ||||
import rhodecode | ||||
r4892 | from rhodecode.lib.statsd_client import StatsdClient | |||
r4823 | from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db | |||
r2359 | from rhodecode.lib.ext_json import json | |||
r4873 | from rhodecode.lib.pyramid_utils import bootstrap, setup_logging | |||
r2359 | from rhodecode.lib.utils2 import str2bool | |||
from rhodecode.model import meta | ||||
# Module logger for the loader itself.
log = logging.getLogger('celery.rhodecode.loader')

# Task modules celery should import; the community tasks are always listed.
imports = ['rhodecode.lib.celerylib.tasks']
try:
    # probe for the optional `rc_ee` package
    importlib.import_module('rc_ee')
except ImportError:
    # `rc_ee` is not importable, keep only the default task module
    pass
else:
    imports.append('rc_ee.lib.celerylib.tasks')

# Baseline celery configuration. `setup_celery_app` layers the beat schedule
# path and the settings handed to it on top of this dict.
base_celery_config = {
    # results
    'result_backend': 'rpc://',
    'result_expires': 60 * 60 * 24,  # == 86400
    'result_persistent': True,

    # task modules collected above
    'imports': imports,

    # worker behaviour
    'worker_max_tasks_per_child': 100,
    'worker_hijack_root_logger': False,
    'worker_prefetch_multiplier': 1,

    # serialization
    'task_serializer': 'json',
    'accept_content': ['json', 'msgpack'],
    'result_serializer': 'json',
    'result_accept_content': ['json', 'msgpack'],

    'broker_connection_retry_on_startup': True,

    # table names used for the database-backed task/group metadata
    'database_table_names': dict(
        task='beat_taskmeta',
        group='beat_groupmeta',
    ),
}
r4880 | ||||
# Extra command line options; they are registered as celery "preload"
# options right after the celery app is created further down this module.
preload_option_ini = click.Option(
    ('--ini',), help='Path to ini configuration file.')

preload_option_ini_var = click.Option(
    ('--ini-var',), help='Comma separated list of key=value to pass to ini.')
r4880 | ||||
def get_logger(obj):
    """
    Return a logger for `obj`.

    A stdlib logger named ``rhodecode.task.<ClassName>`` is the fallback.
    When celery is enabled, the result of ``obj.get_logger()`` is preferred;
    any error raised by that call falls back to the stdlib logger.
    """
    fallback_log = logging.getLogger(
        f'rhodecode.task.{obj.__class__.__name__}')

    if not rhodecode.CELERY_ENABLED:
        return fallback_log

    try:
        return obj.get_logger()
    except Exception:
        # best effort only: keep the plain logger if obj cannot provide one
        return fallback_log
# init main celery app and register both custom options as preload options
celery_app = Celery()
celery_app.user_options['preload'].update(
    (preload_option_ini, preload_option_ini_var))
r2359 | ||||
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    """
    Configure logging from the .ini file when celery emits `setup_logging`.

    The path is read from ``celery_app.conf`` when the `RC_INI_FILE` key is
    present there, otherwise from ``celery_app.user_options``.
    """
    has_conf_entry = 'RC_INI_FILE' in celery_app.conf
    source = celery_app.conf if has_conf_entry else celery_app.user_options
    setup_logging(source['RC_INI_FILE'])
r2359 | ||||
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    """
    Handle the parsed preload options (`--ini` and `--ini-var`).

    Stores the .ini path and the parsed ini variables on both
    ``celery_app.conf`` and ``celery_app.user_options`` under the
    `RC_INI_FILE` / `RC_INI_OPTIONS` keys, then configures logging from
    the .ini file.

    :param options: mapping of parsed preload options; the `ini` and
        `ini_var` keys are read.
    :raises SystemExit: with status -1 when `--ini` was not given.
    """
    ini_file = options['ini']
    ini_vars = options['ini_var']

    if ini_file is None:
        print('You must provide the --ini argument to start celery')
        # sys.exit instead of the `exit` builtin: the latter is injected by
        # the `site` module and is not guaranteed to exist in every runtime
        sys.exit(-1)

    # kept in a separate name so the `options` argument is not shadowed
    ini_options = None
    if ini_vars is not None:
        ini_options = parse_ini_vars(ini_vars)

    celery_app.conf['RC_INI_FILE'] = ini_file
    celery_app.user_options['RC_INI_FILE'] = ini_file

    celery_app.conf['RC_INI_OPTIONS'] = ini_options
    celery_app.user_options['RC_INI_OPTIONS'] = ini_options

    setup_logging(ini_file)
def _init_celery(app_type=''):
    """
    Bootstrap the RhodeCode pyramid application and wire it into celery.

    Reads the .ini path and ini options from ``celery_app.conf``, bootstraps
    the application, initializes the statsd client and finally hands the
    resulting environment to `setup_celery_app`.

    :param app_type: label used only in the debug log message.
    :raises SystemExit: with status 1 when bootstrapping yields no env.
    """
    from rhodecode.config.middleware import get_celery_config

    log.debug('Bootstrapping RhodeCode application for %s...', app_type)

    conf = celery_app.conf
    ini_file = conf['RC_INI_FILE']
    options = conf['RC_INI_OPTIONS']

    try:
        env = bootstrap(ini_file, options=options)
    except Exception:
        env = None
        log.exception(
            'Failed to bootstrap RhodeCode APP. '
            'Probably there is another error present that prevents from running pyramid app')

    if not env:
        # we use sys.exit here since we need to signal app startup failure
        # for docker to restart the container and re-try
        sys.exit(1)

    log.debug('Got Pyramid ENV: %s', env)

    registry = env['registry']
    settings = registry.settings
    celery_settings = get_celery_config(settings)

    # init and bootstrap StatsdClient
    StatsdClient.setup(settings)

    setup_celery_app(
        app=env['app'],
        root=env['root'],
        request=env['request'],
        registry=env['registry'],
        closer=env['closer'],
        celery_settings=celery_settings,
    )
r2359 | ||||
r4889 | ||||
@signals.celeryd_init.connect
def on_celeryd_init(sender=None, conf=None, **kwargs):
    """Bootstrap the application when the `celeryd_init` signal fires."""
    _init_celery(app_type='celery worker')

    # This is worker code, so the global flag is forced on here even when
    # celery is disabled via the .ini file.
    rhodecode.CELERY_ENABLED = True
@signals.beat_init.connect
def on_beat_init(sender=None, conf=None, **kwargs):
    """Bootstrap the application when the `beat_init` signal fires."""
    _init_celery(app_type='celery beat')
@signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
    """
    Runs before each task: pings the database and, when a statsd client is
    configured, increments the `rhodecode_celery_task_total` counter.
    """
    ping_db()

    statsd = StatsdClient.statsd
    if not statsd:
        return

    # use the task's `name` attribute when present, else the task object
    task_label = getattr(task, 'name', task)
    tags = [f'task:{task_label}', 'mode:async']
    statsd.incr('rhodecode_celery_task_total', tags=tags)
r3390 | ||||
@signals.task_success.connect
def task_success_signal(result, **kwargs):
    """Commit the DB session after a successful task and call the pyramid closer, if any."""
    meta.Session.commit()

    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if not pyramid_closer:
        return
    pyramid_closer()
r2359 | ||||
@signals.task_retry.connect
def task_retry_signal(request, reason, einfo, **kwargs):
    """Remove the DB session when a task is retried and call the pyramid closer, if any."""
    meta.Session.remove()

    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if not pyramid_closer:
        return
    pyramid_closer()
r2359 | ||||
@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    """
    Runs when a task fails.

    Logs the failure, removes the DB session, stores the exception via
    `store_exception`, increments the `rhodecode_exception_total` statsd
    counter (when a statsd client is configured) and finally calls the
    pyramid closer, if any.

    :param task_id: id of the failed task, used in the log message.
    :param einfo: exception info object; its `type`, `exception` and `tb`
        attributes are read.
    """
    log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
    from rhodecode.lib.exc_tracking import store_exception

    meta.Session.remove()

    # simulate sys.exc_info()
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')

    statsd = StatsdClient.statsd
    if statsd:
        # Tag with the class of the raised exception. Using einfo.__class__
        # would report the exception-info wrapper class for every failure.
        exc_cls = einfo.type
        exc_type = "{}.{}".format(exc_cls.__module__, exc_cls.__name__)
        statsd.incr('rhodecode_exception_total',
                    tags=["exc_source:celery", "type:{}".format(exc_type)])

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()
r2359 | ||||
@signals.task_revoked.connect
def task_revoked_signal(request, terminated, signum, expired, **kwargs):
    """Call the pyramid closer, if any, when a task is revoked."""
    pyramid_closer = celery_app.conf['PYRAMID_CLOSER']
    if not pyramid_closer:
        return
    pyramid_closer()
r2359 | ||||
class UNSET(object):
    """Marker type for "argument not provided" (distinct from ``None``)."""
    pass


# default value of every `set_celery_conf` argument
_unset = UNSET()


def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
    """
    Update the pyramid objects stored on ``celery_app.conf``.

    Only arguments that were actually provided are written. `request` maps
    to the `PYRAMID_REQUEST` key and `registry` to `PYRAMID_REGISTRY`.
    `app`, `root` and `closer` are accepted but not stored by this function.

    Both the `UNSET` class and any `UNSET` instance (such as the defaults)
    count as "not provided", so an omitted argument never overwrites an
    existing conf value with the sentinel.
    """

    def _is_provided(value):
        # the defaults are UNSET *instances*; the class itself is honoured
        # too for callers that pass `UNSET` explicitly
        return value is not UNSET and not isinstance(value, UNSET)

    if _is_provided(request):
        celery_app.conf.update({'PYRAMID_REQUEST': request})

    if _is_provided(registry):
        celery_app.conf.update({'PYRAMID_REGISTRY': registry})
def setup_celery_app(app, root, request, registry, closer, celery_settings):
    """
    Configure the global `celery_app` and attach the pyramid objects to it.

    The module-level `base_celery_config` dict is updated in place with the
    beat schedule path from the registry settings and with `celery_settings`,
    then loaded into the celery app. The pyramid objects are stored on
    ``celery_app.conf`` under the `PYRAMID_*` keys.
    """
    log.debug('Got custom celery conf: %s', celery_settings)

    # NOTE: this is the shared module-level dict, not a copy
    celery_config = base_celery_config
    # store celerybeat scheduler db where the .ini file is
    celery_config['beat_schedule_filename'] = \
        registry.settings['celerybeat-schedule.path']
    celery_config.update(celery_settings)
    celery_app.config_from_object(celery_config)

    pyramid_objects = (
        ('PYRAMID_APP', app),
        ('PYRAMID_ROOT', root),
        ('PYRAMID_REQUEST', request),
        ('PYRAMID_REGISTRY', registry),
        ('PYRAMID_CLOSER', closer),
    )
    for conf_key, conf_value in pyramid_objects:
        celery_app.conf.update({conf_key: conf_value})
def configure_celery(config, celery_settings):
    """
    Helper that is called from our application creation logic. It gives
    connection info into running webapp and allows execution of tasks from
    RhodeCode itself.

    Sets the global ``rhodecode.CELERY_ENABLED`` flag from the `use_celery`
    registry setting; the celery app is only set up when the flag is true.
    """
    # store some globals into rhodecode
    use_celery = config.registry.settings.get('use_celery')
    rhodecode.CELERY_ENABLED = str2bool(use_celery)

    if not rhodecode.CELERY_ENABLED:
        return

    log.info('Configuring celery based on `%s` settings', celery_settings)
    # no app/root/request/closer objects are passed from here, only the registry
    setup_celery_app(
        app=None,
        root=None,
        request=None,
        registry=config.registry,
        closer=None,
        celery_settings=celery_settings,
    )
r2359 | ||||
def maybe_prepare_env(req):
    """
    Extract a minimal WSGI environ from `req`.

    Returns a dict with the `PATH_INFO`, `SCRIPT_NAME`, `HTTP_HOST`,
    `SERVER_NAME`, `SERVER_PORT` and `wsgi.url_scheme` keys copied from
    ``req.environ``; `HTTP_HOST` falls back to `SERVER_NAME` when absent.
    The extraction is all-or-nothing: on any error an empty dict is returned.
    """
    try:
        source = req.environ
        server_name = source['SERVER_NAME']
        return {
            'PATH_INFO': source['PATH_INFO'],
            'SCRIPT_NAME': source['SCRIPT_NAME'],
            'HTTP_HOST': source.get('HTTP_HOST', server_name),
            'SERVER_NAME': server_name,
            'SERVER_PORT': source['SERVER_PORT'],
            'wsgi.url_scheme': source['wsgi.url_scheme'],
        }
    except Exception:
        # best effort: a missing key or unusable request yields no environ
        return {}
class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request
    that created the task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """
        Queue the job to run (we are in web request context here).

        Reads the current request from ``self.app.conf['PYRAMID_REQUEST']``
        and adds a `rhodecode_proxy_data` entry (request environ, client ip
        and user id) to the message headers before delegating to the parent
        ``apply_async``.

        :raises ValueError: when the `PYRAMID_REQUEST` conf value is empty.
        """
        from rhodecode.lib.base import get_ip_addr

        req = self.app.conf['PYRAMID_REQUEST']
        if not req:
            raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')

        log.debug('Running Task with class: %s. Request Class: %s',
                  self.__class__, req.__class__)

        user_id = 0

        # web case
        if hasattr(req, 'user'):
            user_id = req.user.user_id
        # api case
        elif hasattr(req, 'rpc_user'):
            user_id = req.rpc_user.user_id

        # We hook into the message headers to pass our data to the celery
        # worker. The headers are copied so the caller's dict is not
        # mutated, and an explicit `headers=None` is treated as "no headers".
        headers = dict(options.get('headers') or {})
        headers['rhodecode_proxy_data'] = {
            'environ': maybe_prepare_env(req),
            'auth_user': {
                'ip_addr': get_ip_addr(req.environ),
                'user_id': user_id
            },
        }
        options['headers'] = headers

        return super().apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)