__init__.py
92 lines
| 3.2 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import socket | ||||
import logging | ||||
import rhodecode | ||||
r2359 | from zope.cachedescriptors.property import Lazy as LazyProperty | |||
from rhodecode.lib.celerylib.loader import ( | ||||
celery_app, RequestContextTask, get_logger) | ||||
r4792 | from rhodecode.lib.statsd_client import StatsdClient | |||
r1 | ||||
# Module-level alias of the `task` attribute of the shared `celery_app`.
# NOTE(review): presumably used elsewhere as the decorator that registers
# functions as celery tasks — confirm against callers.
async_task = celery_app.task

# Logger for this module, named after the module's import path.
log = logging.getLogger(__name__)
class ResultWrapper:
    """
    Minimal holder exposing a stored value through a ``result`` attribute.

    ``run_task`` in this module wraps the return value of a synchronously
    executed task in this class before handing it back to the caller.
    """

    def __init__(self, task):
        """
        :param task: the value to hold; it is stored unchanged on ``self.task``.
        """
        self.task = task

    @LazyProperty
    def result(self):
        """Return the value that was passed to the constructor."""
        return self.task
def run_task(task, *args, **kwargs):
    """
    Run ``task`` asynchronously through celery when possible, otherwise inline.

    Async dispatch via ``task.apply_async`` is attempted only when
    ``rhodecode.CELERY_ENABLED`` is truthy and we are not already executing
    inside another celery task (unless ``allow_subtask=True`` is passed).
    If the async dispatch raises, the error is logged and the task is
    executed synchronously in the current process instead.

    :param task: the task callable; must not be ``None``.
    :param args: positional arguments forwarded to the task.
    :param kwargs: keyword arguments forwarded to the task. The special key
        ``allow_subtask`` is consumed here and is never forwarded.
    :return: the object returned by ``task.apply_async`` when the async
        dispatch succeeded, otherwise a ``ResultWrapper`` holding the return
        value of the synchronous call ``task(*args, **kwargs)``.
    :raises ValueError: when ``task`` is ``None``.
    """
    import errno
    import celery

    log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
    if task is None:
        raise ValueError(f'Got non-existing task: {task} for execution')

    # `allow_subtask` is a control flag for this function only. Pop it
    # unconditionally so it can never leak into the kwargs given to the task.
    allow_subtask = kwargs.pop('allow_subtask', False)

    allow_async = True

    # if we're already in a celery task, don't allow async execution again
    # e.g task within task
    in_task = celery.current_task
    if in_task:
        log.debug('This task in in context of another task: %s, not allowing another async execution', in_task)
        allow_async = False
        if allow_subtask:
            log.debug('Forced async by allow_subtask=True flag')
            allow_async = True

    t = None
    if rhodecode.CELERY_ENABLED and allow_async:
        try:
            t = task.apply_async(args=args, kwargs=kwargs)
            log.debug('executing task %s:%s in async mode', t.task_id, task)
        except socket.error as e:
            # a refused connection is an expected failure mode (broker down),
            # anything else gets a full traceback
            if e.errno == errno.ECONNREFUSED:
                log.error('Unable to connect to celeryd `%s`. Sync execution', e)
            else:
                log.exception("Exception while connecting to celeryd.")
        except KeyError as e:
            log.error('Unable to connect to celeryd `%s`. Sync execution', e)
        except Exception:
            log.exception(
                "Exception while trying to run task asynchronous. "
                "Fallback to sync execution.")

    else:
        log.debug('executing task %s:%s in sync mode', 'TASK', task)
        statsd = StatsdClient.statsd
        if statsd:
            task_repr = getattr(task, 'name', task)
            statsd.incr('rhodecode_celery_task_total', tags=[
                f'task:{task_repr}',
                'mode:sync'
            ])

    # we got async task, return it after statsd call
    if t is not None:
        return t

    # either async was not allowed, or the dispatch failed above: run inline
    return ResultWrapper(task(*args, **kwargs))