wireproto: compress data from a generator

Currently, the "getbundle" wire protocol command obtains a generator of data, converts it to a util.chunkbuffer, then converts it back to a generator via the protocol's groupchunks() implementation. For the SSH protocol, groupchunks() simply reads 4kb chunks then write()s the data to a file descriptor. For the HTTP protocol, groupchunks() reads 32kb chunks, feeds those into a zlib compressor, emits compressed data as it is available, and that is sent to the WSGI layer, where it is likely turned into HTTP chunked transfer chunks as is or further buffered and turned into a larger chunk.

For both the SSH and HTTP protocols, there is inefficiency from using util.chunkbuffer. For SSH, emitting consistent 4kb chunks sounds nice. However, the file descriptor it is writing to is almost certainly buffered. That means that a Python .write() probably doesn't translate into exactly what is written to the I/O layer. For HTTP, we're going through an intermediate layer to zlib compress data. So all util.chunkbuffer is doing is ensuring that the chunks we feed into the zlib compressor are of uniform size. This means more CPU time in Python buffering and emitting chunks in util.chunkbuffer but fewer function calls to zlib.

This patch introduces and implements a new wire protocol abstract method: compresschunks(). It is like groupchunks() except it operates on a generator instead of something with a .read(). The SSH implementation simply proxies chunks. The HTTP implementation uses zlib compression. To avoid duplicate code, the HTTP groupchunks() has been reimplemented in terms of compresschunks().

To prove this all works, the "getbundle" wire protocol command has been switched to compresschunks(). This removes the util.chunkbuffer from that command. Now, data essentially streams straight from the changegroup emitter to the wire, possibly through a zlib compressor. Generators all the way, baby.

There were slim to no performance changes on the server as measured with the mozilla-central repository. This is likely because CPU time is dominated by reading revlogs, producing the changegroup, and zlib compressing the output stream. Still, this brings us a little closer to our ideal of using generators everywhere.
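Only the HTTP side of the change appears in the file below. The commit message notes that the SSH implementation of compresschunks() simply proxies chunks; a minimal hypothetical sketch of such a pass-through (the class name is invented here, and this is not a quote of mercurial/sshserver.py) could look like:

class sshproto(wireproto.abstractserverproto):
    def compresschunks(self, chunks):
        # No compression is applied at this layer for SSH; the chunks from
        # the command's generator are forwarded to the transport unchanged.
        for chunk in chunks:
            yield chunk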

File last commit: r30206:d1051954 default
protocol.py
#
# Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
# Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import cgi
import zlib

from .common import (
    HTTP_OK,
)
from .. import (
    util,
    wireproto,
)
stringio = util.stringio

urlerr = util.urlerr
urlreq = util.urlreq

HGTYPE = 'application/mercurial-0.1'
HGERRTYPE = 'application/hg-error'

class webproto(wireproto.abstractserverproto):
    def __init__(self, req, ui):
        self.req = req
        self.response = ''
        self.ui = ui

    def getargs(self, args):
        knownargs = self._args()
        data = {}
        keys = args.split()
        for k in keys:
            if k == '*':
                star = {}
                for key in knownargs.keys():
                    if key != 'cmd' and key not in keys:
                        star[key] = knownargs[key][0]
                data['*'] = star
            else:
                data[k] = knownargs[k][0]
        return [data[k] for k in keys]

    def _args(self):
        args = self.req.form.copy()
        postlen = int(self.req.env.get('HTTP_X_HGARGS_POST', 0))
        if postlen:
            args.update(cgi.parse_qs(
                self.req.read(postlen), keep_blank_values=True))
            return args

        chunks = []
        i = 1
        while True:
            h = self.req.env.get('HTTP_X_HGARG_' + str(i))
            if h is None:
                break
            chunks += [h]
            i += 1
        args.update(cgi.parse_qs(''.join(chunks), keep_blank_values=True))
        return args

    def getfile(self, fp):
        length = int(self.req.env['CONTENT_LENGTH'])
        for s in util.filechunkiter(self.req, limit=length):
            fp.write(s)

    def redirect(self):
        self.oldio = self.ui.fout, self.ui.ferr
        self.ui.ferr = self.ui.fout = stringio()

    def restore(self):
        val = self.ui.fout.getvalue()
        self.ui.ferr, self.ui.fout = self.oldio
        return val

    def groupchunks(self, fh):
        def getchunks():
            while True:
                chunk = fh.read(32768)
                if not chunk:
                    break
                yield chunk

        return self.compresschunks(getchunks())

    def compresschunks(self, chunks):
        # Don't allow untrusted settings because disabling compression or
        # setting a very high compression level could lead to flooding
        # the server's network or CPU.
        z = zlib.compressobj(self.ui.configint('server', 'zliblevel', -1))
        for chunk in chunks:
            data = z.compress(chunk)
            # Not all calls to compress() emit data. It is cheaper to inspect
            # that here than to send it via the generator.
            if data:
                yield data
        yield z.flush()

    def _client(self):
        return 'remote:%s:%s:%s' % (
            self.req.env.get('wsgi.url_scheme') or 'http',
            urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
            urlreq.quote(self.req.env.get('REMOTE_USER', '')))

def iscmd(cmd):
    return cmd in wireproto.commands

def call(repo, req, cmd):
    p = webproto(req, repo.ui)
    rsp = wireproto.dispatch(repo, p, cmd)
    if isinstance(rsp, str):
        req.respond(HTTP_OK, HGTYPE, body=rsp)
        return []
    elif isinstance(rsp, wireproto.streamres):
        req.respond(HTTP_OK, HGTYPE)
        return rsp.gen
    elif isinstance(rsp, wireproto.pushres):
        val = p.restore()
        rsp = '%d\n%s' % (rsp.res, val)
        req.respond(HTTP_OK, HGTYPE, body=rsp)
        return []
    elif isinstance(rsp, wireproto.pusherr):
        # drain the incoming bundle
        req.drain()
        p.restore()
        rsp = '0\n%s\n' % rsp.res
        req.respond(HTTP_OK, HGTYPE, body=rsp)
        return []
    elif isinstance(rsp, wireproto.ooberror):
        rsp = rsp.message
        req.respond(HTTP_OK, HGERRTYPE, body=rsp)
        return []
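
For context on how the pieces above fit together: when a command returns a wireproto.streamres, call() hands rsp.gen straight back to the WSGI layer. A minimal sketch of how a command such as "getbundle" could build that generator through compresschunks() rather than the old util.chunkbuffer path is shown below; the exchange.getbundlechunks() call is written from memory and should be read as an assumption, not a quote of wireproto.py:

def getbundle(repo, proto, others):
    # Obtain bundle data as a generator of raw byte chunks (assumed API).
    chunks = exchange.getbundlechunks(repo, 'serve')
    # Previously: streamres(proto.groupchunks(util.chunkbuffer(chunks)))
    # Now the generator feeds the protocol's compressor directly, so data
    # streams from the changegroup emitter to the wire without rebuffering.
    return wireproto.streamres(proto.compresschunks(chunks))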