##// END OF EJS Templates
dogpile: use connection_pool for redis backend which is faster in gevent scenarios
marcink -
r3930:4df10256 default
parent child Browse files
Show More
@@ -1,267 +1,285 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2019 RhodeCode GmbH
3 # Copyright (C) 2015-2019 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import time
21 import time
22 import errno
22 import errno
23 import logging
23 import logging
24
24
25 import msgpack
25 import msgpack
26 import gevent
26 import gevent
27 import redis
27
28
28 from dogpile.cache.api import CachedValue
29 from dogpile.cache.api import CachedValue
29 from dogpile.cache.backends import memory as memory_backend
30 from dogpile.cache.backends import memory as memory_backend
30 from dogpile.cache.backends import file as file_backend
31 from dogpile.cache.backends import file as file_backend
31 from dogpile.cache.backends import redis as redis_backend
32 from dogpile.cache.backends import redis as redis_backend
32 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
33 from dogpile.cache.util import memoized_property
34 from dogpile.cache.util import memoized_property
34
35
35 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
36 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
36
37
37
38
38 _default_max_size = 1024
39 _default_max_size = 1024
39
40
40 log = logging.getLogger(__name__)
41 log = logging.getLogger(__name__)
41
42
42
43
43 class LRUMemoryBackend(memory_backend.MemoryBackend):
44 class LRUMemoryBackend(memory_backend.MemoryBackend):
44 key_prefix = 'lru_mem_backend'
45 key_prefix = 'lru_mem_backend'
45 pickle_values = False
46 pickle_values = False
46
47
47 def __init__(self, arguments):
48 def __init__(self, arguments):
48 max_size = arguments.pop('max_size', _default_max_size)
49 max_size = arguments.pop('max_size', _default_max_size)
49
50
50 LRUDictClass = LRUDict
51 LRUDictClass = LRUDict
51 if arguments.pop('log_key_count', None):
52 if arguments.pop('log_key_count', None):
52 LRUDictClass = LRUDictDebug
53 LRUDictClass = LRUDictDebug
53
54
54 arguments['cache_dict'] = LRUDictClass(max_size)
55 arguments['cache_dict'] = LRUDictClass(max_size)
55 super(LRUMemoryBackend, self).__init__(arguments)
56 super(LRUMemoryBackend, self).__init__(arguments)
56
57
57 def delete(self, key):
58 def delete(self, key):
58 try:
59 try:
59 del self._cache[key]
60 del self._cache[key]
60 except KeyError:
61 except KeyError:
61 # we don't care if key isn't there at deletion
62 # we don't care if key isn't there at deletion
62 pass
63 pass
63
64
64 def delete_multi(self, keys):
65 def delete_multi(self, keys):
65 for key in keys:
66 for key in keys:
66 self.delete(key)
67 self.delete(key)
67
68
68
69
69 class PickleSerializer(object):
70 class PickleSerializer(object):
70
71
71 def _dumps(self, value, safe=False):
72 def _dumps(self, value, safe=False):
72 try:
73 try:
73 return compat.pickle.dumps(value)
74 return compat.pickle.dumps(value)
74 except Exception:
75 except Exception:
75 if safe:
76 if safe:
76 return NO_VALUE
77 return NO_VALUE
77 else:
78 else:
78 raise
79 raise
79
80
80 def _loads(self, value, safe=True):
81 def _loads(self, value, safe=True):
81 try:
82 try:
82 return compat.pickle.loads(value)
83 return compat.pickle.loads(value)
83 except Exception:
84 except Exception:
84 if safe:
85 if safe:
85 return NO_VALUE
86 return NO_VALUE
86 else:
87 else:
87 raise
88 raise
88
89
89
90
90 class MsgPackSerializer(object):
91 class MsgPackSerializer(object):
91
92
92 def _dumps(self, value, safe=False):
93 def _dumps(self, value, safe=False):
93 try:
94 try:
94 return msgpack.packb(value)
95 return msgpack.packb(value)
95 except Exception:
96 except Exception:
96 if safe:
97 if safe:
97 return NO_VALUE
98 return NO_VALUE
98 else:
99 else:
99 raise
100 raise
100
101
101 def _loads(self, value, safe=True):
102 def _loads(self, value, safe=True):
102 """
103 """
103 pickle maintained the `CachedValue` wrapper of the tuple
104 pickle maintained the `CachedValue` wrapper of the tuple
104 msgpack does not, so it must be added back in.
105 msgpack does not, so it must be added back in.
105 """
106 """
106 try:
107 try:
107 value = msgpack.unpackb(value, use_list=False)
108 value = msgpack.unpackb(value, use_list=False)
108 return CachedValue(*value)
109 return CachedValue(*value)
109 except Exception:
110 except Exception:
110 if safe:
111 if safe:
111 return NO_VALUE
112 return NO_VALUE
112 else:
113 else:
113 raise
114 raise
114
115
115
116
116 import fcntl
117 import fcntl
117 flock_org = fcntl.flock
118 flock_org = fcntl.flock
118
119
119
120
120 class CustomLockFactory(FileLock):
121 class CustomLockFactory(FileLock):
121
122
122 @memoized_property
123 @memoized_property
123 def _module(self):
124 def _module(self):
124
125
125 def gevent_flock(fd, operation):
126 def gevent_flock(fd, operation):
126 """
127 """
127 Gevent compatible flock
128 Gevent compatible flock
128 """
129 """
129 # set non-blocking, this will cause an exception if we cannot acquire a lock
130 # set non-blocking, this will cause an exception if we cannot acquire a lock
130 operation |= fcntl.LOCK_NB
131 operation |= fcntl.LOCK_NB
131 start_lock_time = time.time()
132 start_lock_time = time.time()
132 timeout = 60 * 15 # 15min
133 timeout = 60 * 15 # 15min
133 while True:
134 while True:
134 try:
135 try:
135 flock_org(fd, operation)
136 flock_org(fd, operation)
136 # lock has been acquired
137 # lock has been acquired
137 break
138 break
138 except (OSError, IOError) as e:
139 except (OSError, IOError) as e:
139 # raise on other errors than Resource temporarily unavailable
140 # raise on other errors than Resource temporarily unavailable
140 if e.errno != errno.EAGAIN:
141 if e.errno != errno.EAGAIN:
141 raise
142 raise
142 elif (time.time() - start_lock_time) > timeout:
143 elif (time.time() - start_lock_time) > timeout:
143 # waited too much time on a lock, better fail than loop forever
144 # waited too much time on a lock, better fail than loop forever
144 log.error('Failed to acquire lock on `%s` after waiting %ss',
145 log.error('Failed to acquire lock on `%s` after waiting %ss',
145 self.filename, timeout)
146 self.filename, timeout)
146 raise
147 raise
147 wait_timeout = 0.03
148 wait_timeout = 0.03
148 log.debug('Failed to acquire lock on `%s`, retry in %ss',
149 log.debug('Failed to acquire lock on `%s`, retry in %ss',
149 self.filename, wait_timeout)
150 self.filename, wait_timeout)
150 gevent.sleep(wait_timeout)
151 gevent.sleep(wait_timeout)
151
152
152 fcntl.flock = gevent_flock
153 fcntl.flock = gevent_flock
153 return fcntl
154 return fcntl
154
155
155
156
156 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
157 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
157 key_prefix = 'file_backend'
158 key_prefix = 'file_backend'
158
159
159 def __init__(self, arguments):
160 def __init__(self, arguments):
160 arguments['lock_factory'] = CustomLockFactory
161 arguments['lock_factory'] = CustomLockFactory
161 super(FileNamespaceBackend, self).__init__(arguments)
162 super(FileNamespaceBackend, self).__init__(arguments)
162
163
163 def list_keys(self, prefix=''):
164 def list_keys(self, prefix=''):
164 prefix = '{}:{}'.format(self.key_prefix, prefix)
165 prefix = '{}:{}'.format(self.key_prefix, prefix)
165
166
166 def cond(v):
167 def cond(v):
167 if not prefix:
168 if not prefix:
168 return True
169 return True
169
170
170 if v.startswith(prefix):
171 if v.startswith(prefix):
171 return True
172 return True
172 return False
173 return False
173
174
174 with self._dbm_file(True) as dbm:
175 with self._dbm_file(True) as dbm:
175
176
176 return filter(cond, dbm.keys())
177 return filter(cond, dbm.keys())
177
178
178 def get_store(self):
179 def get_store(self):
179 return self.filename
180 return self.filename
180
181
181 def get(self, key):
182 def get(self, key):
182 with self._dbm_file(False) as dbm:
183 with self._dbm_file(False) as dbm:
183 if hasattr(dbm, 'get'):
184 if hasattr(dbm, 'get'):
184 value = dbm.get(key, NO_VALUE)
185 value = dbm.get(key, NO_VALUE)
185 else:
186 else:
186 # gdbm objects lack a .get method
187 # gdbm objects lack a .get method
187 try:
188 try:
188 value = dbm[key]
189 value = dbm[key]
189 except KeyError:
190 except KeyError:
190 value = NO_VALUE
191 value = NO_VALUE
191 if value is not NO_VALUE:
192 if value is not NO_VALUE:
192 value = self._loads(value)
193 value = self._loads(value)
193 return value
194 return value
194
195
195 def set(self, key, value):
196 def set(self, key, value):
196 with self._dbm_file(True) as dbm:
197 with self._dbm_file(True) as dbm:
197 dbm[key] = self._dumps(value)
198 dbm[key] = self._dumps(value)
198
199
199 def set_multi(self, mapping):
200 def set_multi(self, mapping):
200 with self._dbm_file(True) as dbm:
201 with self._dbm_file(True) as dbm:
201 for key, value in mapping.items():
202 for key, value in mapping.items():
202 dbm[key] = self._dumps(value)
203 dbm[key] = self._dumps(value)
203
204
204
205
205 class BaseRedisBackend(redis_backend.RedisBackend):
206 class BaseRedisBackend(redis_backend.RedisBackend):
207
208 def _create_client(self):
209 args = {}
210
211 if self.url is not None:
212 args.update(url=self.url)
213
214 else:
215 args.update(
216 host=self.host, password=self.password,
217 port=self.port, db=self.db
218 )
219
220 connection_pool = redis.ConnectionPool(**args)
221
222 return redis.StrictRedis(connection_pool=connection_pool)
223
206 def list_keys(self, prefix=''):
224 def list_keys(self, prefix=''):
207 prefix = '{}:{}*'.format(self.key_prefix, prefix)
225 prefix = '{}:{}*'.format(self.key_prefix, prefix)
208 return self.client.keys(prefix)
226 return self.client.keys(prefix)
209
227
210 def get_store(self):
228 def get_store(self):
211 return self.client.connection_pool
229 return self.client.connection_pool
212
230
213 def get(self, key):
231 def get(self, key):
214 value = self.client.get(key)
232 value = self.client.get(key)
215 if value is None:
233 if value is None:
216 return NO_VALUE
234 return NO_VALUE
217 return self._loads(value)
235 return self._loads(value)
218
236
219 def get_multi(self, keys):
237 def get_multi(self, keys):
220 if not keys:
238 if not keys:
221 return []
239 return []
222 values = self.client.mget(keys)
240 values = self.client.mget(keys)
223 loads = self._loads
241 loads = self._loads
224 return [
242 return [
225 loads(v) if v is not None else NO_VALUE
243 loads(v) if v is not None else NO_VALUE
226 for v in values]
244 for v in values]
227
245
228 def set(self, key, value):
246 def set(self, key, value):
229 if self.redis_expiration_time:
247 if self.redis_expiration_time:
230 self.client.setex(key, self.redis_expiration_time,
248 self.client.setex(key, self.redis_expiration_time,
231 self._dumps(value))
249 self._dumps(value))
232 else:
250 else:
233 self.client.set(key, self._dumps(value))
251 self.client.set(key, self._dumps(value))
234
252
235 def set_multi(self, mapping):
253 def set_multi(self, mapping):
236 dumps = self._dumps
254 dumps = self._dumps
237 mapping = dict(
255 mapping = dict(
238 (k, dumps(v))
256 (k, dumps(v))
239 for k, v in mapping.items()
257 for k, v in mapping.items()
240 )
258 )
241
259
242 if not self.redis_expiration_time:
260 if not self.redis_expiration_time:
243 self.client.mset(mapping)
261 self.client.mset(mapping)
244 else:
262 else:
245 pipe = self.client.pipeline()
263 pipe = self.client.pipeline()
246 for key, value in mapping.items():
264 for key, value in mapping.items():
247 pipe.setex(key, self.redis_expiration_time, value)
265 pipe.setex(key, self.redis_expiration_time, value)
248 pipe.execute()
266 pipe.execute()
249
267
250 def get_mutex(self, key):
268 def get_mutex(self, key):
251 u = redis_backend.u
269 u = redis_backend.u
252 if self.distributed_lock:
270 if self.distributed_lock:
253 lock_key = u('_lock_{0}').format(key)
271 lock_key = u('_lock_{0}').format(key)
254 log.debug('Trying to acquire Redis lock for key %s', lock_key)
272 log.debug('Trying to acquire Redis lock for key %s', lock_key)
255 return self.client.lock(lock_key, self.lock_timeout, self.lock_sleep)
273 return self.client.lock(lock_key, self.lock_timeout, self.lock_sleep)
256 else:
274 else:
257 return None
275 return None
258
276
259
277
260 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
278 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
261 key_prefix = 'redis_pickle_backend'
279 key_prefix = 'redis_pickle_backend'
262 pass
280 pass
263
281
264
282
265 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
283 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
266 key_prefix = 'redis_msgpack_backend'
284 key_prefix = 'redis_msgpack_backend'
267 pass
285 pass
General Comments 0
You need to be logged in to leave comments. Login now