##// END OF EJS Templates
wireproto: move supportedcompengines out of wireproto...
Gregory Szorc -
r37801:9d818539 default
parent child Browse files
Show More
@@ -1,720 +1,671
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import tempfile
12 12
13 13 from .i18n import _
14 14 from .node import (
15 15 hex,
16 16 nullid,
17 17 )
18 18
19 19 from . import (
20 20 bundle2,
21 21 changegroup as changegroupmod,
22 22 discovery,
23 23 encoding,
24 24 error,
25 25 exchange,
26 26 pushkey as pushkeymod,
27 27 pycompat,
28 28 streamclone,
29 29 util,
30 30 wireprototypes,
31 31 )
32 32
33 33 from .utils import (
34 34 procutil,
35 35 stringutil,
36 36 )
37 37
38 38 urlerr = util.urlerr
39 39 urlreq = util.urlreq
40 40
41 41 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
42 42 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
43 43 'IncompatibleClient')
44 44 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
45 45
46 46 def clientcompressionsupport(proto):
47 47 """Returns a list of compression methods supported by the client.
48 48
49 49 Returns a list of the compression methods supported by the client
50 50 according to the protocol capabilities. If no such capability has
51 51 been announced, fallback to the default of zlib and uncompressed.
52 52 """
53 53 for cap in proto.getprotocaps():
54 54 if cap.startswith('comp='):
55 55 return cap[5:].split(',')
56 56 return ['zlib', 'none']
57 57
58 58 # wire protocol command can either return a string or one of these classes.
59 59
60 60 def getdispatchrepo(repo, proto, command):
61 61 """Obtain the repo used for processing wire protocol commands.
62 62
63 63 The intent of this function is to serve as a monkeypatch point for
64 64 extensions that need commands to operate on different repo views under
65 65 specialized circumstances.
66 66 """
67 67 return repo.filtered('served')
68 68
69 69 def dispatch(repo, proto, command):
70 70 repo = getdispatchrepo(repo, proto, command)
71 71
72 72 func, spec = commands[command]
73 73 args = proto.getargs(spec)
74 74
75 75 return func(repo, proto, *args)
76 76
77 77 def options(cmd, keys, others):
78 78 opts = {}
79 79 for k in keys:
80 80 if k in others:
81 81 opts[k] = others[k]
82 82 del others[k]
83 83 if others:
84 84 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
85 85 % (cmd, ",".join(others)))
86 86 return opts
87 87
88 88 def bundle1allowed(repo, action):
89 89 """Whether a bundle1 operation is allowed from the server.
90 90
91 91 Priority is:
92 92
93 93 1. server.bundle1gd.<action> (if generaldelta active)
94 94 2. server.bundle1.<action>
95 95 3. server.bundle1gd (if generaldelta active)
96 96 4. server.bundle1
97 97 """
98 98 ui = repo.ui
99 99 gd = 'generaldelta' in repo.requirements
100 100
101 101 if gd:
102 102 v = ui.configbool('server', 'bundle1gd.%s' % action)
103 103 if v is not None:
104 104 return v
105 105
106 106 v = ui.configbool('server', 'bundle1.%s' % action)
107 107 if v is not None:
108 108 return v
109 109
110 110 if gd:
111 111 v = ui.configbool('server', 'bundle1gd')
112 112 if v is not None:
113 113 return v
114 114
115 115 return ui.configbool('server', 'bundle1')
116 116
117 def supportedcompengines(ui, role):
118 """Obtain the list of supported compression engines for a request."""
119 assert role in (util.CLIENTROLE, util.SERVERROLE)
120
121 compengines = util.compengines.supportedwireengines(role)
122
123 # Allow config to override default list and ordering.
124 if role == util.SERVERROLE:
125 configengines = ui.configlist('server', 'compressionengines')
126 config = 'server.compressionengines'
127 else:
128 # This is currently implemented mainly to facilitate testing. In most
129 # cases, the server should be in charge of choosing a compression engine
130 # because a server has the most to lose from a sub-optimal choice. (e.g.
131 # CPU DoS due to an expensive engine or a network DoS due to poor
132 # compression ratio).
133 configengines = ui.configlist('experimental',
134 'clientcompressionengines')
135 config = 'experimental.clientcompressionengines'
136
137 # No explicit config. Filter out the ones that aren't supposed to be
138 # advertised and return default ordering.
139 if not configengines:
140 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
141 return [e for e in compengines
142 if getattr(e.wireprotosupport(), attr) > 0]
143
144 # If compression engines are listed in the config, assume there is a good
145 # reason for it (like server operators wanting to achieve specific
146 # performance characteristics). So fail fast if the config references
147 # unusable compression engines.
148 validnames = set(e.name() for e in compengines)
149 invalidnames = set(e for e in configengines if e not in validnames)
150 if invalidnames:
151 raise error.Abort(_('invalid compression engine defined in %s: %s') %
152 (config, ', '.join(sorted(invalidnames))))
153
154 compengines = [e for e in compengines if e.name() in configengines]
155 compengines = sorted(compengines,
156 key=lambda e: configengines.index(e.name()))
157
158 if not compengines:
159 raise error.Abort(_('%s config option does not specify any known '
160 'compression engines') % config,
161 hint=_('usable compression engines: %s') %
162 ', '.sorted(validnames))
163
164 return compengines
165
166 117 # For version 1 transports.
167 118 commands = wireprototypes.commanddict()
168 119
169 120 # For version 2 transports.
170 121 commandsv2 = wireprototypes.commanddict()
171 122
172 123 def wireprotocommand(name, args=None, permission='push'):
173 124 """Decorator to declare a wire protocol command.
174 125
175 126 ``name`` is the name of the wire protocol command being provided.
176 127
177 128 ``args`` defines the named arguments accepted by the command. It is
178 129 a space-delimited list of argument names. ``*`` denotes a special value
179 130 that says to accept all named arguments.
180 131
181 132 ``permission`` defines the permission type needed to run this command.
182 133 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
183 134 respectively. Default is to assume command requires ``push`` permissions
184 135 because otherwise commands not declaring their permissions could modify
185 136 a repository that is supposed to be read-only.
186 137 """
187 138 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
188 139 if v['version'] == 1}
189 140
190 141 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
191 142 # SSHv2.
192 143 # TODO undo this hack when SSH is using the unified frame protocol.
193 144 if name == b'batch':
194 145 transports.add(wireprototypes.SSHV2)
195 146
196 147 if permission not in ('push', 'pull'):
197 148 raise error.ProgrammingError('invalid wire protocol permission; '
198 149 'got %s; expected "push" or "pull"' %
199 150 permission)
200 151
201 152 if args is None:
202 153 args = ''
203 154
204 155 if not isinstance(args, bytes):
205 156 raise error.ProgrammingError('arguments for version 1 commands '
206 157 'must be declared as bytes')
207 158
208 159 def register(func):
209 160 if name in commands:
210 161 raise error.ProgrammingError('%s command already registered '
211 162 'for version 1' % name)
212 163 commands[name] = wireprototypes.commandentry(
213 164 func, args=args, transports=transports, permission=permission)
214 165
215 166 return func
216 167 return register
217 168
218 169 # TODO define a more appropriate permissions type to use for this.
219 170 @wireprotocommand('batch', 'cmds *', permission='pull')
220 171 def batch(repo, proto, cmds, others):
221 172 unescapearg = wireprototypes.unescapebatcharg
222 173 repo = repo.filtered("served")
223 174 res = []
224 175 for pair in cmds.split(';'):
225 176 op, args = pair.split(' ', 1)
226 177 vals = {}
227 178 for a in args.split(','):
228 179 if a:
229 180 n, v = a.split('=')
230 181 vals[unescapearg(n)] = unescapearg(v)
231 182 func, spec = commands[op]
232 183
233 184 # Validate that client has permissions to perform this command.
234 185 perm = commands[op].permission
235 186 assert perm in ('push', 'pull')
236 187 proto.checkperm(perm)
237 188
238 189 if spec:
239 190 keys = spec.split()
240 191 data = {}
241 192 for k in keys:
242 193 if k == '*':
243 194 star = {}
244 195 for key in vals.keys():
245 196 if key not in keys:
246 197 star[key] = vals[key]
247 198 data['*'] = star
248 199 else:
249 200 data[k] = vals[k]
250 201 result = func(repo, proto, *[data[k] for k in keys])
251 202 else:
252 203 result = func(repo, proto)
253 204 if isinstance(result, wireprototypes.ooberror):
254 205 return result
255 206
256 207 # For now, all batchable commands must return bytesresponse or
257 208 # raw bytes (for backwards compatibility).
258 209 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
259 210 if isinstance(result, wireprototypes.bytesresponse):
260 211 result = result.data
261 212 res.append(wireprototypes.escapebatcharg(result))
262 213
263 214 return wireprototypes.bytesresponse(';'.join(res))
264 215
265 216 @wireprotocommand('between', 'pairs', permission='pull')
266 217 def between(repo, proto, pairs):
267 218 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
268 219 r = []
269 220 for b in repo.between(pairs):
270 221 r.append(wireprototypes.encodelist(b) + "\n")
271 222
272 223 return wireprototypes.bytesresponse(''.join(r))
273 224
274 225 @wireprotocommand('branchmap', permission='pull')
275 226 def branchmap(repo, proto):
276 227 branchmap = repo.branchmap()
277 228 heads = []
278 229 for branch, nodes in branchmap.iteritems():
279 230 branchname = urlreq.quote(encoding.fromlocal(branch))
280 231 branchnodes = wireprototypes.encodelist(nodes)
281 232 heads.append('%s %s' % (branchname, branchnodes))
282 233
283 234 return wireprototypes.bytesresponse('\n'.join(heads))
284 235
285 236 @wireprotocommand('branches', 'nodes', permission='pull')
286 237 def branches(repo, proto, nodes):
287 238 nodes = wireprototypes.decodelist(nodes)
288 239 r = []
289 240 for b in repo.branches(nodes):
290 241 r.append(wireprototypes.encodelist(b) + "\n")
291 242
292 243 return wireprototypes.bytesresponse(''.join(r))
293 244
294 245 @wireprotocommand('clonebundles', '', permission='pull')
295 246 def clonebundles(repo, proto):
296 247 """Server command for returning info for available bundles to seed clones.
297 248
298 249 Clients will parse this response and determine what bundle to fetch.
299 250
300 251 Extensions may wrap this command to filter or dynamically emit data
301 252 depending on the request. e.g. you could advertise URLs for the closest
302 253 data center given the client's IP address.
303 254 """
304 255 return wireprototypes.bytesresponse(
305 256 repo.vfs.tryread('clonebundles.manifest'))
306 257
307 258 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
308 259 'known', 'getbundle', 'unbundlehash']
309 260
310 261 def _capabilities(repo, proto):
311 262 """return a list of capabilities for a repo
312 263
313 264 This function exists to allow extensions to easily wrap capabilities
314 265 computation
315 266
316 267 - returns a lists: easy to alter
317 268 - change done here will be propagated to both `capabilities` and `hello`
318 269 command without any other action needed.
319 270 """
320 271 # copy to prevent modification of the global list
321 272 caps = list(wireprotocaps)
322 273
323 274 # Command of same name as capability isn't exposed to version 1 of
324 275 # transports. So conditionally add it.
325 276 if commands.commandavailable('changegroupsubset', proto):
326 277 caps.append('changegroupsubset')
327 278
328 279 if streamclone.allowservergeneration(repo):
329 280 if repo.ui.configbool('server', 'preferuncompressed'):
330 281 caps.append('stream-preferred')
331 282 requiredformats = repo.requirements & repo.supportedformats
332 283 # if our local revlogs are just revlogv1, add 'stream' cap
333 284 if not requiredformats - {'revlogv1'}:
334 285 caps.append('stream')
335 286 # otherwise, add 'streamreqs' detailing our local revlog format
336 287 else:
337 288 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
338 289 if repo.ui.configbool('experimental', 'bundle2-advertise'):
339 290 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
340 291 caps.append('bundle2=' + urlreq.quote(capsblob))
341 292 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
342 293
343 294 return proto.addcapabilities(repo, caps)
344 295
345 296 # If you are writing an extension and consider wrapping this function. Wrap
346 297 # `_capabilities` instead.
347 298 @wireprotocommand('capabilities', permission='pull')
348 299 def capabilities(repo, proto):
349 300 caps = _capabilities(repo, proto)
350 301 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
351 302
352 303 @wireprotocommand('changegroup', 'roots', permission='pull')
353 304 def changegroup(repo, proto, roots):
354 305 nodes = wireprototypes.decodelist(roots)
355 306 outgoing = discovery.outgoing(repo, missingroots=nodes,
356 307 missingheads=repo.heads())
357 308 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
358 309 gen = iter(lambda: cg.read(32768), '')
359 310 return wireprototypes.streamres(gen=gen)
360 311
361 312 @wireprotocommand('changegroupsubset', 'bases heads',
362 313 permission='pull')
363 314 def changegroupsubset(repo, proto, bases, heads):
364 315 bases = wireprototypes.decodelist(bases)
365 316 heads = wireprototypes.decodelist(heads)
366 317 outgoing = discovery.outgoing(repo, missingroots=bases,
367 318 missingheads=heads)
368 319 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
369 320 gen = iter(lambda: cg.read(32768), '')
370 321 return wireprototypes.streamres(gen=gen)
371 322
372 323 @wireprotocommand('debugwireargs', 'one two *',
373 324 permission='pull')
374 325 def debugwireargs(repo, proto, one, two, others):
375 326 # only accept optional args from the known set
376 327 opts = options('debugwireargs', ['three', 'four'], others)
377 328 return wireprototypes.bytesresponse(repo.debugwireargs(
378 329 one, two, **pycompat.strkwargs(opts)))
379 330
380 331 def find_pullbundle(repo, proto, opts, clheads, heads, common):
381 332 """Return a file object for the first matching pullbundle.
382 333
383 334 Pullbundles are specified in .hg/pullbundles.manifest similar to
384 335 clonebundles.
385 336 For each entry, the bundle specification is checked for compatibility:
386 337 - Client features vs the BUNDLESPEC.
387 338 - Revisions shared with the clients vs base revisions of the bundle.
388 339 A bundle can be applied only if all its base revisions are known by
389 340 the client.
390 341 - At least one leaf of the bundle's DAG is missing on the client.
391 342 - Every leaf of the bundle's DAG is part of node set the client wants.
392 343 E.g. do not send a bundle of all changes if the client wants only
393 344 one specific branch of many.
394 345 """
395 346 def decodehexstring(s):
396 347 return set([h.decode('hex') for h in s.split(';')])
397 348
398 349 manifest = repo.vfs.tryread('pullbundles.manifest')
399 350 if not manifest:
400 351 return None
401 352 res = exchange.parseclonebundlesmanifest(repo, manifest)
402 353 res = exchange.filterclonebundleentries(repo, res)
403 354 if not res:
404 355 return None
405 356 cl = repo.changelog
406 357 heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
407 358 common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
408 359 compformats = clientcompressionsupport(proto)
409 360 for entry in res:
410 361 if 'COMPRESSION' in entry and entry['COMPRESSION'] not in compformats:
411 362 continue
412 363 # No test yet for VERSION, since V2 is supported by any client
413 364 # that advertises partial pulls
414 365 if 'heads' in entry:
415 366 try:
416 367 bundle_heads = decodehexstring(entry['heads'])
417 368 except TypeError:
418 369 # Bad heads entry
419 370 continue
420 371 if bundle_heads.issubset(common):
421 372 continue # Nothing new
422 373 if all(cl.rev(rev) in common_anc for rev in bundle_heads):
423 374 continue # Still nothing new
424 375 if any(cl.rev(rev) not in heads_anc and
425 376 cl.rev(rev) not in common_anc for rev in bundle_heads):
426 377 continue
427 378 if 'bases' in entry:
428 379 try:
429 380 bundle_bases = decodehexstring(entry['bases'])
430 381 except TypeError:
431 382 # Bad bases entry
432 383 continue
433 384 if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
434 385 continue
435 386 path = entry['URL']
436 387 repo.ui.debug('sending pullbundle "%s"\n' % path)
437 388 try:
438 389 return repo.vfs.open(path)
439 390 except IOError:
440 391 repo.ui.debug('pullbundle "%s" not accessible\n' % path)
441 392 continue
442 393 return None
443 394
444 395 @wireprotocommand('getbundle', '*', permission='pull')
445 396 def getbundle(repo, proto, others):
446 397 opts = options('getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(),
447 398 others)
448 399 for k, v in opts.iteritems():
449 400 keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
450 401 if keytype == 'nodes':
451 402 opts[k] = wireprototypes.decodelist(v)
452 403 elif keytype == 'csv':
453 404 opts[k] = list(v.split(','))
454 405 elif keytype == 'scsv':
455 406 opts[k] = set(v.split(','))
456 407 elif keytype == 'boolean':
457 408 # Client should serialize False as '0', which is a non-empty string
458 409 # so it evaluates as a True bool.
459 410 if v == '0':
460 411 opts[k] = False
461 412 else:
462 413 opts[k] = bool(v)
463 414 elif keytype != 'plain':
464 415 raise KeyError('unknown getbundle option type %s'
465 416 % keytype)
466 417
467 418 if not bundle1allowed(repo, 'pull'):
468 419 if not exchange.bundle2requested(opts.get('bundlecaps')):
469 420 if proto.name == 'http-v1':
470 421 return wireprototypes.ooberror(bundle2required)
471 422 raise error.Abort(bundle2requiredmain,
472 423 hint=bundle2requiredhint)
473 424
474 425 prefercompressed = True
475 426
476 427 try:
477 428 clheads = set(repo.changelog.heads())
478 429 heads = set(opts.get('heads', set()))
479 430 common = set(opts.get('common', set()))
480 431 common.discard(nullid)
481 432 if (repo.ui.configbool('server', 'pullbundle') and
482 433 'partial-pull' in proto.getprotocaps()):
483 434 # Check if a pre-built bundle covers this request.
484 435 bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
485 436 if bundle:
486 437 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
487 438 prefer_uncompressed=True)
488 439
489 440 if repo.ui.configbool('server', 'disablefullbundle'):
490 441 # Check to see if this is a full clone.
491 442 changegroup = opts.get('cg', True)
492 443 if changegroup and not common and clheads == heads:
493 444 raise error.Abort(
494 445 _('server has pull-based clones disabled'),
495 446 hint=_('remove --pull if specified or upgrade Mercurial'))
496 447
497 448 info, chunks = exchange.getbundlechunks(repo, 'serve',
498 449 **pycompat.strkwargs(opts))
499 450 prefercompressed = info.get('prefercompressed', True)
500 451 except error.Abort as exc:
501 452 # cleanly forward Abort error to the client
502 453 if not exchange.bundle2requested(opts.get('bundlecaps')):
503 454 if proto.name == 'http-v1':
504 455 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
505 456 raise # cannot do better for bundle1 + ssh
506 457 # bundle2 request expect a bundle2 reply
507 458 bundler = bundle2.bundle20(repo.ui)
508 459 manargs = [('message', pycompat.bytestr(exc))]
509 460 advargs = []
510 461 if exc.hint is not None:
511 462 advargs.append(('hint', exc.hint))
512 463 bundler.addpart(bundle2.bundlepart('error:abort',
513 464 manargs, advargs))
514 465 chunks = bundler.getchunks()
515 466 prefercompressed = False
516 467
517 468 return wireprototypes.streamres(
518 469 gen=chunks, prefer_uncompressed=not prefercompressed)
519 470
520 471 @wireprotocommand('heads', permission='pull')
521 472 def heads(repo, proto):
522 473 h = repo.heads()
523 474 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
524 475
525 476 @wireprotocommand('hello', permission='pull')
526 477 def hello(repo, proto):
527 478 """Called as part of SSH handshake to obtain server info.
528 479
529 480 Returns a list of lines describing interesting things about the
530 481 server, in an RFC822-like format.
531 482
532 483 Currently, the only one defined is ``capabilities``, which consists of a
533 484 line of space separated tokens describing server abilities:
534 485
535 486 capabilities: <token0> <token1> <token2>
536 487 """
537 488 caps = capabilities(repo, proto).data
538 489 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
539 490
540 491 @wireprotocommand('listkeys', 'namespace', permission='pull')
541 492 def listkeys(repo, proto, namespace):
542 493 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
543 494 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
544 495
545 496 @wireprotocommand('lookup', 'key', permission='pull')
546 497 def lookup(repo, proto, key):
547 498 try:
548 499 k = encoding.tolocal(key)
549 500 n = repo.lookup(k)
550 501 r = hex(n)
551 502 success = 1
552 503 except Exception as inst:
553 504 r = stringutil.forcebytestr(inst)
554 505 success = 0
555 506 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
556 507
557 508 @wireprotocommand('known', 'nodes *', permission='pull')
558 509 def known(repo, proto, nodes, others):
559 510 v = ''.join(b and '1' or '0'
560 511 for b in repo.known(wireprototypes.decodelist(nodes)))
561 512 return wireprototypes.bytesresponse(v)
562 513
563 514 @wireprotocommand('protocaps', 'caps', permission='pull')
564 515 def protocaps(repo, proto, caps):
565 516 if proto.name == wireprototypes.SSHV1:
566 517 proto._protocaps = set(caps.split(' '))
567 518 return wireprototypes.bytesresponse('OK')
568 519
569 520 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
570 521 def pushkey(repo, proto, namespace, key, old, new):
571 522 # compatibility with pre-1.8 clients which were accidentally
572 523 # sending raw binary nodes rather than utf-8-encoded hex
573 524 if len(new) == 20 and stringutil.escapestr(new) != new:
574 525 # looks like it could be a binary node
575 526 try:
576 527 new.decode('utf-8')
577 528 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
578 529 except UnicodeDecodeError:
579 530 pass # binary, leave unmodified
580 531 else:
581 532 new = encoding.tolocal(new) # normal path
582 533
583 534 with proto.mayberedirectstdio() as output:
584 535 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
585 536 encoding.tolocal(old), new) or False
586 537
587 538 output = output.getvalue() if output else ''
588 539 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
589 540
590 541 @wireprotocommand('stream_out', permission='pull')
591 542 def stream(repo, proto):
592 543 '''If the server supports streaming clone, it advertises the "stream"
593 544 capability with a value representing the version and flags of the repo
594 545 it is serving. Client checks to see if it understands the format.
595 546 '''
596 547 return wireprototypes.streamreslegacy(
597 548 streamclone.generatev1wireproto(repo))
598 549
599 550 @wireprotocommand('unbundle', 'heads', permission='push')
600 551 def unbundle(repo, proto, heads):
601 552 their_heads = wireprototypes.decodelist(heads)
602 553
603 554 with proto.mayberedirectstdio() as output:
604 555 try:
605 556 exchange.check_heads(repo, their_heads, 'preparing changes')
606 557 cleanup = lambda: None
607 558 try:
608 559 payload = proto.getpayload()
609 560 if repo.ui.configbool('server', 'streamunbundle'):
610 561 def cleanup():
611 562 # Ensure that the full payload is consumed, so
612 563 # that the connection doesn't contain trailing garbage.
613 564 for p in payload:
614 565 pass
615 566 fp = util.chunkbuffer(payload)
616 567 else:
617 568 # write bundle data to temporary file as it can be big
618 569 fp, tempname = None, None
619 570 def cleanup():
620 571 if fp:
621 572 fp.close()
622 573 if tempname:
623 574 os.unlink(tempname)
624 575 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
625 576 repo.ui.debug('redirecting incoming bundle to %s\n' %
626 577 tempname)
627 578 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
628 579 r = 0
629 580 for p in payload:
630 581 fp.write(p)
631 582 fp.seek(0)
632 583
633 584 gen = exchange.readbundle(repo.ui, fp, None)
634 585 if (isinstance(gen, changegroupmod.cg1unpacker)
635 586 and not bundle1allowed(repo, 'push')):
636 587 if proto.name == 'http-v1':
637 588 # need to special case http because stderr do not get to
638 589 # the http client on failed push so we need to abuse
639 590 # some other error type to make sure the message get to
640 591 # the user.
641 592 return wireprototypes.ooberror(bundle2required)
642 593 raise error.Abort(bundle2requiredmain,
643 594 hint=bundle2requiredhint)
644 595
645 596 r = exchange.unbundle(repo, gen, their_heads, 'serve',
646 597 proto.client())
647 598 if util.safehasattr(r, 'addpart'):
648 599 # The return looks streamable, we are in the bundle2 case
649 600 # and should return a stream.
650 601 return wireprototypes.streamreslegacy(gen=r.getchunks())
651 602 return wireprototypes.pushres(
652 603 r, output.getvalue() if output else '')
653 604
654 605 finally:
655 606 cleanup()
656 607
657 608 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
658 609 # handle non-bundle2 case first
659 610 if not getattr(exc, 'duringunbundle2', False):
660 611 try:
661 612 raise
662 613 except error.Abort:
663 614 # The old code we moved used procutil.stderr directly.
664 615 # We did not change it to minimise code change.
665 616 # This need to be moved to something proper.
666 617 # Feel free to do it.
667 618 procutil.stderr.write("abort: %s\n" % exc)
668 619 if exc.hint is not None:
669 620 procutil.stderr.write("(%s)\n" % exc.hint)
670 621 procutil.stderr.flush()
671 622 return wireprototypes.pushres(
672 623 0, output.getvalue() if output else '')
673 624 except error.PushRaced:
674 625 return wireprototypes.pusherr(
675 626 pycompat.bytestr(exc),
676 627 output.getvalue() if output else '')
677 628
678 629 bundler = bundle2.bundle20(repo.ui)
679 630 for out in getattr(exc, '_bundle2salvagedoutput', ()):
680 631 bundler.addpart(out)
681 632 try:
682 633 try:
683 634 raise
684 635 except error.PushkeyFailed as exc:
685 636 # check client caps
686 637 remotecaps = getattr(exc, '_replycaps', None)
687 638 if (remotecaps is not None
688 639 and 'pushkey' not in remotecaps.get('error', ())):
689 640 # no support remote side, fallback to Abort handler.
690 641 raise
691 642 part = bundler.newpart('error:pushkey')
692 643 part.addparam('in-reply-to', exc.partid)
693 644 if exc.namespace is not None:
694 645 part.addparam('namespace', exc.namespace,
695 646 mandatory=False)
696 647 if exc.key is not None:
697 648 part.addparam('key', exc.key, mandatory=False)
698 649 if exc.new is not None:
699 650 part.addparam('new', exc.new, mandatory=False)
700 651 if exc.old is not None:
701 652 part.addparam('old', exc.old, mandatory=False)
702 653 if exc.ret is not None:
703 654 part.addparam('ret', exc.ret, mandatory=False)
704 655 except error.BundleValueError as exc:
705 656 errpart = bundler.newpart('error:unsupportedcontent')
706 657 if exc.parttype is not None:
707 658 errpart.addparam('parttype', exc.parttype)
708 659 if exc.params:
709 660 errpart.addparam('params', '\0'.join(exc.params))
710 661 except error.Abort as exc:
711 662 manargs = [('message', stringutil.forcebytestr(exc))]
712 663 advargs = []
713 664 if exc.hint is not None:
714 665 advargs.append(('hint', exc.hint))
715 666 bundler.addpart(bundle2.bundlepart('error:abort',
716 667 manargs, advargs))
717 668 except error.PushRaced as exc:
718 669 bundler.newpart('error:pushraced',
719 670 [('message', stringutil.forcebytestr(exc))])
720 671 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
@@ -1,811 +1,812
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10 import struct
11 11 import sys
12 12 import threading
13 13
14 14 from .i18n import _
15 15 from .thirdparty import (
16 16 cbor,
17 17 )
18 18 from .thirdparty.zope import (
19 19 interface as zi,
20 20 )
21 21 from . import (
22 22 encoding,
23 23 error,
24 24 hook,
25 25 pycompat,
26 26 util,
27 27 wireproto,
28 28 wireprototypes,
29 29 wireprotov2server,
30 30 )
31 31 from .utils import (
32 32 procutil,
33 33 )
34 34
35 35 stringio = util.stringio
36 36
37 37 urlerr = util.urlerr
38 38 urlreq = util.urlreq
39 39
40 40 HTTP_OK = 200
41 41
42 42 HGTYPE = 'application/mercurial-0.1'
43 43 HGTYPE2 = 'application/mercurial-0.2'
44 44 HGERRTYPE = 'application/hg-error'
45 45
46 46 SSHV1 = wireprototypes.SSHV1
47 47 SSHV2 = wireprototypes.SSHV2
48 48
49 49 def decodevaluefromheaders(req, headerprefix):
50 50 """Decode a long value from multiple HTTP request headers.
51 51
52 52 Returns the value as a bytes, not a str.
53 53 """
54 54 chunks = []
55 55 i = 1
56 56 while True:
57 57 v = req.headers.get(b'%s-%d' % (headerprefix, i))
58 58 if v is None:
59 59 break
60 60 chunks.append(pycompat.bytesurl(v))
61 61 i += 1
62 62
63 63 return ''.join(chunks)
64 64
65 65 @zi.implementer(wireprototypes.baseprotocolhandler)
66 66 class httpv1protocolhandler(object):
67 67 def __init__(self, req, ui, checkperm):
68 68 self._req = req
69 69 self._ui = ui
70 70 self._checkperm = checkperm
71 71 self._protocaps = None
72 72
73 73 @property
74 74 def name(self):
75 75 return 'http-v1'
76 76
77 77 def getargs(self, args):
78 78 knownargs = self._args()
79 79 data = {}
80 80 keys = args.split()
81 81 for k in keys:
82 82 if k == '*':
83 83 star = {}
84 84 for key in knownargs.keys():
85 85 if key != 'cmd' and key not in keys:
86 86 star[key] = knownargs[key][0]
87 87 data['*'] = star
88 88 else:
89 89 data[k] = knownargs[k][0]
90 90 return [data[k] for k in keys]
91 91
92 92 def _args(self):
93 93 args = self._req.qsparams.asdictoflists()
94 94 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
95 95 if postlen:
96 96 args.update(urlreq.parseqs(
97 97 self._req.bodyfh.read(postlen), keep_blank_values=True))
98 98 return args
99 99
100 100 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
101 101 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
102 102 return args
103 103
104 104 def getprotocaps(self):
105 105 if self._protocaps is None:
106 106 value = decodevaluefromheaders(self._req, b'X-HgProto')
107 107 self._protocaps = set(value.split(' '))
108 108 return self._protocaps
109 109
110 110 def getpayload(self):
111 111 # Existing clients *always* send Content-Length.
112 112 length = int(self._req.headers[b'Content-Length'])
113 113
114 114 # If httppostargs is used, we need to read Content-Length
115 115 # minus the amount that was consumed by args.
116 116 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
117 117 return util.filechunkiter(self._req.bodyfh, limit=length)
118 118
119 119 @contextlib.contextmanager
120 120 def mayberedirectstdio(self):
121 121 oldout = self._ui.fout
122 122 olderr = self._ui.ferr
123 123
124 124 out = util.stringio()
125 125
126 126 try:
127 127 self._ui.fout = out
128 128 self._ui.ferr = out
129 129 yield out
130 130 finally:
131 131 self._ui.fout = oldout
132 132 self._ui.ferr = olderr
133 133
134 134 def client(self):
135 135 return 'remote:%s:%s:%s' % (
136 136 self._req.urlscheme,
137 137 urlreq.quote(self._req.remotehost or ''),
138 138 urlreq.quote(self._req.remoteuser or ''))
139 139
140 140 def addcapabilities(self, repo, caps):
141 141 caps.append(b'batch')
142 142
143 143 caps.append('httpheader=%d' %
144 144 repo.ui.configint('server', 'maxhttpheaderlen'))
145 145 if repo.ui.configbool('experimental', 'httppostargs'):
146 146 caps.append('httppostargs')
147 147
148 148 # FUTURE advertise 0.2rx once support is implemented
149 149 # FUTURE advertise minrx and mintx after consulting config option
150 150 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
151 151
152 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
152 compengines = wireprototypes.supportedcompengines(repo.ui,
153 util.SERVERROLE)
153 154 if compengines:
154 155 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
155 156 for e in compengines)
156 157 caps.append('compression=%s' % comptypes)
157 158
158 159 return caps
159 160
160 161 def checkperm(self, perm):
161 162 return self._checkperm(perm)
162 163
163 164 # This method exists mostly so that extensions like remotefilelog can
164 165 # disable a kludgey legacy method only over http. As of early 2018,
165 166 # there are no other known users, so with any luck we can discard this
166 167 # hook if remotefilelog becomes a first-party extension.
167 168 def iscmd(cmd):
168 169 return cmd in wireproto.commands
169 170
170 171 def handlewsgirequest(rctx, req, res, checkperm):
171 172 """Possibly process a wire protocol request.
172 173
173 174 If the current request is a wire protocol request, the request is
174 175 processed by this function.
175 176
176 177 ``req`` is a ``parsedrequest`` instance.
177 178 ``res`` is a ``wsgiresponse`` instance.
178 179
179 180 Returns a bool indicating if the request was serviced. If set, the caller
180 181 should stop processing the request, as a response has already been issued.
181 182 """
182 183 # Avoid cycle involving hg module.
183 184 from .hgweb import common as hgwebcommon
184 185
185 186 repo = rctx.repo
186 187
187 188 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
188 189 # string parameter. If it isn't present, this isn't a wire protocol
189 190 # request.
190 191 if 'cmd' not in req.qsparams:
191 192 return False
192 193
193 194 cmd = req.qsparams['cmd']
194 195
195 196 # The "cmd" request parameter is used by both the wire protocol and hgweb.
196 197 # While not all wire protocol commands are available for all transports,
197 198 # if we see a "cmd" value that resembles a known wire protocol command, we
198 199 # route it to a protocol handler. This is better than routing possible
199 200 # wire protocol requests to hgweb because it prevents hgweb from using
200 201 # known wire protocol commands and it is less confusing for machine
201 202 # clients.
202 203 if not iscmd(cmd):
203 204 return False
204 205
205 206 # The "cmd" query string argument is only valid on the root path of the
206 207 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
207 208 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
208 209 # in this case. We send an HTTP 404 for backwards compatibility reasons.
209 210 if req.dispatchpath:
210 211 res.status = hgwebcommon.statusmessage(404)
211 212 res.headers['Content-Type'] = HGTYPE
212 213 # TODO This is not a good response to issue for this request. This
213 214 # is mostly for BC for now.
214 215 res.setbodybytes('0\n%s\n' % b'Not Found')
215 216 return True
216 217
217 218 proto = httpv1protocolhandler(req, repo.ui,
218 219 lambda perm: checkperm(rctx, req, perm))
219 220
220 221 # The permissions checker should be the only thing that can raise an
221 222 # ErrorResponse. It is kind of a layer violation to catch an hgweb
222 223 # exception here. So consider refactoring into a exception type that
223 224 # is associated with the wire protocol.
224 225 try:
225 226 _callhttp(repo, req, res, proto, cmd)
226 227 except hgwebcommon.ErrorResponse as e:
227 228 for k, v in e.headers:
228 229 res.headers[k] = v
229 230 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
230 231 # TODO This response body assumes the failed command was
231 232 # "unbundle." That assumption is not always valid.
232 233 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
233 234
234 235 return True
235 236
236 237 def _availableapis(repo):
237 238 apis = set()
238 239
239 240 # Registered APIs are made available via config options of the name of
240 241 # the protocol.
241 242 for k, v in API_HANDLERS.items():
242 243 section, option = v['config']
243 244 if repo.ui.configbool(section, option):
244 245 apis.add(k)
245 246
246 247 return apis
247 248
248 249 def handlewsgiapirequest(rctx, req, res, checkperm):
249 250 """Handle requests to /api/*."""
250 251 assert req.dispatchparts[0] == b'api'
251 252
252 253 repo = rctx.repo
253 254
254 255 # This whole URL space is experimental for now. But we want to
255 256 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
256 257 if not repo.ui.configbool('experimental', 'web.apiserver'):
257 258 res.status = b'404 Not Found'
258 259 res.headers[b'Content-Type'] = b'text/plain'
259 260 res.setbodybytes(_('Experimental API server endpoint not enabled'))
260 261 return
261 262
262 263 # The URL space is /api/<protocol>/*. The structure of URLs under varies
263 264 # by <protocol>.
264 265
265 266 availableapis = _availableapis(repo)
266 267
267 268 # Requests to /api/ list available APIs.
268 269 if req.dispatchparts == [b'api']:
269 270 res.status = b'200 OK'
270 271 res.headers[b'Content-Type'] = b'text/plain'
271 272 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
272 273 'one of the following:\n')]
273 274 if availableapis:
274 275 lines.extend(sorted(availableapis))
275 276 else:
276 277 lines.append(_('(no available APIs)\n'))
277 278 res.setbodybytes(b'\n'.join(lines))
278 279 return
279 280
280 281 proto = req.dispatchparts[1]
281 282
282 283 if proto not in API_HANDLERS:
283 284 res.status = b'404 Not Found'
284 285 res.headers[b'Content-Type'] = b'text/plain'
285 286 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
286 287 proto, b', '.join(sorted(availableapis))))
287 288 return
288 289
289 290 if proto not in availableapis:
290 291 res.status = b'404 Not Found'
291 292 res.headers[b'Content-Type'] = b'text/plain'
292 293 res.setbodybytes(_('API %s not enabled\n') % proto)
293 294 return
294 295
295 296 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
296 297 req.dispatchparts[2:])
297 298
298 299 # Maps API name to metadata so custom API can be registered.
299 300 # Keys are:
300 301 #
301 302 # config
302 303 # Config option that controls whether service is enabled.
303 304 # handler
304 305 # Callable receiving (rctx, req, res, checkperm, urlparts) that is called
305 306 # when a request to this API is received.
306 307 # apidescriptor
307 308 # Callable receiving (req, repo) that is called to obtain an API
308 309 # descriptor for this service. The response must be serializable to CBOR.
309 310 API_HANDLERS = {
310 311 wireprotov2server.HTTP_WIREPROTO_V2: {
311 312 'config': ('experimental', 'web.api.http-v2'),
312 313 'handler': wireprotov2server.handlehttpv2request,
313 314 'apidescriptor': wireprotov2server.httpv2apidescriptor,
314 315 },
315 316 }
316 317
317 318 def _httpresponsetype(ui, proto, prefer_uncompressed):
318 319 """Determine the appropriate response type and compression settings.
319 320
320 321 Returns a tuple of (mediatype, compengine, engineopts).
321 322 """
322 323 # Determine the response media type and compression engine based
323 324 # on the request parameters.
324 325
325 326 if '0.2' in proto.getprotocaps():
326 327 # All clients are expected to support uncompressed data.
327 328 if prefer_uncompressed:
328 329 return HGTYPE2, util._noopengine(), {}
329 330
330 331 # Now find an agreed upon compression format.
331 332 compformats = wireproto.clientcompressionsupport(proto)
332 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
333 for engine in wireprototypes.supportedcompengines(ui, util.SERVERROLE):
333 334 if engine.wireprotosupport().name in compformats:
334 335 opts = {}
335 336 level = ui.configint('server', '%slevel' % engine.name())
336 337 if level is not None:
337 338 opts['level'] = level
338 339
339 340 return HGTYPE2, engine, opts
340 341
341 342 # No mutually supported compression format. Fall back to the
342 343 # legacy protocol.
343 344
344 345 # Don't allow untrusted settings because disabling compression or
345 346 # setting a very high compression level could lead to flooding
346 347 # the server's network or CPU.
347 348 opts = {'level': ui.configint('server', 'zliblevel')}
348 349 return HGTYPE, util.compengines['zlib'], opts
349 350
350 351 def processcapabilitieshandshake(repo, req, res, proto):
351 352 """Called during a ?cmd=capabilities request.
352 353
353 354 If the client is advertising support for a newer protocol, we send
354 355 a CBOR response with information about available services. If no
355 356 advertised services are available, we don't handle the request.
356 357 """
357 358 # Fall back to old behavior unless the API server is enabled.
358 359 if not repo.ui.configbool('experimental', 'web.apiserver'):
359 360 return False
360 361
361 362 clientapis = decodevaluefromheaders(req, b'X-HgUpgrade')
362 363 protocaps = decodevaluefromheaders(req, b'X-HgProto')
363 364 if not clientapis or not protocaps:
364 365 return False
365 366
366 367 # We currently only support CBOR responses.
367 368 protocaps = set(protocaps.split(' '))
368 369 if b'cbor' not in protocaps:
369 370 return False
370 371
371 372 descriptors = {}
372 373
373 374 for api in sorted(set(clientapis.split()) & _availableapis(repo)):
374 375 handler = API_HANDLERS[api]
375 376
376 377 descriptorfn = handler.get('apidescriptor')
377 378 if not descriptorfn:
378 379 continue
379 380
380 381 descriptors[api] = descriptorfn(req, repo)
381 382
382 383 v1caps = wireproto.dispatch(repo, proto, 'capabilities')
383 384 assert isinstance(v1caps, wireprototypes.bytesresponse)
384 385
385 386 m = {
386 387 # TODO allow this to be configurable.
387 388 'apibase': 'api/',
388 389 'apis': descriptors,
389 390 'v1capabilities': v1caps.data,
390 391 }
391 392
392 393 res.status = b'200 OK'
393 394 res.headers[b'Content-Type'] = b'application/mercurial-cbor'
394 395 res.setbodybytes(cbor.dumps(m, canonical=True))
395 396
396 397 return True
397 398
398 399 def _callhttp(repo, req, res, proto, cmd):
399 400 # Avoid cycle involving hg module.
400 401 from .hgweb import common as hgwebcommon
401 402
402 403 def genversion2(gen, engine, engineopts):
403 404 # application/mercurial-0.2 always sends a payload header
404 405 # identifying the compression engine.
405 406 name = engine.wireprotosupport().name
406 407 assert 0 < len(name) < 256
407 408 yield struct.pack('B', len(name))
408 409 yield name
409 410
410 411 for chunk in gen:
411 412 yield chunk
412 413
413 414 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
414 415 if code == HTTP_OK:
415 416 res.status = '200 Script output follows'
416 417 else:
417 418 res.status = hgwebcommon.statusmessage(code)
418 419
419 420 res.headers['Content-Type'] = contenttype
420 421
421 422 if bodybytes is not None:
422 423 res.setbodybytes(bodybytes)
423 424 if bodygen is not None:
424 425 res.setbodygen(bodygen)
425 426
426 427 if not wireproto.commands.commandavailable(cmd, proto):
427 428 setresponse(HTTP_OK, HGERRTYPE,
428 429 _('requested wire protocol command is not available over '
429 430 'HTTP'))
430 431 return
431 432
432 433 proto.checkperm(wireproto.commands[cmd].permission)
433 434
434 435 # Possibly handle a modern client wanting to switch protocols.
435 436 if (cmd == 'capabilities' and
436 437 processcapabilitieshandshake(repo, req, res, proto)):
437 438
438 439 return
439 440
440 441 rsp = wireproto.dispatch(repo, proto, cmd)
441 442
442 443 if isinstance(rsp, bytes):
443 444 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
444 445 elif isinstance(rsp, wireprototypes.bytesresponse):
445 446 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
446 447 elif isinstance(rsp, wireprototypes.streamreslegacy):
447 448 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
448 449 elif isinstance(rsp, wireprototypes.streamres):
449 450 gen = rsp.gen
450 451
451 452 # This code for compression should not be streamres specific. It
452 453 # is here because we only compress streamres at the moment.
453 454 mediatype, engine, engineopts = _httpresponsetype(
454 455 repo.ui, proto, rsp.prefer_uncompressed)
455 456 gen = engine.compressstream(gen, engineopts)
456 457
457 458 if mediatype == HGTYPE2:
458 459 gen = genversion2(gen, engine, engineopts)
459 460
460 461 setresponse(HTTP_OK, mediatype, bodygen=gen)
461 462 elif isinstance(rsp, wireprototypes.pushres):
462 463 rsp = '%d\n%s' % (rsp.res, rsp.output)
463 464 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
464 465 elif isinstance(rsp, wireprototypes.pusherr):
465 466 rsp = '0\n%s\n' % rsp.res
466 467 res.drain = True
467 468 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
468 469 elif isinstance(rsp, wireprototypes.ooberror):
469 470 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
470 471 else:
471 472 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
472 473
473 474 def _sshv1respondbytes(fout, value):
474 475 """Send a bytes response for protocol version 1."""
475 476 fout.write('%d\n' % len(value))
476 477 fout.write(value)
477 478 fout.flush()
478 479
479 480 def _sshv1respondstream(fout, source):
480 481 write = fout.write
481 482 for chunk in source.gen:
482 483 write(chunk)
483 484 fout.flush()
484 485
485 486 def _sshv1respondooberror(fout, ferr, rsp):
486 487 ferr.write(b'%s\n-\n' % rsp)
487 488 ferr.flush()
488 489 fout.write(b'\n')
489 490 fout.flush()
490 491
491 492 @zi.implementer(wireprototypes.baseprotocolhandler)
492 493 class sshv1protocolhandler(object):
493 494 """Handler for requests services via version 1 of SSH protocol."""
494 495 def __init__(self, ui, fin, fout):
495 496 self._ui = ui
496 497 self._fin = fin
497 498 self._fout = fout
498 499 self._protocaps = set()
499 500
500 501 @property
501 502 def name(self):
502 503 return wireprototypes.SSHV1
503 504
504 505 def getargs(self, args):
505 506 data = {}
506 507 keys = args.split()
507 508 for n in xrange(len(keys)):
508 509 argline = self._fin.readline()[:-1]
509 510 arg, l = argline.split()
510 511 if arg not in keys:
511 512 raise error.Abort(_("unexpected parameter %r") % arg)
512 513 if arg == '*':
513 514 star = {}
514 515 for k in xrange(int(l)):
515 516 argline = self._fin.readline()[:-1]
516 517 arg, l = argline.split()
517 518 val = self._fin.read(int(l))
518 519 star[arg] = val
519 520 data['*'] = star
520 521 else:
521 522 val = self._fin.read(int(l))
522 523 data[arg] = val
523 524 return [data[k] for k in keys]
524 525
525 526 def getprotocaps(self):
526 527 return self._protocaps
527 528
528 529 def getpayload(self):
529 530 # We initially send an empty response. This tells the client it is
530 531 # OK to start sending data. If a client sees any other response, it
531 532 # interprets it as an error.
532 533 _sshv1respondbytes(self._fout, b'')
533 534
534 535 # The file is in the form:
535 536 #
536 537 # <chunk size>\n<chunk>
537 538 # ...
538 539 # 0\n
539 540 count = int(self._fin.readline())
540 541 while count:
541 542 yield self._fin.read(count)
542 543 count = int(self._fin.readline())
543 544
544 545 @contextlib.contextmanager
545 546 def mayberedirectstdio(self):
546 547 yield None
547 548
548 549 def client(self):
549 550 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
550 551 return 'remote:ssh:' + client
551 552
552 553 def addcapabilities(self, repo, caps):
553 554 if self.name == wireprototypes.SSHV1:
554 555 caps.append(b'protocaps')
555 556 caps.append(b'batch')
556 557 return caps
557 558
558 559 def checkperm(self, perm):
559 560 pass
560 561
561 562 class sshv2protocolhandler(sshv1protocolhandler):
562 563 """Protocol handler for version 2 of the SSH protocol."""
563 564
564 565 @property
565 566 def name(self):
566 567 return wireprototypes.SSHV2
567 568
568 569 def addcapabilities(self, repo, caps):
569 570 return caps
570 571
571 572 def _runsshserver(ui, repo, fin, fout, ev):
572 573 # This function operates like a state machine of sorts. The following
573 574 # states are defined:
574 575 #
575 576 # protov1-serving
576 577 # Server is in protocol version 1 serving mode. Commands arrive on
577 578 # new lines. These commands are processed in this state, one command
578 579 # after the other.
579 580 #
580 581 # protov2-serving
581 582 # Server is in protocol version 2 serving mode.
582 583 #
583 584 # upgrade-initial
584 585 # The server is going to process an upgrade request.
585 586 #
586 587 # upgrade-v2-filter-legacy-handshake
587 588 # The protocol is being upgraded to version 2. The server is expecting
588 589 # the legacy handshake from version 1.
589 590 #
590 591 # upgrade-v2-finish
591 592 # The upgrade to version 2 of the protocol is imminent.
592 593 #
593 594 # shutdown
594 595 # The server is shutting down, possibly in reaction to a client event.
595 596 #
596 597 # And here are their transitions:
597 598 #
598 599 # protov1-serving -> shutdown
599 600 # When server receives an empty request or encounters another
600 601 # error.
601 602 #
602 603 # protov1-serving -> upgrade-initial
603 604 # An upgrade request line was seen.
604 605 #
605 606 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
606 607 # Upgrade to version 2 in progress. Server is expecting to
607 608 # process a legacy handshake.
608 609 #
609 610 # upgrade-v2-filter-legacy-handshake -> shutdown
610 611 # Client did not fulfill upgrade handshake requirements.
611 612 #
612 613 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
613 614 # Client fulfilled version 2 upgrade requirements. Finishing that
614 615 # upgrade.
615 616 #
616 617 # upgrade-v2-finish -> protov2-serving
617 618 # Protocol upgrade to version 2 complete. Server can now speak protocol
618 619 # version 2.
619 620 #
620 621 # protov2-serving -> protov1-serving
621 622 # Ths happens by default since protocol version 2 is the same as
622 623 # version 1 except for the handshake.
623 624
624 625 state = 'protov1-serving'
625 626 proto = sshv1protocolhandler(ui, fin, fout)
626 627 protoswitched = False
627 628
628 629 while not ev.is_set():
629 630 if state == 'protov1-serving':
630 631 # Commands are issued on new lines.
631 632 request = fin.readline()[:-1]
632 633
633 634 # Empty lines signal to terminate the connection.
634 635 if not request:
635 636 state = 'shutdown'
636 637 continue
637 638
638 639 # It looks like a protocol upgrade request. Transition state to
639 640 # handle it.
640 641 if request.startswith(b'upgrade '):
641 642 if protoswitched:
642 643 _sshv1respondooberror(fout, ui.ferr,
643 644 b'cannot upgrade protocols multiple '
644 645 b'times')
645 646 state = 'shutdown'
646 647 continue
647 648
648 649 state = 'upgrade-initial'
649 650 continue
650 651
651 652 available = wireproto.commands.commandavailable(request, proto)
652 653
653 654 # This command isn't available. Send an empty response and go
654 655 # back to waiting for a new command.
655 656 if not available:
656 657 _sshv1respondbytes(fout, b'')
657 658 continue
658 659
659 660 rsp = wireproto.dispatch(repo, proto, request)
660 661
661 662 if isinstance(rsp, bytes):
662 663 _sshv1respondbytes(fout, rsp)
663 664 elif isinstance(rsp, wireprototypes.bytesresponse):
664 665 _sshv1respondbytes(fout, rsp.data)
665 666 elif isinstance(rsp, wireprototypes.streamres):
666 667 _sshv1respondstream(fout, rsp)
667 668 elif isinstance(rsp, wireprototypes.streamreslegacy):
668 669 _sshv1respondstream(fout, rsp)
669 670 elif isinstance(rsp, wireprototypes.pushres):
670 671 _sshv1respondbytes(fout, b'')
671 672 _sshv1respondbytes(fout, b'%d' % rsp.res)
672 673 elif isinstance(rsp, wireprototypes.pusherr):
673 674 _sshv1respondbytes(fout, rsp.res)
674 675 elif isinstance(rsp, wireprototypes.ooberror):
675 676 _sshv1respondooberror(fout, ui.ferr, rsp.message)
676 677 else:
677 678 raise error.ProgrammingError('unhandled response type from '
678 679 'wire protocol command: %s' % rsp)
679 680
680 681 # For now, protocol version 2 serving just goes back to version 1.
681 682 elif state == 'protov2-serving':
682 683 state = 'protov1-serving'
683 684 continue
684 685
685 686 elif state == 'upgrade-initial':
686 687 # We should never transition into this state if we've switched
687 688 # protocols.
688 689 assert not protoswitched
689 690 assert proto.name == wireprototypes.SSHV1
690 691
691 692 # Expected: upgrade <token> <capabilities>
692 693 # If we get something else, the request is malformed. It could be
693 694 # from a future client that has altered the upgrade line content.
694 695 # We treat this as an unknown command.
695 696 try:
696 697 token, caps = request.split(b' ')[1:]
697 698 except ValueError:
698 699 _sshv1respondbytes(fout, b'')
699 700 state = 'protov1-serving'
700 701 continue
701 702
702 703 # Send empty response if we don't support upgrading protocols.
703 704 if not ui.configbool('experimental', 'sshserver.support-v2'):
704 705 _sshv1respondbytes(fout, b'')
705 706 state = 'protov1-serving'
706 707 continue
707 708
708 709 try:
709 710 caps = urlreq.parseqs(caps)
710 711 except ValueError:
711 712 _sshv1respondbytes(fout, b'')
712 713 state = 'protov1-serving'
713 714 continue
714 715
715 716 # We don't see an upgrade request to protocol version 2. Ignore
716 717 # the upgrade request.
717 718 wantedprotos = caps.get(b'proto', [b''])[0]
718 719 if SSHV2 not in wantedprotos:
719 720 _sshv1respondbytes(fout, b'')
720 721 state = 'protov1-serving'
721 722 continue
722 723
723 724 # It looks like we can honor this upgrade request to protocol 2.
724 725 # Filter the rest of the handshake protocol request lines.
725 726 state = 'upgrade-v2-filter-legacy-handshake'
726 727 continue
727 728
728 729 elif state == 'upgrade-v2-filter-legacy-handshake':
729 730 # Client should have sent legacy handshake after an ``upgrade``
730 731 # request. Expected lines:
731 732 #
732 733 # hello
733 734 # between
734 735 # pairs 81
735 736 # 0000...-0000...
736 737
737 738 ok = True
738 739 for line in (b'hello', b'between', b'pairs 81'):
739 740 request = fin.readline()[:-1]
740 741
741 742 if request != line:
742 743 _sshv1respondooberror(fout, ui.ferr,
743 744 b'malformed handshake protocol: '
744 745 b'missing %s' % line)
745 746 ok = False
746 747 state = 'shutdown'
747 748 break
748 749
749 750 if not ok:
750 751 continue
751 752
752 753 request = fin.read(81)
753 754 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
754 755 _sshv1respondooberror(fout, ui.ferr,
755 756 b'malformed handshake protocol: '
756 757 b'missing between argument value')
757 758 state = 'shutdown'
758 759 continue
759 760
760 761 state = 'upgrade-v2-finish'
761 762 continue
762 763
763 764 elif state == 'upgrade-v2-finish':
764 765 # Send the upgrade response.
765 766 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
766 767 servercaps = wireproto.capabilities(repo, proto)
767 768 rsp = b'capabilities: %s' % servercaps.data
768 769 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
769 770 fout.flush()
770 771
771 772 proto = sshv2protocolhandler(ui, fin, fout)
772 773 protoswitched = True
773 774
774 775 state = 'protov2-serving'
775 776 continue
776 777
777 778 elif state == 'shutdown':
778 779 break
779 780
780 781 else:
781 782 raise error.ProgrammingError('unhandled ssh server state: %s' %
782 783 state)
783 784
784 785 class sshserver(object):
785 786 def __init__(self, ui, repo, logfh=None):
786 787 self._ui = ui
787 788 self._repo = repo
788 789 self._fin = ui.fin
789 790 self._fout = ui.fout
790 791
791 792 # Log write I/O to stdout and stderr if configured.
792 793 if logfh:
793 794 self._fout = util.makeloggingfileobject(
794 795 logfh, self._fout, 'o', logdata=True)
795 796 ui.ferr = util.makeloggingfileobject(
796 797 logfh, ui.ferr, 'e', logdata=True)
797 798
798 799 hook.redirect(True)
799 800 ui.fout = repo.ui.fout = ui.ferr
800 801
801 802 # Prevent insertion/deletion of CRs
802 803 procutil.setbinary(self._fin)
803 804 procutil.setbinary(self._fout)
804 805
805 806 def serve_forever(self):
806 807 self.serveuntil(threading.Event())
807 808 sys.exit(0)
808 809
809 810 def serveuntil(self, ev):
810 811 """Serve until a threading.Event is set."""
811 812 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
@@ -1,321 +1,375
1 1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
2 2 #
3 3 # This software may be used and distributed according to the terms of the
4 4 # GNU General Public License version 2 or any later version.
5 5
6 6 from __future__ import absolute_import
7 7
8 8 from .node import (
9 9 bin,
10 10 hex,
11 11 )
12 12 from .thirdparty.zope import (
13 13 interface as zi,
14 14 )
15 from .i18n import _
16 from . import (
17 error,
18 util,
19 )
15 20
16 21 # Names of the SSH protocol implementations.
17 22 SSHV1 = 'ssh-v1'
18 23 # These are advertised over the wire. Increment the counters at the end
19 24 # to reflect BC breakages.
20 25 SSHV2 = 'exp-ssh-v2-0001'
21 26 HTTP_WIREPROTO_V2 = 'exp-http-v2-0001'
22 27
23 28 # All available wire protocol transports.
24 29 TRANSPORTS = {
25 30 SSHV1: {
26 31 'transport': 'ssh',
27 32 'version': 1,
28 33 },
29 34 SSHV2: {
30 35 'transport': 'ssh',
31 36 # TODO mark as version 2 once all commands are implemented.
32 37 'version': 1,
33 38 },
34 39 'http-v1': {
35 40 'transport': 'http',
36 41 'version': 1,
37 42 },
38 43 HTTP_WIREPROTO_V2: {
39 44 'transport': 'http',
40 45 'version': 2,
41 46 }
42 47 }
43 48
44 49 class bytesresponse(object):
45 50 """A wire protocol response consisting of raw bytes."""
46 51 def __init__(self, data):
47 52 self.data = data
48 53
49 54 class ooberror(object):
50 55 """wireproto reply: failure of a batch of operation
51 56
52 57 Something failed during a batch call. The error message is stored in
53 58 `self.message`.
54 59 """
55 60 def __init__(self, message):
56 61 self.message = message
57 62
58 63 class pushres(object):
59 64 """wireproto reply: success with simple integer return
60 65
61 66 The call was successful and returned an integer contained in `self.res`.
62 67 """
63 68 def __init__(self, res, output):
64 69 self.res = res
65 70 self.output = output
66 71
67 72 class pusherr(object):
68 73 """wireproto reply: failure
69 74
70 75 The call failed. The `self.res` attribute contains the error message.
71 76 """
72 77 def __init__(self, res, output):
73 78 self.res = res
74 79 self.output = output
75 80
76 81 class streamres(object):
77 82 """wireproto reply: binary stream
78 83
79 84 The call was successful and the result is a stream.
80 85
81 86 Accepts a generator containing chunks of data to be sent to the client.
82 87
83 88 ``prefer_uncompressed`` indicates that the data is expected to be
84 89 uncompressable and that the stream should therefore use the ``none``
85 90 engine.
86 91 """
87 92 def __init__(self, gen=None, prefer_uncompressed=False):
88 93 self.gen = gen
89 94 self.prefer_uncompressed = prefer_uncompressed
90 95
91 96 class streamreslegacy(object):
92 97 """wireproto reply: uncompressed binary stream
93 98
94 99 The call was successful and the result is a stream.
95 100
96 101 Accepts a generator containing chunks of data to be sent to the client.
97 102
98 103 Like ``streamres``, but sends an uncompressed data for "version 1" clients
99 104 using the application/mercurial-0.1 media type.
100 105 """
101 106 def __init__(self, gen=None):
102 107 self.gen = gen
103 108
104 109 class cborresponse(object):
105 110 """Encode the response value as CBOR."""
106 111 def __init__(self, v):
107 112 self.value = v
108 113
109 114 class v2errorresponse(object):
110 115 """Represents a command error for version 2 transports."""
111 116 def __init__(self, message, args=None):
112 117 self.message = message
113 118 self.args = args
114 119
115 120 class v2streamingresponse(object):
116 121 """A response whose data is supplied by a generator.
117 122
118 123 The generator can either consist of data structures to CBOR
119 124 encode or a stream of already-encoded bytes.
120 125 """
121 126 def __init__(self, gen, compressible=True):
122 127 self.gen = gen
123 128 self.compressible = compressible
124 129
125 130 # list of nodes encoding / decoding
126 131 def decodelist(l, sep=' '):
127 132 if l:
128 133 return [bin(v) for v in l.split(sep)]
129 134 return []
130 135
131 136 def encodelist(l, sep=' '):
132 137 try:
133 138 return sep.join(map(hex, l))
134 139 except TypeError:
135 140 raise
136 141
137 142 # batched call argument encoding
138 143
139 144 def escapebatcharg(plain):
140 145 return (plain
141 146 .replace(':', ':c')
142 147 .replace(',', ':o')
143 148 .replace(';', ':s')
144 149 .replace('=', ':e'))
145 150
146 151 def unescapebatcharg(escaped):
147 152 return (escaped
148 153 .replace(':e', '=')
149 154 .replace(':s', ';')
150 155 .replace(':o', ',')
151 156 .replace(':c', ':'))
152 157
153 158 # mapping of options accepted by getbundle and their types
154 159 #
155 160 # Meant to be extended by extensions. It is extensions responsibility to ensure
156 161 # such options are properly processed in exchange.getbundle.
157 162 #
158 163 # supported types are:
159 164 #
160 165 # :nodes: list of binary nodes
161 166 # :csv: list of comma-separated values
162 167 # :scsv: list of comma-separated values return as set
163 168 # :plain: string with no transformation needed.
164 169 GETBUNDLE_ARGUMENTS = {
165 170 'heads': 'nodes',
166 171 'bookmarks': 'boolean',
167 172 'common': 'nodes',
168 173 'obsmarkers': 'boolean',
169 174 'phases': 'boolean',
170 175 'bundlecaps': 'scsv',
171 176 'listkeys': 'csv',
172 177 'cg': 'boolean',
173 178 'cbattempted': 'boolean',
174 179 'stream': 'boolean',
175 180 }
176 181
177 182 class baseprotocolhandler(zi.Interface):
178 183 """Abstract base class for wire protocol handlers.
179 184
180 185 A wire protocol handler serves as an interface between protocol command
181 186 handlers and the wire protocol transport layer. Protocol handlers provide
182 187 methods to read command arguments, redirect stdio for the duration of
183 188 the request, handle response types, etc.
184 189 """
185 190
186 191 name = zi.Attribute(
187 192 """The name of the protocol implementation.
188 193
189 194 Used for uniquely identifying the transport type.
190 195 """)
191 196
192 197 def getargs(args):
193 198 """return the value for arguments in <args>
194 199
195 200 For version 1 transports, returns a list of values in the same
196 201 order they appear in ``args``. For version 2 transports, returns
197 202 a dict mapping argument name to value.
198 203 """
199 204
200 205 def getprotocaps():
201 206 """Returns the list of protocol-level capabilities of client
202 207
203 208 Returns a list of capabilities as declared by the client for
204 209 the current request (or connection for stateful protocol handlers)."""
205 210
206 211 def getpayload():
207 212 """Provide a generator for the raw payload.
208 213
209 214 The caller is responsible for ensuring that the full payload is
210 215 processed.
211 216 """
212 217
213 218 def mayberedirectstdio():
214 219 """Context manager to possibly redirect stdio.
215 220
216 221 The context manager yields a file-object like object that receives
217 222 stdout and stderr output when the context manager is active. Or it
218 223 yields ``None`` if no I/O redirection occurs.
219 224
220 225 The intent of this context manager is to capture stdio output
221 226 so it may be sent in the response. Some transports support streaming
222 227 stdio to the client in real time. For these transports, stdio output
223 228 won't be captured.
224 229 """
225 230
226 231 def client():
227 232 """Returns a string representation of this client (as bytes)."""
228 233
229 234 def addcapabilities(repo, caps):
230 235 """Adds advertised capabilities specific to this protocol.
231 236
232 237 Receives the list of capabilities collected so far.
233 238
234 239 Returns a list of capabilities. The passed in argument can be returned.
235 240 """
236 241
237 242 def checkperm(perm):
238 243 """Validate that the client has permissions to perform a request.
239 244
240 245 The argument is the permission required to proceed. If the client
241 246 doesn't have that permission, the exception should raise or abort
242 247 in a protocol specific manner.
243 248 """
244 249
245 250 class commandentry(object):
246 251 """Represents a declared wire protocol command."""
247 252 def __init__(self, func, args='', transports=None,
248 253 permission='push'):
249 254 self.func = func
250 255 self.args = args
251 256 self.transports = transports or set()
252 257 self.permission = permission
253 258
254 259 def _merge(self, func, args):
255 260 """Merge this instance with an incoming 2-tuple.
256 261
257 262 This is called when a caller using the old 2-tuple API attempts
258 263 to replace an instance. The incoming values are merged with
259 264 data not captured by the 2-tuple and a new instance containing
260 265 the union of the two objects is returned.
261 266 """
262 267 return commandentry(func, args=args, transports=set(self.transports),
263 268 permission=self.permission)
264 269
265 270 # Old code treats instances as 2-tuples. So expose that interface.
266 271 def __iter__(self):
267 272 yield self.func
268 273 yield self.args
269 274
270 275 def __getitem__(self, i):
271 276 if i == 0:
272 277 return self.func
273 278 elif i == 1:
274 279 return self.args
275 280 else:
276 281 raise IndexError('can only access elements 0 and 1')
277 282
278 283 class commanddict(dict):
279 284 """Container for registered wire protocol commands.
280 285
281 286 It behaves like a dict. But __setitem__ is overwritten to allow silent
282 287 coercion of values from 2-tuples for API compatibility.
283 288 """
284 289 def __setitem__(self, k, v):
285 290 if isinstance(v, commandentry):
286 291 pass
287 292 # Cast 2-tuples to commandentry instances.
288 293 elif isinstance(v, tuple):
289 294 if len(v) != 2:
290 295 raise ValueError('command tuples must have exactly 2 elements')
291 296
292 297 # It is common for extensions to wrap wire protocol commands via
293 298 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
294 299 # doing this aren't aware of the new API that uses objects to store
295 300 # command entries, we automatically merge old state with new.
296 301 if k in self:
297 302 v = self[k]._merge(v[0], v[1])
298 303 else:
299 304 # Use default values from @wireprotocommand.
300 305 v = commandentry(v[0], args=v[1],
301 306 transports=set(TRANSPORTS),
302 307 permission='push')
303 308 else:
304 309 raise ValueError('command entries must be commandentry instances '
305 310 'or 2-tuples')
306 311
307 312 return super(commanddict, self).__setitem__(k, v)
308 313
309 314 def commandavailable(self, command, proto):
310 315 """Determine if a command is available for the requested protocol."""
311 316 assert proto.name in TRANSPORTS
312 317
313 318 entry = self.get(command)
314 319
315 320 if not entry:
316 321 return False
317 322
318 323 if proto.name not in entry.transports:
319 324 return False
320 325
321 326 return True
327
328 def supportedcompengines(ui, role):
329 """Obtain the list of supported compression engines for a request."""
330 assert role in (util.CLIENTROLE, util.SERVERROLE)
331
332 compengines = util.compengines.supportedwireengines(role)
333
334 # Allow config to override default list and ordering.
335 if role == util.SERVERROLE:
336 configengines = ui.configlist('server', 'compressionengines')
337 config = 'server.compressionengines'
338 else:
339 # This is currently implemented mainly to facilitate testing. In most
340 # cases, the server should be in charge of choosing a compression engine
341 # because a server has the most to lose from a sub-optimal choice. (e.g.
342 # CPU DoS due to an expensive engine or a network DoS due to poor
343 # compression ratio).
344 configengines = ui.configlist('experimental',
345 'clientcompressionengines')
346 config = 'experimental.clientcompressionengines'
347
348 # No explicit config. Filter out the ones that aren't supposed to be
349 # advertised and return default ordering.
350 if not configengines:
351 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
352 return [e for e in compengines
353 if getattr(e.wireprotosupport(), attr) > 0]
354
355 # If compression engines are listed in the config, assume there is a good
356 # reason for it (like server operators wanting to achieve specific
357 # performance characteristics). So fail fast if the config references
358 # unusable compression engines.
359 validnames = set(e.name() for e in compengines)
360 invalidnames = set(e for e in configengines if e not in validnames)
361 if invalidnames:
362 raise error.Abort(_('invalid compression engine defined in %s: %s') %
363 (config, ', '.join(sorted(invalidnames))))
364
365 compengines = [e for e in compengines if e.name() in configengines]
366 compengines = sorted(compengines,
367 key=lambda e: configengines.index(e.name()))
368
369 if not compengines:
370 raise error.Abort(_('%s config option does not specify any known '
371 'compression engines') % config,
372 hint=_('usable compression engines: %s') %
373 ', '.sorted(validnames))
374
375 return compengines
@@ -1,533 +1,533
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10
11 11 from .i18n import _
12 12 from .thirdparty import (
13 13 cbor,
14 14 )
15 15 from .thirdparty.zope import (
16 16 interface as zi,
17 17 )
18 18 from . import (
19 19 encoding,
20 20 error,
21 21 pycompat,
22 22 streamclone,
23 23 util,
24 24 wireproto,
25 25 wireprotoframing,
26 26 wireprototypes,
27 27 )
28 28
29 29 FRAMINGTYPE = b'application/mercurial-exp-framing-0005'
30 30
31 31 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
32 32
33 33 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
34 34 from .hgweb import common as hgwebcommon
35 35
36 36 # URL space looks like: <permissions>/<command>, where <permission> can
37 37 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
38 38
39 39 # Root URL does nothing meaningful... yet.
40 40 if not urlparts:
41 41 res.status = b'200 OK'
42 42 res.headers[b'Content-Type'] = b'text/plain'
43 43 res.setbodybytes(_('HTTP version 2 API handler'))
44 44 return
45 45
46 46 if len(urlparts) == 1:
47 47 res.status = b'404 Not Found'
48 48 res.headers[b'Content-Type'] = b'text/plain'
49 49 res.setbodybytes(_('do not know how to process %s\n') %
50 50 req.dispatchpath)
51 51 return
52 52
53 53 permission, command = urlparts[0:2]
54 54
55 55 if permission not in (b'ro', b'rw'):
56 56 res.status = b'404 Not Found'
57 57 res.headers[b'Content-Type'] = b'text/plain'
58 58 res.setbodybytes(_('unknown permission: %s') % permission)
59 59 return
60 60
61 61 if req.method != 'POST':
62 62 res.status = b'405 Method Not Allowed'
63 63 res.headers[b'Allow'] = b'POST'
64 64 res.setbodybytes(_('commands require POST requests'))
65 65 return
66 66
67 67 # At some point we'll want to use our own API instead of recycling the
68 68 # behavior of version 1 of the wire protocol...
69 69 # TODO return reasonable responses - not responses that overload the
70 70 # HTTP status line message for error reporting.
71 71 try:
72 72 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
73 73 except hgwebcommon.ErrorResponse as e:
74 74 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
75 75 for k, v in e.headers:
76 76 res.headers[k] = v
77 77 res.setbodybytes('permission denied')
78 78 return
79 79
80 80 # We have a special endpoint to reflect the request back at the client.
81 81 if command == b'debugreflect':
82 82 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
83 83 return
84 84
85 85 # Extra commands that we handle that aren't really wire protocol
86 86 # commands. Think extra hard before making this hackery available to
87 87 # extension.
88 88 extracommands = {'multirequest'}
89 89
90 90 if command not in wireproto.commandsv2 and command not in extracommands:
91 91 res.status = b'404 Not Found'
92 92 res.headers[b'Content-Type'] = b'text/plain'
93 93 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
94 94 return
95 95
96 96 repo = rctx.repo
97 97 ui = repo.ui
98 98
99 99 proto = httpv2protocolhandler(req, ui)
100 100
101 101 if (not wireproto.commandsv2.commandavailable(command, proto)
102 102 and command not in extracommands):
103 103 res.status = b'404 Not Found'
104 104 res.headers[b'Content-Type'] = b'text/plain'
105 105 res.setbodybytes(_('invalid wire protocol command: %s') % command)
106 106 return
107 107
108 108 # TODO consider cases where proxies may add additional Accept headers.
109 109 if req.headers.get(b'Accept') != FRAMINGTYPE:
110 110 res.status = b'406 Not Acceptable'
111 111 res.headers[b'Content-Type'] = b'text/plain'
112 112 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
113 113 % FRAMINGTYPE)
114 114 return
115 115
116 116 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
117 117 res.status = b'415 Unsupported Media Type'
118 118 # TODO we should send a response with appropriate media type,
119 119 # since client does Accept it.
120 120 res.headers[b'Content-Type'] = b'text/plain'
121 121 res.setbodybytes(_('client MUST send Content-Type header with '
122 122 'value: %s\n') % FRAMINGTYPE)
123 123 return
124 124
125 125 _processhttpv2request(ui, repo, req, res, permission, command, proto)
126 126
127 127 def _processhttpv2reflectrequest(ui, repo, req, res):
128 128 """Reads unified frame protocol request and dumps out state to client.
129 129
130 130 This special endpoint can be used to help debug the wire protocol.
131 131
132 132 Instead of routing the request through the normal dispatch mechanism,
133 133 we instead read all frames, decode them, and feed them into our state
134 134 tracker. We then dump the log of all that activity back out to the
135 135 client.
136 136 """
137 137 import json
138 138
139 139 # Reflection APIs have a history of being abused, accidentally disclosing
140 140 # sensitive data, etc. So we have a config knob.
141 141 if not ui.configbool('experimental', 'web.api.debugreflect'):
142 142 res.status = b'404 Not Found'
143 143 res.headers[b'Content-Type'] = b'text/plain'
144 144 res.setbodybytes(_('debugreflect service not available'))
145 145 return
146 146
147 147 # We assume we have a unified framing protocol request body.
148 148
149 149 reactor = wireprotoframing.serverreactor()
150 150 states = []
151 151
152 152 while True:
153 153 frame = wireprotoframing.readframe(req.bodyfh)
154 154
155 155 if not frame:
156 156 states.append(b'received: <no frame>')
157 157 break
158 158
159 159 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
160 160 frame.requestid,
161 161 frame.payload))
162 162
163 163 action, meta = reactor.onframerecv(frame)
164 164 states.append(json.dumps((action, meta), sort_keys=True,
165 165 separators=(', ', ': ')))
166 166
167 167 action, meta = reactor.oninputeof()
168 168 meta['action'] = action
169 169 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
170 170
171 171 res.status = b'200 OK'
172 172 res.headers[b'Content-Type'] = b'text/plain'
173 173 res.setbodybytes(b'\n'.join(states))
174 174
175 175 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
176 176 """Post-validation handler for HTTPv2 requests.
177 177
178 178 Called when the HTTP request contains unified frame-based protocol
179 179 frames for evaluation.
180 180 """
181 181 # TODO Some HTTP clients are full duplex and can receive data before
182 182 # the entire request is transmitted. Figure out a way to indicate support
183 183 # for that so we can opt into full duplex mode.
184 184 reactor = wireprotoframing.serverreactor(deferoutput=True)
185 185 seencommand = False
186 186
187 187 outstream = reactor.makeoutputstream()
188 188
189 189 while True:
190 190 frame = wireprotoframing.readframe(req.bodyfh)
191 191 if not frame:
192 192 break
193 193
194 194 action, meta = reactor.onframerecv(frame)
195 195
196 196 if action == 'wantframe':
197 197 # Need more data before we can do anything.
198 198 continue
199 199 elif action == 'runcommand':
200 200 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
201 201 reqcommand, reactor, outstream,
202 202 meta, issubsequent=seencommand)
203 203
204 204 if sentoutput:
205 205 return
206 206
207 207 seencommand = True
208 208
209 209 elif action == 'error':
210 210 # TODO define proper error mechanism.
211 211 res.status = b'200 OK'
212 212 res.headers[b'Content-Type'] = b'text/plain'
213 213 res.setbodybytes(meta['message'] + b'\n')
214 214 return
215 215 else:
216 216 raise error.ProgrammingError(
217 217 'unhandled action from frame processor: %s' % action)
218 218
219 219 action, meta = reactor.oninputeof()
220 220 if action == 'sendframes':
221 221 # We assume we haven't started sending the response yet. If we're
222 222 # wrong, the response type will raise an exception.
223 223 res.status = b'200 OK'
224 224 res.headers[b'Content-Type'] = FRAMINGTYPE
225 225 res.setbodygen(meta['framegen'])
226 226 elif action == 'noop':
227 227 pass
228 228 else:
229 229 raise error.ProgrammingError('unhandled action from frame processor: %s'
230 230 % action)
231 231
232 232 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
233 233 outstream, command, issubsequent):
234 234 """Dispatch a wire protocol command made from HTTPv2 requests.
235 235
236 236 The authenticated permission (``authedperm``) along with the original
237 237 command from the URL (``reqcommand``) are passed in.
238 238 """
239 239 # We already validated that the session has permissions to perform the
240 240 # actions in ``authedperm``. In the unified frame protocol, the canonical
241 241 # command to run is expressed in a frame. However, the URL also requested
242 242 # to run a specific command. We need to be careful that the command we
243 243 # run doesn't have permissions requirements greater than what was granted
244 244 # by ``authedperm``.
245 245 #
246 246 # Our rule for this is we only allow one command per HTTP request and
247 247 # that command must match the command in the URL. However, we make
248 248 # an exception for the ``multirequest`` URL. This URL is allowed to
249 249 # execute multiple commands. We double check permissions of each command
250 250 # as it is invoked to ensure there is no privilege escalation.
251 251 # TODO consider allowing multiple commands to regular command URLs
252 252 # iff each command is the same.
253 253
254 254 proto = httpv2protocolhandler(req, ui, args=command['args'])
255 255
256 256 if reqcommand == b'multirequest':
257 257 if not wireproto.commandsv2.commandavailable(command['command'], proto):
258 258 # TODO proper error mechanism
259 259 res.status = b'200 OK'
260 260 res.headers[b'Content-Type'] = b'text/plain'
261 261 res.setbodybytes(_('wire protocol command not available: %s') %
262 262 command['command'])
263 263 return True
264 264
265 265 # TODO don't use assert here, since it may be elided by -O.
266 266 assert authedperm in (b'ro', b'rw')
267 267 wirecommand = wireproto.commandsv2[command['command']]
268 268 assert wirecommand.permission in ('push', 'pull')
269 269
270 270 if authedperm == b'ro' and wirecommand.permission != 'pull':
271 271 # TODO proper error mechanism
272 272 res.status = b'403 Forbidden'
273 273 res.headers[b'Content-Type'] = b'text/plain'
274 274 res.setbodybytes(_('insufficient permissions to execute '
275 275 'command: %s') % command['command'])
276 276 return True
277 277
278 278 # TODO should we also call checkperm() here? Maybe not if we're going
279 279 # to overhaul that API. The granted scope from the URL check should
280 280 # be good enough.
281 281
282 282 else:
283 283 # Don't allow multiple commands outside of ``multirequest`` URL.
284 284 if issubsequent:
285 285 # TODO proper error mechanism
286 286 res.status = b'200 OK'
287 287 res.headers[b'Content-Type'] = b'text/plain'
288 288 res.setbodybytes(_('multiple commands cannot be issued to this '
289 289 'URL'))
290 290 return True
291 291
292 292 if reqcommand != command['command']:
293 293 # TODO define proper error mechanism
294 294 res.status = b'200 OK'
295 295 res.headers[b'Content-Type'] = b'text/plain'
296 296 res.setbodybytes(_('command in frame must match command in URL'))
297 297 return True
298 298
299 299 rsp = dispatch(repo, proto, command['command'])
300 300
301 301 res.status = b'200 OK'
302 302 res.headers[b'Content-Type'] = FRAMINGTYPE
303 303
304 304 if isinstance(rsp, wireprototypes.cborresponse):
305 305 encoded = cbor.dumps(rsp.value, canonical=True)
306 306 action, meta = reactor.oncommandresponseready(outstream,
307 307 command['requestid'],
308 308 encoded)
309 309 elif isinstance(rsp, wireprototypes.v2streamingresponse):
310 310 action, meta = reactor.oncommandresponsereadygen(outstream,
311 311 command['requestid'],
312 312 rsp.gen)
313 313 elif isinstance(rsp, wireprototypes.v2errorresponse):
314 314 action, meta = reactor.oncommanderror(outstream,
315 315 command['requestid'],
316 316 rsp.message,
317 317 rsp.args)
318 318 else:
319 319 action, meta = reactor.onservererror(
320 320 _('unhandled response type from wire proto command'))
321 321
322 322 if action == 'sendframes':
323 323 res.setbodygen(meta['framegen'])
324 324 return True
325 325 elif action == 'noop':
326 326 return False
327 327 else:
328 328 raise error.ProgrammingError('unhandled event from reactor: %s' %
329 329 action)
330 330
331 331 def getdispatchrepo(repo, proto, command):
332 332 return repo.filtered('served')
333 333
334 334 def dispatch(repo, proto, command):
335 335 repo = getdispatchrepo(repo, proto, command)
336 336
337 337 func, spec = wireproto.commandsv2[command]
338 338 args = proto.getargs(spec)
339 339
340 340 return func(repo, proto, **args)
341 341
342 342 @zi.implementer(wireprototypes.baseprotocolhandler)
343 343 class httpv2protocolhandler(object):
344 344 def __init__(self, req, ui, args=None):
345 345 self._req = req
346 346 self._ui = ui
347 347 self._args = args
348 348
349 349 @property
350 350 def name(self):
351 351 return HTTP_WIREPROTO_V2
352 352
353 353 def getargs(self, args):
354 354 data = {}
355 355 for k, typ in args.items():
356 356 if k == '*':
357 357 raise NotImplementedError('do not support * args')
358 358 elif k in self._args:
359 359 # TODO consider validating value types.
360 360 data[k] = self._args[k]
361 361
362 362 return data
363 363
364 364 def getprotocaps(self):
365 365 # Protocol capabilities are currently not implemented for HTTP V2.
366 366 return set()
367 367
368 368 def getpayload(self):
369 369 raise NotImplementedError
370 370
371 371 @contextlib.contextmanager
372 372 def mayberedirectstdio(self):
373 373 raise NotImplementedError
374 374
375 375 def client(self):
376 376 raise NotImplementedError
377 377
378 378 def addcapabilities(self, repo, caps):
379 379 return caps
380 380
381 381 def checkperm(self, perm):
382 382 raise NotImplementedError
383 383
384 384 def httpv2apidescriptor(req, repo):
385 385 proto = httpv2protocolhandler(req, repo.ui)
386 386
387 387 return _capabilitiesv2(repo, proto)
388 388
389 389 def _capabilitiesv2(repo, proto):
390 390 """Obtain the set of capabilities for version 2 transports.
391 391
392 392 These capabilities are distinct from the capabilities for version 1
393 393 transports.
394 394 """
395 395 compression = []
396 for engine in wireproto.supportedcompengines(repo.ui, util.SERVERROLE):
396 for engine in wireprototypes.supportedcompengines(repo.ui, util.SERVERROLE):
397 397 compression.append({
398 398 b'name': engine.wireprotosupport().name,
399 399 })
400 400
401 401 caps = {
402 402 'commands': {},
403 403 'compression': compression,
404 404 'framingmediatypes': [FRAMINGTYPE],
405 405 }
406 406
407 407 for command, entry in wireproto.commandsv2.items():
408 408 caps['commands'][command] = {
409 409 'args': entry.args,
410 410 'permissions': [entry.permission],
411 411 }
412 412
413 413 if streamclone.allowservergeneration(repo):
414 414 caps['rawrepoformats'] = sorted(repo.requirements &
415 415 repo.supportedformats)
416 416
417 417 return proto.addcapabilities(repo, caps)
418 418
419 419 def wireprotocommand(name, args=None, permission='push'):
420 420 """Decorator to declare a wire protocol command.
421 421
422 422 ``name`` is the name of the wire protocol command being provided.
423 423
424 424 ``args`` is a dict of argument names to example values.
425 425
426 426 ``permission`` defines the permission type needed to run this command.
427 427 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
428 428 respectively. Default is to assume command requires ``push`` permissions
429 429 because otherwise commands not declaring their permissions could modify
430 430 a repository that is supposed to be read-only.
431 431 """
432 432 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
433 433 if v['version'] == 2}
434 434
435 435 if permission not in ('push', 'pull'):
436 436 raise error.ProgrammingError('invalid wire protocol permission; '
437 437 'got %s; expected "push" or "pull"' %
438 438 permission)
439 439
440 440 if args is None:
441 441 args = {}
442 442
443 443 if not isinstance(args, dict):
444 444 raise error.ProgrammingError('arguments for version 2 commands '
445 445 'must be declared as dicts')
446 446
447 447 def register(func):
448 448 if name in wireproto.commandsv2:
449 449 raise error.ProgrammingError('%s command already registered '
450 450 'for version 2' % name)
451 451
452 452 wireproto.commandsv2[name] = wireprototypes.commandentry(
453 453 func, args=args, transports=transports, permission=permission)
454 454
455 455 return func
456 456
457 457 return register
458 458
459 459 @wireprotocommand('branchmap', permission='pull')
460 460 def branchmapv2(repo, proto):
461 461 branchmap = {encoding.fromlocal(k): v
462 462 for k, v in repo.branchmap().iteritems()}
463 463
464 464 return wireprototypes.cborresponse(branchmap)
465 465
466 466 @wireprotocommand('capabilities', permission='pull')
467 467 def capabilitiesv2(repo, proto):
468 468 caps = _capabilitiesv2(repo, proto)
469 469
470 470 return wireprototypes.cborresponse(caps)
471 471
472 472 @wireprotocommand('heads',
473 473 args={
474 474 'publiconly': False,
475 475 },
476 476 permission='pull')
477 477 def headsv2(repo, proto, publiconly=False):
478 478 if publiconly:
479 479 repo = repo.filtered('immutable')
480 480
481 481 return wireprototypes.cborresponse(repo.heads())
482 482
483 483 @wireprotocommand('known',
484 484 args={
485 485 'nodes': [b'deadbeef'],
486 486 },
487 487 permission='pull')
488 488 def knownv2(repo, proto, nodes=None):
489 489 nodes = nodes or []
490 490 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
491 491 return wireprototypes.cborresponse(result)
492 492
493 493 @wireprotocommand('listkeys',
494 494 args={
495 495 'namespace': b'ns',
496 496 },
497 497 permission='pull')
498 498 def listkeysv2(repo, proto, namespace=None):
499 499 keys = repo.listkeys(encoding.tolocal(namespace))
500 500 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
501 501 for k, v in keys.iteritems()}
502 502
503 503 return wireprototypes.cborresponse(keys)
504 504
505 505 @wireprotocommand('lookup',
506 506 args={
507 507 'key': b'foo',
508 508 },
509 509 permission='pull')
510 510 def lookupv2(repo, proto, key):
511 511 key = encoding.tolocal(key)
512 512
513 513 # TODO handle exception.
514 514 node = repo.lookup(key)
515 515
516 516 return wireprototypes.cborresponse(node)
517 517
518 518 @wireprotocommand('pushkey',
519 519 args={
520 520 'namespace': b'ns',
521 521 'key': b'key',
522 522 'old': b'old',
523 523 'new': b'new',
524 524 },
525 525 permission='push')
526 526 def pushkeyv2(repo, proto, namespace, key, old, new):
527 527 # TODO handle ui output redirection
528 528 r = repo.pushkey(encoding.tolocal(namespace),
529 529 encoding.tolocal(key),
530 530 encoding.tolocal(old),
531 531 encoding.tolocal(new))
532 532
533 533 return wireprototypes.cborresponse(r)
General Comments 0
You need to be logged in to leave comments. Login now