##// END OF EJS Templates
Another better solution for establishing connection with messaging broker in celery....
marcink -
r1003:9037456b beta
parent child Browse files
Show More
@@ -1,104 +1,115 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.lib.celerylib.__init__
3 rhodecode.lib.celerylib.__init__
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6 celery libs for RhodeCode
6 celery libs for RhodeCode
7
7
8 :created_on: Nov 27, 2010
8 :created_on: Nov 27, 2010
9 :author: marcink
9 :author: marcink
10 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
10 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
11 :license: GPLv3, see COPYING for more details.
11 :license: GPLv3, see COPYING for more details.
12 """
12 """
13 # This program is free software; you can redistribute it and/or
13 # This program is free software; you can redistribute it and/or
14 # modify it under the terms of the GNU General Public License
14 # modify it under the terms of the GNU General Public License
15 # as published by the Free Software Foundation; version 2
15 # as published by the Free Software Foundation; version 2
16 # of the License or (at your opinion) any later version of the license.
16 # of the License or (at your opinion) any later version of the license.
17 #
17 #
18 # This program is distributed in the hope that it will be useful,
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 # GNU General Public License for more details.
21 # GNU General Public License for more details.
22 #
22 #
23 # You should have received a copy of the GNU General Public License
23 # You should have received a copy of the GNU General Public License
24 # along with this program; if not, write to the Free Software
24 # along with this program; if not, write to the Free Software
25 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
25 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
26 # MA 02110-1301, USA.
26 # MA 02110-1301, USA.
27
27
28 import os
28 import os
29 import sys
29 import sys
30 import socket
30 import socket
31 import traceback
31 import traceback
32 import logging
32 import logging
33
33
34 from hashlib import md5
34 from hashlib import md5
35 from decorator import decorator
35 from decorator import decorator
36 from vcs.utils.lazy import LazyProperty
36 from vcs.utils.lazy import LazyProperty
37
37
38 from rhodecode.lib import str2bool
38 from rhodecode.lib import str2bool
39 from rhodecode.lib.pidlock import DaemonLock, LockHeld
39 from rhodecode.lib.pidlock import DaemonLock, LockHeld
40
40
41 from celery.messaging import establish_connection
41 from pylons import config
42 from pylons import config
42
43
43 log = logging.getLogger(__name__)
44 log = logging.getLogger(__name__)
44
45
45 try:
46 try:
46 CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
47 CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
47 except KeyError:
48 except KeyError:
48 CELERY_ON = False
49 CELERY_ON = False
49
50
50 class ResultWrapper(object):
51 class ResultWrapper(object):
51 def __init__(self, task):
52 def __init__(self, task):
52 self.task = task
53 self.task = task
53
54
54 @LazyProperty
55 @LazyProperty
55 def result(self):
56 def result(self):
56 return self.task
57 return self.task
57
58
58 def run_task(task, *args, **kwargs):
59 def run_task(task, *args, **kwargs):
59 if CELERY_ON:
60 if CELERY_ON:
60 try:
61 try:
61 t = task.delay(*args, **kwargs)
62 kw = {
63 'hostname':config['app_conf'].get('broker.host'),
64 'userid':config['app_conf'].get('broker.user'),
65 'password':config['app_conf'].get('broker.password'),
66 'virtual_host':config['app_conf'].get('broker.vhost'),
67 'port':config['app_conf'].get('broker.port'),
68 }
69 conn = establish_connection(**kw)
70 publisher = task.get_publisher(connection=conn)
71 t = task.apply_async(args=args, kwargs=kwargs, publisher=publisher)
72
62 log.info('running task %s:%s', t.task_id, task)
73 log.info('running task %s:%s', t.task_id, task)
63 return t
74 return t
64 except socket.error, e:
75 except socket.error, e:
65 if e.errno == 111:
76 if e.errno == 111:
66 log.debug('Unable to connect to celeryd. Sync execution')
77 log.debug('Unable to connect to celeryd. Sync execution')
67 else:
78 else:
68 log.error(traceback.format_exc())
79 log.error(traceback.format_exc())
69 except KeyError, e:
80 except KeyError, e:
70 log.debug('Unable to connect to celeryd. Sync execution')
81 log.debug('Unable to connect to celeryd. Sync execution')
71 except Exception, e:
82 except Exception, e:
72 log.error(traceback.format_exc())
83 log.error(traceback.format_exc())
73
84
74 log.debug('executing task %s in sync mode', task)
85 log.debug('executing task %s in sync mode', task)
75 return ResultWrapper(task(*args, **kwargs))
86 return ResultWrapper(task(*args, **kwargs))
76
87
77
88
78 def locked_task(func):
89 def locked_task(func):
79 def __wrapper(func, *fargs, **fkwargs):
90 def __wrapper(func, *fargs, **fkwargs):
80 params = list(fargs)
91 params = list(fargs)
81 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
92 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
82
93
83 lockkey = 'task_%s' % \
94 lockkey = 'task_%s' % \
84 md5(str(func.__name__) + '-' + \
95 md5(str(func.__name__) + '-' + \
85 '-'.join(map(str, params))).hexdigest()
96 '-'.join(map(str, params))).hexdigest()
86 log.info('running task with lockkey %s', lockkey)
97 log.info('running task with lockkey %s', lockkey)
87 try:
98 try:
88 l = DaemonLock(lockkey)
99 l = DaemonLock(lockkey)
89 ret = func(*fargs, **fkwargs)
100 ret = func(*fargs, **fkwargs)
90 l.release()
101 l.release()
91 return ret
102 return ret
92 except LockHeld:
103 except LockHeld:
93 log.info('LockHeld')
104 log.info('LockHeld')
94 return 'Task with key %s already running' % lockkey
105 return 'Task with key %s already running' % lockkey
95
106
96 return decorator(__wrapper, func)
107 return decorator(__wrapper, func)
97
108
98
109
99
110
100
111
101
112
102
113
103
114
104
115
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now