persistent-nodemap: properly ignore non-existent `.nd` data file...
Simon Sapin
r46705:8ff2d835 default
@@ -1,640 +1,642
1 1 # nodemap.py - nodemap related code and utilities
2 2 #
3 3 # Copyright 2019 Pierre-Yves David <pierre-yves.david@octobus.net>
4 4 # Copyright 2019 George Racinet <georges.racinet@octobus.net>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import os
13 13 import re
14 14 import struct
15 15
16 16 from ..i18n import _
17 17
18 18 from .. import (
19 19 error,
20 20 node as nodemod,
21 21 util,
22 22 )
23 23
24 24
25 25 class NodeMap(dict):
26 26 def __missing__(self, x):
27 27 raise error.RevlogError(b'unknown node: %s' % x)
28 28
29 29
30 30 def persisted_data(revlog):
31 31 """read the nodemap for a revlog from disk"""
32 32 if revlog.nodemap_file is None:
33 33 return None
34 34 pdata = revlog.opener.tryread(revlog.nodemap_file)
35 35 if not pdata:
36 36 return None
37 37 offset = 0
38 38 (version,) = S_VERSION.unpack(pdata[offset : offset + S_VERSION.size])
39 39 if version != ONDISK_VERSION:
40 40 return None
41 41 offset += S_VERSION.size
42 42 headers = S_HEADER.unpack(pdata[offset : offset + S_HEADER.size])
43 43 uid_size, tip_rev, data_length, data_unused, tip_node_size = headers
44 44 offset += S_HEADER.size
45 45 docket = NodeMapDocket(pdata[offset : offset + uid_size])
46 46 offset += uid_size
47 47 docket.tip_rev = tip_rev
48 48 docket.tip_node = pdata[offset : offset + tip_node_size]
49 49 docket.data_length = data_length
50 50 docket.data_unused = data_unused
51 51
52 52 filename = _rawdata_filepath(revlog, docket)
53 53 use_mmap = revlog.opener.options.get(b"persistent-nodemap.mmap")
54 54 try:
55 55 with revlog.opener(filename) as fd:
56 56 if use_mmap:
57 57 data = util.buffer(util.mmapread(fd, data_length))
58 58 else:
59 59 data = fd.read(data_length)
60 60 except OSError as e:
61 if e.errno != errno.ENOENT:
61 if e.errno == errno.ENOENT:
62 return None
63 else:
62 64 raise
63 65 if len(data) < data_length:
64 66 return None
65 67 return docket, data
66 68
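# Usage sketch (illustrative, not part of the module): callers receive either
# None (no usable nodemap on disk) or a (docket, data) pair:
#
#   ret = persisted_data(revlog)
#   if ret is not None:
#       docket, data = ret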
67 69
68 70 def setup_persistent_nodemap(tr, revlog):
69 71 """Install whatever is needed transaction side to persist a nodemap on disk
70 72
71 73 (only actually persist the nodemap if this is relevant for this revlog)
72 74 """
73 75 if revlog._inline:
74 76 return # inlined revlogs are too small for this to be relevant
75 77 if revlog.nodemap_file is None:
76 78 return # we do not use persistent_nodemap on this revlog
77 79
78 80 # this must run after the changelog finalization; finalize callbacks run in sorted id order, and "nm-" sorts after the changelog's "cl-" ids
79 81 callback_id = b"nm-revlog-persistent-nodemap-%s" % revlog.nodemap_file
80 82 if tr.hasfinalize(callback_id):
81 83 return # no need to register again
82 84 tr.addpending(
83 85 callback_id, lambda tr: _persist_nodemap(tr, revlog, pending=True)
84 86 )
85 87 tr.addfinalize(callback_id, lambda tr: _persist_nodemap(tr, revlog))
86 88
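# Note on the two registrations above (a sketch of the flow, based on
# `_persist_nodemap` below): the `addpending` callback writes the docket to a
# transient "<nodemap_file>.a" file so pending readers (eg: hooks) can see the
# data, while the `addfinalize` callback writes the final "<nodemap_file>"
# docket when the transaction is committed.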
87 89
88 90 class _NoTransaction(object):
89 91 """transaction like object to update the nodemap outside a transaction"""
90 92
91 93 def __init__(self):
92 94 self._postclose = {}
93 95
94 96 def addpostclose(self, callback_id, callback_func):
95 97 self._postclose[callback_id] = callback_func
96 98
97 99 def registertmp(self, *args, **kwargs):
98 100 pass
99 101
100 102 def addbackup(self, *args, **kwargs):
101 103 pass
102 104
103 105 def add(self, *args, **kwargs):
104 106 pass
105 107
106 108 def addabort(self, *args, **kwargs):
107 109 pass
108 110
109 111 def _report(self, *args):
110 112 pass
111 113
112 114
113 115 def update_persistent_nodemap(revlog):
114 116 """update the persistent nodemap right now
115 117
116 118 To be used for updating the nodemap on disk outside of a normal transaction
117 119 setup (eg, `debugupdatecache`).
118 120 """
119 121 if revlog._inline:
120 122 return # inlined revlogs are too small for this to be relevant
121 123 if revlog.nodemap_file is None:
122 124 return # we do not use persistent_nodemap on this revlog
123 125
124 126 notr = _NoTransaction()
125 127 _persist_nodemap(notr, revlog)
126 128 for k in sorted(notr._postclose):
127 129 notr._postclose[k](None)
128 130
129 131
130 132 def _persist_nodemap(tr, revlog, pending=False):
131 133 """Write nodemap data on disk for a given revlog"""
132 134 if getattr(revlog, 'filteredrevs', ()):
133 135 raise error.ProgrammingError(
134 136 "cannot persist nodemap of a filtered changelog"
135 137 )
136 138 if revlog.nodemap_file is None:
137 139 msg = "calling persist nodemap on a revlog without the feature enableb"
138 140 raise error.ProgrammingError(msg)
139 141
140 142 can_incremental = util.safehasattr(revlog.index, "nodemap_data_incremental")
141 143 ondisk_docket = revlog._nodemap_docket
142 144 feed_data = util.safehasattr(revlog.index, "update_nodemap_data")
143 145 use_mmap = revlog.opener.options.get(b"persistent-nodemap.mmap")
144 146 mode = revlog.opener.options.get(b"persistent-nodemap.mode")
145 147 if not can_incremental:
146 148 msg = _(b"persistent nodemap in strict mode without efficient method")
147 149 if mode == b'warn':
148 150 tr._report(b"%s\n" % msg)
149 151 elif mode == b'strict':
150 152 raise error.Abort(msg)
151 153
152 154 data = None
153 155 # first attempt an incremental update of the data
154 156 if can_incremental and ondisk_docket is not None:
155 157 target_docket = revlog._nodemap_docket.copy()
156 158 (
157 159 src_docket,
158 160 data_changed_count,
159 161 data,
160 162 ) = revlog.index.nodemap_data_incremental()
161 163 new_length = target_docket.data_length + len(data)
162 164 new_unused = target_docket.data_unused + data_changed_count
163 165 if src_docket != target_docket:
164 166 data = None
165 167 elif new_length <= (new_unused * 10): # 10%+ of the data would be unused: do a full rewrite instead
166 168 data = None
167 169 else:
168 170 datafile = _rawdata_filepath(revlog, target_docket)
169 171 # EXP-TODO: if this is a cache, this should use a cache vfs, not a
170 172 # store vfs
171 173 tr.add(datafile, target_docket.data_length)
172 174 with revlog.opener(datafile, b'r+') as fd:
173 175 fd.seek(target_docket.data_length)
174 176 fd.write(data)
175 177 if feed_data:
176 178 if use_mmap:
177 179 fd.flush()
178 180 new_data = util.buffer(util.mmapread(fd, new_length))
179 181 else:
180 182 fd.seek(0)
181 183 new_data = fd.read(new_length)
182 184 target_docket.data_length = new_length
183 185 target_docket.data_unused = new_unused
184 186
185 187 if data is None:
186 188 # otherwise fall back to a full new export
187 189 target_docket = NodeMapDocket()
188 190 datafile = _rawdata_filepath(revlog, target_docket)
189 191 if util.safehasattr(revlog.index, "nodemap_data_all"):
190 192 data = revlog.index.nodemap_data_all()
191 193 else:
192 194 data = persistent_data(revlog.index)
193 195 # EXP-TODO: if this is a cache, this should use a cache vfs, not a
194 196 # store vfs
195 197
196 198 tryunlink = revlog.opener.tryunlink
197 199
198 200 def abortck(tr):
199 201 tryunlink(datafile)
200 202
201 203 callback_id = b"delete-%s" % datafile
202 204
203 205 # some flavors of transaction abort do not clean up new files, they
204 206 # simply empty them.
205 207 tr.addabort(callback_id, abortck)
206 208 with revlog.opener(datafile, b'w+') as fd:
207 209 fd.write(data)
208 210 if feed_data:
209 211 if use_mmap:
210 212 fd.flush()
211 213 new_data = util.buffer(util.mmapread(fd, len(data)))
212 214 else:
213 215 new_data = data
214 216 target_docket.data_length = len(data)
215 217 target_docket.tip_rev = revlog.tiprev()
216 218 target_docket.tip_node = revlog.node(target_docket.tip_rev)
217 219 # EXP-TODO: if this is a cache, this should use a cache vfs, not a
218 220 # store vfs
219 221 file_path = revlog.nodemap_file
220 222 if pending:
221 223 file_path += b'.a'
222 224 tr.registertmp(file_path)
223 225 else:
224 226 tr.addbackup(file_path)
225 227
226 228 with revlog.opener(file_path, b'w', atomictemp=True) as fp:
227 229 fp.write(target_docket.serialize())
228 230 revlog._nodemap_docket = target_docket
229 231 if feed_data:
230 232 revlog.index.update_nodemap_data(target_docket, new_data)
231 233
232 234 # search for old data files in all cases; some older process might have
233 235 # left one behind.
234 236 olds = _other_rawdata_filepath(revlog, target_docket)
235 237 if olds:
236 238 realvfs = getattr(revlog, '_realopener', revlog.opener)
237 239
238 240 def cleanup(tr):
239 241 for oldfile in olds:
240 242 realvfs.tryunlink(oldfile)
241 243
242 244 callback_id = b"revlog-cleanup-nodemap-%s" % revlog.nodemap_file
243 245 tr.addpostclose(callback_id, cleanup)
244 246
245 247
246 248 ### Nodemap docket file
247 249 #
248 250 # The nodemap data are stored on disk using two files:
249 251 #
250 252 # * a raw data file containing the persistent nodemap
251 253 # (see `Nodemap Trie` section)
252 254 #
253 255 # * a small "docket" file containing metadata
254 256 #
255 257 # While the nodemap data can be multiple tens of megabytes, the "docket" is
256 258 # small, so it is easy to update it atomically or to duplicate its content
257 259 # during a transaction.
258 260 #
259 261 # Multiple raw data files can exist at the same time (the currently valid one
260 262 # and a new one being written by an in-progress transaction). To accommodate
261 263 # this, the filename hosting the raw data has a variable part. The exact
262 264 # filename is specified inside the "docket" file.
263 265 #
264 266 # The docket file contains information to find, qualify and validate the raw
265 267 # data. Its content is currently very light, but it will expand as the on-disk
266 268 # nodemap gains the necessary features to be used in production.
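#
# For reference, a sketch of the serialized docket layout, derived from
# `NodeMapDocket.serialize` and the struct formats below (descriptive, not an
# authoritative specification):
#
#   [version:       1 byte]   S_VERSION (">B")
#   [uid_size:      1 byte]   S_HEADER (">BQQQQ") starts here
#   [tip_rev:       8 bytes]
#   [data_length:   8 bytes]
#   [data_unused:   8 bytes]
#   [tip_node_size: 8 bytes]
#   [uid:      uid_size bytes]
#   [tip_node: tip_node_size bytes]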
267 269
268 270 ONDISK_VERSION = 1
269 271 S_VERSION = struct.Struct(">B")
270 272 S_HEADER = struct.Struct(">BQQQQ")
271 273
272 274 ID_SIZE = 8
273 275
274 276
275 277 def _make_uid():
276 278 """return a new unique identifier.
277 279
278 280 The identifier is random and composed of ascii characters."""
279 281 return nodemod.hex(os.urandom(ID_SIZE))
280 282
281 283
282 284 class NodeMapDocket(object):
283 285 """metadata associated with persistent nodemap data
284 286
285 287 The persistent data may come from disk or be on their way to disk.
286 288 """
287 289
288 290 def __init__(self, uid=None):
289 291 if uid is None:
290 292 uid = _make_uid()
291 293 # a unique identifier for the data file:
292 294 # - When new data are appended, it is preserved.
293 295 # - When a new data file is created, a new identifier is generated.
294 296 self.uid = uid
295 297 # the tipmost revision stored in the data file. This revision and all
296 298 # revisions before it are expected to be encoded in the data file.
297 299 self.tip_rev = None
298 300 # the node of that tipmost revision; if it mismatches the current index
299 301 # data, the docket is not valid for the current index and should be
300 302 # discarded.
301 303 #
302 304 # note: this check is not perfect, as some destructive operations could
303 305 # preserve the same tip_rev + tip_node while altering lower revisions.
304 306 # However, multiple other caches have the same vulnerability (eg:
305 307 # branchmap cache).
306 308 self.tip_node = None
307 309 # the size (in bytes) of the persisted data to encode the nodemap valid
308 310 # for `tip_rev`.
309 311 # - a data file shorter than this is corrupted,
310 312 # - any extra data should be ignored.
311 313 self.data_length = None
312 314 # the amount (in bytes) of "dead" data, still in the data file but no
313 315 # longer used for the nodemap.
314 316 self.data_unused = 0
315 317
316 318 def copy(self):
317 319 new = NodeMapDocket(uid=self.uid)
318 320 new.tip_rev = self.tip_rev
319 321 new.tip_node = self.tip_node
320 322 new.data_length = self.data_length
321 323 new.data_unused = self.data_unused
322 324 return new
323 325
324 326 def __cmp__(self, other):
325 327 if self.uid < other.uid:
326 328 return -1
327 329 if self.uid > other.uid:
328 330 return 1
329 331 elif self.data_length < other.data_length:
330 332 return -1
331 333 elif self.data_length > other.data_length:
332 334 return 1
333 335 return 0
334 336
335 337 def __eq__(self, other):
336 338 return self.uid == other.uid and self.data_length == other.data_length
337 339
338 340 def serialize(self):
339 341 """return serialized bytes for a docket using the passed uid"""
340 342 data = []
341 343 data.append(S_VERSION.pack(ONDISK_VERSION))
342 344 headers = (
343 345 len(self.uid),
344 346 self.tip_rev,
345 347 self.data_length,
346 348 self.data_unused,
347 349 len(self.tip_node),
348 350 )
349 351 data.append(S_HEADER.pack(*headers))
350 352 data.append(self.uid)
351 353 data.append(self.tip_node)
352 354 return b''.join(data)
353 355
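# A quick sanity sketch of docket serialization (illustrative only; the field
# values are assumptions made for the example):
#
#   docket = NodeMapDocket()
#   docket.tip_rev = 0
#   docket.tip_node = b'\x00' * 20  # hypothetical 20-byte node
#   docket.data_length = 0
#   blob = docket.serialize()
#   # fixed part is S_VERSION.size + S_HEADER.size = 1 + 33 = 34 bytes
#   assert len(blob) == 34 + len(docket.uid) + len(docket.tip_node)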
354 356
355 357 def _rawdata_filepath(revlog, docket):
356 358 """The (vfs relative) nodemap's rawdata file for a given uid"""
357 359 if revlog.nodemap_file.endswith(b'.n.a'):
358 360 prefix = revlog.nodemap_file[:-4]
359 361 else:
360 362 prefix = revlog.nodemap_file[:-2]
361 363 return b"%s-%s.nd" % (prefix, docket.uid)
362 364
363 365
364 366 def _other_rawdata_filepath(revlog, docket):
365 367 prefix = revlog.nodemap_file[:-2]
366 368 pattern = re.compile(br"(^|/)%s-[0-9a-f]+\.nd$" % prefix)
367 369 new_file_path = _rawdata_filepath(revlog, docket)
368 370 new_file_name = revlog.opener.basename(new_file_path)
369 371 dirpath = revlog.opener.dirname(new_file_path)
370 372 others = []
371 373 for f in revlog.opener.listdir(dirpath):
372 374 if pattern.match(f) and f != new_file_name:
373 375 others.append(f)
374 376 return others
375 377
376 378
377 379 ### Nodemap Trie
378 380 #
379 381 # This is a simple reference implementation to compute and persist a nodemap
380 382 # trie. This reference implementation is write only. The python version of this
381 383 # is not expected to be actually used, since it won't provide any performance
382 384 # improvement over the existing non-persistent C implementation.
383 385 #
384 386 # The nodemap is persisted as a trie using 4-bit addresses and 16-entry blocks.
385 387 # Each revision can be addressed using the shortest prefix of its node.
386 388 #
387 389 # The trie is stored as a sequence of blocks. Each block contains 16 entries
388 390 # (signed 32-bit integers, big endian). Each entry can be one of the following:
389 391 #
390 392 # * value >= 0 -> index of sub-block
391 393 # * value == -1 -> no value
392 394 # * value < -1 -> a revision value: rev = -(value+2)
393 395 #
394 396 # The implementation focuses on simplicity, not on performance. A Rust
395 397 # implementation should provide an efficient version of the same binary
396 398 # persistence. This reference python implementation is never meant to be
397 399 # used extensively in production.
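#
# A small worked example of the entry encoding (with REV_OFFSET = 2, defined
# below): revision 0 is stored as -2 and revision 5 as -7. Decoding applies
# the same transformation: _transform_rev(-7) == 5 and _transform_rev(5) == -7.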
398 400
399 401
400 402 def persistent_data(index):
401 403 """return the persistent binary form for a nodemap for a given index"""
402 404 trie = _build_trie(index)
403 405 return _persist_trie(trie)
404 406
405 407
406 408 def update_persistent_data(index, root, max_idx, last_rev):
407 409 """return the incremental update for persistent nodemap from a given index"""
408 410 changed_block, trie = _update_trie(index, root, last_rev)
409 411 return (
410 412 changed_block * S_BLOCK.size,
411 413 _persist_trie(trie, existing_idx=max_idx),
412 414 )
413 415
414 416
415 417 S_BLOCK = struct.Struct(">" + ("l" * 16))
416 418
417 419 NO_ENTRY = -1
418 420 # rev 0 needs to be -2 because 0 is used by blocks and -1 is a special value.
419 421 REV_OFFSET = 2
420 422
421 423
422 424 def _transform_rev(rev):
423 425 """Return the number used to represent the rev in the tree.
424 426
425 427 (or retrieve a rev number from such representation)
426 428
427 429 Note that this is an involution, a function equal to its inverse (i.e.
428 430 which gives the identity when applied to itself).
429 431 """
430 432 return -(rev + REV_OFFSET)
431 433
432 434
433 435 def _to_int(hex_digit):
434 436 """turn an hexadecimal digit into a proper integer"""
435 437 return int(hex_digit, 16)
436 438
437 439
438 440 class Block(dict):
439 441 """represent a block of the Trie
440 442
441 443 contains up to 16 entries indexed from 0 to 15"""
442 444
443 445 def __init__(self):
444 446 super(Block, self).__init__()
445 447 # If this block exists on disk, here is its ID
446 448 self.ondisk_id = None
447 449
448 450 def __iter__(self):
449 451 return iter(self.get(i) for i in range(16))
450 452
451 453
452 454 def _build_trie(index):
453 455 """build a nodemap trie
454 456
455 457 The nodemap stores the revision number for each unique prefix.
456 458
457 459 Each block is a dictionary with keys in `[0, 15]`. Values are either
458 460 another block or a revision number.
459 461 """
460 462 root = Block()
461 463 for rev in range(len(index)):
462 464 hex = nodemod.hex(index[rev][7])
463 465 _insert_into_block(index, 0, root, rev, hex)
464 466 return root
465 467
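# Illustration (hypothetical nodes, for exposition only): given two revisions
# whose nodes start with "12..." and "13...", both share the first nibble, so
# root[1] becomes a sub-block that stores rev 0 at index 2 and rev 1 at
# index 3 (see the collision handling in `_insert_into_block` below).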
466 468
467 469 def _update_trie(index, root, last_rev):
468 470 """consume"""
469 471 changed = 0
470 472 for rev in range(last_rev + 1, len(index)):
471 473 hex = nodemod.hex(index[rev][7])
472 474 changed += _insert_into_block(index, 0, root, rev, hex)
473 475 return changed, root
474 476
475 477
476 478 def _insert_into_block(index, level, block, current_rev, current_hex):
477 479 """insert a new revision in a block
478 480
479 481 index: the index we are adding revisions for
480 482 level: the depth of the current block in the trie
481 483 block: the block currently being considered
482 484 current_rev: the revision number we are adding
483 485 current_hex: the hexadecimal representation of the node of that revision
484 486 """
485 487 changed = 1
486 488 if block.ondisk_id is not None:
487 489 block.ondisk_id = None
488 490 hex_digit = _to_int(current_hex[level : level + 1])
489 491 entry = block.get(hex_digit)
490 492 if entry is None:
491 493 # no entry, simply store the revision number
492 494 block[hex_digit] = current_rev
493 495 elif isinstance(entry, dict):
494 496 # need to recurse to an underlying block
495 497 changed += _insert_into_block(
496 498 index, level + 1, entry, current_rev, current_hex
497 499 )
498 500 else:
499 501 # collision with a previously unique prefix, inserting new
500 502 # vertices to fit both entries.
501 503 other_hex = nodemod.hex(index[entry][7])
502 504 other_rev = entry
503 505 new = Block()
504 506 block[hex_digit] = new
505 507 _insert_into_block(index, level + 1, new, other_rev, other_hex)
506 508 _insert_into_block(index, level + 1, new, current_rev, current_hex)
507 509 return changed
508 510
509 511
510 512 def _persist_trie(root, existing_idx=None):
511 513 """turn a nodemap trie into persistent binary data
512 514
513 515 See `_build_trie` for nodemap trie structure"""
514 516 block_map = {}
515 517 if existing_idx is not None:
516 518 base_idx = existing_idx + 1
517 519 else:
518 520 base_idx = 0
519 521 chunks = []
520 522 for tn in _walk_trie(root):
521 523 if tn.ondisk_id is not None:
522 524 block_map[id(tn)] = tn.ondisk_id
523 525 else:
524 526 block_map[id(tn)] = len(chunks) + base_idx
525 527 chunks.append(_persist_block(tn, block_map))
526 528 return b''.join(chunks)
527 529
528 530
529 531 def _walk_trie(block):
530 532 """yield all the block in a trie
531 533
532 534 Children blocks are always yield before their parent block.
533 535 """
534 536 for (__, item) in sorted(block.items()):
535 537 if isinstance(item, dict):
536 538 for sub_block in _walk_trie(item):
537 539 yield sub_block
538 540 yield block
539 541
540 542
541 543 def _persist_block(block_node, block_map):
542 544 """produce persistent binary data for a single block
543 545
544 546 Children blocks are assumed to be already persisted and present in
545 547 block_map.
546 548 """
547 549 data = tuple(_to_value(v, block_map) for v in block_node)
548 550 return S_BLOCK.pack(*data)
549 551
550 552
551 553 def _to_value(item, block_map):
552 554 """persist any value as an integer"""
553 555 if item is None:
554 556 return NO_ENTRY
555 557 elif isinstance(item, dict):
556 558 return block_map[id(item)]
557 559 else:
558 560 return _transform_rev(item)
559 561
560 562
561 563 def parse_data(data):
562 564 """parse parse nodemap data into a nodemap Trie"""
563 565 if (len(data) % S_BLOCK.size) != 0:
564 566 msg = "nodemap data size is not a multiple of block size (%d): %d"
565 567 raise error.Abort(msg % (S_BLOCK.size, len(data)))
566 568 if not data:
567 569 return Block(), None
568 570 block_map = {}
569 571 new_blocks = []
570 572 for i in range(0, len(data), S_BLOCK.size):
571 573 block = Block()
572 574 block.ondisk_id = len(block_map)
573 575 block_map[block.ondisk_id] = block
574 576 block_data = data[i : i + S_BLOCK.size]
575 577 values = S_BLOCK.unpack(block_data)
576 578 new_blocks.append((block, values))
577 579 for b, values in new_blocks:
578 580 for idx, v in enumerate(values):
579 581 if v == NO_ENTRY:
580 582 continue
581 583 elif v >= 0:
582 584 b[idx] = block_map[v]
583 585 else:
584 586 b[idx] = _transform_rev(v)
585 587 return block, i // S_BLOCK.size
586 588
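# Round-trip sketch (illustrative only): children are serialized before their
# parent, so the root is the last block of the stream and the returned index
# points at it:
#
#   data = persistent_data(index)
#   root, last_idx = parse_data(data)
#   assert len(data) == (last_idx + 1) * S_BLOCK.size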
587 589
588 590 # debug utility
589 591
590 592
591 593 def check_data(ui, index, data):
592 594 """verify that the provided nodemap data are valid for the given idex"""
593 595 ret = 0
594 596 ui.status((b"revision in index: %d\n") % len(index))
595 597 root, __ = parse_data(data)
596 598 all_revs = set(_all_revisions(root))
597 599 ui.status((b"revision in nodemap: %d\n") % len(all_revs))
598 600 for r in range(len(index)):
599 601 if r not in all_revs:
600 602 msg = b" revision missing from nodemap: %d\n" % r
601 603 ui.write_err(msg)
602 604 ret = 1
603 605 else:
604 606 all_revs.remove(r)
605 607 nm_rev = _find_node(root, nodemod.hex(index[r][7]))
606 608 if nm_rev is None:
607 609 msg = b" revision node does not match any entries: %d\n" % r
608 610 ui.write_err(msg)
609 611 ret = 1
610 612 elif nm_rev != r:
611 613 msg = (
612 614 b" revision node does not match the expected revision: "
613 615 b"%d != %d\n" % (r, nm_rev)
614 616 )
615 617 ui.write_err(msg)
616 618 ret = 1
617 619
618 620 if all_revs:
619 621 for r in sorted(all_revs):
620 622 msg = b" extra revision in nodemap: %d\n" % r
621 623 ui.write_err(msg)
622 624 ret = 1
623 625 return ret
624 626
625 627
626 628 def _all_revisions(root):
627 629 """return all revisions stored in a Trie"""
628 630 for block in _walk_trie(root):
629 631 for v in block:
630 632 if v is None or isinstance(v, Block):
631 633 continue
632 634 yield v
633 635
634 636
635 637 def _find_node(block, node):
636 638 """find the revision associated with a given node"""
637 639 entry = block.get(_to_int(node[0:1]))
638 640 if isinstance(entry, dict):
639 641 return _find_node(entry, node[1:])
640 642 return entry
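# Usage sketch (illustrative only): lookup descends one nibble per level,
# mirroring `_insert_into_block`. A leaf is returned as soon as it is reached,
# without checking the remaining suffix, which is why `check_data` above
# re-validates the revision it gets back:
#
#   rev = _find_node(root, nodemod.hex(some_node))  # `some_node` is hypothetical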