remotefilelogserver.py
442 lines
| 14.1 KiB
| text/x-python
|
PythonLexer
Augie Fackler
|
r40530 | # remotefilelogserver.py - server logic for a remotefilelog server | ||
# | ||||
# Copyright 2013 Facebook, Inc. | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
from __future__ import absolute_import | ||||
import errno | ||||
import os | ||||
import stat | ||||
import time | ||||
Augie Fackler
|
r40542 | import zlib | ||
Augie Fackler
|
r40530 | |||
from mercurial.i18n import _ | ||||
Augie Fackler
|
r40538 | from mercurial.node import bin, hex, nullid | ||
Gregory Szorc
|
r43355 | from mercurial.pycompat import open | ||
Augie Fackler
|
r40530 | from mercurial import ( | ||
changegroup, | ||||
changelog, | ||||
context, | ||||
error, | ||||
extensions, | ||||
match, | ||||
Gregory Szorc
|
r43375 | pycompat, | ||
Augie Fackler
|
r40530 | store, | ||
streamclone, | ||||
util, | ||||
wireprotoserver, | ||||
wireprototypes, | ||||
wireprotov1server, | ||||
) | ||||
Augie Fackler
|
r43346 | from . import ( | ||
Augie Fackler
|
r40543 | constants, | ||
Augie Fackler
|
r40530 | shallowutil, | ||
) | ||||
_sshv1server = wireprotoserver.sshv1protocolhandler | ||||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | def setupserver(ui, repo): | ||
"""Sets up a normal Mercurial repo so it can serve files to shallow repos. | ||||
""" | ||||
onetimesetup(ui) | ||||
# don't send files to shallow clients during pulls | ||||
Augie Fackler
|
r43346 | def generatefiles( | ||
orig, self, changedfiles, linknodes, commonrevs, source, *args, **kwargs | ||||
): | ||||
Augie Fackler
|
r40530 | caps = self._bundlecaps or [] | ||
Augie Fackler
|
r40544 | if constants.BUNDLE2_CAPABLITY in caps: | ||
Augie Fackler
|
r40530 | # only send files that don't match the specified patterns | ||
includepattern = None | ||||
excludepattern = None | ||||
Augie Fackler
|
r43346 | for cap in self._bundlecaps or []: | ||
Augie Fackler
|
r43347 | if cap.startswith(b"includepattern="): | ||
includepattern = cap[len(b"includepattern=") :].split(b'\0') | ||||
elif cap.startswith(b"excludepattern="): | ||||
excludepattern = cap[len(b"excludepattern=") :].split(b'\0') | ||||
Augie Fackler
|
r40530 | |||
Martin von Zweigbergk
|
r41825 | m = match.always() | ||
Augie Fackler
|
r40530 | if includepattern or excludepattern: | ||
Augie Fackler
|
r43346 | m = match.match( | ||
Augie Fackler
|
r43347 | repo.root, b'', None, includepattern, excludepattern | ||
Augie Fackler
|
r43346 | ) | ||
Augie Fackler
|
r40530 | |||
changedfiles = list([f for f in changedfiles if not m(f)]) | ||||
Augie Fackler
|
r43346 | return orig( | ||
self, changedfiles, linknodes, commonrevs, source, *args, **kwargs | ||||
) | ||||
Augie Fackler
|
r40530 | |||
extensions.wrapfunction( | ||||
Augie Fackler
|
r43347 | changegroup.cgpacker, b'generatefiles', generatefiles | ||
Augie Fackler
|
r43346 | ) | ||
Augie Fackler
|
r40530 | |||
onetime = False | ||||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | def onetimesetup(ui): | ||
"""Configures the wireprotocol for both clients and servers. | ||||
""" | ||||
global onetime | ||||
if onetime: | ||||
return | ||||
onetime = True | ||||
# support file content requests | ||||
wireprotov1server.wireprotocommand( | ||||
Augie Fackler
|
r43347 | b'x_rfl_getflogheads', b'path', permission=b'pull' | ||
Augie Fackler
|
r43346 | )(getflogheads) | ||
Augie Fackler
|
r40530 | wireprotov1server.wireprotocommand( | ||
Augie Fackler
|
r43347 | b'x_rfl_getfiles', b'', permission=b'pull' | ||
)(getfiles) | ||||
wireprotov1server.wireprotocommand( | ||||
b'x_rfl_getfile', b'file node', permission=b'pull' | ||||
Augie Fackler
|
r43346 | )(getfile) | ||
Augie Fackler
|
r40530 | |||
class streamstate(object): | ||||
match = None | ||||
shallowremote = False | ||||
noflatmf = False | ||||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | state = streamstate() | ||
def stream_out_shallow(repo, proto, other): | ||||
includepattern = None | ||||
excludepattern = None | ||||
Augie Fackler
|
r43347 | raw = other.get(b'includepattern') | ||
Augie Fackler
|
r40530 | if raw: | ||
Augie Fackler
|
r43347 | includepattern = raw.split(b'\0') | ||
raw = other.get(b'excludepattern') | ||||
Augie Fackler
|
r40530 | if raw: | ||
Augie Fackler
|
r43347 | excludepattern = raw.split(b'\0') | ||
Augie Fackler
|
r40530 | |||
oldshallow = state.shallowremote | ||||
oldmatch = state.match | ||||
oldnoflatmf = state.noflatmf | ||||
try: | ||||
state.shallowremote = True | ||||
Martin von Zweigbergk
|
r41825 | state.match = match.always() | ||
Augie Fackler
|
r43347 | state.noflatmf = other.get(b'noflatmanifest') == b'True' | ||
Augie Fackler
|
r40530 | if includepattern or excludepattern: | ||
Augie Fackler
|
r43346 | state.match = match.match( | ||
Augie Fackler
|
r43347 | repo.root, b'', None, includepattern, excludepattern | ||
Augie Fackler
|
r43346 | ) | ||
Augie Fackler
|
r40530 | streamres = wireprotov1server.stream(repo, proto) | ||
# Force the first value to execute, so the file list is computed | ||||
# within the try/finally scope | ||||
first = next(streamres.gen) | ||||
second = next(streamres.gen) | ||||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | def gen(): | ||
yield first | ||||
yield second | ||||
for value in streamres.gen: | ||||
yield value | ||||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | return wireprototypes.streamres(gen()) | ||
finally: | ||||
state.shallowremote = oldshallow | ||||
state.match = oldmatch | ||||
state.noflatmf = oldnoflatmf | ||||
Augie Fackler
|
r43347 | wireprotov1server.commands[b'stream_out_shallow'] = ( | ||
stream_out_shallow, | ||||
b'*', | ||||
) | ||||
Augie Fackler
|
r40530 | |||
# don't clone filelogs to shallow clients | ||||
Pulkit Goyal
|
r40550 | def _walkstreamfiles(orig, repo, matcher=None): | ||
Augie Fackler
|
r40530 | if state.shallowremote: | ||
# if we are shallow ourselves, stream our local commits | ||||
Pulkit Goyal
|
r40549 | if shallowutil.isenabled(repo): | ||
Augie Fackler
|
r40530 | striplen = len(repo.store.path) + 1 | ||
readdir = repo.store.rawvfs.readdir | ||||
Augie Fackler
|
r43347 | visit = [os.path.join(repo.store.path, b'data')] | ||
Augie Fackler
|
r40530 | while visit: | ||
p = visit.pop() | ||||
for f, kind, st in readdir(p, stat=True): | ||||
Augie Fackler
|
r43347 | fp = p + b'/' + f | ||
Augie Fackler
|
r40530 | if kind == stat.S_IFREG: | ||
Augie Fackler
|
r43347 | if not fp.endswith(b'.i') and not fp.endswith( | ||
b'.d' | ||||
): | ||||
Augie Fackler
|
r40530 | n = util.pconvert(fp[striplen:]) | ||
yield (store.decodedir(n), n, st.st_size) | ||||
if kind == stat.S_IFDIR: | ||||
visit.append(fp) | ||||
Augie Fackler
|
r43347 | if b'treemanifest' in repo.requirements: | ||
Augie Fackler
|
r40530 | for (u, e, s) in repo.store.datafiles(): | ||
Augie Fackler
|
r43347 | if u.startswith(b'meta/') and ( | ||
u.endswith(b'.i') or u.endswith(b'.d') | ||||
Augie Fackler
|
r43346 | ): | ||
Augie Fackler
|
r40530 | yield (u, e, s) | ||
# Return .d and .i files that do not match the shallow pattern | ||||
match = state.match | ||||
if match and not match.always(): | ||||
for (u, e, s) in repo.store.datafiles(): | ||||
f = u[5:-2] # trim data/... and .i/.d | ||||
if not state.match(f): | ||||
yield (u, e, s) | ||||
for x in repo.store.topfiles(): | ||||
Augie Fackler
|
r43347 | if state.noflatmf and x[0][:11] == b'00manifest.': | ||
Augie Fackler
|
r40530 | continue | ||
yield x | ||||
Pulkit Goyal
|
r40549 | elif shallowutil.isenabled(repo): | ||
Augie Fackler
|
r40530 | # don't allow cloning from a shallow repo to a full repo | ||
# since it would require fetching every version of every | ||||
# file in order to create the revlogs. | ||||
Augie Fackler
|
r43346 | raise error.Abort( | ||
Martin von Zweigbergk
|
r43387 | _(b"Cannot clone from a shallow repo to a full repo.") | ||
Augie Fackler
|
r43346 | ) | ||
Augie Fackler
|
r40530 | else: | ||
Pulkit Goyal
|
r40550 | for x in orig(repo, matcher): | ||
Augie Fackler
|
r40530 | yield x | ||
Augie Fackler
|
r43347 | extensions.wrapfunction(streamclone, b'_walkstreamfiles', _walkstreamfiles) | ||
Augie Fackler
|
r40530 | |||
# expose remotefilelog capabilities | ||||
def _capabilities(orig, repo, proto): | ||||
caps = orig(repo, proto) | ||||
Augie Fackler
|
r43346 | if shallowutil.isenabled(repo) or ui.configbool( | ||
Augie Fackler
|
r43347 | b'remotefilelog', b'server' | ||
Augie Fackler
|
r43346 | ): | ||
Augie Fackler
|
r40530 | if isinstance(proto, _sshv1server): | ||
# legacy getfiles method which only works over ssh | ||||
Augie Fackler
|
r40543 | caps.append(constants.NETWORK_CAP_LEGACY_SSH_GETFILES) | ||
Augie Fackler
|
r43347 | caps.append(b'x_rfl_getflogheads') | ||
caps.append(b'x_rfl_getfile') | ||||
Augie Fackler
|
r40530 | return caps | ||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r43347 | extensions.wrapfunction(wireprotov1server, b'_capabilities', _capabilities) | ||
Augie Fackler
|
r40530 | |||
def _adjustlinkrev(orig, self, *args, **kwargs): | ||||
# When generating file blobs, taking the real path is too slow on large | ||||
# repos, so force it to just return the linkrev directly. | ||||
repo = self._repo | ||||
Augie Fackler
|
r43347 | if util.safehasattr(repo, b'forcelinkrev') and repo.forcelinkrev: | ||
Augie Fackler
|
r40530 | return self._filelog.linkrev(self._filelog.rev(self._filenode)) | ||
return orig(self, *args, **kwargs) | ||||
extensions.wrapfunction( | ||||
Augie Fackler
|
r43347 | context.basefilectx, b'_adjustlinkrev', _adjustlinkrev | ||
Augie Fackler
|
r43346 | ) | ||
Augie Fackler
|
r40530 | |||
def _iscmd(orig, cmd): | ||||
Augie Fackler
|
r43347 | if cmd == b'x_rfl_getfiles': | ||
Augie Fackler
|
r40530 | return False | ||
return orig(cmd) | ||||
Augie Fackler
|
r43347 | extensions.wrapfunction(wireprotoserver, b'iscmd', _iscmd) | ||
Augie Fackler
|
r40530 | |||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | def _loadfileblob(repo, cachepath, path, node): | ||
filecachepath = os.path.join(cachepath, path, hex(node)) | ||||
if not os.path.exists(filecachepath) or os.path.getsize(filecachepath) == 0: | ||||
filectx = repo.filectx(path, fileid=node) | ||||
if filectx.node() == nullid: | ||||
repo.changelog = changelog.changelog(repo.svfs) | ||||
filectx = repo.filectx(path, fileid=node) | ||||
text = createfileblob(filectx) | ||||
Augie Fackler
|
r40542 | # TODO configurable compression engines | ||
text = zlib.compress(text) | ||||
Augie Fackler
|
r40530 | |||
# everything should be user & group read/writable | ||||
oldumask = os.umask(0o002) | ||||
try: | ||||
dirname = os.path.dirname(filecachepath) | ||||
if not os.path.exists(dirname): | ||||
try: | ||||
os.makedirs(dirname) | ||||
except OSError as ex: | ||||
if ex.errno != errno.EEXIST: | ||||
raise | ||||
f = None | ||||
try: | ||||
Augie Fackler
|
r43347 | f = util.atomictempfile(filecachepath, b"wb") | ||
Augie Fackler
|
r40530 | f.write(text) | ||
except (IOError, OSError): | ||||
# Don't abort if the user only has permission to read, | ||||
# and not write. | ||||
pass | ||||
finally: | ||||
if f: | ||||
f.close() | ||||
finally: | ||||
os.umask(oldumask) | ||||
else: | ||||
Augie Fackler
|
r43347 | with open(filecachepath, b"rb") as f: | ||
Augie Fackler
|
r40530 | text = f.read() | ||
return text | ||||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | def getflogheads(repo, proto, path): | ||
"""A server api for requesting a filelog's heads | ||||
""" | ||||
flog = repo.file(path) | ||||
heads = flog.heads() | ||||
Augie Fackler
|
r43347 | return b'\n'.join((hex(head) for head in heads if head != nullid)) | ||
Augie Fackler
|
r40530 | |||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | def getfile(repo, proto, file, node): | ||
"""A server api for requesting a particular version of a file. Can be used | ||||
in batches to request many files at once. The return protocol is: | ||||
<errorcode>\0<data/errormsg> where <errorcode> is 0 for success or | ||||
non-zero for an error. | ||||
data is a compressed blob with revlog flag and ancestors information. See | ||||
createfileblob for its content. | ||||
""" | ||||
Pulkit Goyal
|
r40549 | if shallowutil.isenabled(repo): | ||
Augie Fackler
|
r43347 | return b'1\0' + _(b'cannot fetch remote files from shallow repo') | ||
cachepath = repo.ui.config(b"remotefilelog", b"servercachepath") | ||||
Augie Fackler
|
r40530 | if not cachepath: | ||
Augie Fackler
|
r43347 | cachepath = os.path.join(repo.path, b"remotefilelogcache") | ||
Augie Fackler
|
r40530 | node = bin(node.strip()) | ||
if node == nullid: | ||||
Augie Fackler
|
r43347 | return b'0\0' | ||
return b'0\0' + _loadfileblob(repo, cachepath, file, node) | ||||
Augie Fackler
|
r40530 | |||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | def getfiles(repo, proto): | ||
"""A server api for requesting particular versions of particular files. | ||||
""" | ||||
Pulkit Goyal
|
r40549 | if shallowutil.isenabled(repo): | ||
Augie Fackler
|
r43347 | raise error.Abort(_(b'cannot fetch remote files from shallow repo')) | ||
Augie Fackler
|
r40530 | if not isinstance(proto, _sshv1server): | ||
Augie Fackler
|
r43347 | raise error.Abort(_(b'cannot fetch remote files over non-ssh protocol')) | ||
Augie Fackler
|
r40530 | |||
def streamer(): | ||||
fin = proto._fin | ||||
Augie Fackler
|
r43347 | cachepath = repo.ui.config(b"remotefilelog", b"servercachepath") | ||
Augie Fackler
|
r40530 | if not cachepath: | ||
Augie Fackler
|
r43347 | cachepath = os.path.join(repo.path, b"remotefilelogcache") | ||
Augie Fackler
|
r40530 | |||
while True: | ||||
request = fin.readline()[:-1] | ||||
if not request: | ||||
break | ||||
node = bin(request[:40]) | ||||
if node == nullid: | ||||
Augie Fackler
|
r43347 | yield b'0\n' | ||
Augie Fackler
|
r40530 | continue | ||
path = request[40:] | ||||
text = _loadfileblob(repo, cachepath, path, node) | ||||
Augie Fackler
|
r43347 | yield b'%d\n%s' % (len(text), text) | ||
Augie Fackler
|
r40530 | |||
# it would be better to only flush after processing a whole batch | ||||
# but currently we don't know if there are more requests coming | ||||
proto._fout.flush() | ||||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | return wireprototypes.streamres(streamer()) | ||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | def createfileblob(filectx): | ||
""" | ||||
format: | ||||
v0: | ||||
str(len(rawtext)) + '\0' + rawtext + ancestortext | ||||
v1: | ||||
'v1' + '\n' + metalist + '\0' + rawtext + ancestortext | ||||
metalist := metalist + '\n' + meta | meta | ||||
meta := sizemeta | flagmeta | ||||
sizemeta := METAKEYSIZE + str(len(rawtext)) | ||||
flagmeta := METAKEYFLAG + str(flag) | ||||
note: sizemeta must exist. METAKEYFLAG and METAKEYSIZE must have a | ||||
length of 1. | ||||
""" | ||||
flog = filectx.filelog() | ||||
frev = filectx.filerev() | ||||
revlogflags = flog._revlog.flags(frev) | ||||
if revlogflags == 0: | ||||
# normal files | ||||
text = filectx.data() | ||||
else: | ||||
# lfs, read raw revision data | ||||
r43039 | text = flog.rawdata(frev) | |||
Augie Fackler
|
r40530 | |||
repo = filectx._repo | ||||
ancestors = [filectx] | ||||
try: | ||||
repo.forcelinkrev = True | ||||
ancestors.extend([f for f in filectx.ancestors()]) | ||||
Augie Fackler
|
r43347 | ancestortext = b"" | ||
Augie Fackler
|
r40530 | for ancestorctx in ancestors: | ||
parents = ancestorctx.parents() | ||||
p1 = nullid | ||||
p2 = nullid | ||||
if len(parents) > 0: | ||||
p1 = parents[0].filenode() | ||||
if len(parents) > 1: | ||||
p2 = parents[1].filenode() | ||||
Augie Fackler
|
r43347 | copyname = b"" | ||
Augie Fackler
|
r40530 | rename = ancestorctx.renamed() | ||
if rename: | ||||
copyname = rename[0] | ||||
linknode = ancestorctx.node() | ||||
Augie Fackler
|
r43347 | ancestortext += b"%s%s%s%s%s\0" % ( | ||
Augie Fackler
|
r43346 | ancestorctx.filenode(), | ||
p1, | ||||
p2, | ||||
linknode, | ||||
copyname, | ||||
) | ||||
Augie Fackler
|
r40530 | finally: | ||
repo.forcelinkrev = False | ||||
header = shallowutil.buildfileblobheader(len(text), revlogflags) | ||||
Augie Fackler
|
r43347 | return b"%s\0%s%s" % (header, text, ancestortext) | ||
Augie Fackler
|
r40530 | |||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | def gcserver(ui, repo): | ||
Augie Fackler
|
r43347 | if not repo.ui.configbool(b"remotefilelog", b"server"): | ||
Augie Fackler
|
r40530 | return | ||
neededfiles = set() | ||||
Augie Fackler
|
r43347 | heads = repo.revs(b"heads(tip~25000:) - null") | ||
Augie Fackler
|
r40530 | |||
Augie Fackler
|
r43347 | cachepath = repo.vfs.join(b"remotefilelogcache") | ||
Augie Fackler
|
r40530 | for head in heads: | ||
mf = repo[head].manifest() | ||||
Gregory Szorc
|
r43375 | for filename, filenode in pycompat.iteritems(mf): | ||
Augie Fackler
|
r40530 | filecachepath = os.path.join(cachepath, filename, hex(filenode)) | ||
neededfiles.add(filecachepath) | ||||
# delete unneeded older files | ||||
Augie Fackler
|
r43347 | days = repo.ui.configint(b"remotefilelog", b"serverexpiration") | ||
Augie Fackler
|
r40530 | expiration = time.time() - (days * 24 * 60 * 60) | ||
Augie Fackler
|
r43347 | progress = ui.makeprogress(_(b"removing old server cache"), unit=b"files") | ||
Martin von Zweigbergk
|
r40877 | progress.update(0) | ||
Augie Fackler
|
r40530 | for root, dirs, files in os.walk(cachepath): | ||
for file in files: | ||||
filepath = os.path.join(root, file) | ||||
Martin von Zweigbergk
|
r40877 | progress.increment() | ||
Augie Fackler
|
r40530 | if filepath in neededfiles: | ||
continue | ||||
stat = os.stat(filepath) | ||||
if stat.st_mtime < expiration: | ||||
os.remove(filepath) | ||||
Martin von Zweigbergk
|
r40877 | progress.complete() | ||