##// END OF EJS Templates
archive-caches: refactor and use cachedir based archive generation
super-admin -
r1122:44baf3db python3
parent child Browse files
Show More
@@ -1,178 +1,195 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17 import os
18 18 import sys
19 import tempfile
19 20 import traceback
20 21 import logging
21 22 import urllib.parse
22 23
24 from vcsserver.lib.rc_cache.archive_cache import get_archival_cache_store
23 25 from vcsserver.lib.rc_cache import region_meta
24 26
25 27 from vcsserver import exceptions
26 28 from vcsserver.exceptions import NoContentException
27 29 from vcsserver.hgcompat import archival
28 30 from vcsserver.str_utils import safe_bytes
29 31
30 32 log = logging.getLogger(__name__)
31 33
32 34
class RepoFactory(object):
    """Factory base class for repository backend instances.

    Concrete subclasses implement ``_create_repo``/``repo``; the factory
    holds the shared dogpile cache region used to cache ``repo`` objects
    per :term:`call context`.
    """

    #: backend identifier, set by concrete subclasses
    repo_type = None

    def __init__(self):
        # shared dogpile.cache region used for `repo` object caching
        self._cache_region = region_meta.dogpile_cache_regions['repo_object']

    def _create_config(self, path, config):
        # base implementation discards the passed-in values and returns an
        # empty config; backends override this with real config building
        return {}

    def _create_repo(self, wire, create):
        raise NotImplementedError()

    def repo(self, wire, create=False):
        raise NotImplementedError()
55 57
def obfuscate_qs(query_string):
    """Mask sensitive values in a query string for safe logging.

    Returns ``None`` when given ``None``; otherwise re-assembles the query
    string with ``auth_token``/``api_key`` values replaced by ``*****``.
    Blank values are kept, rendered as a bare key without ``=``.
    """
    if query_string is None:
        return None

    masked = []
    for key, value in urllib.parse.parse_qsl(query_string, keep_blank_values=True):
        masked.append((key, '*****' if key in ('auth_token', 'api_key') else value))

    rendered = []
    for key, value in masked:
        rendered.append(f'{key}={value}' if value else key)
    return '&'.join(rendered)
68 70
69 71
def raise_from_original(new_type, org_exc: Exception):
    """Re-raise the currently handled exception as ``new_type``.

    Builds the replacement from the original exception's args, stores the
    formatted original traceback on ``_org_exc_tb`` and raises with the
    original traceback attached. Must be called from an ``except`` block.
    """
    _, exc_value, exc_tb = sys.exc_info()
    replacement = new_exc = new_type(*exc_value.args)

    # keep a formatted copy of the original traceback on the new exception
    replacement._org_exc_tb = traceback.format_tb(exc_tb)

    try:
        raise replacement.with_traceback(exc_tb)
    finally:
        # break the frame/traceback reference cycle
        del exc_tb
85 87
86 88
89
class ArchiveNode(object):
    """A single entry (file, directory marker or link) destined for an archive."""

    def __init__(self, path, mode, is_link, raw_bytes):
        self.path = path            # path of the entry inside the archive
        self.mode = mode            # unix file mode bits
        self.is_link = is_link      # True when the entry is a symlink
        self.raw_bytes = raw_bytes  # content provider; invoked as raw_bytes() by the archiver
