##// END OF EJS Templates
nodemap: never read more than the expected data amount...
marmoute -
r44811:c7eebdb1 default
parent child Browse files
Show More
@@ -1,508 +1,513
1 1 # nodemap.py - nodemap related code and utilities
2 2 #
3 3 # Copyright 2019 Pierre-Yves David <pierre-yves.david@octobus.net>
4 4 # Copyright 2019 George Racinet <georges.racinet@octobus.net>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import os
12 12 import re
13 13 import struct
14 14
15 15 from .. import (
16 16 error,
17 17 node as nodemod,
18 18 util,
19 19 )
20 20
21 21
class NodeMap(dict):
    """a node-id to revision mapping that raises ``RevlogError`` for
    unknown nodes instead of ``KeyError``"""

    def __missing__(self, key):
        raise error.RevlogError(b'unknown node: %s' % key)
25 25
26 26
def persisted_data(revlog):
    """read the nodemap for a revlog from disk

    Returns a ``(docket, data)`` pair, or ``None`` when no usable persistent
    nodemap exists (no nodemap file, empty/unknown-version docket, or raw
    data shorter than the docket says it should be).
    """
    if revlog.nodemap_file is None:
        return None
    pdata = revlog.opener.tryread(revlog.nodemap_file)
    if not pdata:
        return None
    offset = 0
    (version,) = S_VERSION.unpack(pdata[offset : offset + S_VERSION.size])
    if version != ONDISK_VERSION:
        return None
    offset += S_VERSION.size
    headers = S_HEADER.unpack(pdata[offset : offset + S_HEADER.size])
    uid_size, tip_rev, data_length, data_unused = headers
    offset += S_HEADER.size
    docket = NodeMapDocket(pdata[offset : offset + uid_size])
    docket.tip_rev = tip_rev
    docket.data_length = data_length
    docket.data_unused = data_unused

    filename = _rawdata_filepath(revlog, docket)
    data = revlog.opener.tryread(filename)
    if len(data) < data_length:
        # raw data is incomplete (e.g. a writer crashed mid-write): unusable
        return None
    elif len(data) > data_length:
        # never read more than the expected amount of data; trailing bytes
        # may belong to a transaction still in progress
        data = data[:data_length]
    return docket, data
49 54
50 55
def setup_persistent_nodemap(tr, revlog):
    """Install whatever is needed transaction side to persist a nodemap on disk

    (only actually persist the nodemap if this is relevant for this revlog)
    """
    # inlined revlogs are too small for a persistent nodemap to be relevant,
    # and revlogs without a nodemap file do not use the feature at all
    if revlog._inline or revlog.nodemap_file is None:
        return
    callback_id = b"revlog-persistent-nodemap-%s" % revlog.nodemap_file
    # register the finalize callback only once per nodemap file
    if not tr.hasfinalize(callback_id):
        tr.addfinalize(callback_id, lambda tr: _persist_nodemap(tr, revlog))
64 69
65 70
def _persist_nodemap(tr, revlog):
    """Write nodemap data on disk for a given revlog

    Transaction-finalize callback: tries an incremental append to the
    existing raw-data file first, falls back to a full rewrite, then writes
    a fresh docket and schedules cleanup of stale raw-data files.
    """
    if getattr(revlog, 'filteredrevs', ()):
        raise error.ProgrammingError(
            "cannot persist nodemap of a filtered changelog"
        )
    if revlog.nodemap_file is None:
        msg = "calling persist nodemap on a revlog without the feature enabled"
        raise error.ProgrammingError(msg)

    can_incremental = util.safehasattr(revlog.index, "nodemap_data_incremental")
    ondisk_docket = revlog._nodemap_docket

    data = None
    # first attempt an incremental update of the existing data
    if can_incremental and ondisk_docket is not None:
        target_docket = revlog._nodemap_docket.copy()
        (
            src_docket,
            data_changed_count,
            data,
        ) = revlog.index.nodemap_data_incremental()
        if src_docket != target_docket:
            # on-disk data is not what the index was built from: cannot
            # safely append, fall back to a full export below
            data = None
        else:
            datafile = _rawdata_filepath(revlog, target_docket)
            # EXP-TODO: if this is a cache, this should use a cache vfs, not a
            # store vfs
            with revlog.opener(datafile, b'r+') as fd:
                fd.seek(target_docket.data_length)
                fd.write(data)
            target_docket.data_length += len(data)
            target_docket.data_unused += data_changed_count

    if data is None:
        # otherwise fallback to a full new export
        target_docket = NodeMapDocket()
        datafile = _rawdata_filepath(revlog, target_docket)
        if util.safehasattr(revlog.index, "nodemap_data_all"):
            data = revlog.index.nodemap_data_all()
        else:
            data = persistent_data(revlog.index)
        # EXP-TODO: if this is a cache, this should use a cache vfs, not a
        # store vfs
        with revlog.opener(datafile, b'w') as fd:
            fd.write(data)
        target_docket.data_length = len(data)
    target_docket.tip_rev = revlog.tiprev()
    # EXP-TODO: if this is a cache, this should use a cache vfs, not a
    # store vfs
    with revlog.opener(revlog.nodemap_file, b'w', atomictemp=True) as fp:
        fp.write(target_docket.serialize())
    revlog._nodemap_docket = target_docket
    # EXP-TODO: if the transaction aborts, we should remove the new data and
    # reinstall the old one.

    # search for old raw-data files in all cases, some older process might
    # have left one behind.
    olds = _other_rawdata_filepath(revlog, target_docket)
    if olds:
        realvfs = getattr(revlog, '_realopener', revlog.opener)

        def cleanup(tr):
            # best-effort removal of stale raw-data files after close
            for oldfile in olds:
                realvfs.tryunlink(oldfile)

        callback_id = b"revlog-cleanup-nodemap-%s" % revlog.nodemap_file
        tr.addpostclose(callback_id, cleanup)
