stream-clone: introduce a _entries_walk...
marmoute
r51408:06d580b8 default
@@ -1,936 +1,955 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import contextlib
10 10 import os
11 11 import struct
12 12
13 13 from .i18n import _
14 14 from .pycompat import open
15 15 from .interfaces import repository
16 16 from . import (
17 17 bookmarks,
18 18 cacheutil,
19 19 error,
20 20 narrowspec,
21 21 phases,
22 22 pycompat,
23 23 requirements as requirementsmod,
24 24 scmutil,
25 25 store,
26 26 transaction,
27 27 util,
28 28 )
29 29 from .revlogutils import (
30 30 nodemap,
31 31 )
32 32
33 33
34 34 def new_stream_clone_requirements(default_requirements, streamed_requirements):
35 35 """determine the final set of requirement for a new stream clone
36 36
37 37 this method combine the "default" requirements that a new repository would
38 38 use with the constaint we get from the stream clone content. We keep local
39 39 configuration choice when possible.
40 40 """
41 41 requirements = set(default_requirements)
42 42 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
43 43 requirements.update(streamed_requirements)
44 44 return requirements
45 45
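(Illustration, not part of the changeset: how the three sets above combine.
All values are hypothetical, and `fixed` stands in for
requirementsmod.STREAM_FIXED_REQUIREMENTS.)

    default = {b'store', b'dotencode', b'revlogv1'}
    fixed = {b'revlogv1'}       # stand-in for STREAM_FIXED_REQUIREMENTS
    streamed = {b'revlogv1'}
    final = (default - fixed) | streamed
    # -> {b'store', b'dotencode', b'revlogv1'}: local choices outside the
    #    fixed set survive; the streamed side wins inside it.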
46 46
47 47 def streamed_requirements(repo):
48 48 """the set of requirement the new clone will have to support
49 49
50 50 This is used for advertising the stream options and to generate the actual
51 51 stream content."""
52 52 requiredformats = (
53 53 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
54 54 )
55 55 return requiredformats
56 56
57 57
58 58 def canperformstreamclone(pullop, bundle2=False):
59 59 """Whether it is possible to perform a streaming clone as part of pull.
60 60
61 61 ``bundle2`` will cause the function to consider stream clone through
62 62 bundle2 and only through bundle2.
63 63
64 64 Returns a tuple of (supported, requirements). ``supported`` is True if
65 65 streaming clone is supported and False otherwise. ``requirements`` is
66 66 a set of repo requirements from the remote, or ``None`` if stream clone
67 67 isn't supported.
68 68 """
69 69 repo = pullop.repo
70 70 remote = pullop.remote
71 71
72 72 bundle2supported = False
73 73 if pullop.canusebundle2:
74 74 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
75 75 bundle2supported = True
76 76 # else
77 77 # Server doesn't support bundle2 stream clone or doesn't support
78 78 # the versions we support. Fall back and possibly allow legacy.
79 79
80 80 # Ensures legacy code path uses available bundle2.
81 81 if bundle2supported and not bundle2:
82 82 return False, None
83 83 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
84 84 elif bundle2 and not bundle2supported:
85 85 return False, None
86 86
87 87 # Streaming clone only works on empty repositories.
88 88 if len(repo):
89 89 return False, None
90 90
91 91 # Streaming clone only works if all data is being requested.
92 92 if pullop.heads:
93 93 return False, None
94 94
95 95 streamrequested = pullop.streamclonerequested
96 96
97 97 # If we don't have a preference, let the server decide for us. This
98 98 # likely only comes into play in LANs.
99 99 if streamrequested is None:
100 100 # The server can advertise whether to prefer streaming clone.
101 101 streamrequested = remote.capable(b'stream-preferred')
102 102
103 103 if not streamrequested:
104 104 return False, None
105 105
106 106 # In order for stream clone to work, the client has to support all the
107 107 # requirements advertised by the server.
108 108 #
109 109 # The server advertises its requirements via the "stream" and "streamreqs"
110 110 # capability. "stream" (a value-less capability) is advertised if and only
111 111 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
112 112 # is advertised and contains a comma-delimited list of requirements.
113 113 requirements = set()
114 114 if remote.capable(b'stream'):
115 115 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
116 116 else:
117 117 streamreqs = remote.capable(b'streamreqs')
118 118 # This is weird and shouldn't happen with modern servers.
119 119 if not streamreqs:
120 120 pullop.repo.ui.warn(
121 121 _(
122 122 b'warning: stream clone requested but server has them '
123 123 b'disabled\n'
124 124 )
125 125 )
126 126 return False, None
127 127
128 128 streamreqs = set(streamreqs.split(b','))
129 129 # Server requires something we don't support. Bail.
130 130 missingreqs = streamreqs - repo.supported
131 131 if missingreqs:
132 132 pullop.repo.ui.warn(
133 133 _(
134 134 b'warning: stream clone requested but client is missing '
135 135 b'requirements: %s\n'
136 136 )
137 137 % b', '.join(sorted(missingreqs))
138 138 )
139 139 pullop.repo.ui.warn(
140 140 _(
141 141 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
142 142 b'for more information)\n'
143 143 )
144 144 )
145 145 return False, None
146 146 requirements = streamreqs
147 147
148 148 return True, requirements
149 149
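(Illustration, not part of the changeset: the two capability forms described
above, with hypothetical requirement values.)

    # value-less capability; the only requirement is revlogv1:
    #   stream                      -> requirements == {b'revlogv1'}
    # comma-delimited list:
    #   streamreqs=revlogv1,store   -> requirements == {b'revlogv1', b'store'}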
150 150
151 151 def maybeperformlegacystreamclone(pullop):
152 152 """Possibly perform a legacy stream clone operation.
153 153
154 154 Legacy stream clones are performed as part of pull but before all other
155 155 operations.
156 156
157 157 A legacy stream clone will not be performed if a bundle2 stream clone is
158 158 supported.
159 159 """
160 160 from . import localrepo
161 161
162 162 supported, requirements = canperformstreamclone(pullop)
163 163
164 164 if not supported:
165 165 return
166 166
167 167 repo = pullop.repo
168 168 remote = pullop.remote
169 169
170 170 # Save remote branchmap. We will use it later to speed up branchcache
171 171 # creation.
172 172 rbranchmap = None
173 173 if remote.capable(b'branchmap'):
174 174 with remote.commandexecutor() as e:
175 175 rbranchmap = e.callcommand(b'branchmap', {}).result()
176 176
177 177 repo.ui.status(_(b'streaming all changes\n'))
178 178
179 179 with remote.commandexecutor() as e:
180 180 fp = e.callcommand(b'stream_out', {}).result()
181 181
182 182 # TODO strictly speaking, this code should all be inside the context
183 183 # manager because the context manager is supposed to ensure all wire state
184 184 # is flushed when exiting. But the legacy peers don't do this, so it
185 185 # doesn't matter.
186 186 l = fp.readline()
187 187 try:
188 188 resp = int(l)
189 189 except ValueError:
190 190 raise error.ResponseError(
191 191 _(b'unexpected response from remote server:'), l
192 192 )
193 193 if resp == 1:
194 194 raise error.Abort(_(b'operation forbidden by server'))
195 195 elif resp == 2:
196 196 raise error.Abort(_(b'locking the remote repository failed'))
197 197 elif resp != 0:
198 198 raise error.Abort(_(b'the server sent an unknown error code'))
199 199
200 200 l = fp.readline()
201 201 try:
202 202 filecount, bytecount = map(int, l.split(b' ', 1))
203 203 except (ValueError, TypeError):
204 204 raise error.ResponseError(
205 205 _(b'unexpected response from remote server:'), l
206 206 )
207 207
208 208 with repo.lock():
209 209 consumev1(repo, fp, filecount, bytecount)
210 210 repo.requirements = new_stream_clone_requirements(
211 211 repo.requirements,
212 212 requirements,
213 213 )
214 214 repo.svfs.options = localrepo.resolvestorevfsoptions(
215 215 repo.ui, repo.requirements, repo.features
216 216 )
217 217 scmutil.writereporequirements(repo)
218 218 nodemap.post_stream_cleanup(repo)
219 219
220 220 if rbranchmap:
221 221 repo._branchcaches.replace(repo, rbranchmap)
222 222
223 223 repo.invalidate()
224 224
225 225
226 226 def allowservergeneration(repo):
227 227 """Whether streaming clones are allowed from the server."""
228 228 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
229 229 return False
230 230
231 231 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
232 232 return False
233 233
234 234 # The way stream clone works makes it impossible to hide secret changesets.
235 235 # So don't allow this by default.
236 236 secret = phases.hassecret(repo)
237 237 if secret:
238 238 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
239 239
240 240 return True
241 241
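(For reference, a sketch of the server-side configuration knobs read above,
in hgrc form; the values shown are illustrative, not a statement of the
documented defaults.)

    [server]
    uncompressed = True              # permit stream clone generation at all
    uncompressedallowsecret = False  # refuse when secret changesets exist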
242 242
243 243 # This is its own function so extensions can override it.
244 244 def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
245 245 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
246 246
247 247
248 248 def generatev1(repo):
249 249 """Emit content for version 1 of a streaming clone.
250 250
251 251 This returns a 3-tuple of (file count, byte size, data iterator).
252 252
253 253 The data iterator consists of one entry for each file being transferred.
254 254 Each file entry starts as a line with the file name and integer size
255 255 delimited by a null byte.
256 256
257 257 The raw file data follows. Following the raw file data is the next file
258 258 entry, or EOF.
259 259
260 260 When used on the wire protocol, an additional line indicating protocol
261 261 success will be prepended to the stream. This function is not responsible
262 262 for adding it.
263 263
264 264 This function will obtain a repository lock to ensure a consistent view of
265 265 the store is captured. It therefore may raise LockError.
266 266 """
267 267 entries = []
268 268 total_bytes = 0
269 269 # Get consistent snapshot of repo, lock during scan.
270 270 with repo.lock():
271 271 repo.ui.debug(b'scanning\n')
272 272 for entry in _walkstreamfiles(repo):
273 273 for f in entry.files():
274 274 file_size = f.file_size(repo.store.vfs)
275 275 if file_size:
276 276 entries.append((f.unencoded_path, file_size))
277 277 total_bytes += file_size
278 278 _test_sync_point_walk_1(repo)
279 279 _test_sync_point_walk_2(repo)
280 280
281 281 repo.ui.debug(
282 282 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
283 283 )
284 284
285 285 svfs = repo.svfs
286 286 debugflag = repo.ui.debugflag
287 287
288 288 def emitrevlogdata():
289 289 for name, size in entries:
290 290 if debugflag:
291 291 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
292 292 # partially encode name over the wire for backwards compat
293 293 yield b'%s\0%d\n' % (store.encodedir(name), size)
294 294 # auditing at this stage is both pointless (paths are already
295 295 # trusted by the local repo) and expensive
296 296 with svfs(name, b'rb', auditpath=False) as fp:
297 297 if size <= 65536:
298 298 yield fp.read(size)
299 299 else:
300 300 for chunk in util.filechunkiter(fp, limit=size):
301 301 yield chunk
302 302
303 303 return len(entries), total_bytes, emitrevlogdata()
304 304
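(Illustration, not part of the changeset: a minimal reader for the per-file
framing described in the docstring above. `read_v1_entries` is a hypothetical
helper; the real consumer is `consumev1` further down.)

    def read_v1_entries(fp, filecount):
        """Yield (name, size, data) triples from a v1 data stream (sketch)."""
        for _ in range(filecount):
            header = fp.readline()             # b'<name>\0<size>\n'
            name, size = header.split(b'\0', 1)
            size = int(size)                   # int() accepts the bytes form
            yield name, size, fp.read(size)    # raw data follows the header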
305 305
306 306 def generatev1wireproto(repo):
307 307 """Emit content for version 1 of streaming clone suitable for the wire.
308 308
309 309 This is the data output from ``generatev1()`` with 2 header lines. The
310 310 first line indicates overall success. The 2nd contains the file count and
311 311 byte size of payload.
312 312
313 313 The success line contains "0" for success, "1" for stream generation not
314 314 allowed, and "2" for error locking the repository (possibly indicating
315 315 a permissions error for the server process).
316 316 """
317 317 if not allowservergeneration(repo):
318 318 yield b'1\n'
319 319 return
320 320
321 321 try:
322 322 filecount, bytecount, it = generatev1(repo)
323 323 except error.LockError:
324 324 yield b'2\n'
325 325 return
326 326
327 327 # Indicates successful response.
328 328 yield b'0\n'
329 329 yield b'%d %d\n' % (filecount, bytecount)
330 330 for chunk in it:
331 331 yield chunk
332 332
333 333
334 334 def generatebundlev1(repo, compression=b'UN'):
335 335 """Emit content for version 1 of a stream clone bundle.
336 336
337 337 The first 4 bytes of the output ("HGS1") denote this as stream clone
338 338 bundle version 1.
339 339
340 340 The next 2 bytes indicate the compression type. Only "UN" is currently
341 341 supported.
342 342
343 343 The next 16 bytes are two 64-bit big endian unsigned integers indicating
344 344 file count and byte count, respectively.
345 345
346 346 The next 2 bytes are a 16-bit big endian unsigned short declaring the length
347 347 of the requirements string, including a trailing \0. The following N bytes
348 348 are the requirements string, which is ASCII containing a comma-delimited
349 349 list of repo requirements that are needed to support the data.
350 350
351 351 The remaining content is the output of ``generatev1()`` (which may be
352 352 compressed in the future).
353 353
354 354 Returns a tuple of (requirements, data generator).
355 355 """
356 356 if compression != b'UN':
357 357 raise ValueError(b'we do not support the compression argument yet')
358 358
359 359 requirements = streamed_requirements(repo)
360 360 requires = b','.join(sorted(requirements))
361 361
362 362 def gen():
363 363 yield b'HGS1'
364 364 yield compression
365 365
366 366 filecount, bytecount, it = generatev1(repo)
367 367 repo.ui.status(
368 368 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
369 369 )
370 370
371 371 yield struct.pack(b'>QQ', filecount, bytecount)
372 372 yield struct.pack(b'>H', len(requires) + 1)
373 373 yield requires + b'\0'
374 374
375 375 # This is where we'll add compression in the future.
376 376 assert compression == b'UN'
377 377
378 378 progress = repo.ui.makeprogress(
379 379 _(b'bundle'), total=bytecount, unit=_(b'bytes')
380 380 )
381 381 progress.update(0)
382 382
383 383 for chunk in it:
384 384 progress.increment(step=len(chunk))
385 385 yield chunk
386 386
387 387 progress.complete()
388 388
389 389 return requirements, gen()
390 390
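(Usage sketch, not part of the changeset: writing the bundle produced above
to a file. The helper name and path handling are hypothetical.)

    def write_stream_bundle(repo, path):
        requirements, gen = generatebundlev1(repo)
        with open(path, 'wb') as fh:
            for chunk in gen:
                fh.write(chunk)
        return requirements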
391 391
392 392 def consumev1(repo, fp, filecount, bytecount):
393 393 """Apply the contents from version 1 of a streaming clone file handle.
394 394
395 395 This takes the output from "stream_out" and applies it to the specified
396 396 repository.
397 397
398 398 Like "stream_out," the status line added by the wire protocol is not
399 399 handled by this function.
400 400 """
401 401 with repo.lock():
402 402 repo.ui.status(
403 403 _(b'%d files to transfer, %s of data\n')
404 404 % (filecount, util.bytecount(bytecount))
405 405 )
406 406 progress = repo.ui.makeprogress(
407 407 _(b'clone'), total=bytecount, unit=_(b'bytes')
408 408 )
409 409 progress.update(0)
410 410 start = util.timer()
411 411
412 412 # TODO: get rid of (potential) inconsistency
413 413 #
414 414 # If transaction is started and any @filecache property is
415 415 # changed at this point, it causes inconsistency between
416 416 # in-memory cached property and streamclone-ed file on the
417 417 # disk. Nested transaction prevents transaction scope "clone"
418 418 # below from writing in-memory changes out at the end of it,
419 419 # even though in-memory changes are discarded at the end of it
420 420 # regardless of transaction nesting.
421 421 #
422 422 # But transaction nesting can't be simply prohibited, because
423 423 # nesting occurs also in ordinary case (e.g. enabling
424 424 # clonebundles).
425 425
426 426 with repo.transaction(b'clone'):
427 427 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
428 428 for i in range(filecount):
429 429 # XXX doesn't support '\n' or '\r' in filenames
430 430 l = fp.readline()
431 431 try:
432 432 name, size = l.split(b'\0', 1)
433 433 size = int(size)
434 434 except (ValueError, TypeError):
435 435 raise error.ResponseError(
436 436 _(b'unexpected response from remote server:'), l
437 437 )
438 438 if repo.ui.debugflag:
439 439 repo.ui.debug(
440 440 b'adding %s (%s)\n' % (name, util.bytecount(size))
441 441 )
442 442 # for backwards compat, name was partially encoded
443 443 path = store.decodedir(name)
444 444 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
445 445 for chunk in util.filechunkiter(fp, limit=size):
446 446 progress.increment(step=len(chunk))
447 447 ofp.write(chunk)
448 448
449 449 # force @filecache properties to be reloaded from
450 450 # streamclone-ed file at next access
451 451 repo.invalidate(clearfilecache=True)
452 452
453 453 elapsed = util.timer() - start
454 454 if elapsed <= 0:
455 455 elapsed = 0.001
456 456 progress.complete()
457 457 repo.ui.status(
458 458 _(b'transferred %s in %.1f seconds (%s/sec)\n')
459 459 % (
460 460 util.bytecount(bytecount),
461 461 elapsed,
462 462 util.bytecount(bytecount / elapsed),
463 463 )
464 464 )
465 465
466 466
467 467 def readbundle1header(fp):
468 468 compression = fp.read(2)
469 469 if compression != b'UN':
470 470 raise error.Abort(
471 471 _(
472 472 b'only uncompressed stream clone bundles are '
473 473 b'supported; got %s'
474 474 )
475 475 % compression
476 476 )
477 477
478 478 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
479 479 requireslen = struct.unpack(b'>H', fp.read(2))[0]
480 480 requires = fp.read(requireslen)
481 481
482 482 if not requires.endswith(b'\0'):
483 483 raise error.Abort(
484 484 _(
485 485 b'malformed stream clone bundle: '
486 486 b'requirements not properly encoded'
487 487 )
488 488 )
489 489
490 490 requirements = set(requires.rstrip(b'\0').split(b','))
491 491
492 492 return filecount, bytecount, requirements
493 493
494 494
495 495 def applybundlev1(repo, fp):
496 496 """Apply the content from a stream clone bundle version 1.
497 497
498 498 We assume the 4 byte header has been read and validated and the file handle
499 499 is at the 2 byte compression identifier.
500 500 """
501 501 if len(repo):
502 502 raise error.Abort(
503 503 _(b'cannot apply stream clone bundle on non-empty repo')
504 504 )
505 505
506 506 filecount, bytecount, requirements = readbundle1header(fp)
507 507 missingreqs = requirements - repo.supported
508 508 if missingreqs:
509 509 raise error.Abort(
510 510 _(b'unable to apply stream clone: unsupported format: %s')
511 511 % b', '.join(sorted(missingreqs))
512 512 )
513 513
514 514 consumev1(repo, fp, filecount, bytecount)
515 515 nodemap.post_stream_cleanup(repo)
516 516
517 517
518 518 class streamcloneapplier:
519 519 """Class to manage applying streaming clone bundles.
520 520
521 521 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
522 522 readers to perform bundle type-specific functionality.
523 523 """
524 524
525 525 def __init__(self, fh):
526 526 self._fh = fh
527 527
528 528 def apply(self, repo):
529 529 return applybundlev1(repo, self._fh)
530 530
531 531
532 532 # type of file to stream
533 533 _fileappend = 0 # append only file
534 534 _filefull = 1 # full snapshot file
535 535
536 536 # Source of the file
537 537 _srcstore = b's' # store (svfs)
538 538 _srccache = b'c' # cache (cache)
539 539
540 540 # This is its own function so extensions can override it.
541 541 def _walkstreamfullstorefiles(repo):
542 542 """list snapshot file from the store"""
543 543 fnames = []
544 544 if not repo.publishing():
545 545 fnames.append(b'phaseroots')
546 546 return fnames
547 547
548 548
549 549 def _filterfull(entry, copy, vfsmap):
550 550 """actually copy the snapshot files"""
551 551 src, name, ftype, data = entry
552 552 if ftype != _filefull:
553 553 return entry
554 554 return (src, name, ftype, copy(vfsmap[src].join(name)))
555 555
556 556
557 557 @contextlib.contextmanager
558 558 def maketempcopies():
559 559 """return a function to temporary copy file"""
560 560
561 561 files = []
562 562 dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
563 563 try:
564 564
565 565 def copy(src):
566 566 fd, dst = pycompat.mkstemp(
567 567 prefix=os.path.basename(src), dir=dst_dir
568 568 )
569 569 os.close(fd)
570 570 files.append(dst)
571 571 util.copyfiles(src, dst, hardlink=True)
572 572 return dst
573 573
574 574 yield copy
575 575 finally:
576 576 for tmp in files:
577 577 util.tryunlink(tmp)
578 578 util.tryrmdir(dst_dir)
579 579
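(Usage sketch, not part of the changeset: snapshot a volatile file while the
repository lock is held, then read the copy after the lock is released. The
source path is hypothetical.)

    with maketempcopies() as copy:
        tmp = copy(b'/repo/.hg/store/phaseroots')  # hardlinked/copied now
        # ... the lock can be released here; `tmp` stays valid until exit ...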
580 580
581 581 def _makemap(repo):
582 582 """make a (src -> vfs) map for the repo"""
583 583 vfsmap = {
584 584 _srcstore: repo.svfs,
585 585 _srccache: repo.cachevfs,
586 586 }
587 587 # we keep repo.vfs out of the map on purpose, there are too many dangers there
588 588 # (eg: .hg/hgrc)
589 589 assert repo.vfs not in vfsmap.values()
590 590
591 591 return vfsmap
592 592
593 593
594 594 def _emit2(repo, entries, totalfilesize):
595 595 """actually emit the stream bundle"""
596 596 vfsmap = _makemap(repo)
597 597 # we keep repo.vfs out of the map on purpose, there are too many dangers there
598 598 # (eg: .hg/hgrc),
599 599 #
600 600 # this assert is duplicated (from _makemap) as an author might think
601 601 # this is fine, while it is really not.
602 602 if repo.vfs in vfsmap.values():
603 603 raise error.ProgrammingError(
604 604 b'repo.vfs must not be added to vfsmap for security reasons'
605 605 )
606 606
607 607 progress = repo.ui.makeprogress(
608 608 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
609 609 )
610 610 progress.update(0)
611 611 with maketempcopies() as copy, progress:
612 612 # copy is delayed until we are in the try
613 613 entries = [_filterfull(e, copy, vfsmap) for e in entries]
614 614 yield None # this releases the lock on the repository
615 615 totalbytecount = 0
616 616
617 617 for src, name, ftype, data in entries:
618 618 vfs = vfsmap[src]
619 619 yield src
620 620 yield util.uvarintencode(len(name))
621 621 if ftype == _fileappend:
622 622 fp = vfs(name)
623 623 size = data
624 624 elif ftype == _filefull:
625 625 fp = open(data, b'rb')
626 626 size = util.fstat(fp).st_size
627 627 bytecount = 0
628 628 try:
629 629 yield util.uvarintencode(size)
630 630 yield name
631 631 if size <= 65536:
632 632 chunks = (fp.read(size),)
633 633 else:
634 634 chunks = util.filechunkiter(fp, limit=size)
635 635 for chunk in chunks:
636 636 bytecount += len(chunk)
637 637 totalbytecount += len(chunk)
638 638 progress.update(totalbytecount)
639 639 yield chunk
640 640 if bytecount != size:
641 641 # Would most likely be caused by a race due to `hg strip` or
642 642 # a revlog split
643 643 raise error.Abort(
644 644 _(
645 645 b'clone could only read %d bytes from %s, but '
646 646 b'expected %d bytes'
647 647 )
648 648 % (bytecount, name, size)
649 649 )
650 650 finally:
651 651 fp.close()
652 652
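(Illustration, not part of the changeset: the per-entry framing emitted above,
packed by a hypothetical helper. `consumev2` further down reads it back.)

    def pack_v2_frame(src, name, data):
        """Sketch: one stream entry as framed by _emit2."""
        return b''.join([
            src,                            # 1-byte vfs key (b's' or b'c')
            util.uvarintencode(len(name)),  # varint: name length
            util.uvarintencode(len(data)),  # varint: data length
            name,                           # file name bytes
            data,                           # raw file content
        ])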
653 653
654 654 def _test_sync_point_walk_1(repo):
655 655 """a function for synchronisation during tests"""
656 656
657 657
658 658 def _test_sync_point_walk_2(repo):
659 659 """a function for synchronisation during tests"""
660 660
661 661
662 def _entries_walk(repo, includes, excludes, includeobsmarkers):
663 """emit a seris of files information useful to clone a repo
664
665 return (vfs-key, entry) iterator
666
667 Where `entry` is StoreEntry. (used even for cache entries)
668 """
669 assert repo._currentlock(repo._lockref) is not None
670
671 matcher = None
672 if includes or excludes:
673 matcher = narrowspec.match(repo.root, includes, excludes)
674
675 phase = not repo.publishing()
676 entries = _walkstreamfiles(
677 repo,
678 matcher,
679 phase=phase,
680 obsolescence=includeobsmarkers,
681 )
682 for entry in entries:
683 yield (_srcstore, entry)
684
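(Usage sketch, not part of the changeset: iterating the new helper under the
required lock; `_v2_walk` below does the same with size accounting.)

    with repo.lock():
        vfsmap = _makemap(repo)
        walk = _entries_walk(repo, None, None, includeobsmarkers=True)
        for vfs_key, entry in walk:
            for f in entry.files():
                print(f.unencoded_path, f.file_size(vfsmap[vfs_key]))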
685
662 686 def _v2_walk(repo, includes, excludes, includeobsmarkers):
663 687 """emit a seris of files information useful to clone a repo
664 688
665 689 return (entries, totalfilesize)
666 690
667 691 entries is a list of tuple (vfs-key, file-path, file-type, size)
668 692
669 693 - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
670 694 - `name`: file path of the file to copy (to be feed to the vfss)
671 695 - `file-type`: do this file need to be copied with the source lock ?
672 696 - `size`: the size of the file (or None)
673 697 """
674 698 assert repo._currentlock(repo._lockref) is not None
675 699 files = []
676 700 totalfilesize = 0
677 701
678 matcher = None
679 if includes or excludes:
680 matcher = narrowspec.match(repo.root, includes, excludes)
681
682 phase = not repo.publishing()
683 entries = _walkstreamfiles(
684 repo, matcher, phase=phase, obsolescence=includeobsmarkers
685 )
686 for entry in entries:
702 vfsmap = _makemap(repo)
703 entries = _entries_walk(repo, includes, excludes, includeobsmarkers)
704 for vfs_key, entry in entries:
705 vfs = vfsmap[vfs_key]
687 706 for f in entry.files():
688 file_size = f.file_size(repo.store.vfs)
707 file_size = f.file_size(vfs)
689 708 if file_size:
690 709 ft = _fileappend
691 710 if f.is_volatile:
692 711 ft = _filefull
693 files.append((_srcstore, f.unencoded_path, ft, file_size))
712 files.append((vfs_key, f.unencoded_path, ft, file_size))
694 713 totalfilesize += file_size
695 714 for name in cacheutil.cachetocopy(repo):
696 715 if repo.cachevfs.exists(name):
697 716 totalfilesize += repo.cachevfs.lstat(name).st_size
698 717 files.append((_srccache, name, _filefull, None))
699 718 return files, totalfilesize
700 719
701 720
702 721 def generatev2(repo, includes, excludes, includeobsmarkers):
703 722 """Emit content for version 2 of a streaming clone.
704 723
705 724 the data stream consists of the following entries:
706 725 1) A char representing the file destination (eg: store or cache)
707 726 2) A varint containing the length of the filename
708 727 3) A varint containing the length of file data
709 728 4) N bytes containing the filename (the internal, store-agnostic form)
710 729 5) N bytes containing the file data
711 730
712 731 Returns a 3-tuple of (file count, file size, data iterator).
713 732 """
714 733
715 734 with repo.lock():
716 735
717 736 repo.ui.debug(b'scanning\n')
718 737
719 738 entries, totalfilesize = _v2_walk(
720 739 repo,
721 740 includes=includes,
722 741 excludes=excludes,
723 742 includeobsmarkers=includeobsmarkers,
724 743 )
725 744
726 745 chunks = _emit2(repo, entries, totalfilesize)
727 746 first = next(chunks)
728 747 assert first is None
729 748 _test_sync_point_walk_1(repo)
730 749 _test_sync_point_walk_2(repo)
731 750
732 751 return len(entries), totalfilesize, chunks
733 752
734 753
735 754 @contextlib.contextmanager
736 755 def nested(*ctxs):
737 756 this = ctxs[0]
738 757 rest = ctxs[1:]
739 758 with this:
740 759 if rest:
741 760 with nested(*rest):
742 761 yield
743 762 else:
744 763 yield
745 764
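(Usage sketch, not part of the changeset: `nested` enters the given context
managers left to right and exits them in reverse, like a variadic `with`
statement; `consumev2` below uses it for the vfs background-closing contexts.)

    ctxs = [vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values()]
    with nested(*ctxs):
        pass  # every background-closing context is active here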
746 765
747 766 def consumev2(repo, fp, filecount, filesize):
748 767 """Apply the contents from a version 2 streaming clone.
749 768
750 769 Data is read from an object that only needs to provide a ``read(size)``
751 770 method.
752 771 """
753 772 with repo.lock():
754 773 repo.ui.status(
755 774 _(b'%d files to transfer, %s of data\n')
756 775 % (filecount, util.bytecount(filesize))
757 776 )
758 777
759 778 start = util.timer()
760 779 progress = repo.ui.makeprogress(
761 780 _(b'clone'), total=filesize, unit=_(b'bytes')
762 781 )
763 782 progress.update(0)
764 783
765 784 vfsmap = _makemap(repo)
766 785 # we keep repo.vfs out of the map on purpose, there are too many
767 786 # dangers there (eg: .hg/hgrc),
768 787 #
769 788 # this assert is duplicated (from _makemap) as an author might think
770 789 # this is fine, while it is really not.
771 790 if repo.vfs in vfsmap.values():
772 791 raise error.ProgrammingError(
773 792 b'repo.vfs must not be added to vfsmap for security reasons'
774 793 )
775 794
776 795 with repo.transaction(b'clone'):
777 796 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
778 797 with nested(*ctxs):
779 798 for i in range(filecount):
780 799 src = util.readexactly(fp, 1)
781 800 vfs = vfsmap[src]
782 801 namelen = util.uvarintdecodestream(fp)
783 802 datalen = util.uvarintdecodestream(fp)
784 803
785 804 name = util.readexactly(fp, namelen)
786 805
787 806 if repo.ui.debugflag:
788 807 repo.ui.debug(
789 808 b'adding [%s] %s (%s)\n'
790 809 % (src, name, util.bytecount(datalen))
791 810 )
792 811
793 812 with vfs(name, b'w') as ofp:
794 813 for chunk in util.filechunkiter(fp, limit=datalen):
795 814 progress.increment(step=len(chunk))
796 815 ofp.write(chunk)
797 816
798 817 # force @filecache properties to be reloaded from
799 818 # streamclone-ed file at next access
800 819 repo.invalidate(clearfilecache=True)
801 820
802 821 elapsed = util.timer() - start
803 822 if elapsed <= 0:
804 823 elapsed = 0.001
805 824 repo.ui.status(
806 825 _(b'transferred %s in %.1f seconds (%s/sec)\n')
807 826 % (
808 827 util.bytecount(progress.pos),
809 828 elapsed,
810 829 util.bytecount(progress.pos / elapsed),
811 830 )
812 831 )
813 832 progress.complete()
814 833
815 834
816 835 def applybundlev2(repo, fp, filecount, filesize, requirements):
817 836 from . import localrepo
818 837
819 838 missingreqs = [r for r in requirements if r not in repo.supported]
820 839 if missingreqs:
821 840 raise error.Abort(
822 841 _(b'unable to apply stream clone: unsupported format: %s')
823 842 % b', '.join(sorted(missingreqs))
824 843 )
825 844
826 845 consumev2(repo, fp, filecount, filesize)
827 846
828 847 repo.requirements = new_stream_clone_requirements(
829 848 repo.requirements,
830 849 requirements,
831 850 )
832 851 repo.svfs.options = localrepo.resolvestorevfsoptions(
833 852 repo.ui, repo.requirements, repo.features
834 853 )
835 854 scmutil.writereporequirements(repo)
836 855 nodemap.post_stream_cleanup(repo)
837 856
838 857
839 858 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
840 859 hardlink = [True]
841 860
842 861 def copy_used():
843 862 hardlink[0] = False
844 863 progress.topic = _(b'copying')
845 864
846 865 for k, path, size in entries:
847 866 src_vfs = src_vfs_map[k]
848 867 dst_vfs = dst_vfs_map[k]
849 868 src_path = src_vfs.join(path)
850 869 dst_path = dst_vfs.join(path)
851 870 # We cannot use dirname and makedirs of dst_vfs here because the store
852 871 # encoding confuses them. See issue 6581 for details.
853 872 dirname = os.path.dirname(dst_path)
854 873 if not os.path.exists(dirname):
855 874 util.makedirs(dirname)
856 875 dst_vfs.register_file(path)
857 876 # XXX we could use the #nb_bytes argument.
858 877 util.copyfile(
859 878 src_path,
860 879 dst_path,
861 880 hardlink=hardlink[0],
862 881 no_hardlink_cb=copy_used,
863 882 check_fs_hardlink=False,
864 883 )
865 884 progress.increment()
866 885 return hardlink[0]
867 886
868 887
869 888 def local_copy(src_repo, dest_repo):
870 889 """copy all content from one local repository to another
871 890
872 891 This is useful for local clone"""
873 892 src_store_requirements = {
874 893 r
875 894 for r in src_repo.requirements
876 895 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
877 896 }
878 897 dest_store_requirements = {
879 898 r
880 899 for r in dest_repo.requirements
881 900 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
882 901 }
883 902 assert src_store_requirements == dest_store_requirements
884 903
885 904 with dest_repo.lock():
886 905 with src_repo.lock():
887 906
888 907 # bookmarks are not integrated into the streaming as they might use
889 908 # the `repo.vfs`, and there is too much sensitive data accessible
890 909 # through `repo.vfs` to expose it to streaming clone.
891 910 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
892 911 srcbookmarks = src_book_vfs.join(b'bookmarks')
893 912 bm_count = 0
894 913 if os.path.exists(srcbookmarks):
895 914 bm_count = 1
896 915
897 916 entries, totalfilesize = _v2_walk(
898 917 src_repo,
899 918 includes=None,
900 919 excludes=None,
901 920 includeobsmarkers=True,
902 921 )
903 922 src_vfs_map = _makemap(src_repo)
904 923 dest_vfs_map = _makemap(dest_repo)
905 924 progress = src_repo.ui.makeprogress(
906 925 topic=_(b'linking'),
907 926 total=len(entries) + bm_count,
908 927 unit=_(b'files'),
909 928 )
910 929 # copy files
911 930 #
912 931 # We could copy the full file while the source repository is locked
913 932 # and the other one without the lock. However, in the linking case,
914 933 # this would also require checks that nobody is appending any data
915 934 # to the files while we do the clone, so this is not done yet. We
916 935 # could do this blindly when copying files.
917 936 files = ((k, path, size) for k, path, ftype, size in entries)
918 937 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
919 938
920 939 # copy bookmarks over
921 940 if bm_count:
922 941 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
923 942 dstbookmarks = dst_book_vfs.join(b'bookmarks')
924 943 util.copyfile(srcbookmarks, dstbookmarks)
925 944 progress.complete()
926 945 if hardlink:
927 946 msg = b'linked %d files\n'
928 947 else:
929 948 msg = b'copied %d files\n'
930 949 src_repo.ui.debug(msg % (len(entries) + bm_count))
931 950
932 951 with dest_repo.transaction(b"localclone") as tr:
933 952 dest_repo.store.write(tr)
934 953
935 954 # clean up transaction files as they do not make sense
936 955 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)