# changegroup.py - Mercurial changegroup manipulation functions
#
#  Copyright 2006 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from i18n import _
from node import nullrev
import mdiff, util
import struct, os, bz2, zlib, tempfile

# struct format of a version 10 delta header: four 20-byte fields, unpacked
# by unbundle10._deltaheader as (node, p1, p2, cs)
_BUNDLE10_DELTA_HEADER = "20s20s20s20s"

def readexactly(stream, n):
    """read n bytes from stream.read and abort if less was available"""
    s = stream.read(n)
    if len(s) < n:
        raise util.Abort(_("stream ended unexpectedly"
                           " (got %d bytes, expected %d)")
                          % (len(s), n))
    return s

def getchunk(stream):
    """return the next chunk from stream as a string

    A chunk is a 4-byte big-endian signed length (which counts the length
    field itself) followed by the payload. A length of 0 marks an empty
    chunk and yields ""; any other length <= 4 aborts.
    """
    d = readexactly(stream, 4)
    l = struct.unpack(">l", d)[0]
    if l <= 4:
        if l:
            raise util.Abort(_("invalid chunk length %d") % l)
        return ""
    return readexactly(stream, l - 4)

def chunkheader(length):
    """return a changegroup chunk header (string)

    length is the payload size; the encoded value also counts the 4 bytes
    of the header itself.
    """
    return struct.pack(">l", length + 4)

def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    return struct.pack(">l", 0)

class nocompress(object):
    """pass-through object exposing the compress()/flush() methods that
    writebundle calls on its compressor"""

    def compress(self, x):
        return x

    def flush(self):
        return ""

# bundle type name -> (header written to the file, compressor factory)
bundletypes = {
    # only when using unbundle on ssh and old http servers
    # since the unification ssh accepts a header but there
    # is no capability signaling it.
    "": ("", nocompress),
    "HG10UN": ("HG10UN", nocompress),
    # only "HG10" is written here; decompressor() feeds "BZ" back to the
    # bz2 decompressor because those two bytes are consumed as the
    # algorithm part of the header when reading
    "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
    "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

def writebundle(cg, filename, bundletype):
    """Write a bundle file and return its filename.

    cg is a stream of changegroup chunks, read with getchunk().
    If no filename is specified, a temporary file is created.
    bundletype must be a key of bundletypes; it selects the header and the
    compression (none, bz2 or zlib).

    Note that filename is opened with mode "wb", so an existing file of
    that name is truncated; callers that must not overwrite have to check
    before calling.

    The bundle file will be deleted in case of errors.
    """

    fh = None
    cleanup = None
    try:
        if filename:
            fh = open(filename, "wb")
        else:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, "wb")
        # from here on, remove the file if anything goes wrong
        cleanup = filename

        header, compressor = bundletypes[bundletype]
        fh.write(header)
        z = compressor()

        # parse the changegroup data, otherwise we will block
        # in case of sshrepo because we don't know the end of the stream

        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, an empty chunkgroup is the end of the changegroup
        empty = False
        count = 0
        while not empty or count <= 2:
            empty = True
            count += 1
            while True:
                chunk = getchunk(cg)
                if not chunk:
                    break
                empty = False
                fh.write(z.compress(chunkheader(len(chunk))))
                # feed the compressor at most 1MB at a time
                pos = 0
                while pos < len(chunk):
                    end = pos + 2**20
                    fh.write(z.compress(chunk[pos:end]))
                    pos = end
            # terminate the chunkgroup
            fh.write(z.compress(closechunk()))
        fh.write(z.flush())
        # success: keep the file
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            os.unlink(cleanup)

def decompressor(fh, alg):
    """return a readable object yielding the decompressed content of fh

    alg is the two-letter compression code taken from the bundle header:
    'UN' returns fh itself, 'GZ' and 'BZ' return a util.chunkbuffer over a
    decompressing generator. Any other value aborts.
    """
    if alg == 'UN':
        return fh
    elif alg == 'GZ':
        def generator(f):
            zd = zlib.decompressobj()
            for chunk in util.filechunkiter(f):
                yield zd.decompress(chunk)
    elif alg == 'BZ':
        def generator(f):
            zd = bz2.BZ2Decompressor()
            # the "BZ" magic was consumed as part of the bundle header,
            # give it back to the decompressor
            zd.decompress("BZ")
            for chunk in util.filechunkiter(f, 4096):
                yield zd.decompress(chunk)
    else:
        raise util.Abort(_("unknown bundle compression '%s'") % alg)
    return util.chunkbuffer(generator(fh))

