decoder.py
407 lines
| 11.4 KiB
| text/x-python
|
PythonLexer
Pulkit Goyal
|
r37144 | import re | ||
import struct | ||||
from datetime import datetime, timedelta | ||||
from io import BytesIO | ||||
from .compat import timezone, xrange, byte_as_integer, unpack_float16 | ||||
from .types import CBORTag, undefined, break_marker, CBORSimpleValue | ||||
timestamp_re = re.compile(r'^(\d{4})-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)' | ||||
r'(?:\.(\d+))?(?:Z|([+-]\d\d):(\d\d))$') | ||||
class CBORDecodeError(Exception): | ||||
"""Raised when an error occurs deserializing a CBOR datastream.""" | ||||
def decode_uint(decoder, subtype, shareable_index=None, allow_indefinite=False): | ||||
# Major tag 0 | ||||
if subtype < 24: | ||||
return subtype | ||||
elif subtype == 24: | ||||
return struct.unpack('>B', decoder.read(1))[0] | ||||
elif subtype == 25: | ||||
return struct.unpack('>H', decoder.read(2))[0] | ||||
elif subtype == 26: | ||||
return struct.unpack('>L', decoder.read(4))[0] | ||||
elif subtype == 27: | ||||
return struct.unpack('>Q', decoder.read(8))[0] | ||||
elif subtype == 31 and allow_indefinite: | ||||
return None | ||||
else: | ||||
raise CBORDecodeError('unknown unsigned integer subtype 0x%x' % subtype) | ||||
def decode_negint(decoder, subtype, shareable_index=None): | ||||
# Major tag 1 | ||||
uint = decode_uint(decoder, subtype) | ||||
return -uint - 1 | ||||
def decode_bytestring(decoder, subtype, shareable_index=None): | ||||
# Major tag 2 | ||||
length = decode_uint(decoder, subtype, allow_indefinite=True) | ||||
if length is None: | ||||
# Indefinite length | ||||
buf = bytearray() | ||||
while True: | ||||
initial_byte = byte_as_integer(decoder.read(1)) | ||||
if initial_byte == 255: | ||||
return buf | ||||
else: | ||||
length = decode_uint(decoder, initial_byte & 31) | ||||
value = decoder.read(length) | ||||
buf.extend(value) | ||||
else: | ||||
return decoder.read(length) | ||||
def decode_string(decoder, subtype, shareable_index=None): | ||||
# Major tag 3 | ||||
return decode_bytestring(decoder, subtype).decode('utf-8') | ||||
def decode_array(decoder, subtype, shareable_index=None): | ||||
# Major tag 4 | ||||
items = [] | ||||
decoder.set_shareable(shareable_index, items) | ||||
length = decode_uint(decoder, subtype, allow_indefinite=True) | ||||
if length is None: | ||||
# Indefinite length | ||||
while True: | ||||
value = decoder.decode() | ||||
if value is break_marker: | ||||
break | ||||
else: | ||||
items.append(value) | ||||
else: | ||||
for _ in xrange(length): | ||||
item = decoder.decode() | ||||
items.append(item) | ||||
return items | ||||
def decode_map(decoder, subtype, shareable_index=None): | ||||
# Major tag 5 | ||||
dictionary = {} | ||||
decoder.set_shareable(shareable_index, dictionary) | ||||
length = decode_uint(decoder, subtype, allow_indefinite=True) | ||||
if length is None: | ||||
# Indefinite length | ||||
while True: | ||||
key = decoder.decode() | ||||
if key is break_marker: | ||||
break | ||||
else: | ||||
value = decoder.decode() | ||||
dictionary[key] = value | ||||
else: | ||||
for _ in xrange(length): | ||||
key = decoder.decode() | ||||
value = decoder.decode() | ||||
dictionary[key] = value | ||||
if decoder.object_hook: | ||||
return decoder.object_hook(decoder, dictionary) | ||||
else: | ||||
return dictionary | ||||
def decode_semantic(decoder, subtype, shareable_index=None): | ||||
# Major tag 6 | ||||
tagnum = decode_uint(decoder, subtype) | ||||
# Special handling for the "shareable" tag | ||||
if tagnum == 28: | ||||
shareable_index = decoder._allocate_shareable() | ||||
return decoder.decode(shareable_index) | ||||
value = decoder.decode() | ||||
semantic_decoder = semantic_decoders.get(tagnum) | ||||
if semantic_decoder: | ||||
return semantic_decoder(decoder, value, shareable_index) | ||||
tag = CBORTag(tagnum, value) | ||||
if decoder.tag_hook: | ||||
return decoder.tag_hook(decoder, tag, shareable_index) | ||||
else: | ||||
return tag | ||||
def decode_special(decoder, subtype, shareable_index=None): | ||||
# Simple value | ||||
if subtype < 20: | ||||
return CBORSimpleValue(subtype) | ||||
# Major tag 7 | ||||
return special_decoders[subtype](decoder) | ||||
# | ||||
# Semantic decoders (major tag 6) | ||||
# | ||||
def decode_datetime_string(decoder, value, shareable_index=None): | ||||
# Semantic tag 0 | ||||
match = timestamp_re.match(value) | ||||
if match: | ||||
year, month, day, hour, minute, second, micro, offset_h, offset_m = match.groups() | ||||
if offset_h: | ||||
tz = timezone(timedelta(hours=int(offset_h), minutes=int(offset_m))) | ||||
else: | ||||
tz = timezone.utc | ||||
return datetime(int(year), int(month), int(day), int(hour), int(minute), int(second), | ||||
int(micro or 0), tz) | ||||
else: | ||||
raise CBORDecodeError('invalid datetime string: {}'.format(value)) | ||||
def decode_epoch_datetime(decoder, value, shareable_index=None): | ||||
# Semantic tag 1 | ||||
return datetime.fromtimestamp(value, timezone.utc) | ||||
def decode_positive_bignum(decoder, value, shareable_index=None): | ||||
# Semantic tag 2 | ||||
from binascii import hexlify | ||||
return int(hexlify(value), 16) | ||||
def decode_negative_bignum(decoder, value, shareable_index=None): | ||||
# Semantic tag 3 | ||||
return -decode_positive_bignum(decoder, value) - 1 | ||||
def decode_fraction(decoder, value, shareable_index=None): | ||||
# Semantic tag 4 | ||||
from decimal import Decimal | ||||
exp = Decimal(value[0]) | ||||
mantissa = Decimal(value[1]) | ||||
return mantissa * (10 ** exp) | ||||
def decode_bigfloat(decoder, value, shareable_index=None): | ||||
# Semantic tag 5 | ||||
from decimal import Decimal | ||||
exp = Decimal(value[0]) | ||||
mantissa = Decimal(value[1]) | ||||
return mantissa * (2 ** exp) | ||||
def decode_sharedref(decoder, value, shareable_index=None): | ||||
# Semantic tag 29 | ||||
try: | ||||
shared = decoder._shareables[value] | ||||
except IndexError: | ||||
raise CBORDecodeError('shared reference %d not found' % value) | ||||
if shared is None: | ||||
raise CBORDecodeError('shared value %d has not been initialized' % value) | ||||
else: | ||||
return shared | ||||
def decode_rational(decoder, value, shareable_index=None): | ||||
# Semantic tag 30 | ||||
from fractions import Fraction | ||||
return Fraction(*value) | ||||
def decode_regexp(decoder, value, shareable_index=None): | ||||
# Semantic tag 35 | ||||
return re.compile(value) | ||||
def decode_mime(decoder, value, shareable_index=None): | ||||
# Semantic tag 36 | ||||
from email.parser import Parser | ||||
return Parser().parsestr(value) | ||||
def decode_uuid(decoder, value, shareable_index=None): | ||||
# Semantic tag 37 | ||||
from uuid import UUID | ||||
return UUID(bytes=value) | ||||
def decode_set(decoder, value, shareable_index=None): | ||||
# Semantic tag 258 | ||||
return set(value) | ||||
# | ||||
# Special decoders (major tag 7) | ||||
# | ||||
def decode_simple_value(decoder, shareable_index=None): | ||||
return CBORSimpleValue(struct.unpack('>B', decoder.read(1))[0]) | ||||
def decode_float16(decoder, shareable_index=None): | ||||
payload = decoder.read(2) | ||||
return unpack_float16(payload) | ||||
def decode_float32(decoder, shareable_index=None): | ||||
return struct.unpack('>f', decoder.read(4))[0] | ||||
def decode_float64(decoder, shareable_index=None): | ||||
return struct.unpack('>d', decoder.read(8))[0] | ||||
major_decoders = { | ||||
0: decode_uint, | ||||
1: decode_negint, | ||||
2: decode_bytestring, | ||||
3: decode_string, | ||||
4: decode_array, | ||||
5: decode_map, | ||||
6: decode_semantic, | ||||
7: decode_special | ||||
} | ||||
special_decoders = { | ||||
20: lambda self: False, | ||||
21: lambda self: True, | ||||
22: lambda self: None, | ||||
23: lambda self: undefined, | ||||
24: decode_simple_value, | ||||
25: decode_float16, | ||||
26: decode_float32, | ||||
27: decode_float64, | ||||
31: lambda self: break_marker | ||||
} | ||||
semantic_decoders = { | ||||
0: decode_datetime_string, | ||||
1: decode_epoch_datetime, | ||||
2: decode_positive_bignum, | ||||
3: decode_negative_bignum, | ||||
4: decode_fraction, | ||||
5: decode_bigfloat, | ||||
29: decode_sharedref, | ||||
30: decode_rational, | ||||
35: decode_regexp, | ||||
36: decode_mime, | ||||
37: decode_uuid, | ||||
258: decode_set | ||||
} | ||||
class CBORDecoder(object): | ||||
""" | ||||
Deserializes a CBOR encoded byte stream. | ||||
:param tag_hook: Callable that takes 3 arguments: the decoder instance, the | ||||
:class:`~cbor2.types.CBORTag` and the shareable index for the resulting object, if any. | ||||
This callback is called for any tags for which there is no built-in decoder. | ||||
The return value is substituted for the CBORTag object in the deserialized output. | ||||
:param object_hook: Callable that takes 2 arguments: the decoder instance and the dictionary. | ||||
This callback is called for each deserialized :class:`dict` object. | ||||
The return value is substituted for the dict in the deserialized output. | ||||
""" | ||||
__slots__ = ('fp', 'tag_hook', 'object_hook', '_shareables') | ||||
def __init__(self, fp, tag_hook=None, object_hook=None): | ||||
self.fp = fp | ||||
self.tag_hook = tag_hook | ||||
self.object_hook = object_hook | ||||
self._shareables = [] | ||||
def _allocate_shareable(self): | ||||
self._shareables.append(None) | ||||
return len(self._shareables) - 1 | ||||
def set_shareable(self, index, value): | ||||
""" | ||||
Set the shareable value for the last encountered shared value marker, if any. | ||||
If the given index is ``None``, nothing is done. | ||||
:param index: the value of the ``shared_index`` argument to the decoder | ||||
:param value: the shared value | ||||
""" | ||||
if index is not None: | ||||
self._shareables[index] = value | ||||
def read(self, amount): | ||||
""" | ||||
Read bytes from the data stream. | ||||
:param int amount: the number of bytes to read | ||||
""" | ||||
data = self.fp.read(amount) | ||||
if len(data) < amount: | ||||
raise CBORDecodeError('premature end of stream (expected to read {} bytes, got {} ' | ||||
'instead)'.format(amount, len(data))) | ||||
return data | ||||
def decode(self, shareable_index=None): | ||||
""" | ||||
Decode the next value from the stream. | ||||
:raises CBORDecodeError: if there is any problem decoding the stream | ||||
""" | ||||
try: | ||||
initial_byte = byte_as_integer(self.fp.read(1)) | ||||
major_type = initial_byte >> 5 | ||||
subtype = initial_byte & 31 | ||||
except Exception as e: | ||||
raise CBORDecodeError('error reading major type at index {}: {}' | ||||
.format(self.fp.tell(), e)) | ||||
decoder = major_decoders[major_type] | ||||
try: | ||||
return decoder(self, subtype, shareable_index) | ||||
except CBORDecodeError: | ||||
raise | ||||
except Exception as e: | ||||
raise CBORDecodeError('error decoding value at index {}: {}'.format(self.fp.tell(), e)) | ||||
def decode_from_bytes(self, buf): | ||||
""" | ||||
Wrap the given bytestring as a file and call :meth:`decode` with it as the argument. | ||||
This method was intended to be used from the ``tag_hook`` hook when an object needs to be | ||||
decoded separately from the rest but while still taking advantage of the shared value | ||||
registry. | ||||
""" | ||||
old_fp = self.fp | ||||
self.fp = BytesIO(buf) | ||||
retval = self.decode() | ||||
self.fp = old_fp | ||||
return retval | ||||
def loads(payload, **kwargs): | ||||
""" | ||||
Deserialize an object from a bytestring. | ||||
:param bytes payload: the bytestring to serialize | ||||
:param kwargs: keyword arguments passed to :class:`~.CBORDecoder` | ||||
:return: the deserialized object | ||||
""" | ||||
fp = BytesIO(payload) | ||||
return CBORDecoder(fp, **kwargs).decode() | ||||
def load(fp, **kwargs): | ||||
""" | ||||
Deserialize an object from an open file. | ||||
:param fp: the input file (any file-like object) | ||||
:param kwargs: keyword arguments passed to :class:`~.CBORDecoder` | ||||
:return: the deserialized object | ||||
""" | ||||
return CBORDecoder(fp, **kwargs).decode() | ||||