stream-clone: add an explicit set listing the requirements relevant to stream clone
marmoute
r49447:baddab22 default
--- a/mercurial/bundlecaches.py
+++ b/mercurial/bundlecaches.py
@@ -1,427 +1,427 @@
 # bundlecaches.py - utility to deal with pre-computed bundle for servers
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.

 from .i18n import _

 from .thirdparty import attr

 from . import (
     error,
     requirements as requirementsmod,
     sslutil,
     util,
 )
 from .utils import stringutil

 urlreq = util.urlreq

 CB_MANIFEST_FILE = b'clonebundles.manifest'


 @attr.s
 class bundlespec(object):
     compression = attr.ib()
     wirecompression = attr.ib()
     version = attr.ib()
     wireversion = attr.ib()
     params = attr.ib()
     contentopts = attr.ib()


 # Maps bundle version human names to changegroup versions.
 _bundlespeccgversions = {
     b'v1': b'01',
     b'v2': b'02',
     b'packed1': b's1',
     b'bundle2': b'02',  # legacy
 }

 # Maps bundle version with content opts to choose which part to bundle
 _bundlespeccontentopts = {
     b'v1': {
         b'changegroup': True,
         b'cg.version': b'01',
         b'obsolescence': False,
         b'phases': False,
         b'tagsfnodescache': False,
         b'revbranchcache': False,
     },
     b'v2': {
         b'changegroup': True,
         b'cg.version': b'02',
         b'obsolescence': False,
         b'phases': False,
         b'tagsfnodescache': True,
         b'revbranchcache': True,
     },
     b'packed1': {b'cg.version': b's1'},
 }
 _bundlespeccontentopts[b'bundle2'] = _bundlespeccontentopts[b'v2']

 _bundlespecvariants = {
     b"streamv2": {
         b"changegroup": False,
         b"streamv2": True,
         b"tagsfnodescache": False,
         b"revbranchcache": False,
     }
 }

 # Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
 _bundlespecv1compengines = {b'gzip', b'bzip2', b'none'}


 def parsebundlespec(repo, spec, strict=True):
     """Parse a bundle string specification into parts.

     Bundle specifications denote a well-defined bundle/exchange format.
     The content of a given specification should not change over time in
     order to ensure that bundles produced by a newer version of Mercurial are
     readable from an older version.

     The string currently has the form:

        <compression>-<type>[;<parameter0>[;<parameter1>]]

     Where <compression> is one of the supported compression formats
     and <type> is (currently) a version string. A ";" can follow the type and
     all text afterwards is interpreted as URI encoded, ";" delimited key=value
     pairs.

     If ``strict`` is True (the default) <compression> is required. Otherwise,
     it is optional.

     Returns a bundlespec object of (compression, version, parameters).
     Compression will be ``None`` if not in strict mode and a compression isn't
     defined.

     An ``InvalidBundleSpecification`` is raised when the specification is
     not syntactically well formed.

     An ``UnsupportedBundleSpecification`` is raised when the compression or
     bundle type/version is not recognized.

     Note: this function will likely eventually return a more complex data
     structure, including bundle2 part information.
     """

     def parseparams(s):
         if b';' not in s:
             return s, {}

         params = {}
         version, paramstr = s.split(b';', 1)

         for p in paramstr.split(b';'):
             if b'=' not in p:
                 raise error.InvalidBundleSpecification(
                     _(
                         b'invalid bundle specification: '
                         b'missing "=" in parameter: %s'
                     )
                     % p
                 )

             key, value = p.split(b'=', 1)
             key = urlreq.unquote(key)
             value = urlreq.unquote(value)
             params[key] = value

         return version, params

     if strict and b'-' not in spec:
         raise error.InvalidBundleSpecification(
             _(
                 b'invalid bundle specification; '
                 b'must be prefixed with compression: %s'
             )
             % spec
         )

     if b'-' in spec:
         compression, version = spec.split(b'-', 1)

         if compression not in util.compengines.supportedbundlenames:
             raise error.UnsupportedBundleSpecification(
                 _(b'%s compression is not supported') % compression
             )

         version, params = parseparams(version)

         if version not in _bundlespeccgversions:
             raise error.UnsupportedBundleSpecification(
                 _(b'%s is not a recognized bundle version') % version
             )
     else:
         # Value could be just the compression or just the version, in which
         # case some defaults are assumed (but only when not in strict mode).
         assert not strict

         spec, params = parseparams(spec)

         if spec in util.compengines.supportedbundlenames:
             compression = spec
             version = b'v1'
             # Generaldelta repos require v2.
             if requirementsmod.GENERALDELTA_REQUIREMENT in repo.requirements:
                 version = b'v2'
             elif requirementsmod.REVLOGV2_REQUIREMENT in repo.requirements:
                 version = b'v2'
             # Modern compression engines require v2.
             if compression not in _bundlespecv1compengines:
                 version = b'v2'
         elif spec in _bundlespeccgversions:
             if spec == b'packed1':
                 compression = b'none'
             else:
                 compression = b'bzip2'
             version = spec
         else:
             raise error.UnsupportedBundleSpecification(
                 _(b'%s is not a recognized bundle specification') % spec
             )

     # Bundle version 1 only supports a known set of compression engines.
     if version == b'v1' and compression not in _bundlespecv1compengines:
         raise error.UnsupportedBundleSpecification(
             _(b'compression engine %s is not supported on v1 bundles')
             % compression
         )

     # The specification for packed1 can optionally declare the data formats
     # required to apply it. If we see this metadata, compare against what the
     # repo supports and error if the bundle isn't compatible.
     if version == b'packed1' and b'requirements' in params:
         requirements = set(params[b'requirements'].split(b','))
-        missingreqs = requirements - repo.supportedformats
+        missingreqs = requirements - requirementsmod.STREAM_FIXED_REQUIREMENTS
         if missingreqs:
             raise error.UnsupportedBundleSpecification(
                 _(b'missing support for repository features: %s')
                 % b', '.join(sorted(missingreqs))
             )

     # Compute contentopts based on the version
     contentopts = _bundlespeccontentopts.get(version, {}).copy()

     # Process the variants
     if b"stream" in params and params[b"stream"] == b"v2":
         variant = _bundlespecvariants[b"streamv2"]
         contentopts.update(variant)

     engine = util.compengines.forbundlename(compression)
     compression, wirecompression = engine.bundletype()
     wireversion = _bundlespeccgversions[version]

     return bundlespec(
         compression, wirecompression, version, wireversion, params, contentopts
     )


 def parseclonebundlesmanifest(repo, s):
     """Parses the raw text of a clone bundles manifest.

     Returns a list of dicts. The dicts have a ``URL`` key corresponding
     to the URL and other keys are the attributes for the entry.
     """
     m = []
     for line in s.splitlines():
         fields = line.split()
         if not fields:
             continue
         attrs = {b'URL': fields[0]}
         for rawattr in fields[1:]:
             key, value = rawattr.split(b'=', 1)
             key = util.urlreq.unquote(key)
             value = util.urlreq.unquote(value)
             attrs[key] = value

             # Parse BUNDLESPEC into components. This makes client-side
             # preferences easier to specify since you can prefer a single
             # component of the BUNDLESPEC.
             if key == b'BUNDLESPEC':
                 try:
                     bundlespec = parsebundlespec(repo, value)
                     attrs[b'COMPRESSION'] = bundlespec.compression
                     attrs[b'VERSION'] = bundlespec.version
                 except error.InvalidBundleSpecification:
                     pass
                 except error.UnsupportedBundleSpecification:
                     pass

         m.append(attrs)

     return m


 def isstreamclonespec(bundlespec):
     # Stream clone v1
     if bundlespec.wirecompression == b'UN' and bundlespec.wireversion == b's1':
         return True

     # Stream clone v2
     if (
         bundlespec.wirecompression == b'UN'
         and bundlespec.wireversion == b'02'
         and bundlespec.contentopts.get(b'streamv2')
     ):
         return True

     return False


 def filterclonebundleentries(repo, entries, streamclonerequested=False):
     """Remove incompatible clone bundle manifest entries.

     Accepts a list of entries parsed with ``parseclonebundlesmanifest``
     and returns a new list consisting of only the entries that this client
     should be able to apply.

     There is no guarantee we'll be able to apply all returned entries because
     the metadata we use to filter on may be missing or wrong.
     """
     newentries = []
     for entry in entries:
         spec = entry.get(b'BUNDLESPEC')
         if spec:
             try:
                 bundlespec = parsebundlespec(repo, spec, strict=True)

                 # If a stream clone was requested, filter out non-streamclone
                 # entries.
                 if streamclonerequested and not isstreamclonespec(bundlespec):
                     repo.ui.debug(
                         b'filtering %s because not a stream clone\n'
                         % entry[b'URL']
                     )
                     continue

             except error.InvalidBundleSpecification as e:
                 repo.ui.debug(stringutil.forcebytestr(e) + b'\n')
                 continue
             except error.UnsupportedBundleSpecification as e:
                 repo.ui.debug(
                     b'filtering %s because unsupported bundle '
                     b'spec: %s\n' % (entry[b'URL'], stringutil.forcebytestr(e))
                 )
                 continue
         # If we don't have a spec and requested a stream clone, we don't know
         # what the entry is so don't attempt to apply it.
         elif streamclonerequested:
             repo.ui.debug(
                 b'filtering %s because cannot determine if a stream '
                 b'clone bundle\n' % entry[b'URL']
             )
             continue

         if b'REQUIRESNI' in entry and not sslutil.hassni:
             repo.ui.debug(
                 b'filtering %s because SNI not supported\n' % entry[b'URL']
             )
             continue

         if b'REQUIREDRAM' in entry:
             try:
                 requiredram = util.sizetoint(entry[b'REQUIREDRAM'])
             except error.ParseError:
                 repo.ui.debug(
                     b'filtering %s due to a bad REQUIREDRAM attribute\n'
                     % entry[b'URL']
                 )
                 continue
             actualram = repo.ui.estimatememory()
             if actualram is not None and actualram * 0.66 < requiredram:
                 repo.ui.debug(
                     b'filtering %s as it needs more than 2/3 of system memory\n'
                     % entry[b'URL']
                 )
                 continue

         newentries.append(entry)

     return newentries


 class clonebundleentry(object):
     """Represents an item in a clone bundles manifest.

     This rich class is needed to support sorting since sorted() in Python 3
     doesn't support ``cmp`` and our comparison is complex enough that ``key=``
     won't work.
     """

     def __init__(self, value, prefers):
         self.value = value
         self.prefers = prefers

     def _cmp(self, other):
         for prefkey, prefvalue in self.prefers:
             avalue = self.value.get(prefkey)
             bvalue = other.value.get(prefkey)

             # Special case for b missing attribute and a matches exactly.
             if avalue is not None and bvalue is None and avalue == prefvalue:
                 return -1

             # Special case for a missing attribute and b matches exactly.
             if bvalue is not None and avalue is None and bvalue == prefvalue:
                 return 1

             # We can't compare unless attribute present on both.
             if avalue is None or bvalue is None:
                 continue

             # Same values should fall back to next attribute.
             if avalue == bvalue:
                 continue

             # Exact matches come first.
             if avalue == prefvalue:
                 return -1
             if bvalue == prefvalue:
                 return 1

             # Fall back to next attribute.
             continue

         # If we got here we couldn't sort by attributes and prefers. Fall
         # back to index order.
         return 0

     def __lt__(self, other):
         return self._cmp(other) < 0

     def __gt__(self, other):
         return self._cmp(other) > 0

     def __eq__(self, other):
         return self._cmp(other) == 0

     def __le__(self, other):
         return self._cmp(other) <= 0

     def __ge__(self, other):
         return self._cmp(other) >= 0

     def __ne__(self, other):
         return self._cmp(other) != 0


 def sortclonebundleentries(ui, entries):
     prefers = ui.configlist(b'ui', b'clonebundleprefers')
     if not prefers:
         return list(entries)

     def _split(p):
         if b'=' not in p:
             hint = _(b"each comma separated item should be key=value pairs")
             raise error.Abort(
                 _(b"invalid ui.clonebundleprefers item: %s") % p, hint=hint
             )
         return p.split(b'=', 1)

     prefers = [_split(p) for p in prefers]

     items = sorted(clonebundleentry(v, prefers) for v in entries)
     return [i.value for i in items]
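
The bundlespec grammar that parsebundlespec documents above is easy to see in isolation. Below is a minimal, hypothetical sketch of only the string-splitting step; the helper name split_bundlespec is invented for illustration, it works on str rather than the bytes the real code uses, and it skips validation against the compression-engine and version tables (the real function raises InvalidBundleSpecification / UnsupportedBundleSpecification instead):

from urllib.parse import unquote

def split_bundlespec(spec):
    # Grammar: <compression>-<type>[;<parameter0>[;<parameter1>]]
    # where the parameters are URI-encoded key=value pairs.
    compression, rest = spec.split('-', 1)
    version, _, paramstr = rest.partition(';')
    params = {}
    if paramstr:
        for pair in paramstr.split(';'):
            key, value = pair.split('=', 1)
            params[unquote(key)] = unquote(value)
    return compression, version, params

# A stream-clone v2 spec, as seen in clone bundle manifests:
assert split_bundlespec('none-v2;stream=v2') == ('none', 'v2', {'stream': 'v2'})

A clone bundles manifest then carries such a spec per entry: one URL per line followed by whitespace-separated key=value attributes (e.g. BUNDLESPEC=gzip-v2), which is exactly what parseclonebundlesmanifest splits apart.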
--- a/mercurial/requirements.py
+++ b/mercurial/requirements.py
@@ -1,91 +1,111 @@
 # requirements.py - objects and functions related to repository requirements
 #
 # Copyright 2005-2007 Olivia Mackall <olivia@selenic.com>
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.

 from __future__ import absolute_import

 GENERALDELTA_REQUIREMENT = b'generaldelta'
 DOTENCODE_REQUIREMENT = b'dotencode'
 STORE_REQUIREMENT = b'store'
 FNCACHE_REQUIREMENT = b'fncache'

 DIRSTATE_V2_REQUIREMENT = b'dirstate-v2'

 # When narrowing is finalized and no longer subject to format changes,
 # we should move this to just "narrow" or similar.
 NARROW_REQUIREMENT = b'narrowhg-experimental'

 # Enables sparse working directory usage
 SPARSE_REQUIREMENT = b'exp-sparse'

 # Enables the internal phase which is used to hide changesets instead
 # of stripping them
 INTERNAL_PHASE_REQUIREMENT = b'internal-phase'

 # Stores manifest in Tree structure
 TREEMANIFEST_REQUIREMENT = b'treemanifest'

 REVLOGV1_REQUIREMENT = b'revlogv1'

 # Increment the sub-version when the revlog v2 format changes to lock out old
 # clients.
 CHANGELOGV2_REQUIREMENT = b'exp-changelog-v2'

 # Increment the sub-version when the revlog v2 format changes to lock out old
 # clients.
 REVLOGV2_REQUIREMENT = b'exp-revlogv2.2'

 # A repository with the sparserevlog feature will have delta chains that
 # can spread over a larger span. Sparse reading cuts these large spans into
 # pieces, so that each piece isn't too big.
 # Without the sparserevlog capability, reading from the repository could use
 # huge amounts of memory, because the whole span would be read at once,
 # including all the intermediate revisions that aren't pertinent for the chain.
 # This is why once a repository has enabled sparse-read, it becomes required.
 SPARSEREVLOG_REQUIREMENT = b'sparserevlog'

 # A repository with the copies-sidedata-changeset requirement will store
 # copies-related information in the changeset's sidedata.
 COPIESSDC_REQUIREMENT = b'exp-copies-sidedata-changeset'

 # The repository uses a persistent nodemap for the changelog and the manifest.
 NODEMAP_REQUIREMENT = b'persistent-nodemap'

 # Denotes that the current repository is a share
 SHARED_REQUIREMENT = b'shared'

 # Denotes that the current repository is a share and the shared source path
 # is relative to the current repository root path
 RELATIVE_SHARED_REQUIREMENT = b'relshared'

 # A repository with share implemented safely. The repository has different
 # store and working copy requirements, i.e. both `.hg/requires` and
 # `.hg/store/requires` are present.
 SHARESAFE_REQUIREMENT = b'share-safe'

 # Bookmarks must be stored in the `store` part of the repository and will be
 # shared across shares
 BOOKMARKS_IN_STORE_REQUIREMENT = b'bookmarksinstore'

 # List of requirements which are working directory specific
 # These requirements cannot be shared between repositories if they
 # share the same store
 # * sparse is a working directory specific functionality and hence a working
 #   directory specific requirement
 # * SHARED_REQUIREMENT and RELATIVE_SHARED_REQUIREMENT are requirements which
 #   represent that the current working copy/repository shares the store of
 #   another repo. Hence both of them should be stored in the working copy
 # * SHARESAFE_REQUIREMENT needs to be stored in the working dir to mark that
 #   the rest of the requirements are stored in the store's requires
 # * DIRSTATE_V2_REQUIREMENT affects .hg/dirstate, of which there is one per
 #   working directory.
 WORKING_DIR_REQUIREMENTS = {
     SPARSE_REQUIREMENT,
     SHARED_REQUIREMENT,
     RELATIVE_SHARED_REQUIREMENT,
     SHARESAFE_REQUIREMENT,
     DIRSTATE_V2_REQUIREMENT,
 }
+
+# Set of requirements that impact "stream-clone" (and hardlink clone) and
+# cannot be changed in such cases.
+#
+# Requirements not in this set are safe to alter during a stream-clone.
+#
+# Note: the set is currently inherited from previous code and misses some relevant requirements while containing some irrelevant ones.
+STREAM_FIXED_REQUIREMENTS = {
+    BOOKMARKS_IN_STORE_REQUIREMENT,
+    CHANGELOGV2_REQUIREMENT,
+    COPIESSDC_REQUIREMENT,
+    DIRSTATE_V2_REQUIREMENT,
+    GENERALDELTA_REQUIREMENT,
+    NODEMAP_REQUIREMENT,
+    REVLOGV1_REQUIREMENT,
+    REVLOGV2_REQUIREMENT,
+    SHARESAFE_REQUIREMENT,
+    SPARSEREVLOG_REQUIREMENT,
+    TREEMANIFEST_REQUIREMENT,
+}
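
What the new constant does is plain set arithmetic, mirrored by new_stream_clone_requirements in the streamclone.py hunk below: requirements in the fixed set are dictated by the streamed data, everything else keeps the local default. A self-contained toy version follows; the reduced three-element fixed set is for illustration only:

STREAM_FIXED = {b'generaldelta', b'sparserevlog', b'revlogv1'}

def toy_new_stream_clone_requirements(default_requirements, streamed_requirements):
    requirements = set(default_requirements)    # what a fresh local repo would use
    requirements -= STREAM_FIXED                # drop whatever the streamed bytes pin down
    requirements.update(streamed_requirements)  # and take those from the server instead
    return requirements

local_defaults = {b'store', b'fncache', b'generaldelta', b'sparserevlog'}
server_streamed = {b'generaldelta', b'revlogv1'}
# b'sparserevlog' disappears (the server did not stream it); b'store' and
# b'fncache' survive because they are not stream-fixed.
assert toy_new_stream_clone_requirements(local_defaults, server_streamed) == {
    b'store', b'fncache', b'generaldelta', b'revlogv1'
}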
@@ -1,941 +1,939
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import contextlib
10 import contextlib
11 import errno
11 import errno
12 import os
12 import os
13 import struct
13 import struct
14
14
15 from .i18n import _
15 from .i18n import _
16 from .pycompat import open
16 from .pycompat import open
17 from .interfaces import repository
17 from .interfaces import repository
18 from . import (
18 from . import (
19 bookmarks,
19 bookmarks,
20 cacheutil,
20 cacheutil,
21 error,
21 error,
22 narrowspec,
22 narrowspec,
23 phases,
23 phases,
24 pycompat,
24 pycompat,
25 requirements as requirementsmod,
25 requirements as requirementsmod,
26 scmutil,
26 scmutil,
27 store,
27 store,
28 util,
28 util,
29 )
29 )
30 from .utils import (
30 from .utils import (
31 stringutil,
31 stringutil,
32 )
32 )
33
33
34
34
35 def new_stream_clone_requirements(
35 def new_stream_clone_requirements(default_requirements, streamed_requirements):
36 supported_formats, default_requirements, streamed_requirements
37 ):
38 """determine the final set of requirement for a new stream clone
36 """determine the final set of requirement for a new stream clone
39
37
40 this method combine the "default" requirements that a new repository would
38 this method combine the "default" requirements that a new repository would
41 use with the constaint we get from the stream clone content. We keep local
39 use with the constaint we get from the stream clone content. We keep local
42 configuration choice when possible.
40 configuration choice when possible.
43 """
41 """
44 requirements = set(default_requirements)
42 requirements = set(default_requirements)
45 requirements -= supported_formats
43 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
46 requirements.update(streamed_requirements)
44 requirements.update(streamed_requirements)
47 return requirements
45 return requirements
48
46
49
47
50 def streamed_requirements(repo):
48 def streamed_requirements(repo):
51 """the set of requirement the new clone will have to support
49 """the set of requirement the new clone will have to support
52
50
53 This is used for advertising the stream options and to generate the actual
51 This is used for advertising the stream options and to generate the actual
54 stream content."""
52 stream content."""
55 requiredformats = repo.requirements & repo.supportedformats
53 requiredformats = (
54 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
55 )
56 return requiredformats
56 return requiredformats
57
57
58
58
59 def canperformstreamclone(pullop, bundle2=False):
59 def canperformstreamclone(pullop, bundle2=False):
60 """Whether it is possible to perform a streaming clone as part of pull.
60 """Whether it is possible to perform a streaming clone as part of pull.
61
61
62 ``bundle2`` will cause the function to consider stream clone through
62 ``bundle2`` will cause the function to consider stream clone through
63 bundle2 and only through bundle2.
63 bundle2 and only through bundle2.
64
64
65 Returns a tuple of (supported, requirements). ``supported`` is True if
65 Returns a tuple of (supported, requirements). ``supported`` is True if
66 streaming clone is supported and False otherwise. ``requirements`` is
66 streaming clone is supported and False otherwise. ``requirements`` is
67 a set of repo requirements from the remote, or ``None`` if stream clone
67 a set of repo requirements from the remote, or ``None`` if stream clone
68 isn't supported.
68 isn't supported.
69 """
69 """
70 repo = pullop.repo
70 repo = pullop.repo
71 remote = pullop.remote
71 remote = pullop.remote
72
72
73 bundle2supported = False
73 bundle2supported = False
74 if pullop.canusebundle2:
74 if pullop.canusebundle2:
75 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
75 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
76 bundle2supported = True
76 bundle2supported = True
77 # else
77 # else
78 # Server doesn't support bundle2 stream clone or doesn't support
78 # Server doesn't support bundle2 stream clone or doesn't support
79 # the versions we support. Fall back and possibly allow legacy.
79 # the versions we support. Fall back and possibly allow legacy.
80
80
81 # Ensures legacy code path uses available bundle2.
81 # Ensures legacy code path uses available bundle2.
82 if bundle2supported and not bundle2:
82 if bundle2supported and not bundle2:
83 return False, None
83 return False, None
84 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
84 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
85 elif bundle2 and not bundle2supported:
85 elif bundle2 and not bundle2supported:
86 return False, None
86 return False, None
87
87
88 # Streaming clone only works on empty repositories.
88 # Streaming clone only works on empty repositories.
89 if len(repo):
89 if len(repo):
90 return False, None
90 return False, None
91
91
92 # Streaming clone only works if all data is being requested.
92 # Streaming clone only works if all data is being requested.
93 if pullop.heads:
93 if pullop.heads:
94 return False, None
94 return False, None
95
95
96 streamrequested = pullop.streamclonerequested
96 streamrequested = pullop.streamclonerequested
97
97
98 # If we don't have a preference, let the server decide for us. This
98 # If we don't have a preference, let the server decide for us. This
99 # likely only comes into play in LANs.
99 # likely only comes into play in LANs.
100 if streamrequested is None:
100 if streamrequested is None:
101 # The server can advertise whether to prefer streaming clone.
101 # The server can advertise whether to prefer streaming clone.
102 streamrequested = remote.capable(b'stream-preferred')
102 streamrequested = remote.capable(b'stream-preferred')
103
103
104 if not streamrequested:
104 if not streamrequested:
105 return False, None
105 return False, None
106
106
107 # In order for stream clone to work, the client has to support all the
107 # In order for stream clone to work, the client has to support all the
108 # requirements advertised by the server.
108 # requirements advertised by the server.
109 #
109 #
110 # The server advertises its requirements via the "stream" and "streamreqs"
110 # The server advertises its requirements via the "stream" and "streamreqs"
111 # capability. "stream" (a value-less capability) is advertised if and only
111 # capability. "stream" (a value-less capability) is advertised if and only
112 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
112 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
113 # is advertised and contains a comma-delimited list of requirements.
113 # is advertised and contains a comma-delimited list of requirements.
114 requirements = set()
114 requirements = set()
115 if remote.capable(b'stream'):
115 if remote.capable(b'stream'):
116 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
116 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
117 else:
117 else:
118 streamreqs = remote.capable(b'streamreqs')
118 streamreqs = remote.capable(b'streamreqs')
119 # This is weird and shouldn't happen with modern servers.
119 # This is weird and shouldn't happen with modern servers.
120 if not streamreqs:
120 if not streamreqs:
121 pullop.repo.ui.warn(
121 pullop.repo.ui.warn(
122 _(
122 _(
123 b'warning: stream clone requested but server has them '
123 b'warning: stream clone requested but server has them '
124 b'disabled\n'
124 b'disabled\n'
125 )
125 )
126 )
126 )
127 return False, None
127 return False, None
128
128
129 streamreqs = set(streamreqs.split(b','))
129 streamreqs = set(streamreqs.split(b','))
130 # Server requires something we don't support. Bail.
130 # Server requires something we don't support. Bail.
131 missingreqs = streamreqs - repo.supported
131 missingreqs = streamreqs - repo.supported
132 if missingreqs:
132 if missingreqs:
133 pullop.repo.ui.warn(
133 pullop.repo.ui.warn(
134 _(
134 _(
135 b'warning: stream clone requested but client is missing '
135 b'warning: stream clone requested but client is missing '
136 b'requirements: %s\n'
136 b'requirements: %s\n'
137 )
137 )
138 % b', '.join(sorted(missingreqs))
138 % b', '.join(sorted(missingreqs))
139 )
139 )
140 pullop.repo.ui.warn(
140 pullop.repo.ui.warn(
141 _(
141 _(
142 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
142 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
143 b'for more information)\n'
143 b'for more information)\n'
144 )
144 )
145 )
145 )
146 return False, None
146 return False, None
147 requirements = streamreqs
147 requirements = streamreqs
148
148
149 return True, requirements
149 return True, requirements
150
150
151
151
152 def maybeperformlegacystreamclone(pullop):
152 def maybeperformlegacystreamclone(pullop):
153 """Possibly perform a legacy stream clone operation.
153 """Possibly perform a legacy stream clone operation.
154
154
155 Legacy stream clones are performed as part of pull but before all other
155 Legacy stream clones are performed as part of pull but before all other
156 operations.
156 operations.
157
157
158 A legacy stream clone will not be performed if a bundle2 stream clone is
158 A legacy stream clone will not be performed if a bundle2 stream clone is
159 supported.
159 supported.
160 """
160 """
161 from . import localrepo
161 from . import localrepo
162
162
163 supported, requirements = canperformstreamclone(pullop)
163 supported, requirements = canperformstreamclone(pullop)
164
164
165 if not supported:
165 if not supported:
166 return
166 return
167
167
168 repo = pullop.repo
168 repo = pullop.repo
169 remote = pullop.remote
169 remote = pullop.remote
170
170
171 # Save remote branchmap. We will use it later to speed up branchcache
171 # Save remote branchmap. We will use it later to speed up branchcache
172 # creation.
172 # creation.
173 rbranchmap = None
173 rbranchmap = None
174 if remote.capable(b'branchmap'):
174 if remote.capable(b'branchmap'):
175 with remote.commandexecutor() as e:
175 with remote.commandexecutor() as e:
176 rbranchmap = e.callcommand(b'branchmap', {}).result()
176 rbranchmap = e.callcommand(b'branchmap', {}).result()
177
177
178 repo.ui.status(_(b'streaming all changes\n'))
178 repo.ui.status(_(b'streaming all changes\n'))
179
179
180 with remote.commandexecutor() as e:
180 with remote.commandexecutor() as e:
181 fp = e.callcommand(b'stream_out', {}).result()
181 fp = e.callcommand(b'stream_out', {}).result()
182
182
183 # TODO strictly speaking, this code should all be inside the context
183 # TODO strictly speaking, this code should all be inside the context
184 # manager because the context manager is supposed to ensure all wire state
184 # manager because the context manager is supposed to ensure all wire state
185 # is flushed when exiting. But the legacy peers don't do this, so it
185 # is flushed when exiting. But the legacy peers don't do this, so it
186 # doesn't matter.
186 # doesn't matter.
187 l = fp.readline()
187 l = fp.readline()
188 try:
188 try:
189 resp = int(l)
189 resp = int(l)
190 except ValueError:
190 except ValueError:
191 raise error.ResponseError(
191 raise error.ResponseError(
192 _(b'unexpected response from remote server:'), l
192 _(b'unexpected response from remote server:'), l
193 )
193 )
194 if resp == 1:
194 if resp == 1:
195 raise error.Abort(_(b'operation forbidden by server'))
195 raise error.Abort(_(b'operation forbidden by server'))
196 elif resp == 2:
196 elif resp == 2:
197 raise error.Abort(_(b'locking the remote repository failed'))
197 raise error.Abort(_(b'locking the remote repository failed'))
198 elif resp != 0:
198 elif resp != 0:
199 raise error.Abort(_(b'the server sent an unknown error code'))
199 raise error.Abort(_(b'the server sent an unknown error code'))
200
200
201 l = fp.readline()
201 l = fp.readline()
202 try:
202 try:
203 filecount, bytecount = map(int, l.split(b' ', 1))
203 filecount, bytecount = map(int, l.split(b' ', 1))
204 except (ValueError, TypeError):
204 except (ValueError, TypeError):
205 raise error.ResponseError(
205 raise error.ResponseError(
206 _(b'unexpected response from remote server:'), l
206 _(b'unexpected response from remote server:'), l
207 )
207 )
208
208
209 with repo.lock():
209 with repo.lock():
210 consumev1(repo, fp, filecount, bytecount)
210 consumev1(repo, fp, filecount, bytecount)
211 repo.requirements = new_stream_clone_requirements(
211 repo.requirements = new_stream_clone_requirements(
212 repo.supportedformats,
213 repo.requirements,
212 repo.requirements,
214 requirements,
213 requirements,
215 )
214 )
216 repo.svfs.options = localrepo.resolvestorevfsoptions(
215 repo.svfs.options = localrepo.resolvestorevfsoptions(
217 repo.ui, repo.requirements, repo.features
216 repo.ui, repo.requirements, repo.features
218 )
217 )
219 scmutil.writereporequirements(repo)
218 scmutil.writereporequirements(repo)
220
219
221 if rbranchmap:
220 if rbranchmap:
222 repo._branchcaches.replace(repo, rbranchmap)
221 repo._branchcaches.replace(repo, rbranchmap)
223
222
224 repo.invalidate()
223 repo.invalidate()
225
224
226
225
227 def allowservergeneration(repo):
226 def allowservergeneration(repo):
228 """Whether streaming clones are allowed from the server."""
227 """Whether streaming clones are allowed from the server."""
229 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
228 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
230 return False
229 return False
231
230
232 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
231 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
233 return False
232 return False
234
233
235 # The way stream clone works makes it impossible to hide secret changesets.
234 # The way stream clone works makes it impossible to hide secret changesets.
236 # So don't allow this by default.
235 # So don't allow this by default.
237 secret = phases.hassecret(repo)
236 secret = phases.hassecret(repo)
238 if secret:
237 if secret:
239 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
238 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
240
239
241 return True
240 return True
242
241
243
242
244 # This is it's own function so extensions can override it.
243 # This is it's own function so extensions can override it.
245 def _walkstreamfiles(repo, matcher=None):
244 def _walkstreamfiles(repo, matcher=None):
246 return repo.store.walk(matcher)
245 return repo.store.walk(matcher)
247
246
248
247
249 def generatev1(repo):
248 def generatev1(repo):
250 """Emit content for version 1 of a streaming clone.
249 """Emit content for version 1 of a streaming clone.
251
250
252 This returns a 3-tuple of (file count, byte size, data iterator).
251 This returns a 3-tuple of (file count, byte size, data iterator).
253
252
254 The data iterator consists of N entries for each file being transferred.
253 The data iterator consists of N entries for each file being transferred.
255 Each file entry starts as a line with the file name and integer size
254 Each file entry starts as a line with the file name and integer size
256 delimited by a null byte.
255 delimited by a null byte.
257
256
258 The raw file data follows. Following the raw file data is the next file
257 The raw file data follows. Following the raw file data is the next file
259 entry, or EOF.
258 entry, or EOF.
260
259
261 When used on the wire protocol, an additional line indicating protocol
260 When used on the wire protocol, an additional line indicating protocol
262 success will be prepended to the stream. This function is not responsible
261 success will be prepended to the stream. This function is not responsible
263 for adding it.
262 for adding it.
264
263
265 This function will obtain a repository lock to ensure a consistent view of
264 This function will obtain a repository lock to ensure a consistent view of
266 the store is captured. It therefore may raise LockError.
265 the store is captured. It therefore may raise LockError.
267 """
266 """
268 entries = []
267 entries = []
269 total_bytes = 0
268 total_bytes = 0
270 # Get consistent snapshot of repo, lock during scan.
269 # Get consistent snapshot of repo, lock during scan.
271 with repo.lock():
270 with repo.lock():
272 repo.ui.debug(b'scanning\n')
271 repo.ui.debug(b'scanning\n')
273 for file_type, name, size in _walkstreamfiles(repo):
272 for file_type, name, size in _walkstreamfiles(repo):
274 if size:
273 if size:
275 entries.append((name, size))
274 entries.append((name, size))
276 total_bytes += size
275 total_bytes += size
277 _test_sync_point_walk_1(repo)
276 _test_sync_point_walk_1(repo)
278 _test_sync_point_walk_2(repo)
277 _test_sync_point_walk_2(repo)
279
278
280 repo.ui.debug(
279 repo.ui.debug(
281 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
280 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
282 )
281 )
283
282
284 svfs = repo.svfs
283 svfs = repo.svfs
285 debugflag = repo.ui.debugflag
284 debugflag = repo.ui.debugflag
286
285
287 def emitrevlogdata():
286 def emitrevlogdata():
288 for name, size in entries:
287 for name, size in entries:
289 if debugflag:
288 if debugflag:
290 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
289 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
291 # partially encode name over the wire for backwards compat
290 # partially encode name over the wire for backwards compat
292 yield b'%s\0%d\n' % (store.encodedir(name), size)
291 yield b'%s\0%d\n' % (store.encodedir(name), size)
293 # auditing at this stage is both pointless (paths are already
292 # auditing at this stage is both pointless (paths are already
294 # trusted by the local repo) and expensive
293 # trusted by the local repo) and expensive
295 with svfs(name, b'rb', auditpath=False) as fp:
294 with svfs(name, b'rb', auditpath=False) as fp:
296 if size <= 65536:
295 if size <= 65536:
297 yield fp.read(size)
296 yield fp.read(size)
298 else:
297 else:
299 for chunk in util.filechunkiter(fp, limit=size):
298 for chunk in util.filechunkiter(fp, limit=size):
300 yield chunk
299 yield chunk
301
300
302 return len(entries), total_bytes, emitrevlogdata()
301 return len(entries), total_bytes, emitrevlogdata()
303
302
304
303
305 def generatev1wireproto(repo):
304 def generatev1wireproto(repo):
306 """Emit content for version 1 of streaming clone suitable for the wire.
305 """Emit content for version 1 of streaming clone suitable for the wire.
307
306
308 This is the data output from ``generatev1()`` with 2 header lines. The
307 This is the data output from ``generatev1()`` with 2 header lines. The
309 first line indicates overall success. The 2nd contains the file count and
308 first line indicates overall success. The 2nd contains the file count and
310 byte size of payload.
309 byte size of payload.
311
310
312 The success line contains "0" for success, "1" for stream generation not
311 The success line contains "0" for success, "1" for stream generation not
313 allowed, and "2" for error locking the repository (possibly indicating
312 allowed, and "2" for error locking the repository (possibly indicating
314 a permissions error for the server process).
313 a permissions error for the server process).
315 """
314 """
316 if not allowservergeneration(repo):
315 if not allowservergeneration(repo):
317 yield b'1\n'
316 yield b'1\n'
318 return
317 return
319
318
320 try:
319 try:
321 filecount, bytecount, it = generatev1(repo)
320 filecount, bytecount, it = generatev1(repo)
322 except error.LockError:
321 except error.LockError:
323 yield b'2\n'
322 yield b'2\n'
324 return
323 return
325
324
326 # Indicates successful response.
325 # Indicates successful response.
327 yield b'0\n'
326 yield b'0\n'
328 yield b'%d %d\n' % (filecount, bytecount)
327 yield b'%d %d\n' % (filecount, bytecount)
329 for chunk in it:
328 for chunk in it:
330 yield chunk
329 yield chunk
331
330
332
331
def generatebundlev1(repo, compression=b'UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes are a 16-bit big endian unsigned short declaring the
    length of the requirements string, including a trailing \0. The following
    N bytes are the requirements string, which is ASCII containing a
    comma-delimited list of repo requirements that are needed to support the
    data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != b'UN':
        raise ValueError(b'we do not support the compression argument yet')

    requirements = streamed_requirements(repo)
    requires = b','.join(sorted(requirements))

    def gen():
        yield b'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(
            _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
        )

        yield struct.pack(b'>QQ', filecount, bytecount)
        yield struct.pack(b'>H', len(requires) + 1)
        yield requires + b'\0'

        # This is where we'll add compression in the future.
        assert compression == b'UN'

        progress = repo.ui.makeprogress(
            _(b'bundle'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)

        for chunk in it:
            progress.increment(step=len(chunk))
            yield chunk

        progress.complete()

    return requirements, gen()


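# A minimal sketch (not part of the module) of the "HGS1" header layout
# described above, assembled with plain ``struct`` calls; all values are
# hypothetical.
#
#     import struct
#     requires = b'generaldelta,revlogv1,store'
#     header = b'HGS1'                                 # magic
#     header += b'UN'                                  # compression
#     header += struct.pack(b'>QQ', 23, 181842)        # file count, byte count
#     header += struct.pack(b'>H', len(requires) + 1)  # length incl. trailing NUL
#     header += requires + b'\0'

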
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(bytecount))
        )
        progress = repo.ui.makeprogress(
            _(b'clone'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If a transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between the
        # in-memory cached property and the streamclone-ed file on
        # disk. A nested transaction prevents the transaction scope
        # "clone" below from writing in-memory changes out at the end
        # of it, even though in-memory changes are discarded at the end
        # of it regardless of transaction nesting.
        #
        # But transaction nesting can't simply be prohibited, because
        # nesting also occurs in ordinary cases (e.g. enabling
        # clonebundles).

        with repo.transaction(b'clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in pycompat.xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split(b'\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _(b'unexpected response from remote server:'), l
                        )
                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding %s (%s)\n' % (name, util.bytecount(size))
                        )
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, b'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        progress.complete()
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(bytecount),
                elapsed,
                util.bytecount(bytecount / elapsed),
            )
        )


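# Illustration (not part of the module): consumev1() expects each file to be
# framed as a "name\0size" line followed by exactly `size` bytes of data,
# e.g. for a hypothetical revlog:
#
#     b'data/foo.i\x001024\n' + <1024 bytes of file data>

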
def readbundle1header(fp):
    compression = fp.read(2)
    if compression != b'UN':
        raise error.Abort(
            _(
                b'only uncompressed stream clone bundles are '
                b'supported; got %s'
            )
            % compression
        )

    filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
    requireslen = struct.unpack(b'>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith(b'\0'):
        raise error.Abort(
            _(
                b'malformed stream clone bundle: '
                b'requirements not properly encoded'
            )
        )

    requirements = set(requires.rstrip(b'\0').split(b','))

    return filecount, bytecount, requirements


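# Illustration (not part of the module): a caller that has already consumed
# and validated the 4 byte magic can parse the rest of the header like this
# (the file name is hypothetical):
#
#     with open('clone-bundle.hg', 'rb') as fp:
#         assert fp.read(4) == b'HGS1'
#         filecount, bytecount, requirements = readbundle1header(fp)

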
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(
            _(b'cannot apply stream clone bundle on non-empty repo')
        )

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supported
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev1(repo, fp, filecount, bytecount)


class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """

    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)


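# Illustration (not part of the module): a bundle reader that recognised the
# "HGS1" magic would hand the remaining file object over like this:
#
#     applier = streamcloneapplier(fh)  # fh positioned after the magic
#     applier.apply(repo)               # runs applybundlev1()

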
# type of file to stream
_fileappend = 0  # append only file
_filefull = 1  # full snapshot file

# Source of the file
_srcstore = b's'  # store (svfs)
_srccache = b'c'  # cache (cache)

# This is its own function so extensions can override it.
def _walkstreamfullstorefiles(repo):
    """list snapshot files from the store"""
    fnames = []
    if not repo.publishing():
        fnames.append(b'phaseroots')
    return fnames


def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files"""
    src, name, ftype, data = entry
    if ftype != _filefull:
        return entry
    return (src, name, ftype, copy(vfsmap[src].join(name)))


@contextlib.contextmanager
def maketempcopies():
    """return a function to temporarily copy a file"""
    files = []
    try:

        def copy(src):
            fd, dst = pycompat.mkstemp()
            os.close(fd)
            files.append(dst)
            util.copyfiles(src, dst, hardlink=True)
            return dst

        yield copy
    finally:
        for tmp in files:
            util.tryunlink(tmp)


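# Illustration (not part of the module): the yielded ``copy`` callable
# hardlink-copies a file to a temporary path that lives only for the duration
# of the ``with`` block (the source path is hypothetical):
#
#     with maketempcopies() as copy:
#         tmp_path = copy(b'/repo/.hg/store/phaseroots')
#         ...  # read tmp_path while the original may change
#     # all temporary copies are unlinked here

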
def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # we keep repo.vfs out of the map on purpose: there are too many dangers
    # there (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()

    return vfsmap


def _emit2(repo, entries, totalfilesize):
    """actually emit the stream bundle"""
    vfsmap = _makemap(repo)
    # we keep repo.vfs out of the map on purpose: there are too many dangers
    # there (eg: .hg/hgrc),
    #
    # this assert is duplicated (from _makemap) as an author might think this
    # is fine, while it really is not.
    if repo.vfs in vfsmap.values():
        raise error.ProgrammingError(
            b'repo.vfs must not be added to vfsmap for security reasons'
        )

    progress = repo.ui.makeprogress(
        _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
    )
    progress.update(0)
    with maketempcopies() as copy, progress:
        # copying is delayed until we are inside the try
        entries = [_filterfull(e, copy, vfsmap) for e in entries]
        yield None  # this releases the lock on the repository
        totalbytecount = 0

        for src, name, ftype, data in entries:
            vfs = vfsmap[src]
            yield src
            yield util.uvarintencode(len(name))
            if ftype == _fileappend:
                fp = vfs(name)
                size = data
            elif ftype == _filefull:
                fp = open(data, b'rb')
                size = util.fstat(fp).st_size
            bytecount = 0
            try:
                yield util.uvarintencode(size)
                yield name
                if size <= 65536:
                    chunks = (fp.read(size),)
                else:
                    chunks = util.filechunkiter(fp, limit=size)
                for chunk in chunks:
                    bytecount += len(chunk)
                    totalbytecount += len(chunk)
                    progress.update(totalbytecount)
                    yield chunk
                if bytecount != size:
                    # Would most likely be caused by a race due to `hg strip`
                    # or a revlog split
                    raise error.Abort(
                        _(
                            b'clone could only read %d bytes from %s, but '
                            b'expected %d bytes'
                        )
                        % (bytecount, name, size)
                    )
            finally:
                fp.close()


def _test_sync_point_walk_1(repo):
    """a function for synchronisation during tests"""


def _test_sync_point_walk_2(repo):
    """a function for synchronisation during tests"""


def _v2_walk(repo, includes, excludes, includeobsmarkers):
    """emit a series of file information useful to clone a repo

    return (entries, totalfilesize)

    entries is a list of tuples (vfs-key, file-path, file-type, size)

    - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
    - `name`: file path of the file to copy (to be fed to the vfs)
    - `file-type`: does this file need to be copied while holding the source
      lock?
    - `size`: the size of the file (or None)
    """
    assert repo._currentlock(repo._lockref) is not None
    entries = []
    totalfilesize = 0

    matcher = None
    if includes or excludes:
        matcher = narrowspec.match(repo.root, includes, excludes)

    for rl_type, name, size in _walkstreamfiles(repo, matcher):
        if size:
            ft = _fileappend
            if rl_type & store.FILEFLAGS_VOLATILE:
                ft = _filefull
            entries.append((_srcstore, name, ft, size))
            totalfilesize += size
    for name in _walkstreamfullstorefiles(repo):
        if repo.svfs.exists(name):
            totalfilesize += repo.svfs.lstat(name).st_size
            entries.append((_srcstore, name, _filefull, None))
    if includeobsmarkers and repo.svfs.exists(b'obsstore'):
        totalfilesize += repo.svfs.lstat(b'obsstore').st_size
        entries.append((_srcstore, b'obsstore', _filefull, None))
    for name in cacheutil.cachetocopy(repo):
        if repo.cachevfs.exists(name):
            totalfilesize += repo.cachevfs.lstat(name).st_size
            entries.append((_srccache, name, _filefull, None))
    return entries, totalfilesize


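# Illustration (not part of the module): a hypothetical (entries,
# totalfilesize) result for a tiny repository:
#
#     entries = [
#         (b's', b'data/foo.i', _fileappend, 1024),    # append-only revlog
#         (b's', b'phaseroots', _filefull, None),      # volatile, snapshotted
#         (b'c', b'branch2-served', _filefull, None),  # cache file
#     ]
#     totalfilesize = 1024 + <phaseroots size> + <cache file size>

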
def generatev2(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 2 of a streaming clone.

    the data stream consists of the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        repo.ui.debug(b'scanning\n')

        entries, totalfilesize = _v2_walk(
            repo,
            includes=includes,
            excludes=excludes,
            includeobsmarkers=includeobsmarkers,
        )

        chunks = _emit2(repo, entries, totalfilesize)
        first = next(chunks)
        assert first is None
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    return len(entries), totalfilesize, chunks


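# Illustration (not part of the module): one entry of the v2 stream for a
# hypothetical store file, following the five fields listed in the
# generatev2() docstring:
#
#     name = b'data/foo.i'
#     data = b'...'                            # raw file content
#     entry = b's'                             # 1) destination char: store
#     entry += util.uvarintencode(len(name))   # 2) filename length
#     entry += util.uvarintencode(len(data))   # 3) data length
#     entry += name                            # 4) filename
#     entry += data                            # 5) file data

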
@contextlib.contextmanager
def nested(*ctxs):
    this = ctxs[0]
    rest = ctxs[1:]
    with this:
        if rest:
            with nested(*rest):
                yield
        else:
            yield


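# Illustration (not part of the module): ``nested`` enters a variable number
# of context managers recursively, which is what consumev2() needs since the
# number of vfs objects is only known at runtime:
#
#     ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
#     with nested(*ctxs):
#         ...  # every vfs is in background-closing mode here

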
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(filesize))
        )

        start = util.timer()
        progress = repo.ui.makeprogress(
            _(b'clone'), total=filesize, unit=_(b'bytes')
        )
        progress.update(0)

        vfsmap = _makemap(repo)
        # we keep repo.vfs out of the map on purpose: there are too many
        # dangers there (eg: .hg/hgrc),
        #
        # this assert is duplicated (from _makemap) as an author might think
        # this is fine, while it really is not.
        if repo.vfs in vfsmap.values():
            raise error.ProgrammingError(
                b'repo.vfs must not be added to vfsmap for security reasons'
            )

        with repo.transaction(b'clone'):
            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding [%s] %s (%s)\n'
                            % (src, name, util.bytecount(datalen))
                        )

                    with vfs(name, b'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(progress.pos),
                elapsed,
                util.bytecount(progress.pos / elapsed),
            )
        )
        progress.complete()


def applybundlev2(repo, fp, filecount, filesize, requirements):
    from . import localrepo

    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev2(repo, fp, filecount, filesize)

    repo.requirements = new_stream_clone_requirements(
        repo.requirements,
        requirements,
    )
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features
    )
    scmutil.writereporequirements(repo)


def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
    hardlink = [True]

    def copy_used():
        hardlink[0] = False
        progress.topic = _(b'copying')

    for k, path, size in entries:
        src_vfs = src_vfs_map[k]
        dst_vfs = dst_vfs_map[k]
        src_path = src_vfs.join(path)
        dst_path = dst_vfs.join(path)
        # We cannot use dirname and makedirs of dst_vfs here because the store
        # encoding confuses them. See issue 6581 for details.
        dirname = os.path.dirname(dst_path)
        if not os.path.exists(dirname):
            util.makedirs(dirname)
        dst_vfs.register_file(path)
        # XXX we could use the #nb_bytes argument.
        util.copyfile(
            src_path,
            dst_path,
            hardlink=hardlink[0],
            no_hardlink_cb=copy_used,
            check_fs_hardlink=False,
        )
        progress.increment()
    return hardlink[0]


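# Illustration (not part of the module): the one-element list above acts as a
# mutable flag shared with the ``copy_used`` callback. Once hard-linking
# fails for a file, ``util.copyfile`` invokes the ``no_hardlink_cb``
# callback, the flag flips to False, and the remaining files are plainly
# copied. A minimal sketch of the same pattern, with hypothetical paths:
#
#     flag = [True]
#     def fallback():
#         flag[0] = False  # demote from linking to copying
#     util.copyfile(src, dst, hardlink=flag[0], no_hardlink_cb=fallback)

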
def local_copy(src_repo, dest_repo):
    """copy all content from one local repository to another

    This is useful for local clones"""
    src_store_requirements = {
        r
        for r in src_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    dest_store_requirements = {
        r
        for r in dest_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    assert src_store_requirements == dest_store_requirements

    with dest_repo.lock():
        with src_repo.lock():

            # bookmarks are not integrated into the streaming as they might
            # use `repo.vfs`, and there is too much sensitive data accessible
            # through `repo.vfs` to expose it to a streaming clone.
            src_book_vfs = bookmarks.bookmarksvfs(src_repo)
            srcbookmarks = src_book_vfs.join(b'bookmarks')
            bm_count = 0
            if os.path.exists(srcbookmarks):
                bm_count = 1

            entries, totalfilesize = _v2_walk(
                src_repo,
                includes=None,
                excludes=None,
                includeobsmarkers=True,
            )
            src_vfs_map = _makemap(src_repo)
            dest_vfs_map = _makemap(dest_repo)
            progress = src_repo.ui.makeprogress(
                topic=_(b'linking'),
                total=len(entries) + bm_count,
                unit=_(b'files'),
            )
            # copy files
            #
            # We could copy the full files while the source repository is
            # locked and the other ones without the lock. However, in the
            # linking case, this would also require checks that nobody is
            # appending any data to the files while we do the clone, so this
            # is not done yet. We could do this blindly when copying files.
            files = ((k, path, size) for k, path, ftype, size in entries)
            hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)

            # copy bookmarks over
            if bm_count:
                dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
                dstbookmarks = dst_book_vfs.join(b'bookmarks')
                util.copyfile(srcbookmarks, dstbookmarks)
        progress.complete()
        if hardlink:
            msg = b'linked %d files\n'
        else:
            msg = b'copied %d files\n'
        src_repo.ui.debug(msg % (len(entries) + bm_count))

        with dest_repo.transaction(b"localclone") as tr:
            dest_repo.store.write(tr)

        # clean up transaction files as they do not make sense here
        undo_files = [(dest_repo.svfs, b'undo.backupfiles')]
        undo_files.extend(dest_repo.undofiles())
        for undovfs, undofile in undo_files:
            try:
                undovfs.unlink(undofile)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    msg = _(b'error removing %s: %s\n')
                    path = undovfs.join(undofile)
                    e_msg = stringutil.forcebytestr(e)
                    msg %= (path, e_msg)
                    dest_repo.ui.warn(msg)