135 140
136 141
### Nodemap docket file
#
# The nodemap data are stored on disk using 2 files:
#
# * a raw data file containing a persistent nodemap
#   (see `Nodemap Trie` section)
#
# * a small "docket" file containing metadata
#
# While the nodemap data can be multiple tens of megabytes, the "docket" is
# small, so it is easy to update it automatically or to duplicate its content
# during a transaction.
#
# Multiple raw data files can exist at the same time (the currently valid one
# and a new one being built by an in-progress transaction). To accommodate
# this, the filename hosting the raw data has a variable part. The exact
# filename is specified inside the "docket" file.
#
# The docket file contains information to find, qualify and validate the raw
# data. Its content is currently very light, but it will expand as the on disk
# nodemap gains the necessary features to be used in production.
158 163
# version 0 is experimental, no BC guarantee, do not use outside of tests.
ONDISK_VERSION = 0
# docket prefix: a single version byte
S_VERSION = struct.Struct(">B")
# docket header: uid length, tip revision, data length, unused data length
S_HEADER = struct.Struct(">BQQQ")

# number of random bytes used to build a docket uid
ID_SIZE = 8
165 170
166 171
def _make_uid():
    """return a new unique identifier.

    The identifier is random and composed of ascii characters."""
    random_bytes = os.urandom(ID_SIZE)
    return nodemod.hex(random_bytes)
172 177
173 178
class NodeMapDocket(object):
    """metadata associated with persistent nodemap data

    The persistent data may come from disk or be on their way to disk.
    """

    def __init__(self, uid=None):
        # generate a fresh random identifier when none is provided
        self.uid = _make_uid() if uid is None else uid
        self.tip_rev = None
        self.data_length = None
        self.data_unused = 0

    def copy(self):
        """return an independent docket with the same content"""
        other = NodeMapDocket(uid=self.uid)
        other.tip_rev = self.tip_rev
        other.data_length = self.data_length
        other.data_unused = self.data_unused
        return other

    def __cmp__(self, other):
        # ordering by (uid, data_length); only meaningful on Python 2
        if self.uid != other.uid:
            return -1 if self.uid < other.uid else 1
        if self.data_length != other.data_length:
            return -1 if self.data_length < other.data_length else 1
        return 0

    def __eq__(self, other):
        return (self.uid, self.data_length) == (other.uid, other.data_length)

    def serialize(self):
        """return serialized bytes for a docket using the passed uid"""
        header = S_HEADER.pack(
            len(self.uid),
            self.tip_rev,
            self.data_length,
            self.data_unused,
        )
        return b''.join([S_VERSION.pack(ONDISK_VERSION), header, self.uid])
