##// END OF EJS Templates
caches: further improvements on the lock implementations
super-admin -
r952:757fecd5 default
parent child Browse files
Show More
@@ -1,313 +1,316 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import time
19 19 import errno
20 20 import logging
21 21
22 22 import msgpack
23 23 import redis
24 24
25 25 from dogpile.cache.api import CachedValue
26 26 from dogpile.cache.backends import memory as memory_backend
27 27 from dogpile.cache.backends import file as file_backend
28 28 from dogpile.cache.backends import redis as redis_backend
29 29 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
30 30 from dogpile.cache.util import memoized_property
31 31
32 32 from pyramid.settings import asbool
33 33
34 34 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
35 35
36 36
37 37 _default_max_size = 1024
38 38
39 39 log = logging.getLogger(__name__)
40 40
41 41
42 42 class LRUMemoryBackend(memory_backend.MemoryBackend):
43 43 key_prefix = 'lru_mem_backend'
44 44 pickle_values = False
45 45
46 46 def __init__(self, arguments):
47 47 max_size = arguments.pop('max_size', _default_max_size)
48 48
49 49 LRUDictClass = LRUDict
50 50 if arguments.pop('log_key_count', None):
51 51 LRUDictClass = LRUDictDebug
52 52
53 53 arguments['cache_dict'] = LRUDictClass(max_size)
54 54 super(LRUMemoryBackend, self).__init__(arguments)
55 55
56 56 def delete(self, key):
57 57 try:
58 58 del self._cache[key]
59 59 except KeyError:
60 60 # we don't care if key isn't there at deletion
61 61 pass
62 62
63 63 def delete_multi(self, keys):
64 64 for key in keys:
65 65 self.delete(key)
66 66
67 67
68 68 class PickleSerializer(object):
69 69
70 70 def _dumps(self, value, safe=False):
71 71 try:
72 72 return compat.pickle.dumps(value)
73 73 except Exception:
74 74 if safe:
75 75 return NO_VALUE
76 76 else:
77 77 raise
78 78
79 79 def _loads(self, value, safe=True):
80 80 try:
81 81 return compat.pickle.loads(value)
82 82 except Exception:
83 83 if safe:
84 84 return NO_VALUE
85 85 else:
86 86 raise
87 87
88 88
89 89 class MsgPackSerializer(object):
90 90
91 91 def _dumps(self, value, safe=False):
92 92 try:
93 93 return msgpack.packb(value)
94 94 except Exception:
95 95 if safe:
96 96 return NO_VALUE
97 97 else:
98 98 raise
99 99
100 100 def _loads(self, value, safe=True):
101 101 """
102 102 pickle maintained the `CachedValue` wrapper of the tuple
103 103 msgpack does not, so it must be added back in.
104 104 """
105 105 try:
106 106 value = msgpack.unpackb(value, use_list=False)
107 107 return CachedValue(*value)
108 108 except Exception:
109 109 if safe:
110 110 return NO_VALUE
111 111 else:
112 112 raise
113 113
114 114
115 115 import fcntl
116 116 flock_org = fcntl.flock
117 117
118 118
119 119 class CustomLockFactory(FileLock):
120 120
121 121 pass
122 122
123 123
124 124 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
125 125 key_prefix = 'file_backend'
126 126
127 127 def __init__(self, arguments):
128 128 arguments['lock_factory'] = CustomLockFactory
129 129 db_file = arguments.get('filename')
130 130
131 131 log.debug('initialing %s DB in %s', self.__class__.__name__, db_file)
132 132 try:
133 133 super(FileNamespaceBackend, self).__init__(arguments)
134 134 except Exception:
135 135 log.error('Failed to initialize db at: %s', db_file)
136 136 raise
137 137
138 138 def __repr__(self):
139 139 return '{} `{}`'.format(self.__class__, self.filename)
140 140
141 141 def list_keys(self, prefix=''):
142 142 prefix = '{}:{}'.format(self.key_prefix, prefix)
143 143
144 144 def cond(v):
145 145 if not prefix:
146 146 return True
147 147
148 148 if v.startswith(prefix):
149 149 return True
150 150 return False
151 151
152 152 with self._dbm_file(True) as dbm:
153 153 try:
154 154 return filter(cond, dbm.keys())
155 155 except Exception:
156 156 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
157 157 raise
158 158
159 159 def get_store(self):
160 160 return self.filename
161 161
162 162 def _dbm_get(self, key):
163 163 with self._dbm_file(False) as dbm:
164 164 if hasattr(dbm, 'get'):
165 165 value = dbm.get(key, NO_VALUE)
166 166 else:
167 167 # gdbm objects lack a .get method
168 168 try:
169 169 value = dbm[key]
170 170 except KeyError:
171 171 value = NO_VALUE
172 172 if value is not NO_VALUE:
173 173 value = self._loads(value)
174 174 return value
175 175
176 176 def get(self, key):
177 177 try:
178 178 return self._dbm_get(key)
179 179 except Exception:
180 180 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
181 181 raise
182 182
183 183 def set(self, key, value):
184 184 with self._dbm_file(True) as dbm:
185 185 dbm[key] = self._dumps(value)
186 186
187 187 def set_multi(self, mapping):
188 188 with self._dbm_file(True) as dbm:
189 189 for key, value in mapping.items():
190 190 dbm[key] = self._dumps(value)
191 191
192 192
193 193 class BaseRedisBackend(redis_backend.RedisBackend):
194 194 key_prefix = ''
195 195
196 196 def __init__(self, arguments):
197 197 super(BaseRedisBackend, self).__init__(arguments)
198 198 self._lock_timeout = self.lock_timeout
199 199 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
200 200
201 201 if self._lock_auto_renewal and not self._lock_timeout:
202 202 # set default timeout for auto_renewal
203 203 self._lock_timeout = 30
204 204
205 205 def _create_client(self):
206 206 args = {}
207 207
208 208 if self.url is not None:
209 209 args.update(url=self.url)
210 210
211 211 else:
212 212 args.update(
213 213 host=self.host, password=self.password,
214 214 port=self.port, db=self.db
215 215 )
216 216
217 217 connection_pool = redis.ConnectionPool(**args)
218 218
219 219 return redis.StrictRedis(connection_pool=connection_pool)
220 220
221 221 def list_keys(self, prefix=''):
222 222 prefix = '{}:{}*'.format(self.key_prefix, prefix)
223 223 return self.client.keys(prefix)
224 224
225 225 def get_store(self):
226 226 return self.client.connection_pool
227 227
228 228 def get(self, key):
229 229 value = self.client.get(key)
230 230 if value is None:
231 231 return NO_VALUE
232 232 return self._loads(value)
233 233
234 234 def get_multi(self, keys):
235 235 if not keys:
236 236 return []
237 237 values = self.client.mget(keys)
238 238 loads = self._loads
239 239 return [
240 240 loads(v) if v is not None else NO_VALUE
241 241 for v in values]
242 242
243 243 def set(self, key, value):
244 244 if self.redis_expiration_time:
245 245 self.client.setex(key, self.redis_expiration_time,
246 246 self._dumps(value))
247 247 else:
248 248 self.client.set(key, self._dumps(value))
249 249
250 250 def set_multi(self, mapping):
251 251 dumps = self._dumps
252 252 mapping = dict(
253 253 (k, dumps(v))
254 254 for k, v in mapping.items()
255 255 )
256 256
257 257 if not self.redis_expiration_time:
258 258 self.client.mset(mapping)
259 259 else:
260 260 pipe = self.client.pipeline()
261 261 for key, value in mapping.items():
262 262 pipe.setex(key, self.redis_expiration_time, value)
263 263 pipe.execute()
264 264
265 265 def get_mutex(self, key):
266 266 if self.distributed_lock:
267 267 lock_key = redis_backend.u('_lock_{0}').format(key)
268 268 log.debug('Trying to acquire Redis lock for key %s', lock_key)
269 269 return get_mutex_lock(self.client, lock_key, self._lock_timeout,
270 270 auto_renewal=self._lock_auto_renewal)
271 271 else:
272 272 return None
273 273
274 274
275 275 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
276 276 key_prefix = 'redis_pickle_backend'
277 277 pass
278 278
279 279
280 280 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
281 281 key_prefix = 'redis_msgpack_backend'
282 282 pass
283 283
284 284
285 285 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
286 286 import redis_lock
287 287
288 288 class _RedisLockWrapper(object):
289 289 """LockWrapper for redis_lock"""
290 290
291 def __init__(self):
292 pass
293
294 @property
295 def lock(self):
291 @classmethod
292 def get_lock(cls):
296 293 return redis_lock.Lock(
297 294 redis_client=client,
298 295 name=lock_key,
299 296 expire=lock_timeout,
300 297 auto_renewal=auto_renewal,
301 298 strict=True,
302 299 )
303 300
301 def __init__(self):
302 self.lock = self.get_lock()
303
304 304 def acquire(self, wait=True):
305 try:
305 306 return self.lock.acquire(wait)
307 except redis_lock.AlreadyAquited:
308 return False
306 309
307 310 def release(self):
308 311 try:
309 312 self.lock.release()
310 313 except redis_lock.NotAcquired:
311 314 pass
312 315
313 316 return _RedisLockWrapper()
General Comments 0
You need to be logged in to leave comments. Login now