caches: log errors when initializing DB caches.
marcink
r4423:82526ce6 default
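In essence, the commit wraps the file-based cache backend's DB initialization so that a failure logs which DB file was being opened before the exception propagates. A minimal sketch of the pattern outside RhodeCode (the `init_db_cache` helper is hypothetical, for illustration only):

    import logging

    log = logging.getLogger(__name__)

    def init_db_cache(backend_cls, arguments):
        # mirror the commit: record the target file, then surface failures
        db_file = arguments.get('filename')
        log.debug('initializing %s DB in %s', backend_cls.__name__, db_file)
        try:
            return backend_cls(arguments)
        except Exception:
            # log *which* DB file failed before re-raising
            log.error('Failed to initialize db at: %s', db_file)
            raise

The full diff follows.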
@@ -1,295 +1,302 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2015-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import time
import errno
import logging

import msgpack
import gevent
import redis

from dogpile.cache.api import CachedValue
from dogpile.cache.backends import memory as memory_backend
from dogpile.cache.backends import file as file_backend
from dogpile.cache.backends import redis as redis_backend
from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
from dogpile.cache.util import memoized_property

from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug


_default_max_size = 1024

log = logging.getLogger(__name__)


class LRUMemoryBackend(memory_backend.MemoryBackend):
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        max_size = arguments.pop('max_size', _default_max_size)

        LRUDictClass = LRUDict
        if arguments.pop('log_key_count', None):
            LRUDictClass = LRUDictDebug

        arguments['cache_dict'] = LRUDictClass(max_size)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        try:
            del self._cache[key]
        except KeyError:
            # we don't care if key isn't there at deletion
            pass

    def delete_multi(self, keys):
        for key in keys:
            self.delete(key)


class PickleSerializer(object):

    def _dumps(self, value, safe=False):
        try:
            return compat.pickle.dumps(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

    def _loads(self, value, safe=True):
        try:
            return compat.pickle.loads(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise


class MsgPackSerializer(object):

    def _dumps(self, value, safe=False):
        try:
            return msgpack.packb(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

    def _loads(self, value, safe=True):
103 """
103 """
104 pickle maintained the `CachedValue` wrapper of the tuple
104 pickle maintained the `CachedValue` wrapper of the tuple
105 msgpack does not, so it must be added back in.
105 msgpack does not, so it must be added back in.
106 """
106 """
        try:
            value = msgpack.unpackb(value, use_list=False)
            return CachedValue(*value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise


import fcntl
flock_org = fcntl.flock


class CustomLockFactory(FileLock):

    @memoized_property
    def _module(self):

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # set non-blocking, this will cause an exception if we cannot acquire a lock
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 15  # 15min
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # raise on errors other than "Resource temporarily unavailable"
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too long for the lock; better to fail than to loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    gevent.sleep(wait_timeout)

        fcntl.flock = gevent_flock
        return fcntl


class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    key_prefix = 'file_backend'

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
-        super(FileNamespaceBackend, self).__init__(arguments)
+        db_file = arguments.get('filename')
+
+        log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
+        try:
+            super(FileNamespaceBackend, self).__init__(arguments)
+        except Exception:
+            log.error('Failed to initialize db at: %s', db_file)
+            raise

    def __repr__(self):
        return '{} `{}`'.format(self.__class__, self.filename)

    def list_keys(self, prefix=''):
        prefix = '{}:{}'.format(self.key_prefix, prefix)

        def cond(v):
            if not prefix:
                return True

            if v.startswith(prefix):
                return True
            return False

        with self._dbm_file(True) as dbm:

            return filter(cond, dbm.keys())

    def get_store(self):
        return self.filename

    def _dbm_get(self, key):
        with self._dbm_file(False) as dbm:
            if hasattr(dbm, 'get'):
                value = dbm.get(key, NO_VALUE)
            else:
                # gdbm objects lack a .get method
                try:
                    value = dbm[key]
                except KeyError:
                    value = NO_VALUE
            if value is not NO_VALUE:
                value = self._loads(value)
            return value

    def get(self, key):
        try:
            return self._dbm_get(key)
        except Exception:
            log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
            raise

    def set(self, key, value):
        with self._dbm_file(True) as dbm:
            dbm[key] = self._dumps(value)

    def set_multi(self, mapping):
        with self._dbm_file(True) as dbm:
            for key, value in mapping.items():
                dbm[key] = self._dumps(value)


class BaseRedisBackend(redis_backend.RedisBackend):

    def _create_client(self):
        args = {}

        if self.url is not None:
            args.update(url=self.url)

        else:
            args.update(
                host=self.host, password=self.password,
                port=self.port, db=self.db
            )

        connection_pool = redis.ConnectionPool(**args)

        return redis.StrictRedis(connection_pool=connection_pool)

    def list_keys(self, prefix=''):
        prefix = '{}:{}*'.format(self.key_prefix, prefix)
        return self.client.keys(prefix)

    def get_store(self):
        return self.client.connection_pool

    def get(self, key):
        value = self.client.get(key)
        if value is None:
            return NO_VALUE
        return self._loads(value)

    def get_multi(self, keys):
        if not keys:
            return []
        values = self.client.mget(keys)
        loads = self._loads
        return [
            loads(v) if v is not None else NO_VALUE
            for v in values]

    def set(self, key, value):
        if self.redis_expiration_time:
            self.client.setex(key, self.redis_expiration_time,
                              self._dumps(value))
        else:
            self.client.set(key, self._dumps(value))

    def set_multi(self, mapping):
        dumps = self._dumps
        mapping = dict(
            (k, dumps(v))
            for k, v in mapping.items()
        )

        if not self.redis_expiration_time:
            self.client.mset(mapping)
        else:
            pipe = self.client.pipeline()
            for key, value in mapping.items():
                pipe.setex(key, self.redis_expiration_time, value)
            pipe.execute()

    def get_mutex(self, key):
        u = redis_backend.u
        if self.distributed_lock:
            lock_key = u('_lock_{0}').format(key)
            log.debug('Trying to acquire Redis lock for key %s', lock_key)
            return self.client.lock(lock_key, self.lock_timeout, self.lock_sleep)
        else:
            return None


class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    key_prefix = 'redis_pickle_backend'
    pass


class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    key_prefix = 'redis_msgpack_backend'
    pass
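For context, these backends plug into dogpile.cache regions. A minimal usage sketch, assuming `rhodecode.lib.rc_cache.backends` is importable (the registration name and cache file path below are illustrative, not the ones RhodeCode itself registers):

    from dogpile.cache import make_region
    from dogpile.cache.region import register_backend

    # register the file backend under a demo name (assumed wiring)
    register_backend(
        'demo.rc.file_namespace', 'rhodecode.lib.rc_cache.backends',
        'FileNamespaceBackend')

    region = make_region().configure(
        'demo.rc.file_namespace',
        expiration_time=3600,
        arguments={'filename': '/tmp/rc_cache_demo.db'})  # hypothetical path

    @region.cache_on_arguments()
    def expensive(x):
        return x * 2

With this change, a corrupt or unwritable cache file surfaces as a `Failed to initialize db at: ...` log entry at configure time, rather than an unannotated traceback alone.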