sqlitestore: add an `ancestors` method...
marmoute
r50566:92c65bd0 default
@@ -1,1325 +1,1344 @@
1 1 # sqlitestore.py - Storage backend that uses SQLite
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """store repository data in SQLite (EXPERIMENTAL)
9 9
10 10 The sqlitestore extension enables the storage of repository data in SQLite.
11 11
12 12 This extension is HIGHLY EXPERIMENTAL. There are NO BACKWARDS COMPATIBILITY
13 13 GUARANTEES. This means that repositories created with this extension may
14 14 only be usable with the exact version of this extension/Mercurial that was
15 15 used. The extension attempts to enforce this in order to prevent repository
16 16 corruption.
17 17
18 18 In addition, several features are not yet supported or have known bugs:
19 19
20 20 * Only some data is stored in SQLite. Changeset, manifest, and other repository
21 21 data is not yet stored in SQLite.
22 22 * Transactions are not robust. If the process is aborted at the right time
23 23 during transaction close/rollback, the repository could be in an inconsistent
24 24 state. This problem will diminish once all repository data is tracked by
25 25 SQLite.
26 26 * Bundle repositories do not work (the ability to use e.g.
27 27 `hg -R <bundle-file> log` to automatically overlay a bundle on top of the
28 28 existing repository).
29 29 * Various other features don't work.
30 30
31 31 This extension should work for basic clone/pull, update, and commit workflows.
32 32 Some history rewriting operations may fail due to lack of support for bundle
33 33 repositories.
34 34
35 35 To use, activate the extension and set the ``storage.new-repo-backend`` config
36 36 option to ``sqlite`` to enable new repositories to use SQLite for storage.
37 37 """
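
# Example configuration (a sketch based on the docstring above and the
# test-suite invocation below; the section and option names are the ones
# mentioned there, laid out in standard hgrc form):
#
#   [extensions]
#   sqlitestore =
#
#   [storage]
#   new-repo-backend = sqlite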
38 38
39 39 # To run the test suite with repos using SQLite by default, execute the
40 40 # following:
41 41 #
42 42 # HGREPOFEATURES="sqlitestore" run-tests.py \
43 43 # --extra-config-opt extensions.sqlitestore= \
44 44 # --extra-config-opt storage.new-repo-backend=sqlite
45 45
46 46
47 47 import sqlite3
48 48 import struct
49 49 import threading
50 50 import zlib
51 51
52 52 from mercurial.i18n import _
53 53 from mercurial.node import (
54 54 nullrev,
55 55 sha1nodeconstants,
56 56 short,
57 57 )
58 58 from mercurial.thirdparty import attr
59 59 from mercurial import (
60 60 ancestor,
61 61 dagop,
62 62 encoding,
63 63 error,
64 64 extensions,
65 65 localrepo,
66 66 mdiff,
67 67 pycompat,
68 68 registrar,
69 69 requirements,
70 70 util,
71 71 verify,
72 72 )
73 73 from mercurial.interfaces import (
74 74 repository,
75 75 util as interfaceutil,
76 76 )
77 77 from mercurial.utils import (
78 78 hashutil,
79 79 storageutil,
80 80 )
81 81
82 82 try:
83 83 from mercurial import zstd
84 84
85 85 zstd.__version__
86 86 except ImportError:
87 87 zstd = None
88 88
89 89 configtable = {}
90 90 configitem = registrar.configitem(configtable)
91 91
92 92 # experimental config: storage.sqlite.compression
93 93 configitem(
94 94 b'storage',
95 95 b'sqlite.compression',
96 96 default=b'zstd' if zstd else b'zlib',
97 97 experimental=True,
98 98 )
99 99
100 100 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
101 101 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
102 102 # be specifying the version(s) of Mercurial they are tested with, or
103 103 # leave the attribute unspecified.
104 104 testedwith = b'ships-with-hg-core'
105 105
106 106 REQUIREMENT = b'exp-sqlite-001'
107 107 REQUIREMENT_ZSTD = b'exp-sqlite-comp-001=zstd'
108 108 REQUIREMENT_ZLIB = b'exp-sqlite-comp-001=zlib'
109 109 REQUIREMENT_NONE = b'exp-sqlite-comp-001=none'
110 110 REQUIREMENT_SHALLOW_FILES = b'exp-sqlite-shallow-files'
111 111
112 112 CURRENT_SCHEMA_VERSION = 1
113 113
114 114 COMPRESSION_NONE = 1
115 115 COMPRESSION_ZSTD = 2
116 116 COMPRESSION_ZLIB = 3
117 117
118 118 FLAG_CENSORED = 1
119 119 FLAG_MISSING_P1 = 2
120 120 FLAG_MISSING_P2 = 4
121 121
122 122 CREATE_SCHEMA = [
123 123 # Deltas are stored as content-indexed blobs.
124 124 # compression column holds COMPRESSION_* constant for how the
125 125 # delta is encoded.
126 126 'CREATE TABLE delta ('
127 127 ' id INTEGER PRIMARY KEY, '
128 128 ' compression INTEGER NOT NULL, '
129 129 ' hash BLOB UNIQUE ON CONFLICT ABORT, '
130 130 ' delta BLOB NOT NULL '
131 131 ')',
132 132 # Tracked paths are denormalized to integers to avoid redundant
133 133 # storage of the path name.
134 134 'CREATE TABLE filepath ('
135 135 ' id INTEGER PRIMARY KEY, '
136 136 ' path BLOB NOT NULL '
137 137 ')',
138 138 'CREATE UNIQUE INDEX filepath_path ON filepath (path)',
139 139 # We have a single table for all file revision data.
140 140 # Each file revision is uniquely described by a (path, rev) and
141 141 # (path, node).
142 142 #
143 143 # Revision data is stored as a pointer to the delta producing this
144 144 # revision and the file revision whose delta should be applied before
145 145 # that one. One can reconstruct the delta chain by recursively following
146 146 # the delta base revision pointers until one encounters NULL.
147 147 #
148 148 # flags column holds bitwise integer flags controlling storage options.
149 149 # These flags are defined by the FLAG_* constants.
150 150 'CREATE TABLE fileindex ('
151 151 ' id INTEGER PRIMARY KEY, '
152 152 ' pathid INTEGER REFERENCES filepath(id), '
153 153 ' revnum INTEGER NOT NULL, '
154 154 ' p1rev INTEGER NOT NULL, '
155 155 ' p2rev INTEGER NOT NULL, '
156 156 ' linkrev INTEGER NOT NULL, '
157 157 ' flags INTEGER NOT NULL, '
158 158 ' deltaid INTEGER REFERENCES delta(id), '
159 159 ' deltabaseid INTEGER REFERENCES fileindex(id), '
160 160 ' node BLOB NOT NULL '
161 161 ')',
162 162 'CREATE UNIQUE INDEX fileindex_pathrevnum '
163 163 ' ON fileindex (pathid, revnum)',
164 164 'CREATE UNIQUE INDEX fileindex_pathnode ON fileindex (pathid, node)',
165 165 # Provide a view over all file data for convenience.
166 166 'CREATE VIEW filedata AS '
167 167 'SELECT '
168 168 ' fileindex.id AS id, '
169 169 ' filepath.id AS pathid, '
170 170 ' filepath.path AS path, '
171 171 ' fileindex.revnum AS revnum, '
172 172 ' fileindex.node AS node, '
173 173 ' fileindex.p1rev AS p1rev, '
174 174 ' fileindex.p2rev AS p2rev, '
175 175 ' fileindex.linkrev AS linkrev, '
176 176 ' fileindex.flags AS flags, '
177 177 ' fileindex.deltaid AS deltaid, '
178 178 ' fileindex.deltabaseid AS deltabaseid '
179 179 'FROM filepath, fileindex '
180 180 'WHERE fileindex.pathid=filepath.id',
181 181 'PRAGMA user_version=%d' % CURRENT_SCHEMA_VERSION,
182 182 ]
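
# Illustrative example (row ids are made up) of how the schema above encodes
# a delta chain. Assume a file with two revisions, where revision 1 is stored
# as a delta against revision 0:
#
#   fileindex: id=1 revnum=0 deltaid=10 deltabaseid=NULL
#              id=2 revnum=1 deltaid=11 deltabaseid=1
#   delta:     id=10 -> full text of revision 0
#              id=11 -> delta producing revision 1 from revision 0
#
# Reconstructing revision 1 means following deltabaseid back to the
# NULL-terminated root, taking that row's blob as the base text, and applying
# the remaining deltas in order, which is what resolvedeltachain() below does
# with a recursive query.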
183 183
184 184
185 185 def resolvedeltachain(db, pathid, node, revisioncache, stoprids, zstddctx=None):
186 186 """Resolve a delta chain for a file node."""
187 187
188 188 # TODO the "not in ({stops})" here is possibly slowing down the query
189 189 # because it needs to perform the lookup on every recursive invocation.
190 190 # This could possibly be faster if we created a temporary query with
191 191 # baseid "poisoned" to null and limited the recursive filter to
192 192 # "is not null".
193 193 res = db.execute(
194 194 'WITH RECURSIVE '
195 195 ' deltachain(deltaid, baseid) AS ('
196 196 ' SELECT deltaid, deltabaseid FROM fileindex '
197 197 ' WHERE pathid=? AND node=? '
198 198 ' UNION ALL '
199 199 ' SELECT fileindex.deltaid, deltabaseid '
200 200 ' FROM fileindex, deltachain '
201 201 ' WHERE '
202 202 ' fileindex.id=deltachain.baseid '
203 203 ' AND deltachain.baseid IS NOT NULL '
204 204 ' AND fileindex.id NOT IN ({stops}) '
205 205 ' ) '
206 206 'SELECT deltachain.baseid, compression, delta '
207 207 'FROM deltachain, delta '
208 208 'WHERE delta.id=deltachain.deltaid'.format(
209 209 stops=','.join(['?'] * len(stoprids))
210 210 ),
211 211 tuple([pathid, node] + list(stoprids.keys())),
212 212 )
213 213
214 214 deltas = []
215 215 lastdeltabaseid = None
216 216
217 217 for deltabaseid, compression, delta in res:
218 218 lastdeltabaseid = deltabaseid
219 219
220 220 if compression == COMPRESSION_ZSTD:
221 221 delta = zstddctx.decompress(delta)
222 222 elif compression == COMPRESSION_NONE:
223 223 delta = delta
224 224 elif compression == COMPRESSION_ZLIB:
225 225 delta = zlib.decompress(delta)
226 226 else:
227 227 raise SQLiteStoreError(
228 228 b'unhandled compression type: %d' % compression
229 229 )
230 230
231 231 deltas.append(delta)
232 232
233 233 if lastdeltabaseid in stoprids:
234 234 basetext = revisioncache[stoprids[lastdeltabaseid]]
235 235 else:
236 236 basetext = deltas.pop()
237 237
238 238 deltas.reverse()
239 239 fulltext = mdiff.patches(basetext, deltas)
240 240
241 241 # SQLite returns buffer instances for blob columns on Python 2. This
242 242 # type can propagate through the delta application layer. Because
243 243 # downstream callers assume revisions are bytes, cast as needed.
244 244 if not isinstance(fulltext, bytes):
245 245 fulltext = bytes(fulltext)
246 246
247 247 return fulltext
248 248
249 249
250 250 def insertdelta(db, compression, hash, delta):
251 251 try:
252 252 return db.execute(
253 253 'INSERT INTO delta (compression, hash, delta) VALUES (?, ?, ?)',
254 254 (compression, hash, delta),
255 255 ).lastrowid
256 256 except sqlite3.IntegrityError:
257 257 return db.execute(
258 258 'SELECT id FROM delta WHERE hash=?', (hash,)
259 259 ).fetchone()[0]
260 260
261 261
262 262 class SQLiteStoreError(error.StorageError):
263 263 pass
264 264
265 265
266 266 @attr.s
267 267 class revisionentry:
268 268 rid = attr.ib()
269 269 rev = attr.ib()
270 270 node = attr.ib()
271 271 p1rev = attr.ib()
272 272 p2rev = attr.ib()
273 273 p1node = attr.ib()
274 274 p2node = attr.ib()
275 275 linkrev = attr.ib()
276 276 flags = attr.ib()
277 277
278 278
279 279 @interfaceutil.implementer(repository.irevisiondelta)
280 280 @attr.s(slots=True)
281 281 class sqliterevisiondelta:
282 282 node = attr.ib()
283 283 p1node = attr.ib()
284 284 p2node = attr.ib()
285 285 basenode = attr.ib()
286 286 flags = attr.ib()
287 287 baserevisionsize = attr.ib()
288 288 revision = attr.ib()
289 289 delta = attr.ib()
290 290 sidedata = attr.ib()
291 291 protocol_flags = attr.ib()
292 292 linknode = attr.ib(default=None)
293 293
294 294
295 295 @interfaceutil.implementer(repository.iverifyproblem)
296 296 @attr.s(frozen=True)
297 297 class sqliteproblem:
298 298 warning = attr.ib(default=None)
299 299 error = attr.ib(default=None)
300 300 node = attr.ib(default=None)
301 301
302 302
303 303 @interfaceutil.implementer(repository.ifilestorage)
304 304 class sqlitefilestore:
305 305 """Implements storage for an individual tracked path."""
306 306
307 307 def __init__(self, db, path, compression):
308 308 self.nullid = sha1nodeconstants.nullid
309 309 self._db = db
310 310 self._path = path
311 311
312 312 self._pathid = None
313 313
314 314 # revnum -> node
315 315 self._revtonode = {}
316 316 # node -> revnum
317 317 self._nodetorev = {}
318 318 # node -> data structure
319 319 self._revisions = {}
320 320
321 321 self._revisioncache = util.lrucachedict(10)
322 322
323 323 self._compengine = compression
324 324
325 325 if compression == b'zstd':
326 326 self._cctx = zstd.ZstdCompressor(level=3)
327 327 self._dctx = zstd.ZstdDecompressor()
328 328 else:
329 329 self._cctx = None
330 330 self._dctx = None
331 331
332 332 self._refreshindex()
333 333
334 334 def _refreshindex(self):
335 335 self._revtonode = {}
336 336 self._nodetorev = {}
337 337 self._revisions = {}
338 338
339 339 res = list(
340 340 self._db.execute(
341 341 'SELECT id FROM filepath WHERE path=?', (self._path,)
342 342 )
343 343 )
344 344
345 345 if not res:
346 346 self._pathid = None
347 347 return
348 348
349 349 self._pathid = res[0][0]
350 350
351 351 res = self._db.execute(
352 352 'SELECT id, revnum, node, p1rev, p2rev, linkrev, flags '
353 353 'FROM fileindex '
354 354 'WHERE pathid=? '
355 355 'ORDER BY revnum ASC',
356 356 (self._pathid,),
357 357 )
358 358
359 359 for i, row in enumerate(res):
360 360 rid, rev, node, p1rev, p2rev, linkrev, flags = row
361 361
362 362 if i != rev:
363 363 raise SQLiteStoreError(
364 364 _(b'sqlite database has inconsistent revision numbers')
365 365 )
366 366
367 367 if p1rev == nullrev:
368 368 p1node = sha1nodeconstants.nullid
369 369 else:
370 370 p1node = self._revtonode[p1rev]
371 371
372 372 if p2rev == nullrev:
373 373 p2node = sha1nodeconstants.nullid
374 374 else:
375 375 p2node = self._revtonode[p2rev]
376 376
377 377 entry = revisionentry(
378 378 rid=rid,
379 379 rev=rev,
380 380 node=node,
381 381 p1rev=p1rev,
382 382 p2rev=p2rev,
383 383 p1node=p1node,
384 384 p2node=p2node,
385 385 linkrev=linkrev,
386 386 flags=flags,
387 387 )
388 388
389 389 self._revtonode[rev] = node
390 390 self._nodetorev[node] = rev
391 391 self._revisions[node] = entry
392 392
393 393 # Start of ifileindex interface.
394 394
395 395 def __len__(self):
396 396 return len(self._revisions)
397 397
398 398 def __iter__(self):
399 399 return iter(range(len(self._revisions)))
400 400
401 401 def hasnode(self, node):
402 402 if node == sha1nodeconstants.nullid:
403 403 return False
404 404
405 405 return node in self._nodetorev
406 406
407 407 def revs(self, start=0, stop=None):
408 408 return storageutil.iterrevs(
409 409 len(self._revisions), start=start, stop=stop
410 410 )
411 411
412 412 def parents(self, node):
413 413 if node == sha1nodeconstants.nullid:
414 414 return sha1nodeconstants.nullid, sha1nodeconstants.nullid
415 415
416 416 if node not in self._revisions:
417 417 raise error.LookupError(node, self._path, _(b'no node'))
418 418
419 419 entry = self._revisions[node]
420 420 return entry.p1node, entry.p2node
421 421
422 422 def parentrevs(self, rev):
423 423 if rev == nullrev:
424 424 return nullrev, nullrev
425 425
426 426 if rev not in self._revtonode:
427 427 raise IndexError(rev)
428 428
429 429 entry = self._revisions[self._revtonode[rev]]
430 430 return entry.p1rev, entry.p2rev
431 431
432 def ancestors(self, revs, stoprev=0, inclusive=False):
433 """Generate the ancestors of 'revs' in reverse revision order.
434 Does not generate revs lower than stoprev.
435
436 See the documentation for ancestor.lazyancestors for more details."""
437
438 # first, make sure start revisions aren't filtered
439 revs = list(revs)
440 checkrev = self.node
441 for r in revs:
442 checkrev(r)
443
444 return ancestor.lazyancestors(
445 self.parentrevs,
446 revs,
447 stoprev=stoprev,
448 inclusive=inclusive,
449 )
450
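    # Illustrative usage of the new ancestors() method (a sketch; the file
    # handle below is hypothetical and would normally come from
    # repo.file(path) on a repository using this backend):
    #
    #   store = repo.file(b'foo')
    #   for rev in store.ancestors([len(store) - 1], inclusive=True):
    #       ...  # revisions are yielded in reverse revision order
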
432 451 def rev(self, node):
433 452 if node == sha1nodeconstants.nullid:
434 453 return nullrev
435 454
436 455 if node not in self._nodetorev:
437 456 raise error.LookupError(node, self._path, _(b'no node'))
438 457
439 458 return self._nodetorev[node]
440 459
441 460 def node(self, rev):
442 461 if rev == nullrev:
443 462 return sha1nodeconstants.nullid
444 463
445 464 if rev not in self._revtonode:
446 465 raise IndexError(rev)
447 466
448 467 return self._revtonode[rev]
449 468
450 469 def lookup(self, node):
451 470 return storageutil.fileidlookup(self, node, self._path)
452 471
453 472 def linkrev(self, rev):
454 473 if rev == nullrev:
455 474 return nullrev
456 475
457 476 if rev not in self._revtonode:
458 477 raise IndexError(rev)
459 478
460 479 entry = self._revisions[self._revtonode[rev]]
461 480 return entry.linkrev
462 481
463 482 def iscensored(self, rev):
464 483 if rev == nullrev:
465 484 return False
466 485
467 486 if rev not in self._revtonode:
468 487 raise IndexError(rev)
469 488
470 489 return self._revisions[self._revtonode[rev]].flags & FLAG_CENSORED
471 490
472 491 def commonancestorsheads(self, node1, node2):
473 492 rev1 = self.rev(node1)
474 493 rev2 = self.rev(node2)
475 494
476 495 ancestors = ancestor.commonancestorsheads(self.parentrevs, rev1, rev2)
477 496 return pycompat.maplist(self.node, ancestors)
478 497
479 498 def descendants(self, revs):
480 499 # TODO we could implement this using a recursive SQL query, which
481 500 # might be faster.
482 501 return dagop.descendantrevs(revs, self.revs, self.parentrevs)
483 502
484 503 def heads(self, start=None, stop=None):
485 504 if start is None and stop is None:
486 505 if not len(self):
487 506 return [sha1nodeconstants.nullid]
488 507
489 508 startrev = self.rev(start) if start is not None else nullrev
490 509 stoprevs = {self.rev(n) for n in stop or []}
491 510
492 511 revs = dagop.headrevssubset(
493 512 self.revs, self.parentrevs, startrev=startrev, stoprevs=stoprevs
494 513 )
495 514
496 515 return [self.node(rev) for rev in revs]
497 516
498 517 def children(self, node):
499 518 rev = self.rev(node)
500 519
501 520 res = self._db.execute(
502 521 'SELECT'
503 522 ' node '
504 523 ' FROM filedata '
505 524 ' WHERE path=? AND (p1rev=? OR p2rev=?) '
506 525 ' ORDER BY revnum ASC',
507 526 (self._path, rev, rev),
508 527 )
509 528
510 529 return [row[0] for row in res]
511 530
512 531 # End of ifileindex interface.
513 532
514 533 # Start of ifiledata interface.
515 534
516 535 def size(self, rev):
517 536 if rev == nullrev:
518 537 return 0
519 538
520 539 if rev not in self._revtonode:
521 540 raise IndexError(rev)
522 541
523 542 node = self._revtonode[rev]
524 543
525 544 if self.renamed(node):
526 545 return len(self.read(node))
527 546
528 547 return len(self.revision(node))
529 548
530 549 def revision(self, node, raw=False, _verifyhash=True):
531 550 if node in (sha1nodeconstants.nullid, nullrev):
532 551 return b''
533 552
534 553 if isinstance(node, int):
535 554 node = self.node(node)
536 555
537 556 if node not in self._nodetorev:
538 557 raise error.LookupError(node, self._path, _(b'no node'))
539 558
540 559 if node in self._revisioncache:
541 560 return self._revisioncache[node]
542 561
543 562 # Because we have a fulltext revision cache, we are able to
544 563 # short-circuit delta chain traversal and decompression as soon as
545 564 # we encounter a revision in the cache.
546 565
547 566 stoprids = {self._revisions[n].rid: n for n in self._revisioncache}
548 567
549 568 if not stoprids:
550 569 stoprids[-1] = None
551 570
552 571 fulltext = resolvedeltachain(
553 572 self._db,
554 573 self._pathid,
555 574 node,
556 575 self._revisioncache,
557 576 stoprids,
558 577 zstddctx=self._dctx,
559 578 )
560 579
561 580 # Don't verify hashes if parent nodes were rewritten, as the hash
562 581 # wouldn't verify.
563 582 if self._revisions[node].flags & (FLAG_MISSING_P1 | FLAG_MISSING_P2):
564 583 _verifyhash = False
565 584
566 585 if _verifyhash:
567 586 self._checkhash(fulltext, node)
568 587 self._revisioncache[node] = fulltext
569 588
570 589 return fulltext
571 590
572 591 def rawdata(self, *args, **kwargs):
573 592 return self.revision(*args, **kwargs)
574 593
575 594 def read(self, node):
576 595 return storageutil.filtermetadata(self.revision(node))
577 596
578 597 def renamed(self, node):
579 598 return storageutil.filerevisioncopied(self, node)
580 599
581 600 def cmp(self, node, fulltext):
582 601 return not storageutil.filedataequivalent(self, node, fulltext)
583 602
584 603 def emitrevisions(
585 604 self,
586 605 nodes,
587 606 nodesorder=None,
588 607 revisiondata=False,
589 608 assumehaveparentrevisions=False,
590 609 deltamode=repository.CG_DELTAMODE_STD,
591 610 sidedata_helpers=None,
592 611 debug_info=None,
593 612 ):
594 613 if nodesorder not in (b'nodes', b'storage', b'linear', None):
595 614 raise error.ProgrammingError(
596 615 b'unhandled value for nodesorder: %s' % nodesorder
597 616 )
598 617
599 618 nodes = [n for n in nodes if n != sha1nodeconstants.nullid]
600 619
601 620 if not nodes:
602 621 return
603 622
604 623 # TODO perform in a single query.
605 624 res = self._db.execute(
606 625 'SELECT revnum, deltaid FROM fileindex '
607 626 'WHERE pathid=? '
608 627 ' AND node in (%s)' % (','.join(['?'] * len(nodes))),
609 628 tuple([self._pathid] + nodes),
610 629 )
611 630
612 631 deltabases = {}
613 632
614 633 for rev, deltaid in res:
615 634 res = self._db.execute(
616 635 'SELECT revnum from fileindex WHERE pathid=? AND deltaid=?',
617 636 (self._pathid, deltaid),
618 637 )
619 638 deltabases[rev] = res.fetchone()[0]
620 639
621 640 # TODO define revdifffn so we can use delta from storage.
622 641 for delta in storageutil.emitrevisions(
623 642 self,
624 643 nodes,
625 644 nodesorder,
626 645 sqliterevisiondelta,
627 646 deltaparentfn=deltabases.__getitem__,
628 647 revisiondata=revisiondata,
629 648 assumehaveparentrevisions=assumehaveparentrevisions,
630 649 deltamode=deltamode,
631 650 sidedata_helpers=sidedata_helpers,
632 651 ):
633 652
634 653 yield delta
635 654
636 655 # End of ifiledata interface.
637 656
638 657 # Start of ifilemutation interface.
639 658
640 659 def add(self, filedata, meta, transaction, linkrev, p1, p2):
641 660 if meta or filedata.startswith(b'\x01\n'):
642 661 filedata = storageutil.packmeta(meta, filedata)
643 662
644 663 rev = self.addrevision(filedata, transaction, linkrev, p1, p2)
645 664 return self.node(rev)
646 665
647 666 def addrevision(
648 667 self,
649 668 revisiondata,
650 669 transaction,
651 670 linkrev,
652 671 p1,
653 672 p2,
654 673 node=None,
655 674 flags=0,
656 675 cachedelta=None,
657 676 ):
658 677 if flags:
659 678 raise SQLiteStoreError(_(b'flags not supported on revisions'))
660 679
661 680 validatehash = node is not None
662 681 node = node or storageutil.hashrevisionsha1(revisiondata, p1, p2)
663 682
664 683 if validatehash:
665 684 self._checkhash(revisiondata, node, p1, p2)
666 685
667 686 rev = self._nodetorev.get(node)
668 687 if rev is not None:
669 688 return rev
670 689
671 690 rev = self._addrawrevision(
672 691 node, revisiondata, transaction, linkrev, p1, p2
673 692 )
674 693
675 694 self._revisioncache[node] = revisiondata
676 695 return rev
677 696
678 697 def addgroup(
679 698 self,
680 699 deltas,
681 700 linkmapper,
682 701 transaction,
683 702 addrevisioncb=None,
684 703 duplicaterevisioncb=None,
685 704 maybemissingparents=False,
686 705 ):
687 706 empty = True
688 707
689 708 for (
690 709 node,
691 710 p1,
692 711 p2,
693 712 linknode,
694 713 deltabase,
695 714 delta,
696 715 wireflags,
697 716 sidedata,
698 717 ) in deltas:
699 718 storeflags = 0
700 719
701 720 if wireflags & repository.REVISION_FLAG_CENSORED:
702 721 storeflags |= FLAG_CENSORED
703 722
704 723 if wireflags & ~repository.REVISION_FLAG_CENSORED:
705 724 raise SQLiteStoreError(b'unhandled revision flag')
706 725
707 726 if maybemissingparents:
708 727 if p1 != sha1nodeconstants.nullid and not self.hasnode(p1):
709 728 p1 = sha1nodeconstants.nullid
710 729 storeflags |= FLAG_MISSING_P1
711 730
712 731 if p2 != sha1nodeconstants.nullid and not self.hasnode(p2):
713 732 p2 = sha1nodeconstants.nullid
714 733 storeflags |= FLAG_MISSING_P2
715 734
716 735 baserev = self.rev(deltabase)
717 736
718 737 # If base is censored, delta must be full replacement in a single
719 738 # patch operation.
720 739 if baserev != nullrev and self.iscensored(baserev):
721 740 hlen = struct.calcsize(b'>lll')
722 741 oldlen = len(self.rawdata(deltabase, _verifyhash=False))
723 742 newlen = len(delta) - hlen
724 743
725 744 if delta[:hlen] != mdiff.replacediffheader(oldlen, newlen):
726 745 raise error.CensoredBaseError(self._path, deltabase)
727 746
728 747 if not (storeflags & FLAG_CENSORED) and storageutil.deltaiscensored(
729 748 delta, baserev, lambda x: len(self.rawdata(x))
730 749 ):
731 750 storeflags |= FLAG_CENSORED
732 751
733 752 linkrev = linkmapper(linknode)
734 753
735 754 if node in self._revisions:
736 755 # Possibly reset parents to make them proper.
737 756 entry = self._revisions[node]
738 757
739 758 if (
740 759 entry.flags & FLAG_MISSING_P1
741 760 and p1 != sha1nodeconstants.nullid
742 761 ):
743 762 entry.p1node = p1
744 763 entry.p1rev = self._nodetorev[p1]
745 764 entry.flags &= ~FLAG_MISSING_P1
746 765
747 766 self._db.execute(
748 767 'UPDATE fileindex SET p1rev=?, flags=? WHERE id=?',
749 768 (self._nodetorev[p1], entry.flags, entry.rid),
750 769 )
751 770
752 771 if (
753 772 entry.flags & FLAG_MISSING_P2
754 773 and p2 != sha1nodeconstants.nullid
755 774 ):
756 775 entry.p2node = p2
757 776 entry.p2rev = self._nodetorev[p2]
758 777 entry.flags &= ~FLAG_MISSING_P2
759 778
760 779 self._db.execute(
761 780 'UPDATE fileindex SET p2rev=?, flags=? WHERE id=?',
762 781 (self._nodetorev[p2], entry.flags, entry.rid),
763 782 )
764 783
765 784 if duplicaterevisioncb:
766 785 duplicaterevisioncb(self, self.rev(node))
767 786 empty = False
768 787 continue
769 788
770 789 if deltabase == sha1nodeconstants.nullid:
771 790 text = mdiff.patch(b'', delta)
772 791 storedelta = None
773 792 else:
774 793 text = None
775 794 storedelta = (deltabase, delta)
776 795
777 796 rev = self._addrawrevision(
778 797 node,
779 798 text,
780 799 transaction,
781 800 linkrev,
782 801 p1,
783 802 p2,
784 803 storedelta=storedelta,
785 804 flags=storeflags,
786 805 )
787 806
788 807 if addrevisioncb:
789 808 addrevisioncb(self, rev)
790 809 empty = False
791 810
792 811 return not empty
793 812
794 813 def censorrevision(self, tr, censornode, tombstone=b''):
795 814 tombstone = storageutil.packmeta({b'censored': tombstone}, b'')
796 815
797 816 # This restriction is cargo culted from revlogs and makes no sense for
798 817 # SQLite, since columns can be resized at will.
799 818 if len(tombstone) > len(self.rawdata(censornode)):
800 819 raise error.Abort(
801 820 _(b'censor tombstone must be no longer than censored data')
802 821 )
803 822
804 823 # We need to replace the censored revision's data with the tombstone.
805 824 # But replacing that data will have implications for delta chains that
806 825 # reference it.
807 826 #
808 827 # While "better," more complex strategies are possible, we do something
809 828 # simple: we find delta chain children of the censored revision and we
810 829 # replace those incremental deltas with fulltexts of their corresponding
811 830 # revision. Then we delete the now-unreferenced delta and original
812 831 # revision and insert a replacement.
813 832
814 833 # Find the delta to be censored.
815 834 censoreddeltaid = self._db.execute(
816 835 'SELECT deltaid FROM fileindex WHERE id=?',
817 836 (self._revisions[censornode].rid,),
818 837 ).fetchone()[0]
819 838
820 839 # Find all its delta chain children.
821 840 # TODO once we support storing deltas for !files, we'll need to look
822 841 # for those delta chains too.
823 842 rows = list(
824 843 self._db.execute(
825 844 'SELECT id, pathid, node FROM fileindex '
826 845 'WHERE deltabaseid=? OR deltaid=?',
827 846 (censoreddeltaid, censoreddeltaid),
828 847 )
829 848 )
830 849
831 850 for row in rows:
832 851 rid, pathid, node = row
833 852
834 853 fulltext = resolvedeltachain(
835 854 self._db, pathid, node, {}, {-1: None}, zstddctx=self._dctx
836 855 )
837 856
838 857 deltahash = hashutil.sha1(fulltext).digest()
839 858
840 859 if self._compengine == b'zstd':
841 860 deltablob = self._cctx.compress(fulltext)
842 861 compression = COMPRESSION_ZSTD
843 862 elif self._compengine == b'zlib':
844 863 deltablob = zlib.compress(fulltext)
845 864 compression = COMPRESSION_ZLIB
846 865 elif self._compengine == b'none':
847 866 deltablob = fulltext
848 867 compression = COMPRESSION_NONE
849 868 else:
850 869 raise error.ProgrammingError(
851 870 b'unhandled compression engine: %s' % self._compengine
852 871 )
853 872
854 873 if len(deltablob) >= len(fulltext):
855 874 deltablob = fulltext
856 875 compression = COMPRESSION_NONE
857 876
858 877 deltaid = insertdelta(self._db, compression, deltahash, deltablob)
859 878
860 879 self._db.execute(
861 880 'UPDATE fileindex SET deltaid=?, deltabaseid=NULL '
862 881 'WHERE id=?',
863 882 (deltaid, rid),
864 883 )
865 884
866 885 # Now create the tombstone delta and replace the delta on the censored
867 886 # node.
868 887 deltahash = hashutil.sha1(tombstone).digest()
869 888 tombstonedeltaid = insertdelta(
870 889 self._db, COMPRESSION_NONE, deltahash, tombstone
871 890 )
872 891
873 892 flags = self._revisions[censornode].flags
874 893 flags |= FLAG_CENSORED
875 894
876 895 self._db.execute(
877 896 'UPDATE fileindex SET flags=?, deltaid=?, deltabaseid=NULL '
878 897 'WHERE pathid=? AND node=?',
879 898 (flags, tombstonedeltaid, self._pathid, censornode),
880 899 )
881 900
882 901 self._db.execute('DELETE FROM delta WHERE id=?', (censoreddeltaid,))
883 902
884 903 self._refreshindex()
885 904 self._revisioncache.clear()
886 905
887 906 def getstrippoint(self, minlink):
888 907 return storageutil.resolvestripinfo(
889 908 minlink,
890 909 len(self) - 1,
891 910 [self.rev(n) for n in self.heads()],
892 911 self.linkrev,
893 912 self.parentrevs,
894 913 )
895 914
896 915 def strip(self, minlink, transaction):
897 916 if not len(self):
898 917 return
899 918
900 919 rev, _ignored = self.getstrippoint(minlink)
901 920
902 921 if rev == len(self):
903 922 return
904 923
905 924 for rev in self.revs(rev):
906 925 self._db.execute(
907 926 'DELETE FROM fileindex WHERE pathid=? AND node=?',
908 927 (self._pathid, self.node(rev)),
909 928 )
910 929
911 930 # TODO how should we garbage collect data in delta table?
912 931
913 932 self._refreshindex()
914 933
915 934 # End of ifilemutation interface.
916 935
917 936 # Start of ifilestorage interface.
918 937
919 938 def files(self):
920 939 return []
921 940
922 941 def sidedata(self, nodeorrev, _df=None):
923 942 # Not supported for now
924 943 return {}
925 944
926 945 def storageinfo(
927 946 self,
928 947 exclusivefiles=False,
929 948 sharedfiles=False,
930 949 revisionscount=False,
931 950 trackedsize=False,
932 951 storedsize=False,
933 952 ):
934 953 d = {}
935 954
936 955 if exclusivefiles:
937 956 d[b'exclusivefiles'] = []
938 957
939 958 if sharedfiles:
940 959 # TODO list sqlite file(s) here.
941 960 d[b'sharedfiles'] = []
942 961
943 962 if revisionscount:
944 963 d[b'revisionscount'] = len(self)
945 964
946 965 if trackedsize:
947 966 d[b'trackedsize'] = sum(
948 967 len(self.revision(node)) for node in self._nodetorev
949 968 )
950 969
951 970 if storedsize:
952 971 # TODO implement this?
953 972 d[b'storedsize'] = None
954 973
955 974 return d
956 975
957 976 def verifyintegrity(self, state):
958 977 state[b'skipread'] = set()
959 978
960 979 for rev in self:
961 980 node = self.node(rev)
962 981
963 982 try:
964 983 self.revision(node)
965 984 except Exception as e:
966 985 yield sqliteproblem(
967 986 error=_(b'unpacking %s: %s') % (short(node), e), node=node
968 987 )
969 988
970 989 state[b'skipread'].add(node)
971 990
972 991 # End of ifilestorage interface.
973 992
974 993 def _checkhash(self, fulltext, node, p1=None, p2=None):
975 994 if p1 is None and p2 is None:
976 995 p1, p2 = self.parents(node)
977 996
978 997 if node == storageutil.hashrevisionsha1(fulltext, p1, p2):
979 998 return
980 999
981 1000 try:
982 1001 del self._revisioncache[node]
983 1002 except KeyError:
984 1003 pass
985 1004
986 1005 if storageutil.iscensoredtext(fulltext):
987 1006 raise error.CensoredNodeError(self._path, node, fulltext)
988 1007
989 1008 raise SQLiteStoreError(_(b'integrity check failed on %s') % self._path)
990 1009
991 1010 def _addrawrevision(
992 1011 self,
993 1012 node,
994 1013 revisiondata,
995 1014 transaction,
996 1015 linkrev,
997 1016 p1,
998 1017 p2,
999 1018 storedelta=None,
1000 1019 flags=0,
1001 1020 ):
1002 1021 if self._pathid is None:
1003 1022 res = self._db.execute(
1004 1023 'INSERT INTO filepath (path) VALUES (?)', (self._path,)
1005 1024 )
1006 1025 self._pathid = res.lastrowid
1007 1026
1008 1027 # For simplicity, always store a delta against p1.
1009 1028 # TODO we need a lot more logic here to make behavior reasonable.
1010 1029
1011 1030 if storedelta:
1012 1031 deltabase, delta = storedelta
1013 1032
1014 1033 if isinstance(deltabase, int):
1015 1034 deltabase = self.node(deltabase)
1016 1035
1017 1036 else:
1018 1037 assert revisiondata is not None
1019 1038 deltabase = p1
1020 1039
1021 1040 if deltabase == sha1nodeconstants.nullid:
1022 1041 delta = revisiondata
1023 1042 else:
1024 1043 delta = mdiff.textdiff(
1025 1044 self.revision(self.rev(deltabase)), revisiondata
1026 1045 )
1027 1046
1028 1047 # File index stores a pointer to its delta and the parent delta.
1029 1048 # The parent delta is stored via a pointer to the fileindex PK.
1030 1049 if deltabase == sha1nodeconstants.nullid:
1031 1050 baseid = None
1032 1051 else:
1033 1052 baseid = self._revisions[deltabase].rid
1034 1053
1035 1054 # Deltas are stored with a hash of their content. This allows
1036 1055 # us to de-duplicate. The table is configured to ignore conflicts
1037 1056 # and it is faster to just insert and silently noop than to look
1038 1057 # first.
1039 1058 deltahash = hashutil.sha1(delta).digest()
1040 1059
1041 1060 if self._compengine == b'zstd':
1042 1061 deltablob = self._cctx.compress(delta)
1043 1062 compression = COMPRESSION_ZSTD
1044 1063 elif self._compengine == b'zlib':
1045 1064 deltablob = zlib.compress(delta)
1046 1065 compression = COMPRESSION_ZLIB
1047 1066 elif self._compengine == b'none':
1048 1067 deltablob = delta
1049 1068 compression = COMPRESSION_NONE
1050 1069 else:
1051 1070 raise error.ProgrammingError(
1052 1071 b'unhandled compression engine: %s' % self._compengine
1053 1072 )
1054 1073
1055 1074 # Don't store compressed data if it isn't practical.
1056 1075 if len(deltablob) >= len(delta):
1057 1076 deltablob = delta
1058 1077 compression = COMPRESSION_NONE
1059 1078
1060 1079 deltaid = insertdelta(self._db, compression, deltahash, deltablob)
1061 1080
1062 1081 rev = len(self)
1063 1082
1064 1083 if p1 == sha1nodeconstants.nullid:
1065 1084 p1rev = nullrev
1066 1085 else:
1067 1086 p1rev = self._nodetorev[p1]
1068 1087
1069 1088 if p2 == sha1nodeconstants.nullid:
1070 1089 p2rev = nullrev
1071 1090 else:
1072 1091 p2rev = self._nodetorev[p2]
1073 1092
1074 1093 rid = self._db.execute(
1075 1094 'INSERT INTO fileindex ('
1076 1095 ' pathid, revnum, node, p1rev, p2rev, linkrev, flags, '
1077 1096 ' deltaid, deltabaseid) '
1078 1097 ' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
1079 1098 (
1080 1099 self._pathid,
1081 1100 rev,
1082 1101 node,
1083 1102 p1rev,
1084 1103 p2rev,
1085 1104 linkrev,
1086 1105 flags,
1087 1106 deltaid,
1088 1107 baseid,
1089 1108 ),
1090 1109 ).lastrowid
1091 1110
1092 1111 entry = revisionentry(
1093 1112 rid=rid,
1094 1113 rev=rev,
1095 1114 node=node,
1096 1115 p1rev=p1rev,
1097 1116 p2rev=p2rev,
1098 1117 p1node=p1,
1099 1118 p2node=p2,
1100 1119 linkrev=linkrev,
1101 1120 flags=flags,
1102 1121 )
1103 1122
1104 1123 self._nodetorev[node] = rev
1105 1124 self._revtonode[rev] = node
1106 1125 self._revisions[node] = entry
1107 1126
1108 1127 return rev
1109 1128
1110 1129
1111 1130 class sqliterepository(localrepo.localrepository):
1112 1131 def cancopy(self):
1113 1132 return False
1114 1133
1115 1134 def transaction(self, *args, **kwargs):
1116 1135 current = self.currenttransaction()
1117 1136
1118 1137 tr = super(sqliterepository, self).transaction(*args, **kwargs)
1119 1138
1120 1139 if current:
1121 1140 return tr
1122 1141
1123 1142 self._dbconn.execute('BEGIN TRANSACTION')
1124 1143
1125 1144 def committransaction(_):
1126 1145 self._dbconn.commit()
1127 1146
1128 1147 tr.addfinalize(b'sqlitestore', committransaction)
1129 1148
1130 1149 return tr
1131 1150
1132 1151 @property
1133 1152 def _dbconn(self):
1134 1153 # SQLite connections can only be used on the thread that created
1135 1154 # them. In most cases, this "just works." However, hgweb uses
1136 1155 # multiple threads.
1137 1156 tid = threading.current_thread().ident
1138 1157
1139 1158 if self._db:
1140 1159 if self._db[0] == tid:
1141 1160 return self._db[1]
1142 1161
1143 1162 db = makedb(self.svfs.join(b'db.sqlite'))
1144 1163 self._db = (tid, db)
1145 1164
1146 1165 return db
1147 1166
1148 1167
1149 1168 def makedb(path):
1150 1169 """Construct a database handle for a database at path."""
1151 1170
1152 1171 db = sqlite3.connect(encoding.strfromlocal(path))
1153 1172 db.text_factory = bytes
1154 1173
1155 1174 res = db.execute('PRAGMA user_version').fetchone()[0]
1156 1175
1157 1176 # New database.
1158 1177 if res == 0:
1159 1178 for statement in CREATE_SCHEMA:
1160 1179 db.execute(statement)
1161 1180
1162 1181 db.commit()
1163 1182
1164 1183 elif res == CURRENT_SCHEMA_VERSION:
1165 1184 pass
1166 1185
1167 1186 else:
1168 1187 raise error.Abort(_(b'sqlite database has unrecognized version'))
1169 1188
1170 1189 db.execute('PRAGMA journal_mode=WAL')
1171 1190
1172 1191 return db
1173 1192
1174 1193
1175 1194 def featuresetup(ui, supported):
1176 1195 supported.add(REQUIREMENT)
1177 1196
1178 1197 if zstd:
1179 1198 supported.add(REQUIREMENT_ZSTD)
1180 1199
1181 1200 supported.add(REQUIREMENT_ZLIB)
1182 1201 supported.add(REQUIREMENT_NONE)
1183 1202 supported.add(REQUIREMENT_SHALLOW_FILES)
1184 1203 supported.add(requirements.NARROW_REQUIREMENT)
1185 1204
1186 1205
1187 1206 def newreporequirements(orig, ui, createopts):
1188 1207 if createopts[b'backend'] != b'sqlite':
1189 1208 return orig(ui, createopts)
1190 1209
1191 1210 # This restriction can be lifted once we have more confidence.
1192 1211 if b'sharedrepo' in createopts:
1193 1212 raise error.Abort(
1194 1213 _(b'shared repositories not supported with SQLite store')
1195 1214 )
1196 1215
1197 1216 # This filtering is out of an abundance of caution: we want to ensure
1198 1217 # we honor creation options and we do that by annotating exactly the
1199 1218 # creation options we recognize.
1200 1219 known = {
1201 1220 b'narrowfiles',
1202 1221 b'backend',
1203 1222 b'shallowfilestore',
1204 1223 }
1205 1224
1206 1225 unsupported = set(createopts) - known
1207 1226 if unsupported:
1208 1227 raise error.Abort(
1209 1228 _(b'SQLite store does not support repo creation option: %s')
1210 1229 % b', '.join(sorted(unsupported))
1211 1230 )
1212 1231
1213 1232 # Since we're a hybrid store that still relies on revlogs, we fall back
1214 1233 # to using the revlogv1 backend's storage requirements then adding our
1215 1234 # own requirement.
1216 1235 createopts[b'backend'] = b'revlogv1'
1217 1236 requirements = orig(ui, createopts)
1218 1237 requirements.add(REQUIREMENT)
1219 1238
1220 1239 compression = ui.config(b'storage', b'sqlite.compression')
1221 1240
1222 1241 if compression == b'zstd' and not zstd:
1223 1242 raise error.Abort(
1224 1243 _(
1225 1244 b'storage.sqlite.compression set to "zstd" but '
1226 1245 b'zstandard compression not available to this '
1227 1246 b'Mercurial install'
1228 1247 )
1229 1248 )
1230 1249
1231 1250 if compression == b'zstd':
1232 1251 requirements.add(REQUIREMENT_ZSTD)
1233 1252 elif compression == b'zlib':
1234 1253 requirements.add(REQUIREMENT_ZLIB)
1235 1254 elif compression == b'none':
1236 1255 requirements.add(REQUIREMENT_NONE)
1237 1256 else:
1238 1257 raise error.Abort(
1239 1258 _(
1240 1259 b'unknown compression engine defined in '
1241 1260 b'storage.sqlite.compression: %s'
1242 1261 )
1243 1262 % compression
1244 1263 )
1245 1264
1246 1265 if createopts.get(b'shallowfilestore'):
1247 1266 requirements.add(REQUIREMENT_SHALLOW_FILES)
1248 1267
1249 1268 return requirements
1250 1269
1251 1270
1252 1271 @interfaceutil.implementer(repository.ilocalrepositoryfilestorage)
1253 1272 class sqlitefilestorage:
1254 1273 """Repository file storage backed by SQLite."""
1255 1274
1256 1275 def file(self, path):
1257 1276 if path[0] == b'/':
1258 1277 path = path[1:]
1259 1278
1260 1279 if REQUIREMENT_ZSTD in self.requirements:
1261 1280 compression = b'zstd'
1262 1281 elif REQUIREMENT_ZLIB in self.requirements:
1263 1282 compression = b'zlib'
1264 1283 elif REQUIREMENT_NONE in self.requirements:
1265 1284 compression = b'none'
1266 1285 else:
1267 1286 raise error.Abort(
1268 1287 _(
1269 1288 b'unable to determine what compression engine '
1270 1289 b'to use for SQLite storage'
1271 1290 )
1272 1291 )
1273 1292
1274 1293 return sqlitefilestore(self._dbconn, path, compression)
1275 1294
1276 1295
1277 1296 def makefilestorage(orig, requirements, features, **kwargs):
1278 1297 """Produce a type conforming to ``ilocalrepositoryfilestorage``."""
1279 1298 if REQUIREMENT in requirements:
1280 1299 if REQUIREMENT_SHALLOW_FILES in requirements:
1281 1300 features.add(repository.REPO_FEATURE_SHALLOW_FILE_STORAGE)
1282 1301
1283 1302 return sqlitefilestorage
1284 1303 else:
1285 1304 return orig(requirements=requirements, features=features, **kwargs)
1286 1305
1287 1306
1288 1307 def makemain(orig, ui, requirements, **kwargs):
1289 1308 if REQUIREMENT in requirements:
1290 1309 if REQUIREMENT_ZSTD in requirements and not zstd:
1291 1310 raise error.Abort(
1292 1311 _(
1293 1312 b'repository uses zstandard compression, which '
1294 1313 b'is not available to this Mercurial install'
1295 1314 )
1296 1315 )
1297 1316
1298 1317 return sqliterepository
1299 1318
1300 1319 return orig(requirements=requirements, **kwargs)
1301 1320
1302 1321
1303 1322 def verifierinit(orig, self, *args, **kwargs):
1304 1323 orig(self, *args, **kwargs)
1305 1324
1306 1325 # We don't care that files in the store don't align with what is
1307 1326 # advertised. So suppress these warnings.
1308 1327 self.warnorphanstorefiles = False
1309 1328
1310 1329
1311 1330 def extsetup(ui):
1312 1331 localrepo.featuresetupfuncs.add(featuresetup)
1313 1332 extensions.wrapfunction(
1314 1333 localrepo, b'newreporequirements', newreporequirements
1315 1334 )
1316 1335 extensions.wrapfunction(localrepo, b'makefilestorage', makefilestorage)
1317 1336 extensions.wrapfunction(localrepo, b'makemain', makemain)
1318 1337 extensions.wrapfunction(verify.verifier, b'__init__', verifierinit)
1319 1338
1320 1339
1321 1340 def reposetup(ui, repo):
1322 1341 if isinstance(repo, sqliterepository):
1323 1342 repo._db = None
1324 1343
1325 1344 # TODO check for bundlerepository?