##// END OF EJS Templates
obsutil: prefetch method in allpredecessors loop...
obsutil: prefetch method in allpredecessors loop. We don't expect a massive speedup from this, but the change was lying around in my repository and it cannot hurt.

File last commit:

r37144:4bd73a95 default
r40497:c7618901 default
Show More
decoder.py
407 lines | 11.4 KiB | text/x-python | PythonLexer
Pulkit Goyal
thirdparty: vendor cbor2 python library...
r37144 import re
import struct
from datetime import datetime, timedelta
from io import BytesIO
from .compat import timezone, xrange, byte_as_integer, unpack_float16
from .types import CBORTag, undefined, break_marker, CBORSimpleValue
# Pattern for the datetime strings accepted by semantic tag 0.
# Groups: year, month, day, hour, minute, second, optional fractional-second
# digits, and an optional signed hour / minute UTC offset ("Z" leaves the two
# offset groups unset).
timestamp_re = re.compile(
    r'^(\d{4})-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)'
    r'(?:\.(\d+))?(?:Z|([+-]\d\d):(\d\d))$'
)
class CBORDecodeError(Exception):
    """Signals that a CBOR data stream could not be deserialized."""
def decode_uint(decoder, subtype, shareable_index=None, allow_indefinite=False):
    """
    Decode an unsigned integer (major tag 0) or a length/tag argument.

    :param decoder: object whose ``read(n)`` method supplies the following bytes
    :param int subtype: the low 5 bits of the initial byte
    :param shareable_index: unused; accepted so all major decoders share a signature
    :param bool allow_indefinite: if true, subtype 31 yields ``None`` instead of an error
    :return: the decoded integer, or ``None`` for an allowed indefinite-length marker
    :raises CBORDecodeError: if the subtype is not a recognized integer encoding
    """
    # Major tag 0
    if subtype < 24:
        # The value is stored directly in the subtype bits
        return subtype

    if 24 <= subtype <= 27:
        # Subtypes 24..27 announce a big-endian integer of 1, 2, 4 or 8 bytes
        width_index = subtype - 24
        fmt = ('>B', '>H', '>L', '>Q')[width_index]
        return struct.unpack(fmt, decoder.read(1 << width_index))[0]

    if subtype == 31 and allow_indefinite:
        return None

    raise CBORDecodeError('unknown unsigned integer subtype 0x%x' % subtype)
def decode_negint(decoder, subtype, shareable_index=None):
    """
    Decode a negative integer (major tag 1).

    The encoded unsigned value ``n`` is read with :func:`decode_uint` and
    mapped to ``-1 - n``.
    """
    # Major tag 1
    magnitude = decode_uint(decoder, subtype)
    return -magnitude - 1
def decode_bytestring(decoder, subtype, shareable_index=None):
    """
    Decode a byte string (major tag 2).

    :param decoder: the decoder instance supplying ``read()``
    :param int subtype: the low 5 bits of the initial byte (length encoding)
    :param shareable_index: unused; accepted so all major decoders share a signature
    :return: the decoded bytes; always an immutable bytestring, for both the
        definite and the indefinite length encodings
    """
    # Major tag 2
    length = decode_uint(decoder, subtype, allow_indefinite=True)
    if length is not None:
        return decoder.read(length)

    # Indefinite length: concatenate chunks until the break byte (0xff)
    buf = bytearray()
    while True:
        initial_byte = byte_as_integer(decoder.read(1))
        if initial_byte == 255:
            # Return the same type as the definite-length branch. A bytearray
            # is unhashable, so it could not be used as a map key or set member.
            return bytes(buf)

        chunk_length = decode_uint(decoder, initial_byte & 31)
        buf.extend(decoder.read(chunk_length))
def decode_string(decoder, subtype, shareable_index=None):
    """
    Decode a text string (major tag 3).

    The raw bytes are read by :func:`decode_bytestring` and then decoded as UTF-8.
    """
    # Major tag 3
    raw = decode_bytestring(decoder, subtype)
    return raw.decode('utf-8')
