##// END OF EJS Templates
dispatch: don't clamp the range of the exit code twice...
dispatch: don't clamp the range of the exit code twice. We already limit the range to (0, 255) in the call to sys.exit(). The duplicated operation can't possibly be hurting us, but let's clean it up to avoid confusion.

File last commit:

r31796:e0dc4053 default
r36700:68328202 default
Show More
common.py
88 lines | 2.6 KiB | text/x-python | PythonLexer
import inspect
import io
import os
import types
def make_cffi(cls):
    """Decorator to add CFFI versions of each test method.

    For every attribute of ``cls`` that is a function or method whose
    ``__name__`` starts with ``test_``, a copy named ``<name>_cffi`` is
    installed on the class. The copy shares the original code object,
    defaults and closure, but runs against a *copy* of the original globals
    in which the ``zstd`` symbol is rebound to the ``zstd_cffi`` module.

    Returns ``cls`` itself. If ``zstd_cffi`` cannot be imported the class is
    returned untouched.
    """
    try:
        import zstd_cffi
    except ImportError:
        return cls

    # If CFFI version is available, dynamically construct test methods
    # that use it.
    for attr in dir(cls):
        fn = getattr(cls, attr)
        if not inspect.ismethod(fn) and not inspect.isfunction(fn):
            continue

        if not fn.__name__.startswith('test_'):
            continue

        name = '%s_cffi' % fn.__name__

        # Method objects (Python 2 unbound methods, bound classmethods) wrap
        # the real function in ``__func__``; plain functions are used as-is.
        # The dunder attributes below exist on both Python 2.6+ and 3.
        func = getattr(fn, '__func__', fn)

        # Replace the "zstd" symbol with the CFFI module instance. Then copy
        # the function object and install it in a new attribute.
        globs = dict(func.__globals__)
        globs['zstd'] = zstd_cffi
        new_fn = types.FunctionType(func.__code__, globs, name,
                                    func.__defaults__, func.__closure__)

        bound_to = getattr(fn, '__self__', None)
        if bound_to is None:
            # Plain function (or Python 2 unbound method): storing the bare
            # function on the class lets normal attribute lookup bind it.
            new_method = new_fn
        else:
            # Already bound (e.g. a classmethod): keep the same receiver.
            new_method = types.MethodType(new_fn, bound_to)

        setattr(cls, name, new_method)

    return cls
class OpCountingBytesIO(io.BytesIO):
    """In-memory bytes buffer that tallies ``read()`` and ``write()`` calls.

    The tallies live in ``_read_count`` and ``_write_count``. Each call bumps
    its counter before delegating to ``io.BytesIO``, so a call is counted
    regardless of how much data it ends up moving.
    """

    def __init__(self, *args, **kwargs):
        """Zero both counters, then initialise the underlying buffer.

        All arguments are forwarded unchanged to ``io.BytesIO``.
        """
        self._read_count = 0
        self._write_count = 0
        parent = super(OpCountingBytesIO, self)
        parent.__init__(*args, **kwargs)

    def read(self, *args):
        """Record one read operation and return what ``BytesIO.read`` gives."""
        self._read_count = self._read_count + 1
        parent = super(OpCountingBytesIO, self)
        return parent.read(*args)

    def write(self, data):
        """Record one write operation and return ``BytesIO.write``'s result."""
        self._write_count = self._write_count + 1
        parent = super(OpCountingBytesIO, self)
        return parent.write(data)
# Cache of file contents collected by random_input_data(); filled on first
# use and returned as-is afterwards.
_source_files = []


def random_input_data():
    """Obtain the raw content of source files.

    This is used for generating "random" data to feed into fuzzing, since it is
    faster than random content generation.

    Walks the directory containing this module (recursively, in sorted order)
    and collects the bytes of every non-empty file that can be read. The
    result is cached in the module-level ``_source_files`` list, and that same
    list object is returned on every call.
    """
    if _source_files:
        return _source_files

    # ``__file__`` may be a bare relative name, in which case dirname() would
    # be '' and os.walk('') would yield nothing. Anchor it to an absolute
    # path first.
    base = os.path.dirname(os.path.abspath(__file__))

    for root, dirs, files in os.walk(base):
        # Sorting in place fixes the order in which os.walk descends.
        dirs.sort()
        for f in sorted(files):
            try:
                with open(os.path.join(root, f), 'rb') as fh:
                    data = fh.read()
            except (IOError, OSError):
                # Best effort: skip anything unreadable. open() raises
                # IOError on Python 2 and OSError on Python 3.
                continue

            if data:
                _source_files.append(data)

    return _source_files