##// END OF EJS Templates
nodemap: update the index with the newly written data (when appropriate)...
marmoute -
r44812:6ecc34b3 default
parent child Browse files
Show More
@@ -1,513 +1,519 b''
1 1 # nodemap.py - nodemap related code and utilities
2 2 #
3 3 # Copyright 2019 Pierre-Yves David <pierre-yves.david@octobus.net>
4 4 # Copyright 2019 George Racinet <georges.racinet@octobus.net>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import os
12 12 import re
13 13 import struct
14 14
15 15 from .. import (
16 16 error,
17 17 node as nodemod,
18 18 util,
19 19 )
20 20
21 21
class NodeMap(dict):
    """A dict subclass mapping nodes to revisions.

    Lookup of an unknown node raises a RevlogError rather than a bare
    KeyError, matching what revlog callers expect.
    """

    def __missing__(self, key):
        # Triggered by `self[key]` when the node is absent.
        raise error.RevlogError(b'unknown node: %s' % key)
25 25
26 26
def persisted_data(revlog):
    """read the nodemap for a revlog from disk

    Returns a (docket, data) pair, or None when no usable persisted
    nodemap exists (feature disabled, missing file, version mismatch or
    truncated raw data).
    """
    if revlog.nodemap_file is None:
        return None
    pdata = revlog.opener.tryread(revlog.nodemap_file)
    if not pdata:
        return None
    (version,) = S_VERSION.unpack(pdata[0 : S_VERSION.size])
    if version != ONDISK_VERSION:
        # unknown on-disk format: behave as if nothing was persisted
        return None
    header_start = S_VERSION.size
    header_end = header_start + S_HEADER.size
    uid_size, tip_rev, data_length, data_unused = S_HEADER.unpack(
        pdata[header_start:header_end]
    )
    docket = NodeMapDocket(pdata[header_end : header_end + uid_size])
    docket.tip_rev = tip_rev
    docket.data_length = data_length
    docket.data_unused = data_unused

    data = revlog.opener.tryread(_rawdata_filepath(revlog, docket))
    if len(data) < data_length:
        # raw data is shorter than the docket advertises: unusable
        return None
    elif len(data) > data_length:
        # extra bytes belong to an in-progress transaction, ignore them
        data = data[:data_length]
    return docket, data
54 54
55 55
def setup_persistent_nodemap(tr, revlog):
    """Install whatever is needed transaction side to persist a nodemap on disk

    (only actually persist the nodemap if this is relevant for this revlog)
    """
    if revlog._inline:
        # inlined revlogs are too small for persisting a nodemap to pay off
        return
    if revlog.nodemap_file is None:
        # the persistent nodemap feature is not enabled for this revlog
        return
    callback_id = b"revlog-persistent-nodemap-%s" % revlog.nodemap_file
    if not tr.hasfinalize(callback_id):
        # register once per revlog and transaction
        tr.addfinalize(callback_id, lambda tr: _persist_nodemap(tr, revlog))
