fileserverclient.py
670 lines
| 22.2 KiB
| text/x-python
|
PythonLexer
Augie Fackler
|
r40530 | # fileserverclient.py - client for communicating with the cache process | ||
# | ||||
# Copyright 2013 Facebook, Inc. | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
import io | ||||
import os | ||||
import threading | ||||
import time | ||||
Augie Fackler
|
r40542 | import zlib | ||
Augie Fackler
|
r40530 | |||
from mercurial.i18n import _ | ||||
Joerg Sonnenberger
|
r47771 | from mercurial.node import bin, hex | ||
Augie Fackler
|
r40530 | from mercurial import ( | ||
error, | ||||
Pulkit Goyal
|
r40646 | pycompat, | ||
Augie Fackler
|
r40530 | revlog, | ||
sshpeer, | ||||
util, | ||||
wireprotov1peer, | ||||
) | ||||
Augie Fackler
|
r44519 | from mercurial.utils import ( | ||
hashutil, | ||||
procutil, | ||||
) | ||||
Augie Fackler
|
r40530 | |||
from . import ( | ||||
constants, | ||||
contentstore, | ||||
metadatastore, | ||||
) | ||||
# Alias for the SSH v1 peer class; fileserverclient.request() checks peers
# against it with isinstance() before using the legacy streaming protocol.
_sshv1peer = sshpeer.sshv1peer

# Statistics for debugging
# Accumulated time.time() deltas spent inside fileserverclient.request(),
# updated by fileserverclient.prefetch().
fetchcost = 0
# Number of prefetch() calls that actually had something to fetch.
fetches = 0
# Number of file revisions handed to request() across all prefetch() calls.
fetched = 0
# Number of keys the cache process did not have, updated by request().
fetchmisses = 0
# LFS extension module used by fileserverclient._lfsprefetch(); while it is
# None, LFS prefetching is skipped.
_lfsmod = None
Augie Fackler
|
r43346 | |||
Augie Fackler
|
def getcachekey(reponame, file, id):
    """Return the shared-cache key for revision ``id`` of ``file``.

    The key is a relative path of the form
    ``<reponame>/<aa>/<rest>/<id>``, where ``<aa>`` is the first two hex
    digits of the SHA-1 of the file path and ``<rest>`` the remaining ones.
    """
    digest = hashutil.sha1(file).digest()
    hexdigest = hex(digest)
    fanout, remainder = hexdigest[:2], hexdigest[2:]
    return os.path.join(reponame, fanout, remainder, id)
Augie Fackler
|
r43346 | |||
Augie Fackler
|
def getlocalkey(file, id):
    """Return the local-store key for revision ``id`` of ``file``.

    The key is ``<sha1-hex-of-path>/<id>``; unlike the shared-cache key it
    carries no repository name and no two-character fan-out directory.
    """
    hasher = hashutil.sha1(file)
    return os.path.join(hex(hasher.digest()), id)
