compression: use sysstr to specify attribute to fetch for priority...
marmoute
r51806:93b0de7f default
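The functional change in the first hunk below is on the line numbered 188: the attribute name passed to getattr() inside supportedwireengines() becomes a native string ('serverpriority' / 'clientpriority') rather than a bytes literal. A minimal sketch of the Python 3 behaviour this relies on, using a throwaway namedtuple instead of the real engine objects:

    import collections

    # Stand-in for the compenginewireprotosupport tuple defined in the module.
    support = collections.namedtuple(
        'compenginewireprotosupport', ('name', 'serverpriority', 'clientpriority')
    )
    w = support(name=b'zlib', serverpriority=20, clientpriority=20)

    print(getattr(w, 'serverpriority'))    # 20 -- the built-in getattr wants a str name
    # getattr(w, b'serverpriority')        # TypeError: attribute name must be string

The bytes spelling only worked through the getattr wrapper imported from pycompat at the top of the module; with a plain str the built-in lookup suffices, which is presumably the point of switching to sysstr here.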
@@ -1,810 +1,810 b''
1 # compression.py - Mercurial utility functions for compression
1 # compression.py - Mercurial utility functions for compression
2 #
2 #
3 # This software may be used and distributed according to the terms of the
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
4 # GNU General Public License version 2 or any later version.
5
5
6
6
7 import bz2
7 import bz2
8 import collections
8 import collections
9 import zlib
9 import zlib
10
10
11 from ..pycompat import getattr
11 from ..pycompat import getattr
12 from .. import (
12 from .. import (
13 error,
13 error,
14 i18n,
14 i18n,
15 pycompat,
15 pycompat,
16 )
16 )
17 from . import stringutil
17 from . import stringutil
18
18
19 safehasattr = pycompat.safehasattr
19 safehasattr = pycompat.safehasattr
20
20
21
21
22 _ = i18n._
22 _ = i18n._
23
23
24 # compression code
24 # compression code
25
25
26 SERVERROLE = b'server'
26 SERVERROLE = b'server'
27 CLIENTROLE = b'client'
27 CLIENTROLE = b'client'
28
28
29 compewireprotosupport = collections.namedtuple(
29 compewireprotosupport = collections.namedtuple(
30 'compenginewireprotosupport',
30 'compenginewireprotosupport',
31 ('name', 'serverpriority', 'clientpriority'),
31 ('name', 'serverpriority', 'clientpriority'),
32 )
32 )
33
33
34
34
35 class propertycache:
35 class propertycache:
36 def __init__(self, func):
36 def __init__(self, func):
37 self.func = func
37 self.func = func
38 self.name = func.__name__
38 self.name = func.__name__
39
39
40 def __get__(self, obj, type=None):
40 def __get__(self, obj, type=None):
41 result = self.func(obj)
41 result = self.func(obj)
42 self.cachevalue(obj, result)
42 self.cachevalue(obj, result)
43 return result
43 return result
44
44
45 def cachevalue(self, obj, value):
45 def cachevalue(self, obj, value):
46 # __dict__ assignment required to bypass __setattr__ (eg: repoview)
46 # __dict__ assignment required to bypass __setattr__ (eg: repoview)
47 obj.__dict__[self.name] = value
47 obj.__dict__[self.name] = value
48
48
49
49
50 class compressormanager:
50 class compressormanager:
51 """Holds registrations of various compression engines.
51 """Holds registrations of various compression engines.
52
52
53 This class essentially abstracts the differences between compression
53 This class essentially abstracts the differences between compression
54 engines to allow new compression formats to be added easily, possibly from
54 engines to allow new compression formats to be added easily, possibly from
55 extensions.
55 extensions.
56
56
57 Compressors are registered against the global instance by calling its
57 Compressors are registered against the global instance by calling its
58 ``register()`` method.
58 ``register()`` method.
59 """
59 """
60
60
61 def __init__(self):
61 def __init__(self):
62 self._engines = {}
62 self._engines = {}
63 # Bundle spec human name to engine name.
63 # Bundle spec human name to engine name.
64 self._bundlenames = {}
64 self._bundlenames = {}
65 # Internal bundle identifier to engine name.
65 # Internal bundle identifier to engine name.
66 self._bundletypes = {}
66 self._bundletypes = {}
67 # Revlog header to engine name.
67 # Revlog header to engine name.
68 self._revlogheaders = {}
68 self._revlogheaders = {}
69 # Wire proto identifier to engine name.
69 # Wire proto identifier to engine name.
70 self._wiretypes = {}
70 self._wiretypes = {}
71
71
72 def __getitem__(self, key):
72 def __getitem__(self, key):
73 return self._engines[key]
73 return self._engines[key]
74
74
75 def __contains__(self, key):
75 def __contains__(self, key):
76 return key in self._engines
76 return key in self._engines
77
77
78 def __iter__(self):
78 def __iter__(self):
79 return iter(self._engines.keys())
79 return iter(self._engines.keys())
80
80
81 def register(self, engine):
81 def register(self, engine):
82 """Register a compression engine with the manager.
82 """Register a compression engine with the manager.
83
83
84 The argument must be a ``compressionengine`` instance.
84 The argument must be a ``compressionengine`` instance.
85 """
85 """
86 if not isinstance(engine, compressionengine):
86 if not isinstance(engine, compressionengine):
87 raise ValueError(_(b'argument must be a compressionengine'))
87 raise ValueError(_(b'argument must be a compressionengine'))
88
88
89 name = engine.name()
89 name = engine.name()
90
90
91 if name in self._engines:
91 if name in self._engines:
92 raise error.Abort(
92 raise error.Abort(
93 _(b'compression engine %s already registered') % name
93 _(b'compression engine %s already registered') % name
94 )
94 )
95
95
96 bundleinfo = engine.bundletype()
96 bundleinfo = engine.bundletype()
97 if bundleinfo:
97 if bundleinfo:
98 bundlename, bundletype = bundleinfo
98 bundlename, bundletype = bundleinfo
99
99
100 if bundlename in self._bundlenames:
100 if bundlename in self._bundlenames:
101 raise error.Abort(
101 raise error.Abort(
102 _(b'bundle name %s already registered') % bundlename
102 _(b'bundle name %s already registered') % bundlename
103 )
103 )
104 if bundletype in self._bundletypes:
104 if bundletype in self._bundletypes:
105 raise error.Abort(
105 raise error.Abort(
106 _(b'bundle type %s already registered by %s')
106 _(b'bundle type %s already registered by %s')
107 % (bundletype, self._bundletypes[bundletype])
107 % (bundletype, self._bundletypes[bundletype])
108 )
108 )
109
109
110 # No external facing name declared.
110 # No external facing name declared.
111 if bundlename:
111 if bundlename:
112 self._bundlenames[bundlename] = name
112 self._bundlenames[bundlename] = name
113
113
114 self._bundletypes[bundletype] = name
114 self._bundletypes[bundletype] = name
115
115
116 wiresupport = engine.wireprotosupport()
116 wiresupport = engine.wireprotosupport()
117 if wiresupport:
117 if wiresupport:
118 wiretype = wiresupport.name
118 wiretype = wiresupport.name
119 if wiretype in self._wiretypes:
119 if wiretype in self._wiretypes:
120 raise error.Abort(
120 raise error.Abort(
121 _(
121 _(
122 b'wire protocol compression %s already '
122 b'wire protocol compression %s already '
123 b'registered by %s'
123 b'registered by %s'
124 )
124 )
125 % (wiretype, self._wiretypes[wiretype])
125 % (wiretype, self._wiretypes[wiretype])
126 )
126 )
127
127
128 self._wiretypes[wiretype] = name
128 self._wiretypes[wiretype] = name
129
129
130 revlogheader = engine.revlogheader()
130 revlogheader = engine.revlogheader()
131 if revlogheader and revlogheader in self._revlogheaders:
131 if revlogheader and revlogheader in self._revlogheaders:
132 raise error.Abort(
132 raise error.Abort(
133 _(b'revlog header %s already registered by %s')
133 _(b'revlog header %s already registered by %s')
134 % (revlogheader, self._revlogheaders[revlogheader])
134 % (revlogheader, self._revlogheaders[revlogheader])
135 )
135 )
136
136
137 if revlogheader:
137 if revlogheader:
138 self._revlogheaders[revlogheader] = name
138 self._revlogheaders[revlogheader] = name
139
139
140 self._engines[name] = engine
140 self._engines[name] = engine
141
141
142 @property
142 @property
143 def supportedbundlenames(self):
143 def supportedbundlenames(self):
144 return set(self._bundlenames.keys())
144 return set(self._bundlenames.keys())
145
145
146 @property
146 @property
147 def supportedbundletypes(self):
147 def supportedbundletypes(self):
148 return set(self._bundletypes.keys())
148 return set(self._bundletypes.keys())
149
149
150 def forbundlename(self, bundlename):
150 def forbundlename(self, bundlename):
151 """Obtain a compression engine registered to a bundle name.
151 """Obtain a compression engine registered to a bundle name.
152
152
153 Will raise KeyError if the bundle type isn't registered.
153 Will raise KeyError if the bundle type isn't registered.
154
154
155 Will abort if the engine is known but not available.
155 Will abort if the engine is known but not available.
156 """
156 """
157 engine = self._engines[self._bundlenames[bundlename]]
157 engine = self._engines[self._bundlenames[bundlename]]
158 if not engine.available():
158 if not engine.available():
159 raise error.Abort(
159 raise error.Abort(
160 _(b'compression engine %s could not be loaded') % engine.name()
160 _(b'compression engine %s could not be loaded') % engine.name()
161 )
161 )
162 return engine
162 return engine
163
163
164 def forbundletype(self, bundletype):
164 def forbundletype(self, bundletype):
165 """Obtain a compression engine registered to a bundle type.
165 """Obtain a compression engine registered to a bundle type.
166
166
167 Will raise KeyError if the bundle type isn't registered.
167 Will raise KeyError if the bundle type isn't registered.
168
168
169 Will abort if the engine is known but not available.
169 Will abort if the engine is known but not available.
170 """
170 """
171 engine = self._engines[self._bundletypes[bundletype]]
171 engine = self._engines[self._bundletypes[bundletype]]
172 if not engine.available():
172 if not engine.available():
173 raise error.Abort(
173 raise error.Abort(
174 _(b'compression engine %s could not be loaded') % engine.name()
174 _(b'compression engine %s could not be loaded') % engine.name()
175 )
175 )
176 return engine
176 return engine
177
177
178 def supportedwireengines(self, role, onlyavailable=True):
178 def supportedwireengines(self, role, onlyavailable=True):
179 """Obtain compression engines that support the wire protocol.
179 """Obtain compression engines that support the wire protocol.
180
180
181 Returns a list of engines in prioritized order, most desired first.
181 Returns a list of engines in prioritized order, most desired first.
182
182
183 If ``onlyavailable`` is set, filter out engines that can't be
183 If ``onlyavailable`` is set, filter out engines that can't be
184 loaded.
184 loaded.
185 """
185 """
186 assert role in (SERVERROLE, CLIENTROLE)
186 assert role in (SERVERROLE, CLIENTROLE)
187
187
188 attr = b'serverpriority' if role == SERVERROLE else b'clientpriority'
188 attr = 'serverpriority' if role == SERVERROLE else 'clientpriority'
189
189
190 engines = [self._engines[e] for e in self._wiretypes.values()]
190 engines = [self._engines[e] for e in self._wiretypes.values()]
191 if onlyavailable:
191 if onlyavailable:
192 engines = [e for e in engines if e.available()]
192 engines = [e for e in engines if e.available()]
193
193
194 def getkey(e):
194 def getkey(e):
195 # Sort first by priority, highest first. In case of tie, sort
195 # Sort first by priority, highest first. In case of tie, sort
196 # alphabetically. This is arbitrary, but ensures output is
196 # alphabetically. This is arbitrary, but ensures output is
197 # stable.
197 # stable.
198 w = e.wireprotosupport()
198 w = e.wireprotosupport()
199 return -1 * getattr(w, attr), w.name
199 return -1 * getattr(w, attr), w.name
200
200
201 return list(sorted(engines, key=getkey))
201 return list(sorted(engines, key=getkey))
202
202
203 def forwiretype(self, wiretype):
203 def forwiretype(self, wiretype):
204 engine = self._engines[self._wiretypes[wiretype]]
204 engine = self._engines[self._wiretypes[wiretype]]
205 if not engine.available():
205 if not engine.available():
206 raise error.Abort(
206 raise error.Abort(
207 _(b'compression engine %s could not be loaded') % engine.name()
207 _(b'compression engine %s could not be loaded') % engine.name()
208 )
208 )
209 return engine
209 return engine
210
210
211 def forrevlogheader(self, header):
211 def forrevlogheader(self, header):
212 """Obtain a compression engine registered to a revlog header.
212 """Obtain a compression engine registered to a revlog header.
213
213
214 Will raise KeyError if the revlog header value isn't registered.
214 Will raise KeyError if the revlog header value isn't registered.
215 """
215 """
216 return self._engines[self._revlogheaders[header]]
216 return self._engines[self._revlogheaders[header]]
217
217
218
218
219 compengines = compressormanager()
219 compengines = compressormanager()
220
220
221
221
222 class compressionengine:
222 class compressionengine:
223 """Base class for compression engines.
223 """Base class for compression engines.
224
224
225 Compression engines must implement the interface defined by this class.
225 Compression engines must implement the interface defined by this class.
226 """
226 """
227
227
228 def name(self):
228 def name(self):
229 """Returns the name of the compression engine.
229 """Returns the name of the compression engine.
230
230
231 This is the key the engine is registered under.
231 This is the key the engine is registered under.
232
232
233 This method must be implemented.
233 This method must be implemented.
234 """
234 """
235 raise NotImplementedError()
235 raise NotImplementedError()
236
236
237 def available(self):
237 def available(self):
238 """Whether the compression engine is available.
238 """Whether the compression engine is available.
239
239
240 The intent of this method is to allow optional compression engines
240 The intent of this method is to allow optional compression engines
241 that may not be available in all installations (such as engines relying
241 that may not be available in all installations (such as engines relying
242 on C extensions that may not be present).
242 on C extensions that may not be present).
243 """
243 """
244 return True
244 return True
245
245
246 def bundletype(self):
246 def bundletype(self):
247 """Describes bundle identifiers for this engine.
247 """Describes bundle identifiers for this engine.
248
248
249 If this compression engine isn't supported for bundles, returns None.
249 If this compression engine isn't supported for bundles, returns None.
250
250
251 If this engine can be used for bundles, returns a 2-tuple of strings of
251 If this engine can be used for bundles, returns a 2-tuple of strings of
252 the user-facing "bundle spec" compression name and an internal
252 the user-facing "bundle spec" compression name and an internal
253 identifier used to denote the compression format within bundles. To
253 identifier used to denote the compression format within bundles. To
254 exclude the name from external usage, set the first element to ``None``.
254 exclude the name from external usage, set the first element to ``None``.
255
255
256 If bundle compression is supported, the class must also implement
256 If bundle compression is supported, the class must also implement
257 ``compressstream`` and ``decompressorreader``.
257 ``compressstream`` and ``decompressorreader``.
258
258
259 The docstring of this method is used in the help system to tell users
259 The docstring of this method is used in the help system to tell users
260 about this engine.
260 about this engine.
261 """
261 """
262 return None
262 return None
263
263
264 def wireprotosupport(self):
264 def wireprotosupport(self):
265 """Declare support for this compression format on the wire protocol.
265 """Declare support for this compression format on the wire protocol.
266
266
267 If this compression engine isn't supported for compressing wire
267 If this compression engine isn't supported for compressing wire
268 protocol payloads, returns None.
268 protocol payloads, returns None.
269
269
270 Otherwise, returns ``compenginewireprotosupport`` with the following
270 Otherwise, returns ``compenginewireprotosupport`` with the following
271 fields:
271 fields:
272
272
273 * String format identifier
273 * String format identifier
274 * Integer priority for the server
274 * Integer priority for the server
275 * Integer priority for the client
275 * Integer priority for the client
276
276
277 The integer priorities are used to order the advertisement of format
277 The integer priorities are used to order the advertisement of format
278 support by server and client. The highest integer is advertised
278 support by server and client. The highest integer is advertised
279 first. Integers with non-positive values aren't advertised.
279 first. Integers with non-positive values aren't advertised.
280
280
281 The priority values are somewhat arbitrary and only used for default
281 The priority values are somewhat arbitrary and only used for default
282 ordering. The relative order can be changed via config options.
282 ordering. The relative order can be changed via config options.
283
283
284 If wire protocol compression is supported, the class must also implement
284 If wire protocol compression is supported, the class must also implement
285 ``compressstream`` and ``decompressorreader``.
285 ``compressstream`` and ``decompressorreader``.
286 """
286 """
287 return None
287 return None
288
288
289 def revlogheader(self):
289 def revlogheader(self):
290 """Header added to revlog chunks that identifies this engine.
290 """Header added to revlog chunks that identifies this engine.
291
291
292 If this engine can be used to compress revlogs, this method should
292 If this engine can be used to compress revlogs, this method should
293 return the bytes used to identify chunks compressed with this engine.
293 return the bytes used to identify chunks compressed with this engine.
294 Else, the method should return ``None`` to indicate it does not
294 Else, the method should return ``None`` to indicate it does not
295 participate in revlog compression.
295 participate in revlog compression.
296 """
296 """
297 return None
297 return None
298
298
299 def compressstream(self, it, opts=None):
299 def compressstream(self, it, opts=None):
300 """Compress an iterator of chunks.
300 """Compress an iterator of chunks.
301
301
302 The method receives an iterator (ideally a generator) of chunks of
302 The method receives an iterator (ideally a generator) of chunks of
303 bytes to be compressed. It returns an iterator (ideally a generator)
303 bytes to be compressed. It returns an iterator (ideally a generator)
304 of chunks of bytes representing the compressed output.
304 of chunks of bytes representing the compressed output.
305
305
306 Optionally accepts an argument defining how to perform compression.
306 Optionally accepts an argument defining how to perform compression.
307 Each engine treats this argument differently.
307 Each engine treats this argument differently.
308 """
308 """
309 raise NotImplementedError()
309 raise NotImplementedError()
310
310
311 def decompressorreader(self, fh):
311 def decompressorreader(self, fh):
312 """Perform decompression on a file object.
312 """Perform decompression on a file object.
313
313
314 Argument is an object with a ``read(size)`` method that returns
314 Argument is an object with a ``read(size)`` method that returns
315 compressed data. Return value is an object with a ``read(size)`` that
315 compressed data. Return value is an object with a ``read(size)`` that
316 returns uncompressed data.
316 returns uncompressed data.
317 """
317 """
318 raise NotImplementedError()
318 raise NotImplementedError()
319
319
320 def revlogcompressor(self, opts=None):
320 def revlogcompressor(self, opts=None):
321 """Obtain an object that can be used to compress revlog entries.
321 """Obtain an object that can be used to compress revlog entries.
322
322
323 The object has a ``compress(data)`` method that compresses binary
323 The object has a ``compress(data)`` method that compresses binary
324 data. This method returns compressed binary data or ``None`` if
324 data. This method returns compressed binary data or ``None`` if
325 the data could not be compressed (too small, not compressible, etc).
325 the data could not be compressed (too small, not compressible, etc).
326 The returned data should have a header uniquely identifying this
326 The returned data should have a header uniquely identifying this
327 compression format so decompression can be routed to this engine.
327 compression format so decompression can be routed to this engine.
328 This header should be identified by the ``revlogheader()`` return
328 This header should be identified by the ``revlogheader()`` return
329 value.
329 value.
330
330
331 The object has a ``decompress(data)`` method that decompresses
331 The object has a ``decompress(data)`` method that decompresses
332 data. The method will only be called if ``data`` begins with
332 data. The method will only be called if ``data`` begins with
333 ``revlogheader()``. The method should return the raw, uncompressed
333 ``revlogheader()``. The method should return the raw, uncompressed
334 data or raise a ``StorageError``.
334 data or raise a ``StorageError``.
335
335
336 The object is reusable but is not thread safe.
336 The object is reusable but is not thread safe.
337 """
337 """
338 raise NotImplementedError()
338 raise NotImplementedError()
339
339
340
340
341 class _CompressedStreamReader:
341 class _CompressedStreamReader:
342 def __init__(self, fh):
342 def __init__(self, fh):
343 if safehasattr(fh, 'unbufferedread'):
343 if safehasattr(fh, 'unbufferedread'):
344 self._reader = fh.unbufferedread
344 self._reader = fh.unbufferedread
345 else:
345 else:
346 self._reader = fh.read
346 self._reader = fh.read
347 self._pending = []
347 self._pending = []
348 self._pos = 0
348 self._pos = 0
349 self._eof = False
349 self._eof = False
350
350
351 def _decompress(self, chunk):
351 def _decompress(self, chunk):
352 raise NotImplementedError()
352 raise NotImplementedError()
353
353
354 def read(self, l):
354 def read(self, l):
355 buf = []
355 buf = []
356 while True:
356 while True:
357 while self._pending:
357 while self._pending:
358 if len(self._pending[0]) > l + self._pos:
358 if len(self._pending[0]) > l + self._pos:
359 newbuf = self._pending[0]
359 newbuf = self._pending[0]
360 buf.append(newbuf[self._pos : self._pos + l])
360 buf.append(newbuf[self._pos : self._pos + l])
361 self._pos += l
361 self._pos += l
362 return b''.join(buf)
362 return b''.join(buf)
363
363
364 newbuf = self._pending.pop(0)
364 newbuf = self._pending.pop(0)
365 if self._pos:
365 if self._pos:
366 buf.append(newbuf[self._pos :])
366 buf.append(newbuf[self._pos :])
367 l -= len(newbuf) - self._pos
367 l -= len(newbuf) - self._pos
368 else:
368 else:
369 buf.append(newbuf)
369 buf.append(newbuf)
370 l -= len(newbuf)
370 l -= len(newbuf)
371 self._pos = 0
371 self._pos = 0
372
372
373 if self._eof:
373 if self._eof:
374 return b''.join(buf)
374 return b''.join(buf)
375 chunk = self._reader(65536)
375 chunk = self._reader(65536)
376 self._decompress(chunk)
376 self._decompress(chunk)
377 if not chunk and not self._pending and not self._eof:
377 if not chunk and not self._pending and not self._eof:
378 # No progress and no new data, bail out
378 # No progress and no new data, bail out
379 return b''.join(buf)
379 return b''.join(buf)
380
380
381
381
382 class _GzipCompressedStreamReader(_CompressedStreamReader):
382 class _GzipCompressedStreamReader(_CompressedStreamReader):
383 def __init__(self, fh):
383 def __init__(self, fh):
384 super(_GzipCompressedStreamReader, self).__init__(fh)
384 super(_GzipCompressedStreamReader, self).__init__(fh)
385 self._decompobj = zlib.decompressobj()
385 self._decompobj = zlib.decompressobj()
386
386
387 def _decompress(self, chunk):
387 def _decompress(self, chunk):
388 newbuf = self._decompobj.decompress(chunk)
388 newbuf = self._decompobj.decompress(chunk)
389 if newbuf:
389 if newbuf:
390 self._pending.append(newbuf)
390 self._pending.append(newbuf)
391 d = self._decompobj.copy()
391 d = self._decompobj.copy()
392 try:
392 try:
393 d.decompress(b'x')
393 d.decompress(b'x')
394 d.flush()
394 d.flush()
395 if d.unused_data == b'x':
395 if d.unused_data == b'x':
396 self._eof = True
396 self._eof = True
397 except zlib.error:
397 except zlib.error:
398 pass
398 pass
399
399
400
400
401 class _BZ2CompressedStreamReader(_CompressedStreamReader):
401 class _BZ2CompressedStreamReader(_CompressedStreamReader):
402 def __init__(self, fh):
402 def __init__(self, fh):
403 super(_BZ2CompressedStreamReader, self).__init__(fh)
403 super(_BZ2CompressedStreamReader, self).__init__(fh)
404 self._decompobj = bz2.BZ2Decompressor()
404 self._decompobj = bz2.BZ2Decompressor()
405
405
406 def _decompress(self, chunk):
406 def _decompress(self, chunk):
407 newbuf = self._decompobj.decompress(chunk)
407 newbuf = self._decompobj.decompress(chunk)
408 if newbuf:
408 if newbuf:
409 self._pending.append(newbuf)
409 self._pending.append(newbuf)
410 try:
410 try:
411 while True:
411 while True:
412 newbuf = self._decompobj.decompress(b'')
412 newbuf = self._decompobj.decompress(b'')
413 if newbuf:
413 if newbuf:
414 self._pending.append(newbuf)
414 self._pending.append(newbuf)
415 else:
415 else:
416 break
416 break
417 except EOFError:
417 except EOFError:
418 self._eof = True
418 self._eof = True
419
419
420
420
421 class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
421 class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
422 def __init__(self, fh):
422 def __init__(self, fh):
423 super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
423 super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
424 newbuf = self._decompobj.decompress(b'BZ')
424 newbuf = self._decompobj.decompress(b'BZ')
425 if newbuf:
425 if newbuf:
426 self._pending.append(newbuf)
426 self._pending.append(newbuf)
427
427
428
428
429 class _ZstdCompressedStreamReader(_CompressedStreamReader):
429 class _ZstdCompressedStreamReader(_CompressedStreamReader):
430 def __init__(self, fh, zstd):
430 def __init__(self, fh, zstd):
431 super(_ZstdCompressedStreamReader, self).__init__(fh)
431 super(_ZstdCompressedStreamReader, self).__init__(fh)
432 self._zstd = zstd
432 self._zstd = zstd
433 self._decompobj = zstd.ZstdDecompressor().decompressobj()
433 self._decompobj = zstd.ZstdDecompressor().decompressobj()
434
434
435 def _decompress(self, chunk):
435 def _decompress(self, chunk):
436 newbuf = self._decompobj.decompress(chunk)
436 newbuf = self._decompobj.decompress(chunk)
437 if newbuf:
437 if newbuf:
438 self._pending.append(newbuf)
438 self._pending.append(newbuf)
439 try:
439 try:
440 while True:
440 while True:
441 newbuf = self._decompobj.decompress(b'')
441 newbuf = self._decompobj.decompress(b'')
442 if newbuf:
442 if newbuf:
443 self._pending.append(newbuf)
443 self._pending.append(newbuf)
444 else:
444 else:
445 break
445 break
446 except self._zstd.ZstdError:
446 except self._zstd.ZstdError:
447 self._eof = True
447 self._eof = True
448
448
449
449
450 class _zlibengine(compressionengine):
450 class _zlibengine(compressionengine):
451 def name(self):
451 def name(self):
452 return b'zlib'
452 return b'zlib'
453
453
454 def bundletype(self):
454 def bundletype(self):
455 """zlib compression using the DEFLATE algorithm.
455 """zlib compression using the DEFLATE algorithm.
456
456
457 All Mercurial clients should support this format. The compression
457 All Mercurial clients should support this format. The compression
458 algorithm strikes a reasonable balance between compression ratio
458 algorithm strikes a reasonable balance between compression ratio
459 and size.
459 and size.
460 """
460 """
461 return b'gzip', b'GZ'
461 return b'gzip', b'GZ'
462
462
463 def wireprotosupport(self):
463 def wireprotosupport(self):
464 return compewireprotosupport(b'zlib', 20, 20)
464 return compewireprotosupport(b'zlib', 20, 20)
465
465
466 def revlogheader(self):
466 def revlogheader(self):
467 return b'x'
467 return b'x'
468
468
469 def compressstream(self, it, opts=None):
469 def compressstream(self, it, opts=None):
470 opts = opts or {}
470 opts = opts or {}
471
471
472 z = zlib.compressobj(opts.get(b'level', -1))
472 z = zlib.compressobj(opts.get(b'level', -1))
473 for chunk in it:
473 for chunk in it:
474 data = z.compress(chunk)
474 data = z.compress(chunk)
475 # Not all calls to compress emit data. It is cheaper to inspect
475 # Not all calls to compress emit data. It is cheaper to inspect
476 # here than to feed empty chunks through generator.
476 # here than to feed empty chunks through generator.
477 if data:
477 if data:
478 yield data
478 yield data
479
479
480 yield z.flush()
480 yield z.flush()
481
481
482 def decompressorreader(self, fh):
482 def decompressorreader(self, fh):
483 return _GzipCompressedStreamReader(fh)
483 return _GzipCompressedStreamReader(fh)
484
484
485 class zlibrevlogcompressor:
485 class zlibrevlogcompressor:
486 def __init__(self, level=None):
486 def __init__(self, level=None):
487 self._level = level
487 self._level = level
488
488
489 def compress(self, data):
489 def compress(self, data):
490 insize = len(data)
490 insize = len(data)
491 # Caller handles empty input case.
491 # Caller handles empty input case.
492 assert insize > 0
492 assert insize > 0
493
493
494 if insize < 44:
494 if insize < 44:
495 return None
495 return None
496
496
497 elif insize <= 1000000:
497 elif insize <= 1000000:
498 if self._level is None:
498 if self._level is None:
499 compressed = zlib.compress(data)
499 compressed = zlib.compress(data)
500 else:
500 else:
501 compressed = zlib.compress(data, self._level)
501 compressed = zlib.compress(data, self._level)
502 if len(compressed) < insize:
502 if len(compressed) < insize:
503 return compressed
503 return compressed
504 return None
504 return None
505
505
506 # zlib makes an internal copy of the input buffer, doubling
506 # zlib makes an internal copy of the input buffer, doubling
507 # memory usage for large inputs. So do streaming compression
507 # memory usage for large inputs. So do streaming compression
508 # on large inputs.
508 # on large inputs.
509 else:
509 else:
510 if self._level is None:
510 if self._level is None:
511 z = zlib.compressobj()
511 z = zlib.compressobj()
512 else:
512 else:
513 z = zlib.compressobj(level=self._level)
513 z = zlib.compressobj(level=self._level)
514 parts = []
514 parts = []
515 pos = 0
515 pos = 0
516 while pos < insize:
516 while pos < insize:
517 pos2 = pos + 2 ** 20
517 pos2 = pos + 2 ** 20
518 parts.append(z.compress(data[pos:pos2]))
518 parts.append(z.compress(data[pos:pos2]))
519 pos = pos2
519 pos = pos2
520 parts.append(z.flush())
520 parts.append(z.flush())
521
521
522 if sum(map(len, parts)) < insize:
522 if sum(map(len, parts)) < insize:
523 return b''.join(parts)
523 return b''.join(parts)
524 return None
524 return None
525
525
526 def decompress(self, data):
526 def decompress(self, data):
527 try:
527 try:
528 return zlib.decompress(data)
528 return zlib.decompress(data)
529 except zlib.error as e:
529 except zlib.error as e:
530 raise error.StorageError(
530 raise error.StorageError(
531 _(b'revlog decompress error: %s')
531 _(b'revlog decompress error: %s')
532 % stringutil.forcebytestr(e)
532 % stringutil.forcebytestr(e)
533 )
533 )
534
534
535 def revlogcompressor(self, opts=None):
535 def revlogcompressor(self, opts=None):
536 level = None
536 level = None
537 if opts is not None:
537 if opts is not None:
538 level = opts.get(b'zlib.level')
538 level = opts.get(b'zlib.level')
539 return self.zlibrevlogcompressor(level)
539 return self.zlibrevlogcompressor(level)
540
540
541
541
542 compengines.register(_zlibengine())
542 compengines.register(_zlibengine())
543
543
544
544
545 class _bz2engine(compressionengine):
545 class _bz2engine(compressionengine):
546 def name(self):
546 def name(self):
547 return b'bz2'
547 return b'bz2'
548
548
549 def bundletype(self):
549 def bundletype(self):
550 """An algorithm that produces smaller bundles than ``gzip``.
550 """An algorithm that produces smaller bundles than ``gzip``.
551
551
552 All Mercurial clients should support this format.
552 All Mercurial clients should support this format.
553
553
554 This engine will likely produce smaller bundles than ``gzip`` but
554 This engine will likely produce smaller bundles than ``gzip`` but
555 will be significantly slower, both during compression and
555 will be significantly slower, both during compression and
556 decompression.
556 decompression.
557
557
558 If available, the ``zstd`` engine can yield similar or better
558 If available, the ``zstd`` engine can yield similar or better
559 compression at much higher speeds.
559 compression at much higher speeds.
560 """
560 """
561 return b'bzip2', b'BZ'
561 return b'bzip2', b'BZ'
562
562
563 # We declare a protocol name but don't advertise by default because
563 # We declare a protocol name but don't advertise by default because
564 # it is slow.
564 # it is slow.
565 def wireprotosupport(self):
565 def wireprotosupport(self):
566 return compewireprotosupport(b'bzip2', 0, 0)
566 return compewireprotosupport(b'bzip2', 0, 0)
567
567
568 def compressstream(self, it, opts=None):
568 def compressstream(self, it, opts=None):
569 opts = opts or {}
569 opts = opts or {}
570 z = bz2.BZ2Compressor(opts.get(b'level', 9))
570 z = bz2.BZ2Compressor(opts.get(b'level', 9))
571 for chunk in it:
571 for chunk in it:
572 data = z.compress(chunk)
572 data = z.compress(chunk)
573 if data:
573 if data:
574 yield data
574 yield data
575
575
576 yield z.flush()
576 yield z.flush()
577
577
578 def decompressorreader(self, fh):
578 def decompressorreader(self, fh):
579 return _BZ2CompressedStreamReader(fh)
579 return _BZ2CompressedStreamReader(fh)
580
580
581
581
582 compengines.register(_bz2engine())
582 compengines.register(_bz2engine())
583
583
584
584
585 class _truncatedbz2engine(compressionengine):
585 class _truncatedbz2engine(compressionengine):
586 def name(self):
586 def name(self):
587 return b'bz2truncated'
587 return b'bz2truncated'
588
588
589 def bundletype(self):
589 def bundletype(self):
590 return None, b'_truncatedBZ'
590 return None, b'_truncatedBZ'
591
591
592 # We don't implement compressstream because it is hackily handled elsewhere.
592 # We don't implement compressstream because it is hackily handled elsewhere.
593
593
594 def decompressorreader(self, fh):
594 def decompressorreader(self, fh):
595 return _TruncatedBZ2CompressedStreamReader(fh)
595 return _TruncatedBZ2CompressedStreamReader(fh)
596
596
597
597
598 compengines.register(_truncatedbz2engine())
598 compengines.register(_truncatedbz2engine())
599
599
600
600
601 class _noopengine(compressionengine):
601 class _noopengine(compressionengine):
602 def name(self):
602 def name(self):
603 return b'none'
603 return b'none'
604
604
605 def bundletype(self):
605 def bundletype(self):
606 """No compression is performed.
606 """No compression is performed.
607
607
608 Use this compression engine to explicitly disable compression.
608 Use this compression engine to explicitly disable compression.
609 """
609 """
610 return b'none', b'UN'
610 return b'none', b'UN'
611
611
612 # Clients always support uncompressed payloads. Servers don't because
612 # Clients always support uncompressed payloads. Servers don't because
613 # unless you are on a fast network, uncompressed payloads can easily
613 # unless you are on a fast network, uncompressed payloads can easily
614 # saturate your network pipe.
614 # saturate your network pipe.
615 def wireprotosupport(self):
615 def wireprotosupport(self):
616 return compewireprotosupport(b'none', 0, 10)
616 return compewireprotosupport(b'none', 0, 10)
617
617
618 # revlog special cases the uncompressed case, but implementing
618 # revlog special cases the uncompressed case, but implementing
619 # revlogheader allows forcing uncompressed storage.
619 # revlogheader allows forcing uncompressed storage.
620 def revlogheader(self):
620 def revlogheader(self):
621 return b'\0'
621 return b'\0'
622
622
623 def compressstream(self, it, opts=None):
623 def compressstream(self, it, opts=None):
624 return it
624 return it
625
625
626 def decompressorreader(self, fh):
626 def decompressorreader(self, fh):
627 return fh
627 return fh
628
628
629 class nooprevlogcompressor:
629 class nooprevlogcompressor:
630 def compress(self, data):
630 def compress(self, data):
631 return None
631 return None
632
632
633 def revlogcompressor(self, opts=None):
633 def revlogcompressor(self, opts=None):
634 return self.nooprevlogcompressor()
634 return self.nooprevlogcompressor()
635
635
636
636
637 compengines.register(_noopengine())
637 compengines.register(_noopengine())
638
638
639
639
640 class _zstdengine(compressionengine):
640 class _zstdengine(compressionengine):
641 def name(self):
641 def name(self):
642 return b'zstd'
642 return b'zstd'
643
643
644 @propertycache
644 @propertycache
645 def _module(self):
645 def _module(self):
646 # Not all installs have the zstd module available. So defer importing
646 # Not all installs have the zstd module available. So defer importing
647 # until first access.
647 # until first access.
648 try:
648 try:
649 from .. import zstd # pytype: disable=import-error
649 from .. import zstd # pytype: disable=import-error
650
650
651 # Force delayed import.
651 # Force delayed import.
652 zstd.__version__
652 zstd.__version__
653 return zstd
653 return zstd
654 except ImportError:
654 except ImportError:
655 return None
655 return None
656
656
657 def available(self):
657 def available(self):
658 return bool(self._module)
658 return bool(self._module)
659
659
660 def bundletype(self):
660 def bundletype(self):
661 """A modern compression algorithm that is fast and highly flexible.
661 """A modern compression algorithm that is fast and highly flexible.
662
662
663 Only supported by Mercurial 4.1 and newer clients.
663 Only supported by Mercurial 4.1 and newer clients.
664
664
665 With the default settings, zstd compression is both faster and yields
665 With the default settings, zstd compression is both faster and yields
666 better compression than ``gzip``. It also frequently yields better
666 better compression than ``gzip``. It also frequently yields better
667 compression than ``bzip2`` while operating at much higher speeds.
667 compression than ``bzip2`` while operating at much higher speeds.
668
668
669 If this engine is available and backwards compatibility is not a
669 If this engine is available and backwards compatibility is not a
670 concern, it is likely the best available engine.
670 concern, it is likely the best available engine.
671 """
671 """
672 return b'zstd', b'ZS'
672 return b'zstd', b'ZS'
673
673
674 def wireprotosupport(self):
674 def wireprotosupport(self):
675 return compewireprotosupport(b'zstd', 50, 50)
675 return compewireprotosupport(b'zstd', 50, 50)
676
676
677 def revlogheader(self):
677 def revlogheader(self):
678 return b'\x28'
678 return b'\x28'
679
679
680 def compressstream(self, it, opts=None):
680 def compressstream(self, it, opts=None):
681 opts = opts or {}
681 opts = opts or {}
682 # zstd level 3 is almost always significantly faster than zlib
682 # zstd level 3 is almost always significantly faster than zlib
683 # while providing no worse compression. It strikes a good balance
683 # while providing no worse compression. It strikes a good balance
684 # between speed and compression.
684 # between speed and compression.
685 level = opts.get(b'level', 3)
685 level = opts.get(b'level', 3)
686 # default to single-threaded compression
686 # default to single-threaded compression
687 threads = opts.get(b'threads', 0)
687 threads = opts.get(b'threads', 0)
688
688
689 zstd = self._module
689 zstd = self._module
690 z = zstd.ZstdCompressor(level=level, threads=threads).compressobj()
690 z = zstd.ZstdCompressor(level=level, threads=threads).compressobj()
691 for chunk in it:
691 for chunk in it:
692 data = z.compress(chunk)
692 data = z.compress(chunk)
693 if data:
693 if data:
694 yield data
694 yield data
695
695
696 yield z.flush()
696 yield z.flush()
697
697
698 def decompressorreader(self, fh):
698 def decompressorreader(self, fh):
699 return _ZstdCompressedStreamReader(fh, self._module)
699 return _ZstdCompressedStreamReader(fh, self._module)
700
700
701 class zstdrevlogcompressor:
701 class zstdrevlogcompressor:
702 def __init__(self, zstd, level=3):
702 def __init__(self, zstd, level=3):
703 # TODO consider omitting frame magic to save 4 bytes.
703 # TODO consider omitting frame magic to save 4 bytes.
704 # This writes content sizes into the frame header. That is
704 # This writes content sizes into the frame header. That is
705 # extra storage. But it allows a correct size memory allocation
705 # extra storage. But it allows a correct size memory allocation
706 # to hold the result.
706 # to hold the result.
707 self._cctx = zstd.ZstdCompressor(level=level)
707 self._cctx = zstd.ZstdCompressor(level=level)
708 self._dctx = zstd.ZstdDecompressor()
708 self._dctx = zstd.ZstdDecompressor()
709 self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
709 self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
710 self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE
710 self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE
711
711
712 def compress(self, data):
712 def compress(self, data):
713 insize = len(data)
713 insize = len(data)
714 # Caller handles empty input case.
714 # Caller handles empty input case.
715 assert insize > 0
715 assert insize > 0
716
716
717 if insize < 50:
717 if insize < 50:
718 return None
718 return None
719
719
720 elif insize <= 1000000:
720 elif insize <= 1000000:
721 compressed = self._cctx.compress(data)
721 compressed = self._cctx.compress(data)
722 if len(compressed) < insize:
722 if len(compressed) < insize:
723 return compressed
723 return compressed
724 return None
724 return None
725 else:
725 else:
726 z = self._cctx.compressobj()
726 z = self._cctx.compressobj()
727 chunks = []
727 chunks = []
728 pos = 0
728 pos = 0
729 while pos < insize:
729 while pos < insize:
730 pos2 = pos + self._compinsize
730 pos2 = pos + self._compinsize
731 chunk = z.compress(data[pos:pos2])
731 chunk = z.compress(data[pos:pos2])
732 if chunk:
732 if chunk:
733 chunks.append(chunk)
733 chunks.append(chunk)
734 pos = pos2
734 pos = pos2
735 chunks.append(z.flush())
735 chunks.append(z.flush())
736
736
737 if sum(map(len, chunks)) < insize:
737 if sum(map(len, chunks)) < insize:
738 return b''.join(chunks)
738 return b''.join(chunks)
739 return None
739 return None
740
740
741 def decompress(self, data):
741 def decompress(self, data):
742 insize = len(data)
742 insize = len(data)
743
743
744 try:
744 try:
745 # This was measured to be faster than other streaming
745 # This was measured to be faster than other streaming
746 # decompressors.
746 # decompressors.
747 dobj = self._dctx.decompressobj()
747 dobj = self._dctx.decompressobj()
748 chunks = []
748 chunks = []
749 pos = 0
749 pos = 0
750 while pos < insize:
750 while pos < insize:
751 pos2 = pos + self._decompinsize
751 pos2 = pos + self._decompinsize
752 chunk = dobj.decompress(data[pos:pos2])
752 chunk = dobj.decompress(data[pos:pos2])
753 if chunk:
753 if chunk:
754 chunks.append(chunk)
754 chunks.append(chunk)
755 pos = pos2
755 pos = pos2
756 # Frame should be exhausted, so no finish() API.
756 # Frame should be exhausted, so no finish() API.
757
757
758 return b''.join(chunks)
758 return b''.join(chunks)
759 except Exception as e:
759 except Exception as e:
760 raise error.StorageError(
760 raise error.StorageError(
761 _(b'revlog decompress error: %s')
761 _(b'revlog decompress error: %s')
762 % stringutil.forcebytestr(e)
762 % stringutil.forcebytestr(e)
763 )
763 )
764
764
765 def revlogcompressor(self, opts=None):
765 def revlogcompressor(self, opts=None):
766 opts = opts or {}
766 opts = opts or {}
767 level = opts.get(b'zstd.level')
767 level = opts.get(b'zstd.level')
768 if level is None:
768 if level is None:
769 level = opts.get(b'level')
769 level = opts.get(b'level')
770 if level is None:
770 if level is None:
771 level = 3
771 level = 3
772 return self.zstdrevlogcompressor(self._module, level=level)
772 return self.zstdrevlogcompressor(self._module, level=level)
773
773
774
774
775 compengines.register(_zstdengine())
775 compengines.register(_zstdengine())
776
776
777
777
778 def bundlecompressiontopics():
778 def bundlecompressiontopics():
779 """Obtains a list of available bundle compressions for use in help."""
779 """Obtains a list of available bundle compressions for use in help."""
780 # help.makeitemsdocs() expects a dict of names to items with a .__doc__.
780 # help.makeitemsdocs() expects a dict of names to items with a .__doc__.
781 items = {}
781 items = {}
782
782
783 # We need to format the docstring. So use a dummy object/type to hold it
783 # We need to format the docstring. So use a dummy object/type to hold it
784 # rather than mutating the original.
784 # rather than mutating the original.
785 class docobject:
785 class docobject:
786 pass
786 pass
787
787
788 for name in compengines:
788 for name in compengines:
789 engine = compengines[name]
789 engine = compengines[name]
790
790
791 if not engine.available():
791 if not engine.available():
792 continue
792 continue
793
793
794 bt = engine.bundletype()
794 bt = engine.bundletype()
795 if not bt or not bt[0]:
795 if not bt or not bt[0]:
796 continue
796 continue
797
797
798 doc = b'``%s``\n %s' % (bt[0], pycompat.getdoc(engine.bundletype))
798 doc = b'``%s``\n %s' % (bt[0], pycompat.getdoc(engine.bundletype))
799
799
800 value = docobject()
800 value = docobject()
801 value.__doc__ = pycompat.sysstr(doc)
801 value.__doc__ = pycompat.sysstr(doc)
802 value._origdoc = engine.bundletype.__doc__
802 value._origdoc = engine.bundletype.__doc__
803 value._origfunc = engine.bundletype
803 value._origfunc = engine.bundletype
804
804
805 items[bt[0]] = value
805 items[bt[0]] = value
806
806
807 return items
807 return items
808
808
809
809
810 i18nfunctions = bundlecompressiontopics().values()
810 i18nfunctions = bundlecompressiontopics().values()
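For context on how those priority attributes get consumed: supportedwireengines() sorts the registered engines by the role-specific priority, highest first, breaking ties alphabetically by name. A self-contained sketch of that ordering, operating directly on wireprotosupport-style tuples carrying the same priority values the zlib, zstd and none engines register above:

    import collections

    support = collections.namedtuple(
        'compenginewireprotosupport', ('name', 'serverpriority', 'clientpriority')
    )

    # Values mirror compewireprotosupport(b'zlib', 20, 20), (b'zstd', 50, 50)
    # and (b'none', 0, 10) from the listing above.
    candidates = [
        support(b'zlib', 20, 20),
        support(b'zstd', 50, 50),
        support(b'none', 0, 10),
    ]

    attr = 'serverpriority'  # native str, as of this revision

    def getkey(w):
        # Highest priority first; alphabetical on ties -- same key as getkey() above.
        return -1 * getattr(w, attr), w.name

    print([w.name for w in sorted(candidates, key=getkey)])
    # [b'zstd', b'zlib', b'none']

The real method builds the candidate list from engine.wireprotosupport() and filters out unavailable engines first; the sketch only reproduces the sort key that the changed attribute name feeds into.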
@@ -1,449 +1,447 b''
1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
2 #
2 #
3 # This software may be used and distributed according to the terms of the
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
4 # GNU General Public License version 2 or any later version.
5
5
6
6
7 from .node import (
7 from .node import (
8 bin,
8 bin,
9 hex,
9 hex,
10 )
10 )
11 from .i18n import _
11 from .i18n import _
12 from .pycompat import getattr
12 from .pycompat import getattr
13 from .thirdparty import attr
13 from .thirdparty import attr
14 from . import (
14 from . import (
15 error,
15 error,
16 util,
16 util,
17 )
17 )
18 from .interfaces import util as interfaceutil
18 from .interfaces import util as interfaceutil
19 from .utils import compression
19 from .utils import compression
20
20
21 # Names of the SSH protocol implementations.
21 # Names of the SSH protocol implementations.
22 SSHV1 = b'ssh-v1'
22 SSHV1 = b'ssh-v1'
23
23
24 NARROWCAP = b'exp-narrow-1'
24 NARROWCAP = b'exp-narrow-1'
25 ELLIPSESCAP1 = b'exp-ellipses-1'
25 ELLIPSESCAP1 = b'exp-ellipses-1'
26 ELLIPSESCAP = b'exp-ellipses-2'
26 ELLIPSESCAP = b'exp-ellipses-2'
27 SUPPORTED_ELLIPSESCAP = (ELLIPSESCAP1, ELLIPSESCAP)
27 SUPPORTED_ELLIPSESCAP = (ELLIPSESCAP1, ELLIPSESCAP)
28
28
29 # All available wire protocol transports.
29 # All available wire protocol transports.
30 TRANSPORTS = {
30 TRANSPORTS = {
31 SSHV1: {
31 SSHV1: {
32 b'transport': b'ssh',
32 b'transport': b'ssh',
33 b'version': 1,
33 b'version': 1,
34 },
34 },
35 b'http-v1': {
35 b'http-v1': {
36 b'transport': b'http',
36 b'transport': b'http',
37 b'version': 1,
37 b'version': 1,
38 },
38 },
39 }
39 }
40
40
41
41
42 class bytesresponse:
42 class bytesresponse:
43 """A wire protocol response consisting of raw bytes."""
43 """A wire protocol response consisting of raw bytes."""
44
44
45 def __init__(self, data):
45 def __init__(self, data):
46 self.data = data
46 self.data = data
47
47
48
48
49 class ooberror:
49 class ooberror:
50 """wireproto reply: failure of a batch of operation
50 """wireproto reply: failure of a batch of operation
51
51
52 Something failed during a batch call. The error message is stored in
52 Something failed during a batch call. The error message is stored in
53 `self.message`.
53 `self.message`.
54 """
54 """
55
55
56 def __init__(self, message):
56 def __init__(self, message):
57 self.message = message
57 self.message = message
58
58
59
59
60 class pushres:
60 class pushres:
61 """wireproto reply: success with simple integer return
61 """wireproto reply: success with simple integer return
62
62
63 The call was successful and returned an integer contained in `self.res`.
63 The call was successful and returned an integer contained in `self.res`.
64 """
64 """
65
65
66 def __init__(self, res, output):
66 def __init__(self, res, output):
67 self.res = res
67 self.res = res
68 self.output = output
68 self.output = output
69
69
70
70
71 class pusherr:
71 class pusherr:
72 """wireproto reply: failure
72 """wireproto reply: failure
73
73
74 The call failed. The `self.res` attribute contains the error message.
74 The call failed. The `self.res` attribute contains the error message.
75 """
75 """
76
76
77 def __init__(self, res, output):
77 def __init__(self, res, output):
78 self.res = res
78 self.res = res
79 self.output = output
79 self.output = output
80
80
81
81
82 class streamres:
82 class streamres:
83 """wireproto reply: binary stream
83 """wireproto reply: binary stream
84
84
85 The call was successful and the result is a stream.
85 The call was successful and the result is a stream.
86
86
87 Accepts a generator containing chunks of data to be sent to the client.
87 Accepts a generator containing chunks of data to be sent to the client.
88
88
89 ``prefer_uncompressed`` indicates that the data is expected to be
89 ``prefer_uncompressed`` indicates that the data is expected to be
90 incompressible and that the stream should therefore use the ``none``
90 incompressible and that the stream should therefore use the ``none``
91 engine.
91 engine.
92 """
92 """
93
93
94 def __init__(self, gen=None, prefer_uncompressed=False):
94 def __init__(self, gen=None, prefer_uncompressed=False):
95 self.gen = gen
95 self.gen = gen
96 self.prefer_uncompressed = prefer_uncompressed
96 self.prefer_uncompressed = prefer_uncompressed
97
97
98
98
99 class streamreslegacy:
99 class streamreslegacy:
100 """wireproto reply: uncompressed binary stream
100 """wireproto reply: uncompressed binary stream
101
101
102 The call was successful and the result is a stream.
102 The call was successful and the result is a stream.
103
103
104 Accepts a generator containing chunks of data to be sent to the client.
104 Accepts a generator containing chunks of data to be sent to the client.
105
105
106 Like ``streamres``, but sends uncompressed data for "version 1" clients
106 Like ``streamres``, but sends uncompressed data for "version 1" clients
107 using the application/mercurial-0.1 media type.
107 using the application/mercurial-0.1 media type.
108 """
108 """
109
109
110 def __init__(self, gen=None):
110 def __init__(self, gen=None):
111 self.gen = gen
111 self.gen = gen
112
112
113
113
114 # list of nodes encoding / decoding
114 # list of nodes encoding / decoding
115 def decodelist(l, sep=b' '):
115 def decodelist(l, sep=b' '):
116 if l:
116 if l:
117 return [bin(v) for v in l.split(sep)]
117 return [bin(v) for v in l.split(sep)]
118 return []
118 return []
119
119
120
120
121 def encodelist(l, sep=b' '):
121 def encodelist(l, sep=b' '):
122 try:
122 try:
123 return sep.join(map(hex, l))
123 return sep.join(map(hex, l))
124 except TypeError:
124 except TypeError:
125 raise
125 raise
126
126
127
127
128 # batched call argument encoding
128 # batched call argument encoding
129
129
130
130
131 def escapebatcharg(plain):
131 def escapebatcharg(plain):
132 return (
132 return (
133 plain.replace(b':', b':c')
133 plain.replace(b':', b':c')
134 .replace(b',', b':o')
134 .replace(b',', b':o')
135 .replace(b';', b':s')
135 .replace(b';', b':s')
136 .replace(b'=', b':e')
136 .replace(b'=', b':e')
137 )
137 )
138
138
139
139
140 def unescapebatcharg(escaped):
140 def unescapebatcharg(escaped):
141 return (
141 return (
142 escaped.replace(b':e', b'=')
142 escaped.replace(b':e', b'=')
143 .replace(b':s', b';')
143 .replace(b':s', b';')
144 .replace(b':o', b',')
144 .replace(b':o', b',')
145 .replace(b':c', b':')
145 .replace(b':c', b':')
146 )
146 )
147
147
148
148
149 # mapping of options accepted by getbundle and their types
149 # mapping of options accepted by getbundle and their types
150 #
150 #
151 # Meant to be extended by extensions. It is the extension's responsibility to
151 # Meant to be extended by extensions. It is the extension's responsibility to
152 # ensure such options are properly processed in exchange.getbundle.
152 # ensure such options are properly processed in exchange.getbundle.
153 #
153 #
154 # supported types are:
154 # supported types are:
155 #
155 #
156 # :nodes: list of binary nodes, transmitted as space-separated hex nodes
156 # :nodes: list of binary nodes, transmitted as space-separated hex nodes
157 # :csv: list of values, transmitted as comma-separated values
157 # :csv: list of values, transmitted as comma-separated values
158 # :scsv: set of values, transmitted as comma-separated values
158 # :scsv: set of values, transmitted as comma-separated values
159 # :plain: string with no transformation needed.
159 # :plain: string with no transformation needed.
160 GETBUNDLE_ARGUMENTS = {
160 GETBUNDLE_ARGUMENTS = {
161 b'heads': b'nodes',
161 b'heads': b'nodes',
162 b'bookmarks': b'boolean',
162 b'bookmarks': b'boolean',
163 b'common': b'nodes',
163 b'common': b'nodes',
164 b'obsmarkers': b'boolean',
164 b'obsmarkers': b'boolean',
165 b'phases': b'boolean',
165 b'phases': b'boolean',
166 b'bundlecaps': b'scsv',
166 b'bundlecaps': b'scsv',
167 b'listkeys': b'csv',
167 b'listkeys': b'csv',
168 b'cg': b'boolean',
168 b'cg': b'boolean',
169 b'cbattempted': b'boolean',
169 b'cbattempted': b'boolean',
170 b'stream': b'boolean',
170 b'stream': b'boolean',
171 b'includepats': b'csv',
171 b'includepats': b'csv',
172 b'excludepats': b'csv',
172 b'excludepats': b'csv',
173 }
173 }
174
174
175
175

class baseprotocolhandler(interfaceutil.Interface):
    """Abstract base class for wire protocol handlers.

    A wire protocol handler serves as an interface between protocol command
    handlers and the wire protocol transport layer. Protocol handlers provide
    methods to read command arguments, redirect stdio for the duration of
    the request, handle response types, etc.
    """

    name = interfaceutil.Attribute(
        """The name of the protocol implementation.

        Used for uniquely identifying the transport type.
        """
    )

    def getargs(args):
        """return the value for arguments in <args>

        For version 1 transports, returns a list of values in the same
        order they appear in ``args``. For version 2 transports, returns
        a dict mapping argument name to value.
        """

    def getprotocaps():
        """Returns the list of protocol-level capabilities of client

        Returns a list of capabilities as declared by the client for
        the current request (or connection for stateful protocol handlers)."""

    def getpayload():
        """Provide a generator for the raw payload.

        The caller is responsible for ensuring that the full payload is
        processed.
        """

    def mayberedirectstdio():
        """Context manager to possibly redirect stdio.

        The context manager yields a file-object like object that receives
        stdout and stderr output when the context manager is active. Or it
        yields ``None`` if no I/O redirection occurs.

        The intent of this context manager is to capture stdio output
        so it may be sent in the response. Some transports support streaming
        stdio to the client in real time. For these transports, stdio output
        won't be captured.
        """

    def client():
        """Returns a string representation of this client (as bytes)."""

    def addcapabilities(repo, caps):
        """Adds advertised capabilities specific to this protocol.

        Receives the list of capabilities collected so far.

        Returns a list of capabilities. The passed in argument can be returned.
        """

    def checkperm(perm):
        """Validate that the client has permissions to perform a request.

        The argument is the permission required to proceed. If the client
        doesn't have that permission, the exception should raise or abort
        in a protocol specific manner.
        """
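
# Illustrative sketch (not part of the original file): a minimal in-memory
# object satisfying the baseprotocolhandler contract, useful for reasoning
# about the interface. It is not registered with interfaceutil, and the
# behaviour below (in particular the getargs() argument format) is an
# assumption made for illustration, not a real transport.
import contextlib


class _dummyprotocolhandler:
    name = b'dummy'

    def __init__(self, args=None):
        self._args = args or {}

    def getargs(self, args):
        # Version 1 style: values in the same order as the requested names.
        return [self._args.get(a, b'') for a in args.split()]

    def getprotocaps(self):
        return set()

    def getpayload(self):
        yield b''

    @contextlib.contextmanager
    def mayberedirectstdio(self):
        # This sketch never captures stdio, so it yields None.
        yield None

    def client(self):
        return b'dummy-client'

    def addcapabilities(self, repo, caps):
        return caps

    def checkperm(self, perm):
        # A real handler would abort in a protocol specific manner.
        if perm not in (b'pull', b'push'):
            raise ValueError(b'unknown permission')
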

class commandentry:
    """Represents a declared wire protocol command."""

    def __init__(
        self,
        func,
        args=b'',
        transports=None,
        permission=b'push',
        cachekeyfn=None,
        extracapabilitiesfn=None,
    ):
        self.func = func
        self.args = args
        self.transports = transports or set()
        self.permission = permission
        self.cachekeyfn = cachekeyfn
        self.extracapabilitiesfn = extracapabilitiesfn

    def _merge(self, func, args):
        """Merge this instance with an incoming 2-tuple.

        This is called when a caller using the old 2-tuple API attempts
        to replace an instance. The incoming values are merged with
        data not captured by the 2-tuple and a new instance containing
        the union of the two objects is returned.
        """
        return commandentry(
            func,
            args=args,
            transports=set(self.transports),
            permission=self.permission,
        )

    # Old code treats instances as 2-tuples. So expose that interface.
    def __iter__(self):
        yield self.func
        yield self.args

    def __getitem__(self, i):
        if i == 0:
            return self.func
        elif i == 1:
            return self.args
        else:
            raise IndexError(b'can only access elements 0 and 1')
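
# Illustrative usage sketch (not part of the original file): commandentry
# keeps the legacy 2-tuple shape alive, so old callers can still unpack or
# index it. The handler and command below are made up for illustration.
def _fakeheads(repo, proto):
    return b''


_entry = commandentry(_fakeheads, args=b'', permission=b'pull')
_func, _args = _entry          # __iter__ yields (func, args)
assert _entry[0] is _func and _entry[1] == _args
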

class commanddict(dict):
    """Container for registered wire protocol commands.

    It behaves like a dict. But __setitem__ is overridden to allow silent
    coercion of values from 2-tuples for API compatibility.
    """

    def __setitem__(self, k, v):
        if isinstance(v, commandentry):
            pass
        # Cast 2-tuples to commandentry instances.
        elif isinstance(v, tuple):
            if len(v) != 2:
                raise ValueError(b'command tuples must have exactly 2 elements')

            # It is common for extensions to wrap wire protocol commands via
            # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
            # doing this aren't aware of the new API that uses objects to store
            # command entries, we automatically merge old state with new.
            if k in self:
                v = self[k]._merge(v[0], v[1])
            else:
                # Use default values from @wireprotocommand.
                v = commandentry(
                    v[0],
                    args=v[1],
                    transports=set(TRANSPORTS),
                    permission=b'push',
                )
        else:
            raise ValueError(
                b'command entries must be commandentry instances '
                b'or 2-tuples'
            )

        return super(commanddict, self).__setitem__(k, v)

    def commandavailable(self, command, proto):
        """Determine if a command is available for the requested protocol."""
        assert proto.name in TRANSPORTS

        entry = self.get(command)

        if not entry:
            return False

        if proto.name not in entry.transports:
            return False

        return True
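
# Illustrative sketch (not part of the original file): overwriting an entry
# with a legacy 2-tuple is routed through commandentry._merge(), so metadata
# such as the required permission survives the old-style assignment. The
# command name and handlers below are made up for illustration.
def _origheads(repo, proto):
    return b''


def _wrappedheads(repo, proto):
    return _origheads(repo, proto)


_commands = commanddict()
_commands[b'heads'] = commandentry(_origheads, args=b'', permission=b'pull')
_commands[b'heads'] = (_wrappedheads, b'')   # coerced and merged
assert _commands[b'heads'].permission == b'pull'
assert _commands[b'heads'].func is _wrappedheads
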

def supportedcompengines(ui, role):
    """Obtain the list of supported compression engines for a request."""
    assert role in (compression.CLIENTROLE, compression.SERVERROLE)

    compengines = compression.compengines.supportedwireengines(role)

    # Allow config to override default list and ordering.
    if role == compression.SERVERROLE:
        configengines = ui.configlist(b'server', b'compressionengines')
        config = b'server.compressionengines'
    else:
        # This is currently implemented mainly to facilitate testing. In most
        # cases, the server should be in charge of choosing a compression
        # engine because a server has the most to lose from a sub-optimal
        # choice (e.g. CPU DoS due to an expensive engine or a network DoS
        # due to a poor compression ratio).
        configengines = ui.configlist(
            b'experimental', b'clientcompressionengines'
        )
        config = b'experimental.clientcompressionengines'

    # No explicit config. Filter out the ones that aren't supposed to be
    # advertised and return default ordering.
    if not configengines:
-        attr = (
-            b'serverpriority' if role == util.SERVERROLE else b'clientpriority'
-        )
+        attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
        return [
            e for e in compengines if getattr(e.wireprotosupport(), attr) > 0
        ]

    # If compression engines are listed in the config, assume there is a good
    # reason for it (like server operators wanting to achieve specific
    # performance characteristics). So fail fast if the config references
    # unusable compression engines.
    validnames = {e.name() for e in compengines}
    invalidnames = {e for e in configengines if e not in validnames}
    if invalidnames:
        raise error.Abort(
            _(b'invalid compression engine defined in %s: %s')
            % (config, b', '.join(sorted(invalidnames)))
        )

    compengines = [e for e in compengines if e.name() in configengines]
    compengines = sorted(
        compengines, key=lambda e: configengines.index(e.name())
    )

    if not compengines:
        raise error.Abort(
            _(
                b'%s config option does not specify any known '
                b'compression engines'
            )
            % config,
            hint=_(b'usable compression engines: %s')
            % b', '.join(sorted(validnames)),
        )

    return compengines
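
# Illustrative sketch (not part of the original file): the effect of an
# explicit server.compressionengines value on the advertised ordering. The
# engine names are placeholders; the filter/sort mirrors the logic in
# supportedcompengines() above.
_supported = [b'zstd', b'zlib', b'none']      # default server ordering
_configured = [b'zlib', b'zstd']              # e.g. from the [server] section
_filtered = [e for e in _supported if e in _configured]
_ordered = sorted(_filtered, key=lambda e: _configured.index(e))
assert _ordered == [b'zlib', b'zstd']
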

@attr.s
class encodedresponse:
    """Represents response data that is already content encoded.

    Wire protocol version 2 only.

    Commands typically emit Python objects that are encoded and sent over the
    wire. If commands emit an object of this type, the encoding step is
    bypassed and the content from this object is used instead.
    """

    data = attr.ib()


@attr.s
class alternatelocationresponse:
    """Represents a response available at an alternate location.

    Instances are sent in place of actual response objects when the server
    is sending a "content redirect" response.

    Only compatible with wire protocol version 2.
    """

    url = attr.ib()
    mediatype = attr.ib()
    size = attr.ib(default=None)
    fullhashes = attr.ib(default=None)
    fullhashseed = attr.ib(default=None)
    serverdercerts = attr.ib(default=None)
    servercadercerts = attr.ib(default=None)


@attr.s
class indefinitebytestringresponse:
    """Represents an object to be encoded to an indefinite length bytestring.

    Instances are initialized from an iterable of chunks, with each chunk
    being a bytes instance.
    """

    chunks = attr.ib()
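
# Illustrative sketch (not part of the original file): constructing the wire
# protocol version 2 response wrappers defined above. All field values are
# placeholders chosen for illustration.
_encoded = encodedresponse(data=b'\x00already-encoded-frame')
_redirect = alternatelocationresponse(
    url=b'https://cdn.example/bundle',
    mediatype=b'application/mercurial-cbor',
    size=1048576,
)
_streamed = indefinitebytestringresponse(chunks=[b'chunk1', b'chunk2'])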