streamclone: ensure the server sends the right amount of data...
Valentin Gatien-Baron
r48592:48f07adb stable
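This revision hardens `_emit2()`: instead of a single `seen` counter used only for progress reporting, the generator now tracks a per-file `bytecount` alongside the stream-wide `totalbytecount`, and raises `error.Abort` when a file yields fewer bytes than the size advertised earlier in the stream. A short read here would most likely be caused by a race with `hg strip` or a revlog split while the clone is being generated; before this change the server would silently emit a stream whose framing no longer matched the advertised sizes.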
@@ -1,906 +1,918 @@
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import contextlib
11 11 import errno
12 12 import os
13 13 import struct
14 14
15 15 from .i18n import _
16 16 from .pycompat import open
17 17 from .interfaces import repository
18 18 from . import (
19 19 bookmarks,
20 20 cacheutil,
21 21 error,
22 22 narrowspec,
23 23 phases,
24 24 pycompat,
25 25 requirements as requirementsmod,
26 26 scmutil,
27 27 store,
28 28 util,
29 29 )
30 30 from .utils import (
31 31 stringutil,
32 32 )
33 33
34 34
35 35 def canperformstreamclone(pullop, bundle2=False):
36 36 """Whether it is possible to perform a streaming clone as part of pull.
37 37
38 38 ``bundle2`` will cause the function to consider stream clone through
39 39 bundle2 and only through bundle2.
40 40
41 41 Returns a tuple of (supported, requirements). ``supported`` is True if
42 42 streaming clone is supported and False otherwise. ``requirements`` is
43 43 a set of repo requirements from the remote, or ``None`` if stream clone
44 44 isn't supported.
45 45 """
46 46 repo = pullop.repo
47 47 remote = pullop.remote
48 48
49 49 bundle2supported = False
50 50 if pullop.canusebundle2:
51 51 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
52 52 bundle2supported = True
53 53 # else
54 54 # Server doesn't support bundle2 stream clone or doesn't support
55 55 # the versions we support. Fall back and possibly allow legacy.
56 56
57 57 # Ensures legacy code path uses available bundle2.
58 58 if bundle2supported and not bundle2:
59 59 return False, None
60 60 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
61 61 elif bundle2 and not bundle2supported:
62 62 return False, None
63 63
64 64 # Streaming clone only works on empty repositories.
65 65 if len(repo):
66 66 return False, None
67 67
68 68 # Streaming clone only works if all data is being requested.
69 69 if pullop.heads:
70 70 return False, None
71 71
72 72 streamrequested = pullop.streamclonerequested
73 73
74 74 # If we don't have a preference, let the server decide for us. This
75 75 # likely only comes into play in LANs.
76 76 if streamrequested is None:
77 77 # The server can advertise whether to prefer streaming clone.
78 78 streamrequested = remote.capable(b'stream-preferred')
79 79
80 80 if not streamrequested:
81 81 return False, None
82 82
83 83 # In order for stream clone to work, the client has to support all the
84 84 # requirements advertised by the server.
85 85 #
86 86 # The server advertises its requirements via the "stream" and "streamreqs"
87 87 # capability. "stream" (a value-less capability) is advertised if and only
88 88 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
89 89 # is advertised and contains a comma-delimited list of requirements.
90 90 requirements = set()
91 91 if remote.capable(b'stream'):
92 92 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
93 93 else:
94 94 streamreqs = remote.capable(b'streamreqs')
95 95 # This is weird and shouldn't happen with modern servers.
96 96 if not streamreqs:
97 97 pullop.repo.ui.warn(
98 98 _(
99 99 b'warning: stream clone requested but server has them '
100 100 b'disabled\n'
101 101 )
102 102 )
103 103 return False, None
104 104
105 105 streamreqs = set(streamreqs.split(b','))
106 106 # Server requires something we don't support. Bail.
107 107 missingreqs = streamreqs - repo.supportedformats
108 108 if missingreqs:
109 109 pullop.repo.ui.warn(
110 110 _(
111 111 b'warning: stream clone requested but client is missing '
112 112 b'requirements: %s\n'
113 113 )
114 114 % b', '.join(sorted(missingreqs))
115 115 )
116 116 pullop.repo.ui.warn(
117 117 _(
118 118 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
119 119 b'for more information)\n'
120 120 )
121 121 )
122 122 return False, None
123 123 requirements = streamreqs
124 124
125 125 return True, requirements
126 126
127 127
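In practice the `streamreqs` negotiation above reduces to set arithmetic over comma-delimited requirement names. A condensed sketch with hypothetical values:

```python
# Hypothetical capability blob from the server and local support set.
streamreqs = set(b'revlogv1,generaldelta'.split(b','))
supported = {b'revlogv1', b'generaldelta', b'sparserevlog'}

# Bail out if the server's store needs a format we cannot read.
missingreqs = streamreqs - supported
can_stream = not missingreqs
assert can_stream
```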
128 128 def maybeperformlegacystreamclone(pullop):
129 129 """Possibly perform a legacy stream clone operation.
130 130
131 131 Legacy stream clones are performed as part of pull but before all other
132 132 operations.
133 133
134 134 A legacy stream clone will not be performed if a bundle2 stream clone is
135 135 supported.
136 136 """
137 137 from . import localrepo
138 138
139 139 supported, requirements = canperformstreamclone(pullop)
140 140
141 141 if not supported:
142 142 return
143 143
144 144 repo = pullop.repo
145 145 remote = pullop.remote
146 146
147 147 # Save remote branchmap. We will use it later to speed up branchcache
148 148 # creation.
149 149 rbranchmap = None
150 150 if remote.capable(b'branchmap'):
151 151 with remote.commandexecutor() as e:
152 152 rbranchmap = e.callcommand(b'branchmap', {}).result()
153 153
154 154 repo.ui.status(_(b'streaming all changes\n'))
155 155
156 156 with remote.commandexecutor() as e:
157 157 fp = e.callcommand(b'stream_out', {}).result()
158 158
159 159 # TODO strictly speaking, this code should all be inside the context
160 160 # manager because the context manager is supposed to ensure all wire state
161 161 # is flushed when exiting. But the legacy peers don't do this, so it
162 162 # doesn't matter.
163 163 l = fp.readline()
164 164 try:
165 165 resp = int(l)
166 166 except ValueError:
167 167 raise error.ResponseError(
168 168 _(b'unexpected response from remote server:'), l
169 169 )
170 170 if resp == 1:
171 171 raise error.Abort(_(b'operation forbidden by server'))
172 172 elif resp == 2:
173 173 raise error.Abort(_(b'locking the remote repository failed'))
174 174 elif resp != 0:
175 175 raise error.Abort(_(b'the server sent an unknown error code'))
176 176
177 177 l = fp.readline()
178 178 try:
179 179 filecount, bytecount = map(int, l.split(b' ', 1))
180 180 except (ValueError, TypeError):
181 181 raise error.ResponseError(
182 182 _(b'unexpected response from remote server:'), l
183 183 )
184 184
185 185 with repo.lock():
186 186 consumev1(repo, fp, filecount, bytecount)
187 187
188 188 # new requirements = old non-format requirements +
189 189 # new format-related remote requirements
190 190 # requirements from the streamed-in repository
191 191 repo.requirements = requirements | (
192 192 repo.requirements - repo.supportedformats
193 193 )
194 194 repo.svfs.options = localrepo.resolvestorevfsoptions(
195 195 repo.ui, repo.requirements, repo.features
196 196 )
197 197 scmutil.writereporequirements(repo)
198 198
199 199 if rbranchmap:
200 200 repo._branchcaches.replace(repo, rbranchmap)
201 201
202 202 repo.invalidate()
203 203
204 204
205 205 def allowservergeneration(repo):
206 206 """Whether streaming clones are allowed from the server."""
207 207 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
208 208 return False
209 209
210 210 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
211 211 return False
212 212
213 213 # The way stream clone works makes it impossible to hide secret changesets.
214 214 # So don't allow this by default.
215 215 secret = phases.hassecret(repo)
216 216 if secret:
217 217 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
218 218
219 219 return True
220 220
221 221
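`allowservergeneration` above gates everything on two config knobs. A server that wants to serve stream clones, including from repositories containing secret changesets, might carry something like this in its hgrc (illustrative values; both option names appear verbatim in the checks above):

```ini
[server]
# advertise and allow uncompressed/stream clones
uncompressed = true
# allow stream clones even when secret changesets exist
# (stream clone cannot hide them, hence off by default)
uncompressedallowsecret = true
```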
222 222 # This is its own function so extensions can override it.
223 223 def _walkstreamfiles(repo, matcher=None):
224 224 return repo.store.walk(matcher)
225 225
226 226
227 227 def generatev1(repo):
228 228 """Emit content for version 1 of a streaming clone.
229 229
230 230 This returns a 3-tuple of (file count, byte size, data iterator).
231 231
232 232 The data iterator consists of N entries for each file being transferred.
233 233 Each file entry starts as a line with the file name and integer size
234 234 delimited by a null byte.
235 235
236 236 The raw file data follows. Following the raw file data is the next file
237 237 entry, or EOF.
238 238
239 239 When used on the wire protocol, an additional line indicating protocol
240 240 success will be prepended to the stream. This function is not responsible
241 241 for adding it.
242 242
243 243 This function will obtain a repository lock to ensure a consistent view of
244 244 the store is captured. It therefore may raise LockError.
245 245 """
246 246 entries = []
247 247 total_bytes = 0
248 248 # Get consistent snapshot of repo, lock during scan.
249 249 with repo.lock():
250 250 repo.ui.debug(b'scanning\n')
251 251 for file_type, name, ename, size in _walkstreamfiles(repo):
252 252 if size:
253 253 entries.append((name, size))
254 254 total_bytes += size
255 255 _test_sync_point_walk_1(repo)
256 256 _test_sync_point_walk_2(repo)
257 257
258 258 repo.ui.debug(
259 259 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
260 260 )
261 261
262 262 svfs = repo.svfs
263 263 debugflag = repo.ui.debugflag
264 264
265 265 def emitrevlogdata():
266 266 for name, size in entries:
267 267 if debugflag:
268 268 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
269 269 # partially encode name over the wire for backwards compat
270 270 yield b'%s\0%d\n' % (store.encodedir(name), size)
271 271 # auditing at this stage is both pointless (paths are already
272 272 # trusted by the local repo) and expensive
273 273 with svfs(name, b'rb', auditpath=False) as fp:
274 274 if size <= 65536:
275 275 yield fp.read(size)
276 276 else:
277 277 for chunk in util.filechunkiter(fp, limit=size):
278 278 yield chunk
279 279
280 280 return len(entries), total_bytes, emitrevlogdata()
281 281
282 282
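The per-file framing that `generatev1` emits (`name`, NUL, decimal size, newline, then raw bytes) is simple enough to parse by hand. A minimal, standalone consumer sketch; `read_exactly` is a hypothetical helper, and real clients should use `consumev1` below, which also handles directory-name decoding and progress reporting:

```python
def read_exactly(fp, n):
    """Read exactly n bytes or fail (hypothetical helper)."""
    data = fp.read(n)
    if len(data) != n:
        raise EOFError('stream ended after %d of %d bytes' % (len(data), n))
    return data


def iter_v1_entries(fp, filecount):
    """Yield (name, data) for each file in a v1 stream (sketch only)."""
    for _ in range(filecount):
        # Header line: b"<name>\0<size>\n"; names cannot contain newlines.
        header = b''
        while not header.endswith(b'\n'):
            header += read_exactly(fp, 1)
        name, size = header.rstrip(b'\n').split(b'\0', 1)
        yield name, read_exactly(fp, int(size))
```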
283 283 def generatev1wireproto(repo):
284 284 """Emit content for version 1 of streaming clone suitable for the wire.
285 285
286 286 This is the data output from ``generatev1()`` with 2 header lines. The
287 287 first line indicates overall success. The 2nd contains the file count and
288 288 byte size of payload.
289 289
290 290 The success line contains "0" for success, "1" for stream generation not
291 291 allowed, and "2" for error locking the repository (possibly indicating
292 292 a permissions error for the server process).
293 293 """
294 294 if not allowservergeneration(repo):
295 295 yield b'1\n'
296 296 return
297 297
298 298 try:
299 299 filecount, bytecount, it = generatev1(repo)
300 300 except error.LockError:
301 301 yield b'2\n'
302 302 return
303 303
304 304 # Indicates successful response.
305 305 yield b'0\n'
306 306 yield b'%d %d\n' % (filecount, bytecount)
307 307 for chunk in it:
308 308 yield chunk
309 309
310 310
311 311 def generatebundlev1(repo, compression=b'UN'):
312 312 """Emit content for version 1 of a stream clone bundle.
313 313
314 314 The first 4 bytes of the output ("HGS1") denote this as stream clone
315 315 bundle version 1.
316 316
317 317 The next 2 bytes indicate the compression type. Only "UN" is currently
318 318 supported.
319 319
320 320 The next 16 bytes are two 64-bit big endian unsigned integers indicating
321 321 file count and byte count, respectively.
322 322
323 323 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
324 324 of the requirements string, including a trailing \0. The following N bytes
325 325 are the requirements string, which is ASCII containing a comma-delimited
326 326 list of repo requirements that are needed to support the data.
327 327
328 328 The remaining content is the output of ``generatev1()`` (which may be
329 329 compressed in the future).
330 330
331 331 Returns a tuple of (requirements, data generator).
332 332 """
333 333 if compression != b'UN':
334 334 raise ValueError(b'we do not support the compression argument yet')
335 335
336 336 requirements = repo.requirements & repo.supportedformats
337 337 requires = b','.join(sorted(requirements))
338 338
339 339 def gen():
340 340 yield b'HGS1'
341 341 yield compression
342 342
343 343 filecount, bytecount, it = generatev1(repo)
344 344 repo.ui.status(
345 345 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
346 346 )
347 347
348 348 yield struct.pack(b'>QQ', filecount, bytecount)
349 349 yield struct.pack(b'>H', len(requires) + 1)
350 350 yield requires + b'\0'
351 351
352 352 # This is where we'll add compression in the future.
353 353 assert compression == b'UN'
354 354
355 355 progress = repo.ui.makeprogress(
356 356 _(b'bundle'), total=bytecount, unit=_(b'bytes')
357 357 )
358 358 progress.update(0)
359 359
360 360 for chunk in it:
361 361 progress.increment(step=len(chunk))
362 362 yield chunk
363 363
364 364 progress.complete()
365 365
366 366 return requirements, gen()
367 367
368 368
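The fixed part of the bundle header described in the docstring packs cleanly with `struct`. A small write-then-reread sketch (illustrative only; the canonical parser is `readbundle1header` below):

```python
import io
import struct

def write_bundlev1_header(fp, filecount, bytecount, requirements):
    """Write an HGS1 header (sketch mirroring the docstring above)."""
    requires = b','.join(sorted(requirements)) + b'\0'
    fp.write(b'HGS1')                                    # 4-byte magic
    fp.write(b'UN')                                      # compression type
    fp.write(struct.pack(b'>QQ', filecount, bytecount))  # counts, big endian
    fp.write(struct.pack(b'>H', len(requires)))          # length incl. \0
    fp.write(requires)

buf = io.BytesIO()
write_bundlev1_header(buf, 3, 4096, {b'revlogv1'})
buf.seek(0)
assert buf.read(4) == b'HGS1' and buf.read(2) == b'UN'
filecount, bytecount = struct.unpack(b'>QQ', buf.read(16))
(requireslen,) = struct.unpack(b'>H', buf.read(2))
requirements = set(buf.read(requireslen).rstrip(b'\0').split(b','))
assert (filecount, bytecount, requirements) == (3, 4096, {b'revlogv1'})
```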
369 369 def consumev1(repo, fp, filecount, bytecount):
370 370 """Apply the contents from version 1 of a streaming clone file handle.
371 371
372 372 This takes the output from "stream_out" and applies it to the specified
373 373 repository.
374 374
375 375 Like "stream_out," the status line added by the wire protocol is not
376 376 handled by this function.
377 377 """
378 378 with repo.lock():
379 379 repo.ui.status(
380 380 _(b'%d files to transfer, %s of data\n')
381 381 % (filecount, util.bytecount(bytecount))
382 382 )
383 383 progress = repo.ui.makeprogress(
384 384 _(b'clone'), total=bytecount, unit=_(b'bytes')
385 385 )
386 386 progress.update(0)
387 387 start = util.timer()
388 388
389 389 # TODO: get rid of (potential) inconsistency
390 390 #
391 391 # If transaction is started and any @filecache property is
392 392 # changed at this point, it causes inconsistency between
393 393 # in-memory cached property and streamclone-ed file on the
394 394 # disk. Nested transaction prevents transaction scope "clone"
395 395 # below from writing in-memory changes out at the end of it,
396 396 # even though in-memory changes are discarded at the end of it
397 397 # regardless of transaction nesting.
398 398 #
399 399 # But transaction nesting can't be simply prohibited, because
400 400 # nesting occurs also in ordinary case (e.g. enabling
401 401 # clonebundles).
402 402
403 403 with repo.transaction(b'clone'):
404 404 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
405 405 for i in pycompat.xrange(filecount):
406 406 # XXX doesn't support '\n' or '\r' in filenames
407 407 l = fp.readline()
408 408 try:
409 409 name, size = l.split(b'\0', 1)
410 410 size = int(size)
411 411 except (ValueError, TypeError):
412 412 raise error.ResponseError(
413 413 _(b'unexpected response from remote server:'), l
414 414 )
415 415 if repo.ui.debugflag:
416 416 repo.ui.debug(
417 417 b'adding %s (%s)\n' % (name, util.bytecount(size))
418 418 )
419 419 # for backwards compat, name was partially encoded
420 420 path = store.decodedir(name)
421 421 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
422 422 for chunk in util.filechunkiter(fp, limit=size):
423 423 progress.increment(step=len(chunk))
424 424 ofp.write(chunk)
425 425
426 426 # force @filecache properties to be reloaded from
427 427 # streamclone-ed file at next access
428 428 repo.invalidate(clearfilecache=True)
429 429
430 430 elapsed = util.timer() - start
431 431 if elapsed <= 0:
432 432 elapsed = 0.001
433 433 progress.complete()
434 434 repo.ui.status(
435 435 _(b'transferred %s in %.1f seconds (%s/sec)\n')
436 436 % (
437 437 util.bytecount(bytecount),
438 438 elapsed,
439 439 util.bytecount(bytecount / elapsed),
440 440 )
441 441 )
442 442
443 443
444 444 def readbundle1header(fp):
445 445 compression = fp.read(2)
446 446 if compression != b'UN':
447 447 raise error.Abort(
448 448 _(
449 449 b'only uncompressed stream clone bundles are '
450 450 b'supported; got %s'
451 451 )
452 452 % compression
453 453 )
454 454
455 455 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
456 456 requireslen = struct.unpack(b'>H', fp.read(2))[0]
457 457 requires = fp.read(requireslen)
458 458
459 459 if not requires.endswith(b'\0'):
460 460 raise error.Abort(
461 461 _(
462 462 b'malformed stream clone bundle: '
463 463 b'requirements not properly encoded'
464 464 )
465 465 )
466 466
467 467 requirements = set(requires.rstrip(b'\0').split(b','))
468 468
469 469 return filecount, bytecount, requirements
470 470
471 471
472 472 def applybundlev1(repo, fp):
473 473 """Apply the content from a stream clone bundle version 1.
474 474
475 475 We assume the 4 byte header has been read and validated and the file handle
476 476 is at the 2 byte compression identifier.
477 477 """
478 478 if len(repo):
479 479 raise error.Abort(
480 480 _(b'cannot apply stream clone bundle on non-empty repo')
481 481 )
482 482
483 483 filecount, bytecount, requirements = readbundle1header(fp)
484 484 missingreqs = requirements - repo.supportedformats
485 485 if missingreqs:
486 486 raise error.Abort(
487 487 _(b'unable to apply stream clone: unsupported format: %s')
488 488 % b', '.join(sorted(missingreqs))
489 489 )
490 490
491 491 consumev1(repo, fp, filecount, bytecount)
492 492
493 493
494 494 class streamcloneapplier(object):
495 495 """Class to manage applying streaming clone bundles.
496 496
497 497 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
498 498 readers to perform bundle type-specific functionality.
499 499 """
500 500
501 501 def __init__(self, fh):
502 502 self._fh = fh
503 503
504 504 def apply(self, repo):
505 505 return applybundlev1(repo, self._fh)
506 506
507 507
508 508 # type of file to stream
509 509 _fileappend = 0 # append only file
510 510 _filefull = 1 # full snapshot file
511 511
512 512 # Source of the file
513 513 _srcstore = b's' # store (svfs)
514 514 _srccache = b'c' # cache (cache)
515 515
516 516 # This is its own function so extensions can override it.
517 517 def _walkstreamfullstorefiles(repo):
518 518 """list snapshot file from the store"""
519 519 fnames = []
520 520 if not repo.publishing():
521 521 fnames.append(b'phaseroots')
522 522 return fnames
523 523
524 524
525 525 def _filterfull(entry, copy, vfsmap):
526 526 """actually copy the snapshot files"""
527 527 src, name, ftype, data = entry
528 528 if ftype != _filefull:
529 529 return entry
530 530 return (src, name, ftype, copy(vfsmap[src].join(name)))
531 531
532 532
533 533 @contextlib.contextmanager
534 534 def maketempcopies():
535 535 """return a function to temporary copy file"""
536 536 files = []
537 537 try:
538 538
539 539 def copy(src):
540 540 fd, dst = pycompat.mkstemp()
541 541 os.close(fd)
542 542 files.append(dst)
543 543 util.copyfiles(src, dst, hardlink=True)
544 544 return dst
545 545
546 546 yield copy
547 547 finally:
548 548 for tmp in files:
549 549 util.tryunlink(tmp)
550 550
551 551
552 552 def _makemap(repo):
553 553 """make a (src -> vfs) map for the repo"""
554 554 vfsmap = {
555 555 _srcstore: repo.svfs,
556 556 _srccache: repo.cachevfs,
557 557 }
558 558 # we keep repo.vfs out of the map on purpose; there are too many
559 559 # dangers there (eg: .hg/hgrc)
560 560 assert repo.vfs not in vfsmap.values()
561 561
562 562 return vfsmap
563 563
564 564
565 565 def _emit2(repo, entries, totalfilesize):
566 566 """actually emit the stream bundle"""
567 567 vfsmap = _makemap(repo)
568 568 # we keep repo.vfs out of the map on purpose; there are too many
569 569 # dangers there (eg: .hg/hgrc),
570 570 #
571 571 # this assert is duplicated (from _makemap) as an author might think
572 572 # this is fine, while it is really not.
573 573 if repo.vfs in vfsmap.values():
574 574 raise error.ProgrammingError(
575 575 b'repo.vfs must not be added to vfsmap for security reasons'
576 576 )
577 577
578 578 progress = repo.ui.makeprogress(
579 579 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
580 580 )
581 581 progress.update(0)
582 582 with maketempcopies() as copy, progress:
583 583 # copy is delayed until we are in the try
584 584 entries = [_filterfull(e, copy, vfsmap) for e in entries]
585 585 yield None # this releases the lock on the repository
586 seen = 0
586 totalbytecount = 0
587 587
588 588 for src, name, ftype, data in entries:
589 589 vfs = vfsmap[src]
590 590 yield src
591 591 yield util.uvarintencode(len(name))
592 592 if ftype == _fileappend:
593 593 fp = vfs(name)
594 594 size = data
595 595 elif ftype == _filefull:
596 596 fp = open(data, b'rb')
597 597 size = util.fstat(fp).st_size
598 bytecount = 0
598 599 try:
599 600 yield util.uvarintencode(size)
600 601 yield name
601 602 if size <= 65536:
602 603 chunks = (fp.read(size),)
603 604 else:
604 605 chunks = util.filechunkiter(fp, limit=size)
605 606 for chunk in chunks:
606 seen += len(chunk)
607 progress.update(seen)
607 bytecount += len(chunk)
608 totalbytecount += len(chunk)
609 progress.update(totalbytecount)
608 610 yield chunk
611 if bytecount != size:
612 # Would most likely be caused by a race due to `hg strip` or
613 # a revlog split
614 raise error.Abort(
615 _(
616 b'clone could only read %d bytes from %s, but '
617 b'expected %d bytes'
618 )
619 % (bytecount, name, size)
620 )
609 621 finally:
610 622 fp.close()
611 623
612 624
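The `bytecount != size` guard added above is the substance of this commit: each file's size is advertised before its data, so if the file shrinks between the walk and the read (the comment cites `hg strip` and revlog splits), the server must abort rather than emit a stream whose framing no longer matches reality. The same pattern in isolation, with hypothetical names:

```python
def copy_exact(src, dst, expected_size, chunk_size=65536):
    """Copy exactly expected_size bytes or fail loudly (sketch).

    Mirrors the check in _emit2: a short read means the source changed
    after its size was advertised, so the stream would be corrupt.
    """
    copied = 0
    while copied < expected_size:
        chunk = src.read(min(chunk_size, expected_size - copied))
        if not chunk:  # EOF before the advertised size was reached
            break
        dst.write(chunk)
        copied += len(chunk)
    if copied != expected_size:
        raise RuntimeError(
            'expected %d bytes, only read %d' % (expected_size, copied)
        )
```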
613 625 def _test_sync_point_walk_1(repo):
614 626 """a function for synchronisation during tests"""
615 627
616 628
617 629 def _test_sync_point_walk_2(repo):
618 630 """a function for synchronisation during tests"""
619 631
620 632
621 633 def _v2_walk(repo, includes, excludes, includeobsmarkers):
622 634 """emit a seris of files information useful to clone a repo
623 635
624 636 return (entries, totalfilesize)
625 637
626 638 entries is a list of tuple (vfs-key, file-path, file-type, size)
627 639
628 640 - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
629 641 - `name`: file path of the file to copy (to be fed to the vfs)
630 642 - `file-type`: does this file need to be copied with the source lock held?
631 643 - `size`: the size of the file (or None)
632 644 """
633 645 assert repo._currentlock(repo._lockref) is not None
634 646 entries = []
635 647 totalfilesize = 0
636 648
637 649 matcher = None
638 650 if includes or excludes:
639 651 matcher = narrowspec.match(repo.root, includes, excludes)
640 652
641 653 for rl_type, name, ename, size in _walkstreamfiles(repo, matcher):
642 654 if size:
643 655 ft = _fileappend
644 656 if rl_type & store.FILEFLAGS_VOLATILE:
645 657 ft = _filefull
646 658 entries.append((_srcstore, name, ft, size))
647 659 totalfilesize += size
648 660 for name in _walkstreamfullstorefiles(repo):
649 661 if repo.svfs.exists(name):
650 662 totalfilesize += repo.svfs.lstat(name).st_size
651 663 entries.append((_srcstore, name, _filefull, None))
652 664 if includeobsmarkers and repo.svfs.exists(b'obsstore'):
653 665 totalfilesize += repo.svfs.lstat(b'obsstore').st_size
654 666 entries.append((_srcstore, b'obsstore', _filefull, None))
655 667 for name in cacheutil.cachetocopy(repo):
656 668 if repo.cachevfs.exists(name):
657 669 totalfilesize += repo.cachevfs.lstat(name).st_size
658 670 entries.append((_srccache, name, _filefull, None))
659 671 return entries, totalfilesize
660 672
661 673
662 674 def generatev2(repo, includes, excludes, includeobsmarkers):
663 675 """Emit content for version 2 of a streaming clone.
664 676
665 677 the data stream consists of the following entries:
666 678 1) A char representing the file destination (eg: store or cache)
667 679 2) A varint containing the length of the filename
668 680 3) A varint containing the length of file data
669 681 4) N bytes containing the filename (the internal, store-agnostic form)
670 682 5) N bytes containing the file data
671 683
672 684 Returns a 3-tuple of (file count, file size, data iterator).
673 685 """
674 686
675 687 with repo.lock():
676 688
677 689 repo.ui.debug(b'scanning\n')
678 690
679 691 entries, totalfilesize = _v2_walk(
680 692 repo,
681 693 includes=includes,
682 694 excludes=excludes,
683 695 includeobsmarkers=includeobsmarkers,
684 696 )
685 697
686 698 chunks = _emit2(repo, entries, totalfilesize)
687 699 first = next(chunks)
688 700 assert first is None
689 701 _test_sync_point_walk_1(repo)
690 702 _test_sync_point_walk_2(repo)
691 703
692 704 return len(entries), totalfilesize, chunks
693 705
694 706
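Concretely, each v2 entry is `src + uvarint(len(name)) + uvarint(len(data)) + name + data`. A sketch of framing one entry, with a minimal varint encoder (7 data bits per byte, high bit as continuation flag; assumed to match `util.uvarintencode`) standing in for the real helper:

```python
def uvarint(value):
    """Encode an unsigned int as a varint (assumed wire encoding)."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def v2_entry(src, name, data):
    """Frame one stream-v2 entry (illustrative sketch)."""
    return src + uvarint(len(name)) + uvarint(len(data)) + name + data

entry = v2_entry(b's', b'data/foo.i', b'\x00' * 300)
assert entry[:1] == b's' and entry[1:2] == uvarint(10)
```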
695 707 @contextlib.contextmanager
696 708 def nested(*ctxs):
697 709 this = ctxs[0]
698 710 rest = ctxs[1:]
699 711 with this:
700 712 if rest:
701 713 with nested(*rest):
702 714 yield
703 715 else:
704 716 yield
705 717
706 718
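`nested` recursively enters a variable number of context managers, much like `contextlib.ExitStack` on modern Python; `consumev2` below uses it to hold one `backgroundclosing` context per destination vfs. A hypothetical usage sketch:

```python
import io

buffers = [io.BytesIO(), io.BytesIO(), io.BytesIO()]
with nested(*buffers):
    pass  # all three are open inside the block
assert all(buf.closed for buf in buffers)  # and closed on exit
```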
707 719 def consumev2(repo, fp, filecount, filesize):
708 720 """Apply the contents from a version 2 streaming clone.
709 721
710 722 Data is read from an object that only needs to provide a ``read(size)``
711 723 method.
712 724 """
713 725 with repo.lock():
714 726 repo.ui.status(
715 727 _(b'%d files to transfer, %s of data\n')
716 728 % (filecount, util.bytecount(filesize))
717 729 )
718 730
719 731 start = util.timer()
720 732 progress = repo.ui.makeprogress(
721 733 _(b'clone'), total=filesize, unit=_(b'bytes')
722 734 )
723 735 progress.update(0)
724 736
725 737 vfsmap = _makemap(repo)
726 738 # we keep repo.vfs out of the map on purpose; there are too many
727 739 # dangers there (eg: .hg/hgrc),
728 740 #
729 741 # this assert is duplicated (from _makemap) as an author might think
730 742 # this is fine, while it is really not.
731 743 if repo.vfs in vfsmap.values():
732 744 raise error.ProgrammingError(
733 745 b'repo.vfs must not be added to vfsmap for security reasons'
734 746 )
735 747
736 748 with repo.transaction(b'clone'):
737 749 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
738 750 with nested(*ctxs):
739 751 for i in range(filecount):
740 752 src = util.readexactly(fp, 1)
741 753 vfs = vfsmap[src]
742 754 namelen = util.uvarintdecodestream(fp)
743 755 datalen = util.uvarintdecodestream(fp)
744 756
745 757 name = util.readexactly(fp, namelen)
746 758
747 759 if repo.ui.debugflag:
748 760 repo.ui.debug(
749 761 b'adding [%s] %s (%s)\n'
750 762 % (src, name, util.bytecount(datalen))
751 763 )
752 764
753 765 with vfs(name, b'w') as ofp:
754 766 for chunk in util.filechunkiter(fp, limit=datalen):
755 767 progress.increment(step=len(chunk))
756 768 ofp.write(chunk)
757 769
758 770 # force @filecache properties to be reloaded from
759 771 # streamclone-ed file at next access
760 772 repo.invalidate(clearfilecache=True)
761 773
762 774 elapsed = util.timer() - start
763 775 if elapsed <= 0:
764 776 elapsed = 0.001
765 777 repo.ui.status(
766 778 _(b'transferred %s in %.1f seconds (%s/sec)\n')
767 779 % (
768 780 util.bytecount(progress.pos),
769 781 elapsed,
770 782 util.bytecount(progress.pos / elapsed),
771 783 )
772 784 )
773 785 progress.complete()
774 786
775 787
776 788 def applybundlev2(repo, fp, filecount, filesize, requirements):
777 789 from . import localrepo
778 790
779 791 missingreqs = [r for r in requirements if r not in repo.supported]
780 792 if missingreqs:
781 793 raise error.Abort(
782 794 _(b'unable to apply stream clone: unsupported format: %s')
783 795 % b', '.join(sorted(missingreqs))
784 796 )
785 797
786 798 consumev2(repo, fp, filecount, filesize)
787 799
788 800 # new requirements = old non-format requirements +
789 801 # new format-related remote requirements
790 802 # requirements from the streamed-in repository
791 803 repo.requirements = set(requirements) | (
792 804 repo.requirements - repo.supportedformats
793 805 )
794 806 repo.svfs.options = localrepo.resolvestorevfsoptions(
795 807 repo.ui, repo.requirements, repo.features
796 808 )
797 809 scmutil.writereporequirements(repo)
798 810
799 811
800 812 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
801 813 hardlink = [True]
802 814
803 815 def copy_used():
804 816 hardlink[0] = False
805 817 progress.topic = _(b'copying')
806 818
807 819 for k, path, size in entries:
808 820 src_vfs = src_vfs_map[k]
809 821 dst_vfs = dst_vfs_map[k]
810 822 src_path = src_vfs.join(path)
811 823 dst_path = dst_vfs.join(path)
812 824 dirname = dst_vfs.dirname(path)
813 825 if not dst_vfs.exists(dirname):
814 826 dst_vfs.makedirs(dirname)
815 827 dst_vfs.register_file(path)
816 828 # XXX we could use the #nb_bytes argument.
817 829 util.copyfile(
818 830 src_path,
819 831 dst_path,
820 832 hardlink=hardlink[0],
821 833 no_hardlink_cb=copy_used,
822 834 check_fs_hardlink=False,
823 835 )
824 836 progress.increment()
825 837 return hardlink[0]
826 838
827 839
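`_copy_files` attempts hardlinks first and, via the `copy_used` callback, permanently downgrades to plain copies once a single hardlink fails (typically when crossing a filesystem boundary). The same sticky-fallback idea in isolation (hypothetical sketch, not the `util.copyfile` API):

```python
import os
import shutil

class LinkOrCopy(object):
    """Hardlink files, downgrading to plain copies after a failure."""

    def __init__(self):
        self.hardlink = True

    def __call__(self, src, dst):
        if self.hardlink:
            try:
                os.link(src, dst)
                return
            except OSError:
                # e.g. EXDEV across filesystems: copy from now on
                self.hardlink = False
        shutil.copyfile(src, dst)
```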
828 840 def local_copy(src_repo, dest_repo):
829 841 """copy all content from one local repository to another
830 842
831 843 This is useful for local clone"""
832 844 src_store_requirements = {
833 845 r
834 846 for r in src_repo.requirements
835 847 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
836 848 }
837 849 dest_store_requirements = {
838 850 r
839 851 for r in dest_repo.requirements
840 852 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
841 853 }
842 854 assert src_store_requirements == dest_store_requirements
843 855
844 856 with dest_repo.lock():
845 857 with src_repo.lock():
846 858
847 859 # bookmarks are not integrated into the streaming as they might use
848 860 # `repo.vfs`; there is too much sensitive data accessible
849 861 # through `repo.vfs` to expose it to streaming clone.
850 862 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
851 863 srcbookmarks = src_book_vfs.join(b'bookmarks')
852 864 bm_count = 0
853 865 if os.path.exists(srcbookmarks):
854 866 bm_count = 1
855 867
856 868 entries, totalfilesize = _v2_walk(
857 869 src_repo,
858 870 includes=None,
859 871 excludes=None,
860 872 includeobsmarkers=True,
861 873 )
862 874 src_vfs_map = _makemap(src_repo)
863 875 dest_vfs_map = _makemap(dest_repo)
864 876 progress = src_repo.ui.makeprogress(
865 877 topic=_(b'linking'),
866 878 total=len(entries) + bm_count,
867 879 unit=_(b'files'),
868 880 )
869 881 # copy files
870 882 #
871 883 # We could copy the full file while the source repository is locked
872 884 # and the other one without the lock. However, in the linking case,
873 885 # this would also require checks that nobody is appending any data
874 886 # to the files while we do the clone, so this is not done yet. We
875 887 # could do this blindly when copying files.
876 888 files = ((k, path, size) for k, path, ftype, size in entries)
877 889 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
878 890
879 891 # copy bookmarks over
880 892 if bm_count:
881 893 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
882 894 dstbookmarks = dst_book_vfs.join(b'bookmarks')
883 895 util.copyfile(srcbookmarks, dstbookmarks)
884 896 progress.complete()
885 897 if hardlink:
886 898 msg = b'linked %d files\n'
887 899 else:
888 900 msg = b'copied %d files\n'
889 901 src_repo.ui.debug(msg % (len(entries) + bm_count))
890 902
891 903 with dest_repo.transaction(b"localclone") as tr:
892 904 dest_repo.store.write(tr)
893 905
894 906 # clean up transaction files as they do not make sense here
895 907 undo_files = [(dest_repo.svfs, b'undo.backupfiles')]
896 908 undo_files.extend(dest_repo.undofiles())
897 909 for undovfs, undofile in undo_files:
898 910 try:
899 911 undovfs.unlink(undofile)
900 912 except OSError as e:
901 913 if e.errno != errno.ENOENT:
902 914 msg = _(b'error removing %s: %s\n')
903 915 path = undovfs.join(undofile)
904 916 e_msg = stringutil.forcebytestr(e)
905 917 msg %= (path, e_msg)
906 918 dest_repo.ui.warn(msg)