def decode_array(decoder, subtype, shareable_index=None):
    """
    Decode an array (major tag 4) into a list.

    The list is registered as a shareable value *before* its elements are
    decoded, so nested shared references can point back to it.

    :param decoder: the decoder instance
    :param int subtype: the low 5 bits of the initial byte (length encoding)
    :param shareable_index: index to register the list under, or ``None``
    :return: the list of decoded elements
    """
    # Major tag 4
    elements = []
    decoder.set_shareable(shareable_index, elements)
    count = decode_uint(decoder, subtype, allow_indefinite=True)

    if count is not None:
        # Definite length: exactly ``count`` elements follow
        for _ in xrange(count):
            elements.append(decoder.decode())
        return elements

    # Indefinite length: read elements until the break marker shows up
    while True:
        element = decoder.decode()
        if element is break_marker:
            return elements
        elements.append(element)
def decode_map(decoder, subtype, shareable_index=None):
    """
    Decode a map (major tag 5) into a dict.

    The dict is registered as a shareable value before its entries are decoded.
    If the decoder has an ``object_hook``, the hook's return value is returned
    in place of the dict.

    :param decoder: the decoder instance
    :param int subtype: the low 5 bits of the initial byte (length encoding)
    :param shareable_index: index to register the dict under, or ``None``
    :return: the decoded dict, or whatever ``object_hook`` returns for it
    """
    # Major tag 5
    mapping = {}
    decoder.set_shareable(shareable_index, mapping)
    count = decode_uint(decoder, subtype, allow_indefinite=True)

    if count is not None:
        # Definite length: ``count`` key/value pairs follow
        for _ in xrange(count):
            key = decoder.decode()
            mapping[key] = decoder.decode()
    else:
        # Indefinite length: a break marker in key position ends the map
        while True:
            key = decoder.decode()
            if key is break_marker:
                break
            mapping[key] = decoder.decode()

    hook = decoder.object_hook
    return hook(decoder, mapping) if hook else mapping
def decode_semantic(decoder, subtype, shareable_index=None):
    """
    Decode a semantically tagged value (major tag 6).

    Resolution order: the "shareable" tag (28), then the built-in
    ``semantic_decoders`` table, then the decoder's ``tag_hook``; if none
    applies, a :class:`CBORTag` wrapping the value is returned.

    :param decoder: the decoder instance
    :param int subtype: the low 5 bits of the initial byte (tag number encoding)
    :param shareable_index: shareable index passed on to the tag handler
    """
    # Major tag 6
    tagnum = decode_uint(decoder, subtype)

    # Special handling for the "shareable" tag: reserve a slot, then let the
    # tagged value's own decoder fill it in
    if tagnum == 28:
        return decoder.decode(decoder._allocate_shareable())

    payload = decoder.decode()
    handler = semantic_decoders.get(tagnum)
    if handler:
        return handler(decoder, payload, shareable_index)

    tag = CBORTag(tagnum, payload)
    hook = decoder.tag_hook
    if not hook:
        return tag
    return hook(decoder, tag, shareable_index)
def decode_special(decoder, subtype, shareable_index=None):
    """
    Decode a major tag 7 item.

    Subtypes below 20 are returned as :class:`CBORSimpleValue`; the rest are
    dispatched through the ``special_decoders`` table.
    """
    # Major tag 7
    if subtype >= 20:
        return special_decoders[subtype](decoder)

    # Simple value
    return CBORSimpleValue(subtype)
