diff --git a/mercurial/streamclone.py b/mercurial/streamclone.py --- a/mercurial/streamclone.py +++ b/mercurial/streamclone.py @@ -7,6 +7,7 @@ from __future__ import absolute_import +import struct import time from .i18n import _ @@ -236,6 +237,61 @@ def generatev1wireproto(repo): for chunk in it: yield chunk +def generatebundlev1(repo, compression='UN'): + """Emit content for version 1 of a stream clone bundle. + + The first 4 bytes of the output ("HGS1") denote this as stream clone + bundle version 1. + + The next 2 bytes indicate the compression type. Only "UN" is currently + supported. + + The next 16 bytes are two 64-bit big endian unsigned integers indicating + file count and byte count, respectively. + + The next 2 bytes is a 16-bit big endian unsigned short declaring the length + of the requirements string, including a trailing \0. The following N bytes + are the requirements string, which is ASCII containing a comma-delimited + list of repo requirements that are needed to support the data. + + The remaining content is the output of ``generatev1()`` (which may be + compressed in the future). + + Returns a tuple of (requirements, data generator). + """ + if compression != 'UN': + raise ValueError('we do not support the compression argument yet') + + requirements = repo.requirements & repo.supportedformats + requires = ','.join(sorted(requirements)) + + def gen(): + yield 'HGS1' + yield compression + + filecount, bytecount, it = generatev1(repo) + repo.ui.status(_('writing %d bytes for %d files\n') % + (bytecount, filecount)) + + yield struct.pack('>QQ', filecount, bytecount) + yield struct.pack('>H', len(requires) + 1) + yield requires + '\0' + + # This is where we'll add compression in the future. + assert compression == 'UN' + + seen = 0 + repo.ui.progress(_('bundle'), 0, total=bytecount) + + for chunk in it: + seen += len(chunk) + repo.ui.progress(_('bundle'), seen, total=bytecount) + yield chunk + + repo.ui.progress(_('bundle'), None) + + return requirements, gen() + def consumev1(repo, fp, filecount, bytecount): """Apply the contents from version 1 of a streaming clone file handle. @@ -290,3 +346,47 @@ def consumev1(repo, fp, filecount, bytec util.bytecount(bytecount / elapsed))) finally: lock.release() + +def applybundlev1(repo, fp): + """Apply the content from a stream clone bundle version 1. + + We assume the 4 byte header has been read and validated and the file handle + is at the 2 byte compression identifier. + """ + if len(repo): + raise error.Abort(_('cannot apply stream clone bundle on non-empty ' + 'repo')) + + compression = fp.read(2) + if compression != 'UN': + raise error.Abort(_('only uncompressed stream clone bundles are ' + 'supported; got %s') % compression) + + filecount, bytecount = struct.unpack('>QQ', fp.read(16)) + requireslen = struct.unpack('>H', fp.read(2))[0] + requires = fp.read(requireslen) + + if not requires.endswith('\0'): + raise error.Abort(_('malformed stream clone bundle: ' + 'requirements not properly encoded')) + + requirements = set(requires.rstrip('\0').split(',')) + missingreqs = requirements - repo.supportedformats + if missingreqs: + raise error.Abort(_('unable to apply stream clone: ' + 'unsupported format: %s') % + ', '.join(sorted(missingreqs))) + + consumev1(repo, fp, filecount, bytecount) + +class streamcloneapplier(object): + """Class to manage applying streaming clone bundles. + + We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle + readers to perform bundle type-specific functionality. + """ + def __init__(self, fh): + self._fh = fh + + def apply(self, repo): + return applybundlev1(repo, self._fh)