##// END OF EJS Templates
wireproto: compress data from a generator...
Gregory Szorc -
r30206:d1051954 default
parent child Browse files
Show More
@@ -73,21 +73,30 b' class webproto(wireproto.abstractserverp'
73 val = self.ui.fout.getvalue()
73 val = self.ui.fout.getvalue()
74 self.ui.ferr, self.ui.fout = self.oldio
74 self.ui.ferr, self.ui.fout = self.oldio
75 return val
75 return val
76
76 def groupchunks(self, fh):
77 def groupchunks(self, fh):
78 def getchunks():
79 while True:
80 chunk = fh.read(32768)
81 if not chunk:
82 break
83 yield chunk
84
85 return self.compresschunks(getchunks())
86
87 def compresschunks(self, chunks):
77 # Don't allow untrusted settings because disabling compression or
88 # Don't allow untrusted settings because disabling compression or
78 # setting a very high compression level could lead to flooding
89 # setting a very high compression level could lead to flooding
79 # the server's network or CPU.
90 # the server's network or CPU.
80 z = zlib.compressobj(self.ui.configint('server', 'zliblevel', -1))
91 z = zlib.compressobj(self.ui.configint('server', 'zliblevel', -1))
81 while True:
92 for chunk in chunks:
82 chunk = fh.read(32768)
83 if not chunk:
84 break
85 data = z.compress(chunk)
93 data = z.compress(chunk)
86 # Not all calls to compress() emit data. It is cheaper to inspect
94 # Not all calls to compress() emit data. It is cheaper to inspect
87 # that here than to send it via the generator.
95 # that here than to send it via the generator.
88 if data:
96 if data:
89 yield data
97 yield data
90 yield z.flush()
98 yield z.flush()
99
91 def _client(self):
100 def _client(self):
92 return 'remote:%s:%s:%s' % (
101 return 'remote:%s:%s:%s' % (
93 self.req.env.get('wsgi.url_scheme') or 'http',
102 self.req.env.get('wsgi.url_scheme') or 'http',
@@ -71,6 +71,10 b' class sshserver(wireproto.abstractserver'
71 def groupchunks(self, fh):
71 def groupchunks(self, fh):
72 return iter(lambda: fh.read(4096), '')
72 return iter(lambda: fh.read(4096), '')
73
73
74 def compresschunks(self, chunks):
75 for chunk in chunks:
76 yield chunk
77
74 def sendresponse(self, v):
78 def sendresponse(self, v):
75 self.fout.write("%d\n" % len(v))
79 self.fout.write("%d\n" % len(v))
76 self.fout.write(v)
80 self.fout.write(v)
@@ -85,6 +85,14 b' class abstractserverproto(object):'
85 """
85 """
86 raise NotImplementedError()
86 raise NotImplementedError()
87
87
88 def compresschunks(self, chunks):
89 """Generator of possible compressed chunks to send to the client.
90
91 This is like ``groupchunks()`` except it accepts a generator as
92 its argument.
93 """
94 raise NotImplementedError()
95
88 class remotebatch(peer.batcher):
96 class remotebatch(peer.batcher):
89 '''batches the queued calls; uses as few roundtrips as possible'''
97 '''batches the queued calls; uses as few roundtrips as possible'''
90 def __init__(self, remote):
98 def __init__(self, remote):
@@ -773,9 +781,7 b' def getbundle(repo, proto, others):'
773 return ooberror(bundle2required)
781 return ooberror(bundle2required)
774
782
775 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
783 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
776 # TODO avoid util.chunkbuffer() here since it is adding overhead to
784 return streamres(proto.compresschunks(chunks))
777 # what is fundamentally a generator proxying operation.
778 return streamres(proto.groupchunks(util.chunkbuffer(chunks)))
779
785
780 @wireprotocommand('heads')
786 @wireprotocommand('heads')
781 def heads(repo, proto):
787 def heads(repo, proto):
General Comments 0
You need to be logged in to leave comments. Login now