##// END OF EJS Templates
compression: tell pytype to not sweat a missing `zstd` module...
Augie Fackler -
r43776:9ce76155 default
parent child Browse files
Show More
@@ -1,808 +1,808
1 # compression.py - Mercurial utility functions for compression
1 # compression.py - Mercurial utility functions for compression
2 #
2 #
3 # This software may be used and distributed according to the terms of the
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
4 # GNU General Public License version 2 or any later version.
5
5
6
6
7 from __future__ import absolute_import, print_function
7 from __future__ import absolute_import, print_function
8
8
9 import bz2
9 import bz2
10 import collections
10 import collections
11 import zlib
11 import zlib
12
12
13 from ..pycompat import getattr
13 from ..pycompat import getattr
14 from .. import (
14 from .. import (
15 error,
15 error,
16 i18n,
16 i18n,
17 pycompat,
17 pycompat,
18 )
18 )
19 from . import stringutil
19 from . import stringutil
20
20
# Local alias so code in this module can call ``safehasattr`` directly.
safehasattr = pycompat.safehasattr


# Short alias for ``i18n._``; used below to wrap user-facing messages.
_ = i18n._

# compression code

# Role identifiers accepted by ``compressormanager.supportedwireengines()``.
SERVERROLE = b'server'
CLIENTROLE = b'client'

# Record returned by ``compressionengine.wireprotosupport()``: the wire
# protocol identifier plus the priorities used when sorting engines for the
# server role and for the client role.
compewireprotosupport = collections.namedtuple(
    r'compenginewireprotosupport',
    (r'name', r'serverpriority', r'clientpriority'),
)
35
35
36
36
class propertycache(object):
    """Decorator that turns a method into a compute-once instance attribute.

    The wrapped function is called the first time the attribute is read on an
    instance. Its result is then stored in the instance ``__dict__`` under the
    function's name, so later reads find the stored value directly and never
    reach this descriptor again.
    """

    def __init__(self, func):
        self.func = func
        self.name = func.__name__

    def __get__(self, obj, type=None):
        if obj is None:
            # Looked up on the class rather than on an instance: there is
            # nothing to compute or cache, so return the descriptor itself
            # instead of calling the wrapped function with ``None``.
            return self
        result = self.func(obj)
        self.cachevalue(obj, result)
        return result

    def cachevalue(self, obj, value):
        """Store ``value`` on ``obj`` as the cached result."""
        # __dict__ assignment required to bypass __setattr__ (eg: repoview)
        obj.__dict__[self.name] = value
