##// END OF EJS Templates
py3: handle keyword arguments correctly in wireproto.py...
Pulkit Goyal -
r35375:7d229241 default
parent child Browse files
Show More
@@ -1,1070 +1,1070 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 )
35 35
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 41 'IncompatibleClient')
42 42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43 43
44 44 class abstractserverproto(object):
45 45 """abstract class that summarizes the protocol API
46 46
47 47 Used as reference and documentation.
48 48 """
49 49
50 50 def getargs(self, args):
51 51 """return the value for arguments in <args>
52 52
53 53 returns a list of values (same order as <args>)"""
54 54 raise NotImplementedError()
55 55
56 56 def getfile(self, fp):
57 57 """write the whole content of a file into a file like object
58 58
59 59 The file is in the form::
60 60
61 61 (<chunk-size>\n<chunk>)+0\n
62 62
63 63 chunk size is the ascii version of the int.
64 64 """
65 65 raise NotImplementedError()
66 66
67 67 def redirect(self):
68 68 """may setup interception for stdout and stderr
69 69
70 70 See also the `restore` method."""
71 71 raise NotImplementedError()
72 72
73 73 # If the `redirect` function does install interception, the `restore`
74 74 # function MUST be defined. If interception is not used, this function
75 75 # MUST NOT be defined.
76 76 #
77 77 # left commented here on purpose
78 78 #
79 79 #def restore(self):
80 80 # """reinstall previous stdout and stderr and return intercepted stdout
81 81 # """
82 82 # raise NotImplementedError()
83 83
84 84 class remoteiterbatcher(peer.iterbatcher):
85 85 def __init__(self, remote):
86 86 super(remoteiterbatcher, self).__init__()
87 87 self._remote = remote
88 88
89 89 def __getattr__(self, name):
90 90 # Validate this method is batchable, since submit() only supports
91 91 # batchable methods.
92 92 fn = getattr(self._remote, name)
93 93 if not getattr(fn, 'batchable', None):
94 94 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 95 'call to %r' % name)
96 96
97 97 return super(remoteiterbatcher, self).__getattr__(name)
98 98
99 99 def submit(self):
100 100 """Break the batch request into many patch calls and pipeline them.
101 101
102 102 This is mostly valuable over http where request sizes can be
103 103 limited, but can be used in other places as well.
104 104 """
105 105 # 2-tuple of (command, arguments) that represents what will be
106 106 # sent over the wire.
107 107 requests = []
108 108
109 109 # 4-tuple of (command, final future, @batchable generator, remote
110 110 # future).
111 111 results = []
112 112
113 113 for command, args, opts, finalfuture in self.calls:
114 114 mtd = getattr(self._remote, command)
115 115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116 116
117 117 commandargs, fremote = next(batchable)
118 118 assert fremote
119 119 requests.append((command, commandargs))
120 120 results.append((command, finalfuture, batchable, fremote))
121 121
122 122 if requests:
123 123 self._resultiter = self._remote._submitbatch(requests)
124 124
125 125 self._results = results
126 126
127 127 def results(self):
128 128 for command, finalfuture, batchable, remotefuture in self._results:
129 129 # Get the raw result, set it in the remote future, feed it
130 130 # back into the @batchable generator so it can be decoded, and
131 131 # set the result on the final future to this value.
132 132 remoteresult = next(self._resultiter)
133 133 remotefuture.set(remoteresult)
134 134 finalfuture.set(next(batchable))
135 135
136 136 # Verify our @batchable generators only emit 2 values.
137 137 try:
138 138 next(batchable)
139 139 except StopIteration:
140 140 pass
141 141 else:
142 142 raise error.ProgrammingError('%s @batchable generator emitted '
143 143 'unexpected value count' % command)
144 144
145 145 yield finalfuture.value
146 146
147 147 # Forward a couple of names from peer to make wireproto interactions
148 148 # slightly more sensible.
149 149 batchable = peer.batchable
150 150 future = peer.future
151 151
152 152 # list of nodes encoding / decoding
153 153
154 154 def decodelist(l, sep=' '):
155 155 if l:
156 156 return [bin(v) for v in l.split(sep)]
157 157 return []
158 158
159 159 def encodelist(l, sep=' '):
160 160 try:
161 161 return sep.join(map(hex, l))
162 162 except TypeError:
163 163 raise
164 164
165 165 # batched call argument encoding
166 166
167 167 def escapearg(plain):
168 168 return (plain
169 169 .replace(':', ':c')
170 170 .replace(',', ':o')
171 171 .replace(';', ':s')
172 172 .replace('=', ':e'))
173 173
174 174 def unescapearg(escaped):
175 175 return (escaped
176 176 .replace(':e', '=')
177 177 .replace(':s', ';')
178 178 .replace(':o', ',')
179 179 .replace(':c', ':'))
180 180
181 181 def encodebatchcmds(req):
182 182 """Return a ``cmds`` argument value for the ``batch`` command."""
183 183 cmds = []
184 184 for op, argsdict in req:
185 185 # Old servers didn't properly unescape argument names. So prevent
186 186 # the sending of argument names that may not be decoded properly by
187 187 # servers.
188 188 assert all(escapearg(k) == k for k in argsdict)
189 189
190 190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 191 for k, v in argsdict.iteritems())
192 192 cmds.append('%s %s' % (op, args))
193 193
194 194 return ';'.join(cmds)
195 195
196 196 # mapping of options accepted by getbundle and their types
197 197 #
198 198 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 199 # such options are properly processed in exchange.getbundle.
200 200 #
201 201 # supported types are:
202 202 #
203 203 # :nodes: list of binary nodes
204 204 # :csv: list of comma-separated values
205 205 # :scsv: list of comma-separated values return as set
206 206 # :plain: string with no transformation needed.
207 207 gboptsmap = {'heads': 'nodes',
208 208 'bookmarks': 'boolean',
209 209 'common': 'nodes',
210 210 'obsmarkers': 'boolean',
211 211 'phases': 'boolean',
212 212 'bundlecaps': 'scsv',
213 213 'listkeys': 'csv',
214 214 'cg': 'boolean',
215 215 'cbattempted': 'boolean'}
216 216
217 217 # client side
218 218
219 219 class wirepeer(repository.legacypeer):
220 220 """Client-side interface for communicating with a peer repository.
221 221
222 222 Methods commonly call wire protocol commands of the same name.
223 223
224 224 See also httppeer.py and sshpeer.py for protocol-specific
225 225 implementations of this interface.
226 226 """
227 227 # Begin of basewirepeer interface.
228 228
229 229 def iterbatch(self):
230 230 return remoteiterbatcher(self)
231 231
232 232 @batchable
233 233 def lookup(self, key):
234 234 self.requirecap('lookup', _('look up remote revision'))
235 235 f = future()
236 236 yield {'key': encoding.fromlocal(key)}, f
237 237 d = f.value
238 238 success, data = d[:-1].split(" ", 1)
239 239 if int(success):
240 240 yield bin(data)
241 241 else:
242 242 self._abort(error.RepoError(data))
243 243
244 244 @batchable
245 245 def heads(self):
246 246 f = future()
247 247 yield {}, f
248 248 d = f.value
249 249 try:
250 250 yield decodelist(d[:-1])
251 251 except ValueError:
252 252 self._abort(error.ResponseError(_("unexpected response:"), d))
253 253
254 254 @batchable
255 255 def known(self, nodes):
256 256 f = future()
257 257 yield {'nodes': encodelist(nodes)}, f
258 258 d = f.value
259 259 try:
260 260 yield [bool(int(b)) for b in d]
261 261 except ValueError:
262 262 self._abort(error.ResponseError(_("unexpected response:"), d))
263 263
264 264 @batchable
265 265 def branchmap(self):
266 266 f = future()
267 267 yield {}, f
268 268 d = f.value
269 269 try:
270 270 branchmap = {}
271 271 for branchpart in d.splitlines():
272 272 branchname, branchheads = branchpart.split(' ', 1)
273 273 branchname = encoding.tolocal(urlreq.unquote(branchname))
274 274 branchheads = decodelist(branchheads)
275 275 branchmap[branchname] = branchheads
276 276 yield branchmap
277 277 except TypeError:
278 278 self._abort(error.ResponseError(_("unexpected response:"), d))
279 279
280 280 @batchable
281 281 def listkeys(self, namespace):
282 282 if not self.capable('pushkey'):
283 283 yield {}, None
284 284 f = future()
285 285 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
286 286 yield {'namespace': encoding.fromlocal(namespace)}, f
287 287 d = f.value
288 288 self.ui.debug('received listkey for "%s": %i bytes\n'
289 289 % (namespace, len(d)))
290 290 yield pushkeymod.decodekeys(d)
291 291
292 292 @batchable
293 293 def pushkey(self, namespace, key, old, new):
294 294 if not self.capable('pushkey'):
295 295 yield False, None
296 296 f = future()
297 297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
298 298 yield {'namespace': encoding.fromlocal(namespace),
299 299 'key': encoding.fromlocal(key),
300 300 'old': encoding.fromlocal(old),
301 301 'new': encoding.fromlocal(new)}, f
302 302 d = f.value
303 303 d, output = d.split('\n', 1)
304 304 try:
305 305 d = bool(int(d))
306 306 except ValueError:
307 307 raise error.ResponseError(
308 308 _('push failed (unexpected response):'), d)
309 309 for l in output.splitlines(True):
310 310 self.ui.status(_('remote: '), l)
311 311 yield d
312 312
313 313 def stream_out(self):
314 314 return self._callstream('stream_out')
315 315
316 316 def getbundle(self, source, **kwargs):
317 317 kwargs = pycompat.byteskwargs(kwargs)
318 318 self.requirecap('getbundle', _('look up remote changes'))
319 319 opts = {}
320 320 bundlecaps = kwargs.get('bundlecaps')
321 321 if bundlecaps is not None:
322 322 kwargs['bundlecaps'] = sorted(bundlecaps)
323 323 else:
324 324 bundlecaps = () # kwargs could have it to None
325 325 for key, value in kwargs.iteritems():
326 326 if value is None:
327 327 continue
328 328 keytype = gboptsmap.get(key)
329 329 if keytype is None:
330 330 raise error.ProgrammingError(
331 331 'Unexpectedly None keytype for key %s' % key)
332 332 elif keytype == 'nodes':
333 333 value = encodelist(value)
334 334 elif keytype in ('csv', 'scsv'):
335 335 value = ','.join(value)
336 336 elif keytype == 'boolean':
337 337 value = '%i' % bool(value)
338 338 elif keytype != 'plain':
339 339 raise KeyError('unknown getbundle option type %s'
340 340 % keytype)
341 341 opts[key] = value
342 342 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
343 343 if any((cap.startswith('HG2') for cap in bundlecaps)):
344 344 return bundle2.getunbundler(self.ui, f)
345 345 else:
346 346 return changegroupmod.cg1unpacker(f, 'UN')
347 347
348 348 def unbundle(self, cg, heads, url):
349 349 '''Send cg (a readable file-like object representing the
350 350 changegroup to push, typically a chunkbuffer object) to the
351 351 remote server as a bundle.
352 352
353 353 When pushing a bundle10 stream, return an integer indicating the
354 354 result of the push (see changegroup.apply()).
355 355
356 356 When pushing a bundle20 stream, return a bundle20 stream.
357 357
358 358 `url` is the url the client thinks it's pushing to, which is
359 359 visible to hooks.
360 360 '''
361 361
362 362 if heads != ['force'] and self.capable('unbundlehash'):
363 363 heads = encodelist(['hashed',
364 364 hashlib.sha1(''.join(sorted(heads))).digest()])
365 365 else:
366 366 heads = encodelist(heads)
367 367
368 368 if util.safehasattr(cg, 'deltaheader'):
369 369 # this a bundle10, do the old style call sequence
370 370 ret, output = self._callpush("unbundle", cg, heads=heads)
371 371 if ret == "":
372 372 raise error.ResponseError(
373 373 _('push failed:'), output)
374 374 try:
375 375 ret = int(ret)
376 376 except ValueError:
377 377 raise error.ResponseError(
378 378 _('push failed (unexpected response):'), ret)
379 379
380 380 for l in output.splitlines(True):
381 381 self.ui.status(_('remote: '), l)
382 382 else:
383 383 # bundle2 push. Send a stream, fetch a stream.
384 384 stream = self._calltwowaystream('unbundle', cg, heads=heads)
385 385 ret = bundle2.getunbundler(self.ui, stream)
386 386 return ret
387 387
388 388 # End of basewirepeer interface.
389 389
390 390 # Begin of baselegacywirepeer interface.
391 391
392 392 def branches(self, nodes):
393 393 n = encodelist(nodes)
394 394 d = self._call("branches", nodes=n)
395 395 try:
396 396 br = [tuple(decodelist(b)) for b in d.splitlines()]
397 397 return br
398 398 except ValueError:
399 399 self._abort(error.ResponseError(_("unexpected response:"), d))
400 400
401 401 def between(self, pairs):
402 402 batch = 8 # avoid giant requests
403 403 r = []
404 404 for i in xrange(0, len(pairs), batch):
405 405 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
406 406 d = self._call("between", pairs=n)
407 407 try:
408 408 r.extend(l and decodelist(l) or [] for l in d.splitlines())
409 409 except ValueError:
410 410 self._abort(error.ResponseError(_("unexpected response:"), d))
411 411 return r
412 412
413 413 def changegroup(self, nodes, kind):
414 414 n = encodelist(nodes)
415 415 f = self._callcompressable("changegroup", roots=n)
416 416 return changegroupmod.cg1unpacker(f, 'UN')
417 417
418 418 def changegroupsubset(self, bases, heads, kind):
419 419 self.requirecap('changegroupsubset', _('look up remote changes'))
420 420 bases = encodelist(bases)
421 421 heads = encodelist(heads)
422 422 f = self._callcompressable("changegroupsubset",
423 423 bases=bases, heads=heads)
424 424 return changegroupmod.cg1unpacker(f, 'UN')
425 425
426 426 # End of baselegacywirepeer interface.
427 427
428 428 def _submitbatch(self, req):
429 429 """run batch request <req> on the server
430 430
431 431 Returns an iterator of the raw responses from the server.
432 432 """
433 433 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
434 434 chunk = rsp.read(1024)
435 435 work = [chunk]
436 436 while chunk:
437 437 while ';' not in chunk and chunk:
438 438 chunk = rsp.read(1024)
439 439 work.append(chunk)
440 440 merged = ''.join(work)
441 441 while ';' in merged:
442 442 one, merged = merged.split(';', 1)
443 443 yield unescapearg(one)
444 444 chunk = rsp.read(1024)
445 445 work = [merged, chunk]
446 446 yield unescapearg(''.join(work))
447 447
448 448 def _submitone(self, op, args):
449 449 return self._call(op, **pycompat.strkwargs(args))
450 450
451 451 def debugwireargs(self, one, two, three=None, four=None, five=None):
452 452 # don't pass optional arguments left at their default value
453 453 opts = {}
454 454 if three is not None:
455 opts['three'] = three
455 opts[r'three'] = three
456 456 if four is not None:
457 opts['four'] = four
457 opts[r'four'] = four
458 458 return self._call('debugwireargs', one=one, two=two, **opts)
459 459
460 460 def _call(self, cmd, **args):
461 461 """execute <cmd> on the server
462 462
463 463 The command is expected to return a simple string.
464 464
465 465 returns the server reply as a string."""
466 466 raise NotImplementedError()
467 467
468 468 def _callstream(self, cmd, **args):
469 469 """execute <cmd> on the server
470 470
471 471 The command is expected to return a stream. Note that if the
472 472 command doesn't return a stream, _callstream behaves
473 473 differently for ssh and http peers.
474 474
475 475 returns the server reply as a file like object.
476 476 """
477 477 raise NotImplementedError()
478 478
479 479 def _callcompressable(self, cmd, **args):
480 480 """execute <cmd> on the server
481 481
482 482 The command is expected to return a stream.
483 483
484 484 The stream may have been compressed in some implementations. This
485 485 function takes care of the decompression. This is the only difference
486 486 with _callstream.
487 487
488 488 returns the server reply as a file like object.
489 489 """
490 490 raise NotImplementedError()
491 491
492 492 def _callpush(self, cmd, fp, **args):
493 493 """execute a <cmd> on server
494 494
495 495 The command is expected to be related to a push. Push has a special
496 496 return method.
497 497
498 498 returns the server reply as a (ret, output) tuple. ret is either
499 499 empty (error) or a stringified int.
500 500 """
501 501 raise NotImplementedError()
502 502
503 503 def _calltwowaystream(self, cmd, fp, **args):
504 504 """execute <cmd> on server
505 505
506 506 The command will send a stream to the server and get a stream in reply.
507 507 """
508 508 raise NotImplementedError()
509 509
510 510 def _abort(self, exception):
511 511 """clearly abort the wire protocol connection and raise the exception
512 512 """
513 513 raise NotImplementedError()
514 514
515 515 # server side
516 516
517 517 # wire protocol command can either return a string or one of these classes.
518 518 class streamres(object):
519 519 """wireproto reply: binary stream
520 520
521 521 The call was successful and the result is a stream.
522 522
523 523 Accepts either a generator or an object with a ``read(size)`` method.
524 524
525 525 ``v1compressible`` indicates whether this data can be compressed to
526 526 "version 1" clients (technically: HTTP peers using
527 527 application/mercurial-0.1 media type). This flag should NOT be used on
528 528 new commands because new clients should support a more modern compression
529 529 mechanism.
530 530 """
531 531 def __init__(self, gen=None, reader=None, v1compressible=False):
532 532 self.gen = gen
533 533 self.reader = reader
534 534 self.v1compressible = v1compressible
535 535
536 536 class pushres(object):
537 537 """wireproto reply: success with simple integer return
538 538
539 539 The call was successful and returned an integer contained in `self.res`.
540 540 """
541 541 def __init__(self, res):
542 542 self.res = res
543 543
544 544 class pusherr(object):
545 545 """wireproto reply: failure
546 546
547 547 The call failed. The `self.res` attribute contains the error message.
548 548 """
549 549 def __init__(self, res):
550 550 self.res = res
551 551
552 552 class ooberror(object):
553 553 """wireproto reply: failure of a batch of operation
554 554
555 555 Something failed during a batch call. The error message is stored in
556 556 `self.message`.
557 557 """
558 558 def __init__(self, message):
559 559 self.message = message
560 560
561 561 def getdispatchrepo(repo, proto, command):
562 562 """Obtain the repo used for processing wire protocol commands.
563 563
564 564 The intent of this function is to serve as a monkeypatch point for
565 565 extensions that need commands to operate on different repo views under
566 566 specialized circumstances.
567 567 """
568 568 return repo.filtered('served')
569 569
570 570 def dispatch(repo, proto, command):
571 571 repo = getdispatchrepo(repo, proto, command)
572 572 func, spec = commands[command]
573 573 args = proto.getargs(spec)
574 574 return func(repo, proto, *args)
575 575
576 576 def options(cmd, keys, others):
577 577 opts = {}
578 578 for k in keys:
579 579 if k in others:
580 580 opts[k] = others[k]
581 581 del others[k]
582 582 if others:
583 583 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
584 584 % (cmd, ",".join(others)))
585 585 return opts
586 586
587 587 def bundle1allowed(repo, action):
588 588 """Whether a bundle1 operation is allowed from the server.
589 589
590 590 Priority is:
591 591
592 592 1. server.bundle1gd.<action> (if generaldelta active)
593 593 2. server.bundle1.<action>
594 594 3. server.bundle1gd (if generaldelta active)
595 595 4. server.bundle1
596 596 """
597 597 ui = repo.ui
598 598 gd = 'generaldelta' in repo.requirements
599 599
600 600 if gd:
601 601 v = ui.configbool('server', 'bundle1gd.%s' % action)
602 602 if v is not None:
603 603 return v
604 604
605 605 v = ui.configbool('server', 'bundle1.%s' % action)
606 606 if v is not None:
607 607 return v
608 608
609 609 if gd:
610 610 v = ui.configbool('server', 'bundle1gd')
611 611 if v is not None:
612 612 return v
613 613
614 614 return ui.configbool('server', 'bundle1')
615 615
616 616 def supportedcompengines(ui, proto, role):
617 617 """Obtain the list of supported compression engines for a request."""
618 618 assert role in (util.CLIENTROLE, util.SERVERROLE)
619 619
620 620 compengines = util.compengines.supportedwireengines(role)
621 621
622 622 # Allow config to override default list and ordering.
623 623 if role == util.SERVERROLE:
624 624 configengines = ui.configlist('server', 'compressionengines')
625 625 config = 'server.compressionengines'
626 626 else:
627 627 # This is currently implemented mainly to facilitate testing. In most
628 628 # cases, the server should be in charge of choosing a compression engine
629 629 # because a server has the most to lose from a sub-optimal choice. (e.g.
630 630 # CPU DoS due to an expensive engine or a network DoS due to poor
631 631 # compression ratio).
632 632 configengines = ui.configlist('experimental',
633 633 'clientcompressionengines')
634 634 config = 'experimental.clientcompressionengines'
635 635
636 636 # No explicit config. Filter out the ones that aren't supposed to be
637 637 # advertised and return default ordering.
638 638 if not configengines:
639 639 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
640 640 return [e for e in compengines
641 641 if getattr(e.wireprotosupport(), attr) > 0]
642 642
643 643 # If compression engines are listed in the config, assume there is a good
644 644 # reason for it (like server operators wanting to achieve specific
645 645 # performance characteristics). So fail fast if the config references
646 646 # unusable compression engines.
647 647 validnames = set(e.name() for e in compengines)
648 648 invalidnames = set(e for e in configengines if e not in validnames)
649 649 if invalidnames:
650 650 raise error.Abort(_('invalid compression engine defined in %s: %s') %
651 651 (config, ', '.join(sorted(invalidnames))))
652 652
653 653 compengines = [e for e in compengines if e.name() in configengines]
654 654 compengines = sorted(compengines,
655 655 key=lambda e: configengines.index(e.name()))
656 656
657 657 if not compengines:
658 658 raise error.Abort(_('%s config option does not specify any known '
659 659 'compression engines') % config,
660 660 hint=_('usable compression engines: %s') %
661 661 ', '.sorted(validnames))
662 662
663 663 return compengines
664 664
665 665 # list of commands
666 666 commands = {}
667 667
668 668 def wireprotocommand(name, args=''):
669 669 """decorator for wire protocol command"""
670 670 def register(func):
671 671 commands[name] = (func, args)
672 672 return func
673 673 return register
674 674
675 675 @wireprotocommand('batch', 'cmds *')
676 676 def batch(repo, proto, cmds, others):
677 677 repo = repo.filtered("served")
678 678 res = []
679 679 for pair in cmds.split(';'):
680 680 op, args = pair.split(' ', 1)
681 681 vals = {}
682 682 for a in args.split(','):
683 683 if a:
684 684 n, v = a.split('=')
685 685 vals[unescapearg(n)] = unescapearg(v)
686 686 func, spec = commands[op]
687 687 if spec:
688 688 keys = spec.split()
689 689 data = {}
690 690 for k in keys:
691 691 if k == '*':
692 692 star = {}
693 693 for key in vals.keys():
694 694 if key not in keys:
695 695 star[key] = vals[key]
696 696 data['*'] = star
697 697 else:
698 698 data[k] = vals[k]
699 699 result = func(repo, proto, *[data[k] for k in keys])
700 700 else:
701 701 result = func(repo, proto)
702 702 if isinstance(result, ooberror):
703 703 return result
704 704 res.append(escapearg(result))
705 705 return ';'.join(res)
706 706
707 707 @wireprotocommand('between', 'pairs')
708 708 def between(repo, proto, pairs):
709 709 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
710 710 r = []
711 711 for b in repo.between(pairs):
712 712 r.append(encodelist(b) + "\n")
713 713 return "".join(r)
714 714
715 715 @wireprotocommand('branchmap')
716 716 def branchmap(repo, proto):
717 717 branchmap = repo.branchmap()
718 718 heads = []
719 719 for branch, nodes in branchmap.iteritems():
720 720 branchname = urlreq.quote(encoding.fromlocal(branch))
721 721 branchnodes = encodelist(nodes)
722 722 heads.append('%s %s' % (branchname, branchnodes))
723 723 return '\n'.join(heads)
724 724
725 725 @wireprotocommand('branches', 'nodes')
726 726 def branches(repo, proto, nodes):
727 727 nodes = decodelist(nodes)
728 728 r = []
729 729 for b in repo.branches(nodes):
730 730 r.append(encodelist(b) + "\n")
731 731 return "".join(r)
732 732
733 733 @wireprotocommand('clonebundles', '')
734 734 def clonebundles(repo, proto):
735 735 """Server command for returning info for available bundles to seed clones.
736 736
737 737 Clients will parse this response and determine what bundle to fetch.
738 738
739 739 Extensions may wrap this command to filter or dynamically emit data
740 740 depending on the request. e.g. you could advertise URLs for the closest
741 741 data center given the client's IP address.
742 742 """
743 743 return repo.vfs.tryread('clonebundles.manifest')
744 744
745 745 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
746 746 'known', 'getbundle', 'unbundlehash', 'batch']
747 747
748 748 def _capabilities(repo, proto):
749 749 """return a list of capabilities for a repo
750 750
751 751 This function exists to allow extensions to easily wrap capabilities
752 752 computation
753 753
754 754 - returns a lists: easy to alter
755 755 - change done here will be propagated to both `capabilities` and `hello`
756 756 command without any other action needed.
757 757 """
758 758 # copy to prevent modification of the global list
759 759 caps = list(wireprotocaps)
760 760 if streamclone.allowservergeneration(repo):
761 761 if repo.ui.configbool('server', 'preferuncompressed'):
762 762 caps.append('stream-preferred')
763 763 requiredformats = repo.requirements & repo.supportedformats
764 764 # if our local revlogs are just revlogv1, add 'stream' cap
765 765 if not requiredformats - {'revlogv1'}:
766 766 caps.append('stream')
767 767 # otherwise, add 'streamreqs' detailing our local revlog format
768 768 else:
769 769 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
770 770 if repo.ui.configbool('experimental', 'bundle2-advertise'):
771 771 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
772 772 caps.append('bundle2=' + urlreq.quote(capsblob))
773 773 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
774 774
775 775 if proto.name == 'http':
776 776 caps.append('httpheader=%d' %
777 777 repo.ui.configint('server', 'maxhttpheaderlen'))
778 778 if repo.ui.configbool('experimental', 'httppostargs'):
779 779 caps.append('httppostargs')
780 780
781 781 # FUTURE advertise 0.2rx once support is implemented
782 782 # FUTURE advertise minrx and mintx after consulting config option
783 783 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
784 784
785 785 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
786 786 if compengines:
787 787 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
788 788 for e in compengines)
789 789 caps.append('compression=%s' % comptypes)
790 790
791 791 return caps
792 792
793 793 # If you are writing an extension and consider wrapping this function. Wrap
794 794 # `_capabilities` instead.
795 795 @wireprotocommand('capabilities')
796 796 def capabilities(repo, proto):
797 797 return ' '.join(_capabilities(repo, proto))
798 798
799 799 @wireprotocommand('changegroup', 'roots')
800 800 def changegroup(repo, proto, roots):
801 801 nodes = decodelist(roots)
802 802 outgoing = discovery.outgoing(repo, missingroots=nodes,
803 803 missingheads=repo.heads())
804 804 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
805 805 return streamres(reader=cg, v1compressible=True)
806 806
807 807 @wireprotocommand('changegroupsubset', 'bases heads')
808 808 def changegroupsubset(repo, proto, bases, heads):
809 809 bases = decodelist(bases)
810 810 heads = decodelist(heads)
811 811 outgoing = discovery.outgoing(repo, missingroots=bases,
812 812 missingheads=heads)
813 813 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
814 814 return streamres(reader=cg, v1compressible=True)
815 815
816 816 @wireprotocommand('debugwireargs', 'one two *')
817 817 def debugwireargs(repo, proto, one, two, others):
818 818 # only accept optional args from the known set
819 819 opts = options('debugwireargs', ['three', 'four'], others)
820 return repo.debugwireargs(one, two, **opts)
820 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
821 821
822 822 @wireprotocommand('getbundle', '*')
823 823 def getbundle(repo, proto, others):
824 824 opts = options('getbundle', gboptsmap.keys(), others)
825 825 for k, v in opts.iteritems():
826 826 keytype = gboptsmap[k]
827 827 if keytype == 'nodes':
828 828 opts[k] = decodelist(v)
829 829 elif keytype == 'csv':
830 830 opts[k] = list(v.split(','))
831 831 elif keytype == 'scsv':
832 832 opts[k] = set(v.split(','))
833 833 elif keytype == 'boolean':
834 834 # Client should serialize False as '0', which is a non-empty string
835 835 # so it evaluates as a True bool.
836 836 if v == '0':
837 837 opts[k] = False
838 838 else:
839 839 opts[k] = bool(v)
840 840 elif keytype != 'plain':
841 841 raise KeyError('unknown getbundle option type %s'
842 842 % keytype)
843 843
844 844 if not bundle1allowed(repo, 'pull'):
845 845 if not exchange.bundle2requested(opts.get('bundlecaps')):
846 846 if proto.name == 'http':
847 847 return ooberror(bundle2required)
848 848 raise error.Abort(bundle2requiredmain,
849 849 hint=bundle2requiredhint)
850 850
851 851 try:
852 852 if repo.ui.configbool('server', 'disablefullbundle'):
853 853 # Check to see if this is a full clone.
854 854 clheads = set(repo.changelog.heads())
855 855 heads = set(opts.get('heads', set()))
856 856 common = set(opts.get('common', set()))
857 857 common.discard(nullid)
858 858 if not common and clheads == heads:
859 859 raise error.Abort(
860 860 _('server has pull-based clones disabled'),
861 861 hint=_('remove --pull if specified or upgrade Mercurial'))
862 862
863 863 chunks = exchange.getbundlechunks(repo, 'serve',
864 864 **pycompat.strkwargs(opts))
865 865 except error.Abort as exc:
866 866 # cleanly forward Abort error to the client
867 867 if not exchange.bundle2requested(opts.get('bundlecaps')):
868 868 if proto.name == 'http':
869 869 return ooberror(str(exc) + '\n')
870 870 raise # cannot do better for bundle1 + ssh
871 871 # bundle2 request expect a bundle2 reply
872 872 bundler = bundle2.bundle20(repo.ui)
873 873 manargs = [('message', str(exc))]
874 874 advargs = []
875 875 if exc.hint is not None:
876 876 advargs.append(('hint', exc.hint))
877 877 bundler.addpart(bundle2.bundlepart('error:abort',
878 878 manargs, advargs))
879 879 return streamres(gen=bundler.getchunks(), v1compressible=True)
880 880 return streamres(gen=chunks, v1compressible=True)
881 881
882 882 @wireprotocommand('heads')
883 883 def heads(repo, proto):
884 884 h = repo.heads()
885 885 return encodelist(h) + "\n"
886 886
887 887 @wireprotocommand('hello')
888 888 def hello(repo, proto):
889 889 '''the hello command returns a set of lines describing various
890 890 interesting things about the server, in an RFC822-like format.
891 891 Currently the only one defined is "capabilities", which
892 892 consists of a line in the form:
893 893
894 894 capabilities: space separated list of tokens
895 895 '''
896 896 return "capabilities: %s\n" % (capabilities(repo, proto))
897 897
898 898 @wireprotocommand('listkeys', 'namespace')
899 899 def listkeys(repo, proto, namespace):
900 900 d = repo.listkeys(encoding.tolocal(namespace)).items()
901 901 return pushkeymod.encodekeys(d)
902 902
903 903 @wireprotocommand('lookup', 'key')
904 904 def lookup(repo, proto, key):
905 905 try:
906 906 k = encoding.tolocal(key)
907 907 c = repo[k]
908 908 r = c.hex()
909 909 success = 1
910 910 except Exception as inst:
911 911 r = str(inst)
912 912 success = 0
913 913 return "%d %s\n" % (success, r)
914 914
915 915 @wireprotocommand('known', 'nodes *')
916 916 def known(repo, proto, nodes, others):
917 917 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
918 918
919 919 @wireprotocommand('pushkey', 'namespace key old new')
920 920 def pushkey(repo, proto, namespace, key, old, new):
921 921 # compatibility with pre-1.8 clients which were accidentally
922 922 # sending raw binary nodes rather than utf-8-encoded hex
923 923 if len(new) == 20 and util.escapestr(new) != new:
924 924 # looks like it could be a binary node
925 925 try:
926 926 new.decode('utf-8')
927 927 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
928 928 except UnicodeDecodeError:
929 929 pass # binary, leave unmodified
930 930 else:
931 931 new = encoding.tolocal(new) # normal path
932 932
933 933 if util.safehasattr(proto, 'restore'):
934 934
935 935 proto.redirect()
936 936
937 937 try:
938 938 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
939 939 encoding.tolocal(old), new) or False
940 940 except error.Abort:
941 941 r = False
942 942
943 943 output = proto.restore()
944 944
945 945 return '%s\n%s' % (int(r), output)
946 946
947 947 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
948 948 encoding.tolocal(old), new)
949 949 return '%s\n' % int(r)
950 950
951 951 @wireprotocommand('stream_out')
952 952 def stream(repo, proto):
953 953 '''If the server supports streaming clone, it advertises the "stream"
954 954 capability with a value representing the version and flags of the repo
955 955 it is serving. Client checks to see if it understands the format.
956 956 '''
957 957 if not streamclone.allowservergeneration(repo):
958 958 return '1\n'
959 959
960 960 def getstream(it):
961 961 yield '0\n'
962 962 for chunk in it:
963 963 yield chunk
964 964
965 965 try:
966 966 # LockError may be raised before the first result is yielded. Don't
967 967 # emit output until we're sure we got the lock successfully.
968 968 it = streamclone.generatev1wireproto(repo)
969 969 return streamres(gen=getstream(it))
970 970 except error.LockError:
971 971 return '2\n'
972 972
973 973 @wireprotocommand('unbundle', 'heads')
974 974 def unbundle(repo, proto, heads):
975 975 their_heads = decodelist(heads)
976 976
977 977 try:
978 978 proto.redirect()
979 979
980 980 exchange.check_heads(repo, their_heads, 'preparing changes')
981 981
982 982 # write bundle data to temporary file because it can be big
983 983 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
984 984 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
985 985 r = 0
986 986 try:
987 987 proto.getfile(fp)
988 988 fp.seek(0)
989 989 gen = exchange.readbundle(repo.ui, fp, None)
990 990 if (isinstance(gen, changegroupmod.cg1unpacker)
991 991 and not bundle1allowed(repo, 'push')):
992 992 if proto.name == 'http':
993 993 # need to special case http because stderr do not get to
994 994 # the http client on failed push so we need to abuse some
995 995 # other error type to make sure the message get to the
996 996 # user.
997 997 return ooberror(bundle2required)
998 998 raise error.Abort(bundle2requiredmain,
999 999 hint=bundle2requiredhint)
1000 1000
1001 1001 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1002 1002 proto._client())
1003 1003 if util.safehasattr(r, 'addpart'):
1004 1004 # The return looks streamable, we are in the bundle2 case and
1005 1005 # should return a stream.
1006 1006 return streamres(gen=r.getchunks())
1007 1007 return pushres(r)
1008 1008
1009 1009 finally:
1010 1010 fp.close()
1011 1011 os.unlink(tempname)
1012 1012
1013 1013 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1014 1014 # handle non-bundle2 case first
1015 1015 if not getattr(exc, 'duringunbundle2', False):
1016 1016 try:
1017 1017 raise
1018 1018 except error.Abort:
1019 1019 # The old code we moved used util.stderr directly.
1020 1020 # We did not change it to minimise code change.
1021 1021 # This need to be moved to something proper.
1022 1022 # Feel free to do it.
1023 1023 util.stderr.write("abort: %s\n" % exc)
1024 1024 if exc.hint is not None:
1025 1025 util.stderr.write("(%s)\n" % exc.hint)
1026 1026 return pushres(0)
1027 1027 except error.PushRaced:
1028 1028 return pusherr(str(exc))
1029 1029
1030 1030 bundler = bundle2.bundle20(repo.ui)
1031 1031 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1032 1032 bundler.addpart(out)
1033 1033 try:
1034 1034 try:
1035 1035 raise
1036 1036 except error.PushkeyFailed as exc:
1037 1037 # check client caps
1038 1038 remotecaps = getattr(exc, '_replycaps', None)
1039 1039 if (remotecaps is not None
1040 1040 and 'pushkey' not in remotecaps.get('error', ())):
1041 1041 # no support remote side, fallback to Abort handler.
1042 1042 raise
1043 1043 part = bundler.newpart('error:pushkey')
1044 1044 part.addparam('in-reply-to', exc.partid)
1045 1045 if exc.namespace is not None:
1046 1046 part.addparam('namespace', exc.namespace, mandatory=False)
1047 1047 if exc.key is not None:
1048 1048 part.addparam('key', exc.key, mandatory=False)
1049 1049 if exc.new is not None:
1050 1050 part.addparam('new', exc.new, mandatory=False)
1051 1051 if exc.old is not None:
1052 1052 part.addparam('old', exc.old, mandatory=False)
1053 1053 if exc.ret is not None:
1054 1054 part.addparam('ret', exc.ret, mandatory=False)
1055 1055 except error.BundleValueError as exc:
1056 1056 errpart = bundler.newpart('error:unsupportedcontent')
1057 1057 if exc.parttype is not None:
1058 1058 errpart.addparam('parttype', exc.parttype)
1059 1059 if exc.params:
1060 1060 errpart.addparam('params', '\0'.join(exc.params))
1061 1061 except error.Abort as exc:
1062 1062 manargs = [('message', str(exc))]
1063 1063 advargs = []
1064 1064 if exc.hint is not None:
1065 1065 advargs.append(('hint', exc.hint))
1066 1066 bundler.addpart(bundle2.bundlepart('error:abort',
1067 1067 manargs, advargs))
1068 1068 except error.PushRaced as exc:
1069 1069 bundler.newpart('error:pushraced', [('message', str(exc))])
1070 1070 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now