##// END OF EJS Templates
nodemap: match comment to actual code...
Joerg Sonnenberger -
r46815:8e7ce655 default
parent child Browse files
Show More
@@ -1,642 +1,644
1 1 # nodemap.py - nodemap related code and utilities
2 2 #
3 3 # Copyright 2019 Pierre-Yves David <pierre-yves.david@octobus.net>
4 4 # Copyright 2019 George Racinet <georges.racinet@octobus.net>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import os
13 13 import re
14 14 import struct
15 15
16 16 from ..i18n import _
17 17 from ..node import hex
18 18
19 19 from .. import (
20 20 error,
21 21 util,
22 22 )
23 23
24 24
class NodeMap(dict):
    """a dict raising a RevlogError instead of KeyError for missing keys"""

    def __missing__(self, key):
        raise error.RevlogError(b'unknown node: %s' % key)
29 29
def persisted_data(revlog):
    """read the nodemap for a revlog from disk

    Returns a ``(docket, data)`` pair, or ``None`` when no usable persisted
    nodemap exists (feature disabled, empty or unknown-version docket,
    missing or truncated data file).
    """
    if revlog.nodemap_file is None:
        # persistent nodemap is not enabled for this revlog
        return None
    raw = revlog.opener.tryread(revlog.nodemap_file)
    if not raw:
        return None
    cursor = 0
    (version,) = S_VERSION.unpack(raw[cursor : cursor + S_VERSION.size])
    cursor += S_VERSION.size
    if version != ONDISK_VERSION:
        # unknown on-disk format, ignore the persisted data
        return None
    header = S_HEADER.unpack(raw[cursor : cursor + S_HEADER.size])
    uid_size, tip_rev, data_length, data_unused, tip_node_size = header
    cursor += S_HEADER.size
    docket = NodeMapDocket(raw[cursor : cursor + uid_size])
    cursor += uid_size
    docket.tip_rev = tip_rev
    docket.tip_node = raw[cursor : cursor + tip_node_size]
    docket.data_length = data_length
    docket.data_unused = data_unused

    filename = _rawdata_filepath(revlog, docket)
    use_mmap = revlog.opener.options.get(b"persistent-nodemap.mmap")
    try:
        with revlog.opener(filename) as fd:
            if use_mmap:
                data = util.buffer(util.mmapread(fd, data_length))
            else:
                data = fd.read(data_length)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
        # raw data file is gone, the docket is stale
        return None
    if len(data) < data_length:
        # data file is shorter than the docket announces: corrupted
        return None
    return docket, data
68 68
69 69
def setup_persistent_nodemap(tr, revlog):
    """Install whatever is needed transaction side to persist a nodemap on disk

    (only actually persist the nodemap if this is relevant for this revlog)
    """
    if revlog._inline:
        # inlined revlogs are too small for this to be relevant
        return
    if revlog.nodemap_file is None:
        # persistent nodemap is not enabled on this revlog
        return

    # must run after the changelog finalization (which uses a "cl-" prefixed
    # callback id)
    callback_id = b"nm-revlog-persistent-nodemap-%s" % revlog.nodemap_file
    if tr.hasfinalize(callback_id):
        # already registered on this transaction
        return
    tr.addpending(
        callback_id, lambda tr: _persist_nodemap(tr, revlog, pending=True)
    )
    tr.addfinalize(callback_id, lambda tr: _persist_nodemap(tr, revlog))
88 88
89 89
90 90 class _NoTransaction(object):
91 91 """transaction like object to update the nodemap outside a transaction"""
92 92
93 93 def __init__(self):
94 94 self._postclose = {}
95 95
96 96 def addpostclose(self, callback_id, callback_func):
97 97 self._postclose[callback_id] = callback_func
98 98
99 99 def registertmp(self, *args, **kwargs):
100 100 pass
101 101
102 102 def addbackup(self, *args, **kwargs):
103 103 pass
104 104
105 105 def add(self, *args, **kwargs):
106 106 pass
107 107
108 108 def addabort(self, *args, **kwargs):
109 109 pass
110 110
111 111 def _report(self, *args):
112 112 pass
113 113
114 114
def update_persistent_nodemap(revlog):
    """update the persistent nodemap right now

    To be used for updating the nodemap on disk outside of a normal transaction
    setup (eg, `debugupdatecache`).
    """
    if revlog._inline:
        return  # inlined revlog are too small for this to be relevant
    if revlog.nodemap_file is None:
        return  # we do not use persistent_nodemap on this revlog

    fake_tr = _NoTransaction()
    _persist_nodemap(fake_tr, revlog)
    # run the registered post-close callbacks by hand, in a stable order
    for callback_id in sorted(fake_tr._postclose):
        fake_tr._postclose[callback_id](None)
