##// END OF EJS Templates
nodemap: write new data from the expected current data length...
marmoute -
r44810:2ea6a67f default
parent child Browse files
Show More
@@ -1,507 +1,508
1 1 # nodemap.py - nodemap related code and utilities
2 2 #
3 3 # Copyright 2019 Pierre-Yves David <pierre-yves.david@octobus.net>
4 4 # Copyright 2019 George Racinet <georges.racinet@octobus.net>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import os
12 12 import re
13 13 import struct
14 14
15 15 from .. import (
16 16 error,
17 17 node as nodemod,
18 18 util,
19 19 )
20 20
21 21
class NodeMap(dict):
    """mapping of node id to revision number

    Unknown nodes raise a RevlogError rather than a plain KeyError."""

    def __missing__(self, x):
        raise error.RevlogError(b'unknown node: %s' % x)
25 25
26 26
def persisted_data(revlog):
    """read the nodemap for a revlog from disk"""
    if revlog.nodemap_file is None:
        return None
    pdata = revlog.opener.tryread(revlog.nodemap_file)
    if not pdata:
        return None
    cur = 0
    (version,) = S_VERSION.unpack(pdata[cur : cur + S_VERSION.size])
    if version != ONDISK_VERSION:
        # unknown on-disk format, pretend there is no persisted nodemap
        return None
    cur += S_VERSION.size
    uid_size, tip_rev, data_length, data_unused = S_HEADER.unpack(
        pdata[cur : cur + S_HEADER.size]
    )
    cur += S_HEADER.size
    docket = NodeMapDocket(pdata[cur : cur + uid_size])
    docket.tip_rev = tip_rev
    docket.data_length = data_length
    docket.data_unused = data_unused

    # the raw data lives in a separate file whose name depends on the uid
    rawdata = revlog.opener.tryread(_rawdata_filepath(revlog, docket))
    return docket, rawdata
49 49
50 50
def setup_persistent_nodemap(tr, revlog):
    """Install whatever is needed transaction side to persist a nodemap on disk

    (only actually persist the nodemap if this is relevant for this revlog)
    """
    # inlined revlogs are too small for a persistent nodemap to pay off
    if revlog._inline:
        return
    # this revlog does not use a persistent nodemap at all
    if revlog.nodemap_file is None:
        return
    callback_id = b"revlog-persistent-nodemap-%s" % revlog.nodemap_file
    # only register the finalize callback once per revlog and transaction
    if not tr.hasfinalize(callback_id):
        tr.addfinalize(callback_id, lambda tr: _persist_nodemap(tr, revlog))