50
50
51
51
class compressormanager(object):
    """Holds registrations of various compression engines.

    This class essentially abstracts the differences between compression
    engines to allow new compression formats to be added easily, possibly from
    extensions.

    Compressors are registered against the global instance by calling its
    ``register()`` method.
    """

    def __init__(self):
        # Engine name to engine instance.
        self._engines = {}
        # Bundle spec human name to engine name.
        self._bundlenames = {}
        # Internal bundle identifier to engine name.
        self._bundletypes = {}
        # Revlog header to engine name.
        self._revlogheaders = {}
        # Wire proto identifier to engine name.
        self._wiretypes = {}

    def __getitem__(self, key):
        return self._engines[key]

    def __contains__(self, key):
        return key in self._engines

    def __iter__(self):
        return iter(self._engines.keys())

    def register(self, engine):
        """Register a compression engine with the manager.

        The argument must be a ``compressionengine`` instance.

        Raises ``ValueError`` if it is not, and ``error.Abort`` if the engine
        name or any identifier it declares (bundle name, bundle type, wire
        protocol name, revlog header) is already registered. All identifiers
        are checked before anything is recorded, so a failed registration
        leaves the manager unchanged.
        """
        if not isinstance(engine, compressionengine):
            raise ValueError(_(b'argument must be a compressionengine'))

        name = engine.name()

        if name in self._engines:
            raise error.Abort(
                _(b'compression engine %s already registered') % name
            )

        # Phase 1: collect and validate every identifier without mutating
        # any table, so a conflict cannot leave a half-registered engine.
        bundlename = bundletype = None
        bundleinfo = engine.bundletype()
        if bundleinfo:
            bundlename, bundletype = bundleinfo

            if bundlename in self._bundlenames:
                raise error.Abort(
                    _(b'bundle name %s already registered') % bundlename
                )
            if bundletype in self._bundletypes:
                raise error.Abort(
                    _(b'bundle type %s already registered by %s')
                    % (bundletype, self._bundletypes[bundletype])
                )

        wiretype = None
        wiresupport = engine.wireprotosupport()
        if wiresupport:
            wiretype = wiresupport.name
            if wiretype in self._wiretypes:
                raise error.Abort(
                    _(
                        b'wire protocol compression %s already '
                        b'registered by %s'
                    )
                    % (wiretype, self._wiretypes[wiretype])
                )

        revlogheader = engine.revlogheader()
        if revlogheader and revlogheader in self._revlogheaders:
            raise error.Abort(
                _(b'revlog header %s already registered by %s')
                % (revlogheader, self._revlogheaders[revlogheader])
            )

        # Phase 2: every check passed, record the engine everywhere.
        if bundleinfo:
            # An empty/None bundle name means no external facing name is
            # declared; only the internal bundle type is recorded then.
            if bundlename:
                self._bundlenames[bundlename] = name

            self._bundletypes[bundletype] = name

        if wiresupport:
            self._wiretypes[wiretype] = name

        if revlogheader:
            self._revlogheaders[revlogheader] = name

        self._engines[name] = engine

    @property
    def supportedbundlenames(self):
        """Set of registered user-facing bundle spec names."""
        return set(self._bundlenames.keys())

    @property
    def supportedbundletypes(self):
        """Set of registered internal bundle identifiers."""
        return set(self._bundletypes.keys())

    def _checkavailable(self, engine):
        """Return ``engine``, aborting if it reports itself unavailable."""
        if not engine.available():
            raise error.Abort(
                _(b'compression engine %s could not be loaded') % engine.name()
            )
        return engine

    def forbundlename(self, bundlename):
        """Obtain a compression engine registered to a bundle name.

        Will raise KeyError if the bundle name isn't registered.

        Will abort if the engine is known but not available.
        """
        return self._checkavailable(
            self._engines[self._bundlenames[bundlename]]
        )

    def forbundletype(self, bundletype):
        """Obtain a compression engine registered to a bundle type.

        Will raise KeyError if the bundle type isn't registered.

        Will abort if the engine is known but not available.
        """
        return self._checkavailable(
            self._engines[self._bundletypes[bundletype]]
        )

    def supportedwireengines(self, role, onlyavailable=True):
        """Obtain compression engines that support the wire protocol.

        Returns a list of engines in prioritized order, most desired first.

        If ``onlyavailable`` is set, filter out engines that can't be
        loaded.
        """
        assert role in (SERVERROLE, CLIENTROLE)

        # NOTE: ``getattr`` in this module is the one imported from
        # ``pycompat``, not the builtin; the bytes attribute name below is
        # handed to that wrapper.
        attr = b'serverpriority' if role == SERVERROLE else b'clientpriority'

        engines = [self._engines[e] for e in self._wiretypes.values()]
        if onlyavailable:
            engines = [e for e in engines if e.available()]

        def getkey(e):
            # Sort first by priority, highest first. In case of tie, sort
            # alphabetically. This is arbitrary, but ensures output is
            # stable.
            w = e.wireprotosupport()
            return -1 * getattr(w, attr), w.name

        return sorted(engines, key=getkey)

    def forwiretype(self, wiretype):
        """Obtain a compression engine registered to a wire protocol name.

        Will raise KeyError if the wire protocol name isn't registered.

        Will abort if the engine is known but not available.
        """
        return self._checkavailable(self._engines[self._wiretypes[wiretype]])

    def forrevlogheader(self, header):
        """Obtain a compression engine registered to a revlog header.

        Will raise KeyError if the revlog header value isn't registered.
        """
        return self._engines[self._revlogheaders[header]]
