##// END OF EJS Templates
tests: avoid using a list comprehension to fill a list with fixed values...
Matt Harbison -
r44448:6dbb18e1 default
parent child Browse files
Show More
@@ -1,1275 +1,1275 b''
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import os
3 import os
4 import sys
4 import sys
5 import unittest
5 import unittest
6
6
7 # TODO migrate to canned cbor test strings and stop using thirdparty.cbor
7 # TODO migrate to canned cbor test strings and stop using thirdparty.cbor
8 tpp = os.path.normpath(
8 tpp = os.path.normpath(
9 os.path.join(os.path.dirname(__file__), '..', 'mercurial', 'thirdparty')
9 os.path.join(os.path.dirname(__file__), '..', 'mercurial', 'thirdparty')
10 )
10 )
11 if not os.path.exists(tpp):
11 if not os.path.exists(tpp):
12 # skip, not in a repo
12 # skip, not in a repo
13 sys.exit(80)
13 sys.exit(80)
14 sys.path[0:0] = [tpp]
14 sys.path[0:0] = [tpp]
15 import cbor
15 import cbor
16
16
17 del sys.path[0]
17 del sys.path[0]
18
18
19 from mercurial.utils import cborutil
19 from mercurial.utils import cborutil
20
20
21
21
22 class TestCase(unittest.TestCase):
22 class TestCase(unittest.TestCase):
23 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
23 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
24 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
24 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
25 # the regex version.
25 # the regex version.
26 assertRaisesRegex = ( # camelcase-required
26 assertRaisesRegex = ( # camelcase-required
27 unittest.TestCase.assertRaisesRegexp
27 unittest.TestCase.assertRaisesRegexp
28 )
28 )
29
29
30
30
31 def loadit(it):
31 def loadit(it):
32 return cbor.loads(b''.join(it))
32 return cbor.loads(b''.join(it))
33
33
34
34
35 class BytestringTests(TestCase):
35 class BytestringTests(TestCase):
36 def testsimple(self):
36 def testsimple(self):
37 self.assertEqual(
37 self.assertEqual(
38 list(cborutil.streamencode(b'foobar')), [b'\x46', b'foobar']
38 list(cborutil.streamencode(b'foobar')), [b'\x46', b'foobar']
39 )
39 )
40
40
41 self.assertEqual(loadit(cborutil.streamencode(b'foobar')), b'foobar')
41 self.assertEqual(loadit(cborutil.streamencode(b'foobar')), b'foobar')
42
42
43 self.assertEqual(cborutil.decodeall(b'\x46foobar'), [b'foobar'])
43 self.assertEqual(cborutil.decodeall(b'\x46foobar'), [b'foobar'])
44
44
45 self.assertEqual(
45 self.assertEqual(
46 cborutil.decodeall(b'\x46foobar\x45fizbi'), [b'foobar', b'fizbi']
46 cborutil.decodeall(b'\x46foobar\x45fizbi'), [b'foobar', b'fizbi']
47 )
47 )
48
48
49 def testlong(self):
49 def testlong(self):
50 source = b'x' * 1048576
50 source = b'x' * 1048576
51
51
52 self.assertEqual(loadit(cborutil.streamencode(source)), source)
52 self.assertEqual(loadit(cborutil.streamencode(source)), source)
53
53
54 encoded = b''.join(cborutil.streamencode(source))
54 encoded = b''.join(cborutil.streamencode(source))
55 self.assertEqual(cborutil.decodeall(encoded), [source])
55 self.assertEqual(cborutil.decodeall(encoded), [source])
56
56
57 def testfromiter(self):
57 def testfromiter(self):
58 # This is the example from RFC 7049 Section 2.2.2.
58 # This is the example from RFC 7049 Section 2.2.2.
59 source = [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99']
59 source = [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99']
60
60
61 self.assertEqual(
61 self.assertEqual(
62 list(cborutil.streamencodebytestringfromiter(source)),
62 list(cborutil.streamencodebytestringfromiter(source)),
63 [
63 [
64 b'\x5f',
64 b'\x5f',
65 b'\x44',
65 b'\x44',
66 b'\xaa\xbb\xcc\xdd',
66 b'\xaa\xbb\xcc\xdd',
67 b'\x43',
67 b'\x43',
68 b'\xee\xff\x99',
68 b'\xee\xff\x99',
69 b'\xff',
69 b'\xff',
70 ],
70 ],
71 )
71 )
72
72
73 self.assertEqual(
73 self.assertEqual(
74 loadit(cborutil.streamencodebytestringfromiter(source)),
74 loadit(cborutil.streamencodebytestringfromiter(source)),
75 b''.join(source),
75 b''.join(source),
76 )
76 )
77
77
78 self.assertEqual(
78 self.assertEqual(
79 cborutil.decodeall(
79 cborutil.decodeall(
80 b'\x5f\x44\xaa\xbb\xcc\xdd' b'\x43\xee\xff\x99\xff'
80 b'\x5f\x44\xaa\xbb\xcc\xdd' b'\x43\xee\xff\x99\xff'
81 ),
81 ),
82 [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99', b''],
82 [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99', b''],
83 )
83 )
84
84
85 for i, chunk in enumerate(
85 for i, chunk in enumerate(
86 cborutil.decodeall(
86 cborutil.decodeall(
87 b'\x5f\x44\xaa\xbb\xcc\xdd' b'\x43\xee\xff\x99\xff'
87 b'\x5f\x44\xaa\xbb\xcc\xdd' b'\x43\xee\xff\x99\xff'
88 )
88 )
89 ):
89 ):
90 self.assertIsInstance(chunk, cborutil.bytestringchunk)
90 self.assertIsInstance(chunk, cborutil.bytestringchunk)
91
91
92 if i == 0:
92 if i == 0:
93 self.assertTrue(chunk.isfirst)
93 self.assertTrue(chunk.isfirst)
94 else:
94 else:
95 self.assertFalse(chunk.isfirst)
95 self.assertFalse(chunk.isfirst)
96
96
97 if i == 2:
97 if i == 2:
98 self.assertTrue(chunk.islast)
98 self.assertTrue(chunk.islast)
99 else:
99 else:
100 self.assertFalse(chunk.islast)
100 self.assertFalse(chunk.islast)
101
101
102 def testfromiterlarge(self):
102 def testfromiterlarge(self):
103 source = [b'a' * 16, b'b' * 128, b'c' * 1024, b'd' * 1048576]
103 source = [b'a' * 16, b'b' * 128, b'c' * 1024, b'd' * 1048576]
104
104
105 self.assertEqual(
105 self.assertEqual(
106 loadit(cborutil.streamencodebytestringfromiter(source)),
106 loadit(cborutil.streamencodebytestringfromiter(source)),
107 b''.join(source),
107 b''.join(source),
108 )
108 )
109
109
110 def testindefinite(self):
110 def testindefinite(self):
111 source = b'\x00\x01\x02\x03' + b'\xff' * 16384
111 source = b'\x00\x01\x02\x03' + b'\xff' * 16384
112
112
113 it = cborutil.streamencodeindefinitebytestring(source, chunksize=2)
113 it = cborutil.streamencodeindefinitebytestring(source, chunksize=2)
114
114
115 self.assertEqual(next(it), b'\x5f')
115 self.assertEqual(next(it), b'\x5f')
116 self.assertEqual(next(it), b'\x42')
116 self.assertEqual(next(it), b'\x42')
117 self.assertEqual(next(it), b'\x00\x01')
117 self.assertEqual(next(it), b'\x00\x01')
118 self.assertEqual(next(it), b'\x42')
118 self.assertEqual(next(it), b'\x42')
119 self.assertEqual(next(it), b'\x02\x03')
119 self.assertEqual(next(it), b'\x02\x03')
120 self.assertEqual(next(it), b'\x42')
120 self.assertEqual(next(it), b'\x42')
121 self.assertEqual(next(it), b'\xff\xff')
121 self.assertEqual(next(it), b'\xff\xff')
122
122
123 dest = b''.join(
123 dest = b''.join(
124 cborutil.streamencodeindefinitebytestring(source, chunksize=42)
124 cborutil.streamencodeindefinitebytestring(source, chunksize=42)
125 )
125 )
126 self.assertEqual(cbor.loads(dest), source)
126 self.assertEqual(cbor.loads(dest), source)
127
127
128 self.assertEqual(b''.join(cborutil.decodeall(dest)), source)
128 self.assertEqual(b''.join(cborutil.decodeall(dest)), source)
129
129
130 for chunk in cborutil.decodeall(dest):
130 for chunk in cborutil.decodeall(dest):
131 self.assertIsInstance(chunk, cborutil.bytestringchunk)
131 self.assertIsInstance(chunk, cborutil.bytestringchunk)
132 self.assertIn(len(chunk), (0, 8, 42))
132 self.assertIn(len(chunk), (0, 8, 42))
133
133
134 encoded = b'\x5f\xff'
134 encoded = b'\x5f\xff'
135 b = cborutil.decodeall(encoded)
135 b = cborutil.decodeall(encoded)
136 self.assertEqual(b, [b''])
136 self.assertEqual(b, [b''])
137 self.assertTrue(b[0].isfirst)
137 self.assertTrue(b[0].isfirst)
138 self.assertTrue(b[0].islast)
138 self.assertTrue(b[0].islast)
139
139
140 def testdecodevariouslengths(self):
140 def testdecodevariouslengths(self):
141 for i in (0, 1, 22, 23, 24, 25, 254, 255, 256, 65534, 65535, 65536):
141 for i in (0, 1, 22, 23, 24, 25, 254, 255, 256, 65534, 65535, 65536):
142 source = b'x' * i
142 source = b'x' * i
143 encoded = b''.join(cborutil.streamencode(source))
143 encoded = b''.join(cborutil.streamencode(source))
144
144
145 if len(source) < 24:
145 if len(source) < 24:
146 hlen = 1
146 hlen = 1
147 elif len(source) < 256:
147 elif len(source) < 256:
148 hlen = 2
148 hlen = 2
149 elif len(source) < 65536:
149 elif len(source) < 65536:
150 hlen = 3
150 hlen = 3
151 elif len(source) < 1048576:
151 elif len(source) < 1048576:
152 hlen = 5
152 hlen = 5
153
153
154 self.assertEqual(
154 self.assertEqual(
155 cborutil.decodeitem(encoded),
155 cborutil.decodeitem(encoded),
156 (True, source, hlen + len(source), cborutil.SPECIAL_NONE),
156 (True, source, hlen + len(source), cborutil.SPECIAL_NONE),
157 )
157 )
158
158
159 def testpartialdecode(self):
159 def testpartialdecode(self):
160 encoded = b''.join(cborutil.streamencode(b'foobar'))
160 encoded = b''.join(cborutil.streamencode(b'foobar'))
161
161
162 self.assertEqual(
162 self.assertEqual(
163 cborutil.decodeitem(encoded[0:1]),
163 cborutil.decodeitem(encoded[0:1]),
164 (False, None, -6, cborutil.SPECIAL_NONE),
164 (False, None, -6, cborutil.SPECIAL_NONE),
165 )
165 )
166 self.assertEqual(
166 self.assertEqual(
167 cborutil.decodeitem(encoded[0:2]),
167 cborutil.decodeitem(encoded[0:2]),
168 (False, None, -5, cborutil.SPECIAL_NONE),
168 (False, None, -5, cborutil.SPECIAL_NONE),
169 )
169 )
170 self.assertEqual(
170 self.assertEqual(
171 cborutil.decodeitem(encoded[0:3]),
171 cborutil.decodeitem(encoded[0:3]),
172 (False, None, -4, cborutil.SPECIAL_NONE),
172 (False, None, -4, cborutil.SPECIAL_NONE),
173 )
173 )
174 self.assertEqual(
174 self.assertEqual(
175 cborutil.decodeitem(encoded[0:4]),
175 cborutil.decodeitem(encoded[0:4]),
176 (False, None, -3, cborutil.SPECIAL_NONE),
176 (False, None, -3, cborutil.SPECIAL_NONE),
177 )
177 )
178 self.assertEqual(
178 self.assertEqual(
179 cborutil.decodeitem(encoded[0:5]),
179 cborutil.decodeitem(encoded[0:5]),
180 (False, None, -2, cborutil.SPECIAL_NONE),
180 (False, None, -2, cborutil.SPECIAL_NONE),
181 )
181 )
182 self.assertEqual(
182 self.assertEqual(
183 cborutil.decodeitem(encoded[0:6]),
183 cborutil.decodeitem(encoded[0:6]),
184 (False, None, -1, cborutil.SPECIAL_NONE),
184 (False, None, -1, cborutil.SPECIAL_NONE),
185 )
185 )
186 self.assertEqual(
186 self.assertEqual(
187 cborutil.decodeitem(encoded[0:7]),
187 cborutil.decodeitem(encoded[0:7]),
188 (True, b'foobar', 7, cborutil.SPECIAL_NONE),
188 (True, b'foobar', 7, cborutil.SPECIAL_NONE),
189 )
189 )
190
190
191 def testpartialdecodevariouslengths(self):
191 def testpartialdecodevariouslengths(self):
192 lens = [
192 lens = [
193 2,
193 2,
194 3,
194 3,
195 10,
195 10,
196 23,
196 23,
197 24,
197 24,
198 25,
198 25,
199 31,
199 31,
200 100,
200 100,
201 254,
201 254,
202 255,
202 255,
203 256,
203 256,
204 257,
204 257,
205 16384,
205 16384,
206 65534,
206 65534,
207 65535,
207 65535,
208 65536,
208 65536,
209 65537,
209 65537,
210 131071,
210 131071,
211 131072,
211 131072,
212 131073,
212 131073,
213 1048575,
213 1048575,
214 1048576,
214 1048576,
215 1048577,
215 1048577,
216 ]
216 ]
217
217
218 for size in lens:
218 for size in lens:
219 if size < 24:
219 if size < 24:
220 hlen = 1
220 hlen = 1
221 elif size < 2 ** 8:
221 elif size < 2 ** 8:
222 hlen = 2
222 hlen = 2
223 elif size < 2 ** 16:
223 elif size < 2 ** 16:
224 hlen = 3
224 hlen = 3
225 elif size < 2 ** 32:
225 elif size < 2 ** 32:
226 hlen = 5
226 hlen = 5
227 else:
227 else:
228 assert False
228 assert False
229
229
230 source = b'x' * size
230 source = b'x' * size
231 encoded = b''.join(cborutil.streamencode(source))
231 encoded = b''.join(cborutil.streamencode(source))
232
232
233 res = cborutil.decodeitem(encoded[0:1])
233 res = cborutil.decodeitem(encoded[0:1])
234
234
235 if hlen > 1:
235 if hlen > 1:
236 self.assertEqual(
236 self.assertEqual(
237 res, (False, None, -(hlen - 1), cborutil.SPECIAL_NONE)
237 res, (False, None, -(hlen - 1), cborutil.SPECIAL_NONE)
238 )
238 )
239 else:
239 else:
240 self.assertEqual(
240 self.assertEqual(
241 res,
241 res,
242 (False, None, -(size + hlen - 1), cborutil.SPECIAL_NONE),
242 (False, None, -(size + hlen - 1), cborutil.SPECIAL_NONE),
243 )
243 )
244
244
245 # Decoding partial header reports remaining header size.
245 # Decoding partial header reports remaining header size.
246 for i in range(hlen - 1):
246 for i in range(hlen - 1):
247 self.assertEqual(
247 self.assertEqual(
248 cborutil.decodeitem(encoded[0 : i + 1]),
248 cborutil.decodeitem(encoded[0 : i + 1]),
249 (False, None, -(hlen - i - 1), cborutil.SPECIAL_NONE),
249 (False, None, -(hlen - i - 1), cborutil.SPECIAL_NONE),
250 )
250 )
251
251
252 # Decoding complete header reports item size.
252 # Decoding complete header reports item size.
253 self.assertEqual(
253 self.assertEqual(
254 cborutil.decodeitem(encoded[0:hlen]),
254 cborutil.decodeitem(encoded[0:hlen]),
255 (False, None, -size, cborutil.SPECIAL_NONE),
255 (False, None, -size, cborutil.SPECIAL_NONE),
256 )
256 )
257
257
258 # Decoding single byte after header reports item size - 1
258 # Decoding single byte after header reports item size - 1
259 self.assertEqual(
259 self.assertEqual(
260 cborutil.decodeitem(encoded[0 : hlen + 1]),
260 cborutil.decodeitem(encoded[0 : hlen + 1]),
261 (False, None, -(size - 1), cborutil.SPECIAL_NONE),
261 (False, None, -(size - 1), cborutil.SPECIAL_NONE),
262 )
262 )
263
263
264 # Decoding all but the last byte reports -1 needed.
264 # Decoding all but the last byte reports -1 needed.
265 self.assertEqual(
265 self.assertEqual(
266 cborutil.decodeitem(encoded[0 : hlen + size - 1]),
266 cborutil.decodeitem(encoded[0 : hlen + size - 1]),
267 (False, None, -1, cborutil.SPECIAL_NONE),
267 (False, None, -1, cborutil.SPECIAL_NONE),
268 )
268 )
269
269
270 # Decoding last byte retrieves value.
270 # Decoding last byte retrieves value.
271 self.assertEqual(
271 self.assertEqual(
272 cborutil.decodeitem(encoded[0 : hlen + size]),
272 cborutil.decodeitem(encoded[0 : hlen + size]),
273 (True, source, hlen + size, cborutil.SPECIAL_NONE),
273 (True, source, hlen + size, cborutil.SPECIAL_NONE),
274 )
274 )
275
275
276 def testindefinitepartialdecode(self):
276 def testindefinitepartialdecode(self):
277 encoded = b''.join(
277 encoded = b''.join(
278 cborutil.streamencodebytestringfromiter([b'foobar', b'biz'])
278 cborutil.streamencodebytestringfromiter([b'foobar', b'biz'])
279 )
279 )
280
280
281 # First item should be begin of bytestring special.
281 # First item should be begin of bytestring special.
282 self.assertEqual(
282 self.assertEqual(
283 cborutil.decodeitem(encoded[0:1]),
283 cborutil.decodeitem(encoded[0:1]),
284 (True, None, 1, cborutil.SPECIAL_START_INDEFINITE_BYTESTRING),
284 (True, None, 1, cborutil.SPECIAL_START_INDEFINITE_BYTESTRING),
285 )
285 )
286
286
287 # Second item should be the first chunk. But only available when
287 # Second item should be the first chunk. But only available when
288 # we give it 7 bytes (1 byte header + 6 byte chunk).
288 # we give it 7 bytes (1 byte header + 6 byte chunk).
289 self.assertEqual(
289 self.assertEqual(
290 cborutil.decodeitem(encoded[1:2]),
290 cborutil.decodeitem(encoded[1:2]),
291 (False, None, -6, cborutil.SPECIAL_NONE),
291 (False, None, -6, cborutil.SPECIAL_NONE),
292 )
292 )
293 self.assertEqual(
293 self.assertEqual(
294 cborutil.decodeitem(encoded[1:3]),
294 cborutil.decodeitem(encoded[1:3]),
295 (False, None, -5, cborutil.SPECIAL_NONE),
295 (False, None, -5, cborutil.SPECIAL_NONE),
296 )
296 )
297 self.assertEqual(
297 self.assertEqual(
298 cborutil.decodeitem(encoded[1:4]),
298 cborutil.decodeitem(encoded[1:4]),
299 (False, None, -4, cborutil.SPECIAL_NONE),
299 (False, None, -4, cborutil.SPECIAL_NONE),
300 )
300 )
301 self.assertEqual(
301 self.assertEqual(
302 cborutil.decodeitem(encoded[1:5]),
302 cborutil.decodeitem(encoded[1:5]),
303 (False, None, -3, cborutil.SPECIAL_NONE),
303 (False, None, -3, cborutil.SPECIAL_NONE),
304 )
304 )
305 self.assertEqual(
305 self.assertEqual(
306 cborutil.decodeitem(encoded[1:6]),
306 cborutil.decodeitem(encoded[1:6]),
307 (False, None, -2, cborutil.SPECIAL_NONE),
307 (False, None, -2, cborutil.SPECIAL_NONE),
308 )
308 )
309 self.assertEqual(
309 self.assertEqual(
310 cborutil.decodeitem(encoded[1:7]),
310 cborutil.decodeitem(encoded[1:7]),
311 (False, None, -1, cborutil.SPECIAL_NONE),
311 (False, None, -1, cborutil.SPECIAL_NONE),
312 )
312 )
313
313
314 self.assertEqual(
314 self.assertEqual(
315 cborutil.decodeitem(encoded[1:8]),
315 cborutil.decodeitem(encoded[1:8]),
316 (True, b'foobar', 7, cborutil.SPECIAL_NONE),
316 (True, b'foobar', 7, cborutil.SPECIAL_NONE),
317 )
317 )
318
318
319 # Third item should be second chunk. But only available when
319 # Third item should be second chunk. But only available when
320 # we give it 4 bytes (1 byte header + 3 byte chunk).
320 # we give it 4 bytes (1 byte header + 3 byte chunk).
321 self.assertEqual(
321 self.assertEqual(
322 cborutil.decodeitem(encoded[8:9]),
322 cborutil.decodeitem(encoded[8:9]),
323 (False, None, -3, cborutil.SPECIAL_NONE),
323 (False, None, -3, cborutil.SPECIAL_NONE),
324 )
324 )
325 self.assertEqual(
325 self.assertEqual(
326 cborutil.decodeitem(encoded[8:10]),
326 cborutil.decodeitem(encoded[8:10]),
327 (False, None, -2, cborutil.SPECIAL_NONE),
327 (False, None, -2, cborutil.SPECIAL_NONE),
328 )
328 )
329 self.assertEqual(
329 self.assertEqual(
330 cborutil.decodeitem(encoded[8:11]),
330 cborutil.decodeitem(encoded[8:11]),
331 (False, None, -1, cborutil.SPECIAL_NONE),
331 (False, None, -1, cborutil.SPECIAL_NONE),
332 )
332 )
333
333
334 self.assertEqual(
334 self.assertEqual(
335 cborutil.decodeitem(encoded[8:12]),
335 cborutil.decodeitem(encoded[8:12]),
336 (True, b'biz', 4, cborutil.SPECIAL_NONE),
336 (True, b'biz', 4, cborutil.SPECIAL_NONE),
337 )
337 )
338
338
339 # Fourth item should be end of indefinite stream marker.
339 # Fourth item should be end of indefinite stream marker.
340 self.assertEqual(
340 self.assertEqual(
341 cborutil.decodeitem(encoded[12:13]),
341 cborutil.decodeitem(encoded[12:13]),
342 (True, None, 1, cborutil.SPECIAL_INDEFINITE_BREAK),
342 (True, None, 1, cborutil.SPECIAL_INDEFINITE_BREAK),
343 )
343 )
344
344
345 # Now test the behavior when going through the decoder.
345 # Now test the behavior when going through the decoder.
346
346
347 self.assertEqual(
347 self.assertEqual(
348 cborutil.sansiodecoder().decode(encoded[0:1]), (False, 1, 0)
348 cborutil.sansiodecoder().decode(encoded[0:1]), (False, 1, 0)
349 )
349 )
350 self.assertEqual(
350 self.assertEqual(
351 cborutil.sansiodecoder().decode(encoded[0:2]), (False, 1, 6)
351 cborutil.sansiodecoder().decode(encoded[0:2]), (False, 1, 6)
352 )
352 )
353 self.assertEqual(
353 self.assertEqual(
354 cborutil.sansiodecoder().decode(encoded[0:3]), (False, 1, 5)
354 cborutil.sansiodecoder().decode(encoded[0:3]), (False, 1, 5)
355 )
355 )
356 self.assertEqual(
356 self.assertEqual(
357 cborutil.sansiodecoder().decode(encoded[0:4]), (False, 1, 4)
357 cborutil.sansiodecoder().decode(encoded[0:4]), (False, 1, 4)
358 )
358 )
359 self.assertEqual(
359 self.assertEqual(
360 cborutil.sansiodecoder().decode(encoded[0:5]), (False, 1, 3)
360 cborutil.sansiodecoder().decode(encoded[0:5]), (False, 1, 3)
361 )
361 )
362 self.assertEqual(
362 self.assertEqual(
363 cborutil.sansiodecoder().decode(encoded[0:6]), (False, 1, 2)
363 cborutil.sansiodecoder().decode(encoded[0:6]), (False, 1, 2)
364 )
364 )
365 self.assertEqual(
365 self.assertEqual(
366 cborutil.sansiodecoder().decode(encoded[0:7]), (False, 1, 1)
366 cborutil.sansiodecoder().decode(encoded[0:7]), (False, 1, 1)
367 )
367 )
368 self.assertEqual(
368 self.assertEqual(
369 cborutil.sansiodecoder().decode(encoded[0:8]), (True, 8, 0)
369 cborutil.sansiodecoder().decode(encoded[0:8]), (True, 8, 0)
370 )
370 )
371
371
372 self.assertEqual(
372 self.assertEqual(
373 cborutil.sansiodecoder().decode(encoded[0:9]), (True, 8, 3)
373 cborutil.sansiodecoder().decode(encoded[0:9]), (True, 8, 3)
374 )
374 )
375 self.assertEqual(
375 self.assertEqual(
376 cborutil.sansiodecoder().decode(encoded[0:10]), (True, 8, 2)
376 cborutil.sansiodecoder().decode(encoded[0:10]), (True, 8, 2)
377 )
377 )
378 self.assertEqual(
378 self.assertEqual(
379 cborutil.sansiodecoder().decode(encoded[0:11]), (True, 8, 1)
379 cborutil.sansiodecoder().decode(encoded[0:11]), (True, 8, 1)
380 )
380 )
381 self.assertEqual(
381 self.assertEqual(
382 cborutil.sansiodecoder().decode(encoded[0:12]), (True, 12, 0)
382 cborutil.sansiodecoder().decode(encoded[0:12]), (True, 12, 0)
383 )
383 )
384
384
385 self.assertEqual(
385 self.assertEqual(
386 cborutil.sansiodecoder().decode(encoded[0:13]), (True, 13, 0)
386 cborutil.sansiodecoder().decode(encoded[0:13]), (True, 13, 0)
387 )
387 )
388
388
389 decoder = cborutil.sansiodecoder()
389 decoder = cborutil.sansiodecoder()
390 decoder.decode(encoded[0:8])
390 decoder.decode(encoded[0:8])
391 values = decoder.getavailable()
391 values = decoder.getavailable()
392 self.assertEqual(values, [b'foobar'])
392 self.assertEqual(values, [b'foobar'])
393 self.assertTrue(values[0].isfirst)
393 self.assertTrue(values[0].isfirst)
394 self.assertFalse(values[0].islast)
394 self.assertFalse(values[0].islast)
395
395
396 self.assertEqual(decoder.decode(encoded[8:12]), (True, 4, 0))
396 self.assertEqual(decoder.decode(encoded[8:12]), (True, 4, 0))
397 values = decoder.getavailable()
397 values = decoder.getavailable()
398 self.assertEqual(values, [b'biz'])
398 self.assertEqual(values, [b'biz'])
399 self.assertFalse(values[0].isfirst)
399 self.assertFalse(values[0].isfirst)
400 self.assertFalse(values[0].islast)
400 self.assertFalse(values[0].islast)
401
401
402 self.assertEqual(decoder.decode(encoded[12:]), (True, 1, 0))
402 self.assertEqual(decoder.decode(encoded[12:]), (True, 1, 0))
403 values = decoder.getavailable()
403 values = decoder.getavailable()
404 self.assertEqual(values, [b''])
404 self.assertEqual(values, [b''])
405 self.assertFalse(values[0].isfirst)
405 self.assertFalse(values[0].isfirst)
406 self.assertTrue(values[0].islast)
406 self.assertTrue(values[0].islast)
407
407
408
408
409 class StringTests(TestCase):
409 class StringTests(TestCase):
410 def testdecodeforbidden(self):
410 def testdecodeforbidden(self):
411 encoded = b'\x63foo'
411 encoded = b'\x63foo'
412 with self.assertRaisesRegex(
412 with self.assertRaisesRegex(
413 cborutil.CBORDecodeError, 'string major type not supported'
413 cborutil.CBORDecodeError, 'string major type not supported'
414 ):
414 ):
415 cborutil.decodeall(encoded)
415 cborutil.decodeall(encoded)
416
416
417
417
418 class IntTests(TestCase):
418 class IntTests(TestCase):
419 def testsmall(self):
419 def testsmall(self):
420 self.assertEqual(list(cborutil.streamencode(0)), [b'\x00'])
420 self.assertEqual(list(cborutil.streamencode(0)), [b'\x00'])
421 self.assertEqual(cborutil.decodeall(b'\x00'), [0])
421 self.assertEqual(cborutil.decodeall(b'\x00'), [0])
422
422
423 self.assertEqual(list(cborutil.streamencode(1)), [b'\x01'])
423 self.assertEqual(list(cborutil.streamencode(1)), [b'\x01'])
424 self.assertEqual(cborutil.decodeall(b'\x01'), [1])
424 self.assertEqual(cborutil.decodeall(b'\x01'), [1])
425
425
426 self.assertEqual(list(cborutil.streamencode(2)), [b'\x02'])
426 self.assertEqual(list(cborutil.streamencode(2)), [b'\x02'])
427 self.assertEqual(cborutil.decodeall(b'\x02'), [2])
427 self.assertEqual(cborutil.decodeall(b'\x02'), [2])
428
428
429 self.assertEqual(list(cborutil.streamencode(3)), [b'\x03'])
429 self.assertEqual(list(cborutil.streamencode(3)), [b'\x03'])
430 self.assertEqual(cborutil.decodeall(b'\x03'), [3])
430 self.assertEqual(cborutil.decodeall(b'\x03'), [3])
431
431
432 self.assertEqual(list(cborutil.streamencode(4)), [b'\x04'])
432 self.assertEqual(list(cborutil.streamencode(4)), [b'\x04'])
433 self.assertEqual(cborutil.decodeall(b'\x04'), [4])
433 self.assertEqual(cborutil.decodeall(b'\x04'), [4])
434
434
435 # Multiple value decode works.
435 # Multiple value decode works.
436 self.assertEqual(
436 self.assertEqual(
437 cborutil.decodeall(b'\x00\x01\x02\x03\x04'), [0, 1, 2, 3, 4]
437 cborutil.decodeall(b'\x00\x01\x02\x03\x04'), [0, 1, 2, 3, 4]
438 )
438 )
439
439
440 def testnegativesmall(self):
440 def testnegativesmall(self):
441 self.assertEqual(list(cborutil.streamencode(-1)), [b'\x20'])
441 self.assertEqual(list(cborutil.streamencode(-1)), [b'\x20'])
442 self.assertEqual(cborutil.decodeall(b'\x20'), [-1])
442 self.assertEqual(cborutil.decodeall(b'\x20'), [-1])
443
443
444 self.assertEqual(list(cborutil.streamencode(-2)), [b'\x21'])
444 self.assertEqual(list(cborutil.streamencode(-2)), [b'\x21'])
445 self.assertEqual(cborutil.decodeall(b'\x21'), [-2])
445 self.assertEqual(cborutil.decodeall(b'\x21'), [-2])
446
446
447 self.assertEqual(list(cborutil.streamencode(-3)), [b'\x22'])
447 self.assertEqual(list(cborutil.streamencode(-3)), [b'\x22'])
448 self.assertEqual(cborutil.decodeall(b'\x22'), [-3])
448 self.assertEqual(cborutil.decodeall(b'\x22'), [-3])
449
449
450 self.assertEqual(list(cborutil.streamencode(-4)), [b'\x23'])
450 self.assertEqual(list(cborutil.streamencode(-4)), [b'\x23'])
451 self.assertEqual(cborutil.decodeall(b'\x23'), [-4])
451 self.assertEqual(cborutil.decodeall(b'\x23'), [-4])
452
452
453 self.assertEqual(list(cborutil.streamencode(-5)), [b'\x24'])
453 self.assertEqual(list(cborutil.streamencode(-5)), [b'\x24'])
454 self.assertEqual(cborutil.decodeall(b'\x24'), [-5])
454 self.assertEqual(cborutil.decodeall(b'\x24'), [-5])
455
455
456 # Multiple value decode works.
456 # Multiple value decode works.
457 self.assertEqual(
457 self.assertEqual(
458 cborutil.decodeall(b'\x20\x21\x22\x23\x24'), [-1, -2, -3, -4, -5]
458 cborutil.decodeall(b'\x20\x21\x22\x23\x24'), [-1, -2, -3, -4, -5]
459 )
459 )
460
460
461 def testrange(self):
461 def testrange(self):
462 for i in range(-70000, 70000, 10):
462 for i in range(-70000, 70000, 10):
463 encoded = b''.join(cborutil.streamencode(i))
463 encoded = b''.join(cborutil.streamencode(i))
464
464
465 self.assertEqual(encoded, cbor.dumps(i))
465 self.assertEqual(encoded, cbor.dumps(i))
466 self.assertEqual(cborutil.decodeall(encoded), [i])
466 self.assertEqual(cborutil.decodeall(encoded), [i])
467
467
468 def testdecodepartialubyte(self):
468 def testdecodepartialubyte(self):
469 encoded = b''.join(cborutil.streamencode(250))
469 encoded = b''.join(cborutil.streamencode(250))
470
470
471 self.assertEqual(
471 self.assertEqual(
472 cborutil.decodeitem(encoded[0:1]),
472 cborutil.decodeitem(encoded[0:1]),
473 (False, None, -1, cborutil.SPECIAL_NONE),
473 (False, None, -1, cborutil.SPECIAL_NONE),
474 )
474 )
475 self.assertEqual(
475 self.assertEqual(
476 cborutil.decodeitem(encoded[0:2]),
476 cborutil.decodeitem(encoded[0:2]),
477 (True, 250, 2, cborutil.SPECIAL_NONE),
477 (True, 250, 2, cborutil.SPECIAL_NONE),
478 )
478 )
479
479
480 def testdecodepartialbyte(self):
480 def testdecodepartialbyte(self):
481 encoded = b''.join(cborutil.streamencode(-42))
481 encoded = b''.join(cborutil.streamencode(-42))
482 self.assertEqual(
482 self.assertEqual(
483 cborutil.decodeitem(encoded[0:1]),
483 cborutil.decodeitem(encoded[0:1]),
484 (False, None, -1, cborutil.SPECIAL_NONE),
484 (False, None, -1, cborutil.SPECIAL_NONE),
485 )
485 )
486 self.assertEqual(
486 self.assertEqual(
487 cborutil.decodeitem(encoded[0:2]),
487 cborutil.decodeitem(encoded[0:2]),
488 (True, -42, 2, cborutil.SPECIAL_NONE),
488 (True, -42, 2, cborutil.SPECIAL_NONE),
489 )
489 )
490
490
491 def testdecodepartialushort(self):
491 def testdecodepartialushort(self):
492 encoded = b''.join(cborutil.streamencode(2 ** 15))
492 encoded = b''.join(cborutil.streamencode(2 ** 15))
493
493
494 self.assertEqual(
494 self.assertEqual(
495 cborutil.decodeitem(encoded[0:1]),
495 cborutil.decodeitem(encoded[0:1]),
496 (False, None, -2, cborutil.SPECIAL_NONE),
496 (False, None, -2, cborutil.SPECIAL_NONE),
497 )
497 )
498 self.assertEqual(
498 self.assertEqual(
499 cborutil.decodeitem(encoded[0:2]),
499 cborutil.decodeitem(encoded[0:2]),
500 (False, None, -1, cborutil.SPECIAL_NONE),
500 (False, None, -1, cborutil.SPECIAL_NONE),
501 )
501 )
502 self.assertEqual(
502 self.assertEqual(
503 cborutil.decodeitem(encoded[0:5]),
503 cborutil.decodeitem(encoded[0:5]),
504 (True, 2 ** 15, 3, cborutil.SPECIAL_NONE),
504 (True, 2 ** 15, 3, cborutil.SPECIAL_NONE),
505 )
505 )
506
506
507 def testdecodepartialshort(self):
507 def testdecodepartialshort(self):
508 encoded = b''.join(cborutil.streamencode(-1024))
508 encoded = b''.join(cborutil.streamencode(-1024))
509
509
510 self.assertEqual(
510 self.assertEqual(
511 cborutil.decodeitem(encoded[0:1]),
511 cborutil.decodeitem(encoded[0:1]),
512 (False, None, -2, cborutil.SPECIAL_NONE),
512 (False, None, -2, cborutil.SPECIAL_NONE),
513 )
513 )
514 self.assertEqual(
514 self.assertEqual(
515 cborutil.decodeitem(encoded[0:2]),
515 cborutil.decodeitem(encoded[0:2]),
516 (False, None, -1, cborutil.SPECIAL_NONE),
516 (False, None, -1, cborutil.SPECIAL_NONE),
517 )
517 )
518 self.assertEqual(
518 self.assertEqual(
519 cborutil.decodeitem(encoded[0:3]),
519 cborutil.decodeitem(encoded[0:3]),
520 (True, -1024, 3, cborutil.SPECIAL_NONE),
520 (True, -1024, 3, cborutil.SPECIAL_NONE),
521 )
521 )
522
522
523 def testdecodepartialulong(self):
523 def testdecodepartialulong(self):
524 encoded = b''.join(cborutil.streamencode(2 ** 28))
524 encoded = b''.join(cborutil.streamencode(2 ** 28))
525
525
526 self.assertEqual(
526 self.assertEqual(
527 cborutil.decodeitem(encoded[0:1]),
527 cborutil.decodeitem(encoded[0:1]),
528 (False, None, -4, cborutil.SPECIAL_NONE),
528 (False, None, -4, cborutil.SPECIAL_NONE),
529 )
529 )
530 self.assertEqual(
530 self.assertEqual(
531 cborutil.decodeitem(encoded[0:2]),
531 cborutil.decodeitem(encoded[0:2]),
532 (False, None, -3, cborutil.SPECIAL_NONE),
532 (False, None, -3, cborutil.SPECIAL_NONE),
533 )
533 )
534 self.assertEqual(
534 self.assertEqual(
535 cborutil.decodeitem(encoded[0:3]),
535 cborutil.decodeitem(encoded[0:3]),
536 (False, None, -2, cborutil.SPECIAL_NONE),
536 (False, None, -2, cborutil.SPECIAL_NONE),
537 )
537 )
538 self.assertEqual(
538 self.assertEqual(
539 cborutil.decodeitem(encoded[0:4]),
539 cborutil.decodeitem(encoded[0:4]),
540 (False, None, -1, cborutil.SPECIAL_NONE),
540 (False, None, -1, cborutil.SPECIAL_NONE),
541 )
541 )
542 self.assertEqual(
542 self.assertEqual(
543 cborutil.decodeitem(encoded[0:5]),
543 cborutil.decodeitem(encoded[0:5]),
544 (True, 2 ** 28, 5, cborutil.SPECIAL_NONE),
544 (True, 2 ** 28, 5, cborutil.SPECIAL_NONE),
545 )
545 )
546
546
547 def testdecodepartiallong(self):
547 def testdecodepartiallong(self):
548 encoded = b''.join(cborutil.streamencode(-1048580))
548 encoded = b''.join(cborutil.streamencode(-1048580))
549
549
550 self.assertEqual(
550 self.assertEqual(
551 cborutil.decodeitem(encoded[0:1]),
551 cborutil.decodeitem(encoded[0:1]),
552 (False, None, -4, cborutil.SPECIAL_NONE),
552 (False, None, -4, cborutil.SPECIAL_NONE),
553 )
553 )
554 self.assertEqual(
554 self.assertEqual(
555 cborutil.decodeitem(encoded[0:2]),
555 cborutil.decodeitem(encoded[0:2]),
556 (False, None, -3, cborutil.SPECIAL_NONE),
556 (False, None, -3, cborutil.SPECIAL_NONE),
557 )
557 )
558 self.assertEqual(
558 self.assertEqual(
559 cborutil.decodeitem(encoded[0:3]),
559 cborutil.decodeitem(encoded[0:3]),
560 (False, None, -2, cborutil.SPECIAL_NONE),
560 (False, None, -2, cborutil.SPECIAL_NONE),
561 )
561 )
562 self.assertEqual(
562 self.assertEqual(
563 cborutil.decodeitem(encoded[0:4]),
563 cborutil.decodeitem(encoded[0:4]),
564 (False, None, -1, cborutil.SPECIAL_NONE),
564 (False, None, -1, cborutil.SPECIAL_NONE),
565 )
565 )
566 self.assertEqual(
566 self.assertEqual(
567 cborutil.decodeitem(encoded[0:5]),
567 cborutil.decodeitem(encoded[0:5]),
568 (True, -1048580, 5, cborutil.SPECIAL_NONE),
568 (True, -1048580, 5, cborutil.SPECIAL_NONE),
569 )
569 )
570
570
571 def testdecodepartialulonglong(self):
571 def testdecodepartialulonglong(self):
572 encoded = b''.join(cborutil.streamencode(2 ** 32))
572 encoded = b''.join(cborutil.streamencode(2 ** 32))
573
573
574 self.assertEqual(
574 self.assertEqual(
575 cborutil.decodeitem(encoded[0:1]),
575 cborutil.decodeitem(encoded[0:1]),
576 (False, None, -8, cborutil.SPECIAL_NONE),
576 (False, None, -8, cborutil.SPECIAL_NONE),
577 )
577 )
578 self.assertEqual(
578 self.assertEqual(
579 cborutil.decodeitem(encoded[0:2]),
579 cborutil.decodeitem(encoded[0:2]),
580 (False, None, -7, cborutil.SPECIAL_NONE),
580 (False, None, -7, cborutil.SPECIAL_NONE),
581 )
581 )
582 self.assertEqual(
582 self.assertEqual(
583 cborutil.decodeitem(encoded[0:3]),
583 cborutil.decodeitem(encoded[0:3]),
584 (False, None, -6, cborutil.SPECIAL_NONE),
584 (False, None, -6, cborutil.SPECIAL_NONE),
585 )
585 )
586 self.assertEqual(
586 self.assertEqual(
587 cborutil.decodeitem(encoded[0:4]),
587 cborutil.decodeitem(encoded[0:4]),
588 (False, None, -5, cborutil.SPECIAL_NONE),
588 (False, None, -5, cborutil.SPECIAL_NONE),
589 )
589 )
590 self.assertEqual(
590 self.assertEqual(
591 cborutil.decodeitem(encoded[0:5]),
591 cborutil.decodeitem(encoded[0:5]),
592 (False, None, -4, cborutil.SPECIAL_NONE),
592 (False, None, -4, cborutil.SPECIAL_NONE),
593 )
593 )
594 self.assertEqual(
594 self.assertEqual(
595 cborutil.decodeitem(encoded[0:6]),
595 cborutil.decodeitem(encoded[0:6]),
596 (False, None, -3, cborutil.SPECIAL_NONE),
596 (False, None, -3, cborutil.SPECIAL_NONE),
597 )
597 )
598 self.assertEqual(
598 self.assertEqual(
599 cborutil.decodeitem(encoded[0:7]),
599 cborutil.decodeitem(encoded[0:7]),
600 (False, None, -2, cborutil.SPECIAL_NONE),
600 (False, None, -2, cborutil.SPECIAL_NONE),
601 )
601 )
602 self.assertEqual(
602 self.assertEqual(
603 cborutil.decodeitem(encoded[0:8]),
603 cborutil.decodeitem(encoded[0:8]),
604 (False, None, -1, cborutil.SPECIAL_NONE),
604 (False, None, -1, cborutil.SPECIAL_NONE),
605 )
605 )
606 self.assertEqual(
606 self.assertEqual(
607 cborutil.decodeitem(encoded[0:9]),
607 cborutil.decodeitem(encoded[0:9]),
608 (True, 2 ** 32, 9, cborutil.SPECIAL_NONE),
608 (True, 2 ** 32, 9, cborutil.SPECIAL_NONE),
609 )
609 )
610
610
611 with self.assertRaisesRegex(
611 with self.assertRaisesRegex(
612 cborutil.CBORDecodeError, 'input data not fully consumed'
612 cborutil.CBORDecodeError, 'input data not fully consumed'
613 ):
613 ):
614 cborutil.decodeall(encoded[0:1])
614 cborutil.decodeall(encoded[0:1])
615
615
616 with self.assertRaisesRegex(
616 with self.assertRaisesRegex(
617 cborutil.CBORDecodeError, 'input data not fully consumed'
617 cborutil.CBORDecodeError, 'input data not fully consumed'
618 ):
618 ):
619 cborutil.decodeall(encoded[0:2])
619 cborutil.decodeall(encoded[0:2])
620
620
621 def testdecodepartiallonglong(self):
621 def testdecodepartiallonglong(self):
622 encoded = b''.join(cborutil.streamencode(-7000000000))
622 encoded = b''.join(cborutil.streamencode(-7000000000))
623
623
624 self.assertEqual(
624 self.assertEqual(
625 cborutil.decodeitem(encoded[0:1]),
625 cborutil.decodeitem(encoded[0:1]),
626 (False, None, -8, cborutil.SPECIAL_NONE),
626 (False, None, -8, cborutil.SPECIAL_NONE),
627 )
627 )
628 self.assertEqual(
628 self.assertEqual(
629 cborutil.decodeitem(encoded[0:2]),
629 cborutil.decodeitem(encoded[0:2]),
630 (False, None, -7, cborutil.SPECIAL_NONE),
630 (False, None, -7, cborutil.SPECIAL_NONE),
631 )
631 )
632 self.assertEqual(
632 self.assertEqual(
633 cborutil.decodeitem(encoded[0:3]),
633 cborutil.decodeitem(encoded[0:3]),
634 (False, None, -6, cborutil.SPECIAL_NONE),
634 (False, None, -6, cborutil.SPECIAL_NONE),
635 )
635 )
636 self.assertEqual(
636 self.assertEqual(
637 cborutil.decodeitem(encoded[0:4]),
637 cborutil.decodeitem(encoded[0:4]),
638 (False, None, -5, cborutil.SPECIAL_NONE),
638 (False, None, -5, cborutil.SPECIAL_NONE),
639 )
639 )
640 self.assertEqual(
640 self.assertEqual(
641 cborutil.decodeitem(encoded[0:5]),
641 cborutil.decodeitem(encoded[0:5]),
642 (False, None, -4, cborutil.SPECIAL_NONE),
642 (False, None, -4, cborutil.SPECIAL_NONE),
643 )
643 )
644 self.assertEqual(
644 self.assertEqual(
645 cborutil.decodeitem(encoded[0:6]),
645 cborutil.decodeitem(encoded[0:6]),
646 (False, None, -3, cborutil.SPECIAL_NONE),
646 (False, None, -3, cborutil.SPECIAL_NONE),
647 )
647 )
648 self.assertEqual(
648 self.assertEqual(
649 cborutil.decodeitem(encoded[0:7]),
649 cborutil.decodeitem(encoded[0:7]),
650 (False, None, -2, cborutil.SPECIAL_NONE),
650 (False, None, -2, cborutil.SPECIAL_NONE),
651 )
651 )
652 self.assertEqual(
652 self.assertEqual(
653 cborutil.decodeitem(encoded[0:8]),
653 cborutil.decodeitem(encoded[0:8]),
654 (False, None, -1, cborutil.SPECIAL_NONE),
654 (False, None, -1, cborutil.SPECIAL_NONE),
655 )
655 )
656 self.assertEqual(
656 self.assertEqual(
657 cborutil.decodeitem(encoded[0:9]),
657 cborutil.decodeitem(encoded[0:9]),
658 (True, -7000000000, 9, cborutil.SPECIAL_NONE),
658 (True, -7000000000, 9, cborutil.SPECIAL_NONE),
659 )
659 )
660
660
661
661
662 class ArrayTests(TestCase):
662 class ArrayTests(TestCase):
663 def testempty(self):
663 def testempty(self):
664 self.assertEqual(list(cborutil.streamencode([])), [b'\x80'])
664 self.assertEqual(list(cborutil.streamencode([])), [b'\x80'])
665 self.assertEqual(loadit(cborutil.streamencode([])), [])
665 self.assertEqual(loadit(cborutil.streamencode([])), [])
666
666
667 self.assertEqual(cborutil.decodeall(b'\x80'), [[]])
667 self.assertEqual(cborutil.decodeall(b'\x80'), [[]])
668
668
669 def testbasic(self):
669 def testbasic(self):
670 source = [b'foo', b'bar', 1, -10]
670 source = [b'foo', b'bar', 1, -10]
671
671
672 chunks = [b'\x84', b'\x43', b'foo', b'\x43', b'bar', b'\x01', b'\x29']
672 chunks = [b'\x84', b'\x43', b'foo', b'\x43', b'bar', b'\x01', b'\x29']
673
673
674 self.assertEqual(list(cborutil.streamencode(source)), chunks)
674 self.assertEqual(list(cborutil.streamencode(source)), chunks)
675
675
676 self.assertEqual(cborutil.decodeall(b''.join(chunks)), [source])
676 self.assertEqual(cborutil.decodeall(b''.join(chunks)), [source])
677
677
678 def testemptyfromiter(self):
678 def testemptyfromiter(self):
679 self.assertEqual(
679 self.assertEqual(
680 b''.join(cborutil.streamencodearrayfromiter([])), b'\x9f\xff'
680 b''.join(cborutil.streamencodearrayfromiter([])), b'\x9f\xff'
681 )
681 )
682
682
683 with self.assertRaisesRegex(
683 with self.assertRaisesRegex(
684 cborutil.CBORDecodeError, 'indefinite length uint not allowed'
684 cborutil.CBORDecodeError, 'indefinite length uint not allowed'
685 ):
685 ):
686 cborutil.decodeall(b'\x9f\xff')
686 cborutil.decodeall(b'\x9f\xff')
687
687
688 def testfromiter1(self):
688 def testfromiter1(self):
689 source = [b'foo']
689 source = [b'foo']
690
690
691 self.assertEqual(
691 self.assertEqual(
692 list(cborutil.streamencodearrayfromiter(source)),
692 list(cborutil.streamencodearrayfromiter(source)),
693 [b'\x9f', b'\x43', b'foo', b'\xff',],
693 [b'\x9f', b'\x43', b'foo', b'\xff',],
694 )
694 )
695
695
696 dest = b''.join(cborutil.streamencodearrayfromiter(source))
696 dest = b''.join(cborutil.streamencodearrayfromiter(source))
697 self.assertEqual(cbor.loads(dest), source)
697 self.assertEqual(cbor.loads(dest), source)
698
698
699 with self.assertRaisesRegex(
699 with self.assertRaisesRegex(
700 cborutil.CBORDecodeError, 'indefinite length uint not allowed'
700 cborutil.CBORDecodeError, 'indefinite length uint not allowed'
701 ):
701 ):
702 cborutil.decodeall(dest)
702 cborutil.decodeall(dest)
703
703
704 def testtuple(self):
704 def testtuple(self):
705 source = (b'foo', None, 42)
705 source = (b'foo', None, 42)
706 encoded = b''.join(cborutil.streamencode(source))
706 encoded = b''.join(cborutil.streamencode(source))
707
707
708 self.assertEqual(cbor.loads(encoded), list(source))
708 self.assertEqual(cbor.loads(encoded), list(source))
709
709
710 self.assertEqual(cborutil.decodeall(encoded), [list(source)])
710 self.assertEqual(cborutil.decodeall(encoded), [list(source)])
711
711
712 def testpartialdecode(self):
712 def testpartialdecode(self):
713 source = list(range(4))
713 source = list(range(4))
714 encoded = b''.join(cborutil.streamencode(source))
714 encoded = b''.join(cborutil.streamencode(source))
715 self.assertEqual(
715 self.assertEqual(
716 cborutil.decodeitem(encoded[0:1]),
716 cborutil.decodeitem(encoded[0:1]),
717 (True, 4, 1, cborutil.SPECIAL_START_ARRAY),
717 (True, 4, 1, cborutil.SPECIAL_START_ARRAY),
718 )
718 )
719 self.assertEqual(
719 self.assertEqual(
720 cborutil.decodeitem(encoded[0:2]),
720 cborutil.decodeitem(encoded[0:2]),
721 (True, 4, 1, cborutil.SPECIAL_START_ARRAY),
721 (True, 4, 1, cborutil.SPECIAL_START_ARRAY),
722 )
722 )
723
723
724 source = list(range(23))
724 source = list(range(23))
725 encoded = b''.join(cborutil.streamencode(source))
725 encoded = b''.join(cborutil.streamencode(source))
726 self.assertEqual(
726 self.assertEqual(
727 cborutil.decodeitem(encoded[0:1]),
727 cborutil.decodeitem(encoded[0:1]),
728 (True, 23, 1, cborutil.SPECIAL_START_ARRAY),
728 (True, 23, 1, cborutil.SPECIAL_START_ARRAY),
729 )
729 )
730 self.assertEqual(
730 self.assertEqual(
731 cborutil.decodeitem(encoded[0:2]),
731 cborutil.decodeitem(encoded[0:2]),
732 (True, 23, 1, cborutil.SPECIAL_START_ARRAY),
732 (True, 23, 1, cborutil.SPECIAL_START_ARRAY),
733 )
733 )
734
734
735 source = list(range(24))
735 source = list(range(24))
736 encoded = b''.join(cborutil.streamencode(source))
736 encoded = b''.join(cborutil.streamencode(source))
737 self.assertEqual(
737 self.assertEqual(
738 cborutil.decodeitem(encoded[0:1]),
738 cborutil.decodeitem(encoded[0:1]),
739 (False, None, -1, cborutil.SPECIAL_NONE),
739 (False, None, -1, cborutil.SPECIAL_NONE),
740 )
740 )
741 self.assertEqual(
741 self.assertEqual(
742 cborutil.decodeitem(encoded[0:2]),
742 cborutil.decodeitem(encoded[0:2]),
743 (True, 24, 2, cborutil.SPECIAL_START_ARRAY),
743 (True, 24, 2, cborutil.SPECIAL_START_ARRAY),
744 )
744 )
745 self.assertEqual(
745 self.assertEqual(
746 cborutil.decodeitem(encoded[0:3]),
746 cborutil.decodeitem(encoded[0:3]),
747 (True, 24, 2, cborutil.SPECIAL_START_ARRAY),
747 (True, 24, 2, cborutil.SPECIAL_START_ARRAY),
748 )
748 )
749
749
750 source = list(range(256))
750 source = list(range(256))
751 encoded = b''.join(cborutil.streamencode(source))
751 encoded = b''.join(cborutil.streamencode(source))
752 self.assertEqual(
752 self.assertEqual(
753 cborutil.decodeitem(encoded[0:1]),
753 cborutil.decodeitem(encoded[0:1]),
754 (False, None, -2, cborutil.SPECIAL_NONE),
754 (False, None, -2, cborutil.SPECIAL_NONE),
755 )
755 )
756 self.assertEqual(
756 self.assertEqual(
757 cborutil.decodeitem(encoded[0:2]),
757 cborutil.decodeitem(encoded[0:2]),
758 (False, None, -1, cborutil.SPECIAL_NONE),
758 (False, None, -1, cborutil.SPECIAL_NONE),
759 )
759 )
760 self.assertEqual(
760 self.assertEqual(
761 cborutil.decodeitem(encoded[0:3]),
761 cborutil.decodeitem(encoded[0:3]),
762 (True, 256, 3, cborutil.SPECIAL_START_ARRAY),
762 (True, 256, 3, cborutil.SPECIAL_START_ARRAY),
763 )
763 )
764 self.assertEqual(
764 self.assertEqual(
765 cborutil.decodeitem(encoded[0:4]),
765 cborutil.decodeitem(encoded[0:4]),
766 (True, 256, 3, cborutil.SPECIAL_START_ARRAY),
766 (True, 256, 3, cborutil.SPECIAL_START_ARRAY),
767 )
767 )
768
768
769 def testnested(self):
769 def testnested(self):
770 source = [[], [], [[], [], []]]
770 source = [[], [], [[], [], []]]
771 encoded = b''.join(cborutil.streamencode(source))
771 encoded = b''.join(cborutil.streamencode(source))
772 self.assertEqual(cborutil.decodeall(encoded), [source])
772 self.assertEqual(cborutil.decodeall(encoded), [source])
773
773
774 source = [True, None, [True, 0, 2], [None], [], [[[]], -87]]
774 source = [True, None, [True, 0, 2], [None], [], [[[]], -87]]
775 encoded = b''.join(cborutil.streamencode(source))
775 encoded = b''.join(cborutil.streamencode(source))
776 self.assertEqual(cborutil.decodeall(encoded), [source])
776 self.assertEqual(cborutil.decodeall(encoded), [source])
777
777
778 # A set within an array.
778 # A set within an array.
779 source = [None, {b'foo', b'bar', None, False}, set()]
779 source = [None, {b'foo', b'bar', None, False}, set()]
780 encoded = b''.join(cborutil.streamencode(source))
780 encoded = b''.join(cborutil.streamencode(source))
781 self.assertEqual(cborutil.decodeall(encoded), [source])
781 self.assertEqual(cborutil.decodeall(encoded), [source])
782
782
783 # A map within an array.
783 # A map within an array.
784 source = [None, {}, {b'foo': b'bar', True: False}, [{}]]
784 source = [None, {}, {b'foo': b'bar', True: False}, [{}]]
785 encoded = b''.join(cborutil.streamencode(source))
785 encoded = b''.join(cborutil.streamencode(source))
786 self.assertEqual(cborutil.decodeall(encoded), [source])
786 self.assertEqual(cborutil.decodeall(encoded), [source])
787
787
788 def testindefinitebytestringvalues(self):
788 def testindefinitebytestringvalues(self):
789 # Single value array whose value is an empty indefinite bytestring.
789 # Single value array whose value is an empty indefinite bytestring.
790 encoded = b'\x81\x5f\x40\xff'
790 encoded = b'\x81\x5f\x40\xff'
791
791
792 with self.assertRaisesRegex(
792 with self.assertRaisesRegex(
793 cborutil.CBORDecodeError,
793 cborutil.CBORDecodeError,
794 'indefinite length bytestrings not ' 'allowed as array values',
794 'indefinite length bytestrings not ' 'allowed as array values',
795 ):
795 ):
796 cborutil.decodeall(encoded)
796 cborutil.decodeall(encoded)
797
797
798
798
799 class SetTests(TestCase):
799 class SetTests(TestCase):
800 def testempty(self):
800 def testempty(self):
801 self.assertEqual(
801 self.assertEqual(
802 list(cborutil.streamencode(set())), [b'\xd9\x01\x02', b'\x80',]
802 list(cborutil.streamencode(set())), [b'\xd9\x01\x02', b'\x80',]
803 )
803 )
804
804
805 self.assertEqual(cborutil.decodeall(b'\xd9\x01\x02\x80'), [set()])
805 self.assertEqual(cborutil.decodeall(b'\xd9\x01\x02\x80'), [set()])
806
806
807 def testset(self):
807 def testset(self):
808 source = {b'foo', None, 42}
808 source = {b'foo', None, 42}
809 encoded = b''.join(cborutil.streamencode(source))
809 encoded = b''.join(cborutil.streamencode(source))
810
810
811 self.assertEqual(cbor.loads(encoded), source)
811 self.assertEqual(cbor.loads(encoded), source)
812
812
813 self.assertEqual(cborutil.decodeall(encoded), [source])
813 self.assertEqual(cborutil.decodeall(encoded), [source])
814
814
815 def testinvalidtag(self):
815 def testinvalidtag(self):
816 # Must use array to encode sets.
816 # Must use array to encode sets.
817 encoded = b'\xd9\x01\x02\xa0'
817 encoded = b'\xd9\x01\x02\xa0'
818
818
819 with self.assertRaisesRegex(
819 with self.assertRaisesRegex(
820 cborutil.CBORDecodeError,
820 cborutil.CBORDecodeError,
821 'expected array after finite set ' 'semantic tag',
821 'expected array after finite set ' 'semantic tag',
822 ):
822 ):
823 cborutil.decodeall(encoded)
823 cborutil.decodeall(encoded)
824
824
825 def testpartialdecode(self):
825 def testpartialdecode(self):
826 # Semantic tag item will be 3 bytes. Set header will be variable
826 # Semantic tag item will be 3 bytes. Set header will be variable
827 # depending on length.
827 # depending on length.
828 encoded = b''.join(cborutil.streamencode({i for i in range(23)}))
828 encoded = b''.join(cborutil.streamencode({i for i in range(23)}))
829 self.assertEqual(
829 self.assertEqual(
830 cborutil.decodeitem(encoded[0:1]),
830 cborutil.decodeitem(encoded[0:1]),
831 (False, None, -2, cborutil.SPECIAL_NONE),
831 (False, None, -2, cborutil.SPECIAL_NONE),
832 )
832 )
833 self.assertEqual(
833 self.assertEqual(
834 cborutil.decodeitem(encoded[0:2]),
834 cborutil.decodeitem(encoded[0:2]),
835 (False, None, -1, cborutil.SPECIAL_NONE),
835 (False, None, -1, cborutil.SPECIAL_NONE),
836 )
836 )
837 self.assertEqual(
837 self.assertEqual(
838 cborutil.decodeitem(encoded[0:3]),
838 cborutil.decodeitem(encoded[0:3]),
839 (False, None, -1, cborutil.SPECIAL_NONE),
839 (False, None, -1, cborutil.SPECIAL_NONE),
840 )
840 )
841 self.assertEqual(
841 self.assertEqual(
842 cborutil.decodeitem(encoded[0:4]),
842 cborutil.decodeitem(encoded[0:4]),
843 (True, 23, 4, cborutil.SPECIAL_START_SET),
843 (True, 23, 4, cborutil.SPECIAL_START_SET),
844 )
844 )
845 self.assertEqual(
845 self.assertEqual(
846 cborutil.decodeitem(encoded[0:5]),
846 cborutil.decodeitem(encoded[0:5]),
847 (True, 23, 4, cborutil.SPECIAL_START_SET),
847 (True, 23, 4, cborutil.SPECIAL_START_SET),
848 )
848 )
849
849
850 encoded = b''.join(cborutil.streamencode({i for i in range(24)}))
850 encoded = b''.join(cborutil.streamencode({i for i in range(24)}))
851 self.assertEqual(
851 self.assertEqual(
852 cborutil.decodeitem(encoded[0:1]),
852 cborutil.decodeitem(encoded[0:1]),
853 (False, None, -2, cborutil.SPECIAL_NONE),
853 (False, None, -2, cborutil.SPECIAL_NONE),
854 )
854 )
855 self.assertEqual(
855 self.assertEqual(
856 cborutil.decodeitem(encoded[0:2]),
856 cborutil.decodeitem(encoded[0:2]),
857 (False, None, -1, cborutil.SPECIAL_NONE),
857 (False, None, -1, cborutil.SPECIAL_NONE),
858 )
858 )
859 self.assertEqual(
859 self.assertEqual(
860 cborutil.decodeitem(encoded[0:3]),
860 cborutil.decodeitem(encoded[0:3]),
861 (False, None, -1, cborutil.SPECIAL_NONE),
861 (False, None, -1, cborutil.SPECIAL_NONE),
862 )
862 )
863 self.assertEqual(
863 self.assertEqual(
864 cborutil.decodeitem(encoded[0:4]),
864 cborutil.decodeitem(encoded[0:4]),
865 (False, None, -1, cborutil.SPECIAL_NONE),
865 (False, None, -1, cborutil.SPECIAL_NONE),
866 )
866 )
867 self.assertEqual(
867 self.assertEqual(
868 cborutil.decodeitem(encoded[0:5]),
868 cborutil.decodeitem(encoded[0:5]),
869 (True, 24, 5, cborutil.SPECIAL_START_SET),
869 (True, 24, 5, cborutil.SPECIAL_START_SET),
870 )
870 )
871 self.assertEqual(
871 self.assertEqual(
872 cborutil.decodeitem(encoded[0:6]),
872 cborutil.decodeitem(encoded[0:6]),
873 (True, 24, 5, cborutil.SPECIAL_START_SET),
873 (True, 24, 5, cborutil.SPECIAL_START_SET),
874 )
874 )
875
875
876 encoded = b''.join(cborutil.streamencode({i for i in range(256)}))
876 encoded = b''.join(cborutil.streamencode({i for i in range(256)}))
877 self.assertEqual(
877 self.assertEqual(
878 cborutil.decodeitem(encoded[0:1]),
878 cborutil.decodeitem(encoded[0:1]),
879 (False, None, -2, cborutil.SPECIAL_NONE),
879 (False, None, -2, cborutil.SPECIAL_NONE),
880 )
880 )
881 self.assertEqual(
881 self.assertEqual(
882 cborutil.decodeitem(encoded[0:2]),
882 cborutil.decodeitem(encoded[0:2]),
883 (False, None, -1, cborutil.SPECIAL_NONE),
883 (False, None, -1, cborutil.SPECIAL_NONE),
884 )
884 )
885 self.assertEqual(
885 self.assertEqual(
886 cborutil.decodeitem(encoded[0:3]),
886 cborutil.decodeitem(encoded[0:3]),
887 (False, None, -1, cborutil.SPECIAL_NONE),
887 (False, None, -1, cborutil.SPECIAL_NONE),
888 )
888 )
889 self.assertEqual(
889 self.assertEqual(
890 cborutil.decodeitem(encoded[0:4]),
890 cborutil.decodeitem(encoded[0:4]),
891 (False, None, -2, cborutil.SPECIAL_NONE),
891 (False, None, -2, cborutil.SPECIAL_NONE),
892 )
892 )
893 self.assertEqual(
893 self.assertEqual(
894 cborutil.decodeitem(encoded[0:5]),
894 cborutil.decodeitem(encoded[0:5]),
895 (False, None, -1, cborutil.SPECIAL_NONE),
895 (False, None, -1, cborutil.SPECIAL_NONE),
896 )
896 )
897 self.assertEqual(
897 self.assertEqual(
898 cborutil.decodeitem(encoded[0:6]),
898 cborutil.decodeitem(encoded[0:6]),
899 (True, 256, 6, cborutil.SPECIAL_START_SET),
899 (True, 256, 6, cborutil.SPECIAL_START_SET),
900 )
900 )
901
901
902 def testinvalidvalue(self):
902 def testinvalidvalue(self):
903 encoded = b''.join(
903 encoded = b''.join(
904 [
904 [
905 b'\xd9\x01\x02', # semantic tag
905 b'\xd9\x01\x02', # semantic tag
906 b'\x81', # array of size 1
906 b'\x81', # array of size 1
907 b'\x5f\x43foo\xff', # indefinite length bytestring "foo"
907 b'\x5f\x43foo\xff', # indefinite length bytestring "foo"
908 ]
908 ]
909 )
909 )
910
910
911 with self.assertRaisesRegex(
911 with self.assertRaisesRegex(
912 cborutil.CBORDecodeError,
912 cborutil.CBORDecodeError,
913 'indefinite length bytestrings not ' 'allowed as set values',
913 'indefinite length bytestrings not ' 'allowed as set values',
914 ):
914 ):
915 cborutil.decodeall(encoded)
915 cborutil.decodeall(encoded)
916
916
917 encoded = b''.join([b'\xd9\x01\x02', b'\x81', b'\x80',]) # empty array
917 encoded = b''.join([b'\xd9\x01\x02', b'\x81', b'\x80',]) # empty array
918
918
919 with self.assertRaisesRegex(
919 with self.assertRaisesRegex(
920 cborutil.CBORDecodeError, 'collections not allowed as set values'
920 cborutil.CBORDecodeError, 'collections not allowed as set values'
921 ):
921 ):
922 cborutil.decodeall(encoded)
922 cborutil.decodeall(encoded)
923
923
924 encoded = b''.join([b'\xd9\x01\x02', b'\x81', b'\xa0',]) # empty map
924 encoded = b''.join([b'\xd9\x01\x02', b'\x81', b'\xa0',]) # empty map
925
925
926 with self.assertRaisesRegex(
926 with self.assertRaisesRegex(
927 cborutil.CBORDecodeError, 'collections not allowed as set values'
927 cborutil.CBORDecodeError, 'collections not allowed as set values'
928 ):
928 ):
929 cborutil.decodeall(encoded)
929 cborutil.decodeall(encoded)
930
930
931 encoded = b''.join(
931 encoded = b''.join(
932 [
932 [
933 b'\xd9\x01\x02',
933 b'\xd9\x01\x02',
934 b'\x81',
934 b'\x81',
935 b'\xd9\x01\x02\x81\x01', # set with integer 1
935 b'\xd9\x01\x02\x81\x01', # set with integer 1
936 ]
936 ]
937 )
937 )
938
938
939 with self.assertRaisesRegex(
939 with self.assertRaisesRegex(
940 cborutil.CBORDecodeError, 'collections not allowed as set values'
940 cborutil.CBORDecodeError, 'collections not allowed as set values'
941 ):
941 ):
942 cborutil.decodeall(encoded)
942 cborutil.decodeall(encoded)
943
943
944
944
945 class BoolTests(TestCase):
945 class BoolTests(TestCase):
946 def testbasic(self):
946 def testbasic(self):
947 self.assertEqual(list(cborutil.streamencode(True)), [b'\xf5'])
947 self.assertEqual(list(cborutil.streamencode(True)), [b'\xf5'])
948 self.assertEqual(list(cborutil.streamencode(False)), [b'\xf4'])
948 self.assertEqual(list(cborutil.streamencode(False)), [b'\xf4'])
949
949
950 self.assertIs(loadit(cborutil.streamencode(True)), True)
950 self.assertIs(loadit(cborutil.streamencode(True)), True)
951 self.assertIs(loadit(cborutil.streamencode(False)), False)
951 self.assertIs(loadit(cborutil.streamencode(False)), False)
952
952
953 self.assertEqual(cborutil.decodeall(b'\xf4'), [False])
953 self.assertEqual(cborutil.decodeall(b'\xf4'), [False])
954 self.assertEqual(cborutil.decodeall(b'\xf5'), [True])
954 self.assertEqual(cborutil.decodeall(b'\xf5'), [True])
955
955
956 self.assertEqual(
956 self.assertEqual(
957 cborutil.decodeall(b'\xf4\xf5\xf5\xf4'), [False, True, True, False]
957 cborutil.decodeall(b'\xf4\xf5\xf5\xf4'), [False, True, True, False]
958 )
958 )
959
959
960
960
961 class NoneTests(TestCase):
961 class NoneTests(TestCase):
962 def testbasic(self):
962 def testbasic(self):
963 self.assertEqual(list(cborutil.streamencode(None)), [b'\xf6'])
963 self.assertEqual(list(cborutil.streamencode(None)), [b'\xf6'])
964
964
965 self.assertIs(loadit(cborutil.streamencode(None)), None)
965 self.assertIs(loadit(cborutil.streamencode(None)), None)
966
966
967 self.assertEqual(cborutil.decodeall(b'\xf6'), [None])
967 self.assertEqual(cborutil.decodeall(b'\xf6'), [None])
968 self.assertEqual(cborutil.decodeall(b'\xf6\xf6'), [None, None])
968 self.assertEqual(cborutil.decodeall(b'\xf6\xf6'), [None, None])
969
969
970
970
971 class MapTests(TestCase):
971 class MapTests(TestCase):
972 def testempty(self):
972 def testempty(self):
973 self.assertEqual(list(cborutil.streamencode({})), [b'\xa0'])
973 self.assertEqual(list(cborutil.streamencode({})), [b'\xa0'])
974 self.assertEqual(loadit(cborutil.streamencode({})), {})
974 self.assertEqual(loadit(cborutil.streamencode({})), {})
975
975
976 self.assertEqual(cborutil.decodeall(b'\xa0'), [{}])
976 self.assertEqual(cborutil.decodeall(b'\xa0'), [{}])
977
977
978 def testemptyindefinite(self):
978 def testemptyindefinite(self):
979 self.assertEqual(
979 self.assertEqual(
980 list(cborutil.streamencodemapfromiter([])), [b'\xbf', b'\xff']
980 list(cborutil.streamencodemapfromiter([])), [b'\xbf', b'\xff']
981 )
981 )
982
982
983 self.assertEqual(loadit(cborutil.streamencodemapfromiter([])), {})
983 self.assertEqual(loadit(cborutil.streamencodemapfromiter([])), {})
984
984
985 with self.assertRaisesRegex(
985 with self.assertRaisesRegex(
986 cborutil.CBORDecodeError, 'indefinite length uint not allowed'
986 cborutil.CBORDecodeError, 'indefinite length uint not allowed'
987 ):
987 ):
988 cborutil.decodeall(b'\xbf\xff')
988 cborutil.decodeall(b'\xbf\xff')
989
989
990 def testone(self):
990 def testone(self):
991 source = {b'foo': b'bar'}
991 source = {b'foo': b'bar'}
992 self.assertEqual(
992 self.assertEqual(
993 list(cborutil.streamencode(source)),
993 list(cborutil.streamencode(source)),
994 [b'\xa1', b'\x43', b'foo', b'\x43', b'bar'],
994 [b'\xa1', b'\x43', b'foo', b'\x43', b'bar'],
995 )
995 )
996
996
997 self.assertEqual(loadit(cborutil.streamencode(source)), source)
997 self.assertEqual(loadit(cborutil.streamencode(source)), source)
998
998
999 self.assertEqual(cborutil.decodeall(b'\xa1\x43foo\x43bar'), [source])
999 self.assertEqual(cborutil.decodeall(b'\xa1\x43foo\x43bar'), [source])
1000
1000
1001 def testmultiple(self):
1001 def testmultiple(self):
1002 source = {
1002 source = {
1003 b'foo': b'bar',
1003 b'foo': b'bar',
1004 b'baz': b'value1',
1004 b'baz': b'value1',
1005 }
1005 }
1006
1006
1007 self.assertEqual(loadit(cborutil.streamencode(source)), source)
1007 self.assertEqual(loadit(cborutil.streamencode(source)), source)
1008
1008
1009 self.assertEqual(
1009 self.assertEqual(
1010 loadit(cborutil.streamencodemapfromiter(source.items())), source
1010 loadit(cborutil.streamencodemapfromiter(source.items())), source
1011 )
1011 )
1012
1012
1013 encoded = b''.join(cborutil.streamencode(source))
1013 encoded = b''.join(cborutil.streamencode(source))
1014 self.assertEqual(cborutil.decodeall(encoded), [source])
1014 self.assertEqual(cborutil.decodeall(encoded), [source])
1015
1015
1016 def testcomplex(self):
1016 def testcomplex(self):
1017 source = {
1017 source = {
1018 b'key': 1,
1018 b'key': 1,
1019 2: -10,
1019 2: -10,
1020 }
1020 }
1021
1021
1022 self.assertEqual(loadit(cborutil.streamencode(source)), source)
1022 self.assertEqual(loadit(cborutil.streamencode(source)), source)
1023
1023
1024 self.assertEqual(
1024 self.assertEqual(
1025 loadit(cborutil.streamencodemapfromiter(source.items())), source
1025 loadit(cborutil.streamencodemapfromiter(source.items())), source
1026 )
1026 )
1027
1027
1028 encoded = b''.join(cborutil.streamencode(source))
1028 encoded = b''.join(cborutil.streamencode(source))
1029 self.assertEqual(cborutil.decodeall(encoded), [source])
1029 self.assertEqual(cborutil.decodeall(encoded), [source])
1030
1030
1031 def testnested(self):
1031 def testnested(self):
1032 source = {b'key1': None, b'key2': {b'sub1': b'sub2'}, b'sub2': {}}
1032 source = {b'key1': None, b'key2': {b'sub1': b'sub2'}, b'sub2': {}}
1033 encoded = b''.join(cborutil.streamencode(source))
1033 encoded = b''.join(cborutil.streamencode(source))
1034
1034
1035 self.assertEqual(cborutil.decodeall(encoded), [source])
1035 self.assertEqual(cborutil.decodeall(encoded), [source])
1036
1036
1037 source = {
1037 source = {
1038 b'key1': [],
1038 b'key1': [],
1039 b'key2': [None, False],
1039 b'key2': [None, False],
1040 b'key3': {b'foo', b'bar'},
1040 b'key3': {b'foo', b'bar'},
1041 b'key4': {},
1041 b'key4': {},
1042 }
1042 }
1043 encoded = b''.join(cborutil.streamencode(source))
1043 encoded = b''.join(cborutil.streamencode(source))
1044 self.assertEqual(cborutil.decodeall(encoded), [source])
1044 self.assertEqual(cborutil.decodeall(encoded), [source])
1045
1045
1046 def testillegalkey(self):
1046 def testillegalkey(self):
1047 encoded = b''.join(
1047 encoded = b''.join(
1048 [
1048 [
1049 # map header + len 1
1049 # map header + len 1
1050 b'\xa1',
1050 b'\xa1',
1051 # indefinite length bytestring "foo" in key position
1051 # indefinite length bytestring "foo" in key position
1052 b'\x5f\x03foo\xff',
1052 b'\x5f\x03foo\xff',
1053 ]
1053 ]
1054 )
1054 )
1055
1055
1056 with self.assertRaisesRegex(
1056 with self.assertRaisesRegex(
1057 cborutil.CBORDecodeError,
1057 cborutil.CBORDecodeError,
1058 'indefinite length bytestrings not ' 'allowed as map keys',
1058 'indefinite length bytestrings not ' 'allowed as map keys',
1059 ):
1059 ):
1060 cborutil.decodeall(encoded)
1060 cborutil.decodeall(encoded)
1061
1061
1062 encoded = b''.join([b'\xa1', b'\x80', b'\x43foo',]) # empty array
1062 encoded = b''.join([b'\xa1', b'\x80', b'\x43foo',]) # empty array
1063
1063
1064 with self.assertRaisesRegex(
1064 with self.assertRaisesRegex(
1065 cborutil.CBORDecodeError, 'collections not supported as map keys'
1065 cborutil.CBORDecodeError, 'collections not supported as map keys'
1066 ):
1066 ):
1067 cborutil.decodeall(encoded)
1067 cborutil.decodeall(encoded)
1068
1068
1069 def testillegalvalue(self):
1069 def testillegalvalue(self):
1070 encoded = b''.join(
1070 encoded = b''.join(
1071 [
1071 [
1072 b'\xa1', # map headers
1072 b'\xa1', # map headers
1073 b'\x43foo', # key
1073 b'\x43foo', # key
1074 b'\x5f\x03bar\xff', # indefinite length value
1074 b'\x5f\x03bar\xff', # indefinite length value
1075 ]
1075 ]
1076 )
1076 )
1077
1077
1078 with self.assertRaisesRegex(
1078 with self.assertRaisesRegex(
1079 cborutil.CBORDecodeError,
1079 cborutil.CBORDecodeError,
1080 'indefinite length bytestrings not ' 'allowed as map values',
1080 'indefinite length bytestrings not ' 'allowed as map values',
1081 ):
1081 ):
1082 cborutil.decodeall(encoded)
1082 cborutil.decodeall(encoded)
1083
1083
1084 def testpartialdecode(self):
1084 def testpartialdecode(self):
1085 source = {b'key1': b'value1'}
1085 source = {b'key1': b'value1'}
1086 encoded = b''.join(cborutil.streamencode(source))
1086 encoded = b''.join(cborutil.streamencode(source))
1087
1087
1088 self.assertEqual(
1088 self.assertEqual(
1089 cborutil.decodeitem(encoded[0:1]),
1089 cborutil.decodeitem(encoded[0:1]),
1090 (True, 1, 1, cborutil.SPECIAL_START_MAP),
1090 (True, 1, 1, cborutil.SPECIAL_START_MAP),
1091 )
1091 )
1092 self.assertEqual(
1092 self.assertEqual(
1093 cborutil.decodeitem(encoded[0:2]),
1093 cborutil.decodeitem(encoded[0:2]),
1094 (True, 1, 1, cborutil.SPECIAL_START_MAP),
1094 (True, 1, 1, cborutil.SPECIAL_START_MAP),
1095 )
1095 )
1096
1096
1097 source = {b'key%d' % i: None for i in range(23)}
1097 source = {b'key%d' % i: None for i in range(23)}
1098 encoded = b''.join(cborutil.streamencode(source))
1098 encoded = b''.join(cborutil.streamencode(source))
1099 self.assertEqual(
1099 self.assertEqual(
1100 cborutil.decodeitem(encoded[0:1]),
1100 cborutil.decodeitem(encoded[0:1]),
1101 (True, 23, 1, cborutil.SPECIAL_START_MAP),
1101 (True, 23, 1, cborutil.SPECIAL_START_MAP),
1102 )
1102 )
1103
1103
1104 source = {b'key%d' % i: None for i in range(24)}
1104 source = {b'key%d' % i: None for i in range(24)}
1105 encoded = b''.join(cborutil.streamencode(source))
1105 encoded = b''.join(cborutil.streamencode(source))
1106 self.assertEqual(
1106 self.assertEqual(
1107 cborutil.decodeitem(encoded[0:1]),
1107 cborutil.decodeitem(encoded[0:1]),
1108 (False, None, -1, cborutil.SPECIAL_NONE),
1108 (False, None, -1, cborutil.SPECIAL_NONE),
1109 )
1109 )
1110 self.assertEqual(
1110 self.assertEqual(
1111 cborutil.decodeitem(encoded[0:2]),
1111 cborutil.decodeitem(encoded[0:2]),
1112 (True, 24, 2, cborutil.SPECIAL_START_MAP),
1112 (True, 24, 2, cborutil.SPECIAL_START_MAP),
1113 )
1113 )
1114 self.assertEqual(
1114 self.assertEqual(
1115 cborutil.decodeitem(encoded[0:3]),
1115 cborutil.decodeitem(encoded[0:3]),
1116 (True, 24, 2, cborutil.SPECIAL_START_MAP),
1116 (True, 24, 2, cborutil.SPECIAL_START_MAP),
1117 )
1117 )
1118
1118
1119 source = {b'key%d' % i: None for i in range(256)}
1119 source = {b'key%d' % i: None for i in range(256)}
1120 encoded = b''.join(cborutil.streamencode(source))
1120 encoded = b''.join(cborutil.streamencode(source))
1121 self.assertEqual(
1121 self.assertEqual(
1122 cborutil.decodeitem(encoded[0:1]),
1122 cborutil.decodeitem(encoded[0:1]),
1123 (False, None, -2, cborutil.SPECIAL_NONE),
1123 (False, None, -2, cborutil.SPECIAL_NONE),
1124 )
1124 )
1125 self.assertEqual(
1125 self.assertEqual(
1126 cborutil.decodeitem(encoded[0:2]),
1126 cborutil.decodeitem(encoded[0:2]),
1127 (False, None, -1, cborutil.SPECIAL_NONE),
1127 (False, None, -1, cborutil.SPECIAL_NONE),
1128 )
1128 )
1129 self.assertEqual(
1129 self.assertEqual(
1130 cborutil.decodeitem(encoded[0:3]),
1130 cborutil.decodeitem(encoded[0:3]),
1131 (True, 256, 3, cborutil.SPECIAL_START_MAP),
1131 (True, 256, 3, cborutil.SPECIAL_START_MAP),
1132 )
1132 )
1133 self.assertEqual(
1133 self.assertEqual(
1134 cborutil.decodeitem(encoded[0:4]),
1134 cborutil.decodeitem(encoded[0:4]),
1135 (True, 256, 3, cborutil.SPECIAL_START_MAP),
1135 (True, 256, 3, cborutil.SPECIAL_START_MAP),
1136 )
1136 )
1137
1137
1138 source = {b'key%d' % i: None for i in range(65536)}
1138 source = {b'key%d' % i: None for i in range(65536)}
1139 encoded = b''.join(cborutil.streamencode(source))
1139 encoded = b''.join(cborutil.streamencode(source))
1140 self.assertEqual(
1140 self.assertEqual(
1141 cborutil.decodeitem(encoded[0:1]),
1141 cborutil.decodeitem(encoded[0:1]),
1142 (False, None, -4, cborutil.SPECIAL_NONE),
1142 (False, None, -4, cborutil.SPECIAL_NONE),
1143 )
1143 )
1144 self.assertEqual(
1144 self.assertEqual(
1145 cborutil.decodeitem(encoded[0:2]),
1145 cborutil.decodeitem(encoded[0:2]),
1146 (False, None, -3, cborutil.SPECIAL_NONE),
1146 (False, None, -3, cborutil.SPECIAL_NONE),
1147 )
1147 )
1148 self.assertEqual(
1148 self.assertEqual(
1149 cborutil.decodeitem(encoded[0:3]),
1149 cborutil.decodeitem(encoded[0:3]),
1150 (False, None, -2, cborutil.SPECIAL_NONE),
1150 (False, None, -2, cborutil.SPECIAL_NONE),
1151 )
1151 )
1152 self.assertEqual(
1152 self.assertEqual(
1153 cborutil.decodeitem(encoded[0:4]),
1153 cborutil.decodeitem(encoded[0:4]),
1154 (False, None, -1, cborutil.SPECIAL_NONE),
1154 (False, None, -1, cborutil.SPECIAL_NONE),
1155 )
1155 )
1156 self.assertEqual(
1156 self.assertEqual(
1157 cborutil.decodeitem(encoded[0:5]),
1157 cborutil.decodeitem(encoded[0:5]),
1158 (True, 65536, 5, cborutil.SPECIAL_START_MAP),
1158 (True, 65536, 5, cborutil.SPECIAL_START_MAP),
1159 )
1159 )
1160 self.assertEqual(
1160 self.assertEqual(
1161 cborutil.decodeitem(encoded[0:6]),
1161 cborutil.decodeitem(encoded[0:6]),
1162 (True, 65536, 5, cborutil.SPECIAL_START_MAP),
1162 (True, 65536, 5, cborutil.SPECIAL_START_MAP),
1163 )
1163 )
1164
1164
1165
1165
1166 class SemanticTagTests(TestCase):
1166 class SemanticTagTests(TestCase):
1167 def testdecodeforbidden(self):
1167 def testdecodeforbidden(self):
1168 for i in range(500):
1168 for i in range(500):
1169 if i == cborutil.SEMANTIC_TAG_FINITE_SET:
1169 if i == cborutil.SEMANTIC_TAG_FINITE_SET:
1170 continue
1170 continue
1171
1171
1172 tag = cborutil.encodelength(cborutil.MAJOR_TYPE_SEMANTIC, i)
1172 tag = cborutil.encodelength(cborutil.MAJOR_TYPE_SEMANTIC, i)
1173
1173
1174 encoded = tag + cborutil.encodelength(cborutil.MAJOR_TYPE_UINT, 42)
1174 encoded = tag + cborutil.encodelength(cborutil.MAJOR_TYPE_UINT, 42)
1175
1175
1176 # Partial decode is incomplete.
1176 # Partial decode is incomplete.
1177 if i < 24:
1177 if i < 24:
1178 pass
1178 pass
1179 elif i < 256:
1179 elif i < 256:
1180 self.assertEqual(
1180 self.assertEqual(
1181 cborutil.decodeitem(encoded[0:1]),
1181 cborutil.decodeitem(encoded[0:1]),
1182 (False, None, -1, cborutil.SPECIAL_NONE),
1182 (False, None, -1, cborutil.SPECIAL_NONE),
1183 )
1183 )
1184 elif i < 65536:
1184 elif i < 65536:
1185 self.assertEqual(
1185 self.assertEqual(
1186 cborutil.decodeitem(encoded[0:1]),
1186 cborutil.decodeitem(encoded[0:1]),
1187 (False, None, -2, cborutil.SPECIAL_NONE),
1187 (False, None, -2, cborutil.SPECIAL_NONE),
1188 )
1188 )
1189 self.assertEqual(
1189 self.assertEqual(
1190 cborutil.decodeitem(encoded[0:2]),
1190 cborutil.decodeitem(encoded[0:2]),
1191 (False, None, -1, cborutil.SPECIAL_NONE),
1191 (False, None, -1, cborutil.SPECIAL_NONE),
1192 )
1192 )
1193
1193
1194 with self.assertRaisesRegex(
1194 with self.assertRaisesRegex(
1195 cborutil.CBORDecodeError, r'semantic tag \d+ not allowed'
1195 cborutil.CBORDecodeError, r'semantic tag \d+ not allowed'
1196 ):
1196 ):
1197 cborutil.decodeitem(encoded)
1197 cborutil.decodeitem(encoded)
1198
1198
1199
1199
1200 class SpecialTypesTests(TestCase):
1200 class SpecialTypesTests(TestCase):
1201 def testforbiddentypes(self):
1201 def testforbiddentypes(self):
1202 for i in range(256):
1202 for i in range(256):
1203 if i == cborutil.SUBTYPE_FALSE:
1203 if i == cborutil.SUBTYPE_FALSE:
1204 continue
1204 continue
1205 elif i == cborutil.SUBTYPE_TRUE:
1205 elif i == cborutil.SUBTYPE_TRUE:
1206 continue
1206 continue
1207 elif i == cborutil.SUBTYPE_NULL:
1207 elif i == cborutil.SUBTYPE_NULL:
1208 continue
1208 continue
1209
1209
1210 encoded = cborutil.encodelength(cborutil.MAJOR_TYPE_SPECIAL, i)
1210 encoded = cborutil.encodelength(cborutil.MAJOR_TYPE_SPECIAL, i)
1211
1211
1212 with self.assertRaisesRegex(
1212 with self.assertRaisesRegex(
1213 cborutil.CBORDecodeError, r'special type \d+ not allowed'
1213 cborutil.CBORDecodeError, r'special type \d+ not allowed'
1214 ):
1214 ):
1215 cborutil.decodeitem(encoded)
1215 cborutil.decodeitem(encoded)
1216
1216
1217
1217
1218 class SansIODecoderTests(TestCase):
1218 class SansIODecoderTests(TestCase):
1219 def testemptyinput(self):
1219 def testemptyinput(self):
1220 decoder = cborutil.sansiodecoder()
1220 decoder = cborutil.sansiodecoder()
1221 self.assertEqual(decoder.decode(b''), (False, 0, 0))
1221 self.assertEqual(decoder.decode(b''), (False, 0, 0))
1222
1222
1223
1223
1224 class BufferingDecoderTests(TestCase):
1224 class BufferingDecoderTests(TestCase):
1225 def testsimple(self):
1225 def testsimple(self):
1226 source = [
1226 source = [
1227 b'foobar',
1227 b'foobar',
1228 b'x' * 128,
1228 b'x' * 128,
1229 {b'foo': b'bar'},
1229 {b'foo': b'bar'},
1230 True,
1230 True,
1231 False,
1231 False,
1232 None,
1232 None,
1233 [None for i in range(128)],
1233 [None] * 128,
1234 ]
1234 ]
1235
1235
1236 encoded = b''.join(cborutil.streamencode(source))
1236 encoded = b''.join(cborutil.streamencode(source))
1237
1237
1238 for step in range(1, 32):
1238 for step in range(1, 32):
1239 decoder = cborutil.bufferingdecoder()
1239 decoder = cborutil.bufferingdecoder()
1240 start = 0
1240 start = 0
1241
1241
1242 while start < len(encoded):
1242 while start < len(encoded):
1243 decoder.decode(encoded[start : start + step])
1243 decoder.decode(encoded[start : start + step])
1244 start += step
1244 start += step
1245
1245
1246 self.assertEqual(decoder.getavailable(), [source])
1246 self.assertEqual(decoder.getavailable(), [source])
1247
1247
1248 def testbytearray(self):
1248 def testbytearray(self):
1249 source = b''.join(cborutil.streamencode(b'foobar'))
1249 source = b''.join(cborutil.streamencode(b'foobar'))
1250
1250
1251 decoder = cborutil.bufferingdecoder()
1251 decoder = cborutil.bufferingdecoder()
1252 decoder.decode(bytearray(source))
1252 decoder.decode(bytearray(source))
1253
1253
1254 self.assertEqual(decoder.getavailable(), [b'foobar'])
1254 self.assertEqual(decoder.getavailable(), [b'foobar'])
1255
1255
1256
1256
1257 class DecodeallTests(TestCase):
1257 class DecodeallTests(TestCase):
1258 def testemptyinput(self):
1258 def testemptyinput(self):
1259 self.assertEqual(cborutil.decodeall(b''), [])
1259 self.assertEqual(cborutil.decodeall(b''), [])
1260
1260
1261 def testpartialinput(self):
1261 def testpartialinput(self):
1262 encoded = b''.join(
1262 encoded = b''.join(
1263 [b'\x82', b'\x01',] # array of 2 elements # integer 1
1263 [b'\x82', b'\x01',] # array of 2 elements # integer 1
1264 )
1264 )
1265
1265
1266 with self.assertRaisesRegex(
1266 with self.assertRaisesRegex(
1267 cborutil.CBORDecodeError, 'input data not complete'
1267 cborutil.CBORDecodeError, 'input data not complete'
1268 ):
1268 ):
1269 cborutil.decodeall(encoded)
1269 cborutil.decodeall(encoded)
1270
1270
1271
1271
1272 if __name__ == '__main__':
1272 if __name__ == '__main__':
1273 import silenttestrunner
1273 import silenttestrunner
1274
1274
1275 silenttestrunner.main(__name__)
1275 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now