##// END OF EJS Templates
wireproto: move command registration types to wireprototypes...
Gregory Szorc -
r37799:352932a1 default
parent child Browse files
Show More
@@ -1,809 +1,730
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import tempfile
12 12
13 13 from .i18n import _
14 14 from .node import (
15 15 hex,
16 16 nullid,
17 17 )
18 18
19 19 from . import (
20 20 bundle2,
21 21 changegroup as changegroupmod,
22 22 discovery,
23 23 encoding,
24 24 error,
25 25 exchange,
26 26 pushkey as pushkeymod,
27 27 pycompat,
28 28 streamclone,
29 29 util,
30 30 wireprototypes,
31 31 )
32 32
33 33 from .utils import (
34 34 procutil,
35 35 stringutil,
36 36 )
37 37
38 38 urlerr = util.urlerr
39 39 urlreq = util.urlreq
40 40
41 41 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
42 42 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
43 43 'IncompatibleClient')
44 44 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
45 45
46 46 def clientcompressionsupport(proto):
47 47 """Returns a list of compression methods supported by the client.
48 48
49 49 Returns a list of the compression methods supported by the client
50 50 according to the protocol capabilities. If no such capability has
51 51 been announced, fallback to the default of zlib and uncompressed.
52 52 """
53 53 for cap in proto.getprotocaps():
54 54 if cap.startswith('comp='):
55 55 return cap[5:].split(',')
56 56 return ['zlib', 'none']
57 57
58 58 # wire protocol command can either return a string or one of these classes.
59 59
60 60 def getdispatchrepo(repo, proto, command):
61 61 """Obtain the repo used for processing wire protocol commands.
62 62
63 63 The intent of this function is to serve as a monkeypatch point for
64 64 extensions that need commands to operate on different repo views under
65 65 specialized circumstances.
66 66 """
67 67 return repo.filtered('served')
68 68
69 69 def dispatch(repo, proto, command):
70 70 repo = getdispatchrepo(repo, proto, command)
71 71
72 72 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
73 73 commandtable = commandsv2 if transportversion == 2 else commands
74 74 func, spec = commandtable[command]
75 75
76 76 args = proto.getargs(spec)
77 77
78 78 # Version 1 protocols define arguments as a list. Version 2 uses a dict.
79 79 if isinstance(args, list):
80 80 return func(repo, proto, *args)
81 81 elif isinstance(args, dict):
82 82 return func(repo, proto, **args)
83 83 else:
84 84 raise error.ProgrammingError('unexpected type returned from '
85 85 'proto.getargs(): %s' % type(args))
86 86
87 87 def options(cmd, keys, others):
88 88 opts = {}
89 89 for k in keys:
90 90 if k in others:
91 91 opts[k] = others[k]
92 92 del others[k]
93 93 if others:
94 94 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
95 95 % (cmd, ",".join(others)))
96 96 return opts
97 97
98 98 def bundle1allowed(repo, action):
99 99 """Whether a bundle1 operation is allowed from the server.
100 100
101 101 Priority is:
102 102
103 103 1. server.bundle1gd.<action> (if generaldelta active)
104 104 2. server.bundle1.<action>
105 105 3. server.bundle1gd (if generaldelta active)
106 106 4. server.bundle1
107 107 """
108 108 ui = repo.ui
109 109 gd = 'generaldelta' in repo.requirements
110 110
111 111 if gd:
112 112 v = ui.configbool('server', 'bundle1gd.%s' % action)
113 113 if v is not None:
114 114 return v
115 115
116 116 v = ui.configbool('server', 'bundle1.%s' % action)
117 117 if v is not None:
118 118 return v
119 119
120 120 if gd:
121 121 v = ui.configbool('server', 'bundle1gd')
122 122 if v is not None:
123 123 return v
124 124
125 125 return ui.configbool('server', 'bundle1')
126 126
127 127 def supportedcompengines(ui, role):
128 128 """Obtain the list of supported compression engines for a request."""
129 129 assert role in (util.CLIENTROLE, util.SERVERROLE)
130 130
131 131 compengines = util.compengines.supportedwireengines(role)
132 132
133 133 # Allow config to override default list and ordering.
134 134 if role == util.SERVERROLE:
135 135 configengines = ui.configlist('server', 'compressionengines')
136 136 config = 'server.compressionengines'
137 137 else:
138 138 # This is currently implemented mainly to facilitate testing. In most
139 139 # cases, the server should be in charge of choosing a compression engine
140 140 # because a server has the most to lose from a sub-optimal choice. (e.g.
141 141 # CPU DoS due to an expensive engine or a network DoS due to poor
142 142 # compression ratio).
143 143 configengines = ui.configlist('experimental',
144 144 'clientcompressionengines')
145 145 config = 'experimental.clientcompressionengines'
146 146
147 147 # No explicit config. Filter out the ones that aren't supposed to be
148 148 # advertised and return default ordering.
149 149 if not configengines:
150 150 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
151 151 return [e for e in compengines
152 152 if getattr(e.wireprotosupport(), attr) > 0]
153 153
154 154 # If compression engines are listed in the config, assume there is a good
155 155 # reason for it (like server operators wanting to achieve specific
156 156 # performance characteristics). So fail fast if the config references
157 157 # unusable compression engines.
158 158 validnames = set(e.name() for e in compengines)
159 159 invalidnames = set(e for e in configengines if e not in validnames)
160 160 if invalidnames:
161 161 raise error.Abort(_('invalid compression engine defined in %s: %s') %
162 162 (config, ', '.join(sorted(invalidnames))))
163 163
164 164 compengines = [e for e in compengines if e.name() in configengines]
165 165 compengines = sorted(compengines,
166 166 key=lambda e: configengines.index(e.name()))
167 167
168 168 if not compengines:
169 169 raise error.Abort(_('%s config option does not specify any known '
170 170 'compression engines') % config,
171 171 hint=_('usable compression engines: %s') %
172 172 ', '.sorted(validnames))
173 173
174 174 return compengines
175 175
176 class commandentry(object):
177 """Represents a declared wire protocol command."""
178 def __init__(self, func, args='', transports=None,
179 permission='push'):
180 self.func = func
181 self.args = args
182 self.transports = transports or set()
183 self.permission = permission
184
185 def _merge(self, func, args):
186 """Merge this instance with an incoming 2-tuple.
187
188 This is called when a caller using the old 2-tuple API attempts
189 to replace an instance. The incoming values are merged with
190 data not captured by the 2-tuple and a new instance containing
191 the union of the two objects is returned.
192 """
193 return commandentry(func, args=args, transports=set(self.transports),
194 permission=self.permission)
195
196 # Old code treats instances as 2-tuples. So expose that interface.
197 def __iter__(self):
198 yield self.func
199 yield self.args
200
201 def __getitem__(self, i):
202 if i == 0:
203 return self.func
204 elif i == 1:
205 return self.args
206 else:
207 raise IndexError('can only access elements 0 and 1')
208
209 class commanddict(dict):
210 """Container for registered wire protocol commands.
211
212 It behaves like a dict. But __setitem__ is overwritten to allow silent
213 coercion of values from 2-tuples for API compatibility.
214 """
215 def __setitem__(self, k, v):
216 if isinstance(v, commandentry):
217 pass
218 # Cast 2-tuples to commandentry instances.
219 elif isinstance(v, tuple):
220 if len(v) != 2:
221 raise ValueError('command tuples must have exactly 2 elements')
222
223 # It is common for extensions to wrap wire protocol commands via
224 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
225 # doing this aren't aware of the new API that uses objects to store
226 # command entries, we automatically merge old state with new.
227 if k in self:
228 v = self[k]._merge(v[0], v[1])
229 else:
230 # Use default values from @wireprotocommand.
231 v = commandentry(v[0], args=v[1],
232 transports=set(wireprototypes.TRANSPORTS),
233 permission='push')
234 else:
235 raise ValueError('command entries must be commandentry instances '
236 'or 2-tuples')
237
238 return super(commanddict, self).__setitem__(k, v)
239
240 def commandavailable(self, command, proto):
241 """Determine if a command is available for the requested protocol."""
242 assert proto.name in wireprototypes.TRANSPORTS
243
244 entry = self.get(command)
245
246 if not entry:
247 return False
248
249 if proto.name not in entry.transports:
250 return False
251
252 return True
253
254 176 # For version 1 transports.
255 commands = commanddict()
177 commands = wireprototypes.commanddict()
256 178
257 179 # For version 2 transports.
258 commandsv2 = commanddict()
180 commandsv2 = wireprototypes.commanddict()
259 181
260 182 def wireprotocommand(name, args=None, permission='push'):
261 183 """Decorator to declare a wire protocol command.
262 184
263 185 ``name`` is the name of the wire protocol command being provided.
264 186
265 187 ``args`` defines the named arguments accepted by the command. It is
266 188 a space-delimited list of argument names. ``*`` denotes a special value
267 189 that says to accept all named arguments.
268 190
269 191 ``permission`` defines the permission type needed to run this command.
270 192 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
271 193 respectively. Default is to assume command requires ``push`` permissions
272 194 because otherwise commands not declaring their permissions could modify
273 195 a repository that is supposed to be read-only.
274 196 """
275 197 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
276 198 if v['version'] == 1}
277 199
278 200 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
279 201 # SSHv2.
280 202 # TODO undo this hack when SSH is using the unified frame protocol.
281 203 if name == b'batch':
282 204 transports.add(wireprototypes.SSHV2)
283 205
284 206 if permission not in ('push', 'pull'):
285 207 raise error.ProgrammingError('invalid wire protocol permission; '
286 208 'got %s; expected "push" or "pull"' %
287 209 permission)
288 210
289 211 if args is None:
290 212 args = ''
291 213
292 214 if not isinstance(args, bytes):
293 215 raise error.ProgrammingError('arguments for version 1 commands '
294 216 'must be declared as bytes')
295 217
296 218 def register(func):
297 219 if name in commands:
298 220 raise error.ProgrammingError('%s command already registered '
299 221 'for version 1' % name)
300 commands[name] = commandentry(func, args=args,
301 transports=transports,
302 permission=permission)
222 commands[name] = wireprototypes.commandentry(
223 func, args=args, transports=transports, permission=permission)
303 224
304 225 return func
305 226 return register
306 227
307 228 # TODO define a more appropriate permissions type to use for this.
308 229 @wireprotocommand('batch', 'cmds *', permission='pull')
309 230 def batch(repo, proto, cmds, others):
310 231 unescapearg = wireprototypes.unescapebatcharg
311 232 repo = repo.filtered("served")
312 233 res = []
313 234 for pair in cmds.split(';'):
314 235 op, args = pair.split(' ', 1)
315 236 vals = {}
316 237 for a in args.split(','):
317 238 if a:
318 239 n, v = a.split('=')
319 240 vals[unescapearg(n)] = unescapearg(v)
320 241 func, spec = commands[op]
321 242
322 243 # Validate that client has permissions to perform this command.
323 244 perm = commands[op].permission
324 245 assert perm in ('push', 'pull')
325 246 proto.checkperm(perm)
326 247
327 248 if spec:
328 249 keys = spec.split()
329 250 data = {}
330 251 for k in keys:
331 252 if k == '*':
332 253 star = {}
333 254 for key in vals.keys():
334 255 if key not in keys:
335 256 star[key] = vals[key]
336 257 data['*'] = star
337 258 else:
338 259 data[k] = vals[k]
339 260 result = func(repo, proto, *[data[k] for k in keys])
340 261 else:
341 262 result = func(repo, proto)
342 263 if isinstance(result, wireprototypes.ooberror):
343 264 return result
344 265
345 266 # For now, all batchable commands must return bytesresponse or
346 267 # raw bytes (for backwards compatibility).
347 268 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
348 269 if isinstance(result, wireprototypes.bytesresponse):
349 270 result = result.data
350 271 res.append(wireprototypes.escapebatcharg(result))
351 272
352 273 return wireprototypes.bytesresponse(';'.join(res))
353 274
354 275 @wireprotocommand('between', 'pairs', permission='pull')
355 276 def between(repo, proto, pairs):
356 277 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
357 278 r = []
358 279 for b in repo.between(pairs):
359 280 r.append(wireprototypes.encodelist(b) + "\n")
360 281
361 282 return wireprototypes.bytesresponse(''.join(r))
362 283
363 284 @wireprotocommand('branchmap', permission='pull')
364 285 def branchmap(repo, proto):
365 286 branchmap = repo.branchmap()
366 287 heads = []
367 288 for branch, nodes in branchmap.iteritems():
368 289 branchname = urlreq.quote(encoding.fromlocal(branch))
369 290 branchnodes = wireprototypes.encodelist(nodes)
370 291 heads.append('%s %s' % (branchname, branchnodes))
371 292
372 293 return wireprototypes.bytesresponse('\n'.join(heads))
373 294
374 295 @wireprotocommand('branches', 'nodes', permission='pull')
375 296 def branches(repo, proto, nodes):
376 297 nodes = wireprototypes.decodelist(nodes)
377 298 r = []
378 299 for b in repo.branches(nodes):
379 300 r.append(wireprototypes.encodelist(b) + "\n")
380 301
381 302 return wireprototypes.bytesresponse(''.join(r))
382 303
383 304 @wireprotocommand('clonebundles', '', permission='pull')
384 305 def clonebundles(repo, proto):
385 306 """Server command for returning info for available bundles to seed clones.
386 307
387 308 Clients will parse this response and determine what bundle to fetch.
388 309
389 310 Extensions may wrap this command to filter or dynamically emit data
390 311 depending on the request. e.g. you could advertise URLs for the closest
391 312 data center given the client's IP address.
392 313 """
393 314 return wireprototypes.bytesresponse(
394 315 repo.vfs.tryread('clonebundles.manifest'))
395 316
396 317 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
397 318 'known', 'getbundle', 'unbundlehash']
398 319
399 320 def _capabilities(repo, proto):
400 321 """return a list of capabilities for a repo
401 322
402 323 This function exists to allow extensions to easily wrap capabilities
403 324 computation
404 325
405 326 - returns a lists: easy to alter
406 327 - change done here will be propagated to both `capabilities` and `hello`
407 328 command without any other action needed.
408 329 """
409 330 # copy to prevent modification of the global list
410 331 caps = list(wireprotocaps)
411 332
412 333 # Command of same name as capability isn't exposed to version 1 of
413 334 # transports. So conditionally add it.
414 335 if commands.commandavailable('changegroupsubset', proto):
415 336 caps.append('changegroupsubset')
416 337
417 338 if streamclone.allowservergeneration(repo):
418 339 if repo.ui.configbool('server', 'preferuncompressed'):
419 340 caps.append('stream-preferred')
420 341 requiredformats = repo.requirements & repo.supportedformats
421 342 # if our local revlogs are just revlogv1, add 'stream' cap
422 343 if not requiredformats - {'revlogv1'}:
423 344 caps.append('stream')
424 345 # otherwise, add 'streamreqs' detailing our local revlog format
425 346 else:
426 347 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
427 348 if repo.ui.configbool('experimental', 'bundle2-advertise'):
428 349 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
429 350 caps.append('bundle2=' + urlreq.quote(capsblob))
430 351 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
431 352
432 353 return proto.addcapabilities(repo, caps)
433 354
434 355 # If you are writing an extension and consider wrapping this function. Wrap
435 356 # `_capabilities` instead.
436 357 @wireprotocommand('capabilities', permission='pull')
437 358 def capabilities(repo, proto):
438 359 caps = _capabilities(repo, proto)
439 360 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
440 361
441 362 @wireprotocommand('changegroup', 'roots', permission='pull')
442 363 def changegroup(repo, proto, roots):
443 364 nodes = wireprototypes.decodelist(roots)
444 365 outgoing = discovery.outgoing(repo, missingroots=nodes,
445 366 missingheads=repo.heads())
446 367 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
447 368 gen = iter(lambda: cg.read(32768), '')
448 369 return wireprototypes.streamres(gen=gen)
449 370
450 371 @wireprotocommand('changegroupsubset', 'bases heads',
451 372 permission='pull')
452 373 def changegroupsubset(repo, proto, bases, heads):
453 374 bases = wireprototypes.decodelist(bases)
454 375 heads = wireprototypes.decodelist(heads)
455 376 outgoing = discovery.outgoing(repo, missingroots=bases,
456 377 missingheads=heads)
457 378 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
458 379 gen = iter(lambda: cg.read(32768), '')
459 380 return wireprototypes.streamres(gen=gen)
460 381
461 382 @wireprotocommand('debugwireargs', 'one two *',
462 383 permission='pull')
463 384 def debugwireargs(repo, proto, one, two, others):
464 385 # only accept optional args from the known set
465 386 opts = options('debugwireargs', ['three', 'four'], others)
466 387 return wireprototypes.bytesresponse(repo.debugwireargs(
467 388 one, two, **pycompat.strkwargs(opts)))
468 389
469 390 def find_pullbundle(repo, proto, opts, clheads, heads, common):
470 391 """Return a file object for the first matching pullbundle.
471 392
472 393 Pullbundles are specified in .hg/pullbundles.manifest similar to
473 394 clonebundles.
474 395 For each entry, the bundle specification is checked for compatibility:
475 396 - Client features vs the BUNDLESPEC.
476 397 - Revisions shared with the clients vs base revisions of the bundle.
477 398 A bundle can be applied only if all its base revisions are known by
478 399 the client.
479 400 - At least one leaf of the bundle's DAG is missing on the client.
480 401 - Every leaf of the bundle's DAG is part of node set the client wants.
481 402 E.g. do not send a bundle of all changes if the client wants only
482 403 one specific branch of many.
483 404 """
484 405 def decodehexstring(s):
485 406 return set([h.decode('hex') for h in s.split(';')])
486 407
487 408 manifest = repo.vfs.tryread('pullbundles.manifest')
488 409 if not manifest:
489 410 return None
490 411 res = exchange.parseclonebundlesmanifest(repo, manifest)
491 412 res = exchange.filterclonebundleentries(repo, res)
492 413 if not res:
493 414 return None
494 415 cl = repo.changelog
495 416 heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
496 417 common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
497 418 compformats = clientcompressionsupport(proto)
498 419 for entry in res:
499 420 if 'COMPRESSION' in entry and entry['COMPRESSION'] not in compformats:
500 421 continue
501 422 # No test yet for VERSION, since V2 is supported by any client
502 423 # that advertises partial pulls
503 424 if 'heads' in entry:
504 425 try:
505 426 bundle_heads = decodehexstring(entry['heads'])
506 427 except TypeError:
507 428 # Bad heads entry
508 429 continue
509 430 if bundle_heads.issubset(common):
510 431 continue # Nothing new
511 432 if all(cl.rev(rev) in common_anc for rev in bundle_heads):
512 433 continue # Still nothing new
513 434 if any(cl.rev(rev) not in heads_anc and
514 435 cl.rev(rev) not in common_anc for rev in bundle_heads):
515 436 continue
516 437 if 'bases' in entry:
517 438 try:
518 439 bundle_bases = decodehexstring(entry['bases'])
519 440 except TypeError:
520 441 # Bad bases entry
521 442 continue
522 443 if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
523 444 continue
524 445 path = entry['URL']
525 446 repo.ui.debug('sending pullbundle "%s"\n' % path)
526 447 try:
527 448 return repo.vfs.open(path)
528 449 except IOError:
529 450 repo.ui.debug('pullbundle "%s" not accessible\n' % path)
530 451 continue
531 452 return None
532 453
533 454 @wireprotocommand('getbundle', '*', permission='pull')
534 455 def getbundle(repo, proto, others):
535 456 opts = options('getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(),
536 457 others)
537 458 for k, v in opts.iteritems():
538 459 keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
539 460 if keytype == 'nodes':
540 461 opts[k] = wireprototypes.decodelist(v)
541 462 elif keytype == 'csv':
542 463 opts[k] = list(v.split(','))
543 464 elif keytype == 'scsv':
544 465 opts[k] = set(v.split(','))
545 466 elif keytype == 'boolean':
546 467 # Client should serialize False as '0', which is a non-empty string
547 468 # so it evaluates as a True bool.
548 469 if v == '0':
549 470 opts[k] = False
550 471 else:
551 472 opts[k] = bool(v)
552 473 elif keytype != 'plain':
553 474 raise KeyError('unknown getbundle option type %s'
554 475 % keytype)
555 476
556 477 if not bundle1allowed(repo, 'pull'):
557 478 if not exchange.bundle2requested(opts.get('bundlecaps')):
558 479 if proto.name == 'http-v1':
559 480 return wireprototypes.ooberror(bundle2required)
560 481 raise error.Abort(bundle2requiredmain,
561 482 hint=bundle2requiredhint)
562 483
563 484 prefercompressed = True
564 485
565 486 try:
566 487 clheads = set(repo.changelog.heads())
567 488 heads = set(opts.get('heads', set()))
568 489 common = set(opts.get('common', set()))
569 490 common.discard(nullid)
570 491 if (repo.ui.configbool('server', 'pullbundle') and
571 492 'partial-pull' in proto.getprotocaps()):
572 493 # Check if a pre-built bundle covers this request.
573 494 bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
574 495 if bundle:
575 496 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
576 497 prefer_uncompressed=True)
577 498
578 499 if repo.ui.configbool('server', 'disablefullbundle'):
579 500 # Check to see if this is a full clone.
580 501 changegroup = opts.get('cg', True)
581 502 if changegroup and not common and clheads == heads:
582 503 raise error.Abort(
583 504 _('server has pull-based clones disabled'),
584 505 hint=_('remove --pull if specified or upgrade Mercurial'))
585 506
586 507 info, chunks = exchange.getbundlechunks(repo, 'serve',
587 508 **pycompat.strkwargs(opts))
588 509 prefercompressed = info.get('prefercompressed', True)
589 510 except error.Abort as exc:
590 511 # cleanly forward Abort error to the client
591 512 if not exchange.bundle2requested(opts.get('bundlecaps')):
592 513 if proto.name == 'http-v1':
593 514 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
594 515 raise # cannot do better for bundle1 + ssh
595 516 # bundle2 request expect a bundle2 reply
596 517 bundler = bundle2.bundle20(repo.ui)
597 518 manargs = [('message', pycompat.bytestr(exc))]
598 519 advargs = []
599 520 if exc.hint is not None:
600 521 advargs.append(('hint', exc.hint))
601 522 bundler.addpart(bundle2.bundlepart('error:abort',
602 523 manargs, advargs))
603 524 chunks = bundler.getchunks()
604 525 prefercompressed = False
605 526
606 527 return wireprototypes.streamres(
607 528 gen=chunks, prefer_uncompressed=not prefercompressed)
608 529
609 530 @wireprotocommand('heads', permission='pull')
610 531 def heads(repo, proto):
611 532 h = repo.heads()
612 533 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
613 534
614 535 @wireprotocommand('hello', permission='pull')
615 536 def hello(repo, proto):
616 537 """Called as part of SSH handshake to obtain server info.
617 538
618 539 Returns a list of lines describing interesting things about the
619 540 server, in an RFC822-like format.
620 541
621 542 Currently, the only one defined is ``capabilities``, which consists of a
622 543 line of space separated tokens describing server abilities:
623 544
624 545 capabilities: <token0> <token1> <token2>
625 546 """
626 547 caps = capabilities(repo, proto).data
627 548 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
628 549
629 550 @wireprotocommand('listkeys', 'namespace', permission='pull')
630 551 def listkeys(repo, proto, namespace):
631 552 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
632 553 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
633 554
634 555 @wireprotocommand('lookup', 'key', permission='pull')
635 556 def lookup(repo, proto, key):
636 557 try:
637 558 k = encoding.tolocal(key)
638 559 n = repo.lookup(k)
639 560 r = hex(n)
640 561 success = 1
641 562 except Exception as inst:
642 563 r = stringutil.forcebytestr(inst)
643 564 success = 0
644 565 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
645 566
646 567 @wireprotocommand('known', 'nodes *', permission='pull')
647 568 def known(repo, proto, nodes, others):
648 569 v = ''.join(b and '1' or '0'
649 570 for b in repo.known(wireprototypes.decodelist(nodes)))
650 571 return wireprototypes.bytesresponse(v)
651 572
652 573 @wireprotocommand('protocaps', 'caps', permission='pull')
653 574 def protocaps(repo, proto, caps):
654 575 if proto.name == wireprototypes.SSHV1:
655 576 proto._protocaps = set(caps.split(' '))
656 577 return wireprototypes.bytesresponse('OK')
657 578
658 579 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
659 580 def pushkey(repo, proto, namespace, key, old, new):
660 581 # compatibility with pre-1.8 clients which were accidentally
661 582 # sending raw binary nodes rather than utf-8-encoded hex
662 583 if len(new) == 20 and stringutil.escapestr(new) != new:
663 584 # looks like it could be a binary node
664 585 try:
665 586 new.decode('utf-8')
666 587 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
667 588 except UnicodeDecodeError:
668 589 pass # binary, leave unmodified
669 590 else:
670 591 new = encoding.tolocal(new) # normal path
671 592
672 593 with proto.mayberedirectstdio() as output:
673 594 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
674 595 encoding.tolocal(old), new) or False
675 596
676 597 output = output.getvalue() if output else ''
677 598 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
678 599
679 600 @wireprotocommand('stream_out', permission='pull')
680 601 def stream(repo, proto):
681 602 '''If the server supports streaming clone, it advertises the "stream"
682 603 capability with a value representing the version and flags of the repo
683 604 it is serving. Client checks to see if it understands the format.
684 605 '''
685 606 return wireprototypes.streamreslegacy(
686 607 streamclone.generatev1wireproto(repo))
687 608
688 609 @wireprotocommand('unbundle', 'heads', permission='push')
689 610 def unbundle(repo, proto, heads):
690 611 their_heads = wireprototypes.decodelist(heads)
691 612
692 613 with proto.mayberedirectstdio() as output:
693 614 try:
694 615 exchange.check_heads(repo, their_heads, 'preparing changes')
695 616 cleanup = lambda: None
696 617 try:
697 618 payload = proto.getpayload()
698 619 if repo.ui.configbool('server', 'streamunbundle'):
699 620 def cleanup():
700 621 # Ensure that the full payload is consumed, so
701 622 # that the connection doesn't contain trailing garbage.
702 623 for p in payload:
703 624 pass
704 625 fp = util.chunkbuffer(payload)
705 626 else:
706 627 # write bundle data to temporary file as it can be big
707 628 fp, tempname = None, None
708 629 def cleanup():
709 630 if fp:
710 631 fp.close()
711 632 if tempname:
712 633 os.unlink(tempname)
713 634 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
714 635 repo.ui.debug('redirecting incoming bundle to %s\n' %
715 636 tempname)
716 637 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
717 638 r = 0
718 639 for p in payload:
719 640 fp.write(p)
720 641 fp.seek(0)
721 642
722 643 gen = exchange.readbundle(repo.ui, fp, None)
723 644 if (isinstance(gen, changegroupmod.cg1unpacker)
724 645 and not bundle1allowed(repo, 'push')):
725 646 if proto.name == 'http-v1':
726 647 # need to special case http because stderr do not get to
727 648 # the http client on failed push so we need to abuse
728 649 # some other error type to make sure the message get to
729 650 # the user.
730 651 return wireprototypes.ooberror(bundle2required)
731 652 raise error.Abort(bundle2requiredmain,
732 653 hint=bundle2requiredhint)
733 654
734 655 r = exchange.unbundle(repo, gen, their_heads, 'serve',
735 656 proto.client())
736 657 if util.safehasattr(r, 'addpart'):
737 658 # The return looks streamable, we are in the bundle2 case
738 659 # and should return a stream.
739 660 return wireprototypes.streamreslegacy(gen=r.getchunks())
740 661 return wireprototypes.pushres(
741 662 r, output.getvalue() if output else '')
742 663
743 664 finally:
744 665 cleanup()
745 666
746 667 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
747 668 # handle non-bundle2 case first
748 669 if not getattr(exc, 'duringunbundle2', False):
749 670 try:
750 671 raise
751 672 except error.Abort:
752 673 # The old code we moved used procutil.stderr directly.
753 674 # We did not change it to minimise code change.
754 675 # This need to be moved to something proper.
755 676 # Feel free to do it.
756 677 procutil.stderr.write("abort: %s\n" % exc)
757 678 if exc.hint is not None:
758 679 procutil.stderr.write("(%s)\n" % exc.hint)
759 680 procutil.stderr.flush()
760 681 return wireprototypes.pushres(
761 682 0, output.getvalue() if output else '')
762 683 except error.PushRaced:
763 684 return wireprototypes.pusherr(
764 685 pycompat.bytestr(exc),
765 686 output.getvalue() if output else '')
766 687
767 688 bundler = bundle2.bundle20(repo.ui)
768 689 for out in getattr(exc, '_bundle2salvagedoutput', ()):
769 690 bundler.addpart(out)
770 691 try:
771 692 try:
772 693 raise
773 694 except error.PushkeyFailed as exc:
774 695 # check client caps
775 696 remotecaps = getattr(exc, '_replycaps', None)
776 697 if (remotecaps is not None
777 698 and 'pushkey' not in remotecaps.get('error', ())):
778 699 # no support remote side, fallback to Abort handler.
779 700 raise
780 701 part = bundler.newpart('error:pushkey')
781 702 part.addparam('in-reply-to', exc.partid)
782 703 if exc.namespace is not None:
783 704 part.addparam('namespace', exc.namespace,
784 705 mandatory=False)
785 706 if exc.key is not None:
786 707 part.addparam('key', exc.key, mandatory=False)
787 708 if exc.new is not None:
788 709 part.addparam('new', exc.new, mandatory=False)
789 710 if exc.old is not None:
790 711 part.addparam('old', exc.old, mandatory=False)
791 712 if exc.ret is not None:
792 713 part.addparam('ret', exc.ret, mandatory=False)
793 714 except error.BundleValueError as exc:
794 715 errpart = bundler.newpart('error:unsupportedcontent')
795 716 if exc.parttype is not None:
796 717 errpart.addparam('parttype', exc.parttype)
797 718 if exc.params:
798 719 errpart.addparam('params', '\0'.join(exc.params))
799 720 except error.Abort as exc:
800 721 manargs = [('message', stringutil.forcebytestr(exc))]
801 722 advargs = []
802 723 if exc.hint is not None:
803 724 advargs.append(('hint', exc.hint))
804 725 bundler.addpart(bundle2.bundlepart('error:abort',
805 726 manargs, advargs))
806 727 except error.PushRaced as exc:
807 728 bundler.newpart('error:pushraced',
808 729 [('message', stringutil.forcebytestr(exc))])
809 730 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
@@ -1,243 +1,321
1 1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
2 2 #
3 3 # This software may be used and distributed according to the terms of the
4 4 # GNU General Public License version 2 or any later version.
5 5
6 6 from __future__ import absolute_import
7 7
8 8 from .node import (
9 9 bin,
10 10 hex,
11 11 )
12 12 from .thirdparty.zope import (
13 13 interface as zi,
14 14 )
15 15
16 16 # Names of the SSH protocol implementations.
17 17 SSHV1 = 'ssh-v1'
18 18 # These are advertised over the wire. Increment the counters at the end
19 19 # to reflect BC breakages.
20 20 SSHV2 = 'exp-ssh-v2-0001'
21 21 HTTP_WIREPROTO_V2 = 'exp-http-v2-0001'
22 22
23 23 # All available wire protocol transports.
24 24 TRANSPORTS = {
25 25 SSHV1: {
26 26 'transport': 'ssh',
27 27 'version': 1,
28 28 },
29 29 SSHV2: {
30 30 'transport': 'ssh',
31 31 # TODO mark as version 2 once all commands are implemented.
32 32 'version': 1,
33 33 },
34 34 'http-v1': {
35 35 'transport': 'http',
36 36 'version': 1,
37 37 },
38 38 HTTP_WIREPROTO_V2: {
39 39 'transport': 'http',
40 40 'version': 2,
41 41 }
42 42 }
43 43
44 44 class bytesresponse(object):
45 45 """A wire protocol response consisting of raw bytes."""
46 46 def __init__(self, data):
47 47 self.data = data
48 48
49 49 class ooberror(object):
50 50 """wireproto reply: failure of a batch of operation
51 51
52 52 Something failed during a batch call. The error message is stored in
53 53 `self.message`.
54 54 """
55 55 def __init__(self, message):
56 56 self.message = message
57 57
58 58 class pushres(object):
59 59 """wireproto reply: success with simple integer return
60 60
61 61 The call was successful and returned an integer contained in `self.res`.
62 62 """
63 63 def __init__(self, res, output):
64 64 self.res = res
65 65 self.output = output
66 66
67 67 class pusherr(object):
68 68 """wireproto reply: failure
69 69
70 70 The call failed. The `self.res` attribute contains the error message.
71 71 """
72 72 def __init__(self, res, output):
73 73 self.res = res
74 74 self.output = output
75 75
76 76 class streamres(object):
77 77 """wireproto reply: binary stream
78 78
79 79 The call was successful and the result is a stream.
80 80
81 81 Accepts a generator containing chunks of data to be sent to the client.
82 82
83 83 ``prefer_uncompressed`` indicates that the data is expected to be
84 84 uncompressable and that the stream should therefore use the ``none``
85 85 engine.
86 86 """
87 87 def __init__(self, gen=None, prefer_uncompressed=False):
88 88 self.gen = gen
89 89 self.prefer_uncompressed = prefer_uncompressed
90 90
91 91 class streamreslegacy(object):
92 92 """wireproto reply: uncompressed binary stream
93 93
94 94 The call was successful and the result is a stream.
95 95
96 96 Accepts a generator containing chunks of data to be sent to the client.
97 97
98 98 Like ``streamres``, but sends an uncompressed data for "version 1" clients
99 99 using the application/mercurial-0.1 media type.
100 100 """
101 101 def __init__(self, gen=None):
102 102 self.gen = gen
103 103
104 104 class cborresponse(object):
105 105 """Encode the response value as CBOR."""
106 106 def __init__(self, v):
107 107 self.value = v
108 108
109 109 class v2errorresponse(object):
110 110 """Represents a command error for version 2 transports."""
111 111 def __init__(self, message, args=None):
112 112 self.message = message
113 113 self.args = args
114 114
115 115 class v2streamingresponse(object):
116 116 """A response whose data is supplied by a generator.
117 117
118 118 The generator can either consist of data structures to CBOR
119 119 encode or a stream of already-encoded bytes.
120 120 """
121 121 def __init__(self, gen, compressible=True):
122 122 self.gen = gen
123 123 self.compressible = compressible
124 124
125 125 # list of nodes encoding / decoding
126 126 def decodelist(l, sep=' '):
127 127 if l:
128 128 return [bin(v) for v in l.split(sep)]
129 129 return []
130 130
131 131 def encodelist(l, sep=' '):
132 132 try:
133 133 return sep.join(map(hex, l))
134 134 except TypeError:
135 135 raise
136 136
137 137 # batched call argument encoding
138 138
139 139 def escapebatcharg(plain):
140 140 return (plain
141 141 .replace(':', ':c')
142 142 .replace(',', ':o')
143 143 .replace(';', ':s')
144 144 .replace('=', ':e'))
145 145
146 146 def unescapebatcharg(escaped):
147 147 return (escaped
148 148 .replace(':e', '=')
149 149 .replace(':s', ';')
150 150 .replace(':o', ',')
151 151 .replace(':c', ':'))
152 152
153 153 # mapping of options accepted by getbundle and their types
154 154 #
155 155 # Meant to be extended by extensions. It is extensions responsibility to ensure
156 156 # such options are properly processed in exchange.getbundle.
157 157 #
158 158 # supported types are:
159 159 #
160 160 # :nodes: list of binary nodes
161 161 # :csv: list of comma-separated values
162 162 # :scsv: list of comma-separated values return as set
163 163 # :plain: string with no transformation needed.
164 164 GETBUNDLE_ARGUMENTS = {
165 165 'heads': 'nodes',
166 166 'bookmarks': 'boolean',
167 167 'common': 'nodes',
168 168 'obsmarkers': 'boolean',
169 169 'phases': 'boolean',
170 170 'bundlecaps': 'scsv',
171 171 'listkeys': 'csv',
172 172 'cg': 'boolean',
173 173 'cbattempted': 'boolean',
174 174 'stream': 'boolean',
175 175 }
176 176
177 177 class baseprotocolhandler(zi.Interface):
178 178 """Abstract base class for wire protocol handlers.
179 179
180 180 A wire protocol handler serves as an interface between protocol command
181 181 handlers and the wire protocol transport layer. Protocol handlers provide
182 182 methods to read command arguments, redirect stdio for the duration of
183 183 the request, handle response types, etc.
184 184 """
185 185
186 186 name = zi.Attribute(
187 187 """The name of the protocol implementation.
188 188
189 189 Used for uniquely identifying the transport type.
190 190 """)
191 191
192 192 def getargs(args):
193 193 """return the value for arguments in <args>
194 194
195 195 For version 1 transports, returns a list of values in the same
196 196 order they appear in ``args``. For version 2 transports, returns
197 197 a dict mapping argument name to value.
198 198 """
199 199
200 200 def getprotocaps():
201 201 """Returns the list of protocol-level capabilities of client
202 202
203 203 Returns a list of capabilities as declared by the client for
204 204 the current request (or connection for stateful protocol handlers)."""
205 205
206 206 def getpayload():
207 207 """Provide a generator for the raw payload.
208 208
209 209 The caller is responsible for ensuring that the full payload is
210 210 processed.
211 211 """
212 212
213 213 def mayberedirectstdio():
214 214 """Context manager to possibly redirect stdio.
215 215
216 216 The context manager yields a file-object like object that receives
217 217 stdout and stderr output when the context manager is active. Or it
218 218 yields ``None`` if no I/O redirection occurs.
219 219
220 220 The intent of this context manager is to capture stdio output
221 221 so it may be sent in the response. Some transports support streaming
222 222 stdio to the client in real time. For these transports, stdio output
223 223 won't be captured.
224 224 """
225 225
226 226 def client():
227 227 """Returns a string representation of this client (as bytes)."""
228 228
229 229 def addcapabilities(repo, caps):
230 230 """Adds advertised capabilities specific to this protocol.
231 231
232 232 Receives the list of capabilities collected so far.
233 233
234 234 Returns a list of capabilities. The passed in argument can be returned.
235 235 """
236 236
237 237 def checkperm(perm):
238 238 """Validate that the client has permissions to perform a request.
239 239
240 240 The argument is the permission required to proceed. If the client
241 241 doesn't have that permission, the exception should raise or abort
242 242 in a protocol specific manner.
243 243 """
244
245 class commandentry(object):
246 """Represents a declared wire protocol command."""
247 def __init__(self, func, args='', transports=None,
248 permission='push'):
249 self.func = func
250 self.args = args
251 self.transports = transports or set()
252 self.permission = permission
253
254 def _merge(self, func, args):
255 """Merge this instance with an incoming 2-tuple.
256
257 This is called when a caller using the old 2-tuple API attempts
258 to replace an instance. The incoming values are merged with
259 data not captured by the 2-tuple and a new instance containing
260 the union of the two objects is returned.
261 """
262 return commandentry(func, args=args, transports=set(self.transports),
263 permission=self.permission)
264
265 # Old code treats instances as 2-tuples. So expose that interface.
266 def __iter__(self):
267 yield self.func
268 yield self.args
269
270 def __getitem__(self, i):
271 if i == 0:
272 return self.func
273 elif i == 1:
274 return self.args
275 else:
276 raise IndexError('can only access elements 0 and 1')
277
278 class commanddict(dict):
279 """Container for registered wire protocol commands.
280
281 It behaves like a dict. But __setitem__ is overwritten to allow silent
282 coercion of values from 2-tuples for API compatibility.
283 """
284 def __setitem__(self, k, v):
285 if isinstance(v, commandentry):
286 pass
287 # Cast 2-tuples to commandentry instances.
288 elif isinstance(v, tuple):
289 if len(v) != 2:
290 raise ValueError('command tuples must have exactly 2 elements')
291
292 # It is common for extensions to wrap wire protocol commands via
293 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
294 # doing this aren't aware of the new API that uses objects to store
295 # command entries, we automatically merge old state with new.
296 if k in self:
297 v = self[k]._merge(v[0], v[1])
298 else:
299 # Use default values from @wireprotocommand.
300 v = commandentry(v[0], args=v[1],
301 transports=set(TRANSPORTS),
302 permission='push')
303 else:
304 raise ValueError('command entries must be commandentry instances '
305 'or 2-tuples')
306
307 return super(commanddict, self).__setitem__(k, v)
308
309 def commandavailable(self, command, proto):
310 """Determine if a command is available for the requested protocol."""
311 assert proto.name in TRANSPORTS
312
313 entry = self.get(command)
314
315 if not entry:
316 return False
317
318 if proto.name not in entry.transports:
319 return False
320
321 return True
@@ -1,522 +1,522
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10
11 11 from .i18n import _
12 12 from .thirdparty import (
13 13 cbor,
14 14 )
15 15 from .thirdparty.zope import (
16 16 interface as zi,
17 17 )
18 18 from . import (
19 19 encoding,
20 20 error,
21 21 pycompat,
22 22 streamclone,
23 23 util,
24 24 wireproto,
25 25 wireprotoframing,
26 26 wireprototypes,
27 27 )
28 28
29 29 FRAMINGTYPE = b'application/mercurial-exp-framing-0005'
30 30
31 31 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
32 32
33 33 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
34 34 from .hgweb import common as hgwebcommon
35 35
36 36 # URL space looks like: <permissions>/<command>, where <permission> can
37 37 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
38 38
39 39 # Root URL does nothing meaningful... yet.
40 40 if not urlparts:
41 41 res.status = b'200 OK'
42 42 res.headers[b'Content-Type'] = b'text/plain'
43 43 res.setbodybytes(_('HTTP version 2 API handler'))
44 44 return
45 45
46 46 if len(urlparts) == 1:
47 47 res.status = b'404 Not Found'
48 48 res.headers[b'Content-Type'] = b'text/plain'
49 49 res.setbodybytes(_('do not know how to process %s\n') %
50 50 req.dispatchpath)
51 51 return
52 52
53 53 permission, command = urlparts[0:2]
54 54
55 55 if permission not in (b'ro', b'rw'):
56 56 res.status = b'404 Not Found'
57 57 res.headers[b'Content-Type'] = b'text/plain'
58 58 res.setbodybytes(_('unknown permission: %s') % permission)
59 59 return
60 60
61 61 if req.method != 'POST':
62 62 res.status = b'405 Method Not Allowed'
63 63 res.headers[b'Allow'] = b'POST'
64 64 res.setbodybytes(_('commands require POST requests'))
65 65 return
66 66
67 67 # At some point we'll want to use our own API instead of recycling the
68 68 # behavior of version 1 of the wire protocol...
69 69 # TODO return reasonable responses - not responses that overload the
70 70 # HTTP status line message for error reporting.
71 71 try:
72 72 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
73 73 except hgwebcommon.ErrorResponse as e:
74 74 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
75 75 for k, v in e.headers:
76 76 res.headers[k] = v
77 77 res.setbodybytes('permission denied')
78 78 return
79 79
80 80 # We have a special endpoint to reflect the request back at the client.
81 81 if command == b'debugreflect':
82 82 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
83 83 return
84 84
85 85 # Extra commands that we handle that aren't really wire protocol
86 86 # commands. Think extra hard before making this hackery available to
87 87 # extension.
88 88 extracommands = {'multirequest'}
89 89
90 90 if command not in wireproto.commandsv2 and command not in extracommands:
91 91 res.status = b'404 Not Found'
92 92 res.headers[b'Content-Type'] = b'text/plain'
93 93 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
94 94 return
95 95
96 96 repo = rctx.repo
97 97 ui = repo.ui
98 98
99 99 proto = httpv2protocolhandler(req, ui)
100 100
101 101 if (not wireproto.commandsv2.commandavailable(command, proto)
102 102 and command not in extracommands):
103 103 res.status = b'404 Not Found'
104 104 res.headers[b'Content-Type'] = b'text/plain'
105 105 res.setbodybytes(_('invalid wire protocol command: %s') % command)
106 106 return
107 107
108 108 # TODO consider cases where proxies may add additional Accept headers.
109 109 if req.headers.get(b'Accept') != FRAMINGTYPE:
110 110 res.status = b'406 Not Acceptable'
111 111 res.headers[b'Content-Type'] = b'text/plain'
112 112 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
113 113 % FRAMINGTYPE)
114 114 return
115 115
116 116 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
117 117 res.status = b'415 Unsupported Media Type'
118 118 # TODO we should send a response with appropriate media type,
119 119 # since client does Accept it.
120 120 res.headers[b'Content-Type'] = b'text/plain'
121 121 res.setbodybytes(_('client MUST send Content-Type header with '
122 122 'value: %s\n') % FRAMINGTYPE)
123 123 return
124 124
125 125 _processhttpv2request(ui, repo, req, res, permission, command, proto)
126 126
127 127 def _processhttpv2reflectrequest(ui, repo, req, res):
128 128 """Reads unified frame protocol request and dumps out state to client.
129 129
130 130 This special endpoint can be used to help debug the wire protocol.
131 131
132 132 Instead of routing the request through the normal dispatch mechanism,
133 133 we instead read all frames, decode them, and feed them into our state
134 134 tracker. We then dump the log of all that activity back out to the
135 135 client.
136 136 """
137 137 import json
138 138
139 139 # Reflection APIs have a history of being abused, accidentally disclosing
140 140 # sensitive data, etc. So we have a config knob.
141 141 if not ui.configbool('experimental', 'web.api.debugreflect'):
142 142 res.status = b'404 Not Found'
143 143 res.headers[b'Content-Type'] = b'text/plain'
144 144 res.setbodybytes(_('debugreflect service not available'))
145 145 return
146 146
147 147 # We assume we have a unified framing protocol request body.
148 148
149 149 reactor = wireprotoframing.serverreactor()
150 150 states = []
151 151
152 152 while True:
153 153 frame = wireprotoframing.readframe(req.bodyfh)
154 154
155 155 if not frame:
156 156 states.append(b'received: <no frame>')
157 157 break
158 158
159 159 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
160 160 frame.requestid,
161 161 frame.payload))
162 162
163 163 action, meta = reactor.onframerecv(frame)
164 164 states.append(json.dumps((action, meta), sort_keys=True,
165 165 separators=(', ', ': ')))
166 166
167 167 action, meta = reactor.oninputeof()
168 168 meta['action'] = action
169 169 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
170 170
171 171 res.status = b'200 OK'
172 172 res.headers[b'Content-Type'] = b'text/plain'
173 173 res.setbodybytes(b'\n'.join(states))
174 174
175 175 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
176 176 """Post-validation handler for HTTPv2 requests.
177 177
178 178 Called when the HTTP request contains unified frame-based protocol
179 179 frames for evaluation.
180 180 """
181 181 # TODO Some HTTP clients are full duplex and can receive data before
182 182 # the entire request is transmitted. Figure out a way to indicate support
183 183 # for that so we can opt into full duplex mode.
184 184 reactor = wireprotoframing.serverreactor(deferoutput=True)
185 185 seencommand = False
186 186
187 187 outstream = reactor.makeoutputstream()
188 188
189 189 while True:
190 190 frame = wireprotoframing.readframe(req.bodyfh)
191 191 if not frame:
192 192 break
193 193
194 194 action, meta = reactor.onframerecv(frame)
195 195
196 196 if action == 'wantframe':
197 197 # Need more data before we can do anything.
198 198 continue
199 199 elif action == 'runcommand':
200 200 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
201 201 reqcommand, reactor, outstream,
202 202 meta, issubsequent=seencommand)
203 203
204 204 if sentoutput:
205 205 return
206 206
207 207 seencommand = True
208 208
209 209 elif action == 'error':
210 210 # TODO define proper error mechanism.
211 211 res.status = b'200 OK'
212 212 res.headers[b'Content-Type'] = b'text/plain'
213 213 res.setbodybytes(meta['message'] + b'\n')
214 214 return
215 215 else:
216 216 raise error.ProgrammingError(
217 217 'unhandled action from frame processor: %s' % action)
218 218
219 219 action, meta = reactor.oninputeof()
220 220 if action == 'sendframes':
221 221 # We assume we haven't started sending the response yet. If we're
222 222 # wrong, the response type will raise an exception.
223 223 res.status = b'200 OK'
224 224 res.headers[b'Content-Type'] = FRAMINGTYPE
225 225 res.setbodygen(meta['framegen'])
226 226 elif action == 'noop':
227 227 pass
228 228 else:
229 229 raise error.ProgrammingError('unhandled action from frame processor: %s'
230 230 % action)
231 231
232 232 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
233 233 outstream, command, issubsequent):
234 234 """Dispatch a wire protocol command made from HTTPv2 requests.
235 235
236 236 The authenticated permission (``authedperm``) along with the original
237 237 command from the URL (``reqcommand``) are passed in.
238 238 """
239 239 # We already validated that the session has permissions to perform the
240 240 # actions in ``authedperm``. In the unified frame protocol, the canonical
241 241 # command to run is expressed in a frame. However, the URL also requested
242 242 # to run a specific command. We need to be careful that the command we
243 243 # run doesn't have permissions requirements greater than what was granted
244 244 # by ``authedperm``.
245 245 #
246 246 # Our rule for this is we only allow one command per HTTP request and
247 247 # that command must match the command in the URL. However, we make
248 248 # an exception for the ``multirequest`` URL. This URL is allowed to
249 249 # execute multiple commands. We double check permissions of each command
250 250 # as it is invoked to ensure there is no privilege escalation.
251 251 # TODO consider allowing multiple commands to regular command URLs
252 252 # iff each command is the same.
253 253
254 254 proto = httpv2protocolhandler(req, ui, args=command['args'])
255 255
256 256 if reqcommand == b'multirequest':
257 257 if not wireproto.commandsv2.commandavailable(command['command'], proto):
258 258 # TODO proper error mechanism
259 259 res.status = b'200 OK'
260 260 res.headers[b'Content-Type'] = b'text/plain'
261 261 res.setbodybytes(_('wire protocol command not available: %s') %
262 262 command['command'])
263 263 return True
264 264
265 265 # TODO don't use assert here, since it may be elided by -O.
266 266 assert authedperm in (b'ro', b'rw')
267 267 wirecommand = wireproto.commandsv2[command['command']]
268 268 assert wirecommand.permission in ('push', 'pull')
269 269
270 270 if authedperm == b'ro' and wirecommand.permission != 'pull':
271 271 # TODO proper error mechanism
272 272 res.status = b'403 Forbidden'
273 273 res.headers[b'Content-Type'] = b'text/plain'
274 274 res.setbodybytes(_('insufficient permissions to execute '
275 275 'command: %s') % command['command'])
276 276 return True
277 277
278 278 # TODO should we also call checkperm() here? Maybe not if we're going
279 279 # to overhaul that API. The granted scope from the URL check should
280 280 # be good enough.
281 281
282 282 else:
283 283 # Don't allow multiple commands outside of ``multirequest`` URL.
284 284 if issubsequent:
285 285 # TODO proper error mechanism
286 286 res.status = b'200 OK'
287 287 res.headers[b'Content-Type'] = b'text/plain'
288 288 res.setbodybytes(_('multiple commands cannot be issued to this '
289 289 'URL'))
290 290 return True
291 291
292 292 if reqcommand != command['command']:
293 293 # TODO define proper error mechanism
294 294 res.status = b'200 OK'
295 295 res.headers[b'Content-Type'] = b'text/plain'
296 296 res.setbodybytes(_('command in frame must match command in URL'))
297 297 return True
298 298
299 299 rsp = wireproto.dispatch(repo, proto, command['command'])
300 300
301 301 res.status = b'200 OK'
302 302 res.headers[b'Content-Type'] = FRAMINGTYPE
303 303
304 304 if isinstance(rsp, wireprototypes.cborresponse):
305 305 encoded = cbor.dumps(rsp.value, canonical=True)
306 306 action, meta = reactor.oncommandresponseready(outstream,
307 307 command['requestid'],
308 308 encoded)
309 309 elif isinstance(rsp, wireprototypes.v2streamingresponse):
310 310 action, meta = reactor.oncommandresponsereadygen(outstream,
311 311 command['requestid'],
312 312 rsp.gen)
313 313 elif isinstance(rsp, wireprototypes.v2errorresponse):
314 314 action, meta = reactor.oncommanderror(outstream,
315 315 command['requestid'],
316 316 rsp.message,
317 317 rsp.args)
318 318 else:
319 319 action, meta = reactor.onservererror(
320 320 _('unhandled response type from wire proto command'))
321 321
322 322 if action == 'sendframes':
323 323 res.setbodygen(meta['framegen'])
324 324 return True
325 325 elif action == 'noop':
326 326 return False
327 327 else:
328 328 raise error.ProgrammingError('unhandled event from reactor: %s' %
329 329 action)
330 330
331 331 @zi.implementer(wireprototypes.baseprotocolhandler)
332 332 class httpv2protocolhandler(object):
333 333 def __init__(self, req, ui, args=None):
334 334 self._req = req
335 335 self._ui = ui
336 336 self._args = args
337 337
338 338 @property
339 339 def name(self):
340 340 return HTTP_WIREPROTO_V2
341 341
342 342 def getargs(self, args):
343 343 data = {}
344 344 for k, typ in args.items():
345 345 if k == '*':
346 346 raise NotImplementedError('do not support * args')
347 347 elif k in self._args:
348 348 # TODO consider validating value types.
349 349 data[k] = self._args[k]
350 350
351 351 return data
352 352
353 353 def getprotocaps(self):
354 354 # Protocol capabilities are currently not implemented for HTTP V2.
355 355 return set()
356 356
357 357 def getpayload(self):
358 358 raise NotImplementedError
359 359
360 360 @contextlib.contextmanager
361 361 def mayberedirectstdio(self):
362 362 raise NotImplementedError
363 363
364 364 def client(self):
365 365 raise NotImplementedError
366 366
367 367 def addcapabilities(self, repo, caps):
368 368 return caps
369 369
370 370 def checkperm(self, perm):
371 371 raise NotImplementedError
372 372
373 373 def httpv2apidescriptor(req, repo):
374 374 proto = httpv2protocolhandler(req, repo.ui)
375 375
376 376 return _capabilitiesv2(repo, proto)
377 377
378 378 def _capabilitiesv2(repo, proto):
379 379 """Obtain the set of capabilities for version 2 transports.
380 380
381 381 These capabilities are distinct from the capabilities for version 1
382 382 transports.
383 383 """
384 384 compression = []
385 385 for engine in wireproto.supportedcompengines(repo.ui, util.SERVERROLE):
386 386 compression.append({
387 387 b'name': engine.wireprotosupport().name,
388 388 })
389 389
390 390 caps = {
391 391 'commands': {},
392 392 'compression': compression,
393 393 'framingmediatypes': [FRAMINGTYPE],
394 394 }
395 395
396 396 for command, entry in wireproto.commandsv2.items():
397 397 caps['commands'][command] = {
398 398 'args': entry.args,
399 399 'permissions': [entry.permission],
400 400 }
401 401
402 402 if streamclone.allowservergeneration(repo):
403 403 caps['rawrepoformats'] = sorted(repo.requirements &
404 404 repo.supportedformats)
405 405
406 406 return proto.addcapabilities(repo, caps)
407 407
408 408 def wireprotocommand(name, args=None, permission='push'):
409 409 """Decorator to declare a wire protocol command.
410 410
411 411 ``name`` is the name of the wire protocol command being provided.
412 412
413 413 ``args`` is a dict of argument names to example values.
414 414
415 415 ``permission`` defines the permission type needed to run this command.
416 416 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
417 417 respectively. Default is to assume command requires ``push`` permissions
418 418 because otherwise commands not declaring their permissions could modify
419 419 a repository that is supposed to be read-only.
420 420 """
421 421 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
422 422 if v['version'] == 2}
423 423
424 424 if permission not in ('push', 'pull'):
425 425 raise error.ProgrammingError('invalid wire protocol permission; '
426 426 'got %s; expected "push" or "pull"' %
427 427 permission)
428 428
429 429 if args is None:
430 430 args = {}
431 431
432 432 if not isinstance(args, dict):
433 433 raise error.ProgrammingError('arguments for version 2 commands '
434 434 'must be declared as dicts')
435 435
436 436 def register(func):
437 437 if name in wireproto.commandsv2:
438 438 raise error.ProgrammingError('%s command already registered '
439 439 'for version 2' % name)
440 440
441 wireproto.commandsv2[name] = wireproto.commandentry(
441 wireproto.commandsv2[name] = wireprototypes.commandentry(
442 442 func, args=args, transports=transports, permission=permission)
443 443
444 444 return func
445 445
446 446 return register
447 447
448 448 @wireprotocommand('branchmap', permission='pull')
449 449 def branchmapv2(repo, proto):
450 450 branchmap = {encoding.fromlocal(k): v
451 451 for k, v in repo.branchmap().iteritems()}
452 452
453 453 return wireprototypes.cborresponse(branchmap)
454 454
455 455 @wireprotocommand('capabilities', permission='pull')
456 456 def capabilitiesv2(repo, proto):
457 457 caps = _capabilitiesv2(repo, proto)
458 458
459 459 return wireprototypes.cborresponse(caps)
460 460
461 461 @wireprotocommand('heads',
462 462 args={
463 463 'publiconly': False,
464 464 },
465 465 permission='pull')
466 466 def headsv2(repo, proto, publiconly=False):
467 467 if publiconly:
468 468 repo = repo.filtered('immutable')
469 469
470 470 return wireprototypes.cborresponse(repo.heads())
471 471
472 472 @wireprotocommand('known',
473 473 args={
474 474 'nodes': [b'deadbeef'],
475 475 },
476 476 permission='pull')
477 477 def knownv2(repo, proto, nodes=None):
478 478 nodes = nodes or []
479 479 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
480 480 return wireprototypes.cborresponse(result)
481 481
482 482 @wireprotocommand('listkeys',
483 483 args={
484 484 'namespace': b'ns',
485 485 },
486 486 permission='pull')
487 487 def listkeysv2(repo, proto, namespace=None):
488 488 keys = repo.listkeys(encoding.tolocal(namespace))
489 489 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
490 490 for k, v in keys.iteritems()}
491 491
492 492 return wireprototypes.cborresponse(keys)
493 493
494 494 @wireprotocommand('lookup',
495 495 args={
496 496 'key': b'foo',
497 497 },
498 498 permission='pull')
499 499 def lookupv2(repo, proto, key):
500 500 key = encoding.tolocal(key)
501 501
502 502 # TODO handle exception.
503 503 node = repo.lookup(key)
504 504
505 505 return wireprototypes.cborresponse(node)
506 506
507 507 @wireprotocommand('pushkey',
508 508 args={
509 509 'namespace': b'ns',
510 510 'key': b'key',
511 511 'old': b'old',
512 512 'new': b'new',
513 513 },
514 514 permission='push')
515 515 def pushkeyv2(repo, proto, namespace, key, old, new):
516 516 # TODO handle ui output redirection
517 517 r = repo.pushkey(encoding.tolocal(namespace),
518 518 encoding.tolocal(key),
519 519 encoding.tolocal(old),
520 520 encoding.tolocal(new))
521 521
522 522 return wireprototypes.cborresponse(r)
General Comments 0
You need to be logged in to leave comments. Login now