caches: log errors when initializing DB caches.
marcink
r4423:82526ce6 default
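
This commit wraps the DBM initialization of FileNamespaceBackend in a try/except so that a failure to open the cache database file is logged, together with the offending path, before the exception propagates. A minimal sketch of how the new log lines would surface, assuming the backend is registered with dogpile under a name like 'dogpile.cache.rc.file_namespace' and lives in rhodecode.lib.rc_cache.backends (both names are assumptions, not confirmed by the diff):

import logging
from dogpile.cache import make_region, register_backend

logging.basicConfig(level=logging.DEBUG)

# Assumed registration name and module path -- adjust to your install.
register_backend(
    'dogpile.cache.rc.file_namespace',
    'rhodecode.lib.rc_cache.backends',
    'FileNamespaceBackend')

# A deliberately unwritable path makes backend construction fail; with this
# patch the failure is now logged before the original exception is re-raised:
#   DEBUG ... initializing FileNamespaceBackend DB in /nonexistent-dir/cache.db
#   ERROR ... Failed to initialize db at: /nonexistent-dir/cache.db
region = make_region().configure(
    'dogpile.cache.rc.file_namespace',
    arguments={'filename': '/nonexistent-dir/cache.db'})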
@@ -1,295 +1,302 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import errno
23 23 import logging
24 24
25 25 import msgpack
26 26 import gevent
27 27 import redis
28 28
29 29 from dogpile.cache.api import CachedValue
30 30 from dogpile.cache.backends import memory as memory_backend
31 31 from dogpile.cache.backends import file as file_backend
32 32 from dogpile.cache.backends import redis as redis_backend
33 33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
34 34 from dogpile.cache.util import memoized_property
35 35
36 36 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
37 37
38 38
39 39 _default_max_size = 1024
40 40
41 41 log = logging.getLogger(__name__)
42 42
43 43
44 44 class LRUMemoryBackend(memory_backend.MemoryBackend):
45 45 key_prefix = 'lru_mem_backend'
46 46 pickle_values = False
47 47
48 48 def __init__(self, arguments):
49 49 max_size = arguments.pop('max_size', _default_max_size)
50 50
51 51 LRUDictClass = LRUDict
52 52 if arguments.pop('log_key_count', None):
53 53 LRUDictClass = LRUDictDebug
54 54
55 55 arguments['cache_dict'] = LRUDictClass(max_size)
56 56 super(LRUMemoryBackend, self).__init__(arguments)
57 57
58 58 def delete(self, key):
59 59 try:
60 60 del self._cache[key]
61 61 except KeyError:
62 62 # we don't care if the key isn't there at deletion
63 63 pass
64 64
65 65 def delete_multi(self, keys):
66 66 for key in keys:
67 67 self.delete(key)
68 68
69 69
70 70 class PickleSerializer(object):
71 71
72 72 def _dumps(self, value, safe=False):
73 73 try:
74 74 return compat.pickle.dumps(value)
75 75 except Exception:
76 76 if safe:
77 77 return NO_VALUE
78 78 else:
79 79 raise
80 80
81 81 def _loads(self, value, safe=True):
82 82 try:
83 83 return compat.pickle.loads(value)
84 84 except Exception:
85 85 if safe:
86 86 return NO_VALUE
87 87 else:
88 88 raise
89 89
90 90
91 91 class MsgPackSerializer(object):
92 92
93 93 def _dumps(self, value, safe=False):
94 94 try:
95 95 return msgpack.packb(value)
96 96 except Exception:
97 97 if safe:
98 98 return NO_VALUE
99 99 else:
100 100 raise
101 101
102 102 def _loads(self, value, safe=True):
103 103 """
104 104 pickle maintains the `CachedValue` wrapper of the tuple;
105 105 msgpack does not, so it must be added back in.
106 106 """
107 107 try:
108 108 value = msgpack.unpackb(value, use_list=False)
109 109 return CachedValue(*value)
110 110 except Exception:
111 111 if safe:
112 112 return NO_VALUE
113 113 else:
114 114 raise
115 115
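
The docstring in MsgPackSerializer._loads above notes that pickle round-trips the CachedValue wrapper while msgpack only round-trips the underlying tuple, so the wrapper has to be re-applied. A small standalone illustration of that re-wrapping (assuming msgpack >= 1.0 defaults; the pinned version is not stated here):

import msgpack
from dogpile.cache.api import CachedValue

# dogpile stores cached entries as CachedValue(payload, metadata), a tuple
# subclass, which msgpack serializes as a plain array
original = CachedValue('payload', {'ct': 1234567890.0, 'v': 1})
packed = msgpack.packb(original)

# unpackb returns a bare tuple, so the wrapper is re-applied, exactly as
# MsgPackSerializer._loads does
restored = CachedValue(*msgpack.unpackb(packed, use_list=False))
assert restored == original  # compares as tuples; the wrapper is restored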
116 116
117 117 import fcntl
118 118 flock_org = fcntl.flock
119 119
120 120
121 121 class CustomLockFactory(FileLock):
122 122
123 123 @memoized_property
124 124 def _module(self):
125 125
126 126 def gevent_flock(fd, operation):
127 127 """
128 128 Gevent compatible flock
129 129 """
130 130 # set non-blocking; this will cause an exception if we cannot acquire the lock
131 131 operation |= fcntl.LOCK_NB
132 132 start_lock_time = time.time()
133 133 timeout = 60 * 15 # 15min
134 134 while True:
135 135 try:
136 136 flock_org(fd, operation)
137 137 # lock has been acquired
138 138 break
139 139 except (OSError, IOError) as e:
140 140 # raise on errors other than 'Resource temporarily unavailable' (EAGAIN)
141 141 if e.errno != errno.EAGAIN:
142 142 raise
143 143 elif (time.time() - start_lock_time) > timeout:
144 144 # waited too long on the lock; better to fail than to loop forever
145 145 log.error('Failed to acquire lock on `%s` after waiting %ss',
146 146 self.filename, timeout)
147 147 raise
148 148 wait_timeout = 0.03
149 149 log.debug('Failed to acquire lock on `%s`, retry in %ss',
150 150 self.filename, wait_timeout)
151 151 gevent.sleep(wait_timeout)
152 152
153 153 fcntl.flock = gevent_flock
154 154 return fcntl
155 155
156 156
157 157 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
158 158 key_prefix = 'file_backend'
159 159
160 160 def __init__(self, arguments):
161 161 arguments['lock_factory'] = CustomLockFactory
162 db_file = arguments.get('filename')
163
164 log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
165 try:
162 166 super(FileNamespaceBackend, self).__init__(arguments)
167 except Exception:
168 log.error('Failed to initialize db at: %s', db_file)
169 raise
163 170
164 171 def __repr__(self):
165 172 return '{} `{}`'.format(self.__class__, self.filename)
166 173
167 174 def list_keys(self, prefix=''):
168 175 prefix = '{}:{}'.format(self.key_prefix, prefix)
169 176
170 177 def cond(v):
171 178 if not prefix:
172 179 return True
173 180
174 181 if v.startswith(prefix):
175 182 return True
176 183 return False
177 184
178 185 with self._dbm_file(True) as dbm:
179 186
180 187 return filter(cond, dbm.keys())
181 188
182 189 def get_store(self):
183 190 return self.filename
184 191
185 192 def _dbm_get(self, key):
186 193 with self._dbm_file(False) as dbm:
187 194 if hasattr(dbm, 'get'):
188 195 value = dbm.get(key, NO_VALUE)
189 196 else:
190 197 # gdbm objects lack a .get method
191 198 try:
192 199 value = dbm[key]
193 200 except KeyError:
194 201 value = NO_VALUE
195 202 if value is not NO_VALUE:
196 203 value = self._loads(value)
197 204 return value
198 205
199 206 def get(self, key):
200 207 try:
201 208 return self._dbm_get(key)
202 209 except Exception:
203 210 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
204 211 raise
205 212
206 213 def set(self, key, value):
207 214 with self._dbm_file(True) as dbm:
208 215 dbm[key] = self._dumps(value)
209 216
210 217 def set_multi(self, mapping):
211 218 with self._dbm_file(True) as dbm:
212 219 for key, value in mapping.items():
213 220 dbm[key] = self._dumps(value)
214 221
215 222
216 223 class BaseRedisBackend(redis_backend.RedisBackend):
217 224
218 225 def _create_client(self):
219 226 args = {}
220 227
221 228 if self.url is not None:
222 229 args.update(url=self.url)
223 230
224 231 else:
225 232 args.update(
226 233 host=self.host, password=self.password,
227 234 port=self.port, db=self.db
228 235 )
229 236
230 237 connection_pool = redis.ConnectionPool(**args)
231 238
232 239 return redis.StrictRedis(connection_pool=connection_pool)
233 240
234 241 def list_keys(self, prefix=''):
235 242 prefix = '{}:{}*'.format(self.key_prefix, prefix)
236 243 return self.client.keys(prefix)
237 244
238 245 def get_store(self):
239 246 return self.client.connection_pool
240 247
241 248 def get(self, key):
242 249 value = self.client.get(key)
243 250 if value is None:
244 251 return NO_VALUE
245 252 return self._loads(value)
246 253
247 254 def get_multi(self, keys):
248 255 if not keys:
249 256 return []
250 257 values = self.client.mget(keys)
251 258 loads = self._loads
252 259 return [
253 260 loads(v) if v is not None else NO_VALUE
254 261 for v in values]
255 262
256 263 def set(self, key, value):
257 264 if self.redis_expiration_time:
258 265 self.client.setex(key, self.redis_expiration_time,
259 266 self._dumps(value))
260 267 else:
261 268 self.client.set(key, self._dumps(value))
262 269
263 270 def set_multi(self, mapping):
264 271 dumps = self._dumps
265 272 mapping = dict(
266 273 (k, dumps(v))
267 274 for k, v in mapping.items()
268 275 )
269 276
270 277 if not self.redis_expiration_time:
271 278 self.client.mset(mapping)
272 279 else:
273 280 pipe = self.client.pipeline()
274 281 for key, value in mapping.items():
275 282 pipe.setex(key, self.redis_expiration_time, value)
276 283 pipe.execute()
277 284
278 285 def get_mutex(self, key):
279 286 u = redis_backend.u
280 287 if self.distributed_lock:
281 288 lock_key = u('_lock_{0}').format(key)
282 289 log.debug('Trying to acquire Redis lock for key %s', lock_key)
283 290 return self.client.lock(lock_key, self.lock_timeout, self.lock_sleep)
284 291 else:
285 292 return None
286 293
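
When distributed_lock is enabled, get_mutex above hands dogpile a redis-py lock keyed per cache key, so concurrent writers across processes serialize value creation. A hypothetical direct use of such a lock (assuming a Redis server on localhost with default settings):

import redis

client = redis.StrictRedis()  # assumed local Redis at the default port
lock = client.lock('_lock_some_cache_key', timeout=30, sleep=0.1)
if lock.acquire(blocking=True):
    try:
        pass  # compute and store the expensive value here
    finally:
        lock.release()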
287 294
288 295 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
289 296 key_prefix = 'redis_pickle_backend'
290 297 pass
291 298
292 299
293 300 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
294 301 key_prefix = 'redis_msgpack_backend'
295 302 pass