Show More
test-cbor.py
210 lines
| 6.5 KiB
| text/x-python
|
PythonLexer
/ tests / test-cbor.py
Gregory Szorc
|
r37729 | from __future__ import absolute_import | ||
import io | ||||
import unittest | ||||
from mercurial.thirdparty import ( | ||||
cbor, | ||||
) | ||||
from mercurial.utils import ( | ||||
cborutil, | ||||
) | ||||
def loadit(it): | ||||
return cbor.loads(b''.join(it)) | ||||
class BytestringTests(unittest.TestCase): | ||||
def testsimple(self): | ||||
self.assertEqual( | ||||
list(cborutil.streamencode(b'foobar')), | ||||
[b'\x46', b'foobar']) | ||||
self.assertEqual( | ||||
loadit(cborutil.streamencode(b'foobar')), | ||||
b'foobar') | ||||
def testlong(self): | ||||
source = b'x' * 1048576 | ||||
self.assertEqual(loadit(cborutil.streamencode(source)), source) | ||||
def testfromiter(self): | ||||
# This is the example from RFC 7049 Section 2.2.2. | ||||
source = [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99'] | ||||
self.assertEqual( | ||||
list(cborutil.streamencodebytestringfromiter(source)), | ||||
[ | ||||
b'\x5f', | ||||
b'\x44', | ||||
b'\xaa\xbb\xcc\xdd', | ||||
b'\x43', | ||||
b'\xee\xff\x99', | ||||
b'\xff', | ||||
]) | ||||
self.assertEqual( | ||||
loadit(cborutil.streamencodebytestringfromiter(source)), | ||||
b''.join(source)) | ||||
def testfromiterlarge(self): | ||||
source = [b'a' * 16, b'b' * 128, b'c' * 1024, b'd' * 1048576] | ||||
self.assertEqual( | ||||
loadit(cborutil.streamencodebytestringfromiter(source)), | ||||
b''.join(source)) | ||||
def testindefinite(self): | ||||
source = b'\x00\x01\x02\x03' + b'\xff' * 16384 | ||||
it = cborutil.streamencodeindefinitebytestring(source, chunksize=2) | ||||
self.assertEqual(next(it), b'\x5f') | ||||
self.assertEqual(next(it), b'\x42') | ||||
self.assertEqual(next(it), b'\x00\x01') | ||||
self.assertEqual(next(it), b'\x42') | ||||
self.assertEqual(next(it), b'\x02\x03') | ||||
self.assertEqual(next(it), b'\x42') | ||||
self.assertEqual(next(it), b'\xff\xff') | ||||
dest = b''.join(cborutil.streamencodeindefinitebytestring( | ||||
source, chunksize=42)) | ||||
Augie Fackler
|
r37916 | self.assertEqual(cbor.loads(dest), source) | ||
Gregory Szorc
|
r37729 | |||
def testreadtoiter(self): | ||||
source = io.BytesIO(b'\x5f\x44\xaa\xbb\xcc\xdd\x43\xee\xff\x99\xff') | ||||
it = cborutil.readindefinitebytestringtoiter(source) | ||||
self.assertEqual(next(it), b'\xaa\xbb\xcc\xdd') | ||||
self.assertEqual(next(it), b'\xee\xff\x99') | ||||
with self.assertRaises(StopIteration): | ||||
next(it) | ||||
class IntTests(unittest.TestCase): | ||||
def testsmall(self): | ||||
self.assertEqual(list(cborutil.streamencode(0)), [b'\x00']) | ||||
self.assertEqual(list(cborutil.streamencode(1)), [b'\x01']) | ||||
self.assertEqual(list(cborutil.streamencode(2)), [b'\x02']) | ||||
self.assertEqual(list(cborutil.streamencode(3)), [b'\x03']) | ||||
self.assertEqual(list(cborutil.streamencode(4)), [b'\x04']) | ||||
def testnegativesmall(self): | ||||
self.assertEqual(list(cborutil.streamencode(-1)), [b'\x20']) | ||||
self.assertEqual(list(cborutil.streamencode(-2)), [b'\x21']) | ||||
self.assertEqual(list(cborutil.streamencode(-3)), [b'\x22']) | ||||
self.assertEqual(list(cborutil.streamencode(-4)), [b'\x23']) | ||||
self.assertEqual(list(cborutil.streamencode(-5)), [b'\x24']) | ||||
def testrange(self): | ||||
for i in range(-70000, 70000, 10): | ||||
self.assertEqual( | ||||
b''.join(cborutil.streamencode(i)), | ||||
cbor.dumps(i)) | ||||
class ArrayTests(unittest.TestCase): | ||||
def testempty(self): | ||||
self.assertEqual(list(cborutil.streamencode([])), [b'\x80']) | ||||
self.assertEqual(loadit(cborutil.streamencode([])), []) | ||||
def testbasic(self): | ||||
source = [b'foo', b'bar', 1, -10] | ||||
self.assertEqual(list(cborutil.streamencode(source)), [ | ||||
b'\x84', b'\x43', b'foo', b'\x43', b'bar', b'\x01', b'\x29']) | ||||
def testemptyfromiter(self): | ||||
self.assertEqual(b''.join(cborutil.streamencodearrayfromiter([])), | ||||
b'\x9f\xff') | ||||
def testfromiter1(self): | ||||
source = [b'foo'] | ||||
self.assertEqual(list(cborutil.streamencodearrayfromiter(source)), [ | ||||
b'\x9f', | ||||
b'\x43', b'foo', | ||||
b'\xff', | ||||
]) | ||||
dest = b''.join(cborutil.streamencodearrayfromiter(source)) | ||||
self.assertEqual(cbor.loads(dest), source) | ||||
def testtuple(self): | ||||
source = (b'foo', None, 42) | ||||
self.assertEqual(cbor.loads(b''.join(cborutil.streamencode(source))), | ||||
list(source)) | ||||
class SetTests(unittest.TestCase): | ||||
def testempty(self): | ||||
self.assertEqual(list(cborutil.streamencode(set())), [ | ||||
b'\xd9\x01\x02', | ||||
b'\x80', | ||||
]) | ||||
def testset(self): | ||||
source = {b'foo', None, 42} | ||||
self.assertEqual(cbor.loads(b''.join(cborutil.streamencode(source))), | ||||
source) | ||||
class BoolTests(unittest.TestCase): | ||||
def testbasic(self): | ||||
self.assertEqual(list(cborutil.streamencode(True)), [b'\xf5']) | ||||
self.assertEqual(list(cborutil.streamencode(False)), [b'\xf4']) | ||||
self.assertIs(loadit(cborutil.streamencode(True)), True) | ||||
self.assertIs(loadit(cborutil.streamencode(False)), False) | ||||
class NoneTests(unittest.TestCase): | ||||
def testbasic(self): | ||||
self.assertEqual(list(cborutil.streamencode(None)), [b'\xf6']) | ||||
self.assertIs(loadit(cborutil.streamencode(None)), None) | ||||
class MapTests(unittest.TestCase): | ||||
def testempty(self): | ||||
self.assertEqual(list(cborutil.streamencode({})), [b'\xa0']) | ||||
self.assertEqual(loadit(cborutil.streamencode({})), {}) | ||||
def testemptyindefinite(self): | ||||
self.assertEqual(list(cborutil.streamencodemapfromiter([])), [ | ||||
b'\xbf', b'\xff']) | ||||
self.assertEqual(loadit(cborutil.streamencodemapfromiter([])), {}) | ||||
def testone(self): | ||||
source = {b'foo': b'bar'} | ||||
self.assertEqual(list(cborutil.streamencode(source)), [ | ||||
b'\xa1', b'\x43', b'foo', b'\x43', b'bar']) | ||||
self.assertEqual(loadit(cborutil.streamencode(source)), source) | ||||
def testmultiple(self): | ||||
source = { | ||||
b'foo': b'bar', | ||||
b'baz': b'value1', | ||||
} | ||||
self.assertEqual(loadit(cborutil.streamencode(source)), source) | ||||
self.assertEqual( | ||||
loadit(cborutil.streamencodemapfromiter(source.items())), | ||||
source) | ||||
def testcomplex(self): | ||||
source = { | ||||
b'key': 1, | ||||
2: -10, | ||||
} | ||||
self.assertEqual(loadit(cborutil.streamencode(source)), | ||||
source) | ||||
self.assertEqual( | ||||
loadit(cborutil.streamencodemapfromiter(source.items())), | ||||
source) | ||||
if __name__ == '__main__': | ||||
import silenttestrunner | ||||
silenttestrunner.main(__name__) | ||||