##// END OF EJS Templates
wirepeer: rename confusing `source` parameter...
Augie Fackler -
r29706:7f6130c7 default
parent child Browse files
Show More
@@ -1,948 +1,952
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import itertools
12 12 import os
13 13 import sys
14 14 import tempfile
15 15
16 16 from .i18n import _
17 17 from .node import (
18 18 bin,
19 19 hex,
20 20 )
21 21
22 22 from . import (
23 23 bundle2,
24 24 changegroup as changegroupmod,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 streamclone,
31 31 util,
32 32 )
33 33
34 34 urlerr = util.urlerr
35 35 urlreq = util.urlreq
36 36
37 37 bundle2required = _(
38 38 'incompatible Mercurial client; bundle2 required\n'
39 39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
40 40
41 41 class abstractserverproto(object):
42 42 """abstract class that summarizes the protocol API
43 43
44 44 Used as reference and documentation.
45 45 """
46 46
47 47 def getargs(self, args):
48 48 """return the value for arguments in <args>
49 49
50 50 returns a list of values (same order as <args>)"""
51 51 raise NotImplementedError()
52 52
53 53 def getfile(self, fp):
54 54 """write the whole content of a file into a file like object
55 55
56 56 The file is in the form::
57 57
58 58 (<chunk-size>\n<chunk>)+0\n
59 59
60 60 chunk size is the ascii version of the int.
61 61 """
62 62 raise NotImplementedError()
63 63
64 64 def redirect(self):
65 65 """may setup interception for stdout and stderr
66 66
67 67 See also the `restore` method."""
68 68 raise NotImplementedError()
69 69
70 70 # If the `redirect` function does install interception, the `restore`
71 71 # function MUST be defined. If interception is not used, this function
72 72 # MUST NOT be defined.
73 73 #
74 74 # left commented here on purpose
75 75 #
76 76 #def restore(self):
77 77 # """reinstall previous stdout and stderr and return intercepted stdout
78 78 # """
79 79 # raise NotImplementedError()
80 80
81 81 def groupchunks(self, cg):
82 82 """return 4096 chunks from a changegroup object
83 83
84 84 Some protocols may have compressed the contents."""
85 85 raise NotImplementedError()
86 86
87 87 class remotebatch(peer.batcher):
88 88 '''batches the queued calls; uses as few roundtrips as possible'''
89 89 def __init__(self, remote):
90 90 '''remote must support _submitbatch(encbatch) and
91 91 _submitone(op, encargs)'''
92 92 peer.batcher.__init__(self)
93 93 self.remote = remote
94 94 def submit(self):
95 95 req, rsp = [], []
96 96 for name, args, opts, resref in self.calls:
97 97 mtd = getattr(self.remote, name)
98 98 batchablefn = getattr(mtd, 'batchable', None)
99 99 if batchablefn is not None:
100 100 batchable = batchablefn(mtd.im_self, *args, **opts)
101 101 encargsorres, encresref = next(batchable)
102 102 if encresref:
103 103 req.append((name, encargsorres,))
104 104 rsp.append((batchable, encresref, resref,))
105 105 else:
106 106 resref.set(encargsorres)
107 107 else:
108 108 if req:
109 109 self._submitreq(req, rsp)
110 110 req, rsp = [], []
111 111 resref.set(mtd(*args, **opts))
112 112 if req:
113 113 self._submitreq(req, rsp)
114 114 def _submitreq(self, req, rsp):
115 115 encresults = self.remote._submitbatch(req)
116 116 for encres, r in zip(encresults, rsp):
117 117 batchable, encresref, resref = r
118 118 encresref.set(encres)
119 119 resref.set(next(batchable))
120 120
121 121 class remoteiterbatcher(peer.iterbatcher):
122 122 def __init__(self, remote):
123 123 super(remoteiterbatcher, self).__init__()
124 124 self._remote = remote
125 125
126 126 def __getattr__(self, name):
127 127 if not getattr(self._remote, name, False):
128 128 raise AttributeError(
129 129 'Attempted to iterbatch non-batchable call to %r' % name)
130 130 return super(remoteiterbatcher, self).__getattr__(name)
131 131
132 132 def submit(self):
133 133 """Break the batch request into many patch calls and pipeline them.
134 134
135 135 This is mostly valuable over http where request sizes can be
136 136 limited, but can be used in other places as well.
137 137 """
138 138 req, rsp = [], []
139 139 for name, args, opts, resref in self.calls:
140 140 mtd = getattr(self._remote, name)
141 141 batchable = mtd.batchable(mtd.im_self, *args, **opts)
142 142 encargsorres, encresref = next(batchable)
143 143 assert encresref
144 144 req.append((name, encargsorres))
145 145 rsp.append((batchable, encresref))
146 146 if req:
147 147 self._resultiter = self._remote._submitbatch(req)
148 148 self._rsp = rsp
149 149
150 150 def results(self):
151 151 for (batchable, encresref), encres in itertools.izip(
152 152 self._rsp, self._resultiter):
153 153 encresref.set(encres)
154 154 yield next(batchable)
155 155
156 156 # Forward a couple of names from peer to make wireproto interactions
157 157 # slightly more sensible.
158 158 batchable = peer.batchable
159 159 future = peer.future
160 160
161 161 # list of nodes encoding / decoding
162 162
163 163 def decodelist(l, sep=' '):
164 164 if l:
165 165 return map(bin, l.split(sep))
166 166 return []
167 167
168 168 def encodelist(l, sep=' '):
169 169 try:
170 170 return sep.join(map(hex, l))
171 171 except TypeError:
172 172 raise
173 173
174 174 # batched call argument encoding
175 175
176 176 def escapearg(plain):
177 177 return (plain
178 178 .replace(':', ':c')
179 179 .replace(',', ':o')
180 180 .replace(';', ':s')
181 181 .replace('=', ':e'))
182 182
183 183 def unescapearg(escaped):
184 184 return (escaped
185 185 .replace(':e', '=')
186 186 .replace(':s', ';')
187 187 .replace(':o', ',')
188 188 .replace(':c', ':'))
189 189
190 190 # mapping of options accepted by getbundle and their types
191 191 #
192 192 # Meant to be extended by extensions. It is extensions responsibility to ensure
193 193 # such options are properly processed in exchange.getbundle.
194 194 #
195 195 # supported types are:
196 196 #
197 197 # :nodes: list of binary nodes
198 198 # :csv: list of comma-separated values
199 199 # :scsv: list of comma-separated values return as set
200 200 # :plain: string with no transformation needed.
201 201 gboptsmap = {'heads': 'nodes',
202 202 'common': 'nodes',
203 203 'obsmarkers': 'boolean',
204 204 'bundlecaps': 'scsv',
205 205 'listkeys': 'csv',
206 206 'cg': 'boolean',
207 207 'cbattempted': 'boolean'}
208 208
209 209 # client side
210 210
211 211 class wirepeer(peer.peerrepository):
212 212 """Client-side interface for communicating with a peer repository.
213 213
214 214 Methods commonly call wire protocol commands of the same name.
215 215
216 216 See also httppeer.py and sshpeer.py for protocol-specific
217 217 implementations of this interface.
218 218 """
219 219 def batch(self):
220 220 if self.capable('batch'):
221 221 return remotebatch(self)
222 222 else:
223 223 return peer.localbatch(self)
224 224 def _submitbatch(self, req):
225 225 """run batch request <req> on the server
226 226
227 227 Returns an iterator of the raw responses from the server.
228 228 """
229 229 cmds = []
230 230 for op, argsdict in req:
231 231 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
232 232 for k, v in argsdict.iteritems())
233 233 cmds.append('%s %s' % (op, args))
234 234 rsp = self._callstream("batch", cmds=';'.join(cmds))
235 235 chunk = rsp.read(1024)
236 236 work = [chunk]
237 237 while chunk:
238 238 while ';' not in chunk and chunk:
239 239 chunk = rsp.read(1024)
240 240 work.append(chunk)
241 241 merged = ''.join(work)
242 242 while ';' in merged:
243 243 one, merged = merged.split(';', 1)
244 244 yield unescapearg(one)
245 245 chunk = rsp.read(1024)
246 246 work = [merged, chunk]
247 247 yield unescapearg(''.join(work))
248 248
249 249 def _submitone(self, op, args):
250 250 return self._call(op, **args)
251 251
252 252 def iterbatch(self):
253 253 return remoteiterbatcher(self)
254 254
255 255 @batchable
256 256 def lookup(self, key):
257 257 self.requirecap('lookup', _('look up remote revision'))
258 258 f = future()
259 259 yield {'key': encoding.fromlocal(key)}, f
260 260 d = f.value
261 261 success, data = d[:-1].split(" ", 1)
262 262 if int(success):
263 263 yield bin(data)
264 264 self._abort(error.RepoError(data))
265 265
266 266 @batchable
267 267 def heads(self):
268 268 f = future()
269 269 yield {}, f
270 270 d = f.value
271 271 try:
272 272 yield decodelist(d[:-1])
273 273 except ValueError:
274 274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 275
276 276 @batchable
277 277 def known(self, nodes):
278 278 f = future()
279 279 yield {'nodes': encodelist(nodes)}, f
280 280 d = f.value
281 281 try:
282 282 yield [bool(int(b)) for b in d]
283 283 except ValueError:
284 284 self._abort(error.ResponseError(_("unexpected response:"), d))
285 285
286 286 @batchable
287 287 def branchmap(self):
288 288 f = future()
289 289 yield {}, f
290 290 d = f.value
291 291 try:
292 292 branchmap = {}
293 293 for branchpart in d.splitlines():
294 294 branchname, branchheads = branchpart.split(' ', 1)
295 295 branchname = encoding.tolocal(urlreq.unquote(branchname))
296 296 branchheads = decodelist(branchheads)
297 297 branchmap[branchname] = branchheads
298 298 yield branchmap
299 299 except TypeError:
300 300 self._abort(error.ResponseError(_("unexpected response:"), d))
301 301
302 302 def branches(self, nodes):
303 303 n = encodelist(nodes)
304 304 d = self._call("branches", nodes=n)
305 305 try:
306 306 br = [tuple(decodelist(b)) for b in d.splitlines()]
307 307 return br
308 308 except ValueError:
309 309 self._abort(error.ResponseError(_("unexpected response:"), d))
310 310
311 311 def between(self, pairs):
312 312 batch = 8 # avoid giant requests
313 313 r = []
314 314 for i in xrange(0, len(pairs), batch):
315 315 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
316 316 d = self._call("between", pairs=n)
317 317 try:
318 318 r.extend(l and decodelist(l) or [] for l in d.splitlines())
319 319 except ValueError:
320 320 self._abort(error.ResponseError(_("unexpected response:"), d))
321 321 return r
322 322
323 323 @batchable
324 324 def pushkey(self, namespace, key, old, new):
325 325 if not self.capable('pushkey'):
326 326 yield False, None
327 327 f = future()
328 328 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
329 329 yield {'namespace': encoding.fromlocal(namespace),
330 330 'key': encoding.fromlocal(key),
331 331 'old': encoding.fromlocal(old),
332 332 'new': encoding.fromlocal(new)}, f
333 333 d = f.value
334 334 d, output = d.split('\n', 1)
335 335 try:
336 336 d = bool(int(d))
337 337 except ValueError:
338 338 raise error.ResponseError(
339 339 _('push failed (unexpected response):'), d)
340 340 for l in output.splitlines(True):
341 341 self.ui.status(_('remote: '), l)
342 342 yield d
343 343
344 344 @batchable
345 345 def listkeys(self, namespace):
346 346 if not self.capable('pushkey'):
347 347 yield {}, None
348 348 f = future()
349 349 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
350 350 yield {'namespace': encoding.fromlocal(namespace)}, f
351 351 d = f.value
352 352 self.ui.debug('received listkey for "%s": %i bytes\n'
353 353 % (namespace, len(d)))
354 354 yield pushkeymod.decodekeys(d)
355 355
356 356 def stream_out(self):
357 357 return self._callstream('stream_out')
358 358
359 359 def changegroup(self, nodes, kind):
360 360 n = encodelist(nodes)
361 361 f = self._callcompressable("changegroup", roots=n)
362 362 return changegroupmod.cg1unpacker(f, 'UN')
363 363
364 364 def changegroupsubset(self, bases, heads, kind):
365 365 self.requirecap('changegroupsubset', _('look up remote changes'))
366 366 bases = encodelist(bases)
367 367 heads = encodelist(heads)
368 368 f = self._callcompressable("changegroupsubset",
369 369 bases=bases, heads=heads)
370 370 return changegroupmod.cg1unpacker(f, 'UN')
371 371
372 372 def getbundle(self, source, **kwargs):
373 373 self.requirecap('getbundle', _('look up remote changes'))
374 374 opts = {}
375 375 bundlecaps = kwargs.get('bundlecaps')
376 376 if bundlecaps is not None:
377 377 kwargs['bundlecaps'] = sorted(bundlecaps)
378 378 else:
379 379 bundlecaps = () # kwargs could have it to None
380 380 for key, value in kwargs.iteritems():
381 381 if value is None:
382 382 continue
383 383 keytype = gboptsmap.get(key)
384 384 if keytype is None:
385 385 assert False, 'unexpected'
386 386 elif keytype == 'nodes':
387 387 value = encodelist(value)
388 388 elif keytype in ('csv', 'scsv'):
389 389 value = ','.join(value)
390 390 elif keytype == 'boolean':
391 391 value = '%i' % bool(value)
392 392 elif keytype != 'plain':
393 393 raise KeyError('unknown getbundle option type %s'
394 394 % keytype)
395 395 opts[key] = value
396 396 f = self._callcompressable("getbundle", **opts)
397 397 if any((cap.startswith('HG2') for cap in bundlecaps)):
398 398 return bundle2.getunbundler(self.ui, f)
399 399 else:
400 400 return changegroupmod.cg1unpacker(f, 'UN')
401 401
402 def unbundle(self, cg, heads, source):
402 def unbundle(self, cg, heads, url):
403 403 '''Send cg (a readable file-like object representing the
404 404 changegroup to push, typically a chunkbuffer object) to the
405 405 remote server as a bundle.
406 406
407 407 When pushing a bundle10 stream, return an integer indicating the
408 408 result of the push (see localrepository.addchangegroup()).
409 409
410 When pushing a bundle20 stream, return a bundle20 stream.'''
410 When pushing a bundle20 stream, return a bundle20 stream.
411
412 `url` is the url the client thinks it's pushing to, which is
413 visible to hooks.
414 '''
411 415
412 416 if heads != ['force'] and self.capable('unbundlehash'):
413 417 heads = encodelist(['hashed',
414 418 hashlib.sha1(''.join(sorted(heads))).digest()])
415 419 else:
416 420 heads = encodelist(heads)
417 421
418 422 if util.safehasattr(cg, 'deltaheader'):
419 423 # this a bundle10, do the old style call sequence
420 424 ret, output = self._callpush("unbundle", cg, heads=heads)
421 425 if ret == "":
422 426 raise error.ResponseError(
423 427 _('push failed:'), output)
424 428 try:
425 429 ret = int(ret)
426 430 except ValueError:
427 431 raise error.ResponseError(
428 432 _('push failed (unexpected response):'), ret)
429 433
430 434 for l in output.splitlines(True):
431 435 self.ui.status(_('remote: '), l)
432 436 else:
433 437 # bundle2 push. Send a stream, fetch a stream.
434 438 stream = self._calltwowaystream('unbundle', cg, heads=heads)
435 439 ret = bundle2.getunbundler(self.ui, stream)
436 440 return ret
437 441
438 442 def debugwireargs(self, one, two, three=None, four=None, five=None):
439 443 # don't pass optional arguments left at their default value
440 444 opts = {}
441 445 if three is not None:
442 446 opts['three'] = three
443 447 if four is not None:
444 448 opts['four'] = four
445 449 return self._call('debugwireargs', one=one, two=two, **opts)
446 450
447 451 def _call(self, cmd, **args):
448 452 """execute <cmd> on the server
449 453
450 454 The command is expected to return a simple string.
451 455
452 456 returns the server reply as a string."""
453 457 raise NotImplementedError()
454 458
455 459 def _callstream(self, cmd, **args):
456 460 """execute <cmd> on the server
457 461
458 462 The command is expected to return a stream. Note that if the
459 463 command doesn't return a stream, _callstream behaves
460 464 differently for ssh and http peers.
461 465
462 466 returns the server reply as a file like object.
463 467 """
464 468 raise NotImplementedError()
465 469
466 470 def _callcompressable(self, cmd, **args):
467 471 """execute <cmd> on the server
468 472
469 473 The command is expected to return a stream.
470 474
471 475 The stream may have been compressed in some implementations. This
472 476 function takes care of the decompression. This is the only difference
473 477 with _callstream.
474 478
475 479 returns the server reply as a file like object.
476 480 """
477 481 raise NotImplementedError()
478 482
479 483 def _callpush(self, cmd, fp, **args):
480 484 """execute a <cmd> on server
481 485
482 486 The command is expected to be related to a push. Push has a special
483 487 return method.
484 488
485 489 returns the server reply as a (ret, output) tuple. ret is either
486 490 empty (error) or a stringified int.
487 491 """
488 492 raise NotImplementedError()
489 493
490 494 def _calltwowaystream(self, cmd, fp, **args):
491 495 """execute <cmd> on server
492 496
493 497 The command will send a stream to the server and get a stream in reply.
494 498 """
495 499 raise NotImplementedError()
496 500
497 501 def _abort(self, exception):
498 502 """clearly abort the wire protocol connection and raise the exception
499 503 """
500 504 raise NotImplementedError()
501 505
502 506 # server side
503 507
504 508 # wire protocol command can either return a string or one of these classes.
505 509 class streamres(object):
506 510 """wireproto reply: binary stream
507 511
508 512 The call was successful and the result is a stream.
509 513 Iterate on the `self.gen` attribute to retrieve chunks.
510 514 """
511 515 def __init__(self, gen):
512 516 self.gen = gen
513 517
514 518 class pushres(object):
515 519 """wireproto reply: success with simple integer return
516 520
517 521 The call was successful and returned an integer contained in `self.res`.
518 522 """
519 523 def __init__(self, res):
520 524 self.res = res
521 525
522 526 class pusherr(object):
523 527 """wireproto reply: failure
524 528
525 529 The call failed. The `self.res` attribute contains the error message.
526 530 """
527 531 def __init__(self, res):
528 532 self.res = res
529 533
530 534 class ooberror(object):
531 535 """wireproto reply: failure of a batch of operation
532 536
533 537 Something failed during a batch call. The error message is stored in
534 538 `self.message`.
535 539 """
536 540 def __init__(self, message):
537 541 self.message = message
538 542
539 543 def getdispatchrepo(repo, proto, command):
540 544 """Obtain the repo used for processing wire protocol commands.
541 545
542 546 The intent of this function is to serve as a monkeypatch point for
543 547 extensions that need commands to operate on different repo views under
544 548 specialized circumstances.
545 549 """
546 550 return repo.filtered('served')
547 551
548 552 def dispatch(repo, proto, command):
549 553 repo = getdispatchrepo(repo, proto, command)
550 554 func, spec = commands[command]
551 555 args = proto.getargs(spec)
552 556 return func(repo, proto, *args)
553 557
554 558 def options(cmd, keys, others):
555 559 opts = {}
556 560 for k in keys:
557 561 if k in others:
558 562 opts[k] = others[k]
559 563 del others[k]
560 564 if others:
561 565 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
562 566 % (cmd, ",".join(others)))
563 567 return opts
564 568
565 569 def bundle1allowed(repo, action):
566 570 """Whether a bundle1 operation is allowed from the server.
567 571
568 572 Priority is:
569 573
570 574 1. server.bundle1gd.<action> (if generaldelta active)
571 575 2. server.bundle1.<action>
572 576 3. server.bundle1gd (if generaldelta active)
573 577 4. server.bundle1
574 578 """
575 579 ui = repo.ui
576 580 gd = 'generaldelta' in repo.requirements
577 581
578 582 if gd:
579 583 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
580 584 if v is not None:
581 585 return v
582 586
583 587 v = ui.configbool('server', 'bundle1.%s' % action, None)
584 588 if v is not None:
585 589 return v
586 590
587 591 if gd:
588 592 v = ui.configbool('server', 'bundle1gd', None)
589 593 if v is not None:
590 594 return v
591 595
592 596 return ui.configbool('server', 'bundle1', True)
593 597
594 598 # list of commands
595 599 commands = {}
596 600
597 601 def wireprotocommand(name, args=''):
598 602 """decorator for wire protocol command"""
599 603 def register(func):
600 604 commands[name] = (func, args)
601 605 return func
602 606 return register
603 607
604 608 @wireprotocommand('batch', 'cmds *')
605 609 def batch(repo, proto, cmds, others):
606 610 repo = repo.filtered("served")
607 611 res = []
608 612 for pair in cmds.split(';'):
609 613 op, args = pair.split(' ', 1)
610 614 vals = {}
611 615 for a in args.split(','):
612 616 if a:
613 617 n, v = a.split('=')
614 618 vals[n] = unescapearg(v)
615 619 func, spec = commands[op]
616 620 if spec:
617 621 keys = spec.split()
618 622 data = {}
619 623 for k in keys:
620 624 if k == '*':
621 625 star = {}
622 626 for key in vals.keys():
623 627 if key not in keys:
624 628 star[key] = vals[key]
625 629 data['*'] = star
626 630 else:
627 631 data[k] = vals[k]
628 632 result = func(repo, proto, *[data[k] for k in keys])
629 633 else:
630 634 result = func(repo, proto)
631 635 if isinstance(result, ooberror):
632 636 return result
633 637 res.append(escapearg(result))
634 638 return ';'.join(res)
635 639
636 640 @wireprotocommand('between', 'pairs')
637 641 def between(repo, proto, pairs):
638 642 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
639 643 r = []
640 644 for b in repo.between(pairs):
641 645 r.append(encodelist(b) + "\n")
642 646 return "".join(r)
643 647
644 648 @wireprotocommand('branchmap')
645 649 def branchmap(repo, proto):
646 650 branchmap = repo.branchmap()
647 651 heads = []
648 652 for branch, nodes in branchmap.iteritems():
649 653 branchname = urlreq.quote(encoding.fromlocal(branch))
650 654 branchnodes = encodelist(nodes)
651 655 heads.append('%s %s' % (branchname, branchnodes))
652 656 return '\n'.join(heads)
653 657
654 658 @wireprotocommand('branches', 'nodes')
655 659 def branches(repo, proto, nodes):
656 660 nodes = decodelist(nodes)
657 661 r = []
658 662 for b in repo.branches(nodes):
659 663 r.append(encodelist(b) + "\n")
660 664 return "".join(r)
661 665
662 666 @wireprotocommand('clonebundles', '')
663 667 def clonebundles(repo, proto):
664 668 """Server command for returning info for available bundles to seed clones.
665 669
666 670 Clients will parse this response and determine what bundle to fetch.
667 671
668 672 Extensions may wrap this command to filter or dynamically emit data
669 673 depending on the request. e.g. you could advertise URLs for the closest
670 674 data center given the client's IP address.
671 675 """
672 676 return repo.opener.tryread('clonebundles.manifest')
673 677
674 678 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
675 679 'known', 'getbundle', 'unbundlehash', 'batch']
676 680
677 681 def _capabilities(repo, proto):
678 682 """return a list of capabilities for a repo
679 683
680 684 This function exists to allow extensions to easily wrap capabilities
681 685 computation
682 686
683 687 - returns a lists: easy to alter
684 688 - change done here will be propagated to both `capabilities` and `hello`
685 689 command without any other action needed.
686 690 """
687 691 # copy to prevent modification of the global list
688 692 caps = list(wireprotocaps)
689 693 if streamclone.allowservergeneration(repo.ui):
690 694 if repo.ui.configbool('server', 'preferuncompressed', False):
691 695 caps.append('stream-preferred')
692 696 requiredformats = repo.requirements & repo.supportedformats
693 697 # if our local revlogs are just revlogv1, add 'stream' cap
694 698 if not requiredformats - set(('revlogv1',)):
695 699 caps.append('stream')
696 700 # otherwise, add 'streamreqs' detailing our local revlog format
697 701 else:
698 702 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
699 703 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
700 704 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
701 705 caps.append('bundle2=' + urlreq.quote(capsblob))
702 706 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
703 707 caps.append(
704 708 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
705 709 if repo.ui.configbool('experimental', 'httppostargs', False):
706 710 caps.append('httppostargs')
707 711 return caps
708 712
709 713 # If you are writing an extension and consider wrapping this function. Wrap
710 714 # `_capabilities` instead.
711 715 @wireprotocommand('capabilities')
712 716 def capabilities(repo, proto):
713 717 return ' '.join(_capabilities(repo, proto))
714 718
715 719 @wireprotocommand('changegroup', 'roots')
716 720 def changegroup(repo, proto, roots):
717 721 nodes = decodelist(roots)
718 722 cg = changegroupmod.changegroup(repo, nodes, 'serve')
719 723 return streamres(proto.groupchunks(cg))
720 724
721 725 @wireprotocommand('changegroupsubset', 'bases heads')
722 726 def changegroupsubset(repo, proto, bases, heads):
723 727 bases = decodelist(bases)
724 728 heads = decodelist(heads)
725 729 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
726 730 return streamres(proto.groupchunks(cg))
727 731
728 732 @wireprotocommand('debugwireargs', 'one two *')
729 733 def debugwireargs(repo, proto, one, two, others):
730 734 # only accept optional args from the known set
731 735 opts = options('debugwireargs', ['three', 'four'], others)
732 736 return repo.debugwireargs(one, two, **opts)
733 737
734 738 # List of options accepted by getbundle.
735 739 #
736 740 # Meant to be extended by extensions. It is the extension's responsibility to
737 741 # ensure such options are properly processed in exchange.getbundle.
738 742 gboptslist = ['heads', 'common', 'bundlecaps']
739 743
740 744 @wireprotocommand('getbundle', '*')
741 745 def getbundle(repo, proto, others):
742 746 opts = options('getbundle', gboptsmap.keys(), others)
743 747 for k, v in opts.iteritems():
744 748 keytype = gboptsmap[k]
745 749 if keytype == 'nodes':
746 750 opts[k] = decodelist(v)
747 751 elif keytype == 'csv':
748 752 opts[k] = list(v.split(','))
749 753 elif keytype == 'scsv':
750 754 opts[k] = set(v.split(','))
751 755 elif keytype == 'boolean':
752 756 # Client should serialize False as '0', which is a non-empty string
753 757 # so it evaluates as a True bool.
754 758 if v == '0':
755 759 opts[k] = False
756 760 else:
757 761 opts[k] = bool(v)
758 762 elif keytype != 'plain':
759 763 raise KeyError('unknown getbundle option type %s'
760 764 % keytype)
761 765
762 766 if not bundle1allowed(repo, 'pull'):
763 767 if not exchange.bundle2requested(opts.get('bundlecaps')):
764 768 return ooberror(bundle2required)
765 769
766 770 cg = exchange.getbundle(repo, 'serve', **opts)
767 771 return streamres(proto.groupchunks(cg))
768 772
769 773 @wireprotocommand('heads')
770 774 def heads(repo, proto):
771 775 h = repo.heads()
772 776 return encodelist(h) + "\n"
773 777
774 778 @wireprotocommand('hello')
775 779 def hello(repo, proto):
776 780 '''the hello command returns a set of lines describing various
777 781 interesting things about the server, in an RFC822-like format.
778 782 Currently the only one defined is "capabilities", which
779 783 consists of a line in the form:
780 784
781 785 capabilities: space separated list of tokens
782 786 '''
783 787 return "capabilities: %s\n" % (capabilities(repo, proto))
784 788
785 789 @wireprotocommand('listkeys', 'namespace')
786 790 def listkeys(repo, proto, namespace):
787 791 d = repo.listkeys(encoding.tolocal(namespace)).items()
788 792 return pushkeymod.encodekeys(d)
789 793
790 794 @wireprotocommand('lookup', 'key')
791 795 def lookup(repo, proto, key):
792 796 try:
793 797 k = encoding.tolocal(key)
794 798 c = repo[k]
795 799 r = c.hex()
796 800 success = 1
797 801 except Exception as inst:
798 802 r = str(inst)
799 803 success = 0
800 804 return "%s %s\n" % (success, r)
801 805
802 806 @wireprotocommand('known', 'nodes *')
803 807 def known(repo, proto, nodes, others):
804 808 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
805 809
806 810 @wireprotocommand('pushkey', 'namespace key old new')
807 811 def pushkey(repo, proto, namespace, key, old, new):
808 812 # compatibility with pre-1.8 clients which were accidentally
809 813 # sending raw binary nodes rather than utf-8-encoded hex
810 814 if len(new) == 20 and new.encode('string-escape') != new:
811 815 # looks like it could be a binary node
812 816 try:
813 817 new.decode('utf-8')
814 818 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
815 819 except UnicodeDecodeError:
816 820 pass # binary, leave unmodified
817 821 else:
818 822 new = encoding.tolocal(new) # normal path
819 823
820 824 if util.safehasattr(proto, 'restore'):
821 825
822 826 proto.redirect()
823 827
824 828 try:
825 829 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
826 830 encoding.tolocal(old), new) or False
827 831 except error.Abort:
828 832 r = False
829 833
830 834 output = proto.restore()
831 835
832 836 return '%s\n%s' % (int(r), output)
833 837
834 838 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
835 839 encoding.tolocal(old), new)
836 840 return '%s\n' % int(r)
837 841
838 842 @wireprotocommand('stream_out')
839 843 def stream(repo, proto):
840 844 '''If the server supports streaming clone, it advertises the "stream"
841 845 capability with a value representing the version and flags of the repo
842 846 it is serving. Client checks to see if it understands the format.
843 847 '''
844 848 if not streamclone.allowservergeneration(repo.ui):
845 849 return '1\n'
846 850
847 851 def getstream(it):
848 852 yield '0\n'
849 853 for chunk in it:
850 854 yield chunk
851 855
852 856 try:
853 857 # LockError may be raised before the first result is yielded. Don't
854 858 # emit output until we're sure we got the lock successfully.
855 859 it = streamclone.generatev1wireproto(repo)
856 860 return streamres(getstream(it))
857 861 except error.LockError:
858 862 return '2\n'
859 863
860 864 @wireprotocommand('unbundle', 'heads')
861 865 def unbundle(repo, proto, heads):
862 866 their_heads = decodelist(heads)
863 867
864 868 try:
865 869 proto.redirect()
866 870
867 871 exchange.check_heads(repo, their_heads, 'preparing changes')
868 872
869 873 # write bundle data to temporary file because it can be big
870 874 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
871 875 fp = os.fdopen(fd, 'wb+')
872 876 r = 0
873 877 try:
874 878 proto.getfile(fp)
875 879 fp.seek(0)
876 880 gen = exchange.readbundle(repo.ui, fp, None)
877 881 if (isinstance(gen, changegroupmod.cg1unpacker)
878 882 and not bundle1allowed(repo, 'push')):
879 883 return ooberror(bundle2required)
880 884
881 885 r = exchange.unbundle(repo, gen, their_heads, 'serve',
882 886 proto._client())
883 887 if util.safehasattr(r, 'addpart'):
884 888 # The return looks streamable, we are in the bundle2 case and
885 889 # should return a stream.
886 890 return streamres(r.getchunks())
887 891 return pushres(r)
888 892
889 893 finally:
890 894 fp.close()
891 895 os.unlink(tempname)
892 896
893 897 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
894 898 # handle non-bundle2 case first
895 899 if not getattr(exc, 'duringunbundle2', False):
896 900 try:
897 901 raise
898 902 except error.Abort:
899 903 # The old code we moved used sys.stderr directly.
900 904 # We did not change it to minimise code change.
901 905 # This need to be moved to something proper.
902 906 # Feel free to do it.
903 907 sys.stderr.write("abort: %s\n" % exc)
904 908 return pushres(0)
905 909 except error.PushRaced:
906 910 return pusherr(str(exc))
907 911
908 912 bundler = bundle2.bundle20(repo.ui)
909 913 for out in getattr(exc, '_bundle2salvagedoutput', ()):
910 914 bundler.addpart(out)
911 915 try:
912 916 try:
913 917 raise
914 918 except error.PushkeyFailed as exc:
915 919 # check client caps
916 920 remotecaps = getattr(exc, '_replycaps', None)
917 921 if (remotecaps is not None
918 922 and 'pushkey' not in remotecaps.get('error', ())):
919 923 # no support remote side, fallback to Abort handler.
920 924 raise
921 925 part = bundler.newpart('error:pushkey')
922 926 part.addparam('in-reply-to', exc.partid)
923 927 if exc.namespace is not None:
924 928 part.addparam('namespace', exc.namespace, mandatory=False)
925 929 if exc.key is not None:
926 930 part.addparam('key', exc.key, mandatory=False)
927 931 if exc.new is not None:
928 932 part.addparam('new', exc.new, mandatory=False)
929 933 if exc.old is not None:
930 934 part.addparam('old', exc.old, mandatory=False)
931 935 if exc.ret is not None:
932 936 part.addparam('ret', exc.ret, mandatory=False)
933 937 except error.BundleValueError as exc:
934 938 errpart = bundler.newpart('error:unsupportedcontent')
935 939 if exc.parttype is not None:
936 940 errpart.addparam('parttype', exc.parttype)
937 941 if exc.params:
938 942 errpart.addparam('params', '\0'.join(exc.params))
939 943 except error.Abort as exc:
940 944 manargs = [('message', str(exc))]
941 945 advargs = []
942 946 if exc.hint is not None:
943 947 advargs.append(('hint', exc.hint))
944 948 bundler.addpart(bundle2.bundlepart('error:abort',
945 949 manargs, advargs))
946 950 except error.PushRaced as exc:
947 951 bundler.newpart('error:pushraced', [('message', str(exc))])
948 952 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now