##// END OF EJS Templates
caches: dbm backend needs to use bytes to list keys
super-admin -
r5022:b2c2c0f3 default
parent child Browse files
Show More
@@ -1,274 +1,274 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import errno
23 23 import logging
24 24 import functools
25 25
26 26 import msgpack
27 27 import redis
28 28 import gevent
29 29 import pickle
30 30 import fcntl
31 31 flock_org = fcntl.flock
32 32 from typing import Union
33 33
34 34 from dogpile.cache.backends import memory as memory_backend
35 35 from dogpile.cache.backends import file as file_backend
36 36 from dogpile.cache.backends import redis as redis_backend
37 37 from dogpile.cache.backends.file import FileLock
38 38 from dogpile.cache.util import memoized_property
39 39 from dogpile.cache.api import Serializer, Deserializer
40 40
41 41 from pyramid.settings import asbool
42 42
43 43 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
44 from rhodecode.lib.str_utils import safe_str
44 from rhodecode.lib.str_utils import safe_str, safe_bytes
45 45
46 46
47 47 _default_max_size = 1024
48 48
49 49 log = logging.getLogger(__name__)
50 50
51 51
class LRUMemoryBackend(memory_backend.MemoryBackend):
    """In-process memory backend bounded by an LRU dict so it cannot grow without limit."""
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        size_limit = arguments.pop('max_size', _default_max_size)

        # the debug variant additionally logs key counts
        lru_cls = LRUDictDebug if arguments.pop('log_key_count', None) else LRUDict

        arguments['cache_dict'] = lru_cls(size_limit)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        try:
            del self._cache[key]
        except KeyError:
            # deleting an absent key is deliberately a no-op
            pass

    def delete_multi(self, keys):
        for single_key in keys:
            self.delete(single_key)
76 76
77 77
class PickleSerializer:
    """(De)serialize cache values with pickle, always using the highest protocol."""
    serializer: Union[None, Serializer] = staticmethod(  # type: ignore
        lambda obj: pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    )
    deserializer: Union[None, Deserializer] = staticmethod(  # type: ignore
        pickle.loads
    )
85 85
86 86
class MsgPackSerializer(object):
    """(De)serialize cache values with msgpack; use_list=False round-trips tuples as tuples."""
    serializer: Union[None, Serializer] = staticmethod(  # type: ignore
        msgpack.packb
    )
    deserializer: Union[None, Deserializer] = staticmethod(  # type: ignore
        lambda data: msgpack.unpackb(data, use_list=False)
    )
94 94
95 95
class CustomLockFactory(FileLock):
    """
    FileLock factory whose flock module is patched to cooperate with gevent.

    The stock dogpile FileLock blocks the whole process while waiting on
    ``fcntl.flock``; under gevent that would stall every greenlet.  This
    subclass swaps in a non-blocking retry loop that yields via
    ``gevent.sleep`` between attempts.
    """

    @memoized_property
    def _module(self):
        # Return the (patched) module FileLock uses for flock calls.
        # NOTE(review): this rebinds ``fcntl.flock`` on the *global* fcntl
        # module, so the patch affects any other user of fcntl in-process.

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # set non-blocking, this will cause an exception if we cannot acquire a lock
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 15 # 15min
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # raise on other errors than Resource temporarily unavailable
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too much time on a lock, better fail than loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    # EAGAIN within the timeout window: yield to other
                    # greenlets briefly, then retry the non-blocking flock
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    gevent.sleep(wait_timeout)

        fcntl.flock = gevent_flock
        return fcntl
