stream-clone: add an explicit set listing requirements relevant to stream clone...
marmoute -
r49447:baddab22 default
@@ -1,427 +1,427 @@ mercurial/bundlecaches.py
1 1 # bundlecaches.py - utility to deal with pre-computed bundle for servers
2 2 #
3 3 # This software may be used and distributed according to the terms of the
4 4 # GNU General Public License version 2 or any later version.
5 5
6 6 from .i18n import _
7 7
8 8 from .thirdparty import attr
9 9
10 10 from . import (
11 11 error,
12 12 requirements as requirementsmod,
13 13 sslutil,
14 14 util,
15 15 )
16 16 from .utils import stringutil
17 17
18 18 urlreq = util.urlreq
19 19
20 20 CB_MANIFEST_FILE = b'clonebundles.manifest'
21 21
22 22
23 23 @attr.s
24 24 class bundlespec(object):
25 25 compression = attr.ib()
26 26 wirecompression = attr.ib()
27 27 version = attr.ib()
28 28 wireversion = attr.ib()
29 29 params = attr.ib()
30 30 contentopts = attr.ib()
31 31
32 32
33 33 # Maps bundle version human names to changegroup versions.
34 34 _bundlespeccgversions = {
35 35 b'v1': b'01',
36 36 b'v2': b'02',
37 37 b'packed1': b's1',
38 38 b'bundle2': b'02', # legacy
39 39 }
40 40
41 41 # Maps bundle version with content opts to choose which part to bundle
42 42 _bundlespeccontentopts = {
43 43 b'v1': {
44 44 b'changegroup': True,
45 45 b'cg.version': b'01',
46 46 b'obsolescence': False,
47 47 b'phases': False,
48 48 b'tagsfnodescache': False,
49 49 b'revbranchcache': False,
50 50 },
51 51 b'v2': {
52 52 b'changegroup': True,
53 53 b'cg.version': b'02',
54 54 b'obsolescence': False,
55 55 b'phases': False,
56 56 b'tagsfnodescache': True,
57 57 b'revbranchcache': True,
58 58 },
59 59 b'packed1': {b'cg.version': b's1'},
60 60 }
61 61 _bundlespeccontentopts[b'bundle2'] = _bundlespeccontentopts[b'v2']
62 62
63 63 _bundlespecvariants = {
64 64 b"streamv2": {
65 65 b"changegroup": False,
66 66 b"streamv2": True,
67 67 b"tagsfnodescache": False,
68 68 b"revbranchcache": False,
69 69 }
70 70 }
71 71
72 72 # Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
73 73 _bundlespecv1compengines = {b'gzip', b'bzip2', b'none'}
74 74
75 75
76 76 def parsebundlespec(repo, spec, strict=True):
77 77 """Parse a bundle string specification into parts.
78 78
79 79 Bundle specifications denote a well-defined bundle/exchange format.
80 80 The content of a given specification should not change over time in
81 81 order to ensure that bundles produced by a newer version of Mercurial are
82 82 readable from an older version.
83 83
84 84 The string currently has the form:
85 85
86 86 <compression>-<type>[;<parameter0>[;<parameter1>]]
87 87
88 88 Where <compression> is one of the supported compression formats
89 89 and <type> is (currently) a version string. A ";" can follow the type and
90 90 all text afterwards is interpreted as URI encoded, ";" delimited key=value
91 91 pairs.
92 92
93 93 If ``strict`` is True (the default) <compression> is required. Otherwise,
94 94 it is optional.
95 95
96 96 Returns a bundlespec object of (compression, version, parameters).
97 97 Compression will be ``None`` if not in strict mode and a compression isn't
98 98 defined.
99 99
100 100 An ``InvalidBundleSpecification`` is raised when the specification is
101 101 not syntactically well formed.
102 102
103 103 An ``UnsupportedBundleSpecification`` is raised when the compression or
104 104 bundle type/version is not recognized.
105 105
106 106 Note: this function will likely eventually return a more complex data
107 107 structure, including bundle2 part information.
108 108 """
109 109
110 110 def parseparams(s):
111 111 if b';' not in s:
112 112 return s, {}
113 113
114 114 params = {}
115 115 version, paramstr = s.split(b';', 1)
116 116
117 117 for p in paramstr.split(b';'):
118 118 if b'=' not in p:
119 119 raise error.InvalidBundleSpecification(
120 120 _(
121 121 b'invalid bundle specification: '
122 122 b'missing "=" in parameter: %s'
123 123 )
124 124 % p
125 125 )
126 126
127 127 key, value = p.split(b'=', 1)
128 128 key = urlreq.unquote(key)
129 129 value = urlreq.unquote(value)
130 130 params[key] = value
131 131
132 132 return version, params
133 133
134 134 if strict and b'-' not in spec:
135 135 raise error.InvalidBundleSpecification(
136 136 _(
137 137 b'invalid bundle specification; '
138 138 b'must be prefixed with compression: %s'
139 139 )
140 140 % spec
141 141 )
142 142
143 143 if b'-' in spec:
144 144 compression, version = spec.split(b'-', 1)
145 145
146 146 if compression not in util.compengines.supportedbundlenames:
147 147 raise error.UnsupportedBundleSpecification(
148 148 _(b'%s compression is not supported') % compression
149 149 )
150 150
151 151 version, params = parseparams(version)
152 152
153 153 if version not in _bundlespeccgversions:
154 154 raise error.UnsupportedBundleSpecification(
155 155 _(b'%s is not a recognized bundle version') % version
156 156 )
157 157 else:
158 158 # Value could be just the compression or just the version, in which
159 159 # case some defaults are assumed (but only when not in strict mode).
160 160 assert not strict
161 161
162 162 spec, params = parseparams(spec)
163 163
164 164 if spec in util.compengines.supportedbundlenames:
165 165 compression = spec
166 166 version = b'v1'
167 167 # Generaldelta repos require v2.
168 168 if requirementsmod.GENERALDELTA_REQUIREMENT in repo.requirements:
169 169 version = b'v2'
170 170 elif requirementsmod.REVLOGV2_REQUIREMENT in repo.requirements:
171 171 version = b'v2'
172 172 # Modern compression engines require v2.
173 173 if compression not in _bundlespecv1compengines:
174 174 version = b'v2'
175 175 elif spec in _bundlespeccgversions:
176 176 if spec == b'packed1':
177 177 compression = b'none'
178 178 else:
179 179 compression = b'bzip2'
180 180 version = spec
181 181 else:
182 182 raise error.UnsupportedBundleSpecification(
183 183 _(b'%s is not a recognized bundle specification') % spec
184 184 )
185 185
186 186 # Bundle version 1 only supports a known set of compression engines.
187 187 if version == b'v1' and compression not in _bundlespecv1compengines:
188 188 raise error.UnsupportedBundleSpecification(
189 189 _(b'compression engine %s is not supported on v1 bundles')
190 190 % compression
191 191 )
192 192
193 193 # The specification for packed1 can optionally declare the data formats
194 194 # required to apply it. If we see this metadata, compare against what the
195 195 # repo supports and error if the bundle isn't compatible.
196 196 if version == b'packed1' and b'requirements' in params:
197 197 requirements = set(params[b'requirements'].split(b','))
198 missingreqs = requirements - repo.supportedformats
198 missingreqs = requirements - requirementsmod.STREAM_FIXED_REQUIREMENTS
199 199 if missingreqs:
200 200 raise error.UnsupportedBundleSpecification(
201 201 _(b'missing support for repository features: %s')
202 202 % b', '.join(sorted(missingreqs))
203 203 )
204 204
205 205 # Compute contentopts based on the version
206 206 contentopts = _bundlespeccontentopts.get(version, {}).copy()
207 207
208 208 # Process the variants
209 209 if b"stream" in params and params[b"stream"] == b"v2":
210 210 variant = _bundlespecvariants[b"streamv2"]
211 211 contentopts.update(variant)
212 212
213 213 engine = util.compengines.forbundlename(compression)
214 214 compression, wirecompression = engine.bundletype()
215 215 wireversion = _bundlespeccgversions[version]
216 216
217 217 return bundlespec(
218 218 compression, wirecompression, version, wireversion, params, contentopts
219 219 )
220 220
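As a quick illustration of the spec grammar documented in ``parsebundlespec()`` above, here is a self-contained sketch of how a spec string decomposes. This is plain Python for illustration only; the helper name is made up and it skips the strict/default handling of the real function:

    from urllib.parse import unquote

    def split_bundlespec(spec):
        # e.g. 'zstd-v2;obsolescence=true' -> ('zstd', 'v2', {...})
        compression, rest = spec.split('-', 1)
        version, params = rest, {}
        if ';' in rest:
            version, paramstr = rest.split(';', 1)
            params = {
                unquote(k): unquote(v)
                for k, v in (p.split('=', 1) for p in paramstr.split(';'))
            }
        return compression, version, params

    assert split_bundlespec('zstd-v2;obsolescence=true') == (
        'zstd', 'v2', {'obsolescence': 'true'}
    )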
221 221
222 222 def parseclonebundlesmanifest(repo, s):
223 223 """Parses the raw text of a clone bundles manifest.
224 224
225 225 Returns a list of dicts. The dicts have a ``URL`` key corresponding
226 226 to the URL and other keys are the attributes for the entry.
227 227 """
228 228 m = []
229 229 for line in s.splitlines():
230 230 fields = line.split()
231 231 if not fields:
232 232 continue
233 233 attrs = {b'URL': fields[0]}
234 234 for rawattr in fields[1:]:
235 235 key, value = rawattr.split(b'=', 1)
236 236 key = util.urlreq.unquote(key)
237 237 value = util.urlreq.unquote(value)
238 238 attrs[key] = value
239 239
240 240 # Parse BUNDLESPEC into components. This makes client-side
241 241 # preferences easier to specify since you can prefer a single
242 242 # component of the BUNDLESPEC.
243 243 if key == b'BUNDLESPEC':
244 244 try:
245 245 bundlespec = parsebundlespec(repo, value)
246 246 attrs[b'COMPRESSION'] = bundlespec.compression
247 247 attrs[b'VERSION'] = bundlespec.version
248 248 except error.InvalidBundleSpecification:
249 249 pass
250 250 except error.UnsupportedBundleSpecification:
251 251 pass
252 252
253 253 m.append(attrs)
254 254
255 255 return m
256 256
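A worked example of the manifest format parsed above, reduced to the field splitting (the URL is hypothetical):

    line = b'https://example.com/full.hg BUNDLESPEC=gzip-v2 REQUIRESNI=true'
    fields = line.split()
    attrs = {b'URL': fields[0]}
    for rawattr in fields[1:]:
        key, value = rawattr.split(b'=', 1)
        attrs[key] = value
    assert attrs[b'BUNDLESPEC'] == b'gzip-v2'

With a well-formed BUNDLESPEC, ``parseclonebundlesmanifest()`` additionally derives the COMPRESSION and VERSION attributes (here b'gzip' and b'v2').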
257 257
258 258 def isstreamclonespec(bundlespec):
259 259 # Stream clone v1
260 260 if bundlespec.wirecompression == b'UN' and bundlespec.wireversion == b's1':
261 261 return True
262 262
263 263 # Stream clone v2
264 264 if (
265 265 bundlespec.wirecompression == b'UN'
266 266 and bundlespec.wireversion == b'02'
267 267 and bundlespec.contentopts.get(b'streamv2')
268 268 ):
269 269 return True
270 270
271 271 return False
272 272
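For reference, the two bundlespec shapes that the check above recognizes, as produced by ``parsebundlespec()``:

    # stream clone v1: b'none-packed1'      -> wirecompression b'UN', wireversion b's1'
    # stream clone v2: b'none-v2;stream=v2' -> wirecompression b'UN', wireversion b'02',
    #                                          contentopts[b'streamv2'] is True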
273 273
274 274 def filterclonebundleentries(repo, entries, streamclonerequested=False):
275 275 """Remove incompatible clone bundle manifest entries.
276 276
277 277 Accepts a list of entries parsed with ``parseclonebundlesmanifest``
278 278 and returns a new list consisting of only the entries that this client
279 279 should be able to apply.
280 280
281 281 There is no guarantee we'll be able to apply all returned entries because
282 282 the metadata we use to filter on may be missing or wrong.
283 283 """
284 284 newentries = []
285 285 for entry in entries:
286 286 spec = entry.get(b'BUNDLESPEC')
287 287 if spec:
288 288 try:
289 289 bundlespec = parsebundlespec(repo, spec, strict=True)
290 290
291 291 # If a stream clone was requested, filter out non-streamclone
292 292 # entries.
293 293 if streamclonerequested and not isstreamclonespec(bundlespec):
294 294 repo.ui.debug(
295 295 b'filtering %s because not a stream clone\n'
296 296 % entry[b'URL']
297 297 )
298 298 continue
299 299
300 300 except error.InvalidBundleSpecification as e:
301 301 repo.ui.debug(stringutil.forcebytestr(e) + b'\n')
302 302 continue
303 303 except error.UnsupportedBundleSpecification as e:
304 304 repo.ui.debug(
305 305 b'filtering %s because unsupported bundle '
306 306 b'spec: %s\n' % (entry[b'URL'], stringutil.forcebytestr(e))
307 307 )
308 308 continue
309 309 # If we don't have a spec and requested a stream clone, we don't know
310 310 # what the entry is so don't attempt to apply it.
311 311 elif streamclonerequested:
312 312 repo.ui.debug(
313 313 b'filtering %s because cannot determine if a stream '
314 314 b'clone bundle\n' % entry[b'URL']
315 315 )
316 316 continue
317 317
318 318 if b'REQUIRESNI' in entry and not sslutil.hassni:
319 319 repo.ui.debug(
320 320 b'filtering %s because SNI not supported\n' % entry[b'URL']
321 321 )
322 322 continue
323 323
324 324 if b'REQUIREDRAM' in entry:
325 325 try:
326 326 requiredram = util.sizetoint(entry[b'REQUIREDRAM'])
327 327 except error.ParseError:
328 328 repo.ui.debug(
329 329 b'filtering %s due to a bad REQUIREDRAM attribute\n'
330 330 % entry[b'URL']
331 331 )
332 332 continue
333 333 actualram = repo.ui.estimatememory()
334 334 if actualram is not None and actualram * 0.66 < requiredram:
335 335 repo.ui.debug(
336 336 b'filtering %s as it needs more than 2/3 of system memory\n'
337 337 % entry[b'URL']
338 338 )
339 339 continue
340 340
341 341 newentries.append(entry)
342 342
343 343 return newentries
344 344
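A small worked example of the REQUIREDRAM filter above: an entry survives only when the estimated system memory covers the requirement with roughly a one-third margin (the numbers are illustrative):

    requiredram = 8 * (1 << 30)   # entry declares it needs 8 GiB
    actualram = 16 * (1 << 30)    # value reported by ui.estimatememory()
    keep = not (actualram * 0.66 < requiredram)
    assert keep                   # 10.56 GiB >= 8 GiB, so the entry is kept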
345 345
346 346 class clonebundleentry(object):
347 347 """Represents an item in a clone bundles manifest.
348 348
349 349 This rich class is needed to support sorting since sorted() in Python 3
350 350 doesn't support ``cmp`` and our comparison is complex enough that ``key=``
351 351 won't work.
352 352 """
353 353
354 354 def __init__(self, value, prefers):
355 355 self.value = value
356 356 self.prefers = prefers
357 357
358 358 def _cmp(self, other):
359 359 for prefkey, prefvalue in self.prefers:
360 360 avalue = self.value.get(prefkey)
361 361 bvalue = other.value.get(prefkey)
362 362
363 363 # Special case: b is missing the attribute and a matches exactly.
364 364 if avalue is not None and bvalue is None and avalue == prefvalue:
365 365 return -1
366 366
367 367 # Special case: a is missing the attribute and b matches exactly.
368 368 if bvalue is not None and avalue is None and bvalue == prefvalue:
369 369 return 1
370 370
371 371 # We can't compare unless attribute present on both.
372 372 if avalue is None or bvalue is None:
373 373 continue
374 374
375 375 # Same values should fall back to next attribute.
376 376 if avalue == bvalue:
377 377 continue
378 378
379 379 # Exact matches come first.
380 380 if avalue == prefvalue:
381 381 return -1
382 382 if bvalue == prefvalue:
383 383 return 1
384 384
385 385 # Fall back to next attribute.
386 386 continue
387 387
388 388 # If we got here we couldn't sort by attributes and prefers. Fall
389 389 # back to index order.
390 390 return 0
391 391
392 392 def __lt__(self, other):
393 393 return self._cmp(other) < 0
394 394
395 395 def __gt__(self, other):
396 396 return self._cmp(other) > 0
397 397
398 398 def __eq__(self, other):
399 399 return self._cmp(other) == 0
400 400
401 401 def __le__(self, other):
402 402 return self._cmp(other) <= 0
403 403
404 404 def __ge__(self, other):
405 405 return self._cmp(other) >= 0
406 406
407 407 def __ne__(self, other):
408 408 return self._cmp(other) != 0
409 409
410 410
411 411 def sortclonebundleentries(ui, entries):
412 412 prefers = ui.configlist(b'ui', b'clonebundleprefers')
413 413 if not prefers:
414 414 return list(entries)
415 415
416 416 def _split(p):
417 417 if b'=' not in p:
418 418 hint = _(b"each comma separated item should be key=value pairs")
419 419 raise error.Abort(
420 420 _(b"invalid ui.clonebundleprefers item: %s") % p, hint=hint
421 421 )
422 422 return p.split(b'=', 1)
423 423
424 424 prefers = [_split(p) for p in prefers]
425 425
426 426 items = sorted(clonebundleentry(v, prefers) for v in entries)
427 427 return [i.value for i in items]
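For example, with ``ui.clonebundleprefers=COMPRESSION=zstd,VERSION=v2``, entries whose BUNDLESPEC parsed to zstd compression sort first, ties are broken by VERSION, and entries that still compare equal keep their manifest order. A minimal sketch using the class above (hypothetical URLs):

    prefers = [(b'COMPRESSION', b'zstd'), (b'VERSION', b'v2')]
    entries = [
        {b'URL': b'https://a.example', b'COMPRESSION': b'gzip', b'VERSION': b'v2'},
        {b'URL': b'https://b.example', b'COMPRESSION': b'zstd', b'VERSION': b'v2'},
    ]
    items = sorted(clonebundleentry(v, prefers) for v in entries)
    assert [i.value[b'URL'] for i in items] == [
        b'https://b.example',
        b'https://a.example',
    ]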
@@ -1,91 +1,111 @@ mercurial/requirements.py
1 1 # requirements.py - objects and functions related to repository requirements
2 2 #
3 3 # Copyright 2005-2007 Olivia Mackall <olivia@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 GENERALDELTA_REQUIREMENT = b'generaldelta'
11 11 DOTENCODE_REQUIREMENT = b'dotencode'
12 12 STORE_REQUIREMENT = b'store'
13 13 FNCACHE_REQUIREMENT = b'fncache'
14 14
15 15 DIRSTATE_V2_REQUIREMENT = b'dirstate-v2'
16 16
17 17 # When narrowing is finalized and no longer subject to format changes,
18 18 # we should move this to just "narrow" or similar.
19 19 NARROW_REQUIREMENT = b'narrowhg-experimental'
20 20
21 21 # Enables sparse working directory usage
22 22 SPARSE_REQUIREMENT = b'exp-sparse'
23 23
24 24 # Enables the internal phase which is used to hide changesets instead
25 25 # of stripping them
26 26 INTERNAL_PHASE_REQUIREMENT = b'internal-phase'
27 27
28 28 # Stores manifest in Tree structure
29 29 TREEMANIFEST_REQUIREMENT = b'treemanifest'
30 30
31 31 REVLOGV1_REQUIREMENT = b'revlogv1'
32 32
33 33 # Increment the sub-version when the revlog v2 format changes to lock out old
34 34 # clients.
35 35 CHANGELOGV2_REQUIREMENT = b'exp-changelog-v2'
36 36
37 37 # Increment the sub-version when the revlog v2 format changes to lock out old
38 38 # clients.
39 39 REVLOGV2_REQUIREMENT = b'exp-revlogv2.2'
40 40
41 41 # A repository with the sparserevlog feature will have delta chains that
42 42 # can spread over a larger span. Sparse reading cuts these large spans into
43 43 # pieces, so that each piece isn't too big.
44 44 # Without the sparserevlog capability, reading from the repository could use
45 45 # huge amounts of memory, because the whole span would be read at once,
46 46 # including all the intermediate revisions that aren't pertinent for the chain.
47 47 # This is why once a repository has enabled sparse-read, it becomes required.
48 48 SPARSEREVLOG_REQUIREMENT = b'sparserevlog'
49 49
50 50 # A repository with the copies-sidedata-changeset requirement will store
51 51 # copies related information in changeset's sidedata.
52 52 COPIESSDC_REQUIREMENT = b'exp-copies-sidedata-changeset'
53 53
54 54 # The repository uses a persistent nodemap for the changelog and the manifest.
55 55 NODEMAP_REQUIREMENT = b'persistent-nodemap'
56 56
57 57 # Denotes that the current repository is a share
58 58 SHARED_REQUIREMENT = b'shared'
59 59
60 60 # Denotes that current repository is a share and the shared source path is
61 61 # relative to the current repository root path
62 62 RELATIVE_SHARED_REQUIREMENT = b'relshared'
63 63
64 64 # A repository with share implemented safely. The repository has different
65 65 # store and working copy requirements i.e. both `.hg/requires` and
66 66 # `.hg/store/requires` are present.
67 67 SHARESAFE_REQUIREMENT = b'share-safe'
68 68
69 69 # Bookmarks must be stored in the `store` part of the repository and will be
70 70 # shared across shares
71 71 BOOKMARKS_IN_STORE_REQUIREMENT = b'bookmarksinstore'
72 72
73 73 # List of requirements which are working directory specific
74 74 # These requirements cannot be shared between repositories if they
75 75 # share the same store
76 76 # * sparse is a working directory specific functionality and hence working
77 77 # directory specific requirement
78 78 # * SHARED_REQUIREMENT and RELATIVE_SHARED_REQUIREMENT are requirements which
79 79 # represent that the current working copy/repository shares the store of another
80 80 # repo. Hence both of them should be stored in working copy
81 81 # * SHARESAFE_REQUIREMENT needs to be stored in working dir to mark that rest of
82 82 # the requirements are stored in store's requires
83 83 # * DIRSTATE_V2_REQUIREMENT affects .hg/dirstate, of which there is one per
84 84 # working directory.
85 85 WORKING_DIR_REQUIREMENTS = {
86 86 SPARSE_REQUIREMENT,
87 87 SHARED_REQUIREMENT,
88 88 RELATIVE_SHARED_REQUIREMENT,
89 89 SHARESAFE_REQUIREMENT,
90 90 DIRSTATE_V2_REQUIREMENT,
91 91 }
92
93 # List of requirements that impact "stream-clone" (and hardlink clone) and
94 # cannot be changed in such cases.
95 #
96 # Requirements not in this list are safe to alter during a stream-clone.
97 #
98 # Note: the list is currently inherited from previous code; it misses some relevant requirements while containing some irrelevant ones.
99 STREAM_FIXED_REQUIREMENTS = {
100 BOOKMARKS_IN_STORE_REQUIREMENT,
101 CHANGELOGV2_REQUIREMENT,
102 COPIESSDC_REQUIREMENT,
103 DIRSTATE_V2_REQUIREMENT,
104 GENERALDELTA_REQUIREMENT,
105 NODEMAP_REQUIREMENT,
106 REVLOGV1_REQUIREMENT,
107 REVLOGV2_REQUIREMENT,
108 SHARESAFE_REQUIREMENT,
109 SPARSEREVLOG_REQUIREMENT,
110 TREEMANIFEST_REQUIREMENT,
111 }
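As a quick illustration of how this set is consumed, here is the check that bundlecaches.py above now performs for a packed1 bundle declaring its requirements (the feature name is made up):

    declared = {b'revlogv1', b'generaldelta', b'exp-made-up-feature'}
    missing = declared - STREAM_FIXED_REQUIREMENTS
    # missing == {b'exp-made-up-feature'}: parsebundlespec() would raise
    # UnsupportedBundleSpecification for it.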
@@ -1,941 +1,939 @@ mercurial/streamclone.py
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import contextlib
11 11 import errno
12 12 import os
13 13 import struct
14 14
15 15 from .i18n import _
16 16 from .pycompat import open
17 17 from .interfaces import repository
18 18 from . import (
19 19 bookmarks,
20 20 cacheutil,
21 21 error,
22 22 narrowspec,
23 23 phases,
24 24 pycompat,
25 25 requirements as requirementsmod,
26 26 scmutil,
27 27 store,
28 28 util,
29 29 )
30 30 from .utils import (
31 31 stringutil,
32 32 )
33 33
34 34
35 def new_stream_clone_requirements(
36 supported_formats, default_requirements, streamed_requirements
37 ):
35 def new_stream_clone_requirements(default_requirements, streamed_requirements):
38 36 """determine the final set of requirement for a new stream clone
39 37
40 38 this method combine the "default" requirements that a new repository would
41 39 use with the constaint we get from the stream clone content. We keep local
42 40 configuration choice when possible.
43 41 """
44 42 requirements = set(default_requirements)
45 requirements -= supported_formats
43 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
46 44 requirements.update(streamed_requirements)
47 45 return requirements
48 46
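A worked example of the combination performed above, with illustrative requirement sets (``fixed`` stands in for a subset of STREAM_FIXED_REQUIREMENTS, not the full real set): stream-fixed requirements follow the stream content, everything else keeps the local default:

    default = {b'store', b'fncache', b'dotencode', b'generaldelta'}
    streamed = {b'generaldelta', b'revlogv1', b'sparserevlog'}
    fixed = {b'generaldelta', b'revlogv1', b'sparserevlog'}
    result = (default - fixed) | streamed
    assert result == {b'store', b'fncache', b'dotencode',
                      b'generaldelta', b'revlogv1', b'sparserevlog'}

Local choices like dotencode or fncache survive, while store format requirements such as generaldelta are dictated by the streamed content.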
49 47
50 48 def streamed_requirements(repo):
51 49 """the set of requirement the new clone will have to support
52 50
53 51 This is used for advertising the stream options and to generate the actual
54 52 stream content."""
55 requiredformats = repo.requirements & repo.supportedformats
53 requiredformats = (
54 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
55 )
56 56 return requiredformats
57 57
58 58
59 59 def canperformstreamclone(pullop, bundle2=False):
60 60 """Whether it is possible to perform a streaming clone as part of pull.
61 61
62 62 ``bundle2`` will cause the function to consider stream clone through
63 63 bundle2 and only through bundle2.
64 64
65 65 Returns a tuple of (supported, requirements). ``supported`` is True if
66 66 streaming clone is supported and False otherwise. ``requirements`` is
67 67 a set of repo requirements from the remote, or ``None`` if stream clone
68 68 isn't supported.
69 69 """
70 70 repo = pullop.repo
71 71 remote = pullop.remote
72 72
73 73 bundle2supported = False
74 74 if pullop.canusebundle2:
75 75 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
76 76 bundle2supported = True
77 77 # else
78 78 # Server doesn't support bundle2 stream clone or doesn't support
79 79 # the versions we support. Fall back and possibly allow legacy.
80 80
81 81 # Ensures legacy code path uses available bundle2.
82 82 if bundle2supported and not bundle2:
83 83 return False, None
84 84 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
85 85 elif bundle2 and not bundle2supported:
86 86 return False, None
87 87
88 88 # Streaming clone only works on empty repositories.
89 89 if len(repo):
90 90 return False, None
91 91
92 92 # Streaming clone only works if all data is being requested.
93 93 if pullop.heads:
94 94 return False, None
95 95
96 96 streamrequested = pullop.streamclonerequested
97 97
98 98 # If we don't have a preference, let the server decide for us. This
99 99 # likely only comes into play in LANs.
100 100 if streamrequested is None:
101 101 # The server can advertise whether to prefer streaming clone.
102 102 streamrequested = remote.capable(b'stream-preferred')
103 103
104 104 if not streamrequested:
105 105 return False, None
106 106
107 107 # In order for stream clone to work, the client has to support all the
108 108 # requirements advertised by the server.
109 109 #
110 110 # The server advertises its requirements via the "stream" and "streamreqs"
111 111 # capability. "stream" (a value-less capability) is advertised if and only
112 112 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
113 113 # is advertised and contains a comma-delimited list of requirements.
114 114 requirements = set()
115 115 if remote.capable(b'stream'):
116 116 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
117 117 else:
118 118 streamreqs = remote.capable(b'streamreqs')
119 119 # This is weird and shouldn't happen with modern servers.
120 120 if not streamreqs:
121 121 pullop.repo.ui.warn(
122 122 _(
123 123 b'warning: stream clone requested but server has them '
124 124 b'disabled\n'
125 125 )
126 126 )
127 127 return False, None
128 128
129 129 streamreqs = set(streamreqs.split(b','))
130 130 # Server requires something we don't support. Bail.
131 131 missingreqs = streamreqs - repo.supported
132 132 if missingreqs:
133 133 pullop.repo.ui.warn(
134 134 _(
135 135 b'warning: stream clone requested but client is missing '
136 136 b'requirements: %s\n'
137 137 )
138 138 % b', '.join(sorted(missingreqs))
139 139 )
140 140 pullop.repo.ui.warn(
141 141 _(
142 142 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
143 143 b'for more information)\n'
144 144 )
145 145 )
146 146 return False, None
147 147 requirements = streamreqs
148 148
149 149 return True, requirements
150 150
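Sketch of the capability negotiation described above (the advertised value is illustrative): a modern server sends a comma-delimited ``streamreqs`` capability and the client bails out when any advertised requirement is unsupported:

    streamreqs = set(b'generaldelta,revlogv1,sparserevlog'.split(b','))
    client_supported = {b'generaldelta', b'revlogv1', b'sparserevlog', b'store'}
    missingreqs = streamreqs - client_supported
    assert not missingreqs  # otherwise: warn and return (False, None)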
151 151
152 152 def maybeperformlegacystreamclone(pullop):
153 153 """Possibly perform a legacy stream clone operation.
154 154
155 155 Legacy stream clones are performed as part of pull but before all other
156 156 operations.
157 157
158 158 A legacy stream clone will not be performed if a bundle2 stream clone is
159 159 supported.
160 160 """
161 161 from . import localrepo
162 162
163 163 supported, requirements = canperformstreamclone(pullop)
164 164
165 165 if not supported:
166 166 return
167 167
168 168 repo = pullop.repo
169 169 remote = pullop.remote
170 170
171 171 # Save remote branchmap. We will use it later to speed up branchcache
172 172 # creation.
173 173 rbranchmap = None
174 174 if remote.capable(b'branchmap'):
175 175 with remote.commandexecutor() as e:
176 176 rbranchmap = e.callcommand(b'branchmap', {}).result()
177 177
178 178 repo.ui.status(_(b'streaming all changes\n'))
179 179
180 180 with remote.commandexecutor() as e:
181 181 fp = e.callcommand(b'stream_out', {}).result()
182 182
183 183 # TODO strictly speaking, this code should all be inside the context
184 184 # manager because the context manager is supposed to ensure all wire state
185 185 # is flushed when exiting. But the legacy peers don't do this, so it
186 186 # doesn't matter.
187 187 l = fp.readline()
188 188 try:
189 189 resp = int(l)
190 190 except ValueError:
191 191 raise error.ResponseError(
192 192 _(b'unexpected response from remote server:'), l
193 193 )
194 194 if resp == 1:
195 195 raise error.Abort(_(b'operation forbidden by server'))
196 196 elif resp == 2:
197 197 raise error.Abort(_(b'locking the remote repository failed'))
198 198 elif resp != 0:
199 199 raise error.Abort(_(b'the server sent an unknown error code'))
200 200
201 201 l = fp.readline()
202 202 try:
203 203 filecount, bytecount = map(int, l.split(b' ', 1))
204 204 except (ValueError, TypeError):
205 205 raise error.ResponseError(
206 206 _(b'unexpected response from remote server:'), l
207 207 )
208 208
209 209 with repo.lock():
210 210 consumev1(repo, fp, filecount, bytecount)
211 211 repo.requirements = new_stream_clone_requirements(
212 repo.supportedformats,
213 212 repo.requirements,
214 213 requirements,
215 214 )
216 215 repo.svfs.options = localrepo.resolvestorevfsoptions(
217 216 repo.ui, repo.requirements, repo.features
218 217 )
219 218 scmutil.writereporequirements(repo)
220 219
221 220 if rbranchmap:
222 221 repo._branchcaches.replace(repo, rbranchmap)
223 222
224 223 repo.invalidate()
225 224
226 225
227 226 def allowservergeneration(repo):
228 227 """Whether streaming clones are allowed from the server."""
229 228 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
230 229 return False
231 230
232 231 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
233 232 return False
234 233
235 234 # The way stream clone works makes it impossible to hide secret changesets.
236 235 # So don't allow this by default.
237 236 secret = phases.hassecret(repo)
238 237 if secret:
239 238 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
240 239
241 240 return True
242 241
243 242
244 243 # This is its own function so extensions can override it.
245 244 def _walkstreamfiles(repo, matcher=None):
246 245 return repo.store.walk(matcher)
247 246
248 247
249 248 def generatev1(repo):
250 249 """Emit content for version 1 of a streaming clone.
251 250
252 251 This returns a 3-tuple of (file count, byte size, data iterator).
253 252
254 253 The data iterator consists of N entries for each file being transferred.
255 254 Each file entry starts as a line with the file name and integer size
256 255 delimited by a null byte.
257 256
258 257 The raw file data follows. Following the raw file data is the next file
259 258 entry, or EOF.
260 259
261 260 When used on the wire protocol, an additional line indicating protocol
262 261 success will be prepended to the stream. This function is not responsible
263 262 for adding it.
264 263
265 264 This function will obtain a repository lock to ensure a consistent view of
266 265 the store is captured. It therefore may raise LockError.
267 266 """
268 267 entries = []
269 268 total_bytes = 0
270 269 # Get consistent snapshot of repo, lock during scan.
271 270 with repo.lock():
272 271 repo.ui.debug(b'scanning\n')
273 272 for file_type, name, size in _walkstreamfiles(repo):
274 273 if size:
275 274 entries.append((name, size))
276 275 total_bytes += size
277 276 _test_sync_point_walk_1(repo)
278 277 _test_sync_point_walk_2(repo)
279 278
280 279 repo.ui.debug(
281 280 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
282 281 )
283 282
284 283 svfs = repo.svfs
285 284 debugflag = repo.ui.debugflag
286 285
287 286 def emitrevlogdata():
288 287 for name, size in entries:
289 288 if debugflag:
290 289 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
291 290 # partially encode name over the wire for backwards compat
292 291 yield b'%s\0%d\n' % (store.encodedir(name), size)
293 292 # auditing at this stage is both pointless (paths are already
294 293 # trusted by the local repo) and expensive
295 294 with svfs(name, b'rb', auditpath=False) as fp:
296 295 if size <= 65536:
297 296 yield fp.read(size)
298 297 else:
299 298 for chunk in util.filechunkiter(fp, limit=size):
300 299 yield chunk
301 300
302 301 return len(entries), total_bytes, emitrevlogdata()
303 302
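Minimal sketch of the v1 entry framing documented above: a ``b'<name>\0<size>\n'`` header line followed by exactly <size> raw bytes, mirroring what ``consumev1()`` reads back (file name and payload are made up):

    import io

    frame = b'data/foo.i\x0011\nhello world'
    fp = io.BytesIO(frame)
    name, size = fp.readline().split(b'\x00', 1)
    payload = fp.read(int(size))
    assert (name, payload) == (b'data/foo.i', b'hello world')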
304 303
305 304 def generatev1wireproto(repo):
306 305 """Emit content for version 1 of streaming clone suitable for the wire.
307 306
308 307 This is the data output from ``generatev1()`` with 2 header lines. The
309 308 first line indicates overall success. The 2nd contains the file count and
310 309 byte size of payload.
311 310
312 311 The success line contains "0" for success, "1" for stream generation not
313 312 allowed, and "2" for error locking the repository (possibly indicating
314 313 a permissions error for the server process).
315 314 """
316 315 if not allowservergeneration(repo):
317 316 yield b'1\n'
318 317 return
319 318
320 319 try:
321 320 filecount, bytecount, it = generatev1(repo)
322 321 except error.LockError:
323 322 yield b'2\n'
324 323 return
325 324
326 325 # Indicates successful response.
327 326 yield b'0\n'
328 327 yield b'%d %d\n' % (filecount, bytecount)
329 328 for chunk in it:
330 329 yield chunk
331 330
332 331
333 332 def generatebundlev1(repo, compression=b'UN'):
334 333 """Emit content for version 1 of a stream clone bundle.
335 334
336 335 The first 4 bytes of the output ("HGS1") denote this as stream clone
337 336 bundle version 1.
338 337
339 338 The next 2 bytes indicate the compression type. Only "UN" is currently
340 339 supported.
341 340
342 341 The next 16 bytes are two 64-bit big endian unsigned integers indicating
343 342 file count and byte count, respectively.
344 343
345 344 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
346 345 of the requirements string, including a trailing \0. The following N bytes
347 346 are the requirements string, which is ASCII containing a comma-delimited
348 347 list of repo requirements that are needed to support the data.
349 348
350 349 The remaining content is the output of ``generatev1()`` (which may be
351 350 compressed in the future).
352 351
353 352 Returns a tuple of (requirements, data generator).
354 353 """
355 354 if compression != b'UN':
356 355 raise ValueError(b'we do not support the compression argument yet')
357 356
358 357 requirements = streamed_requirements(repo)
359 358 requires = b','.join(sorted(requirements))
360 359
361 360 def gen():
362 361 yield b'HGS1'
363 362 yield compression
364 363
365 364 filecount, bytecount, it = generatev1(repo)
366 365 repo.ui.status(
367 366 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
368 367 )
369 368
370 369 yield struct.pack(b'>QQ', filecount, bytecount)
371 370 yield struct.pack(b'>H', len(requires) + 1)
372 371 yield requires + b'\0'
373 372
374 373 # This is where we'll add compression in the future.
375 374 assert compression == b'UN'
376 375
377 376 progress = repo.ui.makeprogress(
378 377 _(b'bundle'), total=bytecount, unit=_(b'bytes')
379 378 )
380 379 progress.update(0)
381 380
382 381 for chunk in it:
383 382 progress.increment(step=len(chunk))
384 383 yield chunk
385 384
386 385 progress.complete()
387 386
388 387 return requirements, gen()
389 388
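The HGS1 header layout described in the docstring above, spelled out with ``struct`` (the counts and requirements are illustrative):

    import struct

    requires = b'generaldelta,revlogv1'
    header = (
        b'HGS1'                                 # 4-byte magic
        + b'UN'                                 # 2-byte compression type
        + struct.pack('>QQ', 3, 4096)           # file count, byte count
        + struct.pack('>H', len(requires) + 1)  # length incl. trailing NUL
        + requires + b'\x00'
    )
    # readbundle1header() below parses everything after the 4-byte magic.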
390 389
391 390 def consumev1(repo, fp, filecount, bytecount):
392 391 """Apply the contents from version 1 of a streaming clone file handle.
393 392
394 393 This takes the output from "stream_out" and applies it to the specified
395 394 repository.
396 395
397 396 Like "stream_out," the status line added by the wire protocol is not
398 397 handled by this function.
399 398 """
400 399 with repo.lock():
401 400 repo.ui.status(
402 401 _(b'%d files to transfer, %s of data\n')
403 402 % (filecount, util.bytecount(bytecount))
404 403 )
405 404 progress = repo.ui.makeprogress(
406 405 _(b'clone'), total=bytecount, unit=_(b'bytes')
407 406 )
408 407 progress.update(0)
409 408 start = util.timer()
410 409
411 410 # TODO: get rid of (potential) inconsistency
412 411 #
413 412 # If transaction is started and any @filecache property is
414 413 # changed at this point, it causes inconsistency between
415 414 # in-memory cached property and streamclone-ed file on the
416 415 # disk. Nested transaction prevents transaction scope "clone"
417 416 # below from writing in-memory changes out at the end of it,
418 417 # even though in-memory changes are discarded at the end of it
419 418 # regardless of transaction nesting.
420 419 #
421 420 # But transaction nesting can't be simply prohibited, because
422 421 # nesting occurs also in ordinary case (e.g. enabling
423 422 # clonebundles).
424 423
425 424 with repo.transaction(b'clone'):
426 425 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
427 426 for i in pycompat.xrange(filecount):
428 427 # XXX doesn't support '\n' or '\r' in filenames
429 428 l = fp.readline()
430 429 try:
431 430 name, size = l.split(b'\0', 1)
432 431 size = int(size)
433 432 except (ValueError, TypeError):
434 433 raise error.ResponseError(
435 434 _(b'unexpected response from remote server:'), l
436 435 )
437 436 if repo.ui.debugflag:
438 437 repo.ui.debug(
439 438 b'adding %s (%s)\n' % (name, util.bytecount(size))
440 439 )
441 440 # for backwards compat, name was partially encoded
442 441 path = store.decodedir(name)
443 442 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
444 443 for chunk in util.filechunkiter(fp, limit=size):
445 444 progress.increment(step=len(chunk))
446 445 ofp.write(chunk)
447 446
448 447 # force @filecache properties to be reloaded from
449 448 # streamclone-ed file at next access
450 449 repo.invalidate(clearfilecache=True)
451 450
452 451 elapsed = util.timer() - start
453 452 if elapsed <= 0:
454 453 elapsed = 0.001
455 454 progress.complete()
456 455 repo.ui.status(
457 456 _(b'transferred %s in %.1f seconds (%s/sec)\n')
458 457 % (
459 458 util.bytecount(bytecount),
460 459 elapsed,
461 460 util.bytecount(bytecount / elapsed),
462 461 )
463 462 )
464 463
465 464
466 465 def readbundle1header(fp):
467 466 compression = fp.read(2)
468 467 if compression != b'UN':
469 468 raise error.Abort(
470 469 _(
471 470 b'only uncompressed stream clone bundles are '
472 471 b'supported; got %s'
473 472 )
474 473 % compression
475 474 )
476 475
477 476 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
478 477 requireslen = struct.unpack(b'>H', fp.read(2))[0]
479 478 requires = fp.read(requireslen)
480 479
481 480 if not requires.endswith(b'\0'):
482 481 raise error.Abort(
483 482 _(
484 483 b'malformed stream clone bundle: '
485 484 b'requirements not properly encoded'
486 485 )
487 486 )
488 487
489 488 requirements = set(requires.rstrip(b'\0').split(b','))
490 489
491 490 return filecount, bytecount, requirements
492 491
493 492
494 493 def applybundlev1(repo, fp):
495 494 """Apply the content from a stream clone bundle version 1.
496 495
497 496 We assume the 4 byte header has been read and validated and the file handle
498 497 is at the 2 byte compression identifier.
499 498 """
500 499 if len(repo):
501 500 raise error.Abort(
502 501 _(b'cannot apply stream clone bundle on non-empty repo')
503 502 )
504 503
505 504 filecount, bytecount, requirements = readbundle1header(fp)
506 505 missingreqs = requirements - repo.supported
507 506 if missingreqs:
508 507 raise error.Abort(
509 508 _(b'unable to apply stream clone: unsupported format: %s')
510 509 % b', '.join(sorted(missingreqs))
511 510 )
512 511
513 512 consumev1(repo, fp, filecount, bytecount)
514 513
515 514
516 515 class streamcloneapplier(object):
517 516 """Class to manage applying streaming clone bundles.
518 517
519 518 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
520 519 readers to perform bundle type-specific functionality.
521 520 """
522 521
523 522 def __init__(self, fh):
524 523 self._fh = fh
525 524
526 525 def apply(self, repo):
527 526 return applybundlev1(repo, self._fh)
528 527
529 528
530 529 # type of file to stream
531 530 _fileappend = 0 # append only file
532 531 _filefull = 1 # full snapshot file
533 532
534 533 # Source of the file
535 534 _srcstore = b's' # store (svfs)
536 535 _srccache = b'c' # cache (cache)
537 536
538 537 # This is its own function so extensions can override it.
539 538 def _walkstreamfullstorefiles(repo):
540 539 """list snapshot file from the store"""
541 540 fnames = []
542 541 if not repo.publishing():
543 542 fnames.append(b'phaseroots')
544 543 return fnames
545 544
546 545
547 546 def _filterfull(entry, copy, vfsmap):
548 547 """actually copy the snapshot files"""
549 548 src, name, ftype, data = entry
550 549 if ftype != _filefull:
551 550 return entry
552 551 return (src, name, ftype, copy(vfsmap[src].join(name)))
553 552
554 553
555 554 @contextlib.contextmanager
556 555 def maketempcopies():
557 556 """return a function to temporary copy file"""
558 557 files = []
559 558 try:
560 559
561 560 def copy(src):
562 561 fd, dst = pycompat.mkstemp()
563 562 os.close(fd)
564 563 files.append(dst)
565 564 util.copyfiles(src, dst, hardlink=True)
566 565 return dst
567 566
568 567 yield copy
569 568 finally:
570 569 for tmp in files:
571 570 util.tryunlink(tmp)
572 571
573 572
574 573 def _makemap(repo):
575 574 """make a (src -> vfs) map for the repo"""
576 575 vfsmap = {
577 576 _srcstore: repo.svfs,
578 577 _srccache: repo.cachevfs,
579 578 }
580 579 # we keep repo.vfs out of the map on purpose, there are too many dangers there
581 580 # (eg: .hg/hgrc)
582 581 assert repo.vfs not in vfsmap.values()
583 582
584 583 return vfsmap
585 584
586 585
587 586 def _emit2(repo, entries, totalfilesize):
588 587 """actually emit the stream bundle"""
589 588 vfsmap = _makemap(repo)
590 589 # we keep repo.vfs out of the map on purpose, there are too many dangers there
591 590 # (eg: .hg/hgrc),
592 591 #
593 592 # this assert is duplicated (from _makemap) as an author might think this is
594 593 # fine, while it is really not.
595 594 if repo.vfs in vfsmap.values():
596 595 raise error.ProgrammingError(
597 596 b'repo.vfs must not be added to vfsmap for security reasons'
598 597 )
599 598
600 599 progress = repo.ui.makeprogress(
601 600 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
602 601 )
603 602 progress.update(0)
604 603 with maketempcopies() as copy, progress:
605 604 # copy is delayed until we are in the try
606 605 entries = [_filterfull(e, copy, vfsmap) for e in entries]
607 606 yield None # this release the lock on the repository
608 607 totalbytecount = 0
609 608
610 609 for src, name, ftype, data in entries:
611 610 vfs = vfsmap[src]
612 611 yield src
613 612 yield util.uvarintencode(len(name))
614 613 if ftype == _fileappend:
615 614 fp = vfs(name)
616 615 size = data
617 616 elif ftype == _filefull:
618 617 fp = open(data, b'rb')
619 618 size = util.fstat(fp).st_size
620 619 bytecount = 0
621 620 try:
622 621 yield util.uvarintencode(size)
623 622 yield name
624 623 if size <= 65536:
625 624 chunks = (fp.read(size),)
626 625 else:
627 626 chunks = util.filechunkiter(fp, limit=size)
628 627 for chunk in chunks:
629 628 bytecount += len(chunk)
630 629 totalbytecount += len(chunk)
631 630 progress.update(totalbytecount)
632 631 yield chunk
633 632 if bytecount != size:
634 633 # Would most likely be caused by a race due to `hg strip` or
635 634 # a revlog split
636 635 raise error.Abort(
637 636 _(
638 637 b'clone could only read %d bytes from %s, but '
639 638 b'expected %d bytes'
640 639 )
641 640 % (bytecount, name, size)
642 641 )
643 642 finally:
644 643 fp.close()
645 644
646 645
647 646 def _test_sync_point_walk_1(repo):
648 647 """a function for synchronisation during tests"""
649 648
650 649
651 650 def _test_sync_point_walk_2(repo):
652 651 """a function for synchronisation during tests"""
653 652
654 653
655 654 def _v2_walk(repo, includes, excludes, includeobsmarkers):
656 655 """emit a seris of files information useful to clone a repo
657 656
658 657 return (entries, totalfilesize)
659 658
660 659 entries is a list of tuples (vfs-key, file-path, file-type, size)
661 660
662 661 - `vfs-key`: a key to the right vfs to write the file (see _makemap)
663 662 - `name`: file path of the file to copy (to be fed to the vfs)
664 663 - `file-type`: does this file need to be copied while the source lock is held?
665 664 - `size`: the size of the file (or None)
666 665 """
667 666 assert repo._currentlock(repo._lockref) is not None
668 667 entries = []
669 668 totalfilesize = 0
670 669
671 670 matcher = None
672 671 if includes or excludes:
673 672 matcher = narrowspec.match(repo.root, includes, excludes)
674 673
675 674 for rl_type, name, size in _walkstreamfiles(repo, matcher):
676 675 if size:
677 676 ft = _fileappend
678 677 if rl_type & store.FILEFLAGS_VOLATILE:
679 678 ft = _filefull
680 679 entries.append((_srcstore, name, ft, size))
681 680 totalfilesize += size
682 681 for name in _walkstreamfullstorefiles(repo):
683 682 if repo.svfs.exists(name):
684 683 totalfilesize += repo.svfs.lstat(name).st_size
685 684 entries.append((_srcstore, name, _filefull, None))
686 685 if includeobsmarkers and repo.svfs.exists(b'obsstore'):
687 686 totalfilesize += repo.svfs.lstat(b'obsstore').st_size
688 687 entries.append((_srcstore, b'obsstore', _filefull, None))
689 688 for name in cacheutil.cachetocopy(repo):
690 689 if repo.cachevfs.exists(name):
691 690 totalfilesize += repo.cachevfs.lstat(name).st_size
692 691 entries.append((_srccache, name, _filefull, None))
693 692 return entries, totalfilesize
694 693
695 694
696 695 def generatev2(repo, includes, excludes, includeobsmarkers):
697 696 """Emit content for version 2 of a streaming clone.
698 697
699 698 the data stream consists of the following entries:
700 699 1) A char representing the file destination (eg: store or cache)
701 700 2) A varint containing the length of the filename
702 701 3) A varint containing the length of file data
703 702 4) N bytes containing the filename (the internal, store-agnostic form)
704 703 5) N bytes containing the file data
705 704
706 705 Returns a 3-tuple of (file count, file size, data iterator).
707 706 """
708 707
709 708 with repo.lock():
710 709
711 710 repo.ui.debug(b'scanning\n')
712 711
713 712 entries, totalfilesize = _v2_walk(
714 713 repo,
715 714 includes=includes,
716 715 excludes=excludes,
717 716 includeobsmarkers=includeobsmarkers,
718 717 )
719 718
720 719 chunks = _emit2(repo, entries, totalfilesize)
721 720 first = next(chunks)
722 721 assert first is None
723 722 _test_sync_point_walk_1(repo)
724 723 _test_sync_point_walk_2(repo)
725 724
726 725 return len(entries), totalfilesize, chunks
727 726
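Sketch of the v2 entry framing listed in the docstring above; the ``uvarint`` helper is a minimal stand-in for ``util.uvarintencode`` (7-bit groups, high bit set on continuation bytes), and the file name and data are made up:

    def uvarint(n):
        out = bytearray()
        while True:
            out.append((n & 0x7F) | (0x80 if n > 0x7F else 0))
            n >>= 7
            if not n:
                return bytes(out)

    name, data = b'data/foo.i', b'hello world'
    entry = b's' + uvarint(len(name)) + uvarint(len(data)) + name + data
    # b's' routes the file to the store vfs (see _makemap/_srcstore).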
728 727
729 728 @contextlib.contextmanager
730 729 def nested(*ctxs):
731 730 this = ctxs[0]
732 731 rest = ctxs[1:]
733 732 with this:
734 733 if rest:
735 734 with nested(*rest):
736 735 yield
737 736 else:
738 737 yield
739 738
740 739
741 740 def consumev2(repo, fp, filecount, filesize):
742 741 """Apply the contents from a version 2 streaming clone.
743 742
744 743 Data is read from an object that only needs to provide a ``read(size)``
745 744 method.
746 745 """
747 746 with repo.lock():
748 747 repo.ui.status(
749 748 _(b'%d files to transfer, %s of data\n')
750 749 % (filecount, util.bytecount(filesize))
751 750 )
752 751
753 752 start = util.timer()
754 753 progress = repo.ui.makeprogress(
755 754 _(b'clone'), total=filesize, unit=_(b'bytes')
756 755 )
757 756 progress.update(0)
758 757
759 758 vfsmap = _makemap(repo)
760 759 # we keep repo.vfs out of the map on purpose, there are too many dangers
761 760 # there (eg: .hg/hgrc),
762 761 #
763 762 # this assert is duplicated (from _makemap) as an author might think this
764 763 # is fine, while it is really not.
765 764 if repo.vfs in vfsmap.values():
766 765 raise error.ProgrammingError(
767 766 b'repo.vfs must not be added to vfsmap for security reasons'
768 767 )
769 768
770 769 with repo.transaction(b'clone'):
771 770 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
772 771 with nested(*ctxs):
773 772 for i in range(filecount):
774 773 src = util.readexactly(fp, 1)
775 774 vfs = vfsmap[src]
776 775 namelen = util.uvarintdecodestream(fp)
777 776 datalen = util.uvarintdecodestream(fp)
778 777
779 778 name = util.readexactly(fp, namelen)
780 779
781 780 if repo.ui.debugflag:
782 781 repo.ui.debug(
783 782 b'adding [%s] %s (%s)\n'
784 783 % (src, name, util.bytecount(datalen))
785 784 )
786 785
787 786 with vfs(name, b'w') as ofp:
788 787 for chunk in util.filechunkiter(fp, limit=datalen):
789 788 progress.increment(step=len(chunk))
790 789 ofp.write(chunk)
791 790
792 791 # force @filecache properties to be reloaded from
793 792 # streamclone-ed file at next access
794 793 repo.invalidate(clearfilecache=True)
795 794
796 795 elapsed = util.timer() - start
797 796 if elapsed <= 0:
798 797 elapsed = 0.001
799 798 repo.ui.status(
800 799 _(b'transferred %s in %.1f seconds (%s/sec)\n')
801 800 % (
802 801 util.bytecount(progress.pos),
803 802 elapsed,
804 803 util.bytecount(progress.pos / elapsed),
805 804 )
806 805 )
807 806 progress.complete()
808 807
809 808
810 809 def applybundlev2(repo, fp, filecount, filesize, requirements):
811 810 from . import localrepo
812 811
813 812 missingreqs = [r for r in requirements if r not in repo.supported]
814 813 if missingreqs:
815 814 raise error.Abort(
816 815 _(b'unable to apply stream clone: unsupported format: %s')
817 816 % b', '.join(sorted(missingreqs))
818 817 )
819 818
820 819 consumev2(repo, fp, filecount, filesize)
821 820
822 821 repo.requirements = new_stream_clone_requirements(
823 repo.supportedformats,
824 822 repo.requirements,
825 823 requirements,
826 824 )
827 825 repo.svfs.options = localrepo.resolvestorevfsoptions(
828 826 repo.ui, repo.requirements, repo.features
829 827 )
830 828 scmutil.writereporequirements(repo)
831 829
832 830
833 831 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
834 832 hardlink = [True]
835 833
836 834 def copy_used():
837 835 hardlink[0] = False
838 836 progress.topic = _(b'copying')
839 837
840 838 for k, path, size in entries:
841 839 src_vfs = src_vfs_map[k]
842 840 dst_vfs = dst_vfs_map[k]
843 841 src_path = src_vfs.join(path)
844 842 dst_path = dst_vfs.join(path)
845 843 # We cannot use dirname and makedirs of dst_vfs here because the store
846 844 # encoding confuses them. See issue 6581 for details.
847 845 dirname = os.path.dirname(dst_path)
848 846 if not os.path.exists(dirname):
849 847 util.makedirs(dirname)
850 848 dst_vfs.register_file(path)
851 849 # XXX we could use the #nb_bytes argument.
852 850 util.copyfile(
853 851 src_path,
854 852 dst_path,
855 853 hardlink=hardlink[0],
856 854 no_hardlink_cb=copy_used,
857 855 check_fs_hardlink=False,
858 856 )
859 857 progress.increment()
860 858 return hardlink[0]
861 859
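A note on the ``hardlink = [True]`` cell above: the single-element list lets the ``no_hardlink_cb`` closure flip the flag from inside ``util.copyfile``, a pattern that predates ``nonlocal``; once one hardlink fails, the progress topic switches to 'copying' and every remaining file is copied instead of linked. A modern equivalent of the flag would be:

    def make_flag():
        hardlink = True

        def copy_used():
            nonlocal hardlink
            hardlink = False

        return copy_used, lambda: hardlink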
862 860
863 861 def local_copy(src_repo, dest_repo):
864 862 """copy all content from one local repository to another
865 863
866 864 This is useful for local clones"""
867 865 src_store_requirements = {
868 866 r
869 867 for r in src_repo.requirements
870 868 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
871 869 }
872 870 dest_store_requirements = {
873 871 r
874 872 for r in dest_repo.requirements
875 873 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
876 874 }
877 875 assert src_store_requirements == dest_store_requirements
878 876
879 877 with dest_repo.lock():
880 878 with src_repo.lock():
881 879
882 880 # bookmarks are not integrated into the streaming as they might use
883 881 # `repo.vfs`, and there is too much sensitive data accessible
884 882 # through `repo.vfs` to expose it to a streaming clone.
885 883 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
886 884 srcbookmarks = src_book_vfs.join(b'bookmarks')
887 885 bm_count = 0
888 886 if os.path.exists(srcbookmarks):
889 887 bm_count = 1
890 888
891 889 entries, totalfilesize = _v2_walk(
892 890 src_repo,
893 891 includes=None,
894 892 excludes=None,
895 893 includeobsmarkers=True,
896 894 )
897 895 src_vfs_map = _makemap(src_repo)
898 896 dest_vfs_map = _makemap(dest_repo)
899 897 progress = src_repo.ui.makeprogress(
900 898 topic=_(b'linking'),
901 899 total=len(entries) + bm_count,
902 900 unit=_(b'files'),
903 901 )
904 902 # copy files
905 903 #
906 904 # We could copy the full file while the source repository is locked
907 905 # and the other one without the lock. However, in the linking case,
908 906 # this would also require checks that nobody is appending any data
909 907 # to the files while we do the clone, so this is not done yet. We
910 908 # could do this blindly when copying files.
911 909 files = ((k, path, size) for k, path, ftype, size in entries)
912 910 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
913 911
914 912 # copy bookmarks over
915 913 if bm_count:
916 914 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
917 915 dstbookmarks = dst_book_vfs.join(b'bookmarks')
918 916 util.copyfile(srcbookmarks, dstbookmarks)
919 917 progress.complete()
920 918 if hardlink:
921 919 msg = b'linked %d files\n'
922 920 else:
923 921 msg = b'copied %d files\n'
924 922 src_repo.ui.debug(msg % (len(entries) + bm_count))
925 923
926 924 with dest_repo.transaction(b"localclone") as tr:
927 925 dest_repo.store.write(tr)
928 926
929 927 # clean up transaction files as they do not make sense
930 928 undo_files = [(dest_repo.svfs, b'undo.backupfiles')]
931 929 undo_files.extend(dest_repo.undofiles())
932 930 for undovfs, undofile in undo_files:
933 931 try:
934 932 undovfs.unlink(undofile)
935 933 except OSError as e:
936 934 if e.errno != errno.ENOENT:
937 935 msg = _(b'error removing %s: %s\n')
938 936 path = undovfs.join(undofile)
939 937 e_msg = stringutil.forcebytestr(e)
940 938 msg %= (path, e_msg)
941 939 dest_repo.ui.warn(msg)