|
|
import imp
|
|
|
import inspect
|
|
|
import io
|
|
|
import os
|
|
|
import types
|
|
|
|
|
|
try:
|
|
|
import hypothesis
|
|
|
except ImportError:
|
|
|
hypothesis = None
|
|
|
|
|
|
|
|
|
def make_cffi(cls):
    """Decorator to add CFFI versions of each test method.

    For every ``test_*`` function or method found on ``cls``, a copy named
    ``<name>_cffi`` is installed on the class. The copy runs the same code
    object, but its global ``zstd`` symbol is rebound to a second copy of the
    ``zstandard`` module that is loaded as ``zstandard_cffi`` while the
    ``PYTHON_ZSTANDARD_IMPORT_POLICY`` environment variable is set to
    ``cffi``.

    Returns ``cls`` (mutated in place). The class is returned untouched when
    the already-imported ``zstd`` reports the ``cffi`` backend, or when the
    ``zstandard`` module cannot be found/loaded (``ImportError``).

    Raises ``Exception`` if the module defining ``cls`` has no ``zstd``
    symbol, if that symbol has no ``backend`` attribute, or if the freshly
    loaded module reports a backend other than ``cffi``.
    """
    # The module containing this class definition should
    # `import zstandard as zstd`. Otherwise things may blow up.
    mod = inspect.getmodule(cls)
    if not hasattr(mod, 'zstd'):
        raise Exception('test module does not contain "zstd" symbol')

    if not hasattr(mod.zstd, 'backend'):
        raise Exception('zstd symbol does not have "backend" attribute; did '
                        'you `import zstandard as zstd`?')

    # If `import zstandard` already chose the cffi backend, there is nothing
    # for us to do: we only add the cffi variation if the default backend
    # is the C extension.
    if mod.zstd.backend == 'cffi':
        return cls

    # Temporarily force the import policy; the environment is restored to
    # its exact previous contents no matter how the import attempt ends.
    old_env = dict(os.environ)
    os.environ['PYTHON_ZSTANDARD_IMPORT_POLICY'] = 'cffi'
    try:
        try:
            mod_info = imp.find_module('zstandard')
        except ImportError:
            return cls

        try:
            mod = imp.load_module('zstandard_cffi', *mod_info)
        except ImportError:
            return cls
        finally:
            # imp.find_module() may hand back an open file object; closing
            # it is the caller's responsibility, even when loading fails.
            if mod_info[0] is not None:
                mod_info[0].close()
    finally:
        os.environ.clear()
        os.environ.update(old_env)

    if mod.backend != 'cffi':
        raise Exception('got the zstandard %s backend instead of cffi' % mod.backend)

    # If CFFI version is available, dynamically construct test methods
    # that use it.

    for attr in dir(cls):
        fn = getattr(cls, attr)
        if not inspect.ismethod(fn) and not inspect.isfunction(fn):
            continue

        if not fn.__name__.startswith('test_'):
            continue

        name = '%s_cffi' % fn.__name__

        # Replace the "zstd" symbol with the CFFI module instance. Then copy
        # the function object and install it in a new attribute.
        if isinstance(fn, types.FunctionType):
            # Plain function (how Python 3 exposes methods on the class).
            globs = dict(fn.__globals__)
            globs['zstd'] = mod
            new_fn = types.FunctionType(fn.__code__, globs, name,
                                        fn.__defaults__, fn.__closure__)
            new_method = new_fn
        else:
            # Method object: this branch relies on Python 2-only attribute
            # names (func_globals, im_self, types.UnboundMethodType).
            globs = dict(fn.__func__.func_globals)
            globs['zstd'] = mod
            new_fn = types.FunctionType(fn.__func__.func_code, globs, name,
                                        fn.__func__.func_defaults,
                                        fn.__func__.func_closure)
            new_method = types.UnboundMethodType(new_fn, fn.im_self,
                                                 fn.im_class)

        setattr(cls, name, new_method)

    return cls