219
219
220
220
# Global manager instance; the engines defined further down in this module
# are registered against it through ``register()``.
compengines = compressormanager()
222
222
223
223
class compressionengine(object):
    """Interface every compression engine has to provide.

    Concrete engines subclass this and override the methods relevant to the
    features (bundles, wire protocol, revlogs) they take part in.
    """

    def name(self):
        """Return the engine's name, i.e. the key it gets registered under.

        Subclasses are required to override this method.
        """
        raise NotImplementedError()

    def available(self):
        """Tell whether the engine can actually be used.

        This exists for optional engines that may be missing from some
        installations, for example engines backed by a C extension that is
        not present. The default implementation reports ``True``.
        """
        return True

    def bundletype(self):
        """Describes bundle identifiers for this engine.

        Returns ``None`` when the engine cannot be used for bundles.

        Otherwise returns a 2-tuple of strings: the user-facing "bundle spec"
        compression name, followed by the internal identifier that denotes
        the compression format inside bundles. Set the first element to
        ``None`` to keep the name out of external usage.

        An engine supporting bundle compression must also implement
        ``compressstream`` and ``decompressorreader``.

        The docstring of this method is used in the help system to tell users
        about this engine.
        """
        return None

    def wireprotosupport(self):
        """Declare support for this compression format on the wire protocol.

        Returns ``None`` when the engine cannot compress wire protocol
        payloads.

        Otherwise returns a ``compenginewireprotosupport`` holding:

        * String format identifier
        * Integer priority for the server
        * Integer priority for the client

        The priorities order the advertisement of format support by server
        and client: the highest integer is advertised first, and non-positive
        values are not advertised at all.

        The values are somewhat arbitrary and only provide the default
        ordering; config options can change the relative order.

        An engine supporting wire protocol compression must also implement
        ``compressstream`` and ``decompressorreader``.
        """
        return None

    def revlogheader(self):
        """Header added to revlog chunks that identifies this engine.

        An engine usable for revlog compression returns the bytes that mark
        chunks compressed by it. Any other engine returns ``None`` to signal
        that it does not participate in revlog compression.
        """
        return None

    def compressstream(self, it, opts=None):
        """Compress an iterator of chunks.

        ``it`` is an iterator (ideally a generator) yielding chunks of bytes
        to compress. The return value is an iterator (ideally a generator)
        yielding chunks of bytes that make up the compressed output.

        ``opts`` optionally describes how to perform the compression; its
        interpretation is specific to each engine.
        """
        raise NotImplementedError()

    def decompressorreader(self, fh):
        """Perform decompression on a file object.

        ``fh`` is an object whose ``read(size)`` method returns compressed
        data. The return value is an object whose ``read(size)`` method
        returns the uncompressed data.
        """
        raise NotImplementedError()

    def revlogcompressor(self, opts=None):
        """Obtain an object that can be used to compress revlog entries.

        The returned object offers ``compress(data)``, which compresses
        binary data and returns either the compressed binary data or ``None``
        when the data could not be compressed (too small, not compressible,
        etc). Compressed data should start with a header that uniquely
        identifies this compression format, so decompression can be routed to
        this engine; that header should be the ``revlogheader()`` return
        value.

        It also offers ``decompress(data)``, which is only called when
        ``data`` begins with ``revlogheader()`` and should return the raw,
        uncompressed data or raise a ``StorageError``.

        The object is reusable but is not thread safe.
        """
        raise NotImplementedError()
