##// END OF EJS Templates
chore(code-sync): synced code from ce for archive_cache
super-admin -
r1244:ecae6663 default
parent child Browse files
Show More
@@ -1,411 +1,426 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2024 RhodeCode GmbH
2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import codecs
18 import codecs
19 import contextlib
19 import contextlib
20 import functools
20 import functools
21 import os
21 import os
22 import logging
22 import logging
23 import time
23 import time
24 import typing
24 import typing
25 import zlib
25 import zlib
26 import sqlite3
26 import sqlite3
27
27
28 from vcsserver.lib.rc_json import json
28 from ...ext_json import json
29 from .lock import GenerationLock
29 from .lock import GenerationLock
30 from .utils import format_size
30
31
31 log = logging.getLogger(__name__)
32 log = logging.getLogger(__name__)
32
33
33 cache_meta = None
34 cache_meta = None
34
35
35 UNKNOWN = -241
36 UNKNOWN = -241
36 NO_VAL = -917
37 NO_VAL = -917
37
38
38 MODE_BINARY = 'BINARY'
39 MODE_BINARY = 'BINARY'
39
40
40
41
41 EVICTION_POLICY = {
42 EVICTION_POLICY = {
42 'none': {
43 'none': {
43 'evict': None,
44 'evict': None,
44 },
45 },
45 'least-recently-stored': {
46 'least-recently-stored': {
46 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
47 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
47 },
48 },
48 'least-recently-used': {
49 'least-recently-used': {
49 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
50 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
50 },
51 },
51 'least-frequently-used': {
52 'least-frequently-used': {
52 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
53 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
53 },
54 },
54 }
55 }
55
56
56
57
57 class DB:
58 class DB:
58
59
59 def __init__(self):
60 def __init__(self):
60 self.connection = sqlite3.connect(':memory:')
61 self.connection = sqlite3.connect(':memory:')
61 self._init_db()
62 self._init_db()
62
63
63 def _init_db(self):
64 def _init_db(self):
64 qry = '''
65 qry = '''
65 CREATE TABLE IF NOT EXISTS archive_cache (
66 CREATE TABLE IF NOT EXISTS archive_cache (
66 rowid INTEGER PRIMARY KEY,
67 rowid INTEGER PRIMARY KEY,
67 key_file TEXT,
68 key_file TEXT,
68 key_file_path TEXT,
69 key_file_path TEXT,
69 filename TEXT,
70 filename TEXT,
70 full_path TEXT,
71 full_path TEXT,
71 store_time REAL,
72 store_time REAL,
72 access_time REAL,
73 access_time REAL,
73 access_count INTEGER DEFAULT 0,
74 access_count INTEGER DEFAULT 0,
74 size INTEGER DEFAULT 0
75 size INTEGER DEFAULT 0
75 )
76 )
76 '''
77 '''
77
78
78 self.sql(qry)
79 self.sql(qry)
79 self.connection.commit()
80 self.connection.commit()
80
81
81 @property
82 @property
82 def sql(self):
83 def sql(self):
83 return self.connection.execute
84 return self.connection.execute
84
85
85 def bulk_insert(self, rows):
86 def bulk_insert(self, rows):
86 qry = '''
87 qry = '''
87 INSERT INTO archive_cache (
88 INSERT INTO archive_cache (
88 rowid,
89 rowid,
89 key_file,
90 key_file,
90 key_file_path,
91 key_file_path,
91 filename,
92 filename,
92 full_path,
93 full_path,
93 store_time,
94 store_time,
94 access_time,
95 access_time,
95 access_count,
96 access_count,
96 size
97 size
97 )
98 )
98 VALUES (
99 VALUES (
99 ?, ?, ?, ?, ?, ?, ?, ?, ?
100 ?, ?, ?, ?, ?, ?, ?, ?, ?
100 )
101 )
101 '''
102 '''
102 cursor = self.connection.cursor()
103 cursor = self.connection.cursor()
103 cursor.executemany(qry, rows)
104 cursor.executemany(qry, rows)
104 self.connection.commit()
105 self.connection.commit()
105
106
106
107
107 class FileSystemCache:
108 class FileSystemCache:
108
109
109 def __init__(self, index, directory, **settings):
110 def __init__(self, index, directory, **settings):
110 self._index = index
111 self._index = index
111 self._directory = directory
112 self._directory = directory
112
113
113 def _write_file(self, full_path, iterator, mode, encoding=None):
114 def _write_file(self, full_path, iterator, mode, encoding=None):
114 full_dir, _ = os.path.split(full_path)
115 full_dir, _ = os.path.split(full_path)
115
116
116 for count in range(1, 11):
117 for count in range(1, 11):
117 with contextlib.suppress(OSError):
118 with contextlib.suppress(OSError):
118 os.makedirs(full_dir)
119 os.makedirs(full_dir)
119
120
120 try:
121 try:
121 # Another cache may have deleted the directory before
122 # Another cache may have deleted the directory before
122 # the file could be opened.
123 # the file could be opened.
123 writer = open(full_path, mode, encoding=encoding)
124 writer = open(full_path, mode, encoding=encoding)
124 except OSError:
125 except OSError:
125 if count == 10:
126 if count == 10:
126 # Give up after 10 tries to open the file.
127 # Give up after 10 tries to open the file.
127 raise
128 raise
128 continue
129 continue
129
130
130 with writer:
131 with writer:
131 size = 0
132 size = 0
132 for chunk in iterator:
133 for chunk in iterator:
133 size += len(chunk)
134 size += len(chunk)
134 writer.write(chunk)
135 writer.write(chunk)
135 return size
136 return size
136
137
137 def _get_keyfile(self, key):
138 def _get_keyfile(self, key):
138 return os.path.join(self._directory, f'{key}.key')
139 return os.path.join(self._directory, f'{key}.key')
139
140
140 def store(self, key, value_reader, metadata):
141 def store(self, key, value_reader, metadata):
141 filename, full_path = self.random_filename()
142 filename, full_path = self.random_filename()
142 key_file = self._get_keyfile(key)
143 key_file = self._get_keyfile(key)
143
144
144 # STORE METADATA
145 # STORE METADATA
145 _metadata = {
146 _metadata = {
146 "version": "v1",
147 "version": "v1",
147 "filename": filename,
148 "filename": filename,
148 "full_path": full_path,
149 "full_path": full_path,
149 "key_file": key_file,
150 "key_file": key_file,
150 "store_time": time.time(),
151 "store_time": time.time(),
151 "access_count": 1,
152 "access_count": 1,
152 "access_time": 0,
153 "access_time": 0,
153 "size": 0
154 "size": 0
154 }
155 }
155 if metadata:
156 if metadata:
156 _metadata.update(metadata)
157 _metadata.update(metadata)
157
158
158 reader = functools.partial(value_reader.read, 2**22)
159 reader = functools.partial(value_reader.read, 2**22)
159
160
160 iterator = iter(reader, b'')
161 iterator = iter(reader, b'')
161 size = self._write_file(full_path, iterator, 'xb')
162 size = self._write_file(full_path, iterator, 'xb')
162 metadata['size'] = size
163 metadata['size'] = size
163
164
164 # after archive is finished, we create a key to save the presence of the binary file
165 # after archive is finished, we create a key to save the presence of the binary file
165 with open(key_file, 'wb') as f:
166 with open(key_file, 'wb') as f:
166 f.write(json.dumps(_metadata))
167 f.write(json.dumps(_metadata))
167
168
168 return key, size, MODE_BINARY, filename, _metadata
169 return key, size, MODE_BINARY, filename, _metadata
169
170
170 def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
171 def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
171 if key not in self:
172 if key not in self:
172 raise KeyError(key)
173 raise KeyError(key)
173
174
174 key_file = self._get_keyfile(key)
175 key_file = self._get_keyfile(key)
175 with open(key_file, 'rb') as f:
176 with open(key_file, 'rb') as f:
176 metadata = json.loads(f.read())
177 metadata = json.loads(f.read())
177
178
178 filename = metadata['filename']
179 filename = metadata['filename']
179
180
180 try:
181 try:
181 return open(os.path.join(self._directory, filename), 'rb'), metadata
182 return open(os.path.join(self._directory, filename), 'rb'), metadata
182 finally:
183 finally:
183 # update usage stats, count and accessed
184 # update usage stats, count and accessed
184 metadata["access_count"] = metadata.get("access_count", 0) + 1
185 metadata["access_count"] = metadata.get("access_count", 0) + 1
185 metadata["access_time"] = time.time()
186 metadata["access_time"] = time.time()
186
187
187 with open(key_file, 'wb') as f:
188 with open(key_file, 'wb') as f:
188 f.write(json.dumps(metadata))
189 f.write(json.dumps(metadata))
189
190
190 def random_filename(self):
191 def random_filename(self):
191 """Return filename and full-path tuple for file storage.
192 """Return filename and full-path tuple for file storage.
192
193
193 Filename will be a randomly generated 28 character hexadecimal string
194 Filename will be a randomly generated 28 character hexadecimal string
194 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
195 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
195 reduce the size of directories. On older filesystems, lookups in
196 reduce the size of directories. On older filesystems, lookups in
196 directories with many files may be slow.
197 directories with many files may be slow.
197 """
198 """
198
199
199 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
200 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
200 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
201 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
201 name = hex_name[4:] + '.archive_cache'
202 name = hex_name[4:] + '.archive_cache'
202 filename = os.path.join(sub_dir, name)
203 filename = os.path.join(sub_dir, name)
203 full_path = os.path.join(self._directory, filename)
204 full_path = os.path.join(self._directory, filename)
204 return filename, full_path
205 return filename, full_path
205
206
206 def hash(self, key):
207 def hash(self, key):
207 """Compute portable hash for `key`.
208 """Compute portable hash for `key`.
208
209
209 :param key: key to hash
210 :param key: key to hash
210 :return: hash value
211 :return: hash value
211
212
212 """
213 """
213 mask = 0xFFFFFFFF
214 mask = 0xFFFFFFFF
214 return zlib.adler32(key.encode('utf-8')) & mask # noqa
215 return zlib.adler32(key.encode('utf-8')) & mask # noqa
215
216
216 def __contains__(self, key):
217 def __contains__(self, key):
217 """Return `True` if `key` matching item is found in cache.
218 """Return `True` if `key` matching item is found in cache.
218
219
219 :param key: key matching item
220 :param key: key matching item
220 :return: True if key matching item
221 :return: True if key matching item
221
222
222 """
223 """
223 key_file = self._get_keyfile(key)
224 key_file = self._get_keyfile(key)
224 return os.path.exists(key_file)
225 return os.path.exists(key_file)
225
226
226
227
227 class FanoutCache:
228 class FanoutCache:
228 """Cache that shards keys and values."""
229 """Cache that shards keys and values."""
229
230
230 def __init__(
231 def __init__(
231 self, directory=None, **settings
232 self, directory=None, **settings
232 ):
233 ):
233 """Initialize cache instance.
234 """Initialize cache instance.
234
235
235 :param str directory: cache directory
236 :param str directory: cache directory
236 :param settings: settings dict
237 :param settings: settings dict
237
238
238 """
239 """
239 if directory is None:
240 if directory is None:
240 raise ValueError('directory cannot be None')
241 raise ValueError('directory cannot be None')
241
242
242 directory = str(directory)
243 directory = str(directory)
243 directory = os.path.expanduser(directory)
244 directory = os.path.expanduser(directory)
244 directory = os.path.expandvars(directory)
245 directory = os.path.expandvars(directory)
245 self._directory = directory
246 self._directory = directory
246
247
247 self._count = settings.pop('cache_shards')
248 self._count = settings.pop('cache_shards')
248 self._locking_url = settings.pop('locking_url')
249 self._locking_url = settings.pop('locking_url')
249
250
250 self._eviction_policy = settings['cache_eviction_policy']
251 self._eviction_policy = settings['cache_eviction_policy']
251 self._cache_size_limit = settings['cache_size_limit']
252 self._cache_size_limit = settings['cache_size_limit']
252
253
253 self._shards = tuple(
254 self._shards = tuple(
254 FileSystemCache(
255 FileSystemCache(
255 index=num,
256 index=num,
256 directory=os.path.join(directory, 'shard_%03d' % num),
257 directory=os.path.join(directory, 'shard_%03d' % num),
257 **settings,
258 **settings,
258 )
259 )
259 for num in range(self._count)
260 for num in range(self._count)
260 )
261 )
261 self._hash = self._shards[0].hash
262 self._hash = self._shards[0].hash
262
263
263 def get_lock(self, lock_key):
264 def get_lock(self, lock_key):
264 return GenerationLock(lock_key, self._locking_url)
265 return GenerationLock(lock_key, self._locking_url)
265
266
266 def _get_shard(self, key) -> FileSystemCache:
267 def _get_shard(self, key) -> FileSystemCache:
267 index = self._hash(key) % self._count
268 index = self._hash(key) % self._count
268 shard = self._shards[index]
269 shard = self._shards[index]
269 return shard
270 return shard
270
271
271 def store(self, key, value_reader, metadata=None):
272 def store(self, key, value_reader, metadata=None):
272 shard = self._get_shard(key)
273 shard = self._get_shard(key)
273 return shard.store(key, value_reader, metadata)
274 return shard.store(key, value_reader, metadata)
274
275
275 def fetch(self, key):
276 def fetch(self, key):
276 """Return file handle corresponding to `key` from cache.
277 """Return file handle corresponding to `key` from cache.
277 """
278 """
278 shard = self._get_shard(key)
279 shard = self._get_shard(key)
279 return shard.fetch(key)
280 return shard.fetch(key)
280
281
281 def has_key(self, key):
282 def has_key(self, key):
282 """Return `True` if `key` matching item is found in cache.
283 """Return `True` if `key` matching item is found in cache.
283
284
284 :param key: key for item
285 :param key: key for item
285 :return: True if key is found
286 :return: True if key is found
286
287
287 """
288 """
288 shard = self._get_shard(key)
289 shard = self._get_shard(key)
289 return key in shard
290 return key in shard
290
291
291 def __contains__(self, item):
292 def __contains__(self, item):
292 return self.has_key(item)
293 return self.has_key(item)
293
294
294 def evict(self, policy=None, size_limit=None):
295 def evict(self, policy=None, size_limit=None):
295 """
296 """
296 Remove old items based on the conditions
297 Remove old items based on the conditions
297
298
298
299
299 explanation of this algo:
300 explanation of this algo:
300 iterate over each shard, then for each shard iterate over the .key files
301 iterate over each shard, then for each shard iterate over the .key files
301 read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
302 read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
302 access data, time creation, and access counts.
303 access data, time creation, and access counts.
303
304
304 Store that into a memory DB so we can run different sorting strategies easily.
305 Store that into a memory DB so we can run different sorting strategies easily.
305 Summing the size is a sum sql query.
306 Summing the size is a sum sql query.
306
307
307 Then we run a sorting strategy based on eviction policy.
308 Then we run a sorting strategy based on eviction policy.
308 We iterate over sorted keys, and remove each checking if we hit the overall limit.
309 We iterate over sorted keys, and remove each checking if we hit the overall limit.
309 """
310 """
310
311
311 policy = policy or self._eviction_policy
312 policy = policy or self._eviction_policy
312 size_limit = size_limit or self._cache_size_limit
313 size_limit = size_limit or self._cache_size_limit
313
314
314 select_policy = EVICTION_POLICY[policy]['evict']
315 select_policy = EVICTION_POLICY[policy]['evict']
315
316
317 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
318 policy, format_size(size_limit))
319
316 if select_policy is None:
320 if select_policy is None:
317 return 0
321 return 0
318
322
319 db = DB()
323 db = DB()
320
324
321 data = []
325 data = []
322 cnt = 1
326 cnt = 1
323 for shard in self._shards:
327 for shard in self._shards:
324 for key_file in os.listdir(shard._directory):
328 for key_file in os.listdir(shard._directory):
325 if key_file.endswith('.key'):
329 if key_file.endswith('.key'):
326 key_file_path = os.path.join(shard._directory, key_file)
330 key_file_path = os.path.join(shard._directory, key_file)
327 with open(key_file_path, 'rb') as f:
331 with open(key_file_path, 'rb') as f:
328 metadata = json.loads(f.read())
332 metadata = json.loads(f.read())
333
334 size = metadata.get('size')
335 filename = metadata.get('filename')
336 full_path = metadata.get('full_path')
337
338 if not size:
329 # in case we don't have size re-calc it...
339 # in case we don't have size re-calc it...
330 if not metadata.get('size'):
340 size = os.stat(full_path).st_size
331 fn = metadata.get('full_path')
332 size = os.stat(fn).st_size
333
341
334 data.append([
342 data.append([
335 cnt,
343 cnt,
336 key_file,
344 key_file,
337 key_file_path,
345 key_file_path,
338 metadata.get('filename'),
346 filename,
339 metadata.get('full_path'),
347 full_path,
340 metadata.get('store_time', 0),
348 metadata.get('store_time', 0),
341 metadata.get('access_time', 0),
349 metadata.get('access_time', 0),
342 metadata.get('access_count', 0),
350 metadata.get('access_count', 0),
343 metadata.get('size', size),
351 size,
344 ])
352 ])
345 cnt += 1
353 cnt += 1
346
354
347 # Insert bulk data using executemany
355 # Insert bulk data using executemany
348 db.bulk_insert(data)
356 db.bulk_insert(data)
349
357
350 ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
358 ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
351
359 log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))
352 select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
360 select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
353 sorted_keys = db.sql(select_policy_qry).fetchall()
361 sorted_keys = db.sql(select_policy_qry).fetchall()
354
362
363 removed_items = 0
364 removed_size = 0
355 for key, cached_file, size in sorted_keys:
365 for key, cached_file, size in sorted_keys:
356 # simulate removal impact BEFORE removal
366 # simulate removal impact BEFORE removal
357 total_size -= size
367 total_size -= size
368
358 if total_size <= size_limit:
369 if total_size <= size_limit:
359 # we obtained what we wanted...
370 # we obtained what we wanted...
360 break
371 break
361
372
362 os.remove(cached_file)
373 os.remove(cached_file)
363 os.remove(key)
374 os.remove(key)
364 return
375 removed_items += 1
376 removed_size += size
377
378 log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
379 return removed_items
365
380
366
381
367 def get_archival_config(config):
382 def get_archival_config(config):
368
383
369 final_config = {
384 final_config = {
370
385
371 }
386 }
372
387
373 for k, v in config.items():
388 for k, v in config.items():
374 if k.startswith('archive_cache'):
389 if k.startswith('archive_cache'):
375 final_config[k] = v
390 final_config[k] = v
376
391
377 return final_config
392 return final_config
378
393
379
394
380 def get_archival_cache_store(config):
395 def get_archival_cache_store(config):
381
396
382 global cache_meta
397 global cache_meta
383 if cache_meta is not None:
398 if cache_meta is not None:
384 return cache_meta
399 return cache_meta
385
400
386 config = get_archival_config(config)
401 config = get_archival_config(config)
387 backend = config['archive_cache.backend.type']
402 backend = config['archive_cache.backend.type']
388 if backend != 'filesystem':
403 if backend != 'filesystem':
389 raise ValueError('archive_cache.backend.type only supports "filesystem"')
404 raise ValueError('archive_cache.backend.type only supports "filesystem"')
390
405
391 archive_cache_locking_url = config['archive_cache.locking.url']
406 archive_cache_locking_url = config['archive_cache.locking.url']
392 archive_cache_dir = config['archive_cache.filesystem.store_dir']
407 archive_cache_dir = config['archive_cache.filesystem.store_dir']
393 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
408 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
394 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
409 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
395 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
410 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
396
411
397 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
412 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
398
413
399 # check if it's ok to write, and re-create the archive cache
414 # check if it's ok to write, and re-create the archive cache
400 if not os.path.isdir(archive_cache_dir):
415 if not os.path.isdir(archive_cache_dir):
401 os.makedirs(archive_cache_dir, exist_ok=True)
416 os.makedirs(archive_cache_dir, exist_ok=True)
402
417
403 d_cache = FanoutCache(
418 d_cache = FanoutCache(
404 archive_cache_dir,
419 archive_cache_dir,
405 locking_url=archive_cache_locking_url,
420 locking_url=archive_cache_locking_url,
406 cache_shards=archive_cache_shards,
421 cache_shards=archive_cache_shards,
407 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
422 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
408 cache_eviction_policy=archive_cache_eviction_policy
423 cache_eviction_policy=archive_cache_eviction_policy
409 )
424 )
410 cache_meta = d_cache
425 cache_meta = d_cache
411 return cache_meta
426 return cache_meta
@@ -1,59 +1,58 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2024 RhodeCode GmbH
2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import redis
18 import redis
19 from vcsserver.lib._vendor import redis_lock
19 from ..._vendor import redis_lock
20
21 from .utils import ArchiveCacheLock
20 from .utils import ArchiveCacheLock
22
21
23
22
24 class GenerationLock:
23 class GenerationLock:
25 """
24 """
26 Locking mechanism that detects if a lock is acquired
25 Locking mechanism that detects if a lock is acquired
27
26
28 with GenerationLock(lock_key):
27 with GenerationLock(lock_key):
29 compute_archive()
28 compute_archive()
30 """
29 """
31 lock_timeout = 7200
30 lock_timeout = 7200
32
31
33 def __init__(self, lock_key, url):
32 def __init__(self, lock_key, url):
34 self.lock_key = lock_key
33 self.lock_key = lock_key
35 self._create_client(url)
34 self._create_client(url)
36 self.lock = self.get_lock()
35 self.lock = self.get_lock()
37
36
38 def _create_client(self, url):
37 def _create_client(self, url):
39 connection_pool = redis.ConnectionPool.from_url(url)
38 connection_pool = redis.ConnectionPool.from_url(url)
40 self.writer_client = redis.StrictRedis(
39 self.writer_client = redis.StrictRedis(
41 connection_pool=connection_pool
40 connection_pool=connection_pool
42 )
41 )
43 self.reader_client = self.writer_client
42 self.reader_client = self.writer_client
44
43
45 def get_lock(self):
44 def get_lock(self):
46 return redis_lock.Lock(
45 return redis_lock.Lock(
47 redis_client=self.writer_client,
46 redis_client=self.writer_client,
48 name=self.lock_key,
47 name=self.lock_key,
49 expire=self.lock_timeout,
48 expire=self.lock_timeout,
50 strict=True
49 strict=True
51 )
50 )
52
51
53 def __enter__(self):
52 def __enter__(self):
54 acquired = self.lock.acquire(blocking=False)
53 acquired = self.lock.acquire(blocking=False)
55 if not acquired:
54 if not acquired:
56 raise ArchiveCacheLock('Failed to create a lock')
55 raise ArchiveCacheLock('Failed to create a lock')
57
56
58 def __exit__(self, exc_type, exc_val, exc_tb):
57 def __exit__(self, exc_type, exc_val, exc_tb):
59 self.lock.release()
58 self.lock.release()
General Comments 0
You need to be logged in to leave comments. Login now