Show More
streamclone.py
647 lines
| 22.5 KiB
| text/x-python
|
PythonLexer
/ mercurial / streamclone.py
Gregory Szorc
|
r26441 | # streamclone.py - producing and consuming streaming repository data | ||
# | ||||
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com> | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
from __future__ import absolute_import | ||||
Boris Feld
|
r35783 | import contextlib | ||
import os | ||||
Gregory Szorc
|
r26755 | import struct | ||
Gregory Szorc
|
r26443 | |||
Gregory Szorc
|
r26442 | from .i18n import _ | ||
Gregory Szorc
|
r26441 | from . import ( | ||
branchmap, | ||||
Boris Feld
|
r35785 | cacheutil, | ||
Gregory Szorc
|
r26442 | error, | ||
Gregory Szorc
|
r32744 | phases, | ||
Yuya Nishihara
|
r38182 | pycompat, | ||
Gregory Szorc
|
r26443 | store, | ||
Gregory Szorc
|
r26442 | util, | ||
Gregory Szorc
|
r26441 | ) | ||
Boris Feld
|
r35775 | def canperformstreamclone(pullop, bundle2=False): | ||
Gregory Szorc
|
r26446 | """Whether it is possible to perform a streaming clone as part of pull. | ||
Gregory Szorc
|
r26445 | |||
Boris Feld
|
r35775 | ``bundle2`` will cause the function to consider stream clone through | ||
bundle2 and only through bundle2. | ||||
Gregory Szorc
|
r26467 | |||
Gregory Szorc
|
r26446 | Returns a tuple of (supported, requirements). ``supported`` is True if | ||
streaming clone is supported and False otherwise. ``requirements`` is | ||||
a set of repo requirements from the remote, or ``None`` if stream clone | ||||
isn't supported. | ||||
""" | ||||
Gregory Szorc
|
r26466 | repo = pullop.repo | ||
remote = pullop.remote | ||||
Gregory Szorc
|
r26467 | bundle2supported = False | ||
if pullop.canusebundle2: | ||||
Boris Feld
|
r35775 | if 'v2' in pullop.remotebundle2caps.get('stream', []): | ||
Gregory Szorc
|
r26467 | bundle2supported = True | ||
# else | ||||
# Server doesn't support bundle2 stream clone or doesn't support | ||||
# the versions we support. Fall back and possibly allow legacy. | ||||
# Ensures legacy code path uses available bundle2. | ||||
Boris Feld
|
r35775 | if bundle2supported and not bundle2: | ||
Gregory Szorc
|
r26467 | return False, None | ||
# Ensures bundle2 doesn't try to do a stream clone if it isn't supported. | ||||
Boris Feld
|
r35775 | elif bundle2 and not bundle2supported: | ||
return False, None | ||||
Gregory Szorc
|
r26467 | |||
Gregory Szorc
|
r26447 | # Streaming clone only works on empty repositories. | ||
if len(repo): | ||||
return False, None | ||||
Gregory Szorc
|
r26446 | # Streaming clone only works if all data is being requested. | ||
Gregory Szorc
|
r26466 | if pullop.heads: | ||
Gregory Szorc
|
r26446 | return False, None | ||
Gregory Szorc
|
r26445 | |||
Gregory Szorc
|
r26466 | streamrequested = pullop.streamclonerequested | ||
Gregory Szorc
|
r26446 | # If we don't have a preference, let the server decide for us. This | ||
# likely only comes into play in LANs. | ||||
if streamrequested is None: | ||||
# The server can advertise whether to prefer streaming clone. | ||||
streamrequested = remote.capable('stream-preferred') | ||||
if not streamrequested: | ||||
return False, None | ||||
Gregory Szorc
|
r26445 | |||
Gregory Szorc
|
r26446 | # In order for stream clone to work, the client has to support all the | ||
# requirements advertised by the server. | ||||
# | ||||
# The server advertises its requirements via the "stream" and "streamreqs" | ||||
# capability. "stream" (a value-less capability) is advertised if and only | ||||
# if the only requirement is "revlogv1." Else, the "streamreqs" capability | ||||
# is advertised and contains a comma-delimited list of requirements. | ||||
requirements = set() | ||||
if remote.capable('stream'): | ||||
requirements.add('revlogv1') | ||||
else: | ||||
streamreqs = remote.capable('streamreqs') | ||||
# This is weird and shouldn't happen with modern servers. | ||||
if not streamreqs: | ||||
Siddharth Agarwal
|
r32259 | pullop.repo.ui.warn(_( | ||
'warning: stream clone requested but server has them ' | ||||
'disabled\n')) | ||||
Gregory Szorc
|
r26446 | return False, None | ||
streamreqs = set(streamreqs.split(',')) | ||||
# Server requires something we don't support. Bail. | ||||
Siddharth Agarwal
|
r32259 | missingreqs = streamreqs - repo.supportedformats | ||
if missingreqs: | ||||
pullop.repo.ui.warn(_( | ||||
'warning: stream clone requested but client is missing ' | ||||
'requirements: %s\n') % ', '.join(sorted(missingreqs))) | ||||
pullop.repo.ui.warn( | ||||
_('(see https://www.mercurial-scm.org/wiki/MissingRequirement ' | ||||
'for more information)\n')) | ||||
Gregory Szorc
|
r26446 | return False, None | ||
requirements = streamreqs | ||||
return True, requirements | ||||
Gregory Szorc
|
r26462 | def maybeperformlegacystreamclone(pullop): | ||
"""Possibly perform a legacy stream clone operation. | ||||
Legacy stream clones are performed as part of pull but before all other | ||||
operations. | ||||
A legacy stream clone will not be performed if a bundle2 stream clone is | ||||
supported. | ||||
""" | ||||
Gregory Szorc
|
r39736 | from . import localrepo | ||
Gregory Szorc
|
r26466 | supported, requirements = canperformstreamclone(pullop) | ||
Gregory Szorc
|
r26458 | |||
Gregory Szorc
|
r26446 | if not supported: | ||
return | ||||
Gregory Szorc
|
r26466 | repo = pullop.repo | ||
remote = pullop.remote | ||||
Gregory Szorc
|
r26459 | # Save remote branchmap. We will use it later to speed up branchcache | ||
# creation. | ||||
rbranchmap = None | ||||
if remote.capable('branchmap'): | ||||
Gregory Szorc
|
r37656 | with remote.commandexecutor() as e: | ||
rbranchmap = e.callcommand('branchmap', {}).result() | ||||
Gregory Szorc
|
r26459 | |||
Gregory Szorc
|
r26470 | repo.ui.status(_('streaming all changes\n')) | ||
Gregory Szorc
|
r37656 | with remote.commandexecutor() as e: | ||
fp = e.callcommand('stream_out', {}).result() | ||||
# TODO strictly speaking, this code should all be inside the context | ||||
# manager because the context manager is supposed to ensure all wire state | ||||
# is flushed when exiting. But the legacy peers don't do this, so it | ||||
# doesn't matter. | ||||
Gregory Szorc
|
r26459 | l = fp.readline() | ||
try: | ||||
resp = int(l) | ||||
except ValueError: | ||||
raise error.ResponseError( | ||||
_('unexpected response from remote server:'), l) | ||||
if resp == 1: | ||||
Pierre-Yves David
|
r26587 | raise error.Abort(_('operation forbidden by server')) | ||
Gregory Szorc
|
r26459 | elif resp == 2: | ||
Pierre-Yves David
|
r26587 | raise error.Abort(_('locking the remote repository failed')) | ||
Gregory Szorc
|
r26459 | elif resp != 0: | ||
Pierre-Yves David
|
r26587 | raise error.Abort(_('the server sent an unknown error code')) | ||
Gregory Szorc
|
r26459 | |||
Gregory Szorc
|
r26468 | l = fp.readline() | ||
try: | ||||
filecount, bytecount = map(int, l.split(' ', 1)) | ||||
except (ValueError, TypeError): | ||||
raise error.ResponseError( | ||||
_('unexpected response from remote server:'), l) | ||||
Bryan O'Sullivan
|
r27850 | with repo.lock(): | ||
Gregory Szorc
|
r26468 | consumev1(repo, fp, filecount, bytecount) | ||
Gregory Szorc
|
r26461 | |||
# new requirements = old non-format requirements + | ||||
# new format-related remote requirements | ||||
# requirements from the streamed-in repository | ||||
repo.requirements = requirements | ( | ||||
repo.requirements - repo.supportedformats) | ||||
Gregory Szorc
|
r39736 | repo.svfs.options = localrepo.resolvestorevfsoptions( | ||
repo.ui, repo.requirements) | ||||
Gregory Szorc
|
r26461 | repo._writerequirements() | ||
if rbranchmap: | ||||
branchmap.replacecache(repo, rbranchmap) | ||||
repo.invalidate() | ||||
Gregory Szorc
|
r26445 | |||
Gregory Szorc
|
r32744 | def allowservergeneration(repo): | ||
Gregory Szorc
|
r26444 | """Whether streaming clones are allowed from the server.""" | ||
Jun Wu
|
r33499 | if not repo.ui.configbool('server', 'uncompressed', untrusted=True): | ||
Gregory Szorc
|
r32744 | return False | ||
# The way stream clone works makes it impossible to hide secret changesets. | ||||
# So don't allow this by default. | ||||
secret = phases.hassecret(repo) | ||||
if secret: | ||||
r33223 | return repo.ui.configbool('server', 'uncompressedallowsecret') | |||
Gregory Szorc
|
r32744 | |||
return True | ||||
Gregory Szorc
|
r26444 | |||
Gregory Szorc
|
r26443 | # This is it's own function so extensions can override it. | ||
def _walkstreamfiles(repo): | ||||
return repo.store.walk() | ||||
def generatev1(repo): | ||||
"""Emit content for version 1 of a streaming clone. | ||||
Gregory Szorc
|
r26469 | This returns a 3-tuple of (file count, byte size, data iterator). | ||
Gregory Szorc
|
r26443 | |||
Gregory Szorc
|
r26469 | The data iterator consists of N entries for each file being transferred. | ||
Each file entry starts as a line with the file name and integer size | ||||
delimited by a null byte. | ||||
Gregory Szorc
|
r26443 | |||
The raw file data follows. Following the raw file data is the next file | ||||
entry, or EOF. | ||||
When used on the wire protocol, an additional line indicating protocol | ||||
success will be prepended to the stream. This function is not responsible | ||||
for adding it. | ||||
This function will obtain a repository lock to ensure a consistent view of | ||||
the store is captured. It therefore may raise LockError. | ||||
""" | ||||
entries = [] | ||||
total_bytes = 0 | ||||
# Get consistent snapshot of repo, lock during scan. | ||||
Bryan O'Sullivan
|
r27845 | with repo.lock(): | ||
Gregory Szorc
|
r26443 | repo.ui.debug('scanning\n') | ||
for name, ename, size in _walkstreamfiles(repo): | ||||
if size: | ||||
entries.append((name, size)) | ||||
total_bytes += size | ||||
repo.ui.debug('%d files, %d bytes to transfer\n' % | ||||
(len(entries), total_bytes)) | ||||
svfs = repo.svfs | ||||
debugflag = repo.ui.debugflag | ||||
Gregory Szorc
|
r26469 | def emitrevlogdata(): | ||
r33256 | for name, size in entries: | |||
if debugflag: | ||||
repo.ui.debug('sending %s (%d bytes)\n' % (name, size)) | ||||
# partially encode name over the wire for backwards compat | ||||
yield '%s\0%d\n' % (store.encodedir(name), size) | ||||
Yuya Nishihara
|
r33411 | # auditing at this stage is both pointless (paths are already | ||
# trusted by the local repo) and expensive | ||||
Yuya Nishihara
|
r33410 | with svfs(name, 'rb', auditpath=False) as fp: | ||
if size <= 65536: | ||||
r33256 | yield fp.read(size) | |||
Yuya Nishihara
|
r33410 | else: | ||
for chunk in util.filechunkiter(fp, limit=size): | ||||
yield chunk | ||||
Gregory Szorc
|
r26469 | |||
return len(entries), total_bytes, emitrevlogdata() | ||||
def generatev1wireproto(repo): | ||||
"""Emit content for version 1 of streaming clone suitable for the wire. | ||||
Gregory Szorc
|
r35507 | This is the data output from ``generatev1()`` with 2 header lines. The | ||
first line indicates overall success. The 2nd contains the file count and | ||||
byte size of payload. | ||||
The success line contains "0" for success, "1" for stream generation not | ||||
allowed, and "2" for error locking the repository (possibly indicating | ||||
a permissions error for the server process). | ||||
Gregory Szorc
|
r26469 | """ | ||
Gregory Szorc
|
r35507 | if not allowservergeneration(repo): | ||
yield '1\n' | ||||
return | ||||
try: | ||||
filecount, bytecount, it = generatev1(repo) | ||||
except error.LockError: | ||||
yield '2\n' | ||||
return | ||||
# Indicates successful response. | ||||
yield '0\n' | ||||
Gregory Szorc
|
r26469 | yield '%d %d\n' % (filecount, bytecount) | ||
for chunk in it: | ||||
yield chunk | ||||
Gregory Szorc
|
r26443 | |||
Gregory Szorc
|
r26755 | def generatebundlev1(repo, compression='UN'): | ||
"""Emit content for version 1 of a stream clone bundle. | ||||
The first 4 bytes of the output ("HGS1") denote this as stream clone | ||||
bundle version 1. | ||||
The next 2 bytes indicate the compression type. Only "UN" is currently | ||||
supported. | ||||
The next 16 bytes are two 64-bit big endian unsigned integers indicating | ||||
file count and byte count, respectively. | ||||
The next 2 bytes is a 16-bit big endian unsigned short declaring the length | ||||
of the requirements string, including a trailing \0. The following N bytes | ||||
are the requirements string, which is ASCII containing a comma-delimited | ||||
list of repo requirements that are needed to support the data. | ||||
The remaining content is the output of ``generatev1()`` (which may be | ||||
compressed in the future). | ||||
Returns a tuple of (requirements, data generator). | ||||
""" | ||||
if compression != 'UN': | ||||
raise ValueError('we do not support the compression argument yet') | ||||
requirements = repo.requirements & repo.supportedformats | ||||
requires = ','.join(sorted(requirements)) | ||||
def gen(): | ||||
yield 'HGS1' | ||||
yield compression | ||||
filecount, bytecount, it = generatev1(repo) | ||||
repo.ui.status(_('writing %d bytes for %d files\n') % | ||||
(bytecount, filecount)) | ||||
yield struct.pack('>QQ', filecount, bytecount) | ||||
yield struct.pack('>H', len(requires) + 1) | ||||
yield requires + '\0' | ||||
# This is where we'll add compression in the future. | ||||
assert compression == 'UN' | ||||
Martin von Zweigbergk
|
r38368 | progress = repo.ui.makeprogress(_('bundle'), total=bytecount, | ||
unit=_('bytes')) | ||||
progress.update(0) | ||||
Gregory Szorc
|
r26755 | |||
for chunk in it: | ||||
Martin von Zweigbergk
|
r38368 | progress.increment(step=len(chunk)) | ||
Gregory Szorc
|
r26755 | yield chunk | ||
Martin von Zweigbergk
|
r38392 | progress.complete() | ||
Gregory Szorc
|
r26755 | |||
return requirements, gen() | ||||
Gregory Szorc
|
r26468 | def consumev1(repo, fp, filecount, bytecount): | ||
Gregory Szorc
|
r26443 | """Apply the contents from version 1 of a streaming clone file handle. | ||
Mads Kiilerich
|
r30332 | This takes the output from "stream_out" and applies it to the specified | ||
Gregory Szorc
|
r26443 | repository. | ||
Mads Kiilerich
|
r30332 | Like "stream_out," the status line added by the wire protocol is not | ||
handled by this function. | ||||
Gregory Szorc
|
r26443 | """ | ||
Bryan O'Sullivan
|
r27859 | with repo.lock(): | ||
Gregory Szorc
|
r26443 | repo.ui.status(_('%d files to transfer, %s of data\n') % | ||
Gregory Szorc
|
r26468 | (filecount, util.bytecount(bytecount))) | ||
Martin von Zweigbergk
|
r38368 | progress = repo.ui.makeprogress(_('clone'), total=bytecount, | ||
unit=_('bytes')) | ||||
progress.update(0) | ||||
Simon Farnsworth
|
r30975 | start = util.timer() | ||
Gregory Szorc
|
r26443 | |||
FUJIWARA Katsunori
|
r29919 | # TODO: get rid of (potential) inconsistency | ||
# | ||||
# If transaction is started and any @filecache property is | ||||
# changed at this point, it causes inconsistency between | ||||
# in-memory cached property and streamclone-ed file on the | ||||
# disk. Nested transaction prevents transaction scope "clone" | ||||
# below from writing in-memory changes out at the end of it, | ||||
# even though in-memory changes are discarded at the end of it | ||||
# regardless of transaction nesting. | ||||
# | ||||
# But transaction nesting can't be simply prohibited, because | ||||
# nesting occurs also in ordinary case (e.g. enabling | ||||
# clonebundles). | ||||
Bryan O'Sullivan
|
r27870 | with repo.transaction('clone'): | ||
Gregory Szorc
|
r27897 | with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount): | ||
Gregory Szorc
|
r38806 | for i in pycompat.xrange(filecount): | ||
Gregory Szorc
|
r27896 | # XXX doesn't support '\n' or '\r' in filenames | ||
l = fp.readline() | ||||
try: | ||||
name, size = l.split('\0', 1) | ||||
size = int(size) | ||||
except (ValueError, TypeError): | ||||
raise error.ResponseError( | ||||
_('unexpected response from remote server:'), l) | ||||
if repo.ui.debugflag: | ||||
repo.ui.debug('adding %s (%s)\n' % | ||||
(name, util.bytecount(size))) | ||||
# for backwards compat, name was partially encoded | ||||
Gregory Szorc
|
r27897 | path = store.decodedir(name) | ||
with repo.svfs(path, 'w', backgroundclose=True) as ofp: | ||||
Gregory Szorc
|
r27896 | for chunk in util.filechunkiter(fp, limit=size): | ||
Martin von Zweigbergk
|
r38368 | progress.increment(step=len(chunk)) | ||
Gregory Szorc
|
r27896 | ofp.write(chunk) | ||
Gregory Szorc
|
r26443 | |||
FUJIWARA Katsunori
|
r29919 | # force @filecache properties to be reloaded from | ||
# streamclone-ed file at next access | ||||
repo.invalidate(clearfilecache=True) | ||||
Gregory Szorc
|
r26443 | |||
Simon Farnsworth
|
r30975 | elapsed = util.timer() - start | ||
Gregory Szorc
|
r26443 | if elapsed <= 0: | ||
elapsed = 0.001 | ||||
Martin von Zweigbergk
|
r38392 | progress.complete() | ||
Gregory Szorc
|
r26443 | repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % | ||
Gregory Szorc
|
r26468 | (util.bytecount(bytecount), elapsed, | ||
util.bytecount(bytecount / elapsed))) | ||||
Gregory Szorc
|
r26755 | |||
Gregory Szorc
|
r27882 | def readbundle1header(fp): | ||
Gregory Szorc
|
r26755 | compression = fp.read(2) | ||
if compression != 'UN': | ||||
raise error.Abort(_('only uncompressed stream clone bundles are ' | ||||
'supported; got %s') % compression) | ||||
filecount, bytecount = struct.unpack('>QQ', fp.read(16)) | ||||
requireslen = struct.unpack('>H', fp.read(2))[0] | ||||
requires = fp.read(requireslen) | ||||
if not requires.endswith('\0'): | ||||
raise error.Abort(_('malformed stream clone bundle: ' | ||||
'requirements not properly encoded')) | ||||
requirements = set(requires.rstrip('\0').split(',')) | ||||
Gregory Szorc
|
r27882 | |||
return filecount, bytecount, requirements | ||||
def applybundlev1(repo, fp): | ||||
"""Apply the content from a stream clone bundle version 1. | ||||
We assume the 4 byte header has been read and validated and the file handle | ||||
is at the 2 byte compression identifier. | ||||
""" | ||||
if len(repo): | ||||
raise error.Abort(_('cannot apply stream clone bundle on non-empty ' | ||||
'repo')) | ||||
filecount, bytecount, requirements = readbundle1header(fp) | ||||
Gregory Szorc
|
r26755 | missingreqs = requirements - repo.supportedformats | ||
if missingreqs: | ||||
raise error.Abort(_('unable to apply stream clone: ' | ||||
'unsupported format: %s') % | ||||
', '.join(sorted(missingreqs))) | ||||
consumev1(repo, fp, filecount, bytecount) | ||||
class streamcloneapplier(object): | ||||
"""Class to manage applying streaming clone bundles. | ||||
We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle | ||||
readers to perform bundle type-specific functionality. | ||||
""" | ||||
def __init__(self, fh): | ||||
self._fh = fh | ||||
def apply(self, repo): | ||||
return applybundlev1(repo, self._fh) | ||||
Boris Feld
|
r35774 | |||
Boris Feld
|
r35783 | # type of file to stream | ||
_fileappend = 0 # append only file | ||||
_filefull = 1 # full snapshot file | ||||
Boris Feld
|
r35785 | # Source of the file | ||
_srcstore = 's' # store (svfs) | ||||
_srccache = 'c' # cache (cache) | ||||
Boris Feld
|
r35783 | # This is it's own function so extensions can override it. | ||
def _walkstreamfullstorefiles(repo): | ||||
"""list snapshot file from the store""" | ||||
fnames = [] | ||||
if not repo.publishing(): | ||||
fnames.append('phaseroots') | ||||
return fnames | ||||
Boris Feld
|
r35785 | def _filterfull(entry, copy, vfsmap): | ||
Boris Feld
|
r35783 | """actually copy the snapshot files""" | ||
Boris Feld
|
r35785 | src, name, ftype, data = entry | ||
Boris Feld
|
r35783 | if ftype != _filefull: | ||
return entry | ||||
Boris Feld
|
r35785 | return (src, name, ftype, copy(vfsmap[src].join(name))) | ||
Boris Feld
|
r35783 | |||
@contextlib.contextmanager | ||||
def maketempcopies(): | ||||
"""return a function to temporary copy file""" | ||||
files = [] | ||||
try: | ||||
def copy(src): | ||||
Yuya Nishihara
|
r38182 | fd, dst = pycompat.mkstemp() | ||
Boris Feld
|
r35783 | os.close(fd) | ||
files.append(dst) | ||||
util.copyfiles(src, dst, hardlink=True) | ||||
return dst | ||||
yield copy | ||||
finally: | ||||
for tmp in files: | ||||
util.tryunlink(tmp) | ||||
Boris Feld
|
r35785 | def _makemap(repo): | ||
"""make a (src -> vfs) map for the repo""" | ||||
vfsmap = { | ||||
_srcstore: repo.svfs, | ||||
_srccache: repo.cachevfs, | ||||
} | ||||
# we keep repo.vfs out of the on purpose, ther are too many danger there | ||||
# (eg: .hg/hgrc) | ||||
assert repo.vfs not in vfsmap.values() | ||||
return vfsmap | ||||
Boris Feld
|
r35820 | def _emit2(repo, entries, totalfilesize): | ||
Boris Feld
|
r35774 | """actually emit the stream bundle""" | ||
Boris Feld
|
r35785 | vfsmap = _makemap(repo) | ||
Martin von Zweigbergk
|
r38368 | progress = repo.ui.makeprogress(_('bundle'), total=totalfilesize, | ||
unit=_('bytes')) | ||||
progress.update(0) | ||||
Martin von Zweigbergk
|
r38393 | with maketempcopies() as copy, progress: | ||
# copy is delayed until we are in the try | ||||
entries = [_filterfull(e, copy, vfsmap) for e in entries] | ||||
yield None # this release the lock on the repository | ||||
seen = 0 | ||||
Boris Feld
|
r35783 | |||
Martin von Zweigbergk
|
r38393 | for src, name, ftype, data in entries: | ||
vfs = vfsmap[src] | ||||
yield src | ||||
yield util.uvarintencode(len(name)) | ||||
if ftype == _fileappend: | ||||
fp = vfs(name) | ||||
size = data | ||||
elif ftype == _filefull: | ||||
fp = open(data, 'rb') | ||||
size = util.fstat(fp).st_size | ||||
try: | ||||
yield util.uvarintencode(size) | ||||
yield name | ||||
if size <= 65536: | ||||
chunks = (fp.read(size),) | ||||
else: | ||||
chunks = util.filechunkiter(fp, limit=size) | ||||
for chunk in chunks: | ||||
seen += len(chunk) | ||||
progress.update(seen) | ||||
yield chunk | ||||
finally: | ||||
fp.close() | ||||
Boris Feld
|
r35774 | |||
def generatev2(repo): | ||||
"""Emit content for version 2 of a streaming clone. | ||||
the data stream consists the following entries: | ||||
Boris Feld
|
r35785 | 1) A char representing the file destination (eg: store or cache) | ||
2) A varint containing the length of the filename | ||||
3) A varint containing the length of file data | ||||
4) N bytes containing the filename (the internal, store-agnostic form) | ||||
5) N bytes containing the file data | ||||
Boris Feld
|
r35774 | |||
Returns a 3-tuple of (file count, file size, data iterator). | ||||
""" | ||||
with repo.lock(): | ||||
entries = [] | ||||
totalfilesize = 0 | ||||
repo.ui.debug('scanning\n') | ||||
for name, ename, size in _walkstreamfiles(repo): | ||||
if size: | ||||
Boris Feld
|
r35785 | entries.append((_srcstore, name, _fileappend, size)) | ||
Boris Feld
|
r35774 | totalfilesize += size | ||
Boris Feld
|
r35783 | for name in _walkstreamfullstorefiles(repo): | ||
if repo.svfs.exists(name): | ||||
totalfilesize += repo.svfs.lstat(name).st_size | ||||
Boris Feld
|
r35785 | entries.append((_srcstore, name, _filefull, None)) | ||
for name in cacheutil.cachetocopy(repo): | ||||
if repo.cachevfs.exists(name): | ||||
totalfilesize += repo.cachevfs.lstat(name).st_size | ||||
entries.append((_srccache, name, _filefull, None)) | ||||
Boris Feld
|
r35774 | |||
Boris Feld
|
r35820 | chunks = _emit2(repo, entries, totalfilesize) | ||
Boris Feld
|
r35783 | first = next(chunks) | ||
assert first is None | ||||
Boris Feld
|
r35774 | |||
return len(entries), totalfilesize, chunks | ||||
Boris Feld
|
r35785 | @contextlib.contextmanager | ||
def nested(*ctxs): | ||||
Augie Fackler
|
r39793 | this = ctxs[0] | ||
rest = ctxs[1:] | ||||
with this: | ||||
if rest: | ||||
with nested(*rest): | ||||
yield | ||||
else: | ||||
Boris Feld
|
r35785 | yield | ||
Boris Feld
|
r35774 | def consumev2(repo, fp, filecount, filesize): | ||
"""Apply the contents from a version 2 streaming clone. | ||||
Data is read from an object that only needs to provide a ``read(size)`` | ||||
method. | ||||
""" | ||||
with repo.lock(): | ||||
repo.ui.status(_('%d files to transfer, %s of data\n') % | ||||
(filecount, util.bytecount(filesize))) | ||||
start = util.timer() | ||||
Martin von Zweigbergk
|
r38368 | progress = repo.ui.makeprogress(_('clone'), total=filesize, | ||
unit=_('bytes')) | ||||
progress.update(0) | ||||
Boris Feld
|
r35774 | |||
Boris Feld
|
r35785 | vfsmap = _makemap(repo) | ||
Boris Feld
|
r35774 | |||
with repo.transaction('clone'): | ||||
Boris Feld
|
r35785 | ctxs = (vfs.backgroundclosing(repo.ui) | ||
for vfs in vfsmap.values()) | ||||
with nested(*ctxs): | ||||
Boris Feld
|
r35774 | for i in range(filecount): | ||
Boris Feld
|
r35821 | src = util.readexactly(fp, 1) | ||
Boris Feld
|
r35785 | vfs = vfsmap[src] | ||
Boris Feld
|
r35774 | namelen = util.uvarintdecodestream(fp) | ||
datalen = util.uvarintdecodestream(fp) | ||||
Boris Feld
|
r35821 | name = util.readexactly(fp, namelen) | ||
Boris Feld
|
r35774 | |||
if repo.ui.debugflag: | ||||
Boris Feld
|
r35785 | repo.ui.debug('adding [%s] %s (%s)\n' % | ||
(src, name, util.bytecount(datalen))) | ||||
Boris Feld
|
r35774 | |||
with vfs(name, 'w') as ofp: | ||||
for chunk in util.filechunkiter(fp, limit=datalen): | ||||
Martin von Zweigbergk
|
r38368 | progress.increment(step=len(chunk)) | ||
Boris Feld
|
r35774 | ofp.write(chunk) | ||
# force @filecache properties to be reloaded from | ||||
# streamclone-ed file at next access | ||||
repo.invalidate(clearfilecache=True) | ||||
elapsed = util.timer() - start | ||||
if elapsed <= 0: | ||||
elapsed = 0.001 | ||||
repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % | ||||
Martin von Zweigbergk
|
r38368 | (util.bytecount(progress.pos), elapsed, | ||
util.bytecount(progress.pos / elapsed))) | ||||
Martin von Zweigbergk
|
r38392 | progress.complete() | ||
Boris Feld
|
r35774 | |||
def applybundlev2(repo, fp, filecount, filesize, requirements): | ||||
Gregory Szorc
|
r39736 | from . import localrepo | ||
Boris Feld
|
r35774 | missingreqs = [r for r in requirements if r not in repo.supported] | ||
if missingreqs: | ||||
raise error.Abort(_('unable to apply stream clone: ' | ||||
'unsupported format: %s') % | ||||
', '.join(sorted(missingreqs))) | ||||
consumev2(repo, fp, filecount, filesize) | ||||
Boris Feld
|
r35822 | |||
# new requirements = old non-format requirements + | ||||
# new format-related remote requirements | ||||
# requirements from the streamed-in repository | ||||
repo.requirements = set(requirements) | ( | ||||
repo.requirements - repo.supportedformats) | ||||
Gregory Szorc
|
r39736 | repo.svfs.options = localrepo.resolvestorevfsoptions( | ||
repo.ui, repo.requirements) | ||||
Boris Feld
|
r35822 | repo._writerequirements() | ||