##// END OF EJS Templates
fixed Python2.5 socket error
marcink -
r1414:4f2c4fcc beta
parent child Browse files
Show More
@@ -1,109 +1,109 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.lib.celerylib.__init__
3 rhodecode.lib.celerylib.__init__
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6 celery libs for RhodeCode
6 celery libs for RhodeCode
7
7
8 :created_on: Nov 27, 2010
8 :created_on: Nov 27, 2010
9 :author: marcink
9 :author: marcink
10 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
10 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
11 :license: GPLv3, see COPYING for more details.
11 :license: GPLv3, see COPYING for more details.
12 """
12 """
13 # This program is free software: you can redistribute it and/or modify
13 # This program is free software: you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License as published by
14 # it under the terms of the GNU General Public License as published by
15 # the Free Software Foundation, either version 3 of the License, or
15 # the Free Software Foundation, either version 3 of the License, or
16 # (at your option) any later version.
16 # (at your option) any later version.
17 #
17 #
18 # This program is distributed in the hope that it will be useful,
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 # GNU General Public License for more details.
21 # GNU General Public License for more details.
22 #
22 #
23 # You should have received a copy of the GNU General Public License
23 # You should have received a copy of the GNU General Public License
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25
25
26 import os
26 import os
27 import sys
27 import sys
28 import socket
28 import socket
29 import traceback
29 import traceback
30 import logging
30 import logging
31 from os.path import dirname as dn, join as jn
31 from os.path import dirname as dn, join as jn
32
32
33 from hashlib import md5
33 from hashlib import md5
34 from decorator import decorator
34 from decorator import decorator
35 from pylons import config
35 from pylons import config
36
36
37 from vcs.utils.lazy import LazyProperty
37 from vcs.utils.lazy import LazyProperty
38
38
39 from rhodecode.lib import str2bool
39 from rhodecode.lib import str2bool
40 from rhodecode.lib.pidlock import DaemonLock, LockHeld
40 from rhodecode.lib.pidlock import DaemonLock, LockHeld
41
41
42 from celery.messaging import establish_connection
42 from celery.messaging import establish_connection
43
43
44
44
45 log = logging.getLogger(__name__)
45 log = logging.getLogger(__name__)
46
46
47 try:
47 try:
48 CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
48 CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
49 except KeyError:
49 except KeyError:
50 CELERY_ON = False
50 CELERY_ON = False
51
51
52
52
53 class ResultWrapper(object):
53 class ResultWrapper(object):
54 def __init__(self, task):
54 def __init__(self, task):
55 self.task = task
55 self.task = task
56
56
57 @LazyProperty
57 @LazyProperty
58 def result(self):
58 def result(self):
59 return self.task
59 return self.task
60
60
61
61
62 def run_task(task, *args, **kwargs):
62 def run_task(task, *args, **kwargs):
63 if CELERY_ON:
63 if CELERY_ON:
64 try:
64 try:
65 t = task.apply_async(args=args, kwargs=kwargs)
65 t = task.apply_async(args=args, kwargs=kwargs)
66 log.info('running task %s:%s', t.task_id, task)
66 log.info('running task %s:%s', t.task_id, task)
67 return t
67 return t
68
68
69 except socket.error, e:
69 except socket.error, e:
70 if e.errno == 111:
70 if isinstance(e, IOError) and e.errno == 111:
71 log.debug('Unable to connect to celeryd. Sync execution')
71 log.debug('Unable to connect to celeryd. Sync execution')
72 else:
72 else:
73 log.error(traceback.format_exc())
73 log.error(traceback.format_exc())
74 except KeyError, e:
74 except KeyError, e:
75 log.debug('Unable to connect to celeryd. Sync execution')
75 log.debug('Unable to connect to celeryd. Sync execution')
76 except Exception, e:
76 except Exception, e:
77 log.error(traceback.format_exc())
77 log.error(traceback.format_exc())
78
78
79 log.debug('executing task %s in sync mode', task)
79 log.debug('executing task %s in sync mode', task)
80 return ResultWrapper(task(*args, **kwargs))
80 return ResultWrapper(task(*args, **kwargs))
81
81
82
82
83 def __get_lockkey(func, *fargs, **fkwargs):
83 def __get_lockkey(func, *fargs, **fkwargs):
84 params = list(fargs)
84 params = list(fargs)
85 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
85 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
86
86
87 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
87 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
88
88
89 lockkey = 'task_%s.lock' % \
89 lockkey = 'task_%s.lock' % \
90 md5(func_name + '-' + '-'.join(map(str, params))).hexdigest()
90 md5(func_name + '-' + '-'.join(map(str, params))).hexdigest()
91 return lockkey
91 return lockkey
92
92
93
93
94 def locked_task(func):
94 def locked_task(func):
95 def __wrapper(func, *fargs, **fkwargs):
95 def __wrapper(func, *fargs, **fkwargs):
96 lockkey = __get_lockkey(func, *fargs, **fkwargs)
96 lockkey = __get_lockkey(func, *fargs, **fkwargs)
97 lockkey_path = dn(dn(dn(os.path.abspath(__file__))))
97 lockkey_path = dn(dn(dn(os.path.abspath(__file__))))
98
98
99 log.info('running task with lockkey %s', lockkey)
99 log.info('running task with lockkey %s', lockkey)
100 try:
100 try:
101 l = DaemonLock(jn(lockkey_path, lockkey))
101 l = DaemonLock(jn(lockkey_path, lockkey))
102 ret = func(*fargs, **fkwargs)
102 ret = func(*fargs, **fkwargs)
103 l.release()
103 l.release()
104 return ret
104 return ret
105 except LockHeld:
105 except LockHeld:
106 log.info('LockHeld')
106 log.info('LockHeld')
107 return 'Task with key %s already running' % lockkey
107 return 'Task with key %s already running' % lockkey
108
108
109 return decorator(__wrapper, func)
109 return decorator(__wrapper, func)
General Comments 0
You need to be logged in to leave comments. Login now