##// END OF EJS Templates
tests: get access to thirdparty.cbor without requiring it to be installed...
Augie Fackler -
r41195:a9905045 default
parent child Browse files
Show More
@@ -1,992 +1,1001 b''
1 1 from __future__ import absolute_import
2 2
3 import os
4 import sys
3 5 import unittest
4 6
5 from mercurial.thirdparty import (
6 cbor,
7 )
7 # TODO migrate to canned cbor test strings and stop using thirdparty.cbor
8 tpp = os.path.normpath(os.path.join(os.path.dirname(__file__),
9 '..', 'mercurial', 'thirdparty'))
10 if not os.path.exists(tpp):
11 # skip, not in a repo
12 sys.exit(80)
13 sys.path[0:0] = [tpp]
14 import cbor
15 del sys.path[0]
16
8 17 from mercurial.utils import (
9 18 cborutil,
10 19 )
11 20
12 21 class TestCase(unittest.TestCase):
13 22 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
14 23 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
15 24 # the regex version.
16 25 assertRaisesRegex = (# camelcase-required
17 26 unittest.TestCase.assertRaisesRegexp)
18 27
19 28 def loadit(it):
20 29 return cbor.loads(b''.join(it))
21 30
22 31 class BytestringTests(TestCase):
23 32 def testsimple(self):
24 33 self.assertEqual(
25 34 list(cborutil.streamencode(b'foobar')),
26 35 [b'\x46', b'foobar'])
27 36
28 37 self.assertEqual(
29 38 loadit(cborutil.streamencode(b'foobar')),
30 39 b'foobar')
31 40
32 41 self.assertEqual(cborutil.decodeall(b'\x46foobar'),
33 42 [b'foobar'])
34 43
35 44 self.assertEqual(cborutil.decodeall(b'\x46foobar\x45fizbi'),
36 45 [b'foobar', b'fizbi'])
37 46
38 47 def testlong(self):
39 48 source = b'x' * 1048576
40 49
41 50 self.assertEqual(loadit(cborutil.streamencode(source)), source)
42 51
43 52 encoded = b''.join(cborutil.streamencode(source))
44 53 self.assertEqual(cborutil.decodeall(encoded), [source])
45 54
46 55 def testfromiter(self):
47 56 # This is the example from RFC 7049 Section 2.2.2.
48 57 source = [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99']
49 58
50 59 self.assertEqual(
51 60 list(cborutil.streamencodebytestringfromiter(source)),
52 61 [
53 62 b'\x5f',
54 63 b'\x44',
55 64 b'\xaa\xbb\xcc\xdd',
56 65 b'\x43',
57 66 b'\xee\xff\x99',
58 67 b'\xff',
59 68 ])
60 69
61 70 self.assertEqual(
62 71 loadit(cborutil.streamencodebytestringfromiter(source)),
63 72 b''.join(source))
64 73
65 74 self.assertEqual(cborutil.decodeall(b'\x5f\x44\xaa\xbb\xcc\xdd'
66 75 b'\x43\xee\xff\x99\xff'),
67 76 [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99', b''])
68 77
69 78 for i, chunk in enumerate(
70 79 cborutil.decodeall(b'\x5f\x44\xaa\xbb\xcc\xdd'
71 80 b'\x43\xee\xff\x99\xff')):
72 81 self.assertIsInstance(chunk, cborutil.bytestringchunk)
73 82
74 83 if i == 0:
75 84 self.assertTrue(chunk.isfirst)
76 85 else:
77 86 self.assertFalse(chunk.isfirst)
78 87
79 88 if i == 2:
80 89 self.assertTrue(chunk.islast)
81 90 else:
82 91 self.assertFalse(chunk.islast)
83 92
84 93 def testfromiterlarge(self):
85 94 source = [b'a' * 16, b'b' * 128, b'c' * 1024, b'd' * 1048576]
86 95
87 96 self.assertEqual(
88 97 loadit(cborutil.streamencodebytestringfromiter(source)),
89 98 b''.join(source))
90 99
91 100 def testindefinite(self):
92 101 source = b'\x00\x01\x02\x03' + b'\xff' * 16384
93 102
94 103 it = cborutil.streamencodeindefinitebytestring(source, chunksize=2)
95 104
96 105 self.assertEqual(next(it), b'\x5f')
97 106 self.assertEqual(next(it), b'\x42')
98 107 self.assertEqual(next(it), b'\x00\x01')
99 108 self.assertEqual(next(it), b'\x42')
100 109 self.assertEqual(next(it), b'\x02\x03')
101 110 self.assertEqual(next(it), b'\x42')
102 111 self.assertEqual(next(it), b'\xff\xff')
103 112
104 113 dest = b''.join(cborutil.streamencodeindefinitebytestring(
105 114 source, chunksize=42))
106 115 self.assertEqual(cbor.loads(dest), source)
107 116
108 117 self.assertEqual(b''.join(cborutil.decodeall(dest)), source)
109 118
110 119 for chunk in cborutil.decodeall(dest):
111 120 self.assertIsInstance(chunk, cborutil.bytestringchunk)
112 121 self.assertIn(len(chunk), (0, 8, 42))
113 122
114 123 encoded = b'\x5f\xff'
115 124 b = cborutil.decodeall(encoded)
116 125 self.assertEqual(b, [b''])
117 126 self.assertTrue(b[0].isfirst)
118 127 self.assertTrue(b[0].islast)
119 128
120 129 def testdecodevariouslengths(self):
121 130 for i in (0, 1, 22, 23, 24, 25, 254, 255, 256, 65534, 65535, 65536):
122 131 source = b'x' * i
123 132 encoded = b''.join(cborutil.streamencode(source))
124 133
125 134 if len(source) < 24:
126 135 hlen = 1
127 136 elif len(source) < 256:
128 137 hlen = 2
129 138 elif len(source) < 65536:
130 139 hlen = 3
131 140 elif len(source) < 1048576:
132 141 hlen = 5
133 142
134 143 self.assertEqual(cborutil.decodeitem(encoded),
135 144 (True, source, hlen + len(source),
136 145 cborutil.SPECIAL_NONE))
137 146
138 147 def testpartialdecode(self):
139 148 encoded = b''.join(cborutil.streamencode(b'foobar'))
140 149
141 150 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
142 151 (False, None, -6, cborutil.SPECIAL_NONE))
143 152 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
144 153 (False, None, -5, cborutil.SPECIAL_NONE))
145 154 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
146 155 (False, None, -4, cborutil.SPECIAL_NONE))
147 156 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
148 157 (False, None, -3, cborutil.SPECIAL_NONE))
149 158 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
150 159 (False, None, -2, cborutil.SPECIAL_NONE))
151 160 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
152 161 (False, None, -1, cborutil.SPECIAL_NONE))
153 162 self.assertEqual(cborutil.decodeitem(encoded[0:7]),
154 163 (True, b'foobar', 7, cborutil.SPECIAL_NONE))
155 164
156 165 def testpartialdecodevariouslengths(self):
157 166 lens = [
158 167 2,
159 168 3,
160 169 10,
161 170 23,
162 171 24,
163 172 25,
164 173 31,
165 174 100,
166 175 254,
167 176 255,
168 177 256,
169 178 257,
170 179 16384,
171 180 65534,
172 181 65535,
173 182 65536,
174 183 65537,
175 184 131071,
176 185 131072,
177 186 131073,
178 187 1048575,
179 188 1048576,
180 189 1048577,
181 190 ]
182 191
183 192 for size in lens:
184 193 if size < 24:
185 194 hlen = 1
186 195 elif size < 2**8:
187 196 hlen = 2
188 197 elif size < 2**16:
189 198 hlen = 3
190 199 elif size < 2**32:
191 200 hlen = 5
192 201 else:
193 202 assert False
194 203
195 204 source = b'x' * size
196 205 encoded = b''.join(cborutil.streamencode(source))
197 206
198 207 res = cborutil.decodeitem(encoded[0:1])
199 208
200 209 if hlen > 1:
201 210 self.assertEqual(res, (False, None, -(hlen - 1),
202 211 cborutil.SPECIAL_NONE))
203 212 else:
204 213 self.assertEqual(res, (False, None, -(size + hlen - 1),
205 214 cborutil.SPECIAL_NONE))
206 215
207 216 # Decoding partial header reports remaining header size.
208 217 for i in range(hlen - 1):
209 218 self.assertEqual(cborutil.decodeitem(encoded[0:i + 1]),
210 219 (False, None, -(hlen - i - 1),
211 220 cborutil.SPECIAL_NONE))
212 221
213 222 # Decoding complete header reports item size.
214 223 self.assertEqual(cborutil.decodeitem(encoded[0:hlen]),
215 224 (False, None, -size, cborutil.SPECIAL_NONE))
216 225
217 226 # Decoding single byte after header reports item size - 1
218 227 self.assertEqual(cborutil.decodeitem(encoded[0:hlen + 1]),
219 228 (False, None, -(size - 1), cborutil.SPECIAL_NONE))
220 229
221 230 # Decoding all but the last byte reports -1 needed.
222 231 self.assertEqual(cborutil.decodeitem(encoded[0:hlen + size - 1]),
223 232 (False, None, -1, cborutil.SPECIAL_NONE))
224 233
225 234 # Decoding last byte retrieves value.
226 235 self.assertEqual(cborutil.decodeitem(encoded[0:hlen + size]),
227 236 (True, source, hlen + size, cborutil.SPECIAL_NONE))
228 237
229 238 def testindefinitepartialdecode(self):
230 239 encoded = b''.join(cborutil.streamencodebytestringfromiter(
231 240 [b'foobar', b'biz']))
232 241
233 242 # First item should be begin of bytestring special.
234 243 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
235 244 (True, None, 1,
236 245 cborutil.SPECIAL_START_INDEFINITE_BYTESTRING))
237 246
238 247 # Second item should be the first chunk. But only available when
239 248 # we give it 7 bytes (1 byte header + 6 byte chunk).
240 249 self.assertEqual(cborutil.decodeitem(encoded[1:2]),
241 250 (False, None, -6, cborutil.SPECIAL_NONE))
242 251 self.assertEqual(cborutil.decodeitem(encoded[1:3]),
243 252 (False, None, -5, cborutil.SPECIAL_NONE))
244 253 self.assertEqual(cborutil.decodeitem(encoded[1:4]),
245 254 (False, None, -4, cborutil.SPECIAL_NONE))
246 255 self.assertEqual(cborutil.decodeitem(encoded[1:5]),
247 256 (False, None, -3, cborutil.SPECIAL_NONE))
248 257 self.assertEqual(cborutil.decodeitem(encoded[1:6]),
249 258 (False, None, -2, cborutil.SPECIAL_NONE))
250 259 self.assertEqual(cborutil.decodeitem(encoded[1:7]),
251 260 (False, None, -1, cborutil.SPECIAL_NONE))
252 261
253 262 self.assertEqual(cborutil.decodeitem(encoded[1:8]),
254 263 (True, b'foobar', 7, cborutil.SPECIAL_NONE))
255 264
256 265 # Third item should be second chunk. But only available when
257 266 # we give it 4 bytes (1 byte header + 3 byte chunk).
258 267 self.assertEqual(cborutil.decodeitem(encoded[8:9]),
259 268 (False, None, -3, cborutil.SPECIAL_NONE))
260 269 self.assertEqual(cborutil.decodeitem(encoded[8:10]),
261 270 (False, None, -2, cborutil.SPECIAL_NONE))
262 271 self.assertEqual(cborutil.decodeitem(encoded[8:11]),
263 272 (False, None, -1, cborutil.SPECIAL_NONE))
264 273
265 274 self.assertEqual(cborutil.decodeitem(encoded[8:12]),
266 275 (True, b'biz', 4, cborutil.SPECIAL_NONE))
267 276
268 277 # Fourth item should be end of indefinite stream marker.
269 278 self.assertEqual(cborutil.decodeitem(encoded[12:13]),
270 279 (True, None, 1, cborutil.SPECIAL_INDEFINITE_BREAK))
271 280
272 281 # Now test the behavior when going through the decoder.
273 282
274 283 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:1]),
275 284 (False, 1, 0))
276 285 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:2]),
277 286 (False, 1, 6))
278 287 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:3]),
279 288 (False, 1, 5))
280 289 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:4]),
281 290 (False, 1, 4))
282 291 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:5]),
283 292 (False, 1, 3))
284 293 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:6]),
285 294 (False, 1, 2))
286 295 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:7]),
287 296 (False, 1, 1))
288 297 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:8]),
289 298 (True, 8, 0))
290 299
291 300 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:9]),
292 301 (True, 8, 3))
293 302 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:10]),
294 303 (True, 8, 2))
295 304 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:11]),
296 305 (True, 8, 1))
297 306 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:12]),
298 307 (True, 12, 0))
299 308
300 309 self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:13]),
301 310 (True, 13, 0))
302 311
303 312 decoder = cborutil.sansiodecoder()
304 313 decoder.decode(encoded[0:8])
305 314 values = decoder.getavailable()
306 315 self.assertEqual(values, [b'foobar'])
307 316 self.assertTrue(values[0].isfirst)
308 317 self.assertFalse(values[0].islast)
309 318
310 319 self.assertEqual(decoder.decode(encoded[8:12]),
311 320 (True, 4, 0))
312 321 values = decoder.getavailable()
313 322 self.assertEqual(values, [b'biz'])
314 323 self.assertFalse(values[0].isfirst)
315 324 self.assertFalse(values[0].islast)
316 325
317 326 self.assertEqual(decoder.decode(encoded[12:]),
318 327 (True, 1, 0))
319 328 values = decoder.getavailable()
320 329 self.assertEqual(values, [b''])
321 330 self.assertFalse(values[0].isfirst)
322 331 self.assertTrue(values[0].islast)
323 332
324 333 class StringTests(TestCase):
325 334 def testdecodeforbidden(self):
326 335 encoded = b'\x63foo'
327 336 with self.assertRaisesRegex(cborutil.CBORDecodeError,
328 337 'string major type not supported'):
329 338 cborutil.decodeall(encoded)
330 339
331 340 class IntTests(TestCase):
332 341 def testsmall(self):
333 342 self.assertEqual(list(cborutil.streamencode(0)), [b'\x00'])
334 343 self.assertEqual(cborutil.decodeall(b'\x00'), [0])
335 344
336 345 self.assertEqual(list(cborutil.streamencode(1)), [b'\x01'])
337 346 self.assertEqual(cborutil.decodeall(b'\x01'), [1])
338 347
339 348 self.assertEqual(list(cborutil.streamencode(2)), [b'\x02'])
340 349 self.assertEqual(cborutil.decodeall(b'\x02'), [2])
341 350
342 351 self.assertEqual(list(cborutil.streamencode(3)), [b'\x03'])
343 352 self.assertEqual(cborutil.decodeall(b'\x03'), [3])
344 353
345 354 self.assertEqual(list(cborutil.streamencode(4)), [b'\x04'])
346 355 self.assertEqual(cborutil.decodeall(b'\x04'), [4])
347 356
348 357 # Multiple value decode works.
349 358 self.assertEqual(cborutil.decodeall(b'\x00\x01\x02\x03\x04'),
350 359 [0, 1, 2, 3, 4])
351 360
352 361 def testnegativesmall(self):
353 362 self.assertEqual(list(cborutil.streamencode(-1)), [b'\x20'])
354 363 self.assertEqual(cborutil.decodeall(b'\x20'), [-1])
355 364
356 365 self.assertEqual(list(cborutil.streamencode(-2)), [b'\x21'])
357 366 self.assertEqual(cborutil.decodeall(b'\x21'), [-2])
358 367
359 368 self.assertEqual(list(cborutil.streamencode(-3)), [b'\x22'])
360 369 self.assertEqual(cborutil.decodeall(b'\x22'), [-3])
361 370
362 371 self.assertEqual(list(cborutil.streamencode(-4)), [b'\x23'])
363 372 self.assertEqual(cborutil.decodeall(b'\x23'), [-4])
364 373
365 374 self.assertEqual(list(cborutil.streamencode(-5)), [b'\x24'])
366 375 self.assertEqual(cborutil.decodeall(b'\x24'), [-5])
367 376
368 377 # Multiple value decode works.
369 378 self.assertEqual(cborutil.decodeall(b'\x20\x21\x22\x23\x24'),
370 379 [-1, -2, -3, -4, -5])
371 380
372 381 def testrange(self):
373 382 for i in range(-70000, 70000, 10):
374 383 encoded = b''.join(cborutil.streamencode(i))
375 384
376 385 self.assertEqual(encoded, cbor.dumps(i))
377 386 self.assertEqual(cborutil.decodeall(encoded), [i])
378 387
379 388 def testdecodepartialubyte(self):
380 389 encoded = b''.join(cborutil.streamencode(250))
381 390
382 391 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
383 392 (False, None, -1, cborutil.SPECIAL_NONE))
384 393 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
385 394 (True, 250, 2, cborutil.SPECIAL_NONE))
386 395
387 396 def testdecodepartialbyte(self):
388 397 encoded = b''.join(cborutil.streamencode(-42))
389 398 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
390 399 (False, None, -1, cborutil.SPECIAL_NONE))
391 400 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
392 401 (True, -42, 2, cborutil.SPECIAL_NONE))
393 402
394 403 def testdecodepartialushort(self):
395 404 encoded = b''.join(cborutil.streamencode(2**15))
396 405
397 406 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
398 407 (False, None, -2, cborutil.SPECIAL_NONE))
399 408 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
400 409 (False, None, -1, cborutil.SPECIAL_NONE))
401 410 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
402 411 (True, 2**15, 3, cborutil.SPECIAL_NONE))
403 412
404 413 def testdecodepartialshort(self):
405 414 encoded = b''.join(cborutil.streamencode(-1024))
406 415
407 416 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
408 417 (False, None, -2, cborutil.SPECIAL_NONE))
409 418 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
410 419 (False, None, -1, cborutil.SPECIAL_NONE))
411 420 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
412 421 (True, -1024, 3, cborutil.SPECIAL_NONE))
413 422
414 423 def testdecodepartialulong(self):
415 424 encoded = b''.join(cborutil.streamencode(2**28))
416 425
417 426 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
418 427 (False, None, -4, cborutil.SPECIAL_NONE))
419 428 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
420 429 (False, None, -3, cborutil.SPECIAL_NONE))
421 430 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
422 431 (False, None, -2, cborutil.SPECIAL_NONE))
423 432 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
424 433 (False, None, -1, cborutil.SPECIAL_NONE))
425 434 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
426 435 (True, 2**28, 5, cborutil.SPECIAL_NONE))
427 436
428 437 def testdecodepartiallong(self):
429 438 encoded = b''.join(cborutil.streamencode(-1048580))
430 439
431 440 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
432 441 (False, None, -4, cborutil.SPECIAL_NONE))
433 442 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
434 443 (False, None, -3, cborutil.SPECIAL_NONE))
435 444 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
436 445 (False, None, -2, cborutil.SPECIAL_NONE))
437 446 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
438 447 (False, None, -1, cborutil.SPECIAL_NONE))
439 448 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
440 449 (True, -1048580, 5, cborutil.SPECIAL_NONE))
441 450
442 451 def testdecodepartialulonglong(self):
443 452 encoded = b''.join(cborutil.streamencode(2**32))
444 453
445 454 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
446 455 (False, None, -8, cborutil.SPECIAL_NONE))
447 456 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
448 457 (False, None, -7, cborutil.SPECIAL_NONE))
449 458 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
450 459 (False, None, -6, cborutil.SPECIAL_NONE))
451 460 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
452 461 (False, None, -5, cborutil.SPECIAL_NONE))
453 462 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
454 463 (False, None, -4, cborutil.SPECIAL_NONE))
455 464 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
456 465 (False, None, -3, cborutil.SPECIAL_NONE))
457 466 self.assertEqual(cborutil.decodeitem(encoded[0:7]),
458 467 (False, None, -2, cborutil.SPECIAL_NONE))
459 468 self.assertEqual(cborutil.decodeitem(encoded[0:8]),
460 469 (False, None, -1, cborutil.SPECIAL_NONE))
461 470 self.assertEqual(cborutil.decodeitem(encoded[0:9]),
462 471 (True, 2**32, 9, cborutil.SPECIAL_NONE))
463 472
464 473 with self.assertRaisesRegex(
465 474 cborutil.CBORDecodeError, 'input data not fully consumed'):
466 475 cborutil.decodeall(encoded[0:1])
467 476
468 477 with self.assertRaisesRegex(
469 478 cborutil.CBORDecodeError, 'input data not fully consumed'):
470 479 cborutil.decodeall(encoded[0:2])
471 480
472 481 def testdecodepartiallonglong(self):
473 482 encoded = b''.join(cborutil.streamencode(-7000000000))
474 483
475 484 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
476 485 (False, None, -8, cborutil.SPECIAL_NONE))
477 486 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
478 487 (False, None, -7, cborutil.SPECIAL_NONE))
479 488 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
480 489 (False, None, -6, cborutil.SPECIAL_NONE))
481 490 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
482 491 (False, None, -5, cborutil.SPECIAL_NONE))
483 492 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
484 493 (False, None, -4, cborutil.SPECIAL_NONE))
485 494 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
486 495 (False, None, -3, cborutil.SPECIAL_NONE))
487 496 self.assertEqual(cborutil.decodeitem(encoded[0:7]),
488 497 (False, None, -2, cborutil.SPECIAL_NONE))
489 498 self.assertEqual(cborutil.decodeitem(encoded[0:8]),
490 499 (False, None, -1, cborutil.SPECIAL_NONE))
491 500 self.assertEqual(cborutil.decodeitem(encoded[0:9]),
492 501 (True, -7000000000, 9, cborutil.SPECIAL_NONE))
493 502
494 503 class ArrayTests(TestCase):
495 504 def testempty(self):
496 505 self.assertEqual(list(cborutil.streamencode([])), [b'\x80'])
497 506 self.assertEqual(loadit(cborutil.streamencode([])), [])
498 507
499 508 self.assertEqual(cborutil.decodeall(b'\x80'), [[]])
500 509
501 510 def testbasic(self):
502 511 source = [b'foo', b'bar', 1, -10]
503 512
504 513 chunks = [
505 514 b'\x84', b'\x43', b'foo', b'\x43', b'bar', b'\x01', b'\x29']
506 515
507 516 self.assertEqual(list(cborutil.streamencode(source)), chunks)
508 517
509 518 self.assertEqual(cborutil.decodeall(b''.join(chunks)), [source])
510 519
511 520 def testemptyfromiter(self):
512 521 self.assertEqual(b''.join(cborutil.streamencodearrayfromiter([])),
513 522 b'\x9f\xff')
514 523
515 524 with self.assertRaisesRegex(cborutil.CBORDecodeError,
516 525 'indefinite length uint not allowed'):
517 526 cborutil.decodeall(b'\x9f\xff')
518 527
519 528 def testfromiter1(self):
520 529 source = [b'foo']
521 530
522 531 self.assertEqual(list(cborutil.streamencodearrayfromiter(source)), [
523 532 b'\x9f',
524 533 b'\x43', b'foo',
525 534 b'\xff',
526 535 ])
527 536
528 537 dest = b''.join(cborutil.streamencodearrayfromiter(source))
529 538 self.assertEqual(cbor.loads(dest), source)
530 539
531 540 with self.assertRaisesRegex(cborutil.CBORDecodeError,
532 541 'indefinite length uint not allowed'):
533 542 cborutil.decodeall(dest)
534 543
535 544 def testtuple(self):
536 545 source = (b'foo', None, 42)
537 546 encoded = b''.join(cborutil.streamencode(source))
538 547
539 548 self.assertEqual(cbor.loads(encoded), list(source))
540 549
541 550 self.assertEqual(cborutil.decodeall(encoded), [list(source)])
542 551
543 552 def testpartialdecode(self):
544 553 source = list(range(4))
545 554 encoded = b''.join(cborutil.streamencode(source))
546 555 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
547 556 (True, 4, 1, cborutil.SPECIAL_START_ARRAY))
548 557 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
549 558 (True, 4, 1, cborutil.SPECIAL_START_ARRAY))
550 559
551 560 source = list(range(23))
552 561 encoded = b''.join(cborutil.streamencode(source))
553 562 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
554 563 (True, 23, 1, cborutil.SPECIAL_START_ARRAY))
555 564 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
556 565 (True, 23, 1, cborutil.SPECIAL_START_ARRAY))
557 566
558 567 source = list(range(24))
559 568 encoded = b''.join(cborutil.streamencode(source))
560 569 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
561 570 (False, None, -1, cborutil.SPECIAL_NONE))
562 571 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
563 572 (True, 24, 2, cborutil.SPECIAL_START_ARRAY))
564 573 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
565 574 (True, 24, 2, cborutil.SPECIAL_START_ARRAY))
566 575
567 576 source = list(range(256))
568 577 encoded = b''.join(cborutil.streamencode(source))
569 578 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
570 579 (False, None, -2, cborutil.SPECIAL_NONE))
571 580 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
572 581 (False, None, -1, cborutil.SPECIAL_NONE))
573 582 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
574 583 (True, 256, 3, cborutil.SPECIAL_START_ARRAY))
575 584 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
576 585 (True, 256, 3, cborutil.SPECIAL_START_ARRAY))
577 586
578 587 def testnested(self):
579 588 source = [[], [], [[], [], []]]
580 589 encoded = b''.join(cborutil.streamencode(source))
581 590 self.assertEqual(cborutil.decodeall(encoded), [source])
582 591
583 592 source = [True, None, [True, 0, 2], [None], [], [[[]], -87]]
584 593 encoded = b''.join(cborutil.streamencode(source))
585 594 self.assertEqual(cborutil.decodeall(encoded), [source])
586 595
587 596 # A set within an array.
588 597 source = [None, {b'foo', b'bar', None, False}, set()]
589 598 encoded = b''.join(cborutil.streamencode(source))
590 599 self.assertEqual(cborutil.decodeall(encoded), [source])
591 600
592 601 # A map within an array.
593 602 source = [None, {}, {b'foo': b'bar', True: False}, [{}]]
594 603 encoded = b''.join(cborutil.streamencode(source))
595 604 self.assertEqual(cborutil.decodeall(encoded), [source])
596 605
597 606 def testindefinitebytestringvalues(self):
598 607 # Single value array whose value is an empty indefinite bytestring.
599 608 encoded = b'\x81\x5f\x40\xff'
600 609
601 610 with self.assertRaisesRegex(cborutil.CBORDecodeError,
602 611 'indefinite length bytestrings not '
603 612 'allowed as array values'):
604 613 cborutil.decodeall(encoded)
605 614
606 615 class SetTests(TestCase):
607 616 def testempty(self):
608 617 self.assertEqual(list(cborutil.streamencode(set())), [
609 618 b'\xd9\x01\x02',
610 619 b'\x80',
611 620 ])
612 621
613 622 self.assertEqual(cborutil.decodeall(b'\xd9\x01\x02\x80'), [set()])
614 623
615 624 def testset(self):
616 625 source = {b'foo', None, 42}
617 626 encoded = b''.join(cborutil.streamencode(source))
618 627
619 628 self.assertEqual(cbor.loads(encoded), source)
620 629
621 630 self.assertEqual(cborutil.decodeall(encoded), [source])
622 631
623 632 def testinvalidtag(self):
624 633 # Must use array to encode sets.
625 634 encoded = b'\xd9\x01\x02\xa0'
626 635
627 636 with self.assertRaisesRegex(cborutil.CBORDecodeError,
628 637 'expected array after finite set '
629 638 'semantic tag'):
630 639 cborutil.decodeall(encoded)
631 640
632 641 def testpartialdecode(self):
633 642 # Semantic tag item will be 3 bytes. Set header will be variable
634 643 # depending on length.
635 644 encoded = b''.join(cborutil.streamencode({i for i in range(23)}))
636 645 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
637 646 (False, None, -2, cborutil.SPECIAL_NONE))
638 647 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
639 648 (False, None, -1, cborutil.SPECIAL_NONE))
640 649 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
641 650 (False, None, -1, cborutil.SPECIAL_NONE))
642 651 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
643 652 (True, 23, 4, cborutil.SPECIAL_START_SET))
644 653 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
645 654 (True, 23, 4, cborutil.SPECIAL_START_SET))
646 655
647 656 encoded = b''.join(cborutil.streamencode({i for i in range(24)}))
648 657 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
649 658 (False, None, -2, cborutil.SPECIAL_NONE))
650 659 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
651 660 (False, None, -1, cborutil.SPECIAL_NONE))
652 661 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
653 662 (False, None, -1, cborutil.SPECIAL_NONE))
654 663 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
655 664 (False, None, -1, cborutil.SPECIAL_NONE))
656 665 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
657 666 (True, 24, 5, cborutil.SPECIAL_START_SET))
658 667 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
659 668 (True, 24, 5, cborutil.SPECIAL_START_SET))
660 669
661 670 encoded = b''.join(cborutil.streamencode({i for i in range(256)}))
662 671 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
663 672 (False, None, -2, cborutil.SPECIAL_NONE))
664 673 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
665 674 (False, None, -1, cborutil.SPECIAL_NONE))
666 675 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
667 676 (False, None, -1, cborutil.SPECIAL_NONE))
668 677 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
669 678 (False, None, -2, cborutil.SPECIAL_NONE))
670 679 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
671 680 (False, None, -1, cborutil.SPECIAL_NONE))
672 681 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
673 682 (True, 256, 6, cborutil.SPECIAL_START_SET))
674 683
675 684 def testinvalidvalue(self):
676 685 encoded = b''.join([
677 686 b'\xd9\x01\x02', # semantic tag
678 687 b'\x81', # array of size 1
679 688 b'\x5f\x43foo\xff', # indefinite length bytestring "foo"
680 689 ])
681 690
682 691 with self.assertRaisesRegex(cborutil.CBORDecodeError,
683 692 'indefinite length bytestrings not '
684 693 'allowed as set values'):
685 694 cborutil.decodeall(encoded)
686 695
687 696 encoded = b''.join([
688 697 b'\xd9\x01\x02',
689 698 b'\x81',
690 699 b'\x80', # empty array
691 700 ])
692 701
693 702 with self.assertRaisesRegex(cborutil.CBORDecodeError,
694 703 'collections not allowed as set values'):
695 704 cborutil.decodeall(encoded)
696 705
697 706 encoded = b''.join([
698 707 b'\xd9\x01\x02',
699 708 b'\x81',
700 709 b'\xa0', # empty map
701 710 ])
702 711
703 712 with self.assertRaisesRegex(cborutil.CBORDecodeError,
704 713 'collections not allowed as set values'):
705 714 cborutil.decodeall(encoded)
706 715
707 716 encoded = b''.join([
708 717 b'\xd9\x01\x02',
709 718 b'\x81',
710 719 b'\xd9\x01\x02\x81\x01', # set with integer 1
711 720 ])
712 721
713 722 with self.assertRaisesRegex(cborutil.CBORDecodeError,
714 723 'collections not allowed as set values'):
715 724 cborutil.decodeall(encoded)
716 725
717 726 class BoolTests(TestCase):
718 727 def testbasic(self):
719 728 self.assertEqual(list(cborutil.streamencode(True)), [b'\xf5'])
720 729 self.assertEqual(list(cborutil.streamencode(False)), [b'\xf4'])
721 730
722 731 self.assertIs(loadit(cborutil.streamencode(True)), True)
723 732 self.assertIs(loadit(cborutil.streamencode(False)), False)
724 733
725 734 self.assertEqual(cborutil.decodeall(b'\xf4'), [False])
726 735 self.assertEqual(cborutil.decodeall(b'\xf5'), [True])
727 736
728 737 self.assertEqual(cborutil.decodeall(b'\xf4\xf5\xf5\xf4'),
729 738 [False, True, True, False])
730 739
731 740 class NoneTests(TestCase):
732 741 def testbasic(self):
733 742 self.assertEqual(list(cborutil.streamencode(None)), [b'\xf6'])
734 743
735 744 self.assertIs(loadit(cborutil.streamencode(None)), None)
736 745
737 746 self.assertEqual(cborutil.decodeall(b'\xf6'), [None])
738 747 self.assertEqual(cborutil.decodeall(b'\xf6\xf6'), [None, None])
739 748
740 749 class MapTests(TestCase):
741 750 def testempty(self):
742 751 self.assertEqual(list(cborutil.streamencode({})), [b'\xa0'])
743 752 self.assertEqual(loadit(cborutil.streamencode({})), {})
744 753
745 754 self.assertEqual(cborutil.decodeall(b'\xa0'), [{}])
746 755
747 756 def testemptyindefinite(self):
748 757 self.assertEqual(list(cborutil.streamencodemapfromiter([])), [
749 758 b'\xbf', b'\xff'])
750 759
751 760 self.assertEqual(loadit(cborutil.streamencodemapfromiter([])), {})
752 761
753 762 with self.assertRaisesRegex(cborutil.CBORDecodeError,
754 763 'indefinite length uint not allowed'):
755 764 cborutil.decodeall(b'\xbf\xff')
756 765
757 766 def testone(self):
758 767 source = {b'foo': b'bar'}
759 768 self.assertEqual(list(cborutil.streamencode(source)), [
760 769 b'\xa1', b'\x43', b'foo', b'\x43', b'bar'])
761 770
762 771 self.assertEqual(loadit(cborutil.streamencode(source)), source)
763 772
764 773 self.assertEqual(cborutil.decodeall(b'\xa1\x43foo\x43bar'), [source])
765 774
766 775 def testmultiple(self):
767 776 source = {
768 777 b'foo': b'bar',
769 778 b'baz': b'value1',
770 779 }
771 780
772 781 self.assertEqual(loadit(cborutil.streamencode(source)), source)
773 782
774 783 self.assertEqual(
775 784 loadit(cborutil.streamencodemapfromiter(source.items())),
776 785 source)
777 786
778 787 encoded = b''.join(cborutil.streamencode(source))
779 788 self.assertEqual(cborutil.decodeall(encoded), [source])
780 789
781 790 def testcomplex(self):
782 791 source = {
783 792 b'key': 1,
784 793 2: -10,
785 794 }
786 795
787 796 self.assertEqual(loadit(cborutil.streamencode(source)),
788 797 source)
789 798
790 799 self.assertEqual(
791 800 loadit(cborutil.streamencodemapfromiter(source.items())),
792 801 source)
793 802
794 803 encoded = b''.join(cborutil.streamencode(source))
795 804 self.assertEqual(cborutil.decodeall(encoded), [source])
796 805
797 806 def testnested(self):
798 807 source = {b'key1': None, b'key2': {b'sub1': b'sub2'}, b'sub2': {}}
799 808 encoded = b''.join(cborutil.streamencode(source))
800 809
801 810 self.assertEqual(cborutil.decodeall(encoded), [source])
802 811
803 812 source = {
804 813 b'key1': [],
805 814 b'key2': [None, False],
806 815 b'key3': {b'foo', b'bar'},
807 816 b'key4': {},
808 817 }
809 818 encoded = b''.join(cborutil.streamencode(source))
810 819 self.assertEqual(cborutil.decodeall(encoded), [source])
811 820
812 821 def testillegalkey(self):
813 822 encoded = b''.join([
814 823 # map header + len 1
815 824 b'\xa1',
816 825 # indefinite length bytestring "foo" in key position
817 826 b'\x5f\x03foo\xff'
818 827 ])
819 828
820 829 with self.assertRaisesRegex(cborutil.CBORDecodeError,
821 830 'indefinite length bytestrings not '
822 831 'allowed as map keys'):
823 832 cborutil.decodeall(encoded)
824 833
825 834 encoded = b''.join([
826 835 b'\xa1',
827 836 b'\x80', # empty array
828 837 b'\x43foo',
829 838 ])
830 839
831 840 with self.assertRaisesRegex(cborutil.CBORDecodeError,
832 841 'collections not supported as map keys'):
833 842 cborutil.decodeall(encoded)
834 843
835 844 def testillegalvalue(self):
836 845 encoded = b''.join([
837 846 b'\xa1', # map headers
838 847 b'\x43foo', # key
839 848 b'\x5f\x03bar\xff', # indefinite length value
840 849 ])
841 850
842 851 with self.assertRaisesRegex(cborutil.CBORDecodeError,
843 852 'indefinite length bytestrings not '
844 853 'allowed as map values'):
845 854 cborutil.decodeall(encoded)
846 855
847 856 def testpartialdecode(self):
848 857 source = {b'key1': b'value1'}
849 858 encoded = b''.join(cborutil.streamencode(source))
850 859
851 860 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
852 861 (True, 1, 1, cborutil.SPECIAL_START_MAP))
853 862 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
854 863 (True, 1, 1, cborutil.SPECIAL_START_MAP))
855 864
856 865 source = {b'key%d' % i: None for i in range(23)}
857 866 encoded = b''.join(cborutil.streamencode(source))
858 867 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
859 868 (True, 23, 1, cborutil.SPECIAL_START_MAP))
860 869
861 870 source = {b'key%d' % i: None for i in range(24)}
862 871 encoded = b''.join(cborutil.streamencode(source))
863 872 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
864 873 (False, None, -1, cborutil.SPECIAL_NONE))
865 874 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
866 875 (True, 24, 2, cborutil.SPECIAL_START_MAP))
867 876 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
868 877 (True, 24, 2, cborutil.SPECIAL_START_MAP))
869 878
870 879 source = {b'key%d' % i: None for i in range(256)}
871 880 encoded = b''.join(cborutil.streamencode(source))
872 881 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
873 882 (False, None, -2, cborutil.SPECIAL_NONE))
874 883 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
875 884 (False, None, -1, cborutil.SPECIAL_NONE))
876 885 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
877 886 (True, 256, 3, cborutil.SPECIAL_START_MAP))
878 887 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
879 888 (True, 256, 3, cborutil.SPECIAL_START_MAP))
880 889
881 890 source = {b'key%d' % i: None for i in range(65536)}
882 891 encoded = b''.join(cborutil.streamencode(source))
883 892 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
884 893 (False, None, -4, cborutil.SPECIAL_NONE))
885 894 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
886 895 (False, None, -3, cborutil.SPECIAL_NONE))
887 896 self.assertEqual(cborutil.decodeitem(encoded[0:3]),
888 897 (False, None, -2, cborutil.SPECIAL_NONE))
889 898 self.assertEqual(cborutil.decodeitem(encoded[0:4]),
890 899 (False, None, -1, cborutil.SPECIAL_NONE))
891 900 self.assertEqual(cborutil.decodeitem(encoded[0:5]),
892 901 (True, 65536, 5, cborutil.SPECIAL_START_MAP))
893 902 self.assertEqual(cborutil.decodeitem(encoded[0:6]),
894 903 (True, 65536, 5, cborutil.SPECIAL_START_MAP))
895 904
896 905 class SemanticTagTests(TestCase):
897 906 def testdecodeforbidden(self):
898 907 for i in range(500):
899 908 if i == cborutil.SEMANTIC_TAG_FINITE_SET:
900 909 continue
901 910
902 911 tag = cborutil.encodelength(cborutil.MAJOR_TYPE_SEMANTIC,
903 912 i)
904 913
905 914 encoded = tag + cborutil.encodelength(cborutil.MAJOR_TYPE_UINT, 42)
906 915
907 916 # Partial decode is incomplete.
908 917 if i < 24:
909 918 pass
910 919 elif i < 256:
911 920 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
912 921 (False, None, -1, cborutil.SPECIAL_NONE))
913 922 elif i < 65536:
914 923 self.assertEqual(cborutil.decodeitem(encoded[0:1]),
915 924 (False, None, -2, cborutil.SPECIAL_NONE))
916 925 self.assertEqual(cborutil.decodeitem(encoded[0:2]),
917 926 (False, None, -1, cborutil.SPECIAL_NONE))
918 927
919 928 with self.assertRaisesRegex(cborutil.CBORDecodeError,
920 929 'semantic tag \d+ not allowed'):
921 930 cborutil.decodeitem(encoded)
922 931
923 932 class SpecialTypesTests(TestCase):
924 933 def testforbiddentypes(self):
925 934 for i in range(256):
926 935 if i == cborutil.SUBTYPE_FALSE:
927 936 continue
928 937 elif i == cborutil.SUBTYPE_TRUE:
929 938 continue
930 939 elif i == cborutil.SUBTYPE_NULL:
931 940 continue
932 941
933 942 encoded = cborutil.encodelength(cborutil.MAJOR_TYPE_SPECIAL, i)
934 943
935 944 with self.assertRaisesRegex(cborutil.CBORDecodeError,
936 945 'special type \d+ not allowed'):
937 946 cborutil.decodeitem(encoded)
938 947
939 948 class SansIODecoderTests(TestCase):
940 949 def testemptyinput(self):
941 950 decoder = cborutil.sansiodecoder()
942 951 self.assertEqual(decoder.decode(b''), (False, 0, 0))
943 952
944 953 class BufferingDecoderTests(TestCase):
945 954 def testsimple(self):
946 955 source = [
947 956 b'foobar',
948 957 b'x' * 128,
949 958 {b'foo': b'bar'},
950 959 True,
951 960 False,
952 961 None,
953 962 [None for i in range(128)],
954 963 ]
955 964
956 965 encoded = b''.join(cborutil.streamencode(source))
957 966
958 967 for step in range(1, 32):
959 968 decoder = cborutil.bufferingdecoder()
960 969 start = 0
961 970
962 971 while start < len(encoded):
963 972 decoder.decode(encoded[start:start + step])
964 973 start += step
965 974
966 975 self.assertEqual(decoder.getavailable(), [source])
967 976
968 977 def testbytearray(self):
969 978 source = b''.join(cborutil.streamencode(b'foobar'))
970 979
971 980 decoder = cborutil.bufferingdecoder()
972 981 decoder.decode(bytearray(source))
973 982
974 983 self.assertEqual(decoder.getavailable(), [b'foobar'])
975 984
976 985 class DecodeallTests(TestCase):
977 986 def testemptyinput(self):
978 987 self.assertEqual(cborutil.decodeall(b''), [])
979 988
980 989 def testpartialinput(self):
981 990 encoded = b''.join([
982 991 b'\x82', # array of 2 elements
983 992 b'\x01', # integer 1
984 993 ])
985 994
986 995 with self.assertRaisesRegex(cborutil.CBORDecodeError,
987 996 'input data not complete'):
988 997 cborutil.decodeall(encoded)
989 998
990 999 if __name__ == '__main__':
991 1000 import silenttestrunner
992 1001 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now