archive-caches: refactor and use cachedir based archive generation
super-admin
r1122:44baf3db python3
@@ -1,67 +1,69 @@
 # RhodeCode VCSServer provides access to different vcs backends via network.
 # Copyright (C) 2014-2020 RhodeCode GmbH
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 import os
 import sys
+import tempfile
 import traceback
 import logging
 import urllib.parse
 
+from vcsserver.lib.rc_cache.archive_cache import get_archival_cache_store
 from vcsserver.lib.rc_cache import region_meta
 
 from vcsserver import exceptions
 from vcsserver.exceptions import NoContentException
 from vcsserver.hgcompat import archival
 from vcsserver.str_utils import safe_bytes
 
 log = logging.getLogger(__name__)
 
 
 class RepoFactory(object):
     """
     Utility to create instances of repository
 
     It provides internal caching of the `repo` object based on
     the :term:`call context`.
     """
     repo_type = None
 
     def __init__(self):
         self._cache_region = region_meta.dogpile_cache_regions['repo_object']
 
     def _create_config(self, path, config):
         config = {}
         return config
 
     def _create_repo(self, wire, create):
         raise NotImplementedError()
 
     def repo(self, wire, create=False):
         raise NotImplementedError()
 
 
 def obfuscate_qs(query_string):
     if query_string is None:
         return None
 
     parsed = []
     for k, v in urllib.parse.parse_qsl(query_string, keep_blank_values=True):
         if k in ['auth_token', 'api_key']:
             v = "*****"
         parsed.append((k, v))
 
     return '&'.join('{}{}'.format(
         k, f'={v}' if v else '') for k, v in parsed)
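
A quick sketch of what obfuscate_qs does with a credential-bearing query string (the query string below is made up for illustration):

    qs = 'action=pull&auth_token=secret123&debug'
    obfuscate_qs(qs)
    # -> 'action=pull&auth_token=*****&debug'

Blank values survive thanks to keep_blank_values=True, and a key with an empty value is rendered without the '='.
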
@@ -68,17 +70,17 @@
 
 
 def raise_from_original(new_type, org_exc: Exception):
     """
     Raise a new exception type with original args and traceback.
     """
 
     exc_type, exc_value, exc_traceback = sys.exc_info()
     new_exc = new_type(*exc_value.args)
 
     # store the original traceback into the new exc
     new_exc._org_exc_tb = traceback.format_tb(exc_traceback)
 
     try:
         raise new_exc.with_traceback(exc_traceback)
     finally:
         del exc_traceback
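
raise_from_original relies on sys.exc_info(), so it only makes sense inside an except block. A minimal sketch, with RuntimeError standing in for whichever wrapper type a caller would pass:

    try:
        int('not-a-number')
    except ValueError as org_exc:
        # re-raised as RuntimeError, but the args and traceback of the
        # original ValueError are preserved on the new exception
        raise_from_original(RuntimeError, org_exc)
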
@@ -85,52 +87,92 @@
 
 
+
 class ArchiveNode(object):
     def __init__(self, path, mode, is_link, raw_bytes):
         self.path = path
         self.mode = mode
         self.is_link = is_link
         self.raw_bytes = raw_bytes
 
 
-def archive_repo(walker, archive_dest_path, kind, mtime, archive_at_path,
-                 archive_dir_name, commit_id, write_metadata=True, extra_metadata=None):
+def store_archive_in_cache(node_walker, archive_key, kind, mtime, archive_at_path, archive_dir_name,
+                           commit_id, write_metadata=True, extra_metadata=None, cache_config=None):
     """
+    Function that stores a generated archive and sends it to a dedicated backend store.
+    Here we use diskcache.
+
+    :param node_walker: a generator returning nodes to add to the archive
+    :param archive_key: key used to store the path
+    :param kind: archive kind
+    :param mtime: time of creation
+    :param archive_at_path: default '/'; the path at which the archive was started. If this is not '/', it means it's a partial archive
+    :param archive_dir_name: inside dir name when creating an archive
+    :param commit_id: commit sha of the revision the archive was created at
+    :param write_metadata:
+    :param extra_metadata:
+    :param cache_config:
+
     walker should be a file walker, for example:
-    def walker():
+    def node_walker():
         for file_info in files:
             yield ArchiveNode(fn, mode, is_link, ctx[fn].data)
     """
     extra_metadata = extra_metadata or {}
-    archive_dest_path = safe_bytes(archive_dest_path)
+
+    d_cache = get_archival_cache_store(config=cache_config)
+
+    if archive_key in d_cache:
+        with d_cache as d_cache_reader:
+            reader, tag = d_cache_reader.get(archive_key, read=True, tag=True, retry=True)
+        return reader.name
+
+    archive_tmp_path = safe_bytes(tempfile.mkstemp()[1])
+    log.debug('Creating new temp archive in %s', archive_tmp_path)
 
     if kind == "tgz":
-        archiver = archival.tarit(archive_dest_path, mtime, b"gz")
+        archiver = archival.tarit(archive_tmp_path, mtime, b"gz")
     elif kind == "tbz2":
-        archiver = archival.tarit(archive_dest_path, mtime, b"bz2")
+        archiver = archival.tarit(archive_tmp_path, mtime, b"bz2")
     elif kind == 'zip':
-        archiver = archival.zipit(archive_dest_path, mtime)
+        archiver = archival.zipit(archive_tmp_path, mtime)
     else:
         raise exceptions.ArchiveException()(
            f'Remote does not support: "{kind}" archive type.')
 
-    for f in walker(commit_id, archive_at_path):
+    for f in node_walker(commit_id, archive_at_path):
         f_path = os.path.join(safe_bytes(archive_dir_name), safe_bytes(f.path).lstrip(b'/'))
         try:
             archiver.addfile(f_path, f.mode, f.is_link, f.raw_bytes())
         except NoContentException:
             # NOTE(marcink): this is a special case for SVN so we can create "empty"
             # directories which aren't supported by archiver
             archiver.addfile(os.path.join(f_path, b'.dir'), f.mode, f.is_link, b'')
 
     if write_metadata:
         metadata = dict([
             ('commit_id', commit_id),
             ('mtime', mtime),
         ])
         metadata.update(extra_metadata)
 
         meta = [safe_bytes(f"{f_name}:{value}") for f_name, value in metadata.items()]
         f_path = os.path.join(safe_bytes(archive_dir_name), b'.archival.txt')
         archiver.addfile(f_path, 0o644, False, b'\n'.join(meta))
 
-    return archiver.done()
+    archiver.done()
+
+    # ensure set & get are atomic
+    with d_cache.transact():
+
+        with open(archive_tmp_path, 'rb') as archive_file:
+            add_result = d_cache.set(archive_key, archive_file, read=True, tag='db-name', retry=True)
+        if not add_result:
+            log.error('Failed to store cache for key=%s', archive_key)
+
+        os.remove(archive_tmp_path)
+
+        reader, tag = d_cache.get(archive_key, read=True, tag=True, retry=True)
+        if not reader:
+            raise AssertionError(f'empty reader on key={archive_key} added={add_result}')
+
+        return reader.name
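
For context, this is roughly how a caller drives the new function: it supplies a node_walker generator plus a stable archive_key, and gets back a path inside the diskcache store that can be streamed to the client. The walker body, key format and cache_config contents below are illustrative assumptions, not the exact values RhodeCode passes in:

    def node_walker(commit_id, archive_at_path):
        # `files` and `ctx` are assumed to come from an already-loaded
        # commit context; raw_bytes must be a callable returning bytes
        for fn in files:
            yield ArchiveNode(fn, 0o644, False, ctx[fn].data)

    archive_path = store_archive_in_cache(
        node_walker,
        archive_key='my-repo-archive-key.tgz',  # illustrative key
        kind='tgz',
        mtime=1600000000,
        archive_at_path='/',
        archive_dir_name='my-repo',
        commit_id='<commit-sha>',
        cache_config={},  # backend-specific settings for get_archival_cache_store
    )
    # archive_path names the file held by diskcache; a repeated call with the
    # same archive_key returns the cached copy without re-walking the repo

Note that on a cache hit the temp-file generation is skipped entirely, and the transact() block keeps the set/get pair atomic so concurrent requests for the same key never observe a half-written entry.
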
@@ -137,42 +179,17 @@
 
 
 class BinaryEnvelope(object):
-    def __init__(self, value: bytes, bin_type=True):
-        self.value = value
-        self.bin_type = bin_type
-
-    def __len__(self):
-        return len(self.value)
-
-    def __getitem__(self, index):
-        return self.value[index]
-
-    def __iter__(self):
-        return iter(self.value)
-
-    def __str__(self):
-        return str(self.value)
-
-    def __repr__(self):
-        return repr(self.value)
-
-    def __eq__(self, other):
-        if isinstance(other, BinaryEnvelope):
-            return self.value == other.value
-        return False
-
-    def __ne__(self, other):
-        return not self.__eq__(other)
-
-    def __add__(self, other):
-        if isinstance(other, BinaryEnvelope):
-            return BinaryEnvelope(self.value + other.value)
-        raise TypeError(f"unsupported operand type(s) for +: 'BinaryEnvelope' and '{type(other)}'")
-
-    def __radd__(self, other):
-        if isinstance(other, BinaryEnvelope):
-            return BinaryEnvelope(other.value + self.value)
-        raise TypeError(f"unsupported operand type(s) for +: '{type(other)}' and 'BinaryEnvelope'")
+    def __init__(self, val):
+        self.val = val
 
 
+class BytesEnvelope(bytes):
+    def __new__(cls, content):
+        if isinstance(content, bytes):
+            return super().__new__(cls, content)
+        else:
+            raise TypeError('Content must be bytes.')
 
+
+class BinaryBytesEnvelope(BytesEnvelope):
+    pass
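
The replacement envelope types lean on subclassing: a BytesEnvelope IS a bytes object, so it passes isinstance checks and supports every bytes operation, while its concrete type still marks the payload as an envelope, presumably for the response serializer. A small illustration:

    payload = BytesEnvelope(b'\x00\x01binary-data')
    isinstance(payload, bytes)   # True -- behaves as plain bytes everywhere
    type(payload) is bytes       # False -- the envelope type is preserved
    BytesEnvelope('text')        # TypeError: Content must be bytes.
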