222 227
223 228
224 229 def _rawdata_filepath(revlog, docket):
225 230 """The (vfs relative) nodemap's rawdata file for a given uid"""
226 231 prefix = revlog.nodemap_file[:-2]
227 232 return b"%s-%s.nd" % (prefix, docket.uid)
228 233
229 234
230 235 def _other_rawdata_filepath(revlog, docket):
231 236 prefix = revlog.nodemap_file[:-2]
232 237 pattern = re.compile(b"(^|/)%s-[0-9a-f]+\.nd$" % prefix)
233 238 new_file_path = _rawdata_filepath(revlog, docket)
234 239 new_file_name = revlog.opener.basename(new_file_path)
235 240 dirpath = revlog.opener.dirname(new_file_path)
236 241 others = []
237 242 for f in revlog.opener.listdir(dirpath):
238 243 if pattern.match(f) and f != new_file_name:
239 244 others.append(f)
240 245 return others
241 246
242 247
### Nodemap Trie
#
# This is a simple reference implementation to compute and persist a nodemap
# trie. This reference implementation is write only. The python version of
# this is not expected to be actually used, since it won't provide a
# performance improvement over the existing non-persistent C implementation.
#
# The nodemap is persisted as a Trie using 4bits-address/16-entries blocks.
# Each revision can be addressed using its node's shortest unique prefix.
#
# The trie is stored as a sequence of blocks. Each block contains 16 entries
# (signed 32bit integer, big endian — see S_BLOCK). Each entry can be one of
# the following:
#
# * value >= 0 -> index of sub-block
# * value == -1 -> no value
# * value < -1 -> encoded revision value: rev = -(value+2) (see REV_OFFSET)
#
# The implementation focuses on simplicity, not on performance. A Rust
# implementation should provide an efficient version of the same binary
# persistence. This reference python implementation is never meant to be
# used extensively in production.
264 269
265 270
def persistent_data(index):
    """return the persistent binary form for a nodemap for a given index
    """
    return _persist_trie(_build_trie(index))
271 276
272 277
def update_persistent_data(index, root, max_idx, last_rev):
    """return the incremental update for persistent nodemap from a given index
    """
    changed_block, trie = _update_trie(index, root, last_rev)
    data = _persist_trie(trie, existing_idx=max_idx)
    # size in bytes of the rewritten (now unused) blocks, plus the new data
    return changed_block * S_BLOCK.size, data
281 286
282 287
283 288 S_BLOCK = struct.Struct(">" + ("l" * 16))
284 289
285 290 NO_ENTRY = -1
286 291 # rev 0 need to be -2 because 0 is used by block, -1 is a special value.
287 292 REV_OFFSET = 2
288 293
289 294
290 295 def _transform_rev(rev):
291 296 """Return the number used to represent the rev in the tree.
292 297
293 298 (or retrieve a rev number from such representation)
294 299
295 300 Note that this is an involution, a function equal to its inverse (i.e.
296 301 which gives the identity when applied to itself).
297 302 """
298 303 return -(rev + REV_OFFSET)
299 304
300 305
301 306 def _to_int(hex_digit):
302 307 """turn an hexadecimal digit into a proper integer"""
303 308 return int(hex_digit, 16)
304 309
305 310
class Block(dict):
    """represent a block of the Trie

    contains up to 16 entry indexed from 0 to 15"""

    def __init__(self):
        super(Block, self).__init__()
        # if this block already exists on disk, its block index there
        self.ondisk_id = None

    def __iter__(self):
        # walk the 16 slots in order, yielding None for empty ones
        return (self.get(slot) for slot in range(16))
318 323
319 324
def _build_trie(index):
    """build a nodemap trie

    The nodemap stores revision number for each unique prefix.

    Each block is a dictionary with keys in `[0, 15]`. Values are either
    another block or a revision number.
    """
    root = Block()
    # index entry slot 7 holds the node; insert every revision's full hex
    for rev in range(len(index)):
        node_hex = nodemod.hex(index[rev][7])
        _insert_into_block(index, 0, root, rev, node_hex)
    return root
333 338
334 339
def _update_trie(index, root, last_rev):
    """insert revisions newer than `last_rev` into an existing trie

    Returns ``(changed, root)`` where `changed` counts the blocks touched.
    """
    changed = 0
    for rev in range(last_rev + 1, len(index)):
        node_hex = nodemod.hex(index[rev][7])
        changed += _insert_into_block(index, 0, root, rev, node_hex)
    return changed, root
342 347
343 348
def _insert_into_block(index, level, block, current_rev, current_hex):
    """insert a new revision in a block

    index: the index we are adding revision for
    level: the depth of the current block in the trie
    block: the block currently being considered
    current_rev: the revision number we are adding
    current_hex: the hexadecimal representation of that revision

    Returns the number of blocks changed along the way.
    """
    changed = 1
    # this block will be (re)written: forget its on-disk incarnation
    if block.ondisk_id is not None:
        block.ondisk_id = None
    hex_digit = _to_int(current_hex[level : level + 1])
    entry = block.get(hex_digit)
    if entry is None:
        # free slot: store the revision number directly
        block[hex_digit] = current_rev
    elif isinstance(entry, dict):
        # descend into the existing sub-block
        changed += _insert_into_block(
            index, level + 1, entry, current_rev, current_hex
        )
    else:
        # collision with a previously unique prefix: create a new vertex
        # deep enough to host both entries
        other_hex = nodemod.hex(index[entry][7])
        new_block = Block()
        block[hex_digit] = new_block
        _insert_into_block(index, level + 1, new_block, entry, other_hex)
        _insert_into_block(
            index, level + 1, new_block, current_rev, current_hex
        )
    return changed
