##// END OF EJS Templates
nodemap: access the mmap opener option using bytes...
marmoute -
r45187:0792ad55 default
parent child Browse files
Show More
@@ -1,628 +1,628
1 1 # nodemap.py - nodemap related code and utilities
2 2 #
3 3 # Copyright 2019 Pierre-Yves David <pierre-yves.david@octobus.net>
4 4 # Copyright 2019 George Racinet <georges.racinet@octobus.net>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import os
13 13 import re
14 14 import struct
15 15
16 16 from .. import (
17 17 error,
18 18 node as nodemod,
19 19 util,
20 20 )
21 21
22 22
class NodeMap(dict):
    """A node -> revision mapping that errors out on unknown nodes."""

    def __missing__(self, key):
        raise error.RevlogError(b'unknown node: %s' % key)
26 26
27 27
def persisted_data(revlog):
    """read the nodemap for a revlog from disk

    Returns a ``(docket, data)`` pair, or ``None`` when no usable persisted
    nodemap exists: no docket file, unknown on-disk version, missing rawdata
    file, or rawdata shorter than the length recorded in the docket.
    """
    if revlog.nodemap_file is None:
        return None
    pdata = revlog.opener.tryread(revlog.nodemap_file)
    if not pdata:
        return None
    offset = 0
    (version,) = S_VERSION.unpack(pdata[offset : offset + S_VERSION.size])
    if version != ONDISK_VERSION:
        return None
    offset += S_VERSION.size
    headers = S_HEADER.unpack(pdata[offset : offset + S_HEADER.size])
    uid_size, tip_rev, data_length, data_unused, tip_node_size = headers
    offset += S_HEADER.size
    docket = NodeMapDocket(pdata[offset : offset + uid_size])
    offset += uid_size
    docket.tip_rev = tip_rev
    docket.tip_node = pdata[offset : offset + tip_node_size]
    docket.data_length = data_length
    docket.data_unused = data_unused

    filename = _rawdata_filepath(revlog, docket)
    # opener options are keyed by bytes
    use_mmap = revlog.opener.options.get(b"exp-persistent-nodemap.mmap")
    try:
        with revlog.opener(filename) as fd:
            if use_mmap:
                data = util.buffer(util.mmapread(fd, data_length))
            else:
                data = fd.read(data_length)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
        # the rawdata file is gone: the docket cannot be used (previously
        # this fell through and hit a NameError on `data`)
        return None
    if len(data) < data_length:
        return None
    return docket, data
64 64
65 65
def setup_persistent_nodemap(tr, revlog):
    """Install whatever is needed transaction side to persist a nodemap on disk

    (only actually persist the nodemap if this is relevant for this revlog)
    """
    if revlog._inline:
        # inlined revlogs are too small for this to be relevant
        return
    if revlog.nodemap_file is None:
        # persistent nodemap is not in use for this revlog
        return

    # must run after the changelog finalization, which uses "cl-" callbacks
    callback_id = b"nm-revlog-persistent-nodemap-%s" % revlog.nodemap_file
    if tr.hasfinalize(callback_id):
        # already registered for this transaction
        return

    def _write_pending(tr):
        return _persist_nodemap(tr, revlog, pending=True)

    def _write_final(tr):
        return _persist_nodemap(tr, revlog)

    tr.addpending(callback_id, _write_pending)
    tr.addfinalize(callback_id, _write_final)
84 84
85 85
86 86 class _NoTransaction(object):
87 87 """transaction like object to update the nodemap outside a transaction
88 88 """
89 89
90 90 def __init__(self):
91 91 self._postclose = {}
92 92
93 93 def addpostclose(self, callback_id, callback_func):
94 94 self._postclose[callback_id] = callback_func
95 95
96 96 def registertmp(self, *args, **kwargs):
97 97 pass
98 98
99 99 def addbackup(self, *args, **kwargs):
100 100 pass
101 101
102 102 def add(self, *args, **kwargs):
103 103 pass
104 104
105 105 def addabort(self, *args, **kwargs):
106 106 pass
107 107
108 108
def update_persistent_nodemap(revlog):
    """update the persistent nodemap right now

    To be used for updating the nodemap on disk outside of a normal transaction
    setup (eg, `debugupdatecache`).
    """
    fake_tr = _NoTransaction()
    _persist_nodemap(fake_tr, revlog)
    # run the callbacks a real transaction would have fired at close time
    for callback_id in sorted(fake_tr._postclose):
        fake_tr._postclose[callback_id](None)
