stream-clone: yield cache entry in `_entries_walk` too...
marmoute
r51409:43ed1f12 default
@@ -1,955 +1,960 @@
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import contextlib
10 10 import os
11 11 import struct
12 12
13 13 from .i18n import _
14 14 from .pycompat import open
15 15 from .interfaces import repository
16 16 from . import (
17 17 bookmarks,
18 18 cacheutil,
19 19 error,
20 20 narrowspec,
21 21 phases,
22 22 pycompat,
23 23 requirements as requirementsmod,
24 24 scmutil,
25 25 store,
26 26 transaction,
27 27 util,
28 28 )
29 29 from .revlogutils import (
30 30 nodemap,
31 31 )
32 32
33 33
34 34 def new_stream_clone_requirements(default_requirements, streamed_requirements):
35 35 """determine the final set of requirement for a new stream clone
36 36
37 37 this method combine the "default" requirements that a new repository would
38 38 use with the constaint we get from the stream clone content. We keep local
39 39 configuration choice when possible.
40 40 """
41 41 requirements = set(default_requirements)
42 42 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
43 43 requirements.update(streamed_requirements)
44 44 return requirements
45 45
46 46
47 47 def streamed_requirements(repo):
48 48 """the set of requirement the new clone will have to support
49 49
50 50 This is used for advertising the stream options and to generate the actual
51 51 stream content."""
52 52 requiredformats = (
53 53 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
54 54 )
55 55 return requiredformats
56 56
57 57
58 58 def canperformstreamclone(pullop, bundle2=False):
59 59 """Whether it is possible to perform a streaming clone as part of pull.
60 60
61 61 ``bundle2`` will cause the function to consider stream clone through
62 62 bundle2 and only through bundle2.
63 63
64 64 Returns a tuple of (supported, requirements). ``supported`` is True if
65 65 streaming clone is supported and False otherwise. ``requirements`` is
66 66 a set of repo requirements from the remote, or ``None`` if stream clone
67 67 isn't supported.
68 68 """
69 69 repo = pullop.repo
70 70 remote = pullop.remote
71 71
72 72 bundle2supported = False
73 73 if pullop.canusebundle2:
74 74 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
75 75 bundle2supported = True
76 76 # else
77 77 # Server doesn't support bundle2 stream clone or doesn't support
78 78 # the versions we support. Fall back and possibly allow legacy.
79 79
80 80 # Ensures legacy code path uses available bundle2.
81 81 if bundle2supported and not bundle2:
82 82 return False, None
83 83 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
84 84 elif bundle2 and not bundle2supported:
85 85 return False, None
86 86
87 87 # Streaming clone only works on empty repositories.
88 88 if len(repo):
89 89 return False, None
90 90
91 91 # Streaming clone only works if all data is being requested.
92 92 if pullop.heads:
93 93 return False, None
94 94
95 95 streamrequested = pullop.streamclonerequested
96 96
97 97 # If we don't have a preference, let the server decide for us. This
98 98 # likely only comes into play in LANs.
99 99 if streamrequested is None:
100 100 # The server can advertise whether to prefer streaming clone.
101 101 streamrequested = remote.capable(b'stream-preferred')
102 102
103 103 if not streamrequested:
104 104 return False, None
105 105
106 106 # In order for stream clone to work, the client has to support all the
107 107 # requirements advertised by the server.
108 108 #
109 109 # The server advertises its requirements via the "stream" and "streamreqs"
110 110 # capability. "stream" (a value-less capability) is advertised if and only
111 111 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
112 112 # is advertised and contains a comma-delimited list of requirements.
113 113 requirements = set()
114 114 if remote.capable(b'stream'):
115 115 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
116 116 else:
117 117 streamreqs = remote.capable(b'streamreqs')
118 118 # This is weird and shouldn't happen with modern servers.
119 119 if not streamreqs:
120 120 pullop.repo.ui.warn(
121 121 _(
122 122 b'warning: stream clone requested but server has them '
123 123 b'disabled\n'
124 124 )
125 125 )
126 126 return False, None
127 127
128 128 streamreqs = set(streamreqs.split(b','))
129 129 # Server requires something we don't support. Bail.
130 130 missingreqs = streamreqs - repo.supported
131 131 if missingreqs:
132 132 pullop.repo.ui.warn(
133 133 _(
134 134 b'warning: stream clone requested but client is missing '
135 135 b'requirements: %s\n'
136 136 )
137 137 % b', '.join(sorted(missingreqs))
138 138 )
139 139 pullop.repo.ui.warn(
140 140 _(
141 141 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
142 142 b'for more information)\n'
143 143 )
144 144 )
145 145 return False, None
146 146 requirements = streamreqs
147 147
148 148 return True, requirements
149 149
150 150
151 151 def maybeperformlegacystreamclone(pullop):
152 152 """Possibly perform a legacy stream clone operation.
153 153
154 154 Legacy stream clones are performed as part of pull but before all other
155 155 operations.
156 156
157 157 A legacy stream clone will not be performed if a bundle2 stream clone is
158 158 supported.
159 159 """
160 160 from . import localrepo
161 161
162 162 supported, requirements = canperformstreamclone(pullop)
163 163
164 164 if not supported:
165 165 return
166 166
167 167 repo = pullop.repo
168 168 remote = pullop.remote
169 169
170 170 # Save remote branchmap. We will use it later to speed up branchcache
171 171 # creation.
172 172 rbranchmap = None
173 173 if remote.capable(b'branchmap'):
174 174 with remote.commandexecutor() as e:
175 175 rbranchmap = e.callcommand(b'branchmap', {}).result()
176 176
177 177 repo.ui.status(_(b'streaming all changes\n'))
178 178
179 179 with remote.commandexecutor() as e:
180 180 fp = e.callcommand(b'stream_out', {}).result()
181 181
182 182 # TODO strictly speaking, this code should all be inside the context
183 183 # manager because the context manager is supposed to ensure all wire state
184 184 # is flushed when exiting. But the legacy peers don't do this, so it
185 185 # doesn't matter.
186 186 l = fp.readline()
187 187 try:
188 188 resp = int(l)
189 189 except ValueError:
190 190 raise error.ResponseError(
191 191 _(b'unexpected response from remote server:'), l
192 192 )
193 193 if resp == 1:
194 194 raise error.Abort(_(b'operation forbidden by server'))
195 195 elif resp == 2:
196 196 raise error.Abort(_(b'locking the remote repository failed'))
197 197 elif resp != 0:
198 198 raise error.Abort(_(b'the server sent an unknown error code'))
199 199
200 200 l = fp.readline()
201 201 try:
202 202 filecount, bytecount = map(int, l.split(b' ', 1))
203 203 except (ValueError, TypeError):
204 204 raise error.ResponseError(
205 205 _(b'unexpected response from remote server:'), l
206 206 )
207 207
208 208 with repo.lock():
209 209 consumev1(repo, fp, filecount, bytecount)
210 210 repo.requirements = new_stream_clone_requirements(
211 211 repo.requirements,
212 212 requirements,
213 213 )
214 214 repo.svfs.options = localrepo.resolvestorevfsoptions(
215 215 repo.ui, repo.requirements, repo.features
216 216 )
217 217 scmutil.writereporequirements(repo)
218 218 nodemap.post_stream_cleanup(repo)
219 219
220 220 if rbranchmap:
221 221 repo._branchcaches.replace(repo, rbranchmap)
222 222
223 223 repo.invalidate()
224 224
225 225
226 226 def allowservergeneration(repo):
227 227 """Whether streaming clones are allowed from the server."""
228 228 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
229 229 return False
230 230
231 231 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
232 232 return False
233 233
234 234 # The way stream clone works makes it impossible to hide secret changesets.
235 235 # So don't allow this by default.
236 236 secret = phases.hassecret(repo)
237 237 if secret:
238 238 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
239 239
240 240 return True
241 241
242 242
243 243 # This is its own function so extensions can override it.
244 244 def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
245 245 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
246 246
247 247
248 248 def generatev1(repo):
249 249 """Emit content for version 1 of a streaming clone.
250 250
251 251 This returns a 3-tuple of (file count, byte size, data iterator).
252 252
253 253 The data iterator consists of N entries for each file being transferred.
254 254 Each file entry starts as a line with the file name and integer size
255 255 delimited by a null byte.
256 256
257 257 The raw file data follows. Following the raw file data is the next file
258 258 entry, or EOF.
259 259
260 260 When used on the wire protocol, an additional line indicating protocol
261 261 success will be prepended to the stream. This function is not responsible
262 262 for adding it.
263 263
264 264 This function will obtain a repository lock to ensure a consistent view of
265 265 the store is captured. It therefore may raise LockError.
266 266 """
267 267 entries = []
268 268 total_bytes = 0
269 269 # Get consistent snapshot of repo, lock during scan.
270 270 with repo.lock():
271 271 repo.ui.debug(b'scanning\n')
272 272 for entry in _walkstreamfiles(repo):
273 273 for f in entry.files():
274 274 file_size = f.file_size(repo.store.vfs)
275 275 if file_size:
276 276 entries.append((f.unencoded_path, file_size))
277 277 total_bytes += file_size
278 278 _test_sync_point_walk_1(repo)
279 279 _test_sync_point_walk_2(repo)
280 280
281 281 repo.ui.debug(
282 282 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
283 283 )
284 284
285 285 svfs = repo.svfs
286 286 debugflag = repo.ui.debugflag
287 287
288 288 def emitrevlogdata():
289 289 for name, size in entries:
290 290 if debugflag:
291 291 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
292 292 # partially encode name over the wire for backwards compat
293 293 yield b'%s\0%d\n' % (store.encodedir(name), size)
294 294 # auditing at this stage is both pointless (paths are already
295 295 # trusted by the local repo) and expensive
296 296 with svfs(name, b'rb', auditpath=False) as fp:
297 297 if size <= 65536:
298 298 yield fp.read(size)
299 299 else:
300 300 for chunk in util.filechunkiter(fp, limit=size):
301 301 yield chunk
302 302
303 303 return len(entries), total_bytes, emitrevlogdata()
304 304
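For reference, a minimal sketch of consuming the v1 entry stream described above (not part of streamclone.py; ``iter_v1_entries`` is a hypothetical helper and ``fp`` is assumed to be a binary file object positioned at the first entry):

    def iter_v1_entries(fp, filecount):
        # each entry is b'<encoded name>\0<size>\n' followed by <size> raw bytes
        for _ in range(filecount):
            header = fp.readline()
            name, size = header.split(b'\0', 1)
            size = int(size)
            data = fp.read(size)
            # the name is partially encoded for backwards compatibility
            yield store.decodedir(name), data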
305 305
306 306 def generatev1wireproto(repo):
307 307 """Emit content for version 1 of streaming clone suitable for the wire.
308 308
309 309 This is the data output from ``generatev1()`` with 2 header lines. The
310 310 first line indicates overall success. The 2nd contains the file count and
311 311 byte size of payload.
312 312
313 313 The success line contains "0" for success, "1" for stream generation not
314 314 allowed, and "2" for error locking the repository (possibly indicating
315 315 a permissions error for the server process).
316 316 """
317 317 if not allowservergeneration(repo):
318 318 yield b'1\n'
319 319 return
320 320
321 321 try:
322 322 filecount, bytecount, it = generatev1(repo)
323 323 except error.LockError:
324 324 yield b'2\n'
325 325 return
326 326
327 327 # Indicates successful response.
328 328 yield b'0\n'
329 329 yield b'%d %d\n' % (filecount, bytecount)
330 330 for chunk in it:
331 331 yield chunk
332 332
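As a worked example (hypothetical values), a successful wire response for 3 files totalling 456789 bytes starts with:

    b'0\n'          # success indicator
    b'3 456789\n'   # file count and total byte count
    ...             # followed by the generatev1() entry stream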
333 333
334 334 def generatebundlev1(repo, compression=b'UN'):
335 335 """Emit content for version 1 of a stream clone bundle.
336 336
337 337 The first 4 bytes of the output ("HGS1") denote this as stream clone
338 338 bundle version 1.
339 339
340 340 The next 2 bytes indicate the compression type. Only "UN" is currently
341 341 supported.
342 342
343 343 The next 16 bytes are two 64-bit big endian unsigned integers indicating
344 344 file count and byte count, respectively.
345 345
346 346 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
347 347 of the requirements string, including a trailing \0. The following N bytes
348 348 are the requirements string, which is ASCII containing a comma-delimited
349 349 list of repo requirements that are needed to support the data.
350 350
351 351 The remaining content is the output of ``generatev1()`` (which may be
352 352 compressed in the future).
353 353
354 354 Returns a tuple of (requirements, data generator).
355 355 """
356 356 if compression != b'UN':
357 357 raise ValueError(b'we do not support the compression argument yet')
358 358
359 359 requirements = streamed_requirements(repo)
360 360 requires = b','.join(sorted(requirements))
361 361
362 362 def gen():
363 363 yield b'HGS1'
364 364 yield compression
365 365
366 366 filecount, bytecount, it = generatev1(repo)
367 367 repo.ui.status(
368 368 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
369 369 )
370 370
371 371 yield struct.pack(b'>QQ', filecount, bytecount)
372 372 yield struct.pack(b'>H', len(requires) + 1)
373 373 yield requires + b'\0'
374 374
375 375 # This is where we'll add compression in the future.
376 376 assert compression == b'UN'
377 377
378 378 progress = repo.ui.makeprogress(
379 379 _(b'bundle'), total=bytecount, unit=_(b'bytes')
380 380 )
381 381 progress.update(0)
382 382
383 383 for chunk in it:
384 384 progress.increment(step=len(chunk))
385 385 yield chunk
386 386
387 387 progress.complete()
388 388
389 389 return requirements, gen()
390 390
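As a worked example of the fixed-size header above (hypothetical values: 3 files, 12345 bytes, a single "revlogv1" requirement), the bundle starts with:

    b'HGS1'                                   # 4-byte magic
    b'UN'                                     # 2-byte compression identifier
    struct.pack(b'>QQ', 3, 12345)             # 16 bytes: file count, byte count
    struct.pack(b'>H', len(b'revlogv1') + 1)  # 2 bytes: requirements length (incl. trailing NUL)
    b'revlogv1\0'                             # requirements string, NUL-terminated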
391 391
392 392 def consumev1(repo, fp, filecount, bytecount):
393 393 """Apply the contents from version 1 of a streaming clone file handle.
394 394
395 395 This takes the output from "stream_out" and applies it to the specified
396 396 repository.
397 397
398 398 Like "stream_out," the status line added by the wire protocol is not
399 399 handled by this function.
400 400 """
401 401 with repo.lock():
402 402 repo.ui.status(
403 403 _(b'%d files to transfer, %s of data\n')
404 404 % (filecount, util.bytecount(bytecount))
405 405 )
406 406 progress = repo.ui.makeprogress(
407 407 _(b'clone'), total=bytecount, unit=_(b'bytes')
408 408 )
409 409 progress.update(0)
410 410 start = util.timer()
411 411
412 412 # TODO: get rid of (potential) inconsistency
413 413 #
414 414 # If transaction is started and any @filecache property is
415 415 # changed at this point, it causes inconsistency between
416 416 # in-memory cached property and streamclone-ed file on the
417 417 # disk. Nested transaction prevents transaction scope "clone"
418 418 # below from writing in-memory changes out at the end of it,
419 419 # even though in-memory changes are discarded at the end of it
420 420 # regardless of transaction nesting.
421 421 #
422 422 # But transaction nesting can't be simply prohibited, because
423 423 # nesting occurs also in ordinary case (e.g. enabling
424 424 # clonebundles).
425 425
426 426 with repo.transaction(b'clone'):
427 427 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
428 428 for i in range(filecount):
429 429 # XXX doesn't support '\n' or '\r' in filenames
430 430 l = fp.readline()
431 431 try:
432 432 name, size = l.split(b'\0', 1)
433 433 size = int(size)
434 434 except (ValueError, TypeError):
435 435 raise error.ResponseError(
436 436 _(b'unexpected response from remote server:'), l
437 437 )
438 438 if repo.ui.debugflag:
439 439 repo.ui.debug(
440 440 b'adding %s (%s)\n' % (name, util.bytecount(size))
441 441 )
442 442 # for backwards compat, name was partially encoded
443 443 path = store.decodedir(name)
444 444 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
445 445 for chunk in util.filechunkiter(fp, limit=size):
446 446 progress.increment(step=len(chunk))
447 447 ofp.write(chunk)
448 448
449 449 # force @filecache properties to be reloaded from
450 450 # streamclone-ed file at next access
451 451 repo.invalidate(clearfilecache=True)
452 452
453 453 elapsed = util.timer() - start
454 454 if elapsed <= 0:
455 455 elapsed = 0.001
456 456 progress.complete()
457 457 repo.ui.status(
458 458 _(b'transferred %s in %.1f seconds (%s/sec)\n')
459 459 % (
460 460 util.bytecount(bytecount),
461 461 elapsed,
462 462 util.bytecount(bytecount / elapsed),
463 463 )
464 464 )
465 465
466 466
467 467 def readbundle1header(fp):
468 468 compression = fp.read(2)
469 469 if compression != b'UN':
470 470 raise error.Abort(
471 471 _(
472 472 b'only uncompressed stream clone bundles are '
473 473 b'supported; got %s'
474 474 )
475 475 % compression
476 476 )
477 477
478 478 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
479 479 requireslen = struct.unpack(b'>H', fp.read(2))[0]
480 480 requires = fp.read(requireslen)
481 481
482 482 if not requires.endswith(b'\0'):
483 483 raise error.Abort(
484 484 _(
485 485 b'malformed stream clone bundle: '
486 486 b'requirements not properly encoded'
487 487 )
488 488 )
489 489
490 490 requirements = set(requires.rstrip(b'\0').split(b','))
491 491
492 492 return filecount, bytecount, requirements
493 493
494 494
495 495 def applybundlev1(repo, fp):
496 496 """Apply the content from a stream clone bundle version 1.
497 497
498 498 We assume the 4 byte header has been read and validated and the file handle
499 499 is at the 2 byte compression identifier.
500 500 """
501 501 if len(repo):
502 502 raise error.Abort(
503 503 _(b'cannot apply stream clone bundle on non-empty repo')
504 504 )
505 505
506 506 filecount, bytecount, requirements = readbundle1header(fp)
507 507 missingreqs = requirements - repo.supported
508 508 if missingreqs:
509 509 raise error.Abort(
510 510 _(b'unable to apply stream clone: unsupported format: %s')
511 511 % b', '.join(sorted(missingreqs))
512 512 )
513 513
514 514 consumev1(repo, fp, filecount, bytecount)
515 515 nodemap.post_stream_cleanup(repo)
516 516
517 517
518 518 class streamcloneapplier:
519 519 """Class to manage applying streaming clone bundles.
520 520
521 521 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
522 522 readers to perform bundle type-specific functionality.
523 523 """
524 524
525 525 def __init__(self, fh):
526 526 self._fh = fh
527 527
528 528 def apply(self, repo):
529 529 return applybundlev1(repo, self._fh)
530 530
531 531
532 532 # type of file to stream
533 533 _fileappend = 0 # append only file
534 534 _filefull = 1 # full snapshot file
535 535
536 536 # Source of the file
537 537 _srcstore = b's' # store (svfs)
538 538 _srccache = b'c' # cache (cache)
539 539
540 540 # This is its own function so extensions can override it.
541 541 def _walkstreamfullstorefiles(repo):
542 542 """list snapshot file from the store"""
543 543 fnames = []
544 544 if not repo.publishing():
545 545 fnames.append(b'phaseroots')
546 546 return fnames
547 547
548 548
549 549 def _filterfull(entry, copy, vfsmap):
550 550 """actually copy the snapshot files"""
551 551 src, name, ftype, data = entry
552 552 if ftype != _filefull:
553 553 return entry
554 554 return (src, name, ftype, copy(vfsmap[src].join(name)))
555 555
556 556
557 557 @contextlib.contextmanager
558 558 def maketempcopies():
559 559 """return a function to temporary copy file"""
560 560
561 561 files = []
562 562 dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
563 563 try:
564 564
565 565 def copy(src):
566 566 fd, dst = pycompat.mkstemp(
567 567 prefix=os.path.basename(src), dir=dst_dir
568 568 )
569 569 os.close(fd)
570 570 files.append(dst)
571 571 util.copyfiles(src, dst, hardlink=True)
572 572 return dst
573 573
574 574 yield copy
575 575 finally:
576 576 for tmp in files:
577 577 util.tryunlink(tmp)
578 578 util.tryrmdir(dst_dir)
579 579
580 580
581 581 def _makemap(repo):
582 582 """make a (src -> vfs) map for the repo"""
583 583 vfsmap = {
584 584 _srcstore: repo.svfs,
585 585 _srccache: repo.cachevfs,
586 586 }
587 587 # we keep repo.vfs out of the map on purpose, there are too many dangers there
588 588 # (eg: .hg/hgrc)
589 589 assert repo.vfs not in vfsmap.values()
590 590
591 591 return vfsmap
592 592
593 593
594 594 def _emit2(repo, entries, totalfilesize):
595 595 """actually emit the stream bundle"""
596 596 vfsmap = _makemap(repo)
597 597 # we keep repo.vfs out of the map on purpose, there are too many dangers there
598 598 # (eg: .hg/hgrc),
599 599 #
600 600 # this assert is duplicated (from _makemap) as an author might think this is
601 601 # fine, while it is really not.
602 602 if repo.vfs in vfsmap.values():
603 603 raise error.ProgrammingError(
604 604 b'repo.vfs must not be added to vfsmap for security reasons'
605 605 )
606 606
607 607 progress = repo.ui.makeprogress(
608 608 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
609 609 )
610 610 progress.update(0)
611 611 with maketempcopies() as copy, progress:
612 612 # copy is delayed until we are in the try
613 613 entries = [_filterfull(e, copy, vfsmap) for e in entries]
614 614 yield None # this releases the lock on the repository
615 615 totalbytecount = 0
616 616
617 617 for src, name, ftype, data in entries:
618 618 vfs = vfsmap[src]
619 619 yield src
620 620 yield util.uvarintencode(len(name))
621 621 if ftype == _fileappend:
622 622 fp = vfs(name)
623 623 size = data
624 624 elif ftype == _filefull:
625 625 fp = open(data, b'rb')
626 626 size = util.fstat(fp).st_size
627 627 bytecount = 0
628 628 try:
629 629 yield util.uvarintencode(size)
630 630 yield name
631 631 if size <= 65536:
632 632 chunks = (fp.read(size),)
633 633 else:
634 634 chunks = util.filechunkiter(fp, limit=size)
635 635 for chunk in chunks:
636 636 bytecount += len(chunk)
637 637 totalbytecount += len(chunk)
638 638 progress.update(totalbytecount)
639 639 yield chunk
640 640 if bytecount != size:
641 641 # Would most likely be caused by a race due to `hg strip` or
642 642 # a revlog split
643 643 raise error.Abort(
644 644 _(
645 645 b'clone could only read %d bytes from %s, but '
646 646 b'expected %d bytes'
647 647 )
648 648 % (bytecount, name, size)
649 649 )
650 650 finally:
651 651 fp.close()
652 652
653 653
654 654 def _test_sync_point_walk_1(repo):
655 655 """a function for synchronisation during tests"""
656 656
657 657
658 658 def _test_sync_point_walk_2(repo):
659 659 """a function for synchronisation during tests"""
660 660
661 661
662 662 def _entries_walk(repo, includes, excludes, includeobsmarkers):
663 663 """emit a seris of files information useful to clone a repo
664 664
665 665 return (vfs-key, entry) iterator
666 666
667 667 Where `entry` is a StoreEntry (used even for cache entries).
668 668 """
669 669 assert repo._currentlock(repo._lockref) is not None
670 670
671 671 matcher = None
672 672 if includes or excludes:
673 673 matcher = narrowspec.match(repo.root, includes, excludes)
674 674
675 675 phase = not repo.publishing()
676 676 entries = _walkstreamfiles(
677 677 repo,
678 678 matcher,
679 679 phase=phase,
680 680 obsolescence=includeobsmarkers,
681 681 )
682 682 for entry in entries:
683 683 yield (_srcstore, entry)
684 684
685 for name in cacheutil.cachetocopy(repo):
686 if repo.cachevfs.exists(name):
687 # not really a StoreEntry, but close enough
688 entry = store.SimpleStoreEntry(
689 entry_path=name,
690 is_volatile=True,
691 )
692 yield (_srccache, entry)
693
685 694
686 695 def _v2_walk(repo, includes, excludes, includeobsmarkers):
687 696 """emit a seris of files information useful to clone a repo
688 697
689 698 return (entries, totalfilesize)
690 699
691 700 entries is a list of tuple (vfs-key, file-path, file-type, size)
692 701
693 702 - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
694 703 - `name`: file path of the file to copy (to be fed to the vfs)
695 704 - `file-type`: does this file need to be copied with the source lock?
696 705 - `size`: the size of the file (or None)
697 706 """
698 707 assert repo._currentlock(repo._lockref) is not None
699 708 files = []
700 709 totalfilesize = 0
701 710
702 711 vfsmap = _makemap(repo)
703 712 entries = _entries_walk(repo, includes, excludes, includeobsmarkers)
704 713 for vfs_key, entry in entries:
705 714 vfs = vfsmap[vfs_key]
706 715 for f in entry.files():
707 716 file_size = f.file_size(vfs)
708 717 if file_size:
709 718 ft = _fileappend
710 719 if f.is_volatile:
711 720 ft = _filefull
712 721 files.append((vfs_key, f.unencoded_path, ft, file_size))
713 722 totalfilesize += file_size
714 for name in cacheutil.cachetocopy(repo):
715 if repo.cachevfs.exists(name):
716 totalfilesize += repo.cachevfs.lstat(name).st_size
717 files.append((_srccache, name, _filefull, None))
718 723 return files, totalfilesize
719 724
720 725
721 726 def generatev2(repo, includes, excludes, includeobsmarkers):
722 727 """Emit content for version 2 of a streaming clone.
723 728
724 729 the data stream consists of the following entries:
725 730 1) A char representing the file destination (eg: store or cache)
726 731 2) A varint containing the length of the filename
727 732 3) A varint containing the length of file data
728 733 4) N bytes containing the filename (the internal, store-agnostic form)
729 734 5) N bytes containing the file data
730 735
731 736 Returns a 3-tuple of (file count, file size, data iterator).
732 737 """
733 738
734 739 with repo.lock():
735 740
736 741 repo.ui.debug(b'scanning\n')
737 742
738 743 entries, totalfilesize = _v2_walk(
739 744 repo,
740 745 includes=includes,
741 746 excludes=excludes,
742 747 includeobsmarkers=includeobsmarkers,
743 748 )
744 749
745 750 chunks = _emit2(repo, entries, totalfilesize)
746 751 first = next(chunks)
747 752 assert first is None
748 753 _test_sync_point_walk_1(repo)
749 754 _test_sync_point_walk_2(repo)
750 755
751 756 return len(entries), totalfilesize, chunks
752 757
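A minimal sketch of decoding one v2 entry as laid out above (not part of streamclone.py; ``read_one_v2_entry`` is a hypothetical helper, the ``util`` functions are the ones already used by ``consumev2()`` below):

    def read_one_v2_entry(fp):
        src = util.readexactly(fp, 1)           # destination: b's' (store) or b'c' (cache)
        namelen = util.uvarintdecodestream(fp)  # varint: filename length
        datalen = util.uvarintdecodestream(fp)  # varint: file data length
        name = util.readexactly(fp, namelen)
        data = util.readexactly(fp, datalen)
        return src, name, data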
753 758
754 759 @contextlib.contextmanager
755 760 def nested(*ctxs):
756 761 this = ctxs[0]
757 762 rest = ctxs[1:]
758 763 with this:
759 764 if rest:
760 765 with nested(*rest):
761 766 yield
762 767 else:
763 768 yield
764 769
765 770
766 771 def consumev2(repo, fp, filecount, filesize):
767 772 """Apply the contents from a version 2 streaming clone.
768 773
769 774 Data is read from an object that only needs to provide a ``read(size)``
770 775 method.
771 776 """
772 777 with repo.lock():
773 778 repo.ui.status(
774 779 _(b'%d files to transfer, %s of data\n')
775 780 % (filecount, util.bytecount(filesize))
776 781 )
777 782
778 783 start = util.timer()
779 784 progress = repo.ui.makeprogress(
780 785 _(b'clone'), total=filesize, unit=_(b'bytes')
781 786 )
782 787 progress.update(0)
783 788
784 789 vfsmap = _makemap(repo)
785 790 # we keep repo.vfs out of the map on purpose, there are too many dangers
786 791 # there (eg: .hg/hgrc),
787 792 #
788 793 # this assert is duplicated (from _makemap) as an author might think this
789 794 # is fine, while it is really not.
790 795 if repo.vfs in vfsmap.values():
791 796 raise error.ProgrammingError(
792 797 b'repo.vfs must not be added to vfsmap for security reasons'
793 798 )
794 799
795 800 with repo.transaction(b'clone'):
796 801 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
797 802 with nested(*ctxs):
798 803 for i in range(filecount):
799 804 src = util.readexactly(fp, 1)
800 805 vfs = vfsmap[src]
801 806 namelen = util.uvarintdecodestream(fp)
802 807 datalen = util.uvarintdecodestream(fp)
803 808
804 809 name = util.readexactly(fp, namelen)
805 810
806 811 if repo.ui.debugflag:
807 812 repo.ui.debug(
808 813 b'adding [%s] %s (%s)\n'
809 814 % (src, name, util.bytecount(datalen))
810 815 )
811 816
812 817 with vfs(name, b'w') as ofp:
813 818 for chunk in util.filechunkiter(fp, limit=datalen):
814 819 progress.increment(step=len(chunk))
815 820 ofp.write(chunk)
816 821
817 822 # force @filecache properties to be reloaded from
818 823 # streamclone-ed file at next access
819 824 repo.invalidate(clearfilecache=True)
820 825
821 826 elapsed = util.timer() - start
822 827 if elapsed <= 0:
823 828 elapsed = 0.001
824 829 repo.ui.status(
825 830 _(b'transferred %s in %.1f seconds (%s/sec)\n')
826 831 % (
827 832 util.bytecount(progress.pos),
828 833 elapsed,
829 834 util.bytecount(progress.pos / elapsed),
830 835 )
831 836 )
832 837 progress.complete()
833 838
834 839
835 840 def applybundlev2(repo, fp, filecount, filesize, requirements):
836 841 from . import localrepo
837 842
838 843 missingreqs = [r for r in requirements if r not in repo.supported]
839 844 if missingreqs:
840 845 raise error.Abort(
841 846 _(b'unable to apply stream clone: unsupported format: %s')
842 847 % b', '.join(sorted(missingreqs))
843 848 )
844 849
845 850 consumev2(repo, fp, filecount, filesize)
846 851
847 852 repo.requirements = new_stream_clone_requirements(
848 853 repo.requirements,
849 854 requirements,
850 855 )
851 856 repo.svfs.options = localrepo.resolvestorevfsoptions(
852 857 repo.ui, repo.requirements, repo.features
853 858 )
854 859 scmutil.writereporequirements(repo)
855 860 nodemap.post_stream_cleanup(repo)
856 861
857 862
858 863 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
859 864 hardlink = [True]
860 865
861 866 def copy_used():
862 867 hardlink[0] = False
863 868 progress.topic = _(b'copying')
864 869
865 870 for k, path, size in entries:
866 871 src_vfs = src_vfs_map[k]
867 872 dst_vfs = dst_vfs_map[k]
868 873 src_path = src_vfs.join(path)
869 874 dst_path = dst_vfs.join(path)
870 875 # We cannot use dirname and makedirs of dst_vfs here because the store
871 876 # encoding confuses them. See issue 6581 for details.
872 877 dirname = os.path.dirname(dst_path)
873 878 if not os.path.exists(dirname):
874 879 util.makedirs(dirname)
875 880 dst_vfs.register_file(path)
876 881 # XXX we could use the #nb_bytes argument.
877 882 util.copyfile(
878 883 src_path,
879 884 dst_path,
880 885 hardlink=hardlink[0],
881 886 no_hardlink_cb=copy_used,
882 887 check_fs_hardlink=False,
883 888 )
884 889 progress.increment()
885 890 return hardlink[0]
886 891
887 892
888 893 def local_copy(src_repo, dest_repo):
889 894 """copy all content from one local repository to another
890 895
891 896 This is useful for local clone"""
892 897 src_store_requirements = {
893 898 r
894 899 for r in src_repo.requirements
895 900 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
896 901 }
897 902 dest_store_requirements = {
898 903 r
899 904 for r in dest_repo.requirements
900 905 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
901 906 }
902 907 assert src_store_requirements == dest_store_requirements
903 908
904 909 with dest_repo.lock():
905 910 with src_repo.lock():
906 911
907 912 # bookmarks are not integrated into the streaming as they might use
908 913 # `repo.vfs` and there is too much sensitive data accessible
909 914 # through `repo.vfs` to expose it to a streaming clone.
910 915 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
911 916 srcbookmarks = src_book_vfs.join(b'bookmarks')
912 917 bm_count = 0
913 918 if os.path.exists(srcbookmarks):
914 919 bm_count = 1
915 920
916 921 entries, totalfilesize = _v2_walk(
917 922 src_repo,
918 923 includes=None,
919 924 excludes=None,
920 925 includeobsmarkers=True,
921 926 )
922 927 src_vfs_map = _makemap(src_repo)
923 928 dest_vfs_map = _makemap(dest_repo)
924 929 progress = src_repo.ui.makeprogress(
925 930 topic=_(b'linking'),
926 931 total=len(entries) + bm_count,
927 932 unit=_(b'files'),
928 933 )
929 934 # copy files
930 935 #
931 936 # We could copy the full file while the source repository is locked
932 937 # and the other one without the lock. However, in the linking case,
933 938 # this would also require checks that nobody is appending any data
934 939 # to the files while we do the clone, so this is not done yet. We
935 940 # could do this blindly when copying files.
936 941 files = ((k, path, size) for k, path, ftype, size in entries)
937 942 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
938 943
939 944 # copy bookmarks over
940 945 if bm_count:
941 946 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
942 947 dstbookmarks = dst_book_vfs.join(b'bookmarks')
943 948 util.copyfile(srcbookmarks, dstbookmarks)
944 949 progress.complete()
945 950 if hardlink:
946 951 msg = b'linked %d files\n'
947 952 else:
948 953 msg = b'copied %d files\n'
949 954 src_repo.ui.debug(msg % (len(entries) + bm_count))
950 955
951 956 with dest_repo.transaction(b"localclone") as tr:
952 957 dest_repo.store.write(tr)
953 958
954 959 # clean up transaction files as they do not make sense
955 960 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)