#
# Semantic decoders (major tag 6)
#
def decode_datetime_string(decoder, value, shareable_index=None):
    """
    Decode a datetime string (semantic tag 0) into a timezone-aware datetime.

    :param decoder: the decoder instance (unused)
    :param value: the timestamp text, matched against ``timestamp_re``
    :param shareable_index: unused; accepted so all semantic decoders share a signature
    :return: a :class:`~datetime.datetime` carrying the parsed UTC offset
    :raises CBORDecodeError: if the string does not match ``timestamp_re``
    """
    # Semantic tag 0
    match = timestamp_re.match(value)
    if not match:
        raise CBORDecodeError('invalid datetime string: {}'.format(value))

    year, month, day, hour, minute, second, secfrac, offset_h, offset_m = match.groups()

    if secfrac:
        # The digits are a decimal fraction of a second, not a microsecond
        # count: ".5" is 500000 microseconds. Pad short fractions to six
        # digits and drop precision beyond what datetime can hold.
        microsecond = int(secfrac[:6].ljust(6, '0'))
    else:
        microsecond = 0

    if offset_h:
        # The sign is captured with the hours only, so apply it to the minutes
        # too; otherwise "-05:30" would come out as -04:30.
        sign = -1 if offset_h.startswith('-') else 1
        tz = timezone(timedelta(hours=int(offset_h), minutes=sign * int(offset_m)))
    else:
        tz = timezone.utc

    return datetime(int(year), int(month), int(day), int(hour), int(minute), int(second),
                    microsecond, tz)
def decode_epoch_datetime(decoder, value, shareable_index=None):
    """
    Decode an epoch-based timestamp (semantic tag 1).

    :param value: the numeric timestamp handed to ``datetime.fromtimestamp``
    :return: the corresponding datetime with its timezone set to UTC
    """
    # Semantic tag 1
    utc = timezone.utc
    return datetime.fromtimestamp(value, utc)
def decode_positive_bignum(decoder, value, shareable_index=None):
    """
    Decode a positive bignum (semantic tag 2).

    :param decoder: the decoder instance (unused)
    :param value: bytestring interpreted as a big-endian unsigned integer
    :param shareable_index: unused; accepted so all semantic decoders share a signature
    :return: the integer value; an empty bytestring yields ``0``
    """
    # Semantic tag 2
    from binascii import hexlify

    # int('', 16) raises ValueError, so treat "no digits" as zero explicitly
    if not value:
        return 0

    return int(hexlify(value), 16)
def decode_negative_bignum(decoder, value, shareable_index=None):
    """
    Decode a negative bignum (semantic tag 3).

    The bytestring is decoded with :func:`decode_positive_bignum` to ``n`` and
    the result is ``-1 - n``.
    """
    # Semantic tag 3
    magnitude = decode_positive_bignum(decoder, value)
    return -magnitude - 1
def decode_fraction(decoder, value, shareable_index=None):
    """
    Decode a decimal fraction (semantic tag 4).

    :param value: sequence whose first item is the exponent and second the mantissa
    :return: ``mantissa * 10 ** exponent`` as a :class:`~decimal.Decimal`
    """
    # Semantic tag 4
    from decimal import Decimal

    exponent, significand = Decimal(value[0]), Decimal(value[1])
    scale = 10 ** exponent
    return significand * scale
def decode_bigfloat(decoder, value, shareable_index=None):
    """
    Decode a bigfloat (semantic tag 5).

    :param value: sequence whose first item is the exponent and second the mantissa
    :return: ``mantissa * 2 ** exponent`` as a :class:`~decimal.Decimal`
    """
    # Semantic tag 5
    from decimal import Decimal

    exponent, significand = Decimal(value[0]), Decimal(value[1])
    scale = 2 ** exponent
    return significand * scale
def decode_sharedref(decoder, value, shareable_index=None):
    """
    Resolve a shared reference (semantic tag 29).

    :param decoder: the decoder instance holding the ``_shareables`` registry
    :param value: index into the registry
    :return: the previously registered shared value
    :raises CBORDecodeError: if the index is out of range or the slot is still ``None``
    """
    # Semantic tag 29
    try:
        target = decoder._shareables[value]
    except IndexError:
        raise CBORDecodeError('shared reference %d not found' % value)

    if target is not None:
        return target

    raise CBORDecodeError('shared value %d has not been initialized' % value)
def decode_rational(decoder, value, shareable_index=None):
    """
    Decode a rational number (semantic tag 30).

    :param value: sequence unpacked as the arguments to :class:`~fractions.Fraction`
    :return: the resulting :class:`~fractions.Fraction`
    """
    # Semantic tag 30
    import fractions

    return fractions.Fraction(*value)
