sqlitestore: add an `ancestors` method...
marmoute
r50684:df750b81 stable
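
This changeset gives the SQLite file store an `ancestors` method mirroring the one exposed by revlog-backed storage, built on `ancestor.lazyancestors` over `parentrevs`. As a rough usage sketch only (the `filestore` and `node` names are illustrative, not part of the patch), a caller could walk a file's history like this:

    # Assumed: `filestore` is a sqlitefilestore, `node` an existing file node.
    start = filestore.rev(node)
    for rev in filestore.ancestors([start]):
        ancestor_node = filestore.node(rev)  # ancestors in reverse revision order
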
@@ -1,1324 +1,1343 @@
1 1 # sqlitestore.py - Storage backend that uses SQLite
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """store repository data in SQLite (EXPERIMENTAL)
9 9
10 10 The sqlitestore extension enables the storage of repository data in SQLite.
11 11
12 12 This extension is HIGHLY EXPERIMENTAL. There are NO BACKWARDS COMPATIBILITY
13 13 GUARANTEES. This means that repositories created with this extension may
14 14 only be usable with the exact version of this extension/Mercurial that was
15 15 used. The extension attempts to enforce this in order to prevent repository
16 16 corruption.
17 17
18 18 In addition, several features are not yet supported or have known bugs:
19 19
20 20 * Only some data is stored in SQLite. Changeset, manifest, and other repository
21 21 data is not yet stored in SQLite.
22 22 * Transactions are not robust. If the process is aborted at the right time
23 23 during transaction close/rollback, the repository could be in an inconsistent
24 24 state. This problem will diminish once all repository data is tracked by
25 25 SQLite.
26 26 * Bundle repositories do not work (the ability to use e.g.
27 27 `hg -R <bundle-file> log` to automatically overlay a bundle on top of the
28 28 existing repository).
29 29 * Various other features don't work.
30 30
31 31 This extension should work for basic clone/pull, update, and commit workflows.
32 32 Some history rewriting operations may fail due to lack of support for bundle
33 33 repositories.
34 34
35 35 To use, activate the extension and set the ``storage.new-repo-backend`` config
36 36 option to ``sqlite`` to enable new repositories to use SQLite for storage.
37 37 """
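# A minimal enablement sketch (the hgrc section layout is an assumption of
# this write-up; the extension name and the storage.new-repo-backend option
# come from the docstring above):
#
#   [extensions]
#   sqlitestore =
#
#   [storage]
#   new-repo-backend = sqlite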
38 38
39 39 # To run the test suite with repos using SQLite by default, execute the
40 40 # following:
41 41 #
42 42 # HGREPOFEATURES="sqlitestore" run-tests.py \
43 43 # --extra-config-opt extensions.sqlitestore= \
44 44 # --extra-config-opt storage.new-repo-backend=sqlite
45 45
46 46
47 47 import sqlite3
48 48 import struct
49 49 import threading
50 50 import zlib
51 51
52 52 from mercurial.i18n import _
53 53 from mercurial.node import (
54 54 nullrev,
55 55 sha1nodeconstants,
56 56 short,
57 57 )
58 58 from mercurial.thirdparty import attr
59 59 from mercurial import (
60 60 ancestor,
61 61 dagop,
62 62 encoding,
63 63 error,
64 64 extensions,
65 65 localrepo,
66 66 mdiff,
67 67 pycompat,
68 68 registrar,
69 69 requirements,
70 70 util,
71 71 verify,
72 72 )
73 73 from mercurial.interfaces import (
74 74 repository,
75 75 util as interfaceutil,
76 76 )
77 77 from mercurial.utils import (
78 78 hashutil,
79 79 storageutil,
80 80 )
81 81
82 82 try:
83 83 from mercurial import zstd
84 84
85 85 zstd.__version__
86 86 except ImportError:
87 87 zstd = None
88 88
89 89 configtable = {}
90 90 configitem = registrar.configitem(configtable)
91 91
92 92 # experimental config: storage.sqlite.compression
93 93 configitem(
94 94 b'storage',
95 95 b'sqlite.compression',
96 96 default=b'zstd' if zstd else b'zlib',
97 97 experimental=True,
98 98 )
99 99
100 100 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
101 101 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
102 102 # be specifying the version(s) of Mercurial they are tested with, or
103 103 # leave the attribute unspecified.
104 104 testedwith = b'ships-with-hg-core'
105 105
106 106 REQUIREMENT = b'exp-sqlite-001'
107 107 REQUIREMENT_ZSTD = b'exp-sqlite-comp-001=zstd'
108 108 REQUIREMENT_ZLIB = b'exp-sqlite-comp-001=zlib'
109 109 REQUIREMENT_NONE = b'exp-sqlite-comp-001=none'
110 110 REQUIREMENT_SHALLOW_FILES = b'exp-sqlite-shallow-files'
111 111
112 112 CURRENT_SCHEMA_VERSION = 1
113 113
114 114 COMPRESSION_NONE = 1
115 115 COMPRESSION_ZSTD = 2
116 116 COMPRESSION_ZLIB = 3
117 117
118 118 FLAG_CENSORED = 1
119 119 FLAG_MISSING_P1 = 2
120 120 FLAG_MISSING_P2 = 4
121 121
122 122 CREATE_SCHEMA = [
123 123 # Deltas are stored as content-indexed blobs.
124 124 # compression column holds COMPRESSION_* constant for how the
125 125 # delta is encoded.
126 126 'CREATE TABLE delta ('
127 127 ' id INTEGER PRIMARY KEY, '
128 128 ' compression INTEGER NOT NULL, '
129 129 ' hash BLOB UNIQUE ON CONFLICT ABORT, '
130 130 ' delta BLOB NOT NULL '
131 131 ')',
132 132 # Tracked paths are denormalized to integers to avoid redundant
133 133 # storage of the path name.
134 134 'CREATE TABLE filepath ('
135 135 ' id INTEGER PRIMARY KEY, '
136 136 ' path BLOB NOT NULL '
137 137 ')',
138 138 'CREATE UNIQUE INDEX filepath_path ON filepath (path)',
139 139 # We have a single table for all file revision data.
140 140 # Each file revision is uniquely described by a (path, rev) and
141 141 # (path, node).
142 142 #
143 143 # Revision data is stored as a pointer to the delta producing this
144 144 # revision and the file revision whose delta should be applied before
145 145 # that one. One can reconstruct the delta chain by recursively following
146 146 # the delta base revision pointers until one encounters NULL.
147 147 #
148 148 # flags column holds bitwise integer flags controlling storage options.
149 149 # These flags are defined by the FLAG_* constants.
150 150 'CREATE TABLE fileindex ('
151 151 ' id INTEGER PRIMARY KEY, '
152 152 ' pathid INTEGER REFERENCES filepath(id), '
153 153 ' revnum INTEGER NOT NULL, '
154 154 ' p1rev INTEGER NOT NULL, '
155 155 ' p2rev INTEGER NOT NULL, '
156 156 ' linkrev INTEGER NOT NULL, '
157 157 ' flags INTEGER NOT NULL, '
158 158 ' deltaid INTEGER REFERENCES delta(id), '
159 159 ' deltabaseid INTEGER REFERENCES fileindex(id), '
160 160 ' node BLOB NOT NULL '
161 161 ')',
162 162 'CREATE UNIQUE INDEX fileindex_pathrevnum '
163 163 ' ON fileindex (pathid, revnum)',
164 164 'CREATE UNIQUE INDEX fileindex_pathnode ON fileindex (pathid, node)',
165 165 # Provide a view over all file data for convenience.
166 166 'CREATE VIEW filedata AS '
167 167 'SELECT '
168 168 ' fileindex.id AS id, '
169 169 ' filepath.id AS pathid, '
170 170 ' filepath.path AS path, '
171 171 ' fileindex.revnum AS revnum, '
172 172 ' fileindex.node AS node, '
173 173 ' fileindex.p1rev AS p1rev, '
174 174 ' fileindex.p2rev AS p2rev, '
175 175 ' fileindex.linkrev AS linkrev, '
176 176 ' fileindex.flags AS flags, '
177 177 ' fileindex.deltaid AS deltaid, '
178 178 ' fileindex.deltabaseid AS deltabaseid '
179 179 'FROM filepath, fileindex '
180 180 'WHERE fileindex.pathid=filepath.id',
181 181 'PRAGMA user_version=%d' % CURRENT_SCHEMA_VERSION,
182 182 ]
183 183
184 184
185 185 def resolvedeltachain(db, pathid, node, revisioncache, stoprids, zstddctx=None):
186 186 """Resolve a delta chain for a file node."""
187 187
188 188 # TODO the "not in ({stops})" here is possibly slowing down the query
189 189 # because it needs to perform the lookup on every recursive invocation.
190 190 # This could possibly be faster if we created a temporary query with
191 191 # baseid "poisoned" to null and limited the recursive filter to
192 192 # "is not null".
193 193 res = db.execute(
194 194 'WITH RECURSIVE '
195 195 ' deltachain(deltaid, baseid) AS ('
196 196 ' SELECT deltaid, deltabaseid FROM fileindex '
197 197 ' WHERE pathid=? AND node=? '
198 198 ' UNION ALL '
199 199 ' SELECT fileindex.deltaid, deltabaseid '
200 200 ' FROM fileindex, deltachain '
201 201 ' WHERE '
202 202 ' fileindex.id=deltachain.baseid '
203 203 ' AND deltachain.baseid IS NOT NULL '
204 204 ' AND fileindex.id NOT IN ({stops}) '
205 205 ' ) '
206 206 'SELECT deltachain.baseid, compression, delta '
207 207 'FROM deltachain, delta '
208 208 'WHERE delta.id=deltachain.deltaid'.format(
209 209 stops=','.join(['?'] * len(stoprids))
210 210 ),
211 211 tuple([pathid, node] + list(stoprids.keys())),
212 212 )
213 213
214 214 deltas = []
215 215 lastdeltabaseid = None
216 216
217 217 for deltabaseid, compression, delta in res:
218 218 lastdeltabaseid = deltabaseid
219 219
220 220 if compression == COMPRESSION_ZSTD:
221 221 delta = zstddctx.decompress(delta)
222 222 elif compression == COMPRESSION_NONE:
223 223 delta = delta
224 224 elif compression == COMPRESSION_ZLIB:
225 225 delta = zlib.decompress(delta)
226 226 else:
227 227 raise SQLiteStoreError(
228 228 b'unhandled compression type: %d' % compression
229 229 )
230 230
231 231 deltas.append(delta)
232 232
233 233 if lastdeltabaseid in stoprids:
234 234 basetext = revisioncache[stoprids[lastdeltabaseid]]
235 235 else:
236 236 basetext = deltas.pop()
237 237
238 238 deltas.reverse()
239 239 fulltext = mdiff.patches(basetext, deltas)
240 240
241 241 # SQLite returns buffer instances for blob columns on Python 2. This
242 242 # type can propagate through the delta application layer. Because
243 243 # downstream callers assume revisions are bytes, cast as needed.
244 244 if not isinstance(fulltext, bytes):
245 245 fulltext = bytes(delta)
246 246
247 247 return fulltext
248 248
249 249
250 250 def insertdelta(db, compression, hash, delta):
251 251 try:
252 252 return db.execute(
253 253 'INSERT INTO delta (compression, hash, delta) VALUES (?, ?, ?)',
254 254 (compression, hash, delta),
255 255 ).lastrowid
256 256 except sqlite3.IntegrityError:
257 257 return db.execute(
258 258 'SELECT id FROM delta WHERE hash=?', (hash,)
259 259 ).fetchone()[0]
260 260
261 261
262 262 class SQLiteStoreError(error.StorageError):
263 263 pass
264 264
265 265
266 266 @attr.s
267 267 class revisionentry:
268 268 rid = attr.ib()
269 269 rev = attr.ib()
270 270 node = attr.ib()
271 271 p1rev = attr.ib()
272 272 p2rev = attr.ib()
273 273 p1node = attr.ib()
274 274 p2node = attr.ib()
275 275 linkrev = attr.ib()
276 276 flags = attr.ib()
277 277
278 278
279 279 @interfaceutil.implementer(repository.irevisiondelta)
280 280 @attr.s(slots=True)
281 281 class sqliterevisiondelta:
282 282 node = attr.ib()
283 283 p1node = attr.ib()
284 284 p2node = attr.ib()
285 285 basenode = attr.ib()
286 286 flags = attr.ib()
287 287 baserevisionsize = attr.ib()
288 288 revision = attr.ib()
289 289 delta = attr.ib()
290 290 sidedata = attr.ib()
291 291 protocol_flags = attr.ib()
292 292 linknode = attr.ib(default=None)
293 293
294 294
295 295 @interfaceutil.implementer(repository.iverifyproblem)
296 296 @attr.s(frozen=True)
297 297 class sqliteproblem:
298 298 warning = attr.ib(default=None)
299 299 error = attr.ib(default=None)
300 300 node = attr.ib(default=None)
301 301
302 302
303 303 @interfaceutil.implementer(repository.ifilestorage)
304 304 class sqlitefilestore:
305 305 """Implements storage for an individual tracked path."""
306 306
307 307 def __init__(self, db, path, compression):
308 308 self.nullid = sha1nodeconstants.nullid
309 309 self._db = db
310 310 self._path = path
311 311
312 312 self._pathid = None
313 313
314 314 # revnum -> node
315 315 self._revtonode = {}
316 316 # node -> revnum
317 317 self._nodetorev = {}
318 318 # node -> data structure
319 319 self._revisions = {}
320 320
321 321 self._revisioncache = util.lrucachedict(10)
322 322
323 323 self._compengine = compression
324 324
325 325 if compression == b'zstd':
326 326 self._cctx = zstd.ZstdCompressor(level=3)
327 327 self._dctx = zstd.ZstdDecompressor()
328 328 else:
329 329 self._cctx = None
330 330 self._dctx = None
331 331
332 332 self._refreshindex()
333 333
334 334 def _refreshindex(self):
335 335 self._revtonode = {}
336 336 self._nodetorev = {}
337 337 self._revisions = {}
338 338
339 339 res = list(
340 340 self._db.execute(
341 341 'SELECT id FROM filepath WHERE path=?', (self._path,)
342 342 )
343 343 )
344 344
345 345 if not res:
346 346 self._pathid = None
347 347 return
348 348
349 349 self._pathid = res[0][0]
350 350
351 351 res = self._db.execute(
352 352 'SELECT id, revnum, node, p1rev, p2rev, linkrev, flags '
353 353 'FROM fileindex '
354 354 'WHERE pathid=? '
355 355 'ORDER BY revnum ASC',
356 356 (self._pathid,),
357 357 )
358 358
359 359 for i, row in enumerate(res):
360 360 rid, rev, node, p1rev, p2rev, linkrev, flags = row
361 361
362 362 if i != rev:
363 363 raise SQLiteStoreError(
364 364 _(b'sqlite database has inconsistent revision numbers')
365 365 )
366 366
367 367 if p1rev == nullrev:
368 368 p1node = sha1nodeconstants.nullid
369 369 else:
370 370 p1node = self._revtonode[p1rev]
371 371
372 372 if p2rev == nullrev:
373 373 p2node = sha1nodeconstants.nullid
374 374 else:
375 375 p2node = self._revtonode[p2rev]
376 376
377 377 entry = revisionentry(
378 378 rid=rid,
379 379 rev=rev,
380 380 node=node,
381 381 p1rev=p1rev,
382 382 p2rev=p2rev,
383 383 p1node=p1node,
384 384 p2node=p2node,
385 385 linkrev=linkrev,
386 386 flags=flags,
387 387 )
388 388
389 389 self._revtonode[rev] = node
390 390 self._nodetorev[node] = rev
391 391 self._revisions[node] = entry
392 392
393 393 # Start of ifileindex interface.
394 394
395 395 def __len__(self):
396 396 return len(self._revisions)
397 397
398 398 def __iter__(self):
399 399 return iter(range(len(self._revisions)))
400 400
401 401 def hasnode(self, node):
402 402 if node == sha1nodeconstants.nullid:
403 403 return False
404 404
405 405 return node in self._nodetorev
406 406
407 407 def revs(self, start=0, stop=None):
408 408 return storageutil.iterrevs(
409 409 len(self._revisions), start=start, stop=stop
410 410 )
411 411
412 412 def parents(self, node):
413 413 if node == sha1nodeconstants.nullid:
414 414 return sha1nodeconstants.nullid, sha1nodeconstants.nullid
415 415
416 416 if node not in self._revisions:
417 417 raise error.LookupError(node, self._path, _(b'no node'))
418 418
419 419 entry = self._revisions[node]
420 420 return entry.p1node, entry.p2node
421 421
422 422 def parentrevs(self, rev):
423 423 if rev == nullrev:
424 424 return nullrev, nullrev
425 425
426 426 if rev not in self._revtonode:
427 427 raise IndexError(rev)
428 428
429 429 entry = self._revisions[self._revtonode[rev]]
430 430 return entry.p1rev, entry.p2rev
431 431
432 def ancestors(self, revs, stoprev=0, inclusive=False):
433 """Generate the ancestors of 'revs' in reverse revision order.
434 Does not generate revs lower than stoprev.
435
436 See the documentation for ancestor.lazyancestors for more details."""
437
438 # first, make sure start revisions aren't filtered
439 revs = list(revs)
440 checkrev = self.node
441 for r in revs:
442 checkrev(r)
443
444 return ancestor.lazyancestors(
445 self.parentrevs,
446 revs,
447 stoprev=stoprev,
448 inclusive=inclusive,
449 )
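    # Usage sketch (not part of the patch; `store`, `startrev`, and `cutoff`
    # are assumed caller-side names): stoprev excludes ancestors whose
    # revision number is below it, and inclusive=True also yields the start
    # revisions themselves.
    #
    #   for anc in store.ancestors([startrev], stoprev=cutoff, inclusive=True):
    #       ...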
450
432 451 def rev(self, node):
433 452 if node == sha1nodeconstants.nullid:
434 453 return nullrev
435 454
436 455 if node not in self._nodetorev:
437 456 raise error.LookupError(node, self._path, _(b'no node'))
438 457
439 458 return self._nodetorev[node]
440 459
441 460 def node(self, rev):
442 461 if rev == nullrev:
443 462 return sha1nodeconstants.nullid
444 463
445 464 if rev not in self._revtonode:
446 465 raise IndexError(rev)
447 466
448 467 return self._revtonode[rev]
449 468
450 469 def lookup(self, node):
451 470 return storageutil.fileidlookup(self, node, self._path)
452 471
453 472 def linkrev(self, rev):
454 473 if rev == nullrev:
455 474 return nullrev
456 475
457 476 if rev not in self._revtonode:
458 477 raise IndexError(rev)
459 478
460 479 entry = self._revisions[self._revtonode[rev]]
461 480 return entry.linkrev
462 481
463 482 def iscensored(self, rev):
464 483 if rev == nullrev:
465 484 return False
466 485
467 486 if rev not in self._revtonode:
468 487 raise IndexError(rev)
469 488
470 489 return self._revisions[self._revtonode[rev]].flags & FLAG_CENSORED
471 490
472 491 def commonancestorsheads(self, node1, node2):
473 492 rev1 = self.rev(node1)
474 493 rev2 = self.rev(node2)
475 494
476 495 ancestors = ancestor.commonancestorsheads(self.parentrevs, rev1, rev2)
477 496 return pycompat.maplist(self.node, ancestors)
478 497
479 498 def descendants(self, revs):
480 499 # TODO we could implement this using a recursive SQL query, which
481 500 # might be faster.
482 501 return dagop.descendantrevs(revs, self.revs, self.parentrevs)
483 502
484 503 def heads(self, start=None, stop=None):
485 504 if start is None and stop is None:
486 505 if not len(self):
487 506 return [sha1nodeconstants.nullid]
488 507
489 508 startrev = self.rev(start) if start is not None else nullrev
490 509 stoprevs = {self.rev(n) for n in stop or []}
491 510
492 511 revs = dagop.headrevssubset(
493 512 self.revs, self.parentrevs, startrev=startrev, stoprevs=stoprevs
494 513 )
495 514
496 515 return [self.node(rev) for rev in revs]
497 516
498 517 def children(self, node):
499 518 rev = self.rev(node)
500 519
501 520 res = self._db.execute(
502 521 'SELECT'
503 522 ' node '
504 523 ' FROM filedata '
505 524 ' WHERE path=? AND (p1rev=? OR p2rev=?) '
506 525 ' ORDER BY revnum ASC',
507 526 (self._path, rev, rev),
508 527 )
509 528
510 529 return [row[0] for row in res]
511 530
512 531 # End of ifileindex interface.
513 532
514 533 # Start of ifiledata interface.
515 534
516 535 def size(self, rev):
517 536 if rev == nullrev:
518 537 return 0
519 538
520 539 if rev not in self._revtonode:
521 540 raise IndexError(rev)
522 541
523 542 node = self._revtonode[rev]
524 543
525 544 if self.renamed(node):
526 545 return len(self.read(node))
527 546
528 547 return len(self.revision(node))
529 548
530 549 def revision(self, node, raw=False, _verifyhash=True):
531 550 if node in (sha1nodeconstants.nullid, nullrev):
532 551 return b''
533 552
534 553 if isinstance(node, int):
535 554 node = self.node(node)
536 555
537 556 if node not in self._nodetorev:
538 557 raise error.LookupError(node, self._path, _(b'no node'))
539 558
540 559 if node in self._revisioncache:
541 560 return self._revisioncache[node]
542 561
543 562 # Because we have a fulltext revision cache, we are able to
544 563 # short-circuit delta chain traversal and decompression as soon as
545 564 # we encounter a revision in the cache.
546 565
547 566 stoprids = {self._revisions[n].rid: n for n in self._revisioncache}
548 567
549 568 if not stoprids:
550 569 stoprids[-1] = None
551 570
552 571 fulltext = resolvedeltachain(
553 572 self._db,
554 573 self._pathid,
555 574 node,
556 575 self._revisioncache,
557 576 stoprids,
558 577 zstddctx=self._dctx,
559 578 )
560 579
561 580 # Don't verify hashes if parent nodes were rewritten, as the hash
562 581 # wouldn't verify.
563 582 if self._revisions[node].flags & (FLAG_MISSING_P1 | FLAG_MISSING_P2):
564 583 _verifyhash = False
565 584
566 585 if _verifyhash:
567 586 self._checkhash(fulltext, node)
568 587 self._revisioncache[node] = fulltext
569 588
570 589 return fulltext
571 590
572 591 def rawdata(self, *args, **kwargs):
573 592 return self.revision(*args, **kwargs)
574 593
575 594 def read(self, node):
576 595 return storageutil.filtermetadata(self.revision(node))
577 596
578 597 def renamed(self, node):
579 598 return storageutil.filerevisioncopied(self, node)
580 599
581 600 def cmp(self, node, fulltext):
582 601 return not storageutil.filedataequivalent(self, node, fulltext)
583 602
584 603 def emitrevisions(
585 604 self,
586 605 nodes,
587 606 nodesorder=None,
588 607 revisiondata=False,
589 608 assumehaveparentrevisions=False,
590 609 deltamode=repository.CG_DELTAMODE_STD,
591 610 sidedata_helpers=None,
592 611 ):
593 612 if nodesorder not in (b'nodes', b'storage', b'linear', None):
594 613 raise error.ProgrammingError(
595 614 b'unhandled value for nodesorder: %s' % nodesorder
596 615 )
597 616
598 617 nodes = [n for n in nodes if n != sha1nodeconstants.nullid]
599 618
600 619 if not nodes:
601 620 return
602 621
603 622 # TODO perform in a single query.
604 623 res = self._db.execute(
605 624 'SELECT revnum, deltaid FROM fileindex '
606 625 'WHERE pathid=? '
607 626 ' AND node in (%s)' % (','.join(['?'] * len(nodes))),
608 627 tuple([self._pathid] + nodes),
609 628 )
610 629
611 630 deltabases = {}
612 631
613 632 for rev, deltaid in res:
614 633 res = self._db.execute(
615 634 'SELECT revnum from fileindex WHERE pathid=? AND deltaid=?',
616 635 (self._pathid, deltaid),
617 636 )
618 637 deltabases[rev] = res.fetchone()[0]
619 638
620 639 # TODO define revdifffn so we can use delta from storage.
621 640 for delta in storageutil.emitrevisions(
622 641 self,
623 642 nodes,
624 643 nodesorder,
625 644 sqliterevisiondelta,
626 645 deltaparentfn=deltabases.__getitem__,
627 646 revisiondata=revisiondata,
628 647 assumehaveparentrevisions=assumehaveparentrevisions,
629 648 deltamode=deltamode,
630 649 sidedata_helpers=sidedata_helpers,
631 650 ):
632 651
633 652 yield delta
634 653
635 654 # End of ifiledata interface.
636 655
637 656 # Start of ifilemutation interface.
638 657
639 658 def add(self, filedata, meta, transaction, linkrev, p1, p2):
640 659 if meta or filedata.startswith(b'\x01\n'):
641 660 filedata = storageutil.packmeta(meta, filedata)
642 661
643 662 rev = self.addrevision(filedata, transaction, linkrev, p1, p2)
644 663 return self.node(rev)
645 664
646 665 def addrevision(
647 666 self,
648 667 revisiondata,
649 668 transaction,
650 669 linkrev,
651 670 p1,
652 671 p2,
653 672 node=None,
654 673 flags=0,
655 674 cachedelta=None,
656 675 ):
657 676 if flags:
658 677 raise SQLiteStoreError(_(b'flags not supported on revisions'))
659 678
660 679 validatehash = node is not None
661 680 node = node or storageutil.hashrevisionsha1(revisiondata, p1, p2)
662 681
663 682 if validatehash:
664 683 self._checkhash(revisiondata, node, p1, p2)
665 684
666 685 rev = self._nodetorev.get(node)
667 686 if rev is not None:
668 687 return rev
669 688
670 689 rev = self._addrawrevision(
671 690 node, revisiondata, transaction, linkrev, p1, p2
672 691 )
673 692
674 693 self._revisioncache[node] = revisiondata
675 694 return rev
676 695
677 696 def addgroup(
678 697 self,
679 698 deltas,
680 699 linkmapper,
681 700 transaction,
682 701 addrevisioncb=None,
683 702 duplicaterevisioncb=None,
684 703 maybemissingparents=False,
685 704 ):
686 705 empty = True
687 706
688 707 for (
689 708 node,
690 709 p1,
691 710 p2,
692 711 linknode,
693 712 deltabase,
694 713 delta,
695 714 wireflags,
696 715 sidedata,
697 716 ) in deltas:
698 717 storeflags = 0
699 718
700 719 if wireflags & repository.REVISION_FLAG_CENSORED:
701 720 storeflags |= FLAG_CENSORED
702 721
703 722 if wireflags & ~repository.REVISION_FLAG_CENSORED:
704 723 raise SQLiteStoreError(b'unhandled revision flag')
705 724
706 725 if maybemissingparents:
707 726 if p1 != sha1nodeconstants.nullid and not self.hasnode(p1):
708 727 p1 = sha1nodeconstants.nullid
709 728 storeflags |= FLAG_MISSING_P1
710 729
711 730 if p2 != sha1nodeconstants.nullid and not self.hasnode(p2):
712 731 p2 = sha1nodeconstants.nullid
713 732 storeflags |= FLAG_MISSING_P2
714 733
715 734 baserev = self.rev(deltabase)
716 735
717 736 # If base is censored, delta must be full replacement in a single
718 737 # patch operation.
719 738 if baserev != nullrev and self.iscensored(baserev):
720 739 hlen = struct.calcsize(b'>lll')
721 740 oldlen = len(self.rawdata(deltabase, _verifyhash=False))
722 741 newlen = len(delta) - hlen
723 742
724 743 if delta[:hlen] != mdiff.replacediffheader(oldlen, newlen):
725 744 raise error.CensoredBaseError(self._path, deltabase)
726 745
727 746 if not (storeflags & FLAG_CENSORED) and storageutil.deltaiscensored(
728 747 delta, baserev, lambda x: len(self.rawdata(x))
729 748 ):
730 749 storeflags |= FLAG_CENSORED
731 750
732 751 linkrev = linkmapper(linknode)
733 752
734 753 if node in self._revisions:
735 754 # Possibly reset parents to make them proper.
736 755 entry = self._revisions[node]
737 756
738 757 if (
739 758 entry.flags & FLAG_MISSING_P1
740 759 and p1 != sha1nodeconstants.nullid
741 760 ):
742 761 entry.p1node = p1
743 762 entry.p1rev = self._nodetorev[p1]
744 763 entry.flags &= ~FLAG_MISSING_P1
745 764
746 765 self._db.execute(
747 766 'UPDATE fileindex SET p1rev=?, flags=? WHERE id=?',
748 767 (self._nodetorev[p1], entry.flags, entry.rid),
749 768 )
750 769
751 770 if (
752 771 entry.flags & FLAG_MISSING_P2
753 772 and p2 != sha1nodeconstants.nullid
754 773 ):
755 774 entry.p2node = p2
756 775 entry.p2rev = self._nodetorev[p2]
757 776 entry.flags &= ~FLAG_MISSING_P2
758 777
759 778 self._db.execute(
760 779 'UPDATE fileindex SET p2rev=?, flags=? WHERE id=?',
761 780 (self._nodetorev[p1], entry.flags, entry.rid),
762 781 )
763 782
764 783 if duplicaterevisioncb:
765 784 duplicaterevisioncb(self, self.rev(node))
766 785 empty = False
767 786 continue
768 787
769 788 if deltabase == sha1nodeconstants.nullid:
770 789 text = mdiff.patch(b'', delta)
771 790 storedelta = None
772 791 else:
773 792 text = None
774 793 storedelta = (deltabase, delta)
775 794
776 795 rev = self._addrawrevision(
777 796 node,
778 797 text,
779 798 transaction,
780 799 linkrev,
781 800 p1,
782 801 p2,
783 802 storedelta=storedelta,
784 803 flags=storeflags,
785 804 )
786 805
787 806 if addrevisioncb:
788 807 addrevisioncb(self, rev)
789 808 empty = False
790 809
791 810 return not empty
792 811
793 812 def censorrevision(self, tr, censornode, tombstone=b''):
794 813 tombstone = storageutil.packmeta({b'censored': tombstone}, b'')
795 814
796 815 # This restriction is cargo culted from revlogs and makes no sense for
797 816 # SQLite, since columns can be resized at will.
798 817 if len(tombstone) > len(self.rawdata(censornode)):
799 818 raise error.Abort(
800 819 _(b'censor tombstone must be no longer than censored data')
801 820 )
802 821
803 822 # We need to replace the censored revision's data with the tombstone.
804 823 # But replacing that data will have implications for delta chains that
805 824 # reference it.
806 825 #
807 826 # While "better," more complex strategies are possible, we do something
808 827 # simple: we find delta chain children of the censored revision and we
809 828 # replace those incremental deltas with fulltexts of their corresponding
810 829 # revision. Then we delete the now-unreferenced delta and original
811 830 # revision and insert a replacement.
812 831
813 832 # Find the delta to be censored.
814 833 censoreddeltaid = self._db.execute(
815 834 'SELECT deltaid FROM fileindex WHERE id=?',
816 835 (self._revisions[censornode].rid,),
817 836 ).fetchone()[0]
818 837
819 838 # Find all its delta chain children.
820 839 # TODO once we support storing deltas for !files, we'll need to look
821 840 # for those delta chains too.
822 841 rows = list(
823 842 self._db.execute(
824 843 'SELECT id, pathid, node FROM fileindex '
825 844 'WHERE deltabaseid=? OR deltaid=?',
826 845 (censoreddeltaid, censoreddeltaid),
827 846 )
828 847 )
829 848
830 849 for row in rows:
831 850 rid, pathid, node = row
832 851
833 852 fulltext = resolvedeltachain(
834 853 self._db, pathid, node, {}, {-1: None}, zstddctx=self._dctx
835 854 )
836 855
837 856 deltahash = hashutil.sha1(fulltext).digest()
838 857
839 858 if self._compengine == b'zstd':
840 859 deltablob = self._cctx.compress(fulltext)
841 860 compression = COMPRESSION_ZSTD
842 861 elif self._compengine == b'zlib':
843 862 deltablob = zlib.compress(fulltext)
844 863 compression = COMPRESSION_ZLIB
845 864 elif self._compengine == b'none':
846 865 deltablob = fulltext
847 866 compression = COMPRESSION_NONE
848 867 else:
849 868 raise error.ProgrammingError(
850 869 b'unhandled compression engine: %s' % self._compengine
851 870 )
852 871
853 872 if len(deltablob) >= len(fulltext):
854 873 deltablob = fulltext
855 874 compression = COMPRESSION_NONE
856 875
857 876 deltaid = insertdelta(self._db, compression, deltahash, deltablob)
858 877
859 878 self._db.execute(
860 879 'UPDATE fileindex SET deltaid=?, deltabaseid=NULL '
861 880 'WHERE id=?',
862 881 (deltaid, rid),
863 882 )
864 883
865 884 # Now create the tombstone delta and replace the delta on the censored
866 885 # node.
867 886 deltahash = hashutil.sha1(tombstone).digest()
868 887 tombstonedeltaid = insertdelta(
869 888 self._db, COMPRESSION_NONE, deltahash, tombstone
870 889 )
871 890
872 891 flags = self._revisions[censornode].flags
873 892 flags |= FLAG_CENSORED
874 893
875 894 self._db.execute(
876 895 'UPDATE fileindex SET flags=?, deltaid=?, deltabaseid=NULL '
877 896 'WHERE pathid=? AND node=?',
878 897 (flags, tombstonedeltaid, self._pathid, censornode),
879 898 )
880 899
881 900 self._db.execute('DELETE FROM delta WHERE id=?', (censoreddeltaid,))
882 901
883 902 self._refreshindex()
884 903 self._revisioncache.clear()
885 904
886 905 def getstrippoint(self, minlink):
887 906 return storageutil.resolvestripinfo(
888 907 minlink,
889 908 len(self) - 1,
890 909 [self.rev(n) for n in self.heads()],
891 910 self.linkrev,
892 911 self.parentrevs,
893 912 )
894 913
895 914 def strip(self, minlink, transaction):
896 915 if not len(self):
897 916 return
898 917
899 918 rev, _ignored = self.getstrippoint(minlink)
900 919
901 920 if rev == len(self):
902 921 return
903 922
904 923 for rev in self.revs(rev):
905 924 self._db.execute(
906 925 'DELETE FROM fileindex WHERE pathid=? AND node=?',
907 926 (self._pathid, self.node(rev)),
908 927 )
909 928
910 929 # TODO how should we garbage collect data in delta table?
911 930
912 931 self._refreshindex()
913 932
914 933 # End of ifilemutation interface.
915 934
916 935 # Start of ifilestorage interface.
917 936
918 937 def files(self):
919 938 return []
920 939
921 940 def sidedata(self, nodeorrev, _df=None):
922 941 # Not supported for now
923 942 return {}
924 943
925 944 def storageinfo(
926 945 self,
927 946 exclusivefiles=False,
928 947 sharedfiles=False,
929 948 revisionscount=False,
930 949 trackedsize=False,
931 950 storedsize=False,
932 951 ):
933 952 d = {}
934 953
935 954 if exclusivefiles:
936 955 d[b'exclusivefiles'] = []
937 956
938 957 if sharedfiles:
939 958 # TODO list sqlite file(s) here.
940 959 d[b'sharedfiles'] = []
941 960
942 961 if revisionscount:
943 962 d[b'revisionscount'] = len(self)
944 963
945 964 if trackedsize:
946 965 d[b'trackedsize'] = sum(
947 966 len(self.revision(node)) for node in self._nodetorev
948 967 )
949 968
950 969 if storedsize:
951 970 # TODO implement this?
952 971 d[b'storedsize'] = None
953 972
954 973 return d
955 974
956 975 def verifyintegrity(self, state):
957 976 state[b'skipread'] = set()
958 977
959 978 for rev in self:
960 979 node = self.node(rev)
961 980
962 981 try:
963 982 self.revision(node)
964 983 except Exception as e:
965 984 yield sqliteproblem(
966 985 error=_(b'unpacking %s: %s') % (short(node), e), node=node
967 986 )
968 987
969 988 state[b'skipread'].add(node)
970 989
971 990 # End of ifilestorage interface.
972 991
973 992 def _checkhash(self, fulltext, node, p1=None, p2=None):
974 993 if p1 is None and p2 is None:
975 994 p1, p2 = self.parents(node)
976 995
977 996 if node == storageutil.hashrevisionsha1(fulltext, p1, p2):
978 997 return
979 998
980 999 try:
981 1000 del self._revisioncache[node]
982 1001 except KeyError:
983 1002 pass
984 1003
985 1004 if storageutil.iscensoredtext(fulltext):
986 1005 raise error.CensoredNodeError(self._path, node, fulltext)
987 1006
988 1007 raise SQLiteStoreError(_(b'integrity check failed on %s') % self._path)
989 1008
990 1009 def _addrawrevision(
991 1010 self,
992 1011 node,
993 1012 revisiondata,
994 1013 transaction,
995 1014 linkrev,
996 1015 p1,
997 1016 p2,
998 1017 storedelta=None,
999 1018 flags=0,
1000 1019 ):
1001 1020 if self._pathid is None:
1002 1021 res = self._db.execute(
1003 1022 'INSERT INTO filepath (path) VALUES (?)', (self._path,)
1004 1023 )
1005 1024 self._pathid = res.lastrowid
1006 1025
1007 1026 # For simplicity, always store a delta against p1.
1008 1027 # TODO we need a lot more logic here to make behavior reasonable.
1009 1028
1010 1029 if storedelta:
1011 1030 deltabase, delta = storedelta
1012 1031
1013 1032 if isinstance(deltabase, int):
1014 1033 deltabase = self.node(deltabase)
1015 1034
1016 1035 else:
1017 1036 assert revisiondata is not None
1018 1037 deltabase = p1
1019 1038
1020 1039 if deltabase == sha1nodeconstants.nullid:
1021 1040 delta = revisiondata
1022 1041 else:
1023 1042 delta = mdiff.textdiff(
1024 1043 self.revision(self.rev(deltabase)), revisiondata
1025 1044 )
1026 1045
1027 1046 # File index stores a pointer to its delta and the parent delta.
1028 1047 # The parent delta is stored via a pointer to the fileindex PK.
1029 1048 if deltabase == sha1nodeconstants.nullid:
1030 1049 baseid = None
1031 1050 else:
1032 1051 baseid = self._revisions[deltabase].rid
1033 1052
1034 1053 # Deltas are stored with a hash of their content. This allows
1035 1054 # us to de-duplicate. The table is configured to ignore conflicts
1036 1055 # and it is faster to just insert and silently noop than to look
1037 1056 # first.
1038 1057 deltahash = hashutil.sha1(delta).digest()
1039 1058
1040 1059 if self._compengine == b'zstd':
1041 1060 deltablob = self._cctx.compress(delta)
1042 1061 compression = COMPRESSION_ZSTD
1043 1062 elif self._compengine == b'zlib':
1044 1063 deltablob = zlib.compress(delta)
1045 1064 compression = COMPRESSION_ZLIB
1046 1065 elif self._compengine == b'none':
1047 1066 deltablob = delta
1048 1067 compression = COMPRESSION_NONE
1049 1068 else:
1050 1069 raise error.ProgrammingError(
1051 1070 b'unhandled compression engine: %s' % self._compengine
1052 1071 )
1053 1072
1054 1073 # Don't store compressed data if it isn't practical.
1055 1074 if len(deltablob) >= len(delta):
1056 1075 deltablob = delta
1057 1076 compression = COMPRESSION_NONE
1058 1077
1059 1078 deltaid = insertdelta(self._db, compression, deltahash, deltablob)
1060 1079
1061 1080 rev = len(self)
1062 1081
1063 1082 if p1 == sha1nodeconstants.nullid:
1064 1083 p1rev = nullrev
1065 1084 else:
1066 1085 p1rev = self._nodetorev[p1]
1067 1086
1068 1087 if p2 == sha1nodeconstants.nullid:
1069 1088 p2rev = nullrev
1070 1089 else:
1071 1090 p2rev = self._nodetorev[p2]
1072 1091
1073 1092 rid = self._db.execute(
1074 1093 'INSERT INTO fileindex ('
1075 1094 ' pathid, revnum, node, p1rev, p2rev, linkrev, flags, '
1076 1095 ' deltaid, deltabaseid) '
1077 1096 ' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
1078 1097 (
1079 1098 self._pathid,
1080 1099 rev,
1081 1100 node,
1082 1101 p1rev,
1083 1102 p2rev,
1084 1103 linkrev,
1085 1104 flags,
1086 1105 deltaid,
1087 1106 baseid,
1088 1107 ),
1089 1108 ).lastrowid
1090 1109
1091 1110 entry = revisionentry(
1092 1111 rid=rid,
1093 1112 rev=rev,
1094 1113 node=node,
1095 1114 p1rev=p1rev,
1096 1115 p2rev=p2rev,
1097 1116 p1node=p1,
1098 1117 p2node=p2,
1099 1118 linkrev=linkrev,
1100 1119 flags=flags,
1101 1120 )
1102 1121
1103 1122 self._nodetorev[node] = rev
1104 1123 self._revtonode[rev] = node
1105 1124 self._revisions[node] = entry
1106 1125
1107 1126 return rev
1108 1127
1109 1128
1110 1129 class sqliterepository(localrepo.localrepository):
1111 1130 def cancopy(self):
1112 1131 return False
1113 1132
1114 1133 def transaction(self, *args, **kwargs):
1115 1134 current = self.currenttransaction()
1116 1135
1117 1136 tr = super(sqliterepository, self).transaction(*args, **kwargs)
1118 1137
1119 1138 if current:
1120 1139 return tr
1121 1140
1122 1141 self._dbconn.execute('BEGIN TRANSACTION')
1123 1142
1124 1143 def committransaction(_):
1125 1144 self._dbconn.commit()
1126 1145
1127 1146 tr.addfinalize(b'sqlitestore', committransaction)
1128 1147
1129 1148 return tr
1130 1149
1131 1150 @property
1132 1151 def _dbconn(self):
1133 1152 # SQLite connections can only be used on the thread that created
1134 1153 # them. In most cases, this "just works." However, hgweb uses
1135 1154 # multiple threads.
1136 1155 tid = threading.current_thread().ident
1137 1156
1138 1157 if self._db:
1139 1158 if self._db[0] == tid:
1140 1159 return self._db[1]
1141 1160
1142 1161 db = makedb(self.svfs.join(b'db.sqlite'))
1143 1162 self._db = (tid, db)
1144 1163
1145 1164 return db
1146 1165
1147 1166
1148 1167 def makedb(path):
1149 1168 """Construct a database handle for a database at path."""
1150 1169
1151 1170 db = sqlite3.connect(encoding.strfromlocal(path))
1152 1171 db.text_factory = bytes
1153 1172
1154 1173 res = db.execute('PRAGMA user_version').fetchone()[0]
1155 1174
1156 1175 # New database.
1157 1176 if res == 0:
1158 1177 for statement in CREATE_SCHEMA:
1159 1178 db.execute(statement)
1160 1179
1161 1180 db.commit()
1162 1181
1163 1182 elif res == CURRENT_SCHEMA_VERSION:
1164 1183 pass
1165 1184
1166 1185 else:
1167 1186 raise error.Abort(_(b'sqlite database has unrecognized version'))
1168 1187
1169 1188 db.execute('PRAGMA journal_mode=WAL')
1170 1189
1171 1190 return db
1172 1191
1173 1192
1174 1193 def featuresetup(ui, supported):
1175 1194 supported.add(REQUIREMENT)
1176 1195
1177 1196 if zstd:
1178 1197 supported.add(REQUIREMENT_ZSTD)
1179 1198
1180 1199 supported.add(REQUIREMENT_ZLIB)
1181 1200 supported.add(REQUIREMENT_NONE)
1182 1201 supported.add(REQUIREMENT_SHALLOW_FILES)
1183 1202 supported.add(requirements.NARROW_REQUIREMENT)
1184 1203
1185 1204
1186 1205 def newreporequirements(orig, ui, createopts):
1187 1206 if createopts[b'backend'] != b'sqlite':
1188 1207 return orig(ui, createopts)
1189 1208
1190 1209 # This restriction can be lifted once we have more confidence.
1191 1210 if b'sharedrepo' in createopts:
1192 1211 raise error.Abort(
1193 1212 _(b'shared repositories not supported with SQLite store')
1194 1213 )
1195 1214
1196 1215 # This filtering is out of an abundance of caution: we want to ensure
1197 1216 # we honor creation options and we do that by annotating exactly the
1198 1217 # creation options we recognize.
1199 1218 known = {
1200 1219 b'narrowfiles',
1201 1220 b'backend',
1202 1221 b'shallowfilestore',
1203 1222 }
1204 1223
1205 1224 unsupported = set(createopts) - known
1206 1225 if unsupported:
1207 1226 raise error.Abort(
1208 1227 _(b'SQLite store does not support repo creation option: %s')
1209 1228 % b', '.join(sorted(unsupported))
1210 1229 )
1211 1230
1212 1231 # Since we're a hybrid store that still relies on revlogs, we fall back
1213 1232 # to using the revlogv1 backend's storage requirements then adding our
1214 1233 # own requirement.
1215 1234 createopts[b'backend'] = b'revlogv1'
1216 1235 requirements = orig(ui, createopts)
1217 1236 requirements.add(REQUIREMENT)
1218 1237
1219 1238 compression = ui.config(b'storage', b'sqlite.compression')
1220 1239
1221 1240 if compression == b'zstd' and not zstd:
1222 1241 raise error.Abort(
1223 1242 _(
1224 1243 b'storage.sqlite.compression set to "zstd" but '
1225 1244 b'zstandard compression not available to this '
1226 1245 b'Mercurial install'
1227 1246 )
1228 1247 )
1229 1248
1230 1249 if compression == b'zstd':
1231 1250 requirements.add(REQUIREMENT_ZSTD)
1232 1251 elif compression == b'zlib':
1233 1252 requirements.add(REQUIREMENT_ZLIB)
1234 1253 elif compression == b'none':
1235 1254 requirements.add(REQUIREMENT_NONE)
1236 1255 else:
1237 1256 raise error.Abort(
1238 1257 _(
1239 1258 b'unknown compression engine defined in '
1240 1259 b'storage.sqlite.compression: %s'
1241 1260 )
1242 1261 % compression
1243 1262 )
1244 1263
1245 1264 if createopts.get(b'shallowfilestore'):
1246 1265 requirements.add(REQUIREMENT_SHALLOW_FILES)
1247 1266
1248 1267 return requirements
1249 1268
1250 1269
1251 1270 @interfaceutil.implementer(repository.ilocalrepositoryfilestorage)
1252 1271 class sqlitefilestorage:
1253 1272 """Repository file storage backed by SQLite."""
1254 1273
1255 1274 def file(self, path):
1256 1275 if path[0] == b'/':
1257 1276 path = path[1:]
1258 1277
1259 1278 if REQUIREMENT_ZSTD in self.requirements:
1260 1279 compression = b'zstd'
1261 1280 elif REQUIREMENT_ZLIB in self.requirements:
1262 1281 compression = b'zlib'
1263 1282 elif REQUIREMENT_NONE in self.requirements:
1264 1283 compression = b'none'
1265 1284 else:
1266 1285 raise error.Abort(
1267 1286 _(
1268 1287 b'unable to determine what compression engine '
1269 1288 b'to use for SQLite storage'
1270 1289 )
1271 1290 )
1272 1291
1273 1292 return sqlitefilestore(self._dbconn, path, compression)
1274 1293
1275 1294
1276 1295 def makefilestorage(orig, requirements, features, **kwargs):
1277 1296 """Produce a type conforming to ``ilocalrepositoryfilestorage``."""
1278 1297 if REQUIREMENT in requirements:
1279 1298 if REQUIREMENT_SHALLOW_FILES in requirements:
1280 1299 features.add(repository.REPO_FEATURE_SHALLOW_FILE_STORAGE)
1281 1300
1282 1301 return sqlitefilestorage
1283 1302 else:
1284 1303 return orig(requirements=requirements, features=features, **kwargs)
1285 1304
1286 1305
1287 1306 def makemain(orig, ui, requirements, **kwargs):
1288 1307 if REQUIREMENT in requirements:
1289 1308 if REQUIREMENT_ZSTD in requirements and not zstd:
1290 1309 raise error.Abort(
1291 1310 _(
1292 1311 b'repository uses zstandard compression, which '
1293 1312 b'is not available to this Mercurial install'
1294 1313 )
1295 1314 )
1296 1315
1297 1316 return sqliterepository
1298 1317
1299 1318 return orig(requirements=requirements, **kwargs)
1300 1319
1301 1320
1302 1321 def verifierinit(orig, self, *args, **kwargs):
1303 1322 orig(self, *args, **kwargs)
1304 1323
1305 1324 # We don't care that files in the store don't align with what is
1306 1325 # advertised. So suppress these warnings.
1307 1326 self.warnorphanstorefiles = False
1308 1327
1309 1328
1310 1329 def extsetup(ui):
1311 1330 localrepo.featuresetupfuncs.add(featuresetup)
1312 1331 extensions.wrapfunction(
1313 1332 localrepo, b'newreporequirements', newreporequirements
1314 1333 )
1315 1334 extensions.wrapfunction(localrepo, b'makefilestorage', makefilestorage)
1316 1335 extensions.wrapfunction(localrepo, b'makemain', makemain)
1317 1336 extensions.wrapfunction(verify.verifier, b'__init__', verifierinit)
1318 1337
1319 1338
1320 1339 def reposetup(ui, repo):
1321 1340 if isinstance(repo, sqliterepository):
1322 1341 repo._db = None
1323 1342
1324 1343 # TODO check for bundlerepository?