# streamclone.py - producing and consuming streaming repository data
#
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import annotations

import contextlib
import errno
import os
import struct

from typing import Optional

from .i18n import _
from .interfaces import repository

from . import (
    bookmarks,
    bundle2 as bundle2mod,
    cacheutil,
    error,
    narrowspec,
    phases,
    pycompat,
    requirements as requirementsmod,
    scmutil,
    store,
    transaction,
    util,
)
from .revlogutils import (
    nodemap,
)


def new_stream_clone_requirements(default_requirements, streamed_requirements):
    """determine the final set of requirements for a new stream clone

    This method combines the "default" requirements that a new repository
    would use with the constraints we get from the stream clone content. We
    keep local configuration choices when possible.
    """
    requirements = set(default_requirements)
    requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
    requirements.update(streamed_requirements)
    return requirements


def streamed_requirements(repo):
    """the set of requirements the new clone will have to support

    This is used for advertising the stream options and to generate the
    actual stream content."""
    requiredformats = (
        repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
    )
    return requiredformats
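

# Illustrative example only, using made-up requirement names: local-only
# configuration choices survive, while requirements fixed by the stream
# content are taken from the streamed set. This assumes, for the sake of the
# example, that b'stream-fixed-req' is part of STREAM_FIXED_REQUIREMENTS and
# b'local-only-req' is not.
#
#     new_stream_clone_requirements(
#         default_requirements={b'local-only-req', b'stream-fixed-req'},
#         streamed_requirements={b'other-stream-fixed-req'},
#     )
#     # -> {b'local-only-req', b'other-stream-fixed-req'}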


def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    # should we consider streaming clone at all ?
    streamrequested = pullop.streamclonerequested
    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable(b'stream-preferred')
    if not streamrequested:
        return False, None

    # Streaming clone only works on an empty destination repository
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    bundle2supported = False
    if pullop.canusebundle2:
        local_caps = bundle2mod.getrepocaps(repo, role=b'client')
        local_supported = set(local_caps.get(b'stream', []))
        remote_supported = set(pullop.remotebundle2caps.get(b'stream', []))
        bundle2supported = bool(local_supported & remote_supported)
    # else
    # Server doesn't support bundle2 stream clone or doesn't support
    # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    elif bundle2 and not bundle2supported:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable(b'stream'):
        requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
    else:
        streamreqs = remote.capable(b'streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but server has them '
                    b'disabled\n'
                )
            )
            return False, None

        streamreqs = set(streamreqs.split(b','))

        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supported
        if missingreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but client is missing '
                    b'requirements: %s\n'
                )
                % b', '.join(sorted(missingreqs))
            )
            pullop.repo.ui.warn(
                _(
                    b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                    b'for more information)\n'
                )
            )
            return False, None

        requirements = streamreqs

    return True, requirements


def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    from . import localrepo

    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable(b'branchmap'):
        with remote.commandexecutor() as e:
            rbranchmap = e.callcommand(b'branchmap', {}).result()

    repo.ui.status(_(b'streaming all changes\n'))

    with remote.commandexecutor() as e:
        fp = e.callcommand(b'stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )
    if resp == 1:
        raise error.Abort(_(b'operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_(b'locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_(b'the server sent an unknown error code'))

    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(b' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)
        repo.requirements = new_stream_clone_requirements(
            repo.requirements,
            requirements,
        )
        repo.svfs.options = localrepo.resolvestorevfsoptions(
            repo.ui, repo.requirements, repo.features
        )
        scmutil.writereporequirements(repo)
        nodemap.post_stream_cleanup(repo)

        if rbranchmap:
            repo._branchcaches.replace(repo, rbranchmap)

        repo.invalidate()


def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
        return False

    if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    secret = phases.hassecret(repo)
    if secret:
        return repo.ui.configbool(b'server', b'uncompressedallowsecret')

    return True


# This is its own function so extensions can override it.
def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
    return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)


def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug(b'scanning\n')
        _test_sync_point_walk_1_2(repo)
        for entry in _walkstreamfiles(repo):
            for f in entry.files():
                file_size = f.file_size(repo.store.vfs)
                if file_size:
                    entries.append((f.unencoded_path, file_size))
                    total_bytes += file_size
        _test_sync_point_walk_3(repo)
    _test_sync_point_walk_4(repo)

    repo.ui.debug(
        b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
    )

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        for name, size in entries:
            if debugflag:
                repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield b'%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, b'rb', auditpath=False) as fp:
                if size <= 65536:
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()
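

# Illustrative sketch only (not part of Mercurial's API): how a consumer
# could read back a single entry of the v1 data stream emitted by
# ``emitrevlogdata()`` above. The real consumer is ``consumev1()`` below;
# the helper name and its minimal error handling are assumptions made for
# documentation purposes.
def _example_read_v1_entry(fp):
    """read one (name, data) pair from a v1 stream file object"""
    # each entry starts with a header line: b"<encoded name>\0<size>\n"
    header = fp.readline()
    name, size = header.rstrip(b'\n').split(b'\0', 1)
    # the name is "dir" encoded on the wire for backwards compatibility
    name = store.decodedir(name)
    # the raw file content immediately follows the header line
    data = fp.read(int(size))
    return name, data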


def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield b'1\n'
        return

    try:
        filecount, bytecount, it = generatev1(repo)
    except error.LockError:
        yield b'2\n'
        return

    # Indicates successful response.
    yield b'0\n'
    yield b'%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk


def generatebundlev1(repo, compression=b'UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes are a 16-bit big endian unsigned short declaring the
    length of the requirements string, including a trailing \0. The following
    N bytes are the requirements string, which is ASCII containing a
    comma-delimited list of repo requirements that are needed to support the
    data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != b'UN':
        raise ValueError(b'we do not support the compression argument yet')

    requirements = streamed_requirements(repo)
    requires = b','.join(sorted(requirements))

    def gen():
        yield b'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(
            _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
        )

        yield struct.pack(b'>QQ', filecount, bytecount)
        yield struct.pack(b'>H', len(requires) + 1)
        yield requires + b'\0'

        # This is where we'll add compression in the future.
        assert compression == b'UN'

        progress = repo.ui.makeprogress(
            _(b'bundle'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)

        for chunk in it:
            progress.increment(step=len(chunk))
            yield chunk

        progress.complete()

    return requirements, gen()
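

# Illustrative sketch only: the fixed header that precedes the v1 payload in
# a "HGS1" stream clone bundle, as described in ``generatebundlev1()``. The
# helper name is an assumption made for documentation purposes; the matching
# reader in this module is ``readbundle1header()``.
def _example_bundle_v1_header(filecount, bytecount, requirements):
    """return the header bytes for an uncompressed stream clone bundle"""
    requires = b','.join(sorted(requirements)) + b'\0'
    return b''.join(
        [
            b'HGS1',  # magic: stream clone bundle, version 1
            b'UN',  # compression type (only "UN" is currently supported)
            struct.pack(b'>QQ', filecount, bytecount),
            struct.pack(b'>H', len(requires)),  # length includes trailing \0
            requires,
        ]
    )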


def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(bytecount))
        )
        progress = repo.ui.makeprogress(
            _(b'clone'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction(b'clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in range(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    if hasattr(fp, 'readline'):
                        l = fp.readline()
                    else:
                        # inline clonebundles use a chunkbuffer, so no readline
                        # --> this should be small anyway, the first line
                        # only contains the size of the bundle
                        l_buf = []
                        while not (l_buf and l_buf[-1] == b'\n'):
                            l_buf.append(fp.read(1))
                        l = b''.join(l_buf)
                    try:
                        name, size = l.split(b'\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _(b'unexpected response from remote server:'), l
                        )
                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding %s (%s)\n' % (name, util.bytecount(size))
                        )
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, b'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        progress.complete()
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(bytecount),
                elapsed,
                util.bytecount(bytecount / elapsed),
            )
        )


def readbundle1header(fp):
    compression = fp.read(2)
    if compression != b'UN':
        raise error.Abort(
            _(
                b'only uncompressed stream clone bundles are '
                b'supported; got %s'
            )
            % compression
        )

    filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
    requireslen = struct.unpack(b'>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith(b'\0'):
        raise error.Abort(
            _(
                b'malformed stream clone bundle: '
                b'requirements not properly encoded'
            )
        )

    requirements = set(requires.rstrip(b'\0').split(b','))

    return filecount, bytecount, requirements


def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file
    handle is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(
            _(b'cannot apply stream clone bundle on non-empty repo')
        )

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supported
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev1(repo, fp, filecount, bytecount)
    nodemap.post_stream_cleanup(repo)


class streamcloneapplier:
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """

    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)


# type of file to stream
_fileappend = 0  # append only file
_filefull = 1  # full snapshot file

# Source of the file
_srcstore = b's'  # store (svfs)
_srccache = b'c'  # cache (cache)


# This is its own function so extensions can override it.
def _walkstreamfullstorefiles(repo):
    """list snapshot files from the store"""
    fnames = []
    if not repo.publishing():
        fnames.append(b'phaseroots')
    return fnames


def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files"""
    src, name, ftype, data = entry
    if ftype != _filefull:
        return entry
    return (src, name, ftype, copy(vfsmap[src].join(name)))


class VolatileManager:
    """Manage temporary backups of volatile files during stream clone.

    This class will keep open file handles for the volatile files, writing the
    smaller ones on disk if the number of open file handles grows too much.

    This should be used as a Python context manager; the file handles and
    copies will be discarded when exiting the context.

    The preservation can be done by calling the object on the real path
    (encoded full path).

    Valid filehandles for any file should be retrieved by calling `open(path)`.
    """

    # arbitrarily picked as "it seemed fine" and much higher than the current
    # usage. The Windows value of 2 is actually 1 file open at a time, due to
    # the `flush_count = self.MAX_OPEN // 2` and `self.MAX_OPEN - 1` threshold
    # for flushing to disk in __call__().
    MAX_OPEN = 2 if pycompat.iswindows else 100

    def __init__(self):
        self._counter = 0
        self._volatile_fps = None
        self._copies = None
        self._dst_dir = None

    def __enter__(self):
        if self._counter == 0:
            assert self._volatile_fps is None
            self._volatile_fps = {}
        self._counter += 1
        return self

    def __exit__(self, *args, **kwargs):
        """discard all backups"""
        self._counter -= 1
        if self._counter == 0:
            for _size, fp in self._volatile_fps.values():
                fp.close()
            self._volatile_fps = None
            if self._copies is not None:
                for tmp in self._copies.values():
                    util.tryunlink(tmp)
                util.tryrmdir(self._dst_dir)
            self._copies = None
            self._dst_dir = None
            assert self._volatile_fps is None
            assert self._copies is None
            assert self._dst_dir is None

    def _init_tmp_copies(self):
        """prepare a temporary directory to save volatile files

        This will be used as backup if we have too many files open"""
        assert 0 < self._counter
        assert self._copies is None
        assert self._dst_dir is None
        self._copies = {}
        self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')

    def _flush_some_on_disk(self):
        """move some of the open files to temporary files on disk"""
        if self._copies is None:
            self._init_tmp_copies()
        flush_count = self.MAX_OPEN // 2
        for src, (size, fp) in sorted(self._volatile_fps.items())[:flush_count]:
            prefix = os.path.basename(src)
            fd, dst = pycompat.mkstemp(prefix=prefix, dir=self._dst_dir)
            self._copies[src] = dst
            os.close(fd)
            # we no longer hardlink, but on the other hand we rarely do this,
            # and we do it for the smallest file only and not at all in the
            # common case.
            with open(dst, 'wb') as bck:
                fp.seek(0)
                bck.write(fp.read())
            del self._volatile_fps[src]
            fp.close()

    def _keep_one(self, src):
        """preserve an open file handle for a given path"""
        # store the file quickly to ensure we close it if any error happens
        _, fp = self._volatile_fps[src] = (None, open(src, 'rb'))
        fp.seek(0, os.SEEK_END)
        size = fp.tell()
        self._volatile_fps[src] = (size, fp)
        return size

    def __call__(self, src):
        """preserve the volatile file at src"""
        assert 0 < self._counter
        if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
            self._flush_some_on_disk()
        self._keep_one(src)

    def try_keep(self, src) -> Optional[int]:
        """record a volatile file and return its size

        return None if the file does not exist.

        Used for cache files that are not lock protected.
        """
        assert 0 < self._counter
        if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
            self._flush_some_on_disk()
        try:
            return self._keep_one(src)
        except IOError as err:
            if err.errno not in (errno.ENOENT, errno.EPERM):
                raise
            return None

    @contextlib.contextmanager
    def open(self, src):
        assert 0 < self._counter
        entry = self._volatile_fps.get(src)
        if entry is not None:
            _size, fp = entry
            fp.seek(0)
            yield fp
        else:
            if self._copies is None:
                actual_path = src
            else:
                actual_path = self._copies.get(src, src)
            with open(actual_path, 'rb') as fp:
                yield fp
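

# Illustrative sketch only (not used elsewhere in this module): the usage
# pattern the stream generators below follow with ``VolatileManager``. The
# function name and the ``path`` argument are assumptions made for
# documentation purposes.
def _example_volatile_usage(path):
    """preserve the file at ``path``, then read the preserved content back"""
    with VolatileManager() as volatiles:
        # register the file while its content is known to be consistent
        # (the real callers do this while holding the repository lock)
        volatiles(path)
        # later, possibly after the on-disk file changed or vanished, the
        # preserved content can still be read back
        with volatiles.open(path) as fp:
            return fp.read()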


def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()
    return vfsmap


def _emit2(repo, entries):
    """actually emit the stream bundle"""
    vfsmap = _makemap(repo)
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc),
    #
    # this assert is duplicated (from _makemap) as authors might think this is
    # fine, while this is really not fine.
    if repo.vfs in vfsmap.values():
        raise error.ProgrammingError(
            b'repo.vfs must not be added to vfsmap for security reasons'
        )

    # translate the vfs once
    entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
    _test_sync_point_walk_1_2(repo)

    max_linkrev = len(repo)
    file_count = totalfilesize = 0
    with VolatileManager() as volatiles:
        # make sure we preserve volatile files
        with util.nogc():
            # record the expected size of every file
            for k, vfs, e in entries:
                e.preserve_volatiles(vfs, volatiles)
                for f in e.files():
                    file_count += 1
                    totalfilesize += f.file_size(vfs)

        progress = repo.ui.makeprogress(
            _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
        )
        progress.update(0)
        with progress:
            # the first yield releases the lock on the repository
            yield file_count, totalfilesize
            totalbytecount = 0

            for src, vfs, e in entries:
                entry_streams = e.get_streams(
                    repo=repo,
                    vfs=vfs,
                    volatiles=volatiles,
                    max_changeset=max_linkrev,
                    preserve_file_count=True,
                )
                for name, stream, size in entry_streams:
                    yield src
                    yield util.uvarintencode(len(name))
                    yield util.uvarintencode(size)
                    yield name
                    bytecount = 0
                    for chunk in stream:
                        bytecount += len(chunk)
                        totalbytecount += len(chunk)
                        progress.update(totalbytecount)
                        yield chunk
                    if bytecount != size:
                        # Would most likely be caused by a race due to `hg
                        # strip` or a revlog split
                        msg = _(
                            b'clone could only read %d bytes from %s, but '
                            b'expected %d bytes'
                        )
                        raise error.Abort(msg % (bytecount, name, size))


def _emit3(repo, entries):
    """actually emit the stream bundle (v3)"""
    vfsmap = _makemap(repo)
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc),
    #
    # this assert is duplicated (from _makemap) as authors might think this is
    # fine, while this is really not fine.
    if repo.vfs in vfsmap.values():
        raise error.ProgrammingError(
            b'repo.vfs must not be added to vfsmap for security reasons'
        )

    # translate the vfs once
    # we only turn this into a list for the `_test_sync`, this is not ideal
    base_entries = list(entries)
    _test_sync_point_walk_1_2(repo)
    entries = []
    with VolatileManager() as volatiles:
        # make sure we preserve volatile files
        for vfs_key, e in base_entries:
            vfs = vfsmap[vfs_key]
            any_files = True
            if e.maybe_volatile:
                any_files = False
                e.preserve_volatiles(vfs, volatiles)
                for f in e.files():
                    if f.is_volatile:
                        # record the expected size under lock
                        f.file_size(vfs)
                    any_files = True
            if any_files:
                entries.append((vfs_key, vfsmap[vfs_key], e))

        total_entry_count = len(entries)
        max_linkrev = len(repo)
        progress = repo.ui.makeprogress(
            _(b'bundle'),
            total=total_entry_count,
            unit=_(b'entry'),
        )
        progress.update(0)
        # the first yield releases the lock on the repository
        yield None
        with progress:
            yield util.uvarintencode(total_entry_count)

            for src, vfs, e in entries:
                entry_streams = e.get_streams(
                    repo=repo,
                    vfs=vfs,
                    volatiles=volatiles,
                    max_changeset=max_linkrev,
                )
                yield util.uvarintencode(len(entry_streams))
                for name, stream, size in entry_streams:
                    yield src
                    yield util.uvarintencode(len(name))
                    yield util.uvarintencode(size)
                    yield name
                    yield from stream
                    progress.increment()


def _test_sync_point_walk_1_2(repo):
    """a function for synchronisation during tests

    Triggered after gathering entries, but before starting to process/preserve
    them under lock.

    (on v1 it is triggered before the actual walk starts)
    """


def _test_sync_point_walk_3(repo):
    """a function for synchronisation during tests

    Triggered right before releasing the lock, but after computing what needed
    to be computed under lock.
    """


def _test_sync_point_walk_4(repo):
    """a function for synchronisation during tests

    Triggered right after releasing the lock.
    """


# not really a StoreEntry, but close enough
class CacheEntry(store.SimpleStoreEntry):
    """Represent an entry for Cache files

    It has special logic to preserve cache files early and accept optional
    presence.

    (Yes... this is not really a StoreEntry, but close enough. We could have a
    BaseEntry base class, but the store one would be identical)
    """

    def __init__(self, entry_path):
        super().__init__(
            entry_path,
            # we will directly deal with that in `setup_cache_file`
            is_volatile=True,
        )

    def preserve_volatiles(self, vfs, volatiles):
        self._file_size = volatiles.try_keep(vfs.join(self._entry_path))
        if self._file_size is None:
            self._files = []
        else:
            assert self._is_volatile
            self._files = [
                CacheFile(
                    unencoded_path=self._entry_path,
                    file_size=self._file_size,
                    is_volatile=self._is_volatile,
                )
            ]

    def files(self):
        if self._files is None:
            self._files = [
                CacheFile(
                    unencoded_path=self._entry_path,
                    is_volatile=self._is_volatile,
                )
            ]
        return super().files()


class CacheFile(store.StoreFile):
    # inform the "copy/hardlink" version that this file might be missing
    # without consequences.
    optional = True


def _entries_walk(repo, includes, excludes, includeobsmarkers):
    """emit a series of file information useful to clone a repo

    return (vfs-key, entry) iterator

    Where `entry` is StoreEntry. (used even for cache entries)
    """
    assert repo._currentlock(repo._lockref) is not None

    matcher = None
    if includes or excludes:
        matcher = narrowspec.match(repo.root, includes, excludes)

    phase = not repo.publishing()
    # Python is getting crazy at all the small containers we create, disabling
    # the gc while we do so helps performance a lot.
    with util.nogc():
        entries = _walkstreamfiles(
            repo,
            matcher,
            phase=phase,
            obsolescence=includeobsmarkers,
        )
        for entry in entries:
            yield (_srcstore, entry)

        for name in cacheutil.cachetocopy(repo):
            if repo.cachevfs.exists(name):
                # not really a StoreEntry, but close enough
                yield (_srccache, CacheEntry(entry_path=name))


def generatev2(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 2 of a streaming clone.

    the data stream consists of the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint N containing the length of the filename
    3) A varint M containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) M bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """
    with repo.lock():
        repo.ui.debug(b'scanning\n')

        entries = _entries_walk(
            repo,
            includes=includes,
            excludes=excludes,
            includeobsmarkers=includeobsmarkers,
        )
        chunks = _emit2(repo, entries)
        first = next(chunks)
        file_count, total_file_size = first
        _test_sync_point_walk_3(repo)
    _test_sync_point_walk_4(repo)

    return file_count, total_file_size, chunks


def generatev3(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 3 of a streaming clone.

    the data stream consists of the following:
    1) A varint E containing the number of entries (can be 0), then E entries follow
    2) For each entry:
    2.1) The number of files in this entry (can be 0, but typically 1 or 2)
    2.2) For each file:
    2.2.1) A char representing the file destination (eg: store or cache)
    2.2.2) A varint N containing the length of the filename
    2.2.3) A varint M containing the length of file data
    2.2.4) N bytes containing the filename (the internal, store-agnostic form)
    2.2.5) M bytes containing the file data

    Returns the data iterator.

    XXX This format is experimental and subject to change. Here is a
    XXX non-exhaustive list of things this format could do or change:

    - making it easier to write files in parallel
    - holding the lock for a shorter time
    - improving progress information
    - ways to adjust the number of expected entries/files ?
    """
    # Python is getting crazy at all the small containers we create while
    # considering the files to preserve, disabling the gc while we do so helps
    # performance a lot.
    with repo.lock(), util.nogc():
        repo.ui.debug(b'scanning\n')

        entries = _entries_walk(
            repo,
            includes=includes,
            excludes=excludes,
            includeobsmarkers=includeobsmarkers,
        )
        chunks = _emit3(repo, list(entries))
        first = next(chunks)
        assert first is None
        _test_sync_point_walk_3(repo)
    _test_sync_point_walk_4(repo)

    return chunks
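

# Illustrative sketch only: how the nested entry/file layout of the v3 stream
# described in ``generatev3()`` can be walked from a file object. The real
# consumer is ``consumev3()`` below; the helper name is an assumption made
# for documentation purposes.
def _example_read_v3_stream(fp):
    """yield (destination, name, data) triples from a v3 stream"""
    entrycount = util.uvarintdecodestream(fp)
    for _entry in range(entrycount):
        filecount = util.uvarintdecodestream(fp)
        for _file in range(filecount):
            src = util.readexactly(fp, 1)  # b's' (store) or b'c' (cache)
            namelen = util.uvarintdecodestream(fp)
            datalen = util.uvarintdecodestream(fp)
            name = util.readexactly(fp, namelen)
            data = util.readexactly(fp, datalen)
            yield src, name, data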


@contextlib.contextmanager
def nested(*ctxs):
    this = ctxs[0]
    rest = ctxs[1:]
    with this:
        if rest:
            with nested(*rest):
                yield
        else:
            yield


def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(filesize))
        )

        start = util.timer()
        progress = repo.ui.makeprogress(
            _(b'clone'), total=filesize, unit=_(b'bytes')
        )
        progress.update(0)

        vfsmap = _makemap(repo)
        # we keep repo.vfs out of the map on purpose, there are too many
        # dangers there (eg: .hg/hgrc),
        #
        # this assert is duplicated (from _makemap) as authors might think
        # this is fine, while this is really not fine.
        if repo.vfs in vfsmap.values():
            raise error.ProgrammingError(
                b'repo.vfs must not be added to vfsmap for security reasons'
            )

        with repo.transaction(b'clone'):
            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding [%s] %s (%s)\n'
                            % (src, name, util.bytecount(datalen))
                        )

                    with vfs(name, b'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(progress.pos),
                elapsed,
                util.bytecount(progress.pos / elapsed),
            )
        )
        progress.complete()


def consumev3(repo, fp):
    """Apply the contents from a version 3 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        start = util.timer()

        entrycount = util.uvarintdecodestream(fp)
        repo.ui.status(_(b'%d entries to transfer\n') % (entrycount))

        progress = repo.ui.makeprogress(
            _(b'clone'),
            total=entrycount,
            unit=_(b'entries'),
        )
        progress.update(0)
        bytes_transferred = 0

        vfsmap = _makemap(repo)
        # we keep repo.vfs out of the map on purpose, there are too many
        # dangers there (eg: .hg/hgrc),
        #
        # this assert is duplicated (from _makemap) as authors might think
        # this is fine, while this is really not fine.
        if repo.vfs in vfsmap.values():
            raise error.ProgrammingError(
                b'repo.vfs must not be added to vfsmap for security reasons'
            )

        with repo.transaction(b'clone'):
            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(entrycount):
                    filecount = util.uvarintdecodestream(fp)
                    if filecount == 0:
                        if repo.ui.debugflag:
                            repo.ui.debug(b'entry with no files [%d]\n' % (i))
                    for i in range(filecount):
                        src = util.readexactly(fp, 1)
                        vfs = vfsmap[src]
                        namelen = util.uvarintdecodestream(fp)
                        datalen = util.uvarintdecodestream(fp)

                        name = util.readexactly(fp, namelen)

                        if repo.ui.debugflag:
                            msg = b'adding [%s] %s (%s)\n'
                            msg %= (src, name, util.bytecount(datalen))
                            repo.ui.debug(msg)
                        bytes_transferred += datalen

                        with vfs(name, b'w') as ofp:
                            for chunk in util.filechunkiter(fp, limit=datalen):
                                ofp.write(chunk)
                    progress.increment(step=1)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        msg = _(b'transferred %s in %.1f seconds (%s/sec)\n')
        byte_count = util.bytecount(bytes_transferred)
        bytes_sec = util.bytecount(bytes_transferred / elapsed)
        msg %= (byte_count, elapsed, bytes_sec)
        repo.ui.status(msg)
        progress.complete()


def applybundlev2(repo, fp, filecount, filesize, requirements):
    from . import localrepo

    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev2(repo, fp, filecount, filesize)

    repo.requirements = new_stream_clone_requirements(
        repo.requirements,
        requirements,
    )
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features
    )
    scmutil.writereporequirements(repo)
    nodemap.post_stream_cleanup(repo)


def applybundlev3(repo, fp, requirements):
    from . import localrepo

    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        msg = _(b'unable to apply stream clone: unsupported format: %s')
        msg %= b', '.join(sorted(missingreqs))
        raise error.Abort(msg)

    consumev3(repo, fp)

    repo.requirements = new_stream_clone_requirements(
        repo.requirements,
        requirements,
    )
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features
    )
    scmutil.writereporequirements(repo)
    nodemap.post_stream_cleanup(repo)


def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
    hardlink = [True]

    def copy_used():
        hardlink[0] = False
        progress.topic = _(b'copying')

    for k, path, optional in entries:
        src_vfs = src_vfs_map[k]
        dst_vfs = dst_vfs_map[k]
        src_path = src_vfs.join(path)
        dst_path = dst_vfs.join(path)
        # We cannot use dirname and makedirs of dst_vfs here because the store
        # encoding confuses them. See issue 6581 for details.
        dirname = os.path.dirname(dst_path)
        if not os.path.exists(dirname):
            util.makedirs(dirname)
        dst_vfs.register_file(path)
        # XXX we could use the #nb_bytes argument.
        try:
            util.copyfile(
                src_path,
                dst_path,
                hardlink=hardlink[0],
                no_hardlink_cb=copy_used,
                check_fs_hardlink=False,
            )
        except FileNotFoundError:
            if not optional:
                raise
        progress.increment()
    return hardlink[0]


def local_copy(src_repo, dest_repo):
    """copy all content from one local repository to another

    This is useful for local clone"""
    src_store_requirements = {
        r
        for r in src_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    dest_store_requirements = {
        r
        for r in dest_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    assert src_store_requirements == dest_store_requirements

    with dest_repo.lock():
        with src_repo.lock():
            # bookmarks are not integrated in the streaming as they might use
            # the `repo.vfs`, and there is too much sensitive data accessible
            # through `repo.vfs` to expose it to streaming clone.
            src_book_vfs = bookmarks.bookmarksvfs(src_repo)
            srcbookmarks = src_book_vfs.join(b'bookmarks')
            bm_count = 0
            if os.path.exists(srcbookmarks):
                bm_count = 1

            entries = _entries_walk(
                src_repo,
                includes=None,
                excludes=None,
                includeobsmarkers=True,
            )
            entries = list(entries)
            src_vfs_map = _makemap(src_repo)
            dest_vfs_map = _makemap(dest_repo)
            total_files = sum(len(e[1].files()) for e in entries) + bm_count
            progress = src_repo.ui.makeprogress(
                topic=_(b'linking'),
                total=total_files,
                unit=_(b'files'),
            )
            # copy files
            #
            # We could copy the full file while the source repository is locked
            # and the other one without the lock. However, in the linking case,
            # this would also require checks that nobody is appending any data
            # to the files while we do the clone, so this is not done yet. We
            # could do this blindly when copying files.
            files = [
                (vfs_key, f.unencoded_path, f.optional)
                for vfs_key, e in entries
                for f in e.files()
            ]
            hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)

            # copy bookmarks over
            if bm_count:
                dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
                dstbookmarks = dst_book_vfs.join(b'bookmarks')
                util.copyfile(srcbookmarks, dstbookmarks)
        progress.complete()
        if hardlink:
            msg = b'linked %d files\n'
        else:
            msg = b'copied %d files\n'
        src_repo.ui.debug(msg % total_files)

        with dest_repo.transaction(b"localclone") as tr:
            dest_repo.store.write(tr)

        # clean up transaction file as they do not make sense
        transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)