# HG changeset patch # User Gregory Szorc # Date 2015-10-17 18:14:52 # Node ID bb0b955d050d555139416cbde6a13351ccacf79c # Parent e7e1528cf2000127d45209921a5b19e865d381fc streamclone: support for producing and consuming stream clone bundles Up to this point, stream clones only existed as a dynamically generated data format produced and consumed during streaming clones. In order to support this efficient cloning format with the clone bundles feature, we need a more formal, on-disk representation of the streaming clone data. This patch introduces a new "bundle" type for streaming clones. Unlike existing bundles, it does not contain changegroup data. It does, however, share the same concepts like the 4 byte header which identifies the type of data that follows and the 2 byte abbreviation for compression types (of which only "UN" is currently supported). The new bundle format is essentially the existing stream clone version 1 data format with some headers at the beginning. Content negotiation at stream clone request time checks for repository format/requirements compatibility before initiating a stream clone. We can't do active content negotiation when using clone bundles. So, we put this set of requirements inside the payload so consumers have a built-in mechanism for checking compatibility before reading and applying lots of data. Of course, we will also advertise this requirements set in clone bundles. But that's for another patch. We currently don't have a mechanism to produce and consume this new bundle format. This will be implemented in upcoming patches. It's worth noting that if a legacy client attempts to `hg unbundle` a stream clone bundle (with the "HGS1" header), it will abort with "unknown bundle version S1", which seems appropriate.
# Stream clone bundle (version 1) support for mercurial/streamclone.py.
# ``struct`` packs and unpacks the fixed-width binary header fields.
import struct

def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes are a 16-bit big endian unsigned short declaring the
    length of the requirements string, including a trailing NUL byte. The
    following N bytes are the requirements string, which is ASCII containing
    a comma-delimited list of repo requirements that are needed to support
    the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Raises ``ValueError`` if ``compression`` is anything other than "UN".

    Returns a tuple of (requirements, data generator). The generator is lazy:
    ``generatev1()`` is not called until the consumer has read past the
    6 byte type and compression prefix.
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    # Only advertise requirements that are both set on this repo and part of
    # the supported format set.
    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def gen():
        yield 'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        yield struct.pack('>QQ', filecount, bytecount)
        # +1 accounts for the NUL terminator appended below.
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future. The argument was
        # already validated before this generator was created, so it is not
        # re-checked here.

        seen = 0
        repo.ui.progress(_('bundle'), 0, total=bytecount)
        try:
            for chunk in it:
                seen += len(chunk)
                repo.ui.progress(_('bundle'), seen, total=bytecount)
                yield chunk
        finally:
            # Issue the closing progress call even if the source iterator
            # raises or the consumer stops reading before the end.
            repo.ui.progress(_('bundle'), None)

    return requirements, gen()

def _readbundlefield(fp, size):
    """Read exactly ``size`` bytes from ``fp`` or abort on a short read."""
    data = fp.read(size)
    if len(data) != size:
        raise error.Abort(_('malformed stream clone bundle: '
                            'unexpected end of data'))
    return data

def _decodebundlerequirements(requires):
    """Decode a NUL-terminated, comma-delimited requirements string to a set.

    Empty names are dropped so that an empty requirements list (encoded as a
    lone NUL byte) decodes to an empty set, not a set holding ''.
    """
    names = requires.rstrip('\0').split(',')
    return set(name for name in names if name)

def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.

    Raises ``error.Abort`` when the repo is not empty, the compression type
    is not "UN", the header is truncated, the requirements string is not
    NUL-terminated, or the bundle needs a requirement that is not in
    ``repo.supportedformats``. Otherwise the remaining data is handed to
    ``consumev1()``.
    """
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    # Fixed-width header fields: a short read here means the bundle is
    # truncated, which is reported as an abort and not as a struct.error.
    filecount, bytecount = struct.unpack('>QQ', _readbundlefield(fp, 16))
    requireslen = struct.unpack('>H', _readbundlefield(fp, 2))[0]
    requires = _readbundlefield(fp, requireslen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    requirements = _decodebundlerequirements(requires)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)

class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        # File handle that is later passed to ``applybundlev1()``, which
        # expects it to be positioned at the 2 byte compression identifier.
        self._fh = fh

    def apply(self, repo):
        """Apply the wrapped stream clone bundle to ``repo``."""
        return applybundlev1(repo, self._fh)