130 130
131 131
def _persist_nodemap(tr, revlog, pending=False):
    """Write nodemap data on disk for a given revlog

    The nodemap is written as a raw data file (appended to when an
    incremental update is possible, fully rewritten otherwise) plus a small
    "docket" file describing how to find and validate that data.

    tr: a transaction (or transaction-like object) used to register
        backup/abort/cleanup callbacks
    revlog: the revlog whose nodemap must be persisted
    pending: when True, write the docket to the transaction-pending file
        (suffixed with ``.a``) instead of the final one
    """
    if getattr(revlog, 'filteredrevs', ()):
        raise error.ProgrammingError(
            "cannot persist nodemap of a filtered changelog"
        )
    if revlog.nodemap_file is None:
        msg = "calling persist nodemap on a revlog without the feature enabled"
        raise error.ProgrammingError(msg)

    can_incremental = util.safehasattr(revlog.index, "nodemap_data_incremental")
    ondisk_docket = revlog._nodemap_docket
    feed_data = util.safehasattr(revlog.index, "update_nodemap_data")
    use_mmap = revlog.opener.options.get(b"persistent-nodemap.mmap")
    mode = revlog.opener.options.get(b"persistent-nodemap.mode")
    if not can_incremental:
        msg = _(b"persistent nodemap in strict mode without efficient method")
        if mode == b'warn':
            tr._report(b"%s\n" % msg)
        elif mode == b'strict':
            raise error.Abort(msg)

    data = None
    # first attempt an incremental update of the data
    if can_incremental and ondisk_docket is not None:
        target_docket = revlog._nodemap_docket.copy()
        (
            src_docket,
            data_changed_count,
            data,
        ) = revlog.index.nodemap_data_incremental()
        new_length = target_docket.data_length + len(data)
        new_unused = target_docket.data_unused + data_changed_count
        if src_docket != target_docket:
            # on-disk data no longer matches the in-memory one, start over
            data = None
        elif new_length <= (new_unused * 10):
            # at least 10% of the data would be dead, fall back to a full
            # rewrite instead of growing the file forever
            data = None
        else:
            datafile = _rawdata_filepath(revlog, target_docket)
            # EXP-TODO: if this is a cache, this should use a cache vfs, not a
            # store vfs
            tr.add(datafile, target_docket.data_length)
            with revlog.opener(datafile, b'r+') as fd:
                fd.seek(target_docket.data_length)
                fd.write(data)
                if feed_data:
                    # NOTE(review): these two branches look swapped (plain
                    # read() when mmap is requested, mmapread() otherwise);
                    # confirm against the index consumer before changing.
                    if use_mmap:
                        fd.seek(0)
                        new_data = fd.read(new_length)
                    else:
                        fd.flush()
                        new_data = util.buffer(util.mmapread(fd, new_length))
            target_docket.data_length = new_length
            target_docket.data_unused = new_unused

    if data is None:
        # otherwise fallback to a full new export
        target_docket = NodeMapDocket()
        datafile = _rawdata_filepath(revlog, target_docket)
        if util.safehasattr(revlog.index, "nodemap_data_all"):
            data = revlog.index.nodemap_data_all()
        else:
            # slow path: recompute everything with the pure python code
            data = persistent_data(revlog.index)
        # EXP-TODO: if this is a cache, this should use a cache vfs, not a
        # store vfs

        tryunlink = revlog.opener.tryunlink

        def abortck(tr):
            # remove the half-written data file on transaction abort
            tryunlink(datafile)

        callback_id = b"delete-%s" % datafile

        # some flavor of the transaction abort does not cleanup new file, it
        # simply empty them.
        tr.addabort(callback_id, abortck)
        with revlog.opener(datafile, b'w+') as fd:
            fd.write(data)
            if feed_data:
                # NOTE(review): see the matching remark in the incremental
                # branch above about the use_mmap condition.
                if use_mmap:
                    new_data = data
                else:
                    fd.flush()
                    new_data = util.buffer(util.mmapread(fd, len(data)))
        target_docket.data_length = len(data)
    target_docket.tip_rev = revlog.tiprev()
    target_docket.tip_node = revlog.node(target_docket.tip_rev)
    # EXP-TODO: if this is a cache, this should use a cache vfs, not a
    # store vfs
    file_path = revlog.nodemap_file
    if pending:
        file_path += b'.a'
        tr.registertmp(file_path)
    else:
        tr.addbackup(file_path)

    with revlog.opener(file_path, b'w', atomictemp=True) as fp:
        fp.write(target_docket.serialize())
    revlog._nodemap_docket = target_docket
    if feed_data:
        # hand the freshly persisted data back to the index
        revlog.index.update_nodemap_data(target_docket, new_data)

    # search for old index file in all cases, some older process might have
    # left one behind.
    olds = _other_rawdata_filepath(revlog, target_docket)
    if olds:
        realvfs = getattr(revlog, '_realopener', revlog.opener)

        def cleanup(tr):
            for oldfile in olds:
                realvfs.tryunlink(oldfile)

        callback_id = b"revlog-cleanup-nodemap-%s" % revlog.nodemap_file
        tr.addpostclose(callback_id, cleanup)
