stream: double check that self.vfs is *not* in the vfsmap...
marmoute
r48353:65c51966 stable
@@ -1,750 +1,769 @@
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import contextlib
11 11 import os
12 12 import struct
13 13
14 14 from .i18n import _
15 15 from .pycompat import open
16 16 from .interfaces import repository
17 17 from . import (
18 18 cacheutil,
19 19 error,
20 20 narrowspec,
21 21 phases,
22 22 pycompat,
23 23 requirements as requirementsmod,
24 24 scmutil,
25 25 store,
26 26 util,
27 27 )
28 28
29 29
30 30 def canperformstreamclone(pullop, bundle2=False):
31 31 """Whether it is possible to perform a streaming clone as part of pull.
32 32
33 33 ``bundle2`` will cause the function to consider stream clone through
34 34 bundle2 and only through bundle2.
35 35
36 36 Returns a tuple of (supported, requirements). ``supported`` is True if
37 37 streaming clone is supported and False otherwise. ``requirements`` is
38 38 a set of repo requirements from the remote, or ``None`` if stream clone
39 39 isn't supported.
40 40 """
41 41 repo = pullop.repo
42 42 remote = pullop.remote
43 43
44 44 bundle2supported = False
45 45 if pullop.canusebundle2:
46 46 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
47 47 bundle2supported = True
48 48 # else
49 49 # Server doesn't support bundle2 stream clone or doesn't support
50 50 # the versions we support. Fall back and possibly allow legacy.
51 51
52 52 # Ensures legacy code path uses available bundle2.
53 53 if bundle2supported and not bundle2:
54 54 return False, None
55 55 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
56 56 elif bundle2 and not bundle2supported:
57 57 return False, None
58 58
59 59 # Streaming clone only works on empty repositories.
60 60 if len(repo):
61 61 return False, None
62 62
63 63 # Streaming clone only works if all data is being requested.
64 64 if pullop.heads:
65 65 return False, None
66 66
67 67 streamrequested = pullop.streamclonerequested
68 68
69 69 # If we don't have a preference, let the server decide for us. This
70 70 # likely only comes into play in LANs.
71 71 if streamrequested is None:
72 72 # The server can advertise whether to prefer streaming clone.
73 73 streamrequested = remote.capable(b'stream-preferred')
74 74
75 75 if not streamrequested:
76 76 return False, None
77 77
78 78 # In order for stream clone to work, the client has to support all the
79 79 # requirements advertised by the server.
80 80 #
81 81 # The server advertises its requirements via the "stream" and "streamreqs"
82 82 # capability. "stream" (a value-less capability) is advertised if and only
83 83 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
84 84 # is advertised and contains a comma-delimited list of requirements.
85 85 requirements = set()
86 86 if remote.capable(b'stream'):
87 87 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
88 88 else:
89 89 streamreqs = remote.capable(b'streamreqs')
90 90 # This is weird and shouldn't happen with modern servers.
91 91 if not streamreqs:
92 92 pullop.repo.ui.warn(
93 93 _(
94 94 b'warning: stream clone requested but server has them '
95 95 b'disabled\n'
96 96 )
97 97 )
98 98 return False, None
99 99
100 100 streamreqs = set(streamreqs.split(b','))
101 101 # Server requires something we don't support. Bail.
102 102 missingreqs = streamreqs - repo.supportedformats
103 103 if missingreqs:
104 104 pullop.repo.ui.warn(
105 105 _(
106 106 b'warning: stream clone requested but client is missing '
107 107 b'requirements: %s\n'
108 108 )
109 109 % b', '.join(sorted(missingreqs))
110 110 )
111 111 pullop.repo.ui.warn(
112 112 _(
113 113 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
114 114 b'for more information)\n'
115 115 )
116 116 )
117 117 return False, None
118 118 requirements = streamreqs
119 119
120 120 return True, requirements
121 121
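To make the negotiation above concrete, a small illustrative sketch (hypothetical capability values, not part of the change) of how the "streamreqs" capability is interpreted:

    # hypothetical capability blob advertised by a modern server
    caps = {b'streamreqs': b'revlogv1,generaldelta'}
    streamreqs = set(caps[b'streamreqs'].split(b','))
    # what this client can handle (illustrative stand-in for repo.supportedformats)
    supported = {b'revlogv1', b'generaldelta', b'sparserevlog'}
    missingreqs = streamreqs - supported
    assert not missingreqs  # stream clone proceeds only when nothing is missing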
122 122
123 123 def maybeperformlegacystreamclone(pullop):
124 124 """Possibly perform a legacy stream clone operation.
125 125
126 126 Legacy stream clones are performed as part of pull but before all other
127 127 operations.
128 128
129 129 A legacy stream clone will not be performed if a bundle2 stream clone is
130 130 supported.
131 131 """
132 132 from . import localrepo
133 133
134 134 supported, requirements = canperformstreamclone(pullop)
135 135
136 136 if not supported:
137 137 return
138 138
139 139 repo = pullop.repo
140 140 remote = pullop.remote
141 141
142 142 # Save remote branchmap. We will use it later to speed up branchcache
143 143 # creation.
144 144 rbranchmap = None
145 145 if remote.capable(b'branchmap'):
146 146 with remote.commandexecutor() as e:
147 147 rbranchmap = e.callcommand(b'branchmap', {}).result()
148 148
149 149 repo.ui.status(_(b'streaming all changes\n'))
150 150
151 151 with remote.commandexecutor() as e:
152 152 fp = e.callcommand(b'stream_out', {}).result()
153 153
154 154 # TODO strictly speaking, this code should all be inside the context
155 155 # manager because the context manager is supposed to ensure all wire state
156 156 # is flushed when exiting. But the legacy peers don't do this, so it
157 157 # doesn't matter.
158 158 l = fp.readline()
159 159 try:
160 160 resp = int(l)
161 161 except ValueError:
162 162 raise error.ResponseError(
163 163 _(b'unexpected response from remote server:'), l
164 164 )
165 165 if resp == 1:
166 166 raise error.Abort(_(b'operation forbidden by server'))
167 167 elif resp == 2:
168 168 raise error.Abort(_(b'locking the remote repository failed'))
169 169 elif resp != 0:
170 170 raise error.Abort(_(b'the server sent an unknown error code'))
171 171
172 172 l = fp.readline()
173 173 try:
174 174 filecount, bytecount = map(int, l.split(b' ', 1))
175 175 except (ValueError, TypeError):
176 176 raise error.ResponseError(
177 177 _(b'unexpected response from remote server:'), l
178 178 )
179 179
180 180 with repo.lock():
181 181 consumev1(repo, fp, filecount, bytecount)
182 182
 183 183 # new requirements = old non-format requirements +
 184 184 # new format-related remote requirements
 185 185 # (i.e. the requirements of the streamed-in repository)
186 186 repo.requirements = requirements | (
187 187 repo.requirements - repo.supportedformats
188 188 )
189 189 repo.svfs.options = localrepo.resolvestorevfsoptions(
190 190 repo.ui, repo.requirements, repo.features
191 191 )
192 192 scmutil.writereporequirements(repo)
193 193
194 194 if rbranchmap:
195 195 repo._branchcaches.replace(repo, rbranchmap)
196 196
197 197 repo.invalidate()
198 198
199 199
200 200 def allowservergeneration(repo):
201 201 """Whether streaming clones are allowed from the server."""
202 202 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
203 203 return False
204 204
205 205 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
206 206 return False
207 207
208 208 # The way stream clone works makes it impossible to hide secret changesets.
209 209 # So don't allow this by default.
210 210 secret = phases.hassecret(repo)
211 211 if secret:
212 212 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
213 213
214 214 return True
215 215
216 216
 217 217 # This is its own function so extensions can override it.
218 218 def _walkstreamfiles(repo, matcher=None):
219 219 return repo.store.walk(matcher)
220 220
221 221
222 222 def generatev1(repo):
223 223 """Emit content for version 1 of a streaming clone.
224 224
225 225 This returns a 3-tuple of (file count, byte size, data iterator).
226 226
227 227 The data iterator consists of N entries for each file being transferred.
228 228 Each file entry starts as a line with the file name and integer size
229 229 delimited by a null byte.
230 230
231 231 The raw file data follows. Following the raw file data is the next file
232 232 entry, or EOF.
233 233
234 234 When used on the wire protocol, an additional line indicating protocol
235 235 success will be prepended to the stream. This function is not responsible
236 236 for adding it.
237 237
238 238 This function will obtain a repository lock to ensure a consistent view of
239 239 the store is captured. It therefore may raise LockError.
240 240 """
241 241 entries = []
242 242 total_bytes = 0
243 243 # Get consistent snapshot of repo, lock during scan.
244 244 with repo.lock():
245 245 repo.ui.debug(b'scanning\n')
246 246 for file_type, name, ename, size in _walkstreamfiles(repo):
247 247 if size:
248 248 entries.append((name, size))
249 249 total_bytes += size
250 250 _test_sync_point_walk_1(repo)
251 251 _test_sync_point_walk_2(repo)
252 252
253 253 repo.ui.debug(
254 254 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
255 255 )
256 256
257 257 svfs = repo.svfs
258 258 debugflag = repo.ui.debugflag
259 259
260 260 def emitrevlogdata():
261 261 for name, size in entries:
262 262 if debugflag:
263 263 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
264 264 # partially encode name over the wire for backwards compat
265 265 yield b'%s\0%d\n' % (store.encodedir(name), size)
266 266 # auditing at this stage is both pointless (paths are already
267 267 # trusted by the local repo) and expensive
268 268 with svfs(name, b'rb', auditpath=False) as fp:
269 269 if size <= 65536:
270 270 yield fp.read(size)
271 271 else:
272 272 for chunk in util.filechunkiter(fp, limit=size):
273 273 yield chunk
274 274
275 275 return len(entries), total_bytes, emitrevlogdata()
276 276
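As a standalone illustration of the framing described in the docstring above (not part of the change), each entry is a "<name>\0<size>\n" header followed by exactly <size> raw bytes:

    import io

    def parse_v1_entry(fp):
        # read the header line up to and including the newline
        header = b''
        while not header.endswith(b'\n'):
            header += fp.read(1)
        name, size = header.rstrip(b'\n').split(b'\0', 1)
        return name, fp.read(int(size))

    stream = io.BytesIO(b'data/foo.i\x005\nhello')
    assert parse_v1_entry(stream) == (b'data/foo.i', b'hello')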
277 277
278 278 def generatev1wireproto(repo):
279 279 """Emit content for version 1 of streaming clone suitable for the wire.
280 280
281 281 This is the data output from ``generatev1()`` with 2 header lines. The
282 282 first line indicates overall success. The 2nd contains the file count and
283 283 byte size of payload.
284 284
285 285 The success line contains "0" for success, "1" for stream generation not
286 286 allowed, and "2" for error locking the repository (possibly indicating
287 287 a permissions error for the server process).
288 288 """
289 289 if not allowservergeneration(repo):
290 290 yield b'1\n'
291 291 return
292 292
293 293 try:
294 294 filecount, bytecount, it = generatev1(repo)
295 295 except error.LockError:
296 296 yield b'2\n'
297 297 return
298 298
299 299 # Indicates successful response.
300 300 yield b'0\n'
301 301 yield b'%d %d\n' % (filecount, bytecount)
302 302 for chunk in it:
303 303 yield chunk
304 304
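For illustration (hypothetical numbers), a successful wire response therefore starts with two text lines before the raw entries:

    import io

    resp = io.BytesIO(b'0\n3 123456\n')          # status, then "<filecount> <bytecount>"
    status = int(resp.readline())                # 0 ok, 1 forbidden, 2 lock error
    filecount, bytecount = map(int, resp.readline().split(b' ', 1))
    assert (status, filecount, bytecount) == (0, 3, 123456)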
305 305
306 306 def generatebundlev1(repo, compression=b'UN'):
307 307 """Emit content for version 1 of a stream clone bundle.
308 308
309 309 The first 4 bytes of the output ("HGS1") denote this as stream clone
310 310 bundle version 1.
311 311
312 312 The next 2 bytes indicate the compression type. Only "UN" is currently
313 313 supported.
314 314
315 315 The next 16 bytes are two 64-bit big endian unsigned integers indicating
316 316 file count and byte count, respectively.
317 317
 318 318 The next 2 bytes are a 16-bit big endian unsigned short declaring the length
319 319 of the requirements string, including a trailing \0. The following N bytes
320 320 are the requirements string, which is ASCII containing a comma-delimited
321 321 list of repo requirements that are needed to support the data.
322 322
323 323 The remaining content is the output of ``generatev1()`` (which may be
324 324 compressed in the future).
325 325
326 326 Returns a tuple of (requirements, data generator).
327 327 """
328 328 if compression != b'UN':
329 329 raise ValueError(b'we do not support the compression argument yet')
330 330
331 331 requirements = repo.requirements & repo.supportedformats
332 332 requires = b','.join(sorted(requirements))
333 333
334 334 def gen():
335 335 yield b'HGS1'
336 336 yield compression
337 337
338 338 filecount, bytecount, it = generatev1(repo)
339 339 repo.ui.status(
340 340 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
341 341 )
342 342
343 343 yield struct.pack(b'>QQ', filecount, bytecount)
344 344 yield struct.pack(b'>H', len(requires) + 1)
345 345 yield requires + b'\0'
346 346
347 347 # This is where we'll add compression in the future.
348 348 assert compression == b'UN'
349 349
350 350 progress = repo.ui.makeprogress(
351 351 _(b'bundle'), total=bytecount, unit=_(b'bytes')
352 352 )
353 353 progress.update(0)
354 354
355 355 for chunk in it:
356 356 progress.increment(step=len(chunk))
357 357 yield chunk
358 358
359 359 progress.complete()
360 360
361 361 return requirements, gen()
362 362
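A worked example of the fixed header layout documented above, with illustrative counts (not part of the change):

    import struct

    requires = b'revlogv1'
    header = (
        b'HGS1'                                  # 4-byte magic
        + b'UN'                                  # 2-byte compression type
        + struct.pack(b'>QQ', 2, 1024)           # file count, byte count
        + struct.pack(b'>H', len(requires) + 1)  # requirements length, incl. \0
        + requires + b'\0'
    )
    assert len(header) == 4 + 2 + 16 + 2 + len(requires) + 1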
363 363
364 364 def consumev1(repo, fp, filecount, bytecount):
365 365 """Apply the contents from version 1 of a streaming clone file handle.
366 366
367 367 This takes the output from "stream_out" and applies it to the specified
368 368 repository.
369 369
370 370 Like "stream_out," the status line added by the wire protocol is not
371 371 handled by this function.
372 372 """
373 373 with repo.lock():
374 374 repo.ui.status(
375 375 _(b'%d files to transfer, %s of data\n')
376 376 % (filecount, util.bytecount(bytecount))
377 377 )
378 378 progress = repo.ui.makeprogress(
379 379 _(b'clone'), total=bytecount, unit=_(b'bytes')
380 380 )
381 381 progress.update(0)
382 382 start = util.timer()
383 383
384 384 # TODO: get rid of (potential) inconsistency
385 385 #
386 386 # If transaction is started and any @filecache property is
387 387 # changed at this point, it causes inconsistency between
388 388 # in-memory cached property and streamclone-ed file on the
389 389 # disk. Nested transaction prevents transaction scope "clone"
390 390 # below from writing in-memory changes out at the end of it,
391 391 # even though in-memory changes are discarded at the end of it
392 392 # regardless of transaction nesting.
393 393 #
394 394 # But transaction nesting can't be simply prohibited, because
395 395 # nesting occurs also in ordinary case (e.g. enabling
396 396 # clonebundles).
397 397
398 398 with repo.transaction(b'clone'):
399 399 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
400 400 for i in pycompat.xrange(filecount):
401 401 # XXX doesn't support '\n' or '\r' in filenames
402 402 l = fp.readline()
403 403 try:
404 404 name, size = l.split(b'\0', 1)
405 405 size = int(size)
406 406 except (ValueError, TypeError):
407 407 raise error.ResponseError(
408 408 _(b'unexpected response from remote server:'), l
409 409 )
410 410 if repo.ui.debugflag:
411 411 repo.ui.debug(
412 412 b'adding %s (%s)\n' % (name, util.bytecount(size))
413 413 )
414 414 # for backwards compat, name was partially encoded
415 415 path = store.decodedir(name)
416 416 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
417 417 for chunk in util.filechunkiter(fp, limit=size):
418 418 progress.increment(step=len(chunk))
419 419 ofp.write(chunk)
420 420
421 421 # force @filecache properties to be reloaded from
422 422 # streamclone-ed file at next access
423 423 repo.invalidate(clearfilecache=True)
424 424
425 425 elapsed = util.timer() - start
426 426 if elapsed <= 0:
427 427 elapsed = 0.001
428 428 progress.complete()
429 429 repo.ui.status(
430 430 _(b'transferred %s in %.1f seconds (%s/sec)\n')
431 431 % (
432 432 util.bytecount(bytecount),
433 433 elapsed,
434 434 util.bytecount(bytecount / elapsed),
435 435 )
436 436 )
437 437
438 438
439 439 def readbundle1header(fp):
440 440 compression = fp.read(2)
441 441 if compression != b'UN':
442 442 raise error.Abort(
443 443 _(
444 444 b'only uncompressed stream clone bundles are '
445 445 b'supported; got %s'
446 446 )
447 447 % compression
448 448 )
449 449
450 450 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
451 451 requireslen = struct.unpack(b'>H', fp.read(2))[0]
452 452 requires = fp.read(requireslen)
453 453
454 454 if not requires.endswith(b'\0'):
455 455 raise error.Abort(
456 456 _(
457 457 b'malformed stream clone bundle: '
458 458 b'requirements not properly encoded'
459 459 )
460 460 )
461 461
462 462 requirements = set(requires.rstrip(b'\0').split(b','))
463 463
464 464 return filecount, bytecount, requirements
465 465
466 466
467 467 def applybundlev1(repo, fp):
468 468 """Apply the content from a stream clone bundle version 1.
469 469
470 470 We assume the 4 byte header has been read and validated and the file handle
471 471 is at the 2 byte compression identifier.
472 472 """
473 473 if len(repo):
474 474 raise error.Abort(
475 475 _(b'cannot apply stream clone bundle on non-empty repo')
476 476 )
477 477
478 478 filecount, bytecount, requirements = readbundle1header(fp)
479 479 missingreqs = requirements - repo.supportedformats
480 480 if missingreqs:
481 481 raise error.Abort(
482 482 _(b'unable to apply stream clone: unsupported format: %s')
483 483 % b', '.join(sorted(missingreqs))
484 484 )
485 485
486 486 consumev1(repo, fp, filecount, bytecount)
487 487
488 488
489 489 class streamcloneapplier(object):
490 490 """Class to manage applying streaming clone bundles.
491 491
492 492 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
493 493 readers to perform bundle type-specific functionality.
494 494 """
495 495
496 496 def __init__(self, fh):
497 497 self._fh = fh
498 498
499 499 def apply(self, repo):
500 500 return applybundlev1(repo, self._fh)
501 501
502 502
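A hedged usage sketch (the path and helper name are hypothetical): callers are expected to have read and validated the 4-byte magic before handing the file handle to the applier:

    def _apply_stream_bundle(repo, path=b'/tmp/clone.hgs1'):  # hypothetical helper
        with open(path, b'rb') as fh:
            if fh.read(4) != b'HGS1':
                raise error.Abort(_(b'not a stream clone bundle'))
            return streamcloneapplier(fh).apply(repo)
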
503 503 # type of file to stream
504 504 _fileappend = 0 # append only file
505 505 _filefull = 1 # full snapshot file
506 506
507 507 # Source of the file
508 508 _srcstore = b's' # store (svfs)
 509 509 _srccache = b'c' # cache (cachevfs)
510 510
 511 511 # This is its own function so extensions can override it.
512 512 def _walkstreamfullstorefiles(repo):
513 513 """list snapshot file from the store"""
514 514 fnames = []
515 515 if not repo.publishing():
516 516 fnames.append(b'phaseroots')
517 517 return fnames
518 518
519 519
520 520 def _filterfull(entry, copy, vfsmap):
521 521 """actually copy the snapshot files"""
522 522 src, name, ftype, data = entry
523 523 if ftype != _filefull:
524 524 return entry
525 525 return (src, name, ftype, copy(vfsmap[src].join(name)))
526 526
527 527
528 528 @contextlib.contextmanager
529 529 def maketempcopies():
530 530 """return a function to temporary copy file"""
531 531 files = []
532 532 try:
533 533
534 534 def copy(src):
535 535 fd, dst = pycompat.mkstemp()
536 536 os.close(fd)
537 537 files.append(dst)
538 538 util.copyfiles(src, dst, hardlink=True)
539 539 return dst
540 540
541 541 yield copy
542 542 finally:
543 543 for tmp in files:
544 544 util.tryunlink(tmp)
545 545
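Usage sketch for the helper above (the source path is illustrative); every temporary hardlinked copy is removed when the context exits:

    with maketempcopies() as copy:
        tmp = copy(b'/repo/.hg/store/phaseroots')  # hypothetical snapshot source
        # `tmp` stays stable on disk while the stream is being generated
    # at this point every temporary copy has been unlinked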
546 546
547 547 def _makemap(repo):
548 548 """make a (src -> vfs) map for the repo"""
549 549 vfsmap = {
550 550 _srcstore: repo.svfs,
551 551 _srccache: repo.cachevfs,
552 552 }
 553 553 # we keep repo.vfs out of the map on purpose, there are too many
 554 554 # dangers there (eg: .hg/hgrc)
555 555 assert repo.vfs not in vfsmap.values()
556 556
557 557 return vfsmap
558 558
559 559
560 560 def _emit2(repo, entries, totalfilesize):
561 561 """actually emit the stream bundle"""
562 562 vfsmap = _makemap(repo)
 563 # we keep repo.vfs out of the map on purpose, there are too many dangers
 564 # there (eg: .hg/hgrc),
 565 #
 566 # this check is duplicated (from _makemap) as an author might think this
 567 # is fine, while it is really not fine.
568 if repo.vfs in vfsmap.values():
569 raise error.ProgrammingError(
570 b'repo.vfs must not be added to vfsmap for security reasons'
571 )
572
563 573 progress = repo.ui.makeprogress(
564 574 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
565 575 )
566 576 progress.update(0)
567 577 with maketempcopies() as copy, progress:
568 578 # copy is delayed until we are in the try
569 579 entries = [_filterfull(e, copy, vfsmap) for e in entries]
 570 580 yield None # this releases the lock on the repository
571 581 seen = 0
572 582
573 583 for src, name, ftype, data in entries:
574 584 vfs = vfsmap[src]
575 585 yield src
576 586 yield util.uvarintencode(len(name))
577 587 if ftype == _fileappend:
578 588 fp = vfs(name)
579 589 size = data
580 590 elif ftype == _filefull:
581 591 fp = open(data, b'rb')
582 592 size = util.fstat(fp).st_size
583 593 try:
584 594 yield util.uvarintencode(size)
585 595 yield name
586 596 if size <= 65536:
587 597 chunks = (fp.read(size),)
588 598 else:
589 599 chunks = util.filechunkiter(fp, limit=size)
590 600 for chunk in chunks:
591 601 seen += len(chunk)
592 602 progress.update(seen)
593 603 yield chunk
594 604 finally:
595 605 fp.close()
596 606
597 607
598 608 def _test_sync_point_walk_1(repo):
599 609 """a function for synchronisation during tests"""
600 610
601 611
602 612 def _test_sync_point_walk_2(repo):
603 613 """a function for synchronisation during tests"""
604 614
605 615
606 616 def generatev2(repo, includes, excludes, includeobsmarkers):
607 617 """Emit content for version 2 of a streaming clone.
608 618
 609 619 The data stream consists of the following entries:
610 620 1) A char representing the file destination (eg: store or cache)
611 621 2) A varint containing the length of the filename
612 622 3) A varint containing the length of file data
613 623 4) N bytes containing the filename (the internal, store-agnostic form)
614 624 5) N bytes containing the file data
615 625
616 626 Returns a 3-tuple of (file count, file size, data iterator).
617 627 """
618 628
619 629 with repo.lock():
620 630
621 631 entries = []
622 632 totalfilesize = 0
623 633
624 634 matcher = None
625 635 if includes or excludes:
626 636 matcher = narrowspec.match(repo.root, includes, excludes)
627 637
628 638 repo.ui.debug(b'scanning\n')
629 639 for rl_type, name, ename, size in _walkstreamfiles(repo, matcher):
630 640 if size:
631 641 ft = _fileappend
632 642 if rl_type & store.FILEFLAGS_VOLATILE:
633 643 ft = _filefull
634 644 entries.append((_srcstore, name, ft, size))
635 645 totalfilesize += size
636 646 for name in _walkstreamfullstorefiles(repo):
637 647 if repo.svfs.exists(name):
638 648 totalfilesize += repo.svfs.lstat(name).st_size
639 649 entries.append((_srcstore, name, _filefull, None))
640 650 if includeobsmarkers and repo.svfs.exists(b'obsstore'):
641 651 totalfilesize += repo.svfs.lstat(b'obsstore').st_size
642 652 entries.append((_srcstore, b'obsstore', _filefull, None))
643 653 for name in cacheutil.cachetocopy(repo):
644 654 if repo.cachevfs.exists(name):
645 655 totalfilesize += repo.cachevfs.lstat(name).st_size
646 656 entries.append((_srccache, name, _filefull, None))
647 657
648 658 chunks = _emit2(repo, entries, totalfilesize)
649 659 first = next(chunks)
650 660 assert first is None
651 661 _test_sync_point_walk_1(repo)
652 662 _test_sync_point_walk_2(repo)
653 663
654 664 return len(entries), totalfilesize, chunks
655 665
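The two length fields in the v2 entries are unsigned varints. A standalone sketch of the encoding (LEB128-style, 7 data bits per byte with a continuation bit, which is what util.uvarintencode is understood to produce):

    def uvarint_encode(value):
        # encode an unsigned integer, 7 bits at a time, low bits first
        out = bytearray()
        while True:
            byte = value & 0x7F
            value >>= 7
            if value:
                out.append(byte | 0x80)  # continuation bit: more bytes follow
            else:
                out.append(byte)
                return bytes(out)

    assert uvarint_encode(5) == b'\x05'
    assert uvarint_encode(300) == b'\xac\x02'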
656 666
657 667 @contextlib.contextmanager
658 668 def nested(*ctxs):
659 669 this = ctxs[0]
660 670 rest = ctxs[1:]
661 671 with this:
662 672 if rest:
663 673 with nested(*rest):
664 674 yield
665 675 else:
666 676 yield
667 677
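Usage sketch for nested() with illustrative context managers; it behaves like a recursive contextlib.ExitStack over a fixed argument list (the _tag helper is hypothetical):

    @contextlib.contextmanager
    def _tag(name, log):  # hypothetical test helper
        log.append(b'+' + name)
        yield
        log.append(b'-' + name)

    log = []
    with nested(_tag(b'a', log), _tag(b'b', log)):
        pass
    assert log == [b'+a', b'+b', b'-b', b'-a']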
668 678
669 679 def consumev2(repo, fp, filecount, filesize):
670 680 """Apply the contents from a version 2 streaming clone.
671 681
672 682 Data is read from an object that only needs to provide a ``read(size)``
673 683 method.
674 684 """
675 685 with repo.lock():
676 686 repo.ui.status(
677 687 _(b'%d files to transfer, %s of data\n')
678 688 % (filecount, util.bytecount(filesize))
679 689 )
680 690
681 691 start = util.timer()
682 692 progress = repo.ui.makeprogress(
683 693 _(b'clone'), total=filesize, unit=_(b'bytes')
684 694 )
685 695 progress.update(0)
686 696
687 697 vfsmap = _makemap(repo)
 698 # we keep repo.vfs out of the map on purpose, there are too many dangers
 699 # there (eg: .hg/hgrc),
 700 #
 701 # this check is duplicated (from _makemap) as an author might think this
 702 # is fine, while it is really not fine.
703 if repo.vfs in vfsmap.values():
704 raise error.ProgrammingError(
705 b'repo.vfs must not be added to vfsmap for security reasons'
706 )
688 707
689 708 with repo.transaction(b'clone'):
690 709 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
691 710 with nested(*ctxs):
692 711 for i in range(filecount):
693 712 src = util.readexactly(fp, 1)
694 713 vfs = vfsmap[src]
695 714 namelen = util.uvarintdecodestream(fp)
696 715 datalen = util.uvarintdecodestream(fp)
697 716
698 717 name = util.readexactly(fp, namelen)
699 718
700 719 if repo.ui.debugflag:
701 720 repo.ui.debug(
702 721 b'adding [%s] %s (%s)\n'
703 722 % (src, name, util.bytecount(datalen))
704 723 )
705 724
706 725 with vfs(name, b'w') as ofp:
707 726 for chunk in util.filechunkiter(fp, limit=datalen):
708 727 progress.increment(step=len(chunk))
709 728 ofp.write(chunk)
710 729
711 730 # force @filecache properties to be reloaded from
712 731 # streamclone-ed file at next access
713 732 repo.invalidate(clearfilecache=True)
714 733
715 734 elapsed = util.timer() - start
716 735 if elapsed <= 0:
717 736 elapsed = 0.001
718 737 repo.ui.status(
719 738 _(b'transferred %s in %.1f seconds (%s/sec)\n')
720 739 % (
721 740 util.bytecount(progress.pos),
722 741 elapsed,
723 742 util.bytecount(progress.pos / elapsed),
724 743 )
725 744 )
726 745 progress.complete()
727 746
728 747
729 748 def applybundlev2(repo, fp, filecount, filesize, requirements):
730 749 from . import localrepo
731 750
732 751 missingreqs = [r for r in requirements if r not in repo.supported]
733 752 if missingreqs:
734 753 raise error.Abort(
735 754 _(b'unable to apply stream clone: unsupported format: %s')
736 755 % b', '.join(sorted(missingreqs))
737 756 )
738 757
739 758 consumev2(repo, fp, filecount, filesize)
740 759
 741 760 # new requirements = old non-format requirements +
 742 761 # new format-related remote requirements
 743 762 # (i.e. the requirements of the streamed-in repository)
744 763 repo.requirements = set(requirements) | (
745 764 repo.requirements - repo.supportedformats
746 765 )
747 766 repo.svfs.options = localrepo.resolvestorevfsoptions(
748 767 repo.ui, repo.requirements, repo.features
749 768 )
750 769 scmutil.writereporequirements(repo)
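A worked example of the requirements merge above, using illustrative requirement sets (names only for demonstration):

    old = {b'dotencode', b'fncache', b'revlogv1', b'store'}      # existing repo
    formats = {b'revlogv1', b'generaldelta', b'sparserevlog'}    # supportedformats
    streamed = {b'revlogv1', b'generaldelta'}                    # from the stream
    new = set(streamed) | (old - formats)
    assert new == {b'dotencode', b'fncache', b'generaldelta', b'revlogv1', b'store'}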