##// END OF EJS Templates
caches: fixed dbm keys calls to use bytes
super-admin -
r1082:58fdadac python3
parent child Browse files
Show More
@@ -1,239 +1,239 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import time
19 19 import errno
20 20 import logging
21 21 import functools
22 22
23 23 import msgpack
24 24 import redis
25 25 import pickle
26 26 import fcntl
27 27 flock_org = fcntl.flock
28 28 from typing import Union
29 29
30 30 from dogpile.cache.backends import memory as memory_backend
31 31 from dogpile.cache.backends import file as file_backend
32 32 from dogpile.cache.backends import redis as redis_backend
33 33 from dogpile.cache.backends.file import FileLock
34 34 from dogpile.cache.util import memoized_property
35 35 from dogpile.cache.api import Serializer, Deserializer
36 36
37 37 from pyramid.settings import asbool
38 38
39 39 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
40 from vcsserver.str_utils import safe_str
40 from vcsserver.str_utils import safe_str, safe_bytes
41 41
42 42
43 43 _default_max_size = 1024
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
class LRUMemoryBackend(memory_backend.MemoryBackend):
    """In-memory dogpile backend whose store is a size-bounded LRU dict."""
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        max_size = arguments.pop('max_size', _default_max_size)

        # The debug variant additionally logs key counts when requested.
        if arguments.pop('log_key_count', None):
            lru_class = LRUDictDebug
        else:
            lru_class = LRUDict

        arguments['cache_dict'] = lru_class(max_size)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        """Remove *key* from the cache; missing keys are ignored."""
        try:
            del self._cache[key]
        except KeyError:
            # deleting an absent key is deliberately a no-op
            pass

    def delete_multi(self, keys):
        """Remove every key in *keys*, ignoring the ones not present."""
        for cache_key in keys:
            self.delete(cache_key)
72 72
73 73
class PickleSerializer:
    """Mixin providing pickle-based (de)serialization for dogpile backends."""

    # dump with the highest protocol available for compactness/speed
    serializer: Union[None, Serializer] = staticmethod(  # type: ignore
        functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
    )
    # pickle.loads already has the right call shape; no partial needed
    deserializer: Union[None, Deserializer] = staticmethod(  # type: ignore
        pickle.loads
    )
81 81
82 82
class MsgPackSerializer(object):
    """Mixin providing msgpack-based (de)serialization for dogpile backends."""

    serializer: Union[None, Serializer] = staticmethod(  # type: ignore
        msgpack.packb
    )
    # use_list=False keeps msgpack arrays as tuples on unpack
    deserializer: Union[None, Deserializer] = staticmethod(  # type: ignore
        functools.partial(msgpack.unpackb, use_list=False)
    )
90 90
91 91
class CustomLockFactory(FileLock):
    # Placeholder subclass of dogpile's FileLock: currently identical to the
    # base class, kept as an extension point so file-lock behavior can be
    # customized without changing FileNamespaceBackend's configuration.

    pass
95 95
96 96
class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    """DBM-file backed dogpile backend with pickle value serialization.

    DBM stores keys as bytes, so ``list_keys`` normalizes both the class
    ``key_prefix`` and the caller-supplied prefix to bytes before matching.
    """
    key_prefix = 'file_backend'

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        db_file = arguments.get('filename')

        # fixed typo in the original log message ('initialing')
        log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
        try:
            super(FileNamespaceBackend, self).__init__(arguments)
        except Exception:
            log.exception('Failed to initialize db at: %s', db_file)
            raise

    def __repr__(self):
        return '{} `{}`'.format(self.__class__, self.filename)

    def list_keys(self, prefix: bytes = b''):
        """Return all DBM keys starting with ``<key_prefix>:<prefix>``.

        :param prefix: optional key prefix (bytes or str) to filter on.
        """
        full_prefix = b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))

        # NOTE: full_prefix is never empty because key_prefix is always set,
        # so the original dead "empty prefix matches everything" branch is gone.
        def cond(dbm_key: bytes) -> bool:
            return dbm_key.startswith(full_prefix)

        with self._dbm_file(True) as dbm:
            try:
                return [k for k in dbm.keys() if cond(k)]
            except Exception:
                log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                raise

    def get_store(self):
        """Return the path of the backing DBM file."""
        return self.filename