246 246
247 247
248 248 ### Nodemap docket file
249 249 #
250 250 # The nodemap data are stored on disk using 2 files:
251 251 #
252 252 # * a raw data files containing a persistent nodemap
253 253 # (see `Nodemap Trie` section)
254 254 #
# * a small "docket" file containing metadata
256 256 #
257 257 # While the nodemap data can be multiple tens of megabytes, the "docket" is
# small, it is easy to update it automatically or to duplicate its content
259 259 # during a transaction.
260 260 #
# Multiple raw data can exist at the same time (the currently valid one and a
# new one being used by an in-progress transaction). To accommodate this, the
# filename hosting the raw data has a variable part. The exact filename is
264 264 # specified inside the "docket" file.
265 265 #
266 266 # The docket file contains information to find, qualify and validate the raw
267 267 # data. Its content is currently very light, but it will expand as the on disk
268 268 # nodemap gains the necessary features to be used in production.
269 269
# version number of the on-disk nodemap format; data with any other version
# is ignored (see persisted_data)
ONDISK_VERSION = 1
# docket prefix: a single unsigned byte holding the format version
S_VERSION = struct.Struct(">B")
# docket header fields (in order): uid size, tip rev, data length,
# unused ("dead") data length, tip node size — matches persisted_data
S_HEADER = struct.Struct(">BQQQQ")

# number of random bytes drawn for a data file uid (hex-encoded afterwards)
ID_SIZE = 8
275 275
276 276
def _make_uid():
    """return a new unique identifier.

    The identifier is random and composed of ascii characters."""
    random_bytes = os.urandom(ID_SIZE)
    return hex(random_bytes)
