rawdata: implement `rawdata` for `sqlitestore` too...
marmoute
r42948:c9f3f4c8 default
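The diff below adds a `rawdata` method to `sqlitefilestore` that simply delegates to `revision`. As a rough illustration (the `fstore` and `node` names are hypothetical, not from this changeset), a caller on a SQLite-backed repository gets the same bytes from either method, since this backend applies no flag-based transforms to stored fulltexts:

    fstore = repo.file(b'path/to/file')   # returns a sqlitefilestore
    assert fstore.rawdata(node) == fstore.revision(node)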
@@ -1,1171 +1,1174 b''
1 1 # sqlitestore.py - Storage backend that uses SQLite
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """store repository data in SQLite (EXPERIMENTAL)
9 9
10 10 The sqlitestore extension enables the storage of repository data in SQLite.
11 11
12 12 This extension is HIGHLY EXPERIMENTAL. There are NO BACKWARDS COMPATIBILITY
13 13 GUARANTEES. This means that repositories created with this extension may
14 14 only be usable with the exact version of this extension/Mercurial that was
15 15 used. The extension attempts to enforce this in order to prevent repository
16 16 corruption.
17 17
18 18 In addition, several features are not yet supported or have known bugs:
19 19
20 20 * Only some data is stored in SQLite. Changeset, manifest, and other repository
21 21 data is not yet stored in SQLite.
22 22 * Transactions are not robust. If the process is aborted at the right time
23 23 during transaction close/rollback, the repository could be in an inconsistent
24 24 state. This problem will diminish once all repository data is tracked by
25 25 SQLite.
26 26 * Bundle repositories do not work (the ability to use e.g.
27 27 `hg -R <bundle-file> log` to automatically overlay a bundle on top of the
28 28 existing repository).
29 29 * Various other features don't work.
30 30
31 31 This extension should work for basic clone/pull, update, and commit workflows.
32 32 Some history rewriting operations may fail due to lack of support for bundle
33 33 repositories.
34 34
35 35 To use, activate the extension and set the ``storage.new-repo-backend`` config
36 36 option to ``sqlite`` to enable new repositories to use SQLite for storage.
37 37 """
38 38
39 39 # To run the test suite with repos using SQLite by default, execute the
40 40 # following:
41 41 #
42 42 # HGREPOFEATURES="sqlitestore" run-tests.py \
43 43 # --extra-config-opt extensions.sqlitestore= \
44 44 # --extra-config-opt storage.new-repo-backend=sqlite
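# An illustrative hgrc equivalent of the options above, enabling the
# extension and making new repositories use the SQLite backend:
#
#   [extensions]
#   sqlitestore =
#
#   [storage]
#   new-repo-backend = sqlite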
45 45
46 46 from __future__ import absolute_import
47 47
48 48 import hashlib
49 49 import sqlite3
50 50 import struct
51 51 import threading
52 52 import zlib
53 53
54 54 from mercurial.i18n import _
55 55 from mercurial.node import (
56 56 nullid,
57 57 nullrev,
58 58 short,
59 59 )
60 60 from mercurial.thirdparty import (
61 61 attr,
62 62 )
63 63 from mercurial import (
64 64 ancestor,
65 65 dagop,
66 66 encoding,
67 67 error,
68 68 extensions,
69 69 localrepo,
70 70 mdiff,
71 71 pycompat,
72 72 registrar,
73 73 repository,
74 74 util,
75 75 verify,
76 76 )
77 77 from mercurial.utils import (
78 78 interfaceutil,
79 79 storageutil,
80 80 )
81 81
82 82 try:
83 83 from mercurial import zstd
84 84 zstd.__version__
85 85 except ImportError:
86 86 zstd = None
87 87
88 88 configtable = {}
89 89 configitem = registrar.configitem(configtable)
90 90
91 91 # experimental config: storage.sqlite.compression
92 92 configitem('storage', 'sqlite.compression',
93 93 default='zstd' if zstd else 'zlib')
94 94
95 95 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
96 96 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
97 97 # be specifying the version(s) of Mercurial they are tested with, or
98 98 # leave the attribute unspecified.
99 99 testedwith = 'ships-with-hg-core'
100 100
101 101 REQUIREMENT = b'exp-sqlite-001'
102 102 REQUIREMENT_ZSTD = b'exp-sqlite-comp-001=zstd'
103 103 REQUIREMENT_ZLIB = b'exp-sqlite-comp-001=zlib'
104 104 REQUIREMENT_NONE = b'exp-sqlite-comp-001=none'
105 105 REQUIREMENT_SHALLOW_FILES = b'exp-sqlite-shallow-files'
106 106
107 107 CURRENT_SCHEMA_VERSION = 1
108 108
109 109 COMPRESSION_NONE = 1
110 110 COMPRESSION_ZSTD = 2
111 111 COMPRESSION_ZLIB = 3
112 112
113 113 FLAG_CENSORED = 1
114 114 FLAG_MISSING_P1 = 2
115 115 FLAG_MISSING_P2 = 4
116 116
117 117 CREATE_SCHEMA = [
118 118 # Deltas are stored as content-indexed blobs.
119 119 # compression column holds COMPRESSION_* constant for how the
120 120 # delta is encoded.
121 121
122 122 r'CREATE TABLE delta ('
123 123 r' id INTEGER PRIMARY KEY, '
124 124 r' compression INTEGER NOT NULL, '
125 125 r' hash BLOB UNIQUE ON CONFLICT ABORT, '
126 126 r' delta BLOB NOT NULL '
127 127 r')',
128 128
129 129 # Tracked paths are normalized to integer ids in a separate table to
130 130 # avoid redundant storage of the path name.
131 131 r'CREATE TABLE filepath ('
132 132 r' id INTEGER PRIMARY KEY, '
133 133 r' path BLOB NOT NULL '
134 134 r')',
135 135
136 136 r'CREATE UNIQUE INDEX filepath_path '
137 137 r' ON filepath (path)',
138 138
139 139 # We have a single table for all file revision data.
140 140 # Each file revision is uniquely described by a (path, rev) and
141 141 # (path, node).
142 142 #
143 143 # Revision data is stored as a pointer to the delta producing this
144 144 # revision and the file revision whose delta should be applied before
145 145 # that one. One can reconstruct the delta chain by recursively following
146 146 # the delta base revision pointers until one encounters NULL.
147 147 #
148 148 # flags column holds bitwise integer flags controlling storage options.
149 149 # These flags are defined by the FLAG_* constants.
150 150 r'CREATE TABLE fileindex ('
151 151 r' id INTEGER PRIMARY KEY, '
152 152 r' pathid INTEGER REFERENCES filepath(id), '
153 153 r' revnum INTEGER NOT NULL, '
154 154 r' p1rev INTEGER NOT NULL, '
155 155 r' p2rev INTEGER NOT NULL, '
156 156 r' linkrev INTEGER NOT NULL, '
157 157 r' flags INTEGER NOT NULL, '
158 158 r' deltaid INTEGER REFERENCES delta(id), '
159 159 r' deltabaseid INTEGER REFERENCES fileindex(id), '
160 160 r' node BLOB NOT NULL '
161 161 r')',
162 162
163 163 r'CREATE UNIQUE INDEX fileindex_pathrevnum '
164 164 r' ON fileindex (pathid, revnum)',
165 165
166 166 r'CREATE UNIQUE INDEX fileindex_pathnode '
167 167 r' ON fileindex (pathid, node)',
168 168
169 169 # Provide a view over all file data for convenience.
170 170 r'CREATE VIEW filedata AS '
171 171 r'SELECT '
172 172 r' fileindex.id AS id, '
173 173 r' filepath.id AS pathid, '
174 174 r' filepath.path AS path, '
175 175 r' fileindex.revnum AS revnum, '
176 176 r' fileindex.node AS node, '
177 177 r' fileindex.p1rev AS p1rev, '
178 178 r' fileindex.p2rev AS p2rev, '
179 179 r' fileindex.linkrev AS linkrev, '
180 180 r' fileindex.flags AS flags, '
181 181 r' fileindex.deltaid AS deltaid, '
182 182 r' fileindex.deltabaseid AS deltabaseid '
183 183 r'FROM filepath, fileindex '
184 184 r'WHERE fileindex.pathid=filepath.id',
185 185
186 186 r'PRAGMA user_version=%d' % CURRENT_SCHEMA_VERSION,
187 187 ]
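# Illustrative layout (not real data) for a file with three revisions,
# each stored as a delta against the previous one:
#
#   fileindex: id=1 revnum=0 deltaid=10 deltabaseid=NULL
#              id=2 revnum=1 deltaid=11 deltabaseid=1
#              id=3 revnum=2 deltaid=12 deltabaseid=2
#
# A NULL deltabaseid means the referenced delta is a fulltext snapshot, so
# reconstructing revnum=2 applies deltas 11 and 12 on top of delta 10.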
188 188
189 189 def resolvedeltachain(db, pathid, node, revisioncache,
190 190 stoprids, zstddctx=None):
191 191 """Resolve a delta chain for a file node."""
192 192
193 193 # TODO the "not in ({stops})" here is possibly slowing down the query
194 194 # because it needs to perform the lookup on every recursive invocation.
195 195 # This could possibly be faster if we created a temporary query with
196 196 # baseid "poisoned" to null and limited the recursive filter to
197 197 # "is not null".
198 198 res = db.execute(
199 199 r'WITH RECURSIVE '
200 200 r' deltachain(deltaid, baseid) AS ('
201 201 r' SELECT deltaid, deltabaseid FROM fileindex '
202 202 r' WHERE pathid=? AND node=? '
203 203 r' UNION ALL '
204 204 r' SELECT fileindex.deltaid, deltabaseid '
205 205 r' FROM fileindex, deltachain '
206 206 r' WHERE '
207 207 r' fileindex.id=deltachain.baseid '
208 208 r' AND deltachain.baseid IS NOT NULL '
209 209 r' AND fileindex.id NOT IN ({stops}) '
210 210 r' ) '
211 211 r'SELECT deltachain.baseid, compression, delta '
212 212 r'FROM deltachain, delta '
213 213 r'WHERE delta.id=deltachain.deltaid'.format(
214 214 stops=r','.join([r'?'] * len(stoprids))),
215 215 tuple([pathid, node] + list(stoprids.keys())))
216 216
217 217 deltas = []
218 218 lastdeltabaseid = None
219 219
220 220 for deltabaseid, compression, delta in res:
221 221 lastdeltabaseid = deltabaseid
222 222
223 223 if compression == COMPRESSION_ZSTD:
224 224 delta = zstddctx.decompress(delta)
225 225 elif compression == COMPRESSION_NONE:
226 226 delta = delta
227 227 elif compression == COMPRESSION_ZLIB:
228 228 delta = zlib.decompress(delta)
229 229 else:
230 230 raise SQLiteStoreError('unhandled compression type: %d' %
231 231 compression)
232 232
233 233 deltas.append(delta)
234 234
235 235 if lastdeltabaseid in stoprids:
236 236 basetext = revisioncache[stoprids[lastdeltabaseid]]
237 237 else:
238 238 basetext = deltas.pop()
239 239
240 240 deltas.reverse()
241 241 fulltext = mdiff.patches(basetext, deltas)
242 242
243 243 # SQLite returns buffer instances for blob columns on Python 2. This
244 244 # type can propagate through the delta application layer. Because
245 245 # downstream callers assume revisions are bytes, cast as needed.
246 246 if not isinstance(fulltext, bytes):
247 247 fulltext = bytes(fulltext)
248 248
249 249 return fulltext
250 250
251 251 def insertdelta(db, compression, hash, delta):
252 252 try:
253 253 return db.execute(
254 254 r'INSERT INTO delta (compression, hash, delta) '
255 255 r'VALUES (?, ?, ?)',
256 256 (compression, hash, delta)).lastrowid
257 257 except sqlite3.IntegrityError:
258 258 return db.execute(
259 259 r'SELECT id FROM delta WHERE hash=?',
260 260 (hash,)).fetchone()[0]
261 261
262 262 class SQLiteStoreError(error.StorageError):
263 263 pass
264 264
265 265 @attr.s
266 266 class revisionentry(object):
267 267 rid = attr.ib()
268 268 rev = attr.ib()
269 269 node = attr.ib()
270 270 p1rev = attr.ib()
271 271 p2rev = attr.ib()
272 272 p1node = attr.ib()
273 273 p2node = attr.ib()
274 274 linkrev = attr.ib()
275 275 flags = attr.ib()
276 276
277 277 @interfaceutil.implementer(repository.irevisiondelta)
278 278 @attr.s(slots=True)
279 279 class sqliterevisiondelta(object):
280 280 node = attr.ib()
281 281 p1node = attr.ib()
282 282 p2node = attr.ib()
283 283 basenode = attr.ib()
284 284 flags = attr.ib()
285 285 baserevisionsize = attr.ib()
286 286 revision = attr.ib()
287 287 delta = attr.ib()
288 288 linknode = attr.ib(default=None)
289 289
290 290 @interfaceutil.implementer(repository.iverifyproblem)
291 291 @attr.s(frozen=True)
292 292 class sqliteproblem(object):
293 293 warning = attr.ib(default=None)
294 294 error = attr.ib(default=None)
295 295 node = attr.ib(default=None)
296 296
297 297 @interfaceutil.implementer(repository.ifilestorage)
298 298 class sqlitefilestore(object):
299 299 """Implements storage for an individual tracked path."""
300 300
301 301 def __init__(self, db, path, compression):
302 302 self._db = db
303 303 self._path = path
304 304
305 305 self._pathid = None
306 306
307 307 # revnum -> node
308 308 self._revtonode = {}
309 309 # node -> revnum
310 310 self._nodetorev = {}
311 311 # node -> data structure
312 312 self._revisions = {}
313 313
314 314 self._revisioncache = util.lrucachedict(10)
315 315
316 316 self._compengine = compression
317 317
318 318 if compression == 'zstd':
319 319 self._cctx = zstd.ZstdCompressor(level=3)
320 320 self._dctx = zstd.ZstdDecompressor()
321 321 else:
322 322 self._cctx = None
323 323 self._dctx = None
324 324
325 325 self._refreshindex()
326 326
327 327 def _refreshindex(self):
328 328 self._revtonode = {}
329 329 self._nodetorev = {}
330 330 self._revisions = {}
331 331
332 332 res = list(self._db.execute(
333 333 r'SELECT id FROM filepath WHERE path=?', (self._path,)))
334 334
335 335 if not res:
336 336 self._pathid = None
337 337 return
338 338
339 339 self._pathid = res[0][0]
340 340
341 341 res = self._db.execute(
342 342 r'SELECT id, revnum, node, p1rev, p2rev, linkrev, flags '
343 343 r'FROM fileindex '
344 344 r'WHERE pathid=? '
345 345 r'ORDER BY revnum ASC',
346 346 (self._pathid,))
347 347
348 348 for i, row in enumerate(res):
349 349 rid, rev, node, p1rev, p2rev, linkrev, flags = row
350 350
351 351 if i != rev:
352 352 raise SQLiteStoreError(_('sqlite database has inconsistent '
353 353 'revision numbers'))
354 354
355 355 if p1rev == nullrev:
356 356 p1node = nullid
357 357 else:
358 358 p1node = self._revtonode[p1rev]
359 359
360 360 if p2rev == nullrev:
361 361 p2node = nullid
362 362 else:
363 363 p2node = self._revtonode[p2rev]
364 364
365 365 entry = revisionentry(
366 366 rid=rid,
367 367 rev=rev,
368 368 node=node,
369 369 p1rev=p1rev,
370 370 p2rev=p2rev,
371 371 p1node=p1node,
372 372 p2node=p2node,
373 373 linkrev=linkrev,
374 374 flags=flags)
375 375
376 376 self._revtonode[rev] = node
377 377 self._nodetorev[node] = rev
378 378 self._revisions[node] = entry
379 379
380 380 # Start of ifileindex interface.
381 381
382 382 def __len__(self):
383 383 return len(self._revisions)
384 384
385 385 def __iter__(self):
386 386 return iter(pycompat.xrange(len(self._revisions)))
387 387
388 388 def hasnode(self, node):
389 389 if node == nullid:
390 390 return False
391 391
392 392 return node in self._nodetorev
393 393
394 394 def revs(self, start=0, stop=None):
395 395 return storageutil.iterrevs(len(self._revisions), start=start,
396 396 stop=stop)
397 397
398 398 def parents(self, node):
399 399 if node == nullid:
400 400 return nullid, nullid
401 401
402 402 if node not in self._revisions:
403 403 raise error.LookupError(node, self._path, _('no node'))
404 404
405 405 entry = self._revisions[node]
406 406 return entry.p1node, entry.p2node
407 407
408 408 def parentrevs(self, rev):
409 409 if rev == nullrev:
410 410 return nullrev, nullrev
411 411
412 412 if rev not in self._revtonode:
413 413 raise IndexError(rev)
414 414
415 415 entry = self._revisions[self._revtonode[rev]]
416 416 return entry.p1rev, entry.p2rev
417 417
418 418 def rev(self, node):
419 419 if node == nullid:
420 420 return nullrev
421 421
422 422 if node not in self._nodetorev:
423 423 raise error.LookupError(node, self._path, _('no node'))
424 424
425 425 return self._nodetorev[node]
426 426
427 427 def node(self, rev):
428 428 if rev == nullrev:
429 429 return nullid
430 430
431 431 if rev not in self._revtonode:
432 432 raise IndexError(rev)
433 433
434 434 return self._revtonode[rev]
435 435
436 436 def lookup(self, node):
437 437 return storageutil.fileidlookup(self, node, self._path)
438 438
439 439 def linkrev(self, rev):
440 440 if rev == nullrev:
441 441 return nullrev
442 442
443 443 if rev not in self._revtonode:
444 444 raise IndexError(rev)
445 445
446 446 entry = self._revisions[self._revtonode[rev]]
447 447 return entry.linkrev
448 448
449 449 def iscensored(self, rev):
450 450 if rev == nullrev:
451 451 return False
452 452
453 453 if rev not in self._revtonode:
454 454 raise IndexError(rev)
455 455
456 456 return self._revisions[self._revtonode[rev]].flags & FLAG_CENSORED
457 457
458 458 def commonancestorsheads(self, node1, node2):
459 459 rev1 = self.rev(node1)
460 460 rev2 = self.rev(node2)
461 461
462 462 ancestors = ancestor.commonancestorsheads(self.parentrevs, rev1, rev2)
463 463 return pycompat.maplist(self.node, ancestors)
464 464
465 465 def descendants(self, revs):
466 466 # TODO we could implement this using a recursive SQL query, which
467 467 # might be faster.
468 468 return dagop.descendantrevs(revs, self.revs, self.parentrevs)
469 469
470 470 def heads(self, start=None, stop=None):
471 471 if start is None and stop is None:
472 472 if not len(self):
473 473 return [nullid]
474 474
475 475 startrev = self.rev(start) if start is not None else nullrev
476 476 stoprevs = {self.rev(n) for n in stop or []}
477 477
478 478 revs = dagop.headrevssubset(self.revs, self.parentrevs,
479 479 startrev=startrev, stoprevs=stoprevs)
480 480
481 481 return [self.node(rev) for rev in revs]
482 482
483 483 def children(self, node):
484 484 rev = self.rev(node)
485 485
486 486 res = self._db.execute(
487 487 r'SELECT'
488 488 r' node '
489 489 r' FROM filedata '
490 490 r' WHERE path=? AND (p1rev=? OR p2rev=?) '
491 491 r' ORDER BY revnum ASC',
492 492 (self._path, rev, rev))
493 493
494 494 return [row[0] for row in res]
495 495
496 496 # End of ifileindex interface.
497 497
498 498 # Start of ifiledata interface.
499 499
500 500 def size(self, rev):
501 501 if rev == nullrev:
502 502 return 0
503 503
504 504 if rev not in self._revtonode:
505 505 raise IndexError(rev)
506 506
507 507 node = self._revtonode[rev]
508 508
509 509 if self.renamed(node):
510 510 return len(self.read(node))
511 511
512 512 return len(self.revision(node))
513 513
514 514 def revision(self, node, raw=False, _verifyhash=True):
515 515 if node in (nullid, nullrev):
516 516 return b''
517 517
518 518 if isinstance(node, int):
519 519 node = self.node(node)
520 520
521 521 if node not in self._nodetorev:
522 522 raise error.LookupError(node, self._path, _('no node'))
523 523
524 524 if node in self._revisioncache:
525 525 return self._revisioncache[node]
526 526
527 527 # Because we have a fulltext revision cache, we are able to
528 528 # short-circuit delta chain traversal and decompression as soon as
529 529 # we encounter a revision in the cache.
530 530
531 531 stoprids = {self._revisions[n].rid: n
532 532 for n in self._revisioncache}
533 533
534 534 if not stoprids:
535 535 stoprids[-1] = None
536 536
537 537 fulltext = resolvedeltachain(self._db, self._pathid, node,
538 538 self._revisioncache, stoprids,
539 539 zstddctx=self._dctx)
540 540
541 541 # Don't verify hashes if parent nodes were rewritten, as the hash
542 542 # wouldn't verify.
543 543 if self._revisions[node].flags & (FLAG_MISSING_P1 | FLAG_MISSING_P2):
544 544 _verifyhash = False
545 545
546 546 if _verifyhash:
547 547 self._checkhash(fulltext, node)
548 548 self._revisioncache[node] = fulltext
549 549
550 550 return fulltext
551 551
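# rawdata mirrors revision() here: this backend stores plain fulltexts
# and applies no flag-based transformations on read, so the raw and
# rendered revision data are identical.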
552 def rawdata(self, *args, **kwargs):
553 return self.revision(*args, **kwargs)
554
552 555 def read(self, node):
553 556 return storageutil.filtermetadata(self.revision(node))
554 557
555 558 def renamed(self, node):
556 559 return storageutil.filerevisioncopied(self, node)
557 560
558 561 def cmp(self, node, fulltext):
559 562 return not storageutil.filedataequivalent(self, node, fulltext)
560 563
561 564 def emitrevisions(self, nodes, nodesorder=None, revisiondata=False,
562 565 assumehaveparentrevisions=False,
563 566 deltamode=repository.CG_DELTAMODE_STD):
564 567 if nodesorder not in ('nodes', 'storage', 'linear', None):
565 568 raise error.ProgrammingError('unhandled value for nodesorder: %s' %
566 569 nodesorder)
567 570
568 571 nodes = [n for n in nodes if n != nullid]
569 572
570 573 if not nodes:
571 574 return
572 575
573 576 # TODO perform in a single query.
574 577 res = self._db.execute(
575 578 r'SELECT revnum, deltaid FROM fileindex '
576 579 r'WHERE pathid=? '
577 580 r' AND node in (%s)' % (r','.join([r'?'] * len(nodes))),
578 581 tuple([self._pathid] + nodes))
579 582
580 583 deltabases = {}
581 584
582 585 for rev, deltaid in res:
583 586 res = self._db.execute(
584 587 r'SELECT revnum from fileindex WHERE pathid=? AND deltaid=?',
585 588 (self._pathid, deltaid))
586 589 deltabases[rev] = res.fetchone()[0]
587 590
588 591 # TODO define revdifffn so we can use delta from storage.
589 592 for delta in storageutil.emitrevisions(
590 593 self, nodes, nodesorder, sqliterevisiondelta,
591 594 deltaparentfn=deltabases.__getitem__,
592 595 revisiondata=revisiondata,
593 596 assumehaveparentrevisions=assumehaveparentrevisions,
594 597 deltamode=deltamode):
595 598
596 599 yield delta
597 600
598 601 # End of ifiledata interface.
599 602
600 603 # Start of ifilemutation interface.
601 604
602 605 def add(self, filedata, meta, transaction, linkrev, p1, p2):
603 606 if meta or filedata.startswith(b'\x01\n'):
604 607 filedata = storageutil.packmeta(meta, filedata)
605 608
606 609 return self.addrevision(filedata, transaction, linkrev, p1, p2)
607 610
608 611 def addrevision(self, revisiondata, transaction, linkrev, p1, p2, node=None,
609 612 flags=0, cachedelta=None):
610 613 if flags:
611 614 raise SQLiteStoreError(_('flags not supported on revisions'))
612 615
613 616 validatehash = node is not None
614 617 node = node or storageutil.hashrevisionsha1(revisiondata, p1, p2)
615 618
616 619 if validatehash:
617 620 self._checkhash(revisiondata, node, p1, p2)
618 621
619 622 if node in self._nodetorev:
620 623 return node
621 624
622 625 node = self._addrawrevision(node, revisiondata, transaction, linkrev,
623 626 p1, p2)
624 627
625 628 self._revisioncache[node] = revisiondata
626 629 return node
627 630
628 631 def addgroup(self, deltas, linkmapper, transaction, addrevisioncb=None,
629 632 maybemissingparents=False):
630 633 nodes = []
631 634
632 635 for node, p1, p2, linknode, deltabase, delta, wireflags in deltas:
633 636 storeflags = 0
634 637
635 638 if wireflags & repository.REVISION_FLAG_CENSORED:
636 639 storeflags |= FLAG_CENSORED
637 640
638 641 if wireflags & ~repository.REVISION_FLAG_CENSORED:
639 642 raise SQLiteStoreError('unhandled revision flag')
640 643
641 644 if maybemissingparents:
642 645 if p1 != nullid and not self.hasnode(p1):
643 646 p1 = nullid
644 647 storeflags |= FLAG_MISSING_P1
645 648
646 649 if p2 != nullid and not self.hasnode(p2):
647 650 p2 = nullid
648 651 storeflags |= FLAG_MISSING_P2
649 652
650 653 baserev = self.rev(deltabase)
651 654
652 655 # If base is censored, delta must be full replacement in a single
653 656 # patch operation.
654 657 if baserev != nullrev and self.iscensored(baserev):
655 658 hlen = struct.calcsize('>lll')
656 659 oldlen = len(self.revision(deltabase, raw=True,
657 660 _verifyhash=False))
658 661 newlen = len(delta) - hlen
659 662
660 663 if delta[:hlen] != mdiff.replacediffheader(oldlen, newlen):
661 664 raise error.CensoredBaseError(self._path,
662 665 deltabase)
663 666
664 667 if (not (storeflags & FLAG_CENSORED)
665 668 and storageutil.deltaiscensored(
666 669 delta, baserev, lambda x: len(self.revision(x, raw=True)))):
667 670 storeflags |= FLAG_CENSORED
668 671
669 672 linkrev = linkmapper(linknode)
670 673
671 674 nodes.append(node)
672 675
673 676 if node in self._revisions:
674 677 # Possibly reset parents to make them proper.
675 678 entry = self._revisions[node]
676 679
677 680 if entry.flags & FLAG_MISSING_P1 and p1 != nullid:
678 681 entry.p1node = p1
679 682 entry.p1rev = self._nodetorev[p1]
680 683 entry.flags &= ~FLAG_MISSING_P1
681 684
682 685 self._db.execute(
683 686 r'UPDATE fileindex SET p1rev=?, flags=? '
684 687 r'WHERE id=?',
685 688 (self._nodetorev[p1], entry.flags, entry.rid))
686 689
687 690 if entry.flags & FLAG_MISSING_P2 and p2 != nullid:
688 691 entry.p2node = p2
689 692 entry.p2rev = self._nodetorev[p2]
690 693 entry.flags &= ~FLAG_MISSING_P2
691 694
692 695 self._db.execute(
693 696 r'UPDATE fileindex SET p2rev=?, flags=? '
694 697 r'WHERE id=?',
695 698 (self._nodetorev[p2], entry.flags, entry.rid))
696 699
697 700 continue
698 701
699 702 if deltabase == nullid:
700 703 text = mdiff.patch(b'', delta)
701 704 storedelta = None
702 705 else:
703 706 text = None
704 707 storedelta = (deltabase, delta)
705 708
706 709 self._addrawrevision(node, text, transaction, linkrev, p1, p2,
707 710 storedelta=storedelta, flags=storeflags)
708 711
709 712 if addrevisioncb:
710 713 addrevisioncb(self, node)
711 714
712 715 return nodes
713 716
714 717 def censorrevision(self, tr, censornode, tombstone=b''):
715 718 tombstone = storageutil.packmeta({b'censored': tombstone}, b'')
716 719
717 720 # This restriction is cargo culted from revlogs and makes no sense for
718 721 # SQLite, since columns can be resized at will.
719 722 if len(tombstone) > len(self.revision(censornode, raw=True)):
720 723 raise error.Abort(_('censor tombstone must be no longer than '
721 724 'censored data'))
722 725
723 726 # We need to replace the censored revision's data with the tombstone.
724 727 # But replacing that data will have implications for delta chains that
725 728 # reference it.
726 729 #
727 730 # While "better," more complex strategies are possible, we do something
728 731 # simple: we find delta chain children of the censored revision and we
729 732 # replace those incremental deltas with fulltexts of their corresponding
730 733 # revision. Then we delete the now-unreferenced delta and original
731 734 # revision and insert a replacement.
732 735
733 736 # Find the delta to be censored.
734 737 censoreddeltaid = self._db.execute(
735 738 r'SELECT deltaid FROM fileindex WHERE id=?',
736 739 (self._revisions[censornode].rid,)).fetchone()[0]
737 740
738 741 # Find all its delta chain children.
739 742 # TODO once we support storing deltas for !files, we'll need to look
740 743 # for those delta chains too.
741 744 rows = list(self._db.execute(
742 745 r'SELECT id, pathid, node FROM fileindex '
743 746 r'WHERE deltabaseid=? OR deltaid=?',
744 747 (censoreddeltaid, censoreddeltaid)))
745 748
746 749 for row in rows:
747 750 rid, pathid, node = row
748 751
749 752 fulltext = resolvedeltachain(self._db, pathid, node, {}, {-1: None},
750 753 zstddctx=self._dctx)
751 754
752 755 deltahash = hashlib.sha1(fulltext).digest()
753 756
754 757 if self._compengine == 'zstd':
755 758 deltablob = self._cctx.compress(fulltext)
756 759 compression = COMPRESSION_ZSTD
757 760 elif self._compengine == 'zlib':
758 761 deltablob = zlib.compress(fulltext)
759 762 compression = COMPRESSION_ZLIB
760 763 elif self._compengine == 'none':
761 764 deltablob = fulltext
762 765 compression = COMPRESSION_NONE
763 766 else:
764 767 raise error.ProgrammingError('unhandled compression engine: %s'
765 768 % self._compengine)
766 769
767 770 if len(deltablob) >= len(fulltext):
768 771 deltablob = fulltext
769 772 compression = COMPRESSION_NONE
770 773
771 774 deltaid = insertdelta(self._db, compression, deltahash, deltablob)
772 775
773 776 self._db.execute(
774 777 r'UPDATE fileindex SET deltaid=?, deltabaseid=NULL '
775 778 r'WHERE id=?', (deltaid, rid))
776 779
777 780 # Now create the tombstone delta and replace the delta on the censored
778 781 # node.
779 782 deltahash = hashlib.sha1(tombstone).digest()
780 783 tombstonedeltaid = insertdelta(self._db, COMPRESSION_NONE,
781 784 deltahash, tombstone)
782 785
783 786 flags = self._revisions[censornode].flags
784 787 flags |= FLAG_CENSORED
785 788
786 789 self._db.execute(
787 790 r'UPDATE fileindex SET flags=?, deltaid=?, deltabaseid=NULL '
788 791 r'WHERE pathid=? AND node=?',
789 792 (flags, tombstonedeltaid, self._pathid, censornode))
790 793
791 794 self._db.execute(
792 795 r'DELETE FROM delta WHERE id=?', (censoreddeltaid,))
793 796
794 797 self._refreshindex()
795 798 self._revisioncache.clear()
796 799
797 800 def getstrippoint(self, minlink):
798 801 return storageutil.resolvestripinfo(minlink, len(self) - 1,
799 802 [self.rev(n) for n in self.heads()],
800 803 self.linkrev,
801 804 self.parentrevs)
802 805
803 806 def strip(self, minlink, transaction):
804 807 if not len(self):
805 808 return
806 809
807 810 rev, _ignored = self.getstrippoint(minlink)
808 811
809 812 if rev == len(self):
810 813 return
811 814
812 815 for rev in self.revs(rev):
813 816 self._db.execute(
814 817 r'DELETE FROM fileindex WHERE pathid=? AND node=?',
815 818 (self._pathid, self.node(rev)))
816 819
817 820 # TODO how should we garbage collect data in delta table?
818 821
819 822 self._refreshindex()
820 823
821 824 # End of ifilemutation interface.
822 825
823 826 # Start of ifilestorage interface.
824 827
825 828 def files(self):
826 829 return []
827 830
828 831 def storageinfo(self, exclusivefiles=False, sharedfiles=False,
829 832 revisionscount=False, trackedsize=False,
830 833 storedsize=False):
831 834 d = {}
832 835
833 836 if exclusivefiles:
834 837 d['exclusivefiles'] = []
835 838
836 839 if sharedfiles:
837 840 # TODO list sqlite file(s) here.
838 841 d['sharedfiles'] = []
839 842
840 843 if revisionscount:
841 844 d['revisionscount'] = len(self)
842 845
843 846 if trackedsize:
844 847 d['trackedsize'] = sum(len(self.revision(node))
845 848 for node in self._nodetorev)
846 849
847 850 if storedsize:
848 851 # TODO implement this?
849 852 d['storedsize'] = None
850 853
851 854 return d
852 855
853 856 def verifyintegrity(self, state):
854 857 state['skipread'] = set()
855 858
856 859 for rev in self:
857 860 node = self.node(rev)
858 861
859 862 try:
860 863 self.revision(node)
861 864 except Exception as e:
862 865 yield sqliteproblem(
863 866 error=_('unpacking %s: %s') % (short(node), e),
864 867 node=node)
865 868
866 869 state['skipread'].add(node)
867 870
868 871 # End of ifilestorage interface.
869 872
870 873 def _checkhash(self, fulltext, node, p1=None, p2=None):
871 874 if p1 is None and p2 is None:
872 875 p1, p2 = self.parents(node)
873 876
874 877 if node == storageutil.hashrevisionsha1(fulltext, p1, p2):
875 878 return
876 879
877 880 try:
878 881 del self._revisioncache[node]
879 882 except KeyError:
880 883 pass
881 884
882 885 if storageutil.iscensoredtext(fulltext):
883 886 raise error.CensoredNodeError(self._path, node, fulltext)
884 887
885 888 raise SQLiteStoreError(_('integrity check failed on %s') %
886 889 self._path)
887 890
888 891 def _addrawrevision(self, node, revisiondata, transaction, linkrev,
889 892 p1, p2, storedelta=None, flags=0):
890 893 if self._pathid is None:
891 894 res = self._db.execute(
892 895 r'INSERT INTO filepath (path) VALUES (?)', (self._path,))
893 896 self._pathid = res.lastrowid
894 897
895 898 # For simplicity, always store a delta against p1.
896 899 # TODO we need a lot more logic here to make behavior reasonable.
897 900
898 901 if storedelta:
899 902 deltabase, delta = storedelta
900 903
901 904 if isinstance(deltabase, int):
902 905 deltabase = self.node(deltabase)
903 906
904 907 else:
905 908 assert revisiondata is not None
906 909 deltabase = p1
907 910
908 911 if deltabase == nullid:
909 912 delta = revisiondata
910 913 else:
911 914 delta = mdiff.textdiff(self.revision(self.rev(deltabase)),
912 915 revisiondata)
913 916
914 917 # File index stores a pointer to its delta and the parent delta.
915 918 # The parent delta is stored via a pointer to the fileindex PK.
916 919 if deltabase == nullid:
917 920 baseid = None
918 921 else:
919 922 baseid = self._revisions[deltabase].rid
920 923
921 924 # Deltas are stored with a hash of their content. This allows
922 925 # us to de-duplicate. The hash column is declared UNIQUE, so we just
923 926 # insert and fall back to looking up the existing row on conflict
924 927 # rather than checking first.
925 928 deltahash = hashlib.sha1(delta).digest()
926 929
927 930 if self._compengine == 'zstd':
928 931 deltablob = self._cctx.compress(delta)
929 932 compression = COMPRESSION_ZSTD
930 933 elif self._compengine == 'zlib':
931 934 deltablob = zlib.compress(delta)
932 935 compression = COMPRESSION_ZLIB
933 936 elif self._compengine == 'none':
934 937 deltablob = delta
935 938 compression = COMPRESSION_NONE
936 939 else:
937 940 raise error.ProgrammingError('unhandled compression engine: %s' %
938 941 self._compengine)
939 942
940 943 # Don't store compressed data if it isn't practical.
941 944 if len(deltablob) >= len(delta):
942 945 deltablob = delta
943 946 compression = COMPRESSION_NONE
944 947
945 948 deltaid = insertdelta(self._db, compression, deltahash, deltablob)
946 949
947 950 rev = len(self)
948 951
949 952 if p1 == nullid:
950 953 p1rev = nullrev
951 954 else:
952 955 p1rev = self._nodetorev[p1]
953 956
954 957 if p2 == nullid:
955 958 p2rev = nullrev
956 959 else:
957 960 p2rev = self._nodetorev[p2]
958 961
959 962 rid = self._db.execute(
960 963 r'INSERT INTO fileindex ('
961 964 r' pathid, revnum, node, p1rev, p2rev, linkrev, flags, '
962 965 r' deltaid, deltabaseid) '
963 966 r' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
964 967 (self._pathid, rev, node, p1rev, p2rev, linkrev, flags,
965 968 deltaid, baseid)
966 969 ).lastrowid
967 970
968 971 entry = revisionentry(
969 972 rid=rid,
970 973 rev=rev,
971 974 node=node,
972 975 p1rev=p1rev,
973 976 p2rev=p2rev,
974 977 p1node=p1,
975 978 p2node=p2,
976 979 linkrev=linkrev,
977 980 flags=flags)
978 981
979 982 self._nodetorev[node] = rev
980 983 self._revtonode[rev] = node
981 984 self._revisions[node] = entry
982 985
983 986 return node
984 987
985 988 class sqliterepository(localrepo.localrepository):
986 989 def cancopy(self):
987 990 return False
988 991
989 992 def transaction(self, *args, **kwargs):
990 993 current = self.currenttransaction()
991 994
992 995 tr = super(sqliterepository, self).transaction(*args, **kwargs)
993 996
994 997 if current:
995 998 return tr
996 999
997 1000 self._dbconn.execute(r'BEGIN TRANSACTION')
998 1001
999 1002 def committransaction(_):
1000 1003 self._dbconn.commit()
1001 1004
1002 1005 tr.addfinalize('sqlitestore', committransaction)
1003 1006
1004 1007 return tr
1005 1008
1006 1009 @property
1007 1010 def _dbconn(self):
1008 1011 # SQLite connections can only be used on the thread that created
1009 1012 # them. In most cases, this "just works." However, hgweb uses
1010 1013 # multiple threads.
1011 1014 tid = threading.current_thread().ident
1012 1015
1013 1016 if self._db:
1014 1017 if self._db[0] == tid:
1015 1018 return self._db[1]
1016 1019
1017 1020 db = makedb(self.svfs.join('db.sqlite'))
1018 1021 self._db = (tid, db)
1019 1022
1020 1023 return db
1021 1024
1022 1025 def makedb(path):
1023 1026 """Construct a database handle for a database at path."""
1024 1027
1025 1028 db = sqlite3.connect(encoding.strfromlocal(path))
1026 1029 db.text_factory = bytes
1027 1030
1028 1031 res = db.execute(r'PRAGMA user_version').fetchone()[0]
1029 1032
1030 1033 # New database.
1031 1034 if res == 0:
1032 1035 for statement in CREATE_SCHEMA:
1033 1036 db.execute(statement)
1034 1037
1035 1038 db.commit()
1036 1039
1037 1040 elif res == CURRENT_SCHEMA_VERSION:
1038 1041 pass
1039 1042
1040 1043 else:
1041 1044 raise error.Abort(_('sqlite database has unrecognized version'))
1042 1045
1043 1046 db.execute(r'PRAGMA journal_mode=WAL')
1044 1047
1045 1048 return db
1046 1049
1047 1050 def featuresetup(ui, supported):
1048 1051 supported.add(REQUIREMENT)
1049 1052
1050 1053 if zstd:
1051 1054 supported.add(REQUIREMENT_ZSTD)
1052 1055
1053 1056 supported.add(REQUIREMENT_ZLIB)
1054 1057 supported.add(REQUIREMENT_NONE)
1055 1058 supported.add(REQUIREMENT_SHALLOW_FILES)
1056 1059 supported.add(repository.NARROW_REQUIREMENT)
1057 1060
1058 1061 def newreporequirements(orig, ui, createopts):
1059 1062 if createopts['backend'] != 'sqlite':
1060 1063 return orig(ui, createopts)
1061 1064
1062 1065 # This restriction can be lifted once we have more confidence.
1063 1066 if 'sharedrepo' in createopts:
1064 1067 raise error.Abort(_('shared repositories not supported with SQLite '
1065 1068 'store'))
1066 1069
1067 1070 # This filtering is out of an abundance of caution: we want to ensure
1068 1071 # we honor creation options and we do that by annotating exactly the
1069 1072 # creation options we recognize.
1070 1073 known = {
1071 1074 'narrowfiles',
1072 1075 'backend',
1073 1076 'shallowfilestore',
1074 1077 }
1075 1078
1076 1079 unsupported = set(createopts) - known
1077 1080 if unsupported:
1078 1081 raise error.Abort(_('SQLite store does not support repo creation '
1079 1082 'option: %s') % ', '.join(sorted(unsupported)))
1080 1083
1081 1084 # Since we're a hybrid store that still relies on revlogs, we fall back
1082 1085 # to using the revlogv1 backend's storage requirements and then add our
1083 1086 # own requirement.
1084 1087 createopts['backend'] = 'revlogv1'
1085 1088 requirements = orig(ui, createopts)
1086 1089 requirements.add(REQUIREMENT)
1087 1090
1088 1091 compression = ui.config('storage', 'sqlite.compression')
1089 1092
1090 1093 if compression == 'zstd' and not zstd:
1091 1094 raise error.Abort(_('storage.sqlite.compression set to "zstd" but '
1092 1095 'zstandard compression not available to this '
1093 1096 'Mercurial install'))
1094 1097
1095 1098 if compression == 'zstd':
1096 1099 requirements.add(REQUIREMENT_ZSTD)
1097 1100 elif compression == 'zlib':
1098 1101 requirements.add(REQUIREMENT_ZLIB)
1099 1102 elif compression == 'none':
1100 1103 requirements.add(REQUIREMENT_NONE)
1101 1104 else:
1102 1105 raise error.Abort(_('unknown compression engine defined in '
1103 1106 'storage.sqlite.compression: %s') % compression)
1104 1107
1105 1108 if createopts.get('shallowfilestore'):
1106 1109 requirements.add(REQUIREMENT_SHALLOW_FILES)
1107 1110
1108 1111 return requirements
1109 1112
1110 1113 @interfaceutil.implementer(repository.ilocalrepositoryfilestorage)
1111 1114 class sqlitefilestorage(object):
1112 1115 """Repository file storage backed by SQLite."""
1113 1116 def file(self, path):
1114 1117 if path[0] == b'/':
1115 1118 path = path[1:]
1116 1119
1117 1120 if REQUIREMENT_ZSTD in self.requirements:
1118 1121 compression = 'zstd'
1119 1122 elif REQUIREMENT_ZLIB in self.requirements:
1120 1123 compression = 'zlib'
1121 1124 elif REQUIREMENT_NONE in self.requirements:
1122 1125 compression = 'none'
1123 1126 else:
1124 1127 raise error.Abort(_('unable to determine what compression engine '
1125 1128 'to use for SQLite storage'))
1126 1129
1127 1130 return sqlitefilestore(self._dbconn, path, compression)
1128 1131
1129 1132 def makefilestorage(orig, requirements, features, **kwargs):
1130 1133 """Produce a type conforming to ``ilocalrepositoryfilestorage``."""
1131 1134 if REQUIREMENT in requirements:
1132 1135 if REQUIREMENT_SHALLOW_FILES in requirements:
1133 1136 features.add(repository.REPO_FEATURE_SHALLOW_FILE_STORAGE)
1134 1137
1135 1138 return sqlitefilestorage
1136 1139 else:
1137 1140 return orig(requirements=requirements, features=features, **kwargs)
1138 1141
1139 1142 def makemain(orig, ui, requirements, **kwargs):
1140 1143 if REQUIREMENT in requirements:
1141 1144 if REQUIREMENT_ZSTD in requirements and not zstd:
1142 1145 raise error.Abort(_('repository uses zstandard compression, which '
1143 1146 'is not available to this Mercurial install'))
1144 1147
1145 1148 return sqliterepository
1146 1149
1147 1150 return orig(requirements=requirements, **kwargs)
1148 1151
1149 1152 def verifierinit(orig, self, *args, **kwargs):
1150 1153 orig(self, *args, **kwargs)
1151 1154
1152 1155 # We don't care that files in the store don't align with what is
1153 1156 # advertised. So suppress these warnings.
1154 1157 self.warnorphanstorefiles = False
1155 1158
1156 1159 def extsetup(ui):
1157 1160 localrepo.featuresetupfuncs.add(featuresetup)
1158 1161 extensions.wrapfunction(localrepo, 'newreporequirements',
1159 1162 newreporequirements)
1160 1163 extensions.wrapfunction(localrepo, 'makefilestorage',
1161 1164 makefilestorage)
1162 1165 extensions.wrapfunction(localrepo, 'makemain',
1163 1166 makemain)
1164 1167 extensions.wrapfunction(verify.verifier, '__init__',
1165 1168 verifierinit)
1166 1169
1167 1170 def reposetup(ui, repo):
1168 1171 if isinstance(repo, sqliterepository):
1169 1172 repo._db = None
1170 1173
1171 1174 # TODO check for bundlerepository?