##// END OF EJS Templates
test-ssh: handle very slow ssh transfer rate
test-ssh: handle very slow ssh transfer rate

File last commit:

r12703:40bb5853 default
r12773:98aaf58a default
Show More
wireproto.py
338 lines | 11.1 KiB | text/x-python | PythonLexer
Matt Mackall
protocol: introduce wireproto.py...
r11581 # wireproto.py - generic wire protocol support functions
#
# Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
Benoit Boissinot
fix undefined variables, spotted by pylint
r11879 import urllib, tempfile, os, sys
Matt Mackall
protocol: introduce wireproto.py...
r11581 from i18n import _
from node import bin, hex
Matt Mackall
protocol: unify unbundle on the server side
r11593 import changegroup as changegroupmod
Dirkjan Ochtman
protocol: move the streamclone implementation into wireproto
r11627 import repo, error, encoding, util, store
Martin Geisler
Consistently import foo as foomod when foo to avoid shadowing...
r12085 import pushkey as pushkeymod
Matt Mackall
protocol: introduce wireproto.py...
r11581
Benoit Boissinot
wireproto: refactor list of nodeid encoding / decoding
# list of nodes encoding / decoding
def decodelist(l, sep=' '):
    '''Split the string l on sep and return a list with bin() applied to
    each piece.

    bin is the helper imported from the node module (presumably
    hex -> binary node id; confirm against node.py).
    '''
    return [bin(piece) for piece in l.split(sep)]
def encodelist(l, sep=' '):
    '''Apply hex() to every item of l and join the results with sep.

    hex is the helper imported from the node module, not the builtin
    (presumably binary node id -> hex string; confirm against node.py).
    '''
    return sep.join([hex(item) for item in l])