282 282
283 283
class NodeMapDocket(object):
    """metadata associated with persistent nodemap data

    The persistent data may come from disk or be on their way to disk.
    """

    def __init__(self, uid=None):
        # unique identifier of the data file:
        # - preserved when new data are appended,
        # - regenerated when a brand new data file is created.
        self.uid = _make_uid() if uid is None else uid
        # tipmost revision covered by the data file; every revision up to and
        # including it is expected to be encoded there
        self.tip_rev = None
        # node of that tipmost revision; if it does not match the current
        # index, the docket is stale and must be discarded.
        #
        # note: this check is not perfect — a destructive operation could
        # keep the same tip_rev + tip_node while altering lower revisions,
        # but several other caches (eg: branchmap) share that weakness.
        self.tip_node = None
        # size (in bytes) of the persisted data valid for `tip_rev`:
        # - a shorter data file is corrupted,
        # - any extra trailing bytes should be ignored.
        self.data_length = None
        # amount (in bytes) of "dead" data still present in the data file but
        # no longer used by the nodemap
        self.data_unused = 0

    def copy(self):
        clone = NodeMapDocket(uid=self.uid)
        clone.tip_rev = self.tip_rev
        clone.tip_node = self.tip_node
        clone.data_length = self.data_length
        clone.data_unused = self.data_unused
        return clone

    def __cmp__(self, other):
        # python2-only ordering fallback: order by uid, then data_length
        if self.uid < other.uid:
            return -1
        if self.uid > other.uid:
            return 1
        if self.data_length < other.data_length:
            return -1
        if self.data_length > other.data_length:
            return 1
        return 0

    def __eq__(self, other):
        # equality only considers uid and data_length (see _persist_nodemap)
        return self.uid == other.uid and self.data_length == other.data_length

    def serialize(self):
        """return serialized bytes for a docket using the passed uid"""
        header = S_HEADER.pack(
            len(self.uid),
            self.tip_rev,
            self.data_length,
            self.data_unused,
            len(self.tip_node),
        )
        parts = [S_VERSION.pack(ONDISK_VERSION), header, self.uid, self.tip_node]
        return b''.join(parts)
355 355
356 356
357 357 def _rawdata_filepath(revlog, docket):
358 358 """The (vfs relative) nodemap's rawdata file for a given uid"""
359 359 if revlog.nodemap_file.endswith(b'.n.a'):
360 360 prefix = revlog.nodemap_file[:-4]
361 361 else:
362 362 prefix = revlog.nodemap_file[:-2]
363 363 return b"%s-%s.nd" % (prefix, docket.uid)
364 364
365 365
def _other_rawdata_filepath(revlog, docket):
    """list stale rawdata files sitting next to the current one

    Returns every `<prefix>-<uid>.nd` entry of the directory holding the
    nodemap data, excluding the file matching `docket` itself.
    """
    prefix = revlog.nodemap_file[:-2]
    pattern = re.compile(br"(^|/)%s-[0-9a-f]+\.nd$" % prefix)
    new_file_path = _rawdata_filepath(revlog, docket)
    new_file_name = revlog.opener.basename(new_file_path)
    dirpath = revlog.opener.dirname(new_file_path)
    return [
        f
        for f in revlog.opener.listdir(dirpath)
        if pattern.match(f) and f != new_file_name
    ]
377 377
378 378
379 379 ### Nodemap Trie
380 380 #
381 381 # This is a simple reference implementation to compute and persist a nodemap
382 382 # trie. This reference implementation is write only. The python version of this
# is not expected to be actually used, since it won't provide performance
384 384 # improvement over existing non-persistent C implementation.
385 385 #
# The nodemap is persisted as a Trie using 4bits-address/16-entries blocks. Each
# revision can be addressed using its node's shortest prefix.
388 388 #
# The trie is stored as a sequence of blocks. Each block contains 16 entries
# (signed 32-bit integers, big endian -- see the S_BLOCK format below).
391 391 #
392 392 # * value >= 0 -> index of sub-block
393 393 # * value == -1 -> no value
# * value < -1 -> encoded revision: rev = -(value+2)
395 #
396 # See REV_OFFSET and _transform_rev below.
395 397 #
# The implementation focuses on simplicity, not on performance. A Rust
# implementation should provide an efficient version of the same binary
# persistence. This reference python implementation is never meant to be
# extensively used in production.
400 402
401 403
def persistent_data(index):
    """return the persistent binary form for a nodemap for a given index"""
    return _persist_trie(_build_trie(index))
406 408
407 409
def update_persistent_data(index, root, max_idx, last_rev):
    """return the incremental update for persistent nodemap from a given index"""
    changed_block, trie = _update_trie(index, root, last_rev)
    new_data = _persist_trie(trie, existing_idx=max_idx)
    return (changed_block * S_BLOCK.size, new_data)
415 417
416 418
# one trie block: 16 big-endian signed 32-bit entries ('l' is 4 bytes in
# struct standard-size mode, which the '>' prefix selects)
S_BLOCK = struct.Struct(">" + ("l" * 16))

