##// END OF EJS Templates
wireprotoserver: move abstractserverproto class from wireproto...
Gregory Szorc -
r35878:d9e71cce default
parent child Browse files
Show More
@@ -1,1076 +1,1036 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 )
35 35
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 41 'IncompatibleClient')
42 42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43 43
44 class abstractserverproto(object):
45 """abstract class that summarizes the protocol API
46
47 Used as reference and documentation.
48 """
49
50 def getargs(self, args):
51 """return the value for arguments in <args>
52
53 returns a list of values (same order as <args>)"""
54 raise NotImplementedError()
55
56 def getfile(self, fp):
57 """write the whole content of a file into a file like object
58
59 The file is in the form::
60
61 (<chunk-size>\n<chunk>)+0\n
62
63 chunk size is the ascii version of the int.
64 """
65 raise NotImplementedError()
66
67 def redirect(self):
68 """may setup interception for stdout and stderr
69
70 See also the `restore` method."""
71 raise NotImplementedError()
72
73 # If the `redirect` function does install interception, the `restore`
74 # function MUST be defined. If interception is not used, this function
75 # MUST NOT be defined.
76 #
77 # left commented here on purpose
78 #
79 #def restore(self):
80 # """reinstall previous stdout and stderr and return intercepted stdout
81 # """
82 # raise NotImplementedError()
83
84 44 class remoteiterbatcher(peer.iterbatcher):
85 45 def __init__(self, remote):
86 46 super(remoteiterbatcher, self).__init__()
87 47 self._remote = remote
88 48
89 49 def __getattr__(self, name):
90 50 # Validate this method is batchable, since submit() only supports
91 51 # batchable methods.
92 52 fn = getattr(self._remote, name)
93 53 if not getattr(fn, 'batchable', None):
94 54 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 55 'call to %r' % name)
96 56
97 57 return super(remoteiterbatcher, self).__getattr__(name)
98 58
99 59 def submit(self):
100 60 """Break the batch request into many patch calls and pipeline them.
101 61
102 62 This is mostly valuable over http where request sizes can be
103 63 limited, but can be used in other places as well.
104 64 """
105 65 # 2-tuple of (command, arguments) that represents what will be
106 66 # sent over the wire.
107 67 requests = []
108 68
109 69 # 4-tuple of (command, final future, @batchable generator, remote
110 70 # future).
111 71 results = []
112 72
113 73 for command, args, opts, finalfuture in self.calls:
114 74 mtd = getattr(self._remote, command)
115 75 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116 76
117 77 commandargs, fremote = next(batchable)
118 78 assert fremote
119 79 requests.append((command, commandargs))
120 80 results.append((command, finalfuture, batchable, fremote))
121 81
122 82 if requests:
123 83 self._resultiter = self._remote._submitbatch(requests)
124 84
125 85 self._results = results
126 86
127 87 def results(self):
128 88 for command, finalfuture, batchable, remotefuture in self._results:
129 89 # Get the raw result, set it in the remote future, feed it
130 90 # back into the @batchable generator so it can be decoded, and
131 91 # set the result on the final future to this value.
132 92 remoteresult = next(self._resultiter)
133 93 remotefuture.set(remoteresult)
134 94 finalfuture.set(next(batchable))
135 95
136 96 # Verify our @batchable generators only emit 2 values.
137 97 try:
138 98 next(batchable)
139 99 except StopIteration:
140 100 pass
141 101 else:
142 102 raise error.ProgrammingError('%s @batchable generator emitted '
143 103 'unexpected value count' % command)
144 104
145 105 yield finalfuture.value
146 106
147 107 # Forward a couple of names from peer to make wireproto interactions
148 108 # slightly more sensible.
149 109 batchable = peer.batchable
150 110 future = peer.future
151 111
152 112 # list of nodes encoding / decoding
153 113
154 114 def decodelist(l, sep=' '):
155 115 if l:
156 116 return [bin(v) for v in l.split(sep)]
157 117 return []
158 118
159 119 def encodelist(l, sep=' '):
160 120 try:
161 121 return sep.join(map(hex, l))
162 122 except TypeError:
163 123 raise
164 124
165 125 # batched call argument encoding
166 126
167 127 def escapearg(plain):
168 128 return (plain
169 129 .replace(':', ':c')
170 130 .replace(',', ':o')
171 131 .replace(';', ':s')
172 132 .replace('=', ':e'))
173 133
174 134 def unescapearg(escaped):
175 135 return (escaped
176 136 .replace(':e', '=')
177 137 .replace(':s', ';')
178 138 .replace(':o', ',')
179 139 .replace(':c', ':'))
180 140
181 141 def encodebatchcmds(req):
182 142 """Return a ``cmds`` argument value for the ``batch`` command."""
183 143 cmds = []
184 144 for op, argsdict in req:
185 145 # Old servers didn't properly unescape argument names. So prevent
186 146 # the sending of argument names that may not be decoded properly by
187 147 # servers.
188 148 assert all(escapearg(k) == k for k in argsdict)
189 149
190 150 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 151 for k, v in argsdict.iteritems())
192 152 cmds.append('%s %s' % (op, args))
193 153
194 154 return ';'.join(cmds)
195 155
196 156 # mapping of options accepted by getbundle and their types
197 157 #
198 158 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 159 # such options are properly processed in exchange.getbundle.
200 160 #
201 161 # supported types are:
202 162 #
203 163 # :nodes: list of binary nodes
204 164 # :csv: list of comma-separated values
205 165 # :scsv: list of comma-separated values return as set
206 166 # :plain: string with no transformation needed.
207 167 gboptsmap = {'heads': 'nodes',
208 168 'bookmarks': 'boolean',
209 169 'common': 'nodes',
210 170 'obsmarkers': 'boolean',
211 171 'phases': 'boolean',
212 172 'bundlecaps': 'scsv',
213 173 'listkeys': 'csv',
214 174 'cg': 'boolean',
215 175 'cbattempted': 'boolean',
216 176 'stream': 'boolean',
217 177 }
218 178
219 179 # client side
220 180
221 181 class wirepeer(repository.legacypeer):
222 182 """Client-side interface for communicating with a peer repository.
223 183
224 184 Methods commonly call wire protocol commands of the same name.
225 185
226 186 See also httppeer.py and sshpeer.py for protocol-specific
227 187 implementations of this interface.
228 188 """
229 189 # Begin of basewirepeer interface.
230 190
231 191 def iterbatch(self):
232 192 return remoteiterbatcher(self)
233 193
234 194 @batchable
235 195 def lookup(self, key):
236 196 self.requirecap('lookup', _('look up remote revision'))
237 197 f = future()
238 198 yield {'key': encoding.fromlocal(key)}, f
239 199 d = f.value
240 200 success, data = d[:-1].split(" ", 1)
241 201 if int(success):
242 202 yield bin(data)
243 203 else:
244 204 self._abort(error.RepoError(data))
245 205
246 206 @batchable
247 207 def heads(self):
248 208 f = future()
249 209 yield {}, f
250 210 d = f.value
251 211 try:
252 212 yield decodelist(d[:-1])
253 213 except ValueError:
254 214 self._abort(error.ResponseError(_("unexpected response:"), d))
255 215
256 216 @batchable
257 217 def known(self, nodes):
258 218 f = future()
259 219 yield {'nodes': encodelist(nodes)}, f
260 220 d = f.value
261 221 try:
262 222 yield [bool(int(b)) for b in d]
263 223 except ValueError:
264 224 self._abort(error.ResponseError(_("unexpected response:"), d))
265 225
266 226 @batchable
267 227 def branchmap(self):
268 228 f = future()
269 229 yield {}, f
270 230 d = f.value
271 231 try:
272 232 branchmap = {}
273 233 for branchpart in d.splitlines():
274 234 branchname, branchheads = branchpart.split(' ', 1)
275 235 branchname = encoding.tolocal(urlreq.unquote(branchname))
276 236 branchheads = decodelist(branchheads)
277 237 branchmap[branchname] = branchheads
278 238 yield branchmap
279 239 except TypeError:
280 240 self._abort(error.ResponseError(_("unexpected response:"), d))
281 241
282 242 @batchable
283 243 def listkeys(self, namespace):
284 244 if not self.capable('pushkey'):
285 245 yield {}, None
286 246 f = future()
287 247 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
288 248 yield {'namespace': encoding.fromlocal(namespace)}, f
289 249 d = f.value
290 250 self.ui.debug('received listkey for "%s": %i bytes\n'
291 251 % (namespace, len(d)))
292 252 yield pushkeymod.decodekeys(d)
293 253
294 254 @batchable
295 255 def pushkey(self, namespace, key, old, new):
296 256 if not self.capable('pushkey'):
297 257 yield False, None
298 258 f = future()
299 259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
300 260 yield {'namespace': encoding.fromlocal(namespace),
301 261 'key': encoding.fromlocal(key),
302 262 'old': encoding.fromlocal(old),
303 263 'new': encoding.fromlocal(new)}, f
304 264 d = f.value
305 265 d, output = d.split('\n', 1)
306 266 try:
307 267 d = bool(int(d))
308 268 except ValueError:
309 269 raise error.ResponseError(
310 270 _('push failed (unexpected response):'), d)
311 271 for l in output.splitlines(True):
312 272 self.ui.status(_('remote: '), l)
313 273 yield d
314 274
315 275 def stream_out(self):
316 276 return self._callstream('stream_out')
317 277
318 278 def getbundle(self, source, **kwargs):
319 279 kwargs = pycompat.byteskwargs(kwargs)
320 280 self.requirecap('getbundle', _('look up remote changes'))
321 281 opts = {}
322 282 bundlecaps = kwargs.get('bundlecaps')
323 283 if bundlecaps is not None:
324 284 kwargs['bundlecaps'] = sorted(bundlecaps)
325 285 else:
326 286 bundlecaps = () # kwargs could have it to None
327 287 for key, value in kwargs.iteritems():
328 288 if value is None:
329 289 continue
330 290 keytype = gboptsmap.get(key)
331 291 if keytype is None:
332 292 raise error.ProgrammingError(
333 293 'Unexpectedly None keytype for key %s' % key)
334 294 elif keytype == 'nodes':
335 295 value = encodelist(value)
336 296 elif keytype in ('csv', 'scsv'):
337 297 value = ','.join(value)
338 298 elif keytype == 'boolean':
339 299 value = '%i' % bool(value)
340 300 elif keytype != 'plain':
341 301 raise KeyError('unknown getbundle option type %s'
342 302 % keytype)
343 303 opts[key] = value
344 304 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
345 305 if any((cap.startswith('HG2') for cap in bundlecaps)):
346 306 return bundle2.getunbundler(self.ui, f)
347 307 else:
348 308 return changegroupmod.cg1unpacker(f, 'UN')
349 309
350 310 def unbundle(self, cg, heads, url):
351 311 '''Send cg (a readable file-like object representing the
352 312 changegroup to push, typically a chunkbuffer object) to the
353 313 remote server as a bundle.
354 314
355 315 When pushing a bundle10 stream, return an integer indicating the
356 316 result of the push (see changegroup.apply()).
357 317
358 318 When pushing a bundle20 stream, return a bundle20 stream.
359 319
360 320 `url` is the url the client thinks it's pushing to, which is
361 321 visible to hooks.
362 322 '''
363 323
364 324 if heads != ['force'] and self.capable('unbundlehash'):
365 325 heads = encodelist(['hashed',
366 326 hashlib.sha1(''.join(sorted(heads))).digest()])
367 327 else:
368 328 heads = encodelist(heads)
369 329
370 330 if util.safehasattr(cg, 'deltaheader'):
371 331 # this a bundle10, do the old style call sequence
372 332 ret, output = self._callpush("unbundle", cg, heads=heads)
373 333 if ret == "":
374 334 raise error.ResponseError(
375 335 _('push failed:'), output)
376 336 try:
377 337 ret = int(ret)
378 338 except ValueError:
379 339 raise error.ResponseError(
380 340 _('push failed (unexpected response):'), ret)
381 341
382 342 for l in output.splitlines(True):
383 343 self.ui.status(_('remote: '), l)
384 344 else:
385 345 # bundle2 push. Send a stream, fetch a stream.
386 346 stream = self._calltwowaystream('unbundle', cg, heads=heads)
387 347 ret = bundle2.getunbundler(self.ui, stream)
388 348 return ret
389 349
390 350 # End of basewirepeer interface.
391 351
392 352 # Begin of baselegacywirepeer interface.
393 353
394 354 def branches(self, nodes):
395 355 n = encodelist(nodes)
396 356 d = self._call("branches", nodes=n)
397 357 try:
398 358 br = [tuple(decodelist(b)) for b in d.splitlines()]
399 359 return br
400 360 except ValueError:
401 361 self._abort(error.ResponseError(_("unexpected response:"), d))
402 362
403 363 def between(self, pairs):
404 364 batch = 8 # avoid giant requests
405 365 r = []
406 366 for i in xrange(0, len(pairs), batch):
407 367 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
408 368 d = self._call("between", pairs=n)
409 369 try:
410 370 r.extend(l and decodelist(l) or [] for l in d.splitlines())
411 371 except ValueError:
412 372 self._abort(error.ResponseError(_("unexpected response:"), d))
413 373 return r
414 374
415 375 def changegroup(self, nodes, kind):
416 376 n = encodelist(nodes)
417 377 f = self._callcompressable("changegroup", roots=n)
418 378 return changegroupmod.cg1unpacker(f, 'UN')
419 379
420 380 def changegroupsubset(self, bases, heads, kind):
421 381 self.requirecap('changegroupsubset', _('look up remote changes'))
422 382 bases = encodelist(bases)
423 383 heads = encodelist(heads)
424 384 f = self._callcompressable("changegroupsubset",
425 385 bases=bases, heads=heads)
426 386 return changegroupmod.cg1unpacker(f, 'UN')
427 387
428 388 # End of baselegacywirepeer interface.
429 389
430 390 def _submitbatch(self, req):
431 391 """run batch request <req> on the server
432 392
433 393 Returns an iterator of the raw responses from the server.
434 394 """
435 395 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
436 396 chunk = rsp.read(1024)
437 397 work = [chunk]
438 398 while chunk:
439 399 while ';' not in chunk and chunk:
440 400 chunk = rsp.read(1024)
441 401 work.append(chunk)
442 402 merged = ''.join(work)
443 403 while ';' in merged:
444 404 one, merged = merged.split(';', 1)
445 405 yield unescapearg(one)
446 406 chunk = rsp.read(1024)
447 407 work = [merged, chunk]
448 408 yield unescapearg(''.join(work))
449 409
450 410 def _submitone(self, op, args):
451 411 return self._call(op, **pycompat.strkwargs(args))
452 412
453 413 def debugwireargs(self, one, two, three=None, four=None, five=None):
454 414 # don't pass optional arguments left at their default value
455 415 opts = {}
456 416 if three is not None:
457 417 opts[r'three'] = three
458 418 if four is not None:
459 419 opts[r'four'] = four
460 420 return self._call('debugwireargs', one=one, two=two, **opts)
461 421
462 422 def _call(self, cmd, **args):
463 423 """execute <cmd> on the server
464 424
465 425 The command is expected to return a simple string.
466 426
467 427 returns the server reply as a string."""
468 428 raise NotImplementedError()
469 429
470 430 def _callstream(self, cmd, **args):
471 431 """execute <cmd> on the server
472 432
473 433 The command is expected to return a stream. Note that if the
474 434 command doesn't return a stream, _callstream behaves
475 435 differently for ssh and http peers.
476 436
477 437 returns the server reply as a file like object.
478 438 """
479 439 raise NotImplementedError()
480 440
481 441 def _callcompressable(self, cmd, **args):
482 442 """execute <cmd> on the server
483 443
484 444 The command is expected to return a stream.
485 445
486 446 The stream may have been compressed in some implementations. This
487 447 function takes care of the decompression. This is the only difference
488 448 with _callstream.
489 449
490 450 returns the server reply as a file like object.
491 451 """
492 452 raise NotImplementedError()
493 453
494 454 def _callpush(self, cmd, fp, **args):
495 455 """execute a <cmd> on server
496 456
497 457 The command is expected to be related to a push. Push has a special
498 458 return method.
499 459
500 460 returns the server reply as a (ret, output) tuple. ret is either
501 461 empty (error) or a stringified int.
502 462 """
503 463 raise NotImplementedError()
504 464
505 465 def _calltwowaystream(self, cmd, fp, **args):
506 466 """execute <cmd> on server
507 467
508 468 The command will send a stream to the server and get a stream in reply.
509 469 """
510 470 raise NotImplementedError()
511 471
512 472 def _abort(self, exception):
513 473 """clearly abort the wire protocol connection and raise the exception
514 474 """
515 475 raise NotImplementedError()
516 476
517 477 # server side
518 478
519 479 # wire protocol command can either return a string or one of these classes.
520 480 class streamres(object):
521 481 """wireproto reply: binary stream
522 482
523 483 The call was successful and the result is a stream.
524 484
525 485 Accepts a generator containing chunks of data to be sent to the client.
526 486
527 487 ``prefer_uncompressed`` indicates that the data is expected to be
528 488 uncompressable and that the stream should therefore use the ``none``
529 489 engine.
530 490 """
531 491 def __init__(self, gen=None, prefer_uncompressed=False):
532 492 self.gen = gen
533 493 self.prefer_uncompressed = prefer_uncompressed
534 494
535 495 class streamres_legacy(object):
536 496 """wireproto reply: uncompressed binary stream
537 497
538 498 The call was successful and the result is a stream.
539 499
540 500 Accepts a generator containing chunks of data to be sent to the client.
541 501
542 502 Like ``streamres``, but sends an uncompressed data for "version 1" clients
543 503 using the application/mercurial-0.1 media type.
544 504 """
545 505 def __init__(self, gen=None):
546 506 self.gen = gen
547 507
548 508 class pushres(object):
549 509 """wireproto reply: success with simple integer return
550 510
551 511 The call was successful and returned an integer contained in `self.res`.
552 512 """
553 513 def __init__(self, res):
554 514 self.res = res
555 515
556 516 class pusherr(object):
557 517 """wireproto reply: failure
558 518
559 519 The call failed. The `self.res` attribute contains the error message.
560 520 """
561 521 def __init__(self, res):
562 522 self.res = res
563 523
564 524 class ooberror(object):
565 525 """wireproto reply: failure of a batch of operation
566 526
567 527 Something failed during a batch call. The error message is stored in
568 528 `self.message`.
569 529 """
570 530 def __init__(self, message):
571 531 self.message = message
572 532
573 533 def getdispatchrepo(repo, proto, command):
574 534 """Obtain the repo used for processing wire protocol commands.
575 535
576 536 The intent of this function is to serve as a monkeypatch point for
577 537 extensions that need commands to operate on different repo views under
578 538 specialized circumstances.
579 539 """
580 540 return repo.filtered('served')
581 541
582 542 def dispatch(repo, proto, command):
583 543 repo = getdispatchrepo(repo, proto, command)
584 544 func, spec = commands[command]
585 545 args = proto.getargs(spec)
586 546 return func(repo, proto, *args)
587 547
588 548 def options(cmd, keys, others):
589 549 opts = {}
590 550 for k in keys:
591 551 if k in others:
592 552 opts[k] = others[k]
593 553 del others[k]
594 554 if others:
595 555 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
596 556 % (cmd, ",".join(others)))
597 557 return opts
598 558
599 559 def bundle1allowed(repo, action):
600 560 """Whether a bundle1 operation is allowed from the server.
601 561
602 562 Priority is:
603 563
604 564 1. server.bundle1gd.<action> (if generaldelta active)
605 565 2. server.bundle1.<action>
606 566 3. server.bundle1gd (if generaldelta active)
607 567 4. server.bundle1
608 568 """
609 569 ui = repo.ui
610 570 gd = 'generaldelta' in repo.requirements
611 571
612 572 if gd:
613 573 v = ui.configbool('server', 'bundle1gd.%s' % action)
614 574 if v is not None:
615 575 return v
616 576
617 577 v = ui.configbool('server', 'bundle1.%s' % action)
618 578 if v is not None:
619 579 return v
620 580
621 581 if gd:
622 582 v = ui.configbool('server', 'bundle1gd')
623 583 if v is not None:
624 584 return v
625 585
626 586 return ui.configbool('server', 'bundle1')
627 587
628 588 def supportedcompengines(ui, proto, role):
629 589 """Obtain the list of supported compression engines for a request."""
630 590 assert role in (util.CLIENTROLE, util.SERVERROLE)
631 591
632 592 compengines = util.compengines.supportedwireengines(role)
633 593
634 594 # Allow config to override default list and ordering.
635 595 if role == util.SERVERROLE:
636 596 configengines = ui.configlist('server', 'compressionengines')
637 597 config = 'server.compressionengines'
638 598 else:
639 599 # This is currently implemented mainly to facilitate testing. In most
640 600 # cases, the server should be in charge of choosing a compression engine
641 601 # because a server has the most to lose from a sub-optimal choice. (e.g.
642 602 # CPU DoS due to an expensive engine or a network DoS due to poor
643 603 # compression ratio).
644 604 configengines = ui.configlist('experimental',
645 605 'clientcompressionengines')
646 606 config = 'experimental.clientcompressionengines'
647 607
648 608 # No explicit config. Filter out the ones that aren't supposed to be
649 609 # advertised and return default ordering.
650 610 if not configengines:
651 611 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
652 612 return [e for e in compengines
653 613 if getattr(e.wireprotosupport(), attr) > 0]
654 614
655 615 # If compression engines are listed in the config, assume there is a good
656 616 # reason for it (like server operators wanting to achieve specific
657 617 # performance characteristics). So fail fast if the config references
658 618 # unusable compression engines.
659 619 validnames = set(e.name() for e in compengines)
660 620 invalidnames = set(e for e in configengines if e not in validnames)
661 621 if invalidnames:
662 622 raise error.Abort(_('invalid compression engine defined in %s: %s') %
663 623 (config, ', '.join(sorted(invalidnames))))
664 624
665 625 compengines = [e for e in compengines if e.name() in configengines]
666 626 compengines = sorted(compengines,
667 627 key=lambda e: configengines.index(e.name()))
668 628
669 629 if not compengines:
670 630 raise error.Abort(_('%s config option does not specify any known '
671 631 'compression engines') % config,
672 632 hint=_('usable compression engines: %s') %
673 633 ', '.sorted(validnames))
674 634
675 635 return compengines
676 636
677 637 # list of commands
678 638 commands = {}
679 639
680 640 def wireprotocommand(name, args=''):
681 641 """decorator for wire protocol command"""
682 642 def register(func):
683 643 commands[name] = (func, args)
684 644 return func
685 645 return register
686 646
687 647 @wireprotocommand('batch', 'cmds *')
688 648 def batch(repo, proto, cmds, others):
689 649 repo = repo.filtered("served")
690 650 res = []
691 651 for pair in cmds.split(';'):
692 652 op, args = pair.split(' ', 1)
693 653 vals = {}
694 654 for a in args.split(','):
695 655 if a:
696 656 n, v = a.split('=')
697 657 vals[unescapearg(n)] = unescapearg(v)
698 658 func, spec = commands[op]
699 659 if spec:
700 660 keys = spec.split()
701 661 data = {}
702 662 for k in keys:
703 663 if k == '*':
704 664 star = {}
705 665 for key in vals.keys():
706 666 if key not in keys:
707 667 star[key] = vals[key]
708 668 data['*'] = star
709 669 else:
710 670 data[k] = vals[k]
711 671 result = func(repo, proto, *[data[k] for k in keys])
712 672 else:
713 673 result = func(repo, proto)
714 674 if isinstance(result, ooberror):
715 675 return result
716 676 res.append(escapearg(result))
717 677 return ';'.join(res)
718 678
719 679 @wireprotocommand('between', 'pairs')
720 680 def between(repo, proto, pairs):
721 681 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
722 682 r = []
723 683 for b in repo.between(pairs):
724 684 r.append(encodelist(b) + "\n")
725 685 return "".join(r)
726 686
727 687 @wireprotocommand('branchmap')
728 688 def branchmap(repo, proto):
729 689 branchmap = repo.branchmap()
730 690 heads = []
731 691 for branch, nodes in branchmap.iteritems():
732 692 branchname = urlreq.quote(encoding.fromlocal(branch))
733 693 branchnodes = encodelist(nodes)
734 694 heads.append('%s %s' % (branchname, branchnodes))
735 695 return '\n'.join(heads)
736 696
737 697 @wireprotocommand('branches', 'nodes')
738 698 def branches(repo, proto, nodes):
739 699 nodes = decodelist(nodes)
740 700 r = []
741 701 for b in repo.branches(nodes):
742 702 r.append(encodelist(b) + "\n")
743 703 return "".join(r)
744 704
745 705 @wireprotocommand('clonebundles', '')
746 706 def clonebundles(repo, proto):
747 707 """Server command for returning info for available bundles to seed clones.
748 708
749 709 Clients will parse this response and determine what bundle to fetch.
750 710
751 711 Extensions may wrap this command to filter or dynamically emit data
752 712 depending on the request. e.g. you could advertise URLs for the closest
753 713 data center given the client's IP address.
754 714 """
755 715 return repo.vfs.tryread('clonebundles.manifest')
756 716
757 717 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
758 718 'known', 'getbundle', 'unbundlehash', 'batch']
759 719
760 720 def _capabilities(repo, proto):
761 721 """return a list of capabilities for a repo
762 722
763 723 This function exists to allow extensions to easily wrap capabilities
764 724 computation
765 725
766 726 - returns a lists: easy to alter
767 727 - change done here will be propagated to both `capabilities` and `hello`
768 728 command without any other action needed.
769 729 """
770 730 # copy to prevent modification of the global list
771 731 caps = list(wireprotocaps)
772 732 if streamclone.allowservergeneration(repo):
773 733 if repo.ui.configbool('server', 'preferuncompressed'):
774 734 caps.append('stream-preferred')
775 735 requiredformats = repo.requirements & repo.supportedformats
776 736 # if our local revlogs are just revlogv1, add 'stream' cap
777 737 if not requiredformats - {'revlogv1'}:
778 738 caps.append('stream')
779 739 # otherwise, add 'streamreqs' detailing our local revlog format
780 740 else:
781 741 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
782 742 if repo.ui.configbool('experimental', 'bundle2-advertise'):
783 743 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
784 744 caps.append('bundle2=' + urlreq.quote(capsblob))
785 745 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
786 746
787 747 if proto.name == 'http':
788 748 caps.append('httpheader=%d' %
789 749 repo.ui.configint('server', 'maxhttpheaderlen'))
790 750 if repo.ui.configbool('experimental', 'httppostargs'):
791 751 caps.append('httppostargs')
792 752
793 753 # FUTURE advertise 0.2rx once support is implemented
794 754 # FUTURE advertise minrx and mintx after consulting config option
795 755 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
796 756
797 757 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
798 758 if compengines:
799 759 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
800 760 for e in compengines)
801 761 caps.append('compression=%s' % comptypes)
802 762
803 763 return caps
804 764
805 765 # If you are writing an extension and consider wrapping this function. Wrap
806 766 # `_capabilities` instead.
807 767 @wireprotocommand('capabilities')
808 768 def capabilities(repo, proto):
809 769 return ' '.join(_capabilities(repo, proto))
810 770
811 771 @wireprotocommand('changegroup', 'roots')
812 772 def changegroup(repo, proto, roots):
813 773 nodes = decodelist(roots)
814 774 outgoing = discovery.outgoing(repo, missingroots=nodes,
815 775 missingheads=repo.heads())
816 776 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
817 777 gen = iter(lambda: cg.read(32768), '')
818 778 return streamres(gen=gen)
819 779
820 780 @wireprotocommand('changegroupsubset', 'bases heads')
821 781 def changegroupsubset(repo, proto, bases, heads):
822 782 bases = decodelist(bases)
823 783 heads = decodelist(heads)
824 784 outgoing = discovery.outgoing(repo, missingroots=bases,
825 785 missingheads=heads)
826 786 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
827 787 gen = iter(lambda: cg.read(32768), '')
828 788 return streamres(gen=gen)
829 789
830 790 @wireprotocommand('debugwireargs', 'one two *')
831 791 def debugwireargs(repo, proto, one, two, others):
832 792 # only accept optional args from the known set
833 793 opts = options('debugwireargs', ['three', 'four'], others)
834 794 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
835 795
836 796 @wireprotocommand('getbundle', '*')
837 797 def getbundle(repo, proto, others):
838 798 opts = options('getbundle', gboptsmap.keys(), others)
839 799 for k, v in opts.iteritems():
840 800 keytype = gboptsmap[k]
841 801 if keytype == 'nodes':
842 802 opts[k] = decodelist(v)
843 803 elif keytype == 'csv':
844 804 opts[k] = list(v.split(','))
845 805 elif keytype == 'scsv':
846 806 opts[k] = set(v.split(','))
847 807 elif keytype == 'boolean':
848 808 # Client should serialize False as '0', which is a non-empty string
849 809 # so it evaluates as a True bool.
850 810 if v == '0':
851 811 opts[k] = False
852 812 else:
853 813 opts[k] = bool(v)
854 814 elif keytype != 'plain':
855 815 raise KeyError('unknown getbundle option type %s'
856 816 % keytype)
857 817
858 818 if not bundle1allowed(repo, 'pull'):
859 819 if not exchange.bundle2requested(opts.get('bundlecaps')):
860 820 if proto.name == 'http':
861 821 return ooberror(bundle2required)
862 822 raise error.Abort(bundle2requiredmain,
863 823 hint=bundle2requiredhint)
864 824
865 825 prefercompressed = True
866 826
867 827 try:
868 828 if repo.ui.configbool('server', 'disablefullbundle'):
869 829 # Check to see if this is a full clone.
870 830 clheads = set(repo.changelog.heads())
871 831 changegroup = opts.get('cg', True)
872 832 heads = set(opts.get('heads', set()))
873 833 common = set(opts.get('common', set()))
874 834 common.discard(nullid)
875 835 if changegroup and not common and clheads == heads:
876 836 raise error.Abort(
877 837 _('server has pull-based clones disabled'),
878 838 hint=_('remove --pull if specified or upgrade Mercurial'))
879 839
880 840 info, chunks = exchange.getbundlechunks(repo, 'serve',
881 841 **pycompat.strkwargs(opts))
882 842 prefercompressed = info.get('prefercompressed', True)
883 843 except error.Abort as exc:
884 844 # cleanly forward Abort error to the client
885 845 if not exchange.bundle2requested(opts.get('bundlecaps')):
886 846 if proto.name == 'http':
887 847 return ooberror(str(exc) + '\n')
888 848 raise # cannot do better for bundle1 + ssh
889 849 # bundle2 request expect a bundle2 reply
890 850 bundler = bundle2.bundle20(repo.ui)
891 851 manargs = [('message', str(exc))]
892 852 advargs = []
893 853 if exc.hint is not None:
894 854 advargs.append(('hint', exc.hint))
895 855 bundler.addpart(bundle2.bundlepart('error:abort',
896 856 manargs, advargs))
897 857 chunks = bundler.getchunks()
898 858 prefercompressed = False
899 859
900 860 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
901 861
902 862 @wireprotocommand('heads')
903 863 def heads(repo, proto):
904 864 h = repo.heads()
905 865 return encodelist(h) + "\n"
906 866
907 867 @wireprotocommand('hello')
908 868 def hello(repo, proto):
909 869 '''the hello command returns a set of lines describing various
910 870 interesting things about the server, in an RFC822-like format.
911 871 Currently the only one defined is "capabilities", which
912 872 consists of a line in the form:
913 873
914 874 capabilities: space separated list of tokens
915 875 '''
916 876 return "capabilities: %s\n" % (capabilities(repo, proto))
917 877
918 878 @wireprotocommand('listkeys', 'namespace')
919 879 def listkeys(repo, proto, namespace):
920 880 d = repo.listkeys(encoding.tolocal(namespace)).items()
921 881 return pushkeymod.encodekeys(d)
922 882
923 883 @wireprotocommand('lookup', 'key')
924 884 def lookup(repo, proto, key):
925 885 try:
926 886 k = encoding.tolocal(key)
927 887 c = repo[k]
928 888 r = c.hex()
929 889 success = 1
930 890 except Exception as inst:
931 891 r = str(inst)
932 892 success = 0
933 893 return "%d %s\n" % (success, r)
934 894
935 895 @wireprotocommand('known', 'nodes *')
936 896 def known(repo, proto, nodes, others):
937 897 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
938 898
939 899 @wireprotocommand('pushkey', 'namespace key old new')
940 900 def pushkey(repo, proto, namespace, key, old, new):
941 901 # compatibility with pre-1.8 clients which were accidentally
942 902 # sending raw binary nodes rather than utf-8-encoded hex
943 903 if len(new) == 20 and util.escapestr(new) != new:
944 904 # looks like it could be a binary node
945 905 try:
946 906 new.decode('utf-8')
947 907 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
948 908 except UnicodeDecodeError:
949 909 pass # binary, leave unmodified
950 910 else:
951 911 new = encoding.tolocal(new) # normal path
952 912
953 913 if util.safehasattr(proto, 'restore'):
954 914
955 915 proto.redirect()
956 916
957 917 try:
958 918 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
959 919 encoding.tolocal(old), new) or False
960 920 except error.Abort:
961 921 r = False
962 922
963 923 output = proto.restore()
964 924
965 925 return '%s\n%s' % (int(r), output)
966 926
967 927 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
968 928 encoding.tolocal(old), new)
969 929 return '%s\n' % int(r)
970 930
971 931 @wireprotocommand('stream_out')
972 932 def stream(repo, proto):
973 933 '''If the server supports streaming clone, it advertises the "stream"
974 934 capability with a value representing the version and flags of the repo
975 935 it is serving. Client checks to see if it understands the format.
976 936 '''
977 937 return streamres_legacy(streamclone.generatev1wireproto(repo))
978 938
979 939 @wireprotocommand('unbundle', 'heads')
980 940 def unbundle(repo, proto, heads):
981 941 their_heads = decodelist(heads)
982 942
983 943 try:
984 944 proto.redirect()
985 945
986 946 exchange.check_heads(repo, their_heads, 'preparing changes')
987 947
988 948 # write bundle data to temporary file because it can be big
989 949 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
990 950 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
991 951 r = 0
992 952 try:
993 953 proto.getfile(fp)
994 954 fp.seek(0)
995 955 gen = exchange.readbundle(repo.ui, fp, None)
996 956 if (isinstance(gen, changegroupmod.cg1unpacker)
997 957 and not bundle1allowed(repo, 'push')):
998 958 if proto.name == 'http':
999 959 # need to special case http because stderr do not get to
1000 960 # the http client on failed push so we need to abuse some
1001 961 # other error type to make sure the message get to the
1002 962 # user.
1003 963 return ooberror(bundle2required)
1004 964 raise error.Abort(bundle2requiredmain,
1005 965 hint=bundle2requiredhint)
1006 966
1007 967 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1008 968 proto._client())
1009 969 if util.safehasattr(r, 'addpart'):
1010 970 # The return looks streamable, we are in the bundle2 case and
1011 971 # should return a stream.
1012 972 return streamres_legacy(gen=r.getchunks())
1013 973 return pushres(r)
1014 974
1015 975 finally:
1016 976 fp.close()
1017 977 os.unlink(tempname)
1018 978
1019 979 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1020 980 # handle non-bundle2 case first
1021 981 if not getattr(exc, 'duringunbundle2', False):
1022 982 try:
1023 983 raise
1024 984 except error.Abort:
1025 985 # The old code we moved used util.stderr directly.
1026 986 # We did not change it to minimise code change.
1027 987 # This need to be moved to something proper.
1028 988 # Feel free to do it.
1029 989 util.stderr.write("abort: %s\n" % exc)
1030 990 if exc.hint is not None:
1031 991 util.stderr.write("(%s)\n" % exc.hint)
1032 992 return pushres(0)
1033 993 except error.PushRaced:
1034 994 return pusherr(str(exc))
1035 995
1036 996 bundler = bundle2.bundle20(repo.ui)
1037 997 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1038 998 bundler.addpart(out)
1039 999 try:
1040 1000 try:
1041 1001 raise
1042 1002 except error.PushkeyFailed as exc:
1043 1003 # check client caps
1044 1004 remotecaps = getattr(exc, '_replycaps', None)
1045 1005 if (remotecaps is not None
1046 1006 and 'pushkey' not in remotecaps.get('error', ())):
1047 1007 # no support remote side, fallback to Abort handler.
1048 1008 raise
1049 1009 part = bundler.newpart('error:pushkey')
1050 1010 part.addparam('in-reply-to', exc.partid)
1051 1011 if exc.namespace is not None:
1052 1012 part.addparam('namespace', exc.namespace, mandatory=False)
1053 1013 if exc.key is not None:
1054 1014 part.addparam('key', exc.key, mandatory=False)
1055 1015 if exc.new is not None:
1056 1016 part.addparam('new', exc.new, mandatory=False)
1057 1017 if exc.old is not None:
1058 1018 part.addparam('old', exc.old, mandatory=False)
1059 1019 if exc.ret is not None:
1060 1020 part.addparam('ret', exc.ret, mandatory=False)
1061 1021 except error.BundleValueError as exc:
1062 1022 errpart = bundler.newpart('error:unsupportedcontent')
1063 1023 if exc.parttype is not None:
1064 1024 errpart.addparam('parttype', exc.parttype)
1065 1025 if exc.params:
1066 1026 errpart.addparam('params', '\0'.join(exc.params))
1067 1027 except error.Abort as exc:
1068 1028 manargs = [('message', str(exc))]
1069 1029 advargs = []
1070 1030 if exc.hint is not None:
1071 1031 advargs.append(('hint', exc.hint))
1072 1032 bundler.addpart(bundle2.bundlepart('error:abort',
1073 1033 manargs, advargs))
1074 1034 except error.PushRaced as exc:
1075 1035 bundler.newpart('error:pushraced', [('message', str(exc))])
1076 1036 return streamres_legacy(gen=bundler.getchunks())
@@ -1,314 +1,354 b''
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import cgi
10 10 import struct
11 11 import sys
12 12
13 13 from .i18n import _
14 14 from . import (
15 15 encoding,
16 16 error,
17 17 hook,
18 18 pycompat,
19 19 util,
20 20 wireproto,
21 21 )
22 22
23 23 stringio = util.stringio
24 24
25 25 urlerr = util.urlerr
26 26 urlreq = util.urlreq
27 27
28 28 HTTP_OK = 200
29 29
30 30 HGTYPE = 'application/mercurial-0.1'
31 31 HGTYPE2 = 'application/mercurial-0.2'
32 32 HGERRTYPE = 'application/hg-error'
33 33
34 class abstractserverproto(object):
35 """abstract class that summarizes the protocol API
36
37 Used as reference and documentation.
38 """
39
40 def getargs(self, args):
41 """return the value for arguments in <args>
42
43 returns a list of values (same order as <args>)"""
44 raise NotImplementedError()
45
46 def getfile(self, fp):
47 """write the whole content of a file into a file like object
48
49 The file is in the form::
50
51 (<chunk-size>\n<chunk>)+0\n
52
53 chunk size is the ascii version of the int.
54 """
55 raise NotImplementedError()
56
57 def redirect(self):
58 """may setup interception for stdout and stderr
59
60 See also the `restore` method."""
61 raise NotImplementedError()
62
63 # If the `redirect` function does install interception, the `restore`
64 # function MUST be defined. If interception is not used, this function
65 # MUST NOT be defined.
66 #
67 # left commented here on purpose
68 #
69 #def restore(self):
70 # """reinstall previous stdout and stderr and return intercepted stdout
71 # """
72 # raise NotImplementedError()
73
34 74 def decodevaluefromheaders(req, headerprefix):
35 75 """Decode a long value from multiple HTTP request headers.
36 76
37 77 Returns the value as a bytes, not a str.
38 78 """
39 79 chunks = []
40 80 i = 1
41 81 prefix = headerprefix.upper().replace(r'-', r'_')
42 82 while True:
43 83 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
44 84 if v is None:
45 85 break
46 86 chunks.append(pycompat.bytesurl(v))
47 87 i += 1
48 88
49 89 return ''.join(chunks)
50 90
51 class webproto(wireproto.abstractserverproto):
91 class webproto(abstractserverproto):
52 92 def __init__(self, req, ui):
53 93 self.req = req
54 94 self.response = ''
55 95 self.ui = ui
56 96 self.name = 'http'
57 97
58 98 def getargs(self, args):
59 99 knownargs = self._args()
60 100 data = {}
61 101 keys = args.split()
62 102 for k in keys:
63 103 if k == '*':
64 104 star = {}
65 105 for key in knownargs.keys():
66 106 if key != 'cmd' and key not in keys:
67 107 star[key] = knownargs[key][0]
68 108 data['*'] = star
69 109 else:
70 110 data[k] = knownargs[k][0]
71 111 return [data[k] for k in keys]
72 112 def _args(self):
73 113 args = self.req.form.copy()
74 114 if pycompat.ispy3:
75 115 args = {k.encode('ascii'): [v.encode('ascii') for v in vs]
76 116 for k, vs in args.items()}
77 117 postlen = int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
78 118 if postlen:
79 119 args.update(cgi.parse_qs(
80 120 self.req.read(postlen), keep_blank_values=True))
81 121 return args
82 122
83 123 argvalue = decodevaluefromheaders(self.req, r'X-HgArg')
84 124 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
85 125 return args
86 126 def getfile(self, fp):
87 127 length = int(self.req.env[r'CONTENT_LENGTH'])
88 128 # If httppostargs is used, we need to read Content-Length
89 129 # minus the amount that was consumed by args.
90 130 length -= int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
91 131 for s in util.filechunkiter(self.req, limit=length):
92 132 fp.write(s)
93 133 def redirect(self):
94 134 self.oldio = self.ui.fout, self.ui.ferr
95 135 self.ui.ferr = self.ui.fout = stringio()
96 136 def restore(self):
97 137 val = self.ui.fout.getvalue()
98 138 self.ui.ferr, self.ui.fout = self.oldio
99 139 return val
100 140
101 141 def _client(self):
102 142 return 'remote:%s:%s:%s' % (
103 143 self.req.env.get('wsgi.url_scheme') or 'http',
104 144 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
105 145 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
106 146
107 147 def responsetype(self, prefer_uncompressed):
108 148 """Determine the appropriate response type and compression settings.
109 149
110 150 Returns a tuple of (mediatype, compengine, engineopts).
111 151 """
112 152 # Determine the response media type and compression engine based
113 153 # on the request parameters.
114 154 protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ')
115 155
116 156 if '0.2' in protocaps:
117 157 # All clients are expected to support uncompressed data.
118 158 if prefer_uncompressed:
119 159 return HGTYPE2, util._noopengine(), {}
120 160
121 161 # Default as defined by wire protocol spec.
122 162 compformats = ['zlib', 'none']
123 163 for cap in protocaps:
124 164 if cap.startswith('comp='):
125 165 compformats = cap[5:].split(',')
126 166 break
127 167
128 168 # Now find an agreed upon compression format.
129 169 for engine in wireproto.supportedcompengines(self.ui, self,
130 170 util.SERVERROLE):
131 171 if engine.wireprotosupport().name in compformats:
132 172 opts = {}
133 173 level = self.ui.configint('server',
134 174 '%slevel' % engine.name())
135 175 if level is not None:
136 176 opts['level'] = level
137 177
138 178 return HGTYPE2, engine, opts
139 179
140 180 # No mutually supported compression format. Fall back to the
141 181 # legacy protocol.
142 182
143 183 # Don't allow untrusted settings because disabling compression or
144 184 # setting a very high compression level could lead to flooding
145 185 # the server's network or CPU.
146 186 opts = {'level': self.ui.configint('server', 'zliblevel')}
147 187 return HGTYPE, util.compengines['zlib'], opts
148 188
149 189 def iscmd(cmd):
150 190 return cmd in wireproto.commands
151 191
152 192 def callhttp(repo, req, cmd):
153 193 p = webproto(req, repo.ui)
154 194
155 195 def genversion2(gen, engine, engineopts):
156 196 # application/mercurial-0.2 always sends a payload header
157 197 # identifying the compression engine.
158 198 name = engine.wireprotosupport().name
159 199 assert 0 < len(name) < 256
160 200 yield struct.pack('B', len(name))
161 201 yield name
162 202
163 203 for chunk in gen:
164 204 yield chunk
165 205
166 206 rsp = wireproto.dispatch(repo, p, cmd)
167 207 if isinstance(rsp, bytes):
168 208 req.respond(HTTP_OK, HGTYPE, body=rsp)
169 209 return []
170 210 elif isinstance(rsp, wireproto.streamres_legacy):
171 211 gen = rsp.gen
172 212 req.respond(HTTP_OK, HGTYPE)
173 213 return gen
174 214 elif isinstance(rsp, wireproto.streamres):
175 215 gen = rsp.gen
176 216
177 217 # This code for compression should not be streamres specific. It
178 218 # is here because we only compress streamres at the moment.
179 219 mediatype, engine, engineopts = p.responsetype(rsp.prefer_uncompressed)
180 220 gen = engine.compressstream(gen, engineopts)
181 221
182 222 if mediatype == HGTYPE2:
183 223 gen = genversion2(gen, engine, engineopts)
184 224
185 225 req.respond(HTTP_OK, mediatype)
186 226 return gen
187 227 elif isinstance(rsp, wireproto.pushres):
188 228 val = p.restore()
189 229 rsp = '%d\n%s' % (rsp.res, val)
190 230 req.respond(HTTP_OK, HGTYPE, body=rsp)
191 231 return []
192 232 elif isinstance(rsp, wireproto.pusherr):
193 233 # drain the incoming bundle
194 234 req.drain()
195 235 p.restore()
196 236 rsp = '0\n%s\n' % rsp.res
197 237 req.respond(HTTP_OK, HGTYPE, body=rsp)
198 238 return []
199 239 elif isinstance(rsp, wireproto.ooberror):
200 240 rsp = rsp.message
201 241 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
202 242 return []
203 243 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
204 244
205 class sshserver(wireproto.abstractserverproto):
245 class sshserver(abstractserverproto):
206 246 def __init__(self, ui, repo):
207 247 self.ui = ui
208 248 self.repo = repo
209 249 self.lock = None
210 250 self.fin = ui.fin
211 251 self.fout = ui.fout
212 252 self.name = 'ssh'
213 253
214 254 hook.redirect(True)
215 255 ui.fout = repo.ui.fout = ui.ferr
216 256
217 257 # Prevent insertion/deletion of CRs
218 258 util.setbinary(self.fin)
219 259 util.setbinary(self.fout)
220 260
221 261 def getargs(self, args):
222 262 data = {}
223 263 keys = args.split()
224 264 for n in xrange(len(keys)):
225 265 argline = self.fin.readline()[:-1]
226 266 arg, l = argline.split()
227 267 if arg not in keys:
228 268 raise error.Abort(_("unexpected parameter %r") % arg)
229 269 if arg == '*':
230 270 star = {}
231 271 for k in xrange(int(l)):
232 272 argline = self.fin.readline()[:-1]
233 273 arg, l = argline.split()
234 274 val = self.fin.read(int(l))
235 275 star[arg] = val
236 276 data['*'] = star
237 277 else:
238 278 val = self.fin.read(int(l))
239 279 data[arg] = val
240 280 return [data[k] for k in keys]
241 281
242 282 def getarg(self, name):
243 283 return self.getargs(name)[0]
244 284
245 285 def getfile(self, fpout):
246 286 self.sendresponse('')
247 287 count = int(self.fin.readline())
248 288 while count:
249 289 fpout.write(self.fin.read(count))
250 290 count = int(self.fin.readline())
251 291
252 292 def redirect(self):
253 293 pass
254 294
255 295 def sendresponse(self, v):
256 296 self.fout.write("%d\n" % len(v))
257 297 self.fout.write(v)
258 298 self.fout.flush()
259 299
260 300 def sendstream(self, source):
261 301 write = self.fout.write
262 302 for chunk in source.gen:
263 303 write(chunk)
264 304 self.fout.flush()
265 305
266 306 def sendpushresponse(self, rsp):
267 307 self.sendresponse('')
268 308 self.sendresponse(str(rsp.res))
269 309
270 310 def sendpusherror(self, rsp):
271 311 self.sendresponse(rsp.res)
272 312
273 313 def sendooberror(self, rsp):
274 314 self.ui.ferr.write('%s\n-\n' % rsp.message)
275 315 self.ui.ferr.flush()
276 316 self.fout.write('\n')
277 317 self.fout.flush()
278 318
279 319 def serve_forever(self):
280 320 try:
281 321 while self.serve_one():
282 322 pass
283 323 finally:
284 324 if self.lock is not None:
285 325 self.lock.release()
286 326 sys.exit(0)
287 327
288 328 handlers = {
289 329 str: sendresponse,
290 330 wireproto.streamres: sendstream,
291 331 wireproto.streamres_legacy: sendstream,
292 332 wireproto.pushres: sendpushresponse,
293 333 wireproto.pusherr: sendpusherror,
294 334 wireproto.ooberror: sendooberror,
295 335 }
296 336
297 337 def serve_one(self):
298 338 cmd = self.fin.readline()[:-1]
299 339 if cmd and cmd in wireproto.commands:
300 340 rsp = wireproto.dispatch(self.repo, self, cmd)
301 341 self.handlers[rsp.__class__](self, rsp)
302 342 elif cmd:
303 343 impl = getattr(self, 'do_' + cmd, None)
304 344 if impl:
305 345 r = impl()
306 346 if r is not None:
307 347 self.sendresponse(r)
308 348 else:
309 349 self.sendresponse("")
310 350 return cmd != ''
311 351
312 352 def _client(self):
313 353 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
314 354 return 'remote:ssh:' + client
General Comments 0
You need to be logged in to leave comments. Login now