##// END OF EJS Templates
admin-verify: pass p1 down to the dirstate function...
Raphaël Gomès -
r52506:dcb00d5c stable
parent child Browse files
Show More
@@ -1,341 +1,340
1 1 # admin/verify.py - better repository integrity checking for Mercurial
2 2 #
3 3 # Copyright 2023 Octobus <contact@octobus.net>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import collections
9 9 import copy
10 10 import functools
11 11
12 12 from ..i18n import _
13 13 from .. import error, pycompat, registrar, requirements
14 14 from ..utils import stringutil
15 15
16 16
17 17 verify_table = {}
18 18 verify_alias_table = {}
19 19 check = registrar.verify_check(verify_table, verify_alias_table)
20 20
21 21
22 22 # Use this to declare options/aliases in the middle of the hierarchy.
23 23 # Checks like these are not run themselves and cannot have a body.
24 24 # For an example, see the `revlogs` check.
25 25 def noop_func(*args, **kwargs):
26 26 return
27 27
28 28
29 29 @check(b"working-copy.dirstate", alias=b"dirstate")
30 30 def check_dirstate(ui, repo, **options):
31 31 ui.status(_(b"checking dirstate\n"))
32 32
33 33 parent1, parent2 = repo.dirstate.parents()
34 34 m1 = repo[parent1].manifest()
35 35 m2 = repo[parent2].manifest()
36 36 errors = 0
37 37
38 38 is_narrow = requirements.NARROW_REQUIREMENT in repo.requirements
39 39 narrow_matcher = repo.narrowmatch() if is_narrow else None
40
41 for err in repo.dirstate.verify(m1, m2, narrow_matcher):
40 for err in repo.dirstate.verify(m1, m2, parent1, narrow_matcher):
42 41 ui.warn(err[0] % err[1:])
43 42 errors += 1
44 43
45 44 return errors
46 45
47 46
48 47 # Tree of all checks and their associated function
49 48 pyramid = {}
50 49
51 50
52 51 def build_pyramid(table, full_pyramid):
53 52 """Create a pyramid of checks of the registered checks.
54 53 It is a name-based hierarchy that can be arbitrarily nested."""
55 54 for entry, func in sorted(table.items(), key=lambda x: x[0], reverse=True):
56 55 cursor = full_pyramid
57 56 levels = entry.split(b".")
58 57 for level in levels[:-1]:
59 58 current_node = cursor.setdefault(level, {})
60 59 cursor = current_node
61 60 if cursor.get(levels[-1]) is None:
62 61 cursor[levels[-1]] = (entry, func)
63 62 elif func is not noop_func:
64 63 m = b"intermediate checks need to use `verify.noop_func`"
65 64 raise error.ProgrammingError(m)
66 65
67 66
68 67 def find_checks(name, table=None, alias_table=None, full_pyramid=None):
69 68 """Find all checks for a given name and returns a dict of
70 69 (qualified_check_name, check_function)
71 70
72 71 # Examples
73 72
74 73 Using a full qualified name:
75 74 "working-copy.dirstate" -> {
76 75 "working-copy.dirstate": CF,
77 76 }
78 77
79 78 Using a *prefix* of a qualified name:
80 79 "store.revlogs" -> {
81 80 "store.revlogs.changelog": CF,
82 81 "store.revlogs.manifestlog": CF,
83 82 "store.revlogs.filelog": CF,
84 83 }
85 84
86 85 Using a defined alias:
87 86 "revlogs" -> {
88 87 "store.revlogs.changelog": CF,
89 88 "store.revlogs.manifestlog": CF,
90 89 "store.revlogs.filelog": CF,
91 90 }
92 91
93 92 Using something that is none of the above will be an error.
94 93 """
95 94 if table is None:
96 95 table = verify_table
97 96 if alias_table is None:
98 97 alias_table = verify_alias_table
99 98
100 99 if name == b"full":
101 100 return table
102 101 checks = {}
103 102
104 103 # is it a full name?
105 104 check = table.get(name)
106 105
107 106 if check is None:
108 107 # is it an alias?
109 108 qualified_name = alias_table.get(name)
110 109 if qualified_name is not None:
111 110 name = qualified_name
112 111 check = table.get(name)
113 112 else:
114 113 split = name.split(b".", 1)
115 114 if len(split) == 2:
116 115 # split[0] can be an alias
117 116 qualified_name = alias_table.get(split[0])
118 117 if qualified_name is not None:
119 118 name = b"%s.%s" % (qualified_name, split[1])
120 119 check = table.get(name)
121 120 else:
122 121 qualified_name = name
123 122
124 123 # Maybe it's a subtree in the check hierarchy that does not
125 124 # have an explicit alias.
126 125 levels = name.split(b".")
127 126 if full_pyramid is not None:
128 127 if not full_pyramid:
129 128 build_pyramid(table, full_pyramid)
130 129
131 130 pyramid.clear()
132 131 pyramid.update(full_pyramid.items())
133 132 else:
134 133 build_pyramid(table, pyramid)
135 134
136 135 subtree = pyramid
137 136 # Find subtree
138 137 for level in levels:
139 138 subtree = subtree.get(level)
140 139 if subtree is None:
141 140 hint = error.getsimilar(list(alias_table) + list(table), name)
142 141 hint = error.similarity_hint(hint)
143 142
144 143 raise error.InputError(_(b"unknown check %s" % name), hint=hint)
145 144
146 145 # Get all checks in that subtree
147 146 if isinstance(subtree, dict):
148 147 stack = list(subtree.items())
149 148 while stack:
150 149 current_name, entry = stack.pop()
151 150 if isinstance(entry, dict):
152 151 stack.extend(entry.items())
153 152 else:
154 153 # (qualified_name, func)
155 154 checks[entry[0]] = entry[1]
156 155 else:
157 156 checks[name] = check
158 157
159 158 return checks
160 159
161 160
162 161 def pass_options(
163 162 ui,
164 163 checks,
165 164 options,
166 165 table=None,
167 166 alias_table=None,
168 167 full_pyramid=None,
169 168 ):
170 169 """Given a dict of checks (fully qualified name to function), and a list
171 170 of options as given by the user, pass each option down to the right check
172 171 function."""
173 172 ui.debug(b"passing options to check functions\n")
174 173 to_modify = collections.defaultdict(dict)
175 174
176 175 if not checks:
177 176 raise error.Error(_(b"`checks` required"))
178 177
179 178 for option in sorted(options):
180 179 split = option.split(b":")
181 180 hint = _(
182 181 b"syntax is 'check:option=value', "
183 182 b"eg. revlogs.changelog:copies=yes"
184 183 )
185 184 option_error = error.InputError(
186 185 _(b"invalid option '%s'") % option, hint=hint
187 186 )
188 187 if len(split) != 2:
189 188 raise option_error
190 189
191 190 check_name, option_value = split
192 191 if not option_value:
193 192 raise option_error
194 193
195 194 split = option_value.split(b"=")
196 195 if len(split) != 2:
197 196 raise option_error
198 197
199 198 option_name, value = split
200 199 if not value:
201 200 raise option_error
202 201
203 202 path = b"%s:%s" % (check_name, option_name)
204 203
205 204 matching_checks = find_checks(
206 205 check_name,
207 206 table=table,
208 207 alias_table=alias_table,
209 208 full_pyramid=full_pyramid,
210 209 )
211 210 for name in matching_checks:
212 211 check = checks.get(name)
213 212 if check is None:
214 213 msg = _(b"specified option '%s' for unselected check '%s'\n")
215 214 raise error.InputError(msg % (name, option_name))
216 215
217 216 assert hasattr(check, "func") # help Pytype
218 217
219 218 if not hasattr(check.func, "options"):
220 219 raise error.InputError(
221 220 _(b"check '%s' has no option '%s'") % (name, option_name)
222 221 )
223 222
224 223 try:
225 224 matching_option = next(
226 225 (o for o in check.func.options if o[0] == option_name)
227 226 )
228 227 except StopIteration:
229 228 raise error.InputError(
230 229 _(b"check '%s' has no option '%s'") % (name, option_name)
231 230 )
232 231
233 232 # transform the argument from cli string to the expected Python type
234 233 _name, typ, _docstring = matching_option
235 234
236 235 as_typed = None
237 236 if isinstance(typ, bool):
238 237 as_bool = stringutil.parsebool(value)
239 238 if as_bool is None:
240 239 raise error.InputError(
241 240 _(b"'%s' is not a boolean ('%s')") % (path, value)
242 241 )
243 242 as_typed = as_bool
244 243 elif isinstance(typ, list):
245 244 as_list = stringutil.parselist(value)
246 245 if as_list is None:
247 246 raise error.InputError(
248 247 _(b"'%s' is not a list ('%s')") % (path, value)
249 248 )
250 249 as_typed = as_list
251 250 else:
252 251 raise error.ProgrammingError(b"unsupported type %s", type(typ))
253 252
254 253 if option_name in to_modify[name]:
255 254 raise error.InputError(
256 255 _(b"duplicated option '%s' for '%s'") % (option_name, name)
257 256 )
258 257 else:
259 258 assert as_typed is not None
260 259 to_modify[name][option_name] = as_typed
261 260
262 261 # Manage case where a check is set but without command line options
263 262 # it will later be set with default check options values
264 263 for name, f in checks.items():
265 264 if name not in to_modify:
266 265 to_modify[name] = {}
267 266
268 267 # Merge default options with command line options
269 268 for check_name, cmd_options in to_modify.items():
270 269 check = checks.get(check_name)
271 270 func = checks[check_name]
272 271 merged_options = {}
273 272 # help Pytype
274 273 assert check is not None
275 274 assert check.func is not None
276 275 assert hasattr(check.func, "options")
277 276
278 277 if check.func.options:
279 278 # copy the default value in case it's mutable (list, etc.)
280 279 merged_options = {
281 280 o[0]: copy.deepcopy(o[1]) for o in check.func.options
282 281 }
283 282 if cmd_options:
284 283 for k, v in cmd_options.items():
285 284 merged_options[k] = v
286 285 options = pycompat.strkwargs(merged_options)
287 286 checks[check_name] = functools.partial(func, **options)
288 287 ui.debug(b"merged options for '%s': '%r'\n" % (check_name, options))
289 288
290 289 return checks
291 290
292 291
293 292 def get_checks(
294 293 repo,
295 294 ui,
296 295 names=None,
297 296 options=None,
298 297 table=None,
299 298 alias_table=None,
300 299 full_pyramid=None,
301 300 ):
302 301 """Given a list of function names and optionally a list of
303 302 options, return matched checks with merged options (command line options
304 303 values take precedence on default ones)
305 304
306 305 It runs find checks, then resolve options and returns a dict of matched
307 306 functions with resolved options.
308 307 """
309 308 funcs = {}
310 309
311 310 if names is None:
312 311 names = []
313 312
314 313 if options is None:
315 314 options = []
316 315
317 316 # find checks
318 317 for name in names:
319 318 matched = find_checks(
320 319 name,
321 320 table=table,
322 321 alias_table=alias_table,
323 322 full_pyramid=full_pyramid,
324 323 )
325 324 matched_names = b", ".join(matched)
326 325 ui.debug(b"found checks '%s' for name '%s'\n" % (matched_names, name))
327 326 funcs.update(matched)
328 327
329 328 funcs = {n: functools.partial(f, ui, repo) for n, f in funcs.items()}
330 329
331 330 # resolve options
332 331 checks = pass_options(
333 332 ui,
334 333 funcs,
335 334 options,
336 335 table=table,
337 336 alias_table=alias_table,
338 337 full_pyramid=full_pyramid,
339 338 )
340 339
341 340 return checks
General Comments 0
You need to be logged in to leave comments. Login now