protocol.py
259 lines
| 7.9 KiB
| text/x-python
|
PythonLexer
Augie Fackler
|
r39243 | # Copyright 2016-present Facebook. All Rights Reserved. | ||
# | ||||
# protocol: logic for a server providing fastannotate support | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
from __future__ import absolute_import | ||||
import contextlib | ||||
import os | ||||
from mercurial.i18n import _ | ||||
Gregory Szorc
|
r43355 | from mercurial.pycompat import open | ||
Augie Fackler
|
r39243 | from mercurial import ( | ||
error, | ||||
extensions, | ||||
hg, | ||||
Matt Harbison
|
r39287 | util, | ||
Augie Fackler
|
r39243 | wireprotov1peer, | ||
wireprotov1server, | ||||
) | ||||
from . import context | ||||
# common | ||||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r39243 | def _getmaster(ui): | ||
"""get the mainbranch, and enforce it is set""" | ||||
Augie Fackler
|
r43347 | master = ui.config(b'fastannotate', b'mainbranch') | ||
Augie Fackler
|
r39243 | if not master: | ||
Augie Fackler
|
r43346 | raise error.Abort( | ||
_( | ||||
Augie Fackler
|
r43347 | b'fastannotate.mainbranch is required ' | ||
b'for both the client and the server' | ||||
Augie Fackler
|
r43346 | ) | ||
) | ||||
Augie Fackler
|
r39243 | return master | ||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r39243 | # server-side | ||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r39243 | def _capabilities(orig, repo, proto): | ||
result = orig(repo, proto) | ||||
Augie Fackler
|
r43347 | result.append(b'getannotate') | ||
Augie Fackler
|
r39243 | return result | ||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
def _getannotate(repo, proto, path, lastnode):
    """serve the annotate cache files of ``path`` as one bytes payload

    output:
      FILE := vfspath + '\\0' + str(size) + '\\0' + content
      OUTPUT := '' | FILE + OUTPUT

    ``lastnode`` is the node the caller already has; when it equals the
    annotate context's ``lastnode`` nothing is sent back. ``proto`` is not
    used by this function.
    """
    ondemand = repo.ui.configbool(
        b'fastannotate', b'serverbuildondemand', True
    )
    chunks = []
    with context.annotatecontext(repo, path) as actx:
        if ondemand:
            # bring the cache up to date before answering the client
            mainbranch = _getmaster(repo.ui)
            try:
                if not actx.isuptodate(mainbranch):
                    actx.annotate(mainbranch, mainbranch)
            except Exception:
                # non-fast-forward move or corrupted cache: rebuild
                # automatically and retry once.
                actx.rebuild()
                try:
                    actx.annotate(mainbranch, mainbranch)
                except Exception:
                    actx.rebuild()  # delete files
            finally:
                # the enclosing "with" would also close/flush, but it has
                # to happen now so that the files read below are the ones
                # the client should receive.
                actx.close()

        # The full content of revmap and linelog is sent back; something
        # rsync-like could be done in the future. The lastnode comparison
        # is not necessary if the client and the server agree on where the
        # main branch is.
        if actx.lastnode != lastnode:
            for cachepath in (actx.revmappath, actx.linelogpath):
                if os.path.exists(cachepath):
                    with open(cachepath, b'rb') as fp:
                        data = fp.read()
                    # strip the "<vfs base>/" prefix to get a vfs-relative
                    # path
                    prefixlen = len(repo.vfs.base + b'/')
                    relpath = cachepath[prefixlen:]
                    chunks.append(
                        b'%s\0%d\0%s' % (relpath, len(data), data)
                    )
    return b''.join(chunks)
Augie Fackler
|
r43346 | |||
Augie Fackler
|
def _registerwireprotocommand():
    """register ``getannotate`` as a wire protocol command, at most once

    Nothing happens when ``wireprotov1server.commands`` already contains
    the command.
    """
    if b'getannotate' not in wireprotov1server.commands:
        register = wireprotov1server.wireprotocommand(
            b'getannotate', b'path lastnode'
        )
        register(_getannotate)
