##// END OF EJS Templates
wireproto: remove unused code...
Jun Wu -
r31073:2cf1e520 default
parent child Browse files
Show More
@@ -1,1051 +1,1050 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import itertools
12 12 import os
13 13 import tempfile
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 bin,
18 18 hex,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 encoding,
25 25 error,
26 26 exchange,
27 27 peer,
28 28 pushkey as pushkeymod,
29 29 pycompat,
30 30 streamclone,
31 31 util,
32 32 )
33 33
34 34 urlerr = util.urlerr
35 35 urlreq = util.urlreq
36 36
37 37 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
38 38 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
39 39 'IncompatibleClient')
40 40 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
41 41
42 42 class abstractserverproto(object):
43 43 """abstract class that summarizes the protocol API
44 44
45 45 Used as reference and documentation.
46 46 """
47 47
48 48 def getargs(self, args):
49 49 """return the value for arguments in <args>
50 50
51 51 returns a list of values (same order as <args>)"""
52 52 raise NotImplementedError()
53 53
54 54 def getfile(self, fp):
55 55 """write the whole content of a file into a file like object
56 56
57 57 The file is in the form::
58 58
59 59 (<chunk-size>\n<chunk>)+0\n
60 60
61 61 chunk size is the ascii version of the int.
62 62 """
63 63 raise NotImplementedError()
64 64
65 65 def redirect(self):
66 66 """may setup interception for stdout and stderr
67 67
68 68 See also the `restore` method."""
69 69 raise NotImplementedError()
70 70
71 71 # If the `redirect` function does install interception, the `restore`
72 72 # function MUST be defined. If interception is not used, this function
73 73 # MUST NOT be defined.
74 74 #
75 75 # left commented here on purpose
76 76 #
77 77 #def restore(self):
78 78 # """reinstall previous stdout and stderr and return intercepted stdout
79 79 # """
80 80 # raise NotImplementedError()
81 81
82 82 class remotebatch(peer.batcher):
83 83 '''batches the queued calls; uses as few roundtrips as possible'''
84 84 def __init__(self, remote):
85 85 '''remote must support _submitbatch(encbatch) and
86 86 _submitone(op, encargs)'''
87 87 peer.batcher.__init__(self)
88 88 self.remote = remote
89 89 def submit(self):
90 90 req, rsp = [], []
91 91 for name, args, opts, resref in self.calls:
92 92 mtd = getattr(self.remote, name)
93 93 batchablefn = getattr(mtd, 'batchable', None)
94 94 if batchablefn is not None:
95 95 batchable = batchablefn(mtd.im_self, *args, **opts)
96 96 encargsorres, encresref = next(batchable)
97 97 if encresref:
98 98 req.append((name, encargsorres,))
99 99 rsp.append((batchable, encresref, resref,))
100 100 else:
101 101 resref.set(encargsorres)
102 102 else:
103 103 if req:
104 104 self._submitreq(req, rsp)
105 105 req, rsp = [], []
106 106 resref.set(mtd(*args, **opts))
107 107 if req:
108 108 self._submitreq(req, rsp)
109 109 def _submitreq(self, req, rsp):
110 110 encresults = self.remote._submitbatch(req)
111 111 for encres, r in zip(encresults, rsp):
112 112 batchable, encresref, resref = r
113 113 encresref.set(encres)
114 114 resref.set(next(batchable))
115 115
116 116 class remoteiterbatcher(peer.iterbatcher):
117 117 def __init__(self, remote):
118 118 super(remoteiterbatcher, self).__init__()
119 119 self._remote = remote
120 120
121 121 def __getattr__(self, name):
122 122 if not getattr(self._remote, name, False):
123 123 raise AttributeError(
124 124 'Attempted to iterbatch non-batchable call to %r' % name)
125 125 return super(remoteiterbatcher, self).__getattr__(name)
126 126
127 127 def submit(self):
128 128 """Break the batch request into many patch calls and pipeline them.
129 129
130 130 This is mostly valuable over http where request sizes can be
131 131 limited, but can be used in other places as well.
132 132 """
133 133 req, rsp = [], []
134 134 for name, args, opts, resref in self.calls:
135 135 mtd = getattr(self._remote, name)
136 136 batchable = mtd.batchable(mtd.im_self, *args, **opts)
137 137 encargsorres, encresref = next(batchable)
138 138 assert encresref
139 139 req.append((name, encargsorres))
140 140 rsp.append((batchable, encresref))
141 141 if req:
142 142 self._resultiter = self._remote._submitbatch(req)
143 143 self._rsp = rsp
144 144
145 145 def results(self):
146 146 for (batchable, encresref), encres in itertools.izip(
147 147 self._rsp, self._resultiter):
148 148 encresref.set(encres)
149 149 yield next(batchable)
150 150
151 151 # Forward a couple of names from peer to make wireproto interactions
152 152 # slightly more sensible.
153 153 batchable = peer.batchable
154 154 future = peer.future
155 155
156 156 # list of nodes encoding / decoding
157 157
158 158 def decodelist(l, sep=' '):
159 159 if l:
160 160 return map(bin, l.split(sep))
161 161 return []
162 162
163 163 def encodelist(l, sep=' '):
164 164 try:
165 165 return sep.join(map(hex, l))
166 166 except TypeError:
167 167 raise
168 168
169 169 # batched call argument encoding
170 170
171 171 def escapearg(plain):
172 172 return (plain
173 173 .replace(':', ':c')
174 174 .replace(',', ':o')
175 175 .replace(';', ':s')
176 176 .replace('=', ':e'))
177 177
178 178 def unescapearg(escaped):
179 179 return (escaped
180 180 .replace(':e', '=')
181 181 .replace(':s', ';')
182 182 .replace(':o', ',')
183 183 .replace(':c', ':'))
184 184
185 185 def encodebatchcmds(req):
186 186 """Return a ``cmds`` argument value for the ``batch`` command."""
187 187 cmds = []
188 188 for op, argsdict in req:
189 189 # Old servers didn't properly unescape argument names. So prevent
190 190 # the sending of argument names that may not be decoded properly by
191 191 # servers.
192 192 assert all(escapearg(k) == k for k in argsdict)
193 193
194 194 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
195 195 for k, v in argsdict.iteritems())
196 196 cmds.append('%s %s' % (op, args))
197 197
198 198 return ';'.join(cmds)
199 199
200 200 # mapping of options accepted by getbundle and their types
201 201 #
202 202 # Meant to be extended by extensions. It is extensions responsibility to ensure
203 203 # such options are properly processed in exchange.getbundle.
204 204 #
205 205 # supported types are:
206 206 #
207 207 # :nodes: list of binary nodes
208 208 # :csv: list of comma-separated values
209 209 # :scsv: list of comma-separated values return as set
210 210 # :plain: string with no transformation needed.
211 211 gboptsmap = {'heads': 'nodes',
212 212 'common': 'nodes',
213 213 'obsmarkers': 'boolean',
214 214 'bundlecaps': 'scsv',
215 215 'listkeys': 'csv',
216 216 'cg': 'boolean',
217 217 'cbattempted': 'boolean'}
218 218
219 219 # client side
220 220
221 221 class wirepeer(peer.peerrepository):
222 222 """Client-side interface for communicating with a peer repository.
223 223
224 224 Methods commonly call wire protocol commands of the same name.
225 225
226 226 See also httppeer.py and sshpeer.py for protocol-specific
227 227 implementations of this interface.
228 228 """
229 229 def batch(self):
230 230 if self.capable('batch'):
231 231 return remotebatch(self)
232 232 else:
233 233 return peer.localbatch(self)
234 234 def _submitbatch(self, req):
235 235 """run batch request <req> on the server
236 236
237 237 Returns an iterator of the raw responses from the server.
238 238 """
239 239 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
240 240 chunk = rsp.read(1024)
241 241 work = [chunk]
242 242 while chunk:
243 243 while ';' not in chunk and chunk:
244 244 chunk = rsp.read(1024)
245 245 work.append(chunk)
246 246 merged = ''.join(work)
247 247 while ';' in merged:
248 248 one, merged = merged.split(';', 1)
249 249 yield unescapearg(one)
250 250 chunk = rsp.read(1024)
251 251 work = [merged, chunk]
252 252 yield unescapearg(''.join(work))
253 253
254 254 def _submitone(self, op, args):
255 255 return self._call(op, **args)
256 256
257 257 def iterbatch(self):
258 258 return remoteiterbatcher(self)
259 259
260 260 @batchable
261 261 def lookup(self, key):
262 262 self.requirecap('lookup', _('look up remote revision'))
263 263 f = future()
264 264 yield {'key': encoding.fromlocal(key)}, f
265 265 d = f.value
266 266 success, data = d[:-1].split(" ", 1)
267 267 if int(success):
268 268 yield bin(data)
269 269 self._abort(error.RepoError(data))
270 270
271 271 @batchable
272 272 def heads(self):
273 273 f = future()
274 274 yield {}, f
275 275 d = f.value
276 276 try:
277 277 yield decodelist(d[:-1])
278 278 except ValueError:
279 279 self._abort(error.ResponseError(_("unexpected response:"), d))
280 280
281 281 @batchable
282 282 def known(self, nodes):
283 283 f = future()
284 284 yield {'nodes': encodelist(nodes)}, f
285 285 d = f.value
286 286 try:
287 287 yield [bool(int(b)) for b in d]
288 288 except ValueError:
289 289 self._abort(error.ResponseError(_("unexpected response:"), d))
290 290
291 291 @batchable
292 292 def branchmap(self):
293 293 f = future()
294 294 yield {}, f
295 295 d = f.value
296 296 try:
297 297 branchmap = {}
298 298 for branchpart in d.splitlines():
299 299 branchname, branchheads = branchpart.split(' ', 1)
300 300 branchname = encoding.tolocal(urlreq.unquote(branchname))
301 301 branchheads = decodelist(branchheads)
302 302 branchmap[branchname] = branchheads
303 303 yield branchmap
304 304 except TypeError:
305 305 self._abort(error.ResponseError(_("unexpected response:"), d))
306 306
307 307 def branches(self, nodes):
308 308 n = encodelist(nodes)
309 309 d = self._call("branches", nodes=n)
310 310 try:
311 311 br = [tuple(decodelist(b)) for b in d.splitlines()]
312 312 return br
313 313 except ValueError:
314 314 self._abort(error.ResponseError(_("unexpected response:"), d))
315 315
316 316 def between(self, pairs):
317 317 batch = 8 # avoid giant requests
318 318 r = []
319 319 for i in xrange(0, len(pairs), batch):
320 320 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
321 321 d = self._call("between", pairs=n)
322 322 try:
323 323 r.extend(l and decodelist(l) or [] for l in d.splitlines())
324 324 except ValueError:
325 325 self._abort(error.ResponseError(_("unexpected response:"), d))
326 326 return r
327 327
328 328 @batchable
329 329 def pushkey(self, namespace, key, old, new):
330 330 if not self.capable('pushkey'):
331 331 yield False, None
332 332 f = future()
333 333 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
334 334 yield {'namespace': encoding.fromlocal(namespace),
335 335 'key': encoding.fromlocal(key),
336 336 'old': encoding.fromlocal(old),
337 337 'new': encoding.fromlocal(new)}, f
338 338 d = f.value
339 339 d, output = d.split('\n', 1)
340 340 try:
341 341 d = bool(int(d))
342 342 except ValueError:
343 343 raise error.ResponseError(
344 344 _('push failed (unexpected response):'), d)
345 345 for l in output.splitlines(True):
346 346 self.ui.status(_('remote: '), l)
347 347 yield d
348 348
349 349 @batchable
350 350 def listkeys(self, namespace):
351 351 if not self.capable('pushkey'):
352 352 yield {}, None
353 353 f = future()
354 354 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
355 355 yield {'namespace': encoding.fromlocal(namespace)}, f
356 356 d = f.value
357 357 self.ui.debug('received listkey for "%s": %i bytes\n'
358 358 % (namespace, len(d)))
359 359 yield pushkeymod.decodekeys(d)
360 360
361 361 def stream_out(self):
362 362 return self._callstream('stream_out')
363 363
364 364 def changegroup(self, nodes, kind):
365 365 n = encodelist(nodes)
366 366 f = self._callcompressable("changegroup", roots=n)
367 367 return changegroupmod.cg1unpacker(f, 'UN')
368 368
369 369 def changegroupsubset(self, bases, heads, kind):
370 370 self.requirecap('changegroupsubset', _('look up remote changes'))
371 371 bases = encodelist(bases)
372 372 heads = encodelist(heads)
373 373 f = self._callcompressable("changegroupsubset",
374 374 bases=bases, heads=heads)
375 375 return changegroupmod.cg1unpacker(f, 'UN')
376 376
377 377 def getbundle(self, source, **kwargs):
378 378 self.requirecap('getbundle', _('look up remote changes'))
379 379 opts = {}
380 380 bundlecaps = kwargs.get('bundlecaps')
381 381 if bundlecaps is not None:
382 382 kwargs['bundlecaps'] = sorted(bundlecaps)
383 383 else:
384 384 bundlecaps = () # kwargs could have it to None
385 385 for key, value in kwargs.iteritems():
386 386 if value is None:
387 387 continue
388 388 keytype = gboptsmap.get(key)
389 389 if keytype is None:
390 390 assert False, 'unexpected'
391 391 elif keytype == 'nodes':
392 392 value = encodelist(value)
393 393 elif keytype in ('csv', 'scsv'):
394 394 value = ','.join(value)
395 395 elif keytype == 'boolean':
396 396 value = '%i' % bool(value)
397 397 elif keytype != 'plain':
398 398 raise KeyError('unknown getbundle option type %s'
399 399 % keytype)
400 400 opts[key] = value
401 401 f = self._callcompressable("getbundle", **opts)
402 402 if any((cap.startswith('HG2') for cap in bundlecaps)):
403 403 return bundle2.getunbundler(self.ui, f)
404 404 else:
405 405 return changegroupmod.cg1unpacker(f, 'UN')
406 406
407 407 def unbundle(self, cg, heads, url):
408 408 '''Send cg (a readable file-like object representing the
409 409 changegroup to push, typically a chunkbuffer object) to the
410 410 remote server as a bundle.
411 411
412 412 When pushing a bundle10 stream, return an integer indicating the
413 413 result of the push (see localrepository.addchangegroup()).
414 414
415 415 When pushing a bundle20 stream, return a bundle20 stream.
416 416
417 417 `url` is the url the client thinks it's pushing to, which is
418 418 visible to hooks.
419 419 '''
420 420
421 421 if heads != ['force'] and self.capable('unbundlehash'):
422 422 heads = encodelist(['hashed',
423 423 hashlib.sha1(''.join(sorted(heads))).digest()])
424 424 else:
425 425 heads = encodelist(heads)
426 426
427 427 if util.safehasattr(cg, 'deltaheader'):
428 428 # this a bundle10, do the old style call sequence
429 429 ret, output = self._callpush("unbundle", cg, heads=heads)
430 430 if ret == "":
431 431 raise error.ResponseError(
432 432 _('push failed:'), output)
433 433 try:
434 434 ret = int(ret)
435 435 except ValueError:
436 436 raise error.ResponseError(
437 437 _('push failed (unexpected response):'), ret)
438 438
439 439 for l in output.splitlines(True):
440 440 self.ui.status(_('remote: '), l)
441 441 else:
442 442 # bundle2 push. Send a stream, fetch a stream.
443 443 stream = self._calltwowaystream('unbundle', cg, heads=heads)
444 444 ret = bundle2.getunbundler(self.ui, stream)
445 445 return ret
446 446
447 447 def debugwireargs(self, one, two, three=None, four=None, five=None):
448 448 # don't pass optional arguments left at their default value
449 449 opts = {}
450 450 if three is not None:
451 451 opts['three'] = three
452 452 if four is not None:
453 453 opts['four'] = four
454 454 return self._call('debugwireargs', one=one, two=two, **opts)
455 455
456 456 def _call(self, cmd, **args):
457 457 """execute <cmd> on the server
458 458
459 459 The command is expected to return a simple string.
460 460
461 461 returns the server reply as a string."""
462 462 raise NotImplementedError()
463 463
464 464 def _callstream(self, cmd, **args):
465 465 """execute <cmd> on the server
466 466
467 467 The command is expected to return a stream. Note that if the
468 468 command doesn't return a stream, _callstream behaves
469 469 differently for ssh and http peers.
470 470
471 471 returns the server reply as a file like object.
472 472 """
473 473 raise NotImplementedError()
474 474
475 475 def _callcompressable(self, cmd, **args):
476 476 """execute <cmd> on the server
477 477
478 478 The command is expected to return a stream.
479 479
480 480 The stream may have been compressed in some implementations. This
481 481 function takes care of the decompression. This is the only difference
482 482 with _callstream.
483 483
484 484 returns the server reply as a file like object.
485 485 """
486 486 raise NotImplementedError()
487 487
488 488 def _callpush(self, cmd, fp, **args):
489 489 """execute a <cmd> on server
490 490
491 491 The command is expected to be related to a push. Push has a special
492 492 return method.
493 493
494 494 returns the server reply as a (ret, output) tuple. ret is either
495 495 empty (error) or a stringified int.
496 496 """
497 497 raise NotImplementedError()
498 498
499 499 def _calltwowaystream(self, cmd, fp, **args):
500 500 """execute <cmd> on server
501 501
502 502 The command will send a stream to the server and get a stream in reply.
503 503 """
504 504 raise NotImplementedError()
505 505
506 506 def _abort(self, exception):
507 507 """clearly abort the wire protocol connection and raise the exception
508 508 """
509 509 raise NotImplementedError()
510 510
511 511 # server side
512 512
513 513 # wire protocol command can either return a string or one of these classes.
514 514 class streamres(object):
515 515 """wireproto reply: binary stream
516 516
517 517 The call was successful and the result is a stream.
518 518
519 519 Accepts either a generator or an object with a ``read(size)`` method.
520 520
521 521 ``v1compressible`` indicates whether this data can be compressed to
522 522 "version 1" clients (technically: HTTP peers using
523 523 application/mercurial-0.1 media type). This flag should NOT be used on
524 524 new commands because new clients should support a more modern compression
525 525 mechanism.
526 526 """
527 527 def __init__(self, gen=None, reader=None, v1compressible=False):
528 528 self.gen = gen
529 529 self.reader = reader
530 530 self.v1compressible = v1compressible
531 531
532 532 class pushres(object):
533 533 """wireproto reply: success with simple integer return
534 534
535 535 The call was successful and returned an integer contained in `self.res`.
536 536 """
537 537 def __init__(self, res):
538 538 self.res = res
539 539
540 540 class pusherr(object):
541 541 """wireproto reply: failure
542 542
543 543 The call failed. The `self.res` attribute contains the error message.
544 544 """
545 545 def __init__(self, res):
546 546 self.res = res
547 547
548 548 class ooberror(object):
549 549 """wireproto reply: failure of a batch of operation
550 550
551 551 Something failed during a batch call. The error message is stored in
552 552 `self.message`.
553 553 """
554 554 def __init__(self, message):
555 555 self.message = message
556 556
557 557 def getdispatchrepo(repo, proto, command):
558 558 """Obtain the repo used for processing wire protocol commands.
559 559
560 560 The intent of this function is to serve as a monkeypatch point for
561 561 extensions that need commands to operate on different repo views under
562 562 specialized circumstances.
563 563 """
564 564 return repo.filtered('served')
565 565
566 566 def dispatch(repo, proto, command):
567 567 repo = getdispatchrepo(repo, proto, command)
568 568 func, spec = commands[command]
569 569 args = proto.getargs(spec)
570 570 return func(repo, proto, *args)
571 571
572 572 def options(cmd, keys, others):
573 573 opts = {}
574 574 for k in keys:
575 575 if k in others:
576 576 opts[k] = others[k]
577 577 del others[k]
578 578 if others:
579 579 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
580 580 % (cmd, ",".join(others)))
581 581 return opts
582 582
583 583 def bundle1allowed(repo, action):
584 584 """Whether a bundle1 operation is allowed from the server.
585 585
586 586 Priority is:
587 587
588 588 1. server.bundle1gd.<action> (if generaldelta active)
589 589 2. server.bundle1.<action>
590 590 3. server.bundle1gd (if generaldelta active)
591 591 4. server.bundle1
592 592 """
593 593 ui = repo.ui
594 594 gd = 'generaldelta' in repo.requirements
595 595
596 596 if gd:
597 597 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
598 598 if v is not None:
599 599 return v
600 600
601 601 v = ui.configbool('server', 'bundle1.%s' % action, None)
602 602 if v is not None:
603 603 return v
604 604
605 605 if gd:
606 606 v = ui.configbool('server', 'bundle1gd', None)
607 607 if v is not None:
608 608 return v
609 609
610 610 return ui.configbool('server', 'bundle1', True)
611 611
612 612 def supportedcompengines(ui, proto, role):
613 613 """Obtain the list of supported compression engines for a request."""
614 614 assert role in (util.CLIENTROLE, util.SERVERROLE)
615 615
616 616 compengines = util.compengines.supportedwireengines(role)
617 617
618 618 # Allow config to override default list and ordering.
619 619 if role == util.SERVERROLE:
620 620 configengines = ui.configlist('server', 'compressionengines')
621 621 config = 'server.compressionengines'
622 622 else:
623 623 # This is currently implemented mainly to facilitate testing. In most
624 624 # cases, the server should be in charge of choosing a compression engine
625 625 # because a server has the most to lose from a sub-optimal choice. (e.g.
626 626 # CPU DoS due to an expensive engine or a network DoS due to poor
627 627 # compression ratio).
628 628 configengines = ui.configlist('experimental',
629 629 'clientcompressionengines')
630 630 config = 'experimental.clientcompressionengines'
631 631
632 632 # No explicit config. Filter out the ones that aren't supposed to be
633 633 # advertised and return default ordering.
634 634 if not configengines:
635 635 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
636 636 return [e for e in compengines
637 637 if getattr(e.wireprotosupport(), attr) > 0]
638 638
639 639 # If compression engines are listed in the config, assume there is a good
640 640 # reason for it (like server operators wanting to achieve specific
641 641 # performance characteristics). So fail fast if the config references
642 642 # unusable compression engines.
643 643 validnames = set(e.name() for e in compengines)
644 644 invalidnames = set(e for e in configengines if e not in validnames)
645 645 if invalidnames:
646 646 raise error.Abort(_('invalid compression engine defined in %s: %s') %
647 647 (config, ', '.join(sorted(invalidnames))))
648 648
649 649 compengines = [e for e in compengines if e.name() in configengines]
650 650 compengines = sorted(compengines,
651 651 key=lambda e: configengines.index(e.name()))
652 652
653 653 if not compengines:
654 654 raise error.Abort(_('%s config option does not specify any known '
655 655 'compression engines') % config,
656 656 hint=_('usable compression engines: %s') %
657 657 ', '.sorted(validnames))
658 658
659 659 return compengines
660 660
661 661 # list of commands
662 662 commands = {}
663 663
664 664 def wireprotocommand(name, args=''):
665 665 """decorator for wire protocol command"""
666 666 def register(func):
667 667 commands[name] = (func, args)
668 668 return func
669 669 return register
670 670
671 671 @wireprotocommand('batch', 'cmds *')
672 672 def batch(repo, proto, cmds, others):
673 673 repo = repo.filtered("served")
674 674 res = []
675 675 for pair in cmds.split(';'):
676 676 op, args = pair.split(' ', 1)
677 677 vals = {}
678 678 for a in args.split(','):
679 679 if a:
680 680 n, v = a.split('=')
681 681 vals[unescapearg(n)] = unescapearg(v)
682 682 func, spec = commands[op]
683 683 if spec:
684 684 keys = spec.split()
685 685 data = {}
686 686 for k in keys:
687 687 if k == '*':
688 688 star = {}
689 689 for key in vals.keys():
690 690 if key not in keys:
691 691 star[key] = vals[key]
692 692 data['*'] = star
693 693 else:
694 694 data[k] = vals[k]
695 695 result = func(repo, proto, *[data[k] for k in keys])
696 696 else:
697 697 result = func(repo, proto)
698 698 if isinstance(result, ooberror):
699 699 return result
700 700 res.append(escapearg(result))
701 701 return ';'.join(res)
702 702
703 703 @wireprotocommand('between', 'pairs')
704 704 def between(repo, proto, pairs):
705 705 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
706 706 r = []
707 707 for b in repo.between(pairs):
708 708 r.append(encodelist(b) + "\n")
709 709 return "".join(r)
710 710
711 711 @wireprotocommand('branchmap')
712 712 def branchmap(repo, proto):
713 713 branchmap = repo.branchmap()
714 714 heads = []
715 715 for branch, nodes in branchmap.iteritems():
716 716 branchname = urlreq.quote(encoding.fromlocal(branch))
717 717 branchnodes = encodelist(nodes)
718 718 heads.append('%s %s' % (branchname, branchnodes))
719 719 return '\n'.join(heads)
720 720
721 721 @wireprotocommand('branches', 'nodes')
722 722 def branches(repo, proto, nodes):
723 723 nodes = decodelist(nodes)
724 724 r = []
725 725 for b in repo.branches(nodes):
726 726 r.append(encodelist(b) + "\n")
727 727 return "".join(r)
728 728
729 729 @wireprotocommand('clonebundles', '')
730 730 def clonebundles(repo, proto):
731 731 """Server command for returning info for available bundles to seed clones.
732 732
733 733 Clients will parse this response and determine what bundle to fetch.
734 734
735 735 Extensions may wrap this command to filter or dynamically emit data
736 736 depending on the request. e.g. you could advertise URLs for the closest
737 737 data center given the client's IP address.
738 738 """
739 739 return repo.opener.tryread('clonebundles.manifest')
740 740
741 741 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
742 742 'known', 'getbundle', 'unbundlehash', 'batch']
743 743
744 744 def _capabilities(repo, proto):
745 745 """return a list of capabilities for a repo
746 746
747 747 This function exists to allow extensions to easily wrap capabilities
748 748 computation
749 749
750 750 - returns a lists: easy to alter
751 751 - change done here will be propagated to both `capabilities` and `hello`
752 752 command without any other action needed.
753 753 """
754 754 # copy to prevent modification of the global list
755 755 caps = list(wireprotocaps)
756 756 if streamclone.allowservergeneration(repo.ui):
757 757 if repo.ui.configbool('server', 'preferuncompressed', False):
758 758 caps.append('stream-preferred')
759 759 requiredformats = repo.requirements & repo.supportedformats
760 760 # if our local revlogs are just revlogv1, add 'stream' cap
761 761 if not requiredformats - set(('revlogv1',)):
762 762 caps.append('stream')
763 763 # otherwise, add 'streamreqs' detailing our local revlog format
764 764 else:
765 765 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
766 766 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
767 767 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
768 768 caps.append('bundle2=' + urlreq.quote(capsblob))
769 769 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
770 770
771 771 if proto.name == 'http':
772 772 caps.append('httpheader=%d' %
773 773 repo.ui.configint('server', 'maxhttpheaderlen', 1024))
774 774 if repo.ui.configbool('experimental', 'httppostargs', False):
775 775 caps.append('httppostargs')
776 776
777 777 # FUTURE advertise 0.2rx once support is implemented
778 778 # FUTURE advertise minrx and mintx after consulting config option
779 779 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
780 780
781 781 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
782 782 if compengines:
783 783 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
784 784 for e in compengines)
785 785 caps.append('compression=%s' % comptypes)
786 786
787 787 return caps
788 788
789 789 # If you are writing an extension and consider wrapping this function. Wrap
790 790 # `_capabilities` instead.
791 791 @wireprotocommand('capabilities')
792 792 def capabilities(repo, proto):
793 793 return ' '.join(_capabilities(repo, proto))
794 794
795 795 @wireprotocommand('changegroup', 'roots')
796 796 def changegroup(repo, proto, roots):
797 797 nodes = decodelist(roots)
798 798 cg = changegroupmod.changegroup(repo, nodes, 'serve')
799 799 return streamres(reader=cg, v1compressible=True)
800 800
801 801 @wireprotocommand('changegroupsubset', 'bases heads')
802 802 def changegroupsubset(repo, proto, bases, heads):
803 803 bases = decodelist(bases)
804 804 heads = decodelist(heads)
805 805 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
806 806 return streamres(reader=cg, v1compressible=True)
807 807
808 808 @wireprotocommand('debugwireargs', 'one two *')
809 809 def debugwireargs(repo, proto, one, two, others):
810 810 # only accept optional args from the known set
811 811 opts = options('debugwireargs', ['three', 'four'], others)
812 812 return repo.debugwireargs(one, two, **opts)
813 813
814 814 @wireprotocommand('getbundle', '*')
815 815 def getbundle(repo, proto, others):
816 816 opts = options('getbundle', gboptsmap.keys(), others)
817 817 for k, v in opts.iteritems():
818 818 keytype = gboptsmap[k]
819 819 if keytype == 'nodes':
820 820 opts[k] = decodelist(v)
821 821 elif keytype == 'csv':
822 822 opts[k] = list(v.split(','))
823 823 elif keytype == 'scsv':
824 824 opts[k] = set(v.split(','))
825 825 elif keytype == 'boolean':
826 826 # Client should serialize False as '0', which is a non-empty string
827 827 # so it evaluates as a True bool.
828 828 if v == '0':
829 829 opts[k] = False
830 830 else:
831 831 opts[k] = bool(v)
832 832 elif keytype != 'plain':
833 833 raise KeyError('unknown getbundle option type %s'
834 834 % keytype)
835 835
836 836 if not bundle1allowed(repo, 'pull'):
837 837 if not exchange.bundle2requested(opts.get('bundlecaps')):
838 838 if proto.name == 'http':
839 839 return ooberror(bundle2required)
840 840 raise error.Abort(bundle2requiredmain,
841 841 hint=bundle2requiredhint)
842 842
843 #chunks = exchange.getbundlechunks(repo, 'serve', **opts)
844 843 try:
845 844 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
846 845 except error.Abort as exc:
847 846 # cleanly forward Abort error to the client
848 847 if not exchange.bundle2requested(opts.get('bundlecaps')):
849 848 if proto.name == 'http':
850 849 return ooberror(str(exc) + '\n')
851 850 raise # cannot do better for bundle1 + ssh
852 851 # bundle2 request expect a bundle2 reply
853 852 bundler = bundle2.bundle20(repo.ui)
854 853 manargs = [('message', str(exc))]
855 854 advargs = []
856 855 if exc.hint is not None:
857 856 advargs.append(('hint', exc.hint))
858 857 bundler.addpart(bundle2.bundlepart('error:abort',
859 858 manargs, advargs))
860 859 return streamres(gen=bundler.getchunks(), v1compressible=True)
861 860 return streamres(gen=chunks, v1compressible=True)
862 861
863 862 @wireprotocommand('heads')
864 863 def heads(repo, proto):
865 864 h = repo.heads()
866 865 return encodelist(h) + "\n"
867 866
868 867 @wireprotocommand('hello')
869 868 def hello(repo, proto):
870 869 '''the hello command returns a set of lines describing various
871 870 interesting things about the server, in an RFC822-like format.
872 871 Currently the only one defined is "capabilities", which
873 872 consists of a line in the form:
874 873
875 874 capabilities: space separated list of tokens
876 875 '''
877 876 return "capabilities: %s\n" % (capabilities(repo, proto))
878 877
879 878 @wireprotocommand('listkeys', 'namespace')
880 879 def listkeys(repo, proto, namespace):
881 880 d = repo.listkeys(encoding.tolocal(namespace)).items()
882 881 return pushkeymod.encodekeys(d)
883 882
884 883 @wireprotocommand('lookup', 'key')
885 884 def lookup(repo, proto, key):
886 885 try:
887 886 k = encoding.tolocal(key)
888 887 c = repo[k]
889 888 r = c.hex()
890 889 success = 1
891 890 except Exception as inst:
892 891 r = str(inst)
893 892 success = 0
894 893 return "%s %s\n" % (success, r)
895 894
896 895 @wireprotocommand('known', 'nodes *')
897 896 def known(repo, proto, nodes, others):
898 897 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
899 898
900 899 @wireprotocommand('pushkey', 'namespace key old new')
901 900 def pushkey(repo, proto, namespace, key, old, new):
902 901 # compatibility with pre-1.8 clients which were accidentally
903 902 # sending raw binary nodes rather than utf-8-encoded hex
904 903 if len(new) == 20 and new.encode('string-escape') != new:
905 904 # looks like it could be a binary node
906 905 try:
907 906 new.decode('utf-8')
908 907 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
909 908 except UnicodeDecodeError:
910 909 pass # binary, leave unmodified
911 910 else:
912 911 new = encoding.tolocal(new) # normal path
913 912
914 913 if util.safehasattr(proto, 'restore'):
915 914
916 915 proto.redirect()
917 916
918 917 try:
919 918 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
920 919 encoding.tolocal(old), new) or False
921 920 except error.Abort:
922 921 r = False
923 922
924 923 output = proto.restore()
925 924
926 925 return '%s\n%s' % (int(r), output)
927 926
928 927 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
929 928 encoding.tolocal(old), new)
930 929 return '%s\n' % int(r)
931 930
932 931 @wireprotocommand('stream_out')
933 932 def stream(repo, proto):
934 933 '''If the server supports streaming clone, it advertises the "stream"
935 934 capability with a value representing the version and flags of the repo
936 935 it is serving. Client checks to see if it understands the format.
937 936 '''
938 937 if not streamclone.allowservergeneration(repo.ui):
939 938 return '1\n'
940 939
941 940 def getstream(it):
942 941 yield '0\n'
943 942 for chunk in it:
944 943 yield chunk
945 944
946 945 try:
947 946 # LockError may be raised before the first result is yielded. Don't
948 947 # emit output until we're sure we got the lock successfully.
949 948 it = streamclone.generatev1wireproto(repo)
950 949 return streamres(gen=getstream(it))
951 950 except error.LockError:
952 951 return '2\n'
953 952
954 953 @wireprotocommand('unbundle', 'heads')
955 954 def unbundle(repo, proto, heads):
956 955 their_heads = decodelist(heads)
957 956
958 957 try:
959 958 proto.redirect()
960 959
961 960 exchange.check_heads(repo, their_heads, 'preparing changes')
962 961
963 962 # write bundle data to temporary file because it can be big
964 963 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
965 964 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
966 965 r = 0
967 966 try:
968 967 proto.getfile(fp)
969 968 fp.seek(0)
970 969 gen = exchange.readbundle(repo.ui, fp, None)
971 970 if (isinstance(gen, changegroupmod.cg1unpacker)
972 971 and not bundle1allowed(repo, 'push')):
973 972 if proto.name == 'http':
974 973 # need to special case http because stderr do not get to
975 974 # the http client on failed push so we need to abuse some
976 975 # other error type to make sure the message get to the
977 976 # user.
978 977 return ooberror(bundle2required)
979 978 raise error.Abort(bundle2requiredmain,
980 979 hint=bundle2requiredhint)
981 980
982 981 r = exchange.unbundle(repo, gen, their_heads, 'serve',
983 982 proto._client())
984 983 if util.safehasattr(r, 'addpart'):
985 984 # The return looks streamable, we are in the bundle2 case and
986 985 # should return a stream.
987 986 return streamres(gen=r.getchunks())
988 987 return pushres(r)
989 988
990 989 finally:
991 990 fp.close()
992 991 os.unlink(tempname)
993 992
994 993 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
995 994 # handle non-bundle2 case first
996 995 if not getattr(exc, 'duringunbundle2', False):
997 996 try:
998 997 raise
999 998 except error.Abort:
1000 999 # The old code we moved used util.stderr directly.
1001 1000 # We did not change it to minimise code change.
1002 1001 # This need to be moved to something proper.
1003 1002 # Feel free to do it.
1004 1003 util.stderr.write("abort: %s\n" % exc)
1005 1004 if exc.hint is not None:
1006 1005 util.stderr.write("(%s)\n" % exc.hint)
1007 1006 return pushres(0)
1008 1007 except error.PushRaced:
1009 1008 return pusherr(str(exc))
1010 1009
1011 1010 bundler = bundle2.bundle20(repo.ui)
1012 1011 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1013 1012 bundler.addpart(out)
1014 1013 try:
1015 1014 try:
1016 1015 raise
1017 1016 except error.PushkeyFailed as exc:
1018 1017 # check client caps
1019 1018 remotecaps = getattr(exc, '_replycaps', None)
1020 1019 if (remotecaps is not None
1021 1020 and 'pushkey' not in remotecaps.get('error', ())):
1022 1021 # no support remote side, fallback to Abort handler.
1023 1022 raise
1024 1023 part = bundler.newpart('error:pushkey')
1025 1024 part.addparam('in-reply-to', exc.partid)
1026 1025 if exc.namespace is not None:
1027 1026 part.addparam('namespace', exc.namespace, mandatory=False)
1028 1027 if exc.key is not None:
1029 1028 part.addparam('key', exc.key, mandatory=False)
1030 1029 if exc.new is not None:
1031 1030 part.addparam('new', exc.new, mandatory=False)
1032 1031 if exc.old is not None:
1033 1032 part.addparam('old', exc.old, mandatory=False)
1034 1033 if exc.ret is not None:
1035 1034 part.addparam('ret', exc.ret, mandatory=False)
1036 1035 except error.BundleValueError as exc:
1037 1036 errpart = bundler.newpart('error:unsupportedcontent')
1038 1037 if exc.parttype is not None:
1039 1038 errpart.addparam('parttype', exc.parttype)
1040 1039 if exc.params:
1041 1040 errpart.addparam('params', '\0'.join(exc.params))
1042 1041 except error.Abort as exc:
1043 1042 manargs = [('message', str(exc))]
1044 1043 advargs = []
1045 1044 if exc.hint is not None:
1046 1045 advargs.append(('hint', exc.hint))
1047 1046 bundler.addpart(bundle2.bundlepart('error:abort',
1048 1047 manargs, advargs))
1049 1048 except error.PushRaced as exc:
1050 1049 bundler.newpart('error:pushraced', [('message', str(exc))])
1051 1050 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now