##// END OF EJS Templates
wireproto: split streamres into legacy and modern case...
Joerg Sonnenberger -
r35768:a39a9df7 default
parent child Browse files
Show More
@@ -75,7 +75,7 b' def getlfile(repo, proto, sha):'
75 yield '%d\n' % length
75 yield '%d\n' % length
76 for chunk in util.filechunkiter(f):
76 for chunk in util.filechunkiter(f):
77 yield chunk
77 yield chunk
78 return wireproto.streamres(gen=generator())
78 return wireproto.streamres_legacy(gen=generator())
79
79
80 def statlfile(repo, proto, sha):
80 def statlfile(repo, proto, sha):
81 '''Server command for checking if a largefile is present - returns '2\n' if
81 '''Server command for checking if a largefile is present - returns '2\n' if
@@ -102,25 +102,20 b' class webproto(wireproto.abstractserverp'
102 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
102 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
103 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
103 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
104
104
105 def responsetype(self, v1compressible=False):
105 def responsetype(self, prefer_uncompressed):
106 """Determine the appropriate response type and compression settings.
106 """Determine the appropriate response type and compression settings.
107
107
108 The ``v1compressible`` argument states whether the response with
109 application/mercurial-0.1 media types should be zlib compressed.
110
111 Returns a tuple of (mediatype, compengine, engineopts).
108 Returns a tuple of (mediatype, compengine, engineopts).
112 """
109 """
113 # For now, if it isn't compressible in the old world, it's never
114 # compressible. We can change this to send uncompressed 0.2 payloads
115 # later.
116 if not v1compressible:
117 return HGTYPE, None, None
118
119 # Determine the response media type and compression engine based
110 # Determine the response media type and compression engine based
120 # on the request parameters.
111 # on the request parameters.
121 protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ')
112 protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ')
122
113
123 if '0.2' in protocaps:
114 if '0.2' in protocaps:
115 # All clients are expected to support uncompressed data.
116 if prefer_uncompressed:
117 return HGTYPE2, util._noopengine(), {}
118
124 # Default as defined by wire protocol spec.
119 # Default as defined by wire protocol spec.
125 compformats = ['zlib', 'none']
120 compformats = ['zlib', 'none']
126 for cap in protocaps:
121 for cap in protocaps:
@@ -155,7 +150,7 b' def iscmd(cmd):'
155 def call(repo, req, cmd):
150 def call(repo, req, cmd):
156 p = webproto(req, repo.ui)
151 p = webproto(req, repo.ui)
157
152
158 def genversion2(gen, compress, engine, engineopts):
153 def genversion2(gen, engine, engineopts):
159 # application/mercurial-0.2 always sends a payload header
154 # application/mercurial-0.2 always sends a payload header
160 # identifying the compression engine.
155 # identifying the compression engine.
161 name = engine.wireprotosupport().name
156 name = engine.wireprotosupport().name
@@ -163,28 +158,27 b' def call(repo, req, cmd):'
163 yield struct.pack('B', len(name))
158 yield struct.pack('B', len(name))
164 yield name
159 yield name
165
160
166 if compress:
161 for chunk in gen:
167 for chunk in engine.compressstream(gen, opts=engineopts):
162 yield chunk
168 yield chunk
169 else:
170 for chunk in gen:
171 yield chunk
172
163
173 rsp = wireproto.dispatch(repo, p, cmd)
164 rsp = wireproto.dispatch(repo, p, cmd)
174 if isinstance(rsp, bytes):
165 if isinstance(rsp, bytes):
175 req.respond(HTTP_OK, HGTYPE, body=rsp)
166 req.respond(HTTP_OK, HGTYPE, body=rsp)
176 return []
167 return []
168 elif isinstance(rsp, wireproto.streamres_legacy):
169 gen = rsp.gen
170 req.respond(HTTP_OK, HGTYPE)
171 return gen
177 elif isinstance(rsp, wireproto.streamres):
172 elif isinstance(rsp, wireproto.streamres):
178 gen = rsp.gen
173 gen = rsp.gen
179
174
180 # This code for compression should not be streamres specific. It
175 # This code for compression should not be streamres specific. It
181 # is here because we only compress streamres at the moment.
176 # is here because we only compress streamres at the moment.
182 mediatype, engine, engineopts = p.responsetype(rsp.v1compressible)
177 mediatype, engine, engineopts = p.responsetype(rsp.prefer_uncompressed)
178 gen = engine.compressstream(gen, engineopts)
183
179
184 if mediatype == HGTYPE and rsp.v1compressible:
180 if mediatype == HGTYPE2:
185 gen = engine.compressstream(gen, engineopts)
181 gen = genversion2(gen, engine, engineopts)
186 elif mediatype == HGTYPE2:
187 gen = genversion2(gen, rsp.v1compressible, engine, engineopts)
188
182
189 req.respond(HTTP_OK, mediatype)
183 req.respond(HTTP_OK, mediatype)
190 return gen
184 return gen
@@ -105,6 +105,7 b' class sshserver(wireproto.abstractserver'
105 handlers = {
105 handlers = {
106 str: sendresponse,
106 str: sendresponse,
107 wireproto.streamres: sendstream,
107 wireproto.streamres: sendstream,
108 wireproto.streamres_legacy: sendstream,
108 wireproto.pushres: sendpushresponse,
109 wireproto.pushres: sendpushresponse,
109 wireproto.pusherr: sendpusherror,
110 wireproto.pusherr: sendpusherror,
110 wireproto.ooberror: sendooberror,
111 wireproto.ooberror: sendooberror,
@@ -522,15 +522,26 b' class streamres(object):'
522
522
523 Accepts a generator containing chunks of data to be sent to the client.
523 Accepts a generator containing chunks of data to be sent to the client.
524
524
525 ``v1compressible`` indicates whether this data can be compressed to
525 ``prefer_uncompressed`` indicates that the data is expected to be
526 "version 1" clients (technically: HTTP peers using
526 uncompressable and that the stream should therefore use the ``none``
527 application/mercurial-0.1 media type). This flag should NOT be used on
527 engine.
528 new commands because new clients should support a more modern compression
529 mechanism.
530 """
528 """
531 def __init__(self, gen=None, v1compressible=False):
529 def __init__(self, gen=None, prefer_uncompressed=False):
532 self.gen = gen
530 self.gen = gen
533 self.v1compressible = v1compressible
531 self.prefer_uncompressed = prefer_uncompressed
532
533 class streamres_legacy(object):
534 """wireproto reply: uncompressed binary stream
535
536 The call was successful and the result is a stream.
537
538 Accepts a generator containing chunks of data to be sent to the client.
539
540 Like ``streamres``, but sends an uncompressed data for "version 1" clients
541 using the application/mercurial-0.1 media type.
542 """
543 def __init__(self, gen=None):
544 self.gen = gen
534
545
535 class pushres(object):
546 class pushres(object):
536 """wireproto reply: success with simple integer return
547 """wireproto reply: success with simple integer return
@@ -802,7 +813,7 b' def changegroup(repo, proto, roots):'
802 missingheads=repo.heads())
813 missingheads=repo.heads())
803 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
814 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
804 gen = iter(lambda: cg.read(32768), '')
815 gen = iter(lambda: cg.read(32768), '')
805 return streamres(gen=gen, v1compressible=True)
816 return streamres(gen=gen)
806
817
807 @wireprotocommand('changegroupsubset', 'bases heads')
818 @wireprotocommand('changegroupsubset', 'bases heads')
808 def changegroupsubset(repo, proto, bases, heads):
819 def changegroupsubset(repo, proto, bases, heads):
@@ -812,7 +823,7 b' def changegroupsubset(repo, proto, bases'
812 missingheads=heads)
823 missingheads=heads)
813 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
824 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
814 gen = iter(lambda: cg.read(32768), '')
825 gen = iter(lambda: cg.read(32768), '')
815 return streamres(gen=gen, v1compressible=True)
826 return streamres(gen=gen)
816
827
817 @wireprotocommand('debugwireargs', 'one two *')
828 @wireprotocommand('debugwireargs', 'one two *')
818 def debugwireargs(repo, proto, one, two, others):
829 def debugwireargs(repo, proto, one, two, others):
@@ -877,8 +888,8 b' def getbundle(repo, proto, others):'
877 advargs.append(('hint', exc.hint))
888 advargs.append(('hint', exc.hint))
878 bundler.addpart(bundle2.bundlepart('error:abort',
889 bundler.addpart(bundle2.bundlepart('error:abort',
879 manargs, advargs))
890 manargs, advargs))
880 return streamres(gen=bundler.getchunks(), v1compressible=True)
891 return streamres(gen=bundler.getchunks())
881 return streamres(gen=chunks, v1compressible=True)
892 return streamres(gen=chunks)
882
893
883 @wireprotocommand('heads')
894 @wireprotocommand('heads')
884 def heads(repo, proto):
895 def heads(repo, proto):
@@ -955,7 +966,7 b' def stream(repo, proto):'
955 capability with a value representing the version and flags of the repo
966 capability with a value representing the version and flags of the repo
956 it is serving. Client checks to see if it understands the format.
967 it is serving. Client checks to see if it understands the format.
957 '''
968 '''
958 return streamres(streamclone.generatev1wireproto(repo))
969 return streamres_legacy(streamclone.generatev1wireproto(repo))
959
970
960 @wireprotocommand('unbundle', 'heads')
971 @wireprotocommand('unbundle', 'heads')
961 def unbundle(repo, proto, heads):
972 def unbundle(repo, proto, heads):
@@ -990,7 +1001,7 b' def unbundle(repo, proto, heads):'
990 if util.safehasattr(r, 'addpart'):
1001 if util.safehasattr(r, 'addpart'):
991 # The return looks streamable, we are in the bundle2 case and
1002 # The return looks streamable, we are in the bundle2 case and
992 # should return a stream.
1003 # should return a stream.
993 return streamres(gen=r.getchunks())
1004 return streamres_legacy(gen=r.getchunks())
994 return pushres(r)
1005 return pushres(r)
995
1006
996 finally:
1007 finally:
@@ -1054,4 +1065,4 b' def unbundle(repo, proto, heads):'
1054 manargs, advargs))
1065 manargs, advargs))
1055 except error.PushRaced as exc:
1066 except error.PushRaced as exc:
1056 bundler.newpart('error:pushraced', [('message', str(exc))])
1067 bundler.newpart('error:pushraced', [('message', str(exc))])
1057 return streamres(gen=bundler.getchunks())
1068 return streamres_legacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now