##// END OF EJS Templates
exchange: refactor control flow of _getbundlechangegrouppart()...
exchange: refactor control flow of _getbundlechangegrouppart()

The use of early return makes the control flow of this function much easier to reason about IMO.

Differential Revision: https://phab.mercurial-scm.org/D4010

File last commit:

r37513:b1fb341d default
r38816:188d12d8 default
Show More
common.py
151 lines | 4.4 KiB | text/x-python | PythonLexer
import imp
import inspect
import io
import os
import types
try:
import hypothesis
except ImportError:
hypothesis = None
def make_cffi(cls):
    """Decorator to add CFFI versions of each test method.

    For every function or method on ``cls`` whose name starts with ``test_``,
    a copy named ``<name>_cffi`` is installed on the class. The copy runs
    with a private globals dict in which ``zstd`` is bound to a second copy
    of the ``zstandard`` package loaded with the cffi import policy.

    Returns ``cls`` (unmodified when the default backend is already cffi or
    when the cffi variant cannot be imported).

    Raises ``Exception`` when the module defining ``cls`` has no ``zstd``
    symbol, when that symbol has no ``backend`` attribute, or when the
    freshly loaded module did not end up using the cffi backend.
    """
    # The module containing this class definition should
    # `import zstandard as zstd`. Otherwise things may blow up.
    mod = inspect.getmodule(cls)
    if not hasattr(mod, 'zstd'):
        raise Exception('test module does not contain "zstd" symbol')

    if not hasattr(mod.zstd, 'backend'):
        raise Exception('zstd symbol does not have "backend" attribute; did '
                        'you `import zstandard as zstd`?')

    # If `import zstandard` already chose the cffi backend, there is nothing
    # for us to do: we only add the cffi variation if the default backend
    # is the C extension.
    if mod.zstd.backend == 'cffi':
        return cls

    # Force the cffi backend only for the duration of the import below; the
    # environment is restored verbatim afterwards, even on failure.
    old_env = dict(os.environ)
    os.environ['PYTHON_ZSTANDARD_IMPORT_POLICY'] = 'cffi'
    try:
        try:
            mod_info = imp.find_module('zstandard')
            try:
                mod = imp.load_module('zstandard_cffi', *mod_info)
            finally:
                # find_module() hands back an open file for plain modules
                # (None for packages); closing it is the caller's job.
                if mod_info[0] is not None:
                    mod_info[0].close()
        except ImportError:
            # Best effort: without an importable cffi variant the class is
            # returned untouched.
            return cls
    finally:
        os.environ.clear()
        os.environ.update(old_env)

    if mod.backend != 'cffi':
        raise Exception('got the zstandard %s backend instead of cffi' % mod.backend)

    # If CFFI version is available, dynamically construct test methods
    # that use it.
    for attr in dir(cls):
        fn = getattr(cls, attr)
        if not inspect.ismethod(fn) and not inspect.isfunction(fn):
            continue

        if not fn.__name__.startswith('test_'):
            continue

        name = '%s_cffi' % fn.__name__

        # Replace the "zstd" symbol with the CFFI module instance. Then copy
        # the function object and install it in a new attribute.
        if isinstance(fn, types.FunctionType):
            globs = dict(fn.__globals__)
            globs['zstd'] = mod
            new_fn = types.FunctionType(fn.__code__, globs, name,
                                        fn.__defaults__, fn.__closure__)
            new_method = new_fn
        else:
            # Method objects: this branch relies on the Python 2 attribute
            # names (func_globals, im_self, ...) and UnboundMethodType.
            globs = dict(fn.__func__.func_globals)
            globs['zstd'] = mod
            new_fn = types.FunctionType(fn.__func__.func_code, globs, name,
                                        fn.__func__.func_defaults,
                                        fn.__func__.func_closure)
            new_method = types.UnboundMethodType(new_fn, fn.im_self,
                                                 fn.im_class)

        setattr(cls, name, new_method)

    return cls
class OpCountingBytesIO(io.BytesIO):
    """An ``io.BytesIO`` that tallies calls to ``read()`` and ``write()``.

    The tallies live in ``_read_count`` and ``_write_count``. Only these two
    methods are overridden; each call bumps its counter by one and then
    defers to the ``io.BytesIO`` implementation.
    """

    def __init__(self, *args, **kwargs):
        # Counters are created before the base initializer runs.
        self._read_count = 0
        self._write_count = 0
        base = super(OpCountingBytesIO, self)
        return base.__init__(*args, **kwargs)

    def read(self, *args):
        """Count the call, then read exactly as ``io.BytesIO.read`` does."""
        self._read_count = self._read_count + 1
        base = super(OpCountingBytesIO, self)
        return base.read(*args)

    def write(self, data):
        """Count the call, then write ``data`` via ``io.BytesIO.write``."""
        self._write_count = self._write_count + 1
        base = super(OpCountingBytesIO, self)
        return base.write(data)
# Cache of file contents collected by random_input_data().
_source_files = []


def random_input_data():
    """Obtain the raw content of source files.

    This is used for generating "random" data to feed into fuzzing, since it is
    faster than random content generation.

    The directory containing this module is walked recursively in sorted
    order and the bytes of every non-empty readable file are collected.

    Returns the module-level cache list (the same list object on every
    call); once it is non-empty the directory is not walked again.
    """
    if _source_files:
        return _source_files

    # dirname() of a bare relative __file__ is '' and os.walk('') yields
    # nothing, so resolve to an absolute path first.
    base_dir = os.path.dirname(os.path.abspath(__file__))

    for root, dirs, files in os.walk(base_dir):
        # In-place sort so os.walk descends in a deterministic order.
        dirs.sort()
        for f in sorted(files):
            try:
                with open(os.path.join(root, f), 'rb') as fh:
                    data = fh.read()
            except (IOError, OSError):
                # Best effort: unreadable files are skipped. IOError is
                # listed because it is not an OSError on Python 2.
                continue

            if data:
                _source_files.append(data)

    return _source_files
def generate_samples():
    """Return a deterministic list of 384 byte strings built from five seeds.

    Each of the 128 rounds contributes three entries: the seed picked by
    ``round % 5``, that seed repeated ``round + 3`` times, and the seed at
    the negated index repeated ``round + 2`` times.
    """
    seeds = (
        b'foo',
        b'bar',
        b'abcdef',
        b'sometext',
        b'baz',
    )

    collected = []
    for step in range(128):
        slot = step % 5
        forward = seeds[slot]
        # -0 is 0, so slot 0 picks the first seed here too; the other slots
        # count back from the end of the tuple.
        backward = seeds[-slot]
        collected.extend((
            forward,
            forward * (step + 3),
            backward * (step + 2),
        ))

    return collected
if hypothesis is not None:
    # hypothesis is optional: the guarded import at the top of this module
    # leaves the name as None when the package is missing.

    # 'default' profile: stock settings.
    default_settings = hypothesis.settings()
    hypothesis.settings.register_profile('default', default_settings)

    # 'ci' profile: 2500 examples / iterations.
    # NOTE(review): max_iterations may not be accepted by newer hypothesis
    # releases - confirm against the version in use.
    ci_settings = hypothesis.settings(
        max_examples=2500,
        max_iterations=2500,
    )
    hypothesis.settings.register_profile('ci', ci_settings)

    # HYPOTHESIS_PROFILE selects the active profile; falls back to 'default'.
    hypothesis.settings.load_profile(
        os.environ.get('HYPOTHESIS_PROFILE', 'default')
    )