Augie Fackler
|
r39243 | |||
def serveruisetup(ui):
    """server-side setup: register ``getannotate`` and advertise it

    Registers the wire protocol command and wraps
    ``wireprotov1server._capabilities`` with ``_capabilities`` so that the
    command is listed among the capabilities. ``ui`` is not used here.
    """
    _registerwireprotocommand()
    # The attribute name is a native string on purpose: Python 3 attribute
    # lookup requires ``str``, while on Python 2 a plain literal has the same
    # type as the bytes literal that was used before.
    extensions.wrapfunction(wireprotov1server, '_capabilities', _capabilities)
Augie Fackler
|
r39243 | |||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r39243 | # client-side | ||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r39243 | def _parseresponse(payload): | ||
result = {} | ||||
i = 0 | ||||
l = len(payload) - 1 | ||||
Augie Fackler
|
r43346 | state = 0 # 0: vfspath, 1: size | ||
Augie Fackler
|
r43347 | vfspath = size = b'' | ||
Augie Fackler
|
r39243 | while i < l: | ||
Augie Fackler
|
r43346 | ch = payload[i : i + 1] | ||
Augie Fackler
|
r43347 | if ch == b'\0': | ||
Augie Fackler
|
r39243 | if state == 1: | ||
Augie Fackler
|
r43346 | result[vfspath] = payload[i + 1 : i + 1 + int(size)] | ||
Augie Fackler
|
r39243 | i += int(size) | ||
state = 0 | ||||
Augie Fackler
|
r43347 | vfspath = size = b'' | ||
Augie Fackler
|
r39243 | elif state == 0: | ||
state = 1 | ||||
else: | ||||
if state == 1: | ||||
size += ch | ||||
elif state == 0: | ||||
vfspath += ch | ||||
i += 1 | ||||
return result | ||||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
def peersetup(ui, peer):
    """replace ``peer``'s class with a subclass offering ``getannotate``"""

    class fastannotatepeer(peer.__class__):
        @wireprotov1peer.batchable
        def getannotate(self, path, lastnode=None):
            # Generator: either yields (None, None) after warning that the
            # remote lacks the capability, or yields (args, future) followed
            # by the parsed content of the future's value.
            if self.capable(b'getannotate'):
                request = {b'path': path, b'lastnode': lastnode or b''}
                future = wireprotov1peer.future()
                yield request, future
                yield _parseresponse(future.value)
            else:
                ui.warn(_(b'remote peer cannot provide annotate cache\n'))
                yield None, None

    peer.__class__ = fastannotatepeer
Augie Fackler
|
r43346 | |||
Augie Fackler
|
@contextlib.contextmanager
def annotatepeer(repo):
    """context manager yielding a peer for the fastannotate remote

    The remote is taken from ``fastannotate.remotepath`` (``default`` when
    unset) and expanded with ``ui.expandpath``. The peer is closed when the
    context exits, whether or not the body raised.
    """
    ui = repo.ui
    configured = ui.config(b'fastannotate', b'remotepath', b'default')
    remote = hg.peer(ui, {}, ui.expandpath(configured))
    try:
        yield remote
    finally:
        remote.close()