class unbundle10(object):
    """reader for a version 10 changegroup stream

    fh is the stream positioned just after the bundle header and alg the
    compression code ('UN', 'GZ' or 'BZ'). If the callback attribute is
    set, it is called once for every non-empty chunk header read.
    """
    deltaheader = _BUNDLE10_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)

    def __init__(self, fh, alg):
        self._stream = decompressor(fh, alg)
        self._type = alg
        self.callback = None

    def compressed(self):
        """return True unless the bundle is of the uncompressed type"""
        return self._type != 'UN'

    def read(self, l):
        return self._stream.read(l)

    def seek(self, pos):
        return self._stream.seek(pos)

    def tell(self):
        return self._stream.tell()

    def close(self):
        return self._stream.close()

    def chunklength(self):
        """read a chunk header and return the payload length

        Returns 0 for an empty chunk and aborts on any other encoded
        length <= 4. The callback, if any, is only invoked for non-empty
        chunks.
        """
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise util.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename

        An empty dict is returned when the chunk is empty.
        """
        l = self.chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return dict(filename=fname)

    def _deltaheader(self, headertuple, prevnode):
        """expand an unpacked v10 header into
        (node, p1, p2, deltabase, cs)

        The v10 header carries no delta base: it is p1 when prevnode is
        None, otherwise prevnode.
        """
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        return node, p1, p2, deltabase, cs

    def deltachunk(self, prevnode):
        """read the next delta chunk and return it as a dict

        The dict has the keys node, p1, p2, cs, deltabase and delta; an
        empty dict is returned when the chunk is empty. prevnode is passed
        to _deltaheader to determine the delta base.
        """
        l = self.chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
        return dict(node=node, p1=p1, p2=p2, cs=cs,
                    deltabase=deltabase, delta=delta)

class headerlessfixup(object):
    """wrap fh so that the already consumed bytes h are returned first

    Only read(n) is provided; like readexactly, it aborts if fewer than n
    bytes can be obtained from fh.
    """

    def __init__(self, fh, h):
        self._h = h
        self._fh = fh

    def read(self, n):
        if self._h:
            # serve from the saved bytes first, then top up from the file
            d, self._h = self._h[:n], self._h[n:]
            if len(d) < n:
                d += readexactly(self._fh, n - len(d))
            return d
        return readexactly(self._fh, n)

def readbundle(fh, fname):
    """read the bundle header from fh and return an unbundle10 for it

    fname is only used in error messages; when it is empty the input is
    reported as "stream". Aborts if the header does not start with 'HG'
    or if the version is not '10'.
    """
    header = readexactly(fh, 6)

    if not fname:
        fname = "stream"
        # NOTE(review): nesting reconstructed from flattened source, so
        # headerless data is only accepted when no fname was given - confirm
        # a headerless changegroup starts with a chunk length whose first
        # byte is NUL; replay the 6 bytes just read and treat the stream
        # as uncompressed
        if not header.startswith('HG') and header.startswith('\0'):
            fh = headerlessfixup(fh, header)
            header = "HG10UN"

    magic, version, alg = header[0:2], header[2:4], header[4:6]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version != '10':
        raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
    return unbundle10(fh, alg)

class bundle10(object):
    """generator of version 10 changegroup chunks

    lookup is called as lookup(revlog, node) and its result is packed as
    the last field of each delta header.
    """
    deltaheader = _BUNDLE10_DELTA_HEADER

    def __init__(self, lookup):
        self._lookup = lookup

    def close(self):
        """return the empty chunk terminating a chunkgroup"""
        return closechunk()

    def fileheader(self, fname):
        """return the chunk (header and payload) announcing file fname"""
        return chunkheader(len(fname)) + fname

    def revchunk(self, revlog, rev, prev):
        """yield the pieces of the chunk for rev: chunk header, delta
        header, then delta

        The delta is computed against prev. When prev is nullrev, the full
        revision text is sent instead, preceded by a trivial diff header.
        """
        node = revlog.node(rev)
        base = prev

        prefix = ''
        if base == nullrev:
            delta = revlog.revision(node)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        linknode = self._lookup(revlog, node)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
        """return the packed delta header for a revision"""
        # do nothing with basenode, it is implicitly the previous one in HG10
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)