caches: new cache + archive cache implementation
super-admin · r1121:2809dfc5 (python3)
@@ -0,0 +1,72 @@
+# RhodeCode VCSServer provides access to different vcs backends via network.
+# Copyright (C) 2014-2020 RhodeCode GmbH
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software Foundation,
+# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import logging
+import os
+import diskcache
+
+log = logging.getLogger(__name__)
+
+cache_meta = None
+
+
+def get_archival_config(config):
+    final_config = {
+        'archive_cache.eviction_policy': 'least-frequently-used'
+    }
+
+    for k, v in config.items():
+        if k.startswith('archive_cache'):
+            final_config[k] = v
+
+    return final_config
+
+
+def get_archival_cache_store(config):
+
+    global cache_meta
+    if cache_meta is not None:
+        return cache_meta
+
+    config = get_archival_config(config)
+
+    archive_cache_dir = config['archive_cache.store_dir']
+    archive_cache_size_gb = config['archive_cache.cache_size_gb']
+    archive_cache_shards = config['archive_cache.cache_shards']
+    archive_cache_eviction_policy = config['archive_cache.eviction_policy']
+
+    log.debug('Initializing archival cache instance under %s', archive_cache_dir)
+
+    # check if it's ok to write, and re-create the archive cache dir
+    if not os.path.isdir(archive_cache_dir):
+        os.makedirs(archive_cache_dir, exist_ok=True)
+
+    d_cache = diskcache.FanoutCache(
+        archive_cache_dir, shards=archive_cache_shards,
+        cull_limit=0,  # manual eviction required
+        size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
+        eviction_policy=archive_cache_eviction_policy,
+        timeout=30
+    )
+    cache_meta = d_cache
+    return cache_meta
+
+
+def includeme(config):
+    # init our cache at start; for vcsserver we don't init at runtime,
+    # because our cache config is sent via wire on each make-archive call.
+    # This call just lazy-enables the client.
+    return
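
Review note: a minimal usage sketch for this new module (not part of the commit). The config keys mirror the ones read by get_archival_cache_store() above, and the returned store is a plain diskcache.FanoutCache; the values below are hypothetical:

    # store_dir, cache_size_gb and cache_shards have no defaults above,
    # so the caller must supply them
    config = {
        'archive_cache.store_dir': '/tmp/rc_archive_cache',
        'archive_cache.cache_size_gb': 1,
        'archive_cache.cache_shards': 8,
    }
    d_cache = get_archival_cache_store(config)

    d_cache.set('repo-1-abcdef.zip', b'...archive bytes...')  # plain diskcache semantics
    assert d_cache.get('repo-1-abcdef.zip') == b'...archive bytes...'

    # cull_limit=0 disables automatic eviction, so trimming the store down to
    # size_limit must be triggered explicitly, e.g. via d_cache.cull()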
@@ -1,108 +1,112 @@
 # RhodeCode VCSServer provides access to different vcs backends via network.
 # Copyright (C) 2014-2020 RhodeCode GmbH
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
 import logging
 import threading
 
 from dogpile.cache import register_backend
 
 from . import region_meta
 from .utils import (
     backend_key_generator,
     clear_cache_namespace,
     get_default_cache_settings,
     get_or_create_region,
     make_region,
     str2bool,
 )
 
 module_name = 'vcsserver'
 
 register_backend(
     "dogpile.cache.rc.memory_lru", f"{module_name}.lib.rc_cache.backends",
     "LRUMemoryBackend")
 
 register_backend(
     "dogpile.cache.rc.file_namespace", f"{module_name}.lib.rc_cache.backends",
     "FileNamespaceBackend")
 
 register_backend(
     "dogpile.cache.rc.redis", f"{module_name}.lib.rc_cache.backends",
     "RedisPickleBackend")
 
 register_backend(
     "dogpile.cache.rc.redis_msgpack", f"{module_name}.lib.rc_cache.backends",
     "RedisMsgPackBackend")
 
 
 log = logging.getLogger(__name__)
 
 
+CLEAR_DELETE = 'delete'
+CLEAR_INVALIDATE = 'invalidate'
+
+
 def async_creation_runner(cache, somekey, creator, mutex):
 
     def runner():
         try:
             value = creator()
             cache.set(somekey, value)
         finally:
             mutex.release()
 
     thread = threading.Thread(target=runner)
     thread.start()
 
 
 def configure_dogpile_cache(settings):
     cache_dir = settings.get('cache_dir')
     if cache_dir:
         region_meta.dogpile_config_defaults['cache_dir'] = cache_dir
 
     rc_cache_data = get_default_cache_settings(settings, prefixes=['rc_cache.'])
 
     # inspect available namespaces
     avail_regions = set()
     for key in rc_cache_data.keys():
         namespace_name = key.split('.', 1)[0]
         if namespace_name in avail_regions:
             continue
 
         avail_regions.add(namespace_name)
         log.debug('dogpile: found following cache regions: %s', namespace_name)
 
         new_region = make_region(
             name=namespace_name,
             function_key_generator=None,
             async_creation_runner=None
         )
 
         new_region.configure_from_config(settings, f'rc_cache.{namespace_name}.')
         new_region.function_key_generator = backend_key_generator(new_region.actual_backend)
 
         async_creator = str2bool(settings.pop(f'rc_cache.{namespace_name}.async_creator', 'false'))
         if async_creator:
             log.debug('configuring region %s with async creator', new_region)
             new_region.async_creation_runner = async_creation_runner
 
         if log.isEnabledFor(logging.DEBUG):
             region_args = dict(backend=new_region.actual_backend,
                                region_invalidator=new_region.region_invalidator.__class__)
             log.debug('dogpile: registering a new region `%s` %s', namespace_name, region_args)
 
         region_meta.dogpile_cache_regions[namespace_name] = new_region
 
 
 def includeme(config):
     configure_dogpile_cache(config.registry.settings)
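
Review note: a sketch of the settings dict configure_dogpile_cache() expects (the region name `repo_object` and all values are illustrative; the `rc_cache.<region>.*` key shape and the `async_creator` flag come from the code above, and configure_from_config() is stock dogpile API):

    settings = {
        'cache_dir': '/tmp/rc_cache',  # stored in region_meta.dogpile_config_defaults
        'rc_cache.repo_object.backend': 'dogpile.cache.rc.memory_lru',
        'rc_cache.repo_object.expiration_time': '300',
        'rc_cache.repo_object.arguments.max_size': '1024',
        'rc_cache.repo_object.async_creator': 'true',  # enables async_creation_runner
    }
    configure_dogpile_cache(settings)
    region = region_meta.dogpile_cache_regions['repo_object']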
@@ -1,261 +1,267 @@
 # RhodeCode VCSServer provides access to different vcs backends via network.
 # Copyright (C) 2014-2020 RhodeCode GmbH
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
 import errno
 import fcntl
 import functools
 import logging
 import os
 import pickle
 #import time
 
 #import gevent
 import msgpack
 import redis
 
 flock_org = fcntl.flock
 from typing import Union
 
 from dogpile.cache.api import Deserializer, Serializer
 from dogpile.cache.backends import file as file_backend
 from dogpile.cache.backends import memory as memory_backend
 from dogpile.cache.backends import redis as redis_backend
 from dogpile.cache.backends.file import FileLock
 from dogpile.cache.util import memoized_property
 
 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
 from vcsserver.str_utils import safe_bytes, safe_str
 from vcsserver.type_utils import str2bool
 
 _default_max_size = 1024
 
 log = logging.getLogger(__name__)
 
 
 class LRUMemoryBackend(memory_backend.MemoryBackend):
     key_prefix = 'lru_mem_backend'
     pickle_values = False
 
     def __init__(self, arguments):
         self.max_size = arguments.pop('max_size', _default_max_size)
 
         LRUDictClass = LRUDict
         if arguments.pop('log_key_count', None):
             LRUDictClass = LRUDictDebug
 
         arguments['cache_dict'] = LRUDictClass(self.max_size)
         super().__init__(arguments)
 
     def __repr__(self):
         return f'{self.__class__}(maxsize=`{self.max_size}`)'
 
     def __str__(self):
         return self.__repr__()
 
     def delete(self, key):
         try:
             del self._cache[key]
         except KeyError:
             # we don't care if key isn't there at deletion
             pass
 
     def delete_multi(self, keys):
         for key in keys:
             self.delete(key)
 
 
 class PickleSerializer:
     serializer: None | Serializer = staticmethod(  # type: ignore
         functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
     )
     deserializer: None | Deserializer = staticmethod(  # type: ignore
         functools.partial(pickle.loads)
     )
 
 
 class MsgPackSerializer(object):
     serializer: None | Serializer = staticmethod(  # type: ignore
         msgpack.packb
     )
     deserializer: None | Deserializer = staticmethod(  # type: ignore
         functools.partial(msgpack.unpackb, use_list=False)
     )
 
 
 class CustomLockFactory(FileLock):
 
     pass
 
 
 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
     key_prefix = 'file_backend'
 
     def __init__(self, arguments):
         arguments['lock_factory'] = CustomLockFactory
         db_file = arguments.get('filename')
 
         log.debug('initializing cache-backend=%s db in %s', self.__class__.__name__, db_file)
         db_file_dir = os.path.dirname(db_file)
         if not os.path.isdir(db_file_dir):
             os.makedirs(db_file_dir)
 
         try:
             super().__init__(arguments)
         except Exception:
             log.exception('Failed to initialize db at: %s', db_file)
             raise
 
     def __repr__(self):
         return f'{self.__class__}(file=`{self.filename}`)'
 
     def __str__(self):
         return self.__repr__()
 
+    def _get_keys_pattern(self, prefix: bytes = b''):
+        return b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
+
     def list_keys(self, prefix: bytes = b''):
-        prefix = b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
+        prefix = self._get_keys_pattern(prefix)
 
         def cond(dbm_key: bytes):
             if not prefix:
                 return True
 
             if dbm_key.startswith(prefix):
                 return True
             return False
 
         with self._dbm_file(True) as dbm:
             try:
                 return list(filter(cond, dbm.keys()))
             except Exception:
                 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                 raise
 
     def get_store(self):
         return self.filename
 
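Review note: the new _get_keys_pattern() only centralizes the prefix this backend already relied on. Keys written through the key generator in utils have the shape `<key_prefix>:<namespace>:<func>_<sha1>`, so listing a namespace reduces to a startswith match (illustrative values):

    stored_key = b'file_backend:repo_1:compute_abcdef0123'  # as built by the key generator
    pattern = b'%b:%b' % (b'file_backend', b'repo_1')       # _get_keys_pattern(b'repo_1')
    assert stored_key.startswith(pattern)
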
 class BaseRedisBackend(redis_backend.RedisBackend):
     key_prefix = ''
 
     def __init__(self, arguments):
         self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
         super().__init__(arguments)
 
         self._lock_timeout = self.lock_timeout
         self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))
 
         if self._lock_auto_renewal and not self._lock_timeout:
             # set default timeout for auto_renewal
             self._lock_timeout = 30
 
     def __repr__(self):
         return f'{self.__class__}(conn=`{self.db_conn}`)'
 
     def __str__(self):
         return self.__repr__()
 
     def _create_client(self):
         args = {}
 
         if self.url is not None:
             args.update(url=self.url)
 
         else:
             args.update(
                 host=self.host, password=self.password,
                 port=self.port, db=self.db
             )
 
         connection_pool = redis.ConnectionPool(**args)
         self.writer_client = redis.StrictRedis(
             connection_pool=connection_pool
         )
         self.reader_client = self.writer_client
 
-    def list_keys(self, prefix=''):
-        prefix = f'{self.key_prefix}:{prefix}*'
+    def _get_keys_pattern(self, prefix: bytes = b''):
+        return b'%b:%b*' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
+
+    def list_keys(self, prefix: bytes = b''):
+        prefix = self._get_keys_pattern(prefix)
         return self.reader_client.keys(prefix)
 
     def get_store(self):
         return self.reader_client.connection_pool
 
     def get_mutex(self, key):
         if self.distributed_lock:
             lock_key = f'_lock_{safe_str(key)}'
             return get_mutex_lock(
                 self.writer_client, lock_key,
                 self._lock_timeout,
                 auto_renewal=self._lock_auto_renewal
             )
         else:
             return None
 
 
 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
     key_prefix = 'redis_pickle_backend'
     pass
 
 
 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
     key_prefix = 'redis_msgpack_backend'
     pass
 
 
 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
     from vcsserver.lib._vendor import redis_lock
 
     class _RedisLockWrapper(object):
         """LockWrapper for redis_lock"""
 
         @classmethod
         def get_lock(cls):
             return redis_lock.Lock(
                 redis_client=client,
                 name=lock_key,
                 expire=lock_timeout,
                 auto_renewal=auto_renewal,
                 strict=True,
             )
 
         def __repr__(self):
             return f"{self.__class__.__name__}:{lock_key}"
 
         def __str__(self):
             return f"{self.__class__.__name__}:{lock_key}"
 
         def __init__(self):
             self.lock = self.get_lock()
             self.lock_key = lock_key
 
         def acquire(self, wait=True):
             log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
             try:
                 acquired = self.lock.acquire(wait)
                 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
                 return acquired
             except redis_lock.AlreadyAcquired:
                 return False
             except redis_lock.AlreadyStarted:
                 # refresh thread exists, but it also means we acquired the lock
                 return True
 
         def release(self):
             try:
                 self.lock.release()
             except redis_lock.NotAcquired:
                 pass
 
     return _RedisLockWrapper()
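
Review note: a sketch of how the lock wrapper is exercised (assumes a reachable Redis server; acquire()/release() match the mutex interface dogpile expects, as seen in the wrapper above):

    import redis

    client = redis.StrictRedis(host='localhost', port=6379, db=0)
    mutex = get_mutex_lock(client, '_lock_some_key', lock_timeout=30, auto_renewal=True)
    if mutex.acquire(wait=True):
        try:
            ...  # compute and store the value the lock protects
        finally:
            mutex.release()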
@@ -1,232 +1,242 @@
 # RhodeCode VCSServer provides access to different vcs backends via network.
 # Copyright (C) 2014-2020 RhodeCode GmbH
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
 import functools
 import logging
 import os
 import threading
 import time
 
 import decorator
 from dogpile.cache import CacheRegion
 
 from vcsserver.lib.rc_cache import region_meta
 from vcsserver.str_utils import safe_bytes
 from vcsserver.type_utils import str2bool
 from vcsserver.utils import sha1
 
 log = logging.getLogger(__name__)
 
 
 class RhodeCodeCacheRegion(CacheRegion):
 
     def __repr__(self):
         return f'{self.__class__}(name={self.name})'
 
     def conditional_cache_on_arguments(
             self, namespace=None,
             expiration_time=None,
             should_cache_fn=None,
             to_str=str,
             function_key_generator=None,
             condition=True):
         """
         Custom conditional decorator that will not touch any dogpile internals if
         the condition isn't met. This works a bit differently from should_cache_fn,
         and it's faster in cases where we never want to compute cached values.
         """
         expiration_time_is_callable = callable(expiration_time)
+        if not namespace:
+            namespace = getattr(self, '_default_namespace', None)
 
         if function_key_generator is None:
             function_key_generator = self.function_key_generator
 
-        def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
+        def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):
 
             if not condition:
                 log.debug('Calling un-cached method:%s', user_func.__name__)
                 start = time.time()
                 result = user_func(*arg, **kw)
                 total = time.time() - start
                 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
                 return result
 
-            key = key_generator(*arg, **kw)
+            key = func_key_generator(*arg, **kw)
 
             timeout = expiration_time() if expiration_time_is_callable \
                 else expiration_time
 
             log.debug('Calling cached method:`%s`', user_func.__name__)
             return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
 
         def cache_decorator(user_func):
             if to_str is str:
                 # backwards compatible
                 key_generator = function_key_generator(namespace, user_func)
             else:
                 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
 
             def refresh(*arg, **kw):
                 """
                 Like invalidate, but regenerates the value instead
                 """
                 key = key_generator(*arg, **kw)
                 value = user_func(*arg, **kw)
                 self.set(key, value)
                 return value
 
             def invalidate(*arg, **kw):
                 key = key_generator(*arg, **kw)
                 self.delete(key)
 
             def set_(value, *arg, **kw):
                 key = key_generator(*arg, **kw)
                 self.set(key, value)
 
             def get(*arg, **kw):
                 key = key_generator(*arg, **kw)
                 return self.get(key)
 
             user_func.set = set_
             user_func.invalidate = invalidate
             user_func.get = get
             user_func.refresh = refresh
             user_func.key_generator = key_generator
             user_func.original = user_func
 
             # Use `decorate` to preserve the signature of :param:`user_func`.
             return decorator.decorate(user_func, functools.partial(
                 get_or_create_for_user_func, key_generator))
 
         return cache_decorator
 
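Review note: a usage sketch for the decorator (all names are illustrative). With the `_default_namespace` now set by get_or_create_region() below, the namespace argument can be omitted on file-backed regions:

    region = get_or_create_region('repo_object', region_namespace='repo_1')

    @region.conditional_cache_on_arguments(condition=True)
    def heavy_compute(repo_id, commit_id):
        return do_expensive_lookup(repo_id, commit_id)  # hypothetical helper

    result = heavy_compute(1, 'abc')         # computed once, then served from cache
    heavy_compute.invalidate(1, 'abc')       # drop this key
    fresh = heavy_compute.refresh(1, 'abc')  # recompute and re-store
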
 def make_region(*arg, **kw):
     return RhodeCodeCacheRegion(*arg, **kw)
 
 
 def get_default_cache_settings(settings, prefixes=None):
     prefixes = prefixes or []
     cache_settings = {}
     for key in settings.keys():
         for prefix in prefixes:
             if key.startswith(prefix):
                 name = key.split(prefix)[1].strip()
                 val = settings[key]
                 if isinstance(val, str):
                     val = val.strip()
                 cache_settings[name] = val
     return cache_settings
 
 
 def compute_key_from_params(*args):
     """
     Helper to compute key from given params to be used in cache manager
     """
     return sha1(safe_bytes("_".join(map(str, args))))
 
 
+def custom_key_generator(backend, namespace, fn):
+    func_name = fn.__name__
+
+    def generate_key(*args):
+        backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
+        namespace_pref = namespace or 'default_namespace'
+        arg_key = compute_key_from_params(*args)
+        final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"
+
+        return final_key
+
+    return generate_key
+
+
 def backend_key_generator(backend):
     """
     Special wrapper that also sends over the backend to the key generator
     """
     def wrapper(namespace, fn):
-        return key_generator(backend, namespace, fn)
+        return custom_key_generator(backend, namespace, fn)
     return wrapper
 
 
-def key_generator(backend, namespace, fn):
-    func_name = fn.__name__
-
-    def generate_key(*args):
-        backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
-        namespace_pref = namespace or 'default_namespace'
-        arg_key = compute_key_from_params(*args)
-        final_key = f"{backend_prefix}:{namespace_pref}:{func_name}_{arg_key}"
-
-        return final_key
-
-    return generate_key
-
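Review note: the rename to custom_key_generator() keeps the generated key shape unchanged; a self-contained sketch of what it emits (the stand-in backend object is hypothetical):

    class _FakeBackend:
        key_prefix = 'redis_pickle_backend'  # stand-in for a real backend instance

    def compute(repo_id, commit_id):
        return repo_id, commit_id

    gen = backend_key_generator(_FakeBackend())('repo_1', compute)
    print(gen(1, 'abc'))
    # -> redis_pickle_backend:repo_1:compute_<sha1 of b'1_abc'>
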
 def get_or_create_region(region_name, region_namespace: str = None):
     from vcsserver.lib.rc_cache.backends import FileNamespaceBackend
 
     region_obj = region_meta.dogpile_cache_regions.get(region_name)
     if not region_obj:
         reg_keys = list(region_meta.dogpile_cache_regions.keys())
-        raise OSError(f'Region `{region_name}` not in configured: {reg_keys}.')
+        raise EnvironmentError(f'Region `{region_name}` not in configured: {reg_keys}.')
 
     region_uid_name = f'{region_name}:{region_namespace}'
 
     if isinstance(region_obj.actual_backend, FileNamespaceBackend):
         if not region_namespace:
             raise ValueError(f'{FileNamespaceBackend} used requires to specify region_namespace param')
 
         region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
         if region_exist:
             log.debug('Using already configured region: %s', region_namespace)
             return region_exist
 
         expiration_time = region_obj.expiration_time
 
         cache_dir = region_meta.dogpile_config_defaults['cache_dir']
         namespace_cache_dir = cache_dir
 
         # we default the namespace_cache_dir to our default cache dir.
         # however if this backend is configured with filename= param, we prioritize that
         # so all caches within that particular region, even those namespaced end up in the same path
         if region_obj.actual_backend.filename:
             namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)
 
         if not os.path.isdir(namespace_cache_dir):
             os.makedirs(namespace_cache_dir)
         new_region = make_region(
             name=region_uid_name,
             function_key_generator=backend_key_generator(region_obj.actual_backend)
         )
 
         namespace_filename = os.path.join(
             namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
         # special type that allows 1db per namespace
         new_region.configure(
             backend='dogpile.cache.rc.file_namespace',
             expiration_time=expiration_time,
             arguments={"filename": namespace_filename}
         )
 
         # create and save in region caches
         log.debug('configuring new region: %s', region_uid_name)
         region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
 
+    region_obj._default_namespace = region_namespace
     return region_obj
 
 
-def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, invalidate: bool = False, hard: bool = False):
+def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str):
+    from . import CLEAR_DELETE, CLEAR_INVALIDATE
+
     if not isinstance(cache_region, RhodeCodeCacheRegion):
         cache_region = get_or_create_region(cache_region, cache_namespace_uid)
+    log.debug('clearing cache region: %s with method=%s', cache_region, method)
 
-    cache_keys = cache_region.backend.list_keys(prefix=cache_namespace_uid)
-    num_delete_keys = len(cache_keys)
-    if invalidate:
+    num_affected_keys = None
+
+    if method == CLEAR_INVALIDATE:
         # NOTE: The CacheRegion.invalidate() method’s default mode of
         # operation is to set a timestamp local to this CacheRegion in this Python process only.
         # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
-        cache_region.invalidate(hard=hard)
-    else:
-        if num_delete_keys:
+        cache_region.invalidate(hard=True)
+
+    if method == CLEAR_DELETE:
+        cache_keys = cache_region.backend.list_keys(prefix=cache_namespace_uid)
+        num_affected_keys = len(cache_keys)
+        if num_affected_keys:
             cache_region.delete_multi(cache_keys)
-    return num_delete_keys
+
+    return num_affected_keys
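
Review note: callers now pick the clearing semantics by constant rather than a pair of booleans; a sketch, assuming CLEAR_DELETE/CLEAR_INVALIDATE are importable from the rc_cache package __init__ shown in the second diff:

    from vcsserver.lib.rc_cache import CLEAR_DELETE, CLEAR_INVALIDATE

    # physically delete every key under the namespace; returns the affected-key count
    removed = clear_cache_namespace('repo_object', 'repo_1', method=CLEAR_DELETE)

    # hard-invalidate the region; per the NOTE above this only marks the region
    # stale within the current Python process, and returns None
    clear_cache_namespace('repo_object', 'repo_1', method=CLEAR_INVALIDATE)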