##// END OF EJS Templates
dogpile: use connection_pool for redis backend which is faster in gevent scenarios
marcink -
r3930:4df10256 default
parent child Browse files
Show More
@@ -1,267 +1,285 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import errno
23 23 import logging
24 24
25 25 import msgpack
26 26 import gevent
27 import redis
27 28
28 29 from dogpile.cache.api import CachedValue
29 30 from dogpile.cache.backends import memory as memory_backend
30 31 from dogpile.cache.backends import file as file_backend
31 32 from dogpile.cache.backends import redis as redis_backend
32 33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
33 34 from dogpile.cache.util import memoized_property
34 35
35 36 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
36 37
37 38
38 39 _default_max_size = 1024
39 40
40 41 log = logging.getLogger(__name__)
41 42
42 43
class LRUMemoryBackend(memory_backend.MemoryBackend):
    """
    In-process memory backend whose store is a bounded LRU dict, so the
    cache cannot grow without limit.
    """
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        max_size = arguments.pop('max_size', _default_max_size)

        # the debug variant logs key counts; enabled via backend arguments
        if arguments.pop('log_key_count', None):
            dict_cls = LRUDictDebug
        else:
            dict_cls = LRUDict

        arguments['cache_dict'] = dict_cls(max_size)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        # a missing key is not an error on deletion
        try:
            del self._cache[key]
        except KeyError:
            pass

    def delete_multi(self, keys):
        for cache_key in keys:
            self.delete(cache_key)
