# HG changeset patch # User Joerg Sonnenberger # Date 2018-01-12 09:59:58 # Node ID a39a9df7ecca0d1750647d04e2f22fc32ff0264a # Parent 3a3b59bbe7ce37c8b90add5fcd9d8f3e69b32db9 wireproto: split streamres into legacy and modern case A couple of commands currently require transmission of uncompressed frames with the old MIME type. Split this case from streamres into a new streamres_legacy class. Streamline the remaining code accordingly. Add a new flag to streamres to request uncompressed streams. This is useful for sending data that is already compressed like a pre-built bundle. Expect clients to support uncompressed data. For older clients, zlib will still be used. Differential Revision: https://phab.mercurial-scm.org/D1862 diff --git a/hgext/largefiles/proto.py b/hgext/largefiles/proto.py --- a/hgext/largefiles/proto.py +++ b/hgext/largefiles/proto.py @@ -75,7 +75,7 @@ def getlfile(repo, proto, sha): yield '%d\n' % length for chunk in util.filechunkiter(f): yield chunk - return wireproto.streamres(gen=generator()) + return wireproto.streamres_legacy(gen=generator()) def statlfile(repo, proto, sha): '''Server command for checking if a largefile is present - returns '2\n' if diff --git a/mercurial/hgweb/protocol.py b/mercurial/hgweb/protocol.py --- a/mercurial/hgweb/protocol.py +++ b/mercurial/hgweb/protocol.py @@ -102,25 +102,20 @@ class webproto(wireproto.abstractserverp urlreq.quote(self.req.env.get('REMOTE_HOST', '')), urlreq.quote(self.req.env.get('REMOTE_USER', ''))) - def responsetype(self, v1compressible=False): + def responsetype(self, prefer_uncompressed): """Determine the appropriate response type and compression settings. - The ``v1compressible`` argument states whether the response with - application/mercurial-0.1 media types should be zlib compressed. - Returns a tuple of (mediatype, compengine, engineopts). """ - # For now, if it isn't compressible in the old world, it's never - # compressible. We can change this to send uncompressed 0.2 payloads - # later. - if not v1compressible: - return HGTYPE, None, None - # Determine the response media type and compression engine based # on the request parameters. protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ') if '0.2' in protocaps: + # All clients are expected to support uncompressed data. + if prefer_uncompressed: + return HGTYPE2, util._noopengine(), {} + # Default as defined by wire protocol spec. compformats = ['zlib', 'none'] for cap in protocaps: @@ -155,7 +150,7 @@ def iscmd(cmd): def call(repo, req, cmd): p = webproto(req, repo.ui) - def genversion2(gen, compress, engine, engineopts): + def genversion2(gen, engine, engineopts): # application/mercurial-0.2 always sends a payload header # identifying the compression engine. name = engine.wireprotosupport().name @@ -163,28 +158,27 @@ def call(repo, req, cmd): yield struct.pack('B', len(name)) yield name - if compress: - for chunk in engine.compressstream(gen, opts=engineopts): - yield chunk - else: - for chunk in gen: - yield chunk + for chunk in gen: + yield chunk rsp = wireproto.dispatch(repo, p, cmd) if isinstance(rsp, bytes): req.respond(HTTP_OK, HGTYPE, body=rsp) return [] + elif isinstance(rsp, wireproto.streamres_legacy): + gen = rsp.gen + req.respond(HTTP_OK, HGTYPE) + return gen elif isinstance(rsp, wireproto.streamres): gen = rsp.gen # This code for compression should not be streamres specific. It # is here because we only compress streamres at the moment. - mediatype, engine, engineopts = p.responsetype(rsp.v1compressible) + mediatype, engine, engineopts = p.responsetype(rsp.prefer_uncompressed) + gen = engine.compressstream(gen, engineopts) - if mediatype == HGTYPE and rsp.v1compressible: - gen = engine.compressstream(gen, engineopts) - elif mediatype == HGTYPE2: - gen = genversion2(gen, rsp.v1compressible, engine, engineopts) + if mediatype == HGTYPE2: + gen = genversion2(gen, engine, engineopts) req.respond(HTTP_OK, mediatype) return gen diff --git a/mercurial/sshserver.py b/mercurial/sshserver.py --- a/mercurial/sshserver.py +++ b/mercurial/sshserver.py @@ -105,6 +105,7 @@ class sshserver(wireproto.abstractserver handlers = { str: sendresponse, wireproto.streamres: sendstream, + wireproto.streamres_legacy: sendstream, wireproto.pushres: sendpushresponse, wireproto.pusherr: sendpusherror, wireproto.ooberror: sendooberror, diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py --- a/mercurial/wireproto.py +++ b/mercurial/wireproto.py @@ -522,15 +522,26 @@ class streamres(object): Accepts a generator containing chunks of data to be sent to the client. - ``v1compressible`` indicates whether this data can be compressed to - "version 1" clients (technically: HTTP peers using - application/mercurial-0.1 media type). This flag should NOT be used on - new commands because new clients should support a more modern compression - mechanism. + ``prefer_uncompressed`` indicates that the data is expected to be + uncompressable and that the stream should therefore use the ``none`` + engine. """ - def __init__(self, gen=None, v1compressible=False): + def __init__(self, gen=None, prefer_uncompressed=False): self.gen = gen - self.v1compressible = v1compressible + self.prefer_uncompressed = prefer_uncompressed + +class streamres_legacy(object): + """wireproto reply: uncompressed binary stream + + The call was successful and the result is a stream. + + Accepts a generator containing chunks of data to be sent to the client. + + Like ``streamres``, but sends an uncompressed data for "version 1" clients + using the application/mercurial-0.1 media type. + """ + def __init__(self, gen=None): + self.gen = gen class pushres(object): """wireproto reply: success with simple integer return @@ -802,7 +813,7 @@ def changegroup(repo, proto, roots): missingheads=repo.heads()) cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve') gen = iter(lambda: cg.read(32768), '') - return streamres(gen=gen, v1compressible=True) + return streamres(gen=gen) @wireprotocommand('changegroupsubset', 'bases heads') def changegroupsubset(repo, proto, bases, heads): @@ -812,7 +823,7 @@ def changegroupsubset(repo, proto, bases missingheads=heads) cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve') gen = iter(lambda: cg.read(32768), '') - return streamres(gen=gen, v1compressible=True) + return streamres(gen=gen) @wireprotocommand('debugwireargs', 'one two *') def debugwireargs(repo, proto, one, two, others): @@ -877,8 +888,8 @@ def getbundle(repo, proto, others): advargs.append(('hint', exc.hint)) bundler.addpart(bundle2.bundlepart('error:abort', manargs, advargs)) - return streamres(gen=bundler.getchunks(), v1compressible=True) - return streamres(gen=chunks, v1compressible=True) + return streamres(gen=bundler.getchunks()) + return streamres(gen=chunks) @wireprotocommand('heads') def heads(repo, proto): @@ -955,7 +966,7 @@ def stream(repo, proto): capability with a value representing the version and flags of the repo it is serving. Client checks to see if it understands the format. ''' - return streamres(streamclone.generatev1wireproto(repo)) + return streamres_legacy(streamclone.generatev1wireproto(repo)) @wireprotocommand('unbundle', 'heads') def unbundle(repo, proto, heads): @@ -990,7 +1001,7 @@ def unbundle(repo, proto, heads): if util.safehasattr(r, 'addpart'): # The return looks streamable, we are in the bundle2 case and # should return a stream. - return streamres(gen=r.getchunks()) + return streamres_legacy(gen=r.getchunks()) return pushres(r) finally: @@ -1054,4 +1065,4 @@ def unbundle(repo, proto, heads): manargs, advargs)) except error.PushRaced as exc: bundler.newpart('error:pushraced', [('message', str(exc))]) - return streamres(gen=bundler.getchunks()) + return streamres_legacy(gen=bundler.getchunks())