py3: make sure we pass sysstr in sqlite3.connect()...
Pulkit Goyal
r40446:3b782669 default
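The change converts the repository's byte-string database path to a native str before handing it to sqlite3.connect(), which is what the summary above calls passing a "sysstr". A minimal sketch of the idea, using the same encoding.strfromlocal() helper as the diff (the wrapper function name here is illustrative, not part of the change):

    import sqlite3

    from mercurial import encoding

    def opendb(bytespath):
        # Mercurial keeps filesystem paths as bytes; convert to a native
        # str at the sqlite3 boundary before opening the database.
        return sqlite3.connect(encoding.strfromlocal(bytespath))
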
@@ -1,1169 +1,1170 b''
1 1 # sqlitestore.py - Storage backend that uses SQLite
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """store repository data in SQLite (EXPERIMENTAL)
9 9
10 10 The sqlitestore extension enables the storage of repository data in SQLite.
11 11
12 12 This extension is HIGHLY EXPERIMENTAL. There are NO BACKWARDS COMPATIBILITY
13 13 GUARANTEES. This means that repositories created with this extension may
14 14 only be usable with the exact version of this extension/Mercurial that was
15 15 used. The extension attempts to enforce this in order to prevent repository
16 16 corruption.
17 17
18 18 In addition, several features are not yet supported or have known bugs:
19 19
20 20 * Only some data is stored in SQLite. Changeset, manifest, and other repository
21 21 data is not yet stored in SQLite.
22 22 * Transactions are not robust. If the process is aborted at the right time
23 23 during transaction close/rollback, the repository could be in an inconsistent
24 24 state. This problem will diminish once all repository data is tracked by
25 25 SQLite.
26 26 * Bundle repositories do not work (the ability to use e.g.
27 27 `hg -R <bundle-file> log` to automatically overlay a bundle on top of the
28 28 existing repository).
29 29 * Various other features don't work.
30 30
31 31 This extension should work for basic clone/pull, update, and commit workflows.
32 32 Some history rewriting operations may fail due to lack of support for bundle
33 33 repositories.
34 34
35 35 To use, activate the extension and set the ``storage.new-repo-backend`` config
36 36 option to ``sqlite`` to enable new repositories to use SQLite for storage.
37 37 """
38 38
39 39 # To run the test suite with repos using SQLite by default, execute the
40 40 # following:
41 41 #
42 42 # HGREPOFEATURES="sqlitestore" run-tests.py \
43 43 # --extra-config-opt extensions.sqlitestore= \
44 44 # --extra-config-opt storage.new-repo-backend=sqlite
45 45
46 46 from __future__ import absolute_import
47 47
48 48 import hashlib
49 49 import sqlite3
50 50 import struct
51 51 import threading
52 52 import zlib
53 53
54 54 from mercurial.i18n import _
55 55 from mercurial.node import (
56 56 nullid,
57 57 nullrev,
58 58 short,
59 59 )
60 60 from mercurial.thirdparty import (
61 61 attr,
62 62 )
63 63 from mercurial import (
64 64 ancestor,
65 65 dagop,
66 encoding,
66 67 error,
67 68 extensions,
68 69 localrepo,
69 70 mdiff,
70 71 pycompat,
71 72 registrar,
72 73 repository,
73 74 util,
74 75 verify,
75 76 )
76 77 from mercurial.utils import (
77 78 interfaceutil,
78 79 storageutil,
79 80 )
80 81
81 82 try:
82 83 from mercurial import zstd
83 84 zstd.__version__
84 85 except ImportError:
85 86 zstd = None
86 87
87 88 configtable = {}
88 89 configitem = registrar.configitem(configtable)
89 90
90 91 # experimental config: storage.sqlite.compression
91 92 configitem('storage', 'sqlite.compression',
92 93 default='zstd' if zstd else 'zlib')
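# Illustrative hgrc override for the compression engine; the values handled
# elsewhere in this file are 'zstd', 'zlib', and 'none':
#
#   [storage]
#   sqlite.compression = zlib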
93 94
94 95 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
95 96 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
96 97 # be specifying the version(s) of Mercurial they are tested with, or
97 98 # leave the attribute unspecified.
98 99 testedwith = 'ships-with-hg-core'
99 100
100 101 REQUIREMENT = b'exp-sqlite-001'
101 102 REQUIREMENT_ZSTD = b'exp-sqlite-comp-001=zstd'
102 103 REQUIREMENT_ZLIB = b'exp-sqlite-comp-001=zlib'
103 104 REQUIREMENT_NONE = b'exp-sqlite-comp-001=none'
104 105 REQUIREMENT_SHALLOW_FILES = b'exp-sqlite-shallow-files'
105 106
106 107 CURRENT_SCHEMA_VERSION = 1
107 108
108 109 COMPRESSION_NONE = 1
109 110 COMPRESSION_ZSTD = 2
110 111 COMPRESSION_ZLIB = 3
111 112
112 113 FLAG_CENSORED = 1
113 114 FLAG_MISSING_P1 = 2
114 115 FLAG_MISSING_P2 = 4
115 116
116 117 CREATE_SCHEMA = [
117 118 # Deltas are stored as content-indexed blobs.
118 119 # compression column holds COMPRESSION_* constant for how the
119 120 # delta is encoded.
120 121
121 122 r'CREATE TABLE delta ('
122 123 r' id INTEGER PRIMARY KEY, '
123 124 r' compression INTEGER NOT NULL, '
124 125 r' hash BLOB UNIQUE ON CONFLICT ABORT, '
125 126 r' delta BLOB NOT NULL '
126 127 r')',
127 128
128 129 # Tracked paths are denormalized to integers to avoid redundant
129 130 # storage of the path name.
130 131 r'CREATE TABLE filepath ('
131 132 r' id INTEGER PRIMARY KEY, '
132 133 r' path BLOB NOT NULL '
133 134 r')',
134 135
135 136 r'CREATE UNIQUE INDEX filepath_path '
136 137 r' ON filepath (path)',
137 138
138 139 # We have a single table for all file revision data.
139 140 # Each file revision is uniquely described by a (path, rev) and
140 141 # (path, node).
141 142 #
142 143 # Revision data is stored as a pointer to the delta producing this
143 144 # revision and the file revision whose delta should be applied before
144 145 # that one. One can reconstruct the delta chain by recursively following
145 146 # the delta base revision pointers until one encounters NULL.
146 147 #
147 148 # flags column holds bitwise integer flags controlling storage options.
148 149 # These flags are defined by the FLAG_* constants.
149 150 r'CREATE TABLE fileindex ('
150 151 r' id INTEGER PRIMARY KEY, '
151 152 r' pathid INTEGER REFERENCES filepath(id), '
152 153 r' revnum INTEGER NOT NULL, '
153 154 r' p1rev INTEGER NOT NULL, '
154 155 r' p2rev INTEGER NOT NULL, '
155 156 r' linkrev INTEGER NOT NULL, '
156 157 r' flags INTEGER NOT NULL, '
157 158 r' deltaid INTEGER REFERENCES delta(id), '
158 159 r' deltabaseid INTEGER REFERENCES fileindex(id), '
159 160 r' node BLOB NOT NULL '
160 161 r')',
161 162
162 163 r'CREATE UNIQUE INDEX fileindex_pathrevnum '
163 164 r' ON fileindex (pathid, revnum)',
164 165
165 166 r'CREATE UNIQUE INDEX fileindex_pathnode '
166 167 r' ON fileindex (pathid, node)',
167 168
168 169 # Provide a view over all file data for convenience.
169 170 r'CREATE VIEW filedata AS '
170 171 r'SELECT '
171 172 r' fileindex.id AS id, '
172 173 r' filepath.id AS pathid, '
173 174 r' filepath.path AS path, '
174 175 r' fileindex.revnum AS revnum, '
175 176 r' fileindex.node AS node, '
176 177 r' fileindex.p1rev AS p1rev, '
177 178 r' fileindex.p2rev AS p2rev, '
178 179 r' fileindex.linkrev AS linkrev, '
179 180 r' fileindex.flags AS flags, '
180 181 r' fileindex.deltaid AS deltaid, '
181 182 r' fileindex.deltabaseid AS deltabaseid '
182 183 r'FROM filepath, fileindex '
183 184 r'WHERE fileindex.pathid=filepath.id',
184 185
185 186 r'PRAGMA user_version=%d' % CURRENT_SCHEMA_VERSION,
186 187 ]
187 188
188 189 def resolvedeltachain(db, pathid, node, revisioncache,
189 190 stoprids, zstddctx=None):
190 191 """Resolve a delta chain for a file node."""
191 192
192 193 # TODO the "not in ({stops})" here is possibly slowing down the query
193 194 # because it needs to perform the lookup on every recursive invocation.
194 195 # This could possibly be faster if we created a temporary query with
195 196 # baseid "poisoned" to null and limited the recursive filter to
196 197 # "is not null".
197 198 res = db.execute(
198 199 r'WITH RECURSIVE '
199 200 r' deltachain(deltaid, baseid) AS ('
200 201 r' SELECT deltaid, deltabaseid FROM fileindex '
201 202 r' WHERE pathid=? AND node=? '
202 203 r' UNION ALL '
203 204 r' SELECT fileindex.deltaid, deltabaseid '
204 205 r' FROM fileindex, deltachain '
205 206 r' WHERE '
206 207 r' fileindex.id=deltachain.baseid '
207 208 r' AND deltachain.baseid IS NOT NULL '
208 209 r' AND fileindex.id NOT IN ({stops}) '
209 210 r' ) '
210 211 r'SELECT deltachain.baseid, compression, delta '
211 212 r'FROM deltachain, delta '
212 213 r'WHERE delta.id=deltachain.deltaid'.format(
213 214 stops=r','.join([r'?'] * len(stoprids))),
214 215 tuple([pathid, node] + list(stoprids.keys())))
215 216
216 217 deltas = []
217 218 lastdeltabaseid = None
218 219
219 220 for deltabaseid, compression, delta in res:
220 221 lastdeltabaseid = deltabaseid
221 222
222 223 if compression == COMPRESSION_ZSTD:
223 224 delta = zstddctx.decompress(delta)
224 225 elif compression == COMPRESSION_NONE:
225 226 delta = delta
226 227 elif compression == COMPRESSION_ZLIB:
227 228 delta = zlib.decompress(delta)
228 229 else:
229 230 raise SQLiteStoreError('unhandled compression type: %d' %
230 231 compression)
231 232
232 233 deltas.append(delta)
233 234
234 235 if lastdeltabaseid in stoprids:
235 236 basetext = revisioncache[stoprids[lastdeltabaseid]]
236 237 else:
237 238 basetext = deltas.pop()
238 239
239 240 deltas.reverse()
240 241 fulltext = mdiff.patches(basetext, deltas)
241 242
242 243 # SQLite returns buffer instances for blob columns on Python 2. This
243 244 # type can propagate through the delta application layer. Because
244 245 # downstream callers assume revisions are bytes, cast as needed.
245 246 if not isinstance(fulltext, bytes):
246 247 fulltext = bytes(fulltext)
247 248
248 249 return fulltext
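# Illustrative sketch (not part of the query logic above): when no cached
# fulltext stops the walk, the chain comes back newest-delta-first with the
# chain base stored as a fulltext, and reconstruction is plain patch
# application oldest-first. Variable names below are hypothetical:
#
#   rows = [delta_rev2, delta_rev1, base_fulltext]  # query order, newest first
#   base = rows.pop()                               # chain base is a fulltext
#   rows.reverse()                                  # oldest delta applied first
#   fulltext = mdiff.patches(base, rows)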
249 250
250 251 def insertdelta(db, compression, hash, delta):
251 252 try:
252 253 return db.execute(
253 254 r'INSERT INTO delta (compression, hash, delta) '
254 255 r'VALUES (?, ?, ?)',
255 256 (compression, hash, delta)).lastrowid
256 257 except sqlite3.IntegrityError:
257 258 return db.execute(
258 259 r'SELECT id FROM delta WHERE hash=?',
259 260 (hash,)).fetchone()[0]
260 261
261 262 class SQLiteStoreError(error.StorageError):
262 263 pass
263 264
264 265 @attr.s
265 266 class revisionentry(object):
266 267 rid = attr.ib()
267 268 rev = attr.ib()
268 269 node = attr.ib()
269 270 p1rev = attr.ib()
270 271 p2rev = attr.ib()
271 272 p1node = attr.ib()
272 273 p2node = attr.ib()
273 274 linkrev = attr.ib()
274 275 flags = attr.ib()
275 276
276 277 @interfaceutil.implementer(repository.irevisiondelta)
277 278 @attr.s(slots=True)
278 279 class sqliterevisiondelta(object):
279 280 node = attr.ib()
280 281 p1node = attr.ib()
281 282 p2node = attr.ib()
282 283 basenode = attr.ib()
283 284 flags = attr.ib()
284 285 baserevisionsize = attr.ib()
285 286 revision = attr.ib()
286 287 delta = attr.ib()
287 288 linknode = attr.ib(default=None)
288 289
289 290 @interfaceutil.implementer(repository.iverifyproblem)
290 291 @attr.s(frozen=True)
291 292 class sqliteproblem(object):
292 293 warning = attr.ib(default=None)
293 294 error = attr.ib(default=None)
294 295 node = attr.ib(default=None)
295 296
296 297 @interfaceutil.implementer(repository.ifilestorage)
297 298 class sqlitefilestore(object):
298 299 """Implements storage for an individual tracked path."""
299 300
300 301 def __init__(self, db, path, compression):
301 302 self._db = db
302 303 self._path = path
303 304
304 305 self._pathid = None
305 306
306 307 # revnum -> node
307 308 self._revtonode = {}
308 309 # node -> revnum
309 310 self._nodetorev = {}
310 311 # node -> data structure
311 312 self._revisions = {}
312 313
313 314 self._revisioncache = util.lrucachedict(10)
314 315
315 316 self._compengine = compression
316 317
317 318 if compression == 'zstd':
318 319 self._cctx = zstd.ZstdCompressor(level=3)
319 320 self._dctx = zstd.ZstdDecompressor()
320 321 else:
321 322 self._cctx = None
322 323 self._dctx = None
323 324
324 325 self._refreshindex()
325 326
326 327 def _refreshindex(self):
327 328 self._revtonode = {}
328 329 self._nodetorev = {}
329 330 self._revisions = {}
330 331
331 332 res = list(self._db.execute(
332 333 r'SELECT id FROM filepath WHERE path=?', (self._path,)))
333 334
334 335 if not res:
335 336 self._pathid = None
336 337 return
337 338
338 339 self._pathid = res[0][0]
339 340
340 341 res = self._db.execute(
341 342 r'SELECT id, revnum, node, p1rev, p2rev, linkrev, flags '
342 343 r'FROM fileindex '
343 344 r'WHERE pathid=? '
344 345 r'ORDER BY revnum ASC',
345 346 (self._pathid,))
346 347
347 348 for i, row in enumerate(res):
348 349 rid, rev, node, p1rev, p2rev, linkrev, flags = row
349 350
350 351 if i != rev:
351 352 raise SQLiteStoreError(_('sqlite database has inconsistent '
352 353 'revision numbers'))
353 354
354 355 if p1rev == nullrev:
355 356 p1node = nullid
356 357 else:
357 358 p1node = self._revtonode[p1rev]
358 359
359 360 if p2rev == nullrev:
360 361 p2node = nullid
361 362 else:
362 363 p2node = self._revtonode[p2rev]
363 364
364 365 entry = revisionentry(
365 366 rid=rid,
366 367 rev=rev,
367 368 node=node,
368 369 p1rev=p1rev,
369 370 p2rev=p2rev,
370 371 p1node=p1node,
371 372 p2node=p2node,
372 373 linkrev=linkrev,
373 374 flags=flags)
374 375
375 376 self._revtonode[rev] = node
376 377 self._nodetorev[node] = rev
377 378 self._revisions[node] = entry
378 379
379 380 # Start of ifileindex interface.
380 381
381 382 def __len__(self):
382 383 return len(self._revisions)
383 384
384 385 def __iter__(self):
385 386 return iter(pycompat.xrange(len(self._revisions)))
386 387
387 388 def hasnode(self, node):
388 389 if node == nullid:
389 390 return False
390 391
391 392 return node in self._nodetorev
392 393
393 394 def revs(self, start=0, stop=None):
394 395 return storageutil.iterrevs(len(self._revisions), start=start,
395 396 stop=stop)
396 397
397 398 def parents(self, node):
398 399 if node == nullid:
399 400 return nullid, nullid
400 401
401 402 if node not in self._revisions:
402 403 raise error.LookupError(node, self._path, _('no node'))
403 404
404 405 entry = self._revisions[node]
405 406 return entry.p1node, entry.p2node
406 407
407 408 def parentrevs(self, rev):
408 409 if rev == nullrev:
409 410 return nullrev, nullrev
410 411
411 412 if rev not in self._revtonode:
412 413 raise IndexError(rev)
413 414
414 415 entry = self._revisions[self._revtonode[rev]]
415 416 return entry.p1rev, entry.p2rev
416 417
417 418 def rev(self, node):
418 419 if node == nullid:
419 420 return nullrev
420 421
421 422 if node not in self._nodetorev:
422 423 raise error.LookupError(node, self._path, _('no node'))
423 424
424 425 return self._nodetorev[node]
425 426
426 427 def node(self, rev):
427 428 if rev == nullrev:
428 429 return nullid
429 430
430 431 if rev not in self._revtonode:
431 432 raise IndexError(rev)
432 433
433 434 return self._revtonode[rev]
434 435
435 436 def lookup(self, node):
436 437 return storageutil.fileidlookup(self, node, self._path)
437 438
438 439 def linkrev(self, rev):
439 440 if rev == nullrev:
440 441 return nullrev
441 442
442 443 if rev not in self._revtonode:
443 444 raise IndexError(rev)
444 445
445 446 entry = self._revisions[self._revtonode[rev]]
446 447 return entry.linkrev
447 448
448 449 def iscensored(self, rev):
449 450 if rev == nullrev:
450 451 return False
451 452
452 453 if rev not in self._revtonode:
453 454 raise IndexError(rev)
454 455
455 456 return self._revisions[self._revtonode[rev]].flags & FLAG_CENSORED
456 457
457 458 def commonancestorsheads(self, node1, node2):
458 459 rev1 = self.rev(node1)
459 460 rev2 = self.rev(node2)
460 461
461 462 ancestors = ancestor.commonancestorsheads(self.parentrevs, rev1, rev2)
462 463 return pycompat.maplist(self.node, ancestors)
463 464
464 465 def descendants(self, revs):
465 466 # TODO we could implement this using a recursive SQL query, which
466 467 # might be faster.
467 468 return dagop.descendantrevs(revs, self.revs, self.parentrevs)
468 469
469 470 def heads(self, start=None, stop=None):
470 471 if start is None and stop is None:
471 472 if not len(self):
472 473 return [nullid]
473 474
474 475 startrev = self.rev(start) if start is not None else nullrev
475 476 stoprevs = {self.rev(n) for n in stop or []}
476 477
477 478 revs = dagop.headrevssubset(self.revs, self.parentrevs,
478 479 startrev=startrev, stoprevs=stoprevs)
479 480
480 481 return [self.node(rev) for rev in revs]
481 482
482 483 def children(self, node):
483 484 rev = self.rev(node)
484 485
485 486 res = self._db.execute(
486 487 r'SELECT'
487 488 r' node '
488 489 r' FROM filedata '
489 490 r' WHERE path=? AND (p1rev=? OR p2rev=?) '
490 491 r' ORDER BY revnum ASC',
491 492 (self._path, rev, rev))
492 493
493 494 return [row[0] for row in res]
494 495
495 496 # End of ifileindex interface.
496 497
497 498 # Start of ifiledata interface.
498 499
499 500 def size(self, rev):
500 501 if rev == nullrev:
501 502 return 0
502 503
503 504 if rev not in self._revtonode:
504 505 raise IndexError(rev)
505 506
506 507 node = self._revtonode[rev]
507 508
508 509 if self.renamed(node):
509 510 return len(self.read(node))
510 511
511 512 return len(self.revision(node))
512 513
513 514 def revision(self, node, raw=False, _verifyhash=True):
514 515 if node in (nullid, nullrev):
515 516 return b''
516 517
517 518 if isinstance(node, int):
518 519 node = self.node(node)
519 520
520 521 if node not in self._nodetorev:
521 522 raise error.LookupError(node, self._path, _('no node'))
522 523
523 524 if node in self._revisioncache:
524 525 return self._revisioncache[node]
525 526
526 527 # Because we have a fulltext revision cache, we are able to
527 528 # short-circuit delta chain traversal and decompression as soon as
528 529 # we encounter a revision in the cache.
529 530
530 531 stoprids = {self._revisions[n].rid: n
531 532 for n in self._revisioncache}
532 533
533 534 if not stoprids:
534 535 stoprids[-1] = None
535 536
536 537 fulltext = resolvedeltachain(self._db, self._pathid, node,
537 538 self._revisioncache, stoprids,
538 539 zstddctx=self._dctx)
539 540
540 541 # Don't verify hashes if parent nodes were rewritten, as the hash
541 542 # wouldn't verify.
542 543 if self._revisions[node].flags & (FLAG_MISSING_P1 | FLAG_MISSING_P2):
543 544 _verifyhash = False
544 545
545 546 if _verifyhash:
546 547 self._checkhash(fulltext, node)
547 548 self._revisioncache[node] = fulltext
548 549
549 550 return fulltext
550 551
551 552 def read(self, node):
552 553 return storageutil.filtermetadata(self.revision(node))
553 554
554 555 def renamed(self, node):
555 556 return storageutil.filerevisioncopied(self, node)
556 557
557 558 def cmp(self, node, fulltext):
558 559 return not storageutil.filedataequivalent(self, node, fulltext)
559 560
560 561 def emitrevisions(self, nodes, nodesorder=None, revisiondata=False,
561 562 assumehaveparentrevisions=False, deltaprevious=False):
562 563 if nodesorder not in ('nodes', 'storage', None):
563 564 raise error.ProgrammingError('unhandled value for nodesorder: %s' %
564 565 nodesorder)
565 566
566 567 nodes = [n for n in nodes if n != nullid]
567 568
568 569 if not nodes:
569 570 return
570 571
571 572 # TODO perform in a single query.
572 573 res = self._db.execute(
573 574 r'SELECT revnum, deltaid FROM fileindex '
574 575 r'WHERE pathid=? '
575 576 r' AND node in (%s)' % (r','.join([r'?'] * len(nodes))),
576 577 tuple([self._pathid] + nodes))
577 578
578 579 deltabases = {}
579 580
580 581 for rev, deltaid in res:
581 582 res = self._db.execute(
582 583 r'SELECT revnum from fileindex WHERE pathid=? AND deltaid=?',
583 584 (self._pathid, deltaid))
584 585 deltabases[rev] = res.fetchone()[0]
585 586
586 587 # TODO define revdifffn so we can use delta from storage.
587 588 for delta in storageutil.emitrevisions(
588 589 self, nodes, nodesorder, sqliterevisiondelta,
589 590 deltaparentfn=deltabases.__getitem__,
590 591 revisiondata=revisiondata,
591 592 assumehaveparentrevisions=assumehaveparentrevisions,
592 593 deltaprevious=deltaprevious):
593 594
594 595 yield delta
595 596
596 597 # End of ifiledata interface.
597 598
598 599 # Start of ifilemutation interface.
599 600
600 601 def add(self, filedata, meta, transaction, linkrev, p1, p2):
601 602 if meta or filedata.startswith(b'\x01\n'):
602 603 filedata = storageutil.packmeta(meta, filedata)
603 604
604 605 return self.addrevision(filedata, transaction, linkrev, p1, p2)
605 606
606 607 def addrevision(self, revisiondata, transaction, linkrev, p1, p2, node=None,
607 608 flags=0, cachedelta=None):
608 609 if flags:
609 610 raise SQLiteStoreError(_('flags not supported on revisions'))
610 611
611 612 validatehash = node is not None
612 613 node = node or storageutil.hashrevisionsha1(revisiondata, p1, p2)
613 614
614 615 if validatehash:
615 616 self._checkhash(revisiondata, node, p1, p2)
616 617
617 618 if node in self._nodetorev:
618 619 return node
619 620
620 621 node = self._addrawrevision(node, revisiondata, transaction, linkrev,
621 622 p1, p2)
622 623
623 624 self._revisioncache[node] = revisiondata
624 625 return node
625 626
626 627 def addgroup(self, deltas, linkmapper, transaction, addrevisioncb=None,
627 628 maybemissingparents=False):
628 629 nodes = []
629 630
630 631 for node, p1, p2, linknode, deltabase, delta, wireflags in deltas:
631 632 storeflags = 0
632 633
633 634 if wireflags & repository.REVISION_FLAG_CENSORED:
634 635 storeflags |= FLAG_CENSORED
635 636
636 637 if wireflags & ~repository.REVISION_FLAG_CENSORED:
637 638 raise SQLiteStoreError('unhandled revision flag')
638 639
639 640 if maybemissingparents:
640 641 if p1 != nullid and not self.hasnode(p1):
641 642 p1 = nullid
642 643 storeflags |= FLAG_MISSING_P1
643 644
644 645 if p2 != nullid and not self.hasnode(p2):
645 646 p2 = nullid
646 647 storeflags |= FLAG_MISSING_P2
647 648
648 649 baserev = self.rev(deltabase)
649 650
650 651 # If base is censored, delta must be full replacement in a single
651 652 # patch operation.
652 653 if baserev != nullrev and self.iscensored(baserev):
653 654 hlen = struct.calcsize('>lll')
654 655 oldlen = len(self.revision(deltabase, raw=True,
655 656 _verifyhash=False))
656 657 newlen = len(delta) - hlen
657 658
658 659 if delta[:hlen] != mdiff.replacediffheader(oldlen, newlen):
659 660 raise error.CensoredBaseError(self._path,
660 661 deltabase)
661 662
662 663 if (not (storeflags & FLAG_CENSORED)
663 664 and storageutil.deltaiscensored(
664 665 delta, baserev, lambda x: len(self.revision(x, raw=True)))):
665 666 storeflags |= FLAG_CENSORED
666 667
667 668 linkrev = linkmapper(linknode)
668 669
669 670 nodes.append(node)
670 671
671 672 if node in self._revisions:
672 673 # Possibly reset parents to make them proper.
673 674 entry = self._revisions[node]
674 675
675 676 if entry.flags & FLAG_MISSING_P1 and p1 != nullid:
676 677 entry.p1node = p1
677 678 entry.p1rev = self._nodetorev[p1]
678 679 entry.flags &= ~FLAG_MISSING_P1
679 680
680 681 self._db.execute(
681 682 r'UPDATE fileindex SET p1rev=?, flags=? '
682 683 r'WHERE id=?',
683 684 (self._nodetorev[p1], entry.flags, entry.rid))
684 685
685 686 if entry.flags & FLAG_MISSING_P2 and p2 != nullid:
686 687 entry.p2node = p2
687 688 entry.p2rev = self._nodetorev[p2]
688 689 entry.flags &= ~FLAG_MISSING_P2
689 690
690 691 self._db.execute(
691 692 r'UPDATE fileindex SET p2rev=?, flags=? '
692 693 r'WHERE id=?',
693 694 (self._nodetorev[p2], entry.flags, entry.rid))
694 695
695 696 continue
696 697
697 698 if deltabase == nullid:
698 699 text = mdiff.patch(b'', delta)
699 700 storedelta = None
700 701 else:
701 702 text = None
702 703 storedelta = (deltabase, delta)
703 704
704 705 self._addrawrevision(node, text, transaction, linkrev, p1, p2,
705 706 storedelta=storedelta, flags=storeflags)
706 707
707 708 if addrevisioncb:
708 709 addrevisioncb(self, node)
709 710
710 711 return nodes
711 712
712 713 def censorrevision(self, tr, censornode, tombstone=b''):
713 714 tombstone = storageutil.packmeta({b'censored': tombstone}, b'')
714 715
715 716 # This restriction is cargo culted from revlogs and makes no sense for
716 717 # SQLite, since columns can be resized at will.
717 718 if len(tombstone) > len(self.revision(censornode, raw=True)):
718 719 raise error.Abort(_('censor tombstone must be no longer than '
719 720 'censored data'))
720 721
721 722 # We need to replace the censored revision's data with the tombstone.
722 723 # But replacing that data will have implications for delta chains that
723 724 # reference it.
724 725 #
725 726 # While "better," more complex strategies are possible, we do something
726 727 # simple: we find delta chain children of the censored revision and we
727 728 # replace those incremental deltas with fulltexts of their corresponding
728 729 # revision. Then we delete the now-unreferenced delta and original
729 730 # revision and insert a replacement.
730 731
731 732 # Find the delta to be censored.
732 733 censoreddeltaid = self._db.execute(
733 734 r'SELECT deltaid FROM fileindex WHERE id=?',
734 735 (self._revisions[censornode].rid,)).fetchone()[0]
735 736
736 737 # Find all its delta chain children.
737 738 # TODO once we support storing deltas for !files, we'll need to look
738 739 # for those delta chains too.
739 740 rows = list(self._db.execute(
740 741 r'SELECT id, pathid, node FROM fileindex '
741 742 r'WHERE deltabaseid=? OR deltaid=?',
742 743 (censoreddeltaid, censoreddeltaid)))
743 744
744 745 for row in rows:
745 746 rid, pathid, node = row
746 747
747 748 fulltext = resolvedeltachain(self._db, pathid, node, {}, {-1: None},
748 749 zstddctx=self._dctx)
749 750
750 751 deltahash = hashlib.sha1(fulltext).digest()
751 752
752 753 if self._compengine == 'zstd':
753 754 deltablob = self._cctx.compress(fulltext)
754 755 compression = COMPRESSION_ZSTD
755 756 elif self._compengine == 'zlib':
756 757 deltablob = zlib.compress(fulltext)
757 758 compression = COMPRESSION_ZLIB
758 759 elif self._compengine == 'none':
759 760 deltablob = fulltext
760 761 compression = COMPRESSION_NONE
761 762 else:
762 763 raise error.ProgrammingError('unhandled compression engine: %s'
763 764 % self._compengine)
764 765
765 766 if len(deltablob) >= len(fulltext):
766 767 deltablob = fulltext
767 768 compression = COMPRESSION_NONE
768 769
769 770 deltaid = insertdelta(self._db, compression, deltahash, deltablob)
770 771
771 772 self._db.execute(
772 773 r'UPDATE fileindex SET deltaid=?, deltabaseid=NULL '
773 774 r'WHERE id=?', (deltaid, rid))
774 775
775 776 # Now create the tombstone delta and replace the delta on the censored
776 777 # node.
777 778 deltahash = hashlib.sha1(tombstone).digest()
778 779 tombstonedeltaid = insertdelta(self._db, COMPRESSION_NONE,
779 780 deltahash, tombstone)
780 781
781 782 flags = self._revisions[censornode].flags
782 783 flags |= FLAG_CENSORED
783 784
784 785 self._db.execute(
785 786 r'UPDATE fileindex SET flags=?, deltaid=?, deltabaseid=NULL '
786 787 r'WHERE pathid=? AND node=?',
787 788 (flags, tombstonedeltaid, self._pathid, censornode))
788 789
789 790 self._db.execute(
790 791 r'DELETE FROM delta WHERE id=?', (censoreddeltaid,))
791 792
792 793 self._refreshindex()
793 794 self._revisioncache.clear()
794 795
795 796 def getstrippoint(self, minlink):
796 797 return storageutil.resolvestripinfo(minlink, len(self) - 1,
797 798 [self.rev(n) for n in self.heads()],
798 799 self.linkrev,
799 800 self.parentrevs)
800 801
801 802 def strip(self, minlink, transaction):
802 803 if not len(self):
803 804 return
804 805
805 806 rev, _ignored = self.getstrippoint(minlink)
806 807
807 808 if rev == len(self):
808 809 return
809 810
810 811 for rev in self.revs(rev):
811 812 self._db.execute(
812 813 r'DELETE FROM fileindex WHERE pathid=? AND node=?',
813 814 (self._pathid, self.node(rev)))
814 815
815 816 # TODO how should we garbage collect data in delta table?
816 817
817 818 self._refreshindex()
818 819
819 820 # End of ifilemutation interface.
820 821
821 822 # Start of ifilestorage interface.
822 823
823 824 def files(self):
824 825 return []
825 826
826 827 def storageinfo(self, exclusivefiles=False, sharedfiles=False,
827 828 revisionscount=False, trackedsize=False,
828 829 storedsize=False):
829 830 d = {}
830 831
831 832 if exclusivefiles:
832 833 d['exclusivefiles'] = []
833 834
834 835 if sharedfiles:
835 836 # TODO list sqlite file(s) here.
836 837 d['sharedfiles'] = []
837 838
838 839 if revisionscount:
839 840 d['revisionscount'] = len(self)
840 841
841 842 if trackedsize:
842 843 d['trackedsize'] = sum(len(self.revision(node))
843 844 for node in self._nodetorev)
844 845
845 846 if storedsize:
846 847 # TODO implement this?
847 848 d['storedsize'] = None
848 849
849 850 return d
850 851
851 852 def verifyintegrity(self, state):
852 853 state['skipread'] = set()
853 854
854 855 for rev in self:
855 856 node = self.node(rev)
856 857
857 858 try:
858 859 self.revision(node)
859 860 except Exception as e:
860 861 yield sqliteproblem(
861 862 error=_('unpacking %s: %s') % (short(node), e),
862 863 node=node)
863 864
864 865 state['skipread'].add(node)
865 866
866 867 # End of ifilestorage interface.
867 868
868 869 def _checkhash(self, fulltext, node, p1=None, p2=None):
869 870 if p1 is None and p2 is None:
870 871 p1, p2 = self.parents(node)
871 872
872 873 if node == storageutil.hashrevisionsha1(fulltext, p1, p2):
873 874 return
874 875
875 876 try:
876 877 del self._revisioncache[node]
877 878 except KeyError:
878 879 pass
879 880
880 881 if storageutil.iscensoredtext(fulltext):
881 882 raise error.CensoredNodeError(self._path, node, fulltext)
882 883
883 884 raise SQLiteStoreError(_('integrity check failed on %s') %
884 885 self._path)
885 886
886 887 def _addrawrevision(self, node, revisiondata, transaction, linkrev,
887 888 p1, p2, storedelta=None, flags=0):
888 889 if self._pathid is None:
889 890 res = self._db.execute(
890 891 r'INSERT INTO filepath (path) VALUES (?)', (self._path,))
891 892 self._pathid = res.lastrowid
892 893
893 894 # For simplicity, always store a delta against p1.
894 895 # TODO we need a lot more logic here to make behavior reasonable.
895 896
896 897 if storedelta:
897 898 deltabase, delta = storedelta
898 899
899 900 if isinstance(deltabase, int):
900 901 deltabase = self.node(deltabase)
901 902
902 903 else:
903 904 assert revisiondata is not None
904 905 deltabase = p1
905 906
906 907 if deltabase == nullid:
907 908 delta = revisiondata
908 909 else:
909 910 delta = mdiff.textdiff(self.revision(self.rev(deltabase)),
910 911 revisiondata)
911 912
912 913 # File index stores a pointer to its delta and the parent delta.
913 914 # The parent delta is stored via a pointer to the fileindex PK.
914 915 if deltabase == nullid:
915 916 baseid = None
916 917 else:
917 918 baseid = self._revisions[deltabase].rid
918 919
919 920 # Deltas are stored with a hash of their content. This allows
920 921 # us to de-duplicate. The table is configured to ignore conflicts
921 922 # and it is faster to just insert and silently noop than to look
922 923 # first.
923 924 deltahash = hashlib.sha1(delta).digest()
924 925
925 926 if self._compengine == 'zstd':
926 927 deltablob = self._cctx.compress(delta)
927 928 compression = COMPRESSION_ZSTD
928 929 elif self._compengine == 'zlib':
929 930 deltablob = zlib.compress(delta)
930 931 compression = COMPRESSION_ZLIB
931 932 elif self._compengine == 'none':
932 933 deltablob = delta
933 934 compression = COMPRESSION_NONE
934 935 else:
935 936 raise error.ProgrammingError('unhandled compression engine: %s' %
936 937 self._compengine)
937 938
938 939 # Don't store compressed data if it isn't practical.
939 940 if len(deltablob) >= len(delta):
940 941 deltablob = delta
941 942 compression = COMPRESSION_NONE
942 943
943 944 deltaid = insertdelta(self._db, compression, deltahash, deltablob)
944 945
945 946 rev = len(self)
946 947
947 948 if p1 == nullid:
948 949 p1rev = nullrev
949 950 else:
950 951 p1rev = self._nodetorev[p1]
951 952
952 953 if p2 == nullid:
953 954 p2rev = nullrev
954 955 else:
955 956 p2rev = self._nodetorev[p2]
956 957
957 958 rid = self._db.execute(
958 959 r'INSERT INTO fileindex ('
959 960 r' pathid, revnum, node, p1rev, p2rev, linkrev, flags, '
960 961 r' deltaid, deltabaseid) '
961 962 r' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
962 963 (self._pathid, rev, node, p1rev, p2rev, linkrev, flags,
963 964 deltaid, baseid)
964 965 ).lastrowid
965 966
966 967 entry = revisionentry(
967 968 rid=rid,
968 969 rev=rev,
969 970 node=node,
970 971 p1rev=p1rev,
971 972 p2rev=p2rev,
972 973 p1node=p1,
973 974 p2node=p2,
974 975 linkrev=linkrev,
975 976 flags=flags)
976 977
977 978 self._nodetorev[node] = rev
978 979 self._revtonode[rev] = node
979 980 self._revisions[node] = entry
980 981
981 982 return node
982 983
983 984 class sqliterepository(localrepo.localrepository):
984 985 def cancopy(self):
985 986 return False
986 987
987 988 def transaction(self, *args, **kwargs):
988 989 current = self.currenttransaction()
989 990
990 991 tr = super(sqliterepository, self).transaction(*args, **kwargs)
991 992
992 993 if current:
993 994 return tr
994 995
995 996 self._dbconn.execute(r'BEGIN TRANSACTION')
996 997
997 998 def committransaction(_):
998 999 self._dbconn.commit()
999 1000
1000 1001 tr.addfinalize('sqlitestore', committransaction)
1001 1002
1002 1003 return tr
1003 1004
1004 1005 @property
1005 1006 def _dbconn(self):
1006 1007 # SQLite connections can only be used on the thread that created
1007 1008 # them. In most cases, this "just works." However, hgweb uses
1008 1009 # multiple threads.
1009 1010 tid = threading.current_thread().ident
1010 1011
1011 1012 if self._db:
1012 1013 if self._db[0] == tid:
1013 1014 return self._db[1]
1014 1015
1015 1016 db = makedb(self.svfs.join('db.sqlite'))
1016 1017 self._db = (tid, db)
1017 1018
1018 1019 return db
1019 1020
1020 1021 def makedb(path):
1021 1022 """Construct a database handle for a database at path."""
1022 1023
1023 db = sqlite3.connect(path)
1024 db = sqlite3.connect(encoding.strfromlocal(path))
1024 1025 db.text_factory = bytes
1025 1026
1026 1027 res = db.execute(r'PRAGMA user_version').fetchone()[0]
1027 1028
1028 1029 # New database.
1029 1030 if res == 0:
1030 1031 for statement in CREATE_SCHEMA:
1031 1032 db.execute(statement)
1032 1033
1033 1034 db.commit()
1034 1035
1035 1036 elif res == CURRENT_SCHEMA_VERSION:
1036 1037 pass
1037 1038
1038 1039 else:
1039 1040 raise error.Abort(_('sqlite database has unrecognized version'))
1040 1041
1041 1042 db.execute(r'PRAGMA journal_mode=WAL')
1042 1043
1043 1044 return db
1044 1045
1045 1046 def featuresetup(ui, supported):
1046 1047 supported.add(REQUIREMENT)
1047 1048
1048 1049 if zstd:
1049 1050 supported.add(REQUIREMENT_ZSTD)
1050 1051
1051 1052 supported.add(REQUIREMENT_ZLIB)
1052 1053 supported.add(REQUIREMENT_NONE)
1053 1054 supported.add(REQUIREMENT_SHALLOW_FILES)
1054 1055 supported.add(repository.NARROW_REQUIREMENT)
1055 1056
1056 1057 def newreporequirements(orig, ui, createopts):
1057 1058 if createopts['backend'] != 'sqlite':
1058 1059 return orig(ui, createopts)
1059 1060
1060 1061 # This restriction can be lifted once we have more confidence.
1061 1062 if 'sharedrepo' in createopts:
1062 1063 raise error.Abort(_('shared repositories not supported with SQLite '
1063 1064 'store'))
1064 1065
1065 1066 # This filtering is out of an abundance of caution: we want to ensure
1066 1067 # we honor creation options and we do that by annotating exactly the
1067 1068 # creation options we recognize.
1068 1069 known = {
1069 1070 'narrowfiles',
1070 1071 'backend',
1071 1072 'shallowfilestore',
1072 1073 }
1073 1074
1074 1075 unsupported = set(createopts) - known
1075 1076 if unsupported:
1076 1077 raise error.Abort(_('SQLite store does not support repo creation '
1077 1078 'option: %s') % ', '.join(sorted(unsupported)))
1078 1079
1079 1080 # Since we're a hybrid store that still relies on revlogs, we fall back
1080 1081 # to using the revlogv1 backend's storage requirements then adding our
1081 1082 # own requirement.
1082 1083 createopts['backend'] = 'revlogv1'
1083 1084 requirements = orig(ui, createopts)
1084 1085 requirements.add(REQUIREMENT)
1085 1086
1086 1087 compression = ui.config('storage', 'sqlite.compression')
1087 1088
1088 1089 if compression == 'zstd' and not zstd:
1089 1090 raise error.Abort(_('storage.sqlite.compression set to "zstd" but '
1090 1091 'zstandard compression not available to this '
1091 1092 'Mercurial install'))
1092 1093
1093 1094 if compression == 'zstd':
1094 1095 requirements.add(REQUIREMENT_ZSTD)
1095 1096 elif compression == 'zlib':
1096 1097 requirements.add(REQUIREMENT_ZLIB)
1097 1098 elif compression == 'none':
1098 1099 requirements.add(REQUIREMENT_NONE)
1099 1100 else:
1100 1101 raise error.Abort(_('unknown compression engine defined in '
1101 1102 'storage.sqlite.compression: %s') % compression)
1102 1103
1103 1104 if createopts.get('shallowfilestore'):
1104 1105 requirements.add(REQUIREMENT_SHALLOW_FILES)
1105 1106
1106 1107 return requirements
1107 1108
1108 1109 @interfaceutil.implementer(repository.ilocalrepositoryfilestorage)
1109 1110 class sqlitefilestorage(object):
1110 1111 """Repository file storage backed by SQLite."""
1111 1112 def file(self, path):
1112 1113 if path[0:1] == b'/':
1113 1114 path = path[1:]
1114 1115
1115 1116 if REQUIREMENT_ZSTD in self.requirements:
1116 1117 compression = 'zstd'
1117 1118 elif REQUIREMENT_ZLIB in self.requirements:
1118 1119 compression = 'zlib'
1119 1120 elif REQUIREMENT_NONE in self.requirements:
1120 1121 compression = 'none'
1121 1122 else:
1122 1123 raise error.Abort(_('unable to determine what compression engine '
1123 1124 'to use for SQLite storage'))
1124 1125
1125 1126 return sqlitefilestore(self._dbconn, path, compression)
1126 1127
1127 1128 def makefilestorage(orig, requirements, features, **kwargs):
1128 1129 """Produce a type conforming to ``ilocalrepositoryfilestorage``."""
1129 1130 if REQUIREMENT in requirements:
1130 1131 if REQUIREMENT_SHALLOW_FILES in requirements:
1131 1132 features.add(repository.REPO_FEATURE_SHALLOW_FILE_STORAGE)
1132 1133
1133 1134 return sqlitefilestorage
1134 1135 else:
1135 1136 return orig(requirements=requirements, features=features, **kwargs)
1136 1137
1137 1138 def makemain(orig, ui, requirements, **kwargs):
1138 1139 if REQUIREMENT in requirements:
1139 1140 if REQUIREMENT_ZSTD in requirements and not zstd:
1140 1141 raise error.Abort(_('repository uses zstandard compression, which '
1141 1142 'is not available to this Mercurial install'))
1142 1143
1143 1144 return sqliterepository
1144 1145
1145 1146 return orig(requirements=requirements, **kwargs)
1146 1147
1147 1148 def verifierinit(orig, self, *args, **kwargs):
1148 1149 orig(self, *args, **kwargs)
1149 1150
1150 1151 # We don't care that files in the store don't align with what is
1151 1152 # advertised. So suppress these warnings.
1152 1153 self.warnorphanstorefiles = False
1153 1154
1154 1155 def extsetup(ui):
1155 1156 localrepo.featuresetupfuncs.add(featuresetup)
1156 1157 extensions.wrapfunction(localrepo, 'newreporequirements',
1157 1158 newreporequirements)
1158 1159 extensions.wrapfunction(localrepo, 'makefilestorage',
1159 1160 makefilestorage)
1160 1161 extensions.wrapfunction(localrepo, 'makemain',
1161 1162 makemain)
1162 1163 extensions.wrapfunction(verify.verifier, '__init__',
1163 1164 verifierinit)
1164 1165
1165 1166 def reposetup(ui, repo):
1166 1167 if isinstance(repo, sqliterepository):
1167 1168 repo._db = None
1168 1169
1169 1170 # TODO check for bundlerepository?