##// END OF EJS Templates
tests: get access to thirdparty.cbor without requiring it to be installed...
Augie Fackler -
r41195:a9905045 default
parent child Browse files
Show More
@@ -1,992 +1,1001 b''
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import os
4 import sys
3 import unittest
5 import unittest
4
6
5 from mercurial.thirdparty import (
7 # TODO migrate to canned cbor test strings and stop using thirdparty.cbor
6 cbor,
8 tpp = os.path.normpath(os.path.join(os.path.dirname(__file__),
7 )
9 '..', 'mercurial', 'thirdparty'))
10 if not os.path.exists(tpp):
11 # skip, not in a repo
12 sys.exit(80)
13 sys.path[0:0] = [tpp]
14 import cbor
15 del sys.path[0]
16
8 from mercurial.utils import (
17 from mercurial.utils import (
9 cborutil,
18 cborutil,
10 )
19 )
11
20
12 class TestCase(unittest.TestCase):
21 class TestCase(unittest.TestCase):
13 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
22 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
14 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
23 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
15 # the regex version.
24 # the regex version.
16 assertRaisesRegex = (# camelcase-required
25 assertRaisesRegex = (# camelcase-required
17 unittest.TestCase.assertRaisesRegexp)
26 unittest.TestCase.assertRaisesRegexp)
18
27
19 def loadit(it):
28 def loadit(it):
20 return cbor.loads(b''.join(it))
29 return cbor.loads(b''.join(it))
21
30
22 class BytestringTests(TestCase):
31 class BytestringTests(TestCase):
23 def testsimple(self):
32 def testsimple(self):
24 self.assertEqual(
33 self.assertEqual(
25 list(cborutil.streamencode(b'foobar')),
34 list(cborutil.streamencode(b'foobar')),
26 [b'\x46', b'foobar'])
35 [b'\x46', b'foobar'])
27
36
28 self.assertEqual(
37 self.assertEqual(
29 loadit(cborutil.streamencode(b'foobar')),
38 loadit(cborutil.streamencode(b'foobar')),
30 b'foobar')
39 b'foobar')
31
40
32 self.assertEqual(cborutil.decodeall(b'\x46foobar'),
41 self.assertEqual(cborutil.decodeall(b'\x46foobar'),
33 [b'foobar'])
42 [b'foobar'])
34
43
35 self.assertEqual(cborutil.decodeall(b'\x46foobar\x45fizbi'),
44 self.assertEqual(cborutil.decodeall(b'\x46foobar\x45fizbi'),
36 [b'foobar', b'fizbi'])
45 [b'foobar', b'fizbi'])
37
46
38 def testlong(self):
47 def testlong(self):
39 source = b'x' * 1048576
48 source = b'x' * 1048576
40
49
41 self.assertEqual(loadit(cborutil.streamencode(source)), source)
50 self.assertEqual(loadit(cborutil.streamencode(source)), source)
42
51
43 encoded = b''.join(cborutil.streamencode(source))
52 encoded = b''.join(cborutil.streamencode(source))
44 self.assertEqual(cborutil.decodeall(encoded), [source])
53 self.assertEqual(cborutil.decodeall(encoded), [source])
45
54
46 def testfromiter(self):
55 def testfromiter(self):
47 # This is the example from RFC 7049 Section 2.2.2.
56 # This is the example from RFC 7049 Section 2.2.2.
48 source = [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99']
57 source = [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99']
49
58
50 self.assertEqual(
59 self.assertEqual(
51 list(cborutil.streamencodebytestringfromiter(source)),
60 list(cborutil.streamencodebytestringfromiter(source)),
52 [
61 [
53 b'\x5f',
62 b'\x5f',
54 b'\x44',
63 b'\x44',
55 b'\xaa\xbb\xcc\xdd',
64 b'\xaa\xbb\xcc\xdd',
56 b'\x43',
65 b'\x43',
57 b'\xee\xff\x99',
66 b'\xee\xff\x99',
58 b'\xff',
67 b'\xff',
59 ])
68 ])
60
69
61 self.assertEqual(
70 self.assertEqual(
62 loadit(cborutil.streamencodebytestringfromiter(source)),
71 loadit(cborutil.streamencodebytestringfromiter(source)),
63 b''.join(source))
72 b''.join(source))
64
73
65 self.assertEqual(cborutil.decodeall(b'\x5f\x44\xaa\xbb\xcc\xdd'
74 self.assertEqual(cborutil.decodeall(b'\x5f\x44\xaa\xbb\xcc\xdd'
66 b'\x43\xee\xff\x99\xff'),
75 b'\x43\xee\xff\x99\xff'),
67 [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99', b''])
76 [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99', b''])
68
77
69 for i, chunk in enumerate(
78 for i, chunk in enumerate(
70 cborutil.decodeall(b'\x5f\x44\xaa\xbb\xcc\xdd'
79 cborutil.decodeall(b'\x5f\x44\xaa\xbb\xcc\xdd'
71 b'\x43\xee\xff\x99\xff')):
80 b'\x43\xee\xff\x99\xff')):
72 self.assertIsInstance(chunk, cborutil.bytestringchunk)
81 self.assertIsInstance(chunk, cborutil.bytestringchunk)
73
82
74 if i == 0:
83 if i == 0:
75 self.assertTrue(chunk.isfirst)
84 self.assertTrue(chunk.isfirst)
76 else:
85 else:
77 self.assertFalse(chunk.isfirst)
86 self.assertFalse(chunk.isfirst)
78
87
79 if i == 2:
88 if i == 2:
80 self.assertTrue(chunk.islast)
89 self.assertTrue(chunk.islast)
81 else:
90 else:
82 self.assertFalse(chunk.islast)
91 self.assertFalse(chunk.islast)
83
92
84 def testfromiterlarge(self):
93 def testfromiterlarge(self):
85 source = [b'a' * 16, b'b' * 128, b'c' * 1024, b'd' * 1048576]
94 source = [b'a' * 16, b'b' * 128, b'c' * 1024, b'd' * 1048576]
86
95
87 self.assertEqual(
96 self.assertEqual(
88 loadit(cborutil.streamencodebytestringfromiter(source)),
97 loadit(cborutil.streamencodebytestringfromiter(source)),
89 b''.join(source))
98 b''.join(source))
90
99
91 def testindefinite(self):
100 def testindefinite(self):
92 source = b'\x00\x01\x02\x03' + b'\xff' * 16384
101 source = b'\x00\x01\x02\x03' + b'\xff' * 16384
93
102
94 it = cborutil.streamencodeindefinitebytestring(source, chunksize=2)
103 it = cborutil.streamencodeindefinitebytestring(source, chunksize=2)
95
104
96 self.assertEqual(next(it), b'\x5f')
105 self.assertEqual(next(it), b'\x5f')
97 self.assertEqual(next(it), b'\x42')
106 self.assertEqual(next(it), b'\x42')
98 self.assertEqual(next(it), b'\x00\x01')
107 self.assertEqual(next(it), b'\x00\x01')
99 self.assertEqual(next(it), b'\x42')
108 self.assertEqual(next(it), b'\x42')
100 self.assertEqual(next(it), b'\x02\x03')
109 self.assertEqual(next(it), b'\x02\x03')
101 self.assertEqual(next(it), b'\x42')
110 self.assertEqual(next(it), b'\x42')
102 self.assertEqual(next(it), b'\xff\xff')
111 self.assertEqual(next(it), b'\xff\xff')
103
112
104 dest = b''.join(cborutil.streamencodeindefinitebytestring(
113 dest = b''.join(cborutil.streamencodeindefinitebytestring(
105 source, chunksize=42))
114 source, chunksize=42))
106 self.assertEqual(cbor.loads(dest), source)
115 self.assertEqual(cbor.loads(dest), source)
107
116
108 self.assertEqual(b''.join(cborutil.decodeall(dest)), source)
117 self.assertEqual(b''.join(cborutil.decodeall(dest)), source)
109
118
110 for chunk in cborutil.decodeall(dest):
119 for chunk in cborutil.decodeall(dest):
111 self.assertIsInstance(chunk, cborutil.bytestringchunk)
120 self.assertIsInstance(chunk, cborutil.bytestringchunk)
112 self.assertIn(len(chunk), (0, 8, 42))
121 self.assertIn(len(chunk), (0, 8, 42))
113
122
114 encoded = b'\x5f\xff'
123 encoded = b'\x5f\xff'
115 b = cborutil.decodeall(encoded)
124 b = cborutil.decodeall(encoded)
116 self.assertEqual(b, [b''])
125 self.assertEqual(b, [b''])
117 self.assertTrue(b[0].isfirst)
126 self.assertTrue(b[0].isfirst)
118 self.assertTrue(b[0].islast)
127 self.assertTrue(b[0].islast)
119
128
120 def testdecodevariouslengths(self):
129 def testdecodevariouslengths(self):
121 for i in (0, 1, 22, 23, 24, 25, 254, 255, 256, 65534, 65535, 65536):
130 for i in (0, 1, 22, 23, 24, 25, 254, 255, 256, 65534, 65535, 65536):
122 source = b'x' * i
131 source = b'x' * i
123 encoded = b''.join(cborutil.streamencode(source))
132 encoded = b''.join(cborutil.streamencode(source))
124
133
125 if len(source) < 24:
134 if len(source) < 24:
126 hlen = 1
135 hlen = 1
127 elif len(source) < 256:
136 elif len(source) < 256:
128 hlen = 2
137 hlen = 2
129 elif len(source) < 65536:
138 elif len(source) < 65536:
130 hlen = 3
139 hlen = 3
131 elif len(source) < 1048576:
140 elif len(source) < 1048576:
132 hlen = 5
141 hlen = 5
133
142
134 self.assertEqual(cborutil.decodeitem(encoded),
143 self.assertEqual(cborutil.decodeitem(encoded),
135 (True, source, hlen + len(source),
144 (True, source, hlen + len(source),
136 cborutil.SPECIAL_NONE))
145 cborutil.SPECIAL_NONE))
137
146
138 def testpartialdecode(self):
147 def testpartialdecode(self):
139 encoded = b''.join(cborutil.streamencode(b'foobar'))
148 encoded = b''.join(cborutil.streamencode(b'foobar'))
140
149
141 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
150 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
142 (False, None, -6, cborutil.SPECIAL_NONE))
151 (False, None, -6, cborutil.SPECIAL_NONE))
143 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
152 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
144 (False, None, -5, cborutil.SPECIAL_NONE))
153 (False, None, -5, cborutil.SPECIAL_NONE))
145 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
154 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
146 (False, None, -4, cborutil.SPECIAL_NONE))
155 (False, None, -4, cborutil.SPECIAL_NONE))
147 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
156 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
148 (False, None, -3, cborutil.SPECIAL_NONE))
157 (False, None, -3, cborutil.SPECIAL_NONE))
149 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
158 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
150 (False, None, -2, cborutil.SPECIAL_NONE))
159 (False, None, -2, cborutil.SPECIAL_NONE))
151 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
160 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
152 (False, None, -1, cborutil.SPECIAL_NONE))
161 (False, None, -1, cborutil.SPECIAL_NONE))
153 self.assertEqual(cborutil.decodeitem(encoded[0:7]),
162 self.assertEqual(cborutil.decodeitem(encoded[0:7]),
154 (True, b'foobar', 7, cborutil.SPECIAL_NONE))
163 (True, b'foobar', 7, cborutil.SPECIAL_NONE))
155
164
156 def testpartialdecodevariouslengths(self):
165 def testpartialdecodevariouslengths(self):
157 lens = [
166 lens = [
158 2,
167 2,
159 3,
168 3,
160 10,
169 10,
161 23,
170 23,
162 24,
171 24,
163 25,
172 25,
164 31,
173 31,
165 100,
174 100,
166 254,
175 254,
167 255,
176 255,
168 256,
177 256,
169 257,
178 257,
170 16384,
179 16384,
171 65534,
180 65534,
172 65535,
181 65535,
173 65536,
182 65536,
174 65537,
183 65537,
175 131071,
184 131071,
176 131072,
185 131072,
177 131073,
186 131073,
178 1048575,
187 1048575,
179 1048576,
188 1048576,
180 1048577,
189 1048577,
181 ]
190 ]
182
191
183 for size in lens:
192 for size in lens:
184 if size < 24:
193 if size < 24:
185 hlen = 1
194 hlen = 1
186 elif size < 2**8:
195 elif size < 2**8:
187 hlen = 2
196 hlen = 2
188 elif size < 2**16:
197 elif size < 2**16:
189 hlen = 3
198 hlen = 3
190 elif size < 2**32:
199 elif size < 2**32:
191 hlen = 5
200 hlen = 5
192 else:
201 else:
193 assert False
202 assert False
194
203
195 source = b'x' * size
204 source = b'x' * size
196 encoded = b''.join(cborutil.streamencode(source))
205 encoded = b''.join(cborutil.streamencode(source))
197
206
198 res = cborutil.decodeitem(encoded[0:1])
207 res = cborutil.decodeitem(encoded[0:1])
199
208
200 if hlen > 1:
209 if hlen > 1:
201 self.assertEqual(res, (False, None, -(hlen - 1),
210 self.assertEqual(res, (False, None, -(hlen - 1),
202 cborutil.SPECIAL_NONE))
211 cborutil.SPECIAL_NONE))
203 else:
212 else:
204 self.assertEqual(res, (False, None, -(size + hlen - 1),
213 self.assertEqual(res, (False, None, -(size + hlen - 1),
205 cborutil.SPECIAL_NONE))
214 cborutil.SPECIAL_NONE))
206
215
207 # Decoding partial header reports remaining header size.
216 # Decoding partial header reports remaining header size.
208 for i in range(hlen - 1):
217 for i in range(hlen - 1):
209 self.assertEqual(cborutil.decodeitem(encoded[0:i + 1]),
218 self.assertEqual(cborutil.decodeitem(encoded[0:i + 1]),
210 (False, None, -(hlen - i - 1),
219 (False, None, -(hlen - i - 1),
211 cborutil.SPECIAL_NONE))
220 cborutil.SPECIAL_NONE))
212
221
213 # Decoding complete header reports item size.
222 # Decoding complete header reports item size.
214 self.assertEqual(cborutil.decodeitem(encoded[0:hlen]),
223 self.assertEqual(cborutil.decodeitem(encoded[0:hlen]),
215 (False, None, -size, cborutil.SPECIAL_NONE))
224 (False, None, -size, cborutil.SPECIAL_NONE))
216
225
217 # Decoding single byte after header reports item size - 1
226 # Decoding single byte after header reports item size - 1
218 self.assertEqual(cborutil.decodeitem(encoded[0:hlen + 1]),
227 self.assertEqual(cborutil.decodeitem(encoded[0:hlen + 1]),
219 (False, None, -(size - 1), cborutil.SPECIAL_NONE))
228 (False, None, -(size - 1), cborutil.SPECIAL_NONE))
220
229
221 # Decoding all but the last byte reports -1 needed.
230 # Decoding all but the last byte reports -1 needed.
222 self.assertEqual(cborutil.decodeitem(encoded[0:hlen + size - 1]),
231 self.assertEqual(cborutil.decodeitem(encoded[0:hlen + size - 1]),
223 (False, None, -1, cborutil.SPECIAL_NONE))
232 (False, None, -1, cborutil.SPECIAL_NONE))
224
233
225 # Decoding last byte retrieves value.
234 # Decoding last byte retrieves value.
226 self.assertEqual(cborutil.decodeitem(encoded[0:hlen + size]),
235 self.assertEqual(cborutil.decodeitem(encoded[0:hlen + size]),
227 (True, source, hlen + size, cborutil.SPECIAL_NONE))
236 (True, source, hlen + size, cborutil.SPECIAL_NONE))
228
237
229 def testindefinitepartialdecode(self):
238 def testindefinitepartialdecode(self):
230 encoded = b''.join(cborutil.streamencodebytestringfromiter(
239 encoded = b''.join(cborutil.streamencodebytestringfromiter(
231 [b'foobar', b'biz']))
240 [b'foobar', b'biz']))
232
241
233 # First item should be begin of bytestring special.
242 # First item should be begin of bytestring special.
234 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
243 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
235 (True, None, 1,
244 (True, None, 1,
236 cborutil.SPECIAL_START_INDEFINITE_BYTESTRING))
245 cborutil.SPECIAL_START_INDEFINITE_BYTESTRING))
237
246
238 # Second item should be the first chunk. But only available when
247 # Second item should be the first chunk. But only available when
239 # we give it 7 bytes (1 byte header + 6 byte chunk).
248 # we give it 7 bytes (1 byte header + 6 byte chunk).
240 self.assertEqual(cborutil.decodeitem(encoded[1:2]),
249 self.assertEqual(cborutil.decodeitem(encoded[1:2]),
241 (False, None, -6, cborutil.SPECIAL_NONE))
250 (False, None, -6, cborutil.SPECIAL_NONE))
242 self.assertEqual(cborutil.decodeitem(encoded[1:3]),
251 self.assertEqual(cborutil.decodeitem(encoded[1:3]),
243 (False, None, -5, cborutil.SPECIAL_NONE))
252 (False, None, -5, cborutil.SPECIAL_NONE))
244 self.assertEqual(cborutil.decodeitem(encoded[1:4]),
253 self.assertEqual(cborutil.decodeitem(encoded[1:4]),
245 (False, None, -4, cborutil.SPECIAL_NONE))
254 (False, None, -4, cborutil.SPECIAL_NONE))
246 self.assertEqual(cborutil.decodeitem(encoded[1:5]),
255 self.assertEqual(cborutil.decodeitem(encoded[1:5]),
247 (False, None, -3, cborutil.SPECIAL_NONE))
256 (False, None, -3, cborutil.SPECIAL_NONE))
248 self.assertEqual(cborutil.decodeitem(encoded[1:6]),
257 self.assertEqual(cborutil.decodeitem(encoded[1:6]),
249 (False, None, -2, cborutil.SPECIAL_NONE))
258 (False, None, -2, cborutil.SPECIAL_NONE))
250 self.assertEqual(cborutil.decodeitem(encoded[1:7]),
259 self.assertEqual(cborutil.decodeitem(encoded[1:7]),
251 (False, None, -1, cborutil.SPECIAL_NONE))
260 (False, None, -1, cborutil.SPECIAL_NONE))
252
261
253 self.assertEqual(cborutil.decodeitem(encoded[1:8]),
262 self.assertEqual(cborutil.decodeitem(encoded[1:8]),
254 (True, b'foobar', 7, cborutil.SPECIAL_NONE))
263 (True, b'foobar', 7, cborutil.SPECIAL_NONE))
255
264
256 # Third item should be second chunk. But only available when
265 # Third item should be second chunk. But only available when
257 # we give it 4 bytes (1 byte header + 3 byte chunk).
266 # we give it 4 bytes (1 byte header + 3 byte chunk).
258 self.assertEqual(cborutil.decodeitem(encoded[8:9]),
267 self.assertEqual(cborutil.decodeitem(encoded[8:9]),
259 (False, None, -3, cborutil.SPECIAL_NONE))
268 (False, None, -3, cborutil.SPECIAL_NONE))
260 self.assertEqual(cborutil.decodeitem(encoded[8:10]),
269 self.assertEqual(cborutil.decodeitem(encoded[8:10]),
261 (False, None, -2, cborutil.SPECIAL_NONE))
270 (False, None, -2, cborutil.SPECIAL_NONE))
262 self.assertEqual(cborutil.decodeitem(encoded[8:11]),
271 self.assertEqual(cborutil.decodeitem(encoded[8:11]),
263 (False, None, -1, cborutil.SPECIAL_NONE))
272 (False, None, -1, cborutil.SPECIAL_NONE))
264
273
265 self.assertEqual(cborutil.decodeitem(encoded[8:12]),
274 self.assertEqual(cborutil.decodeitem(encoded[8:12]),
266 (True, b'biz', 4, cborutil.SPECIAL_NONE))
275 (True, b'biz', 4, cborutil.SPECIAL_NONE))
267
276
268 # Fourth item should be end of indefinite stream marker.
277 # Fourth item should be end of indefinite stream marker.
269 self.assertEqual(cborutil.decodeitem(encoded[12:13]),
278 self.assertEqual(cborutil.decodeitem(encoded[12:13]),
270 (True, None, 1, cborutil.SPECIAL_INDEFINITE_BREAK))
279 (True, None, 1, cborutil.SPECIAL_INDEFINITE_BREAK))
271
280
272 # Now test the behavior when going through the decoder.
281 # Now test the behavior when going through the decoder.
273
282
274 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:1]),
283 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:1]),
275 (False, 1, 0))
284 (False, 1, 0))
276 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:2]),
285 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:2]),
277 (False, 1, 6))
286 (False, 1, 6))
278 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:3]),
287 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:3]),
279 (False, 1, 5))
288 (False, 1, 5))
280 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:4]),
289 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:4]),
281 (False, 1, 4))
290 (False, 1, 4))
282 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:5]),
291 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:5]),
283 (False, 1, 3))
292 (False, 1, 3))
284 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:6]),
293 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:6]),
285 (False, 1, 2))
294 (False, 1, 2))
286 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:7]),
295 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:7]),
287 (False, 1, 1))
296 (False, 1, 1))
288 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:8]),
297 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:8]),
289 (True, 8, 0))
298 (True, 8, 0))
290
299
291 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:9]),
300 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:9]),
292 (True, 8, 3))
301 (True, 8, 3))
293 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:10]),
302 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:10]),
294 (True, 8, 2))
303 (True, 8, 2))
295 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:11]),
304 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:11]),
296 (True, 8, 1))
305 (True, 8, 1))
297 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:12]),
306 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:12]),
298 (True, 12, 0))
307 (True, 12, 0))
299
308
300 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:13]),
309 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:13]),
301 (True, 13, 0))
310 (True, 13, 0))
302
311
303 decoder = cborutil.sansiodecoder()
312 decoder = cborutil.sansiodecoder()
304 decoder.decode(encoded[0:8])
313 decoder.decode(encoded[0:8])
305 values = decoder.getavailable()
314 values = decoder.getavailable()
306 self.assertEqual(values, [b'foobar'])
315 self.assertEqual(values, [b'foobar'])
307 self.assertTrue(values[0].isfirst)
316 self.assertTrue(values[0].isfirst)
308 self.assertFalse(values[0].islast)
317 self.assertFalse(values[0].islast)
309
318
310 self.assertEqual(decoder.decode(encoded[8:12]),
319 self.assertEqual(decoder.decode(encoded[8:12]),
311 (True, 4, 0))
320 (True, 4, 0))
312 values = decoder.getavailable()
321 values = decoder.getavailable()
313 self.assertEqual(values, [b'biz'])
322 self.assertEqual(values, [b'biz'])
314 self.assertFalse(values[0].isfirst)
323 self.assertFalse(values[0].isfirst)
315 self.assertFalse(values[0].islast)
324 self.assertFalse(values[0].islast)
316
325
317 self.assertEqual(decoder.decode(encoded[12:]),
326 self.assertEqual(decoder.decode(encoded[12:]),
318 (True, 1, 0))
327 (True, 1, 0))
319 values = decoder.getavailable()
328 values = decoder.getavailable()
320 self.assertEqual(values, [b''])
329 self.assertEqual(values, [b''])
321 self.assertFalse(values[0].isfirst)
330 self.assertFalse(values[0].isfirst)
322 self.assertTrue(values[0].islast)
331 self.assertTrue(values[0].islast)
323
332
324 class StringTests(TestCase):
333 class StringTests(TestCase):
325 def testdecodeforbidden(self):
334 def testdecodeforbidden(self):
326 encoded = b'\x63foo'
335 encoded = b'\x63foo'
327 with self.assertRaisesRegex(cborutil.CBORDecodeError,
336 with self.assertRaisesRegex(cborutil.CBORDecodeError,
328 'string major type not supported'):
337 'string major type not supported'):
329 cborutil.decodeall(encoded)
338 cborutil.decodeall(encoded)
330
339
331 class IntTests(TestCase):
340 class IntTests(TestCase):
332 def testsmall(self):
341 def testsmall(self):
333 self.assertEqual(list(cborutil.streamencode(0)), [b'\x00'])
342 self.assertEqual(list(cborutil.streamencode(0)), [b'\x00'])
334 self.assertEqual(cborutil.decodeall(b'\x00'), [0])
343 self.assertEqual(cborutil.decodeall(b'\x00'), [0])
335
344
336 self.assertEqual(list(cborutil.streamencode(1)), [b'\x01'])
345 self.assertEqual(list(cborutil.streamencode(1)), [b'\x01'])
337 self.assertEqual(cborutil.decodeall(b'\x01'), [1])
346 self.assertEqual(cborutil.decodeall(b'\x01'), [1])
338
347
339 self.assertEqual(list(cborutil.streamencode(2)), [b'\x02'])
348 self.assertEqual(list(cborutil.streamencode(2)), [b'\x02'])
340 self.assertEqual(cborutil.decodeall(b'\x02'), [2])
349 self.assertEqual(cborutil.decodeall(b'\x02'), [2])
341
350
342 self.assertEqual(list(cborutil.streamencode(3)), [b'\x03'])
351 self.assertEqual(list(cborutil.streamencode(3)), [b'\x03'])
343 self.assertEqual(cborutil.decodeall(b'\x03'), [3])
352 self.assertEqual(cborutil.decodeall(b'\x03'), [3])
344
353
345 self.assertEqual(list(cborutil.streamencode(4)), [b'\x04'])
354 self.assertEqual(list(cborutil.streamencode(4)), [b'\x04'])
346 self.assertEqual(cborutil.decodeall(b'\x04'), [4])
355 self.assertEqual(cborutil.decodeall(b'\x04'), [4])
347
356
348 # Multiple value decode works.
357 # Multiple value decode works.
349 self.assertEqual(cborutil.decodeall(b'\x00\x01\x02\x03\x04'),
358 self.assertEqual(cborutil.decodeall(b'\x00\x01\x02\x03\x04'),
350 [0, 1, 2, 3, 4])
359 [0, 1, 2, 3, 4])
351
360
352 def testnegativesmall(self):
361 def testnegativesmall(self):
353 self.assertEqual(list(cborutil.streamencode(-1)), [b'\x20'])
362 self.assertEqual(list(cborutil.streamencode(-1)), [b'\x20'])
354 self.assertEqual(cborutil.decodeall(b'\x20'), [-1])
363 self.assertEqual(cborutil.decodeall(b'\x20'), [-1])
355
364
356 self.assertEqual(list(cborutil.streamencode(-2)), [b'\x21'])
365 self.assertEqual(list(cborutil.streamencode(-2)), [b'\x21'])
357 self.assertEqual(cborutil.decodeall(b'\x21'), [-2])
366 self.assertEqual(cborutil.decodeall(b'\x21'), [-2])
358
367
359 self.assertEqual(list(cborutil.streamencode(-3)), [b'\x22'])
368 self.assertEqual(list(cborutil.streamencode(-3)), [b'\x22'])
360 self.assertEqual(cborutil.decodeall(b'\x22'), [-3])
369 self.assertEqual(cborutil.decodeall(b'\x22'), [-3])
361
370
362 self.assertEqual(list(cborutil.streamencode(-4)), [b'\x23'])
371 self.assertEqual(list(cborutil.streamencode(-4)), [b'\x23'])
363 self.assertEqual(cborutil.decodeall(b'\x23'), [-4])
372 self.assertEqual(cborutil.decodeall(b'\x23'), [-4])
364
373
365 self.assertEqual(list(cborutil.streamencode(-5)), [b'\x24'])
374 self.assertEqual(list(cborutil.streamencode(-5)), [b'\x24'])
366 self.assertEqual(cborutil.decodeall(b'\x24'), [-5])
375 self.assertEqual(cborutil.decodeall(b'\x24'), [-5])
367
376
368 # Multiple value decode works.
377 # Multiple value decode works.
369 self.assertEqual(cborutil.decodeall(b'\x20\x21\x22\x23\x24'),
378 self.assertEqual(cborutil.decodeall(b'\x20\x21\x22\x23\x24'),
370 [-1, -2, -3, -4, -5])
379 [-1, -2, -3, -4, -5])
371
380
372 def testrange(self):
381 def testrange(self):
373 for i in range(-70000, 70000, 10):
382 for i in range(-70000, 70000, 10):
374 encoded = b''.join(cborutil.streamencode(i))
383 encoded = b''.join(cborutil.streamencode(i))
375
384
376 self.assertEqual(encoded, cbor.dumps(i))
385 self.assertEqual(encoded, cbor.dumps(i))
377 self.assertEqual(cborutil.decodeall(encoded), [i])
386 self.assertEqual(cborutil.decodeall(encoded), [i])
378
387
379 def testdecodepartialubyte(self):
388 def testdecodepartialubyte(self):
380 encoded = b''.join(cborutil.streamencode(250))
389 encoded = b''.join(cborutil.streamencode(250))
381
390
382 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
391 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
383 (False, None, -1, cborutil.SPECIAL_NONE))
392 (False, None, -1, cborutil.SPECIAL_NONE))
384 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
393 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
385 (True, 250, 2, cborutil.SPECIAL_NONE))
394 (True, 250, 2, cborutil.SPECIAL_NONE))
386
395
387 def testdecodepartialbyte(self):
396 def testdecodepartialbyte(self):
388 encoded = b''.join(cborutil.streamencode(-42))
397 encoded = b''.join(cborutil.streamencode(-42))
389 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
398 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
390 (False, None, -1, cborutil.SPECIAL_NONE))
399 (False, None, -1, cborutil.SPECIAL_NONE))
391 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
400 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
392 (True, -42, 2, cborutil.SPECIAL_NONE))
401 (True, -42, 2, cborutil.SPECIAL_NONE))
393
402
394 def testdecodepartialushort(self):
403 def testdecodepartialushort(self):
395 encoded = b''.join(cborutil.streamencode(2**15))
404 encoded = b''.join(cborutil.streamencode(2**15))
396
405
397 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
406 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
398 (False, None, -2, cborutil.SPECIAL_NONE))
407 (False, None, -2, cborutil.SPECIAL_NONE))
399 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
408 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
400 (False, None, -1, cborutil.SPECIAL_NONE))
409 (False, None, -1, cborutil.SPECIAL_NONE))
401 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
410 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
402 (True, 2**15, 3, cborutil.SPECIAL_NONE))
411 (True, 2**15, 3, cborutil.SPECIAL_NONE))
403
412
404 def testdecodepartialshort(self):
413 def testdecodepartialshort(self):
405 encoded = b''.join(cborutil.streamencode(-1024))
414 encoded = b''.join(cborutil.streamencode(-1024))
406
415
407 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
416 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
408 (False, None, -2, cborutil.SPECIAL_NONE))
417 (False, None, -2, cborutil.SPECIAL_NONE))
409 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
418 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
410 (False, None, -1, cborutil.SPECIAL_NONE))
419 (False, None, -1, cborutil.SPECIAL_NONE))
411 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
420 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
412 (True, -1024, 3, cborutil.SPECIAL_NONE))
421 (True, -1024, 3, cborutil.SPECIAL_NONE))
413
422
414 def testdecodepartialulong(self):
423 def testdecodepartialulong(self):
415 encoded = b''.join(cborutil.streamencode(2**28))
424 encoded = b''.join(cborutil.streamencode(2**28))
416
425
417 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
426 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
418 (False, None, -4, cborutil.SPECIAL_NONE))
427 (False, None, -4, cborutil.SPECIAL_NONE))
419 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
428 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
420 (False, None, -3, cborutil.SPECIAL_NONE))
429 (False, None, -3, cborutil.SPECIAL_NONE))
421 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
430 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
422 (False, None, -2, cborutil.SPECIAL_NONE))
431 (False, None, -2, cborutil.SPECIAL_NONE))
423 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
432 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
424 (False, None, -1, cborutil.SPECIAL_NONE))
433 (False, None, -1, cborutil.SPECIAL_NONE))
425 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
434 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
426 (True, 2**28, 5, cborutil.SPECIAL_NONE))
435 (True, 2**28, 5, cborutil.SPECIAL_NONE))
427
436
428 def testdecodepartiallong(self):
437 def testdecodepartiallong(self):
429 encoded = b''.join(cborutil.streamencode(-1048580))
438 encoded = b''.join(cborutil.streamencode(-1048580))
430
439
431 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
440 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
432 (False, None, -4, cborutil.SPECIAL_NONE))
441 (False, None, -4, cborutil.SPECIAL_NONE))
433 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
442 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
434 (False, None, -3, cborutil.SPECIAL_NONE))
443 (False, None, -3, cborutil.SPECIAL_NONE))
435 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
444 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
436 (False, None, -2, cborutil.SPECIAL_NONE))
445 (False, None, -2, cborutil.SPECIAL_NONE))
437 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
446 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
438 (False, None, -1, cborutil.SPECIAL_NONE))
447 (False, None, -1, cborutil.SPECIAL_NONE))
439 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
448 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
440 (True, -1048580, 5, cborutil.SPECIAL_NONE))
449 (True, -1048580, 5, cborutil.SPECIAL_NONE))
441
450
442 def testdecodepartialulonglong(self):
451 def testdecodepartialulonglong(self):
443 encoded = b''.join(cborutil.streamencode(2**32))
452 encoded = b''.join(cborutil.streamencode(2**32))
444
453
445 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
454 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
446 (False, None, -8, cborutil.SPECIAL_NONE))
455 (False, None, -8, cborutil.SPECIAL_NONE))
447 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
456 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
448 (False, None, -7, cborutil.SPECIAL_NONE))
457 (False, None, -7, cborutil.SPECIAL_NONE))
449 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
458 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
450 (False, None, -6, cborutil.SPECIAL_NONE))
459 (False, None, -6, cborutil.SPECIAL_NONE))
451 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
460 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
452 (False, None, -5, cborutil.SPECIAL_NONE))
461 (False, None, -5, cborutil.SPECIAL_NONE))
453 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
462 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
454 (False, None, -4, cborutil.SPECIAL_NONE))
463 (False, None, -4, cborutil.SPECIAL_NONE))
455 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
464 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
456 (False, None, -3, cborutil.SPECIAL_NONE))
465 (False, None, -3, cborutil.SPECIAL_NONE))
457 self.assertEqual(cborutil.decodeitem(encoded[0:7]),
466 self.assertEqual(cborutil.decodeitem(encoded[0:7]),
458 (False, None, -2, cborutil.SPECIAL_NONE))
467 (False, None, -2, cborutil.SPECIAL_NONE))
459 self.assertEqual(cborutil.decodeitem(encoded[0:8]),
468 self.assertEqual(cborutil.decodeitem(encoded[0:8]),
460 (False, None, -1, cborutil.SPECIAL_NONE))
469 (False, None, -1, cborutil.SPECIAL_NONE))
461 self.assertEqual(cborutil.decodeitem(encoded[0:9]),
470 self.assertEqual(cborutil.decodeitem(encoded[0:9]),
462 (True, 2**32, 9, cborutil.SPECIAL_NONE))
471 (True, 2**32, 9, cborutil.SPECIAL_NONE))
463
472
464 with self.assertRaisesRegex(
473 with self.assertRaisesRegex(
465 cborutil.CBORDecodeError, 'input data not fully consumed'):
474 cborutil.CBORDecodeError, 'input data not fully consumed'):
466 cborutil.decodeall(encoded[0:1])
475 cborutil.decodeall(encoded[0:1])
467
476
468 with self.assertRaisesRegex(
477 with self.assertRaisesRegex(
469 cborutil.CBORDecodeError, 'input data not fully consumed'):
478 cborutil.CBORDecodeError, 'input data not fully consumed'):
470 cborutil.decodeall(encoded[0:2])
479 cborutil.decodeall(encoded[0:2])
471
480
472 def testdecodepartiallonglong(self):
481 def testdecodepartiallonglong(self):
473 encoded = b''.join(cborutil.streamencode(-7000000000))
482 encoded = b''.join(cborutil.streamencode(-7000000000))
474
483
475 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
484 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
476 (False, None, -8, cborutil.SPECIAL_NONE))
485 (False, None, -8, cborutil.SPECIAL_NONE))
477 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
486 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
478 (False, None, -7, cborutil.SPECIAL_NONE))
487 (False, None, -7, cborutil.SPECIAL_NONE))
479 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
488 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
480 (False, None, -6, cborutil.SPECIAL_NONE))
489 (False, None, -6, cborutil.SPECIAL_NONE))
481 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
490 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
482 (False, None, -5, cborutil.SPECIAL_NONE))
491 (False, None, -5, cborutil.SPECIAL_NONE))
483 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
492 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
484 (False, None, -4, cborutil.SPECIAL_NONE))
493 (False, None, -4, cborutil.SPECIAL_NONE))
485 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
494 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
486 (False, None, -3, cborutil.SPECIAL_NONE))
495 (False, None, -3, cborutil.SPECIAL_NONE))
487 self.assertEqual(cborutil.decodeitem(encoded[0:7]),
496 self.assertEqual(cborutil.decodeitem(encoded[0:7]),
488 (False, None, -2, cborutil.SPECIAL_NONE))
497 (False, None, -2, cborutil.SPECIAL_NONE))
489 self.assertEqual(cborutil.decodeitem(encoded[0:8]),
498 self.assertEqual(cborutil.decodeitem(encoded[0:8]),
490 (False, None, -1, cborutil.SPECIAL_NONE))
499 (False, None, -1, cborutil.SPECIAL_NONE))
491 self.assertEqual(cborutil.decodeitem(encoded[0:9]),
500 self.assertEqual(cborutil.decodeitem(encoded[0:9]),
492 (True, -7000000000, 9, cborutil.SPECIAL_NONE))
501 (True, -7000000000, 9, cborutil.SPECIAL_NONE))
493
502
494 class ArrayTests(TestCase):
503 class ArrayTests(TestCase):
495 def testempty(self):
504 def testempty(self):
496 self.assertEqual(list(cborutil.streamencode([])), [b'\x80'])
505 self.assertEqual(list(cborutil.streamencode([])), [b'\x80'])
497 self.assertEqual(loadit(cborutil.streamencode([])), [])
506 self.assertEqual(loadit(cborutil.streamencode([])), [])
498
507
499 self.assertEqual(cborutil.decodeall(b'\x80'), [[]])
508 self.assertEqual(cborutil.decodeall(b'\x80'), [[]])
500
509
501 def testbasic(self):
510 def testbasic(self):
502 source = [b'foo', b'bar', 1, -10]
511 source = [b'foo', b'bar', 1, -10]
503
512
504 chunks = [
513 chunks = [
505 b'\x84', b'\x43', b'foo', b'\x43', b'bar', b'\x01', b'\x29']
514 b'\x84', b'\x43', b'foo', b'\x43', b'bar', b'\x01', b'\x29']
506
515
507 self.assertEqual(list(cborutil.streamencode(source)), chunks)
516 self.assertEqual(list(cborutil.streamencode(source)), chunks)
508
517
509 self.assertEqual(cborutil.decodeall(b''.join(chunks)), [source])
518 self.assertEqual(cborutil.decodeall(b''.join(chunks)), [source])
510
519
511 def testemptyfromiter(self):
520 def testemptyfromiter(self):
512 self.assertEqual(b''.join(cborutil.streamencodearrayfromiter([])),
521 self.assertEqual(b''.join(cborutil.streamencodearrayfromiter([])),
513 b'\x9f\xff')
522 b'\x9f\xff')
514
523
515 with self.assertRaisesRegex(cborutil.CBORDecodeError,
524 with self.assertRaisesRegex(cborutil.CBORDecodeError,
516 'indefinite length uint not allowed'):
525 'indefinite length uint not allowed'):
517 cborutil.decodeall(b'\x9f\xff')
526 cborutil.decodeall(b'\x9f\xff')
518
527
519 def testfromiter1(self):
528 def testfromiter1(self):
520 source = [b'foo']
529 source = [b'foo']
521
530
522 self.assertEqual(list(cborutil.streamencodearrayfromiter(source)), [
531 self.assertEqual(list(cborutil.streamencodearrayfromiter(source)), [
523 b'\x9f',
532 b'\x9f',
524 b'\x43', b'foo',
533 b'\x43', b'foo',
525 b'\xff',
534 b'\xff',
526 ])
535 ])
527
536
528 dest = b''.join(cborutil.streamencodearrayfromiter(source))
537 dest = b''.join(cborutil.streamencodearrayfromiter(source))
529 self.assertEqual(cbor.loads(dest), source)
538 self.assertEqual(cbor.loads(dest), source)
530
539
531 with self.assertRaisesRegex(cborutil.CBORDecodeError,
540 with self.assertRaisesRegex(cborutil.CBORDecodeError,
532 'indefinite length uint not allowed'):
541 'indefinite length uint not allowed'):
533 cborutil.decodeall(dest)
542 cborutil.decodeall(dest)
534
543
535 def testtuple(self):
544 def testtuple(self):
536 source = (b'foo', None, 42)
545 source = (b'foo', None, 42)
537 encoded = b''.join(cborutil.streamencode(source))
546 encoded = b''.join(cborutil.streamencode(source))
538
547
539 self.assertEqual(cbor.loads(encoded), list(source))
548 self.assertEqual(cbor.loads(encoded), list(source))
540
549
541 self.assertEqual(cborutil.decodeall(encoded), [list(source)])
550 self.assertEqual(cborutil.decodeall(encoded), [list(source)])
542
551
543 def testpartialdecode(self):
552 def testpartialdecode(self):
544 source = list(range(4))
553 source = list(range(4))
545 encoded = b''.join(cborutil.streamencode(source))
554 encoded = b''.join(cborutil.streamencode(source))
546 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
555 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
547 (True, 4, 1, cborutil.SPECIAL_START_ARRAY))
556 (True, 4, 1, cborutil.SPECIAL_START_ARRAY))
548 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
557 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
549 (True, 4, 1, cborutil.SPECIAL_START_ARRAY))
558 (True, 4, 1, cborutil.SPECIAL_START_ARRAY))
550
559
551 source = list(range(23))
560 source = list(range(23))
552 encoded = b''.join(cborutil.streamencode(source))
561 encoded = b''.join(cborutil.streamencode(source))
553 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
562 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
554 (True, 23, 1, cborutil.SPECIAL_START_ARRAY))
563 (True, 23, 1, cborutil.SPECIAL_START_ARRAY))
555 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
564 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
556 (True, 23, 1, cborutil.SPECIAL_START_ARRAY))
565 (True, 23, 1, cborutil.SPECIAL_START_ARRAY))
557
566
558 source = list(range(24))
567 source = list(range(24))
559 encoded = b''.join(cborutil.streamencode(source))
568 encoded = b''.join(cborutil.streamencode(source))
560 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
569 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
561 (False, None, -1, cborutil.SPECIAL_NONE))
570 (False, None, -1, cborutil.SPECIAL_NONE))
562 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
571 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
563 (True, 24, 2, cborutil.SPECIAL_START_ARRAY))
572 (True, 24, 2, cborutil.SPECIAL_START_ARRAY))
564 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
573 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
565 (True, 24, 2, cborutil.SPECIAL_START_ARRAY))
574 (True, 24, 2, cborutil.SPECIAL_START_ARRAY))
566
575
567 source = list(range(256))
576 source = list(range(256))
568 encoded = b''.join(cborutil.streamencode(source))
577 encoded = b''.join(cborutil.streamencode(source))
569 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
578 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
570 (False, None, -2, cborutil.SPECIAL_NONE))
579 (False, None, -2, cborutil.SPECIAL_NONE))
571 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
580 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
572 (False, None, -1, cborutil.SPECIAL_NONE))
581 (False, None, -1, cborutil.SPECIAL_NONE))
573 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
582 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
574 (True, 256, 3, cborutil.SPECIAL_START_ARRAY))
583 (True, 256, 3, cborutil.SPECIAL_START_ARRAY))
575 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
584 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
576 (True, 256, 3, cborutil.SPECIAL_START_ARRAY))
585 (True, 256, 3, cborutil.SPECIAL_START_ARRAY))
577
586
578 def testnested(self):
587 def testnested(self):
579 source = [[], [], [[], [], []]]
588 source = [[], [], [[], [], []]]
580 encoded = b''.join(cborutil.streamencode(source))
589 encoded = b''.join(cborutil.streamencode(source))
581 self.assertEqual(cborutil.decodeall(encoded), [source])
590 self.assertEqual(cborutil.decodeall(encoded), [source])
582
591
583 source = [True, None, [True, 0, 2], [None], [], [[[]], -87]]
592 source = [True, None, [True, 0, 2], [None], [], [[[]], -87]]
584 encoded = b''.join(cborutil.streamencode(source))
593 encoded = b''.join(cborutil.streamencode(source))
585 self.assertEqual(cborutil.decodeall(encoded), [source])
594 self.assertEqual(cborutil.decodeall(encoded), [source])
586
595
587 # A set within an array.
596 # A set within an array.
588 source = [None, {b'foo', b'bar', None, False}, set()]
597 source = [None, {b'foo', b'bar', None, False}, set()]
589 encoded = b''.join(cborutil.streamencode(source))
598 encoded = b''.join(cborutil.streamencode(source))
590 self.assertEqual(cborutil.decodeall(encoded), [source])
599 self.assertEqual(cborutil.decodeall(encoded), [source])
591
600
592 # A map within an array.
601 # A map within an array.
593 source = [None, {}, {b'foo': b'bar', True: False}, [{}]]
602 source = [None, {}, {b'foo': b'bar', True: False}, [{}]]
594 encoded = b''.join(cborutil.streamencode(source))
603 encoded = b''.join(cborutil.streamencode(source))
595 self.assertEqual(cborutil.decodeall(encoded), [source])
604 self.assertEqual(cborutil.decodeall(encoded), [source])
596
605
597 def testindefinitebytestringvalues(self):
606 def testindefinitebytestringvalues(self):
598 # Single value array whose value is an empty indefinite bytestring.
607 # Single value array whose value is an empty indefinite bytestring.
599 encoded = b'\x81\x5f\x40\xff'
608 encoded = b'\x81\x5f\x40\xff'
600
609
601 with self.assertRaisesRegex(cborutil.CBORDecodeError,
610 with self.assertRaisesRegex(cborutil.CBORDecodeError,
602 'indefinite length bytestrings not '
611 'indefinite length bytestrings not '
603 'allowed as array values'):
612 'allowed as array values'):
604 cborutil.decodeall(encoded)
613 cborutil.decodeall(encoded)
605
614
606 class SetTests(TestCase):
615 class SetTests(TestCase):
607 def testempty(self):
616 def testempty(self):
608 self.assertEqual(list(cborutil.streamencode(set())), [
617 self.assertEqual(list(cborutil.streamencode(set())), [
609 b'\xd9\x01\x02',
618 b'\xd9\x01\x02',
610 b'\x80',
619 b'\x80',
611 ])
620 ])
612
621
613 self.assertEqual(cborutil.decodeall(b'\xd9\x01\x02\x80'), [set()])
622 self.assertEqual(cborutil.decodeall(b'\xd9\x01\x02\x80'), [set()])
614
623
615 def testset(self):
624 def testset(self):
616 source = {b'foo', None, 42}
625 source = {b'foo', None, 42}
617 encoded = b''.join(cborutil.streamencode(source))
626 encoded = b''.join(cborutil.streamencode(source))
618
627
619 self.assertEqual(cbor.loads(encoded), source)
628 self.assertEqual(cbor.loads(encoded), source)
620
629
621 self.assertEqual(cborutil.decodeall(encoded), [source])
630 self.assertEqual(cborutil.decodeall(encoded), [source])
622
631
623 def testinvalidtag(self):
632 def testinvalidtag(self):
624 # Must use array to encode sets.
633 # Must use array to encode sets.
625 encoded = b'\xd9\x01\x02\xa0'
634 encoded = b'\xd9\x01\x02\xa0'
626
635
627 with self.assertRaisesRegex(cborutil.CBORDecodeError,
636 with self.assertRaisesRegex(cborutil.CBORDecodeError,
628 'expected array after finite set '
637 'expected array after finite set '
629 'semantic tag'):
638 'semantic tag'):
630 cborutil.decodeall(encoded)
639 cborutil.decodeall(encoded)
631
640
632 def testpartialdecode(self):
641 def testpartialdecode(self):
633 # Semantic tag item will be 3 bytes. Set header will be variable
642 # Semantic tag item will be 3 bytes. Set header will be variable
634 # depending on length.
643 # depending on length.
635 encoded = b''.join(cborutil.streamencode({i for i in range(23)}))
644 encoded = b''.join(cborutil.streamencode({i for i in range(23)}))
636 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
645 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
637 (False, None, -2, cborutil.SPECIAL_NONE))
646 (False, None, -2, cborutil.SPECIAL_NONE))
638 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
647 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
639 (False, None, -1, cborutil.SPECIAL_NONE))
648 (False, None, -1, cborutil.SPECIAL_NONE))
640 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
649 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
641 (False, None, -1, cborutil.SPECIAL_NONE))
650 (False, None, -1, cborutil.SPECIAL_NONE))
642 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
651 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
643 (True, 23, 4, cborutil.SPECIAL_START_SET))
652 (True, 23, 4, cborutil.SPECIAL_START_SET))
644 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
653 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
645 (True, 23, 4, cborutil.SPECIAL_START_SET))
654 (True, 23, 4, cborutil.SPECIAL_START_SET))
646
655
647 encoded = b''.join(cborutil.streamencode({i for i in range(24)}))
656 encoded = b''.join(cborutil.streamencode({i for i in range(24)}))
648 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
657 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
649 (False, None, -2, cborutil.SPECIAL_NONE))
658 (False, None, -2, cborutil.SPECIAL_NONE))
650 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
659 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
651 (False, None, -1, cborutil.SPECIAL_NONE))
660 (False, None, -1, cborutil.SPECIAL_NONE))
652 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
661 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
653 (False, None, -1, cborutil.SPECIAL_NONE))
662 (False, None, -1, cborutil.SPECIAL_NONE))
654 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
663 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
655 (False, None, -1, cborutil.SPECIAL_NONE))
664 (False, None, -1, cborutil.SPECIAL_NONE))
656 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
665 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
657 (True, 24, 5, cborutil.SPECIAL_START_SET))
666 (True, 24, 5, cborutil.SPECIAL_START_SET))
658 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
667 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
659 (True, 24, 5, cborutil.SPECIAL_START_SET))
668 (True, 24, 5, cborutil.SPECIAL_START_SET))
660
669
661 encoded = b''.join(cborutil.streamencode({i for i in range(256)}))
670 encoded = b''.join(cborutil.streamencode({i for i in range(256)}))
662 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
671 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
663 (False, None, -2, cborutil.SPECIAL_NONE))
672 (False, None, -2, cborutil.SPECIAL_NONE))
664 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
673 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
665 (False, None, -1, cborutil.SPECIAL_NONE))
674 (False, None, -1, cborutil.SPECIAL_NONE))
666 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
675 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
667 (False, None, -1, cborutil.SPECIAL_NONE))
676 (False, None, -1, cborutil.SPECIAL_NONE))
668 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
677 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
669 (False, None, -2, cborutil.SPECIAL_NONE))
678 (False, None, -2, cborutil.SPECIAL_NONE))
670 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
679 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
671 (False, None, -1, cborutil.SPECIAL_NONE))
680 (False, None, -1, cborutil.SPECIAL_NONE))
672 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
681 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
673 (True, 256, 6, cborutil.SPECIAL_START_SET))
682 (True, 256, 6, cborutil.SPECIAL_START_SET))
674
683
675 def testinvalidvalue(self):
684 def testinvalidvalue(self):
676 encoded = b''.join([
685 encoded = b''.join([
677 b'\xd9\x01\x02', # semantic tag
686 b'\xd9\x01\x02', # semantic tag
678 b'\x81', # array of size 1
687 b'\x81', # array of size 1
679 b'\x5f\x43foo\xff', # indefinite length bytestring "foo"
688 b'\x5f\x43foo\xff', # indefinite length bytestring "foo"
680 ])
689 ])
681
690
682 with self.assertRaisesRegex(cborutil.CBORDecodeError,
691 with self.assertRaisesRegex(cborutil.CBORDecodeError,
683 'indefinite length bytestrings not '
692 'indefinite length bytestrings not '
684 'allowed as set values'):
693 'allowed as set values'):
685 cborutil.decodeall(encoded)
694 cborutil.decodeall(encoded)
686
695
687 encoded = b''.join([
696 encoded = b''.join([
688 b'\xd9\x01\x02',
697 b'\xd9\x01\x02',
689 b'\x81',
698 b'\x81',
690 b'\x80', # empty array
699 b'\x80', # empty array
691 ])
700 ])
692
701
693 with self.assertRaisesRegex(cborutil.CBORDecodeError,
702 with self.assertRaisesRegex(cborutil.CBORDecodeError,
694 'collections not allowed as set values'):
703 'collections not allowed as set values'):
695 cborutil.decodeall(encoded)
704 cborutil.decodeall(encoded)
696
705
697 encoded = b''.join([
706 encoded = b''.join([
698 b'\xd9\x01\x02',
707 b'\xd9\x01\x02',
699 b'\x81',
708 b'\x81',
700 b'\xa0', # empty map
709 b'\xa0', # empty map
701 ])
710 ])
702
711
703 with self.assertRaisesRegex(cborutil.CBORDecodeError,
712 with self.assertRaisesRegex(cborutil.CBORDecodeError,
704 'collections not allowed as set values'):
713 'collections not allowed as set values'):
705 cborutil.decodeall(encoded)
714 cborutil.decodeall(encoded)
706
715
707 encoded = b''.join([
716 encoded = b''.join([
708 b'\xd9\x01\x02',
717 b'\xd9\x01\x02',
709 b'\x81',
718 b'\x81',
710 b'\xd9\x01\x02\x81\x01', # set with integer 1
719 b'\xd9\x01\x02\x81\x01', # set with integer 1
711 ])
720 ])
712
721
713 with self.assertRaisesRegex(cborutil.CBORDecodeError,
722 with self.assertRaisesRegex(cborutil.CBORDecodeError,
714 'collections not allowed as set values'):
723 'collections not allowed as set values'):
715 cborutil.decodeall(encoded)
724 cborutil.decodeall(encoded)
716
725
717 class BoolTests(TestCase):
726 class BoolTests(TestCase):
718 def testbasic(self):
727 def testbasic(self):
719 self.assertEqual(list(cborutil.streamencode(True)), [b'\xf5'])
728 self.assertEqual(list(cborutil.streamencode(True)), [b'\xf5'])
720 self.assertEqual(list(cborutil.streamencode(False)), [b'\xf4'])
729 self.assertEqual(list(cborutil.streamencode(False)), [b'\xf4'])
721
730
722 self.assertIs(loadit(cborutil.streamencode(True)), True)
731 self.assertIs(loadit(cborutil.streamencode(True)), True)
723 self.assertIs(loadit(cborutil.streamencode(False)), False)
732 self.assertIs(loadit(cborutil.streamencode(False)), False)
724
733
725 self.assertEqual(cborutil.decodeall(b'\xf4'), [False])
734 self.assertEqual(cborutil.decodeall(b'\xf4'), [False])
726 self.assertEqual(cborutil.decodeall(b'\xf5'), [True])
735 self.assertEqual(cborutil.decodeall(b'\xf5'), [True])
727
736
728 self.assertEqual(cborutil.decodeall(b'\xf4\xf5\xf5\xf4'),
737 self.assertEqual(cborutil.decodeall(b'\xf4\xf5\xf5\xf4'),
729 [False, True, True, False])
738 [False, True, True, False])
730
739
731 class NoneTests(TestCase):
740 class NoneTests(TestCase):
732 def testbasic(self):
741 def testbasic(self):
733 self.assertEqual(list(cborutil.streamencode(None)), [b'\xf6'])
742 self.assertEqual(list(cborutil.streamencode(None)), [b'\xf6'])
734
743
735 self.assertIs(loadit(cborutil.streamencode(None)), None)
744 self.assertIs(loadit(cborutil.streamencode(None)), None)
736
745
737 self.assertEqual(cborutil.decodeall(b'\xf6'), [None])
746 self.assertEqual(cborutil.decodeall(b'\xf6'), [None])
738 self.assertEqual(cborutil.decodeall(b'\xf6\xf6'), [None, None])
747 self.assertEqual(cborutil.decodeall(b'\xf6\xf6'), [None, None])
739
748
740 class MapTests(TestCase):
749 class MapTests(TestCase):
741 def testempty(self):
750 def testempty(self):
742 self.assertEqual(list(cborutil.streamencode({})), [b'\xa0'])
751 self.assertEqual(list(cborutil.streamencode({})), [b'\xa0'])
743 self.assertEqual(loadit(cborutil.streamencode({})), {})
752 self.assertEqual(loadit(cborutil.streamencode({})), {})
744
753
745 self.assertEqual(cborutil.decodeall(b'\xa0'), [{}])
754 self.assertEqual(cborutil.decodeall(b'\xa0'), [{}])
746
755
747 def testemptyindefinite(self):
756 def testemptyindefinite(self):
748 self.assertEqual(list(cborutil.streamencodemapfromiter([])), [
757 self.assertEqual(list(cborutil.streamencodemapfromiter([])), [
749 b'\xbf', b'\xff'])
758 b'\xbf', b'\xff'])
750
759
751 self.assertEqual(loadit(cborutil.streamencodemapfromiter([])), {})
760 self.assertEqual(loadit(cborutil.streamencodemapfromiter([])), {})
752
761
753 with self.assertRaisesRegex(cborutil.CBORDecodeError,
762 with self.assertRaisesRegex(cborutil.CBORDecodeError,
754 'indefinite length uint not allowed'):
763 'indefinite length uint not allowed'):
755 cborutil.decodeall(b'\xbf\xff')
764 cborutil.decodeall(b'\xbf\xff')
756
765
757 def testone(self):
766 def testone(self):
758 source = {b'foo': b'bar'}
767 source = {b'foo': b'bar'}
759 self.assertEqual(list(cborutil.streamencode(source)), [
768 self.assertEqual(list(cborutil.streamencode(source)), [
760 b'\xa1', b'\x43', b'foo', b'\x43', b'bar'])
769 b'\xa1', b'\x43', b'foo', b'\x43', b'bar'])
761
770
762 self.assertEqual(loadit(cborutil.streamencode(source)), source)
771 self.assertEqual(loadit(cborutil.streamencode(source)), source)
763
772
764 self.assertEqual(cborutil.decodeall(b'\xa1\x43foo\x43bar'), [source])
773 self.assertEqual(cborutil.decodeall(b'\xa1\x43foo\x43bar'), [source])
765
774
766 def testmultiple(self):
775 def testmultiple(self):
767 source = {
776 source = {
768 b'foo': b'bar',
777 b'foo': b'bar',
769 b'baz': b'value1',
778 b'baz': b'value1',
770 }
779 }
771
780
772 self.assertEqual(loadit(cborutil.streamencode(source)), source)
781 self.assertEqual(loadit(cborutil.streamencode(source)), source)
773
782
774 self.assertEqual(
783 self.assertEqual(
775 loadit(cborutil.streamencodemapfromiter(source.items())),
784 loadit(cborutil.streamencodemapfromiter(source.items())),
776 source)
785 source)
777
786
778 encoded = b''.join(cborutil.streamencode(source))
787 encoded = b''.join(cborutil.streamencode(source))
779 self.assertEqual(cborutil.decodeall(encoded), [source])
788 self.assertEqual(cborutil.decodeall(encoded), [source])
780
789
781 def testcomplex(self):
790 def testcomplex(self):
782 source = {
791 source = {
783 b'key': 1,
792 b'key': 1,
784 2: -10,
793 2: -10,
785 }
794 }
786
795
787 self.assertEqual(loadit(cborutil.streamencode(source)),
796 self.assertEqual(loadit(cborutil.streamencode(source)),
788 source)
797 source)
789
798
790 self.assertEqual(
799 self.assertEqual(
791 loadit(cborutil.streamencodemapfromiter(source.items())),
800 loadit(cborutil.streamencodemapfromiter(source.items())),
792 source)
801 source)
793
802
794 encoded = b''.join(cborutil.streamencode(source))
803 encoded = b''.join(cborutil.streamencode(source))
795 self.assertEqual(cborutil.decodeall(encoded), [source])
804 self.assertEqual(cborutil.decodeall(encoded), [source])
796
805
797 def testnested(self):
806 def testnested(self):
798 source = {b'key1': None, b'key2': {b'sub1': b'sub2'}, b'sub2': {}}
807 source = {b'key1': None, b'key2': {b'sub1': b'sub2'}, b'sub2': {}}
799 encoded = b''.join(cborutil.streamencode(source))
808 encoded = b''.join(cborutil.streamencode(source))
800
809
801 self.assertEqual(cborutil.decodeall(encoded), [source])
810 self.assertEqual(cborutil.decodeall(encoded), [source])
802
811
803 source = {
812 source = {
804 b'key1': [],
813 b'key1': [],
805 b'key2': [None, False],
814 b'key2': [None, False],
806 b'key3': {b'foo', b'bar'},
815 b'key3': {b'foo', b'bar'},
807 b'key4': {},
816 b'key4': {},
808 }
817 }
809 encoded = b''.join(cborutil.streamencode(source))
818 encoded = b''.join(cborutil.streamencode(source))
810 self.assertEqual(cborutil.decodeall(encoded), [source])
819 self.assertEqual(cborutil.decodeall(encoded), [source])
811
820
812 def testillegalkey(self):
821 def testillegalkey(self):
813 encoded = b''.join([
822 encoded = b''.join([
814 # map header + len 1
823 # map header + len 1
815 b'\xa1',
824 b'\xa1',
816 # indefinite length bytestring "foo" in key position
825 # indefinite length bytestring "foo" in key position
817 b'\x5f\x03foo\xff'
826 b'\x5f\x03foo\xff'
818 ])
827 ])
819
828
820 with self.assertRaisesRegex(cborutil.CBORDecodeError,
829 with self.assertRaisesRegex(cborutil.CBORDecodeError,
821 'indefinite length bytestrings not '
830 'indefinite length bytestrings not '
822 'allowed as map keys'):
831 'allowed as map keys'):
823 cborutil.decodeall(encoded)
832 cborutil.decodeall(encoded)
824
833
825 encoded = b''.join([
834 encoded = b''.join([
826 b'\xa1',
835 b'\xa1',
827 b'\x80', # empty array
836 b'\x80', # empty array
828 b'\x43foo',
837 b'\x43foo',
829 ])
838 ])
830
839
831 with self.assertRaisesRegex(cborutil.CBORDecodeError,
840 with self.assertRaisesRegex(cborutil.CBORDecodeError,
832 'collections not supported as map keys'):
841 'collections not supported as map keys'):
833 cborutil.decodeall(encoded)
842 cborutil.decodeall(encoded)
834
843
835 def testillegalvalue(self):
844 def testillegalvalue(self):
836 encoded = b''.join([
845 encoded = b''.join([
837 b'\xa1', # map headers
846 b'\xa1', # map headers
838 b'\x43foo', # key
847 b'\x43foo', # key
839 b'\x5f\x03bar\xff', # indefinite length value
848 b'\x5f\x03bar\xff', # indefinite length value
840 ])
849 ])
841
850
842 with self.assertRaisesRegex(cborutil.CBORDecodeError,
851 with self.assertRaisesRegex(cborutil.CBORDecodeError,
843 'indefinite length bytestrings not '
852 'indefinite length bytestrings not '
844 'allowed as map values'):
853 'allowed as map values'):
845 cborutil.decodeall(encoded)
854 cborutil.decodeall(encoded)
846
855
847 def testpartialdecode(self):
856 def testpartialdecode(self):
848 source = {b'key1': b'value1'}
857 source = {b'key1': b'value1'}
849 encoded = b''.join(cborutil.streamencode(source))
858 encoded = b''.join(cborutil.streamencode(source))
850
859
851 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
860 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
852 (True, 1, 1, cborutil.SPECIAL_START_MAP))
861 (True, 1, 1, cborutil.SPECIAL_START_MAP))
853 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
862 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
854 (True, 1, 1, cborutil.SPECIAL_START_MAP))
863 (True, 1, 1, cborutil.SPECIAL_START_MAP))
855
864
856 source = {b'key%d' % i: None for i in range(23)}
865 source = {b'key%d' % i: None for i in range(23)}
857 encoded = b''.join(cborutil.streamencode(source))
866 encoded = b''.join(cborutil.streamencode(source))
858 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
867 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
859 (True, 23, 1, cborutil.SPECIAL_START_MAP))
868 (True, 23, 1, cborutil.SPECIAL_START_MAP))
860
869
861 source = {b'key%d' % i: None for i in range(24)}
870 source = {b'key%d' % i: None for i in range(24)}
862 encoded = b''.join(cborutil.streamencode(source))
871 encoded = b''.join(cborutil.streamencode(source))
863 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
872 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
864 (False, None, -1, cborutil.SPECIAL_NONE))
873 (False, None, -1, cborutil.SPECIAL_NONE))
865 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
874 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
866 (True, 24, 2, cborutil.SPECIAL_START_MAP))
875 (True, 24, 2, cborutil.SPECIAL_START_MAP))
867 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
876 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
868 (True, 24, 2, cborutil.SPECIAL_START_MAP))
877 (True, 24, 2, cborutil.SPECIAL_START_MAP))
869
878
870 source = {b'key%d' % i: None for i in range(256)}
879 source = {b'key%d' % i: None for i in range(256)}
871 encoded = b''.join(cborutil.streamencode(source))
880 encoded = b''.join(cborutil.streamencode(source))
872 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
881 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
873 (False, None, -2, cborutil.SPECIAL_NONE))
882 (False, None, -2, cborutil.SPECIAL_NONE))
874 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
883 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
875 (False, None, -1, cborutil.SPECIAL_NONE))
884 (False, None, -1, cborutil.SPECIAL_NONE))
876 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
885 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
877 (True, 256, 3, cborutil.SPECIAL_START_MAP))
886 (True, 256, 3, cborutil.SPECIAL_START_MAP))
878 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
887 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
879 (True, 256, 3, cborutil.SPECIAL_START_MAP))
888 (True, 256, 3, cborutil.SPECIAL_START_MAP))
880
889
881 source = {b'key%d' % i: None for i in range(65536)}
890 source = {b'key%d' % i: None for i in range(65536)}
882 encoded = b''.join(cborutil.streamencode(source))
891 encoded = b''.join(cborutil.streamencode(source))
883 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
892 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
884 (False, None, -4, cborutil.SPECIAL_NONE))
893 (False, None, -4, cborutil.SPECIAL_NONE))
885 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
894 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
886 (False, None, -3, cborutil.SPECIAL_NONE))
895 (False, None, -3, cborutil.SPECIAL_NONE))
887 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
896 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
888 (False, None, -2, cborutil.SPECIAL_NONE))
897 (False, None, -2, cborutil.SPECIAL_NONE))
889 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
898 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
890 (False, None, -1, cborutil.SPECIAL_NONE))
899 (False, None, -1, cborutil.SPECIAL_NONE))
891 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
900 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
892 (True, 65536, 5, cborutil.SPECIAL_START_MAP))
901 (True, 65536, 5, cborutil.SPECIAL_START_MAP))
893 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
902 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
894 (True, 65536, 5, cborutil.SPECIAL_START_MAP))
903 (True, 65536, 5, cborutil.SPECIAL_START_MAP))
895
904
896 class SemanticTagTests(TestCase):
905 class SemanticTagTests(TestCase):
897 def testdecodeforbidden(self):
906 def testdecodeforbidden(self):
898 for i in range(500):
907 for i in range(500):
899 if i == cborutil.SEMANTIC_TAG_FINITE_SET:
908 if i == cborutil.SEMANTIC_TAG_FINITE_SET:
900 continue
909 continue
901
910
902 tag = cborutil.encodelength(cborutil.MAJOR_TYPE_SEMANTIC,
911 tag = cborutil.encodelength(cborutil.MAJOR_TYPE_SEMANTIC,
903 i)
912 i)
904
913
905 encoded = tag + cborutil.encodelength(cborutil.MAJOR_TYPE_UINT, 42)
914 encoded = tag + cborutil.encodelength(cborutil.MAJOR_TYPE_UINT, 42)
906
915
907 # Partial decode is incomplete.
916 # Partial decode is incomplete.
908 if i < 24:
917 if i < 24:
909 pass
918 pass
910 elif i < 256:
919 elif i < 256:
911 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
920 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
912 (False, None, -1, cborutil.SPECIAL_NONE))
921 (False, None, -1, cborutil.SPECIAL_NONE))
913 elif i < 65536:
922 elif i < 65536:
914 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
923 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
915 (False, None, -2, cborutil.SPECIAL_NONE))
924 (False, None, -2, cborutil.SPECIAL_NONE))
916 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
925 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
917 (False, None, -1, cborutil.SPECIAL_NONE))
926 (False, None, -1, cborutil.SPECIAL_NONE))
918
927
919 with self.assertRaisesRegex(cborutil.CBORDecodeError,
928 with self.assertRaisesRegex(cborutil.CBORDecodeError,
920 'semantic tag \d+ not allowed'):
929 'semantic tag \d+ not allowed'):
921 cborutil.decodeitem(encoded)
930 cborutil.decodeitem(encoded)
922
931
923 class SpecialTypesTests(TestCase):
932 class SpecialTypesTests(TestCase):
924 def testforbiddentypes(self):
933 def testforbiddentypes(self):
925 for i in range(256):
934 for i in range(256):
926 if i == cborutil.SUBTYPE_FALSE:
935 if i == cborutil.SUBTYPE_FALSE:
927 continue
936 continue
928 elif i == cborutil.SUBTYPE_TRUE:
937 elif i == cborutil.SUBTYPE_TRUE:
929 continue
938 continue
930 elif i == cborutil.SUBTYPE_NULL:
939 elif i == cborutil.SUBTYPE_NULL:
931 continue
940 continue
932
941
933 encoded = cborutil.encodelength(cborutil.MAJOR_TYPE_SPECIAL, i)
942 encoded = cborutil.encodelength(cborutil.MAJOR_TYPE_SPECIAL, i)
934
943
935 with self.assertRaisesRegex(cborutil.CBORDecodeError,
944 with self.assertRaisesRegex(cborutil.CBORDecodeError,
936 'special type \d+ not allowed'):
945 'special type \d+ not allowed'):
937 cborutil.decodeitem(encoded)
946 cborutil.decodeitem(encoded)
938
947
939 class SansIODecoderTests(TestCase):
948 class SansIODecoderTests(TestCase):
940 def testemptyinput(self):
949 def testemptyinput(self):
941 decoder = cborutil.sansiodecoder()
950 decoder = cborutil.sansiodecoder()
942 self.assertEqual(decoder.decode(b''), (False, 0, 0))
951 self.assertEqual(decoder.decode(b''), (False, 0, 0))
943
952
944 class BufferingDecoderTests(TestCase):
953 class BufferingDecoderTests(TestCase):
945 def testsimple(self):
954 def testsimple(self):
946 source = [
955 source = [
947 b'foobar',
956 b'foobar',
948 b'x' * 128,
957 b'x' * 128,
949 {b'foo': b'bar'},
958 {b'foo': b'bar'},
950 True,
959 True,
951 False,
960 False,
952 None,
961 None,
953 [None for i in range(128)],
962 [None for i in range(128)],
954 ]
963 ]
955
964
956 encoded = b''.join(cborutil.streamencode(source))
965 encoded = b''.join(cborutil.streamencode(source))
957
966
958 for step in range(1, 32):
967 for step in range(1, 32):
959 decoder = cborutil.bufferingdecoder()
968 decoder = cborutil.bufferingdecoder()
960 start = 0
969 start = 0
961
970
962 while start < len(encoded):
971 while start < len(encoded):
963 decoder.decode(encoded[start:start + step])
972 decoder.decode(encoded[start:start + step])
964 start += step
973 start += step
965
974
966 self.assertEqual(decoder.getavailable(), [source])
975 self.assertEqual(decoder.getavailable(), [source])
967
976
968 def testbytearray(self):
977 def testbytearray(self):
969 source = b''.join(cborutil.streamencode(b'foobar'))
978 source = b''.join(cborutil.streamencode(b'foobar'))
970
979
971 decoder = cborutil.bufferingdecoder()
980 decoder = cborutil.bufferingdecoder()
972 decoder.decode(bytearray(source))
981 decoder.decode(bytearray(source))
973
982
974 self.assertEqual(decoder.getavailable(), [b'foobar'])
983 self.assertEqual(decoder.getavailable(), [b'foobar'])
975
984
976 class DecodeallTests(TestCase):
985 class DecodeallTests(TestCase):
977 def testemptyinput(self):
986 def testemptyinput(self):
978 self.assertEqual(cborutil.decodeall(b''), [])
987 self.assertEqual(cborutil.decodeall(b''), [])
979
988
980 def testpartialinput(self):
989 def testpartialinput(self):
981 encoded = b''.join([
990 encoded = b''.join([
982 b'\x82', # array of 2 elements
991 b'\x82', # array of 2 elements
983 b'\x01', # integer 1
992 b'\x01', # integer 1
984 ])
993 ])
985
994
986 with self.assertRaisesRegex(cborutil.CBORDecodeError,
995 with self.assertRaisesRegex(cborutil.CBORDecodeError,
987 'input data not complete'):
996 'input data not complete'):
988 cborutil.decodeall(encoded)
997 cborutil.decodeall(encoded)
989
998
990 if __name__ == '__main__':
999 if __name__ == '__main__':
991 import silenttestrunner
1000 import silenttestrunner
992 silenttestrunner.main(__name__)
1001 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now