341
341
342
342
343 class _CompressedStreamReader(object):
343 class _CompressedStreamReader(object):
344 def __init__(self, fh):
344 def __init__(self, fh):
345 if safehasattr(fh, 'unbufferedread'):
345 if safehasattr(fh, 'unbufferedread'):
346 self._reader = fh.unbufferedread
346 self._reader = fh.unbufferedread
347 else:
347 else:
348 self._reader = fh.read
348 self._reader = fh.read
349 self._pending = []
349 self._pending = []
350 self._pos = 0
350 self._pos = 0
351 self._eof = False
351 self._eof = False
352
352
353 def _decompress(self, chunk):
353 def _decompress(self, chunk):
354 raise NotImplementedError()
354 raise NotImplementedError()
355
355
356 def read(self, l):
356 def read(self, l):
357 buf = []
357 buf = []
358 while True:
358 while True:
359 while self._pending:
359 while self._pending:
360 if len(self._pending[0]) > l + self._pos:
360 if len(self._pending[0]) > l + self._pos:
361 newbuf = self._pending[0]
361 newbuf = self._pending[0]
362 buf.append(newbuf[self._pos : self._pos + l])
362 buf.append(newbuf[self._pos : self._pos + l])
363 self._pos += l
363 self._pos += l
364 return b''.join(buf)
364 return b''.join(buf)
365
365
366 newbuf = self._pending.pop(0)
366 newbuf = self._pending.pop(0)
367 if self._pos:
367 if self._pos:
368 buf.append(newbuf[self._pos :])
368 buf.append(newbuf[self._pos :])
369 l -= len(newbuf) - self._pos
369 l -= len(newbuf) - self._pos
370 else:
370 else:
371 buf.append(newbuf)
371 buf.append(newbuf)
372 l -= len(newbuf)
372 l -= len(newbuf)
373 self._pos = 0
373 self._pos = 0
374
374
375 if self._eof:
375 if self._eof:
376 return b''.join(buf)
376 return b''.join(buf)
377 chunk = self._reader(65536)
377 chunk = self._reader(65536)
378 self._decompress(chunk)
378 self._decompress(chunk)
379 if not chunk and not self._pending and not self._eof:
379 if not chunk and not self._pending and not self._eof:
380 # No progress and no new data, bail out
380 # No progress and no new data, bail out
381 return b''.join(buf)
381 return b''.join(buf)
382
382
383
383
class _GzipCompressedStreamReader(_CompressedStreamReader):
    """Stream reader that decompresses through a ``zlib`` decompression object."""

    def __init__(self, fh):
        super(_GzipCompressedStreamReader, self).__init__(fh)
        self._decompobj = zlib.decompressobj()

    def _decompress(self, chunk):
        data = self._decompobj.decompress(chunk)
        if data:
            self._pending.append(data)

        # Probe for end of stream on a throwaway copy of the decompressor:
        # feed it one extra byte and see whether that byte comes back as
        # unused data, which only happens once the stream has ended.
        probe = self._decompobj.copy()
        try:
            probe.decompress(b'x')
            probe.flush()
        except zlib.error:
            # The probe failed; leave the end-of-stream flag untouched.
            return
        if probe.unused_data == b'x':
            self._eof = True
401
401
402
402
class _BZ2CompressedStreamReader(_CompressedStreamReader):
    """Stream reader that decompresses through ``bz2.BZ2Decompressor``."""

    def __init__(self, fh):
        super(_BZ2CompressedStreamReader, self).__init__(fh)
        self._decompobj = bz2.BZ2Decompressor()

    def _decompress(self, chunk):
        data = self._decompobj.decompress(chunk)
        if data:
            self._pending.append(data)
        try:
            # Keep draining with empty input until the decompressor returns
            # nothing more; EOFError from it marks the end of the stream.
            drain = lambda: self._decompobj.decompress(b'')
            for more in iter(drain, b''):
                self._pending.append(more)
        except EOFError:
            self._eof = True
421
421
422
422
class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
    """BZ2 stream reader that primes the decompressor with ``BZ``.

    NOTE(review): presumably meant for streams whose leading ``BZ`` magic
    has been stripped -- confirm against the callers.
    """

    def __init__(self, fh):
        super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
        # Feed the two bytes before any data from ``fh`` is decompressed.
        primed = self._decompobj.decompress(b'BZ')
        if primed:
            self._pending.append(primed)
429
429
430
430
class _ZstdCompressedStreamReader(_CompressedStreamReader):
    """Stream reader that decompresses through the given ``zstd`` module."""

    def __init__(self, fh, zstd):
        super(_ZstdCompressedStreamReader, self).__init__(fh)
        # Keep the module around: its ``ZstdError`` is needed in
        # ``_decompress()``.
        self._zstd = zstd
        self._decompobj = zstd.ZstdDecompressor().decompressobj()

    def _decompress(self, chunk):
        data = self._decompobj.decompress(chunk)
        if data:
            self._pending.append(data)
        try:
            # Keep draining with empty input until nothing more comes out; a
            # ``ZstdError`` raised here is treated as the end of the stream.
            drain = lambda: self._decompobj.decompress(b'')
            for more in iter(drain, b''):
                self._pending.append(more)
        except self._zstd.ZstdError:
            self._eof = True
