feat(disk-cache): rewrite diskcache backend to be k8s and NFS safe....
super-admin | r1241:29c2a6b0 default
@@ -0,0 +1,31 b''
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

from .fanout_cache import get_archival_cache_store
from .fanout_cache import get_archival_config

from .utils import archive_iterator
from .utils import ArchiveCacheLock


def includeme(config):
    # NOTE: for vcsserver, we lazy init this and config is sent from RhodeCode
    return

    # init our cache at start
    settings = config.get_settings()
    get_archival_cache_store(settings)
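Because `includeme` returns early, the cache is initialized lazily on first use instead of at startup. A minimal sketch of that path, with illustrative values for the `archive_cache.*` keys that `get_archival_cache_store` (below) reads; the Redis URL and store directory are placeholders:

from vcsserver.lib.rc_cache.archive_cache import get_archival_cache_store

# Placeholder settings; the keys mirror those read in get_archival_cache_store().
settings = {
    'archive_cache.backend.type': 'filesystem',
    'archive_cache.locking.url': 'redis://redis:6379/0',             # placeholder
    'archive_cache.filesystem.store_dir': '/mnt/nfs/archive_cache',  # placeholder
    'archive_cache.filesystem.cache_size_gb': 10,
    'archive_cache.filesystem.cache_shards': 8,
    'archive_cache.filesystem.eviction_policy': 'least-recently-used',
}

d_cache = get_archival_cache_store(settings)  # FanoutCache, memoized in cache_meta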
@@ -0,0 +1,258 b''
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import codecs
import contextlib
import functools
import os
import logging
import time
import typing
import zlib

from vcsserver.lib.rc_json import json
from .lock import GenerationLock

log = logging.getLogger(__name__)

cache_meta = None

UNKNOWN = -241
NO_VAL = -917

MODE_BINARY = 'BINARY'


class FileSystemCache:

    def __init__(self, index, directory, **settings):
        self._index = index
        self._directory = directory

    def _write_file(self, full_path, iterator, mode, encoding=None):
        full_dir, _ = os.path.split(full_path)

        for count in range(1, 11):
            with contextlib.suppress(OSError):
                os.makedirs(full_dir)

            try:
                # Another cache may have deleted the directory before
                # the file could be opened.
                writer = open(full_path, mode, encoding=encoding)
            except OSError:
                if count == 10:
                    # Give up after 10 tries to open the file.
                    raise
                continue

            with writer:
                size = 0
                for chunk in iterator:
                    size += len(chunk)
                    writer.write(chunk)
                return size

    def _get_keyfile(self, key):
        return os.path.join(self._directory, f'{key}.key')

    def store(self, key, value_reader, metadata):
        filename, full_path = self.random_filename()
        key_file = self._get_keyfile(key)

        # STORE METADATA
        _metadata = {
            "version": "v1",
            "timestamp": time.time(),
            "filename": filename,
            "full_path": full_path,
            "key_file": key_file,
        }
        if metadata:
            _metadata.update(metadata)

        reader = functools.partial(value_reader.read, 2**22)

        iterator = iter(reader, b'')
        size = self._write_file(full_path, iterator, 'xb')

        # after archive is finished, we create a key to save the presence of the binary file
        with open(key_file, 'wb') as f:
            f.write(json.dumps(_metadata))

        return key, size, MODE_BINARY, filename, _metadata

    def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
        if key not in self:
            raise KeyError(key)

        key_file = self._get_keyfile(key)
        with open(key_file, 'rb') as f:
            metadata = json.loads(f.read())

        filename = metadata['filename']

        return open(os.path.join(self._directory, filename), 'rb'), metadata

    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.
        """

        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
        sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
        name = hex_name[4:] + '.archive_cache'
        filename = os.path.join(sub_dir, name)
        full_path = os.path.join(self._directory, filename)
        return filename, full_path

    def hash(self, key):
        """Compute portable hash for `key`.

        :param key: key to hash
        :return: hash value

        """
        mask = 0xFFFFFFFF
        return zlib.adler32(key.encode('utf-8')) & mask  # noqa

    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item

        """
        key_file = self._get_keyfile(key)
        return os.path.exists(key_file)


class FanoutCache:
    """Cache that shards keys and values."""

    def __init__(self, directory=None, **settings):
        """Initialize cache instance.

        :param str directory: cache directory
        :param settings: settings dict

        """
        if directory is None:
            raise ValueError('directory cannot be None')

        directory = str(directory)
        directory = os.path.expanduser(directory)
        directory = os.path.expandvars(directory)
        self._directory = directory

        self._count = settings.pop('cache_shards')
        self._locking_url = settings.pop('locking_url')

        self._shards = tuple(
            FileSystemCache(
                index=num,
                directory=os.path.join(directory, 'shard_%03d' % num),
                **settings,
            )
            for num in range(self._count)
        )
        self._hash = self._shards[0].hash

    def get_lock(self, lock_key):
        return GenerationLock(lock_key, self._locking_url)

    def _get_shard(self, key) -> FileSystemCache:
        index = self._hash(key) % self._count
        shard = self._shards[index]
        return shard

    def store(self, key, value_reader, metadata=None):
        shard = self._get_shard(key)
        return shard.store(key, value_reader, metadata)

    def fetch(self, key):
        """Return file handle corresponding to `key` from cache.
        """
        shard = self._get_shard(key)
        return shard.fetch(key)

    def has_key(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key for item
        :return: True if key is found

        """
        shard = self._get_shard(key)
        return key in shard

    def __contains__(self, item):
        return self.has_key(item)


def get_archival_config(config):

    final_config = {}

    for k, v in config.items():
        if k.startswith('archive_cache'):
            final_config[k] = v

    return final_config


def get_archival_cache_store(config):

    global cache_meta
    if cache_meta is not None:
        return cache_meta

    config = get_archival_config(config)
    backend = config['archive_cache.backend.type']
    if backend != 'filesystem':
        raise ValueError('archive_cache.backend.type only supports "filesystem"')

    archive_cache_locking_url = config['archive_cache.locking.url']
    archive_cache_dir = config['archive_cache.filesystem.store_dir']
    archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
    archive_cache_shards = config['archive_cache.filesystem.cache_shards']
    archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']

    log.debug('Initializing archival cache instance under %s', archive_cache_dir)

    # check if it's ok to write, and re-create the archive cache
    if not os.path.isdir(archive_cache_dir):
        os.makedirs(archive_cache_dir, exist_ok=True)

    d_cache = FanoutCache(
        archive_cache_dir,
        locking_url=archive_cache_locking_url,
        cache_shards=archive_cache_shards,
        cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
        cache_eviction_policy=archive_cache_eviction_policy
    )
    cache_meta = d_cache
    return cache_meta
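Putting the pieces together, a store/fetch round trip through the sharded cache could look like the following sketch. The directory, Redis URL, and key are illustrative, and `store` accepts any object exposing a `read(size)` method, such as `io.BytesIO`:

import io

d_cache = FanoutCache(
    '/tmp/archive_cache',                    # placeholder store dir
    locking_url='redis://redis:6379/0',      # placeholder Redis URL
    cache_shards=8,
    cache_size_limit=10 * 1024 * 1024 * 1024,
    cache_eviction_policy='least-recently-used',
)

archive_key = 'repo1-abc123.zip'
# adler32(key) % shards picks the shard deterministically on every pod,
# so concurrent workers on shared NFS agree on file placement.
key, size, mode, filename, meta = d_cache.store(
    archive_key, io.BytesIO(b'fake-archive-bytes'), metadata={'commit_id': 'abc123'})

if archive_key in d_cache:                   # presence == the .key file exists
    reader, meta = d_cache.fetch(archive_key)
    assert reader.read() == b'fake-archive-bytes'
    reader.close()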
@@ -0,0 +1,59 b''
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import redis
from vcsserver.lib._vendor import redis_lock

from .utils import ArchiveCacheLock


class GenerationLock:
    """
    Locking mechanism that detects if a lock is acquired

    with GenerationLock(lock_key):
        compute_archive()
    """
    lock_timeout = 7200

    def __init__(self, lock_key, url):
        self.lock_key = lock_key
        self._create_client(url)
        self.lock = self.get_lock()

    def _create_client(self, url):
        connection_pool = redis.ConnectionPool.from_url(url)
        self.writer_client = redis.StrictRedis(
            connection_pool=connection_pool
        )
        self.reader_client = self.writer_client

    def get_lock(self):
        return redis_lock.Lock(
            redis_client=self.writer_client,
            name=self.lock_key,
            expire=self.lock_timeout,
            strict=True
        )

    def __enter__(self):
        acquired = self.lock.acquire(blocking=False)
        if not acquired:
            raise ArchiveCacheLock('Failed to create a lock')

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.lock.release()
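The lock is deliberately non-blocking: a second worker that finds the lock held fails fast rather than queueing behind the generator. A sketch of the calling pattern, with a placeholder key and URL and a hypothetical `compute_archive` step; the import paths assume the package layout shown in this changeset:

from vcsserver.lib.rc_cache.archive_cache.lock import GenerationLock   # assumed path
from vcsserver.lib.rc_cache.archive_cache.utils import ArchiveCacheLock

try:
    with GenerationLock('archive-gen-repo1-abc123', 'redis://redis:6379/0'):
        compute_archive()          # hypothetical expensive generation step
except ArchiveCacheLock:
    # another worker is already generating this archive; callers can retry
    # or poll the cache key until the winner publishes the result
    pass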
@@ -0,0 +1,29 b''
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA


class ArchiveCacheLock(Exception):
    pass


def archive_iterator(_reader, block_size: int = 4096 * 512):
    # 4096 * 512 = 2 MB chunks
    while 1:
        data = _reader.read(block_size)
        if not data:
            break
        yield data
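Continuing the earlier FanoutCache sketch, the iterator pairs with `fetch` to stream a cached archive without reading it fully into memory; `response` below is a hypothetical WSGI-style sink:

reader, metadata = d_cache.fetch(archive_key)    # file handle + stored metadata
try:
    for chunk in archive_iterator(reader):       # 2 MB chunks by default
        response.write(chunk)                    # hypothetical output sink
finally:
    reader.close()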
@@ -1,74 +1,73 b''
# deps, generated via pipdeptree --exclude setuptools,wheel,pipdeptree,pip -f | tr '[:upper:]' '[:lower:]'

async-timeout==4.0.3
atomicwrites==1.4.1
celery==5.3.6
billiard==4.2.0
click==8.1.3
click-didyoumean==0.3.0
click==8.1.3
click-plugins==1.1.1
click==8.1.3
click-repl==0.2.0
click==8.1.3
prompt-toolkit==3.0.38
wcwidth==0.2.6
six==1.16.0
kombu==5.3.5
amqp==5.2.0
vine==5.1.0
vine==5.1.0
python-dateutil==2.8.2
six==1.16.0
tzdata==2024.1
vine==5.1.0
contextlib2==21.6.0
-diskcache==5.6.3
dogpile.cache==1.3.3
decorator==5.1.1
stevedore==5.1.0
pbr==5.11.1
dulwich==0.21.6
urllib3==1.26.14
gunicorn==21.2.0
packaging==24.0
hg-evolve==11.0.2
importlib-metadata==6.0.0
zipp==3.15.0
mercurial==6.3.3
more-itertools==9.1.0
msgpack==1.0.8
orjson==3.10.3
psutil==5.9.8
py==1.11.0
pygit2==1.13.3
cffi==1.16.0
pycparser==2.21
pygments==2.15.1
pyparsing==3.1.1
pyramid==2.0.2
hupper==1.12
plaster==1.1.2
plaster-pastedeploy==1.0.1
pastedeploy==3.1.0
plaster==1.1.2
translationstring==1.4
venusian==3.0.0
webob==1.8.7
zope.deprecation==5.0.0
zope.interface==6.3.0
redis==5.0.4
async-timeout==4.0.3
repoze.lru==0.7
scandir==1.10.0
setproctitle==1.3.3
subvertpy==0.11.0
waitress==3.0.0
wcwidth==0.2.6


## test related requirements
#-r requirements_test.txt

## uncomment to add the debug libraries
#-r requirements_debug.txt
@@ -1,193 +1,187 b''
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2023 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
import os
import sys
import tempfile
import logging
import urllib.parse

from vcsserver.lib.rc_cache.archive_cache import get_archival_cache_store

from vcsserver import exceptions
from vcsserver.exceptions import NoContentException
from vcsserver.hgcompat import archival
from vcsserver.str_utils import safe_bytes
from vcsserver.lib.exc_tracking import format_exc
log = logging.getLogger(__name__)


class RepoFactory:
    """
    Utility to create instances of repository

    It provides internal caching of the `repo` object based on
    the :term:`call context`.
    """
    repo_type = None

    def __init__(self):
        pass

    def _create_config(self, path, config):
        config = {}
        return config

    def _create_repo(self, wire, create):
        raise NotImplementedError()

    def repo(self, wire, create=False):
        raise NotImplementedError()


def obfuscate_qs(query_string):
    if query_string is None:
        return None

    parsed = []
    for k, v in urllib.parse.parse_qsl(query_string, keep_blank_values=True):
        if k in ['auth_token', 'api_key']:
            v = "*****"
        parsed.append((k, v))

    return '&'.join('{}{}'.format(
        k, f'={v}' if v else '') for k, v in parsed)


def raise_from_original(new_type, org_exc: Exception):
    """
    Raise a new exception type with original args and traceback.
    """
    exc_info = sys.exc_info()
    exc_type, exc_value, exc_traceback = exc_info
    new_exc = new_type(*exc_value.args)

    # store the original traceback into the new exc
    new_exc._org_exc_tb = format_exc(exc_info)

    try:
        raise new_exc.with_traceback(exc_traceback)
    finally:
        del exc_traceback


class ArchiveNode:
    def __init__(self, path, mode, is_link, raw_bytes):
        self.path = path
        self.mode = mode
        self.is_link = is_link
        self.raw_bytes = raw_bytes


def store_archive_in_cache(node_walker, archive_key, kind, mtime, archive_at_path, archive_dir_name,
                           commit_id, write_metadata=True, extra_metadata=None, cache_config=None):
    """
    Function that would store generate archive and send it to a dedicated backend store
    In here we use diskcache

    :param node_walker: a generator returning nodes to add to archive
    :param archive_key: key used to store the path
    :param kind: archive kind
    :param mtime: time of creation
    :param archive_at_path: default '/' the path at archive was started.
        If this is not '/' it means it's a partial archive
    :param archive_dir_name: inside dir name when creating an archive
    :param commit_id: commit sha of revision archive was created at
    :param write_metadata:
    :param extra_metadata:
    :param cache_config:

    walker should be a file walker, for example,
    def node_walker():
        for file_info in files:
            yield ArchiveNode(fn, mode, is_link, ctx[fn].data)
    """
    extra_metadata = extra_metadata or {}

    d_cache = get_archival_cache_store(config=cache_config)

    if archive_key in d_cache:
-        with d_cache as d_cache_reader:
-            reader, tag = d_cache_reader.get(archive_key, read=True, tag=True, retry=True)
-            return reader.name
+        reader, metadata = d_cache.fetch(archive_key)
+        return reader.name

    archive_tmp_path = safe_bytes(tempfile.mkstemp()[1])
    log.debug('Creating new temp archive in %s', archive_tmp_path)

    if kind == "tgz":
        archiver = archival.tarit(archive_tmp_path, mtime, b"gz")
    elif kind == "tbz2":
        archiver = archival.tarit(archive_tmp_path, mtime, b"bz2")
    elif kind == 'zip':
        archiver = archival.zipit(archive_tmp_path, mtime)
    else:
        raise exceptions.ArchiveException()(
            f'Remote does not support: "{kind}" archive type.')

    for f in node_walker(commit_id, archive_at_path):
        f_path = os.path.join(safe_bytes(archive_dir_name), safe_bytes(f.path).lstrip(b'/'))
+
        try:
            archiver.addfile(f_path, f.mode, f.is_link, f.raw_bytes())
        except NoContentException:
            # NOTE(marcink): this is a special case for SVN so we can create "empty"
            # directories which are not supported by archiver
            archiver.addfile(os.path.join(f_path, b'.dir'), f.mode, f.is_link, b'')

-    if write_metadata:
-        metadata = dict([
-            ('commit_id', commit_id),
-            ('mtime', mtime),
-        ])
-        metadata.update(extra_metadata)
+    metadata = dict([
+        ('commit_id', commit_id),
+        ('mtime', mtime),
+    ])
+    metadata.update(extra_metadata)

+    if write_metadata:
        meta = [safe_bytes(f"{f_name}:{value}") for f_name, value in metadata.items()]
        f_path = os.path.join(safe_bytes(archive_dir_name), b'.archival.txt')
        archiver.addfile(f_path, 0o644, False, b'\n'.join(meta))

    archiver.done()

-    # ensure set & get are atomic
-    with d_cache.transact():
-
-        with open(archive_tmp_path, 'rb') as archive_file:
-            add_result = d_cache.set(archive_key, archive_file, read=True, tag='db-name', retry=True)
-            if not add_result:
-                log.error('Failed to store cache for key=%s', archive_key)
-
-            os.remove(archive_tmp_path)
-
-            reader, tag = d_cache.get(archive_key, read=True, tag=True, retry=True)
-            if not reader:
-                raise AssertionError(f'empty reader on key={archive_key} added={add_result}')
-
-            return reader.name
+    with open(archive_tmp_path, 'rb') as archive_file:
+        add_result = d_cache.store(archive_key, archive_file, metadata=metadata)
+        if not add_result:
+            log.error('Failed to store cache for key=%s', archive_key)
+
+    os.remove(archive_tmp_path)
+
+    reader, metadata = d_cache.fetch(archive_key)
+
+    return reader.name


class BinaryEnvelope:
    def __init__(self, val):
        self.val = val


class BytesEnvelope(bytes):
    def __new__(cls, content):
        if isinstance(content, bytes):
            return super().__new__(cls, content)
        else:
            raise TypeError('BytesEnvelope content= param must be bytes. Use BinaryEnvelope to wrap other types')


class BinaryBytesEnvelope(BytesEnvelope):
    pass
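For reference, a hypothetical caller of the rewritten flow; the file names and contents in `node_walker` are illustrative stand-ins, and `settings` refers to the `archive_cache.*` dict sketched earlier:

def node_walker(commit_id, archive_at_path):
    # hypothetical walker: yields one ArchiveNode per repository file
    for f_name, f_bytes in [('README.rst', b'hello'), ('setup.py', b'print(1)')]:
        yield ArchiveNode(f_name, mode=0o644, is_link=False,
                          raw_bytes=lambda data=f_bytes: data)

archive_path = store_archive_in_cache(
    node_walker,
    archive_key='repo1-abc123.zip',
    kind='zip',
    mtime=1714000000,
    archive_at_path='/',
    archive_dir_name='repo1-abc123',
    commit_id='abc123',
    cache_config=settings,        # the archive_cache.* dict shown earlier
)
# archive_path points at the cached archive file inside its shard directory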
NO CONTENT: file was removed