376 381
377 382
def _persist_trie(root, existing_idx=None):
    """turn a nodemap trie into persistent binary data

    See `_build_trie` for nodemap trie structure"""
    block_map = {}
    # new blocks are numbered after any blocks already on disk
    base_idx = 0 if existing_idx is None else existing_idx + 1
    chunks = []
    for tn in _walk_trie(root):
        if tn.ondisk_id is not None:
            # untouched block: keep pointing at its on-disk position
            block_map[id(tn)] = tn.ondisk_id
        else:
            block_map[id(tn)] = len(chunks) + base_idx
            chunks.append(_persist_block(tn, block_map))
    return b''.join(chunks)
395 400
396 401
397 402 def _walk_trie(block):
398 403 """yield all the block in a trie
399 404
400 405 Children blocks are always yield before their parent block.
401 406 """
402 407 for (_, item) in sorted(block.items()):
403 408 if isinstance(item, dict):
404 409 for sub_block in _walk_trie(item):
405 410 yield sub_block
406 411 yield block
407 412
408 413
def _persist_block(block_node, block_map):
    """produce persistent binary data for a single block

    Children block are assumed to be already persisted and present in
    block_map.
    """
    values = [_to_value(slot, block_map) for slot in block_node]
    return S_BLOCK.pack(*values)
417 422
418 423
def _to_value(item, block_map):
    """persist any value as an integer"""
    if item is None:
        return NO_ENTRY
    if isinstance(item, dict):
        # sub-block: store its (non-negative) block index
        return block_map[id(item)]
    # revision number: store its negative encoded form
    return _transform_rev(item)
427 432
428 433
def parse_data(data):
    """parse nodemap data into a nodemap Trie

    Returns a ``(root, last_block_index)`` pair.
    """
    if (len(data) % S_BLOCK.size) != 0:
        msg = "nodemap data size is not a multiple of block size (%d): %d"
        raise error.Abort(msg % (S_BLOCK.size, len(data)))
    if not data:
        return Block(), None
    # first pass: materialize one empty Block per on-disk chunk
    block_map = {}
    raw_blocks = []
    for offset in range(0, len(data), S_BLOCK.size):
        block = Block()
        block.ondisk_id = len(block_map)
        block_map[block.ondisk_id] = block
        values = S_BLOCK.unpack(data[offset : offset + S_BLOCK.size])
        raw_blocks.append((block, values))
    # second pass: resolve slot values now that every block exists
    for block, values in raw_blocks:
        for idx, value in enumerate(values):
            if value == NO_ENTRY:
                continue
            if value >= 0:
                # non-negative: index of a child block
                block[idx] = block_map[value]
            else:
                # negative (< -1): an encoded revision number
                block[idx] = _transform_rev(value)
    # children are serialized before their parent, so the root block is the
    # last one in the data
    last_idx = (len(data) // S_BLOCK.size) - 1
    return block_map[last_idx], last_idx
454 459
455 460
456 461 # debug utility
457 462
458 463
def check_data(ui, index, data):
    """verify that the provided nodemap data are valid for the given idex"""
    ret = 0
    ui.status((b"revision in index: %d\n") % len(index))
    root, __ = parse_data(data)
    all_revs = set(_all_revisions(root))
    ui.status((b"revision in nodemap: %d\n") % len(all_revs))
    for r in range(len(index)):
        if r not in all_revs:
            msg = b" revision missing from nodemap: %d\n" % r
            ui.write_err(msg)
            ret = 1
            continue
        all_revs.remove(r)
        # the stored node must map back to this exact revision
        nm_rev = _find_node(root, nodemod.hex(index[r][7]))
        if nm_rev is None:
            msg = b" revision node does not match any entries: %d\n" % r
            ui.write_err(msg)
            ret = 1
        elif nm_rev != r:
            msg = (
                b" revision node does not match the expected revision: "
                b"%d != %d\n" % (r, nm_rev)
            )
            ui.write_err(msg)
            ret = 1

    # anything left in the set is in the nodemap but not in the index
    for r in sorted(all_revs):
        msg = b" extra revision in nodemap: %d\n" % r
        ui.write_err(msg)
        ret = 1
    return ret
492 497
493 498
def _all_revisions(root):
    """return all revisions stored in a Trie"""
    for block in _walk_trie(root):
        for entry in block:
            # skip empty slots and child blocks; yield only revisions
            if entry is not None and not isinstance(entry, Block):
                yield entry
501 506
502 507
def _find_node(block, node):
    """find the revision associated with a given node"""
    # iterative descent: consume one hex digit of the node per trie level
    current = block
    remaining = node
    while True:
        entry = current.get(_to_int(remaining[0:1]))
        if not isinstance(entry, dict):
            # a revision number, or None when the node is unknown
            return entry
        current = entry
        remaining = remaining[1:]
General Comments 0
You need to be logged in to leave comments. Login now