450
450
451
451
class _zlibengine(compressionengine):
    # Engine backed by the standard ``zlib`` module. It takes part in
    # bundles, the wire protocol and revlog compression.

    def name(self):
        return b'zlib'

    def bundletype(self):
        """zlib compression using the DEFLATE algorithm.

        All Mercurial clients should support this format. The compression
        algorithm strikes a reasonable balance between compression ratio
        and size.
        """
        return b'gzip', b'GZ'

    def wireprotosupport(self):
        return compewireprotosupport(b'zlib', 20, 20)

    def revlogheader(self):
        return b'x'

    def compressstream(self, it, opts=None):
        # ``opts`` may carry a ``level`` entry; -1 is passed to zlib when it
        # is absent.
        opts = opts or {}

        z = zlib.compressobj(opts.get(b'level', -1))
        for chunk in it:
            data = z.compress(chunk)
            # Not all calls to compress emit data. It is cheaper to inspect
            # here than to feed empty chunks through generator.
            if data:
                yield data

        yield z.flush()

    def decompressorreader(self, fh):
        return _GzipCompressedStreamReader(fh)

    class zlibrevlogcompressor(object):
        # Revlog compressor object handed out by ``revlogcompressor()``.

        def __init__(self, level=None):
            # ``None`` means "do not pass a level to zlib".
            self._level = level

        def compress(self, data):
            # Returns the compressed bytes, or None when compression is not
            # attempted or does not make the data smaller.
            insize = len(data)
            # Caller handles empty input case.
            assert insize > 0

            # Inputs shorter than 44 bytes are not compressed at all.
            if insize < 44:
                return None

            elif insize <= 1000000:
                if self._level is None:
                    compressed = zlib.compress(data)
                else:
                    compressed = zlib.compress(data, self._level)
                if len(compressed) < insize:
                    return compressed
                return None

            # zlib makes an internal copy of the input buffer, doubling
            # memory usage for large inputs. So do streaming compression
            # on large inputs.
            else:
                if self._level is None:
                    z = zlib.compressobj()
                else:
                    # Pass the level positionally: Python 2's
                    # zlib.compressobj() does not accept keyword arguments,
                    # and this matches compressstream() above.
                    z = zlib.compressobj(self._level)
                parts = []
                pos = 0
                while pos < insize:
                    pos2 = pos + 2 ** 20
                    parts.append(z.compress(data[pos:pos2]))
                    pos = pos2
                parts.append(z.flush())

                if sum(map(len, parts)) < insize:
                    return b''.join(parts)
                return None

        def decompress(self, data):
            # zlib failures are reported as a StorageError.
            try:
                return zlib.decompress(data)
            except zlib.error as e:
                raise error.StorageError(
                    _(b'revlog decompress error: %s')
                    % stringutil.forcebytestr(e)
                )

    def revlogcompressor(self, opts=None):
        # The compression level is optional and read from ``zlib.level``.
        level = None
        if opts is not None:
            level = opts.get(b'zlib.level')
        return self.zlibrevlogcompressor(level)
542
542
543
543
# Register the zlib engine with the global manager.
compengines.register(_zlibengine())
545
545
546
546
class _bz2engine(compressionengine):
    # Engine backed by the standard ``bz2`` module. It declares bundle and
    # wire protocol support but no revlog header.

    def name(self):
        return b'bz2'

    def bundletype(self):
        """An algorithm that produces smaller bundles than ``gzip``.

        All Mercurial clients should support this format.

        This engine will likely produce smaller bundles than ``gzip`` but
        will be significantly slower, both during compression and
        decompression.

        If available, the ``zstd`` engine can yield similar or better
        compression at much higher speeds.
        """
        return b'bzip2', b'BZ'

    # We declare a protocol name but don't advertise by default because
    # it is slow.
    def wireprotosupport(self):
        return compewireprotosupport(b'bzip2', 0, 0)

    def compressstream(self, it, opts=None):
        # Level 9 is used unless ``opts`` provides a ``level`` entry.
        if not opts:
            opts = {}
        compressor = bz2.BZ2Compressor(opts.get(b'level', 9))
        for piece in it:
            out = compressor.compress(piece)
            # Skip the calls that produced no output.
            if out:
                yield out

        yield compressor.flush()

    def decompressorreader(self, fh):
        return _BZ2CompressedStreamReader(fh)