64 64
65 65
def _persist_nodemap(tr, revlog):
    """Write nodemap data on disk for a given revlog

    Tries an incremental append to the existing raw-data file first; falls
    back to writing a brand new raw-data file (with a fresh uid) when that is
    not possible. In both cases the docket file is rewritten atomically and
    leftover raw-data files from older transactions are scheduled for removal.
    """
    if getattr(revlog, 'filteredrevs', ()):
        raise error.ProgrammingError(
            "cannot persist nodemap of a filtered changelog"
        )
    if revlog.nodemap_file is None:
        # typo fix: was "enableb"
        msg = "calling persist nodemap on a revlog without the feature enabled"
        raise error.ProgrammingError(msg)

    can_incremental = util.safehasattr(revlog.index, "nodemap_data_incremental")
    ondisk_docket = revlog._nodemap_docket

    data = None
    # first attempt an incremental update of the data
    if can_incremental and ondisk_docket is not None:
        target_docket = revlog._nodemap_docket.copy()
        (
            src_docket,
            data_changed_count,
            data,
        ) = revlog.index.nodemap_data_incremental()
        if src_docket != target_docket:
            # the on-disk data moved under us; a full rewrite is needed
            data = None
        else:
            datafile = _rawdata_filepath(revlog, target_docket)
            # EXP-TODO: if this is a cache, this should use a cache vfs, not a
            # store vfs
            # write from the expected current data length rather than plain
            # appending, so a stale file cannot leave a gap or an overlap
            with revlog.opener(datafile, b'r+') as fd:
                fd.seek(target_docket.data_length)
                fd.write(data)
            target_docket.data_length += len(data)
            target_docket.data_unused += data_changed_count

    if data is None:
        # otherwise fallback to a full new export
        target_docket = NodeMapDocket()
        datafile = _rawdata_filepath(revlog, target_docket)
        if util.safehasattr(revlog.index, "nodemap_data_all"):
            data = revlog.index.nodemap_data_all()
        else:
            data = persistent_data(revlog.index)
        # EXP-TODO: if this is a cache, this should use a cache vfs, not a
        # store vfs
        with revlog.opener(datafile, b'w') as fd:
            fd.write(data)
        target_docket.data_length = len(data)
    target_docket.tip_rev = revlog.tiprev()
    # EXP-TODO: if this is a cache, this should use a cache vfs, not a
    # store vfs
    with revlog.opener(revlog.nodemap_file, b'w', atomictemp=True) as fp:
        fp.write(target_docket.serialize())
    revlog._nodemap_docket = target_docket
    # EXP-TODO: if the transaction abort, we should remove the new data and
    # reinstall the old one.

    # search for old index file in all cases, some older process might have
    # left one behind.
    olds = _other_rawdata_filepath(revlog, target_docket)
    if olds:
        realvfs = getattr(revlog, '_realopener', revlog.opener)

        def cleanup(tr):
            for oldfile in olds:
                realvfs.tryunlink(oldfile)

        callback_id = b"revlog-cleanup-nodemap-%s" % revlog.nodemap_file
        tr.addpostclose(callback_id, cleanup)
134 135
135 136
136 137 ### Nodemap docket file
137 138 #
138 139 # The nodemap data are stored on disk using 2 files:
139 140 #
140 141 # * a raw data files containing a persistent nodemap
141 142 # (see `Nodemap Trie` section)
142 143 #
# * a small "docket" file containing metadata
144 145 #
# While the nodemap data can be multiple tens of megabytes, the "docket" is
# small, it is easy to update it automatically or to duplicate its content
# during a transaction.
148 149 #
# Multiple raw data can exist at the same time (The currently valid one and a
# new one being used by an in-progress transaction). To accommodate this, the
# filename hosting the raw data has a variable part. The exact filename is
# specified inside the "docket" file.
153 154 #
154 155 # The docket file contains information to find, qualify and validate the raw
155 156 # data. Its content is currently very light, but it will expand as the on disk
156 157 # nodemap gains the necessary features to be used in production.
157 158
# version 0 is experimental, no BC guarantee, do not use outside of tests.
ONDISK_VERSION = 0
# docket prefix: a single unsigned version byte
S_VERSION = struct.Struct(">B")
# docket header fields: uid length (B), tip revision (Q), raw data length (Q),
# number of unused raw data bytes (Q)
S_HEADER = struct.Struct(">BQQQ")

# number of random bytes drawn for a docket uid (hex-encoded afterwards, so
# the resulting identifier is twice this many ascii characters)
ID_SIZE = 8
164 165
165 166
def _make_uid():
    """return a new unique identifier.

    The identifier is random and composed of ascii characters."""
    # hex-encode the raw entropy so the uid is safe inside a filename
    entropy = os.urandom(ID_SIZE)
    return nodemod.hex(entropy)
