##// END OF EJS Templates
wireproto: fix lingering str(exception) with util.forcebytestr(exception)...
Augie Fackler -
r36332:be9c497e default
parent child Browse files
Show More
@@ -1,1066 +1,1067 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 wireprototypes,
35 35 )
36 36
37 37 urlerr = util.urlerr
38 38 urlreq = util.urlreq
39 39
40 40 bytesresponse = wireprototypes.bytesresponse
41 41 ooberror = wireprototypes.ooberror
42 42 pushres = wireprototypes.pushres
43 43 pusherr = wireprototypes.pusherr
44 44 streamres = wireprototypes.streamres
45 45 streamres_legacy = wireprototypes.streamreslegacy
46 46
47 47 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
48 48 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
49 49 'IncompatibleClient')
50 50 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
51 51
52 52 class remoteiterbatcher(peer.iterbatcher):
53 53 def __init__(self, remote):
54 54 super(remoteiterbatcher, self).__init__()
55 55 self._remote = remote
56 56
57 57 def __getattr__(self, name):
58 58 # Validate this method is batchable, since submit() only supports
59 59 # batchable methods.
60 60 fn = getattr(self._remote, name)
61 61 if not getattr(fn, 'batchable', None):
62 62 raise error.ProgrammingError('Attempted to batch a non-batchable '
63 63 'call to %r' % name)
64 64
65 65 return super(remoteiterbatcher, self).__getattr__(name)
66 66
67 67 def submit(self):
68 68 """Break the batch request into many patch calls and pipeline them.
69 69
70 70 This is mostly valuable over http where request sizes can be
71 71 limited, but can be used in other places as well.
72 72 """
73 73 # 2-tuple of (command, arguments) that represents what will be
74 74 # sent over the wire.
75 75 requests = []
76 76
77 77 # 4-tuple of (command, final future, @batchable generator, remote
78 78 # future).
79 79 results = []
80 80
81 81 for command, args, opts, finalfuture in self.calls:
82 82 mtd = getattr(self._remote, command)
83 83 batchable = mtd.batchable(mtd.__self__, *args, **opts)
84 84
85 85 commandargs, fremote = next(batchable)
86 86 assert fremote
87 87 requests.append((command, commandargs))
88 88 results.append((command, finalfuture, batchable, fremote))
89 89
90 90 if requests:
91 91 self._resultiter = self._remote._submitbatch(requests)
92 92
93 93 self._results = results
94 94
95 95 def results(self):
96 96 for command, finalfuture, batchable, remotefuture in self._results:
97 97 # Get the raw result, set it in the remote future, feed it
98 98 # back into the @batchable generator so it can be decoded, and
99 99 # set the result on the final future to this value.
100 100 remoteresult = next(self._resultiter)
101 101 remotefuture.set(remoteresult)
102 102 finalfuture.set(next(batchable))
103 103
104 104 # Verify our @batchable generators only emit 2 values.
105 105 try:
106 106 next(batchable)
107 107 except StopIteration:
108 108 pass
109 109 else:
110 110 raise error.ProgrammingError('%s @batchable generator emitted '
111 111 'unexpected value count' % command)
112 112
113 113 yield finalfuture.value
114 114
115 115 # Forward a couple of names from peer to make wireproto interactions
116 116 # slightly more sensible.
117 117 batchable = peer.batchable
118 118 future = peer.future
119 119
120 120 # list of nodes encoding / decoding
121 121
122 122 def decodelist(l, sep=' '):
123 123 if l:
124 124 return [bin(v) for v in l.split(sep)]
125 125 return []
126 126
127 127 def encodelist(l, sep=' '):
128 128 try:
129 129 return sep.join(map(hex, l))
130 130 except TypeError:
131 131 raise
132 132
133 133 # batched call argument encoding
134 134
135 135 def escapearg(plain):
136 136 return (plain
137 137 .replace(':', ':c')
138 138 .replace(',', ':o')
139 139 .replace(';', ':s')
140 140 .replace('=', ':e'))
141 141
142 142 def unescapearg(escaped):
143 143 return (escaped
144 144 .replace(':e', '=')
145 145 .replace(':s', ';')
146 146 .replace(':o', ',')
147 147 .replace(':c', ':'))
148 148
149 149 def encodebatchcmds(req):
150 150 """Return a ``cmds`` argument value for the ``batch`` command."""
151 151 cmds = []
152 152 for op, argsdict in req:
153 153 # Old servers didn't properly unescape argument names. So prevent
154 154 # the sending of argument names that may not be decoded properly by
155 155 # servers.
156 156 assert all(escapearg(k) == k for k in argsdict)
157 157
158 158 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
159 159 for k, v in argsdict.iteritems())
160 160 cmds.append('%s %s' % (op, args))
161 161
162 162 return ';'.join(cmds)
163 163
164 164 # mapping of options accepted by getbundle and their types
165 165 #
166 166 # Meant to be extended by extensions. It is extensions responsibility to ensure
167 167 # such options are properly processed in exchange.getbundle.
168 168 #
169 169 # supported types are:
170 170 #
171 171 # :nodes: list of binary nodes
172 172 # :csv: list of comma-separated values
173 173 # :scsv: list of comma-separated values return as set
174 174 # :plain: string with no transformation needed.
175 175 gboptsmap = {'heads': 'nodes',
176 176 'bookmarks': 'boolean',
177 177 'common': 'nodes',
178 178 'obsmarkers': 'boolean',
179 179 'phases': 'boolean',
180 180 'bundlecaps': 'scsv',
181 181 'listkeys': 'csv',
182 182 'cg': 'boolean',
183 183 'cbattempted': 'boolean',
184 184 'stream': 'boolean',
185 185 }
186 186
187 187 # client side
188 188
189 189 class wirepeer(repository.legacypeer):
190 190 """Client-side interface for communicating with a peer repository.
191 191
192 192 Methods commonly call wire protocol commands of the same name.
193 193
194 194 See also httppeer.py and sshpeer.py for protocol-specific
195 195 implementations of this interface.
196 196 """
197 197 # Begin of basewirepeer interface.
198 198
199 199 def iterbatch(self):
200 200 return remoteiterbatcher(self)
201 201
202 202 @batchable
203 203 def lookup(self, key):
204 204 self.requirecap('lookup', _('look up remote revision'))
205 205 f = future()
206 206 yield {'key': encoding.fromlocal(key)}, f
207 207 d = f.value
208 208 success, data = d[:-1].split(" ", 1)
209 209 if int(success):
210 210 yield bin(data)
211 211 else:
212 212 self._abort(error.RepoError(data))
213 213
214 214 @batchable
215 215 def heads(self):
216 216 f = future()
217 217 yield {}, f
218 218 d = f.value
219 219 try:
220 220 yield decodelist(d[:-1])
221 221 except ValueError:
222 222 self._abort(error.ResponseError(_("unexpected response:"), d))
223 223
224 224 @batchable
225 225 def known(self, nodes):
226 226 f = future()
227 227 yield {'nodes': encodelist(nodes)}, f
228 228 d = f.value
229 229 try:
230 230 yield [bool(int(b)) for b in d]
231 231 except ValueError:
232 232 self._abort(error.ResponseError(_("unexpected response:"), d))
233 233
234 234 @batchable
235 235 def branchmap(self):
236 236 f = future()
237 237 yield {}, f
238 238 d = f.value
239 239 try:
240 240 branchmap = {}
241 241 for branchpart in d.splitlines():
242 242 branchname, branchheads = branchpart.split(' ', 1)
243 243 branchname = encoding.tolocal(urlreq.unquote(branchname))
244 244 branchheads = decodelist(branchheads)
245 245 branchmap[branchname] = branchheads
246 246 yield branchmap
247 247 except TypeError:
248 248 self._abort(error.ResponseError(_("unexpected response:"), d))
249 249
250 250 @batchable
251 251 def listkeys(self, namespace):
252 252 if not self.capable('pushkey'):
253 253 yield {}, None
254 254 f = future()
255 255 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
256 256 yield {'namespace': encoding.fromlocal(namespace)}, f
257 257 d = f.value
258 258 self.ui.debug('received listkey for "%s": %i bytes\n'
259 259 % (namespace, len(d)))
260 260 yield pushkeymod.decodekeys(d)
261 261
262 262 @batchable
263 263 def pushkey(self, namespace, key, old, new):
264 264 if not self.capable('pushkey'):
265 265 yield False, None
266 266 f = future()
267 267 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
268 268 yield {'namespace': encoding.fromlocal(namespace),
269 269 'key': encoding.fromlocal(key),
270 270 'old': encoding.fromlocal(old),
271 271 'new': encoding.fromlocal(new)}, f
272 272 d = f.value
273 273 d, output = d.split('\n', 1)
274 274 try:
275 275 d = bool(int(d))
276 276 except ValueError:
277 277 raise error.ResponseError(
278 278 _('push failed (unexpected response):'), d)
279 279 for l in output.splitlines(True):
280 280 self.ui.status(_('remote: '), l)
281 281 yield d
282 282
283 283 def stream_out(self):
284 284 return self._callstream('stream_out')
285 285
286 286 def getbundle(self, source, **kwargs):
287 287 kwargs = pycompat.byteskwargs(kwargs)
288 288 self.requirecap('getbundle', _('look up remote changes'))
289 289 opts = {}
290 290 bundlecaps = kwargs.get('bundlecaps')
291 291 if bundlecaps is not None:
292 292 kwargs['bundlecaps'] = sorted(bundlecaps)
293 293 else:
294 294 bundlecaps = () # kwargs could have it to None
295 295 for key, value in kwargs.iteritems():
296 296 if value is None:
297 297 continue
298 298 keytype = gboptsmap.get(key)
299 299 if keytype is None:
300 300 raise error.ProgrammingError(
301 301 'Unexpectedly None keytype for key %s' % key)
302 302 elif keytype == 'nodes':
303 303 value = encodelist(value)
304 304 elif keytype in ('csv', 'scsv'):
305 305 value = ','.join(value)
306 306 elif keytype == 'boolean':
307 307 value = '%i' % bool(value)
308 308 elif keytype != 'plain':
309 309 raise KeyError('unknown getbundle option type %s'
310 310 % keytype)
311 311 opts[key] = value
312 312 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
313 313 if any((cap.startswith('HG2') for cap in bundlecaps)):
314 314 return bundle2.getunbundler(self.ui, f)
315 315 else:
316 316 return changegroupmod.cg1unpacker(f, 'UN')
317 317
318 318 def unbundle(self, cg, heads, url):
319 319 '''Send cg (a readable file-like object representing the
320 320 changegroup to push, typically a chunkbuffer object) to the
321 321 remote server as a bundle.
322 322
323 323 When pushing a bundle10 stream, return an integer indicating the
324 324 result of the push (see changegroup.apply()).
325 325
326 326 When pushing a bundle20 stream, return a bundle20 stream.
327 327
328 328 `url` is the url the client thinks it's pushing to, which is
329 329 visible to hooks.
330 330 '''
331 331
332 332 if heads != ['force'] and self.capable('unbundlehash'):
333 333 heads = encodelist(['hashed',
334 334 hashlib.sha1(''.join(sorted(heads))).digest()])
335 335 else:
336 336 heads = encodelist(heads)
337 337
338 338 if util.safehasattr(cg, 'deltaheader'):
339 339 # this a bundle10, do the old style call sequence
340 340 ret, output = self._callpush("unbundle", cg, heads=heads)
341 341 if ret == "":
342 342 raise error.ResponseError(
343 343 _('push failed:'), output)
344 344 try:
345 345 ret = int(ret)
346 346 except ValueError:
347 347 raise error.ResponseError(
348 348 _('push failed (unexpected response):'), ret)
349 349
350 350 for l in output.splitlines(True):
351 351 self.ui.status(_('remote: '), l)
352 352 else:
353 353 # bundle2 push. Send a stream, fetch a stream.
354 354 stream = self._calltwowaystream('unbundle', cg, heads=heads)
355 355 ret = bundle2.getunbundler(self.ui, stream)
356 356 return ret
357 357
358 358 # End of basewirepeer interface.
359 359
360 360 # Begin of baselegacywirepeer interface.
361 361
362 362 def branches(self, nodes):
363 363 n = encodelist(nodes)
364 364 d = self._call("branches", nodes=n)
365 365 try:
366 366 br = [tuple(decodelist(b)) for b in d.splitlines()]
367 367 return br
368 368 except ValueError:
369 369 self._abort(error.ResponseError(_("unexpected response:"), d))
370 370
371 371 def between(self, pairs):
372 372 batch = 8 # avoid giant requests
373 373 r = []
374 374 for i in xrange(0, len(pairs), batch):
375 375 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
376 376 d = self._call("between", pairs=n)
377 377 try:
378 378 r.extend(l and decodelist(l) or [] for l in d.splitlines())
379 379 except ValueError:
380 380 self._abort(error.ResponseError(_("unexpected response:"), d))
381 381 return r
382 382
383 383 def changegroup(self, nodes, kind):
384 384 n = encodelist(nodes)
385 385 f = self._callcompressable("changegroup", roots=n)
386 386 return changegroupmod.cg1unpacker(f, 'UN')
387 387
388 388 def changegroupsubset(self, bases, heads, kind):
389 389 self.requirecap('changegroupsubset', _('look up remote changes'))
390 390 bases = encodelist(bases)
391 391 heads = encodelist(heads)
392 392 f = self._callcompressable("changegroupsubset",
393 393 bases=bases, heads=heads)
394 394 return changegroupmod.cg1unpacker(f, 'UN')
395 395
396 396 # End of baselegacywirepeer interface.
397 397
398 398 def _submitbatch(self, req):
399 399 """run batch request <req> on the server
400 400
401 401 Returns an iterator of the raw responses from the server.
402 402 """
403 403 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
404 404 chunk = rsp.read(1024)
405 405 work = [chunk]
406 406 while chunk:
407 407 while ';' not in chunk and chunk:
408 408 chunk = rsp.read(1024)
409 409 work.append(chunk)
410 410 merged = ''.join(work)
411 411 while ';' in merged:
412 412 one, merged = merged.split(';', 1)
413 413 yield unescapearg(one)
414 414 chunk = rsp.read(1024)
415 415 work = [merged, chunk]
416 416 yield unescapearg(''.join(work))
417 417
418 418 def _submitone(self, op, args):
419 419 return self._call(op, **pycompat.strkwargs(args))
420 420
421 421 def debugwireargs(self, one, two, three=None, four=None, five=None):
422 422 # don't pass optional arguments left at their default value
423 423 opts = {}
424 424 if three is not None:
425 425 opts[r'three'] = three
426 426 if four is not None:
427 427 opts[r'four'] = four
428 428 return self._call('debugwireargs', one=one, two=two, **opts)
429 429
430 430 def _call(self, cmd, **args):
431 431 """execute <cmd> on the server
432 432
433 433 The command is expected to return a simple string.
434 434
435 435 returns the server reply as a string."""
436 436 raise NotImplementedError()
437 437
438 438 def _callstream(self, cmd, **args):
439 439 """execute <cmd> on the server
440 440
441 441 The command is expected to return a stream. Note that if the
442 442 command doesn't return a stream, _callstream behaves
443 443 differently for ssh and http peers.
444 444
445 445 returns the server reply as a file like object.
446 446 """
447 447 raise NotImplementedError()
448 448
449 449 def _callcompressable(self, cmd, **args):
450 450 """execute <cmd> on the server
451 451
452 452 The command is expected to return a stream.
453 453
454 454 The stream may have been compressed in some implementations. This
455 455 function takes care of the decompression. This is the only difference
456 456 with _callstream.
457 457
458 458 returns the server reply as a file like object.
459 459 """
460 460 raise NotImplementedError()
461 461
462 462 def _callpush(self, cmd, fp, **args):
463 463 """execute a <cmd> on server
464 464
465 465 The command is expected to be related to a push. Push has a special
466 466 return method.
467 467
468 468 returns the server reply as a (ret, output) tuple. ret is either
469 469 empty (error) or a stringified int.
470 470 """
471 471 raise NotImplementedError()
472 472
473 473 def _calltwowaystream(self, cmd, fp, **args):
474 474 """execute <cmd> on server
475 475
476 476 The command will send a stream to the server and get a stream in reply.
477 477 """
478 478 raise NotImplementedError()
479 479
480 480 def _abort(self, exception):
481 481 """clearly abort the wire protocol connection and raise the exception
482 482 """
483 483 raise NotImplementedError()
484 484
485 485 # server side
486 486
487 487 # wire protocol command can either return a string or one of these classes.
488 488
489 489 def getdispatchrepo(repo, proto, command):
490 490 """Obtain the repo used for processing wire protocol commands.
491 491
492 492 The intent of this function is to serve as a monkeypatch point for
493 493 extensions that need commands to operate on different repo views under
494 494 specialized circumstances.
495 495 """
496 496 return repo.filtered('served')
497 497
498 498 def dispatch(repo, proto, command):
499 499 repo = getdispatchrepo(repo, proto, command)
500 500 func, spec = commands[command]
501 501 args = proto.getargs(spec)
502 502 return func(repo, proto, *args)
503 503
504 504 def options(cmd, keys, others):
505 505 opts = {}
506 506 for k in keys:
507 507 if k in others:
508 508 opts[k] = others[k]
509 509 del others[k]
510 510 if others:
511 511 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
512 512 % (cmd, ",".join(others)))
513 513 return opts
514 514
515 515 def bundle1allowed(repo, action):
516 516 """Whether a bundle1 operation is allowed from the server.
517 517
518 518 Priority is:
519 519
520 520 1. server.bundle1gd.<action> (if generaldelta active)
521 521 2. server.bundle1.<action>
522 522 3. server.bundle1gd (if generaldelta active)
523 523 4. server.bundle1
524 524 """
525 525 ui = repo.ui
526 526 gd = 'generaldelta' in repo.requirements
527 527
528 528 if gd:
529 529 v = ui.configbool('server', 'bundle1gd.%s' % action)
530 530 if v is not None:
531 531 return v
532 532
533 533 v = ui.configbool('server', 'bundle1.%s' % action)
534 534 if v is not None:
535 535 return v
536 536
537 537 if gd:
538 538 v = ui.configbool('server', 'bundle1gd')
539 539 if v is not None:
540 540 return v
541 541
542 542 return ui.configbool('server', 'bundle1')
543 543
544 544 def supportedcompengines(ui, role):
545 545 """Obtain the list of supported compression engines for a request."""
546 546 assert role in (util.CLIENTROLE, util.SERVERROLE)
547 547
548 548 compengines = util.compengines.supportedwireengines(role)
549 549
550 550 # Allow config to override default list and ordering.
551 551 if role == util.SERVERROLE:
552 552 configengines = ui.configlist('server', 'compressionengines')
553 553 config = 'server.compressionengines'
554 554 else:
555 555 # This is currently implemented mainly to facilitate testing. In most
556 556 # cases, the server should be in charge of choosing a compression engine
557 557 # because a server has the most to lose from a sub-optimal choice. (e.g.
558 558 # CPU DoS due to an expensive engine or a network DoS due to poor
559 559 # compression ratio).
560 560 configengines = ui.configlist('experimental',
561 561 'clientcompressionengines')
562 562 config = 'experimental.clientcompressionengines'
563 563
564 564 # No explicit config. Filter out the ones that aren't supposed to be
565 565 # advertised and return default ordering.
566 566 if not configengines:
567 567 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
568 568 return [e for e in compengines
569 569 if getattr(e.wireprotosupport(), attr) > 0]
570 570
571 571 # If compression engines are listed in the config, assume there is a good
572 572 # reason for it (like server operators wanting to achieve specific
573 573 # performance characteristics). So fail fast if the config references
574 574 # unusable compression engines.
575 575 validnames = set(e.name() for e in compengines)
576 576 invalidnames = set(e for e in configengines if e not in validnames)
577 577 if invalidnames:
578 578 raise error.Abort(_('invalid compression engine defined in %s: %s') %
579 579 (config, ', '.join(sorted(invalidnames))))
580 580
581 581 compengines = [e for e in compengines if e.name() in configengines]
582 582 compengines = sorted(compengines,
583 583 key=lambda e: configengines.index(e.name()))
584 584
585 585 if not compengines:
586 586 raise error.Abort(_('%s config option does not specify any known '
587 587 'compression engines') % config,
588 588 hint=_('usable compression engines: %s') %
589 589 ', '.sorted(validnames))
590 590
591 591 return compengines
592 592
593 593 class commandentry(object):
594 594 """Represents a declared wire protocol command."""
595 595 def __init__(self, func, args=''):
596 596 self.func = func
597 597 self.args = args
598 598
599 599 def _merge(self, func, args):
600 600 """Merge this instance with an incoming 2-tuple.
601 601
602 602 This is called when a caller using the old 2-tuple API attempts
603 603 to replace an instance. The incoming values are merged with
604 604 data not captured by the 2-tuple and a new instance containing
605 605 the union of the two objects is returned.
606 606 """
607 607 return commandentry(func, args)
608 608
609 609 # Old code treats instances as 2-tuples. So expose that interface.
610 610 def __iter__(self):
611 611 yield self.func
612 612 yield self.args
613 613
614 614 def __getitem__(self, i):
615 615 if i == 0:
616 616 return self.func
617 617 elif i == 1:
618 618 return self.args
619 619 else:
620 620 raise IndexError('can only access elements 0 and 1')
621 621
622 622 class commanddict(dict):
623 623 """Container for registered wire protocol commands.
624 624
625 625 It behaves like a dict. But __setitem__ is overwritten to allow silent
626 626 coercion of values from 2-tuples for API compatibility.
627 627 """
628 628 def __setitem__(self, k, v):
629 629 if isinstance(v, commandentry):
630 630 pass
631 631 # Cast 2-tuples to commandentry instances.
632 632 elif isinstance(v, tuple):
633 633 if len(v) != 2:
634 634 raise ValueError('command tuples must have exactly 2 elements')
635 635
636 636 # It is common for extensions to wrap wire protocol commands via
637 637 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
638 638 # doing this aren't aware of the new API that uses objects to store
639 639 # command entries, we automatically merge old state with new.
640 640 if k in self:
641 641 v = self[k]._merge(v[0], v[1])
642 642 else:
643 643 v = commandentry(v[0], v[1])
644 644 else:
645 645 raise ValueError('command entries must be commandentry instances '
646 646 'or 2-tuples')
647 647
648 648 return super(commanddict, self).__setitem__(k, v)
649 649
650 650 def commandavailable(self, command, proto):
651 651 """Determine if a command is available for the requested protocol."""
652 652 # For now, commands are available for all protocols. So do a simple
653 653 # membership test.
654 654 return command in self
655 655
656 656 commands = commanddict()
657 657
658 658 def wireprotocommand(name, args=''):
659 659 """Decorator to declare a wire protocol command.
660 660
661 661 ``name`` is the name of the wire protocol command being provided.
662 662
663 663 ``args`` is a space-delimited list of named arguments that the command
664 664 accepts. ``*`` is a special value that says to accept all arguments.
665 665 """
666 666 def register(func):
667 667 commands[name] = commandentry(func, args)
668 668 return func
669 669 return register
670 670
671 671 @wireprotocommand('batch', 'cmds *')
672 672 def batch(repo, proto, cmds, others):
673 673 repo = repo.filtered("served")
674 674 res = []
675 675 for pair in cmds.split(';'):
676 676 op, args = pair.split(' ', 1)
677 677 vals = {}
678 678 for a in args.split(','):
679 679 if a:
680 680 n, v = a.split('=')
681 681 vals[unescapearg(n)] = unescapearg(v)
682 682 func, spec = commands[op]
683 683 if spec:
684 684 keys = spec.split()
685 685 data = {}
686 686 for k in keys:
687 687 if k == '*':
688 688 star = {}
689 689 for key in vals.keys():
690 690 if key not in keys:
691 691 star[key] = vals[key]
692 692 data['*'] = star
693 693 else:
694 694 data[k] = vals[k]
695 695 result = func(repo, proto, *[data[k] for k in keys])
696 696 else:
697 697 result = func(repo, proto)
698 698 if isinstance(result, ooberror):
699 699 return result
700 700
701 701 # For now, all batchable commands must return bytesresponse or
702 702 # raw bytes (for backwards compatibility).
703 703 assert isinstance(result, (bytesresponse, bytes))
704 704 if isinstance(result, bytesresponse):
705 705 result = result.data
706 706 res.append(escapearg(result))
707 707
708 708 return bytesresponse(';'.join(res))
709 709
710 710 @wireprotocommand('between', 'pairs')
711 711 def between(repo, proto, pairs):
712 712 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
713 713 r = []
714 714 for b in repo.between(pairs):
715 715 r.append(encodelist(b) + "\n")
716 716
717 717 return bytesresponse(''.join(r))
718 718
719 719 @wireprotocommand('branchmap')
720 720 def branchmap(repo, proto):
721 721 branchmap = repo.branchmap()
722 722 heads = []
723 723 for branch, nodes in branchmap.iteritems():
724 724 branchname = urlreq.quote(encoding.fromlocal(branch))
725 725 branchnodes = encodelist(nodes)
726 726 heads.append('%s %s' % (branchname, branchnodes))
727 727
728 728 return bytesresponse('\n'.join(heads))
729 729
730 730 @wireprotocommand('branches', 'nodes')
731 731 def branches(repo, proto, nodes):
732 732 nodes = decodelist(nodes)
733 733 r = []
734 734 for b in repo.branches(nodes):
735 735 r.append(encodelist(b) + "\n")
736 736
737 737 return bytesresponse(''.join(r))
738 738
739 739 @wireprotocommand('clonebundles', '')
740 740 def clonebundles(repo, proto):
741 741 """Server command for returning info for available bundles to seed clones.
742 742
743 743 Clients will parse this response and determine what bundle to fetch.
744 744
745 745 Extensions may wrap this command to filter or dynamically emit data
746 746 depending on the request. e.g. you could advertise URLs for the closest
747 747 data center given the client's IP address.
748 748 """
749 749 return bytesresponse(repo.vfs.tryread('clonebundles.manifest'))
750 750
751 751 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
752 752 'known', 'getbundle', 'unbundlehash', 'batch']
753 753
754 754 def _capabilities(repo, proto):
755 755 """return a list of capabilities for a repo
756 756
757 757 This function exists to allow extensions to easily wrap capabilities
758 758 computation
759 759
760 760 - returns a lists: easy to alter
761 761 - change done here will be propagated to both `capabilities` and `hello`
762 762 command without any other action needed.
763 763 """
764 764 # copy to prevent modification of the global list
765 765 caps = list(wireprotocaps)
766 766 if streamclone.allowservergeneration(repo):
767 767 if repo.ui.configbool('server', 'preferuncompressed'):
768 768 caps.append('stream-preferred')
769 769 requiredformats = repo.requirements & repo.supportedformats
770 770 # if our local revlogs are just revlogv1, add 'stream' cap
771 771 if not requiredformats - {'revlogv1'}:
772 772 caps.append('stream')
773 773 # otherwise, add 'streamreqs' detailing our local revlog format
774 774 else:
775 775 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
776 776 if repo.ui.configbool('experimental', 'bundle2-advertise'):
777 777 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
778 778 caps.append('bundle2=' + urlreq.quote(capsblob))
779 779 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
780 780
781 781 if proto.name == 'http-v1':
782 782 caps.append('httpheader=%d' %
783 783 repo.ui.configint('server', 'maxhttpheaderlen'))
784 784 if repo.ui.configbool('experimental', 'httppostargs'):
785 785 caps.append('httppostargs')
786 786
787 787 # FUTURE advertise 0.2rx once support is implemented
788 788 # FUTURE advertise minrx and mintx after consulting config option
789 789 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
790 790
791 791 compengines = supportedcompengines(repo.ui, util.SERVERROLE)
792 792 if compengines:
793 793 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
794 794 for e in compengines)
795 795 caps.append('compression=%s' % comptypes)
796 796
797 797 return caps
798 798
799 799 # If you are writing an extension and consider wrapping this function. Wrap
800 800 # `_capabilities` instead.
801 801 @wireprotocommand('capabilities')
802 802 def capabilities(repo, proto):
803 803 return bytesresponse(' '.join(_capabilities(repo, proto)))
804 804
805 805 @wireprotocommand('changegroup', 'roots')
806 806 def changegroup(repo, proto, roots):
807 807 nodes = decodelist(roots)
808 808 outgoing = discovery.outgoing(repo, missingroots=nodes,
809 809 missingheads=repo.heads())
810 810 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
811 811 gen = iter(lambda: cg.read(32768), '')
812 812 return streamres(gen=gen)
813 813
814 814 @wireprotocommand('changegroupsubset', 'bases heads')
815 815 def changegroupsubset(repo, proto, bases, heads):
816 816 bases = decodelist(bases)
817 817 heads = decodelist(heads)
818 818 outgoing = discovery.outgoing(repo, missingroots=bases,
819 819 missingheads=heads)
820 820 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
821 821 gen = iter(lambda: cg.read(32768), '')
822 822 return streamres(gen=gen)
823 823
824 824 @wireprotocommand('debugwireargs', 'one two *')
825 825 def debugwireargs(repo, proto, one, two, others):
826 826 # only accept optional args from the known set
827 827 opts = options('debugwireargs', ['three', 'four'], others)
828 828 return bytesresponse(repo.debugwireargs(one, two,
829 829 **pycompat.strkwargs(opts)))
830 830
831 831 @wireprotocommand('getbundle', '*')
832 832 def getbundle(repo, proto, others):
833 833 opts = options('getbundle', gboptsmap.keys(), others)
834 834 for k, v in opts.iteritems():
835 835 keytype = gboptsmap[k]
836 836 if keytype == 'nodes':
837 837 opts[k] = decodelist(v)
838 838 elif keytype == 'csv':
839 839 opts[k] = list(v.split(','))
840 840 elif keytype == 'scsv':
841 841 opts[k] = set(v.split(','))
842 842 elif keytype == 'boolean':
843 843 # Client should serialize False as '0', which is a non-empty string
844 844 # so it evaluates as a True bool.
845 845 if v == '0':
846 846 opts[k] = False
847 847 else:
848 848 opts[k] = bool(v)
849 849 elif keytype != 'plain':
850 850 raise KeyError('unknown getbundle option type %s'
851 851 % keytype)
852 852
853 853 if not bundle1allowed(repo, 'pull'):
854 854 if not exchange.bundle2requested(opts.get('bundlecaps')):
855 855 if proto.name == 'http-v1':
856 856 return ooberror(bundle2required)
857 857 raise error.Abort(bundle2requiredmain,
858 858 hint=bundle2requiredhint)
859 859
860 860 prefercompressed = True
861 861
862 862 try:
863 863 if repo.ui.configbool('server', 'disablefullbundle'):
864 864 # Check to see if this is a full clone.
865 865 clheads = set(repo.changelog.heads())
866 866 changegroup = opts.get('cg', True)
867 867 heads = set(opts.get('heads', set()))
868 868 common = set(opts.get('common', set()))
869 869 common.discard(nullid)
870 870 if changegroup and not common and clheads == heads:
871 871 raise error.Abort(
872 872 _('server has pull-based clones disabled'),
873 873 hint=_('remove --pull if specified or upgrade Mercurial'))
874 874
875 875 info, chunks = exchange.getbundlechunks(repo, 'serve',
876 876 **pycompat.strkwargs(opts))
877 877 prefercompressed = info.get('prefercompressed', True)
878 878 except error.Abort as exc:
879 879 # cleanly forward Abort error to the client
880 880 if not exchange.bundle2requested(opts.get('bundlecaps')):
881 881 if proto.name == 'http-v1':
882 882 return ooberror(pycompat.bytestr(exc) + '\n')
883 883 raise # cannot do better for bundle1 + ssh
884 884 # bundle2 request expect a bundle2 reply
885 885 bundler = bundle2.bundle20(repo.ui)
886 886 manargs = [('message', pycompat.bytestr(exc))]
887 887 advargs = []
888 888 if exc.hint is not None:
889 889 advargs.append(('hint', exc.hint))
890 890 bundler.addpart(bundle2.bundlepart('error:abort',
891 891 manargs, advargs))
892 892 chunks = bundler.getchunks()
893 893 prefercompressed = False
894 894
895 895 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
896 896
897 897 @wireprotocommand('heads')
898 898 def heads(repo, proto):
899 899 h = repo.heads()
900 900 return bytesresponse(encodelist(h) + '\n')
901 901
902 902 @wireprotocommand('hello')
903 903 def hello(repo, proto):
904 904 """Called as part of SSH handshake to obtain server info.
905 905
906 906 Returns a list of lines describing interesting things about the
907 907 server, in an RFC822-like format.
908 908
909 909 Currently, the only one defined is ``capabilities``, which consists of a
910 910 line of space separated tokens describing server abilities:
911 911
912 912 capabilities: <token0> <token1> <token2>
913 913 """
914 914 caps = capabilities(repo, proto).data
915 915 return bytesresponse('capabilities: %s\n' % caps)
916 916
917 917 @wireprotocommand('listkeys', 'namespace')
918 918 def listkeys(repo, proto, namespace):
919 919 d = repo.listkeys(encoding.tolocal(namespace)).items()
920 920 return bytesresponse(pushkeymod.encodekeys(d))
921 921
922 922 @wireprotocommand('lookup', 'key')
923 923 def lookup(repo, proto, key):
924 924 try:
925 925 k = encoding.tolocal(key)
926 926 c = repo[k]
927 927 r = c.hex()
928 928 success = 1
929 929 except Exception as inst:
930 r = str(inst)
930 r = util.forcebytestr(inst)
931 931 success = 0
932 932 return bytesresponse('%d %s\n' % (success, r))
933 933
934 934 @wireprotocommand('known', 'nodes *')
935 935 def known(repo, proto, nodes, others):
936 936 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
937 937 return bytesresponse(v)
938 938
939 939 @wireprotocommand('pushkey', 'namespace key old new')
940 940 def pushkey(repo, proto, namespace, key, old, new):
941 941 # compatibility with pre-1.8 clients which were accidentally
942 942 # sending raw binary nodes rather than utf-8-encoded hex
943 943 if len(new) == 20 and util.escapestr(new) != new:
944 944 # looks like it could be a binary node
945 945 try:
946 946 new.decode('utf-8')
947 947 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
948 948 except UnicodeDecodeError:
949 949 pass # binary, leave unmodified
950 950 else:
951 951 new = encoding.tolocal(new) # normal path
952 952
953 953 with proto.mayberedirectstdio() as output:
954 954 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
955 955 encoding.tolocal(old), new) or False
956 956
957 957 output = output.getvalue() if output else ''
958 958 return bytesresponse('%s\n%s' % (int(r), output))
959 959
960 960 @wireprotocommand('stream_out')
961 961 def stream(repo, proto):
962 962 '''If the server supports streaming clone, it advertises the "stream"
963 963 capability with a value representing the version and flags of the repo
964 964 it is serving. Client checks to see if it understands the format.
965 965 '''
966 966 return streamres_legacy(streamclone.generatev1wireproto(repo))
967 967
968 968 @wireprotocommand('unbundle', 'heads')
969 969 def unbundle(repo, proto, heads):
970 970 their_heads = decodelist(heads)
971 971
972 972 with proto.mayberedirectstdio() as output:
973 973 try:
974 974 exchange.check_heads(repo, their_heads, 'preparing changes')
975 975
976 976 # write bundle data to temporary file because it can be big
977 977 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
978 978 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
979 979 r = 0
980 980 try:
981 981 proto.forwardpayload(fp)
982 982 fp.seek(0)
983 983 gen = exchange.readbundle(repo.ui, fp, None)
984 984 if (isinstance(gen, changegroupmod.cg1unpacker)
985 985 and not bundle1allowed(repo, 'push')):
986 986 if proto.name == 'http-v1':
987 987 # need to special case http because stderr do not get to
988 988 # the http client on failed push so we need to abuse
989 989 # some other error type to make sure the message get to
990 990 # the user.
991 991 return ooberror(bundle2required)
992 992 raise error.Abort(bundle2requiredmain,
993 993 hint=bundle2requiredhint)
994 994
995 995 r = exchange.unbundle(repo, gen, their_heads, 'serve',
996 996 proto.client())
997 997 if util.safehasattr(r, 'addpart'):
998 998 # The return looks streamable, we are in the bundle2 case
999 999 # and should return a stream.
1000 1000 return streamres_legacy(gen=r.getchunks())
1001 1001 return pushres(r, output.getvalue() if output else '')
1002 1002
1003 1003 finally:
1004 1004 fp.close()
1005 1005 os.unlink(tempname)
1006 1006
1007 1007 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1008 1008 # handle non-bundle2 case first
1009 1009 if not getattr(exc, 'duringunbundle2', False):
1010 1010 try:
1011 1011 raise
1012 1012 except error.Abort:
1013 1013 # The old code we moved used util.stderr directly.
1014 1014 # We did not change it to minimise code change.
1015 1015 # This need to be moved to something proper.
1016 1016 # Feel free to do it.
1017 1017 util.stderr.write("abort: %s\n" % exc)
1018 1018 if exc.hint is not None:
1019 1019 util.stderr.write("(%s)\n" % exc.hint)
1020 1020 return pushres(0, output.getvalue() if output else '')
1021 1021 except error.PushRaced:
1022 1022 return pusherr(str(exc),
1023 1023 output.getvalue() if output else '')
1024 1024
1025 1025 bundler = bundle2.bundle20(repo.ui)
1026 1026 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1027 1027 bundler.addpart(out)
1028 1028 try:
1029 1029 try:
1030 1030 raise
1031 1031 except error.PushkeyFailed as exc:
1032 1032 # check client caps
1033 1033 remotecaps = getattr(exc, '_replycaps', None)
1034 1034 if (remotecaps is not None
1035 1035 and 'pushkey' not in remotecaps.get('error', ())):
1036 1036 # no support remote side, fallback to Abort handler.
1037 1037 raise
1038 1038 part = bundler.newpart('error:pushkey')
1039 1039 part.addparam('in-reply-to', exc.partid)
1040 1040 if exc.namespace is not None:
1041 1041 part.addparam('namespace', exc.namespace,
1042 1042 mandatory=False)
1043 1043 if exc.key is not None:
1044 1044 part.addparam('key', exc.key, mandatory=False)
1045 1045 if exc.new is not None:
1046 1046 part.addparam('new', exc.new, mandatory=False)
1047 1047 if exc.old is not None:
1048 1048 part.addparam('old', exc.old, mandatory=False)
1049 1049 if exc.ret is not None:
1050 1050 part.addparam('ret', exc.ret, mandatory=False)
1051 1051 except error.BundleValueError as exc:
1052 1052 errpart = bundler.newpart('error:unsupportedcontent')
1053 1053 if exc.parttype is not None:
1054 1054 errpart.addparam('parttype', exc.parttype)
1055 1055 if exc.params:
1056 1056 errpart.addparam('params', '\0'.join(exc.params))
1057 1057 except error.Abort as exc:
1058 manargs = [('message', str(exc))]
1058 manargs = [('message', util.forcebytestr(exc))]
1059 1059 advargs = []
1060 1060 if exc.hint is not None:
1061 1061 advargs.append(('hint', exc.hint))
1062 1062 bundler.addpart(bundle2.bundlepart('error:abort',
1063 1063 manargs, advargs))
1064 1064 except error.PushRaced as exc:
1065 bundler.newpart('error:pushraced', [('message', str(exc))])
1065 bundler.newpart('error:pushraced',
1066 [('message', util.forcebytestr(exc))])
1066 1067 return streamres_legacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now