caches: the file backend should report damaged DB in case of failures.
dan
r3992:e884ccc4 default
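The fix wraps the raw DBM read so that a corrupted cache file is reported with its store path before the exception propagates, instead of failing deep inside dogpile with no hint of which file is damaged. A minimal standalone sketch of the same guard pattern (the GuardedStore class and store_path name are illustrative, not part of this commit):

import logging

log = logging.getLogger(__name__)


class GuardedStore(object):
    """Illustrative stand-in for FileNamespaceBackend."""

    def __init__(self, store_path, data):
        self.store_path = store_path
        self._data = data  # stands in for the on-disk DBM file

    def _raw_get(self, key):
        # in the real backend this reads the DBM file; a damaged
        # file raises from here without naming the file involved
        return self._data[key]

    def get(self, key):
        try:
            return self._raw_get(key)
        except Exception:
            # name the damaged store before propagating the error
            log.error('Failed to fetch DBM key %s from DB: %s',
                      key, self.store_path)
            raise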
@@ -1,288 +1,295 @@
 # -*- coding: utf-8 -*-
 
 # Copyright (C) 2015-2019 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 
 import time
 import errno
 import logging
 
 import msgpack
 import gevent
 import redis
 
 from dogpile.cache.api import CachedValue
 from dogpile.cache.backends import memory as memory_backend
 from dogpile.cache.backends import file as file_backend
 from dogpile.cache.backends import redis as redis_backend
 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
 from dogpile.cache.util import memoized_property
 
 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
 
 
 _default_max_size = 1024
 
 log = logging.getLogger(__name__)
 
 
 class LRUMemoryBackend(memory_backend.MemoryBackend):
     key_prefix = 'lru_mem_backend'
     pickle_values = False
 
     def __init__(self, arguments):
         max_size = arguments.pop('max_size', _default_max_size)
 
         LRUDictClass = LRUDict
         if arguments.pop('log_key_count', None):
             LRUDictClass = LRUDictDebug
 
         arguments['cache_dict'] = LRUDictClass(max_size)
         super(LRUMemoryBackend, self).__init__(arguments)
 
     def delete(self, key):
         try:
             del self._cache[key]
         except KeyError:
             # we don't care if key isn't there at deletion
             pass
 
     def delete_multi(self, keys):
         for key in keys:
             self.delete(key)
 
 
 class PickleSerializer(object):
 
     def _dumps(self, value, safe=False):
         try:
             return compat.pickle.dumps(value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise
 
     def _loads(self, value, safe=True):
         try:
             return compat.pickle.loads(value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise
 
 
 class MsgPackSerializer(object):
 
     def _dumps(self, value, safe=False):
         try:
             return msgpack.packb(value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise
 
     def _loads(self, value, safe=True):
103 """
103 """
104 pickle maintained the `CachedValue` wrapper of the tuple
104 pickle maintained the `CachedValue` wrapper of the tuple
105 msgpack does not, so it must be added back in.
105 msgpack does not, so it must be added back in.
106 """
106 """
         try:
             value = msgpack.unpackb(value, use_list=False)
             return CachedValue(*value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise
 
 
 import fcntl
 flock_org = fcntl.flock
 
 
 class CustomLockFactory(FileLock):
 
     @memoized_property
     def _module(self):
 
         def gevent_flock(fd, operation):
             """
             Gevent compatible flock
             """
             # set non-blocking, this will cause an exception if we cannot acquire a lock
             operation |= fcntl.LOCK_NB
             start_lock_time = time.time()
             timeout = 60 * 15  # 15min
             while True:
                 try:
                     flock_org(fd, operation)
                     # lock has been acquired
                     break
                 except (OSError, IOError) as e:
                     # raise on errors other than "Resource temporarily unavailable"
                     if e.errno != errno.EAGAIN:
                         raise
                     elif (time.time() - start_lock_time) > timeout:
                         # waited too long for a lock, better to fail than loop forever
                         log.error('Failed to acquire lock on `%s` after waiting %ss',
                                   self.filename, timeout)
                         raise
                     wait_timeout = 0.03
                     log.debug('Failed to acquire lock on `%s`, retry in %ss',
                               self.filename, wait_timeout)
                     gevent.sleep(wait_timeout)
 
         fcntl.flock = gevent_flock
         return fcntl
 
 
 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
     key_prefix = 'file_backend'
 
     def __init__(self, arguments):
         arguments['lock_factory'] = CustomLockFactory
         super(FileNamespaceBackend, self).__init__(arguments)
 
     def __repr__(self):
         return '{} `{}`'.format(self.__class__, self.filename)
 
     def list_keys(self, prefix=''):
         prefix = '{}:{}'.format(self.key_prefix, prefix)
 
         def cond(v):
             if not prefix:
                 return True
 
             if v.startswith(prefix):
                 return True
             return False
 
         with self._dbm_file(True) as dbm:
 
             return filter(cond, dbm.keys())
 
     def get_store(self):
         return self.filename
 
-    def get(self, key):
+    def _dbm_get(self, key):
         with self._dbm_file(False) as dbm:
             if hasattr(dbm, 'get'):
                 value = dbm.get(key, NO_VALUE)
             else:
                 # gdbm objects lack a .get method
                 try:
                     value = dbm[key]
                 except KeyError:
                     value = NO_VALUE
             if value is not NO_VALUE:
                 value = self._loads(value)
             return value
 
+    def get(self, key):
+        try:
+            return self._dbm_get(key)
+        except Exception:
+            log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
+            raise
+
     def set(self, key, value):
         with self._dbm_file(True) as dbm:
             dbm[key] = self._dumps(value)
 
     def set_multi(self, mapping):
         with self._dbm_file(True) as dbm:
             for key, value in mapping.items():
                 dbm[key] = self._dumps(value)
 
 
 class BaseRedisBackend(redis_backend.RedisBackend):
 
     def _create_client(self):
         args = {}
 
         if self.url is not None:
             args.update(url=self.url)
 
         else:
             args.update(
                 host=self.host, password=self.password,
                 port=self.port, db=self.db
             )
 
         connection_pool = redis.ConnectionPool(**args)
 
         return redis.StrictRedis(connection_pool=connection_pool)
 
     def list_keys(self, prefix=''):
         prefix = '{}:{}*'.format(self.key_prefix, prefix)
         return self.client.keys(prefix)
 
     def get_store(self):
         return self.client.connection_pool
 
     def get(self, key):
         value = self.client.get(key)
         if value is None:
             return NO_VALUE
         return self._loads(value)
 
     def get_multi(self, keys):
         if not keys:
             return []
         values = self.client.mget(keys)
         loads = self._loads
         return [
             loads(v) if v is not None else NO_VALUE
             for v in values]
 
     def set(self, key, value):
         if self.redis_expiration_time:
             self.client.setex(key, self.redis_expiration_time,
                               self._dumps(value))
         else:
             self.client.set(key, self._dumps(value))
 
     def set_multi(self, mapping):
         dumps = self._dumps
         mapping = dict(
             (k, dumps(v))
             for k, v in mapping.items()
         )
 
         if not self.redis_expiration_time:
             self.client.mset(mapping)
         else:
             pipe = self.client.pipeline()
             for key, value in mapping.items():
                 pipe.setex(key, self.redis_expiration_time, value)
             pipe.execute()
 
     def get_mutex(self, key):
         u = redis_backend.u
         if self.distributed_lock:
             lock_key = u('_lock_{0}').format(key)
             log.debug('Trying to acquire Redis lock for key %s', lock_key)
             return self.client.lock(lock_key, self.lock_timeout, self.lock_sleep)
         else:
             return None
 
 
 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
     key_prefix = 'redis_pickle_backend'
     pass
 
 
 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
     key_prefix = 'redis_msgpack_backend'
     pass
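Seen from a caller, nothing changes on the happy path; a damaged DBM file now logs the offending store before the error surfaces. A sketch of what that looks like, assuming the backend is registered with dogpile under a name like 'dogpile.cache.rc.file_namespace' as rc_cache does (the registration name, cache path, and key below are illustrative):

from dogpile.cache import make_region

# assumes rhodecode.lib.rc_cache has registered FileNamespaceBackend
# under this name; the name and filename are illustrative
region = make_region().configure(
    'dogpile.cache.rc.file_namespace',
    expiration_time=300,
    arguments={'filename': '/tmp/rc_cache/example.dbm'},
)

value = region.get('some_key')  # healthy store: value or NO_VALUE
# damaged store: FileNamespaceBackend.get() now logs, e.g.
#   Failed to fetch DBM key some_key from DB: /tmp/rc_cache/example.dbm
# and then re-raises, so callers fail loudly instead of silently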