##// END OF EJS Templates
redis: add logging around lock acquisition; this should help when debugging locked rows
marcink -
r3466:24601e96 default
parent child Browse files
Show More
@@ -1,205 +1,214 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2019 RhodeCode GmbH
3 # Copyright (C) 2015-2019 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import time
20 import time
21 import errno
21 import errno
22 import logging
22 import logging
23
23
24 import gevent
24 import gevent
25
25
26 from dogpile.cache.backends import memory as memory_backend
26 from dogpile.cache.backends import memory as memory_backend
27 from dogpile.cache.backends import file as file_backend
27 from dogpile.cache.backends import file as file_backend
28 from dogpile.cache.backends import redis as redis_backend
28 from dogpile.cache.backends import redis as redis_backend
29 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
29 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
30 from dogpile.cache.util import memoized_property
30 from dogpile.cache.util import memoized_property
31
31
32 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
32 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
33
33
34
34
35 _default_max_size = 1024
35 _default_max_size = 1024
36
36
37 log = logging.getLogger(__name__)
37 log = logging.getLogger(__name__)
38
38
39
39
class LRUMemoryBackend(memory_backend.MemoryBackend):
    """
    In-memory dogpile backend backed by a size-bounded LRU dictionary.

    Recognized extra backend arguments:
      * ``max_size``      -- maximum number of cached entries
                             (defaults to ``_default_max_size``)
      * ``log_key_count`` -- when truthy, use the debug LRU dict variant
                             that reports its key count
    """
    pickle_values = False

    def __init__(self, arguments):
        size_limit = arguments.pop('max_size', _default_max_size)

        # choose the debug variant only when key-count logging was requested
        dict_cls = LRUDictDebug if arguments.pop('log_key_count', None) else LRUDict

        arguments['cache_dict'] = dict_cls(size_limit)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        """Remove *key* from the cache; missing keys are ignored."""
        try:
            del self._cache[key]
        except KeyError:
            # deleting an absent key is deliberately a no-op
            pass

    def delete_multi(self, keys):
        """Remove every key in *keys*, ignoring any that are absent."""
        for cache_key in keys:
            self.delete(cache_key)
63
63
64
64
class Serializer(object):
    """
    Mixin adding pickle (de)serialization helpers with optional
    error suppression via the ``safe`` flag.
    """

    def _dumps(self, value, safe=False):
        # Serialize *value*; by default propagate failures so callers
        # notice unpicklable values. With safe=True report NO_VALUE instead.
        try:
            return compat.pickle.dumps(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE

    def _loads(self, value, safe=True):
        # Deserialize *value*; by default swallow failures and report a
        # cache miss (NO_VALUE). With safe=False propagate the error.
        try:
            return compat.pickle.loads(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE
83
83
84
84
85 import fcntl
85 import fcntl
86 flock_org = fcntl.flock
86 flock_org = fcntl.flock
87
87
88
88
class CustomLockFactory(FileLock):
    """
    dogpile ``FileLock`` whose flock() cooperates with gevent.

    Instead of letting a blocking ``flock`` call freeze the whole
    process, the lock is taken in non-blocking mode inside a retry loop
    that yields to the gevent hub between attempts.
    """

    @memoized_property
    def _module(self):
        # Computed once per lock instance: returns the fcntl module with
        # its ``flock`` replaced by a gevent-friendly implementation.

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # set non-blocking, this will cause an exception if we cannot acquire a lock
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 15  # 15min
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # raise on other errors than Resource temporarily unavailable
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too much time on a lock, better fail than loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    # lock is held by someone else: sleep briefly (yielding to
                    # other greenlets) and retry
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    gevent.sleep(wait_timeout)

        # NOTE(review): this rebinds fcntl.flock module-wide, so every user
        # of fcntl in this process gets the gevent variant, not just this
        # lock instance -- confirm that is intended.
        fcntl.flock = gevent_flock
        return fcntl
123
123
124
124
class FileNamespaceBackend(Serializer, file_backend.DBMBackend):
    """
    DBM-file dogpile backend that stores pickled values and uses the
    gevent-aware ``CustomLockFactory`` for its file locks.
    """

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        super(FileNamespaceBackend, self).__init__(arguments)

    def list_keys(self, prefix=''):
        """Return stored keys, optionally restricted to *prefix*."""

        def matches(key):
            # an empty prefix matches every key
            return not prefix or key.startswith(prefix)

        with self._dbm_file(True) as dbm:

            return filter(matches, dbm.keys())

    def get_store(self):
        """Return the path of the backing DBM file."""
        return self.filename

    def get(self, key):
        """Return the deserialized value for *key*, or NO_VALUE on a miss."""
        with self._dbm_file(False) as dbm:
            if hasattr(dbm, 'get'):
                raw = dbm.get(key, NO_VALUE)
            else:
                # gdbm objects lack a .get method
                try:
                    raw = dbm[key]
                except KeyError:
                    raw = NO_VALUE
            if raw is not NO_VALUE:
                return self._loads(raw)
            return raw

    def set(self, key, value):
        """Serialize *value* and store it under *key*."""
        with self._dbm_file(True) as dbm:
            dbm[key] = self._dumps(value)

    def set_multi(self, mapping):
        """Store every (key, value) pair of *mapping* in one file session."""
        with self._dbm_file(True) as dbm:
            for cache_key, cache_value in mapping.items():
                dbm[cache_key] = self._dumps(cache_value)
169
169
170
170
class RedisPickleBackend(Serializer, redis_backend.RedisBackend):
    """
    Redis dogpile backend storing pickled values, with optional per-key
    expiration (``redis_expiration_time``) and an optional distributed
    mutex for dogpile's creation lock (``distributed_lock``).
    """

    def list_keys(self, prefix=''):
        """
        Return all keys, optionally restricted to *prefix*.

        BUGFIX: an empty prefix previously issued ``KEYS ''``, which
        matches nothing; use ``'*'`` instead so that no prefix means
        "all keys", consistent with FileNamespaceBackend.list_keys.
        """
        pattern = prefix + '*' if prefix else '*'
        return self.client.keys(pattern)

    def get_store(self):
        """Return the underlying redis connection pool."""
        return self.client.connection_pool

    def get(self, key):
        """Return the deserialized value for *key*, or NO_VALUE on a miss."""
        value = self.client.get(key)
        if value is None:
            return NO_VALUE
        return self._loads(value)

    def set(self, key, value):
        """Serialize *value* and store it, applying expiration when configured."""
        if self.redis_expiration_time:
            self.client.setex(key, self.redis_expiration_time,
                              self._dumps(value))
        else:
            self.client.set(key, self._dumps(value))

    def set_multi(self, mapping):
        """Store every (key, value) pair of *mapping*."""
        mapping = dict(
            (k, self._dumps(v))
            for k, v in mapping.items()
        )

        if not self.redis_expiration_time:
            self.client.mset(mapping)
        else:
            # pipeline batches the SETEX calls into a single round-trip
            pipe = self.client.pipeline()
            for key, value in mapping.items():
                pipe.setex(key, self.redis_expiration_time, value)
            pipe.execute()

    def get_mutex(self, key):
        """
        Return a distributed redis lock object for *key* (not yet
        acquired), or None to fall back to dogpile's default local mutex.
        """
        u = redis_backend.u
        if self.distributed_lock:
            lock_key = u('_lock_{0}').format(key)
            log.debug('Trying to acquire Redis lock for key %s', lock_key)
            return self.client.lock(lock_key, self.lock_timeout, self.lock_sleep)
        else:
            return None
General Comments 0
You need to be logged in to leave comments. Login now