##// END OF EJS Templates
Celery is configured by the .ini files and run from paster now...
Celery is configured by the .ini files and run from paster now removed celeryconfig, added homebrew celery-pylons, added paster celeryd command, fixed tasks to use pylons configs, sqlalchemy sessions

File last commit:

r776:f6c613fb beta
r776:f6c613fb beta
Show More
__init__.py
74 lines | 1.9 KiB | text/x-python | PythonLexer
Celery is configured by the .ini files and run from paster now...
r776 import os
import sys
import socket
import traceback
import logging
renamed project to rhodecode
r547 from rhodecode.lib.pidlock import DaemonLock, LockHeld
from vcs.utils.lazy import LazyProperty
from decorator import decorator
from hashlib import md5
Celery is configured by the .ini files and run from paster now...
r776 from pylons import config
renamed project to rhodecode
r547 log = logging.getLogger(__name__)
Celery is configured by the .ini files and run from paster now...
r776 def str2bool(v):
return v.lower() in ["yes", "true", "t", "1"] if v else None
CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
renamed project to rhodecode
r547 class ResultWrapper(object):
def __init__(self, task):
self.task = task
Celery is configured by the .ini files and run from paster now...
r776
renamed project to rhodecode
r547 @LazyProperty
def result(self):
return self.task
def run_task(task, *args, **kwargs):
Celery is configured by the .ini files and run from paster now...
r776 if CELERY_ON:
try:
t = task.delay(*args, **kwargs)
log.info('running task %s:%s', t.task_id, task)
return t
except socket.error, e:
if e.errno == 111:
log.debug('Unable to connect to celeryd. Sync execution')
else:
log.error(traceback.format_exc())
except KeyError, e:
log.debug('Unable to connect to celeryd. Sync execution')
except Exception, e:
log.error(traceback.format_exc())
log.debug('executing task %s in sync mode', task)
more error catching on celery run_task
r558 return ResultWrapper(task(*args, **kwargs))
renamed project to rhodecode
r547
def locked_task(func):
def __wrapper(func, *fargs, **fkwargs):
params = list(fargs)
params.extend(['%s-%s' % ar for ar in fkwargs.items()])
Celery is configured by the .ini files and run from paster now...
r776
renamed project to rhodecode
r547 lockkey = 'task_%s' % \
md5(str(func.__name__) + '-' + \
'-'.join(map(str, params))).hexdigest()
log.info('running task with lockkey %s', lockkey)
try:
l = DaemonLock(lockkey)
ret = func(*fargs, **fkwargs)
l.release()
return ret
except LockHeld:
log.info('LockHeld')
Celery is configured by the .ini files and run from paster now...
r776 return 'Task with key %s already running' % lockkey
renamed project to rhodecode
r547
Celery is configured by the .ini files and run from paster now...
r776 return decorator(__wrapper, func)
renamed project to rhodecode
r547
Celery is configured by the .ini files and run from paster now...
r776