##// END OF EJS Templates
Another better solution for establishing connection with messaging broker in celery....
marcink -
r1003:9037456b beta
parent child Browse files
Show More
@@ -38,6 +38,7 b' from vcs.utils.lazy import LazyProperty'
38 from rhodecode.lib import str2bool
38 from rhodecode.lib import str2bool
39 from rhodecode.lib.pidlock import DaemonLock, LockHeld
39 from rhodecode.lib.pidlock import DaemonLock, LockHeld
40
40
41 from celery.messaging import establish_connection
41 from pylons import config
42 from pylons import config
42
43
43 log = logging.getLogger(__name__)
44 log = logging.getLogger(__name__)
@@ -58,7 +59,17 b' class ResultWrapper(object):'
58 def run_task(task, *args, **kwargs):
59 def run_task(task, *args, **kwargs):
59 if CELERY_ON:
60 if CELERY_ON:
60 try:
61 try:
61 t = task.delay(*args, **kwargs)
62 kw = {
63 'hostname':config['app_conf'].get('broker.host'),
64 'userid':config['app_conf'].get('broker.user'),
65 'password':config['app_conf'].get('broker.password'),
66 'virtual_host':config['app_conf'].get('broker.vhost'),
67 'port':config['app_conf'].get('broker.port'),
68 }
69 conn = establish_connection(**kw)
70 publisher = task.get_publisher(connection=conn)
71 t = task.apply_async(args=args, kwargs=kwargs, publisher=publisher)
72
62 log.info('running task %s:%s', t.task_id, task)
73 log.info('running task %s:%s', t.task_id, task)
63 return t
74 return t
64 except socket.error, e:
75 except socket.error, e:
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now