storage: update sqlitestore to use the new `deltamode` parameter...
Boris Feld - r40464:a0e7fa01 default
@@ -1,1170 +1,1171
1 1 # sqlitestore.py - Storage backend that uses SQLite
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """store repository data in SQLite (EXPERIMENTAL)
9 9
10 10 The sqlitestore extension enables the storage of repository data in SQLite.
11 11
12 12 This extension is HIGHLY EXPERIMENTAL. There are NO BACKWARDS COMPATIBILITY
13 13 GUARANTEES. This means that repositories created with this extension may
14 14 only be usable with the exact version of this extension/Mercurial that was
15 15 used. The extension attempts to enforce this in order to prevent repository
16 16 corruption.
17 17
18 18 In addition, several features are not yet supported or have known bugs:
19 19
20 20 * Only some data is stored in SQLite. Changeset, manifest, and other repository
21 21 data is not yet stored in SQLite.
22 22 * Transactions are not robust. If the process is aborted at the right time
23 23 during transaction close/rollback, the repository could be in an inconsistent
24 24 state. This problem will diminish once all repository data is tracked by
25 25 SQLite.
26 26 * Bundle repositories do not work (the ability to use e.g.
27 27 `hg -R <bundle-file> log` to automatically overlay a bundle on top of the
28 28 existing repository).
29 29 * Various other features don't work.
30 30
31 31 This extension should work for basic clone/pull, update, and commit workflows.
32 32 Some history rewriting operations may fail due to lack of support for bundle
33 33 repositories.
34 34
35 35 To use, activate the extension and set the ``storage.new-repo-backend`` config
36 36 option to ``sqlite`` to enable new repositories to use SQLite for storage.
37 37 """
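# Example configuration enabling the extension and selecting SQLite storage
# for newly created repositories (a minimal hgrc sketch derived from the
# docstring above):
#
#   [extensions]
#   sqlitestore =
#
#   [storage]
#   new-repo-backend = sqlite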
38 38
39 39 # To run the test suite with repos using SQLite by default, execute the
40 40 # following:
41 41 #
42 42 # HGREPOFEATURES="sqlitestore" run-tests.py \
43 43 # --extra-config-opt extensions.sqlitestore= \
44 44 # --extra-config-opt storage.new-repo-backend=sqlite
45 45
46 46 from __future__ import absolute_import
47 47
48 48 import hashlib
49 49 import sqlite3
50 50 import struct
51 51 import threading
52 52 import zlib
53 53
54 54 from mercurial.i18n import _
55 55 from mercurial.node import (
56 56 nullid,
57 57 nullrev,
58 58 short,
59 59 )
60 60 from mercurial.thirdparty import (
61 61 attr,
62 62 )
63 63 from mercurial import (
64 64 ancestor,
65 65 dagop,
66 66 encoding,
67 67 error,
68 68 extensions,
69 69 localrepo,
70 70 mdiff,
71 71 pycompat,
72 72 registrar,
73 73 repository,
74 74 util,
75 75 verify,
76 76 )
77 77 from mercurial.utils import (
78 78 interfaceutil,
79 79 storageutil,
80 80 )
81 81
82 82 try:
83 83 from mercurial import zstd
84 84 zstd.__version__
85 85 except ImportError:
86 86 zstd = None
87 87
88 88 configtable = {}
89 89 configitem = registrar.configitem(configtable)
90 90
91 91 # experimental config: storage.sqlite.compression
92 92 configitem('storage', 'sqlite.compression',
93 93 default='zstd' if zstd else 'zlib')
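# For example, to prefer zlib (or disable compression entirely with "none")
# for new SQLite-backed repositories, a user could set the following
# hypothetical hgrc snippet:
#
#   [storage]
#   sqlite.compression = zlib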
94 94
95 95 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
96 96 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
97 97 # be specifying the version(s) of Mercurial they are tested with, or
98 98 # leave the attribute unspecified.
99 99 testedwith = 'ships-with-hg-core'
100 100
101 101 REQUIREMENT = b'exp-sqlite-001'
102 102 REQUIREMENT_ZSTD = b'exp-sqlite-comp-001=zstd'
103 103 REQUIREMENT_ZLIB = b'exp-sqlite-comp-001=zlib'
104 104 REQUIREMENT_NONE = b'exp-sqlite-comp-001=none'
105 105 REQUIREMENT_SHALLOW_FILES = b'exp-sqlite-shallow-files'
106 106
107 107 CURRENT_SCHEMA_VERSION = 1
108 108
109 109 COMPRESSION_NONE = 1
110 110 COMPRESSION_ZSTD = 2
111 111 COMPRESSION_ZLIB = 3
112 112
113 113 FLAG_CENSORED = 1
114 114 FLAG_MISSING_P1 = 2
115 115 FLAG_MISSING_P2 = 4
116 116
117 117 CREATE_SCHEMA = [
118 118 # Deltas are stored as content-indexed blobs.
119 119 # compression column holds COMPRESSION_* constant for how the
120 120 # delta is encoded.
121 121
122 122 r'CREATE TABLE delta ('
123 123 r' id INTEGER PRIMARY KEY, '
124 124 r' compression INTEGER NOT NULL, '
125 125 r' hash BLOB UNIQUE ON CONFLICT ABORT, '
126 126 r' delta BLOB NOT NULL '
127 127 r')',
128 128
129 129 # Tracked paths are denormalized to integers to avoid redundant
130 130 # storage of the path name.
131 131 r'CREATE TABLE filepath ('
132 132 r' id INTEGER PRIMARY KEY, '
133 133 r' path BLOB NOT NULL '
134 134 r')',
135 135
136 136 r'CREATE UNIQUE INDEX filepath_path '
137 137 r' ON filepath (path)',
138 138
139 139 # We have a single table for all file revision data.
140 140 # Each file revision is uniquely described by a (path, rev) and
141 141 # (path, node).
142 142 #
143 143 # Revision data is stored as a pointer to the delta producing this
144 144 # revision and the file revision whose delta should be applied before
145 145 # that one. One can reconstruct the delta chain by recursively following
146 146 # the delta base revision pointers until one encounters NULL.
147 147 #
148 148 # flags column holds bitwise integer flags controlling storage options.
149 149 # These flags are defined by the FLAG_* constants.
150 150 r'CREATE TABLE fileindex ('
151 151 r' id INTEGER PRIMARY KEY, '
152 152 r' pathid INTEGER REFERENCES filepath(id), '
153 153 r' revnum INTEGER NOT NULL, '
154 154 r' p1rev INTEGER NOT NULL, '
155 155 r' p2rev INTEGER NOT NULL, '
156 156 r' linkrev INTEGER NOT NULL, '
157 157 r' flags INTEGER NOT NULL, '
158 158 r' deltaid INTEGER REFERENCES delta(id), '
159 159 r' deltabaseid INTEGER REFERENCES fileindex(id), '
160 160 r' node BLOB NOT NULL '
161 161 r')',
162 162
163 163 r'CREATE UNIQUE INDEX fileindex_pathrevnum '
164 164 r' ON fileindex (pathid, revnum)',
165 165
166 166 r'CREATE UNIQUE INDEX fileindex_pathnode '
167 167 r' ON fileindex (pathid, node)',
168 168
169 169 # Provide a view over all file data for convenience.
170 170 r'CREATE VIEW filedata AS '
171 171 r'SELECT '
172 172 r' fileindex.id AS id, '
173 173 r' filepath.id AS pathid, '
174 174 r' filepath.path AS path, '
175 175 r' fileindex.revnum AS revnum, '
176 176 r' fileindex.node AS node, '
177 177 r' fileindex.p1rev AS p1rev, '
178 178 r' fileindex.p2rev AS p2rev, '
179 179 r' fileindex.linkrev AS linkrev, '
180 180 r' fileindex.flags AS flags, '
181 181 r' fileindex.deltaid AS deltaid, '
182 182 r' fileindex.deltabaseid AS deltabaseid '
183 183 r'FROM filepath, fileindex '
184 184 r'WHERE fileindex.pathid=filepath.id',
185 185
186 186 r'PRAGMA user_version=%d' % CURRENT_SCHEMA_VERSION,
187 187 ]
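# Illustrative sketch of how a delta chain maps onto the schema (made-up row
# values, not real data). Three revisions of one file, each stored as a delta
# against its predecessor, might be represented as:
#
#   fileindex: (id=1, revnum=0, deltaid=10, deltabaseid=NULL)
#              (id=2, revnum=1, deltaid=11, deltabaseid=1)
#              (id=3, revnum=2, deltaid=12, deltabaseid=2)
#
# Reconstructing revnum 2 follows deltabaseid back to NULL, takes the fulltext
# stored as delta 10 (a base-less "delta" is the full revision text) and then
# applies deltas 11 and 12 in order; resolvedeltachain() below does exactly
# this with a recursive SQL query.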
188 188
189 189 def resolvedeltachain(db, pathid, node, revisioncache,
190 190 stoprids, zstddctx=None):
191 191 """Resolve a delta chain for a file node."""
192 192
193 193 # TODO the "not in ({stops})" here is possibly slowing down the query
194 194 # because it needs to perform the lookup on every recursive invocation.
195 195 # This could possibly be faster if we created a temporary query with
196 196 # baseid "poisoned" to null and limited the recursive filter to
197 197 # "is not null".
198 198 res = db.execute(
199 199 r'WITH RECURSIVE '
200 200 r' deltachain(deltaid, baseid) AS ('
201 201 r' SELECT deltaid, deltabaseid FROM fileindex '
202 202 r' WHERE pathid=? AND node=? '
203 203 r' UNION ALL '
204 204 r' SELECT fileindex.deltaid, deltabaseid '
205 205 r' FROM fileindex, deltachain '
206 206 r' WHERE '
207 207 r' fileindex.id=deltachain.baseid '
208 208 r' AND deltachain.baseid IS NOT NULL '
209 209 r' AND fileindex.id NOT IN ({stops}) '
210 210 r' ) '
211 211 r'SELECT deltachain.baseid, compression, delta '
212 212 r'FROM deltachain, delta '
213 213 r'WHERE delta.id=deltachain.deltaid'.format(
214 214 stops=r','.join([r'?'] * len(stoprids))),
215 215 tuple([pathid, node] + list(stoprids.keys())))
216 216
217 217 deltas = []
218 218 lastdeltabaseid = None
219 219
220 220 for deltabaseid, compression, delta in res:
221 221 lastdeltabaseid = deltabaseid
222 222
223 223 if compression == COMPRESSION_ZSTD:
224 224 delta = zstddctx.decompress(delta)
225 225 elif compression == COMPRESSION_NONE:
226 226 delta = delta
227 227 elif compression == COMPRESSION_ZLIB:
228 228 delta = zlib.decompress(delta)
229 229 else:
230 230 raise SQLiteStoreError('unhandled compression type: %d' %
231 231 compression)
232 232
233 233 deltas.append(delta)
234 234
235 235 if lastdeltabaseid in stoprids:
236 236 basetext = revisioncache[stoprids[lastdeltabaseid]]
237 237 else:
238 238 basetext = deltas.pop()
239 239
240 240 deltas.reverse()
241 241 fulltext = mdiff.patches(basetext, deltas)
242 242
243 243 # SQLite returns buffer instances for blob columns on Python 2. This
244 244 # type can propagate through the delta application layer. Because
245 245 # downstream callers assume revisions are bytes, cast as needed.
246 246 if not isinstance(fulltext, bytes):
247 247 fulltext = bytes(fulltext)
248 248
249 249 return fulltext
250 250
251 251 def insertdelta(db, compression, hash, delta):
252 252 try:
253 253 return db.execute(
254 254 r'INSERT INTO delta (compression, hash, delta) '
255 255 r'VALUES (?, ?, ?)',
256 256 (compression, hash, delta)).lastrowid
257 257 except sqlite3.IntegrityError:
258 258 return db.execute(
259 259 r'SELECT id FROM delta WHERE hash=?',
260 260 (hash,)).fetchone()[0]
261 261
262 262 class SQLiteStoreError(error.StorageError):
263 263 pass
264 264
265 265 @attr.s
266 266 class revisionentry(object):
267 267 rid = attr.ib()
268 268 rev = attr.ib()
269 269 node = attr.ib()
270 270 p1rev = attr.ib()
271 271 p2rev = attr.ib()
272 272 p1node = attr.ib()
273 273 p2node = attr.ib()
274 274 linkrev = attr.ib()
275 275 flags = attr.ib()
276 276
277 277 @interfaceutil.implementer(repository.irevisiondelta)
278 278 @attr.s(slots=True)
279 279 class sqliterevisiondelta(object):
280 280 node = attr.ib()
281 281 p1node = attr.ib()
282 282 p2node = attr.ib()
283 283 basenode = attr.ib()
284 284 flags = attr.ib()
285 285 baserevisionsize = attr.ib()
286 286 revision = attr.ib()
287 287 delta = attr.ib()
288 288 linknode = attr.ib(default=None)
289 289
290 290 @interfaceutil.implementer(repository.iverifyproblem)
291 291 @attr.s(frozen=True)
292 292 class sqliteproblem(object):
293 293 warning = attr.ib(default=None)
294 294 error = attr.ib(default=None)
295 295 node = attr.ib(default=None)
296 296
297 297 @interfaceutil.implementer(repository.ifilestorage)
298 298 class sqlitefilestore(object):
299 299 """Implements storage for an individual tracked path."""
300 300
301 301 def __init__(self, db, path, compression):
302 302 self._db = db
303 303 self._path = path
304 304
305 305 self._pathid = None
306 306
307 307 # revnum -> node
308 308 self._revtonode = {}
309 309 # node -> revnum
310 310 self._nodetorev = {}
311 311 # node -> data structure
312 312 self._revisions = {}
313 313
314 314 self._revisioncache = util.lrucachedict(10)
315 315
316 316 self._compengine = compression
317 317
318 318 if compression == 'zstd':
319 319 self._cctx = zstd.ZstdCompressor(level=3)
320 320 self._dctx = zstd.ZstdDecompressor()
321 321 else:
322 322 self._cctx = None
323 323 self._dctx = None
324 324
325 325 self._refreshindex()
326 326
327 327 def _refreshindex(self):
328 328 self._revtonode = {}
329 329 self._nodetorev = {}
330 330 self._revisions = {}
331 331
332 332 res = list(self._db.execute(
333 333 r'SELECT id FROM filepath WHERE path=?', (self._path,)))
334 334
335 335 if not res:
336 336 self._pathid = None
337 337 return
338 338
339 339 self._pathid = res[0][0]
340 340
341 341 res = self._db.execute(
342 342 r'SELECT id, revnum, node, p1rev, p2rev, linkrev, flags '
343 343 r'FROM fileindex '
344 344 r'WHERE pathid=? '
345 345 r'ORDER BY revnum ASC',
346 346 (self._pathid,))
347 347
348 348 for i, row in enumerate(res):
349 349 rid, rev, node, p1rev, p2rev, linkrev, flags = row
350 350
351 351 if i != rev:
352 352 raise SQLiteStoreError(_('sqlite database has inconsistent '
353 353 'revision numbers'))
354 354
355 355 if p1rev == nullrev:
356 356 p1node = nullid
357 357 else:
358 358 p1node = self._revtonode[p1rev]
359 359
360 360 if p2rev == nullrev:
361 361 p2node = nullid
362 362 else:
363 363 p2node = self._revtonode[p2rev]
364 364
365 365 entry = revisionentry(
366 366 rid=rid,
367 367 rev=rev,
368 368 node=node,
369 369 p1rev=p1rev,
370 370 p2rev=p2rev,
371 371 p1node=p1node,
372 372 p2node=p2node,
373 373 linkrev=linkrev,
374 374 flags=flags)
375 375
376 376 self._revtonode[rev] = node
377 377 self._nodetorev[node] = rev
378 378 self._revisions[node] = entry
379 379
380 380 # Start of ifileindex interface.
381 381
382 382 def __len__(self):
383 383 return len(self._revisions)
384 384
385 385 def __iter__(self):
386 386 return iter(pycompat.xrange(len(self._revisions)))
387 387
388 388 def hasnode(self, node):
389 389 if node == nullid:
390 390 return False
391 391
392 392 return node in self._nodetorev
393 393
394 394 def revs(self, start=0, stop=None):
395 395 return storageutil.iterrevs(len(self._revisions), start=start,
396 396 stop=stop)
397 397
398 398 def parents(self, node):
399 399 if node == nullid:
400 400 return nullid, nullid
401 401
402 402 if node not in self._revisions:
403 403 raise error.LookupError(node, self._path, _('no node'))
404 404
405 405 entry = self._revisions[node]
406 406 return entry.p1node, entry.p2node
407 407
408 408 def parentrevs(self, rev):
409 409 if rev == nullrev:
410 410 return nullrev, nullrev
411 411
412 412 if rev not in self._revtonode:
413 413 raise IndexError(rev)
414 414
415 415 entry = self._revisions[self._revtonode[rev]]
416 416 return entry.p1rev, entry.p2rev
417 417
418 418 def rev(self, node):
419 419 if node == nullid:
420 420 return nullrev
421 421
422 422 if node not in self._nodetorev:
423 423 raise error.LookupError(node, self._path, _('no node'))
424 424
425 425 return self._nodetorev[node]
426 426
427 427 def node(self, rev):
428 428 if rev == nullrev:
429 429 return nullid
430 430
431 431 if rev not in self._revtonode:
432 432 raise IndexError(rev)
433 433
434 434 return self._revtonode[rev]
435 435
436 436 def lookup(self, node):
437 437 return storageutil.fileidlookup(self, node, self._path)
438 438
439 439 def linkrev(self, rev):
440 440 if rev == nullrev:
441 441 return nullrev
442 442
443 443 if rev not in self._revtonode:
444 444 raise IndexError(rev)
445 445
446 446 entry = self._revisions[self._revtonode[rev]]
447 447 return entry.linkrev
448 448
449 449 def iscensored(self, rev):
450 450 if rev == nullrev:
451 451 return False
452 452
453 453 if rev not in self._revtonode:
454 454 raise IndexError(rev)
455 455
456 456 return self._revisions[self._revtonode[rev]].flags & FLAG_CENSORED
457 457
458 458 def commonancestorsheads(self, node1, node2):
459 459 rev1 = self.rev(node1)
460 460 rev2 = self.rev(node2)
461 461
462 462 ancestors = ancestor.commonancestorsheads(self.parentrevs, rev1, rev2)
463 463 return pycompat.maplist(self.node, ancestors)
464 464
465 465 def descendants(self, revs):
466 466 # TODO we could implement this using a recursive SQL query, which
467 467 # might be faster.
468 468 return dagop.descendantrevs(revs, self.revs, self.parentrevs)
469 469
470 470 def heads(self, start=None, stop=None):
471 471 if start is None and stop is None:
472 472 if not len(self):
473 473 return [nullid]
474 474
475 475 startrev = self.rev(start) if start is not None else nullrev
476 476 stoprevs = {self.rev(n) for n in stop or []}
477 477
478 478 revs = dagop.headrevssubset(self.revs, self.parentrevs,
479 479 startrev=startrev, stoprevs=stoprevs)
480 480
481 481 return [self.node(rev) for rev in revs]
482 482
483 483 def children(self, node):
484 484 rev = self.rev(node)
485 485
486 486 res = self._db.execute(
487 487 r'SELECT'
488 488 r' node '
489 489 r' FROM filedata '
490 490 r' WHERE path=? AND (p1rev=? OR p2rev=?) '
491 491 r' ORDER BY revnum ASC',
492 492 (self._path, rev, rev))
493 493
494 494 return [row[0] for row in res]
495 495
496 496 # End of ifileindex interface.
497 497
498 498 # Start of ifiledata interface.
499 499
500 500 def size(self, rev):
501 501 if rev == nullrev:
502 502 return 0
503 503
504 504 if rev not in self._revtonode:
505 505 raise IndexError(rev)
506 506
507 507 node = self._revtonode[rev]
508 508
509 509 if self.renamed(node):
510 510 return len(self.read(node))
511 511
512 512 return len(self.revision(node))
513 513
514 514 def revision(self, node, raw=False, _verifyhash=True):
515 515 if node in (nullid, nullrev):
516 516 return b''
517 517
518 518 if isinstance(node, int):
519 519 node = self.node(node)
520 520
521 521 if node not in self._nodetorev:
522 522 raise error.LookupError(node, self._path, _('no node'))
523 523
524 524 if node in self._revisioncache:
525 525 return self._revisioncache[node]
526 526
527 527 # Because we have a fulltext revision cache, we are able to
528 528 # short-circuit delta chain traversal and decompression as soon as
529 529 # we encounter a revision in the cache.
530 530
531 531 stoprids = {self._revisions[n].rid: n
532 532 for n in self._revisioncache}
533 533
534 534 if not stoprids:
535 535 stoprids[-1] = None
536 536
537 537 fulltext = resolvedeltachain(self._db, self._pathid, node,
538 538 self._revisioncache, stoprids,
539 539 zstddctx=self._dctx)
540 540
541 541 # Don't verify hashes if parent nodes were rewritten, as the hash
542 542 # wouldn't verify.
543 543 if self._revisions[node].flags & (FLAG_MISSING_P1 | FLAG_MISSING_P2):
544 544 _verifyhash = False
545 545
546 546 if _verifyhash:
547 547 self._checkhash(fulltext, node)
548 548 self._revisioncache[node] = fulltext
549 549
550 550 return fulltext
551 551
552 552 def read(self, node):
553 553 return storageutil.filtermetadata(self.revision(node))
554 554
555 555 def renamed(self, node):
556 556 return storageutil.filerevisioncopied(self, node)
557 557
558 558 def cmp(self, node, fulltext):
559 559 return not storageutil.filedataequivalent(self, node, fulltext)
560 560
561 561 def emitrevisions(self, nodes, nodesorder=None, revisiondata=False,
562 assumehaveparentrevisions=False, deltaprevious=False):
562 assumehaveparentrevisions=False,
563 deltamode=repository.CG_DELTAMODE_STD):
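        # Note: ``deltamode`` replaces the former boolean ``deltaprevious``
        # argument (the subject of this change). The default,
        # CG_DELTAMODE_STD, keeps the standard behaviour of letting the
        # storage layer choose delta bases; other modes can request a
        # specific strategy, for example deltas against the previously
        # emitted revision.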
563 564 if nodesorder not in ('nodes', 'storage', None):
564 565 raise error.ProgrammingError('unhandled value for nodesorder: %s' %
565 566 nodesorder)
566 567
567 568 nodes = [n for n in nodes if n != nullid]
568 569
569 570 if not nodes:
570 571 return
571 572
572 573 # TODO perform in a single query.
573 574 res = self._db.execute(
574 575 r'SELECT revnum, deltaid FROM fileindex '
575 576 r'WHERE pathid=? '
576 577 r' AND node in (%s)' % (r','.join([r'?'] * len(nodes))),
577 578 tuple([self._pathid] + nodes))
578 579
579 580 deltabases = {}
580 581
581 582 for rev, deltaid in res:
582 583 res = self._db.execute(
583 584 r'SELECT revnum from fileindex WHERE pathid=? AND deltaid=?',
584 585 (self._pathid, deltaid))
585 586 deltabases[rev] = res.fetchone()[0]
586 587
587 588 # TODO define revdifffn so we can use delta from storage.
588 589 for delta in storageutil.emitrevisions(
589 590 self, nodes, nodesorder, sqliterevisiondelta,
590 591 deltaparentfn=deltabases.__getitem__,
591 592 revisiondata=revisiondata,
592 593 assumehaveparentrevisions=assumehaveparentrevisions,
593 deltaprevious=deltaprevious):
594 deltamode=deltamode):
594 595
595 596 yield delta
596 597
597 598 # End of ifiledata interface.
598 599
599 600 # Start of ifilemutation interface.
600 601
601 602 def add(self, filedata, meta, transaction, linkrev, p1, p2):
602 603 if meta or filedata.startswith(b'\x01\n'):
603 604 filedata = storageutil.packmeta(meta, filedata)
604 605
605 606 return self.addrevision(filedata, transaction, linkrev, p1, p2)
606 607
607 608 def addrevision(self, revisiondata, transaction, linkrev, p1, p2, node=None,
608 609 flags=0, cachedelta=None):
609 610 if flags:
610 611 raise SQLiteStoreError(_('flags not supported on revisions'))
611 612
612 613 validatehash = node is not None
613 614 node = node or storageutil.hashrevisionsha1(revisiondata, p1, p2)
614 615
615 616 if validatehash:
616 617 self._checkhash(revisiondata, node, p1, p2)
617 618
618 619 if node in self._nodetorev:
619 620 return node
620 621
621 622 node = self._addrawrevision(node, revisiondata, transaction, linkrev,
622 623 p1, p2)
623 624
624 625 self._revisioncache[node] = revisiondata
625 626 return node
626 627
627 628 def addgroup(self, deltas, linkmapper, transaction, addrevisioncb=None,
628 629 maybemissingparents=False):
629 630 nodes = []
630 631
631 632 for node, p1, p2, linknode, deltabase, delta, wireflags in deltas:
632 633 storeflags = 0
633 634
634 635 if wireflags & repository.REVISION_FLAG_CENSORED:
635 636 storeflags |= FLAG_CENSORED
636 637
637 638 if wireflags & ~repository.REVISION_FLAG_CENSORED:
638 639 raise SQLiteStoreError('unhandled revision flag')
639 640
640 641 if maybemissingparents:
641 642 if p1 != nullid and not self.hasnode(p1):
642 643 p1 = nullid
643 644 storeflags |= FLAG_MISSING_P1
644 645
645 646 if p2 != nullid and not self.hasnode(p2):
646 647 p2 = nullid
647 648 storeflags |= FLAG_MISSING_P2
648 649
649 650 baserev = self.rev(deltabase)
650 651
651 652 # If base is censored, delta must be full replacement in a single
652 653 # patch operation.
653 654 if baserev != nullrev and self.iscensored(baserev):
654 655 hlen = struct.calcsize('>lll')
655 656 oldlen = len(self.revision(deltabase, raw=True,
656 657 _verifyhash=False))
657 658 newlen = len(delta) - hlen
658 659
659 660 if delta[:hlen] != mdiff.replacediffheader(oldlen, newlen):
660 661 raise error.CensoredBaseError(self._path,
661 662 deltabase)
662 663
663 664 if (not (storeflags & FLAG_CENSORED)
664 665 and storageutil.deltaiscensored(
665 666 delta, baserev, lambda x: len(self.revision(x, raw=True)))):
666 667 storeflags |= FLAG_CENSORED
667 668
668 669 linkrev = linkmapper(linknode)
669 670
670 671 nodes.append(node)
671 672
672 673 if node in self._revisions:
673 674 # Possibly reset parents to make them proper.
674 675 entry = self._revisions[node]
675 676
676 677 if entry.flags & FLAG_MISSING_P1 and p1 != nullid:
677 678 entry.p1node = p1
678 679 entry.p1rev = self._nodetorev[p1]
679 680 entry.flags &= ~FLAG_MISSING_P1
680 681
681 682 self._db.execute(
682 683 r'UPDATE fileindex SET p1rev=?, flags=? '
683 684 r'WHERE id=?',
684 685 (self._nodetorev[p1], entry.flags, entry.rid))
685 686
686 687 if entry.flags & FLAG_MISSING_P2 and p2 != nullid:
687 688 entry.p2node = p2
688 689 entry.p2rev = self._nodetorev[p2]
689 690 entry.flags &= ~FLAG_MISSING_P2
690 691
691 692 self._db.execute(
692 693 r'UPDATE fileindex SET p2rev=?, flags=? '
693 694 r'WHERE id=?',
694 695 (self._nodetorev[p2], entry.flags, entry.rid))
695 696
696 697 continue
697 698
698 699 if deltabase == nullid:
699 700 text = mdiff.patch(b'', delta)
700 701 storedelta = None
701 702 else:
702 703 text = None
703 704 storedelta = (deltabase, delta)
704 705
705 706 self._addrawrevision(node, text, transaction, linkrev, p1, p2,
706 707 storedelta=storedelta, flags=storeflags)
707 708
708 709 if addrevisioncb:
709 710 addrevisioncb(self, node)
710 711
711 712 return nodes
712 713
713 714 def censorrevision(self, tr, censornode, tombstone=b''):
714 715 tombstone = storageutil.packmeta({b'censored': tombstone}, b'')
715 716
716 717 # This restriction is cargo culted from revlogs and makes no sense for
717 718 # SQLite, since columns can be resized at will.
718 719 if len(tombstone) > len(self.revision(censornode, raw=True)):
719 720 raise error.Abort(_('censor tombstone must be no longer than '
720 721 'censored data'))
721 722
722 723 # We need to replace the censored revision's data with the tombstone.
723 724 # But replacing that data will have implications for delta chains that
724 725 # reference it.
725 726 #
726 727 # While "better," more complex strategies are possible, we do something
727 728 # simple: we find delta chain children of the censored revision and we
728 729 # replace those incremental deltas with fulltexts of their corresponding
729 730 # revision. Then we delete the now-unreferenced delta and original
730 731 # revision and insert a replacement.
731 732
732 733 # Find the delta to be censored.
733 734 censoreddeltaid = self._db.execute(
734 735 r'SELECT deltaid FROM fileindex WHERE id=?',
735 736 (self._revisions[censornode].rid,)).fetchone()[0]
736 737
737 738 # Find all its delta chain children.
738 739 # TODO once we support storing deltas for !files, we'll need to look
739 740 # for those delta chains too.
740 741 rows = list(self._db.execute(
741 742 r'SELECT id, pathid, node FROM fileindex '
742 743 r'WHERE deltabaseid=? OR deltaid=?',
743 744 (censoreddeltaid, censoreddeltaid)))
744 745
745 746 for row in rows:
746 747 rid, pathid, node = row
747 748
748 749 fulltext = resolvedeltachain(self._db, pathid, node, {}, {-1: None},
749 750 zstddctx=self._dctx)
750 751
751 752 deltahash = hashlib.sha1(fulltext).digest()
752 753
753 754 if self._compengine == 'zstd':
754 755 deltablob = self._cctx.compress(fulltext)
755 756 compression = COMPRESSION_ZSTD
756 757 elif self._compengine == 'zlib':
757 758 deltablob = zlib.compress(fulltext)
758 759 compression = COMPRESSION_ZLIB
759 760 elif self._compengine == 'none':
760 761 deltablob = fulltext
761 762 compression = COMPRESSION_NONE
762 763 else:
763 764 raise error.ProgrammingError('unhandled compression engine: %s'
764 765 % self._compengine)
765 766
766 767 if len(deltablob) >= len(fulltext):
767 768 deltablob = fulltext
768 769 compression = COMPRESSION_NONE
769 770
770 771 deltaid = insertdelta(self._db, compression, deltahash, deltablob)
771 772
772 773 self._db.execute(
773 774 r'UPDATE fileindex SET deltaid=?, deltabaseid=NULL '
774 775 r'WHERE id=?', (deltaid, rid))
775 776
776 777 # Now create the tombstone delta and replace the delta on the censored
777 778 # node.
778 779 deltahash = hashlib.sha1(tombstone).digest()
779 780 tombstonedeltaid = insertdelta(self._db, COMPRESSION_NONE,
780 781 deltahash, tombstone)
781 782
782 783 flags = self._revisions[censornode].flags
783 784 flags |= FLAG_CENSORED
784 785
785 786 self._db.execute(
786 787 r'UPDATE fileindex SET flags=?, deltaid=?, deltabaseid=NULL '
787 788 r'WHERE pathid=? AND node=?',
788 789 (flags, tombstonedeltaid, self._pathid, censornode))
789 790
790 791 self._db.execute(
791 792 r'DELETE FROM delta WHERE id=?', (censoreddeltaid,))
792 793
793 794 self._refreshindex()
794 795 self._revisioncache.clear()
795 796
796 797 def getstrippoint(self, minlink):
797 798 return storageutil.resolvestripinfo(minlink, len(self) - 1,
798 799 [self.rev(n) for n in self.heads()],
799 800 self.linkrev,
800 801 self.parentrevs)
801 802
802 803 def strip(self, minlink, transaction):
803 804 if not len(self):
804 805 return
805 806
806 807 rev, _ignored = self.getstrippoint(minlink)
807 808
808 809 if rev == len(self):
809 810 return
810 811
811 812 for rev in self.revs(rev):
812 813 self._db.execute(
813 814 r'DELETE FROM fileindex WHERE pathid=? AND node=?',
814 815 (self._pathid, self.node(rev)))
815 816
816 817 # TODO how should we garbage collect data in delta table?
817 818
818 819 self._refreshindex()
819 820
820 821 # End of ifilemutation interface.
821 822
822 823 # Start of ifilestorage interface.
823 824
824 825 def files(self):
825 826 return []
826 827
827 828 def storageinfo(self, exclusivefiles=False, sharedfiles=False,
828 829 revisionscount=False, trackedsize=False,
829 830 storedsize=False):
830 831 d = {}
831 832
832 833 if exclusivefiles:
833 834 d['exclusivefiles'] = []
834 835
835 836 if sharedfiles:
836 837 # TODO list sqlite file(s) here.
837 838 d['sharedfiles'] = []
838 839
839 840 if revisionscount:
840 841 d['revisionscount'] = len(self)
841 842
842 843 if trackedsize:
843 844 d['trackedsize'] = sum(len(self.revision(node))
844 845 for node in self._nodetorev)
845 846
846 847 if storedsize:
847 848 # TODO implement this?
848 849 d['storedsize'] = None
849 850
850 851 return d
851 852
852 853 def verifyintegrity(self, state):
853 854 state['skipread'] = set()
854 855
855 856 for rev in self:
856 857 node = self.node(rev)
857 858
858 859 try:
859 860 self.revision(node)
860 861 except Exception as e:
861 862 yield sqliteproblem(
862 863 error=_('unpacking %s: %s') % (short(node), e),
863 864 node=node)
864 865
865 866 state['skipread'].add(node)
866 867
867 868 # End of ifilestorage interface.
868 869
869 870 def _checkhash(self, fulltext, node, p1=None, p2=None):
870 871 if p1 is None and p2 is None:
871 872 p1, p2 = self.parents(node)
872 873
873 874 if node == storageutil.hashrevisionsha1(fulltext, p1, p2):
874 875 return
875 876
876 877 try:
877 878 del self._revisioncache[node]
878 879 except KeyError:
879 880 pass
880 881
881 882 if storageutil.iscensoredtext(fulltext):
882 883 raise error.CensoredNodeError(self._path, node, fulltext)
883 884
884 885 raise SQLiteStoreError(_('integrity check failed on %s') %
885 886 self._path)
886 887
887 888 def _addrawrevision(self, node, revisiondata, transaction, linkrev,
888 889 p1, p2, storedelta=None, flags=0):
889 890 if self._pathid is None:
890 891 res = self._db.execute(
891 892 r'INSERT INTO filepath (path) VALUES (?)', (self._path,))
892 893 self._pathid = res.lastrowid
893 894
894 895 # For simplicity, always store a delta against p1.
895 896 # TODO we need a lot more logic here to make behavior reasonable.
896 897
897 898 if storedelta:
898 899 deltabase, delta = storedelta
899 900
900 901 if isinstance(deltabase, int):
901 902 deltabase = self.node(deltabase)
902 903
903 904 else:
904 905 assert revisiondata is not None
905 906 deltabase = p1
906 907
907 908 if deltabase == nullid:
908 909 delta = revisiondata
909 910 else:
910 911 delta = mdiff.textdiff(self.revision(self.rev(deltabase)),
911 912 revisiondata)
912 913
913 914 # File index stores a pointer to its delta and the parent delta.
914 915 # The parent delta is stored via a pointer to the fileindex PK.
915 916 if deltabase == nullid:
916 917 baseid = None
917 918 else:
918 919 baseid = self._revisions[deltabase].rid
919 920
920 921 # Deltas are stored with a hash of their content. This allows
921 922 # us to de-duplicate. The table is configured to ignore conflicts
922 923 # and it is faster to just insert and silently noop than to look
923 924 # first.
924 925 deltahash = hashlib.sha1(delta).digest()
925 926
926 927 if self._compengine == 'zstd':
927 928 deltablob = self._cctx.compress(delta)
928 929 compression = COMPRESSION_ZSTD
929 930 elif self._compengine == 'zlib':
930 931 deltablob = zlib.compress(delta)
931 932 compression = COMPRESSION_ZLIB
932 933 elif self._compengine == 'none':
933 934 deltablob = delta
934 935 compression = COMPRESSION_NONE
935 936 else:
936 937 raise error.ProgrammingError('unhandled compression engine: %s' %
937 938 self._compengine)
938 939
939 940 # Don't store compressed data if it isn't practical.
940 941 if len(deltablob) >= len(delta):
941 942 deltablob = delta
942 943 compression = COMPRESSION_NONE
943 944
944 945 deltaid = insertdelta(self._db, compression, deltahash, deltablob)
945 946
946 947 rev = len(self)
947 948
948 949 if p1 == nullid:
949 950 p1rev = nullrev
950 951 else:
951 952 p1rev = self._nodetorev[p1]
952 953
953 954 if p2 == nullid:
954 955 p2rev = nullrev
955 956 else:
956 957 p2rev = self._nodetorev[p2]
957 958
958 959 rid = self._db.execute(
959 960 r'INSERT INTO fileindex ('
960 961 r' pathid, revnum, node, p1rev, p2rev, linkrev, flags, '
961 962 r' deltaid, deltabaseid) '
962 963 r' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
963 964 (self._pathid, rev, node, p1rev, p2rev, linkrev, flags,
964 965 deltaid, baseid)
965 966 ).lastrowid
966 967
967 968 entry = revisionentry(
968 969 rid=rid,
969 970 rev=rev,
970 971 node=node,
971 972 p1rev=p1rev,
972 973 p2rev=p2rev,
973 974 p1node=p1,
974 975 p2node=p2,
975 976 linkrev=linkrev,
976 977 flags=flags)
977 978
978 979 self._nodetorev[node] = rev
979 980 self._revtonode[rev] = node
980 981 self._revisions[node] = entry
981 982
982 983 return node
983 984
984 985 class sqliterepository(localrepo.localrepository):
985 986 def cancopy(self):
986 987 return False
987 988
988 989 def transaction(self, *args, **kwargs):
989 990 current = self.currenttransaction()
990 991
991 992 tr = super(sqliterepository, self).transaction(*args, **kwargs)
992 993
993 994 if current:
994 995 return tr
995 996
996 997 self._dbconn.execute(r'BEGIN TRANSACTION')
997 998
998 999 def committransaction(_):
999 1000 self._dbconn.commit()
1000 1001
1001 1002 tr.addfinalize('sqlitestore', committransaction)
1002 1003
1003 1004 return tr
1004 1005
1005 1006 @property
1006 1007 def _dbconn(self):
1007 1008 # SQLite connections can only be used on the thread that created
1008 1009 # them. In most cases, this "just works." However, hgweb uses
1009 1010 # multiple threads.
1010 1011 tid = threading.current_thread().ident
1011 1012
1012 1013 if self._db:
1013 1014 if self._db[0] == tid:
1014 1015 return self._db[1]
1015 1016
1016 1017 db = makedb(self.svfs.join('db.sqlite'))
1017 1018 self._db = (tid, db)
1018 1019
1019 1020 return db
1020 1021
1021 1022 def makedb(path):
1022 1023 """Construct a database handle for a database at path."""
1023 1024
1024 1025 db = sqlite3.connect(encoding.strfromlocal(path))
1025 1026 db.text_factory = bytes
1026 1027
1027 1028 res = db.execute(r'PRAGMA user_version').fetchone()[0]
1028 1029
1029 1030 # New database.
1030 1031 if res == 0:
1031 1032 for statement in CREATE_SCHEMA:
1032 1033 db.execute(statement)
1033 1034
1034 1035 db.commit()
1035 1036
1036 1037 elif res == CURRENT_SCHEMA_VERSION:
1037 1038 pass
1038 1039
1039 1040 else:
1040 1041 raise error.Abort(_('sqlite database has unrecognized version'))
1041 1042
1042 1043 db.execute(r'PRAGMA journal_mode=WAL')
1043 1044
1044 1045 return db
1045 1046
1046 1047 def featuresetup(ui, supported):
1047 1048 supported.add(REQUIREMENT)
1048 1049
1049 1050 if zstd:
1050 1051 supported.add(REQUIREMENT_ZSTD)
1051 1052
1052 1053 supported.add(REQUIREMENT_ZLIB)
1053 1054 supported.add(REQUIREMENT_NONE)
1054 1055 supported.add(REQUIREMENT_SHALLOW_FILES)
1055 1056 supported.add(repository.NARROW_REQUIREMENT)
1056 1057
1057 1058 def newreporequirements(orig, ui, createopts):
1058 1059 if createopts['backend'] != 'sqlite':
1059 1060 return orig(ui, createopts)
1060 1061
1061 1062 # This restriction can be lifted once we have more confidence.
1062 1063 if 'sharedrepo' in createopts:
1063 1064 raise error.Abort(_('shared repositories not supported with SQLite '
1064 1065 'store'))
1065 1066
1066 1067 # This filtering is out of an abundance of caution: we want to ensure
1067 1068 # we honor creation options and we do that by annotating exactly the
1068 1069 # creation options we recognize.
1069 1070 known = {
1070 1071 'narrowfiles',
1071 1072 'backend',
1072 1073 'shallowfilestore',
1073 1074 }
1074 1075
1075 1076 unsupported = set(createopts) - known
1076 1077 if unsupported:
1077 1078 raise error.Abort(_('SQLite store does not support repo creation '
1078 1079 'option: %s') % ', '.join(sorted(unsupported)))
1079 1080
1080 1081 # Since we're a hybrid store that still relies on revlogs, we fall back
1081 1082 # to using the revlogv1 backend's storage requirements then adding our
1082 1083 # own requirement.
1083 1084 createopts['backend'] = 'revlogv1'
1084 1085 requirements = orig(ui, createopts)
1085 1086 requirements.add(REQUIREMENT)
1086 1087
1087 1088 compression = ui.config('storage', 'sqlite.compression')
1088 1089
1089 1090 if compression == 'zstd' and not zstd:
1090 1091 raise error.Abort(_('storage.sqlite.compression set to "zstd" but '
1091 1092 'zstandard compression not available to this '
1092 1093 'Mercurial install'))
1093 1094
1094 1095 if compression == 'zstd':
1095 1096 requirements.add(REQUIREMENT_ZSTD)
1096 1097 elif compression == 'zlib':
1097 1098 requirements.add(REQUIREMENT_ZLIB)
1098 1099 elif compression == 'none':
1099 1100 requirements.add(REQUIREMENT_NONE)
1100 1101 else:
1101 1102 raise error.Abort(_('unknown compression engine defined in '
1102 1103 'storage.sqlite.compression: %s') % compression)
1103 1104
1104 1105 if createopts.get('shallowfilestore'):
1105 1106 requirements.add(REQUIREMENT_SHALLOW_FILES)
1106 1107
1107 1108 return requirements
1108 1109
1109 1110 @interfaceutil.implementer(repository.ilocalrepositoryfilestorage)
1110 1111 class sqlitefilestorage(object):
1111 1112 """Repository file storage backed by SQLite."""
1112 1113 def file(self, path):
1113 1114 if path.startswith(b'/'):
1114 1115 path = path[1:]
1115 1116
1116 1117 if REQUIREMENT_ZSTD in self.requirements:
1117 1118 compression = 'zstd'
1118 1119 elif REQUIREMENT_ZLIB in self.requirements:
1119 1120 compression = 'zlib'
1120 1121 elif REQUIREMENT_NONE in self.requirements:
1121 1122 compression = 'none'
1122 1123 else:
1123 1124 raise error.Abort(_('unable to determine what compression engine '
1124 1125 'to use for SQLite storage'))
1125 1126
1126 1127 return sqlitefilestore(self._dbconn, path, compression)
1127 1128
1128 1129 def makefilestorage(orig, requirements, features, **kwargs):
1129 1130 """Produce a type conforming to ``ilocalrepositoryfilestorage``."""
1130 1131 if REQUIREMENT in requirements:
1131 1132 if REQUIREMENT_SHALLOW_FILES in requirements:
1132 1133 features.add(repository.REPO_FEATURE_SHALLOW_FILE_STORAGE)
1133 1134
1134 1135 return sqlitefilestorage
1135 1136 else:
1136 1137 return orig(requirements=requirements, features=features, **kwargs)
1137 1138
1138 1139 def makemain(orig, ui, requirements, **kwargs):
1139 1140 if REQUIREMENT in requirements:
1140 1141 if REQUIREMENT_ZSTD in requirements and not zstd:
1141 1142 raise error.Abort(_('repository uses zstandard compression, which '
1142 1143 'is not available to this Mercurial install'))
1143 1144
1144 1145 return sqliterepository
1145 1146
1146 1147 return orig(requirements=requirements, **kwargs)
1147 1148
1148 1149 def verifierinit(orig, self, *args, **kwargs):
1149 1150 orig(self, *args, **kwargs)
1150 1151
1151 1152 # We don't care that files in the store don't align with what is
1152 1153 # advertised. So suppress these warnings.
1153 1154 self.warnorphanstorefiles = False
1154 1155
1155 1156 def extsetup(ui):
1156 1157 localrepo.featuresetupfuncs.add(featuresetup)
1157 1158 extensions.wrapfunction(localrepo, 'newreporequirements',
1158 1159 newreporequirements)
1159 1160 extensions.wrapfunction(localrepo, 'makefilestorage',
1160 1161 makefilestorage)
1161 1162 extensions.wrapfunction(localrepo, 'makemain',
1162 1163 makemain)
1163 1164 extensions.wrapfunction(verify.verifier, '__init__',
1164 1165 verifierinit)
1165 1166
1166 1167 def reposetup(ui, repo):
1167 1168 if isinstance(repo, sqliterepository):
1168 1169 repo._db = None
1169 1170
1170 1171 # TODO check for bundlerepository?