nodemap: skip persistent nodemap warming for revlog not using it...
marmoute
r45247:b4537125 stable
@@ -1,628 +1,633 @@
1 1 # nodemap.py - nodemap related code and utilities
2 2 #
3 3 # Copyright 2019 Pierre-Yves David <pierre-yves.david@octobus.net>
4 4 # Copyright 2019 George Racinet <georges.racinet@octobus.net>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import os
13 13 import re
14 14 import struct
15 15
16 16 from .. import (
17 17 error,
18 18 node as nodemod,
19 19 util,
20 20 )
21 21
22 22
23 23 class NodeMap(dict):
24 24 def __missing__(self, x):
25 25 raise error.RevlogError(b'unknown node: %s' % x)
26 26
27 27
28 28 def persisted_data(revlog):
29 29 """read the nodemap for a revlog from disk"""
30 30 if revlog.nodemap_file is None:
31 31 return None
32 32 pdata = revlog.opener.tryread(revlog.nodemap_file)
33 33 if not pdata:
34 34 return None
35 35 offset = 0
36 36 (version,) = S_VERSION.unpack(pdata[offset : offset + S_VERSION.size])
37 37 if version != ONDISK_VERSION:
38 38 return None
39 39 offset += S_VERSION.size
40 40 headers = S_HEADER.unpack(pdata[offset : offset + S_HEADER.size])
41 41 uid_size, tip_rev, data_length, data_unused, tip_node_size = headers
42 42 offset += S_HEADER.size
43 43 docket = NodeMapDocket(pdata[offset : offset + uid_size])
44 44 offset += uid_size
45 45 docket.tip_rev = tip_rev
46 46 docket.tip_node = pdata[offset : offset + tip_node_size]
47 47 docket.data_length = data_length
48 48 docket.data_unused = data_unused
49 49
50 50 filename = _rawdata_filepath(revlog, docket)
51 51 use_mmap = revlog.opener.options.get(b"exp-persistent-nodemap.mmap")
52 52 try:
53 53 with revlog.opener(filename) as fd:
54 54 if use_mmap:
55 55 data = util.buffer(util.mmapread(fd, data_length))
56 56 else:
57 57 data = fd.read(data_length)
58 58 except OSError as e:
59 59 if e.errno != errno.ENOENT:
60 60 raise
return None  # the data file is missing: no usable nodemap data on disk
61 61 if len(data) < data_length:
62 62 return None
63 63 return docket, data
64 64
65 65
66 66 def setup_persistent_nodemap(tr, revlog):
67 67 """Install whatever is needed transaction side to persist a nodemap on disk
68 68
69 69 (only actually persist the nodemap if this is relevant for this revlog)
70 70 """
71 71 if revlog._inline:
72 72 return # inlined revlogs are too small for this to be relevant
73 73 if revlog.nodemap_file is None:
74 74 return # we do not use persistent_nodemap on this revlog
75 75
76 76 # this must run after the changelog finalization; finalize callbacks run in id order and the changelog uses a "cl-" prefix, so our "nm-" id sorts after it
77 77 callback_id = b"nm-revlog-persistent-nodemap-%s" % revlog.nodemap_file
78 78 if tr.hasfinalize(callback_id):
79 79 return # no need to register again
80 80 tr.addpending(
81 81 callback_id, lambda tr: _persist_nodemap(tr, revlog, pending=True)
82 82 )
83 83 tr.addfinalize(callback_id, lambda tr: _persist_nodemap(tr, revlog))
84 84
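# A usage sketch, assuming an open transaction `tr` and a non-inline
# `revlog` that has a nodemap file (both hypothetical here). One call
# registers both callbacks: the pending one lets hooks running on the
# pending transaction see a docket written to `<nodemap_file>.a`, while
# the finalize one writes the final docket when the transaction closes.
setup_persistent_nodemap(tr, revlog)  # safe to call twice: hasfinalize() guards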
85 85
86 86 class _NoTransaction(object):
87 87 """transaction like object to update the nodemap outside a transaction
88 88 """
89 89
90 90 def __init__(self):
91 91 self._postclose = {}
92 92
93 93 def addpostclose(self, callback_id, callback_func):
94 94 self._postclose[callback_id] = callback_func
95 95
96 96 def registertmp(self, *args, **kwargs):
97 97 pass
98 98
99 99 def addbackup(self, *args, **kwargs):
100 100 pass
101 101
102 102 def add(self, *args, **kwargs):
103 103 pass
104 104
105 105 def addabort(self, *args, **kwargs):
106 106 pass
107 107
108 108
109 109 def update_persistent_nodemap(revlog):
110 110 """update the persistent nodemap right now
111 111
112 112 To be used for updating the nodemap on disk outside of a normal transaction
113 113 setup (eg, `debugupdatecache`).
114 114 """
115 if revlog._inline:
116 return # inlined revlogs are too small for this to be relevant
117 if revlog.nodemap_file is None:
118 return # we do not use persistent_nodemap on this revlog
119
115 120 notr = _NoTransaction()
116 121 _persist_nodemap(notr, revlog)
117 122 for k in sorted(notr._postclose):
118 123 notr._postclose[k](None)
119 124
120 125
121 126 def _persist_nodemap(tr, revlog, pending=False):
122 127 """Write nodemap data on disk for a given revlog
123 128 """
124 129 if getattr(revlog, 'filteredrevs', ()):
125 130 raise error.ProgrammingError(
126 131 "cannot persist nodemap of a filtered changelog"
127 132 )
128 133 if revlog.nodemap_file is None:
129 134 msg = "calling persist nodemap on a revlog without the feature enabled"
130 135 raise error.ProgrammingError(msg)
131 136
132 137 can_incremental = util.safehasattr(revlog.index, "nodemap_data_incremental")
133 138 ondisk_docket = revlog._nodemap_docket
134 139 feed_data = util.safehasattr(revlog.index, "update_nodemap_data")
135 140 use_mmap = revlog.opener.options.get(b"exp-persistent-nodemap.mmap")
136 141
137 142 data = None
138 143 # first attempt an incremental update of the data
139 144 if can_incremental and ondisk_docket is not None:
140 145 target_docket = revlog._nodemap_docket.copy()
141 146 (
142 147 src_docket,
143 148 data_changed_count,
144 149 data,
145 150 ) = revlog.index.nodemap_data_incremental()
146 151 new_length = target_docket.data_length + len(data)
147 152 new_unused = target_docket.data_unused + data_changed_count
148 153 if src_docket != target_docket:
149 154 data = None
150 155 elif new_length <= (new_unused * 10): # 10% or more of the data would be unused: rewrite from scratch (see the worked example after this function)
151 156 data = None
152 157 else:
153 158 datafile = _rawdata_filepath(revlog, target_docket)
154 159 # EXP-TODO: if this is a cache, this should use a cache vfs, not a
155 160 # store vfs
156 161 tr.add(datafile, target_docket.data_length)
157 162 with revlog.opener(datafile, b'r+') as fd:
158 163 fd.seek(target_docket.data_length)
159 164 fd.write(data)
160 165 if feed_data:
161 166 if use_mmap: # feed an mmap-backed buffer, matching persisted_data
162 167 fd.flush()
163 168 new_data = util.buffer(util.mmapread(fd, new_length))
164 169 else:
165 170 fd.seek(0)
166 171 new_data = fd.read(new_length)
167 172 target_docket.data_length = new_length
168 173 target_docket.data_unused = new_unused
169 174
170 175 if data is None:
171 176 # otherwise fall back to a full new export
172 177 target_docket = NodeMapDocket()
173 178 datafile = _rawdata_filepath(revlog, target_docket)
174 179 if util.safehasattr(revlog.index, "nodemap_data_all"):
175 180 data = revlog.index.nodemap_data_all()
176 181 else:
177 182 data = persistent_data(revlog.index)
178 183 # EXP-TODO: if this is a cache, this should use a cache vfs, not a
179 184 # store vfs
180 185
181 186 tryunlink = revlog.opener.tryunlink
182 187
183 188 def abortck(tr):
184 189 tryunlink(datafile)
185 190
186 191 callback_id = b"delete-%s" % datafile
187 192
188 193 # some flavors of transaction abort do not clean up new files, they
189 194 # simply empty them.
190 195 tr.addabort(callback_id, abortck)
191 196 with revlog.opener(datafile, b'w+') as fd:
192 197 fd.write(data)
193 198 if feed_data:
194 199 if use_mmap: # feed an mmap-backed buffer, matching persisted_data
195 200 fd.flush()
196 201 new_data = util.buffer(util.mmapread(fd, len(data)))
197 202 else:
198 203 new_data = data
199 204 target_docket.data_length = len(data)
200 205 target_docket.tip_rev = revlog.tiprev()
201 206 target_docket.tip_node = revlog.node(target_docket.tip_rev)
202 207 # EXP-TODO: if this is a cache, this should use a cache vfs, not a
203 208 # store vfs
204 209 file_path = revlog.nodemap_file
205 210 if pending:
206 211 file_path += b'.a'
207 212 tr.registertmp(file_path)
208 213 else:
209 214 tr.addbackup(file_path)
210 215
211 216 with revlog.opener(file_path, b'w', atomictemp=True) as fp:
212 217 fp.write(target_docket.serialize())
213 218 revlog._nodemap_docket = target_docket
214 219 if feed_data:
215 220 revlog.index.update_nodemap_data(target_docket, new_data)
216 221
217 222 # search for old data files in all cases; some older process might have
218 223 # left one behind.
219 224 olds = _other_rawdata_filepath(revlog, target_docket)
220 225 if olds:
221 226 realvfs = getattr(revlog, '_realopener', revlog.opener)
222 227
223 228 def cleanup(tr):
224 229 for oldfile in olds:
225 230 realvfs.tryunlink(oldfile)
226 231
227 232 callback_id = b"revlog-cleanup-nodemap-%s" % revlog.nodemap_file
228 233 tr.addpostclose(callback_id, cleanup)
229 234
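# A worked example of the 10% heuristic in the incremental branch above,
# with made-up sizes: appending would leave 7000 dead bytes in a
# 60000-byte data file, and 60000 <= 7000 * 10, so `data` is reset to
# None and the file is rewritten from scratch instead of appended to.
new_length, new_unused = 60000, 7000
assert new_length <= new_unused * 10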
230 235
231 236 ### Nodemap docket file
232 237 #
233 238 # The nodemap data are stored on disk using 2 files:
234 239 #
235 240 # * a raw data file containing a persistent nodemap
236 241 # (see `Nodemap Trie` section)
237 242 #
238 243 # * a small "docket" file containing metadata
239 244 #
240 245 # While the nodemap data can be multiple tens of megabytes, the "docket" is
241 246 # small, so it is easy to update it atomically or to duplicate its content
242 247 # during a transaction.
243 248 #
244 249 # Multiple raw data files can exist at the same time (the currently valid one
245 250 # and a new one being used by an in-progress transaction). To accommodate
246 251 # this, the filename hosting the raw data has a variable part. The exact
247 252 # filename is specified inside the "docket" file.
248 253 #
249 254 # The docket file contains information to find, qualify and validate the raw
250 255 # data. Its content is currently very light, but it will expand as the on disk
251 256 # nodemap gains the necessary features to be used in production.
252 257
253 258 # version 0 is experimental, no BC guarantee, do not use outside of tests.
254 259 ONDISK_VERSION = 0
255 260 S_VERSION = struct.Struct(">B")
256 261 S_HEADER = struct.Struct(">BQQQQ")
257 262
258 263 ID_SIZE = 8
259 264
260 265
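# A size sketch for the structs above: the version marker is one byte and
# the fixed header is 33 bytes (one uint8 plus four big-endian uint64).
# The uid that follows is ID_SIZE random bytes, hex-encoded by _make_uid
# below into 16 ascii characters.
assert S_VERSION.size == 1
assert S_HEADER.size == 1 + 4 * 8  # 33 bytes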
261 266 def _make_uid():
262 267 """return a new unique identifier.
263 268
264 269 The identifier is random and composed of ascii characters."""
265 270 return nodemod.hex(os.urandom(ID_SIZE))
266 271
267 272
268 273 class NodeMapDocket(object):
269 274 """metadata associated with persistent nodemap data
270 275
271 276 The persistent data may come from disk or be on its way to disk.
272 277 """
273 278
274 279 def __init__(self, uid=None):
275 280 if uid is None:
276 281 uid = _make_uid()
277 282 # a unique identifier for the data file:
278 283 # - When new data are appended, it is preserved.
279 284 # - When a new data file is created, a new identifier is generated.
280 285 self.uid = uid
281 286 # the tipmost revision stored in the data file. This revision and all
282 287 # revisions before it are expected to be encoded in the data file.
283 288 self.tip_rev = None
284 289 # the node of that tipmost revision; if it mismatches the current index
285 290 # data, the docket is not valid for the current index and should be
286 291 # discarded.
287 292 #
288 293 # note: this check is not perfect, as some destructive operations could
289 294 # preserve the same tip_rev + tip_node while altering lower revisions.
290 295 # However, multiple other caches have the same vulnerability (eg: the
291 296 # branchmap cache).
292 297 self.tip_node = None
293 298 # the size (in bytes) of the persisted data to encode the nodemap valid
294 299 # for `tip_rev`.
295 300 # - a data file shorter than this is corrupted,
296 301 # - any extra data should be ignored.
297 302 self.data_length = None
298 303 # the amount (in bytes) of "dead" data, still in the data file but no
299 304 # longer used for the nodemap.
300 305 self.data_unused = 0
301 306
302 307 def copy(self):
303 308 new = NodeMapDocket(uid=self.uid)
304 309 new.tip_rev = self.tip_rev
305 310 new.tip_node = self.tip_node
306 311 new.data_length = self.data_length
307 312 new.data_unused = self.data_unused
308 313 return new
309 314
310 315 def __cmp__(self, other):
311 316 if self.uid < other.uid:
312 317 return -1
313 318 if self.uid > other.uid:
314 319 return 1
315 320 elif self.data_length < other.data_length:
316 321 return -1
317 322 elif self.data_length > other.data_length:
318 323 return 1
319 324 return 0
320 325
321 326 def __eq__(self, other):
322 327 return self.uid == other.uid and self.data_length == other.data_length
323 328
324 329 def serialize(self):
325 330 """return serialized bytes for a docket using the passed uid"""
326 331 data = []
327 332 data.append(S_VERSION.pack(ONDISK_VERSION))
328 333 headers = (
329 334 len(self.uid),
330 335 self.tip_rev,
331 336 self.data_length,
332 337 self.data_unused,
333 338 len(self.tip_node),
334 339 )
335 340 data.append(S_HEADER.pack(*headers))
336 341 data.append(self.uid)
337 342 data.append(self.tip_node)
338 343 return b''.join(data)
339 344
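# A serialization round-trip sketch with made-up docket values, mirroring
# how `persisted_data` above re-reads the same bytes from disk:
docket = NodeMapDocket()
docket.tip_rev = 42
docket.tip_node = b'\x00' * 20
docket.data_length = 4096
raw = docket.serialize()
(version,) = S_VERSION.unpack(raw[: S_VERSION.size])
headers = S_HEADER.unpack(raw[S_VERSION.size : S_VERSION.size + S_HEADER.size])
assert version == ONDISK_VERSION
assert headers == (2 * ID_SIZE, 42, 4096, 0, 20)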
340 345
341 346 def _rawdata_filepath(revlog, docket):
342 347 """The (vfs relative) nodemap's rawdata file for a given uid"""
343 348 if revlog.nodemap_file.endswith(b'.n.a'):
344 349 prefix = revlog.nodemap_file[:-4]
345 350 else:
346 351 prefix = revlog.nodemap_file[:-2]
347 352 return b"%s-%s.nd" % (prefix, docket.uid)
348 353
349 354
350 355 def _other_rawdata_filepath(revlog, docket):
351 356 prefix = revlog.nodemap_file[:-2]
352 357 pattern = re.compile(br"(^|/)%s-[0-9a-f]+\.nd$" % prefix)
353 358 new_file_path = _rawdata_filepath(revlog, docket)
354 359 new_file_name = revlog.opener.basename(new_file_path)
355 360 dirpath = revlog.opener.dirname(new_file_path)
356 361 others = []
357 362 for f in revlog.opener.listdir(dirpath):
358 363 if pattern.match(f) and f != new_file_name:
359 364 others.append(f)
360 365 return others
361 366
362 367
363 368 ### Nodemap Trie
364 369 #
365 370 # This is a simple reference implementation to compute and persist a nodemap
366 371 # trie. This reference implementation is write only. The python version of this
367 372 # is not expected to be actually used, since it won't provide any performance
368 373 # improvement over the existing non-persistent C implementation.
369 374 #
370 375 # The nodemap is persisted as a trie using 4-bit addresses and 16-entry blocks.
371 376 # Each revision can be addressed using the shortest unique prefix of its node.
372 377 #
373 378 # The trie is stored as a sequence of blocks. Each block contains 16 entries
374 379 # (signed 32bit integer, big endian, matching S_BLOCK below). Each entry can be one of the following:
375 380 #
376 381 # * value >= 0 -> index of sub-block
377 382 # * value == -1 -> no value
378 383 # * value < -1 -> a revision value: rev = -(value+2)
379 384 #
380 385 # The implementation focuses on simplicity, not on performance. A Rust
381 386 # implementation should provide an efficient version of the same binary
382 387 # persistence. This reference python implementation is never meant to be
383 388 # extensively used in production.
384 389
385 390
386 391 def persistent_data(index):
387 392 """return the persistent binary form for a nodemap for a given index
388 393 """
389 394 trie = _build_trie(index)
390 395 return _persist_trie(trie)
391 396
392 397
393 398 def update_persistent_data(index, root, max_idx, last_rev):
394 399 """return the incremental update for persistent nodemap from a given index
395 400 """
396 401 changed_block, trie = _update_trie(index, root, last_rev)
397 402 return (
398 403 changed_block * S_BLOCK.size,
399 404 _persist_trie(trie, existing_idx=max_idx),
400 405 )
401 406
402 407
403 408 S_BLOCK = struct.Struct(">" + ("l" * 16))
404 409
405 410 NO_ENTRY = -1
406 411 # rev 0 needs to be -2 because 0 is used for sub-block indexes and -1 is a special value.
407 412 REV_OFFSET = 2
408 413
409 414
410 415 def _transform_rev(rev):
411 416 """Return the number used to represent the rev in the tree.
412 417
413 418 (or retrieve a rev number from such representation)
414 419
415 420 Note that this is an involution, a function equal to its inverse (i.e.
416 421 which gives the identity when applied to itself).
417 422 """
418 423 return -(rev + REV_OFFSET)
419 424
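# The involution in practice: applying the transform twice returns the
# original value, and revision 0 encodes as -2 (0 and -1 being reserved).
assert _transform_rev(0) == -2
assert _transform_rev(_transform_rev(12345)) == 12345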
420 425
421 426 def _to_int(hex_digit):
422 427 """turn an hexadecimal digit into a proper integer"""
423 428 return int(hex_digit, 16)
424 429
425 430
426 431 class Block(dict):
427 432 """represent a block of the Trie
428 433
429 434 contains up to 16 entries indexed from 0 to 15"""
430 435
431 436 def __init__(self):
432 437 super(Block, self).__init__()
433 438 # If this block exists on disk, here is its ID
434 439 self.ondisk_id = None
435 440
436 441 def __iter__(self):
437 442 return iter(self.get(i) for i in range(16))
438 443
439 444
440 445 def _build_trie(index):
441 446 """build a nodemap trie
442 447
443 448 The nodemap stores revision number for each unique prefix.
444 449
445 450 Each block is a dictionary with keys in `[0, 15]`. Values are either
446 451 another block or a revision number.
447 452 """
448 453 root = Block()
449 454 for rev in range(len(index)):
450 455 hex = nodemod.hex(index[rev][7])
451 456 _insert_into_block(index, 0, root, rev, hex)
452 457 return root
453 458
454 459
455 460 def _update_trie(index, root, last_rev):
456 461 """consume"""
457 462 changed = 0
458 463 for rev in range(last_rev + 1, len(index)):
459 464 hex = nodemod.hex(index[rev][7])
460 465 changed += _insert_into_block(index, 0, root, rev, hex)
461 466 return changed, root
462 467
463 468
464 469 def _insert_into_block(index, level, block, current_rev, current_hex):
465 470 """insert a new revision in a block
466 471
467 472 index: the index we are adding revision for
468 473 level: the depth of the current block in the trie
469 474 block: the block currently being considered
470 475 current_rev: the revision number we are adding
471 476 current_hex: the hexadecimal representation of the node of that revision
472 477 """
473 478 changed = 1
474 479 if block.ondisk_id is not None:
475 480 block.ondisk_id = None
476 481 hex_digit = _to_int(current_hex[level : level + 1])
477 482 entry = block.get(hex_digit)
478 483 if entry is None:
479 484 # no entry, simply store the revision number
480 485 block[hex_digit] = current_rev
481 486 elif isinstance(entry, dict):
482 487 # need to recurse to an underlying block
483 488 changed += _insert_into_block(
484 489 index, level + 1, entry, current_rev, current_hex
485 490 )
486 491 else:
487 492 # collision with a previously unique prefix: insert new
488 493 # vertices to fit both entries.
489 494 other_hex = nodemod.hex(index[entry][7])
490 495 other_rev = entry
491 496 new = Block()
492 497 block[hex_digit] = new
493 498 _insert_into_block(index, level + 1, new, other_rev, other_hex)
494 499 _insert_into_block(index, level + 1, new, current_rev, current_hex)
495 500 return changed
496 501
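# A collision sketch with a made-up two-revision index; only index[rev][7]
# (the binary node) matters here, matching how _build_trie reads the real
# revlog index. Both nodes share the first nibble 0xa, so inserting the
# second revision pushes both entries down into a new sub-block.
toy_index = [
    (0, 0, 0, 0, 0, 0, 0, bytes.fromhex('ab' * 20)),
    (0, 0, 0, 0, 0, 0, 0, bytes.fromhex('ac' + '00' * 19)),
]
toy_root = _build_trie(toy_index)
sub = toy_root[0xA]
assert isinstance(sub, Block)
assert sub[0xB] == 0 and sub[0xC] == 1  # revs keyed by their second nibble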
497 502
498 503 def _persist_trie(root, existing_idx=None):
499 504 """turn a nodemap trie into persistent binary data
500 505
501 506 See `_build_trie` for nodemap trie structure"""
502 507 block_map = {}
503 508 if existing_idx is not None:
504 509 base_idx = existing_idx + 1
505 510 else:
506 511 base_idx = 0
507 512 chunks = []
508 513 for tn in _walk_trie(root):
509 514 if tn.ondisk_id is not None:
510 515 block_map[id(tn)] = tn.ondisk_id
511 516 else:
512 517 block_map[id(tn)] = len(chunks) + base_idx
513 518 chunks.append(_persist_block(tn, block_map))
514 519 return b''.join(chunks)
515 520
516 521
517 522 def _walk_trie(block):
518 523 """yield all the block in a trie
519 524
520 525 Children blocks are always yield before their parent block.
521 526 """
522 527 for (__, item) in sorted(block.items()):
523 528 if isinstance(item, dict):
524 529 for sub_block in _walk_trie(item):
525 530 yield sub_block
526 531 yield block
527 532
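# The post-order property in practice, reusing `toy_root` from the sketch
# above: children come out first, so the root is always last and every
# child already has an id in `block_map` when its parent is persisted.
walked = list(_walk_trie(toy_root))
assert walked[0] is toy_root[0xA]
assert walked[-1] is toy_root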
528 533
529 534 def _persist_block(block_node, block_map):
530 535 """produce persistent binary data for a single block
531 536
532 537 Children blocks are assumed to be already persisted and present in
533 538 block_map.
534 539 """
535 540 data = tuple(_to_value(v, block_map) for v in block_node)
536 541 return S_BLOCK.pack(*data)
537 542
538 543
539 544 def _to_value(item, block_map):
540 545 """persist any value as an integer"""
541 546 if item is None:
542 547 return NO_ENTRY
543 548 elif isinstance(item, dict):
544 549 return block_map[id(item)]
545 550 else:
546 551 return _transform_rev(item)
547 552
548 553
549 554 def parse_data(data):
550 555 """parse parse nodemap data into a nodemap Trie"""
551 556 if (len(data) % S_BLOCK.size) != 0:
552 557 msg = "nodemap data size is not a multiple of block size (%d): %d"
553 558 raise error.Abort(msg % (S_BLOCK.size, len(data)))
554 559 if not data:
555 560 return Block(), None
556 561 block_map = {}
557 562 new_blocks = []
558 563 for i in range(0, len(data), S_BLOCK.size):
559 564 block = Block()
560 565 block.ondisk_id = len(block_map)
561 566 block_map[block.ondisk_id] = block
562 567 block_data = data[i : i + S_BLOCK.size]
563 568 values = S_BLOCK.unpack(block_data)
564 569 new_blocks.append((block, values))
565 570 for b, values in new_blocks:
566 571 for idx, v in enumerate(values):
567 572 if v == NO_ENTRY:
568 573 continue
569 574 elif v >= 0:
570 575 b[idx] = block_map[v]
571 576 else:
572 577 b[idx] = _transform_rev(v)
573 578 return block, i // S_BLOCK.size
574 579
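# A persist/parse round-trip sketch, reusing `toy_index` from the collision
# example above: two blocks are written (child first, root last) and
# parse_data rebuilds an equivalent trie.
toy_data = persistent_data(toy_index)
assert len(toy_data) == 2 * S_BLOCK.size
parsed_root, max_idx = parse_data(toy_data)
assert max_idx == 1  # index of the last (root) block
assert parsed_root[0xA][0xB] == 0 and parsed_root[0xA][0xC] == 1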
575 580
576 581 # debug utility
577 582
578 583
579 584 def check_data(ui, index, data):
580 585 """verify that the provided nodemap data are valid for the given idex"""
581 586 ret = 0
582 587 ui.status((b"revision in index: %d\n") % len(index))
583 588 root, __ = parse_data(data)
584 589 all_revs = set(_all_revisions(root))
585 590 ui.status((b"revision in nodemap: %d\n") % len(all_revs))
586 591 for r in range(len(index)):
587 592 if r not in all_revs:
588 593 msg = b" revision missing from nodemap: %d\n" % r
589 594 ui.write_err(msg)
590 595 ret = 1
591 596 else:
592 597 all_revs.remove(r)
593 598 nm_rev = _find_node(root, nodemod.hex(index[r][7]))
594 599 if nm_rev is None:
595 600 msg = b" revision node does not match any entries: %d\n" % r
596 601 ui.write_err(msg)
597 602 ret = 1
598 603 elif nm_rev != r:
599 604 msg = (
600 605 b" revision node does not match the expected revision: "
601 606 b"%d != %d\n" % (r, nm_rev)
602 607 )
603 608 ui.write_err(msg)
604 609 ret = 1
605 610
606 611 if all_revs:
607 612 for r in sorted(all_revs):
608 613 msg = b" extra revision in nodemap: %d\n" % r
609 614 ui.write_err(msg)
610 615 ret = 1
611 616 return ret
612 617
613 618
614 619 def _all_revisions(root):
615 620 """return all revisions stored in a Trie"""
616 621 for block in _walk_trie(root):
617 622 for v in block:
618 623 if v is None or isinstance(v, Block):
619 624 continue
620 625 yield v
621 626
622 627
623 628 def _find_node(block, node):
624 629 """find the revision associated with a given node"""
625 630 entry = block.get(_to_int(node[0:1]))
626 631 if isinstance(entry, dict):
627 632 return _find_node(entry, node[1:])
628 633 return entry
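# Usage sketch, reusing `parsed_root` and `toy_index` from the round-trip
# example above: given a full hex node, the lookup walks one nibble per
# level and returns the revision number (or None for unknown prefixes).
assert _find_node(parsed_root, nodemod.hex(toy_index[0][7])) == 0
assert _find_node(parsed_root, nodemod.hex(toy_index[1][7])) == 1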