119 119
120 120
def _persist_nodemap(tr, revlog, pending=False):
    """Write nodemap data on disk for a given revlog

    tr: the transaction (or transaction-like object) driving the write
    pending: when True, the docket is written to a `.a` (pending) file and
    registered as temporary instead of being backed up.
    """
    if getattr(revlog, 'filteredrevs', ()):
        raise error.ProgrammingError(
            "cannot persist nodemap of a filtered changelog"
        )
    if revlog.nodemap_file is None:
        msg = "calling persist nodemap on a revlog without the feature enabled"
        raise error.ProgrammingError(msg)

    can_incremental = util.safehasattr(revlog.index, "nodemap_data_incremental")
    ondisk_docket = revlog._nodemap_docket
    feed_data = util.safehasattr(revlog.index, "update_nodemap_data")
    use_mmap = revlog.opener.options.get(b"exp-persistent-nodemap.mmap")

    data = None
    # first attempt an incremental update of the data
    if can_incremental and ondisk_docket is not None:
        target_docket = revlog._nodemap_docket.copy()
        (
            src_docket,
            data_changed_count,
            data,
        ) = revlog.index.nodemap_data_incremental()
        new_length = target_docket.data_length + len(data)
        new_unused = target_docket.data_unused + data_changed_count
        if src_docket != target_docket:
            # the on-disk data is no longer the one the index grew from
            data = None
        elif new_length <= (new_unused * 10):  # under 10% of unused data
            data = None
        else:
            datafile = _rawdata_filepath(revlog, target_docket)
            # EXP-TODO: if this is a cache, this should use a cache vfs, not a
            # store vfs
            tr.add(datafile, target_docket.data_length)
            with revlog.opener(datafile, b'r+') as fd:
                fd.seek(target_docket.data_length)
                fd.write(data)
                if feed_data:
                    # feed the persisted data back to the index; the mmap
                    # variant needs the bytes flushed to disk first.
                    # (branches were previously inverted: the plain read was
                    # taken when use_mmap was set, and vice versa)
                    if use_mmap:
                        fd.flush()
                        new_data = util.buffer(util.mmapread(fd, new_length))
                    else:
                        fd.seek(0)
                        new_data = fd.read(new_length)
            target_docket.data_length = new_length
            target_docket.data_unused = new_unused

    if data is None:
        # otherwise fallback to a full new export
        target_docket = NodeMapDocket()
        datafile = _rawdata_filepath(revlog, target_docket)
        if util.safehasattr(revlog.index, "nodemap_data_all"):
            data = revlog.index.nodemap_data_all()
        else:
            data = persistent_data(revlog.index)
        # EXP-TODO: if this is a cache, this should use a cache vfs, not a
        # store vfs

        tryunlink = revlog.opener.tryunlink

        def abortck(tr):
            tryunlink(datafile)

        callback_id = b"delete-%s" % datafile

        # some flavor of the transaction abort does not cleanup new file, it
        # simply empty them.
        tr.addabort(callback_id, abortck)
        with revlog.opener(datafile, b'w+') as fd:
            fd.write(data)
            if feed_data:
                # same mmap/plain split as in the incremental branch above
                if use_mmap:
                    fd.flush()
                    new_data = util.buffer(util.mmapread(fd, len(data)))
                else:
                    new_data = data
        target_docket.data_length = len(data)
    target_docket.tip_rev = revlog.tiprev()
    target_docket.tip_node = revlog.node(target_docket.tip_rev)
    # EXP-TODO: if this is a cache, this should use a cache vfs, not a
    # store vfs
    file_path = revlog.nodemap_file
    if pending:
        file_path += b'.a'
        tr.registertmp(file_path)
    else:
        tr.addbackup(file_path)

    with revlog.opener(file_path, b'w', atomictemp=True) as fp:
        fp.write(target_docket.serialize())
    revlog._nodemap_docket = target_docket
    if feed_data:
        revlog.index.update_nodemap_data(target_docket, new_data)

    # search for old index file in all cases, some older process might have
    # left one behind.
    olds = _other_rawdata_filepath(revlog, target_docket)
    if olds:
        realvfs = getattr(revlog, '_realopener', revlog.opener)

        def cleanup(tr):
            for oldfile in olds:
                realvfs.tryunlink(oldfile)

        callback_id = b"revlog-cleanup-nodemap-%s" % revlog.nodemap_file
        tr.addpostclose(callback_id, cleanup)