69 69
70 70
def _persist_nodemap(tr, revlog):
    """Write nodemap data on disk for a given revlog

    Tries an incremental append to the existing raw-data file first; falls
    back to writing a full new raw-data file. In both cases the docket file
    is rewritten and, when supported, the in-memory index is updated with
    the exact bytes now on disk.
    """
    if getattr(revlog, 'filteredrevs', ()):
        raise error.ProgrammingError(
            "cannot persist nodemap of a filtered changelog"
        )
    if revlog.nodemap_file is None:
        msg = "calling persist nodemap on a revlog without the feature enabled"
        raise error.ProgrammingError(msg)

    can_incremental = util.safehasattr(revlog.index, "nodemap_data_incremental")
    ondisk_docket = revlog._nodemap_docket

    data = None
    # first attempt an incremental update of the data
    if can_incremental and ondisk_docket is not None:
        target_docket = revlog._nodemap_docket.copy()
        (
            src_docket,
            data_changed_count,
            data,
        ) = revlog.index.nodemap_data_incremental()
        if src_docket != target_docket:
            # the on-disk data moved under us; fall back to a full rewrite
            data = None
        else:
            datafile = _rawdata_filepath(revlog, target_docket)
            # EXP-TODO: if this is a cache, this should use a cache vfs, not a
            # store vfs
            with revlog.opener(datafile, b'r+') as fd:
                fd.seek(target_docket.data_length)
                fd.write(data)
                # re-read the complete raw data so the in-memory index can be
                # updated with the exact bytes now on disk
                fd.seek(0)
                new_data = fd.read(target_docket.data_length + len(data))
            target_docket.data_length += len(data)
            target_docket.data_unused += data_changed_count

    if data is None:
        # otherwise fallback to a full new export
        target_docket = NodeMapDocket()
        datafile = _rawdata_filepath(revlog, target_docket)
        if util.safehasattr(revlog.index, "nodemap_data_all"):
            data = revlog.index.nodemap_data_all()
        else:
            data = persistent_data(revlog.index)
        # EXP-TODO: if this is a cache, this should use a cache vfs, not a
        # store vfs
        new_data = data
        with revlog.opener(datafile, b'w') as fd:
            fd.write(data)
        target_docket.data_length = len(data)
    target_docket.tip_rev = revlog.tiprev()
    # EXP-TODO: if this is a cache, this should use a cache vfs, not a
    # store vfs
    with revlog.opener(revlog.nodemap_file, b'w', atomictemp=True) as fp:
        fp.write(target_docket.serialize())
    revlog._nodemap_docket = target_docket
    if util.safehasattr(revlog.index, "update_nodemap_data"):
        revlog.index.update_nodemap_data(target_docket, new_data)

    # EXP-TODO: if the transaction abort, we should remove the new data and
    # reinstall the old one.

    # search for old index file in all cases, some older process might have
    # left one behind.
    olds = _other_rawdata_filepath(revlog, target_docket)
    if olds:
        realvfs = getattr(revlog, '_realopener', revlog.opener)

        def cleanup(tr):
            # drop every stale raw-data file once the transaction closes
            for oldfile in olds:
                realvfs.tryunlink(oldfile)

        callback_id = b"revlog-cleanup-nodemap-%s" % revlog.nodemap_file
        tr.addpostclose(callback_id, cleanup)
140 146
141 147
142 148 ### Nodemap docket file
143 149 #
144 150 # The nodemap data are stored on disk using 2 files:
145 151 #
146 152 # * a raw data files containing a persistent nodemap
147 153 # (see `Nodemap Trie` section)
148 154 #
# * a small "docket" file containing metadata
#
# While the nodemap data can be multiple tens of megabytes, the "docket" is
# small, it is easy to update it automatically or to duplicate its content
# during a transaction.
#
# Multiple raw data can exist at the same time (The currently valid one and a
# new one being used by an in-progress transaction). To accommodate this, the
# filename hosting the raw data has a variable part. The exact filename is
# specified inside the "docket" file.
159 165 #
160 166 # The docket file contains information to find, qualify and validate the raw
161 167 # data. Its content is currently very light, but it will expand as the on disk
162 168 # nodemap gains the necessary features to be used in production.
163 169
# version 0 is experimental, no BC guarantee, do not use outside of tests.
ONDISK_VERSION = 0
# single byte: on-disk format version, at the very start of the docket file
S_VERSION = struct.Struct(">B")
# docket header: uid length, tip revision, data length, unused byte count
S_HEADER = struct.Struct(">BQQQ")

# number of random bytes drawn to build a docket uid (hex-encoded afterwards)
ID_SIZE = 8
170 176
171 177
def _make_uid():
    """return a new unique identifier.

    The identifier is random and composed of ascii characters."""
    raw = os.urandom(ID_SIZE)
    return nodemod.hex(raw)
