compression: accept level management for zlib compression
marmoute
r42209:aaececb4 default
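What the change enables, in brief: the zlib engine's nested revlog compressor gains an optional compression level instead of always using zlib's default. Below is a minimal standalone sketch of that knob; it is illustrative only, omits the real class's size heuristics and its None-return contract, and is not part of the changeset (the full diff follows).

    import zlib

    class zlibrevlogcompressor(object):
        # Mirrors the class extended by this changeset: level=None keeps
        # zlib's default, any integer 0-9 is handed straight to zlib.
        def __init__(self, level=None):
            self._level = level

        def compress(self, data):
            if self._level is None:
                return zlib.compress(data)
            return zlib.compress(data, self._level)

    default = zlibrevlogcompressor()           # previous behaviour
    smallest = zlibrevlogcompressor(level=9)   # trade CPU for smaller chunks
    payload = b'x' * 100000
    print(len(default.compress(payload)), len(smallest.compress(payload)))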
@@ -1,747 +1,757 @@
 # compression.py - Mercurial utility functions for compression
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.


 from __future__ import absolute_import, print_function

 import bz2
 import collections
 import zlib

 from .. import (
     error,
     i18n,
     pycompat,
 )
 from . import (
     stringutil,
 )

 safehasattr = pycompat.safehasattr


 _ = i18n._

 # compression code

 SERVERROLE = 'server'
 CLIENTROLE = 'client'

 compewireprotosupport = collections.namedtuple(r'compenginewireprotosupport',
                                                (r'name', r'serverpriority',
                                                 r'clientpriority'))

 class propertycache(object):
     def __init__(self, func):
         self.func = func
         self.name = func.__name__
     def __get__(self, obj, type=None):
         result = self.func(obj)
         self.cachevalue(obj, result)
         return result

     def cachevalue(self, obj, value):
         # __dict__ assignment required to bypass __setattr__ (eg: repoview)
         obj.__dict__[self.name] = value

 class compressormanager(object):
     """Holds registrations of various compression engines.

     This class essentially abstracts the differences between compression
     engines to allow new compression formats to be added easily, possibly from
     extensions.

     Compressors are registered against the global instance by calling its
     ``register()`` method.
     """
     def __init__(self):
         self._engines = {}
         # Bundle spec human name to engine name.
         self._bundlenames = {}
         # Internal bundle identifier to engine name.
         self._bundletypes = {}
         # Revlog header to engine name.
         self._revlogheaders = {}
         # Wire proto identifier to engine name.
         self._wiretypes = {}

     def __getitem__(self, key):
         return self._engines[key]

     def __contains__(self, key):
         return key in self._engines

     def __iter__(self):
         return iter(self._engines.keys())

     def register(self, engine):
         """Register a compression engine with the manager.

         The argument must be a ``compressionengine`` instance.
         """
         if not isinstance(engine, compressionengine):
             raise ValueError(_('argument must be a compressionengine'))

         name = engine.name()

         if name in self._engines:
             raise error.Abort(_('compression engine %s already registered') %
                               name)

         bundleinfo = engine.bundletype()
         if bundleinfo:
             bundlename, bundletype = bundleinfo

             if bundlename in self._bundlenames:
                 raise error.Abort(_('bundle name %s already registered') %
                                   bundlename)
             if bundletype in self._bundletypes:
                 raise error.Abort(_('bundle type %s already registered by %s') %
                                   (bundletype, self._bundletypes[bundletype]))

             # No external facing name declared.
             if bundlename:
                 self._bundlenames[bundlename] = name

             self._bundletypes[bundletype] = name

         wiresupport = engine.wireprotosupport()
         if wiresupport:
             wiretype = wiresupport.name
             if wiretype in self._wiretypes:
                 raise error.Abort(_('wire protocol compression %s already '
                                     'registered by %s') %
                                   (wiretype, self._wiretypes[wiretype]))

             self._wiretypes[wiretype] = name

         revlogheader = engine.revlogheader()
         if revlogheader and revlogheader in self._revlogheaders:
             raise error.Abort(_('revlog header %s already registered by %s') %
                               (revlogheader, self._revlogheaders[revlogheader]))

         if revlogheader:
             self._revlogheaders[revlogheader] = name

         self._engines[name] = engine

     @property
     def supportedbundlenames(self):
         return set(self._bundlenames.keys())

     @property
     def supportedbundletypes(self):
         return set(self._bundletypes.keys())

     def forbundlename(self, bundlename):
         """Obtain a compression engine registered to a bundle name.

         Will raise KeyError if the bundle type isn't registered.

         Will abort if the engine is known but not available.
         """
         engine = self._engines[self._bundlenames[bundlename]]
         if not engine.available():
             raise error.Abort(_('compression engine %s could not be loaded') %
                               engine.name())
         return engine

     def forbundletype(self, bundletype):
         """Obtain a compression engine registered to a bundle type.

         Will raise KeyError if the bundle type isn't registered.

         Will abort if the engine is known but not available.
         """
         engine = self._engines[self._bundletypes[bundletype]]
         if not engine.available():
             raise error.Abort(_('compression engine %s could not be loaded') %
                               engine.name())
         return engine

     def supportedwireengines(self, role, onlyavailable=True):
         """Obtain compression engines that support the wire protocol.

         Returns a list of engines in prioritized order, most desired first.

         If ``onlyavailable`` is set, filter out engines that can't be
         loaded.
         """
         assert role in (SERVERROLE, CLIENTROLE)

         attr = 'serverpriority' if role == SERVERROLE else 'clientpriority'

         engines = [self._engines[e] for e in self._wiretypes.values()]
         if onlyavailable:
             engines = [e for e in engines if e.available()]

         def getkey(e):
             # Sort first by priority, highest first. In case of tie, sort
             # alphabetically. This is arbitrary, but ensures output is
             # stable.
             w = e.wireprotosupport()
             return -1 * getattr(w, attr), w.name

         return list(sorted(engines, key=getkey))

     def forwiretype(self, wiretype):
         engine = self._engines[self._wiretypes[wiretype]]
         if not engine.available():
             raise error.Abort(_('compression engine %s could not be loaded') %
                               engine.name())
         return engine

     def forrevlogheader(self, header):
         """Obtain a compression engine registered to a revlog header.

         Will raise KeyError if the revlog header value isn't registered.
         """
         return self._engines[self._revlogheaders[header]]

 compengines = compressormanager()

 class compressionengine(object):
     """Base class for compression engines.

     Compression engines must implement the interface defined by this class.
     """
     def name(self):
         """Returns the name of the compression engine.

         This is the key the engine is registered under.

         This method must be implemented.
         """
         raise NotImplementedError()

     def available(self):
         """Whether the compression engine is available.

         The intent of this method is to allow optional compression engines
         that may not be available in all installations (such as engines relying
         on C extensions that may not be present).
         """
         return True

     def bundletype(self):
         """Describes bundle identifiers for this engine.

         If this compression engine isn't supported for bundles, returns None.

         If this engine can be used for bundles, returns a 2-tuple of strings of
         the user-facing "bundle spec" compression name and an internal
         identifier used to denote the compression format within bundles. To
         exclude the name from external usage, set the first element to ``None``.

         If bundle compression is supported, the class must also implement
         ``compressstream`` and `decompressorreader``.

         The docstring of this method is used in the help system to tell users
         about this engine.
         """
         return None

     def wireprotosupport(self):
         """Declare support for this compression format on the wire protocol.

         If this compression engine isn't supported for compressing wire
         protocol payloads, returns None.

         Otherwise, returns ``compenginewireprotosupport`` with the following
         fields:

         * String format identifier
         * Integer priority for the server
         * Integer priority for the client

         The integer priorities are used to order the advertisement of format
         support by server and client. The highest integer is advertised
         first. Integers with non-positive values aren't advertised.

         The priority values are somewhat arbitrary and only used for default
         ordering. The relative order can be changed via config options.

         If wire protocol compression is supported, the class must also implement
         ``compressstream`` and ``decompressorreader``.
         """
         return None

     def revlogheader(self):
         """Header added to revlog chunks that identifies this engine.

         If this engine can be used to compress revlogs, this method should
         return the bytes used to identify chunks compressed with this engine.
         Else, the method should return ``None`` to indicate it does not
         participate in revlog compression.
         """
         return None

     def compressstream(self, it, opts=None):
         """Compress an iterator of chunks.

         The method receives an iterator (ideally a generator) of chunks of
         bytes to be compressed. It returns an iterator (ideally a generator)
         of bytes of chunks representing the compressed output.

         Optionally accepts an argument defining how to perform compression.
         Each engine treats this argument differently.
         """
         raise NotImplementedError()

     def decompressorreader(self, fh):
         """Perform decompression on a file object.

         Argument is an object with a ``read(size)`` method that returns
         compressed data. Return value is an object with a ``read(size)`` that
         returns uncompressed data.
         """
         raise NotImplementedError()

     def revlogcompressor(self, opts=None):
         """Obtain an object that can be used to compress revlog entries.

         The object has a ``compress(data)`` method that compresses binary
         data. This method returns compressed binary data or ``None`` if
         the data could not be compressed (too small, not compressible, etc).
         The returned data should have a header uniquely identifying this
         compression format so decompression can be routed to this engine.
         This header should be identified by the ``revlogheader()`` return
         value.

         The object has a ``decompress(data)`` method that decompresses
         data. The method will only be called if ``data`` begins with
         ``revlogheader()``. The method should return the raw, uncompressed
         data or raise a ``StorageError``.

         The object is reusable but is not thread safe.
         """
         raise NotImplementedError()

 class _CompressedStreamReader(object):
     def __init__(self, fh):
         if safehasattr(fh, 'unbufferedread'):
             self._reader = fh.unbufferedread
         else:
             self._reader = fh.read
         self._pending = []
         self._pos = 0
         self._eof = False

     def _decompress(self, chunk):
         raise NotImplementedError()

     def read(self, l):
         buf = []
         while True:
             while self._pending:
                 if len(self._pending[0]) > l + self._pos:
                     newbuf = self._pending[0]
                     buf.append(newbuf[self._pos:self._pos + l])
                     self._pos += l
                     return ''.join(buf)

                 newbuf = self._pending.pop(0)
                 if self._pos:
                     buf.append(newbuf[self._pos:])
                     l -= len(newbuf) - self._pos
                 else:
                     buf.append(newbuf)
                     l -= len(newbuf)
                 self._pos = 0

             if self._eof:
                 return ''.join(buf)
             chunk = self._reader(65536)
             self._decompress(chunk)
             if not chunk and not self._pending and not self._eof:
                 # No progress and no new data, bail out
                 return ''.join(buf)

 class _GzipCompressedStreamReader(_CompressedStreamReader):
     def __init__(self, fh):
         super(_GzipCompressedStreamReader, self).__init__(fh)
         self._decompobj = zlib.decompressobj()
     def _decompress(self, chunk):
         newbuf = self._decompobj.decompress(chunk)
         if newbuf:
             self._pending.append(newbuf)
         d = self._decompobj.copy()
         try:
             d.decompress('x')
             d.flush()
             if d.unused_data == 'x':
                 self._eof = True
         except zlib.error:
             pass

 class _BZ2CompressedStreamReader(_CompressedStreamReader):
     def __init__(self, fh):
         super(_BZ2CompressedStreamReader, self).__init__(fh)
         self._decompobj = bz2.BZ2Decompressor()
     def _decompress(self, chunk):
         newbuf = self._decompobj.decompress(chunk)
         if newbuf:
             self._pending.append(newbuf)
         try:
             while True:
                 newbuf = self._decompobj.decompress('')
                 if newbuf:
                     self._pending.append(newbuf)
                 else:
                     break
         except EOFError:
             self._eof = True

 class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
     def __init__(self, fh):
         super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
         newbuf = self._decompobj.decompress('BZ')
         if newbuf:
             self._pending.append(newbuf)

 class _ZstdCompressedStreamReader(_CompressedStreamReader):
     def __init__(self, fh, zstd):
         super(_ZstdCompressedStreamReader, self).__init__(fh)
         self._zstd = zstd
         self._decompobj = zstd.ZstdDecompressor().decompressobj()
     def _decompress(self, chunk):
         newbuf = self._decompobj.decompress(chunk)
         if newbuf:
             self._pending.append(newbuf)
         try:
             while True:
                 newbuf = self._decompobj.decompress('')
                 if newbuf:
                     self._pending.append(newbuf)
                 else:
                     break
         except self._zstd.ZstdError:
             self._eof = True

 class _zlibengine(compressionengine):
     def name(self):
         return 'zlib'

     def bundletype(self):
         """zlib compression using the DEFLATE algorithm.

         All Mercurial clients should support this format. The compression
         algorithm strikes a reasonable balance between compression ratio
         and size.
         """
         return 'gzip', 'GZ'

     def wireprotosupport(self):
         return compewireprotosupport('zlib', 20, 20)

     def revlogheader(self):
         return 'x'

     def compressstream(self, it, opts=None):
         opts = opts or {}

         z = zlib.compressobj(opts.get('level', -1))
         for chunk in it:
             data = z.compress(chunk)
             # Not all calls to compress emit data. It is cheaper to inspect
             # here than to feed empty chunks through generator.
             if data:
                 yield data

         yield z.flush()

     def decompressorreader(self, fh):
         return _GzipCompressedStreamReader(fh)

     class zlibrevlogcompressor(object):
+
+        def __init__(self, level=None):
+            self._level = level
+
         def compress(self, data):
             insize = len(data)
             # Caller handles empty input case.
             assert insize > 0

             if insize < 44:
                 return None

             elif insize <= 1000000:
+                if self._level is None:
                     compressed = zlib.compress(data)
+                else:
+                    compressed = zlib.compress(data, self._level)
                 if len(compressed) < insize:
                     return compressed
                 return None

             # zlib makes an internal copy of the input buffer, doubling
             # memory usage for large inputs. So do streaming compression
             # on large inputs.
             else:
+                if self._level is None:
                     z = zlib.compressobj()
+                else:
+                    z = zlib.compressobj(level=self._level)
                 parts = []
                 pos = 0
                 while pos < insize:
                     pos2 = pos + 2**20
                     parts.append(z.compress(data[pos:pos2]))
                     pos = pos2
                 parts.append(z.flush())

                 if sum(map(len, parts)) < insize:
                     return ''.join(parts)
                 return None

         def decompress(self, data):
             try:
                 return zlib.decompress(data)
             except zlib.error as e:
                 raise error.StorageError(_('revlog decompress error: %s') %
                                          stringutil.forcebytestr(e))

     def revlogcompressor(self, opts=None):
         return self.zlibrevlogcompressor()

 compengines.register(_zlibengine())

 class _bz2engine(compressionengine):
     def name(self):
         return 'bz2'

     def bundletype(self):
         """An algorithm that produces smaller bundles than ``gzip``.

         All Mercurial clients should support this format.

         This engine will likely produce smaller bundles than ``gzip`` but
         will be significantly slower, both during compression and
         decompression.

         If available, the ``zstd`` engine can yield similar or better
         compression at much higher speeds.
         """
         return 'bzip2', 'BZ'

     # We declare a protocol name but don't advertise by default because
     # it is slow.
     def wireprotosupport(self):
         return compewireprotosupport('bzip2', 0, 0)

     def compressstream(self, it, opts=None):
         opts = opts or {}
         z = bz2.BZ2Compressor(opts.get('level', 9))
         for chunk in it:
             data = z.compress(chunk)
             if data:
                 yield data

         yield z.flush()

     def decompressorreader(self, fh):
         return _BZ2CompressedStreamReader(fh)

 compengines.register(_bz2engine())

 class _truncatedbz2engine(compressionengine):
     def name(self):
         return 'bz2truncated'

     def bundletype(self):
         return None, '_truncatedBZ'

     # We don't implement compressstream because it is hackily handled elsewhere.

     def decompressorreader(self, fh):
         return _TruncatedBZ2CompressedStreamReader(fh)

 compengines.register(_truncatedbz2engine())

 class _noopengine(compressionengine):
     def name(self):
         return 'none'

     def bundletype(self):
         """No compression is performed.

         Use this compression engine to explicitly disable compression.
         """
         return 'none', 'UN'

     # Clients always support uncompressed payloads. Servers don't because
     # unless you are on a fast network, uncompressed payloads can easily
     # saturate your network pipe.
     def wireprotosupport(self):
         return compewireprotosupport('none', 0, 10)

     # We don't implement revlogheader because it is handled specially
     # in the revlog class.

     def compressstream(self, it, opts=None):
         return it

     def decompressorreader(self, fh):
         return fh

     class nooprevlogcompressor(object):
         def compress(self, data):
             return None

     def revlogcompressor(self, opts=None):
         return self.nooprevlogcompressor()

 compengines.register(_noopengine())

 class _zstdengine(compressionengine):
     def name(self):
         return 'zstd'

     @propertycache
     def _module(self):
         # Not all installs have the zstd module available. So defer importing
         # until first access.
         try:
             from .. import zstd
             # Force delayed import.
             zstd.__version__
             return zstd
         except ImportError:
             return None

     def available(self):
         return bool(self._module)

     def bundletype(self):
         """A modern compression algorithm that is fast and highly flexible.

         Only supported by Mercurial 4.1 and newer clients.

         With the default settings, zstd compression is both faster and yields
         better compression than ``gzip``. It also frequently yields better
         compression than ``bzip2`` while operating at much higher speeds.

         If this engine is available and backwards compatibility is not a
         concern, it is likely the best available engine.
         """
         return 'zstd', 'ZS'

     def wireprotosupport(self):
         return compewireprotosupport('zstd', 50, 50)

     def revlogheader(self):
         return '\x28'

     def compressstream(self, it, opts=None):
         opts = opts or {}
         # zstd level 3 is almost always significantly faster than zlib
         # while providing no worse compression. It strikes a good balance
         # between speed and compression.
         level = opts.get('level', 3)

         zstd = self._module
         z = zstd.ZstdCompressor(level=level).compressobj()
         for chunk in it:
             data = z.compress(chunk)
             if data:
                 yield data

         yield z.flush()

     def decompressorreader(self, fh):
         return _ZstdCompressedStreamReader(fh, self._module)

     class zstdrevlogcompressor(object):
         def __init__(self, zstd, level=3):
             # TODO consider omitting frame magic to save 4 bytes.
             # This writes content sizes into the frame header. That is
             # extra storage. But it allows a correct size memory allocation
             # to hold the result.
             self._cctx = zstd.ZstdCompressor(level=level)
             self._dctx = zstd.ZstdDecompressor()
             self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
             self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE

         def compress(self, data):
             insize = len(data)
             # Caller handles empty input case.
             assert insize > 0

             if insize < 50:
                 return None

             elif insize <= 1000000:
                 compressed = self._cctx.compress(data)
                 if len(compressed) < insize:
                     return compressed
                 return None
             else:
                 z = self._cctx.compressobj()
                 chunks = []
                 pos = 0
                 while pos < insize:
                     pos2 = pos + self._compinsize
                     chunk = z.compress(data[pos:pos2])
                     if chunk:
                         chunks.append(chunk)
                     pos = pos2
                 chunks.append(z.flush())

                 if sum(map(len, chunks)) < insize:
                     return ''.join(chunks)
                 return None

         def decompress(self, data):
             insize = len(data)

             try:
                 # This was measured to be faster than other streaming
                 # decompressors.
                 dobj = self._dctx.decompressobj()
                 chunks = []
                 pos = 0
                 while pos < insize:
                     pos2 = pos + self._decompinsize
                     chunk = dobj.decompress(data[pos:pos2])
                     if chunk:
                         chunks.append(chunk)
                     pos = pos2
                 # Frame should be exhausted, so no finish() API.

                 return ''.join(chunks)
             except Exception as e:
                 raise error.StorageError(_('revlog decompress error: %s') %
                                          stringutil.forcebytestr(e))

     def revlogcompressor(self, opts=None):
         opts = opts or {}
         return self.zstdrevlogcompressor(self._module,
                                          level=opts.get('level', 3))

 compengines.register(_zstdengine())

 def bundlecompressiontopics():
     """Obtains a list of available bundle compressions for use in help."""
     # help.makeitemsdocs() expects a dict of names to items with a .__doc__.
     items = {}

     # We need to format the docstring. So use a dummy object/type to hold it
     # rather than mutating the original.
     class docobject(object):
         pass

     for name in compengines:
         engine = compengines[name]

         if not engine.available():
             continue

         bt = engine.bundletype()
         if not bt or not bt[0]:
             continue

         doc = b'``%s``\n %s' % (bt[0], pycompat.getdoc(engine.bundletype))

         value = docobject()
         value.__doc__ = pycompat.sysstr(doc)
         value._origdoc = engine.bundletype.__doc__
         value._origfunc = engine.bundletype

         items[bt[0]] = value

     return items

 i18nfunctions = bundlecompressiontopics().values()
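For context, the streaming side of the zlib engine already honours a 'level' key in its opts dictionary (see compressstream above, which feeds it to zlib.compressobj). The snippet below is a hypothetical driver, not part of the changeset, and assumes this module is importable as mercurial.utils.compression:

    from mercurial.utils.compression import compengines

    # Look up the zlib engine by its user-facing bundle name and compress a
    # stream of chunks at an explicit level: {'level': 1} favours speed,
    # {'level': 9} favours size; omitting the key keeps zlib's default (-1).
    engine = compengines.forbundlename('gzip')
    chunks = (b'some revision data\n' for _ in range(1000))
    compressed = b''.join(engine.compressstream(chunks, {'level': 1}))
    print(len(compressed))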