67 68
68 69
class PickleSerializer(object):
    """
    Mixin (de)serializing cached values with pickle.

    `safe` controls failure behavior: when true, serialization errors
    yield NO_VALUE instead of raising.
    """

    def _dumps(self, value, safe=False):
        try:
            return compat.pickle.dumps(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE

    def _loads(self, value, safe=True):
        try:
            return compat.pickle.loads(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE
88 89
89 90
class MsgPackSerializer(object):
    """
    Mixin (de)serializing cached values with msgpack.

    `safe` controls failure behavior: when true, serialization errors
    yield NO_VALUE instead of raising.
    """

    def _dumps(self, value, safe=False):
        try:
            return msgpack.packb(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE

    def _loads(self, value, safe=True):
        """
        pickle keeps the `CachedValue` wrapper around the stored tuple;
        msgpack does not, so the wrapper is re-applied here.
        """
        try:
            unpacked = msgpack.unpackb(value, use_list=False)
            return CachedValue(*unpacked)
        except Exception:
            if not safe:
                raise
            return NO_VALUE
114 115
115 116
import fcntl
# keep a reference to the original flock: CustomLockFactory below
# monkey-patches fcntl.flock with a gevent-cooperative wrapper that
# still needs to call the real implementation
flock_org = fcntl.flock
118 119
119 120
class CustomLockFactory(FileLock):
    """
    dogpile FileLock whose flock is gevent-cooperative: instead of a
    blocking flock that would freeze the whole gevent hub, it polls a
    non-blocking flock and yields to other greenlets between attempts.
    """

    @memoized_property
    def _module(self):
        # computed once per lock (memoized_property); returns the fcntl
        # module with its `flock` replaced by the gevent-friendly version

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # set non-blocking, this will cause an exception if we cannot acquire a lock
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 15  # 15min
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # raise on other errors than Resource temporarily unavailable
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too long for the lock; better to fail than loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    # yield to other greenlets instead of busy-waiting
                    gevent.sleep(wait_timeout)

        fcntl.flock = gevent_flock
        return fcntl
154 155
155 156
class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    """
    DBM-file backed cache backend using pickle serialization and the
    gevent-aware CustomLockFactory for file locking.
    """
    key_prefix = 'file_backend'

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        super(FileNamespaceBackend, self).__init__(arguments)

    def list_keys(self, prefix=''):
        prefix = '{}:{}'.format(self.key_prefix, prefix)

        def cond(v):
            # an empty prefix matches every key
            return (not prefix) or v.startswith(prefix)

        with self._dbm_file(True) as dbm:
            return filter(cond, dbm.keys())

    def get_store(self):
        # the DBM file path acts as the store identifier
        return self.filename

    def get(self, key):
        with self._dbm_file(False) as dbm:
            if hasattr(dbm, 'get'):
                raw = dbm.get(key, NO_VALUE)
            else:
                # gdbm objects lack a .get method
                try:
                    raw = dbm[key]
                except KeyError:
                    raw = NO_VALUE
            if raw is NO_VALUE:
                return raw
            return self._loads(raw)

    def set(self, key, value):
        with self._dbm_file(True) as dbm:
            dbm[key] = self._dumps(value)

    def set_multi(self, mapping):
        with self._dbm_file(True) as dbm:
            for cache_key, cache_value in mapping.items():
                dbm[cache_key] = self._dumps(cache_value)
203 204
204 205
class BaseRedisBackend(redis_backend.RedisBackend):
    """
    Redis backend that talks to the server through an explicit
    ConnectionPool — reusing pooled connections is faster under gevent
    than opening a fresh connection per client.
    """

    def _create_client(self):
        """
        Build the StrictRedis client on top of a connection pool.

        A configured URL must go through ``ConnectionPool.from_url()``:
        ``ConnectionPool(**kwargs)`` forwards unknown kwargs to the
        ``Connection`` class, which has no ``url`` parameter, so passing
        ``url=`` there raises on the first connection attempt.
        """
        if self.url is not None:
            connection_pool = redis.ConnectionPool.from_url(self.url)
        else:
            connection_pool = redis.ConnectionPool(
                host=self.host, password=self.password,
                port=self.port, db=self.db)

        return redis.StrictRedis(connection_pool=connection_pool)

    def list_keys(self, prefix=''):
        """Return all keys of this backend matching the optional prefix."""
        prefix = '{}:{}*'.format(self.key_prefix, prefix)
        return self.client.keys(prefix)

    def get_store(self):
        # expose the underlying pool for introspection/debugging
        return self.client.connection_pool

    def get(self, key):
        """Fetch a single key; a miss maps to NO_VALUE."""
        value = self.client.get(key)
        if value is None:
            return NO_VALUE
        return self._loads(value)

    def get_multi(self, keys):
        """Fetch many keys in one round trip; misses map to NO_VALUE."""
        if not keys:
            return []
        values = self.client.mget(keys)
        loads = self._loads
        return [
            loads(v) if v is not None else NO_VALUE
            for v in values]

    def set(self, key, value):
        """Store a single key, honoring the configured expiration time."""
        if self.redis_expiration_time:
            self.client.setex(key, self.redis_expiration_time,
                              self._dumps(value))
        else:
            self.client.set(key, self._dumps(value))

    def set_multi(self, mapping):
        """Store many keys; with expiration, SETEX calls are pipelined."""
        dumps = self._dumps
        mapping = dict(
            (k, dumps(v))
            for k, v in mapping.items()
        )

        if not self.redis_expiration_time:
            self.client.mset(mapping)
        else:
            # SETEX has no multi-key variant, so batch via a pipeline
            pipe = self.client.pipeline()
            for key, value in mapping.items():
                pipe.setex(key, self.redis_expiration_time, value)
            pipe.execute()

    def get_mutex(self, key):
        """
        Return a distributed redis lock for dogpile when enabled,
        otherwise None (dogpile then uses its default local lock).
        """
        u = redis_backend.u
        if self.distributed_lock:
            lock_key = u('_lock_{0}').format(key)
            log.debug('Trying to acquire Redis lock for key %s', lock_key)
            return self.client.lock(lock_key, self.lock_timeout, self.lock_sleep)
        else:
            return None
258 276
259 277
class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    """Redis backend serializing values with pickle."""
    # redundant trailing `pass` removed; the attribute is a valid body
    key_prefix = 'redis_pickle_backend'
263 281
264 282
class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    """Redis backend serializing values with msgpack."""
    # redundant trailing `pass` removed; the attribute is a valid body
    key_prefix = 'redis_msgpack_backend'
General Comments 0
You need to be logged in to leave comments. Login now