##// END OF EJS Templates
caches: dbm backend needs to use bytes to list keys
super-admin -
r5022:b2c2c0f3 default
parent child Browse files
Show More
@@ -1,274 +1,274 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2020 RhodeCode GmbH
3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import time
21 import time
22 import errno
22 import errno
23 import logging
23 import logging
24 import functools
24 import functools
25
25
26 import msgpack
26 import msgpack
27 import redis
27 import redis
28 import gevent
28 import gevent
29 import pickle
29 import pickle
30 import fcntl
30 import fcntl
31 flock_org = fcntl.flock
31 flock_org = fcntl.flock
32 from typing import Union
32 from typing import Union
33
33
34 from dogpile.cache.backends import memory as memory_backend
34 from dogpile.cache.backends import memory as memory_backend
35 from dogpile.cache.backends import file as file_backend
35 from dogpile.cache.backends import file as file_backend
36 from dogpile.cache.backends import redis as redis_backend
36 from dogpile.cache.backends import redis as redis_backend
37 from dogpile.cache.backends.file import FileLock
37 from dogpile.cache.backends.file import FileLock
38 from dogpile.cache.util import memoized_property
38 from dogpile.cache.util import memoized_property
39 from dogpile.cache.api import Serializer, Deserializer
39 from dogpile.cache.api import Serializer, Deserializer
40
40
41 from pyramid.settings import asbool
41 from pyramid.settings import asbool
42
42
43 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
43 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
44 from rhodecode.lib.str_utils import safe_str
44 from rhodecode.lib.str_utils import safe_str, safe_bytes
45
45
46
46
47 _default_max_size = 1024
47 _default_max_size = 1024
48
48
49 log = logging.getLogger(__name__)
49 log = logging.getLogger(__name__)
50
50
51
51
class LRUMemoryBackend(memory_backend.MemoryBackend):
    """
    In-process memory backend bounded by an LRU dict.

    Arguments (popped from ``arguments``):
        max_size: maximum number of cached keys (default: _default_max_size)
        log_key_count: when truthy, use the debug LRU dict variant
    """
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        max_size = arguments.pop('max_size', _default_max_size)

        if arguments.pop('log_key_count', None):
            lru_class = LRUDictDebug
        else:
            lru_class = LRUDict

        arguments['cache_dict'] = lru_class(max_size)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        try:
            del self._cache[key]
        except KeyError:
            # deleting a missing key is intentionally a no-op
            pass

    def delete_multi(self, keys):
        # no bulk primitive on the LRU dict; delete one by one
        for cache_key in keys:
            self.delete(cache_key)
76
76
77
77
class PickleSerializer:
    """dogpile serializer hooks encoding cached values with pickle."""

    # highest protocol: fastest and most compact binary representation
    serializer: Union[None, Serializer] = staticmethod(  # type: ignore
        functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
    )
    # loads needs no pre-bound arguments
    deserializer: Union[None, Deserializer] = staticmethod(  # type: ignore
        pickle.loads
    )
85
85
86
86
class MsgPackSerializer(object):
    """
    dogpile serializer hooks encoding cached values with msgpack.

    ``use_list=False`` makes unpackb return tuples instead of lists —
    presumably so round-tripped values are immutable; confirm against
    cache consumers before changing.
    """
    serializer: Union[None, Serializer] = staticmethod(  # type: ignore
        msgpack.packb
    )
    deserializer: Union[None, Deserializer] = staticmethod(  # type: ignore
        functools.partial(msgpack.unpackb, use_list=False)
    )
94
94
95
95
class CustomLockFactory(FileLock):
    """
    FileLock whose flock cooperates with gevent: instead of blocking the
    whole process while waiting for the file lock, it polls non-blockingly
    and yields to the gevent hub between attempts.
    """

    @memoized_property
    def _module(self):
        # memoized: the fcntl module is patched once per lock instance

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # set non-blocking, this will cause an exception if we cannot acquire a lock
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 15  # 15min
            while True:
                try:
                    # flock_org is the original, unpatched fcntl.flock
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # raise on other errors than Resource temporarily unavailable
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too much time on a lock, better fail than loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    # yield to other greenlets instead of blocking the process
                    gevent.sleep(wait_timeout)

        # hand dogpile's FileLock a module whose flock is the cooperative one
        fcntl.flock = gevent_flock
        return fcntl
