##// END OF EJS Templates
celery: set global CELERY_ENABLED on any connection error
dan -
r327:cf2d92d8 stable
parent child Browse files
Show More
@@ -1,141 +1,147 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 celery libs for RhodeCode
22 22 """
23 23
24 24
25 25 import socket
26 26 import logging
27 27
28 28 import rhodecode
29 29
30 30 from os.path import join as jn
31 31 from pylons import config
32 32
33 33 from decorator import decorator
34 34
35 35 from zope.cachedescriptors.property import Lazy as LazyProperty
36 36
37 37 from rhodecode.config import utils
38 38 from rhodecode.lib.utils2 import safe_str, md5_safe, aslist
39 39 from rhodecode.lib.pidlock import DaemonLock, LockHeld
40 40 from rhodecode.lib.vcs import connect_vcs
41 41 from rhodecode.model import meta
42 42
43 43 log = logging.getLogger(__name__)
44 44
45 45
46 46 class ResultWrapper(object):
47 47 def __init__(self, task):
48 48 self.task = task
49 49
50 50 @LazyProperty
51 51 def result(self):
52 52 return self.task
53 53
54 54
55 55 def run_task(task, *args, **kwargs):
56 56 if rhodecode.CELERY_ENABLED:
57 celery_is_up = False
57 58 try:
58 59 t = task.apply_async(args=args, kwargs=kwargs)
59 60 log.info('running task %s:%s', t.task_id, task)
61 celery_is_up = True
60 62 return t
61 63
62 64 except socket.error as e:
63 65 if isinstance(e, IOError) and e.errno == 111:
64 66 log.error('Unable to connect to celeryd. Sync execution')
65 rhodecode.CELERY_ENABLED = False
66 67 else:
67 68 log.exception("Exception while connecting to celeryd.")
68 69 except KeyError as e:
69 70 log.error('Unable to connect to celeryd. Sync execution')
70 71 except Exception as e:
71 72 log.exception(
72 73 "Exception while trying to run task asynchronous. "
73 74 "Fallback to sync execution.")
75
76 # keep in mind there maybe a subtle race condition where something
77 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
78 # will see CELERY_ENABLED as True before this has a chance to set False
79 rhodecode.CELERY_ENABLED = celery_is_up
74 80 else:
75 81 log.debug('executing task %s in sync mode', task)
76 82 return ResultWrapper(task(*args, **kwargs))
77 83
78 84
79 85 def __get_lockkey(func, *fargs, **fkwargs):
80 86 params = list(fargs)
81 87 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
82 88
83 89 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
84 90 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
85 91 return 'task_%s.lock' % (md5_safe(_lock_key),)
86 92
87 93
88 94 def locked_task(func):
89 95 def __wrapper(func, *fargs, **fkwargs):
90 96 lockkey = __get_lockkey(func, *fargs, **fkwargs)
91 97 lockkey_path = config['app_conf']['cache_dir']
92 98
93 99 log.info('running task with lockkey %s' % lockkey)
94 100 try:
95 101 l = DaemonLock(file_=jn(lockkey_path, lockkey))
96 102 ret = func(*fargs, **fkwargs)
97 103 l.release()
98 104 return ret
99 105 except LockHeld:
100 106 log.info('LockHeld')
101 107 return 'Task with key %s already running' % lockkey
102 108
103 109 return decorator(__wrapper, func)
104 110
105 111
106 112 def get_session():
107 113 if rhodecode.CELERY_ENABLED:
108 114 utils.initialize_database(config)
109 115 sa = meta.Session()
110 116 return sa
111 117
112 118
113 119 def dbsession(func):
114 120 def __wrapper(func, *fargs, **fkwargs):
115 121 try:
116 122 ret = func(*fargs, **fkwargs)
117 123 return ret
118 124 finally:
119 125 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
120 126 meta.Session.remove()
121 127
122 128 return decorator(__wrapper, func)
123 129
124 130
125 131 def vcsconnection(func):
126 132 def __wrapper(func, *fargs, **fkwargs):
127 133 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
128 134 backends = config['vcs.backends'] = aslist(
129 135 config.get('vcs.backends', 'hg,git'), sep=',')
130 136 for alias in rhodecode.BACKENDS.keys():
131 137 if alias not in backends:
132 138 del rhodecode.BACKENDS[alias]
133 139 utils.configure_pyro4(config)
134 140 utils.configure_vcs(config)
135 141 connect_vcs(
136 142 config['vcs.server'],
137 143 utils.get_vcs_server_protocol(config))
138 144 ret = func(*fargs, **fkwargs)
139 145 return ret
140 146
141 147 return decorator(__wrapper, func)
General Comments 0
You need to be logged in to leave comments. Login now