93 96
94 97
def store_archive_in_cache(node_walker, archive_key, kind, mtime, archive_at_path, archive_dir_name,
                           commit_id, write_metadata=True, extra_metadata=None, cache_config=None):
    """
    Generate an archive and store it in a dedicated backend store.
    In here we use diskcache

    :param node_walker: a generator returning nodes to add to archive
    :param archive_key: key used to store the path
    :param kind: archive kind: "tgz", "tbz2" or "zip"
    :param mtime: time of creation
    :param archive_at_path: default '/' the path at archive was started.
        if this is not '/' it means it's a partial archive
    :param archive_dir_name: inside dir name when creating an archive
    :param commit_id: commit sha of revision archive was created at
    :param write_metadata: when True, append a `.archival.txt` metadata file
    :param extra_metadata: extra key/values merged into the metadata file
    :param cache_config: configuration for the archival cache store
    :returns: filesystem path of the cached archive file

    node_walker should be a file walker, for example:

        def node_walker():
            for file_info in files:
                yield ArchiveNode(fn, mode, is_link, ctx[fn].data)
    """
    extra_metadata = extra_metadata or {}

    d_cache = get_archival_cache_store(config=cache_config)

    # fast path: the archive was already generated and cached
    if archive_key in d_cache:
        with d_cache as d_cache_reader:
            reader, tag = d_cache_reader.get(archive_key, read=True, tag=True, retry=True)
            return reader.name

    # NOTE: close the OS-level descriptor right away; the archiver re-opens
    # the file by path and leaving the fd open would leak it.
    tmp_fd, tmp_path = tempfile.mkstemp()
    os.close(tmp_fd)
    archive_tmp_path = safe_bytes(tmp_path)
    log.debug('Creating new temp archive in %s', archive_tmp_path)

    try:
        if kind == "tgz":
            archiver = archival.tarit(archive_tmp_path, mtime, b"gz")
        elif kind == "tbz2":
            archiver = archival.tarit(archive_tmp_path, mtime, b"bz2")
        elif kind == 'zip':
            archiver = archival.zipit(archive_tmp_path, mtime)
        else:
            raise exceptions.ArchiveException()(
                f'Remote does not support: "{kind}" archive type.')

        for f in node_walker(commit_id, archive_at_path):
            f_path = os.path.join(safe_bytes(archive_dir_name), safe_bytes(f.path).lstrip(b'/'))
            try:
                archiver.addfile(f_path, f.mode, f.is_link, f.raw_bytes())
            except NoContentException:
                # NOTE(marcink): this is a special case for SVN so we can create "empty"
                # directories which aren't supported by archiver
                archiver.addfile(os.path.join(f_path, b'.dir'), f.mode, f.is_link, b'')

        if write_metadata:
            metadata = dict([
                ('commit_id', commit_id),
                ('mtime', mtime),
            ])
            metadata.update(extra_metadata)

            meta = [safe_bytes(f"{f_name}:{value}") for f_name, value in metadata.items()]
            f_path = os.path.join(safe_bytes(archive_dir_name), b'.archival.txt')
            archiver.addfile(f_path, 0o644, False, b'\n'.join(meta))

        archiver.done()

        # ensure set & get are atomic
        with d_cache.transact():

            with open(archive_tmp_path, 'rb') as archive_file:
                add_result = d_cache.set(archive_key, archive_file, read=True, tag='db-name', retry=True)
            if not add_result:
                log.error('Failed to store cache for key=%s', archive_key)

            reader, tag = d_cache.get(archive_key, read=True, tag=True, retry=True)
            if not reader:
                raise AssertionError(f'empty reader on key={archive_key} added={add_result}')

            return reader.name
    finally:
        # the archive content now lives in the cache store (or generation
        # failed); either way the temp file is no longer needed
        try:
            os.remove(archive_tmp_path)
        except OSError:
            log.exception('Failed to remove temp archive %s', archive_tmp_path)
137 179
138 180
class BinaryEnvelope(object):
    """Thin wrapper tagging a value as a binary payload for transport."""

    def __init__(self, val):
        # the wrapped value, exposed unchanged
        self.val = val
176 184
177 185
class BytesEnvelope(bytes):
    """A ``bytes`` subclass used to tag content as raw byte data.

    Construction is restricted to genuine ``bytes`` input; any other
    type raises ``TypeError``.
    """

    def __new__(cls, content):
        if not isinstance(content, bytes):
            raise TypeError('Content must be bytes.')
        return super().__new__(cls, content)
178 192
193
class BinaryBytesEnvelope(BytesEnvelope):
    """Marker subclass of ``BytesEnvelope`` distinguishing binary payloads."""
General Comments 0
You need to be logged in to leave comments. Login now