|
|
import re
|
|
|
import struct
|
|
|
from datetime import datetime, timedelta
|
|
|
from io import BytesIO
|
|
|
|
|
|
from .compat import timezone, xrange, byte_as_integer, unpack_float16
|
|
|
from .types import CBORTag, undefined, break_marker, CBORSimpleValue
|
|
|
|
|
|
timestamp_re = re.compile(r'^(\d{4})-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)'
|
|
|
r'(?:\.(\d+))?(?:Z|([+-]\d\d):(\d\d))$')
|
|
|
|
|
|
|
|
|
class CBORDecodeError(Exception):
|
|
|
"""Raised when an error occurs deserializing a CBOR datastream."""
|
|
|
|
|
|
|
|
|
def decode_uint(decoder, subtype, shareable_index=None, allow_indefinite=False):
|
|
|
# Major tag 0
|
|
|
if subtype < 24:
|
|
|
return subtype
|
|
|
elif subtype == 24:
|
|
|
return struct.unpack('>B', decoder.read(1))[0]
|
|
|
elif subtype == 25:
|
|
|
return struct.unpack('>H', decoder.read(2))[0]
|
|
|
elif subtype == 26:
|
|
|
return struct.unpack('>L', decoder.read(4))[0]
|
|
|
elif subtype == 27:
|
|
|
return struct.unpack('>Q', decoder.read(8))[0]
|
|
|
elif subtype == 31 and allow_indefinite:
|
|
|
return None
|
|
|
else:
|
|
|
raise CBORDecodeError('unknown unsigned integer subtype 0x%x' % subtype)
|
|
|
|
|
|
|
|
|
def decode_negint(decoder, subtype, shareable_index=None):
|
|
|
# Major tag 1
|
|
|
uint = decode_uint(decoder, subtype)
|
|
|
return -uint - 1
|
|
|
|
|
|
|
|
|
def decode_bytestring(decoder, subtype, shareable_index=None):
|
|
|
# Major tag 2
|
|
|
length = decode_uint(decoder, subtype, allow_indefinite=True)
|
|
|
if length is None:
|
|
|
# Indefinite length
|
|
|
buf = bytearray()
|
|
|
while True:
|
|
|
initial_byte = byte_as_integer(decoder.read(1))
|
|
|
if initial_byte == 255:
|
|
|
return buf
|
|
|
else:
|
|
|
length = decode_uint(decoder, initial_byte & 31)
|
|
|
value = decoder.read(length)
|
|
|
buf.extend(value)
|
|
|
else:
|
|
|
return decoder.read(length)
|
|
|
|
|
|
|
|
|
def decode_string(decoder, subtype, shareable_index=None):
|
|
|
# Major tag 3
|
|
|
return decode_bytestring(decoder, subtype).decode('utf-8')
|
|
|
|
|
|
|
|
|
def decode_array(decoder, subtype, shareable_index=None):
|
|
|
# Major tag 4
|
|
|
items = []
|
|
|
decoder.set_shareable(shareable_index, items)
|
|
|
length = decode_uint(decoder, subtype, allow_indefinite=True)
|
|
|
if length is None:
|
|
|
# Indefinite length
|
|
|
while True:
|
|
|
value = decoder.decode()
|
|
|
if value is break_marker:
|
|
|
break
|
|
|
else:
|
|
|
items.append(value)
|
|
|
else:
|
|
|
for _ in xrange(length):
|
|
|
item = decoder.decode()
|
|
|
items.append(item)
|
|
|
|
|
|
return items
|
|
|
|
|
|
|
|
|
def decode_map(decoder, subtype, shareable_index=None):
|
|
|
# Major tag 5
|
|
|
dictionary = {}
|
|
|
decoder.set_shareable(shareable_index, dictionary)
|
|
|
length = decode_uint(decoder, subtype, allow_indefinite=True)
|
|
|
if length is None:
|
|
|
# Indefinite length
|
|
|
while True:
|
|
|
key = decoder.decode()
|
|
|
if key is break_marker:
|
|
|
break
|
|
|
else:
|
|
|
value = decoder.decode()
|
|
|
dictionary[key] = value
|
|
|
else:
|
|
|
for _ in xrange(length):
|
|
|
key = decoder.decode()
|
|
|
value = decoder.decode()
|
|
|
dictionary[key] = value
|
|
|
|
|
|
if decoder.object_hook:
|
|
|
return decoder.object_hook(decoder, dictionary)
|
|
|
else:
|
|
|
return dictionary
|
|
|
|
|
|
|
|
|
def decode_semantic(decoder, subtype, shareable_index=None):
|
|
|
# Major tag 6
|
|
|
tagnum = decode_uint(decoder, subtype)
|
|
|
|
|
|
# Special handling for the "shareable" tag
|
|
|
if tagnum == 28:
|
|
|
shareable_index = decoder._allocate_shareable()
|
|
|
return decoder.decode(shareable_index)
|
|
|
|
|
|
value = decoder.decode()
|
|
|
semantic_decoder = semantic_decoders.get(tagnum)
|
|
|
if semantic_decoder:
|
|
|
return semantic_decoder(decoder, value, shareable_index)
|
|
|
|
|
|
tag = CBORTag(tagnum, value)
|
|
|
if decoder.tag_hook:
|
|
|
return decoder.tag_hook(decoder, tag, shareable_index)
|
|
|
else:
|
|
|
return tag
|
|
|
|
|
|
|
|
|
def decode_special(decoder, subtype, shareable_index=None):
|
|
|
# Simple value
|
|
|
if subtype < 20:
|
|
|
return CBORSimpleValue(subtype)
|
|
|
|
|
|
# Major tag 7
|
|
|
return special_decoders[subtype](decoder)
|
|
|
|
|
|
|
|
|
#
|
|
|
# Semantic decoders (major tag 6)
|
|
|
#
|
|
|
|
|
|
def decode_datetime_string(decoder, value, shareable_index=None):
|
|
|
# Semantic tag 0
|
|
|
match = timestamp_re.match(value)
|
|
|
if match:
|
|
|
year, month, day, hour, minute, second, micro, offset_h, offset_m = match.groups()
|
|
|
if offset_h:
|
|
|
tz = timezone(timedelta(hours=int(offset_h), minutes=int(offset_m)))
|
|
|
else:
|
|
|
tz = timezone.utc
|
|
|
|
|
|
return datetime(int(year), int(month), int(day), int(hour), int(minute), int(second),
|
|
|
int(micro or 0), tz)
|
|
|
else:
|
|
|
raise CBORDecodeError('invalid datetime string: {}'.format(value))
|
|
|
|
|
|
|
|
|
def decode_epoch_datetime(decoder, value, shareable_index=None):
|
|
|
# Semantic tag 1
|
|
|
return datetime.fromtimestamp(value, timezone.utc)
|
|
|
|
|
|
|
|
|
def decode_positive_bignum(decoder, value, shareable_index=None):
|
|
|
# Semantic tag 2
|
|
|
from binascii import hexlify
|
|
|
return int(hexlify(value), 16)
|
|
|
|
|
|
|
|
|
def decode_negative_bignum(decoder, value, shareable_index=None):
|
|
|
# Semantic tag 3
|
|
|
return -decode_positive_bignum(decoder, value) - 1
|
|
|
|
|
|
|
|
|
def decode_fraction(decoder, value, shareable_index=None):
|
|
|
# Semantic tag 4
|
|
|
from decimal import Decimal
|
|
|
exp = Decimal(value[0])
|
|
|
mantissa = Decimal(value[1])
|
|
|
return mantissa * (10 ** exp)
|
|
|
|
|
|
|
|
|
def decode_bigfloat(decoder, value, shareable_index=None):
|
|
|
# Semantic tag 5
|
|
|
from decimal import Decimal
|
|
|
exp = Decimal(value[0])
|
|
|
mantissa = Decimal(value[1])
|
|
|
return mantissa * (2 ** exp)
|
|
|
|
|
|
|
|
|
def decode_sharedref(decoder, value, shareable_index=None):
|
|
|
# Semantic tag 29
|
|
|
try:
|
|
|
shared = decoder._shareables[value]
|
|
|
except IndexError:
|
|
|
raise CBORDecodeError('shared reference %d not found' % value)
|
|
|
|
|
|
if shared is None:
|
|
|
raise CBORDecodeError('shared value %d has not been initialized' % value)
|
|
|
else:
|
|
|
return shared
|
|
|
|
|
|
|
|
|
def decode_rational(decoder, value, shareable_index=None):
|
|
|
# Semantic tag 30
|
|
|
from fractions import Fraction
|
|
|
return Fraction(*value)
|
|
|
|
|
|
|
|
|
def decode_regexp(decoder, value, shareable_index=None):
|
|
|
# Semantic tag 35
|
|
|
return re.compile(value)
|
|
|
|
|
|
|
|
|
def decode_mime(decoder, value, shareable_index=None):
|
|
|
# Semantic tag 36
|
|
|
from email.parser import Parser
|
|
|
return Parser().parsestr(value)
|
|
|
|
|
|
|
|
|
def decode_uuid(decoder, value, shareable_index=None):
|
|
|
# Semantic tag 37
|
|
|
from uuid import UUID
|
|
|
return UUID(bytes=value)
|
|
|
|
|
|
|
|
|
def decode_set(decoder, value, shareable_index=None):
|
|
|
# Semantic tag 258
|
|
|
return set(value)
|
|
|
|
|
|
|
|
|
#
|
|
|
# Special decoders (major tag 7)
|
|
|
#
|
|
|
|
|
|
def decode_simple_value(decoder, shareable_index=None):
|
|
|
return CBORSimpleValue(struct.unpack('>B', decoder.read(1))[0])
|
|
|
|
|
|
|
|
|
def decode_float16(decoder, shareable_index=None):
|
|
|
payload = decoder.read(2)
|
|
|
return unpack_float16(payload)
|
|
|
|
|
|
|
|
|
def decode_float32(decoder, shareable_index=None):
|
|
|
return struct.unpack('>f', decoder.read(4))[0]
|
|
|
|
|
|
|
|
|
def decode_float64(decoder, shareable_index=None):
|
|
|
return struct.unpack('>d', decoder.read(8))[0]
|
|
|
|
|
|
|
|
|
major_decoders = {
|
|
|
0: decode_uint,
|
|
|
1: decode_negint,
|
|
|
2: decode_bytestring,
|
|
|
3: decode_string,
|
|
|
4: decode_array,
|
|
|
5: decode_map,
|
|
|
6: decode_semantic,
|
|
|
7: decode_special
|
|
|
}
|
|
|
|
|
|
special_decoders = {
|
|
|
20: lambda self: False,
|
|
|
21: lambda self: True,
|
|
|
22: lambda self: None,
|
|
|
23: lambda self: undefined,
|
|
|
24: decode_simple_value,
|
|
|
25: decode_float16,
|
|
|
26: decode_float32,
|
|
|
27: decode_float64,
|
|
|
31: lambda self: break_marker
|
|
|
}
|
|
|
|
|
|
semantic_decoders = {
|
|
|
0: decode_datetime_string,
|
|
|
1: decode_epoch_datetime,
|
|
|
2: decode_positive_bignum,
|
|
|
3: decode_negative_bignum,
|
|
|
4: decode_fraction,
|
|
|
5: decode_bigfloat,
|
|
|
29: decode_sharedref,
|
|
|
30: decode_rational,
|
|
|
35: decode_regexp,
|
|
|
36: decode_mime,
|
|
|
37: decode_uuid,
|
|
|
258: decode_set
|
|
|
}
|
|
|
|
|
|
|
|
|
class CBORDecoder(object):
|
|
|
"""
|
|
|
Deserializes a CBOR encoded byte stream.
|
|
|
|
|
|
:param tag_hook: Callable that takes 3 arguments: the decoder instance, the
|
|
|
:class:`~cbor2.types.CBORTag` and the shareable index for the resulting object, if any.
|
|
|
This callback is called for any tags for which there is no built-in decoder.
|
|
|
The return value is substituted for the CBORTag object in the deserialized output.
|
|
|
:param object_hook: Callable that takes 2 arguments: the decoder instance and the dictionary.
|
|
|
This callback is called for each deserialized :class:`dict` object.
|
|
|
The return value is substituted for the dict in the deserialized output.
|
|
|
"""
|
|
|
|
|
|
__slots__ = ('fp', 'tag_hook', 'object_hook', '_shareables')
|
|
|
|
|
|
def __init__(self, fp, tag_hook=None, object_hook=None):
|
|
|
self.fp = fp
|
|
|
self.tag_hook = tag_hook
|
|
|
self.object_hook = object_hook
|
|
|
self._shareables = []
|
|
|
|
|
|
def _allocate_shareable(self):
|
|
|
self._shareables.append(None)
|
|
|
return len(self._shareables) - 1
|
|
|
|
|
|
def set_shareable(self, index, value):
|
|
|
"""
|
|
|
Set the shareable value for the last encountered shared value marker, if any.
|
|
|
|
|
|
If the given index is ``None``, nothing is done.
|
|
|
|
|
|
:param index: the value of the ``shared_index`` argument to the decoder
|
|
|
:param value: the shared value
|
|
|
|
|
|
"""
|
|
|
if index is not None:
|
|
|
self._shareables[index] = value
|
|
|
|
|
|
def read(self, amount):
|
|
|
"""
|
|
|
Read bytes from the data stream.
|
|
|
|
|
|
:param int amount: the number of bytes to read
|
|
|
|
|
|
"""
|
|
|
data = self.fp.read(amount)
|
|
|
if len(data) < amount:
|
|
|
raise CBORDecodeError('premature end of stream (expected to read {} bytes, got {} '
|
|
|
'instead)'.format(amount, len(data)))
|
|
|
|
|
|
return data
|
|
|
|
|
|
def decode(self, shareable_index=None):
|
|
|
"""
|
|
|
Decode the next value from the stream.
|
|
|
|
|
|
:raises CBORDecodeError: if there is any problem decoding the stream
|
|
|
|
|
|
"""
|
|
|
try:
|
|
|
initial_byte = byte_as_integer(self.fp.read(1))
|
|
|
major_type = initial_byte >> 5
|
|
|
subtype = initial_byte & 31
|
|
|
except Exception as e:
|
|
|
raise CBORDecodeError('error reading major type at index {}: {}'
|
|
|
.format(self.fp.tell(), e))
|
|
|
|
|
|
decoder = major_decoders[major_type]
|
|
|
try:
|
|
|
return decoder(self, subtype, shareable_index)
|
|
|
except CBORDecodeError:
|
|
|
raise
|
|
|
except Exception as e:
|
|
|
raise CBORDecodeError('error decoding value at index {}: {}'.format(self.fp.tell(), e))
|
|
|
|
|
|
def decode_from_bytes(self, buf):
|
|
|
"""
|
|
|
Wrap the given bytestring as a file and call :meth:`decode` with it as the argument.
|
|
|
|
|
|
This method was intended to be used from the ``tag_hook`` hook when an object needs to be
|
|
|
decoded separately from the rest but while still taking advantage of the shared value
|
|
|
registry.
|
|
|
|
|
|
"""
|
|
|
old_fp = self.fp
|
|
|
self.fp = BytesIO(buf)
|
|
|
retval = self.decode()
|
|
|
self.fp = old_fp
|
|
|
return retval
|
|
|
|
|
|
|
|
|
def loads(payload, **kwargs):
|
|
|
"""
|
|
|
Deserialize an object from a bytestring.
|
|
|
|
|
|
:param bytes payload: the bytestring to serialize
|
|
|
:param kwargs: keyword arguments passed to :class:`~.CBORDecoder`
|
|
|
:return: the deserialized object
|
|
|
|
|
|
"""
|
|
|
fp = BytesIO(payload)
|
|
|
return CBORDecoder(fp, **kwargs).decode()
|
|
|
|
|
|
|
|
|
def load(fp, **kwargs):
|
|
|
"""
|
|
|
Deserialize an object from an open file.
|
|
|
|
|
|
:param fp: the input file (any file-like object)
|
|
|
:param kwargs: keyword arguments passed to :class:`~.CBORDecoder`
|
|
|
:return: the deserialized object
|
|
|
|
|
|
"""
|
|
|
return CBORDecoder(fp, **kwargs).decode()
|
|
|
|