##// END OF EJS Templates
wireprotoserver: remove broken optimization for non-httplib client...
Gregory Szorc -
r36831:5a3c8341 default
parent child Browse files
Show More
@@ -1,672 +1,669
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 #
3 #
4 # This software may be used and distributed according to the terms of the
4 # This software may be used and distributed according to the terms of the
5 # GNU General Public License version 2 or any later version.
5 # GNU General Public License version 2 or any later version.
6
6
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import contextlib
9 import contextlib
10 import struct
10 import struct
11 import sys
11 import sys
12 import threading
12 import threading
13
13
14 from .i18n import _
14 from .i18n import _
15 from . import (
15 from . import (
16 encoding,
16 encoding,
17 error,
17 error,
18 hook,
18 hook,
19 pycompat,
19 pycompat,
20 util,
20 util,
21 wireproto,
21 wireproto,
22 wireprototypes,
22 wireprototypes,
23 )
23 )
24
24
25 stringio = util.stringio
25 stringio = util.stringio
26
26
27 urlerr = util.urlerr
27 urlerr = util.urlerr
28 urlreq = util.urlreq
28 urlreq = util.urlreq
29
29
30 HTTP_OK = 200
30 HTTP_OK = 200
31
31
32 HGTYPE = 'application/mercurial-0.1'
32 HGTYPE = 'application/mercurial-0.1'
33 HGTYPE2 = 'application/mercurial-0.2'
33 HGTYPE2 = 'application/mercurial-0.2'
34 HGERRTYPE = 'application/hg-error'
34 HGERRTYPE = 'application/hg-error'
35
35
36 SSHV1 = wireprototypes.SSHV1
36 SSHV1 = wireprototypes.SSHV1
37 SSHV2 = wireprototypes.SSHV2
37 SSHV2 = wireprototypes.SSHV2
38
38
39 def decodevaluefromheaders(wsgireq, headerprefix):
39 def decodevaluefromheaders(wsgireq, headerprefix):
40 """Decode a long value from multiple HTTP request headers.
40 """Decode a long value from multiple HTTP request headers.
41
41
42 Returns the value as a bytes, not a str.
42 Returns the value as a bytes, not a str.
43 """
43 """
44 chunks = []
44 chunks = []
45 i = 1
45 i = 1
46 prefix = headerprefix.upper().replace(r'-', r'_')
46 prefix = headerprefix.upper().replace(r'-', r'_')
47 while True:
47 while True:
48 v = wsgireq.env.get(r'HTTP_%s_%d' % (prefix, i))
48 v = wsgireq.env.get(r'HTTP_%s_%d' % (prefix, i))
49 if v is None:
49 if v is None:
50 break
50 break
51 chunks.append(pycompat.bytesurl(v))
51 chunks.append(pycompat.bytesurl(v))
52 i += 1
52 i += 1
53
53
54 return ''.join(chunks)
54 return ''.join(chunks)
55
55
56 class httpv1protocolhandler(wireprototypes.baseprotocolhandler):
56 class httpv1protocolhandler(wireprototypes.baseprotocolhandler):
57 def __init__(self, wsgireq, ui, checkperm):
57 def __init__(self, wsgireq, ui, checkperm):
58 self._wsgireq = wsgireq
58 self._wsgireq = wsgireq
59 self._ui = ui
59 self._ui = ui
60 self._checkperm = checkperm
60 self._checkperm = checkperm
61
61
62 @property
62 @property
63 def name(self):
63 def name(self):
64 return 'http-v1'
64 return 'http-v1'
65
65
66 def getargs(self, args):
66 def getargs(self, args):
67 knownargs = self._args()
67 knownargs = self._args()
68 data = {}
68 data = {}
69 keys = args.split()
69 keys = args.split()
70 for k in keys:
70 for k in keys:
71 if k == '*':
71 if k == '*':
72 star = {}
72 star = {}
73 for key in knownargs.keys():
73 for key in knownargs.keys():
74 if key != 'cmd' and key not in keys:
74 if key != 'cmd' and key not in keys:
75 star[key] = knownargs[key][0]
75 star[key] = knownargs[key][0]
76 data['*'] = star
76 data['*'] = star
77 else:
77 else:
78 data[k] = knownargs[k][0]
78 data[k] = knownargs[k][0]
79 return [data[k] for k in keys]
79 return [data[k] for k in keys]
80
80
81 def _args(self):
81 def _args(self):
82 args = util.rapply(pycompat.bytesurl, self._wsgireq.form.copy())
82 args = util.rapply(pycompat.bytesurl, self._wsgireq.form.copy())
83 postlen = int(self._wsgireq.env.get(r'HTTP_X_HGARGS_POST', 0))
83 postlen = int(self._wsgireq.env.get(r'HTTP_X_HGARGS_POST', 0))
84 if postlen:
84 if postlen:
85 args.update(urlreq.parseqs(
85 args.update(urlreq.parseqs(
86 self._wsgireq.read(postlen), keep_blank_values=True))
86 self._wsgireq.read(postlen), keep_blank_values=True))
87 return args
87 return args
88
88
89 argvalue = decodevaluefromheaders(self._wsgireq, r'X-HgArg')
89 argvalue = decodevaluefromheaders(self._wsgireq, r'X-HgArg')
90 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
90 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
91 return args
91 return args
92
92
93 def forwardpayload(self, fp):
93 def forwardpayload(self, fp):
94 if r'HTTP_CONTENT_LENGTH' in self._wsgireq.env:
94 if r'HTTP_CONTENT_LENGTH' in self._wsgireq.env:
95 length = int(self._wsgireq.env[r'HTTP_CONTENT_LENGTH'])
95 length = int(self._wsgireq.env[r'HTTP_CONTENT_LENGTH'])
96 else:
96 else:
97 length = int(self._wsgireq.env[r'CONTENT_LENGTH'])
97 length = int(self._wsgireq.env[r'CONTENT_LENGTH'])
98 # If httppostargs is used, we need to read Content-Length
98 # If httppostargs is used, we need to read Content-Length
99 # minus the amount that was consumed by args.
99 # minus the amount that was consumed by args.
100 length -= int(self._wsgireq.env.get(r'HTTP_X_HGARGS_POST', 0))
100 length -= int(self._wsgireq.env.get(r'HTTP_X_HGARGS_POST', 0))
101 for s in util.filechunkiter(self._wsgireq, limit=length):
101 for s in util.filechunkiter(self._wsgireq, limit=length):
102 fp.write(s)
102 fp.write(s)
103
103
104 @contextlib.contextmanager
104 @contextlib.contextmanager
105 def mayberedirectstdio(self):
105 def mayberedirectstdio(self):
106 oldout = self._ui.fout
106 oldout = self._ui.fout
107 olderr = self._ui.ferr
107 olderr = self._ui.ferr
108
108
109 out = util.stringio()
109 out = util.stringio()
110
110
111 try:
111 try:
112 self._ui.fout = out
112 self._ui.fout = out
113 self._ui.ferr = out
113 self._ui.ferr = out
114 yield out
114 yield out
115 finally:
115 finally:
116 self._ui.fout = oldout
116 self._ui.fout = oldout
117 self._ui.ferr = olderr
117 self._ui.ferr = olderr
118
118
119 def client(self):
119 def client(self):
120 return 'remote:%s:%s:%s' % (
120 return 'remote:%s:%s:%s' % (
121 self._wsgireq.env.get('wsgi.url_scheme') or 'http',
121 self._wsgireq.env.get('wsgi.url_scheme') or 'http',
122 urlreq.quote(self._wsgireq.env.get('REMOTE_HOST', '')),
122 urlreq.quote(self._wsgireq.env.get('REMOTE_HOST', '')),
123 urlreq.quote(self._wsgireq.env.get('REMOTE_USER', '')))
123 urlreq.quote(self._wsgireq.env.get('REMOTE_USER', '')))
124
124
125 def addcapabilities(self, repo, caps):
125 def addcapabilities(self, repo, caps):
126 caps.append('httpheader=%d' %
126 caps.append('httpheader=%d' %
127 repo.ui.configint('server', 'maxhttpheaderlen'))
127 repo.ui.configint('server', 'maxhttpheaderlen'))
128 if repo.ui.configbool('experimental', 'httppostargs'):
128 if repo.ui.configbool('experimental', 'httppostargs'):
129 caps.append('httppostargs')
129 caps.append('httppostargs')
130
130
131 # FUTURE advertise 0.2rx once support is implemented
131 # FUTURE advertise 0.2rx once support is implemented
132 # FUTURE advertise minrx and mintx after consulting config option
132 # FUTURE advertise minrx and mintx after consulting config option
133 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
133 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
134
134
135 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
135 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
136 if compengines:
136 if compengines:
137 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
137 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
138 for e in compengines)
138 for e in compengines)
139 caps.append('compression=%s' % comptypes)
139 caps.append('compression=%s' % comptypes)
140
140
141 return caps
141 return caps
142
142
143 def checkperm(self, perm):
143 def checkperm(self, perm):
144 return self._checkperm(perm)
144 return self._checkperm(perm)
145
145
146 # This method exists mostly so that extensions like remotefilelog can
146 # This method exists mostly so that extensions like remotefilelog can
147 # disable a kludgey legacy method only over http. As of early 2018,
147 # disable a kludgey legacy method only over http. As of early 2018,
148 # there are no other known users, so with any luck we can discard this
148 # there are no other known users, so with any luck we can discard this
149 # hook if remotefilelog becomes a first-party extension.
149 # hook if remotefilelog becomes a first-party extension.
150 def iscmd(cmd):
150 def iscmd(cmd):
151 return cmd in wireproto.commands
151 return cmd in wireproto.commands
152
152
153 def handlewsgirequest(rctx, wsgireq, req, checkperm):
153 def handlewsgirequest(rctx, wsgireq, req, checkperm):
154 """Possibly process a wire protocol request.
154 """Possibly process a wire protocol request.
155
155
156 If the current request is a wire protocol request, the request is
156 If the current request is a wire protocol request, the request is
157 processed by this function.
157 processed by this function.
158
158
159 ``wsgireq`` is a ``wsgirequest`` instance.
159 ``wsgireq`` is a ``wsgirequest`` instance.
160 ``req`` is a ``parsedrequest`` instance.
160 ``req`` is a ``parsedrequest`` instance.
161
161
162 Returns a 2-tuple of (bool, response) where the 1st element indicates
162 Returns a 2-tuple of (bool, response) where the 1st element indicates
163 whether the request was handled and the 2nd element is a return
163 whether the request was handled and the 2nd element is a return
164 value for a WSGI application (often a generator of bytes).
164 value for a WSGI application (often a generator of bytes).
165 """
165 """
166 # Avoid cycle involving hg module.
166 # Avoid cycle involving hg module.
167 from .hgweb import common as hgwebcommon
167 from .hgweb import common as hgwebcommon
168
168
169 repo = rctx.repo
169 repo = rctx.repo
170
170
171 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
171 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
172 # string parameter. If it isn't present, this isn't a wire protocol
172 # string parameter. If it isn't present, this isn't a wire protocol
173 # request.
173 # request.
174 if 'cmd' not in req.querystringdict:
174 if 'cmd' not in req.querystringdict:
175 return False, None
175 return False, None
176
176
177 cmd = req.querystringdict['cmd'][0]
177 cmd = req.querystringdict['cmd'][0]
178
178
179 # The "cmd" request parameter is used by both the wire protocol and hgweb.
179 # The "cmd" request parameter is used by both the wire protocol and hgweb.
180 # While not all wire protocol commands are available for all transports,
180 # While not all wire protocol commands are available for all transports,
181 # if we see a "cmd" value that resembles a known wire protocol command, we
181 # if we see a "cmd" value that resembles a known wire protocol command, we
182 # route it to a protocol handler. This is better than routing possible
182 # route it to a protocol handler. This is better than routing possible
183 # wire protocol requests to hgweb because it prevents hgweb from using
183 # wire protocol requests to hgweb because it prevents hgweb from using
184 # known wire protocol commands and it is less confusing for machine
184 # known wire protocol commands and it is less confusing for machine
185 # clients.
185 # clients.
186 if not iscmd(cmd):
186 if not iscmd(cmd):
187 return False, None
187 return False, None
188
188
189 # The "cmd" query string argument is only valid on the root path of the
189 # The "cmd" query string argument is only valid on the root path of the
190 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
190 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
191 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
191 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
192 # in this case. We send an HTTP 404 for backwards compatibility reasons.
192 # in this case. We send an HTTP 404 for backwards compatibility reasons.
193 if req.dispatchpath:
193 if req.dispatchpath:
194 res = _handlehttperror(
194 res = _handlehttperror(
195 hgwebcommon.ErrorResponse(hgwebcommon.HTTP_NOT_FOUND), wsgireq,
195 hgwebcommon.ErrorResponse(hgwebcommon.HTTP_NOT_FOUND), wsgireq,
196 cmd)
196 cmd)
197
197
198 return True, res
198 return True, res
199
199
200 proto = httpv1protocolhandler(wsgireq, repo.ui,
200 proto = httpv1protocolhandler(wsgireq, repo.ui,
201 lambda perm: checkperm(rctx, wsgireq, perm))
201 lambda perm: checkperm(rctx, wsgireq, perm))
202
202
203 # The permissions checker should be the only thing that can raise an
203 # The permissions checker should be the only thing that can raise an
204 # ErrorResponse. It is kind of a layer violation to catch an hgweb
204 # ErrorResponse. It is kind of a layer violation to catch an hgweb
205 # exception here. So consider refactoring into a exception type that
205 # exception here. So consider refactoring into a exception type that
206 # is associated with the wire protocol.
206 # is associated with the wire protocol.
207 try:
207 try:
208 res = _callhttp(repo, wsgireq, proto, cmd)
208 res = _callhttp(repo, wsgireq, proto, cmd)
209 except hgwebcommon.ErrorResponse as e:
209 except hgwebcommon.ErrorResponse as e:
210 res = _handlehttperror(e, wsgireq, cmd)
210 res = _handlehttperror(e, wsgireq, cmd)
211
211
212 return True, res
212 return True, res
213
213
214 def _httpresponsetype(ui, wsgireq, prefer_uncompressed):
214 def _httpresponsetype(ui, wsgireq, prefer_uncompressed):
215 """Determine the appropriate response type and compression settings.
215 """Determine the appropriate response type and compression settings.
216
216
217 Returns a tuple of (mediatype, compengine, engineopts).
217 Returns a tuple of (mediatype, compengine, engineopts).
218 """
218 """
219 # Determine the response media type and compression engine based
219 # Determine the response media type and compression engine based
220 # on the request parameters.
220 # on the request parameters.
221 protocaps = decodevaluefromheaders(wsgireq, r'X-HgProto').split(' ')
221 protocaps = decodevaluefromheaders(wsgireq, r'X-HgProto').split(' ')
222
222
223 if '0.2' in protocaps:
223 if '0.2' in protocaps:
224 # All clients are expected to support uncompressed data.
224 # All clients are expected to support uncompressed data.
225 if prefer_uncompressed:
225 if prefer_uncompressed:
226 return HGTYPE2, util._noopengine(), {}
226 return HGTYPE2, util._noopengine(), {}
227
227
228 # Default as defined by wire protocol spec.
228 # Default as defined by wire protocol spec.
229 compformats = ['zlib', 'none']
229 compformats = ['zlib', 'none']
230 for cap in protocaps:
230 for cap in protocaps:
231 if cap.startswith('comp='):
231 if cap.startswith('comp='):
232 compformats = cap[5:].split(',')
232 compformats = cap[5:].split(',')
233 break
233 break
234
234
235 # Now find an agreed upon compression format.
235 # Now find an agreed upon compression format.
236 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
236 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
237 if engine.wireprotosupport().name in compformats:
237 if engine.wireprotosupport().name in compformats:
238 opts = {}
238 opts = {}
239 level = ui.configint('server', '%slevel' % engine.name())
239 level = ui.configint('server', '%slevel' % engine.name())
240 if level is not None:
240 if level is not None:
241 opts['level'] = level
241 opts['level'] = level
242
242
243 return HGTYPE2, engine, opts
243 return HGTYPE2, engine, opts
244
244
245 # No mutually supported compression format. Fall back to the
245 # No mutually supported compression format. Fall back to the
246 # legacy protocol.
246 # legacy protocol.
247
247
248 # Don't allow untrusted settings because disabling compression or
248 # Don't allow untrusted settings because disabling compression or
249 # setting a very high compression level could lead to flooding
249 # setting a very high compression level could lead to flooding
250 # the server's network or CPU.
250 # the server's network or CPU.
251 opts = {'level': ui.configint('server', 'zliblevel')}
251 opts = {'level': ui.configint('server', 'zliblevel')}
252 return HGTYPE, util.compengines['zlib'], opts
252 return HGTYPE, util.compengines['zlib'], opts
253
253
254 def _callhttp(repo, wsgireq, proto, cmd):
254 def _callhttp(repo, wsgireq, proto, cmd):
255 def genversion2(gen, engine, engineopts):
255 def genversion2(gen, engine, engineopts):
256 # application/mercurial-0.2 always sends a payload header
256 # application/mercurial-0.2 always sends a payload header
257 # identifying the compression engine.
257 # identifying the compression engine.
258 name = engine.wireprotosupport().name
258 name = engine.wireprotosupport().name
259 assert 0 < len(name) < 256
259 assert 0 < len(name) < 256
260 yield struct.pack('B', len(name))
260 yield struct.pack('B', len(name))
261 yield name
261 yield name
262
262
263 for chunk in gen:
263 for chunk in gen:
264 yield chunk
264 yield chunk
265
265
266 if not wireproto.commands.commandavailable(cmd, proto):
266 if not wireproto.commands.commandavailable(cmd, proto):
267 wsgireq.respond(HTTP_OK, HGERRTYPE,
267 wsgireq.respond(HTTP_OK, HGERRTYPE,
268 body=_('requested wire protocol command is not '
268 body=_('requested wire protocol command is not '
269 'available over HTTP'))
269 'available over HTTP'))
270 return []
270 return []
271
271
272 proto.checkperm(wireproto.commands[cmd].permission)
272 proto.checkperm(wireproto.commands[cmd].permission)
273
273
274 rsp = wireproto.dispatch(repo, proto, cmd)
274 rsp = wireproto.dispatch(repo, proto, cmd)
275
275
276 if isinstance(rsp, bytes):
276 if isinstance(rsp, bytes):
277 wsgireq.respond(HTTP_OK, HGTYPE, body=rsp)
277 wsgireq.respond(HTTP_OK, HGTYPE, body=rsp)
278 return []
278 return []
279 elif isinstance(rsp, wireprototypes.bytesresponse):
279 elif isinstance(rsp, wireprototypes.bytesresponse):
280 wsgireq.respond(HTTP_OK, HGTYPE, body=rsp.data)
280 wsgireq.respond(HTTP_OK, HGTYPE, body=rsp.data)
281 return []
281 return []
282 elif isinstance(rsp, wireprototypes.streamreslegacy):
282 elif isinstance(rsp, wireprototypes.streamreslegacy):
283 gen = rsp.gen
283 gen = rsp.gen
284 wsgireq.respond(HTTP_OK, HGTYPE)
284 wsgireq.respond(HTTP_OK, HGTYPE)
285 return gen
285 return gen
286 elif isinstance(rsp, wireprototypes.streamres):
286 elif isinstance(rsp, wireprototypes.streamres):
287 gen = rsp.gen
287 gen = rsp.gen
288
288
289 # This code for compression should not be streamres specific. It
289 # This code for compression should not be streamres specific. It
290 # is here because we only compress streamres at the moment.
290 # is here because we only compress streamres at the moment.
291 mediatype, engine, engineopts = _httpresponsetype(
291 mediatype, engine, engineopts = _httpresponsetype(
292 repo.ui, wsgireq, rsp.prefer_uncompressed)
292 repo.ui, wsgireq, rsp.prefer_uncompressed)
293 gen = engine.compressstream(gen, engineopts)
293 gen = engine.compressstream(gen, engineopts)
294
294
295 if mediatype == HGTYPE2:
295 if mediatype == HGTYPE2:
296 gen = genversion2(gen, engine, engineopts)
296 gen = genversion2(gen, engine, engineopts)
297
297
298 wsgireq.respond(HTTP_OK, mediatype)
298 wsgireq.respond(HTTP_OK, mediatype)
299 return gen
299 return gen
300 elif isinstance(rsp, wireprototypes.pushres):
300 elif isinstance(rsp, wireprototypes.pushres):
301 rsp = '%d\n%s' % (rsp.res, rsp.output)
301 rsp = '%d\n%s' % (rsp.res, rsp.output)
302 wsgireq.respond(HTTP_OK, HGTYPE, body=rsp)
302 wsgireq.respond(HTTP_OK, HGTYPE, body=rsp)
303 return []
303 return []
304 elif isinstance(rsp, wireprototypes.pusherr):
304 elif isinstance(rsp, wireprototypes.pusherr):
305 # This is the httplib workaround documented in _handlehttperror().
305 # This is the httplib workaround documented in _handlehttperror().
306 wsgireq.drain()
306 wsgireq.drain()
307
307
308 rsp = '0\n%s\n' % rsp.res
308 rsp = '0\n%s\n' % rsp.res
309 wsgireq.respond(HTTP_OK, HGTYPE, body=rsp)
309 wsgireq.respond(HTTP_OK, HGTYPE, body=rsp)
310 return []
310 return []
311 elif isinstance(rsp, wireprototypes.ooberror):
311 elif isinstance(rsp, wireprototypes.ooberror):
312 rsp = rsp.message
312 rsp = rsp.message
313 wsgireq.respond(HTTP_OK, HGERRTYPE, body=rsp)
313 wsgireq.respond(HTTP_OK, HGERRTYPE, body=rsp)
314 return []
314 return []
315 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
315 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
316
316
317 def _handlehttperror(e, wsgireq, cmd):
317 def _handlehttperror(e, wsgireq, cmd):
318 """Called when an ErrorResponse is raised during HTTP request processing."""
318 """Called when an ErrorResponse is raised during HTTP request processing."""
319
319
320 # Clients using Python's httplib are stateful: the HTTP client
320 # Clients using Python's httplib are stateful: the HTTP client
321 # won't process an HTTP response until all request data is
321 # won't process an HTTP response until all request data is
322 # sent to the server. The intent of this code is to ensure
322 # sent to the server. The intent of this code is to ensure
323 # we always read HTTP request data from the client, thus
323 # we always read HTTP request data from the client, thus
324 # ensuring httplib transitions to a state that allows it to read
324 # ensuring httplib transitions to a state that allows it to read
325 # the HTTP response. In other words, it helps prevent deadlocks
325 # the HTTP response. In other words, it helps prevent deadlocks
326 # on clients using httplib.
326 # on clients using httplib.
327
327
328 if (wsgireq.env[r'REQUEST_METHOD'] == r'POST' and
328 if (wsgireq.env[r'REQUEST_METHOD'] == r'POST' and
329 # But not if Expect: 100-continue is being used.
329 # But not if Expect: 100-continue is being used.
330 (wsgireq.env.get('HTTP_EXPECT',
330 (wsgireq.env.get('HTTP_EXPECT',
331 '').lower() != '100-continue') or
331 '').lower() != '100-continue')):
332 # Or the non-httplib HTTP library is being advertised by
333 # the client.
334 wsgireq.env.get('X-HgHttp2', '')):
335 wsgireq.drain()
332 wsgireq.drain()
336 else:
333 else:
337 wsgireq.headers.append((r'Connection', r'Close'))
334 wsgireq.headers.append((r'Connection', r'Close'))
338
335
339 # TODO This response body assumes the failed command was
336 # TODO This response body assumes the failed command was
340 # "unbundle." That assumption is not always valid.
337 # "unbundle." That assumption is not always valid.
341 wsgireq.respond(e, HGTYPE, body='0\n%s\n' % pycompat.bytestr(e))
338 wsgireq.respond(e, HGTYPE, body='0\n%s\n' % pycompat.bytestr(e))
342
339
343 return ''
340 return ''
344
341
345 def _sshv1respondbytes(fout, value):
342 def _sshv1respondbytes(fout, value):
346 """Send a bytes response for protocol version 1."""
343 """Send a bytes response for protocol version 1."""
347 fout.write('%d\n' % len(value))
344 fout.write('%d\n' % len(value))
348 fout.write(value)
345 fout.write(value)
349 fout.flush()
346 fout.flush()
350
347
351 def _sshv1respondstream(fout, source):
348 def _sshv1respondstream(fout, source):
352 write = fout.write
349 write = fout.write
353 for chunk in source.gen:
350 for chunk in source.gen:
354 write(chunk)
351 write(chunk)
355 fout.flush()
352 fout.flush()
356
353
357 def _sshv1respondooberror(fout, ferr, rsp):
354 def _sshv1respondooberror(fout, ferr, rsp):
358 ferr.write(b'%s\n-\n' % rsp)
355 ferr.write(b'%s\n-\n' % rsp)
359 ferr.flush()
356 ferr.flush()
360 fout.write(b'\n')
357 fout.write(b'\n')
361 fout.flush()
358 fout.flush()
362
359
363 class sshv1protocolhandler(wireprototypes.baseprotocolhandler):
360 class sshv1protocolhandler(wireprototypes.baseprotocolhandler):
364 """Handler for requests services via version 1 of SSH protocol."""
361 """Handler for requests services via version 1 of SSH protocol."""
365 def __init__(self, ui, fin, fout):
362 def __init__(self, ui, fin, fout):
366 self._ui = ui
363 self._ui = ui
367 self._fin = fin
364 self._fin = fin
368 self._fout = fout
365 self._fout = fout
369
366
370 @property
367 @property
371 def name(self):
368 def name(self):
372 return wireprototypes.SSHV1
369 return wireprototypes.SSHV1
373
370
374 def getargs(self, args):
371 def getargs(self, args):
375 data = {}
372 data = {}
376 keys = args.split()
373 keys = args.split()
377 for n in xrange(len(keys)):
374 for n in xrange(len(keys)):
378 argline = self._fin.readline()[:-1]
375 argline = self._fin.readline()[:-1]
379 arg, l = argline.split()
376 arg, l = argline.split()
380 if arg not in keys:
377 if arg not in keys:
381 raise error.Abort(_("unexpected parameter %r") % arg)
378 raise error.Abort(_("unexpected parameter %r") % arg)
382 if arg == '*':
379 if arg == '*':
383 star = {}
380 star = {}
384 for k in xrange(int(l)):
381 for k in xrange(int(l)):
385 argline = self._fin.readline()[:-1]
382 argline = self._fin.readline()[:-1]
386 arg, l = argline.split()
383 arg, l = argline.split()
387 val = self._fin.read(int(l))
384 val = self._fin.read(int(l))
388 star[arg] = val
385 star[arg] = val
389 data['*'] = star
386 data['*'] = star
390 else:
387 else:
391 val = self._fin.read(int(l))
388 val = self._fin.read(int(l))
392 data[arg] = val
389 data[arg] = val
393 return [data[k] for k in keys]
390 return [data[k] for k in keys]
394
391
395 def forwardpayload(self, fpout):
392 def forwardpayload(self, fpout):
396 # We initially send an empty response. This tells the client it is
393 # We initially send an empty response. This tells the client it is
397 # OK to start sending data. If a client sees any other response, it
394 # OK to start sending data. If a client sees any other response, it
398 # interprets it as an error.
395 # interprets it as an error.
399 _sshv1respondbytes(self._fout, b'')
396 _sshv1respondbytes(self._fout, b'')
400
397
401 # The file is in the form:
398 # The file is in the form:
402 #
399 #
403 # <chunk size>\n<chunk>
400 # <chunk size>\n<chunk>
404 # ...
401 # ...
405 # 0\n
402 # 0\n
406 count = int(self._fin.readline())
403 count = int(self._fin.readline())
407 while count:
404 while count:
408 fpout.write(self._fin.read(count))
405 fpout.write(self._fin.read(count))
409 count = int(self._fin.readline())
406 count = int(self._fin.readline())
410
407
411 @contextlib.contextmanager
408 @contextlib.contextmanager
412 def mayberedirectstdio(self):
409 def mayberedirectstdio(self):
413 yield None
410 yield None
414
411
415 def client(self):
412 def client(self):
416 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
413 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
417 return 'remote:ssh:' + client
414 return 'remote:ssh:' + client
418
415
419 def addcapabilities(self, repo, caps):
416 def addcapabilities(self, repo, caps):
420 return caps
417 return caps
421
418
422 def checkperm(self, perm):
419 def checkperm(self, perm):
423 pass
420 pass
424
421
425 class sshv2protocolhandler(sshv1protocolhandler):
422 class sshv2protocolhandler(sshv1protocolhandler):
426 """Protocol handler for version 2 of the SSH protocol."""
423 """Protocol handler for version 2 of the SSH protocol."""
427
424
428 @property
425 @property
429 def name(self):
426 def name(self):
430 return wireprototypes.SSHV2
427 return wireprototypes.SSHV2
431
428
432 def _runsshserver(ui, repo, fin, fout, ev):
429 def _runsshserver(ui, repo, fin, fout, ev):
433 # This function operates like a state machine of sorts. The following
430 # This function operates like a state machine of sorts. The following
434 # states are defined:
431 # states are defined:
435 #
432 #
436 # protov1-serving
433 # protov1-serving
437 # Server is in protocol version 1 serving mode. Commands arrive on
434 # Server is in protocol version 1 serving mode. Commands arrive on
438 # new lines. These commands are processed in this state, one command
435 # new lines. These commands are processed in this state, one command
439 # after the other.
436 # after the other.
440 #
437 #
441 # protov2-serving
438 # protov2-serving
442 # Server is in protocol version 2 serving mode.
439 # Server is in protocol version 2 serving mode.
443 #
440 #
444 # upgrade-initial
441 # upgrade-initial
445 # The server is going to process an upgrade request.
442 # The server is going to process an upgrade request.
446 #
443 #
447 # upgrade-v2-filter-legacy-handshake
444 # upgrade-v2-filter-legacy-handshake
448 # The protocol is being upgraded to version 2. The server is expecting
445 # The protocol is being upgraded to version 2. The server is expecting
449 # the legacy handshake from version 1.
446 # the legacy handshake from version 1.
450 #
447 #
451 # upgrade-v2-finish
448 # upgrade-v2-finish
452 # The upgrade to version 2 of the protocol is imminent.
449 # The upgrade to version 2 of the protocol is imminent.
453 #
450 #
454 # shutdown
451 # shutdown
455 # The server is shutting down, possibly in reaction to a client event.
452 # The server is shutting down, possibly in reaction to a client event.
456 #
453 #
457 # And here are their transitions:
454 # And here are their transitions:
458 #
455 #
459 # protov1-serving -> shutdown
456 # protov1-serving -> shutdown
460 # When server receives an empty request or encounters another
457 # When server receives an empty request or encounters another
461 # error.
458 # error.
462 #
459 #
463 # protov1-serving -> upgrade-initial
460 # protov1-serving -> upgrade-initial
464 # An upgrade request line was seen.
461 # An upgrade request line was seen.
465 #
462 #
466 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
463 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
467 # Upgrade to version 2 in progress. Server is expecting to
464 # Upgrade to version 2 in progress. Server is expecting to
468 # process a legacy handshake.
465 # process a legacy handshake.
469 #
466 #
470 # upgrade-v2-filter-legacy-handshake -> shutdown
467 # upgrade-v2-filter-legacy-handshake -> shutdown
471 # Client did not fulfill upgrade handshake requirements.
468 # Client did not fulfill upgrade handshake requirements.
472 #
469 #
473 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
470 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
474 # Client fulfilled version 2 upgrade requirements. Finishing that
471 # Client fulfilled version 2 upgrade requirements. Finishing that
475 # upgrade.
472 # upgrade.
476 #
473 #
477 # upgrade-v2-finish -> protov2-serving
474 # upgrade-v2-finish -> protov2-serving
478 # Protocol upgrade to version 2 complete. Server can now speak protocol
475 # Protocol upgrade to version 2 complete. Server can now speak protocol
479 # version 2.
476 # version 2.
480 #
477 #
481 # protov2-serving -> protov1-serving
478 # protov2-serving -> protov1-serving
482 # Ths happens by default since protocol version 2 is the same as
479 # Ths happens by default since protocol version 2 is the same as
483 # version 1 except for the handshake.
480 # version 1 except for the handshake.
484
481
485 state = 'protov1-serving'
482 state = 'protov1-serving'
486 proto = sshv1protocolhandler(ui, fin, fout)
483 proto = sshv1protocolhandler(ui, fin, fout)
487 protoswitched = False
484 protoswitched = False
488
485
489 while not ev.is_set():
486 while not ev.is_set():
490 if state == 'protov1-serving':
487 if state == 'protov1-serving':
491 # Commands are issued on new lines.
488 # Commands are issued on new lines.
492 request = fin.readline()[:-1]
489 request = fin.readline()[:-1]
493
490
494 # Empty lines signal to terminate the connection.
491 # Empty lines signal to terminate the connection.
495 if not request:
492 if not request:
496 state = 'shutdown'
493 state = 'shutdown'
497 continue
494 continue
498
495
499 # It looks like a protocol upgrade request. Transition state to
496 # It looks like a protocol upgrade request. Transition state to
500 # handle it.
497 # handle it.
501 if request.startswith(b'upgrade '):
498 if request.startswith(b'upgrade '):
502 if protoswitched:
499 if protoswitched:
503 _sshv1respondooberror(fout, ui.ferr,
500 _sshv1respondooberror(fout, ui.ferr,
504 b'cannot upgrade protocols multiple '
501 b'cannot upgrade protocols multiple '
505 b'times')
502 b'times')
506 state = 'shutdown'
503 state = 'shutdown'
507 continue
504 continue
508
505
509 state = 'upgrade-initial'
506 state = 'upgrade-initial'
510 continue
507 continue
511
508
512 available = wireproto.commands.commandavailable(request, proto)
509 available = wireproto.commands.commandavailable(request, proto)
513
510
514 # This command isn't available. Send an empty response and go
511 # This command isn't available. Send an empty response and go
515 # back to waiting for a new command.
512 # back to waiting for a new command.
516 if not available:
513 if not available:
517 _sshv1respondbytes(fout, b'')
514 _sshv1respondbytes(fout, b'')
518 continue
515 continue
519
516
520 rsp = wireproto.dispatch(repo, proto, request)
517 rsp = wireproto.dispatch(repo, proto, request)
521
518
522 if isinstance(rsp, bytes):
519 if isinstance(rsp, bytes):
523 _sshv1respondbytes(fout, rsp)
520 _sshv1respondbytes(fout, rsp)
524 elif isinstance(rsp, wireprototypes.bytesresponse):
521 elif isinstance(rsp, wireprototypes.bytesresponse):
525 _sshv1respondbytes(fout, rsp.data)
522 _sshv1respondbytes(fout, rsp.data)
526 elif isinstance(rsp, wireprototypes.streamres):
523 elif isinstance(rsp, wireprototypes.streamres):
527 _sshv1respondstream(fout, rsp)
524 _sshv1respondstream(fout, rsp)
528 elif isinstance(rsp, wireprototypes.streamreslegacy):
525 elif isinstance(rsp, wireprototypes.streamreslegacy):
529 _sshv1respondstream(fout, rsp)
526 _sshv1respondstream(fout, rsp)
530 elif isinstance(rsp, wireprototypes.pushres):
527 elif isinstance(rsp, wireprototypes.pushres):
531 _sshv1respondbytes(fout, b'')
528 _sshv1respondbytes(fout, b'')
532 _sshv1respondbytes(fout, b'%d' % rsp.res)
529 _sshv1respondbytes(fout, b'%d' % rsp.res)
533 elif isinstance(rsp, wireprototypes.pusherr):
530 elif isinstance(rsp, wireprototypes.pusherr):
534 _sshv1respondbytes(fout, rsp.res)
531 _sshv1respondbytes(fout, rsp.res)
535 elif isinstance(rsp, wireprototypes.ooberror):
532 elif isinstance(rsp, wireprototypes.ooberror):
536 _sshv1respondooberror(fout, ui.ferr, rsp.message)
533 _sshv1respondooberror(fout, ui.ferr, rsp.message)
537 else:
534 else:
538 raise error.ProgrammingError('unhandled response type from '
535 raise error.ProgrammingError('unhandled response type from '
539 'wire protocol command: %s' % rsp)
536 'wire protocol command: %s' % rsp)
540
537
541 # For now, protocol version 2 serving just goes back to version 1.
538 # For now, protocol version 2 serving just goes back to version 1.
542 elif state == 'protov2-serving':
539 elif state == 'protov2-serving':
543 state = 'protov1-serving'
540 state = 'protov1-serving'
544 continue
541 continue
545
542
546 elif state == 'upgrade-initial':
543 elif state == 'upgrade-initial':
547 # We should never transition into this state if we've switched
544 # We should never transition into this state if we've switched
548 # protocols.
545 # protocols.
549 assert not protoswitched
546 assert not protoswitched
550 assert proto.name == wireprototypes.SSHV1
547 assert proto.name == wireprototypes.SSHV1
551
548
552 # Expected: upgrade <token> <capabilities>
549 # Expected: upgrade <token> <capabilities>
553 # If we get something else, the request is malformed. It could be
550 # If we get something else, the request is malformed. It could be
554 # from a future client that has altered the upgrade line content.
551 # from a future client that has altered the upgrade line content.
555 # We treat this as an unknown command.
552 # We treat this as an unknown command.
556 try:
553 try:
557 token, caps = request.split(b' ')[1:]
554 token, caps = request.split(b' ')[1:]
558 except ValueError:
555 except ValueError:
559 _sshv1respondbytes(fout, b'')
556 _sshv1respondbytes(fout, b'')
560 state = 'protov1-serving'
557 state = 'protov1-serving'
561 continue
558 continue
562
559
563 # Send empty response if we don't support upgrading protocols.
560 # Send empty response if we don't support upgrading protocols.
564 if not ui.configbool('experimental', 'sshserver.support-v2'):
561 if not ui.configbool('experimental', 'sshserver.support-v2'):
565 _sshv1respondbytes(fout, b'')
562 _sshv1respondbytes(fout, b'')
566 state = 'protov1-serving'
563 state = 'protov1-serving'
567 continue
564 continue
568
565
569 try:
566 try:
570 caps = urlreq.parseqs(caps)
567 caps = urlreq.parseqs(caps)
571 except ValueError:
568 except ValueError:
572 _sshv1respondbytes(fout, b'')
569 _sshv1respondbytes(fout, b'')
573 state = 'protov1-serving'
570 state = 'protov1-serving'
574 continue
571 continue
575
572
576 # We don't see an upgrade request to protocol version 2. Ignore
573 # We don't see an upgrade request to protocol version 2. Ignore
577 # the upgrade request.
574 # the upgrade request.
578 wantedprotos = caps.get(b'proto', [b''])[0]
575 wantedprotos = caps.get(b'proto', [b''])[0]
579 if SSHV2 not in wantedprotos:
576 if SSHV2 not in wantedprotos:
580 _sshv1respondbytes(fout, b'')
577 _sshv1respondbytes(fout, b'')
581 state = 'protov1-serving'
578 state = 'protov1-serving'
582 continue
579 continue
583
580
584 # It looks like we can honor this upgrade request to protocol 2.
581 # It looks like we can honor this upgrade request to protocol 2.
585 # Filter the rest of the handshake protocol request lines.
582 # Filter the rest of the handshake protocol request lines.
586 state = 'upgrade-v2-filter-legacy-handshake'
583 state = 'upgrade-v2-filter-legacy-handshake'
587 continue
584 continue
588
585
589 elif state == 'upgrade-v2-filter-legacy-handshake':
586 elif state == 'upgrade-v2-filter-legacy-handshake':
590 # Client should have sent legacy handshake after an ``upgrade``
587 # Client should have sent legacy handshake after an ``upgrade``
591 # request. Expected lines:
588 # request. Expected lines:
592 #
589 #
593 # hello
590 # hello
594 # between
591 # between
595 # pairs 81
592 # pairs 81
596 # 0000...-0000...
593 # 0000...-0000...
597
594
598 ok = True
595 ok = True
599 for line in (b'hello', b'between', b'pairs 81'):
596 for line in (b'hello', b'between', b'pairs 81'):
600 request = fin.readline()[:-1]
597 request = fin.readline()[:-1]
601
598
602 if request != line:
599 if request != line:
603 _sshv1respondooberror(fout, ui.ferr,
600 _sshv1respondooberror(fout, ui.ferr,
604 b'malformed handshake protocol: '
601 b'malformed handshake protocol: '
605 b'missing %s' % line)
602 b'missing %s' % line)
606 ok = False
603 ok = False
607 state = 'shutdown'
604 state = 'shutdown'
608 break
605 break
609
606
610 if not ok:
607 if not ok:
611 continue
608 continue
612
609
613 request = fin.read(81)
610 request = fin.read(81)
614 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
611 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
615 _sshv1respondooberror(fout, ui.ferr,
612 _sshv1respondooberror(fout, ui.ferr,
616 b'malformed handshake protocol: '
613 b'malformed handshake protocol: '
617 b'missing between argument value')
614 b'missing between argument value')
618 state = 'shutdown'
615 state = 'shutdown'
619 continue
616 continue
620
617
621 state = 'upgrade-v2-finish'
618 state = 'upgrade-v2-finish'
622 continue
619 continue
623
620
624 elif state == 'upgrade-v2-finish':
621 elif state == 'upgrade-v2-finish':
625 # Send the upgrade response.
622 # Send the upgrade response.
626 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
623 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
627 servercaps = wireproto.capabilities(repo, proto)
624 servercaps = wireproto.capabilities(repo, proto)
628 rsp = b'capabilities: %s' % servercaps.data
625 rsp = b'capabilities: %s' % servercaps.data
629 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
626 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
630 fout.flush()
627 fout.flush()
631
628
632 proto = sshv2protocolhandler(ui, fin, fout)
629 proto = sshv2protocolhandler(ui, fin, fout)
633 protoswitched = True
630 protoswitched = True
634
631
635 state = 'protov2-serving'
632 state = 'protov2-serving'
636 continue
633 continue
637
634
638 elif state == 'shutdown':
635 elif state == 'shutdown':
639 break
636 break
640
637
641 else:
638 else:
642 raise error.ProgrammingError('unhandled ssh server state: %s' %
639 raise error.ProgrammingError('unhandled ssh server state: %s' %
643 state)
640 state)
644
641
645 class sshserver(object):
642 class sshserver(object):
646 def __init__(self, ui, repo, logfh=None):
643 def __init__(self, ui, repo, logfh=None):
647 self._ui = ui
644 self._ui = ui
648 self._repo = repo
645 self._repo = repo
649 self._fin = ui.fin
646 self._fin = ui.fin
650 self._fout = ui.fout
647 self._fout = ui.fout
651
648
652 # Log write I/O to stdout and stderr if configured.
649 # Log write I/O to stdout and stderr if configured.
653 if logfh:
650 if logfh:
654 self._fout = util.makeloggingfileobject(
651 self._fout = util.makeloggingfileobject(
655 logfh, self._fout, 'o', logdata=True)
652 logfh, self._fout, 'o', logdata=True)
656 ui.ferr = util.makeloggingfileobject(
653 ui.ferr = util.makeloggingfileobject(
657 logfh, ui.ferr, 'e', logdata=True)
654 logfh, ui.ferr, 'e', logdata=True)
658
655
659 hook.redirect(True)
656 hook.redirect(True)
660 ui.fout = repo.ui.fout = ui.ferr
657 ui.fout = repo.ui.fout = ui.ferr
661
658
662 # Prevent insertion/deletion of CRs
659 # Prevent insertion/deletion of CRs
663 util.setbinary(self._fin)
660 util.setbinary(self._fin)
664 util.setbinary(self._fout)
661 util.setbinary(self._fout)
665
662
666 def serve_forever(self):
663 def serve_forever(self):
667 self.serveuntil(threading.Event())
664 self.serveuntil(threading.Event())
668 sys.exit(0)
665 sys.exit(0)
669
666
670 def serveuntil(self, ev):
667 def serveuntil(self, ev):
671 """Serve until a threading.Event is set."""
668 """Serve until a threading.Event is set."""
672 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
669 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
General Comments 0
You need to be logged in to leave comments. Login now