##// END OF EJS Templates
wireprototypes: move wire protocol response types to new module...
Gregory Szorc -
r36090:cd6ab329 default
parent child Browse files
Show More
@@ -0,0 +1,61 b''
1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
2 #
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
5
6 from __future__ import absolute_import
7
8 class ooberror(object):
9 """wireproto reply: failure of a batch of operation
10
11 Something failed during a batch call. The error message is stored in
12 `self.message`.
13 """
14 def __init__(self, message):
15 self.message = message
16
17 class pushres(object):
18 """wireproto reply: success with simple integer return
19
20 The call was successful and returned an integer contained in `self.res`.
21 """
22 def __init__(self, res, output):
23 self.res = res
24 self.output = output
25
26 class pusherr(object):
27 """wireproto reply: failure
28
29 The call failed. The `self.res` attribute contains the error message.
30 """
31 def __init__(self, res, output):
32 self.res = res
33 self.output = output
34
35 class streamres(object):
36 """wireproto reply: binary stream
37
38 The call was successful and the result is a stream.
39
40 Accepts a generator containing chunks of data to be sent to the client.
41
42 ``prefer_uncompressed`` indicates that the data is expected to be
43 uncompressable and that the stream should therefore use the ``none``
44 engine.
45 """
46 def __init__(self, gen=None, prefer_uncompressed=False):
47 self.gen = gen
48 self.prefer_uncompressed = prefer_uncompressed
49
50 class streamreslegacy(object):
51 """wireproto reply: uncompressed binary stream
52
53 The call was successful and the result is a stream.
54
55 Accepts a generator containing chunks of data to be sent to the client.
56
57 Like ``streamres``, but sends an uncompressed data for "version 1" clients
58 using the application/mercurial-0.1 media type.
59 """
60 def __init__(self, gen=None):
61 self.gen = gen
@@ -1,1096 +1,1049 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 wireprototypes,
34 35 )
35 36
36 37 urlerr = util.urlerr
37 38 urlreq = util.urlreq
38 39
40 ooberror = wireprototypes.ooberror
41 pushres = wireprototypes.pushres
42 pusherr = wireprototypes.pusherr
43 streamres = wireprototypes.streamres
44 streamres_legacy = wireprototypes.streamreslegacy
45
39 46 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 47 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 48 'IncompatibleClient')
42 49 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43 50
44 51 class remoteiterbatcher(peer.iterbatcher):
45 52 def __init__(self, remote):
46 53 super(remoteiterbatcher, self).__init__()
47 54 self._remote = remote
48 55
49 56 def __getattr__(self, name):
50 57 # Validate this method is batchable, since submit() only supports
51 58 # batchable methods.
52 59 fn = getattr(self._remote, name)
53 60 if not getattr(fn, 'batchable', None):
54 61 raise error.ProgrammingError('Attempted to batch a non-batchable '
55 62 'call to %r' % name)
56 63
57 64 return super(remoteiterbatcher, self).__getattr__(name)
58 65
59 66 def submit(self):
60 67 """Break the batch request into many patch calls and pipeline them.
61 68
62 69 This is mostly valuable over http where request sizes can be
63 70 limited, but can be used in other places as well.
64 71 """
65 72 # 2-tuple of (command, arguments) that represents what will be
66 73 # sent over the wire.
67 74 requests = []
68 75
69 76 # 4-tuple of (command, final future, @batchable generator, remote
70 77 # future).
71 78 results = []
72 79
73 80 for command, args, opts, finalfuture in self.calls:
74 81 mtd = getattr(self._remote, command)
75 82 batchable = mtd.batchable(mtd.__self__, *args, **opts)
76 83
77 84 commandargs, fremote = next(batchable)
78 85 assert fremote
79 86 requests.append((command, commandargs))
80 87 results.append((command, finalfuture, batchable, fremote))
81 88
82 89 if requests:
83 90 self._resultiter = self._remote._submitbatch(requests)
84 91
85 92 self._results = results
86 93
87 94 def results(self):
88 95 for command, finalfuture, batchable, remotefuture in self._results:
89 96 # Get the raw result, set it in the remote future, feed it
90 97 # back into the @batchable generator so it can be decoded, and
91 98 # set the result on the final future to this value.
92 99 remoteresult = next(self._resultiter)
93 100 remotefuture.set(remoteresult)
94 101 finalfuture.set(next(batchable))
95 102
96 103 # Verify our @batchable generators only emit 2 values.
97 104 try:
98 105 next(batchable)
99 106 except StopIteration:
100 107 pass
101 108 else:
102 109 raise error.ProgrammingError('%s @batchable generator emitted '
103 110 'unexpected value count' % command)
104 111
105 112 yield finalfuture.value
106 113
107 114 # Forward a couple of names from peer to make wireproto interactions
108 115 # slightly more sensible.
109 116 batchable = peer.batchable
110 117 future = peer.future
111 118
112 119 # list of nodes encoding / decoding
113 120
114 121 def decodelist(l, sep=' '):
115 122 if l:
116 123 return [bin(v) for v in l.split(sep)]
117 124 return []
118 125
119 126 def encodelist(l, sep=' '):
120 127 try:
121 128 return sep.join(map(hex, l))
122 129 except TypeError:
123 130 raise
124 131
125 132 # batched call argument encoding
126 133
127 134 def escapearg(plain):
128 135 return (plain
129 136 .replace(':', ':c')
130 137 .replace(',', ':o')
131 138 .replace(';', ':s')
132 139 .replace('=', ':e'))
133 140
134 141 def unescapearg(escaped):
135 142 return (escaped
136 143 .replace(':e', '=')
137 144 .replace(':s', ';')
138 145 .replace(':o', ',')
139 146 .replace(':c', ':'))
140 147
141 148 def encodebatchcmds(req):
142 149 """Return a ``cmds`` argument value for the ``batch`` command."""
143 150 cmds = []
144 151 for op, argsdict in req:
145 152 # Old servers didn't properly unescape argument names. So prevent
146 153 # the sending of argument names that may not be decoded properly by
147 154 # servers.
148 155 assert all(escapearg(k) == k for k in argsdict)
149 156
150 157 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
151 158 for k, v in argsdict.iteritems())
152 159 cmds.append('%s %s' % (op, args))
153 160
154 161 return ';'.join(cmds)
155 162
156 163 # mapping of options accepted by getbundle and their types
157 164 #
158 165 # Meant to be extended by extensions. It is extensions responsibility to ensure
159 166 # such options are properly processed in exchange.getbundle.
160 167 #
161 168 # supported types are:
162 169 #
163 170 # :nodes: list of binary nodes
164 171 # :csv: list of comma-separated values
165 172 # :scsv: list of comma-separated values return as set
166 173 # :plain: string with no transformation needed.
167 174 gboptsmap = {'heads': 'nodes',
168 175 'bookmarks': 'boolean',
169 176 'common': 'nodes',
170 177 'obsmarkers': 'boolean',
171 178 'phases': 'boolean',
172 179 'bundlecaps': 'scsv',
173 180 'listkeys': 'csv',
174 181 'cg': 'boolean',
175 182 'cbattempted': 'boolean',
176 183 'stream': 'boolean',
177 184 }
178 185
179 186 # client side
180 187
181 188 class wirepeer(repository.legacypeer):
182 189 """Client-side interface for communicating with a peer repository.
183 190
184 191 Methods commonly call wire protocol commands of the same name.
185 192
186 193 See also httppeer.py and sshpeer.py for protocol-specific
187 194 implementations of this interface.
188 195 """
189 196 # Begin of basewirepeer interface.
190 197
191 198 def iterbatch(self):
192 199 return remoteiterbatcher(self)
193 200
194 201 @batchable
195 202 def lookup(self, key):
196 203 self.requirecap('lookup', _('look up remote revision'))
197 204 f = future()
198 205 yield {'key': encoding.fromlocal(key)}, f
199 206 d = f.value
200 207 success, data = d[:-1].split(" ", 1)
201 208 if int(success):
202 209 yield bin(data)
203 210 else:
204 211 self._abort(error.RepoError(data))
205 212
206 213 @batchable
207 214 def heads(self):
208 215 f = future()
209 216 yield {}, f
210 217 d = f.value
211 218 try:
212 219 yield decodelist(d[:-1])
213 220 except ValueError:
214 221 self._abort(error.ResponseError(_("unexpected response:"), d))
215 222
216 223 @batchable
217 224 def known(self, nodes):
218 225 f = future()
219 226 yield {'nodes': encodelist(nodes)}, f
220 227 d = f.value
221 228 try:
222 229 yield [bool(int(b)) for b in d]
223 230 except ValueError:
224 231 self._abort(error.ResponseError(_("unexpected response:"), d))
225 232
226 233 @batchable
227 234 def branchmap(self):
228 235 f = future()
229 236 yield {}, f
230 237 d = f.value
231 238 try:
232 239 branchmap = {}
233 240 for branchpart in d.splitlines():
234 241 branchname, branchheads = branchpart.split(' ', 1)
235 242 branchname = encoding.tolocal(urlreq.unquote(branchname))
236 243 branchheads = decodelist(branchheads)
237 244 branchmap[branchname] = branchheads
238 245 yield branchmap
239 246 except TypeError:
240 247 self._abort(error.ResponseError(_("unexpected response:"), d))
241 248
242 249 @batchable
243 250 def listkeys(self, namespace):
244 251 if not self.capable('pushkey'):
245 252 yield {}, None
246 253 f = future()
247 254 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
248 255 yield {'namespace': encoding.fromlocal(namespace)}, f
249 256 d = f.value
250 257 self.ui.debug('received listkey for "%s": %i bytes\n'
251 258 % (namespace, len(d)))
252 259 yield pushkeymod.decodekeys(d)
253 260
254 261 @batchable
255 262 def pushkey(self, namespace, key, old, new):
256 263 if not self.capable('pushkey'):
257 264 yield False, None
258 265 f = future()
259 266 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
260 267 yield {'namespace': encoding.fromlocal(namespace),
261 268 'key': encoding.fromlocal(key),
262 269 'old': encoding.fromlocal(old),
263 270 'new': encoding.fromlocal(new)}, f
264 271 d = f.value
265 272 d, output = d.split('\n', 1)
266 273 try:
267 274 d = bool(int(d))
268 275 except ValueError:
269 276 raise error.ResponseError(
270 277 _('push failed (unexpected response):'), d)
271 278 for l in output.splitlines(True):
272 279 self.ui.status(_('remote: '), l)
273 280 yield d
274 281
275 282 def stream_out(self):
276 283 return self._callstream('stream_out')
277 284
278 285 def getbundle(self, source, **kwargs):
279 286 kwargs = pycompat.byteskwargs(kwargs)
280 287 self.requirecap('getbundle', _('look up remote changes'))
281 288 opts = {}
282 289 bundlecaps = kwargs.get('bundlecaps')
283 290 if bundlecaps is not None:
284 291 kwargs['bundlecaps'] = sorted(bundlecaps)
285 292 else:
286 293 bundlecaps = () # kwargs could have it to None
287 294 for key, value in kwargs.iteritems():
288 295 if value is None:
289 296 continue
290 297 keytype = gboptsmap.get(key)
291 298 if keytype is None:
292 299 raise error.ProgrammingError(
293 300 'Unexpectedly None keytype for key %s' % key)
294 301 elif keytype == 'nodes':
295 302 value = encodelist(value)
296 303 elif keytype in ('csv', 'scsv'):
297 304 value = ','.join(value)
298 305 elif keytype == 'boolean':
299 306 value = '%i' % bool(value)
300 307 elif keytype != 'plain':
301 308 raise KeyError('unknown getbundle option type %s'
302 309 % keytype)
303 310 opts[key] = value
304 311 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
305 312 if any((cap.startswith('HG2') for cap in bundlecaps)):
306 313 return bundle2.getunbundler(self.ui, f)
307 314 else:
308 315 return changegroupmod.cg1unpacker(f, 'UN')
309 316
310 317 def unbundle(self, cg, heads, url):
311 318 '''Send cg (a readable file-like object representing the
312 319 changegroup to push, typically a chunkbuffer object) to the
313 320 remote server as a bundle.
314 321
315 322 When pushing a bundle10 stream, return an integer indicating the
316 323 result of the push (see changegroup.apply()).
317 324
318 325 When pushing a bundle20 stream, return a bundle20 stream.
319 326
320 327 `url` is the url the client thinks it's pushing to, which is
321 328 visible to hooks.
322 329 '''
323 330
324 331 if heads != ['force'] and self.capable('unbundlehash'):
325 332 heads = encodelist(['hashed',
326 333 hashlib.sha1(''.join(sorted(heads))).digest()])
327 334 else:
328 335 heads = encodelist(heads)
329 336
330 337 if util.safehasattr(cg, 'deltaheader'):
331 338 # this a bundle10, do the old style call sequence
332 339 ret, output = self._callpush("unbundle", cg, heads=heads)
333 340 if ret == "":
334 341 raise error.ResponseError(
335 342 _('push failed:'), output)
336 343 try:
337 344 ret = int(ret)
338 345 except ValueError:
339 346 raise error.ResponseError(
340 347 _('push failed (unexpected response):'), ret)
341 348
342 349 for l in output.splitlines(True):
343 350 self.ui.status(_('remote: '), l)
344 351 else:
345 352 # bundle2 push. Send a stream, fetch a stream.
346 353 stream = self._calltwowaystream('unbundle', cg, heads=heads)
347 354 ret = bundle2.getunbundler(self.ui, stream)
348 355 return ret
349 356
350 357 # End of basewirepeer interface.
351 358
352 359 # Begin of baselegacywirepeer interface.
353 360
354 361 def branches(self, nodes):
355 362 n = encodelist(nodes)
356 363 d = self._call("branches", nodes=n)
357 364 try:
358 365 br = [tuple(decodelist(b)) for b in d.splitlines()]
359 366 return br
360 367 except ValueError:
361 368 self._abort(error.ResponseError(_("unexpected response:"), d))
362 369
363 370 def between(self, pairs):
364 371 batch = 8 # avoid giant requests
365 372 r = []
366 373 for i in xrange(0, len(pairs), batch):
367 374 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
368 375 d = self._call("between", pairs=n)
369 376 try:
370 377 r.extend(l and decodelist(l) or [] for l in d.splitlines())
371 378 except ValueError:
372 379 self._abort(error.ResponseError(_("unexpected response:"), d))
373 380 return r
374 381
375 382 def changegroup(self, nodes, kind):
376 383 n = encodelist(nodes)
377 384 f = self._callcompressable("changegroup", roots=n)
378 385 return changegroupmod.cg1unpacker(f, 'UN')
379 386
380 387 def changegroupsubset(self, bases, heads, kind):
381 388 self.requirecap('changegroupsubset', _('look up remote changes'))
382 389 bases = encodelist(bases)
383 390 heads = encodelist(heads)
384 391 f = self._callcompressable("changegroupsubset",
385 392 bases=bases, heads=heads)
386 393 return changegroupmod.cg1unpacker(f, 'UN')
387 394
388 395 # End of baselegacywirepeer interface.
389 396
390 397 def _submitbatch(self, req):
391 398 """run batch request <req> on the server
392 399
393 400 Returns an iterator of the raw responses from the server.
394 401 """
395 402 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
396 403 chunk = rsp.read(1024)
397 404 work = [chunk]
398 405 while chunk:
399 406 while ';' not in chunk and chunk:
400 407 chunk = rsp.read(1024)
401 408 work.append(chunk)
402 409 merged = ''.join(work)
403 410 while ';' in merged:
404 411 one, merged = merged.split(';', 1)
405 412 yield unescapearg(one)
406 413 chunk = rsp.read(1024)
407 414 work = [merged, chunk]
408 415 yield unescapearg(''.join(work))
409 416
410 417 def _submitone(self, op, args):
411 418 return self._call(op, **pycompat.strkwargs(args))
412 419
413 420 def debugwireargs(self, one, two, three=None, four=None, five=None):
414 421 # don't pass optional arguments left at their default value
415 422 opts = {}
416 423 if three is not None:
417 424 opts[r'three'] = three
418 425 if four is not None:
419 426 opts[r'four'] = four
420 427 return self._call('debugwireargs', one=one, two=two, **opts)
421 428
422 429 def _call(self, cmd, **args):
423 430 """execute <cmd> on the server
424 431
425 432 The command is expected to return a simple string.
426 433
427 434 returns the server reply as a string."""
428 435 raise NotImplementedError()
429 436
430 437 def _callstream(self, cmd, **args):
431 438 """execute <cmd> on the server
432 439
433 440 The command is expected to return a stream. Note that if the
434 441 command doesn't return a stream, _callstream behaves
435 442 differently for ssh and http peers.
436 443
437 444 returns the server reply as a file like object.
438 445 """
439 446 raise NotImplementedError()
440 447
441 448 def _callcompressable(self, cmd, **args):
442 449 """execute <cmd> on the server
443 450
444 451 The command is expected to return a stream.
445 452
446 453 The stream may have been compressed in some implementations. This
447 454 function takes care of the decompression. This is the only difference
448 455 with _callstream.
449 456
450 457 returns the server reply as a file like object.
451 458 """
452 459 raise NotImplementedError()
453 460
454 461 def _callpush(self, cmd, fp, **args):
455 462 """execute a <cmd> on server
456 463
457 464 The command is expected to be related to a push. Push has a special
458 465 return method.
459 466
460 467 returns the server reply as a (ret, output) tuple. ret is either
461 468 empty (error) or a stringified int.
462 469 """
463 470 raise NotImplementedError()
464 471
465 472 def _calltwowaystream(self, cmd, fp, **args):
466 473 """execute <cmd> on server
467 474
468 475 The command will send a stream to the server and get a stream in reply.
469 476 """
470 477 raise NotImplementedError()
471 478
472 479 def _abort(self, exception):
473 480 """clearly abort the wire protocol connection and raise the exception
474 481 """
475 482 raise NotImplementedError()
476 483
477 484 # server side
478 485
479 486 # wire protocol command can either return a string or one of these classes.
480 class streamres(object):
481 """wireproto reply: binary stream
482
483 The call was successful and the result is a stream.
484
485 Accepts a generator containing chunks of data to be sent to the client.
486
487 ``prefer_uncompressed`` indicates that the data is expected to be
488 uncompressable and that the stream should therefore use the ``none``
489 engine.
490 """
491 def __init__(self, gen=None, prefer_uncompressed=False):
492 self.gen = gen
493 self.prefer_uncompressed = prefer_uncompressed
494
495 class streamres_legacy(object):
496 """wireproto reply: uncompressed binary stream
497
498 The call was successful and the result is a stream.
499
500 Accepts a generator containing chunks of data to be sent to the client.
501
502 Like ``streamres``, but sends an uncompressed data for "version 1" clients
503 using the application/mercurial-0.1 media type.
504 """
505 def __init__(self, gen=None):
506 self.gen = gen
507
508 class pushres(object):
509 """wireproto reply: success with simple integer return
510
511 The call was successful and returned an integer contained in `self.res`.
512 """
513 def __init__(self, res, output):
514 self.res = res
515 self.output = output
516
517 class pusherr(object):
518 """wireproto reply: failure
519
520 The call failed. The `self.res` attribute contains the error message.
521 """
522 def __init__(self, res, output):
523 self.res = res
524 self.output = output
525
526 class ooberror(object):
527 """wireproto reply: failure of a batch of operation
528
529 Something failed during a batch call. The error message is stored in
530 `self.message`.
531 """
532 def __init__(self, message):
533 self.message = message
534 487
535 488 def getdispatchrepo(repo, proto, command):
536 489 """Obtain the repo used for processing wire protocol commands.
537 490
538 491 The intent of this function is to serve as a monkeypatch point for
539 492 extensions that need commands to operate on different repo views under
540 493 specialized circumstances.
541 494 """
542 495 return repo.filtered('served')
543 496
544 497 def dispatch(repo, proto, command):
545 498 repo = getdispatchrepo(repo, proto, command)
546 499 func, spec = commands[command]
547 500 args = proto.getargs(spec)
548 501 return func(repo, proto, *args)
549 502
550 503 def options(cmd, keys, others):
551 504 opts = {}
552 505 for k in keys:
553 506 if k in others:
554 507 opts[k] = others[k]
555 508 del others[k]
556 509 if others:
557 510 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
558 511 % (cmd, ",".join(others)))
559 512 return opts
560 513
561 514 def bundle1allowed(repo, action):
562 515 """Whether a bundle1 operation is allowed from the server.
563 516
564 517 Priority is:
565 518
566 519 1. server.bundle1gd.<action> (if generaldelta active)
567 520 2. server.bundle1.<action>
568 521 3. server.bundle1gd (if generaldelta active)
569 522 4. server.bundle1
570 523 """
571 524 ui = repo.ui
572 525 gd = 'generaldelta' in repo.requirements
573 526
574 527 if gd:
575 528 v = ui.configbool('server', 'bundle1gd.%s' % action)
576 529 if v is not None:
577 530 return v
578 531
579 532 v = ui.configbool('server', 'bundle1.%s' % action)
580 533 if v is not None:
581 534 return v
582 535
583 536 if gd:
584 537 v = ui.configbool('server', 'bundle1gd')
585 538 if v is not None:
586 539 return v
587 540
588 541 return ui.configbool('server', 'bundle1')
589 542
590 543 def supportedcompengines(ui, role):
591 544 """Obtain the list of supported compression engines for a request."""
592 545 assert role in (util.CLIENTROLE, util.SERVERROLE)
593 546
594 547 compengines = util.compengines.supportedwireengines(role)
595 548
596 549 # Allow config to override default list and ordering.
597 550 if role == util.SERVERROLE:
598 551 configengines = ui.configlist('server', 'compressionengines')
599 552 config = 'server.compressionengines'
600 553 else:
601 554 # This is currently implemented mainly to facilitate testing. In most
602 555 # cases, the server should be in charge of choosing a compression engine
603 556 # because a server has the most to lose from a sub-optimal choice. (e.g.
604 557 # CPU DoS due to an expensive engine or a network DoS due to poor
605 558 # compression ratio).
606 559 configengines = ui.configlist('experimental',
607 560 'clientcompressionengines')
608 561 config = 'experimental.clientcompressionengines'
609 562
610 563 # No explicit config. Filter out the ones that aren't supposed to be
611 564 # advertised and return default ordering.
612 565 if not configengines:
613 566 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
614 567 return [e for e in compengines
615 568 if getattr(e.wireprotosupport(), attr) > 0]
616 569
617 570 # If compression engines are listed in the config, assume there is a good
618 571 # reason for it (like server operators wanting to achieve specific
619 572 # performance characteristics). So fail fast if the config references
620 573 # unusable compression engines.
621 574 validnames = set(e.name() for e in compengines)
622 575 invalidnames = set(e for e in configengines if e not in validnames)
623 576 if invalidnames:
624 577 raise error.Abort(_('invalid compression engine defined in %s: %s') %
625 578 (config, ', '.join(sorted(invalidnames))))
626 579
627 580 compengines = [e for e in compengines if e.name() in configengines]
628 581 compengines = sorted(compengines,
629 582 key=lambda e: configengines.index(e.name()))
630 583
631 584 if not compengines:
632 585 raise error.Abort(_('%s config option does not specify any known '
633 586 'compression engines') % config,
634 587 hint=_('usable compression engines: %s') %
635 588 ', '.sorted(validnames))
636 589
637 590 return compengines
638 591
639 592 class commandentry(object):
640 593 """Represents a declared wire protocol command."""
641 594 def __init__(self, func, args=''):
642 595 self.func = func
643 596 self.args = args
644 597
645 598 def _merge(self, func, args):
646 599 """Merge this instance with an incoming 2-tuple.
647 600
648 601 This is called when a caller using the old 2-tuple API attempts
649 602 to replace an instance. The incoming values are merged with
650 603 data not captured by the 2-tuple and a new instance containing
651 604 the union of the two objects is returned.
652 605 """
653 606 return commandentry(func, args)
654 607
655 608 # Old code treats instances as 2-tuples. So expose that interface.
656 609 def __iter__(self):
657 610 yield self.func
658 611 yield self.args
659 612
660 613 def __getitem__(self, i):
661 614 if i == 0:
662 615 return self.func
663 616 elif i == 1:
664 617 return self.args
665 618 else:
666 619 raise IndexError('can only access elements 0 and 1')
667 620
668 621 class commanddict(dict):
669 622 """Container for registered wire protocol commands.
670 623
671 624 It behaves like a dict. But __setitem__ is overwritten to allow silent
672 625 coercion of values from 2-tuples for API compatibility.
673 626 """
674 627 def __setitem__(self, k, v):
675 628 if isinstance(v, commandentry):
676 629 pass
677 630 # Cast 2-tuples to commandentry instances.
678 631 elif isinstance(v, tuple):
679 632 if len(v) != 2:
680 633 raise ValueError('command tuples must have exactly 2 elements')
681 634
682 635 # It is common for extensions to wrap wire protocol commands via
683 636 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
684 637 # doing this aren't aware of the new API that uses objects to store
685 638 # command entries, we automatically merge old state with new.
686 639 if k in self:
687 640 v = self[k]._merge(v[0], v[1])
688 641 else:
689 642 v = commandentry(v[0], v[1])
690 643 else:
691 644 raise ValueError('command entries must be commandentry instances '
692 645 'or 2-tuples')
693 646
694 647 return super(commanddict, self).__setitem__(k, v)
695 648
696 649 def commandavailable(self, command, proto):
697 650 """Determine if a command is available for the requested protocol."""
698 651 # For now, commands are available for all protocols. So do a simple
699 652 # membership test.
700 653 return command in self
701 654
702 655 commands = commanddict()
703 656
704 657 def wireprotocommand(name, args=''):
705 658 """Decorator to declare a wire protocol command.
706 659
707 660 ``name`` is the name of the wire protocol command being provided.
708 661
709 662 ``args`` is a space-delimited list of named arguments that the command
710 663 accepts. ``*`` is a special value that says to accept all arguments.
711 664 """
712 665 def register(func):
713 666 commands[name] = commandentry(func, args)
714 667 return func
715 668 return register
716 669
717 670 @wireprotocommand('batch', 'cmds *')
718 671 def batch(repo, proto, cmds, others):
719 672 repo = repo.filtered("served")
720 673 res = []
721 674 for pair in cmds.split(';'):
722 675 op, args = pair.split(' ', 1)
723 676 vals = {}
724 677 for a in args.split(','):
725 678 if a:
726 679 n, v = a.split('=')
727 680 vals[unescapearg(n)] = unescapearg(v)
728 681 func, spec = commands[op]
729 682 if spec:
730 683 keys = spec.split()
731 684 data = {}
732 685 for k in keys:
733 686 if k == '*':
734 687 star = {}
735 688 for key in vals.keys():
736 689 if key not in keys:
737 690 star[key] = vals[key]
738 691 data['*'] = star
739 692 else:
740 693 data[k] = vals[k]
741 694 result = func(repo, proto, *[data[k] for k in keys])
742 695 else:
743 696 result = func(repo, proto)
744 697 if isinstance(result, ooberror):
745 698 return result
746 699 res.append(escapearg(result))
747 700 return ';'.join(res)
748 701
749 702 @wireprotocommand('between', 'pairs')
750 703 def between(repo, proto, pairs):
751 704 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
752 705 r = []
753 706 for b in repo.between(pairs):
754 707 r.append(encodelist(b) + "\n")
755 708 return "".join(r)
756 709
757 710 @wireprotocommand('branchmap')
758 711 def branchmap(repo, proto):
759 712 branchmap = repo.branchmap()
760 713 heads = []
761 714 for branch, nodes in branchmap.iteritems():
762 715 branchname = urlreq.quote(encoding.fromlocal(branch))
763 716 branchnodes = encodelist(nodes)
764 717 heads.append('%s %s' % (branchname, branchnodes))
765 718 return '\n'.join(heads)
766 719
767 720 @wireprotocommand('branches', 'nodes')
768 721 def branches(repo, proto, nodes):
769 722 nodes = decodelist(nodes)
770 723 r = []
771 724 for b in repo.branches(nodes):
772 725 r.append(encodelist(b) + "\n")
773 726 return "".join(r)
774 727
775 728 @wireprotocommand('clonebundles', '')
776 729 def clonebundles(repo, proto):
777 730 """Server command for returning info for available bundles to seed clones.
778 731
779 732 Clients will parse this response and determine what bundle to fetch.
780 733
781 734 Extensions may wrap this command to filter or dynamically emit data
782 735 depending on the request. e.g. you could advertise URLs for the closest
783 736 data center given the client's IP address.
784 737 """
785 738 return repo.vfs.tryread('clonebundles.manifest')
786 739
787 740 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
788 741 'known', 'getbundle', 'unbundlehash', 'batch']
789 742
790 743 def _capabilities(repo, proto):
791 744 """return a list of capabilities for a repo
792 745
793 746 This function exists to allow extensions to easily wrap capabilities
794 747 computation
795 748
796 749 - returns a lists: easy to alter
797 750 - change done here will be propagated to both `capabilities` and `hello`
798 751 command without any other action needed.
799 752 """
800 753 # copy to prevent modification of the global list
801 754 caps = list(wireprotocaps)
802 755 if streamclone.allowservergeneration(repo):
803 756 if repo.ui.configbool('server', 'preferuncompressed'):
804 757 caps.append('stream-preferred')
805 758 requiredformats = repo.requirements & repo.supportedformats
806 759 # if our local revlogs are just revlogv1, add 'stream' cap
807 760 if not requiredformats - {'revlogv1'}:
808 761 caps.append('stream')
809 762 # otherwise, add 'streamreqs' detailing our local revlog format
810 763 else:
811 764 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
812 765 if repo.ui.configbool('experimental', 'bundle2-advertise'):
813 766 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
814 767 caps.append('bundle2=' + urlreq.quote(capsblob))
815 768 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
816 769
817 770 if proto.name == 'http':
818 771 caps.append('httpheader=%d' %
819 772 repo.ui.configint('server', 'maxhttpheaderlen'))
820 773 if repo.ui.configbool('experimental', 'httppostargs'):
821 774 caps.append('httppostargs')
822 775
823 776 # FUTURE advertise 0.2rx once support is implemented
824 777 # FUTURE advertise minrx and mintx after consulting config option
825 778 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
826 779
827 780 compengines = supportedcompengines(repo.ui, util.SERVERROLE)
828 781 if compengines:
829 782 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
830 783 for e in compengines)
831 784 caps.append('compression=%s' % comptypes)
832 785
833 786 return caps
834 787
835 788 # If you are writing an extension and consider wrapping this function. Wrap
836 789 # `_capabilities` instead.
837 790 @wireprotocommand('capabilities')
838 791 def capabilities(repo, proto):
839 792 return ' '.join(_capabilities(repo, proto))
840 793
841 794 @wireprotocommand('changegroup', 'roots')
842 795 def changegroup(repo, proto, roots):
843 796 nodes = decodelist(roots)
844 797 outgoing = discovery.outgoing(repo, missingroots=nodes,
845 798 missingheads=repo.heads())
846 799 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
847 800 gen = iter(lambda: cg.read(32768), '')
848 801 return streamres(gen=gen)
849 802
850 803 @wireprotocommand('changegroupsubset', 'bases heads')
851 804 def changegroupsubset(repo, proto, bases, heads):
852 805 bases = decodelist(bases)
853 806 heads = decodelist(heads)
854 807 outgoing = discovery.outgoing(repo, missingroots=bases,
855 808 missingheads=heads)
856 809 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
857 810 gen = iter(lambda: cg.read(32768), '')
858 811 return streamres(gen=gen)
859 812
860 813 @wireprotocommand('debugwireargs', 'one two *')
861 814 def debugwireargs(repo, proto, one, two, others):
862 815 # only accept optional args from the known set
863 816 opts = options('debugwireargs', ['three', 'four'], others)
864 817 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
865 818
866 819 @wireprotocommand('getbundle', '*')
867 820 def getbundle(repo, proto, others):
868 821 opts = options('getbundle', gboptsmap.keys(), others)
869 822 for k, v in opts.iteritems():
870 823 keytype = gboptsmap[k]
871 824 if keytype == 'nodes':
872 825 opts[k] = decodelist(v)
873 826 elif keytype == 'csv':
874 827 opts[k] = list(v.split(','))
875 828 elif keytype == 'scsv':
876 829 opts[k] = set(v.split(','))
877 830 elif keytype == 'boolean':
878 831 # Client should serialize False as '0', which is a non-empty string
879 832 # so it evaluates as a True bool.
880 833 if v == '0':
881 834 opts[k] = False
882 835 else:
883 836 opts[k] = bool(v)
884 837 elif keytype != 'plain':
885 838 raise KeyError('unknown getbundle option type %s'
886 839 % keytype)
887 840
888 841 if not bundle1allowed(repo, 'pull'):
889 842 if not exchange.bundle2requested(opts.get('bundlecaps')):
890 843 if proto.name == 'http':
891 844 return ooberror(bundle2required)
892 845 raise error.Abort(bundle2requiredmain,
893 846 hint=bundle2requiredhint)
894 847
895 848 prefercompressed = True
896 849
897 850 try:
898 851 if repo.ui.configbool('server', 'disablefullbundle'):
899 852 # Check to see if this is a full clone.
900 853 clheads = set(repo.changelog.heads())
901 854 changegroup = opts.get('cg', True)
902 855 heads = set(opts.get('heads', set()))
903 856 common = set(opts.get('common', set()))
904 857 common.discard(nullid)
905 858 if changegroup and not common and clheads == heads:
906 859 raise error.Abort(
907 860 _('server has pull-based clones disabled'),
908 861 hint=_('remove --pull if specified or upgrade Mercurial'))
909 862
910 863 info, chunks = exchange.getbundlechunks(repo, 'serve',
911 864 **pycompat.strkwargs(opts))
912 865 prefercompressed = info.get('prefercompressed', True)
913 866 except error.Abort as exc:
914 867 # cleanly forward Abort error to the client
915 868 if not exchange.bundle2requested(opts.get('bundlecaps')):
916 869 if proto.name == 'http':
917 870 return ooberror(str(exc) + '\n')
918 871 raise # cannot do better for bundle1 + ssh
919 872 # bundle2 request expect a bundle2 reply
920 873 bundler = bundle2.bundle20(repo.ui)
921 874 manargs = [('message', str(exc))]
922 875 advargs = []
923 876 if exc.hint is not None:
924 877 advargs.append(('hint', exc.hint))
925 878 bundler.addpart(bundle2.bundlepart('error:abort',
926 879 manargs, advargs))
927 880 chunks = bundler.getchunks()
928 881 prefercompressed = False
929 882
930 883 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
931 884
932 885 @wireprotocommand('heads')
933 886 def heads(repo, proto):
934 887 h = repo.heads()
935 888 return encodelist(h) + "\n"
936 889
937 890 @wireprotocommand('hello')
938 891 def hello(repo, proto):
939 892 '''the hello command returns a set of lines describing various
940 893 interesting things about the server, in an RFC822-like format.
941 894 Currently the only one defined is "capabilities", which
942 895 consists of a line in the form:
943 896
944 897 capabilities: space separated list of tokens
945 898 '''
946 899 return "capabilities: %s\n" % (capabilities(repo, proto))
947 900
948 901 @wireprotocommand('listkeys', 'namespace')
949 902 def listkeys(repo, proto, namespace):
950 903 d = repo.listkeys(encoding.tolocal(namespace)).items()
951 904 return pushkeymod.encodekeys(d)
952 905
953 906 @wireprotocommand('lookup', 'key')
954 907 def lookup(repo, proto, key):
955 908 try:
956 909 k = encoding.tolocal(key)
957 910 c = repo[k]
958 911 r = c.hex()
959 912 success = 1
960 913 except Exception as inst:
961 914 r = str(inst)
962 915 success = 0
963 916 return "%d %s\n" % (success, r)
964 917
965 918 @wireprotocommand('known', 'nodes *')
966 919 def known(repo, proto, nodes, others):
967 920 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
968 921
969 922 @wireprotocommand('pushkey', 'namespace key old new')
970 923 def pushkey(repo, proto, namespace, key, old, new):
971 924 # compatibility with pre-1.8 clients which were accidentally
972 925 # sending raw binary nodes rather than utf-8-encoded hex
973 926 if len(new) == 20 and util.escapestr(new) != new:
974 927 # looks like it could be a binary node
975 928 try:
976 929 new.decode('utf-8')
977 930 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
978 931 except UnicodeDecodeError:
979 932 pass # binary, leave unmodified
980 933 else:
981 934 new = encoding.tolocal(new) # normal path
982 935
983 936 with proto.mayberedirectstdio() as output:
984 937 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
985 938 encoding.tolocal(old), new) or False
986 939
987 940 output = output.getvalue() if output else ''
988 941 return '%s\n%s' % (int(r), output)
989 942
990 943 @wireprotocommand('stream_out')
991 944 def stream(repo, proto):
992 945 '''If the server supports streaming clone, it advertises the "stream"
993 946 capability with a value representing the version and flags of the repo
994 947 it is serving. Client checks to see if it understands the format.
995 948 '''
996 949 return streamres_legacy(streamclone.generatev1wireproto(repo))
997 950
998 951 @wireprotocommand('unbundle', 'heads')
999 952 def unbundle(repo, proto, heads):
1000 953 their_heads = decodelist(heads)
1001 954
1002 955 with proto.mayberedirectstdio() as output:
1003 956 try:
1004 957 exchange.check_heads(repo, their_heads, 'preparing changes')
1005 958
1006 959 # write bundle data to temporary file because it can be big
1007 960 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1008 961 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1009 962 r = 0
1010 963 try:
1011 964 proto.forwardpayload(fp)
1012 965 fp.seek(0)
1013 966 gen = exchange.readbundle(repo.ui, fp, None)
1014 967 if (isinstance(gen, changegroupmod.cg1unpacker)
1015 968 and not bundle1allowed(repo, 'push')):
1016 969 if proto.name == 'http':
1017 970 # need to special case http because stderr do not get to
1018 971 # the http client on failed push so we need to abuse
1019 972 # some other error type to make sure the message get to
1020 973 # the user.
1021 974 return ooberror(bundle2required)
1022 975 raise error.Abort(bundle2requiredmain,
1023 976 hint=bundle2requiredhint)
1024 977
1025 978 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1026 979 proto.client())
1027 980 if util.safehasattr(r, 'addpart'):
1028 981 # The return looks streamable, we are in the bundle2 case
1029 982 # and should return a stream.
1030 983 return streamres_legacy(gen=r.getchunks())
1031 984 return pushres(r, output.getvalue() if output else '')
1032 985
1033 986 finally:
1034 987 fp.close()
1035 988 os.unlink(tempname)
1036 989
1037 990 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1038 991 # handle non-bundle2 case first
1039 992 if not getattr(exc, 'duringunbundle2', False):
1040 993 try:
1041 994 raise
1042 995 except error.Abort:
1043 996 # The old code we moved used util.stderr directly.
1044 997 # We did not change it to minimise code change.
1045 998 # This need to be moved to something proper.
1046 999 # Feel free to do it.
1047 1000 util.stderr.write("abort: %s\n" % exc)
1048 1001 if exc.hint is not None:
1049 1002 util.stderr.write("(%s)\n" % exc.hint)
1050 1003 return pushres(0, output.getvalue() if output else '')
1051 1004 except error.PushRaced:
1052 1005 return pusherr(str(exc),
1053 1006 output.getvalue() if output else '')
1054 1007
1055 1008 bundler = bundle2.bundle20(repo.ui)
1056 1009 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1057 1010 bundler.addpart(out)
1058 1011 try:
1059 1012 try:
1060 1013 raise
1061 1014 except error.PushkeyFailed as exc:
1062 1015 # check client caps
1063 1016 remotecaps = getattr(exc, '_replycaps', None)
1064 1017 if (remotecaps is not None
1065 1018 and 'pushkey' not in remotecaps.get('error', ())):
1066 1019 # no support remote side, fallback to Abort handler.
1067 1020 raise
1068 1021 part = bundler.newpart('error:pushkey')
1069 1022 part.addparam('in-reply-to', exc.partid)
1070 1023 if exc.namespace is not None:
1071 1024 part.addparam('namespace', exc.namespace,
1072 1025 mandatory=False)
1073 1026 if exc.key is not None:
1074 1027 part.addparam('key', exc.key, mandatory=False)
1075 1028 if exc.new is not None:
1076 1029 part.addparam('new', exc.new, mandatory=False)
1077 1030 if exc.old is not None:
1078 1031 part.addparam('old', exc.old, mandatory=False)
1079 1032 if exc.ret is not None:
1080 1033 part.addparam('ret', exc.ret, mandatory=False)
1081 1034 except error.BundleValueError as exc:
1082 1035 errpart = bundler.newpart('error:unsupportedcontent')
1083 1036 if exc.parttype is not None:
1084 1037 errpart.addparam('parttype', exc.parttype)
1085 1038 if exc.params:
1086 1039 errpart.addparam('params', '\0'.join(exc.params))
1087 1040 except error.Abort as exc:
1088 1041 manargs = [('message', str(exc))]
1089 1042 advargs = []
1090 1043 if exc.hint is not None:
1091 1044 advargs.append(('hint', exc.hint))
1092 1045 bundler.addpart(bundle2.bundlepart('error:abort',
1093 1046 manargs, advargs))
1094 1047 except error.PushRaced as exc:
1095 1048 bundler.newpart('error:pushraced', [('message', str(exc))])
1096 1049 return streamres_legacy(gen=bundler.getchunks())
@@ -1,453 +1,454 b''
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import abc
10 10 import cgi
11 11 import contextlib
12 12 import struct
13 13 import sys
14 14
15 15 from .i18n import _
16 16 from . import (
17 17 encoding,
18 18 error,
19 19 hook,
20 20 pycompat,
21 21 util,
22 22 wireproto,
23 wireprototypes,
23 24 )
24 25
25 26 stringio = util.stringio
26 27
27 28 urlerr = util.urlerr
28 29 urlreq = util.urlreq
29 30
30 31 HTTP_OK = 200
31 32
32 33 HGTYPE = 'application/mercurial-0.1'
33 34 HGTYPE2 = 'application/mercurial-0.2'
34 35 HGERRTYPE = 'application/hg-error'
35 36
36 37 # Names of the SSH protocol implementations.
37 38 SSHV1 = 'ssh-v1'
38 39 # This is advertised over the wire. Incremental the counter at the end
39 40 # to reflect BC breakages.
40 41 SSHV2 = 'exp-ssh-v2-0001'
41 42
42 43 class baseprotocolhandler(object):
43 44 """Abstract base class for wire protocol handlers.
44 45
45 46 A wire protocol handler serves as an interface between protocol command
46 47 handlers and the wire protocol transport layer. Protocol handlers provide
47 48 methods to read command arguments, redirect stdio for the duration of
48 49 the request, handle response types, etc.
49 50 """
50 51
51 52 __metaclass__ = abc.ABCMeta
52 53
53 54 @abc.abstractproperty
54 55 def name(self):
55 56 """The name of the protocol implementation.
56 57
57 58 Used for uniquely identifying the transport type.
58 59 """
59 60
60 61 @abc.abstractmethod
61 62 def getargs(self, args):
62 63 """return the value for arguments in <args>
63 64
64 65 returns a list of values (same order as <args>)"""
65 66
66 67 @abc.abstractmethod
67 68 def forwardpayload(self, fp):
68 69 """Read the raw payload and forward to a file.
69 70
70 71 The payload is read in full before the function returns.
71 72 """
72 73
73 74 @abc.abstractmethod
74 75 def mayberedirectstdio(self):
75 76 """Context manager to possibly redirect stdio.
76 77
77 78 The context manager yields a file-object like object that receives
78 79 stdout and stderr output when the context manager is active. Or it
79 80 yields ``None`` if no I/O redirection occurs.
80 81
81 82 The intent of this context manager is to capture stdio output
82 83 so it may be sent in the response. Some transports support streaming
83 84 stdio to the client in real time. For these transports, stdio output
84 85 won't be captured.
85 86 """
86 87
87 88 @abc.abstractmethod
88 89 def client(self):
89 90 """Returns a string representation of this client (as bytes)."""
90 91
91 92 def decodevaluefromheaders(req, headerprefix):
92 93 """Decode a long value from multiple HTTP request headers.
93 94
94 95 Returns the value as a bytes, not a str.
95 96 """
96 97 chunks = []
97 98 i = 1
98 99 prefix = headerprefix.upper().replace(r'-', r'_')
99 100 while True:
100 101 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
101 102 if v is None:
102 103 break
103 104 chunks.append(pycompat.bytesurl(v))
104 105 i += 1
105 106
106 107 return ''.join(chunks)
107 108
108 109 class webproto(baseprotocolhandler):
109 110 def __init__(self, req, ui):
110 111 self._req = req
111 112 self._ui = ui
112 113
113 114 @property
114 115 def name(self):
115 116 return 'http'
116 117
117 118 def getargs(self, args):
118 119 knownargs = self._args()
119 120 data = {}
120 121 keys = args.split()
121 122 for k in keys:
122 123 if k == '*':
123 124 star = {}
124 125 for key in knownargs.keys():
125 126 if key != 'cmd' and key not in keys:
126 127 star[key] = knownargs[key][0]
127 128 data['*'] = star
128 129 else:
129 130 data[k] = knownargs[k][0]
130 131 return [data[k] for k in keys]
131 132
132 133 def _args(self):
133 134 args = util.rapply(pycompat.bytesurl, self._req.form.copy())
134 135 postlen = int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
135 136 if postlen:
136 137 args.update(cgi.parse_qs(
137 138 self._req.read(postlen), keep_blank_values=True))
138 139 return args
139 140
140 141 argvalue = decodevaluefromheaders(self._req, r'X-HgArg')
141 142 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
142 143 return args
143 144
144 145 def forwardpayload(self, fp):
145 146 length = int(self._req.env[r'CONTENT_LENGTH'])
146 147 # If httppostargs is used, we need to read Content-Length
147 148 # minus the amount that was consumed by args.
148 149 length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
149 150 for s in util.filechunkiter(self._req, limit=length):
150 151 fp.write(s)
151 152
152 153 @contextlib.contextmanager
153 154 def mayberedirectstdio(self):
154 155 oldout = self._ui.fout
155 156 olderr = self._ui.ferr
156 157
157 158 out = util.stringio()
158 159
159 160 try:
160 161 self._ui.fout = out
161 162 self._ui.ferr = out
162 163 yield out
163 164 finally:
164 165 self._ui.fout = oldout
165 166 self._ui.ferr = olderr
166 167
167 168 def client(self):
168 169 return 'remote:%s:%s:%s' % (
169 170 self._req.env.get('wsgi.url_scheme') or 'http',
170 171 urlreq.quote(self._req.env.get('REMOTE_HOST', '')),
171 172 urlreq.quote(self._req.env.get('REMOTE_USER', '')))
172 173
173 174 def iscmd(cmd):
174 175 return cmd in wireproto.commands
175 176
176 177 def parsehttprequest(repo, req, query):
177 178 """Parse the HTTP request for a wire protocol request.
178 179
179 180 If the current request appears to be a wire protocol request, this
180 181 function returns a dict with details about that request, including
181 182 an ``abstractprotocolserver`` instance suitable for handling the
182 183 request. Otherwise, ``None`` is returned.
183 184
184 185 ``req`` is a ``wsgirequest`` instance.
185 186 """
186 187 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
187 188 # string parameter. If it isn't present, this isn't a wire protocol
188 189 # request.
189 190 if r'cmd' not in req.form:
190 191 return None
191 192
192 193 cmd = pycompat.sysbytes(req.form[r'cmd'][0])
193 194
194 195 # The "cmd" request parameter is used by both the wire protocol and hgweb.
195 196 # While not all wire protocol commands are available for all transports,
196 197 # if we see a "cmd" value that resembles a known wire protocol command, we
197 198 # route it to a protocol handler. This is better than routing possible
198 199 # wire protocol requests to hgweb because it prevents hgweb from using
199 200 # known wire protocol commands and it is less confusing for machine
200 201 # clients.
201 202 if cmd not in wireproto.commands:
202 203 return None
203 204
204 205 proto = webproto(req, repo.ui)
205 206
206 207 return {
207 208 'cmd': cmd,
208 209 'proto': proto,
209 210 'dispatch': lambda: _callhttp(repo, req, proto, cmd),
210 211 'handleerror': lambda ex: _handlehttperror(ex, req, cmd),
211 212 }
212 213
213 214 def _httpresponsetype(ui, req, prefer_uncompressed):
214 215 """Determine the appropriate response type and compression settings.
215 216
216 217 Returns a tuple of (mediatype, compengine, engineopts).
217 218 """
218 219 # Determine the response media type and compression engine based
219 220 # on the request parameters.
220 221 protocaps = decodevaluefromheaders(req, r'X-HgProto').split(' ')
221 222
222 223 if '0.2' in protocaps:
223 224 # All clients are expected to support uncompressed data.
224 225 if prefer_uncompressed:
225 226 return HGTYPE2, util._noopengine(), {}
226 227
227 228 # Default as defined by wire protocol spec.
228 229 compformats = ['zlib', 'none']
229 230 for cap in protocaps:
230 231 if cap.startswith('comp='):
231 232 compformats = cap[5:].split(',')
232 233 break
233 234
234 235 # Now find an agreed upon compression format.
235 236 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
236 237 if engine.wireprotosupport().name in compformats:
237 238 opts = {}
238 239 level = ui.configint('server', '%slevel' % engine.name())
239 240 if level is not None:
240 241 opts['level'] = level
241 242
242 243 return HGTYPE2, engine, opts
243 244
244 245 # No mutually supported compression format. Fall back to the
245 246 # legacy protocol.
246 247
247 248 # Don't allow untrusted settings because disabling compression or
248 249 # setting a very high compression level could lead to flooding
249 250 # the server's network or CPU.
250 251 opts = {'level': ui.configint('server', 'zliblevel')}
251 252 return HGTYPE, util.compengines['zlib'], opts
252 253
253 254 def _callhttp(repo, req, proto, cmd):
254 255 def genversion2(gen, engine, engineopts):
255 256 # application/mercurial-0.2 always sends a payload header
256 257 # identifying the compression engine.
257 258 name = engine.wireprotosupport().name
258 259 assert 0 < len(name) < 256
259 260 yield struct.pack('B', len(name))
260 261 yield name
261 262
262 263 for chunk in gen:
263 264 yield chunk
264 265
265 266 rsp = wireproto.dispatch(repo, proto, cmd)
266 267
267 268 if not wireproto.commands.commandavailable(cmd, proto):
268 269 req.respond(HTTP_OK, HGERRTYPE,
269 270 body=_('requested wire protocol command is not available '
270 271 'over HTTP'))
271 272 return []
272 273
273 274 if isinstance(rsp, bytes):
274 275 req.respond(HTTP_OK, HGTYPE, body=rsp)
275 276 return []
276 elif isinstance(rsp, wireproto.streamres_legacy):
277 elif isinstance(rsp, wireprototypes.streamreslegacy):
277 278 gen = rsp.gen
278 279 req.respond(HTTP_OK, HGTYPE)
279 280 return gen
280 elif isinstance(rsp, wireproto.streamres):
281 elif isinstance(rsp, wireprototypes.streamres):
281 282 gen = rsp.gen
282 283
283 284 # This code for compression should not be streamres specific. It
284 285 # is here because we only compress streamres at the moment.
285 286 mediatype, engine, engineopts = _httpresponsetype(
286 287 repo.ui, req, rsp.prefer_uncompressed)
287 288 gen = engine.compressstream(gen, engineopts)
288 289
289 290 if mediatype == HGTYPE2:
290 291 gen = genversion2(gen, engine, engineopts)
291 292
292 293 req.respond(HTTP_OK, mediatype)
293 294 return gen
294 elif isinstance(rsp, wireproto.pushres):
295 elif isinstance(rsp, wireprototypes.pushres):
295 296 rsp = '%d\n%s' % (rsp.res, rsp.output)
296 297 req.respond(HTTP_OK, HGTYPE, body=rsp)
297 298 return []
298 elif isinstance(rsp, wireproto.pusherr):
299 elif isinstance(rsp, wireprototypes.pusherr):
299 300 # This is the httplib workaround documented in _handlehttperror().
300 301 req.drain()
301 302
302 303 rsp = '0\n%s\n' % rsp.res
303 304 req.respond(HTTP_OK, HGTYPE, body=rsp)
304 305 return []
305 elif isinstance(rsp, wireproto.ooberror):
306 elif isinstance(rsp, wireprototypes.ooberror):
306 307 rsp = rsp.message
307 308 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
308 309 return []
309 310 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
310 311
311 312 def _handlehttperror(e, req, cmd):
312 313 """Called when an ErrorResponse is raised during HTTP request processing."""
313 314
314 315 # Clients using Python's httplib are stateful: the HTTP client
315 316 # won't process an HTTP response until all request data is
316 317 # sent to the server. The intent of this code is to ensure
317 318 # we always read HTTP request data from the client, thus
318 319 # ensuring httplib transitions to a state that allows it to read
319 320 # the HTTP response. In other words, it helps prevent deadlocks
320 321 # on clients using httplib.
321 322
322 323 if (req.env[r'REQUEST_METHOD'] == r'POST' and
323 324 # But not if Expect: 100-continue is being used.
324 325 (req.env.get('HTTP_EXPECT',
325 326 '').lower() != '100-continue') or
326 327 # Or the non-httplib HTTP library is being advertised by
327 328 # the client.
328 329 req.env.get('X-HgHttp2', '')):
329 330 req.drain()
330 331 else:
331 332 req.headers.append((r'Connection', r'Close'))
332 333
333 334 # TODO This response body assumes the failed command was
334 335 # "unbundle." That assumption is not always valid.
335 336 req.respond(e, HGTYPE, body='0\n%s\n' % e)
336 337
337 338 return ''
338 339
339 340 def _sshv1respondbytes(fout, value):
340 341 """Send a bytes response for protocol version 1."""
341 342 fout.write('%d\n' % len(value))
342 343 fout.write(value)
343 344 fout.flush()
344 345
345 346 def _sshv1respondstream(fout, source):
346 347 write = fout.write
347 348 for chunk in source.gen:
348 349 write(chunk)
349 350 fout.flush()
350 351
351 352 def _sshv1respondooberror(fout, ferr, rsp):
352 353 ferr.write(b'%s\n-\n' % rsp)
353 354 ferr.flush()
354 355 fout.write(b'\n')
355 356 fout.flush()
356 357
357 358 class sshv1protocolhandler(baseprotocolhandler):
358 359 """Handler for requests services via version 1 of SSH protocol."""
359 360 def __init__(self, ui, fin, fout):
360 361 self._ui = ui
361 362 self._fin = fin
362 363 self._fout = fout
363 364
364 365 @property
365 366 def name(self):
366 367 return 'ssh'
367 368
368 369 def getargs(self, args):
369 370 data = {}
370 371 keys = args.split()
371 372 for n in xrange(len(keys)):
372 373 argline = self._fin.readline()[:-1]
373 374 arg, l = argline.split()
374 375 if arg not in keys:
375 376 raise error.Abort(_("unexpected parameter %r") % arg)
376 377 if arg == '*':
377 378 star = {}
378 379 for k in xrange(int(l)):
379 380 argline = self._fin.readline()[:-1]
380 381 arg, l = argline.split()
381 382 val = self._fin.read(int(l))
382 383 star[arg] = val
383 384 data['*'] = star
384 385 else:
385 386 val = self._fin.read(int(l))
386 387 data[arg] = val
387 388 return [data[k] for k in keys]
388 389
389 390 def forwardpayload(self, fpout):
390 391 # The file is in the form:
391 392 #
392 393 # <chunk size>\n<chunk>
393 394 # ...
394 395 # 0\n
395 396 _sshv1respondbytes(self._fout, b'')
396 397 count = int(self._fin.readline())
397 398 while count:
398 399 fpout.write(self._fin.read(count))
399 400 count = int(self._fin.readline())
400 401
401 402 @contextlib.contextmanager
402 403 def mayberedirectstdio(self):
403 404 yield None
404 405
405 406 def client(self):
406 407 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
407 408 return 'remote:ssh:' + client
408 409
409 410 class sshserver(object):
410 411 def __init__(self, ui, repo):
411 412 self._ui = ui
412 413 self._repo = repo
413 414 self._fin = ui.fin
414 415 self._fout = ui.fout
415 416
416 417 hook.redirect(True)
417 418 ui.fout = repo.ui.fout = ui.ferr
418 419
419 420 # Prevent insertion/deletion of CRs
420 421 util.setbinary(self._fin)
421 422 util.setbinary(self._fout)
422 423
423 424 self._proto = sshv1protocolhandler(self._ui, self._fin, self._fout)
424 425
425 426 def serve_forever(self):
426 427 while self.serve_one():
427 428 pass
428 429 sys.exit(0)
429 430
430 431 def serve_one(self):
431 432 cmd = self._fin.readline()[:-1]
432 433 if cmd and wireproto.commands.commandavailable(cmd, self._proto):
433 434 rsp = wireproto.dispatch(self._repo, self._proto, cmd)
434 435
435 436 if isinstance(rsp, bytes):
436 437 _sshv1respondbytes(self._fout, rsp)
437 elif isinstance(rsp, wireproto.streamres):
438 elif isinstance(rsp, wireprototypes.streamres):
438 439 _sshv1respondstream(self._fout, rsp)
439 elif isinstance(rsp, wireproto.streamres_legacy):
440 elif isinstance(rsp, wireprototypes.streamreslegacy):
440 441 _sshv1respondstream(self._fout, rsp)
441 elif isinstance(rsp, wireproto.pushres):
442 elif isinstance(rsp, wireprototypes.pushres):
442 443 _sshv1respondbytes(self._fout, b'')
443 444 _sshv1respondbytes(self._fout, bytes(rsp.res))
444 elif isinstance(rsp, wireproto.pusherr):
445 elif isinstance(rsp, wireprototypes.pusherr):
445 446 _sshv1respondbytes(self._fout, rsp.res)
446 elif isinstance(rsp, wireproto.ooberror):
447 elif isinstance(rsp, wireprototypes.ooberror):
447 448 _sshv1respondooberror(self._fout, self._ui.ferr, rsp.message)
448 449 else:
449 450 raise error.ProgrammingError('unhandled response type from '
450 451 'wire protocol command: %s' % rsp)
451 452 elif cmd:
452 453 _sshv1respondbytes(self._fout, b'')
453 454 return cmd != ''
General Comments 0
You need to be logged in to leave comments. Login now