##// END OF EJS Templates
wireproto: perform chunking and compression at protocol layer (API)...
Gregory Szorc -
r30466:2add671b default
parent child Browse files
Show More
@@ -76,7 +76,7 b' def getlfile(repo, proto, sha):'
76 yield '%d\n' % length
76 yield '%d\n' % length
77 for chunk in util.filechunkiter(f):
77 for chunk in util.filechunkiter(f):
78 yield chunk
78 yield chunk
79 return wireproto.streamres(generator())
79 return wireproto.streamres(gen=generator())
80
80
81 def statlfile(repo, proto, sha):
81 def statlfile(repo, proto, sha):
82 '''Server command for checking if a largefile is present - returns '2\n' if
82 '''Server command for checking if a largefile is present - returns '2\n' if
@@ -73,16 +73,6 b' class webproto(wireproto.abstractserverp'
73 self.ui.ferr, self.ui.fout = self.oldio
73 self.ui.ferr, self.ui.fout = self.oldio
74 return val
74 return val
75
75
76 def groupchunks(self, fh):
77 def getchunks():
78 while True:
79 chunk = fh.read(32768)
80 if not chunk:
81 break
82 yield chunk
83
84 return self.compresschunks(getchunks())
85
86 def compresschunks(self, chunks):
76 def compresschunks(self, chunks):
87 # Don't allow untrusted settings because disabling compression or
77 # Don't allow untrusted settings because disabling compression or
88 # setting a very high compression level could lead to flooding
78 # setting a very high compression level could lead to flooding
@@ -106,8 +96,16 b' def call(repo, req, cmd):'
106 req.respond(HTTP_OK, HGTYPE, body=rsp)
96 req.respond(HTTP_OK, HGTYPE, body=rsp)
107 return []
97 return []
108 elif isinstance(rsp, wireproto.streamres):
98 elif isinstance(rsp, wireproto.streamres):
99 if rsp.reader:
100 gen = iter(lambda: rsp.reader.read(32768), '')
101 else:
102 gen = rsp.gen
103
104 if rsp.v1compressible:
105 gen = p.compresschunks(gen)
106
109 req.respond(HTTP_OK, HGTYPE)
107 req.respond(HTTP_OK, HGTYPE)
110 return rsp.gen
108 return gen
111 elif isinstance(rsp, wireproto.pushres):
109 elif isinstance(rsp, wireproto.pushres):
112 val = p.restore()
110 val = p.restore()
113 rsp = '%d\n%s' % (rsp.res, val)
111 rsp = '%d\n%s' % (rsp.res, val)
@@ -68,13 +68,6 b' class sshserver(wireproto.abstractserver'
68 def redirect(self):
68 def redirect(self):
69 pass
69 pass
70
70
71 def groupchunks(self, fh):
72 return iter(lambda: fh.read(4096), '')
73
74 def compresschunks(self, chunks):
75 for chunk in chunks:
76 yield chunk
77
78 def sendresponse(self, v):
71 def sendresponse(self, v):
79 self.fout.write("%d\n" % len(v))
72 self.fout.write("%d\n" % len(v))
80 self.fout.write(v)
73 self.fout.write(v)
@@ -82,7 +75,13 b' class sshserver(wireproto.abstractserver'
82
75
83 def sendstream(self, source):
76 def sendstream(self, source):
84 write = self.fout.write
77 write = self.fout.write
85 for chunk in source.gen:
78
79 if source.reader:
80 gen = iter(lambda: source.reader.read(4096), '')
81 else:
82 gen = source.gen
83
84 for chunk in gen:
86 write(chunk)
85 write(chunk)
87 self.fout.flush()
86 self.fout.flush()
88
87
@@ -78,21 +78,6 b' class abstractserverproto(object):'
78 # """
78 # """
79 # raise NotImplementedError()
79 # raise NotImplementedError()
80
80
81 def groupchunks(self, fh):
82 """Generator of chunks to send to the client.
83
84 Some protocols may have compressed the contents.
85 """
86 raise NotImplementedError()
87
88 def compresschunks(self, chunks):
89 """Generator of possible compressed chunks to send to the client.
90
91 This is like ``groupchunks()`` except it accepts a generator as
92 its argument.
93 """
94 raise NotImplementedError()
95
96 class remotebatch(peer.batcher):
81 class remotebatch(peer.batcher):
97 '''batches the queued calls; uses as few roundtrips as possible'''
82 '''batches the queued calls; uses as few roundtrips as possible'''
98 def __init__(self, remote):
83 def __init__(self, remote):
@@ -529,10 +514,19 b' class streamres(object):'
529 """wireproto reply: binary stream
514 """wireproto reply: binary stream
530
515
531 The call was successful and the result is a stream.
516 The call was successful and the result is a stream.
532 Iterate on the `self.gen` attribute to retrieve chunks.
517
518 Accepts either a generator or an object with a ``read(size)`` method.
519
520 ``v1compressible`` indicates whether this data can be compressed to
521 "version 1" clients (technically: HTTP peers using
522 application/mercurial-0.1 media type). This flag should NOT be used on
523 new commands because new clients should support a more modern compression
524 mechanism.
533 """
525 """
534 def __init__(self, gen):
526 def __init__(self, gen=None, reader=None, v1compressible=False):
535 self.gen = gen
527 self.gen = gen
528 self.reader = reader
529 self.v1compressible = v1compressible
536
530
537 class pushres(object):
531 class pushres(object):
538 """wireproto reply: success with simple integer return
532 """wireproto reply: success with simple integer return
@@ -739,14 +733,14 b' def capabilities(repo, proto):'
739 def changegroup(repo, proto, roots):
733 def changegroup(repo, proto, roots):
740 nodes = decodelist(roots)
734 nodes = decodelist(roots)
741 cg = changegroupmod.changegroup(repo, nodes, 'serve')
735 cg = changegroupmod.changegroup(repo, nodes, 'serve')
742 return streamres(proto.groupchunks(cg))
736 return streamres(reader=cg, v1compressible=True)
743
737
744 @wireprotocommand('changegroupsubset', 'bases heads')
738 @wireprotocommand('changegroupsubset', 'bases heads')
745 def changegroupsubset(repo, proto, bases, heads):
739 def changegroupsubset(repo, proto, bases, heads):
746 bases = decodelist(bases)
740 bases = decodelist(bases)
747 heads = decodelist(heads)
741 heads = decodelist(heads)
748 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
742 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
749 return streamres(proto.groupchunks(cg))
743 return streamres(reader=cg, v1compressible=True)
750
744
751 @wireprotocommand('debugwireargs', 'one two *')
745 @wireprotocommand('debugwireargs', 'one two *')
752 def debugwireargs(repo, proto, one, two, others):
746 def debugwireargs(repo, proto, one, two, others):
@@ -781,7 +775,7 b' def getbundle(repo, proto, others):'
781 return ooberror(bundle2required)
775 return ooberror(bundle2required)
782
776
783 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
777 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
784 return streamres(proto.compresschunks(chunks))
778 return streamres(gen=chunks, v1compressible=True)
785
779
786 @wireprotocommand('heads')
780 @wireprotocommand('heads')
787 def heads(repo, proto):
781 def heads(repo, proto):
@@ -870,7 +864,7 b' def stream(repo, proto):'
870 # LockError may be raised before the first result is yielded. Don't
864 # LockError may be raised before the first result is yielded. Don't
871 # emit output until we're sure we got the lock successfully.
865 # emit output until we're sure we got the lock successfully.
872 it = streamclone.generatev1wireproto(repo)
866 it = streamclone.generatev1wireproto(repo)
873 return streamres(getstream(it))
867 return streamres(gen=getstream(it))
874 except error.LockError:
868 except error.LockError:
875 return '2\n'
869 return '2\n'
876
870
@@ -900,7 +894,7 b' def unbundle(repo, proto, heads):'
900 if util.safehasattr(r, 'addpart'):
894 if util.safehasattr(r, 'addpart'):
901 # The return looks streamable, we are in the bundle2 case and
895 # The return looks streamable, we are in the bundle2 case and
902 # should return a stream.
896 # should return a stream.
903 return streamres(r.getchunks())
897 return streamres(gen=r.getchunks())
904 return pushres(r)
898 return pushres(r)
905
899
906 finally:
900 finally:
@@ -962,4 +956,4 b' def unbundle(repo, proto, heads):'
962 manargs, advargs))
956 manargs, advargs))
963 except error.PushRaced as exc:
957 except error.PushRaced as exc:
964 bundler.newpart('error:pushraced', [('message', str(exc))])
958 bundler.newpart('error:pushraced', [('message', str(exc))])
965 return streamres(bundler.getchunks())
959 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now