nodemap: move on disk file to version 1...
marmoute
r45294:261e7175 default
@@ -1,645 +1,644
1 1 # nodemap.py - nodemap related code and utilities
2 2 #
3 3 # Copyright 2019 Pierre-Yves David <pierre-yves.david@octobus.net>
4 4 # Copyright 2019 George Racinet <georges.racinet@octobus.net>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import os
13 13 import re
14 14 import struct
15 15
16 16 from ..i18n import _
17 17
18 18 from .. import (
19 19 error,
20 20 node as nodemod,
21 21 util,
22 22 )
23 23
24 24
25 25 class NodeMap(dict):
26 26 def __missing__(self, x):
27 27 raise error.RevlogError(b'unknown node: %s' % x)
28 28
29 29
30 30 def persisted_data(revlog):
31 31 """read the nodemap for a revlog from disk"""
32 32 if revlog.nodemap_file is None:
33 33 return None
34 34 pdata = revlog.opener.tryread(revlog.nodemap_file)
35 35 if not pdata:
36 36 return None
37 37 offset = 0
38 38 (version,) = S_VERSION.unpack(pdata[offset : offset + S_VERSION.size])
39 39 if version != ONDISK_VERSION:
40 40 return None
41 41 offset += S_VERSION.size
42 42 headers = S_HEADER.unpack(pdata[offset : offset + S_HEADER.size])
43 43 uid_size, tip_rev, data_length, data_unused, tip_node_size = headers
44 44 offset += S_HEADER.size
45 45 docket = NodeMapDocket(pdata[offset : offset + uid_size])
46 46 offset += uid_size
47 47 docket.tip_rev = tip_rev
48 48 docket.tip_node = pdata[offset : offset + tip_node_size]
49 49 docket.data_length = data_length
50 50 docket.data_unused = data_unused
51 51
52 52 filename = _rawdata_filepath(revlog, docket)
53 53 use_mmap = revlog.opener.options.get(b"exp-persistent-nodemap.mmap")
54 54 try:
55 55 with revlog.opener(filename) as fd:
56 56 if use_mmap:
57 57 data = util.buffer(util.mmapread(fd, data_length))
58 58 else:
59 59 data = fd.read(data_length)
60 60 except OSError as e:
61 61 if e.errno != errno.ENOENT:
62 62 raise
return None  # no data file on disk; without this, `data` below would be unbound
63 63 if len(data) < data_length:
64 64 return None
65 65 return docket, data
66 66
67 67
68 68 def setup_persistent_nodemap(tr, revlog):
69 69 """Install whatever is needed transaction side to persist a nodemap on disk
70 70
71 71 (only actually persist the nodemap if this is relevant for this revlog)
72 72 """
73 73 if revlog._inline:
74 74 return # inlined revlogs are too small for this to be relevant
75 75 if revlog.nodemap_file is None:
76 76 return # we do not use persistent_nodemap on this revlog
77 77
78 78 # this must run after the changelog finalization (callbacks run in sorted id order; "nm-" sorts after the changelog's "cl-" ids)
79 79 callback_id = b"nm-revlog-persistent-nodemap-%s" % revlog.nodemap_file
80 80 if tr.hasfinalize(callback_id):
81 81 return # no need to register again
82 82 tr.addpending(
83 83 callback_id, lambda tr: _persist_nodemap(tr, revlog, pending=True)
84 84 )
85 85 tr.addfinalize(callback_id, lambda tr: _persist_nodemap(tr, revlog))
86 86
87 87
88 88 class _NoTransaction(object):
89 89 """transaction like object to update the nodemap outside a transaction
90 90 """
91 91
92 92 def __init__(self):
93 93 self._postclose = {}
94 94
95 95 def addpostclose(self, callback_id, callback_func):
96 96 self._postclose[callback_id] = callback_func
97 97
98 98 def registertmp(self, *args, **kwargs):
99 99 pass
100 100
101 101 def addbackup(self, *args, **kwargs):
102 102 pass
103 103
104 104 def add(self, *args, **kwargs):
105 105 pass
106 106
107 107 def addabort(self, *args, **kwargs):
108 108 pass
109 109
110 110 def _report(self, *args):
111 111 pass
112 112
113 113
114 114 def update_persistent_nodemap(revlog):
115 115 """update the persistent nodemap right now
116 116
117 117 To be used for updating the nodemap on disk outside of a normal transaction
118 118 setup (eg, `debugupdatecache`).
119 119 """
120 120 if revlog._inline:
121 121 return # inlined revlogs are too small for this to be relevant
122 122 if revlog.nodemap_file is None:
123 123 return # we do not use persistent_nodemap on this revlog
124 124
125 125 notr = _NoTransaction()
126 126 _persist_nodemap(notr, revlog)
127 127 for k in sorted(notr._postclose):
128 128 notr._postclose[k](None)
129 129
130 130
131 131 def _persist_nodemap(tr, revlog, pending=False):
132 132 """Write nodemap data on disk for a given revlog
133 133 """
134 134 if getattr(revlog, 'filteredrevs', ()):
135 135 raise error.ProgrammingError(
136 136 "cannot persist nodemap of a filtered changelog"
137 137 )
138 138 if revlog.nodemap_file is None:
139 139 msg = "calling persist nodemap on a revlog without the feature enabled"
140 140 raise error.ProgrammingError(msg)
141 141
142 142 can_incremental = util.safehasattr(revlog.index, "nodemap_data_incremental")
143 143 ondisk_docket = revlog._nodemap_docket
144 144 feed_data = util.safehasattr(revlog.index, "update_nodemap_data")
145 145 use_mmap = revlog.opener.options.get(b"exp-persistent-nodemap.mmap")
146 146 mode = revlog.opener.options.get(b"exp-persistent-nodemap.mode")
147 147 if not can_incremental:
148 148 msg = _(b"persistent nodemap in strict mode without efficient method")
149 149 if mode == b'warn':
150 150 tr._report(b"%s\n" % msg)
151 151 elif mode == b'strict':
152 152 raise error.Abort(msg)
153 153
154 154 data = None
155 155 # first, attempt an incremental update of the data
156 156 if can_incremental and ondisk_docket is not None:
157 157 target_docket = revlog._nodemap_docket.copy()
158 158 (
159 159 src_docket,
160 160 data_changed_count,
161 161 data,
162 162 ) = revlog.index.nodemap_data_incremental()
163 163 new_length = target_docket.data_length + len(data)
164 164 new_unused = target_docket.data_unused + data_changed_count
165 165 if src_docket != target_docket:
166 166 data = None
167 167 elif new_length <= (new_unused * 10): # regenerate when at least 10% of the data would be unused
168 168 data = None
169 169 else:
170 170 datafile = _rawdata_filepath(revlog, target_docket)
171 171 # EXP-TODO: if this is a cache, this should use a cache vfs, not a
172 172 # store vfs
173 173 tr.add(datafile, target_docket.data_length)
174 174 with revlog.opener(datafile, b'r+') as fd:
175 175 fd.seek(target_docket.data_length)
176 176 fd.write(data)
177 177 if feed_data:
178 178 if use_mmap:
179 179 fd.flush()
180 180 new_data = util.buffer(util.mmapread(fd, new_length))
181 181 else:
182 182 fd.seek(0)
183 183 new_data = fd.read(new_length)
184 184 target_docket.data_length = new_length
185 185 target_docket.data_unused = new_unused
186 186
187 187 if data is None:
188 188 # otherwise fall back to a full new export
189 189 target_docket = NodeMapDocket()
190 190 datafile = _rawdata_filepath(revlog, target_docket)
191 191 if util.safehasattr(revlog.index, "nodemap_data_all"):
192 192 data = revlog.index.nodemap_data_all()
193 193 else:
194 194 data = persistent_data(revlog.index)
195 195 # EXP-TODO: if this is a cache, this should use a cache vfs, not a
196 196 # store vfs
197 197
198 198 tryunlink = revlog.opener.tryunlink
199 199
200 200 def abortck(tr):
201 201 tryunlink(datafile)
202 202
203 203 callback_id = b"delete-%s" % datafile
204 204
205 205 # some flavors of the transaction abort do not clean up new files; they
206 206 # simply empty them.
207 207 tr.addabort(callback_id, abortck)
208 208 with revlog.opener(datafile, b'w+') as fd:
209 209 fd.write(data)
210 210 if feed_data:
211 211 if use_mmap:
212 212 fd.flush()
213 213 new_data = util.buffer(util.mmapread(fd, len(data)))
214 214 else:
215 215 new_data = data
216 216 target_docket.data_length = len(data)
217 217 target_docket.tip_rev = revlog.tiprev()
218 218 target_docket.tip_node = revlog.node(target_docket.tip_rev)
219 219 # EXP-TODO: if this is a cache, this should use a cache vfs, not a
220 220 # store vfs
221 221 file_path = revlog.nodemap_file
222 222 if pending:
223 223 file_path += b'.a'
224 224 tr.registertmp(file_path)
225 225 else:
226 226 tr.addbackup(file_path)
227 227
228 228 with revlog.opener(file_path, b'w', atomictemp=True) as fp:
229 229 fp.write(target_docket.serialize())
230 230 revlog._nodemap_docket = target_docket
231 231 if feed_data:
232 232 revlog.index.update_nodemap_data(target_docket, new_data)
233 233
234 234 # search for old data files in all cases; some older process might have
235 235 # left one behind.
236 236 olds = _other_rawdata_filepath(revlog, target_docket)
237 237 if olds:
238 238 realvfs = getattr(revlog, '_realopener', revlog.opener)
239 239
240 240 def cleanup(tr):
241 241 for oldfile in olds:
242 242 realvfs.tryunlink(oldfile)
243 243
244 244 callback_id = b"revlog-cleanup-nodemap-%s" % revlog.nodemap_file
245 245 tr.addpostclose(callback_id, cleanup)
246 246
247 247
248 248 ### Nodemap docket file
249 249 #
250 250 # The nodemap data are stored on disk using 2 files:
251 251 #
252 252 # * a raw data file containing a persistent nodemap
253 253 # (see `Nodemap Trie` section)
254 254 #
255 255 # * a small "docket" file containing metadata
256 256 #
257 257 # While the nodemap data can be multiple tens of megabytes, the "docket" is
258 258 # small, so it is easy to update it atomically or to duplicate its content
259 259 # during a transaction.
260 260 #
261 261 # Multiple raw data files can exist at the same time (the currently valid one
262 262 # and a new one being written by an in-progress transaction). To accommodate
263 263 # this, the filename hosting the raw data has a variable part. The exact
264 264 # filename is specified inside the "docket" file.
265 265 #
266 266 # The docket file contains information to find, qualify and validate the raw
267 267 # data. Its content is currently very light, but it will expand as the on disk
268 268 # nodemap gains the necessary features to be used in production.
269 269
270 # version 0 was experimental, with no BC guarantee; do not use outside of tests.
271 ONDISK_VERSION = 0
270 ONDISK_VERSION = 1
272 271 S_VERSION = struct.Struct(">B")
273 272 S_HEADER = struct.Struct(">BQQQQ")
274 273
275 274 ID_SIZE = 8
276 275
277 276
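# An illustrative sketch, not part of the module: `_example_decode_docket` is
# a hypothetical helper that decodes a raw docket by hand, mirroring the
# layout described above and the logic of `persisted_data`. `raw` is assumed
# to be the full content of a ".n" docket file.
def _example_decode_docket(raw):
    if len(raw) < S_VERSION.size + S_HEADER.size:
        return None  # too short to hold even the fixed-size part
    (version,) = S_VERSION.unpack(raw[: S_VERSION.size])
    offset = S_VERSION.size
    headers = S_HEADER.unpack(raw[offset : offset + S_HEADER.size])
    uid_size, tip_rev, data_length, data_unused, tip_node_size = headers
    offset += S_HEADER.size
    uid = raw[offset : offset + uid_size]
    offset += uid_size
    tip_node = raw[offset : offset + tip_node_size]
    return version, tip_rev, data_length, data_unused, uid, tip_node

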
278 277 def _make_uid():
279 278 """return a new unique identifier.
280 279
281 280 The identifier is random and composed of ascii characters."""
282 281 return nodemod.hex(os.urandom(ID_SIZE))
283 282
284 283
285 284 class NodeMapDocket(object):
286 285 """metadata associated with persistent nodemap data
287 286
288 287 The persistent data may come from disk or be on their way to disk.
289 288 """
290 289
291 290 def __init__(self, uid=None):
292 291 if uid is None:
293 292 uid = _make_uid()
294 293 # a unique identifier for the data file:
295 294 # - When new data are appended, it is preserved.
296 295 # - When a new data file is created, a new identifier is generated.
297 296 self.uid = uid
298 297 # the tipmost revision stored in the data file. This revision and all
299 298 # revisions before it are expected to be encoded in the data file.
300 299 self.tip_rev = None
301 300 # the node of that tipmost revision; if it does not match the current
302 301 # index data, the docket is not valid for the current index and should
303 302 # be discarded.
304 303 #
305 304 # note: this check is not perfect, as some destructive operation could
306 305 # preserve the same tip_rev + tip_node while altering lower revisions.
307 306 # However, multiple other caches have the same vulnerability (e.g. the
308 307 # branchmap cache).
309 308 self.tip_node = None
310 309 # the size (in bytes) of the persisted data to encode the nodemap valid
311 310 # for `tip_rev`.
312 311 # - a data file shorter than this is corrupted,
313 312 # - any extra data should be ignored.
314 313 self.data_length = None
315 314 # the amount (in bytes) of "dead" data, still in the data file but no
316 315 # longer used for the nodemap.
317 316 self.data_unused = 0
318 317
319 318 def copy(self):
320 319 new = NodeMapDocket(uid=self.uid)
321 320 new.tip_rev = self.tip_rev
322 321 new.tip_node = self.tip_node
323 322 new.data_length = self.data_length
324 323 new.data_unused = self.data_unused
325 324 return new
326 325
327 326 def __cmp__(self, other):
328 327 if self.uid < other.uid:
329 328 return -1
330 329 if self.uid > other.uid:
331 330 return 1
332 331 elif self.data_length < other.data_length:
333 332 return -1
334 333 elif self.data_length > other.data_length:
335 334 return 1
336 335 return 0
337 336
338 337 def __eq__(self, other):
339 338 return self.uid == other.uid and self.data_length == other.data_length
340 339
341 340 def serialize(self):
342 341 """return serialized bytes for a docket using the passed uid"""
343 342 data = []
344 343 data.append(S_VERSION.pack(ONDISK_VERSION))
345 344 headers = (
346 345 len(self.uid),
347 346 self.tip_rev,
348 347 self.data_length,
349 348 self.data_unused,
350 349 len(self.tip_node),
351 350 )
352 351 data.append(S_HEADER.pack(*headers))
353 352 data.append(self.uid)
354 353 data.append(self.tip_node)
355 354 return b''.join(data)
356 355
357 356
358 357 def _rawdata_filepath(revlog, docket):
359 358 """The (vfs relative) nodemap's rawdata file for a given uid"""
360 359 if revlog.nodemap_file.endswith(b'.n.a'):
361 360 prefix = revlog.nodemap_file[:-4]
362 361 else:
363 362 prefix = revlog.nodemap_file[:-2]
364 363 return b"%s-%s.nd" % (prefix, docket.uid)
365 364
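# For illustration (hypothetical values): with nodemap_file b"00changelog.n"
# and a docket uid of b"53649e0cdbd481cb", `_rawdata_filepath` returns
# b"00changelog-53649e0cdbd481cb.nd".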
366 365
367 366 def _other_rawdata_filepath(revlog, docket):
368 367 prefix = revlog.nodemap_file[:-2]
369 368 pattern = re.compile(br"(^|/)%s-[0-9a-f]+\.nd$" % prefix)
370 369 new_file_path = _rawdata_filepath(revlog, docket)
371 370 new_file_name = revlog.opener.basename(new_file_path)
372 371 dirpath = revlog.opener.dirname(new_file_path)
373 372 others = []
374 373 for f in revlog.opener.listdir(dirpath):
375 374 if pattern.match(f) and f != new_file_name:
376 375 others.append(f)
377 376 return others
378 377
379 378
380 379 ### Nodemap Trie
381 380 #
382 381 # This is a simple reference implementation to compute and persist a nodemap
383 382 # trie. This reference implementation is write only. This python version is
384 383 # not expected to be actually used, since it won't provide any performance
385 384 # improvement over the existing non-persistent C implementation.
386 385 #
387 386 # The nodemap is persisted as a trie of 4-bit-addressed, 16-entry blocks.
388 387 # Each revision can be addressed using the shortest unique prefix of its node.
389 388 #
390 389 # The trie is stored as a sequence of blocks. Each block contains 16 entries
391 390 # (signed 32-bit integers, big endian; see S_BLOCK), each one of the following:
392 391 #
393 392 # * value >= 0 -> index of sub-block
394 393 # * value == -1 -> no value
395 394 # * value < -1 -> a revision value: rev = -(value+2)
396 395 #
397 396 # The implementation focuses on simplicity, not on performance. A Rust
398 397 # implementation should provide an efficient version of the same binary
399 398 # persistence. This reference python implementation is never meant to be
400 399 # extensively used in production.
401 400
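# For example (an illustrative case): a two-revision repository whose nodes
# start with the hex prefixes "12" and "13" needs two blocks: a root block
# whose entry 1 holds the index of a sub-block (a value >= 0), and a
# sub-block whose entries 2 and 3 hold revisions 0 and 1, stored as
# _transform_rev(0) == -2 and _transform_rev(1) == -3.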
402 401
403 402 def persistent_data(index):
404 403 """return the persistent binary form for a nodemap for a given index
405 404 """
406 405 trie = _build_trie(index)
407 406 return _persist_trie(trie)
408 407
409 408
410 409 def update_persistent_data(index, root, max_idx, last_rev):
411 410 """return the incremental update for persistent nodemap from a given index
412 411 """
413 412 changed_block, trie = _update_trie(index, root, last_rev)
414 413 return (
415 414 changed_block * S_BLOCK.size,
416 415 _persist_trie(trie, existing_idx=max_idx),
417 416 )
418 417
419 418
420 419 S_BLOCK = struct.Struct(">" + ("l" * 16))
421 420
422 421 NO_ENTRY = -1
423 422 # rev 0 needs to be -2 because 0 is used for block indexes and -1 is a special value.
424 423 REV_OFFSET = 2
425 424
426 425
427 426 def _transform_rev(rev):
428 427 """Return the number used to represent the rev in the tree.
429 428
430 429 (or retrieve a rev number from such representation)
431 430
432 431 Note that this is an involution, a function equal to its inverse (i.e.
433 432 which gives back its input when applied twice).
434 433 """
435 434 return -(rev + REV_OFFSET)
436 435
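# For example: _transform_rev(0) == -2 and _transform_rev(-2) == 0; revision
# 1 is stored as -3, revision 2 as -4, and so on.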
437 436
438 437 def _to_int(hex_digit):
439 438 """turn an hexadecimal digit into a proper integer"""
440 439 return int(hex_digit, 16)
441 440
442 441
443 442 class Block(dict):
444 443 """represent a block of the Trie
445 444
446 445 contains up to 16 entries indexed from 0 to 15"""
447 446
448 447 def __init__(self):
449 448 super(Block, self).__init__()
450 449 # If this block exists on disk, here is its ID
451 450 self.ondisk_id = None
452 451
453 452 def __iter__(self):
454 453 return iter(self.get(i) for i in range(16))
455 454
456 455
457 456 def _build_trie(index):
458 457 """build a nodemap trie
459 458
460 459 The nodemap stores revision number for each unique prefix.
461 460
462 461 Each block is a dictionary with keys in `[0, 15]`. Values are either
463 462 another block or a revision number.
464 463 """
465 464 root = Block()
466 465 for rev in range(len(index)):
467 466 hex = nodemod.hex(index[rev][7])
468 467 _insert_into_block(index, 0, root, rev, hex)
469 468 return root
470 469
471 470
472 471 def _update_trie(index, root, last_rev):
473 472 """consume"""
474 473 changed = 0
475 474 for rev in range(last_rev + 1, len(index)):
476 475 hex = nodemod.hex(index[rev][7])
477 476 changed += _insert_into_block(index, 0, root, rev, hex)
478 477 return changed, root
479 478
480 479
481 480 def _insert_into_block(index, level, block, current_rev, current_hex):
482 481 """insert a new revision in a block
483 482
484 483 index: the index we are adding revision for
485 484 level: the depth of the current block in the trie
486 485 block: the block currently being considered
487 486 current_rev: the revision number we are adding
488 487 current_hex: the hexadecimal representation of the node of that revision
489 488 """
490 489 changed = 1
491 490 if block.ondisk_id is not None:
492 491 block.ondisk_id = None
493 492 hex_digit = _to_int(current_hex[level : level + 1])
494 493 entry = block.get(hex_digit)
495 494 if entry is None:
496 495 # no entry, simply store the revision number
497 496 block[hex_digit] = current_rev
498 497 elif isinstance(entry, dict):
499 498 # need to recurse to an underlying block
500 499 changed += _insert_into_block(
501 500 index, level + 1, entry, current_rev, current_hex
502 501 )
503 502 else:
504 503 # collision with a previously unique prefix, inserting new
505 504 # vertices to fit both entries.
506 505 other_hex = nodemod.hex(index[entry][7])
507 506 other_rev = entry
508 507 new = Block()
509 508 block[hex_digit] = new
510 509 _insert_into_block(index, level + 1, new, other_rev, other_hex)
511 510 _insert_into_block(index, level + 1, new, current_rev, current_hex)
512 511 return changed
513 512
514 513
515 514 def _persist_trie(root, existing_idx=None):
516 515 """turn a nodemap trie into persistent binary data
517 516
518 517 See `_build_trie` for nodemap trie structure"""
519 518 block_map = {}
520 519 if existing_idx is not None:
521 520 base_idx = existing_idx + 1
522 521 else:
523 522 base_idx = 0
524 523 chunks = []
525 524 for tn in _walk_trie(root):
526 525 if tn.ondisk_id is not None:
527 526 block_map[id(tn)] = tn.ondisk_id
528 527 else:
529 528 block_map[id(tn)] = len(chunks) + base_idx
530 529 chunks.append(_persist_block(tn, block_map))
531 530 return b''.join(chunks)
532 531
533 532
534 533 def _walk_trie(block):
535 534 """yield all the block in a trie
536 535
537 536 Children blocks are always yielded before their parent block.
538 537 """
539 538 for (__, item) in sorted(block.items()):
540 539 if isinstance(item, dict):
541 540 for sub_block in _walk_trie(item):
542 541 yield sub_block
543 542 yield block
544 543
545 544
546 545 def _persist_block(block_node, block_map):
547 546 """produce persistent binary data for a single block
548 547
549 548 Children blocks are assumed to be already persisted and present in
550 549 block_map.
551 550 """
552 551 data = tuple(_to_value(v, block_map) for v in block_node)
553 552 return S_BLOCK.pack(*data)
554 553
555 554
556 555 def _to_value(item, block_map):
557 556 """persist any value as an integer"""
558 557 if item is None:
559 558 return NO_ENTRY
560 559 elif isinstance(item, dict):
561 560 return block_map[id(item)]
562 561 else:
563 562 return _transform_rev(item)
564 563
565 564
566 565 def parse_data(data):
567 566 """parse parse nodemap data into a nodemap Trie"""
568 567 if (len(data) % S_BLOCK.size) != 0:
569 568 msg = b"nodemap data size is not a multiple of block size (%d): %d"
570 569 raise error.Abort(msg % (S_BLOCK.size, len(data)))
571 570 if not data:
572 571 return Block(), None
573 572 block_map = {}
574 573 new_blocks = []
575 574 for i in range(0, len(data), S_BLOCK.size):
576 575 block = Block()
577 576 block.ondisk_id = len(block_map)
578 577 block_map[block.ondisk_id] = block
579 578 block_data = data[i : i + S_BLOCK.size]
580 579 values = S_BLOCK.unpack(block_data)
581 580 new_blocks.append((block, values))
582 581 for b, values in new_blocks:
583 582 for idx, v in enumerate(values):
584 583 if v == NO_ENTRY:
585 584 continue
586 585 elif v >= 0:
587 586 b[idx] = block_map[v]
588 587 else:
589 588 b[idx] = _transform_rev(v)
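# children are persisted before their parents (see `_walk_trie`), so the
# last block created in the loop above is the root of the trie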
590 589 return block, i // S_BLOCK.size
591 590
592 591
593 592 # debug utility
594 593
595 594
596 595 def check_data(ui, index, data):
597 596 """verify that the provided nodemap data are valid for the given idex"""
598 597 ret = 0
599 598 ui.status((b"revision in index: %d\n") % len(index))
600 599 root, __ = parse_data(data)
601 600 all_revs = set(_all_revisions(root))
602 601 ui.status((b"revision in nodemap: %d\n") % len(all_revs))
603 602 for r in range(len(index)):
604 603 if r not in all_revs:
605 604 msg = b" revision missing from nodemap: %d\n" % r
606 605 ui.write_err(msg)
607 606 ret = 1
608 607 else:
609 608 all_revs.remove(r)
610 609 nm_rev = _find_node(root, nodemod.hex(index[r][7]))
611 610 if nm_rev is None:
612 611 msg = b" revision node does not match any entries: %d\n" % r
613 612 ui.write_err(msg)
614 613 ret = 1
615 614 elif nm_rev != r:
616 615 msg = (
617 616 b" revision node does not match the expected revision: "
618 617 b"%d != %d\n" % (r, nm_rev)
619 618 )
620 619 ui.write_err(msg)
621 620 ret = 1
622 621
623 622 if all_revs:
624 623 for r in sorted(all_revs):
625 624 msg = b" extra revision in nodemap: %d\n" % r
626 625 ui.write_err(msg)
627 626 ret = 1
628 627 return ret
629 628
630 629
631 630 def _all_revisions(root):
632 631 """return all revisions stored in a Trie"""
633 632 for block in _walk_trie(root):
634 633 for v in block:
635 634 if v is None or isinstance(v, Block):
636 635 continue
637 636 yield v
638 637
639 638
640 639 def _find_node(block, node):
641 640 """find the revision associated with a given node"""
642 641 entry = block.get(_to_int(node[0:1]))
643 642 if isinstance(entry, dict):
644 643 return _find_node(entry, node[1:])
645 644 return entry
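

# An illustrative sketch, not part of the original module:
# `_example_trie_roundtrip` is a hypothetical helper that builds a trie for a
# toy index, persists it, parses it back and checks that every node can be
# found again. Only `len(index)` and `index[rev][7]` (the binary node) are
# used, matching what the functions above rely on.
def _example_trie_roundtrip():
    nodes = [
        b'\x12' + b'\x00' * 19,
        b'\x13' + b'\x00' * 19,
        b'\xff' + b'\x00' * 19,
    ]
    # minimal stand-in for a revlog index: only item [7] (the node) is read
    index = [(None,) * 7 + (n,) for n in nodes]
    data = persistent_data(index)
    assert len(data) % S_BLOCK.size == 0
    root, __ = parse_data(data)
    for rev, n in enumerate(nodes):
        assert _find_node(root, nodemod.hex(n)) == rev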