__init__.py
109 lines
| 3.2 KiB
| text/x-python
|
PythonLexer
# -*- coding: utf-8 -*-
"""
    rhodecode.lib.celerylib.__init__
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    celery libs for RhodeCode

    :created_on: Nov 27, 2010
    :author: marcink
    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
    :license: GPLv3, see COPYING for more details.
"""
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
r783 | ||||
r776 | import os | |||
import sys | ||||
import socket | ||||
import traceback | ||||
import logging | ||||
r1354 | from os.path import dirname as dn, join as jn | |||
r776 | ||||
r783 | from hashlib import md5 | |||
from decorator import decorator | ||||
r1082 | from pylons import config | |||
r547 | from vcs.utils.lazy import LazyProperty | |||
r783 | ||||
r1641 | from rhodecode.lib import str2bool, safe_str | |||
r783 | from rhodecode.lib.pidlock import DaemonLock, LockHeld | |||
r1003 | from celery.messaging import establish_connection | |||
r1082 | ||||
r776 | ||||
# Module-level logger used by the task helpers in this module.
log = logging.getLogger(__name__)

# Decide once, at import time, whether tasks are dispatched through celery.
# The flag is read from the ``use_celery`` entry of ``config['app_conf']``.
try:
    CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
except KeyError:
    # A KeyError here (e.g. ``config`` has no 'app_conf' entry) means the
    # setting cannot be read, so celery stays disabled.
    CELERY_ON = False
r776 | ||||
r1264 | ||||
class ResultWrapper(object):
    """Thin holder that exposes a stored value through a ``result`` attribute.

    ``run_task`` returns an instance of this class when it executes a task
    synchronously, passing in the value the task call returned.
    """

    def __init__(self, task):
        # Kept under the name ``task``; in the synchronous path of
        # ``run_task`` this is the task's return value, not the task itself.
        self.task = task

    @LazyProperty
    def result(self):
        """Return the value handed to the constructor."""
        return self.task
r1264 | ||||
def run_task(task, *args, **kwargs):
    """Run ``task`` through celery when enabled, otherwise call it directly.

    When ``CELERY_ON`` is true the task is submitted with
    ``task.apply_async(args=args, kwargs=kwargs)`` and the object returned by
    that call is handed back to the caller.  If the submission raises, the
    failure is logged and execution falls through to the synchronous path.

    :param task: callable task object; must provide ``apply_async`` when
        celery is enabled
    :param args: positional arguments forwarded to the task
    :param kwargs: keyword arguments forwarded to the task
    :returns: the ``apply_async`` result when the task was queued, otherwise
        a ``ResultWrapper`` around the value returned by ``task(*args,
        **kwargs)``
    """
    # Local import: only needed here, to name the "connection refused" code
    # portably instead of hard-coding the Linux value 111.
    import errno

    if CELERY_ON:
        try:
            t = task.apply_async(args=args, kwargs=kwargs)
            log.info('running task %s:%s', t.task_id, task)
            return t

        except socket.error:
            # sys.exc_info() is used instead of binding the exception in the
            # ``except`` clause so the handler parses on every Python version.
            e = sys.exc_info()[1]
            if isinstance(e, IOError) and e.errno == errno.ECONNREFUSED:
                log.debug('Unable to connect to celeryd. Sync execution')
            else:
                log.error(traceback.format_exc())
        except KeyError:
            log.debug('Unable to connect to celeryd. Sync execution')
        except Exception:
            # Best effort: any other submission failure is logged and the
            # task is still executed synchronously below.
            log.error(traceback.format_exc())

    log.debug('executing task %s in sync mode', task)
    return ResultWrapper(task(*args, **kwargs))
r547 | ||||
def __get_lockkey(func, *fargs, **fkwargs):
    """Build the lock-file name that identifies one task invocation.

    The name is ``task_<md5>.lock`` where the digest covers the function
    name followed by all positional arguments and ``key-value`` pairs of the
    keyword arguments, joined with ``-``.

    :param func: the task callable (its ``__name__`` is used when present,
        otherwise ``str(func)``)
    :param fargs: positional arguments of the invocation
    :param fkwargs: keyword arguments of the invocation
    :returns: lock-file name as a string
    """
    params = list(fargs)
    # Sort the keyword arguments so the key does not depend on dict
    # iteration order: the same logical call must always map to the same
    # lock file.  Keys are unique, so the sort never compares the values.
    params.extend(['%s-%s' % pair for pair in sorted(fkwargs.items())])

    func_name = str(getattr(func, '__name__', func))
    signature = func_name + '-' + '-'.join(map(safe_str, params))
    return 'task_%s.lock' % md5(signature).hexdigest()
def locked_task(func):
    """Decorator that lets only one invocation with the same key run at once.

    A ``DaemonLock`` is created on a lock file located in ``config['here']``
    and named by ``__get_lockkey`` from the function and its arguments.  If
    creating the lock raises ``LockHeld`` the function is not executed and a
    message string is returned instead.

    :param func: the task function to protect
    :returns: ``func`` wrapped through ``decorator(__wrapper, func)``
    """

    def __wrapper(func, *fargs, **fkwargs):
        # ``decorator`` passes the wrapped function as the first argument.
        lockkey = __get_lockkey(func, *fargs, **fkwargs)
        lockkey_path = config['here']

        log.info('running task with lockkey %s', lockkey)
        try:
            lock = DaemonLock(file_=jn(lockkey_path, lockkey))
            try:
                return func(*fargs, **fkwargs)
            finally:
                # Release even when the task raises; otherwise the lock
                # would stay held and block later runs with the same key.
                lock.release()
        except LockHeld:
            log.info('LockHeld')
            return 'Task with key %s already running' % lockkey

    return decorator(__wrapper, func)