##// END OF EJS Templates
cache: fix overwrite of flock timeout, and improve logging.
marcink -
r2947:21c6c394 default
parent child Browse files
Show More
@@ -1,201 +1,203 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2018 RhodeCode GmbH
3 # Copyright (C) 2015-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import time
20 import time
21 import errno
21 import errno
22 import logging
22 import logging
23
23
24 import gevent
24 import gevent
25
25
26 from dogpile.cache.backends import memory as memory_backend
26 from dogpile.cache.backends import memory as memory_backend
27 from dogpile.cache.backends import file as file_backend
27 from dogpile.cache.backends import file as file_backend
28 from dogpile.cache.backends import redis as redis_backend
28 from dogpile.cache.backends import redis as redis_backend
29 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
29 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
30 from dogpile.cache.util import memoized_property
30 from dogpile.cache.util import memoized_property
31
31
32 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
32 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
33
33
34
34
35 _default_max_size = 1024
35 _default_max_size = 1024
36
36
37 log = logging.getLogger(__name__)
37 log = logging.getLogger(__name__)
38
38
39
39
class LRUMemoryBackend(memory_backend.MemoryBackend):
    """
    In-process memory backend whose cache dict is an LRU dict, so the
    number of stored keys is bounded by ``max_size`` instead of growing
    without limit. Values are kept as live objects (not pickled).
    """
    pickle_values = False

    def __init__(self, arguments):
        max_size = arguments.pop('max_size', _default_max_size)

        # pick the debug variant (which logs its key count) when requested
        if arguments.pop('log_key_count', None):
            lru_class = LRUDictDebug
        else:
            lru_class = LRUDict

        arguments['cache_dict'] = lru_class(max_size)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        """Remove ``key`` from the cache; deleting a missing key is a no-op."""
        try:
            del self._cache[key]
        except KeyError:
            pass

    def delete_multi(self, keys):
        """Remove each key in ``keys``, ignoring any that are absent."""
        for cache_key in keys:
            self.delete(cache_key)
63
63
64
64
class Serializer(object):
    """
    Mixin providing pickle serialization helpers for cache backends.
    """

    def _dumps(self, value, safe=False):
        """
        Pickle ``value``. With ``safe=True``, serialization failures
        return ``NO_VALUE`` instead of raising.
        """
        try:
            return compat.pickle.dumps(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE

    def _loads(self, value, safe=True):
        """
        Unpickle ``value``. With ``safe=True`` (the default), corrupt or
        unreadable data yields ``NO_VALUE`` instead of raising.
        """
        try:
            return compat.pickle.loads(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE
83
83
84
84
class CustomLockFactory(FileLock):
    """
    FileLock variant whose ``flock`` cooperates with gevent: instead of
    blocking the whole process while waiting for the lock, it polls in
    non-blocking mode and yields to the gevent hub between attempts.
    """

    @memoized_property
    def _module(self):
        import fcntl
        flock_org = fcntl.flock

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # set non-blocking, this will cause an exception if we cannot acquire a lock
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 5  # 5min
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # raise on other errors than Resource temporarily unavailable
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too much time on a lock, better fail than loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    # BUG FIX: the retry sleep must use its own name; assigning
                    # ``timeout = 0.03`` here overwrote the 5min deadline, so the
                    # timeout branch above could never trigger after the first retry.
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    gevent.sleep(wait_timeout)

        # monkey-patch fcntl so FileLock's own code picks up the gevent-aware flock
        fcntl.flock = gevent_flock
        return fcntl
119
121
120
122
class FileNamespaceBackend(Serializer, file_backend.DBMBackend):
    """
    DBM file backend that pickles its values and installs the
    gevent-aware ``CustomLockFactory`` for cross-process locking.
    """

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        super(FileNamespaceBackend, self).__init__(arguments)

    def list_keys(self, prefix=''):
        """Return the stored keys, optionally restricted to ``prefix``."""
        def matches(key):
            # an empty prefix matches everything
            return not prefix or key.startswith(prefix)

        with self._dbm_file(True) as dbm:

            return filter(matches, dbm.keys())

    def get_store(self):
        """Return the path of the DBM file backing this cache."""
        return self.filename

    def get(self, key):
        with self._dbm_file(False) as dbm:
            if hasattr(dbm, 'get'):
                raw = dbm.get(key, NO_VALUE)
            else:
                # gdbm objects lack a .get method
                try:
                    raw = dbm[key]
                except KeyError:
                    raw = NO_VALUE
            # only deserialize actual hits
            return raw if raw is NO_VALUE else self._loads(raw)

    def set(self, key, value):
        with self._dbm_file(True) as dbm:
            dbm[key] = self._dumps(value)

    def set_multi(self, mapping):
        with self._dbm_file(True) as dbm:
            for cache_key, cache_value in mapping.items():
                dbm[cache_key] = self._dumps(cache_value)
165
167
166
168
class RedisPickleBackend(Serializer, redis_backend.RedisBackend):
    """
    Redis backend that stores pickled values and honours the configured
    ``redis_expiration_time`` (per-key TTL) when one is set.
    """

    def list_keys(self, prefix=''):
        """Return keys matching ``prefix`` (glob-expanded with ``*``)."""
        if prefix:
            prefix += '*'
        return self.client.keys(prefix)

    def get_store(self):
        """Return the redis connection pool backing this cache."""
        return self.client.connection_pool

    def get(self, key):
        raw = self.client.get(key)
        if raw is None:
            # redis returns None for missing keys; map to dogpile's sentinel
            return NO_VALUE
        return self._loads(raw)

    def set(self, key, value):
        if self.redis_expiration_time:
            # SETEX stores the value together with a TTL
            self.client.setex(key, self.redis_expiration_time,
                              self._dumps(value))
        else:
            self.client.set(key, self._dumps(value))

    def set_multi(self, mapping):
        serialized = dict(
            (k, self._dumps(v))
            for k, v in mapping.items()
        )

        if not self.redis_expiration_time:
            # no TTL: one round-trip via MSET
            self.client.mset(serialized)
        else:
            # TTL requires per-key SETEX; batch them in a pipeline
            pipe = self.client.pipeline()
            for cache_key, raw in serialized.items():
                pipe.setex(cache_key, self.redis_expiration_time, raw)
            pipe.execute()
General Comments 0
You need to be logged in to leave comments. Login now