streamclone.py
887 lines
| 28.6 KiB
| text/x-python
|
PythonLexer
/ mercurial / streamclone.py
Gregory Szorc
|
r26441 | # streamclone.py - producing and consuming streaming repository data | ||
# | ||||
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com> | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
from __future__ import absolute_import | ||||
Boris Feld
|
r35783 | import contextlib | ||
r48240 | import errno | |||
Boris Feld
|
r35783 | import os | ||
Gregory Szorc
|
r26755 | import struct | ||
Gregory Szorc
|
r26443 | |||
Gregory Szorc
|
r26442 | from .i18n import _ | ||
Gregory Szorc
|
r43355 | from .pycompat import open | ||
Augie Fackler
|
r43346 | from .interfaces import repository | ||
Gregory Szorc
|
r26441 | from . import ( | ||
r48240 | bookmarks, | |||
Boris Feld
|
r35785 | cacheutil, | ||
Gregory Szorc
|
r26442 | error, | ||
Pulkit Goyal
|
r40375 | narrowspec, | ||
Gregory Szorc
|
r32744 | phases, | ||
Yuya Nishihara
|
r38182 | pycompat, | ||
Raphaël Gomès
|
r47371 | requirements as requirementsmod, | ||
Pulkit Goyal
|
r45666 | scmutil, | ||
Gregory Szorc
|
r26443 | store, | ||
Gregory Szorc
|
r26442 | util, | ||
Gregory Szorc
|
r26441 | ) | ||
r48240 | from .utils import ( | |||
stringutil, | ||||
) | ||||
Gregory Szorc
|
r26441 | |||
Augie Fackler
|
r43346 | |||
Boris Feld
|
r35775 | def canperformstreamclone(pullop, bundle2=False): | ||
Gregory Szorc
|
r26446 | """Whether it is possible to perform a streaming clone as part of pull. | ||
Gregory Szorc
|
r26445 | |||
Boris Feld
|
r35775 | ``bundle2`` will cause the function to consider stream clone through | ||
bundle2 and only through bundle2. | ||||
Gregory Szorc
|
r26467 | |||
Gregory Szorc
|
r26446 | Returns a tuple of (supported, requirements). ``supported`` is True if | ||
streaming clone is supported and False otherwise. ``requirements`` is | ||||
a set of repo requirements from the remote, or ``None`` if stream clone | ||||
isn't supported. | ||||
""" | ||||
Gregory Szorc
|
r26466 | repo = pullop.repo | ||
remote = pullop.remote | ||||
Gregory Szorc
|
r26467 | bundle2supported = False | ||
if pullop.canusebundle2: | ||||
Augie Fackler
|
r43347 | if b'v2' in pullop.remotebundle2caps.get(b'stream', []): | ||
Gregory Szorc
|
r26467 | bundle2supported = True | ||
# else | ||||
Augie Fackler
|
r43346 | # Server doesn't support bundle2 stream clone or doesn't support | ||
# the versions we support. Fall back and possibly allow legacy. | ||||
Gregory Szorc
|
r26467 | |||
# Ensures legacy code path uses available bundle2. | ||||
Boris Feld
|
r35775 | if bundle2supported and not bundle2: | ||
Gregory Szorc
|
r26467 | return False, None | ||
# Ensures bundle2 doesn't try to do a stream clone if it isn't supported. | ||||
Boris Feld
|
r35775 | elif bundle2 and not bundle2supported: | ||
return False, None | ||||
Gregory Szorc
|
r26467 | |||
Gregory Szorc
|
r26447 | # Streaming clone only works on empty repositories. | ||
if len(repo): | ||||
return False, None | ||||
Gregory Szorc
|
r26446 | # Streaming clone only works if all data is being requested. | ||
Gregory Szorc
|
r26466 | if pullop.heads: | ||
Gregory Szorc
|
r26446 | return False, None | ||
Gregory Szorc
|
r26445 | |||
Gregory Szorc
|
r26466 | streamrequested = pullop.streamclonerequested | ||
Gregory Szorc
|
r26446 | # If we don't have a preference, let the server decide for us. This | ||
# likely only comes into play in LANs. | ||||
if streamrequested is None: | ||||
# The server can advertise whether to prefer streaming clone. | ||||
Augie Fackler
|
r43347 | streamrequested = remote.capable(b'stream-preferred') | ||
Gregory Szorc
|
r26446 | |||
if not streamrequested: | ||||
return False, None | ||||
Gregory Szorc
|
r26445 | |||
Gregory Szorc
|
r26446 | # In order for stream clone to work, the client has to support all the | ||
# requirements advertised by the server. | ||||
# | ||||
# The server advertises its requirements via the "stream" and "streamreqs" | ||||
# capability. "stream" (a value-less capability) is advertised if and only | ||||
# if the only requirement is "revlogv1." Else, the "streamreqs" capability | ||||
# is advertised and contains a comma-delimited list of requirements. | ||||
requirements = set() | ||||
Augie Fackler
|
r43347 | if remote.capable(b'stream'): | ||
Raphaël Gomès
|
r47371 | requirements.add(requirementsmod.REVLOGV1_REQUIREMENT) | ||
Gregory Szorc
|
r26446 | else: | ||
Augie Fackler
|
r43347 | streamreqs = remote.capable(b'streamreqs') | ||
Gregory Szorc
|
r26446 | # This is weird and shouldn't happen with modern servers. | ||
if not streamreqs: | ||||
Augie Fackler
|
r43346 | pullop.repo.ui.warn( | ||
_( | ||||
Augie Fackler
|
r43347 | b'warning: stream clone requested but server has them ' | ||
b'disabled\n' | ||||
Augie Fackler
|
r43346 | ) | ||
) | ||||
Gregory Szorc
|
r26446 | return False, None | ||
Augie Fackler
|
r43347 | streamreqs = set(streamreqs.split(b',')) | ||
Gregory Szorc
|
r26446 | # Server requires something we don't support. Bail. | ||
Siddharth Agarwal
|
r32259 | missingreqs = streamreqs - repo.supportedformats | ||
if missingreqs: | ||||
pullop.repo.ui.warn( | ||||
Augie Fackler
|
r43346 | _( | ||
Augie Fackler
|
r43347 | b'warning: stream clone requested but client is missing ' | ||
b'requirements: %s\n' | ||||
Augie Fackler
|
r43346 | ) | ||
Augie Fackler
|
r43347 | % b', '.join(sorted(missingreqs)) | ||
Augie Fackler
|
r43346 | ) | ||
pullop.repo.ui.warn( | ||||
_( | ||||
Augie Fackler
|
r43347 | b'(see https://www.mercurial-scm.org/wiki/MissingRequirement ' | ||
b'for more information)\n' | ||||
Augie Fackler
|
r43346 | ) | ||
) | ||||
Gregory Szorc
|
r26446 | return False, None | ||
requirements = streamreqs | ||||
return True, requirements | ||||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r26462 | def maybeperformlegacystreamclone(pullop): | ||
"""Possibly perform a legacy stream clone operation. | ||||
Legacy stream clones are performed as part of pull but before all other | ||||
operations. | ||||
A legacy stream clone will not be performed if a bundle2 stream clone is | ||||
supported. | ||||
""" | ||||
Gregory Szorc
|
r39736 | from . import localrepo | ||
Gregory Szorc
|
r26466 | supported, requirements = canperformstreamclone(pullop) | ||
Gregory Szorc
|
r26458 | |||
Gregory Szorc
|
r26446 | if not supported: | ||
return | ||||
Gregory Szorc
|
r26466 | repo = pullop.repo | ||
remote = pullop.remote | ||||
Gregory Szorc
|
r26459 | # Save remote branchmap. We will use it later to speed up branchcache | ||
# creation. | ||||
rbranchmap = None | ||||
Augie Fackler
|
r43347 | if remote.capable(b'branchmap'): | ||
Gregory Szorc
|
r37656 | with remote.commandexecutor() as e: | ||
Augie Fackler
|
r43347 | rbranchmap = e.callcommand(b'branchmap', {}).result() | ||
Gregory Szorc
|
r26459 | |||
Augie Fackler
|
r43347 | repo.ui.status(_(b'streaming all changes\n')) | ||
Gregory Szorc
|
r26470 | |||
Gregory Szorc
|
r37656 | with remote.commandexecutor() as e: | ||
Augie Fackler
|
r43347 | fp = e.callcommand(b'stream_out', {}).result() | ||
Gregory Szorc
|
r37656 | |||
# TODO strictly speaking, this code should all be inside the context | ||||
# manager because the context manager is supposed to ensure all wire state | ||||
# is flushed when exiting. But the legacy peers don't do this, so it | ||||
# doesn't matter. | ||||
Gregory Szorc
|
r26459 | l = fp.readline() | ||
try: | ||||
resp = int(l) | ||||
except ValueError: | ||||
raise error.ResponseError( | ||||
Augie Fackler
|
r43347 | _(b'unexpected response from remote server:'), l | ||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r26459 | if resp == 1: | ||
Augie Fackler
|
r43347 | raise error.Abort(_(b'operation forbidden by server')) | ||
Gregory Szorc
|
r26459 | elif resp == 2: | ||
Augie Fackler
|
r43347 | raise error.Abort(_(b'locking the remote repository failed')) | ||
Gregory Szorc
|
r26459 | elif resp != 0: | ||
Augie Fackler
|
r43347 | raise error.Abort(_(b'the server sent an unknown error code')) | ||
Gregory Szorc
|
r26459 | |||
Gregory Szorc
|
r26468 | l = fp.readline() | ||
try: | ||||
Augie Fackler
|
r43347 | filecount, bytecount = map(int, l.split(b' ', 1)) | ||
Gregory Szorc
|
r26468 | except (ValueError, TypeError): | ||
raise error.ResponseError( | ||||
Augie Fackler
|
r43347 | _(b'unexpected response from remote server:'), l | ||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r26468 | |||
Bryan O'Sullivan
|
r27850 | with repo.lock(): | ||
Gregory Szorc
|
r26468 | consumev1(repo, fp, filecount, bytecount) | ||
Gregory Szorc
|
r26461 | |||
# new requirements = old non-format requirements + | ||||
# new format-related remote requirements | ||||
# requirements from the streamed-in repository | ||||
repo.requirements = requirements | ( | ||||
Augie Fackler
|
r43346 | repo.requirements - repo.supportedformats | ||
) | ||||
Gregory Szorc
|
r39736 | repo.svfs.options = localrepo.resolvestorevfsoptions( | ||
Augie Fackler
|
r43346 | repo.ui, repo.requirements, repo.features | ||
) | ||||
Pulkit Goyal
|
r45666 | scmutil.writereporequirements(repo) | ||
Gregory Szorc
|
r26461 | |||
if rbranchmap: | ||||
Martijn Pieters
|
r41764 | repo._branchcaches.replace(repo, rbranchmap) | ||
Gregory Szorc
|
r26461 | |||
repo.invalidate() | ||||
Gregory Szorc
|
r26445 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r32744 | def allowservergeneration(repo): | ||
Gregory Szorc
|
r26444 | """Whether streaming clones are allowed from the server.""" | ||
Gregory Szorc
|
r40064 | if repository.REPO_FEATURE_STREAM_CLONE not in repo.features: | ||
return False | ||||
Augie Fackler
|
r43347 | if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True): | ||
Gregory Szorc
|
r32744 | return False | ||
# The way stream clone works makes it impossible to hide secret changesets. | ||||
# So don't allow this by default. | ||||
secret = phases.hassecret(repo) | ||||
if secret: | ||||
Augie Fackler
|
r43347 | return repo.ui.configbool(b'server', b'uncompressedallowsecret') | ||
Gregory Szorc
|
r32744 | |||
return True | ||||
Gregory Szorc
|
r26444 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r26443 | # This is it's own function so extensions can override it. | ||
Pulkit Goyal
|
r40375 | def _walkstreamfiles(repo, matcher=None): | ||
return repo.store.walk(matcher) | ||||
Gregory Szorc
|
r26443 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r26443 | def generatev1(repo): | ||
"""Emit content for version 1 of a streaming clone. | ||||
Gregory Szorc
|
r26469 | This returns a 3-tuple of (file count, byte size, data iterator). | ||
Gregory Szorc
|
r26443 | |||
Gregory Szorc
|
r26469 | The data iterator consists of N entries for each file being transferred. | ||
Each file entry starts as a line with the file name and integer size | ||||
delimited by a null byte. | ||||
Gregory Szorc
|
r26443 | |||
The raw file data follows. Following the raw file data is the next file | ||||
entry, or EOF. | ||||
When used on the wire protocol, an additional line indicating protocol | ||||
success will be prepended to the stream. This function is not responsible | ||||
for adding it. | ||||
This function will obtain a repository lock to ensure a consistent view of | ||||
the store is captured. It therefore may raise LockError. | ||||
""" | ||||
entries = [] | ||||
total_bytes = 0 | ||||
# Get consistent snapshot of repo, lock during scan. | ||||
Bryan O'Sullivan
|
r27845 | with repo.lock(): | ||
Augie Fackler
|
r43347 | repo.ui.debug(b'scanning\n') | ||
r47657 | for file_type, name, ename, size in _walkstreamfiles(repo): | |||
Gregory Szorc
|
r26443 | if size: | ||
entries.append((name, size)) | ||||
total_bytes += size | ||||
r47748 | _test_sync_point_walk_1(repo) | |||
_test_sync_point_walk_2(repo) | ||||
Gregory Szorc
|
r26443 | |||
Augie Fackler
|
r43346 | repo.ui.debug( | ||
Augie Fackler
|
r43347 | b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes) | ||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r26443 | |||
svfs = repo.svfs | ||||
debugflag = repo.ui.debugflag | ||||
Gregory Szorc
|
r26469 | def emitrevlogdata(): | ||
r33256 | for name, size in entries: | |||
if debugflag: | ||||
Augie Fackler
|
r43347 | repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size)) | ||
r33256 | # partially encode name over the wire for backwards compat | |||
Augie Fackler
|
r43347 | yield b'%s\0%d\n' % (store.encodedir(name), size) | ||
Yuya Nishihara
|
r33411 | # auditing at this stage is both pointless (paths are already | ||
# trusted by the local repo) and expensive | ||||
Augie Fackler
|
r43347 | with svfs(name, b'rb', auditpath=False) as fp: | ||
Yuya Nishihara
|
r33410 | if size <= 65536: | ||
r33256 | yield fp.read(size) | |||
Yuya Nishihara
|
r33410 | else: | ||
for chunk in util.filechunkiter(fp, limit=size): | ||||
yield chunk | ||||
Gregory Szorc
|
r26469 | |||
return len(entries), total_bytes, emitrevlogdata() | ||||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r26469 | def generatev1wireproto(repo): | ||
"""Emit content for version 1 of streaming clone suitable for the wire. | ||||
Gregory Szorc
|
r35507 | This is the data output from ``generatev1()`` with 2 header lines. The | ||
first line indicates overall success. The 2nd contains the file count and | ||||
byte size of payload. | ||||
The success line contains "0" for success, "1" for stream generation not | ||||
allowed, and "2" for error locking the repository (possibly indicating | ||||
a permissions error for the server process). | ||||
Gregory Szorc
|
r26469 | """ | ||
Gregory Szorc
|
r35507 | if not allowservergeneration(repo): | ||
Augie Fackler
|
r43347 | yield b'1\n' | ||
Gregory Szorc
|
r35507 | return | ||
try: | ||||
filecount, bytecount, it = generatev1(repo) | ||||
except error.LockError: | ||||
Augie Fackler
|
r43347 | yield b'2\n' | ||
Gregory Szorc
|
r35507 | return | ||
# Indicates successful response. | ||||
Augie Fackler
|
r43347 | yield b'0\n' | ||
yield b'%d %d\n' % (filecount, bytecount) | ||||
Gregory Szorc
|
r26469 | for chunk in it: | ||
yield chunk | ||||
Gregory Szorc
|
r26443 | |||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r43347 | def generatebundlev1(repo, compression=b'UN'): | ||
Gregory Szorc
|
r26755 | """Emit content for version 1 of a stream clone bundle. | ||
The first 4 bytes of the output ("HGS1") denote this as stream clone | ||||
bundle version 1. | ||||
The next 2 bytes indicate the compression type. Only "UN" is currently | ||||
supported. | ||||
The next 16 bytes are two 64-bit big endian unsigned integers indicating | ||||
file count and byte count, respectively. | ||||
The next 2 bytes is a 16-bit big endian unsigned short declaring the length | ||||
of the requirements string, including a trailing \0. The following N bytes | ||||
are the requirements string, which is ASCII containing a comma-delimited | ||||
list of repo requirements that are needed to support the data. | ||||
The remaining content is the output of ``generatev1()`` (which may be | ||||
compressed in the future). | ||||
Returns a tuple of (requirements, data generator). | ||||
""" | ||||
Augie Fackler
|
r43347 | if compression != b'UN': | ||
raise ValueError(b'we do not support the compression argument yet') | ||||
Gregory Szorc
|
r26755 | |||
requirements = repo.requirements & repo.supportedformats | ||||
Augie Fackler
|
r43347 | requires = b','.join(sorted(requirements)) | ||
Gregory Szorc
|
r26755 | |||
def gen(): | ||||
Augie Fackler
|
r43347 | yield b'HGS1' | ||
Gregory Szorc
|
r26755 | yield compression | ||
filecount, bytecount, it = generatev1(repo) | ||||
Augie Fackler
|
r43346 | repo.ui.status( | ||
Augie Fackler
|
r43347 | _(b'writing %d bytes for %d files\n') % (bytecount, filecount) | ||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r26755 | |||
Augie Fackler
|
r43347 | yield struct.pack(b'>QQ', filecount, bytecount) | ||
yield struct.pack(b'>H', len(requires) + 1) | ||||
yield requires + b'\0' | ||||
Gregory Szorc
|
r26755 | |||
# This is where we'll add compression in the future. | ||||
Augie Fackler
|
r43347 | assert compression == b'UN' | ||
Gregory Szorc
|
r26755 | |||
Augie Fackler
|
r43346 | progress = repo.ui.makeprogress( | ||
Augie Fackler
|
r43347 | _(b'bundle'), total=bytecount, unit=_(b'bytes') | ||
Augie Fackler
|
r43346 | ) | ||
Martin von Zweigbergk
|
r38368 | progress.update(0) | ||
Gregory Szorc
|
r26755 | |||
for chunk in it: | ||||
Martin von Zweigbergk
|
r38368 | progress.increment(step=len(chunk)) | ||
Gregory Szorc
|
r26755 | yield chunk | ||
Martin von Zweigbergk
|
r38392 | progress.complete() | ||
Gregory Szorc
|
r26755 | |||
return requirements, gen() | ||||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r26468 | def consumev1(repo, fp, filecount, bytecount): | ||
Gregory Szorc
|
r26443 | """Apply the contents from version 1 of a streaming clone file handle. | ||
Mads Kiilerich
|
r30332 | This takes the output from "stream_out" and applies it to the specified | ||
Gregory Szorc
|
r26443 | repository. | ||
Mads Kiilerich
|
r30332 | Like "stream_out," the status line added by the wire protocol is not | ||
handled by this function. | ||||
Gregory Szorc
|
r26443 | """ | ||
Bryan O'Sullivan
|
r27859 | with repo.lock(): | ||
Augie Fackler
|
r43346 | repo.ui.status( | ||
Augie Fackler
|
r43347 | _(b'%d files to transfer, %s of data\n') | ||
Augie Fackler
|
r43346 | % (filecount, util.bytecount(bytecount)) | ||
) | ||||
progress = repo.ui.makeprogress( | ||||
Augie Fackler
|
r43347 | _(b'clone'), total=bytecount, unit=_(b'bytes') | ||
Augie Fackler
|
r43346 | ) | ||
Martin von Zweigbergk
|
r38368 | progress.update(0) | ||
Simon Farnsworth
|
r30975 | start = util.timer() | ||
Gregory Szorc
|
r26443 | |||
FUJIWARA Katsunori
|
r29919 | # TODO: get rid of (potential) inconsistency | ||
# | ||||
# If transaction is started and any @filecache property is | ||||
# changed at this point, it causes inconsistency between | ||||
# in-memory cached property and streamclone-ed file on the | ||||
# disk. Nested transaction prevents transaction scope "clone" | ||||
# below from writing in-memory changes out at the end of it, | ||||
# even though in-memory changes are discarded at the end of it | ||||
# regardless of transaction nesting. | ||||
# | ||||
# But transaction nesting can't be simply prohibited, because | ||||
# nesting occurs also in ordinary case (e.g. enabling | ||||
# clonebundles). | ||||
Augie Fackler
|
r43347 | with repo.transaction(b'clone'): | ||
Gregory Szorc
|
r27897 | with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount): | ||
Gregory Szorc
|
r38806 | for i in pycompat.xrange(filecount): | ||
Gregory Szorc
|
r27896 | # XXX doesn't support '\n' or '\r' in filenames | ||
l = fp.readline() | ||||
try: | ||||
Augie Fackler
|
r43347 | name, size = l.split(b'\0', 1) | ||
Gregory Szorc
|
r27896 | size = int(size) | ||
except (ValueError, TypeError): | ||||
raise error.ResponseError( | ||||
Augie Fackler
|
r43347 | _(b'unexpected response from remote server:'), l | ||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r27896 | if repo.ui.debugflag: | ||
Augie Fackler
|
r43346 | repo.ui.debug( | ||
Augie Fackler
|
r43347 | b'adding %s (%s)\n' % (name, util.bytecount(size)) | ||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r27896 | # for backwards compat, name was partially encoded | ||
Gregory Szorc
|
r27897 | path = store.decodedir(name) | ||
Augie Fackler
|
r43347 | with repo.svfs(path, b'w', backgroundclose=True) as ofp: | ||
Gregory Szorc
|
r27896 | for chunk in util.filechunkiter(fp, limit=size): | ||
Martin von Zweigbergk
|
r38368 | progress.increment(step=len(chunk)) | ||
Gregory Szorc
|
r27896 | ofp.write(chunk) | ||
Gregory Szorc
|
r26443 | |||
FUJIWARA Katsunori
|
r29919 | # force @filecache properties to be reloaded from | ||
# streamclone-ed file at next access | ||||
repo.invalidate(clearfilecache=True) | ||||
Gregory Szorc
|
r26443 | |||
Simon Farnsworth
|
r30975 | elapsed = util.timer() - start | ||
Gregory Szorc
|
r26443 | if elapsed <= 0: | ||
elapsed = 0.001 | ||||
Martin von Zweigbergk
|
r38392 | progress.complete() | ||
Augie Fackler
|
r43346 | repo.ui.status( | ||
Augie Fackler
|
r43347 | _(b'transferred %s in %.1f seconds (%s/sec)\n') | ||
Augie Fackler
|
r43346 | % ( | ||
util.bytecount(bytecount), | ||||
elapsed, | ||||
util.bytecount(bytecount / elapsed), | ||||
) | ||||
) | ||||
Gregory Szorc
|
r26755 | |||
Gregory Szorc
|
r27882 | def readbundle1header(fp): | ||
Gregory Szorc
|
r26755 | compression = fp.read(2) | ||
Augie Fackler
|
r43347 | if compression != b'UN': | ||
Augie Fackler
|
r43346 | raise error.Abort( | ||
Augie Fackler
|
r43347 | _( | ||
b'only uncompressed stream clone bundles are ' | ||||
b'supported; got %s' | ||||
) | ||||
Augie Fackler
|
r43346 | % compression | ||
) | ||||
Gregory Szorc
|
r26755 | |||
Augie Fackler
|
r43347 | filecount, bytecount = struct.unpack(b'>QQ', fp.read(16)) | ||
requireslen = struct.unpack(b'>H', fp.read(2))[0] | ||||
Gregory Szorc
|
r26755 | requires = fp.read(requireslen) | ||
Augie Fackler
|
r43347 | if not requires.endswith(b'\0'): | ||
Augie Fackler
|
r43346 | raise error.Abort( | ||
_( | ||||
Augie Fackler
|
r43347 | b'malformed stream clone bundle: ' | ||
b'requirements not properly encoded' | ||||
Augie Fackler
|
r43346 | ) | ||
) | ||||
Gregory Szorc
|
r26755 | |||
Augie Fackler
|
r43347 | requirements = set(requires.rstrip(b'\0').split(b',')) | ||
Gregory Szorc
|
r27882 | |||
return filecount, bytecount, requirements | ||||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r27882 | def applybundlev1(repo, fp): | ||
"""Apply the content from a stream clone bundle version 1. | ||||
We assume the 4 byte header has been read and validated and the file handle | ||||
is at the 2 byte compression identifier. | ||||
""" | ||||
if len(repo): | ||||
Augie Fackler
|
r43346 | raise error.Abort( | ||
Martin von Zweigbergk
|
r43387 | _(b'cannot apply stream clone bundle on non-empty repo') | ||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r27882 | |||
filecount, bytecount, requirements = readbundle1header(fp) | ||||
Gregory Szorc
|
r26755 | missingreqs = requirements - repo.supportedformats | ||
if missingreqs: | ||||
Augie Fackler
|
r43346 | raise error.Abort( | ||
Martin von Zweigbergk
|
r43387 | _(b'unable to apply stream clone: unsupported format: %s') | ||
Augie Fackler
|
r43347 | % b', '.join(sorted(missingreqs)) | ||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r26755 | |||
consumev1(repo, fp, filecount, bytecount) | ||||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r26755 | class streamcloneapplier(object): | ||
"""Class to manage applying streaming clone bundles. | ||||
We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle | ||||
readers to perform bundle type-specific functionality. | ||||
""" | ||||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r26755 | def __init__(self, fh): | ||
self._fh = fh | ||||
def apply(self, repo): | ||||
return applybundlev1(repo, self._fh) | ||||
Boris Feld
|
r35774 | |||
Augie Fackler
|
r43346 | |||
Boris Feld
|
r35783 | # type of file to stream | ||
Augie Fackler
|
r43346 | _fileappend = 0 # append only file | ||
_filefull = 1 # full snapshot file | ||||
Boris Feld
|
r35783 | |||
Boris Feld
|
r35785 | # Source of the file | ||
Augie Fackler
|
r43347 | _srcstore = b's' # store (svfs) | ||
_srccache = b'c' # cache (cache) | ||||
Boris Feld
|
r35785 | |||
Boris Feld
|
r35783 | # This is it's own function so extensions can override it. | ||
def _walkstreamfullstorefiles(repo): | ||||
"""list snapshot file from the store""" | ||||
fnames = [] | ||||
if not repo.publishing(): | ||||
Augie Fackler
|
r43347 | fnames.append(b'phaseroots') | ||
Boris Feld
|
r35783 | return fnames | ||
Augie Fackler
|
r43346 | |||
Boris Feld
|
r35785 | def _filterfull(entry, copy, vfsmap): | ||
Boris Feld
|
r35783 | """actually copy the snapshot files""" | ||
Boris Feld
|
r35785 | src, name, ftype, data = entry | ||
Boris Feld
|
r35783 | if ftype != _filefull: | ||
return entry | ||||
Boris Feld
|
r35785 | return (src, name, ftype, copy(vfsmap[src].join(name))) | ||
Boris Feld
|
r35783 | |||
Augie Fackler
|
r43346 | |||
Boris Feld
|
r35783 | @contextlib.contextmanager | ||
def maketempcopies(): | ||||
"""return a function to temporary copy file""" | ||||
files = [] | ||||
try: | ||||
Augie Fackler
|
r43346 | |||
Boris Feld
|
r35783 | def copy(src): | ||
Yuya Nishihara
|
r38182 | fd, dst = pycompat.mkstemp() | ||
Boris Feld
|
r35783 | os.close(fd) | ||
files.append(dst) | ||||
util.copyfiles(src, dst, hardlink=True) | ||||
return dst | ||||
Augie Fackler
|
r43346 | |||
Boris Feld
|
r35783 | yield copy | ||
finally: | ||||
for tmp in files: | ||||
util.tryunlink(tmp) | ||||
Augie Fackler
|
r43346 | |||
Boris Feld
|
r35785 | def _makemap(repo): | ||
"""make a (src -> vfs) map for the repo""" | ||||
vfsmap = { | ||||
_srcstore: repo.svfs, | ||||
_srccache: repo.cachevfs, | ||||
} | ||||
# we keep repo.vfs out of the on purpose, ther are too many danger there | ||||
# (eg: .hg/hgrc) | ||||
assert repo.vfs not in vfsmap.values() | ||||
return vfsmap | ||||
Augie Fackler
|
r43346 | |||
Boris Feld
|
r35820 | def _emit2(repo, entries, totalfilesize): | ||
Boris Feld
|
r35774 | """actually emit the stream bundle""" | ||
Boris Feld
|
r35785 | vfsmap = _makemap(repo) | ||
Augie Fackler
|
r43346 | progress = repo.ui.makeprogress( | ||
Augie Fackler
|
r43347 | _(b'bundle'), total=totalfilesize, unit=_(b'bytes') | ||
Augie Fackler
|
r43346 | ) | ||
Martin von Zweigbergk
|
r38368 | progress.update(0) | ||
Martin von Zweigbergk
|
r38393 | with maketempcopies() as copy, progress: | ||
# copy is delayed until we are in the try | ||||
entries = [_filterfull(e, copy, vfsmap) for e in entries] | ||||
Augie Fackler
|
r43346 | yield None # this release the lock on the repository | ||
Martin von Zweigbergk
|
r38393 | seen = 0 | ||
Boris Feld
|
r35783 | |||
Martin von Zweigbergk
|
r38393 | for src, name, ftype, data in entries: | ||
vfs = vfsmap[src] | ||||
yield src | ||||
yield util.uvarintencode(len(name)) | ||||
if ftype == _fileappend: | ||||
fp = vfs(name) | ||||
size = data | ||||
elif ftype == _filefull: | ||||
Augie Fackler
|
r43347 | fp = open(data, b'rb') | ||
Martin von Zweigbergk
|
r38393 | size = util.fstat(fp).st_size | ||
try: | ||||
yield util.uvarintencode(size) | ||||
yield name | ||||
if size <= 65536: | ||||
chunks = (fp.read(size),) | ||||
else: | ||||
chunks = util.filechunkiter(fp, limit=size) | ||||
for chunk in chunks: | ||||
seen += len(chunk) | ||||
progress.update(seen) | ||||
yield chunk | ||||
finally: | ||||
fp.close() | ||||
Boris Feld
|
r35774 | |||
Augie Fackler
|
r43346 | |||
r47748 | def _test_sync_point_walk_1(repo): | |||
"""a function for synchronisation during tests""" | ||||
def _test_sync_point_walk_2(repo): | ||||
"""a function for synchronisation during tests""" | ||||
r48237 | def _v2_walk(repo, includes, excludes, includeobsmarkers): | |||
"""emit a seris of files information useful to clone a repo | ||||
return (entries, totalfilesize) | ||||
entries is a list of tuple (vfs-key, file-path, file-type, size) | ||||
- `vfs-key`: is a key to the right vfs to write the file (see _makemap) | ||||
- `name`: file path of the file to copy (to be feed to the vfss) | ||||
- `file-type`: do this file need to be copied with the source lock ? | ||||
- `size`: the size of the file (or None) | ||||
""" | ||||
assert repo._currentlock(repo._lockref) is not None | ||||
entries = [] | ||||
totalfilesize = 0 | ||||
matcher = None | ||||
if includes or excludes: | ||||
matcher = narrowspec.match(repo.root, includes, excludes) | ||||
for rl_type, name, ename, size in _walkstreamfiles(repo, matcher): | ||||
if size: | ||||
ft = _fileappend | ||||
if rl_type & store.FILEFLAGS_VOLATILE: | ||||
ft = _filefull | ||||
entries.append((_srcstore, name, ft, size)) | ||||
totalfilesize += size | ||||
for name in _walkstreamfullstorefiles(repo): | ||||
if repo.svfs.exists(name): | ||||
totalfilesize += repo.svfs.lstat(name).st_size | ||||
entries.append((_srcstore, name, _filefull, None)) | ||||
if includeobsmarkers and repo.svfs.exists(b'obsstore'): | ||||
totalfilesize += repo.svfs.lstat(b'obsstore').st_size | ||||
entries.append((_srcstore, b'obsstore', _filefull, None)) | ||||
for name in cacheutil.cachetocopy(repo): | ||||
if repo.cachevfs.exists(name): | ||||
totalfilesize += repo.cachevfs.lstat(name).st_size | ||||
entries.append((_srccache, name, _filefull, None)) | ||||
return entries, totalfilesize | ||||
r40434 | def generatev2(repo, includes, excludes, includeobsmarkers): | |||
Boris Feld
|
r35774 | """Emit content for version 2 of a streaming clone. | ||
the data stream consists the following entries: | ||||
Boris Feld
|
r35785 | 1) A char representing the file destination (eg: store or cache) | ||
2) A varint containing the length of the filename | ||||
3) A varint containing the length of file data | ||||
4) N bytes containing the filename (the internal, store-agnostic form) | ||||
5) N bytes containing the file data | ||||
Boris Feld
|
r35774 | |||
Returns a 3-tuple of (file count, file size, data iterator). | ||||
""" | ||||
with repo.lock(): | ||||
r48237 | repo.ui.debug(b'scanning\n') | |||
Pulkit Goyal
|
r40375 | |||
r48237 | entries, totalfilesize = _v2_walk( | |||
repo, | ||||
includes=includes, | ||||
excludes=excludes, | ||||
includeobsmarkers=includeobsmarkers, | ||||
) | ||||
Boris Feld
|
r35774 | |||
Boris Feld
|
r35820 | chunks = _emit2(repo, entries, totalfilesize) | ||
Boris Feld
|
r35783 | first = next(chunks) | ||
assert first is None | ||||
r47748 | _test_sync_point_walk_1(repo) | |||
_test_sync_point_walk_2(repo) | ||||
Boris Feld
|
r35774 | |||
return len(entries), totalfilesize, chunks | ||||
Augie Fackler
|
r43346 | |||
Boris Feld
|
r35785 | @contextlib.contextmanager | ||
def nested(*ctxs): | ||||
Augie Fackler
|
r39793 | this = ctxs[0] | ||
rest = ctxs[1:] | ||||
with this: | ||||
if rest: | ||||
with nested(*rest): | ||||
yield | ||||
else: | ||||
Boris Feld
|
r35785 | yield | ||
Augie Fackler
|
r43346 | |||
Boris Feld
|
r35774 | def consumev2(repo, fp, filecount, filesize): | ||
"""Apply the contents from a version 2 streaming clone. | ||||
Data is read from an object that only needs to provide a ``read(size)`` | ||||
method. | ||||
""" | ||||
with repo.lock(): | ||||
Augie Fackler
|
r43346 | repo.ui.status( | ||
Augie Fackler
|
r43347 | _(b'%d files to transfer, %s of data\n') | ||
Augie Fackler
|
r43346 | % (filecount, util.bytecount(filesize)) | ||
) | ||||
Boris Feld
|
r35774 | |||
start = util.timer() | ||||
Augie Fackler
|
r43346 | progress = repo.ui.makeprogress( | ||
Augie Fackler
|
r43347 | _(b'clone'), total=filesize, unit=_(b'bytes') | ||
Augie Fackler
|
r43346 | ) | ||
Martin von Zweigbergk
|
r38368 | progress.update(0) | ||
Boris Feld
|
r35774 | |||
Boris Feld
|
r35785 | vfsmap = _makemap(repo) | ||
Boris Feld
|
r35774 | |||
Augie Fackler
|
r43347 | with repo.transaction(b'clone'): | ||
Augie Fackler
|
r43346 | ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values()) | ||
Boris Feld
|
r35785 | with nested(*ctxs): | ||
Boris Feld
|
r35774 | for i in range(filecount): | ||
Boris Feld
|
r35821 | src = util.readexactly(fp, 1) | ||
Boris Feld
|
r35785 | vfs = vfsmap[src] | ||
Boris Feld
|
r35774 | namelen = util.uvarintdecodestream(fp) | ||
datalen = util.uvarintdecodestream(fp) | ||||
Boris Feld
|
r35821 | name = util.readexactly(fp, namelen) | ||
Boris Feld
|
r35774 | |||
if repo.ui.debugflag: | ||||
Augie Fackler
|
r43346 | repo.ui.debug( | ||
Augie Fackler
|
r43347 | b'adding [%s] %s (%s)\n' | ||
Augie Fackler
|
r43346 | % (src, name, util.bytecount(datalen)) | ||
) | ||||
Boris Feld
|
r35774 | |||
Augie Fackler
|
r43347 | with vfs(name, b'w') as ofp: | ||
Boris Feld
|
r35774 | for chunk in util.filechunkiter(fp, limit=datalen): | ||
Martin von Zweigbergk
|
r38368 | progress.increment(step=len(chunk)) | ||
Boris Feld
|
r35774 | ofp.write(chunk) | ||
# force @filecache properties to be reloaded from | ||||
# streamclone-ed file at next access | ||||
repo.invalidate(clearfilecache=True) | ||||
elapsed = util.timer() - start | ||||
if elapsed <= 0: | ||||
elapsed = 0.001 | ||||
Augie Fackler
|
r43346 | repo.ui.status( | ||
Augie Fackler
|
r43347 | _(b'transferred %s in %.1f seconds (%s/sec)\n') | ||
Augie Fackler
|
r43346 | % ( | ||
util.bytecount(progress.pos), | ||||
elapsed, | ||||
util.bytecount(progress.pos / elapsed), | ||||
) | ||||
) | ||||
Martin von Zweigbergk
|
r38392 | progress.complete() | ||
Boris Feld
|
r35774 | |||
Augie Fackler
|
r43346 | |||
Boris Feld
|
r35774 | def applybundlev2(repo, fp, filecount, filesize, requirements): | ||
Gregory Szorc
|
r39736 | from . import localrepo | ||
Boris Feld
|
r35774 | missingreqs = [r for r in requirements if r not in repo.supported] | ||
if missingreqs: | ||||
Augie Fackler
|
r43346 | raise error.Abort( | ||
Martin von Zweigbergk
|
r43387 | _(b'unable to apply stream clone: unsupported format: %s') | ||
Augie Fackler
|
r43347 | % b', '.join(sorted(missingreqs)) | ||
Augie Fackler
|
r43346 | ) | ||
Boris Feld
|
r35774 | |||
consumev2(repo, fp, filecount, filesize) | ||||
Boris Feld
|
r35822 | |||
# new requirements = old non-format requirements + | ||||
# new format-related remote requirements | ||||
# requirements from the streamed-in repository | ||||
repo.requirements = set(requirements) | ( | ||||
Augie Fackler
|
r43346 | repo.requirements - repo.supportedformats | ||
) | ||||
Gregory Szorc
|
r39736 | repo.svfs.options = localrepo.resolvestorevfsoptions( | ||
Augie Fackler
|
r43346 | repo.ui, repo.requirements, repo.features | ||
) | ||||
Pulkit Goyal
|
r45666 | scmutil.writereporequirements(repo) | ||
r48240 | ||||
def _copy_files(src_vfs_map, dst_vfs_map, entries, progress): | ||||
hardlink = [True] | ||||
def copy_used(): | ||||
hardlink[0] = False | ||||
progress.topic = _(b'copying') | ||||
for k, path, size in entries: | ||||
src_vfs = src_vfs_map[k] | ||||
dst_vfs = dst_vfs_map[k] | ||||
src_path = src_vfs.join(path) | ||||
dst_path = dst_vfs.join(path) | ||||
dirname = dst_vfs.dirname(path) | ||||
if not dst_vfs.exists(dirname): | ||||
dst_vfs.makedirs(dirname) | ||||
dst_vfs.register_file(path) | ||||
# XXX we could use the #nb_bytes argument. | ||||
util.copyfile( | ||||
src_path, | ||||
dst_path, | ||||
hardlink=hardlink[0], | ||||
no_hardlink_cb=copy_used, | ||||
check_fs_hardlink=False, | ||||
) | ||||
progress.increment() | ||||
return hardlink[0] | ||||
def local_copy(src_repo, dest_repo): | ||||
"""copy all content from one local repository to another | ||||
This is useful for local clone""" | ||||
src_store_requirements = { | ||||
r | ||||
for r in src_repo.requirements | ||||
if r not in requirementsmod.WORKING_DIR_REQUIREMENTS | ||||
} | ||||
dest_store_requirements = { | ||||
r | ||||
for r in dest_repo.requirements | ||||
if r not in requirementsmod.WORKING_DIR_REQUIREMENTS | ||||
} | ||||
assert src_store_requirements == dest_store_requirements | ||||
with dest_repo.lock(): | ||||
with src_repo.lock(): | ||||
r48241 | ||||
# bookmark is not integrated to the streaming as it might use the | ||||
# `repo.vfs` and they are too many sentitive data accessible | ||||
# through `repo.vfs` to expose it to streaming clone. | ||||
src_book_vfs = bookmarks.bookmarksvfs(src_repo) | ||||
srcbookmarks = src_book_vfs.join(b'bookmarks') | ||||
bm_count = 0 | ||||
if os.path.exists(srcbookmarks): | ||||
bm_count = 1 | ||||
r48240 | entries, totalfilesize = _v2_walk( | |||
src_repo, | ||||
includes=None, | ||||
excludes=None, | ||||
includeobsmarkers=True, | ||||
) | ||||
src_vfs_map = _makemap(src_repo) | ||||
dest_vfs_map = _makemap(dest_repo) | ||||
progress = src_repo.ui.makeprogress( | ||||
topic=_(b'linking'), | ||||
r48241 | total=len(entries) + bm_count, | |||
r48240 | unit=_(b'files'), | |||
) | ||||
# copy files | ||||
# | ||||
# We could copy the full file while the source repository is locked | ||||
# and the other one without the lock. However, in the linking case, | ||||
# this would also requires checks that nobody is appending any data | ||||
# to the files while we do the clone, so this is not done yet. We | ||||
# could do this blindly when copying files. | ||||
files = ((k, path, size) for k, path, ftype, size in entries) | ||||
hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress) | ||||
# copy bookmarks over | ||||
r48241 | if bm_count: | |||
dst_book_vfs = bookmarks.bookmarksvfs(dest_repo) | ||||
dstbookmarks = dst_book_vfs.join(b'bookmarks') | ||||
r48240 | util.copyfile(srcbookmarks, dstbookmarks) | |||
progress.complete() | ||||
if hardlink: | ||||
msg = b'linked %d files\n' | ||||
else: | ||||
msg = b'copied %d files\n' | ||||
r48241 | src_repo.ui.debug(msg % (len(entries) + bm_count)) | |||
r48240 | ||||
with dest_repo.transaction(b"localclone") as tr: | ||||
dest_repo.store.write(tr) | ||||
# clean up transaction file as they do not make sense | ||||
undo_files = [(dest_repo.svfs, b'undo.backupfiles')] | ||||
undo_files.extend(dest_repo.undofiles()) | ||||
for undovfs, undofile in undo_files: | ||||
try: | ||||
undovfs.unlink(undofile) | ||||
except OSError as e: | ||||
if e.errno != errno.ENOENT: | ||||
msg = _(b'error removing %s: %s\n') | ||||
path = undovfs.join(undofile) | ||||
e_msg = stringutil.forcebytestr(e) | ||||
msg %= (path, e_msg) | ||||
dest_repo.ui.warn(msg) | ||||