Augie Fackler
|
r39243 | |||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
def clientfetch(repo, paths, lastnodemap=None, peer=None):
    """download annotate cache from the server for paths

    ``paths`` are the files to request; nothing is done when it is empty.
    ``lastnodemap`` optionally maps a path to the node already known
    locally, which is sent along with the request. ``peer`` is the
    connection to use; when it is None a peer is opened with
    ``annotatepeer`` for the duration of the call.

    Every file returned by the server is written below ``repo.vfs``; paths
    that are not under ``fastannotate/`` or that contain a ``..`` component
    are ignored.
    """
    if not paths:
        return

    if peer is None:
        with annotatepeer(repo) as peer:
            return clientfetch(repo, paths, lastnodemap, peer)

    if lastnodemap is None:
        lastnodemap = {}

    ui = repo.ui
    with peer.commandexecutor() as batcher:
        ui.debug(b'fastannotate: requesting %d files\n' % len(paths))
        results = [
            batcher.callcommand(
                b'getannotate',
                {b'path': p, b'lastnode': lastnodemap.get(p)},
            )
            for p in paths
        ]

        for result in results:
            r = result.result()
            if not r:
                # nothing to write for this path. NOTE(review): presumably
                # the result is None when the remote lacks the getannotate
                # capability - confirm against the command executor.
                continue
            # TODO: pconvert these paths on the server?
            # items() rather than iteritems(): the latter does not exist on
            # Python 3 dicts.
            r = {util.pconvert(p): v for p, v in r.items()}
            for path in sorted(r):
                # ignore malicious paths
                if not path.startswith(b'fastannotate/') or b'/../' in (
                    path + b'/'
                ):
                    ui.debug(
                        b'fastannotate: ignored malicious path %s\n' % path
                    )
                    continue
                content = r[path]
                if ui.debugflag:
                    ui.debug(
                        b'fastannotate: writing %d bytes to %s\n'
                        % (len(content), path)
                    )
                repo.vfs.makedirs(os.path.dirname(path))
                with repo.vfs(path, b'wb') as f:
                    f.write(content)
Augie Fackler
|
r39243 | |||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r39243 | def _filterfetchpaths(repo, paths): | ||
"""return a subset of paths whose history is long and need to fetch linelog | ||||
from the server. works with remotefilelog and non-remotefilelog repos. | ||||
""" | ||||
Augie Fackler
|
r43347 | threshold = repo.ui.configint(b'fastannotate', b'clientfetchthreshold', 10) | ||
Augie Fackler
|
r39243 | if threshold <= 0: | ||
return paths | ||||
result = [] | ||||
for path in paths: | ||||
try: | ||||
Augie Fackler
|
r39246 | if len(repo.file(path)) >= threshold: | ||
Augie Fackler
|
r39243 | result.append(path) | ||
Augie Fackler
|
r43346 | except Exception: # file not found etc. | ||
Augie Fackler
|
r39243 | result.append(path) | ||
return result | ||||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
def localreposetup(ui, repo):
    """give ``repo`` a ``prefetchfastannotate`` method by swapping its class"""

    class fastannotaterepo(repo.__class__):
        def prefetchfastannotate(self, paths, peer=None):
            """fetch the annotate cache of the out-of-date ``paths``

            Only paths selected by ``_filterfetchpaths`` whose annotate
            context is not up to date with the main branch are requested.
            Failures while checking or fetching are logged at debug level
            and otherwise ignored; a missing main branch setting is not.
            """
            mainbranch = _getmaster(self.ui)
            stale = []
            knownnodes = {}
            try:
                for candidate in _filterfetchpaths(self, paths):
                    with context.annotatecontext(self, candidate) as actx:
                        if actx.isuptodate(mainbranch, strict=False):
                            continue
                        stale.append(candidate)
                        knownnodes[candidate] = actx.lastnode
                if stale:
                    clientfetch(self, stale, knownnodes, peer)
            except Exception as ex:
                # could be directory not writable or so, not fatal
                self.ui.debug(b'fastannotate: prefetch failed: %r\n' % ex)

    repo.__class__ = fastannotaterepo
Augie Fackler
|
r43346 | |||
Augie Fackler
|
def clientreposetup(ui, repo):
    """client-side setup for ``repo``

    Registers the wire protocol command, installs the prefetch method on
    local repositories and makes sure ``peersetup`` is listed once in
    ``hg.wirepeersetupfuncs``.
    """
    _registerwireprotocommand()
    if repo.local():
        localreposetup(ui, repo)
    # TODO: this mutates global state, but only if at least one repo
    # has the extension enabled. This is probably bad for hgweb.
    setupfuncs = hg.wirepeersetupfuncs
    if peersetup not in setupfuncs:
        setupfuncs.append(peersetup)