feat(archive-cache): added logging and relative imports
super-admin
r5424:035c4f28 default
@@ -1,412 +1,423 @@
 # Copyright (C) 2015-2024 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 
 import codecs
 import contextlib
 import functools
 import os
 import logging
 import time
 import typing
 import zlib
 import sqlite3
 
-from rhodecode.lib.ext_json import json
+from ...ext_json import json
 from .lock import GenerationLock
+from .utils import format_size
 
 log = logging.getLogger(__name__)
 
 cache_meta = None
 
 UNKNOWN = -241
 NO_VAL = -917
 
 MODE_BINARY = 'BINARY'
 
 
 EVICTION_POLICY = {
     'none': {
         'evict': None,
     },
     'least-recently-stored': {
         'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
     },
     'least-recently-used': {
         'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
     },
     'least-frequently-used': {
         'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
     },
 }
 
 
 class DB:
 
     def __init__(self):
         self.connection = sqlite3.connect(':memory:')
         self._init_db()
 
     def _init_db(self):
         qry = '''
         CREATE TABLE IF NOT EXISTS archive_cache (
             rowid INTEGER PRIMARY KEY,
             key_file TEXT,
             key_file_path TEXT,
             filename TEXT,
             full_path TEXT,
             store_time REAL,
             access_time REAL,
             access_count INTEGER DEFAULT 0,
             size INTEGER DEFAULT 0
         )
         '''
 
         self.sql(qry)
         self.connection.commit()
 
     @property
     def sql(self):
         return self.connection.execute
 
     def bulk_insert(self, rows):
         qry = '''
         INSERT INTO archive_cache (
             rowid,
             key_file,
             key_file_path,
             filename,
             full_path,
             store_time,
             access_time,
             access_count,
             size
         )
         VALUES (
             ?, ?, ?, ?, ?, ?, ?, ?, ?
         )
         '''
         cursor = self.connection.cursor()
         cursor.executemany(qry, rows)
         self.connection.commit()
 
 
 class FileSystemCache:
 
     def __init__(self, index, directory, **settings):
         self._index = index
         self._directory = directory
 
     def _write_file(self, full_path, iterator, mode, encoding=None):
         full_dir, _ = os.path.split(full_path)
 
         for count in range(1, 11):
             with contextlib.suppress(OSError):
                 os.makedirs(full_dir)
 
             try:
                 # Another cache may have deleted the directory before
                 # the file could be opened.
                 writer = open(full_path, mode, encoding=encoding)
             except OSError:
                 if count == 10:
                     # Give up after 10 tries to open the file.
                     raise
                 continue
 
             with writer:
                 size = 0
                 for chunk in iterator:
                     size += len(chunk)
                     writer.write(chunk)
                 return size
 
     def _get_keyfile(self, key):
         return os.path.join(self._directory, f'{key}.key')
 
     def store(self, key, value_reader, metadata):
         filename, full_path = self.random_filename()
         key_file = self._get_keyfile(key)
 
         # STORE METADATA
         _metadata = {
             "version": "v1",
             "filename": filename,
             "full_path": full_path,
             "key_file": key_file,
             "store_time": time.time(),
             "access_count": 1,
             "access_time": 0,
             "size": 0
         }
         if metadata:
             _metadata.update(metadata)
 
         reader = functools.partial(value_reader.read, 2**22)
 
         iterator = iter(reader, b'')
         size = self._write_file(full_path, iterator, 'xb')
         metadata['size'] = size
 
         # after archive is finished, we create a key to save the presence of the binary file
         with open(key_file, 'wb') as f:
             f.write(json.dumps(_metadata))
 
         return key, size, MODE_BINARY, filename, _metadata
 
     def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
         if key not in self:
             raise KeyError(key)
 
         key_file = self._get_keyfile(key)
         with open(key_file, 'rb') as f:
             metadata = json.loads(f.read())
 
         filename = metadata['filename']
 
         try:
             return open(os.path.join(self._directory, filename), 'rb'), metadata
         finally:
             # update usage stats, count and accessed
             metadata["access_count"] = metadata.get("access_count", 0) + 1
             metadata["access_time"] = time.time()
 
             with open(key_file, 'wb') as f:
                 f.write(json.dumps(metadata))
 
     def random_filename(self):
         """Return filename and full-path tuple for file storage.
 
         Filename will be a randomly generated 28 character hexadecimal string
         with ".archive_cache" suffixed. Two levels of sub-directories will be used to
         reduce the size of directories. On older filesystems, lookups in
         directories with many files may be slow.
         """
 
         hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
         sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
         name = hex_name[4:] + '.archive_cache'
         filename = os.path.join(sub_dir, name)
         full_path = os.path.join(self._directory, filename)
         return filename, full_path
 
     def hash(self, key):
         """Compute portable hash for `key`.
 
         :param key: key to hash
         :return: hash value
 
         """
         mask = 0xFFFFFFFF
         return zlib.adler32(key.encode('utf-8')) & mask  # noqa
 
     def __contains__(self, key):
         """Return `True` if `key` matching item is found in cache.
 
         :param key: key matching item
         :return: True if key matching item
 
         """
         key_file = self._get_keyfile(key)
         return os.path.exists(key_file)
 
 
 class FanoutCache:
     """Cache that shards keys and values."""
 
     def __init__(
             self, directory=None, **settings
     ):
         """Initialize cache instance.
 
         :param str directory: cache directory
         :param settings: settings dict
 
         """
         if directory is None:
             raise ValueError('directory cannot be None')
 
         directory = str(directory)
         directory = os.path.expanduser(directory)
         directory = os.path.expandvars(directory)
         self._directory = directory
 
         self._count = settings.pop('cache_shards')
         self._locking_url = settings.pop('locking_url')
 
         self._eviction_policy = settings['cache_eviction_policy']
         self._cache_size_limit = settings['cache_size_limit']
 
         self._shards = tuple(
             FileSystemCache(
                 index=num,
                 directory=os.path.join(directory, 'shard_%03d' % num),
                 **settings,
             )
             for num in range(self._count)
         )
         self._hash = self._shards[0].hash
 
     def get_lock(self, lock_key):
         return GenerationLock(lock_key, self._locking_url)
 
     def _get_shard(self, key) -> FileSystemCache:
         index = self._hash(key) % self._count
         shard = self._shards[index]
         return shard
 
     def store(self, key, value_reader, metadata=None):
         shard = self._get_shard(key)
         return shard.store(key, value_reader, metadata)
 
     def fetch(self, key):
         """Return file handle corresponding to `key` from cache.
         """
         shard = self._get_shard(key)
         return shard.fetch(key)
 
     def has_key(self, key):
         """Return `True` if `key` matching item is found in cache.
 
         :param key: key for item
         :return: True if key is found
 
         """
         shard = self._get_shard(key)
         return key in shard
 
     def __contains__(self, item):
         return self.has_key(item)
 
     def evict(self, policy=None, size_limit=None):
         """
         Remove old items based on the conditions
 
 
         explanation of this algo:
         iterate over each shard, then for each shard iterate over the .key files
         read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
         access data, time creation, and access counts.
 
         Store that into a memory DB so we can run different sorting strategies easily.
         Summing the size is a sum sql query.
 
         Then we run a sorting strategy based on eviction policy.
         We iterate over sorted keys, and remove each checking if we hit the overall limit.
         """
 
         policy = policy or self._eviction_policy
         size_limit = size_limit or self._cache_size_limit
 
         select_policy = EVICTION_POLICY[policy]['evict']
 
+        log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
+                  policy, format_size(size_limit))
+
         if select_policy is None:
             return 0
 
         db = DB()
 
         data = []
         cnt = 1
         for shard in self._shards:
             for key_file in os.listdir(shard._directory):
                 if key_file.endswith('.key'):
                     key_file_path = os.path.join(shard._directory, key_file)
                     with open(key_file_path, 'rb') as f:
                         metadata = json.loads(f.read())
                     # in case we don't have size re-calc it...
                     if not metadata.get('size'):
                         fn = metadata.get('full_path')
                         size = os.stat(fn).st_size
 
                     data.append([
                         cnt,
                         key_file,
                         key_file_path,
                         metadata.get('filename'),
                         metadata.get('full_path'),
                         metadata.get('store_time', 0),
                         metadata.get('access_time', 0),
                         metadata.get('access_count', 0),
                         metadata.get('size', size),
                     ])
                     cnt += 1
 
         # Insert bulk data using executemany
         db.bulk_insert(data)
 
         ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
-
+        log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))
         select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
         sorted_keys = db.sql(select_policy_qry).fetchall()
 
+        removed_items = 0
+        removed_size = 0
         for key, cached_file, size in sorted_keys:
             # simulate removal impact BEFORE removal
             total_size -= size
+
             if total_size <= size_limit:
                 # we obtained what we wanted...
                 break
 
             os.remove(cached_file)
             os.remove(key)
-            return
+            removed_items += 1
+            removed_size += size
+
+        log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
+        return removed_items
 
 
 def get_archival_config(config):
 
     final_config = {
 
     }
 
     for k, v in config.items():
         if k.startswith('archive_cache'):
             final_config[k] = v
 
     return final_config
 
 
 def get_archival_cache_store(config):
 
     global cache_meta
     if cache_meta is not None:
         return cache_meta
 
     config = get_archival_config(config)
     backend = config['archive_cache.backend.type']
     if backend != 'filesystem':
         raise ValueError('archive_cache.backend.type only supports "filesystem"')
 
     archive_cache_locking_url = config['archive_cache.locking.url']
     archive_cache_dir = config['archive_cache.filesystem.store_dir']
     archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
     archive_cache_shards = config['archive_cache.filesystem.cache_shards']
     archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
 
     log.debug('Initializing archival cache instance under %s', archive_cache_dir)
 
     # check if it's ok to write, and re-create the archive cache
     if not os.path.isdir(archive_cache_dir):
         os.makedirs(archive_cache_dir, exist_ok=True)
 
     d_cache = FanoutCache(
         archive_cache_dir,
         locking_url=archive_cache_locking_url,
         cache_shards=archive_cache_shards,
         cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
         cache_eviction_policy=archive_cache_eviction_policy
     )
     cache_meta = d_cache
     return cache_meta
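
A minimal usage sketch for the cache module above. The import path is an assumption inferred from the relative imports in this diff (it is not stated in the commit), and the Redis URL, store directory, archive key and payload are placeholder values for illustration only.

import io

# assumed import path for the module shown above; adjust to where it actually lives
from rhodecode.lib.rc_cache.archive_cache import get_archival_cache_store

config = {
    'archive_cache.backend.type': 'filesystem',
    'archive_cache.locking.url': 'redis://localhost:6379/0',      # placeholder
    'archive_cache.filesystem.store_dir': '/tmp/archive_cache',   # placeholder
    'archive_cache.filesystem.cache_size_gb': 1,
    'archive_cache.filesystem.cache_shards': 8,
    'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
}

d_cache = get_archival_cache_store(config)

archive_key = 'my-repo-0abc123.tar.gz'  # placeholder key
if archive_key not in d_cache:
    # store() streams the reader in 4 MiB chunks into a sharded directory and
    # writes a <key>.key JSON file recording path, timestamps and access counts
    d_cache.store(archive_key, io.BytesIO(b'archive bytes'), {'commit': '0abc123'})

# fetch() returns an open binary file handle plus the stored metadata, and
# bumps access_time/access_count in the key file
reader, metadata = d_cache.fetch(archive_key)
with reader:
    data = reader.read()

# evict() loads every shard's .key metadata into an in-memory SQLite table,
# orders it by the configured policy, and deletes archives until the total
# size drops under the limit; with this commit it returns the number removed
removed = d_cache.evict()
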
@@ -1,60 +1,59 @@
 # Copyright (C) 2015-2024 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 
 import redis
-from rhodecode.lib._vendor import redis_lock
-
+from ..._vendor import redis_lock
 from .utils import ArchiveCacheLock
 
 
 class GenerationLock:
     """
     Locking mechanism that detects if a lock is acquired
 
     with GenerationLock(lock_key):
         compute_archive()
     """
     lock_timeout = 7200
 
     def __init__(self, lock_key, url):
         self.lock_key = lock_key
         self._create_client(url)
         self.lock = self.get_lock()
 
     def _create_client(self, url):
         connection_pool = redis.ConnectionPool.from_url(url)
         self.writer_client = redis.StrictRedis(
             connection_pool=connection_pool
         )
         self.reader_client = self.writer_client
 
     def get_lock(self):
         return redis_lock.Lock(
             redis_client=self.writer_client,
             name=self.lock_key,
             expire=self.lock_timeout,
             strict=True
         )
 
     def __enter__(self):
         acquired = self.lock.acquire(blocking=False)
         if not acquired:
             raise ArchiveCacheLock('Failed to create a lock')
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.lock.release()
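
A hedged sketch of how GenerationLock from the second file is meant to wrap archive generation, following its own docstring. The import paths, lock key and Redis URL are assumptions/placeholders, and compute_archive() stands in for the real work.

# assumed import paths for the modules shown in this commit; adjust as needed
from rhodecode.lib.rc_cache.archive_cache.lock import GenerationLock
from rhodecode.lib.rc_cache.archive_cache.utils import ArchiveCacheLock


def compute_archive():
    """Placeholder for the real archive-generation work."""
    print('building archive...')


lock_key = 'archive-generation/my-repo-0abc123'   # placeholder
locking_url = 'redis://localhost:6379/0'          # placeholder

try:
    # __enter__ uses acquire(blocking=False), so a second worker fails fast
    # instead of queueing behind the 7200-second lock timeout
    with GenerationLock(lock_key, locking_url):
        compute_archive()
except ArchiveCacheLock:
    # another process already holds the lock; callers can retry later or
    # report that the archive is still being generated
    pass
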