##// END OF EJS Templates
wireproto: perform chunking and compression at protocol layer (API)...
Gregory Szorc -
r30466:2add671b default
parent child Browse files
Show More
@@ -76,7 +76,7 b' def getlfile(repo, proto, sha):'
76 76 yield '%d\n' % length
77 77 for chunk in util.filechunkiter(f):
78 78 yield chunk
79 return wireproto.streamres(generator())
79 return wireproto.streamres(gen=generator())
80 80
81 81 def statlfile(repo, proto, sha):
82 82 '''Server command for checking if a largefile is present - returns '2\n' if
@@ -73,16 +73,6 b' class webproto(wireproto.abstractserverp'
73 73 self.ui.ferr, self.ui.fout = self.oldio
74 74 return val
75 75
76 def groupchunks(self, fh):
77 def getchunks():
78 while True:
79 chunk = fh.read(32768)
80 if not chunk:
81 break
82 yield chunk
83
84 return self.compresschunks(getchunks())
85
86 76 def compresschunks(self, chunks):
87 77 # Don't allow untrusted settings because disabling compression or
88 78 # setting a very high compression level could lead to flooding
@@ -106,8 +96,16 b' def call(repo, req, cmd):'
106 96 req.respond(HTTP_OK, HGTYPE, body=rsp)
107 97 return []
108 98 elif isinstance(rsp, wireproto.streamres):
99 if rsp.reader:
100 gen = iter(lambda: rsp.reader.read(32768), '')
101 else:
102 gen = rsp.gen
103
104 if rsp.v1compressible:
105 gen = p.compresschunks(gen)
106
109 107 req.respond(HTTP_OK, HGTYPE)
110 return rsp.gen
108 return gen
111 109 elif isinstance(rsp, wireproto.pushres):
112 110 val = p.restore()
113 111 rsp = '%d\n%s' % (rsp.res, val)
@@ -68,13 +68,6 b' class sshserver(wireproto.abstractserver'
68 68 def redirect(self):
69 69 pass
70 70
71 def groupchunks(self, fh):
72 return iter(lambda: fh.read(4096), '')
73
74 def compresschunks(self, chunks):
75 for chunk in chunks:
76 yield chunk
77
78 71 def sendresponse(self, v):
79 72 self.fout.write("%d\n" % len(v))
80 73 self.fout.write(v)
@@ -82,7 +75,13 b' class sshserver(wireproto.abstractserver'
82 75
83 76 def sendstream(self, source):
84 77 write = self.fout.write
85 for chunk in source.gen:
78
79 if source.reader:
80 gen = iter(lambda: source.reader.read(4096), '')
81 else:
82 gen = source.gen
83
84 for chunk in gen:
86 85 write(chunk)
87 86 self.fout.flush()
88 87
@@ -78,21 +78,6 b' class abstractserverproto(object):'
78 78 # """
79 79 # raise NotImplementedError()
80 80
81 def groupchunks(self, fh):
82 """Generator of chunks to send to the client.
83
84 Some protocols may have compressed the contents.
85 """
86 raise NotImplementedError()
87
88 def compresschunks(self, chunks):
89 """Generator of possible compressed chunks to send to the client.
90
91 This is like ``groupchunks()`` except it accepts a generator as
92 its argument.
93 """
94 raise NotImplementedError()
95
96 81 class remotebatch(peer.batcher):
97 82 '''batches the queued calls; uses as few roundtrips as possible'''
98 83 def __init__(self, remote):
@@ -529,10 +514,19 b' class streamres(object):'
529 514 """wireproto reply: binary stream
530 515
531 516 The call was successful and the result is a stream.
532 Iterate on the `self.gen` attribute to retrieve chunks.
517
518 Accepts either a generator or an object with a ``read(size)`` method.
519
520 ``v1compressible`` indicates whether this data can be compressed to
521 "version 1" clients (technically: HTTP peers using
522 application/mercurial-0.1 media type). This flag should NOT be used on
523 new commands because new clients should support a more modern compression
524 mechanism.
533 525 """
534 def __init__(self, gen):
526 def __init__(self, gen=None, reader=None, v1compressible=False):
535 527 self.gen = gen
528 self.reader = reader
529 self.v1compressible = v1compressible
536 530
537 531 class pushres(object):
538 532 """wireproto reply: success with simple integer return
@@ -739,14 +733,14 b' def capabilities(repo, proto):'
739 733 def changegroup(repo, proto, roots):
740 734 nodes = decodelist(roots)
741 735 cg = changegroupmod.changegroup(repo, nodes, 'serve')
742 return streamres(proto.groupchunks(cg))
736 return streamres(reader=cg, v1compressible=True)
743 737
744 738 @wireprotocommand('changegroupsubset', 'bases heads')
745 739 def changegroupsubset(repo, proto, bases, heads):
746 740 bases = decodelist(bases)
747 741 heads = decodelist(heads)
748 742 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
749 return streamres(proto.groupchunks(cg))
743 return streamres(reader=cg, v1compressible=True)
750 744
751 745 @wireprotocommand('debugwireargs', 'one two *')
752 746 def debugwireargs(repo, proto, one, two, others):
@@ -781,7 +775,7 b' def getbundle(repo, proto, others):'
781 775 return ooberror(bundle2required)
782 776
783 777 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
784 return streamres(proto.compresschunks(chunks))
778 return streamres(gen=chunks, v1compressible=True)
785 779
786 780 @wireprotocommand('heads')
787 781 def heads(repo, proto):
@@ -870,7 +864,7 b' def stream(repo, proto):'
870 864 # LockError may be raised before the first result is yielded. Don't
871 865 # emit output until we're sure we got the lock successfully.
872 866 it = streamclone.generatev1wireproto(repo)
873 return streamres(getstream(it))
867 return streamres(gen=getstream(it))
874 868 except error.LockError:
875 869 return '2\n'
876 870
@@ -900,7 +894,7 b' def unbundle(repo, proto, heads):'
900 894 if util.safehasattr(r, 'addpart'):
901 895 # The return looks streamable, we are in the bundle2 case and
902 896 # should return a stream.
903 return streamres(r.getchunks())
897 return streamres(gen=r.getchunks())
904 898 return pushres(r)
905 899
906 900 finally:
@@ -962,4 +956,4 b' def unbundle(repo, proto, heads):'
962 956 manargs, advargs))
963 957 except error.PushRaced as exc:
964 958 bundler.newpart('error:pushraced', [('message', str(exc))])
965 return streamres(bundler.getchunks())
959 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now