__init__.py
84 lines
| 2.8 KiB
| text/x-python
|
PythonLexer
r1 | # -*- coding: utf-8 -*- | |||
r4306 | # Copyright (C) 2010-2020 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import socket | ||||
import logging | ||||
import rhodecode | ||||
r2359 | from zope.cachedescriptors.property import Lazy as LazyProperty | |||
from rhodecode.lib.celerylib.loader import ( | ||||
celery_app, RequestContextTask, get_logger) | ||||
r4792 | from rhodecode.lib.statsd_client import StatsdClient | |||
r1 | ||||
# Module-level logger shared by the helpers defined in this package.
log = logging.getLogger(__name__)

# Short alias for ``celery_app.task`` (presumably used as the decorator
# when declaring tasks elsewhere -- confirm against the task modules).
async_task = celery_app.task
class ResultWrapper(object):
    """
    Minimal holder that exposes a stored value through a ``result`` attribute.

    ``run_task`` wraps the return value of a synchronously executed task in
    this class, so the value can be read via ``.result`` (presumably to
    mirror the attribute of celery's async result objects).
    """

    def __init__(self, task):
        # Despite the parameter name, ``run_task`` passes in the value
        # returned by the task call, not the task object itself.
        self.task = task

    @LazyProperty
    def result(self):
        """Return the wrapped value exactly as it was given to ``__init__``."""
        wrapped_value = self.task
        return wrapped_value
def run_task(task, *args, **kwargs):
    """
    Execute ``task`` through celery when it is enabled, otherwise in-process.

    When ``rhodecode.CELERY_ENABLED`` is true the task is dispatched with
    ``task.apply_async``. If the dispatch raises, the error is logged and the
    task is executed synchronously as a fallback. When a statsd client is
    configured, the ``rhodecode_celery_task_total`` counter is incremented
    with the task name and the execution mode as tags.

    :param task: callable task; must provide ``apply_async`` when celery
        is enabled.
    :param args: positional arguments forwarded to the task.
    :param kwargs: keyword arguments forwarded to the task.
    :returns: the object returned by ``task.apply_async`` when the async
        dispatch succeeded, otherwise a :class:`ResultWrapper` around the
        return value of the synchronous call.
    :raises ValueError: if ``task`` is ``None``.
    """
    # Imported locally so the symbolic errno constant is available without
    # touching the module-level import block.
    import errno

    log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
    if task is None:
        raise ValueError('Got non-existing task for execution')

    exec_mode = 'sync'
    async_result = None

    if rhodecode.CELERY_ENABLED:
        try:
            async_result = task.apply_async(args=args, kwargs=kwargs)
            exec_mode = 'async'
            log.debug('executing task %s:%s in async mode', async_result.task_id, task)
        except socket.error as e:
            # Use the platform's ECONNREFUSED value instead of the literal
            # 111, which is only correct on Linux.
            if getattr(e, 'errno', None) == errno.ECONNREFUSED:
                log.error('Unable to connect to celeryd `%s`. Sync execution', e)
            else:
                log.exception("Exception while connecting to celeryd.")
        except KeyError as e:
            log.error('Unable to connect to celeryd `%s`. Sync execution', e)
        except Exception:
            # Deliberate best-effort: any dispatch failure falls through to
            # the synchronous execution at the end of this function.
            log.exception(
                "Exception while trying to run task asynchronous. "
                "Fallback to sync execution.")
    else:
        log.debug('executing task %s:%s in sync mode', 'TASK', task)

    statsd = StatsdClient.statsd
    if statsd:
        # Fall back to the task object itself when it has no ``name``.
        task_repr = getattr(task, 'name', task)
        statsd.incr('rhodecode_celery_task_total', tags=[
            'task:{}'.format(task_repr),
            'mode:{}'.format(exec_mode)
        ])

    # we got async task, return it after statsd call; compare against None
    # explicitly so the decision never depends on the result's truthiness
    if async_result is not None:
        return async_result
    return ResultWrapper(task(*args, **kwargs))