##// END OF EJS Templates
revlog: subclass the new `repository.iverifyproblem` Protocol class...
revlog: subclass the new `repository.iverifyproblem` Protocol class This is the same transformation as 3a90a6fd710d did for dirstate, but the CamelCase naming was already cleaned up here. We shouldn't have to explicitly subclass, but I'm doing so to test the interplay of regular attributes and the `attrs` class. Also, PyCharm has a nifty feature that puts a jump point in the gutter to navigate back and forth between the base class and subclasses (and override functions and base class functions) when there's an explicit subclassing. Additionally, PyCharm will immediately flag signature mismatches without a 40m pytype run.

File last commit:

r52596:ca7bde5d default
r53365:4ef6dbc2 default
Show More
test-remotefilelog-datapack.py
415 lines | 12.8 KiB | text/x-python | PythonLexer
/ tests / test-remotefilelog-datapack.py
#!/usr/bin/env python
import hashlib
import os
import random
import shutil
import stat
import struct
import sys
import tempfile
import time
import unittest
import silenttestrunner
# Load the local remotefilelog, not the system one
sys.path[0:0] = [os.path.join(os.path.dirname(__file__), '..')]
from mercurial.node import sha1nodeconstants
from mercurial import policy
if not policy._packageprefs.get(policy.policy, (False, False))[1]:
if __name__ == '__main__':
msg = "skipped: pure module not available with module policy:"
print(msg, policy.policy, file=sys.stderr)
sys.exit(80)
from mercurial import (
pycompat,
ui as uimod,
)
from hgext.remotefilelog import (
basepack,
constants,
datapack,
)
class datapacktestsbase:
def __init__(self, datapackreader, paramsavailable):
self.datapackreader = datapackreader
self.paramsavailable = paramsavailable
def setUp(self):
self.tempdirs = []
def tearDown(self):
for d in self.tempdirs:
shutil.rmtree(d)
def makeTempDir(self):
tempdir = pycompat.bytestr(tempfile.mkdtemp())
self.tempdirs.append(tempdir)
return tempdir
def getHash(self, content):
return hashlib.sha1(content).digest()
def getFakeHash(self):
return b''.join(
pycompat.bytechr(random.randint(0, 255)) for _ in range(20)
)
def createPack(self, revisions=None, packdir=None):
if revisions is None:
revisions = [
(
b"filename",
self.getFakeHash(),
sha1nodeconstants.nullid,
b"content",
)
]
if packdir is None:
packdir = self.makeTempDir()
packer = datapack.mutabledatapack(uimod.ui(), packdir, version=2)
for args in revisions:
filename, node, base, content = args[0:4]
# meta is optional
meta = None
if len(args) > 4:
meta = args[4]
packer.add(filename, node, base, content, metadata=meta)
path = packer.close()
return self.datapackreader(path)
def _testAddSingle(self, content):
"""Test putting a simple blob into a pack and reading it out."""
filename = b"foo"
node = self.getHash(content)
revisions = [(filename, node, sha1nodeconstants.nullid, content)]
pack = self.createPack(revisions)
if self.paramsavailable:
self.assertEqual(
pack.params.fanoutprefix, basepack.SMALLFANOUTPREFIX
)
chain = pack.getdeltachain(filename, node)
self.assertEqual(content, chain[0][4])
def testAddSingle(self):
self._testAddSingle(b'')
def testAddSingleEmpty(self):
self._testAddSingle(b'abcdef')
def testAddMultiple(self):
"""Test putting multiple unrelated blobs into a pack and reading them
out.
"""
revisions = []
for i in range(10):
filename = b"foo%d" % i
content = b"abcdef%d" % i
node = self.getHash(content)
revisions.append((filename, node, self.getFakeHash(), content))
pack = self.createPack(revisions)
for filename, node, base, content in revisions:
entry = pack.getdelta(filename, node)
self.assertEqual((content, filename, base, {}), entry)
chain = pack.getdeltachain(filename, node)
self.assertEqual(content, chain[0][4])
def testAddDeltas(self):
"""Test putting multiple delta blobs into a pack and read the chain."""
revisions = []
filename = b"foo"
lastnode = sha1nodeconstants.nullid
for i in range(10):
content = b"abcdef%d" % i
node = self.getHash(content)
revisions.append((filename, node, lastnode, content))
lastnode = node
pack = self.createPack(revisions)
entry = pack.getdelta(filename, revisions[0][1])
realvalue = (revisions[0][3], filename, revisions[0][2], {})
self.assertEqual(entry, realvalue)
# Test that the chain for the final entry has all the others
chain = pack.getdeltachain(filename, node)
for i in range(10):
content = b"abcdef%d" % i
self.assertEqual(content, chain[-i - 1][4])
def testPackMany(self):
"""Pack many related and unrelated objects."""
# Build a random pack file
revisions = []
blobs = {}
random.seed(0)
for i in range(100):
filename = b"filename-%d" % i
filerevs = []
for j in range(random.randint(1, 100)):
content = b"content-%d" % j
node = self.getHash(content)
lastnode = sha1nodeconstants.nullid
if len(filerevs) > 0:
lastnode = filerevs[random.randint(0, len(filerevs) - 1)]
filerevs.append(node)
blobs[(filename, node, lastnode)] = content
revisions.append((filename, node, lastnode, content))
pack = self.createPack(revisions)
# Verify the pack contents
for (filename, node, lastnode), content in sorted(blobs.items()):
chain = pack.getdeltachain(filename, node)
for entry in chain:
expectedcontent = blobs[(entry[0], entry[1], entry[3])]
self.assertEqual(entry[4], expectedcontent)
def testPackMetadata(self):
revisions = []
for i in range(100):
filename = b'%d.txt' % i
content = b'put-something-here \n' * i
node = self.getHash(content)
meta = {
constants.METAKEYFLAG: i**4,
constants.METAKEYSIZE: len(content),
b'Z': b'random_string',
b'_': b'\0' * i,
}
revisions.append(
(filename, node, sha1nodeconstants.nullid, content, meta)
)
pack = self.createPack(revisions)
for name, node, x, content, origmeta in revisions:
parsedmeta = pack.getmeta(name, node)
# flag == 0 should be optimized out
if origmeta[constants.METAKEYFLAG] == 0:
del origmeta[constants.METAKEYFLAG]
self.assertEqual(parsedmeta, origmeta)
def testGetMissing(self):
"""Test the getmissing() api."""
revisions = []
filename = b"foo"
lastnode = sha1nodeconstants.nullid
for i in range(10):
content = b"abcdef%d" % i
node = self.getHash(content)
revisions.append((filename, node, lastnode, content))
lastnode = node
pack = self.createPack(revisions)
missing = pack.getmissing([(b"foo", revisions[0][1])])
self.assertFalse(missing)
missing = pack.getmissing(
[(b"foo", revisions[0][1]), (b"foo", revisions[1][1])]
)
self.assertFalse(missing)
fakenode = self.getFakeHash()
missing = pack.getmissing(
[(b"foo", revisions[0][1]), (b"foo", fakenode)]
)
self.assertEqual(missing, [(b"foo", fakenode)])
def testAddThrows(self):
pack = self.createPack()
try:
pack.add(b'filename', sha1nodeconstants.nullid, b'contents')
self.assertTrue(False, "datapack.add should throw")
except RuntimeError:
pass
def testBadVersionThrows(self):
pack = self.createPack()
path = pack.path + b'.datapack'
with open(path, 'rb') as f:
raw = f.read()
raw = struct.pack('!B', 255) + raw[1:]
os.chmod(path, os.stat(path).st_mode | stat.S_IWRITE)
with open(path, 'wb+') as f:
f.write(raw)
try:
self.datapackreader(pack.path)
self.assertTrue(False, "bad version number should have thrown")
except RuntimeError:
pass
def testMissingDeltabase(self):
fakenode = self.getFakeHash()
revisions = [(b"filename", fakenode, self.getFakeHash(), b"content")]
pack = self.createPack(revisions)
chain = pack.getdeltachain(b"filename", fakenode)
self.assertEqual(len(chain), 1)
def testLargePack(self):
"""Test creating and reading from a large pack with over X entries.
This causes it to use a 2^16 fanout table instead."""
revisions = []
blobs = {}
total = basepack.SMALLFANOUTCUTOFF + 1
for i in range(total):
filename = b"filename-%d" % i
content = filename
node = self.getHash(content)
blobs[(filename, node)] = content
revisions.append(
(filename, node, sha1nodeconstants.nullid, content)
)
pack = self.createPack(revisions)
if self.paramsavailable:
self.assertEqual(
pack.params.fanoutprefix, basepack.LARGEFANOUTPREFIX
)
for (filename, node), content in blobs.items():
actualcontent = pack.getdeltachain(filename, node)[0][4]
self.assertEqual(actualcontent, content)
def testPacksCache(self):
"""Test that we remember the most recent packs while fetching the delta
chain."""
packdir = self.makeTempDir()
deltachains = []
numpacks = 10
revisionsperpack = 100
for i in range(numpacks):
chain = []
revision = (
b'%d' % i,
self.getFakeHash(),
sha1nodeconstants.nullid,
b"content",
)
for _ in range(revisionsperpack):
chain.append(revision)
revision = (
b'%d' % i,
self.getFakeHash(),
revision[1],
self.getFakeHash(),
)
self.createPack(chain, packdir)
deltachains.append(chain)
class testdatapackstore(datapack.datapackstore):
# Ensures that we are not keeping everything in the cache.
DEFAULTCACHESIZE = numpacks // 2
store = testdatapackstore(uimod.ui(), packdir)
random.shuffle(deltachains)
for randomchain in deltachains:
revision = random.choice(randomchain)
chain = store.getdeltachain(revision[0], revision[1])
mostrecentpack = next(iter(store.packs), None)
self.assertEqual(
mostrecentpack.getdeltachain(revision[0], revision[1]), chain
)
self.assertEqual(randomchain.index(revision) + 1, len(chain))
# perf test off by default since it's slow
def _testIndexPerf(self):
random.seed(0)
print("Multi-get perf test")
packsizes = [
100,
10000,
100000,
500000,
1000000,
3000000,
]
lookupsizes = [
10,
100,
1000,
10000,
100000,
1000000,
]
for packsize in packsizes:
revisions = []
for i in range(packsize):
filename = b"filename-%d" % i
content = b"content-%d" % i
node = self.getHash(content)
revisions.append(
(filename, node, sha1nodeconstants.nullid, content)
)
path = self.createPack(revisions).path
# Perf of large multi-get
import gc
gc.disable()
pack = self.datapackreader(path)
for lookupsize in lookupsizes:
if lookupsize > packsize:
continue
random.shuffle(revisions)
findnodes = [(rev[0], rev[1]) for rev in revisions]
start = time.time()
pack.getmissing(findnodes[:lookupsize])
elapsed = time.time() - start
print(
"%s pack %d lookups = %0.04f"
% (
('%d' % packsize).rjust(7),
('%d' % lookupsize).rjust(7),
elapsed,
)
)
print("")
gc.enable()
# The perf test is meant to produce output, so we always fail the test
# so the user sees the output.
raise RuntimeError("perf test always fails")
class datapacktests(datapacktestsbase, unittest.TestCase):
def __init__(self, *args, **kwargs):
datapacktestsbase.__init__(self, datapack.datapack, True)
unittest.TestCase.__init__(self, *args, **kwargs)
# TODO:
# datapack store:
# - getmissing
# - GC two packs into one
if __name__ == '__main__':
if pycompat.iswindows:
sys.exit(80) # Skip on Windows
silenttestrunner.main(__name__)