##// END OF EJS Templates
wireprotoserver: rename _client to client (API)...
Gregory Szorc -
r36086:957e7736 default
parent child Browse files
Show More
@@ -1,1096 +1,1096 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 )
35 35
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 41 'IncompatibleClient')
42 42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43 43
44 44 class remoteiterbatcher(peer.iterbatcher):
45 45 def __init__(self, remote):
46 46 super(remoteiterbatcher, self).__init__()
47 47 self._remote = remote
48 48
49 49 def __getattr__(self, name):
50 50 # Validate this method is batchable, since submit() only supports
51 51 # batchable methods.
52 52 fn = getattr(self._remote, name)
53 53 if not getattr(fn, 'batchable', None):
54 54 raise error.ProgrammingError('Attempted to batch a non-batchable '
55 55 'call to %r' % name)
56 56
57 57 return super(remoteiterbatcher, self).__getattr__(name)
58 58
59 59 def submit(self):
60 60 """Break the batch request into many patch calls and pipeline them.
61 61
62 62 This is mostly valuable over http where request sizes can be
63 63 limited, but can be used in other places as well.
64 64 """
65 65 # 2-tuple of (command, arguments) that represents what will be
66 66 # sent over the wire.
67 67 requests = []
68 68
69 69 # 4-tuple of (command, final future, @batchable generator, remote
70 70 # future).
71 71 results = []
72 72
73 73 for command, args, opts, finalfuture in self.calls:
74 74 mtd = getattr(self._remote, command)
75 75 batchable = mtd.batchable(mtd.__self__, *args, **opts)
76 76
77 77 commandargs, fremote = next(batchable)
78 78 assert fremote
79 79 requests.append((command, commandargs))
80 80 results.append((command, finalfuture, batchable, fremote))
81 81
82 82 if requests:
83 83 self._resultiter = self._remote._submitbatch(requests)
84 84
85 85 self._results = results
86 86
87 87 def results(self):
88 88 for command, finalfuture, batchable, remotefuture in self._results:
89 89 # Get the raw result, set it in the remote future, feed it
90 90 # back into the @batchable generator so it can be decoded, and
91 91 # set the result on the final future to this value.
92 92 remoteresult = next(self._resultiter)
93 93 remotefuture.set(remoteresult)
94 94 finalfuture.set(next(batchable))
95 95
96 96 # Verify our @batchable generators only emit 2 values.
97 97 try:
98 98 next(batchable)
99 99 except StopIteration:
100 100 pass
101 101 else:
102 102 raise error.ProgrammingError('%s @batchable generator emitted '
103 103 'unexpected value count' % command)
104 104
105 105 yield finalfuture.value
106 106
107 107 # Forward a couple of names from peer to make wireproto interactions
108 108 # slightly more sensible.
109 109 batchable = peer.batchable
110 110 future = peer.future
111 111
112 112 # list of nodes encoding / decoding
113 113
114 114 def decodelist(l, sep=' '):
115 115 if l:
116 116 return [bin(v) for v in l.split(sep)]
117 117 return []
118 118
119 119 def encodelist(l, sep=' '):
120 120 try:
121 121 return sep.join(map(hex, l))
122 122 except TypeError:
123 123 raise
124 124
125 125 # batched call argument encoding
126 126
127 127 def escapearg(plain):
128 128 return (plain
129 129 .replace(':', ':c')
130 130 .replace(',', ':o')
131 131 .replace(';', ':s')
132 132 .replace('=', ':e'))
133 133
134 134 def unescapearg(escaped):
135 135 return (escaped
136 136 .replace(':e', '=')
137 137 .replace(':s', ';')
138 138 .replace(':o', ',')
139 139 .replace(':c', ':'))
140 140
141 141 def encodebatchcmds(req):
142 142 """Return a ``cmds`` argument value for the ``batch`` command."""
143 143 cmds = []
144 144 for op, argsdict in req:
145 145 # Old servers didn't properly unescape argument names. So prevent
146 146 # the sending of argument names that may not be decoded properly by
147 147 # servers.
148 148 assert all(escapearg(k) == k for k in argsdict)
149 149
150 150 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
151 151 for k, v in argsdict.iteritems())
152 152 cmds.append('%s %s' % (op, args))
153 153
154 154 return ';'.join(cmds)
155 155
156 156 # mapping of options accepted by getbundle and their types
157 157 #
158 158 # Meant to be extended by extensions. It is extensions responsibility to ensure
159 159 # such options are properly processed in exchange.getbundle.
160 160 #
161 161 # supported types are:
162 162 #
163 163 # :nodes: list of binary nodes
164 164 # :csv: list of comma-separated values
165 165 # :scsv: list of comma-separated values return as set
166 166 # :plain: string with no transformation needed.
167 167 gboptsmap = {'heads': 'nodes',
168 168 'bookmarks': 'boolean',
169 169 'common': 'nodes',
170 170 'obsmarkers': 'boolean',
171 171 'phases': 'boolean',
172 172 'bundlecaps': 'scsv',
173 173 'listkeys': 'csv',
174 174 'cg': 'boolean',
175 175 'cbattempted': 'boolean',
176 176 'stream': 'boolean',
177 177 }
178 178
179 179 # client side
180 180
181 181 class wirepeer(repository.legacypeer):
182 182 """Client-side interface for communicating with a peer repository.
183 183
184 184 Methods commonly call wire protocol commands of the same name.
185 185
186 186 See also httppeer.py and sshpeer.py for protocol-specific
187 187 implementations of this interface.
188 188 """
189 189 # Begin of basewirepeer interface.
190 190
191 191 def iterbatch(self):
192 192 return remoteiterbatcher(self)
193 193
194 194 @batchable
195 195 def lookup(self, key):
196 196 self.requirecap('lookup', _('look up remote revision'))
197 197 f = future()
198 198 yield {'key': encoding.fromlocal(key)}, f
199 199 d = f.value
200 200 success, data = d[:-1].split(" ", 1)
201 201 if int(success):
202 202 yield bin(data)
203 203 else:
204 204 self._abort(error.RepoError(data))
205 205
206 206 @batchable
207 207 def heads(self):
208 208 f = future()
209 209 yield {}, f
210 210 d = f.value
211 211 try:
212 212 yield decodelist(d[:-1])
213 213 except ValueError:
214 214 self._abort(error.ResponseError(_("unexpected response:"), d))
215 215
216 216 @batchable
217 217 def known(self, nodes):
218 218 f = future()
219 219 yield {'nodes': encodelist(nodes)}, f
220 220 d = f.value
221 221 try:
222 222 yield [bool(int(b)) for b in d]
223 223 except ValueError:
224 224 self._abort(error.ResponseError(_("unexpected response:"), d))
225 225
226 226 @batchable
227 227 def branchmap(self):
228 228 f = future()
229 229 yield {}, f
230 230 d = f.value
231 231 try:
232 232 branchmap = {}
233 233 for branchpart in d.splitlines():
234 234 branchname, branchheads = branchpart.split(' ', 1)
235 235 branchname = encoding.tolocal(urlreq.unquote(branchname))
236 236 branchheads = decodelist(branchheads)
237 237 branchmap[branchname] = branchheads
238 238 yield branchmap
239 239 except TypeError:
240 240 self._abort(error.ResponseError(_("unexpected response:"), d))
241 241
242 242 @batchable
243 243 def listkeys(self, namespace):
244 244 if not self.capable('pushkey'):
245 245 yield {}, None
246 246 f = future()
247 247 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
248 248 yield {'namespace': encoding.fromlocal(namespace)}, f
249 249 d = f.value
250 250 self.ui.debug('received listkey for "%s": %i bytes\n'
251 251 % (namespace, len(d)))
252 252 yield pushkeymod.decodekeys(d)
253 253
254 254 @batchable
255 255 def pushkey(self, namespace, key, old, new):
256 256 if not self.capable('pushkey'):
257 257 yield False, None
258 258 f = future()
259 259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
260 260 yield {'namespace': encoding.fromlocal(namespace),
261 261 'key': encoding.fromlocal(key),
262 262 'old': encoding.fromlocal(old),
263 263 'new': encoding.fromlocal(new)}, f
264 264 d = f.value
265 265 d, output = d.split('\n', 1)
266 266 try:
267 267 d = bool(int(d))
268 268 except ValueError:
269 269 raise error.ResponseError(
270 270 _('push failed (unexpected response):'), d)
271 271 for l in output.splitlines(True):
272 272 self.ui.status(_('remote: '), l)
273 273 yield d
274 274
275 275 def stream_out(self):
276 276 return self._callstream('stream_out')
277 277
278 278 def getbundle(self, source, **kwargs):
279 279 kwargs = pycompat.byteskwargs(kwargs)
280 280 self.requirecap('getbundle', _('look up remote changes'))
281 281 opts = {}
282 282 bundlecaps = kwargs.get('bundlecaps')
283 283 if bundlecaps is not None:
284 284 kwargs['bundlecaps'] = sorted(bundlecaps)
285 285 else:
286 286 bundlecaps = () # kwargs could have it to None
287 287 for key, value in kwargs.iteritems():
288 288 if value is None:
289 289 continue
290 290 keytype = gboptsmap.get(key)
291 291 if keytype is None:
292 292 raise error.ProgrammingError(
293 293 'Unexpectedly None keytype for key %s' % key)
294 294 elif keytype == 'nodes':
295 295 value = encodelist(value)
296 296 elif keytype in ('csv', 'scsv'):
297 297 value = ','.join(value)
298 298 elif keytype == 'boolean':
299 299 value = '%i' % bool(value)
300 300 elif keytype != 'plain':
301 301 raise KeyError('unknown getbundle option type %s'
302 302 % keytype)
303 303 opts[key] = value
304 304 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
305 305 if any((cap.startswith('HG2') for cap in bundlecaps)):
306 306 return bundle2.getunbundler(self.ui, f)
307 307 else:
308 308 return changegroupmod.cg1unpacker(f, 'UN')
309 309
310 310 def unbundle(self, cg, heads, url):
311 311 '''Send cg (a readable file-like object representing the
312 312 changegroup to push, typically a chunkbuffer object) to the
313 313 remote server as a bundle.
314 314
315 315 When pushing a bundle10 stream, return an integer indicating the
316 316 result of the push (see changegroup.apply()).
317 317
318 318 When pushing a bundle20 stream, return a bundle20 stream.
319 319
320 320 `url` is the url the client thinks it's pushing to, which is
321 321 visible to hooks.
322 322 '''
323 323
324 324 if heads != ['force'] and self.capable('unbundlehash'):
325 325 heads = encodelist(['hashed',
326 326 hashlib.sha1(''.join(sorted(heads))).digest()])
327 327 else:
328 328 heads = encodelist(heads)
329 329
330 330 if util.safehasattr(cg, 'deltaheader'):
331 331 # this a bundle10, do the old style call sequence
332 332 ret, output = self._callpush("unbundle", cg, heads=heads)
333 333 if ret == "":
334 334 raise error.ResponseError(
335 335 _('push failed:'), output)
336 336 try:
337 337 ret = int(ret)
338 338 except ValueError:
339 339 raise error.ResponseError(
340 340 _('push failed (unexpected response):'), ret)
341 341
342 342 for l in output.splitlines(True):
343 343 self.ui.status(_('remote: '), l)
344 344 else:
345 345 # bundle2 push. Send a stream, fetch a stream.
346 346 stream = self._calltwowaystream('unbundle', cg, heads=heads)
347 347 ret = bundle2.getunbundler(self.ui, stream)
348 348 return ret
349 349
350 350 # End of basewirepeer interface.
351 351
352 352 # Begin of baselegacywirepeer interface.
353 353
354 354 def branches(self, nodes):
355 355 n = encodelist(nodes)
356 356 d = self._call("branches", nodes=n)
357 357 try:
358 358 br = [tuple(decodelist(b)) for b in d.splitlines()]
359 359 return br
360 360 except ValueError:
361 361 self._abort(error.ResponseError(_("unexpected response:"), d))
362 362
363 363 def between(self, pairs):
364 364 batch = 8 # avoid giant requests
365 365 r = []
366 366 for i in xrange(0, len(pairs), batch):
367 367 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
368 368 d = self._call("between", pairs=n)
369 369 try:
370 370 r.extend(l and decodelist(l) or [] for l in d.splitlines())
371 371 except ValueError:
372 372 self._abort(error.ResponseError(_("unexpected response:"), d))
373 373 return r
374 374
375 375 def changegroup(self, nodes, kind):
376 376 n = encodelist(nodes)
377 377 f = self._callcompressable("changegroup", roots=n)
378 378 return changegroupmod.cg1unpacker(f, 'UN')
379 379
380 380 def changegroupsubset(self, bases, heads, kind):
381 381 self.requirecap('changegroupsubset', _('look up remote changes'))
382 382 bases = encodelist(bases)
383 383 heads = encodelist(heads)
384 384 f = self._callcompressable("changegroupsubset",
385 385 bases=bases, heads=heads)
386 386 return changegroupmod.cg1unpacker(f, 'UN')
387 387
388 388 # End of baselegacywirepeer interface.
389 389
390 390 def _submitbatch(self, req):
391 391 """run batch request <req> on the server
392 392
393 393 Returns an iterator of the raw responses from the server.
394 394 """
395 395 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
396 396 chunk = rsp.read(1024)
397 397 work = [chunk]
398 398 while chunk:
399 399 while ';' not in chunk and chunk:
400 400 chunk = rsp.read(1024)
401 401 work.append(chunk)
402 402 merged = ''.join(work)
403 403 while ';' in merged:
404 404 one, merged = merged.split(';', 1)
405 405 yield unescapearg(one)
406 406 chunk = rsp.read(1024)
407 407 work = [merged, chunk]
408 408 yield unescapearg(''.join(work))
409 409
410 410 def _submitone(self, op, args):
411 411 return self._call(op, **pycompat.strkwargs(args))
412 412
413 413 def debugwireargs(self, one, two, three=None, four=None, five=None):
414 414 # don't pass optional arguments left at their default value
415 415 opts = {}
416 416 if three is not None:
417 417 opts[r'three'] = three
418 418 if four is not None:
419 419 opts[r'four'] = four
420 420 return self._call('debugwireargs', one=one, two=two, **opts)
421 421
422 422 def _call(self, cmd, **args):
423 423 """execute <cmd> on the server
424 424
425 425 The command is expected to return a simple string.
426 426
427 427 returns the server reply as a string."""
428 428 raise NotImplementedError()
429 429
430 430 def _callstream(self, cmd, **args):
431 431 """execute <cmd> on the server
432 432
433 433 The command is expected to return a stream. Note that if the
434 434 command doesn't return a stream, _callstream behaves
435 435 differently for ssh and http peers.
436 436
437 437 returns the server reply as a file like object.
438 438 """
439 439 raise NotImplementedError()
440 440
441 441 def _callcompressable(self, cmd, **args):
442 442 """execute <cmd> on the server
443 443
444 444 The command is expected to return a stream.
445 445
446 446 The stream may have been compressed in some implementations. This
447 447 function takes care of the decompression. This is the only difference
448 448 with _callstream.
449 449
450 450 returns the server reply as a file like object.
451 451 """
452 452 raise NotImplementedError()
453 453
454 454 def _callpush(self, cmd, fp, **args):
455 455 """execute a <cmd> on server
456 456
457 457 The command is expected to be related to a push. Push has a special
458 458 return method.
459 459
460 460 returns the server reply as a (ret, output) tuple. ret is either
461 461 empty (error) or a stringified int.
462 462 """
463 463 raise NotImplementedError()
464 464
465 465 def _calltwowaystream(self, cmd, fp, **args):
466 466 """execute <cmd> on server
467 467
468 468 The command will send a stream to the server and get a stream in reply.
469 469 """
470 470 raise NotImplementedError()
471 471
472 472 def _abort(self, exception):
473 473 """clearly abort the wire protocol connection and raise the exception
474 474 """
475 475 raise NotImplementedError()
476 476
477 477 # server side
478 478
479 479 # wire protocol command can either return a string or one of these classes.
480 480 class streamres(object):
481 481 """wireproto reply: binary stream
482 482
483 483 The call was successful and the result is a stream.
484 484
485 485 Accepts a generator containing chunks of data to be sent to the client.
486 486
487 487 ``prefer_uncompressed`` indicates that the data is expected to be
488 488 uncompressable and that the stream should therefore use the ``none``
489 489 engine.
490 490 """
491 491 def __init__(self, gen=None, prefer_uncompressed=False):
492 492 self.gen = gen
493 493 self.prefer_uncompressed = prefer_uncompressed
494 494
495 495 class streamres_legacy(object):
496 496 """wireproto reply: uncompressed binary stream
497 497
498 498 The call was successful and the result is a stream.
499 499
500 500 Accepts a generator containing chunks of data to be sent to the client.
501 501
502 502 Like ``streamres``, but sends an uncompressed data for "version 1" clients
503 503 using the application/mercurial-0.1 media type.
504 504 """
505 505 def __init__(self, gen=None):
506 506 self.gen = gen
507 507
508 508 class pushres(object):
509 509 """wireproto reply: success with simple integer return
510 510
511 511 The call was successful and returned an integer contained in `self.res`.
512 512 """
513 513 def __init__(self, res, output):
514 514 self.res = res
515 515 self.output = output
516 516
517 517 class pusherr(object):
518 518 """wireproto reply: failure
519 519
520 520 The call failed. The `self.res` attribute contains the error message.
521 521 """
522 522 def __init__(self, res, output):
523 523 self.res = res
524 524 self.output = output
525 525
526 526 class ooberror(object):
527 527 """wireproto reply: failure of a batch of operation
528 528
529 529 Something failed during a batch call. The error message is stored in
530 530 `self.message`.
531 531 """
532 532 def __init__(self, message):
533 533 self.message = message
534 534
535 535 def getdispatchrepo(repo, proto, command):
536 536 """Obtain the repo used for processing wire protocol commands.
537 537
538 538 The intent of this function is to serve as a monkeypatch point for
539 539 extensions that need commands to operate on different repo views under
540 540 specialized circumstances.
541 541 """
542 542 return repo.filtered('served')
543 543
544 544 def dispatch(repo, proto, command):
545 545 repo = getdispatchrepo(repo, proto, command)
546 546 func, spec = commands[command]
547 547 args = proto.getargs(spec)
548 548 return func(repo, proto, *args)
549 549
550 550 def options(cmd, keys, others):
551 551 opts = {}
552 552 for k in keys:
553 553 if k in others:
554 554 opts[k] = others[k]
555 555 del others[k]
556 556 if others:
557 557 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
558 558 % (cmd, ",".join(others)))
559 559 return opts
560 560
561 561 def bundle1allowed(repo, action):
562 562 """Whether a bundle1 operation is allowed from the server.
563 563
564 564 Priority is:
565 565
566 566 1. server.bundle1gd.<action> (if generaldelta active)
567 567 2. server.bundle1.<action>
568 568 3. server.bundle1gd (if generaldelta active)
569 569 4. server.bundle1
570 570 """
571 571 ui = repo.ui
572 572 gd = 'generaldelta' in repo.requirements
573 573
574 574 if gd:
575 575 v = ui.configbool('server', 'bundle1gd.%s' % action)
576 576 if v is not None:
577 577 return v
578 578
579 579 v = ui.configbool('server', 'bundle1.%s' % action)
580 580 if v is not None:
581 581 return v
582 582
583 583 if gd:
584 584 v = ui.configbool('server', 'bundle1gd')
585 585 if v is not None:
586 586 return v
587 587
588 588 return ui.configbool('server', 'bundle1')
589 589
590 590 def supportedcompengines(ui, proto, role):
591 591 """Obtain the list of supported compression engines for a request."""
592 592 assert role in (util.CLIENTROLE, util.SERVERROLE)
593 593
594 594 compengines = util.compengines.supportedwireengines(role)
595 595
596 596 # Allow config to override default list and ordering.
597 597 if role == util.SERVERROLE:
598 598 configengines = ui.configlist('server', 'compressionengines')
599 599 config = 'server.compressionengines'
600 600 else:
601 601 # This is currently implemented mainly to facilitate testing. In most
602 602 # cases, the server should be in charge of choosing a compression engine
603 603 # because a server has the most to lose from a sub-optimal choice. (e.g.
604 604 # CPU DoS due to an expensive engine or a network DoS due to poor
605 605 # compression ratio).
606 606 configengines = ui.configlist('experimental',
607 607 'clientcompressionengines')
608 608 config = 'experimental.clientcompressionengines'
609 609
610 610 # No explicit config. Filter out the ones that aren't supposed to be
611 611 # advertised and return default ordering.
612 612 if not configengines:
613 613 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
614 614 return [e for e in compengines
615 615 if getattr(e.wireprotosupport(), attr) > 0]
616 616
617 617 # If compression engines are listed in the config, assume there is a good
618 618 # reason for it (like server operators wanting to achieve specific
619 619 # performance characteristics). So fail fast if the config references
620 620 # unusable compression engines.
621 621 validnames = set(e.name() for e in compengines)
622 622 invalidnames = set(e for e in configengines if e not in validnames)
623 623 if invalidnames:
624 624 raise error.Abort(_('invalid compression engine defined in %s: %s') %
625 625 (config, ', '.join(sorted(invalidnames))))
626 626
627 627 compengines = [e for e in compengines if e.name() in configengines]
628 628 compengines = sorted(compengines,
629 629 key=lambda e: configengines.index(e.name()))
630 630
631 631 if not compengines:
632 632 raise error.Abort(_('%s config option does not specify any known '
633 633 'compression engines') % config,
634 634 hint=_('usable compression engines: %s') %
635 635 ', '.sorted(validnames))
636 636
637 637 return compengines
638 638
639 639 class commandentry(object):
640 640 """Represents a declared wire protocol command."""
641 641 def __init__(self, func, args=''):
642 642 self.func = func
643 643 self.args = args
644 644
645 645 def _merge(self, func, args):
646 646 """Merge this instance with an incoming 2-tuple.
647 647
648 648 This is called when a caller using the old 2-tuple API attempts
649 649 to replace an instance. The incoming values are merged with
650 650 data not captured by the 2-tuple and a new instance containing
651 651 the union of the two objects is returned.
652 652 """
653 653 return commandentry(func, args)
654 654
655 655 # Old code treats instances as 2-tuples. So expose that interface.
656 656 def __iter__(self):
657 657 yield self.func
658 658 yield self.args
659 659
660 660 def __getitem__(self, i):
661 661 if i == 0:
662 662 return self.func
663 663 elif i == 1:
664 664 return self.args
665 665 else:
666 666 raise IndexError('can only access elements 0 and 1')
667 667
668 668 class commanddict(dict):
669 669 """Container for registered wire protocol commands.
670 670
671 671 It behaves like a dict. But __setitem__ is overwritten to allow silent
672 672 coercion of values from 2-tuples for API compatibility.
673 673 """
674 674 def __setitem__(self, k, v):
675 675 if isinstance(v, commandentry):
676 676 pass
677 677 # Cast 2-tuples to commandentry instances.
678 678 elif isinstance(v, tuple):
679 679 if len(v) != 2:
680 680 raise ValueError('command tuples must have exactly 2 elements')
681 681
682 682 # It is common for extensions to wrap wire protocol commands via
683 683 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
684 684 # doing this aren't aware of the new API that uses objects to store
685 685 # command entries, we automatically merge old state with new.
686 686 if k in self:
687 687 v = self[k]._merge(v[0], v[1])
688 688 else:
689 689 v = commandentry(v[0], v[1])
690 690 else:
691 691 raise ValueError('command entries must be commandentry instances '
692 692 'or 2-tuples')
693 693
694 694 return super(commanddict, self).__setitem__(k, v)
695 695
696 696 def commandavailable(self, command, proto):
697 697 """Determine if a command is available for the requested protocol."""
698 698 # For now, commands are available for all protocols. So do a simple
699 699 # membership test.
700 700 return command in self
701 701
702 702 commands = commanddict()
703 703
704 704 def wireprotocommand(name, args=''):
705 705 """Decorator to declare a wire protocol command.
706 706
707 707 ``name`` is the name of the wire protocol command being provided.
708 708
709 709 ``args`` is a space-delimited list of named arguments that the command
710 710 accepts. ``*`` is a special value that says to accept all arguments.
711 711 """
712 712 def register(func):
713 713 commands[name] = commandentry(func, args)
714 714 return func
715 715 return register
716 716
717 717 @wireprotocommand('batch', 'cmds *')
718 718 def batch(repo, proto, cmds, others):
719 719 repo = repo.filtered("served")
720 720 res = []
721 721 for pair in cmds.split(';'):
722 722 op, args = pair.split(' ', 1)
723 723 vals = {}
724 724 for a in args.split(','):
725 725 if a:
726 726 n, v = a.split('=')
727 727 vals[unescapearg(n)] = unescapearg(v)
728 728 func, spec = commands[op]
729 729 if spec:
730 730 keys = spec.split()
731 731 data = {}
732 732 for k in keys:
733 733 if k == '*':
734 734 star = {}
735 735 for key in vals.keys():
736 736 if key not in keys:
737 737 star[key] = vals[key]
738 738 data['*'] = star
739 739 else:
740 740 data[k] = vals[k]
741 741 result = func(repo, proto, *[data[k] for k in keys])
742 742 else:
743 743 result = func(repo, proto)
744 744 if isinstance(result, ooberror):
745 745 return result
746 746 res.append(escapearg(result))
747 747 return ';'.join(res)
748 748
749 749 @wireprotocommand('between', 'pairs')
750 750 def between(repo, proto, pairs):
751 751 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
752 752 r = []
753 753 for b in repo.between(pairs):
754 754 r.append(encodelist(b) + "\n")
755 755 return "".join(r)
756 756
757 757 @wireprotocommand('branchmap')
758 758 def branchmap(repo, proto):
759 759 branchmap = repo.branchmap()
760 760 heads = []
761 761 for branch, nodes in branchmap.iteritems():
762 762 branchname = urlreq.quote(encoding.fromlocal(branch))
763 763 branchnodes = encodelist(nodes)
764 764 heads.append('%s %s' % (branchname, branchnodes))
765 765 return '\n'.join(heads)
766 766
767 767 @wireprotocommand('branches', 'nodes')
768 768 def branches(repo, proto, nodes):
769 769 nodes = decodelist(nodes)
770 770 r = []
771 771 for b in repo.branches(nodes):
772 772 r.append(encodelist(b) + "\n")
773 773 return "".join(r)
774 774
775 775 @wireprotocommand('clonebundles', '')
776 776 def clonebundles(repo, proto):
777 777 """Server command for returning info for available bundles to seed clones.
778 778
779 779 Clients will parse this response and determine what bundle to fetch.
780 780
781 781 Extensions may wrap this command to filter or dynamically emit data
782 782 depending on the request. e.g. you could advertise URLs for the closest
783 783 data center given the client's IP address.
784 784 """
785 785 return repo.vfs.tryread('clonebundles.manifest')
786 786
787 787 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
788 788 'known', 'getbundle', 'unbundlehash', 'batch']
789 789
790 790 def _capabilities(repo, proto):
791 791 """return a list of capabilities for a repo
792 792
793 793 This function exists to allow extensions to easily wrap capabilities
794 794 computation
795 795
796 796 - returns a lists: easy to alter
797 797 - change done here will be propagated to both `capabilities` and `hello`
798 798 command without any other action needed.
799 799 """
800 800 # copy to prevent modification of the global list
801 801 caps = list(wireprotocaps)
802 802 if streamclone.allowservergeneration(repo):
803 803 if repo.ui.configbool('server', 'preferuncompressed'):
804 804 caps.append('stream-preferred')
805 805 requiredformats = repo.requirements & repo.supportedformats
806 806 # if our local revlogs are just revlogv1, add 'stream' cap
807 807 if not requiredformats - {'revlogv1'}:
808 808 caps.append('stream')
809 809 # otherwise, add 'streamreqs' detailing our local revlog format
810 810 else:
811 811 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
812 812 if repo.ui.configbool('experimental', 'bundle2-advertise'):
813 813 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
814 814 caps.append('bundle2=' + urlreq.quote(capsblob))
815 815 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
816 816
817 817 if proto.name == 'http':
818 818 caps.append('httpheader=%d' %
819 819 repo.ui.configint('server', 'maxhttpheaderlen'))
820 820 if repo.ui.configbool('experimental', 'httppostargs'):
821 821 caps.append('httppostargs')
822 822
823 823 # FUTURE advertise 0.2rx once support is implemented
824 824 # FUTURE advertise minrx and mintx after consulting config option
825 825 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
826 826
827 827 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
828 828 if compengines:
829 829 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
830 830 for e in compengines)
831 831 caps.append('compression=%s' % comptypes)
832 832
833 833 return caps
834 834
835 835 # If you are writing an extension and consider wrapping this function. Wrap
836 836 # `_capabilities` instead.
837 837 @wireprotocommand('capabilities')
838 838 def capabilities(repo, proto):
839 839 return ' '.join(_capabilities(repo, proto))
840 840
841 841 @wireprotocommand('changegroup', 'roots')
842 842 def changegroup(repo, proto, roots):
843 843 nodes = decodelist(roots)
844 844 outgoing = discovery.outgoing(repo, missingroots=nodes,
845 845 missingheads=repo.heads())
846 846 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
847 847 gen = iter(lambda: cg.read(32768), '')
848 848 return streamres(gen=gen)
849 849
850 850 @wireprotocommand('changegroupsubset', 'bases heads')
851 851 def changegroupsubset(repo, proto, bases, heads):
852 852 bases = decodelist(bases)
853 853 heads = decodelist(heads)
854 854 outgoing = discovery.outgoing(repo, missingroots=bases,
855 855 missingheads=heads)
856 856 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
857 857 gen = iter(lambda: cg.read(32768), '')
858 858 return streamres(gen=gen)
859 859
860 860 @wireprotocommand('debugwireargs', 'one two *')
861 861 def debugwireargs(repo, proto, one, two, others):
862 862 # only accept optional args from the known set
863 863 opts = options('debugwireargs', ['three', 'four'], others)
864 864 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
865 865
866 866 @wireprotocommand('getbundle', '*')
867 867 def getbundle(repo, proto, others):
868 868 opts = options('getbundle', gboptsmap.keys(), others)
869 869 for k, v in opts.iteritems():
870 870 keytype = gboptsmap[k]
871 871 if keytype == 'nodes':
872 872 opts[k] = decodelist(v)
873 873 elif keytype == 'csv':
874 874 opts[k] = list(v.split(','))
875 875 elif keytype == 'scsv':
876 876 opts[k] = set(v.split(','))
877 877 elif keytype == 'boolean':
878 878 # Client should serialize False as '0', which is a non-empty string
879 879 # so it evaluates as a True bool.
880 880 if v == '0':
881 881 opts[k] = False
882 882 else:
883 883 opts[k] = bool(v)
884 884 elif keytype != 'plain':
885 885 raise KeyError('unknown getbundle option type %s'
886 886 % keytype)
887 887
888 888 if not bundle1allowed(repo, 'pull'):
889 889 if not exchange.bundle2requested(opts.get('bundlecaps')):
890 890 if proto.name == 'http':
891 891 return ooberror(bundle2required)
892 892 raise error.Abort(bundle2requiredmain,
893 893 hint=bundle2requiredhint)
894 894
895 895 prefercompressed = True
896 896
897 897 try:
898 898 if repo.ui.configbool('server', 'disablefullbundle'):
899 899 # Check to see if this is a full clone.
900 900 clheads = set(repo.changelog.heads())
901 901 changegroup = opts.get('cg', True)
902 902 heads = set(opts.get('heads', set()))
903 903 common = set(opts.get('common', set()))
904 904 common.discard(nullid)
905 905 if changegroup and not common and clheads == heads:
906 906 raise error.Abort(
907 907 _('server has pull-based clones disabled'),
908 908 hint=_('remove --pull if specified or upgrade Mercurial'))
909 909
910 910 info, chunks = exchange.getbundlechunks(repo, 'serve',
911 911 **pycompat.strkwargs(opts))
912 912 prefercompressed = info.get('prefercompressed', True)
913 913 except error.Abort as exc:
914 914 # cleanly forward Abort error to the client
915 915 if not exchange.bundle2requested(opts.get('bundlecaps')):
916 916 if proto.name == 'http':
917 917 return ooberror(str(exc) + '\n')
918 918 raise # cannot do better for bundle1 + ssh
919 919 # bundle2 request expect a bundle2 reply
920 920 bundler = bundle2.bundle20(repo.ui)
921 921 manargs = [('message', str(exc))]
922 922 advargs = []
923 923 if exc.hint is not None:
924 924 advargs.append(('hint', exc.hint))
925 925 bundler.addpart(bundle2.bundlepart('error:abort',
926 926 manargs, advargs))
927 927 chunks = bundler.getchunks()
928 928 prefercompressed = False
929 929
930 930 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
931 931
932 932 @wireprotocommand('heads')
933 933 def heads(repo, proto):
934 934 h = repo.heads()
935 935 return encodelist(h) + "\n"
936 936
937 937 @wireprotocommand('hello')
938 938 def hello(repo, proto):
939 939 '''the hello command returns a set of lines describing various
940 940 interesting things about the server, in an RFC822-like format.
941 941 Currently the only one defined is "capabilities", which
942 942 consists of a line in the form:
943 943
944 944 capabilities: space separated list of tokens
945 945 '''
946 946 return "capabilities: %s\n" % (capabilities(repo, proto))
947 947
948 948 @wireprotocommand('listkeys', 'namespace')
949 949 def listkeys(repo, proto, namespace):
950 950 d = repo.listkeys(encoding.tolocal(namespace)).items()
951 951 return pushkeymod.encodekeys(d)
952 952
953 953 @wireprotocommand('lookup', 'key')
954 954 def lookup(repo, proto, key):
955 955 try:
956 956 k = encoding.tolocal(key)
957 957 c = repo[k]
958 958 r = c.hex()
959 959 success = 1
960 960 except Exception as inst:
961 961 r = str(inst)
962 962 success = 0
963 963 return "%d %s\n" % (success, r)
964 964
965 965 @wireprotocommand('known', 'nodes *')
966 966 def known(repo, proto, nodes, others):
967 967 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
968 968
969 969 @wireprotocommand('pushkey', 'namespace key old new')
970 970 def pushkey(repo, proto, namespace, key, old, new):
971 971 # compatibility with pre-1.8 clients which were accidentally
972 972 # sending raw binary nodes rather than utf-8-encoded hex
973 973 if len(new) == 20 and util.escapestr(new) != new:
974 974 # looks like it could be a binary node
975 975 try:
976 976 new.decode('utf-8')
977 977 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
978 978 except UnicodeDecodeError:
979 979 pass # binary, leave unmodified
980 980 else:
981 981 new = encoding.tolocal(new) # normal path
982 982
983 983 with proto.mayberedirectstdio() as output:
984 984 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
985 985 encoding.tolocal(old), new) or False
986 986
987 987 output = output.getvalue() if output else ''
988 988 return '%s\n%s' % (int(r), output)
989 989
990 990 @wireprotocommand('stream_out')
991 991 def stream(repo, proto):
992 992 '''If the server supports streaming clone, it advertises the "stream"
993 993 capability with a value representing the version and flags of the repo
994 994 it is serving. Client checks to see if it understands the format.
995 995 '''
996 996 return streamres_legacy(streamclone.generatev1wireproto(repo))
997 997
998 998 @wireprotocommand('unbundle', 'heads')
999 999 def unbundle(repo, proto, heads):
1000 1000 their_heads = decodelist(heads)
1001 1001
1002 1002 with proto.mayberedirectstdio() as output:
1003 1003 try:
1004 1004 exchange.check_heads(repo, their_heads, 'preparing changes')
1005 1005
1006 1006 # write bundle data to temporary file because it can be big
1007 1007 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1008 1008 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1009 1009 r = 0
1010 1010 try:
1011 1011 proto.getfile(fp)
1012 1012 fp.seek(0)
1013 1013 gen = exchange.readbundle(repo.ui, fp, None)
1014 1014 if (isinstance(gen, changegroupmod.cg1unpacker)
1015 1015 and not bundle1allowed(repo, 'push')):
1016 1016 if proto.name == 'http':
1017 1017 # need to special case http because stderr do not get to
1018 1018 # the http client on failed push so we need to abuse
1019 1019 # some other error type to make sure the message get to
1020 1020 # the user.
1021 1021 return ooberror(bundle2required)
1022 1022 raise error.Abort(bundle2requiredmain,
1023 1023 hint=bundle2requiredhint)
1024 1024
1025 1025 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1026 proto._client())
1026 proto.client())
1027 1027 if util.safehasattr(r, 'addpart'):
1028 1028 # The return looks streamable, we are in the bundle2 case
1029 1029 # and should return a stream.
1030 1030 return streamres_legacy(gen=r.getchunks())
1031 1031 return pushres(r, output.getvalue() if output else '')
1032 1032
1033 1033 finally:
1034 1034 fp.close()
1035 1035 os.unlink(tempname)
1036 1036
1037 1037 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1038 1038 # handle non-bundle2 case first
1039 1039 if not getattr(exc, 'duringunbundle2', False):
1040 1040 try:
1041 1041 raise
1042 1042 except error.Abort:
1043 1043 # The old code we moved used util.stderr directly.
1044 1044 # We did not change it to minimise code change.
1045 1045 # This need to be moved to something proper.
1046 1046 # Feel free to do it.
1047 1047 util.stderr.write("abort: %s\n" % exc)
1048 1048 if exc.hint is not None:
1049 1049 util.stderr.write("(%s)\n" % exc.hint)
1050 1050 return pushres(0, output.getvalue() if output else '')
1051 1051 except error.PushRaced:
1052 1052 return pusherr(str(exc),
1053 1053 output.getvalue() if output else '')
1054 1054
1055 1055 bundler = bundle2.bundle20(repo.ui)
1056 1056 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1057 1057 bundler.addpart(out)
1058 1058 try:
1059 1059 try:
1060 1060 raise
1061 1061 except error.PushkeyFailed as exc:
1062 1062 # check client caps
1063 1063 remotecaps = getattr(exc, '_replycaps', None)
1064 1064 if (remotecaps is not None
1065 1065 and 'pushkey' not in remotecaps.get('error', ())):
1066 1066 # no support remote side, fallback to Abort handler.
1067 1067 raise
1068 1068 part = bundler.newpart('error:pushkey')
1069 1069 part.addparam('in-reply-to', exc.partid)
1070 1070 if exc.namespace is not None:
1071 1071 part.addparam('namespace', exc.namespace,
1072 1072 mandatory=False)
1073 1073 if exc.key is not None:
1074 1074 part.addparam('key', exc.key, mandatory=False)
1075 1075 if exc.new is not None:
1076 1076 part.addparam('new', exc.new, mandatory=False)
1077 1077 if exc.old is not None:
1078 1078 part.addparam('old', exc.old, mandatory=False)
1079 1079 if exc.ret is not None:
1080 1080 part.addparam('ret', exc.ret, mandatory=False)
1081 1081 except error.BundleValueError as exc:
1082 1082 errpart = bundler.newpart('error:unsupportedcontent')
1083 1083 if exc.parttype is not None:
1084 1084 errpart.addparam('parttype', exc.parttype)
1085 1085 if exc.params:
1086 1086 errpart.addparam('params', '\0'.join(exc.params))
1087 1087 except error.Abort as exc:
1088 1088 manargs = [('message', str(exc))]
1089 1089 advargs = []
1090 1090 if exc.hint is not None:
1091 1091 advargs.append(('hint', exc.hint))
1092 1092 bundler.addpart(bundle2.bundlepart('error:abort',
1093 1093 manargs, advargs))
1094 1094 except error.PushRaced as exc:
1095 1095 bundler.newpart('error:pushraced', [('message', str(exc))])
1096 1096 return streamres_legacy(gen=bundler.getchunks())
@@ -1,450 +1,454 b''
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import abc
10 10 import cgi
11 11 import contextlib
12 12 import struct
13 13 import sys
14 14
15 15 from .i18n import _
16 16 from . import (
17 17 encoding,
18 18 error,
19 19 hook,
20 20 pycompat,
21 21 util,
22 22 wireproto,
23 23 )
24 24
25 25 stringio = util.stringio
26 26
27 27 urlerr = util.urlerr
28 28 urlreq = util.urlreq
29 29
30 30 HTTP_OK = 200
31 31
32 32 HGTYPE = 'application/mercurial-0.1'
33 33 HGTYPE2 = 'application/mercurial-0.2'
34 34 HGERRTYPE = 'application/hg-error'
35 35
36 36 # Names of the SSH protocol implementations.
37 37 SSHV1 = 'ssh-v1'
38 38 # This is advertised over the wire. Incremental the counter at the end
39 39 # to reflect BC breakages.
40 40 SSHV2 = 'exp-ssh-v2-0001'
41 41
42 42 class baseprotocolhandler(object):
43 43 """Abstract base class for wire protocol handlers.
44 44
45 45 A wire protocol handler serves as an interface between protocol command
46 46 handlers and the wire protocol transport layer. Protocol handlers provide
47 47 methods to read command arguments, redirect stdio for the duration of
48 48 the request, handle response types, etc.
49 49 """
50 50
51 51 __metaclass__ = abc.ABCMeta
52 52
53 53 @abc.abstractproperty
54 54 def name(self):
55 55 """The name of the protocol implementation.
56 56
57 57 Used for uniquely identifying the transport type.
58 58 """
59 59
60 60 @abc.abstractmethod
61 61 def getargs(self, args):
62 62 """return the value for arguments in <args>
63 63
64 64 returns a list of values (same order as <args>)"""
65 65
66 66 @abc.abstractmethod
67 67 def getfile(self, fp):
68 68 """write the whole content of a file into a file like object
69 69
70 70 The file is in the form::
71 71
72 72 (<chunk-size>\n<chunk>)+0\n
73 73
74 74 chunk size is the ascii version of the int.
75 75 """
76 76
77 77 @abc.abstractmethod
78 78 def mayberedirectstdio(self):
79 79 """Context manager to possibly redirect stdio.
80 80
81 81 The context manager yields a file-object like object that receives
82 82 stdout and stderr output when the context manager is active. Or it
83 83 yields ``None`` if no I/O redirection occurs.
84 84
85 85 The intent of this context manager is to capture stdio output
86 86 so it may be sent in the response. Some transports support streaming
87 87 stdio to the client in real time. For these transports, stdio output
88 88 won't be captured.
89 89 """
90 90
91 @abc.abstractmethod
92 def client(self):
93 """Returns a string representation of this client (as bytes)."""
94
91 95 def decodevaluefromheaders(req, headerprefix):
92 96 """Decode a long value from multiple HTTP request headers.
93 97
94 98 Returns the value as a bytes, not a str.
95 99 """
96 100 chunks = []
97 101 i = 1
98 102 prefix = headerprefix.upper().replace(r'-', r'_')
99 103 while True:
100 104 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
101 105 if v is None:
102 106 break
103 107 chunks.append(pycompat.bytesurl(v))
104 108 i += 1
105 109
106 110 return ''.join(chunks)
107 111
108 112 class webproto(baseprotocolhandler):
109 113 def __init__(self, req, ui):
110 114 self._req = req
111 115 self._ui = ui
112 116
113 117 @property
114 118 def name(self):
115 119 return 'http'
116 120
117 121 def getargs(self, args):
118 122 knownargs = self._args()
119 123 data = {}
120 124 keys = args.split()
121 125 for k in keys:
122 126 if k == '*':
123 127 star = {}
124 128 for key in knownargs.keys():
125 129 if key != 'cmd' and key not in keys:
126 130 star[key] = knownargs[key][0]
127 131 data['*'] = star
128 132 else:
129 133 data[k] = knownargs[k][0]
130 134 return [data[k] for k in keys]
131 135
132 136 def _args(self):
133 137 args = util.rapply(pycompat.bytesurl, self._req.form.copy())
134 138 postlen = int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
135 139 if postlen:
136 140 args.update(cgi.parse_qs(
137 141 self._req.read(postlen), keep_blank_values=True))
138 142 return args
139 143
140 144 argvalue = decodevaluefromheaders(self._req, r'X-HgArg')
141 145 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
142 146 return args
143 147
144 148 def getfile(self, fp):
145 149 length = int(self._req.env[r'CONTENT_LENGTH'])
146 150 # If httppostargs is used, we need to read Content-Length
147 151 # minus the amount that was consumed by args.
148 152 length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
149 153 for s in util.filechunkiter(self._req, limit=length):
150 154 fp.write(s)
151 155
152 156 @contextlib.contextmanager
153 157 def mayberedirectstdio(self):
154 158 oldout = self._ui.fout
155 159 olderr = self._ui.ferr
156 160
157 161 out = util.stringio()
158 162
159 163 try:
160 164 self._ui.fout = out
161 165 self._ui.ferr = out
162 166 yield out
163 167 finally:
164 168 self._ui.fout = oldout
165 169 self._ui.ferr = olderr
166 170
167 def _client(self):
171 def client(self):
168 172 return 'remote:%s:%s:%s' % (
169 173 self._req.env.get('wsgi.url_scheme') or 'http',
170 174 urlreq.quote(self._req.env.get('REMOTE_HOST', '')),
171 175 urlreq.quote(self._req.env.get('REMOTE_USER', '')))
172 176
173 177 def responsetype(self, prefer_uncompressed):
174 178 """Determine the appropriate response type and compression settings.
175 179
176 180 Returns a tuple of (mediatype, compengine, engineopts).
177 181 """
178 182 # Determine the response media type and compression engine based
179 183 # on the request parameters.
180 184 protocaps = decodevaluefromheaders(self._req, r'X-HgProto').split(' ')
181 185
182 186 if '0.2' in protocaps:
183 187 # All clients are expected to support uncompressed data.
184 188 if prefer_uncompressed:
185 189 return HGTYPE2, util._noopengine(), {}
186 190
187 191 # Default as defined by wire protocol spec.
188 192 compformats = ['zlib', 'none']
189 193 for cap in protocaps:
190 194 if cap.startswith('comp='):
191 195 compformats = cap[5:].split(',')
192 196 break
193 197
194 198 # Now find an agreed upon compression format.
195 199 for engine in wireproto.supportedcompengines(self._ui, self,
196 200 util.SERVERROLE):
197 201 if engine.wireprotosupport().name in compformats:
198 202 opts = {}
199 203 level = self._ui.configint('server',
200 204 '%slevel' % engine.name())
201 205 if level is not None:
202 206 opts['level'] = level
203 207
204 208 return HGTYPE2, engine, opts
205 209
206 210 # No mutually supported compression format. Fall back to the
207 211 # legacy protocol.
208 212
209 213 # Don't allow untrusted settings because disabling compression or
210 214 # setting a very high compression level could lead to flooding
211 215 # the server's network or CPU.
212 216 opts = {'level': self._ui.configint('server', 'zliblevel')}
213 217 return HGTYPE, util.compengines['zlib'], opts
214 218
215 219 def iscmd(cmd):
216 220 return cmd in wireproto.commands
217 221
218 222 def parsehttprequest(repo, req, query):
219 223 """Parse the HTTP request for a wire protocol request.
220 224
221 225 If the current request appears to be a wire protocol request, this
222 226 function returns a dict with details about that request, including
223 227 an ``abstractprotocolserver`` instance suitable for handling the
224 228 request. Otherwise, ``None`` is returned.
225 229
226 230 ``req`` is a ``wsgirequest`` instance.
227 231 """
228 232 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
229 233 # string parameter. If it isn't present, this isn't a wire protocol
230 234 # request.
231 235 if r'cmd' not in req.form:
232 236 return None
233 237
234 238 cmd = pycompat.sysbytes(req.form[r'cmd'][0])
235 239
236 240 # The "cmd" request parameter is used by both the wire protocol and hgweb.
237 241 # While not all wire protocol commands are available for all transports,
238 242 # if we see a "cmd" value that resembles a known wire protocol command, we
239 243 # route it to a protocol handler. This is better than routing possible
240 244 # wire protocol requests to hgweb because it prevents hgweb from using
241 245 # known wire protocol commands and it is less confusing for machine
242 246 # clients.
243 247 if cmd not in wireproto.commands:
244 248 return None
245 249
246 250 proto = webproto(req, repo.ui)
247 251
248 252 return {
249 253 'cmd': cmd,
250 254 'proto': proto,
251 255 'dispatch': lambda: _callhttp(repo, req, proto, cmd),
252 256 'handleerror': lambda ex: _handlehttperror(ex, req, cmd),
253 257 }
254 258
255 259 def _callhttp(repo, req, proto, cmd):
256 260 def genversion2(gen, engine, engineopts):
257 261 # application/mercurial-0.2 always sends a payload header
258 262 # identifying the compression engine.
259 263 name = engine.wireprotosupport().name
260 264 assert 0 < len(name) < 256
261 265 yield struct.pack('B', len(name))
262 266 yield name
263 267
264 268 for chunk in gen:
265 269 yield chunk
266 270
267 271 rsp = wireproto.dispatch(repo, proto, cmd)
268 272
269 273 if not wireproto.commands.commandavailable(cmd, proto):
270 274 req.respond(HTTP_OK, HGERRTYPE,
271 275 body=_('requested wire protocol command is not available '
272 276 'over HTTP'))
273 277 return []
274 278
275 279 if isinstance(rsp, bytes):
276 280 req.respond(HTTP_OK, HGTYPE, body=rsp)
277 281 return []
278 282 elif isinstance(rsp, wireproto.streamres_legacy):
279 283 gen = rsp.gen
280 284 req.respond(HTTP_OK, HGTYPE)
281 285 return gen
282 286 elif isinstance(rsp, wireproto.streamres):
283 287 gen = rsp.gen
284 288
285 289 # This code for compression should not be streamres specific. It
286 290 # is here because we only compress streamres at the moment.
287 291 mediatype, engine, engineopts = proto.responsetype(
288 292 rsp.prefer_uncompressed)
289 293 gen = engine.compressstream(gen, engineopts)
290 294
291 295 if mediatype == HGTYPE2:
292 296 gen = genversion2(gen, engine, engineopts)
293 297
294 298 req.respond(HTTP_OK, mediatype)
295 299 return gen
296 300 elif isinstance(rsp, wireproto.pushres):
297 301 rsp = '%d\n%s' % (rsp.res, rsp.output)
298 302 req.respond(HTTP_OK, HGTYPE, body=rsp)
299 303 return []
300 304 elif isinstance(rsp, wireproto.pusherr):
301 305 # This is the httplib workaround documented in _handlehttperror().
302 306 req.drain()
303 307
304 308 rsp = '0\n%s\n' % rsp.res
305 309 req.respond(HTTP_OK, HGTYPE, body=rsp)
306 310 return []
307 311 elif isinstance(rsp, wireproto.ooberror):
308 312 rsp = rsp.message
309 313 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
310 314 return []
311 315 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
312 316
313 317 def _handlehttperror(e, req, cmd):
314 318 """Called when an ErrorResponse is raised during HTTP request processing."""
315 319
316 320 # Clients using Python's httplib are stateful: the HTTP client
317 321 # won't process an HTTP response until all request data is
318 322 # sent to the server. The intent of this code is to ensure
319 323 # we always read HTTP request data from the client, thus
320 324 # ensuring httplib transitions to a state that allows it to read
321 325 # the HTTP response. In other words, it helps prevent deadlocks
322 326 # on clients using httplib.
323 327
324 328 if (req.env[r'REQUEST_METHOD'] == r'POST' and
325 329 # But not if Expect: 100-continue is being used.
326 330 (req.env.get('HTTP_EXPECT',
327 331 '').lower() != '100-continue') or
328 332 # Or the non-httplib HTTP library is being advertised by
329 333 # the client.
330 334 req.env.get('X-HgHttp2', '')):
331 335 req.drain()
332 336 else:
333 337 req.headers.append((r'Connection', r'Close'))
334 338
335 339 # TODO This response body assumes the failed command was
336 340 # "unbundle." That assumption is not always valid.
337 341 req.respond(e, HGTYPE, body='0\n%s\n' % e)
338 342
339 343 return ''
340 344
341 345 def _sshv1respondbytes(fout, value):
342 346 """Send a bytes response for protocol version 1."""
343 347 fout.write('%d\n' % len(value))
344 348 fout.write(value)
345 349 fout.flush()
346 350
347 351 def _sshv1respondstream(fout, source):
348 352 write = fout.write
349 353 for chunk in source.gen:
350 354 write(chunk)
351 355 fout.flush()
352 356
353 357 def _sshv1respondooberror(fout, ferr, rsp):
354 358 ferr.write(b'%s\n-\n' % rsp)
355 359 ferr.flush()
356 360 fout.write(b'\n')
357 361 fout.flush()
358 362
359 363 class sshv1protocolhandler(baseprotocolhandler):
360 364 """Handler for requests services via version 1 of SSH protocol."""
361 365 def __init__(self, ui, fin, fout):
362 366 self._ui = ui
363 367 self._fin = fin
364 368 self._fout = fout
365 369
366 370 @property
367 371 def name(self):
368 372 return 'ssh'
369 373
370 374 def getargs(self, args):
371 375 data = {}
372 376 keys = args.split()
373 377 for n in xrange(len(keys)):
374 378 argline = self._fin.readline()[:-1]
375 379 arg, l = argline.split()
376 380 if arg not in keys:
377 381 raise error.Abort(_("unexpected parameter %r") % arg)
378 382 if arg == '*':
379 383 star = {}
380 384 for k in xrange(int(l)):
381 385 argline = self._fin.readline()[:-1]
382 386 arg, l = argline.split()
383 387 val = self._fin.read(int(l))
384 388 star[arg] = val
385 389 data['*'] = star
386 390 else:
387 391 val = self._fin.read(int(l))
388 392 data[arg] = val
389 393 return [data[k] for k in keys]
390 394
391 395 def getfile(self, fpout):
392 396 _sshv1respondbytes(self._fout, b'')
393 397 count = int(self._fin.readline())
394 398 while count:
395 399 fpout.write(self._fin.read(count))
396 400 count = int(self._fin.readline())
397 401
398 402 @contextlib.contextmanager
399 403 def mayberedirectstdio(self):
400 404 yield None
401 405
402 def _client(self):
406 def client(self):
403 407 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
404 408 return 'remote:ssh:' + client
405 409
406 410 class sshserver(object):
407 411 def __init__(self, ui, repo):
408 412 self._ui = ui
409 413 self._repo = repo
410 414 self._fin = ui.fin
411 415 self._fout = ui.fout
412 416
413 417 hook.redirect(True)
414 418 ui.fout = repo.ui.fout = ui.ferr
415 419
416 420 # Prevent insertion/deletion of CRs
417 421 util.setbinary(self._fin)
418 422 util.setbinary(self._fout)
419 423
420 424 self._proto = sshv1protocolhandler(self._ui, self._fin, self._fout)
421 425
422 426 def serve_forever(self):
423 427 while self.serve_one():
424 428 pass
425 429 sys.exit(0)
426 430
427 431 def serve_one(self):
428 432 cmd = self._fin.readline()[:-1]
429 433 if cmd and wireproto.commands.commandavailable(cmd, self._proto):
430 434 rsp = wireproto.dispatch(self._repo, self._proto, cmd)
431 435
432 436 if isinstance(rsp, bytes):
433 437 _sshv1respondbytes(self._fout, rsp)
434 438 elif isinstance(rsp, wireproto.streamres):
435 439 _sshv1respondstream(self._fout, rsp)
436 440 elif isinstance(rsp, wireproto.streamres_legacy):
437 441 _sshv1respondstream(self._fout, rsp)
438 442 elif isinstance(rsp, wireproto.pushres):
439 443 _sshv1respondbytes(self._fout, b'')
440 444 _sshv1respondbytes(self._fout, bytes(rsp.res))
441 445 elif isinstance(rsp, wireproto.pusherr):
442 446 _sshv1respondbytes(self._fout, rsp.res)
443 447 elif isinstance(rsp, wireproto.ooberror):
444 448 _sshv1respondooberror(self._fout, self._ui.ferr, rsp.message)
445 449 else:
446 450 raise error.ProgrammingError('unhandled response type from '
447 451 'wire protocol command: %s' % rsp)
448 452 elif cmd:
449 453 _sshv1respondbytes(self._fout, b'')
450 454 return cmd != ''
General Comments 0
You need to be logged in to leave comments. Login now