##// END OF EJS Templates
wireproto: only expose "between" to version 1 of wire protocols...
Gregory Szorc -
r36633:ed770501 default
parent child Browse files
Show More
@@ -1,1093 +1,1091 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 wireprototypes,
35 35 )
36 36
37 37 urlerr = util.urlerr
38 38 urlreq = util.urlreq
39 39
40 40 bytesresponse = wireprototypes.bytesresponse
41 41 ooberror = wireprototypes.ooberror
42 42 pushres = wireprototypes.pushres
43 43 pusherr = wireprototypes.pusherr
44 44 streamres = wireprototypes.streamres
45 45 streamres_legacy = wireprototypes.streamreslegacy
46 46
47 47 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
48 48 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
49 49 'IncompatibleClient')
50 50 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
51 51
52 52 class remoteiterbatcher(peer.iterbatcher):
53 53 def __init__(self, remote):
54 54 super(remoteiterbatcher, self).__init__()
55 55 self._remote = remote
56 56
57 57 def __getattr__(self, name):
58 58 # Validate this method is batchable, since submit() only supports
59 59 # batchable methods.
60 60 fn = getattr(self._remote, name)
61 61 if not getattr(fn, 'batchable', None):
62 62 raise error.ProgrammingError('Attempted to batch a non-batchable '
63 63 'call to %r' % name)
64 64
65 65 return super(remoteiterbatcher, self).__getattr__(name)
66 66
67 67 def submit(self):
68 68 """Break the batch request into many patch calls and pipeline them.
69 69
70 70 This is mostly valuable over http where request sizes can be
71 71 limited, but can be used in other places as well.
72 72 """
73 73 # 2-tuple of (command, arguments) that represents what will be
74 74 # sent over the wire.
75 75 requests = []
76 76
77 77 # 4-tuple of (command, final future, @batchable generator, remote
78 78 # future).
79 79 results = []
80 80
81 81 for command, args, opts, finalfuture in self.calls:
82 82 mtd = getattr(self._remote, command)
83 83 batchable = mtd.batchable(mtd.__self__, *args, **opts)
84 84
85 85 commandargs, fremote = next(batchable)
86 86 assert fremote
87 87 requests.append((command, commandargs))
88 88 results.append((command, finalfuture, batchable, fremote))
89 89
90 90 if requests:
91 91 self._resultiter = self._remote._submitbatch(requests)
92 92
93 93 self._results = results
94 94
95 95 def results(self):
96 96 for command, finalfuture, batchable, remotefuture in self._results:
97 97 # Get the raw result, set it in the remote future, feed it
98 98 # back into the @batchable generator so it can be decoded, and
99 99 # set the result on the final future to this value.
100 100 remoteresult = next(self._resultiter)
101 101 remotefuture.set(remoteresult)
102 102 finalfuture.set(next(batchable))
103 103
104 104 # Verify our @batchable generators only emit 2 values.
105 105 try:
106 106 next(batchable)
107 107 except StopIteration:
108 108 pass
109 109 else:
110 110 raise error.ProgrammingError('%s @batchable generator emitted '
111 111 'unexpected value count' % command)
112 112
113 113 yield finalfuture.value
114 114
115 115 # Forward a couple of names from peer to make wireproto interactions
116 116 # slightly more sensible.
117 117 batchable = peer.batchable
118 118 future = peer.future
119 119
120 120 # list of nodes encoding / decoding
121 121
122 122 def decodelist(l, sep=' '):
123 123 if l:
124 124 return [bin(v) for v in l.split(sep)]
125 125 return []
126 126
127 127 def encodelist(l, sep=' '):
128 128 try:
129 129 return sep.join(map(hex, l))
130 130 except TypeError:
131 131 raise
132 132
133 133 # batched call argument encoding
134 134
135 135 def escapearg(plain):
136 136 return (plain
137 137 .replace(':', ':c')
138 138 .replace(',', ':o')
139 139 .replace(';', ':s')
140 140 .replace('=', ':e'))
141 141
142 142 def unescapearg(escaped):
143 143 return (escaped
144 144 .replace(':e', '=')
145 145 .replace(':s', ';')
146 146 .replace(':o', ',')
147 147 .replace(':c', ':'))
148 148
149 149 def encodebatchcmds(req):
150 150 """Return a ``cmds`` argument value for the ``batch`` command."""
151 151 cmds = []
152 152 for op, argsdict in req:
153 153 # Old servers didn't properly unescape argument names. So prevent
154 154 # the sending of argument names that may not be decoded properly by
155 155 # servers.
156 156 assert all(escapearg(k) == k for k in argsdict)
157 157
158 158 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
159 159 for k, v in argsdict.iteritems())
160 160 cmds.append('%s %s' % (op, args))
161 161
162 162 return ';'.join(cmds)
163 163
164 164 # mapping of options accepted by getbundle and their types
165 165 #
166 166 # Meant to be extended by extensions. It is extensions responsibility to ensure
167 167 # such options are properly processed in exchange.getbundle.
168 168 #
169 169 # supported types are:
170 170 #
171 171 # :nodes: list of binary nodes
172 172 # :csv: list of comma-separated values
173 173 # :scsv: list of comma-separated values return as set
174 174 # :plain: string with no transformation needed.
175 175 gboptsmap = {'heads': 'nodes',
176 176 'bookmarks': 'boolean',
177 177 'common': 'nodes',
178 178 'obsmarkers': 'boolean',
179 179 'phases': 'boolean',
180 180 'bundlecaps': 'scsv',
181 181 'listkeys': 'csv',
182 182 'cg': 'boolean',
183 183 'cbattempted': 'boolean',
184 184 'stream': 'boolean',
185 185 }
186 186
187 187 # client side
188 188
189 189 class wirepeer(repository.legacypeer):
190 190 """Client-side interface for communicating with a peer repository.
191 191
192 192 Methods commonly call wire protocol commands of the same name.
193 193
194 194 See also httppeer.py and sshpeer.py for protocol-specific
195 195 implementations of this interface.
196 196 """
197 197 # Begin of basewirepeer interface.
198 198
199 199 def iterbatch(self):
200 200 return remoteiterbatcher(self)
201 201
202 202 @batchable
203 203 def lookup(self, key):
204 204 self.requirecap('lookup', _('look up remote revision'))
205 205 f = future()
206 206 yield {'key': encoding.fromlocal(key)}, f
207 207 d = f.value
208 208 success, data = d[:-1].split(" ", 1)
209 209 if int(success):
210 210 yield bin(data)
211 211 else:
212 212 self._abort(error.RepoError(data))
213 213
214 214 @batchable
215 215 def heads(self):
216 216 f = future()
217 217 yield {}, f
218 218 d = f.value
219 219 try:
220 220 yield decodelist(d[:-1])
221 221 except ValueError:
222 222 self._abort(error.ResponseError(_("unexpected response:"), d))
223 223
224 224 @batchable
225 225 def known(self, nodes):
226 226 f = future()
227 227 yield {'nodes': encodelist(nodes)}, f
228 228 d = f.value
229 229 try:
230 230 yield [bool(int(b)) for b in d]
231 231 except ValueError:
232 232 self._abort(error.ResponseError(_("unexpected response:"), d))
233 233
234 234 @batchable
235 235 def branchmap(self):
236 236 f = future()
237 237 yield {}, f
238 238 d = f.value
239 239 try:
240 240 branchmap = {}
241 241 for branchpart in d.splitlines():
242 242 branchname, branchheads = branchpart.split(' ', 1)
243 243 branchname = encoding.tolocal(urlreq.unquote(branchname))
244 244 branchheads = decodelist(branchheads)
245 245 branchmap[branchname] = branchheads
246 246 yield branchmap
247 247 except TypeError:
248 248 self._abort(error.ResponseError(_("unexpected response:"), d))
249 249
250 250 @batchable
251 251 def listkeys(self, namespace):
252 252 if not self.capable('pushkey'):
253 253 yield {}, None
254 254 f = future()
255 255 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
256 256 yield {'namespace': encoding.fromlocal(namespace)}, f
257 257 d = f.value
258 258 self.ui.debug('received listkey for "%s": %i bytes\n'
259 259 % (namespace, len(d)))
260 260 yield pushkeymod.decodekeys(d)
261 261
262 262 @batchable
263 263 def pushkey(self, namespace, key, old, new):
264 264 if not self.capable('pushkey'):
265 265 yield False, None
266 266 f = future()
267 267 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
268 268 yield {'namespace': encoding.fromlocal(namespace),
269 269 'key': encoding.fromlocal(key),
270 270 'old': encoding.fromlocal(old),
271 271 'new': encoding.fromlocal(new)}, f
272 272 d = f.value
273 273 d, output = d.split('\n', 1)
274 274 try:
275 275 d = bool(int(d))
276 276 except ValueError:
277 277 raise error.ResponseError(
278 278 _('push failed (unexpected response):'), d)
279 279 for l in output.splitlines(True):
280 280 self.ui.status(_('remote: '), l)
281 281 yield d
282 282
283 283 def stream_out(self):
284 284 return self._callstream('stream_out')
285 285
286 286 def getbundle(self, source, **kwargs):
287 287 kwargs = pycompat.byteskwargs(kwargs)
288 288 self.requirecap('getbundle', _('look up remote changes'))
289 289 opts = {}
290 290 bundlecaps = kwargs.get('bundlecaps')
291 291 if bundlecaps is not None:
292 292 kwargs['bundlecaps'] = sorted(bundlecaps)
293 293 else:
294 294 bundlecaps = () # kwargs could have it to None
295 295 for key, value in kwargs.iteritems():
296 296 if value is None:
297 297 continue
298 298 keytype = gboptsmap.get(key)
299 299 if keytype is None:
300 300 raise error.ProgrammingError(
301 301 'Unexpectedly None keytype for key %s' % key)
302 302 elif keytype == 'nodes':
303 303 value = encodelist(value)
304 304 elif keytype in ('csv', 'scsv'):
305 305 value = ','.join(value)
306 306 elif keytype == 'boolean':
307 307 value = '%i' % bool(value)
308 308 elif keytype != 'plain':
309 309 raise KeyError('unknown getbundle option type %s'
310 310 % keytype)
311 311 opts[key] = value
312 312 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
313 313 if any((cap.startswith('HG2') for cap in bundlecaps)):
314 314 return bundle2.getunbundler(self.ui, f)
315 315 else:
316 316 return changegroupmod.cg1unpacker(f, 'UN')
317 317
318 318 def unbundle(self, cg, heads, url):
319 319 '''Send cg (a readable file-like object representing the
320 320 changegroup to push, typically a chunkbuffer object) to the
321 321 remote server as a bundle.
322 322
323 323 When pushing a bundle10 stream, return an integer indicating the
324 324 result of the push (see changegroup.apply()).
325 325
326 326 When pushing a bundle20 stream, return a bundle20 stream.
327 327
328 328 `url` is the url the client thinks it's pushing to, which is
329 329 visible to hooks.
330 330 '''
331 331
332 332 if heads != ['force'] and self.capable('unbundlehash'):
333 333 heads = encodelist(['hashed',
334 334 hashlib.sha1(''.join(sorted(heads))).digest()])
335 335 else:
336 336 heads = encodelist(heads)
337 337
338 338 if util.safehasattr(cg, 'deltaheader'):
339 339 # this a bundle10, do the old style call sequence
340 340 ret, output = self._callpush("unbundle", cg, heads=heads)
341 341 if ret == "":
342 342 raise error.ResponseError(
343 343 _('push failed:'), output)
344 344 try:
345 345 ret = int(ret)
346 346 except ValueError:
347 347 raise error.ResponseError(
348 348 _('push failed (unexpected response):'), ret)
349 349
350 350 for l in output.splitlines(True):
351 351 self.ui.status(_('remote: '), l)
352 352 else:
353 353 # bundle2 push. Send a stream, fetch a stream.
354 354 stream = self._calltwowaystream('unbundle', cg, heads=heads)
355 355 ret = bundle2.getunbundler(self.ui, stream)
356 356 return ret
357 357
358 358 # End of basewirepeer interface.
359 359
360 360 # Begin of baselegacywirepeer interface.
361 361
362 362 def branches(self, nodes):
363 363 n = encodelist(nodes)
364 364 d = self._call("branches", nodes=n)
365 365 try:
366 366 br = [tuple(decodelist(b)) for b in d.splitlines()]
367 367 return br
368 368 except ValueError:
369 369 self._abort(error.ResponseError(_("unexpected response:"), d))
370 370
371 371 def between(self, pairs):
372 372 batch = 8 # avoid giant requests
373 373 r = []
374 374 for i in xrange(0, len(pairs), batch):
375 375 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
376 376 d = self._call("between", pairs=n)
377 377 try:
378 378 r.extend(l and decodelist(l) or [] for l in d.splitlines())
379 379 except ValueError:
380 380 self._abort(error.ResponseError(_("unexpected response:"), d))
381 381 return r
382 382
383 383 def changegroup(self, nodes, kind):
384 384 n = encodelist(nodes)
385 385 f = self._callcompressable("changegroup", roots=n)
386 386 return changegroupmod.cg1unpacker(f, 'UN')
387 387
388 388 def changegroupsubset(self, bases, heads, kind):
389 389 self.requirecap('changegroupsubset', _('look up remote changes'))
390 390 bases = encodelist(bases)
391 391 heads = encodelist(heads)
392 392 f = self._callcompressable("changegroupsubset",
393 393 bases=bases, heads=heads)
394 394 return changegroupmod.cg1unpacker(f, 'UN')
395 395
396 396 # End of baselegacywirepeer interface.
397 397
398 398 def _submitbatch(self, req):
399 399 """run batch request <req> on the server
400 400
401 401 Returns an iterator of the raw responses from the server.
402 402 """
403 403 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
404 404 chunk = rsp.read(1024)
405 405 work = [chunk]
406 406 while chunk:
407 407 while ';' not in chunk and chunk:
408 408 chunk = rsp.read(1024)
409 409 work.append(chunk)
410 410 merged = ''.join(work)
411 411 while ';' in merged:
412 412 one, merged = merged.split(';', 1)
413 413 yield unescapearg(one)
414 414 chunk = rsp.read(1024)
415 415 work = [merged, chunk]
416 416 yield unescapearg(''.join(work))
417 417
418 418 def _submitone(self, op, args):
419 419 return self._call(op, **pycompat.strkwargs(args))
420 420
421 421 def debugwireargs(self, one, two, three=None, four=None, five=None):
422 422 # don't pass optional arguments left at their default value
423 423 opts = {}
424 424 if three is not None:
425 425 opts[r'three'] = three
426 426 if four is not None:
427 427 opts[r'four'] = four
428 428 return self._call('debugwireargs', one=one, two=two, **opts)
429 429
430 430 def _call(self, cmd, **args):
431 431 """execute <cmd> on the server
432 432
433 433 The command is expected to return a simple string.
434 434
435 435 returns the server reply as a string."""
436 436 raise NotImplementedError()
437 437
438 438 def _callstream(self, cmd, **args):
439 439 """execute <cmd> on the server
440 440
441 441 The command is expected to return a stream. Note that if the
442 442 command doesn't return a stream, _callstream behaves
443 443 differently for ssh and http peers.
444 444
445 445 returns the server reply as a file like object.
446 446 """
447 447 raise NotImplementedError()
448 448
449 449 def _callcompressable(self, cmd, **args):
450 450 """execute <cmd> on the server
451 451
452 452 The command is expected to return a stream.
453 453
454 454 The stream may have been compressed in some implementations. This
455 455 function takes care of the decompression. This is the only difference
456 456 with _callstream.
457 457
458 458 returns the server reply as a file like object.
459 459 """
460 460 raise NotImplementedError()
461 461
462 462 def _callpush(self, cmd, fp, **args):
463 463 """execute a <cmd> on server
464 464
465 465 The command is expected to be related to a push. Push has a special
466 466 return method.
467 467
468 468 returns the server reply as a (ret, output) tuple. ret is either
469 469 empty (error) or a stringified int.
470 470 """
471 471 raise NotImplementedError()
472 472
473 473 def _calltwowaystream(self, cmd, fp, **args):
474 474 """execute <cmd> on server
475 475
476 476 The command will send a stream to the server and get a stream in reply.
477 477 """
478 478 raise NotImplementedError()
479 479
480 480 def _abort(self, exception):
481 481 """clearly abort the wire protocol connection and raise the exception
482 482 """
483 483 raise NotImplementedError()
484 484
485 485 # server side
486 486
487 487 # wire protocol command can either return a string or one of these classes.
488 488
489 489 def getdispatchrepo(repo, proto, command):
490 490 """Obtain the repo used for processing wire protocol commands.
491 491
492 492 The intent of this function is to serve as a monkeypatch point for
493 493 extensions that need commands to operate on different repo views under
494 494 specialized circumstances.
495 495 """
496 496 return repo.filtered('served')
497 497
498 498 def dispatch(repo, proto, command):
499 499 repo = getdispatchrepo(repo, proto, command)
500 500 func, spec = commands[command]
501 501 args = proto.getargs(spec)
502 502 return func(repo, proto, *args)
503 503
504 504 def options(cmd, keys, others):
505 505 opts = {}
506 506 for k in keys:
507 507 if k in others:
508 508 opts[k] = others[k]
509 509 del others[k]
510 510 if others:
511 511 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
512 512 % (cmd, ",".join(others)))
513 513 return opts
514 514
515 515 def bundle1allowed(repo, action):
516 516 """Whether a bundle1 operation is allowed from the server.
517 517
518 518 Priority is:
519 519
520 520 1. server.bundle1gd.<action> (if generaldelta active)
521 521 2. server.bundle1.<action>
522 522 3. server.bundle1gd (if generaldelta active)
523 523 4. server.bundle1
524 524 """
525 525 ui = repo.ui
526 526 gd = 'generaldelta' in repo.requirements
527 527
528 528 if gd:
529 529 v = ui.configbool('server', 'bundle1gd.%s' % action)
530 530 if v is not None:
531 531 return v
532 532
533 533 v = ui.configbool('server', 'bundle1.%s' % action)
534 534 if v is not None:
535 535 return v
536 536
537 537 if gd:
538 538 v = ui.configbool('server', 'bundle1gd')
539 539 if v is not None:
540 540 return v
541 541
542 542 return ui.configbool('server', 'bundle1')
543 543
544 544 def supportedcompengines(ui, role):
545 545 """Obtain the list of supported compression engines for a request."""
546 546 assert role in (util.CLIENTROLE, util.SERVERROLE)
547 547
548 548 compengines = util.compengines.supportedwireengines(role)
549 549
550 550 # Allow config to override default list and ordering.
551 551 if role == util.SERVERROLE:
552 552 configengines = ui.configlist('server', 'compressionengines')
553 553 config = 'server.compressionengines'
554 554 else:
555 555 # This is currently implemented mainly to facilitate testing. In most
556 556 # cases, the server should be in charge of choosing a compression engine
557 557 # because a server has the most to lose from a sub-optimal choice. (e.g.
558 558 # CPU DoS due to an expensive engine or a network DoS due to poor
559 559 # compression ratio).
560 560 configengines = ui.configlist('experimental',
561 561 'clientcompressionengines')
562 562 config = 'experimental.clientcompressionengines'
563 563
564 564 # No explicit config. Filter out the ones that aren't supposed to be
565 565 # advertised and return default ordering.
566 566 if not configengines:
567 567 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
568 568 return [e for e in compengines
569 569 if getattr(e.wireprotosupport(), attr) > 0]
570 570
571 571 # If compression engines are listed in the config, assume there is a good
572 572 # reason for it (like server operators wanting to achieve specific
573 573 # performance characteristics). So fail fast if the config references
574 574 # unusable compression engines.
575 575 validnames = set(e.name() for e in compengines)
576 576 invalidnames = set(e for e in configengines if e not in validnames)
577 577 if invalidnames:
578 578 raise error.Abort(_('invalid compression engine defined in %s: %s') %
579 579 (config, ', '.join(sorted(invalidnames))))
580 580
581 581 compengines = [e for e in compengines if e.name() in configengines]
582 582 compengines = sorted(compengines,
583 583 key=lambda e: configengines.index(e.name()))
584 584
585 585 if not compengines:
586 586 raise error.Abort(_('%s config option does not specify any known '
587 587 'compression engines') % config,
588 588 hint=_('usable compression engines: %s') %
589 589 ', '.sorted(validnames))
590 590
591 591 return compengines
592 592
593 593 class commandentry(object):
594 594 """Represents a declared wire protocol command."""
595 595 def __init__(self, func, args='', transports=None):
596 596 self.func = func
597 597 self.args = args
598 598 self.transports = transports or set()
599 599
600 600 def _merge(self, func, args):
601 601 """Merge this instance with an incoming 2-tuple.
602 602
603 603 This is called when a caller using the old 2-tuple API attempts
604 604 to replace an instance. The incoming values are merged with
605 605 data not captured by the 2-tuple and a new instance containing
606 606 the union of the two objects is returned.
607 607 """
608 608 return commandentry(func, args=args, transports=set(self.transports))
609 609
610 610 # Old code treats instances as 2-tuples. So expose that interface.
611 611 def __iter__(self):
612 612 yield self.func
613 613 yield self.args
614 614
615 615 def __getitem__(self, i):
616 616 if i == 0:
617 617 return self.func
618 618 elif i == 1:
619 619 return self.args
620 620 else:
621 621 raise IndexError('can only access elements 0 and 1')
622 622
623 623 class commanddict(dict):
624 624 """Container for registered wire protocol commands.
625 625
626 626 It behaves like a dict. But __setitem__ is overwritten to allow silent
627 627 coercion of values from 2-tuples for API compatibility.
628 628 """
629 629 def __setitem__(self, k, v):
630 630 if isinstance(v, commandentry):
631 631 pass
632 632 # Cast 2-tuples to commandentry instances.
633 633 elif isinstance(v, tuple):
634 634 if len(v) != 2:
635 635 raise ValueError('command tuples must have exactly 2 elements')
636 636
637 637 # It is common for extensions to wrap wire protocol commands via
638 638 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
639 639 # doing this aren't aware of the new API that uses objects to store
640 640 # command entries, we automatically merge old state with new.
641 641 if k in self:
642 642 v = self[k]._merge(v[0], v[1])
643 643 else:
644 644 # Use default values from @wireprotocommand.
645 645 v = commandentry(v[0], args=v[1],
646 646 transports=set(wireprototypes.TRANSPORTS))
647 647 else:
648 648 raise ValueError('command entries must be commandentry instances '
649 649 'or 2-tuples')
650 650
651 651 return super(commanddict, self).__setitem__(k, v)
652 652
653 653 def commandavailable(self, command, proto):
654 654 """Determine if a command is available for the requested protocol."""
655 655 assert proto.name in wireprototypes.TRANSPORTS
656 656
657 657 entry = self.get(command)
658 658
659 659 if not entry:
660 660 return False
661 661
662 662 if proto.name not in entry.transports:
663 663 return False
664 664
665 665 return True
666 666
667 667 # Constants specifying which transports a wire protocol command should be
668 668 # available on. For use with @wireprotocommand.
669 669 POLICY_ALL = 'all'
670 670 POLICY_V1_ONLY = 'v1-only'
671 671 POLICY_V2_ONLY = 'v2-only'
672 672
673 673 commands = commanddict()
674 674
675 675 def wireprotocommand(name, args='', transportpolicy=POLICY_ALL):
676 676 """Decorator to declare a wire protocol command.
677 677
678 678 ``name`` is the name of the wire protocol command being provided.
679 679
680 680 ``args`` is a space-delimited list of named arguments that the command
681 681 accepts. ``*`` is a special value that says to accept all arguments.
682 682
683 683 ``transportpolicy`` is a POLICY_* constant denoting which transports
684 684 this wire protocol command should be exposed to. By default, commands
685 685 are exposed to all wire protocol transports.
686 686 """
687 687 if transportpolicy == POLICY_ALL:
688 688 transports = set(wireprototypes.TRANSPORTS)
689 689 elif transportpolicy == POLICY_V1_ONLY:
690 690 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
691 691 if v['version'] == 1}
692 692 elif transportpolicy == POLICY_V2_ONLY:
693 693 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
694 694 if v['version'] == 2}
695 695 else:
696 696 raise error.Abort(_('invalid transport policy value: %s') %
697 697 transportpolicy)
698 698
699 699 def register(func):
700 700 commands[name] = commandentry(func, args=args, transports=transports)
701 701 return func
702 702 return register
703 703
704 704 @wireprotocommand('batch', 'cmds *')
705 705 def batch(repo, proto, cmds, others):
706 706 repo = repo.filtered("served")
707 707 res = []
708 708 for pair in cmds.split(';'):
709 709 op, args = pair.split(' ', 1)
710 710 vals = {}
711 711 for a in args.split(','):
712 712 if a:
713 713 n, v = a.split('=')
714 714 vals[unescapearg(n)] = unescapearg(v)
715 715 func, spec = commands[op]
716 716 if spec:
717 717 keys = spec.split()
718 718 data = {}
719 719 for k in keys:
720 720 if k == '*':
721 721 star = {}
722 722 for key in vals.keys():
723 723 if key not in keys:
724 724 star[key] = vals[key]
725 725 data['*'] = star
726 726 else:
727 727 data[k] = vals[k]
728 728 result = func(repo, proto, *[data[k] for k in keys])
729 729 else:
730 730 result = func(repo, proto)
731 731 if isinstance(result, ooberror):
732 732 return result
733 733
734 734 # For now, all batchable commands must return bytesresponse or
735 735 # raw bytes (for backwards compatibility).
736 736 assert isinstance(result, (bytesresponse, bytes))
737 737 if isinstance(result, bytesresponse):
738 738 result = result.data
739 739 res.append(escapearg(result))
740 740
741 741 return bytesresponse(';'.join(res))
742 742
743 # TODO mark as version 1 transport only once interaction with
744 # SSH handshake mechanism is figured out.
745 @wireprotocommand('between', 'pairs')
743 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY)
746 744 def between(repo, proto, pairs):
747 745 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
748 746 r = []
749 747 for b in repo.between(pairs):
750 748 r.append(encodelist(b) + "\n")
751 749
752 750 return bytesresponse(''.join(r))
753 751
754 752 @wireprotocommand('branchmap')
755 753 def branchmap(repo, proto):
756 754 branchmap = repo.branchmap()
757 755 heads = []
758 756 for branch, nodes in branchmap.iteritems():
759 757 branchname = urlreq.quote(encoding.fromlocal(branch))
760 758 branchnodes = encodelist(nodes)
761 759 heads.append('%s %s' % (branchname, branchnodes))
762 760
763 761 return bytesresponse('\n'.join(heads))
764 762
765 763 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY)
766 764 def branches(repo, proto, nodes):
767 765 nodes = decodelist(nodes)
768 766 r = []
769 767 for b in repo.branches(nodes):
770 768 r.append(encodelist(b) + "\n")
771 769
772 770 return bytesresponse(''.join(r))
773 771
774 772 @wireprotocommand('clonebundles', '')
775 773 def clonebundles(repo, proto):
776 774 """Server command for returning info for available bundles to seed clones.
777 775
778 776 Clients will parse this response and determine what bundle to fetch.
779 777
780 778 Extensions may wrap this command to filter or dynamically emit data
781 779 depending on the request. e.g. you could advertise URLs for the closest
782 780 data center given the client's IP address.
783 781 """
784 782 return bytesresponse(repo.vfs.tryread('clonebundles.manifest'))
785 783
786 784 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
787 785 'known', 'getbundle', 'unbundlehash', 'batch']
788 786
789 787 def _capabilities(repo, proto):
790 788 """return a list of capabilities for a repo
791 789
792 790 This function exists to allow extensions to easily wrap capabilities
793 791 computation
794 792
795 793 - returns a lists: easy to alter
796 794 - change done here will be propagated to both `capabilities` and `hello`
797 795 command without any other action needed.
798 796 """
799 797 # copy to prevent modification of the global list
800 798 caps = list(wireprotocaps)
801 799
802 800 # Command of same name as capability isn't exposed to version 1 of
803 801 # transports. So conditionally add it.
804 802 if commands.commandavailable('changegroupsubset', proto):
805 803 caps.append('changegroupsubset')
806 804
807 805 if streamclone.allowservergeneration(repo):
808 806 if repo.ui.configbool('server', 'preferuncompressed'):
809 807 caps.append('stream-preferred')
810 808 requiredformats = repo.requirements & repo.supportedformats
811 809 # if our local revlogs are just revlogv1, add 'stream' cap
812 810 if not requiredformats - {'revlogv1'}:
813 811 caps.append('stream')
814 812 # otherwise, add 'streamreqs' detailing our local revlog format
815 813 else:
816 814 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
817 815 if repo.ui.configbool('experimental', 'bundle2-advertise'):
818 816 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
819 817 caps.append('bundle2=' + urlreq.quote(capsblob))
820 818 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
821 819
822 820 return proto.addcapabilities(repo, caps)
823 821
824 822 # If you are writing an extension and consider wrapping this function. Wrap
825 823 # `_capabilities` instead.
826 824 @wireprotocommand('capabilities')
827 825 def capabilities(repo, proto):
828 826 return bytesresponse(' '.join(_capabilities(repo, proto)))
829 827
830 828 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY)
831 829 def changegroup(repo, proto, roots):
832 830 nodes = decodelist(roots)
833 831 outgoing = discovery.outgoing(repo, missingroots=nodes,
834 832 missingheads=repo.heads())
835 833 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
836 834 gen = iter(lambda: cg.read(32768), '')
837 835 return streamres(gen=gen)
838 836
839 837 @wireprotocommand('changegroupsubset', 'bases heads',
840 838 transportpolicy=POLICY_V1_ONLY)
841 839 def changegroupsubset(repo, proto, bases, heads):
842 840 bases = decodelist(bases)
843 841 heads = decodelist(heads)
844 842 outgoing = discovery.outgoing(repo, missingroots=bases,
845 843 missingheads=heads)
846 844 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
847 845 gen = iter(lambda: cg.read(32768), '')
848 846 return streamres(gen=gen)
849 847
850 848 @wireprotocommand('debugwireargs', 'one two *')
851 849 def debugwireargs(repo, proto, one, two, others):
852 850 # only accept optional args from the known set
853 851 opts = options('debugwireargs', ['three', 'four'], others)
854 852 return bytesresponse(repo.debugwireargs(one, two,
855 853 **pycompat.strkwargs(opts)))
856 854
857 855 @wireprotocommand('getbundle', '*')
858 856 def getbundle(repo, proto, others):
859 857 opts = options('getbundle', gboptsmap.keys(), others)
860 858 for k, v in opts.iteritems():
861 859 keytype = gboptsmap[k]
862 860 if keytype == 'nodes':
863 861 opts[k] = decodelist(v)
864 862 elif keytype == 'csv':
865 863 opts[k] = list(v.split(','))
866 864 elif keytype == 'scsv':
867 865 opts[k] = set(v.split(','))
868 866 elif keytype == 'boolean':
869 867 # Client should serialize False as '0', which is a non-empty string
870 868 # so it evaluates as a True bool.
871 869 if v == '0':
872 870 opts[k] = False
873 871 else:
874 872 opts[k] = bool(v)
875 873 elif keytype != 'plain':
876 874 raise KeyError('unknown getbundle option type %s'
877 875 % keytype)
878 876
879 877 if not bundle1allowed(repo, 'pull'):
880 878 if not exchange.bundle2requested(opts.get('bundlecaps')):
881 879 if proto.name == 'http-v1':
882 880 return ooberror(bundle2required)
883 881 raise error.Abort(bundle2requiredmain,
884 882 hint=bundle2requiredhint)
885 883
886 884 prefercompressed = True
887 885
888 886 try:
889 887 if repo.ui.configbool('server', 'disablefullbundle'):
890 888 # Check to see if this is a full clone.
891 889 clheads = set(repo.changelog.heads())
892 890 changegroup = opts.get('cg', True)
893 891 heads = set(opts.get('heads', set()))
894 892 common = set(opts.get('common', set()))
895 893 common.discard(nullid)
896 894 if changegroup and not common and clheads == heads:
897 895 raise error.Abort(
898 896 _('server has pull-based clones disabled'),
899 897 hint=_('remove --pull if specified or upgrade Mercurial'))
900 898
901 899 info, chunks = exchange.getbundlechunks(repo, 'serve',
902 900 **pycompat.strkwargs(opts))
903 901 prefercompressed = info.get('prefercompressed', True)
904 902 except error.Abort as exc:
905 903 # cleanly forward Abort error to the client
906 904 if not exchange.bundle2requested(opts.get('bundlecaps')):
907 905 if proto.name == 'http-v1':
908 906 return ooberror(pycompat.bytestr(exc) + '\n')
909 907 raise # cannot do better for bundle1 + ssh
910 908 # bundle2 request expect a bundle2 reply
911 909 bundler = bundle2.bundle20(repo.ui)
912 910 manargs = [('message', pycompat.bytestr(exc))]
913 911 advargs = []
914 912 if exc.hint is not None:
915 913 advargs.append(('hint', exc.hint))
916 914 bundler.addpart(bundle2.bundlepart('error:abort',
917 915 manargs, advargs))
918 916 chunks = bundler.getchunks()
919 917 prefercompressed = False
920 918
921 919 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
922 920
923 921 @wireprotocommand('heads')
924 922 def heads(repo, proto):
925 923 h = repo.heads()
926 924 return bytesresponse(encodelist(h) + '\n')
927 925
928 926 @wireprotocommand('hello')
929 927 def hello(repo, proto):
930 928 """Called as part of SSH handshake to obtain server info.
931 929
932 930 Returns a list of lines describing interesting things about the
933 931 server, in an RFC822-like format.
934 932
935 933 Currently, the only one defined is ``capabilities``, which consists of a
936 934 line of space separated tokens describing server abilities:
937 935
938 936 capabilities: <token0> <token1> <token2>
939 937 """
940 938 caps = capabilities(repo, proto).data
941 939 return bytesresponse('capabilities: %s\n' % caps)
942 940
943 941 @wireprotocommand('listkeys', 'namespace')
944 942 def listkeys(repo, proto, namespace):
945 943 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
946 944 return bytesresponse(pushkeymod.encodekeys(d))
947 945
948 946 @wireprotocommand('lookup', 'key')
949 947 def lookup(repo, proto, key):
950 948 try:
951 949 k = encoding.tolocal(key)
952 950 c = repo[k]
953 951 r = c.hex()
954 952 success = 1
955 953 except Exception as inst:
956 954 r = util.forcebytestr(inst)
957 955 success = 0
958 956 return bytesresponse('%d %s\n' % (success, r))
959 957
960 958 @wireprotocommand('known', 'nodes *')
961 959 def known(repo, proto, nodes, others):
962 960 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
963 961 return bytesresponse(v)
964 962
965 963 @wireprotocommand('pushkey', 'namespace key old new')
966 964 def pushkey(repo, proto, namespace, key, old, new):
967 965 # compatibility with pre-1.8 clients which were accidentally
968 966 # sending raw binary nodes rather than utf-8-encoded hex
969 967 if len(new) == 20 and util.escapestr(new) != new:
970 968 # looks like it could be a binary node
971 969 try:
972 970 new.decode('utf-8')
973 971 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
974 972 except UnicodeDecodeError:
975 973 pass # binary, leave unmodified
976 974 else:
977 975 new = encoding.tolocal(new) # normal path
978 976
979 977 with proto.mayberedirectstdio() as output:
980 978 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
981 979 encoding.tolocal(old), new) or False
982 980
983 981 output = output.getvalue() if output else ''
984 982 return bytesresponse('%d\n%s' % (int(r), output))
985 983
986 984 @wireprotocommand('stream_out')
987 985 def stream(repo, proto):
988 986 '''If the server supports streaming clone, it advertises the "stream"
989 987 capability with a value representing the version and flags of the repo
990 988 it is serving. Client checks to see if it understands the format.
991 989 '''
992 990 return streamres_legacy(streamclone.generatev1wireproto(repo))
993 991
994 992 @wireprotocommand('unbundle', 'heads')
995 993 def unbundle(repo, proto, heads):
996 994 their_heads = decodelist(heads)
997 995
998 996 with proto.mayberedirectstdio() as output:
999 997 try:
1000 998 exchange.check_heads(repo, their_heads, 'preparing changes')
1001 999
1002 1000 # write bundle data to temporary file because it can be big
1003 1001 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1004 1002 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1005 1003 r = 0
1006 1004 try:
1007 1005 proto.forwardpayload(fp)
1008 1006 fp.seek(0)
1009 1007 gen = exchange.readbundle(repo.ui, fp, None)
1010 1008 if (isinstance(gen, changegroupmod.cg1unpacker)
1011 1009 and not bundle1allowed(repo, 'push')):
1012 1010 if proto.name == 'http-v1':
1013 1011 # need to special case http because stderr do not get to
1014 1012 # the http client on failed push so we need to abuse
1015 1013 # some other error type to make sure the message get to
1016 1014 # the user.
1017 1015 return ooberror(bundle2required)
1018 1016 raise error.Abort(bundle2requiredmain,
1019 1017 hint=bundle2requiredhint)
1020 1018
1021 1019 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1022 1020 proto.client())
1023 1021 if util.safehasattr(r, 'addpart'):
1024 1022 # The return looks streamable, we are in the bundle2 case
1025 1023 # and should return a stream.
1026 1024 return streamres_legacy(gen=r.getchunks())
1027 1025 return pushres(r, output.getvalue() if output else '')
1028 1026
1029 1027 finally:
1030 1028 fp.close()
1031 1029 os.unlink(tempname)
1032 1030
1033 1031 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1034 1032 # handle non-bundle2 case first
1035 1033 if not getattr(exc, 'duringunbundle2', False):
1036 1034 try:
1037 1035 raise
1038 1036 except error.Abort:
1039 1037 # The old code we moved used util.stderr directly.
1040 1038 # We did not change it to minimise code change.
1041 1039 # This need to be moved to something proper.
1042 1040 # Feel free to do it.
1043 1041 util.stderr.write("abort: %s\n" % exc)
1044 1042 if exc.hint is not None:
1045 1043 util.stderr.write("(%s)\n" % exc.hint)
1046 1044 return pushres(0, output.getvalue() if output else '')
1047 1045 except error.PushRaced:
1048 1046 return pusherr(str(exc),
1049 1047 output.getvalue() if output else '')
1050 1048
1051 1049 bundler = bundle2.bundle20(repo.ui)
1052 1050 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1053 1051 bundler.addpart(out)
1054 1052 try:
1055 1053 try:
1056 1054 raise
1057 1055 except error.PushkeyFailed as exc:
1058 1056 # check client caps
1059 1057 remotecaps = getattr(exc, '_replycaps', None)
1060 1058 if (remotecaps is not None
1061 1059 and 'pushkey' not in remotecaps.get('error', ())):
1062 1060 # no support remote side, fallback to Abort handler.
1063 1061 raise
1064 1062 part = bundler.newpart('error:pushkey')
1065 1063 part.addparam('in-reply-to', exc.partid)
1066 1064 if exc.namespace is not None:
1067 1065 part.addparam('namespace', exc.namespace,
1068 1066 mandatory=False)
1069 1067 if exc.key is not None:
1070 1068 part.addparam('key', exc.key, mandatory=False)
1071 1069 if exc.new is not None:
1072 1070 part.addparam('new', exc.new, mandatory=False)
1073 1071 if exc.old is not None:
1074 1072 part.addparam('old', exc.old, mandatory=False)
1075 1073 if exc.ret is not None:
1076 1074 part.addparam('ret', exc.ret, mandatory=False)
1077 1075 except error.BundleValueError as exc:
1078 1076 errpart = bundler.newpart('error:unsupportedcontent')
1079 1077 if exc.parttype is not None:
1080 1078 errpart.addparam('parttype', exc.parttype)
1081 1079 if exc.params:
1082 1080 errpart.addparam('params', '\0'.join(exc.params))
1083 1081 except error.Abort as exc:
1084 1082 manargs = [('message', util.forcebytestr(exc))]
1085 1083 advargs = []
1086 1084 if exc.hint is not None:
1087 1085 advargs.append(('hint', exc.hint))
1088 1086 bundler.addpart(bundle2.bundlepart('error:abort',
1089 1087 manargs, advargs))
1090 1088 except error.PushRaced as exc:
1091 1089 bundler.newpart('error:pushraced',
1092 1090 [('message', util.forcebytestr(exc))])
1093 1091 return streamres_legacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now