229 229
230 230
231 231 ### Nodemap docket file
232 232 #
233 233 # The nodemap data are stored on disk using 2 files:
234 234 #
# * a raw data file containing a persistent nodemap
236 236 # (see `Nodemap Trie` section)
237 237 #
# * a small "docket" file containing metadata
239 239 #
# While the nodemap data can be multiple tens of megabytes, the "docket" is
# small, so it is easy to update it automatically or to duplicate its content
# during a transaction.
243 243 #
# Multiple raw data files can exist at the same time (the currently valid one
# and a new one being used by an in-progress transaction). To accommodate
# this, the filename hosting the raw data has a variable part. The exact
# filename is specified inside the "docket" file.
248 248 #
249 249 # The docket file contains information to find, qualify and validate the raw
250 250 # data. Its content is currently very light, but it will expand as the on disk
251 251 # nodemap gains the necessary features to be used in production.
252 252
# version 0 is experimental, no BC guarantee, do not use outside of tests.
ONDISK_VERSION = 0
# docket prefix: a single unsigned byte holding the on-disk format version
S_VERSION = struct.Struct(">B")
# docket header: uid size (B), then tip rev, data length, unused-data byte
# count and tip node size (four unsigned 64bit big-endian integers)
S_HEADER = struct.Struct(">BQQQQ")

# number of random bytes used to build a docket uid (hex-encoded by
# _make_uid, so the resulting identifier is twice this many ascii characters)
ID_SIZE = 8
259 259
260 260
def _make_uid():
    """Return a new unique identifier.

    The identifier is random and composed of ascii (hex) characters."""
    raw = os.urandom(ID_SIZE)
    return nodemod.hex(raw)
266 266
267 267
class NodeMapDocket(object):
    """metadata associated with persistent nodemap data

    The persistent data may come from disk or be on their way to disk.
    """

    def __init__(self, uid=None):
        if uid is None:
            uid = _make_uid()
        # unique identifier of the data file:
        # - preserved when new data are appended,
        # - regenerated when a brand new data file is created.
        self.uid = uid
        # tipmost revision stored in the data file; that revision and every
        # revision before it are expected to be encoded in the file.
        self.tip_rev = None
        # node of that tipmost revision; if it does not match the current
        # index, the docket is not valid for that index and should be
        # discarded.
        #
        # note: this check is not perfect, as some destructive operation
        # could preserve the same tip_rev + tip_node while altering lower
        # revisions. However multiple other caches have the same
        # vulnerability (eg: branchmap cache).
        self.tip_node = None
        # size (in bytes) of the persisted data valid for `tip_rev`:
        # - a shorter data file is corrupted,
        # - any trailing extra data should be ignored.
        self.data_length = None
        # amount (in bytes) of "dead" data, still present in the data file
        # but no longer used by the nodemap.
        self.data_unused = 0

    def copy(self):
        """Return an independent docket carrying the same metadata."""
        other = NodeMapDocket(uid=self.uid)
        other.tip_rev = self.tip_rev
        other.tip_node = self.tip_node
        other.data_length = self.data_length
        other.data_unused = self.data_unused
        return other

    def __cmp__(self, other):
        # py2-style three-way comparison on (uid, data_length)
        if self.uid < other.uid:
            return -1
        if self.uid > other.uid:
            return 1
        if self.data_length < other.data_length:
            return -1
        if self.data_length > other.data_length:
            return 1
        return 0

    def __eq__(self, other):
        return self.uid == other.uid and self.data_length == other.data_length

    def serialize(self):
        """return serialized bytes for a docket using the passed uid"""
        header = S_HEADER.pack(
            len(self.uid),
            self.tip_rev,
            self.data_length,
            self.data_unused,
            len(self.tip_node),
        )
        parts = [
            S_VERSION.pack(ONDISK_VERSION),
            header,
            self.uid,
            self.tip_node,
        ]
        return b''.join(parts)
