##// END OF EJS Templates
cborutil: remove readindefinitebytestringtoiter()...
Gregory Szorc -
r39449:a40d3da8 default
parent child Browse files
Show More
@@ -10,10 +10,6 b' from __future__ import absolute_import'
10 import struct
10 import struct
11 import sys
11 import sys
12
12
13 from ..thirdparty.cbor.cbor2 import (
14 decoder as decodermod,
15 )
16
17 # Very short very of RFC 7049...
13 # Very short very of RFC 7049...
18 #
14 #
19 # Each item begins with a byte. The 3 high bits of that byte denote the
15 # Each item begins with a byte. The 3 high bits of that byte denote the
@@ -219,54 +215,6 b' def streamencode(v):'
219
215
220 return fn(v)
216 return fn(v)
221
217
222 def readindefinitebytestringtoiter(fh, expectheader=True):
223 """Read an indefinite bytestring to a generator.
224
225 Receives an object with a ``read(X)`` method to read N bytes.
226
227 If ``expectheader`` is True, it is expected that the first byte read
228 will represent an indefinite length bytestring. Otherwise, we
229 expect the first byte to be part of the first bytestring chunk.
230 """
231 read = fh.read
232 decodeuint = decodermod.decode_uint
233 byteasinteger = decodermod.byte_as_integer
234
235 if expectheader:
236 initial = decodermod.byte_as_integer(read(1))
237
238 majortype = initial >> 5
239 subtype = initial & SUBTYPE_MASK
240
241 if majortype != MAJOR_TYPE_BYTESTRING:
242 raise decodermod.CBORDecodeError(
243 'expected major type %d; got %d' % (MAJOR_TYPE_BYTESTRING,
244 majortype))
245
246 if subtype != SUBTYPE_INDEFINITE:
247 raise decodermod.CBORDecodeError(
248 'expected indefinite subtype; got %d' % subtype)
249
250 # The indefinite bytestring is composed of chunks of normal bytestrings.
251 # Read chunks until we hit a BREAK byte.
252
253 while True:
254 # We need to sniff for the BREAK byte.
255 initial = byteasinteger(read(1))
256
257 if initial == BREAK_INT:
258 break
259
260 length = decodeuint(fh, initial & SUBTYPE_MASK)
261 chunk = read(length)
262
263 if len(chunk) != length:
264 raise decodermod.CBORDecodeError(
265 'failed to read bytestring chunk: got %d bytes; expected %d' % (
266 len(chunk), length))
267
268 yield chunk
269
270 class CBORDecodeError(Exception):
218 class CBORDecodeError(Exception):
271 """Represents an error decoding CBOR."""
219 """Represents an error decoding CBOR."""
272
220
@@ -1,6 +1,5 b''
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import io
4 import unittest
3 import unittest
5
4
6 from mercurial.thirdparty import (
5 from mercurial.thirdparty import (
@@ -118,16 +117,6 b' class BytestringTests(TestCase):'
118 self.assertTrue(b[0].isfirst)
117 self.assertTrue(b[0].isfirst)
119 self.assertTrue(b[0].islast)
118 self.assertTrue(b[0].islast)
120
119
121 def testreadtoiter(self):
122 source = io.BytesIO(b'\x5f\x44\xaa\xbb\xcc\xdd\x43\xee\xff\x99\xff')
123
124 it = cborutil.readindefinitebytestringtoiter(source)
125 self.assertEqual(next(it), b'\xaa\xbb\xcc\xdd')
126 self.assertEqual(next(it), b'\xee\xff\x99')
127
128 with self.assertRaises(StopIteration):
129 next(it)
130
131 def testdecodevariouslengths(self):
120 def testdecodevariouslengths(self):
132 for i in (0, 1, 22, 23, 24, 25, 254, 255, 256, 65534, 65535, 65536):
121 for i in (0, 1, 22, 23, 24, 25, 254, 255, 256, 65534, 65535, 65536):
133 source = b'x' * i
122 source = b'x' * i
General Comments 0
You need to be logged in to leave comments. Login now