##// END OF EJS Templates
phabricator: make user searches case-insensitive...
phabricator: make user searches case-insensitive User names in conduit are case insensitive, but when looking for "FOO" it would return "foo" instead and we'd think the user didn't exist. So lower case both the query and the response when comparing them. Differential Revision: https://phab.mercurial-scm.org/D5934

File last commit:

r41690:1ea1bba1 default
r41854:570e62f1 default
Show More
test-cbor.py
1001 lines | 39.6 KiB | text/x-python | PythonLexer
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 from __future__ import absolute_import
Augie Fackler
tests: get access to thirdparty.cbor without requiring it to be installed...
r41195 import os
import sys
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 import unittest
Augie Fackler
tests: get access to thirdparty.cbor without requiring it to be installed...
r41195 # TODO migrate to canned cbor test strings and stop using thirdparty.cbor
tpp = os.path.normpath(os.path.join(os.path.dirname(__file__),
'..', 'mercurial', 'thirdparty'))
if not os.path.exists(tpp):
# skip, not in a repo
sys.exit(80)
sys.path[0:0] = [tpp]
import cbor
del sys.path[0]
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 from mercurial.utils import (
cborutil,
)
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 class TestCase(unittest.TestCase):
if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
# Python 3.7 deprecates the regex*p* version, but 2.7 lacks
# the regex version.
assertRaisesRegex = (# camelcase-required
unittest.TestCase.assertRaisesRegexp)
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def loadit(it):
return cbor.loads(b''.join(it))
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 class BytestringTests(TestCase):
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testsimple(self):
self.assertEqual(
list(cborutil.streamencode(b'foobar')),
[b'\x46', b'foobar'])
self.assertEqual(
loadit(cborutil.streamencode(b'foobar')),
b'foobar')
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\x46foobar'),
[b'foobar'])
self.assertEqual(cborutil.decodeall(b'\x46foobar\x45fizbi'),
[b'foobar', b'fizbi'])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testlong(self):
source = b'x' * 1048576
self.assertEqual(loadit(cborutil.streamencode(source)), source)
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeall(encoded), [source])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testfromiter(self):
# This is the example from RFC 7049 Section 2.2.2.
source = [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99']
self.assertEqual(
list(cborutil.streamencodebytestringfromiter(source)),
[
b'\x5f',
b'\x44',
b'\xaa\xbb\xcc\xdd',
b'\x43',
b'\xee\xff\x99',
b'\xff',
])
self.assertEqual(
loadit(cborutil.streamencodebytestringfromiter(source)),
b''.join(source))
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\x5f\x44\xaa\xbb\xcc\xdd'
b'\x43\xee\xff\x99\xff'),
[b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99', b''])
for i, chunk in enumerate(
cborutil.decodeall(b'\x5f\x44\xaa\xbb\xcc\xdd'
b'\x43\xee\xff\x99\xff')):
self.assertIsInstance(chunk, cborutil.bytestringchunk)
if i == 0:
self.assertTrue(chunk.isfirst)
else:
self.assertFalse(chunk.isfirst)
if i == 2:
self.assertTrue(chunk.islast)
else:
self.assertFalse(chunk.islast)
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testfromiterlarge(self):
source = [b'a' * 16, b'b' * 128, b'c' * 1024, b'd' * 1048576]
self.assertEqual(
loadit(cborutil.streamencodebytestringfromiter(source)),
b''.join(source))
def testindefinite(self):
source = b'\x00\x01\x02\x03' + b'\xff' * 16384
it = cborutil.streamencodeindefinitebytestring(source, chunksize=2)
self.assertEqual(next(it), b'\x5f')
self.assertEqual(next(it), b'\x42')
self.assertEqual(next(it), b'\x00\x01')
self.assertEqual(next(it), b'\x42')
self.assertEqual(next(it), b'\x02\x03')
self.assertEqual(next(it), b'\x42')
self.assertEqual(next(it), b'\xff\xff')
dest = b''.join(cborutil.streamencodeindefinitebytestring(
source, chunksize=42))
Augie Fackler
tests: port test-cbor.py to Python 3...
r37916 self.assertEqual(cbor.loads(dest), source)
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(b''.join(cborutil.decodeall(dest)), source)
for chunk in cborutil.decodeall(dest):
self.assertIsInstance(chunk, cborutil.bytestringchunk)
self.assertIn(len(chunk), (0, 8, 42))
encoded = b'\x5f\xff'
b = cborutil.decodeall(encoded)
self.assertEqual(b, [b''])
self.assertTrue(b[0].isfirst)
self.assertTrue(b[0].islast)
def testdecodevariouslengths(self):
for i in (0, 1, 22, 23, 24, 25, 254, 255, 256, 65534, 65535, 65536):
source = b'x' * i
encoded = b''.join(cborutil.streamencode(source))
if len(source) < 24:
hlen = 1
elif len(source) < 256:
hlen = 2
elif len(source) < 65536:
hlen = 3
elif len(source) < 1048576:
hlen = 5
self.assertEqual(cborutil.decodeitem(encoded),
(True, source, hlen + len(source),
cborutil.SPECIAL_NONE))
def testpartialdecode(self):
encoded = b''.join(cborutil.streamencode(b'foobar'))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -6, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(False, None, -5, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:3]),
(False, None, -4, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:4]),
(False, None, -3, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:5]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:6]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:7]),
(True, b'foobar', 7, cborutil.SPECIAL_NONE))
def testpartialdecodevariouslengths(self):
lens = [
2,
3,
10,
23,
24,
25,
31,
100,
254,
255,
256,
257,
16384,
65534,
65535,
65536,
65537,
131071,
131072,
131073,
1048575,
1048576,
1048577,
]
for size in lens:
if size < 24:
hlen = 1
elif size < 2**8:
hlen = 2
elif size < 2**16:
hlen = 3
elif size < 2**32:
hlen = 5
else:
assert False
source = b'x' * size
encoded = b''.join(cborutil.streamencode(source))
res = cborutil.decodeitem(encoded[0:1])
if hlen > 1:
self.assertEqual(res, (False, None, -(hlen - 1),
cborutil.SPECIAL_NONE))
else:
self.assertEqual(res, (False, None, -(size + hlen - 1),
cborutil.SPECIAL_NONE))
# Decoding partial header reports remaining header size.
for i in range(hlen - 1):
self.assertEqual(cborutil.decodeitem(encoded[0:i + 1]),
(False, None, -(hlen - i - 1),
cborutil.SPECIAL_NONE))
# Decoding complete header reports item size.
self.assertEqual(cborutil.decodeitem(encoded[0:hlen]),
(False, None, -size, cborutil.SPECIAL_NONE))
# Decoding single byte after header reports item size - 1
self.assertEqual(cborutil.decodeitem(encoded[0:hlen + 1]),
(False, None, -(size - 1), cborutil.SPECIAL_NONE))
# Decoding all but the last byte reports -1 needed.
self.assertEqual(cborutil.decodeitem(encoded[0:hlen + size - 1]),
(False, None, -1, cborutil.SPECIAL_NONE))
# Decoding last byte retrieves value.
self.assertEqual(cborutil.decodeitem(encoded[0:hlen + size]),
(True, source, hlen + size, cborutil.SPECIAL_NONE))
def testindefinitepartialdecode(self):
encoded = b''.join(cborutil.streamencodebytestringfromiter(
[b'foobar', b'biz']))
# First item should be begin of bytestring special.
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(True, None, 1,
cborutil.SPECIAL_START_INDEFINITE_BYTESTRING))
# Second item should be the first chunk. But only available when
# we give it 7 bytes (1 byte header + 6 byte chunk).
self.assertEqual(cborutil.decodeitem(encoded[1:2]),
(False, None, -6, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[1:3]),
(False, None, -5, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[1:4]),
(False, None, -4, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[1:5]),
(False, None, -3, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[1:6]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[1:7]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[1:8]),
(True, b'foobar', 7, cborutil.SPECIAL_NONE))
# Third item should be second chunk. But only available when
# we give it 4 bytes (1 byte header + 3 byte chunk).
self.assertEqual(cborutil.decodeitem(encoded[8:9]),
(False, None, -3, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[8:10]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[8:11]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[8:12]),
(True, b'biz', 4, cborutil.SPECIAL_NONE))
# Fourth item should be end of indefinite stream marker.
self.assertEqual(cborutil.decodeitem(encoded[12:13]),
(True, None, 1, cborutil.SPECIAL_INDEFINITE_BREAK))
# Now test the behavior when going through the decoder.
self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:1]),
(False, 1, 0))
self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:2]),
(False, 1, 6))
self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:3]),
(False, 1, 5))
self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:4]),
(False, 1, 4))
self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:5]),
(False, 1, 3))
self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:6]),
(False, 1, 2))
self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:7]),
(False, 1, 1))
self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:8]),
(True, 8, 0))
self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:9]),
(True, 8, 3))
self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:10]),
(True, 8, 2))
self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:11]),
(True, 8, 1))
self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:12]),
(True, 12, 0))
self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:13]),
(True, 13, 0))
decoder = cborutil.sansiodecoder()
decoder.decode(encoded[0:8])
values = decoder.getavailable()
self.assertEqual(values, [b'foobar'])
self.assertTrue(values[0].isfirst)
self.assertFalse(values[0].islast)
self.assertEqual(decoder.decode(encoded[8:12]),
(True, 4, 0))
values = decoder.getavailable()
self.assertEqual(values, [b'biz'])
self.assertFalse(values[0].isfirst)
self.assertFalse(values[0].islast)
self.assertEqual(decoder.decode(encoded[12:]),
(True, 1, 0))
values = decoder.getavailable()
self.assertEqual(values, [b''])
self.assertFalse(values[0].isfirst)
self.assertTrue(values[0].islast)
class StringTests(TestCase):
def testdecodeforbidden(self):
encoded = b'\x63foo'
with self.assertRaisesRegex(cborutil.CBORDecodeError,
'string major type not supported'):
cborutil.decodeall(encoded)
class IntTests(TestCase):
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testsmall(self):
self.assertEqual(list(cborutil.streamencode(0)), [b'\x00'])
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\x00'), [0])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 self.assertEqual(list(cborutil.streamencode(1)), [b'\x01'])
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\x01'), [1])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 self.assertEqual(list(cborutil.streamencode(2)), [b'\x02'])
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\x02'), [2])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 self.assertEqual(list(cborutil.streamencode(3)), [b'\x03'])
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\x03'), [3])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 self.assertEqual(list(cborutil.streamencode(4)), [b'\x04'])
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\x04'), [4])
# Multiple value decode works.
self.assertEqual(cborutil.decodeall(b'\x00\x01\x02\x03\x04'),
[0, 1, 2, 3, 4])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729
def testnegativesmall(self):
self.assertEqual(list(cborutil.streamencode(-1)), [b'\x20'])
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\x20'), [-1])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 self.assertEqual(list(cborutil.streamencode(-2)), [b'\x21'])
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\x21'), [-2])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 self.assertEqual(list(cborutil.streamencode(-3)), [b'\x22'])
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\x22'), [-3])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 self.assertEqual(list(cborutil.streamencode(-4)), [b'\x23'])
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\x23'), [-4])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 self.assertEqual(list(cborutil.streamencode(-5)), [b'\x24'])
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\x24'), [-5])
# Multiple value decode works.
self.assertEqual(cborutil.decodeall(b'\x20\x21\x22\x23\x24'),
[-1, -2, -3, -4, -5])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729
def testrange(self):
for i in range(-70000, 70000, 10):
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 encoded = b''.join(cborutil.streamencode(i))
self.assertEqual(encoded, cbor.dumps(i))
self.assertEqual(cborutil.decodeall(encoded), [i])
def testdecodepartialubyte(self):
encoded = b''.join(cborutil.streamencode(250))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(True, 250, 2, cborutil.SPECIAL_NONE))
def testdecodepartialbyte(self):
encoded = b''.join(cborutil.streamencode(-42))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(True, -42, 2, cborutil.SPECIAL_NONE))
def testdecodepartialushort(self):
encoded = b''.join(cborutil.streamencode(2**15))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:5]),
(True, 2**15, 3, cborutil.SPECIAL_NONE))
def testdecodepartialshort(self):
encoded = b''.join(cborutil.streamencode(-1024))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:3]),
(True, -1024, 3, cborutil.SPECIAL_NONE))
def testdecodepartialulong(self):
encoded = b''.join(cborutil.streamencode(2**28))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -4, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(False, None, -3, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:3]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:4]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:5]),
(True, 2**28, 5, cborutil.SPECIAL_NONE))
def testdecodepartiallong(self):
encoded = b''.join(cborutil.streamencode(-1048580))
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -4, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(False, None, -3, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:3]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:4]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:5]),
(True, -1048580, 5, cborutil.SPECIAL_NONE))
def testdecodepartialulonglong(self):
encoded = b''.join(cborutil.streamencode(2**32))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -8, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(False, None, -7, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:3]),
(False, None, -6, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:4]),
(False, None, -5, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:5]),
(False, None, -4, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:6]),
(False, None, -3, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:7]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:8]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:9]),
(True, 2**32, 9, cborutil.SPECIAL_NONE))
with self.assertRaisesRegex(
cborutil.CBORDecodeError, 'input data not fully consumed'):
cborutil.decodeall(encoded[0:1])
with self.assertRaisesRegex(
cborutil.CBORDecodeError, 'input data not fully consumed'):
cborutil.decodeall(encoded[0:2])
def testdecodepartiallonglong(self):
encoded = b''.join(cborutil.streamencode(-7000000000))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -8, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(False, None, -7, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:3]),
(False, None, -6, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:4]),
(False, None, -5, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:5]),
(False, None, -4, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:6]),
(False, None, -3, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:7]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:8]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:9]),
(True, -7000000000, 9, cborutil.SPECIAL_NONE))
class ArrayTests(TestCase):
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testempty(self):
self.assertEqual(list(cborutil.streamencode([])), [b'\x80'])
self.assertEqual(loadit(cborutil.streamencode([])), [])
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\x80'), [[]])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testbasic(self):
source = [b'foo', b'bar', 1, -10]
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 chunks = [
b'\x84', b'\x43', b'foo', b'\x43', b'bar', b'\x01', b'\x29']
self.assertEqual(list(cborutil.streamencode(source)), chunks)
self.assertEqual(cborutil.decodeall(b''.join(chunks)), [source])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729
def testemptyfromiter(self):
self.assertEqual(b''.join(cborutil.streamencodearrayfromiter([])),
b'\x9f\xff')
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 with self.assertRaisesRegex(cborutil.CBORDecodeError,
'indefinite length uint not allowed'):
cborutil.decodeall(b'\x9f\xff')
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testfromiter1(self):
source = [b'foo']
self.assertEqual(list(cborutil.streamencodearrayfromiter(source)), [
b'\x9f',
b'\x43', b'foo',
b'\xff',
])
dest = b''.join(cborutil.streamencodearrayfromiter(source))
self.assertEqual(cbor.loads(dest), source)
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 with self.assertRaisesRegex(cborutil.CBORDecodeError,
'indefinite length uint not allowed'):
cborutil.decodeall(dest)
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testtuple(self):
source = (b'foo', None, 42)
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 encoded = b''.join(cborutil.streamencode(source))
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cbor.loads(encoded), list(source))
self.assertEqual(cborutil.decodeall(encoded), [list(source)])
def testpartialdecode(self):
source = list(range(4))
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(True, 4, 1, cborutil.SPECIAL_START_ARRAY))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(True, 4, 1, cborutil.SPECIAL_START_ARRAY))
source = list(range(23))
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(True, 23, 1, cborutil.SPECIAL_START_ARRAY))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(True, 23, 1, cborutil.SPECIAL_START_ARRAY))
source = list(range(24))
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(True, 24, 2, cborutil.SPECIAL_START_ARRAY))
self.assertEqual(cborutil.decodeitem(encoded[0:3]),
(True, 24, 2, cborutil.SPECIAL_START_ARRAY))
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 source = list(range(256))
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:3]),
(True, 256, 3, cborutil.SPECIAL_START_ARRAY))
self.assertEqual(cborutil.decodeitem(encoded[0:4]),
(True, 256, 3, cborutil.SPECIAL_START_ARRAY))
def testnested(self):
source = [[], [], [[], [], []]]
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeall(encoded), [source])
source = [True, None, [True, 0, 2], [None], [], [[[]], -87]]
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeall(encoded), [source])
# A set within an array.
source = [None, {b'foo', b'bar', None, False}, set()]
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeall(encoded), [source])
# A map within an array.
source = [None, {}, {b'foo': b'bar', True: False}, [{}]]
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeall(encoded), [source])
def testindefinitebytestringvalues(self):
# Single value array whose value is an empty indefinite bytestring.
encoded = b'\x81\x5f\x40\xff'
with self.assertRaisesRegex(cborutil.CBORDecodeError,
'indefinite length bytestrings not '
'allowed as array values'):
cborutil.decodeall(encoded)
class SetTests(TestCase):
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testempty(self):
self.assertEqual(list(cborutil.streamencode(set())), [
b'\xd9\x01\x02',
b'\x80',
])
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\xd9\x01\x02\x80'), [set()])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testset(self):
source = {b'foo', None, 42}
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 encoded = b''.join(cborutil.streamencode(source))
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cbor.loads(encoded), source)
self.assertEqual(cborutil.decodeall(encoded), [source])
def testinvalidtag(self):
# Must use array to encode sets.
encoded = b'\xd9\x01\x02\xa0'
with self.assertRaisesRegex(cborutil.CBORDecodeError,
'expected array after finite set '
'semantic tag'):
cborutil.decodeall(encoded)
def testpartialdecode(self):
# Semantic tag item will be 3 bytes. Set header will be variable
# depending on length.
encoded = b''.join(cborutil.streamencode({i for i in range(23)}))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:3]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:4]),
(True, 23, 4, cborutil.SPECIAL_START_SET))
self.assertEqual(cborutil.decodeitem(encoded[0:5]),
(True, 23, 4, cborutil.SPECIAL_START_SET))
encoded = b''.join(cborutil.streamencode({i for i in range(24)}))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:3]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:4]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:5]),
(True, 24, 5, cborutil.SPECIAL_START_SET))
self.assertEqual(cborutil.decodeitem(encoded[0:6]),
(True, 24, 5, cborutil.SPECIAL_START_SET))
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 encoded = b''.join(cborutil.streamencode({i for i in range(256)}))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:3]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:4]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:5]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:6]),
(True, 256, 6, cborutil.SPECIAL_START_SET))
def testinvalidvalue(self):
encoded = b''.join([
b'\xd9\x01\x02', # semantic tag
b'\x81', # array of size 1
b'\x5f\x43foo\xff', # indefinite length bytestring "foo"
])
with self.assertRaisesRegex(cborutil.CBORDecodeError,
'indefinite length bytestrings not '
'allowed as set values'):
cborutil.decodeall(encoded)
encoded = b''.join([
b'\xd9\x01\x02',
b'\x81',
b'\x80', # empty array
])
with self.assertRaisesRegex(cborutil.CBORDecodeError,
'collections not allowed as set values'):
cborutil.decodeall(encoded)
encoded = b''.join([
b'\xd9\x01\x02',
b'\x81',
b'\xa0', # empty map
])
with self.assertRaisesRegex(cborutil.CBORDecodeError,
'collections not allowed as set values'):
cborutil.decodeall(encoded)
encoded = b''.join([
b'\xd9\x01\x02',
b'\x81',
b'\xd9\x01\x02\x81\x01', # set with integer 1
])
with self.assertRaisesRegex(cborutil.CBORDecodeError,
'collections not allowed as set values'):
cborutil.decodeall(encoded)
class BoolTests(TestCase):
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testbasic(self):
self.assertEqual(list(cborutil.streamencode(True)), [b'\xf5'])
self.assertEqual(list(cborutil.streamencode(False)), [b'\xf4'])
self.assertIs(loadit(cborutil.streamencode(True)), True)
self.assertIs(loadit(cborutil.streamencode(False)), False)
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\xf4'), [False])
self.assertEqual(cborutil.decodeall(b'\xf5'), [True])
self.assertEqual(cborutil.decodeall(b'\xf4\xf5\xf5\xf4'),
[False, True, True, False])
class NoneTests(TestCase):
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testbasic(self):
self.assertEqual(list(cborutil.streamencode(None)), [b'\xf6'])
self.assertIs(loadit(cborutil.streamencode(None)), None)
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\xf6'), [None])
self.assertEqual(cborutil.decodeall(b'\xf6\xf6'), [None, None])
class MapTests(TestCase):
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testempty(self):
self.assertEqual(list(cborutil.streamencode({})), [b'\xa0'])
self.assertEqual(loadit(cborutil.streamencode({})), {})
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\xa0'), [{}])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testemptyindefinite(self):
self.assertEqual(list(cborutil.streamencodemapfromiter([])), [
b'\xbf', b'\xff'])
self.assertEqual(loadit(cborutil.streamencodemapfromiter([])), {})
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 with self.assertRaisesRegex(cborutil.CBORDecodeError,
'indefinite length uint not allowed'):
cborutil.decodeall(b'\xbf\xff')
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testone(self):
source = {b'foo': b'bar'}
self.assertEqual(list(cborutil.streamencode(source)), [
b'\xa1', b'\x43', b'foo', b'\x43', b'bar'])
self.assertEqual(loadit(cborutil.streamencode(source)), source)
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 self.assertEqual(cborutil.decodeall(b'\xa1\x43foo\x43bar'), [source])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testmultiple(self):
source = {
b'foo': b'bar',
b'baz': b'value1',
}
self.assertEqual(loadit(cborutil.streamencode(source)), source)
self.assertEqual(
loadit(cborutil.streamencodemapfromiter(source.items())),
source)
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeall(encoded), [source])
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 def testcomplex(self):
source = {
b'key': 1,
2: -10,
}
self.assertEqual(loadit(cborutil.streamencode(source)),
source)
self.assertEqual(
loadit(cborutil.streamencodemapfromiter(source.items())),
source)
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeall(encoded), [source])
def testnested(self):
source = {b'key1': None, b'key2': {b'sub1': b'sub2'}, b'sub2': {}}
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeall(encoded), [source])
source = {
b'key1': [],
b'key2': [None, False],
b'key3': {b'foo', b'bar'},
b'key4': {},
}
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeall(encoded), [source])
def testillegalkey(self):
encoded = b''.join([
# map header + len 1
b'\xa1',
# indefinite length bytestring "foo" in key position
b'\x5f\x03foo\xff'
])
with self.assertRaisesRegex(cborutil.CBORDecodeError,
'indefinite length bytestrings not '
'allowed as map keys'):
cborutil.decodeall(encoded)
encoded = b''.join([
b'\xa1',
b'\x80', # empty array
b'\x43foo',
])
with self.assertRaisesRegex(cborutil.CBORDecodeError,
'collections not supported as map keys'):
cborutil.decodeall(encoded)
def testillegalvalue(self):
encoded = b''.join([
b'\xa1', # map headers
b'\x43foo', # key
b'\x5f\x03bar\xff', # indefinite length value
])
with self.assertRaisesRegex(cborutil.CBORDecodeError,
'indefinite length bytestrings not '
'allowed as map values'):
cborutil.decodeall(encoded)
def testpartialdecode(self):
source = {b'key1': b'value1'}
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(True, 1, 1, cborutil.SPECIAL_START_MAP))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(True, 1, 1, cborutil.SPECIAL_START_MAP))
source = {b'key%d' % i: None for i in range(23)}
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(True, 23, 1, cborutil.SPECIAL_START_MAP))
source = {b'key%d' % i: None for i in range(24)}
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(True, 24, 2, cborutil.SPECIAL_START_MAP))
self.assertEqual(cborutil.decodeitem(encoded[0:3]),
(True, 24, 2, cborutil.SPECIAL_START_MAP))
source = {b'key%d' % i: None for i in range(256)}
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:3]),
(True, 256, 3, cborutil.SPECIAL_START_MAP))
self.assertEqual(cborutil.decodeitem(encoded[0:4]),
(True, 256, 3, cborutil.SPECIAL_START_MAP))
source = {b'key%d' % i: None for i in range(65536)}
encoded = b''.join(cborutil.streamencode(source))
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -4, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(False, None, -3, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:3]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:4]),
(False, None, -1, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:5]),
(True, 65536, 5, cborutil.SPECIAL_START_MAP))
self.assertEqual(cborutil.decodeitem(encoded[0:6]),
(True, 65536, 5, cborutil.SPECIAL_START_MAP))
class SemanticTagTests(TestCase):
def testdecodeforbidden(self):
for i in range(500):
if i == cborutil.SEMANTIC_TAG_FINITE_SET:
continue
tag = cborutil.encodelength(cborutil.MAJOR_TYPE_SEMANTIC,
i)
encoded = tag + cborutil.encodelength(cborutil.MAJOR_TYPE_UINT, 42)
# Partial decode is incomplete.
if i < 24:
pass
elif i < 256:
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -1, cborutil.SPECIAL_NONE))
elif i < 65536:
self.assertEqual(cborutil.decodeitem(encoded[0:1]),
(False, None, -2, cborutil.SPECIAL_NONE))
self.assertEqual(cborutil.decodeitem(encoded[0:2]),
(False, None, -1, cborutil.SPECIAL_NONE))
with self.assertRaisesRegex(cborutil.CBORDecodeError,
Gregory Szorc
tests: use raw strings in test-cbor.py...
r41690 r'semantic tag \d+ not allowed'):
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 cborutil.decodeitem(encoded)
class SpecialTypesTests(TestCase):
def testforbiddentypes(self):
for i in range(256):
if i == cborutil.SUBTYPE_FALSE:
continue
elif i == cborutil.SUBTYPE_TRUE:
continue
elif i == cborutil.SUBTYPE_NULL:
continue
encoded = cborutil.encodelength(cborutil.MAJOR_TYPE_SPECIAL, i)
with self.assertRaisesRegex(cborutil.CBORDecodeError,
Gregory Szorc
tests: use raw strings in test-cbor.py...
r41690 r'special type \d+ not allowed'):
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 cborutil.decodeitem(encoded)
class SansIODecoderTests(TestCase):
def testemptyinput(self):
decoder = cborutil.sansiodecoder()
self.assertEqual(decoder.decode(b''), (False, 0, 0))
Gregory Szorc
cborutil: add a buffering decoder...
r39450 class BufferingDecoderTests(TestCase):
def testsimple(self):
source = [
b'foobar',
b'x' * 128,
{b'foo': b'bar'},
True,
False,
None,
[None for i in range(128)],
]
encoded = b''.join(cborutil.streamencode(source))
for step in range(1, 32):
decoder = cborutil.bufferingdecoder()
start = 0
while start < len(encoded):
decoder.decode(encoded[start:start + step])
start += step
self.assertEqual(decoder.getavailable(), [source])
Gregory Szorc
cborutil: cast bytearray to bytes...
r40160 def testbytearray(self):
source = b''.join(cborutil.streamencode(b'foobar'))
decoder = cborutil.bufferingdecoder()
decoder.decode(bytearray(source))
self.assertEqual(decoder.getavailable(), [b'foobar'])
Gregory Szorc
cborutil: implement sans I/O decoder...
r39448 class DecodeallTests(TestCase):
def testemptyinput(self):
self.assertEqual(cborutil.decodeall(b''), [])
def testpartialinput(self):
encoded = b''.join([
b'\x82', # array of 2 elements
b'\x01', # integer 1
])
with self.assertRaisesRegex(cborutil.CBORDecodeError,
'input data not complete'):
cborutil.decodeall(encoded)
Gregory Szorc
cborutil: implement support for streaming encoding, bytestring decoding...
r37729 if __name__ == '__main__':
import silenttestrunner
silenttestrunner.main(__name__)