redis-lock: strict-redis is what we use.
super-admin
r4708:0440eaa8 stable
@@ -1,311 +1,312 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2015-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import time
import errno
import logging

import msgpack
import gevent
import redis

from dogpile.cache.api import CachedValue
from dogpile.cache.backends import memory as memory_backend
from dogpile.cache.backends import file as file_backend
from dogpile.cache.backends import redis as redis_backend
from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
from dogpile.cache.util import memoized_property

from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug


_default_max_size = 1024

log = logging.getLogger(__name__)


class LRUMemoryBackend(memory_backend.MemoryBackend):
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        max_size = arguments.pop('max_size', _default_max_size)

        LRUDictClass = LRUDict
        if arguments.pop('log_key_count', None):
            LRUDictClass = LRUDictDebug

        arguments['cache_dict'] = LRUDictClass(max_size)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        try:
            del self._cache[key]
        except KeyError:
            # we don't care if the key isn't there at deletion time
            pass

    def delete_multi(self, keys):
        for key in keys:
            self.delete(key)

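# A minimal usage sketch for the backend above (illustrative only; the
# registration name 'dogpile.cache.rc.memory_lru' is an assumption about how
# this module is wired into dogpile, adjust to the actual registered name):
#
#   from dogpile.cache import make_region
#
#   region = make_region().configure(
#       'dogpile.cache.rc.memory_lru',
#       arguments={'max_size': 10000, 'log_key_count': True})
#   region.set('answer', 42)
#   assert region.get('answer') == 42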

class PickleSerializer(object):

    def _dumps(self, value, safe=False):
        try:
            return compat.pickle.dumps(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

    def _loads(self, value, safe=True):
        try:
            return compat.pickle.loads(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise


class MsgPackSerializer(object):

    def _dumps(self, value, safe=False):
        try:
            return msgpack.packb(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

    def _loads(self, value, safe=True):
        """
        pickle maintains the `CachedValue` wrapper of the tuple;
        msgpack does not, so it must be added back in.
        """
        try:
            value = msgpack.unpackb(value, use_list=False)
            return CachedValue(*value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

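# A rough round-trip sketch for the serializer mixins above (illustrative
# only): `CachedValue` is a (payload, metadata) tuple from dogpile.cache.api,
# and MsgPackSerializer._loads re-wraps the unpacked tuple in it.
#
#   serializer = MsgPackSerializer()
#   cached = CachedValue('some-payload', {'ct': time.time(), 'v': 1})
#   data = serializer._dumps(cached)
#   assert serializer._loads(data) == cached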

import fcntl
flock_org = fcntl.flock


class CustomLockFactory(FileLock):

    @memoized_property
    def _module(self):

        def gevent_flock(fd, operation):
            """
            Gevent-compatible flock
            """
            # set non-blocking; this raises an exception if the lock
            # cannot be acquired immediately
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 15  # 15min
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # re-raise any error other than EAGAIN
                    # (resource temporarily unavailable)
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too long for the lock; better to fail
                        # than to loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    gevent.sleep(wait_timeout)

        # patch fcntl module-wide so FileLock uses the cooperative version
        fcntl.flock = gevent_flock
        return fcntl


class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    key_prefix = 'file_backend'

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        db_file = arguments.get('filename')

        log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
        try:
            super(FileNamespaceBackend, self).__init__(arguments)
        except Exception:
            log.error('Failed to initialize db at: %s', db_file)
            raise

    def __repr__(self):
        return '{} `{}`'.format(self.__class__, self.filename)

    def list_keys(self, prefix=''):
        prefix = '{}:{}'.format(self.key_prefix, prefix)

        def cond(v):
            if not prefix:
                return True

            if v.startswith(prefix):
                return True
            return False

        with self._dbm_file(True) as dbm:
            try:
                return filter(cond, dbm.keys())
            except Exception:
                log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                raise

    def get_store(self):
        return self.filename

    def _dbm_get(self, key):
        with self._dbm_file(False) as dbm:
            if hasattr(dbm, 'get'):
                value = dbm.get(key, NO_VALUE)
            else:
                # gdbm objects lack a .get method
                try:
                    value = dbm[key]
                except KeyError:
                    value = NO_VALUE
            if value is not NO_VALUE:
                value = self._loads(value)
            return value

    def get(self, key):
        try:
            return self._dbm_get(key)
        except Exception:
            log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
            raise

    def set(self, key, value):
        with self._dbm_file(True) as dbm:
            dbm[key] = self._dumps(value)

    def set_multi(self, mapping):
        with self._dbm_file(True) as dbm:
            for key, value in mapping.items():
                dbm[key] = self._dumps(value)

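# A minimal configuration sketch for the file backend above (illustrative
# only; the registration name 'dogpile.cache.rc.file_namespace' is an
# assumption about how this module is wired into dogpile):
#
#   from dogpile.cache import make_region
#
#   region = make_region().configure(
#       'dogpile.cache.rc.file_namespace',
#       expiration_time=3600,
#       arguments={'filename': '/tmp/rc_cache.db'})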

class BaseRedisBackend(redis_backend.RedisBackend):

    def _create_client(self):
        args = {}

        if self.url is not None:
            args.update(url=self.url)

        else:
            args.update(
                host=self.host, password=self.password,
                port=self.port, db=self.db
            )

        connection_pool = redis.ConnectionPool(**args)

        return redis.StrictRedis(connection_pool=connection_pool)

    def list_keys(self, prefix=''):
        prefix = '{}:{}*'.format(self.key_prefix, prefix)
        return self.client.keys(prefix)

    def get_store(self):
        return self.client.connection_pool

    def get(self, key):
        value = self.client.get(key)
        if value is None:
            return NO_VALUE
        return self._loads(value)

    def get_multi(self, keys):
        if not keys:
            return []
        values = self.client.mget(keys)
        loads = self._loads
        return [
            loads(v) if v is not None else NO_VALUE
            for v in values]

    def set(self, key, value):
        if self.redis_expiration_time:
            self.client.setex(key, self.redis_expiration_time,
                              self._dumps(value))
        else:
            self.client.set(key, self._dumps(value))

    def set_multi(self, mapping):
        dumps = self._dumps
        mapping = dict(
            (k, dumps(v))
            for k, v in mapping.items()
        )

        if not self.redis_expiration_time:
            self.client.mset(mapping)
        else:
            pipe = self.client.pipeline()
            for key, value in mapping.items():
                pipe.setex(key, self.redis_expiration_time, value)
            pipe.execute()

    def get_mutex(self, key):
        if self.distributed_lock:
            import redis_lock
            lock_key = redis_backend.u('_lock_{0}').format(key)
            log.debug('Trying to acquire Redis lock for key %s', lock_key)
            lock = redis_lock.Lock(
                redis_client=self.client,
                name=lock_key,
                expire=self.lock_timeout,
                auto_renewal=False,
                strict=True,
            )
            return lock
        else:
            return None

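# A rough sketch of the acquire/release cycle dogpile drives through
# get_mutex() when `distributed_lock` is enabled (illustrative only):
#
#   mutex = backend.get_mutex('repo_1.commit_cache')
#   if mutex is not None and mutex.acquire():
#       try:
#           pass  # recompute the value and backend.set(...) it
#       finally:
#           mutex.release()
#
# The `strict=True` flag added here makes python-redis-lock assert that the
# supplied client is a StrictRedis instance, which matches what
# _create_client() always returns.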

class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    key_prefix = 'redis_pickle_backend'


class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    key_prefix = 'redis_msgpack_backend'
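
# A minimal configuration sketch for the Redis backends above (illustrative
# only; the registration name 'dogpile.cache.rc.redis_msgpack' is an
# assumption about how this module is wired into dogpile):
#
#   from dogpile.cache import make_region
#
#   region = make_region().configure(
#       'dogpile.cache.rc.redis_msgpack',
#       arguments={
#           'host': 'localhost', 'port': 6379, 'db': 0,
#           'distributed_lock': True, 'lock_timeout': 60})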