339 339
340 340
341 341 def _rawdata_filepath(revlog, docket):
342 342 """The (vfs relative) nodemap's rawdata file for a given uid"""
343 343 if revlog.nodemap_file.endswith(b'.n.a'):
344 344 prefix = revlog.nodemap_file[:-4]
345 345 else:
346 346 prefix = revlog.nodemap_file[:-2]
347 347 return b"%s-%s.nd" % (prefix, docket.uid)
348 348
349 349
def _other_rawdata_filepath(revlog, docket):
    """return rawdata file paths that do not belong to `docket`

    Those are leftovers from older transactions or processes; they can be
    removed once the current file is in place.
    """
    prefix = revlog.nodemap_file[:-2]
    pattern = re.compile(br"(^|/)%s-[0-9a-f]+\.nd$" % prefix)
    new_file_path = _rawdata_filepath(revlog, docket)
    new_file_name = revlog.opener.basename(new_file_path)
    dirpath = revlog.opener.dirname(new_file_path)
    return [
        name
        for name in revlog.opener.listdir(dirpath)
        if pattern.match(name) and name != new_file_name
    ]
361 361
362 362
363 363 ### Nodemap Trie
364 364 #
# This is a simple reference implementation to compute and persist a nodemap
# trie. This reference implementation is write only. The python version of
# this is not expected to be actually used, since it won't provide performance
# improvement over the existing non-persistent C implementation.
369 369 #
# The nodemap is persisted as a Trie using 4bits-address/16-entries blocks.
# Each revision can be addressed using its node's shortest prefix.
372 372 #
# The trie is stored as a sequence of blocks. Each block contains 16 entries
# (signed 32bit integer, big endian, see S_BLOCK). Each entry can be one of
# the following:
375 375 #
# * value >= 0 -> index of sub-block
# * value == -1 -> no value
# * value < -1 -> encoded revision: rev = -(value+2)
379 379 #
# The implementation focuses on simplicity, not on performance. A Rust
# implementation should provide an efficient version of the same binary
# persistence. This reference python implementation is never meant to be
# used extensively in production.
384 384
385 385
def persistent_data(index):
    """return the persistent binary form for a nodemap for a given index"""
    return _persist_trie(_build_trie(index))
391 391
392 392
def update_persistent_data(index, root, max_idx, last_rev):
    """return the incremental update for persistent nodemap from a given index

    Returns a (changed-byte-count, new-data) pair; the new data must be
    appended after the first `changed-byte-count` bytes are rewritten.
    """
    changed_block, trie = _update_trie(index, root, last_rev)
    new_data = _persist_trie(trie, existing_idx=max_idx)
    return (changed_block * S_BLOCK.size, new_data)
401 401
402 402
# a trie block: 16 signed 32bit big-endian integers
S_BLOCK = struct.Struct(">" + ("l" * 16))

