diff --git a/hgext/largefiles/proto.py b/hgext/largefiles/proto.py --- a/hgext/largefiles/proto.py +++ b/hgext/largefiles/proto.py @@ -76,7 +76,7 @@ def getlfile(repo, proto, sha): yield '%d\n' % length for chunk in util.filechunkiter(f): yield chunk - return wireproto.streamres(generator()) + return wireproto.streamres(gen=generator()) def statlfile(repo, proto, sha): '''Server command for checking if a largefile is present - returns '2\n' if diff --git a/mercurial/hgweb/protocol.py b/mercurial/hgweb/protocol.py --- a/mercurial/hgweb/protocol.py +++ b/mercurial/hgweb/protocol.py @@ -73,16 +73,6 @@ class webproto(wireproto.abstractserverp self.ui.ferr, self.ui.fout = self.oldio return val - def groupchunks(self, fh): - def getchunks(): - while True: - chunk = fh.read(32768) - if not chunk: - break - yield chunk - - return self.compresschunks(getchunks()) - def compresschunks(self, chunks): # Don't allow untrusted settings because disabling compression or # setting a very high compression level could lead to flooding @@ -106,8 +96,16 @@ def call(repo, req, cmd): req.respond(HTTP_OK, HGTYPE, body=rsp) return [] elif isinstance(rsp, wireproto.streamres): + if rsp.reader: + gen = iter(lambda: rsp.reader.read(32768), '') + else: + gen = rsp.gen + + if rsp.v1compressible: + gen = p.compresschunks(gen) + req.respond(HTTP_OK, HGTYPE) - return rsp.gen + return gen elif isinstance(rsp, wireproto.pushres): val = p.restore() rsp = '%d\n%s' % (rsp.res, val) diff --git a/mercurial/sshserver.py b/mercurial/sshserver.py --- a/mercurial/sshserver.py +++ b/mercurial/sshserver.py @@ -68,13 +68,6 @@ class sshserver(wireproto.abstractserver def redirect(self): pass - def groupchunks(self, fh): - return iter(lambda: fh.read(4096), '') - - def compresschunks(self, chunks): - for chunk in chunks: - yield chunk - def sendresponse(self, v): self.fout.write("%d\n" % len(v)) self.fout.write(v) @@ -82,7 +75,13 @@ class sshserver(wireproto.abstractserver def sendstream(self, source): write = self.fout.write - for chunk in source.gen: + + if source.reader: + gen = iter(lambda: source.reader.read(4096), '') + else: + gen = source.gen + + for chunk in gen: write(chunk) self.fout.flush() diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py --- a/mercurial/wireproto.py +++ b/mercurial/wireproto.py @@ -78,21 +78,6 @@ class abstractserverproto(object): # """ # raise NotImplementedError() - def groupchunks(self, fh): - """Generator of chunks to send to the client. - - Some protocols may have compressed the contents. - """ - raise NotImplementedError() - - def compresschunks(self, chunks): - """Generator of possible compressed chunks to send to the client. - - This is like ``groupchunks()`` except it accepts a generator as - its argument. - """ - raise NotImplementedError() - class remotebatch(peer.batcher): '''batches the queued calls; uses as few roundtrips as possible''' def __init__(self, remote): @@ -529,10 +514,19 @@ class streamres(object): """wireproto reply: binary stream The call was successful and the result is a stream. - Iterate on the `self.gen` attribute to retrieve chunks. + + Accepts either a generator or an object with a ``read(size)`` method. + + ``v1compressible`` indicates whether this data can be compressed to + "version 1" clients (technically: HTTP peers using + application/mercurial-0.1 media type). This flag should NOT be used on + new commands because new clients should support a more modern compression + mechanism. """ - def __init__(self, gen): + def __init__(self, gen=None, reader=None, v1compressible=False): self.gen = gen + self.reader = reader + self.v1compressible = v1compressible class pushres(object): """wireproto reply: success with simple integer return @@ -739,14 +733,14 @@ def capabilities(repo, proto): def changegroup(repo, proto, roots): nodes = decodelist(roots) cg = changegroupmod.changegroup(repo, nodes, 'serve') - return streamres(proto.groupchunks(cg)) + return streamres(reader=cg, v1compressible=True) @wireprotocommand('changegroupsubset', 'bases heads') def changegroupsubset(repo, proto, bases, heads): bases = decodelist(bases) heads = decodelist(heads) cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve') - return streamres(proto.groupchunks(cg)) + return streamres(reader=cg, v1compressible=True) @wireprotocommand('debugwireargs', 'one two *') def debugwireargs(repo, proto, one, two, others): @@ -781,7 +775,7 @@ def getbundle(repo, proto, others): return ooberror(bundle2required) chunks = exchange.getbundlechunks(repo, 'serve', **opts) - return streamres(proto.compresschunks(chunks)) + return streamres(gen=chunks, v1compressible=True) @wireprotocommand('heads') def heads(repo, proto): @@ -870,7 +864,7 @@ def stream(repo, proto): # LockError may be raised before the first result is yielded. Don't # emit output until we're sure we got the lock successfully. it = streamclone.generatev1wireproto(repo) - return streamres(getstream(it)) + return streamres(gen=getstream(it)) except error.LockError: return '2\n' @@ -900,7 +894,7 @@ def unbundle(repo, proto, heads): if util.safehasattr(r, 'addpart'): # The return looks streamable, we are in the bundle2 case and # should return a stream. - return streamres(r.getchunks()) + return streamres(gen=r.getchunks()) return pushres(r) finally: @@ -962,4 +956,4 @@ def unbundle(repo, proto, heads): manargs, advargs)) except error.PushRaced as exc: bundler.newpart('error:pushraced', [('message', str(exc))]) - return streamres(bundler.getchunks()) + return streamres(gen=bundler.getchunks())