sqlitestore: support for storing revisions without their parents...
Gregory Szorc
r40428:595641bd default
@@ -1,1134 +1,1169 @@
1 1 # sqlitestore.py - Storage backend that uses SQLite
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """store repository data in SQLite (EXPERIMENTAL)
9 9
10 10 The sqlitestore extension enables the storage of repository data in SQLite.
11 11
12 12 This extension is HIGHLY EXPERIMENTAL. There are NO BACKWARDS COMPATIBILITY
13 13 GUARANTEES. This means that repositories created with this extension may
14 14 only be usable with the exact version of this extension/Mercurial that was
15 15 used. The extension attempts to enforce this in order to prevent repository
16 16 corruption.
17 17
18 18 In addition, several features are not yet supported or have known bugs:
19 19
20 20 * Only some data is stored in SQLite. Changeset, manifest, and other repository
21 21 data is not yet stored in SQLite.
22 22 * Transactions are not robust. If the process is aborted at the right time
23 23 during transaction close/rollback, the repository could be in an inconsistent
24 24 state. This problem will diminish once all repository data is tracked by
25 25 SQLite.
26 26 * Bundle repositories do not work (the ability to use e.g.
27 27 `hg -R <bundle-file> log` to automatically overlay a bundle on top of the
28 28 existing repository).
29 29 * Various other features don't work.
30 30
31 31 This extension should work for basic clone/pull, update, and commit workflows.
32 32 Some history rewriting operations may fail due to lack of support for bundle
33 33 repositories.
34 34
35 35 To use, activate the extension and set the ``storage.new-repo-backend`` config
36 36 option to ``sqlite`` to enable new repositories to use SQLite for storage.
37 37 """
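# Example configuration for the usage described in the module docstring above.
# This is a minimal sketch: the [extensions] entry assumes the extension is
# importable by name (point it at the file path otherwise), and the optional
# storage.sqlite.compression knob is the experimental config item defined
# further below.
#
#   [extensions]
#   sqlitestore =
#
#   [storage]
#   new-repo-backend = sqlite
#   # optional, experimental: "zstd" (default when available), "zlib", or "none"
#   sqlite.compression = zstd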
38 38
39 39 # To run the test suite with repos using SQLite by default, execute the
40 40 # following:
41 41 #
42 42 # HGREPOFEATURES="sqlitestore" run-tests.py \
43 43 # --extra-config-opt extensions.sqlitestore= \
44 44 # --extra-config-opt storage.new-repo-backend=sqlite
45 45
46 46 from __future__ import absolute_import
47 47
48 48 import hashlib
49 49 import sqlite3
50 50 import struct
51 51 import threading
52 52 import zlib
53 53
54 54 from mercurial.i18n import _
55 55 from mercurial.node import (
56 56 nullid,
57 57 nullrev,
58 58 short,
59 59 )
60 60 from mercurial.thirdparty import (
61 61 attr,
62 62 )
63 63 from mercurial import (
64 64 ancestor,
65 65 dagop,
66 66 error,
67 67 extensions,
68 68 localrepo,
69 69 mdiff,
70 70 pycompat,
71 71 registrar,
72 72 repository,
73 73 util,
74 74 verify,
75 75 )
76 76 from mercurial.utils import (
77 77 interfaceutil,
78 78 storageutil,
79 79 )
80 80
81 81 try:
82 82 from mercurial import zstd
83 83 zstd.__version__
84 84 except ImportError:
85 85 zstd = None
86 86
87 87 configtable = {}
88 88 configitem = registrar.configitem(configtable)
89 89
90 90 # experimental config: storage.sqlite.compression
91 91 configitem('storage', 'sqlite.compression',
92 92 default='zstd' if zstd else 'zlib')
93 93
94 94 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
95 95 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
96 96 # be specifying the version(s) of Mercurial they are tested with, or
97 97 # leave the attribute unspecified.
98 98 testedwith = 'ships-with-hg-core'
99 99
100 100 REQUIREMENT = b'exp-sqlite-001'
101 101 REQUIREMENT_ZSTD = b'exp-sqlite-comp-001=zstd'
102 102 REQUIREMENT_ZLIB = b'exp-sqlite-comp-001=zlib'
103 103 REQUIREMENT_NONE = b'exp-sqlite-comp-001=none'
104 104 REQUIREMENT_SHALLOW_FILES = b'exp-sqlite-shallow-files'
105 105
106 106 CURRENT_SCHEMA_VERSION = 1
107 107
108 108 COMPRESSION_NONE = 1
109 109 COMPRESSION_ZSTD = 2
110 110 COMPRESSION_ZLIB = 3
111 111
112 112 FLAG_CENSORED = 1
113 FLAG_MISSING_P1 = 2
114 FLAG_MISSING_P2 = 4
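# Descriptive note (not part of the original change): the FLAG_MISSING_P1 /
# FLAG_MISSING_P2 bits mark revisions that were stored before their parent was
# known (the "missing parents" write mode used with e.g. shallow file storage).
# The unknown parent is temporarily recorded as null, hash verification is
# skipped for such revisions, and addgroup() repairs the parent pointers and
# clears the flag once the missing parent is later received.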
113 115
114 116 CREATE_SCHEMA = [
115 117 # Deltas are stored as content-indexed blobs.
116 118 # compression column holds COMPRESSION_* constant for how the
117 119 # delta is encoded.
118 120
119 121 r'CREATE TABLE delta ('
120 122 r' id INTEGER PRIMARY KEY, '
121 123 r' compression INTEGER NOT NULL, '
122 124 r' hash BLOB UNIQUE ON CONFLICT ABORT, '
123 125 r' delta BLOB NOT NULL '
124 126 r')',
125 127
126 128 # Tracked paths are denormalized to integers to avoid redundant
127 129 # storage of the path name.
128 130 r'CREATE TABLE filepath ('
129 131 r' id INTEGER PRIMARY KEY, '
130 132 r' path BLOB NOT NULL '
131 133 r')',
132 134
133 135 r'CREATE UNIQUE INDEX filepath_path '
134 136 r' ON filepath (path)',
135 137
136 138 # We have a single table for all file revision data.
137 139 # Each file revision is uniquely described by a (path, rev) and
138 140 # (path, node).
139 141 #
140 142 # Revision data is stored as a pointer to the delta producing this
141 143 # revision and the file revision whose delta should be applied before
142 144 # that one. One can reconstruct the delta chain by recursively following
143 145 # the delta base revision pointers until one encounters NULL.
144 146 #
145 147 # flags column holds bitwise integer flags controlling storage options.
146 148 # These flags are defined by the FLAG_* constants.
147 149 r'CREATE TABLE fileindex ('
148 150 r' id INTEGER PRIMARY KEY, '
149 151 r' pathid INTEGER REFERENCES filepath(id), '
150 152 r' revnum INTEGER NOT NULL, '
151 153 r' p1rev INTEGER NOT NULL, '
152 154 r' p2rev INTEGER NOT NULL, '
153 155 r' linkrev INTEGER NOT NULL, '
154 156 r' flags INTEGER NOT NULL, '
155 157 r' deltaid INTEGER REFERENCES delta(id), '
156 158 r' deltabaseid INTEGER REFERENCES fileindex(id), '
157 159 r' node BLOB NOT NULL '
158 160 r')',
159 161
160 162 r'CREATE UNIQUE INDEX fileindex_pathrevnum '
161 163 r' ON fileindex (pathid, revnum)',
162 164
163 165 r'CREATE UNIQUE INDEX fileindex_pathnode '
164 166 r' ON fileindex (pathid, node)',
165 167
166 168 # Provide a view over all file data for convenience.
167 169 r'CREATE VIEW filedata AS '
168 170 r'SELECT '
169 171 r' fileindex.id AS id, '
170 172 r' filepath.id AS pathid, '
171 173 r' filepath.path AS path, '
172 174 r' fileindex.revnum AS revnum, '
173 175 r' fileindex.node AS node, '
174 176 r' fileindex.p1rev AS p1rev, '
175 177 r' fileindex.p2rev AS p2rev, '
176 178 r' fileindex.linkrev AS linkrev, '
177 179 r' fileindex.flags AS flags, '
178 180 r' fileindex.deltaid AS deltaid, '
179 181 r' fileindex.deltabaseid AS deltabaseid '
180 182 r'FROM filepath, fileindex '
181 183 r'WHERE fileindex.pathid=filepath.id',
182 184
183 185 r'PRAGMA user_version=%d' % CURRENT_SCHEMA_VERSION,
184 186 ]
185 187
186 188 def resolvedeltachain(db, pathid, node, revisioncache,
187 189 stoprids, zstddctx=None):
188 190 """Resolve a delta chain for a file node."""
189 191
190 192 # TODO the "not in ({stops})" here is possibly slowing down the query
191 193 # because it needs to perform the lookup on every recursive invocation.
192 194 # This could possibly be faster if we created a temporary query with
193 195 # baseid "poisoned" to null and limited the recursive filter to
194 196 # "is not null".
195 197 res = db.execute(
196 198 r'WITH RECURSIVE '
197 199 r' deltachain(deltaid, baseid) AS ('
198 200 r' SELECT deltaid, deltabaseid FROM fileindex '
199 201 r' WHERE pathid=? AND node=? '
200 202 r' UNION ALL '
201 203 r' SELECT fileindex.deltaid, deltabaseid '
202 204 r' FROM fileindex, deltachain '
203 205 r' WHERE '
204 206 r' fileindex.id=deltachain.baseid '
205 207 r' AND deltachain.baseid IS NOT NULL '
206 208 r' AND fileindex.id NOT IN ({stops}) '
207 209 r' ) '
208 210 r'SELECT deltachain.baseid, compression, delta '
209 211 r'FROM deltachain, delta '
210 212 r'WHERE delta.id=deltachain.deltaid'.format(
211 213 stops=r','.join([r'?'] * len(stoprids))),
212 214 tuple([pathid, node] + list(stoprids.keys())))
213 215
214 216 deltas = []
215 217 lastdeltabaseid = None
216 218
217 219 for deltabaseid, compression, delta in res:
218 220 lastdeltabaseid = deltabaseid
219 221
220 222 if compression == COMPRESSION_ZSTD:
221 223 delta = zstddctx.decompress(delta)
222 224 elif compression == COMPRESSION_NONE:
223 225 delta = delta
224 226 elif compression == COMPRESSION_ZLIB:
225 227 delta = zlib.decompress(delta)
226 228 else:
227 229 raise SQLiteStoreError('unhandled compression type: %d' %
228 230 compression)
229 231
230 232 deltas.append(delta)
231 233
232 234 if lastdeltabaseid in stoprids:
233 235 basetext = revisioncache[stoprids[lastdeltabaseid]]
234 236 else:
235 237 basetext = deltas.pop()
236 238
237 239 deltas.reverse()
238 240 fulltext = mdiff.patches(basetext, deltas)
239 241
240 242 # SQLite returns buffer instances for blob columns on Python 2. This
241 243 # type can propagate through the delta application layer. Because
242 244 # downstream callers assume revisions are bytes, cast as needed.
243 245 if not isinstance(fulltext, bytes):
244 246 fulltext = bytes(fulltext)
245 247
246 248 return fulltext
247 249
248 250 def insertdelta(db, compression, hash, delta):
249 251 try:
250 252 return db.execute(
251 253 r'INSERT INTO delta (compression, hash, delta) '
252 254 r'VALUES (?, ?, ?)',
253 255 (compression, hash, delta)).lastrowid
254 256 except sqlite3.IntegrityError:
255 257 return db.execute(
256 258 r'SELECT id FROM delta WHERE hash=?',
257 259 (hash,)).fetchone()[0]
258 260
259 261 class SQLiteStoreError(error.StorageError):
260 262 pass
261 263
262 264 @attr.s
263 265 class revisionentry(object):
264 266 rid = attr.ib()
265 267 rev = attr.ib()
266 268 node = attr.ib()
267 269 p1rev = attr.ib()
268 270 p2rev = attr.ib()
269 271 p1node = attr.ib()
270 272 p2node = attr.ib()
271 273 linkrev = attr.ib()
272 274 flags = attr.ib()
273 275
274 276 @interfaceutil.implementer(repository.irevisiondelta)
275 277 @attr.s(slots=True)
276 278 class sqliterevisiondelta(object):
277 279 node = attr.ib()
278 280 p1node = attr.ib()
279 281 p2node = attr.ib()
280 282 basenode = attr.ib()
281 283 flags = attr.ib()
282 284 baserevisionsize = attr.ib()
283 285 revision = attr.ib()
284 286 delta = attr.ib()
285 287 linknode = attr.ib(default=None)
286 288
287 289 @interfaceutil.implementer(repository.iverifyproblem)
288 290 @attr.s(frozen=True)
289 291 class sqliteproblem(object):
290 292 warning = attr.ib(default=None)
291 293 error = attr.ib(default=None)
292 294 node = attr.ib(default=None)
293 295
294 296 @interfaceutil.implementer(repository.ifilestorage)
295 297 class sqlitefilestore(object):
296 298 """Implements storage for an individual tracked path."""
297 299
298 300 def __init__(self, db, path, compression):
299 301 self._db = db
300 302 self._path = path
301 303
302 304 self._pathid = None
303 305
304 306 # revnum -> node
305 307 self._revtonode = {}
306 308 # node -> revnum
307 309 self._nodetorev = {}
308 310 # node -> data structure
309 311 self._revisions = {}
310 312
311 313 self._revisioncache = util.lrucachedict(10)
312 314
313 315 self._compengine = compression
314 316
315 317 if compression == 'zstd':
316 318 self._cctx = zstd.ZstdCompressor(level=3)
317 319 self._dctx = zstd.ZstdDecompressor()
318 320 else:
319 321 self._cctx = None
320 322 self._dctx = None
321 323
322 324 self._refreshindex()
323 325
324 326 def _refreshindex(self):
325 327 self._revtonode = {}
326 328 self._nodetorev = {}
327 329 self._revisions = {}
328 330
329 331 res = list(self._db.execute(
330 332 r'SELECT id FROM filepath WHERE path=?', (self._path,)))
331 333
332 334 if not res:
333 335 self._pathid = None
334 336 return
335 337
336 338 self._pathid = res[0][0]
337 339
338 340 res = self._db.execute(
339 341 r'SELECT id, revnum, node, p1rev, p2rev, linkrev, flags '
340 342 r'FROM fileindex '
341 343 r'WHERE pathid=? '
342 344 r'ORDER BY revnum ASC',
343 345 (self._pathid,))
344 346
345 347 for i, row in enumerate(res):
346 348 rid, rev, node, p1rev, p2rev, linkrev, flags = row
347 349
348 350 if i != rev:
349 351 raise SQLiteStoreError(_('sqlite database has inconsistent '
350 352 'revision numbers'))
351 353
352 354 if p1rev == nullrev:
353 355 p1node = nullid
354 356 else:
355 357 p1node = self._revtonode[p1rev]
356 358
357 359 if p2rev == nullrev:
358 360 p2node = nullid
359 361 else:
360 362 p2node = self._revtonode[p2rev]
361 363
362 364 entry = revisionentry(
363 365 rid=rid,
364 366 rev=rev,
365 367 node=node,
366 368 p1rev=p1rev,
367 369 p2rev=p2rev,
368 370 p1node=p1node,
369 371 p2node=p2node,
370 372 linkrev=linkrev,
371 373 flags=flags)
372 374
373 375 self._revtonode[rev] = node
374 376 self._nodetorev[node] = rev
375 377 self._revisions[node] = entry
376 378
377 379 # Start of ifileindex interface.
378 380
379 381 def __len__(self):
380 382 return len(self._revisions)
381 383
382 384 def __iter__(self):
383 385 return iter(pycompat.xrange(len(self._revisions)))
384 386
385 387 def hasnode(self, node):
386 388 if node == nullid:
387 389 return False
388 390
389 391 return node in self._nodetorev
390 392
391 393 def revs(self, start=0, stop=None):
392 394 return storageutil.iterrevs(len(self._revisions), start=start,
393 395 stop=stop)
394 396
395 397 def parents(self, node):
396 398 if node == nullid:
397 399 return nullid, nullid
398 400
399 401 if node not in self._revisions:
400 402 raise error.LookupError(node, self._path, _('no node'))
401 403
402 404 entry = self._revisions[node]
403 405 return entry.p1node, entry.p2node
404 406
405 407 def parentrevs(self, rev):
406 408 if rev == nullrev:
407 409 return nullrev, nullrev
408 410
409 411 if rev not in self._revtonode:
410 412 raise IndexError(rev)
411 413
412 414 entry = self._revisions[self._revtonode[rev]]
413 415 return entry.p1rev, entry.p2rev
414 416
415 417 def rev(self, node):
416 418 if node == nullid:
417 419 return nullrev
418 420
419 421 if node not in self._nodetorev:
420 422 raise error.LookupError(node, self._path, _('no node'))
421 423
422 424 return self._nodetorev[node]
423 425
424 426 def node(self, rev):
425 427 if rev == nullrev:
426 428 return nullid
427 429
428 430 if rev not in self._revtonode:
429 431 raise IndexError(rev)
430 432
431 433 return self._revtonode[rev]
432 434
433 435 def lookup(self, node):
434 436 return storageutil.fileidlookup(self, node, self._path)
435 437
436 438 def linkrev(self, rev):
437 439 if rev == nullrev:
438 440 return nullrev
439 441
440 442 if rev not in self._revtonode:
441 443 raise IndexError(rev)
442 444
443 445 entry = self._revisions[self._revtonode[rev]]
444 446 return entry.linkrev
445 447
446 448 def iscensored(self, rev):
447 449 if rev == nullrev:
448 450 return False
449 451
450 452 if rev not in self._revtonode:
451 453 raise IndexError(rev)
452 454
453 455 return self._revisions[self._revtonode[rev]].flags & FLAG_CENSORED
454 456
455 457 def commonancestorsheads(self, node1, node2):
456 458 rev1 = self.rev(node1)
457 459 rev2 = self.rev(node2)
458 460
459 461 ancestors = ancestor.commonancestorsheads(self.parentrevs, rev1, rev2)
460 462 return pycompat.maplist(self.node, ancestors)
461 463
462 464 def descendants(self, revs):
463 465 # TODO we could implement this using a recursive SQL query, which
464 466 # might be faster.
465 467 return dagop.descendantrevs(revs, self.revs, self.parentrevs)
466 468
467 469 def heads(self, start=None, stop=None):
468 470 if start is None and stop is None:
469 471 if not len(self):
470 472 return [nullid]
471 473
472 474 startrev = self.rev(start) if start is not None else nullrev
473 475 stoprevs = {self.rev(n) for n in stop or []}
474 476
475 477 revs = dagop.headrevssubset(self.revs, self.parentrevs,
476 478 startrev=startrev, stoprevs=stoprevs)
477 479
478 480 return [self.node(rev) for rev in revs]
479 481
480 482 def children(self, node):
481 483 rev = self.rev(node)
482 484
483 485 res = self._db.execute(
484 486 r'SELECT'
485 487 r' node '
486 488 r' FROM filedata '
487 489 r' WHERE path=? AND (p1rev=? OR p2rev=?) '
488 490 r' ORDER BY revnum ASC',
489 491 (self._path, rev, rev))
490 492
491 493 return [row[0] for row in res]
492 494
493 495 # End of ifileindex interface.
494 496
495 497 # Start of ifiledata interface.
496 498
497 499 def size(self, rev):
498 500 if rev == nullrev:
499 501 return 0
500 502
501 503 if rev not in self._revtonode:
502 504 raise IndexError(rev)
503 505
504 506 node = self._revtonode[rev]
505 507
506 508 if self.renamed(node):
507 509 return len(self.read(node))
508 510
509 511 return len(self.revision(node))
510 512
511 513 def revision(self, node, raw=False, _verifyhash=True):
512 514 if node in (nullid, nullrev):
513 515 return b''
514 516
515 517 if isinstance(node, int):
516 518 node = self.node(node)
517 519
518 520 if node not in self._nodetorev:
519 521 raise error.LookupError(node, self._path, _('no node'))
520 522
521 523 if node in self._revisioncache:
522 524 return self._revisioncache[node]
523 525
524 526 # Because we have a fulltext revision cache, we are able to
525 527 # short-circuit delta chain traversal and decompression as soon as
526 528 # we encounter a revision in the cache.
527 529
528 530 stoprids = {self._revisions[n].rid: n
529 531 for n in self._revisioncache}
530 532
531 533 if not stoprids:
532 534 stoprids[-1] = None
533 535
534 536 fulltext = resolvedeltachain(self._db, self._pathid, node,
535 537 self._revisioncache, stoprids,
536 538 zstddctx=self._dctx)
537 539
540 # Don't verify hashes if parent nodes were rewritten, as the hash
541 # wouldn't verify.
542 if self._revisions[node].flags & (FLAG_MISSING_P1 | FLAG_MISSING_P2):
543 _verifyhash = False
544
538 545 if _verifyhash:
539 546 self._checkhash(fulltext, node)
540 547 self._revisioncache[node] = fulltext
541 548
542 549 return fulltext
543 550
544 551 def read(self, node):
545 552 return storageutil.filtermetadata(self.revision(node))
546 553
547 554 def renamed(self, node):
548 555 return storageutil.filerevisioncopied(self, node)
549 556
550 557 def cmp(self, node, fulltext):
551 558 return not storageutil.filedataequivalent(self, node, fulltext)
552 559
553 560 def emitrevisions(self, nodes, nodesorder=None, revisiondata=False,
554 561 assumehaveparentrevisions=False, deltaprevious=False):
555 562 if nodesorder not in ('nodes', 'storage', None):
556 563 raise error.ProgrammingError('unhandled value for nodesorder: %s' %
557 564 nodesorder)
558 565
559 566 nodes = [n for n in nodes if n != nullid]
560 567
561 568 if not nodes:
562 569 return
563 570
564 571 # TODO perform in a single query.
565 572 res = self._db.execute(
566 573 r'SELECT revnum, deltaid FROM fileindex '
567 574 r'WHERE pathid=? '
568 575 r' AND node in (%s)' % (r','.join([r'?'] * len(nodes))),
569 576 tuple([self._pathid] + nodes))
570 577
571 578 deltabases = {}
572 579
573 580 for rev, deltaid in res:
574 581 res = self._db.execute(
575 582 r'SELECT revnum from fileindex WHERE pathid=? AND deltaid=?',
576 583 (self._pathid, deltaid))
577 584 deltabases[rev] = res.fetchone()[0]
578 585
579 586 # TODO define revdifffn so we can use delta from storage.
580 587 for delta in storageutil.emitrevisions(
581 588 self, nodes, nodesorder, sqliterevisiondelta,
582 589 deltaparentfn=deltabases.__getitem__,
583 590 revisiondata=revisiondata,
584 591 assumehaveparentrevisions=assumehaveparentrevisions,
585 592 deltaprevious=deltaprevious):
586 593
587 594 yield delta
588 595
589 596 # End of ifiledata interface.
590 597
591 598 # Start of ifilemutation interface.
592 599
593 600 def add(self, filedata, meta, transaction, linkrev, p1, p2):
594 601 if meta or filedata.startswith(b'\x01\n'):
595 602 filedata = storageutil.packmeta(meta, filedata)
596 603
597 604 return self.addrevision(filedata, transaction, linkrev, p1, p2)
598 605
599 606 def addrevision(self, revisiondata, transaction, linkrev, p1, p2, node=None,
600 607 flags=0, cachedelta=None):
601 608 if flags:
602 609 raise SQLiteStoreError(_('flags not supported on revisions'))
603 610
604 611 validatehash = node is not None
605 612 node = node or storageutil.hashrevisionsha1(revisiondata, p1, p2)
606 613
607 614 if validatehash:
608 615 self._checkhash(revisiondata, node, p1, p2)
609 616
610 617 if node in self._nodetorev:
611 618 return node
612 619
613 620 node = self._addrawrevision(node, revisiondata, transaction, linkrev,
614 621 p1, p2)
615 622
616 623 self._revisioncache[node] = revisiondata
617 624 return node
618 625
619 626 def addgroup(self, deltas, linkmapper, transaction, addrevisioncb=None,
620 627 maybemissingparents=False):
621 if maybemissingparents:
622 raise error.Abort(_('SQLite storage does not support missing '
623 'parents write mode'))
624
625 628 nodes = []
626 629
627 630 for node, p1, p2, linknode, deltabase, delta, wireflags in deltas:
628 631 storeflags = 0
629 632
630 633 if wireflags & repository.REVISION_FLAG_CENSORED:
631 634 storeflags |= FLAG_CENSORED
632 635
633 636 if wireflags & ~repository.REVISION_FLAG_CENSORED:
634 637 raise SQLiteStoreError('unhandled revision flag')
635 638
639 if maybemissingparents:
640 if p1 != nullid and not self.hasnode(p1):
641 p1 = nullid
642 storeflags |= FLAG_MISSING_P1
643
644 if p2 != nullid and not self.hasnode(p2):
645 p2 = nullid
646 storeflags |= FLAG_MISSING_P2
647
636 648 baserev = self.rev(deltabase)
637 649
638 650 # If base is censored, delta must be full replacement in a single
639 651 # patch operation.
640 652 if baserev != nullrev and self.iscensored(baserev):
641 653 hlen = struct.calcsize('>lll')
642 654 oldlen = len(self.revision(deltabase, raw=True,
643 655 _verifyhash=False))
644 656 newlen = len(delta) - hlen
645 657
646 658 if delta[:hlen] != mdiff.replacediffheader(oldlen, newlen):
647 659 raise error.CensoredBaseError(self._path,
648 660 deltabase)
649 661
650 662 if (not (storeflags & FLAG_CENSORED)
651 663 and storageutil.deltaiscensored(
652 664 delta, baserev, lambda x: len(self.revision(x, raw=True)))):
653 665 storeflags |= FLAG_CENSORED
654 666
655 667 linkrev = linkmapper(linknode)
656 668
657 669 nodes.append(node)
658 670
659 671 if node in self._revisions:
672 # Possibly reset parents to make them proper.
673 entry = self._revisions[node]
674
675 if entry.flags & FLAG_MISSING_P1 and p1 != nullid:
676 entry.p1node = p1
677 entry.p1rev = self._nodetorev[p1]
678 entry.flags &= ~FLAG_MISSING_P1
679
680 self._db.execute(
681 r'UPDATE fileindex SET p1rev=?, flags=? '
682 r'WHERE id=?',
683 (self._nodetorev[p1], entry.flags, entry.rid))
684
685 if entry.flags & FLAG_MISSING_P2 and p2 != nullid:
686 entry.p2node = p2
687 entry.p2rev = self._nodetorev[p2]
688 entry.flags &= ~FLAG_MISSING_P2
689
690 self._db.execute(
691 r'UPDATE fileindex SET p2rev=?, flags=? '
692 r'WHERE id=?',
693 (self._nodetorev[p2], entry.flags, entry.rid))
694
660 695 continue
661 696
662 697 if deltabase == nullid:
663 698 text = mdiff.patch(b'', delta)
664 699 storedelta = None
665 700 else:
666 701 text = None
667 702 storedelta = (deltabase, delta)
668 703
669 704 self._addrawrevision(node, text, transaction, linkrev, p1, p2,
670 705 storedelta=storedelta, flags=storeflags)
671 706
672 707 if addrevisioncb:
673 708 addrevisioncb(self, node)
674 709
675 710 return nodes
676 711
677 712 def censorrevision(self, tr, censornode, tombstone=b''):
678 713 tombstone = storageutil.packmeta({b'censored': tombstone}, b'')
679 714
680 715 # This restriction is cargo culted from revlogs and makes no sense for
681 716 # SQLite, since columns can be resized at will.
682 717 if len(tombstone) > len(self.revision(censornode, raw=True)):
683 718 raise error.Abort(_('censor tombstone must be no longer than '
684 719 'censored data'))
685 720
686 721 # We need to replace the censored revision's data with the tombstone.
687 722 # But replacing that data will have implications for delta chains that
688 723 # reference it.
689 724 #
690 725 # While "better," more complex strategies are possible, we do something
691 726 # simple: we find delta chain children of the censored revision and we
692 727 # replace those incremental deltas with fulltexts of their corresponding
693 728 # revision. Then we delete the now-unreferenced delta and original
694 729 # revision and insert a replacement.
695 730
696 731 # Find the delta to be censored.
697 732 censoreddeltaid = self._db.execute(
698 733 r'SELECT deltaid FROM fileindex WHERE id=?',
699 734 (self._revisions[censornode].rid,)).fetchone()[0]
700 735
701 736 # Find all its delta chain children.
702 737 # TODO once we support storing deltas for !files, we'll need to look
703 738 # for those delta chains too.
704 739 rows = list(self._db.execute(
705 740 r'SELECT id, pathid, node FROM fileindex '
706 741 r'WHERE deltabaseid=? OR deltaid=?',
707 742 (censoreddeltaid, censoreddeltaid)))
708 743
709 744 for row in rows:
710 745 rid, pathid, node = row
711 746
712 747 fulltext = resolvedeltachain(self._db, pathid, node, {}, {-1: None},
713 748 zstddctx=self._dctx)
714 749
715 750 deltahash = hashlib.sha1(fulltext).digest()
716 751
717 752 if self._compengine == 'zstd':
718 753 deltablob = self._cctx.compress(fulltext)
719 754 compression = COMPRESSION_ZSTD
720 755 elif self._compengine == 'zlib':
721 756 deltablob = zlib.compress(fulltext)
722 757 compression = COMPRESSION_ZLIB
723 758 elif self._compengine == 'none':
724 759 deltablob = fulltext
725 760 compression = COMPRESSION_NONE
726 761 else:
727 762 raise error.ProgrammingError('unhandled compression engine: %s'
728 763 % self._compengine)
729 764
730 765 if len(deltablob) >= len(fulltext):
731 766 deltablob = fulltext
732 767 compression = COMPRESSION_NONE
733 768
734 769 deltaid = insertdelta(self._db, compression, deltahash, deltablob)
735 770
736 771 self._db.execute(
737 772 r'UPDATE fileindex SET deltaid=?, deltabaseid=NULL '
738 773 r'WHERE id=?', (deltaid, rid))
739 774
740 775 # Now create the tombstone delta and replace the delta on the censored
741 776 # node.
742 777 deltahash = hashlib.sha1(tombstone).digest()
743 778 tombstonedeltaid = insertdelta(self._db, COMPRESSION_NONE,
744 779 deltahash, tombstone)
745 780
746 781 flags = self._revisions[censornode].flags
747 782 flags |= FLAG_CENSORED
748 783
749 784 self._db.execute(
750 785 r'UPDATE fileindex SET flags=?, deltaid=?, deltabaseid=NULL '
751 786 r'WHERE pathid=? AND node=?',
752 787 (flags, tombstonedeltaid, self._pathid, censornode))
753 788
754 789 self._db.execute(
755 790 r'DELETE FROM delta WHERE id=?', (censoreddeltaid,))
756 791
757 792 self._refreshindex()
758 793 self._revisioncache.clear()
759 794
760 795 def getstrippoint(self, minlink):
761 796 return storageutil.resolvestripinfo(minlink, len(self) - 1,
762 797 [self.rev(n) for n in self.heads()],
763 798 self.linkrev,
764 799 self.parentrevs)
765 800
766 801 def strip(self, minlink, transaction):
767 802 if not len(self):
768 803 return
769 804
770 805 rev, _ignored = self.getstrippoint(minlink)
771 806
772 807 if rev == len(self):
773 808 return
774 809
775 810 for rev in self.revs(rev):
776 811 self._db.execute(
777 812 r'DELETE FROM fileindex WHERE pathid=? AND node=?',
778 813 (self._pathid, self.node(rev)))
779 814
780 815 # TODO how should we garbage collect data in delta table?
781 816
782 817 self._refreshindex()
783 818
784 819 # End of ifilemutation interface.
785 820
786 821 # Start of ifilestorage interface.
787 822
788 823 def files(self):
789 824 return []
790 825
791 826 def storageinfo(self, exclusivefiles=False, sharedfiles=False,
792 827 revisionscount=False, trackedsize=False,
793 828 storedsize=False):
794 829 d = {}
795 830
796 831 if exclusivefiles:
797 832 d['exclusivefiles'] = []
798 833
799 834 if sharedfiles:
800 835 # TODO list sqlite file(s) here.
801 836 d['sharedfiles'] = []
802 837
803 838 if revisionscount:
804 839 d['revisionscount'] = len(self)
805 840
806 841 if trackedsize:
807 842 d['trackedsize'] = sum(len(self.revision(node))
808 843 for node in self._nodetorev)
809 844
810 845 if storedsize:
811 846 # TODO implement this?
812 847 d['storedsize'] = None
813 848
814 849 return d
815 850
816 851 def verifyintegrity(self, state):
817 852 state['skipread'] = set()
818 853
819 854 for rev in self:
820 855 node = self.node(rev)
821 856
822 857 try:
823 858 self.revision(node)
824 859 except Exception as e:
825 860 yield sqliteproblem(
826 861 error=_('unpacking %s: %s') % (short(node), e),
827 862 node=node)
828 863
829 864 state['skipread'].add(node)
830 865
831 866 # End of ifilestorage interface.
832 867
833 868 def _checkhash(self, fulltext, node, p1=None, p2=None):
834 869 if p1 is None and p2 is None:
835 870 p1, p2 = self.parents(node)
836 871
837 872 if node == storageutil.hashrevisionsha1(fulltext, p1, p2):
838 873 return
839 874
840 875 try:
841 876 del self._revisioncache[node]
842 877 except KeyError:
843 878 pass
844 879
845 880 if storageutil.iscensoredtext(fulltext):
846 881 raise error.CensoredNodeError(self._path, node, fulltext)
847 882
848 883 raise SQLiteStoreError(_('integrity check failed on %s') %
849 884 self._path)
850 885
851 886 def _addrawrevision(self, node, revisiondata, transaction, linkrev,
852 887 p1, p2, storedelta=None, flags=0):
853 888 if self._pathid is None:
854 889 res = self._db.execute(
855 890 r'INSERT INTO filepath (path) VALUES (?)', (self._path,))
856 891 self._pathid = res.lastrowid
857 892
858 893 # For simplicity, always store a delta against p1.
859 894 # TODO we need a lot more logic here to make behavior reasonable.
860 895
861 896 if storedelta:
862 897 deltabase, delta = storedelta
863 898
864 899 if isinstance(deltabase, int):
865 900 deltabase = self.node(deltabase)
866 901
867 902 else:
868 903 assert revisiondata is not None
869 904 deltabase = p1
870 905
871 906 if deltabase == nullid:
872 907 delta = revisiondata
873 908 else:
874 909 delta = mdiff.textdiff(self.revision(self.rev(deltabase)),
875 910 revisiondata)
876 911
877 912 # File index stores a pointer to its delta and the parent delta.
878 913 # The parent delta is stored via a pointer to the fileindex PK.
879 914 if deltabase == nullid:
880 915 baseid = None
881 916 else:
882 917 baseid = self._revisions[deltabase].rid
883 918
884 919 # Deltas are stored with a hash of their content. This allows
885 920 # us to de-duplicate. Conflicting inserts abort and insertdelta()
886 921 # falls back to looking up the existing row, which is faster than
887 922 # checking for existence first.
888 923 deltahash = hashlib.sha1(delta).digest()
889 924
890 925 if self._compengine == 'zstd':
891 926 deltablob = self._cctx.compress(delta)
892 927 compression = COMPRESSION_ZSTD
893 928 elif self._compengine == 'zlib':
894 929 deltablob = zlib.compress(delta)
895 930 compression = COMPRESSION_ZLIB
896 931 elif self._compengine == 'none':
897 932 deltablob = delta
898 933 compression = COMPRESSION_NONE
899 934 else:
900 935 raise error.ProgrammingError('unhandled compression engine: %s' %
901 936 self._compengine)
902 937
903 938 # Don't store compressed data if it isn't practical.
904 939 if len(deltablob) >= len(delta):
905 940 deltablob = delta
906 941 compression = COMPRESSION_NONE
907 942
908 943 deltaid = insertdelta(self._db, compression, deltahash, deltablob)
909 944
910 945 rev = len(self)
911 946
912 947 if p1 == nullid:
913 948 p1rev = nullrev
914 949 else:
915 950 p1rev = self._nodetorev[p1]
916 951
917 952 if p2 == nullid:
918 953 p2rev = nullrev
919 954 else:
920 955 p2rev = self._nodetorev[p2]
921 956
922 957 rid = self._db.execute(
923 958 r'INSERT INTO fileindex ('
924 959 r' pathid, revnum, node, p1rev, p2rev, linkrev, flags, '
925 960 r' deltaid, deltabaseid) '
926 961 r' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
927 962 (self._pathid, rev, node, p1rev, p2rev, linkrev, flags,
928 963 deltaid, baseid)
929 964 ).lastrowid
930 965
931 966 entry = revisionentry(
932 967 rid=rid,
933 968 rev=rev,
934 969 node=node,
935 970 p1rev=p1rev,
936 971 p2rev=p2rev,
937 972 p1node=p1,
938 973 p2node=p2,
939 974 linkrev=linkrev,
940 975 flags=flags)
941 976
942 977 self._nodetorev[node] = rev
943 978 self._revtonode[rev] = node
944 979 self._revisions[node] = entry
945 980
946 981 return node
947 982
948 983 class sqliterepository(localrepo.localrepository):
949 984 def cancopy(self):
950 985 return False
951 986
952 987 def transaction(self, *args, **kwargs):
953 988 current = self.currenttransaction()
954 989
955 990 tr = super(sqliterepository, self).transaction(*args, **kwargs)
956 991
957 992 if current:
958 993 return tr
959 994
960 995 self._dbconn.execute(r'BEGIN TRANSACTION')
961 996
962 997 def committransaction(_):
963 998 self._dbconn.commit()
964 999
965 1000 tr.addfinalize('sqlitestore', committransaction)
966 1001
967 1002 return tr
968 1003
969 1004 @property
970 1005 def _dbconn(self):
971 1006 # SQLite connections can only be used on the thread that created
972 1007 # them. In most cases, this "just works." However, hgweb uses
973 1008 # multiple threads.
974 1009 tid = threading.current_thread().ident
975 1010
976 1011 if self._db:
977 1012 if self._db[0] == tid:
978 1013 return self._db[1]
979 1014
980 1015 db = makedb(self.svfs.join('db.sqlite'))
981 1016 self._db = (tid, db)
982 1017
983 1018 return db
984 1019
985 1020 def makedb(path):
986 1021 """Construct a database handle for a database at path."""
987 1022
988 1023 db = sqlite3.connect(path)
989 1024 db.text_factory = bytes
990 1025
991 1026 res = db.execute(r'PRAGMA user_version').fetchone()[0]
992 1027
993 1028 # New database.
994 1029 if res == 0:
995 1030 for statement in CREATE_SCHEMA:
996 1031 db.execute(statement)
997 1032
998 1033 db.commit()
999 1034
1000 1035 elif res == CURRENT_SCHEMA_VERSION:
1001 1036 pass
1002 1037
1003 1038 else:
1004 1039 raise error.Abort(_('sqlite database has unrecognized version'))
1005 1040
1006 1041 db.execute(r'PRAGMA journal_mode=WAL')
1007 1042
1008 1043 return db
1009 1044
1010 1045 def featuresetup(ui, supported):
1011 1046 supported.add(REQUIREMENT)
1012 1047
1013 1048 if zstd:
1014 1049 supported.add(REQUIREMENT_ZSTD)
1015 1050
1016 1051 supported.add(REQUIREMENT_ZLIB)
1017 1052 supported.add(REQUIREMENT_NONE)
1018 1053 supported.add(REQUIREMENT_SHALLOW_FILES)
1019 1054 supported.add(repository.NARROW_REQUIREMENT)
1020 1055
1021 1056 def newreporequirements(orig, ui, createopts):
1022 1057 if createopts['backend'] != 'sqlite':
1023 1058 return orig(ui, createopts)
1024 1059
1025 1060 # This restriction can be lifted once we have more confidence.
1026 1061 if 'sharedrepo' in createopts:
1027 1062 raise error.Abort(_('shared repositories not supported with SQLite '
1028 1063 'store'))
1029 1064
1030 1065 # This filtering is out of an abundance of caution: we want to ensure
1031 1066 # we honor creation options and we do that by annotating exactly the
1032 1067 # creation options we recognize.
1033 1068 known = {
1034 1069 'narrowfiles',
1035 1070 'backend',
1036 1071 'shallowfilestore',
1037 1072 }
1038 1073
1039 1074 unsupported = set(createopts) - known
1040 1075 if unsupported:
1041 1076 raise error.Abort(_('SQLite store does not support repo creation '
1042 1077 'option: %s') % ', '.join(sorted(unsupported)))
1043 1078
1044 1079 # Since we're a hybrid store that still relies on revlogs, we fall back
1045 1080 # to using the revlogv1 backend's storage requirements then adding our
1046 1081 # own requirement.
1047 1082 createopts['backend'] = 'revlogv1'
1048 1083 requirements = orig(ui, createopts)
1049 1084 requirements.add(REQUIREMENT)
1050 1085
1051 1086 compression = ui.config('storage', 'sqlite.compression')
1052 1087
1053 1088 if compression == 'zstd' and not zstd:
1054 1089 raise error.Abort(_('storage.sqlite.compression set to "zstd" but '
1055 1090 'zstandard compression not available to this '
1056 1091 'Mercurial install'))
1057 1092
1058 1093 if compression == 'zstd':
1059 1094 requirements.add(REQUIREMENT_ZSTD)
1060 1095 elif compression == 'zlib':
1061 1096 requirements.add(REQUIREMENT_ZLIB)
1062 1097 elif compression == 'none':
1063 1098 requirements.add(REQUIREMENT_NONE)
1064 1099 else:
1065 1100 raise error.Abort(_('unknown compression engine defined in '
1066 1101 'storage.sqlite.compression: %s') % compression)
1067 1102
1068 1103 if createopts.get('shallowfilestore'):
1069 1104 requirements.add(REQUIREMENT_SHALLOW_FILES)
1070 1105
1071 1106 return requirements
1072 1107
1073 1108 @interfaceutil.implementer(repository.ilocalrepositoryfilestorage)
1074 1109 class sqlitefilestorage(object):
1075 1110 """Repository file storage backed by SQLite."""
1076 1111 def file(self, path):
1077 1112 if path[0] == b'/':
1078 1113 path = path[1:]
1079 1114
1080 1115 if REQUIREMENT_ZSTD in self.requirements:
1081 1116 compression = 'zstd'
1082 1117 elif REQUIREMENT_ZLIB in self.requirements:
1083 1118 compression = 'zlib'
1084 1119 elif REQUIREMENT_NONE in self.requirements:
1085 1120 compression = 'none'
1086 1121 else:
1087 1122 raise error.Abort(_('unable to determine what compression engine '
1088 1123 'to use for SQLite storage'))
1089 1124
1090 1125 return sqlitefilestore(self._dbconn, path, compression)
1091 1126
1092 1127 def makefilestorage(orig, requirements, features, **kwargs):
1093 1128 """Produce a type conforming to ``ilocalrepositoryfilestorage``."""
1094 1129 if REQUIREMENT in requirements:
1095 1130 if REQUIREMENT_SHALLOW_FILES in requirements:
1096 1131 features.add(repository.REPO_FEATURE_SHALLOW_FILE_STORAGE)
1097 1132
1098 1133 return sqlitefilestorage
1099 1134 else:
1100 1135 return orig(requirements=requirements, features=features, **kwargs)
1101 1136
1102 1137 def makemain(orig, ui, requirements, **kwargs):
1103 1138 if REQUIREMENT in requirements:
1104 1139 if REQUIREMENT_ZSTD in requirements and not zstd:
1105 1140 raise error.Abort(_('repository uses zstandard compression, which '
1106 1141 'is not available to this Mercurial install'))
1107 1142
1108 1143 return sqliterepository
1109 1144
1110 1145 return orig(requirements=requirements, **kwargs)
1111 1146
1112 1147 def verifierinit(orig, self, *args, **kwargs):
1113 1148 orig(self, *args, **kwargs)
1114 1149
1115 1150 # We don't care that files in the store don't align with what is
1116 1151 # advertised. So suppress these warnings.
1117 1152 self.warnorphanstorefiles = False
1118 1153
1119 1154 def extsetup(ui):
1120 1155 localrepo.featuresetupfuncs.add(featuresetup)
1121 1156 extensions.wrapfunction(localrepo, 'newreporequirements',
1122 1157 newreporequirements)
1123 1158 extensions.wrapfunction(localrepo, 'makefilestorage',
1124 1159 makefilestorage)
1125 1160 extensions.wrapfunction(localrepo, 'makemain',
1126 1161 makemain)
1127 1162 extensions.wrapfunction(verify.verifier, '__init__',
1128 1163 verifierinit)
1129 1164
1130 1165 def reposetup(ui, repo):
1131 1166 if isinstance(repo, sqliterepository):
1132 1167 repo._db = None
1133 1168
1134 1169 # TODO check for bundlerepository?