##// END OF EJS Templates
wireproto: turn client capabilities into sets, sorted on the wire...
Joerg Sonnenberger -
r37429:3e168871 default
parent child Browse files
Show More
@@ -1,502 +1,502 b''
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 16 import tempfile
17 17
18 18 from .i18n import _
19 19 from . import (
20 20 bundle2,
21 21 error,
22 22 httpconnection,
23 23 pycompat,
24 24 statichttprepo,
25 25 url as urlmod,
26 26 util,
27 27 wireproto,
28 28 )
29 29
30 30 httplib = util.httplib
31 31 urlerr = util.urlerr
32 32 urlreq = util.urlreq
33 33
34 34 def encodevalueinheaders(value, header, limit):
35 35 """Encode a string value into multiple HTTP headers.
36 36
37 37 ``value`` will be encoded into 1 or more HTTP headers with the names
38 38 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
39 39 name + value will be at most ``limit`` bytes long.
40 40
41 41 Returns an iterable of 2-tuples consisting of header names and
42 42 values as native strings.
43 43 """
44 44 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
45 45 # not bytes. This function always takes bytes in as arguments.
46 46 fmt = pycompat.strurl(header) + r'-%s'
47 47 # Note: it is *NOT* a bug that the last bit here is a bytestring
48 48 # and not a unicode: we're just getting the encoded length anyway,
49 49 # and using an r-string to make it portable between Python 2 and 3
50 50 # doesn't work because then the \r is a literal backslash-r
51 51 # instead of a carriage return.
52 52 valuelen = limit - len(fmt % r'000') - len(': \r\n')
53 53 result = []
54 54
55 55 n = 0
56 56 for i in xrange(0, len(value), valuelen):
57 57 n += 1
58 58 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
59 59
60 60 return result
61 61
62 62 def _wraphttpresponse(resp):
63 63 """Wrap an HTTPResponse with common error handlers.
64 64
65 65 This ensures that any I/O from any consumer raises the appropriate
66 66 error and messaging.
67 67 """
68 68 origread = resp.read
69 69
70 70 class readerproxy(resp.__class__):
71 71 def read(self, size=None):
72 72 try:
73 73 return origread(size)
74 74 except httplib.IncompleteRead as e:
75 75 # e.expected is an integer if length known or None otherwise.
76 76 if e.expected:
77 77 msg = _('HTTP request error (incomplete response; '
78 78 'expected %d bytes got %d)') % (e.expected,
79 79 len(e.partial))
80 80 else:
81 81 msg = _('HTTP request error (incomplete response)')
82 82
83 83 raise error.PeerTransportError(
84 84 msg,
85 85 hint=_('this may be an intermittent network failure; '
86 86 'if the error persists, consider contacting the '
87 87 'network or server operator'))
88 88 except httplib.HTTPException as e:
89 89 raise error.PeerTransportError(
90 90 _('HTTP request error (%s)') % e,
91 91 hint=_('this may be an intermittent network failure; '
92 92 'if the error persists, consider contacting the '
93 93 'network or server operator'))
94 94
95 95 resp.__class__ = readerproxy
96 96
97 97 class _multifile(object):
98 98 def __init__(self, *fileobjs):
99 99 for f in fileobjs:
100 100 if not util.safehasattr(f, 'length'):
101 101 raise ValueError(
102 102 '_multifile only supports file objects that '
103 103 'have a length but this one does not:', type(f), f)
104 104 self._fileobjs = fileobjs
105 105 self._index = 0
106 106
107 107 @property
108 108 def length(self):
109 109 return sum(f.length for f in self._fileobjs)
110 110
111 111 def read(self, amt=None):
112 112 if amt <= 0:
113 113 return ''.join(f.read() for f in self._fileobjs)
114 114 parts = []
115 115 while amt and self._index < len(self._fileobjs):
116 116 parts.append(self._fileobjs[self._index].read(amt))
117 117 got = len(parts[-1])
118 118 if got < amt:
119 119 self._index += 1
120 120 amt -= got
121 121 return ''.join(parts)
122 122
123 123 def seek(self, offset, whence=os.SEEK_SET):
124 124 if whence != os.SEEK_SET:
125 125 raise NotImplementedError(
126 126 '_multifile does not support anything other'
127 127 ' than os.SEEK_SET for whence on seek()')
128 128 if offset != 0:
129 129 raise NotImplementedError(
130 130 '_multifile only supports seeking to start, but that '
131 131 'could be fixed if you need it')
132 132 for f in self._fileobjs:
133 133 f.seek(0)
134 134 self._index = 0
135 135
136 136 class httppeer(wireproto.wirepeer):
137 137 def __init__(self, ui, path, url, opener):
138 138 self.ui = ui
139 139 self._path = path
140 140 self._url = url
141 141 self._caps = None
142 142 self._urlopener = opener
143 143 # This is an its own attribute to facilitate extensions overriding
144 144 # the default type.
145 145 self._requestbuilder = urlreq.request
146 146
147 147 def __del__(self):
148 148 for h in self._urlopener.handlers:
149 149 h.close()
150 150 getattr(h, "close_all", lambda: None)()
151 151
152 152 def _openurl(self, req):
153 153 if (self.ui.debugflag
154 154 and self.ui.configbool('devel', 'debug.peer-request')):
155 155 dbg = self.ui.debug
156 156 line = 'devel-peer-request: %s\n'
157 157 dbg(line % '%s %s' % (req.get_method(), req.get_full_url()))
158 158 hgargssize = None
159 159
160 160 for header, value in sorted(req.header_items()):
161 161 if header.startswith('X-hgarg-'):
162 162 if hgargssize is None:
163 163 hgargssize = 0
164 164 hgargssize += len(value)
165 165 else:
166 166 dbg(line % ' %s %s' % (header, value))
167 167
168 168 if hgargssize is not None:
169 169 dbg(line % ' %d bytes of commands arguments in headers'
170 170 % hgargssize)
171 171
172 172 if req.has_data():
173 173 data = req.get_data()
174 174 length = getattr(data, 'length', None)
175 175 if length is None:
176 176 length = len(data)
177 177 dbg(line % ' %d bytes of data' % length)
178 178
179 179 start = util.timer()
180 180
181 181 ret = self._urlopener.open(req)
182 182 if self.ui.configbool('devel', 'debug.peer-request'):
183 183 dbg(line % ' finished in %.4f seconds (%s)'
184 184 % (util.timer() - start, ret.code))
185 185 return ret
186 186
187 187 # Begin of ipeerconnection interface.
188 188
189 189 def url(self):
190 190 return self._path
191 191
192 192 def local(self):
193 193 return None
194 194
195 195 def peer(self):
196 196 return self
197 197
198 198 def canpush(self):
199 199 return True
200 200
201 201 def close(self):
202 202 pass
203 203
204 204 # End of ipeerconnection interface.
205 205
206 206 # Begin of ipeercommands interface.
207 207
208 208 def capabilities(self):
209 209 # self._fetchcaps() should have been called as part of peer
210 210 # handshake. So self._caps should always be set.
211 211 assert self._caps is not None
212 212 return self._caps
213 213
214 214 # End of ipeercommands interface.
215 215
216 216 # look up capabilities only when needed
217 217
218 218 def _fetchcaps(self):
219 219 self._caps = set(self._call('capabilities').split())
220 220
221 221 def _callstream(self, cmd, _compressible=False, **args):
222 222 args = pycompat.byteskwargs(args)
223 223 if cmd == 'pushkey':
224 224 args['data'] = ''
225 225 data = args.pop('data', None)
226 226 headers = args.pop('headers', {})
227 227
228 228 self.ui.debug("sending %s command\n" % cmd)
229 229 q = [('cmd', cmd)]
230 230 headersize = 0
231 231 varyheaders = []
232 232 # Important: don't use self.capable() here or else you end up
233 233 # with infinite recursion when trying to look up capabilities
234 234 # for the first time.
235 235 postargsok = self._caps is not None and 'httppostargs' in self._caps
236 236
237 237 # Send arguments via POST.
238 238 if postargsok and args:
239 239 strargs = urlreq.urlencode(sorted(args.items()))
240 240 if not data:
241 241 data = strargs
242 242 else:
243 243 if isinstance(data, bytes):
244 244 i = io.BytesIO(data)
245 245 i.length = len(data)
246 246 data = i
247 247 argsio = io.BytesIO(strargs)
248 248 argsio.length = len(strargs)
249 249 data = _multifile(argsio, data)
250 250 headers[r'X-HgArgs-Post'] = len(strargs)
251 251 elif args:
252 252 # Calling self.capable() can infinite loop if we are calling
253 253 # "capabilities". But that command should never accept wire
254 254 # protocol arguments. So this should never happen.
255 255 assert cmd != 'capabilities'
256 256 httpheader = self.capable('httpheader')
257 257 if httpheader:
258 258 headersize = int(httpheader.split(',', 1)[0])
259 259
260 260 # Send arguments via HTTP headers.
261 261 if headersize > 0:
262 262 # The headers can typically carry more data than the URL.
263 263 encargs = urlreq.urlencode(sorted(args.items()))
264 264 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
265 265 headersize):
266 266 headers[header] = value
267 267 varyheaders.append(header)
268 268 # Send arguments via query string (Mercurial <1.9).
269 269 else:
270 270 q += sorted(args.items())
271 271
272 272 qs = '?%s' % urlreq.urlencode(q)
273 273 cu = "%s%s" % (self._url, qs)
274 274 size = 0
275 275 if util.safehasattr(data, 'length'):
276 276 size = data.length
277 277 elif data is not None:
278 278 size = len(data)
279 279 if data is not None and r'Content-Type' not in headers:
280 280 headers[r'Content-Type'] = r'application/mercurial-0.1'
281 281
282 282 # Tell the server we accept application/mercurial-0.2 and multiple
283 283 # compression formats if the server is capable of emitting those
284 284 # payloads.
285 protoparams = []
285 protoparams = set()
286 286
287 287 mediatypes = set()
288 288 if self._caps is not None:
289 289 mt = self.capable('httpmediatype')
290 290 if mt:
291 protoparams.append('0.1')
291 protoparams.add('0.1')
292 292 mediatypes = set(mt.split(','))
293 293
294 294 if '0.2tx' in mediatypes:
295 protoparams.append('0.2')
295 protoparams.add('0.2')
296 296
297 297 if '0.2tx' in mediatypes and self.capable('compression'):
298 298 # We /could/ compare supported compression formats and prune
299 299 # non-mutually supported or error if nothing is mutually supported.
300 300 # For now, send the full list to the server and have it error.
301 301 comps = [e.wireprotosupport().name for e in
302 302 util.compengines.supportedwireengines(util.CLIENTROLE)]
303 protoparams.append('comp=%s' % ','.join(comps))
303 protoparams.add('comp=%s' % ','.join(comps))
304 304
305 305 if protoparams:
306 protoheaders = encodevalueinheaders(' '.join(protoparams),
306 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
307 307 'X-HgProto',
308 308 headersize or 1024)
309 309 for header, value in protoheaders:
310 310 headers[header] = value
311 311 varyheaders.append(header)
312 312
313 313 if varyheaders:
314 314 headers[r'Vary'] = r','.join(varyheaders)
315 315
316 316 req = self._requestbuilder(pycompat.strurl(cu), data, headers)
317 317
318 318 if data is not None:
319 319 self.ui.debug("sending %d bytes\n" % size)
320 320 req.add_unredirected_header(r'Content-Length', r'%d' % size)
321 321 try:
322 322 resp = self._openurl(req)
323 323 except urlerr.httperror as inst:
324 324 if inst.code == 401:
325 325 raise error.Abort(_('authorization failed'))
326 326 raise
327 327 except httplib.HTTPException as inst:
328 328 self.ui.debug('http error while sending %s command\n' % cmd)
329 329 self.ui.traceback()
330 330 raise IOError(None, inst)
331 331
332 332 # Insert error handlers for common I/O failures.
333 333 _wraphttpresponse(resp)
334 334
335 335 # record the url we got redirected to
336 336 resp_url = pycompat.bytesurl(resp.geturl())
337 337 if resp_url.endswith(qs):
338 338 resp_url = resp_url[:-len(qs)]
339 339 if self._url.rstrip('/') != resp_url.rstrip('/'):
340 340 if not self.ui.quiet:
341 341 self.ui.warn(_('real URL is %s\n') % resp_url)
342 342 self._url = resp_url
343 343 try:
344 344 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
345 345 except AttributeError:
346 346 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
347 347
348 348 safeurl = util.hidepassword(self._url)
349 349 if proto.startswith('application/hg-error'):
350 350 raise error.OutOfBandError(resp.read())
351 351 # accept old "text/plain" and "application/hg-changegroup" for now
352 352 if not (proto.startswith('application/mercurial-') or
353 353 (proto.startswith('text/plain')
354 354 and not resp.headers.get('content-length')) or
355 355 proto.startswith('application/hg-changegroup')):
356 356 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
357 357 raise error.RepoError(
358 358 _("'%s' does not appear to be an hg repository:\n"
359 359 "---%%<--- (%s)\n%s\n---%%<---\n")
360 360 % (safeurl, proto or 'no content-type', resp.read(1024)))
361 361
362 362 if proto.startswith('application/mercurial-'):
363 363 try:
364 364 version = proto.split('-', 1)[1]
365 365 version_info = tuple([int(n) for n in version.split('.')])
366 366 except ValueError:
367 367 raise error.RepoError(_("'%s' sent a broken Content-Type "
368 368 "header (%s)") % (safeurl, proto))
369 369
370 370 # TODO consider switching to a decompression reader that uses
371 371 # generators.
372 372 if version_info == (0, 1):
373 373 if _compressible:
374 374 return util.compengines['zlib'].decompressorreader(resp)
375 375 return resp
376 376 elif version_info == (0, 2):
377 377 # application/mercurial-0.2 always identifies the compression
378 378 # engine in the payload header.
379 379 elen = struct.unpack('B', resp.read(1))[0]
380 380 ename = resp.read(elen)
381 381 engine = util.compengines.forwiretype(ename)
382 382 return engine.decompressorreader(resp)
383 383 else:
384 384 raise error.RepoError(_("'%s' uses newer protocol %s") %
385 385 (safeurl, version))
386 386
387 387 if _compressible:
388 388 return util.compengines['zlib'].decompressorreader(resp)
389 389
390 390 return resp
391 391
392 392 def _call(self, cmd, **args):
393 393 fp = self._callstream(cmd, **args)
394 394 try:
395 395 return fp.read()
396 396 finally:
397 397 # if using keepalive, allow connection to be reused
398 398 fp.close()
399 399
400 400 def _callpush(self, cmd, cg, **args):
401 401 # have to stream bundle to a temp file because we do not have
402 402 # http 1.1 chunked transfer.
403 403
404 404 types = self.capable('unbundle')
405 405 try:
406 406 types = types.split(',')
407 407 except AttributeError:
408 408 # servers older than d1b16a746db6 will send 'unbundle' as a
409 409 # boolean capability. They only support headerless/uncompressed
410 410 # bundles.
411 411 types = [""]
412 412 for x in types:
413 413 if x in bundle2.bundletypes:
414 414 type = x
415 415 break
416 416
417 417 tempname = bundle2.writebundle(self.ui, cg, None, type)
418 418 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
419 419 headers = {r'Content-Type': r'application/mercurial-0.1'}
420 420
421 421 try:
422 422 r = self._call(cmd, data=fp, headers=headers, **args)
423 423 vals = r.split('\n', 1)
424 424 if len(vals) < 2:
425 425 raise error.ResponseError(_("unexpected response:"), r)
426 426 return vals
427 427 except urlerr.httperror:
428 428 # Catch and re-raise these so we don't try and treat them
429 429 # like generic socket errors. They lack any values in
430 430 # .args on Python 3 which breaks our socket.error block.
431 431 raise
432 432 except socket.error as err:
433 433 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
434 434 raise error.Abort(_('push failed: %s') % err.args[1])
435 435 raise error.Abort(err.args[1])
436 436 finally:
437 437 fp.close()
438 438 os.unlink(tempname)
439 439
440 440 def _calltwowaystream(self, cmd, fp, **args):
441 441 fh = None
442 442 fp_ = None
443 443 filename = None
444 444 try:
445 445 # dump bundle to disk
446 446 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
447 447 fh = os.fdopen(fd, r"wb")
448 448 d = fp.read(4096)
449 449 while d:
450 450 fh.write(d)
451 451 d = fp.read(4096)
452 452 fh.close()
453 453 # start http push
454 454 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
455 455 headers = {r'Content-Type': r'application/mercurial-0.1'}
456 456 return self._callstream(cmd, data=fp_, headers=headers, **args)
457 457 finally:
458 458 if fp_ is not None:
459 459 fp_.close()
460 460 if fh is not None:
461 461 fh.close()
462 462 os.unlink(filename)
463 463
464 464 def _callcompressable(self, cmd, **args):
465 465 return self._callstream(cmd, _compressible=True, **args)
466 466
467 467 def _abort(self, exception):
468 468 raise exception
469 469
470 470 def makepeer(ui, path):
471 471 u = util.url(path)
472 472 if u.query or u.fragment:
473 473 raise error.Abort(_('unsupported URL component: "%s"') %
474 474 (u.query or u.fragment))
475 475
476 476 # urllib cannot handle URLs with embedded user or passwd.
477 477 url, authinfo = u.authinfo()
478 478 ui.debug('using %s\n' % url)
479 479
480 480 opener = urlmod.opener(ui, authinfo)
481 481
482 482 return httppeer(ui, path, url, opener)
483 483
484 484 def instance(ui, path, create):
485 485 if create:
486 486 raise error.Abort(_('cannot create new http repository'))
487 487 try:
488 488 if path.startswith('https:') and not urlmod.has_https:
489 489 raise error.Abort(_('Python support for SSL and HTTPS '
490 490 'is not installed'))
491 491
492 492 inst = makepeer(ui, path)
493 493 inst._fetchcaps()
494 494
495 495 return inst
496 496 except error.RepoError as httpexception:
497 497 try:
498 498 r = statichttprepo.instance(ui, "static-" + path, create)
499 499 ui.note(_('(falling back to static-http)\n'))
500 500 return r
501 501 except error.RepoError:
502 502 raise httpexception # use the original http RepoError instead
@@ -1,634 +1,635 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11 import uuid
12 12
13 13 from .i18n import _
14 14 from . import (
15 15 error,
16 16 pycompat,
17 17 util,
18 18 wireproto,
19 19 wireprotoserver,
20 20 wireprototypes,
21 21 )
22 22 from .utils import (
23 23 procutil,
24 24 )
25 25
26 26 def _serverquote(s):
27 27 """quote a string for the remote shell ... which we assume is sh"""
28 28 if not s:
29 29 return s
30 30 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
31 31 return s
32 32 return "'%s'" % s.replace("'", "'\\''")
33 33
34 34 def _forwardoutput(ui, pipe):
35 35 """display all data currently available on pipe as remote output.
36 36
37 37 This is non blocking."""
38 38 if pipe:
39 39 s = procutil.readpipe(pipe)
40 40 if s:
41 41 for l in s.splitlines():
42 42 ui.status(_("remote: "), l, '\n')
43 43
44 44 class doublepipe(object):
45 45 """Operate a side-channel pipe in addition of a main one
46 46
47 47 The side-channel pipe contains server output to be forwarded to the user
48 48 input. The double pipe will behave as the "main" pipe, but will ensure the
49 49 content of the "side" pipe is properly processed while we wait for blocking
50 50 call on the "main" pipe.
51 51
52 52 If large amounts of data are read from "main", the forward will cease after
53 53 the first bytes start to appear. This simplifies the implementation
54 54 without affecting actual output of sshpeer too much as we rarely issue
55 55 large read for data not yet emitted by the server.
56 56
57 57 The main pipe is expected to be a 'bufferedinputpipe' from the util module
58 58 that handle all the os specific bits. This class lives in this module
59 59 because it focus on behavior specific to the ssh protocol."""
60 60
61 61 def __init__(self, ui, main, side):
62 62 self._ui = ui
63 63 self._main = main
64 64 self._side = side
65 65
66 66 def _wait(self):
67 67 """wait until some data are available on main or side
68 68
69 69 return a pair of boolean (ismainready, issideready)
70 70
71 71 (This will only wait for data if the setup is supported by `util.poll`)
72 72 """
73 73 if (isinstance(self._main, util.bufferedinputpipe) and
74 74 self._main.hasbuffer):
75 75 # Main has data. Assume side is worth poking at.
76 76 return True, True
77 77
78 78 fds = [self._main.fileno(), self._side.fileno()]
79 79 try:
80 80 act = util.poll(fds)
81 81 except NotImplementedError:
82 82 # non supported yet case, assume all have data.
83 83 act = fds
84 84 return (self._main.fileno() in act, self._side.fileno() in act)
85 85
86 86 def write(self, data):
87 87 return self._call('write', data)
88 88
89 89 def read(self, size):
90 90 r = self._call('read', size)
91 91 if size != 0 and not r:
92 92 # We've observed a condition that indicates the
93 93 # stdout closed unexpectedly. Check stderr one
94 94 # more time and snag anything that's there before
95 95 # letting anyone know the main part of the pipe
96 96 # closed prematurely.
97 97 _forwardoutput(self._ui, self._side)
98 98 return r
99 99
100 100 def readline(self):
101 101 return self._call('readline')
102 102
103 103 def _call(self, methname, data=None):
104 104 """call <methname> on "main", forward output of "side" while blocking
105 105 """
106 106 # data can be '' or 0
107 107 if (data is not None and not data) or self._main.closed:
108 108 _forwardoutput(self._ui, self._side)
109 109 return ''
110 110 while True:
111 111 mainready, sideready = self._wait()
112 112 if sideready:
113 113 _forwardoutput(self._ui, self._side)
114 114 if mainready:
115 115 meth = getattr(self._main, methname)
116 116 if data is None:
117 117 return meth()
118 118 else:
119 119 return meth(data)
120 120
121 121 def close(self):
122 122 return self._main.close()
123 123
124 124 def flush(self):
125 125 return self._main.flush()
126 126
127 127 def _cleanuppipes(ui, pipei, pipeo, pipee):
128 128 """Clean up pipes used by an SSH connection."""
129 129 if pipeo:
130 130 pipeo.close()
131 131 if pipei:
132 132 pipei.close()
133 133
134 134 if pipee:
135 135 # Try to read from the err descriptor until EOF.
136 136 try:
137 137 for l in pipee:
138 138 ui.status(_('remote: '), l)
139 139 except (IOError, ValueError):
140 140 pass
141 141
142 142 pipee.close()
143 143
144 144 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
145 145 """Create an SSH connection to a server.
146 146
147 147 Returns a tuple of (process, stdin, stdout, stderr) for the
148 148 spawned process.
149 149 """
150 150 cmd = '%s %s %s' % (
151 151 sshcmd,
152 152 args,
153 153 procutil.shellquote('%s -R %s serve --stdio' % (
154 154 _serverquote(remotecmd), _serverquote(path))))
155 155
156 156 ui.debug('running %s\n' % cmd)
157 157 cmd = procutil.quotecommand(cmd)
158 158
159 159 # no buffer allow the use of 'select'
160 160 # feel free to remove buffering and select usage when we ultimately
161 161 # move to threading.
162 162 stdin, stdout, stderr, proc = procutil.popen4(cmd, bufsize=0, env=sshenv)
163 163
164 164 return proc, stdin, stdout, stderr
165 165
166 166 def _clientcapabilities():
167 167 """Return list of capabilities of this client.
168 168
169 169 Returns a list of capabilities that are supported by this client.
170 170 """
171 protoparams = []
171 protoparams = set()
172 172 comps = [e.wireprotosupport().name for e in
173 173 util.compengines.supportedwireengines(util.CLIENTROLE)]
174 protoparams.append('comp=%s' % ','.join(comps))
174 protoparams.add('comp=%s' % ','.join(comps))
175 175 return protoparams
176 176
177 177 def _performhandshake(ui, stdin, stdout, stderr):
178 178 def badresponse():
179 179 # Flush any output on stderr.
180 180 _forwardoutput(ui, stderr)
181 181
182 182 msg = _('no suitable response from remote hg')
183 183 hint = ui.config('ui', 'ssherrorhint')
184 184 raise error.RepoError(msg, hint=hint)
185 185
186 186 # The handshake consists of sending wire protocol commands in reverse
187 187 # order of protocol implementation and then sniffing for a response
188 188 # to one of them.
189 189 #
190 190 # Those commands (from oldest to newest) are:
191 191 #
192 192 # ``between``
193 193 # Asks for the set of revisions between a pair of revisions. Command
194 194 # present in all Mercurial server implementations.
195 195 #
196 196 # ``hello``
197 197 # Instructs the server to advertise its capabilities. Introduced in
198 198 # Mercurial 0.9.1.
199 199 #
200 200 # ``upgrade``
201 201 # Requests upgrade from default transport protocol version 1 to
202 202 # a newer version. Introduced in Mercurial 4.6 as an experimental
203 203 # feature.
204 204 #
205 205 # The ``between`` command is issued with a request for the null
206 206 # range. If the remote is a Mercurial server, this request will
207 207 # generate a specific response: ``1\n\n``. This represents the
208 208 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
209 209 # in the output stream and know this is the response to ``between``
210 210 # and we're at the end of our handshake reply.
211 211 #
212 212 # The response to the ``hello`` command will be a line with the
213 213 # length of the value returned by that command followed by that
214 214 # value. If the server doesn't support ``hello`` (which should be
215 215 # rare), that line will be ``0\n``. Otherwise, the value will contain
216 216 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
217 217 # the capabilities of the server.
218 218 #
219 219 # The ``upgrade`` command isn't really a command in the traditional
220 220 # sense of version 1 of the transport because it isn't using the
221 221 # proper mechanism for formatting insteads: instead, it just encodes
222 222 # arguments on the line, delimited by spaces.
223 223 #
224 224 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
225 225 # If the server doesn't support protocol upgrades, it will reply to
226 226 # this line with ``0\n``. Otherwise, it emits an
227 227 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
228 228 # Content immediately following this line describes additional
229 229 # protocol and server state.
230 230 #
231 231 # In addition to the responses to our command requests, the server
232 232 # may emit "banner" output on stdout. SSH servers are allowed to
233 233 # print messages to stdout on login. Issuing commands on connection
234 234 # allows us to flush this banner output from the server by scanning
235 235 # for output to our well-known ``between`` command. Of course, if
236 236 # the banner contains ``1\n\n``, this will throw off our detection.
237 237
238 238 requestlog = ui.configbool('devel', 'debug.peer-request')
239 239
240 240 # Generate a random token to help identify responses to version 2
241 241 # upgrade request.
242 242 token = pycompat.sysbytes(str(uuid.uuid4()))
243 243 upgradecaps = [
244 244 ('proto', wireprotoserver.SSHV2),
245 245 ]
246 246 upgradecaps = util.urlreq.urlencode(upgradecaps)
247 247
248 248 try:
249 249 pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
250 250 handshake = [
251 251 'hello\n',
252 252 'between\n',
253 253 'pairs %d\n' % len(pairsarg),
254 254 pairsarg,
255 255 ]
256 256
257 257 # Request upgrade to version 2 if configured.
258 258 if ui.configbool('experimental', 'sshpeer.advertise-v2'):
259 259 ui.debug('sending upgrade request: %s %s\n' % (token, upgradecaps))
260 260 handshake.insert(0, 'upgrade %s %s\n' % (token, upgradecaps))
261 261
262 262 if requestlog:
263 263 ui.debug('devel-peer-request: hello\n')
264 264 ui.debug('sending hello command\n')
265 265 if requestlog:
266 266 ui.debug('devel-peer-request: between\n')
267 267 ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
268 268 ui.debug('sending between command\n')
269 269
270 270 stdin.write(''.join(handshake))
271 271 stdin.flush()
272 272 except IOError:
273 273 badresponse()
274 274
275 275 # Assume version 1 of wire protocol by default.
276 276 protoname = wireprototypes.SSHV1
277 277 reupgraded = re.compile(b'^upgraded %s (.*)$' % re.escape(token))
278 278
279 279 lines = ['', 'dummy']
280 280 max_noise = 500
281 281 while lines[-1] and max_noise:
282 282 try:
283 283 l = stdout.readline()
284 284 _forwardoutput(ui, stderr)
285 285
286 286 # Look for reply to protocol upgrade request. It has a token
287 287 # in it, so there should be no false positives.
288 288 m = reupgraded.match(l)
289 289 if m:
290 290 protoname = m.group(1)
291 291 ui.debug('protocol upgraded to %s\n' % protoname)
292 292 # If an upgrade was handled, the ``hello`` and ``between``
293 293 # requests are ignored. The next output belongs to the
294 294 # protocol, so stop scanning lines.
295 295 break
296 296
297 297 # Otherwise it could be a banner, ``0\n`` response if server
298 298 # doesn't support upgrade.
299 299
300 300 if lines[-1] == '1\n' and l == '\n':
301 301 break
302 302 if l:
303 303 ui.debug('remote: ', l)
304 304 lines.append(l)
305 305 max_noise -= 1
306 306 except IOError:
307 307 badresponse()
308 308 else:
309 309 badresponse()
310 310
311 311 caps = set()
312 312
313 313 # For version 1, we should see a ``capabilities`` line in response to the
314 314 # ``hello`` command.
315 315 if protoname == wireprototypes.SSHV1:
316 316 for l in reversed(lines):
317 317 # Look for response to ``hello`` command. Scan from the back so
318 318 # we don't misinterpret banner output as the command reply.
319 319 if l.startswith('capabilities:'):
320 320 caps.update(l[:-1].split(':')[1].split())
321 321 break
322 322 elif protoname == wireprotoserver.SSHV2:
323 323 # We see a line with number of bytes to follow and then a value
324 324 # looking like ``capabilities: *``.
325 325 line = stdout.readline()
326 326 try:
327 327 valuelen = int(line)
328 328 except ValueError:
329 329 badresponse()
330 330
331 331 capsline = stdout.read(valuelen)
332 332 if not capsline.startswith('capabilities: '):
333 333 badresponse()
334 334
335 335 ui.debug('remote: %s\n' % capsline)
336 336
337 337 caps.update(capsline.split(':')[1].split())
338 338 # Trailing newline.
339 339 stdout.read(1)
340 340
341 341 # Error if we couldn't find capabilities, this means:
342 342 #
343 343 # 1. Remote isn't a Mercurial server
344 344 # 2. Remote is a <0.9.1 Mercurial server
345 345 # 3. Remote is a future Mercurial server that dropped ``hello``
346 346 # and other attempted handshake mechanisms.
347 347 if not caps:
348 348 badresponse()
349 349
350 350 # Flush any output on stderr before proceeding.
351 351 _forwardoutput(ui, stderr)
352 352
353 353 return protoname, caps
354 354
355 355 class sshv1peer(wireproto.wirepeer):
356 356 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps,
357 357 autoreadstderr=True):
358 358 """Create a peer from an existing SSH connection.
359 359
360 360 ``proc`` is a handle on the underlying SSH process.
361 361 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
362 362 pipes for that process.
363 363 ``caps`` is a set of capabilities supported by the remote.
364 364 ``autoreadstderr`` denotes whether to automatically read from
365 365 stderr and to forward its output.
366 366 """
367 367 self._url = url
368 368 self.ui = ui
369 369 # self._subprocess is unused. Keeping a handle on the process
370 370 # holds a reference and prevents it from being garbage collected.
371 371 self._subprocess = proc
372 372
373 373 # And we hook up our "doublepipe" wrapper to allow querying
374 374 # stderr any time we perform I/O.
375 375 if autoreadstderr:
376 376 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
377 377 stdin = doublepipe(ui, stdin, stderr)
378 378
379 379 self._pipeo = stdin
380 380 self._pipei = stdout
381 381 self._pipee = stderr
382 382 self._caps = caps
383 383 self._autoreadstderr = autoreadstderr
384 384
385 385 # Commands that have a "framed" response where the first line of the
386 386 # response contains the length of that response.
387 387 _FRAMED_COMMANDS = {
388 388 'batch',
389 389 }
390 390
391 391 # Begin of ipeerconnection interface.
392 392
393 393 def url(self):
394 394 return self._url
395 395
396 396 def local(self):
397 397 return None
398 398
399 399 def peer(self):
400 400 return self
401 401
402 402 def canpush(self):
403 403 return True
404 404
405 405 def close(self):
406 406 pass
407 407
408 408 # End of ipeerconnection interface.
409 409
410 410 # Begin of ipeercommands interface.
411 411
412 412 def capabilities(self):
413 413 return self._caps
414 414
415 415 # End of ipeercommands interface.
416 416
417 417 def _readerr(self):
418 418 _forwardoutput(self.ui, self._pipee)
419 419
420 420 def _abort(self, exception):
421 421 self._cleanup()
422 422 raise exception
423 423
424 424 def _cleanup(self):
425 425 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
426 426
427 427 __del__ = _cleanup
428 428
429 429 def _sendrequest(self, cmd, args, framed=False):
430 430 if (self.ui.debugflag
431 431 and self.ui.configbool('devel', 'debug.peer-request')):
432 432 dbg = self.ui.debug
433 433 line = 'devel-peer-request: %s\n'
434 434 dbg(line % cmd)
435 435 for key, value in sorted(args.items()):
436 436 if not isinstance(value, dict):
437 437 dbg(line % ' %s: %d bytes' % (key, len(value)))
438 438 else:
439 439 for dk, dv in sorted(value.items()):
440 440 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
441 441 self.ui.debug("sending %s command\n" % cmd)
442 442 self._pipeo.write("%s\n" % cmd)
443 443 _func, names = wireproto.commands[cmd]
444 444 keys = names.split()
445 445 wireargs = {}
446 446 for k in keys:
447 447 if k == '*':
448 448 wireargs['*'] = args
449 449 break
450 450 else:
451 451 wireargs[k] = args[k]
452 452 del args[k]
453 453 for k, v in sorted(wireargs.iteritems()):
454 454 self._pipeo.write("%s %d\n" % (k, len(v)))
455 455 if isinstance(v, dict):
456 456 for dk, dv in v.iteritems():
457 457 self._pipeo.write("%s %d\n" % (dk, len(dv)))
458 458 self._pipeo.write(dv)
459 459 else:
460 460 self._pipeo.write(v)
461 461 self._pipeo.flush()
462 462
463 463 # We know exactly how many bytes are in the response. So return a proxy
464 464 # around the raw output stream that allows reading exactly this many
465 465 # bytes. Callers then can read() without fear of overrunning the
466 466 # response.
467 467 if framed:
468 468 amount = self._getamount()
469 469 return util.cappedreader(self._pipei, amount)
470 470
471 471 return self._pipei
472 472
473 473 def _callstream(self, cmd, **args):
474 474 args = pycompat.byteskwargs(args)
475 475 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
476 476
477 477 def _callcompressable(self, cmd, **args):
478 478 args = pycompat.byteskwargs(args)
479 479 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
480 480
481 481 def _call(self, cmd, **args):
482 482 args = pycompat.byteskwargs(args)
483 483 return self._sendrequest(cmd, args, framed=True).read()
484 484
485 485 def _callpush(self, cmd, fp, **args):
486 486 # The server responds with an empty frame if the client should
487 487 # continue submitting the payload.
488 488 r = self._call(cmd, **args)
489 489 if r:
490 490 return '', r
491 491
492 492 # The payload consists of frames with content followed by an empty
493 493 # frame.
494 494 for d in iter(lambda: fp.read(4096), ''):
495 495 self._writeframed(d)
496 496 self._writeframed("", flush=True)
497 497
498 498 # In case of success, there is an empty frame and a frame containing
499 499 # the integer result (as a string).
500 500 # In case of error, there is a non-empty frame containing the error.
501 501 r = self._readframed()
502 502 if r:
503 503 return '', r
504 504 return self._readframed(), ''
505 505
506 506 def _calltwowaystream(self, cmd, fp, **args):
507 507 # The server responds with an empty frame if the client should
508 508 # continue submitting the payload.
509 509 r = self._call(cmd, **args)
510 510 if r:
511 511 # XXX needs to be made better
512 512 raise error.Abort(_('unexpected remote reply: %s') % r)
513 513
514 514 # The payload consists of frames with content followed by an empty
515 515 # frame.
516 516 for d in iter(lambda: fp.read(4096), ''):
517 517 self._writeframed(d)
518 518 self._writeframed("", flush=True)
519 519
520 520 return self._pipei
521 521
522 522 def _getamount(self):
523 523 l = self._pipei.readline()
524 524 if l == '\n':
525 525 if self._autoreadstderr:
526 526 self._readerr()
527 527 msg = _('check previous remote output')
528 528 self._abort(error.OutOfBandError(hint=msg))
529 529 if self._autoreadstderr:
530 530 self._readerr()
531 531 try:
532 532 return int(l)
533 533 except ValueError:
534 534 self._abort(error.ResponseError(_("unexpected response:"), l))
535 535
536 536 def _readframed(self):
537 537 size = self._getamount()
538 538 if not size:
539 539 return b''
540 540
541 541 return self._pipei.read(size)
542 542
543 543 def _writeframed(self, data, flush=False):
544 544 self._pipeo.write("%d\n" % len(data))
545 545 if data:
546 546 self._pipeo.write(data)
547 547 if flush:
548 548 self._pipeo.flush()
549 549 if self._autoreadstderr:
550 550 self._readerr()
551 551
552 552 class sshv2peer(sshv1peer):
553 553 """A peer that speakers version 2 of the transport protocol."""
554 554 # Currently version 2 is identical to version 1 post handshake.
555 555 # And handshake is performed before the peer is instantiated. So
556 556 # we need no custom code.
557 557
558 558 def makepeer(ui, path, proc, stdin, stdout, stderr, autoreadstderr=True):
559 559 """Make a peer instance from existing pipes.
560 560
561 561 ``path`` and ``proc`` are stored on the eventual peer instance and may
562 562 not be used for anything meaningful.
563 563
564 564 ``stdin``, ``stdout``, and ``stderr`` are the pipes connected to the
565 565 SSH server's stdio handles.
566 566
567 567 This function is factored out to allow creating peers that don't
568 568 actually spawn a new process. It is useful for starting SSH protocol
569 569 servers and clients via non-standard means, which can be useful for
570 570 testing.
571 571 """
572 572 try:
573 573 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
574 574 except Exception:
575 575 _cleanuppipes(ui, stdout, stdin, stderr)
576 576 raise
577 577
578 578 if protoname == wireprototypes.SSHV1:
579 579 return sshv1peer(ui, path, proc, stdin, stdout, stderr, caps,
580 580 autoreadstderr=autoreadstderr)
581 581 elif protoname == wireprototypes.SSHV2:
582 582 return sshv2peer(ui, path, proc, stdin, stdout, stderr, caps,
583 583 autoreadstderr=autoreadstderr)
584 584 else:
585 585 _cleanuppipes(ui, stdout, stdin, stderr)
586 586 raise error.RepoError(_('unknown version of SSH protocol: %s') %
587 587 protoname)
588 588
589 589 def instance(ui, path, create):
590 590 """Create an SSH peer.
591 591
592 592 The returned object conforms to the ``wireproto.wirepeer`` interface.
593 593 """
594 594 u = util.url(path, parsequery=False, parsefragment=False)
595 595 if u.scheme != 'ssh' or not u.host or u.path is None:
596 596 raise error.RepoError(_("couldn't parse location %s") % path)
597 597
598 598 util.checksafessh(path)
599 599
600 600 if u.passwd is not None:
601 601 raise error.RepoError(_('password in URL not supported'))
602 602
603 603 sshcmd = ui.config('ui', 'ssh')
604 604 remotecmd = ui.config('ui', 'remotecmd')
605 605 sshaddenv = dict(ui.configitems('sshenv'))
606 606 sshenv = procutil.shellenviron(sshaddenv)
607 607 remotepath = u.path or '.'
608 608
609 609 args = procutil.sshargs(sshcmd, u.host, u.user, u.port)
610 610
611 611 if create:
612 612 cmd = '%s %s %s' % (sshcmd, args,
613 613 procutil.shellquote('%s init %s' %
614 614 (_serverquote(remotecmd), _serverquote(remotepath))))
615 615 ui.debug('running %s\n' % cmd)
616 616 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
617 617 if res != 0:
618 618 raise error.RepoError(_('could not create remote repo'))
619 619
620 620 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
621 621 remotepath, sshenv)
622 622
623 623 peer = makepeer(ui, path, proc, stdin, stdout, stderr)
624 624
625 625 # Finally, if supported by the server, notify it about our own
626 626 # capabilities.
627 627 if 'protocaps' in peer.capabilities():
628 628 try:
629 peer._call("protocaps", caps=' '.join(_clientcapabilities()))
629 peer._call("protocaps",
630 caps=' '.join(sorted(_clientcapabilities())))
630 631 except IOError:
631 632 peer._cleanup()
632 633 raise error.RepoError(_('capability exchange failed'))
633 634
634 635 return peer
General Comments 0
You need to be logged in to leave comments. Login now