130
130
131
131
class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    """
    DBM-file backed cache backend; values are pickled via PickleSerializer.

    Installs CustomLockFactory so lock waits cooperate with gevent instead
    of blocking the whole process.
    """
    key_prefix = 'file_backend'

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        db_file = arguments.get('filename')

        # fixed typo in log message: 'initialing' -> 'initializing'
        log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
        try:
            super(FileNamespaceBackend, self).__init__(arguments)
        except Exception:
            log.exception('Failed to initialize db at: %s', db_file)
            raise

    def __repr__(self):
        return '{} `{}`'.format(self.__class__, self.filename)

    def list_keys(self, prefix: bytes = b''):
        """
        Return all keys stored in the DBM file that match ``prefix``.

        dbm/gdbm return keys as bytes, so the namespace prefix must be
        built and compared as bytes as well.

        :param prefix: optional bytes prefix appended after the namespace
        """
        prefix = b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))

        def cond(dbm_key: bytes):
            # prefix is never empty here (key_prefix + ':' is always present),
            # the guard is kept for safety with overridden key_prefix values
            if not prefix:
                return True

            if dbm_key.startswith(prefix):
                return True
            return False

        with self._dbm_file(True) as dbm:
            try:
                return list(filter(cond, dbm.keys()))
            except Exception:
                log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                raise

    def get_store(self):
        # path of the dbm file on disk
        return self.filename
169
169
170
170
class BaseRedisBackend(redis_backend.RedisBackend):
    """
    Common base for the redis-backed cache backends; adds a distributed
    mutex and a default lock timeout on top of dogpile's RedisBackend.
    """
    key_prefix = ''

    def __init__(self, arguments):
        super(BaseRedisBackend, self).__init__(arguments)

        self._lock_timeout = self.lock_timeout
        self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))

        # auto renewal needs a concrete timeout to renew against
        if self._lock_auto_renewal and not self._lock_timeout:
            self._lock_timeout = 30

    def _create_client(self):
        if self.url is not None:
            args = dict(url=self.url)
        else:
            args = dict(
                host=self.host, password=self.password,
                port=self.port, db=self.db
            )

        connection_pool = redis.ConnectionPool(**args)
        self.writer_client = redis.StrictRedis(
            connection_pool=connection_pool
        )
        # reads and writes share the same client/pool
        self.reader_client = self.writer_client

    def list_keys(self, prefix=''):
        # redis KEYS pattern: namespace-qualified prefix plus wildcard
        match = '{}:{}*'.format(self.key_prefix, prefix)
        return self.reader_client.keys(match)

    def get_store(self):
        return self.reader_client.connection_pool

    def get_mutex(self, key):
        if not self.distributed_lock:
            return None
        lock_key = '_lock_{0}'.format(safe_str(key))
        return get_mutex_lock(
            self.writer_client, lock_key,
            self._lock_timeout,
            auto_renewal=self._lock_auto_renewal
        )
218
218
219
219
class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    """Redis backend storing values as pickle blobs."""
    key_prefix = 'redis_pickle_backend'
223
223
224
224
class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    """Redis backend storing values as msgpack payloads."""
    key_prefix = 'redis_msgpack_backend'
228
228
229
229
def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
    """
    Build a dogpile-compatible mutex backed by the vendored redis_lock.

    :param client: redis client that holds the lock
    :param lock_key: name of the lock key in redis
    :param lock_timeout: lock expiry in seconds
    :param auto_renewal: keep refreshing the lock while it is held
    """
    from rhodecode.lib._vendor import redis_lock

    class _RedisLockWrapper(object):
        """LockWrapper for redis_lock"""

        @classmethod
        def get_lock(cls):
            return redis_lock.Lock(
                redis_client=client,
                name=lock_key,
                expire=lock_timeout,
                auto_renewal=auto_renewal,
                strict=True,
            )

        def __init__(self):
            self.lock = self.get_lock()
            self.lock_key = lock_key

        def __repr__(self):
            return "{}:{}".format(self.__class__.__name__, lock_key)

        def __str__(self):
            return "{}:{}".format(self.__class__.__name__, lock_key)

        def acquire(self, wait=True):
            log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
            try:
                acquired = self.lock.acquire(wait)
                log.debug('Got lock for key %s, %s', self.lock_key, acquired)
                return acquired
            except redis_lock.AlreadyAcquired:
                # this holder already owns the lock
                return False
            except redis_lock.AlreadyStarted:
                # refresh thread exists, but it also means we acquired the lock
                return True

        def release(self):
            try:
                self.lock.release()
            except redis_lock.NotAcquired:
                # releasing a lock we do not hold is a no-op
                pass

    return _RedisLockWrapper()
General Comments 0
You need to be logged in to leave comments. Login now