Matt Mackall
protocol: move basic ssh client commands to wirerepository
# client side
class wirerepository(repo.repository):
    '''Client-side repository methods expressed as wire commands.

    The methods below rely on self._call, self._callstream,
    self._callpush, self._decompress, self._abort, self.requirecap,
    self.capable and self.ui, none of which are defined in this class;
    they are expected to come from the base class or a subclass.
    '''

    def lookup(self, key):
        '''Ask the remote to resolve key.

        The reply is parsed as "<flag> <data>" after dropping its final
        character. A non-zero flag returns bin(data); otherwise data is
        wrapped in error.RepoError and handed to self._abort.
        '''
        self.requirecap('lookup', _('look up remote revision'))
        d = self._call("lookup", key=key)
        success, data = d[:-1].split(" ", 1)
        if int(success):
            return bin(data)
        self._abort(error.RepoError(data))

    def heads(self):
        '''Return the remote "heads" reply decoded with decodelist.'''
        d = self._call("heads")
        try:
            # the final character of the reply is dropped before decoding
            return decodelist(d[:-1])
        except Exception:
            # Exception rather than a bare except, so that
            # KeyboardInterrupt/SystemExit are not turned into a
            # ResponseError (Python 2.5+)
            self._abort(error.ResponseError(_("unexpected response:"), d))

    def branchmap(self):
        '''Return a dict mapping branch name to its decoded list of heads.

        Each reply line is "<url-quoted branch name> <space separated
        heads>". A reply that cannot be parsed is reported through
        self._abort as an error.ResponseError.
        '''
        d = self._call("branchmap")
        try:
            branchmap = {}
            for branchpart in d.splitlines():
                branchname, branchheads = branchpart.split(' ', 1)
                branchname = urllib.unquote(branchname)
                # Earlier servers (1.3.x) send branch names in (their) local
                # charset. The best we can do is assume it's identical to our
                # own local charset, in case it's not utf-8.
                try:
                    branchname.decode('utf-8')
                except UnicodeDecodeError:
                    branchname = encoding.fromlocal(branchname)
                branchheads = decodelist(branchheads)
                branchmap[branchname] = branchheads
            return branchmap
        except (TypeError, ValueError):
            # ValueError covers a line without a space, where the tuple
            # unpacking of split(' ', 1) fails
            self._abort(error.ResponseError(_("unexpected response:"), d))

    def branches(self, nodes):
        '''Send nodes with the "branches" command and return one tuple of
        decoded entries per reply line.'''
        n = encodelist(nodes)
        d = self._call("branches", nodes=n)
        try:
            br = [tuple(decodelist(b)) for b in d.splitlines()]
            return br
        except Exception:
            # see heads(): do not swallow interpreter-exit exceptions
            self._abort(error.ResponseError(_("unexpected response:"), d))

    def between(self, pairs):
        '''Run the "between" command for pairs and return a list with one
        decoded list per reply line (an empty line gives []).

        Requests are issued in batches of 8 pairs; within a request each
        pair is encoded with '-' as separator and pairs are separated by
        spaces.
        '''
        batch = 8 # avoid giant requests
        r = []
        for i in xrange(0, len(pairs), batch):
            n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
            d = self._call("between", pairs=n)
            try:
                r.extend(l and decodelist(l) or [] for l in d.splitlines())
            except Exception:
                # see heads(): do not swallow interpreter-exit exceptions
                self._abort(error.ResponseError(_("unexpected response:"), d))
        return r

    def pushkey(self, namespace, key, old, new):
        '''Send a "pushkey" command and return its integer reply as a bool.

        Returns False without contacting the remote when it does not
        advertise the 'pushkey' capability.
        '''
        if not self.capable('pushkey'):
            return False
        d = self._call("pushkey",
                       namespace=namespace, key=key, old=old, new=new)
        return bool(int(d))

    def listkeys(self, namespace):
        '''Return the remote keys of namespace as a dict.

        Returns {} when the remote lacks the 'pushkey' capability. Each
        reply line is "<key>\\t<value>", both 'string-escape' encoded.
        '''
        if not self.capable('pushkey'):
            return {}
        d = self._call("listkeys", namespace=namespace)
        r = {}
        for l in d.splitlines():
            k, v = l.split('\t')
            r[k.decode('string-escape')] = v.decode('string-escape')
        return r

    def stream_out(self):
        '''Return the stream produced by the "stream_out" command.'''
        return self._callstream('stream_out')

    def changegroup(self, nodes, kind):
        '''Request a changegroup with nodes as roots and return it wrapped
        in changegroupmod.unbundle10. kind is accepted but not used here.'''
        n = encodelist(nodes)
        f = self._callstream("changegroup", roots=n)
        return changegroupmod.unbundle10(self._decompress(f), 'UN')

    def changegroupsubset(self, bases, heads, kind):
        '''Request a changegroup limited by bases and heads and return it
        wrapped in changegroupmod.unbundle10. Requires the remote
        'changegroupsubset' capability; kind is accepted but not used here.'''
        self.requirecap('changegroupsubset', _('look up remote changes'))
        bases = encodelist(bases)
        heads = encodelist(heads)
        f = self._callstream("changegroupsubset",
                             bases=bases, heads=heads)
        return changegroupmod.unbundle10(self._decompress(f), 'UN')

    def unbundle(self, cg, heads, source):
        '''Send cg (a readable file-like object representing the
        changegroup to push, typically a chunkbuffer object) to the
        remote server as a bundle. Return an integer indicating the
        result of the push (see localrepository.addchangegroup()).

        Raises error.ResponseError when the result part of the reply is
        empty or is not an integer. source is accepted but not used here.
        '''
        ret, output = self._callpush("unbundle", cg, heads=encodelist(heads))
        if ret == "":
            raise error.ResponseError(
                _('push failed:'), output)
        try:
            ret = int(ret)
        except ValueError:
            raise error.ResponseError(
                _('push failed (unexpected response):'), ret)

        # relay the remote output line by line, keeping line endings
        for l in output.splitlines(True):
            self.ui.status(_('remote: '), l)
        return ret
