fix: fixed logging in archive-cache
super-admin
r1258:4a8b96ae default
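Both hunks below contain the same one-line fix, first in the filesystem backend (FileSystemFanoutCache) and then in the S3 object-store backend (ObjectStoreCache). The old call, log.debug('Initializing %s archival cache instance under %s', self), has two %s placeholders but only one argument, so whenever the message was actually emitted the standard library logging module could not format it and reported a "--- Logging error ---" traceback ("not enough arguments for format string") instead of the intended debug line. The fix simply drops the second placeholder. A minimal standalone sketch of the failure and the fix, using only the stdlib logging module (the ArchiveCache class here is a stand-in for illustration, not the RhodeCode class):

import logging

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)


class ArchiveCache:
    """Stand-in object so repr() has something to print."""


cache = ArchiveCache()

# Broken: two %s placeholders, one argument. logging catches the TypeError
# raised during message formatting and prints a "--- Logging error ---"
# traceback to stderr instead of the debug message.
log.debug('Initializing %s archival cache instance under %s', cache)

# Fixed, as in this commit: one placeholder, one argument.
log.debug('Initializing %s archival cache instance', cache)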
@@ -1,174 +1,174 b''
# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import codecs
import hashlib
import logging
import os

import fsspec

from .base import BaseCache, BaseShard
from ..utils import ShardFileReader, NOT_GIVEN
from ...type_utils import str2bool

log = logging.getLogger(__name__)


class FileSystemShard(BaseShard):

    def __init__(self, index, directory, directory_folder, fs, **settings):
        self._index = index
        self._directory = directory
        self._directory_folder = directory_folder
        self.storage_type = 'directory'

        self.fs = fs

    @property
    def directory(self):
        """Cache directory final path."""
        return os.path.join(self._directory, self._directory_folder)

    def _get_keyfile(self, archive_key) -> tuple[str, str]:
        key_file = f'{archive_key}.{self.key_suffix}'
        return key_file, os.path.join(self.directory, key_file)

    def _get_writer(self, path, mode):
        for count in range(1, 11):
            try:
                # Another cache may have deleted the directory before
                # the file could be opened.
                return self.fs.open(path, mode)
            except OSError:
                if count == 10:
                    # Give up after 10 tries to open the file.
                    raise
                continue

    def _write_file(self, full_path, iterator, mode):
        # ensure dir exists
        destination, _ = os.path.split(full_path)
        if not self.fs.exists(destination):
            self.fs.makedirs(destination)

        writer = self._get_writer(full_path, mode)

        digest = hashlib.sha256()
        with writer:
            size = 0
            for chunk in iterator:
                size += len(chunk)
                digest.update(chunk)
                writer.write(chunk)
            writer.flush()
            # Get the file descriptor
            fd = writer.fileno()

            # Sync the file descriptor to disk, helps with NFS cases...
            os.fsync(fd)
        sha256 = digest.hexdigest()
        log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
        return size, sha256

    def store(self, key, value_reader, metadata: dict | None = None):
        return self._store(key, value_reader, metadata, mode='xb')

    def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
        return self._fetch(key, retry, retry_attempts, retry_backoff)

    def remove(self, key):
        return self._remove(key)

    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.
        """

        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')

        archive_name = hex_name[4:] + '.archive_cache'
        filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"

        full_path = os.path.join(self.directory, filename)
        return archive_name, full_path

    def __repr__(self):
        return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})'


class FileSystemFanoutCache(BaseCache):
    shard_name = 'shard_%03d'
    shard_cls = FileSystemShard

    def __init__(self, locking_url, **settings):
        """
        Initialize file system cache instance.

        :param str locking_url: redis url for a lock
        :param settings: settings dict

        """
        self._locking_url = locking_url
        self._config = settings
        cache_dir = self.get_conf('archive_cache.filesystem.store_dir')
        directory = str(cache_dir)
        directory = os.path.expanduser(directory)
        directory = os.path.expandvars(directory)
        self._directory = directory
        self._storage_path = directory # common path for all from BaseCache

        self._shard_count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
        if self._shard_count < 1:
            raise ValueError('cache_shards must be 1 or more')

        self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
        self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))

        self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True))
        self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
        self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))

-        log.debug('Initializing %s archival cache instance under %s', self)
+        log.debug('Initializing %s archival cache instance', self)
        fs = fsspec.filesystem('file')
        # check if it's ok to write, and re-create the archive cache main dir
        # A directory is the virtual equivalent of a physical file cabinet.
        # In other words, it's a container for organizing digital data.
        # Unlike a folder, which can only store files, a directory can store files,
        # subdirectories, and other directories.
        if not fs.exists(self._directory):
            fs.makedirs(self._directory, exist_ok=True)

        self._shards = tuple(
            self.shard_cls(
                index=num,
                directory=directory,
                directory_folder=self.shard_name % num,
                fs=fs,
                **settings,
            )
            for num in range(self._shard_count)
        )
        self._hash = self._shards[0].hash

    def _get_size(self, shard, archive_path):
        return os.stat(archive_path).st_size
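As an aside on the filesystem backend above: FileSystemShard.random_filename spreads cache entries across two levels of sub-directories so that no single directory accumulates too many files. A small illustrative sketch of the same naming scheme, detached from the shard class (the fanout_path helper is hypothetical and only mirrors the logic shown above):

import codecs
import os


def fanout_path(base_dir: str) -> tuple[str, str]:
    # 16 random bytes -> 32 hex characters
    hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
    # the last 28 characters plus suffix become the archive file name
    archive_name = hex_name[4:] + '.archive_cache'
    # the first two 2-character chunks become nested sub-directories
    filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"
    return archive_name, os.path.join(base_dir, filename)


# prints something like ('9f3c...archive_cache', '/tmp/archive_cache/d4/1b/9f3c...archive_cache')
print(fanout_path('/tmp/archive_cache'))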
@@ -1,164 +1,164 b''
# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import codecs
import hashlib
import logging
import os

import fsspec

from .base import BaseCache, BaseShard
from ..utils import ShardFileReader, NOT_GIVEN
from ...type_utils import str2bool

log = logging.getLogger(__name__)


class S3Shard(BaseShard):

    def __init__(self, index, bucket, bucket_folder, fs, **settings):
        self._index = index
        self._bucket_folder = bucket_folder
        self.storage_type = 'bucket'
        self._bucket_main = bucket

        self.fs = fs

    @property
    def bucket(self):
        """Cache bucket final path."""
        return os.path.join(self._bucket_main, self._bucket_folder)

    def _get_keyfile(self, archive_key) -> tuple[str, str]:
        key_file = f'{archive_key}-{self.key_suffix}'
        return key_file, os.path.join(self.bucket, key_file)

    def _get_writer(self, path, mode):
        return self.fs.open(path, 'wb')

    def _write_file(self, full_path, iterator, mode):

        # ensure folder in bucket exists
        destination = self.bucket
        if not self.fs.exists(destination):
            self.fs.mkdir(destination, s3_additional_kwargs={})

        writer = self._get_writer(full_path, mode)

        digest = hashlib.sha256()
        with writer:
            size = 0
            for chunk in iterator:
                size += len(chunk)
                digest.update(chunk)
                writer.write(chunk)

        sha256 = digest.hexdigest()
        log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
        return size, sha256

    def store(self, key, value_reader, metadata: dict | None = None):
        return self._store(key, value_reader, metadata, mode='wb')

    def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
        return self._fetch(key, retry, retry_attempts, retry_backoff)

    def remove(self, key):
        return self._remove(key)

    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.
        """

        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')

        archive_name = hex_name[4:] + '.archive_cache'
        filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"

        full_path = os.path.join(self.bucket, filename)
        return archive_name, full_path

    def __repr__(self):
        return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'


class ObjectStoreCache(BaseCache):
    shard_name = 'shard-%03d'
    shard_cls = S3Shard

    def __init__(self, locking_url, **settings):
        """
        Initialize objectstore cache instance.

        :param str locking_url: redis url for a lock
        :param settings: settings dict

        """
        self._locking_url = locking_url
        self._config = settings

        objectstore_url = self.get_conf('archive_cache.objectstore.url')
        self._storage_path = objectstore_url # common path for all from BaseCache

        self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
        if self._shard_count < 1:
            raise ValueError('cache_shards must be 1 or more')

        self._bucket = settings.pop('archive_cache.objectstore.bucket')
        if not self._bucket:
            raise ValueError('archive_cache.objectstore.bucket needs to have a value')

        self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
        self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))

        self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
        self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
        self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))

        endpoint_url = settings.pop('archive_cache.objectstore.url')
        key = settings.pop('archive_cache.objectstore.key')
        secret = settings.pop('archive_cache.objectstore.secret')

-        log.debug('Initializing %s archival cache instance under %s', self)
+        log.debug('Initializing %s archival cache instance', self)

        fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)

        # init main bucket
        if not fs.exists(self._bucket):
            fs.mkdir(self._bucket)

        self._shards = tuple(
            self.shard_cls(
                index=num,
                bucket=self._bucket,
                bucket_folder=self.shard_name % num,
                fs=fs,
                **settings,
            )
            for num in range(self._shard_count)
        )
        self._hash = self._shards[0].hash

    def _get_size(self, shard, archive_path):
        return shard.fs.info(archive_path)['size']
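Both backends read their configuration through get_conf() from the settings dict passed to __init__ (get_conf itself lives in BaseCache and is not part of this diff). For orientation only, the dictionaries below list the keys consulted by the two constructors above; the values are made-up placeholders, not RhodeCode defaults, and the type conversions (int(), str2bool(), gb_to_bytes()) match what the code applies:

# Illustrative settings only: key names are copied from the get_conf()/pop()
# calls above, values are placeholders.
filesystem_settings = {
    'archive_cache.filesystem.store_dir': '/var/opt/archive_cache',  # placeholder path
    'archive_cache.filesystem.cache_shards': 8,                      # int, must be >= 1
    'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
    'archive_cache.filesystem.cache_size_gb': 10,                    # converted to bytes
    'archive_cache.filesystem.retry': 'true',                        # parsed with str2bool
    'archive_cache.filesystem.retry_attempts': 10,
    'archive_cache.filesystem.retry_backoff': 1,
}

objectstore_settings = {
    'archive_cache.objectstore.url': 'http://s3-endpoint:9000',      # placeholder endpoint
    'archive_cache.objectstore.bucket': 'archive-cache',             # must be non-empty
    'archive_cache.objectstore.bucket_shards': 8,
    'archive_cache.objectstore.key': 's3-key',                       # placeholder credential
    'archive_cache.objectstore.secret': 's3-secret',                 # placeholder credential
    'archive_cache.objectstore.eviction_policy': 'least-recently-stored',
    'archive_cache.objectstore.cache_size_gb': 10,
    'archive_cache.objectstore.retry': 'false',
    'archive_cache.objectstore.retry_attempts': 10,
    'archive_cache.objectstore.retry_backoff': 1,
}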