##// END OF EJS Templates
wireproto: bounce kwargs to/from bytes/str as needed...
Augie Fackler -
r34740:b880cc11 default
parent child Browse files
Show More
@@ -1,1067 +1,1068 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 )
35 35
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 41 'IncompatibleClient')
42 42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43 43
44 44 class abstractserverproto(object):
45 45 """abstract class that summarizes the protocol API
46 46
47 47 Used as reference and documentation.
48 48 """
49 49
50 50 def getargs(self, args):
51 51 """return the value for arguments in <args>
52 52
53 53 returns a list of values (same order as <args>)"""
54 54 raise NotImplementedError()
55 55
56 56 def getfile(self, fp):
57 57 """write the whole content of a file into a file like object
58 58
59 59 The file is in the form::
60 60
61 61 (<chunk-size>\n<chunk>)+0\n
62 62
63 63 chunk size is the ascii version of the int.
64 64 """
65 65 raise NotImplementedError()
66 66
67 67 def redirect(self):
68 68 """may setup interception for stdout and stderr
69 69
70 70 See also the `restore` method."""
71 71 raise NotImplementedError()
72 72
73 73 # If the `redirect` function does install interception, the `restore`
74 74 # function MUST be defined. If interception is not used, this function
75 75 # MUST NOT be defined.
76 76 #
77 77 # left commented here on purpose
78 78 #
79 79 #def restore(self):
80 80 # """reinstall previous stdout and stderr and return intercepted stdout
81 81 # """
82 82 # raise NotImplementedError()
83 83
84 84 class remoteiterbatcher(peer.iterbatcher):
85 85 def __init__(self, remote):
86 86 super(remoteiterbatcher, self).__init__()
87 87 self._remote = remote
88 88
89 89 def __getattr__(self, name):
90 90 # Validate this method is batchable, since submit() only supports
91 91 # batchable methods.
92 92 fn = getattr(self._remote, name)
93 93 if not getattr(fn, 'batchable', None):
94 94 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 95 'call to %r' % name)
96 96
97 97 return super(remoteiterbatcher, self).__getattr__(name)
98 98
99 99 def submit(self):
100 100 """Break the batch request into many patch calls and pipeline them.
101 101
102 102 This is mostly valuable over http where request sizes can be
103 103 limited, but can be used in other places as well.
104 104 """
105 105 # 2-tuple of (command, arguments) that represents what will be
106 106 # sent over the wire.
107 107 requests = []
108 108
109 109 # 4-tuple of (command, final future, @batchable generator, remote
110 110 # future).
111 111 results = []
112 112
113 113 for command, args, opts, finalfuture in self.calls:
114 114 mtd = getattr(self._remote, command)
115 115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116 116
117 117 commandargs, fremote = next(batchable)
118 118 assert fremote
119 119 requests.append((command, commandargs))
120 120 results.append((command, finalfuture, batchable, fremote))
121 121
122 122 if requests:
123 123 self._resultiter = self._remote._submitbatch(requests)
124 124
125 125 self._results = results
126 126
127 127 def results(self):
128 128 for command, finalfuture, batchable, remotefuture in self._results:
129 129 # Get the raw result, set it in the remote future, feed it
130 130 # back into the @batchable generator so it can be decoded, and
131 131 # set the result on the final future to this value.
132 132 remoteresult = next(self._resultiter)
133 133 remotefuture.set(remoteresult)
134 134 finalfuture.set(next(batchable))
135 135
136 136 # Verify our @batchable generators only emit 2 values.
137 137 try:
138 138 next(batchable)
139 139 except StopIteration:
140 140 pass
141 141 else:
142 142 raise error.ProgrammingError('%s @batchable generator emitted '
143 143 'unexpected value count' % command)
144 144
145 145 yield finalfuture.value
146 146
147 147 # Forward a couple of names from peer to make wireproto interactions
148 148 # slightly more sensible.
149 149 batchable = peer.batchable
150 150 future = peer.future
151 151
152 152 # list of nodes encoding / decoding
153 153
154 154 def decodelist(l, sep=' '):
155 155 if l:
156 156 return [bin(v) for v in l.split(sep)]
157 157 return []
158 158
159 159 def encodelist(l, sep=' '):
160 160 try:
161 161 return sep.join(map(hex, l))
162 162 except TypeError:
163 163 raise
164 164
165 165 # batched call argument encoding
166 166
167 167 def escapearg(plain):
168 168 return (plain
169 169 .replace(':', ':c')
170 170 .replace(',', ':o')
171 171 .replace(';', ':s')
172 172 .replace('=', ':e'))
173 173
174 174 def unescapearg(escaped):
175 175 return (escaped
176 176 .replace(':e', '=')
177 177 .replace(':s', ';')
178 178 .replace(':o', ',')
179 179 .replace(':c', ':'))
180 180
181 181 def encodebatchcmds(req):
182 182 """Return a ``cmds`` argument value for the ``batch`` command."""
183 183 cmds = []
184 184 for op, argsdict in req:
185 185 # Old servers didn't properly unescape argument names. So prevent
186 186 # the sending of argument names that may not be decoded properly by
187 187 # servers.
188 188 assert all(escapearg(k) == k for k in argsdict)
189 189
190 190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 191 for k, v in argsdict.iteritems())
192 192 cmds.append('%s %s' % (op, args))
193 193
194 194 return ';'.join(cmds)
195 195
196 196 # mapping of options accepted by getbundle and their types
197 197 #
198 198 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 199 # such options are properly processed in exchange.getbundle.
200 200 #
201 201 # supported types are:
202 202 #
203 203 # :nodes: list of binary nodes
204 204 # :csv: list of comma-separated values
205 205 # :scsv: list of comma-separated values return as set
206 206 # :plain: string with no transformation needed.
207 207 gboptsmap = {'heads': 'nodes',
208 208 'common': 'nodes',
209 209 'obsmarkers': 'boolean',
210 210 'phases': 'boolean',
211 211 'bundlecaps': 'scsv',
212 212 'listkeys': 'csv',
213 213 'cg': 'boolean',
214 214 'cbattempted': 'boolean'}
215 215
216 216 # client side
217 217
218 218 class wirepeer(repository.legacypeer):
219 219 """Client-side interface for communicating with a peer repository.
220 220
221 221 Methods commonly call wire protocol commands of the same name.
222 222
223 223 See also httppeer.py and sshpeer.py for protocol-specific
224 224 implementations of this interface.
225 225 """
226 226 # Begin of basewirepeer interface.
227 227
228 228 def iterbatch(self):
229 229 return remoteiterbatcher(self)
230 230
231 231 @batchable
232 232 def lookup(self, key):
233 233 self.requirecap('lookup', _('look up remote revision'))
234 234 f = future()
235 235 yield {'key': encoding.fromlocal(key)}, f
236 236 d = f.value
237 237 success, data = d[:-1].split(" ", 1)
238 238 if int(success):
239 239 yield bin(data)
240 240 else:
241 241 self._abort(error.RepoError(data))
242 242
243 243 @batchable
244 244 def heads(self):
245 245 f = future()
246 246 yield {}, f
247 247 d = f.value
248 248 try:
249 249 yield decodelist(d[:-1])
250 250 except ValueError:
251 251 self._abort(error.ResponseError(_("unexpected response:"), d))
252 252
253 253 @batchable
254 254 def known(self, nodes):
255 255 f = future()
256 256 yield {'nodes': encodelist(nodes)}, f
257 257 d = f.value
258 258 try:
259 259 yield [bool(int(b)) for b in d]
260 260 except ValueError:
261 261 self._abort(error.ResponseError(_("unexpected response:"), d))
262 262
263 263 @batchable
264 264 def branchmap(self):
265 265 f = future()
266 266 yield {}, f
267 267 d = f.value
268 268 try:
269 269 branchmap = {}
270 270 for branchpart in d.splitlines():
271 271 branchname, branchheads = branchpart.split(' ', 1)
272 272 branchname = encoding.tolocal(urlreq.unquote(branchname))
273 273 branchheads = decodelist(branchheads)
274 274 branchmap[branchname] = branchheads
275 275 yield branchmap
276 276 except TypeError:
277 277 self._abort(error.ResponseError(_("unexpected response:"), d))
278 278
279 279 @batchable
280 280 def listkeys(self, namespace):
281 281 if not self.capable('pushkey'):
282 282 yield {}, None
283 283 f = future()
284 284 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
285 285 yield {'namespace': encoding.fromlocal(namespace)}, f
286 286 d = f.value
287 287 self.ui.debug('received listkey for "%s": %i bytes\n'
288 288 % (namespace, len(d)))
289 289 yield pushkeymod.decodekeys(d)
290 290
291 291 @batchable
292 292 def pushkey(self, namespace, key, old, new):
293 293 if not self.capable('pushkey'):
294 294 yield False, None
295 295 f = future()
296 296 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
297 297 yield {'namespace': encoding.fromlocal(namespace),
298 298 'key': encoding.fromlocal(key),
299 299 'old': encoding.fromlocal(old),
300 300 'new': encoding.fromlocal(new)}, f
301 301 d = f.value
302 302 d, output = d.split('\n', 1)
303 303 try:
304 304 d = bool(int(d))
305 305 except ValueError:
306 306 raise error.ResponseError(
307 307 _('push failed (unexpected response):'), d)
308 308 for l in output.splitlines(True):
309 309 self.ui.status(_('remote: '), l)
310 310 yield d
311 311
312 312 def stream_out(self):
313 313 return self._callstream('stream_out')
314 314
315 315 def getbundle(self, source, **kwargs):
316 kwargs = pycompat.byteskwargs(kwargs)
316 317 self.requirecap('getbundle', _('look up remote changes'))
317 318 opts = {}
318 319 bundlecaps = kwargs.get('bundlecaps')
319 320 if bundlecaps is not None:
320 321 kwargs['bundlecaps'] = sorted(bundlecaps)
321 322 else:
322 323 bundlecaps = () # kwargs could have it to None
323 324 for key, value in kwargs.iteritems():
324 325 if value is None:
325 326 continue
326 327 keytype = gboptsmap.get(key)
327 328 if keytype is None:
328 329 raise error.ProgrammingError(
329 330 'Unexpectedly None keytype for key %s' % key)
330 331 elif keytype == 'nodes':
331 332 value = encodelist(value)
332 333 elif keytype in ('csv', 'scsv'):
333 334 value = ','.join(value)
334 335 elif keytype == 'boolean':
335 336 value = '%i' % bool(value)
336 337 elif keytype != 'plain':
337 338 raise KeyError('unknown getbundle option type %s'
338 339 % keytype)
339 340 opts[key] = value
340 f = self._callcompressable("getbundle", **opts)
341 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
341 342 if any((cap.startswith('HG2') for cap in bundlecaps)):
342 343 return bundle2.getunbundler(self.ui, f)
343 344 else:
344 345 return changegroupmod.cg1unpacker(f, 'UN')
345 346
346 347 def unbundle(self, cg, heads, url):
347 348 '''Send cg (a readable file-like object representing the
348 349 changegroup to push, typically a chunkbuffer object) to the
349 350 remote server as a bundle.
350 351
351 352 When pushing a bundle10 stream, return an integer indicating the
352 353 result of the push (see changegroup.apply()).
353 354
354 355 When pushing a bundle20 stream, return a bundle20 stream.
355 356
356 357 `url` is the url the client thinks it's pushing to, which is
357 358 visible to hooks.
358 359 '''
359 360
360 361 if heads != ['force'] and self.capable('unbundlehash'):
361 362 heads = encodelist(['hashed',
362 363 hashlib.sha1(''.join(sorted(heads))).digest()])
363 364 else:
364 365 heads = encodelist(heads)
365 366
366 367 if util.safehasattr(cg, 'deltaheader'):
367 368 # this a bundle10, do the old style call sequence
368 369 ret, output = self._callpush("unbundle", cg, heads=heads)
369 370 if ret == "":
370 371 raise error.ResponseError(
371 372 _('push failed:'), output)
372 373 try:
373 374 ret = int(ret)
374 375 except ValueError:
375 376 raise error.ResponseError(
376 377 _('push failed (unexpected response):'), ret)
377 378
378 379 for l in output.splitlines(True):
379 380 self.ui.status(_('remote: '), l)
380 381 else:
381 382 # bundle2 push. Send a stream, fetch a stream.
382 383 stream = self._calltwowaystream('unbundle', cg, heads=heads)
383 384 ret = bundle2.getunbundler(self.ui, stream)
384 385 return ret
385 386
386 387 # End of basewirepeer interface.
387 388
388 389 # Begin of baselegacywirepeer interface.
389 390
390 391 def branches(self, nodes):
391 392 n = encodelist(nodes)
392 393 d = self._call("branches", nodes=n)
393 394 try:
394 395 br = [tuple(decodelist(b)) for b in d.splitlines()]
395 396 return br
396 397 except ValueError:
397 398 self._abort(error.ResponseError(_("unexpected response:"), d))
398 399
399 400 def between(self, pairs):
400 401 batch = 8 # avoid giant requests
401 402 r = []
402 403 for i in xrange(0, len(pairs), batch):
403 404 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
404 405 d = self._call("between", pairs=n)
405 406 try:
406 407 r.extend(l and decodelist(l) or [] for l in d.splitlines())
407 408 except ValueError:
408 409 self._abort(error.ResponseError(_("unexpected response:"), d))
409 410 return r
410 411
411 412 def changegroup(self, nodes, kind):
412 413 n = encodelist(nodes)
413 414 f = self._callcompressable("changegroup", roots=n)
414 415 return changegroupmod.cg1unpacker(f, 'UN')
415 416
416 417 def changegroupsubset(self, bases, heads, kind):
417 418 self.requirecap('changegroupsubset', _('look up remote changes'))
418 419 bases = encodelist(bases)
419 420 heads = encodelist(heads)
420 421 f = self._callcompressable("changegroupsubset",
421 422 bases=bases, heads=heads)
422 423 return changegroupmod.cg1unpacker(f, 'UN')
423 424
424 425 # End of baselegacywirepeer interface.
425 426
426 427 def _submitbatch(self, req):
427 428 """run batch request <req> on the server
428 429
429 430 Returns an iterator of the raw responses from the server.
430 431 """
431 432 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
432 433 chunk = rsp.read(1024)
433 434 work = [chunk]
434 435 while chunk:
435 436 while ';' not in chunk and chunk:
436 437 chunk = rsp.read(1024)
437 438 work.append(chunk)
438 439 merged = ''.join(work)
439 440 while ';' in merged:
440 441 one, merged = merged.split(';', 1)
441 442 yield unescapearg(one)
442 443 chunk = rsp.read(1024)
443 444 work = [merged, chunk]
444 445 yield unescapearg(''.join(work))
445 446
446 447 def _submitone(self, op, args):
447 return self._call(op, **args)
448 return self._call(op, **pycompat.strkwargs(args))
448 449
449 450 def debugwireargs(self, one, two, three=None, four=None, five=None):
450 451 # don't pass optional arguments left at their default value
451 452 opts = {}
452 453 if three is not None:
453 454 opts['three'] = three
454 455 if four is not None:
455 456 opts['four'] = four
456 457 return self._call('debugwireargs', one=one, two=two, **opts)
457 458
458 459 def _call(self, cmd, **args):
459 460 """execute <cmd> on the server
460 461
461 462 The command is expected to return a simple string.
462 463
463 464 returns the server reply as a string."""
464 465 raise NotImplementedError()
465 466
466 467 def _callstream(self, cmd, **args):
467 468 """execute <cmd> on the server
468 469
469 470 The command is expected to return a stream. Note that if the
470 471 command doesn't return a stream, _callstream behaves
471 472 differently for ssh and http peers.
472 473
473 474 returns the server reply as a file like object.
474 475 """
475 476 raise NotImplementedError()
476 477
477 478 def _callcompressable(self, cmd, **args):
478 479 """execute <cmd> on the server
479 480
480 481 The command is expected to return a stream.
481 482
482 483 The stream may have been compressed in some implementations. This
483 484 function takes care of the decompression. This is the only difference
484 485 with _callstream.
485 486
486 487 returns the server reply as a file like object.
487 488 """
488 489 raise NotImplementedError()
489 490
490 491 def _callpush(self, cmd, fp, **args):
491 492 """execute a <cmd> on server
492 493
493 494 The command is expected to be related to a push. Push has a special
494 495 return method.
495 496
496 497 returns the server reply as a (ret, output) tuple. ret is either
497 498 empty (error) or a stringified int.
498 499 """
499 500 raise NotImplementedError()
500 501
501 502 def _calltwowaystream(self, cmd, fp, **args):
502 503 """execute <cmd> on server
503 504
504 505 The command will send a stream to the server and get a stream in reply.
505 506 """
506 507 raise NotImplementedError()
507 508
508 509 def _abort(self, exception):
509 510 """clearly abort the wire protocol connection and raise the exception
510 511 """
511 512 raise NotImplementedError()
512 513
513 514 # server side
514 515
515 516 # wire protocol command can either return a string or one of these classes.
516 517 class streamres(object):
517 518 """wireproto reply: binary stream
518 519
519 520 The call was successful and the result is a stream.
520 521
521 522 Accepts either a generator or an object with a ``read(size)`` method.
522 523
523 524 ``v1compressible`` indicates whether this data can be compressed to
524 525 "version 1" clients (technically: HTTP peers using
525 526 application/mercurial-0.1 media type). This flag should NOT be used on
526 527 new commands because new clients should support a more modern compression
527 528 mechanism.
528 529 """
529 530 def __init__(self, gen=None, reader=None, v1compressible=False):
530 531 self.gen = gen
531 532 self.reader = reader
532 533 self.v1compressible = v1compressible
533 534
534 535 class pushres(object):
535 536 """wireproto reply: success with simple integer return
536 537
537 538 The call was successful and returned an integer contained in `self.res`.
538 539 """
539 540 def __init__(self, res):
540 541 self.res = res
541 542
542 543 class pusherr(object):
543 544 """wireproto reply: failure
544 545
545 546 The call failed. The `self.res` attribute contains the error message.
546 547 """
547 548 def __init__(self, res):
548 549 self.res = res
549 550
550 551 class ooberror(object):
551 552 """wireproto reply: failure of a batch of operation
552 553
553 554 Something failed during a batch call. The error message is stored in
554 555 `self.message`.
555 556 """
556 557 def __init__(self, message):
557 558 self.message = message
558 559
559 560 def getdispatchrepo(repo, proto, command):
560 561 """Obtain the repo used for processing wire protocol commands.
561 562
562 563 The intent of this function is to serve as a monkeypatch point for
563 564 extensions that need commands to operate on different repo views under
564 565 specialized circumstances.
565 566 """
566 567 return repo.filtered('served')
567 568
568 569 def dispatch(repo, proto, command):
569 570 repo = getdispatchrepo(repo, proto, command)
570 571 func, spec = commands[command]
571 572 args = proto.getargs(spec)
572 573 return func(repo, proto, *args)
573 574
574 575 def options(cmd, keys, others):
575 576 opts = {}
576 577 for k in keys:
577 578 if k in others:
578 579 opts[k] = others[k]
579 580 del others[k]
580 581 if others:
581 582 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
582 583 % (cmd, ",".join(others)))
583 584 return opts
584 585
585 586 def bundle1allowed(repo, action):
586 587 """Whether a bundle1 operation is allowed from the server.
587 588
588 589 Priority is:
589 590
590 591 1. server.bundle1gd.<action> (if generaldelta active)
591 592 2. server.bundle1.<action>
592 593 3. server.bundle1gd (if generaldelta active)
593 594 4. server.bundle1
594 595 """
595 596 ui = repo.ui
596 597 gd = 'generaldelta' in repo.requirements
597 598
598 599 if gd:
599 600 v = ui.configbool('server', 'bundle1gd.%s' % action)
600 601 if v is not None:
601 602 return v
602 603
603 604 v = ui.configbool('server', 'bundle1.%s' % action)
604 605 if v is not None:
605 606 return v
606 607
607 608 if gd:
608 609 v = ui.configbool('server', 'bundle1gd')
609 610 if v is not None:
610 611 return v
611 612
612 613 return ui.configbool('server', 'bundle1')
613 614
614 615 def supportedcompengines(ui, proto, role):
615 616 """Obtain the list of supported compression engines for a request."""
616 617 assert role in (util.CLIENTROLE, util.SERVERROLE)
617 618
618 619 compengines = util.compengines.supportedwireengines(role)
619 620
620 621 # Allow config to override default list and ordering.
621 622 if role == util.SERVERROLE:
622 623 configengines = ui.configlist('server', 'compressionengines')
623 624 config = 'server.compressionengines'
624 625 else:
625 626 # This is currently implemented mainly to facilitate testing. In most
626 627 # cases, the server should be in charge of choosing a compression engine
627 628 # because a server has the most to lose from a sub-optimal choice. (e.g.
628 629 # CPU DoS due to an expensive engine or a network DoS due to poor
629 630 # compression ratio).
630 631 configengines = ui.configlist('experimental',
631 632 'clientcompressionengines')
632 633 config = 'experimental.clientcompressionengines'
633 634
634 635 # No explicit config. Filter out the ones that aren't supposed to be
635 636 # advertised and return default ordering.
636 637 if not configengines:
637 638 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
638 639 return [e for e in compengines
639 640 if getattr(e.wireprotosupport(), attr) > 0]
640 641
641 642 # If compression engines are listed in the config, assume there is a good
642 643 # reason for it (like server operators wanting to achieve specific
643 644 # performance characteristics). So fail fast if the config references
644 645 # unusable compression engines.
645 646 validnames = set(e.name() for e in compengines)
646 647 invalidnames = set(e for e in configengines if e not in validnames)
647 648 if invalidnames:
648 649 raise error.Abort(_('invalid compression engine defined in %s: %s') %
649 650 (config, ', '.join(sorted(invalidnames))))
650 651
651 652 compengines = [e for e in compengines if e.name() in configengines]
652 653 compengines = sorted(compengines,
653 654 key=lambda e: configengines.index(e.name()))
654 655
655 656 if not compengines:
656 657 raise error.Abort(_('%s config option does not specify any known '
657 658 'compression engines') % config,
658 659 hint=_('usable compression engines: %s') %
659 660 ', '.sorted(validnames))
660 661
661 662 return compengines
662 663
663 664 # list of commands
664 665 commands = {}
665 666
666 667 def wireprotocommand(name, args=''):
667 668 """decorator for wire protocol command"""
668 669 def register(func):
669 670 commands[name] = (func, args)
670 671 return func
671 672 return register
672 673
673 674 @wireprotocommand('batch', 'cmds *')
674 675 def batch(repo, proto, cmds, others):
675 676 repo = repo.filtered("served")
676 677 res = []
677 678 for pair in cmds.split(';'):
678 679 op, args = pair.split(' ', 1)
679 680 vals = {}
680 681 for a in args.split(','):
681 682 if a:
682 683 n, v = a.split('=')
683 684 vals[unescapearg(n)] = unescapearg(v)
684 685 func, spec = commands[op]
685 686 if spec:
686 687 keys = spec.split()
687 688 data = {}
688 689 for k in keys:
689 690 if k == '*':
690 691 star = {}
691 692 for key in vals.keys():
692 693 if key not in keys:
693 694 star[key] = vals[key]
694 695 data['*'] = star
695 696 else:
696 697 data[k] = vals[k]
697 698 result = func(repo, proto, *[data[k] for k in keys])
698 699 else:
699 700 result = func(repo, proto)
700 701 if isinstance(result, ooberror):
701 702 return result
702 703 res.append(escapearg(result))
703 704 return ';'.join(res)
704 705
705 706 @wireprotocommand('between', 'pairs')
706 707 def between(repo, proto, pairs):
707 708 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
708 709 r = []
709 710 for b in repo.between(pairs):
710 711 r.append(encodelist(b) + "\n")
711 712 return "".join(r)
712 713
713 714 @wireprotocommand('branchmap')
714 715 def branchmap(repo, proto):
715 716 branchmap = repo.branchmap()
716 717 heads = []
717 718 for branch, nodes in branchmap.iteritems():
718 719 branchname = urlreq.quote(encoding.fromlocal(branch))
719 720 branchnodes = encodelist(nodes)
720 721 heads.append('%s %s' % (branchname, branchnodes))
721 722 return '\n'.join(heads)
722 723
723 724 @wireprotocommand('branches', 'nodes')
724 725 def branches(repo, proto, nodes):
725 726 nodes = decodelist(nodes)
726 727 r = []
727 728 for b in repo.branches(nodes):
728 729 r.append(encodelist(b) + "\n")
729 730 return "".join(r)
730 731
731 732 @wireprotocommand('clonebundles', '')
732 733 def clonebundles(repo, proto):
733 734 """Server command for returning info for available bundles to seed clones.
734 735
735 736 Clients will parse this response and determine what bundle to fetch.
736 737
737 738 Extensions may wrap this command to filter or dynamically emit data
738 739 depending on the request. e.g. you could advertise URLs for the closest
739 740 data center given the client's IP address.
740 741 """
741 742 return repo.vfs.tryread('clonebundles.manifest')
742 743
743 744 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
744 745 'known', 'getbundle', 'unbundlehash', 'batch']
745 746
746 747 def _capabilities(repo, proto):
747 748 """return a list of capabilities for a repo
748 749
749 750 This function exists to allow extensions to easily wrap capabilities
750 751 computation
751 752
752 753 - returns a lists: easy to alter
753 754 - change done here will be propagated to both `capabilities` and `hello`
754 755 command without any other action needed.
755 756 """
756 757 # copy to prevent modification of the global list
757 758 caps = list(wireprotocaps)
758 759 if streamclone.allowservergeneration(repo):
759 760 if repo.ui.configbool('server', 'preferuncompressed'):
760 761 caps.append('stream-preferred')
761 762 requiredformats = repo.requirements & repo.supportedformats
762 763 # if our local revlogs are just revlogv1, add 'stream' cap
763 764 if not requiredformats - {'revlogv1'}:
764 765 caps.append('stream')
765 766 # otherwise, add 'streamreqs' detailing our local revlog format
766 767 else:
767 768 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
768 769 if repo.ui.configbool('experimental', 'bundle2-advertise'):
769 770 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
770 771 caps.append('bundle2=' + urlreq.quote(capsblob))
771 772 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
772 773
773 774 if proto.name == 'http':
774 775 caps.append('httpheader=%d' %
775 776 repo.ui.configint('server', 'maxhttpheaderlen'))
776 777 if repo.ui.configbool('experimental', 'httppostargs'):
777 778 caps.append('httppostargs')
778 779
779 780 # FUTURE advertise 0.2rx once support is implemented
780 781 # FUTURE advertise minrx and mintx after consulting config option
781 782 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
782 783
783 784 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
784 785 if compengines:
785 786 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
786 787 for e in compengines)
787 788 caps.append('compression=%s' % comptypes)
788 789
789 790 return caps
790 791
791 792 # If you are writing an extension and consider wrapping this function. Wrap
792 793 # `_capabilities` instead.
793 794 @wireprotocommand('capabilities')
794 795 def capabilities(repo, proto):
795 796 return ' '.join(_capabilities(repo, proto))
796 797
797 798 @wireprotocommand('changegroup', 'roots')
798 799 def changegroup(repo, proto, roots):
799 800 nodes = decodelist(roots)
800 801 outgoing = discovery.outgoing(repo, missingroots=nodes,
801 802 missingheads=repo.heads())
802 803 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
803 804 return streamres(reader=cg, v1compressible=True)
804 805
805 806 @wireprotocommand('changegroupsubset', 'bases heads')
806 807 def changegroupsubset(repo, proto, bases, heads):
807 808 bases = decodelist(bases)
808 809 heads = decodelist(heads)
809 810 outgoing = discovery.outgoing(repo, missingroots=bases,
810 811 missingheads=heads)
811 812 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
812 813 return streamres(reader=cg, v1compressible=True)
813 814
814 815 @wireprotocommand('debugwireargs', 'one two *')
815 816 def debugwireargs(repo, proto, one, two, others):
816 817 # only accept optional args from the known set
817 818 opts = options('debugwireargs', ['three', 'four'], others)
818 819 return repo.debugwireargs(one, two, **opts)
819 820
820 821 @wireprotocommand('getbundle', '*')
821 822 def getbundle(repo, proto, others):
822 823 opts = options('getbundle', gboptsmap.keys(), others)
823 824 for k, v in opts.iteritems():
824 825 keytype = gboptsmap[k]
825 826 if keytype == 'nodes':
826 827 opts[k] = decodelist(v)
827 828 elif keytype == 'csv':
828 829 opts[k] = list(v.split(','))
829 830 elif keytype == 'scsv':
830 831 opts[k] = set(v.split(','))
831 832 elif keytype == 'boolean':
832 833 # Client should serialize False as '0', which is a non-empty string
833 834 # so it evaluates as a True bool.
834 835 if v == '0':
835 836 opts[k] = False
836 837 else:
837 838 opts[k] = bool(v)
838 839 elif keytype != 'plain':
839 840 raise KeyError('unknown getbundle option type %s'
840 841 % keytype)
841 842
842 843 if not bundle1allowed(repo, 'pull'):
843 844 if not exchange.bundle2requested(opts.get('bundlecaps')):
844 845 if proto.name == 'http':
845 846 return ooberror(bundle2required)
846 847 raise error.Abort(bundle2requiredmain,
847 848 hint=bundle2requiredhint)
848 849
849 850 try:
850 851 if repo.ui.configbool('server', 'disablefullbundle'):
851 852 # Check to see if this is a full clone.
852 853 clheads = set(repo.changelog.heads())
853 854 heads = set(opts.get('heads', set()))
854 855 common = set(opts.get('common', set()))
855 856 common.discard(nullid)
856 857 if not common and clheads == heads:
857 858 raise error.Abort(
858 859 _('server has pull-based clones disabled'),
859 860 hint=_('remove --pull if specified or upgrade Mercurial'))
860 861
861 862 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
862 863 except error.Abort as exc:
863 864 # cleanly forward Abort error to the client
864 865 if not exchange.bundle2requested(opts.get('bundlecaps')):
865 866 if proto.name == 'http':
866 867 return ooberror(str(exc) + '\n')
867 868 raise # cannot do better for bundle1 + ssh
868 869 # bundle2 request expect a bundle2 reply
869 870 bundler = bundle2.bundle20(repo.ui)
870 871 manargs = [('message', str(exc))]
871 872 advargs = []
872 873 if exc.hint is not None:
873 874 advargs.append(('hint', exc.hint))
874 875 bundler.addpart(bundle2.bundlepart('error:abort',
875 876 manargs, advargs))
876 877 return streamres(gen=bundler.getchunks(), v1compressible=True)
877 878 return streamres(gen=chunks, v1compressible=True)
878 879
879 880 @wireprotocommand('heads')
880 881 def heads(repo, proto):
881 882 h = repo.heads()
882 883 return encodelist(h) + "\n"
883 884
884 885 @wireprotocommand('hello')
885 886 def hello(repo, proto):
886 887 '''the hello command returns a set of lines describing various
887 888 interesting things about the server, in an RFC822-like format.
888 889 Currently the only one defined is "capabilities", which
889 890 consists of a line in the form:
890 891
891 892 capabilities: space separated list of tokens
892 893 '''
893 894 return "capabilities: %s\n" % (capabilities(repo, proto))
894 895
895 896 @wireprotocommand('listkeys', 'namespace')
896 897 def listkeys(repo, proto, namespace):
897 898 d = repo.listkeys(encoding.tolocal(namespace)).items()
898 899 return pushkeymod.encodekeys(d)
899 900
900 901 @wireprotocommand('lookup', 'key')
901 902 def lookup(repo, proto, key):
902 903 try:
903 904 k = encoding.tolocal(key)
904 905 c = repo[k]
905 906 r = c.hex()
906 907 success = 1
907 908 except Exception as inst:
908 909 r = str(inst)
909 910 success = 0
910 911 return "%d %s\n" % (success, r)
911 912
912 913 @wireprotocommand('known', 'nodes *')
913 914 def known(repo, proto, nodes, others):
914 915 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
915 916
916 917 @wireprotocommand('pushkey', 'namespace key old new')
917 918 def pushkey(repo, proto, namespace, key, old, new):
918 919 # compatibility with pre-1.8 clients which were accidentally
919 920 # sending raw binary nodes rather than utf-8-encoded hex
920 921 if len(new) == 20 and util.escapestr(new) != new:
921 922 # looks like it could be a binary node
922 923 try:
923 924 new.decode('utf-8')
924 925 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
925 926 except UnicodeDecodeError:
926 927 pass # binary, leave unmodified
927 928 else:
928 929 new = encoding.tolocal(new) # normal path
929 930
930 931 if util.safehasattr(proto, 'restore'):
931 932
932 933 proto.redirect()
933 934
934 935 try:
935 936 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
936 937 encoding.tolocal(old), new) or False
937 938 except error.Abort:
938 939 r = False
939 940
940 941 output = proto.restore()
941 942
942 943 return '%s\n%s' % (int(r), output)
943 944
944 945 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
945 946 encoding.tolocal(old), new)
946 947 return '%s\n' % int(r)
947 948
948 949 @wireprotocommand('stream_out')
949 950 def stream(repo, proto):
950 951 '''If the server supports streaming clone, it advertises the "stream"
951 952 capability with a value representing the version and flags of the repo
952 953 it is serving. Client checks to see if it understands the format.
953 954 '''
954 955 if not streamclone.allowservergeneration(repo):
955 956 return '1\n'
956 957
957 958 def getstream(it):
958 959 yield '0\n'
959 960 for chunk in it:
960 961 yield chunk
961 962
962 963 try:
963 964 # LockError may be raised before the first result is yielded. Don't
964 965 # emit output until we're sure we got the lock successfully.
965 966 it = streamclone.generatev1wireproto(repo)
966 967 return streamres(gen=getstream(it))
967 968 except error.LockError:
968 969 return '2\n'
969 970
970 971 @wireprotocommand('unbundle', 'heads')
971 972 def unbundle(repo, proto, heads):
972 973 their_heads = decodelist(heads)
973 974
974 975 try:
975 976 proto.redirect()
976 977
977 978 exchange.check_heads(repo, their_heads, 'preparing changes')
978 979
979 980 # write bundle data to temporary file because it can be big
980 981 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
981 982 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
982 983 r = 0
983 984 try:
984 985 proto.getfile(fp)
985 986 fp.seek(0)
986 987 gen = exchange.readbundle(repo.ui, fp, None)
987 988 if (isinstance(gen, changegroupmod.cg1unpacker)
988 989 and not bundle1allowed(repo, 'push')):
989 990 if proto.name == 'http':
990 991 # need to special case http because stderr do not get to
991 992 # the http client on failed push so we need to abuse some
992 993 # other error type to make sure the message get to the
993 994 # user.
994 995 return ooberror(bundle2required)
995 996 raise error.Abort(bundle2requiredmain,
996 997 hint=bundle2requiredhint)
997 998
998 999 r = exchange.unbundle(repo, gen, their_heads, 'serve',
999 1000 proto._client())
1000 1001 if util.safehasattr(r, 'addpart'):
1001 1002 # The return looks streamable, we are in the bundle2 case and
1002 1003 # should return a stream.
1003 1004 return streamres(gen=r.getchunks())
1004 1005 return pushres(r)
1005 1006
1006 1007 finally:
1007 1008 fp.close()
1008 1009 os.unlink(tempname)
1009 1010
1010 1011 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1011 1012 # handle non-bundle2 case first
1012 1013 if not getattr(exc, 'duringunbundle2', False):
1013 1014 try:
1014 1015 raise
1015 1016 except error.Abort:
1016 1017 # The old code we moved used util.stderr directly.
1017 1018 # We did not change it to minimise code change.
1018 1019 # This need to be moved to something proper.
1019 1020 # Feel free to do it.
1020 1021 util.stderr.write("abort: %s\n" % exc)
1021 1022 if exc.hint is not None:
1022 1023 util.stderr.write("(%s)\n" % exc.hint)
1023 1024 return pushres(0)
1024 1025 except error.PushRaced:
1025 1026 return pusherr(str(exc))
1026 1027
1027 1028 bundler = bundle2.bundle20(repo.ui)
1028 1029 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1029 1030 bundler.addpart(out)
1030 1031 try:
1031 1032 try:
1032 1033 raise
1033 1034 except error.PushkeyFailed as exc:
1034 1035 # check client caps
1035 1036 remotecaps = getattr(exc, '_replycaps', None)
1036 1037 if (remotecaps is not None
1037 1038 and 'pushkey' not in remotecaps.get('error', ())):
1038 1039 # no support remote side, fallback to Abort handler.
1039 1040 raise
1040 1041 part = bundler.newpart('error:pushkey')
1041 1042 part.addparam('in-reply-to', exc.partid)
1042 1043 if exc.namespace is not None:
1043 1044 part.addparam('namespace', exc.namespace, mandatory=False)
1044 1045 if exc.key is not None:
1045 1046 part.addparam('key', exc.key, mandatory=False)
1046 1047 if exc.new is not None:
1047 1048 part.addparam('new', exc.new, mandatory=False)
1048 1049 if exc.old is not None:
1049 1050 part.addparam('old', exc.old, mandatory=False)
1050 1051 if exc.ret is not None:
1051 1052 part.addparam('ret', exc.ret, mandatory=False)
1052 1053 except error.BundleValueError as exc:
1053 1054 errpart = bundler.newpart('error:unsupportedcontent')
1054 1055 if exc.parttype is not None:
1055 1056 errpart.addparam('parttype', exc.parttype)
1056 1057 if exc.params:
1057 1058 errpart.addparam('params', '\0'.join(exc.params))
1058 1059 except error.Abort as exc:
1059 1060 manargs = [('message', str(exc))]
1060 1061 advargs = []
1061 1062 if exc.hint is not None:
1062 1063 advargs.append(('hint', exc.hint))
1063 1064 bundler.addpart(bundle2.bundlepart('error:abort',
1064 1065 manargs, advargs))
1065 1066 except error.PushRaced as exc:
1066 1067 bundler.newpart('error:pushraced', [('message', str(exc))])
1067 1068 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now