177 183
178 184
class NodeMapDocket(object):
    """metadata associated with persistent nodemap data

    The persistent data may come from disk or be on their way to disk.
    """

    def __init__(self, uid=None):
        # draw a fresh random identifier unless one is supplied
        self.uid = _make_uid() if uid is None else uid
        self.tip_rev = None
        self.data_length = None
        self.data_unused = 0

    def copy(self):
        """return an independent docket carrying the same metadata"""
        other = NodeMapDocket(uid=self.uid)
        other.tip_rev = self.tip_rev
        other.data_length = self.data_length
        other.data_unused = self.data_unused
        return other

    def __cmp__(self, other):
        # Python 2 ordering: uid first, then data_length
        if self.uid < other.uid:
            return -1
        if self.uid > other.uid:
            return 1
        elif self.data_length < other.data_length:
            return -1
        elif self.data_length > other.data_length:
            return 1
        return 0

    def __eq__(self, other):
        return self.uid == other.uid and self.data_length == other.data_length

    def serialize(self):
        """return serialized bytes for a docket using the passed uid"""
        headers = (
            len(self.uid),
            self.tip_rev,
            self.data_length,
            self.data_unused,
        )
        parts = [
            S_VERSION.pack(ONDISK_VERSION),
            S_HEADER.pack(*headers),
            self.uid,
        ]
        return b''.join(parts)
227 233
228 234
229 235 def _rawdata_filepath(revlog, docket):
230 236 """The (vfs relative) nodemap's rawdata file for a given uid"""
231 237 prefix = revlog.nodemap_file[:-2]
232 238 return b"%s-%s.nd" % (prefix, docket.uid)
233 239
234 240
def _other_rawdata_filepath(revlog, docket):
    """list rawdata files that do not belong to the given docket

    Older processes may have left stale raw-data files behind; return their
    names so a transaction callback can delete them.
    """
    prefix = revlog.nodemap_file[:-2]
    pattern = re.compile(b"(^|/)%s-[0-9a-f]+\.nd$" % prefix)
    new_file_path = _rawdata_filepath(revlog, docket)
    new_file_name = revlog.opener.basename(new_file_path)
    dirpath = revlog.opener.dirname(new_file_path)
    return [
        f
        for f in revlog.opener.listdir(dirpath)
        if f != new_file_name and pattern.match(f)
    ]
246 252
247 253
248 254 ### Nodemap Trie
249 255 #
250 256 # This is a simple reference implementation to compute and persist a nodemap
251 257 # trie. This reference implementation is write only. The python version of this
252 258 # is not expected to be actually used, since it wont provide performance
253 259 # improvement over existing non-persistent C implementation.
254 260 #
# The nodemap is persisted as a Trie using 4bits-address/16-entries blocks. Each
# revision can be addressed using its node's shortest prefix.
#
# The trie is stored as a sequence of blocks. Each block contains 16 entries
# (signed 64bit integer, big endian). Each entry can be one of the following:
#
# * value >= 0 -> index of sub-block
# * value == -1 -> no value
# * value < -1 -> a revision value: rev = -(value+2)
#
# The implementation focuses on simplicity, not on performance. A Rust
# implementation should provide an efficient version of the same binary
# persistence. This reference python implementation is never meant to be
# extensively used in production.
269 275
270 276
def persistent_data(index):
    """return the persistent binary form for a nodemap for a given index
    """
    # build the full trie in memory, then serialize it from scratch
    return _persist_trie(_build_trie(index))
276 282
277 283
def update_persistent_data(index, root, max_idx, last_rev):
    """return the incremental update for persistent nodemap from a given index
    """
    changed_block, trie = _update_trie(index, root, last_rev)
    data = _persist_trie(trie, existing_idx=max_idx)
    # first element: number of existing on-disk bytes now superseded
    return (changed_block * S_BLOCK.size, data)