171 172
172 173
class NodeMapDocket(object):
    """metadata associated with persistent nodemap data

    The persistent data may come from disk or be on their way to disk.
    """

    def __init__(self, uid=None):
        if uid is None:
            uid = _make_uid()
        # identifier tying this docket to its raw-data file
        self.uid = uid
        # highest revision covered by the persisted data
        self.tip_rev = None
        # number of valid bytes in the raw-data file
        self.data_length = None
        # number of bytes in the raw-data file no longer referenced
        self.data_unused = 0

    def copy(self):
        clone = NodeMapDocket(uid=self.uid)
        clone.tip_rev = self.tip_rev
        clone.data_length = self.data_length
        clone.data_unused = self.data_unused
        return clone

    def __cmp__(self, other):
        # python2-style ordering: uid first, then data_length
        if self.uid < other.uid:
            return -1
        if self.uid > other.uid:
            return 1
        if self.data_length < other.data_length:
            return -1
        if self.data_length > other.data_length:
            return 1
        return 0

    def __eq__(self, other):
        return self.uid == other.uid and self.data_length == other.data_length

    def serialize(self):
        """return serialized bytes for a docket using the passed uid"""
        header = S_HEADER.pack(
            len(self.uid),
            self.tip_rev,
            self.data_length,
            self.data_unused,
        )
        return S_VERSION.pack(ONDISK_VERSION) + header + self.uid
221 222
222 223
def _rawdata_filepath(revlog, docket):
    """The (vfs relative) nodemap's rawdata file for a given uid"""
    # swap the docket's trailing ".n" for "-<uid>.nd"
    base = revlog.nodemap_file[:-2]
    return b"%s-%s.nd" % (base, docket.uid)
227 228
228 229
def _other_rawdata_filepath(revlog, docket):
    """list existing rawdata files that do not belong to `docket`

    Those are leftovers from older transactions or processes and are safe to
    remove once the new docket is committed.
    """
    prefix = revlog.nodemap_file[:-2]
    # raw bytes literal: the original non-raw b"...\.nd$" relied on an
    # invalid escape sequence; also escape the prefix so regex
    # metacharacters in the store path cannot corrupt the pattern
    pattern = re.compile(br"(^|/)%s-[0-9a-f]+\.nd$" % re.escape(prefix))
    new_file_path = _rawdata_filepath(revlog, docket)
    new_file_name = revlog.opener.basename(new_file_path)
    dirpath = revlog.opener.dirname(new_file_path)
    others = []
    for f in revlog.opener.listdir(dirpath):
        # keep every matching rawdata file except the currently valid one
        if pattern.match(f) and f != new_file_name:
            others.append(f)
    return others
240 241
241 242
242 243 ### Nodemap Trie
243 244 #
# This is a simple reference implementation to compute and persist a nodemap
# trie. This reference implementation is write only. The python version of this
# is not expected to be actually used, since it won't provide performance
# improvement over the existing non-persistent C implementation.
248 249 #
# The nodemap is persisted as a Trie using 4bits-address/16-entries blocks. Each
# revision can be addressed using its node's shortest prefix.
#
# The trie is stored as a sequence of blocks. Each block contains 16 entries
# (signed 64bit integer, big endian). Each entry can be one of the following:
#
# * value >= 0 -> index of sub-block
# * value == -1 -> no value
# * value < -1 -> a revision value: rev = -(value+2)
258 259 #
# The implementation focuses on simplicity, not on performance. A Rust
# implementation should provide an efficient version of the same binary
# persistence. This reference python implementation is never meant to be
# extensively used in production.
263 264
264 265
def persistent_data(index):
    """return the persistent binary form for a nodemap for a given index
    """
    # build the full in-memory trie, then flatten it to bytes
    return _persist_trie(_build_trie(index))
270 271
271 272
def update_persistent_data(index, root, max_idx, last_rev):
    """return the incremental update for persistent nodemap from a given index
    """
    changed_block, trie = _update_trie(index, root, last_rev)
    # new blocks are numbered after the ones already on disk
    data = _persist_trie(trie, existing_idx=max_idx)
    return (changed_block * S_BLOCK.size, data)
280 281
281 282
# one trie block: 16 signed 64bit big-endian integers
S_BLOCK = struct.Struct(">" + ("l" * 16))

# sentinel for an empty slot in a block
NO_ENTRY = -1
# rev 0 needs to be -2 because 0 is used by block, -1 is a special value.
REV_OFFSET = 2