582
582
583
583
584 compengines.register(_bz2engine())
584 compengines.register(_bz2engine())
585
585
586
586
class _truncatedbz2engine(compressionengine):
    """Decompression-only engine for the ``_truncatedBZ`` bundle type."""

    def name(self):
        # Identifier reported for this engine.
        return b'bz2truncated'

    def bundletype(self):
        # No user-facing name (first element is None), only an internal
        # identifier.
        return (None, b'_truncatedBZ')

    # compressstream is intentionally not implemented here because it is
    # hackily handled elsewhere.

    def decompressorreader(self, fh):
        # Wrap the file handle in the truncated-bz2 stream reader.
        reader = _TruncatedBZ2CompressedStreamReader(fh)
        return reader


# Register a truncated-bz2 engine instance with ``compengines``.
compengines.register(_truncatedbz2engine())
601
601
602
602
class _noopengine(compressionengine):
    """Engine that passes data through without compressing it."""

    def name(self):
        # Identifier reported for this engine.
        return b'none'

    def bundletype(self):
        """No compression is performed.

        Use this compression engine to explicitly disable compression.
        """
        return (b'none', b'UN')

    def wireprotosupport(self):
        # Clients always support uncompressed payloads. Servers don't because
        # unless you are on a fast network, uncompressed payloads can easily
        # saturate your network pipe.
        return compewireprotosupport(
            name=b'none', serverpriority=0, clientpriority=10
        )

    # revlogheader is not implemented here because it is handled specially
    # in the revlog class.

    def compressstream(self, it, opts=None):
        # Nothing to do: hand the input iterator straight back.
        return it

    def decompressorreader(self, fh):
        # Nothing to do: the file handle itself serves as the reader.
        return fh

    class nooprevlogcompressor(object):
        """Revlog compressor whose ``compress`` always returns None."""

        def compress(self, data):
            # Always returns None, whatever the input.
            return None

    def revlogcompressor(self, opts=None):
        # ``opts`` is accepted but not consulted.
        compressor = self.nooprevlogcompressor()
        return compressor


