##// END OF EJS Templates
celery: set global CELERY_ENABLED on any connection error
dan -
r327:cf2d92d8 stable
parent child Browse files
Show More
@@ -1,141 +1,147 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 celery libs for RhodeCode
21 celery libs for RhodeCode
22 """
22 """
23
23
24
24
25 import socket
25 import socket
26 import logging
26 import logging
27
27
28 import rhodecode
28 import rhodecode
29
29
30 from os.path import join as jn
30 from os.path import join as jn
31 from pylons import config
31 from pylons import config
32
32
33 from decorator import decorator
33 from decorator import decorator
34
34
35 from zope.cachedescriptors.property import Lazy as LazyProperty
35 from zope.cachedescriptors.property import Lazy as LazyProperty
36
36
37 from rhodecode.config import utils
37 from rhodecode.config import utils
38 from rhodecode.lib.utils2 import safe_str, md5_safe, aslist
38 from rhodecode.lib.utils2 import safe_str, md5_safe, aslist
39 from rhodecode.lib.pidlock import DaemonLock, LockHeld
39 from rhodecode.lib.pidlock import DaemonLock, LockHeld
40 from rhodecode.lib.vcs import connect_vcs
40 from rhodecode.lib.vcs import connect_vcs
41 from rhodecode.model import meta
41 from rhodecode.model import meta
42
42
43 log = logging.getLogger(__name__)
43 log = logging.getLogger(__name__)
44
44
45
45
46 class ResultWrapper(object):
46 class ResultWrapper(object):
47 def __init__(self, task):
47 def __init__(self, task):
48 self.task = task
48 self.task = task
49
49
50 @LazyProperty
50 @LazyProperty
51 def result(self):
51 def result(self):
52 return self.task
52 return self.task
53
53
54
54
55 def run_task(task, *args, **kwargs):
55 def run_task(task, *args, **kwargs):
56 if rhodecode.CELERY_ENABLED:
56 if rhodecode.CELERY_ENABLED:
57 celery_is_up = False
57 try:
58 try:
58 t = task.apply_async(args=args, kwargs=kwargs)
59 t = task.apply_async(args=args, kwargs=kwargs)
59 log.info('running task %s:%s', t.task_id, task)
60 log.info('running task %s:%s', t.task_id, task)
61 celery_is_up = True
60 return t
62 return t
61
63
62 except socket.error as e:
64 except socket.error as e:
63 if isinstance(e, IOError) and e.errno == 111:
65 if isinstance(e, IOError) and e.errno == 111:
64 log.error('Unable to connect to celeryd. Sync execution')
66 log.error('Unable to connect to celeryd. Sync execution')
65 rhodecode.CELERY_ENABLED = False
66 else:
67 else:
67 log.exception("Exception while connecting to celeryd.")
68 log.exception("Exception while connecting to celeryd.")
68 except KeyError as e:
69 except KeyError as e:
69 log.error('Unable to connect to celeryd. Sync execution')
70 log.error('Unable to connect to celeryd. Sync execution')
70 except Exception as e:
71 except Exception as e:
71 log.exception(
72 log.exception(
72 "Exception while trying to run task asynchronous. "
73 "Exception while trying to run task asynchronous. "
73 "Fallback to sync execution.")
74 "Fallback to sync execution.")
75
76 # keep in mind there maybe a subtle race condition where something
77 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
78 # will see CELERY_ENABLED as True before this has a chance to set False
79 rhodecode.CELERY_ENABLED = celery_is_up
74 else:
80 else:
75 log.debug('executing task %s in sync mode', task)
81 log.debug('executing task %s in sync mode', task)
76 return ResultWrapper(task(*args, **kwargs))
82 return ResultWrapper(task(*args, **kwargs))
77
83
78
84
79 def __get_lockkey(func, *fargs, **fkwargs):
85 def __get_lockkey(func, *fargs, **fkwargs):
80 params = list(fargs)
86 params = list(fargs)
81 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
87 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
82
88
83 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
89 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
84 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
90 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
85 return 'task_%s.lock' % (md5_safe(_lock_key),)
91 return 'task_%s.lock' % (md5_safe(_lock_key),)
86
92
87
93
88 def locked_task(func):
94 def locked_task(func):
89 def __wrapper(func, *fargs, **fkwargs):
95 def __wrapper(func, *fargs, **fkwargs):
90 lockkey = __get_lockkey(func, *fargs, **fkwargs)
96 lockkey = __get_lockkey(func, *fargs, **fkwargs)
91 lockkey_path = config['app_conf']['cache_dir']
97 lockkey_path = config['app_conf']['cache_dir']
92
98
93 log.info('running task with lockkey %s' % lockkey)
99 log.info('running task with lockkey %s' % lockkey)
94 try:
100 try:
95 l = DaemonLock(file_=jn(lockkey_path, lockkey))
101 l = DaemonLock(file_=jn(lockkey_path, lockkey))
96 ret = func(*fargs, **fkwargs)
102 ret = func(*fargs, **fkwargs)
97 l.release()
103 l.release()
98 return ret
104 return ret
99 except LockHeld:
105 except LockHeld:
100 log.info('LockHeld')
106 log.info('LockHeld')
101 return 'Task with key %s already running' % lockkey
107 return 'Task with key %s already running' % lockkey
102
108
103 return decorator(__wrapper, func)
109 return decorator(__wrapper, func)
104
110
105
111
106 def get_session():
112 def get_session():
107 if rhodecode.CELERY_ENABLED:
113 if rhodecode.CELERY_ENABLED:
108 utils.initialize_database(config)
114 utils.initialize_database(config)
109 sa = meta.Session()
115 sa = meta.Session()
110 return sa
116 return sa
111
117
112
118
113 def dbsession(func):
119 def dbsession(func):
114 def __wrapper(func, *fargs, **fkwargs):
120 def __wrapper(func, *fargs, **fkwargs):
115 try:
121 try:
116 ret = func(*fargs, **fkwargs)
122 ret = func(*fargs, **fkwargs)
117 return ret
123 return ret
118 finally:
124 finally:
119 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
125 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
120 meta.Session.remove()
126 meta.Session.remove()
121
127
122 return decorator(__wrapper, func)
128 return decorator(__wrapper, func)
123
129
124
130
125 def vcsconnection(func):
131 def vcsconnection(func):
126 def __wrapper(func, *fargs, **fkwargs):
132 def __wrapper(func, *fargs, **fkwargs):
127 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
133 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
128 backends = config['vcs.backends'] = aslist(
134 backends = config['vcs.backends'] = aslist(
129 config.get('vcs.backends', 'hg,git'), sep=',')
135 config.get('vcs.backends', 'hg,git'), sep=',')
130 for alias in rhodecode.BACKENDS.keys():
136 for alias in rhodecode.BACKENDS.keys():
131 if alias not in backends:
137 if alias not in backends:
132 del rhodecode.BACKENDS[alias]
138 del rhodecode.BACKENDS[alias]
133 utils.configure_pyro4(config)
139 utils.configure_pyro4(config)
134 utils.configure_vcs(config)
140 utils.configure_vcs(config)
135 connect_vcs(
141 connect_vcs(
136 config['vcs.server'],
142 config['vcs.server'],
137 utils.get_vcs_server_protocol(config))
143 utils.get_vcs_server_protocol(config))
138 ret = func(*fargs, **fkwargs)
144 ret = func(*fargs, **fkwargs)
139 return ret
145 return ret
140
146
141 return decorator(__wrapper, func)
147 return decorator(__wrapper, func)
General Comments 0
You need to be logged in to leave comments. Login now