streamclone.py
732 lines
| 23.5 KiB
| text/x-python
|
PythonLexer
/ mercurial / streamclone.py
Gregory Szorc
|
r26441 | # streamclone.py - producing and consuming streaming repository data | ||
# | ||||
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com> | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
from __future__ import absolute_import | ||||
Boris Feld
|
r35783 | import contextlib | ||
import os | ||||
Gregory Szorc
|
r26755 | import struct | ||
Gregory Szorc
|
r26443 | |||
Gregory Szorc
|
r26442 | from .i18n import _ | ||
Augie Fackler
|
r43346 | from .interfaces import repository | ||
Gregory Szorc
|
r26441 | from . import ( | ||
Boris Feld
|
r35785 | cacheutil, | ||
Gregory Szorc
|
r26442 | error, | ||
Pulkit Goyal
|
r40375 | narrowspec, | ||
Gregory Szorc
|
r32744 | phases, | ||
Yuya Nishihara
|
r38182 | pycompat, | ||
Gregory Szorc
|
r26443 | store, | ||
Gregory Szorc
|
r26442 | util, | ||
Gregory Szorc
|
r26441 | ) | ||
Augie Fackler
|
r43346 | |||
Boris Feld
|
def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    bundle2supported = False
    if pullop.canusebundle2:
        if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
            bundle2supported = True
        # else
        # Server doesn't support bundle2 stream clone or doesn't support
        # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    elif bundle2 and not bundle2supported:
        return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable(b'stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable(b'stream'):
        requirements.add(b'revlogv1')
    else:
        streamreqs = remote.capable(b'streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            repo.ui.warn(
                _(
                    b'warning: stream clone requested but server has them '
                    b'disabled\n'
                )
            )
            return False, None

        streamreqs = set(streamreqs.split(b','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supportedformats
        if missingreqs:
            repo.ui.warn(
                _(
                    b'warning: stream clone requested but client is missing '
                    b'requirements: %s\n'
                )
                % b', '.join(sorted(missingreqs))
            )
            repo.ui.warn(
                _(
                    b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                    b'for more information)\n'
                )
            )
            return False, None
        requirements = streamreqs

    return True, requirements
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.

    Raises ``error.ResponseError`` when the server's status or size line
    cannot be parsed, and ``error.Abort`` when the server reports a non-zero
    status code.
    """
    from . import localrepo

    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable(b'branchmap'):
        with remote.commandexecutor() as e:
            rbranchmap = e.callcommand(b'branchmap', {}).result()

    repo.ui.status(_(b'streaming all changes\n'))

    with remote.commandexecutor() as e:
        fp = e.callcommand(b'stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.

    # First line: integer status code (0 means success).
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )
    if resp == 1:
        raise error.Abort(_(b'operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_(b'locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_(b'the server sent an unknown error code'))

    # Second line: "<file count> <byte count>".
    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(b' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
            repo.requirements - repo.supportedformats
        )
        repo.svfs.options = localrepo.resolvestorevfsoptions(
            repo.ui, repo.requirements, repo.features
        )
        repo._writerequirements()

        if rbranchmap:
            repo._branchcaches.replace(repo, rbranchmap)

        repo.invalidate()
Gregory Szorc
|
r26445 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server.

    Returns False when the repository does not expose the stream clone
    feature or when ``server.uncompressed`` is disabled. When the repository
    has secret changesets, the answer is the value of
    ``server.uncompressedallowsecret``. Otherwise returns True.
    """
    if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
        return False

    if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    secret = phases.hassecret(repo)
    if secret:
        return repo.ui.configbool(b'server', b'uncompressedallowsecret')

    return True
Gregory Szorc
|
r26444 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
# This is its own function so extensions can override it.
def _walkstreamfiles(repo, matcher=None):
    """Return the result of walking the repository store.

    ``matcher`` is passed through unchanged to ``repo.store.walk()``.
    """
    return repo.store.walk(matcher)
Gregory Szorc
|
r26443 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug(b'scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            # Empty files are not transferred.
            if size:
                entries.append((name, size))
                total_bytes += size

    repo.ui.debug(
        b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
    )

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        for name, size in entries:
            if debugflag:
                repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield b'%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, b'rb', auditpath=False) as fp:
                if size <= 65536:
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield b'1\n'
        return

    try:
        filecount, bytecount, it = generatev1(repo)
    except error.LockError:
        yield b'2\n'
        return

    # Indicates successful response.
    yield b'0\n'
    yield b'%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk
Gregory Szorc
|
r26443 | |||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
def generatebundlev1(repo, compression=b'UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing NUL byte. The following
    N bytes are the requirements string, which is ASCII containing a
    comma-delimited list of repo requirements that are needed to support the
    data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).

    Raises ``ValueError`` if ``compression`` is anything other than ``UN``.
    """
    if compression != b'UN':
        raise ValueError(b'we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = b','.join(sorted(requirements))

    def gen():
        yield b'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(
            _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
        )

        yield struct.pack(b'>QQ', filecount, bytecount)
        # The declared length includes the trailing NUL terminator.
        yield struct.pack(b'>H', len(requires) + 1)
        yield requires + b'\0'

        # This is where we'll add compression in the future.
        assert compression == b'UN'

        progress = repo.ui.makeprogress(
            _(b'bundle'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)

        for chunk in it:
            progress.increment(step=len(chunk))
            yield chunk

        progress.complete()

    return requirements, gen()
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.

    ``filecount`` is the number of file entries to read from ``fp`` and
    ``bytecount`` is the total used for progress and the transfer summary.
    Raises ``error.ResponseError`` if a file entry header cannot be parsed.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(bytecount))
        )
        progress = repo.ui.makeprogress(
            _(b'clone'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction(b'clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in pycompat.xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split(b'\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _(b'unexpected response from remote server:'), l
                        )
                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding %s (%s)\n' % (name, util.bytecount(size))
                        )
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, b'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        # Guard the rate computation below against a zero division.
        if elapsed <= 0:
            elapsed = 0.001
        progress.complete()
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(bytecount),
                elapsed,
                util.bytecount(bytecount / elapsed),
            )
        )
Gregory Szorc
|
r26755 | |||
Gregory Szorc
|
def readbundle1header(fp):
    """Read the header of a version 1 stream clone bundle.

    ``fp`` must be positioned at the 2 byte compression identifier. After
    that come two big endian 64-bit integers (file count, byte count), a big
    endian 16-bit length, and a NUL-terminated, comma-delimited requirements
    string of that length.

    Returns a tuple of (file count, byte count, requirements set).

    Raises ``error.Abort`` if the compression type is not ``UN`` or if the
    header is truncated or otherwise malformed.
    """
    compression = fp.read(2)
    if compression != b'UN':
        raise error.Abort(
            _(
                b'only uncompressed stream clone bundles are '
                b'supported; got %s'
            )
            % compression
        )

    counts = fp.read(16)
    lenfield = fp.read(2)
    # A short read would otherwise surface as an opaque struct.error.
    if len(counts) != 16 or len(lenfield) != 2:
        raise error.Abort(
            _(b'malformed stream clone bundle: truncated header')
        )

    filecount, bytecount = struct.unpack(b'>QQ', counts)
    requireslen = struct.unpack(b'>H', lenfield)[0]
    requires = fp.read(requireslen)

    if not requires.endswith(b'\0'):
        raise error.Abort(
            _(
                b'malformed stream clone bundle: '
                b'requirements not properly encoded'
            )
        )

    requirements = set(requires.rstrip(b'\0').split(b','))

    return filecount, bytecount, requirements
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.

    Raises ``error.Abort`` if the repository is not empty or if the bundle
    needs a requirement that is not in ``repo.supportedformats``.
    """
    if len(repo):
        raise error.Abort(
            _(b'cannot apply stream clone bundle on non-empty ' b'repo')
        )

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: ' b'unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev1(repo, fp, filecount, bytecount)
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """

    def __init__(self, fh):
        # File handle that apply() hands to applybundlev1().
        self._fh = fh

    def apply(self, repo):
        """Apply the wrapped bundle to ``repo`` via ``applybundlev1()``."""
        return applybundlev1(repo, self._fh)
Boris Feld
|
r35774 | |||
Augie Fackler
|
r43346 | |||
Boris Feld
|
# type of file to stream
_fileappend = 0  # append only file
_filefull = 1  # full snapshot file

# Source of the file
_srcstore = b's'  # store (svfs)
_srccache = b'c'  # cache (cache)
Boris Feld
|
r35785 | |||
Boris Feld
|
# This is its own function so extensions can override it.
def _walkstreamfullstorefiles(repo):
    """list snapshot files from the store

    ``phaseroots`` is listed only when the repository is non-publishing.
    """
    fnames = []
    if not repo.publishing():
        fnames.append(b'phaseroots')
    return fnames
Augie Fackler
|
r43346 | |||
Boris Feld
|
def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files

    ``entry`` is a ``(src, name, ftype, data)`` tuple. Entries that are not
    full snapshot files are returned unchanged. For a snapshot file, the
    ``data`` slot is replaced by the result of calling ``copy`` on the file's
    path, resolved through ``vfsmap[src]``.
    """
    src, name, ftype, data = entry
    if ftype != _filefull:
        return entry
    return (src, name, ftype, copy(vfsmap[src].join(name)))
Boris Feld
|
r35783 | |||
Augie Fackler
|
r43346 | |||
Boris Feld
|
@contextlib.contextmanager
def maketempcopies():
    """return a function to temporarily copy files

    The yielded ``copy(src)`` function copies ``src`` to a fresh temporary
    file and returns the temporary path. Every temporary file created this
    way is removed when the context exits.
    """
    files = []
    try:

        def copy(src):
            fd, dst = pycompat.mkstemp()
            # Register the path before anything else can fail so the
            # temporary file is always cleaned up on exit.
            files.append(dst)
            os.close(fd)
            util.copyfiles(src, dst, hardlink=True)
            return dst

        yield copy
    finally:
        for tmp in files:
            util.tryunlink(tmp)
Augie Fackler
|
r43346 | |||
Boris Feld
|
def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()

    return vfsmap
Augie Fackler
|
r43346 | |||
Boris Feld
|
def _emit2(repo, entries, totalfilesize):
    """actually emit the stream bundle

    ``entries`` is a list of ``(src, name, ftype, data)`` tuples.

    This is a generator. Its first item is always ``None``; the caller
    consumes it while still holding the repository lock, which ensures the
    snapshot files have been copied before the lock is released. After that,
    each entry yields: the source marker, the varint-encoded name length,
    the varint-encoded data size, the name, and then the data in chunks.
    """
    vfsmap = _makemap(repo)
    progress = repo.ui.makeprogress(
        _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
    )
    progress.update(0)
    with maketempcopies() as copy, progress:
        # copy is delayed until we are in the try
        entries = [_filterfull(e, copy, vfsmap) for e in entries]
        yield None  # this releases the lock on the repository
        seen = 0

        for src, name, ftype, data in entries:
            vfs = vfsmap[src]
            yield src
            yield util.uvarintencode(len(name))
            if ftype == _fileappend:
                fp = vfs(name)
                size = data
            elif ftype == _filefull:
                # ``data`` is the path of the temporary copy. The builtin
                # open() requires a native str mode on Python 3, so do not
                # pass a bytes literal here.
                fp = open(data, 'rb')
                size = util.fstat(fp).st_size
            try:
                yield util.uvarintencode(size)
                yield name
                if size <= 65536:
                    chunks = (fp.read(size),)
                else:
                    chunks = util.filechunkiter(fp, limit=size)
                for chunk in chunks:
                    seen += len(chunk)
                    progress.update(seen)
                    yield chunk
            finally:
                fp.close()
Boris Feld
|
r35774 | |||
Augie Fackler
|
r43346 | |||
def generatev2(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 2 of a streaming clone.

    the data stream consists the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    When ``includes`` or ``excludes`` is non-empty, a narrowspec matcher
    built from them is passed to the store walk. ``includeobsmarkers``
    controls whether an existing ``obsstore`` file is added to the stream.

    Returns a 3-tuple of (file count, file size, data iterator).
    """
    with repo.lock():
        entries = []
        totalfilesize = 0

        matcher = None
        if includes or excludes:
            matcher = narrowspec.match(repo.root, includes, excludes)

        repo.ui.debug(b'scanning\n')
        for name, ename, size in _walkstreamfiles(repo, matcher):
            if size:
                entries.append((_srcstore, name, _fileappend, size))
                totalfilesize += size
        for name in _walkstreamfullstorefiles(repo):
            if repo.svfs.exists(name):
                totalfilesize += repo.svfs.lstat(name).st_size
                entries.append((_srcstore, name, _filefull, None))
        if includeobsmarkers and repo.svfs.exists(b'obsstore'):
            totalfilesize += repo.svfs.lstat(b'obsstore').st_size
            entries.append((_srcstore, b'obsstore', _filefull, None))
        for name in cacheutil.cachetocopy(repo):
            if repo.cachevfs.exists(name):
                totalfilesize += repo.cachevfs.lstat(name).st_size
                entries.append((_srccache, name, _filefull, None))

        chunks = _emit2(repo, entries, totalfilesize)
        # Advance the generator to its first yield while the lock is still
        # held, so the snapshot files are copied under the lock.
        first = next(chunks)
        assert first is None

    return len(entries), totalfilesize, chunks
Augie Fackler
|
r43346 | |||
Boris Feld
|
@contextlib.contextmanager
def nested(*ctxs):
    """Enter every context manager in ``ctxs`` as if by nested ``with``.

    Managers are entered left to right and exited in reverse order. Calling
    this with no managers is allowed and simply runs the body.
    """
    if not ctxs:
        # Nothing to enter; also the base case of the recursion below.
        yield
        return

    this = ctxs[0]
    rest = ctxs[1:]
    with this:
        with nested(*rest):
            yield
Augie Fackler
|
r43346 | |||
Boris Feld
|
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.

    ``filecount`` is the number of file entries to read and ``filesize`` the
    total shown in the progress output.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(filesize))
        )

        start = util.timer()
        progress = repo.ui.makeprogress(
            _(b'clone'), total=filesize, unit=_(b'bytes')
        )
        progress.update(0)

        vfsmap = _makemap(repo)

        with repo.transaction(b'clone'):
            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
            with nested(*ctxs):
                # xrange, as in consumev1(), avoids building a list of
                # ``filecount`` integers on Python 2.
                for i in pycompat.xrange(filecount):
                    # Entry layout: source marker, name length, data length,
                    # name, data.
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding [%s] %s (%s)\n'
                            % (src, name, util.bytecount(datalen))
                        )

                    with vfs(name, b'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        # Guard the rate computation below against a zero division.
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(progress.pos),
                elapsed,
                util.bytecount(progress.pos / elapsed),
            )
        )
        progress.complete()
Boris Feld
|
r35774 | |||
Augie Fackler
|
r43346 | |||
Boris Feld
|
def applybundlev2(repo, fp, filecount, filesize, requirements):
    """Apply a version 2 stream clone and update repository requirements.

    Raises ``error.Abort`` if any entry of ``requirements`` is missing from
    ``repo.supported``. Otherwise the stream is consumed with ``consumev2()``,
    after which the repository requirements and store vfs options are
    recomputed and the requirements are written out.
    """
    from . import localrepo

    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(
            _(b'unable to apply stream clone: ' b'unsupported format: %s')
            % b', '.join(sorted(missingreqs))
        )

    consumev2(repo, fp, filecount, filesize)

    # new requirements = old non-format requirements +
    #                    new format-related remote requirements
    # requirements from the streamed-in repository
    repo.requirements = set(requirements) | (
        repo.requirements - repo.supportedformats
    )
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features
    )
    repo._writerequirements()