##// END OF EJS Templates
wireprotoserver: document and improve the httplib workaround...
Gregory Szorc -
r36005:6010fe1d default
parent child Browse files
Show More
@@ -1,421 +1,434 b''
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import abc
10 10 import cgi
11 11 import struct
12 12 import sys
13 13
14 14 from .i18n import _
15 15 from . import (
16 16 encoding,
17 17 error,
18 18 hook,
19 19 pycompat,
20 20 util,
21 21 wireproto,
22 22 )
23 23
24 24 stringio = util.stringio
25 25
26 26 urlerr = util.urlerr
27 27 urlreq = util.urlreq
28 28
29 29 HTTP_OK = 200
30 30
31 31 HGTYPE = 'application/mercurial-0.1'
32 32 HGTYPE2 = 'application/mercurial-0.2'
33 33 HGERRTYPE = 'application/hg-error'
34 34
35 35 # Names of the SSH protocol implementations.
36 36 SSHV1 = 'ssh-v1'
37 37 # This is advertised over the wire. Incremental the counter at the end
38 38 # to reflect BC breakages.
39 39 SSHV2 = 'exp-ssh-v2-0001'
40 40
41 41 class abstractserverproto(object):
42 42 """abstract class that summarizes the protocol API
43 43
44 44 Used as reference and documentation.
45 45 """
46 46
47 47 __metaclass__ = abc.ABCMeta
48 48
49 49 @abc.abstractproperty
50 50 def name(self):
51 51 """The name of the protocol implementation.
52 52
53 53 Used for uniquely identifying the transport type.
54 54 """
55 55
56 56 @abc.abstractmethod
57 57 def getargs(self, args):
58 58 """return the value for arguments in <args>
59 59
60 60 returns a list of values (same order as <args>)"""
61 61
62 62 @abc.abstractmethod
63 63 def getfile(self, fp):
64 64 """write the whole content of a file into a file like object
65 65
66 66 The file is in the form::
67 67
68 68 (<chunk-size>\n<chunk>)+0\n
69 69
70 70 chunk size is the ascii version of the int.
71 71 """
72 72
73 73 @abc.abstractmethod
74 74 def redirect(self):
75 75 """may setup interception for stdout and stderr
76 76
77 77 See also the `restore` method."""
78 78
79 79 # If the `redirect` function does install interception, the `restore`
80 80 # function MUST be defined. If interception is not used, this function
81 81 # MUST NOT be defined.
82 82 #
83 83 # left commented here on purpose
84 84 #
85 85 #def restore(self):
86 86 # """reinstall previous stdout and stderr and return intercepted stdout
87 87 # """
88 88 # raise NotImplementedError()
89 89
90 90 def decodevaluefromheaders(req, headerprefix):
91 91 """Decode a long value from multiple HTTP request headers.
92 92
93 93 Returns the value as a bytes, not a str.
94 94 """
95 95 chunks = []
96 96 i = 1
97 97 prefix = headerprefix.upper().replace(r'-', r'_')
98 98 while True:
99 99 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
100 100 if v is None:
101 101 break
102 102 chunks.append(pycompat.bytesurl(v))
103 103 i += 1
104 104
105 105 return ''.join(chunks)
106 106
107 107 class webproto(abstractserverproto):
108 108 def __init__(self, req, ui):
109 109 self._req = req
110 110 self._ui = ui
111 111
112 112 @property
113 113 def name(self):
114 114 return 'http'
115 115
116 116 def getargs(self, args):
117 117 knownargs = self._args()
118 118 data = {}
119 119 keys = args.split()
120 120 for k in keys:
121 121 if k == '*':
122 122 star = {}
123 123 for key in knownargs.keys():
124 124 if key != 'cmd' and key not in keys:
125 125 star[key] = knownargs[key][0]
126 126 data['*'] = star
127 127 else:
128 128 data[k] = knownargs[k][0]
129 129 return [data[k] for k in keys]
130 130
131 131 def _args(self):
132 132 args = util.rapply(pycompat.bytesurl, self._req.form.copy())
133 133 postlen = int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
134 134 if postlen:
135 135 args.update(cgi.parse_qs(
136 136 self._req.read(postlen), keep_blank_values=True))
137 137 return args
138 138
139 139 argvalue = decodevaluefromheaders(self._req, r'X-HgArg')
140 140 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
141 141 return args
142 142
143 143 def getfile(self, fp):
144 144 length = int(self._req.env[r'CONTENT_LENGTH'])
145 145 # If httppostargs is used, we need to read Content-Length
146 146 # minus the amount that was consumed by args.
147 147 length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
148 148 for s in util.filechunkiter(self._req, limit=length):
149 149 fp.write(s)
150 150
151 151 def redirect(self):
152 152 self._oldio = self._ui.fout, self._ui.ferr
153 153 self._ui.ferr = self._ui.fout = stringio()
154 154
155 155 def restore(self):
156 156 val = self._ui.fout.getvalue()
157 157 self._ui.ferr, self._ui.fout = self._oldio
158 158 return val
159 159
160 160 def _client(self):
161 161 return 'remote:%s:%s:%s' % (
162 162 self._req.env.get('wsgi.url_scheme') or 'http',
163 163 urlreq.quote(self._req.env.get('REMOTE_HOST', '')),
164 164 urlreq.quote(self._req.env.get('REMOTE_USER', '')))
165 165
166 166 def responsetype(self, prefer_uncompressed):
167 167 """Determine the appropriate response type and compression settings.
168 168
169 169 Returns a tuple of (mediatype, compengine, engineopts).
170 170 """
171 171 # Determine the response media type and compression engine based
172 172 # on the request parameters.
173 173 protocaps = decodevaluefromheaders(self._req, r'X-HgProto').split(' ')
174 174
175 175 if '0.2' in protocaps:
176 176 # All clients are expected to support uncompressed data.
177 177 if prefer_uncompressed:
178 178 return HGTYPE2, util._noopengine(), {}
179 179
180 180 # Default as defined by wire protocol spec.
181 181 compformats = ['zlib', 'none']
182 182 for cap in protocaps:
183 183 if cap.startswith('comp='):
184 184 compformats = cap[5:].split(',')
185 185 break
186 186
187 187 # Now find an agreed upon compression format.
188 188 for engine in wireproto.supportedcompengines(self._ui, self,
189 189 util.SERVERROLE):
190 190 if engine.wireprotosupport().name in compformats:
191 191 opts = {}
192 192 level = self._ui.configint('server',
193 193 '%slevel' % engine.name())
194 194 if level is not None:
195 195 opts['level'] = level
196 196
197 197 return HGTYPE2, engine, opts
198 198
199 199 # No mutually supported compression format. Fall back to the
200 200 # legacy protocol.
201 201
202 202 # Don't allow untrusted settings because disabling compression or
203 203 # setting a very high compression level could lead to flooding
204 204 # the server's network or CPU.
205 205 opts = {'level': self._ui.configint('server', 'zliblevel')}
206 206 return HGTYPE, util.compengines['zlib'], opts
207 207
208 208 def iscmd(cmd):
209 209 return cmd in wireproto.commands
210 210
211 211 def parsehttprequest(repo, req, query):
212 212 """Parse the HTTP request for a wire protocol request.
213 213
214 214 If the current request appears to be a wire protocol request, this
215 215 function returns a dict with details about that request, including
216 216 an ``abstractprotocolserver`` instance suitable for handling the
217 217 request. Otherwise, ``None`` is returned.
218 218
219 219 ``req`` is a ``wsgirequest`` instance.
220 220 """
221 221 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
222 222 # string parameter. If it isn't present, this isn't a wire protocol
223 223 # request.
224 224 if r'cmd' not in req.form:
225 225 return None
226 226
227 227 cmd = pycompat.sysbytes(req.form[r'cmd'][0])
228 228
229 229 # The "cmd" request parameter is used by both the wire protocol and hgweb.
230 230 # While not all wire protocol commands are available for all transports,
231 231 # if we see a "cmd" value that resembles a known wire protocol command, we
232 232 # route it to a protocol handler. This is better than routing possible
233 233 # wire protocol requests to hgweb because it prevents hgweb from using
234 234 # known wire protocol commands and it is less confusing for machine
235 235 # clients.
236 236 if cmd not in wireproto.commands:
237 237 return None
238 238
239 239 proto = webproto(req, repo.ui)
240 240
241 241 return {
242 242 'cmd': cmd,
243 243 'proto': proto,
244 244 'dispatch': lambda: _callhttp(repo, req, proto, cmd),
245 245 'handleerror': lambda ex: _handlehttperror(ex, req, cmd),
246 246 }
247 247
248 248 def _callhttp(repo, req, proto, cmd):
249 249 def genversion2(gen, engine, engineopts):
250 250 # application/mercurial-0.2 always sends a payload header
251 251 # identifying the compression engine.
252 252 name = engine.wireprotosupport().name
253 253 assert 0 < len(name) < 256
254 254 yield struct.pack('B', len(name))
255 255 yield name
256 256
257 257 for chunk in gen:
258 258 yield chunk
259 259
260 260 rsp = wireproto.dispatch(repo, proto, cmd)
261 261
262 262 if not wireproto.commands.commandavailable(cmd, proto):
263 263 req.respond(HTTP_OK, HGERRTYPE,
264 264 body=_('requested wire protocol command is not available '
265 265 'over HTTP'))
266 266 return []
267 267
268 268 if isinstance(rsp, bytes):
269 269 req.respond(HTTP_OK, HGTYPE, body=rsp)
270 270 return []
271 271 elif isinstance(rsp, wireproto.streamres_legacy):
272 272 gen = rsp.gen
273 273 req.respond(HTTP_OK, HGTYPE)
274 274 return gen
275 275 elif isinstance(rsp, wireproto.streamres):
276 276 gen = rsp.gen
277 277
278 278 # This code for compression should not be streamres specific. It
279 279 # is here because we only compress streamres at the moment.
280 280 mediatype, engine, engineopts = proto.responsetype(
281 281 rsp.prefer_uncompressed)
282 282 gen = engine.compressstream(gen, engineopts)
283 283
284 284 if mediatype == HGTYPE2:
285 285 gen = genversion2(gen, engine, engineopts)
286 286
287 287 req.respond(HTTP_OK, mediatype)
288 288 return gen
289 289 elif isinstance(rsp, wireproto.pushres):
290 290 val = proto.restore()
291 291 rsp = '%d\n%s' % (rsp.res, val)
292 292 req.respond(HTTP_OK, HGTYPE, body=rsp)
293 293 return []
294 294 elif isinstance(rsp, wireproto.pusherr):
295 # drain the incoming bundle
295 # This is the httplib workaround documented in _handlehttperror().
296 296 req.drain()
297
297 298 proto.restore()
298 299 rsp = '0\n%s\n' % rsp.res
299 300 req.respond(HTTP_OK, HGTYPE, body=rsp)
300 301 return []
301 302 elif isinstance(rsp, wireproto.ooberror):
302 303 rsp = rsp.message
303 304 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
304 305 return []
305 306 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
306 307
307 308 def _handlehttperror(e, req, cmd):
308 309 """Called when an ErrorResponse is raised during HTTP request processing."""
309 # A client that sends unbundle without 100-continue will
310 # break if we respond early.
311 if (cmd == 'unbundle' and
310
311 # Clients using Python's httplib are stateful: the HTTP client
312 # won't process an HTTP response until all request data is
313 # sent to the server. The intent of this code is to ensure
314 # we always read HTTP request data from the client, thus
315 # ensuring httplib transitions to a state that allows it to read
316 # the HTTP response. In other words, it helps prevent deadlocks
317 # on clients using httplib.
318
319 if (req.env[r'REQUEST_METHOD'] == r'POST' and
320 # But not if Expect: 100-continue is being used.
312 321 (req.env.get('HTTP_EXPECT',
313 322 '').lower() != '100-continue') or
323 # Or the non-httplib HTTP library is being advertised by
324 # the client.
314 325 req.env.get('X-HgHttp2', '')):
315 326 req.drain()
316 327 else:
317 328 req.headers.append((r'Connection', r'Close'))
318 329
330 # TODO This response body assumes the failed command was
331 # "unbundle." That assumption is not always valid.
319 332 req.respond(e, HGTYPE, body='0\n%s\n' % e)
320 333
321 334 return ''
322 335
323 336 class sshserver(abstractserverproto):
324 337 def __init__(self, ui, repo):
325 338 self._ui = ui
326 339 self._repo = repo
327 340 self._fin = ui.fin
328 341 self._fout = ui.fout
329 342
330 343 hook.redirect(True)
331 344 ui.fout = repo.ui.fout = ui.ferr
332 345
333 346 # Prevent insertion/deletion of CRs
334 347 util.setbinary(self._fin)
335 348 util.setbinary(self._fout)
336 349
337 350 @property
338 351 def name(self):
339 352 return 'ssh'
340 353
341 354 def getargs(self, args):
342 355 data = {}
343 356 keys = args.split()
344 357 for n in xrange(len(keys)):
345 358 argline = self._fin.readline()[:-1]
346 359 arg, l = argline.split()
347 360 if arg not in keys:
348 361 raise error.Abort(_("unexpected parameter %r") % arg)
349 362 if arg == '*':
350 363 star = {}
351 364 for k in xrange(int(l)):
352 365 argline = self._fin.readline()[:-1]
353 366 arg, l = argline.split()
354 367 val = self._fin.read(int(l))
355 368 star[arg] = val
356 369 data['*'] = star
357 370 else:
358 371 val = self._fin.read(int(l))
359 372 data[arg] = val
360 373 return [data[k] for k in keys]
361 374
362 375 def getfile(self, fpout):
363 376 self._sendresponse('')
364 377 count = int(self._fin.readline())
365 378 while count:
366 379 fpout.write(self._fin.read(count))
367 380 count = int(self._fin.readline())
368 381
369 382 def redirect(self):
370 383 pass
371 384
372 385 def _sendresponse(self, v):
373 386 self._fout.write("%d\n" % len(v))
374 387 self._fout.write(v)
375 388 self._fout.flush()
376 389
377 390 def _sendstream(self, source):
378 391 write = self._fout.write
379 392 for chunk in source.gen:
380 393 write(chunk)
381 394 self._fout.flush()
382 395
383 396 def _sendpushresponse(self, rsp):
384 397 self._sendresponse('')
385 398 self._sendresponse(str(rsp.res))
386 399
387 400 def _sendpusherror(self, rsp):
388 401 self._sendresponse(rsp.res)
389 402
390 403 def _sendooberror(self, rsp):
391 404 self._ui.ferr.write('%s\n-\n' % rsp.message)
392 405 self._ui.ferr.flush()
393 406 self._fout.write('\n')
394 407 self._fout.flush()
395 408
396 409 def serve_forever(self):
397 410 while self.serve_one():
398 411 pass
399 412 sys.exit(0)
400 413
401 414 _handlers = {
402 415 str: _sendresponse,
403 416 wireproto.streamres: _sendstream,
404 417 wireproto.streamres_legacy: _sendstream,
405 418 wireproto.pushres: _sendpushresponse,
406 419 wireproto.pusherr: _sendpusherror,
407 420 wireproto.ooberror: _sendooberror,
408 421 }
409 422
410 423 def serve_one(self):
411 424 cmd = self._fin.readline()[:-1]
412 425 if cmd and wireproto.commands.commandavailable(cmd, self):
413 426 rsp = wireproto.dispatch(self._repo, self, cmd)
414 427 self._handlers[rsp.__class__](self, rsp)
415 428 elif cmd:
416 429 self._sendresponse("")
417 430 return cmd != ''
418 431
419 432 def _client(self):
420 433 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
421 434 return 'remote:ssh:' + client
General Comments 0
You need to be logged in to leave comments. Login now