def decode_regexp(decoder, value, shareable_index=None):
    """
    Decode a regular expression (semantic tag 35).

    :param value: the pattern source
    :return: the compiled pattern object
    """
    # Semantic tag 35
    compiled = re.compile(value)
    return compiled
def decode_mime(decoder, value, shareable_index=None):
    """
    Decode a MIME message (semantic tag 36).

    :param value: the message text
    :return: the message object produced by ``email.parser.Parser().parsestr``
    """
    # Semantic tag 36
    from email.parser import Parser

    parser = Parser()
    return parser.parsestr(value)
def decode_uuid(decoder, value, shareable_index=None):
    """
    Decode a UUID (semantic tag 37).

    :param value: the bytestring passed as the ``bytes`` argument of :class:`~uuid.UUID`
    :return: the resulting :class:`~uuid.UUID`
    """
    # Semantic tag 37
    import uuid

    return uuid.UUID(bytes=value)
def decode_set(decoder, value, shareable_index=None):
    """
    Decode a set (semantic tag 258).

    :param value: iterable of the members
    :return: a new :class:`set` holding the members
    """
    # Semantic tag 258
    members = set()
    members.update(value)
    return members
#
# Special decoders (major tag 7)
#
def decode_simple_value(decoder, shareable_index=None):
    """Read one byte and wrap its unsigned value in a :class:`CBORSimpleValue`."""
    (number,) = struct.unpack('>B', decoder.read(1))
    return CBORSimpleValue(number)
def decode_float16(decoder, shareable_index=None):
    """Read two bytes and convert them with ``unpack_float16``."""
    return unpack_float16(decoder.read(2))
def decode_float32(decoder, shareable_index=None):
    """Read four bytes and unpack them as a big-endian single-precision float."""
    (number,) = struct.unpack('>f', decoder.read(4))
    return number
def decode_float64(decoder, shareable_index=None):
    """Read eight bytes and unpack them as a big-endian double-precision float."""
    (number,) = struct.unpack('>d', decoder.read(8))
    return number
