# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import errno
import socket
import logging

import rhodecode

from zope.cachedescriptors.property import Lazy as LazyProperty

from rhodecode.lib.celerylib.loader import (
    celery_app, RequestContextTask, get_logger)
from rhodecode.lib.statsd_client import StatsdClient

async_task = celery_app.task

log = logging.getLogger(__name__)


class ResultWrapper(object):
    """
    Thin holder for the value produced by a synchronously executed task.

    ``run_task`` wraps the return value of a direct task call in this class,
    so the value is reachable through the ``result`` attribute.
    """

    def __init__(self, task):
        # NOTE: despite the name, `run_task` passes the *return value* of the
        # task call here, not the task object itself.
        self.task = task

    @LazyProperty
    def result(self):
        """Return the wrapped value (computed once, then cached by LazyProperty)."""
        return self.task


def run_task(task, *args, **kwargs):
    """
    Run ``task`` through celery when possible, otherwise call it in-process.

    Async dispatch is attempted only when ``rhodecode.CELERY_ENABLED`` is set
    and we are not already running inside another celery task (unless the
    caller explicitly opts in with ``allow_subtask=True``). If dispatching
    fails for any reason the error is logged and the task is executed
    synchronously instead.

    :param task: the task callable; must expose ``apply_async`` for async mode.
    :param args: positional arguments forwarded to the task.
    :param kwargs: keyword arguments forwarded to the task. The special key
        ``allow_subtask`` is consumed here and never forwarded.
    :returns: the object returned by ``task.apply_async`` when the task was
        dispatched asynchronously, otherwise a :class:`ResultWrapper` holding
        the return value of the synchronous call.
    :raises ValueError: if ``task`` is ``None``.
    """
    # kept as a function-level import, as in the original code
    import celery

    log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
    if task is None:
        raise ValueError(f'Got non-existing task: {task} for execution')

    # `allow_subtask` is a control flag for this function only; always strip
    # it so it can never leak into the task's own keyword arguments.
    allow_subtask = kwargs.pop('allow_subtask', False)

    exec_mode = 'sync'
    allow_async = True

    # if we're already in a celery task, don't allow async execution again
    # e.g task within task
    in_task = celery.current_task
    if in_task:
        log.debug('This task in in context of another task: %s, not allowing another async execution', in_task)
        allow_async = False
        if allow_subtask:
            log.debug('Forced async by allow_subtask=True flag')
            allow_async = True

    t = None
    if rhodecode.CELERY_ENABLED and allow_async:
        try:
            t = task.apply_async(args=args, kwargs=kwargs)
            log.debug('executing task %s:%s in async mode', t.task_id, task)
            # only flip the mode once the dispatch really succeeded, so a
            # failed dispatch that falls back below is still counted as sync
            exec_mode = 'async'
        except socket.error as e:
            # socket.error is an alias of OSError; a refused connection is the
            # expected failure when the broker is down, so log it without a
            # traceback and fall back to sync execution.
            if e.errno == errno.ECONNREFUSED:
                log.error('Unable to connect to celeryd `%s`. Sync execution', e)
            else:
                log.exception("Exception while connecting to celeryd.")
        except KeyError as e:
            log.error('Unable to connect to celeryd `%s`. Sync execution', e)
        except Exception:
            # deliberate best-effort: any dispatch failure degrades to sync
            log.exception(
                "Exception while trying to run task asynchronous. "
                "Fallback to sync execution.")
    else:
        log.debug('executing task %s:%s in sync mode', 'TASK', task)

    statsd = StatsdClient.statsd
    if statsd:
        task_repr = getattr(task, 'name', task)
        statsd.incr('rhodecode_celery_task_total', tags=[
            f'task:{task_repr}',
            f'mode:{exec_mode}'
        ])

    # we got async task, return it after statsd call
    if t is not None:
        return t
    return ResultWrapper(task(*args, **kwargs))