##// END OF EJS Templates
wireproto: improve docstring for @wireprotocommand...
Gregory Szorc -
r35998:b4976912 default
parent child Browse files
Show More
@@ -1,1033 +1,1039
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 )
35 35
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 41 'IncompatibleClient')
42 42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43 43
44 44 class remoteiterbatcher(peer.iterbatcher):
45 45 def __init__(self, remote):
46 46 super(remoteiterbatcher, self).__init__()
47 47 self._remote = remote
48 48
49 49 def __getattr__(self, name):
50 50 # Validate this method is batchable, since submit() only supports
51 51 # batchable methods.
52 52 fn = getattr(self._remote, name)
53 53 if not getattr(fn, 'batchable', None):
54 54 raise error.ProgrammingError('Attempted to batch a non-batchable '
55 55 'call to %r' % name)
56 56
57 57 return super(remoteiterbatcher, self).__getattr__(name)
58 58
59 59 def submit(self):
60 60 """Break the batch request into many patch calls and pipeline them.
61 61
62 62 This is mostly valuable over http where request sizes can be
63 63 limited, but can be used in other places as well.
64 64 """
65 65 # 2-tuple of (command, arguments) that represents what will be
66 66 # sent over the wire.
67 67 requests = []
68 68
69 69 # 4-tuple of (command, final future, @batchable generator, remote
70 70 # future).
71 71 results = []
72 72
73 73 for command, args, opts, finalfuture in self.calls:
74 74 mtd = getattr(self._remote, command)
75 75 batchable = mtd.batchable(mtd.__self__, *args, **opts)
76 76
77 77 commandargs, fremote = next(batchable)
78 78 assert fremote
79 79 requests.append((command, commandargs))
80 80 results.append((command, finalfuture, batchable, fremote))
81 81
82 82 if requests:
83 83 self._resultiter = self._remote._submitbatch(requests)
84 84
85 85 self._results = results
86 86
87 87 def results(self):
88 88 for command, finalfuture, batchable, remotefuture in self._results:
89 89 # Get the raw result, set it in the remote future, feed it
90 90 # back into the @batchable generator so it can be decoded, and
91 91 # set the result on the final future to this value.
92 92 remoteresult = next(self._resultiter)
93 93 remotefuture.set(remoteresult)
94 94 finalfuture.set(next(batchable))
95 95
96 96 # Verify our @batchable generators only emit 2 values.
97 97 try:
98 98 next(batchable)
99 99 except StopIteration:
100 100 pass
101 101 else:
102 102 raise error.ProgrammingError('%s @batchable generator emitted '
103 103 'unexpected value count' % command)
104 104
105 105 yield finalfuture.value
106 106
107 107 # Forward a couple of names from peer to make wireproto interactions
108 108 # slightly more sensible.
109 109 batchable = peer.batchable
110 110 future = peer.future
111 111
112 112 # list of nodes encoding / decoding
113 113
114 114 def decodelist(l, sep=' '):
115 115 if l:
116 116 return [bin(v) for v in l.split(sep)]
117 117 return []
118 118
119 119 def encodelist(l, sep=' '):
120 120 try:
121 121 return sep.join(map(hex, l))
122 122 except TypeError:
123 123 raise
124 124
125 125 # batched call argument encoding
126 126
127 127 def escapearg(plain):
128 128 return (plain
129 129 .replace(':', ':c')
130 130 .replace(',', ':o')
131 131 .replace(';', ':s')
132 132 .replace('=', ':e'))
133 133
134 134 def unescapearg(escaped):
135 135 return (escaped
136 136 .replace(':e', '=')
137 137 .replace(':s', ';')
138 138 .replace(':o', ',')
139 139 .replace(':c', ':'))
140 140
141 141 def encodebatchcmds(req):
142 142 """Return a ``cmds`` argument value for the ``batch`` command."""
143 143 cmds = []
144 144 for op, argsdict in req:
145 145 # Old servers didn't properly unescape argument names. So prevent
146 146 # the sending of argument names that may not be decoded properly by
147 147 # servers.
148 148 assert all(escapearg(k) == k for k in argsdict)
149 149
150 150 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
151 151 for k, v in argsdict.iteritems())
152 152 cmds.append('%s %s' % (op, args))
153 153
154 154 return ';'.join(cmds)
155 155
156 156 # mapping of options accepted by getbundle and their types
157 157 #
158 158 # Meant to be extended by extensions. It is extensions responsibility to ensure
159 159 # such options are properly processed in exchange.getbundle.
160 160 #
161 161 # supported types are:
162 162 #
163 163 # :nodes: list of binary nodes
164 164 # :csv: list of comma-separated values
165 165 # :scsv: list of comma-separated values return as set
166 166 # :plain: string with no transformation needed.
167 167 gboptsmap = {'heads': 'nodes',
168 168 'bookmarks': 'boolean',
169 169 'common': 'nodes',
170 170 'obsmarkers': 'boolean',
171 171 'phases': 'boolean',
172 172 'bundlecaps': 'scsv',
173 173 'listkeys': 'csv',
174 174 'cg': 'boolean',
175 175 'cbattempted': 'boolean',
176 176 'stream': 'boolean',
177 177 }
178 178
179 179 # client side
180 180
181 181 class wirepeer(repository.legacypeer):
182 182 """Client-side interface for communicating with a peer repository.
183 183
184 184 Methods commonly call wire protocol commands of the same name.
185 185
186 186 See also httppeer.py and sshpeer.py for protocol-specific
187 187 implementations of this interface.
188 188 """
189 189 # Begin of basewirepeer interface.
190 190
191 191 def iterbatch(self):
192 192 return remoteiterbatcher(self)
193 193
194 194 @batchable
195 195 def lookup(self, key):
196 196 self.requirecap('lookup', _('look up remote revision'))
197 197 f = future()
198 198 yield {'key': encoding.fromlocal(key)}, f
199 199 d = f.value
200 200 success, data = d[:-1].split(" ", 1)
201 201 if int(success):
202 202 yield bin(data)
203 203 else:
204 204 self._abort(error.RepoError(data))
205 205
206 206 @batchable
207 207 def heads(self):
208 208 f = future()
209 209 yield {}, f
210 210 d = f.value
211 211 try:
212 212 yield decodelist(d[:-1])
213 213 except ValueError:
214 214 self._abort(error.ResponseError(_("unexpected response:"), d))
215 215
216 216 @batchable
217 217 def known(self, nodes):
218 218 f = future()
219 219 yield {'nodes': encodelist(nodes)}, f
220 220 d = f.value
221 221 try:
222 222 yield [bool(int(b)) for b in d]
223 223 except ValueError:
224 224 self._abort(error.ResponseError(_("unexpected response:"), d))
225 225
226 226 @batchable
227 227 def branchmap(self):
228 228 f = future()
229 229 yield {}, f
230 230 d = f.value
231 231 try:
232 232 branchmap = {}
233 233 for branchpart in d.splitlines():
234 234 branchname, branchheads = branchpart.split(' ', 1)
235 235 branchname = encoding.tolocal(urlreq.unquote(branchname))
236 236 branchheads = decodelist(branchheads)
237 237 branchmap[branchname] = branchheads
238 238 yield branchmap
239 239 except TypeError:
240 240 self._abort(error.ResponseError(_("unexpected response:"), d))
241 241
242 242 @batchable
243 243 def listkeys(self, namespace):
244 244 if not self.capable('pushkey'):
245 245 yield {}, None
246 246 f = future()
247 247 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
248 248 yield {'namespace': encoding.fromlocal(namespace)}, f
249 249 d = f.value
250 250 self.ui.debug('received listkey for "%s": %i bytes\n'
251 251 % (namespace, len(d)))
252 252 yield pushkeymod.decodekeys(d)
253 253
254 254 @batchable
255 255 def pushkey(self, namespace, key, old, new):
256 256 if not self.capable('pushkey'):
257 257 yield False, None
258 258 f = future()
259 259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
260 260 yield {'namespace': encoding.fromlocal(namespace),
261 261 'key': encoding.fromlocal(key),
262 262 'old': encoding.fromlocal(old),
263 263 'new': encoding.fromlocal(new)}, f
264 264 d = f.value
265 265 d, output = d.split('\n', 1)
266 266 try:
267 267 d = bool(int(d))
268 268 except ValueError:
269 269 raise error.ResponseError(
270 270 _('push failed (unexpected response):'), d)
271 271 for l in output.splitlines(True):
272 272 self.ui.status(_('remote: '), l)
273 273 yield d
274 274
275 275 def stream_out(self):
276 276 return self._callstream('stream_out')
277 277
278 278 def getbundle(self, source, **kwargs):
279 279 kwargs = pycompat.byteskwargs(kwargs)
280 280 self.requirecap('getbundle', _('look up remote changes'))
281 281 opts = {}
282 282 bundlecaps = kwargs.get('bundlecaps')
283 283 if bundlecaps is not None:
284 284 kwargs['bundlecaps'] = sorted(bundlecaps)
285 285 else:
286 286 bundlecaps = () # kwargs could have it to None
287 287 for key, value in kwargs.iteritems():
288 288 if value is None:
289 289 continue
290 290 keytype = gboptsmap.get(key)
291 291 if keytype is None:
292 292 raise error.ProgrammingError(
293 293 'Unexpectedly None keytype for key %s' % key)
294 294 elif keytype == 'nodes':
295 295 value = encodelist(value)
296 296 elif keytype in ('csv', 'scsv'):
297 297 value = ','.join(value)
298 298 elif keytype == 'boolean':
299 299 value = '%i' % bool(value)
300 300 elif keytype != 'plain':
301 301 raise KeyError('unknown getbundle option type %s'
302 302 % keytype)
303 303 opts[key] = value
304 304 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
305 305 if any((cap.startswith('HG2') for cap in bundlecaps)):
306 306 return bundle2.getunbundler(self.ui, f)
307 307 else:
308 308 return changegroupmod.cg1unpacker(f, 'UN')
309 309
310 310 def unbundle(self, cg, heads, url):
311 311 '''Send cg (a readable file-like object representing the
312 312 changegroup to push, typically a chunkbuffer object) to the
313 313 remote server as a bundle.
314 314
315 315 When pushing a bundle10 stream, return an integer indicating the
316 316 result of the push (see changegroup.apply()).
317 317
318 318 When pushing a bundle20 stream, return a bundle20 stream.
319 319
320 320 `url` is the url the client thinks it's pushing to, which is
321 321 visible to hooks.
322 322 '''
323 323
324 324 if heads != ['force'] and self.capable('unbundlehash'):
325 325 heads = encodelist(['hashed',
326 326 hashlib.sha1(''.join(sorted(heads))).digest()])
327 327 else:
328 328 heads = encodelist(heads)
329 329
330 330 if util.safehasattr(cg, 'deltaheader'):
331 331 # this a bundle10, do the old style call sequence
332 332 ret, output = self._callpush("unbundle", cg, heads=heads)
333 333 if ret == "":
334 334 raise error.ResponseError(
335 335 _('push failed:'), output)
336 336 try:
337 337 ret = int(ret)
338 338 except ValueError:
339 339 raise error.ResponseError(
340 340 _('push failed (unexpected response):'), ret)
341 341
342 342 for l in output.splitlines(True):
343 343 self.ui.status(_('remote: '), l)
344 344 else:
345 345 # bundle2 push. Send a stream, fetch a stream.
346 346 stream = self._calltwowaystream('unbundle', cg, heads=heads)
347 347 ret = bundle2.getunbundler(self.ui, stream)
348 348 return ret
349 349
350 350 # End of basewirepeer interface.
351 351
352 352 # Begin of baselegacywirepeer interface.
353 353
354 354 def branches(self, nodes):
355 355 n = encodelist(nodes)
356 356 d = self._call("branches", nodes=n)
357 357 try:
358 358 br = [tuple(decodelist(b)) for b in d.splitlines()]
359 359 return br
360 360 except ValueError:
361 361 self._abort(error.ResponseError(_("unexpected response:"), d))
362 362
363 363 def between(self, pairs):
364 364 batch = 8 # avoid giant requests
365 365 r = []
366 366 for i in xrange(0, len(pairs), batch):
367 367 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
368 368 d = self._call("between", pairs=n)
369 369 try:
370 370 r.extend(l and decodelist(l) or [] for l in d.splitlines())
371 371 except ValueError:
372 372 self._abort(error.ResponseError(_("unexpected response:"), d))
373 373 return r
374 374
375 375 def changegroup(self, nodes, kind):
376 376 n = encodelist(nodes)
377 377 f = self._callcompressable("changegroup", roots=n)
378 378 return changegroupmod.cg1unpacker(f, 'UN')
379 379
380 380 def changegroupsubset(self, bases, heads, kind):
381 381 self.requirecap('changegroupsubset', _('look up remote changes'))
382 382 bases = encodelist(bases)
383 383 heads = encodelist(heads)
384 384 f = self._callcompressable("changegroupsubset",
385 385 bases=bases, heads=heads)
386 386 return changegroupmod.cg1unpacker(f, 'UN')
387 387
388 388 # End of baselegacywirepeer interface.
389 389
390 390 def _submitbatch(self, req):
391 391 """run batch request <req> on the server
392 392
393 393 Returns an iterator of the raw responses from the server.
394 394 """
395 395 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
396 396 chunk = rsp.read(1024)
397 397 work = [chunk]
398 398 while chunk:
399 399 while ';' not in chunk and chunk:
400 400 chunk = rsp.read(1024)
401 401 work.append(chunk)
402 402 merged = ''.join(work)
403 403 while ';' in merged:
404 404 one, merged = merged.split(';', 1)
405 405 yield unescapearg(one)
406 406 chunk = rsp.read(1024)
407 407 work = [merged, chunk]
408 408 yield unescapearg(''.join(work))
409 409
410 410 def _submitone(self, op, args):
411 411 return self._call(op, **pycompat.strkwargs(args))
412 412
413 413 def debugwireargs(self, one, two, three=None, four=None, five=None):
414 414 # don't pass optional arguments left at their default value
415 415 opts = {}
416 416 if three is not None:
417 417 opts[r'three'] = three
418 418 if four is not None:
419 419 opts[r'four'] = four
420 420 return self._call('debugwireargs', one=one, two=two, **opts)
421 421
422 422 def _call(self, cmd, **args):
423 423 """execute <cmd> on the server
424 424
425 425 The command is expected to return a simple string.
426 426
427 427 returns the server reply as a string."""
428 428 raise NotImplementedError()
429 429
430 430 def _callstream(self, cmd, **args):
431 431 """execute <cmd> on the server
432 432
433 433 The command is expected to return a stream. Note that if the
434 434 command doesn't return a stream, _callstream behaves
435 435 differently for ssh and http peers.
436 436
437 437 returns the server reply as a file like object.
438 438 """
439 439 raise NotImplementedError()
440 440
441 441 def _callcompressable(self, cmd, **args):
442 442 """execute <cmd> on the server
443 443
444 444 The command is expected to return a stream.
445 445
446 446 The stream may have been compressed in some implementations. This
447 447 function takes care of the decompression. This is the only difference
448 448 with _callstream.
449 449
450 450 returns the server reply as a file like object.
451 451 """
452 452 raise NotImplementedError()
453 453
454 454 def _callpush(self, cmd, fp, **args):
455 455 """execute a <cmd> on server
456 456
457 457 The command is expected to be related to a push. Push has a special
458 458 return method.
459 459
460 460 returns the server reply as a (ret, output) tuple. ret is either
461 461 empty (error) or a stringified int.
462 462 """
463 463 raise NotImplementedError()
464 464
465 465 def _calltwowaystream(self, cmd, fp, **args):
466 466 """execute <cmd> on server
467 467
468 468 The command will send a stream to the server and get a stream in reply.
469 469 """
470 470 raise NotImplementedError()
471 471
472 472 def _abort(self, exception):
473 473 """clearly abort the wire protocol connection and raise the exception
474 474 """
475 475 raise NotImplementedError()
476 476
477 477 # server side
478 478
479 479 # wire protocol command can either return a string or one of these classes.
480 480 class streamres(object):
481 481 """wireproto reply: binary stream
482 482
483 483 The call was successful and the result is a stream.
484 484
485 485 Accepts a generator containing chunks of data to be sent to the client.
486 486
487 487 ``prefer_uncompressed`` indicates that the data is expected to be
488 488 uncompressable and that the stream should therefore use the ``none``
489 489 engine.
490 490 """
491 491 def __init__(self, gen=None, prefer_uncompressed=False):
492 492 self.gen = gen
493 493 self.prefer_uncompressed = prefer_uncompressed
494 494
495 495 class streamres_legacy(object):
496 496 """wireproto reply: uncompressed binary stream
497 497
498 498 The call was successful and the result is a stream.
499 499
500 500 Accepts a generator containing chunks of data to be sent to the client.
501 501
502 502 Like ``streamres``, but sends an uncompressed data for "version 1" clients
503 503 using the application/mercurial-0.1 media type.
504 504 """
505 505 def __init__(self, gen=None):
506 506 self.gen = gen
507 507
508 508 class pushres(object):
509 509 """wireproto reply: success with simple integer return
510 510
511 511 The call was successful and returned an integer contained in `self.res`.
512 512 """
513 513 def __init__(self, res):
514 514 self.res = res
515 515
516 516 class pusherr(object):
517 517 """wireproto reply: failure
518 518
519 519 The call failed. The `self.res` attribute contains the error message.
520 520 """
521 521 def __init__(self, res):
522 522 self.res = res
523 523
524 524 class ooberror(object):
525 525 """wireproto reply: failure of a batch of operation
526 526
527 527 Something failed during a batch call. The error message is stored in
528 528 `self.message`.
529 529 """
530 530 def __init__(self, message):
531 531 self.message = message
532 532
533 533 def getdispatchrepo(repo, proto, command):
534 534 """Obtain the repo used for processing wire protocol commands.
535 535
536 536 The intent of this function is to serve as a monkeypatch point for
537 537 extensions that need commands to operate on different repo views under
538 538 specialized circumstances.
539 539 """
540 540 return repo.filtered('served')
541 541
542 542 def dispatch(repo, proto, command):
543 543 repo = getdispatchrepo(repo, proto, command)
544 544 func, spec = commands[command]
545 545 args = proto.getargs(spec)
546 546 return func(repo, proto, *args)
547 547
548 548 def options(cmd, keys, others):
549 549 opts = {}
550 550 for k in keys:
551 551 if k in others:
552 552 opts[k] = others[k]
553 553 del others[k]
554 554 if others:
555 555 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
556 556 % (cmd, ",".join(others)))
557 557 return opts
558 558
559 559 def bundle1allowed(repo, action):
560 560 """Whether a bundle1 operation is allowed from the server.
561 561
562 562 Priority is:
563 563
564 564 1. server.bundle1gd.<action> (if generaldelta active)
565 565 2. server.bundle1.<action>
566 566 3. server.bundle1gd (if generaldelta active)
567 567 4. server.bundle1
568 568 """
569 569 ui = repo.ui
570 570 gd = 'generaldelta' in repo.requirements
571 571
572 572 if gd:
573 573 v = ui.configbool('server', 'bundle1gd.%s' % action)
574 574 if v is not None:
575 575 return v
576 576
577 577 v = ui.configbool('server', 'bundle1.%s' % action)
578 578 if v is not None:
579 579 return v
580 580
581 581 if gd:
582 582 v = ui.configbool('server', 'bundle1gd')
583 583 if v is not None:
584 584 return v
585 585
586 586 return ui.configbool('server', 'bundle1')
587 587
588 588 def supportedcompengines(ui, proto, role):
589 589 """Obtain the list of supported compression engines for a request."""
590 590 assert role in (util.CLIENTROLE, util.SERVERROLE)
591 591
592 592 compengines = util.compengines.supportedwireengines(role)
593 593
594 594 # Allow config to override default list and ordering.
595 595 if role == util.SERVERROLE:
596 596 configengines = ui.configlist('server', 'compressionengines')
597 597 config = 'server.compressionengines'
598 598 else:
599 599 # This is currently implemented mainly to facilitate testing. In most
600 600 # cases, the server should be in charge of choosing a compression engine
601 601 # because a server has the most to lose from a sub-optimal choice. (e.g.
602 602 # CPU DoS due to an expensive engine or a network DoS due to poor
603 603 # compression ratio).
604 604 configengines = ui.configlist('experimental',
605 605 'clientcompressionengines')
606 606 config = 'experimental.clientcompressionengines'
607 607
608 608 # No explicit config. Filter out the ones that aren't supposed to be
609 609 # advertised and return default ordering.
610 610 if not configengines:
611 611 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
612 612 return [e for e in compengines
613 613 if getattr(e.wireprotosupport(), attr) > 0]
614 614
615 615 # If compression engines are listed in the config, assume there is a good
616 616 # reason for it (like server operators wanting to achieve specific
617 617 # performance characteristics). So fail fast if the config references
618 618 # unusable compression engines.
619 619 validnames = set(e.name() for e in compengines)
620 620 invalidnames = set(e for e in configengines if e not in validnames)
621 621 if invalidnames:
622 622 raise error.Abort(_('invalid compression engine defined in %s: %s') %
623 623 (config, ', '.join(sorted(invalidnames))))
624 624
625 625 compengines = [e for e in compengines if e.name() in configengines]
626 626 compengines = sorted(compengines,
627 627 key=lambda e: configengines.index(e.name()))
628 628
629 629 if not compengines:
630 630 raise error.Abort(_('%s config option does not specify any known '
631 631 'compression engines') % config,
632 632 hint=_('usable compression engines: %s') %
633 633 ', '.sorted(validnames))
634 634
635 635 return compengines
636 636
637 637 # list of commands
638 638 commands = {}
639 639
640 640 def wireprotocommand(name, args=''):
641 """decorator for wire protocol command"""
641 """Decorator to declare a wire protocol command.
642
643 ``name`` is the name of the wire protocol command being provided.
644
645 ``args`` is a space-delimited list of named arguments that the command
646 accepts. ``*`` is a special value that says to accept all arguments.
647 """
642 648 def register(func):
643 649 commands[name] = (func, args)
644 650 return func
645 651 return register
646 652
647 653 @wireprotocommand('batch', 'cmds *')
648 654 def batch(repo, proto, cmds, others):
649 655 repo = repo.filtered("served")
650 656 res = []
651 657 for pair in cmds.split(';'):
652 658 op, args = pair.split(' ', 1)
653 659 vals = {}
654 660 for a in args.split(','):
655 661 if a:
656 662 n, v = a.split('=')
657 663 vals[unescapearg(n)] = unescapearg(v)
658 664 func, spec = commands[op]
659 665 if spec:
660 666 keys = spec.split()
661 667 data = {}
662 668 for k in keys:
663 669 if k == '*':
664 670 star = {}
665 671 for key in vals.keys():
666 672 if key not in keys:
667 673 star[key] = vals[key]
668 674 data['*'] = star
669 675 else:
670 676 data[k] = vals[k]
671 677 result = func(repo, proto, *[data[k] for k in keys])
672 678 else:
673 679 result = func(repo, proto)
674 680 if isinstance(result, ooberror):
675 681 return result
676 682 res.append(escapearg(result))
677 683 return ';'.join(res)
678 684
679 685 @wireprotocommand('between', 'pairs')
680 686 def between(repo, proto, pairs):
681 687 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
682 688 r = []
683 689 for b in repo.between(pairs):
684 690 r.append(encodelist(b) + "\n")
685 691 return "".join(r)
686 692
687 693 @wireprotocommand('branchmap')
688 694 def branchmap(repo, proto):
689 695 branchmap = repo.branchmap()
690 696 heads = []
691 697 for branch, nodes in branchmap.iteritems():
692 698 branchname = urlreq.quote(encoding.fromlocal(branch))
693 699 branchnodes = encodelist(nodes)
694 700 heads.append('%s %s' % (branchname, branchnodes))
695 701 return '\n'.join(heads)
696 702
697 703 @wireprotocommand('branches', 'nodes')
698 704 def branches(repo, proto, nodes):
699 705 nodes = decodelist(nodes)
700 706 r = []
701 707 for b in repo.branches(nodes):
702 708 r.append(encodelist(b) + "\n")
703 709 return "".join(r)
704 710
705 711 @wireprotocommand('clonebundles', '')
706 712 def clonebundles(repo, proto):
707 713 """Server command for returning info for available bundles to seed clones.
708 714
709 715 Clients will parse this response and determine what bundle to fetch.
710 716
711 717 Extensions may wrap this command to filter or dynamically emit data
712 718 depending on the request. e.g. you could advertise URLs for the closest
713 719 data center given the client's IP address.
714 720 """
715 721 return repo.vfs.tryread('clonebundles.manifest')
716 722
717 723 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
718 724 'known', 'getbundle', 'unbundlehash', 'batch']
719 725
720 726 def _capabilities(repo, proto):
721 727 """return a list of capabilities for a repo
722 728
723 729 This function exists to allow extensions to easily wrap capabilities
724 730 computation
725 731
726 732 - returns a lists: easy to alter
727 733 - change done here will be propagated to both `capabilities` and `hello`
728 734 command without any other action needed.
729 735 """
730 736 # copy to prevent modification of the global list
731 737 caps = list(wireprotocaps)
732 738 if streamclone.allowservergeneration(repo):
733 739 if repo.ui.configbool('server', 'preferuncompressed'):
734 740 caps.append('stream-preferred')
735 741 requiredformats = repo.requirements & repo.supportedformats
736 742 # if our local revlogs are just revlogv1, add 'stream' cap
737 743 if not requiredformats - {'revlogv1'}:
738 744 caps.append('stream')
739 745 # otherwise, add 'streamreqs' detailing our local revlog format
740 746 else:
741 747 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
742 748 if repo.ui.configbool('experimental', 'bundle2-advertise'):
743 749 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
744 750 caps.append('bundle2=' + urlreq.quote(capsblob))
745 751 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
746 752
747 753 if proto.name == 'http':
748 754 caps.append('httpheader=%d' %
749 755 repo.ui.configint('server', 'maxhttpheaderlen'))
750 756 if repo.ui.configbool('experimental', 'httppostargs'):
751 757 caps.append('httppostargs')
752 758
753 759 # FUTURE advertise 0.2rx once support is implemented
754 760 # FUTURE advertise minrx and mintx after consulting config option
755 761 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
756 762
757 763 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
758 764 if compengines:
759 765 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
760 766 for e in compengines)
761 767 caps.append('compression=%s' % comptypes)
762 768
763 769 return caps
764 770
765 771 # If you are writing an extension and consider wrapping this function. Wrap
766 772 # `_capabilities` instead.
767 773 @wireprotocommand('capabilities')
768 774 def capabilities(repo, proto):
769 775 return ' '.join(_capabilities(repo, proto))
770 776
771 777 @wireprotocommand('changegroup', 'roots')
772 778 def changegroup(repo, proto, roots):
773 779 nodes = decodelist(roots)
774 780 outgoing = discovery.outgoing(repo, missingroots=nodes,
775 781 missingheads=repo.heads())
776 782 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
777 783 gen = iter(lambda: cg.read(32768), '')
778 784 return streamres(gen=gen)
779 785
780 786 @wireprotocommand('changegroupsubset', 'bases heads')
781 787 def changegroupsubset(repo, proto, bases, heads):
782 788 bases = decodelist(bases)
783 789 heads = decodelist(heads)
784 790 outgoing = discovery.outgoing(repo, missingroots=bases,
785 791 missingheads=heads)
786 792 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
787 793 gen = iter(lambda: cg.read(32768), '')
788 794 return streamres(gen=gen)
789 795
790 796 @wireprotocommand('debugwireargs', 'one two *')
791 797 def debugwireargs(repo, proto, one, two, others):
792 798 # only accept optional args from the known set
793 799 opts = options('debugwireargs', ['three', 'four'], others)
794 800 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
795 801
796 802 @wireprotocommand('getbundle', '*')
797 803 def getbundle(repo, proto, others):
798 804 opts = options('getbundle', gboptsmap.keys(), others)
799 805 for k, v in opts.iteritems():
800 806 keytype = gboptsmap[k]
801 807 if keytype == 'nodes':
802 808 opts[k] = decodelist(v)
803 809 elif keytype == 'csv':
804 810 opts[k] = list(v.split(','))
805 811 elif keytype == 'scsv':
806 812 opts[k] = set(v.split(','))
807 813 elif keytype == 'boolean':
808 814 # Client should serialize False as '0', which is a non-empty string
809 815 # so it evaluates as a True bool.
810 816 if v == '0':
811 817 opts[k] = False
812 818 else:
813 819 opts[k] = bool(v)
814 820 elif keytype != 'plain':
815 821 raise KeyError('unknown getbundle option type %s'
816 822 % keytype)
817 823
818 824 if not bundle1allowed(repo, 'pull'):
819 825 if not exchange.bundle2requested(opts.get('bundlecaps')):
820 826 if proto.name == 'http':
821 827 return ooberror(bundle2required)
822 828 raise error.Abort(bundle2requiredmain,
823 829 hint=bundle2requiredhint)
824 830
825 831 prefercompressed = True
826 832
827 833 try:
828 834 if repo.ui.configbool('server', 'disablefullbundle'):
829 835 # Check to see if this is a full clone.
830 836 clheads = set(repo.changelog.heads())
831 837 changegroup = opts.get('cg', True)
832 838 heads = set(opts.get('heads', set()))
833 839 common = set(opts.get('common', set()))
834 840 common.discard(nullid)
835 841 if changegroup and not common and clheads == heads:
836 842 raise error.Abort(
837 843 _('server has pull-based clones disabled'),
838 844 hint=_('remove --pull if specified or upgrade Mercurial'))
839 845
840 846 info, chunks = exchange.getbundlechunks(repo, 'serve',
841 847 **pycompat.strkwargs(opts))
842 848 prefercompressed = info.get('prefercompressed', True)
843 849 except error.Abort as exc:
844 850 # cleanly forward Abort error to the client
845 851 if not exchange.bundle2requested(opts.get('bundlecaps')):
846 852 if proto.name == 'http':
847 853 return ooberror(str(exc) + '\n')
848 854 raise # cannot do better for bundle1 + ssh
849 855 # bundle2 request expect a bundle2 reply
850 856 bundler = bundle2.bundle20(repo.ui)
851 857 manargs = [('message', str(exc))]
852 858 advargs = []
853 859 if exc.hint is not None:
854 860 advargs.append(('hint', exc.hint))
855 861 bundler.addpart(bundle2.bundlepart('error:abort',
856 862 manargs, advargs))
857 863 chunks = bundler.getchunks()
858 864 prefercompressed = False
859 865
860 866 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
861 867
862 868 @wireprotocommand('heads')
863 869 def heads(repo, proto):
864 870 h = repo.heads()
865 871 return encodelist(h) + "\n"
866 872
867 873 @wireprotocommand('hello')
868 874 def hello(repo, proto):
869 875 '''the hello command returns a set of lines describing various
870 876 interesting things about the server, in an RFC822-like format.
871 877 Currently the only one defined is "capabilities", which
872 878 consists of a line in the form:
873 879
874 880 capabilities: space separated list of tokens
875 881 '''
876 882 return "capabilities: %s\n" % (capabilities(repo, proto))
877 883
878 884 @wireprotocommand('listkeys', 'namespace')
879 885 def listkeys(repo, proto, namespace):
880 886 d = repo.listkeys(encoding.tolocal(namespace)).items()
881 887 return pushkeymod.encodekeys(d)
882 888
883 889 @wireprotocommand('lookup', 'key')
884 890 def lookup(repo, proto, key):
885 891 try:
886 892 k = encoding.tolocal(key)
887 893 c = repo[k]
888 894 r = c.hex()
889 895 success = 1
890 896 except Exception as inst:
891 897 r = str(inst)
892 898 success = 0
893 899 return "%d %s\n" % (success, r)
894 900
895 901 @wireprotocommand('known', 'nodes *')
896 902 def known(repo, proto, nodes, others):
897 903 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
898 904
899 905 @wireprotocommand('pushkey', 'namespace key old new')
900 906 def pushkey(repo, proto, namespace, key, old, new):
901 907 # compatibility with pre-1.8 clients which were accidentally
902 908 # sending raw binary nodes rather than utf-8-encoded hex
903 909 if len(new) == 20 and util.escapestr(new) != new:
904 910 # looks like it could be a binary node
905 911 try:
906 912 new.decode('utf-8')
907 913 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
908 914 except UnicodeDecodeError:
909 915 pass # binary, leave unmodified
910 916 else:
911 917 new = encoding.tolocal(new) # normal path
912 918
913 919 if util.safehasattr(proto, 'restore'):
914 920
915 921 proto.redirect()
916 922
917 923 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
918 924 encoding.tolocal(old), new) or False
919 925
920 926 output = proto.restore()
921 927
922 928 return '%s\n%s' % (int(r), output)
923 929
924 930 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
925 931 encoding.tolocal(old), new)
926 932 return '%s\n' % int(r)
927 933
928 934 @wireprotocommand('stream_out')
929 935 def stream(repo, proto):
930 936 '''If the server supports streaming clone, it advertises the "stream"
931 937 capability with a value representing the version and flags of the repo
932 938 it is serving. Client checks to see if it understands the format.
933 939 '''
934 940 return streamres_legacy(streamclone.generatev1wireproto(repo))
935 941
936 942 @wireprotocommand('unbundle', 'heads')
937 943 def unbundle(repo, proto, heads):
938 944 their_heads = decodelist(heads)
939 945
940 946 try:
941 947 proto.redirect()
942 948
943 949 exchange.check_heads(repo, their_heads, 'preparing changes')
944 950
945 951 # write bundle data to temporary file because it can be big
946 952 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
947 953 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
948 954 r = 0
949 955 try:
950 956 proto.getfile(fp)
951 957 fp.seek(0)
952 958 gen = exchange.readbundle(repo.ui, fp, None)
953 959 if (isinstance(gen, changegroupmod.cg1unpacker)
954 960 and not bundle1allowed(repo, 'push')):
955 961 if proto.name == 'http':
956 962 # need to special case http because stderr do not get to
957 963 # the http client on failed push so we need to abuse some
958 964 # other error type to make sure the message get to the
959 965 # user.
960 966 return ooberror(bundle2required)
961 967 raise error.Abort(bundle2requiredmain,
962 968 hint=bundle2requiredhint)
963 969
964 970 r = exchange.unbundle(repo, gen, their_heads, 'serve',
965 971 proto._client())
966 972 if util.safehasattr(r, 'addpart'):
967 973 # The return looks streamable, we are in the bundle2 case and
968 974 # should return a stream.
969 975 return streamres_legacy(gen=r.getchunks())
970 976 return pushres(r)
971 977
972 978 finally:
973 979 fp.close()
974 980 os.unlink(tempname)
975 981
976 982 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
977 983 # handle non-bundle2 case first
978 984 if not getattr(exc, 'duringunbundle2', False):
979 985 try:
980 986 raise
981 987 except error.Abort:
982 988 # The old code we moved used util.stderr directly.
983 989 # We did not change it to minimise code change.
984 990 # This need to be moved to something proper.
985 991 # Feel free to do it.
986 992 util.stderr.write("abort: %s\n" % exc)
987 993 if exc.hint is not None:
988 994 util.stderr.write("(%s)\n" % exc.hint)
989 995 return pushres(0)
990 996 except error.PushRaced:
991 997 return pusherr(str(exc))
992 998
993 999 bundler = bundle2.bundle20(repo.ui)
994 1000 for out in getattr(exc, '_bundle2salvagedoutput', ()):
995 1001 bundler.addpart(out)
996 1002 try:
997 1003 try:
998 1004 raise
999 1005 except error.PushkeyFailed as exc:
1000 1006 # check client caps
1001 1007 remotecaps = getattr(exc, '_replycaps', None)
1002 1008 if (remotecaps is not None
1003 1009 and 'pushkey' not in remotecaps.get('error', ())):
1004 1010 # no support remote side, fallback to Abort handler.
1005 1011 raise
1006 1012 part = bundler.newpart('error:pushkey')
1007 1013 part.addparam('in-reply-to', exc.partid)
1008 1014 if exc.namespace is not None:
1009 1015 part.addparam('namespace', exc.namespace, mandatory=False)
1010 1016 if exc.key is not None:
1011 1017 part.addparam('key', exc.key, mandatory=False)
1012 1018 if exc.new is not None:
1013 1019 part.addparam('new', exc.new, mandatory=False)
1014 1020 if exc.old is not None:
1015 1021 part.addparam('old', exc.old, mandatory=False)
1016 1022 if exc.ret is not None:
1017 1023 part.addparam('ret', exc.ret, mandatory=False)
1018 1024 except error.BundleValueError as exc:
1019 1025 errpart = bundler.newpart('error:unsupportedcontent')
1020 1026 if exc.parttype is not None:
1021 1027 errpart.addparam('parttype', exc.parttype)
1022 1028 if exc.params:
1023 1029 errpart.addparam('params', '\0'.join(exc.params))
1024 1030 except error.Abort as exc:
1025 1031 manargs = [('message', str(exc))]
1026 1032 advargs = []
1027 1033 if exc.hint is not None:
1028 1034 advargs.append(('hint', exc.hint))
1029 1035 bundler.addpart(bundle2.bundlepart('error:abort',
1030 1036 manargs, advargs))
1031 1037 except error.PushRaced as exc:
1032 1038 bundler.newpart('error:pushraced', [('message', str(exc))])
1033 1039 return streamres_legacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now