caches: fixed dbm keys calls to use bytes
super-admin
r1082:58fdadac python3
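
Background for the change: on Python 3 the standard-library dbm backends store and return keys as bytes, so the previous str-based prefix built with '{}:{}'.format(...) could no longer be matched against dbm.keys(); calling bytes.startswith() with a str argument raises TypeError. A minimal sketch of that behaviour, using only the stdlib dbm module (the path and key names below are illustrative, not taken from the commit):

    import dbm

    # open (or create) a throwaway dbm file
    with dbm.open('/tmp/example_cache_db', 'c') as db:
        db[b'file_backend:some_key'] = b'value'

        # keys() always yields bytes on Python 3, e.g. [b'file_backend:some_key']
        keys = db.keys()

        # filtering therefore needs a bytes prefix ...
        matches = [k for k in keys if k.startswith(b'file_backend:')]
        print(matches)

        # ... a str prefix like 'file_backend:' would raise TypeError on these bytes keys
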
@@ -1,239 +1,239 @@
 # RhodeCode VCSServer provides access to different vcs backends via network.
 # Copyright (C) 2014-2020 RhodeCode GmbH
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

 import time
 import errno
 import logging
 import functools

 import msgpack
 import redis
 import pickle
 import fcntl
 flock_org = fcntl.flock
 from typing import Union

 from dogpile.cache.backends import memory as memory_backend
 from dogpile.cache.backends import file as file_backend
 from dogpile.cache.backends import redis as redis_backend
 from dogpile.cache.backends.file import FileLock
 from dogpile.cache.util import memoized_property
 from dogpile.cache.api import Serializer, Deserializer

 from pyramid.settings import asbool

 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
-from vcsserver.str_utils import safe_str
+from vcsserver.str_utils import safe_str, safe_bytes


 _default_max_size = 1024

 log = logging.getLogger(__name__)


 class LRUMemoryBackend(memory_backend.MemoryBackend):
     key_prefix = 'lru_mem_backend'
     pickle_values = False

     def __init__(self, arguments):
         max_size = arguments.pop('max_size', _default_max_size)

         LRUDictClass = LRUDict
         if arguments.pop('log_key_count', None):
             LRUDictClass = LRUDictDebug

         arguments['cache_dict'] = LRUDictClass(max_size)
         super(LRUMemoryBackend, self).__init__(arguments)

     def delete(self, key):
         try:
             del self._cache[key]
         except KeyError:
             # we don't care if key isn't there at deletion
             pass

     def delete_multi(self, keys):
         for key in keys:
             self.delete(key)


 class PickleSerializer:
     serializer: Union[None, Serializer] = staticmethod(  # type: ignore
         functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
     )
     deserializer: Union[None, Deserializer] = staticmethod(  # type: ignore
         functools.partial(pickle.loads)
     )


 class MsgPackSerializer(object):
     serializer: Union[None, Serializer] = staticmethod(  # type: ignore
         msgpack.packb
     )
     deserializer: Union[None, Deserializer] = staticmethod(  # type: ignore
         functools.partial(msgpack.unpackb, use_list=False)
     )


 class CustomLockFactory(FileLock):

     pass


 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
     key_prefix = 'file_backend'

     def __init__(self, arguments):
         arguments['lock_factory'] = CustomLockFactory
         db_file = arguments.get('filename')

         log.debug('initialing %s DB in %s', self.__class__.__name__, db_file)
         try:
             super(FileNamespaceBackend, self).__init__(arguments)
         except Exception:
             log.exception('Failed to initialize db at: %s', db_file)
             raise

     def __repr__(self):
         return '{} `{}`'.format(self.__class__, self.filename)

-    def list_keys(self, prefix=''):
-        prefix = '{}:{}'.format(self.key_prefix, prefix)
+    def list_keys(self, prefix: bytes = b''):
+        prefix = b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))

-        def cond(v):
+        def cond(dbm_key: bytes):
             if not prefix:
                 return True

-            if v.startswith(prefix):
+            if dbm_key.startswith(prefix):
                 return True
             return False

         with self._dbm_file(True) as dbm:
             try:
-                return list(filter(cond, list(dbm.keys())))
+                return list(filter(cond, dbm.keys()))
             except Exception:
                 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                 raise

     def get_store(self):
         return self.filename


 class BaseRedisBackend(redis_backend.RedisBackend):
     key_prefix = ''

     def __init__(self, arguments):
         super(BaseRedisBackend, self).__init__(arguments)
         self._lock_timeout = self.lock_timeout
         self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))

         if self._lock_auto_renewal and not self._lock_timeout:
             # set default timeout for auto_renewal
             self._lock_timeout = 30

     def _create_client(self):
         args = {}

         if self.url is not None:
             args.update(url=self.url)

         else:
             args.update(
                 host=self.host, password=self.password,
                 port=self.port, db=self.db
             )

         connection_pool = redis.ConnectionPool(**args)
         self.writer_client = redis.StrictRedis(
             connection_pool=connection_pool
         )
         self.reader_client = self.writer_client

     def list_keys(self, prefix=''):
         prefix = '{}:{}*'.format(self.key_prefix, prefix)
         return self.reader_client.keys(prefix)

     def get_store(self):
         return self.reader_client.connection_pool

     def get_mutex(self, key):
         if self.distributed_lock:
             lock_key = '_lock_{0}'.format(safe_str(key))
             return get_mutex_lock(
                 self.writer_client, lock_key,
                 self._lock_timeout,
                 auto_renewal=self._lock_auto_renewal
             )
         else:
             return None


 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
     key_prefix = 'redis_pickle_backend'
     pass


 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
     key_prefix = 'redis_msgpack_backend'
     pass


 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
     from vcsserver.lib._vendor import redis_lock

     class _RedisLockWrapper(object):
         """LockWrapper for redis_lock"""

         @classmethod
         def get_lock(cls):
             return redis_lock.Lock(
                 redis_client=client,
                 name=lock_key,
                 expire=lock_timeout,
                 auto_renewal=auto_renewal,
                 strict=True,
             )

         def __repr__(self):
             return "{}:{}".format(self.__class__.__name__, lock_key)

         def __str__(self):
             return "{}:{}".format(self.__class__.__name__, lock_key)

         def __init__(self):
             self.lock = self.get_lock()
             self.lock_key = lock_key

         def acquire(self, wait=True):
             log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
             try:
                 acquired = self.lock.acquire(wait)
                 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
                 return acquired
             except redis_lock.AlreadyAcquired:
                 return False
             except redis_lock.AlreadyStarted:
                 # refresh thread exists, but it also means we acquired the lock
                 return True

         def release(self):
             try:
                 self.lock.release()
             except redis_lock.NotAcquired:
                 pass

     return _RedisLockWrapper()
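
A note on the new prefix construction: b'%b:%b' % (...) relies on printf-style formatting for bytes (PEP 461, Python 3.5+), and the result can be sanity-checked in isolation. The key_prefix value below mirrors the file_backend prefix from the diff; the user-supplied part is made up for illustration:

    key_prefix = b'file_backend'
    user_prefix = b'repo_1'

    # same construction as the new list_keys() code
    prefix = b'%b:%b' % (key_prefix, user_prefix)
    assert prefix == b'file_backend:repo_1'

    # bytes keys coming back from dbm can now be matched directly
    assert b'file_backend:repo_1:some_key'.startswith(prefix)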