# admin/verify.py - better repository integrity checking for Mercurial # # Copyright 2023 Octobus # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. from __future__ import annotations import collections import copy import functools from ..i18n import _ from .. import error, pycompat, registrar, requirements from ..utils import stringutil verify_table = {} verify_alias_table = {} check = registrar.verify_check(verify_table, verify_alias_table) # Use this to declare options/aliases in the middle of the hierarchy. # Checks like these are not run themselves and cannot have a body. # For an example, see the `revlogs` check. def noop_func(*args, **kwargs): return @check(b"working-copy.dirstate", alias=b"dirstate") def check_dirstate(ui, repo, **options): ui.status(_(b"checking dirstate\n")) parent1, parent2 = repo.dirstate.parents() m1 = repo[parent1].manifest() m2 = repo[parent2].manifest() errors = 0 is_narrow = requirements.NARROW_REQUIREMENT in repo.requirements narrow_matcher = repo.narrowmatch() if is_narrow else None for err in repo.dirstate.verify(m1, m2, parent1, narrow_matcher): ui.warn(err) errors += 1 return errors # Tree of all checks and their associated function pyramid = {} def build_pyramid(table, full_pyramid): """Create a pyramid of checks of the registered checks. It is a name-based hierarchy that can be arbitrarily nested.""" for entry, func in sorted(table.items(), key=lambda x: x[0], reverse=True): cursor = full_pyramid levels = entry.split(b".") for level in levels[:-1]: current_node = cursor.setdefault(level, {}) cursor = current_node if cursor.get(levels[-1]) is None: cursor[levels[-1]] = (entry, func) elif func is not noop_func: m = b"intermediate checks need to use `verify.noop_func`" raise error.ProgrammingError(m) def find_checks(name, table=None, alias_table=None, full_pyramid=None): """Find all checks for a given name and returns a dict of (qualified_check_name, check_function) # Examples Using a full qualified name: "working-copy.dirstate" -> { "working-copy.dirstate": CF, } Using a *prefix* of a qualified name: "store.revlogs" -> { "store.revlogs.changelog": CF, "store.revlogs.manifestlog": CF, "store.revlogs.filelog": CF, } Using a defined alias: "revlogs" -> { "store.revlogs.changelog": CF, "store.revlogs.manifestlog": CF, "store.revlogs.filelog": CF, } Using something that is none of the above will be an error. """ if table is None: table = verify_table if alias_table is None: alias_table = verify_alias_table if name == b"full": return table checks = {} # is it a full name? check = table.get(name) if check is None: # is it an alias? qualified_name = alias_table.get(name) if qualified_name is not None: name = qualified_name check = table.get(name) else: split = name.split(b".", 1) if len(split) == 2: # split[0] can be an alias qualified_name = alias_table.get(split[0]) if qualified_name is not None: name = b"%s.%s" % (qualified_name, split[1]) check = table.get(name) else: qualified_name = name # Maybe it's a subtree in the check hierarchy that does not # have an explicit alias. levels = name.split(b".") if full_pyramid is not None: if not full_pyramid: build_pyramid(table, full_pyramid) pyramid.clear() pyramid.update(full_pyramid.items()) else: build_pyramid(table, pyramid) subtree = pyramid # Find subtree for level in levels: subtree = subtree.get(level) if subtree is None: hint = error.getsimilar(list(alias_table) + list(table), name) hint = error.similarity_hint(hint) raise error.InputError(_(b"unknown check %s" % name), hint=hint) # Get all checks in that subtree if isinstance(subtree, dict): stack = list(subtree.items()) while stack: current_name, entry = stack.pop() if isinstance(entry, dict): stack.extend(entry.items()) else: # (qualified_name, func) checks[entry[0]] = entry[1] else: checks[name] = check return checks def pass_options( ui, checks, options, table=None, alias_table=None, full_pyramid=None, ): """Given a dict of checks (fully qualified name to function), and a list of options as given by the user, pass each option down to the right check function.""" ui.debug(b"passing options to check functions\n") to_modify = collections.defaultdict(dict) if not checks: raise error.Error(_(b"`checks` required")) for option in sorted(options): split = option.split(b":") hint = _( b"syntax is 'check:option=value', " b"eg. revlogs.changelog:copies=yes" ) option_error = error.InputError( _(b"invalid option '%s'") % option, hint=hint ) if len(split) != 2: raise option_error check_name, option_value = split if not option_value: raise option_error split = option_value.split(b"=") if len(split) != 2: raise option_error option_name, value = split if not value: raise option_error path = b"%s:%s" % (check_name, option_name) matching_checks = find_checks( check_name, table=table, alias_table=alias_table, full_pyramid=full_pyramid, ) for name in matching_checks: check = checks.get(name) if check is None: msg = _(b"specified option '%s' for unselected check '%s'\n") raise error.InputError(msg % (name, option_name)) assert hasattr(check, "func") # help Pytype if not hasattr(check.func, "options"): raise error.InputError( _(b"check '%s' has no option '%s'") % (name, option_name) ) try: matching_option = next( (o for o in check.func.options if o[0] == option_name) ) except StopIteration: raise error.InputError( _(b"check '%s' has no option '%s'") % (name, option_name) ) # transform the argument from cli string to the expected Python type _name, typ, _docstring = matching_option as_typed = None if isinstance(typ, bool): as_bool = stringutil.parsebool(value) if as_bool is None: raise error.InputError( _(b"'%s' is not a boolean ('%s')") % (path, value) ) as_typed = as_bool elif isinstance(typ, list): as_list = stringutil.parselist(value) if as_list is None: raise error.InputError( _(b"'%s' is not a list ('%s')") % (path, value) ) as_typed = as_list else: raise error.ProgrammingError(b"unsupported type %s", type(typ)) if option_name in to_modify[name]: raise error.InputError( _(b"duplicated option '%s' for '%s'") % (option_name, name) ) else: assert as_typed is not None to_modify[name][option_name] = as_typed # Manage case where a check is set but without command line options # it will later be set with default check options values for name, f in checks.items(): if name not in to_modify: to_modify[name] = {} # Merge default options with command line options for check_name, cmd_options in to_modify.items(): check = checks.get(check_name) func = checks[check_name] merged_options = {} # help Pytype assert check is not None assert check.func is not None assert hasattr(check.func, "options") if check.func.options: # copy the default value in case it's mutable (list, etc.) merged_options = { o[0]: copy.deepcopy(o[1]) for o in check.func.options } if cmd_options: for k, v in cmd_options.items(): merged_options[k] = v options = pycompat.strkwargs(merged_options) checks[check_name] = functools.partial(func, **options) ui.debug(b"merged options for '%s': '%r'\n" % (check_name, options)) return checks def get_checks( repo, ui, names=None, options=None, table=None, alias_table=None, full_pyramid=None, ): """Given a list of function names and optionally a list of options, return matched checks with merged options (command line options values take precedence on default ones) It runs find checks, then resolve options and returns a dict of matched functions with resolved options. """ funcs = {} if names is None: names = [] if options is None: options = [] # find checks for name in names: matched = find_checks( name, table=table, alias_table=alias_table, full_pyramid=full_pyramid, ) matched_names = b", ".join(matched) ui.debug(b"found checks '%s' for name '%s'\n" % (matched_names, name)) funcs.update(matched) funcs = {n: functools.partial(f, ui, repo) for n, f in funcs.items()} # resolve options checks = pass_options( ui, funcs, options, table=table, alias_table=alias_table, full_pyramid=full_pyramid, ) return checks