common.py
203 lines
| 5.7 KiB
| text/x-python
|
PythonLexer
Gregory Szorc
|
r37513 | import imp | ||
Gregory Szorc
|
r30895 | import inspect | ||
Gregory Szorc
|
r30435 | import io | ||
Gregory Szorc
|
r31796 | import os | ||
Gregory Szorc
|
r30895 | import types | ||
Gregory Szorc
|
r44446 | import unittest | ||
Gregory Szorc
|
r30895 | |||
Gregory Szorc
|
r37513 | try: | ||
import hypothesis | ||||
except ImportError: | ||||
hypothesis = None | ||||
Gregory Szorc
|
r30895 | |||
Gregory Szorc
|
class TestCase(unittest.TestCase):
    """Base test case that guarantees ``assertRaisesRegex`` is available.

    When ``unittest.TestCase`` lacks ``assertRaisesRegex``, the older
    ``assertRaisesRegexp`` spelling is aliased to it so tests can always use
    the newer name.
    """

    # The alias is only created when the modern name is missing, so the
    # legacy attribute is never touched on interpreters that dropped it.
    if not getattr(unittest.TestCase, "assertRaisesRegex", False):
        assertRaisesRegex = unittest.TestCase.assertRaisesRegexp
Gregory Szorc
|
r30895 | def make_cffi(cls): | ||
"""Decorator to add CFFI versions of each test method.""" | ||||
Gregory Szorc
|
r37513 | # The module containing this class definition should | ||
# `import zstandard as zstd`. Otherwise things may blow up. | ||||
mod = inspect.getmodule(cls) | ||||
Gregory Szorc
|
r44446 | if not hasattr(mod, "zstd"): | ||
Gregory Szorc
|
r37513 | raise Exception('test module does not contain "zstd" symbol') | ||
Gregory Szorc
|
r44446 | if not hasattr(mod.zstd, "backend"): | ||
raise Exception( | ||||
'zstd symbol does not have "backend" attribute; did ' | ||||
"you `import zstandard as zstd`?" | ||||
) | ||||
Gregory Szorc
|
r37513 | |||
# If `import zstandard` already chose the cffi backend, there is nothing | ||||
# for us to do: we only add the cffi variation if the default backend | ||||
# is the C extension. | ||||
Gregory Szorc
|
r44446 | if mod.zstd.backend == "cffi": | ||
Gregory Szorc
|
r37513 | return cls | ||
old_env = dict(os.environ) | ||||
Gregory Szorc
|
r44446 | os.environ["PYTHON_ZSTANDARD_IMPORT_POLICY"] = "cffi" | ||
Gregory Szorc
|
r30895 | try: | ||
Gregory Szorc
|
r37513 | try: | ||
Gregory Szorc
|
r44446 | mod_info = imp.find_module("zstandard") | ||
mod = imp.load_module("zstandard_cffi", *mod_info) | ||||
Gregory Szorc
|
r37513 | except ImportError: | ||
return cls | ||||
finally: | ||||
os.environ.clear() | ||||
os.environ.update(old_env) | ||||
Gregory Szorc
|
r44446 | if mod.backend != "cffi": | ||
Gregory Szorc
|
r44605 | raise Exception( | ||
"got the zstandard %s backend instead of cffi" % mod.backend | ||||
) | ||||
Gregory Szorc
|
r30895 | |||
# If CFFI version is available, dynamically construct test methods | ||||
# that use it. | ||||
for attr in dir(cls): | ||||
fn = getattr(cls, attr) | ||||
if not inspect.ismethod(fn) and not inspect.isfunction(fn): | ||||
continue | ||||
Gregory Szorc
|
r44446 | if not fn.__name__.startswith("test_"): | ||
Gregory Szorc
|
r30895 | continue | ||
Gregory Szorc
|
r44446 | name = "%s_cffi" % fn.__name__ | ||
Gregory Szorc
|
r30895 | |||
# Replace the "zstd" symbol with the CFFI module instance. Then copy | ||||
# the function object and install it in a new attribute. | ||||
if isinstance(fn, types.FunctionType): | ||||
globs = dict(fn.__globals__) | ||||
Gregory Szorc
|
r44446 | globs["zstd"] = mod | ||
new_fn = types.FunctionType( | ||||
fn.__code__, globs, name, fn.__defaults__, fn.__closure__ | ||||
) | ||||
Gregory Szorc
|
r30895 | new_method = new_fn | ||
else: | ||||
globs = dict(fn.__func__.func_globals) | ||||
Gregory Szorc
|
r44446 | globs["zstd"] = mod | ||
new_fn = types.FunctionType( | ||||
fn.__func__.func_code, | ||||
globs, | ||||
name, | ||||
fn.__func__.func_defaults, | ||||
fn.__func__.func_closure, | ||||
) | ||||
Gregory Szorc
|
r44605 | new_method = types.UnboundMethodType( | ||
new_fn, fn.im_self, fn.im_class | ||||
) | ||||
Gregory Szorc
|
r30895 | |||
setattr(cls, name, new_method) | ||||
return cls | ||||
Gregory Szorc
|
r30435 | |||
Gregory Szorc
|
class NonClosingBytesIO(io.BytesIO):
    """BytesIO that saves the underlying buffer on close().

    This allows us to access written data after close().
    """

    def __init__(self, *args, **kwargs):
        super(NonClosingBytesIO, self).__init__(*args, **kwargs)
        # Snapshot of the buffer contents, populated by close().
        self._saved_buffer = None

    def close(self):
        """Record the current contents, then close the stream."""
        # Must run before the real close(): a closed BytesIO refuses
        # getvalue().
        self._saved_buffer = self.getvalue()
        return super(NonClosingBytesIO, self).close()

    def getvalue(self):
        """Return the buffer contents, even after the stream was closed."""
        if self.closed:
            return self._saved_buffer
        else:
            return super(NonClosingBytesIO, self).getvalue()
