##// END OF EJS Templates
locks: handle refresh thread already started
super-admin -
r4726:a9d97751 default
parent child Browse files
Show More
@@ -1,351 +1,354 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2020 RhodeCode GmbH
3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import time
21 import time
22 import errno
22 import errno
23 import logging
23 import logging
24
24
25 import msgpack
25 import msgpack
26 import gevent
26 import gevent
27 import redis
27 import redis
28
28
29 from dogpile.cache.api import CachedValue
29 from dogpile.cache.api import CachedValue
30 from dogpile.cache.backends import memory as memory_backend
30 from dogpile.cache.backends import memory as memory_backend
31 from dogpile.cache.backends import file as file_backend
31 from dogpile.cache.backends import file as file_backend
32 from dogpile.cache.backends import redis as redis_backend
32 from dogpile.cache.backends import redis as redis_backend
33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
34 from dogpile.cache.util import memoized_property
34 from dogpile.cache.util import memoized_property
35
35
36 from pyramid.settings import asbool
36 from pyramid.settings import asbool
37
37
38 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
38 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
39
39
40
40
41 _default_max_size = 1024
41 _default_max_size = 1024
42
42
43 log = logging.getLogger(__name__)
43 log = logging.getLogger(__name__)
44
44
45
45
46 class LRUMemoryBackend(memory_backend.MemoryBackend):
46 class LRUMemoryBackend(memory_backend.MemoryBackend):
47 key_prefix = 'lru_mem_backend'
47 key_prefix = 'lru_mem_backend'
48 pickle_values = False
48 pickle_values = False
49
49
50 def __init__(self, arguments):
50 def __init__(self, arguments):
51 max_size = arguments.pop('max_size', _default_max_size)
51 max_size = arguments.pop('max_size', _default_max_size)
52
52
53 LRUDictClass = LRUDict
53 LRUDictClass = LRUDict
54 if arguments.pop('log_key_count', None):
54 if arguments.pop('log_key_count', None):
55 LRUDictClass = LRUDictDebug
55 LRUDictClass = LRUDictDebug
56
56
57 arguments['cache_dict'] = LRUDictClass(max_size)
57 arguments['cache_dict'] = LRUDictClass(max_size)
58 super(LRUMemoryBackend, self).__init__(arguments)
58 super(LRUMemoryBackend, self).__init__(arguments)
59
59
60 def delete(self, key):
60 def delete(self, key):
61 try:
61 try:
62 del self._cache[key]
62 del self._cache[key]
63 except KeyError:
63 except KeyError:
64 # we don't care if key isn't there at deletion
64 # we don't care if key isn't there at deletion
65 pass
65 pass
66
66
67 def delete_multi(self, keys):
67 def delete_multi(self, keys):
68 for key in keys:
68 for key in keys:
69 self.delete(key)
69 self.delete(key)
70
70
71
71
72 class PickleSerializer(object):
72 class PickleSerializer(object):
73
73
74 def _dumps(self, value, safe=False):
74 def _dumps(self, value, safe=False):
75 try:
75 try:
76 return compat.pickle.dumps(value)
76 return compat.pickle.dumps(value)
77 except Exception:
77 except Exception:
78 if safe:
78 if safe:
79 return NO_VALUE
79 return NO_VALUE
80 else:
80 else:
81 raise
81 raise
82
82
83 def _loads(self, value, safe=True):
83 def _loads(self, value, safe=True):
84 try:
84 try:
85 return compat.pickle.loads(value)
85 return compat.pickle.loads(value)
86 except Exception:
86 except Exception:
87 if safe:
87 if safe:
88 return NO_VALUE
88 return NO_VALUE
89 else:
89 else:
90 raise
90 raise
91
91
92
92
93 class MsgPackSerializer(object):
93 class MsgPackSerializer(object):
94
94
95 def _dumps(self, value, safe=False):
95 def _dumps(self, value, safe=False):
96 try:
96 try:
97 return msgpack.packb(value)
97 return msgpack.packb(value)
98 except Exception:
98 except Exception:
99 if safe:
99 if safe:
100 return NO_VALUE
100 return NO_VALUE
101 else:
101 else:
102 raise
102 raise
103
103
104 def _loads(self, value, safe=True):
104 def _loads(self, value, safe=True):
105 """
105 """
106 pickle maintained the `CachedValue` wrapper of the tuple
106 pickle maintained the `CachedValue` wrapper of the tuple
107 msgpack does not, so it must be added back in.
107 msgpack does not, so it must be added back in.
108 """
108 """
109 try:
109 try:
110 value = msgpack.unpackb(value, use_list=False)
110 value = msgpack.unpackb(value, use_list=False)
111 return CachedValue(*value)
111 return CachedValue(*value)
112 except Exception:
112 except Exception:
113 if safe:
113 if safe:
114 return NO_VALUE
114 return NO_VALUE
115 else:
115 else:
116 raise
116 raise
117
117
118
118
119 import fcntl
119 import fcntl
120 flock_org = fcntl.flock
120 flock_org = fcntl.flock
121
121
122
122
123 class CustomLockFactory(FileLock):
123 class CustomLockFactory(FileLock):
124
124
125 @memoized_property
125 @memoized_property
126 def _module(self):
126 def _module(self):
127
127
128 def gevent_flock(fd, operation):
128 def gevent_flock(fd, operation):
129 """
129 """
130 Gevent compatible flock
130 Gevent compatible flock
131 """
131 """
132 # set non-blocking, this will cause an exception if we cannot acquire a lock
132 # set non-blocking, this will cause an exception if we cannot acquire a lock
133 operation |= fcntl.LOCK_NB
133 operation |= fcntl.LOCK_NB
134 start_lock_time = time.time()
134 start_lock_time = time.time()
135 timeout = 60 * 15 # 15min
135 timeout = 60 * 15 # 15min
136 while True:
136 while True:
137 try:
137 try:
138 flock_org(fd, operation)
138 flock_org(fd, operation)
139 # lock has been acquired
139 # lock has been acquired
140 break
140 break
141 except (OSError, IOError) as e:
141 except (OSError, IOError) as e:
142 # raise on other errors than Resource temporarily unavailable
142 # raise on other errors than Resource temporarily unavailable
143 if e.errno != errno.EAGAIN:
143 if e.errno != errno.EAGAIN:
144 raise
144 raise
145 elif (time.time() - start_lock_time) > timeout:
145 elif (time.time() - start_lock_time) > timeout:
146 # waited to much time on a lock, better fail than loop for ever
146 # waited to much time on a lock, better fail than loop for ever
147 log.error('Failed to acquire lock on `%s` after waiting %ss',
147 log.error('Failed to acquire lock on `%s` after waiting %ss',
148 self.filename, timeout)
148 self.filename, timeout)
149 raise
149 raise
150 wait_timeout = 0.03
150 wait_timeout = 0.03
151 log.debug('Failed to acquire lock on `%s`, retry in %ss',
151 log.debug('Failed to acquire lock on `%s`, retry in %ss',
152 self.filename, wait_timeout)
152 self.filename, wait_timeout)
153 gevent.sleep(wait_timeout)
153 gevent.sleep(wait_timeout)
154
154
155 fcntl.flock = gevent_flock
155 fcntl.flock = gevent_flock
156 return fcntl
156 return fcntl
157
157
158
158
159 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
159 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
160 key_prefix = 'file_backend'
160 key_prefix = 'file_backend'
161
161
162 def __init__(self, arguments):
162 def __init__(self, arguments):
163 arguments['lock_factory'] = CustomLockFactory
163 arguments['lock_factory'] = CustomLockFactory
164 db_file = arguments.get('filename')
164 db_file = arguments.get('filename')
165
165
166 log.debug('initialing %s DB in %s', self.__class__.__name__, db_file)
166 log.debug('initialing %s DB in %s', self.__class__.__name__, db_file)
167 try:
167 try:
168 super(FileNamespaceBackend, self).__init__(arguments)
168 super(FileNamespaceBackend, self).__init__(arguments)
169 except Exception:
169 except Exception:
170 log.error('Failed to initialize db at: %s', db_file)
170 log.error('Failed to initialize db at: %s', db_file)
171 raise
171 raise
172
172
173 def __repr__(self):
173 def __repr__(self):
174 return '{} `{}`'.format(self.__class__, self.filename)
174 return '{} `{}`'.format(self.__class__, self.filename)
175
175
176 def list_keys(self, prefix=''):
176 def list_keys(self, prefix=''):
177 prefix = '{}:{}'.format(self.key_prefix, prefix)
177 prefix = '{}:{}'.format(self.key_prefix, prefix)
178
178
179 def cond(v):
179 def cond(v):
180 if not prefix:
180 if not prefix:
181 return True
181 return True
182
182
183 if v.startswith(prefix):
183 if v.startswith(prefix):
184 return True
184 return True
185 return False
185 return False
186
186
187 with self._dbm_file(True) as dbm:
187 with self._dbm_file(True) as dbm:
188 try:
188 try:
189 return filter(cond, dbm.keys())
189 return filter(cond, dbm.keys())
190 except Exception:
190 except Exception:
191 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
191 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
192 raise
192 raise
193
193
194 def get_store(self):
194 def get_store(self):
195 return self.filename
195 return self.filename
196
196
197 def _dbm_get(self, key):
197 def _dbm_get(self, key):
198 with self._dbm_file(False) as dbm:
198 with self._dbm_file(False) as dbm:
199 if hasattr(dbm, 'get'):
199 if hasattr(dbm, 'get'):
200 value = dbm.get(key, NO_VALUE)
200 value = dbm.get(key, NO_VALUE)
201 else:
201 else:
202 # gdbm objects lack a .get method
202 # gdbm objects lack a .get method
203 try:
203 try:
204 value = dbm[key]
204 value = dbm[key]
205 except KeyError:
205 except KeyError:
206 value = NO_VALUE
206 value = NO_VALUE
207 if value is not NO_VALUE:
207 if value is not NO_VALUE:
208 value = self._loads(value)
208 value = self._loads(value)
209 return value
209 return value
210
210
211 def get(self, key):
211 def get(self, key):
212 try:
212 try:
213 return self._dbm_get(key)
213 return self._dbm_get(key)
214 except Exception:
214 except Exception:
215 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
215 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
216 raise
216 raise
217
217
218 def set(self, key, value):
218 def set(self, key, value):
219 with self._dbm_file(True) as dbm:
219 with self._dbm_file(True) as dbm:
220 dbm[key] = self._dumps(value)
220 dbm[key] = self._dumps(value)
221
221
222 def set_multi(self, mapping):
222 def set_multi(self, mapping):
223 with self._dbm_file(True) as dbm:
223 with self._dbm_file(True) as dbm:
224 for key, value in mapping.items():
224 for key, value in mapping.items():
225 dbm[key] = self._dumps(value)
225 dbm[key] = self._dumps(value)
226
226
227
227
228 class BaseRedisBackend(redis_backend.RedisBackend):
228 class BaseRedisBackend(redis_backend.RedisBackend):
229 key_prefix = ''
229 key_prefix = ''
230
230
231 def __init__(self, arguments):
231 def __init__(self, arguments):
232 super(BaseRedisBackend, self).__init__(arguments)
232 super(BaseRedisBackend, self).__init__(arguments)
233 self._lock_timeout = self.lock_timeout
233 self._lock_timeout = self.lock_timeout
234 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
234 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
235
235
236 if self._lock_auto_renewal and not self._lock_timeout:
236 if self._lock_auto_renewal and not self._lock_timeout:
237 # set default timeout for auto_renewal
237 # set default timeout for auto_renewal
238 self._lock_timeout = 30
238 self._lock_timeout = 30
239
239
240 def _create_client(self):
240 def _create_client(self):
241 args = {}
241 args = {}
242
242
243 if self.url is not None:
243 if self.url is not None:
244 args.update(url=self.url)
244 args.update(url=self.url)
245
245
246 else:
246 else:
247 args.update(
247 args.update(
248 host=self.host, password=self.password,
248 host=self.host, password=self.password,
249 port=self.port, db=self.db
249 port=self.port, db=self.db
250 )
250 )
251
251
252 connection_pool = redis.ConnectionPool(**args)
252 connection_pool = redis.ConnectionPool(**args)
253
253
254 return redis.StrictRedis(connection_pool=connection_pool)
254 return redis.StrictRedis(connection_pool=connection_pool)
255
255
256 def list_keys(self, prefix=''):
256 def list_keys(self, prefix=''):
257 prefix = '{}:{}*'.format(self.key_prefix, prefix)
257 prefix = '{}:{}*'.format(self.key_prefix, prefix)
258 return self.client.keys(prefix)
258 return self.client.keys(prefix)
259
259
260 def get_store(self):
260 def get_store(self):
261 return self.client.connection_pool
261 return self.client.connection_pool
262
262
263 def get(self, key):
263 def get(self, key):
264 value = self.client.get(key)
264 value = self.client.get(key)
265 if value is None:
265 if value is None:
266 return NO_VALUE
266 return NO_VALUE
267 return self._loads(value)
267 return self._loads(value)
268
268
269 def get_multi(self, keys):
269 def get_multi(self, keys):
270 if not keys:
270 if not keys:
271 return []
271 return []
272 values = self.client.mget(keys)
272 values = self.client.mget(keys)
273 loads = self._loads
273 loads = self._loads
274 return [
274 return [
275 loads(v) if v is not None else NO_VALUE
275 loads(v) if v is not None else NO_VALUE
276 for v in values]
276 for v in values]
277
277
278 def set(self, key, value):
278 def set(self, key, value):
279 if self.redis_expiration_time:
279 if self.redis_expiration_time:
280 self.client.setex(key, self.redis_expiration_time,
280 self.client.setex(key, self.redis_expiration_time,
281 self._dumps(value))
281 self._dumps(value))
282 else:
282 else:
283 self.client.set(key, self._dumps(value))
283 self.client.set(key, self._dumps(value))
284
284
285 def set_multi(self, mapping):
285 def set_multi(self, mapping):
286 dumps = self._dumps
286 dumps = self._dumps
287 mapping = dict(
287 mapping = dict(
288 (k, dumps(v))
288 (k, dumps(v))
289 for k, v in mapping.items()
289 for k, v in mapping.items()
290 )
290 )
291
291
292 if not self.redis_expiration_time:
292 if not self.redis_expiration_time:
293 self.client.mset(mapping)
293 self.client.mset(mapping)
294 else:
294 else:
295 pipe = self.client.pipeline()
295 pipe = self.client.pipeline()
296 for key, value in mapping.items():
296 for key, value in mapping.items():
297 pipe.setex(key, self.redis_expiration_time, value)
297 pipe.setex(key, self.redis_expiration_time, value)
298 pipe.execute()
298 pipe.execute()
299
299
300 def get_mutex(self, key):
300 def get_mutex(self, key):
301 if self.distributed_lock:
301 if self.distributed_lock:
302 lock_key = redis_backend.u('_lock_{0}').format(key)
302 lock_key = redis_backend.u('_lock_{0}').format(key)
303 log.debug('Trying to acquire Redis lock for key %s', lock_key)
303 log.debug('Trying to acquire Redis lock for key %s', lock_key)
304 return get_mutex_lock(self.client, lock_key, self._lock_timeout,
304 return get_mutex_lock(self.client, lock_key, self._lock_timeout,
305 auto_renewal=self._lock_auto_renewal)
305 auto_renewal=self._lock_auto_renewal)
306 else:
306 else:
307 return None
307 return None
308
308
309
309
310 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
310 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
311 key_prefix = 'redis_pickle_backend'
311 key_prefix = 'redis_pickle_backend'
312 pass
312 pass
313
313
314
314
315 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
315 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
316 key_prefix = 'redis_msgpack_backend'
316 key_prefix = 'redis_msgpack_backend'
317 pass
317 pass
318
318
319
319
320 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
320 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
321 import redis_lock
321 import redis_lock
322
322
323 class _RedisLockWrapper(object):
323 class _RedisLockWrapper(object):
324 """LockWrapper for redis_lock"""
324 """LockWrapper for redis_lock"""
325
325
326 @classmethod
326 @classmethod
327 def get_lock(cls):
327 def get_lock(cls):
328 return redis_lock.Lock(
328 return redis_lock.Lock(
329 redis_client=client,
329 redis_client=client,
330 name=lock_key,
330 name=lock_key,
331 expire=lock_timeout,
331 expire=lock_timeout,
332 auto_renewal=auto_renewal,
332 auto_renewal=auto_renewal,
333 strict=True,
333 strict=True,
334 )
334 )
335
335
336 def __init__(self):
336 def __init__(self):
337 self.lock = self.get_lock()
337 self.lock = self.get_lock()
338
338
339 def acquire(self, wait=True):
339 def acquire(self, wait=True):
340 try:
340 try:
341 return self.lock.acquire(wait)
341 return self.lock.acquire(wait)
342 except redis_lock.AlreadyAcquired:
342 except redis_lock.AlreadyAcquired:
343 return False
343 return False
344 except redis_lock.AlreadyStarted:
345 # refresh thread exists, but it also means we acquired the lock
346 return True
344
347
345 def release(self):
348 def release(self):
346 try:
349 try:
347 self.lock.release()
350 self.lock.release()
348 except redis_lock.NotAcquired:
351 except redis_lock.NotAcquired:
349 pass
352 pass
350
353
351 return _RedisLockWrapper()
354 return _RedisLockWrapper()
General Comments 0
You need to be logged in to leave comments. Login now