undo-files: use the cleanup function in streamclone...
marmoute
r51187:97e91001 stable
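
This changeset drops local_copy()'s hand-rolled removal of undo files (an unlink loop that tolerated ENOENT and warned on other OSErrors) in favor of the shared repair.cleanup_undo_files() helper, which in turn lets the now-unused errno import and the stringutil import block go away. A minimal before/after sketch of the pattern; the helper's body lives in mercurial/repair.py and is not shown in this diff:

    # before: inlined in local_copy(), removed by this changeset
    undo_files = [(dest_repo.svfs, b'undo.backupfiles')]
    undo_files.extend(dest_repo.undofiles())
    for undovfs, undofile in undo_files:
        try:
            undovfs.unlink(undofile)
        except OSError as e:
            if e.errno != errno.ENOENT:
                dest_repo.ui.warn(...)  # warn and keep going

    # after: one call to the shared helper
    repair.cleanup_undo_files(dest_repo)
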
@@ -1,949 +1,935 @@
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import contextlib
10 import errno
11 10 import os
12 11 import struct
13 12
14 13 from .i18n import _
15 14 from .pycompat import open
16 15 from .interfaces import repository
17 16 from . import (
18 17 bookmarks,
19 18 cacheutil,
20 19 error,
21 20 narrowspec,
22 21 phases,
23 22 pycompat,
23 repair,
24 24 requirements as requirementsmod,
25 25 scmutil,
26 26 store,
27 27 util,
28 28 )
29 29 from .revlogutils import (
30 30 nodemap,
31 31 )
32 from .utils import (
33 stringutil,
34 )
35 32
36 33
37 34 def new_stream_clone_requirements(default_requirements, streamed_requirements):
38 35 """determine the final set of requirement for a new stream clone
39 36
40 37 this method combine the "default" requirements that a new repository would
41 38 use with the constaint we get from the stream clone content. We keep local
42 39 configuration choice when possible.
43 40 """
44 41 requirements = set(default_requirements)
45 42 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
46 43 requirements.update(streamed_requirements)
47 44 return requirements
48 45
49 46
50 47 def streamed_requirements(repo):
51 48 """the set of requirement the new clone will have to support
52 49
53 50 This is used for advertising the stream options and to generate the actual
54 51 stream content."""
55 52 requiredformats = (
56 53 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
57 54 )
58 55 return requiredformats
59 56
60 57
61 58 def canperformstreamclone(pullop, bundle2=False):
62 59 """Whether it is possible to perform a streaming clone as part of pull.
63 60
64 61 ``bundle2`` will cause the function to consider stream clone through
65 62 bundle2 and only through bundle2.
66 63
67 64 Returns a tuple of (supported, requirements). ``supported`` is True if
68 65 streaming clone is supported and False otherwise. ``requirements`` is
69 66 a set of repo requirements from the remote, or ``None`` if stream clone
70 67 isn't supported.
71 68 """
72 69 repo = pullop.repo
73 70 remote = pullop.remote
74 71
75 72 bundle2supported = False
76 73 if pullop.canusebundle2:
77 74 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
78 75 bundle2supported = True
79 76 # else
80 77 # Server doesn't support bundle2 stream clone or doesn't support
81 78 # the versions we support. Fall back and possibly allow legacy.
82 79
83 80 # Ensures legacy code path uses available bundle2.
84 81 if bundle2supported and not bundle2:
85 82 return False, None
86 83 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
87 84 elif bundle2 and not bundle2supported:
88 85 return False, None
89 86
90 87 # Streaming clone only works on empty repositories.
91 88 if len(repo):
92 89 return False, None
93 90
94 91 # Streaming clone only works if all data is being requested.
95 92 if pullop.heads:
96 93 return False, None
97 94
98 95 streamrequested = pullop.streamclonerequested
99 96
100 97 # If we don't have a preference, let the server decide for us. This
101 98 # likely only comes into play in LANs.
102 99 if streamrequested is None:
103 100 # The server can advertise whether to prefer streaming clone.
104 101 streamrequested = remote.capable(b'stream-preferred')
105 102
106 103 if not streamrequested:
107 104 return False, None
108 105
109 106 # In order for stream clone to work, the client has to support all the
110 107 # requirements advertised by the server.
111 108 #
112 109 # The server advertises its requirements via the "stream" and "streamreqs"
113 110 # capability. "stream" (a value-less capability) is advertised if and only
114 111 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
115 112 # is advertised and contains a comma-delimited list of requirements.
116 113 requirements = set()
117 114 if remote.capable(b'stream'):
118 115 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
119 116 else:
120 117 streamreqs = remote.capable(b'streamreqs')
121 118 # This is weird and shouldn't happen with modern servers.
122 119 if not streamreqs:
123 120 pullop.repo.ui.warn(
124 121 _(
125 122 b'warning: stream clone requested but server has them '
126 123 b'disabled\n'
127 124 )
128 125 )
129 126 return False, None
130 127
131 128 streamreqs = set(streamreqs.split(b','))
132 129 # Server requires something we don't support. Bail.
133 130 missingreqs = streamreqs - repo.supported
134 131 if missingreqs:
135 132 pullop.repo.ui.warn(
136 133 _(
137 134 b'warning: stream clone requested but client is missing '
138 135 b'requirements: %s\n'
139 136 )
140 137 % b', '.join(sorted(missingreqs))
141 138 )
142 139 pullop.repo.ui.warn(
143 140 _(
144 141 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
145 142 b'for more information)\n'
146 143 )
147 144 )
148 145 return False, None
149 146 requirements = streamreqs
150 147
151 148 return True, requirements
152 149
153 150
154 151 def maybeperformlegacystreamclone(pullop):
155 152 """Possibly perform a legacy stream clone operation.
156 153
157 154 Legacy stream clones are performed as part of pull but before all other
158 155 operations.
159 156
160 157 A legacy stream clone will not be performed if a bundle2 stream clone is
161 158 supported.
162 159 """
163 160 from . import localrepo
164 161
165 162 supported, requirements = canperformstreamclone(pullop)
166 163
167 164 if not supported:
168 165 return
169 166
170 167 repo = pullop.repo
171 168 remote = pullop.remote
172 169
173 170 # Save remote branchmap. We will use it later to speed up branchcache
174 171 # creation.
175 172 rbranchmap = None
176 173 if remote.capable(b'branchmap'):
177 174 with remote.commandexecutor() as e:
178 175 rbranchmap = e.callcommand(b'branchmap', {}).result()
179 176
180 177 repo.ui.status(_(b'streaming all changes\n'))
181 178
182 179 with remote.commandexecutor() as e:
183 180 fp = e.callcommand(b'stream_out', {}).result()
184 181
185 182 # TODO strictly speaking, this code should all be inside the context
186 183 # manager because the context manager is supposed to ensure all wire state
187 184 # is flushed when exiting. But the legacy peers don't do this, so it
188 185 # doesn't matter.
189 186 l = fp.readline()
190 187 try:
191 188 resp = int(l)
192 189 except ValueError:
193 190 raise error.ResponseError(
194 191 _(b'unexpected response from remote server:'), l
195 192 )
196 193 if resp == 1:
197 194 raise error.Abort(_(b'operation forbidden by server'))
198 195 elif resp == 2:
199 196 raise error.Abort(_(b'locking the remote repository failed'))
200 197 elif resp != 0:
201 198 raise error.Abort(_(b'the server sent an unknown error code'))
202 199
203 200 l = fp.readline()
204 201 try:
205 202 filecount, bytecount = map(int, l.split(b' ', 1))
206 203 except (ValueError, TypeError):
207 204 raise error.ResponseError(
208 205 _(b'unexpected response from remote server:'), l
209 206 )
210 207
211 208 with repo.lock():
212 209 consumev1(repo, fp, filecount, bytecount)
213 210 repo.requirements = new_stream_clone_requirements(
214 211 repo.requirements,
215 212 requirements,
216 213 )
217 214 repo.svfs.options = localrepo.resolvestorevfsoptions(
218 215 repo.ui, repo.requirements, repo.features
219 216 )
220 217 scmutil.writereporequirements(repo)
221 218 nodemap.post_stream_cleanup(repo)
222 219
223 220 if rbranchmap:
224 221 repo._branchcaches.replace(repo, rbranchmap)
225 222
226 223 repo.invalidate()
227 224
228 225
229 226 def allowservergeneration(repo):
230 227 """Whether streaming clones are allowed from the server."""
231 228 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
232 229 return False
233 230
234 231 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
235 232 return False
236 233
237 234 # The way stream clone works makes it impossible to hide secret changesets.
238 235 # So don't allow this by default.
239 236 secret = phases.hassecret(repo)
240 237 if secret:
241 238 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
242 239
243 240 return True
244 241
245 242
246 243 # This is its own function so extensions can override it.
247 244 def _walkstreamfiles(repo, matcher=None):
248 245 return repo.store.walk(matcher)
249 246
250 247
251 248 def generatev1(repo):
252 249 """Emit content for version 1 of a streaming clone.
253 250
254 251 This returns a 3-tuple of (file count, byte size, data iterator).
255 252
256 253 The data iterator consists of N entries for each file being transferred.
257 254 Each file entry starts as a line with the file name and integer size
258 255 delimited by a null byte.
259 256
260 257 The raw file data follows. Following the raw file data is the next file
261 258 entry, or EOF.
262 259
263 260 When used on the wire protocol, an additional line indicating protocol
264 261 success will be prepended to the stream. This function is not responsible
265 262 for adding it.
266 263
267 264 This function will obtain a repository lock to ensure a consistent view of
268 265 the store is captured. It therefore may raise LockError.
269 266 """
270 267 entries = []
271 268 total_bytes = 0
272 269 # Get consistent snapshot of repo, lock during scan.
273 270 with repo.lock():
274 271 repo.ui.debug(b'scanning\n')
275 272 for file_type, name, size in _walkstreamfiles(repo):
276 273 if size:
277 274 entries.append((name, size))
278 275 total_bytes += size
279 276 _test_sync_point_walk_1(repo)
280 277 _test_sync_point_walk_2(repo)
281 278
282 279 repo.ui.debug(
283 280 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
284 281 )
285 282
286 283 svfs = repo.svfs
287 284 debugflag = repo.ui.debugflag
288 285
289 286 def emitrevlogdata():
290 287 for name, size in entries:
291 288 if debugflag:
292 289 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
293 290 # partially encode name over the wire for backwards compat
294 291 yield b'%s\0%d\n' % (store.encodedir(name), size)
295 292 # auditing at this stage is both pointless (paths are already
296 293 # trusted by the local repo) and expensive
297 294 with svfs(name, b'rb', auditpath=False) as fp:
298 295 if size <= 65536:
299 296 yield fp.read(size)
300 297 else:
301 298 for chunk in util.filechunkiter(fp, limit=size):
302 299 yield chunk
303 300
304 301 return len(entries), total_bytes, emitrevlogdata()
305 302
306 303
307 304 def generatev1wireproto(repo):
308 305 """Emit content for version 1 of streaming clone suitable for the wire.
309 306
310 307 This is the data output from ``generatev1()`` with 2 header lines. The
311 308 first line indicates overall success. The 2nd contains the file count and
312 309 byte size of payload.
313 310
314 311 The success line contains "0" for success, "1" for stream generation not
315 312 allowed, and "2" for error locking the repository (possibly indicating
316 313 a permissions error for the server process).
317 314 """
318 315 if not allowservergeneration(repo):
319 316 yield b'1\n'
320 317 return
321 318
322 319 try:
323 320 filecount, bytecount, it = generatev1(repo)
324 321 except error.LockError:
325 322 yield b'2\n'
326 323 return
327 324
328 325 # Indicates successful response.
329 326 yield b'0\n'
330 327 yield b'%d %d\n' % (filecount, bytecount)
331 328 for chunk in it:
332 329 yield chunk
333 330
334 331
335 332 def generatebundlev1(repo, compression=b'UN'):
336 333 """Emit content for version 1 of a stream clone bundle.
337 334
338 335 The first 4 bytes of the output ("HGS1") denote this as stream clone
339 336 bundle version 1.
340 337
341 338 The next 2 bytes indicate the compression type. Only "UN" is currently
342 339 supported.
343 340
344 341 The next 16 bytes are two 64-bit big endian unsigned integers indicating
345 342 file count and byte count, respectively.
346 343
347 344 The next 2 bytes are a 16-bit big endian unsigned short declaring the length
348 345 of the requirements string, including a trailing \0. The following N bytes
349 346 are the requirements string, which is ASCII containing a comma-delimited
350 347 list of repo requirements that are needed to support the data.
351 348
352 349 The remaining content is the output of ``generatev1()`` (which may be
353 350 compressed in the future).
354 351
355 352 Returns a tuple of (requirements, data generator).
356 353 """
357 354 if compression != b'UN':
358 355 raise ValueError(b'we do not support the compression argument yet')
359 356
360 357 requirements = streamed_requirements(repo)
361 358 requires = b','.join(sorted(requirements))
362 359
363 360 def gen():
364 361 yield b'HGS1'
365 362 yield compression
366 363
367 364 filecount, bytecount, it = generatev1(repo)
368 365 repo.ui.status(
369 366 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
370 367 )
371 368
372 369 yield struct.pack(b'>QQ', filecount, bytecount)
373 370 yield struct.pack(b'>H', len(requires) + 1)
374 371 yield requires + b'\0'
375 372
376 373 # This is where we'll add compression in the future.
377 374 assert compression == b'UN'
378 375
379 376 progress = repo.ui.makeprogress(
380 377 _(b'bundle'), total=bytecount, unit=_(b'bytes')
381 378 )
382 379 progress.update(0)
383 380
384 381 for chunk in it:
385 382 progress.increment(step=len(chunk))
386 383 yield chunk
387 384
388 385 progress.complete()
389 386
390 387 return requirements, gen()
391 388
392 389
393 390 def consumev1(repo, fp, filecount, bytecount):
394 391 """Apply the contents from version 1 of a streaming clone file handle.
395 392
396 393 This takes the output from "stream_out" and applies it to the specified
397 394 repository.
398 395
399 396 Like "stream_out," the status line added by the wire protocol is not
400 397 handled by this function.
401 398 """
402 399 with repo.lock():
403 400 repo.ui.status(
404 401 _(b'%d files to transfer, %s of data\n')
405 402 % (filecount, util.bytecount(bytecount))
406 403 )
407 404 progress = repo.ui.makeprogress(
408 405 _(b'clone'), total=bytecount, unit=_(b'bytes')
409 406 )
410 407 progress.update(0)
411 408 start = util.timer()
412 409
413 410 # TODO: get rid of (potential) inconsistency
414 411 #
415 412 # If transaction is started and any @filecache property is
416 413 # changed at this point, it causes inconsistency between
417 414 # in-memory cached property and streamclone-ed file on the
418 415 # disk. Nested transaction prevents transaction scope "clone"
419 416 # below from writing in-memory changes out at the end of it,
420 417 # even though in-memory changes are discarded at the end of it
421 418 # regardless of transaction nesting.
422 419 #
423 420 # But transaction nesting can't be simply prohibited, because
424 421 # nesting occurs also in ordinary case (e.g. enabling
425 422 # clonebundles).
426 423
427 424 with repo.transaction(b'clone'):
428 425 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
429 426 for i in range(filecount):
430 427 # XXX doesn't support '\n' or '\r' in filenames
431 428 l = fp.readline()
432 429 try:
433 430 name, size = l.split(b'\0', 1)
434 431 size = int(size)
435 432 except (ValueError, TypeError):
436 433 raise error.ResponseError(
437 434 _(b'unexpected response from remote server:'), l
438 435 )
439 436 if repo.ui.debugflag:
440 437 repo.ui.debug(
441 438 b'adding %s (%s)\n' % (name, util.bytecount(size))
442 439 )
443 440 # for backwards compat, name was partially encoded
444 441 path = store.decodedir(name)
445 442 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
446 443 for chunk in util.filechunkiter(fp, limit=size):
447 444 progress.increment(step=len(chunk))
448 445 ofp.write(chunk)
449 446
450 447 # force @filecache properties to be reloaded from
451 448 # streamclone-ed file at next access
452 449 repo.invalidate(clearfilecache=True)
453 450
454 451 elapsed = util.timer() - start
455 452 if elapsed <= 0:
456 453 elapsed = 0.001
457 454 progress.complete()
458 455 repo.ui.status(
459 456 _(b'transferred %s in %.1f seconds (%s/sec)\n')
460 457 % (
461 458 util.bytecount(bytecount),
462 459 elapsed,
463 460 util.bytecount(bytecount / elapsed),
464 461 )
465 462 )
466 463
467 464
468 465 def readbundle1header(fp):
469 466 compression = fp.read(2)
470 467 if compression != b'UN':
471 468 raise error.Abort(
472 469 _(
473 470 b'only uncompressed stream clone bundles are '
474 471 b'supported; got %s'
475 472 )
476 473 % compression
477 474 )
478 475
479 476 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
480 477 requireslen = struct.unpack(b'>H', fp.read(2))[0]
481 478 requires = fp.read(requireslen)
482 479
483 480 if not requires.endswith(b'\0'):
484 481 raise error.Abort(
485 482 _(
486 483 b'malformed stream clone bundle: '
487 484 b'requirements not properly encoded'
488 485 )
489 486 )
490 487
491 488 requirements = set(requires.rstrip(b'\0').split(b','))
492 489
493 490 return filecount, bytecount, requirements
494 491
495 492
496 493 def applybundlev1(repo, fp):
497 494 """Apply the content from a stream clone bundle version 1.
498 495
499 496 We assume the 4 byte header has been read and validated and the file handle
500 497 is at the 2 byte compression identifier.
501 498 """
502 499 if len(repo):
503 500 raise error.Abort(
504 501 _(b'cannot apply stream clone bundle on non-empty repo')
505 502 )
506 503
507 504 filecount, bytecount, requirements = readbundle1header(fp)
508 505 missingreqs = requirements - repo.supported
509 506 if missingreqs:
510 507 raise error.Abort(
511 508 _(b'unable to apply stream clone: unsupported format: %s')
512 509 % b', '.join(sorted(missingreqs))
513 510 )
514 511
515 512 consumev1(repo, fp, filecount, bytecount)
516 513 nodemap.post_stream_cleanup(repo)
517 514
518 515
519 516 class streamcloneapplier:
520 517 """Class to manage applying streaming clone bundles.
521 518
522 519 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
523 520 readers to perform bundle type-specific functionality.
524 521 """
525 522
526 523 def __init__(self, fh):
527 524 self._fh = fh
528 525
529 526 def apply(self, repo):
530 527 return applybundlev1(repo, self._fh)
531 528
532 529
533 530 # type of file to stream
534 531 _fileappend = 0 # append only file
535 532 _filefull = 1 # full snapshot file
536 533
537 534 # Source of the file
538 535 _srcstore = b's' # store (svfs)
539 536 _srccache = b'c' # cache (cache)
540 537
541 538 # This is its own function so extensions can override it.
542 539 def _walkstreamfullstorefiles(repo):
543 540 """list snapshot file from the store"""
544 541 fnames = []
545 542 if not repo.publishing():
546 543 fnames.append(b'phaseroots')
547 544 return fnames
548 545
549 546
550 547 def _filterfull(entry, copy, vfsmap):
551 548 """actually copy the snapshot files"""
552 549 src, name, ftype, data = entry
553 550 if ftype != _filefull:
554 551 return entry
555 552 return (src, name, ftype, copy(vfsmap[src].join(name)))
556 553
557 554
558 555 @contextlib.contextmanager
559 556 def maketempcopies():
560 557 """return a function to temporary copy file"""
561 558
562 559 files = []
563 560 dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
564 561 try:
565 562
566 563 def copy(src):
567 564 fd, dst = pycompat.mkstemp(
568 565 prefix=os.path.basename(src), dir=dst_dir
569 566 )
570 567 os.close(fd)
571 568 files.append(dst)
572 569 util.copyfiles(src, dst, hardlink=True)
573 570 return dst
574 571
575 572 yield copy
576 573 finally:
577 574 for tmp in files:
578 575 util.tryunlink(tmp)
579 576 util.tryrmdir(dst_dir)
580 577
581 578
582 579 def _makemap(repo):
583 580 """make a (src -> vfs) map for the repo"""
584 581 vfsmap = {
585 582 _srcstore: repo.svfs,
586 583 _srccache: repo.cachevfs,
587 584 }
588 585 # we keep repo.vfs out of the map on purpose, there are too many dangers there
589 586 # (eg: .hg/hgrc)
590 587 assert repo.vfs not in vfsmap.values()
591 588
592 589 return vfsmap
593 590
594 591
595 592 def _emit2(repo, entries, totalfilesize):
596 593 """actually emit the stream bundle"""
597 594 vfsmap = _makemap(repo)
598 595 # we keep repo.vfs out of the map on purpose, there are too many dangers there
599 596 # (eg: .hg/hgrc),
600 597 #
601 598 # this assert is duplicated (from _makemap) as an author might think this is
602 599 # fine, while it is really not.
603 600 if repo.vfs in vfsmap.values():
604 601 raise error.ProgrammingError(
605 602 b'repo.vfs must not be added to vfsmap for security reasons'
606 603 )
607 604
608 605 progress = repo.ui.makeprogress(
609 606 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
610 607 )
611 608 progress.update(0)
612 609 with maketempcopies() as copy, progress:
613 610 # copy is delayed until we are in the try
614 611 entries = [_filterfull(e, copy, vfsmap) for e in entries]
615 612 yield None # this releases the lock on the repository
616 613 totalbytecount = 0
617 614
618 615 for src, name, ftype, data in entries:
619 616 vfs = vfsmap[src]
620 617 yield src
621 618 yield util.uvarintencode(len(name))
622 619 if ftype == _fileappend:
623 620 fp = vfs(name)
624 621 size = data
625 622 elif ftype == _filefull:
626 623 fp = open(data, b'rb')
627 624 size = util.fstat(fp).st_size
628 625 bytecount = 0
629 626 try:
630 627 yield util.uvarintencode(size)
631 628 yield name
632 629 if size <= 65536:
633 630 chunks = (fp.read(size),)
634 631 else:
635 632 chunks = util.filechunkiter(fp, limit=size)
636 633 for chunk in chunks:
637 634 bytecount += len(chunk)
638 635 totalbytecount += len(chunk)
639 636 progress.update(totalbytecount)
640 637 yield chunk
641 638 if bytecount != size:
642 639 # Would most likely be caused by a race due to `hg strip` or
643 640 # a revlog split
644 641 raise error.Abort(
645 642 _(
646 643 b'clone could only read %d bytes from %s, but '
647 644 b'expected %d bytes'
648 645 )
649 646 % (bytecount, name, size)
650 647 )
651 648 finally:
652 649 fp.close()
653 650
654 651
655 652 def _test_sync_point_walk_1(repo):
656 653 """a function for synchronisation during tests"""
657 654
658 655
659 656 def _test_sync_point_walk_2(repo):
660 657 """a function for synchronisation during tests"""
661 658
662 659
663 660 def _v2_walk(repo, includes, excludes, includeobsmarkers):
664 661 """emit a seris of files information useful to clone a repo
665 662
666 663 return (entries, totalfilesize)
667 664
668 665 entries is a list of tuple (vfs-key, file-path, file-type, size)
669 666
670 667 - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
671 668 - `name`: file path of the file to copy (to be feed to the vfss)
672 669 - `file-type`: do this file need to be copied with the source lock ?
673 670 - `size`: the size of the file (or None)
674 671 """
675 672 assert repo._currentlock(repo._lockref) is not None
676 673 entries = []
677 674 totalfilesize = 0
678 675
679 676 matcher = None
680 677 if includes or excludes:
681 678 matcher = narrowspec.match(repo.root, includes, excludes)
682 679
683 680 for rl_type, name, size in _walkstreamfiles(repo, matcher):
684 681 if size:
685 682 ft = _fileappend
686 683 if rl_type & store.FILEFLAGS_VOLATILE:
687 684 ft = _filefull
688 685 entries.append((_srcstore, name, ft, size))
689 686 totalfilesize += size
690 687 for name in _walkstreamfullstorefiles(repo):
691 688 if repo.svfs.exists(name):
692 689 totalfilesize += repo.svfs.lstat(name).st_size
693 690 entries.append((_srcstore, name, _filefull, None))
694 691 if includeobsmarkers and repo.svfs.exists(b'obsstore'):
695 692 totalfilesize += repo.svfs.lstat(b'obsstore').st_size
696 693 entries.append((_srcstore, b'obsstore', _filefull, None))
697 694 for name in cacheutil.cachetocopy(repo):
698 695 if repo.cachevfs.exists(name):
699 696 totalfilesize += repo.cachevfs.lstat(name).st_size
700 697 entries.append((_srccache, name, _filefull, None))
701 698 return entries, totalfilesize
702 699
703 700
704 701 def generatev2(repo, includes, excludes, includeobsmarkers):
705 702 """Emit content for version 2 of a streaming clone.
706 703
707 704 the data stream consists of the following entries:
708 705 1) A char representing the file destination (eg: store or cache)
709 706 2) A varint containing the length of the filename
710 707 3) A varint containing the length of file data
711 708 4) N bytes containing the filename (the internal, store-agnostic form)
712 709 5) N bytes containing the file data
713 710
714 711 Returns a 3-tuple of (file count, file size, data iterator).
715 712 """
716 713
717 714 with repo.lock():
718 715
719 716 repo.ui.debug(b'scanning\n')
720 717
721 718 entries, totalfilesize = _v2_walk(
722 719 repo,
723 720 includes=includes,
724 721 excludes=excludes,
725 722 includeobsmarkers=includeobsmarkers,
726 723 )
727 724
728 725 chunks = _emit2(repo, entries, totalfilesize)
729 726 first = next(chunks)
730 727 assert first is None
731 728 _test_sync_point_walk_1(repo)
732 729 _test_sync_point_walk_2(repo)
733 730
734 731 return len(entries), totalfilesize, chunks
735 732
736 733
737 734 @contextlib.contextmanager
738 735 def nested(*ctxs):
739 736 this = ctxs[0]
740 737 rest = ctxs[1:]
741 738 with this:
742 739 if rest:
743 740 with nested(*rest):
744 741 yield
745 742 else:
746 743 yield
747 744
748 745
749 746 def consumev2(repo, fp, filecount, filesize):
750 747 """Apply the contents from a version 2 streaming clone.
751 748
752 749 Data is read from an object that only needs to provide a ``read(size)``
753 750 method.
754 751 """
755 752 with repo.lock():
756 753 repo.ui.status(
757 754 _(b'%d files to transfer, %s of data\n')
758 755 % (filecount, util.bytecount(filesize))
759 756 )
760 757
761 758 start = util.timer()
762 759 progress = repo.ui.makeprogress(
763 760 _(b'clone'), total=filesize, unit=_(b'bytes')
764 761 )
765 762 progress.update(0)
766 763
767 764 vfsmap = _makemap(repo)
768 765 # we keep repo.vfs out of the map on purpose, there are too many dangers
769 766 # there (eg: .hg/hgrc),
770 767 #
771 768 # this assert is duplicated (from _makemap) as an author might think this
772 769 # is fine, while it is really not.
773 770 if repo.vfs in vfsmap.values():
774 771 raise error.ProgrammingError(
775 772 b'repo.vfs must not be added to vfsmap for security reasons'
776 773 )
777 774
778 775 with repo.transaction(b'clone'):
779 776 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
780 777 with nested(*ctxs):
781 778 for i in range(filecount):
782 779 src = util.readexactly(fp, 1)
783 780 vfs = vfsmap[src]
784 781 namelen = util.uvarintdecodestream(fp)
785 782 datalen = util.uvarintdecodestream(fp)
786 783
787 784 name = util.readexactly(fp, namelen)
788 785
789 786 if repo.ui.debugflag:
790 787 repo.ui.debug(
791 788 b'adding [%s] %s (%s)\n'
792 789 % (src, name, util.bytecount(datalen))
793 790 )
794 791
795 792 with vfs(name, b'w') as ofp:
796 793 for chunk in util.filechunkiter(fp, limit=datalen):
797 794 progress.increment(step=len(chunk))
798 795 ofp.write(chunk)
799 796
800 797 # force @filecache properties to be reloaded from
801 798 # streamclone-ed file at next access
802 799 repo.invalidate(clearfilecache=True)
803 800
804 801 elapsed = util.timer() - start
805 802 if elapsed <= 0:
806 803 elapsed = 0.001
807 804 repo.ui.status(
808 805 _(b'transferred %s in %.1f seconds (%s/sec)\n')
809 806 % (
810 807 util.bytecount(progress.pos),
811 808 elapsed,
812 809 util.bytecount(progress.pos / elapsed),
813 810 )
814 811 )
815 812 progress.complete()
816 813
817 814
818 815 def applybundlev2(repo, fp, filecount, filesize, requirements):
819 816 from . import localrepo
820 817
821 818 missingreqs = [r for r in requirements if r not in repo.supported]
822 819 if missingreqs:
823 820 raise error.Abort(
824 821 _(b'unable to apply stream clone: unsupported format: %s')
825 822 % b', '.join(sorted(missingreqs))
826 823 )
827 824
828 825 consumev2(repo, fp, filecount, filesize)
829 826
830 827 repo.requirements = new_stream_clone_requirements(
831 828 repo.requirements,
832 829 requirements,
833 830 )
834 831 repo.svfs.options = localrepo.resolvestorevfsoptions(
835 832 repo.ui, repo.requirements, repo.features
836 833 )
837 834 scmutil.writereporequirements(repo)
838 835 nodemap.post_stream_cleanup(repo)
839 836
840 837
841 838 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
842 839 hardlink = [True]
843 840
844 841 def copy_used():
845 842 hardlink[0] = False
846 843 progress.topic = _(b'copying')
847 844
848 845 for k, path, size in entries:
849 846 src_vfs = src_vfs_map[k]
850 847 dst_vfs = dst_vfs_map[k]
851 848 src_path = src_vfs.join(path)
852 849 dst_path = dst_vfs.join(path)
853 850 # We cannot use dirname and makedirs of dst_vfs here because the store
854 851 # encoding confuses them. See issue 6581 for details.
855 852 dirname = os.path.dirname(dst_path)
856 853 if not os.path.exists(dirname):
857 854 util.makedirs(dirname)
858 855 dst_vfs.register_file(path)
859 856 # XXX we could use the #nb_bytes argument.
860 857 util.copyfile(
861 858 src_path,
862 859 dst_path,
863 860 hardlink=hardlink[0],
864 861 no_hardlink_cb=copy_used,
865 862 check_fs_hardlink=False,
866 863 )
867 864 progress.increment()
868 865 return hardlink[0]
869 866
870 867
871 868 def local_copy(src_repo, dest_repo):
872 869 """copy all content from one local repository to another
873 870
874 871 This is useful for local clones"""
875 872 src_store_requirements = {
876 873 r
877 874 for r in src_repo.requirements
878 875 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
879 876 }
880 877 dest_store_requirements = {
881 878 r
882 879 for r in dest_repo.requirements
883 880 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
884 881 }
885 882 assert src_store_requirements == dest_store_requirements
886 883
887 884 with dest_repo.lock():
888 885 with src_repo.lock():
889 886
890 887 # bookmarks are not integrated into the streaming as they might use the
891 888 # `repo.vfs`, and there is too much sensitive data accessible
892 889 # through `repo.vfs` to expose it to a streaming clone.
893 890 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
894 891 srcbookmarks = src_book_vfs.join(b'bookmarks')
895 892 bm_count = 0
896 893 if os.path.exists(srcbookmarks):
897 894 bm_count = 1
898 895
899 896 entries, totalfilesize = _v2_walk(
900 897 src_repo,
901 898 includes=None,
902 899 excludes=None,
903 900 includeobsmarkers=True,
904 901 )
905 902 src_vfs_map = _makemap(src_repo)
906 903 dest_vfs_map = _makemap(dest_repo)
907 904 progress = src_repo.ui.makeprogress(
908 905 topic=_(b'linking'),
909 906 total=len(entries) + bm_count,
910 907 unit=_(b'files'),
911 908 )
912 909 # copy files
913 910 #
914 911 # We could copy the full file while the source repository is locked
915 912 # and the other one without the lock. However, in the linking case,
916 913 # this would also require checks that nobody is appending any data
917 914 # to the files while we do the clone, so this is not done yet. We
918 915 # could do this blindly when copying files.
919 916 files = ((k, path, size) for k, path, ftype, size in entries)
920 917 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
921 918
922 919 # copy bookmarks over
923 920 if bm_count:
924 921 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
925 922 dstbookmarks = dst_book_vfs.join(b'bookmarks')
926 923 util.copyfile(srcbookmarks, dstbookmarks)
927 924 progress.complete()
928 925 if hardlink:
929 926 msg = b'linked %d files\n'
930 927 else:
931 928 msg = b'copied %d files\n'
932 929 src_repo.ui.debug(msg % (len(entries) + bm_count))
933 930
934 931 with dest_repo.transaction(b"localclone") as tr:
935 932 dest_repo.store.write(tr)
936 933
937 934 # clean up transaction files as they do not make sense
938 undo_files = [(dest_repo.svfs, b'undo.backupfiles')]
939 undo_files.extend(dest_repo.undofiles())
940 for undovfs, undofile in undo_files:
941 try:
942 undovfs.unlink(undofile)
943 except OSError as e:
944 if e.errno != errno.ENOENT:
945 msg = _(b'error removing %s: %s\n')
946 path = undovfs.join(undofile)
947 e_msg = stringutil.forcebytestr(e)
948 msg %= (path, e_msg)
949 dest_repo.ui.warn(msg)
935 repair.cleanup_undo_files(dest_repo)
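
For reference, the version-2 stream documented in generatev2() and consumed by consumev2() frames each file as: a one-byte destination key (b's' for the store vfs, b'c' for the cache vfs, per _makemap()), a uvarint name length, a uvarint data length, the name bytes, then the data bytes. A minimal sketch of that framing, assuming util.uvarintencode() is a standard LEB128-style unsigned varint (its actual definition lives in mercurial/util.py and is not part of this diff):

    def _uvarint(value):
        # assumed encoding: 7 data bits per byte, high bit set on
        # continuation bytes, least-significant group first
        out = bytearray()
        while True:
            byte = value & 0x7F
            value >>= 7
            if value:
                out.append(byte | 0x80)
            else:
                out.append(byte)
                return bytes(out)

    def _frame_entry(src, name, data):
        # mirrors the yield order in _emit2(): destination key, name
        # length, data length, name, then the raw data
        assert src in (b's', b'c')
        return src + _uvarint(len(name)) + _uvarint(len(data)) + name + data

    # hypothetical example entry destined for the store vfs
    _frame_entry(b's', b'phaseroots', b'...')
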