def _transform_rev(rev):
    """Return the number used to represent the rev in the tree.

    (or retrieve a rev number from such representation)

    Note that this is an involution, a function equal to its inverse (i.e.
    which gives the identity when applied to itself).
    """
    return -(rev + REV_OFFSET)
298 299
299 300
def _to_int(hex_digit):
    """turn an hexadecimal digit into a proper integer"""
    # hex_digit is a one-character slice of a hex-encoded node
    return int(hex_digit, 16)
303 304
304 305
class Block(dict):
    """represent a block of the Trie

    contains up to 16 entry indexed from 0 to 15"""

    def __init__(self):
        super(Block, self).__init__()
        # on-disk index of this block, or None while it only exists in memory
        self.ondisk_id = None

    def __iter__(self):
        # always yield exactly 16 slots; absent entries show up as None
        return iter(self.get(slot) for slot in range(16))
317 318
318 319
def _build_trie(index):
    """build a nodemap trie

    The nodemap stores revision number for each unique prefix.

    Each block is a dictionary with keys in `[0, 15]`. Values are either
    another block or a revision number.
    """
    root = Block()
    for rev in range(len(index)):
        # index entry slot 7 holds the node of that revision
        hex_node = nodemod.hex(index[rev][7])
        _insert_into_block(index, 0, root, rev, hex_node)
    return root
332 333
333 334
def _update_trie(index, root, last_rev):
    """consume revisions of `index` newer than `last_rev` into the trie

    `root` is mutated in place. Returns a (changed, root) tuple where
    `changed` accumulates the number of blocks touched by the insertions.
    """
    changed = 0
    for rev in range(last_rev + 1, len(index)):
        # index entry slot 7 holds the node of that revision
        hex_node = nodemod.hex(index[rev][7])
        changed += _insert_into_block(index, 0, root, rev, hex_node)
    return changed, root
341 342
342 343
def _insert_into_block(index, level, block, current_rev, current_hex):
    """insert a new revision in a block

    Returns the number of blocks touched along the insertion path.

    index: the index we are adding revision for
    level: the depth of the current block in the trie
    block: the block currently being considered
    current_rev: the revision number we are adding
    current_hex: the hexadecimal representation of the node of that revision
    """
    # this block is modified either way, count it
    changed = 1
    if block.ondisk_id is not None:
        # the block no longer matches its persisted copy
        block.ondisk_id = None
    hex_digit = _to_int(current_hex[level : level + 1])
    entry = block.get(hex_digit)
    if entry is None:
        # no entry, simply store the revision number
        block[hex_digit] = current_rev
    elif isinstance(entry, dict):
        # need to recurse to an underlying block
        changed += _insert_into_block(
            index, level + 1, entry, current_rev, current_hex
        )
    else:
        # collision with a previously unique prefix, inserting new
        # vertices to fit both entry.
        other_hex = nodemod.hex(index[entry][7])
        other_rev = entry
        new = Block()
        block[hex_digit] = new
        _insert_into_block(index, level + 1, new, other_rev, other_hex)
        _insert_into_block(index, level + 1, new, current_rev, current_hex)
    return changed
375 376
376 377
def _persist_trie(root, existing_idx=None):
    """turn a nodemap trie into persistent binary data

    See `_build_trie` for nodemap trie structure"""
    block_map = {}
    # new blocks are numbered right after the last existing on-disk one
    base_idx = 0 if existing_idx is None else existing_idx + 1
    chunks = []
    for tn in _walk_trie(root):
        if tn.ondisk_id is not None:
            # unchanged block, already on disk: just record its index
            block_map[id(tn)] = tn.ondisk_id
        else:
            block_map[id(tn)] = len(chunks) + base_idx
            chunks.append(_persist_block(tn, block_map))
    return b''.join(chunks)
394 395
395 396
def _walk_trie(block):
    """yield all the block in a trie

    Children blocks are always yield before their parent block.
    """
    # post-order traversal, in sorted key order for determinism
    for _key, item in sorted(block.items()):
        if isinstance(item, dict):
            for child in _walk_trie(item):
                yield child
    yield block
