basestore.py
461 lines
| 15.9 KiB
| text/x-python
|
PythonLexer
Augie Fackler
|
from __future__ import absolute_import

import errno
import functools
import os
import shutil
import stat
import time

from mercurial.i18n import _
from mercurial.node import bin, hex
from mercurial.pycompat import open
from mercurial import (
    error,
    pycompat,
    util,
)
from mercurial.utils import hashutil
from . import (
    constants,
    shallowutil,
)
Augie Fackler
|
r43346 | |||
Augie Fackler
|
class basestore(object):
    """Loose-file store of remotefilelog blobs kept under a directory.

    Each blob lives in its own file whose path is derived from the file
    name and node (see ``_getfilepath``). The store can either be a shared
    cache (partitioned by repo name) or the local store of a single repo.
    """

    def __init__(self, repo, path, reponame, shared=False):
        """Creates a remotefilelog store object for the given repo name.

        `path` - The file path where this store keeps its data
        `reponame` - The name of the repo. This is used to partition data from
        many repos.
        `shared` - True if this store is a shared cache of data from the central
        server, for many repos on this machine. False means this store is for
        the local data for one repo.
        """
        self.repo = repo
        self.ui = repo.ui
        self._path = path
        self._reponame = reponame
        self._shared = shared
        # os.getuid() is not called on Windows; markrepo() compares
        # against this value before chmod'ing the repos file.
        self._uid = os.getuid() if not pycompat.iswindows else None

        self._validatecachelog = self.ui.config(
            b"remotefilelog", b"validatecachelog"
        )
        self._validatecache = self.ui.config(
            b"remotefilelog", b"validatecache", b'on'
        )
        # Unknown values fall back to b'on'; b'off' is normalised to False
        # so the attribute can be used directly in boolean tests.
        if self._validatecache not in (b'on', b'strict', b'off'):
            self._validatecache = b'on'
        if self._validatecache == b'off':
            self._validatecache = False

        if shared:
            shallowutil.mkstickygroupdir(self.ui, path)

    def getmissing(self, keys):
        """Return the (name, node) pairs from ``keys`` not in this store.

        A key counts as present when its file exists on disk. In b'strict'
        validation mode an existing file that fails ``_validatekey`` is
        reported as missing as well.
        """
        missing = []
        strict = self._validatecache == b'strict'
        for name, node in keys:
            filepath = self._getfilepath(name, node)
            present = os.path.exists(filepath)
            if present and strict:
                present = self._validatekey(filepath, b'contains')
            if not present:
                missing.append((name, node))

        return missing

    # BELOW THIS ARE IMPLEMENTATIONS OF REPACK SOURCE

    def markledger(self, ledger, options=None):
        """Register every revision of a shared store with ``ledger``.

        Does nothing when ``options`` sets constants.OPTION_PACKSONLY or
        when this store is not shared.
        """
        if options and options.get(constants.OPTION_PACKSONLY):
            return
        if not self._shared:
            return
        for filename, nodes in self._getfiles():
            for node in nodes:
                ledger.markdataentry(self, filename, node)
                ledger.markhistoryentry(self, filename, node)

    def cleanup(self, ledger):
        """Delete the files of ledger entries that are no longer needed.

        An entry's file is unlinked when the entry was gc'ed or when both
        its data and history were repacked. Afterwards the repo cache
        directory is tidied with ``_cleanupdirectory``.
        """
        ui = self.ui
        entries = ledger.sources.get(self, [])
        progress = ui.makeprogress(
            _(b"cleaning up"), unit=b"files", total=len(entries)
        )
        count = 0
        for entry in entries:
            if entry.gced or (entry.datarepacked and entry.historyrepacked):
                progress.update(count)
                util.tryunlink(self._getfilepath(entry.filename, entry.node))
            count += 1
        progress.complete()

        # Clean up the repo cache directory.
        self._cleanupdirectory(self._getrepocachepath())

    # BELOW THIS ARE NON-STANDARD APIS

    def _cleanupdirectory(self, rootdir):
        """Removes the empty directories and unnecessary files within the root
        directory recursively. Note that this method does not remove the root
        directory itself."""
        oldfiles = set()
        otherfiles = set()
        # osutil.listdir returns stat information which saves some rmdir/listdir
        # syscalls.
        for name, mode in util.osutil.listdir(rootdir):
            if stat.S_ISDIR(mode):
                dirpath = os.path.join(rootdir, name)
                self._cleanupdirectory(dirpath)

                # Now that the directory specified by dirpath is potentially
                # empty, try and remove it. Failure (e.g. the directory is
                # not empty) is expected and deliberately ignored.
                try:
                    os.rmdir(dirpath)
                except OSError:
                    pass

            elif stat.S_ISREG(mode):
                if name.endswith(b'_old'):
                    oldfiles.add(name[:-4])
                else:
                    otherfiles.add(name)

        # Remove the files which end with suffix '_old' and have no
        # corresponding file without the suffix '_old'. See addremotefilelognode
        # method for the generation/purpose of files with '_old' suffix.
        for filename in oldfiles - otherfiles:
            util.tryunlink(os.path.join(rootdir, filename + b'_old'))

    def _getfiles(self):
        """Yield (filename, [node,...]) for all the revisions that exist in
        the store.

        This is useful for obtaining a list of all the contents of the store
        when performing a repack to another store, since the store API requires
        name+node keys and not namehash+node keys.
        """
        existing = {}
        for filenamehash, node in self._listkeys():
            existing.setdefault(filenamehash, []).append(node)

        filenamemap = self._resolvefilenames(existing.keys())

        for filename, sha in pycompat.iteritems(filenamemap):
            yield (filename, existing[sha])

    def _resolvefilenames(self, hashes):
        """Given a list of filename hashes that are present in the
        remotefilelog store, return a mapping from filename->hash.

        This is useful when converting remotefilelog blobs into other storage
        formats.
        """
        if not hashes:
            return {}

        filenames = {}
        missingfilename = set(hashes)

        def record(filename):
            # Remember filename if its sha1 is one we are still looking for.
            sha = hashutil.sha1(filename).digest()
            if sha in missingfilename:
                filenames[filename] = sha
                missingfilename.discard(sha)

        # Start with a full manifest, since it'll cover the majority of files
        for filename in self.repo[b'tip'].manifest():
            record(filename)

        # Scan the changelog until we've found every file name
        cl = self.repo.unfiltered().changelog
        for rev in pycompat.xrange(len(cl) - 1, -1, -1):
            if not missingfilename:
                break
            for filename in cl.readfiles(cl.node(rev)):
                record(filename)

        return filenames

    def _getrepocachepath(self):
        """Return the directory holding this repo's files: ``_path`` joined
        with the repo name for a shared store, ``_path`` itself otherwise."""
        if self._shared:
            return os.path.join(self._path, self._reponame)
        return self._path

    def _listkeys(self):
        """List all the remotefilelog keys that exist in the store.

        Returns a iterator of (filename hash, filecontent hash) tuples.
        """
        for root, dirs, files in os.walk(self._getrepocachepath()):
            for filename in files:
                # Only 40-character names (hex nodes) are store entries.
                if len(filename) != 40:
                    continue
                node = filename
                if self._shared:
                    # .../1a/85ffda..be21
                    # The hash is split over two directory levels; drop the
                    # separator between the 2-char and 38-char parts.
                    filenamehash = root[-41:-39] + root[-38:]
                else:
                    filenamehash = root[-40:]
                yield (bin(filenamehash), bin(node))

    def _getfilepath(self, name, node):
        """Return the on-disk path for (name, node) in this store."""
        node = hex(node)
        if self._shared:
            key = shallowutil.getcachekey(self._reponame, name, node)
        else:
            key = shallowutil.getlocalkey(name, node)

        return os.path.join(self._path, key)

    def _getdata(self, name, node):
        """Read and return the raw blob stored for (name, node).

        Raises KeyError when the file cannot be read, or when validation is
        enabled and the content is corrupt. A corrupt file is renamed with
        a ``.corrupt`` suffix and, if a validation log is configured, the
        event is appended to it.
        """
        filepath = self._getfilepath(name, node)
        try:
            data = shallowutil.readfile(filepath)
            if self._validatecache and not self._validatedata(data, filepath):
                if self._validatecachelog:
                    with open(self._validatecachelog, b'ab+') as f:
                        f.write(b"corrupt %s during read\n" % filepath)
                os.rename(filepath, filepath + b".corrupt")
                raise KeyError(b"corrupt local cache file %s" % filepath)
        except IOError:
            raise KeyError(
                b"no file found at %s for %s:%s" % (filepath, name, hex(node))
            )

        return data

    def addremotefilelognode(self, name, node, data):
        """Write ``data`` as the blob for (name, node).

        An already existing file is first copied to ``<path>_old``. When
        validation is enabled the written file is re-read and checked;
        error.Abort is raised if it turns out to be corrupt.
        """
        filepath = self._getfilepath(name, node)

        # The umask is changed for the duration of the write and always
        # restored in the finally clause.
        oldumask = os.umask(0o002)
        try:
            # if this node already exists, save the old version for
            # recovery/debugging purposes.
            if os.path.exists(filepath):
                newfilename = filepath + b'_old'
                # newfilename can be read-only and shutil.copy will fail.
                # Delete newfilename to avoid it
                if os.path.exists(newfilename):
                    shallowutil.unlinkfile(newfilename)

                shutil.copy(filepath, newfilename)

            shallowutil.mkstickygroupdir(self.ui, os.path.dirname(filepath))
            shallowutil.writefile(filepath, data, readonly=True)

            if self._validatecache:
                if not self._validatekey(filepath, b'write'):
                    raise error.Abort(
                        _(b"local cache write was corrupted %s") % filepath
                    )
        finally:
            os.umask(oldumask)

    def markrepo(self, path):
        """Call this to add the given repo path to the store's list of
        repositories that are using it. This is useful later when doing garbage
        collection, since it allows us to inspect the repos to see what nodes
        they want to be kept alive in the store.
        """
        repospath = os.path.join(self._path, b"repos")
        with open(repospath, b'ab') as reposfile:
            reposfile.write(os.path.dirname(path) + b"\n")

        # Only attempt the chmod when the file's owner uid matches ours.
        repospathstat = os.stat(repospath)
        if repospathstat.st_uid == self._uid:
            os.chmod(repospath, 0o0664)

    def _validatekey(self, path, action):
        """Return True if the file at ``path`` holds valid data.

        On failure the file is renamed with a ``.corrupt`` suffix, the
        event (tagged with ``action``) is appended to the validation log if
        one is configured, and False is returned.
        """
        with open(path, b'rb') as f:
            data = f.read()

        if self._validatedata(data, path):
            return True

        if self._validatecachelog:
            with open(self._validatecachelog, b'ab+') as f:
                f.write(b"corrupt %s during %s\n" % (path, action))

        os.rename(path, path + b".corrupt")
        return False

    def _validatedata(self, data, path):
        """Return True if ``data`` looks like the intact blob for ``path``.

        The blob must be non-empty, longer than the size recorded in its
        header, and the 20-byte node that follows the content must match
        the basename of ``path``. Parse failures (ValueError/RuntimeError)
        count as invalid.
        """
        if len(data) == 0:
            return False
        try:
            # see remotefilelogserver.createfileblob for the format
            offset, size, unusedflags = shallowutil.parsesizeflags(data)
            if len(data) <= size:
                # it is truncated
                return False

            # extract the node from the metadata
            offset += size
            datanode = data[offset : offset + 20]

            # and compare against the path; a match means the content
            # belongs to the intended path
            return os.path.basename(path) == hex(datanode)
        except (ValueError, RuntimeError):
            return False

    def _warnremoved(self, path):
        """Warn that ``path`` vanished while gc was looking at it."""
        msg = _(b"warning: file %s was removed by another process\n")
        self.ui.warn(msg % path)

    def _unlinkorwarn(self, path):
        """Unlink ``path``; return True on success.

        If the file is already gone (ENOENT) a warning is emitted and False
        is returned. Any other OSError propagates.
        """
        try:
            shallowutil.unlinkfile(path)
        except OSError as e:
            # errno.ENOENT = no such file or directory
            if e.errno != errno.ENOENT:
                raise
            self._warnremoved(path)
            return False
        return True

    def gc(self, keepkeys):
        """Garbage-collect the store.

        First pass: every file under the store path that is neither listed
        in ``keepkeys`` (paths relative to the store root) nor accessed
        within the last day is deleted. Second pass: if the surviving files
        exceed the ``remotefilelog.cachelimit`` config, the least recently
        accessed ones are deleted until the total is under the limit.
        """
        ui = self.ui
        cachepath = self._path

        # prune cache
        queue = pycompat.queue.PriorityQueue()
        originalsize = 0
        size = 0
        count = 0
        removed = 0

        # keep files newer than a day even if they aren't needed
        limit = time.time() - (60 * 60 * 24)

        progress = ui.makeprogress(
            _(b"removing unnecessary files"), unit=b"files"
        )
        progress.update(0)
        for root, dirs, files in os.walk(cachepath):
            # Don't delete pack files
            # NOTE(review): this substring test only matches when
            # b'/packs/' appears inside ``root``; confirm that files sitting
            # directly in a ``packs`` directory are meant to be covered.
            if b'/packs/' in root:
                continue

            for fname in files:
                if fname == b'repos':
                    continue

                progress.update(count)
                path = os.path.join(root, fname)
                key = os.path.relpath(path, cachepath)
                count += 1
                try:
                    pathstat = os.stat(path)
                except OSError as e:
                    # errno.ENOENT = no such file or directory
                    if e.errno != errno.ENOENT:
                        raise
                    self._warnremoved(path)
                    continue

                originalsize += pathstat.st_size

                if key in keepkeys or pathstat.st_atime > limit:
                    # Kept for now; ordered by atime so the oldest files
                    # are evicted first if the size limit is exceeded.
                    queue.put((pathstat.st_atime, path, pathstat))
                    size += pathstat.st_size
                elif self._unlinkorwarn(path):
                    removed += 1
        progress.complete()

        # remove oldest files until under limit
        limit = ui.configbytes(b"remotefilelog", b"cachelimit")
        if size > limit:
            excess = size - limit
            progress = ui.makeprogress(
                _(b"enforcing cache limit"), unit=b"bytes", total=excess
            )
            removedexcess = 0
            # A PriorityQueue object is always truthy, so emptiness must be
            # tested explicitly; otherwise queue.get() could block forever.
            while not queue.empty() and size > limit and size > 0:
                progress.update(removedexcess)
                atime, oldpath, oldpathstat = queue.get()
                # A file that vanished underneath us still frees its space,
                # so the accounting below runs either way.
                self._unlinkorwarn(oldpath)
                size -= oldpathstat.st_size
                removed += 1
                removedexcess += oldpathstat.st_size
            progress.complete()

        ui.status(
            _(b"finished: removed %d of %d files (%0.2f GB to %0.2f GB)\n")
            % (
                removed,
                count,
                float(originalsize) / 1024.0 / 1024.0 / 1024.0,
                float(size) / 1024.0 / 1024.0 / 1024.0,
            )
        )
