##// END OF EJS Templates
rust-pathauditor: use interior mutability for use in multi-threaded contexts...
rust-pathauditor: use interior mutability for use in multi-threaded contexts The usual recommendation for using `RwLock` or `Mutex` is that if there are about as many write as there are reads, use `Mutex`, and if there are more reads than writes, use `RwLock`. If after the main bottleneck (i.e. parallel traversal) is removed this shows up on profiles, we should investigate using the `parking_lot` since we don't need a poisoning API, or maybe move to different types of caches entirely. Differential Revision: https://phab.mercurial-scm.org/D8213

File last commit:

r44514:006e7821 default
r45022:07d9fd60 default
Show More
fileserverclient.py
669 lines | 22.3 KiB | text/x-python | PythonLexer
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530 # fileserverclient.py - client for communicating with the cache process
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
import io
import os
import threading
import time
Augie Fackler
remotefilelog: rip out lz4 support...
r40542 import zlib
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530
from mercurial.i18n import _
from mercurial.node import bin, hex, nullid
from mercurial import (
error,
Pulkit Goyal
py3: use node.hex(h.digest()) instead of h.hexdigest()...
r40648 node,
Pulkit Goyal
py3: fix keyword arguments handling in hgext/remotefilelog/...
r40646 pycompat,
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530 revlog,
sshpeer,
util,
wireprotov1peer,
)
Augie Fackler
hgext: replace references to hashlib.sha1 with hashutil.sha1...
r44519 from mercurial.utils import (
hashutil,
procutil,
)
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530
from . import (
constants,
contentstore,
metadatastore,
)
# Alias of the ssh v1 peer class; fileserverclient.request() checks the remote
# peer against it before using the legacy ssh getfiles protocol.
_sshv1peer = sshpeer.sshv1peer
# Statistics for debugging
# Wall-clock seconds spent in fileserverclient.request(), accumulated by
# fileserverclient.prefetch().
fetchcost = 0
# Number of prefetch() calls that had at least one missing file to fetch.
fetches = 0
# Total number of file revisions requested across those fetches.
fetched = 0
# Number of requested revisions the cache reported as missing (all of them
# when the cache connection closed early).
fetchmisses = 0
# Handle on the lfs module; fileserverclient._lfsprefetch() does nothing while
# this is None.
# NOTE(review): presumably assigned from outside this module when lfs is
# enabled - confirm.
_lfsmod = None
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
def getcachekey(reponame, file, id):
    """Build the shared-cache key for revision ``id`` of ``file``.

    The key has the shape ``reponame/<xx>/<rest>/id`` where ``xx`` is the
    first two characters of the hex digest of ``hashutil.sha1(file)`` and
    ``rest`` is the remainder of that digest.
    """
    digest = hashutil.sha1(file).digest()
    hexdigest = node.hex(digest)
    prefix, remainder = hexdigest[:2], hexdigest[2:]
    return os.path.join(reponame, prefix, remainder, id)
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
def getlocalkey(file, id):
    """Build the local-store key for revision ``id`` of ``file``.

    The key is ``<hexdigest>/id`` where ``hexdigest`` is the full hex digest
    of ``hashutil.sha1(file)``.
    """
    digest = hashutil.sha1(file).digest()
    hexdigest = node.hex(digest)
    return os.path.join(hexdigest, id)
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
def peersetup(ui, peer):
    """Replace ``peer``'s class with a remotefilelog-aware subclass.

    The subclass adds the batchable ``x_rfl_getfile`` and
    ``x_rfl_getflogheads`` commands and injects shallow-repo bundle
    capabilities into outgoing ``getbundle`` requests. ``ui`` is accepted
    but not used here.
    """

    class remotefilepeer(peer.__class__):
        @wireprotov1peer.batchable
        def x_rfl_getfile(self, file, node):
            """Batchable request for one file revision.

            Yields the wire arguments together with a future, then yields
            the payload once the future has a value. The reply is
            ``<code>\\0<data>``; a non-zero code raises LookupError.
            """
            if not self.capable(b'x_rfl_getfile'):
                raise error.Abort(
                    b'configured remotefile server does not support getfile'
                )
            reply = wireprotov1peer.future()
            yield {b'file': file, b'node': node}, reply
            status, payload = reply.value.split(b'\0', 1)
            if int(status):
                raise error.LookupError(file, node, payload)
            yield payload

        @wireprotov1peer.batchable
        def x_rfl_getflogheads(self, path):
            """Batchable request for the filelog heads of ``path``.

            The reply is a newline-separated list; an empty reply yields
            an empty list.
            """
            if not self.capable(b'x_rfl_getflogheads'):
                raise error.Abort(
                    b'configured remotefile server does not '
                    b'support getflogheads'
                )
            reply = wireprotov1peer.future()
            yield {b'path': path}, reply
            if reply.value:
                heads = reply.value.split(b'\n')
            else:
                heads = []
            yield heads

        def _updatecallstreamopts(self, command, opts):
            """Add remotefilelog bundlecaps to ``opts`` for ``getbundle``.

            ``opts`` is modified in place. Nothing happens unless the
            command is ``getbundle``, the peer advertises the legacy ssh
            getfiles capability, a ``_localrepo`` is attached to the peer
            and that repo carries the shallow-repo requirement.
            """
            if command != b'getbundle':
                return
            legacycap = constants.NETWORK_CAP_LEGACY_SSH_GETFILES
            if legacycap not in self.capabilities():
                return
            if not util.safehasattr(self, '_localrepo'):
                return
            localrepo = self._localrepo
            shallowreq = constants.SHALLOWREPO_REQUIREMENT
            if shallowreq not in localrepo.requirements:
                return

            existing = opts.get(b'bundlecaps')
            caps = [existing] if existing else []

            # shallow, includepattern, and excludepattern are a hacky way of
            # carrying over data from the local repo to this getbundle
            # command. We need to do it this way because bundle1 getbundle
            # doesn't provide any other place we can hook in to manipulate
            # getbundle args before it goes across the wire. Once we get rid
            # of bundle1, we can use bundle2's _pullbundle2extraprepare to
            # do this more cleanly.
            caps.append(constants.BUNDLE2_CAPABLITY)
            patterncaps = (
                (b"includepattern=", localrepo.includepattern),
                (b"excludepattern=", localrepo.excludepattern),
            )
            for label, patterns in patterncaps:
                if patterns:
                    caps.append(label + b'\0'.join(patterns))
            opts[b'bundlecaps'] = b','.join(caps)

        def _sendrequest(self, command, args, **opts):
            """Amend ``args`` for getbundle, then defer to the parent."""
            self._updatecallstreamopts(command, args)
            parent = super(remotefilepeer, self)
            return parent._sendrequest(command, args, **opts)

        def _callstream(self, command, **opts):
            """Amend the options only when the parent has no _sendrequest.

            When the parent class provides ``_sendrequest`` the amendment
            is done there instead.
            """
            parent = super(remotefilepeer, self)
            if not util.safehasattr(parent, '_sendrequest'):
                # NOTE(review): the update is applied to whatever
                # byteskwargs() returns; if that is a copy, the added
                # bundlecaps never reach ``opts`` below - confirm.
                self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
            return parent._callstream(command, **opts)

    peer.__class__ = remotefilepeer
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
class cacheconnection(object):
    """The connection for communicating with the remote cache. Performs
    gets and sets by communicating with an external process that has the
    cache-specific implementation.

    Requests are written to ``pipei`` and replies are read line by line
    from ``pipeo``. Any I/O error closes the connection; callers detect
    that through ``connected`` or a falsy ``receiveline()`` result.
    """

    def __init__(self):
        self.pipeo = self.pipei = self.pipee = None
        self.subprocess = None
        self.connected = False

    def connect(self, cachecommand):
        """Start ``cachecommand`` and attach to its pipes.

        Raises error.Abort if a connection is already open.
        """
        if self.pipeo:
            raise error.Abort(_(b"cache connection already open"))
        self.pipei, self.pipeo, self.pipee, self.subprocess = procutil.popen4(
            cachecommand
        )
        self.connected = True

    def close(self):
        """Ask the cache process to exit and release every pipe.

        Shutdown is best effort: errors while writing the exit command,
        closing a pipe or reaping the process are deliberately ignored.
        """

        def tryclose(pipe):
            try:
                pipe.close()
            except Exception:
                pass

        if self.connected:
            try:
                self.pipei.write(b"exit\n")
            except Exception:
                pass
            tryclose(self.pipei)
            self.pipei = None
            tryclose(self.pipeo)
            self.pipeo = None
            tryclose(self.pipee)
            self.pipee = None
            try:
                # Wait for process to terminate, making sure to avoid deadlock.
                # See https://docs.python.org/2/library/subprocess.html for
                # warnings about wait() and deadlocking.
                self.subprocess.communicate()
            except Exception:
                pass
            self.subprocess = None
        self.connected = False

    def request(self, request, flush=True):
        """Write ``request`` to the cache process, flushing by default.

        Does nothing when not connected. An IOError closes the connection
        instead of propagating.
        """
        if self.connected:
            try:
                self.pipei.write(request)
                if flush:
                    self.pipei.flush()
            except IOError:
                self.close()

    def receiveline(self):
        """Read one reply line with its final byte (the newline) removed.

        Returns None when not connected or when reading fails with an
        IOError. An empty line is returned as-is after the connection has
        been closed; callers treat every falsy result as "connection
        closed".
        """
        if not self.connected:
            return None
        try:
            result = self.pipeo.readline()[:-1]
        except IOError:
            # ``result`` was never bound here; report the failure the same
            # way as a closed connection instead of hitting
            # UnboundLocalError on return.
            self.close()
            return None
        if not result:
            self.close()
        return result
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530 def _getfilesbatch(
Augie Fackler
formatting: blacken the codebase...
r43346 remote, receivemissing, progresstick, missed, idmap, batchsize
):
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530 # Over http(s), iterbatch is a streamy method and we can start
# looking at results early. This means we send one (potentially
# large) request, but then we show nice progress as we process
# file results, rather than showing chunks of $batchsize in
# progress.
#
# Over ssh, iterbatch isn't streamy because batch() wasn't
# explicitly designed as a streaming method. In the future we
# should probably introduce a streambatch() method upstream and
# use that for this.
with remote.commandexecutor() as e:
futures = []
for m in missed:
Augie Fackler
formatting: blacken the codebase...
r43346 futures.append(
e.callcommand(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 b'x_rfl_getfile', {b'file': idmap[m], b'node': m[-40:]}
Augie Fackler
formatting: blacken the codebase...
r43346 )
)
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530
for i, m in enumerate(missed):
r = futures[i].result()
futures[i] = None # release memory
file_ = idmap[m]
node = m[-40:]
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 receivemissing(io.BytesIO(b'%d\n%s' % (len(r), r)), file_, node)
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530 progresstick()
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530 def _getfiles_optimistic(
Augie Fackler
formatting: blacken the codebase...
r43346 remote, receivemissing, progresstick, missed, idmap, step
):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 remote._callstream(b"x_rfl_getfiles")
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530 i = 0
pipeo = remote._pipeo
pipei = remote._pipei
while i < len(missed):
# issue a batch of requests
start = i
end = min(len(missed), start + step)
i = end
for missingid in missed[start:end]:
# issue new request
versionid = missingid[-40:]
file = idmap[missingid]
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 sshrequest = b"%s%s\n" % (versionid, file)
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530 pipeo.write(sshrequest)
pipeo.flush()
# receive batch results
for missingid in missed[start:end]:
versionid = missingid[-40:]
file = idmap[missingid]
receivemissing(pipei, file, versionid)
progresstick()
# End the command
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 pipeo.write(b'\n')
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530 pipeo.flush()
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530 def _getfiles_threaded(
Augie Fackler
formatting: blacken the codebase...
r43346 remote, receivemissing, progresstick, missed, idmap, step
):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 remote._callstream(b"getfiles")
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530 pipeo = remote._pipeo
pipei = remote._pipei
def writer():
for missingid in missed:
versionid = missingid[-40:]
file = idmap[missingid]
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 sshrequest = b"%s%s\n" % (versionid, file)
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530 pipeo.write(sshrequest)
pipeo.flush()
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530 writerthread = threading.Thread(target=writer)
writerthread.daemon = True
writerthread.start()
for missingid in missed:
versionid = missingid[-40:]
file = idmap[missingid]
receivemissing(pipei, file, versionid)
progresstick()
writerthread.join()
# End the command
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 pipeo.write(b'\n')
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
r40530 pipeo.flush()
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
remotefilelog: import pruned-down remotefilelog extension from hg-experimental...
class fileserverclient(object):
    """A client for requesting files from the remote file server.

    File revisions are first looked up through a cache connection (an
    external cache process, or an in-memory stand-in that misses on
    everything) and whatever the cache lacks is fetched from the server.
    """

    def __init__(self, repo):
        ui = repo.ui
        self.repo = repo
        self.ui = ui
        self.cacheprocess = ui.config(b"remotefilelog", b"cacheprocess")
        if self.cacheprocess:
            self.cacheprocess = util.expandpath(self.cacheprocess)

        # This option causes remotefilelog to pass the full file path to the
        # cacheprocess instead of a hashed key.
        self.cacheprocesspasspath = ui.configbool(
            b"remotefilelog", b"cacheprocess.includepath"
        )

        self.debugoutput = ui.configbool(b"remotefilelog", b"debug")

        self.remotecache = cacheconnection()

    def setstore(self, datastore, historystore, writedata, writehistory):
        """Attach the stores used to look up and to write fetched data."""
        self.datastore = datastore
        self.historystore = historystore
        self.writedata = writedata
        self.writehistory = writehistory

    def _connect(self):
        """Get a connection to the fallback path from the repo's pool."""
        return self.repo.connectionpool.get(self.repo.fallbackpath)

    def request(self, fileids):
        """Takes a list of filename/node pairs and fetches them from the
        server. Files are stored in the local cache.

        Nothing is returned. If the connection fails, or the server
        supports none of the known fetch methods, an exception is raised.
        """
        global fetchmisses

        if not self.remotecache.connected:
            self.connect()
        cache = self.remotecache
        writedata = self.writedata

        repo = self.repo
        total = len(fileids)
        reponame = repo.name
        idmap = {}
        # Collect the pieces and join once instead of growing a bytes
        # object with += for every file.
        requestparts = [b"get\n%d\n" % total]
        for file, id in fileids:
            fullid = getcachekey(reponame, file, id)
            if self.cacheprocesspasspath:
                requestparts.append(file + b'\0')
            requestparts.append(fullid + b"\n")
            idmap[fullid] = file

        cache.request(b"".join(requestparts))

        progress = self.ui.makeprogress(_(b'downloading'), total=total)
        progress.update(0)

        missed = []
        while True:
            missingid = cache.receiveline()
            if not missingid:
                # The cache went away: treat every key it has not already
                # reported as missing.
                missedset = set(missed)
                for key in idmap:
                    if key not in missedset:
                        missed.append(key)
                self.ui.warn(
                    _(
                        b"warning: cache connection closed early - "
                        + b"falling back to server\n"
                    )
                )
                break
            if missingid == b"0":
                # end-of-reply marker
                break
            if missingid.startswith(b"_hits_"):
                # receive progress reports
                parts = missingid.split(b"_")
                progress.increment(int(parts[2]))
                continue

            missed.append(missingid)

        fetchmisses += len(missed)

        fromcache = total - len(missed)
        progress.update(fromcache, total=total)
        self.ui.log(
            b"remotefilelog",
            b"remote cache hit rate is %r of %r\n",
            fromcache,
            total,
            hit=fromcache,
            total=total,
        )

        oldumask = os.umask(0o002)
        try:
            # receive cache misses from master
            if missed:
                # When verbose is true, sshpeer prints 'running ssh...'
                # to stdout, which can interfere with some command
                # outputs
                verbose = self.ui.verbose
                self.ui.verbose = False
                try:
                    with self._connect() as conn:
                        remote = conn.peer
                        if remote.capable(
                            constants.NETWORK_CAP_LEGACY_SSH_GETFILES
                        ):
                            if not isinstance(remote, _sshv1peer):
                                raise error.Abort(
                                    b'remotefilelog requires ssh servers'
                                )
                            step = self.ui.configint(
                                b'remotefilelog', b'getfilesstep'
                            )
                            getfilestype = self.ui.config(
                                b'remotefilelog', b'getfilestype'
                            )
                            if getfilestype == b'threaded':
                                _getfiles = _getfiles_threaded
                            else:
                                _getfiles = _getfiles_optimistic
                            _getfiles(
                                remote,
                                self.receivemissing,
                                progress.increment,
                                missed,
                                idmap,
                                step,
                            )
                        elif remote.capable(b"x_rfl_getfile"):
                            if remote.capable(b'batch'):
                                batchdefault = 100
                            else:
                                batchdefault = 10
                            batchsize = self.ui.configint(
                                b'remotefilelog', b'batchsize', batchdefault
                            )
                            self.ui.debug(
                                b'requesting %d files from '
                                b'remotefilelog server...\n' % len(missed)
                            )
                            _getfilesbatch(
                                remote,
                                self.receivemissing,
                                progress.increment,
                                missed,
                                idmap,
                                batchsize,
                            )
                        else:
                            raise error.Abort(
                                b"configured remotefilelog server"
                                b" does not support remotefilelog"
                            )

                    self.ui.log(
                        b"remotefilefetchlog",
                        b"Success\n",
                        fetched_files=progress.pos - fromcache,
                        total_to_fetch=total - fromcache,
                    )
                except Exception:
                    self.ui.log(
                        b"remotefilefetchlog",
                        b"Fail\n",
                        fetched_files=progress.pos - fromcache,
                        total_to_fetch=total - fromcache,
                    )
                    raise
                finally:
                    self.ui.verbose = verbose
                # send to memcache
                request = b"set\n%d\n%s\n" % (len(missed), b"\n".join(missed))
                cache.request(request)

            progress.complete()

            # mark ourselves as a user of this cache
            writedata.markrepo(self.repo.path)
        finally:
            os.umask(oldumask)

    def receivemissing(self, pipe, filename, node):
        """Read one ``<size>\\n<data>`` reply from ``pipe`` and store it.

        ``data`` is zlib-decompressed and written through
        ``self.writedata`` for ``filename`` at the binary form of ``node``.

        Raises error.ResponseError when the size line is empty or fewer
        than ``size`` bytes could be read.
        """
        line = pipe.readline()[:-1]
        if not line:
            raise error.ResponseError(
                _(b"error downloading file contents:"),
                _(b"connection closed early"),
            )
        size = int(line)
        data = pipe.read(size)
        if len(data) != size:
            # bytes %s rejects ints on Python 3, so render the numbers to
            # bytes first; the translatable message id stays unchanged.
            raise error.ResponseError(
                _(b"error downloading file contents:"),
                _(b"only received %s of %s bytes")
                % (b'%d' % len(data), b'%d' % size),
            )

        self.writedata.addremotefilelognode(
            filename, bin(node), zlib.decompress(data)
        )

    def connect(self):
        """Open the cache connection.

        With a configured cache process, the process is started with the
        write store's path as its argument. Otherwise ``self.remotecache``
        is replaced by an in-memory object that reports every requested
        key as missing.
        """
        if self.cacheprocess:
            cmd = b"%s %s" % (self.cacheprocess, self.writedata._path)
            self.remotecache.connect(cmd)
        else:
            # If no cache process is specified, we fake one that always
            # returns cache misses. This enables tests to run easily
            # and may eventually allow us to be a drop in replacement
            # for the largefiles extension.
            class simplecache(object):
                def __init__(self):
                    self.missingids = []
                    self.connected = True

                def close(self):
                    pass

                def request(self, value, flush=True):
                    lines = value.split(b"\n")
                    if lines[0] != b"get":
                        return
                    # lines[1] is the count and the last element is the
                    # empty string after the trailing newline.
                    self.missingids = lines[2:-1]
                    self.missingids.append(b'0')

                def receiveline(self):
                    if len(self.missingids) > 0:
                        return self.missingids.pop(0)
                    return None

            self.remotecache = simplecache()

    def close(self):
        """Report fetch statistics, if any, and close the cache connection."""
        if fetches:
            msg = (
                b"%d files fetched over %d fetches - "
                + b"(%d misses, %0.2f%% hit ratio) over %0.2fs\n"
            ) % (
                fetched,
                fetches,
                fetchmisses,
                float(fetched - fetchmisses) / float(fetched) * 100.0,
                fetchcost,
            )
            if self.debugoutput:
                self.ui.warn(msg)
            self.ui.log(
                b"remotefilelog.prefetch",
                msg.replace(b"%", b"%%"),
                remotefilelogfetched=fetched,
                remotefilelogfetches=fetches,
                remotefilelogfetchmisses=fetchmisses,
                remotefilelogfetchtime=fetchcost * 1000,
            )

        if self.remotecache.connected:
            self.remotecache.close()

    def prefetch(
        self, fileids, force=False, fetchdata=True, fetchhistory=False
    ):
        """downloads the given file versions to the cache

        ``fileids`` is an iterable of ``(path, hexnode)`` pairs. With
        ``force`` the lookup for what is missing uses the repo's shared
        stores instead of the stores given to ``setstore()``.
        ``fetchdata`` and ``fetchhistory`` select which store(s) are asked
        for missing entries.
        """
        global fetches, fetched, fetchcost

        repo = self.repo
        idstocheck = []
        for file, id in fileids:
            # hack
            # - we don't use .hgtags
            # - workingctx produces ids with length 42,
            #   which we skip since they aren't in any cache
            if (
                file == b'.hgtags'
                or len(id) == 42
                or not repo.shallowmatch(file)
            ):
                continue

            idstocheck.append((file, bin(id)))

        datastore = self.datastore
        historystore = self.historystore
        if force:
            datastore = contentstore.unioncontentstore(*repo.shareddatastores)
            historystore = metadatastore.unionmetadatastore(
                *repo.sharedhistorystores
            )

        missingids = set()
        if fetchdata:
            missingids.update(datastore.getmissing(idstocheck))
        if fetchhistory:
            missingids.update(historystore.getmissing(idstocheck))

        # partition missing nodes into nullid and not-nullid so we can
        # warn about this filtering potentially shadowing bugs.
        nullids = sum(1 for unused, id in missingids if id == nullid)
        if nullids:
            missingids = [(f, id) for f, id in missingids if id != nullid]
            repo.ui.develwarn(
                (
                    b'remotefilelog not fetching %d null revs'
                    b' - this is likely hiding bugs' % nullids
                ),
                config=b'remotefilelog-ext',
            )
        if missingids:
            fetches += 1
            # We want to be able to detect excess individual file downloads, so
            # let's log that information for debugging.
            if 15 <= fetches < 18:
                if fetches == 15:
                    fetchwarning = self.ui.config(
                        b'remotefilelog', b'fetchwarning'
                    )
                    if fetchwarning:
                        self.ui.warn(fetchwarning + b'\n')
                self.logstacktrace()
            missingids = [(file, hex(id)) for file, id in sorted(missingids)]
            fetched += len(missingids)
            start = time.time()
            missingids = self.request(missingids)
            if missingids:
                raise error.Abort(
                    _(b"unable to download %d files") % len(missingids)
                )
            fetchcost += time.time() - start
        self._lfsprefetch(fileids)

    def _lfsprefetch(self, fileids):
        """Download the lfs blobs referenced by ``fileids``, if lfs is on.

        Does nothing when no lfs module is registered, when the store vfs
        has no local blob store, or when lfs reports that the repo cannot
        download.
        """
        # The attribute name is a native str, matching the other
        # safehasattr() calls in this file; getattr() rejects a bytes
        # name on Python 3.
        if not _lfsmod or not util.safehasattr(
            self.repo.svfs, 'lfslocalblobstore'
        ):
            return
        if not _lfsmod.wrapper.candownload(self.repo):
            return
        pointers = []
        store = self.repo.svfs.lfslocalblobstore
        for file, id in fileids:
            node = bin(id)
            rlog = self.repo.file(file)
            if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
                text = rlog.rawdata(node)
                p = _lfsmod.pointer.deserialize(text)
                oid = p.oid()
                if not store.has(oid):
                    pointers.append(p)
        if pointers:
            self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
            assert all(store.has(p.oid()) for p in pointers)

    def logstacktrace(self):
        """Log the current call stack under the ``remotefilelog`` event."""
        import traceback

        self.ui.log(
            b'remotefilelog',
            b'excess remotefilelog fetching:\n%s\n',
            b''.join(pycompat.sysbytes(s) for s in traceback.format_stack()),
        )