# sentinel stored in a block entry holding neither a revision nor a sub-block
NO_ENTRY = -1
# rev 0 need to be -2 because 0 is used by block, -1 is a special value.
REV_OFFSET = 2
422 424
423 425
def _transform_rev(rev):
    """Return the number used to represent the rev in the tree.

    (or retrieve a rev number from such representation)

    Note that this is an involution, a function equal to its inverse (i.e.
    which gives the identity when applied to itself).
    """
    return -rev - REV_OFFSET
433 435
434 436
435 437 def _to_int(hex_digit):
436 438 """turn an hexadecimal digit into a proper integer"""
437 439 return int(hex_digit, 16)
438 440
439 441
class Block(dict):
    """one block of the Trie

    Holds up to 16 entries, indexed from 0 to 15; iterating yields all 16
    slots in order, with `None` for the empty ones."""

    def __init__(self):
        super(Block, self).__init__()
        # on-disk index of this block, or None if it was never persisted
        self.ondisk_id = None

    def __iter__(self):
        for slot in range(16):
            yield self.get(slot)
452 454
453 455
def _build_trie(index):
    """build a nodemap trie

    The nodemap stores revision number for each unique prefix.

    Each block is a dictionary with keys in `[0, 15]`. Values are either
    another block or a revision number.
    """
    root = Block()
    for rev in range(len(index)):
        hex_node = hex(index[rev][7])
        _insert_into_block(index, 0, root, rev, hex_node)
    return root
467 469
468 470
def _update_trie(index, root, last_rev):
    """insert all revisions newer than `last_rev` into an existing trie

    Returns a `(changed, root)` pair, where `changed` is the total count
    reported by `_insert_into_block` for the inserted revisions.
    """
    changed = 0
    for rev in range(last_rev + 1, len(index)):
        hex_node = hex(index[rev][7])
        changed += _insert_into_block(index, 0, root, rev, hex_node)
    return changed, root
476 478
477 479
def _insert_into_block(index, level, block, current_rev, current_hex):
    """insert a new revision in a block

    index: the index we are adding revision for
    level: the depth of the current block in the trie
    block: the block currently being considered
    current_rev: the revision number we are adding
    current_hex: the hexadecimal representation of that revision

    Returns the number of blocks touched along the insertion path (this
    block plus any existing sub-block recursed into).
    """
    changed = 1
    if block.ondisk_id is not None:
        # this block now differs from its persisted version
        block.ondisk_id = None
    hex_digit = _to_int(current_hex[level : level + 1])
    entry = block.get(hex_digit)
    if entry is None:
        # no entry, simply store the revision number
        block[hex_digit] = current_rev
    elif isinstance(entry, dict):
        # need to recurse to an underlying block
        changed += _insert_into_block(
            index, level + 1, entry, current_rev, current_hex
        )
    else:
        # collision with a previously unique prefix, inserting new
        # vertices to fit both entry.
        other_hex = hex(index[entry][7])
        other_rev = entry
        new = Block()
        block[hex_digit] = new
        # note: the return values of these two calls are deliberately not
        # added to `changed`: the freshly created sub-block was never
        # persisted, so it contributes no previously-written data.
        _insert_into_block(index, level + 1, new, other_rev, other_hex)
        _insert_into_block(index, level + 1, new, current_rev, current_hex)
    return changed
510 512
511 513
def _persist_trie(root, existing_idx=None):
    """turn a nodemap trie into persistent binary data

    See `_build_trie` for nodemap trie structure"""
    # map a block's id() to the on-disk index it will occupy
    block_map = {}
    base_idx = 0 if existing_idx is None else existing_idx + 1
    chunks = []
    for node in _walk_trie(root):
        if node.ondisk_id is not None:
            # already persisted: keep its existing on-disk slot
            block_map[id(node)] = node.ondisk_id
        else:
            block_map[id(node)] = len(chunks) + base_idx
            chunks.append(_persist_block(node, block_map))
    return b''.join(chunks)