|
|
|
|
|
|
|
|
|
class OpCountingBytesIO(io.BytesIO):
    """An ``io.BytesIO`` that tallies how often it is read from and written to.

    ``_read_count`` is bumped on every ``read()`` call and ``_write_count``
    on every ``write()`` call; both start at zero.
    """

    def __init__(self, *args, **kwargs):
        # Counters are set before delegating so they exist as soon as the
        # underlying buffer is initialised.
        self._read_count = self._write_count = 0
        super(OpCountingBytesIO, self).__init__(*args, **kwargs)

    def read(self, *args):
        """Count the call, then delegate to ``io.BytesIO.read``."""
        self._read_count = self._read_count + 1
        parent = super(OpCountingBytesIO, self)
        return parent.read(*args)

    def write(self, data):
        """Count the call, then delegate to ``io.BytesIO.write``."""
        self._write_count = self._write_count + 1
        parent = super(OpCountingBytesIO, self)
        return parent.write(data)
|
|
|
|
|
|
|
|
|
# Cache of file contents collected by random_input_data(); filled on first use.
_source_files = []


def random_input_data():
    """Obtain the raw content of source files.

    This is used for generating "random" data to feed into fuzzing, since it is
    faster than random content generation.

    Walks the directory containing this file (recursively, in sorted order)
    and returns a list with the bytes of every non-empty file that could be
    read. The list is cached at module level and the same list object is
    returned on subsequent calls. Unreadable files are skipped.
    """
    if _source_files:
        return _source_files

    # Resolve to an absolute path first: when __file__ is a bare relative
    # name, dirname() is '' and os.walk('') would yield nothing.
    base_dir = os.path.dirname(os.path.abspath(__file__))

    for root, dirs, files in os.walk(base_dir):
        # Sort in place so os.walk descends in a deterministic order.
        dirs[:] = sorted(dirs)
        for f in sorted(files):
            try:
                with open(os.path.join(root, f), 'rb') as fh:
                    data = fh.read()
            except (IOError, OSError):
                # Best effort: open() raises IOError on Python 2 and
                # OSError on Python 3; either way just skip the file.
                continue

            if data:
                _source_files.append(data)

    return _source_files
|
|
|
|
|
|
|
|
|
def generate_samples():
    """Build a fixed, deterministic list of byte-string samples.

    Five seed strings are cycled through for 128 rounds; each round adds
    three entries: the seed itself, the seed repeated ``round + 3`` times,
    and the seed at the mirrored (negative) index repeated ``round + 2``
    times. The result therefore always has 384 entries.
    """
    seeds = (b'foo', b'bar', b'abcdef', b'sometext', b'baz')

    collected = []
    for round_no in range(128):
        pos = round_no % 5
        collected.extend((
            seeds[pos],
            seeds[pos] * (round_no + 3),
            # -0 == 0, so position 0 mirrors onto itself.
            seeds[-pos] * (round_no + 2),
        ))

    return collected
|
|
|
|
|
|
|
|
|
# Only configure Hypothesis when it was importable (``hypothesis`` is None
# otherwise). Two settings profiles are registered, then the one named by the
# HYPOTHESIS_PROFILE environment variable is loaded ('default' if unset).
if hypothesis:
    # Profile using the library's stock settings.
    default_settings = hypothesis.settings()
    hypothesis.settings.register_profile('default', default_settings)

    # Profile with raised limits, registered under the name 'ci'.
    # NOTE(review): `max_iterations` may not be accepted by newer Hypothesis
    # releases - confirm against the version this project pins.
    ci_settings = hypothesis.settings(
        max_examples=2500,
        max_iterations=2500,
    )
    hypothesis.settings.register_profile('ci', ci_settings)

    hypothesis.settings.load_profile(
        os.environ.get('HYPOTHESIS_PROFILE', 'default')
    )
|
|
|
|