fix: fixed logging in archive-cache
super-admin -
r1258:4a8b96ae default
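The fix itself is a single line in each backend: the old `log.debug('Initializing %s archival cache instance under %s', self)` call in both `FileSystemFanoutCache.__init__` and `ObjectStoreCache.__init__` has two `%s` placeholders but only one argument, so the debug message can never be rendered. A minimal standalone sketch of the before/after behaviour (illustrative only, not part of the diff):

```python
import logging

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('archive_cache_demo')

cache_instance = object()  # stand-in for the cache instance passed as `self`

# Before the fix: two '%s' placeholders, one argument. logging defers
# %-formatting until the record is emitted, so nothing fails at the call
# site; instead the handler reports "--- Logging error ---" with
# "not enough arguments for format string" and the message is lost.
log.debug('Initializing %s archival cache instance under %s', cache_instance)

# After the fix: one placeholder, one argument, so the message renders normally.
log.debug('Initializing %s archival cache instance', cache_instance)
```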
@@ -1,174 +1,174 b''
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 20 import hashlib
21 21 import logging
22 22 import os
23 23
24 24 import fsspec
25 25
26 26 from .base import BaseCache, BaseShard
27 27 from ..utils import ShardFileReader, NOT_GIVEN
28 28 from ...type_utils import str2bool
29 29
30 30 log = logging.getLogger(__name__)
31 31
32 32
33 33 class FileSystemShard(BaseShard):
34 34
35 35 def __init__(self, index, directory, directory_folder, fs, **settings):
36 36 self._index = index
37 37 self._directory = directory
38 38 self._directory_folder = directory_folder
39 39 self.storage_type = 'directory'
40 40
41 41 self.fs = fs
42 42
43 43 @property
44 44 def directory(self):
45 45 """Cache directory final path."""
46 46 return os.path.join(self._directory, self._directory_folder)
47 47
48 48 def _get_keyfile(self, archive_key) -> tuple[str, str]:
49 49 key_file = f'{archive_key}.{self.key_suffix}'
50 50 return key_file, os.path.join(self.directory, key_file)
51 51
52 52 def _get_writer(self, path, mode):
53 53 for count in range(1, 11):
54 54 try:
55 55 # Another cache may have deleted the directory before
56 56 # the file could be opened.
57 57 return self.fs.open(path, mode)
58 58 except OSError:
59 59 if count == 10:
60 60 # Give up after 10 tries to open the file.
61 61 raise
62 62 continue
63 63
64 64 def _write_file(self, full_path, iterator, mode):
65 65 # ensure dir exists
66 66 destination, _ = os.path.split(full_path)
67 67 if not self.fs.exists(destination):
68 68 self.fs.makedirs(destination)
69 69
70 70 writer = self._get_writer(full_path, mode)
71 71
72 72 digest = hashlib.sha256()
73 73 with writer:
74 74 size = 0
75 75 for chunk in iterator:
76 76 size += len(chunk)
77 77 digest.update(chunk)
78 78 writer.write(chunk)
79 79 writer.flush()
80 80 # Get the file descriptor
81 81 fd = writer.fileno()
82 82
83 83 # Sync the file descriptor to disk, helps with NFS cases...
84 84 os.fsync(fd)
85 85 sha256 = digest.hexdigest()
86 86 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
87 87 return size, sha256
88 88
89 89 def store(self, key, value_reader, metadata: dict | None = None):
90 90 return self._store(key, value_reader, metadata, mode='xb')
91 91
92 92 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
93 93 return self._fetch(key, retry, retry_attempts, retry_backoff)
94 94
95 95 def remove(self, key):
96 96 return self._remove(key)
97 97
98 98 def random_filename(self):
99 99 """Return filename and full-path tuple for file storage.
100 100
101 101 Filename will be a randomly generated 28 character hexadecimal string
102 102 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
103 103 reduce the size of directories. On older filesystems, lookups in
104 104 directories with many files may be slow.
105 105 """
106 106
107 107 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
108 108
109 109 archive_name = hex_name[4:] + '.archive_cache'
110 110 filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"
111 111
112 112 full_path = os.path.join(self.directory, filename)
113 113 return archive_name, full_path
114 114
115 115 def __repr__(self):
116 116 return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})'
117 117
118 118
119 119 class FileSystemFanoutCache(BaseCache):
120 120 shard_name = 'shard_%03d'
121 121 shard_cls = FileSystemShard
122 122
123 123 def __init__(self, locking_url, **settings):
124 124 """
125 125 Initialize file system cache instance.
126 126
127 127 :param str locking_url: redis url for a lock
128 128 :param settings: settings dict
129 129
130 130 """
131 131 self._locking_url = locking_url
132 132 self._config = settings
133 133 cache_dir = self.get_conf('archive_cache.filesystem.store_dir')
134 134 directory = str(cache_dir)
135 135 directory = os.path.expanduser(directory)
136 136 directory = os.path.expandvars(directory)
137 137 self._directory = directory
138 138 self._storage_path = directory # common path for all from BaseCache
139 139
140 140 self._shard_count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
141 141 if self._shard_count < 1:
142 142 raise ValueError('cache_shards must be 1 or more')
143 143
144 144 self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
145 145 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))
146 146
147 147 self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True))
148 148 self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
149 149 self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))
150 150
151 log.debug('Initializing %s archival cache instance under %s', self)
151 log.debug('Initializing %s archival cache instance', self)
152 152 fs = fsspec.filesystem('file')
153 153 # check if it's ok to write, and re-create the archive cache main dir
 154 154 # The cache directory is the top-level container for the archive cache:
 155 155 # it holds the per-shard sub-directories, which in turn hold the cached
 156 156 # archive files and their key files.
 157 157 # It is created here on first use if it does not already exist.
158 158 if not fs.exists(self._directory):
159 159 fs.makedirs(self._directory, exist_ok=True)
160 160
161 161 self._shards = tuple(
162 162 self.shard_cls(
163 163 index=num,
164 164 directory=directory,
165 165 directory_folder=self.shard_name % num,
166 166 fs=fs,
167 167 **settings,
168 168 )
169 169 for num in range(self._shard_count)
170 170 )
171 171 self._hash = self._shards[0].hash
172 172
173 173 def _get_size(self, shard, archive_path):
174 174 return os.stat(archive_path).st_size
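For reference, `FileSystemFanoutCache.__init__` above pulls its configuration from `archive_cache.filesystem.*` keys. A hedged sketch of what a matching settings dict and construction could look like; the keys mirror the ones read above, while the store path, Redis URL and all values are made-up examples rather than values taken from this repository:

```python
# Hypothetical settings; keys mirror those read in __init__ above,
# values are illustrative only (config values are assumed to arrive as strings).
settings = {
    'archive_cache.filesystem.store_dir': '/var/opt/rhodecode_data/archive_cache',
    'archive_cache.filesystem.cache_shards': '8',
    'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
    'archive_cache.filesystem.cache_size_gb': '10',
    'archive_cache.filesystem.retry': 'false',
    'archive_cache.filesystem.retry_attempts': '10',
    'archive_cache.filesystem.retry_backoff': '1',
}

# locking_url is documented above as a redis url used for a lock.
cache = FileSystemFanoutCache(locking_url='redis://localhost:6379/0', **settings)
```

With settings like these the cache would create `shard_000` through `shard_007` sub-directories under the store dir (per `shard_name = 'shard_%03d'`), and each stored archive lands two directory levels deep as described in `random_filename`.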
@@ -1,164 +1,164 b''
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 20 import hashlib
21 21 import logging
22 22 import os
23 23
24 24 import fsspec
25 25
26 26 from .base import BaseCache, BaseShard
27 27 from ..utils import ShardFileReader, NOT_GIVEN
28 28 from ...type_utils import str2bool
29 29
30 30 log = logging.getLogger(__name__)
31 31
32 32
33 33 class S3Shard(BaseShard):
34 34
35 35 def __init__(self, index, bucket, bucket_folder, fs, **settings):
36 36 self._index = index
37 37 self._bucket_folder = bucket_folder
38 38 self.storage_type = 'bucket'
39 39 self._bucket_main = bucket
40 40
41 41 self.fs = fs
42 42
43 43 @property
44 44 def bucket(self):
45 45 """Cache bucket final path."""
46 46 return os.path.join(self._bucket_main, self._bucket_folder)
47 47
48 48 def _get_keyfile(self, archive_key) -> tuple[str, str]:
49 49 key_file = f'{archive_key}-{self.key_suffix}'
50 50 return key_file, os.path.join(self.bucket, key_file)
51 51
52 52 def _get_writer(self, path, mode):
53 53 return self.fs.open(path, 'wb')
54 54
55 55 def _write_file(self, full_path, iterator, mode):
56 56
57 57 # ensure folder in bucket exists
58 58 destination = self.bucket
59 59 if not self.fs.exists(destination):
60 60 self.fs.mkdir(destination, s3_additional_kwargs={})
61 61
62 62 writer = self._get_writer(full_path, mode)
63 63
64 64 digest = hashlib.sha256()
65 65 with writer:
66 66 size = 0
67 67 for chunk in iterator:
68 68 size += len(chunk)
69 69 digest.update(chunk)
70 70 writer.write(chunk)
71 71
72 72 sha256 = digest.hexdigest()
73 73 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
74 74 return size, sha256
75 75
76 76 def store(self, key, value_reader, metadata: dict | None = None):
77 77 return self._store(key, value_reader, metadata, mode='wb')
78 78
79 79 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
80 80 return self._fetch(key, retry, retry_attempts, retry_backoff)
81 81
82 82 def remove(self, key):
83 83 return self._remove(key)
84 84
85 85 def random_filename(self):
86 86 """Return filename and full-path tuple for file storage.
87 87
88 88 Filename will be a randomly generated 28 character hexadecimal string
 89 89 with ".archive_cache" suffixed. Unlike the filesystem shard, the two
 90 90 2-character hex prefixes are joined into the name with dashes rather than
 91 91 used as sub-directories, since object store keys form a flat namespace.
92 92 """
93 93
94 94 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
95 95
96 96 archive_name = hex_name[4:] + '.archive_cache'
97 97 filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
98 98
99 99 full_path = os.path.join(self.bucket, filename)
100 100 return archive_name, full_path
101 101
102 102 def __repr__(self):
103 103 return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
104 104
105 105
106 106 class ObjectStoreCache(BaseCache):
107 107 shard_name = 'shard-%03d'
108 108 shard_cls = S3Shard
109 109
110 110 def __init__(self, locking_url, **settings):
111 111 """
112 112 Initialize objectstore cache instance.
113 113
114 114 :param str locking_url: redis url for a lock
115 115 :param settings: settings dict
116 116
117 117 """
118 118 self._locking_url = locking_url
119 119 self._config = settings
120 120
121 121 objectstore_url = self.get_conf('archive_cache.objectstore.url')
122 122 self._storage_path = objectstore_url # common path for all from BaseCache
123 123
124 124 self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
125 125 if self._shard_count < 1:
126 126 raise ValueError('cache_shards must be 1 or more')
127 127
128 128 self._bucket = settings.pop('archive_cache.objectstore.bucket')
129 129 if not self._bucket:
130 130 raise ValueError('archive_cache.objectstore.bucket needs to have a value')
131 131
132 132 self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
133 133 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
134 134
135 135 self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
136 136 self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
137 137 self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
138 138
139 139 endpoint_url = settings.pop('archive_cache.objectstore.url')
140 140 key = settings.pop('archive_cache.objectstore.key')
141 141 secret = settings.pop('archive_cache.objectstore.secret')
142 142
143 log.debug('Initializing %s archival cache instance under %s', self)
143 log.debug('Initializing %s archival cache instance', self)
144 144
145 145 fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
146 146
147 147 # init main bucket
148 148 if not fs.exists(self._bucket):
149 149 fs.mkdir(self._bucket)
150 150
151 151 self._shards = tuple(
152 152 self.shard_cls(
153 153 index=num,
154 154 bucket=self._bucket,
155 155 bucket_folder=self.shard_name % num,
156 156 fs=fs,
157 157 **settings,
158 158 )
159 159 for num in range(self._shard_count)
160 160 )
161 161 self._hash = self._shards[0].hash
162 162
163 163 def _get_size(self, shard, archive_path):
164 164 return shard.fs.info(archive_path)['size']
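The object-store backend follows the same pattern with `archive_cache.objectstore.*` keys, plus the endpoint URL, bucket name and credentials that are popped from the settings dict. A comparable hedged sketch; the endpoint, bucket and credentials are placeholders, not real values:

```python
# Hypothetical settings; keys mirror those read/popped in
# ObjectStoreCache.__init__ above, values are placeholders only.
settings = {
    'archive_cache.objectstore.url': 'http://object-store:9000',
    'archive_cache.objectstore.key': 'access-key',
    'archive_cache.objectstore.secret': 'secret-key',
    'archive_cache.objectstore.bucket': 'rhodecode-archive-cache',
    'archive_cache.objectstore.bucket_shards': '8',
    'archive_cache.objectstore.eviction_policy': 'least-recently-stored',
    'archive_cache.objectstore.cache_size_gb': '40',
    'archive_cache.objectstore.retry': 'false',
    'archive_cache.objectstore.retry_attempts': '10',
    'archive_cache.objectstore.retry_backoff': '1',
}

cache = ObjectStoreCache(locking_url='redis://localhost:6379/0', **settings)
```

Here each `shard-%03d` is a folder inside the main bucket rather than a directory on disk, and `random_filename` flattens the two-level fan-out into dash-separated prefixes within the flat object-key namespace.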