from rhodecode.lib.pidlock import DaemonLock, LockHeld from vcs.utils.lazy import LazyProperty from decorator import decorator import logging import os import sys import traceback from hashlib import md5 import socket log = logging.getLogger(__name__) class ResultWrapper(object): def __init__(self, task): self.task = task @LazyProperty def result(self): return self.task def run_task(task, *args, **kwargs): try: t = task.delay(*args, **kwargs) log.info('running task %s', t.task_id) return t except socket.error, e: try: conn_failed = e.errno == 111 except AttributeError: conn_failed = False if conn_failed: log.debug('Unable to connect to celeryd. Sync execution') else: log.debug('Unable to connect to celeryd. Sync execution') except KeyError, e: log.debug('Unable to connect to celeryd. Sync execution') except Exception, e: log.error(traceback.format_exc()) return ResultWrapper(task(*args, **kwargs)) def locked_task(func): def __wrapper(func, *fargs, **fkwargs): params = list(fargs) params.extend(['%s-%s' % ar for ar in fkwargs.items()]) lockkey = 'task_%s' % \ md5(str(func.__name__) + '-' + \ '-'.join(map(str, params))).hexdigest() log.info('running task with lockkey %s', lockkey) try: l = DaemonLock(lockkey) ret = func(*fargs, **fkwargs) l.release() return ret except LockHeld: log.info('LockHeld') return 'Task with key %s already running' % lockkey return decorator(__wrapper, func)