Augie Fackler
|
r40530 | |||
class baseunionstore(object):
    """Base class for stores that aggregate several underlying stores.

    Subclasses are expected to provide ``self.stores``; this class adds the
    retry machinery used when a lookup across those stores raises KeyError.
    """

    def __init__(self, *args, **kwargs):
        """Read the retry settings from ``kwargs``.

        `numretries` - how many extra attempts ``retriable`` methods get
        (default 0, i.e. a single attempt).
        `retrylog` - optional callable invoked with a bytes message on
        every retry and when the attempts are exhausted.
        """
        # If one of the functions that iterates all of the stores is about to
        # throw a KeyError, try this many times with a full refresh between
        # attempts. A repack operation may have moved data from one store to
        # another while we were running.
        self.numattempts = kwargs.get('numretries', 0) + 1
        # If not-None, call this function on every retry and if the attempts are
        # exhausted.
        self.retrylog = kwargs.get('retrylog', None)

    def markforrefresh(self):
        """Call ``markforrefresh`` on every store that provides it."""
        for store in self.stores:
            if util.safehasattr(store, b'markforrefresh'):
                store.markforrefresh()

    @staticmethod
    def retriable(fn):
        """Decorator that retries ``fn`` when it raises KeyError.

        The wrapped method is attempted up to ``self.numattempts`` times.
        Before each re-attempt the stores are marked for refresh; once the
        attempts are exhausted the last KeyError is re-raised. Each retry
        and the final failure are reported through ``self.retrylog`` when
        it is set.
        """

        def noop(*args):
            pass

        # functools.wraps keeps fn's __name__/__doc__ on the wrapper.
        @functools.wraps(fn)
        def wrapped(self, *args, **kwargs):
            retrylog = self.retrylog or noop
            funcname = fn.__name__
            i = 0
            while i < self.numattempts:
                if i > 0:
                    retrylog(
                        b're-attempting (n=%d) %s\n'
                        % (i, pycompat.sysbytes(funcname))
                    )
                    self.markforrefresh()
                i += 1
                try:
                    return fn(self, *args, **kwargs)
                except KeyError:
                    if i == self.numattempts:
                        # retries exhausted
                        retrylog(
                            b'retries exhausted in %s, raising KeyError\n'
                            % pycompat.sysbytes(funcname)
                        )
                        raise

        return wrapped