Matt Mackall
protocol: move basic ssh client commands to wirerepository
# server side
Dirkjan Ochtman
protocol: wrap non-string protocol responses in classes
class streamres(object):
    '''Response wrapper that stores the given object as self.gen.

    Used in this file to wrap chunk generators returned by the
    changegroup and stream commands.
    '''
    def __init__(self, gen):
        self.gen = gen
class pushres(object):
    '''Response wrapper that stores the given value as self.res.

    Used in this file to carry the result of repo.addchangegroup() out
    of the server-side unbundle command.
    '''
    def __init__(self, res):
        self.res = res
Benoit Boissinot
wireproto: introduce pusherr() to deal with "unsynced changes" error...
class pusherr(object):
    '''Response wrapper that stores the given value as self.res.

    Used in this file to carry the 'unsynced changes' message out of the
    server-side unbundle command.
    '''
    def __init__(self, res):
        self.res = res
Matt Mackall
protocol: introduce wireproto.py...
def dispatch(repo, proto, command):
    '''Run the wire command named command and return its result.

    The handler and its argument spec are looked up in the module-level
    commands table (KeyError if command is not listed); proto.getargs
    turns the spec into the positional arguments passed after repo and
    proto.
    '''
    func, spec = commands[command]
    args = proto.getargs(spec)
    return func(repo, proto, *args)
Matt Mackall
protocol: introduce wireproto.py...
r11581
Matt Mackall
protocol: add proto to method prototypes
def between(repo, proto, pairs):
    '''Server side of "between".

    pairs is a space separated string of '-' separated entries; each is
    decoded and the resulting list is passed to repo.between(). Every
    item that call yields is encoded onto its own newline-terminated
    line of the returned string.
    '''
    pairs = [decodelist(p, '-') for p in pairs.split(" ")]
    return "".join([encodelist(b) + "\n" for b in repo.between(pairs)])
Matt Mackall
protocol: add proto to method prototypes
def branchmap(repo, proto):
    '''Server side of "branchmap".

    Returns one line per entry of repo.branchmap(), formatted as
    "<url-quoted branch name> <encoded nodes>"; lines are joined with
    newlines and there is no trailing newline.
    '''
    heads = []
    for branch, nodes in repo.branchmap().iteritems():
        branchname = urllib.quote(branch)
        branchnodes = encodelist(nodes)
        heads.append('%s %s' % (branchname, branchnodes))
    return '\n'.join(heads)
Matt Mackall
protocol: add proto to method prototypes
def branches(repo, proto, nodes):
    '''Server side of "branches".

    Decodes nodes, passes them to repo.branches() and returns each item
    of the result encoded on its own newline-terminated line.
    '''
    nodes = decodelist(nodes)
    return "".join([encodelist(b) + "\n" for b in repo.branches(nodes)])
Matt Mackall
protocol: unify server-side capabilities functions
def capabilities(repo, proto):
    '''Return the server capabilities as a space separated string.

    'lookup', 'changegroupsubset', 'branchmap' and 'pushkey' are always
    listed, and an 'unbundle=<comma separated bundle types>' token is
    always appended. When streaming is allowed, either 'stream' or
    'streamreqs=<formats>' is added depending on the repository formats.
    '''
    caps = 'lookup changegroupsubset branchmap pushkey'.split()
    if _allowstream(repo.ui):
        requiredformats = repo.requirements & repo.supportedformats
        # if our local revlogs are just revlogv1, add 'stream' cap
        if not requiredformats - set(('revlogv1',)):
            caps.append('stream')
        # otherwise, add 'streamreqs' detailing our local revlog format
        else:
            caps.append('streamreqs=%s' % ','.join(requiredformats))
    caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
    return ' '.join(caps)
