##// END OF EJS Templates
wireproto: use %d to encode int, not %s...
Augie Fackler -
r34732:31fdd050 default
parent child Browse files
Show More
@@ -1,1067 +1,1067 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 )
35 35
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 41 'IncompatibleClient')
42 42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43 43
44 44 class abstractserverproto(object):
45 45 """abstract class that summarizes the protocol API
46 46
47 47 Used as reference and documentation.
48 48 """
49 49
50 50 def getargs(self, args):
51 51 """return the value for arguments in <args>
52 52
53 53 returns a list of values (same order as <args>)"""
54 54 raise NotImplementedError()
55 55
56 56 def getfile(self, fp):
57 57 """write the whole content of a file into a file like object
58 58
59 59 The file is in the form::
60 60
61 61 (<chunk-size>\n<chunk>)+0\n
62 62
63 63 chunk size is the ascii version of the int.
64 64 """
65 65 raise NotImplementedError()
66 66
67 67 def redirect(self):
68 68 """may setup interception for stdout and stderr
69 69
70 70 See also the `restore` method."""
71 71 raise NotImplementedError()
72 72
73 73 # If the `redirect` function does install interception, the `restore`
74 74 # function MUST be defined. If interception is not used, this function
75 75 # MUST NOT be defined.
76 76 #
77 77 # left commented here on purpose
78 78 #
79 79 #def restore(self):
80 80 # """reinstall previous stdout and stderr and return intercepted stdout
81 81 # """
82 82 # raise NotImplementedError()
83 83
84 84 class remoteiterbatcher(peer.iterbatcher):
85 85 def __init__(self, remote):
86 86 super(remoteiterbatcher, self).__init__()
87 87 self._remote = remote
88 88
89 89 def __getattr__(self, name):
90 90 # Validate this method is batchable, since submit() only supports
91 91 # batchable methods.
92 92 fn = getattr(self._remote, name)
93 93 if not getattr(fn, 'batchable', None):
94 94 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 95 'call to %r' % name)
96 96
97 97 return super(remoteiterbatcher, self).__getattr__(name)
98 98
99 99 def submit(self):
100 100 """Break the batch request into many patch calls and pipeline them.
101 101
102 102 This is mostly valuable over http where request sizes can be
103 103 limited, but can be used in other places as well.
104 104 """
105 105 # 2-tuple of (command, arguments) that represents what will be
106 106 # sent over the wire.
107 107 requests = []
108 108
109 109 # 4-tuple of (command, final future, @batchable generator, remote
110 110 # future).
111 111 results = []
112 112
113 113 for command, args, opts, finalfuture in self.calls:
114 114 mtd = getattr(self._remote, command)
115 115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116 116
117 117 commandargs, fremote = next(batchable)
118 118 assert fremote
119 119 requests.append((command, commandargs))
120 120 results.append((command, finalfuture, batchable, fremote))
121 121
122 122 if requests:
123 123 self._resultiter = self._remote._submitbatch(requests)
124 124
125 125 self._results = results
126 126
127 127 def results(self):
128 128 for command, finalfuture, batchable, remotefuture in self._results:
129 129 # Get the raw result, set it in the remote future, feed it
130 130 # back into the @batchable generator so it can be decoded, and
131 131 # set the result on the final future to this value.
132 132 remoteresult = next(self._resultiter)
133 133 remotefuture.set(remoteresult)
134 134 finalfuture.set(next(batchable))
135 135
136 136 # Verify our @batchable generators only emit 2 values.
137 137 try:
138 138 next(batchable)
139 139 except StopIteration:
140 140 pass
141 141 else:
142 142 raise error.ProgrammingError('%s @batchable generator emitted '
143 143 'unexpected value count' % command)
144 144
145 145 yield finalfuture.value
146 146
147 147 # Forward a couple of names from peer to make wireproto interactions
148 148 # slightly more sensible.
149 149 batchable = peer.batchable
150 150 future = peer.future
151 151
152 152 # list of nodes encoding / decoding
153 153
154 154 def decodelist(l, sep=' '):
155 155 if l:
156 156 return [bin(v) for v in l.split(sep)]
157 157 return []
158 158
159 159 def encodelist(l, sep=' '):
160 160 try:
161 161 return sep.join(map(hex, l))
162 162 except TypeError:
163 163 raise
164 164
165 165 # batched call argument encoding
166 166
167 167 def escapearg(plain):
168 168 return (plain
169 169 .replace(':', ':c')
170 170 .replace(',', ':o')
171 171 .replace(';', ':s')
172 172 .replace('=', ':e'))
173 173
174 174 def unescapearg(escaped):
175 175 return (escaped
176 176 .replace(':e', '=')
177 177 .replace(':s', ';')
178 178 .replace(':o', ',')
179 179 .replace(':c', ':'))
180 180
181 181 def encodebatchcmds(req):
182 182 """Return a ``cmds`` argument value for the ``batch`` command."""
183 183 cmds = []
184 184 for op, argsdict in req:
185 185 # Old servers didn't properly unescape argument names. So prevent
186 186 # the sending of argument names that may not be decoded properly by
187 187 # servers.
188 188 assert all(escapearg(k) == k for k in argsdict)
189 189
190 190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 191 for k, v in argsdict.iteritems())
192 192 cmds.append('%s %s' % (op, args))
193 193
194 194 return ';'.join(cmds)
195 195
196 196 # mapping of options accepted by getbundle and their types
197 197 #
198 198 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 199 # such options are properly processed in exchange.getbundle.
200 200 #
201 201 # supported types are:
202 202 #
203 203 # :nodes: list of binary nodes
204 204 # :csv: list of comma-separated values
205 205 # :scsv: list of comma-separated values return as set
206 206 # :plain: string with no transformation needed.
207 207 gboptsmap = {'heads': 'nodes',
208 208 'common': 'nodes',
209 209 'obsmarkers': 'boolean',
210 210 'phases': 'boolean',
211 211 'bundlecaps': 'scsv',
212 212 'listkeys': 'csv',
213 213 'cg': 'boolean',
214 214 'cbattempted': 'boolean'}
215 215
216 216 # client side
217 217
218 218 class wirepeer(repository.legacypeer):
219 219 """Client-side interface for communicating with a peer repository.
220 220
221 221 Methods commonly call wire protocol commands of the same name.
222 222
223 223 See also httppeer.py and sshpeer.py for protocol-specific
224 224 implementations of this interface.
225 225 """
226 226 # Begin of basewirepeer interface.
227 227
228 228 def iterbatch(self):
229 229 return remoteiterbatcher(self)
230 230
231 231 @batchable
232 232 def lookup(self, key):
233 233 self.requirecap('lookup', _('look up remote revision'))
234 234 f = future()
235 235 yield {'key': encoding.fromlocal(key)}, f
236 236 d = f.value
237 237 success, data = d[:-1].split(" ", 1)
238 238 if int(success):
239 239 yield bin(data)
240 240 else:
241 241 self._abort(error.RepoError(data))
242 242
243 243 @batchable
244 244 def heads(self):
245 245 f = future()
246 246 yield {}, f
247 247 d = f.value
248 248 try:
249 249 yield decodelist(d[:-1])
250 250 except ValueError:
251 251 self._abort(error.ResponseError(_("unexpected response:"), d))
252 252
253 253 @batchable
254 254 def known(self, nodes):
255 255 f = future()
256 256 yield {'nodes': encodelist(nodes)}, f
257 257 d = f.value
258 258 try:
259 259 yield [bool(int(b)) for b in d]
260 260 except ValueError:
261 261 self._abort(error.ResponseError(_("unexpected response:"), d))
262 262
263 263 @batchable
264 264 def branchmap(self):
265 265 f = future()
266 266 yield {}, f
267 267 d = f.value
268 268 try:
269 269 branchmap = {}
270 270 for branchpart in d.splitlines():
271 271 branchname, branchheads = branchpart.split(' ', 1)
272 272 branchname = encoding.tolocal(urlreq.unquote(branchname))
273 273 branchheads = decodelist(branchheads)
274 274 branchmap[branchname] = branchheads
275 275 yield branchmap
276 276 except TypeError:
277 277 self._abort(error.ResponseError(_("unexpected response:"), d))
278 278
279 279 @batchable
280 280 def listkeys(self, namespace):
281 281 if not self.capable('pushkey'):
282 282 yield {}, None
283 283 f = future()
284 284 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
285 285 yield {'namespace': encoding.fromlocal(namespace)}, f
286 286 d = f.value
287 287 self.ui.debug('received listkey for "%s": %i bytes\n'
288 288 % (namespace, len(d)))
289 289 yield pushkeymod.decodekeys(d)
290 290
291 291 @batchable
292 292 def pushkey(self, namespace, key, old, new):
293 293 if not self.capable('pushkey'):
294 294 yield False, None
295 295 f = future()
296 296 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
297 297 yield {'namespace': encoding.fromlocal(namespace),
298 298 'key': encoding.fromlocal(key),
299 299 'old': encoding.fromlocal(old),
300 300 'new': encoding.fromlocal(new)}, f
301 301 d = f.value
302 302 d, output = d.split('\n', 1)
303 303 try:
304 304 d = bool(int(d))
305 305 except ValueError:
306 306 raise error.ResponseError(
307 307 _('push failed (unexpected response):'), d)
308 308 for l in output.splitlines(True):
309 309 self.ui.status(_('remote: '), l)
310 310 yield d
311 311
312 312 def stream_out(self):
313 313 return self._callstream('stream_out')
314 314
315 315 def getbundle(self, source, **kwargs):
316 316 self.requirecap('getbundle', _('look up remote changes'))
317 317 opts = {}
318 318 bundlecaps = kwargs.get('bundlecaps')
319 319 if bundlecaps is not None:
320 320 kwargs['bundlecaps'] = sorted(bundlecaps)
321 321 else:
322 322 bundlecaps = () # kwargs could have it to None
323 323 for key, value in kwargs.iteritems():
324 324 if value is None:
325 325 continue
326 326 keytype = gboptsmap.get(key)
327 327 if keytype is None:
328 328 raise error.ProgrammingError(
329 329 'Unexpectedly None keytype for key %s' % key)
330 330 elif keytype == 'nodes':
331 331 value = encodelist(value)
332 332 elif keytype in ('csv', 'scsv'):
333 333 value = ','.join(value)
334 334 elif keytype == 'boolean':
335 335 value = '%i' % bool(value)
336 336 elif keytype != 'plain':
337 337 raise KeyError('unknown getbundle option type %s'
338 338 % keytype)
339 339 opts[key] = value
340 340 f = self._callcompressable("getbundle", **opts)
341 341 if any((cap.startswith('HG2') for cap in bundlecaps)):
342 342 return bundle2.getunbundler(self.ui, f)
343 343 else:
344 344 return changegroupmod.cg1unpacker(f, 'UN')
345 345
346 346 def unbundle(self, cg, heads, url):
347 347 '''Send cg (a readable file-like object representing the
348 348 changegroup to push, typically a chunkbuffer object) to the
349 349 remote server as a bundle.
350 350
351 351 When pushing a bundle10 stream, return an integer indicating the
352 352 result of the push (see changegroup.apply()).
353 353
354 354 When pushing a bundle20 stream, return a bundle20 stream.
355 355
356 356 `url` is the url the client thinks it's pushing to, which is
357 357 visible to hooks.
358 358 '''
359 359
360 360 if heads != ['force'] and self.capable('unbundlehash'):
361 361 heads = encodelist(['hashed',
362 362 hashlib.sha1(''.join(sorted(heads))).digest()])
363 363 else:
364 364 heads = encodelist(heads)
365 365
366 366 if util.safehasattr(cg, 'deltaheader'):
367 367 # this a bundle10, do the old style call sequence
368 368 ret, output = self._callpush("unbundle", cg, heads=heads)
369 369 if ret == "":
370 370 raise error.ResponseError(
371 371 _('push failed:'), output)
372 372 try:
373 373 ret = int(ret)
374 374 except ValueError:
375 375 raise error.ResponseError(
376 376 _('push failed (unexpected response):'), ret)
377 377
378 378 for l in output.splitlines(True):
379 379 self.ui.status(_('remote: '), l)
380 380 else:
381 381 # bundle2 push. Send a stream, fetch a stream.
382 382 stream = self._calltwowaystream('unbundle', cg, heads=heads)
383 383 ret = bundle2.getunbundler(self.ui, stream)
384 384 return ret
385 385
386 386 # End of basewirepeer interface.
387 387
388 388 # Begin of baselegacywirepeer interface.
389 389
390 390 def branches(self, nodes):
391 391 n = encodelist(nodes)
392 392 d = self._call("branches", nodes=n)
393 393 try:
394 394 br = [tuple(decodelist(b)) for b in d.splitlines()]
395 395 return br
396 396 except ValueError:
397 397 self._abort(error.ResponseError(_("unexpected response:"), d))
398 398
399 399 def between(self, pairs):
400 400 batch = 8 # avoid giant requests
401 401 r = []
402 402 for i in xrange(0, len(pairs), batch):
403 403 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
404 404 d = self._call("between", pairs=n)
405 405 try:
406 406 r.extend(l and decodelist(l) or [] for l in d.splitlines())
407 407 except ValueError:
408 408 self._abort(error.ResponseError(_("unexpected response:"), d))
409 409 return r
410 410
411 411 def changegroup(self, nodes, kind):
412 412 n = encodelist(nodes)
413 413 f = self._callcompressable("changegroup", roots=n)
414 414 return changegroupmod.cg1unpacker(f, 'UN')
415 415
416 416 def changegroupsubset(self, bases, heads, kind):
417 417 self.requirecap('changegroupsubset', _('look up remote changes'))
418 418 bases = encodelist(bases)
419 419 heads = encodelist(heads)
420 420 f = self._callcompressable("changegroupsubset",
421 421 bases=bases, heads=heads)
422 422 return changegroupmod.cg1unpacker(f, 'UN')
423 423
424 424 # End of baselegacywirepeer interface.
425 425
426 426 def _submitbatch(self, req):
427 427 """run batch request <req> on the server
428 428
429 429 Returns an iterator of the raw responses from the server.
430 430 """
431 431 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
432 432 chunk = rsp.read(1024)
433 433 work = [chunk]
434 434 while chunk:
435 435 while ';' not in chunk and chunk:
436 436 chunk = rsp.read(1024)
437 437 work.append(chunk)
438 438 merged = ''.join(work)
439 439 while ';' in merged:
440 440 one, merged = merged.split(';', 1)
441 441 yield unescapearg(one)
442 442 chunk = rsp.read(1024)
443 443 work = [merged, chunk]
444 444 yield unescapearg(''.join(work))
445 445
446 446 def _submitone(self, op, args):
447 447 return self._call(op, **args)
448 448
449 449 def debugwireargs(self, one, two, three=None, four=None, five=None):
450 450 # don't pass optional arguments left at their default value
451 451 opts = {}
452 452 if three is not None:
453 453 opts['three'] = three
454 454 if four is not None:
455 455 opts['four'] = four
456 456 return self._call('debugwireargs', one=one, two=two, **opts)
457 457
458 458 def _call(self, cmd, **args):
459 459 """execute <cmd> on the server
460 460
461 461 The command is expected to return a simple string.
462 462
463 463 returns the server reply as a string."""
464 464 raise NotImplementedError()
465 465
466 466 def _callstream(self, cmd, **args):
467 467 """execute <cmd> on the server
468 468
469 469 The command is expected to return a stream. Note that if the
470 470 command doesn't return a stream, _callstream behaves
471 471 differently for ssh and http peers.
472 472
473 473 returns the server reply as a file like object.
474 474 """
475 475 raise NotImplementedError()
476 476
477 477 def _callcompressable(self, cmd, **args):
478 478 """execute <cmd> on the server
479 479
480 480 The command is expected to return a stream.
481 481
482 482 The stream may have been compressed in some implementations. This
483 483 function takes care of the decompression. This is the only difference
484 484 with _callstream.
485 485
486 486 returns the server reply as a file like object.
487 487 """
488 488 raise NotImplementedError()
489 489
490 490 def _callpush(self, cmd, fp, **args):
491 491 """execute a <cmd> on server
492 492
493 493 The command is expected to be related to a push. Push has a special
494 494 return method.
495 495
496 496 returns the server reply as a (ret, output) tuple. ret is either
497 497 empty (error) or a stringified int.
498 498 """
499 499 raise NotImplementedError()
500 500
501 501 def _calltwowaystream(self, cmd, fp, **args):
502 502 """execute <cmd> on server
503 503
504 504 The command will send a stream to the server and get a stream in reply.
505 505 """
506 506 raise NotImplementedError()
507 507
508 508 def _abort(self, exception):
509 509 """clearly abort the wire protocol connection and raise the exception
510 510 """
511 511 raise NotImplementedError()
512 512
513 513 # server side
514 514
515 515 # wire protocol command can either return a string or one of these classes.
516 516 class streamres(object):
517 517 """wireproto reply: binary stream
518 518
519 519 The call was successful and the result is a stream.
520 520
521 521 Accepts either a generator or an object with a ``read(size)`` method.
522 522
523 523 ``v1compressible`` indicates whether this data can be compressed to
524 524 "version 1" clients (technically: HTTP peers using
525 525 application/mercurial-0.1 media type). This flag should NOT be used on
526 526 new commands because new clients should support a more modern compression
527 527 mechanism.
528 528 """
529 529 def __init__(self, gen=None, reader=None, v1compressible=False):
530 530 self.gen = gen
531 531 self.reader = reader
532 532 self.v1compressible = v1compressible
533 533
534 534 class pushres(object):
535 535 """wireproto reply: success with simple integer return
536 536
537 537 The call was successful and returned an integer contained in `self.res`.
538 538 """
539 539 def __init__(self, res):
540 540 self.res = res
541 541
542 542 class pusherr(object):
543 543 """wireproto reply: failure
544 544
545 545 The call failed. The `self.res` attribute contains the error message.
546 546 """
547 547 def __init__(self, res):
548 548 self.res = res
549 549
550 550 class ooberror(object):
551 551 """wireproto reply: failure of a batch of operation
552 552
553 553 Something failed during a batch call. The error message is stored in
554 554 `self.message`.
555 555 """
556 556 def __init__(self, message):
557 557 self.message = message
558 558
559 559 def getdispatchrepo(repo, proto, command):
560 560 """Obtain the repo used for processing wire protocol commands.
561 561
562 562 The intent of this function is to serve as a monkeypatch point for
563 563 extensions that need commands to operate on different repo views under
564 564 specialized circumstances.
565 565 """
566 566 return repo.filtered('served')
567 567
568 568 def dispatch(repo, proto, command):
569 569 repo = getdispatchrepo(repo, proto, command)
570 570 func, spec = commands[command]
571 571 args = proto.getargs(spec)
572 572 return func(repo, proto, *args)
573 573
574 574 def options(cmd, keys, others):
575 575 opts = {}
576 576 for k in keys:
577 577 if k in others:
578 578 opts[k] = others[k]
579 579 del others[k]
580 580 if others:
581 581 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
582 582 % (cmd, ",".join(others)))
583 583 return opts
584 584
585 585 def bundle1allowed(repo, action):
586 586 """Whether a bundle1 operation is allowed from the server.
587 587
588 588 Priority is:
589 589
590 590 1. server.bundle1gd.<action> (if generaldelta active)
591 591 2. server.bundle1.<action>
592 592 3. server.bundle1gd (if generaldelta active)
593 593 4. server.bundle1
594 594 """
595 595 ui = repo.ui
596 596 gd = 'generaldelta' in repo.requirements
597 597
598 598 if gd:
599 599 v = ui.configbool('server', 'bundle1gd.%s' % action)
600 600 if v is not None:
601 601 return v
602 602
603 603 v = ui.configbool('server', 'bundle1.%s' % action)
604 604 if v is not None:
605 605 return v
606 606
607 607 if gd:
608 608 v = ui.configbool('server', 'bundle1gd')
609 609 if v is not None:
610 610 return v
611 611
612 612 return ui.configbool('server', 'bundle1')
613 613
614 614 def supportedcompengines(ui, proto, role):
615 615 """Obtain the list of supported compression engines for a request."""
616 616 assert role in (util.CLIENTROLE, util.SERVERROLE)
617 617
618 618 compengines = util.compengines.supportedwireengines(role)
619 619
620 620 # Allow config to override default list and ordering.
621 621 if role == util.SERVERROLE:
622 622 configengines = ui.configlist('server', 'compressionengines')
623 623 config = 'server.compressionengines'
624 624 else:
625 625 # This is currently implemented mainly to facilitate testing. In most
626 626 # cases, the server should be in charge of choosing a compression engine
627 627 # because a server has the most to lose from a sub-optimal choice. (e.g.
628 628 # CPU DoS due to an expensive engine or a network DoS due to poor
629 629 # compression ratio).
630 630 configengines = ui.configlist('experimental',
631 631 'clientcompressionengines')
632 632 config = 'experimental.clientcompressionengines'
633 633
634 634 # No explicit config. Filter out the ones that aren't supposed to be
635 635 # advertised and return default ordering.
636 636 if not configengines:
637 637 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
638 638 return [e for e in compengines
639 639 if getattr(e.wireprotosupport(), attr) > 0]
640 640
641 641 # If compression engines are listed in the config, assume there is a good
642 642 # reason for it (like server operators wanting to achieve specific
643 643 # performance characteristics). So fail fast if the config references
644 644 # unusable compression engines.
645 645 validnames = set(e.name() for e in compengines)
646 646 invalidnames = set(e for e in configengines if e not in validnames)
647 647 if invalidnames:
648 648 raise error.Abort(_('invalid compression engine defined in %s: %s') %
649 649 (config, ', '.join(sorted(invalidnames))))
650 650
651 651 compengines = [e for e in compengines if e.name() in configengines]
652 652 compengines = sorted(compengines,
653 653 key=lambda e: configengines.index(e.name()))
654 654
655 655 if not compengines:
656 656 raise error.Abort(_('%s config option does not specify any known '
657 657 'compression engines') % config,
658 658 hint=_('usable compression engines: %s') %
659 659 ', '.sorted(validnames))
660 660
661 661 return compengines
662 662
663 663 # list of commands
664 664 commands = {}
665 665
666 666 def wireprotocommand(name, args=''):
667 667 """decorator for wire protocol command"""
668 668 def register(func):
669 669 commands[name] = (func, args)
670 670 return func
671 671 return register
672 672
673 673 @wireprotocommand('batch', 'cmds *')
674 674 def batch(repo, proto, cmds, others):
675 675 repo = repo.filtered("served")
676 676 res = []
677 677 for pair in cmds.split(';'):
678 678 op, args = pair.split(' ', 1)
679 679 vals = {}
680 680 for a in args.split(','):
681 681 if a:
682 682 n, v = a.split('=')
683 683 vals[unescapearg(n)] = unescapearg(v)
684 684 func, spec = commands[op]
685 685 if spec:
686 686 keys = spec.split()
687 687 data = {}
688 688 for k in keys:
689 689 if k == '*':
690 690 star = {}
691 691 for key in vals.keys():
692 692 if key not in keys:
693 693 star[key] = vals[key]
694 694 data['*'] = star
695 695 else:
696 696 data[k] = vals[k]
697 697 result = func(repo, proto, *[data[k] for k in keys])
698 698 else:
699 699 result = func(repo, proto)
700 700 if isinstance(result, ooberror):
701 701 return result
702 702 res.append(escapearg(result))
703 703 return ';'.join(res)
704 704
705 705 @wireprotocommand('between', 'pairs')
706 706 def between(repo, proto, pairs):
707 707 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
708 708 r = []
709 709 for b in repo.between(pairs):
710 710 r.append(encodelist(b) + "\n")
711 711 return "".join(r)
712 712
713 713 @wireprotocommand('branchmap')
714 714 def branchmap(repo, proto):
715 715 branchmap = repo.branchmap()
716 716 heads = []
717 717 for branch, nodes in branchmap.iteritems():
718 718 branchname = urlreq.quote(encoding.fromlocal(branch))
719 719 branchnodes = encodelist(nodes)
720 720 heads.append('%s %s' % (branchname, branchnodes))
721 721 return '\n'.join(heads)
722 722
723 723 @wireprotocommand('branches', 'nodes')
724 724 def branches(repo, proto, nodes):
725 725 nodes = decodelist(nodes)
726 726 r = []
727 727 for b in repo.branches(nodes):
728 728 r.append(encodelist(b) + "\n")
729 729 return "".join(r)
730 730
731 731 @wireprotocommand('clonebundles', '')
732 732 def clonebundles(repo, proto):
733 733 """Server command for returning info for available bundles to seed clones.
734 734
735 735 Clients will parse this response and determine what bundle to fetch.
736 736
737 737 Extensions may wrap this command to filter or dynamically emit data
738 738 depending on the request. e.g. you could advertise URLs for the closest
739 739 data center given the client's IP address.
740 740 """
741 741 return repo.vfs.tryread('clonebundles.manifest')
742 742
743 743 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
744 744 'known', 'getbundle', 'unbundlehash', 'batch']
745 745
746 746 def _capabilities(repo, proto):
747 747 """return a list of capabilities for a repo
748 748
749 749 This function exists to allow extensions to easily wrap capabilities
750 750 computation
751 751
752 752 - returns a lists: easy to alter
753 753 - change done here will be propagated to both `capabilities` and `hello`
754 754 command without any other action needed.
755 755 """
756 756 # copy to prevent modification of the global list
757 757 caps = list(wireprotocaps)
758 758 if streamclone.allowservergeneration(repo):
759 759 if repo.ui.configbool('server', 'preferuncompressed'):
760 760 caps.append('stream-preferred')
761 761 requiredformats = repo.requirements & repo.supportedformats
762 762 # if our local revlogs are just revlogv1, add 'stream' cap
763 763 if not requiredformats - {'revlogv1'}:
764 764 caps.append('stream')
765 765 # otherwise, add 'streamreqs' detailing our local revlog format
766 766 else:
767 767 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
768 768 if repo.ui.configbool('experimental', 'bundle2-advertise'):
769 769 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
770 770 caps.append('bundle2=' + urlreq.quote(capsblob))
771 771 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
772 772
773 773 if proto.name == 'http':
774 774 caps.append('httpheader=%d' %
775 775 repo.ui.configint('server', 'maxhttpheaderlen'))
776 776 if repo.ui.configbool('experimental', 'httppostargs'):
777 777 caps.append('httppostargs')
778 778
779 779 # FUTURE advertise 0.2rx once support is implemented
780 780 # FUTURE advertise minrx and mintx after consulting config option
781 781 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
782 782
783 783 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
784 784 if compengines:
785 785 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
786 786 for e in compengines)
787 787 caps.append('compression=%s' % comptypes)
788 788
789 789 return caps
790 790
791 791 # If you are writing an extension and consider wrapping this function. Wrap
792 792 # `_capabilities` instead.
793 793 @wireprotocommand('capabilities')
794 794 def capabilities(repo, proto):
795 795 return ' '.join(_capabilities(repo, proto))
796 796
797 797 @wireprotocommand('changegroup', 'roots')
798 798 def changegroup(repo, proto, roots):
799 799 nodes = decodelist(roots)
800 800 outgoing = discovery.outgoing(repo, missingroots=nodes,
801 801 missingheads=repo.heads())
802 802 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
803 803 return streamres(reader=cg, v1compressible=True)
804 804
805 805 @wireprotocommand('changegroupsubset', 'bases heads')
806 806 def changegroupsubset(repo, proto, bases, heads):
807 807 bases = decodelist(bases)
808 808 heads = decodelist(heads)
809 809 outgoing = discovery.outgoing(repo, missingroots=bases,
810 810 missingheads=heads)
811 811 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
812 812 return streamres(reader=cg, v1compressible=True)
813 813
814 814 @wireprotocommand('debugwireargs', 'one two *')
815 815 def debugwireargs(repo, proto, one, two, others):
816 816 # only accept optional args from the known set
817 817 opts = options('debugwireargs', ['three', 'four'], others)
818 818 return repo.debugwireargs(one, two, **opts)
819 819
820 820 @wireprotocommand('getbundle', '*')
821 821 def getbundle(repo, proto, others):
822 822 opts = options('getbundle', gboptsmap.keys(), others)
823 823 for k, v in opts.iteritems():
824 824 keytype = gboptsmap[k]
825 825 if keytype == 'nodes':
826 826 opts[k] = decodelist(v)
827 827 elif keytype == 'csv':
828 828 opts[k] = list(v.split(','))
829 829 elif keytype == 'scsv':
830 830 opts[k] = set(v.split(','))
831 831 elif keytype == 'boolean':
832 832 # Client should serialize False as '0', which is a non-empty string
833 833 # so it evaluates as a True bool.
834 834 if v == '0':
835 835 opts[k] = False
836 836 else:
837 837 opts[k] = bool(v)
838 838 elif keytype != 'plain':
839 839 raise KeyError('unknown getbundle option type %s'
840 840 % keytype)
841 841
842 842 if not bundle1allowed(repo, 'pull'):
843 843 if not exchange.bundle2requested(opts.get('bundlecaps')):
844 844 if proto.name == 'http':
845 845 return ooberror(bundle2required)
846 846 raise error.Abort(bundle2requiredmain,
847 847 hint=bundle2requiredhint)
848 848
849 849 try:
850 850 if repo.ui.configbool('server', 'disablefullbundle'):
851 851 # Check to see if this is a full clone.
852 852 clheads = set(repo.changelog.heads())
853 853 heads = set(opts.get('heads', set()))
854 854 common = set(opts.get('common', set()))
855 855 common.discard(nullid)
856 856 if not common and clheads == heads:
857 857 raise error.Abort(
858 858 _('server has pull-based clones disabled'),
859 859 hint=_('remove --pull if specified or upgrade Mercurial'))
860 860
861 861 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
862 862 except error.Abort as exc:
863 863 # cleanly forward Abort error to the client
864 864 if not exchange.bundle2requested(opts.get('bundlecaps')):
865 865 if proto.name == 'http':
866 866 return ooberror(str(exc) + '\n')
867 867 raise # cannot do better for bundle1 + ssh
868 868 # bundle2 request expect a bundle2 reply
869 869 bundler = bundle2.bundle20(repo.ui)
870 870 manargs = [('message', str(exc))]
871 871 advargs = []
872 872 if exc.hint is not None:
873 873 advargs.append(('hint', exc.hint))
874 874 bundler.addpart(bundle2.bundlepart('error:abort',
875 875 manargs, advargs))
876 876 return streamres(gen=bundler.getchunks(), v1compressible=True)
877 877 return streamres(gen=chunks, v1compressible=True)
878 878
879 879 @wireprotocommand('heads')
880 880 def heads(repo, proto):
881 881 h = repo.heads()
882 882 return encodelist(h) + "\n"
883 883
884 884 @wireprotocommand('hello')
885 885 def hello(repo, proto):
886 886 '''the hello command returns a set of lines describing various
887 887 interesting things about the server, in an RFC822-like format.
888 888 Currently the only one defined is "capabilities", which
889 889 consists of a line in the form:
890 890
891 891 capabilities: space separated list of tokens
892 892 '''
893 893 return "capabilities: %s\n" % (capabilities(repo, proto))
894 894
895 895 @wireprotocommand('listkeys', 'namespace')
896 896 def listkeys(repo, proto, namespace):
897 897 d = repo.listkeys(encoding.tolocal(namespace)).items()
898 898 return pushkeymod.encodekeys(d)
899 899
900 900 @wireprotocommand('lookup', 'key')
901 901 def lookup(repo, proto, key):
902 902 try:
903 903 k = encoding.tolocal(key)
904 904 c = repo[k]
905 905 r = c.hex()
906 906 success = 1
907 907 except Exception as inst:
908 908 r = str(inst)
909 909 success = 0
910 return "%s %s\n" % (success, r)
910 return "%d %s\n" % (success, r)
911 911
912 912 @wireprotocommand('known', 'nodes *')
913 913 def known(repo, proto, nodes, others):
914 914 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
915 915
916 916 @wireprotocommand('pushkey', 'namespace key old new')
917 917 def pushkey(repo, proto, namespace, key, old, new):
918 918 # compatibility with pre-1.8 clients which were accidentally
919 919 # sending raw binary nodes rather than utf-8-encoded hex
920 920 if len(new) == 20 and util.escapestr(new) != new:
921 921 # looks like it could be a binary node
922 922 try:
923 923 new.decode('utf-8')
924 924 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
925 925 except UnicodeDecodeError:
926 926 pass # binary, leave unmodified
927 927 else:
928 928 new = encoding.tolocal(new) # normal path
929 929
930 930 if util.safehasattr(proto, 'restore'):
931 931
932 932 proto.redirect()
933 933
934 934 try:
935 935 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
936 936 encoding.tolocal(old), new) or False
937 937 except error.Abort:
938 938 r = False
939 939
940 940 output = proto.restore()
941 941
942 942 return '%s\n%s' % (int(r), output)
943 943
944 944 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
945 945 encoding.tolocal(old), new)
946 946 return '%s\n' % int(r)
947 947
948 948 @wireprotocommand('stream_out')
949 949 def stream(repo, proto):
950 950 '''If the server supports streaming clone, it advertises the "stream"
951 951 capability with a value representing the version and flags of the repo
952 952 it is serving. Client checks to see if it understands the format.
953 953 '''
954 954 if not streamclone.allowservergeneration(repo):
955 955 return '1\n'
956 956
957 957 def getstream(it):
958 958 yield '0\n'
959 959 for chunk in it:
960 960 yield chunk
961 961
962 962 try:
963 963 # LockError may be raised before the first result is yielded. Don't
964 964 # emit output until we're sure we got the lock successfully.
965 965 it = streamclone.generatev1wireproto(repo)
966 966 return streamres(gen=getstream(it))
967 967 except error.LockError:
968 968 return '2\n'
969 969
970 970 @wireprotocommand('unbundle', 'heads')
971 971 def unbundle(repo, proto, heads):
972 972 their_heads = decodelist(heads)
973 973
974 974 try:
975 975 proto.redirect()
976 976
977 977 exchange.check_heads(repo, their_heads, 'preparing changes')
978 978
979 979 # write bundle data to temporary file because it can be big
980 980 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
981 981 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
982 982 r = 0
983 983 try:
984 984 proto.getfile(fp)
985 985 fp.seek(0)
986 986 gen = exchange.readbundle(repo.ui, fp, None)
987 987 if (isinstance(gen, changegroupmod.cg1unpacker)
988 988 and not bundle1allowed(repo, 'push')):
989 989 if proto.name == 'http':
990 990 # need to special case http because stderr do not get to
991 991 # the http client on failed push so we need to abuse some
992 992 # other error type to make sure the message get to the
993 993 # user.
994 994 return ooberror(bundle2required)
995 995 raise error.Abort(bundle2requiredmain,
996 996 hint=bundle2requiredhint)
997 997
998 998 r = exchange.unbundle(repo, gen, their_heads, 'serve',
999 999 proto._client())
1000 1000 if util.safehasattr(r, 'addpart'):
1001 1001 # The return looks streamable, we are in the bundle2 case and
1002 1002 # should return a stream.
1003 1003 return streamres(gen=r.getchunks())
1004 1004 return pushres(r)
1005 1005
1006 1006 finally:
1007 1007 fp.close()
1008 1008 os.unlink(tempname)
1009 1009
1010 1010 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1011 1011 # handle non-bundle2 case first
1012 1012 if not getattr(exc, 'duringunbundle2', False):
1013 1013 try:
1014 1014 raise
1015 1015 except error.Abort:
1016 1016 # The old code we moved used util.stderr directly.
1017 1017 # We did not change it to minimise code change.
1018 1018 # This need to be moved to something proper.
1019 1019 # Feel free to do it.
1020 1020 util.stderr.write("abort: %s\n" % exc)
1021 1021 if exc.hint is not None:
1022 1022 util.stderr.write("(%s)\n" % exc.hint)
1023 1023 return pushres(0)
1024 1024 except error.PushRaced:
1025 1025 return pusherr(str(exc))
1026 1026
1027 1027 bundler = bundle2.bundle20(repo.ui)
1028 1028 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1029 1029 bundler.addpart(out)
1030 1030 try:
1031 1031 try:
1032 1032 raise
1033 1033 except error.PushkeyFailed as exc:
1034 1034 # check client caps
1035 1035 remotecaps = getattr(exc, '_replycaps', None)
1036 1036 if (remotecaps is not None
1037 1037 and 'pushkey' not in remotecaps.get('error', ())):
1038 1038 # no support remote side, fallback to Abort handler.
1039 1039 raise
1040 1040 part = bundler.newpart('error:pushkey')
1041 1041 part.addparam('in-reply-to', exc.partid)
1042 1042 if exc.namespace is not None:
1043 1043 part.addparam('namespace', exc.namespace, mandatory=False)
1044 1044 if exc.key is not None:
1045 1045 part.addparam('key', exc.key, mandatory=False)
1046 1046 if exc.new is not None:
1047 1047 part.addparam('new', exc.new, mandatory=False)
1048 1048 if exc.old is not None:
1049 1049 part.addparam('old', exc.old, mandatory=False)
1050 1050 if exc.ret is not None:
1051 1051 part.addparam('ret', exc.ret, mandatory=False)
1052 1052 except error.BundleValueError as exc:
1053 1053 errpart = bundler.newpart('error:unsupportedcontent')
1054 1054 if exc.parttype is not None:
1055 1055 errpart.addparam('parttype', exc.parttype)
1056 1056 if exc.params:
1057 1057 errpart.addparam('params', '\0'.join(exc.params))
1058 1058 except error.Abort as exc:
1059 1059 manargs = [('message', str(exc))]
1060 1060 advargs = []
1061 1061 if exc.hint is not None:
1062 1062 advargs.append(('hint', exc.hint))
1063 1063 bundler.addpart(bundle2.bundlepart('error:abort',
1064 1064 manargs, advargs))
1065 1065 except error.PushRaced as exc:
1066 1066 bundler.newpart('error:pushraced', [('message', str(exc))])
1067 1067 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now