class OpCountingBytesIO(NonClosingBytesIO):
    """NonClosingBytesIO that counts flush(), read() and write() calls.

    The counters are exposed as ``_flush_count``, ``_read_count`` and
    ``_write_count``; each call increments its counter by one before
    delegating to the parent implementation.
    """

    def __init__(self, *args, **kwargs):
        self._flush_count = 0
        self._read_count = 0
        self._write_count = 0
        super(OpCountingBytesIO, self).__init__(*args, **kwargs)

    def flush(self):
        """Count the call, then flush the underlying stream."""
        self._flush_count += 1
        return super(OpCountingBytesIO, self).flush()

    def read(self, *args):
        """Count the call, then read from the underlying stream."""
        self._read_count += 1
        return super(OpCountingBytesIO, self).read(*args)

    def write(self, data):
        """Count the call, then write ``data`` to the underlying stream."""
        self._write_count += 1
        return super(OpCountingBytesIO, self).write(data)
Gregory Szorc
|
r31796 | |||
# Cache filled on the first call to random_input_data().
_source_files = []


def random_input_data():
    """Obtain the raw content of source files.

    This is used for generating "random" data to feed into fuzzing, since it is
    faster than random content generation.

    Returns the module-level list of byte strings.  The list is built on the
    first call and the same list object is returned afterwards.
    """
    if _source_files:
        return _source_files

    # Resolve to an absolute path: with a relative ``__file__`` the dirname
    # can be "" and os.walk("") would silently yield nothing.
    base_dir = os.path.dirname(os.path.abspath(__file__))

    for root, dirs, files in os.walk(base_dir):
        # Sorting in place fixes the order in which os.walk descends.
        dirs[:] = sorted(dirs)
        for f in sorted(files):
            try:
                with open(os.path.join(root, f), "rb") as fh:
                    data = fh.read()
            except (IOError, OSError):
                # Best effort: skip unreadable files.  IOError is listed
                # because it is not an OSError subclass on Python 2.
                continue

            if data:
                _source_files.append(data)

    # Also add some actual random data.
    _source_files.append(os.urandom(100))
    _source_files.append(os.urandom(1000))
    _source_files.append(os.urandom(10000))
    _source_files.append(os.urandom(100000))
    _source_files.append(os.urandom(1000000))

    return _source_files
Gregory Szorc
|
r37513 | |||
def generate_samples():
    """Return a deterministic list of small byte-string samples.

    Each of the 128 iterations contributes three entries: one base input, that
    input repeated ``i + 3`` times, and an input picked from the end of the
    list repeated ``i + 2`` times.  The result has 384 entries.
    """
    inputs = [
        b"foo",
        b"bar",
        b"abcdef",
        b"sometext",
        b"baz",
    ]

    count = len(inputs)
    samples = []

    for i in range(128):
        pos = i % count
        samples.append(inputs[pos])
        samples.append(inputs[pos] * (i + 3))
        # A negative index walks the list backwards; when ``pos`` is 0 this
        # is ``inputs[0]`` again, not the last element.
        samples.append(inputs[-pos] * (i + 2))

    return samples
# Register the hypothesis settings profiles, but only when the optional
# ``hypothesis`` import succeeded (the name is None otherwise).
if hypothesis:
    # NOTE(review): deadline is presumably in milliseconds, as in the
    # Hypothesis settings documentation - confirm for the pinned version.
    default_settings = hypothesis.settings(deadline=10000)
    hypothesis.settings.register_profile("default", default_settings)

    ci_settings = hypothesis.settings(deadline=20000, max_examples=1000)
    hypothesis.settings.register_profile("ci", ci_settings)

    expensive_settings = hypothesis.settings(deadline=None, max_examples=10000)
    hypothesis.settings.register_profile("expensive", expensive_settings)

    # The active profile is chosen through the HYPOTHESIS_PROFILE environment
    # variable and falls back to "default".
    hypothesis.settings.load_profile(
        os.environ.get("HYPOTHESIS_PROFILE", "default")
    )