Augie Fackler
|
r43346 | |||
Augie Fackler
|
def peersetup(ui, peer):
    """Turn ``peer`` into a remotefilelog-aware peer, in place.

    A subclass of the peer's current class is created and assigned to
    ``peer.__class__``.  The subclass adds the batchable ``x_rfl_getfile``
    and ``x_rfl_getflogheads`` wire commands and rewrites the ``bundlecaps``
    argument of ``getbundle`` requests so the server knows the client is a
    shallow repository.  ``ui`` is accepted for interface compatibility and
    is not used here.
    """

    class remotefilepeer(peer.__class__):
        @wireprotov1peer.batchable
        def x_rfl_getfile(self, file, node):
            """Batchable command fetching one file revision.

            Returns the wire arguments and a decoder for the raw reply.
            Raises error.Abort if the server lacks the capability.
            """
            if not self.capable(b'x_rfl_getfile'):
                raise error.Abort(
                    b'configured remotefile server does not support getfile'
                )

            def decode(d):
                # The reply is "<status code>\0<payload>"; a non-zero code
                # means the server could not provide the revision and the
                # payload is passed along as the error detail.
                code, data = d.split(b'\0', 1)
                if int(code):
                    raise error.LookupError(file, node, data)
                return data

            return {b'file': file, b'node': node}, decode

        @wireprotov1peer.batchable
        def x_rfl_getflogheads(self, path):
            """Batchable command listing the filelog heads of ``path``.

            Returns the wire arguments and a decoder that splits the reply
            on newlines (an empty reply decodes to an empty list).  Raises
            error.Abort if the server lacks the capability.
            """
            if not self.capable(b'x_rfl_getflogheads'):
                raise error.Abort(
                    b'configured remotefile server does not '
                    b'support getflogheads'
                )

            def decode(d):
                return d.split(b'\n') if d else []

            return {b'path': path}, decode

        def _updatecallstreamopts(self, command, opts):
            """Add shallow-repo bundlecaps to ``opts`` for getbundle calls.

            ``opts`` is a bytes-keyed dict and is modified in place.  Nothing
            happens unless the command is ``getbundle``, the server
            advertises the legacy ssh getfiles capability, and this peer has
            a ``_localrepo`` whose requirements mark it as shallow.
            """
            if command != b'getbundle':
                return
            if (
                constants.NETWORK_CAP_LEGACY_SSH_GETFILES
                not in self.capabilities()
            ):
                return
            if not hasattr(self, '_localrepo'):
                return
            if (
                constants.SHALLOWREPO_REQUIREMENT
                not in self._localrepo.requirements
            ):
                return

            existing = opts.get(b'bundlecaps')
            bundlecaps = [existing] if existing else []

            # shallow, includepattern, and excludepattern are a hacky way of
            # carrying over data from the local repo to this getbundle
            # command. We need to do it this way because bundle1 getbundle
            # doesn't provide any other place we can hook in to manipulate
            # getbundle args before it goes across the wire. Once we get rid
            # of bundle1, we can use bundle2's _pullbundle2extraprepare to
            # do this more cleanly.
            bundlecaps.append(constants.BUNDLE2_CAPABLITY)
            if self._localrepo.includepattern:
                patterns = b'\0'.join(self._localrepo.includepattern)
                bundlecaps.append(b"includepattern=" + patterns)
            if self._localrepo.excludepattern:
                patterns = b'\0'.join(self._localrepo.excludepattern)
                bundlecaps.append(b"excludepattern=" + patterns)
            opts[b'bundlecaps'] = b','.join(bundlecaps)

        def _sendrequest(self, command, args, **opts):
            """Inject bundlecaps into ``args``, then defer to the parent."""
            self._updatecallstreamopts(command, args)
            return super(remotefilepeer, self)._sendrequest(
                command, args, **opts
            )

        def _callstream(self, command, **opts):
            """Inject bundlecaps for peers that have no ``_sendrequest``.

            When the parent class does provide ``_sendrequest`` the
            injection happens there instead, so nothing is done here.
            """
            supertype = super(remotefilepeer, self)
            if not hasattr(supertype, '_sendrequest'):
                # byteskwargs() returns a new dict, so the updated options
                # have to be converted back into keyword arguments;
                # otherwise the injected bundlecaps would be discarded.
                byteopts = pycompat.byteskwargs(opts)
                self._updatecallstreamopts(command, byteopts)
                opts = pycompat.strkwargs(byteopts)
            return supertype._callstream(command, **opts)

    peer.__class__ = remotefilepeer
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class cacheconnection:
    """The connection for communicating with the remote cache. Performs
    gets and sets by communicating with an external process that has the
    cache-specific implementation.
    """

    def __init__(self):
        # pipei is written to, pipeo is read from, pipee is only ever
        # closed; all three come from procutil.popen4() in connect().
        self.pipeo = self.pipei = self.pipee = None
        self.subprocess = None
        self.connected = False

    def connect(self, cachecommand):
        """Spawn ``cachecommand`` and keep its pipes.

        Raises error.Abort if a connection is already open.
        """
        if self.pipeo:
            raise error.Abort(_(b"cache connection already open"))
        self.pipei, self.pipeo, self.pipee, self.subprocess = procutil.popen4(
            cachecommand
        )
        self.connected = True

    def close(self):
        """Ask the cache process to exit and release all pipes.

        Every step is best-effort: failures while writing the exit
        command, closing a pipe or waiting for the process are ignored so
        that closing never raises.
        """

        def tryclose(pipe):
            try:
                pipe.close()
            except Exception:
                pass

        if self.connected:
            try:
                self.pipei.write(b"exit\n")
            except Exception:
                pass
            tryclose(self.pipei)
            self.pipei = None
            tryclose(self.pipeo)
            self.pipeo = None
            tryclose(self.pipee)
            self.pipee = None
            try:
                # Wait for process to terminate, making sure to avoid deadlock.
                # See https://docs.python.org/2/library/subprocess.html for
                # warnings about wait() and deadlocking.
                self.subprocess.communicate()
            except Exception:
                pass
            self.subprocess = None
        self.connected = False

    def request(self, request, flush=True):
        """Write ``request`` to the cache process, optionally flushing.

        Does nothing when not connected.  An IOError while writing closes
        the connection instead of propagating.
        """
        if self.connected:
            try:
                self.pipei.write(request)
                if flush:
                    self.pipei.flush()
            except IOError:
                self.close()

    def receiveline(self):
        """Read one line from the cache process, minus its last byte.

        Returns None when not connected or when reading fails with an
        IOError.  An empty line closes the connection and is returned
        as-is, so callers can treat any falsy result as "connection gone".
        """
        if not self.connected:
            return None
        # Pre-set so the IOError path returns None instead of tripping over
        # an unbound local.
        result = None
        try:
            result = self.pipeo.readline()[:-1]
            if not result:
                self.close()
        except IOError:
            self.close()

        return result
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | def _getfilesbatch( | ||
Augie Fackler
|
r43346 | remote, receivemissing, progresstick, missed, idmap, batchsize | ||
): | ||||
Augie Fackler
|
r40530 | # Over http(s), iterbatch is a streamy method and we can start | ||
# looking at results early. This means we send one (potentially | ||||
# large) request, but then we show nice progress as we process | ||||
# file results, rather than showing chunks of $batchsize in | ||||
# progress. | ||||
# | ||||
# Over ssh, iterbatch isn't streamy because batch() wasn't | ||||
# explicitly designed as a streaming method. In the future we | ||||
# should probably introduce a streambatch() method upstream and | ||||
# use that for this. | ||||
with remote.commandexecutor() as e: | ||||
futures = [] | ||||
for m in missed: | ||||
Augie Fackler
|
r43346 | futures.append( | ||
e.callcommand( | ||||
Augie Fackler
|
r43347 | b'x_rfl_getfile', {b'file': idmap[m], b'node': m[-40:]} | ||
Augie Fackler
|
r43346 | ) | ||
) | ||||
Augie Fackler
|
r40530 | |||
for i, m in enumerate(missed): | ||||
r = futures[i].result() | ||||
futures[i] = None # release memory | ||||
file_ = idmap[m] | ||||
node = m[-40:] | ||||
Augie Fackler
|
r43347 | receivemissing(io.BytesIO(b'%d\n%s' % (len(r), r)), file_, node) | ||
Augie Fackler
|
r40530 | progresstick() | ||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | def _getfiles_optimistic( | ||
Augie Fackler
|
r43346 | remote, receivemissing, progresstick, missed, idmap, step | ||
): | ||||
Augie Fackler
|
r43347 | remote._callstream(b"x_rfl_getfiles") | ||
Augie Fackler
|
r40530 | i = 0 | ||
pipeo = remote._pipeo | ||||
pipei = remote._pipei | ||||
while i < len(missed): | ||||
# issue a batch of requests | ||||
start = i | ||||
end = min(len(missed), start + step) | ||||
i = end | ||||
for missingid in missed[start:end]: | ||||
# issue new request | ||||
versionid = missingid[-40:] | ||||
file = idmap[missingid] | ||||
Augie Fackler
|
r43347 | sshrequest = b"%s%s\n" % (versionid, file) | ||
Augie Fackler
|
r40530 | pipeo.write(sshrequest) | ||
pipeo.flush() | ||||
# receive batch results | ||||
for missingid in missed[start:end]: | ||||
versionid = missingid[-40:] | ||||
file = idmap[missingid] | ||||
receivemissing(pipei, file, versionid) | ||||
progresstick() | ||||
# End the command | ||||
Augie Fackler
|
r43347 | pipeo.write(b'\n') | ||
Augie Fackler
|
r40530 | pipeo.flush() | ||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | def _getfiles_threaded( | ||
Augie Fackler
|
r43346 | remote, receivemissing, progresstick, missed, idmap, step | ||
): | ||||
Kévin Lévesque
|
r47857 | remote._callstream(b"x_rfl_getfiles") | ||
Augie Fackler
|
r40530 | pipeo = remote._pipeo | ||
pipei = remote._pipei | ||||
def writer(): | ||||
for missingid in missed: | ||||
versionid = missingid[-40:] | ||||
file = idmap[missingid] | ||||
Augie Fackler
|
r43347 | sshrequest = b"%s%s\n" % (versionid, file) | ||
Augie Fackler
|
r40530 | pipeo.write(sshrequest) | ||
pipeo.flush() | ||||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r40530 | writerthread = threading.Thread(target=writer) | ||
writerthread.daemon = True | ||||
writerthread.start() | ||||
for missingid in missed: | ||||
versionid = missingid[-40:] | ||||
file = idmap[missingid] | ||||
receivemissing(pipei, file, versionid) | ||||
progresstick() | ||||
writerthread.join() | ||||
# End the command | ||||
Augie Fackler
|
r43347 | pipeo.write(b'\n') | ||
Augie Fackler
|
r40530 | pipeo.flush() | ||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class fileserverclient:
    """A client for requesting files from the remote file server."""

    def __init__(self, repo):
        """Read the remotefilelog cache settings from ``repo.ui``.

        No cache process is started here; request() connects lazily.
        """
        ui = repo.ui
        self.repo = repo
        self.ui = ui
        self.cacheprocess = ui.config(b"remotefilelog", b"cacheprocess")
        if self.cacheprocess:
            self.cacheprocess = util.expandpath(self.cacheprocess)

        # This option causes remotefilelog to pass the full file path to the
        # cacheprocess instead of a hashed key.
        self.cacheprocesspasspath = ui.configbool(
            b"remotefilelog", b"cacheprocess.includepath"
        )

        self.debugoutput = ui.configbool(b"remotefilelog", b"debug")

        self.remotecache = cacheconnection()

    def setstore(self, datastore, historystore, writedata, writehistory):
        """Remember the stores used for lookups and for writing results."""
        self.datastore = datastore
        self.historystore = historystore
        self.writedata = writedata
        self.writehistory = writehistory

    def _connect(self):
        """Get a connection to the fallback server from the repo's pool."""
        return self.repo.connectionpool.get(self.repo.fallbackpath)

    def request(self, fileids):
        """Fetch a list of (filename, hex node) pairs into the local cache.

        The cache process is asked first with a ``get`` request: one cache
        key per line, prefixed with ``<path>\\0`` when
        ``cacheprocess.includepath`` is set.  Its reply is read until a
        ``0`` line; ``_hits_<n>_`` lines are progress reports and every
        other line is a key it does not have.  If the reply ends early, all
        keys not yet reported are treated as misses as well.  Misses are
        downloaded from the fallback server, written through
        receivemissing(), and finally announced to the cache process with a
        ``set`` request.

        Nothing is returned.  If the connection fails or the server lacks
        the needed capabilities, an exception is raised.
        """
        global fetchmisses

        if not self.remotecache.connected:
            self.connect()
        cache = self.remotecache
        writedata = self.writedata

        total = len(fileids)
        # Collect the pieces and join once instead of growing a bytes object.
        requestparts = [b"get\n%d\n" % total]
        idmap = {}
        reponame = self.repo.name
        for file, id in fileids:
            fullid = getcachekey(reponame, file, id)
            if self.cacheprocesspasspath:
                requestparts.append(file + b'\0')
            requestparts.append(fullid + b"\n")
            idmap[fullid] = file

        cache.request(b"".join(requestparts))

        progress = self.ui.makeprogress(_(b'downloading'), total=total)
        progress.update(0)

        missed = []
        while True:
            missingid = cache.receiveline()
            if not missingid:
                # The cache went away before sending its "0" terminator:
                # fetch everything it did not explicitly report as a miss.
                missedset = set(missed)
                missed.extend(key for key in idmap if key not in missedset)
                self.ui.warn(
                    _(
                        b"warning: cache connection closed early - "
                        + b"falling back to server\n"
                    )
                )
                break
            if missingid == b"0":
                break
            if missingid.startswith(b"_hits_"):
                # receive progress reports
                parts = missingid.split(b"_")
                progress.increment(int(parts[2]))
                continue

            missed.append(missingid)

        fetchmisses += len(missed)

        fromcache = total - len(missed)
        progress.update(fromcache, total=total)
        self.ui.log(
            b"remotefilelog",
            b"remote cache hit rate is %r of %r\n",
            fromcache,
            total,
            hit=fromcache,
            total=total,
        )

        # The umask is changed for the duration of the download and
        # restored afterwards, whatever happens.
        oldumask = os.umask(0o002)
        try:
            # receive cache misses from master
            if missed:
                # When verbose is true, sshpeer prints 'running ssh...'
                # to stdout, which can interfere with some command
                # outputs
                verbose = self.ui.verbose
                self.ui.verbose = False
                try:
                    with self._connect() as conn:
                        remote = conn.peer
                        if remote.capable(
                            constants.NETWORK_CAP_LEGACY_SSH_GETFILES
                        ):
                            if not isinstance(remote, _sshv1peer):
                                raise error.Abort(
                                    b'remotefilelog requires ssh servers'
                                )
                            step = self.ui.configint(
                                b'remotefilelog', b'getfilesstep'
                            )
                            getfilestype = self.ui.config(
                                b'remotefilelog', b'getfilestype'
                            )
                            if getfilestype == b'threaded':
                                _getfiles = _getfiles_threaded
                            else:
                                _getfiles = _getfiles_optimistic
                            _getfiles(
                                remote,
                                self.receivemissing,
                                progress.increment,
                                missed,
                                idmap,
                                step,
                            )
                        elif remote.capable(b"x_rfl_getfile"):
                            if remote.capable(b'batch'):
                                batchdefault = 100
                            else:
                                batchdefault = 10
                            batchsize = self.ui.configint(
                                b'remotefilelog', b'batchsize', batchdefault
                            )
                            self.ui.debug(
                                b'requesting %d files from '
                                b'remotefilelog server...\n' % len(missed)
                            )
                            _getfilesbatch(
                                remote,
                                self.receivemissing,
                                progress.increment,
                                missed,
                                idmap,
                                batchsize,
                            )
                        else:
                            raise error.Abort(
                                b"configured remotefilelog server"
                                b" does not support remotefilelog"
                            )

                    self.ui.log(
                        b"remotefilefetchlog",
                        b"Success\n",
                        fetched_files=progress.pos - fromcache,
                        total_to_fetch=total - fromcache,
                    )
                except Exception:
                    self.ui.log(
                        b"remotefilefetchlog",
                        b"Fail\n",
                        fetched_files=progress.pos - fromcache,
                        total_to_fetch=total - fromcache,
                    )
                    raise
                finally:
                    self.ui.verbose = verbose
                # send to memcache
                request = b"set\n%d\n%s\n" % (len(missed), b"\n".join(missed))
                cache.request(request)

            progress.complete()

            # mark ourselves as a user of this cache
            writedata.markrepo(self.repo.path)
        finally:
            os.umask(oldumask)

    def receivemissing(self, pipe, filename, node):
        """Read one file reply from ``pipe`` and store it.

        The reply is a decimal size line followed by that many bytes of
        zlib-compressed data.  The decompressed data is written with
        ``writedata.addremotefilelognode`` under ``filename`` and the
        binary form of ``node``.

        Raises error.ResponseError if the size line is empty or fewer
        bytes than announced could be read.
        """
        line = pipe.readline()[:-1]
        if not line:
            raise error.ResponseError(
                _(b"error downloading file contents:"),
                _(b"connection closed early"),
            )
        size = int(line)
        data = pipe.read(size)
        if len(data) != size:
            # %d, not %s: bytes formatting rejects ints for %s, which would
            # replace this error with a TypeError.
            raise error.ResponseError(
                _(b"error downloading file contents:"),
                _(b"only received %d of %d bytes") % (len(data), size),
            )

        self.writedata.addremotefilelognode(
            filename, bin(node), zlib.decompress(data)
        )

    def connect(self):
        """Connect ``self.remotecache`` to the configured cache process.

        The process is started as ``<cacheprocess> <writedata._path>``.
        Without a configured cache process an in-memory stand-in is
        installed that reports every requested key as a miss.
        """
        if self.cacheprocess:
            cmd = b"%s %s" % (self.cacheprocess, self.writedata._path)
            self.remotecache.connect(cmd)
        else:
            # If no cache process is specified, we fake one that always
            # returns cache misses. This enables tests to run easily
            # and may eventually allow us to be a drop in replacement
            # for the largefiles extension.
            class simplecache:
                def __init__(self):
                    self.missingids = []
                    self.connected = True

                def close(self):
                    pass

                def request(self, value, flush=True):
                    lines = value.split(b"\n")
                    if lines[0] != b"get":
                        return
                    # Drop the "get" and count lines and the empty string
                    # after the final newline; b'0' is the end marker.
                    self.missingids = lines[2:-1]
                    self.missingids.append(b'0')

                def receiveline(self):
                    if len(self.missingids) > 0:
                        return self.missingids.pop(0)
                    return None

            self.remotecache = simplecache()

    def close(self):
        """Report fetch statistics, if any, and close the cache connection.

        The summary is logged under ``remotefilelog.prefetch`` and is also
        printed as a warning when ``remotefilelog.debug`` is enabled.
        """
        if fetches:
            msg = (
                b"%d files fetched over %d fetches - "
                + b"(%d misses, %0.2f%% hit ratio) over %0.2fs\n"
            ) % (
                fetched,
                fetches,
                fetchmisses,
                float(fetched - fetchmisses) / float(fetched) * 100.0,
                fetchcost,
            )
            if self.debugoutput:
                self.ui.warn(msg)
            self.ui.log(
                b"remotefilelog.prefetch",
                # the message is used as a format string by ui.log
                msg.replace(b"%", b"%%"),
                remotefilelogfetched=fetched,
                remotefilelogfetches=fetches,
                remotefilelogfetchmisses=fetchmisses,
                remotefilelogfetchtime=fetchcost * 1000,
            )

        if self.remotecache.connected:
            self.remotecache.close()

    def prefetch(
        self, fileids, force=False, fetchdata=True, fetchhistory=False
    ):
        """downloads the given file versions to the cache

        ``fileids`` is an iterable of (path, hex node) pairs.  Only
        revisions missing from the data store (``fetchdata``) and/or the
        history store (``fetchhistory``) are requested.  With ``force`` the
        check is made against the repo's shared stores only.

        Raises error.Abort if request() reports files it could not
        download.
        """
        global fetches, fetched, fetchcost

        repo = self.repo
        idstocheck = []
        for file, id in fileids:
            # hack
            # - we don't use .hgtags
            # - workingctx produces ids with length 42,
            #   which we skip since they aren't in any cache
            if (
                file == b'.hgtags'
                or len(id) == 42
                or not repo.shallowmatch(file)
            ):
                continue

            idstocheck.append((file, bin(id)))

        datastore = self.datastore
        historystore = self.historystore
        if force:
            datastore = contentstore.unioncontentstore(*repo.shareddatastores)
            historystore = metadatastore.unionmetadatastore(
                *repo.sharedhistorystores
            )

        missingids = set()
        if fetchdata:
            missingids.update(datastore.getmissing(idstocheck))
        if fetchhistory:
            missingids.update(historystore.getmissing(idstocheck))

        # partition missing nodes into nullid and not-nullid so we can
        # warn about this filtering potentially shadowing bugs.
        nullid = self.repo.nullid
        nullids = sum(1 for unused, id in missingids if id == nullid)
        if nullids:
            missingids = [(f, id) for f, id in missingids if id != nullid]
            repo.ui.develwarn(
                (
                    b'remotefilelog not fetching %d null revs'
                    b' - this is likely hiding bugs' % nullids
                ),
                config=b'remotefilelog-ext',
            )
        if missingids:
            fetches += 1

            # We want to be able to detect excess individual file downloads, so
            # let's log that information for debugging.
            if 15 <= fetches < 18:
                if fetches == 15:
                    fetchwarning = self.ui.config(
                        b'remotefilelog', b'fetchwarning'
                    )
                    if fetchwarning:
                        self.ui.warn(fetchwarning + b'\n')
                self.logstacktrace()
            missingids = [(file, hex(id)) for file, id in sorted(missingids)]
            fetched += len(missingids)
            start = time.time()
            missingids = self.request(missingids)
            if missingids:
                raise error.Abort(
                    _(b"unable to download %d files") % len(missingids)
                )
            fetchcost += time.time() - start
            self._lfsprefetch(fileids)

    def _lfsprefetch(self, fileids):
        """Download the LFS blobs referenced by ``fileids``, if LFS is on.

        Does nothing unless the LFS module is loaded, the repo's store vfs
        has a local blob store, and the LFS wrapper allows downloading.
        """
        if not _lfsmod or not hasattr(self.repo.svfs, 'lfslocalblobstore'):
            return
        if not _lfsmod.wrapper.candownload(self.repo):
            return
        pointers = []
        store = self.repo.svfs.lfslocalblobstore
        for file, id in fileids:
            node = bin(id)
            rlog = self.repo.file(file)
            # Only revisions flagged as externally stored carry a pointer.
            if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
                text = rlog.rawdata(node)
                p = _lfsmod.pointer.deserialize(text)
                oid = p.oid()
                if not store.has(oid):
                    pointers.append(p)
        if pointers:
            self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
            assert all(store.has(p.oid()) for p in pointers)

    def logstacktrace(self):
        """Log the current call stack under the ``remotefilelog`` event."""
        import traceback

        self.ui.log(
            b'remotefilelog',
            b'excess remotefilelog fetching:\n%s\n',
            b''.join(pycompat.sysbytes(s) for s in traceback.format_stack()),
        )