nodemap: document the docket attributes...
marmoute
r44983:283fd803 default

# nodemap.py - nodemap related code and utilities
#
# Copyright 2019 Pierre-Yves David <pierre-yves.david@octobus.net>
# Copyright 2019 George Racinet <georges.racinet@octobus.net>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import errno
import os
import re
import struct

from .. import (
    error,
    node as nodemod,
    util,
)


class NodeMap(dict):
    def __missing__(self, x):
        raise error.RevlogError(b'unknown node: %s' % x)


def persisted_data(revlog):
    """read the nodemap for a revlog from disk"""
    if revlog.nodemap_file is None:
        return None
    pdata = revlog.opener.tryread(revlog.nodemap_file)
    if not pdata:
        return None
    offset = 0
    (version,) = S_VERSION.unpack(pdata[offset : offset + S_VERSION.size])
    if version != ONDISK_VERSION:
        return None
    offset += S_VERSION.size
    headers = S_HEADER.unpack(pdata[offset : offset + S_HEADER.size])
    uid_size, tip_rev, data_length, data_unused = headers
    offset += S_HEADER.size
    docket = NodeMapDocket(pdata[offset : offset + uid_size])
    docket.tip_rev = tip_rev
    docket.data_length = data_length
    docket.data_unused = data_unused

    filename = _rawdata_filepath(revlog, docket)
    use_mmap = revlog.opener.options.get("exp-persistent-nodemap.mmap")
    try:
        with revlog.opener(filename) as fd:
            if use_mmap:
                data = util.buffer(util.mmapread(fd, data_length))
            else:
                data = fd.read(data_length)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
        # without the data file, the docket is of no use
        return None
    if len(data) < data_length:
        return None
    return docket, data
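
# Typical use (an illustrative sketch; the actual wiring lives in the revlog
# loading code, not in this module): read the persisted data when the revlog
# is opened and, when available, feed it to the native index:
#
#     nm_data = persisted_data(revlog)
#     if nm_data is not None:
#         docket, data = nm_data
#         revlog.index.update_nodemap_data(docket, data)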


def setup_persistent_nodemap(tr, revlog):
    """Install whatever is needed transaction side to persist a nodemap on disk

    (only actually persist the nodemap if this is relevant for this revlog)
    """
    if revlog._inline:
        return  # inline revlogs are too small for this to be relevant
    if revlog.nodemap_file is None:
        return  # we do not use persistent_nodemap on this revlog
    callback_id = b"revlog-persistent-nodemap-%s" % revlog.nodemap_file
    if tr.hasfinalize(callback_id):
        return  # no need to register again
    tr.addfinalize(
        callback_id, lambda tr: _persist_nodemap(tr.addpostclose, revlog)
    )


def update_persistent_nodemap(revlog):
    """update the persistent nodemap right now

    To be used for updating the nodemap on disk outside of a normal transaction
    setup (e.g., `debugupdatecache`).
    """
    cleanups = []
    _persist_nodemap((lambda x, y: cleanups.append(y)), revlog)
    for c in cleanups:
        c(None)


def _persist_nodemap(cleaner, revlog):
    """Write nodemap data on disk for a given revlog"""
    if getattr(revlog, 'filteredrevs', ()):
        raise error.ProgrammingError(
            "cannot persist nodemap of a filtered changelog"
        )
    if revlog.nodemap_file is None:
        msg = "calling persist nodemap on a revlog without the feature enabled"
        raise error.ProgrammingError(msg)

    can_incremental = util.safehasattr(revlog.index, "nodemap_data_incremental")
    ondisk_docket = revlog._nodemap_docket
    feed_data = util.safehasattr(revlog.index, "update_nodemap_data")
    use_mmap = revlog.opener.options.get("exp-persistent-nodemap.mmap")

    data = None
    # first attempt an incremental update of the data
    if can_incremental and ondisk_docket is not None:
        target_docket = revlog._nodemap_docket.copy()
        (
            src_docket,
            data_changed_count,
            data,
        ) = revlog.index.nodemap_data_incremental()
        if src_docket != target_docket:
            data = None
        else:
            datafile = _rawdata_filepath(revlog, target_docket)
            # EXP-TODO: if this is a cache, this should use a cache vfs, not a
            # store vfs
            new_length = target_docket.data_length + len(data)
            with revlog.opener(datafile, b'r+') as fd:
                fd.seek(target_docket.data_length)
                fd.write(data)
                if feed_data:
                    if use_mmap:
                        fd.flush()
                        new_data = util.buffer(util.mmapread(fd, new_length))
                    else:
                        fd.seek(0)
                        new_data = fd.read(new_length)
            target_docket.data_length = new_length
            target_docket.data_unused += data_changed_count

    if data is None:
        # otherwise fall back to a full new export
        target_docket = NodeMapDocket()
        datafile = _rawdata_filepath(revlog, target_docket)
        if util.safehasattr(revlog.index, "nodemap_data_all"):
            data = revlog.index.nodemap_data_all()
        else:
            data = persistent_data(revlog.index)
        # EXP-TODO: if this is a cache, this should use a cache vfs, not a
        # store vfs
        with revlog.opener(datafile, b'w+') as fd:
            fd.write(data)
            if feed_data:
                if use_mmap:
                    fd.flush()
                    new_data = util.buffer(util.mmapread(fd, len(data)))
                else:
                    new_data = data
        target_docket.data_length = len(data)
    target_docket.tip_rev = revlog.tiprev()
    # EXP-TODO: if this is a cache, this should use a cache vfs, not a
    # store vfs
    with revlog.opener(revlog.nodemap_file, b'w', atomictemp=True) as fp:
        fp.write(target_docket.serialize())
    revlog._nodemap_docket = target_docket
    if feed_data:
        revlog.index.update_nodemap_data(target_docket, new_data)

    # EXP-TODO: if the transaction aborts, we should remove the new data and
    # reinstall the old one.

    # search for old data files in all cases; some older process might have
    # left one behind.
    olds = _other_rawdata_filepath(revlog, target_docket)
    if olds:
        realvfs = getattr(revlog, '_realopener', revlog.opener)

        def cleanup(tr):
            for oldfile in olds:
                realvfs.tryunlink(oldfile)

        callback_id = b"revlog-cleanup-nodemap-%s" % revlog.nodemap_file
        cleaner(callback_id, cleanup)


### Nodemap docket file
#
# The nodemap data are stored on disk using 2 files:
#
# * a raw data file containing a persistent nodemap
#   (see `Nodemap Trie` section)
#
# * a small "docket" file containing metadata
#
# While the nodemap data can be multiple tens of megabytes, the "docket" is
# small enough that it is easy to update it atomically or to duplicate its
# content during a transaction.
#
# Multiple raw data files can exist at the same time (the currently valid one
# and a new one being used by an in-progress transaction). To accommodate
# this, the filename hosting the raw data has a variable part. The exact
# filename is specified inside the "docket" file.
#
# The docket file contains information to find, qualify and validate the raw
# data. Its content is currently very light, but it will expand as the on-disk
# nodemap gains the necessary features to be used in production.

# version 0 is experimental, no backward-compatibility guarantee; do not use
# outside of tests.
ONDISK_VERSION = 0
S_VERSION = struct.Struct(">B")
S_HEADER = struct.Struct(">BQQQ")

ID_SIZE = 8
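
# The resulting on-disk layout of a version 0 docket, as implied by the
# structs above (an informal summary, not an authoritative specification):
#
#   offset  size  field
#        0     1  on-disk version (S_VERSION)
#        1     1  uid length (start of S_HEADER)
#        2     8  tip rev
#       10     8  data length
#       18     8  data unused
#       26     N  uid (ascii, N == uid length; 16 for uids from `_make_uid`)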


def _make_uid():
    """return a new unique identifier.

    The identifier is random and composed of ascii characters."""
    return nodemod.hex(os.urandom(ID_SIZE))


class NodeMapDocket(object):
    """metadata associated with persistent nodemap data

    The persistent data may come from disk or be on its way to disk.
    """

    def __init__(self, uid=None):
        if uid is None:
            uid = _make_uid()
        # a unique identifier for the data file:
        # - when new data are appended, the identifier is preserved,
        # - when a new data file is created, a new identifier is generated.
        self.uid = uid
        # the tipmost revision stored in the data file. This revision and all
        # revisions before it are expected to be encoded in the data file.
        self.tip_rev = None
        # the size (in bytes) of the persisted data needed to encode the
        # nodemap valid for `tip_rev`:
        # - a data file shorter than this is corrupted,
        # - any extra data should be ignored.
        self.data_length = None
        # the amount (in bytes) of "dead" data, still in the data file but no
        # longer used by the nodemap.
        self.data_unused = 0

    def copy(self):
        new = NodeMapDocket(uid=self.uid)
        new.tip_rev = self.tip_rev
        new.data_length = self.data_length
        new.data_unused = self.data_unused
        return new

    def __cmp__(self, other):
        if self.uid < other.uid:
            return -1
        if self.uid > other.uid:
            return 1
        elif self.data_length < other.data_length:
            return -1
        elif self.data_length > other.data_length:
            return 1
        return 0

    def __eq__(self, other):
        return self.uid == other.uid and self.data_length == other.data_length

    def serialize(self):
        """return the serialized bytes for this docket"""
        data = []
        data.append(S_VERSION.pack(ONDISK_VERSION))
        headers = (
            len(self.uid),
            self.tip_rev,
            self.data_length,
            self.data_unused,
        )
        data.append(S_HEADER.pack(*headers))
        data.append(self.uid)
        return b''.join(data)
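
# For example (an illustrative check, not part of the module): with a freshly
# generated uid (16 hex characters, see `_make_uid`), a serialized docket is
# 1 + 25 + 16 == 42 bytes:
#
#     docket = NodeMapDocket()
#     docket.tip_rev = 0
#     docket.data_length = 64
#     docket.data_unused = 0
#     assert len(docket.serialize()) == 42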


def _rawdata_filepath(revlog, docket):
    """The (vfs relative) nodemap's rawdata file for a given uid"""
    prefix = revlog.nodemap_file[:-2]
    return b"%s-%s.nd" % (prefix, docket.uid)


def _other_rawdata_filepath(revlog, docket):
    """return existing rawdata file paths that do not match the given docket"""
    prefix = revlog.nodemap_file[:-2]
    pattern = re.compile(br"(^|/)%s-[0-9a-f]+\.nd$" % prefix)
    new_file_path = _rawdata_filepath(revlog, docket)
    new_file_name = revlog.opener.basename(new_file_path)
    dirpath = revlog.opener.dirname(new_file_path)
    others = []
    for f in revlog.opener.listdir(dirpath):
        if pattern.match(f) and f != new_file_name:
            others.append(f)
    return others


### Nodemap Trie
#
# This is a simple reference implementation to compute and persist a nodemap
# trie. This reference implementation is write only. The python version of
# this is not expected to be actually used, since it won't provide a
# performance improvement over the existing non-persistent C implementation.
#
# The nodemap is persisted as a Trie using 4-bit addresses and 16-entry
# blocks. Each revision can be addressed using its node's shortest unique
# prefix.
#
# The trie is stored as a sequence of blocks. Each block contains 16 entries
# (signed 32-bit integers, big endian; see S_BLOCK below). Each entry can be
# one of the following:
#
# * value >= 0 -> index of a sub-block
# * value == -1 -> no value
# * value < -1 -> a revision value: rev = -(value+2)
#
# The implementation focuses on simplicity, not on performance. A Rust
# implementation should provide an efficient version of the same binary
# persistence. This reference python implementation is never meant to be
# used extensively in production.
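#
# For instance (an informal illustration): an empty index persists as a single
# all-empty root block, and a repository whose node hashes all differ in their
# first hex digit needs only that root block, with each entry encoding a
# revision directly.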


def persistent_data(index):
    """return the persistent binary form for a nodemap for a given index"""
    trie = _build_trie(index)
    return _persist_trie(trie)


def update_persistent_data(index, root, max_idx, last_rev):
    """return the incremental update for persistent nodemap from a given index"""
    changed_block, trie = _update_trie(index, root, last_rev)
    return (
        changed_block * S_BLOCK.size,
        _persist_trie(trie, existing_idx=max_idx),
    )


S_BLOCK = struct.Struct(">" + ("l" * 16))
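# each entry is a standard-size ">l" (4 bytes), so a block is 64 bytes on disk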

NO_ENTRY = -1
# rev 0 needs to be -2 because 0 is used for sub-block indexes and -1 is a
# special value.
REV_OFFSET = 2
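
# Example encodings: rev 0 <-> -2, rev 1 <-> -3, rev 41 <-> -43. Applying
# `_transform_rev` (below) twice returns the original value.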


def _transform_rev(rev):
    """Return the number used to represent the rev in the tree.

    (or retrieve a rev number from such representation)

    Note that this is an involution, a function equal to its inverse (i.e.
    which gives the identity when applied to itself).
    """
    return -(rev + REV_OFFSET)


def _to_int(hex_digit):
    """turn a hexadecimal digit into a proper integer"""
    return int(hex_digit, 16)


class Block(dict):
    """represent a block of the Trie

    contains up to 16 entries indexed from 0 to 15"""

    def __init__(self):
        super(Block, self).__init__()
        # If this block exists on disk, here is its ID
        self.ondisk_id = None

    def __iter__(self):
        return iter(self.get(i) for i in range(16))


def _build_trie(index):
    """build a nodemap trie

    The nodemap stores revision numbers for each unique prefix.

    Each block is a dictionary with keys in `[0, 15]`. Values are either
    another block or a revision number.
    """
    root = Block()
    for rev in range(len(index)):
        hex = nodemod.hex(index[rev][7])
        _insert_into_block(index, 0, root, rev, hex)
    return root
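
# For instance (an informal illustration with made-up prefixes): inserting two
# nodes whose hex forms start with "1234" and "129a" yields root[1] -> Block,
# that block's entry 2 -> Block, and the deeper block maps 3 and 9 to the two
# revision numbers.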


def _update_trie(index, root, last_rev):
    """consume a trie, adding the revisions after `last_rev` from the index"""
    changed = 0
    for rev in range(last_rev + 1, len(index)):
        hex = nodemod.hex(index[rev][7])
        changed += _insert_into_block(index, 0, root, rev, hex)
    return changed, root


def _insert_into_block(index, level, block, current_rev, current_hex):
    """insert a new revision in a block

    index: the index we are adding revision for
    level: the depth of the current block in the trie
    block: the block currently being considered
    current_rev: the revision number we are adding
    current_hex: the hexadecimal representation of the node of that revision
    """
    changed = 1
    if block.ondisk_id is not None:
        block.ondisk_id = None
    hex_digit = _to_int(current_hex[level : level + 1])
    entry = block.get(hex_digit)
    if entry is None:
        # no entry, simply store the revision number
        block[hex_digit] = current_rev
    elif isinstance(entry, dict):
        # need to recurse to an underlying block
        changed += _insert_into_block(
            index, level + 1, entry, current_rev, current_hex
        )
    else:
        # collision with a previously unique prefix, inserting new
        # vertices to fit both entries.
        other_hex = nodemod.hex(index[entry][7])
        other_rev = entry
        new = Block()
        block[hex_digit] = new
        _insert_into_block(index, level + 1, new, other_rev, other_hex)
        _insert_into_block(index, level + 1, new, current_rev, current_hex)
    return changed


def _persist_trie(root, existing_idx=None):
    """turn a nodemap trie into persistent binary data

    See `_build_trie` for nodemap trie structure"""
    block_map = {}
    if existing_idx is not None:
        base_idx = existing_idx + 1
    else:
        base_idx = 0
    chunks = []
    for tn in _walk_trie(root):
        if tn.ondisk_id is not None:
            block_map[id(tn)] = tn.ondisk_id
        else:
            block_map[id(tn)] = len(chunks) + base_idx
            chunks.append(_persist_block(tn, block_map))
    return b''.join(chunks)


def _walk_trie(block):
    """yield all the blocks in a trie

    Children blocks are always yielded before their parent block.
    """
    for (_, item) in sorted(block.items()):
        if isinstance(item, dict):
            for sub_block in _walk_trie(item):
                yield sub_block
    yield block


def _persist_block(block_node, block_map):
    """produce persistent binary data for a single block

    Children blocks are assumed to be already persisted and present in
    block_map.
    """
    data = tuple(_to_value(v, block_map) for v in block_node)
    return S_BLOCK.pack(*data)


def _to_value(item, block_map):
    """persist any value as an integer"""
    if item is None:
        return NO_ENTRY
    elif isinstance(item, dict):
        return block_map[id(item)]
    else:
        return _transform_rev(item)


def parse_data(data):
    """parse nodemap data into a nodemap Trie"""
    if (len(data) % S_BLOCK.size) != 0:
        msg = "nodemap data size is not a multiple of block size (%d): %d"
        raise error.Abort(msg % (S_BLOCK.size, len(data)))
    if not data:
        return Block(), None
    block_map = {}
    new_blocks = []
    for i in range(0, len(data), S_BLOCK.size):
        block = Block()
        block.ondisk_id = len(block_map)
        block_map[block.ondisk_id] = block
        block_data = data[i : i + S_BLOCK.size]
        values = S_BLOCK.unpack(block_data)
        new_blocks.append((block, values))
    for b, values in new_blocks:
        for idx, v in enumerate(values):
            if v == NO_ENTRY:
                continue
            elif v >= 0:
                b[idx] = block_map[v]
            else:
                b[idx] = _transform_rev(v)
    # children are written before their parents (see `_walk_trie`), so the
    # last block of the data is the root block
    return block, i // S_BLOCK.size
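

# An illustrative round trip (a hypothetical helper, not part of Mercurial):
# persisting a trie and parsing it back should preserve every revision.
def _example_roundtrip(index):
    data = persistent_data(index)
    root, _max_idx = parse_data(data)
    return sorted(_all_revisions(root)) == list(range(len(index)))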


# debug utility


def check_data(ui, index, data):
    """verify that the provided nodemap data are valid for the given index"""
    ret = 0
    ui.status((b"revisions in index:   %d\n") % len(index))
    root, __ = parse_data(data)
    all_revs = set(_all_revisions(root))
    ui.status((b"revisions in nodemap: %d\n") % len(all_revs))
    for r in range(len(index)):
        if r not in all_revs:
            msg = b"  revision missing from nodemap: %d\n" % r
            ui.write_err(msg)
            ret = 1
        else:
            all_revs.remove(r)
            nm_rev = _find_node(root, nodemod.hex(index[r][7]))
            if nm_rev is None:
                msg = b"  revision node does not match any entries: %d\n" % r
                ui.write_err(msg)
                ret = 1
            elif nm_rev != r:
                msg = (
                    b"  revision node does not match the expected revision: "
                    b"%d != %d\n" % (r, nm_rev)
                )
                ui.write_err(msg)
                ret = 1

    if all_revs:
        for r in sorted(all_revs):
            msg = b"  extra revision in nodemap: %d\n" % r
            ui.write_err(msg)
            ret = 1
    return ret
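
# (`check_data` is typically reached through the `hg debugnodemap --check`
# debug command)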


def _all_revisions(root):
    """return all revisions stored in a Trie"""
    for block in _walk_trie(root):
        for v in block:
            if v is None or isinstance(v, Block):
                continue
            yield v


def _find_node(block, node):
    """find the revision associated with a given node"""
    entry = block.get(_to_int(node[0:1]))
    if isinstance(entry, dict):
        return _find_node(entry, node[1:])
    return entry
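
# For example (illustrative only): given the trie built by `_build_trie`, the
# revision of a known node can be recovered from its full hex form:
#
#     rev = _find_node(root, nodemod.hex(node))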