##// END OF EJS Templates
wireproto: extract repo filtering to standalone function...
Gregory Szorc -
r29590:84c1a594 default
parent child Browse files
Show More
@@ -1,939 +1,948 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import itertools
12 12 import os
13 13 import sys
14 14 import tempfile
15 15
16 16 from .i18n import _
17 17 from .node import (
18 18 bin,
19 19 hex,
20 20 )
21 21
22 22 from . import (
23 23 bundle2,
24 24 changegroup as changegroupmod,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 streamclone,
31 31 util,
32 32 )
33 33
34 34 urlerr = util.urlerr
35 35 urlreq = util.urlreq
36 36
37 37 bundle2required = _(
38 38 'incompatible Mercurial client; bundle2 required\n'
39 39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
40 40
41 41 class abstractserverproto(object):
42 42 """abstract class that summarizes the protocol API
43 43
44 44 Used as reference and documentation.
45 45 """
46 46
47 47 def getargs(self, args):
48 48 """return the value for arguments in <args>
49 49
50 50 returns a list of values (same order as <args>)"""
51 51 raise NotImplementedError()
52 52
53 53 def getfile(self, fp):
54 54 """write the whole content of a file into a file like object
55 55
56 56 The file is in the form::
57 57
58 58 (<chunk-size>\n<chunk>)+0\n
59 59
60 60 chunk size is the ascii version of the int.
61 61 """
62 62 raise NotImplementedError()
63 63
64 64 def redirect(self):
65 65 """may setup interception for stdout and stderr
66 66
67 67 See also the `restore` method."""
68 68 raise NotImplementedError()
69 69
70 70 # If the `redirect` function does install interception, the `restore`
71 71 # function MUST be defined. If interception is not used, this function
72 72 # MUST NOT be defined.
73 73 #
74 74 # left commented here on purpose
75 75 #
76 76 #def restore(self):
77 77 # """reinstall previous stdout and stderr and return intercepted stdout
78 78 # """
79 79 # raise NotImplementedError()
80 80
81 81 def groupchunks(self, cg):
82 82 """return 4096 chunks from a changegroup object
83 83
84 84 Some protocols may have compressed the contents."""
85 85 raise NotImplementedError()
86 86
87 87 class remotebatch(peer.batcher):
88 88 '''batches the queued calls; uses as few roundtrips as possible'''
89 89 def __init__(self, remote):
90 90 '''remote must support _submitbatch(encbatch) and
91 91 _submitone(op, encargs)'''
92 92 peer.batcher.__init__(self)
93 93 self.remote = remote
94 94 def submit(self):
95 95 req, rsp = [], []
96 96 for name, args, opts, resref in self.calls:
97 97 mtd = getattr(self.remote, name)
98 98 batchablefn = getattr(mtd, 'batchable', None)
99 99 if batchablefn is not None:
100 100 batchable = batchablefn(mtd.im_self, *args, **opts)
101 101 encargsorres, encresref = next(batchable)
102 102 if encresref:
103 103 req.append((name, encargsorres,))
104 104 rsp.append((batchable, encresref, resref,))
105 105 else:
106 106 resref.set(encargsorres)
107 107 else:
108 108 if req:
109 109 self._submitreq(req, rsp)
110 110 req, rsp = [], []
111 111 resref.set(mtd(*args, **opts))
112 112 if req:
113 113 self._submitreq(req, rsp)
114 114 def _submitreq(self, req, rsp):
115 115 encresults = self.remote._submitbatch(req)
116 116 for encres, r in zip(encresults, rsp):
117 117 batchable, encresref, resref = r
118 118 encresref.set(encres)
119 119 resref.set(next(batchable))
120 120
121 121 class remoteiterbatcher(peer.iterbatcher):
122 122 def __init__(self, remote):
123 123 super(remoteiterbatcher, self).__init__()
124 124 self._remote = remote
125 125
126 126 def __getattr__(self, name):
127 127 if not getattr(self._remote, name, False):
128 128 raise AttributeError(
129 129 'Attempted to iterbatch non-batchable call to %r' % name)
130 130 return super(remoteiterbatcher, self).__getattr__(name)
131 131
132 132 def submit(self):
133 133 """Break the batch request into many patch calls and pipeline them.
134 134
135 135 This is mostly valuable over http where request sizes can be
136 136 limited, but can be used in other places as well.
137 137 """
138 138 req, rsp = [], []
139 139 for name, args, opts, resref in self.calls:
140 140 mtd = getattr(self._remote, name)
141 141 batchable = mtd.batchable(mtd.im_self, *args, **opts)
142 142 encargsorres, encresref = next(batchable)
143 143 assert encresref
144 144 req.append((name, encargsorres))
145 145 rsp.append((batchable, encresref))
146 146 if req:
147 147 self._resultiter = self._remote._submitbatch(req)
148 148 self._rsp = rsp
149 149
150 150 def results(self):
151 151 for (batchable, encresref), encres in itertools.izip(
152 152 self._rsp, self._resultiter):
153 153 encresref.set(encres)
154 154 yield next(batchable)
155 155
156 156 # Forward a couple of names from peer to make wireproto interactions
157 157 # slightly more sensible.
158 158 batchable = peer.batchable
159 159 future = peer.future
160 160
161 161 # list of nodes encoding / decoding
162 162
163 163 def decodelist(l, sep=' '):
164 164 if l:
165 165 return map(bin, l.split(sep))
166 166 return []
167 167
168 168 def encodelist(l, sep=' '):
169 169 try:
170 170 return sep.join(map(hex, l))
171 171 except TypeError:
172 172 raise
173 173
174 174 # batched call argument encoding
175 175
176 176 def escapearg(plain):
177 177 return (plain
178 178 .replace(':', ':c')
179 179 .replace(',', ':o')
180 180 .replace(';', ':s')
181 181 .replace('=', ':e'))
182 182
183 183 def unescapearg(escaped):
184 184 return (escaped
185 185 .replace(':e', '=')
186 186 .replace(':s', ';')
187 187 .replace(':o', ',')
188 188 .replace(':c', ':'))
189 189
190 190 # mapping of options accepted by getbundle and their types
191 191 #
192 192 # Meant to be extended by extensions. It is extensions responsibility to ensure
193 193 # such options are properly processed in exchange.getbundle.
194 194 #
195 195 # supported types are:
196 196 #
197 197 # :nodes: list of binary nodes
198 198 # :csv: list of comma-separated values
199 199 # :scsv: list of comma-separated values return as set
200 200 # :plain: string with no transformation needed.
201 201 gboptsmap = {'heads': 'nodes',
202 202 'common': 'nodes',
203 203 'obsmarkers': 'boolean',
204 204 'bundlecaps': 'scsv',
205 205 'listkeys': 'csv',
206 206 'cg': 'boolean',
207 207 'cbattempted': 'boolean'}
208 208
209 209 # client side
210 210
211 211 class wirepeer(peer.peerrepository):
212 212 """Client-side interface for communicating with a peer repository.
213 213
214 214 Methods commonly call wire protocol commands of the same name.
215 215
216 216 See also httppeer.py and sshpeer.py for protocol-specific
217 217 implementations of this interface.
218 218 """
219 219 def batch(self):
220 220 if self.capable('batch'):
221 221 return remotebatch(self)
222 222 else:
223 223 return peer.localbatch(self)
224 224 def _submitbatch(self, req):
225 225 """run batch request <req> on the server
226 226
227 227 Returns an iterator of the raw responses from the server.
228 228 """
229 229 cmds = []
230 230 for op, argsdict in req:
231 231 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
232 232 for k, v in argsdict.iteritems())
233 233 cmds.append('%s %s' % (op, args))
234 234 rsp = self._callstream("batch", cmds=';'.join(cmds))
235 235 chunk = rsp.read(1024)
236 236 work = [chunk]
237 237 while chunk:
238 238 while ';' not in chunk and chunk:
239 239 chunk = rsp.read(1024)
240 240 work.append(chunk)
241 241 merged = ''.join(work)
242 242 while ';' in merged:
243 243 one, merged = merged.split(';', 1)
244 244 yield unescapearg(one)
245 245 chunk = rsp.read(1024)
246 246 work = [merged, chunk]
247 247 yield unescapearg(''.join(work))
248 248
249 249 def _submitone(self, op, args):
250 250 return self._call(op, **args)
251 251
252 252 def iterbatch(self):
253 253 return remoteiterbatcher(self)
254 254
255 255 @batchable
256 256 def lookup(self, key):
257 257 self.requirecap('lookup', _('look up remote revision'))
258 258 f = future()
259 259 yield {'key': encoding.fromlocal(key)}, f
260 260 d = f.value
261 261 success, data = d[:-1].split(" ", 1)
262 262 if int(success):
263 263 yield bin(data)
264 264 self._abort(error.RepoError(data))
265 265
266 266 @batchable
267 267 def heads(self):
268 268 f = future()
269 269 yield {}, f
270 270 d = f.value
271 271 try:
272 272 yield decodelist(d[:-1])
273 273 except ValueError:
274 274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 275
276 276 @batchable
277 277 def known(self, nodes):
278 278 f = future()
279 279 yield {'nodes': encodelist(nodes)}, f
280 280 d = f.value
281 281 try:
282 282 yield [bool(int(b)) for b in d]
283 283 except ValueError:
284 284 self._abort(error.ResponseError(_("unexpected response:"), d))
285 285
286 286 @batchable
287 287 def branchmap(self):
288 288 f = future()
289 289 yield {}, f
290 290 d = f.value
291 291 try:
292 292 branchmap = {}
293 293 for branchpart in d.splitlines():
294 294 branchname, branchheads = branchpart.split(' ', 1)
295 295 branchname = encoding.tolocal(urlreq.unquote(branchname))
296 296 branchheads = decodelist(branchheads)
297 297 branchmap[branchname] = branchheads
298 298 yield branchmap
299 299 except TypeError:
300 300 self._abort(error.ResponseError(_("unexpected response:"), d))
301 301
302 302 def branches(self, nodes):
303 303 n = encodelist(nodes)
304 304 d = self._call("branches", nodes=n)
305 305 try:
306 306 br = [tuple(decodelist(b)) for b in d.splitlines()]
307 307 return br
308 308 except ValueError:
309 309 self._abort(error.ResponseError(_("unexpected response:"), d))
310 310
311 311 def between(self, pairs):
312 312 batch = 8 # avoid giant requests
313 313 r = []
314 314 for i in xrange(0, len(pairs), batch):
315 315 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
316 316 d = self._call("between", pairs=n)
317 317 try:
318 318 r.extend(l and decodelist(l) or [] for l in d.splitlines())
319 319 except ValueError:
320 320 self._abort(error.ResponseError(_("unexpected response:"), d))
321 321 return r
322 322
323 323 @batchable
324 324 def pushkey(self, namespace, key, old, new):
325 325 if not self.capable('pushkey'):
326 326 yield False, None
327 327 f = future()
328 328 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
329 329 yield {'namespace': encoding.fromlocal(namespace),
330 330 'key': encoding.fromlocal(key),
331 331 'old': encoding.fromlocal(old),
332 332 'new': encoding.fromlocal(new)}, f
333 333 d = f.value
334 334 d, output = d.split('\n', 1)
335 335 try:
336 336 d = bool(int(d))
337 337 except ValueError:
338 338 raise error.ResponseError(
339 339 _('push failed (unexpected response):'), d)
340 340 for l in output.splitlines(True):
341 341 self.ui.status(_('remote: '), l)
342 342 yield d
343 343
344 344 @batchable
345 345 def listkeys(self, namespace):
346 346 if not self.capable('pushkey'):
347 347 yield {}, None
348 348 f = future()
349 349 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
350 350 yield {'namespace': encoding.fromlocal(namespace)}, f
351 351 d = f.value
352 352 self.ui.debug('received listkey for "%s": %i bytes\n'
353 353 % (namespace, len(d)))
354 354 yield pushkeymod.decodekeys(d)
355 355
356 356 def stream_out(self):
357 357 return self._callstream('stream_out')
358 358
359 359 def changegroup(self, nodes, kind):
360 360 n = encodelist(nodes)
361 361 f = self._callcompressable("changegroup", roots=n)
362 362 return changegroupmod.cg1unpacker(f, 'UN')
363 363
364 364 def changegroupsubset(self, bases, heads, kind):
365 365 self.requirecap('changegroupsubset', _('look up remote changes'))
366 366 bases = encodelist(bases)
367 367 heads = encodelist(heads)
368 368 f = self._callcompressable("changegroupsubset",
369 369 bases=bases, heads=heads)
370 370 return changegroupmod.cg1unpacker(f, 'UN')
371 371
372 372 def getbundle(self, source, **kwargs):
373 373 self.requirecap('getbundle', _('look up remote changes'))
374 374 opts = {}
375 375 bundlecaps = kwargs.get('bundlecaps')
376 376 if bundlecaps is not None:
377 377 kwargs['bundlecaps'] = sorted(bundlecaps)
378 378 else:
379 379 bundlecaps = () # kwargs could have it to None
380 380 for key, value in kwargs.iteritems():
381 381 if value is None:
382 382 continue
383 383 keytype = gboptsmap.get(key)
384 384 if keytype is None:
385 385 assert False, 'unexpected'
386 386 elif keytype == 'nodes':
387 387 value = encodelist(value)
388 388 elif keytype in ('csv', 'scsv'):
389 389 value = ','.join(value)
390 390 elif keytype == 'boolean':
391 391 value = '%i' % bool(value)
392 392 elif keytype != 'plain':
393 393 raise KeyError('unknown getbundle option type %s'
394 394 % keytype)
395 395 opts[key] = value
396 396 f = self._callcompressable("getbundle", **opts)
397 397 if any((cap.startswith('HG2') for cap in bundlecaps)):
398 398 return bundle2.getunbundler(self.ui, f)
399 399 else:
400 400 return changegroupmod.cg1unpacker(f, 'UN')
401 401
402 402 def unbundle(self, cg, heads, source):
403 403 '''Send cg (a readable file-like object representing the
404 404 changegroup to push, typically a chunkbuffer object) to the
405 405 remote server as a bundle.
406 406
407 407 When pushing a bundle10 stream, return an integer indicating the
408 408 result of the push (see localrepository.addchangegroup()).
409 409
410 410 When pushing a bundle20 stream, return a bundle20 stream.'''
411 411
412 412 if heads != ['force'] and self.capable('unbundlehash'):
413 413 heads = encodelist(['hashed',
414 414 hashlib.sha1(''.join(sorted(heads))).digest()])
415 415 else:
416 416 heads = encodelist(heads)
417 417
418 418 if util.safehasattr(cg, 'deltaheader'):
419 419 # this a bundle10, do the old style call sequence
420 420 ret, output = self._callpush("unbundle", cg, heads=heads)
421 421 if ret == "":
422 422 raise error.ResponseError(
423 423 _('push failed:'), output)
424 424 try:
425 425 ret = int(ret)
426 426 except ValueError:
427 427 raise error.ResponseError(
428 428 _('push failed (unexpected response):'), ret)
429 429
430 430 for l in output.splitlines(True):
431 431 self.ui.status(_('remote: '), l)
432 432 else:
433 433 # bundle2 push. Send a stream, fetch a stream.
434 434 stream = self._calltwowaystream('unbundle', cg, heads=heads)
435 435 ret = bundle2.getunbundler(self.ui, stream)
436 436 return ret
437 437
438 438 def debugwireargs(self, one, two, three=None, four=None, five=None):
439 439 # don't pass optional arguments left at their default value
440 440 opts = {}
441 441 if three is not None:
442 442 opts['three'] = three
443 443 if four is not None:
444 444 opts['four'] = four
445 445 return self._call('debugwireargs', one=one, two=two, **opts)
446 446
447 447 def _call(self, cmd, **args):
448 448 """execute <cmd> on the server
449 449
450 450 The command is expected to return a simple string.
451 451
452 452 returns the server reply as a string."""
453 453 raise NotImplementedError()
454 454
455 455 def _callstream(self, cmd, **args):
456 456 """execute <cmd> on the server
457 457
458 458 The command is expected to return a stream. Note that if the
459 459 command doesn't return a stream, _callstream behaves
460 460 differently for ssh and http peers.
461 461
462 462 returns the server reply as a file like object.
463 463 """
464 464 raise NotImplementedError()
465 465
466 466 def _callcompressable(self, cmd, **args):
467 467 """execute <cmd> on the server
468 468
469 469 The command is expected to return a stream.
470 470
471 471 The stream may have been compressed in some implementations. This
472 472 function takes care of the decompression. This is the only difference
473 473 with _callstream.
474 474
475 475 returns the server reply as a file like object.
476 476 """
477 477 raise NotImplementedError()
478 478
479 479 def _callpush(self, cmd, fp, **args):
480 480 """execute a <cmd> on server
481 481
482 482 The command is expected to be related to a push. Push has a special
483 483 return method.
484 484
485 485 returns the server reply as a (ret, output) tuple. ret is either
486 486 empty (error) or a stringified int.
487 487 """
488 488 raise NotImplementedError()
489 489
490 490 def _calltwowaystream(self, cmd, fp, **args):
491 491 """execute <cmd> on server
492 492
493 493 The command will send a stream to the server and get a stream in reply.
494 494 """
495 495 raise NotImplementedError()
496 496
497 497 def _abort(self, exception):
498 498 """clearly abort the wire protocol connection and raise the exception
499 499 """
500 500 raise NotImplementedError()
501 501
502 502 # server side
503 503
504 504 # wire protocol command can either return a string or one of these classes.
505 505 class streamres(object):
506 506 """wireproto reply: binary stream
507 507
508 508 The call was successful and the result is a stream.
509 509 Iterate on the `self.gen` attribute to retrieve chunks.
510 510 """
511 511 def __init__(self, gen):
512 512 self.gen = gen
513 513
514 514 class pushres(object):
515 515 """wireproto reply: success with simple integer return
516 516
517 517 The call was successful and returned an integer contained in `self.res`.
518 518 """
519 519 def __init__(self, res):
520 520 self.res = res
521 521
522 522 class pusherr(object):
523 523 """wireproto reply: failure
524 524
525 525 The call failed. The `self.res` attribute contains the error message.
526 526 """
527 527 def __init__(self, res):
528 528 self.res = res
529 529
530 530 class ooberror(object):
531 531 """wireproto reply: failure of a batch of operation
532 532
533 533 Something failed during a batch call. The error message is stored in
534 534 `self.message`.
535 535 """
536 536 def __init__(self, message):
537 537 self.message = message
538 538
539 def getdispatchrepo(repo, proto, command):
540 """Obtain the repo used for processing wire protocol commands.
541
542 The intent of this function is to serve as a monkeypatch point for
543 extensions that need commands to operate on different repo views under
544 specialized circumstances.
545 """
546 return repo.filtered('served')
547
539 548 def dispatch(repo, proto, command):
540 repo = repo.filtered("served")
549 repo = getdispatchrepo(repo, proto, command)
541 550 func, spec = commands[command]
542 551 args = proto.getargs(spec)
543 552 return func(repo, proto, *args)
544 553
545 554 def options(cmd, keys, others):
546 555 opts = {}
547 556 for k in keys:
548 557 if k in others:
549 558 opts[k] = others[k]
550 559 del others[k]
551 560 if others:
552 561 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
553 562 % (cmd, ",".join(others)))
554 563 return opts
555 564
556 565 def bundle1allowed(repo, action):
557 566 """Whether a bundle1 operation is allowed from the server.
558 567
559 568 Priority is:
560 569
561 570 1. server.bundle1gd.<action> (if generaldelta active)
562 571 2. server.bundle1.<action>
563 572 3. server.bundle1gd (if generaldelta active)
564 573 4. server.bundle1
565 574 """
566 575 ui = repo.ui
567 576 gd = 'generaldelta' in repo.requirements
568 577
569 578 if gd:
570 579 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
571 580 if v is not None:
572 581 return v
573 582
574 583 v = ui.configbool('server', 'bundle1.%s' % action, None)
575 584 if v is not None:
576 585 return v
577 586
578 587 if gd:
579 588 v = ui.configbool('server', 'bundle1gd', None)
580 589 if v is not None:
581 590 return v
582 591
583 592 return ui.configbool('server', 'bundle1', True)
584 593
585 594 # list of commands
586 595 commands = {}
587 596
588 597 def wireprotocommand(name, args=''):
589 598 """decorator for wire protocol command"""
590 599 def register(func):
591 600 commands[name] = (func, args)
592 601 return func
593 602 return register
594 603
595 604 @wireprotocommand('batch', 'cmds *')
596 605 def batch(repo, proto, cmds, others):
597 606 repo = repo.filtered("served")
598 607 res = []
599 608 for pair in cmds.split(';'):
600 609 op, args = pair.split(' ', 1)
601 610 vals = {}
602 611 for a in args.split(','):
603 612 if a:
604 613 n, v = a.split('=')
605 614 vals[n] = unescapearg(v)
606 615 func, spec = commands[op]
607 616 if spec:
608 617 keys = spec.split()
609 618 data = {}
610 619 for k in keys:
611 620 if k == '*':
612 621 star = {}
613 622 for key in vals.keys():
614 623 if key not in keys:
615 624 star[key] = vals[key]
616 625 data['*'] = star
617 626 else:
618 627 data[k] = vals[k]
619 628 result = func(repo, proto, *[data[k] for k in keys])
620 629 else:
621 630 result = func(repo, proto)
622 631 if isinstance(result, ooberror):
623 632 return result
624 633 res.append(escapearg(result))
625 634 return ';'.join(res)
626 635
627 636 @wireprotocommand('between', 'pairs')
628 637 def between(repo, proto, pairs):
629 638 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
630 639 r = []
631 640 for b in repo.between(pairs):
632 641 r.append(encodelist(b) + "\n")
633 642 return "".join(r)
634 643
635 644 @wireprotocommand('branchmap')
636 645 def branchmap(repo, proto):
637 646 branchmap = repo.branchmap()
638 647 heads = []
639 648 for branch, nodes in branchmap.iteritems():
640 649 branchname = urlreq.quote(encoding.fromlocal(branch))
641 650 branchnodes = encodelist(nodes)
642 651 heads.append('%s %s' % (branchname, branchnodes))
643 652 return '\n'.join(heads)
644 653
645 654 @wireprotocommand('branches', 'nodes')
646 655 def branches(repo, proto, nodes):
647 656 nodes = decodelist(nodes)
648 657 r = []
649 658 for b in repo.branches(nodes):
650 659 r.append(encodelist(b) + "\n")
651 660 return "".join(r)
652 661
653 662 @wireprotocommand('clonebundles', '')
654 663 def clonebundles(repo, proto):
655 664 """Server command for returning info for available bundles to seed clones.
656 665
657 666 Clients will parse this response and determine what bundle to fetch.
658 667
659 668 Extensions may wrap this command to filter or dynamically emit data
660 669 depending on the request. e.g. you could advertise URLs for the closest
661 670 data center given the client's IP address.
662 671 """
663 672 return repo.opener.tryread('clonebundles.manifest')
664 673
665 674 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
666 675 'known', 'getbundle', 'unbundlehash', 'batch']
667 676
668 677 def _capabilities(repo, proto):
669 678 """return a list of capabilities for a repo
670 679
671 680 This function exists to allow extensions to easily wrap capabilities
672 681 computation
673 682
674 683 - returns a lists: easy to alter
675 684 - change done here will be propagated to both `capabilities` and `hello`
676 685 command without any other action needed.
677 686 """
678 687 # copy to prevent modification of the global list
679 688 caps = list(wireprotocaps)
680 689 if streamclone.allowservergeneration(repo.ui):
681 690 if repo.ui.configbool('server', 'preferuncompressed', False):
682 691 caps.append('stream-preferred')
683 692 requiredformats = repo.requirements & repo.supportedformats
684 693 # if our local revlogs are just revlogv1, add 'stream' cap
685 694 if not requiredformats - set(('revlogv1',)):
686 695 caps.append('stream')
687 696 # otherwise, add 'streamreqs' detailing our local revlog format
688 697 else:
689 698 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
690 699 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
691 700 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
692 701 caps.append('bundle2=' + urlreq.quote(capsblob))
693 702 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
694 703 caps.append(
695 704 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
696 705 if repo.ui.configbool('experimental', 'httppostargs', False):
697 706 caps.append('httppostargs')
698 707 return caps
699 708
700 709 # If you are writing an extension and consider wrapping this function. Wrap
701 710 # `_capabilities` instead.
702 711 @wireprotocommand('capabilities')
703 712 def capabilities(repo, proto):
704 713 return ' '.join(_capabilities(repo, proto))
705 714
706 715 @wireprotocommand('changegroup', 'roots')
707 716 def changegroup(repo, proto, roots):
708 717 nodes = decodelist(roots)
709 718 cg = changegroupmod.changegroup(repo, nodes, 'serve')
710 719 return streamres(proto.groupchunks(cg))
711 720
712 721 @wireprotocommand('changegroupsubset', 'bases heads')
713 722 def changegroupsubset(repo, proto, bases, heads):
714 723 bases = decodelist(bases)
715 724 heads = decodelist(heads)
716 725 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
717 726 return streamres(proto.groupchunks(cg))
718 727
719 728 @wireprotocommand('debugwireargs', 'one two *')
720 729 def debugwireargs(repo, proto, one, two, others):
721 730 # only accept optional args from the known set
722 731 opts = options('debugwireargs', ['three', 'four'], others)
723 732 return repo.debugwireargs(one, two, **opts)
724 733
725 734 # List of options accepted by getbundle.
726 735 #
727 736 # Meant to be extended by extensions. It is the extension's responsibility to
728 737 # ensure such options are properly processed in exchange.getbundle.
729 738 gboptslist = ['heads', 'common', 'bundlecaps']
730 739
731 740 @wireprotocommand('getbundle', '*')
732 741 def getbundle(repo, proto, others):
733 742 opts = options('getbundle', gboptsmap.keys(), others)
734 743 for k, v in opts.iteritems():
735 744 keytype = gboptsmap[k]
736 745 if keytype == 'nodes':
737 746 opts[k] = decodelist(v)
738 747 elif keytype == 'csv':
739 748 opts[k] = list(v.split(','))
740 749 elif keytype == 'scsv':
741 750 opts[k] = set(v.split(','))
742 751 elif keytype == 'boolean':
743 752 # Client should serialize False as '0', which is a non-empty string
744 753 # so it evaluates as a True bool.
745 754 if v == '0':
746 755 opts[k] = False
747 756 else:
748 757 opts[k] = bool(v)
749 758 elif keytype != 'plain':
750 759 raise KeyError('unknown getbundle option type %s'
751 760 % keytype)
752 761
753 762 if not bundle1allowed(repo, 'pull'):
754 763 if not exchange.bundle2requested(opts.get('bundlecaps')):
755 764 return ooberror(bundle2required)
756 765
757 766 cg = exchange.getbundle(repo, 'serve', **opts)
758 767 return streamres(proto.groupchunks(cg))
759 768
760 769 @wireprotocommand('heads')
761 770 def heads(repo, proto):
762 771 h = repo.heads()
763 772 return encodelist(h) + "\n"
764 773
765 774 @wireprotocommand('hello')
766 775 def hello(repo, proto):
767 776 '''the hello command returns a set of lines describing various
768 777 interesting things about the server, in an RFC822-like format.
769 778 Currently the only one defined is "capabilities", which
770 779 consists of a line in the form:
771 780
772 781 capabilities: space separated list of tokens
773 782 '''
774 783 return "capabilities: %s\n" % (capabilities(repo, proto))
775 784
776 785 @wireprotocommand('listkeys', 'namespace')
777 786 def listkeys(repo, proto, namespace):
778 787 d = repo.listkeys(encoding.tolocal(namespace)).items()
779 788 return pushkeymod.encodekeys(d)
780 789
781 790 @wireprotocommand('lookup', 'key')
782 791 def lookup(repo, proto, key):
783 792 try:
784 793 k = encoding.tolocal(key)
785 794 c = repo[k]
786 795 r = c.hex()
787 796 success = 1
788 797 except Exception as inst:
789 798 r = str(inst)
790 799 success = 0
791 800 return "%s %s\n" % (success, r)
792 801
793 802 @wireprotocommand('known', 'nodes *')
794 803 def known(repo, proto, nodes, others):
795 804 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
796 805
797 806 @wireprotocommand('pushkey', 'namespace key old new')
798 807 def pushkey(repo, proto, namespace, key, old, new):
799 808 # compatibility with pre-1.8 clients which were accidentally
800 809 # sending raw binary nodes rather than utf-8-encoded hex
801 810 if len(new) == 20 and new.encode('string-escape') != new:
802 811 # looks like it could be a binary node
803 812 try:
804 813 new.decode('utf-8')
805 814 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
806 815 except UnicodeDecodeError:
807 816 pass # binary, leave unmodified
808 817 else:
809 818 new = encoding.tolocal(new) # normal path
810 819
811 820 if util.safehasattr(proto, 'restore'):
812 821
813 822 proto.redirect()
814 823
815 824 try:
816 825 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
817 826 encoding.tolocal(old), new) or False
818 827 except error.Abort:
819 828 r = False
820 829
821 830 output = proto.restore()
822 831
823 832 return '%s\n%s' % (int(r), output)
824 833
825 834 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
826 835 encoding.tolocal(old), new)
827 836 return '%s\n' % int(r)
828 837
829 838 @wireprotocommand('stream_out')
830 839 def stream(repo, proto):
831 840 '''If the server supports streaming clone, it advertises the "stream"
832 841 capability with a value representing the version and flags of the repo
833 842 it is serving. Client checks to see if it understands the format.
834 843 '''
835 844 if not streamclone.allowservergeneration(repo.ui):
836 845 return '1\n'
837 846
838 847 def getstream(it):
839 848 yield '0\n'
840 849 for chunk in it:
841 850 yield chunk
842 851
843 852 try:
844 853 # LockError may be raised before the first result is yielded. Don't
845 854 # emit output until we're sure we got the lock successfully.
846 855 it = streamclone.generatev1wireproto(repo)
847 856 return streamres(getstream(it))
848 857 except error.LockError:
849 858 return '2\n'
850 859
851 860 @wireprotocommand('unbundle', 'heads')
852 861 def unbundle(repo, proto, heads):
853 862 their_heads = decodelist(heads)
854 863
855 864 try:
856 865 proto.redirect()
857 866
858 867 exchange.check_heads(repo, their_heads, 'preparing changes')
859 868
860 869 # write bundle data to temporary file because it can be big
861 870 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
862 871 fp = os.fdopen(fd, 'wb+')
863 872 r = 0
864 873 try:
865 874 proto.getfile(fp)
866 875 fp.seek(0)
867 876 gen = exchange.readbundle(repo.ui, fp, None)
868 877 if (isinstance(gen, changegroupmod.cg1unpacker)
869 878 and not bundle1allowed(repo, 'push')):
870 879 return ooberror(bundle2required)
871 880
872 881 r = exchange.unbundle(repo, gen, their_heads, 'serve',
873 882 proto._client())
874 883 if util.safehasattr(r, 'addpart'):
875 884 # The return looks streamable, we are in the bundle2 case and
876 885 # should return a stream.
877 886 return streamres(r.getchunks())
878 887 return pushres(r)
879 888
880 889 finally:
881 890 fp.close()
882 891 os.unlink(tempname)
883 892
884 893 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
885 894 # handle non-bundle2 case first
886 895 if not getattr(exc, 'duringunbundle2', False):
887 896 try:
888 897 raise
889 898 except error.Abort:
890 899 # The old code we moved used sys.stderr directly.
891 900 # We did not change it to minimise code change.
892 901 # This need to be moved to something proper.
893 902 # Feel free to do it.
894 903 sys.stderr.write("abort: %s\n" % exc)
895 904 return pushres(0)
896 905 except error.PushRaced:
897 906 return pusherr(str(exc))
898 907
899 908 bundler = bundle2.bundle20(repo.ui)
900 909 for out in getattr(exc, '_bundle2salvagedoutput', ()):
901 910 bundler.addpart(out)
902 911 try:
903 912 try:
904 913 raise
905 914 except error.PushkeyFailed as exc:
906 915 # check client caps
907 916 remotecaps = getattr(exc, '_replycaps', None)
908 917 if (remotecaps is not None
909 918 and 'pushkey' not in remotecaps.get('error', ())):
910 919 # no support remote side, fallback to Abort handler.
911 920 raise
912 921 part = bundler.newpart('error:pushkey')
913 922 part.addparam('in-reply-to', exc.partid)
914 923 if exc.namespace is not None:
915 924 part.addparam('namespace', exc.namespace, mandatory=False)
916 925 if exc.key is not None:
917 926 part.addparam('key', exc.key, mandatory=False)
918 927 if exc.new is not None:
919 928 part.addparam('new', exc.new, mandatory=False)
920 929 if exc.old is not None:
921 930 part.addparam('old', exc.old, mandatory=False)
922 931 if exc.ret is not None:
923 932 part.addparam('ret', exc.ret, mandatory=False)
924 933 except error.BundleValueError as exc:
925 934 errpart = bundler.newpart('error:unsupportedcontent')
926 935 if exc.parttype is not None:
927 936 errpart.addparam('parttype', exc.parttype)
928 937 if exc.params:
929 938 errpart.addparam('params', '\0'.join(exc.params))
930 939 except error.Abort as exc:
931 940 manargs = [('message', str(exc))]
932 941 advargs = []
933 942 if exc.hint is not None:
934 943 advargs.append(('hint', exc.hint))
935 944 bundler.addpart(bundle2.bundlepart('error:abort',
936 945 manargs, advargs))
937 946 except error.PushRaced as exc:
938 947 bundler.newpart('error:pushraced', [('message', str(exc))])
939 948 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now