# Register a no-op engine instance with ``compengines``.
compengines.register(_noopengine())
638
638
639
639
class _zstdengine(compressionengine):
    """Compression engine built on the optional ``zstd`` module."""

    def name(self):
        # Identifier reported for this engine.
        return b'zstd'

    @propertycache
    def _module(self):
        # The zstd module is not present on every install, so it is only
        # imported the first time this property is read. None is returned
        # when the import fails.
        try:
            from .. import zstd  # pytype: disable=import-error

            # Touch an attribute to force the delayed import to resolve.
            zstd.__version__
        except ImportError:
            return None
        return zstd

    def available(self):
        # Usable only when the zstd module could be imported.
        module = self._module
        return bool(module)

    def bundletype(self):
        """A modern compression algorithm that is fast and highly flexible.

        Only supported by Mercurial 4.1 and newer clients.

        With the default settings, zstd compression is both faster and yields
        better compression than ``gzip``. It also frequently yields better
        compression than ``bzip2`` while operating at much higher speeds.

        If this engine is available and backwards compatibility is not a
        concern, it is likely the best available engine.
        """
        return (b'zstd', b'ZS')

    def wireprotosupport(self):
        return compewireprotosupport(
            name=b'zstd', serverpriority=50, clientpriority=50
        )

    def revlogheader(self):
        # Header byte returned for zstd revlog chunks.
        return b'\x28'

    def compressstream(self, it, opts=None):
        # Generator: yields each non-empty compressed piece, then whatever
        # the compressor still holds when the input is exhausted.
        settings = opts or {}
        # zstd level 3 is almost always significantly faster than zlib
        # while providing no worse compression. It strikes a good balance
        # between speed and compression.
        level = settings.get(b'level', 3)

        compressor = self._module.ZstdCompressor(level=level).compressobj()
        for piece in it:
            packed = compressor.compress(piece)
            if not packed:
                continue
            yield packed

        yield compressor.flush()

    def decompressorreader(self, fh):
        # Wrap the file handle in the zstd stream reader.
        reader = _ZstdCompressedStreamReader(fh, self._module)
        return reader

    class zstdrevlogcompressor(object):
        """Revlog chunk compressor/decompressor built on zstd contexts."""

        def __init__(self, zstd, level=3):
            # TODO consider omitting frame magic to save 4 bytes.
            # This writes content sizes into the frame header. That is
            # extra storage. But it allows a correct size memory allocation
            # to hold the result.
            self._cctx = zstd.ZstdCompressor(level=level)
            self._dctx = zstd.ZstdDecompressor()
            # Slice sizes used by the streaming paths below.
            self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
            self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE

        def compress(self, data):
            """Return compressed ``data``, or None when it is not smaller.

            None is also returned for inputs shorter than 50 bytes, which
            are not compressed at all.
            """
            total = len(data)
            # Caller handles empty input case.
            assert total > 0

            if total < 50:
                return None

            if total <= 1000000:
                # One-shot compression for inputs up to 1000000 bytes.
                packed = self._cctx.compress(data)
                return packed if len(packed) < total else None

            # Larger inputs go through a streaming compressor, fed in
            # slices of ``self._compinsize`` bytes.
            stream = self._cctx.compressobj()
            step = self._compinsize
            pieces = []
            start = 0
            while start < total:
                stop = start + step
                packed = stream.compress(data[start:stop])
                if packed:
                    pieces.append(packed)
                start = stop
            pieces.append(stream.flush())

            if sum(len(piece) for piece in pieces) < total:
                return b''.join(pieces)
            return None

        def decompress(self, data):
            """Return the decompressed form of ``data``.

            Any exception raised while decompressing is converted into an
            ``error.StorageError`` carrying the original message.
            """
            total = len(data)

            try:
                # This was measured to be faster than other streaming
                # decompressors.
                stream = self._dctx.decompressobj()
                step = self._decompinsize
                pieces = []
                start = 0
                while start < total:
                    stop = start + step
                    piece = stream.decompress(data[start:stop])
                    if piece:
                        pieces.append(piece)
                    start = stop
                # Frame should be exhausted, so no finish() API.

                return b''.join(pieces)
            except Exception as exc:
                message = _(
                    b'revlog decompress error: %s'
                ) % stringutil.forcebytestr(exc)
                raise error.StorageError(message)

    def revlogcompressor(self, opts=None):
        # Level lookup order: b'zstd.level', then b'level', then 3.
        settings = opts or {}
        for key in (b'zstd.level', b'level'):
            level = settings.get(key)
            if level is not None:
                break
        else:
            level = 3
        return self.zstdrevlogcompressor(self._module, level=level)


# Register a zstd engine instance with ``compengines``.
compengines.register(_zstdengine())
774
774
775
775
def bundlecompressiontopics():
    """Build the help items describing the available bundle compressions.

    Returns a dict mapping each available engine's bundle type name to an
    object whose ``__doc__`` holds the formatted help text.
    """

    # The docstring has to be formatted, so it is stored on a throwaway
    # object/type instead of mutating the engine's own method.
    class docobject(object):
        pass

    # help.makeitemsdocs() expects a dict of names to items with a .__doc__.
    topics = {}

    for enginename in compengines:
        engine = compengines[enginename]
        if not engine.available():
            continue

        bundletype = engine.bundletype()
        # Skip engines with no bundle type or no user-facing bundle name.
        if not (bundletype and bundletype[0]):
            continue
        bundlename = bundletype[0]

        # NOTE(review): the whitespace after ``\n`` in this literal may have
        # been collapsed by the page this file was captured from; confirm
        # against the upstream source.
        doc = b'``%s``\n %s' % (
            bundlename,
            pycompat.getdoc(engine.bundletype),
        )

        entry = docobject()
        entry.__doc__ = pycompat.sysstr(doc)
        entry._origdoc = engine.bundletype.__doc__
        entry._origfunc = engine.bundletype
        topics[bundlename] = entry

    return topics
806
806
807
807
# Evaluated once at module level: the doc-bearing objects built by
# bundlecompressiontopics().
i18nfunctions = bundlecompressiontopics().values()
General Comments 0
You need to be logged in to leave comments. Login now