##// END OF EJS Templates
wireproto: move version 2 commands dict to wireprotov2server...
Gregory Szorc -
r37802:ee0d5e9d default
parent child Browse files
Show More
@@ -1,671 +1,667
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import tempfile
12 12
13 13 from .i18n import _
14 14 from .node import (
15 15 hex,
16 16 nullid,
17 17 )
18 18
19 19 from . import (
20 20 bundle2,
21 21 changegroup as changegroupmod,
22 22 discovery,
23 23 encoding,
24 24 error,
25 25 exchange,
26 26 pushkey as pushkeymod,
27 27 pycompat,
28 28 streamclone,
29 29 util,
30 30 wireprototypes,
31 31 )
32 32
33 33 from .utils import (
34 34 procutil,
35 35 stringutil,
36 36 )
37 37
38 38 urlerr = util.urlerr
39 39 urlreq = util.urlreq
40 40
41 41 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
42 42 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
43 43 'IncompatibleClient')
44 44 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
45 45
46 46 def clientcompressionsupport(proto):
47 47 """Returns a list of compression methods supported by the client.
48 48
49 49 Returns a list of the compression methods supported by the client
50 50 according to the protocol capabilities. If no such capability has
51 51 been announced, fallback to the default of zlib and uncompressed.
52 52 """
53 53 for cap in proto.getprotocaps():
54 54 if cap.startswith('comp='):
55 55 return cap[5:].split(',')
56 56 return ['zlib', 'none']
57 57
58 58 # wire protocol command can either return a string or one of these classes.
59 59
60 60 def getdispatchrepo(repo, proto, command):
61 61 """Obtain the repo used for processing wire protocol commands.
62 62
63 63 The intent of this function is to serve as a monkeypatch point for
64 64 extensions that need commands to operate on different repo views under
65 65 specialized circumstances.
66 66 """
67 67 return repo.filtered('served')
68 68
69 69 def dispatch(repo, proto, command):
70 70 repo = getdispatchrepo(repo, proto, command)
71 71
72 72 func, spec = commands[command]
73 73 args = proto.getargs(spec)
74 74
75 75 return func(repo, proto, *args)
76 76
77 77 def options(cmd, keys, others):
78 78 opts = {}
79 79 for k in keys:
80 80 if k in others:
81 81 opts[k] = others[k]
82 82 del others[k]
83 83 if others:
84 84 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
85 85 % (cmd, ",".join(others)))
86 86 return opts
87 87
88 88 def bundle1allowed(repo, action):
89 89 """Whether a bundle1 operation is allowed from the server.
90 90
91 91 Priority is:
92 92
93 93 1. server.bundle1gd.<action> (if generaldelta active)
94 94 2. server.bundle1.<action>
95 95 3. server.bundle1gd (if generaldelta active)
96 96 4. server.bundle1
97 97 """
98 98 ui = repo.ui
99 99 gd = 'generaldelta' in repo.requirements
100 100
101 101 if gd:
102 102 v = ui.configbool('server', 'bundle1gd.%s' % action)
103 103 if v is not None:
104 104 return v
105 105
106 106 v = ui.configbool('server', 'bundle1.%s' % action)
107 107 if v is not None:
108 108 return v
109 109
110 110 if gd:
111 111 v = ui.configbool('server', 'bundle1gd')
112 112 if v is not None:
113 113 return v
114 114
115 115 return ui.configbool('server', 'bundle1')
116 116
117 # For version 1 transports.
118 117 commands = wireprototypes.commanddict()
119 118
120 # For version 2 transports.
121 commandsv2 = wireprototypes.commanddict()
122
123 119 def wireprotocommand(name, args=None, permission='push'):
124 120 """Decorator to declare a wire protocol command.
125 121
126 122 ``name`` is the name of the wire protocol command being provided.
127 123
128 124 ``args`` defines the named arguments accepted by the command. It is
129 125 a space-delimited list of argument names. ``*`` denotes a special value
130 126 that says to accept all named arguments.
131 127
132 128 ``permission`` defines the permission type needed to run this command.
133 129 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
134 130 respectively. Default is to assume command requires ``push`` permissions
135 131 because otherwise commands not declaring their permissions could modify
136 132 a repository that is supposed to be read-only.
137 133 """
138 134 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
139 135 if v['version'] == 1}
140 136
141 137 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
142 138 # SSHv2.
143 139 # TODO undo this hack when SSH is using the unified frame protocol.
144 140 if name == b'batch':
145 141 transports.add(wireprototypes.SSHV2)
146 142
147 143 if permission not in ('push', 'pull'):
148 144 raise error.ProgrammingError('invalid wire protocol permission; '
149 145 'got %s; expected "push" or "pull"' %
150 146 permission)
151 147
152 148 if args is None:
153 149 args = ''
154 150
155 151 if not isinstance(args, bytes):
156 152 raise error.ProgrammingError('arguments for version 1 commands '
157 153 'must be declared as bytes')
158 154
159 155 def register(func):
160 156 if name in commands:
161 157 raise error.ProgrammingError('%s command already registered '
162 158 'for version 1' % name)
163 159 commands[name] = wireprototypes.commandentry(
164 160 func, args=args, transports=transports, permission=permission)
165 161
166 162 return func
167 163 return register
168 164
169 165 # TODO define a more appropriate permissions type to use for this.
170 166 @wireprotocommand('batch', 'cmds *', permission='pull')
171 167 def batch(repo, proto, cmds, others):
172 168 unescapearg = wireprototypes.unescapebatcharg
173 169 repo = repo.filtered("served")
174 170 res = []
175 171 for pair in cmds.split(';'):
176 172 op, args = pair.split(' ', 1)
177 173 vals = {}
178 174 for a in args.split(','):
179 175 if a:
180 176 n, v = a.split('=')
181 177 vals[unescapearg(n)] = unescapearg(v)
182 178 func, spec = commands[op]
183 179
184 180 # Validate that client has permissions to perform this command.
185 181 perm = commands[op].permission
186 182 assert perm in ('push', 'pull')
187 183 proto.checkperm(perm)
188 184
189 185 if spec:
190 186 keys = spec.split()
191 187 data = {}
192 188 for k in keys:
193 189 if k == '*':
194 190 star = {}
195 191 for key in vals.keys():
196 192 if key not in keys:
197 193 star[key] = vals[key]
198 194 data['*'] = star
199 195 else:
200 196 data[k] = vals[k]
201 197 result = func(repo, proto, *[data[k] for k in keys])
202 198 else:
203 199 result = func(repo, proto)
204 200 if isinstance(result, wireprototypes.ooberror):
205 201 return result
206 202
207 203 # For now, all batchable commands must return bytesresponse or
208 204 # raw bytes (for backwards compatibility).
209 205 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
210 206 if isinstance(result, wireprototypes.bytesresponse):
211 207 result = result.data
212 208 res.append(wireprototypes.escapebatcharg(result))
213 209
214 210 return wireprototypes.bytesresponse(';'.join(res))
215 211
216 212 @wireprotocommand('between', 'pairs', permission='pull')
217 213 def between(repo, proto, pairs):
218 214 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
219 215 r = []
220 216 for b in repo.between(pairs):
221 217 r.append(wireprototypes.encodelist(b) + "\n")
222 218
223 219 return wireprototypes.bytesresponse(''.join(r))
224 220
225 221 @wireprotocommand('branchmap', permission='pull')
226 222 def branchmap(repo, proto):
227 223 branchmap = repo.branchmap()
228 224 heads = []
229 225 for branch, nodes in branchmap.iteritems():
230 226 branchname = urlreq.quote(encoding.fromlocal(branch))
231 227 branchnodes = wireprototypes.encodelist(nodes)
232 228 heads.append('%s %s' % (branchname, branchnodes))
233 229
234 230 return wireprototypes.bytesresponse('\n'.join(heads))
235 231
236 232 @wireprotocommand('branches', 'nodes', permission='pull')
237 233 def branches(repo, proto, nodes):
238 234 nodes = wireprototypes.decodelist(nodes)
239 235 r = []
240 236 for b in repo.branches(nodes):
241 237 r.append(wireprototypes.encodelist(b) + "\n")
242 238
243 239 return wireprototypes.bytesresponse(''.join(r))
244 240
245 241 @wireprotocommand('clonebundles', '', permission='pull')
246 242 def clonebundles(repo, proto):
247 243 """Server command for returning info for available bundles to seed clones.
248 244
249 245 Clients will parse this response and determine what bundle to fetch.
250 246
251 247 Extensions may wrap this command to filter or dynamically emit data
252 248 depending on the request. e.g. you could advertise URLs for the closest
253 249 data center given the client's IP address.
254 250 """
255 251 return wireprototypes.bytesresponse(
256 252 repo.vfs.tryread('clonebundles.manifest'))
257 253
258 254 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
259 255 'known', 'getbundle', 'unbundlehash']
260 256
261 257 def _capabilities(repo, proto):
262 258 """return a list of capabilities for a repo
263 259
264 260 This function exists to allow extensions to easily wrap capabilities
265 261 computation
266 262
267 263 - returns a lists: easy to alter
268 264 - change done here will be propagated to both `capabilities` and `hello`
269 265 command without any other action needed.
270 266 """
271 267 # copy to prevent modification of the global list
272 268 caps = list(wireprotocaps)
273 269
274 270 # Command of same name as capability isn't exposed to version 1 of
275 271 # transports. So conditionally add it.
276 272 if commands.commandavailable('changegroupsubset', proto):
277 273 caps.append('changegroupsubset')
278 274
279 275 if streamclone.allowservergeneration(repo):
280 276 if repo.ui.configbool('server', 'preferuncompressed'):
281 277 caps.append('stream-preferred')
282 278 requiredformats = repo.requirements & repo.supportedformats
283 279 # if our local revlogs are just revlogv1, add 'stream' cap
284 280 if not requiredformats - {'revlogv1'}:
285 281 caps.append('stream')
286 282 # otherwise, add 'streamreqs' detailing our local revlog format
287 283 else:
288 284 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
289 285 if repo.ui.configbool('experimental', 'bundle2-advertise'):
290 286 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
291 287 caps.append('bundle2=' + urlreq.quote(capsblob))
292 288 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
293 289
294 290 return proto.addcapabilities(repo, caps)
295 291
296 292 # If you are writing an extension and consider wrapping this function. Wrap
297 293 # `_capabilities` instead.
298 294 @wireprotocommand('capabilities', permission='pull')
299 295 def capabilities(repo, proto):
300 296 caps = _capabilities(repo, proto)
301 297 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
302 298
303 299 @wireprotocommand('changegroup', 'roots', permission='pull')
304 300 def changegroup(repo, proto, roots):
305 301 nodes = wireprototypes.decodelist(roots)
306 302 outgoing = discovery.outgoing(repo, missingroots=nodes,
307 303 missingheads=repo.heads())
308 304 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
309 305 gen = iter(lambda: cg.read(32768), '')
310 306 return wireprototypes.streamres(gen=gen)
311 307
312 308 @wireprotocommand('changegroupsubset', 'bases heads',
313 309 permission='pull')
314 310 def changegroupsubset(repo, proto, bases, heads):
315 311 bases = wireprototypes.decodelist(bases)
316 312 heads = wireprototypes.decodelist(heads)
317 313 outgoing = discovery.outgoing(repo, missingroots=bases,
318 314 missingheads=heads)
319 315 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
320 316 gen = iter(lambda: cg.read(32768), '')
321 317 return wireprototypes.streamres(gen=gen)
322 318
323 319 @wireprotocommand('debugwireargs', 'one two *',
324 320 permission='pull')
325 321 def debugwireargs(repo, proto, one, two, others):
326 322 # only accept optional args from the known set
327 323 opts = options('debugwireargs', ['three', 'four'], others)
328 324 return wireprototypes.bytesresponse(repo.debugwireargs(
329 325 one, two, **pycompat.strkwargs(opts)))
330 326
331 327 def find_pullbundle(repo, proto, opts, clheads, heads, common):
332 328 """Return a file object for the first matching pullbundle.
333 329
334 330 Pullbundles are specified in .hg/pullbundles.manifest similar to
335 331 clonebundles.
336 332 For each entry, the bundle specification is checked for compatibility:
337 333 - Client features vs the BUNDLESPEC.
338 334 - Revisions shared with the clients vs base revisions of the bundle.
339 335 A bundle can be applied only if all its base revisions are known by
340 336 the client.
341 337 - At least one leaf of the bundle's DAG is missing on the client.
342 338 - Every leaf of the bundle's DAG is part of node set the client wants.
343 339 E.g. do not send a bundle of all changes if the client wants only
344 340 one specific branch of many.
345 341 """
346 342 def decodehexstring(s):
347 343 return set([h.decode('hex') for h in s.split(';')])
348 344
349 345 manifest = repo.vfs.tryread('pullbundles.manifest')
350 346 if not manifest:
351 347 return None
352 348 res = exchange.parseclonebundlesmanifest(repo, manifest)
353 349 res = exchange.filterclonebundleentries(repo, res)
354 350 if not res:
355 351 return None
356 352 cl = repo.changelog
357 353 heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
358 354 common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
359 355 compformats = clientcompressionsupport(proto)
360 356 for entry in res:
361 357 if 'COMPRESSION' in entry and entry['COMPRESSION'] not in compformats:
362 358 continue
363 359 # No test yet for VERSION, since V2 is supported by any client
364 360 # that advertises partial pulls
365 361 if 'heads' in entry:
366 362 try:
367 363 bundle_heads = decodehexstring(entry['heads'])
368 364 except TypeError:
369 365 # Bad heads entry
370 366 continue
371 367 if bundle_heads.issubset(common):
372 368 continue # Nothing new
373 369 if all(cl.rev(rev) in common_anc for rev in bundle_heads):
374 370 continue # Still nothing new
375 371 if any(cl.rev(rev) not in heads_anc and
376 372 cl.rev(rev) not in common_anc for rev in bundle_heads):
377 373 continue
378 374 if 'bases' in entry:
379 375 try:
380 376 bundle_bases = decodehexstring(entry['bases'])
381 377 except TypeError:
382 378 # Bad bases entry
383 379 continue
384 380 if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
385 381 continue
386 382 path = entry['URL']
387 383 repo.ui.debug('sending pullbundle "%s"\n' % path)
388 384 try:
389 385 return repo.vfs.open(path)
390 386 except IOError:
391 387 repo.ui.debug('pullbundle "%s" not accessible\n' % path)
392 388 continue
393 389 return None
394 390
395 391 @wireprotocommand('getbundle', '*', permission='pull')
396 392 def getbundle(repo, proto, others):
397 393 opts = options('getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(),
398 394 others)
399 395 for k, v in opts.iteritems():
400 396 keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
401 397 if keytype == 'nodes':
402 398 opts[k] = wireprototypes.decodelist(v)
403 399 elif keytype == 'csv':
404 400 opts[k] = list(v.split(','))
405 401 elif keytype == 'scsv':
406 402 opts[k] = set(v.split(','))
407 403 elif keytype == 'boolean':
408 404 # Client should serialize False as '0', which is a non-empty string
409 405 # so it evaluates as a True bool.
410 406 if v == '0':
411 407 opts[k] = False
412 408 else:
413 409 opts[k] = bool(v)
414 410 elif keytype != 'plain':
415 411 raise KeyError('unknown getbundle option type %s'
416 412 % keytype)
417 413
418 414 if not bundle1allowed(repo, 'pull'):
419 415 if not exchange.bundle2requested(opts.get('bundlecaps')):
420 416 if proto.name == 'http-v1':
421 417 return wireprototypes.ooberror(bundle2required)
422 418 raise error.Abort(bundle2requiredmain,
423 419 hint=bundle2requiredhint)
424 420
425 421 prefercompressed = True
426 422
427 423 try:
428 424 clheads = set(repo.changelog.heads())
429 425 heads = set(opts.get('heads', set()))
430 426 common = set(opts.get('common', set()))
431 427 common.discard(nullid)
432 428 if (repo.ui.configbool('server', 'pullbundle') and
433 429 'partial-pull' in proto.getprotocaps()):
434 430 # Check if a pre-built bundle covers this request.
435 431 bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
436 432 if bundle:
437 433 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
438 434 prefer_uncompressed=True)
439 435
440 436 if repo.ui.configbool('server', 'disablefullbundle'):
441 437 # Check to see if this is a full clone.
442 438 changegroup = opts.get('cg', True)
443 439 if changegroup and not common and clheads == heads:
444 440 raise error.Abort(
445 441 _('server has pull-based clones disabled'),
446 442 hint=_('remove --pull if specified or upgrade Mercurial'))
447 443
448 444 info, chunks = exchange.getbundlechunks(repo, 'serve',
449 445 **pycompat.strkwargs(opts))
450 446 prefercompressed = info.get('prefercompressed', True)
451 447 except error.Abort as exc:
452 448 # cleanly forward Abort error to the client
453 449 if not exchange.bundle2requested(opts.get('bundlecaps')):
454 450 if proto.name == 'http-v1':
455 451 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
456 452 raise # cannot do better for bundle1 + ssh
457 453 # bundle2 request expect a bundle2 reply
458 454 bundler = bundle2.bundle20(repo.ui)
459 455 manargs = [('message', pycompat.bytestr(exc))]
460 456 advargs = []
461 457 if exc.hint is not None:
462 458 advargs.append(('hint', exc.hint))
463 459 bundler.addpart(bundle2.bundlepart('error:abort',
464 460 manargs, advargs))
465 461 chunks = bundler.getchunks()
466 462 prefercompressed = False
467 463
468 464 return wireprototypes.streamres(
469 465 gen=chunks, prefer_uncompressed=not prefercompressed)
470 466
471 467 @wireprotocommand('heads', permission='pull')
472 468 def heads(repo, proto):
473 469 h = repo.heads()
474 470 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
475 471
476 472 @wireprotocommand('hello', permission='pull')
477 473 def hello(repo, proto):
478 474 """Called as part of SSH handshake to obtain server info.
479 475
480 476 Returns a list of lines describing interesting things about the
481 477 server, in an RFC822-like format.
482 478
483 479 Currently, the only one defined is ``capabilities``, which consists of a
484 480 line of space separated tokens describing server abilities:
485 481
486 482 capabilities: <token0> <token1> <token2>
487 483 """
488 484 caps = capabilities(repo, proto).data
489 485 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
490 486
491 487 @wireprotocommand('listkeys', 'namespace', permission='pull')
492 488 def listkeys(repo, proto, namespace):
493 489 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
494 490 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
495 491
496 492 @wireprotocommand('lookup', 'key', permission='pull')
497 493 def lookup(repo, proto, key):
498 494 try:
499 495 k = encoding.tolocal(key)
500 496 n = repo.lookup(k)
501 497 r = hex(n)
502 498 success = 1
503 499 except Exception as inst:
504 500 r = stringutil.forcebytestr(inst)
505 501 success = 0
506 502 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
507 503
508 504 @wireprotocommand('known', 'nodes *', permission='pull')
509 505 def known(repo, proto, nodes, others):
510 506 v = ''.join(b and '1' or '0'
511 507 for b in repo.known(wireprototypes.decodelist(nodes)))
512 508 return wireprototypes.bytesresponse(v)
513 509
514 510 @wireprotocommand('protocaps', 'caps', permission='pull')
515 511 def protocaps(repo, proto, caps):
516 512 if proto.name == wireprototypes.SSHV1:
517 513 proto._protocaps = set(caps.split(' '))
518 514 return wireprototypes.bytesresponse('OK')
519 515
520 516 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
521 517 def pushkey(repo, proto, namespace, key, old, new):
522 518 # compatibility with pre-1.8 clients which were accidentally
523 519 # sending raw binary nodes rather than utf-8-encoded hex
524 520 if len(new) == 20 and stringutil.escapestr(new) != new:
525 521 # looks like it could be a binary node
526 522 try:
527 523 new.decode('utf-8')
528 524 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
529 525 except UnicodeDecodeError:
530 526 pass # binary, leave unmodified
531 527 else:
532 528 new = encoding.tolocal(new) # normal path
533 529
534 530 with proto.mayberedirectstdio() as output:
535 531 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
536 532 encoding.tolocal(old), new) or False
537 533
538 534 output = output.getvalue() if output else ''
539 535 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
540 536
541 537 @wireprotocommand('stream_out', permission='pull')
542 538 def stream(repo, proto):
543 539 '''If the server supports streaming clone, it advertises the "stream"
544 540 capability with a value representing the version and flags of the repo
545 541 it is serving. Client checks to see if it understands the format.
546 542 '''
547 543 return wireprototypes.streamreslegacy(
548 544 streamclone.generatev1wireproto(repo))
549 545
550 546 @wireprotocommand('unbundle', 'heads', permission='push')
551 547 def unbundle(repo, proto, heads):
552 548 their_heads = wireprototypes.decodelist(heads)
553 549
554 550 with proto.mayberedirectstdio() as output:
555 551 try:
556 552 exchange.check_heads(repo, their_heads, 'preparing changes')
557 553 cleanup = lambda: None
558 554 try:
559 555 payload = proto.getpayload()
560 556 if repo.ui.configbool('server', 'streamunbundle'):
561 557 def cleanup():
562 558 # Ensure that the full payload is consumed, so
563 559 # that the connection doesn't contain trailing garbage.
564 560 for p in payload:
565 561 pass
566 562 fp = util.chunkbuffer(payload)
567 563 else:
568 564 # write bundle data to temporary file as it can be big
569 565 fp, tempname = None, None
570 566 def cleanup():
571 567 if fp:
572 568 fp.close()
573 569 if tempname:
574 570 os.unlink(tempname)
575 571 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
576 572 repo.ui.debug('redirecting incoming bundle to %s\n' %
577 573 tempname)
578 574 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
579 575 r = 0
580 576 for p in payload:
581 577 fp.write(p)
582 578 fp.seek(0)
583 579
584 580 gen = exchange.readbundle(repo.ui, fp, None)
585 581 if (isinstance(gen, changegroupmod.cg1unpacker)
586 582 and not bundle1allowed(repo, 'push')):
587 583 if proto.name == 'http-v1':
588 584 # need to special case http because stderr do not get to
589 585 # the http client on failed push so we need to abuse
590 586 # some other error type to make sure the message get to
591 587 # the user.
592 588 return wireprototypes.ooberror(bundle2required)
593 589 raise error.Abort(bundle2requiredmain,
594 590 hint=bundle2requiredhint)
595 591
596 592 r = exchange.unbundle(repo, gen, their_heads, 'serve',
597 593 proto.client())
598 594 if util.safehasattr(r, 'addpart'):
599 595 # The return looks streamable, we are in the bundle2 case
600 596 # and should return a stream.
601 597 return wireprototypes.streamreslegacy(gen=r.getchunks())
602 598 return wireprototypes.pushres(
603 599 r, output.getvalue() if output else '')
604 600
605 601 finally:
606 602 cleanup()
607 603
608 604 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
609 605 # handle non-bundle2 case first
610 606 if not getattr(exc, 'duringunbundle2', False):
611 607 try:
612 608 raise
613 609 except error.Abort:
614 610 # The old code we moved used procutil.stderr directly.
615 611 # We did not change it to minimise code change.
616 612 # This need to be moved to something proper.
617 613 # Feel free to do it.
618 614 procutil.stderr.write("abort: %s\n" % exc)
619 615 if exc.hint is not None:
620 616 procutil.stderr.write("(%s)\n" % exc.hint)
621 617 procutil.stderr.flush()
622 618 return wireprototypes.pushres(
623 619 0, output.getvalue() if output else '')
624 620 except error.PushRaced:
625 621 return wireprototypes.pusherr(
626 622 pycompat.bytestr(exc),
627 623 output.getvalue() if output else '')
628 624
629 625 bundler = bundle2.bundle20(repo.ui)
630 626 for out in getattr(exc, '_bundle2salvagedoutput', ()):
631 627 bundler.addpart(out)
632 628 try:
633 629 try:
634 630 raise
635 631 except error.PushkeyFailed as exc:
636 632 # check client caps
637 633 remotecaps = getattr(exc, '_replycaps', None)
638 634 if (remotecaps is not None
639 635 and 'pushkey' not in remotecaps.get('error', ())):
640 636 # no support remote side, fallback to Abort handler.
641 637 raise
642 638 part = bundler.newpart('error:pushkey')
643 639 part.addparam('in-reply-to', exc.partid)
644 640 if exc.namespace is not None:
645 641 part.addparam('namespace', exc.namespace,
646 642 mandatory=False)
647 643 if exc.key is not None:
648 644 part.addparam('key', exc.key, mandatory=False)
649 645 if exc.new is not None:
650 646 part.addparam('new', exc.new, mandatory=False)
651 647 if exc.old is not None:
652 648 part.addparam('old', exc.old, mandatory=False)
653 649 if exc.ret is not None:
654 650 part.addparam('ret', exc.ret, mandatory=False)
655 651 except error.BundleValueError as exc:
656 652 errpart = bundler.newpart('error:unsupportedcontent')
657 653 if exc.parttype is not None:
658 654 errpart.addparam('parttype', exc.parttype)
659 655 if exc.params:
660 656 errpart.addparam('params', '\0'.join(exc.params))
661 657 except error.Abort as exc:
662 658 manargs = [('message', stringutil.forcebytestr(exc))]
663 659 advargs = []
664 660 if exc.hint is not None:
665 661 advargs.append(('hint', exc.hint))
666 662 bundler.addpart(bundle2.bundlepart('error:abort',
667 663 manargs, advargs))
668 664 except error.PushRaced as exc:
669 665 bundler.newpart('error:pushraced',
670 666 [('message', stringutil.forcebytestr(exc))])
671 667 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
@@ -1,533 +1,534
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10
11 11 from .i18n import _
12 12 from .thirdparty import (
13 13 cbor,
14 14 )
15 15 from .thirdparty.zope import (
16 16 interface as zi,
17 17 )
18 18 from . import (
19 19 encoding,
20 20 error,
21 21 pycompat,
22 22 streamclone,
23 23 util,
24 wireproto,
25 24 wireprotoframing,
26 25 wireprototypes,
27 26 )
28 27
29 28 FRAMINGTYPE = b'application/mercurial-exp-framing-0005'
30 29
31 30 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
32 31
32 COMMANDS = wireprototypes.commanddict()
33
33 34 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
34 35 from .hgweb import common as hgwebcommon
35 36
36 37 # URL space looks like: <permissions>/<command>, where <permission> can
37 38 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
38 39
39 40 # Root URL does nothing meaningful... yet.
40 41 if not urlparts:
41 42 res.status = b'200 OK'
42 43 res.headers[b'Content-Type'] = b'text/plain'
43 44 res.setbodybytes(_('HTTP version 2 API handler'))
44 45 return
45 46
46 47 if len(urlparts) == 1:
47 48 res.status = b'404 Not Found'
48 49 res.headers[b'Content-Type'] = b'text/plain'
49 50 res.setbodybytes(_('do not know how to process %s\n') %
50 51 req.dispatchpath)
51 52 return
52 53
53 54 permission, command = urlparts[0:2]
54 55
55 56 if permission not in (b'ro', b'rw'):
56 57 res.status = b'404 Not Found'
57 58 res.headers[b'Content-Type'] = b'text/plain'
58 59 res.setbodybytes(_('unknown permission: %s') % permission)
59 60 return
60 61
61 62 if req.method != 'POST':
62 63 res.status = b'405 Method Not Allowed'
63 64 res.headers[b'Allow'] = b'POST'
64 65 res.setbodybytes(_('commands require POST requests'))
65 66 return
66 67
67 68 # At some point we'll want to use our own API instead of recycling the
68 69 # behavior of version 1 of the wire protocol...
69 70 # TODO return reasonable responses - not responses that overload the
70 71 # HTTP status line message for error reporting.
71 72 try:
72 73 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
73 74 except hgwebcommon.ErrorResponse as e:
74 75 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
75 76 for k, v in e.headers:
76 77 res.headers[k] = v
77 78 res.setbodybytes('permission denied')
78 79 return
79 80
80 81 # We have a special endpoint to reflect the request back at the client.
81 82 if command == b'debugreflect':
82 83 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
83 84 return
84 85
85 86 # Extra commands that we handle that aren't really wire protocol
86 87 # commands. Think extra hard before making this hackery available to
87 88 # extension.
88 89 extracommands = {'multirequest'}
89 90
90 if command not in wireproto.commandsv2 and command not in extracommands:
91 if command not in COMMANDS and command not in extracommands:
91 92 res.status = b'404 Not Found'
92 93 res.headers[b'Content-Type'] = b'text/plain'
93 94 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
94 95 return
95 96
96 97 repo = rctx.repo
97 98 ui = repo.ui
98 99
99 100 proto = httpv2protocolhandler(req, ui)
100 101
101 if (not wireproto.commandsv2.commandavailable(command, proto)
102 if (not COMMANDS.commandavailable(command, proto)
102 103 and command not in extracommands):
103 104 res.status = b'404 Not Found'
104 105 res.headers[b'Content-Type'] = b'text/plain'
105 106 res.setbodybytes(_('invalid wire protocol command: %s') % command)
106 107 return
107 108
108 109 # TODO consider cases where proxies may add additional Accept headers.
109 110 if req.headers.get(b'Accept') != FRAMINGTYPE:
110 111 res.status = b'406 Not Acceptable'
111 112 res.headers[b'Content-Type'] = b'text/plain'
112 113 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
113 114 % FRAMINGTYPE)
114 115 return
115 116
116 117 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
117 118 res.status = b'415 Unsupported Media Type'
118 119 # TODO we should send a response with appropriate media type,
119 120 # since client does Accept it.
120 121 res.headers[b'Content-Type'] = b'text/plain'
121 122 res.setbodybytes(_('client MUST send Content-Type header with '
122 123 'value: %s\n') % FRAMINGTYPE)
123 124 return
124 125
125 126 _processhttpv2request(ui, repo, req, res, permission, command, proto)
126 127
127 128 def _processhttpv2reflectrequest(ui, repo, req, res):
128 129 """Reads unified frame protocol request and dumps out state to client.
129 130
130 131 This special endpoint can be used to help debug the wire protocol.
131 132
132 133 Instead of routing the request through the normal dispatch mechanism,
133 134 we instead read all frames, decode them, and feed them into our state
134 135 tracker. We then dump the log of all that activity back out to the
135 136 client.
136 137 """
137 138 import json
138 139
139 140 # Reflection APIs have a history of being abused, accidentally disclosing
140 141 # sensitive data, etc. So we have a config knob.
141 142 if not ui.configbool('experimental', 'web.api.debugreflect'):
142 143 res.status = b'404 Not Found'
143 144 res.headers[b'Content-Type'] = b'text/plain'
144 145 res.setbodybytes(_('debugreflect service not available'))
145 146 return
146 147
147 148 # We assume we have a unified framing protocol request body.
148 149
149 150 reactor = wireprotoframing.serverreactor()
150 151 states = []
151 152
152 153 while True:
153 154 frame = wireprotoframing.readframe(req.bodyfh)
154 155
155 156 if not frame:
156 157 states.append(b'received: <no frame>')
157 158 break
158 159
159 160 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
160 161 frame.requestid,
161 162 frame.payload))
162 163
163 164 action, meta = reactor.onframerecv(frame)
164 165 states.append(json.dumps((action, meta), sort_keys=True,
165 166 separators=(', ', ': ')))
166 167
167 168 action, meta = reactor.oninputeof()
168 169 meta['action'] = action
169 170 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
170 171
171 172 res.status = b'200 OK'
172 173 res.headers[b'Content-Type'] = b'text/plain'
173 174 res.setbodybytes(b'\n'.join(states))
174 175
175 176 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
176 177 """Post-validation handler for HTTPv2 requests.
177 178
178 179 Called when the HTTP request contains unified frame-based protocol
179 180 frames for evaluation.
180 181 """
181 182 # TODO Some HTTP clients are full duplex and can receive data before
182 183 # the entire request is transmitted. Figure out a way to indicate support
183 184 # for that so we can opt into full duplex mode.
184 185 reactor = wireprotoframing.serverreactor(deferoutput=True)
185 186 seencommand = False
186 187
187 188 outstream = reactor.makeoutputstream()
188 189
189 190 while True:
190 191 frame = wireprotoframing.readframe(req.bodyfh)
191 192 if not frame:
192 193 break
193 194
194 195 action, meta = reactor.onframerecv(frame)
195 196
196 197 if action == 'wantframe':
197 198 # Need more data before we can do anything.
198 199 continue
199 200 elif action == 'runcommand':
200 201 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
201 202 reqcommand, reactor, outstream,
202 203 meta, issubsequent=seencommand)
203 204
204 205 if sentoutput:
205 206 return
206 207
207 208 seencommand = True
208 209
209 210 elif action == 'error':
210 211 # TODO define proper error mechanism.
211 212 res.status = b'200 OK'
212 213 res.headers[b'Content-Type'] = b'text/plain'
213 214 res.setbodybytes(meta['message'] + b'\n')
214 215 return
215 216 else:
216 217 raise error.ProgrammingError(
217 218 'unhandled action from frame processor: %s' % action)
218 219
219 220 action, meta = reactor.oninputeof()
220 221 if action == 'sendframes':
221 222 # We assume we haven't started sending the response yet. If we're
222 223 # wrong, the response type will raise an exception.
223 224 res.status = b'200 OK'
224 225 res.headers[b'Content-Type'] = FRAMINGTYPE
225 226 res.setbodygen(meta['framegen'])
226 227 elif action == 'noop':
227 228 pass
228 229 else:
229 230 raise error.ProgrammingError('unhandled action from frame processor: %s'
230 231 % action)
231 232
232 233 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
233 234 outstream, command, issubsequent):
234 235 """Dispatch a wire protocol command made from HTTPv2 requests.
235 236
236 237 The authenticated permission (``authedperm``) along with the original
237 238 command from the URL (``reqcommand``) are passed in.
238 239 """
239 240 # We already validated that the session has permissions to perform the
240 241 # actions in ``authedperm``. In the unified frame protocol, the canonical
241 242 # command to run is expressed in a frame. However, the URL also requested
242 243 # to run a specific command. We need to be careful that the command we
243 244 # run doesn't have permissions requirements greater than what was granted
244 245 # by ``authedperm``.
245 246 #
246 247 # Our rule for this is we only allow one command per HTTP request and
247 248 # that command must match the command in the URL. However, we make
248 249 # an exception for the ``multirequest`` URL. This URL is allowed to
249 250 # execute multiple commands. We double check permissions of each command
250 251 # as it is invoked to ensure there is no privilege escalation.
251 252 # TODO consider allowing multiple commands to regular command URLs
252 253 # iff each command is the same.
253 254
254 255 proto = httpv2protocolhandler(req, ui, args=command['args'])
255 256
256 257 if reqcommand == b'multirequest':
257 if not wireproto.commandsv2.commandavailable(command['command'], proto):
258 if not COMMANDS.commandavailable(command['command'], proto):
258 259 # TODO proper error mechanism
259 260 res.status = b'200 OK'
260 261 res.headers[b'Content-Type'] = b'text/plain'
261 262 res.setbodybytes(_('wire protocol command not available: %s') %
262 263 command['command'])
263 264 return True
264 265
265 266 # TODO don't use assert here, since it may be elided by -O.
266 267 assert authedperm in (b'ro', b'rw')
267 wirecommand = wireproto.commandsv2[command['command']]
268 wirecommand = COMMANDS[command['command']]
268 269 assert wirecommand.permission in ('push', 'pull')
269 270
270 271 if authedperm == b'ro' and wirecommand.permission != 'pull':
271 272 # TODO proper error mechanism
272 273 res.status = b'403 Forbidden'
273 274 res.headers[b'Content-Type'] = b'text/plain'
274 275 res.setbodybytes(_('insufficient permissions to execute '
275 276 'command: %s') % command['command'])
276 277 return True
277 278
278 279 # TODO should we also call checkperm() here? Maybe not if we're going
279 280 # to overhaul that API. The granted scope from the URL check should
280 281 # be good enough.
281 282
282 283 else:
283 284 # Don't allow multiple commands outside of ``multirequest`` URL.
284 285 if issubsequent:
285 286 # TODO proper error mechanism
286 287 res.status = b'200 OK'
287 288 res.headers[b'Content-Type'] = b'text/plain'
288 289 res.setbodybytes(_('multiple commands cannot be issued to this '
289 290 'URL'))
290 291 return True
291 292
292 293 if reqcommand != command['command']:
293 294 # TODO define proper error mechanism
294 295 res.status = b'200 OK'
295 296 res.headers[b'Content-Type'] = b'text/plain'
296 297 res.setbodybytes(_('command in frame must match command in URL'))
297 298 return True
298 299
299 300 rsp = dispatch(repo, proto, command['command'])
300 301
301 302 res.status = b'200 OK'
302 303 res.headers[b'Content-Type'] = FRAMINGTYPE
303 304
304 305 if isinstance(rsp, wireprototypes.cborresponse):
305 306 encoded = cbor.dumps(rsp.value, canonical=True)
306 307 action, meta = reactor.oncommandresponseready(outstream,
307 308 command['requestid'],
308 309 encoded)
309 310 elif isinstance(rsp, wireprototypes.v2streamingresponse):
310 311 action, meta = reactor.oncommandresponsereadygen(outstream,
311 312 command['requestid'],
312 313 rsp.gen)
313 314 elif isinstance(rsp, wireprototypes.v2errorresponse):
314 315 action, meta = reactor.oncommanderror(outstream,
315 316 command['requestid'],
316 317 rsp.message,
317 318 rsp.args)
318 319 else:
319 320 action, meta = reactor.onservererror(
320 321 _('unhandled response type from wire proto command'))
321 322
322 323 if action == 'sendframes':
323 324 res.setbodygen(meta['framegen'])
324 325 return True
325 326 elif action == 'noop':
326 327 return False
327 328 else:
328 329 raise error.ProgrammingError('unhandled event from reactor: %s' %
329 330 action)
330 331
331 332 def getdispatchrepo(repo, proto, command):
332 333 return repo.filtered('served')
333 334
334 335 def dispatch(repo, proto, command):
335 336 repo = getdispatchrepo(repo, proto, command)
336 337
337 func, spec = wireproto.commandsv2[command]
338 func, spec = COMMANDS[command]
338 339 args = proto.getargs(spec)
339 340
340 341 return func(repo, proto, **args)
341 342
342 343 @zi.implementer(wireprototypes.baseprotocolhandler)
343 344 class httpv2protocolhandler(object):
344 345 def __init__(self, req, ui, args=None):
345 346 self._req = req
346 347 self._ui = ui
347 348 self._args = args
348 349
349 350 @property
350 351 def name(self):
351 352 return HTTP_WIREPROTO_V2
352 353
353 354 def getargs(self, args):
354 355 data = {}
355 356 for k, typ in args.items():
356 357 if k == '*':
357 358 raise NotImplementedError('do not support * args')
358 359 elif k in self._args:
359 360 # TODO consider validating value types.
360 361 data[k] = self._args[k]
361 362
362 363 return data
363 364
364 365 def getprotocaps(self):
365 366 # Protocol capabilities are currently not implemented for HTTP V2.
366 367 return set()
367 368
368 369 def getpayload(self):
369 370 raise NotImplementedError
370 371
371 372 @contextlib.contextmanager
372 373 def mayberedirectstdio(self):
373 374 raise NotImplementedError
374 375
375 376 def client(self):
376 377 raise NotImplementedError
377 378
378 379 def addcapabilities(self, repo, caps):
379 380 return caps
380 381
381 382 def checkperm(self, perm):
382 383 raise NotImplementedError
383 384
384 385 def httpv2apidescriptor(req, repo):
385 386 proto = httpv2protocolhandler(req, repo.ui)
386 387
387 388 return _capabilitiesv2(repo, proto)
388 389
389 390 def _capabilitiesv2(repo, proto):
390 391 """Obtain the set of capabilities for version 2 transports.
391 392
392 393 These capabilities are distinct from the capabilities for version 1
393 394 transports.
394 395 """
395 396 compression = []
396 397 for engine in wireprototypes.supportedcompengines(repo.ui, util.SERVERROLE):
397 398 compression.append({
398 399 b'name': engine.wireprotosupport().name,
399 400 })
400 401
401 402 caps = {
402 403 'commands': {},
403 404 'compression': compression,
404 405 'framingmediatypes': [FRAMINGTYPE],
405 406 }
406 407
407 for command, entry in wireproto.commandsv2.items():
408 for command, entry in COMMANDS.items():
408 409 caps['commands'][command] = {
409 410 'args': entry.args,
410 411 'permissions': [entry.permission],
411 412 }
412 413
413 414 if streamclone.allowservergeneration(repo):
414 415 caps['rawrepoformats'] = sorted(repo.requirements &
415 416 repo.supportedformats)
416 417
417 418 return proto.addcapabilities(repo, caps)
418 419
419 420 def wireprotocommand(name, args=None, permission='push'):
420 421 """Decorator to declare a wire protocol command.
421 422
422 423 ``name`` is the name of the wire protocol command being provided.
423 424
424 425 ``args`` is a dict of argument names to example values.
425 426
426 427 ``permission`` defines the permission type needed to run this command.
427 428 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
428 429 respectively. Default is to assume command requires ``push`` permissions
429 430 because otherwise commands not declaring their permissions could modify
430 431 a repository that is supposed to be read-only.
431 432 """
432 433 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
433 434 if v['version'] == 2}
434 435
435 436 if permission not in ('push', 'pull'):
436 437 raise error.ProgrammingError('invalid wire protocol permission; '
437 438 'got %s; expected "push" or "pull"' %
438 439 permission)
439 440
440 441 if args is None:
441 442 args = {}
442 443
443 444 if not isinstance(args, dict):
444 445 raise error.ProgrammingError('arguments for version 2 commands '
445 446 'must be declared as dicts')
446 447
447 448 def register(func):
448 if name in wireproto.commandsv2:
449 if name in COMMANDS:
449 450 raise error.ProgrammingError('%s command already registered '
450 451 'for version 2' % name)
451 452
452 wireproto.commandsv2[name] = wireprototypes.commandentry(
453 COMMANDS[name] = wireprototypes.commandentry(
453 454 func, args=args, transports=transports, permission=permission)
454 455
455 456 return func
456 457
457 458 return register
458 459
459 460 @wireprotocommand('branchmap', permission='pull')
460 461 def branchmapv2(repo, proto):
461 462 branchmap = {encoding.fromlocal(k): v
462 463 for k, v in repo.branchmap().iteritems()}
463 464
464 465 return wireprototypes.cborresponse(branchmap)
465 466
466 467 @wireprotocommand('capabilities', permission='pull')
467 468 def capabilitiesv2(repo, proto):
468 469 caps = _capabilitiesv2(repo, proto)
469 470
470 471 return wireprototypes.cborresponse(caps)
471 472
472 473 @wireprotocommand('heads',
473 474 args={
474 475 'publiconly': False,
475 476 },
476 477 permission='pull')
477 478 def headsv2(repo, proto, publiconly=False):
478 479 if publiconly:
479 480 repo = repo.filtered('immutable')
480 481
481 482 return wireprototypes.cborresponse(repo.heads())
482 483
483 484 @wireprotocommand('known',
484 485 args={
485 486 'nodes': [b'deadbeef'],
486 487 },
487 488 permission='pull')
488 489 def knownv2(repo, proto, nodes=None):
489 490 nodes = nodes or []
490 491 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
491 492 return wireprototypes.cborresponse(result)
492 493
493 494 @wireprotocommand('listkeys',
494 495 args={
495 496 'namespace': b'ns',
496 497 },
497 498 permission='pull')
498 499 def listkeysv2(repo, proto, namespace=None):
499 500 keys = repo.listkeys(encoding.tolocal(namespace))
500 501 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
501 502 for k, v in keys.iteritems()}
502 503
503 504 return wireprototypes.cborresponse(keys)
504 505
505 506 @wireprotocommand('lookup',
506 507 args={
507 508 'key': b'foo',
508 509 },
509 510 permission='pull')
510 511 def lookupv2(repo, proto, key):
511 512 key = encoding.tolocal(key)
512 513
513 514 # TODO handle exception.
514 515 node = repo.lookup(key)
515 516
516 517 return wireprototypes.cborresponse(node)
517 518
518 519 @wireprotocommand('pushkey',
519 520 args={
520 521 'namespace': b'ns',
521 522 'key': b'key',
522 523 'old': b'old',
523 524 'new': b'new',
524 525 },
525 526 permission='push')
526 527 def pushkeyv2(repo, proto, namespace, key, old, new):
527 528 # TODO handle ui output redirection
528 529 r = repo.pushkey(encoding.tolocal(namespace),
529 530 encoding.tolocal(key),
530 531 encoding.tolocal(old),
531 532 encoding.tolocal(new))
532 533
533 534 return wireprototypes.cborresponse(r)
General Comments 0
You need to be logged in to leave comments. Login now