##// END OF EJS Templates
wireproto: split streamres into legacy and modern case...
Joerg Sonnenberger -
r35768:a39a9df7 default
parent child Browse files
Show More
@@ -75,7 +75,7 b' def getlfile(repo, proto, sha):'
75 75 yield '%d\n' % length
76 76 for chunk in util.filechunkiter(f):
77 77 yield chunk
78 return wireproto.streamres(gen=generator())
78 return wireproto.streamres_legacy(gen=generator())
79 79
80 80 def statlfile(repo, proto, sha):
81 81 '''Server command for checking if a largefile is present - returns '2\n' if
@@ -102,25 +102,20 b' class webproto(wireproto.abstractserverp'
102 102 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
103 103 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
104 104
105 def responsetype(self, v1compressible=False):
105 def responsetype(self, prefer_uncompressed):
106 106 """Determine the appropriate response type and compression settings.
107 107
108 The ``v1compressible`` argument states whether the response with
109 application/mercurial-0.1 media types should be zlib compressed.
110
111 108 Returns a tuple of (mediatype, compengine, engineopts).
112 109 """
113 # For now, if it isn't compressible in the old world, it's never
114 # compressible. We can change this to send uncompressed 0.2 payloads
115 # later.
116 if not v1compressible:
117 return HGTYPE, None, None
118
119 110 # Determine the response media type and compression engine based
120 111 # on the request parameters.
121 112 protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ')
122 113
123 114 if '0.2' in protocaps:
115 # All clients are expected to support uncompressed data.
116 if prefer_uncompressed:
117 return HGTYPE2, util._noopengine(), {}
118
124 119 # Default as defined by wire protocol spec.
125 120 compformats = ['zlib', 'none']
126 121 for cap in protocaps:
@@ -155,7 +150,7 b' def iscmd(cmd):'
155 150 def call(repo, req, cmd):
156 151 p = webproto(req, repo.ui)
157 152
158 def genversion2(gen, compress, engine, engineopts):
153 def genversion2(gen, engine, engineopts):
159 154 # application/mercurial-0.2 always sends a payload header
160 155 # identifying the compression engine.
161 156 name = engine.wireprotosupport().name
@@ -163,28 +158,27 b' def call(repo, req, cmd):'
163 158 yield struct.pack('B', len(name))
164 159 yield name
165 160
166 if compress:
167 for chunk in engine.compressstream(gen, opts=engineopts):
168 yield chunk
169 else:
170 for chunk in gen:
171 yield chunk
161 for chunk in gen:
162 yield chunk
172 163
173 164 rsp = wireproto.dispatch(repo, p, cmd)
174 165 if isinstance(rsp, bytes):
175 166 req.respond(HTTP_OK, HGTYPE, body=rsp)
176 167 return []
168 elif isinstance(rsp, wireproto.streamres_legacy):
169 gen = rsp.gen
170 req.respond(HTTP_OK, HGTYPE)
171 return gen
177 172 elif isinstance(rsp, wireproto.streamres):
178 173 gen = rsp.gen
179 174
180 175 # This code for compression should not be streamres specific. It
181 176 # is here because we only compress streamres at the moment.
182 mediatype, engine, engineopts = p.responsetype(rsp.v1compressible)
177 mediatype, engine, engineopts = p.responsetype(rsp.prefer_uncompressed)
178 gen = engine.compressstream(gen, engineopts)
183 179
184 if mediatype == HGTYPE and rsp.v1compressible:
185 gen = engine.compressstream(gen, engineopts)
186 elif mediatype == HGTYPE2:
187 gen = genversion2(gen, rsp.v1compressible, engine, engineopts)
180 if mediatype == HGTYPE2:
181 gen = genversion2(gen, engine, engineopts)
188 182
189 183 req.respond(HTTP_OK, mediatype)
190 184 return gen
@@ -105,6 +105,7 b' class sshserver(wireproto.abstractserver'
105 105 handlers = {
106 106 str: sendresponse,
107 107 wireproto.streamres: sendstream,
108 wireproto.streamres_legacy: sendstream,
108 109 wireproto.pushres: sendpushresponse,
109 110 wireproto.pusherr: sendpusherror,
110 111 wireproto.ooberror: sendooberror,
@@ -522,15 +522,26 b' class streamres(object):'
522 522
523 523 Accepts a generator containing chunks of data to be sent to the client.
524 524
525 ``v1compressible`` indicates whether this data can be compressed to
526 "version 1" clients (technically: HTTP peers using
527 application/mercurial-0.1 media type). This flag should NOT be used on
528 new commands because new clients should support a more modern compression
529 mechanism.
525 ``prefer_uncompressed`` indicates that the data is expected to be
526 uncompressable and that the stream should therefore use the ``none``
527 engine.
530 528 """
531 def __init__(self, gen=None, v1compressible=False):
529 def __init__(self, gen=None, prefer_uncompressed=False):
532 530 self.gen = gen
533 self.v1compressible = v1compressible
531 self.prefer_uncompressed = prefer_uncompressed
532
533 class streamres_legacy(object):
534 """wireproto reply: uncompressed binary stream
535
536 The call was successful and the result is a stream.
537
538 Accepts a generator containing chunks of data to be sent to the client.
539
540 Like ``streamres``, but sends an uncompressed data for "version 1" clients
541 using the application/mercurial-0.1 media type.
542 """
543 def __init__(self, gen=None):
544 self.gen = gen
534 545
535 546 class pushres(object):
536 547 """wireproto reply: success with simple integer return
@@ -802,7 +813,7 b' def changegroup(repo, proto, roots):'
802 813 missingheads=repo.heads())
803 814 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
804 815 gen = iter(lambda: cg.read(32768), '')
805 return streamres(gen=gen, v1compressible=True)
816 return streamres(gen=gen)
806 817
807 818 @wireprotocommand('changegroupsubset', 'bases heads')
808 819 def changegroupsubset(repo, proto, bases, heads):
@@ -812,7 +823,7 b' def changegroupsubset(repo, proto, bases'
812 823 missingheads=heads)
813 824 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
814 825 gen = iter(lambda: cg.read(32768), '')
815 return streamres(gen=gen, v1compressible=True)
826 return streamres(gen=gen)
816 827
817 828 @wireprotocommand('debugwireargs', 'one two *')
818 829 def debugwireargs(repo, proto, one, two, others):
@@ -877,8 +888,8 b' def getbundle(repo, proto, others):'
877 888 advargs.append(('hint', exc.hint))
878 889 bundler.addpart(bundle2.bundlepart('error:abort',
879 890 manargs, advargs))
880 return streamres(gen=bundler.getchunks(), v1compressible=True)
881 return streamres(gen=chunks, v1compressible=True)
891 return streamres(gen=bundler.getchunks())
892 return streamres(gen=chunks)
882 893
883 894 @wireprotocommand('heads')
884 895 def heads(repo, proto):
@@ -955,7 +966,7 b' def stream(repo, proto):'
955 966 capability with a value representing the version and flags of the repo
956 967 it is serving. Client checks to see if it understands the format.
957 968 '''
958 return streamres(streamclone.generatev1wireproto(repo))
969 return streamres_legacy(streamclone.generatev1wireproto(repo))
959 970
960 971 @wireprotocommand('unbundle', 'heads')
961 972 def unbundle(repo, proto, heads):
@@ -990,7 +1001,7 b' def unbundle(repo, proto, heads):'
990 1001 if util.safehasattr(r, 'addpart'):
991 1002 # The return looks streamable, we are in the bundle2 case and
992 1003 # should return a stream.
993 return streamres(gen=r.getchunks())
1004 return streamres_legacy(gen=r.getchunks())
994 1005 return pushres(r)
995 1006
996 1007 finally:
@@ -1054,4 +1065,4 b' def unbundle(repo, proto, heads):'
1054 1065 manargs, advargs))
1055 1066 except error.PushRaced as exc:
1056 1067 bundler.newpart('error:pushraced', [('message', str(exc))])
1057 return streamres(gen=bundler.getchunks())
1068 return streamres_legacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now