Matt Mackall
protocol: unify changegroup commands...
def changegroup(repo, proto, roots):
    '''Server side of "changegroup".

    Decodes roots, asks repo.changegroup() for the matching changegroup
    (source 'serve') and returns its proto.groupchunks() output wrapped
    in streamres.
    '''
    nodes = decodelist(roots)
    cg = repo.changegroup(nodes, 'serve')
    return streamres(proto.groupchunks(cg))
Matt Mackall
protocol: unify changegroup commands...
r11584
def changegroupsubset(repo, proto, bases, heads):
    '''Server side of "changegroupsubset".

    Decodes bases and heads, asks repo.changegroupsubset() for the
    matching changegroup (source 'serve') and returns its
    proto.groupchunks() output wrapped in streamres.
    '''
    bases = decodelist(bases)
    heads = decodelist(heads)
    cg = repo.changegroupsubset(bases, heads, 'serve')
    return streamres(proto.groupchunks(cg))
Matt Mackall
protocol: unify changegroup commands...
r11584
Matt Mackall
protocol: add proto to method prototypes
def heads(repo, proto):
    '''Server side of "heads": return repo.heads() encoded with
    encodelist, followed by a newline.'''
    h = repo.heads()
    return encodelist(h) + "\n"
Matt Mackall
protocol: introduce wireproto.py...
r11581
Matt Mackall
protocol: unify server-side capabilities functions
def hello(repo, proto):
    '''the hello command returns a set of lines describing various
    interesting things about the server, in an RFC822-like format.
    Currently the only one defined is "capabilities", which
    consists of a line in the form:

    capabilities: space separated list of tokens
    '''
    # the token list is whatever capabilities() returns for this repo
    return "capabilities: %s\n" % (capabilities(repo, proto))
Matt Mackall
protocol: add proto to method prototypes
def listkeys(repo, proto, namespace):
    '''Server side of "listkeys".

    Returns the items of pushkeymod.list(repo, namespace) as
    "<key>\\t<value>" lines joined with newlines (no trailing newline);
    keys and values are 'string-escape' encoded.
    '''
    d = pushkeymod.list(repo, namespace).items()
    t = '\n'.join(['%s\t%s' % (k.encode('string-escape'),
                               v.encode('string-escape')) for k, v in d])
    return t
Matt Mackall
protocol: add proto to method prototypes
r11583 def lookup(repo, proto, key):
Matt Mackall
protocol: introduce wireproto.py...
r11581 try:
r = hex(repo.lookup(key))
success = 1
except Exception, inst:
r = str(inst)
success = 0
return "%s %s\n" % (success, r)
Matt Mackall
protocol: add proto to method prototypes
def pushkey(repo, proto, namespace, key, old, new):
    '''Server side of "pushkey": call pushkeymod.push() and return its
    result converted to an integer, followed by a newline.'''
    r = pushkeymod.push(repo, namespace, key, old, new)
    return '%s\n' % int(r)
Dirkjan Ochtman
protocol: move the streamclone implementation into wireproto
def _allowstream(ui):
    '''Return the boolean config value server.uncompressed (default
    True), read with untrusted=True.'''
    return ui.configbool('server', 'uncompressed', True, untrusted=True)
