##// END OF EJS Templates
wireprotov2peer: stream decoded responses...
wireprotov2peer: stream decoded responses Previously, wire protocol version 2 would buffer all response data. Only once all data was received did we CBOR decode it and resolve the future associated with the command. This was obviously not desirable. In future commits that introduce large response payloads, this caused significant memory bloat and slowed down client operations due to waiting on the server. This commit refactors the response handling code so that response data can be streamed. Command response objects now contain a buffered CBOR decoder. As new data arrives, it is fed into the decoder. Decoded objects are made available to the generator as they are decoded. Because there is a separate thread processing incoming frames and feeding data into the response object, there is the potential for race conditions when mutating response objects. So a lock has been added to guard access to critical state variables. Because the generator emitting decoded objects needs to wait on those objects to become available, we've added an Event for the generator to wait on so it doesn't busy loop. This does mean there is the potential for deadlocks. And I'm pretty sure they can occur in some scenarios. We already have a handful of TODOs around this. But I've added some more. Fixing this will likely require moving the background thread receiving frames into clienthandler. We likely would have done this anyway when implementing the client bits for the SSH transport. Test output changes because the initial CBOR map holding the overall response state is now always handled internally by the response object. Differential Revision: https://phab.mercurial-scm.org/D4474

File last commit:

r38001:bbdc1bc5 default
r39597:d06834e0 default
Show More
logexchange.py
152 lines | 4.6 KiB | text/x-python | PythonLexer
Pulkit Goyal
remotenames: rename related file and storage dir to logexchange...
r35348 # logexchange.py
#
# Copyright 2017 Augie Fackler <raf@durin42.com>
# Copyright 2017 Sean Farley <sean@farley.io>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
from .node import hex
from . import (
Pulkit Goyal
logexchange: introduce helper function to get remote path name...
r36076 util,
Pulkit Goyal
remotenames: rename related file and storage dir to logexchange...
r35348 vfs as vfsmod,
)
# directory name in .hg/ in which remotenames files will be present
remotenamedir = 'logexchange'
def readremotenamefile(repo, filename):
"""
reads a file from .hg/logexchange/ directory and yields it's content
filename: the file to be read
yield a tuple (node, remotepath, name)
"""
vfs = vfsmod.vfs(repo.vfs.join(remotenamedir))
if not vfs.exists(filename):
return
f = vfs(filename)
lineno = 0
for line in f:
line = line.strip()
if not line:
continue
# contains the version number
if lineno == 0:
lineno += 1
try:
node, remote, rname = line.split('\0')
yield node, remote, rname
except ValueError:
pass
f.close()
def readremotenames(repo):
"""
read the details about the remotenames stored in .hg/logexchange/ and
yields a tuple (node, remotepath, name). It does not yields information
about whether an entry yielded is branch or bookmark. To get that
information, call the respective functions.
"""
for bmentry in readremotenamefile(repo, 'bookmarks'):
yield bmentry
for branchentry in readremotenamefile(repo, 'branches'):
yield branchentry
def writeremotenamefile(repo, remotepath, names, nametype):
vfs = vfsmod.vfs(repo.vfs.join(remotenamedir))
f = vfs(nametype, 'w', atomictemp=True)
# write the storage version info on top of file
# version '0' represents the very initial version of the storage format
f.write('0\n\n')
olddata = set(readremotenamefile(repo, nametype))
# re-save the data from a different remote than this one.
for node, oldpath, rname in sorted(olddata):
if oldpath != remotepath:
f.write('%s\0%s\0%s\n' % (node, oldpath, rname))
for name, node in sorted(names.iteritems()):
if nametype == "branches":
for n in node:
f.write('%s\0%s\0%s\n' % (n, remotepath, name))
elif nametype == "bookmarks":
if node:
f.write('%s\0%s\0%s\n' % (node, remotepath, name))
f.close()
def saveremotenames(repo, remotepath, branches=None, bookmarks=None):
"""
save remotenames i.e. remotebookmarks and remotebranches in their
respective files under ".hg/logexchange/" directory.
"""
wlock = repo.wlock()
try:
if bookmarks:
writeremotenamefile(repo, remotepath, bookmarks, 'bookmarks')
if branches:
writeremotenamefile(repo, remotepath, branches, 'branches')
finally:
wlock.release()
Pulkit Goyal
logexchange: introduce helper function to get remote path name...
r36076 def activepath(repo, remote):
"""returns remote path"""
local = None
# is the remote a local peer
local = remote.local()
# determine the remote path from the repo, if possible; else just
# use the string given to us
rpath = remote
if local:
rpath = remote._repo.root
Pulkit Goyal
py3: use bytes instead of str in instance()...
r37383 elif not isinstance(remote, bytes):
Pulkit Goyal
logexchange: introduce helper function to get remote path name...
r36076 rpath = remote._url
# represent the remotepath with user defined path name if exists
for path, url in repo.ui.configitems('paths'):
# remove auth info from user defined url
Pulkit Goyal
remotenames: check the remotepath with url containing user information too...
r38001 noauthurl = util.removeauth(url)
if url == rpath or noauthurl == rpath:
Pulkit Goyal
logexchange: introduce helper function to get remote path name...
r36076 rpath = path
break
return rpath
Pulkit Goyal
remotenames: rename related file and storage dir to logexchange...
r35348 def pullremotenames(localrepo, remoterepo):
"""
pulls bookmarks and branches information of the remote repo during a
pull or clone operation.
localrepo is our local repository
remoterepo is the peer instance
"""
Pulkit Goyal
logexchange: introduce helper function to get remote path name...
r36076 remotepath = activepath(localrepo, remoterepo)
Gregory Szorc
logexchange: use command executor for wire protocol commands...
r37657
with remoterepo.commandexecutor() as e:
bookmarks = e.callcommand('listkeys', {
'namespace': 'bookmarks',
}).result()
Pulkit Goyal
remotenames: rename related file and storage dir to logexchange...
r35348 # on a push, we don't want to keep obsolete heads since
# they won't show up as heads on the next pull, so we
# remove them here otherwise we would require the user
# to issue a pull to refresh the storage
bmap = {}
repo = localrepo.unfiltered()
Gregory Szorc
logexchange: use command executor for wire protocol commands...
r37657
with remoterepo.commandexecutor() as e:
branchmap = e.callcommand('branchmap', {}).result()
for branch, nodes in branchmap.iteritems():
Pulkit Goyal
remotenames: rename related file and storage dir to logexchange...
r35348 bmap[branch] = []
for node in nodes:
if node in repo and not repo[node].obsolete():
bmap[branch].append(hex(node))
saveremotenames(localrepo, remotepath, bmap, bookmarks)