130 130
131 131
class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    """
    File (dbm) based cache backend with pickled values.

    dbm stores and returns keys as ``bytes``, so key listing and prefix
    matching must operate on bytes as well.
    """
    key_prefix = 'file_backend'

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        db_file = arguments.get('filename')

        # fix typo: was 'initialing'
        log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
        try:
            super(FileNamespaceBackend, self).__init__(arguments)
        except Exception:
            log.exception('Failed to initialize db at: %s', db_file)
            raise

    def __repr__(self):
        return '{} `{}`'.format(self.__class__, self.filename)

    def list_keys(self, prefix: bytes = b''):
        """
        Return all stored keys under our key_prefix, optionally narrowed by
        an extra bytes ``prefix``.
        """
        # the match prefix always contains `<key_prefix>:`, so it is never empty
        key_filter = b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))

        with self._dbm_file(True) as dbm:
            try:
                # dbm yields bytes keys; filter on the bytes prefix directly
                return [k for k in dbm.keys() if k.startswith(key_filter)]
            except Exception:
                log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                raise

    def get_store(self):
        # location of the backing dbm file
        return self.filename
169 169
170 170
class BaseRedisBackend(redis_backend.RedisBackend):
    """Common redis backend behaviour: client creation, key listing and distributed locks."""
    key_prefix = ''

    def __init__(self, arguments):
        super(BaseRedisBackend, self).__init__(arguments)
        self._lock_timeout = self.lock_timeout
        self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))

        # auto-renewal requires a finite timeout to renew against
        if not self._lock_timeout and self._lock_auto_renewal:
            # set default timeout for auto_renewal
            self._lock_timeout = 30

    def _create_client(self):
        if self.url is not None:
            conn_args = dict(url=self.url)
        else:
            conn_args = dict(
                host=self.host, password=self.password,
                port=self.port, db=self.db,
            )

        pool = redis.ConnectionPool(**conn_args)
        self.writer_client = redis.StrictRedis(connection_pool=pool)
        # reads share the writer client/pool
        self.reader_client = self.writer_client

    def list_keys(self, prefix=''):
        match_pattern = '{}:{}*'.format(self.key_prefix, prefix)
        return self.reader_client.keys(match_pattern)

    def get_store(self):
        return self.reader_client.connection_pool

    def get_mutex(self, key):
        if not self.distributed_lock:
            return None
        lock_key = '_lock_{0}'.format(safe_str(key))
        return get_mutex_lock(
            self.writer_client, lock_key,
            self._lock_timeout,
            auto_renewal=self._lock_auto_renewal,
        )
218 218
219 219
class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    """Redis backend storing values serialized with pickle."""
    # removed redundant `pass` — the class body already has statements
    key_prefix = 'redis_pickle_backend'
223 223
224 224
class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    """Redis backend storing values serialized with msgpack."""
    # removed redundant `pass` — the class body already has statements
    key_prefix = 'redis_msgpack_backend'
228 228
229 229
def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
    """
    Build a dogpile-compatible mutex (acquire/release) backed by redis_lock.
    """
    from rhodecode.lib._vendor import redis_lock

    class _RedisLockWrapper(object):
        """LockWrapper for redis_lock"""

        def __init__(self):
            self.lock = self.get_lock()
            self.lock_key = lock_key

        @classmethod
        def get_lock(cls):
            # fresh Lock instance bound to the factory's client/key/timeout
            return redis_lock.Lock(
                redis_client=client,
                name=lock_key,
                expire=lock_timeout,
                auto_renewal=auto_renewal,
                strict=True,
            )

        def __repr__(self):
            return "{}:{}".format(self.__class__.__name__, lock_key)

        def __str__(self):
            return "{}:{}".format(self.__class__.__name__, lock_key)

        def acquire(self, wait=True):
            log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
            try:
                acquired = self.lock.acquire(wait)
            except redis_lock.AlreadyAcquired:
                return False
            except redis_lock.AlreadyStarted:
                # refresh thread exists, but it also means we acquired the lock
                return True
            log.debug('Got lock for key %s, %s', self.lock_key, acquired)
            return acquired

        def release(self):
            try:
                self.lock.release()
            except redis_lock.NotAcquired:
                # releasing a lock we never held is ignored
                pass

    return _RedisLockWrapper()
General Comments 0
You need to be logged in to leave comments. Login now