286 292
287 293
288 294 S_BLOCK = struct.Struct(">" + ("l" * 16))
289 295
290 296 NO_ENTRY = -1
291 297 # rev 0 need to be -2 because 0 is used by block, -1 is a special value.
292 298 REV_OFFSET = 2
293 299
294 300
295 301 def _transform_rev(rev):
296 302 """Return the number used to represent the rev in the tree.
297 303
298 304 (or retrieve a rev number from such representation)
299 305
300 306 Note that this is an involution, a function equal to its inverse (i.e.
301 307 which gives the identity when applied to itself).
302 308 """
303 309 return -(rev + REV_OFFSET)
304 310
305 311
306 312 def _to_int(hex_digit):
307 313 """turn an hexadecimal digit into a proper integer"""
308 314 return int(hex_digit, 16)
309 315
310 316
class Block(dict):
    """represent a block of the Trie

    contains up to 16 entry indexed from 0 to 15"""

    def __init__(self):
        super(Block, self).__init__()
        # identifier of this block on disk, None while only in memory
        self.ondisk_id = None

    def __iter__(self):
        # always yield exactly 16 slots, padding absent entries with None
        return iter(self.get(idx) for idx in range(16))
323 329
324 330
def _build_trie(index):
    """build a nodemap trie

    The nodemap stores revision number for each unique prefix.

    Each block is a dictionary with keys in `[0, 15]`. Values are either
    another block or a revision number.
    """
    root = Block()
    for rev in range(len(index)):
        # index entry slot 7 holds the node; insert its hex form at depth 0
        node_hex = nodemod.hex(index[rev][7])
        _insert_into_block(index, 0, root, rev, node_hex)
    return root
338 344
339 345
def _update_trie(index, root, last_rev):
    """add revisions newer than `last_rev` to an existing trie

    Returns (number of blocks touched, root).
    """
    changed = 0
    for rev in range(last_rev + 1, len(index)):
        node_hex = nodemod.hex(index[rev][7])
        changed += _insert_into_block(index, 0, root, rev, node_hex)
    return changed, root
347 353
348 354
def _insert_into_block(index, level, block, current_rev, current_hex):
    """insert a new revision in a block

    index: the index we are adding revision for
    level: the depth of the current block in the trie
    block: the block currently being considered
    current_rev: the revision number we are adding
    current_hex: the hexadecimal representation of that revision

    Returns the number of blocks modified by the insertion.
    """
    changed = 1
    if block.ondisk_id is not None:
        # the block is about to mutate: it no longer matches its on-disk copy
        block.ondisk_id = None
    hex_digit = _to_int(current_hex[level : level + 1])
    entry = block.get(hex_digit)
    if entry is None:
        # empty slot: store the revision number directly
        block[hex_digit] = current_rev
    elif isinstance(entry, dict):
        # descend into the existing sub-block
        changed += _insert_into_block(
            index, level + 1, entry, current_rev, current_hex
        )
    else:
        # the slot already holds a revision whose prefix collides with ours:
        # create a sub-block and push both revisions one level down
        other_rev = entry
        other_hex = nodemod.hex(index[other_rev][7])
        sub = Block()
        block[hex_digit] = sub
        _insert_into_block(index, level + 1, sub, other_rev, other_hex)
        _insert_into_block(index, level + 1, sub, current_rev, current_hex)
    return changed
381 387
382 388
def _persist_trie(root, existing_idx=None):
    """turn a nodemap trie into persistent binary data

    See `_build_trie` for nodemap trie structure"""
    block_map = {}
    # freshly-serialized blocks are numbered after any pre-existing ones
    base_idx = 0 if existing_idx is None else existing_idx + 1
    chunks = []
    for tn in _walk_trie(root):
        if tn.ondisk_id is not None:
            # unchanged block: point at its existing on-disk location
            block_map[id(tn)] = tn.ondisk_id
        else:
            block_map[id(tn)] = len(chunks) + base_idx
            chunks.append(_persist_block(tn, block_map))
    return b''.join(chunks)