# marker for an empty slot in a block
NO_ENTRY = -1
# rev 0 needs to be stored as -2, because 0 is used for block indexes and -1
# is the NO_ENTRY special value.
REV_OFFSET = 2
408 408
409 409
def _transform_rev(rev):
    """Return the number used to represent the rev in the tree.

    (or retrieve a rev number from such representation)

    Note that this is an involution, a function equal to its inverse (i.e.
    which gives the identity when applied to itself).
    """
    return -rev - REV_OFFSET
419 419
420 420
421 421 def _to_int(hex_digit):
422 422 """turn an hexadecimal digit into a proper integer"""
423 423 return int(hex_digit, 16)
424 424
425 425
class Block(dict):
    """represent a block of the Trie

    contains up to 16 entries indexed from 0 to 15"""

    def __init__(self):
        super(Block, self).__init__()
        # identifier of this block on disk, or None if it is in-memory only
        self.ondisk_id = None

    def __iter__(self):
        # always yield 16 slots, filling absent entries with None
        return (self.get(slot) for slot in range(16))
438 438
439 439
def _build_trie(index):
    """build a nodemap trie

    The nodemap stores revision number for each unique prefix.

    Each block is a dictionary with keys in `[0, 15]`. Values are either
    another block or a revision number.
    """
    root = Block()
    for rev in range(len(index)):
        node_hex = nodemod.hex(index[rev][7])
        _insert_into_block(index, 0, root, rev, node_hex)
    return root
453 453
454 454
def _update_trie(index, root, last_rev):
    """add all revisions after `last_rev` to an existing trie

    Returns a (changed-block-count, root) pair.
    """
    changed = 0
    for rev in range(last_rev + 1, len(index)):
        node_hex = nodemod.hex(index[rev][7])
        changed += _insert_into_block(index, 0, root, rev, node_hex)
    return changed, root
462 462
463 463
def _insert_into_block(index, level, block, current_rev, current_hex):
    """insert a new revision in a block

    index: the index we are adding revision for
    level: the depth of the current block in the trie
    block: the block currently being considered
    current_rev: the revision number we are adding
    current_hex: the hexadecimal representation of that revision's node

    Returns the number of blocks touched by the insertion.
    """
    changed = 1
    if block.ondisk_id is not None:
        # the in-memory content now differs from the on-disk copy
        block.ondisk_id = None
    slot = _to_int(current_hex[level : level + 1])
    existing = block.get(slot)
    if existing is None:
        # free slot, simply store the revision number
        block[slot] = current_rev
    elif isinstance(existing, dict):
        # slot holds a sub-block, recurse into it
        changed += _insert_into_block(
            index, level + 1, existing, current_rev, current_hex
        )
    else:
        # collision with a previously unique prefix: insert a new
        # intermediate block and push both revisions one level down
        colliding_rev = existing
        colliding_hex = nodemod.hex(index[colliding_rev][7])
        sub_block = Block()
        block[slot] = sub_block
        _insert_into_block(
            index, level + 1, sub_block, colliding_rev, colliding_hex
        )
        _insert_into_block(
            index, level + 1, sub_block, current_rev, current_hex
        )
    return changed
496 496
497 497
def _persist_trie(root, existing_idx=None):
    """turn a nodemap trie into persistent binary data

    See `_build_trie` for nodemap trie structure"""
    block_map = {}
    base_idx = 0 if existing_idx is None else existing_idx + 1
    chunks = []
    # children are walked before their parent, so every reference a block
    # needs is already registered in block_map when it gets persisted
    for trie_node in _walk_trie(root):
        if trie_node.ondisk_id is not None:
            # already on disk, just record its existing index
            block_map[id(trie_node)] = trie_node.ondisk_id
        else:
            block_map[id(trie_node)] = len(chunks) + base_idx
            chunks.append(_persist_block(trie_node, block_map))
    return b''.join(chunks)
