caches: use a safer method of purging keys from the memory dict. During some concurrency tests...
marcink - r2931:474abf70 default
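The hunk below swaps a check-then-delete (has_key() followed by del) for a single try/except around the deletion, so a key that disappears between the check and the delete, for example through concurrent eviction, can no longer raise. A minimal sketch of the two patterns, for illustration only and not part of the commit (a plain dict stands in for the LRU cache dict):

def unsafe_delete(cache, key):
    # old pattern: the membership test and the delete are separate steps,
    # so the key can vanish in between and `del` raises KeyError
    if key in cache:
        del cache[key]


def safe_delete(cache, key):
    # new pattern: attempt the delete unconditionally and swallow a missing
    # key; this cannot raise no matter what happens concurrently
    try:
        del cache[key]
    except KeyError:
        pass


cache = {'some_key': 'some_value'}
safe_delete(cache, 'some_key')  # removes the entry
safe_delete(cache, 'some_key')  # second call is a harmless no-op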
@@ -1,199 +1,201 @@
 # -*- coding: utf-8 -*-

 # Copyright (C) 2015-2018 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 import time
 import errno
 import logging

 import gevent

 from dogpile.cache.backends import memory as memory_backend
 from dogpile.cache.backends import file as file_backend
 from dogpile.cache.backends import redis as redis_backend
 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
 from dogpile.cache.util import memoized_property
 from lru import LRU as LRUDict


 _default_max_size = 1024

 log = logging.getLogger(__name__)


 class LRUMemoryBackend(memory_backend.MemoryBackend):
     pickle_values = False

     def __init__(self, arguments):
         max_size = arguments.pop('max_size', _default_max_size)
         callback = None
         if arguments.pop('log_max_size_reached', None):
             def evicted(key, value):
                 log.debug(
                     'LRU: evicting key `%s` due to max size %s reach', key, max_size)
             callback = evicted

         arguments['cache_dict'] = LRUDict(max_size, callback=callback)
         super(LRUMemoryBackend, self).__init__(arguments)

     def delete(self, key):
-        if self._cache.has_key(key):
+        try:
             del self._cache[key]
+        except KeyError:
+            # we don't care if key isn't there at deletion
+            pass

     def delete_multi(self, keys):
         for key in keys:
-            if self._cache.has_key(key):
-                del self._cache[key]
+            self.delete(key)


 class Serializer(object):
     def _dumps(self, value, safe=False):
         try:
             return compat.pickle.dumps(value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise

     def _loads(self, value, safe=True):
         try:
             return compat.pickle.loads(value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise


 class CustomLockFactory(FileLock):

     @memoized_property
     def _module(self):
         import fcntl
         flock_org = fcntl.flock

         def gevent_flock(fd, operation):
             """
             Gevent compatible flock
             """
             # set non-blocking, this will cause an exception if we cannot acquire a lock
             operation |= fcntl.LOCK_NB
             start_lock_time = time.time()
             timeout = 60 * 5 # 5min
             while True:
                 try:
                     flock_org(fd, operation)
                     # lock has been acquired
                     break
                 except (OSError, IOError) as e:
                     # raise on other errors than Resource temporarily unavailable
                     if e.errno != errno.EAGAIN:
                         raise
                     elif (time.time() - start_lock_time) > timeout:
                         # waited to much time on a lock, better fail than loop for ever
                         raise

                     log.debug('Failed to acquire lock, retry in 0.1')
                     gevent.sleep(0.1)

         fcntl.flock = gevent_flock
         return fcntl


 class FileNamespaceBackend(Serializer, file_backend.DBMBackend):

     def __init__(self, arguments):
         arguments['lock_factory'] = CustomLockFactory
         super(FileNamespaceBackend, self).__init__(arguments)

     def list_keys(self, prefix=''):
         def cond(v):
             if not prefix:
                 return True

             if v.startswith(prefix):
                 return True
             return False

         with self._dbm_file(True) as dbm:

             return filter(cond, dbm.keys())

     def get_store(self):
         return self.filename

     def get(self, key):
         with self._dbm_file(False) as dbm:
             if hasattr(dbm, 'get'):
                 value = dbm.get(key, NO_VALUE)
             else:
                 # gdbm objects lack a .get method
                 try:
                     value = dbm[key]
                 except KeyError:
                     value = NO_VALUE
             if value is not NO_VALUE:
                 value = self._loads(value)
             return value

     def set(self, key, value):
         with self._dbm_file(True) as dbm:
             dbm[key] = self._dumps(value)

     def set_multi(self, mapping):
         with self._dbm_file(True) as dbm:
             for key, value in mapping.items():
                 dbm[key] = self._dumps(value)


 class RedisPickleBackend(Serializer, redis_backend.RedisBackend):
     def list_keys(self, prefix=''):
         if prefix:
             prefix = prefix + '*'
         return self.client.keys(prefix)

     def get_store(self):
         return self.client.connection_pool

     def get(self, key):
         value = self.client.get(key)
         if value is None:
             return NO_VALUE
         return self._loads(value)

     def set(self, key, value):
         if self.redis_expiration_time:
             self.client.setex(key, self.redis_expiration_time,
                               self._dumps(value))
         else:
             self.client.set(key, self._dumps(value))

     def set_multi(self, mapping):
         mapping = dict(
             (k, self._dumps(v))
             for k, v in mapping.items()
         )

         if not self.redis_expiration_time:
             self.client.mset(mapping)
         else:
             pipe = self.client.pipeline()
             for key, value in mapping.items():
                 pipe.setex(key, self.redis_expiration_time, value)
             pipe.execute()
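For reference, a hedged sketch of how a backend like LRUMemoryBackend is typically wired up through dogpile.cache. The backend name and module path below are assumptions made for illustration; the actual names used by RhodeCode may differ.

from dogpile.cache import make_region, register_backend

# Register the backend class under an assumed name; the module path
# 'rhodecode.lib.rc_cache.backends' is a guess, not taken from this commit.
register_backend(
    'rc_cache.lru_memory',              # assumed backend name
    'rhodecode.lib.rc_cache.backends',  # assumed module path
    'LRUMemoryBackend')

region = make_region().configure(
    'rc_cache.lru_memory',
    expiration_time=60,
    arguments={
        'max_size': 1024,              # forwarded to LRUDict
        'log_max_size_reached': True,  # enables the eviction debug log
    })

region.set('some_key', 'some_value')
assert region.get('some_key') == 'some_value'
region.delete('some_key')
region.delete('some_key')  # deleting an absent key does not raise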