##// END OF EJS Templates
streamingclone: extract the scanning part from the generation part...
marmoute -
r48237:2f4ca480 default
parent child Browse files
Show More
@@ -1,750 +1,773
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import contextlib
11 11 import os
12 12 import struct
13 13
14 14 from .i18n import _
15 15 from .pycompat import open
16 16 from .interfaces import repository
17 17 from . import (
18 18 cacheutil,
19 19 error,
20 20 narrowspec,
21 21 phases,
22 22 pycompat,
23 23 requirements as requirementsmod,
24 24 scmutil,
25 25 store,
26 26 util,
27 27 )
28 28
29 29
30 30 def canperformstreamclone(pullop, bundle2=False):
31 31 """Whether it is possible to perform a streaming clone as part of pull.
32 32
33 33 ``bundle2`` will cause the function to consider stream clone through
34 34 bundle2 and only through bundle2.
35 35
36 36 Returns a tuple of (supported, requirements). ``supported`` is True if
37 37 streaming clone is supported and False otherwise. ``requirements`` is
38 38 a set of repo requirements from the remote, or ``None`` if stream clone
39 39 isn't supported.
40 40 """
41 41 repo = pullop.repo
42 42 remote = pullop.remote
43 43
44 44 bundle2supported = False
45 45 if pullop.canusebundle2:
46 46 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
47 47 bundle2supported = True
48 48 # else
49 49 # Server doesn't support bundle2 stream clone or doesn't support
50 50 # the versions we support. Fall back and possibly allow legacy.
51 51
52 52 # Ensures legacy code path uses available bundle2.
53 53 if bundle2supported and not bundle2:
54 54 return False, None
55 55 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
56 56 elif bundle2 and not bundle2supported:
57 57 return False, None
58 58
59 59 # Streaming clone only works on empty repositories.
60 60 if len(repo):
61 61 return False, None
62 62
63 63 # Streaming clone only works if all data is being requested.
64 64 if pullop.heads:
65 65 return False, None
66 66
67 67 streamrequested = pullop.streamclonerequested
68 68
69 69 # If we don't have a preference, let the server decide for us. This
70 70 # likely only comes into play in LANs.
71 71 if streamrequested is None:
72 72 # The server can advertise whether to prefer streaming clone.
73 73 streamrequested = remote.capable(b'stream-preferred')
74 74
75 75 if not streamrequested:
76 76 return False, None
77 77
78 78 # In order for stream clone to work, the client has to support all the
79 79 # requirements advertised by the server.
80 80 #
81 81 # The server advertises its requirements via the "stream" and "streamreqs"
82 82 # capability. "stream" (a value-less capability) is advertised if and only
83 83 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
84 84 # is advertised and contains a comma-delimited list of requirements.
85 85 requirements = set()
86 86 if remote.capable(b'stream'):
87 87 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
88 88 else:
89 89 streamreqs = remote.capable(b'streamreqs')
90 90 # This is weird and shouldn't happen with modern servers.
91 91 if not streamreqs:
92 92 pullop.repo.ui.warn(
93 93 _(
94 94 b'warning: stream clone requested but server has them '
95 95 b'disabled\n'
96 96 )
97 97 )
98 98 return False, None
99 99
100 100 streamreqs = set(streamreqs.split(b','))
101 101 # Server requires something we don't support. Bail.
102 102 missingreqs = streamreqs - repo.supportedformats
103 103 if missingreqs:
104 104 pullop.repo.ui.warn(
105 105 _(
106 106 b'warning: stream clone requested but client is missing '
107 107 b'requirements: %s\n'
108 108 )
109 109 % b', '.join(sorted(missingreqs))
110 110 )
111 111 pullop.repo.ui.warn(
112 112 _(
113 113 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
114 114 b'for more information)\n'
115 115 )
116 116 )
117 117 return False, None
118 118 requirements = streamreqs
119 119
120 120 return True, requirements
121 121
122 122
123 123 def maybeperformlegacystreamclone(pullop):
124 124 """Possibly perform a legacy stream clone operation.
125 125
126 126 Legacy stream clones are performed as part of pull but before all other
127 127 operations.
128 128
129 129 A legacy stream clone will not be performed if a bundle2 stream clone is
130 130 supported.
131 131 """
132 132 from . import localrepo
133 133
134 134 supported, requirements = canperformstreamclone(pullop)
135 135
136 136 if not supported:
137 137 return
138 138
139 139 repo = pullop.repo
140 140 remote = pullop.remote
141 141
142 142 # Save remote branchmap. We will use it later to speed up branchcache
143 143 # creation.
144 144 rbranchmap = None
145 145 if remote.capable(b'branchmap'):
146 146 with remote.commandexecutor() as e:
147 147 rbranchmap = e.callcommand(b'branchmap', {}).result()
148 148
149 149 repo.ui.status(_(b'streaming all changes\n'))
150 150
151 151 with remote.commandexecutor() as e:
152 152 fp = e.callcommand(b'stream_out', {}).result()
153 153
154 154 # TODO strictly speaking, this code should all be inside the context
155 155 # manager because the context manager is supposed to ensure all wire state
156 156 # is flushed when exiting. But the legacy peers don't do this, so it
157 157 # doesn't matter.
158 158 l = fp.readline()
159 159 try:
160 160 resp = int(l)
161 161 except ValueError:
162 162 raise error.ResponseError(
163 163 _(b'unexpected response from remote server:'), l
164 164 )
165 165 if resp == 1:
166 166 raise error.Abort(_(b'operation forbidden by server'))
167 167 elif resp == 2:
168 168 raise error.Abort(_(b'locking the remote repository failed'))
169 169 elif resp != 0:
170 170 raise error.Abort(_(b'the server sent an unknown error code'))
171 171
172 172 l = fp.readline()
173 173 try:
174 174 filecount, bytecount = map(int, l.split(b' ', 1))
175 175 except (ValueError, TypeError):
176 176 raise error.ResponseError(
177 177 _(b'unexpected response from remote server:'), l
178 178 )
179 179
180 180 with repo.lock():
181 181 consumev1(repo, fp, filecount, bytecount)
182 182
183 183 # new requirements = old non-format requirements +
184 184 # new format-related remote requirements
185 185 # requirements from the streamed-in repository
186 186 repo.requirements = requirements | (
187 187 repo.requirements - repo.supportedformats
188 188 )
189 189 repo.svfs.options = localrepo.resolvestorevfsoptions(
190 190 repo.ui, repo.requirements, repo.features
191 191 )
192 192 scmutil.writereporequirements(repo)
193 193
194 194 if rbranchmap:
195 195 repo._branchcaches.replace(repo, rbranchmap)
196 196
197 197 repo.invalidate()
198 198
199 199
200 200 def allowservergeneration(repo):
201 201 """Whether streaming clones are allowed from the server."""
202 202 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
203 203 return False
204 204
205 205 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
206 206 return False
207 207
208 208 # The way stream clone works makes it impossible to hide secret changesets.
209 209 # So don't allow this by default.
210 210 secret = phases.hassecret(repo)
211 211 if secret:
212 212 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
213 213
214 214 return True
215 215
216 216
217 217 # This is it's own function so extensions can override it.
218 218 def _walkstreamfiles(repo, matcher=None):
219 219 return repo.store.walk(matcher)
220 220
221 221
222 222 def generatev1(repo):
223 223 """Emit content for version 1 of a streaming clone.
224 224
225 225 This returns a 3-tuple of (file count, byte size, data iterator).
226 226
227 227 The data iterator consists of N entries for each file being transferred.
228 228 Each file entry starts as a line with the file name and integer size
229 229 delimited by a null byte.
230 230
231 231 The raw file data follows. Following the raw file data is the next file
232 232 entry, or EOF.
233 233
234 234 When used on the wire protocol, an additional line indicating protocol
235 235 success will be prepended to the stream. This function is not responsible
236 236 for adding it.
237 237
238 238 This function will obtain a repository lock to ensure a consistent view of
239 239 the store is captured. It therefore may raise LockError.
240 240 """
241 241 entries = []
242 242 total_bytes = 0
243 243 # Get consistent snapshot of repo, lock during scan.
244 244 with repo.lock():
245 245 repo.ui.debug(b'scanning\n')
246 246 for file_type, name, ename, size in _walkstreamfiles(repo):
247 247 if size:
248 248 entries.append((name, size))
249 249 total_bytes += size
250 250 _test_sync_point_walk_1(repo)
251 251 _test_sync_point_walk_2(repo)
252 252
253 253 repo.ui.debug(
254 254 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
255 255 )
256 256
257 257 svfs = repo.svfs
258 258 debugflag = repo.ui.debugflag
259 259
260 260 def emitrevlogdata():
261 261 for name, size in entries:
262 262 if debugflag:
263 263 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
264 264 # partially encode name over the wire for backwards compat
265 265 yield b'%s\0%d\n' % (store.encodedir(name), size)
266 266 # auditing at this stage is both pointless (paths are already
267 267 # trusted by the local repo) and expensive
268 268 with svfs(name, b'rb', auditpath=False) as fp:
269 269 if size <= 65536:
270 270 yield fp.read(size)
271 271 else:
272 272 for chunk in util.filechunkiter(fp, limit=size):
273 273 yield chunk
274 274
275 275 return len(entries), total_bytes, emitrevlogdata()
276 276
277 277
278 278 def generatev1wireproto(repo):
279 279 """Emit content for version 1 of streaming clone suitable for the wire.
280 280
281 281 This is the data output from ``generatev1()`` with 2 header lines. The
282 282 first line indicates overall success. The 2nd contains the file count and
283 283 byte size of payload.
284 284
285 285 The success line contains "0" for success, "1" for stream generation not
286 286 allowed, and "2" for error locking the repository (possibly indicating
287 287 a permissions error for the server process).
288 288 """
289 289 if not allowservergeneration(repo):
290 290 yield b'1\n'
291 291 return
292 292
293 293 try:
294 294 filecount, bytecount, it = generatev1(repo)
295 295 except error.LockError:
296 296 yield b'2\n'
297 297 return
298 298
299 299 # Indicates successful response.
300 300 yield b'0\n'
301 301 yield b'%d %d\n' % (filecount, bytecount)
302 302 for chunk in it:
303 303 yield chunk
304 304
305 305
306 306 def generatebundlev1(repo, compression=b'UN'):
307 307 """Emit content for version 1 of a stream clone bundle.
308 308
309 309 The first 4 bytes of the output ("HGS1") denote this as stream clone
310 310 bundle version 1.
311 311
312 312 The next 2 bytes indicate the compression type. Only "UN" is currently
313 313 supported.
314 314
315 315 The next 16 bytes are two 64-bit big endian unsigned integers indicating
316 316 file count and byte count, respectively.
317 317
318 318 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
319 319 of the requirements string, including a trailing \0. The following N bytes
320 320 are the requirements string, which is ASCII containing a comma-delimited
321 321 list of repo requirements that are needed to support the data.
322 322
323 323 The remaining content is the output of ``generatev1()`` (which may be
324 324 compressed in the future).
325 325
326 326 Returns a tuple of (requirements, data generator).
327 327 """
328 328 if compression != b'UN':
329 329 raise ValueError(b'we do not support the compression argument yet')
330 330
331 331 requirements = repo.requirements & repo.supportedformats
332 332 requires = b','.join(sorted(requirements))
333 333
334 334 def gen():
335 335 yield b'HGS1'
336 336 yield compression
337 337
338 338 filecount, bytecount, it = generatev1(repo)
339 339 repo.ui.status(
340 340 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
341 341 )
342 342
343 343 yield struct.pack(b'>QQ', filecount, bytecount)
344 344 yield struct.pack(b'>H', len(requires) + 1)
345 345 yield requires + b'\0'
346 346
347 347 # This is where we'll add compression in the future.
348 348 assert compression == b'UN'
349 349
350 350 progress = repo.ui.makeprogress(
351 351 _(b'bundle'), total=bytecount, unit=_(b'bytes')
352 352 )
353 353 progress.update(0)
354 354
355 355 for chunk in it:
356 356 progress.increment(step=len(chunk))
357 357 yield chunk
358 358
359 359 progress.complete()
360 360
361 361 return requirements, gen()
362 362
363 363
364 364 def consumev1(repo, fp, filecount, bytecount):
365 365 """Apply the contents from version 1 of a streaming clone file handle.
366 366
367 367 This takes the output from "stream_out" and applies it to the specified
368 368 repository.
369 369
370 370 Like "stream_out," the status line added by the wire protocol is not
371 371 handled by this function.
372 372 """
373 373 with repo.lock():
374 374 repo.ui.status(
375 375 _(b'%d files to transfer, %s of data\n')
376 376 % (filecount, util.bytecount(bytecount))
377 377 )
378 378 progress = repo.ui.makeprogress(
379 379 _(b'clone'), total=bytecount, unit=_(b'bytes')
380 380 )
381 381 progress.update(0)
382 382 start = util.timer()
383 383
384 384 # TODO: get rid of (potential) inconsistency
385 385 #
386 386 # If transaction is started and any @filecache property is
387 387 # changed at this point, it causes inconsistency between
388 388 # in-memory cached property and streamclone-ed file on the
389 389 # disk. Nested transaction prevents transaction scope "clone"
390 390 # below from writing in-memory changes out at the end of it,
391 391 # even though in-memory changes are discarded at the end of it
392 392 # regardless of transaction nesting.
393 393 #
394 394 # But transaction nesting can't be simply prohibited, because
395 395 # nesting occurs also in ordinary case (e.g. enabling
396 396 # clonebundles).
397 397
398 398 with repo.transaction(b'clone'):
399 399 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
400 400 for i in pycompat.xrange(filecount):
401 401 # XXX doesn't support '\n' or '\r' in filenames
402 402 l = fp.readline()
403 403 try:
404 404 name, size = l.split(b'\0', 1)
405 405 size = int(size)
406 406 except (ValueError, TypeError):
407 407 raise error.ResponseError(
408 408 _(b'unexpected response from remote server:'), l
409 409 )
410 410 if repo.ui.debugflag:
411 411 repo.ui.debug(
412 412 b'adding %s (%s)\n' % (name, util.bytecount(size))
413 413 )
414 414 # for backwards compat, name was partially encoded
415 415 path = store.decodedir(name)
416 416 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
417 417 for chunk in util.filechunkiter(fp, limit=size):
418 418 progress.increment(step=len(chunk))
419 419 ofp.write(chunk)
420 420
421 421 # force @filecache properties to be reloaded from
422 422 # streamclone-ed file at next access
423 423 repo.invalidate(clearfilecache=True)
424 424
425 425 elapsed = util.timer() - start
426 426 if elapsed <= 0:
427 427 elapsed = 0.001
428 428 progress.complete()
429 429 repo.ui.status(
430 430 _(b'transferred %s in %.1f seconds (%s/sec)\n')
431 431 % (
432 432 util.bytecount(bytecount),
433 433 elapsed,
434 434 util.bytecount(bytecount / elapsed),
435 435 )
436 436 )
437 437
438 438
439 439 def readbundle1header(fp):
440 440 compression = fp.read(2)
441 441 if compression != b'UN':
442 442 raise error.Abort(
443 443 _(
444 444 b'only uncompressed stream clone bundles are '
445 445 b'supported; got %s'
446 446 )
447 447 % compression
448 448 )
449 449
450 450 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
451 451 requireslen = struct.unpack(b'>H', fp.read(2))[0]
452 452 requires = fp.read(requireslen)
453 453
454 454 if not requires.endswith(b'\0'):
455 455 raise error.Abort(
456 456 _(
457 457 b'malformed stream clone bundle: '
458 458 b'requirements not properly encoded'
459 459 )
460 460 )
461 461
462 462 requirements = set(requires.rstrip(b'\0').split(b','))
463 463
464 464 return filecount, bytecount, requirements
465 465
466 466
467 467 def applybundlev1(repo, fp):
468 468 """Apply the content from a stream clone bundle version 1.
469 469
470 470 We assume the 4 byte header has been read and validated and the file handle
471 471 is at the 2 byte compression identifier.
472 472 """
473 473 if len(repo):
474 474 raise error.Abort(
475 475 _(b'cannot apply stream clone bundle on non-empty repo')
476 476 )
477 477
478 478 filecount, bytecount, requirements = readbundle1header(fp)
479 479 missingreqs = requirements - repo.supportedformats
480 480 if missingreqs:
481 481 raise error.Abort(
482 482 _(b'unable to apply stream clone: unsupported format: %s')
483 483 % b', '.join(sorted(missingreqs))
484 484 )
485 485
486 486 consumev1(repo, fp, filecount, bytecount)
487 487
488 488
489 489 class streamcloneapplier(object):
490 490 """Class to manage applying streaming clone bundles.
491 491
492 492 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
493 493 readers to perform bundle type-specific functionality.
494 494 """
495 495
496 496 def __init__(self, fh):
497 497 self._fh = fh
498 498
499 499 def apply(self, repo):
500 500 return applybundlev1(repo, self._fh)
501 501
502 502
503 503 # type of file to stream
504 504 _fileappend = 0 # append only file
505 505 _filefull = 1 # full snapshot file
506 506
507 507 # Source of the file
508 508 _srcstore = b's' # store (svfs)
509 509 _srccache = b'c' # cache (cache)
510 510
511 511 # This is it's own function so extensions can override it.
512 512 def _walkstreamfullstorefiles(repo):
513 513 """list snapshot file from the store"""
514 514 fnames = []
515 515 if not repo.publishing():
516 516 fnames.append(b'phaseroots')
517 517 return fnames
518 518
519 519
520 520 def _filterfull(entry, copy, vfsmap):
521 521 """actually copy the snapshot files"""
522 522 src, name, ftype, data = entry
523 523 if ftype != _filefull:
524 524 return entry
525 525 return (src, name, ftype, copy(vfsmap[src].join(name)))
526 526
527 527
528 528 @contextlib.contextmanager
529 529 def maketempcopies():
530 530 """return a function to temporary copy file"""
531 531 files = []
532 532 try:
533 533
534 534 def copy(src):
535 535 fd, dst = pycompat.mkstemp()
536 536 os.close(fd)
537 537 files.append(dst)
538 538 util.copyfiles(src, dst, hardlink=True)
539 539 return dst
540 540
541 541 yield copy
542 542 finally:
543 543 for tmp in files:
544 544 util.tryunlink(tmp)
545 545
546 546
547 547 def _makemap(repo):
548 548 """make a (src -> vfs) map for the repo"""
549 549 vfsmap = {
550 550 _srcstore: repo.svfs,
551 551 _srccache: repo.cachevfs,
552 552 }
553 553 # we keep repo.vfs out of the on purpose, ther are too many danger there
554 554 # (eg: .hg/hgrc)
555 555 assert repo.vfs not in vfsmap.values()
556 556
557 557 return vfsmap
558 558
559 559
560 560 def _emit2(repo, entries, totalfilesize):
561 561 """actually emit the stream bundle"""
562 562 vfsmap = _makemap(repo)
563 563 progress = repo.ui.makeprogress(
564 564 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
565 565 )
566 566 progress.update(0)
567 567 with maketempcopies() as copy, progress:
568 568 # copy is delayed until we are in the try
569 569 entries = [_filterfull(e, copy, vfsmap) for e in entries]
570 570 yield None # this release the lock on the repository
571 571 seen = 0
572 572
573 573 for src, name, ftype, data in entries:
574 574 vfs = vfsmap[src]
575 575 yield src
576 576 yield util.uvarintencode(len(name))
577 577 if ftype == _fileappend:
578 578 fp = vfs(name)
579 579 size = data
580 580 elif ftype == _filefull:
581 581 fp = open(data, b'rb')
582 582 size = util.fstat(fp).st_size
583 583 try:
584 584 yield util.uvarintencode(size)
585 585 yield name
586 586 if size <= 65536:
587 587 chunks = (fp.read(size),)
588 588 else:
589 589 chunks = util.filechunkiter(fp, limit=size)
590 590 for chunk in chunks:
591 591 seen += len(chunk)
592 592 progress.update(seen)
593 593 yield chunk
594 594 finally:
595 595 fp.close()
596 596
597 597
598 598 def _test_sync_point_walk_1(repo):
599 599 """a function for synchronisation during tests"""
600 600
601 601
602 602 def _test_sync_point_walk_2(repo):
603 603 """a function for synchronisation during tests"""
604 604
605 605
606 def generatev2(repo, includes, excludes, includeobsmarkers):
607 """Emit content for version 2 of a streaming clone.
606 def _v2_walk(repo, includes, excludes, includeobsmarkers):
607 """emit a seris of files information useful to clone a repo
608
609 return (entries, totalfilesize)
610
611 entries is a list of tuple (vfs-key, file-path, file-type, size)
608 612
609 the data stream consists the following entries:
610 1) A char representing the file destination (eg: store or cache)
611 2) A varint containing the length of the filename
612 3) A varint containing the length of file data
613 4) N bytes containing the filename (the internal, store-agnostic form)
614 5) N bytes containing the file data
615
616 Returns a 3-tuple of (file count, file size, data iterator).
613 - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
614 - `name`: file path of the file to copy (to be feed to the vfss)
615 - `file-type`: do this file need to be copied with the source lock ?
616 - `size`: the size of the file (or None)
617 617 """
618
619 with repo.lock():
620
618 assert repo._currentlock(repo._lockref) is not None
621 619 entries = []
622 620 totalfilesize = 0
623 621
624 622 matcher = None
625 623 if includes or excludes:
626 624 matcher = narrowspec.match(repo.root, includes, excludes)
627 625
628 repo.ui.debug(b'scanning\n')
629 626 for rl_type, name, ename, size in _walkstreamfiles(repo, matcher):
630 627 if size:
631 628 ft = _fileappend
632 629 if rl_type & store.FILEFLAGS_VOLATILE:
633 630 ft = _filefull
634 631 entries.append((_srcstore, name, ft, size))
635 632 totalfilesize += size
636 633 for name in _walkstreamfullstorefiles(repo):
637 634 if repo.svfs.exists(name):
638 635 totalfilesize += repo.svfs.lstat(name).st_size
639 636 entries.append((_srcstore, name, _filefull, None))
640 637 if includeobsmarkers and repo.svfs.exists(b'obsstore'):
641 638 totalfilesize += repo.svfs.lstat(b'obsstore').st_size
642 639 entries.append((_srcstore, b'obsstore', _filefull, None))
643 640 for name in cacheutil.cachetocopy(repo):
644 641 if repo.cachevfs.exists(name):
645 642 totalfilesize += repo.cachevfs.lstat(name).st_size
646 643 entries.append((_srccache, name, _filefull, None))
644 return entries, totalfilesize
645
646
647 def generatev2(repo, includes, excludes, includeobsmarkers):
648 """Emit content for version 2 of a streaming clone.
649
650 the data stream consists the following entries:
651 1) A char representing the file destination (eg: store or cache)
652 2) A varint containing the length of the filename
653 3) A varint containing the length of file data
654 4) N bytes containing the filename (the internal, store-agnostic form)
655 5) N bytes containing the file data
656
657 Returns a 3-tuple of (file count, file size, data iterator).
658 """
659
660 with repo.lock():
661
662 repo.ui.debug(b'scanning\n')
663
664 entries, totalfilesize = _v2_walk(
665 repo,
666 includes=includes,
667 excludes=excludes,
668 includeobsmarkers=includeobsmarkers,
669 )
647 670
648 671 chunks = _emit2(repo, entries, totalfilesize)
649 672 first = next(chunks)
650 673 assert first is None
651 674 _test_sync_point_walk_1(repo)
652 675 _test_sync_point_walk_2(repo)
653 676
654 677 return len(entries), totalfilesize, chunks
655 678
656 679
657 680 @contextlib.contextmanager
658 681 def nested(*ctxs):
659 682 this = ctxs[0]
660 683 rest = ctxs[1:]
661 684 with this:
662 685 if rest:
663 686 with nested(*rest):
664 687 yield
665 688 else:
666 689 yield
667 690
668 691
669 692 def consumev2(repo, fp, filecount, filesize):
670 693 """Apply the contents from a version 2 streaming clone.
671 694
672 695 Data is read from an object that only needs to provide a ``read(size)``
673 696 method.
674 697 """
675 698 with repo.lock():
676 699 repo.ui.status(
677 700 _(b'%d files to transfer, %s of data\n')
678 701 % (filecount, util.bytecount(filesize))
679 702 )
680 703
681 704 start = util.timer()
682 705 progress = repo.ui.makeprogress(
683 706 _(b'clone'), total=filesize, unit=_(b'bytes')
684 707 )
685 708 progress.update(0)
686 709
687 710 vfsmap = _makemap(repo)
688 711
689 712 with repo.transaction(b'clone'):
690 713 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
691 714 with nested(*ctxs):
692 715 for i in range(filecount):
693 716 src = util.readexactly(fp, 1)
694 717 vfs = vfsmap[src]
695 718 namelen = util.uvarintdecodestream(fp)
696 719 datalen = util.uvarintdecodestream(fp)
697 720
698 721 name = util.readexactly(fp, namelen)
699 722
700 723 if repo.ui.debugflag:
701 724 repo.ui.debug(
702 725 b'adding [%s] %s (%s)\n'
703 726 % (src, name, util.bytecount(datalen))
704 727 )
705 728
706 729 with vfs(name, b'w') as ofp:
707 730 for chunk in util.filechunkiter(fp, limit=datalen):
708 731 progress.increment(step=len(chunk))
709 732 ofp.write(chunk)
710 733
711 734 # force @filecache properties to be reloaded from
712 735 # streamclone-ed file at next access
713 736 repo.invalidate(clearfilecache=True)
714 737
715 738 elapsed = util.timer() - start
716 739 if elapsed <= 0:
717 740 elapsed = 0.001
718 741 repo.ui.status(
719 742 _(b'transferred %s in %.1f seconds (%s/sec)\n')
720 743 % (
721 744 util.bytecount(progress.pos),
722 745 elapsed,
723 746 util.bytecount(progress.pos / elapsed),
724 747 )
725 748 )
726 749 progress.complete()
727 750
728 751
729 752 def applybundlev2(repo, fp, filecount, filesize, requirements):
730 753 from . import localrepo
731 754
732 755 missingreqs = [r for r in requirements if r not in repo.supported]
733 756 if missingreqs:
734 757 raise error.Abort(
735 758 _(b'unable to apply stream clone: unsupported format: %s')
736 759 % b', '.join(sorted(missingreqs))
737 760 )
738 761
739 762 consumev2(repo, fp, filecount, filesize)
740 763
741 764 # new requirements = old non-format requirements +
742 765 # new format-related remote requirements
743 766 # requirements from the streamed-in repository
744 767 repo.requirements = set(requirements) | (
745 768 repo.requirements - repo.supportedformats
746 769 )
747 770 repo.svfs.options = localrepo.resolvestorevfsoptions(
748 771 repo.ui, repo.requirements, repo.features
749 772 )
750 773 scmutil.writereporequirements(repo)
General Comments 0
You need to be logged in to leave comments. Login now