400 406
401 407
402 408 def _walk_trie(block):
403 409 """yield all the block in a trie
404 410
405 411 Children blocks are always yield before their parent block.
406 412 """
407 413 for (_, item) in sorted(block.items()):
408 414 if isinstance(item, dict):
409 415 for sub_block in _walk_trie(item):
410 416 yield sub_block
411 417 yield block
412 418
413 419
def _persist_block(block_node, block_map):
    """produce persistent binary data for a single block

    Children block are assumed to be already persisted and present in
    block_map.
    """
    values = [_to_value(entry, block_map) for entry in block_node]
    return S_BLOCK.pack(*values)
422 428
423 429
def _to_value(item, block_map):
    """persist any value as an integer"""
    if item is None:
        # empty slot marker
        return NO_ENTRY
    if isinstance(item, dict):
        # sub-block: store its on-disk block index
        return block_map[id(item)]
    # plain revision number, stored in its transformed (negative) form
    return _transform_rev(item)
432 438
433 439
def parse_data(data):
    """parse nodemap data into a nodemap Trie

    Returns (root block, index of the last on-disk block), or
    (empty Block, None) for empty input.
    """
    if (len(data) % S_BLOCK.size) != 0:
        msg = "nodemap data size is not a multiple of block size (%d): %d"
        raise error.Abort(msg % (S_BLOCK.size, len(data)))
    if not data:
        return Block(), None
    block_map = {}
    raw_blocks = []
    # first pass: materialize one Block per on-disk chunk so that forward
    # references can be resolved afterwards
    for offset in range(0, len(data), S_BLOCK.size):
        block = Block()
        block.ondisk_id = len(block_map)
        block_map[block.ondisk_id] = block
        values = S_BLOCK.unpack(data[offset : offset + S_BLOCK.size])
        raw_blocks.append((block, values))
    # second pass: fill in the entries now that every block exists
    for block, values in raw_blocks:
        for idx, v in enumerate(values):
            if v == NO_ENTRY:
                continue
            elif v >= 0:
                # non-negative values point at another block
                block[idx] = block_map[v]
            else:
                # values below -1 encode a revision number
                block[idx] = _transform_rev(v)
    # blocks are serialized children first, so the last one is the root
    last_idx = len(block_map) - 1
    return block_map[last_idx], last_idx
459 465
460 466
461 467 # debug utility
462 468
463 469
def check_data(ui, index, data):
    """verify that the provided nodemap data are valid for the given index

    Returns 0 when the nodemap matches the index, 1 otherwise; details of
    every mismatch are written to the ui error channel.
    """
    ret = 0
    ui.status((b"revision in index:   %d\n") % len(index))
    root, __ = parse_data(data)
    all_revs = set(_all_revisions(root))
    ui.status((b"revision in nodemap: %d\n") % len(all_revs))
    for r in range(len(index)):
        if r not in all_revs:
            ui.write_err(b"  revision missing from nodemap: %d\n" % r)
            ret = 1
            continue
        all_revs.remove(r)
        nm_rev = _find_node(root, nodemod.hex(index[r][7]))
        if nm_rev is None:
            ui.write_err(
                b"  revision node does not match any entries: %d\n" % r
            )
            ret = 1
        elif nm_rev != r:
            msg = (
                b"  revision node does not match the expected revision: "
                b"%d != %d\n" % (r, nm_rev)
            )
            ui.write_err(msg)
            ret = 1

    if all_revs:
        # revisions present in the nodemap but absent from the index
        for r in sorted(all_revs):
            ui.write_err(b"  extra revision in nodemap: %d\n" % r)
            ret = 1
    return ret
497 503
498 504
def _all_revisions(root):
    """return all revisions stored in a Trie"""
    for block in _walk_trie(root):
        for entry in block:
            # skip empty slots and sub-blocks, keep plain revision numbers
            if entry is not None and not isinstance(entry, Block):
                yield entry
506 512
507 513
def _find_node(block, node):
    """find the revision associated with a given node"""
    # walk the trie iteratively, consuming one hex digit per level
    entry = block.get(_to_int(node[0:1]))
    while isinstance(entry, dict):
        node = node[1:]
        entry = entry.get(_to_int(node[0:1]))
    return entry
General Comments 0
You need to be logged in to leave comments. Login now