##// END OF EJS Templates
wireproto: use listcomp instead of map()...
Augie Fackler -
r34730:6f532c1a default
parent child Browse files
Show More
@@ -1,1066 +1,1066 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 )
35 35
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 41 'IncompatibleClient')
42 42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43 43
44 44 class abstractserverproto(object):
45 45 """abstract class that summarizes the protocol API
46 46
47 47 Used as reference and documentation.
48 48 """
49 49
50 50 def getargs(self, args):
51 51 """return the value for arguments in <args>
52 52
53 53 returns a list of values (same order as <args>)"""
54 54 raise NotImplementedError()
55 55
56 56 def getfile(self, fp):
57 57 """write the whole content of a file into a file like object
58 58
59 59 The file is in the form::
60 60
61 61 (<chunk-size>\n<chunk>)+0\n
62 62
63 63 chunk size is the ascii version of the int.
64 64 """
65 65 raise NotImplementedError()
66 66
67 67 def redirect(self):
68 68 """may setup interception for stdout and stderr
69 69
70 70 See also the `restore` method."""
71 71 raise NotImplementedError()
72 72
73 73 # If the `redirect` function does install interception, the `restore`
74 74 # function MUST be defined. If interception is not used, this function
75 75 # MUST NOT be defined.
76 76 #
77 77 # left commented here on purpose
78 78 #
79 79 #def restore(self):
80 80 # """reinstall previous stdout and stderr and return intercepted stdout
81 81 # """
82 82 # raise NotImplementedError()
83 83
84 84 class remoteiterbatcher(peer.iterbatcher):
85 85 def __init__(self, remote):
86 86 super(remoteiterbatcher, self).__init__()
87 87 self._remote = remote
88 88
89 89 def __getattr__(self, name):
90 90 # Validate this method is batchable, since submit() only supports
91 91 # batchable methods.
92 92 fn = getattr(self._remote, name)
93 93 if not getattr(fn, 'batchable', None):
94 94 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 95 'call to %r' % name)
96 96
97 97 return super(remoteiterbatcher, self).__getattr__(name)
98 98
99 99 def submit(self):
100 100 """Break the batch request into many patch calls and pipeline them.
101 101
102 102 This is mostly valuable over http where request sizes can be
103 103 limited, but can be used in other places as well.
104 104 """
105 105 # 2-tuple of (command, arguments) that represents what will be
106 106 # sent over the wire.
107 107 requests = []
108 108
109 109 # 4-tuple of (command, final future, @batchable generator, remote
110 110 # future).
111 111 results = []
112 112
113 113 for command, args, opts, finalfuture in self.calls:
114 114 mtd = getattr(self._remote, command)
115 115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116 116
117 117 commandargs, fremote = next(batchable)
118 118 assert fremote
119 119 requests.append((command, commandargs))
120 120 results.append((command, finalfuture, batchable, fremote))
121 121
122 122 if requests:
123 123 self._resultiter = self._remote._submitbatch(requests)
124 124
125 125 self._results = results
126 126
127 127 def results(self):
128 128 for command, finalfuture, batchable, remotefuture in self._results:
129 129 # Get the raw result, set it in the remote future, feed it
130 130 # back into the @batchable generator so it can be decoded, and
131 131 # set the result on the final future to this value.
132 132 remoteresult = next(self._resultiter)
133 133 remotefuture.set(remoteresult)
134 134 finalfuture.set(next(batchable))
135 135
136 136 # Verify our @batchable generators only emit 2 values.
137 137 try:
138 138 next(batchable)
139 139 except StopIteration:
140 140 pass
141 141 else:
142 142 raise error.ProgrammingError('%s @batchable generator emitted '
143 143 'unexpected value count' % command)
144 144
145 145 yield finalfuture.value
146 146
147 147 # Forward a couple of names from peer to make wireproto interactions
148 148 # slightly more sensible.
149 149 batchable = peer.batchable
150 150 future = peer.future
151 151
152 152 # list of nodes encoding / decoding
153 153
154 154 def decodelist(l, sep=' '):
155 155 if l:
156 return map(bin, l.split(sep))
156 return [bin(v) for v in l.split(sep)]
157 157 return []
158 158
159 159 def encodelist(l, sep=' '):
160 160 try:
161 161 return sep.join(map(hex, l))
162 162 except TypeError:
163 163 raise
164 164
165 165 # batched call argument encoding
166 166
167 167 def escapearg(plain):
168 168 return (plain
169 169 .replace(':', ':c')
170 170 .replace(',', ':o')
171 171 .replace(';', ':s')
172 172 .replace('=', ':e'))
173 173
174 174 def unescapearg(escaped):
175 175 return (escaped
176 176 .replace(':e', '=')
177 177 .replace(':s', ';')
178 178 .replace(':o', ',')
179 179 .replace(':c', ':'))
180 180
181 181 def encodebatchcmds(req):
182 182 """Return a ``cmds`` argument value for the ``batch`` command."""
183 183 cmds = []
184 184 for op, argsdict in req:
185 185 # Old servers didn't properly unescape argument names. So prevent
186 186 # the sending of argument names that may not be decoded properly by
187 187 # servers.
188 188 assert all(escapearg(k) == k for k in argsdict)
189 189
190 190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 191 for k, v in argsdict.iteritems())
192 192 cmds.append('%s %s' % (op, args))
193 193
194 194 return ';'.join(cmds)
195 195
196 196 # mapping of options accepted by getbundle and their types
197 197 #
198 198 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 199 # such options are properly processed in exchange.getbundle.
200 200 #
201 201 # supported types are:
202 202 #
203 203 # :nodes: list of binary nodes
204 204 # :csv: list of comma-separated values
205 205 # :scsv: list of comma-separated values return as set
206 206 # :plain: string with no transformation needed.
207 207 gboptsmap = {'heads': 'nodes',
208 208 'common': 'nodes',
209 209 'obsmarkers': 'boolean',
210 210 'phases': 'boolean',
211 211 'bundlecaps': 'scsv',
212 212 'listkeys': 'csv',
213 213 'cg': 'boolean',
214 214 'cbattempted': 'boolean'}
215 215
216 216 # client side
217 217
218 218 class wirepeer(repository.legacypeer):
219 219 """Client-side interface for communicating with a peer repository.
220 220
221 221 Methods commonly call wire protocol commands of the same name.
222 222
223 223 See also httppeer.py and sshpeer.py for protocol-specific
224 224 implementations of this interface.
225 225 """
226 226 # Begin of basewirepeer interface.
227 227
228 228 def iterbatch(self):
229 229 return remoteiterbatcher(self)
230 230
231 231 @batchable
232 232 def lookup(self, key):
233 233 self.requirecap('lookup', _('look up remote revision'))
234 234 f = future()
235 235 yield {'key': encoding.fromlocal(key)}, f
236 236 d = f.value
237 237 success, data = d[:-1].split(" ", 1)
238 238 if int(success):
239 239 yield bin(data)
240 240 else:
241 241 self._abort(error.RepoError(data))
242 242
243 243 @batchable
244 244 def heads(self):
245 245 f = future()
246 246 yield {}, f
247 247 d = f.value
248 248 try:
249 249 yield decodelist(d[:-1])
250 250 except ValueError:
251 251 self._abort(error.ResponseError(_("unexpected response:"), d))
252 252
253 253 @batchable
254 254 def known(self, nodes):
255 255 f = future()
256 256 yield {'nodes': encodelist(nodes)}, f
257 257 d = f.value
258 258 try:
259 259 yield [bool(int(b)) for b in d]
260 260 except ValueError:
261 261 self._abort(error.ResponseError(_("unexpected response:"), d))
262 262
263 263 @batchable
264 264 def branchmap(self):
265 265 f = future()
266 266 yield {}, f
267 267 d = f.value
268 268 try:
269 269 branchmap = {}
270 270 for branchpart in d.splitlines():
271 271 branchname, branchheads = branchpart.split(' ', 1)
272 272 branchname = encoding.tolocal(urlreq.unquote(branchname))
273 273 branchheads = decodelist(branchheads)
274 274 branchmap[branchname] = branchheads
275 275 yield branchmap
276 276 except TypeError:
277 277 self._abort(error.ResponseError(_("unexpected response:"), d))
278 278
279 279 @batchable
280 280 def listkeys(self, namespace):
281 281 if not self.capable('pushkey'):
282 282 yield {}, None
283 283 f = future()
284 284 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
285 285 yield {'namespace': encoding.fromlocal(namespace)}, f
286 286 d = f.value
287 287 self.ui.debug('received listkey for "%s": %i bytes\n'
288 288 % (namespace, len(d)))
289 289 yield pushkeymod.decodekeys(d)
290 290
291 291 @batchable
292 292 def pushkey(self, namespace, key, old, new):
293 293 if not self.capable('pushkey'):
294 294 yield False, None
295 295 f = future()
296 296 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
297 297 yield {'namespace': encoding.fromlocal(namespace),
298 298 'key': encoding.fromlocal(key),
299 299 'old': encoding.fromlocal(old),
300 300 'new': encoding.fromlocal(new)}, f
301 301 d = f.value
302 302 d, output = d.split('\n', 1)
303 303 try:
304 304 d = bool(int(d))
305 305 except ValueError:
306 306 raise error.ResponseError(
307 307 _('push failed (unexpected response):'), d)
308 308 for l in output.splitlines(True):
309 309 self.ui.status(_('remote: '), l)
310 310 yield d
311 311
312 312 def stream_out(self):
313 313 return self._callstream('stream_out')
314 314
315 315 def getbundle(self, source, **kwargs):
316 316 self.requirecap('getbundle', _('look up remote changes'))
317 317 opts = {}
318 318 bundlecaps = kwargs.get('bundlecaps')
319 319 if bundlecaps is not None:
320 320 kwargs['bundlecaps'] = sorted(bundlecaps)
321 321 else:
322 322 bundlecaps = () # kwargs could have it to None
323 323 for key, value in kwargs.iteritems():
324 324 if value is None:
325 325 continue
326 326 keytype = gboptsmap.get(key)
327 327 if keytype is None:
328 328 assert False, 'unexpected'
329 329 elif keytype == 'nodes':
330 330 value = encodelist(value)
331 331 elif keytype in ('csv', 'scsv'):
332 332 value = ','.join(value)
333 333 elif keytype == 'boolean':
334 334 value = '%i' % bool(value)
335 335 elif keytype != 'plain':
336 336 raise KeyError('unknown getbundle option type %s'
337 337 % keytype)
338 338 opts[key] = value
339 339 f = self._callcompressable("getbundle", **opts)
340 340 if any((cap.startswith('HG2') for cap in bundlecaps)):
341 341 return bundle2.getunbundler(self.ui, f)
342 342 else:
343 343 return changegroupmod.cg1unpacker(f, 'UN')
344 344
345 345 def unbundle(self, cg, heads, url):
346 346 '''Send cg (a readable file-like object representing the
347 347 changegroup to push, typically a chunkbuffer object) to the
348 348 remote server as a bundle.
349 349
350 350 When pushing a bundle10 stream, return an integer indicating the
351 351 result of the push (see changegroup.apply()).
352 352
353 353 When pushing a bundle20 stream, return a bundle20 stream.
354 354
355 355 `url` is the url the client thinks it's pushing to, which is
356 356 visible to hooks.
357 357 '''
358 358
359 359 if heads != ['force'] and self.capable('unbundlehash'):
360 360 heads = encodelist(['hashed',
361 361 hashlib.sha1(''.join(sorted(heads))).digest()])
362 362 else:
363 363 heads = encodelist(heads)
364 364
365 365 if util.safehasattr(cg, 'deltaheader'):
366 366 # this a bundle10, do the old style call sequence
367 367 ret, output = self._callpush("unbundle", cg, heads=heads)
368 368 if ret == "":
369 369 raise error.ResponseError(
370 370 _('push failed:'), output)
371 371 try:
372 372 ret = int(ret)
373 373 except ValueError:
374 374 raise error.ResponseError(
375 375 _('push failed (unexpected response):'), ret)
376 376
377 377 for l in output.splitlines(True):
378 378 self.ui.status(_('remote: '), l)
379 379 else:
380 380 # bundle2 push. Send a stream, fetch a stream.
381 381 stream = self._calltwowaystream('unbundle', cg, heads=heads)
382 382 ret = bundle2.getunbundler(self.ui, stream)
383 383 return ret
384 384
385 385 # End of basewirepeer interface.
386 386
387 387 # Begin of baselegacywirepeer interface.
388 388
389 389 def branches(self, nodes):
390 390 n = encodelist(nodes)
391 391 d = self._call("branches", nodes=n)
392 392 try:
393 393 br = [tuple(decodelist(b)) for b in d.splitlines()]
394 394 return br
395 395 except ValueError:
396 396 self._abort(error.ResponseError(_("unexpected response:"), d))
397 397
398 398 def between(self, pairs):
399 399 batch = 8 # avoid giant requests
400 400 r = []
401 401 for i in xrange(0, len(pairs), batch):
402 402 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
403 403 d = self._call("between", pairs=n)
404 404 try:
405 405 r.extend(l and decodelist(l) or [] for l in d.splitlines())
406 406 except ValueError:
407 407 self._abort(error.ResponseError(_("unexpected response:"), d))
408 408 return r
409 409
410 410 def changegroup(self, nodes, kind):
411 411 n = encodelist(nodes)
412 412 f = self._callcompressable("changegroup", roots=n)
413 413 return changegroupmod.cg1unpacker(f, 'UN')
414 414
415 415 def changegroupsubset(self, bases, heads, kind):
416 416 self.requirecap('changegroupsubset', _('look up remote changes'))
417 417 bases = encodelist(bases)
418 418 heads = encodelist(heads)
419 419 f = self._callcompressable("changegroupsubset",
420 420 bases=bases, heads=heads)
421 421 return changegroupmod.cg1unpacker(f, 'UN')
422 422
423 423 # End of baselegacywirepeer interface.
424 424
425 425 def _submitbatch(self, req):
426 426 """run batch request <req> on the server
427 427
428 428 Returns an iterator of the raw responses from the server.
429 429 """
430 430 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
431 431 chunk = rsp.read(1024)
432 432 work = [chunk]
433 433 while chunk:
434 434 while ';' not in chunk and chunk:
435 435 chunk = rsp.read(1024)
436 436 work.append(chunk)
437 437 merged = ''.join(work)
438 438 while ';' in merged:
439 439 one, merged = merged.split(';', 1)
440 440 yield unescapearg(one)
441 441 chunk = rsp.read(1024)
442 442 work = [merged, chunk]
443 443 yield unescapearg(''.join(work))
444 444
445 445 def _submitone(self, op, args):
446 446 return self._call(op, **args)
447 447
448 448 def debugwireargs(self, one, two, three=None, four=None, five=None):
449 449 # don't pass optional arguments left at their default value
450 450 opts = {}
451 451 if three is not None:
452 452 opts['three'] = three
453 453 if four is not None:
454 454 opts['four'] = four
455 455 return self._call('debugwireargs', one=one, two=two, **opts)
456 456
457 457 def _call(self, cmd, **args):
458 458 """execute <cmd> on the server
459 459
460 460 The command is expected to return a simple string.
461 461
462 462 returns the server reply as a string."""
463 463 raise NotImplementedError()
464 464
465 465 def _callstream(self, cmd, **args):
466 466 """execute <cmd> on the server
467 467
468 468 The command is expected to return a stream. Note that if the
469 469 command doesn't return a stream, _callstream behaves
470 470 differently for ssh and http peers.
471 471
472 472 returns the server reply as a file like object.
473 473 """
474 474 raise NotImplementedError()
475 475
476 476 def _callcompressable(self, cmd, **args):
477 477 """execute <cmd> on the server
478 478
479 479 The command is expected to return a stream.
480 480
481 481 The stream may have been compressed in some implementations. This
482 482 function takes care of the decompression. This is the only difference
483 483 with _callstream.
484 484
485 485 returns the server reply as a file like object.
486 486 """
487 487 raise NotImplementedError()
488 488
489 489 def _callpush(self, cmd, fp, **args):
490 490 """execute a <cmd> on server
491 491
492 492 The command is expected to be related to a push. Push has a special
493 493 return method.
494 494
495 495 returns the server reply as a (ret, output) tuple. ret is either
496 496 empty (error) or a stringified int.
497 497 """
498 498 raise NotImplementedError()
499 499
500 500 def _calltwowaystream(self, cmd, fp, **args):
501 501 """execute <cmd> on server
502 502
503 503 The command will send a stream to the server and get a stream in reply.
504 504 """
505 505 raise NotImplementedError()
506 506
507 507 def _abort(self, exception):
508 508 """clearly abort the wire protocol connection and raise the exception
509 509 """
510 510 raise NotImplementedError()
511 511
512 512 # server side
513 513
514 514 # wire protocol command can either return a string or one of these classes.
515 515 class streamres(object):
516 516 """wireproto reply: binary stream
517 517
518 518 The call was successful and the result is a stream.
519 519
520 520 Accepts either a generator or an object with a ``read(size)`` method.
521 521
522 522 ``v1compressible`` indicates whether this data can be compressed to
523 523 "version 1" clients (technically: HTTP peers using
524 524 application/mercurial-0.1 media type). This flag should NOT be used on
525 525 new commands because new clients should support a more modern compression
526 526 mechanism.
527 527 """
528 528 def __init__(self, gen=None, reader=None, v1compressible=False):
529 529 self.gen = gen
530 530 self.reader = reader
531 531 self.v1compressible = v1compressible
532 532
533 533 class pushres(object):
534 534 """wireproto reply: success with simple integer return
535 535
536 536 The call was successful and returned an integer contained in `self.res`.
537 537 """
538 538 def __init__(self, res):
539 539 self.res = res
540 540
541 541 class pusherr(object):
542 542 """wireproto reply: failure
543 543
544 544 The call failed. The `self.res` attribute contains the error message.
545 545 """
546 546 def __init__(self, res):
547 547 self.res = res
548 548
549 549 class ooberror(object):
550 550 """wireproto reply: failure of a batch of operation
551 551
552 552 Something failed during a batch call. The error message is stored in
553 553 `self.message`.
554 554 """
555 555 def __init__(self, message):
556 556 self.message = message
557 557
558 558 def getdispatchrepo(repo, proto, command):
559 559 """Obtain the repo used for processing wire protocol commands.
560 560
561 561 The intent of this function is to serve as a monkeypatch point for
562 562 extensions that need commands to operate on different repo views under
563 563 specialized circumstances.
564 564 """
565 565 return repo.filtered('served')
566 566
567 567 def dispatch(repo, proto, command):
568 568 repo = getdispatchrepo(repo, proto, command)
569 569 func, spec = commands[command]
570 570 args = proto.getargs(spec)
571 571 return func(repo, proto, *args)
572 572
573 573 def options(cmd, keys, others):
574 574 opts = {}
575 575 for k in keys:
576 576 if k in others:
577 577 opts[k] = others[k]
578 578 del others[k]
579 579 if others:
580 580 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
581 581 % (cmd, ",".join(others)))
582 582 return opts
583 583
584 584 def bundle1allowed(repo, action):
585 585 """Whether a bundle1 operation is allowed from the server.
586 586
587 587 Priority is:
588 588
589 589 1. server.bundle1gd.<action> (if generaldelta active)
590 590 2. server.bundle1.<action>
591 591 3. server.bundle1gd (if generaldelta active)
592 592 4. server.bundle1
593 593 """
594 594 ui = repo.ui
595 595 gd = 'generaldelta' in repo.requirements
596 596
597 597 if gd:
598 598 v = ui.configbool('server', 'bundle1gd.%s' % action)
599 599 if v is not None:
600 600 return v
601 601
602 602 v = ui.configbool('server', 'bundle1.%s' % action)
603 603 if v is not None:
604 604 return v
605 605
606 606 if gd:
607 607 v = ui.configbool('server', 'bundle1gd')
608 608 if v is not None:
609 609 return v
610 610
611 611 return ui.configbool('server', 'bundle1')
612 612
613 613 def supportedcompengines(ui, proto, role):
614 614 """Obtain the list of supported compression engines for a request."""
615 615 assert role in (util.CLIENTROLE, util.SERVERROLE)
616 616
617 617 compengines = util.compengines.supportedwireengines(role)
618 618
619 619 # Allow config to override default list and ordering.
620 620 if role == util.SERVERROLE:
621 621 configengines = ui.configlist('server', 'compressionengines')
622 622 config = 'server.compressionengines'
623 623 else:
624 624 # This is currently implemented mainly to facilitate testing. In most
625 625 # cases, the server should be in charge of choosing a compression engine
626 626 # because a server has the most to lose from a sub-optimal choice. (e.g.
627 627 # CPU DoS due to an expensive engine or a network DoS due to poor
628 628 # compression ratio).
629 629 configengines = ui.configlist('experimental',
630 630 'clientcompressionengines')
631 631 config = 'experimental.clientcompressionengines'
632 632
633 633 # No explicit config. Filter out the ones that aren't supposed to be
634 634 # advertised and return default ordering.
635 635 if not configengines:
636 636 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
637 637 return [e for e in compengines
638 638 if getattr(e.wireprotosupport(), attr) > 0]
639 639
640 640 # If compression engines are listed in the config, assume there is a good
641 641 # reason for it (like server operators wanting to achieve specific
642 642 # performance characteristics). So fail fast if the config references
643 643 # unusable compression engines.
644 644 validnames = set(e.name() for e in compengines)
645 645 invalidnames = set(e for e in configengines if e not in validnames)
646 646 if invalidnames:
647 647 raise error.Abort(_('invalid compression engine defined in %s: %s') %
648 648 (config, ', '.join(sorted(invalidnames))))
649 649
650 650 compengines = [e for e in compengines if e.name() in configengines]
651 651 compengines = sorted(compengines,
652 652 key=lambda e: configengines.index(e.name()))
653 653
654 654 if not compengines:
655 655 raise error.Abort(_('%s config option does not specify any known '
656 656 'compression engines') % config,
657 657 hint=_('usable compression engines: %s') %
658 658 ', '.sorted(validnames))
659 659
660 660 return compengines
661 661
662 662 # list of commands
663 663 commands = {}
664 664
665 665 def wireprotocommand(name, args=''):
666 666 """decorator for wire protocol command"""
667 667 def register(func):
668 668 commands[name] = (func, args)
669 669 return func
670 670 return register
671 671
672 672 @wireprotocommand('batch', 'cmds *')
673 673 def batch(repo, proto, cmds, others):
674 674 repo = repo.filtered("served")
675 675 res = []
676 676 for pair in cmds.split(';'):
677 677 op, args = pair.split(' ', 1)
678 678 vals = {}
679 679 for a in args.split(','):
680 680 if a:
681 681 n, v = a.split('=')
682 682 vals[unescapearg(n)] = unescapearg(v)
683 683 func, spec = commands[op]
684 684 if spec:
685 685 keys = spec.split()
686 686 data = {}
687 687 for k in keys:
688 688 if k == '*':
689 689 star = {}
690 690 for key in vals.keys():
691 691 if key not in keys:
692 692 star[key] = vals[key]
693 693 data['*'] = star
694 694 else:
695 695 data[k] = vals[k]
696 696 result = func(repo, proto, *[data[k] for k in keys])
697 697 else:
698 698 result = func(repo, proto)
699 699 if isinstance(result, ooberror):
700 700 return result
701 701 res.append(escapearg(result))
702 702 return ';'.join(res)
703 703
704 704 @wireprotocommand('between', 'pairs')
705 705 def between(repo, proto, pairs):
706 706 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
707 707 r = []
708 708 for b in repo.between(pairs):
709 709 r.append(encodelist(b) + "\n")
710 710 return "".join(r)
711 711
712 712 @wireprotocommand('branchmap')
713 713 def branchmap(repo, proto):
714 714 branchmap = repo.branchmap()
715 715 heads = []
716 716 for branch, nodes in branchmap.iteritems():
717 717 branchname = urlreq.quote(encoding.fromlocal(branch))
718 718 branchnodes = encodelist(nodes)
719 719 heads.append('%s %s' % (branchname, branchnodes))
720 720 return '\n'.join(heads)
721 721
722 722 @wireprotocommand('branches', 'nodes')
723 723 def branches(repo, proto, nodes):
724 724 nodes = decodelist(nodes)
725 725 r = []
726 726 for b in repo.branches(nodes):
727 727 r.append(encodelist(b) + "\n")
728 728 return "".join(r)
729 729
730 730 @wireprotocommand('clonebundles', '')
731 731 def clonebundles(repo, proto):
732 732 """Server command for returning info for available bundles to seed clones.
733 733
734 734 Clients will parse this response and determine what bundle to fetch.
735 735
736 736 Extensions may wrap this command to filter or dynamically emit data
737 737 depending on the request. e.g. you could advertise URLs for the closest
738 738 data center given the client's IP address.
739 739 """
740 740 return repo.vfs.tryread('clonebundles.manifest')
741 741
742 742 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
743 743 'known', 'getbundle', 'unbundlehash', 'batch']
744 744
745 745 def _capabilities(repo, proto):
746 746 """return a list of capabilities for a repo
747 747
748 748 This function exists to allow extensions to easily wrap capabilities
749 749 computation
750 750
751 751 - returns a lists: easy to alter
752 752 - change done here will be propagated to both `capabilities` and `hello`
753 753 command without any other action needed.
754 754 """
755 755 # copy to prevent modification of the global list
756 756 caps = list(wireprotocaps)
757 757 if streamclone.allowservergeneration(repo):
758 758 if repo.ui.configbool('server', 'preferuncompressed'):
759 759 caps.append('stream-preferred')
760 760 requiredformats = repo.requirements & repo.supportedformats
761 761 # if our local revlogs are just revlogv1, add 'stream' cap
762 762 if not requiredformats - {'revlogv1'}:
763 763 caps.append('stream')
764 764 # otherwise, add 'streamreqs' detailing our local revlog format
765 765 else:
766 766 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
767 767 if repo.ui.configbool('experimental', 'bundle2-advertise'):
768 768 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
769 769 caps.append('bundle2=' + urlreq.quote(capsblob))
770 770 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
771 771
772 772 if proto.name == 'http':
773 773 caps.append('httpheader=%d' %
774 774 repo.ui.configint('server', 'maxhttpheaderlen'))
775 775 if repo.ui.configbool('experimental', 'httppostargs'):
776 776 caps.append('httppostargs')
777 777
778 778 # FUTURE advertise 0.2rx once support is implemented
779 779 # FUTURE advertise minrx and mintx after consulting config option
780 780 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
781 781
782 782 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
783 783 if compengines:
784 784 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
785 785 for e in compengines)
786 786 caps.append('compression=%s' % comptypes)
787 787
788 788 return caps
789 789
790 790 # If you are writing an extension and consider wrapping this function. Wrap
791 791 # `_capabilities` instead.
792 792 @wireprotocommand('capabilities')
793 793 def capabilities(repo, proto):
794 794 return ' '.join(_capabilities(repo, proto))
795 795
796 796 @wireprotocommand('changegroup', 'roots')
797 797 def changegroup(repo, proto, roots):
798 798 nodes = decodelist(roots)
799 799 outgoing = discovery.outgoing(repo, missingroots=nodes,
800 800 missingheads=repo.heads())
801 801 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
802 802 return streamres(reader=cg, v1compressible=True)
803 803
804 804 @wireprotocommand('changegroupsubset', 'bases heads')
805 805 def changegroupsubset(repo, proto, bases, heads):
806 806 bases = decodelist(bases)
807 807 heads = decodelist(heads)
808 808 outgoing = discovery.outgoing(repo, missingroots=bases,
809 809 missingheads=heads)
810 810 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
811 811 return streamres(reader=cg, v1compressible=True)
812 812
813 813 @wireprotocommand('debugwireargs', 'one two *')
814 814 def debugwireargs(repo, proto, one, two, others):
815 815 # only accept optional args from the known set
816 816 opts = options('debugwireargs', ['three', 'four'], others)
817 817 return repo.debugwireargs(one, two, **opts)
818 818
819 819 @wireprotocommand('getbundle', '*')
820 820 def getbundle(repo, proto, others):
821 821 opts = options('getbundle', gboptsmap.keys(), others)
822 822 for k, v in opts.iteritems():
823 823 keytype = gboptsmap[k]
824 824 if keytype == 'nodes':
825 825 opts[k] = decodelist(v)
826 826 elif keytype == 'csv':
827 827 opts[k] = list(v.split(','))
828 828 elif keytype == 'scsv':
829 829 opts[k] = set(v.split(','))
830 830 elif keytype == 'boolean':
831 831 # Client should serialize False as '0', which is a non-empty string
832 832 # so it evaluates as a True bool.
833 833 if v == '0':
834 834 opts[k] = False
835 835 else:
836 836 opts[k] = bool(v)
837 837 elif keytype != 'plain':
838 838 raise KeyError('unknown getbundle option type %s'
839 839 % keytype)
840 840
841 841 if not bundle1allowed(repo, 'pull'):
842 842 if not exchange.bundle2requested(opts.get('bundlecaps')):
843 843 if proto.name == 'http':
844 844 return ooberror(bundle2required)
845 845 raise error.Abort(bundle2requiredmain,
846 846 hint=bundle2requiredhint)
847 847
848 848 try:
849 849 if repo.ui.configbool('server', 'disablefullbundle'):
850 850 # Check to see if this is a full clone.
851 851 clheads = set(repo.changelog.heads())
852 852 heads = set(opts.get('heads', set()))
853 853 common = set(opts.get('common', set()))
854 854 common.discard(nullid)
855 855 if not common and clheads == heads:
856 856 raise error.Abort(
857 857 _('server has pull-based clones disabled'),
858 858 hint=_('remove --pull if specified or upgrade Mercurial'))
859 859
860 860 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
861 861 except error.Abort as exc:
862 862 # cleanly forward Abort error to the client
863 863 if not exchange.bundle2requested(opts.get('bundlecaps')):
864 864 if proto.name == 'http':
865 865 return ooberror(str(exc) + '\n')
866 866 raise # cannot do better for bundle1 + ssh
867 867 # bundle2 request expect a bundle2 reply
868 868 bundler = bundle2.bundle20(repo.ui)
869 869 manargs = [('message', str(exc))]
870 870 advargs = []
871 871 if exc.hint is not None:
872 872 advargs.append(('hint', exc.hint))
873 873 bundler.addpart(bundle2.bundlepart('error:abort',
874 874 manargs, advargs))
875 875 return streamres(gen=bundler.getchunks(), v1compressible=True)
876 876 return streamres(gen=chunks, v1compressible=True)
877 877
878 878 @wireprotocommand('heads')
879 879 def heads(repo, proto):
880 880 h = repo.heads()
881 881 return encodelist(h) + "\n"
882 882
883 883 @wireprotocommand('hello')
884 884 def hello(repo, proto):
885 885 '''the hello command returns a set of lines describing various
886 886 interesting things about the server, in an RFC822-like format.
887 887 Currently the only one defined is "capabilities", which
888 888 consists of a line in the form:
889 889
890 890 capabilities: space separated list of tokens
891 891 '''
892 892 return "capabilities: %s\n" % (capabilities(repo, proto))
893 893
894 894 @wireprotocommand('listkeys', 'namespace')
895 895 def listkeys(repo, proto, namespace):
896 896 d = repo.listkeys(encoding.tolocal(namespace)).items()
897 897 return pushkeymod.encodekeys(d)
898 898
899 899 @wireprotocommand('lookup', 'key')
900 900 def lookup(repo, proto, key):
901 901 try:
902 902 k = encoding.tolocal(key)
903 903 c = repo[k]
904 904 r = c.hex()
905 905 success = 1
906 906 except Exception as inst:
907 907 r = str(inst)
908 908 success = 0
909 909 return "%s %s\n" % (success, r)
910 910
911 911 @wireprotocommand('known', 'nodes *')
912 912 def known(repo, proto, nodes, others):
913 913 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
914 914
915 915 @wireprotocommand('pushkey', 'namespace key old new')
916 916 def pushkey(repo, proto, namespace, key, old, new):
917 917 # compatibility with pre-1.8 clients which were accidentally
918 918 # sending raw binary nodes rather than utf-8-encoded hex
919 919 if len(new) == 20 and util.escapestr(new) != new:
920 920 # looks like it could be a binary node
921 921 try:
922 922 new.decode('utf-8')
923 923 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
924 924 except UnicodeDecodeError:
925 925 pass # binary, leave unmodified
926 926 else:
927 927 new = encoding.tolocal(new) # normal path
928 928
929 929 if util.safehasattr(proto, 'restore'):
930 930
931 931 proto.redirect()
932 932
933 933 try:
934 934 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
935 935 encoding.tolocal(old), new) or False
936 936 except error.Abort:
937 937 r = False
938 938
939 939 output = proto.restore()
940 940
941 941 return '%s\n%s' % (int(r), output)
942 942
943 943 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
944 944 encoding.tolocal(old), new)
945 945 return '%s\n' % int(r)
946 946
947 947 @wireprotocommand('stream_out')
948 948 def stream(repo, proto):
949 949 '''If the server supports streaming clone, it advertises the "stream"
950 950 capability with a value representing the version and flags of the repo
951 951 it is serving. Client checks to see if it understands the format.
952 952 '''
953 953 if not streamclone.allowservergeneration(repo):
954 954 return '1\n'
955 955
956 956 def getstream(it):
957 957 yield '0\n'
958 958 for chunk in it:
959 959 yield chunk
960 960
961 961 try:
962 962 # LockError may be raised before the first result is yielded. Don't
963 963 # emit output until we're sure we got the lock successfully.
964 964 it = streamclone.generatev1wireproto(repo)
965 965 return streamres(gen=getstream(it))
966 966 except error.LockError:
967 967 return '2\n'
968 968
969 969 @wireprotocommand('unbundle', 'heads')
970 970 def unbundle(repo, proto, heads):
971 971 their_heads = decodelist(heads)
972 972
973 973 try:
974 974 proto.redirect()
975 975
976 976 exchange.check_heads(repo, their_heads, 'preparing changes')
977 977
978 978 # write bundle data to temporary file because it can be big
979 979 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
980 980 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
981 981 r = 0
982 982 try:
983 983 proto.getfile(fp)
984 984 fp.seek(0)
985 985 gen = exchange.readbundle(repo.ui, fp, None)
986 986 if (isinstance(gen, changegroupmod.cg1unpacker)
987 987 and not bundle1allowed(repo, 'push')):
988 988 if proto.name == 'http':
989 989 # need to special case http because stderr do not get to
990 990 # the http client on failed push so we need to abuse some
991 991 # other error type to make sure the message get to the
992 992 # user.
993 993 return ooberror(bundle2required)
994 994 raise error.Abort(bundle2requiredmain,
995 995 hint=bundle2requiredhint)
996 996
997 997 r = exchange.unbundle(repo, gen, their_heads, 'serve',
998 998 proto._client())
999 999 if util.safehasattr(r, 'addpart'):
1000 1000 # The return looks streamable, we are in the bundle2 case and
1001 1001 # should return a stream.
1002 1002 return streamres(gen=r.getchunks())
1003 1003 return pushres(r)
1004 1004
1005 1005 finally:
1006 1006 fp.close()
1007 1007 os.unlink(tempname)
1008 1008
1009 1009 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1010 1010 # handle non-bundle2 case first
1011 1011 if not getattr(exc, 'duringunbundle2', False):
1012 1012 try:
1013 1013 raise
1014 1014 except error.Abort:
1015 1015 # The old code we moved used util.stderr directly.
1016 1016 # We did not change it to minimise code change.
1017 1017 # This need to be moved to something proper.
1018 1018 # Feel free to do it.
1019 1019 util.stderr.write("abort: %s\n" % exc)
1020 1020 if exc.hint is not None:
1021 1021 util.stderr.write("(%s)\n" % exc.hint)
1022 1022 return pushres(0)
1023 1023 except error.PushRaced:
1024 1024 return pusherr(str(exc))
1025 1025
1026 1026 bundler = bundle2.bundle20(repo.ui)
1027 1027 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1028 1028 bundler.addpart(out)
1029 1029 try:
1030 1030 try:
1031 1031 raise
1032 1032 except error.PushkeyFailed as exc:
1033 1033 # check client caps
1034 1034 remotecaps = getattr(exc, '_replycaps', None)
1035 1035 if (remotecaps is not None
1036 1036 and 'pushkey' not in remotecaps.get('error', ())):
1037 1037 # no support remote side, fallback to Abort handler.
1038 1038 raise
1039 1039 part = bundler.newpart('error:pushkey')
1040 1040 part.addparam('in-reply-to', exc.partid)
1041 1041 if exc.namespace is not None:
1042 1042 part.addparam('namespace', exc.namespace, mandatory=False)
1043 1043 if exc.key is not None:
1044 1044 part.addparam('key', exc.key, mandatory=False)
1045 1045 if exc.new is not None:
1046 1046 part.addparam('new', exc.new, mandatory=False)
1047 1047 if exc.old is not None:
1048 1048 part.addparam('old', exc.old, mandatory=False)
1049 1049 if exc.ret is not None:
1050 1050 part.addparam('ret', exc.ret, mandatory=False)
1051 1051 except error.BundleValueError as exc:
1052 1052 errpart = bundler.newpart('error:unsupportedcontent')
1053 1053 if exc.parttype is not None:
1054 1054 errpart.addparam('parttype', exc.parttype)
1055 1055 if exc.params:
1056 1056 errpart.addparam('params', '\0'.join(exc.params))
1057 1057 except error.Abort as exc:
1058 1058 manargs = [('message', str(exc))]
1059 1059 advargs = []
1060 1060 if exc.hint is not None:
1061 1061 advargs.append(('hint', exc.hint))
1062 1062 bundler.addpart(bundle2.bundlepart('error:abort',
1063 1063 manargs, advargs))
1064 1064 except error.PushRaced as exc:
1065 1065 bundler.newpart('error:pushraced', [('message', str(exc))])
1066 1066 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now