##// END OF EJS Templates
wireproto: use repo.lookup() for lookup command...
Martin von Zweigbergk -
r37371:ac666c5c default
parent child Browse files
Show More
@@ -1,1163 +1,1163 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 wireprototypes,
35 35 )
36 36
37 37 from .utils import (
38 38 procutil,
39 39 stringutil,
40 40 )
41 41
42 42 urlerr = util.urlerr
43 43 urlreq = util.urlreq
44 44
45 45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
46 46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
47 47 'IncompatibleClient')
48 48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
49 49
50 50 class remoteiterbatcher(peer.iterbatcher):
51 51 def __init__(self, remote):
52 52 super(remoteiterbatcher, self).__init__()
53 53 self._remote = remote
54 54
55 55 def __getattr__(self, name):
56 56 # Validate this method is batchable, since submit() only supports
57 57 # batchable methods.
58 58 fn = getattr(self._remote, name)
59 59 if not getattr(fn, 'batchable', None):
60 60 raise error.ProgrammingError('Attempted to batch a non-batchable '
61 61 'call to %r' % name)
62 62
63 63 return super(remoteiterbatcher, self).__getattr__(name)
64 64
65 65 def submit(self):
66 66 """Break the batch request into many patch calls and pipeline them.
67 67
68 68 This is mostly valuable over http where request sizes can be
69 69 limited, but can be used in other places as well.
70 70 """
71 71 # 2-tuple of (command, arguments) that represents what will be
72 72 # sent over the wire.
73 73 requests = []
74 74
75 75 # 4-tuple of (command, final future, @batchable generator, remote
76 76 # future).
77 77 results = []
78 78
79 79 for command, args, opts, finalfuture in self.calls:
80 80 mtd = getattr(self._remote, command)
81 81 batchable = mtd.batchable(mtd.__self__, *args, **opts)
82 82
83 83 commandargs, fremote = next(batchable)
84 84 assert fremote
85 85 requests.append((command, commandargs))
86 86 results.append((command, finalfuture, batchable, fremote))
87 87
88 88 if requests:
89 89 self._resultiter = self._remote._submitbatch(requests)
90 90
91 91 self._results = results
92 92
93 93 def results(self):
94 94 for command, finalfuture, batchable, remotefuture in self._results:
95 95 # Get the raw result, set it in the remote future, feed it
96 96 # back into the @batchable generator so it can be decoded, and
97 97 # set the result on the final future to this value.
98 98 remoteresult = next(self._resultiter)
99 99 remotefuture.set(remoteresult)
100 100 finalfuture.set(next(batchable))
101 101
102 102 # Verify our @batchable generators only emit 2 values.
103 103 try:
104 104 next(batchable)
105 105 except StopIteration:
106 106 pass
107 107 else:
108 108 raise error.ProgrammingError('%s @batchable generator emitted '
109 109 'unexpected value count' % command)
110 110
111 111 yield finalfuture.value
112 112
113 113 # Forward a couple of names from peer to make wireproto interactions
114 114 # slightly more sensible.
115 115 batchable = peer.batchable
116 116 future = peer.future
117 117
118 118 # list of nodes encoding / decoding
119 119
120 120 def decodelist(l, sep=' '):
121 121 if l:
122 122 return [bin(v) for v in l.split(sep)]
123 123 return []
124 124
125 125 def encodelist(l, sep=' '):
126 126 try:
127 127 return sep.join(map(hex, l))
128 128 except TypeError:
129 129 raise
130 130
131 131 # batched call argument encoding
132 132
133 133 def escapearg(plain):
134 134 return (plain
135 135 .replace(':', ':c')
136 136 .replace(',', ':o')
137 137 .replace(';', ':s')
138 138 .replace('=', ':e'))
139 139
140 140 def unescapearg(escaped):
141 141 return (escaped
142 142 .replace(':e', '=')
143 143 .replace(':s', ';')
144 144 .replace(':o', ',')
145 145 .replace(':c', ':'))
146 146
147 147 def encodebatchcmds(req):
148 148 """Return a ``cmds`` argument value for the ``batch`` command."""
149 149 cmds = []
150 150 for op, argsdict in req:
151 151 # Old servers didn't properly unescape argument names. So prevent
152 152 # the sending of argument names that may not be decoded properly by
153 153 # servers.
154 154 assert all(escapearg(k) == k for k in argsdict)
155 155
156 156 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
157 157 for k, v in argsdict.iteritems())
158 158 cmds.append('%s %s' % (op, args))
159 159
160 160 return ';'.join(cmds)
161 161
162 162 # mapping of options accepted by getbundle and their types
163 163 #
164 164 # Meant to be extended by extensions. It is extensions responsibility to ensure
165 165 # such options are properly processed in exchange.getbundle.
166 166 #
167 167 # supported types are:
168 168 #
169 169 # :nodes: list of binary nodes
170 170 # :csv: list of comma-separated values
171 171 # :scsv: list of comma-separated values return as set
172 172 # :plain: string with no transformation needed.
173 173 gboptsmap = {'heads': 'nodes',
174 174 'bookmarks': 'boolean',
175 175 'common': 'nodes',
176 176 'obsmarkers': 'boolean',
177 177 'phases': 'boolean',
178 178 'bundlecaps': 'scsv',
179 179 'listkeys': 'csv',
180 180 'cg': 'boolean',
181 181 'cbattempted': 'boolean',
182 182 'stream': 'boolean',
183 183 }
184 184
185 185 # client side
186 186
187 187 class wirepeer(repository.legacypeer):
188 188 """Client-side interface for communicating with a peer repository.
189 189
190 190 Methods commonly call wire protocol commands of the same name.
191 191
192 192 See also httppeer.py and sshpeer.py for protocol-specific
193 193 implementations of this interface.
194 194 """
195 195 # Begin of ipeercommands interface.
196 196
197 197 def iterbatch(self):
198 198 return remoteiterbatcher(self)
199 199
200 200 @batchable
201 201 def lookup(self, key):
202 202 self.requirecap('lookup', _('look up remote revision'))
203 203 f = future()
204 204 yield {'key': encoding.fromlocal(key)}, f
205 205 d = f.value
206 206 success, data = d[:-1].split(" ", 1)
207 207 if int(success):
208 208 yield bin(data)
209 209 else:
210 210 self._abort(error.RepoError(data))
211 211
212 212 @batchable
213 213 def heads(self):
214 214 f = future()
215 215 yield {}, f
216 216 d = f.value
217 217 try:
218 218 yield decodelist(d[:-1])
219 219 except ValueError:
220 220 self._abort(error.ResponseError(_("unexpected response:"), d))
221 221
222 222 @batchable
223 223 def known(self, nodes):
224 224 f = future()
225 225 yield {'nodes': encodelist(nodes)}, f
226 226 d = f.value
227 227 try:
228 228 yield [bool(int(b)) for b in d]
229 229 except ValueError:
230 230 self._abort(error.ResponseError(_("unexpected response:"), d))
231 231
232 232 @batchable
233 233 def branchmap(self):
234 234 f = future()
235 235 yield {}, f
236 236 d = f.value
237 237 try:
238 238 branchmap = {}
239 239 for branchpart in d.splitlines():
240 240 branchname, branchheads = branchpart.split(' ', 1)
241 241 branchname = encoding.tolocal(urlreq.unquote(branchname))
242 242 branchheads = decodelist(branchheads)
243 243 branchmap[branchname] = branchheads
244 244 yield branchmap
245 245 except TypeError:
246 246 self._abort(error.ResponseError(_("unexpected response:"), d))
247 247
248 248 @batchable
249 249 def listkeys(self, namespace):
250 250 if not self.capable('pushkey'):
251 251 yield {}, None
252 252 f = future()
253 253 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
254 254 yield {'namespace': encoding.fromlocal(namespace)}, f
255 255 d = f.value
256 256 self.ui.debug('received listkey for "%s": %i bytes\n'
257 257 % (namespace, len(d)))
258 258 yield pushkeymod.decodekeys(d)
259 259
260 260 @batchable
261 261 def pushkey(self, namespace, key, old, new):
262 262 if not self.capable('pushkey'):
263 263 yield False, None
264 264 f = future()
265 265 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
266 266 yield {'namespace': encoding.fromlocal(namespace),
267 267 'key': encoding.fromlocal(key),
268 268 'old': encoding.fromlocal(old),
269 269 'new': encoding.fromlocal(new)}, f
270 270 d = f.value
271 271 d, output = d.split('\n', 1)
272 272 try:
273 273 d = bool(int(d))
274 274 except ValueError:
275 275 raise error.ResponseError(
276 276 _('push failed (unexpected response):'), d)
277 277 for l in output.splitlines(True):
278 278 self.ui.status(_('remote: '), l)
279 279 yield d
280 280
281 281 def stream_out(self):
282 282 return self._callstream('stream_out')
283 283
284 284 def getbundle(self, source, **kwargs):
285 285 kwargs = pycompat.byteskwargs(kwargs)
286 286 self.requirecap('getbundle', _('look up remote changes'))
287 287 opts = {}
288 288 bundlecaps = kwargs.get('bundlecaps')
289 289 if bundlecaps is not None:
290 290 kwargs['bundlecaps'] = sorted(bundlecaps)
291 291 else:
292 292 bundlecaps = () # kwargs could have it to None
293 293 for key, value in kwargs.iteritems():
294 294 if value is None:
295 295 continue
296 296 keytype = gboptsmap.get(key)
297 297 if keytype is None:
298 298 raise error.ProgrammingError(
299 299 'Unexpectedly None keytype for key %s' % key)
300 300 elif keytype == 'nodes':
301 301 value = encodelist(value)
302 302 elif keytype in ('csv', 'scsv'):
303 303 value = ','.join(value)
304 304 elif keytype == 'boolean':
305 305 value = '%i' % bool(value)
306 306 elif keytype != 'plain':
307 307 raise KeyError('unknown getbundle option type %s'
308 308 % keytype)
309 309 opts[key] = value
310 310 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
311 311 if any((cap.startswith('HG2') for cap in bundlecaps)):
312 312 return bundle2.getunbundler(self.ui, f)
313 313 else:
314 314 return changegroupmod.cg1unpacker(f, 'UN')
315 315
316 316 def unbundle(self, cg, heads, url):
317 317 '''Send cg (a readable file-like object representing the
318 318 changegroup to push, typically a chunkbuffer object) to the
319 319 remote server as a bundle.
320 320
321 321 When pushing a bundle10 stream, return an integer indicating the
322 322 result of the push (see changegroup.apply()).
323 323
324 324 When pushing a bundle20 stream, return a bundle20 stream.
325 325
326 326 `url` is the url the client thinks it's pushing to, which is
327 327 visible to hooks.
328 328 '''
329 329
330 330 if heads != ['force'] and self.capable('unbundlehash'):
331 331 heads = encodelist(['hashed',
332 332 hashlib.sha1(''.join(sorted(heads))).digest()])
333 333 else:
334 334 heads = encodelist(heads)
335 335
336 336 if util.safehasattr(cg, 'deltaheader'):
337 337 # this a bundle10, do the old style call sequence
338 338 ret, output = self._callpush("unbundle", cg, heads=heads)
339 339 if ret == "":
340 340 raise error.ResponseError(
341 341 _('push failed:'), output)
342 342 try:
343 343 ret = int(ret)
344 344 except ValueError:
345 345 raise error.ResponseError(
346 346 _('push failed (unexpected response):'), ret)
347 347
348 348 for l in output.splitlines(True):
349 349 self.ui.status(_('remote: '), l)
350 350 else:
351 351 # bundle2 push. Send a stream, fetch a stream.
352 352 stream = self._calltwowaystream('unbundle', cg, heads=heads)
353 353 ret = bundle2.getunbundler(self.ui, stream)
354 354 return ret
355 355
356 356 # End of ipeercommands interface.
357 357
358 358 # Begin of ipeerlegacycommands interface.
359 359
360 360 def branches(self, nodes):
361 361 n = encodelist(nodes)
362 362 d = self._call("branches", nodes=n)
363 363 try:
364 364 br = [tuple(decodelist(b)) for b in d.splitlines()]
365 365 return br
366 366 except ValueError:
367 367 self._abort(error.ResponseError(_("unexpected response:"), d))
368 368
369 369 def between(self, pairs):
370 370 batch = 8 # avoid giant requests
371 371 r = []
372 372 for i in xrange(0, len(pairs), batch):
373 373 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
374 374 d = self._call("between", pairs=n)
375 375 try:
376 376 r.extend(l and decodelist(l) or [] for l in d.splitlines())
377 377 except ValueError:
378 378 self._abort(error.ResponseError(_("unexpected response:"), d))
379 379 return r
380 380
381 381 def changegroup(self, nodes, kind):
382 382 n = encodelist(nodes)
383 383 f = self._callcompressable("changegroup", roots=n)
384 384 return changegroupmod.cg1unpacker(f, 'UN')
385 385
386 386 def changegroupsubset(self, bases, heads, kind):
387 387 self.requirecap('changegroupsubset', _('look up remote changes'))
388 388 bases = encodelist(bases)
389 389 heads = encodelist(heads)
390 390 f = self._callcompressable("changegroupsubset",
391 391 bases=bases, heads=heads)
392 392 return changegroupmod.cg1unpacker(f, 'UN')
393 393
394 394 # End of ipeerlegacycommands interface.
395 395
396 396 def _submitbatch(self, req):
397 397 """run batch request <req> on the server
398 398
399 399 Returns an iterator of the raw responses from the server.
400 400 """
401 401 ui = self.ui
402 402 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
403 403 ui.debug('devel-peer-request: batched-content\n')
404 404 for op, args in req:
405 405 msg = 'devel-peer-request: - %s (%d arguments)\n'
406 406 ui.debug(msg % (op, len(args)))
407 407
408 408 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
409 409 chunk = rsp.read(1024)
410 410 work = [chunk]
411 411 while chunk:
412 412 while ';' not in chunk and chunk:
413 413 chunk = rsp.read(1024)
414 414 work.append(chunk)
415 415 merged = ''.join(work)
416 416 while ';' in merged:
417 417 one, merged = merged.split(';', 1)
418 418 yield unescapearg(one)
419 419 chunk = rsp.read(1024)
420 420 work = [merged, chunk]
421 421 yield unescapearg(''.join(work))
422 422
423 423 def _submitone(self, op, args):
424 424 return self._call(op, **pycompat.strkwargs(args))
425 425
426 426 def debugwireargs(self, one, two, three=None, four=None, five=None):
427 427 # don't pass optional arguments left at their default value
428 428 opts = {}
429 429 if three is not None:
430 430 opts[r'three'] = three
431 431 if four is not None:
432 432 opts[r'four'] = four
433 433 return self._call('debugwireargs', one=one, two=two, **opts)
434 434
435 435 def _call(self, cmd, **args):
436 436 """execute <cmd> on the server
437 437
438 438 The command is expected to return a simple string.
439 439
440 440 returns the server reply as a string."""
441 441 raise NotImplementedError()
442 442
443 443 def _callstream(self, cmd, **args):
444 444 """execute <cmd> on the server
445 445
446 446 The command is expected to return a stream. Note that if the
447 447 command doesn't return a stream, _callstream behaves
448 448 differently for ssh and http peers.
449 449
450 450 returns the server reply as a file like object.
451 451 """
452 452 raise NotImplementedError()
453 453
454 454 def _callcompressable(self, cmd, **args):
455 455 """execute <cmd> on the server
456 456
457 457 The command is expected to return a stream.
458 458
459 459 The stream may have been compressed in some implementations. This
460 460 function takes care of the decompression. This is the only difference
461 461 with _callstream.
462 462
463 463 returns the server reply as a file like object.
464 464 """
465 465 raise NotImplementedError()
466 466
467 467 def _callpush(self, cmd, fp, **args):
468 468 """execute a <cmd> on server
469 469
470 470 The command is expected to be related to a push. Push has a special
471 471 return method.
472 472
473 473 returns the server reply as a (ret, output) tuple. ret is either
474 474 empty (error) or a stringified int.
475 475 """
476 476 raise NotImplementedError()
477 477
478 478 def _calltwowaystream(self, cmd, fp, **args):
479 479 """execute <cmd> on server
480 480
481 481 The command will send a stream to the server and get a stream in reply.
482 482 """
483 483 raise NotImplementedError()
484 484
485 485 def _abort(self, exception):
486 486 """clearly abort the wire protocol connection and raise the exception
487 487 """
488 488 raise NotImplementedError()
489 489
490 490 # server side
491 491
492 492 # wire protocol command can either return a string or one of these classes.
493 493
494 494 def getdispatchrepo(repo, proto, command):
495 495 """Obtain the repo used for processing wire protocol commands.
496 496
497 497 The intent of this function is to serve as a monkeypatch point for
498 498 extensions that need commands to operate on different repo views under
499 499 specialized circumstances.
500 500 """
501 501 return repo.filtered('served')
502 502
503 503 def dispatch(repo, proto, command):
504 504 repo = getdispatchrepo(repo, proto, command)
505 505
506 506 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
507 507 commandtable = commandsv2 if transportversion == 2 else commands
508 508 func, spec = commandtable[command]
509 509
510 510 args = proto.getargs(spec)
511 511 return func(repo, proto, *args)
512 512
513 513 def options(cmd, keys, others):
514 514 opts = {}
515 515 for k in keys:
516 516 if k in others:
517 517 opts[k] = others[k]
518 518 del others[k]
519 519 if others:
520 520 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
521 521 % (cmd, ",".join(others)))
522 522 return opts
523 523
524 524 def bundle1allowed(repo, action):
525 525 """Whether a bundle1 operation is allowed from the server.
526 526
527 527 Priority is:
528 528
529 529 1. server.bundle1gd.<action> (if generaldelta active)
530 530 2. server.bundle1.<action>
531 531 3. server.bundle1gd (if generaldelta active)
532 532 4. server.bundle1
533 533 """
534 534 ui = repo.ui
535 535 gd = 'generaldelta' in repo.requirements
536 536
537 537 if gd:
538 538 v = ui.configbool('server', 'bundle1gd.%s' % action)
539 539 if v is not None:
540 540 return v
541 541
542 542 v = ui.configbool('server', 'bundle1.%s' % action)
543 543 if v is not None:
544 544 return v
545 545
546 546 if gd:
547 547 v = ui.configbool('server', 'bundle1gd')
548 548 if v is not None:
549 549 return v
550 550
551 551 return ui.configbool('server', 'bundle1')
552 552
553 553 def supportedcompengines(ui, role):
554 554 """Obtain the list of supported compression engines for a request."""
555 555 assert role in (util.CLIENTROLE, util.SERVERROLE)
556 556
557 557 compengines = util.compengines.supportedwireengines(role)
558 558
559 559 # Allow config to override default list and ordering.
560 560 if role == util.SERVERROLE:
561 561 configengines = ui.configlist('server', 'compressionengines')
562 562 config = 'server.compressionengines'
563 563 else:
564 564 # This is currently implemented mainly to facilitate testing. In most
565 565 # cases, the server should be in charge of choosing a compression engine
566 566 # because a server has the most to lose from a sub-optimal choice. (e.g.
567 567 # CPU DoS due to an expensive engine or a network DoS due to poor
568 568 # compression ratio).
569 569 configengines = ui.configlist('experimental',
570 570 'clientcompressionengines')
571 571 config = 'experimental.clientcompressionengines'
572 572
573 573 # No explicit config. Filter out the ones that aren't supposed to be
574 574 # advertised and return default ordering.
575 575 if not configengines:
576 576 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
577 577 return [e for e in compengines
578 578 if getattr(e.wireprotosupport(), attr) > 0]
579 579
580 580 # If compression engines are listed in the config, assume there is a good
581 581 # reason for it (like server operators wanting to achieve specific
582 582 # performance characteristics). So fail fast if the config references
583 583 # unusable compression engines.
584 584 validnames = set(e.name() for e in compengines)
585 585 invalidnames = set(e for e in configengines if e not in validnames)
586 586 if invalidnames:
587 587 raise error.Abort(_('invalid compression engine defined in %s: %s') %
588 588 (config, ', '.join(sorted(invalidnames))))
589 589
590 590 compengines = [e for e in compengines if e.name() in configengines]
591 591 compengines = sorted(compengines,
592 592 key=lambda e: configengines.index(e.name()))
593 593
594 594 if not compengines:
595 595 raise error.Abort(_('%s config option does not specify any known '
596 596 'compression engines') % config,
597 597 hint=_('usable compression engines: %s') %
598 598 ', '.sorted(validnames))
599 599
600 600 return compengines
601 601
602 602 class commandentry(object):
603 603 """Represents a declared wire protocol command."""
604 604 def __init__(self, func, args='', transports=None,
605 605 permission='push'):
606 606 self.func = func
607 607 self.args = args
608 608 self.transports = transports or set()
609 609 self.permission = permission
610 610
611 611 def _merge(self, func, args):
612 612 """Merge this instance with an incoming 2-tuple.
613 613
614 614 This is called when a caller using the old 2-tuple API attempts
615 615 to replace an instance. The incoming values are merged with
616 616 data not captured by the 2-tuple and a new instance containing
617 617 the union of the two objects is returned.
618 618 """
619 619 return commandentry(func, args=args, transports=set(self.transports),
620 620 permission=self.permission)
621 621
622 622 # Old code treats instances as 2-tuples. So expose that interface.
623 623 def __iter__(self):
624 624 yield self.func
625 625 yield self.args
626 626
627 627 def __getitem__(self, i):
628 628 if i == 0:
629 629 return self.func
630 630 elif i == 1:
631 631 return self.args
632 632 else:
633 633 raise IndexError('can only access elements 0 and 1')
634 634
635 635 class commanddict(dict):
636 636 """Container for registered wire protocol commands.
637 637
638 638 It behaves like a dict. But __setitem__ is overwritten to allow silent
639 639 coercion of values from 2-tuples for API compatibility.
640 640 """
641 641 def __setitem__(self, k, v):
642 642 if isinstance(v, commandentry):
643 643 pass
644 644 # Cast 2-tuples to commandentry instances.
645 645 elif isinstance(v, tuple):
646 646 if len(v) != 2:
647 647 raise ValueError('command tuples must have exactly 2 elements')
648 648
649 649 # It is common for extensions to wrap wire protocol commands via
650 650 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
651 651 # doing this aren't aware of the new API that uses objects to store
652 652 # command entries, we automatically merge old state with new.
653 653 if k in self:
654 654 v = self[k]._merge(v[0], v[1])
655 655 else:
656 656 # Use default values from @wireprotocommand.
657 657 v = commandentry(v[0], args=v[1],
658 658 transports=set(wireprototypes.TRANSPORTS),
659 659 permission='push')
660 660 else:
661 661 raise ValueError('command entries must be commandentry instances '
662 662 'or 2-tuples')
663 663
664 664 return super(commanddict, self).__setitem__(k, v)
665 665
666 666 def commandavailable(self, command, proto):
667 667 """Determine if a command is available for the requested protocol."""
668 668 assert proto.name in wireprototypes.TRANSPORTS
669 669
670 670 entry = self.get(command)
671 671
672 672 if not entry:
673 673 return False
674 674
675 675 if proto.name not in entry.transports:
676 676 return False
677 677
678 678 return True
679 679
680 680 # Constants specifying which transports a wire protocol command should be
681 681 # available on. For use with @wireprotocommand.
682 682 POLICY_ALL = 'all'
683 683 POLICY_V1_ONLY = 'v1-only'
684 684 POLICY_V2_ONLY = 'v2-only'
685 685
686 686 # For version 1 transports.
687 687 commands = commanddict()
688 688
689 689 # For version 2 transports.
690 690 commandsv2 = commanddict()
691 691
692 692 def wireprotocommand(name, args='', transportpolicy=POLICY_ALL,
693 693 permission='push'):
694 694 """Decorator to declare a wire protocol command.
695 695
696 696 ``name`` is the name of the wire protocol command being provided.
697 697
698 698 ``args`` is a space-delimited list of named arguments that the command
699 699 accepts. ``*`` is a special value that says to accept all arguments.
700 700
701 701 ``transportpolicy`` is a POLICY_* constant denoting which transports
702 702 this wire protocol command should be exposed to. By default, commands
703 703 are exposed to all wire protocol transports.
704 704
705 705 ``permission`` defines the permission type needed to run this command.
706 706 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
707 707 respectively. Default is to assume command requires ``push`` permissions
708 708 because otherwise commands not declaring their permissions could modify
709 709 a repository that is supposed to be read-only.
710 710 """
711 711 if transportpolicy == POLICY_ALL:
712 712 transports = set(wireprototypes.TRANSPORTS)
713 713 transportversions = {1, 2}
714 714 elif transportpolicy == POLICY_V1_ONLY:
715 715 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
716 716 if v['version'] == 1}
717 717 transportversions = {1}
718 718 elif transportpolicy == POLICY_V2_ONLY:
719 719 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
720 720 if v['version'] == 2}
721 721 transportversions = {2}
722 722 else:
723 723 raise error.ProgrammingError('invalid transport policy value: %s' %
724 724 transportpolicy)
725 725
726 726 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
727 727 # SSHv2.
728 728 # TODO undo this hack when SSH is using the unified frame protocol.
729 729 if name == b'batch':
730 730 transports.add(wireprototypes.SSHV2)
731 731
732 732 if permission not in ('push', 'pull'):
733 733 raise error.ProgrammingError('invalid wire protocol permission; '
734 734 'got %s; expected "push" or "pull"' %
735 735 permission)
736 736
737 737 def register(func):
738 738 if 1 in transportversions:
739 739 if name in commands:
740 740 raise error.ProgrammingError('%s command already registered '
741 741 'for version 1' % name)
742 742 commands[name] = commandentry(func, args=args,
743 743 transports=transports,
744 744 permission=permission)
745 745 if 2 in transportversions:
746 746 if name in commandsv2:
747 747 raise error.ProgrammingError('%s command already registered '
748 748 'for version 2' % name)
749 749 commandsv2[name] = commandentry(func, args=args,
750 750 transports=transports,
751 751 permission=permission)
752 752
753 753 return func
754 754 return register
755 755
756 756 # TODO define a more appropriate permissions type to use for this.
757 757 @wireprotocommand('batch', 'cmds *', permission='pull',
758 758 transportpolicy=POLICY_V1_ONLY)
759 759 def batch(repo, proto, cmds, others):
760 760 repo = repo.filtered("served")
761 761 res = []
762 762 for pair in cmds.split(';'):
763 763 op, args = pair.split(' ', 1)
764 764 vals = {}
765 765 for a in args.split(','):
766 766 if a:
767 767 n, v = a.split('=')
768 768 vals[unescapearg(n)] = unescapearg(v)
769 769 func, spec = commands[op]
770 770
771 771 # Validate that client has permissions to perform this command.
772 772 perm = commands[op].permission
773 773 assert perm in ('push', 'pull')
774 774 proto.checkperm(perm)
775 775
776 776 if spec:
777 777 keys = spec.split()
778 778 data = {}
779 779 for k in keys:
780 780 if k == '*':
781 781 star = {}
782 782 for key in vals.keys():
783 783 if key not in keys:
784 784 star[key] = vals[key]
785 785 data['*'] = star
786 786 else:
787 787 data[k] = vals[k]
788 788 result = func(repo, proto, *[data[k] for k in keys])
789 789 else:
790 790 result = func(repo, proto)
791 791 if isinstance(result, wireprototypes.ooberror):
792 792 return result
793 793
794 794 # For now, all batchable commands must return bytesresponse or
795 795 # raw bytes (for backwards compatibility).
796 796 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
797 797 if isinstance(result, wireprototypes.bytesresponse):
798 798 result = result.data
799 799 res.append(escapearg(result))
800 800
801 801 return wireprototypes.bytesresponse(';'.join(res))
802 802
803 803 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY,
804 804 permission='pull')
805 805 def between(repo, proto, pairs):
806 806 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
807 807 r = []
808 808 for b in repo.between(pairs):
809 809 r.append(encodelist(b) + "\n")
810 810
811 811 return wireprototypes.bytesresponse(''.join(r))
812 812
813 813 @wireprotocommand('branchmap', permission='pull')
814 814 def branchmap(repo, proto):
815 815 branchmap = repo.branchmap()
816 816 heads = []
817 817 for branch, nodes in branchmap.iteritems():
818 818 branchname = urlreq.quote(encoding.fromlocal(branch))
819 819 branchnodes = encodelist(nodes)
820 820 heads.append('%s %s' % (branchname, branchnodes))
821 821
822 822 return wireprototypes.bytesresponse('\n'.join(heads))
823 823
824 824 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY,
825 825 permission='pull')
826 826 def branches(repo, proto, nodes):
827 827 nodes = decodelist(nodes)
828 828 r = []
829 829 for b in repo.branches(nodes):
830 830 r.append(encodelist(b) + "\n")
831 831
832 832 return wireprototypes.bytesresponse(''.join(r))
833 833
834 834 @wireprotocommand('clonebundles', '', permission='pull')
835 835 def clonebundles(repo, proto):
836 836 """Server command for returning info for available bundles to seed clones.
837 837
838 838 Clients will parse this response and determine what bundle to fetch.
839 839
840 840 Extensions may wrap this command to filter or dynamically emit data
841 841 depending on the request. e.g. you could advertise URLs for the closest
842 842 data center given the client's IP address.
843 843 """
844 844 return wireprototypes.bytesresponse(
845 845 repo.vfs.tryread('clonebundles.manifest'))
846 846
847 847 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
848 848 'known', 'getbundle', 'unbundlehash']
849 849
850 850 def _capabilities(repo, proto):
851 851 """return a list of capabilities for a repo
852 852
853 853 This function exists to allow extensions to easily wrap capabilities
854 854 computation
855 855
856 856 - returns a lists: easy to alter
857 857 - change done here will be propagated to both `capabilities` and `hello`
858 858 command without any other action needed.
859 859 """
860 860 # copy to prevent modification of the global list
861 861 caps = list(wireprotocaps)
862 862
863 863 # Command of same name as capability isn't exposed to version 1 of
864 864 # transports. So conditionally add it.
865 865 if commands.commandavailable('changegroupsubset', proto):
866 866 caps.append('changegroupsubset')
867 867
868 868 if streamclone.allowservergeneration(repo):
869 869 if repo.ui.configbool('server', 'preferuncompressed'):
870 870 caps.append('stream-preferred')
871 871 requiredformats = repo.requirements & repo.supportedformats
872 872 # if our local revlogs are just revlogv1, add 'stream' cap
873 873 if not requiredformats - {'revlogv1'}:
874 874 caps.append('stream')
875 875 # otherwise, add 'streamreqs' detailing our local revlog format
876 876 else:
877 877 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
878 878 if repo.ui.configbool('experimental', 'bundle2-advertise'):
879 879 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
880 880 caps.append('bundle2=' + urlreq.quote(capsblob))
881 881 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
882 882
883 883 return proto.addcapabilities(repo, caps)
884 884
885 885 # If you are writing an extension and consider wrapping this function. Wrap
886 886 # `_capabilities` instead.
887 887 @wireprotocommand('capabilities', permission='pull')
888 888 def capabilities(repo, proto):
889 889 return wireprototypes.bytesresponse(' '.join(_capabilities(repo, proto)))
890 890
891 891 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY,
892 892 permission='pull')
893 893 def changegroup(repo, proto, roots):
894 894 nodes = decodelist(roots)
895 895 outgoing = discovery.outgoing(repo, missingroots=nodes,
896 896 missingheads=repo.heads())
897 897 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
898 898 gen = iter(lambda: cg.read(32768), '')
899 899 return wireprototypes.streamres(gen=gen)
900 900
901 901 @wireprotocommand('changegroupsubset', 'bases heads',
902 902 transportpolicy=POLICY_V1_ONLY,
903 903 permission='pull')
904 904 def changegroupsubset(repo, proto, bases, heads):
905 905 bases = decodelist(bases)
906 906 heads = decodelist(heads)
907 907 outgoing = discovery.outgoing(repo, missingroots=bases,
908 908 missingheads=heads)
909 909 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
910 910 gen = iter(lambda: cg.read(32768), '')
911 911 return wireprototypes.streamres(gen=gen)
912 912
913 913 @wireprotocommand('debugwireargs', 'one two *',
914 914 permission='pull')
915 915 def debugwireargs(repo, proto, one, two, others):
916 916 # only accept optional args from the known set
917 917 opts = options('debugwireargs', ['three', 'four'], others)
918 918 return wireprototypes.bytesresponse(repo.debugwireargs(
919 919 one, two, **pycompat.strkwargs(opts)))
920 920
921 921 @wireprotocommand('getbundle', '*', permission='pull')
922 922 def getbundle(repo, proto, others):
923 923 opts = options('getbundle', gboptsmap.keys(), others)
924 924 for k, v in opts.iteritems():
925 925 keytype = gboptsmap[k]
926 926 if keytype == 'nodes':
927 927 opts[k] = decodelist(v)
928 928 elif keytype == 'csv':
929 929 opts[k] = list(v.split(','))
930 930 elif keytype == 'scsv':
931 931 opts[k] = set(v.split(','))
932 932 elif keytype == 'boolean':
933 933 # Client should serialize False as '0', which is a non-empty string
934 934 # so it evaluates as a True bool.
935 935 if v == '0':
936 936 opts[k] = False
937 937 else:
938 938 opts[k] = bool(v)
939 939 elif keytype != 'plain':
940 940 raise KeyError('unknown getbundle option type %s'
941 941 % keytype)
942 942
943 943 if not bundle1allowed(repo, 'pull'):
944 944 if not exchange.bundle2requested(opts.get('bundlecaps')):
945 945 if proto.name == 'http-v1':
946 946 return wireprototypes.ooberror(bundle2required)
947 947 raise error.Abort(bundle2requiredmain,
948 948 hint=bundle2requiredhint)
949 949
950 950 prefercompressed = True
951 951
952 952 try:
953 953 if repo.ui.configbool('server', 'disablefullbundle'):
954 954 # Check to see if this is a full clone.
955 955 clheads = set(repo.changelog.heads())
956 956 changegroup = opts.get('cg', True)
957 957 heads = set(opts.get('heads', set()))
958 958 common = set(opts.get('common', set()))
959 959 common.discard(nullid)
960 960 if changegroup and not common and clheads == heads:
961 961 raise error.Abort(
962 962 _('server has pull-based clones disabled'),
963 963 hint=_('remove --pull if specified or upgrade Mercurial'))
964 964
965 965 info, chunks = exchange.getbundlechunks(repo, 'serve',
966 966 **pycompat.strkwargs(opts))
967 967 prefercompressed = info.get('prefercompressed', True)
968 968 except error.Abort as exc:
969 969 # cleanly forward Abort error to the client
970 970 if not exchange.bundle2requested(opts.get('bundlecaps')):
971 971 if proto.name == 'http-v1':
972 972 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
973 973 raise # cannot do better for bundle1 + ssh
974 974 # bundle2 request expect a bundle2 reply
975 975 bundler = bundle2.bundle20(repo.ui)
976 976 manargs = [('message', pycompat.bytestr(exc))]
977 977 advargs = []
978 978 if exc.hint is not None:
979 979 advargs.append(('hint', exc.hint))
980 980 bundler.addpart(bundle2.bundlepart('error:abort',
981 981 manargs, advargs))
982 982 chunks = bundler.getchunks()
983 983 prefercompressed = False
984 984
985 985 return wireprototypes.streamres(
986 986 gen=chunks, prefer_uncompressed=not prefercompressed)
987 987
988 988 @wireprotocommand('heads', permission='pull')
989 989 def heads(repo, proto):
990 990 h = repo.heads()
991 991 return wireprototypes.bytesresponse(encodelist(h) + '\n')
992 992
993 993 @wireprotocommand('hello', permission='pull')
994 994 def hello(repo, proto):
995 995 """Called as part of SSH handshake to obtain server info.
996 996
997 997 Returns a list of lines describing interesting things about the
998 998 server, in an RFC822-like format.
999 999
1000 1000 Currently, the only one defined is ``capabilities``, which consists of a
1001 1001 line of space separated tokens describing server abilities:
1002 1002
1003 1003 capabilities: <token0> <token1> <token2>
1004 1004 """
1005 1005 caps = capabilities(repo, proto).data
1006 1006 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
1007 1007
1008 1008 @wireprotocommand('listkeys', 'namespace', permission='pull')
1009 1009 def listkeys(repo, proto, namespace):
1010 1010 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
1011 1011 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
1012 1012
1013 1013 @wireprotocommand('lookup', 'key', permission='pull')
1014 1014 def lookup(repo, proto, key):
1015 1015 try:
1016 1016 k = encoding.tolocal(key)
1017 c = repo[k]
1018 r = c.hex()
1017 n = repo.lookup(k)
1018 r = hex(n)
1019 1019 success = 1
1020 1020 except Exception as inst:
1021 1021 r = stringutil.forcebytestr(inst)
1022 1022 success = 0
1023 1023 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
1024 1024
1025 1025 @wireprotocommand('known', 'nodes *', permission='pull')
1026 1026 def known(repo, proto, nodes, others):
1027 1027 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
1028 1028 return wireprototypes.bytesresponse(v)
1029 1029
1030 1030 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
1031 1031 def pushkey(repo, proto, namespace, key, old, new):
1032 1032 # compatibility with pre-1.8 clients which were accidentally
1033 1033 # sending raw binary nodes rather than utf-8-encoded hex
1034 1034 if len(new) == 20 and stringutil.escapestr(new) != new:
1035 1035 # looks like it could be a binary node
1036 1036 try:
1037 1037 new.decode('utf-8')
1038 1038 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
1039 1039 except UnicodeDecodeError:
1040 1040 pass # binary, leave unmodified
1041 1041 else:
1042 1042 new = encoding.tolocal(new) # normal path
1043 1043
1044 1044 with proto.mayberedirectstdio() as output:
1045 1045 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
1046 1046 encoding.tolocal(old), new) or False
1047 1047
1048 1048 output = output.getvalue() if output else ''
1049 1049 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
1050 1050
1051 1051 @wireprotocommand('stream_out', permission='pull')
1052 1052 def stream(repo, proto):
1053 1053 '''If the server supports streaming clone, it advertises the "stream"
1054 1054 capability with a value representing the version and flags of the repo
1055 1055 it is serving. Client checks to see if it understands the format.
1056 1056 '''
1057 1057 return wireprototypes.streamreslegacy(
1058 1058 streamclone.generatev1wireproto(repo))
1059 1059
1060 1060 @wireprotocommand('unbundle', 'heads', permission='push')
1061 1061 def unbundle(repo, proto, heads):
1062 1062 their_heads = decodelist(heads)
1063 1063
1064 1064 with proto.mayberedirectstdio() as output:
1065 1065 try:
1066 1066 exchange.check_heads(repo, their_heads, 'preparing changes')
1067 1067
1068 1068 # write bundle data to temporary file because it can be big
1069 1069 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1070 1070 fp = os.fdopen(fd, r'wb+')
1071 1071 r = 0
1072 1072 try:
1073 1073 proto.forwardpayload(fp)
1074 1074 fp.seek(0)
1075 1075 gen = exchange.readbundle(repo.ui, fp, None)
1076 1076 if (isinstance(gen, changegroupmod.cg1unpacker)
1077 1077 and not bundle1allowed(repo, 'push')):
1078 1078 if proto.name == 'http-v1':
1079 1079 # need to special case http because stderr do not get to
1080 1080 # the http client on failed push so we need to abuse
1081 1081 # some other error type to make sure the message get to
1082 1082 # the user.
1083 1083 return wireprototypes.ooberror(bundle2required)
1084 1084 raise error.Abort(bundle2requiredmain,
1085 1085 hint=bundle2requiredhint)
1086 1086
1087 1087 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1088 1088 proto.client())
1089 1089 if util.safehasattr(r, 'addpart'):
1090 1090 # The return looks streamable, we are in the bundle2 case
1091 1091 # and should return a stream.
1092 1092 return wireprototypes.streamreslegacy(gen=r.getchunks())
1093 1093 return wireprototypes.pushres(
1094 1094 r, output.getvalue() if output else '')
1095 1095
1096 1096 finally:
1097 1097 fp.close()
1098 1098 os.unlink(tempname)
1099 1099
1100 1100 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1101 1101 # handle non-bundle2 case first
1102 1102 if not getattr(exc, 'duringunbundle2', False):
1103 1103 try:
1104 1104 raise
1105 1105 except error.Abort:
1106 1106 # The old code we moved used procutil.stderr directly.
1107 1107 # We did not change it to minimise code change.
1108 1108 # This need to be moved to something proper.
1109 1109 # Feel free to do it.
1110 1110 procutil.stderr.write("abort: %s\n" % exc)
1111 1111 if exc.hint is not None:
1112 1112 procutil.stderr.write("(%s)\n" % exc.hint)
1113 1113 procutil.stderr.flush()
1114 1114 return wireprototypes.pushres(
1115 1115 0, output.getvalue() if output else '')
1116 1116 except error.PushRaced:
1117 1117 return wireprototypes.pusherr(
1118 1118 pycompat.bytestr(exc),
1119 1119 output.getvalue() if output else '')
1120 1120
1121 1121 bundler = bundle2.bundle20(repo.ui)
1122 1122 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1123 1123 bundler.addpart(out)
1124 1124 try:
1125 1125 try:
1126 1126 raise
1127 1127 except error.PushkeyFailed as exc:
1128 1128 # check client caps
1129 1129 remotecaps = getattr(exc, '_replycaps', None)
1130 1130 if (remotecaps is not None
1131 1131 and 'pushkey' not in remotecaps.get('error', ())):
1132 1132 # no support remote side, fallback to Abort handler.
1133 1133 raise
1134 1134 part = bundler.newpart('error:pushkey')
1135 1135 part.addparam('in-reply-to', exc.partid)
1136 1136 if exc.namespace is not None:
1137 1137 part.addparam('namespace', exc.namespace,
1138 1138 mandatory=False)
1139 1139 if exc.key is not None:
1140 1140 part.addparam('key', exc.key, mandatory=False)
1141 1141 if exc.new is not None:
1142 1142 part.addparam('new', exc.new, mandatory=False)
1143 1143 if exc.old is not None:
1144 1144 part.addparam('old', exc.old, mandatory=False)
1145 1145 if exc.ret is not None:
1146 1146 part.addparam('ret', exc.ret, mandatory=False)
1147 1147 except error.BundleValueError as exc:
1148 1148 errpart = bundler.newpart('error:unsupportedcontent')
1149 1149 if exc.parttype is not None:
1150 1150 errpart.addparam('parttype', exc.parttype)
1151 1151 if exc.params:
1152 1152 errpart.addparam('params', '\0'.join(exc.params))
1153 1153 except error.Abort as exc:
1154 1154 manargs = [('message', stringutil.forcebytestr(exc))]
1155 1155 advargs = []
1156 1156 if exc.hint is not None:
1157 1157 advargs.append(('hint', exc.hint))
1158 1158 bundler.addpart(bundle2.bundlepart('error:abort',
1159 1159 manargs, advargs))
1160 1160 except error.PushRaced as exc:
1161 1161 bundler.newpart('error:pushraced',
1162 1162 [('message', stringutil.forcebytestr(exc))])
1163 1163 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now