406 407
407 408
def _persist_block(block_node, block_map):
    """produce persistent binary data for a single block

    Children block are assumed to be already persisted and present in
    block_map.
    """
    values = [_to_value(entry, block_map) for entry in block_node]
    return S_BLOCK.pack(*values)
416 417
417 418
def _to_value(item, block_map):
    """persist any value as an integer"""
    if item is None:
        # empty slot
        return NO_ENTRY
    if isinstance(item, dict):
        # reference to a child block, by its on-disk index
        return block_map[id(item)]
    # plain revision number, stored in its encoded form
    return _transform_rev(item)
426 427
427 428
def parse_data(data):
    """parse nodemap data into a nodemap Trie

    Return a (root, last_index) tuple where `root` is the root block of the
    reconstructed trie and `last_index` is the on-disk index of the last
    block (None when `data` is empty).
    """
    if (len(data) % S_BLOCK.size) != 0:
        msg = "nodemap data size is not a multiple of block size (%d): %d"
        raise error.Abort(msg % (S_BLOCK.size, len(data)))
    if not data:
        return Block(), None
    block_map = {}
    new_blocks = []
    # first pass: materialize every block and remember its raw values
    for i in range(0, len(data), S_BLOCK.size):
        block = Block()
        block.ondisk_id = len(block_map)
        block_map[block.ondisk_id] = block
        block_data = data[i : i + S_BLOCK.size]
        values = S_BLOCK.unpack(block_data)
        new_blocks.append((block, values))
    # second pass: resolve entries now that every block exists
    for b, values in new_blocks:
        for idx, v in enumerate(values):
            if v == NO_ENTRY:
                continue
            elif v >= 0:
                # non-negative values point to a child block
                b[idx] = block_map[v]
            else:
                # values < -1 encode a revision number
                b[idx] = _transform_rev(v)
    # blocks are persisted children-first, so the last block is the root
    return block, i // S_BLOCK.size
453 454
454 455
455 456 # debug utility
456 457
457 458
def check_data(ui, index, data):
    """verify that the provided nodemap data are valid for the given index

    Returns 0 when data and index agree, 1 otherwise; every mismatch found
    is reported on the ui error channel.
    """
    ret = 0
    ui.status((b"revision in index: %d\n") % len(index))
    root, __ = parse_data(data)
    all_revs = set(_all_revisions(root))
    ui.status((b"revision in nodemap: %d\n") % len(all_revs))
    for r in range(len(index)):
        if r not in all_revs:
            msg = b" revision missing from nodemap: %d\n" % r
            ui.write_err(msg)
            ret = 1
        else:
            all_revs.remove(r)
            # look the revision's own node up and make sure it maps back
            nm_rev = _find_node(root, nodemod.hex(index[r][7]))
            if nm_rev is None:
                msg = b" revision node does not match any entries: %d\n" % r
                ui.write_err(msg)
                ret = 1
            elif nm_rev != r:
                msg = (
                    b" revision node does not match the expected revision: "
                    b"%d != %d\n" % (r, nm_rev)
                )
                ui.write_err(msg)
                ret = 1

    if all_revs:
        # whatever is left in the set exists in the nodemap but not the index
        for r in sorted(all_revs):
            msg = b" extra revision in nodemap: %d\n" % r
            ui.write_err(msg)
            ret = 1
    return ret
491 492
492 493
def _all_revisions(root):
    """return all revisions stored in a Trie"""
    for block in _walk_trie(root):
        for entry in block:
            # skip empty slots and child-block references
            if entry is not None and not isinstance(entry, Block):
                yield entry
500 501
501 502
def _find_node(block, node):
    """find the revision associated with a given node"""
    entry = block.get(_to_int(node[0:1]))
    if isinstance(entry, dict):
        # descend one trie level, consuming one hex digit of the node
        entry = _find_node(entry, node[1:])
    return entry
General Comments 0
You need to be logged in to leave comments. Login now