caches: use a gevent compatible file-lock mechanism....
marcink
r2878:bb23dd31 default
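Context for the change (not part of the changeset itself): under a gevent worker, a plain blocking fcntl.flock() call blocks the whole OS thread, so every other greenlet stalls until the file lock is granted. The changeset below avoids that by requesting the lock in non-blocking mode and yielding to the gevent hub between attempts. A minimal standalone sketch of that pattern, with the lock file path, timeout and retry interval chosen purely for illustration:

import errno
import fcntl
import time

import gevent


def flock_with_yield(fd, operation, timeout=300):
    """Acquire an fcntl lock without blocking the gevent event loop."""
    operation |= fcntl.LOCK_NB              # never block inside the C call
    deadline = time.time() + timeout
    while True:
        try:
            fcntl.flock(fd, operation)      # raises EAGAIN while another process holds the lock
            return
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise                       # a real error, not just "lock is busy"
            if time.time() > deadline:
                raise                       # give up rather than spin forever
        gevent.sleep(0.1)                   # yield to other greenlets, then retry


with open('/tmp/flock_demo.lock', 'w') as f:
    flock_with_yield(f.fileno(), fcntl.LOCK_EX)   # exclusive lock, gevent-friendly
    # ... critical section ...
    fcntl.flock(f.fileno(), fcntl.LOCK_UN)        # release the lock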
@@ -1,120 +1,164 @@
 # -*- coding: utf-8 -*-

 # Copyright (C) 2015-2018 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
+import time
+import errno
+import logging
+
+import gevent

 from dogpile.cache.backends import memory as memory_backend
 from dogpile.cache.backends import file as file_backend
 from dogpile.cache.backends import redis as redis_backend
-from dogpile.cache.backends.file import NO_VALUE, compat
+from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
+from dogpile.cache.util import memoized_property

 from rhodecode.lib.memory_lru_debug import LRUDict

 _default_max_size = 1024

+log = logging.getLogger(__name__)
+

 class LRUMemoryBackend(memory_backend.MemoryBackend):

     def __init__(self, arguments):
         max_size = arguments.pop('max_size', _default_max_size)
         arguments['cache_dict'] = LRUDict(max_size)
         super(LRUMemoryBackend, self).__init__(arguments)


 class Serializer(object):
     def _dumps(self, value):
         return compat.pickle.dumps(value)

     def _loads(self, value):
         return compat.pickle.loads(value)


+class CustomLockFactory(FileLock):
+
+    @memoized_property
+    def _module(self):
+        import fcntl
+        flock_org = fcntl.flock
+
+        def gevent_flock(fd, operation):
+            """
+            Gevent compatible flock
+            """
+            # set non-blocking, this will cause an exception if we cannot acquire a lock
+            operation |= fcntl.LOCK_NB
+            start_lock_time = time.time()
+            timeout = 60 * 5  # 5min
+            while True:
+                try:
+                    flock_org(fd, operation)
+                    # lock has been acquired
+                    break
+                except (OSError, IOError) as e:
+                    # re-raise errors other than "Resource temporarily unavailable"
+                    if e.errno != errno.EAGAIN:
+                        raise
+                    elif (time.time() - start_lock_time) > timeout:
+                        # waited too long for the lock, better to fail than loop forever
+                        raise
+
+                    log.debug('Failed to acquire lock, retry in 0.1s')
+                    gevent.sleep(0.1)
+
+        fcntl.flock = gevent_flock
+        return fcntl
+
+
 class FileNamespaceBackend(Serializer, file_backend.DBMBackend):

     def __init__(self, arguments):
+        arguments['lock_factory'] = CustomLockFactory
         super(FileNamespaceBackend, self).__init__(arguments)

     def list_keys(self, prefix=''):
         def cond(v):
             if not prefix:
                 return True

             if v.startswith(prefix):
                 return True
             return False

         with self._dbm_file(True) as dbm:

             return filter(cond, dbm.keys())

     def get_store(self):
         return self.filename

     def get(self, key):
         with self._dbm_file(False) as dbm:
             if hasattr(dbm, 'get'):
                 value = dbm.get(key, NO_VALUE)
             else:
                 # gdbm objects lack a .get method
                 try:
                     value = dbm[key]
                 except KeyError:
                     value = NO_VALUE
             if value is not NO_VALUE:
                 value = self._loads(value)
             return value

     def set(self, key, value):
         with self._dbm_file(True) as dbm:
             dbm[key] = self._dumps(value)

     def set_multi(self, mapping):
         with self._dbm_file(True) as dbm:
             for key, value in mapping.items():
                 dbm[key] = self._dumps(value)


 class RedisPickleBackend(Serializer, redis_backend.RedisBackend):
     def list_keys(self, prefix=''):
         if prefix:
             prefix = prefix + '*'
         return self.client.keys(prefix)

     def get_store(self):
         return self.client.connection_pool

     def set(self, key, value):
         if self.redis_expiration_time:
             self.client.setex(key, self.redis_expiration_time,
                               self._dumps(value))
         else:
             self.client.set(key, self._dumps(value))

     def set_multi(self, mapping):
         mapping = dict(
             (k, self._dumps(v))
             for k, v in mapping.items()
         )

         if not self.redis_expiration_time:
             self.client.mset(mapping)
         else:
             pipe = self.client.pipeline()
             for key, value in mapping.items():
                 pipe.setex(key, self.redis_expiration_time, value)
             pipe.execute()
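For reference, a minimal sketch of how the new FileNamespaceBackend could be wired into a dogpile.cache region. The registration name, module path, cache file location and expiration time below are illustrative assumptions, not values taken from this changeset:

from dogpile.cache import make_region, register_backend

# Hypothetical registration; 'rc.file_namespace' and the module path are
# assumptions made for this example, not identifiers defined by the changeset.
register_backend(
    'rc.file_namespace',
    'rhodecode.lib.rc_cache.backends',
    'FileNamespaceBackend')

region = make_region().configure(
    'rc.file_namespace',
    expiration_time=60,                                 # seconds, illustrative
    arguments={'filename': '/tmp/example_cache.dbm'})   # DBM file backing the cache


@region.cache_on_arguments()
def expensive(n):
    return n * n


expensive(4)   # computed and written to the DBM file
expensive(4)   # served from the cache

Because FileNamespaceBackend.__init__ now forces arguments['lock_factory'] = CustomLockFactory, every read and write lock taken around the DBM file goes through the non-blocking, gevent.sleep()-based flock instead of a blocking one.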