clone: also report the bookmark file as copied...
marmoute
r48241:d3702566 default
@@ -1,879 +1,887 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import contextlib
11 11 import errno
12 12 import os
13 13 import struct
14 14
15 15 from .i18n import _
16 16 from .pycompat import open
17 17 from .interfaces import repository
18 18 from . import (
19 19 bookmarks,
20 20 cacheutil,
21 21 error,
22 22 narrowspec,
23 23 phases,
24 24 pycompat,
25 25 requirements as requirementsmod,
26 26 scmutil,
27 27 store,
28 28 util,
29 29 )
30 30 from .utils import (
31 31 stringutil,
32 32 )
33 33
34 34
35 35 def canperformstreamclone(pullop, bundle2=False):
36 36 """Whether it is possible to perform a streaming clone as part of pull.
37 37
38 38 ``bundle2`` will cause the function to consider stream clone through
39 39 bundle2 and only through bundle2.
40 40
41 41 Returns a tuple of (supported, requirements). ``supported`` is True if
42 42 streaming clone is supported and False otherwise. ``requirements`` is
43 43 a set of repo requirements from the remote, or ``None`` if stream clone
44 44 isn't supported.
45 45 """
46 46 repo = pullop.repo
47 47 remote = pullop.remote
48 48
49 49 bundle2supported = False
50 50 if pullop.canusebundle2:
51 51 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
52 52 bundle2supported = True
53 53 # else
54 54 # Server doesn't support bundle2 stream clone or doesn't support
55 55 # the versions we support. Fall back and possibly allow legacy.
56 56
57 57 # Ensures legacy code path uses available bundle2.
58 58 if bundle2supported and not bundle2:
59 59 return False, None
60 60 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
61 61 elif bundle2 and not bundle2supported:
62 62 return False, None
63 63
64 64 # Streaming clone only works on empty repositories.
65 65 if len(repo):
66 66 return False, None
67 67
68 68 # Streaming clone only works if all data is being requested.
69 69 if pullop.heads:
70 70 return False, None
71 71
72 72 streamrequested = pullop.streamclonerequested
73 73
74 74 # If we don't have a preference, let the server decide for us. This
75 75 # likely only comes into play in LANs.
76 76 if streamrequested is None:
77 77 # The server can advertise whether to prefer streaming clone.
78 78 streamrequested = remote.capable(b'stream-preferred')
79 79
80 80 if not streamrequested:
81 81 return False, None
82 82
83 83 # In order for stream clone to work, the client has to support all the
84 84 # requirements advertised by the server.
85 85 #
86 86 # The server advertises its requirements via the "stream" and "streamreqs"
87 87 # capability. "stream" (a value-less capability) is advertised if and only
88 88 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
89 89 # is advertised and contains a comma-delimited list of requirements.
90 90 requirements = set()
91 91 if remote.capable(b'stream'):
92 92 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
93 93 else:
94 94 streamreqs = remote.capable(b'streamreqs')
95 95 # This is weird and shouldn't happen with modern servers.
96 96 if not streamreqs:
97 97 pullop.repo.ui.warn(
98 98 _(
99 99 b'warning: stream clone requested but server has them '
100 100 b'disabled\n'
101 101 )
102 102 )
103 103 return False, None
104 104
105 105 streamreqs = set(streamreqs.split(b','))
106 106 # Server requires something we don't support. Bail.
107 107 missingreqs = streamreqs - repo.supportedformats
108 108 if missingreqs:
109 109 pullop.repo.ui.warn(
110 110 _(
111 111 b'warning: stream clone requested but client is missing '
112 112 b'requirements: %s\n'
113 113 )
114 114 % b', '.join(sorted(missingreqs))
115 115 )
116 116 pullop.repo.ui.warn(
117 117 _(
118 118 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
119 119 b'for more information)\n'
120 120 )
121 121 )
122 122 return False, None
123 123 requirements = streamreqs
124 124
125 125 return True, requirements
126 126
127 127
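The `streamreqs` handling above is compact, so here is a minimal, self-contained sketch of the same negotiation logic; the capability blob and the set of client-supported requirements are invented for illustration and are not real Mercurial API calls.

# Hypothetical advertisement, mirroring the b'streamreqs' capability format.
server_caps = {b'streamreqs': b'revlogv1,generaldelta'}
client_supported = {b'revlogv1', b'generaldelta', b'sparserevlog'}

streamreqs = set(server_caps[b'streamreqs'].split(b','))
missing = streamreqs - client_supported
if missing:
    print('stream clone impossible, missing: %s'
          % b', '.join(sorted(missing)).decode())
else:
    print('stream clone possible, requirements: %s'
          % b', '.join(sorted(streamreqs)).decode())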
128 128 def maybeperformlegacystreamclone(pullop):
129 129 """Possibly perform a legacy stream clone operation.
130 130
131 131 Legacy stream clones are performed as part of pull but before all other
132 132 operations.
133 133
134 134 A legacy stream clone will not be performed if a bundle2 stream clone is
135 135 supported.
136 136 """
137 137 from . import localrepo
138 138
139 139 supported, requirements = canperformstreamclone(pullop)
140 140
141 141 if not supported:
142 142 return
143 143
144 144 repo = pullop.repo
145 145 remote = pullop.remote
146 146
147 147 # Save remote branchmap. We will use it later to speed up branchcache
148 148 # creation.
149 149 rbranchmap = None
150 150 if remote.capable(b'branchmap'):
151 151 with remote.commandexecutor() as e:
152 152 rbranchmap = e.callcommand(b'branchmap', {}).result()
153 153
154 154 repo.ui.status(_(b'streaming all changes\n'))
155 155
156 156 with remote.commandexecutor() as e:
157 157 fp = e.callcommand(b'stream_out', {}).result()
158 158
159 159 # TODO strictly speaking, this code should all be inside the context
160 160 # manager because the context manager is supposed to ensure all wire state
161 161 # is flushed when exiting. But the legacy peers don't do this, so it
162 162 # doesn't matter.
163 163 l = fp.readline()
164 164 try:
165 165 resp = int(l)
166 166 except ValueError:
167 167 raise error.ResponseError(
168 168 _(b'unexpected response from remote server:'), l
169 169 )
170 170 if resp == 1:
171 171 raise error.Abort(_(b'operation forbidden by server'))
172 172 elif resp == 2:
173 173 raise error.Abort(_(b'locking the remote repository failed'))
174 174 elif resp != 0:
175 175 raise error.Abort(_(b'the server sent an unknown error code'))
176 176
177 177 l = fp.readline()
178 178 try:
179 179 filecount, bytecount = map(int, l.split(b' ', 1))
180 180 except (ValueError, TypeError):
181 181 raise error.ResponseError(
182 182 _(b'unexpected response from remote server:'), l
183 183 )
184 184
185 185 with repo.lock():
186 186 consumev1(repo, fp, filecount, bytecount)
187 187
188 188 # new requirements = old non-format requirements +
189 189 # new format-related remote requirements
190 190 # requirements from the streamed-in repository
191 191 repo.requirements = requirements | (
192 192 repo.requirements - repo.supportedformats
193 193 )
194 194 repo.svfs.options = localrepo.resolvestorevfsoptions(
195 195 repo.ui, repo.requirements, repo.features
196 196 )
197 197 scmutil.writereporequirements(repo)
198 198
199 199 if rbranchmap:
200 200 repo._branchcaches.replace(repo, rbranchmap)
201 201
202 202 repo.invalidate()
203 203
204 204
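The requirement merge performed at the end of `maybeperformlegacystreamclone()` above ("old non-format requirements + format requirements from the streamed-in repository") is easy to get backwards, so here is a small worked example; all requirement names and sets are invented for illustration.

# Invented values for illustration only.
old_requirements = {b'dotencode', b'fncache', b'revlogv1'}
supportedformats = {b'revlogv1', b'generaldelta', b'sparserevlog'}
streamed_in_requirements = {b'revlogv1', b'generaldelta'}

# Keep local non-format requirements, take format requirements from remote.
new_requirements = streamed_in_requirements | (
    old_requirements - supportedformats
)
assert new_requirements == {
    b'dotencode', b'fncache', b'revlogv1', b'generaldelta'
}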
205 205 def allowservergeneration(repo):
206 206 """Whether streaming clones are allowed from the server."""
207 207 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
208 208 return False
209 209
210 210 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
211 211 return False
212 212
213 213 # The way stream clone works makes it impossible to hide secret changesets.
214 214 # So don't allow this by default.
215 215 secret = phases.hassecret(repo)
216 216 if secret:
217 217 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
218 218
219 219 return True
220 220
221 221
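The two `[server]` options consulted by `allowservergeneration()` above map directly onto user configuration. A sketch of the corresponding hgrc section (the values shown are illustrative choices, not statements about the defaults):

[server]
# serve stream clones to clients that request them
uncompressed = True
# refuse to stream when secret changesets are present
uncompressedallowsecret = False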
222 222 # This is its own function so extensions can override it.
223 223 def _walkstreamfiles(repo, matcher=None):
224 224 return repo.store.walk(matcher)
225 225
226 226
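Because `_walkstreamfiles()` is deliberately kept as a standalone function, an extension can substitute its own walker. A hedged sketch of such an extension using `extensions.wrapfunction`; the filtering predicate is invented purely for illustration.

from mercurial import extensions, streamclone

def _filteredwalk(orig, repo, matcher=None):
    # drop some entries from the stream; the predicate is made up
    for file_type, name, ename, size in orig(repo, matcher):
        if not name.endswith(b'.tmp'):
            yield file_type, name, ename, size

def extsetup(ui):
    extensions.wrapfunction(streamclone, '_walkstreamfiles', _filteredwalk)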
227 227 def generatev1(repo):
228 228 """Emit content for version 1 of a streaming clone.
229 229
230 230 This returns a 3-tuple of (file count, byte size, data iterator).
231 231
232 232 The data iterator consists of N entries for each file being transferred.
233 233 Each file entry starts as a line with the file name and integer size
234 234 delimited by a null byte.
235 235
236 236 The raw file data follows. Following the raw file data is the next file
237 237 entry, or EOF.
238 238
239 239 When used on the wire protocol, an additional line indicating protocol
240 240 success will be prepended to the stream. This function is not responsible
241 241 for adding it.
242 242
243 243 This function will obtain a repository lock to ensure a consistent view of
244 244 the store is captured. It therefore may raise LockError.
245 245 """
246 246 entries = []
247 247 total_bytes = 0
248 248 # Get consistent snapshot of repo, lock during scan.
249 249 with repo.lock():
250 250 repo.ui.debug(b'scanning\n')
251 251 for file_type, name, ename, size in _walkstreamfiles(repo):
252 252 if size:
253 253 entries.append((name, size))
254 254 total_bytes += size
255 255 _test_sync_point_walk_1(repo)
256 256 _test_sync_point_walk_2(repo)
257 257
258 258 repo.ui.debug(
259 259 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
260 260 )
261 261
262 262 svfs = repo.svfs
263 263 debugflag = repo.ui.debugflag
264 264
265 265 def emitrevlogdata():
266 266 for name, size in entries:
267 267 if debugflag:
268 268 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
269 269 # partially encode name over the wire for backwards compat
270 270 yield b'%s\0%d\n' % (store.encodedir(name), size)
271 271 # auditing at this stage is both pointless (paths are already
272 272 # trusted by the local repo) and expensive
273 273 with svfs(name, b'rb', auditpath=False) as fp:
274 274 if size <= 65536:
275 275 yield fp.read(size)
276 276 else:
277 277 for chunk in util.filechunkiter(fp, limit=size):
278 278 yield chunk
279 279
280 280 return len(entries), total_bytes, emitrevlogdata()
281 281
282 282
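The v1 entry framing described in the docstring above (a "name\0size\n" line followed by raw file data) can be parsed in a few lines. A self-contained reader sketch that mirrors, but is not, the real `consumev1()`; it skips the `store.decodedir` step and uses an invented file name.

import io

def read_v1_entries(fp, filecount):
    # yield (name, data) pairs from a v1 entry stream
    for _i in range(filecount):
        header = fp.readline()          # b'<name>\0<size>\n'
        name, size = header.split(b'\0', 1)
        yield name, fp.read(int(size))

# One 3-byte file in an in-memory stream.
stream = io.BytesIO(b'data/foo.i\x003\nabc')
assert list(read_v1_entries(stream, 1)) == [(b'data/foo.i', b'abc')]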
283 283 def generatev1wireproto(repo):
284 284 """Emit content for version 1 of streaming clone suitable for the wire.
285 285
286 286 This is the data output from ``generatev1()`` with 2 header lines. The
287 287 first line indicates overall success. The 2nd contains the file count and
288 288 byte size of payload.
289 289
290 290 The success line contains "0" for success, "1" for stream generation not
291 291 allowed, and "2" for error locking the repository (possibly indicating
292 292 a permissions error for the server process).
293 293 """
294 294 if not allowservergeneration(repo):
295 295 yield b'1\n'
296 296 return
297 297
298 298 try:
299 299 filecount, bytecount, it = generatev1(repo)
300 300 except error.LockError:
301 301 yield b'2\n'
302 302 return
303 303
304 304 # Indicates successful response.
305 305 yield b'0\n'
306 306 yield b'%d %d\n' % (filecount, bytecount)
307 307 for chunk in it:
308 308 yield chunk
309 309
310 310
311 311 def generatebundlev1(repo, compression=b'UN'):
312 312 """Emit content for version 1 of a stream clone bundle.
313 313
314 314 The first 4 bytes of the output ("HGS1") denote this as stream clone
315 315 bundle version 1.
316 316
317 317 The next 2 bytes indicate the compression type. Only "UN" is currently
318 318 supported.
319 319
320 320 The next 16 bytes are two 64-bit big endian unsigned integers indicating
321 321 file count and byte count, respectively.
322 322
323 323 The next 2 bytes are a 16-bit big endian unsigned short declaring the length
324 324 of the requirements string, including a trailing \0. The following N bytes
325 325 are the requirements string, which is ASCII containing a comma-delimited
326 326 list of repo requirements that are needed to support the data.
327 327
328 328 The remaining content is the output of ``generatev1()`` (which may be
329 329 compressed in the future).
330 330
331 331 Returns a tuple of (requirements, data generator).
332 332 """
333 333 if compression != b'UN':
334 334 raise ValueError(b'we do not support the compression argument yet')
335 335
336 336 requirements = repo.requirements & repo.supportedformats
337 337 requires = b','.join(sorted(requirements))
338 338
339 339 def gen():
340 340 yield b'HGS1'
341 341 yield compression
342 342
343 343 filecount, bytecount, it = generatev1(repo)
344 344 repo.ui.status(
345 345 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
346 346 )
347 347
348 348 yield struct.pack(b'>QQ', filecount, bytecount)
349 349 yield struct.pack(b'>H', len(requires) + 1)
350 350 yield requires + b'\0'
351 351
352 352 # This is where we'll add compression in the future.
353 353 assert compression == b'UN'
354 354
355 355 progress = repo.ui.makeprogress(
356 356 _(b'bundle'), total=bytecount, unit=_(b'bytes')
357 357 )
358 358 progress.update(0)
359 359
360 360 for chunk in it:
361 361 progress.increment(step=len(chunk))
362 362 yield chunk
363 363
364 364 progress.complete()
365 365
366 366 return requirements, gen()
367 367
368 368
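The HGS1 container header documented above can be assembled and re-parsed with `struct` directly. A self-contained round-trip sketch; the file count, byte count, and requirements string are invented for the example.

import io
import struct

requires = b'revlogv1,generaldelta'
header = (
    b'HGS1'                                  # magic: stream clone bundle v1
    + b'UN'                                  # compression type (uncompressed)
    + struct.pack(b'>QQ', 2, 4096)           # file count, byte count
    + struct.pack(b'>H', len(requires) + 1)  # requirements length, incl. \0
    + requires
    + b'\0'
)

fp = io.BytesIO(header)
assert fp.read(4) == b'HGS1' and fp.read(2) == b'UN'
filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
reqlen = struct.unpack(b'>H', fp.read(2))[0]
reqs = set(fp.read(reqlen).rstrip(b'\0').split(b','))
assert (filecount, bytecount) == (2, 4096)
assert reqs == {b'revlogv1', b'generaldelta'}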
369 369 def consumev1(repo, fp, filecount, bytecount):
370 370 """Apply the contents from version 1 of a streaming clone file handle.
371 371
372 372 This takes the output from "stream_out" and applies it to the specified
373 373 repository.
374 374
375 375 Like "stream_out," the status line added by the wire protocol is not
376 376 handled by this function.
377 377 """
378 378 with repo.lock():
379 379 repo.ui.status(
380 380 _(b'%d files to transfer, %s of data\n')
381 381 % (filecount, util.bytecount(bytecount))
382 382 )
383 383 progress = repo.ui.makeprogress(
384 384 _(b'clone'), total=bytecount, unit=_(b'bytes')
385 385 )
386 386 progress.update(0)
387 387 start = util.timer()
388 388
389 389 # TODO: get rid of (potential) inconsistency
390 390 #
391 391 # If transaction is started and any @filecache property is
392 392 # changed at this point, it causes inconsistency between
393 393 # in-memory cached property and streamclone-ed file on the
394 394 # disk. Nested transaction prevents transaction scope "clone"
395 395 # below from writing in-memory changes out at the end of it,
396 396 # even though in-memory changes are discarded at the end of it
397 397 # regardless of transaction nesting.
398 398 #
399 399 # But transaction nesting can't be simply prohibited, because
400 400 # nesting occurs also in ordinary case (e.g. enabling
401 401 # clonebundles).
402 402
403 403 with repo.transaction(b'clone'):
404 404 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
405 405 for i in pycompat.xrange(filecount):
406 406 # XXX doesn't support '\n' or '\r' in filenames
407 407 l = fp.readline()
408 408 try:
409 409 name, size = l.split(b'\0', 1)
410 410 size = int(size)
411 411 except (ValueError, TypeError):
412 412 raise error.ResponseError(
413 413 _(b'unexpected response from remote server:'), l
414 414 )
415 415 if repo.ui.debugflag:
416 416 repo.ui.debug(
417 417 b'adding %s (%s)\n' % (name, util.bytecount(size))
418 418 )
419 419 # for backwards compat, name was partially encoded
420 420 path = store.decodedir(name)
421 421 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
422 422 for chunk in util.filechunkiter(fp, limit=size):
423 423 progress.increment(step=len(chunk))
424 424 ofp.write(chunk)
425 425
426 426 # force @filecache properties to be reloaded from
427 427 # streamclone-ed file at next access
428 428 repo.invalidate(clearfilecache=True)
429 429
430 430 elapsed = util.timer() - start
431 431 if elapsed <= 0:
432 432 elapsed = 0.001
433 433 progress.complete()
434 434 repo.ui.status(
435 435 _(b'transferred %s in %.1f seconds (%s/sec)\n')
436 436 % (
437 437 util.bytecount(bytecount),
438 438 elapsed,
439 439 util.bytecount(bytecount / elapsed),
440 440 )
441 441 )
442 442
443 443
444 444 def readbundle1header(fp):
445 445 compression = fp.read(2)
446 446 if compression != b'UN':
447 447 raise error.Abort(
448 448 _(
449 449 b'only uncompressed stream clone bundles are '
450 450 b'supported; got %s'
451 451 )
452 452 % compression
453 453 )
454 454
455 455 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
456 456 requireslen = struct.unpack(b'>H', fp.read(2))[0]
457 457 requires = fp.read(requireslen)
458 458
459 459 if not requires.endswith(b'\0'):
460 460 raise error.Abort(
461 461 _(
462 462 b'malformed stream clone bundle: '
463 463 b'requirements not properly encoded'
464 464 )
465 465 )
466 466
467 467 requirements = set(requires.rstrip(b'\0').split(b','))
468 468
469 469 return filecount, bytecount, requirements
470 470
471 471
472 472 def applybundlev1(repo, fp):
473 473 """Apply the content from a stream clone bundle version 1.
474 474
475 475 We assume the 4 byte header has been read and validated and the file handle
476 476 is at the 2 byte compression identifier.
477 477 """
478 478 if len(repo):
479 479 raise error.Abort(
480 480 _(b'cannot apply stream clone bundle on non-empty repo')
481 481 )
482 482
483 483 filecount, bytecount, requirements = readbundle1header(fp)
484 484 missingreqs = requirements - repo.supportedformats
485 485 if missingreqs:
486 486 raise error.Abort(
487 487 _(b'unable to apply stream clone: unsupported format: %s')
488 488 % b', '.join(sorted(missingreqs))
489 489 )
490 490
491 491 consumev1(repo, fp, filecount, bytecount)
492 492
493 493
494 494 class streamcloneapplier(object):
495 495 """Class to manage applying streaming clone bundles.
496 496
497 497 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
498 498 readers to perform bundle type-specific functionality.
499 499 """
500 500
501 501 def __init__(self, fh):
502 502 self._fh = fh
503 503
504 504 def apply(self, repo):
505 505 return applybundlev1(repo, self._fh)
506 506
507 507
508 508 # type of file to stream
509 509 _fileappend = 0 # append only file
510 510 _filefull = 1 # full snapshot file
511 511
512 512 # Source of the file
513 513 _srcstore = b's' # store (svfs)
514 514 _srccache = b'c' # cache (cache)
515 515
516 516 # This is its own function so extensions can override it.
517 517 def _walkstreamfullstorefiles(repo):
518 518 """list snapshot file from the store"""
519 519 fnames = []
520 520 if not repo.publishing():
521 521 fnames.append(b'phaseroots')
522 522 return fnames
523 523
524 524
525 525 def _filterfull(entry, copy, vfsmap):
526 526 """actually copy the snapshot files"""
527 527 src, name, ftype, data = entry
528 528 if ftype != _filefull:
529 529 return entry
530 530 return (src, name, ftype, copy(vfsmap[src].join(name)))
531 531
532 532
533 533 @contextlib.contextmanager
534 534 def maketempcopies():
535 535 """return a function to temporary copy file"""
536 536 files = []
537 537 try:
538 538
539 539 def copy(src):
540 540 fd, dst = pycompat.mkstemp()
541 541 os.close(fd)
542 542 files.append(dst)
543 543 util.copyfiles(src, dst, hardlink=True)
544 544 return dst
545 545
546 546 yield copy
547 547 finally:
548 548 for tmp in files:
549 549 util.tryunlink(tmp)
550 550
551 551
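The `copy` callable yielded by `maketempcopies()` produces snapshots that, by design, do not outlive the `with` block. A usage sketch, assuming this module is importable as `mercurial.streamclone`; the source file is created on the fly only to keep the example self-contained.

import os
import tempfile

fd, src = tempfile.mkstemp()
os.write(fd, b'payload')
os.close(fd)
src = os.fsencode(src)                  # the module works with bytes paths

with maketempcopies() as copy:
    dup = copy(src)                     # hardlink-or-copy snapshot
    with open(dup, 'rb') as fp:
        assert fp.read() == b'payload'
assert not os.path.exists(dup)          # snapshots are unlinked on exit
os.unlink(src)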
552 552 def _makemap(repo):
553 553 """make a (src -> vfs) map for the repo"""
554 554 vfsmap = {
555 555 _srcstore: repo.svfs,
556 556 _srccache: repo.cachevfs,
557 557 }
558 558 # we keep repo.vfs out of the map on purpose, there are too many dangers
559 559 # there (eg: .hg/hgrc)
560 560 assert repo.vfs not in vfsmap.values()
561 561
562 562 return vfsmap
563 563
564 564
565 565 def _emit2(repo, entries, totalfilesize):
566 566 """actually emit the stream bundle"""
567 567 vfsmap = _makemap(repo)
568 568 progress = repo.ui.makeprogress(
569 569 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
570 570 )
571 571 progress.update(0)
572 572 with maketempcopies() as copy, progress:
573 573 # copy is delayed until we are in the try
574 574 entries = [_filterfull(e, copy, vfsmap) for e in entries]
575 575 yield None # this releases the lock on the repository
576 576 seen = 0
577 577
578 578 for src, name, ftype, data in entries:
579 579 vfs = vfsmap[src]
580 580 yield src
581 581 yield util.uvarintencode(len(name))
582 582 if ftype == _fileappend:
583 583 fp = vfs(name)
584 584 size = data
585 585 elif ftype == _filefull:
586 586 fp = open(data, b'rb')
587 587 size = util.fstat(fp).st_size
588 588 try:
589 589 yield util.uvarintencode(size)
590 590 yield name
591 591 if size <= 65536:
592 592 chunks = (fp.read(size),)
593 593 else:
594 594 chunks = util.filechunkiter(fp, limit=size)
595 595 for chunk in chunks:
596 596 seen += len(chunk)
597 597 progress.update(seen)
598 598 yield chunk
599 599 finally:
600 600 fp.close()
601 601
602 602
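Each frame `_emit2()` yields per file is "source byte, uvarint(name length), uvarint(data length), name, data". A round-trip sketch built on the same `util` helpers this module already uses; the file name and payload are invented.

import io

from mercurial import util

name, payload = b'data/foo.i', b'abc'
frame = (
    b's'                                 # source: store (_srcstore)
    + util.uvarintencode(len(name))
    + util.uvarintencode(len(payload))
    + name
    + payload
)

fp = io.BytesIO(frame)
assert util.readexactly(fp, 1) == b's'
namelen = util.uvarintdecodestream(fp)
datalen = util.uvarintdecodestream(fp)
assert util.readexactly(fp, namelen) == name
assert util.readexactly(fp, datalen) == payload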
603 603 def _test_sync_point_walk_1(repo):
604 604 """a function for synchronisation during tests"""
605 605
606 606
607 607 def _test_sync_point_walk_2(repo):
608 608 """a function for synchronisation during tests"""
609 609
610 610
611 611 def _v2_walk(repo, includes, excludes, includeobsmarkers):
612 612 """emit a seris of files information useful to clone a repo
613 613
614 614 return (entries, totalfilesize)
615 615
616 616 entries is a list of tuple (vfs-key, file-path, file-type, size)
617 617
618 618 - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
619 619 - `name`: file path of the file to copy (to be feed to the vfss)
620 620 - `file-type`: do this file need to be copied with the source lock ?
621 621 - `size`: the size of the file (or None)
622 622 """
623 623 assert repo._currentlock(repo._lockref) is not None
624 624 entries = []
625 625 totalfilesize = 0
626 626
627 627 matcher = None
628 628 if includes or excludes:
629 629 matcher = narrowspec.match(repo.root, includes, excludes)
630 630
631 631 for rl_type, name, ename, size in _walkstreamfiles(repo, matcher):
632 632 if size:
633 633 ft = _fileappend
634 634 if rl_type & store.FILEFLAGS_VOLATILE:
635 635 ft = _filefull
636 636 entries.append((_srcstore, name, ft, size))
637 637 totalfilesize += size
638 638 for name in _walkstreamfullstorefiles(repo):
639 639 if repo.svfs.exists(name):
640 640 totalfilesize += repo.svfs.lstat(name).st_size
641 641 entries.append((_srcstore, name, _filefull, None))
642 642 if includeobsmarkers and repo.svfs.exists(b'obsstore'):
643 643 totalfilesize += repo.svfs.lstat(b'obsstore').st_size
644 644 entries.append((_srcstore, b'obsstore', _filefull, None))
645 645 for name in cacheutil.cachetocopy(repo):
646 646 if repo.cachevfs.exists(name):
647 647 totalfilesize += repo.cachevfs.lstat(name).st_size
648 648 entries.append((_srccache, name, _filefull, None))
649 649 return entries, totalfilesize
650 650
651 651
652 652 def generatev2(repo, includes, excludes, includeobsmarkers):
653 653 """Emit content for version 2 of a streaming clone.
654 654
655 655 the data stream consists of the following entries:
656 656 1) A char representing the file destination (eg: store or cache)
657 657 2) A varint containing the length of the filename
658 658 3) A varint containing the length of file data
659 659 4) N bytes containing the filename (the internal, store-agnostic form)
660 660 5) N bytes containing the file data
661 661
662 662 Returns a 3-tuple of (file count, file size, data iterator).
663 663 """
664 664
665 665 with repo.lock():
666 666
667 667 repo.ui.debug(b'scanning\n')
668 668
669 669 entries, totalfilesize = _v2_walk(
670 670 repo,
671 671 includes=includes,
672 672 excludes=excludes,
673 673 includeobsmarkers=includeobsmarkers,
674 674 )
675 675
676 676 chunks = _emit2(repo, entries, totalfilesize)
677 677 first = next(chunks)
678 678 assert first is None
679 679 _test_sync_point_walk_1(repo)
680 680 _test_sync_point_walk_2(repo)
681 681
682 682 return len(entries), totalfilesize, chunks
683 683
684 684
685 685 @contextlib.contextmanager
686 686 def nested(*ctxs):
687 687 this = ctxs[0]
688 688 rest = ctxs[1:]
689 689 with this:
690 690 if rest:
691 691 with nested(*rest):
692 692 yield
693 693 else:
694 694 yield
695 695
696 696
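The recursive `nested()` helper above reimplements the old `contextlib.nested` that Python 3 removed; on modern Python, `contextlib.ExitStack` gives the same enter/exit ordering. A small equivalence sketch with throwaway context managers:

import contextlib

@contextlib.contextmanager
def tag(log, name):
    log.append('enter ' + name)
    yield
    log.append('exit ' + name)

log_a, log_b = [], []
with nested(tag(log_a, 'x'), tag(log_a, 'y')):
    pass
with contextlib.ExitStack() as stack:
    for cm in (tag(log_b, 'x'), tag(log_b, 'y')):
        stack.enter_context(cm)
assert log_a == log_b == ['enter x', 'enter y', 'exit y', 'exit x']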
697 697 def consumev2(repo, fp, filecount, filesize):
698 698 """Apply the contents from a version 2 streaming clone.
699 699
700 700 Data is read from an object that only needs to provide a ``read(size)``
701 701 method.
702 702 """
703 703 with repo.lock():
704 704 repo.ui.status(
705 705 _(b'%d files to transfer, %s of data\n')
706 706 % (filecount, util.bytecount(filesize))
707 707 )
708 708
709 709 start = util.timer()
710 710 progress = repo.ui.makeprogress(
711 711 _(b'clone'), total=filesize, unit=_(b'bytes')
712 712 )
713 713 progress.update(0)
714 714
715 715 vfsmap = _makemap(repo)
716 716
717 717 with repo.transaction(b'clone'):
718 718 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
719 719 with nested(*ctxs):
720 720 for i in range(filecount):
721 721 src = util.readexactly(fp, 1)
722 722 vfs = vfsmap[src]
723 723 namelen = util.uvarintdecodestream(fp)
724 724 datalen = util.uvarintdecodestream(fp)
725 725
726 726 name = util.readexactly(fp, namelen)
727 727
728 728 if repo.ui.debugflag:
729 729 repo.ui.debug(
730 730 b'adding [%s] %s (%s)\n'
731 731 % (src, name, util.bytecount(datalen))
732 732 )
733 733
734 734 with vfs(name, b'w') as ofp:
735 735 for chunk in util.filechunkiter(fp, limit=datalen):
736 736 progress.increment(step=len(chunk))
737 737 ofp.write(chunk)
738 738
739 739 # force @filecache properties to be reloaded from
740 740 # streamclone-ed file at next access
741 741 repo.invalidate(clearfilecache=True)
742 742
743 743 elapsed = util.timer() - start
744 744 if elapsed <= 0:
745 745 elapsed = 0.001
746 746 repo.ui.status(
747 747 _(b'transferred %s in %.1f seconds (%s/sec)\n')
748 748 % (
749 749 util.bytecount(progress.pos),
750 750 elapsed,
751 751 util.bytecount(progress.pos / elapsed),
752 752 )
753 753 )
754 754 progress.complete()
755 755
756 756
757 757 def applybundlev2(repo, fp, filecount, filesize, requirements):
758 758 from . import localrepo
759 759
760 760 missingreqs = [r for r in requirements if r not in repo.supported]
761 761 if missingreqs:
762 762 raise error.Abort(
763 763 _(b'unable to apply stream clone: unsupported format: %s')
764 764 % b', '.join(sorted(missingreqs))
765 765 )
766 766
767 767 consumev2(repo, fp, filecount, filesize)
768 768
769 769 # new requirements = old non-format requirements +
770 770 # new format-related remote requirements
771 771 # requirements from the streamed-in repository
772 772 repo.requirements = set(requirements) | (
773 773 repo.requirements - repo.supportedformats
774 774 )
775 775 repo.svfs.options = localrepo.resolvestorevfsoptions(
776 776 repo.ui, repo.requirements, repo.features
777 777 )
778 778 scmutil.writereporequirements(repo)
779 779
780 780
781 781 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
782 782 hardlink = [True]
783 783
784 784 def copy_used():
785 785 hardlink[0] = False
786 786 progress.topic = _(b'copying')
787 787
788 788 for k, path, size in entries:
789 789 src_vfs = src_vfs_map[k]
790 790 dst_vfs = dst_vfs_map[k]
791 791 src_path = src_vfs.join(path)
792 792 dst_path = dst_vfs.join(path)
793 793 dirname = dst_vfs.dirname(path)
794 794 if not dst_vfs.exists(dirname):
795 795 dst_vfs.makedirs(dirname)
796 796 dst_vfs.register_file(path)
797 797 # XXX we could use the #nb_bytes argument.
798 798 util.copyfile(
799 799 src_path,
800 800 dst_path,
801 801 hardlink=hardlink[0],
802 802 no_hardlink_cb=copy_used,
803 803 check_fs_hardlink=False,
804 804 )
805 805 progress.increment()
806 806 return hardlink[0]
807 807
808 808
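The one-element list `hardlink = [True]` in `_copy_files()` above is a Python 2-compatible stand-in for `nonlocal`: it gives the `copy_used` callback a mutable cell shared with the enclosing scope. The same pattern in isolation, with an invented driver loop:

def run_with_fallback(steps):
    fast = [True]               # mutable cell shared with the callback

    def fall_back():            # equivalent to 'nonlocal fast' in Python 3
        fast[0] = False

    for ok in steps:
        if not ok:
            fall_back()         # e.g. util.copyfile failing to hardlink
    return fast[0]

assert run_with_fallback([True, True]) is True
assert run_with_fallback([True, False]) is False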
809 809 def local_copy(src_repo, dest_repo):
810 810 """copy all content from one local repository to another
811 811
812 812 This is useful for local clone"""
813 813 src_store_requirements = {
814 814 r
815 815 for r in src_repo.requirements
816 816 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
817 817 }
818 818 dest_store_requirements = {
819 819 r
820 820 for r in dest_repo.requirements
821 821 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
822 822 }
823 823 assert src_store_requirements == dest_store_requirements
824 824
825 825 with dest_repo.lock():
826 826 with src_repo.lock():
827
828 # bookmarks are not integrated into the streaming as they might use
829 # `repo.vfs`, and too much sensitive data is accessible through
830 # `repo.vfs` to expose it to streaming clone.
831 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
832 srcbookmarks = src_book_vfs.join(b'bookmarks')
833 bm_count = 0
834 if os.path.exists(srcbookmarks):
835 bm_count = 1
836
827 837 entries, totalfilesize = _v2_walk(
828 838 src_repo,
829 839 includes=None,
830 840 excludes=None,
831 841 includeobsmarkers=True,
832 842 )
833 843 src_vfs_map = _makemap(src_repo)
834 844 dest_vfs_map = _makemap(dest_repo)
835 845 progress = src_repo.ui.makeprogress(
836 846 topic=_(b'linking'),
837 total=len(entries),
847 total=len(entries) + bm_count,
838 848 unit=_(b'files'),
839 849 )
840 850 # copy files
841 851 #
842 852 # We could copy the full file while the source repository is locked
843 853 # and the other one without the lock. However, in the linking case,
844 854 # this would also require checks that nobody is appending any data
845 855 # to the files while we do the clone, so this is not done yet. We
846 856 # could do this blindly when copying files.
847 857 files = ((k, path, size) for k, path, ftype, size in entries)
848 858 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
849 859
850 860 # copy bookmarks over
851 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
852 srcbookmarks = src_book_vfs.join(b'bookmarks')
853 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
854 dstbookmarks = dst_book_vfs.join(b'bookmarks')
855 if os.path.exists(srcbookmarks):
861 if bm_count:
862 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
863 dstbookmarks = dst_book_vfs.join(b'bookmarks')
856 864 util.copyfile(srcbookmarks, dstbookmarks)
857 865 progress.complete()
858 866 if hardlink:
859 867 msg = b'linked %d files\n'
860 868 else:
861 869 msg = b'copied %d files\n'
862 src_repo.ui.debug(msg % len(entries))
870 src_repo.ui.debug(msg % (len(entries) + bm_count))
863 871
864 872 with dest_repo.transaction(b"localclone") as tr:
865 873 dest_repo.store.write(tr)
866 874
867 875 # clean up transaction files as they do not make sense
868 876 undo_files = [(dest_repo.svfs, b'undo.backupfiles')]
869 877 undo_files.extend(dest_repo.undofiles())
870 878 for undovfs, undofile in undo_files:
871 879 try:
872 880 undovfs.unlink(undofile)
873 881 except OSError as e:
874 882 if e.errno != errno.ENOENT:
875 883 msg = _(b'error removing %s: %s\n')
876 884 path = undovfs.join(undofile)
877 885 e_msg = stringutil.forcebytestr(e)
878 886 msg %= (path, e_msg)
879 887 dest_repo.ui.warn(msg)