515 515
516 516
517 517 def _walk_trie(block):
518 518 """yield all the block in a trie
519 519
520 520 Children blocks are always yield before their parent block.
521 521 """
522 522 for (__, item) in sorted(block.items()):
523 523 if isinstance(item, dict):
524 524 for sub_block in _walk_trie(item):
525 525 yield sub_block
526 526 yield block
527 527
528 528
def _persist_block(block_node, block_map):
    """produce persistent binary data for a single block

    Children block are assumed to be already persisted and present in
    block_map.
    """
    entries = [_to_value(item, block_map) for item in block_node]
    return S_BLOCK.pack(*entries)
537 537
538 538
def _to_value(item, block_map):
    """persist any value as an integer"""
    if item is None:
        return NO_ENTRY
    if isinstance(item, dict):
        # sub-block: store its on-disk index
        return block_map[id(item)]
    # plain revision number, stored in encoded form
    return _transform_rev(item)
547 547
548 548
def parse_data(data):
    """parse nodemap data into a nodemap Trie

    Returns a (root-block, last-block-index) pair.
    """
    if len(data) % S_BLOCK.size != 0:
        msg = "nodemap data size is not a multiple of block size (%d): %d"
        raise error.Abort(msg % (S_BLOCK.size, len(data)))
    if not data:
        return Block(), None
    # first pass: materialize every block and keep its raw values around
    block_map = {}
    raw_blocks = []
    for ondisk_id, offset in enumerate(range(0, len(data), S_BLOCK.size)):
        block = Block()
        block.ondisk_id = ondisk_id
        block_map[ondisk_id] = block
        values = S_BLOCK.unpack(data[offset : offset + S_BLOCK.size])
        raw_blocks.append((block, values))
    # second pass: resolve the entries now that every block exists
    for block, values in raw_blocks:
        for idx, value in enumerate(values):
            if value == NO_ENTRY:
                continue
            elif value >= 0:
                # reference to another block
                block[idx] = block_map[value]
            else:
                # encoded revision number
                block[idx] = _transform_rev(value)
    # blocks are persisted children first, so the last one is the root
    return raw_blocks[-1][0], len(raw_blocks) - 1
574 574
575 575
576 576 # debug utility
577 577
578 578
def check_data(ui, index, data):
    """verify that the provided nodemap data are valid for the given index

    Reports every mismatch on `ui`; returns 0 on success, 1 otherwise.
    """
    ret = 0
    ui.status((b"revision in index: %d\n") % len(index))
    root, __ = parse_data(data)
    all_revs = set(_all_revisions(root))
    ui.status((b"revision in nodemap: %d\n") % len(all_revs))
    for r in range(len(index)):
        if r not in all_revs:
            msg = b" revision missing from nodemap: %d\n" % r
            ui.write_err(msg)
            ret = 1
            continue
        all_revs.remove(r)
        nm_rev = _find_node(root, nodemod.hex(index[r][7]))
        if nm_rev is None:
            msg = b" revision node does not match any entries: %d\n" % r
            ui.write_err(msg)
            ret = 1
        elif nm_rev != r:
            msg = (
                b" revision node does not match the expected revision: "
                b"%d != %d\n" % (r, nm_rev)
            )
            ui.write_err(msg)
            ret = 1

    if all_revs:
        # anything left in the set exists in the nodemap but not the index
        for r in sorted(all_revs):
            msg = b" extra revision in nodemap: %d\n" % r
            ui.write_err(msg)
            ret = 1
    return ret
612 612
613 613
def _all_revisions(root):
    """yield every revision number stored in a Trie"""
    for block in _walk_trie(root):
        for entry in block:
            # skip empty slots and sub-block references
            if entry is not None and not isinstance(entry, Block):
                yield entry
621 621
622 622
def _find_node(block, node):
    """find the revision associated with a given node

    Returns None when the prefix walk lands on an empty slot.
    """
    current = block
    remaining = node
    while True:
        entry = current.get(_to_int(remaining[0:1]))
        if not isinstance(entry, dict):
            # either a revision number or None (no entry)
            return entry
        current = entry
        remaining = remaining[1:]
General Comments 0
You need to be logged in to leave comments. Login now