##// END OF EJS Templates
ssh: avoid reading beyond the end of stream when using compression...
Joerg Sonnenberger -
r38735:27391d74 default
parent child Browse files
Show More
@@ -99,6 +99,17 b' class doublepipe(object):'
99 _forwardoutput(self._ui, self._side)
99 _forwardoutput(self._ui, self._side)
100 return r
100 return r
101
101
def unbufferedread(self, size):
    """Perform ``self._call('unbufferedread', size)`` and return its result.

    When a non-zero ``size`` was requested and the result is empty, the
    side channel is forwarded to the ui one more time before the empty
    result is returned to the caller.
    """
    data = self._call('unbufferedread', size)
    if data or size == 0:
        return data
    # We've observed a condition that indicates the stdout closed
    # unexpectedly. Check stderr one more time and snag anything that's
    # there before letting anyone know the main part of the pipe closed
    # prematurely.
    _forwardoutput(self._ui, self._side)
    return data
112
102 def readline(self):
113 def readline(self):
103 return self._call('readline')
114 return self._call('readline')
104
115
@@ -322,6 +322,11 b' class bufferedinputpipe(object):'
322 self._fillbuffer()
322 self._fillbuffer()
323 return self._frombuffer(size)
323 return self._frombuffer(size)
324
324
def unbufferedread(self, size):
    """Return at most ``size`` bytes from the buffer.

    If the buffer is empty and end-of-file has not been recorded, the
    buffer is filled once, asking for ``max(size, _chunksize)`` bytes.
    The result is whatever ``_frombuffer`` yields for the smaller of the
    buffered length and ``size``.
    """
    buffer_is_empty = self._lenbuf == 0
    if buffer_is_empty and not self._eof:
        wanted = max(size, _chunksize)
        self._fillbuffer(wanted)
    available = min(self._lenbuf, size)
    return self._frombuffer(available)
329
325 def readline(self, *args, **kwargs):
330 def readline(self, *args, **kwargs):
326 if 1 < len(self._buffer):
331 if 1 < len(self._buffer):
327 # this should not happen because both read and readline end with a
332 # this should not happen because both read and readline end with a
@@ -363,9 +368,9 b' class bufferedinputpipe(object):'
363 self._lenbuf = 0
368 self._lenbuf = 0
364 return data
369 return data
365
370
366 def _fillbuffer(self):
371 def _fillbuffer(self, size=_chunksize):
367 """read data to the buffer"""
372 """read data to the buffer"""
368 data = os.read(self._input.fileno(), _chunksize)
373 data = os.read(self._input.fileno(), size)
369 if not data:
374 if not data:
370 self._eof = True
375 self._eof = True
371 else:
376 else:
@@ -3302,6 +3307,104 b' class compressionengine(object):'
3302 """
3307 """
3303 raise NotImplementedError()
3308 raise NotImplementedError()
3304
3309
3310 class _CompressedStreamReader(object):
3311 def __init__(self, fh):
3312 if safehasattr(fh, 'unbufferedread'):
3313 self._reader = fh.unbufferedread
3314 else:
3315 self._reader = fh.read
3316 self._pending = []
3317 self._pos = 0
3318 self._eof = False
3319
3320 def _decompress(self, chunk):
3321 raise NotImplementedError()
3322
3323 def read(self, l):
3324 buf = []
3325 while True:
3326 while self._pending:
3327 if len(self._pending[0]) > l + self._pos:
3328 newbuf = self._pending[0]
3329 buf.append(newbuf[self._pos:self._pos + l])
3330 self._pos += l
3331 return ''.join(buf)
3332
3333 newbuf = self._pending.pop(0)
3334 if self._pos:
3335 buf.append(newbuf[self._pos:])
3336 l -= len(newbuf) - self._pos
3337 else:
3338 buf.append(newbuf)
3339 l -= len(newbuf)
3340 self._pos = 0
3341
3342 if self._eof:
3343 return ''.join(buf)
3344 chunk = self._reader(65536)
3345 self._decompress(chunk)
3346
class _GzipCompressedStreamReader(_CompressedStreamReader):
    """Stream reader backed by a ``zlib.decompressobj()``."""

    def __init__(self, fh):
        super(_GzipCompressedStreamReader, self).__init__(fh)
        self._decompobj = zlib.decompressobj()

    def _decompress(self, chunk):
        """Decompress ``chunk``, queue its output and probe for stream end."""
        data = self._decompobj.decompress(chunk)
        if data:
            self._pending.append(data)

        # Detect end-of-stream without disturbing the real decompressor:
        # feed one extra byte to a copy. If that byte comes back untouched
        # in unused_data, the compressed stream had already ended.
        probe = self._decompobj.copy()
        try:
            probe.decompress('x')
            probe.flush()
        except zlib.error:
            # The probe was rejected; leave the end-of-stream flag alone.
            return
        if probe.unused_data == 'x':
            self._eof = True