529 531
530 532
531 533 def _walk_trie(block):
532 534 """yield all the block in a trie
533 535
534 536 Children blocks are always yield before their parent block.
535 537 """
536 538 for (__, item) in sorted(block.items()):
537 539 if isinstance(item, dict):
538 540 for sub_block in _walk_trie(item):
539 541 yield sub_block
540 542 yield block
541 543
542 544
def _persist_block(block_node, block_map):
    """produce persistent binary data for a single block

    Children block are assumed to be already persisted and present in
    block_map.
    """
    entries = [_to_value(v, block_map) for v in block_node]
    return S_BLOCK.pack(*entries)
551 553
552 554
def _to_value(item, block_map):
    """persist any value as an integer"""
    if item is None:
        # empty slot
        return NO_ENTRY
    if isinstance(item, dict):
        # sub-block: store its on-disk index
        return block_map[id(item)]
    # plain revision number, stored encoded
    return _transform_rev(item)
561 563
562 564
def parse_data(data):
    """parse nodemap data into a nodemap Trie

    Returns a `(root, last_block_idx)` pair: `root` is the root block of the
    parsed trie and `last_block_idx` the on-disk index of the last block
    (`None` when `data` is empty).

    Raises `error.Abort` when the data size is not a whole number of blocks.
    """
    if (len(data) % S_BLOCK.size) != 0:
        # message as bytes, consistent with the other user-facing messages
        # in this module (error.Abort messages are bytes)
        msg = b"nodemap data size is not a multiple of block size (%d): %d"
        raise error.Abort(msg % (S_BLOCK.size, len(data)))
    if not data:
        return Block(), None
    block_map = {}
    new_blocks = []
    # first pass: materialize every block and record its on-disk index
    for i in range(0, len(data), S_BLOCK.size):
        block = Block()
        block.ondisk_id = len(block_map)
        block_map[block.ondisk_id] = block
        block_data = data[i : i + S_BLOCK.size]
        values = S_BLOCK.unpack(block_data)
        new_blocks.append((block, values))
    # second pass: resolve each entry into a sub-block or a revision number
    for b, values in new_blocks:
        for idx, v in enumerate(values):
            if v == NO_ENTRY:
                continue
            elif v >= 0:
                b[idx] = block_map[v]
            else:
                b[idx] = _transform_rev(v)
    # blocks are persisted children-first (see _walk_trie), so the last
    # block parsed is the trie root
    return block, i // S_BLOCK.size
588 590
589 591
590 592 # debug utility
591 593
592 594
def check_data(ui, index, data):
    """verify that the provided nodemap data are valid for the given index"""
    ret = 0
    ui.status((b"revision in index: %d\n") % len(index))
    root, __ = parse_data(data)
    all_revs = set(_all_revisions(root))
    ui.status((b"revision in nodemap: %d\n") % len(all_revs))
    for rev in range(len(index)):
        if rev not in all_revs:
            msg = b" revision missing from nodemap: %d\n" % rev
            ui.write_err(msg)
            ret = 1
            continue
        all_revs.remove(rev)
        # the revision is present; check that its node resolves back to it
        nm_rev = _find_node(root, hex(index[rev][7]))
        if nm_rev is None:
            msg = b" revision node does not match any entries: %d\n" % rev
            ui.write_err(msg)
            ret = 1
        elif nm_rev != rev:
            msg = (
                b" revision node does not match the expected revision: "
                b"%d != %d\n" % (rev, nm_rev)
            )
            ui.write_err(msg)
            ret = 1

    # anything left in all_revs is in the nodemap but not in the index
    if all_revs:
        for rev in sorted(all_revs):
            msg = b" extra revision in nodemap: %d\n" % rev
            ui.write_err(msg)
            ret = 1
    return ret
626 628
627 629
def _all_revisions(root):
    """yield every revision number stored in a Trie"""
    for block in _walk_trie(root):
        for entry in block:
            # skip empty slots and sub-block references
            if entry is not None and not isinstance(entry, Block):
                yield entry
635 637
636 638
def _find_node(block, node):
    """find the revision associated with a given node"""
    slot = _to_int(node[0:1])
    entry = block.get(slot)
    if isinstance(entry, dict):
        # descend one level, consuming one hex digit of the node
        return _find_node(entry, node[1:])
    return entry
General Comments 0
You need to be logged in to leave comments. Login now