##// END OF EJS Templates
cache: use global flock to prevent recursion when using gevent workers.
marcink -
r3402:c138a747 default
parent child Browse files
Show More
@@ -1,203 +1,205 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2019 RhodeCode GmbH
3 # Copyright (C) 2015-2019 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import time
20 import time
21 import errno
21 import errno
22 import logging
22 import logging
23
23
24 import gevent
24 import gevent
25
25
26 from dogpile.cache.backends import memory as memory_backend
26 from dogpile.cache.backends import memory as memory_backend
27 from dogpile.cache.backends import file as file_backend
27 from dogpile.cache.backends import file as file_backend
28 from dogpile.cache.backends import redis as redis_backend
28 from dogpile.cache.backends import redis as redis_backend
29 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
29 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
30 from dogpile.cache.util import memoized_property
30 from dogpile.cache.util import memoized_property
31
31
32 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
32 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
33
33
34
34
# Default cap on the number of keys held by LRUMemoryBackend when the
# backend arguments do not supply an explicit 'max_size'.
_default_max_size = 1024

log = logging.getLogger(__name__)
38
38
39
39
class LRUMemoryBackend(memory_backend.MemoryBackend):
    """
    In-process memory backend whose store is a bounded LRU dictionary,
    so the key count never exceeds ``max_size``.
    """
    # values are kept as live objects, never pickled
    pickle_values = False

    def __init__(self, arguments):
        size_limit = arguments.pop('max_size', _default_max_size)

        # optionally swap in the debug variant that logs key counts
        if arguments.pop('log_key_count', None):
            dict_cls = LRUDictDebug
        else:
            dict_cls = LRUDict

        arguments['cache_dict'] = dict_cls(size_limit)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        """Remove ``key`` from the cache; a missing key is not an error."""
        try:
            del self._cache[key]
        except KeyError:
            # deleting an absent key is a no-op by design
            pass

    def delete_multi(self, keys):
        """Remove every key in ``keys``, ignoring any that are absent."""
        for cache_key in keys:
            self.delete(cache_key)
63
63
64
64
class Serializer(object):
    """
    Mixin adding pickle-based (de)serialization helpers.

    The ``safe`` flag controls failure handling: when true, any error is
    swallowed and ``NO_VALUE`` is returned instead of raising.
    """

    def _dumps(self, value, safe=False):
        # Serialize ``value`` to a pickle byte string.  Serialization
        # errors propagate by default (safe=False).
        try:
            return compat.pickle.dumps(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE

    def _loads(self, value, safe=True):
        # Deserialize a pickle byte string.  Corrupt or unreadable
        # payloads yield NO_VALUE by default (safe=True), so a bad cache
        # entry behaves like a miss.
        try:
            return compat.pickle.loads(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE
83
83
84
84
import fcntl
# Capture the original C-level flock at import time, *before*
# CustomLockFactory._module replaces fcntl.flock with gevent_flock.
# Keeping the reference at module scope (instead of inside _module)
# prevents recursion when gevent workers trigger the patched flock.
flock_org = fcntl.flock
class CustomLockFactory(FileLock):
    """
    FileLock whose flock cooperates with gevent: instead of blocking the
    whole worker while waiting for the file lock, it retries a
    non-blocking flock and yields to the gevent hub between attempts.
    """

    @memoized_property
    def _module(self):
        # NOTE: memoized, so the fcntl module is patched only once per
        # lock instance; gevent_flock delegates to the module-level
        # flock_org, which still points at the original implementation.

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # set non-blocking, this will cause an exception if we cannot acquire a lock
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 15  # 15min
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # raise on other errors than Resource temporarily unavailable
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too much time on a lock, better fail than loop for ever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    # briefly yield to other greenlets before retrying
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    gevent.sleep(wait_timeout)

        fcntl.flock = gevent_flock
        return fcntl
121
123
122
124
class FileNamespaceBackend(Serializer, file_backend.DBMBackend):
    """
    DBM-file backend storing pickled values, guarded by the
    gevent-aware CustomLockFactory file lock.
    """

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        super(FileNamespaceBackend, self).__init__(arguments)

    def list_keys(self, prefix=''):
        """Return the stored keys, optionally restricted to ``prefix``."""
        def matches(key_name):
            # an empty prefix matches every key
            return not prefix or key_name.startswith(prefix)

        with self._dbm_file(True) as dbm:

            return filter(matches, dbm.keys())

    def get_store(self):
        # the store is identified by the backing dbm file path
        return self.filename

    def get(self, key):
        with self._dbm_file(False) as dbm:
            if not hasattr(dbm, 'get'):
                # gdbm objects lack a .get method
                try:
                    raw = dbm[key]
                except KeyError:
                    raw = NO_VALUE
            else:
                raw = dbm.get(key, NO_VALUE)
            if raw is NO_VALUE:
                return raw
            return self._loads(raw)

    def set(self, key, value):
        with self._dbm_file(True) as dbm:
            dbm[key] = self._dumps(value)

    def set_multi(self, mapping):
        with self._dbm_file(True) as dbm:
            for map_key, map_value in mapping.items():
                dbm[map_key] = self._dumps(map_value)
167
169
168
170
class RedisPickleBackend(Serializer, redis_backend.RedisBackend):
    """Redis backend storing values as pickled payloads."""

    def list_keys(self, prefix=''):
        """Return keys matching ``prefix`` (glob '*' appended if set)."""
        if prefix:
            prefix = prefix + '*'
        return self.client.keys(prefix)

    def get_store(self):
        # the store is identified by the redis connection pool
        return self.client.connection_pool

    def get(self, key):
        raw = self.client.get(key)
        if raw is None:
            # redis returns None for a miss; normalize to NO_VALUE
            return NO_VALUE
        return self._loads(raw)

    def set(self, key, value):
        payload = self._dumps(value)
        if self.redis_expiration_time:
            # SETEX applies the configured TTL atomically with the write
            self.client.setex(key, self.redis_expiration_time, payload)
        else:
            self.client.set(key, payload)

    def set_multi(self, mapping):
        serialized = {k: self._dumps(v) for k, v in mapping.items()}

        if self.redis_expiration_time:
            # MSET cannot set a TTL, so pipeline per-key SETEX calls
            pipe = self.client.pipeline()
            for redis_key, payload in serialized.items():
                pipe.setex(redis_key, self.redis_expiration_time, payload)
            pipe.execute()
        else:
            self.client.mset(serialized)
General Comments 0
You need to be logged in to leave comments. Login now