diff --git a/mercurial/util.py b/mercurial/util.py --- a/mercurial/util.py +++ b/mercurial/util.py @@ -3000,6 +3000,8 @@ class compressormanager(object): self._bundlenames = {} # Internal bundle identifier to engine name. self._bundletypes = {} + # Revlog header to engine name. + self._revlogheaders = {} # Wire proto identifier to engine name. self._wiretypes = {} @@ -3053,6 +3055,14 @@ class compressormanager(object): self._wiretypes[wiretype] = name + revlogheader = engine.revlogheader() + if revlogheader and revlogheader in self._revlogheaders: + raise error.Abort(_('revlog header %s already registered by %s') % + (revlogheader, self._revlogheaders[revlogheader])) + + if revlogheader: + self._revlogheaders[revlogheader] = name + self._engines[name] = engine @property @@ -3121,6 +3131,13 @@ class compressormanager(object): engine.name()) return engine + def forrevlogheader(self, header): + """Obtain a compression engine registered to a revlog header. + + Will raise KeyError if the revlog header value isn't registered. + """ + return self._engines[self._revlogheaders[header]] + compengines = compressormanager() class compressionengine(object): @@ -3186,6 +3203,16 @@ class compressionengine(object): """ return None + def revlogheader(self): + """Header added to revlog chunks that identifies this engine. + + If this engine can be used to compress revlogs, this method should + return the bytes used to identify chunks compressed with this engine. + Else, the method should return ``None`` to indicate it does not + participate in revlog compression. + """ + return None + def compressstream(self, it, opts=None): """Compress an iterator of chunks. @@ -3215,6 +3242,13 @@ class compressionengine(object): the data could not be compressed (too small, not compressible, etc). The returned data should have a header uniquely identifying this compression format so decompression can be routed to this engine. + This header should be identified by the ``revlogheader()`` return + value. + + The object has a ``decompress(data)`` method that decompresses + data. The method will only be called if ``data`` begins with + ``revlogheader()``. The method should return the raw, uncompressed + data or raise a ``RevlogError``. The object is reusable but is not thread safe. """ @@ -3230,6 +3264,9 @@ class _zlibengine(compressionengine): def wireprotosupport(self): return compewireprotosupport('zlib', 20, 20) + def revlogheader(self): + return 'x' + def compressstream(self, it, opts=None): opts = opts or {} @@ -3286,6 +3323,13 @@ class _zlibengine(compressionengine): return ''.join(parts) return None + def decompress(self, data): + try: + return zlib.decompress(data) + except zlib.error as e: + raise error.RevlogError(_('revlog decompress error: %s') % + str(e)) + def revlogcompressor(self, opts=None): return self.zlibrevlogcompressor() @@ -3357,6 +3401,9 @@ class _noopengine(compressionengine): def wireprotosupport(self): return compewireprotosupport('none', 0, 10) + # We don't implement revlogheader because it is handled specially + # in the revlog class. + def compressstream(self, it, opts=None): return it @@ -3397,6 +3444,9 @@ class _zstdengine(compressionengine): def wireprotosupport(self): return compewireprotosupport('zstd', 50, 50) + def revlogheader(self): + return '\x28' + def compressstream(self, it, opts=None): opts = opts or {} # zstd level 3 is almost always significantly faster than zlib @@ -3425,7 +3475,9 @@ class _zstdengine(compressionengine): # pre-allocate a buffer to hold the result. self._cctx = zstd.ZstdCompressor(level=level, write_content_size=True) + self._dctx = zstd.ZstdDecompressor() self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE def compress(self, data): insize = len(data) @@ -3456,6 +3508,28 @@ class _zstdengine(compressionengine): return ''.join(chunks) return None + def decompress(self, data): + insize = len(data) + + try: + # This was measured to be faster than other streaming + # decompressors. + dobj = self._dctx.decompressobj() + chunks = [] + pos = 0 + while pos < insize: + pos2 = pos + self._decompinsize + chunk = dobj.decompress(data[pos:pos2]) + if chunk: + chunks.append(chunk) + pos = pos2 + # Frame should be exhausted, so no finish() API. + + return ''.join(chunks) + except Exception as e: + raise error.RevlogError(_('revlog decompress error: %s') % + str(e)) + def revlogcompressor(self, opts=None): opts = opts or {} return self.zstdrevlogcompressor(self._module,