# Dispatch table: major type (top 3 bits of the initial byte) -> decoder
# callable, invoked as ``decoder_func(decoder, subtype, shareable_index)``.
major_decoders = dict(enumerate((
    decode_uint,        # 0
    decode_negint,      # 1
    decode_bytestring,  # 2
    decode_string,      # 3
    decode_array,       # 4
    decode_map,         # 5
    decode_semantic,    # 6
    decode_special,     # 7
)))
# Dispatch table for major tag 7: subtype -> callable taking the decoder.
# Subtypes below 20 never reach this table (decode_special handles them).
special_decoders = {
    20: lambda decoder: False,
    21: lambda decoder: True,
    22: lambda decoder: None,
    23: lambda decoder: undefined,
    24: decode_simple_value,  # value stored in the following byte
    25: decode_float16,
    26: decode_float32,
    27: decode_float64,
    31: lambda decoder: break_marker,  # ends indefinite-length arrays and maps
}
# Dispatch table for major tag 6: semantic tag number -> callable invoked as
# ``func(decoder, value, shareable_index)`` with the already decoded tagged
# value. Tag 28 ("shareable") is handled directly in decode_semantic.
semantic_decoders = {
    0: decode_datetime_string,   # datetime string
    1: decode_epoch_datetime,    # epoch-based timestamp
    2: decode_positive_bignum,
    3: decode_negative_bignum,
    4: decode_fraction,          # mantissa * 10 ** exponent
    5: decode_bigfloat,          # mantissa * 2 ** exponent
    29: decode_sharedref,        # reference to a shared value
    30: decode_rational,
    35: decode_regexp,
    36: decode_mime,
    37: decode_uuid,
    258: decode_set,
}
class CBORDecoder(object):
    """
    Deserializes a CBOR encoded byte stream.

    :param fp: the file-like object to read from; must provide ``read()``
        (``tell()`` is additionally used when building error messages)
    :param tag_hook: Callable that takes 3 arguments: the decoder instance, the
        :class:`~cbor2.types.CBORTag` and the shareable index for the resulting object, if any.
        This callback is called for any tags for which there is no built-in decoder.
        The return value is substituted for the CBORTag object in the deserialized output.
    :param object_hook: Callable that takes 2 arguments: the decoder instance and the dictionary.
        This callback is called for each deserialized :class:`dict` object.
        The return value is substituted for the dict in the deserialized output.
    """

    __slots__ = ('fp', 'tag_hook', 'object_hook', '_shareables')

    def __init__(self, fp, tag_hook=None, object_hook=None):
        self.fp = fp
        self.tag_hook = tag_hook
        self.object_hook = object_hook
        # Registry of shared values; slots are allocated by _allocate_shareable()
        # and filled in by set_shareable()
        self._shareables = []

    def _allocate_shareable(self):
        """Reserve a new, still empty (``None``) shareable slot and return its index."""
        self._shareables.append(None)
        return len(self._shareables) - 1

    def set_shareable(self, index, value):
        """
        Set the shareable value for the last encountered shared value marker, if any.

        If the given index is ``None``, nothing is done.

        :param index: the value of the ``shareable_index`` argument to the decoder
        :param value: the shared value
        """
        if index is not None:
            self._shareables[index] = value

    def read(self, amount):
        """
        Read bytes from the data stream.

        :param int amount: the number of bytes to read
        :return: exactly ``amount`` bytes
        :raises CBORDecodeError: if the stream yields fewer bytes than requested
        """
        data = self.fp.read(amount)
        if len(data) < amount:
            raise CBORDecodeError('premature end of stream (expected to read {} bytes, got {} '
                                  'instead)'.format(amount, len(data)))

        return data

    def decode(self, shareable_index=None):
        """
        Decode the next value from the stream.

        :param shareable_index: index of the shareable slot the decoded value
            should be stored in, or ``None``
        :raises CBORDecodeError: if there is any problem decoding the stream
        """
        try:
            initial_byte = byte_as_integer(self.fp.read(1))
            # Top 3 bits select the major type, low 5 bits are the subtype
            major_type = initial_byte >> 5
            subtype = initial_byte & 31
        except Exception as e:
            raise CBORDecodeError('error reading major type at index {}: {}'
                                  .format(self.fp.tell(), e))

        decoder = major_decoders[major_type]
        try:
            return decoder(self, subtype, shareable_index)
        except CBORDecodeError:
            # Already carries a decoding-specific message; pass it through
            raise
        except Exception as e:
            raise CBORDecodeError('error decoding value at index {}: {}'.format(self.fp.tell(), e))

    def decode_from_bytes(self, buf):
        """
        Wrap the given bytestring as a file and call :meth:`decode` with it as the argument.

        This method was intended to be used from the ``tag_hook`` hook when an object needs to be
        decoded separately from the rest but while still taking advantage of the shared value
        registry.

        The decoder's original stream is restored afterwards, even if decoding fails.

        :param bytes buf: the bytestring to decode
        :return: the decoded value
        :raises CBORDecodeError: if there is any problem decoding ``buf``
        """
        old_fp = self.fp
        self.fp = BytesIO(buf)
        try:
            return self.decode()
        finally:
            # Without this, a decoding error would leave the decoder reading
            # from the temporary buffer instead of the caller's stream
            self.fp = old_fp
def loads(payload, **kwargs):
    """
    Deserialize an object from a bytestring.

    :param bytes payload: the bytestring to deserialize
    :param kwargs: keyword arguments passed to :class:`~.CBORDecoder`
    :return: the deserialized object
    """
    decoder = CBORDecoder(BytesIO(payload), **kwargs)
    return decoder.decode()
def load(fp, **kwargs):
    """
    Deserialize an object from an open file.

    :param fp: the input file (any file-like object)
    :param kwargs: keyword arguments passed to :class:`~.CBORDecoder`
    :return: the deserialized object
    """
    decoder = CBORDecoder(fp, **kwargs)
    return decoder.decode()