134 134
135 135
class BaseRedisBackend(redis_backend.RedisBackend):
    """Shared behavior for the redis-based dogpile backends below."""
    key_prefix = ''

    def __init__(self, arguments):
        super(BaseRedisBackend, self).__init__(arguments)
        self._lock_timeout = self.lock_timeout
        self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))

        # auto renewal needs an expiry to renew against; fall back to a default
        if self._lock_auto_renewal and not self._lock_timeout:
            self._lock_timeout = 30

    def _create_client(self):
        """Build the redis client, from URL if given, else host/port/db."""
        connection_args = {}
        if self.url is not None:
            connection_args.update(url=self.url)
        else:
            connection_args.update(
                host=self.host, password=self.password,
                port=self.port, db=self.db
            )

        pool = redis.ConnectionPool(**connection_args)
        self.writer_client = redis.StrictRedis(
            connection_pool=pool
        )
        # reads and writes share a single client
        self.reader_client = self.writer_client

    def list_keys(self, prefix=''):
        """Return redis keys matching ``<key_prefix>:<prefix>*``."""
        pattern = '{}:{}*'.format(self.key_prefix, prefix)
        return self.reader_client.keys(pattern)

    def get_store(self):
        """Expose the underlying connection pool."""
        return self.reader_client.connection_pool

    def get_mutex(self, key):
        """Return a distributed lock for *key*, or None when disabled."""
        if not self.distributed_lock:
            return None
        lock_key = '_lock_{0}'.format(safe_str(key))
        return get_mutex_lock(
            self.writer_client, lock_key,
            self._lock_timeout,
            auto_renewal=self._lock_auto_renewal
        )
183 183
184 184
class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    """Redis backend storing values as pickle blobs."""
    # removed redundant `pass` — the class body already has statements
    key_prefix = 'redis_pickle_backend'
188 188
189 189
class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    """Redis backend storing values as msgpack blobs."""
    # removed redundant `pass` — the class body already has statements
    key_prefix = 'redis_msgpack_backend'
193 193
194 194
def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
    """Build a distributed mutex wrapped around ``redis_lock.Lock``.

    Returns an object exposing ``acquire(wait=True)`` / ``release()``
    suitable for use as a dogpile mutex.
    """
    from vcsserver.lib._vendor import redis_lock

    # NOTE: class name is visible via __repr__/__str__, keep it unchanged
    class _RedisLockWrapper(object):
        """LockWrapper for redis_lock"""

        def __init__(self):
            self.lock = self.get_lock()
            self.lock_key = lock_key

        @classmethod
        def get_lock(cls):
            return redis_lock.Lock(
                redis_client=client,
                name=lock_key,
                expire=lock_timeout,
                auto_renewal=auto_renewal,
                strict=True,
            )

        def acquire(self, wait=True):
            log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
            try:
                got_lock = self.lock.acquire(wait)
                log.debug('Got lock for key %s, %s', self.lock_key, got_lock)
                return got_lock
            except redis_lock.AlreadyAcquired:
                return False
            except redis_lock.AlreadyStarted:
                # refresh thread exists, but it also means we acquired the lock
                return True

        def release(self):
            try:
                self.lock.release()
            except redis_lock.NotAcquired:
                # releasing a lock we never got is a no-op
                pass

        def __repr__(self):
            return f'{self.__class__.__name__}:{lock_key}'

        def __str__(self):
            return f'{self.__class__.__name__}:{lock_key}'

    return _RedisLockWrapper()
General Comments 0
You need to be logged in to leave comments. Login now