##// END OF EJS Templates
wireproto: make @wireprotocommand version 1 only by default...
Gregory Szorc -
r37558:693cb376 default
parent child Browse files
Show More
@@ -1,1414 +1,1414
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 wireprototypes,
35 35 )
36 36
37 37 from .utils import (
38 38 procutil,
39 39 stringutil,
40 40 )
41 41
42 42 urlerr = util.urlerr
43 43 urlreq = util.urlreq
44 44
45 45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
46 46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
47 47 'IncompatibleClient')
48 48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
49 49
50 50 class remoteiterbatcher(peer.iterbatcher):
51 51 def __init__(self, remote):
52 52 super(remoteiterbatcher, self).__init__()
53 53 self._remote = remote
54 54
55 55 def __getattr__(self, name):
56 56 # Validate this method is batchable, since submit() only supports
57 57 # batchable methods.
58 58 fn = getattr(self._remote, name)
59 59 if not getattr(fn, 'batchable', None):
60 60 raise error.ProgrammingError('Attempted to batch a non-batchable '
61 61 'call to %r' % name)
62 62
63 63 return super(remoteiterbatcher, self).__getattr__(name)
64 64
65 65 def submit(self):
66 66 """Break the batch request into many patch calls and pipeline them.
67 67
68 68 This is mostly valuable over http where request sizes can be
69 69 limited, but can be used in other places as well.
70 70 """
71 71 # 2-tuple of (command, arguments) that represents what will be
72 72 # sent over the wire.
73 73 requests = []
74 74
75 75 # 4-tuple of (command, final future, @batchable generator, remote
76 76 # future).
77 77 results = []
78 78
79 79 for command, args, opts, finalfuture in self.calls:
80 80 mtd = getattr(self._remote, command)
81 81 batchable = mtd.batchable(mtd.__self__, *args, **opts)
82 82
83 83 commandargs, fremote = next(batchable)
84 84 assert fremote
85 85 requests.append((command, commandargs))
86 86 results.append((command, finalfuture, batchable, fremote))
87 87
88 88 if requests:
89 89 self._resultiter = self._remote._submitbatch(requests)
90 90
91 91 self._results = results
92 92
93 93 def results(self):
94 94 for command, finalfuture, batchable, remotefuture in self._results:
95 95 # Get the raw result, set it in the remote future, feed it
96 96 # back into the @batchable generator so it can be decoded, and
97 97 # set the result on the final future to this value.
98 98 remoteresult = next(self._resultiter)
99 99 remotefuture.set(remoteresult)
100 100 finalfuture.set(next(batchable))
101 101
102 102 # Verify our @batchable generators only emit 2 values.
103 103 try:
104 104 next(batchable)
105 105 except StopIteration:
106 106 pass
107 107 else:
108 108 raise error.ProgrammingError('%s @batchable generator emitted '
109 109 'unexpected value count' % command)
110 110
111 111 yield finalfuture.value
112 112
113 113 # Forward a couple of names from peer to make wireproto interactions
114 114 # slightly more sensible.
115 115 batchable = peer.batchable
116 116 future = peer.future
117 117
118 118 # list of nodes encoding / decoding
119 119
120 120 def decodelist(l, sep=' '):
121 121 if l:
122 122 return [bin(v) for v in l.split(sep)]
123 123 return []
124 124
125 125 def encodelist(l, sep=' '):
126 126 try:
127 127 return sep.join(map(hex, l))
128 128 except TypeError:
129 129 raise
130 130
131 131 # batched call argument encoding
132 132
133 133 def escapearg(plain):
134 134 return (plain
135 135 .replace(':', ':c')
136 136 .replace(',', ':o')
137 137 .replace(';', ':s')
138 138 .replace('=', ':e'))
139 139
140 140 def unescapearg(escaped):
141 141 return (escaped
142 142 .replace(':e', '=')
143 143 .replace(':s', ';')
144 144 .replace(':o', ',')
145 145 .replace(':c', ':'))
146 146
147 147 def encodebatchcmds(req):
148 148 """Return a ``cmds`` argument value for the ``batch`` command."""
149 149 cmds = []
150 150 for op, argsdict in req:
151 151 # Old servers didn't properly unescape argument names. So prevent
152 152 # the sending of argument names that may not be decoded properly by
153 153 # servers.
154 154 assert all(escapearg(k) == k for k in argsdict)
155 155
156 156 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
157 157 for k, v in argsdict.iteritems())
158 158 cmds.append('%s %s' % (op, args))
159 159
160 160 return ';'.join(cmds)
161 161
162 162 def clientcompressionsupport(proto):
163 163 """Returns a list of compression methods supported by the client.
164 164
165 165 Returns a list of the compression methods supported by the client
166 166 according to the protocol capabilities. If no such capability has
167 167 been announced, fallback to the default of zlib and uncompressed.
168 168 """
169 169 for cap in proto.getprotocaps():
170 170 if cap.startswith('comp='):
171 171 return cap[5:].split(',')
172 172 return ['zlib', 'none']
173 173
174 174 # mapping of options accepted by getbundle and their types
175 175 #
176 176 # Meant to be extended by extensions. It is extensions responsibility to ensure
177 177 # such options are properly processed in exchange.getbundle.
178 178 #
179 179 # supported types are:
180 180 #
181 181 # :nodes: list of binary nodes
182 182 # :csv: list of comma-separated values
183 183 # :scsv: list of comma-separated values return as set
184 184 # :plain: string with no transformation needed.
185 185 gboptsmap = {'heads': 'nodes',
186 186 'bookmarks': 'boolean',
187 187 'common': 'nodes',
188 188 'obsmarkers': 'boolean',
189 189 'phases': 'boolean',
190 190 'bundlecaps': 'scsv',
191 191 'listkeys': 'csv',
192 192 'cg': 'boolean',
193 193 'cbattempted': 'boolean',
194 194 'stream': 'boolean',
195 195 }
196 196
197 197 # client side
198 198
199 199 class wirepeer(repository.legacypeer):
200 200 """Client-side interface for communicating with a peer repository.
201 201
202 202 Methods commonly call wire protocol commands of the same name.
203 203
204 204 See also httppeer.py and sshpeer.py for protocol-specific
205 205 implementations of this interface.
206 206 """
207 207 # Begin of ipeercommands interface.
208 208
209 209 def iterbatch(self):
210 210 return remoteiterbatcher(self)
211 211
212 212 @batchable
213 213 def lookup(self, key):
214 214 self.requirecap('lookup', _('look up remote revision'))
215 215 f = future()
216 216 yield {'key': encoding.fromlocal(key)}, f
217 217 d = f.value
218 218 success, data = d[:-1].split(" ", 1)
219 219 if int(success):
220 220 yield bin(data)
221 221 else:
222 222 self._abort(error.RepoError(data))
223 223
224 224 @batchable
225 225 def heads(self):
226 226 f = future()
227 227 yield {}, f
228 228 d = f.value
229 229 try:
230 230 yield decodelist(d[:-1])
231 231 except ValueError:
232 232 self._abort(error.ResponseError(_("unexpected response:"), d))
233 233
234 234 @batchable
235 235 def known(self, nodes):
236 236 f = future()
237 237 yield {'nodes': encodelist(nodes)}, f
238 238 d = f.value
239 239 try:
240 240 yield [bool(int(b)) for b in d]
241 241 except ValueError:
242 242 self._abort(error.ResponseError(_("unexpected response:"), d))
243 243
244 244 @batchable
245 245 def branchmap(self):
246 246 f = future()
247 247 yield {}, f
248 248 d = f.value
249 249 try:
250 250 branchmap = {}
251 251 for branchpart in d.splitlines():
252 252 branchname, branchheads = branchpart.split(' ', 1)
253 253 branchname = encoding.tolocal(urlreq.unquote(branchname))
254 254 branchheads = decodelist(branchheads)
255 255 branchmap[branchname] = branchheads
256 256 yield branchmap
257 257 except TypeError:
258 258 self._abort(error.ResponseError(_("unexpected response:"), d))
259 259
260 260 @batchable
261 261 def listkeys(self, namespace):
262 262 if not self.capable('pushkey'):
263 263 yield {}, None
264 264 f = future()
265 265 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
266 266 yield {'namespace': encoding.fromlocal(namespace)}, f
267 267 d = f.value
268 268 self.ui.debug('received listkey for "%s": %i bytes\n'
269 269 % (namespace, len(d)))
270 270 yield pushkeymod.decodekeys(d)
271 271
272 272 @batchable
273 273 def pushkey(self, namespace, key, old, new):
274 274 if not self.capable('pushkey'):
275 275 yield False, None
276 276 f = future()
277 277 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
278 278 yield {'namespace': encoding.fromlocal(namespace),
279 279 'key': encoding.fromlocal(key),
280 280 'old': encoding.fromlocal(old),
281 281 'new': encoding.fromlocal(new)}, f
282 282 d = f.value
283 283 d, output = d.split('\n', 1)
284 284 try:
285 285 d = bool(int(d))
286 286 except ValueError:
287 287 raise error.ResponseError(
288 288 _('push failed (unexpected response):'), d)
289 289 for l in output.splitlines(True):
290 290 self.ui.status(_('remote: '), l)
291 291 yield d
292 292
293 293 def stream_out(self):
294 294 return self._callstream('stream_out')
295 295
296 296 def getbundle(self, source, **kwargs):
297 297 kwargs = pycompat.byteskwargs(kwargs)
298 298 self.requirecap('getbundle', _('look up remote changes'))
299 299 opts = {}
300 300 bundlecaps = kwargs.get('bundlecaps') or set()
301 301 for key, value in kwargs.iteritems():
302 302 if value is None:
303 303 continue
304 304 keytype = gboptsmap.get(key)
305 305 if keytype is None:
306 306 raise error.ProgrammingError(
307 307 'Unexpectedly None keytype for key %s' % key)
308 308 elif keytype == 'nodes':
309 309 value = encodelist(value)
310 310 elif keytype == 'csv':
311 311 value = ','.join(value)
312 312 elif keytype == 'scsv':
313 313 value = ','.join(sorted(value))
314 314 elif keytype == 'boolean':
315 315 value = '%i' % bool(value)
316 316 elif keytype != 'plain':
317 317 raise KeyError('unknown getbundle option type %s'
318 318 % keytype)
319 319 opts[key] = value
320 320 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
321 321 if any((cap.startswith('HG2') for cap in bundlecaps)):
322 322 return bundle2.getunbundler(self.ui, f)
323 323 else:
324 324 return changegroupmod.cg1unpacker(f, 'UN')
325 325
326 326 def unbundle(self, cg, heads, url):
327 327 '''Send cg (a readable file-like object representing the
328 328 changegroup to push, typically a chunkbuffer object) to the
329 329 remote server as a bundle.
330 330
331 331 When pushing a bundle10 stream, return an integer indicating the
332 332 result of the push (see changegroup.apply()).
333 333
334 334 When pushing a bundle20 stream, return a bundle20 stream.
335 335
336 336 `url` is the url the client thinks it's pushing to, which is
337 337 visible to hooks.
338 338 '''
339 339
340 340 if heads != ['force'] and self.capable('unbundlehash'):
341 341 heads = encodelist(['hashed',
342 342 hashlib.sha1(''.join(sorted(heads))).digest()])
343 343 else:
344 344 heads = encodelist(heads)
345 345
346 346 if util.safehasattr(cg, 'deltaheader'):
347 347 # this a bundle10, do the old style call sequence
348 348 ret, output = self._callpush("unbundle", cg, heads=heads)
349 349 if ret == "":
350 350 raise error.ResponseError(
351 351 _('push failed:'), output)
352 352 try:
353 353 ret = int(ret)
354 354 except ValueError:
355 355 raise error.ResponseError(
356 356 _('push failed (unexpected response):'), ret)
357 357
358 358 for l in output.splitlines(True):
359 359 self.ui.status(_('remote: '), l)
360 360 else:
361 361 # bundle2 push. Send a stream, fetch a stream.
362 362 stream = self._calltwowaystream('unbundle', cg, heads=heads)
363 363 ret = bundle2.getunbundler(self.ui, stream)
364 364 return ret
365 365
366 366 # End of ipeercommands interface.
367 367
368 368 # Begin of ipeerlegacycommands interface.
369 369
370 370 def branches(self, nodes):
371 371 n = encodelist(nodes)
372 372 d = self._call("branches", nodes=n)
373 373 try:
374 374 br = [tuple(decodelist(b)) for b in d.splitlines()]
375 375 return br
376 376 except ValueError:
377 377 self._abort(error.ResponseError(_("unexpected response:"), d))
378 378
379 379 def between(self, pairs):
380 380 batch = 8 # avoid giant requests
381 381 r = []
382 382 for i in xrange(0, len(pairs), batch):
383 383 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
384 384 d = self._call("between", pairs=n)
385 385 try:
386 386 r.extend(l and decodelist(l) or [] for l in d.splitlines())
387 387 except ValueError:
388 388 self._abort(error.ResponseError(_("unexpected response:"), d))
389 389 return r
390 390
391 391 def changegroup(self, nodes, kind):
392 392 n = encodelist(nodes)
393 393 f = self._callcompressable("changegroup", roots=n)
394 394 return changegroupmod.cg1unpacker(f, 'UN')
395 395
396 396 def changegroupsubset(self, bases, heads, kind):
397 397 self.requirecap('changegroupsubset', _('look up remote changes'))
398 398 bases = encodelist(bases)
399 399 heads = encodelist(heads)
400 400 f = self._callcompressable("changegroupsubset",
401 401 bases=bases, heads=heads)
402 402 return changegroupmod.cg1unpacker(f, 'UN')
403 403
404 404 # End of ipeerlegacycommands interface.
405 405
406 406 def _submitbatch(self, req):
407 407 """run batch request <req> on the server
408 408
409 409 Returns an iterator of the raw responses from the server.
410 410 """
411 411 ui = self.ui
412 412 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
413 413 ui.debug('devel-peer-request: batched-content\n')
414 414 for op, args in req:
415 415 msg = 'devel-peer-request: - %s (%d arguments)\n'
416 416 ui.debug(msg % (op, len(args)))
417 417
418 418 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
419 419 chunk = rsp.read(1024)
420 420 work = [chunk]
421 421 while chunk:
422 422 while ';' not in chunk and chunk:
423 423 chunk = rsp.read(1024)
424 424 work.append(chunk)
425 425 merged = ''.join(work)
426 426 while ';' in merged:
427 427 one, merged = merged.split(';', 1)
428 428 yield unescapearg(one)
429 429 chunk = rsp.read(1024)
430 430 work = [merged, chunk]
431 431 yield unescapearg(''.join(work))
432 432
433 433 def _submitone(self, op, args):
434 434 return self._call(op, **pycompat.strkwargs(args))
435 435
436 436 def debugwireargs(self, one, two, three=None, four=None, five=None):
437 437 # don't pass optional arguments left at their default value
438 438 opts = {}
439 439 if three is not None:
440 440 opts[r'three'] = three
441 441 if four is not None:
442 442 opts[r'four'] = four
443 443 return self._call('debugwireargs', one=one, two=two, **opts)
444 444
445 445 def _call(self, cmd, **args):
446 446 """execute <cmd> on the server
447 447
448 448 The command is expected to return a simple string.
449 449
450 450 returns the server reply as a string."""
451 451 raise NotImplementedError()
452 452
453 453 def _callstream(self, cmd, **args):
454 454 """execute <cmd> on the server
455 455
456 456 The command is expected to return a stream. Note that if the
457 457 command doesn't return a stream, _callstream behaves
458 458 differently for ssh and http peers.
459 459
460 460 returns the server reply as a file like object.
461 461 """
462 462 raise NotImplementedError()
463 463
464 464 def _callcompressable(self, cmd, **args):
465 465 """execute <cmd> on the server
466 466
467 467 The command is expected to return a stream.
468 468
469 469 The stream may have been compressed in some implementations. This
470 470 function takes care of the decompression. This is the only difference
471 471 with _callstream.
472 472
473 473 returns the server reply as a file like object.
474 474 """
475 475 raise NotImplementedError()
476 476
477 477 def _callpush(self, cmd, fp, **args):
478 478 """execute a <cmd> on server
479 479
480 480 The command is expected to be related to a push. Push has a special
481 481 return method.
482 482
483 483 returns the server reply as a (ret, output) tuple. ret is either
484 484 empty (error) or a stringified int.
485 485 """
486 486 raise NotImplementedError()
487 487
488 488 def _calltwowaystream(self, cmd, fp, **args):
489 489 """execute <cmd> on server
490 490
491 491 The command will send a stream to the server and get a stream in reply.
492 492 """
493 493 raise NotImplementedError()
494 494
495 495 def _abort(self, exception):
496 496 """clearly abort the wire protocol connection and raise the exception
497 497 """
498 498 raise NotImplementedError()
499 499
500 500 # server side
501 501
502 502 # wire protocol command can either return a string or one of these classes.
503 503
504 504 def getdispatchrepo(repo, proto, command):
505 505 """Obtain the repo used for processing wire protocol commands.
506 506
507 507 The intent of this function is to serve as a monkeypatch point for
508 508 extensions that need commands to operate on different repo views under
509 509 specialized circumstances.
510 510 """
511 511 return repo.filtered('served')
512 512
513 513 def dispatch(repo, proto, command):
514 514 repo = getdispatchrepo(repo, proto, command)
515 515
516 516 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
517 517 commandtable = commandsv2 if transportversion == 2 else commands
518 518 func, spec = commandtable[command]
519 519
520 520 args = proto.getargs(spec)
521 521
522 522 # Version 1 protocols define arguments as a list. Version 2 uses a dict.
523 523 if isinstance(args, list):
524 524 return func(repo, proto, *args)
525 525 elif isinstance(args, dict):
526 526 return func(repo, proto, **args)
527 527 else:
528 528 raise error.ProgrammingError('unexpected type returned from '
529 529 'proto.getargs(): %s' % type(args))
530 530
531 531 def options(cmd, keys, others):
532 532 opts = {}
533 533 for k in keys:
534 534 if k in others:
535 535 opts[k] = others[k]
536 536 del others[k]
537 537 if others:
538 538 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
539 539 % (cmd, ",".join(others)))
540 540 return opts
541 541
542 542 def bundle1allowed(repo, action):
543 543 """Whether a bundle1 operation is allowed from the server.
544 544
545 545 Priority is:
546 546
547 547 1. server.bundle1gd.<action> (if generaldelta active)
548 548 2. server.bundle1.<action>
549 549 3. server.bundle1gd (if generaldelta active)
550 550 4. server.bundle1
551 551 """
552 552 ui = repo.ui
553 553 gd = 'generaldelta' in repo.requirements
554 554
555 555 if gd:
556 556 v = ui.configbool('server', 'bundle1gd.%s' % action)
557 557 if v is not None:
558 558 return v
559 559
560 560 v = ui.configbool('server', 'bundle1.%s' % action)
561 561 if v is not None:
562 562 return v
563 563
564 564 if gd:
565 565 v = ui.configbool('server', 'bundle1gd')
566 566 if v is not None:
567 567 return v
568 568
569 569 return ui.configbool('server', 'bundle1')
570 570
571 571 def supportedcompengines(ui, role):
572 572 """Obtain the list of supported compression engines for a request."""
573 573 assert role in (util.CLIENTROLE, util.SERVERROLE)
574 574
575 575 compengines = util.compengines.supportedwireengines(role)
576 576
577 577 # Allow config to override default list and ordering.
578 578 if role == util.SERVERROLE:
579 579 configengines = ui.configlist('server', 'compressionengines')
580 580 config = 'server.compressionengines'
581 581 else:
582 582 # This is currently implemented mainly to facilitate testing. In most
583 583 # cases, the server should be in charge of choosing a compression engine
584 584 # because a server has the most to lose from a sub-optimal choice. (e.g.
585 585 # CPU DoS due to an expensive engine or a network DoS due to poor
586 586 # compression ratio).
587 587 configengines = ui.configlist('experimental',
588 588 'clientcompressionengines')
589 589 config = 'experimental.clientcompressionengines'
590 590
591 591 # No explicit config. Filter out the ones that aren't supposed to be
592 592 # advertised and return default ordering.
593 593 if not configengines:
594 594 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
595 595 return [e for e in compengines
596 596 if getattr(e.wireprotosupport(), attr) > 0]
597 597
598 598 # If compression engines are listed in the config, assume there is a good
599 599 # reason for it (like server operators wanting to achieve specific
600 600 # performance characteristics). So fail fast if the config references
601 601 # unusable compression engines.
602 602 validnames = set(e.name() for e in compengines)
603 603 invalidnames = set(e for e in configengines if e not in validnames)
604 604 if invalidnames:
605 605 raise error.Abort(_('invalid compression engine defined in %s: %s') %
606 606 (config, ', '.join(sorted(invalidnames))))
607 607
608 608 compengines = [e for e in compengines if e.name() in configengines]
609 609 compengines = sorted(compengines,
610 610 key=lambda e: configengines.index(e.name()))
611 611
612 612 if not compengines:
613 613 raise error.Abort(_('%s config option does not specify any known '
614 614 'compression engines') % config,
615 615 hint=_('usable compression engines: %s') %
616 616 ', '.sorted(validnames))
617 617
618 618 return compengines
619 619
620 620 class commandentry(object):
621 621 """Represents a declared wire protocol command."""
622 622 def __init__(self, func, args='', transports=None,
623 623 permission='push'):
624 624 self.func = func
625 625 self.args = args
626 626 self.transports = transports or set()
627 627 self.permission = permission
628 628
629 629 def _merge(self, func, args):
630 630 """Merge this instance with an incoming 2-tuple.
631 631
632 632 This is called when a caller using the old 2-tuple API attempts
633 633 to replace an instance. The incoming values are merged with
634 634 data not captured by the 2-tuple and a new instance containing
635 635 the union of the two objects is returned.
636 636 """
637 637 return commandentry(func, args=args, transports=set(self.transports),
638 638 permission=self.permission)
639 639
640 640 # Old code treats instances as 2-tuples. So expose that interface.
641 641 def __iter__(self):
642 642 yield self.func
643 643 yield self.args
644 644
645 645 def __getitem__(self, i):
646 646 if i == 0:
647 647 return self.func
648 648 elif i == 1:
649 649 return self.args
650 650 else:
651 651 raise IndexError('can only access elements 0 and 1')
652 652
653 653 class commanddict(dict):
654 654 """Container for registered wire protocol commands.
655 655
656 656 It behaves like a dict. But __setitem__ is overwritten to allow silent
657 657 coercion of values from 2-tuples for API compatibility.
658 658 """
659 659 def __setitem__(self, k, v):
660 660 if isinstance(v, commandentry):
661 661 pass
662 662 # Cast 2-tuples to commandentry instances.
663 663 elif isinstance(v, tuple):
664 664 if len(v) != 2:
665 665 raise ValueError('command tuples must have exactly 2 elements')
666 666
667 667 # It is common for extensions to wrap wire protocol commands via
668 668 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
669 669 # doing this aren't aware of the new API that uses objects to store
670 670 # command entries, we automatically merge old state with new.
671 671 if k in self:
672 672 v = self[k]._merge(v[0], v[1])
673 673 else:
674 674 # Use default values from @wireprotocommand.
675 675 v = commandentry(v[0], args=v[1],
676 676 transports=set(wireprototypes.TRANSPORTS),
677 677 permission='push')
678 678 else:
679 679 raise ValueError('command entries must be commandentry instances '
680 680 'or 2-tuples')
681 681
682 682 return super(commanddict, self).__setitem__(k, v)
683 683
684 684 def commandavailable(self, command, proto):
685 685 """Determine if a command is available for the requested protocol."""
686 686 assert proto.name in wireprototypes.TRANSPORTS
687 687
688 688 entry = self.get(command)
689 689
690 690 if not entry:
691 691 return False
692 692
693 693 if proto.name not in entry.transports:
694 694 return False
695 695
696 696 return True
697 697
698 698 # Constants specifying which transports a wire protocol command should be
699 699 # available on. For use with @wireprotocommand.
700 700 POLICY_ALL = 'all'
701 701 POLICY_V1_ONLY = 'v1-only'
702 702 POLICY_V2_ONLY = 'v2-only'
703 703
704 704 # For version 1 transports.
705 705 commands = commanddict()
706 706
707 707 # For version 2 transports.
708 708 commandsv2 = commanddict()
709 709
710 def wireprotocommand(name, args='', transportpolicy=POLICY_ALL,
710 def wireprotocommand(name, args='', transportpolicy=POLICY_V1_ONLY,
711 711 permission='push'):
712 712 """Decorator to declare a wire protocol command.
713 713
714 714 ``name`` is the name of the wire protocol command being provided.
715 715
716 716 ``args`` defines the named arguments accepted by the command. It is
717 717 ideally a dict mapping argument names to their types. For backwards
718 718 compatibility, it can be a space-delimited list of argument names. For
719 719 version 1 transports, ``*`` denotes a special value that says to accept
720 720 all named arguments.
721 721
722 722 ``transportpolicy`` is a POLICY_* constant denoting which transports
723 723 this wire protocol command should be exposed to. By default, commands
724 724 are exposed to all wire protocol transports.
725 725
726 726 ``permission`` defines the permission type needed to run this command.
727 727 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
728 728 respectively. Default is to assume command requires ``push`` permissions
729 729 because otherwise commands not declaring their permissions could modify
730 730 a repository that is supposed to be read-only.
731 731 """
732 732 if transportpolicy == POLICY_ALL:
733 733 transports = set(wireprototypes.TRANSPORTS)
734 734 transportversions = {1, 2}
735 735 elif transportpolicy == POLICY_V1_ONLY:
736 736 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
737 737 if v['version'] == 1}
738 738 transportversions = {1}
739 739 elif transportpolicy == POLICY_V2_ONLY:
740 740 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
741 741 if v['version'] == 2}
742 742 transportversions = {2}
743 743 else:
744 744 raise error.ProgrammingError('invalid transport policy value: %s' %
745 745 transportpolicy)
746 746
747 747 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
748 748 # SSHv2.
749 749 # TODO undo this hack when SSH is using the unified frame protocol.
750 750 if name == b'batch':
751 751 transports.add(wireprototypes.SSHV2)
752 752
753 753 if permission not in ('push', 'pull'):
754 754 raise error.ProgrammingError('invalid wire protocol permission; '
755 755 'got %s; expected "push" or "pull"' %
756 756 permission)
757 757
758 758 if 1 in transportversions and not isinstance(args, bytes):
759 759 raise error.ProgrammingError('arguments for version 1 commands must '
760 760 'be declared as bytes')
761 761
762 762 if isinstance(args, bytes):
763 763 dictargs = {arg: b'legacy' for arg in args.split()}
764 764 elif isinstance(args, dict):
765 765 dictargs = args
766 766 else:
767 767 raise ValueError('args must be bytes or a dict')
768 768
769 769 def register(func):
770 770 if 1 in transportversions:
771 771 if name in commands:
772 772 raise error.ProgrammingError('%s command already registered '
773 773 'for version 1' % name)
774 774 commands[name] = commandentry(func, args=args,
775 775 transports=transports,
776 776 permission=permission)
777 777 if 2 in transportversions:
778 778 if name in commandsv2:
779 779 raise error.ProgrammingError('%s command already registered '
780 780 'for version 2' % name)
781 781
782 782 commandsv2[name] = commandentry(func, args=dictargs,
783 783 transports=transports,
784 784 permission=permission)
785 785
786 786 return func
787 787 return register
788 788
789 789 # TODO define a more appropriate permissions type to use for this.
790 790 @wireprotocommand('batch', 'cmds *', permission='pull',
791 791 transportpolicy=POLICY_V1_ONLY)
792 792 def batch(repo, proto, cmds, others):
793 793 repo = repo.filtered("served")
794 794 res = []
795 795 for pair in cmds.split(';'):
796 796 op, args = pair.split(' ', 1)
797 797 vals = {}
798 798 for a in args.split(','):
799 799 if a:
800 800 n, v = a.split('=')
801 801 vals[unescapearg(n)] = unescapearg(v)
802 802 func, spec = commands[op]
803 803
804 804 # Validate that client has permissions to perform this command.
805 805 perm = commands[op].permission
806 806 assert perm in ('push', 'pull')
807 807 proto.checkperm(perm)
808 808
809 809 if spec:
810 810 keys = spec.split()
811 811 data = {}
812 812 for k in keys:
813 813 if k == '*':
814 814 star = {}
815 815 for key in vals.keys():
816 816 if key not in keys:
817 817 star[key] = vals[key]
818 818 data['*'] = star
819 819 else:
820 820 data[k] = vals[k]
821 821 result = func(repo, proto, *[data[k] for k in keys])
822 822 else:
823 823 result = func(repo, proto)
824 824 if isinstance(result, wireprototypes.ooberror):
825 825 return result
826 826
827 827 # For now, all batchable commands must return bytesresponse or
828 828 # raw bytes (for backwards compatibility).
829 829 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
830 830 if isinstance(result, wireprototypes.bytesresponse):
831 831 result = result.data
832 832 res.append(escapearg(result))
833 833
834 834 return wireprototypes.bytesresponse(';'.join(res))
835 835
836 836 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY,
837 837 permission='pull')
838 838 def between(repo, proto, pairs):
839 839 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
840 840 r = []
841 841 for b in repo.between(pairs):
842 842 r.append(encodelist(b) + "\n")
843 843
844 844 return wireprototypes.bytesresponse(''.join(r))
845 845
846 846 @wireprotocommand('branchmap', permission='pull',
847 847 transportpolicy=POLICY_V1_ONLY)
848 848 def branchmap(repo, proto):
849 849 branchmap = repo.branchmap()
850 850 heads = []
851 851 for branch, nodes in branchmap.iteritems():
852 852 branchname = urlreq.quote(encoding.fromlocal(branch))
853 853 branchnodes = encodelist(nodes)
854 854 heads.append('%s %s' % (branchname, branchnodes))
855 855
856 856 return wireprototypes.bytesresponse('\n'.join(heads))
857 857
858 858 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY,
859 859 permission='pull')
860 860 def branches(repo, proto, nodes):
861 861 nodes = decodelist(nodes)
862 862 r = []
863 863 for b in repo.branches(nodes):
864 864 r.append(encodelist(b) + "\n")
865 865
866 866 return wireprototypes.bytesresponse(''.join(r))
867 867
868 868 @wireprotocommand('clonebundles', '', permission='pull',
869 869 transportpolicy=POLICY_V1_ONLY)
870 870 def clonebundles(repo, proto):
871 871 """Server command for returning info for available bundles to seed clones.
872 872
873 873 Clients will parse this response and determine what bundle to fetch.
874 874
875 875 Extensions may wrap this command to filter or dynamically emit data
876 876 depending on the request. e.g. you could advertise URLs for the closest
877 877 data center given the client's IP address.
878 878 """
879 879 return wireprototypes.bytesresponse(
880 880 repo.vfs.tryread('clonebundles.manifest'))
881 881
882 882 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
883 883 'known', 'getbundle', 'unbundlehash']
884 884
885 885 def _capabilities(repo, proto):
886 886 """return a list of capabilities for a repo
887 887
888 888 This function exists to allow extensions to easily wrap capabilities
889 889 computation
890 890
891 891 - returns a lists: easy to alter
892 892 - change done here will be propagated to both `capabilities` and `hello`
893 893 command without any other action needed.
894 894 """
895 895 # copy to prevent modification of the global list
896 896 caps = list(wireprotocaps)
897 897
898 898 # Command of same name as capability isn't exposed to version 1 of
899 899 # transports. So conditionally add it.
900 900 if commands.commandavailable('changegroupsubset', proto):
901 901 caps.append('changegroupsubset')
902 902
903 903 if streamclone.allowservergeneration(repo):
904 904 if repo.ui.configbool('server', 'preferuncompressed'):
905 905 caps.append('stream-preferred')
906 906 requiredformats = repo.requirements & repo.supportedformats
907 907 # if our local revlogs are just revlogv1, add 'stream' cap
908 908 if not requiredformats - {'revlogv1'}:
909 909 caps.append('stream')
910 910 # otherwise, add 'streamreqs' detailing our local revlog format
911 911 else:
912 912 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
913 913 if repo.ui.configbool('experimental', 'bundle2-advertise'):
914 914 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
915 915 caps.append('bundle2=' + urlreq.quote(capsblob))
916 916 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
917 917
918 918 return proto.addcapabilities(repo, caps)
919 919
920 920 # If you are writing an extension and consider wrapping this function. Wrap
921 921 # `_capabilities` instead.
922 922 @wireprotocommand('capabilities', permission='pull',
923 923 transportpolicy=POLICY_V1_ONLY)
924 924 def capabilities(repo, proto):
925 925 caps = _capabilities(repo, proto)
926 926 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
927 927
928 928 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY,
929 929 permission='pull')
930 930 def changegroup(repo, proto, roots):
931 931 nodes = decodelist(roots)
932 932 outgoing = discovery.outgoing(repo, missingroots=nodes,
933 933 missingheads=repo.heads())
934 934 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
935 935 gen = iter(lambda: cg.read(32768), '')
936 936 return wireprototypes.streamres(gen=gen)
937 937
938 938 @wireprotocommand('changegroupsubset', 'bases heads',
939 939 transportpolicy=POLICY_V1_ONLY,
940 940 permission='pull')
941 941 def changegroupsubset(repo, proto, bases, heads):
942 942 bases = decodelist(bases)
943 943 heads = decodelist(heads)
944 944 outgoing = discovery.outgoing(repo, missingroots=bases,
945 945 missingheads=heads)
946 946 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
947 947 gen = iter(lambda: cg.read(32768), '')
948 948 return wireprototypes.streamres(gen=gen)
949 949
950 950 @wireprotocommand('debugwireargs', 'one two *',
951 951 permission='pull', transportpolicy=POLICY_V1_ONLY)
952 952 def debugwireargs(repo, proto, one, two, others):
953 953 # only accept optional args from the known set
954 954 opts = options('debugwireargs', ['three', 'four'], others)
955 955 return wireprototypes.bytesresponse(repo.debugwireargs(
956 956 one, two, **pycompat.strkwargs(opts)))
957 957
958 958 def find_pullbundle(repo, proto, opts, clheads, heads, common):
959 959 """Return a file object for the first matching pullbundle.
960 960
961 961 Pullbundles are specified in .hg/pullbundles.manifest similar to
962 962 clonebundles.
963 963 For each entry, the bundle specification is checked for compatibility:
964 964 - Client features vs the BUNDLESPEC.
965 965 - Revisions shared with the clients vs base revisions of the bundle.
966 966 A bundle can be applied only if all its base revisions are known by
967 967 the client.
968 968 - At least one leaf of the bundle's DAG is missing on the client.
969 969 - Every leaf of the bundle's DAG is part of node set the client wants.
970 970 E.g. do not send a bundle of all changes if the client wants only
971 971 one specific branch of many.
972 972 """
973 973 def decodehexstring(s):
974 974 return set([h.decode('hex') for h in s.split(';')])
975 975
976 976 manifest = repo.vfs.tryread('pullbundles.manifest')
977 977 if not manifest:
978 978 return None
979 979 res = exchange.parseclonebundlesmanifest(repo, manifest)
980 980 res = exchange.filterclonebundleentries(repo, res)
981 981 if not res:
982 982 return None
983 983 cl = repo.changelog
984 984 heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
985 985 common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
986 986 compformats = clientcompressionsupport(proto)
987 987 for entry in res:
988 988 if 'COMPRESSION' in entry and entry['COMPRESSION'] not in compformats:
989 989 continue
990 990 # No test yet for VERSION, since V2 is supported by any client
991 991 # that advertises partial pulls
992 992 if 'heads' in entry:
993 993 try:
994 994 bundle_heads = decodehexstring(entry['heads'])
995 995 except TypeError:
996 996 # Bad heads entry
997 997 continue
998 998 if bundle_heads.issubset(common):
999 999 continue # Nothing new
1000 1000 if all(cl.rev(rev) in common_anc for rev in bundle_heads):
1001 1001 continue # Still nothing new
1002 1002 if any(cl.rev(rev) not in heads_anc and
1003 1003 cl.rev(rev) not in common_anc for rev in bundle_heads):
1004 1004 continue
1005 1005 if 'bases' in entry:
1006 1006 try:
1007 1007 bundle_bases = decodehexstring(entry['bases'])
1008 1008 except TypeError:
1009 1009 # Bad bases entry
1010 1010 continue
1011 1011 if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
1012 1012 continue
1013 1013 path = entry['URL']
1014 1014 repo.ui.debug('sending pullbundle "%s"\n' % path)
1015 1015 try:
1016 1016 return repo.vfs.open(path)
1017 1017 except IOError:
1018 1018 repo.ui.debug('pullbundle "%s" not accessible\n' % path)
1019 1019 continue
1020 1020 return None
1021 1021
1022 1022 @wireprotocommand('getbundle', '*', permission='pull',
1023 1023 transportpolicy=POLICY_V1_ONLY)
1024 1024 def getbundle(repo, proto, others):
1025 1025 opts = options('getbundle', gboptsmap.keys(), others)
1026 1026 for k, v in opts.iteritems():
1027 1027 keytype = gboptsmap[k]
1028 1028 if keytype == 'nodes':
1029 1029 opts[k] = decodelist(v)
1030 1030 elif keytype == 'csv':
1031 1031 opts[k] = list(v.split(','))
1032 1032 elif keytype == 'scsv':
1033 1033 opts[k] = set(v.split(','))
1034 1034 elif keytype == 'boolean':
1035 1035 # Client should serialize False as '0', which is a non-empty string
1036 1036 # so it evaluates as a True bool.
1037 1037 if v == '0':
1038 1038 opts[k] = False
1039 1039 else:
1040 1040 opts[k] = bool(v)
1041 1041 elif keytype != 'plain':
1042 1042 raise KeyError('unknown getbundle option type %s'
1043 1043 % keytype)
1044 1044
1045 1045 if not bundle1allowed(repo, 'pull'):
1046 1046 if not exchange.bundle2requested(opts.get('bundlecaps')):
1047 1047 if proto.name == 'http-v1':
1048 1048 return wireprototypes.ooberror(bundle2required)
1049 1049 raise error.Abort(bundle2requiredmain,
1050 1050 hint=bundle2requiredhint)
1051 1051
1052 1052 prefercompressed = True
1053 1053
1054 1054 try:
1055 1055 clheads = set(repo.changelog.heads())
1056 1056 heads = set(opts.get('heads', set()))
1057 1057 common = set(opts.get('common', set()))
1058 1058 common.discard(nullid)
1059 1059 if (repo.ui.configbool('server', 'pullbundle') and
1060 1060 'partial-pull' in proto.getprotocaps()):
1061 1061 # Check if a pre-built bundle covers this request.
1062 1062 bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
1063 1063 if bundle:
1064 1064 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
1065 1065 prefer_uncompressed=True)
1066 1066
1067 1067 if repo.ui.configbool('server', 'disablefullbundle'):
1068 1068 # Check to see if this is a full clone.
1069 1069 changegroup = opts.get('cg', True)
1070 1070 if changegroup and not common and clheads == heads:
1071 1071 raise error.Abort(
1072 1072 _('server has pull-based clones disabled'),
1073 1073 hint=_('remove --pull if specified or upgrade Mercurial'))
1074 1074
1075 1075 info, chunks = exchange.getbundlechunks(repo, 'serve',
1076 1076 **pycompat.strkwargs(opts))
1077 1077 prefercompressed = info.get('prefercompressed', True)
1078 1078 except error.Abort as exc:
1079 1079 # cleanly forward Abort error to the client
1080 1080 if not exchange.bundle2requested(opts.get('bundlecaps')):
1081 1081 if proto.name == 'http-v1':
1082 1082 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
1083 1083 raise # cannot do better for bundle1 + ssh
1084 1084 # bundle2 request expect a bundle2 reply
1085 1085 bundler = bundle2.bundle20(repo.ui)
1086 1086 manargs = [('message', pycompat.bytestr(exc))]
1087 1087 advargs = []
1088 1088 if exc.hint is not None:
1089 1089 advargs.append(('hint', exc.hint))
1090 1090 bundler.addpart(bundle2.bundlepart('error:abort',
1091 1091 manargs, advargs))
1092 1092 chunks = bundler.getchunks()
1093 1093 prefercompressed = False
1094 1094
1095 1095 return wireprototypes.streamres(
1096 1096 gen=chunks, prefer_uncompressed=not prefercompressed)
1097 1097
1098 1098 @wireprotocommand('heads', permission='pull', transportpolicy=POLICY_V1_ONLY)
1099 1099 def heads(repo, proto):
1100 1100 h = repo.heads()
1101 1101 return wireprototypes.bytesresponse(encodelist(h) + '\n')
1102 1102
1103 1103 @wireprotocommand('hello', permission='pull', transportpolicy=POLICY_V1_ONLY)
1104 1104 def hello(repo, proto):
1105 1105 """Called as part of SSH handshake to obtain server info.
1106 1106
1107 1107 Returns a list of lines describing interesting things about the
1108 1108 server, in an RFC822-like format.
1109 1109
1110 1110 Currently, the only one defined is ``capabilities``, which consists of a
1111 1111 line of space separated tokens describing server abilities:
1112 1112
1113 1113 capabilities: <token0> <token1> <token2>
1114 1114 """
1115 1115 caps = capabilities(repo, proto).data
1116 1116 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
1117 1117
1118 1118 @wireprotocommand('listkeys', 'namespace', permission='pull',
1119 1119 transportpolicy=POLICY_V1_ONLY)
1120 1120 def listkeys(repo, proto, namespace):
1121 1121 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
1122 1122 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
1123 1123
1124 1124 @wireprotocommand('lookup', 'key', permission='pull',
1125 1125 transportpolicy=POLICY_V1_ONLY)
1126 1126 def lookup(repo, proto, key):
1127 1127 try:
1128 1128 k = encoding.tolocal(key)
1129 1129 n = repo.lookup(k)
1130 1130 r = hex(n)
1131 1131 success = 1
1132 1132 except Exception as inst:
1133 1133 r = stringutil.forcebytestr(inst)
1134 1134 success = 0
1135 1135 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
1136 1136
1137 1137 @wireprotocommand('known', 'nodes *', permission='pull',
1138 1138 transportpolicy=POLICY_V1_ONLY)
1139 1139 def known(repo, proto, nodes, others):
1140 1140 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
1141 1141 return wireprototypes.bytesresponse(v)
1142 1142
1143 1143 @wireprotocommand('protocaps', 'caps', permission='pull',
1144 1144 transportpolicy=POLICY_V1_ONLY)
1145 1145 def protocaps(repo, proto, caps):
1146 1146 if proto.name == wireprototypes.SSHV1:
1147 1147 proto._protocaps = set(caps.split(' '))
1148 1148 return wireprototypes.bytesresponse('OK')
1149 1149
1150 1150 @wireprotocommand('pushkey', 'namespace key old new', permission='push',
1151 1151 transportpolicy=POLICY_V1_ONLY)
1152 1152 def pushkey(repo, proto, namespace, key, old, new):
1153 1153 # compatibility with pre-1.8 clients which were accidentally
1154 1154 # sending raw binary nodes rather than utf-8-encoded hex
1155 1155 if len(new) == 20 and stringutil.escapestr(new) != new:
1156 1156 # looks like it could be a binary node
1157 1157 try:
1158 1158 new.decode('utf-8')
1159 1159 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
1160 1160 except UnicodeDecodeError:
1161 1161 pass # binary, leave unmodified
1162 1162 else:
1163 1163 new = encoding.tolocal(new) # normal path
1164 1164
1165 1165 with proto.mayberedirectstdio() as output:
1166 1166 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
1167 1167 encoding.tolocal(old), new) or False
1168 1168
1169 1169 output = output.getvalue() if output else ''
1170 1170 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
1171 1171
1172 1172 @wireprotocommand('stream_out', permission='pull',
1173 1173 transportpolicy=POLICY_V1_ONLY)
1174 1174 def stream(repo, proto):
1175 1175 '''If the server supports streaming clone, it advertises the "stream"
1176 1176 capability with a value representing the version and flags of the repo
1177 1177 it is serving. Client checks to see if it understands the format.
1178 1178 '''
1179 1179 return wireprototypes.streamreslegacy(
1180 1180 streamclone.generatev1wireproto(repo))
1181 1181
1182 1182 @wireprotocommand('unbundle', 'heads', permission='push',
1183 1183 transportpolicy=POLICY_V1_ONLY)
1184 1184 def unbundle(repo, proto, heads):
1185 1185 their_heads = decodelist(heads)
1186 1186
1187 1187 with proto.mayberedirectstdio() as output:
1188 1188 try:
1189 1189 exchange.check_heads(repo, their_heads, 'preparing changes')
1190 1190 cleanup = lambda: None
1191 1191 try:
1192 1192 payload = proto.getpayload()
1193 1193 if repo.ui.configbool('server', 'streamunbundle'):
1194 1194 def cleanup():
1195 1195 # Ensure that the full payload is consumed, so
1196 1196 # that the connection doesn't contain trailing garbage.
1197 1197 for p in payload:
1198 1198 pass
1199 1199 fp = util.chunkbuffer(payload)
1200 1200 else:
1201 1201 # write bundle data to temporary file as it can be big
1202 1202 fp, tempname = None, None
1203 1203 def cleanup():
1204 1204 if fp:
1205 1205 fp.close()
1206 1206 if tempname:
1207 1207 os.unlink(tempname)
1208 1208 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1209 1209 repo.ui.debug('redirecting incoming bundle to %s\n' %
1210 1210 tempname)
1211 1211 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1212 1212 r = 0
1213 1213 for p in payload:
1214 1214 fp.write(p)
1215 1215 fp.seek(0)
1216 1216
1217 1217 gen = exchange.readbundle(repo.ui, fp, None)
1218 1218 if (isinstance(gen, changegroupmod.cg1unpacker)
1219 1219 and not bundle1allowed(repo, 'push')):
1220 1220 if proto.name == 'http-v1':
1221 1221 # need to special case http because stderr do not get to
1222 1222 # the http client on failed push so we need to abuse
1223 1223 # some other error type to make sure the message get to
1224 1224 # the user.
1225 1225 return wireprototypes.ooberror(bundle2required)
1226 1226 raise error.Abort(bundle2requiredmain,
1227 1227 hint=bundle2requiredhint)
1228 1228
1229 1229 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1230 1230 proto.client())
1231 1231 if util.safehasattr(r, 'addpart'):
1232 1232 # The return looks streamable, we are in the bundle2 case
1233 1233 # and should return a stream.
1234 1234 return wireprototypes.streamreslegacy(gen=r.getchunks())
1235 1235 return wireprototypes.pushres(
1236 1236 r, output.getvalue() if output else '')
1237 1237
1238 1238 finally:
1239 1239 cleanup()
1240 1240
1241 1241 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1242 1242 # handle non-bundle2 case first
1243 1243 if not getattr(exc, 'duringunbundle2', False):
1244 1244 try:
1245 1245 raise
1246 1246 except error.Abort:
1247 1247 # The old code we moved used procutil.stderr directly.
1248 1248 # We did not change it to minimise code change.
1249 1249 # This need to be moved to something proper.
1250 1250 # Feel free to do it.
1251 1251 procutil.stderr.write("abort: %s\n" % exc)
1252 1252 if exc.hint is not None:
1253 1253 procutil.stderr.write("(%s)\n" % exc.hint)
1254 1254 procutil.stderr.flush()
1255 1255 return wireprototypes.pushres(
1256 1256 0, output.getvalue() if output else '')
1257 1257 except error.PushRaced:
1258 1258 return wireprototypes.pusherr(
1259 1259 pycompat.bytestr(exc),
1260 1260 output.getvalue() if output else '')
1261 1261
1262 1262 bundler = bundle2.bundle20(repo.ui)
1263 1263 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1264 1264 bundler.addpart(out)
1265 1265 try:
1266 1266 try:
1267 1267 raise
1268 1268 except error.PushkeyFailed as exc:
1269 1269 # check client caps
1270 1270 remotecaps = getattr(exc, '_replycaps', None)
1271 1271 if (remotecaps is not None
1272 1272 and 'pushkey' not in remotecaps.get('error', ())):
1273 1273 # no support remote side, fallback to Abort handler.
1274 1274 raise
1275 1275 part = bundler.newpart('error:pushkey')
1276 1276 part.addparam('in-reply-to', exc.partid)
1277 1277 if exc.namespace is not None:
1278 1278 part.addparam('namespace', exc.namespace,
1279 1279 mandatory=False)
1280 1280 if exc.key is not None:
1281 1281 part.addparam('key', exc.key, mandatory=False)
1282 1282 if exc.new is not None:
1283 1283 part.addparam('new', exc.new, mandatory=False)
1284 1284 if exc.old is not None:
1285 1285 part.addparam('old', exc.old, mandatory=False)
1286 1286 if exc.ret is not None:
1287 1287 part.addparam('ret', exc.ret, mandatory=False)
1288 1288 except error.BundleValueError as exc:
1289 1289 errpart = bundler.newpart('error:unsupportedcontent')
1290 1290 if exc.parttype is not None:
1291 1291 errpart.addparam('parttype', exc.parttype)
1292 1292 if exc.params:
1293 1293 errpart.addparam('params', '\0'.join(exc.params))
1294 1294 except error.Abort as exc:
1295 1295 manargs = [('message', stringutil.forcebytestr(exc))]
1296 1296 advargs = []
1297 1297 if exc.hint is not None:
1298 1298 advargs.append(('hint', exc.hint))
1299 1299 bundler.addpart(bundle2.bundlepart('error:abort',
1300 1300 manargs, advargs))
1301 1301 except error.PushRaced as exc:
1302 1302 bundler.newpart('error:pushraced',
1303 1303 [('message', stringutil.forcebytestr(exc))])
1304 1304 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
1305 1305
1306 1306 # Wire protocol version 2 commands only past this point.
1307 1307
1308 1308 def _capabilitiesv2(repo, proto):
1309 1309 """Obtain the set of capabilities for version 2 transports.
1310 1310
1311 1311 These capabilities are distinct from the capabilities for version 1
1312 1312 transports.
1313 1313 """
1314 1314 compression = []
1315 1315 for engine in supportedcompengines(repo.ui, util.SERVERROLE):
1316 1316 compression.append({
1317 1317 b'name': engine.wireprotosupport().name,
1318 1318 })
1319 1319
1320 1320 caps = {
1321 1321 'commands': {},
1322 1322 'compression': compression,
1323 1323 }
1324 1324
1325 1325 for command, entry in commandsv2.items():
1326 1326 caps['commands'][command] = {
1327 1327 'args': entry.args,
1328 1328 'permissions': [entry.permission],
1329 1329 }
1330 1330
1331 1331 return proto.addcapabilities(repo, caps)
1332 1332
1333 1333 @wireprotocommand('branchmap', permission='pull',
1334 1334 transportpolicy=POLICY_V2_ONLY)
1335 1335 def branchmapv2(repo, proto):
1336 1336 branchmap = {encoding.fromlocal(k): v
1337 1337 for k, v in repo.branchmap().iteritems()}
1338 1338
1339 1339 return wireprototypes.cborresponse(branchmap)
1340 1340
1341 1341 @wireprotocommand('capabilities', permission='pull',
1342 1342 transportpolicy=POLICY_V2_ONLY)
1343 1343 def capabilitiesv2(repo, proto):
1344 1344 caps = _capabilitiesv2(repo, proto)
1345 1345
1346 1346 return wireprototypes.cborresponse(caps)
1347 1347
1348 1348 @wireprotocommand('heads',
1349 1349 args={
1350 1350 'publiconly': False,
1351 1351 },
1352 1352 permission='pull',
1353 1353 transportpolicy=POLICY_V2_ONLY)
1354 1354 def headsv2(repo, proto, publiconly=False):
1355 1355 if publiconly:
1356 1356 repo = repo.filtered('immutable')
1357 1357
1358 1358 return wireprototypes.cborresponse(repo.heads())
1359 1359
1360 1360 @wireprotocommand('known',
1361 1361 args={
1362 1362 'nodes': [b'deadbeef'],
1363 1363 },
1364 1364 permission='pull',
1365 1365 transportpolicy=POLICY_V2_ONLY)
1366 1366 def knownv2(repo, proto, nodes=None):
1367 1367 nodes = nodes or []
1368 1368 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
1369 1369 return wireprototypes.cborresponse(result)
1370 1370
1371 1371 @wireprotocommand('listkeys',
1372 1372 args={
1373 1373 'namespace': b'ns',
1374 1374 },
1375 1375 permission='pull',
1376 1376 transportpolicy=POLICY_V2_ONLY)
1377 1377 def listkeysv2(repo, proto, namespace=None):
1378 1378 keys = repo.listkeys(encoding.tolocal(namespace))
1379 1379 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
1380 1380 for k, v in keys.iteritems()}
1381 1381
1382 1382 return wireprototypes.cborresponse(keys)
1383 1383
1384 1384 @wireprotocommand('lookup',
1385 1385 args={
1386 1386 'key': b'foo',
1387 1387 },
1388 1388 permission='pull',
1389 1389 transportpolicy=POLICY_V2_ONLY)
1390 1390 def lookupv2(repo, proto, key):
1391 1391 key = encoding.tolocal(key)
1392 1392
1393 1393 # TODO handle exception.
1394 1394 node = repo.lookup(key)
1395 1395
1396 1396 return wireprototypes.cborresponse(node)
1397 1397
1398 1398 @wireprotocommand('pushkey',
1399 1399 args={
1400 1400 'namespace': b'ns',
1401 1401 'key': b'key',
1402 1402 'old': b'old',
1403 1403 'new': b'new',
1404 1404 },
1405 1405 permission='push',
1406 1406 transportpolicy=POLICY_V2_ONLY)
1407 1407 def pushkeyv2(repo, proto, namespace, key, old, new):
1408 1408 # TODO handle ui output redirection
1409 1409 r = repo.pushkey(encoding.tolocal(namespace),
1410 1410 encoding.tolocal(key),
1411 1411 encoding.tolocal(old),
1412 1412 encoding.tolocal(new))
1413 1413
1414 1414 return wireprototypes.cborresponse(r)
@@ -1,45 +1,55
1 1 HTTPV2=exp-http-v2-0001
2 2 MEDIATYPE=application/mercurial-exp-framing-0003
3 3
4 4 sendhttpraw() {
5 5 hg --verbose debugwireproto --peer raw http://$LOCALIP:$HGPORT/
6 6 }
7 7
8 8 sendhttpv2peer() {
9 9 hg --verbose debugwireproto --peer http2 http://$LOCALIP:$HGPORT/
10 10 }
11 11
12 12 cat > dummycommands.py << EOF
13 13 from mercurial import (
14 14 wireprototypes,
15 15 wireproto,
16 16 )
17 17
18 18 @wireproto.wireprotocommand('customreadonly', permission='pull')
19 def customreadonly(repo, proto):
19 def customreadonlyv1(repo, proto):
20 return wireprototypes.bytesresponse(b'customreadonly bytes response')
21
22 @wireproto.wireprotocommand('customreadonly', permission='pull',
23 transportpolicy=wireproto.POLICY_V2_ONLY)
24 def customreadonlyv2(repo, proto):
20 25 return wireprototypes.bytesresponse(b'customreadonly bytes response')
21 26
22 27 @wireproto.wireprotocommand('customreadwrite', permission='push')
23 28 def customreadwrite(repo, proto):
24 29 return wireprototypes.bytesresponse(b'customreadwrite bytes response')
30
31 @wireproto.wireprotocommand('customreadwrite', permission='push',
32 transportpolicy=wireproto.POLICY_V2_ONLY)
33 def customreadwritev2(repo, proto):
34 return wireprototypes.bytesresponse(b'customreadwrite bytes response')
25 35 EOF
26 36
27 37 cat >> $HGRCPATH << EOF
28 38 [extensions]
29 39 drawdag = $TESTDIR/drawdag.py
30 40 EOF
31 41
32 42 enabledummycommands() {
33 43 cat >> $HGRCPATH << EOF
34 44 [extensions]
35 45 dummycommands = $TESTTMP/dummycommands.py
36 46 EOF
37 47 }
38 48
39 49 enablehttpv2() {
40 50 cat >> $1/.hg/hgrc << EOF
41 51 [experimental]
42 52 web.apiserver = true
43 53 web.api.http-v2 = true
44 54 EOF
45 55 }
General Comments 0
You need to be logged in to leave comments. Login now