3363
class _BZ2CompressedStreamReader(_CompressedStreamReader):
    """Stream reader backed by a ``bz2.BZ2Decompressor``."""

    def __init__(self, fh):
        super(_BZ2CompressedStreamReader, self).__init__(fh)
        self._decompobj = bz2.BZ2Decompressor()

    def _decompress(self, chunk):
        """Decompress ``chunk`` and queue all output it makes available."""
        data = self._decompobj.decompress(chunk)
        if data:
            self._pending.append(data)
        try:
            # Keep asking for output with empty input until nothing more
            # comes back; an EOFError here marks the end of the stream.
            data = self._decompobj.decompress('')
            while data:
                self._pending.append(data)
                data = self._decompobj.decompress('')
        except EOFError:
            self._eof = True
3381
class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
    """BZ2 stream reader that primes the decompressor with ``'BZ'``."""

    def __init__(self, fh):
        super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
        # The input stream doesn't have the 'BZ' header, so feed it to the
        # decompressor before any data from the stream.
        header_output = self._decompobj.decompress('BZ')
        if header_output:
            self._pending.append(header_output)
3388
class _ZstdCompressedStreamReader(_CompressedStreamReader):
    """Stream reader backed by ``zstd.ZstdDecompressor().decompressobj()``.

    ``zstd`` is the zstd module object; it is kept so that its
    ``ZstdError`` exception type can be caught in ``_decompress``.
    """

    def __init__(self, fh, zstd):
        super(_ZstdCompressedStreamReader, self).__init__(fh)
        self._zstd = zstd
        self._decompobj = zstd.ZstdDecompressor().decompressobj()

    def _decompress(self, chunk):
        """Decompress ``chunk`` and queue all output it makes available."""
        data = self._decompobj.decompress(chunk)
        if data:
            self._pending.append(data)
        try:
            # Keep asking for output with empty input until nothing more
            # comes back; a ZstdError here is treated as end of stream.
            data = self._decompobj.decompress('')
            while data:
                self._pending.append(data)
                data = self._decompobj.decompress('')
        except self._zstd.ZstdError:
            self._eof = True
3407
3305 class _zlibengine(compressionengine):
3408 class _zlibengine(compressionengine):
3306 def name(self):
3409 def name(self):
3307 return 'zlib'
3410 return 'zlib'
@@ -3335,15 +3438,7 b' class _zlibengine(compressionengine):'
3335 yield z.flush()
3438 yield z.flush()
3336
3439
3337 def decompressorreader(self, fh):
3440 def decompressorreader(self, fh):
3338 def gen():
3441 return _GzipCompressedStreamReader(fh)
3339 d = zlib.decompressobj()
3340 for chunk in filechunkiter(fh):
3341 while chunk:
3342 # Limit output size to limit memory.
3343 yield d.decompress(chunk, 2 ** 18)
3344 chunk = d.unconsumed_tail
3345
3346 return chunkbuffer(gen())
3347
3442
3348 class zlibrevlogcompressor(object):
3443 class zlibrevlogcompressor(object):
3349 def compress(self, data):
3444 def compress(self, data):
@@ -3423,12 +3518,7 b' class _bz2engine(compressionengine):'
3423 yield z.flush()
3518 yield z.flush()
3424
3519
3425 def decompressorreader(self, fh):
3520 def decompressorreader(self, fh):
3426 def gen():
3521 return _BZ2CompressedStreamReader(fh)
3427 d = bz2.BZ2Decompressor()
3428 for chunk in filechunkiter(fh):
3429 yield d.decompress(chunk)
3430
3431 return chunkbuffer(gen())
3432
3522
3433 compengines.register(_bz2engine())
3523 compengines.register(_bz2engine())
3434
3524
@@ -3442,14 +3532,7 b' class _truncatedbz2engine(compressioneng'
3442 # We don't implement compressstream because it is hackily handled elsewhere.
3532 # We don't implement compressstream because it is hackily handled elsewhere.
3443
3533
3444 def decompressorreader(self, fh):
3534 def decompressorreader(self, fh):
3445 def gen():
3535 return _TruncatedBZ2CompressedStreamReader(fh)
3446 # The input stream doesn't have the 'BZ' header. So add it back.
3447 d = bz2.BZ2Decompressor()
3448 d.decompress('BZ')
3449 for chunk in filechunkiter(fh):
3450 yield d.decompress(chunk)
3451
3452 return chunkbuffer(gen())
3453
3536
3454 compengines.register(_truncatedbz2engine())
3537 compengines.register(_truncatedbz2engine())
3455
3538
@@ -3544,9 +3627,7 b' class _zstdengine(compressionengine):'
3544 yield z.flush()
3627 yield z.flush()
3545
3628
3546 def decompressorreader(self, fh):
3629 def decompressorreader(self, fh):
3547 zstd = self._module
3630 return _ZstdCompressedStreamReader(fh, self._module)
3548 dctx = zstd.ZstdDecompressor()
3549 return chunkbuffer(dctx.read_from(fh))
3550
3631
3551 class zstdrevlogcompressor(object):
3632 class zstdrevlogcompressor(object):
3552 def __init__(self, zstd, level=3):
3633 def __init__(self, zstd, level=3):
General Comments 0
You need to be logged in to leave comments. Login now