caches: reduce the retry delay on flock, and log the filename we cannot acquire a lock on.
marcink
r2941:49e7d41f default
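
The change is confined to the gevent-compatible flock wrapper: it now logs which file could not be locked before raising, and retries every 0.03s instead of every 0.1s. A minimal standalone sketch of that retry behaviour (the helper name, the os.open-based lock handle and the module-level logger are illustrative assumptions, not code from this commit; the full diff follows below):

import errno
import fcntl
import logging
import os
import time

import gevent

log = logging.getLogger(__name__)


def acquire_flock_with_retry(lock_path, timeout=60 * 5, retry_delay=0.03):
    # hypothetical standalone helper mirroring gevent_flock from the diff
    fd = os.open(lock_path, os.O_RDWR | os.O_CREAT)
    start_lock_time = time.time()
    while True:
        try:
            # non-blocking attempt; raises EAGAIN while another process holds the lock
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            return fd  # lock acquired; caller unlocks with LOCK_UN and closes the fd
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                os.close(fd)
                raise
            if (time.time() - start_lock_time) > timeout:
                # waited too long on the lock; report the offending file instead of looping forever
                log.error('Failed to acquire lock on %s file', lock_path)
                os.close(fd)
                raise
        log.debug('Failed to acquire lock, retry in %s', retry_delay)
        gevent.sleep(retry_delay)  # yield to other greenlets while waiting
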
@@ -1,201 +1,202 @@
 # -*- coding: utf-8 -*-

 # Copyright (C) 2015-2018 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 import time
 import errno
 import logging

 import gevent

 from dogpile.cache.backends import memory as memory_backend
 from dogpile.cache.backends import file as file_backend
 from dogpile.cache.backends import redis as redis_backend
 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
 from dogpile.cache.util import memoized_property
 from lru import LRU as LRUDict


 _default_max_size = 1024

 log = logging.getLogger(__name__)


 class LRUMemoryBackend(memory_backend.MemoryBackend):
     pickle_values = False

     def __init__(self, arguments):
         max_size = arguments.pop('max_size', _default_max_size)
         callback = None
         if arguments.pop('log_max_size_reached', None):
             def evicted(key, value):
                 log.debug(
                     'LRU: evicting key `%s` due to max size %s reach', key, max_size)
             callback = evicted

         arguments['cache_dict'] = LRUDict(max_size, callback=callback)
         super(LRUMemoryBackend, self).__init__(arguments)

     def delete(self, key):
         try:
             del self._cache[key]
         except KeyError:
             # we don't care if key isn't there at deletion
             pass

     def delete_multi(self, keys):
         for key in keys:
             self.delete(key)


 class Serializer(object):
     def _dumps(self, value, safe=False):
         try:
             return compat.pickle.dumps(value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise

     def _loads(self, value, safe=True):
         try:
             return compat.pickle.loads(value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise


 class CustomLockFactory(FileLock):

     @memoized_property
     def _module(self):
         import fcntl
         flock_org = fcntl.flock

         def gevent_flock(fd, operation):
             """
             Gevent compatible flock
             """
             # set non-blocking, this will cause an exception if we cannot acquire a lock
             operation |= fcntl.LOCK_NB
             start_lock_time = time.time()
             timeout = 60 * 5  # 5min
             while True:
                 try:
                     flock_org(fd, operation)
                     # lock has been acquired
                     break
                 except (OSError, IOError) as e:
                     # raise on other errors than Resource temporarily unavailable
                     if e.errno != errno.EAGAIN:
                         raise
                     elif (time.time() - start_lock_time) > timeout:
                         # waited too much time on a lock, better fail than loop forever
+                        log.error('Failed to acquire lock on %s file', self.filename)
                         raise

-                log.debug('Failed to acquire lock, retry in 0.1')
-                gevent.sleep(0.1)
+                log.debug('Failed to acquire lock, retry in 0.03')
+                gevent.sleep(0.03)

         fcntl.flock = gevent_flock
         return fcntl


 class FileNamespaceBackend(Serializer, file_backend.DBMBackend):

     def __init__(self, arguments):
         arguments['lock_factory'] = CustomLockFactory
         super(FileNamespaceBackend, self).__init__(arguments)

     def list_keys(self, prefix=''):
         def cond(v):
             if not prefix:
                 return True

             if v.startswith(prefix):
                 return True
             return False

         with self._dbm_file(True) as dbm:

             return filter(cond, dbm.keys())

     def get_store(self):
         return self.filename

     def get(self, key):
         with self._dbm_file(False) as dbm:
             if hasattr(dbm, 'get'):
                 value = dbm.get(key, NO_VALUE)
             else:
                 # gdbm objects lack a .get method
                 try:
                     value = dbm[key]
                 except KeyError:
                     value = NO_VALUE
             if value is not NO_VALUE:
                 value = self._loads(value)
             return value

     def set(self, key, value):
         with self._dbm_file(True) as dbm:
             dbm[key] = self._dumps(value)

     def set_multi(self, mapping):
         with self._dbm_file(True) as dbm:
             for key, value in mapping.items():
                 dbm[key] = self._dumps(value)


 class RedisPickleBackend(Serializer, redis_backend.RedisBackend):
     def list_keys(self, prefix=''):
         if prefix:
             prefix = prefix + '*'
         return self.client.keys(prefix)

     def get_store(self):
         return self.client.connection_pool

     def get(self, key):
         value = self.client.get(key)
         if value is None:
             return NO_VALUE
         return self._loads(value)

     def set(self, key, value):
         if self.redis_expiration_time:
             self.client.setex(key, self.redis_expiration_time,
                               self._dumps(value))
         else:
             self.client.set(key, self._dumps(value))

     def set_multi(self, mapping):
         mapping = dict(
             (k, self._dumps(v))
             for k, v in mapping.items()
         )

         if not self.redis_expiration_time:
             self.client.mset(mapping)
         else:
             pipe = self.client.pipeline()
             for key, value in mapping.items():
                 pipe.setex(key, self.redis_expiration_time, value)
             pipe.execute()
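
For reference, backends such as FileNamespaceBackend (the one that installs CustomLockFactory, and therefore the gevent flock above) are consumed through a dogpile.cache region. A minimal wiring sketch, assuming a made-up backend key, module path and cache-file location rather than RhodeCode's real registration:

from dogpile.cache import make_region, register_backend

# backend key, module path and filename below are assumptions for illustration
register_backend(
    'rc_cache.file_namespace',            # hypothetical key
    'rhodecode.lib.rc_cache.backends',    # module containing FileNamespaceBackend (path assumed)
    'FileNamespaceBackend')

region = make_region().configure(
    'rc_cache.file_namespace',
    expiration_time=60,
    arguments={'filename': '/tmp/example_cache.dbm'})  # DBM file guarded by CustomLockFactory

region.set('some_key', {'cached': True})
assert region.get('some_key') == {'cached': True}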