Matt Mackall
protocol: unify stream_out command
def stream(repo, proto):
    '''If the server supports streaming clone, it advertises the "stream"
    capability with a value representing the version and flags of the repo
    it is serving. Client checks to see if it understands the format.

    The format is simple: the server writes out a line with the amount
    of files, then the total amount of bytes to be transferred (separated
    by a space). Then, for each file, the server first writes the filename
    and filesize (separated by the null character), then the file contents.

    The reply starts with a status line: '0' on success, '1' when
    streaming is disabled by configuration, '2' when error.LockError is
    raised while locking and scanning the store. The two error codes are
    returned as plain strings; the success case is a generator wrapped in
    streamres.
    '''

    if not _allowstream(repo.ui):
        return '1\n'

    entries = []
    total_bytes = 0
    try:
        # get consistent snapshot of repo, lock during scan
        lock = repo.lock()
        try:
            repo.ui.debug('scanning\n')
            for name, ename, size in repo.store.walk():
                entries.append((name, size))
                total_bytes += size
        finally:
            lock.release()
    except error.LockError:
        return '2\n' # error: 2

    def streamer(repo, entries, total):
        '''stream out all metadata files in repository.'''
        yield '0\n' # success
        # use the total passed in rather than reaching into the
        # enclosing scope
        repo.ui.debug('%d files, %d bytes to transfer\n' %
                      (len(entries), total))
        yield '%d %d\n' % (len(entries), total)
        for name, size in entries:
            repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            # send at most the size recorded during the locked scan
            for chunk in util.filechunkiter(repo.sopener(name), limit=size):
                yield chunk

    return streamres(streamer(repo, entries, total_bytes))
Matt Mackall
protocol: unify stream_out command
r11585
Matt Mackall
protocol: unify unbundle on the server side
r11593 def unbundle(repo, proto, heads):
Benoit Boissinot
wireproto: refactor list of nodeid encoding / decoding
r11597 their_heads = decodelist(heads)
Matt Mackall
protocol: unify unbundle on the server side
r11593
def check_heads():
Benoit Boissinot
wireproto: refactor list of nodeid encoding / decoding
r11597 heads = repo.heads()
return their_heads == ['force'] or their_heads == heads
Matt Mackall
protocol: unify unbundle on the server side
r11593
Benoit Boissinot
wireproto: redirect the output earlier
r12702 proto.redirect()
Matt Mackall
protocol: unify unbundle on the server side
r11593 # fail early if possible
if not check_heads():
Benoit Boissinot
wireproto: introduce pusherr() to deal with "unsynced changes" error...
r12703 return pusherr('unsynced changes')
Matt Mackall
protocol: unify unbundle on the server side
r11593
# write bundle data to temporary file because it can be big
fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
fp = os.fdopen(fd, 'wb+')
r = 0
try:
proto.getfile(fp)
lock = repo.lock()
try:
if not check_heads():
# someone else committed/pushed/unbundled while we
# were transferring data
Benoit Boissinot
wireproto: introduce pusherr() to deal with "unsynced changes" error...
r12703 return pusherr('unsynced changes')
Matt Mackall
protocol: unify unbundle on the server side
r11593
# push can proceed
fp.seek(0)
Matt Mackall
bundle: unify/refactor unbundle/readbundle
r12042 gen = changegroupmod.readbundle(fp, None)
Matt Mackall
protocol: unify unbundle on the server side
r11593
try:
r = repo.addchangegroup(gen, 'serve', proto._client(),
lock=lock)
except util.Abort, inst:
sys.stderr.write("abort: %s\n" % inst)
finally:
lock.release()
Benoit Boissinot
wireproto: return in finally was messing with the return inside the block
r12701 return pushres(r)
Matt Mackall
protocol: unify unbundle on the server side
r11593
finally:
fp.close()
os.unlink(tempname)
Matt Mackall
protocol: introduce wireproto.py...
# Wire command table used by dispatch(): command name -> (handler,
# space separated argument names handed to proto.getargs()).
commands = {
    'between': (between, 'pairs'),
    'branchmap': (branchmap, ''),
    'branches': (branches, 'nodes'),
    'capabilities': (capabilities, ''),
    'changegroup': (changegroup, 'roots'),
    'changegroupsubset': (changegroupsubset, 'bases heads'),
    'heads': (heads, ''),
    'hello': (hello, ''),
    'listkeys': (listkeys, 'namespace'),
    'lookup': (lookup, 'key'),
    'pushkey': (pushkey, 'namespace key old new'),
    'stream_out': (stream, ''),
    'unbundle': (unbundle, 'heads'),
}