stream-clone: bail-out earlier if stream clone is not requested...
marmoute
r51415:58e4842f default
@@ -1,960 +1,959 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import contextlib
10 10 import os
11 11 import struct
12 12
13 13 from .i18n import _
14 14 from .pycompat import open
15 15 from .interfaces import repository
16 16 from . import (
17 17 bookmarks,
18 18 cacheutil,
19 19 error,
20 20 narrowspec,
21 21 phases,
22 22 pycompat,
23 23 requirements as requirementsmod,
24 24 scmutil,
25 25 store,
26 26 transaction,
27 27 util,
28 28 )
29 29 from .revlogutils import (
30 30 nodemap,
31 31 )
32 32
33 33
34 34 def new_stream_clone_requirements(default_requirements, streamed_requirements):
35 35 """determine the final set of requirement for a new stream clone
36 36
37 37 this method combine the "default" requirements that a new repository would
38 38 use with the constaint we get from the stream clone content. We keep local
39 39 configuration choice when possible.
40 40 """
41 41 requirements = set(default_requirements)
42 42 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
43 43 requirements.update(streamed_requirements)
44 44 return requirements
45 45
46 46
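For illustration, a minimal sketch of the set arithmetic performed above, with made-up requirement names standing in for the real constants in requirements.py:

# Illustration only: the requirement names below are hypothetical.
STREAM_FIXED = {b'revlogv1', b'generaldelta'}

def combine(default_requirements, streamed_requirements):
    requirements = set(default_requirements)    # start from local defaults
    requirements -= STREAM_FIXED                # drop stream-dictated formats
    requirements.update(streamed_requirements)  # adopt the stream's formats
    return requirements

# A local default of 'generaldelta' yields to a stream made without it,
# while the purely local 'store' requirement is kept:
assert combine({b'store', b'generaldelta'}, {b'revlogv1'}) == {
    b'store',
    b'revlogv1',
}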
47 47 def streamed_requirements(repo):
48 48 """the set of requirement the new clone will have to support
49 49
50 50 This is used for advertising the stream options and to generate the actual
51 51 stream content."""
52 52 requiredformats = (
53 53 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
54 54 )
55 55 return requiredformats
56 56
57 57
58 58 def canperformstreamclone(pullop, bundle2=False):
59 59 """Whether it is possible to perform a streaming clone as part of pull.
60 60
61 61 ``bundle2`` will cause the function to consider stream clone through
62 62 bundle2 and only through bundle2.
63 63
64 64 Returns a tuple of (supported, requirements). ``supported`` is True if
65 65 streaming clone is supported and False otherwise. ``requirements`` is
66 66 a set of repo requirements from the remote, or ``None`` if stream clone
67 67 isn't supported.
68 68 """
69 69 repo = pullop.repo
70 70 remote = pullop.remote
71 71
72 # should we consider streaming clone at all?
73 streamrequested = pullop.streamclonerequested
74 # If we don't have a preference, let the server decide for us. This
75 # likely only comes into play in LANs.
76 if streamrequested is None:
77 # The server can advertise whether to prefer streaming clone.
78 streamrequested = remote.capable(b'stream-preferred')
79 if not streamrequested:
80 return False, None
81
72 82 # Streaming clone only works on an empty destination repository
73 83 if len(repo):
74 84 return False, None
75 85
76 86 # Streaming clone only works if all data is being requested.
77 87 if pullop.heads:
78 88 return False, None
79 89
80 90 bundle2supported = False
81 91 if pullop.canusebundle2:
82 92 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
83 93 bundle2supported = True
84 94 # else
85 95 # Server doesn't support bundle2 stream clone or doesn't support
86 96 # the versions we support. Fall back and possibly allow legacy.
87 97
88 98 # Ensures legacy code path uses available bundle2.
89 99 if bundle2supported and not bundle2:
90 100 return False, None
91 101 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
92 102 elif bundle2 and not bundle2supported:
93 103 return False, None
94 104
95 streamrequested = pullop.streamclonerequested
96
97 # If we don't have a preference, let the server decide for us. This
98 # likely only comes into play in LANs.
99 if streamrequested is None:
100 # The server can advertise whether to prefer streaming clone.
101 streamrequested = remote.capable(b'stream-preferred')
102
103 if not streamrequested:
104 return False, None
105
106 105 # In order for stream clone to work, the client has to support all the
107 106 # requirements advertised by the server.
108 107 #
109 108 # The server advertises its requirements via the "stream" and "streamreqs"
110 109 # capability. "stream" (a value-less capability) is advertised if and only
111 110 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
112 111 # is advertised and contains a comma-delimited list of requirements.
113 112 requirements = set()
114 113 if remote.capable(b'stream'):
115 114 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
116 115 else:
117 116 streamreqs = remote.capable(b'streamreqs')
118 117 # This is weird and shouldn't happen with modern servers.
119 118 if not streamreqs:
120 119 pullop.repo.ui.warn(
121 120 _(
122 121 b'warning: stream clone requested but server has them '
123 122 b'disabled\n'
124 123 )
125 124 )
126 125 return False, None
127 126
128 127 streamreqs = set(streamreqs.split(b','))
129 128 # Server requires something we don't support. Bail.
130 129 missingreqs = streamreqs - repo.supported
131 130 if missingreqs:
132 131 pullop.repo.ui.warn(
133 132 _(
134 133 b'warning: stream clone requested but client is missing '
135 134 b'requirements: %s\n'
136 135 )
137 136 % b', '.join(sorted(missingreqs))
138 137 )
139 138 pullop.repo.ui.warn(
140 139 _(
141 140 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
142 141 b'for more information)\n'
143 142 )
144 143 )
145 144 return False, None
146 145 requirements = streamreqs
147 146
148 147 return True, requirements
149 148
150 149
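To make the requirement negotiation above concrete, a small sketch of the `streamreqs` handling with hypothetical capability and support sets (a real client gets these from remote.capable() and repo.supported):

streamreqs = set(b'generaldelta,revlogv1'.split(b','))   # hypothetical
supported = {b'revlogv1', b'generaldelta', b'store'}      # hypothetical

missingreqs = streamreqs - supported
if missingreqs:
    print('cannot stream clone, missing:', missingreqs)
else:
    print('stream clone possible, requirements:', streamreqs)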
151 150 def maybeperformlegacystreamclone(pullop):
152 151 """Possibly perform a legacy stream clone operation.
153 152
154 153 Legacy stream clones are performed as part of pull but before all other
155 154 operations.
156 155
157 156 A legacy stream clone will not be performed if a bundle2 stream clone is
158 157 supported.
159 158 """
160 159 from . import localrepo
161 160
162 161 supported, requirements = canperformstreamclone(pullop)
163 162
164 163 if not supported:
165 164 return
166 165
167 166 repo = pullop.repo
168 167 remote = pullop.remote
169 168
170 169 # Save remote branchmap. We will use it later to speed up branchcache
171 170 # creation.
172 171 rbranchmap = None
173 172 if remote.capable(b'branchmap'):
174 173 with remote.commandexecutor() as e:
175 174 rbranchmap = e.callcommand(b'branchmap', {}).result()
176 175
177 176 repo.ui.status(_(b'streaming all changes\n'))
178 177
179 178 with remote.commandexecutor() as e:
180 179 fp = e.callcommand(b'stream_out', {}).result()
181 180
182 181 # TODO strictly speaking, this code should all be inside the context
183 182 # manager because the context manager is supposed to ensure all wire state
184 183 # is flushed when exiting. But the legacy peers don't do this, so it
185 184 # doesn't matter.
186 185 l = fp.readline()
187 186 try:
188 187 resp = int(l)
189 188 except ValueError:
190 189 raise error.ResponseError(
191 190 _(b'unexpected response from remote server:'), l
192 191 )
193 192 if resp == 1:
194 193 raise error.Abort(_(b'operation forbidden by server'))
195 194 elif resp == 2:
196 195 raise error.Abort(_(b'locking the remote repository failed'))
197 196 elif resp != 0:
198 197 raise error.Abort(_(b'the server sent an unknown error code'))
199 198
200 199 l = fp.readline()
201 200 try:
202 201 filecount, bytecount = map(int, l.split(b' ', 1))
203 202 except (ValueError, TypeError):
204 203 raise error.ResponseError(
205 204 _(b'unexpected response from remote server:'), l
206 205 )
207 206
208 207 with repo.lock():
209 208 consumev1(repo, fp, filecount, bytecount)
210 209 repo.requirements = new_stream_clone_requirements(
211 210 repo.requirements,
212 211 requirements,
213 212 )
214 213 repo.svfs.options = localrepo.resolvestorevfsoptions(
215 214 repo.ui, repo.requirements, repo.features
216 215 )
217 216 scmutil.writereporequirements(repo)
218 217 nodemap.post_stream_cleanup(repo)
219 218
220 219 if rbranchmap:
221 220 repo._branchcaches.replace(repo, rbranchmap)
222 221
223 222 repo.invalidate()
224 223
225 224
226 225 def allowservergeneration(repo):
227 226 """Whether streaming clones are allowed from the server."""
228 227 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
229 228 return False
230 229
231 230 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
232 231 return False
233 232
234 233 # The way stream clone works makes it impossible to hide secret changesets.
235 234 # So don't allow this by default.
236 235 secret = phases.hassecret(repo)
237 236 if secret:
238 237 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
239 238
240 239 return True
241 240
242 241
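The three checks above map onto server configuration; as a hedged example, the two `[server]` options read by this function could be set in an hgrc like so:

[server]
# advertise and allow uncompressed (stream) clones
uncompressed = True
# allow stream clones even when secret changesets exist; off by default,
# since a stream clone cannot hide secret changesets
uncompressedallowsecret = False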
243 242 # This is its own function so extensions can override it.
244 243 def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
245 244 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
246 245
247 246
248 247 def generatev1(repo):
249 248 """Emit content for version 1 of a streaming clone.
250 249
251 250 This returns a 3-tuple of (file count, byte size, data iterator).
252 251
253 252 The data iterator consists of N entries for each file being transferred.
254 253 Each file entry starts as a line with the file name and integer size
255 254 delimited by a null byte.
256 255
257 256 The raw file data follows. Following the raw file data is the next file
258 257 entry, or EOF.
259 258
260 259 When used on the wire protocol, an additional line indicating protocol
261 260 success will be prepended to the stream. This function is not responsible
262 261 for adding it.
263 262
264 263 This function will obtain a repository lock to ensure a consistent view of
265 264 the store is captured. It therefore may raise LockError.
266 265 """
267 266 entries = []
268 267 total_bytes = 0
269 268 # Get consistent snapshot of repo, lock during scan.
270 269 with repo.lock():
271 270 repo.ui.debug(b'scanning\n')
272 271 for entry in _walkstreamfiles(repo):
273 272 for f in entry.files():
274 273 file_size = f.file_size(repo.store.vfs)
275 274 if file_size:
276 275 entries.append((f.unencoded_path, file_size))
277 276 total_bytes += file_size
278 277 _test_sync_point_walk_1(repo)
279 278 _test_sync_point_walk_2(repo)
280 279
281 280 repo.ui.debug(
282 281 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
283 282 )
284 283
285 284 svfs = repo.svfs
286 285 debugflag = repo.ui.debugflag
287 286
288 287 def emitrevlogdata():
289 288 for name, size in entries:
290 289 if debugflag:
291 290 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
292 291 # partially encode name over the wire for backwards compat
293 292 yield b'%s\0%d\n' % (store.encodedir(name), size)
294 293 # auditing at this stage is both pointless (paths are already
295 294 # trusted by the local repo) and expensive
296 295 with svfs(name, b'rb', auditpath=False) as fp:
297 296 if size <= 65536:
298 297 yield fp.read(size)
299 298 else:
300 299 for chunk in util.filechunkiter(fp, limit=size):
301 300 yield chunk
302 301
303 302 return len(entries), total_bytes, emitrevlogdata()
304 303
305 304
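A minimal sketch of consuming the v1 framing described in the docstring (a '<name>\0<size>\n' header per file, followed by the raw data), ignoring the store.encodedir step:

import io

def parse_v1_entries(fp):
    # one '<name>\0<size>\n' header per file, then <size> raw bytes
    while True:
        header = fp.readline()
        if not header:
            break  # EOF marks the end of the entries
        name, size = header.rstrip(b'\n').split(b'\0', 1)
        yield name, fp.read(int(size))

stream = io.BytesIO(b'data/foo.i\x005\nhello')
assert list(parse_v1_entries(stream)) == [(b'data/foo.i', b'hello')]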
306 305 def generatev1wireproto(repo):
307 306 """Emit content for version 1 of streaming clone suitable for the wire.
308 307
309 308 This is the data output from ``generatev1()`` with 2 header lines. The
310 309 first line indicates overall success. The 2nd contains the file count and
311 310 byte size of payload.
312 311
313 312 The success line contains "0" for success, "1" for stream generation not
314 313 allowed, and "2" for error locking the repository (possibly indicating
315 314 a permissions error for the server process).
316 315 """
317 316 if not allowservergeneration(repo):
318 317 yield b'1\n'
319 318 return
320 319
321 320 try:
322 321 filecount, bytecount, it = generatev1(repo)
323 322 except error.LockError:
324 323 yield b'2\n'
325 324 return
326 325
327 326 # Indicates successful response.
328 327 yield b'0\n'
329 328 yield b'%d %d\n' % (filecount, bytecount)
330 329 for chunk in it:
331 330 yield chunk
332 331
333 332
334 333 def generatebundlev1(repo, compression=b'UN'):
335 334 """Emit content for version 1 of a stream clone bundle.
336 335
337 336 The first 4 bytes of the output ("HGS1") denote this as stream clone
338 337 bundle version 1.
339 338
340 339 The next 2 bytes indicate the compression type. Only "UN" is currently
341 340 supported.
342 341
343 342 The next 16 bytes are two 64-bit big endian unsigned integers indicating
344 343 file count and byte count, respectively.
345 344
346 345 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
347 346 of the requirements string, including a trailing \0. The following N bytes
348 347 are the requirements string, which is ASCII containing a comma-delimited
349 348 list of repo requirements that are needed to support the data.
350 349
351 350 The remaining content is the output of ``generatev1()`` (which may be
352 351 compressed in the future).
353 352
354 353 Returns a tuple of (requirements, data generator).
355 354 """
356 355 if compression != b'UN':
357 356 raise ValueError(b'we do not support the compression argument yet')
358 357
359 358 requirements = streamed_requirements(repo)
360 359 requires = b','.join(sorted(requirements))
361 360
362 361 def gen():
363 362 yield b'HGS1'
364 363 yield compression
365 364
366 365 filecount, bytecount, it = generatev1(repo)
367 366 repo.ui.status(
368 367 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
369 368 )
370 369
371 370 yield struct.pack(b'>QQ', filecount, bytecount)
372 371 yield struct.pack(b'>H', len(requires) + 1)
373 372 yield requires + b'\0'
374 373
375 374 # This is where we'll add compression in the future.
376 375 assert compression == b'UN'
377 376
378 377 progress = repo.ui.makeprogress(
379 378 _(b'bundle'), total=bytecount, unit=_(b'bytes')
380 379 )
381 380 progress.update(0)
382 381
383 382 for chunk in it:
384 383 progress.increment(step=len(chunk))
385 384 yield chunk
386 385
387 386 progress.complete()
388 387
389 388 return requirements, gen()
390 389
391 390
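The bundle header layout documented above can be reproduced with plain struct packing; a sketch with made-up counts and requirements:

import struct

requires = b'revlogv1,store'  # hypothetical requirements string
header = (
    b'HGS1'                         # 4-byte magic
    + b'UN'                         # 2-byte compression identifier
    + struct.pack(b'>QQ', 3, 4096)  # 64-bit file count and byte count
    + struct.pack(b'>H', len(requires) + 1)  # length incl. trailing NUL
    + requires
    + b'\0'
)
assert len(header) == 4 + 2 + 16 + 2 + len(requires) + 1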
392 391 def consumev1(repo, fp, filecount, bytecount):
393 392 """Apply the contents from version 1 of a streaming clone file handle.
394 393
395 394 This takes the output from "stream_out" and applies it to the specified
396 395 repository.
397 396
398 397 Like "stream_out," the status line added by the wire protocol is not
399 398 handled by this function.
400 399 """
401 400 with repo.lock():
402 401 repo.ui.status(
403 402 _(b'%d files to transfer, %s of data\n')
404 403 % (filecount, util.bytecount(bytecount))
405 404 )
406 405 progress = repo.ui.makeprogress(
407 406 _(b'clone'), total=bytecount, unit=_(b'bytes')
408 407 )
409 408 progress.update(0)
410 409 start = util.timer()
411 410
412 411 # TODO: get rid of (potential) inconsistency
413 412 #
414 413 # If transaction is started and any @filecache property is
415 414 # changed at this point, it causes inconsistency between
416 415 # in-memory cached property and streamclone-ed file on the
417 416 # disk. Nested transaction prevents transaction scope "clone"
418 417 # below from writing in-memory changes out at the end of it,
419 418 # even though in-memory changes are discarded at the end of it
420 419 # regardless of transaction nesting.
421 420 #
422 421 # But transaction nesting can't be simply prohibited, because
423 422 # nesting occurs also in ordinary case (e.g. enabling
424 423 # clonebundles).
425 424
426 425 with repo.transaction(b'clone'):
427 426 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
428 427 for i in range(filecount):
429 428 # XXX doesn't support '\n' or '\r' in filenames
430 429 l = fp.readline()
431 430 try:
432 431 name, size = l.split(b'\0', 1)
433 432 size = int(size)
434 433 except (ValueError, TypeError):
435 434 raise error.ResponseError(
436 435 _(b'unexpected response from remote server:'), l
437 436 )
438 437 if repo.ui.debugflag:
439 438 repo.ui.debug(
440 439 b'adding %s (%s)\n' % (name, util.bytecount(size))
441 440 )
442 441 # for backwards compat, name was partially encoded
443 442 path = store.decodedir(name)
444 443 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
445 444 for chunk in util.filechunkiter(fp, limit=size):
446 445 progress.increment(step=len(chunk))
447 446 ofp.write(chunk)
448 447
449 448 # force @filecache properties to be reloaded from
450 449 # streamclone-ed file at next access
451 450 repo.invalidate(clearfilecache=True)
452 451
453 452 elapsed = util.timer() - start
454 453 if elapsed <= 0:
455 454 elapsed = 0.001
456 455 progress.complete()
457 456 repo.ui.status(
458 457 _(b'transferred %s in %.1f seconds (%s/sec)\n')
459 458 % (
460 459 util.bytecount(bytecount),
461 460 elapsed,
462 461 util.bytecount(bytecount / elapsed),
463 462 )
464 463 )
465 464
466 465
467 466 def readbundle1header(fp):
468 467 compression = fp.read(2)
469 468 if compression != b'UN':
470 469 raise error.Abort(
471 470 _(
472 471 b'only uncompressed stream clone bundles are '
473 472 b'supported; got %s'
474 473 )
475 474 % compression
476 475 )
477 476
478 477 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
479 478 requireslen = struct.unpack(b'>H', fp.read(2))[0]
480 479 requires = fp.read(requireslen)
481 480
482 481 if not requires.endswith(b'\0'):
483 482 raise error.Abort(
484 483 _(
485 484 b'malformed stream clone bundle: '
486 485 b'requirements not properly encoded'
487 486 )
488 487 )
489 488
490 489 requirements = set(requires.rstrip(b'\0').split(b','))
491 490
492 491 return filecount, bytecount, requirements
493 492
494 493
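A hypothetical caller, showing the file-position contract shared with applybundlev1(): the 4-byte magic is consumed before the header is parsed, and the handle ends up at the start of the generatev1() payload.

# 'clone.hg' is a hypothetical on-disk stream clone bundle.
with open('clone.hg', 'rb') as fp:
    if fp.read(4) != b'HGS1':
        raise ValueError('not a stream clone bundle v1')
    filecount, bytecount, requirements = readbundle1header(fp)
    # fp now sits at the start of the generatev1() payload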
495 494 def applybundlev1(repo, fp):
496 495 """Apply the content from a stream clone bundle version 1.
497 496
498 497 We assume the 4 byte header has been read and validated and the file handle
499 498 is at the 2 byte compression identifier.
500 499 """
501 500 if len(repo):
502 501 raise error.Abort(
503 502 _(b'cannot apply stream clone bundle on non-empty repo')
504 503 )
505 504
506 505 filecount, bytecount, requirements = readbundle1header(fp)
507 506 missingreqs = requirements - repo.supported
508 507 if missingreqs:
509 508 raise error.Abort(
510 509 _(b'unable to apply stream clone: unsupported format: %s')
511 510 % b', '.join(sorted(missingreqs))
512 511 )
513 512
514 513 consumev1(repo, fp, filecount, bytecount)
515 514 nodemap.post_stream_cleanup(repo)
516 515
517 516
518 517 class streamcloneapplier:
519 518 """Class to manage applying streaming clone bundles.
520 519
521 520 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
522 521 readers to perform bundle type-specific functionality.
523 522 """
524 523
525 524 def __init__(self, fh):
526 525 self._fh = fh
527 526
528 527 def apply(self, repo):
529 528 return applybundlev1(repo, self._fh)
530 529
531 530
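Hypothetical usage from a bundle reader, once the 4-byte magic has been read and validated:

applier = streamcloneapplier(fh)  # fh positioned at the compression field
applier.apply(repo)               # delegates to applybundlev1()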
532 531 # type of file to stream
533 532 _fileappend = 0 # append only file
534 533 _filefull = 1 # full snapshot file
535 534
536 535 # Source of the file
537 536 _srcstore = b's' # store (svfs)
538 537 _srccache = b'c' # cache (cache)
539 538
540 539 # This is its own function so extensions can override it.
541 540 def _walkstreamfullstorefiles(repo):
542 541 """list snapshot file from the store"""
543 542 fnames = []
544 543 if not repo.publishing():
545 544 fnames.append(b'phaseroots')
546 545 return fnames
547 546
548 547
549 548 def _filterfull(entry, copy, vfsmap):
550 549 """actually copy the snapshot files"""
551 550 src, name, ftype, data = entry
552 551 if ftype != _filefull:
553 552 return entry
554 553 return (src, name, ftype, copy(vfsmap[src].join(name)))
555 554
556 555
557 556 @contextlib.contextmanager
558 557 def maketempcopies():
559 558 """return a function to temporary copy file"""
560 559
561 560 files = []
562 561 dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
563 562 try:
564 563
565 564 def copy(src):
566 565 fd, dst = pycompat.mkstemp(
567 566 prefix=os.path.basename(src), dir=dst_dir
568 567 )
569 568 os.close(fd)
570 569 files.append(dst)
571 570 util.copyfiles(src, dst, hardlink=True)
572 571 return dst
573 572
574 573 yield copy
575 574 finally:
576 575 for tmp in files:
577 576 util.tryunlink(tmp)
578 577 util.tryrmdir(dst_dir)
579 578
580 579
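A sketch of the intended use (mirroring _filterfull above): snapshot volatile files while the repository lock is held, then stream the copies after the lock is released. The path below is hypothetical:

with maketempcopies() as copy:
    tmp = copy(b'/repo/.hg/store/phaseroots')  # hardlink/copy into tmpdir
    # ... read from `tmp` at leisure; it is unlinked when the context exits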
581 580 def _makemap(repo):
582 581 """make a (src -> vfs) map for the repo"""
583 582 vfsmap = {
584 583 _srcstore: repo.svfs,
585 584 _srccache: repo.cachevfs,
586 585 }
587 586 # we keep repo.vfs out of the map on purpose, there are too many dangers
588 587 # there (eg: .hg/hgrc)
589 588 assert repo.vfs not in vfsmap.values()
590 589
591 590 return vfsmap
592 591
593 592
594 593 def _emit2(repo, entries, totalfilesize):
595 594 """actually emit the stream bundle"""
596 595 vfsmap = _makemap(repo)
597 596 # we keep repo.vfs out of the map on purpose, there are too many dangers
598 597 # there (eg: .hg/hgrc),
599 598 #
600 599 # this assert is duplicated (from _makemap) as an author might think this
601 600 # is fine, while it is really not.
602 601 if repo.vfs in vfsmap.values():
603 602 raise error.ProgrammingError(
604 603 b'repo.vfs must not be added to vfsmap for security reasons'
605 604 )
606 605
607 606 progress = repo.ui.makeprogress(
608 607 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
609 608 )
610 609 progress.update(0)
611 610 with maketempcopies() as copy, progress:
612 611 # copy is delayed until we are in the try
613 612 entries = [_filterfull(e, copy, vfsmap) for e in entries]
614 613 yield None # this releases the lock on the repository
615 614 totalbytecount = 0
616 615
617 616 for src, name, ftype, data in entries:
618 617 vfs = vfsmap[src]
619 618 yield src
620 619 yield util.uvarintencode(len(name))
621 620 if ftype == _fileappend:
622 621 fp = vfs(name)
623 622 size = data
624 623 elif ftype == _filefull:
625 624 fp = open(data, b'rb')
626 625 size = util.fstat(fp).st_size
627 626 bytecount = 0
628 627 try:
629 628 yield util.uvarintencode(size)
630 629 yield name
631 630 if size <= 65536:
632 631 chunks = (fp.read(size),)
633 632 else:
634 633 chunks = util.filechunkiter(fp, limit=size)
635 634 for chunk in chunks:
636 635 bytecount += len(chunk)
637 636 totalbytecount += len(chunk)
638 637 progress.update(totalbytecount)
639 638 yield chunk
640 639 if bytecount != size:
641 640 # Would most likely be caused by a race due to `hg strip` or
642 641 # a revlog split
643 642 raise error.Abort(
644 643 _(
645 644 b'clone could only read %d bytes from %s, but '
646 645 b'expected %d bytes'
647 646 )
648 647 % (bytecount, name, size)
649 648 )
650 649 finally:
651 650 fp.close()
652 651
653 652
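The name and size fields above are framed with util.uvarintencode; assuming the usual LEB128-style scheme (7 payload bits per byte, high bit as continuation flag), a minimal encoder for illustration looks like:

def uvarint_encode(value):
    out = bytearray()
    while True:
        byte = value & 0x7F          # low 7 bits of the value
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)

assert uvarint_encode(127) == b'\x7f'
assert uvarint_encode(300) == b'\xac\x02'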
654 653 def _test_sync_point_walk_1(repo):
655 654 """a function for synchronisation during tests"""
656 655
657 656
658 657 def _test_sync_point_walk_2(repo):
659 658 """a function for synchronisation during tests"""
660 659
661 660
662 661 def _entries_walk(repo, includes, excludes, includeobsmarkers):
663 662 """emit a seris of files information useful to clone a repo
664 663
665 664 return (vfs-key, entry) iterator
666 665
667 666 Where `entry` is a StoreEntry (used even for cache entries).
668 667 """
669 668 assert repo._currentlock(repo._lockref) is not None
670 669
671 670 matcher = None
672 671 if includes or excludes:
673 672 matcher = narrowspec.match(repo.root, includes, excludes)
674 673
675 674 phase = not repo.publishing()
676 675 entries = _walkstreamfiles(
677 676 repo,
678 677 matcher,
679 678 phase=phase,
680 679 obsolescence=includeobsmarkers,
681 680 )
682 681 for entry in entries:
683 682 yield (_srcstore, entry)
684 683
685 684 for name in cacheutil.cachetocopy(repo):
686 685 if repo.cachevfs.exists(name):
687 686 # not really a StoreEntry, but close enough
688 687 entry = store.SimpleStoreEntry(
689 688 entry_path=name,
690 689 is_volatile=True,
691 690 )
692 691 yield (_srccache, entry)
693 692
694 693
695 694 def _v2_walk(repo, includes, excludes, includeobsmarkers):
696 695 """emit a seris of files information useful to clone a repo
697 696
698 697 return (entries, totalfilesize)
699 698
700 699 entries is a list of tuple (vfs-key, file-path, file-type, size)
701 700
702 701 - `vfs-key`: a key to the right vfs to write the file (see _makemap)
703 702 - `name`: file path of the file to copy (to be fed to the vfs)
704 703 - `file-type`: does this file need to be copied with the source lock?
705 704 - `size`: the size of the file (or None)
706 705 """
707 706 assert repo._currentlock(repo._lockref) is not None
708 707 files = []
709 708 totalfilesize = 0
710 709
711 710 vfsmap = _makemap(repo)
712 711 entries = _entries_walk(repo, includes, excludes, includeobsmarkers)
713 712 for vfs_key, entry in entries:
714 713 vfs = vfsmap[vfs_key]
715 714 for f in entry.files():
716 715 file_size = f.file_size(vfs)
717 716 if file_size:
718 717 ft = _fileappend
719 718 if f.is_volatile:
720 719 ft = _filefull
721 720 files.append((vfs_key, f.unencoded_path, ft, file_size))
722 721 totalfilesize += file_size
723 722 return files, totalfilesize
724 723
725 724
726 725 def generatev2(repo, includes, excludes, includeobsmarkers):
727 726 """Emit content for version 2 of a streaming clone.
728 727
729 728 the data stream consists of the following entries:
730 729 1) A char representing the file destination (eg: store or cache)
731 730 2) A varint containing the length of the filename
732 731 3) A varint containing the length of file data
733 732 4) N bytes containing the filename (the internal, store-agnostic form)
734 733 5) N bytes containing the file data
735 734
736 735 Returns a 3-tuple of (file count, file size, data iterator).
737 736 """
738 737
739 738 with repo.lock():
740 739
741 740 repo.ui.debug(b'scanning\n')
742 741
743 742 entries, totalfilesize = _v2_walk(
744 743 repo,
745 744 includes=includes,
746 745 excludes=excludes,
747 746 includeobsmarkers=includeobsmarkers,
748 747 )
749 748
750 749 chunks = _emit2(repo, entries, totalfilesize)
751 750 first = next(chunks)
752 751 assert first is None
753 752 _test_sync_point_walk_1(repo)
754 753 _test_sync_point_walk_2(repo)
755 754
756 755 return len(entries), totalfilesize, chunks
757 756
758 757
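A minimal decoder for the five-part entry framing listed in the docstring above, with a small varint reader standing in for util.uvarintdecodestream (assumed LEB128-style, matching the encoder side):

def read_uvarint(fp):
    result = shift = 0
    while True:
        byte = fp.read(1)[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result
        shift += 7

def read_v2_entry(fp):
    src = fp.read(1)             # 1) destination key (store or cache)
    namelen = read_uvarint(fp)   # 2) varint length of the filename
    datalen = read_uvarint(fp)   # 3) varint length of the file data
    name = fp.read(namelen)      # 4) the filename
    data = fp.read(datalen)      # 5) the raw file data
    return src, name, data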
759 758 @contextlib.contextmanager
760 759 def nested(*ctxs):
761 760 this = ctxs[0]
762 761 rest = ctxs[1:]
763 762 with this:
764 763 if rest:
765 764 with nested(*rest):
766 765 yield
767 766 else:
768 767 yield
769 768
770 769
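Hypothetical usage, matching the call in consumev2() below; contextlib.ExitStack offers similar functionality in modern Python:

ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
with nested(*ctxs):
    ...  # every vfs context is open here; they unwind innermost-first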
771 770 def consumev2(repo, fp, filecount, filesize):
772 771 """Apply the contents from a version 2 streaming clone.
773 772
774 773 Data is read from an object that only needs to provide a ``read(size)``
775 774 method.
776 775 """
777 776 with repo.lock():
778 777 repo.ui.status(
779 778 _(b'%d files to transfer, %s of data\n')
780 779 % (filecount, util.bytecount(filesize))
781 780 )
782 781
783 782 start = util.timer()
784 783 progress = repo.ui.makeprogress(
785 784 _(b'clone'), total=filesize, unit=_(b'bytes')
786 785 )
787 786 progress.update(0)
788 787
789 788 vfsmap = _makemap(repo)
790 789 # we keep repo.vfs out of the map on purpose, there are too many dangers
791 790 # there (eg: .hg/hgrc),
792 791 #
793 792 # this assert is duplicated (from _makemap) as an author might think this
794 793 # is fine, while it is really not.
795 794 if repo.vfs in vfsmap.values():
796 795 raise error.ProgrammingError(
797 796 b'repo.vfs must not be added to vfsmap for security reasons'
798 797 )
799 798
800 799 with repo.transaction(b'clone'):
801 800 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
802 801 with nested(*ctxs):
803 802 for i in range(filecount):
804 803 src = util.readexactly(fp, 1)
805 804 vfs = vfsmap[src]
806 805 namelen = util.uvarintdecodestream(fp)
807 806 datalen = util.uvarintdecodestream(fp)
808 807
809 808 name = util.readexactly(fp, namelen)
810 809
811 810 if repo.ui.debugflag:
812 811 repo.ui.debug(
813 812 b'adding [%s] %s (%s)\n'
814 813 % (src, name, util.bytecount(datalen))
815 814 )
816 815
817 816 with vfs(name, b'w') as ofp:
818 817 for chunk in util.filechunkiter(fp, limit=datalen):
819 818 progress.increment(step=len(chunk))
820 819 ofp.write(chunk)
821 820
822 821 # force @filecache properties to be reloaded from
823 822 # streamclone-ed file at next access
824 823 repo.invalidate(clearfilecache=True)
825 824
826 825 elapsed = util.timer() - start
827 826 if elapsed <= 0:
828 827 elapsed = 0.001
829 828 repo.ui.status(
830 829 _(b'transferred %s in %.1f seconds (%s/sec)\n')
831 830 % (
832 831 util.bytecount(progress.pos),
833 832 elapsed,
834 833 util.bytecount(progress.pos / elapsed),
835 834 )
836 835 )
837 836 progress.complete()
838 837
839 838
840 839 def applybundlev2(repo, fp, filecount, filesize, requirements):
841 840 from . import localrepo
842 841
843 842 missingreqs = [r for r in requirements if r not in repo.supported]
844 843 if missingreqs:
845 844 raise error.Abort(
846 845 _(b'unable to apply stream clone: unsupported format: %s')
847 846 % b', '.join(sorted(missingreqs))
848 847 )
849 848
850 849 consumev2(repo, fp, filecount, filesize)
851 850
852 851 repo.requirements = new_stream_clone_requirements(
853 852 repo.requirements,
854 853 requirements,
855 854 )
856 855 repo.svfs.options = localrepo.resolvestorevfsoptions(
857 856 repo.ui, repo.requirements, repo.features
858 857 )
859 858 scmutil.writereporequirements(repo)
860 859 nodemap.post_stream_cleanup(repo)
861 860
862 861
863 862 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
864 863 hardlink = [True]
865 864
866 865 def copy_used():
867 866 hardlink[0] = False
868 867 progress.topic = _(b'copying')
869 868
870 869 for k, path, size in entries:
871 870 src_vfs = src_vfs_map[k]
872 871 dst_vfs = dst_vfs_map[k]
873 872 src_path = src_vfs.join(path)
874 873 dst_path = dst_vfs.join(path)
875 874 # We cannot use dirname and makedirs of dst_vfs here because the store
876 875 # encoding confuses them. See issue 6581 for details.
877 876 dirname = os.path.dirname(dst_path)
878 877 if not os.path.exists(dirname):
879 878 util.makedirs(dirname)
880 879 dst_vfs.register_file(path)
881 880 # XXX we could use the #nb_bytes argument.
882 881 util.copyfile(
883 882 src_path,
884 883 dst_path,
885 884 hardlink=hardlink[0],
886 885 no_hardlink_cb=copy_used,
887 886 check_fs_hardlink=False,
888 887 )
889 888 progress.increment()
890 889 return hardlink[0]
891 890
892 891
893 892 def local_copy(src_repo, dest_repo):
894 893 """copy all content from one local repository to another
895 894
896 895 This is useful for local clone"""
897 896 src_store_requirements = {
898 897 r
899 898 for r in src_repo.requirements
900 899 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
901 900 }
902 901 dest_store_requirements = {
903 902 r
904 903 for r in dest_repo.requirements
905 904 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
906 905 }
907 906 assert src_store_requirements == dest_store_requirements
908 907
909 908 with dest_repo.lock():
910 909 with src_repo.lock():
911 910
912 911 # bookmarks are not integrated into the streaming as they might use
913 912 # the `repo.vfs`, and there is too much sensitive data accessible
914 913 # through `repo.vfs` to expose it to a streaming clone.
915 914 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
916 915 srcbookmarks = src_book_vfs.join(b'bookmarks')
917 916 bm_count = 0
918 917 if os.path.exists(srcbookmarks):
919 918 bm_count = 1
920 919
921 920 entries, totalfilesize = _v2_walk(
922 921 src_repo,
923 922 includes=None,
924 923 excludes=None,
925 924 includeobsmarkers=True,
926 925 )
927 926 src_vfs_map = _makemap(src_repo)
928 927 dest_vfs_map = _makemap(dest_repo)
929 928 progress = src_repo.ui.makeprogress(
930 929 topic=_(b'linking'),
931 930 total=len(entries) + bm_count,
932 931 unit=_(b'files'),
933 932 )
934 933 # copy files
935 934 #
936 935 # We could copy the full file while the source repository is locked
937 936 # and the other one without the lock. However, in the linking case,
938 937 # this would also require checks that nobody is appending any data
939 938 # to the files while we do the clone, so this is not done yet. We
940 939 # could do this blindly when copying files.
941 940 files = ((k, path, size) for k, path, ftype, size in entries)
942 941 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
943 942
944 943 # copy bookmarks over
945 944 if bm_count:
946 945 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
947 946 dstbookmarks = dst_book_vfs.join(b'bookmarks')
948 947 util.copyfile(srcbookmarks, dstbookmarks)
949 948 progress.complete()
950 949 if hardlink:
951 950 msg = b'linked %d files\n'
952 951 else:
953 952 msg = b'copied %d files\n'
954 953 src_repo.ui.debug(msg % (len(entries) + bm_count))
955 954
956 955 with dest_repo.transaction(b"localclone") as tr:
957 956 dest_repo.store.write(tr)
958 957
959 958 # clean up transaction files as they do not make sense
960 959 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)