##// END OF EJS Templates
wireproto: unescape argument names in batch command (BC)...
Gregory Szorc -
r29734:62e2e048 default
parent child Browse files
Show More
@@ -1,957 +1,962
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import itertools
12 12 import os
13 13 import sys
14 14 import tempfile
15 15
16 16 from .i18n import _
17 17 from .node import (
18 18 bin,
19 19 hex,
20 20 )
21 21
22 22 from . import (
23 23 bundle2,
24 24 changegroup as changegroupmod,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 streamclone,
31 31 util,
32 32 )
33 33
34 34 urlerr = util.urlerr
35 35 urlreq = util.urlreq
36 36
37 37 bundle2required = _(
38 38 'incompatible Mercurial client; bundle2 required\n'
39 39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
40 40
41 41 class abstractserverproto(object):
42 42 """abstract class that summarizes the protocol API
43 43
44 44 Used as reference and documentation.
45 45 """
46 46
47 47 def getargs(self, args):
48 48 """return the value for arguments in <args>
49 49
50 50 returns a list of values (same order as <args>)"""
51 51 raise NotImplementedError()
52 52
53 53 def getfile(self, fp):
54 54 """write the whole content of a file into a file like object
55 55
56 56 The file is in the form::
57 57
58 58 (<chunk-size>\n<chunk>)+0\n
59 59
60 60 chunk size is the ascii version of the int.
61 61 """
62 62 raise NotImplementedError()
63 63
64 64 def redirect(self):
65 65 """may setup interception for stdout and stderr
66 66
67 67 See also the `restore` method."""
68 68 raise NotImplementedError()
69 69
70 70 # If the `redirect` function does install interception, the `restore`
71 71 # function MUST be defined. If interception is not used, this function
72 72 # MUST NOT be defined.
73 73 #
74 74 # left commented here on purpose
75 75 #
76 76 #def restore(self):
77 77 # """reinstall previous stdout and stderr and return intercepted stdout
78 78 # """
79 79 # raise NotImplementedError()
80 80
81 81 def groupchunks(self, cg):
82 82 """return 4096 chunks from a changegroup object
83 83
84 84 Some protocols may have compressed the contents."""
85 85 raise NotImplementedError()
86 86
87 87 class remotebatch(peer.batcher):
88 88 '''batches the queued calls; uses as few roundtrips as possible'''
89 89 def __init__(self, remote):
90 90 '''remote must support _submitbatch(encbatch) and
91 91 _submitone(op, encargs)'''
92 92 peer.batcher.__init__(self)
93 93 self.remote = remote
94 94 def submit(self):
95 95 req, rsp = [], []
96 96 for name, args, opts, resref in self.calls:
97 97 mtd = getattr(self.remote, name)
98 98 batchablefn = getattr(mtd, 'batchable', None)
99 99 if batchablefn is not None:
100 100 batchable = batchablefn(mtd.im_self, *args, **opts)
101 101 encargsorres, encresref = next(batchable)
102 102 if encresref:
103 103 req.append((name, encargsorres,))
104 104 rsp.append((batchable, encresref, resref,))
105 105 else:
106 106 resref.set(encargsorres)
107 107 else:
108 108 if req:
109 109 self._submitreq(req, rsp)
110 110 req, rsp = [], []
111 111 resref.set(mtd(*args, **opts))
112 112 if req:
113 113 self._submitreq(req, rsp)
114 114 def _submitreq(self, req, rsp):
115 115 encresults = self.remote._submitbatch(req)
116 116 for encres, r in zip(encresults, rsp):
117 117 batchable, encresref, resref = r
118 118 encresref.set(encres)
119 119 resref.set(next(batchable))
120 120
121 121 class remoteiterbatcher(peer.iterbatcher):
122 122 def __init__(self, remote):
123 123 super(remoteiterbatcher, self).__init__()
124 124 self._remote = remote
125 125
126 126 def __getattr__(self, name):
127 127 if not getattr(self._remote, name, False):
128 128 raise AttributeError(
129 129 'Attempted to iterbatch non-batchable call to %r' % name)
130 130 return super(remoteiterbatcher, self).__getattr__(name)
131 131
132 132 def submit(self):
133 133 """Break the batch request into many patch calls and pipeline them.
134 134
135 135 This is mostly valuable over http where request sizes can be
136 136 limited, but can be used in other places as well.
137 137 """
138 138 req, rsp = [], []
139 139 for name, args, opts, resref in self.calls:
140 140 mtd = getattr(self._remote, name)
141 141 batchable = mtd.batchable(mtd.im_self, *args, **opts)
142 142 encargsorres, encresref = next(batchable)
143 143 assert encresref
144 144 req.append((name, encargsorres))
145 145 rsp.append((batchable, encresref))
146 146 if req:
147 147 self._resultiter = self._remote._submitbatch(req)
148 148 self._rsp = rsp
149 149
150 150 def results(self):
151 151 for (batchable, encresref), encres in itertools.izip(
152 152 self._rsp, self._resultiter):
153 153 encresref.set(encres)
154 154 yield next(batchable)
155 155
156 156 # Forward a couple of names from peer to make wireproto interactions
157 157 # slightly more sensible.
158 158 batchable = peer.batchable
159 159 future = peer.future
160 160
161 161 # list of nodes encoding / decoding
162 162
163 163 def decodelist(l, sep=' '):
164 164 if l:
165 165 return map(bin, l.split(sep))
166 166 return []
167 167
168 168 def encodelist(l, sep=' '):
169 169 try:
170 170 return sep.join(map(hex, l))
171 171 except TypeError:
172 172 raise
173 173
174 174 # batched call argument encoding
175 175
176 176 def escapearg(plain):
177 177 return (plain
178 178 .replace(':', ':c')
179 179 .replace(',', ':o')
180 180 .replace(';', ':s')
181 181 .replace('=', ':e'))
182 182
183 183 def unescapearg(escaped):
184 184 return (escaped
185 185 .replace(':e', '=')
186 186 .replace(':s', ';')
187 187 .replace(':o', ',')
188 188 .replace(':c', ':'))
189 189
190 190 def encodebatchcmds(req):
191 191 """Return a ``cmds`` argument value for the ``batch`` command."""
192 192 cmds = []
193 193 for op, argsdict in req:
194 # Old servers didn't properly unescape argument names. So prevent
195 # the sending of argument names that may not be decoded properly by
196 # servers.
197 assert all(escapearg(k) == k for k in argsdict)
198
194 199 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
195 200 for k, v in argsdict.iteritems())
196 201 cmds.append('%s %s' % (op, args))
197 202
198 203 return ';'.join(cmds)
199 204
200 205 # mapping of options accepted by getbundle and their types
201 206 #
202 207 # Meant to be extended by extensions. It is extensions responsibility to ensure
203 208 # such options are properly processed in exchange.getbundle.
204 209 #
205 210 # supported types are:
206 211 #
207 212 # :nodes: list of binary nodes
208 213 # :csv: list of comma-separated values
209 214 # :scsv: list of comma-separated values return as set
210 215 # :plain: string with no transformation needed.
211 216 gboptsmap = {'heads': 'nodes',
212 217 'common': 'nodes',
213 218 'obsmarkers': 'boolean',
214 219 'bundlecaps': 'scsv',
215 220 'listkeys': 'csv',
216 221 'cg': 'boolean',
217 222 'cbattempted': 'boolean'}
218 223
219 224 # client side
220 225
221 226 class wirepeer(peer.peerrepository):
222 227 """Client-side interface for communicating with a peer repository.
223 228
224 229 Methods commonly call wire protocol commands of the same name.
225 230
226 231 See also httppeer.py and sshpeer.py for protocol-specific
227 232 implementations of this interface.
228 233 """
229 234 def batch(self):
230 235 if self.capable('batch'):
231 236 return remotebatch(self)
232 237 else:
233 238 return peer.localbatch(self)
234 239 def _submitbatch(self, req):
235 240 """run batch request <req> on the server
236 241
237 242 Returns an iterator of the raw responses from the server.
238 243 """
239 244 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
240 245 chunk = rsp.read(1024)
241 246 work = [chunk]
242 247 while chunk:
243 248 while ';' not in chunk and chunk:
244 249 chunk = rsp.read(1024)
245 250 work.append(chunk)
246 251 merged = ''.join(work)
247 252 while ';' in merged:
248 253 one, merged = merged.split(';', 1)
249 254 yield unescapearg(one)
250 255 chunk = rsp.read(1024)
251 256 work = [merged, chunk]
252 257 yield unescapearg(''.join(work))
253 258
254 259 def _submitone(self, op, args):
255 260 return self._call(op, **args)
256 261
257 262 def iterbatch(self):
258 263 return remoteiterbatcher(self)
259 264
260 265 @batchable
261 266 def lookup(self, key):
262 267 self.requirecap('lookup', _('look up remote revision'))
263 268 f = future()
264 269 yield {'key': encoding.fromlocal(key)}, f
265 270 d = f.value
266 271 success, data = d[:-1].split(" ", 1)
267 272 if int(success):
268 273 yield bin(data)
269 274 self._abort(error.RepoError(data))
270 275
271 276 @batchable
272 277 def heads(self):
273 278 f = future()
274 279 yield {}, f
275 280 d = f.value
276 281 try:
277 282 yield decodelist(d[:-1])
278 283 except ValueError:
279 284 self._abort(error.ResponseError(_("unexpected response:"), d))
280 285
281 286 @batchable
282 287 def known(self, nodes):
283 288 f = future()
284 289 yield {'nodes': encodelist(nodes)}, f
285 290 d = f.value
286 291 try:
287 292 yield [bool(int(b)) for b in d]
288 293 except ValueError:
289 294 self._abort(error.ResponseError(_("unexpected response:"), d))
290 295
291 296 @batchable
292 297 def branchmap(self):
293 298 f = future()
294 299 yield {}, f
295 300 d = f.value
296 301 try:
297 302 branchmap = {}
298 303 for branchpart in d.splitlines():
299 304 branchname, branchheads = branchpart.split(' ', 1)
300 305 branchname = encoding.tolocal(urlreq.unquote(branchname))
301 306 branchheads = decodelist(branchheads)
302 307 branchmap[branchname] = branchheads
303 308 yield branchmap
304 309 except TypeError:
305 310 self._abort(error.ResponseError(_("unexpected response:"), d))
306 311
307 312 def branches(self, nodes):
308 313 n = encodelist(nodes)
309 314 d = self._call("branches", nodes=n)
310 315 try:
311 316 br = [tuple(decodelist(b)) for b in d.splitlines()]
312 317 return br
313 318 except ValueError:
314 319 self._abort(error.ResponseError(_("unexpected response:"), d))
315 320
316 321 def between(self, pairs):
317 322 batch = 8 # avoid giant requests
318 323 r = []
319 324 for i in xrange(0, len(pairs), batch):
320 325 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
321 326 d = self._call("between", pairs=n)
322 327 try:
323 328 r.extend(l and decodelist(l) or [] for l in d.splitlines())
324 329 except ValueError:
325 330 self._abort(error.ResponseError(_("unexpected response:"), d))
326 331 return r
327 332
328 333 @batchable
329 334 def pushkey(self, namespace, key, old, new):
330 335 if not self.capable('pushkey'):
331 336 yield False, None
332 337 f = future()
333 338 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
334 339 yield {'namespace': encoding.fromlocal(namespace),
335 340 'key': encoding.fromlocal(key),
336 341 'old': encoding.fromlocal(old),
337 342 'new': encoding.fromlocal(new)}, f
338 343 d = f.value
339 344 d, output = d.split('\n', 1)
340 345 try:
341 346 d = bool(int(d))
342 347 except ValueError:
343 348 raise error.ResponseError(
344 349 _('push failed (unexpected response):'), d)
345 350 for l in output.splitlines(True):
346 351 self.ui.status(_('remote: '), l)
347 352 yield d
348 353
349 354 @batchable
350 355 def listkeys(self, namespace):
351 356 if not self.capable('pushkey'):
352 357 yield {}, None
353 358 f = future()
354 359 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
355 360 yield {'namespace': encoding.fromlocal(namespace)}, f
356 361 d = f.value
357 362 self.ui.debug('received listkey for "%s": %i bytes\n'
358 363 % (namespace, len(d)))
359 364 yield pushkeymod.decodekeys(d)
360 365
361 366 def stream_out(self):
362 367 return self._callstream('stream_out')
363 368
364 369 def changegroup(self, nodes, kind):
365 370 n = encodelist(nodes)
366 371 f = self._callcompressable("changegroup", roots=n)
367 372 return changegroupmod.cg1unpacker(f, 'UN')
368 373
369 374 def changegroupsubset(self, bases, heads, kind):
370 375 self.requirecap('changegroupsubset', _('look up remote changes'))
371 376 bases = encodelist(bases)
372 377 heads = encodelist(heads)
373 378 f = self._callcompressable("changegroupsubset",
374 379 bases=bases, heads=heads)
375 380 return changegroupmod.cg1unpacker(f, 'UN')
376 381
377 382 def getbundle(self, source, **kwargs):
378 383 self.requirecap('getbundle', _('look up remote changes'))
379 384 opts = {}
380 385 bundlecaps = kwargs.get('bundlecaps')
381 386 if bundlecaps is not None:
382 387 kwargs['bundlecaps'] = sorted(bundlecaps)
383 388 else:
384 389 bundlecaps = () # kwargs could have it to None
385 390 for key, value in kwargs.iteritems():
386 391 if value is None:
387 392 continue
388 393 keytype = gboptsmap.get(key)
389 394 if keytype is None:
390 395 assert False, 'unexpected'
391 396 elif keytype == 'nodes':
392 397 value = encodelist(value)
393 398 elif keytype in ('csv', 'scsv'):
394 399 value = ','.join(value)
395 400 elif keytype == 'boolean':
396 401 value = '%i' % bool(value)
397 402 elif keytype != 'plain':
398 403 raise KeyError('unknown getbundle option type %s'
399 404 % keytype)
400 405 opts[key] = value
401 406 f = self._callcompressable("getbundle", **opts)
402 407 if any((cap.startswith('HG2') for cap in bundlecaps)):
403 408 return bundle2.getunbundler(self.ui, f)
404 409 else:
405 410 return changegroupmod.cg1unpacker(f, 'UN')
406 411
407 412 def unbundle(self, cg, heads, url):
408 413 '''Send cg (a readable file-like object representing the
409 414 changegroup to push, typically a chunkbuffer object) to the
410 415 remote server as a bundle.
411 416
412 417 When pushing a bundle10 stream, return an integer indicating the
413 418 result of the push (see localrepository.addchangegroup()).
414 419
415 420 When pushing a bundle20 stream, return a bundle20 stream.
416 421
417 422 `url` is the url the client thinks it's pushing to, which is
418 423 visible to hooks.
419 424 '''
420 425
421 426 if heads != ['force'] and self.capable('unbundlehash'):
422 427 heads = encodelist(['hashed',
423 428 hashlib.sha1(''.join(sorted(heads))).digest()])
424 429 else:
425 430 heads = encodelist(heads)
426 431
427 432 if util.safehasattr(cg, 'deltaheader'):
428 433 # this a bundle10, do the old style call sequence
429 434 ret, output = self._callpush("unbundle", cg, heads=heads)
430 435 if ret == "":
431 436 raise error.ResponseError(
432 437 _('push failed:'), output)
433 438 try:
434 439 ret = int(ret)
435 440 except ValueError:
436 441 raise error.ResponseError(
437 442 _('push failed (unexpected response):'), ret)
438 443
439 444 for l in output.splitlines(True):
440 445 self.ui.status(_('remote: '), l)
441 446 else:
442 447 # bundle2 push. Send a stream, fetch a stream.
443 448 stream = self._calltwowaystream('unbundle', cg, heads=heads)
444 449 ret = bundle2.getunbundler(self.ui, stream)
445 450 return ret
446 451
447 452 def debugwireargs(self, one, two, three=None, four=None, five=None):
448 453 # don't pass optional arguments left at their default value
449 454 opts = {}
450 455 if three is not None:
451 456 opts['three'] = three
452 457 if four is not None:
453 458 opts['four'] = four
454 459 return self._call('debugwireargs', one=one, two=two, **opts)
455 460
456 461 def _call(self, cmd, **args):
457 462 """execute <cmd> on the server
458 463
459 464 The command is expected to return a simple string.
460 465
461 466 returns the server reply as a string."""
462 467 raise NotImplementedError()
463 468
464 469 def _callstream(self, cmd, **args):
465 470 """execute <cmd> on the server
466 471
467 472 The command is expected to return a stream. Note that if the
468 473 command doesn't return a stream, _callstream behaves
469 474 differently for ssh and http peers.
470 475
471 476 returns the server reply as a file like object.
472 477 """
473 478 raise NotImplementedError()
474 479
475 480 def _callcompressable(self, cmd, **args):
476 481 """execute <cmd> on the server
477 482
478 483 The command is expected to return a stream.
479 484
480 485 The stream may have been compressed in some implementations. This
481 486 function takes care of the decompression. This is the only difference
482 487 with _callstream.
483 488
484 489 returns the server reply as a file like object.
485 490 """
486 491 raise NotImplementedError()
487 492
488 493 def _callpush(self, cmd, fp, **args):
489 494 """execute a <cmd> on server
490 495
491 496 The command is expected to be related to a push. Push has a special
492 497 return method.
493 498
494 499 returns the server reply as a (ret, output) tuple. ret is either
495 500 empty (error) or a stringified int.
496 501 """
497 502 raise NotImplementedError()
498 503
499 504 def _calltwowaystream(self, cmd, fp, **args):
500 505 """execute <cmd> on server
501 506
502 507 The command will send a stream to the server and get a stream in reply.
503 508 """
504 509 raise NotImplementedError()
505 510
506 511 def _abort(self, exception):
507 512 """clearly abort the wire protocol connection and raise the exception
508 513 """
509 514 raise NotImplementedError()
510 515
511 516 # server side
512 517
513 518 # wire protocol command can either return a string or one of these classes.
514 519 class streamres(object):
515 520 """wireproto reply: binary stream
516 521
517 522 The call was successful and the result is a stream.
518 523 Iterate on the `self.gen` attribute to retrieve chunks.
519 524 """
520 525 def __init__(self, gen):
521 526 self.gen = gen
522 527
523 528 class pushres(object):
524 529 """wireproto reply: success with simple integer return
525 530
526 531 The call was successful and returned an integer contained in `self.res`.
527 532 """
528 533 def __init__(self, res):
529 534 self.res = res
530 535
531 536 class pusherr(object):
532 537 """wireproto reply: failure
533 538
534 539 The call failed. The `self.res` attribute contains the error message.
535 540 """
536 541 def __init__(self, res):
537 542 self.res = res
538 543
539 544 class ooberror(object):
540 545 """wireproto reply: failure of a batch of operation
541 546
542 547 Something failed during a batch call. The error message is stored in
543 548 `self.message`.
544 549 """
545 550 def __init__(self, message):
546 551 self.message = message
547 552
548 553 def getdispatchrepo(repo, proto, command):
549 554 """Obtain the repo used for processing wire protocol commands.
550 555
551 556 The intent of this function is to serve as a monkeypatch point for
552 557 extensions that need commands to operate on different repo views under
553 558 specialized circumstances.
554 559 """
555 560 return repo.filtered('served')
556 561
557 562 def dispatch(repo, proto, command):
558 563 repo = getdispatchrepo(repo, proto, command)
559 564 func, spec = commands[command]
560 565 args = proto.getargs(spec)
561 566 return func(repo, proto, *args)
562 567
563 568 def options(cmd, keys, others):
564 569 opts = {}
565 570 for k in keys:
566 571 if k in others:
567 572 opts[k] = others[k]
568 573 del others[k]
569 574 if others:
570 575 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
571 576 % (cmd, ",".join(others)))
572 577 return opts
573 578
574 579 def bundle1allowed(repo, action):
575 580 """Whether a bundle1 operation is allowed from the server.
576 581
577 582 Priority is:
578 583
579 584 1. server.bundle1gd.<action> (if generaldelta active)
580 585 2. server.bundle1.<action>
581 586 3. server.bundle1gd (if generaldelta active)
582 587 4. server.bundle1
583 588 """
584 589 ui = repo.ui
585 590 gd = 'generaldelta' in repo.requirements
586 591
587 592 if gd:
588 593 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
589 594 if v is not None:
590 595 return v
591 596
592 597 v = ui.configbool('server', 'bundle1.%s' % action, None)
593 598 if v is not None:
594 599 return v
595 600
596 601 if gd:
597 602 v = ui.configbool('server', 'bundle1gd', None)
598 603 if v is not None:
599 604 return v
600 605
601 606 return ui.configbool('server', 'bundle1', True)
602 607
603 608 # list of commands
604 609 commands = {}
605 610
606 611 def wireprotocommand(name, args=''):
607 612 """decorator for wire protocol command"""
608 613 def register(func):
609 614 commands[name] = (func, args)
610 615 return func
611 616 return register
612 617
613 618 @wireprotocommand('batch', 'cmds *')
614 619 def batch(repo, proto, cmds, others):
615 620 repo = repo.filtered("served")
616 621 res = []
617 622 for pair in cmds.split(';'):
618 623 op, args = pair.split(' ', 1)
619 624 vals = {}
620 625 for a in args.split(','):
621 626 if a:
622 627 n, v = a.split('=')
623 vals[n] = unescapearg(v)
628 vals[unescapearg(n)] = unescapearg(v)
624 629 func, spec = commands[op]
625 630 if spec:
626 631 keys = spec.split()
627 632 data = {}
628 633 for k in keys:
629 634 if k == '*':
630 635 star = {}
631 636 for key in vals.keys():
632 637 if key not in keys:
633 638 star[key] = vals[key]
634 639 data['*'] = star
635 640 else:
636 641 data[k] = vals[k]
637 642 result = func(repo, proto, *[data[k] for k in keys])
638 643 else:
639 644 result = func(repo, proto)
640 645 if isinstance(result, ooberror):
641 646 return result
642 647 res.append(escapearg(result))
643 648 return ';'.join(res)
644 649
645 650 @wireprotocommand('between', 'pairs')
646 651 def between(repo, proto, pairs):
647 652 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
648 653 r = []
649 654 for b in repo.between(pairs):
650 655 r.append(encodelist(b) + "\n")
651 656 return "".join(r)
652 657
653 658 @wireprotocommand('branchmap')
654 659 def branchmap(repo, proto):
655 660 branchmap = repo.branchmap()
656 661 heads = []
657 662 for branch, nodes in branchmap.iteritems():
658 663 branchname = urlreq.quote(encoding.fromlocal(branch))
659 664 branchnodes = encodelist(nodes)
660 665 heads.append('%s %s' % (branchname, branchnodes))
661 666 return '\n'.join(heads)
662 667
663 668 @wireprotocommand('branches', 'nodes')
664 669 def branches(repo, proto, nodes):
665 670 nodes = decodelist(nodes)
666 671 r = []
667 672 for b in repo.branches(nodes):
668 673 r.append(encodelist(b) + "\n")
669 674 return "".join(r)
670 675
671 676 @wireprotocommand('clonebundles', '')
672 677 def clonebundles(repo, proto):
673 678 """Server command for returning info for available bundles to seed clones.
674 679
675 680 Clients will parse this response and determine what bundle to fetch.
676 681
677 682 Extensions may wrap this command to filter or dynamically emit data
678 683 depending on the request. e.g. you could advertise URLs for the closest
679 684 data center given the client's IP address.
680 685 """
681 686 return repo.opener.tryread('clonebundles.manifest')
682 687
683 688 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
684 689 'known', 'getbundle', 'unbundlehash', 'batch']
685 690
686 691 def _capabilities(repo, proto):
687 692 """return a list of capabilities for a repo
688 693
689 694 This function exists to allow extensions to easily wrap capabilities
690 695 computation
691 696
692 697 - returns a lists: easy to alter
693 698 - change done here will be propagated to both `capabilities` and `hello`
694 699 command without any other action needed.
695 700 """
696 701 # copy to prevent modification of the global list
697 702 caps = list(wireprotocaps)
698 703 if streamclone.allowservergeneration(repo.ui):
699 704 if repo.ui.configbool('server', 'preferuncompressed', False):
700 705 caps.append('stream-preferred')
701 706 requiredformats = repo.requirements & repo.supportedformats
702 707 # if our local revlogs are just revlogv1, add 'stream' cap
703 708 if not requiredformats - set(('revlogv1',)):
704 709 caps.append('stream')
705 710 # otherwise, add 'streamreqs' detailing our local revlog format
706 711 else:
707 712 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
708 713 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
709 714 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
710 715 caps.append('bundle2=' + urlreq.quote(capsblob))
711 716 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
712 717 caps.append(
713 718 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
714 719 if repo.ui.configbool('experimental', 'httppostargs', False):
715 720 caps.append('httppostargs')
716 721 return caps
717 722
718 723 # If you are writing an extension and consider wrapping this function. Wrap
719 724 # `_capabilities` instead.
720 725 @wireprotocommand('capabilities')
721 726 def capabilities(repo, proto):
722 727 return ' '.join(_capabilities(repo, proto))
723 728
724 729 @wireprotocommand('changegroup', 'roots')
725 730 def changegroup(repo, proto, roots):
726 731 nodes = decodelist(roots)
727 732 cg = changegroupmod.changegroup(repo, nodes, 'serve')
728 733 return streamres(proto.groupchunks(cg))
729 734
730 735 @wireprotocommand('changegroupsubset', 'bases heads')
731 736 def changegroupsubset(repo, proto, bases, heads):
732 737 bases = decodelist(bases)
733 738 heads = decodelist(heads)
734 739 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
735 740 return streamres(proto.groupchunks(cg))
736 741
737 742 @wireprotocommand('debugwireargs', 'one two *')
738 743 def debugwireargs(repo, proto, one, two, others):
739 744 # only accept optional args from the known set
740 745 opts = options('debugwireargs', ['three', 'four'], others)
741 746 return repo.debugwireargs(one, two, **opts)
742 747
743 748 # List of options accepted by getbundle.
744 749 #
745 750 # Meant to be extended by extensions. It is the extension's responsibility to
746 751 # ensure such options are properly processed in exchange.getbundle.
747 752 gboptslist = ['heads', 'common', 'bundlecaps']
748 753
749 754 @wireprotocommand('getbundle', '*')
750 755 def getbundle(repo, proto, others):
751 756 opts = options('getbundle', gboptsmap.keys(), others)
752 757 for k, v in opts.iteritems():
753 758 keytype = gboptsmap[k]
754 759 if keytype == 'nodes':
755 760 opts[k] = decodelist(v)
756 761 elif keytype == 'csv':
757 762 opts[k] = list(v.split(','))
758 763 elif keytype == 'scsv':
759 764 opts[k] = set(v.split(','))
760 765 elif keytype == 'boolean':
761 766 # Client should serialize False as '0', which is a non-empty string
762 767 # so it evaluates as a True bool.
763 768 if v == '0':
764 769 opts[k] = False
765 770 else:
766 771 opts[k] = bool(v)
767 772 elif keytype != 'plain':
768 773 raise KeyError('unknown getbundle option type %s'
769 774 % keytype)
770 775
771 776 if not bundle1allowed(repo, 'pull'):
772 777 if not exchange.bundle2requested(opts.get('bundlecaps')):
773 778 return ooberror(bundle2required)
774 779
775 780 cg = exchange.getbundle(repo, 'serve', **opts)
776 781 return streamres(proto.groupchunks(cg))
777 782
778 783 @wireprotocommand('heads')
779 784 def heads(repo, proto):
780 785 h = repo.heads()
781 786 return encodelist(h) + "\n"
782 787
783 788 @wireprotocommand('hello')
784 789 def hello(repo, proto):
785 790 '''the hello command returns a set of lines describing various
786 791 interesting things about the server, in an RFC822-like format.
787 792 Currently the only one defined is "capabilities", which
788 793 consists of a line in the form:
789 794
790 795 capabilities: space separated list of tokens
791 796 '''
792 797 return "capabilities: %s\n" % (capabilities(repo, proto))
793 798
794 799 @wireprotocommand('listkeys', 'namespace')
795 800 def listkeys(repo, proto, namespace):
796 801 d = repo.listkeys(encoding.tolocal(namespace)).items()
797 802 return pushkeymod.encodekeys(d)
798 803
799 804 @wireprotocommand('lookup', 'key')
800 805 def lookup(repo, proto, key):
801 806 try:
802 807 k = encoding.tolocal(key)
803 808 c = repo[k]
804 809 r = c.hex()
805 810 success = 1
806 811 except Exception as inst:
807 812 r = str(inst)
808 813 success = 0
809 814 return "%s %s\n" % (success, r)
810 815
811 816 @wireprotocommand('known', 'nodes *')
812 817 def known(repo, proto, nodes, others):
813 818 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
814 819
815 820 @wireprotocommand('pushkey', 'namespace key old new')
816 821 def pushkey(repo, proto, namespace, key, old, new):
817 822 # compatibility with pre-1.8 clients which were accidentally
818 823 # sending raw binary nodes rather than utf-8-encoded hex
819 824 if len(new) == 20 and new.encode('string-escape') != new:
820 825 # looks like it could be a binary node
821 826 try:
822 827 new.decode('utf-8')
823 828 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
824 829 except UnicodeDecodeError:
825 830 pass # binary, leave unmodified
826 831 else:
827 832 new = encoding.tolocal(new) # normal path
828 833
829 834 if util.safehasattr(proto, 'restore'):
830 835
831 836 proto.redirect()
832 837
833 838 try:
834 839 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
835 840 encoding.tolocal(old), new) or False
836 841 except error.Abort:
837 842 r = False
838 843
839 844 output = proto.restore()
840 845
841 846 return '%s\n%s' % (int(r), output)
842 847
843 848 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
844 849 encoding.tolocal(old), new)
845 850 return '%s\n' % int(r)
846 851
847 852 @wireprotocommand('stream_out')
848 853 def stream(repo, proto):
849 854 '''If the server supports streaming clone, it advertises the "stream"
850 855 capability with a value representing the version and flags of the repo
851 856 it is serving. Client checks to see if it understands the format.
852 857 '''
853 858 if not streamclone.allowservergeneration(repo.ui):
854 859 return '1\n'
855 860
856 861 def getstream(it):
857 862 yield '0\n'
858 863 for chunk in it:
859 864 yield chunk
860 865
861 866 try:
862 867 # LockError may be raised before the first result is yielded. Don't
863 868 # emit output until we're sure we got the lock successfully.
864 869 it = streamclone.generatev1wireproto(repo)
865 870 return streamres(getstream(it))
866 871 except error.LockError:
867 872 return '2\n'
868 873
869 874 @wireprotocommand('unbundle', 'heads')
870 875 def unbundle(repo, proto, heads):
871 876 their_heads = decodelist(heads)
872 877
873 878 try:
874 879 proto.redirect()
875 880
876 881 exchange.check_heads(repo, their_heads, 'preparing changes')
877 882
878 883 # write bundle data to temporary file because it can be big
879 884 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
880 885 fp = os.fdopen(fd, 'wb+')
881 886 r = 0
882 887 try:
883 888 proto.getfile(fp)
884 889 fp.seek(0)
885 890 gen = exchange.readbundle(repo.ui, fp, None)
886 891 if (isinstance(gen, changegroupmod.cg1unpacker)
887 892 and not bundle1allowed(repo, 'push')):
888 893 return ooberror(bundle2required)
889 894
890 895 r = exchange.unbundle(repo, gen, their_heads, 'serve',
891 896 proto._client())
892 897 if util.safehasattr(r, 'addpart'):
893 898 # The return looks streamable, we are in the bundle2 case and
894 899 # should return a stream.
895 900 return streamres(r.getchunks())
896 901 return pushres(r)
897 902
898 903 finally:
899 904 fp.close()
900 905 os.unlink(tempname)
901 906
902 907 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
903 908 # handle non-bundle2 case first
904 909 if not getattr(exc, 'duringunbundle2', False):
905 910 try:
906 911 raise
907 912 except error.Abort:
908 913 # The old code we moved used sys.stderr directly.
909 914 # We did not change it to minimise code change.
910 915 # This need to be moved to something proper.
911 916 # Feel free to do it.
912 917 sys.stderr.write("abort: %s\n" % exc)
913 918 return pushres(0)
914 919 except error.PushRaced:
915 920 return pusherr(str(exc))
916 921
917 922 bundler = bundle2.bundle20(repo.ui)
918 923 for out in getattr(exc, '_bundle2salvagedoutput', ()):
919 924 bundler.addpart(out)
920 925 try:
921 926 try:
922 927 raise
923 928 except error.PushkeyFailed as exc:
924 929 # check client caps
925 930 remotecaps = getattr(exc, '_replycaps', None)
926 931 if (remotecaps is not None
927 932 and 'pushkey' not in remotecaps.get('error', ())):
928 933 # no support remote side, fallback to Abort handler.
929 934 raise
930 935 part = bundler.newpart('error:pushkey')
931 936 part.addparam('in-reply-to', exc.partid)
932 937 if exc.namespace is not None:
933 938 part.addparam('namespace', exc.namespace, mandatory=False)
934 939 if exc.key is not None:
935 940 part.addparam('key', exc.key, mandatory=False)
936 941 if exc.new is not None:
937 942 part.addparam('new', exc.new, mandatory=False)
938 943 if exc.old is not None:
939 944 part.addparam('old', exc.old, mandatory=False)
940 945 if exc.ret is not None:
941 946 part.addparam('ret', exc.ret, mandatory=False)
942 947 except error.BundleValueError as exc:
943 948 errpart = bundler.newpart('error:unsupportedcontent')
944 949 if exc.parttype is not None:
945 950 errpart.addparam('parttype', exc.parttype)
946 951 if exc.params:
947 952 errpart.addparam('params', '\0'.join(exc.params))
948 953 except error.Abort as exc:
949 954 manargs = [('message', str(exc))]
950 955 advargs = []
951 956 if exc.hint is not None:
952 957 advargs.append(('hint', exc.hint))
953 958 bundler.addpart(bundle2.bundlepart('error:abort',
954 959 manargs, advargs))
955 960 except error.PushRaced as exc:
956 961 bundler.newpart('error:pushraced', [('message', str(exc))])
957 962 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now