##// END OF EJS Templates
caches: further improvements on the lock implementations
super-admin -
r952:757fecd5 default
parent child Browse files
Show More
@@ -1,313 +1,316 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2020 RhodeCode GmbH
2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import time
18 import time
19 import errno
19 import errno
20 import logging
20 import logging
21
21
22 import msgpack
22 import msgpack
23 import redis
23 import redis
24
24
25 from dogpile.cache.api import CachedValue
25 from dogpile.cache.api import CachedValue
26 from dogpile.cache.backends import memory as memory_backend
26 from dogpile.cache.backends import memory as memory_backend
27 from dogpile.cache.backends import file as file_backend
27 from dogpile.cache.backends import file as file_backend
28 from dogpile.cache.backends import redis as redis_backend
28 from dogpile.cache.backends import redis as redis_backend
29 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
29 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
30 from dogpile.cache.util import memoized_property
30 from dogpile.cache.util import memoized_property
31
31
32 from pyramid.settings import asbool
32 from pyramid.settings import asbool
33
33
34 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
34 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
35
35
36
36
37 _default_max_size = 1024
37 _default_max_size = 1024
38
38
39 log = logging.getLogger(__name__)
39 log = logging.getLogger(__name__)
40
40
41
41
42 class LRUMemoryBackend(memory_backend.MemoryBackend):
42 class LRUMemoryBackend(memory_backend.MemoryBackend):
43 key_prefix = 'lru_mem_backend'
43 key_prefix = 'lru_mem_backend'
44 pickle_values = False
44 pickle_values = False
45
45
46 def __init__(self, arguments):
46 def __init__(self, arguments):
47 max_size = arguments.pop('max_size', _default_max_size)
47 max_size = arguments.pop('max_size', _default_max_size)
48
48
49 LRUDictClass = LRUDict
49 LRUDictClass = LRUDict
50 if arguments.pop('log_key_count', None):
50 if arguments.pop('log_key_count', None):
51 LRUDictClass = LRUDictDebug
51 LRUDictClass = LRUDictDebug
52
52
53 arguments['cache_dict'] = LRUDictClass(max_size)
53 arguments['cache_dict'] = LRUDictClass(max_size)
54 super(LRUMemoryBackend, self).__init__(arguments)
54 super(LRUMemoryBackend, self).__init__(arguments)
55
55
56 def delete(self, key):
56 def delete(self, key):
57 try:
57 try:
58 del self._cache[key]
58 del self._cache[key]
59 except KeyError:
59 except KeyError:
60 # we don't care if key isn't there at deletion
60 # we don't care if key isn't there at deletion
61 pass
61 pass
62
62
63 def delete_multi(self, keys):
63 def delete_multi(self, keys):
64 for key in keys:
64 for key in keys:
65 self.delete(key)
65 self.delete(key)
66
66
67
67
68 class PickleSerializer(object):
68 class PickleSerializer(object):
69
69
70 def _dumps(self, value, safe=False):
70 def _dumps(self, value, safe=False):
71 try:
71 try:
72 return compat.pickle.dumps(value)
72 return compat.pickle.dumps(value)
73 except Exception:
73 except Exception:
74 if safe:
74 if safe:
75 return NO_VALUE
75 return NO_VALUE
76 else:
76 else:
77 raise
77 raise
78
78
79 def _loads(self, value, safe=True):
79 def _loads(self, value, safe=True):
80 try:
80 try:
81 return compat.pickle.loads(value)
81 return compat.pickle.loads(value)
82 except Exception:
82 except Exception:
83 if safe:
83 if safe:
84 return NO_VALUE
84 return NO_VALUE
85 else:
85 else:
86 raise
86 raise
87
87
88
88
89 class MsgPackSerializer(object):
89 class MsgPackSerializer(object):
90
90
91 def _dumps(self, value, safe=False):
91 def _dumps(self, value, safe=False):
92 try:
92 try:
93 return msgpack.packb(value)
93 return msgpack.packb(value)
94 except Exception:
94 except Exception:
95 if safe:
95 if safe:
96 return NO_VALUE
96 return NO_VALUE
97 else:
97 else:
98 raise
98 raise
99
99
100 def _loads(self, value, safe=True):
100 def _loads(self, value, safe=True):
101 """
101 """
102 pickle maintained the `CachedValue` wrapper of the tuple
102 pickle maintained the `CachedValue` wrapper of the tuple
103 msgpack does not, so it must be added back in.
103 msgpack does not, so it must be added back in.
104 """
104 """
105 try:
105 try:
106 value = msgpack.unpackb(value, use_list=False)
106 value = msgpack.unpackb(value, use_list=False)
107 return CachedValue(*value)
107 return CachedValue(*value)
108 except Exception:
108 except Exception:
109 if safe:
109 if safe:
110 return NO_VALUE
110 return NO_VALUE
111 else:
111 else:
112 raise
112 raise
113
113
114
114
115 import fcntl
115 import fcntl
116 flock_org = fcntl.flock
116 flock_org = fcntl.flock
117
117
118
118
119 class CustomLockFactory(FileLock):
119 class CustomLockFactory(FileLock):
120
120
121 pass
121 pass
122
122
123
123
124 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
124 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
125 key_prefix = 'file_backend'
125 key_prefix = 'file_backend'
126
126
127 def __init__(self, arguments):
127 def __init__(self, arguments):
128 arguments['lock_factory'] = CustomLockFactory
128 arguments['lock_factory'] = CustomLockFactory
129 db_file = arguments.get('filename')
129 db_file = arguments.get('filename')
130
130
131 log.debug('initialing %s DB in %s', self.__class__.__name__, db_file)
131 log.debug('initialing %s DB in %s', self.__class__.__name__, db_file)
132 try:
132 try:
133 super(FileNamespaceBackend, self).__init__(arguments)
133 super(FileNamespaceBackend, self).__init__(arguments)
134 except Exception:
134 except Exception:
135 log.error('Failed to initialize db at: %s', db_file)
135 log.error('Failed to initialize db at: %s', db_file)
136 raise
136 raise
137
137
138 def __repr__(self):
138 def __repr__(self):
139 return '{} `{}`'.format(self.__class__, self.filename)
139 return '{} `{}`'.format(self.__class__, self.filename)
140
140
141 def list_keys(self, prefix=''):
141 def list_keys(self, prefix=''):
142 prefix = '{}:{}'.format(self.key_prefix, prefix)
142 prefix = '{}:{}'.format(self.key_prefix, prefix)
143
143
144 def cond(v):
144 def cond(v):
145 if not prefix:
145 if not prefix:
146 return True
146 return True
147
147
148 if v.startswith(prefix):
148 if v.startswith(prefix):
149 return True
149 return True
150 return False
150 return False
151
151
152 with self._dbm_file(True) as dbm:
152 with self._dbm_file(True) as dbm:
153 try:
153 try:
154 return filter(cond, dbm.keys())
154 return filter(cond, dbm.keys())
155 except Exception:
155 except Exception:
156 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
156 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
157 raise
157 raise
158
158
159 def get_store(self):
159 def get_store(self):
160 return self.filename
160 return self.filename
161
161
162 def _dbm_get(self, key):
162 def _dbm_get(self, key):
163 with self._dbm_file(False) as dbm:
163 with self._dbm_file(False) as dbm:
164 if hasattr(dbm, 'get'):
164 if hasattr(dbm, 'get'):
165 value = dbm.get(key, NO_VALUE)
165 value = dbm.get(key, NO_VALUE)
166 else:
166 else:
167 # gdbm objects lack a .get method
167 # gdbm objects lack a .get method
168 try:
168 try:
169 value = dbm[key]
169 value = dbm[key]
170 except KeyError:
170 except KeyError:
171 value = NO_VALUE
171 value = NO_VALUE
172 if value is not NO_VALUE:
172 if value is not NO_VALUE:
173 value = self._loads(value)
173 value = self._loads(value)
174 return value
174 return value
175
175
176 def get(self, key):
176 def get(self, key):
177 try:
177 try:
178 return self._dbm_get(key)
178 return self._dbm_get(key)
179 except Exception:
179 except Exception:
180 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
180 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
181 raise
181 raise
182
182
183 def set(self, key, value):
183 def set(self, key, value):
184 with self._dbm_file(True) as dbm:
184 with self._dbm_file(True) as dbm:
185 dbm[key] = self._dumps(value)
185 dbm[key] = self._dumps(value)
186
186
187 def set_multi(self, mapping):
187 def set_multi(self, mapping):
188 with self._dbm_file(True) as dbm:
188 with self._dbm_file(True) as dbm:
189 for key, value in mapping.items():
189 for key, value in mapping.items():
190 dbm[key] = self._dumps(value)
190 dbm[key] = self._dumps(value)
191
191
192
192
193 class BaseRedisBackend(redis_backend.RedisBackend):
193 class BaseRedisBackend(redis_backend.RedisBackend):
194 key_prefix = ''
194 key_prefix = ''
195
195
196 def __init__(self, arguments):
196 def __init__(self, arguments):
197 super(BaseRedisBackend, self).__init__(arguments)
197 super(BaseRedisBackend, self).__init__(arguments)
198 self._lock_timeout = self.lock_timeout
198 self._lock_timeout = self.lock_timeout
199 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
199 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
200
200
201 if self._lock_auto_renewal and not self._lock_timeout:
201 if self._lock_auto_renewal and not self._lock_timeout:
202 # set default timeout for auto_renewal
202 # set default timeout for auto_renewal
203 self._lock_timeout = 30
203 self._lock_timeout = 30
204
204
205 def _create_client(self):
205 def _create_client(self):
206 args = {}
206 args = {}
207
207
208 if self.url is not None:
208 if self.url is not None:
209 args.update(url=self.url)
209 args.update(url=self.url)
210
210
211 else:
211 else:
212 args.update(
212 args.update(
213 host=self.host, password=self.password,
213 host=self.host, password=self.password,
214 port=self.port, db=self.db
214 port=self.port, db=self.db
215 )
215 )
216
216
217 connection_pool = redis.ConnectionPool(**args)
217 connection_pool = redis.ConnectionPool(**args)
218
218
219 return redis.StrictRedis(connection_pool=connection_pool)
219 return redis.StrictRedis(connection_pool=connection_pool)
220
220
221 def list_keys(self, prefix=''):
221 def list_keys(self, prefix=''):
222 prefix = '{}:{}*'.format(self.key_prefix, prefix)
222 prefix = '{}:{}*'.format(self.key_prefix, prefix)
223 return self.client.keys(prefix)
223 return self.client.keys(prefix)
224
224
225 def get_store(self):
225 def get_store(self):
226 return self.client.connection_pool
226 return self.client.connection_pool
227
227
228 def get(self, key):
228 def get(self, key):
229 value = self.client.get(key)
229 value = self.client.get(key)
230 if value is None:
230 if value is None:
231 return NO_VALUE
231 return NO_VALUE
232 return self._loads(value)
232 return self._loads(value)
233
233
234 def get_multi(self, keys):
234 def get_multi(self, keys):
235 if not keys:
235 if not keys:
236 return []
236 return []
237 values = self.client.mget(keys)
237 values = self.client.mget(keys)
238 loads = self._loads
238 loads = self._loads
239 return [
239 return [
240 loads(v) if v is not None else NO_VALUE
240 loads(v) if v is not None else NO_VALUE
241 for v in values]
241 for v in values]
242
242
243 def set(self, key, value):
243 def set(self, key, value):
244 if self.redis_expiration_time:
244 if self.redis_expiration_time:
245 self.client.setex(key, self.redis_expiration_time,
245 self.client.setex(key, self.redis_expiration_time,
246 self._dumps(value))
246 self._dumps(value))
247 else:
247 else:
248 self.client.set(key, self._dumps(value))
248 self.client.set(key, self._dumps(value))
249
249
250 def set_multi(self, mapping):
250 def set_multi(self, mapping):
251 dumps = self._dumps
251 dumps = self._dumps
252 mapping = dict(
252 mapping = dict(
253 (k, dumps(v))
253 (k, dumps(v))
254 for k, v in mapping.items()
254 for k, v in mapping.items()
255 )
255 )
256
256
257 if not self.redis_expiration_time:
257 if not self.redis_expiration_time:
258 self.client.mset(mapping)
258 self.client.mset(mapping)
259 else:
259 else:
260 pipe = self.client.pipeline()
260 pipe = self.client.pipeline()
261 for key, value in mapping.items():
261 for key, value in mapping.items():
262 pipe.setex(key, self.redis_expiration_time, value)
262 pipe.setex(key, self.redis_expiration_time, value)
263 pipe.execute()
263 pipe.execute()
264
264
265 def get_mutex(self, key):
265 def get_mutex(self, key):
266 if self.distributed_lock:
266 if self.distributed_lock:
267 lock_key = redis_backend.u('_lock_{0}').format(key)
267 lock_key = redis_backend.u('_lock_{0}').format(key)
268 log.debug('Trying to acquire Redis lock for key %s', lock_key)
268 log.debug('Trying to acquire Redis lock for key %s', lock_key)
269 return get_mutex_lock(self.client, lock_key, self._lock_timeout,
269 return get_mutex_lock(self.client, lock_key, self._lock_timeout,
270 auto_renewal=self._lock_auto_renewal)
270 auto_renewal=self._lock_auto_renewal)
271 else:
271 else:
272 return None
272 return None
273
273
274
274
275 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
275 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
276 key_prefix = 'redis_pickle_backend'
276 key_prefix = 'redis_pickle_backend'
277 pass
277 pass
278
278
279
279
280 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
280 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
281 key_prefix = 'redis_msgpack_backend'
281 key_prefix = 'redis_msgpack_backend'
282 pass
282 pass
283
283
284
284
285 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
285 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
286 import redis_lock
286 import redis_lock
287
287
288 class _RedisLockWrapper(object):
288 class _RedisLockWrapper(object):
289 """LockWrapper for redis_lock"""
289 """LockWrapper for redis_lock"""
290
290
291 def __init__(self):
291 @classmethod
292 pass
292 def get_lock(cls):
293
294 @property
295 def lock(self):
296 return redis_lock.Lock(
293 return redis_lock.Lock(
297 redis_client=client,
294 redis_client=client,
298 name=lock_key,
295 name=lock_key,
299 expire=lock_timeout,
296 expire=lock_timeout,
300 auto_renewal=auto_renewal,
297 auto_renewal=auto_renewal,
301 strict=True,
298 strict=True,
302 )
299 )
303
300
301 def __init__(self):
302 self.lock = self.get_lock()
303
304 def acquire(self, wait=True):
304 def acquire(self, wait=True):
305 try:
305 return self.lock.acquire(wait)
306 return self.lock.acquire(wait)
307 except redis_lock.AlreadyAquited:
308 return False
306
309
307 def release(self):
310 def release(self):
308 try:
311 try:
309 self.lock.release()
312 self.lock.release()
310 except redis_lock.NotAcquired:
313 except redis_lock.NotAcquired:
311 pass
314 pass
312
315
313 return _RedisLockWrapper()
316 return _RedisLockWrapper()
General Comments 0
You need to be logged in to leave comments. Login now