##// END OF EJS Templates
configitems: register the 'server.preferuncompressed' config
marmoute -
r33222:90a1b62b default
parent child Browse files
Show More
@@ -1,142 +1,145
1 1 # configitems.py - centralized declaration of configuration option
2 2 #
3 3 # Copyright 2017 Pierre-Yves David <pierre-yves.david@octobus.net>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import functools
11 11
12 12 from . import (
13 13 error,
14 14 )
15 15
16 16 def loadconfigtable(ui, extname, configtable):
17 17 """update config item known to the ui with the extension ones"""
18 18 for section, items in configtable.items():
19 19 knownitems = ui._knownconfig.setdefault(section, {})
20 20 knownkeys = set(knownitems)
21 21 newkeys = set(items)
22 22 for key in sorted(knownkeys & newkeys):
23 23 msg = "extension '%s' overwrite config item '%s.%s'"
24 24 msg %= (extname, section, key)
25 25 ui.develwarn(msg, config='warn-config')
26 26
27 27 knownitems.update(items)
28 28
29 29 class configitem(object):
30 30 """represent a known config item
31 31
32 32 :section: the official config section where to find this item,
33 33 :name: the official name within the section,
34 34 :default: default value for this item,
35 35 """
36 36
37 37 def __init__(self, section, name, default=None):
38 38 self.section = section
39 39 self.name = name
40 40 self.default = default
41 41
42 42 coreitems = {}
43 43
44 44 def _register(configtable, *args, **kwargs):
45 45 item = configitem(*args, **kwargs)
46 46 section = configtable.setdefault(item.section, {})
47 47 if item.name in section:
48 48 msg = "duplicated config item registration for '%s.%s'"
49 49 raise error.ProgrammingError(msg % (item.section, item.name))
50 50 section[item.name] = item
51 51
52 52 # Registering actual config items
53 53
54 54 def getitemregister(configtable):
55 55 return functools.partial(_register, configtable)
56 56
57 57 coreconfigitem = getitemregister(coreitems)
58 58
59 59 coreconfigitem('auth', 'cookiefile',
60 60 default=None,
61 61 )
62 62 # bookmarks.pushing: internal hack for discovery
63 63 coreconfigitem('bookmarks', 'pushing',
64 64 default=list,
65 65 )
66 66 # bundle.mainreporoot: internal hack for bundlerepo
67 67 coreconfigitem('bundle', 'mainreporoot',
68 68 default='',
69 69 )
70 70 # bundle.reorder: experimental config
71 71 coreconfigitem('bundle', 'reorder',
72 72 default='auto',
73 73 )
74 74 coreconfigitem('color', 'mode',
75 75 default='auto',
76 76 )
77 77 coreconfigitem('devel', 'all-warnings',
78 78 default=False,
79 79 )
80 80 coreconfigitem('devel', 'bundle2.debug',
81 81 default=False,
82 82 )
83 83 coreconfigitem('devel', 'check-locks',
84 84 default=False,
85 85 )
86 86 coreconfigitem('devel', 'check-relroot',
87 87 default=False,
88 88 )
89 89 coreconfigitem('devel', 'disableloaddefaultcerts',
90 90 default=False,
91 91 )
92 92 coreconfigitem('devel', 'legacy.exchange',
93 93 default=list,
94 94 )
95 95 coreconfigitem('devel', 'servercafile',
96 96 default='',
97 97 )
98 98 coreconfigitem('devel', 'serverexactprotocol',
99 99 default='',
100 100 )
101 101 coreconfigitem('devel', 'serverrequirecert',
102 102 default=False,
103 103 )
104 104 coreconfigitem('devel', 'strip-obsmarkers',
105 105 default=True,
106 106 )
107 107 coreconfigitem('hostsecurity', 'ciphers',
108 108 default=None,
109 109 )
110 110 coreconfigitem('hostsecurity', 'disabletls10warning',
111 111 default=False,
112 112 )
113 113 coreconfigitem('patch', 'fuzz',
114 114 default=2,
115 115 )
116 116 coreconfigitem('server', 'bundle1',
117 117 default=True,
118 118 )
119 119 coreconfigitem('server', 'bundle1gd',
120 120 default=None,
121 121 )
122 122 coreconfigitem('server', 'compressionengines',
123 123 default=list,
124 124 )
125 125 coreconfigitem('server', 'concurrent-push-mode',
126 126 default='strict',
127 127 )
128 128 coreconfigitem('server', 'disablefullbundle',
129 129 default=False,
130 130 )
131 131 coreconfigitem('server', 'maxhttpheaderlen',
132 132 default=1024,
133 133 )
134 coreconfigitem('server', 'preferuncompressed',
135 default=False,
136 )
134 137 coreconfigitem('ui', 'clonebundleprefers',
135 138 default=list,
136 139 )
137 140 coreconfigitem('ui', 'interactive',
138 141 default=None,
139 142 )
140 143 coreconfigitem('ui', 'quiet',
141 144 default=False,
142 145 )
@@ -1,1062 +1,1062
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import itertools
12 12 import os
13 13 import tempfile
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 bin,
18 18 hex,
19 19 nullid,
20 20 )
21 21
22 22 from . import (
23 23 bundle2,
24 24 changegroup as changegroupmod,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 streamclone,
32 32 util,
33 33 )
34 34
35 35 urlerr = util.urlerr
36 36 urlreq = util.urlreq
37 37
38 38 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 39 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 40 'IncompatibleClient')
41 41 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42 42
43 43 class abstractserverproto(object):
44 44 """abstract class that summarizes the protocol API
45 45
46 46 Used as reference and documentation.
47 47 """
48 48
49 49 def getargs(self, args):
50 50 """return the value for arguments in <args>
51 51
52 52 returns a list of values (same order as <args>)"""
53 53 raise NotImplementedError()
54 54
55 55 def getfile(self, fp):
56 56 """write the whole content of a file into a file like object
57 57
58 58 The file is in the form::
59 59
60 60 (<chunk-size>\n<chunk>)+0\n
61 61
62 62 chunk size is the ascii version of the int.
63 63 """
64 64 raise NotImplementedError()
65 65
66 66 def redirect(self):
67 67 """may setup interception for stdout and stderr
68 68
69 69 See also the `restore` method."""
70 70 raise NotImplementedError()
71 71
72 72 # If the `redirect` function does install interception, the `restore`
73 73 # function MUST be defined. If interception is not used, this function
74 74 # MUST NOT be defined.
75 75 #
76 76 # left commented here on purpose
77 77 #
78 78 #def restore(self):
79 79 # """reinstall previous stdout and stderr and return intercepted stdout
80 80 # """
81 81 # raise NotImplementedError()
82 82
83 83 class remotebatch(peer.batcher):
84 84 '''batches the queued calls; uses as few roundtrips as possible'''
85 85 def __init__(self, remote):
86 86 '''remote must support _submitbatch(encbatch) and
87 87 _submitone(op, encargs)'''
88 88 peer.batcher.__init__(self)
89 89 self.remote = remote
90 90 def submit(self):
91 91 req, rsp = [], []
92 92 for name, args, opts, resref in self.calls:
93 93 mtd = getattr(self.remote, name)
94 94 batchablefn = getattr(mtd, 'batchable', None)
95 95 if batchablefn is not None:
96 96 batchable = batchablefn(mtd.im_self, *args, **opts)
97 97 encargsorres, encresref = next(batchable)
98 98 if encresref:
99 99 req.append((name, encargsorres,))
100 100 rsp.append((batchable, encresref, resref,))
101 101 else:
102 102 resref.set(encargsorres)
103 103 else:
104 104 if req:
105 105 self._submitreq(req, rsp)
106 106 req, rsp = [], []
107 107 resref.set(mtd(*args, **opts))
108 108 if req:
109 109 self._submitreq(req, rsp)
110 110 def _submitreq(self, req, rsp):
111 111 encresults = self.remote._submitbatch(req)
112 112 for encres, r in zip(encresults, rsp):
113 113 batchable, encresref, resref = r
114 114 encresref.set(encres)
115 115 resref.set(next(batchable))
116 116
117 117 class remoteiterbatcher(peer.iterbatcher):
118 118 def __init__(self, remote):
119 119 super(remoteiterbatcher, self).__init__()
120 120 self._remote = remote
121 121
122 122 def __getattr__(self, name):
123 123 if not getattr(self._remote, name, False):
124 124 raise AttributeError(
125 125 'Attempted to iterbatch non-batchable call to %r' % name)
126 126 return super(remoteiterbatcher, self).__getattr__(name)
127 127
128 128 def submit(self):
129 129 """Break the batch request into many patch calls and pipeline them.
130 130
131 131 This is mostly valuable over http where request sizes can be
132 132 limited, but can be used in other places as well.
133 133 """
134 134 req, rsp = [], []
135 135 for name, args, opts, resref in self.calls:
136 136 mtd = getattr(self._remote, name)
137 137 batchable = mtd.batchable(mtd.im_self, *args, **opts)
138 138 encargsorres, encresref = next(batchable)
139 139 assert encresref
140 140 req.append((name, encargsorres))
141 141 rsp.append((batchable, encresref))
142 142 if req:
143 143 self._resultiter = self._remote._submitbatch(req)
144 144 self._rsp = rsp
145 145
146 146 def results(self):
147 147 for (batchable, encresref), encres in itertools.izip(
148 148 self._rsp, self._resultiter):
149 149 encresref.set(encres)
150 150 yield next(batchable)
151 151
152 152 # Forward a couple of names from peer to make wireproto interactions
153 153 # slightly more sensible.
154 154 batchable = peer.batchable
155 155 future = peer.future
156 156
157 157 # list of nodes encoding / decoding
158 158
159 159 def decodelist(l, sep=' '):
160 160 if l:
161 161 return map(bin, l.split(sep))
162 162 return []
163 163
164 164 def encodelist(l, sep=' '):
165 165 try:
166 166 return sep.join(map(hex, l))
167 167 except TypeError:
168 168 raise
169 169
170 170 # batched call argument encoding
171 171
172 172 def escapearg(plain):
173 173 return (plain
174 174 .replace(':', ':c')
175 175 .replace(',', ':o')
176 176 .replace(';', ':s')
177 177 .replace('=', ':e'))
178 178
179 179 def unescapearg(escaped):
180 180 return (escaped
181 181 .replace(':e', '=')
182 182 .replace(':s', ';')
183 183 .replace(':o', ',')
184 184 .replace(':c', ':'))
185 185
186 186 def encodebatchcmds(req):
187 187 """Return a ``cmds`` argument value for the ``batch`` command."""
188 188 cmds = []
189 189 for op, argsdict in req:
190 190 # Old servers didn't properly unescape argument names. So prevent
191 191 # the sending of argument names that may not be decoded properly by
192 192 # servers.
193 193 assert all(escapearg(k) == k for k in argsdict)
194 194
195 195 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
196 196 for k, v in argsdict.iteritems())
197 197 cmds.append('%s %s' % (op, args))
198 198
199 199 return ';'.join(cmds)
200 200
201 201 # mapping of options accepted by getbundle and their types
202 202 #
203 203 # Meant to be extended by extensions. It is extensions responsibility to ensure
204 204 # such options are properly processed in exchange.getbundle.
205 205 #
206 206 # supported types are:
207 207 #
208 208 # :nodes: list of binary nodes
209 209 # :csv: list of comma-separated values
210 210 # :scsv: list of comma-separated values return as set
211 211 # :plain: string with no transformation needed.
212 212 gboptsmap = {'heads': 'nodes',
213 213 'common': 'nodes',
214 214 'obsmarkers': 'boolean',
215 215 'bundlecaps': 'scsv',
216 216 'listkeys': 'csv',
217 217 'cg': 'boolean',
218 218 'cbattempted': 'boolean'}
219 219
220 220 # client side
221 221
222 222 class wirepeer(peer.peerrepository):
223 223 """Client-side interface for communicating with a peer repository.
224 224
225 225 Methods commonly call wire protocol commands of the same name.
226 226
227 227 See also httppeer.py and sshpeer.py for protocol-specific
228 228 implementations of this interface.
229 229 """
230 230 def batch(self):
231 231 if self.capable('batch'):
232 232 return remotebatch(self)
233 233 else:
234 234 return peer.localbatch(self)
235 235 def _submitbatch(self, req):
236 236 """run batch request <req> on the server
237 237
238 238 Returns an iterator of the raw responses from the server.
239 239 """
240 240 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
241 241 chunk = rsp.read(1024)
242 242 work = [chunk]
243 243 while chunk:
244 244 while ';' not in chunk and chunk:
245 245 chunk = rsp.read(1024)
246 246 work.append(chunk)
247 247 merged = ''.join(work)
248 248 while ';' in merged:
249 249 one, merged = merged.split(';', 1)
250 250 yield unescapearg(one)
251 251 chunk = rsp.read(1024)
252 252 work = [merged, chunk]
253 253 yield unescapearg(''.join(work))
254 254
255 255 def _submitone(self, op, args):
256 256 return self._call(op, **args)
257 257
258 258 def iterbatch(self):
259 259 return remoteiterbatcher(self)
260 260
261 261 @batchable
262 262 def lookup(self, key):
263 263 self.requirecap('lookup', _('look up remote revision'))
264 264 f = future()
265 265 yield {'key': encoding.fromlocal(key)}, f
266 266 d = f.value
267 267 success, data = d[:-1].split(" ", 1)
268 268 if int(success):
269 269 yield bin(data)
270 270 self._abort(error.RepoError(data))
271 271
272 272 @batchable
273 273 def heads(self):
274 274 f = future()
275 275 yield {}, f
276 276 d = f.value
277 277 try:
278 278 yield decodelist(d[:-1])
279 279 except ValueError:
280 280 self._abort(error.ResponseError(_("unexpected response:"), d))
281 281
282 282 @batchable
283 283 def known(self, nodes):
284 284 f = future()
285 285 yield {'nodes': encodelist(nodes)}, f
286 286 d = f.value
287 287 try:
288 288 yield [bool(int(b)) for b in d]
289 289 except ValueError:
290 290 self._abort(error.ResponseError(_("unexpected response:"), d))
291 291
292 292 @batchable
293 293 def branchmap(self):
294 294 f = future()
295 295 yield {}, f
296 296 d = f.value
297 297 try:
298 298 branchmap = {}
299 299 for branchpart in d.splitlines():
300 300 branchname, branchheads = branchpart.split(' ', 1)
301 301 branchname = encoding.tolocal(urlreq.unquote(branchname))
302 302 branchheads = decodelist(branchheads)
303 303 branchmap[branchname] = branchheads
304 304 yield branchmap
305 305 except TypeError:
306 306 self._abort(error.ResponseError(_("unexpected response:"), d))
307 307
308 308 def branches(self, nodes):
309 309 n = encodelist(nodes)
310 310 d = self._call("branches", nodes=n)
311 311 try:
312 312 br = [tuple(decodelist(b)) for b in d.splitlines()]
313 313 return br
314 314 except ValueError:
315 315 self._abort(error.ResponseError(_("unexpected response:"), d))
316 316
317 317 def between(self, pairs):
318 318 batch = 8 # avoid giant requests
319 319 r = []
320 320 for i in xrange(0, len(pairs), batch):
321 321 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
322 322 d = self._call("between", pairs=n)
323 323 try:
324 324 r.extend(l and decodelist(l) or [] for l in d.splitlines())
325 325 except ValueError:
326 326 self._abort(error.ResponseError(_("unexpected response:"), d))
327 327 return r
328 328
329 329 @batchable
330 330 def pushkey(self, namespace, key, old, new):
331 331 if not self.capable('pushkey'):
332 332 yield False, None
333 333 f = future()
334 334 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
335 335 yield {'namespace': encoding.fromlocal(namespace),
336 336 'key': encoding.fromlocal(key),
337 337 'old': encoding.fromlocal(old),
338 338 'new': encoding.fromlocal(new)}, f
339 339 d = f.value
340 340 d, output = d.split('\n', 1)
341 341 try:
342 342 d = bool(int(d))
343 343 except ValueError:
344 344 raise error.ResponseError(
345 345 _('push failed (unexpected response):'), d)
346 346 for l in output.splitlines(True):
347 347 self.ui.status(_('remote: '), l)
348 348 yield d
349 349
350 350 @batchable
351 351 def listkeys(self, namespace):
352 352 if not self.capable('pushkey'):
353 353 yield {}, None
354 354 f = future()
355 355 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
356 356 yield {'namespace': encoding.fromlocal(namespace)}, f
357 357 d = f.value
358 358 self.ui.debug('received listkey for "%s": %i bytes\n'
359 359 % (namespace, len(d)))
360 360 yield pushkeymod.decodekeys(d)
361 361
362 362 def stream_out(self):
363 363 return self._callstream('stream_out')
364 364
365 365 def changegroup(self, nodes, kind):
366 366 n = encodelist(nodes)
367 367 f = self._callcompressable("changegroup", roots=n)
368 368 return changegroupmod.cg1unpacker(f, 'UN')
369 369
370 370 def changegroupsubset(self, bases, heads, kind):
371 371 self.requirecap('changegroupsubset', _('look up remote changes'))
372 372 bases = encodelist(bases)
373 373 heads = encodelist(heads)
374 374 f = self._callcompressable("changegroupsubset",
375 375 bases=bases, heads=heads)
376 376 return changegroupmod.cg1unpacker(f, 'UN')
377 377
378 378 def getbundle(self, source, **kwargs):
379 379 self.requirecap('getbundle', _('look up remote changes'))
380 380 opts = {}
381 381 bundlecaps = kwargs.get('bundlecaps')
382 382 if bundlecaps is not None:
383 383 kwargs['bundlecaps'] = sorted(bundlecaps)
384 384 else:
385 385 bundlecaps = () # kwargs could have it to None
386 386 for key, value in kwargs.iteritems():
387 387 if value is None:
388 388 continue
389 389 keytype = gboptsmap.get(key)
390 390 if keytype is None:
391 391 assert False, 'unexpected'
392 392 elif keytype == 'nodes':
393 393 value = encodelist(value)
394 394 elif keytype in ('csv', 'scsv'):
395 395 value = ','.join(value)
396 396 elif keytype == 'boolean':
397 397 value = '%i' % bool(value)
398 398 elif keytype != 'plain':
399 399 raise KeyError('unknown getbundle option type %s'
400 400 % keytype)
401 401 opts[key] = value
402 402 f = self._callcompressable("getbundle", **opts)
403 403 if any((cap.startswith('HG2') for cap in bundlecaps)):
404 404 return bundle2.getunbundler(self.ui, f)
405 405 else:
406 406 return changegroupmod.cg1unpacker(f, 'UN')
407 407
408 408 def unbundle(self, cg, heads, url):
409 409 '''Send cg (a readable file-like object representing the
410 410 changegroup to push, typically a chunkbuffer object) to the
411 411 remote server as a bundle.
412 412
413 413 When pushing a bundle10 stream, return an integer indicating the
414 414 result of the push (see changegroup.apply()).
415 415
416 416 When pushing a bundle20 stream, return a bundle20 stream.
417 417
418 418 `url` is the url the client thinks it's pushing to, which is
419 419 visible to hooks.
420 420 '''
421 421
422 422 if heads != ['force'] and self.capable('unbundlehash'):
423 423 heads = encodelist(['hashed',
424 424 hashlib.sha1(''.join(sorted(heads))).digest()])
425 425 else:
426 426 heads = encodelist(heads)
427 427
428 428 if util.safehasattr(cg, 'deltaheader'):
429 429 # this a bundle10, do the old style call sequence
430 430 ret, output = self._callpush("unbundle", cg, heads=heads)
431 431 if ret == "":
432 432 raise error.ResponseError(
433 433 _('push failed:'), output)
434 434 try:
435 435 ret = int(ret)
436 436 except ValueError:
437 437 raise error.ResponseError(
438 438 _('push failed (unexpected response):'), ret)
439 439
440 440 for l in output.splitlines(True):
441 441 self.ui.status(_('remote: '), l)
442 442 else:
443 443 # bundle2 push. Send a stream, fetch a stream.
444 444 stream = self._calltwowaystream('unbundle', cg, heads=heads)
445 445 ret = bundle2.getunbundler(self.ui, stream)
446 446 return ret
447 447
448 448 def debugwireargs(self, one, two, three=None, four=None, five=None):
449 449 # don't pass optional arguments left at their default value
450 450 opts = {}
451 451 if three is not None:
452 452 opts['three'] = three
453 453 if four is not None:
454 454 opts['four'] = four
455 455 return self._call('debugwireargs', one=one, two=two, **opts)
456 456
457 457 def _call(self, cmd, **args):
458 458 """execute <cmd> on the server
459 459
460 460 The command is expected to return a simple string.
461 461
462 462 returns the server reply as a string."""
463 463 raise NotImplementedError()
464 464
465 465 def _callstream(self, cmd, **args):
466 466 """execute <cmd> on the server
467 467
468 468 The command is expected to return a stream. Note that if the
469 469 command doesn't return a stream, _callstream behaves
470 470 differently for ssh and http peers.
471 471
472 472 returns the server reply as a file like object.
473 473 """
474 474 raise NotImplementedError()
475 475
476 476 def _callcompressable(self, cmd, **args):
477 477 """execute <cmd> on the server
478 478
479 479 The command is expected to return a stream.
480 480
481 481 The stream may have been compressed in some implementations. This
482 482 function takes care of the decompression. This is the only difference
483 483 with _callstream.
484 484
485 485 returns the server reply as a file like object.
486 486 """
487 487 raise NotImplementedError()
488 488
489 489 def _callpush(self, cmd, fp, **args):
490 490 """execute a <cmd> on server
491 491
492 492 The command is expected to be related to a push. Push has a special
493 493 return method.
494 494
495 495 returns the server reply as a (ret, output) tuple. ret is either
496 496 empty (error) or a stringified int.
497 497 """
498 498 raise NotImplementedError()
499 499
500 500 def _calltwowaystream(self, cmd, fp, **args):
501 501 """execute <cmd> on server
502 502
503 503 The command will send a stream to the server and get a stream in reply.
504 504 """
505 505 raise NotImplementedError()
506 506
507 507 def _abort(self, exception):
508 508 """clearly abort the wire protocol connection and raise the exception
509 509 """
510 510 raise NotImplementedError()
511 511
512 512 # server side
513 513
514 514 # wire protocol command can either return a string or one of these classes.
515 515 class streamres(object):
516 516 """wireproto reply: binary stream
517 517
518 518 The call was successful and the result is a stream.
519 519
520 520 Accepts either a generator or an object with a ``read(size)`` method.
521 521
522 522 ``v1compressible`` indicates whether this data can be compressed to
523 523 "version 1" clients (technically: HTTP peers using
524 524 application/mercurial-0.1 media type). This flag should NOT be used on
525 525 new commands because new clients should support a more modern compression
526 526 mechanism.
527 527 """
528 528 def __init__(self, gen=None, reader=None, v1compressible=False):
529 529 self.gen = gen
530 530 self.reader = reader
531 531 self.v1compressible = v1compressible
532 532
533 533 class pushres(object):
534 534 """wireproto reply: success with simple integer return
535 535
536 536 The call was successful and returned an integer contained in `self.res`.
537 537 """
538 538 def __init__(self, res):
539 539 self.res = res
540 540
541 541 class pusherr(object):
542 542 """wireproto reply: failure
543 543
544 544 The call failed. The `self.res` attribute contains the error message.
545 545 """
546 546 def __init__(self, res):
547 547 self.res = res
548 548
549 549 class ooberror(object):
550 550 """wireproto reply: failure of a batch of operation
551 551
552 552 Something failed during a batch call. The error message is stored in
553 553 `self.message`.
554 554 """
555 555 def __init__(self, message):
556 556 self.message = message
557 557
558 558 def getdispatchrepo(repo, proto, command):
559 559 """Obtain the repo used for processing wire protocol commands.
560 560
561 561 The intent of this function is to serve as a monkeypatch point for
562 562 extensions that need commands to operate on different repo views under
563 563 specialized circumstances.
564 564 """
565 565 return repo.filtered('served')
566 566
567 567 def dispatch(repo, proto, command):
568 568 repo = getdispatchrepo(repo, proto, command)
569 569 func, spec = commands[command]
570 570 args = proto.getargs(spec)
571 571 return func(repo, proto, *args)
572 572
573 573 def options(cmd, keys, others):
574 574 opts = {}
575 575 for k in keys:
576 576 if k in others:
577 577 opts[k] = others[k]
578 578 del others[k]
579 579 if others:
580 580 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
581 581 % (cmd, ",".join(others)))
582 582 return opts
583 583
584 584 def bundle1allowed(repo, action):
585 585 """Whether a bundle1 operation is allowed from the server.
586 586
587 587 Priority is:
588 588
589 589 1. server.bundle1gd.<action> (if generaldelta active)
590 590 2. server.bundle1.<action>
591 591 3. server.bundle1gd (if generaldelta active)
592 592 4. server.bundle1
593 593 """
594 594 ui = repo.ui
595 595 gd = 'generaldelta' in repo.requirements
596 596
597 597 if gd:
598 598 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
599 599 if v is not None:
600 600 return v
601 601
602 602 v = ui.configbool('server', 'bundle1.%s' % action, None)
603 603 if v is not None:
604 604 return v
605 605
606 606 if gd:
607 607 v = ui.configbool('server', 'bundle1gd')
608 608 if v is not None:
609 609 return v
610 610
611 611 return ui.configbool('server', 'bundle1')
612 612
613 613 def supportedcompengines(ui, proto, role):
614 614 """Obtain the list of supported compression engines for a request."""
615 615 assert role in (util.CLIENTROLE, util.SERVERROLE)
616 616
617 617 compengines = util.compengines.supportedwireengines(role)
618 618
619 619 # Allow config to override default list and ordering.
620 620 if role == util.SERVERROLE:
621 621 configengines = ui.configlist('server', 'compressionengines')
622 622 config = 'server.compressionengines'
623 623 else:
624 624 # This is currently implemented mainly to facilitate testing. In most
625 625 # cases, the server should be in charge of choosing a compression engine
626 626 # because a server has the most to lose from a sub-optimal choice. (e.g.
627 627 # CPU DoS due to an expensive engine or a network DoS due to poor
628 628 # compression ratio).
629 629 configengines = ui.configlist('experimental',
630 630 'clientcompressionengines')
631 631 config = 'experimental.clientcompressionengines'
632 632
633 633 # No explicit config. Filter out the ones that aren't supposed to be
634 634 # advertised and return default ordering.
635 635 if not configengines:
636 636 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
637 637 return [e for e in compengines
638 638 if getattr(e.wireprotosupport(), attr) > 0]
639 639
640 640 # If compression engines are listed in the config, assume there is a good
641 641 # reason for it (like server operators wanting to achieve specific
642 642 # performance characteristics). So fail fast if the config references
643 643 # unusable compression engines.
644 644 validnames = set(e.name() for e in compengines)
645 645 invalidnames = set(e for e in configengines if e not in validnames)
646 646 if invalidnames:
647 647 raise error.Abort(_('invalid compression engine defined in %s: %s') %
648 648 (config, ', '.join(sorted(invalidnames))))
649 649
650 650 compengines = [e for e in compengines if e.name() in configengines]
651 651 compengines = sorted(compengines,
652 652 key=lambda e: configengines.index(e.name()))
653 653
654 654 if not compengines:
655 655 raise error.Abort(_('%s config option does not specify any known '
656 656 'compression engines') % config,
657 657 hint=_('usable compression engines: %s') %
658 658 ', '.sorted(validnames))
659 659
660 660 return compengines
661 661
662 662 # list of commands
663 663 commands = {}
664 664
665 665 def wireprotocommand(name, args=''):
666 666 """decorator for wire protocol command"""
667 667 def register(func):
668 668 commands[name] = (func, args)
669 669 return func
670 670 return register
671 671
672 672 @wireprotocommand('batch', 'cmds *')
673 673 def batch(repo, proto, cmds, others):
674 674 repo = repo.filtered("served")
675 675 res = []
676 676 for pair in cmds.split(';'):
677 677 op, args = pair.split(' ', 1)
678 678 vals = {}
679 679 for a in args.split(','):
680 680 if a:
681 681 n, v = a.split('=')
682 682 vals[unescapearg(n)] = unescapearg(v)
683 683 func, spec = commands[op]
684 684 if spec:
685 685 keys = spec.split()
686 686 data = {}
687 687 for k in keys:
688 688 if k == '*':
689 689 star = {}
690 690 for key in vals.keys():
691 691 if key not in keys:
692 692 star[key] = vals[key]
693 693 data['*'] = star
694 694 else:
695 695 data[k] = vals[k]
696 696 result = func(repo, proto, *[data[k] for k in keys])
697 697 else:
698 698 result = func(repo, proto)
699 699 if isinstance(result, ooberror):
700 700 return result
701 701 res.append(escapearg(result))
702 702 return ';'.join(res)
703 703
704 704 @wireprotocommand('between', 'pairs')
705 705 def between(repo, proto, pairs):
706 706 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
707 707 r = []
708 708 for b in repo.between(pairs):
709 709 r.append(encodelist(b) + "\n")
710 710 return "".join(r)
711 711
712 712 @wireprotocommand('branchmap')
713 713 def branchmap(repo, proto):
714 714 branchmap = repo.branchmap()
715 715 heads = []
716 716 for branch, nodes in branchmap.iteritems():
717 717 branchname = urlreq.quote(encoding.fromlocal(branch))
718 718 branchnodes = encodelist(nodes)
719 719 heads.append('%s %s' % (branchname, branchnodes))
720 720 return '\n'.join(heads)
721 721
722 722 @wireprotocommand('branches', 'nodes')
723 723 def branches(repo, proto, nodes):
724 724 nodes = decodelist(nodes)
725 725 r = []
726 726 for b in repo.branches(nodes):
727 727 r.append(encodelist(b) + "\n")
728 728 return "".join(r)
729 729
730 730 @wireprotocommand('clonebundles', '')
731 731 def clonebundles(repo, proto):
732 732 """Server command for returning info for available bundles to seed clones.
733 733
734 734 Clients will parse this response and determine what bundle to fetch.
735 735
736 736 Extensions may wrap this command to filter or dynamically emit data
737 737 depending on the request. e.g. you could advertise URLs for the closest
738 738 data center given the client's IP address.
739 739 """
740 740 return repo.vfs.tryread('clonebundles.manifest')
741 741
742 742 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
743 743 'known', 'getbundle', 'unbundlehash', 'batch']
744 744
745 745 def _capabilities(repo, proto):
746 746 """return a list of capabilities for a repo
747 747
748 748 This function exists to allow extensions to easily wrap capabilities
749 749 computation
750 750
751 751 - returns a lists: easy to alter
752 752 - change done here will be propagated to both `capabilities` and `hello`
753 753 command without any other action needed.
754 754 """
755 755 # copy to prevent modification of the global list
756 756 caps = list(wireprotocaps)
757 757 if streamclone.allowservergeneration(repo):
758 if repo.ui.configbool('server', 'preferuncompressed', False):
758 if repo.ui.configbool('server', 'preferuncompressed'):
759 759 caps.append('stream-preferred')
760 760 requiredformats = repo.requirements & repo.supportedformats
761 761 # if our local revlogs are just revlogv1, add 'stream' cap
762 762 if not requiredformats - {'revlogv1'}:
763 763 caps.append('stream')
764 764 # otherwise, add 'streamreqs' detailing our local revlog format
765 765 else:
766 766 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
767 767 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
768 768 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
769 769 caps.append('bundle2=' + urlreq.quote(capsblob))
770 770 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
771 771
772 772 if proto.name == 'http':
773 773 caps.append('httpheader=%d' %
774 774 repo.ui.configint('server', 'maxhttpheaderlen'))
775 775 if repo.ui.configbool('experimental', 'httppostargs', False):
776 776 caps.append('httppostargs')
777 777
778 778 # FUTURE advertise 0.2rx once support is implemented
779 779 # FUTURE advertise minrx and mintx after consulting config option
780 780 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
781 781
782 782 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
783 783 if compengines:
784 784 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
785 785 for e in compengines)
786 786 caps.append('compression=%s' % comptypes)
787 787
788 788 return caps
789 789
790 790 # If you are writing an extension and consider wrapping this function. Wrap
791 791 # `_capabilities` instead.
792 792 @wireprotocommand('capabilities')
793 793 def capabilities(repo, proto):
794 794 return ' '.join(_capabilities(repo, proto))
795 795
796 796 @wireprotocommand('changegroup', 'roots')
797 797 def changegroup(repo, proto, roots):
798 798 nodes = decodelist(roots)
799 799 cg = changegroupmod.changegroup(repo, nodes, 'serve')
800 800 return streamres(reader=cg, v1compressible=True)
801 801
802 802 @wireprotocommand('changegroupsubset', 'bases heads')
803 803 def changegroupsubset(repo, proto, bases, heads):
804 804 bases = decodelist(bases)
805 805 heads = decodelist(heads)
806 806 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
807 807 return streamres(reader=cg, v1compressible=True)
808 808
809 809 @wireprotocommand('debugwireargs', 'one two *')
810 810 def debugwireargs(repo, proto, one, two, others):
811 811 # only accept optional args from the known set
812 812 opts = options('debugwireargs', ['three', 'four'], others)
813 813 return repo.debugwireargs(one, two, **opts)
814 814
815 815 @wireprotocommand('getbundle', '*')
816 816 def getbundle(repo, proto, others):
817 817 opts = options('getbundle', gboptsmap.keys(), others)
818 818 for k, v in opts.iteritems():
819 819 keytype = gboptsmap[k]
820 820 if keytype == 'nodes':
821 821 opts[k] = decodelist(v)
822 822 elif keytype == 'csv':
823 823 opts[k] = list(v.split(','))
824 824 elif keytype == 'scsv':
825 825 opts[k] = set(v.split(','))
826 826 elif keytype == 'boolean':
827 827 # Client should serialize False as '0', which is a non-empty string
828 828 # so it evaluates as a True bool.
829 829 if v == '0':
830 830 opts[k] = False
831 831 else:
832 832 opts[k] = bool(v)
833 833 elif keytype != 'plain':
834 834 raise KeyError('unknown getbundle option type %s'
835 835 % keytype)
836 836
837 837 if not bundle1allowed(repo, 'pull'):
838 838 if not exchange.bundle2requested(opts.get('bundlecaps')):
839 839 if proto.name == 'http':
840 840 return ooberror(bundle2required)
841 841 raise error.Abort(bundle2requiredmain,
842 842 hint=bundle2requiredhint)
843 843
844 844 try:
845 845 if repo.ui.configbool('server', 'disablefullbundle'):
846 846 # Check to see if this is a full clone.
847 847 clheads = set(repo.changelog.heads())
848 848 heads = set(opts.get('heads', set()))
849 849 common = set(opts.get('common', set()))
850 850 common.discard(nullid)
851 851 if not common and clheads == heads:
852 852 raise error.Abort(
853 853 _('server has pull-based clones disabled'),
854 854 hint=_('remove --pull if specified or upgrade Mercurial'))
855 855
856 856 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
857 857 except error.Abort as exc:
858 858 # cleanly forward Abort error to the client
859 859 if not exchange.bundle2requested(opts.get('bundlecaps')):
860 860 if proto.name == 'http':
861 861 return ooberror(str(exc) + '\n')
862 862 raise # cannot do better for bundle1 + ssh
863 863 # bundle2 request expect a bundle2 reply
864 864 bundler = bundle2.bundle20(repo.ui)
865 865 manargs = [('message', str(exc))]
866 866 advargs = []
867 867 if exc.hint is not None:
868 868 advargs.append(('hint', exc.hint))
869 869 bundler.addpart(bundle2.bundlepart('error:abort',
870 870 manargs, advargs))
871 871 return streamres(gen=bundler.getchunks(), v1compressible=True)
872 872 return streamres(gen=chunks, v1compressible=True)
873 873
874 874 @wireprotocommand('heads')
875 875 def heads(repo, proto):
876 876 h = repo.heads()
877 877 return encodelist(h) + "\n"
878 878
879 879 @wireprotocommand('hello')
880 880 def hello(repo, proto):
881 881 '''the hello command returns a set of lines describing various
882 882 interesting things about the server, in an RFC822-like format.
883 883 Currently the only one defined is "capabilities", which
884 884 consists of a line in the form:
885 885
886 886 capabilities: space separated list of tokens
887 887 '''
888 888 return "capabilities: %s\n" % (capabilities(repo, proto))
889 889
890 890 @wireprotocommand('listkeys', 'namespace')
891 891 def listkeys(repo, proto, namespace):
892 892 d = repo.listkeys(encoding.tolocal(namespace)).items()
893 893 return pushkeymod.encodekeys(d)
894 894
895 895 @wireprotocommand('lookup', 'key')
896 896 def lookup(repo, proto, key):
897 897 try:
898 898 k = encoding.tolocal(key)
899 899 c = repo[k]
900 900 r = c.hex()
901 901 success = 1
902 902 except Exception as inst:
903 903 r = str(inst)
904 904 success = 0
905 905 return "%s %s\n" % (success, r)
906 906
907 907 @wireprotocommand('known', 'nodes *')
908 908 def known(repo, proto, nodes, others):
909 909 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
910 910
911 911 @wireprotocommand('pushkey', 'namespace key old new')
912 912 def pushkey(repo, proto, namespace, key, old, new):
913 913 # compatibility with pre-1.8 clients which were accidentally
914 914 # sending raw binary nodes rather than utf-8-encoded hex
915 915 if len(new) == 20 and util.escapestr(new) != new:
916 916 # looks like it could be a binary node
917 917 try:
918 918 new.decode('utf-8')
919 919 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
920 920 except UnicodeDecodeError:
921 921 pass # binary, leave unmodified
922 922 else:
923 923 new = encoding.tolocal(new) # normal path
924 924
925 925 if util.safehasattr(proto, 'restore'):
926 926
927 927 proto.redirect()
928 928
929 929 try:
930 930 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
931 931 encoding.tolocal(old), new) or False
932 932 except error.Abort:
933 933 r = False
934 934
935 935 output = proto.restore()
936 936
937 937 return '%s\n%s' % (int(r), output)
938 938
939 939 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
940 940 encoding.tolocal(old), new)
941 941 return '%s\n' % int(r)
942 942
943 943 @wireprotocommand('stream_out')
944 944 def stream(repo, proto):
945 945 '''If the server supports streaming clone, it advertises the "stream"
946 946 capability with a value representing the version and flags of the repo
947 947 it is serving. Client checks to see if it understands the format.
948 948 '''
949 949 if not streamclone.allowservergeneration(repo):
950 950 return '1\n'
951 951
952 952 def getstream(it):
953 953 yield '0\n'
954 954 for chunk in it:
955 955 yield chunk
956 956
957 957 try:
958 958 # LockError may be raised before the first result is yielded. Don't
959 959 # emit output until we're sure we got the lock successfully.
960 960 it = streamclone.generatev1wireproto(repo)
961 961 return streamres(gen=getstream(it))
962 962 except error.LockError:
963 963 return '2\n'
964 964
965 965 @wireprotocommand('unbundle', 'heads')
966 966 def unbundle(repo, proto, heads):
967 967 their_heads = decodelist(heads)
968 968
969 969 try:
970 970 proto.redirect()
971 971
972 972 exchange.check_heads(repo, their_heads, 'preparing changes')
973 973
974 974 # write bundle data to temporary file because it can be big
975 975 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
976 976 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
977 977 r = 0
978 978 try:
979 979 proto.getfile(fp)
980 980 fp.seek(0)
981 981 gen = exchange.readbundle(repo.ui, fp, None)
982 982 if (isinstance(gen, changegroupmod.cg1unpacker)
983 983 and not bundle1allowed(repo, 'push')):
984 984 if proto.name == 'http':
985 985 # need to special case http because stderr do not get to
986 986 # the http client on failed push so we need to abuse some
987 987 # other error type to make sure the message get to the
988 988 # user.
989 989 return ooberror(bundle2required)
990 990 raise error.Abort(bundle2requiredmain,
991 991 hint=bundle2requiredhint)
992 992
993 993 r = exchange.unbundle(repo, gen, their_heads, 'serve',
994 994 proto._client())
995 995 if util.safehasattr(r, 'addpart'):
996 996 # The return looks streamable, we are in the bundle2 case and
997 997 # should return a stream.
998 998 return streamres(gen=r.getchunks())
999 999 return pushres(r)
1000 1000
1001 1001 finally:
1002 1002 fp.close()
1003 1003 os.unlink(tempname)
1004 1004
1005 1005 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1006 1006 # handle non-bundle2 case first
1007 1007 if not getattr(exc, 'duringunbundle2', False):
1008 1008 try:
1009 1009 raise
1010 1010 except error.Abort:
1011 1011 # The old code we moved used util.stderr directly.
1012 1012 # We did not change it to minimise code change.
1013 1013 # This need to be moved to something proper.
1014 1014 # Feel free to do it.
1015 1015 util.stderr.write("abort: %s\n" % exc)
1016 1016 if exc.hint is not None:
1017 1017 util.stderr.write("(%s)\n" % exc.hint)
1018 1018 return pushres(0)
1019 1019 except error.PushRaced:
1020 1020 return pusherr(str(exc))
1021 1021
1022 1022 bundler = bundle2.bundle20(repo.ui)
1023 1023 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1024 1024 bundler.addpart(out)
1025 1025 try:
1026 1026 try:
1027 1027 raise
1028 1028 except error.PushkeyFailed as exc:
1029 1029 # check client caps
1030 1030 remotecaps = getattr(exc, '_replycaps', None)
1031 1031 if (remotecaps is not None
1032 1032 and 'pushkey' not in remotecaps.get('error', ())):
1033 1033 # no support remote side, fallback to Abort handler.
1034 1034 raise
1035 1035 part = bundler.newpart('error:pushkey')
1036 1036 part.addparam('in-reply-to', exc.partid)
1037 1037 if exc.namespace is not None:
1038 1038 part.addparam('namespace', exc.namespace, mandatory=False)
1039 1039 if exc.key is not None:
1040 1040 part.addparam('key', exc.key, mandatory=False)
1041 1041 if exc.new is not None:
1042 1042 part.addparam('new', exc.new, mandatory=False)
1043 1043 if exc.old is not None:
1044 1044 part.addparam('old', exc.old, mandatory=False)
1045 1045 if exc.ret is not None:
1046 1046 part.addparam('ret', exc.ret, mandatory=False)
1047 1047 except error.BundleValueError as exc:
1048 1048 errpart = bundler.newpart('error:unsupportedcontent')
1049 1049 if exc.parttype is not None:
1050 1050 errpart.addparam('parttype', exc.parttype)
1051 1051 if exc.params:
1052 1052 errpart.addparam('params', '\0'.join(exc.params))
1053 1053 except error.Abort as exc:
1054 1054 manargs = [('message', str(exc))]
1055 1055 advargs = []
1056 1056 if exc.hint is not None:
1057 1057 advargs.append(('hint', exc.hint))
1058 1058 bundler.addpart(bundle2.bundlepart('error:abort',
1059 1059 manargs, advargs))
1060 1060 except error.PushRaced as exc:
1061 1061 bundler.newpart('error:pushraced', [('message', str(exc))])
1062 1062 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now