##// END OF EJS Templates
wireproto: do not abort after successful lookup...
Kyle Lippincott -
r34064:6c6169f7 default
parent child Browse files
Show More
@@ -1,1059 +1,1060
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 encoding,
25 25 error,
26 26 exchange,
27 27 peer,
28 28 pushkey as pushkeymod,
29 29 pycompat,
30 30 repository,
31 31 streamclone,
32 32 util,
33 33 )
34 34
35 35 urlerr = util.urlerr
36 36 urlreq = util.urlreq
37 37
38 38 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 39 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 40 'IncompatibleClient')
41 41 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42 42
43 43 class abstractserverproto(object):
44 44 """abstract class that summarizes the protocol API
45 45
46 46 Used as reference and documentation.
47 47 """
48 48
49 49 def getargs(self, args):
50 50 """return the value for arguments in <args>
51 51
52 52 returns a list of values (same order as <args>)"""
53 53 raise NotImplementedError()
54 54
55 55 def getfile(self, fp):
56 56 """write the whole content of a file into a file like object
57 57
58 58 The file is in the form::
59 59
60 60 (<chunk-size>\n<chunk>)+0\n
61 61
62 62 chunk size is the ascii version of the int.
63 63 """
64 64 raise NotImplementedError()
65 65
66 66 def redirect(self):
67 67 """may setup interception for stdout and stderr
68 68
69 69 See also the `restore` method."""
70 70 raise NotImplementedError()
71 71
72 72 # If the `redirect` function does install interception, the `restore`
73 73 # function MUST be defined. If interception is not used, this function
74 74 # MUST NOT be defined.
75 75 #
76 76 # left commented here on purpose
77 77 #
78 78 #def restore(self):
79 79 # """reinstall previous stdout and stderr and return intercepted stdout
80 80 # """
81 81 # raise NotImplementedError()
82 82
83 83 class remoteiterbatcher(peer.iterbatcher):
84 84 def __init__(self, remote):
85 85 super(remoteiterbatcher, self).__init__()
86 86 self._remote = remote
87 87
88 88 def __getattr__(self, name):
89 89 # Validate this method is batchable, since submit() only supports
90 90 # batchable methods.
91 91 fn = getattr(self._remote, name)
92 92 if not getattr(fn, 'batchable', None):
93 93 raise error.ProgrammingError('Attempted to batch a non-batchable '
94 94 'call to %r' % name)
95 95
96 96 return super(remoteiterbatcher, self).__getattr__(name)
97 97
98 98 def submit(self):
99 99 """Break the batch request into many patch calls and pipeline them.
100 100
101 101 This is mostly valuable over http where request sizes can be
102 102 limited, but can be used in other places as well.
103 103 """
104 104 # 2-tuple of (command, arguments) that represents what will be
105 105 # sent over the wire.
106 106 requests = []
107 107
108 108 # 4-tuple of (command, final future, @batchable generator, remote
109 109 # future).
110 110 results = []
111 111
112 112 for command, args, opts, finalfuture in self.calls:
113 113 mtd = getattr(self._remote, command)
114 114 batchable = mtd.batchable(mtd.im_self, *args, **opts)
115 115
116 116 commandargs, fremote = next(batchable)
117 117 assert fremote
118 118 requests.append((command, commandargs))
119 119 results.append((command, finalfuture, batchable, fremote))
120 120
121 121 if requests:
122 122 self._resultiter = self._remote._submitbatch(requests)
123 123
124 124 self._results = results
125 125
126 126 def results(self):
127 127 for command, finalfuture, batchable, remotefuture in self._results:
128 128 # Get the raw result, set it in the remote future, feed it
129 129 # back into the @batchable generator so it can be decoded, and
130 130 # set the result on the final future to this value.
131 131 remoteresult = next(self._resultiter)
132 132 remotefuture.set(remoteresult)
133 133 finalfuture.set(next(batchable))
134 134
135 135 # Verify our @batchable generators only emit 2 values.
136 136 try:
137 137 next(batchable)
138 138 except StopIteration:
139 139 pass
140 140 else:
141 141 raise error.ProgrammingError('%s @batchable generator emitted '
142 142 'unexpected value count' % command)
143 143
144 144 yield finalfuture.value
145 145
146 146 # Forward a couple of names from peer to make wireproto interactions
147 147 # slightly more sensible.
148 148 batchable = peer.batchable
149 149 future = peer.future
150 150
151 151 # list of nodes encoding / decoding
152 152
153 153 def decodelist(l, sep=' '):
154 154 if l:
155 155 return map(bin, l.split(sep))
156 156 return []
157 157
158 158 def encodelist(l, sep=' '):
159 159 try:
160 160 return sep.join(map(hex, l))
161 161 except TypeError:
162 162 raise
163 163
164 164 # batched call argument encoding
165 165
166 166 def escapearg(plain):
167 167 return (plain
168 168 .replace(':', ':c')
169 169 .replace(',', ':o')
170 170 .replace(';', ':s')
171 171 .replace('=', ':e'))
172 172
173 173 def unescapearg(escaped):
174 174 return (escaped
175 175 .replace(':e', '=')
176 176 .replace(':s', ';')
177 177 .replace(':o', ',')
178 178 .replace(':c', ':'))
179 179
180 180 def encodebatchcmds(req):
181 181 """Return a ``cmds`` argument value for the ``batch`` command."""
182 182 cmds = []
183 183 for op, argsdict in req:
184 184 # Old servers didn't properly unescape argument names. So prevent
185 185 # the sending of argument names that may not be decoded properly by
186 186 # servers.
187 187 assert all(escapearg(k) == k for k in argsdict)
188 188
189 189 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
190 190 for k, v in argsdict.iteritems())
191 191 cmds.append('%s %s' % (op, args))
192 192
193 193 return ';'.join(cmds)
194 194
195 195 # mapping of options accepted by getbundle and their types
196 196 #
197 197 # Meant to be extended by extensions. It is extensions responsibility to ensure
198 198 # such options are properly processed in exchange.getbundle.
199 199 #
200 200 # supported types are:
201 201 #
202 202 # :nodes: list of binary nodes
203 203 # :csv: list of comma-separated values
204 204 # :scsv: list of comma-separated values return as set
205 205 # :plain: string with no transformation needed.
206 206 gboptsmap = {'heads': 'nodes',
207 207 'common': 'nodes',
208 208 'obsmarkers': 'boolean',
209 209 'bundlecaps': 'scsv',
210 210 'listkeys': 'csv',
211 211 'cg': 'boolean',
212 212 'cbattempted': 'boolean'}
213 213
214 214 # client side
215 215
216 216 class wirepeer(repository.legacypeer):
217 217 """Client-side interface for communicating with a peer repository.
218 218
219 219 Methods commonly call wire protocol commands of the same name.
220 220
221 221 See also httppeer.py and sshpeer.py for protocol-specific
222 222 implementations of this interface.
223 223 """
224 224 # Begin of basewirepeer interface.
225 225
226 226 def iterbatch(self):
227 227 return remoteiterbatcher(self)
228 228
229 229 @batchable
230 230 def lookup(self, key):
231 231 self.requirecap('lookup', _('look up remote revision'))
232 232 f = future()
233 233 yield {'key': encoding.fromlocal(key)}, f
234 234 d = f.value
235 235 success, data = d[:-1].split(" ", 1)
236 236 if int(success):
237 237 yield bin(data)
238 self._abort(error.RepoError(data))
238 else:
239 self._abort(error.RepoError(data))
239 240
240 241 @batchable
241 242 def heads(self):
242 243 f = future()
243 244 yield {}, f
244 245 d = f.value
245 246 try:
246 247 yield decodelist(d[:-1])
247 248 except ValueError:
248 249 self._abort(error.ResponseError(_("unexpected response:"), d))
249 250
250 251 @batchable
251 252 def known(self, nodes):
252 253 f = future()
253 254 yield {'nodes': encodelist(nodes)}, f
254 255 d = f.value
255 256 try:
256 257 yield [bool(int(b)) for b in d]
257 258 except ValueError:
258 259 self._abort(error.ResponseError(_("unexpected response:"), d))
259 260
260 261 @batchable
261 262 def branchmap(self):
262 263 f = future()
263 264 yield {}, f
264 265 d = f.value
265 266 try:
266 267 branchmap = {}
267 268 for branchpart in d.splitlines():
268 269 branchname, branchheads = branchpart.split(' ', 1)
269 270 branchname = encoding.tolocal(urlreq.unquote(branchname))
270 271 branchheads = decodelist(branchheads)
271 272 branchmap[branchname] = branchheads
272 273 yield branchmap
273 274 except TypeError:
274 275 self._abort(error.ResponseError(_("unexpected response:"), d))
275 276
276 277 @batchable
277 278 def listkeys(self, namespace):
278 279 if not self.capable('pushkey'):
279 280 yield {}, None
280 281 f = future()
281 282 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
282 283 yield {'namespace': encoding.fromlocal(namespace)}, f
283 284 d = f.value
284 285 self.ui.debug('received listkey for "%s": %i bytes\n'
285 286 % (namespace, len(d)))
286 287 yield pushkeymod.decodekeys(d)
287 288
288 289 @batchable
289 290 def pushkey(self, namespace, key, old, new):
290 291 if not self.capable('pushkey'):
291 292 yield False, None
292 293 f = future()
293 294 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
294 295 yield {'namespace': encoding.fromlocal(namespace),
295 296 'key': encoding.fromlocal(key),
296 297 'old': encoding.fromlocal(old),
297 298 'new': encoding.fromlocal(new)}, f
298 299 d = f.value
299 300 d, output = d.split('\n', 1)
300 301 try:
301 302 d = bool(int(d))
302 303 except ValueError:
303 304 raise error.ResponseError(
304 305 _('push failed (unexpected response):'), d)
305 306 for l in output.splitlines(True):
306 307 self.ui.status(_('remote: '), l)
307 308 yield d
308 309
309 310 def stream_out(self):
310 311 return self._callstream('stream_out')
311 312
312 313 def getbundle(self, source, **kwargs):
313 314 self.requirecap('getbundle', _('look up remote changes'))
314 315 opts = {}
315 316 bundlecaps = kwargs.get('bundlecaps')
316 317 if bundlecaps is not None:
317 318 kwargs['bundlecaps'] = sorted(bundlecaps)
318 319 else:
319 320 bundlecaps = () # kwargs could have it to None
320 321 for key, value in kwargs.iteritems():
321 322 if value is None:
322 323 continue
323 324 keytype = gboptsmap.get(key)
324 325 if keytype is None:
325 326 assert False, 'unexpected'
326 327 elif keytype == 'nodes':
327 328 value = encodelist(value)
328 329 elif keytype in ('csv', 'scsv'):
329 330 value = ','.join(value)
330 331 elif keytype == 'boolean':
331 332 value = '%i' % bool(value)
332 333 elif keytype != 'plain':
333 334 raise KeyError('unknown getbundle option type %s'
334 335 % keytype)
335 336 opts[key] = value
336 337 f = self._callcompressable("getbundle", **opts)
337 338 if any((cap.startswith('HG2') for cap in bundlecaps)):
338 339 return bundle2.getunbundler(self.ui, f)
339 340 else:
340 341 return changegroupmod.cg1unpacker(f, 'UN')
341 342
342 343 def unbundle(self, cg, heads, url):
343 344 '''Send cg (a readable file-like object representing the
344 345 changegroup to push, typically a chunkbuffer object) to the
345 346 remote server as a bundle.
346 347
347 348 When pushing a bundle10 stream, return an integer indicating the
348 349 result of the push (see changegroup.apply()).
349 350
350 351 When pushing a bundle20 stream, return a bundle20 stream.
351 352
352 353 `url` is the url the client thinks it's pushing to, which is
353 354 visible to hooks.
354 355 '''
355 356
356 357 if heads != ['force'] and self.capable('unbundlehash'):
357 358 heads = encodelist(['hashed',
358 359 hashlib.sha1(''.join(sorted(heads))).digest()])
359 360 else:
360 361 heads = encodelist(heads)
361 362
362 363 if util.safehasattr(cg, 'deltaheader'):
363 364 # this a bundle10, do the old style call sequence
364 365 ret, output = self._callpush("unbundle", cg, heads=heads)
365 366 if ret == "":
366 367 raise error.ResponseError(
367 368 _('push failed:'), output)
368 369 try:
369 370 ret = int(ret)
370 371 except ValueError:
371 372 raise error.ResponseError(
372 373 _('push failed (unexpected response):'), ret)
373 374
374 375 for l in output.splitlines(True):
375 376 self.ui.status(_('remote: '), l)
376 377 else:
377 378 # bundle2 push. Send a stream, fetch a stream.
378 379 stream = self._calltwowaystream('unbundle', cg, heads=heads)
379 380 ret = bundle2.getunbundler(self.ui, stream)
380 381 return ret
381 382
382 383 # End of basewirepeer interface.
383 384
384 385 # Begin of baselegacywirepeer interface.
385 386
386 387 def branches(self, nodes):
387 388 n = encodelist(nodes)
388 389 d = self._call("branches", nodes=n)
389 390 try:
390 391 br = [tuple(decodelist(b)) for b in d.splitlines()]
391 392 return br
392 393 except ValueError:
393 394 self._abort(error.ResponseError(_("unexpected response:"), d))
394 395
395 396 def between(self, pairs):
396 397 batch = 8 # avoid giant requests
397 398 r = []
398 399 for i in xrange(0, len(pairs), batch):
399 400 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
400 401 d = self._call("between", pairs=n)
401 402 try:
402 403 r.extend(l and decodelist(l) or [] for l in d.splitlines())
403 404 except ValueError:
404 405 self._abort(error.ResponseError(_("unexpected response:"), d))
405 406 return r
406 407
407 408 def changegroup(self, nodes, kind):
408 409 n = encodelist(nodes)
409 410 f = self._callcompressable("changegroup", roots=n)
410 411 return changegroupmod.cg1unpacker(f, 'UN')
411 412
412 413 def changegroupsubset(self, bases, heads, kind):
413 414 self.requirecap('changegroupsubset', _('look up remote changes'))
414 415 bases = encodelist(bases)
415 416 heads = encodelist(heads)
416 417 f = self._callcompressable("changegroupsubset",
417 418 bases=bases, heads=heads)
418 419 return changegroupmod.cg1unpacker(f, 'UN')
419 420
420 421 # End of baselegacywirepeer interface.
421 422
422 423 def _submitbatch(self, req):
423 424 """run batch request <req> on the server
424 425
425 426 Returns an iterator of the raw responses from the server.
426 427 """
427 428 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
428 429 chunk = rsp.read(1024)
429 430 work = [chunk]
430 431 while chunk:
431 432 while ';' not in chunk and chunk:
432 433 chunk = rsp.read(1024)
433 434 work.append(chunk)
434 435 merged = ''.join(work)
435 436 while ';' in merged:
436 437 one, merged = merged.split(';', 1)
437 438 yield unescapearg(one)
438 439 chunk = rsp.read(1024)
439 440 work = [merged, chunk]
440 441 yield unescapearg(''.join(work))
441 442
442 443 def _submitone(self, op, args):
443 444 return self._call(op, **args)
444 445
445 446 def debugwireargs(self, one, two, three=None, four=None, five=None):
446 447 # don't pass optional arguments left at their default value
447 448 opts = {}
448 449 if three is not None:
449 450 opts['three'] = three
450 451 if four is not None:
451 452 opts['four'] = four
452 453 return self._call('debugwireargs', one=one, two=two, **opts)
453 454
454 455 def _call(self, cmd, **args):
455 456 """execute <cmd> on the server
456 457
457 458 The command is expected to return a simple string.
458 459
459 460 returns the server reply as a string."""
460 461 raise NotImplementedError()
461 462
462 463 def _callstream(self, cmd, **args):
463 464 """execute <cmd> on the server
464 465
465 466 The command is expected to return a stream. Note that if the
466 467 command doesn't return a stream, _callstream behaves
467 468 differently for ssh and http peers.
468 469
469 470 returns the server reply as a file like object.
470 471 """
471 472 raise NotImplementedError()
472 473
473 474 def _callcompressable(self, cmd, **args):
474 475 """execute <cmd> on the server
475 476
476 477 The command is expected to return a stream.
477 478
478 479 The stream may have been compressed in some implementations. This
479 480 function takes care of the decompression. This is the only difference
480 481 with _callstream.
481 482
482 483 returns the server reply as a file like object.
483 484 """
484 485 raise NotImplementedError()
485 486
486 487 def _callpush(self, cmd, fp, **args):
487 488 """execute a <cmd> on server
488 489
489 490 The command is expected to be related to a push. Push has a special
490 491 return method.
491 492
492 493 returns the server reply as a (ret, output) tuple. ret is either
493 494 empty (error) or a stringified int.
494 495 """
495 496 raise NotImplementedError()
496 497
497 498 def _calltwowaystream(self, cmd, fp, **args):
498 499 """execute <cmd> on server
499 500
500 501 The command will send a stream to the server and get a stream in reply.
501 502 """
502 503 raise NotImplementedError()
503 504
504 505 def _abort(self, exception):
505 506 """clearly abort the wire protocol connection and raise the exception
506 507 """
507 508 raise NotImplementedError()
508 509
509 510 # server side
510 511
511 512 # wire protocol command can either return a string or one of these classes.
512 513 class streamres(object):
513 514 """wireproto reply: binary stream
514 515
515 516 The call was successful and the result is a stream.
516 517
517 518 Accepts either a generator or an object with a ``read(size)`` method.
518 519
519 520 ``v1compressible`` indicates whether this data can be compressed to
520 521 "version 1" clients (technically: HTTP peers using
521 522 application/mercurial-0.1 media type). This flag should NOT be used on
522 523 new commands because new clients should support a more modern compression
523 524 mechanism.
524 525 """
525 526 def __init__(self, gen=None, reader=None, v1compressible=False):
526 527 self.gen = gen
527 528 self.reader = reader
528 529 self.v1compressible = v1compressible
529 530
530 531 class pushres(object):
531 532 """wireproto reply: success with simple integer return
532 533
533 534 The call was successful and returned an integer contained in `self.res`.
534 535 """
535 536 def __init__(self, res):
536 537 self.res = res
537 538
538 539 class pusherr(object):
539 540 """wireproto reply: failure
540 541
541 542 The call failed. The `self.res` attribute contains the error message.
542 543 """
543 544 def __init__(self, res):
544 545 self.res = res
545 546
546 547 class ooberror(object):
547 548 """wireproto reply: failure of a batch of operation
548 549
549 550 Something failed during a batch call. The error message is stored in
550 551 `self.message`.
551 552 """
552 553 def __init__(self, message):
553 554 self.message = message
554 555
555 556 def getdispatchrepo(repo, proto, command):
556 557 """Obtain the repo used for processing wire protocol commands.
557 558
558 559 The intent of this function is to serve as a monkeypatch point for
559 560 extensions that need commands to operate on different repo views under
560 561 specialized circumstances.
561 562 """
562 563 return repo.filtered('served')
563 564
564 565 def dispatch(repo, proto, command):
565 566 repo = getdispatchrepo(repo, proto, command)
566 567 func, spec = commands[command]
567 568 args = proto.getargs(spec)
568 569 return func(repo, proto, *args)
569 570
570 571 def options(cmd, keys, others):
571 572 opts = {}
572 573 for k in keys:
573 574 if k in others:
574 575 opts[k] = others[k]
575 576 del others[k]
576 577 if others:
577 578 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
578 579 % (cmd, ",".join(others)))
579 580 return opts
580 581
581 582 def bundle1allowed(repo, action):
582 583 """Whether a bundle1 operation is allowed from the server.
583 584
584 585 Priority is:
585 586
586 587 1. server.bundle1gd.<action> (if generaldelta active)
587 588 2. server.bundle1.<action>
588 589 3. server.bundle1gd (if generaldelta active)
589 590 4. server.bundle1
590 591 """
591 592 ui = repo.ui
592 593 gd = 'generaldelta' in repo.requirements
593 594
594 595 if gd:
595 596 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
596 597 if v is not None:
597 598 return v
598 599
599 600 v = ui.configbool('server', 'bundle1.%s' % action, None)
600 601 if v is not None:
601 602 return v
602 603
603 604 if gd:
604 605 v = ui.configbool('server', 'bundle1gd')
605 606 if v is not None:
606 607 return v
607 608
608 609 return ui.configbool('server', 'bundle1')
609 610
610 611 def supportedcompengines(ui, proto, role):
611 612 """Obtain the list of supported compression engines for a request."""
612 613 assert role in (util.CLIENTROLE, util.SERVERROLE)
613 614
614 615 compengines = util.compengines.supportedwireengines(role)
615 616
616 617 # Allow config to override default list and ordering.
617 618 if role == util.SERVERROLE:
618 619 configengines = ui.configlist('server', 'compressionengines')
619 620 config = 'server.compressionengines'
620 621 else:
621 622 # This is currently implemented mainly to facilitate testing. In most
622 623 # cases, the server should be in charge of choosing a compression engine
623 624 # because a server has the most to lose from a sub-optimal choice. (e.g.
624 625 # CPU DoS due to an expensive engine or a network DoS due to poor
625 626 # compression ratio).
626 627 configengines = ui.configlist('experimental',
627 628 'clientcompressionengines')
628 629 config = 'experimental.clientcompressionengines'
629 630
630 631 # No explicit config. Filter out the ones that aren't supposed to be
631 632 # advertised and return default ordering.
632 633 if not configengines:
633 634 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
634 635 return [e for e in compengines
635 636 if getattr(e.wireprotosupport(), attr) > 0]
636 637
637 638 # If compression engines are listed in the config, assume there is a good
638 639 # reason for it (like server operators wanting to achieve specific
639 640 # performance characteristics). So fail fast if the config references
640 641 # unusable compression engines.
641 642 validnames = set(e.name() for e in compengines)
642 643 invalidnames = set(e for e in configengines if e not in validnames)
643 644 if invalidnames:
644 645 raise error.Abort(_('invalid compression engine defined in %s: %s') %
645 646 (config, ', '.join(sorted(invalidnames))))
646 647
647 648 compengines = [e for e in compengines if e.name() in configengines]
648 649 compengines = sorted(compengines,
649 650 key=lambda e: configengines.index(e.name()))
650 651
651 652 if not compengines:
652 653 raise error.Abort(_('%s config option does not specify any known '
653 654 'compression engines') % config,
654 655 hint=_('usable compression engines: %s') %
655 656 ', '.sorted(validnames))
656 657
657 658 return compengines
658 659
659 660 # list of commands
660 661 commands = {}
661 662
662 663 def wireprotocommand(name, args=''):
663 664 """decorator for wire protocol command"""
664 665 def register(func):
665 666 commands[name] = (func, args)
666 667 return func
667 668 return register
668 669
669 670 @wireprotocommand('batch', 'cmds *')
670 671 def batch(repo, proto, cmds, others):
671 672 repo = repo.filtered("served")
672 673 res = []
673 674 for pair in cmds.split(';'):
674 675 op, args = pair.split(' ', 1)
675 676 vals = {}
676 677 for a in args.split(','):
677 678 if a:
678 679 n, v = a.split('=')
679 680 vals[unescapearg(n)] = unescapearg(v)
680 681 func, spec = commands[op]
681 682 if spec:
682 683 keys = spec.split()
683 684 data = {}
684 685 for k in keys:
685 686 if k == '*':
686 687 star = {}
687 688 for key in vals.keys():
688 689 if key not in keys:
689 690 star[key] = vals[key]
690 691 data['*'] = star
691 692 else:
692 693 data[k] = vals[k]
693 694 result = func(repo, proto, *[data[k] for k in keys])
694 695 else:
695 696 result = func(repo, proto)
696 697 if isinstance(result, ooberror):
697 698 return result
698 699 res.append(escapearg(result))
699 700 return ';'.join(res)
700 701
701 702 @wireprotocommand('between', 'pairs')
702 703 def between(repo, proto, pairs):
703 704 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
704 705 r = []
705 706 for b in repo.between(pairs):
706 707 r.append(encodelist(b) + "\n")
707 708 return "".join(r)
708 709
709 710 @wireprotocommand('branchmap')
710 711 def branchmap(repo, proto):
711 712 branchmap = repo.branchmap()
712 713 heads = []
713 714 for branch, nodes in branchmap.iteritems():
714 715 branchname = urlreq.quote(encoding.fromlocal(branch))
715 716 branchnodes = encodelist(nodes)
716 717 heads.append('%s %s' % (branchname, branchnodes))
717 718 return '\n'.join(heads)
718 719
719 720 @wireprotocommand('branches', 'nodes')
720 721 def branches(repo, proto, nodes):
721 722 nodes = decodelist(nodes)
722 723 r = []
723 724 for b in repo.branches(nodes):
724 725 r.append(encodelist(b) + "\n")
725 726 return "".join(r)
726 727
727 728 @wireprotocommand('clonebundles', '')
728 729 def clonebundles(repo, proto):
729 730 """Server command for returning info for available bundles to seed clones.
730 731
731 732 Clients will parse this response and determine what bundle to fetch.
732 733
733 734 Extensions may wrap this command to filter or dynamically emit data
734 735 depending on the request. e.g. you could advertise URLs for the closest
735 736 data center given the client's IP address.
736 737 """
737 738 return repo.vfs.tryread('clonebundles.manifest')
738 739
739 740 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
740 741 'known', 'getbundle', 'unbundlehash', 'batch']
741 742
742 743 def _capabilities(repo, proto):
743 744 """return a list of capabilities for a repo
744 745
745 746 This function exists to allow extensions to easily wrap capabilities
746 747 computation
747 748
748 749 - returns a lists: easy to alter
749 750 - change done here will be propagated to both `capabilities` and `hello`
750 751 command without any other action needed.
751 752 """
752 753 # copy to prevent modification of the global list
753 754 caps = list(wireprotocaps)
754 755 if streamclone.allowservergeneration(repo):
755 756 if repo.ui.configbool('server', 'preferuncompressed'):
756 757 caps.append('stream-preferred')
757 758 requiredformats = repo.requirements & repo.supportedformats
758 759 # if our local revlogs are just revlogv1, add 'stream' cap
759 760 if not requiredformats - {'revlogv1'}:
760 761 caps.append('stream')
761 762 # otherwise, add 'streamreqs' detailing our local revlog format
762 763 else:
763 764 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
764 765 if repo.ui.configbool('experimental', 'bundle2-advertise'):
765 766 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
766 767 caps.append('bundle2=' + urlreq.quote(capsblob))
767 768 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
768 769
769 770 if proto.name == 'http':
770 771 caps.append('httpheader=%d' %
771 772 repo.ui.configint('server', 'maxhttpheaderlen'))
772 773 if repo.ui.configbool('experimental', 'httppostargs'):
773 774 caps.append('httppostargs')
774 775
775 776 # FUTURE advertise 0.2rx once support is implemented
776 777 # FUTURE advertise minrx and mintx after consulting config option
777 778 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
778 779
779 780 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
780 781 if compengines:
781 782 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
782 783 for e in compengines)
783 784 caps.append('compression=%s' % comptypes)
784 785
785 786 return caps
786 787
787 788 # If you are writing an extension and consider wrapping this function. Wrap
788 789 # `_capabilities` instead.
789 790 @wireprotocommand('capabilities')
790 791 def capabilities(repo, proto):
791 792 return ' '.join(_capabilities(repo, proto))
792 793
793 794 @wireprotocommand('changegroup', 'roots')
794 795 def changegroup(repo, proto, roots):
795 796 nodes = decodelist(roots)
796 797 cg = changegroupmod.changegroup(repo, nodes, 'serve')
797 798 return streamres(reader=cg, v1compressible=True)
798 799
799 800 @wireprotocommand('changegroupsubset', 'bases heads')
800 801 def changegroupsubset(repo, proto, bases, heads):
801 802 bases = decodelist(bases)
802 803 heads = decodelist(heads)
803 804 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
804 805 return streamres(reader=cg, v1compressible=True)
805 806
806 807 @wireprotocommand('debugwireargs', 'one two *')
807 808 def debugwireargs(repo, proto, one, two, others):
808 809 # only accept optional args from the known set
809 810 opts = options('debugwireargs', ['three', 'four'], others)
810 811 return repo.debugwireargs(one, two, **opts)
811 812
812 813 @wireprotocommand('getbundle', '*')
813 814 def getbundle(repo, proto, others):
814 815 opts = options('getbundle', gboptsmap.keys(), others)
815 816 for k, v in opts.iteritems():
816 817 keytype = gboptsmap[k]
817 818 if keytype == 'nodes':
818 819 opts[k] = decodelist(v)
819 820 elif keytype == 'csv':
820 821 opts[k] = list(v.split(','))
821 822 elif keytype == 'scsv':
822 823 opts[k] = set(v.split(','))
823 824 elif keytype == 'boolean':
824 825 # Client should serialize False as '0', which is a non-empty string
825 826 # so it evaluates as a True bool.
826 827 if v == '0':
827 828 opts[k] = False
828 829 else:
829 830 opts[k] = bool(v)
830 831 elif keytype != 'plain':
831 832 raise KeyError('unknown getbundle option type %s'
832 833 % keytype)
833 834
834 835 if not bundle1allowed(repo, 'pull'):
835 836 if not exchange.bundle2requested(opts.get('bundlecaps')):
836 837 if proto.name == 'http':
837 838 return ooberror(bundle2required)
838 839 raise error.Abort(bundle2requiredmain,
839 840 hint=bundle2requiredhint)
840 841
841 842 try:
842 843 if repo.ui.configbool('server', 'disablefullbundle'):
843 844 # Check to see if this is a full clone.
844 845 clheads = set(repo.changelog.heads())
845 846 heads = set(opts.get('heads', set()))
846 847 common = set(opts.get('common', set()))
847 848 common.discard(nullid)
848 849 if not common and clheads == heads:
849 850 raise error.Abort(
850 851 _('server has pull-based clones disabled'),
851 852 hint=_('remove --pull if specified or upgrade Mercurial'))
852 853
853 854 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
854 855 except error.Abort as exc:
855 856 # cleanly forward Abort error to the client
856 857 if not exchange.bundle2requested(opts.get('bundlecaps')):
857 858 if proto.name == 'http':
858 859 return ooberror(str(exc) + '\n')
859 860 raise # cannot do better for bundle1 + ssh
860 861 # bundle2 request expect a bundle2 reply
861 862 bundler = bundle2.bundle20(repo.ui)
862 863 manargs = [('message', str(exc))]
863 864 advargs = []
864 865 if exc.hint is not None:
865 866 advargs.append(('hint', exc.hint))
866 867 bundler.addpart(bundle2.bundlepart('error:abort',
867 868 manargs, advargs))
868 869 return streamres(gen=bundler.getchunks(), v1compressible=True)
869 870 return streamres(gen=chunks, v1compressible=True)
870 871
871 872 @wireprotocommand('heads')
872 873 def heads(repo, proto):
873 874 h = repo.heads()
874 875 return encodelist(h) + "\n"
875 876
876 877 @wireprotocommand('hello')
877 878 def hello(repo, proto):
878 879 '''the hello command returns a set of lines describing various
879 880 interesting things about the server, in an RFC822-like format.
880 881 Currently the only one defined is "capabilities", which
881 882 consists of a line in the form:
882 883
883 884 capabilities: space separated list of tokens
884 885 '''
885 886 return "capabilities: %s\n" % (capabilities(repo, proto))
886 887
887 888 @wireprotocommand('listkeys', 'namespace')
888 889 def listkeys(repo, proto, namespace):
889 890 d = repo.listkeys(encoding.tolocal(namespace)).items()
890 891 return pushkeymod.encodekeys(d)
891 892
892 893 @wireprotocommand('lookup', 'key')
893 894 def lookup(repo, proto, key):
894 895 try:
895 896 k = encoding.tolocal(key)
896 897 c = repo[k]
897 898 r = c.hex()
898 899 success = 1
899 900 except Exception as inst:
900 901 r = str(inst)
901 902 success = 0
902 903 return "%s %s\n" % (success, r)
903 904
904 905 @wireprotocommand('known', 'nodes *')
905 906 def known(repo, proto, nodes, others):
906 907 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
907 908
908 909 @wireprotocommand('pushkey', 'namespace key old new')
909 910 def pushkey(repo, proto, namespace, key, old, new):
910 911 # compatibility with pre-1.8 clients which were accidentally
911 912 # sending raw binary nodes rather than utf-8-encoded hex
912 913 if len(new) == 20 and util.escapestr(new) != new:
913 914 # looks like it could be a binary node
914 915 try:
915 916 new.decode('utf-8')
916 917 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
917 918 except UnicodeDecodeError:
918 919 pass # binary, leave unmodified
919 920 else:
920 921 new = encoding.tolocal(new) # normal path
921 922
922 923 if util.safehasattr(proto, 'restore'):
923 924
924 925 proto.redirect()
925 926
926 927 try:
927 928 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
928 929 encoding.tolocal(old), new) or False
929 930 except error.Abort:
930 931 r = False
931 932
932 933 output = proto.restore()
933 934
934 935 return '%s\n%s' % (int(r), output)
935 936
936 937 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
937 938 encoding.tolocal(old), new)
938 939 return '%s\n' % int(r)
939 940
940 941 @wireprotocommand('stream_out')
941 942 def stream(repo, proto):
942 943 '''If the server supports streaming clone, it advertises the "stream"
943 944 capability with a value representing the version and flags of the repo
944 945 it is serving. Client checks to see if it understands the format.
945 946 '''
946 947 if not streamclone.allowservergeneration(repo):
947 948 return '1\n'
948 949
949 950 def getstream(it):
950 951 yield '0\n'
951 952 for chunk in it:
952 953 yield chunk
953 954
954 955 try:
955 956 # LockError may be raised before the first result is yielded. Don't
956 957 # emit output until we're sure we got the lock successfully.
957 958 it = streamclone.generatev1wireproto(repo)
958 959 return streamres(gen=getstream(it))
959 960 except error.LockError:
960 961 return '2\n'
961 962
962 963 @wireprotocommand('unbundle', 'heads')
963 964 def unbundle(repo, proto, heads):
964 965 their_heads = decodelist(heads)
965 966
966 967 try:
967 968 proto.redirect()
968 969
969 970 exchange.check_heads(repo, their_heads, 'preparing changes')
970 971
971 972 # write bundle data to temporary file because it can be big
972 973 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
973 974 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
974 975 r = 0
975 976 try:
976 977 proto.getfile(fp)
977 978 fp.seek(0)
978 979 gen = exchange.readbundle(repo.ui, fp, None)
979 980 if (isinstance(gen, changegroupmod.cg1unpacker)
980 981 and not bundle1allowed(repo, 'push')):
981 982 if proto.name == 'http':
982 983 # need to special case http because stderr do not get to
983 984 # the http client on failed push so we need to abuse some
984 985 # other error type to make sure the message get to the
985 986 # user.
986 987 return ooberror(bundle2required)
987 988 raise error.Abort(bundle2requiredmain,
988 989 hint=bundle2requiredhint)
989 990
990 991 r = exchange.unbundle(repo, gen, their_heads, 'serve',
991 992 proto._client())
992 993 if util.safehasattr(r, 'addpart'):
993 994 # The return looks streamable, we are in the bundle2 case and
994 995 # should return a stream.
995 996 return streamres(gen=r.getchunks())
996 997 return pushres(r)
997 998
998 999 finally:
999 1000 fp.close()
1000 1001 os.unlink(tempname)
1001 1002
1002 1003 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1003 1004 # handle non-bundle2 case first
1004 1005 if not getattr(exc, 'duringunbundle2', False):
1005 1006 try:
1006 1007 raise
1007 1008 except error.Abort:
1008 1009 # The old code we moved used util.stderr directly.
1009 1010 # We did not change it to minimise code change.
1010 1011 # This need to be moved to something proper.
1011 1012 # Feel free to do it.
1012 1013 util.stderr.write("abort: %s\n" % exc)
1013 1014 if exc.hint is not None:
1014 1015 util.stderr.write("(%s)\n" % exc.hint)
1015 1016 return pushres(0)
1016 1017 except error.PushRaced:
1017 1018 return pusherr(str(exc))
1018 1019
1019 1020 bundler = bundle2.bundle20(repo.ui)
1020 1021 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1021 1022 bundler.addpart(out)
1022 1023 try:
1023 1024 try:
1024 1025 raise
1025 1026 except error.PushkeyFailed as exc:
1026 1027 # check client caps
1027 1028 remotecaps = getattr(exc, '_replycaps', None)
1028 1029 if (remotecaps is not None
1029 1030 and 'pushkey' not in remotecaps.get('error', ())):
1030 1031 # no support remote side, fallback to Abort handler.
1031 1032 raise
1032 1033 part = bundler.newpart('error:pushkey')
1033 1034 part.addparam('in-reply-to', exc.partid)
1034 1035 if exc.namespace is not None:
1035 1036 part.addparam('namespace', exc.namespace, mandatory=False)
1036 1037 if exc.key is not None:
1037 1038 part.addparam('key', exc.key, mandatory=False)
1038 1039 if exc.new is not None:
1039 1040 part.addparam('new', exc.new, mandatory=False)
1040 1041 if exc.old is not None:
1041 1042 part.addparam('old', exc.old, mandatory=False)
1042 1043 if exc.ret is not None:
1043 1044 part.addparam('ret', exc.ret, mandatory=False)
1044 1045 except error.BundleValueError as exc:
1045 1046 errpart = bundler.newpart('error:unsupportedcontent')
1046 1047 if exc.parttype is not None:
1047 1048 errpart.addparam('parttype', exc.parttype)
1048 1049 if exc.params:
1049 1050 errpart.addparam('params', '\0'.join(exc.params))
1050 1051 except error.Abort as exc:
1051 1052 manargs = [('message', str(exc))]
1052 1053 advargs = []
1053 1054 if exc.hint is not None:
1054 1055 advargs.append(('hint', exc.hint))
1055 1056 bundler.addpart(bundle2.bundlepart('error:abort',
1056 1057 manargs, advargs))
1057 1058 except error.PushRaced as exc:
1058 1059 bundler.newpart('error:pushraced', [('message', str(exc))])
1059 1060 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now